同步/异步调用

同步调用

同步调用优点:

  • 时效性强,可以立即得到结果

微服务键基于 Feign 的调用就属于同步方式,存在一些问题:

  • 耦合度高

    • 每次加入新需求,都要修改原来的代码
  • 性能下降

    • 调用者需要等待服务器提供响应,如果调用链过长则响应时间等于每次调用的时间之和(性能和吞吐能力下降)
  • 资源浪费

    • 调用链中的每个服务在等待响应过程中,不能释放请求占用的资源,高并发场景下极度浪费系统资源
  • 级联失败

    • 如果服务提供者出现问题,所有调用方都会跟着出问题,如同多米诺骨牌,迅速导致整个微服务群故障

异步调用

异步调用常见实现就是事件驱动模式

异步调用的优点:

  • 解除耦合
  • 提升性能
  • 故障隔离
  • 流量削峰

其缺点是:

  • 依赖 Broker 的可靠性、安全性、吞吐能力
  • 架构复杂了,业务没有明显的流程线,不好追踪管理

MQ

MQ (Message Queue),译为消息队列,字面来看就是存放消息的队列。

也就是事件驱动架构中的 Broker

  • RabbitMQ
  • RocketMQ
  • Kafka
  • ActiveMQ
RabbitMQActiveMQRocketMQKafka
公司/社区RabbitApache阿里Apache
开发语言ErlangJavaJavaScala&Java
协议支持AMQP, XMPP, SMTP, STOMPOpenWire, STOMP, REST, XMPP, AMQP自定义协议自定义协议
可用性一般
单机吞吐量一般非常高
消息延迟微秒级毫秒级毫秒级毫秒以内
消息可靠性一般一般
常规业务只做消息队列的, 一般用 RabbitMQ 或 RocketMQ

RabbitMQ

使用 Docker 快速安装:

docker run \
  -e RABBITMQ_DEFAULT_USER=voilone \
  -e RABBITMQ_DEFAULT_PASS=123456 \
  -v mq-plugins:/plugins \
  --name mq \
  --hostname mq \
  -p 15672:15672 \
  -p 5672:5672 \
  -d \
  rabbitmq:3.8-management

安装完成并启动后,可通过端口 15672 访问RabbitMQ 的控制台


RabbitMQ的结构与概念:

Rabbit Broker.png

RabbitMQ的概念:

  • channel:操作 MQ 的工具
  • exchange:路由消息到队列中
  • queue:缓存消息
  • virtual host:虚拟主机,是对 queue、exchange 等资源的逻辑分组

常见消息模型

MQ 的官方文档中给出了 5 个 MQ 的 Demo 示例, 对应了几种不同的用法:

  1. 基本消息队列

消息模型1.png

  1. 工作消息队列

消息模型2.png

  • 发布订阅(Publish,Subscribe),根据交换机类型不同分为三种

    1. Fanout Exchange:广播
    2. Direct Exchange:路由
    3. Topic Exchange:主题

消息模型3.png

消息模型4.png

消息模型5.png


入门

编写一个 Hello World 案例,包括三个角色:

  • publisher:消息发布者,将消息发送到队列 queue
  • queue:消息队列,复杂接受并缓存消息
  • consumer:订阅队列,处理队列中的消息
flowchart LR

publisher ---> queue ---> consumer

这里使用 Java编写

先导入依赖:

<!-- - AMQP 依赖, 包含RabbitMQ -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
还包含 Spring Boot 依赖和 Spring 单元测试依赖

创建两个 Spring Boot 项目,一个作为发布者,一个作为消费者:

发布者单元测试:

public class PublisherTest {
    @Test
    public void testSendMessage() throws IOException, TimeoutException {
        // 1.建立连接
        ConnectionFactory factory = new ConnectionFactory();
        // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
        factory.setHost("192.168.233.128");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("voilone");
        factory.setPassword("123456");
        // 1.2.建立连接
        Connection connection = factory.newConnection();

        // 2.创建通道Channel
        Channel channel = connection.createChannel();

        // 3.创建队列
        String queueName = "simple.queue";
        channel.queueDeclare(queueName, false, false, false, null);

        // 4.发送消息
        String message = "hello, rabbitmq!";
        channel.basicPublish("", queueName, null, message.getBytes());
        System.out.println("发送消息成功:【" + message + "】");

        // 5.关闭通道和连接
        channel.close();
        connection.close();

    }
}

