RabbitMQ 简介与使用

2021/7/27 6:05:53

本文主要是介绍RabbitMQ 简介与使用,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

1. 什么是消息队列

消息(Message)是指在应用间传送的数据。消息可以非常简单,比如只包含文本字符串,也可以更复杂,可能包含嵌入对象。

消息队列(Message Queue)是一种应用间的通信方式,消息发送后可以立即返回,由消息系统来确保消息的可靠传递。消息发布者只管把消息发布到 MQ 中而不用管谁来取,消息使用者只管从 MQ 中取消息而不管是谁发布的。这样发布者和使用者都不用知道对方的存在。

2. 为什么要使用消息队列

消息队列是一种应用间的异步协作机制,常用于以下场景:

2.1 异步

可以将一些非核心流程,如日志,短信,邮件等,通过MQ的方式异步去处理。这样做的好处是缩短主流程的响应时间,提升用户体验。

2.2 解耦

按照不同的功能, 把一个服务, 拆分成多个系统。比如下订单的过程:扣减库存、生成相应单据、发红包、发短信通知等。每个不同的过程互不影响,通过 MQ 来实现任务的推送和消费。

2.3 削峰

当一个活动瞬时流量过大,对于不需要及时反馈,并且服务处理比较耗时可能会造成服务响应的缓慢甚至奔溃时,可以将对应的需求先放入队列中,服务顺序去处理对应的请求,后续通知用户结果即可。

异步,解耦,消峰,MQ的三大主要应用场景。

3. RabbitMQ 的特点

RabbitMQ 是一个由 Erlang 语言开发的 AMQP 的开源实现。

AMQP :Advanced Message Queue,高级消息队列协议。它是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受产品、开发语言等条件的限制。

RabbitMQ 最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。具体特点包括:

  1. 可靠性(Reliability)
    RabbitMQ 使用一些机制来保证可靠性,如持久化、传输确认、发布确认。

  2. 灵活的路由(Flexible Routing)
    在消息进入队列之前,通过 Exchange 来路由消息的。对于典型的路由功能,RabbitMQ 已经提供了一些内置的 Exchange 来实现。针对更复杂的路由功能,可以将多个 Exchange 绑定在一起,也通过插件机制实现自己的 Exchange 。

  3. 消息集群(Clustering)
    多个 RabbitMQ 服务器可以组成一个集群,形成一个逻辑 Broker 。

  4. 高可用(Highly Available Queues)
    队列可以在集群中的机器上进行镜像,使得在部分节点出问题的情况下队列仍然可用。

  5. 多种协议(Multi-protocol)
    RabbitMQ 支持多种消息队列协议,比如 STOMP、MQTT 等等。

  6. 多语言客户端(Many Clients)
    RabbitMQ 几乎支持所有常用语言,比如 Java、.NET、Ruby 等等。

  7. 管理界面(Management UI)
    RabbitMQ 提供了一个易用的用户界面,使得用户可以监控和管理消息 Broker 的许多方面。

  8. 跟踪机制(Tracing)
    如果消息异常,RabbitMQ 提供了消息跟踪机制,使用者可以找出发生了什么。

  9. 插件机制(Plugin System)
    RabbitMQ 提供了许多插件,来从多方面进行扩展,也可以编写自己的插件。

4. RabbitMQ 中的模型概念

4.1 消息模型

所有 MQ 产品从模型抽象上来说都是一样的过程:
消费者(consumer)订阅某个队列。生产者(producer)创建消息,然后发布到队列(queue)中,最后将消息发送到监听的消费者。
消息模型

4.2 基本概念

RabbitMQ 是 AMQP 协议的一个开源实现,所以其内部实际上也是 AMQP 中的基本概念:

  1. Message
    消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括 routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。

  2. Publisher
    消息的生产者,也是一个向交换器发布消息的客户端应用程序。

  3. Consumer
    消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。

  4. Connection
    网络连接,比如一个 TCP 连接。

  5. Channel
    信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的 TCP 连接内地虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。

  6. Exchange
    交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。

  7. Queue
    消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。

  8. Binding
    绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。

  9. Virtual Host
    虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。
    表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 / 。

  10. Broker
    表示消息队列服务器实体。

4.3 Exchange 类型

Exchange 分发消息时根据类型的不同分发策略有区别,目前共四种类型:direct、fanout、topic、headers 。headers 匹配 AMQP 消息的 header 而不是路由键,此外 headers 交换器和 direct 交换器完全一致,但性能差很多。

  • 直连交换机:Direct exchange
  • 扇形交换机:Fanout exchange
  • 主题交换机:Topic exchange
  • 首部交换机:Headers exchange

创建交换机

Exchange.DeclareOk exchangeDeclare(String exchange,
    BuiltinExchangeType type,
    boolean durable,
    boolean autoDelete,
    boolean internal,
    Map<String, Object> arguments) throws IOException;

交换机具有以下属性:

  • exchange – 名称
  • type – 类型
  • durable – 如果我们声明持久交换,则为真(交换将在服务器重启后继续存在)
  • autoDelete – 如果该队列没有任何订阅的消费者的话,该队列会被自动删除。这种队列适用于临时队列。
  • internal – 如果交换是内部的,则为 true,即不能由客户端直接发布。
  • arguments- 扩展参数

4.3.1 Direct 直连交换机

直连交换机是一种带路由功能的交换机,一个队列会和一个交换机绑定,除此之外再绑定一个 routing_key,当消息被发送的时候,需要指定一个 binding_key,这个消息被送达交换机的时候,就会被这个交换机送到指定的队列里面去。同样的一个 binding_key 也是支持应用到多个队列中的。

这样当一个交换机绑定多个队列,就会被送到对应的队列去处理。
直连交换机

4.3.2 Fanout 扇形交换机

