消息中间件
消息中间件的含义
- 利用可靠的消息传递机制进行系统和系统直接的通讯
- 通过提供消息传递和消息的派对机制,它可以在分布式系统环境下扩展进程间的通讯
消息中间件应用的场景
- 跨系统数据传递
- 高并发的流量削峰
- 数据的并发和异步处理
- 大数据分析与传递
- 分布式事务比如你有一个数据要进行迁移或者请求并发过多的时候,
常见的消息中间件
ActiveMQ、RabbitMQ、Kafka、RocketMQ等
消息中间件的本质及设计
它是一种接受数据、接受请求、存储数据、发送数据等功能的技术服务
MQ消息队列:负责数据的传接受,存储和传递,所以性能要高于普通服务和技术
消息中间件的核心组成部分
消息的协议
消息的持久化机制
消息的分发策略
消息的高可用,高可靠
消息的容错机制
消息队列协议
AMQP协议
AMQP:(全称:Advanced Message Queuing Protocol)是高级消息队列协议。由摩根大通集团联合其他公司共同设计。是一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。Erlang中的实现由 RabbitMQ等
特性:
- 分布式事务支持
- 消息的持久化支持
- 高性能和高可靠的消息处理优势
支持者:RabbitMQ,ActiveMQ
MQTT协议
MQTT协议(Message Queueing Telemetry Transport)消息队列是 IBM开放的及时通讯协议,物联网系统架构中的重要组成部分
特点:
- 轻量
- 结构简单
- 传输快,不支持事务
- 没有持久化设计
应用场景:
- 适用于计算能力有限
- 低带宽
- 网络不稳定的场景
支持者:RabbitMQ,ActiveMQ
OpenMessage协议
是近几年由阿里、雅虎和滴滴出行、Stremalio等公司共同参与创立的分布式信息中间件、流处理等领域的应用开发标准
特点:
- 结构简单
- 解析速度快
- 支持事务和持久化设计
支持者:RocketMQ
Kafka协议
Kafka协议是基于 TCP/IP的二进制协议。消息内部是 通过长度来分割,由一些基本数据类型组成
特点:
- 结构简单
- 解析速度快
- 无事务支持
- 有持久化设计
小结
协议:实在 tcp/ip协议基础之上构建的一种约定俗称的规范和机制、它的主要目的可以让客户端(应用程序 java,go)进行沟通和通讯。并且这种写一下规范必须具有持久性,高可用,高可靠的性能
消息的分发策略
|
ActiveMQ |
RabbitMQ |
Kafka |
RocketMQ |
发布订阅 |
支持 |
支持 |
支持 |
支持 |
轮询分发 |
支持 |
支持 |
支持 |
/ |
公平分发 |
/ |
支持 |
支持 |
/ |
重发 |
支持 |
支持 |
/ |
支持 |
消息拉取 |
/ |
支持 |
支持 |
支持 |
RabbitMQ
Docker安装RabbitMQ
获取rabbit镜像
1
| docker pull rabbitmq:management
|
创建并运行容器
1
| docker run -id --name docker-rabbitmq-management -p 15672:15672 -p 5672:5672 rabbitmq:management
|
添加用户并授权权限
新增用户
1
| rabbitmqctl add_user admin admin
|
设置用户分配操作权限
1
| rabbitmqctl set_user_tags admin administrator
|
用户级别:
administrator:可以登录控制台、查看所有信息、可以对 rabbitmq进行管理
policymaker和monitoring可以做的任何事外加:
创建和删除virtual hosts
查看、创建和删除users
查看创建和删除permissions
关闭其他用户的connections
monitoring:监控者 登录控制台,查看所有信息
management可以做的任何事外加:
列出所有virtual hosts,包括他们不能登录的virtual hosts
查看其他用户的connections和channels
查看节点级别的数据如clustering和memory使用情况
查看真正的关于所有virtual hosts的全局的统计信息
policymaker:策略制定者 登录控制台,指定策略
management可以做的任何事外加:
查看、创建和删除自己的virtual hosts所属的policies和parameters
managment 普通管理员 登录控制台
用户可以通过AMQP做的任何事外加:
列出自己可以通过AMQP登入的virtual hosts
查看自己的virtual hosts中的queues, exchanges 和 bindings
查看和关闭自己的channels 和 connections
查看有关自己的virtual hosts的“全局”的统计信息,包含其他用户在这些virtual hosts中的活动。
为用户添加资源权限
1
| rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
|
RabbitMQ入门案例 - Simple 简单模式
导入依赖
1 2 3 4 5
| <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.12.0</version> </dependency>
|
生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67
| import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException; import java.util.concurrent.TimeoutException;
public class Producer {
public static void main(String[] args) { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); connectionFactory.setPort(5672); connectionFactory.setUsername("admin"); connectionFactory.setPassword("monochrome"); connectionFactory.setVirtualHost("/"); Connection connection = null; Channel channel = null; try { connection = connectionFactory.newConnection("生产者"); channel = connection.createChannel();
String queueName = "queue1"; channel.queueDeclare(queueName, false, false, false, null);
String message = "Hello"; channel.basicPublish("", queueName, null, message.getBytes()); } catch (IOException | TimeoutException e) { e.printStackTrace(); }finally { if (channel != null && channel.isOpen()) { try { channel.close(); } catch (IOException | TimeoutException e) { e.printStackTrace(); } } if (connection != null && connection.isOpen()) { try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } } }
|
消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65
| import com.rabbitmq.client.*;
import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException;
public class Consumer { public static void main(String[] args) { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); connectionFactory.setPort(5672); connectionFactory.setUsername("admin"); connectionFactory.setPassword("monochrome"); connectionFactory.setVirtualHost("/"); Connection connection = null; Channel channel = null; try { connection = connectionFactory.newConnection("消费者"); channel = connection.createChannel();
String queueName = "queue1";
channel.basicConsume(queueName, true, new DeliverCallback() { @Override public void handle(String s, Delivery delivery) throws IOException { System.out.println("收到消息:" + new String(delivery.getBody(), StandardCharsets.UTF_8)); } }, new CancelCallback() { @Override public void handle(String s) throws IOException { System.out.println("接收消息失败!"); } }); System.out.println("开始接收消息"); while (true) {
} } catch (IOException | TimeoutException e) { e.printStackTrace(); }finally { if (channel != null && channel.isOpen()) { try { channel.close(); } catch (IOException | TimeoutException e) { e.printStackTrace(); } } if (connection != null && connection.isOpen()) { try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } } }
|
AMQP
AMQP全称:Advanced Message Queuing Protocol(高级消息队列协议)。是应用层协议的一个开发标准,为面向消息的中间件设计
AMQP生产者流转过程
AMQP消费者流转过程
RabbitMQ的核心组成部分
RabbitMQ的核心组成部分
Server:又被称为Broker,接受客户端的连接,实现AMQP服务。就是我们自己安装的rabbitmq-server
Connection:连接,应用程序与Broker的网络连接(使用的是TCP/IP连接)
Channel:网络信道,几乎所有的操作都在Channel中进行,Channel是进行消息读写的通道,每个通道Channel代表一个会话任务。
Message:消息,服务于应用程序之间传递的数据,由Properties和body组成,Properties可以对消息进行修饰,比如消息的优先级,延迟等高级特征,Body则是消息体的内容。
Virtual Host:虚拟地址,用于逻辑层隔离,最上层的消息路由,一个虚拟机理由可以有若干的Exchange和Queueu,同一个虚拟机里面不能有相同名字的Exchange。
Exchange:交换机,接收消息,根据路由键发送消息到绑定的队列(不具备储存消息的能力)
Bindings:Exchange和Queue之间的虚拟连接,Binding中可以保护多个routing key.
Routing key:是一个路由规则,虚拟机可以用它来确定如何路由一个特定消息。
Queue:队列,也是MessageQueue队列,保存消息转发给消费者
RabbitMQ的操作流程
第一:获取Conection
第二:获取Channel
第三:定义Exchange,Queue
第四:使用一个RoutingKey将Queue Binding到一个Exchange上
第五:通过指定一个Exchange和一个RoutingKey来将消息发送到对应的Queue上,
第六:Consumer在接收时也是获取connection,接着获取channel,然后指定一个Queue,到Queue上取消息,它对Exchange,RoutingKey及如何Binding都不关心,到对应的Queue上去取消息就行了。
注意:一个PublisherClient发送消息,哪些ConsumerClient可以收到消息,在于Exchange,RoutingKey,Queue的关系上
RabbitMQ支持的消息模型
- 简单模式 Simple
- 工作模式 Work
- 发布订阅模式
- 路由模式
- 主题 Topic模式
- 参数模式
RabbitMQ - fanout 模式
发布订阅模式的具体实现
- 类型:fanout
- 特点:Fanout - 发布与订阅模式,是一种广播机制,它是没有路由 key的模式
生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| public class Producer{ ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("10.15.0.9"); connectionFactory.setPort(5672); connectionFactory.setUsername("admin"); connectionFactory.setPassword("admin"); connectionFactory.setVirtualHost("/"); Connection connection = connectionFactory.newConnection("生产者"); Channel channel = connection.createChannel();
String queueName = "queue1"; channel.queueDeclare(queueName,false,false,false,null);
String message = "Hello"; String exchangeName = "fanout-exchange"; String routeKey = ""; String type = "fanout"; channel.basicPublish(exchangeName,routeKey, null,message.getBytes()); channel.close(); connection.close(); }
|
RabbitMQ - direct 模式
1 2 3 4 5 6
| String routeKey = "email";
String type = "direct"; channel.basicPublish(exchangeName,routeKey, null,message.getBytes());
|
RabbitMQ - topic 模式
#:匹配0、1或多个
*:匹配一个
1 2 3 4 5 6
| String routeKey = "com.order.test.xxx";
String type = "direct"; channel.basicPublish(exchangeName, routeKey, null,message.getBytes());
|
代码创建及绑定
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| String exchangeName = "direct_message_exchange"; String exchangeType = "direct";
channel.exchangeDeclare(exchangeName,exchangeType,true);
channel.queueDeclare("queue5",true,false,false,null); channel.queueDeclare("queue6",true,false,false,null); channel.queueDeclare("queue7",true,false,false,null);
channel.queueBind("queue5",exchangeName,"order"); channel.queueBind("queue6",exchangeName,"order"); channel.queueBind("queue7",exchangeName,"course");
channel.basicPublish(exchangeName,course, null,message.getBytes());
|
RabbitMQ - work模式
当有多个消费者时,我们的消息会被哪个消费者消费呢,我们又该如何均衡消费者消费信息的多少呢?
主要有两种模式:
- 轮询模式的分发:一个消费者一条,按均分配
- 公平分发:根据消费者的消费能力进行公平分发,处理快的处理的多,处理慢的处理的少;按劳分配
work模式轮询模式(Round-Robin)
生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70
| package com.monochrome.rabbitmq.work.polling;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException; import java.util.concurrent.TimeoutException;
public class Producer {
public static void main(String[] args) { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); connectionFactory.setPort(5672); connectionFactory.setUsername("admin"); connectionFactory.setPassword("monochrome"); connectionFactory.setVirtualHost("/"); Connection connection = null; Channel channel = null; try { connection = connectionFactory.newConnection("生产者"); channel = connection.createChannel();
String queueName = "queue-polling"; channel.queueDeclare(queueName, false, false, false, null);
for (int i = 0; i < 20; i++) { String message = "Hello" + i; channel.basicPublish("", queueName, null, message.getBytes()); } } catch (IOException | TimeoutException e) { e.printStackTrace(); }finally { if (channel != null && channel.isOpen()) { try { channel.close(); } catch (IOException | TimeoutException e) { e.printStackTrace(); } } if (connection != null && connection.isOpen()) { try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } } }
|
消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94
| package com.monochrome.rabbitmq.work.polling;
import com.rabbitmq.client.*;
import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException;
public class Consumer {
private static final Runnable runnable = new Runnable() { @Override public void run() { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); connectionFactory.setPort(5672); connectionFactory.setUsername("admin"); connectionFactory.setPassword("monochrome"); connectionFactory.setVirtualHost("/"); Connection connection = null; Channel channel = null; try { connection = connectionFactory.newConnection("消费者"); channel = connection.createChannel();
String workName = Thread.currentThread().getName(); String routingName = "queue-polling";
channel.basicConsume(routingName, true, new DeliverCallback() { @Override public void handle(String s, Delivery delivery) throws IOException { System.out.println(workName + "收到消息:" + new String(delivery.getBody(), StandardCharsets.UTF_8)); switch (workName) { case "work1": try { TimeUnit.MILLISECONDS.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } break; case "work2": try { TimeUnit.MILLISECONDS.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); } break; }
} }, new CancelCallback() { @Override public void handle(String s) throws IOException { System.out.println(workName + "接收消息失败!"); } }); System.out.println("开始接收消息:"); while (true) {
} } catch (IOException | TimeoutException e) { e.printStackTrace(); }finally { if (channel != null && channel.isOpen()) { try { channel.close(); } catch (IOException | TimeoutException e) { e.printStackTrace(); } } if (connection != null && connection.isOpen()) { try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } } };
public static void main(String[] args) { new Thread(runnable, "work1").start(); new Thread(runnable, "work2").start(); } }
|
work模式公平分发模式
生产者同轮询模式
消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99
| package com.monochrome.rabbitmq.work.fair;
import com.rabbitmq.client.*;
import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException;
public class Consumer {
private static final Runnable runnable = new Runnable() { @Override public void run() { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); connectionFactory.setPort(5672); connectionFactory.setUsername("admin"); connectionFactory.setPassword("monochrome"); connectionFactory.setVirtualHost("/"); Connection connection = null; Channel channel = null; try { connection = connectionFactory.newConnection("消费者"); channel = connection.createChannel();
String workName = Thread.currentThread().getName(); String routingName = "queue-polling";
Channel finalChannel = channel; int prefetchCount = 1 ; finalChannel.basicQos(prefetchCount); channel.basicConsume(routingName, false, new DeliverCallback() { @Override public void handle(String s, Delivery delivery) throws IOException { System.out.println(workName + "收到消息:" + new String(delivery.getBody(), StandardCharsets.UTF_8)); switch (workName) { case "work1": try { TimeUnit.MILLISECONDS.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } break; case "work2": try { TimeUnit.MILLISECONDS.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); } break; } finalChannel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } }, new CancelCallback() { @Override public void handle(String s) throws IOException { System.out.println(workName + "接收消息失败!"); } }); System.out.println("开始接收消息:"); while (true) {
} } catch (IOException | TimeoutException e) { e.printStackTrace(); }finally { if (channel != null && channel.isOpen()) { try { channel.close(); } catch (IOException | TimeoutException e) { e.printStackTrace(); } } if (connection != null && connection.isOpen()) { try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } } };
public static void main(String[] args) { new Thread(runnable, "work1").start(); new Thread(runnable, "work2").start(); } }
|
Springboot集成RabbitMQ
application.yml
1 2 3 4 5 6 7 8 9
| server: port: 8081 spring: rabbitmq: host: localhost port: 5672 username: admin password: monochrome virtual-host: /
|
生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| @Service public class OrderServiceImpl implements IOrderService {
@Autowired private RabbitTemplate rabbitTemplate;
@Override public void createOrder() { String uuid = UUID.randomUUID().toString(); System.out.println("生成订单:" + uuid); String exchangeName = "fanout-order-exchange"; String routingKey = ""; rabbitTemplate.convertAndSend(exchangeName, routingKey, uuid); } }
|
消费者
配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| @Configuration public class RabbitMQConfiguration {
@Bean public FanoutExchange fanoutOrderExchange() { return new FanoutExchange("fanout-order-exchange", true, false); }
@Bean public Queue smsQueue() { return new Queue("sms.fanout.queue", true); } @Bean public Queue emailQueue() { return new Queue("email.fanout.queue", true); } @Bean public Queue wechatQueue() { return new Queue("wechat.fanout.queue", true); }
@Bean public Binding smsBinding() { return BindingBuilder.bind(smsQueue()).to(fanoutOrderExchange()); } @Bean public Binding emailBinding() { return BindingBuilder.bind(emailQueue()).to(fanoutOrderExchange()); } @Bean public Binding wechatBinding() { return BindingBuilder.bind(wechatQueue()).to(fanoutOrderExchange()); } }
|
Consumer
1 2 3 4 5 6 7 8 9 10
| @RabbitListener(queues = {"email.fanout.queue"}) @Component public class FanoutEmailConsumer {
@RabbitHandler public void receiveMessage(String message) { System.out.println("email接收到消息:" + message); }
}
|
1 2 3 4 5 6 7 8
| @RabbitListener(queues = {"sms.fanout.queue"}) @Component public class FanoutSmsConsumer {
@RabbitHandler public void receiveMessage(String message) { System.out.println("sms接收到消息:" + message); }
|
1 2 3 4 5 6 7 8 9 10
| @RabbitListener(queues = {"wechat.fanout.queue"}) @Component public class FanoutWechatConsumer {
@RabbitHandler public void receiveMessage(String message) { System.out.println("wechat接收到消息:" + message); }
}
|
完整代码:https://github.com/zhaoyangmushiyi/springboot-rabbitmq