SpringBoot 整合 Kafka

SpringBoot 整合 Kafka

leo 3327 2021-06-03

环境

ZooKeeper、Kafka。

如果使用 docker-compose 部署 Kafka 集群,可以参考我的这篇文章 docker-compose安装Kafka集群

依赖

pom.xml 依赖文件如下:springboot 版本是 2.3.0.RELEASE 。

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <!-- kafka -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <version>2.4.0.RELEASE</version>
    </dependency>
    <!-- lombok -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>
    <!-- test -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

配置

可以通过Java类配置配置文件配置两种方式来配置 Kafka。

Java 类配置

配置主题(KafkaAdmin)

创建配置类 KafkaTopicConfiguration 代码如下:该配置可选,通常会事先通过 Kafka 提供的脚本创建主题。

/**
 * kafka 主题配置类
 *
 * @author Leo
 * @create 2020/12/31 15:57
 **/
@Configuration
public class KafkaTopicConfiguration {

    /**
     * 创建 KafkaAmin,可以自动检测集群中是否存在topic,不存在则创建
     * @return
     */
    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> props = new HashMap<>();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094");
        return new KafkaAdmin(props);
    }

    @Bean
    public NewTopic newTopic() {
        // 创建 topic,指定 名称、分区数、副本数
        return new NewTopic("hello-kafka-test-topic", 3, (short) 2);
    }
}

配置生产者

创建配置类KafkaProducerConfiguration 代码如下:里面涉及基本配置和自定义分区器、拦截器、事务等配置。

/**
 * kafka 生产者配置类
 *
 * @author Leo
 * @create 2020/12/31 15:09
 **/
@Configuration
public class KafkaProducerConfiguration {
    /**
     * 不包含事务 producerFactory
     * @return
     */
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        //kafka 集群地址
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094");
        //重试次数
        props.put(ProducerConfig.RETRIES_CONFIG, 3);
        //应答级别
        //acks=0 把消息发送到kafka就认为发送成功
        //acks=1 把消息发送到kafka leader分区,并且写入磁盘就认为发送成功
        //acks=all 把消息发送到kafka leader分区,并且leader分区的副本follower对消息进行了同步就任务发送成功
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        //KafkaProducer.send() 和 partitionsFor() 方法的最长阻塞时间 单位 ms
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 6000);
        //批量处理的最大大小 单位 byte
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 4096);
        //发送延时,当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1000);
        //生产者可用缓冲区的最大值 单位 byte
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        //每条消息最大的大小
        props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1048576);
        //客户端ID
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "hello-kafka");
        //Key 序列化方式
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        //Value 序列化方式
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        //消息压缩:none、lz4、gzip、snappy,默认为 none。
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
        //自定义分区器
        props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class.getName());
        return new DefaultKafkaProducerFactory<>(props);
    }

    /**
     * 包含事务 producerFactory
     * @return
     */
    public ProducerFactory<String, String> producerFactoryWithTransaction() {
        DefaultKafkaProducerFactory<String, String> defaultKafkaProducerFactory = (DefaultKafkaProducerFactory<String, String>) producerFactory();
        //设置事务Id前缀
        defaultKafkaProducerFactory.setTransactionIdPrefix("tx");
        return defaultKafkaProducerFactory;
    }

    /**
     * 不包含事务 kafkaTemplate
     * @return
     */
    @Bean("kafkaTemplate")
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    /**
     * 包含事务 kafkaTemplate
     * @return
     */
    @Bean("kafkaTemplateWithTransaction")
    public KafkaTemplate<String, String> kafkaTemplateWithTransaction() {
        return new KafkaTemplate<>(producerFactoryWithTransaction());
    }

    /**
     * 以该方式配置事务管理器:就不能以普通方式发送消息,只能通过 kafkaTemplate.executeInTransaction 或
     * 在方法上加 @Transactional 注解来发送消息,否则报错
     * @param producerFactory
     * @return
     */
//    @Bean
//    public KafkaTransactionManager<Integer, String> kafkaTransactionManager(ProducerFactory<Integer, String> producerFactory) {
//        return new KafkaTransactionManager<>(producerFactory);
//    }

}

配置消费者

创建配置类KafkaConsumerConfiguration代码如下:

/**
 * kafka 消费者配置类
 *
 * @author Leo
 * @create 2020/12/31 15:09
 **/