扇形交换机是最基本的交换机类型,它所能做的事情非常简单 ——— 广播消息。扇形交换机会把能接收到的消息全部发送给绑定在自己身上的队列。因为广播不需要 “思考”,所以扇形交换机处理消息的速度也是所有的交换机类型里面最快的。
扇形交换机

4.3.3 Topic 主题交换机

主题交换机的 routing_key 需要有一定的规则,交换机和队列的 binding_key 需要采用 .#...... 的格式,每个部分用. 分开,其中:

* 表示一个单词
#表示任意数量(零个或多个)单词。

假设有一条消息的 routing_key 为 fast.rabbit.white, 那么带有这样 binding_key 的几个队列都会接收这条消息:

fast..
..white
fast.#
……
主题交换机
当一个队列的绑定键为#的时候,这个队列将会无视消息的路由键,接收所有的消息。

4.3.4 Headers 首部交换机

首部交换机是忽略 routing_key 的一种路由方式。路由器和交换机路由的规则是通过 Headers 信息来交换的,这个有点像 HTTPHeaders。将一个交换机声明成首部交换机,绑定一个队列的时候,定义一个 Hash 的数据结构,消息发送的时候,会携带一组 hash 数据结构的信息,当 Hash 的内容匹配上的时候,消息就会被写入队列。

绑定交换机和队列的时候,Hash 结构中要求携带一个键 “x-match”,这个键的 Value 可以是 any 或者 all,这代表消息携带的 Hash 是需要全部匹配 (all),还是仅匹配一个键 (any) 就可以了。相比直连交换机,首部交换机的优势是匹配的规则不被限定为字符串 (string)。

匹配规则 x-match 有下列两种类型:

  • x-match = all :表示所有的键值对都匹配才能接受到消息
  • x-match = any :表示只要有键值对匹配就能接受到消息

4.3.4 headers 直连交换机

Headers Exchange 不同于上面三种 Exchange,它是根据 Message 的一些头部信息来分发过滤 Message,忽略routing key 的属性,如果 Header 信息和 message 消息的头信息相匹配,那么这条消息就匹配上了。

5. RabbitMQ 服务搭建

6. RabbitMQ 使用介绍

6.1 流程介绍

pom.xml

<dependency>
  <groupId>com.rabbitmq</groupId>
  <artifactId>amqp-client</artifactId>
</dependency>

Producer

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * rabbitmq 生产者
 */
public class Producer {
    public static void main(String[] args) throws Exception {

        //1. 创建一个 ConnectionFactory 并进行设置
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setVirtualHost("/");
        factory.setUsername("user");
        factory.setPassword("user");

        //也可以使用url来配置
        //factory.setUri("amqp://guest:guest@localhost:5672");

        //2. 通过连接工厂来创建连接
        Connection connection = factory.newConnection();

        //3. 通过 Connection 来创建 Channel
        Channel channel = connection.createChannel();

        //4. 发送消息
        String exchangeName = "exchangeName";
        String routingKey1 = "cat";
        String routingKey2 = "dong";
        for (int i = 0; i < 100; i++) {

            String msg = "this is msg";

            if (i % 2 == 0) {
                msg = "cat:" + msg;
                channel.basicPublish(exchangeName, routingKey1, null, msg.getBytes());
            } else {
                msg = "dong:" + msg;
                channel.basicPublish(exchangeName, routingKey2, null, msg.getBytes());
            }

            System.out.println("发送了消息:" + msg);
            Thread.sleep(1000);
        }

        //5. 关闭连接
        channel.close();
        connection.close();
    }
}

Consumer

import com.rabbitmq.client.*;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

/**
 * rabbitmq 消费者
 */
public class Consumer {
    public static void main(String[] args) throws Exception {

        //1. 创建一个 ConnectionFactory 并进行设置
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setVirtualHost("/");
        factory.setUsername("user");
        factory.setPassword("user");
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(3000);

        //2. 通过连接工厂来创建连接
        Connection connection = factory.newConnection();

        //3. 通过 Connection 来创建 Channel
        Channel channel = connection.createChannel();

        //4. 创建交换机
        String exchangeName = "exchangeName";
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, false, true, null);

        // 5.创建队列
        String queueName1 = "queueCat";
        String queueName2 = "queueDong";
        channel.queueDeclare(queueName1, false, false, true, null);
        channel.queueDeclare(queueName2, false, false, true, null);

        //5. 通过不同的key 绑定队列
        String routingKey1 = "cat";
        String routingKey2 = "dong";
        channel.queueBind(queueName1, exchangeName, routingKey1);
        channel.queueBind(queueName2, exchangeName, routingKey2);

        // 消费者处理
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String format = String.format("[%s]-收到消息:【%s】", envelope.getRoutingKey(), new String(body, StandardCharsets.UTF_8));
                System.out.println(format);

                //消息消费确认
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };

        // 6.绑定消费者进行消费
        while (true) {
            channel.basicConsume(queueName1, false, defaultConsumer);
            channel.basicConsume(queueName2, false, defaultConsumer);
            Thread.sleep(1000);
        }
    }
}

先启动 消费者 创建交换机、队列,进行指定队列的监听,然后启动生产者进行生产消息。

6.2 RabbitMQ 在 SpringBoot 中的使用

pom.xml

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>${rabbitmq.version}</version>
</dependency>

application.yml

# rabbitmq
spring:
  rabbitmq:
    host: localhost
    port: 5672
    virtual-host: /
    username: guest
    password: guest

6.2.1 声明 exchange、queue、Binding

SpringAMQP 项目对 RabbitMQ 做了很好的封装,可以很方便的手动声明队列,交换器,绑定。

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

/**
 * 声明一个直连交换机
 * 声明两个队列
 * 将两个队列通过 routingKey 绑定到交换机
 */
@Component
public class DirectDeclare {

    @Bean(RabbitConfig.Direct.EXCHANGE)
    public DirectExchange directExchange() {
        return ExchangeBuilder.directExchange(RabbitConfig.Direct.EXCHANGE).durable(true).build();
    }

