SpringBoot集成Kafka,实现简单的收发消息
在kafka的 config 目录下找到 serverproperties 配置文件
把 listeners 和 advertisedlisteners 两处配置的注释去掉,可以根据需要配置连接的服务器 外网IP 和 端口号 ,我这里演示选择的是本地 localhost 和默认端口 9092
KafkaTemplate 这个类包装了个生产者 Producer ,来提供方便的发送数据到 kafka 的主题 topic 里面。
send() 方法的源码, KafkaTemplate 类中还重载了很多 send() 方法,有需要可以看看源码
通过 KafkaTemplate 模板类发送数据。
kafkaTemplatesend(String topic, K key, V data) ,第一个入参是主题,第二个入参是发送的对象,第三个入参是发送的数据。通过 @KafkaListener 注解配置用户监听 topics
bootstrap-servers :kafka服务器地址(可以多个)
consumergroup-id :指定一个默认的组名
不指定的话会报
1 earliest :当各分区下有已提交的 offset 时,从提交的 offset 开始消费;无提交的 offset 时,从头开始消费
2 latest :当各分区下有已提交的 offset 时,从提交的 offset 开始消费;无提交的 offset 时,消费新产生的该分区下的数据
3 none : topic 各分区都存在已提交的 offset 时,从 offset 后开始消费;只要有一个分区不存在已提交的 offset ,则抛出异常
这个属性也是必须配置的,不然也是会报错的
在使用Kafka发送接收消息时,生产者 producer 端需要序列化,消费者 consumer 端需要反序列化,由于网络传输过来的是 byte[] ,只有反序列化后才能得到生产者发送的真实的消息内容。这样消息才能进行网络传输
consumerkey-deserializer 和 consumervalue-deserializer 是消费者 key/value 反序列化
producerkey-deserializer 和 producervalue-deserializer 是生产者 key/value 序列化
StringDeserializer 是内置的字符串反序列化方式
StringSerializer 是内置的字符串序列化方式
在 orgapachekafkacommonserialization 源码包中还提供了多种类型的序列化和反序列化方式
要自定义序列化方式,需要实现接口 Serializer
要自定义反序列化方式,需要实现接口 Deserializer
详细可以参考
https://blogcsdnnet/shirukai/article/details/82152172
这是 Kafka 的消费者 Consumer 的配置信息,每个消费者都会输出该配置信息
访问 http://localhost:8080/kafka ,就可以看到控制台打印消息了
0条评论