消费者单元测试:

public class ConsumerTest {

    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.建立连接
        ConnectionFactory factory = new ConnectionFactory();
        // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
        factory.setHost("192.168.233.128");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("voilone");
        factory.setPassword("123456");
        // 1.2.建立连接
        Connection connection = factory.newConnection();

        // 2.创建通道Channel
        Channel channel = connection.createChannel();

        // 3.创建队列
        String queueName = "simple.queue";
        channel.queueDeclare(queueName, false, false, false, null);

        // 4.订阅消息
        channel.basicConsume(queueName, true, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                // 5.处理消息
                String message = new String(body);
                System.out.println("接收到消息:【" + message + "】");
            }
        });
        System.out.println("等待接收消息。。。。");
    }
}

基本消息队列的消息发送流程:

  1. 建立connection
  2. 创建channel
  3. 利用channel声明队列
  4. 利用channel向队列发送消息

基本消息队列的消息接收流程:

  1. 建立connection
  2. 创建channel
  3. 利用channel声明队列
  4. 定义consumer的消费行为handleDelivery()
  5. 利用channel将消费者与队列绑定

Spring AMQP

Spring AMQP是基于AMQP协议定义的一套API规范,提供了模板来发送和接收消息。包含两部分,其中spring-amqp是基础抽象,springrabbit是底层的默认实现。

Advanced Message Queuing Protocol, 是用于在应用程序之间传递业务消息的开放标准。该协议与语言和平台无关, 更符合微服务中独立性的要求。

基础消息队列

引入 Spring AMQP 依赖, 并添加配置:

spring:
  rabbitmq:
    host: 192.168.233.128 # 主机名
    port: 5672 # 端口
    virtual-host: / # 虚拟主机
    username: voilone # 用户名
    password: 123456  # 密码

在发布者项目中创建一个单测类,发送消息:

利用 RabbitTemplate 发送基本消息 publisher

@SpringBootTest
public class SpringAmqpTest {

  @Autowired
  private RabbitTemplate rabbitTemplate; // 借用 template 发消息

  /**
   * 基础消息队列
   */
  @Test
  public void testSimpleQueue() {
    String queueName = "simple.queue"; // 队列名 (必须已有)
    String message = "Hello, RibbitMQ";
    rabbitTemplate.convertAndSend(queueName, message);
    System.out.println("已发送消息至队列 " + queueName);
  }
}

在消费者项目中,创建类,使用 @RabbitListener 接受消息:

@Component
public class SpringRabbitListener{

  @RabbitListener(queues = "simple.queue")
  public void receiveBasicMessageAndOne2One(String msg) {
    System.out.println("接收到消息 " + msg);
  }
}

工作队列

Work queue,工作队列,可提高消息处理速度,避免队列消息堆积

消息模型2.png

模拟 Work Queue,实现一个队列绑定多个消费者

多个消费者绑定到一个队列,同一条消息只会被一个消费者处理


模拟方法:

  1. 在 publisher 服务中定义测试方法, 每秒产生50条消息, 发送到 simple.queue
  2. 在 consumer 服务中定义两个消息监听者, 都监听 simple.queue 队列
  3. 消费者 1 每秒处理 50 条消息, 消费者 2 每秒处理 10 条消息

模拟运行后,会发现消费者 1 和消费者 2 的消息会被均分处理