    @Bean(RabbitConfig.Direct.QUEUE_ONE)
    public Queue directQueueOne() {
        return QueueBuilder.durable(RabbitConfig.Direct.QUEUE_ONE).build();
    }

    @Bean(RabbitConfig.Direct.QUEUE_TWO)
    public Queue directQueueTwo() {
        return QueueBuilder.durable(RabbitConfig.Direct.QUEUE_TWO).build();
    }

    @Bean
    public Binding bindingQueueOne(@Qualifier(RabbitConfig.Direct.EXCHANGE) DirectExchange exchange,
                                   @Qualifier(RabbitConfig.Direct.QUEUE_ONE) Queue queue) {
        return BindingBuilder.bind(queue).to(exchange).with(RabbitConfig.Direct.QUEUE_ONE_KEY);
    }

    @Bean
    public Binding bindingQueueTwo(@Qualifier(RabbitConfig.Direct.EXCHANGE) DirectExchange exchange,
                                   @Qualifier(RabbitConfig.Direct.QUEUE_TWO) Queue queue) {
        return BindingBuilder.bind(queue).to(exchange).with(RabbitConfig.Direct.QUEUE_TWO_KEY);
    }
}

6.2.2 生产者发送消息

RabbitTemplate 是 Spring 用于简化同步 RabbitMQ 访问(发送和接收消息)的 Helper 类。

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class RabbitService {

    @Autowired
    RabbitTemplate rabbitTemplate;

    public void send(RabbitSendVo sendVo) {

        String exchange = sendVo.getExchange();
        String routingKey = sendVo.getRoutingKey();
        String message = sendVo.getMessage();

        rabbitTemplate.convertAndSend(exchange, routingKey, message);
    }
}

6.2.3 消费者监听消息

消息的监听使用 @RabbitListener 来完成,使用 queues 指定需要监听的队列。

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

import java.io.IOException;

@Slf4j
@Component
public class DirectListener {

    @RabbitListener(queues = {RabbitConfig.Direct.QUEUE_ONE})
    public void listenerOne(Message message) throws IOException {
        log.info("【directQueueOne】收到消息:{}", new String(message.getBody()));
    }
}

同时,@RabbitListener 也可以作用于 class 上,然后使用 @RabbitHandler 配合使用。
@RabbitListener 标注在类上面表示当有收到消息的时候,就交给 @RabbitHandler 的方法处理,具体使用哪个方法处理,根据 MessageConverter 转换后的参数类型

@Slf4j
@RabbitListener(queues = {RabbitConfig.Direct.QUEUE_ONE})
public class DirectListener {

    @RabbitHandler 
    public void listenerOne(Message message) throws IOException {
        log.info("【directQueueOne】收到消息:{}", new String(message.getBody()));
    }
}

6.2.4 @RabbitListener、@Exchange、@Queue、@QueueBinding

使用 @RabbitListener 可以搭配 @Exchange、@Queue、@QueueBinding 等注解简化使用的过程,exchange、queue 的一些属性也可以在这里配置。

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * 通过注解实现一个主题交换机
 */
@Slf4j
@Component
public class TopicListener {

    /**
     * 主题交换机
     * 处理 Cat 相关消息
     * 通过注解方式创建,命名的都是持久的
     */
    @RabbitListener(bindings = @QueueBinding(
            exchange = @Exchange(name = RabbitConfig.Topic.EXCHANGE, type = ExchangeTypes.TOPIC),
            key = RabbitConfig.Topic.QUEUE_CAT_KEY,
            value = @Queue(RabbitConfig.Topic.QUEUE_CAT)
    ))
    public void topicCat(Channel channel, Message message) throws IOException {
        log.info("【topic-cat】收到消息:{}", new String(message.getBody());
    }
}

6.2.5 MessageConverter

  • 涉及网络传输的应用序列化不可避免,发送端以某种规则将消息转成 byte 数组进行发送,接收端则以约定的规则进行 byte [] 数组的解析

  • RabbitMQ 的序列化是指 Message 的 body 属性,即我们真正需要传输的内容,RabbitMQ 抽象出一个MessageConvert 接口处理消息的序列化,其实现有 SimpleMessageConverter(默认)、Jackson2JsonMessageConverter 等

  • 当调用了 convertAndSend 方法时会使用 MessageConvert 进行消息的序列化

  • SimpleMessageConverter 对于要发送的消息体 body 为 byte [] 时不进行处理,如果是 String 则转成字节数组,如果是 Java 对象,则使用 jdk 序列化将消息转成字节数组,转出来的结果较大,含 class 类名,类相应方法等信息。因此性能较差

  • 当使用 RabbitMQ 作为中间件时,数据量比较大,此时就要考虑使用类似 Jackson2JsonMessageConverter 等序列化形式以此提高性能

public interface MessageConverter {

	/**
	 * Convert a Java object to a Message.
	 * @param object the object to convert
	 * @param messageProperties The message properties.
	 * @return the Message
	 * @throws MessageConversionException in case of conversion failure
	 */
	Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException;

	/**
	 * Convert a Java object to a Message.
	 * The default implementation calls {@link #toMessage(Object, MessageProperties)}.
	 * @param object the object to convert
	 * @param messageProperties The message properties.
	 * @param genericType the type to use to populate type headers.
	 * @return the Message
	 * @throws MessageConversionException in case of conversion failure
	 * @since 2.1
	 */
	default Message toMessage(Object object, MessageProperties messageProperties, @Nullable Type genericType)
			throws MessageConversionException {

			return toMessage(object, messageProperties);
	}

