1.kafka介绍
最后更新于:2022-04-02 04:51:59
Kafka是什么
~~~
kafka使用scala开发,支持多语言客户端(c++、java、python、go等)
Kafka最先由LinkedIn公司开发,之后成为Apache的顶级项目。
Kafka是一个分布式的、分区化、可复制提交的日志服务
LinkedIn使用Kafka实现了公司不同应用程序之间的松耦和,那么作为一个可扩展、高可靠的消息系统
支持高Throughput的应用
scale out:无需停机即可扩展机器
持久化:通过将数据持久化到硬盘以及replication防止数据丢失
支持online和offline的场景
~~~
Kafka的特点
Kafka是分布式的,其所有的构件borker(服务端集群)、producer(消息生产)、consumer(消息消费者)都可以是分布式的。
在消息的生产时可以使用一个标识topic来区分,且可以进行分区;每一个分区都是一个顺序的、不可变的消息队列, 并且可以持续的添加。
同时为发布和订阅提供高吞吐量。据了解,Kafka每秒可以生产约25万消息(50 MB),每秒处理55万消息(110 MB)。
消息被处理的状态是在consumer端维护,而不是由server端维护。当失败时能自动平衡
常用的场景
监控:主机通过Kafka发送与系统和应用程序健康相关的指标,然后这些信息会被收集和处理从而创建监控仪表盘并发送警告。
消息队列: 应用程度使用Kafka作为传统的消息系统实现标准的队列和消息的发布—订阅,例如搜索和内容提要(Content Feed)。比起大多数的消息系统来说,Kafka有更好的吞吐量,内置的分区,冗余及容错性,这让Kafka成为了一个很好的大规模消息处理应用的解决方案。消息系统 一般吞吐量相对较低,但是需要更小的端到端延时,并尝尝依赖于Kafka提供的强大的持久性保障。在这个领域,Kafka足以媲美传统消息系统,如ActiveMR或RabbitMQ
站点的用户活动追踪: 为了更好地理解用户行为,改善用户体验,将用户查看了哪个页面、点击了哪些内容等信息发送到每个数据中心的Kafka集群上,并通过Hadoop进行分析、生成日常报告。
流处理:保存收集流数据,以提供之后对接的Storm或其他流式计算框架进行处理。很多用户会将那些从原始topic来的数据进行 阶段性处理,汇总,扩充或者以其他的方式转换到新的topic下再继续后面的处理。例如一个文章推荐的处理流程,可能是先从RSS数据源中抓取文章的内 容,然后将其丢入一个叫做“文章”的topic中;后续操作可能是需要对这个内容进行清理,比如回复正常数据或者删除重复数据,最后再将内容匹配的结果返 还给用户。这就在一个独立的topic之外,产生了一系列的实时数据处理的流程。
日志聚合。使用Kafka代替日志聚合(log aggregation)。日志聚合一般来说是从服务器上收集日志文件,然后放到一个集中的位置(文件服务器或HDFS)进行处理。然而Kafka忽略掉 文件的细节,将其更清晰地抽象成一个个日志或事件的消息流。这就让Kafka处理过程延迟更低,更容易支持多数据源和分布式数据处理。比起以日志为中心的 系统比如Scribe或者Flume来说,Kafka提供同样高效的性能和因为复制导致的更高的耐用性保证,以及更低的端到端延迟
持久性日志:Kafka可以为一种外部的持久性日志的分布式系统提供服务。这种日志可以在节点间备份数据,并为故障节点数据回复提供一种重新同步的机制。Kafka中日志压缩功能为这种用法提供了条件。在这种用法中,Kafka类似于Apache BookKeeper项目。
Kafka中包含以下基础概念
1. Topic(话题):Kafka中用于区分不同类别信息的类别名称。由producer指定
2. Producer(生产者):将消息发布到Kafka特定的Topic的对象(过程)
3. Consumers(消费者):订阅并处理特定的Topic中的消息的对象(过程)
4. Broker(Kafka服务集群):已发布的消息保存在一组服务器中,称之为Kafka集群。集群中的每一个服务器都是一个代理(Broker). 消费者可以订阅一个或多个话题,并从Broker拉数据,从而消费这些已发布的消息。
5. Partition(分区):Topic物理上的分组,一个topic可以分为多个partition,每个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id(offset)
6. Message:消息,是通信的基本单位,每个producer可以向一个topic(主题)发布一些消息。
消息
消息由一个固定大小的报头和可变长度但不透明的字节阵列负载。报头包含格式版本和CRC32效验和以检测损坏或截断
消息格式
~~~
1. 4 byte CRC32 of the message
2. 1 byte "magic" identifier to allow format changes, value is 0 or 1
3. 1 byte "attributes" identifier to allow annotations on the message independent of the version
bit 0 ~ 2 : Compression codec
0 : no compression
1 : gzip
2 : snappy
3 : lz4
bit 3 : Timestamp type
0 : create time
1 : log append time
bit 4 ~ 7 : reserved
4. (可选) 8 byte timestamp only if "magic" identifier is greater than 0
5. 4 byte key length, containing length K
6. K byte key
7. 4 byte payload length, containing length V
8. V byte payload
~~~
安装kafka
1)打开config目录下的server.properties, 修改log.dirs为D:\kafka_logs,修改advertised.host.name=服务器ip
2)启动kafka ./bin/windows/kafka-server-start.bat ./config/server.preperties
// 报错:(Error: missing `server' JVM at `C:\Program Files (x86)\Java\jre1.8.0_144\binver\jvm.dll'.
// Please install or use the JRE or JDK that contains these missing components.)
解决方法:将C:\Program Files (x86)\Java\jre1.8.0_144\bin\servertool.exe 改名为:server.exe
Mac启动:sh bin/kafka-server-start.sh config/server.properties
kafka搭建
kafka环境基于zookeeper,zookeeper环境基于JAVA-JDK。
安装JAVA-JDK
~~~
从oracle下载最新的SDK安装。
~~~
安装zookeeper
下载zookeeper 下载地址:http://apache.fayea.com/zookeeper/
重命名conf/zoo_sample.cfg 为conf/zoo.cfg
编辑 conf/zoo.cfg,修改dataDir=/Users/liupengjie/Desktop/tool/zookeeper-3.4.10/data
(Windows:修改dataDir=D:\zookeeper-3.3.6\data\)
启动zookeeper: ./bin/zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /Users/liupengjie/Desktop/tool/zookeeper-3.4.10/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
停止ZK服务: ./bin/zkServer.sh stop
重启ZK服务: ./bin/zkServer.sh restart
查看ZK服务状态: ./bin/zkServer.sh status
(Windows 启动:bin/zkServer.cmd)
安装kafka
下载kafka2.1.2 打开链接:http://kafka.apache.org/downloads.html (下载二进制包)
kafka配置
通常情况下需要在解压缩kafka后,修改config/server.properties 配置文件中的以下项
~~~
log.dirs = kafka-logs
advertised.host.name=192.168.1.125 (本机ip)
# root directory for all kafka znodes.
zookeeper.connect = localhost:9092
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000
listeners = PLAINTEXT://ip:9092 (listeners = PLAINTEXT://your.host.name:9092)
/*
log.dirs=/Users/liupengjie/Desktop/tool/kafka_2.11-0.11.0.0/kafka_logs
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000
#listeners=PLAINTEXT://:9092
advertised.host.name=192.168.1.125
*/
~~~
log.dirs 指的是kafka的log Data保存的目录,默认为Null。如果不指定log Data会保存到log.dir设置的目录中,log.dir默认为/tmp/kafka-logs。需要保证启动KafKaServer的用户对log.dirs或log.dir设置的目录具有读与写的权限。
zookeeper.connect 指的是zookeeper集群的地址,可以是多个,多个之间用逗号分割hostname1:port1,hostname2:port2,hostname3:port3
listeners 监听列表(以逗号分隔 不同的协议(如plaintext,trace,ssl、不同的IP和端口))
启动kafka
~~~
cd bin
./kafka-server-start.sh ../config/server.properties
(Windows启动:./bin/windows/kafka-server-start.bat ./config/server.preperties)
/*
Windows启动报错:
Error: missing `server' JVM at `C:\Program Files (x86)\Java\jre1.8.0_144\binver\jvm.dll'.
Please install or use the JRE or JDK that contains these missing components.
解决方法:
将C:\Program Files (x86)\Java\jre1.8.0_144\bin\servertool.exe 改名为:server.exe
*/
~~~
kafka链接zookeeper
~~~
cd bin
$ kafka-console-consumer.sh --topic nginx_log --zookeeper 127.0.0.1 2181
(Windows 执行:kafka-console-consumer.bat --topic nginx_log --zookeeper 127.0.0.1 2181)
/*
Windows启动报错:
Error: missing `server' JVM at `C:\Program Files (x86)\Java\jre1.8.0_144\binver\jvm.dll'.
Please install or use the JRE or JDK that contains these missing components.
解决方法:将C:\Program Files (x86)\Java\jre1.8.0_144\bin\servertool.exe 改名为:server.exe
*/
~~~
查看topic的详细信息
~~~
bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --topic nginx_log --describe
~~~
kafka消费者客户端命令
~~~
bin/kafka-console-consumer.sh --zookeeper 127.0.0.1:2181 --topic nginx_log --from-beginning
~~~
Kafka常用命令
以下是kafka常用命令行总结:
1.查看topic的详细信息
./kafka-topics.sh -zookeeper 127.0.0.1:2181 -describe -topic testKJ1
2、为topic增加副本
./kafka-reassign-partitions.sh -zookeeper 127.0.0.1:2181 -reassignment-json-file json/partitions-to-move.json -execute
3、创建topic
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testKJ1
4、为topic增加partition
./bin/kafka-topics.sh –zookeeper 127.0.0.1:2181 –alter –partitions 20 –topic testKJ1
5、kafka生产者客户端命令
./kafka-console-producer.sh --broker-list localhost:9092 --topic testKJ1
6、kafka消费者客户端命令
./kafka-console-consumer.sh -zookeeper localhost:2181 --from-beginning --topic testKJ1
7、kafka服务启动
./kafka-server-start.sh -daemon ../config/server.properties
8、下线broker
./kafka-run-class.sh kafka.admin.ShutdownBroker --zookeeper 127.0.0.1:2181 --broker #brokerId# --num.retries 3 --retry.interval.ms 60
shutdown broker
9、删除topic
./kafka-run-class.sh kafka.admin.DeleteTopicCommand --topic testKJ1 --zookeeper 127.0.0.1:2181
./kafka-topics.sh --zookeeper localhost:2181 --delete --topic testKJ1
10、查看consumer组内消费的offset
./kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper localhost:2181 --group test --topic testKJ1
';