SpringBoot 整合 RabbitMQ

SpringBoot 整合 RabbitMQ

leo 2522 2021-06-08

环境

RabbitMQ
如果使用 docker-compose 部署,可以参考我的这篇文章:docker-compose部署RabbitMQ(含管理页面)

项目结构及依赖

为了方便测试,创建一个项目结构如下:

rabbitmq项目结构

主要依赖是 springboot 整合 amqp 的 starter。如下:(Junit 用于 provider 中测试发送消息)

<!-- rabbitmq -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>2.3.11.RELEASE</version>
</dependency>
<!-- junit -->
<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.13</version>
    <scope>test</scope>
</dependency>

在父工程里进行版本管理,生产者和消费者子工程里引入。

配置、使用和测试

生产者(Provider)

在生产者项目rabbitmq-provider中创建配置文件 application.yml,内容如下:

spring:
  application:
    name: rabbitmq-provider
  #rabbitmq配置
  rabbitmq:
    host: localhost
    port: 5672
    username: rabbit
    password: 123456

主要是配置 rabbitmq 服务器地址、端口、用户名和密码。

继续在该项目中创建配置类RabbitmqProviderConfig,内容如下:

/**
 * Rabbitmq生产者配置类
 *
 * @author Leo
 * @create 2021/6/8 10:03
 **/
@Configuration
public class RabbitmqProviderConfig {
    /**
     * 创建一个 queue
     * @return
     */
    @Bean
    public Queue directQueue() {
        return new Queue("directQueue", true);
    }

    /**
     * 创建一个 direct 类型的 exchange
     * @return
     */
    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange("directExchange", true, false);
    }

    /**
     * 绑定前面创建的 directQueue 和 directExchange ,指定 routing key 为 directRk
     * @return
     */
    @Bean
    public Binding directBinding() {
        return BindingBuilder.bind(directQueue()).to(directExchange()).with("directRk");
    }
}

主要是配置queue(队列)exchange(交换器)bingding(交换器和队列的绑定关系)

我们可以通过 Junit 来测试发送消息,创建测试类RabbitmqProviderTests,内容如下:

/**
 * @author Leo
 * @create 2021/6/8 10:07
 **/
@SpringBootTest
@RunWith(SpringRunner.class)
public class RabbitmqProviderTests {
    @Resource
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSend() {
        String msg = "hello-first-rabbitmq-msg";
        rabbitTemplate.convertAndSend("directExchange", "directRk", msg);
    }
}

执行testSend方法发送一条消息,发送时除了消息体,还需指定 exchange 和 routing key。convertAndSend有很多重载方法,都可以自行尝试,这里就不演示了。

执行成功后到 RabbitMQ Management管理界面查看,可以看到红色箭头处分别表示等待消费的消息数总消息数

rabbitmq-first-msg

可以在Exchanges选项卡里看到代码里创建的 directExchange

rabbitmq-exchange

以及Queues选项卡里的directQueue

rabbitmq-queue

消费者(Consumer)

在消费者项目rabbitmq-consumer中创建配置文件 application.yml,内容如下:

spring:
  application:
    name: rabbitmq-consumer
  #rabbitmq配置
  rabbitmq:
    host: localhost
    port: 5672
    username: rabbit
    password: 123456

和生产者基本一致。

创建消息监听器RabbitmqDirectReceiver,用于接收消息,代码如下:

/**
 * rabbitmq消费者
 *
 * @author Leo
 * @create 2021/6/8 10:26
 **/
//指定监听队列
@RabbitListener(queues = "directQueue")
@Component
@Slf4j
public class RabbitmqDirectReceiver {
    /**
     * 处理消息
     * @param msg
     */
    @RabbitHandler
    public void process(String msg) {
        log.info("RabbitmqDirectReceiver received msg: " + msg);
    }
}

启动消费者项目,查看日志打印:

rabbitmq-consumer-first

可以看到刚生产者发送的消息被消费了。

多个消费者监听同一队列

当多个消费者监听同一队列时,是怎么消费的呢?先说结论,多个消费者会以轮询的方式消费消息。

再额外创建一个监听器RabbitmqDirectReceiver2,和上面的监听器监听同一队列。

/**
 * @author Leo
 * @create 2021/6/8 13:32
 **/
//和 RabbitmqDirectReceiver 监听同一个队列
@RabbitListener(queues = "directQueue")
@Component
@Slf4j
public class RabbitmqDirectReceiver2 {
    /**
     * 处理消息
     * @param msg
     */
    @RabbitHandler
    public void process(String msg) {
        log.info("RabbitmqDirectReceiver2 received msg: " + msg);
    }
}

在生产者测试类中添加一个方法,向交换器directExchange发送 10 条消息:

private final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    @Test
    public void testSend2() {
        for (int i = 0; i < 10; i++) {
            String msg = "msg-" + i + "-" +  LocalDateTime.now().format(dateTimeFormatter);
            rabbitTemplate.convertAndSend("directExchange", "directRk", msg);
        }
    }

先启动消费者项目,然后执行测试方法testSend2