@Slf4j
@Configuration
public class KafkaConsumerConfiguration {

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        //设置 consumerFactory
        factory.setConsumerFactory(consumerFactory());
        //设置是否开启批量监听
        factory.setBatchListener(false);
        //设置消费者组中的线程数量
        factory.setConcurrency(1);
        return factory;
    }
    /**
     * consumerFactory
     * @return
     */
    public ConsumerFactory<String, Object> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        //kafka集群地址
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094");
        //自动提交 offset 默认 true
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        //自动提交的频率 单位 ms
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
        //批量消费最大数量
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
        //消费者组
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "testGroup");
        //session超时,超过这个时间consumer没有发送心跳,就会触发rebalance操作
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 120000);
        //请求超时
        props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 120000);
        //Key 反序列化类
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        //Value 反序列化类
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        //当kafka中没有初始offset或offset超出范围时将自动重置offset
        //earliest:重置为分区中最小的offset
        //latest:重置为分区中最新的offset(消费分区中新产生的数据)
        //none:只要有一个分区不存在已提交的offset,就抛出异常
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        //设置Consumer拦截器
        props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, MyConsumerInterceptor.class.getName());
        return new DefaultKafkaConsumerFactory<>(props);
    }

    /**
     * 消费异常处理器
     * @return
     */
    @Bean
    public ConsumerAwareListenerErrorHandler consumerAwareListenerErrorHandler() {
        return new ConsumerAwareListenerErrorHandler() {
            @Override
            public Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer) {
                //打印消费异常的消息和异常信息
                log.error("consumer failed! message: {}, exceptionMsg: {}, groupId: {}", message, exception.getMessage(), exception.getGroupId());
                return null;
            }
        };
    }
}

配置文件配置

以配置文件的方式来进行配置的话,所有配置都在application.yml中,配置内容如下:

spring:
  application:
    name: hello-kafka
  kafka:
    listener:
      #设置是否批量消费,默认 single(单条),batch(批量)
      type: single
    # 集群地址
    bootstrap-servers: 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094
    # 生产者配置
    producer:
      # 重试次数
      retries: 3
      # 应答级别
      # acks=0 把消息发送到kafka就认为发送成功
      # acks=1 把消息发送到kafka leader分区,并且写入磁盘就认为发送成功
      # acks=all 把消息发送到kafka leader分区,并且leader分区的副本follower对消息进行了同步就任务发送成功
      acks: all
      # 批量处理的最大大小 单位 byte
      batch-size: 4096
      # 发送延时,当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka
      buffer-memory: 33554432
      # 客户端ID
      client-id: hello-kafka
      # Key 序列化类
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # Value 序列化类
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # 消息压缩:none、lz4、gzip、snappy,默认为 none。
      compression-type: gzip
      properties:
        partitioner:
          #指定自定义分区器
          class: top.zysite.hello.kafka.partitioner.MyPartitioner
        linger:
          # 发送延时,当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka
          ms: 1000
        max:
          block:
            # KafkaProducer.send() 和 partitionsFor() 方法的最长阻塞时间 单位 ms
            ms: 6000
    # 消费者配置
    consumer:
      # 默认消费者组
      group-id: testGroup
      # 自动提交 offset 默认 true
      enable-auto-commit: false
      # 自动提交的频率 单位 ms
      auto-commit-interval: 1000
      # 批量消费最大数量
      max-poll-records: 100
      # Key 反序列化类
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # Value 反序列化类
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # 当kafka中没有初始offset或offset超出范围时将自动重置offset
      # earliest:重置为分区中最小的offset
      # latest:重置为分区中最新的offset(消费分区中新产生的数据)
      # none:只要有一个分区不存在已提交的offset,就抛出异常
      auto-offset-reset: latest
      properties:
        interceptor:
          classes: top.zysite.hello.kafka.interceptor.MyConsumerInterceptor
        session:
          timeout:
            # session超时,超过这个时间consumer没有发送心跳,就会触发rebalance操作
            ms: 120000
        request:
          timeout:
            # 请求超时
            ms: 120000

这里采用Java类配置。在 SpringBoot 启动类上添加注解@EnableKafka启用KafkaTemplate

@SpringBootApplication
@EnableKafka
public class HelloKafkaApplication {

    public static void main(String[] args) {
        SpringApplication.run(HelloKafkaApplication.class, args);
    }

}

自定义分区器

