Skip to main content

消息模型

David LiuAbout 3 min

消息模型

  • 基本消息队列(basicQueue)
  • 工作消息队列(workQueue)

发布订阅(Publish,Sub),根据交换机类型不同分为三种:

  • Fanout 广播
  • Direct 路由
  • Topic 主题

publish

Work Queue 模型

Work queue,工作队列,可以提高消息处理速度,避免队列消息堆积

多个消费者绑定到一个队列,同一条消息只会被一个消费者处理。

消息预取

prefetch,控制消费者预取的消息数量。在消息被处理之前,各个消费者会平均的预取,最多prefetch条消息,为了让各个队列根据消费能力来取可以prefetch设置为1,这样用多少取多少,不会平均prefetch了,可以实现能者多劳

发布订阅模型

实现同一消息发送给多个消费者。实现方式是加入exchange(交换机)

exchange负责消息路由,而不是存储,路由失败则消息丢失

FanoutExchange

截屏2023-02-09 15.24.15

利用SpringAMQP演示FanoutExchange的使用

实现思路如下:

  1. 在consumer服务中,利用代码声明队列、交换机,并将两者绑定
  2. 在consumer服务中,编写两个消费者方法,分别监听fanout.queue1和fanout.queue.
  3. 在publisher中编写测试方法,向itcast.fanout发送消息
@Configuration
public class FanoutConfig {
    // 声明交换机
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("neud.fanout");
    }
    // 声明第一个队列
    @Bean
    public Queue fanoutQueue1() {
        return new Queue("neud.queue1");
    }
    // 声明第一个队列
    @Bean
    public Queue bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange) {
        return BindingBuilder
                .bind(fanoutQueue1)
                .to(fanoutExchange);
    }
    // 声明第2个队列
    @Bean
    public Queue fanoutQueue2() {
        return new Queue("neud.queue2");
    }
    // 声明第2个队列
    @Bean
    public Queue bindingQueue1(Queue fanoutQueue2, FanoutExchange fanoutExchange) {
        return BindingBuilder
                .bind(fanoutQueue2)
                .to(fanoutExchange);
    }
    
}

交换机的作用是什么?

  • 接收publisher2发送的消息
  • 将消息按照规则路由到与之绑定的队列
  • 不能缓存消息,路由失败,消息丢失
  • FanoutExchange的会将消息路由到每个绑定的队列

声明队列、交换机、绑定关系的Bean是什么?

  • Queue
  • FanoutExchange
  • Binding

DirectExchange

截屏2023-02-09 17.57.31

会发送给所有key匹配的queue,一个queue可以有多个key,所以可以拿来模拟Fanout

声明队列、交换机和绑定规则可能会很复杂,如果都在Configure的Bean里面声明会很复杂和麻烦,可以在消费者上面通过注解,来自动创建这些东西,eg.

@Component
public class SpringRabbitListener {
    
    @RabbitListener(bindings = @QueueBinding(
    	value = @Queue(name = "direct.queue1"),
        exchange = @Exchange(name = "neud.direct", type = ExchangeTypes.DIRECT),
        key = {"red", "blue"}
    ))
    public void listenDirectQueueMessage(String msg) throws InterruptedException {
        System.out.println(msg);
    }
    
    @RabbitListener(bindings = @QueueBinding(
    	value = @Queue(name = "direct.queue2"),
        exchange = @Exchange(name = "neud.direct", type = ExchangeTypes.DIRECT),
        key = {"red", "yellow"}
    ))
    public void listenDirectQueueMessage2(String msg) throws InterruptedException {
        System.out.println(msg);
    }
}

生产者通过routingKey来选择发给谁。

描述下Direct交换机与Fanout:交换机的差异?

  • Fanout:交换机将消息路由给每一个与之绑定的队列
  • Direct:交换机根据RoutingKey判断路由给哪个队列
  • 如果多个队列具有相同的RoutingKey,则与Fanout功能类似

基于@RabbitListener注解声明队列和交换机有哪些常见注解?

  • @Queue
  • @Exchange

TopicExchange

与Direct类似,区别在于routingKey必须是多个单词的列表,并且以.分割。

Queue与Exchange指定BindingKey时可以使用通配符,注意这里面的通配符和平时的规则不太一样,如下:

  • #:代指0或多个单词
  • *:代指一个单词

(最常用)

@Component
public class SpringRabbitListener {
    
    @RabbitListener(bindings = @QueueBinding(
    	value = @Queue(name = "topic.queue1"),
        exchange = @Exchange(name = "neud.topic", type = ExchangeTypes.TOPIC),
        key = "china.#"
    ))
    public void listenTopicQueueMessage(String msg) throws InterruptedException {
        System.out.println(msg);
    }
    
    @RabbitListener(bindings = @QueueBinding(
    	value = @Queue(name = "topic.queue2"),
        exchange = @Exchange(name = "neud.topic", type = ExchangeTypes.TOPIC),
        key = "#.news"
    ))
    public void listenTopicQueueMessage2(String msg) throws InterruptedException {
        System.out.println(msg);
    }
}