环境
RabbitMQ
如果使用 docker-compose 部署,可以参考我的这篇文章:docker-compose部署RabbitMQ(含管理页面)
项目结构及依赖
为了方便测试,创建一个项目结构如下:
主要依赖是 springboot 整合 amqp 的 starter。如下:(Junit 用于 provider 中测试发送消息)
<!-- rabbitmq -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.3.11.RELEASE</version>
</dependency>
<!-- junit -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13</version>
<scope>test</scope>
</dependency>
在父工程里进行版本管理,生产者和消费者子工程里引入。
配置、使用和测试
生产者(Provider)
在生产者项目rabbitmq-provider
中创建配置文件 application.yml,内容如下:
spring:
application:
name: rabbitmq-provider
#rabbitmq配置
rabbitmq:
host: localhost
port: 5672
username: rabbit
password: 123456
主要是配置 rabbitmq 服务器地址、端口、用户名和密码。
继续在该项目中创建配置类RabbitmqProviderConfig
,内容如下:
/**
* Rabbitmq生产者配置类
*
* @author Leo
* @create 2021/6/8 10:03
**/
@Configuration
public class RabbitmqProviderConfig {
/**
* 创建一个 queue
* @return
*/
@Bean
public Queue directQueue() {
return new Queue("directQueue", true);
}
/**
* 创建一个 direct 类型的 exchange
* @return
*/
@Bean
public DirectExchange directExchange() {
return new DirectExchange("directExchange", true, false);
}
/**
* 绑定前面创建的 directQueue 和 directExchange ,指定 routing key 为 directRk
* @return
*/
@Bean
public Binding directBinding() {
return BindingBuilder.bind(directQueue()).to(directExchange()).with("directRk");
}
}
主要是配置queue(队列)
、exchange(交换器)
、bingding(交换器和队列的绑定关系)
。
我们可以通过 Junit 来测试发送消息,创建测试类RabbitmqProviderTests
,内容如下:
/**
* @author Leo
* @create 2021/6/8 10:07
**/
@SpringBootTest
@RunWith(SpringRunner.class)
public class RabbitmqProviderTests {
@Resource
private RabbitTemplate rabbitTemplate;
@Test
public void testSend() {
String msg = "hello-first-rabbitmq-msg";
rabbitTemplate.convertAndSend("directExchange", "directRk", msg);
}
}
执行testSend
方法发送一条消息,发送时除了消息体,还需指定 exchange 和 routing key。convertAndSend
有很多重载方法,都可以自行尝试,这里就不演示了。
执行成功后到 RabbitMQ Management
管理界面查看,可以看到红色箭头处分别表示等待消费的消息数
和总消息数
。
可以在Exchanges
选项卡里看到代码里创建的 directExchange
。
以及Queues
选项卡里的directQueue
。
消费者(Consumer)
在消费者项目rabbitmq-consumer
中创建配置文件 application.yml,内容如下:
spring:
application:
name: rabbitmq-consumer
#rabbitmq配置
rabbitmq:
host: localhost
port: 5672
username: rabbit
password: 123456
和生产者基本一致。
创建消息监听器RabbitmqDirectReceiver
,用于接收消息,代码如下:
/**
* rabbitmq消费者
*
* @author Leo
* @create 2021/6/8 10:26
**/
//指定监听队列
@RabbitListener(queues = "directQueue")
@Component
@Slf4j
public class RabbitmqDirectReceiver {
/**
* 处理消息
* @param msg
*/
@RabbitHandler
public void process(String msg) {
log.info("RabbitmqDirectReceiver received msg: " + msg);
}
}
启动消费者项目,查看日志打印:
可以看到刚生产者发送的消息被消费了。
多个消费者监听同一队列
当多个消费者监听同一队列时,是怎么消费的呢?先说结论,多个消费者会以轮询的方式消费消息。
再额外创建一个监听器RabbitmqDirectReceiver2
,和上面的监听器监听同一队列。
/**
* @author Leo
* @create 2021/6/8 13:32
**/
//和 RabbitmqDirectReceiver 监听同一个队列
@RabbitListener(queues = "directQueue")
@Component
@Slf4j
public class RabbitmqDirectReceiver2 {
/**
* 处理消息
* @param msg
*/
@RabbitHandler
public void process(String msg) {
log.info("RabbitmqDirectReceiver2 received msg: " + msg);
}
}
在生产者测试类中添加一个方法,向交换器directExchange
发送 10 条消息:
private final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
@Test
public void testSend2() {
for (int i = 0; i < 10; i++) {
String msg = "msg-" + i + "-" + LocalDateTime.now().format(dateTimeFormatter);
rabbitTemplate.convertAndSend("directExchange", "directRk", msg);
}
}
先启动消费者项目,然后执行测试方法testSend2
。
通过消费的日志可以看到两个消费者确实是以轮询的方式消费消息。
其他交换器(Exchange)
前面的示例我们使用的是direct
类型的 exchange,还有两个常用的:fanout
、topic
。
fanout exchange
在生产者配置类RabbitmqProviderConfig
中添加以下代码:
/**
* 创建两个队列:fanoutQueue1、fanoutQueue2
* @return
*/
@Bean
public Queue fanoutQueue1() {
return new Queue("fanoutQueue1", true);
}
@Bean
public Queue fanoutQueue2() {
return new Queue("fanoutQueue2", true);
}
/**
* 创建一个 fanout 类型的 exchange
* @return
*/
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("fanoutExchange", true, false);
}
/**
* 将队列:fanoutQueue1、fanoutQueue2 绑定 交换器:fanoutExchange
* 这里不用配置 routing key,因为 fanout 类型的 exchange 在收到消息时会将消息发送所有与其绑定的队列里
* @return
*/
@Bean
public Binding fanoutBinding1() {
return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange());
}
@Bean
public Binding fanoutBinding2() {
return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange());
}
创建 fanout
类型的 exchange,并且为其绑定了两个队列。
在生产者测试类RabbitmqProviderTests
中添加以下测试方法:
@Test
public void testFanoutExchange() {
String fanoutMsg = "hello-fanout-exchange";
rabbitTemplate.convertAndSend("fanoutExchange", null, fanoutMsg);
}
在消费者项目中添加两个监听器,分别监听fanoutQueue1
、fanoutQueue2
:
/**
* @author Leo
* @create 2021/6/8 14:33
**/
@Component
@Slf4j
@RabbitListener(queues = "fanoutQueue1")
public class RabbitmqFanoutReceiver {
@RabbitHandler
public void process(String msg) {
log.info("RabbitmqFanoutReceiver received msg: " + msg);
}
}
/**
* @author Leo
* @create 2021/6/8 14:35
**/
@Component
@Slf4j
@RabbitListener(queues = "fanoutQueue2")
public class RabbitmqFanoutReceiver2 {
@RabbitHandler
public void process(String msg) {
log.info("RabbitmqFanoutReceiver2 received msg: " + msg);
}
}
先执行上述测试方法,再启动消费者。
可以看到两个监听器都消费到了消息,说明消息确实发送到了两个队列中。
topic exchange
在生产者配置类RabbitmqProviderConfig
中添加以下代码:
/**
* 创建两个队列:topicQueue1、topicQueue2
* @return
*/
@Bean
public Queue topicQueue1() {
return new Queue("topicQueue1", true);
}
@Bean
public Queue topicQueue2() {
return new Queue("topicQueue2", true);
}
/**
* 创建一个 topic 类型的 exchange
* @return
*/
@Bean
public TopicExchange topicExchange() {
return new TopicExchange("topicExchange", true, false);
}
/**
* 对于 topic 类型的 exchange,指定 routing key 时,有两个特殊通配符: # 匹配 0 个或多个单词,* 匹配一个单词。
* 单词用 . 隔开
* @return
*/
@Bean
public Binding topicBinding1() {
//绑定 topicQueue1 和 topicExchange,指定 routing key 为 top.zysite.*
return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with("top.zysite.*");
}
@Bean
public Binding topicBinding2() {
//绑定 topicQueue2 和 topicExchange,指定 routing key 为 top.#
return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("top.#");
}
创建了topic
类型的 exchange 并为其绑定了两个队列。指定 routing key 时使用了通配符。
在生产者测试类RabbitmqProviderTests
中添加以下测试方法:
@Test
public void testTopicExchange() {
String routingKey1 = "top.zysite.topic";
String topicMsg1 = "hello-topic-exchange1";
String routingKey2 = "top.baidu.topic";
String topicMsg2 = "hello-topic-exchange2";
String routingKey3 = "top.zysite.topic.test";
String topicMsg3 = "hello-topic-exchange3";
rabbitTemplate.convertAndSend("topicExchange", routingKey1, topicMsg1);
rabbitTemplate.convertAndSend("topicExchange", routingKey2, topicMsg2);
rabbitTemplate.convertAndSend("topicExchange", routingKey3, topicMsg3);
}
同样在消费者项目中添加两个监听器,分别监听topicQueue1
、topicQueue2
:
/**
* @author Leo
* @create 2021/6/8 15:04
**/
@Component
@Slf4j
@RabbitListener(queues = "topicQueue1")
public class RabbitmqTopicReceiver {
@RabbitHandler
public void process(String msg) {
log.info("RabbitmqTopicReceiver received msg: " + msg);
}
}
/**
* @author Leo
* @create 2021/6/8 15:05
**/
@Component
@Slf4j
@RabbitListener(queues = "topicQueue2")
public class RabbitmqTopicReceiver2 {
@RabbitHandler
public void process(String msg) {
log.info("RabbitmqTopicReceiver2 received msg: " + msg);
}
}
先执行上述测试方法,再启动消费者。
可以看到 routing key
为 top.zysite.topic
的消息被发送到了两个队列中,其它两个消息只被发送到topicQueue2
中。说明绑定 queue 和 exchange 时,指定 routing key 的通配符起了作用。
监听消息是否被正确路由
从前面的示例可以看出,如果我们指定了错误的 exchange 或 routing key,那么消息可能就无法正确地被发送到队列中,那么我们怎么指定生产者发送的消息有没有正确的路由到队列中呢?RabbitMQ 提供了相关的确认机制和回调。
修改生产者项目配置文件:
spring:
application:
name: rabbitmq-provider
#rabbitmq配置
rabbitmq:
host: localhost
port: 5672
username: rabbit
password: 123456
#开启处理 rabbitmq 返回的消息
publisher-returns: true
在生产者项目中创建配置类RabbitmqConfig
,代码如下:
/**
* 配置 RabbitTemplate
*
* @author Leo
* @create 2021/6/8 15:45
**/
@Slf4j
@Configuration
public class RabbitmqConfig {
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate();
rabbitTemplate.setConnectionFactory(connectionFactory);
//mandatory 为 true 时,当消息无法路由到队列时,会将消息返还给消费者客户端(通过Basic.Return)
//可以添加回调,接收并处理返回的消息
rabbitTemplate.setMandatory(true);
//添加 rabbitmq 服务器返回的消息回调
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returned) {
log.info("Received Returned Msg: " + returned.toString());
}
});
return rabbitTemplate;
}
}
在生产者测试类中添加以下方法:
@Test
public void testReturnCallback() {
String msg = "returnMsg";
String routingKey = "com.baidu.topic";
rabbitTemplate.convertAndSend("topicExchange", routingKey, msg);
}
我们知道,该 routing key 无法匹配绑定了 topicExchange 的 topicQueue1 和 topicQueue2 之中的任意一个。该消息会被 rabbiitmq 服务器返回给客户端。
执行该测试方法。
可以看到消息确实被返回给生产者客户端了。
消费者手动确认消息
默认情况下 RabbitMQ 消费者消费消息时是自动确认的,可以通过配置的方式修改为手动提交确认(ack)。
在生产者项目配置类RabbitmqProviderConfig
中添加以下配置:
/**
* 创建一个 ackQueue ,并绑定到 ackExchange,指定 routing key 为 ackRk
* @return
*/
@Bean
public Queue ackQueue() {
return new Queue("ackQueue", true);
}
@Bean
public DirectExchange ackExchange() {
return new DirectExchange("ackExchange", true, false);
}
@Bean
public Binding ackBinding() {
return BindingBuilder.bind(ackQueue()).to(ackExchange()).with("ackRk");
}
主要是创建一个 ackQueue ,并绑定到 ackExchange,指定 routing key 为 ackRk。
并在测试类中添加以下方法:
@Test
public void testConsumerAck() {
String msg = "ackMsg";
String routingKey = "ackRk";
rabbitTemplate.convertAndSend("ackExchange", routingKey, msg);
}
执行上述方法。
在消费者项目中创建配置类RabbitmqConfig
,代码如下:
/**
* @author Leo
* @create 2021/6/8 16:22
**/
@Slf4j
@Configuration
public class RabbitmqConfig {
@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
//改为手动提交确认(ack)
simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
//往该容器中添加队列
simpleMessageListenerContainer.setQueueNames("ackQueue");
//设置消息监听回调,
simpleMessageListenerContainer.setMessageListener(new ChannelAwareMessageListener() {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
log.info("ChannelAwareMessageListener received msg : " + message.toString());
//第2个参数为 是否开启 批量 ack,即提交 <= 该 DeliveryTag 的所有消息的 ack
channel.basicAck(deliveryTag, true);
} catch (IOException e) {
//发生异常时,向 rabbitmq 服务器拒绝处理该消息, 第2个参数表示是否重新入队该消息
channel.basicReject(deliveryTag, false);
}
}
});
return simpleMessageListenerContainer;
}
}
启动消费者项目。
可以看到执行了我们配置的消息监听回调逻辑,如果出现异常的话会发送拒绝消息给 rabbitmq 服务器。
是提交 ack 还是拒绝都可以通过调用相应的方法来实现。
源码
源码地址:https://github.com/leo1604270786/hello-rabbitmq