work 1 接收到消息:【Work queue, 50 times per seconds0】2025-07-07T16:09:33.466246400
work 2 接收到消息:【Work queue, 50 times per seconds1】2025-07-07T16:09:33.475765700
work 1 接收到消息:【Work queue, 50 times per seconds2】2025-07-07T16:09:33.506358
work 1 接收到消息:【Work queue, 50 times per seconds4】2025-07-07T16:09:33.568549100
work 2 接收到消息:【Work queue, 50 times per seconds3】2025-07-07T16:09:33.577303400
work 1 接收到消息:【Work queue, 50 times per seconds6】2025-07-07T16:09:33.630413400
work 2 接收到消息:【Work queue, 50 times per seconds5】2025-07-07T16:09:33.678346300
work 1 接收到消息:【Work queue, 50 times per seconds8】2025-07-07T16:09:33.690692800
work 1 接收到消息:【Work queue, 50 times per seconds10】2025-07-07T16:09:33.755916400
work 2 接收到消息:【Work queue, 50 times per seconds7】2025-07-07T16:09:33.785482800
...

默认情况下,RabbitMQ 会把所有消息预先进行均分处理

我们可以配置 application.yml 来控制消费预取限制:

spring:
  rabbitmq:
    listener:
      simple:
        # 消费预取机制,处理完这条立马去获取下一条,谁的速度快谁就多处理(能者多劳)  
        prefetch: 1 

配置完成后,输出结果为:

work 1 接收到消息:【Work queue, 50 times per seconds0】2025-07-07T16:17:46.955150400
work 2 接收到消息:【Work queue, 50 times per seconds1】2025-07-07T16:17:46.964781900
work 1 接收到消息:【Work queue, 50 times per seconds2】2025-07-07T16:17:46.994906500
work 1 接收到消息:【Work queue, 50 times per seconds3】2025-07-07T16:17:47.026479500
work 1 接收到消息:【Work queue, 50 times per seconds4】2025-07-07T16:17:47.056551500
work 1 接收到消息:【Work queue, 50 times per seconds5】2025-07-07T16:17:47.086976900
...
通过 prefetch 来控制消费者预取的消息数量,实现能者多劳

广播模式

Fanout Exchange 会将接受到的消息广播道每一个跟其绑定的 queue

Fanout 这里译为 广播,Exchange 为交换机

Fanout Exchange.png

一旦消息发送,所有消费者都会接受到其队列消息

模拟 Fanout Exchange:

  1. 在 consumer 服务中,利用代码声明队列、交换机,并将两者绑定
  2. 在 consumer 服务中,编写两个消费方法,分别监听 fanout.queue1fanout.queue2
  3. 在 publisher 中编写测试方法,向 voilone.fanout 发送消息

我们要在 consumer 服务中创建配置类,来配置交换机和队列, 并绑定关系:

@Configuration
public class ExchangeConfig {

    // 声明交换机 voilone.fanout
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("voilone.fanout");
    }

    // 声明队列:fanout.queue1
    @Bean
    public Queue fanoutQueue1() {
        return new Queue("fanout.queue1");
    }

    // 绑定关系
    @Bean
    public Binding bindingFanoutQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
    }

    // 声明队列:fanout.queue2
    @Bean
    public Queue fanoutQueue2() {
        return new Queue("fanout.queue2");
    }

    // 绑定
    @Bean
    public Binding bindingFanoutQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
    }
}

注意一点,在发送消息时,要有三个入参,才是发送到 Exchange 中

// 若是两个参,第一个参数就是队列名
rabbitTemplate.convertAndSend(exchangeName, "", message);

交换机的作用:

  • 接收 publisher 发送的消息
  • 将消息按照规则路由到与之绑定的队列
  • 不能缓存消息,路由失败,消息丢失
  • Fanout Exchange 会将消息路由到每个绑定的队列

路由模式

广播模式在经过交换机时,没有设置 routingKey,因此能够实现消息广播

路由模式,即 Direct Exchange,会将接收到的消息根据规则路由到指定的 Queue

构建消费者:

@Component
public class DirectListener {
  @RabbitListener(bindings = @QueueBinding(
          value = @Queue(name = "direct.queue1"),
          exchange = @Exchange(name = "voilone.direct", type = ExchangeTypes.DIRECT),
          key = {"red", "blue"} 
  ))
  public void testConsumerDirectQueue1(String msg) {
    System.out.println("direct-q1 receive msg:" + msg);
  }

