Kafka相关内容总结(Kafka集群搭建手记)
Kafka is a distributed,partitioned,replicated commit logservice。它提供了类似于JMS的特性,但是在设计实现上完全不同,此外它并不是JMS规范的实现。kafka对消息保存时根据Topic进行归类,发送消息者成为Producer,消息接受者成为Consumer,此外kafka集群有多个kafka实例组成,每个实例(server)成为broker。无论是kafka集群,还是producer和consumer都依赖于zookeeper来保证系统可用性集群保存一些meta信息。
入门请参照: https://wwwibmcom/developerworks/cn/opensource/os-cn-kafka/indexhtml
在此不再赘述。
这部分不是本文的重点,但是kafka需要用到kafka集群,所以先搭建kafka集群。
从kafka官方文档看到,kafka似乎在未来的版本希望抛弃zookeep集群,自己维护集群的一致性,拭目以待吧。
我们搭建集群使用的是三台同机房的机器,因为zookeeper不怎么占资源也不怎么占空间(我们的业务目前比较简单),所以三台机器上都搭建了zookeeper集群。
搭建zookeeper集群没什么难度,参考文档: http://wwwcnblogscom/huangxincheng/p/5654170html
下面列一下我的配置并解析:
一共用三台物理机器,搭建一个Kafka集群。
每台服务器的硬盘划分都是一样的,每个独立的物理磁盘挂在一个单独的分区里面,这样很方便用于Kafka多个partition的数据读写与冗余。
/data1比较小,为了不成为集群的瓶颈,所以/data1用于存放kafka以及Zookeeper
每台机器的磁盘分布如下:
下面是kafka的简单配置,三台服务器都一样,如有不一致的在下文有说明。
kafka安装在目录/usr/local/kafka/下,下面的说明以101xxx57为例。
最重要的配置文件serverproperties,需要配置的信息如下:
从上面的配置看到,kafka集群不需要像hadoop集群那样,配置ssh通讯,而且一个kafka服务器(官方文档称之为broker,下面统一使用这个称呼)并不知道其他的kafka服务器的存在,因此你需要逐个broker去启动kafka。各个broker根据自己的配置,会自动去配置文件上的zk服务器报到,这就是一个有zk服务器粘合起来的kafka集群。
我写了一个启动脚本,放在 /usr/local/kafka/bin 下面。启动脚本每个broker都一样:
如同kafka集群里面每一个broker都需要单独启动一样,kafka集群里面每一个broker都需要单独关闭。
官方给出的关闭脚本是单独运行 bin/kafka-server-stopsh
但是我运行的结果是无法关闭。打开脚本一看,才发现是最简单的办法,发一个TERM信号到kafka的java进程,官方脚本给出的grep有点问题。
发信号之后,一直tail着kafka日志,看到正常关闭。
指定zookeeper服务器,topic名称是LvsKafka(注意topic名称不能有英文句号()和下划线(_),否则会通不过,理由是名称会冲突,下文对此略有解析)
replication-factor指出重复因子是2,也就是每条数据有两个拷贝,可靠性考虑。
partitions 指出需要多少个partition,数据量大的多一点,无论生产和消费,这是负载均衡和高并发的需要。
可以看到刚才新建的24个partition,比如partition 5, 他的leader是broker 59,也就是101xxx59这台机器。
建立topic时我们指出需要2个拷贝,从上面的输出的Replicas字段看到,这两个拷贝放在59,58两个机器,也就是101xxx59和101xxx58
Isr表示当前partition的所有拷贝所在的机器中,哪些是还活着(可以提供服务)的。现在是59和58都还存活。
这个命令另外还会看到一些类似于下面的内容:
__consumer_offsets到底是什么呢?其实就是客户端的消费进度,客户端会定时上报到kafka集群,而kafka集群会把每个客户端的消费进度放入一个自己内部的topic中,这个topic就是__consumer_offsets。我查看过__consumer_offsets的内容,其实就是每个客户端的消费进度作为一条消息,放入__consumer_offsets这个topic中。
这里给了我们两个提示:
1、kafka自己管理客户端的消费进度,而不是依靠zk,这就是kafka官方文档说的kafka未来会抛弃zk的底气之一;
2、留意到这个kafka自己的topic是带下划线的,也就是,kafka担心我们自己建的topic如果带下划线的话会跟这些内部自用的topic冲突;
Kafka 是一个分布式消息队列,具有高性能、持久化、多副本备份、横向扩展能力。生产者往队列里写消息,消费者从队列里取消息进行业务逻辑。一般在架构设计中起到解耦、削峰、异步处理的作用。
(1)生产者和消费者(producer和consumer):消息的发送者叫 Producer,消息的使用者和接受者是 Consumer,生产者将数据保存到 Kafka 集群中,消费者从中获取消息进行业务的处理。
(2)broker:Kafka 集群中有很多台 Server,其中每一台 Server 都可以存储消息,将每一台 Server 称为一个 kafka 实例,也叫做 broker。
(3)主题(topic):一个 topic 里保存的是同一类消息,相当于对消息的分类,每个 producer 将消息发送到 kafka 中,都需要指明要存的 topic 是哪个,也就是指明这个消息属于哪一类。
(4)分区(partition):每个 topic 都可以分成多个 partition,每个 partition 在存储层面是 append log 文件。任何发布到此 partition 的消息都会被直接追加到 log 文件的尾部。为什么要进行分区呢?最根本的原因就是:kafka基于文件进行存储,当文件内容大到一定程度时,很容易达到单个磁盘的上限,因此,采用分区的办法,一个分区对应一个文件,这样就可以将数据分别存储到不同的server上去,另外这样做也可以负载均衡,容纳更多的消费者。
(5)偏移量(Offset):一个分区对应一个磁盘上的文件,而消息在文件中的位置就称为 offset(偏移量),offset 为一个 long 型数字,它可以唯一标记一条消息。由于kafka 并没有提供其他额外的索引机制来存储 offset,文件只能顺序的读写,所以在kafka中几乎不允许对消息进行“随机读写”。
(6)分布式和分区(distributed、partitioned)
我们说 kafka 是一个分布式消息系统,所谓的分布式,实际上我们已经大致了解。消息保存在 Topic 中,而为了能够实现大数据的存储,一个 topic 划分为多个分区,每个分区对应一个文件,可以分别存储到不同的机器上,以实现分布式的集群存储。另外,每个 partition 可以有一定的副本,备份到多台机器上,以提高可用性。
总结起来就是:一个 topic 对应的多个 partition 分散存储到集群中的多个 broker 上,存储方式是一个 partition 对应一个文件,每个 broker 负责存储在自己机器上的 partition 中的消息读写。
(7)副本(replicated )
kafka 还可以配置 partitions 需要备份的个数(replicas),每个 partition 将会被备份到多台机器上,以提高可用性,备份的数量可以通过配置文件指定。
这种冗余备份的方式在分布式系统中是很常见的,那么既然有副本,就涉及到对同一个文件的多个备份如何进行管理和调度。kafka 采取的方案是:每个 partition 选举一个 server 作为“leader”,由 leader 负责所有对该分区的读写,其他 server 作为 follower 只需要简单的与 leader 同步,保持跟进即可。如果原来的 leader 失效,会重新选举由其他的 follower 来成为新的 leader。
至于如何选取 leader,实际上如果我们了解 ZooKeeper,就会发现其实这正是 Zookeeper 所擅长的,Kafka 使用 ZK 在 Broker 中选出一个 Controller,用于 Partition 分配和 Leader 选举。
另外,这里我们可以看到,实际上作为 leader 的 server 承担了该分区所有的读写请求,因此其压力是比较大的,从整体考虑,有多少个 partition 就意味着会有多少个leader,kafka 会将 leader 分散到不同的 broker 上,确保整体的负载均衡。
Apache Kafka 的一个关键依赖是 Apache Zookeeper,它是一个分布式配置和同步服务。Zookeeper 是 Kafka 代理和消费者之间的协调接口。Kafka 服务器通过 Zookeeper 集群共享信息。Kafka 在 Zookeeper 中存储基本元数据,例如关于主题,代理,消费者偏移(队列读取器)等的信息。
由于所有关键信息存储在 Zookeeper 中,并且它通常在其整体上复制此数据,因此Kafka代理/ Zookeeper 的故障不会影响 Kafka 集群的状态。Kafka 将恢复状态,一旦 Zookeeper 重新启动。 这为Kafka带来了零停机时间。Kafka 代理之间的领导者选举也通过使用 Zookeeper 在领导者失败的情况下完成。
以上流程将重复,直到消费者停止请求。
消费者可以随时回退/跳到所需的主题偏移量,并阅读所有后续消息。
在队列消息传递系统而不是单个消费者中,具有相同组 ID 的一组消费者将订阅主题。 简单来说,订阅具有相同 Group ID 的主题的消费者被认为是单个组,并且消息在它们之间共享。 让我们检查这个系统的实际工作流程。
此功能也称为使用者组。 同样,Kafka 将以非常简单和高效的方式提供两个系统中最好的。
https://wwworchomecom/22
xxindex :相对offset ,绝对position
xxlog :offset,position,message
xxtimeindex:time,相对offset
(1)查找segment file
00000000000000000000index表示最开始的文件,起始偏移量(offset)为0第二个文件00000000000000368769index的消息量起始偏移量为368770 = 368769 + 1同样,第三个文件00000000000000737337index的起始偏移量为737338=737337 + 1,其他后续文件依次类推,以起始偏移量命名并排序这些文件,只要根据offset 二分查找文件列表,就可以快速定位到具体文件。
当offset=368776时定位到00000000000000368769index|log
(2)通过segment file查找message
通过第一步定位到segment file,当offset=368776时,依次定位到00000000000000368769index的元数据物理位置和00000000000000368769log的物理偏移地址,然后再通过00000000000000368769log顺序查找直到offset=368776为止。
https://blogcsdnnet/hyj_king/article/details/105710993
https://wwworchomecom/28
https://wwworchomecom/29
问题
https://wwworchomecom/20
发送可靠性: 发送消息后,等待确认(需要确保 足够副本节点可用状态)
提交offset,但处理消息失败,需要保存offset,重复消费
重复消费:有业务端 来保障(比如数据表唯一性)
https://wwworchomecom/22
一、kafka定义
二、kafka的优势
三、kafka的原理
四、kafka起源
一、Kafka是最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于hadoop的批处理系统、低延迟的实时系统、storm/Spark流式处理引擎,web/nginx日志、访问日志,消息服务等等,用scala语言编写,Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目。
二、kafka的优势
高吞吐量、低延迟:kafka美妙之处是可以处理几十万条信息,它的延迟最低只有几毫秒,每个topic可以分多个partition,consumer
group对partition进行consume操作。
可扩展性:kafka集群支持热扩展
持久化、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失
容错性:允许集群中节点失败(若副本数量为n,则允许n-1个节点失败)
高并发:支持数千个客户端同时读写
三、kafka的原理
kafka是如何实现以上所述这几点,我们逐一说明:
1高吞吐量、低延迟
kafka在设计之初就是为了针对大数据量的传输处理,高吞吐量、低延迟最主要看的就是单位时间内所能读写的数据总量,我们先来看生产端。
kafka采取了一定量的批处理机制,即当生产数据达到一定数量或者达到时间窗口后,将所收集到的数据一批次的提交到服务器,我们假设处理一次数据的时间为1ms,那每秒钟能处理1000条,延时为1ms,如果此时将处理间隔变成9ms,即每10ms处理一批数据,假设这段时间接收到100条处理,那每秒则能处理10000条,但是延时变成了10ms。为了获得最大的吞吐量,需要牺牲一定的延迟,但是这样的牺牲是值得的。当确定了这种小批量方式之后,高速的写则取决于kafka自身写磁盘的速度了。而由于kafka本身对数据不做任何的处理,只管写入数据,保管数据,分发数据,因此会是一种批量顺序写入数据的情况,而磁盘的读写速度大量消耗在寻址上,也就是随机读写,但是对于顺序写入的速度是非常快的,甚至能媲美内存的随机写入速度。有人做过一个对比,普通磁盘顺序写入每秒能达到532M/s,SSD的顺序写入速度为422M/s,内存的顺序写入速度为3582M/s。kafka正是利用了这个特性,顺序写入,速度相对较快。而kafka本身虽然也是写入磁盘持久化数据,但实际上kafka是将数据顺序写入页缓存中(page cache),然后由操作系统自行决定何时写到磁盘上,因此kafka的写操作能在每秒轻轻松松达到写入数十万条记录。并且基于kafka的动态扩展,这个数字还能不断增大。
kafka在消费端也有着高吞吐量,由于kafka是将数据写入到页缓存中,同时由于读写相间的间隔并不大,很大可能性会在缓存中命中,从而保证高吞吐量。另外kafka由于本身不对数据做任何的修改,完全使用零拷贝技术,大大提升数据的读取能力。
2kafka每个节点叫做broker,而每一个broker都是独立运行的,可以随时加入kafka集群,集群的心跳管理是由zookeeper负责,新加入的broker只要broker id不与原有的冲突就能顺利的加入集群中,实现动态扩展。
3kafka的持久化在上面已经提到,kafka绕过了java的堆处理数据,直接将数据写入页缓存,然后由操作系统来管理页缓存写入磁盘,实现持久化。kafka每一个主题topic是一个业务数据,他可由多个partition组成,而每个partition可以有多个replica副本,用于保证数据的可靠性。replica分为两个角色,一个是leader,一个是追随者,同一时间,每一个partition只能有一个leader,其他都是追问随者,laeder负责接收数据并写入log,而追随者不能被用户写入数据,只是从leader角色的replica副本中同步log写入自己的log,保持数据同步。kafka中有一个概念,ISR,全称是in-sync
replica,即所有可用的replica副本,这里的ISR数量只要大于1,这个partition就能正常运作,因此容错性非常好,假设n个replica,那最多可以坏n-1个replica的情况下,还能保持系统正常运行。当replica迟滞到一定时间后,会被kafka从ISR中剔除,当再次同步后,可以再次加入ISR,如果这时候leader出现问题,会从ISR中重新选举一个leader,原先的leader再次同步成功后会重新加入ISR,成为一个flower。
4上面提到了kafka的ISR机制,kafka的容错性就是由ISR的机制来保证的。
5kafka集群可以动态扩展broker,多个partition同时写入消费数据,实现真正的高并发。
四、kafka的起源
kafka起源于LinkedIn公司,当时领英公司需要收集两大类数据,一是业务系统和应用程序的性能监控指标数据,而是用户的操作行为数据。当时为了收集这两类数据,领英自研了两套相应的数据收集系统,但是这两套系统都存在一些弊端,无法实现实时交互、实时性差、维护成本高。因此领英的工程师希望找到一个统一的组件来收集分发消费这些大批量的数据,ActiveMQ由于扩展性不足,不能支撑大数据量而被抛弃,从而决定自研一套满足需求的系统组件,也就是kafka。
kafka的设计之初主要有三个目标:
1为生产者和消费者提供一套简单的API
2降低网络传输和磁盘存储开销
3具有高伸缩性架构
目前kafka可以算是超额完成了目标。
kafka的名称由来也很有意思,因为kafka系统的写操作性能特别强,因此想使用一个作家的名字来命名kafka,而Jay Kreps,kafka的三位作者之一,在上大学的时候很喜欢Franz Kafka,因此起来这样一个名字。
kafka在2010年开源,2011年7月正式进入Apache进行孵化,2012年10月顺利毕业,后成为Apache的顶级项目。
一打包项目镜像:
利用Dockerfile 来打包项目的镜像
本次项目共依赖两个镜像(一个基础系统环境和一个项目镜像)
本次直接将Dockerfile写好后,用shell脚本buildsh启动打包:
然后切换到项目的目录下找到buildsh,运行即可打包项目镜像
若
报错:"failed to dial gRPC: cannot connect to the Docker daemon Is 'docker daemon' running on this host: dial unix /var/run/dockersock: connect: permission denied
"
就用
出现以下说明打包成功,接下来可以开始部署:
https://jingyanbaiducom/article/9113f81b49ed2f2b3214c7fahtml
注意:如果遇到只读权限不能修改时,将host文件复制一份到桌面,修改后在替换原来的host文件
在hosts文件末尾加上kafka服务器< !外网! 39 025>地址,修改后的格式如下:
11注意: 修改阿里云服务器的hosts 文件来配置 kafka的服务器地址:
在hosts 文件最后加入:
添加的 kafka-server 就是以下创建topic命令中的 kafka-server别名,
监听远程kafka:新建消费者:
远程创建topic的实例:
查看远程已创建的topc:
本地:
远程修改后的kafka topic:
2通过git Bash 切换到kafka客户端的bin目录:
桌面打开 gitBash,切换到本地kafka软件目录:
这里一定要切换为windows
3查看已经有的topic
--topic 指定topic名字
--replication-factor 指定副本数,因为我的是集群环境,这里副本数就为3
--partitions 指定分区数,这个参数需要根据broker数和数据量决定,正常情况下,每个broker上两个partition最好
注意:服务器部署时候一定要用内网172 开头的,外部访问设为外网ip
不然会导致Kafka写入数据的时候报错 : TImeout
41本地docker创建topic:
42 本地windows 创建topic
进入本地软件路径KAFKA/BIN/WIONDOWS
创建topic
5修改服务器的host:
一定要注意加sudo 不然会导致readonly 无法修改
在host 文件的末尾加上以下:
6切换到工程部署的目录
7清理redis,不然数据有残留:
71服务器上的redis挂载清除:
在 docker-composeyml中注销这几行: 目的是每次启动不必记录上次没有执行完的数据
这个是用来记录redis中假如上次指定的是1到100万块,没有执行完下次接着执行没执行完的任务,测试时暂时关闭
72删除volume:
73 如果volume文件被占用时,先删除占用容器:
74 清除redis中的数据
进入redis容器中:
8部署命令:
81开启docker可视化web上监控docker:
然后访问: http://391004841:9000
宿主机IP + 9000端口
82执行部署命令,启动服务:
9部署时报错: yaml: line 46: did not find expected key
原因: docker-composeyml文件中第46行 报错
解决:将所有数据对齐,不要有多余的空格
AWS在re:Invent 2018大会上首先发布了托管Apache Kafka消息队列服务(Amazon Managed Streaming for Apache Kafka,MSK)的消息,现在已经从预览成为正式服务。
Apache Kafka是一个分布式的消息队列系统,其使用发布以及订阅的架构,将产生的流数据的应用与利用流数据的角色分离。Apache Kafka让使用者可以捕捉如消息队列事件、交易、物联网等事件,或是应用与日志等流数据,还能实时进行分析,连续不间断地转换数据,并再将收到的数据经过处理后,分发到其他的数据湖和数据库中。
AWS提到,用户在生产环境中要配置Apache Kafka,需要克服一些障碍,特别是在后续的管理以及规模扩展工作上,而现在AWS正式推出的MSK服务,则由AWS负责管理任务,让用户可以简单地配置使用,而且由于近几个版本的Kafka,都需要与节点协调程序Zookeeper共同使用,因此MSK服务也只要简单地设定,就能让Kafka与ZooKeeper一同运行。
使用MSK服务,用户可以在几分钟内创建集群,并使用AWS身分管理与访问控制IAM管理集群操作,也能通过ACM(AWS Certificate Manager)完全托管的TLS私密凭证颁发机构授权客户端,以TLS加密数据,并使用KMS(AWS Key Management Service)中的密钥加密其他数据。当服务器发生故障时,MSK还会替换故障机器,自动执行修补,用户可以从Amazon CloudWatch中,监控服务的状态指标。
AWS表示,MSK与Kafka 111和210版本完全兼容,因此用户可以在AWS直接执行原本的Kafka应用以及工具,而不需要修改任何的代码,用户能使用开源工具MirrorMaker,将数据从现有的Kafka集群直接迁移到MSK上。
MSK的计价方式是以Kafka Broker以及配置存储每小时计价,MSK的数据传输费用与原本的AWS数据传输相同,而集群所使用的Zookeeper节点,还有区域集群的Broker和Zookeeper节点互传数据是不额外收费的。现在用户已经可以在大部分的AWS区域使用到MSK服务,包括北美、亚洲与欧洲。
0条评论