	/**
	 * Convert from a Message to a Java object.
	 * @param message the message to convert
	 * @return the converted Java object
	 * @throws MessageConversionException in case of conversion failure
	 */
	Object fromMessage(Message message) throws MessageConversionException;

}

6.2.5.1 SimpleMessageConverter

SpringAMQP 默认使用 SimpleMessageConverter 来实现 消息的转换,是一个 可以处理字符串、可序列化实例或字节数组的MessageConverter实现。

SimpleMessageConverter.createMessage:

// SimpleMessageConverter.createMessage
protected Message createMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
	byte[] bytes = null;
	if (object instanceof byte[]) {
		bytes = (byte[]) object;
		messageProperties.setContentType(MessageProperties.CONTENT_TYPE_BYTES);
	}
	else if (object instanceof String) {
		try {
			bytes = ((String) object).getBytes(this.defaultCharset);
		}
		catch (UnsupportedEncodingException e) {
			throw new MessageConversionException(
					"failed to convert to Message content", e);
		}
		messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
		messageProperties.setContentEncoding(this.defaultCharset);
	}
	else if (object instanceof Serializable) {
		try {
			bytes = SerializationUtils.serialize(object);
		}
		catch (IllegalArgumentException e) {
			throw new MessageConversionException(
					"failed to convert to serialized Message content", e);
		}
		messageProperties.setContentType(MessageProperties.CONTENT_TYPE_SERIALIZED_OBJECT);
	}
	if (bytes != null) {
		messageProperties.setContentLength(bytes.length);
		return new Message(bytes, messageProperties);
	}
	throw new IllegalArgumentException(getClass().getSimpleName()
			+ " only supports String, byte[] and Serializable payloads, received: " + object.getClass().getName());
}

消息的 content_type 属性表示消息 body 数据以什么数据格式存储,接收消息除了使用 Message 对象接收消息(包含消息属性等信息)之外,还可直接使用对应类型接收消息 body 内容,但若方法参数类型不正确会抛异常:

  • application/octet-stream:二进制字节数组存储,使用 byte []
  • application/x-java-serialized-object:java 对象序列化格式存储,使用 Object、相应类型(反序列化时类型应该同包同名,否者会抛出找不到类异常)
  • text/plain:文本数据类型存储,使用 String
  • application/json:JSON 格式,使用 Object、相应类型

普通消息实例:

contentType=text/plain

(Body:'测试消息' MessageProperties [headers={spring_listener_return_correlation=d4e4e756-e85e-4ec3-accb-e6b08a7c2f80}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=direct, receivedRoutingKey=one, deliveryTag=2, consumerTag=amq.ctag-7eT4L6V5TKzC4GMDE_hOcg, consumerQueue=directQueueOne])

Java Bean 消息实例:

contentType=application/x-java-serialized-object

(Body:'[B@3823efd5(byte[151])' MessageProperties [headers={spring_listener_return_correlation=0dc30aa6-b8cd-4f2f-9129-0fc8b729e8ea}, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=direct, receivedRoutingKey=one, deliveryTag=1, consumerTag=amq.ctag-U0u4SdugbrO9-SbQ3bD6gg, consumerQueue=directQueueOne])

6.2.5.2 修改默认 MessageConverter

