Loading... ## 同步/异步调用 ### 同步调用 同步调用优点: - 时效性强,可以立即得到结果 微服务键基于 Feign 的调用就属于同步方式,存在一些问题: - 耦合度高 - 每次加入新需求,都要修改原来的代码 - 性能下降 - 调用者需要等待服务器提供响应,如果调用链过长则响应时间等于每次调用的时间之和(性能和吞吐能力下降) - 资源浪费 - 调用链中的每个服务在等待响应过程中,不能释放请求占用的资源,高并发场景下极度浪费系统资源 - 级联失败 - 如果服务提供者出现问题,所有调用方都会跟着出问题,如同多米诺骨牌,迅速导致整个微服务群故障 --- ### 异步调用 异步调用常见实现就是事件驱动模式 异步调用的优点: * 解除耦合 * 提升性能 * 故障隔离 * 流量削峰 其缺点是: - 依赖 Broker 的可靠性、安全性、吞吐能力 - 架构复杂了,业务没有明显的流程线,不好追踪管理 --- ## MQ MQ (**M**essage **Q**ueue),译为消息队列,字面来看就是存放消息的队列。 也就是事件驱动架构中的 Broker - RabbitMQ - RocketMQ - Kafka - ActiveMQ | | RabbitMQ | ActiveMQ | RocketMQ | Kafka | | :--------: | :---------------------: | :-------------------------------: | :--------: | :--------: | | 公司/社区 | Rabbit | Apache | 阿里 | Apache | | 开发语言 | Erlang | Java | Java | Scala&Java | | 协议支持 | AMQP, XMPP, SMTP, STOMP | OpenWire, STOMP, REST, XMPP, AMQP | 自定义协议 | 自定义协议 | | 可用性 | 高 | 一般 | 高 | 高 | | 单机吞吐量 | 一般 | 差 | 高 | 非常高 | | 消息延迟 | 微秒级 | 毫秒级 | 毫秒级 | 毫秒以内 | | 消息可靠性 | 高 | 一般 | 高 | 一般 | > 常规业务只做消息队列的, 一般用 RabbitMQ 或 RocketMQ --- ## RabbitMQ 使用 Docker 快速安装: ```bash docker run \ -e RABBITMQ_DEFAULT_USER=voilone \ -e RABBITMQ_DEFAULT_PASS=123456 \ -v mq-plugins:/plugins \ --name mq \ --hostname mq \ -p 15672:15672 \ -p 5672:5672 \ -d \ rabbitmq:3.8-management ``` 安装完成并启动后,可通过端口 `15672` 访问RabbitMQ 的控制台 --- RabbitMQ的结构与概念:  RabbitMQ的概念: - channel:操作 MQ 的工具 - exchange:路由消息到队列中 - queue:缓存消息 - virtual host:虚拟主机,是对 queue、exchange 等资源的逻辑分组 --- ### 常见消息模型 MQ 的官方文档中给出了 5 个 MQ 的 Demo 示例, 对应了几种不同的用法: 1. 基本消息队列  2. 工作消息队列  - 发布订阅(Publish,Subscribe),根据交换机类型不同分为三种 3. Fanout Exchange:广播 4. Direct Exchange:路由 5. Topic Exchange:主题    --- ### 入门 编写一个 Hello World 案例,包括三个角色: - publisher:消息发布者,将消息发送到队列 queue - queue:消息队列,复杂接受并缓存消息 - consumer:订阅队列,处理队列中的消息 ```mermaid flowchart LR publisher ---> queue ---> consumer ``` 这里使用 Java编写 先导入依赖: ```xml <!-- - AMQP 依赖, 包含RabbitMQ --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> ``` > 还包含 Spring Boot 依赖和 Spring 单元测试依赖 创建两个 Spring Boot 项目,一个作为发布者,一个作为消费者: 发布者单元测试: ```java public class PublisherTest { @Test public void testSendMessage() throws IOException, TimeoutException { // 1.建立连接 ConnectionFactory factory = new ConnectionFactory(); // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码 factory.setHost("192.168.233.128"); factory.setPort(5672); factory.setVirtualHost("/"); factory.setUsername("voilone"); factory.setPassword("123456"); // 1.2.建立连接 Connection connection = factory.newConnection(); // 2.创建通道Channel Channel channel = connection.createChannel(); // 3.创建队列 String queueName = "simple.queue"; channel.queueDeclare(queueName, false, false, false, null); // 4.发送消息 String message = "hello, rabbitmq!"; channel.basicPublish("", queueName, null, message.getBytes()); System.out.println("发送消息成功:【" + message + "】"); // 5.关闭通道和连接 channel.close(); connection.close(); } } ``` 消费者单元测试: ```java public class ConsumerTest { public static void main(String[] args) throws IOException, TimeoutException { // 1.建立连接 ConnectionFactory factory = new ConnectionFactory(); // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码 factory.setHost("192.168.233.128"); factory.setPort(5672); factory.setVirtualHost("/"); factory.setUsername("voilone"); factory.setPassword("123456"); // 1.2.建立连接 Connection connection = factory.newConnection(); // 2.创建通道Channel Channel channel = connection.createChannel(); // 3.创建队列 String queueName = "simple.queue"; channel.queueDeclare(queueName, false, false, false, null); // 4.订阅消息 channel.basicConsume(queueName, true, new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // 5.处理消息 String message = new String(body); System.out.println("接收到消息:【" + message + "】"); } }); System.out.println("等待接收消息。。。。"); } } ``` 基本消息队列的消息**发送流程**: 1. 建立connection 2. 创建channel 3. 利用channel声明队列 4. 利用channel向队列发送消息 基本消息队列的消息**接收流程**: 1. 建立connection 2. 创建channel 3. 利用channel声明队列 4. 定义consumer的消费行为handleDelivery() 5. 利用channel将消费者与队列绑定 --- ### Spring AMQP Spring AMQP是基于AMQP协议定义的一套API规范,提供了模板来发送和接收消息。包含两部分,其中spring-amqp是基础抽象,springrabbit是底层的默认实现。 > **Advanced Message Queuing Protocol**, 是用于在应用程序之间传递业务消息的开放标准。该协议与语言和平台无关, 更符合微服务中独立性的要求。 --- #### 基础消息队列 引入 Spring AMQP 依赖, 并添加配置: ```yaml spring: rabbitmq: host: 192.168.233.128 # 主机名 port: 5672 # 端口 virtual-host: / # 虚拟主机 username: voilone # 用户名 password: 123456 # 密码 ``` 在发布者项目中创建一个单测类,发送消息: 利用 RabbitTemplate 发送基本消息 `publisher` ```java @SpringBootTest public class SpringAmqpTest { @Autowired private RabbitTemplate rabbitTemplate; // 借用 template 发消息 /** * 基础消息队列 */ @Test public void testSimpleQueue() { String queueName = "simple.queue"; // 队列名 (必须已有) String message = "Hello, RibbitMQ"; rabbitTemplate.convertAndSend(queueName, message); System.out.println("已发送消息至队列 " + queueName); } } ``` 在消费者项目中,创建类,使用 `@RabbitListener` 接受消息: ```java @Component public class SpringRabbitListener{ @RabbitListener(queues = "simple.queue") public void receiveBasicMessageAndOne2One(String msg) { System.out.println("接收到消息 " + msg); } } ``` --- #### 工作队列 Work queue,工作队列,可提高消息处理速度,避免队列消息堆积  模拟 Work Queue,实现一个队列绑定多个消费者 多个消费者绑定到一个队列,同一条消息只会被一个消费者处理 --- 模拟方法: 1. 在 publisher 服务中定义测试方法, 每秒产生50条消息, 发送到 `simple.queue` 2. 在 consumer 服务中定义两个消息监听者, 都监听 `simple.queue` 队列 3. 消费者 1 每秒处理 50 条消息, 消费者 2 每秒处理 10 条消息 模拟运行后,会发现消费者 1 和消费者 2 的消息会被均分处理 ```plaintext work 1 接收到消息:【Work queue, 50 times per seconds0】2025-07-07T16:09:33.466246400 work 2 接收到消息:【Work queue, 50 times per seconds1】2025-07-07T16:09:33.475765700 work 1 接收到消息:【Work queue, 50 times per seconds2】2025-07-07T16:09:33.506358 work 1 接收到消息:【Work queue, 50 times per seconds4】2025-07-07T16:09:33.568549100 work 2 接收到消息:【Work queue, 50 times per seconds3】2025-07-07T16:09:33.577303400 work 1 接收到消息:【Work queue, 50 times per seconds6】2025-07-07T16:09:33.630413400 work 2 接收到消息:【Work queue, 50 times per seconds5】2025-07-07T16:09:33.678346300 work 1 接收到消息:【Work queue, 50 times per seconds8】2025-07-07T16:09:33.690692800 work 1 接收到消息:【Work queue, 50 times per seconds10】2025-07-07T16:09:33.755916400 work 2 接收到消息:【Work queue, 50 times per seconds7】2025-07-07T16:09:33.785482800 ... ``` 默认情况下,RabbitMQ 会把所有消息预先进行均分处理 我们可以配置 `application.yml` 来控制消费预取限制: ```yaml spring: rabbitmq: listener: simple: # 消费预取机制,处理完这条立马去获取下一条,谁的速度快谁就多处理(能者多劳) prefetch: 1 ``` 配置完成后,输出结果为: ```plaintext work 1 接收到消息:【Work queue, 50 times per seconds0】2025-07-07T16:17:46.955150400 work 2 接收到消息:【Work queue, 50 times per seconds1】2025-07-07T16:17:46.964781900 work 1 接收到消息:【Work queue, 50 times per seconds2】2025-07-07T16:17:46.994906500 work 1 接收到消息:【Work queue, 50 times per seconds3】2025-07-07T16:17:47.026479500 work 1 接收到消息:【Work queue, 50 times per seconds4】2025-07-07T16:17:47.056551500 work 1 接收到消息:【Work queue, 50 times per seconds5】2025-07-07T16:17:47.086976900 ... ``` > 通过 prefetch 来控制消费者预取的消息数量,实现**能者多劳** --- #### 广播模式 Fanout Exchange 会将接受到的消息广播道每一个跟其绑定的 queue > Fanout 这里译为 广播,Exchange 为交换机  一旦消息发送,所有消费者都会接受到其队列消息 模拟 Fanout Exchange: 1. 在 consumer 服务中,利用代码声明队列、交换机,并将两者绑定 2. 在 consumer 服务中,编写两个消费方法,分别监听 `fanout.queue1` 和 `fanout.queue2` 3. 在 publisher 中编写测试方法,向 `voilone.fanout` 发送消息 我们要在 consumer 服务中创建配置类,来配置交换机和队列, 并绑定关系: ```java @Configuration public class ExchangeConfig { // 声明交换机 voilone.fanout @Bean public FanoutExchange fanoutExchange() { return new FanoutExchange("voilone.fanout"); } // 声明队列:fanout.queue1 @Bean public Queue fanoutQueue1() { return new Queue("fanout.queue1"); } // 绑定关系 @Bean public Binding bindingFanoutQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange) { return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange); } // 声明队列:fanout.queue2 @Bean public Queue fanoutQueue2() { return new Queue("fanout.queue2"); } // 绑定 @Bean public Binding bindingFanoutQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange) { return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange); } } ``` 注意一点,在发送消息时,要有三个入参,才是发送到 Exchange 中 ```java // 若是两个参,第一个参数就是队列名 rabbitTemplate.convertAndSend(exchangeName, "", message); ``` --- 交换机的作用: * 接收 publisher 发送的消息 * 将消息按照规则路由到与之绑定的队列 * 不能缓存消息,路由失败,消息丢失 * Fanout Exchange 会将消息路由到每个绑定的队列 --- #### 路由模式 广播模式在经过交换机时,没有设置 `routingKey`,因此能够实现消息广播 路由模式,即 Direct Exchange,会将**接收到的消息根据规则路由**到指定的 Queue 构建消费者: ```java @Component public class DirectListener { @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "direct.queue1"), exchange = @Exchange(name = "voilone.direct", type = ExchangeTypes.DIRECT), key = {"red", "blue"} )) public void testConsumerDirectQueue1(String msg) { System.out.println("direct-q1 receive msg:" + msg); } @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "direct.queue2"), exchange = @Exchange(name = "voilone.direct", type = ExchangeTypes.DIRECT), key = {"red", "yellow"} )) public void testConsumerDirectQueue2(String msg) { System.err.println("direct-q2 receive msg:" + msg); } } ``` > 这里注解的 bindings 做了队列的声明工作,因此可不用额外写配置代码 publisher 发送消息: ```java rabbitTemplate.convertAndSend(exchangeName, "blue", message); ``` --- Direct 交换机与 Fanout 交换机的**差异**: * Fanout 交换机将消息路由给每一个与之绑定的队列 - Direct 交换机根据 `RoutingKey` 判断路由给哪个队列 * 如果多个队列具有相同的RoutingKey,则与Fanout功能类似 --- #### 主题模式 与路由模式不同,Topic 支持通配符路由 模拟 Topic Exchange: 1. 声明对应的消息监听器 2. 在 consumer 服务中,编写两个消费者方法,分别监听 `topic.queue1` 和 `topic.queue2` 3. 在 publisher 中编写测试方法,向 `voilone.topic` 发送消息 创建 Topic Exchange 监听器: ```java @Component public class TopicMqListener { @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "topic.queue1"), exchange = @Exchange(name = "voilone.topic", type = ExchangeTypes.TOPIC), key = "china.#" )) public void testConsumerTopicQueue1(String msg) { System.out.println("topic-q1 receive msg:" + msg); } @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "topic.queue2"), exchange = @Exchange(name = "voilone.topic", type = ExchangeTypes.TOPIC), key = "#.news" )) public void testConsumerTopicQueue2(String msg) { System.err.println("topic-q2 receive msg:" + msg); } } ``` 通配符: - `#` 代指 0 个或多个单词 - `*` 代指一个单词 > 以 `.` 分割单词,比如 `china.news.now`,路由键为 `china.#` 匹配的单词为 news 和 now 发送消息: ```java rabbitTemplate.convertAndSend("voilone.topic", "china.news", message); ``` --- ### 对象消息 实际项目中,用到最多的是对象消息 先在 RabbitMQ 中声明队列 `object.queue` 然后在 publisher 中创建测试方法, 发送对象消息: ```java @Test public void testSendObjectMessage() { // 填充消息(默认16。要存的数据如果明显小于16对应负载因子的大小-12,建议声明时指定大小) // 要存储元素个数/0.75 Map<String, Object> message = new HashMap<>(4); message.put("name", "voilone"); message.put("year", 2025); message.put("state", "programing"); rabbitTemplate.convertAndSend("object.queue", message); } ``` Spring 的对消息对象的处理是由 `org.springframework.amqp.support.converter.MessageConverter` 来处理的。而默认实现是 `SimpleMessageConverter`, 基于 JDK 的 `ObjectOutputStream` 完成序列化。 定义一个 `MessageConverter` 类型的 Bean 即可指定序列化。推荐 JSON 方式的进行序列化 在 publisher 中引入依赖 `jackson-databind` ```xml <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> </dependency> ``` 在启动类中添加序列化方法: ```java @Bean public MessageConverter jsonMessageConverter() { return new Jackson2JsonMessageConverter(); } ``` --- 在 consumer 服务中重复 publisher 的序列化操作,以接收 JSON 序列化消息 最后接受对象消息: ```java @Component public class ObjectListener { @RabbitListener(queues = "object.queue") public void testConsumerObjectMessage(Map<String, Object> message) { System.out.println("receive object message:" + message); } } ``` 最后修改:2026 年 03 月 13 日 © 允许规范转载 赞 1 如果觉得我的文章对你有用,请随意赞赏