rabbitmq-multiReceiver

通过消费的日志可以看到两个消费者确实是以轮询的方式消费消息。

其他交换器(Exchange)

前面的示例我们使用的是direct类型的 exchange,还有两个常用的:fanouttopic

fanout exchange

在生产者配置类RabbitmqProviderConfig中添加以下代码:

    /**
     * 创建两个队列:fanoutQueue1、fanoutQueue2
     * @return
     */
    @Bean
    public Queue fanoutQueue1() {
        return new Queue("fanoutQueue1", true);
    }

    @Bean
    public Queue fanoutQueue2() {
        return new Queue("fanoutQueue2", true);
    }

    /**
     * 创建一个 fanout 类型的 exchange
     * @return
     */
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanoutExchange", true, false);
    }

    /**
     * 将队列:fanoutQueue1、fanoutQueue2 绑定 交换器:fanoutExchange
     * 这里不用配置 routing key,因为 fanout 类型的 exchange 在收到消息时会将消息发送所有与其绑定的队列里
     * @return
     */
    @Bean
    public Binding fanoutBinding1() {
        return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange());
    }
    @Bean
    public Binding fanoutBinding2() {
        return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange());
    }

创建 fanout 类型的 exchange,并且为其绑定了两个队列。

在生产者测试类RabbitmqProviderTests中添加以下测试方法:

    @Test
    public void testFanoutExchange() {
        String fanoutMsg = "hello-fanout-exchange";
        rabbitTemplate.convertAndSend("fanoutExchange", null, fanoutMsg);
    }

在消费者项目中添加两个监听器,分别监听fanoutQueue1fanoutQueue2

/**
 * @author Leo
 * @create 2021/6/8 14:33
 **/
@Component
@Slf4j
@RabbitListener(queues = "fanoutQueue1")
public class RabbitmqFanoutReceiver {
    
    @RabbitHandler
    public void process(String msg) {
        log.info("RabbitmqFanoutReceiver received msg: " + msg);
    }
}
/**
 * @author Leo
 * @create 2021/6/8 14:35
 **/
@Component
@Slf4j
@RabbitListener(queues = "fanoutQueue2")
public class RabbitmqFanoutReceiver2 {

    @RabbitHandler
    public void process(String msg) {
        log.info("RabbitmqFanoutReceiver2 received msg: " + msg);
    }
}

先执行上述测试方法,再启动消费者。

rabbitmq-fanout

可以看到两个监听器都消费到了消息,说明消息确实发送到了两个队列中。

topic exchange

在生产者配置类RabbitmqProviderConfig中添加以下代码:

    /**
     * 创建两个队列:topicQueue1、topicQueue2
     * @return
     */
    @Bean
    public Queue topicQueue1() {
        return new Queue("topicQueue1", true);
    }

    @Bean
    public Queue topicQueue2() {
        return new Queue("topicQueue2", true);
    }

    /**
     * 创建一个 topic 类型的 exchange
     * @return
     */
    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange("topicExchange", true, false);
    }

    /**
     * 对于 topic 类型的 exchange,指定 routing key 时,有两个特殊通配符: # 匹配 0 个或多个单词,* 匹配一个单词。
     * 单词用 . 隔开
     * @return
     */
    @Bean
    public Binding topicBinding1() {
        //绑定 topicQueue1 和 topicExchange,指定 routing key 为 top.zysite.*
        return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with("top.zysite.*");
    }

    @Bean
    public Binding topicBinding2() {
        //绑定 topicQueue2 和 topicExchange,指定 routing key 为 top.#
        return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("top.#");
    }

创建了topic类型的 exchange 并为其绑定了两个队列。指定 routing key 时使用了通配符。

在生产者测试类RabbitmqProviderTests中添加以下测试方法:

    @Test
    public void testTopicExchange() {
        String routingKey1 = "top.zysite.topic";
        String topicMsg1 = "hello-topic-exchange1";
        
        String routingKey2 = "top.baidu.topic";
        String topicMsg2 = "hello-topic-exchange2";
        
        String routingKey3 = "top.zysite.topic.test";
        String topicMsg3 = "hello-topic-exchange3";
        
        rabbitTemplate.convertAndSend("topicExchange", routingKey1, topicMsg1);
        rabbitTemplate.convertAndSend("topicExchange", routingKey2, topicMsg2);
        rabbitTemplate.convertAndSend("topicExchange", routingKey3, topicMsg3);
    }

同样在消费者项目中添加两个监听器,分别监听topicQueue1topicQueue2

/**
 * @author Leo
 * @create 2021/6/8 15:04
 **/
@Component
@Slf4j
@RabbitListener(queues = "topicQueue1")
public class RabbitmqTopicReceiver {
    
    @RabbitHandler
    public void process(String msg) {
        log.info("RabbitmqTopicReceiver received msg: " + msg);
    }
}
/**
 * @author Leo
 * @create 2021/6/8 15:05
 **/
@Component
@Slf4j
@RabbitListener(queues = "topicQueue2")
public class RabbitmqTopicReceiver2 {