可以看到上述配置包含自定义分区器的配置,自定义分区器需实现Partitioner接口并在生产者端进行配置,代码如下:

/**
 * 自定义分区器
 *
 * @author Leo
 * @create 2021/5/26 13:40
 **/
public class MyPartitioner implements Partitioner {
    /**
     * 分区策略核心方法
     * @param topic
     * @param key
     * @param keyBytes
     * @param value
     * @param valueBytes
     * @param cluster
     * @return
     */
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        //具体分区逻辑,这里全部发送到0号分区

        return 0;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}

配置了之后所有的消息都会发送到 0 号分区。(这里只是为了演示)

消费者拦截器

可以看到上述配置也包含了消费者拦截器的配置,消费者拦截器需实现ConsumerInterceptor接口,代码如下:

/**
 * 消费者拦截器
 *
 * @author Leo
 * @create 2021/5/27 16:30
 **/
@Slf4j
public class MyConsumerInterceptor implements ConsumerInterceptor<String, String> {

    /**
     * KafkaConsumer 会在 poll 方法返回之前调用该方法,可以在该方法中对消息进行过滤
     * @param records
     * @return
     */
    @Override
    public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
        System.out.println("********** before interceptor: " + records.count() + "**********");
        Map<TopicPartition, List<ConsumerRecord<String, String>>> newRecords = new HashMap<>();
        //遍历每个topic、partition
        for (TopicPartition topicPartition : records.partitions()) {
            //获取特定topic、partition下的消息列表
            List<ConsumerRecord<String, String>> recordList = records.records(topicPartition);
            //过滤
            List<ConsumerRecord<String, String>> filteredList = recordList.stream()
                    .filter(record -> !record.value().contains("filter")).collect(Collectors.toList());
            //放入新的消息记录里
            newRecords.put(topicPartition, filteredList);
        }
        ConsumerRecords<String, String> filteredRecords = new ConsumerRecords<>(newRecords);
        System.out.println("********** after interceptor: " + filteredRecords.count() + "**********");
        //返回过滤后的消息记录
        return filteredRecords;
    }

    /**
     * 提交完offset之后调用该方法
     * @param offsets
     */
    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
        if (!offsets.isEmpty()) {
            offsets.forEach(((topicPartition, offsetAndMetadata) -> {
                log.info("partition : " + topicPartition + ", offset : " + offsetAndMetadata);
            }));
        }
    }


    @Override
    public void close() {

    }


    @Override
    public void configure(Map<String, ?> configs) {

    }
}

有消费者拦截器,自然也有生产者拦截器,只需要实现ProducerInterceptor接口并在生产者端进行配置即可,这里只演示消费者拦截器。

测试

为了方便看到效果,这里统一测试生产者和消费者。

创建生产者服务类

创建生产消息服务类KafkaProducerService代码如下:

/**
 * kafka 生产服务
 *
 * @author Leo
 * @create 2020/12/31 16:06
 **/
@Slf4j
@Service
public class KafkaProducerService {
    @Qualifier("kafkaTemplate")
    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    @Qualifier("kafkaTemplateWithTransaction")
    @Resource
    private KafkaTemplate<String, String> kafkaTemplateWithTransaction;

    /**
     * 发送消息(同步)
     * @param topic 主题
     * @param key 键
     * @param message 值
     */
    public void sendMessageSync(String topic, String key, String message) throws InterruptedException, ExecutionException, TimeoutException {
        //可以指定最长等待时间,也可以不指定
        kafkaTemplate.send(topic, message).get(10, TimeUnit.SECONDS);
        log.info("sendMessageSync => topic: {}, key: {}, message: {}", topic, key, message);
        //指定key,kafka根据key进行hash,决定存入哪个partition
//        kafkaTemplate.send(topic, key, message).get(10, TimeUnit.SECONDS);
        //存入指定partition
//        kafkaTemplate.send(topic, 0, key, message).get(10, TimeUnit.SECONDS);
    }

    /**
     * 发送消息并获取结果
     * @param topic
     * @param message
     * @throws ExecutionException
     * @throws InterruptedException
     */
    public void sendMessageGetResult(String topic, String key, String message) throws ExecutionException, InterruptedException {
        SendResult<String, String> result = kafkaTemplate.send(topic, message).get();
        log.info("sendMessageSync => topic: {}, key: {}, message: {}", topic, key, message);
        log.info("The partition the message was sent to: " + result.getRecordMetadata().partition());
    }

