6生产者


接下来我们用代码来实现一个简单的生产者和消费者的案例,首先创建一个springBoot工程,没有接触过springBoot的同学就创建一个maven工程也可以,然后在pom.xml中导入rabbitmq的依赖

1
2
3
4
5
6
<!--rabbitmq依赖-->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.6.5</version>
</dependency>
接着我们来编写生产者类,代码如下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
public class Procuder {
public static void main(String[] args)throws Exception {
//1.创建一个ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.3.5");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/test");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("123456");

//2.通过连接工厂创建连接
Connection connection = connectionFactory.newConnection();
//3.通过connection创建一个Channel
Channel channel = connection.createChannel();

String queueName = "test001";
//4.发送
channel.basicPublish("",queueName,null,"哈喽啊".getBytes());
//5.关闭连接
channel.close();
connection.close();
}
}


消费者
public class Consumer {
public static void main(String[] args)throws Exception {
//1.创建一个ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.3.5");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/test");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("123456");
//2.通过连接工厂创建连接
Connection connection = connectionFactory.newConnection();
//3.通过connection创建一个Channel
Channel channel = connection.createChannel();
/**
* 4.声明一个队列
* 参数1 队列的名字
* 参数2 是否持久化队列,我们的队列模式是在内存中的,如果rabbitmq重启会丢失,如果我们设置为true则会保存
* 到erlang自带的数据库中,重启后会读取
* 参数3 是否排外,有两个作用,第一个当我们的连接关闭后是否会自动删除队列,第二个,是否私有当前队列
* 如果私有了其他通道不可以访问当前队列,如果为true,一般是一个队列只适用于一个消费者的时候
* 参数4 是否自动删除
* 参数5 一些其他的参数
*/
String queueName = "test001";
channel.queueDeclare(queueName,false,false,false,null);
//5.创建消费者
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
channel.basicConsume(queueName,true,queueingConsumer);
//6.获取消息,如果没有消息会等待,有就获取,可以指定等待时间
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("接收到"+msg);
//7.关闭
channel.close();
connection.close();
}

 评论


博客内容遵循 署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0) 协议

本站使用 Material X 作为主题 , 总访问量为 次 。
载入天数...载入时分秒...