Kafka实战

最后更新于:2022-04-01 10:00:36

# Kafka实战 实验环境为Ubuntu。 ### 安装 Kafka的安装非常简单,只需要下载解压就可以了。需要注意的是Kafka依赖于Java环境,所以确保你的系统中装有JDK。 ~~~ //安装sun默认JDK drfish@kafka-5934:~# sudo apt-get install default-jdk //下载Kafka并解压 drfish@kafka-5934:~$ wget http://apache.mirrors.lucidnetworks.net/kafka/0.9.0.0/kafka_2.11-0.9.0.0.tgz jshen4@kafka-5934:~$ tar -xzf kafka_2.11-0.9.0.0.tgz jshen4@kafka-5934:~$ cd kafka_2.11-0.9.0.0/ ~~~ ### 启动 Kafka的运行是依赖于Zookeeper的,Zookeeper是一个为分布式应用提供一致性服务的软件,提供的功能包括:配置维护、名字服务、分布式同步、组服务等。在这里我们直接用Kafka自带的Zookeeper即可。 ~~~ //启动Zookeeper drfish@kafka-5934:~/kafka_2.11-0.9.0.0$ bin/zookeeper-server-start.sh config/zookeeper.properties & //启动Kafka drfish@kafka-5934:~/kafka_2.11-0.9.0.0$ bin/kafka-server-start.sh config/server. properties & ~~~ ### 创建话题 所有消息要发布到相应的话题下,我们来创建一个测试话题。 ~~~ //创建一个名为test的topic drfish@kafka-5934:~/kafka_2.11-0.9.0.0$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test //查看现有的topic,正常情况应输出test和一些日志 drfish@kafka-5934:~/kafka_2.11-0.9.0.0$ bin/kafka-topics.sh --list --zookeeper l ocalhost:2181 ~~~ ### 发送消息 现在可以通过生产者脚本来发布消息了,运行脚本后就可在命令行输入消息。 ~~~ drfish@kafka-5934:~/kafka_2.11-0.9.0.0$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test This is a message. ~~~ ### 接受消息 创建一个消费者来接收消息,通过自带的消费者脚本可以简单的把接受的消息输出,在命令行中可以看见刚刚输入的那条信息“This is a message.”。 ~~~ drfish@kafka-5934:~/kafka_2.11-0.9.0.0$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning This is a message. ~~~ 到这里一条最基本的流程就走通了,下面来尝试一下Kafka的多节点集群。 ### 多节点集群尝试 在启动新节点之前,首先要修改配置文件,因为已经有一个Kafka进程在运行中,我们要保证新的Kafka节点不与它冲突。运行下面的命令,对相应文件做如下修改: config/server-1.properties: broker.id=1 listeners=PLAINTEXT://:9093 log.dir=/tmp/kafka-logs-1 config/server-2.properties: broker.id=2 listeners=PLAINTEXT://:9094 log.dir=/tmp/kafka-logs-2 ~~~ //配置节点启动文件 drfish@kafka-5934:~/kafka_2.11-0.9.0.0$ cp config/server.properties config/server-1.properties drfish@kafka-5934:~/kafka_2.11-0.9.0.0$ cp config/server.properties config/serve r-2.properties drfish@kafka-5934:~/kafka_2.11-0.9.0.0$ vi config/server-1.properties drfish@kafka-5934:~/kafka_2.11-0.9.0.0$ vi config/server-2.properties //启动新节点 drfish@kafka-5934:~/kafka_2.11-0.9.0.0$ bin/kafka-server-start.sh config/server-1.properties & drfish@kafka-5934:~/kafka_2.11-0.9.0.0$ bin/kafka-server-start.sh config/server-2.properties & ~~~ 接下去要创建能够被多个节点接收的话题。从返回的结果可以看出,主节点是节点0,节点1和2是从节点。而Isr后面的节点是指与主节点同步的节点。 ~~~ //创建一个叫做my-replicated-topic的话题,只有一个分区,但备份因子为3 drfish@kafka-5934:~/kafka_2.11-0.9.0.0$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic //查看新创建话题的结果 drfish@kafka-5934:~/kafka_2.11-0.9.0.0$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs: Topic: my-replicated-topic Partition: 0 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2 ~~~ 测试一下消息的发布与接收。 ~~~ //向新的话题发布消息 drfish@kafka-5934:~/kafka_2.11-0.9.0.0$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic new message //消费者读取消息 drfish@kafka-5934:~/kafka_2.11-0.9.0.0$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic new message ~~~ 既然有了备份节点,我们就来看一下它们的实际作用,我们通过关闭主节点进程来模拟主节点异常的情况,测试从节点能否继续正常完成主节点应该做的工作。 ~~~ //查看我们的主节点运行在哪一个后台命令中,由于主节点是0,即2号命令 drfish@kafka-5934:~/kafka_2.11-0.9.0.0$ jobs [1] Running bin/zookeeper-server-start.sh config/zookeeper.properties & [2] Running bin/kafka-server-start.sh config/server.properties & [3]- Running bin/kafka-server-start.sh config/server-1.properties & [4]+ Running bin/kafka-server-start.sh config/server-2.properties & //将对应的命令调到前台运行,并通过^C终止命令 drfish@kafka-5934:~/kafka_2.11-0.9.0.0$ fg 2 //在终止了上面一条命令后发现如下日志,新的主节点是原来的从节点1 [2015-12-07 23:55:53,118] INFO [Kafka Server 0], shut down completed (kafka.server.KafkaServer) [2015-12-07 23:55:53,158] INFO New leader is 1 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener) //查看话题消息来验证我们的发现 drfish@kafka-5934:~/kafka_2.11-0.9.0.0$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs: Topic: my-replicated-topic Partition: 0 Leader: 1 Replicas: 0,1,2 Isr: 1,2 //看新的主节点能否处理消息 drfish@kafka-5934:~/kafka_2.11-0.9.0.0$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic new message ~~~ 从上面的实验结果可以看出,当主节点0发生异常时,从节点1变为主节点,此时节点0仍在这个备份组里,但它已经不与其它节点同步(通过Isr属性看出)。接下去做最后一个实验 ,如果现在我们重启节点一会怎么样呢? ~~~ //重启节点1 drfish@kafka-5934:~/kafka_2.11-0.9.0.0$ bin/kafka-server-start.sh config/server. properties & //查看话题 drfish@kafka-5934:~/kafka_2.11-0.9.0.0$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs: Topic: my-replicated-topic Partition: 0 Leader: 1 Replicas: 0,1,2 Isr: 1,2,0 ~~~ 我们可以看出来节点1又处在同步状态了,我们可以看出Kafka节点的可用性是非常好的,如果节点出现异常,它会临时把该节点废弃,一旦当节点恢复正常,它又使节点进行正常的备份工作。
';