消息队列学习

消息中间件

消息中间件的含义

  1. 利用可靠的消息传递机制进行系统和系统直接的通讯
  2. 通过提供消息传递和消息的派对机制,它可以在分布式系统环境下扩展进程间的通讯

消息中间件应用的场景

  1. 跨系统数据传递
  2. 高并发的流量削峰
  3. 数据的并发和异步处理
  4. 大数据分析与传递
  5. 分布式事务比如你有一个数据要进行迁移或者请求并发过多的时候,

常见的消息中间件

ActiveMQ、RabbitMQ、Kafka、RocketMQ等

消息中间件的本质及设计

它是一种接受数据、接受请求、存储数据、发送数据等功能的技术服务

MQ消息队列:负责数据的传接受,存储和传递,所以性能要高于普通服务和技术

消息中间件的核心组成部分

消息的协议
消息的持久化机制
消息的分发策略
消息的高可用,高可靠
消息的容错机制

消息队列协议

AMQP协议

AMQP:(全称:Advanced Message Queuing Protocol)是高级消息队列协议。由摩根大通集团联合其他公司共同设计。是一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。Erlang中的实现由 RabbitMQ等

特性:

  1. 分布式事务支持
  2. 消息的持久化支持
  3. 高性能和高可靠的消息处理优势

支持者:RabbitMQ,ActiveMQ

MQTT协议

MQTT协议(Message Queueing Telemetry Transport)消息队列是 IBM开放的及时通讯协议,物联网系统架构中的重要组成部分

特点:

  1. 轻量
  2. 结构简单
  3. 传输快,不支持事务
  4. 没有持久化设计

应用场景:

  1. 适用于计算能力有限
  2. 低带宽
  3. 网络不稳定的场景

支持者:RabbitMQ,ActiveMQ

OpenMessage协议

是近几年由阿里、雅虎和滴滴出行、Stremalio等公司共同参与创立的分布式信息中间件、流处理等领域的应用开发标准

特点:

  1. 结构简单
  2. 解析速度快
  3. 支持事务和持久化设计

支持者:RocketMQ

Kafka协议

Kafka协议是基于 TCP/IP的二进制协议。消息内部是 通过长度来分割,由一些基本数据类型组成

特点:

  1. 结构简单
  2. 解析速度快
  3. 无事务支持
  4. 有持久化设计

小结

协议:实在 tcp/ip协议基础之上构建的一种约定俗称的规范和机制、它的主要目的可以让客户端(应用程序 java,go)进行沟通和通讯。并且这种写一下规范必须具有持久性,高可用,高可靠的性能

消息的分发策略

ActiveMQ RabbitMQ Kafka RocketMQ
发布订阅 支持 支持 支持 支持
轮询分发 支持 支持 支持 /
公平分发 / 支持 支持 /
重发 支持 支持 / 支持
消息拉取 / 支持 支持 支持

RabbitMQ

Docker安装RabbitMQ

获取rabbit镜像

1
docker pull rabbitmq:management

创建并运行容器

1
docker run -id --name docker-rabbitmq-management -p 15672:15672 -p 5672:5672 rabbitmq:management

添加用户并授权权限

新增用户

1
rabbitmqctl add_user admin admin

设置用户分配操作权限

1
rabbitmqctl set_user_tags admin administrator

用户级别:

  1. administrator:可以登录控制台、查看所有信息、可以对 rabbitmq进行管理

    policymaker和monitoring可以做的任何事外加:
    创建和删除virtual hosts
    查看、创建和删除users
    查看创建和删除permissions
    关闭其他用户的connections

  2. monitoring:监控者 登录控制台,查看所有信息

    management可以做的任何事外加:

    列出所有virtual hosts,包括他们不能登录的virtual hosts
    查看其他用户的connections和channels
    查看节点级别的数据如clustering和memory使用情况
    查看真正的关于所有virtual hosts的全局的统计信息

  3. policymaker:策略制定者 登录控制台,指定策略

    management可以做的任何事外加:

    查看、创建和删除自己的virtual hosts所属的policies和parameters

  4. managment 普通管理员 登录控制台

    用户可以通过AMQP做的任何事外加:
    列出自己可以通过AMQP登入的virtual hosts
    查看自己的virtual hosts中的queues, exchanges 和 bindings
    查看和关闭自己的channels 和 connections
    查看有关自己的virtual hosts的“全局”的统计信息,包含其他用户在这些virtual hosts中的活动。

为用户添加资源权限

1
rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"

RabbitMQ入门案例 - Simple 简单模式

导入依赖

