6生产者
接下来我们用代码来实现一个简单的生产者和消费者的案例,首先创建一个springBoot工程,没有接触过springBoot的同学就创建一个maven工程也可以,然后在pom.xml中导入rabbitmq的依赖
1 2 3 4 5 6
| <!--rabbitmq依赖--> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>3.6.5</version> </dependency>
|
接着我们来编写生产者类,代码如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62
| public class Procuder { public static void main(String[] args)throws Exception { //1.创建一个ConnectionFactory ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.3.5"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/test"); connectionFactory.setUsername("admin"); connectionFactory.setPassword("123456");
//2.通过连接工厂创建连接 Connection connection = connectionFactory.newConnection(); //3.通过connection创建一个Channel Channel channel = connection.createChannel();
String queueName = "test001"; //4.发送 channel.basicPublish("",queueName,null,"哈喽啊".getBytes()); //5.关闭连接 channel.close(); connection.close(); } }
消费者 public class Consumer { public static void main(String[] args)throws Exception { //1.创建一个ConnectionFactory ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.3.5"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/test"); connectionFactory.setUsername("admin"); connectionFactory.setPassword("123456"); //2.通过连接工厂创建连接 Connection connection = connectionFactory.newConnection(); //3.通过connection创建一个Channel Channel channel = connection.createChannel(); /** * 4.声明一个队列 * 参数1 队列的名字 * 参数2 是否持久化队列,我们的队列模式是在内存中的,如果rabbitmq重启会丢失,如果我们设置为true则会保存 * 到erlang自带的数据库中,重启后会读取 * 参数3 是否排外,有两个作用,第一个当我们的连接关闭后是否会自动删除队列,第二个,是否私有当前队列 * 如果私有了其他通道不可以访问当前队列,如果为true,一般是一个队列只适用于一个消费者的时候 * 参数4 是否自动删除 * 参数5 一些其他的参数 */ String queueName = "test001"; channel.queueDeclare(queueName,false,false,false,null); //5.创建消费者 QueueingConsumer queueingConsumer = new QueueingConsumer(channel); channel.basicConsume(queueName,true,queueingConsumer); //6.获取消息,如果没有消息会等待,有就获取,可以指定等待时间 QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery(); String msg = new String(delivery.getBody()); System.out.println("接收到"+msg); //7.关闭 channel.close(); connection.close(); }
|