Skip to main content

消息应答与确认

David LiuAbout 4 min

消息应答与确认

RabbitMQ 的三种确认机制:

  1. Publish ——》Broker:confirmCallback
  2. Exchange ——》Queue:returnCallback
  3. Queue——》Client:Ack机制

消息应答

消费者完成一个任务可能需要一段时间,如果其中一个消费者处理一个长的任务并仅只完成了部分突然它挂掉了,会发生什么情况。RabbitMQ一旦向消费者传递了一条消息,便立即将该消息标记为刷除。在这种情况下,突然有个消费者挂掉了,我们将丢失正在处理的消息。以及后续发送给该消费这的消息,因为它无法接收到。

为了保证消息在发送过程中不丢失,rabbitmq引入消息应答机制,消息应答就是:消费者在接收到消息并且处理该消息之后,告诉rabbitmq它己经处理了,rabbitmq可以把该消息删除了。

三种方式:

  • 自动确认:none
  • 手动确认:manual
  • 根据一切情况确认:auto(比较麻烦)

自动应答

接受到消息就当作应答了

不可靠,生产中不推荐

手动应答

工作线程执行完成以后手动应答。

  • 肯定确认

    basicAck

  • 否定确认

    basicNack(可以批量应答)

    basicReject(不可批量)

需要实现ChannelAwareListener接口,可以获取到Channel

channel.basicAck(tag, false);

  1. deliveryTag(唯一标识 ID):当一个消费者向 RabbitMQ 注册后,会建立起一个 Channel ,RabbitMQ 会用 basic.deliver 方法向消费者推送消息,这个方法携带了一个 delivery tag, 它代表了 RabbitMQ 向该 Channel 投递的这条消息的唯一标识 ID,是一个单调递增的正整数,delivery tag 的范围仅限于 Channel

  2. multiple:为了减少网络流量,手动确认可以被批处理,当该参数为 true 时,则可以一次性确认 delivery_tag 小于等于传入值的所有消息

    建议不开启,因为还是有概率前面的消息没处理完就被应答造成丢失

批量应答好处:可以减少网络拥堵

推荐不开启批量应答,因为后面的应答了不代表前面的处理完了,这样可能产生丢失

@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "adornment.queue"),
        exchange = @Exchange(name = "neud.adornment", type = ExchangeTypes.TOPIC),
        key = "adornment.orders"
), ackMode = "MANUAL")
public void listenTopicQueueMessage(
        AdornmentOrder adornmentOrder,
        @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag,
        Channel channel
) throws IOException {
    try {
        // 创建订单
        adornmentOrderService.createAdornmentOrder(adornmentOrder);
        // 手动应答 "amqp_deliveryTag"
        channel.basicAck(deliveryTag, false);
    } catch (Exception e) {
        log.error("处理订单异常", e);
        // 拒绝消息
        channel.basicReject(deliveryTag, false);
    }
}

消息自动重新入队

如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或TCP连接丢失),导致消息未发送ACK确认,RabbitMQ将了解到消息未完全处理,并将对其重新排队。如果此时其他消费者可以处理,它将很快将其重新分发给另一个消费者。这样,即使某个消费者偶尔死亡,也可以确保不会丢失任何消息。

高级消息确认

解决问题:

生产者发消息给MQ(交换机或队列)收不到的时候要进行确认机制处理,如果收不到就放到缓存中,以后重试。

生产者角度

确认机制:Confirm 确认模式

设置方法:spring.rabbitmq.publisher-confirm-type=correlated

配置选项:

  • NONE

    禁用发布确认模式,默认值

  • CORRELATION

    发布消息成功后会触发回调方法

    异步批量确认

  • SIMPLE

    同步确认消息,发一条确认一条,浪费时间

    722ms

回退消息:return 回退模式

在仅开启了生产者确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息如 果发现该消息不可路由,那么消息会被直接丢弃,此时生产者是不知道消息被丢弃这个事件的。那么如何让无法被路由的消息帮我想办法处理一下?通过设置 mandatory 参数可以在当消息传递过程中不可达目的地时将消息返回给生产者。

设置方法:Mandatory参数

交换机收到后,发现消息不可送达时,把消息返回生产者。

使用x-delayed-message 延迟插件,每次都强制触发returnedMessage回调方法

解决方案

如果配置了发送回调ReturnCallback,插件延迟队列则会回调该方法,因为发送方确实没有投递到队列上,只是在交换器上暂存,等过期时间到了 才会发往队列。

并非是BUG,而是有原因的,建议利用if 去拦截这个异常,判断延迟队列交换机名称,然后break;