环境
ZooKeeper、Kafka。
如果使用 docker-compose 部署 Kafka 集群,可以参考我的这篇文章 docker-compose安装Kafka集群
依赖
pom.xml 依赖文件如下:springboot 版本是 2.3.0.RELEASE 。
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<!-- kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.4.0.RELEASE</version>
</dependency>
<!-- lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!-- test -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
配置
可以通过Java类配置
和配置文件配置
两种方式来配置 Kafka。
Java 类配置
配置主题(KafkaAdmin)
创建配置类 KafkaTopicConfiguration
代码如下:该配置可选,通常会事先通过 Kafka 提供的脚本创建主题。
/**
* kafka 主题配置类
*
* @author Leo
* @create 2020/12/31 15:57
**/
@Configuration
public class KafkaTopicConfiguration {
/**
* 创建 KafkaAmin,可以自动检测集群中是否存在topic,不存在则创建
* @return
*/
@Bean
public KafkaAdmin kafkaAdmin() {
Map<String, Object> props = new HashMap<>();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094");
return new KafkaAdmin(props);
}
@Bean
public NewTopic newTopic() {
// 创建 topic,指定 名称、分区数、副本数
return new NewTopic("hello-kafka-test-topic", 3, (short) 2);
}
}
配置生产者
创建配置类KafkaProducerConfiguration
代码如下:里面涉及基本配置和自定义分区器、拦截器、事务
等配置。
/**
* kafka 生产者配置类
*
* @author Leo
* @create 2020/12/31 15:09
**/
@Configuration
public class KafkaProducerConfiguration {
/**
* 不包含事务 producerFactory
* @return
*/
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> props = new HashMap<>();
//kafka 集群地址
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094");
//重试次数
props.put(ProducerConfig.RETRIES_CONFIG, 3);
//应答级别
//acks=0 把消息发送到kafka就认为发送成功
//acks=1 把消息发送到kafka leader分区,并且写入磁盘就认为发送成功
//acks=all 把消息发送到kafka leader分区,并且leader分区的副本follower对消息进行了同步就任务发送成功
props.put(ProducerConfig.ACKS_CONFIG, "all");
//KafkaProducer.send() 和 partitionsFor() 方法的最长阻塞时间 单位 ms
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 6000);
//批量处理的最大大小 单位 byte
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 4096);
//发送延时,当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka
props.put(ProducerConfig.LINGER_MS_CONFIG, 1000);
//生产者可用缓冲区的最大值 单位 byte
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
//每条消息最大的大小
props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1048576);
//客户端ID
props.put(ProducerConfig.CLIENT_ID_CONFIG, "hello-kafka");
//Key 序列化方式
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
//Value 序列化方式
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
//消息压缩:none、lz4、gzip、snappy,默认为 none。
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
//自定义分区器
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class.getName());
return new DefaultKafkaProducerFactory<>(props);
}
/**
* 包含事务 producerFactory
* @return
*/
public ProducerFactory<String, String> producerFactoryWithTransaction() {
DefaultKafkaProducerFactory<String, String> defaultKafkaProducerFactory = (DefaultKafkaProducerFactory<String, String>) producerFactory();
//设置事务Id前缀
defaultKafkaProducerFactory.setTransactionIdPrefix("tx");
return defaultKafkaProducerFactory;
}
/**
* 不包含事务 kafkaTemplate
* @return
*/
@Bean("kafkaTemplate")
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
/**
* 包含事务 kafkaTemplate
* @return
*/
@Bean("kafkaTemplateWithTransaction")
public KafkaTemplate<String, String> kafkaTemplateWithTransaction() {
return new KafkaTemplate<>(producerFactoryWithTransaction());
}
/**
* 以该方式配置事务管理器:就不能以普通方式发送消息,只能通过 kafkaTemplate.executeInTransaction 或
* 在方法上加 @Transactional 注解来发送消息,否则报错
* @param producerFactory
* @return
*/
// @Bean
// public KafkaTransactionManager<Integer, String> kafkaTransactionManager(ProducerFactory<Integer, String> producerFactory) {
// return new KafkaTransactionManager<>(producerFactory);
// }
}
配置消费者
创建配置类KafkaConsumerConfiguration
代码如下:
/**
* kafka 消费者配置类
*
* @author Leo
* @create 2020/12/31 15:09
**/
@Slf4j
@Configuration
public class KafkaConsumerConfiguration {
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
//设置 consumerFactory
factory.setConsumerFactory(consumerFactory());
//设置是否开启批量监听
factory.setBatchListener(false);
//设置消费者组中的线程数量
factory.setConcurrency(1);
return factory;
}
/**
* consumerFactory
* @return
*/
public ConsumerFactory<String, Object> consumerFactory() {
Map<String, Object> props = new HashMap<>();
//kafka集群地址
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094");
//自动提交 offset 默认 true
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
//自动提交的频率 单位 ms
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
//批量消费最大数量
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
//消费者组
props.put(ConsumerConfig.GROUP_ID_CONFIG, "testGroup");
//session超时,超过这个时间consumer没有发送心跳,就会触发rebalance操作
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 120000);
//请求超时
props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 120000);
//Key 反序列化类
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
//Value 反序列化类
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
//当kafka中没有初始offset或offset超出范围时将自动重置offset
//earliest:重置为分区中最小的offset
//latest:重置为分区中最新的offset(消费分区中新产生的数据)
//none:只要有一个分区不存在已提交的offset,就抛出异常
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
//设置Consumer拦截器
props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, MyConsumerInterceptor.class.getName());
return new DefaultKafkaConsumerFactory<>(props);
}
/**
* 消费异常处理器
* @return
*/
@Bean
public ConsumerAwareListenerErrorHandler consumerAwareListenerErrorHandler() {
return new ConsumerAwareListenerErrorHandler() {
@Override
public Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer) {
//打印消费异常的消息和异常信息
log.error("consumer failed! message: {}, exceptionMsg: {}, groupId: {}", message, exception.getMessage(), exception.getGroupId());
return null;
}
};
}
}
配置文件配置
以配置文件的方式来进行配置的话,所有配置都在application.yml
中,配置内容如下:
spring:
application:
name: hello-kafka
kafka:
listener:
#设置是否批量消费,默认 single(单条),batch(批量)
type: single
# 集群地址
bootstrap-servers: 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094
# 生产者配置
producer:
# 重试次数
retries: 3
# 应答级别
# acks=0 把消息发送到kafka就认为发送成功
# acks=1 把消息发送到kafka leader分区,并且写入磁盘就认为发送成功
# acks=all 把消息发送到kafka leader分区,并且leader分区的副本follower对消息进行了同步就任务发送成功
acks: all
# 批量处理的最大大小 单位 byte
batch-size: 4096
# 发送延时,当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka
buffer-memory: 33554432
# 客户端ID
client-id: hello-kafka
# Key 序列化类
key-serializer: org.apache.kafka.common.serialization.StringSerializer
# Value 序列化类
value-serializer: org.apache.kafka.common.serialization.StringSerializer
# 消息压缩:none、lz4、gzip、snappy,默认为 none。
compression-type: gzip
properties:
partitioner:
#指定自定义分区器
class: top.zysite.hello.kafka.partitioner.MyPartitioner
linger:
# 发送延时,当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka
ms: 1000
max:
block:
# KafkaProducer.send() 和 partitionsFor() 方法的最长阻塞时间 单位 ms
ms: 6000
# 消费者配置
consumer:
# 默认消费者组
group-id: testGroup
# 自动提交 offset 默认 true
enable-auto-commit: false
# 自动提交的频率 单位 ms
auto-commit-interval: 1000
# 批量消费最大数量
max-poll-records: 100
# Key 反序列化类
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# Value 反序列化类
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# 当kafka中没有初始offset或offset超出范围时将自动重置offset
# earliest:重置为分区中最小的offset
# latest:重置为分区中最新的offset(消费分区中新产生的数据)
# none:只要有一个分区不存在已提交的offset,就抛出异常
auto-offset-reset: latest
properties:
interceptor:
classes: top.zysite.hello.kafka.interceptor.MyConsumerInterceptor
session:
timeout:
# session超时,超过这个时间consumer没有发送心跳,就会触发rebalance操作
ms: 120000
request:
timeout:
# 请求超时
ms: 120000
这里采用Java类配置
。在 SpringBoot 启动类上添加注解@EnableKafka
启用KafkaTemplate
:
@SpringBootApplication
@EnableKafka
public class HelloKafkaApplication {
public static void main(String[] args) {
SpringApplication.run(HelloKafkaApplication.class, args);
}
}
自定义分区器
可以看到上述配置包含自定义分区器的配置,自定义分区器需实现Partitioner
接口并在生产者端进行配置,代码如下:
/**
* 自定义分区器
*
* @author Leo
* @create 2021/5/26 13:40
**/
public class MyPartitioner implements Partitioner {
/**
* 分区策略核心方法
* @param topic
* @param key
* @param keyBytes
* @param value
* @param valueBytes
* @param cluster
* @return
*/
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
//具体分区逻辑,这里全部发送到0号分区
return 0;
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
}
}
配置了之后所有的消息都会发送到 0 号分区。(这里只是为了演示)
消费者拦截器
可以看到上述配置也包含了消费者拦截器的配置,消费者拦截器需实现ConsumerInterceptor
接口,代码如下:
/**
* 消费者拦截器
*
* @author Leo
* @create 2021/5/27 16:30
**/
@Slf4j
public class MyConsumerInterceptor implements ConsumerInterceptor<String, String> {
/**
* KafkaConsumer 会在 poll 方法返回之前调用该方法,可以在该方法中对消息进行过滤
* @param records
* @return
*/
@Override
public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
System.out.println("********** before interceptor: " + records.count() + "**********");
Map<TopicPartition, List<ConsumerRecord<String, String>>> newRecords = new HashMap<>();
//遍历每个topic、partition
for (TopicPartition topicPartition : records.partitions()) {
//获取特定topic、partition下的消息列表
List<ConsumerRecord<String, String>> recordList = records.records(topicPartition);
//过滤
List<ConsumerRecord<String, String>> filteredList = recordList.stream()
.filter(record -> !record.value().contains("filter")).collect(Collectors.toList());
//放入新的消息记录里
newRecords.put(topicPartition, filteredList);
}
ConsumerRecords<String, String> filteredRecords = new ConsumerRecords<>(newRecords);
System.out.println("********** after interceptor: " + filteredRecords.count() + "**********");
//返回过滤后的消息记录
return filteredRecords;
}
/**
* 提交完offset之后调用该方法
* @param offsets
*/
@Override
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
if (!offsets.isEmpty()) {
offsets.forEach(((topicPartition, offsetAndMetadata) -> {
log.info("partition : " + topicPartition + ", offset : " + offsetAndMetadata);
}));
}
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
}
}
有消费者拦截器,自然也有生产者拦截器,只需要实现ProducerInterceptor
接口并在生产者端进行配置即可,这里只演示消费者拦截器。
测试
为了方便看到效果,这里统一测试生产者和消费者。
创建生产者服务类
创建生产消息服务类KafkaProducerService
代码如下:
/**
* kafka 生产服务
*
* @author Leo
* @create 2020/12/31 16:06
**/
@Slf4j
@Service
public class KafkaProducerService {
@Qualifier("kafkaTemplate")
@Resource
private KafkaTemplate<String, String> kafkaTemplate;
@Qualifier("kafkaTemplateWithTransaction")
@Resource
private KafkaTemplate<String, String> kafkaTemplateWithTransaction;
/**
* 发送消息(同步)
* @param topic 主题
* @param key 键
* @param message 值
*/
public void sendMessageSync(String topic, String key, String message) throws InterruptedException, ExecutionException, TimeoutException {
//可以指定最长等待时间,也可以不指定
kafkaTemplate.send(topic, message).get(10, TimeUnit.SECONDS);
log.info("sendMessageSync => topic: {}, key: {}, message: {}", topic, key, message);
//指定key,kafka根据key进行hash,决定存入哪个partition
// kafkaTemplate.send(topic, key, message).get(10, TimeUnit.SECONDS);
//存入指定partition
// kafkaTemplate.send(topic, 0, key, message).get(10, TimeUnit.SECONDS);
}
/**
* 发送消息并获取结果
* @param topic
* @param message
* @throws ExecutionException
* @throws InterruptedException
*/
public void sendMessageGetResult(String topic, String key, String message) throws ExecutionException, InterruptedException {
SendResult<String, String> result = kafkaTemplate.send(topic, message).get();
log.info("sendMessageSync => topic: {}, key: {}, message: {}", topic, key, message);
log.info("The partition the message was sent to: " + result.getRecordMetadata().partition());
}
/**
* 发送消息(异步)
* @param topic 主题
* @param message 消息内容
*/
public void sendMessageAsync(String topic, String message) {
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);
//添加回调
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onFailure(Throwable throwable) {
log.error("sendMessageAsync failure! topic : {}, message: {}", topic, message);
}
@Override
public void onSuccess(SendResult<String, String> stringStringSendResult) {
log.info("sendMessageAsync success! topic: {}, message: {}", topic, message);
}
});
}
/**
* 可以将消息组装成 Message 对象和 ProducerRecord 对象发送
* @param topic
* @param key
* @param message
* @throws InterruptedException
* @throws ExecutionException
* @throws TimeoutException
*/
public void testMessageBuilder(String topic, String key, String message) throws InterruptedException, ExecutionException, TimeoutException {
// 组装消息
Message msg = MessageBuilder.withPayload(message)
.setHeader(KafkaHeaders.MESSAGE_KEY, key)
.setHeader(KafkaHeaders.TOPIC, topic)
.setHeader(KafkaHeaders.PREFIX,"kafka_")
.build();
//同步发送
kafkaTemplate.send(msg).get();
// 组装消息
// ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, message);
// kafkaTemplate.send(producerRecord).get(10, TimeUnit.SECONDS);
}
/**
* 以事务方式发送消息
* @param topic
* @param key
* @param message
*/
public void sendMessageInTransaction(String topic, String key, String message) {
kafkaTemplateWithTransaction.executeInTransaction(new KafkaOperations.OperationsCallback<String, String, Object>() {
@Override
public Object doInOperations(KafkaOperations<String, String> kafkaOperations) {
kafkaOperations.send(topic, key, message);
//出现异常将会中断事务,消息不会发送出去
throw new RuntimeException("exception");
}
});
}
}
创建消费者服务类
创建消费消息服务类KafkaConsumerService
代码如下:
/**
* kafka 消费服务
*
* @author Leo
* @create 2020/12/31 16:06
**/
@Slf4j
@Service
public class KafkaConsumerService {
/**
* 消费单条消息,topics 可以监听多个topic,如:topics = {"topic1", "topic2"}
* @param message 消息
*/
@KafkaListener(id = "consumerSingle", topics = "hello-kafka-test-topic")
public void consumerSingle(String message) {
log.info("consumerSingle ====> message: {}", message);
}
/* @KafkaListener(id = "consumerBatch", topicPartitions = {
@TopicPartition(topic = "hello-batch1", partitions = "0"),
@TopicPartition(topic = "hello-batch2", partitionOffsets = @PartitionOffset(partition = "2", initialOffset = "4"))
})*/
/**
* 批量消费消息
* @param messages
*/
@KafkaListener(id = "consumerBatch", topics = "hello-batch")
public void consumerBatch(List<ConsumerRecord<String, String>> messages) {
log.info("consumerBatch =====> messageSize: {}", messages.size());
log.info(messages.toString());
}
/**
* 指定消费异常处理器
* @param message
*/
@KafkaListener(id = "consumerException", topics = "hello-kafka-test-topic", errorHandler = "consumerAwareListenerErrorHandler")
public void consumerException(String message) {
throw new RuntimeException("consumer exception");
}
/**
* 验证ConsumerInterceptor
* @param message
*/
@KafkaListener(id = "interceptor", topics = "consumer-interceptor")
public void consumerInterceptor(String message) {
log.info("consumerInterceptor ====> message: {}", message);
}
}
创建 Junit 测试类
使用 Junit 整合 SpringBoot 来测试:
@RunWith(SpringRunner.class)
@SpringBootTest
class HelloKafkaApplicationTests {
@Resource
private KafkaProducerService kafkaProducerService;
@Test
void testSendMessageSync() throws Exception {
String topic = "hello-kafka-test-topic";
String key = "key1";
String message = "firstMessage";
kafkaProducerService.sendMessageSync(topic, key, message);
}
@Test
public void testSendMessageGetResult() throws Exception {
String topic = "hello-kafka-test-topic";
String key = "key";
String message = "helloSendMessageGetResult";
kafkaProducerService.sendMessageGetResult(topic, key, message);
kafkaProducerService.sendMessageGetResult(topic, null, message);
}
@Test
public void testSendMessageAsync() {
String topic = "hello-kafka-test-topic";
String message = "firstAsyncMessage";
kafkaProducerService.sendMessageAsync(topic, message);
}
@Test
public void testMessageBuilder() throws Exception {
String topic = "hello-kafka-test-topic";
String key = "key1";
String message = "helloMessageBuilder";
kafkaProducerService.testMessageBuilder(topic, key, message);
}
/**
* 测试事务
*/
@Test
public void testSendMessageInTransaction() {
String topic = "hello-kafka-test-topic";
String key = "key1";
String message = "helloSendMessageInTransaction";
kafkaProducerService.sendMessageInTransaction(topic, key, message);
}
/**
* 测试批量消费
* @throws Exception
*/
@Test
public void testConsumerBatch() throws Exception {
//写入多条数据到批量topic:hello-batch
String topic = "hello-batch";
for(int i = 0; i < 20; i++) {
kafkaProducerService.sendMessageSync(topic, null, "batchMessage" + i);
}
}
/**
* 测试消费者拦截器
* @throws Exception
*/
@Test
public void testConsumerInterceptor() throws Exception {
String topic = "consumer-interceptor";
for(int i = 0; i < 2; i++) {
kafkaProducerService.sendMessageSync(topic,null, "normalMessage" + i);
}
kafkaProducerService.sendMessageSync(topic, null, "filteredMessage");
kafkaProducerService.sendMessageSync(topic, null, "filterMessage");
}
}
测试结果
先启动应用程序,然后依次执行 Junit 中的测试方法:
同步发送消息
执行方法testSendMessageSync
,可以看到日志打印:
消费日志:
发送消息并获取消息发往的分区
执行方法testSendMessageGetResult
,可以看到日志打印:
由于配置了自定义分区器,可以看到消息都发往了 0 号分区,如果没有配置自定义分区器,并且主题包含多个分区的话,正常情况下多条消息不会全部发往同一分区。(可以自行测试)
异步发送消息
异步发送消息无法通过 Junit 来测试,因为 Junit 方法执行完就结束了,没法看到成功或失败的回调打印。(可以通过System.in.read来阻塞,或则 Thread.sleep)。这里就不演示了。
MessageBuilder 只是以不同的形式来组装消息,可以自行测试。
以事务的形式发送消息
KafkaTemplate
封装了方法executeInTransaction
方法,可以让我们以事务的形式发送消息。
执行方法testSendMessageInTransaction
,可以看到:
抛出了异常,通过异常说明和没有该消息的消费日志,可以证明该消息并没有发送到 Kafka。
测试批量消费
批量消费需要修改上述消费者端的配置:将配置类KafkaConsumerConfiguration
中的配置稍作修改,往 setBatchListener 方法传入 true,表示开启批量监听。
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
//设置 consumerFactory
factory.setConsumerFactory(consumerFactory());
//设置是否开启批量监听
factory.setBatchListener(true);
//设置消费者组中的线程数量
factory.setConcurrency(1);
return factory;
}
先将批量消费监听器@KafkaListener
注解注释掉并停止 SpringBoot 应用程序,再执行testConsumerBatch
方法往主题中写入 20 条消息。
//@KafkaListener(id = "consumerBatch", topics = "hello-batch")
public void consumerBatch(List<String> messages) {
log.info("consumerBatch =====> messageSize: {}", messages.size());
log.info(messages.toString());
}
写入消息之后启动 SpringBoot ,可以看到批量消费的日志打印:
消费者拦截器
将批量监听改回单条,即 setBatchListener 传入 false。执行方法testConsumerInterceptor
,日志打印:
可以看到绿框中的消息可以被正常消费,红框中的消息被拦截器过滤了,没有消费,因为消息内容包含filter
,这是前面自定义消费者拦截器的逻辑。
源码
源码地址:https://github.com/leo1604270786/hello-kafka