    @RabbitHandler
    public void process(String msg) {
        log.info("RabbitmqTopicReceiver2 received msg: " + msg);
    }
}

先执行上述测试方法,再启动消费者。

rabbitmq-topic-consumer

可以看到 routing keytop.zysite.topic的消息被发送到了两个队列中,其它两个消息只被发送到topicQueue2中。说明绑定 queue 和 exchange 时,指定 routing key 的通配符起了作用。

监听消息是否被正确路由

从前面的示例可以看出,如果我们指定了错误的 exchange 或 routing key,那么消息可能就无法正确地被发送到队列中,那么我们怎么指定生产者发送的消息有没有正确的路由到队列中呢?RabbitMQ 提供了相关的确认机制和回调。

修改生产者项目配置文件:

spring:
  application:
    name: rabbitmq-provider
  #rabbitmq配置
  rabbitmq:
    host: localhost
    port: 5672
    username: rabbit
    password: 123456
    #开启处理 rabbitmq 返回的消息
    publisher-returns: true

在生产者项目中创建配置类RabbitmqConfig,代码如下:

/**
 * 配置 RabbitTemplate
 *
 * @author Leo
 * @create 2021/6/8 15:45
 **/
@Slf4j
@Configuration
public class RabbitmqConfig {

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate();
        rabbitTemplate.setConnectionFactory(connectionFactory);
        //mandatory 为 true 时,当消息无法路由到队列时,会将消息返还给消费者客户端(通过Basic.Return)
        //可以添加回调,接收并处理返回的消息
        rabbitTemplate.setMandatory(true);
        //添加 rabbitmq 服务器返回的消息回调
        rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
            @Override
            public void returnedMessage(ReturnedMessage returned) {
                log.info("Received Returned Msg: " + returned.toString());
            }
        });

        return rabbitTemplate;
    }
}

在生产者测试类中添加以下方法:

    @Test
    public void testReturnCallback() {
        String msg = "returnMsg";
        String routingKey = "com.baidu.topic";

        rabbitTemplate.convertAndSend("topicExchange", routingKey, msg);
    }

我们知道,该 routing key 无法匹配绑定了 topicExchange 的 topicQueue1 和 topicQueue2 之中的任意一个。该消息会被 rabbiitmq 服务器返回给客户端。

执行该测试方法。

rabbitmq-returnMsg

可以看到消息确实被返回给生产者客户端了。

消费者手动确认消息

默认情况下 RabbitMQ 消费者消费消息时是自动确认的,可以通过配置的方式修改为手动提交确认(ack)。

在生产者项目配置类RabbitmqProviderConfig中添加以下配置:

    /**
     * 创建一个 ackQueue ,并绑定到 ackExchange,指定 routing key 为 ackRk
     * @return
     */
    @Bean
    public Queue ackQueue() {
        return new Queue("ackQueue", true);
    }
    @Bean
    public DirectExchange ackExchange() {
        return new DirectExchange("ackExchange", true, false);
    }
    @Bean
    public Binding ackBinding() {
        return BindingBuilder.bind(ackQueue()).to(ackExchange()).with("ackRk");
    }

主要是创建一个 ackQueue ,并绑定到 ackExchange,指定 routing key 为 ackRk。

并在测试类中添加以下方法:

    @Test
    public void testConsumerAck() {
        String msg = "ackMsg";
        String routingKey = "ackRk";
        rabbitTemplate.convertAndSend("ackExchange", routingKey, msg);
    }

执行上述方法。

在消费者项目中创建配置类RabbitmqConfig,代码如下:

/**
 * @author Leo
 * @create 2021/6/8 16:22
 **/
@Slf4j
@Configuration
public class RabbitmqConfig {

    @Bean
    public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
        //改为手动提交确认(ack)
        simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        //往该容器中添加队列
        simpleMessageListenerContainer.setQueueNames("ackQueue");
        //设置消息监听回调,
        simpleMessageListenerContainer.setMessageListener(new ChannelAwareMessageListener() {
            @Override
            public void onMessage(Message message, Channel channel) throws Exception {
                long deliveryTag = message.getMessageProperties().getDeliveryTag();
                try {
                    log.info("ChannelAwareMessageListener received msg : " + message.toString());
                    //第2个参数为 是否开启 批量 ack,即提交 <= 该 DeliveryTag 的所有消息的 ack
                    channel.basicAck(deliveryTag, true);
                } catch (IOException e) {
                    //发生异常时,向 rabbitmq 服务器拒绝处理该消息, 第2个参数表示是否重新入队该消息
                    channel.basicReject(deliveryTag, false);
                }
            }
        });
        return simpleMessageListenerContainer;
    }
}

启动消费者项目。

rabbitmq-consumer-ack

可以看到执行了我们配置的消息监听回调逻辑,如果出现异常的话会发送拒绝消息给 rabbitmq 服务器。

是提交 ack 还是拒绝都可以通过调用相应的方法来实现。

源码

源码地址:https://github.com/leo1604270786/hello-rabbitmq