同步/异步调用
同步调用
同步调用优点:
- 时效性强,可以立即得到结果
微服务键基于 Feign 的调用就属于同步方式,存在一些问题:
耦合度高
- 每次加入新需求,都要修改原来的代码
性能下降
- 调用者需要等待服务器提供响应,如果调用链过长则响应时间等于每次调用的时间之和(性能和吞吐能力下降)
资源浪费
- 调用链中的每个服务在等待响应过程中,不能释放请求占用的资源,高并发场景下极度浪费系统资源
级联失败
- 如果服务提供者出现问题,所有调用方都会跟着出问题,如同多米诺骨牌,迅速导致整个微服务群故障
异步调用
异步调用常见实现就是事件驱动模式
异步调用的优点:
- 解除耦合
- 提升性能
- 故障隔离
- 流量削峰
其缺点是:
- 依赖 Broker 的可靠性、安全性、吞吐能力
- 架构复杂了,业务没有明显的流程线,不好追踪管理
MQ
MQ (Message Queue),译为消息队列,字面来看就是存放消息的队列。
也就是事件驱动架构中的 Broker
- RabbitMQ
- RocketMQ
- Kafka
- ActiveMQ
RabbitMQ | ActiveMQ | RocketMQ | Kafka | |
---|---|---|---|---|
公司/社区 | Rabbit | Apache | 阿里 | Apache |
开发语言 | Erlang | Java | Java | Scala&Java |
协议支持 | AMQP, XMPP, SMTP, STOMP | OpenWire, STOMP, REST, XMPP, AMQP | 自定义协议 | 自定义协议 |
可用性 | 高 | 一般 | 高 | 高 |
单机吞吐量 | 一般 | 差 | 高 | 非常高 |
消息延迟 | 微秒级 | 毫秒级 | 毫秒级 | 毫秒以内 |
消息可靠性 | 高 | 一般 | 高 | 一般 |
常规业务只做消息队列的, 一般用 RabbitMQ 或 RocketMQ
RabbitMQ
使用 Docker 快速安装:
docker run \
-e RABBITMQ_DEFAULT_USER=voilone \
-e RABBITMQ_DEFAULT_PASS=123456 \
-v mq-plugins:/plugins \
--name mq \
--hostname mq \
-p 15672:15672 \
-p 5672:5672 \
-d \
rabbitmq:3.8-management
安装完成并启动后,可通过端口 15672
访问RabbitMQ 的控制台
RabbitMQ的结构与概念:
RabbitMQ的概念:
- channel:操作 MQ 的工具
- exchange:路由消息到队列中
- queue:缓存消息
- virtual host:虚拟主机,是对 queue、exchange 等资源的逻辑分组
常见消息模型
MQ 的官方文档中给出了 5 个 MQ 的 Demo 示例, 对应了几种不同的用法:
- 基本消息队列
- 工作消息队列
发布订阅(Publish,Subscribe),根据交换机类型不同分为三种
- Fanout Exchange:广播
- Direct Exchange:路由
- Topic Exchange:主题
入门
编写一个 Hello World 案例,包括三个角色:
- publisher:消息发布者,将消息发送到队列 queue
- queue:消息队列,复杂接受并缓存消息
- consumer:订阅队列,处理队列中的消息
flowchart LR
publisher ---> queue ---> consumer
这里使用 Java编写
先导入依赖:
<!-- - AMQP 依赖, 包含RabbitMQ -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
还包含 Spring Boot 依赖和 Spring 单元测试依赖
创建两个 Spring Boot 项目,一个作为发布者,一个作为消费者:
发布者单元测试:
public class PublisherTest {
@Test
public void testSendMessage() throws IOException, TimeoutException {
// 1.建立连接
ConnectionFactory factory = new ConnectionFactory();
// 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
factory.setHost("192.168.233.128");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("voilone");
factory.setPassword("123456");
// 1.2.建立连接
Connection connection = factory.newConnection();
// 2.创建通道Channel
Channel channel = connection.createChannel();
// 3.创建队列
String queueName = "simple.queue";
channel.queueDeclare(queueName, false, false, false, null);
// 4.发送消息
String message = "hello, rabbitmq!";
channel.basicPublish("", queueName, null, message.getBytes());
System.out.println("发送消息成功:【" + message + "】");
// 5.关闭通道和连接
channel.close();
connection.close();
}
}
消费者单元测试:
public class ConsumerTest {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.建立连接
ConnectionFactory factory = new ConnectionFactory();
// 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
factory.setHost("192.168.233.128");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("voilone");
factory.setPassword("123456");
// 1.2.建立连接
Connection connection = factory.newConnection();
// 2.创建通道Channel
Channel channel = connection.createChannel();
// 3.创建队列
String queueName = "simple.queue";
channel.queueDeclare(queueName, false, false, false, null);
// 4.订阅消息
channel.basicConsume(queueName, true, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
// 5.处理消息
String message = new String(body);
System.out.println("接收到消息:【" + message + "】");
}
});
System.out.println("等待接收消息。。。。");
}
}
基本消息队列的消息发送流程:
- 建立connection
- 创建channel
- 利用channel声明队列
- 利用channel向队列发送消息
基本消息队列的消息接收流程:
- 建立connection
- 创建channel
- 利用channel声明队列
- 定义consumer的消费行为handleDelivery()
- 利用channel将消费者与队列绑定
Spring AMQP
Spring AMQP是基于AMQP协议定义的一套API规范,提供了模板来发送和接收消息。包含两部分,其中spring-amqp是基础抽象,springrabbit是底层的默认实现。
Advanced Message Queuing Protocol, 是用于在应用程序之间传递业务消息的开放标准。该协议与语言和平台无关, 更符合微服务中独立性的要求。
基础消息队列
引入 Spring AMQP 依赖, 并添加配置:
spring:
rabbitmq:
host: 192.168.233.128 # 主机名
port: 5672 # 端口
virtual-host: / # 虚拟主机
username: voilone # 用户名
password: 123456 # 密码
在发布者项目中创建一个单测类,发送消息:
利用 RabbitTemplate 发送基本消息 publisher
@SpringBootTest
public class SpringAmqpTest {
@Autowired
private RabbitTemplate rabbitTemplate; // 借用 template 发消息
/**
* 基础消息队列
*/
@Test
public void testSimpleQueue() {
String queueName = "simple.queue"; // 队列名 (必须已有)
String message = "Hello, RibbitMQ";
rabbitTemplate.convertAndSend(queueName, message);
System.out.println("已发送消息至队列 " + queueName);
}
}
在消费者项目中,创建类,使用 @RabbitListener
接受消息:
@Component
public class SpringRabbitListener{
@RabbitListener(queues = "simple.queue")
public void receiveBasicMessageAndOne2One(String msg) {
System.out.println("接收到消息 " + msg);
}
}
工作队列
Work queue,工作队列,可提高消息处理速度,避免队列消息堆积
模拟 Work Queue,实现一个队列绑定多个消费者
多个消费者绑定到一个队列,同一条消息只会被一个消费者处理
模拟方法:
- 在 publisher 服务中定义测试方法, 每秒产生50条消息, 发送到
simple.queue
- 在 consumer 服务中定义两个消息监听者, 都监听
simple.queue
队列 - 消费者 1 每秒处理 50 条消息, 消费者 2 每秒处理 10 条消息
模拟运行后,会发现消费者 1 和消费者 2 的消息会被均分处理
work 1 接收到消息:【Work queue, 50 times per seconds0】2025-07-07T16:09:33.466246400
work 2 接收到消息:【Work queue, 50 times per seconds1】2025-07-07T16:09:33.475765700
work 1 接收到消息:【Work queue, 50 times per seconds2】2025-07-07T16:09:33.506358
work 1 接收到消息:【Work queue, 50 times per seconds4】2025-07-07T16:09:33.568549100
work 2 接收到消息:【Work queue, 50 times per seconds3】2025-07-07T16:09:33.577303400
work 1 接收到消息:【Work queue, 50 times per seconds6】2025-07-07T16:09:33.630413400
work 2 接收到消息:【Work queue, 50 times per seconds5】2025-07-07T16:09:33.678346300
work 1 接收到消息:【Work queue, 50 times per seconds8】2025-07-07T16:09:33.690692800
work 1 接收到消息:【Work queue, 50 times per seconds10】2025-07-07T16:09:33.755916400
work 2 接收到消息:【Work queue, 50 times per seconds7】2025-07-07T16:09:33.785482800
...
默认情况下,RabbitMQ 会把所有消息预先进行均分处理
我们可以配置 application.yml
来控制消费预取限制:
spring:
rabbitmq:
listener:
simple:
# 消费预取机制,处理完这条立马去获取下一条,谁的速度快谁就多处理(能者多劳)
prefetch: 1
配置完成后,输出结果为:
work 1 接收到消息:【Work queue, 50 times per seconds0】2025-07-07T16:17:46.955150400
work 2 接收到消息:【Work queue, 50 times per seconds1】2025-07-07T16:17:46.964781900
work 1 接收到消息:【Work queue, 50 times per seconds2】2025-07-07T16:17:46.994906500
work 1 接收到消息:【Work queue, 50 times per seconds3】2025-07-07T16:17:47.026479500
work 1 接收到消息:【Work queue, 50 times per seconds4】2025-07-07T16:17:47.056551500
work 1 接收到消息:【Work queue, 50 times per seconds5】2025-07-07T16:17:47.086976900
...
通过 prefetch 来控制消费者预取的消息数量,实现能者多劳
广播模式
Fanout Exchange 会将接受到的消息广播道每一个跟其绑定的 queue
Fanout 这里译为 广播,Exchange 为交换机
一旦消息发送,所有消费者都会接受到其队列消息
模拟 Fanout Exchange:
- 在 consumer 服务中,利用代码声明队列、交换机,并将两者绑定
- 在 consumer 服务中,编写两个消费方法,分别监听
fanout.queue1
和fanout.queue2
- 在 publisher 中编写测试方法,向
voilone.fanout
发送消息
我们要在 consumer 服务中创建配置类,来配置交换机和队列, 并绑定关系:
@Configuration
public class ExchangeConfig {
// 声明交换机 voilone.fanout
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("voilone.fanout");
}
// 声明队列:fanout.queue1
@Bean
public Queue fanoutQueue1() {
return new Queue("fanout.queue1");
}
// 绑定关系
@Bean
public Binding bindingFanoutQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
}
// 声明队列:fanout.queue2
@Bean
public Queue fanoutQueue2() {
return new Queue("fanout.queue2");
}
// 绑定
@Bean
public Binding bindingFanoutQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
}
}
注意一点,在发送消息时,要有三个入参,才是发送到 Exchange 中
// 若是两个参,第一个参数就是队列名
rabbitTemplate.convertAndSend(exchangeName, "", message);
交换机的作用:
- 接收 publisher 发送的消息
- 将消息按照规则路由到与之绑定的队列
- 不能缓存消息,路由失败,消息丢失
- Fanout Exchange 会将消息路由到每个绑定的队列
路由模式
广播模式在经过交换机时,没有设置 routingKey
,因此能够实现消息广播
路由模式,即 Direct Exchange,会将接收到的消息根据规则路由到指定的 Queue
构建消费者:
@Component
public class DirectListener {
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue1"),
exchange = @Exchange(name = "voilone.direct", type = ExchangeTypes.DIRECT),
key = {"red", "blue"}
))
public void testConsumerDirectQueue1(String msg) {
System.out.println("direct-q1 receive msg:" + msg);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue2"),
exchange = @Exchange(name = "voilone.direct", type = ExchangeTypes.DIRECT),
key = {"red", "yellow"}
))
public void testConsumerDirectQueue2(String msg) {
System.err.println("direct-q2 receive msg:" + msg);
}
}
这里注解的 bindings 做了队列的声明工作,因此可不用额外写配置代码
publisher 发送消息:
rabbitTemplate.convertAndSend(exchangeName, "blue", message);
Direct 交换机与 Fanout 交换机的差异:
- Fanout 交换机将消息路由给每一个与之绑定的队列
- Direct 交换机根据
RoutingKey
判断路由给哪个队列 - 如果多个队列具有相同的RoutingKey,则与Fanout功能类似
主题模式
与路由模式不同,Topic 支持通配符路由
模拟 Topic Exchange:
- 声明对应的消息监听器
- 在 consumer 服务中,编写两个消费者方法,分别监听
topic.queue1
和topic.queue2
- 在 publisher 中编写测试方法,向
voilone.topic
发送消息
创建 Topic Exchange 监听器:
@Component
public class TopicMqListener {
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue1"),
exchange = @Exchange(name = "voilone.topic", type = ExchangeTypes.TOPIC),
key = "china.#"
))
public void testConsumerTopicQueue1(String msg) {
System.out.println("topic-q1 receive msg:" + msg);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue2"),
exchange = @Exchange(name = "voilone.topic", type = ExchangeTypes.TOPIC),
key = "#.news"
))
public void testConsumerTopicQueue2(String msg) {
System.err.println("topic-q2 receive msg:" + msg);
}
}
通配符:
#
代指 0 个或多个单词*
代指一个单词
以.
分割单词,比如china.news.now
,路由键为china.#
匹配的单词为 news 和 now
发送消息:
rabbitTemplate.convertAndSend("voilone.topic", "china.news", message);
对象消息
实际项目中,用到最多的是对象消息
先在 RabbitMQ 中声明队列 object.queue
然后在 publisher 中创建测试方法, 发送对象消息:
@Test
public void testSendObjectMessage() {
// 填充消息(默认16。要存的数据如果明显小于16对应负载因子的大小-12,建议声明时指定大小)
// 要存储元素个数/0.75
Map<String, Object> message = new HashMap<>(4);
message.put("name", "voilone");
message.put("year", 2025);
message.put("state", "programing");
rabbitTemplate.convertAndSend("object.queue", message);
}
Spring 的对消息对象的处理是由 org.springframework.amqp.support.converter.MessageConverter
来处理的。而默认实现是 SimpleMessageConverter
, 基于 JDK 的 ObjectOutputStream
完成序列化。
定义一个 MessageConverter
类型的 Bean 即可指定序列化。推荐 JSON 方式的进行序列化
在 publisher 中引入依赖 jackson-databind
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
在启动类中添加序列化方法:
@Bean
public MessageConverter jsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
在 consumer 服务中重复 publisher 的序列化操作,以接收 JSON 序列化消息
最后接受对象消息:
@Component
public class ObjectListener {
@RabbitListener(queues = "object.queue")
public void testConsumerObjectMessage(Map<String, Object> message) {
System.out.println("receive object message:" + message);
}
}