RocketMQ——总结(一)
2022/5/4 23:15:42
本文主要是介绍RocketMQ——总结(一),对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
官网:https://rocketmq.apache.org/ 源码地址:https://github.com/apache/rocketmq一、RocketMQ的优点
1、天然支持集群模式、负载均衡、水平扩展能力 2、上亿级别的消息堆积能力 3、采用零拷贝的原理、顺序写盘、随机读(借鉴kafka) 4、丰富的API使用,支持顺序消息,事务消息,rabbitmq不支持 5、代码优秀,底层通信通过Netty NIO框架 6、NameServer代替Zookeeper(2.x时还是zk) 7、强调集群无单点,可扩展,任意一点高可用,水平扩展 8、消息失败重试机制(rabbit没有)、消息可查询 9、开源社区活跃、成熟度高(经过双十一的考验) 10,支持类sql表达式过滤,可能借鉴activemq二、概念
Producer: 消息生产者,负责产生消息,一般由业务系统负责产生消息。 Consumer:消息消费者,负责消费消息,一般是后台系统负责异步消费。 Push Consumer: Consumer的一 种,需要向Consumer对象注册监听。 Pull Consumer: Consumer的一种,需要主动请求Broker拉取消息。 Producer Group:生产者集合, 一般用于发送一类消息,可用于事务消息 Consumer Group:消费者集合,一般用于接受一类消息进行消费 Broker : MQ消息服务(中转角色,用于消息存储与生产消费转发) 其他核心概念参见:https://rocketmq.apache.org/docs/core-concept/ 说明:推拉两种消费模式,类似rabbitmq,但推送实现机制不同,这里的推模式实际是基于长轮询三、集群架构
整体集群架构常见部署模式: 1.单点模式 2.主从模式 3.双主模式 4.双主双从模式、多主多从模式
四、生产者
使用说明: 1.创建生产者对象DefaultMQProducer 2.设置NamesrvAddr 3.启动生产者服务 4.创建消息并发送 5.关闭生产者 其他说明: topic是某一类主题, tag是标签用来过滤,key一般是用来作为用户自定义的Key,唯一标识 topic对消息队列是一对多的关系,默认队列有4个,没有主题的可自动创建 rocketmq的异常区分较细,异常区分细说明rocketmq的源码较为细致 示例代码 发送消息的示例代码:org.apache.rocketmq.example.quickstart.Producerpublic class Producer { public static void main(String[] args) throws MQClientException, InterruptedException { /* * Instantiate with a producer group name. */ DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); /* * Specify name server addresses. * <p/> * * Alternatively, you may specify name server addresses via exporting environmental variable: NAMESRV_ADDR * <pre> * {@code * producer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876"); * } * </pre> */ /* * Launch the instance. */ producer.start(); for (int i = 0; i < 1000; i++) { try { /* * Create a message instance, specifying topic, tag and message body. */ Message msg = new Message("TopicTest" /* Topic */, "TagA" /* Tag */, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */ ); /* * Call send message to deliver message to one of brokers. */ SendResult sendResult = producer.send(msg); /* * There are different ways to send message, if you don't care about the send result,you can use this way * {@code * producer.sendOneway(msg); * } */ /* * if you want to get the send result in a synchronize way, you can use this send method * {@code * SendResult sendResult = producer.send(msg); * System.out.printf("%s%n", sendResult); * } */ /* * if you want to get the send result in a asynchronize way, you can use this send method * {@code * * producer.send(msg, new SendCallback() { * @Override * public void onSuccess(SendResult sendResult) { * // do something * } * * @Override * public void onException(Throwable e) { * // do something * } *}); * *} */ System.out.printf("%s%n", sendResult); } catch (Exception e) { e.printStackTrace(); Thread.sleep(1000); } } /* * Shut down once the producer instance is not longer in use. */ producer.shutdown(); } }
五、消费者
使用说明: 1.创建消费者对象DefaultMQPushConsumer 2.设置NamesrvAddr及其消费位置ConsumeFromWhere 3.进行订阅主题subscribe 4.注册监听并消费registerMessageListener 其他说明: rocketmq的订阅模式不是服务端真的推,而是客户端长轮询的机制 消费成功返回:ConsumeConcurrentlyStatus.CONSUME_SUCCESS 消费失败了可以自动重试,只要返回ConsumeConcurrentlyStatus.RECONSUME_LATER 生产环境下重试多次失败后,需要自己做补偿和记录日志等 重试策略:messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9r 10m 20m 30m 1h 2h 示例代码 消费者的示例代码:org.apache.rocketmq.example.quickstart.Consumerpublic class Consumer { public static void main(String[] args) throws InterruptedException, MQClientException { /* * Instantiate with specified consumer group name. */ DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4"); /* * Specify name server addresses. * <p/> * * Alternatively, you may specify name server addresses via exporting environmental variable: NAMESRV_ADDR * <pre> * {@code * consumer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876"); * } * </pre> */ /* * Specify where to start in case the specific consumer group is a brand-new one. */ consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); /* * Subscribe one more topic to consume. */ consumer.subscribe("TopicTest", "*"); /* * Register callback to execute on arrival of messages fetched from brokers. */ consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); /* * Launch the consumer instance. */ consumer.start(); System.out.printf("Consumer Started.%n"); } }
这篇关于RocketMQ——总结(一)的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-04-26高性能表格工具VTable总体构成-icode9专业技术文章分享
- 2024-04-16软路由代理问题, tg 无法代理问题-icode9专业技术文章分享
- 2024-04-16程序猿用什么锅-icode9专业技术文章分享
- 2024-04-16自建 NAS 的方案-icode9专业技术文章分享
- 2024-04-14ansible 在远程主机上执行脚本,并传入参数-icode9专业技术文章分享
- 2024-04-14ansible 在远程主机上执行脚本,并传入参数, 加上remote_src: yes 配置-icode9专业技术文章分享
- 2024-04-14ansible 检测远程主机的8080端口,如果关闭,则echo 进程已关闭-icode9专业技术文章分享
- 2024-04-14result 成功怎么写-icode9专业技术文章分享
- 2024-04-14stopped 状态设置为变量,由外部传递进来-icode9专业技术文章分享
- 2024-04-14为什么ansible执行远程脚本需要放到后台-icode9专业技术文章分享