1
2
3
4
5
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.12.0</version>
</dependency>

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
* @author monochrome
* @date 2021/7/4
*/
public class Producer {

public static void main(String[] args) {
//1.创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("monochrome");
connectionFactory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
connection = connectionFactory.newConnection("生产者");
//2.创建通道
channel = connection.createChannel();
//3.通过创建交换机,声明队列,绑定关系,路由key,发送消息和接受消息
/*参数1:是否持久化,非持久化消息会存盘吗?会存盘,但是会随着重启服务器而丢失
参数2:是否独占队列
参数3:是否自动删除,随着最后一个消费者消息完毕消息以后是否把队列自动删除
参数4:携带附属属性
*/
String queueName = "queue1";
channel.queueDeclare(queueName, false, false, false, null);

//4.发送消息给队列queue
/*参数1:交换机
参数2:队列、路由key
参数3:消息的状态控制
参数4:消息主题
*/
//面试题:可以存在没有交换机的队列吗?不可能,虽然没有指定交换机但是一定会存在一个默认的交换机
String message = "Hello";
channel.basicPublish("", queueName, null, message.getBytes());
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}finally {
//5.关闭
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
import com.rabbitmq.client.*;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

/**
* @author monochrome
* @date 2021/7/4
*/
public class Consumer {
public static void main(String[] args) {
//1.创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("monochrome");
connectionFactory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
connection = connectionFactory.newConnection("消费者");
//2.创建通道
channel = connection.createChannel();

String queueName = "queue1";

channel.basicConsume(queueName, true, new DeliverCallback() {
@Override
public void handle(String s, Delivery delivery) throws IOException {
System.out.println("收到消息:" + new String(delivery.getBody(), StandardCharsets.UTF_8));
}
}, new CancelCallback() {
@Override
public void handle(String s) throws IOException {
System.out.println("接收消息失败!");
}
});
System.out.println("开始接收消息");
while (true) {

}
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}finally {
//5.关闭
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}

AMQP

AMQP全称:Advanced Message Queuing Protocol(高级消息队列协议)。是应用层协议的一个开发标准,为面向消息的中间件设计

AMQP生产者流转过程

AMQP生产者流转过程

AMQP消费者流转过程

AMQP消费者流转过程

RabbitMQ的核心组成部分

RabbitMQ的核心组成部分

RabbitMQ的核心组成部分

Server:又被称为Broker,接受客户端的连接,实现AMQP服务。就是我们自己安装的rabbitmq-server
Connection:连接,应用程序与Broker的网络连接(使用的是TCP/IP连接)
Channel:网络信道,几乎所有的操作都在Channel中进行,Channel是进行消息读写的通道,每个通道Channel代表一个会话任务。
Message:消息,服务于应用程序之间传递的数据,由Properties和body组成,Properties可以对消息进行修饰,比如消息的优先级,延迟等高级特征,Body则是消息体的内容。
Virtual Host:虚拟地址,用于逻辑层隔离,最上层的消息路由,一个虚拟机理由可以有若干的Exchange和Queueu,同一个虚拟机里面不能有相同名字的Exchange。
Exchange:交换机,接收消息,根据路由键发送消息到绑定的队列(不具备储存消息的能力)
Bindings:Exchange和Queue之间的虚拟连接,Binding中可以保护多个routing key.
Routing key:是一个路由规则,虚拟机可以用它来确定如何路由一个特定消息。
Queue:队列,也是MessageQueue队列,保存消息转发给消费者

RabbitMQ的操作流程

第一:获取Conection
第二:获取Channel
第三:定义Exchange,Queue
第四:使用一个RoutingKey将Queue Binding到一个Exchange上
第五:通过指定一个Exchange和一个RoutingKey来将消息发送到对应的Queue上,
第六:Consumer在接收时也是获取connection,接着获取channel,然后指定一个Queue,到Queue上取消息,它对Exchange,RoutingKey及如何Binding都不关心,到对应的Queue上去取消息就行了。

注意:一个PublisherClient发送消息,哪些ConsumerClient可以收到消息,在于Exchange,RoutingKey,Queue的关系上

RabbitMQ支持的消息模型

RabbitMQ支持的消息模型-1

RabbitMQ支持的消息模型-2

  1. 简单模式 Simple
  2. 工作模式 Work
  3. 发布订阅模式
  4. 路由模式
  5. 主题 Topic模式
  6. 参数模式

RabbitMQ - fanout 模式

fanout模式

发布订阅模式的具体实现

  1. 类型:fanout
  2. 特点:Fanout - 发布与订阅模式,是一种广播机制,它是没有路由 key的模式

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
//简单模式
public class Producer{
//1.创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("10.15.0.9");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection("生产者");
//2.创建通道
Channel channel = connection.createChannel();
//3.通过创建交换机,声明队列,绑定关系,路由key,发送消息和接受消息
/*参数1: 是否持久化,非持久化消息会存盘吗?会存盘,但是会随着重启服务器而丢失
参数2:是否独占队列
参数3:是否自动删除,随着最后一个消费者消息完毕消息以后是否把队列自动删除
参数4:携带附属属性
*/
String queueName = "queue1";
channel.queueDeclare(queueName,false,false,false,null);
//4.发送消息给队列queue
/*参数1: 交换机
参数2:队列、路由key
参数3:消息的状态控制
参数4:消息主题
*/
//面试题:可以存在没有交换机的队列吗?不可能,虽然没有指定交换机但是一定会存在一个默认的交换机
String message = "Hello";
//5.准备交换机
String exchangeName = "fanout-exchange";
//6.定义路由key
String routeKey = "";
//7.指定交换机的类型
String type = "fanout";
channel.basicPublish(exchangeName,routeKey, null,message.getBytes());
//8.关闭
channel.close();
connection.close();
}

RabbitMQ - direct 模式

direct模式

1
2
3
4
5
6
//6.定义路由key
String routeKey = "email";
//7.指定交换机的类型
String type = "direct";
channel.basicPublish(exchangeName,routeKey, null,message.getBytes());

RabbitMQ - topic 模式

topic模式

#:匹配0、1或多个
*:匹配一个

1
2
3
4
5
6
//6.定义路由key
String routeKey = "com.order.test.xxx";
//7.指定交换机的类型
String type = "direct";
channel.basicPublish(exchangeName, routeKey, null,message.getBytes());

代码创建及绑定

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
//5.准备交换机
String exchangeName = "direct_message_exchange";
String exchangeType = "direct";
//如果你用界面把queue和exchange的关系先绑定话,代码就不需要在编写这些声明代码可以让代码变得更简洁
//如果用代码的方式去声明,我们要学习一下
//6.声明交换机 所谓的持久化就是指,交换机会不会随着服务器重启造成丢失
channel.exchangeDeclare(exchangeName,exchangeType,true);

//7.声明队列
channel.queueDeclare("queue5",true,false,false,null);
channel.queueDeclare("queue6",true,false,false,null);
channel.queueDeclare("queue7",true,false,false,null);

//8.绑定队列和交换机的关系
channel.queueBind("queue5",exchangeName,"order");
channel.queueBind("queue6",exchangeName,"order");
channel.queueBind("queue7",exchangeName,"course");

channel.basicPublish(exchangeName,course, null,message.getBytes());

RabbitMQ - header模式

headers模式

RabbitMQ - work模式

当有多个消费者时,我们的消息会被哪个消费者消费呢,我们又该如何均衡消费者消费信息的多少呢?

主要有两种模式:

  1. 轮询模式的分发:一个消费者一条,按均分配
  2. 公平分发:根据消费者的消费能力进行公平分发,处理快的处理的多,处理慢的处理的少;按劳分配

work模式轮询模式(Round-Robin)

work模式-轮询模式

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
package com.monochrome.rabbitmq.work.polling;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
* @author monochrome
* @date 2021/7/4
*/
public class Producer {

public static void main(String[] args) {
//1.创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("monochrome");
connectionFactory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
connection = connectionFactory.newConnection("生产者");
//2.创建通道
channel = connection.createChannel();
//3.通过创建交换机,声明队列,绑定关系,路由key,发送消息和接受消息
/*参数1:是否持久化,非持久化消息会存盘吗?会存盘,但是会随着重启服务器而丢失
参数2:是否独占队列
参数3:是否自动删除,随着最后一个消费者消息完毕消息以后是否把队列自动删除
参数4:携带附属属性
*/
String queueName = "queue-polling";
channel.queueDeclare(queueName, false, false, false, null);

//4.发送消息给队列queue
/*参数1:交换机
参数2:队列、路由key
参数3:消息的状态控制
参数4:消息主题
*/
for (int i = 0; i < 20; i++) {
String message = "Hello" + i;
channel.basicPublish("", queueName, null, message.getBytes());
}
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}finally {
//5.关闭
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
package com.monochrome.rabbitmq.work.polling;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
* @author monochrome
* @date 2021/7/4
*/
public class Consumer {

private static final Runnable runnable = new Runnable() {
@Override
public void run() {
//1.创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("monochrome");
connectionFactory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
connection = connectionFactory.newConnection("消费者");
//2.创建通道
channel = connection.createChannel();

String workName = Thread.currentThread().getName();
String routingName = "queue-polling";

channel.basicConsume(routingName, true, new DeliverCallback() {
@Override
public void handle(String s, Delivery delivery) throws IOException {
System.out.println(workName + "收到消息:" + new String(delivery.getBody(), StandardCharsets.UTF_8));
switch (workName) {
case "work1":
try {
TimeUnit.MILLISECONDS.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
break;
case "work2":
try {
TimeUnit.MILLISECONDS.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
break;
}

}
}, new CancelCallback() {
@Override
public void handle(String s) throws IOException {
System.out.println(workName + "接收消息失败!");
}
});
System.out.println("开始接收消息:");
while (true) {

}
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}finally {
//5.关闭
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
};

public static void main(String[] args) {
new Thread(runnable, "work1").start();
new Thread(runnable, "work2").start();
}
}

work模式公平分发模式

work模式-公平模式

生产者同轮询模式

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
package com.monochrome.rabbitmq.work.fair;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
* @author monochrome
* @date 2021/7/4
*/
public class Consumer {

private static final Runnable runnable = new Runnable() {
@Override
public void run() {
//1.创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("monochrome");
connectionFactory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
connection = connectionFactory.newConnection("消费者");
//2.创建通道
channel = connection.createChannel();

String workName = Thread.currentThread().getName();
String routingName = "queue-polling";

//公平分发需要改成手动应答
Channel finalChannel = channel;
int prefetchCount = 1 ;
//这告诉 RabbitMQ 一次不要给一个工人多个消息。或者,换句话说,在处理并确认前一条消息之前,不要向工作人员发送新消息。相反,它会将它分派给下一个不忙的工人。
finalChannel.basicQos(prefetchCount);
channel.basicConsume(routingName, false, new DeliverCallback() {
@Override
public void handle(String s, Delivery delivery) throws IOException {
System.out.println(workName + "收到消息:" + new String(delivery.getBody(), StandardCharsets.UTF_8));
switch (workName) {
case "work1":
try {
TimeUnit.MILLISECONDS.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
break;
case "work2":
try {
TimeUnit.MILLISECONDS.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
break;
}
finalChannel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}, new CancelCallback() {
@Override
public void handle(String s) throws IOException {
System.out.println(workName + "接收消息失败!");
}
});
System.out.println("开始接收消息:");
while (true) {

}
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}finally {
//5.关闭
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
};

public static void main(String[] args) {
new Thread(runnable, "work1").start();
new Thread(runnable, "work2").start();
}
}

Springboot集成RabbitMQ

application.yml

1
2
3
4
5
6
7
8
9
server:
port: 8081
spring:
rabbitmq:
host: localhost
port: 5672
username: admin
password: monochrome
virtual-host: /

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Service
public class OrderServiceImpl implements IOrderService {

@Autowired
private RabbitTemplate rabbitTemplate;

@Override
public void createOrder() {
String uuid = UUID.randomUUID().toString();
System.out.println("生成订单:" + uuid);
String exchangeName = "fanout-order-exchange";
String routingKey = "";
rabbitTemplate.convertAndSend(exchangeName, routingKey, uuid);
}
}

消费者

配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
@Configuration
public class RabbitMQConfiguration {

@Bean
public FanoutExchange fanoutOrderExchange() {
return new FanoutExchange("fanout-order-exchange", true, false);
}

@Bean
public Queue smsQueue() {
return new Queue("sms.fanout.queue", true);
}
@Bean
public Queue emailQueue() {
return new Queue("email.fanout.queue", true);
}
@Bean
public Queue wechatQueue() {
return new Queue("wechat.fanout.queue", true);
}

@Bean
public Binding smsBinding() {
return BindingBuilder.bind(smsQueue()).to(fanoutOrderExchange());
}
@Bean
public Binding emailBinding() {
return BindingBuilder.bind(emailQueue()).to(fanoutOrderExchange());
}
@Bean
public Binding wechatBinding() {
return BindingBuilder.bind(wechatQueue()).to(fanoutOrderExchange());
}
}

Consumer

1
2
3
4
5
6
7
8
9
10
@RabbitListener(queues = {"email.fanout.queue"})
@Component
public class FanoutEmailConsumer {

@RabbitHandler
public void receiveMessage(String message) {
System.out.println("email接收到消息:" + message);
}

}
1
2
3
4
5
6
7
8
@RabbitListener(queues = {"sms.fanout.queue"})
@Component
public class FanoutSmsConsumer {

@RabbitHandler
public void receiveMessage(String message) {
System.out.println("sms接收到消息:" + message);
}
1
2
3
4
5
6
7
8
9
10
@RabbitListener(queues = {"wechat.fanout.queue"})
@Component
public class FanoutWechatConsumer {

@RabbitHandler
public void receiveMessage(String message) {
System.out.println("wechat接收到消息:" + message);
}

}

完整代码:https://github.com/zhaoyangmushiyi/springboot-rabbitmq