SpringBoot集成Kafka,实现简单的收发消息

SpringBoot集成Kafka,实现简单的收发消息,第1张

在kafka的 config 目录下找到 serverproperties 配置文件

把 listeners 和 advertisedlisteners 两处配置的注释去掉,可以根据需要配置连接的服务器 外网IP 和 端口号 ,我这里演示选择的是本地 localhost 和默认端口 9092

KafkaTemplate 这个类包装了个生产者 Producer ,来提供方便的发送数据到 kafka 的主题 topic 里面。

send() 方法的源码, KafkaTemplate 类中还重载了很多 send() 方法,有需要可以看看源码

通过 KafkaTemplate 模板类发送数据。

kafkaTemplatesend(String topic, K key, V data) ,第一个入参是主题,第二个入参是发送的对象,第三个入参是发送的数据。通过 @KafkaListener 注解配置用户监听 topics

bootstrap-servers :kafka服务器地址(可以多个)

consumergroup-id :指定一个默认的组名

不指定的话会报

1 earliest :当各分区下有已提交的 offset 时,从提交的 offset 开始消费;无提交的 offset 时,从头开始消费

2 latest :当各分区下有已提交的 offset 时,从提交的 offset 开始消费;无提交的 offset 时,消费新产生的该分区下的数据

3 none : topic 各分区都存在已提交的 offset 时,从 offset 后开始消费;只要有一个分区不存在已提交的 offset ,则抛出异常

这个属性也是必须配置的,不然也是会报错的

在使用Kafka发送接收消息时,生产者 producer 端需要序列化,消费者 consumer 端需要反序列化,由于网络传输过来的是 byte[] ,只有反序列化后才能得到生产者发送的真实的消息内容。这样消息才能进行网络传输

consumerkey-deserializer 和 consumervalue-deserializer 是消费者 key/value 反序列化

producerkey-deserializer 和 producervalue-deserializer 是生产者 key/value 序列化

StringDeserializer 是内置的字符串反序列化方式

StringSerializer 是内置的字符串序列化方式

在 orgapachekafkacommonserialization 源码包中还提供了多种类型的序列化和反序列化方式

要自定义序列化方式,需要实现接口 Serializer

要自定义反序列化方式,需要实现接口 Deserializer

详细可以参考

https://blogcsdnnet/shirukai/article/details/82152172

这是 Kafka 的消费者 Consumer 的配置信息,每个消费者都会输出该配置信息

访问 http://localhost:8080/kafka ,就可以看到控制台打印消息了

  

serverSocket = new ServerSocket ( 9999 );

serverThread = new ServerThread ( serverSocket );

serverThreadstart ( );

看你这块的代码,你的server定义了serverSocket,当你开启服务后,新建了一个服务线程,去监听

while( flag ){

try{

    Socket socket = serverSocketaccept( );

    ServerThreadAgent serverThreadAgent = new ServerThreadAgent( socket );

    serverThreadAgentstart( );

}

catch( Exception e ){

    eprintStackTrace( );

}

在监听的线程中,当遇到客户端的请求时,创建一个Socket,与客户端建立一条通道,进行通信

public void run ( )

{

    try

    { 

    

        Object message = SocketUtilreadStream ( socket );// 接收信息

        Systemoutprintln ( socketgetInetAddress ( ) + "发来信息:"

        + message);

        String sendMessage = "信息已经收到。";

        SocketUtilwriteStream ( sendMessage, socket );// 发送信息

    }

    catch ( Exception e )

    {

        eprintStackTrace ( );

    }

}

看你的与客户端保持连接的服务代理,服务代理线程开启后,先是接受客户端的信息,先是出来,然后向客户端发送一条消息,然后,然后呢???????

然后,就没有然后来了,服务代理线程的生命就此结束,也就是说客户端与服务端的通话结束,连接已经断开。

与客户端既然已经断了,怎么可能进行消息传递呢???

这样一分析,问题明显出来了,问题出在哪?

问题就是在于与客户端保持通话的服务代理线程,怎样才能保持通话呢?

那就是在代理线程的run方法中进行循环进行监听,也就是说消息的接受,这样,才能保持通话,想结束的时候,就退出循环,或是客户端主动的结束通话。

获取从客户端读入的字符串

String result = bufferedReaderreadLine(); 这里会阻塞。

你服务器端获取Socket以后 需要另外启动一个线程去处理,你现在是单线程的服务器端设计,当然只能接收一次请求了。

服务器端接收到Socket以后应该启动一个线程

new Thread(new Runable(){

})start();

DABAN RP主题是一个优秀的主题,极致后台体验,无插件,集成会员系统
网站模板库 » SpringBoot集成Kafka,实现简单的收发消息

0条评论

发表评论

提供最优质的资源集合

立即查看 了解详情