  @RabbitListener(bindings = @QueueBinding(
          value = @Queue(name = "direct.queue2"),
          exchange = @Exchange(name = "voilone.direct", type = ExchangeTypes.DIRECT),
          key = {"red", "yellow"}
  ))
  public void testConsumerDirectQueue2(String msg) {
    System.err.println("direct-q2 receive msg:" + msg);
  }
}
这里注解的 bindings 做了队列的声明工作,因此可不用额外写配置代码

publisher 发送消息:

rabbitTemplate.convertAndSend(exchangeName, "blue", message);

Direct 交换机与 Fanout 交换机的差异

  • Fanout 交换机将消息路由给每一个与之绑定的队列
  • Direct 交换机根据 RoutingKey 判断路由给哪个队列
  • 如果多个队列具有相同的RoutingKey,则与Fanout功能类似

主题模式

与路由模式不同,Topic 支持通配符路由

模拟 Topic Exchange:

  1. 声明对应的消息监听器
  2. 在 consumer 服务中,编写两个消费者方法,分别监听 topic.queue1topic.queue2
  3. 在 publisher 中编写测试方法,向 voilone.topic 发送消息

创建 Topic Exchange 监听器:

@Component
public class TopicMqListener {
  @RabbitListener(bindings = @QueueBinding(
          value = @Queue(name = "topic.queue1"),
          exchange = @Exchange(name = "voilone.topic", type = ExchangeTypes.TOPIC),
          key = "china.#"
  ))
  public void testConsumerTopicQueue1(String msg) {
    System.out.println("topic-q1 receive msg:" + msg);
  }

  @RabbitListener(bindings = @QueueBinding(
          value = @Queue(name = "topic.queue2"),
          exchange = @Exchange(name = "voilone.topic", type = ExchangeTypes.TOPIC),
          key = "#.news"
  ))
  public void testConsumerTopicQueue2(String msg) {
    System.err.println("topic-q2 receive msg:" + msg);
  }
}

通配符:

  • # 代指 0 个或多个单词
  • * 代指一个单词
. 分割单词,比如 china.news.now,路由键为 china.# 匹配的单词为 news 和 now

发送消息:

rabbitTemplate.convertAndSend("voilone.topic", "china.news", message);

对象消息

实际项目中,用到最多的是对象消息

先在 RabbitMQ 中声明队列 object.queue

然后在 publisher 中创建测试方法, 发送对象消息:

@Test
public void testSendObjectMessage() {
  // 填充消息(默认16。要存的数据如果明显小于16对应负载因子的大小-12,建议声明时指定大小)
  // 要存储元素个数/0.75
  Map<String, Object> message = new HashMap<>(4);
  message.put("name", "voilone");
  message.put("year", 2025);
  message.put("state", "programing");

  rabbitTemplate.convertAndSend("object.queue", message);
}

Spring 的对消息对象的处理是由 org.springframework.amqp.support.converter.MessageConverter 来处理的。而默认实现是 SimpleMessageConverter, 基于 JDK 的 ObjectOutputStream 完成序列化。

定义一个 MessageConverter 类型的 Bean 即可指定序列化。推荐 JSON 方式的进行序列化

在 publisher 中引入依赖 jackson-databind

<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
</dependency>

在启动类中添加序列化方法:

 @Bean
public MessageConverter jsonMessageConverter() {
  return new Jackson2JsonMessageConverter();
}

在 consumer 服务中重复 publisher 的序列化操作,以接收 JSON 序列化消息

最后接受对象消息:

@Component
public class ObjectListener {
  @RabbitListener(queues = "object.queue")
  public void testConsumerObjectMessage(Map<String, Object> message) {
    System.out.println("receive object message:" + message);
  }
}
最后修改:2025 年 07 月 07 日
如果觉得我的文章对你有用,请随意赞赏