消息模型
February 27, 2023About 3 min
消息模型
- 基本消息队列(basicQueue)
- 工作消息队列(workQueue)
发布订阅(Publish, Subscribe),根据交换机类型不同分为三种:
- Fanout 广播
- Direct 路由
- Topic 主题
publish
Work Queue 模型
Work queue,工作队列,可以提高消息处理速度,避免队列消息堆积
多个消费者绑定到一个队列,同一条消息只会被一个消费者处理。
消息预取
prefetch,控制消费者预取的消息数量。在消息被处理之前,各个消费者会平均的预取,最多prefetch条消息,为了让各个队列根据消费能力来取可以prefetch设置为1,这样用多少取多少,不会平均prefetch了,可以实现能者多劳
发布订阅模型
实现同一消息发送给多个消费者。实现方式是加入exchange(交换机)
exchange负责消息路由,而不是存储,路由失败则消息丢失
FanoutExchange
利用SpringAMQP演示FanoutExchange的使用
实现思路如下:
- 在consumer服务中,利用代码声明队列、交换机,并将两者绑定
- 在consumer服务中,编写两个消费者方法,分别监听fanout.queue1和fanout.queue.
- 在publisher中编写测试方法,向itcast.fanout发送消息
@Configuration
public class FanoutConfig {
// 声明交换机
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("neud.fanout");
}
// 声明第一个队列
@Bean
public Queue fanoutQueue1() {
return new Queue("neud.queue1");
}
// 声明第一个队列
@Bean
public Queue bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange) {
return BindingBuilder
.bind(fanoutQueue1)
.to(fanoutExchange);
}
// 声明第2个队列
@Bean
public Queue fanoutQueue2() {
return new Queue("neud.queue2");
}
// 声明第2个队列
@Bean
public Queue bindingQueue1(Queue fanoutQueue2, FanoutExchange fanoutExchange) {
return BindingBuilder
.bind(fanoutQueue2)
.to(fanoutExchange);
}
}
交换机的作用是什么?
- 接收publisher2发送的消息
- 将消息按照规则路由到与之绑定的队列
- 不能缓存消息,路由失败,消息丢失
- FanoutExchange的会将消息路由到每个绑定的队列
声明队列、交换机、绑定关系的Bean是什么?
- Queue
- FanoutExchange
- Binding
DirectExchange
会发送给所有key匹配的queue,一个queue可以有多个key,所以可以拿来模拟Fanout
声明队列、交换机和绑定规则可能会很复杂,如果都在Configure的Bean里面声明会很复杂和麻烦,可以在消费者上面通过注解,来自动创建这些东西,eg.
@Component
public class SpringRabbitListener {
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue1"),
exchange = @Exchange(name = "neud.direct", type = ExchangeTypes.DIRECT),
key = {"red", "blue"}
))
public void listenDirectQueueMessage(String msg) throws InterruptedException {
System.out.println(msg);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue2"),
exchange = @Exchange(name = "neud.direct", type = ExchangeTypes.DIRECT),
key = {"red", "yellow"}
))
public void listenDirectQueueMessage2(String msg) throws InterruptedException {
System.out.println(msg);
}
}
生产者通过routingKey来选择发给谁。
描述下Direct交换机与Fanout:交换机的差异?
- Fanout:交换机将消息路由给每一个与之绑定的队列
- Direct:交换机根据RoutingKey判断路由给哪个队列
- 如果多个队列具有相同的RoutingKey,则与Fanout功能类似
基于@RabbitListener注解声明队列和交换机有哪些常见注解?
- @Queue
- @Exchange
TopicExchange
与Direct类似,区别在于routingKey必须是多个单词的列表,并且以.
分割。
Queue与Exchange指定BindingKey时可以使用通配符,注意这里面的通配符和平时的规则不太一样,如下:
#
:代指0或多个单词*
:代指一个单词
(最常用)
@Component
public class SpringRabbitListener {
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue1"),
exchange = @Exchange(name = "neud.topic", type = ExchangeTypes.TOPIC),
key = "china.#"
))
public void listenTopicQueueMessage(String msg) throws InterruptedException {
System.out.println(msg);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue2"),
exchange = @Exchange(name = "neud.topic", type = ExchangeTypes.TOPIC),
key = "#.news"
))
public void listenTopicQueueMessage2(String msg) throws InterruptedException {
System.out.println(msg);
}
}