2021-04-13-Java如何操作RabbitMQ
2021/4/13 12:27:56
本文主要是介绍2021-04-13-Java如何操作RabbitMQ,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
文章目录
- Java操作RabbitMQ
- 简单模式
- Work模式
- SpringBoot操作RabbitMQ
Java操作RabbitMQ
- 依赖
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.4.3</version> </dependency>
简单模式
- 消息提供者
// 1.连接RamabbitMQ ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.189.137"); // 连接地址 factory.setPort(5672); // 设置端口号 factory.setUsername("guest"); factory.setPassword("guest"); factory.setVirtualHost("/"); // 配置虚拟主机(每个团队用自己的消息队列) Connection connection = factory.newConnection();// 创建连接 // 2.通过连接对象获取channel对象(队列,绑定,消息都是通过这个对象操作) Channel channel = connection.createChannel(); // 3.通过channel创建一个队列(1:队列名称[如果队列不存在就创建]) channel.queueDeclare("myqueue",false,false,false,null); // 4.发送消息到指定的队列(1:交换机,2:队列的名称,3:其他属性,4:消息体) String msg = "hello 中文 2"; channel.basicPublish("","myqueue",null,msg.getBytes("utf-8")); // 5.断开连接 connection.close();
- 消息消费者
工具类提取
public class ConnectionUtils { private static ConnectionFactory factory =null; static { // 1.连接RamabbitMQ factory = new ConnectionFactory(); factory.setHost("192.168.189.137"); // 连接地址 factory.setPort(5672); // 设置端口号 factory.setUsername("guest"); factory.setPassword("guest"); factory.setVirtualHost("/"); // 配置虚拟主机(每个团队用自己的消息队列) } public static Connection getConnection(){ Connection connection = null;// 创建连接 try { connection = factory.newConnection(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } return connection; } }
消费者
// 1.获取连接对象 Connection connection = ConnectionUtils.getConnection(); // 2.获取Channel Channel channel = connection.createChannel(); // 3.通过channel创建一个队列(1:队列名称) channel.queueDeclare("myqueue",false,false,false,null); // 3.设置监听的队列(1:设置监听的队列,2:,3:消费者的回调方法 channel.basicConsume("myqueue",true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者:"+new String(body,"utf-8")); } }); // 消费者不能关闭连接
- 注意:
1.如果在队列没有的创建的情况下,消费者先启动,提供者后启动这时消费者会报错,并且线程也会被阻塞主,而且还获取不到提供者发送的消息。所以在开发中遇到这样的启动顺序的问题,在消息提供者和消息消费者都进行队列的创建。
2.消费者消费消息是一个同步的过程。开发的过程中一般都会使用一个线程池并发的消费消息。 - 线程池并发消费消息
// 创建一个线程池 private static ExecutorService executorService = Executors.newFixedThreadPool(10); public static void main(String[] args) throws IOException { // 1.获取连接对象 Connection connection = ConnectionUtils.getConnection(); // 2.获取Channel Channel channel = connection.createChannel(); // 3.通过channel创建一个队列(1:队列名称) channel.queueDeclare("myqueue",false,false,false,null); // 3.设置监听的队列(1:设置监听的队列,2:true为自动回复模式,3:消费者的回调方法) channel.basicConsume("myqueue",true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { executorService.submit(new Runnable() { @Override public void run() { try { System.out.println("消费者:"+new String(body,"utf-8")); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } } }); } }); }
Work模式
work模式和简单模式类似,区别在于它有多个消费者交替获取队列中的内容。实现起来也比简单,以上面的代码为例,重新拷贝一个消费者变成两个消费者即可。
- Fonout模式(广播类型)
- 消息提供者
// 1.得到连接对象 Connection connection = ConnectionUtils.getConnection(); // 2.获取channel对象 Channel channel = connection.createChannel(); // 3.创建交换机(fanout:广播类型) channel.exchangeDeclare("myexchange1","fanout"); // 4.发送消息给交换机(1:交换机名称,2:队列名称,3:其他属性,4:消息内容) String msg ="hello RebbitMQ"; channel.basicPublish("myexchange","",null,msg.getBytes("utf-8")); connection.close();
- 消息消费者1
Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); // 创建一个队列 channel.queueDeclare("queue1",false,false,false,null); // 把队列和路交换机绑定(1:队列名称,2:交换机名称,3:路由键) channel.queueBind("queue1","myexchange1",""); channel.basicConsume("queue1",true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("consumer1:"+new String(body,"utf-8")); } });
- 消息消费者2
Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); // 创建一个队列 channel.queueDeclare("queue2",false,false,false,null); // 把队列和路交换机绑定(1:队列名称,2:交换机名称,3:路由键) channel.queueBind("queue2","myexchange1",""); // 设置监听的队列 channel.basicConsume("queue2",true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("consumer2:"+new String(body,"utf-8")); } });
SpringBoot操作RabbitMQ
- 之前在做商品添加的时候除了做把商品添加到数据库之外,还要添加商品信息添加到索引库,还要生成静态页面,这些都是通过同步来完成的效率比较低。现在我们学习完消息队列后可以进行改造了,利用消息队列异步来完成上面的操作从而来提高效率。
商品添加完之后可以通过异步的方式给MQ中添加一条消息,然后索引和静态页面消费MQ中的消息即可。 - 添加依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
- 配置属性文件
spring.rabbitmq.host=192.168.189.137 spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest spring.rabbitmq.virtual-host=/
- 提供者
- 创建队列和交换机
在创建商品的时候要通知索引和item两个工程,所以我们要用广播模式。创建完商品给交换机发送一条消息,然后交换机把消息广播给两个工程。所以我们需要创建两个队列和一个广播。
import org.springframework.amqp.core.*; @Configuration public class RabbitMQConfig { // 创建一个搜索的队列 @Bean public Queue getSeachQueue() { return new Queue("search_queue"); } // 创建一个静态页面的广播 @Bean public Queue getItemQueue(){ return new Queue("item_queue"); } // 创建一个交换机 @Bean public FanoutExchange getFanoutExchange(){ return new FanoutExchange("goods_exchange"); } // 把搜索队列绑定到交换机 @Bean public Binding getBinDing1(){ return BindingBuilder.bind(getSeachQueue()).to(getFanoutExchange()); } // 把详情页面绑定到交换机 @Bean public Binding getBinDing2(){ return BindingBuilder.bind(getItemQueue()).to(getFanoutExchange()); } }
- 发送消息给交换机
@Autowired private RabbitTemplate rabbitTemplate; @Override public int add(Goods goods) { int i = goodsMapper.insertSelective(goods); System.out.println("GoodsServiceImpl.add:"+goods.getId()); // 2.同步到索引库 // searchService.addGoods(goods); // 3.创建静态模板 // HttpUtils.sendRequset("http://localhost:8083/createHtml?gID="+goods.getId()); // 发送消息给交换机(1:交换机的名称,2:路由键,3:发送的对象[实现序列化接口]) rabbitTemplate.convertAndSend("goods_exchange","",goods); return i; }
- 消费者
添加依赖和配置属性文件和之前都一样 - 同步到索引库
@Component public class MyRabbitMQListener { @Autowired private ISearchService searchService; @RabbitListener(queues = "search_queue") public void goodsSysncToSolr(Goods goods){ searchService.addGoods(goods); } }
- 生成静态页面
@Component public class MyRabbitListener { @Reference private IGoodsService goodsService; @Autowired private Configuration configuration; @RabbitListener(queues = "item_queue") public void createItemPage(Goods goodsParam) throws Exception { //生成静态页面的代码 } catch (Exception e) { e.printStackTrace(); } } }
这篇关于2021-04-13-Java如何操作RabbitMQ的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-05-13TiDB + ES:转转业财系统亿级数据存储优化实践
- 2024-05-09“2024鸿蒙零基础快速实战-仿抖音App开发(ArkTS版)”实战课程已上线
- 2024-05-09聊聊如何通过arthas-tunnel-server来远程管理所有需要arthas监控的应用
- 2024-05-09log4j2这么配就对了
- 2024-05-09nginx修改Content-Type
- 2024-05-09Redis多数据源,看这篇就够了
- 2024-05-09Google Chrome驱动程序 124.0.6367.62(正式版本)去哪下载?
- 2024-05-09有没有大佬知道这种数据应该怎么抓取呀?
- 2024-05-09这种运行结果里的10.100000001,怎么能最快改成10.1?
- 2024-05-09企业src漏洞挖掘-有意思的命令执行