    /**
     * 发送消息(异步)
     * @param topic 主题
     * @param message 消息内容
     */
    public void sendMessageAsync(String topic, String message) {
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);
        //添加回调
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onFailure(Throwable throwable) {
                log.error("sendMessageAsync failure! topic : {}, message: {}", topic, message);
            }

            @Override
            public void onSuccess(SendResult<String, String> stringStringSendResult) {
                log.info("sendMessageAsync success! topic: {}, message: {}", topic, message);
            }
        });
    }

    /**
     * 可以将消息组装成 Message 对象和 ProducerRecord 对象发送
     * @param topic
     * @param key
     * @param message
     * @throws InterruptedException
     * @throws ExecutionException
     * @throws TimeoutException
     */
    public void testMessageBuilder(String topic, String key, String message) throws InterruptedException, ExecutionException, TimeoutException {
        // 组装消息
        Message msg = MessageBuilder.withPayload(message)
                .setHeader(KafkaHeaders.MESSAGE_KEY, key)
                .setHeader(KafkaHeaders.TOPIC, topic)
                .setHeader(KafkaHeaders.PREFIX,"kafka_")
                .build();
        //同步发送
        kafkaTemplate.send(msg).get();
        // 组装消息
//        ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, message);
//        kafkaTemplate.send(producerRecord).get(10, TimeUnit.SECONDS);
    }

    /**
     * 以事务方式发送消息
     * @param topic
     * @param key
     * @param message
     */
    public void sendMessageInTransaction(String topic, String key, String message) {
        kafkaTemplateWithTransaction.executeInTransaction(new KafkaOperations.OperationsCallback<String, String, Object>() {
            @Override
            public Object doInOperations(KafkaOperations<String, String> kafkaOperations) {
                kafkaOperations.send(topic, key, message);
                //出现异常将会中断事务,消息不会发送出去
                throw new RuntimeException("exception");
            }
        });
    }

}

创建消费者服务类

创建消费消息服务类KafkaConsumerService代码如下:

/**
 * kafka 消费服务
 *
 * @author Leo
 * @create 2020/12/31 16:06
 **/
@Slf4j
@Service
public class KafkaConsumerService {

    /**
     * 消费单条消息,topics 可以监听多个topic,如:topics = {"topic1", "topic2"}
     * @param message 消息
     */
    @KafkaListener(id = "consumerSingle", topics = "hello-kafka-test-topic")
    public void consumerSingle(String message) {
        log.info("consumerSingle ====> message: {}", message);
    }


/*    @KafkaListener(id = "consumerBatch", topicPartitions = {
            @TopicPartition(topic = "hello-batch1", partitions = "0"),
            @TopicPartition(topic = "hello-batch2", partitionOffsets = @PartitionOffset(partition = "2", initialOffset = "4"))
    })*/
    /**
     * 批量消费消息
     * @param messages
     */
    @KafkaListener(id = "consumerBatch", topics = "hello-batch")
    public void consumerBatch(List<ConsumerRecord<String, String>> messages) {
        log.info("consumerBatch =====> messageSize: {}", messages.size());
        log.info(messages.toString());
    }

    /**
     * 指定消费异常处理器
     * @param message
     */
    @KafkaListener(id = "consumerException", topics = "hello-kafka-test-topic", errorHandler = "consumerAwareListenerErrorHandler")
    public void consumerException(String message) {
        throw new RuntimeException("consumer exception");
    }

    /**
     * 验证ConsumerInterceptor
     * @param message
     */
    @KafkaListener(id = "interceptor", topics = "consumer-interceptor")
    public void consumerInterceptor(String message) {
        log.info("consumerInterceptor ====> message: {}", message);
    }
}

创建 Junit 测试类

使用 Junit 整合 SpringBoot 来测试:

@RunWith(SpringRunner.class)
@SpringBootTest
class HelloKafkaApplicationTests {

    @Resource
    private KafkaProducerService kafkaProducerService;

    @Test
    void testSendMessageSync() throws Exception {
        String topic = "hello-kafka-test-topic";
        String key = "key1";
        String message = "firstMessage";
        kafkaProducerService.sendMessageSync(topic, key, message);
    }
    @Test
    public void testSendMessageGetResult() throws Exception {
        String topic = "hello-kafka-test-topic";
        String key = "key";
        String message = "helloSendMessageGetResult";
        kafkaProducerService.sendMessageGetResult(topic, key, message);
        kafkaProducerService.sendMessageGetResult(topic, null, message);
    }

