SpringBoot集成Kafka,实现简单的收发消息
在kafka的 config 目录下找到 serverproperties 配置文件
把 listeners 和 advertisedlisteners 两处配置的注释去掉,可以根据需要配置连接的服务器 外网IP 和 端口号 ,我这里演示选择的是本地 localhost 和默认端口 9092
KafkaTemplate 这个类包装了个生产者 Producer ,来提供方便的发送数据到 kafka 的主题 topic 里面。
send() 方法的源码, KafkaTemplate 类中还重载了很多 send() 方法,有需要可以看看源码
通过 KafkaTemplate 模板类发送数据。
kafkaTemplatesend(String topic, K key, V data) ,第一个入参是主题,第二个入参是发送的对象,第三个入参是发送的数据。通过 @KafkaListener 注解配置用户监听 topics
bootstrap-servers :kafka服务器地址(可以多个)
consumergroup-id :指定一个默认的组名
不指定的话会报
1 earliest :当各分区下有已提交的 offset 时,从提交的 offset 开始消费;无提交的 offset 时,从头开始消费
2 latest :当各分区下有已提交的 offset 时,从提交的 offset 开始消费;无提交的 offset 时,消费新产生的该分区下的数据
3 none : topic 各分区都存在已提交的 offset 时,从 offset 后开始消费;只要有一个分区不存在已提交的 offset ,则抛出异常
这个属性也是必须配置的,不然也是会报错的
在使用Kafka发送接收消息时,生产者 producer 端需要序列化,消费者 consumer 端需要反序列化,由于网络传输过来的是 byte[] ,只有反序列化后才能得到生产者发送的真实的消息内容。这样消息才能进行网络传输
consumerkey-deserializer 和 consumervalue-deserializer 是消费者 key/value 反序列化
producerkey-deserializer 和 producervalue-deserializer 是生产者 key/value 序列化
StringDeserializer 是内置的字符串反序列化方式
StringSerializer 是内置的字符串序列化方式
在 orgapachekafkacommonserialization 源码包中还提供了多种类型的序列化和反序列化方式
要自定义序列化方式,需要实现接口 Serializer
要自定义反序列化方式,需要实现接口 Deserializer
详细可以参考
https://blogcsdnnet/shirukai/article/details/82152172
这是 Kafka 的消费者 Consumer 的配置信息,每个消费者都会输出该配置信息
访问 http://localhost:8080/kafka ,就可以看到控制台打印消息了
serverSocket = new ServerSocket ( 9999 );
serverThread = new ServerThread ( serverSocket );
serverThreadstart ( );
看你这块的代码,你的server定义了serverSocket,当你开启服务后,新建了一个服务线程,去监听
while( flag ){try{
Socket socket = serverSocketaccept( );
ServerThreadAgent serverThreadAgent = new ServerThreadAgent( socket );
serverThreadAgentstart( );
}
catch( Exception e ){
eprintStackTrace( );
}
在监听的线程中,当遇到客户端的请求时,创建一个Socket,与客户端建立一条通道,进行通信
public void run ( ){
try
{
Object message = SocketUtilreadStream ( socket );// 接收信息
Systemoutprintln ( socketgetInetAddress ( ) + "发来信息:"
+ message);
String sendMessage = "信息已经收到。";
SocketUtilwriteStream ( sendMessage, socket );// 发送信息
}
catch ( Exception e )
{
eprintStackTrace ( );
}
}
看你的与客户端保持连接的服务代理,服务代理线程开启后,先是接受客户端的信息,先是出来,然后向客户端发送一条消息,然后,然后呢???????
然后,就没有然后来了,服务代理线程的生命就此结束,也就是说客户端与服务端的通话结束,连接已经断开。
与客户端既然已经断了,怎么可能进行消息传递呢???
这样一分析,问题明显出来了,问题出在哪?
问题就是在于与客户端保持通话的服务代理线程,怎样才能保持通话呢?
那就是在代理线程的run方法中进行循环进行监听,也就是说消息的接受,这样,才能保持通话,想结束的时候,就退出循环,或是客户端主动的结束通话。
获取从客户端读入的字符串
String result = bufferedReaderreadLine(); 这里会阻塞。
你服务器端获取Socket以后 需要另外启动一个线程去处理,你现在是单线程的服务器端设计,当然只能接收一次请求了。
服务器端接收到Socket以后应该启动一个线程
new Thread(new Runable(){
})start();
0条评论