消息处理方法参数是由 MessageConverter 转化,若使用自定义 MessageConverter 则需要在RabbitListenerContainerFactory实例中去设置(默认 Spring 使用的实现是SimpleRabbitListenerContainerFactory`)

使用 SpringAMQP 提供的 Jackson2JsonMessageConverter 实现 消息的 Json 转换。

publisher 和 consumers 使用相同的 MessageConverter。

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {

    /**
     * 消息序列化方式
     */
    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}

直接注入是最方便的方法了,版本 spring-boot-starter-amqp 2.4.4.

当然也可以使用另一种方式,在生产者和消费者两边都设置同样的 MessageConverter, 同时可以配置一些其他业务相关的属性。

@Configuration
public class RabbitConfig {

    /**
     * consumers 监听容器
     * @param listenerContainerFactoryConfigurer listener configurer
     * @param connectionFactory 连接工厂
     * @return RabbitListenerContainerFactory
     */
    @Bean
    public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(SimpleRabbitListenerContainerFactoryConfigurer listenerContainerFactoryConfigurer, CachingConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory listenerContainerFactory = new SimpleRabbitListenerContainerFactory();

        //使用配置
        listenerContainerFactoryConfigurer.configure(listenerContainerFactory, connectionFactory);

        //配置消息反序列方式
        listenerContainerFactory.setMessageConverter(new Jackson2JsonMessageConverter());
        return listenerContainerFactory;
    }

    /**
     * publisher template
     * @return RabbitTemplate
     */
    @Bean
    public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory, RabbitTemplateConfigurer templateConfigurer) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate();

        //使用配置
        templateConfigurer.configure(rabbitTemplate, connectionFactory);

        //消息序列化方式
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());

        //避免阻塞,将 publish 和 consumer 的 connection 分开
        rabbitTemplate.setUsePublisherConnection(true);

        return rabbitTemplate;
    }
}

6.2.5.3 @Payload 与 @Headers

使用 @Payload 和 @Headers 注解可以获取消息中的 body 与 headers 信息

@RabbitListener(queues = "debug")
public void processMessage1(@Payload String body, @Headers Map<String,Object> headers) {
    System.out.println("body:"+body);
    System.out.println("Headers:"+headers);
}

也可以获取单个 Header 属性

@RabbitListener(queues = "debug")
public void processMessage1(@Payload String body, @Header String token) {
    System.out.println("body:"+body);
    System.out.println("token:"+token);
}

6.2.6 消息确认机制

RabbitMQ 的消息确认有两种。

一种是消息发送确认。这种是用来确认生产者将消息发送给交换器,交换器传递给队列的过程中,消息是否成功投递。发送确认分为两步,一是确认是否到达交换器,二是确认是否到达队列。

第二种是消费接收确认。这种是确认消费者是否成功消费了队列中的消息。

6.2.6.1 消息发送确认

6.2.6.1.1 ConfirmCallback

通过实现 ConfirmCallBack 接口,消息发送到交换器 Exchange 后触发回调。

使用该功能需要开启确认,spring-boot 中配置如下:

spring.rabbitmq.publisher-confirm-type= correlated

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

/**
 * mq 消息投递到exchange 确认回调
 * 处理失败消息
 */
@Slf4j
@Component("rabbitConfirmCallback")
public class ConfirmCallback implements RabbitTemplate.ConfirmCallback {
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (!ack) {
            log.error("消息投递到exchange失败,cause={}", cause);
        }
    }
}
  • correlationData:对象内部只有一个 id 属性,用来表示当前消息的唯一性。

  • ack:消息投递到 broker 的状态,true 表示成功。

  • cause:表示投递失败的原因。

消息投递到exchange失败,cause=channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'direct1' in vhost '/', class-id=60, method-id=40)

不管有没有成功发送到 exchange ,都会走 ConfirmCallback 的回调,参数 boolean ack 表示是否成功,case 是失败的原因,例如:NOT_FOUND - no exchange 'direct1' in vhost '/'

6.2.6.1.2 ReturnsCallback

通过实现 ReturnCallback 接口,如果消息从交换器发送到对应队列失败时触发(比如根据发送消息时指定的 routingKey 找不到队列时会触发)

使用该功能需要开启确认,spring-boot 中配置如下:

spring.rabbitmq.template.mandatory: true

/**
 * 消息没有匹配到合适的队列时,退回回调处理
 */
@Slf4j
@Component("rabbitReturnsCallback")
public class ReturnsCallback implements RabbitTemplate.ReturnsCallback {
    @Override
    public void returnedMessage(ReturnedMessage returned) {
        String exchange = returned.getExchange();
        String routingKey = returned.getRoutingKey();
        Message message = returned.getMessage();
        log.error("消息丢失[exchange={},routingKey={},message={}]", exchange, routingKey, message);
    }
}
[exchange=direct,routingKey=one1,message=(Body:'[B@669a964c(byte[151])' MessageProperties [headers={}, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0])]
6.2.6.1.3 配置使用 ConfirmCallback、ReturnsCallback
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class RabbitConfig {
    
    /**
     * publisher template
     * @param connectionFactory  连接工厂
     * @param templateConfigurer configurer
     * @param confirmCallback    confimCallback
     * @param returnsCallback    returnsCallback
     * @return RabbitTemplate
     */
    @Bean
    public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory, RabbitTemplateConfigurer templateConfigurer,
                                         RabbitTemplate.ConfirmCallback confirmCallback, RabbitTemplate.ReturnsCallback returnsCallback) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate();

        //使用配置
        templateConfigurer.configure(rabbitTemplate, connectionFactory);

        //消息序列化方式
        //rabbitTemplate.setMessageConverter(jsonMessageConverter());

        //避免阻塞,将 publish 和 consumer 的 connection 分开
        rabbitTemplate.setUsePublisherConnection(true);

        //confirm 回调,开启publisher-confirm-type:correlated
        //connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
        rabbitTemplate.setConfirmCallback(confirmCallback);

        //return 回调,开启template.mandatory=true
        //消息根据 routingKey 无法找到合适的 queue 时,将消息退回,而不是丢失。
        //rabbitTemplate.setMandatory(true);
        rabbitTemplate.setReturnsCallback(returnsCallback);

        return rabbitTemplate;
    }

    /**
     * 消息序列化方式
     */
    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}

6.2.6.2 消息接收确认

6.2.6.2.1 确认模式
  • AcknowledgeMode.NONE:不确认
  • AcknowledgeMode.AUTO:自动确认
  • AcknowledgeMode.MANUAL:手动确认

spring-boot 中配置方法:

spring.rabbitmq.listener.simple.acknowledge-mode = manual

6.2.6.2.2 手动确认
  1. 确认成功
void basicAck(long deliveryTag, boolean multiple) throws IOException;
  • deliveryTag: 该消息的 index

  • multiple:是否批量. true:将一次性 ack 所有小于 deliveryTag 的消息。

假设我先发送三条消息 deliveryTag 分别是 5、6、7,可它们都没有被确认,当我发第四条消息此时 deliveryTag 为 8,multiple 设置为 true,会将 5、6、7、8 的消息全部进行确认。

消费者成功处理后,调用 channel.basicAck (message.getMessageProperties ().getDeliveryTag (), false); 方法对消息进行确认。

@RabbitListener(queues = {RabbitConfig.Direct.QUEUE_ONE})
public void listenerOne(Channel channel, Message message, @Payload Mail mail) throws IOException {
    log.info("【directQueueOne】收到消息:{}", mail);
    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
  1. 确认失败

确认失败,可以使用 requeue参数 选择是否将消息重新放回队列。

//第一种方式
void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
  • deliveryTag: 该消息的 index,递增的。

  • multiple:是否批量. true:将一次性拒绝所有小于 deliveryTag 的消息。

  • requeue:被拒绝的是否重新入队列。

//第二种方式
void basicReject(long deliveryTag, boolean requeue) throws IOException;
  • deliveryTag: 该消息的 index。

  • requeue:被拒绝的是否重新入队列。

如果没有其他接收者监控这个 queue 的话,要注意一直无限循环发送的危险。
当消息回滚到消息队列时,这条消息不会回到队列尾部,而是仍是在队列头部,这时消费者会立马又接收到这条消息进行处理,接着抛出异常,进行 回滚,如此反复进行。
建议为队列设置私信队列,当出现异常时,由死信队列监听后再进行处理。

@RabbitListener(queues = {RabbitConfig.Direct.QUEUE_TWO})
public void listenerTwo(Channel channel, Message message, @Payload Mail mail) throws Exception {
    try {
        log.info("【directQueueTwo】收到消息:{}", mail);
        int i = 1 / 0;
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }catch (Exception e){
        channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
    }
}

basicNack、basicReject区别:

channel.basicNackchannel.basicReject 的区别在于 basicNack 可以批量拒绝多条消息(一次性拒绝所有小于 deliveryTag 的消息),而 basicReject 一次只能拒绝一条消息。

6.2.7 死信队列

为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息消费发生异常时,将消息投入死信队列中。

6.2.7.1 死信队列是什么

死信,在官网中对应的单词为 “Dead Letter”。
死信RabbitMQ 中的一种消息机制,当你在消费消息时,如果队列里的消息出现以下情况:

  • 消息被否定确认,使用 channel.basicNackchannel.basicReject ,并且此时 requeue 属性被设置为 false
  • 消息在队列的存活时间超过设置的 TTL 时间。
  • 消息队列的消息数量已经超过最大队列长度。

那么该消息将成为 “死信”。

TTL - 通过 channel.queueDeclare 方法中的 x-expires 参数可以控制队列被自动删除前处于未使用状态的时间。
设置了 TTL 后,在设置的时间内,没有消费者消费这条消息,那么判定这条消息为过期。

“死信” 消息会被 RabbitMQ 进行特殊处理,如果配置了死信队列信息,那么该消息将会被丢进死信队列中,如果没有配置,则该消息将会被丢弃。

6.2.7.2 配置使用死信队列

大概可以分为以下步骤:

  • 配置业务队列,绑定到业务交换机上
  • 为业务队列配置死信交换机和路由 key
  • 为死信交换机配置死信队列

注意,并不是直接声明一个公共的死信队列,然后所以死信消息就自己跑到死信队列里去了。而是为每个需要使用死信的业务队列配置一个死信交换机,这里同一个项目的死信交换机可以共用一个,然后为每个业务队列分配一个单独的路由 key。

有了死信交换机和路由 key 后,接下来,就像配置业务队列一样,配置死信队列,然后绑定在死信交换机上。也就是说,死信队列并不是什么特殊的队列,只不过是绑定在死信交换机上的队列。死信交换机也不是什么特殊的交换机,只不过是用来接受死信的交换机,所以可以为任何类型【Direct、Fanout、Topic】。一般来说,会为每个业务队列分配一个独有的路由 key,并对应的配置一个死信队列进行监听,也就是说,一般会为每个重要的业务队列配置一个死信队列。

声明 exchange、queue,将 exchange、queue 绑定,然后在其他队列创建时,指定扩展参数设定其死信消息都发送到该队列。

import com.yohaps.mq.rabbittemplate.config.RabbitConfig;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

/**
 * 声明死信队列
 * 就是普通的交换机和队列,接收其他队列的死信消息
 */
@Component
public class DeadLetterDeclare {

    @Bean(RabbitConfig.DeadLetter.EXCHANGE)
    public DirectExchange deadLetterExchange() {
        return ExchangeBuilder.directExchange(RabbitConfig.DeadLetter.EXCHANGE).durable(true).build();
    }

    @Bean(RabbitConfig.DeadLetter.QUEUE_ONE)
    public Queue deadLetterQueueOne() {
        return QueueBuilder.durable(RabbitConfig.DeadLetter.QUEUE_ONE).build();
    }

    @Bean(RabbitConfig.DeadLetter.QUEUE_TWO)
    public Queue deadLetterQueueTwo() {
        return QueueBuilder.durable(RabbitConfig.DeadLetter.QUEUE_TWO).build();
    }

    @Bean
    public Binding bindingDeadLetterQueueOne(@Qualifier(RabbitConfig.DeadLetter.EXCHANGE) DirectExchange exchange,
                                   @Qualifier(RabbitConfig.DeadLetter.QUEUE_ONE) Queue queue) {
        return BindingBuilder.bind(queue).to(exchange).with(RabbitConfig.DeadLetter.QUEUE_ONE_KEY);
    }

    @Bean
    public Binding bindingDeadLetterQueueTwo(@Qualifier(RabbitConfig.DeadLetter.EXCHANGE) DirectExchange exchange,
                                   @Qualifier(RabbitConfig.DeadLetter.QUEUE_TWO) Queue queue) {
        return BindingBuilder.bind(queue).to(exchange).with(RabbitConfig.DeadLetter.QUEUE_TWO_KEY);
    }
}

关键:
在需要配置死信队列的队列创建时,指定死信交换机和key.

队列属性:

  • x-dead-letter-exchange:指定死信交换机
  • x-dead-letter-routing-key:指定死信 routingKey
  • x-message-ttl:消息过期时间 ms
  • alternate-exchange:备份交换机
import com.yohaps.mq.rabbittemplate.config.RabbitConfig;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

/**
 * 声明一个直连交换机
 * 声明两个队列
 * 将两个队列通过 routingKey 绑定到交换机
 */
@Component
public class DirectDeclare {

    @Bean(RabbitConfig.Direct.EXCHANGE)
    public DirectExchange directExchange() {
        return ExchangeBuilder.directExchange(RabbitConfig.Direct.EXCHANGE).durable(true).build();
    }

    @Bean(RabbitConfig.Direct.QUEUE_ONE)
    public Queue directQueueOne() {
        //声明当前队列绑定的死信交换机与指定的key
        return QueueBuilder.durable(RabbitConfig.Direct.QUEUE_ONE)
                .withArgument("x-dead-letter-exchange", RabbitConfig.DeadLetter.EXCHANGE)
                .withArgument("x-dead-letter-routing-key", RabbitConfig.DeadLetter.QUEUE_ONE_KEY)
                .build();
    }

    @Bean(RabbitConfig.Direct.QUEUE_TWO)
    public Queue directQueueTwo() {
        //声明当前队列绑定的死信交换机与指定的key
        return QueueBuilder.durable(RabbitConfig.Direct.QUEUE_TWO)
                .withArgument("x-dead-letter-exchange", RabbitConfig.DeadLetter.EXCHANGE)
                .withArgument("x-dead-letter-routing-key", RabbitConfig.DeadLetter.QUEUE_TWO_KEY)
                .withArgument("x-message-ttl",20000)
                .build();
    }

    @Bean
    public Binding bindingQueueOne(@Qualifier(RabbitConfig.Direct.EXCHANGE) DirectExchange exchange,
                                   @Qualifier(RabbitConfig.Direct.QUEUE_ONE) Queue queue) {
        return BindingBuilder.bind(queue).to(exchange).with(RabbitConfig.Direct.QUEUE_ONE_KEY);
    }

    @Bean
    public Binding bindingQueueTwo(@Qualifier(RabbitConfig.Direct.EXCHANGE) DirectExchange exchange,
                                   @Qualifier(RabbitConfig.Direct.QUEUE_TWO) Queue queue) {
        return BindingBuilder.bind(queue).to(exchange).with(RabbitConfig.Direct.QUEUE_TWO_KEY);
    }
}

死信队列的监听,也是跟普通队列没区别

@Slf4j
@Component
public class DeadLetterListener {

    @RabbitListener(queues = {RabbitConfig.DeadLetter.QUEUE_ONE})
    public void listenerOne(Channel channel, Message message, @Payload Mail mail) throws IOException {
        log.info("【DeadLetterListener-One】收到消息:{}", mail);
        String receivedRoutingKey = message.getMessageProperties().getReceivedRoutingKey();
        System.out.println(receivedRoutingKey);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
    
    @RabbitListener(queues = {RabbitConfig.DeadLetter.QUEUE_TWO})
    public void listenerTwo(Channel channel, Message message, @Payload Mail mail) throws IOException {
        log.info("【DeadLetterListener-Two】收到消息:{}", mail);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}

测试:在业务处理中,发生异常,捕捉到异常后,设置 queues=false,消息会进入到死信队列。

@RabbitListener(queues = {RabbitConfig.Direct.QUEUE_TWO})
public void listenerTwo(Channel channel, Message message, @Payload Mail mail) throws Exception {
    try {
        log.info("【directQueueTwo】收到消息:{}", mail);
        int i = 1 / 0;
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }catch (Exception e){
        channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
    }
}

利用 死信队列 + TTL 的特性,可以实现延迟消息
比如:登录后,发送一条消息(推荐商品,发邮件通知等),设置 TTL 而没有消费者进行消费,过期后就会进入死信队列,在死信队列处理业务。

6.2.8 重试机制

在消费端在处理消息过程中发生异常,进行重新尝试。
开启重试机制:

# rabbitmq
spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: auto  # 自动ack
        retry:
          enabled: true
          max-attempts: 5 # 重试次数
          max-interval: 10000   # 重试最大间隔时间
          initial-interval: 2000  # 重试初始间隔时间
          multiplier: 2 # 间隔时间乘子,间隔时间*乘子=下一次的间隔时间,最大不能超过设置的最大间隔时间

消费端代码模拟发生异常:

@RabbitListener(queues = {RabbitConfig.Direct.QUEUE_TWO})
public void listenerTwo(Channel channel, Message message, @Payload Mail mail) throws Exception {
    log.info("【directQueueTwo】收到消息:{}", mail);
    int i = 1 / 0;
    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}

此时启动程序,发送消息后可以看到控制台输出内容如下:

可以看到重试次数是 5 次(包含自身消费的一次),重试时间依次是 2s,4s,8s,10s(上一次间隔时间 * 间隔时间乘子),最后一次重试时间理论上是 16s,但是由于设置了最大间隔时间是 10s,因此最后一次间隔时间只能是 10s,和配置相符合。

注意:

重试并不是 RabbitMQ 重新发送了消息,仅仅是消费者内部进行的重试,换句话说就是重试跟 mq 没有任何关系;
因此上述消费者代码不能添加 try {} catch (){},一旦捕获了异常,在自动 ack 模式下,就相当于消息正确处理了,消息直接被确认掉了,不会触发重试的;

重试次数耗尽后,依然没有正确消费的处理查看 《MessageReCoverer》

6.2.9 MessageReCoverer

在消费端处理业务的过程中发生异常,会使用 MessageRecoverer 处理。

public interface MessageRecoverer {

	/**
         * 已消费但所有重试尝试失败的消息的回调。
	 * Callback for message that was consumed but failed all retry attempts.
	 * 
	 * @param message the message to recover
	 * @param cause the cause of the error
	 */
	void recover(Message message, Throwable cause);
}

MessageRecoverer 有以下几个实现类:

  • RejectAndDontRequeueRecoverer:拒绝并且不会将消息重新发回队列
  • RepublishMessageRecoverer:重新发布消息
  • ImmediateRequeueMessageRecoverer:立即重新返回队列

默认使用 RejectAndDontRequeueRecoverer 配置。

6.2.9.1 配置MessageReCoverer

/**
 * 消费端处理消息,发生异常后,将消息发送到新的队列
 */
@Bean
public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate) {
    return new RepublishMessageRecoverer(rabbitTemplate, "errorExchange","errorRoutingKey");
}
/**
 * 消费端处理消息,发生异常后,将消息重新放入队列
 * 周而复始直到不抛出异常为止,这样还是会影响后续的消息消费。
 */
@Bean
public MessageRecoverer messageRecoverer(){
    return new ImmediateRequeueMessageRecoverer();
}

6.2.9.2 特别注意

在自动 ACK 模式下,消费端发生异常并且在重试次数耗尽后,默认情况 下使用 RejectAndDontRequeueRecoverer 处理(拒绝消息并且不会将消息重新发回队列),也就是 requeue=false,如果配置了 死信队列 会将消息发送到 死信队列。

一般来说重试实在 自动 ACK 模式下进行的,如果是 手动 ACK ,重试也是正常进行的,但是尝试将消息使用 RepublishMessageRecoverer 重新发布到新的队列会失败,配置了死信队列也不会进入,而在自动 ACK 模式下,三种实习方式都是正常的。

6.2.10 手动ACK实现重试,超出后放入死信

在手动ACK模式下,尝试在重试次数耗尽后,将其放入新的队列处理,出现异常。于是,手动实现重试机制:

  1. 在出现异常时,进行捕获
  2. 将当前消息ACK
  3. 将当前消息手动重新放入队列,并在其 Header 记录重试次数
  4. 达到重试次数后,利用 TTL 过期机制,将其放入 死信队列
@RabbitListener(queues = {RabbitConfig.Direct.QUEUE_TWO})
public void listenerTwo(Channel channel, Message message, @Payload Mail mail) throws IOException, Int
    try {
        log.info("【directQueueTwo】收到消息:{}", mail);
        int i = 1 / 0;
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    } catch (Exception e) {
        //发生异常后,拒绝消息
        //channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
        System.out.println("catch");
        /**
         * 以下手动实现重试,耗尽次数后再进行处理
         * 发生异常后,先 ACK ,然后在消息的 header 中记录重试次数,重新将消息发回队列
         */
        Map<String, Object> headers = message.getMessageProperties().getHeaders();
        //重试次数
        Integer retryCount;
        String mapKey = "retry-count";
        if (!headers.containsKey(mapKey)) {
            retryCount = 0;
        } else {
            retryCount = (Integer) headers.get(mapKey);
        }
        if (++retryCount < 3) {
            /**
             * 当消息回滚到消息队列时,这条消息不会回到队列尾部,而是仍是在队列头部。
             * 这时消费者会立马又接收到这条消息进行处理,接着抛出异常,进行 回滚,如此反复进行
             * 而比较理想的方式是,出现异常时,消息到达消息队列尾部,这样既保证消息不回丢失,又保证了正常业务的进行。
             * 因此我们采取的解决方案是,将消息进行应答。
             * 这时消息队列会删除该消息,同时我们再次发送该消息 到消息队列,这时就实现了错误消息进行消息队列尾部的方案
             */
            log.info("已经重试 " + retryCount + " 次");
            headers.put("retry-count", retryCount);
            //1.应答
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            //2.重新发送到MQ中
            AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder().contentType("
            channel.basicPublish(message.getMessageProperties().getReceivedExchange(),
                    message.getMessageProperties().getReceivedRoutingKey(), basicProperties,
                    message.getBody());
        } else {
            log.info("现在重试次数为:" + retryCount);
            /**
             * 不重要的操作放入 死信队列
             * 消息异常处理:消费出现异常后,延时几秒,然后从新入队列消费,直到达到ttl超时时间,再转到死信,证明这个信息有问题需要人工干预
             */
            //休眠2s 延迟写入队列,触发转入死信队列
            Thread.sleep(2000);
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
        }
    }
}

7. RabbitMQ 进阶

7.1 消息幂等性

目前实现的最多的就是 At least one 语义,也就是保证消费者至少接收到一次消息,所以会存在重复消息的情况,需要我们业务代码进行幂等的处理.

可以给发送的消息一个唯一 ID,将 ID 存储在分布式缓存(或数据库)中,这样在每次接收到消息之后,进行比较,看是否是重复的消息。

@Service
public class RabbitService {

    @Autowired
    RabbitTemplate rabbitTemplate;

    public void send(RabbitSendVo sendVo) {

        String exchange = sendVo.getExchange();
        String routingKey = sendVo.getRoutingKey();
        Mail mail = sendVo.getMail();

        //默认使用 UUID.randomUUID().toString();
        CorrelationData correlationData = new CorrelationData();
        //correlationData.setId("1");

        rabbitTemplate.convertAndSend(exchange, routingKey, mail,correlationData);
    }
}

需要保证缓存一定是高可用的

7.2 消息顺序

一个 queue,有多个 consumer 去消费,这样就会造成顺序的错误,consumer 从 MQ 里面读取数据是有序的,但是每个 consumer 的执行时间是不固定的,无法保证先读到消息的 consumer 一定先完成操作,这样就会出现消息并没有按照顺序执行,造成数据顺序错误。

这时,可以采用单消费者模式,一个队列只绑定一个消费者。

7.3 限流

在订单高峰期,rabbitmq 上已经堆积了很多消息等待消费,如果没有任何限流措施,贸然启动一个消费者时,如此多的消息瞬间推送给消费者,消费者可能因无法处理这么多的消息而承受巨大压力,甚至崩溃!

在 SpringBoot 中:

spring.rabbitmq.listener.prefetch=1
// 表示不限制消息大小, 一次只处理一条消息, 限制只是当前消费者有效
channel.basicQos(0, 1, false);

7.4 队列最大限制

默认情况下,rabbitmq 中的 queue 的最大长度和总字节数不受限制的(仅受全局内存,磁盘阈值的影响)。

queue 达到限制的阈值时,如果此 queue 配置了 dead-lettered,则 queue 最前面的消息将被转发到 dead-lettered,如果没有配置则最前面的消息将会被直接丢弃。从而达到通过转发或丢弃 queue 最前面的消息,来为新消息腾出空间的目的。

可以通过参数来配置 queue 的最大长度和最大总字节数:

  • x-max-length:控制 queue 的最大长度 —— 非负整数
  • x-max-length-bytes:控制 queue 中所有消息的最大总字节数 —— 非负整数
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-max-length", 10); // 设置queue的最大长度10
args.put("x-max-length-bytes", 1024); // 设置最大总字节数1KB
channel.queueDeclare("myqueue", false, false, false, args);


这篇关于RabbitMQ 简介与使用的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!


扫一扫关注最新编程教程