    @Test
    public void testSendMessageAsync() {
        String topic = "hello-kafka-test-topic";
        String message = "firstAsyncMessage";
        kafkaProducerService.sendMessageAsync(topic, message);
    }

    @Test
    public void testMessageBuilder() throws Exception {
        String topic = "hello-kafka-test-topic";
        String key = "key1";
        String message = "helloMessageBuilder";
        kafkaProducerService.testMessageBuilder(topic, key, message);
    }

    /**
     * 测试事务
     */
    @Test
    public void testSendMessageInTransaction() {
        String topic = "hello-kafka-test-topic";
        String key = "key1";
        String message = "helloSendMessageInTransaction";
        kafkaProducerService.sendMessageInTransaction(topic, key, message);
    }

    /**
     * 测试批量消费
     * @throws Exception
     */
    @Test
    public void testConsumerBatch() throws Exception {
        //写入多条数据到批量topic:hello-batch
        String topic = "hello-batch";
        for(int i = 0; i < 20; i++) {
            kafkaProducerService.sendMessageSync(topic, null, "batchMessage" + i);
        }
    }

    /**
     * 测试消费者拦截器
     * @throws Exception
     */
    @Test
    public void testConsumerInterceptor() throws Exception {
        String topic = "consumer-interceptor";
        for(int i = 0; i < 2; i++) {
            kafkaProducerService.sendMessageSync(topic,null, "normalMessage" + i);
        }
        kafkaProducerService.sendMessageSync(topic, null, "filteredMessage");
        kafkaProducerService.sendMessageSync(topic, null, "filterMessage");
    }

}

测试结果

先启动应用程序,然后依次执行 Junit 中的测试方法:

同步发送消息

执行方法testSendMessageSync,可以看到日志打印:

发送单条消息

消费日志:

消费单条

发送消息并获取消息发往的分区

执行方法testSendMessageGetResult,可以看到日志打印:

消费消息并获得发往分区编号

由于配置了自定义分区器,可以看到消息都发往了 0 号分区,如果没有配置自定义分区器,并且主题包含多个分区的话,正常情况下多条消息不会全部发往同一分区。(可以自行测试)

异步发送消息

异步发送消息无法通过 Junit 来测试,因为 Junit 方法执行完就结束了,没法看到成功或失败的回调打印。(可以通过System.in.read来阻塞,或则 Thread.sleep)。这里就不演示了。

MessageBuilder 只是以不同的形式来组装消息,可以自行测试。

以事务的形式发送消息

KafkaTemplate封装了方法executeInTransaction方法,可以让我们以事务的形式发送消息。

执行方法testSendMessageInTransaction,可以看到:

以事务的形式发送消息

抛出了异常,通过异常说明和没有该消息的消费日志,可以证明该消息并没有发送到 Kafka。

测试批量消费

批量消费需要修改上述消费者端的配置:将配置类KafkaConsumerConfiguration中的配置稍作修改,往 setBatchListener 方法传入 true,表示开启批量监听。

@Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        //设置 consumerFactory
        factory.setConsumerFactory(consumerFactory());
        //设置是否开启批量监听
        factory.setBatchListener(true);
        //设置消费者组中的线程数量
        factory.setConcurrency(1);
        return factory;
    }

先将批量消费监听器@KafkaListener注解注释掉并停止 SpringBoot 应用程序,再执行testConsumerBatch方法往主题中写入 20 条消息。

//@KafkaListener(id = "consumerBatch", topics = "hello-batch")
    public void consumerBatch(List<String> messages) {
        log.info("consumerBatch =====> messageSize: {}", messages.size());
        log.info(messages.toString());
    }

批量发送消息

写入消息之后启动 SpringBoot ,可以看到批量消费的日志打印:

批量消费打印

消费者拦截器

将批量监听改回单条,即 setBatchListener 传入 false。执行方法testConsumerInterceptor,日志打印:

消费者拦截器

可以看到绿框中的消息可以被正常消费,红框中的消息被拦截器过滤了,没有消费,因为消息内容包含filter,这是前面自定义消费者拦截器的逻辑。

源码

源码地址:https://github.com/leo1604270786/hello-kafka