Storm新特性之Flux

最后更新于:2022-04-01 10:00:50

# Storm新特性之Flux Flux是Storm版本0.10.0中的新组件,主要目的是为了方便拓扑的开发与部署。原先在开发Storm拓扑的时候整个拓扑的结构都是硬编码写在代码中的,当要对其进行修改时,需要修改代码并重新编译和打包,这是一件繁琐和痛苦的事情,Flux解决了这一问题。 ### 特性 下面是Flux提供的所有的特性: * 容易配置和部署拓扑(包括Storm和Trident) * 支持变更已存在的拓扑 * 通过YAML文件来定义Spouts和Bolts,甚至可以支持Storm的其他组件,如storm-kafka/storm-hdfs/storm-hbase等 * 容易支持多语言协议组件 * 方便在不同环境中切换 ### 使用 想要用Flux最简单的方法就是添加Maven依赖,然后打包成一个胖jar文件。依赖配置如下: ~~~ <!-- include Flux and user dependencies in the shaded jar --> <dependencies> <!-- Flux include --> <dependency> <groupId>org.apache.storm</groupId> <artifactId>flux-core</artifactId> <version>${storm.version}</version> </dependency> <!-- add user dependencies here... --> </dependencies> <!-- create a fat jar that includes all dependencies --> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>1.4</version> <configuration> <createDependencyReducedPom>true</createDependencyReducedPom> </configuration> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <transformers> <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> <mainClass>org.apache.storm.flux.Flux</mainClass> </transformer> </transformers> </configuration> </execution> </executions> </plugin> </plugins> </build> ~~~ 接下来是YAML文件的定义,一个拓扑的定义需要包括以下的部分: 1. 拓扑名 2. 拓扑组件的列表 3. spouts、bolts、stream,或者是一个可以提供`org.apache.storm.generated.StormTopology`实例的JVM类。 下面是YAML文件实例: ~~~ name: "yaml-topology" config: topology.workers: 1 # spout definitions spouts: - id: "spout-1" className: "org.apache.storm.testing.TestWordSpout" parallelism: 1 # bolt definitions bolts: - id: "bolt-1" className: "org.apache.storm.testing.TestWordCounter" parallelism: 1 - id: "bolt-2" className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt" parallelism: 1 #stream definitions streams: - name: "spout-1 --> bolt-1" # name isn't used (placeholder for logging, UI, etc.) from: "spout-1" to: "bolt-1" grouping: type: FIELDS args: ["word"] - name: "bolt-1 --> bolt2" from: "bolt-1" to: "bolt-2" grouping: type: SHUFFLE ~~~ 在有了jar文件和YAML文件后就可以通过以下的命令运行Flux拓扑了,其中`myTopology-0.1.0-SNAPSHOT.jar`是打包后的jar文件,`org.apache.storm.flux.Flux`是Flux的入口类,`--local`表示是在本地运行拓扑,`my_config.yaml`使YAML配置文件。 ~~~ storm jar myTopology-0.1.0-SNAPSHOT.jar org.apache.storm.flux.Flux --local my_config.yaml ~~~ ### 其他特性详解 ### 不同环境切换 在不同的环境中运行拓扑需要不一样的配置,如开发环境和生产环境,这些环境中切换一般不会改变拓扑的结构,只是要修改主机、端口号和并行度等。如果用两份不一样的YAML文件来进行会产生不必要的重复,Flux可以通过.properites文件来加载不同的环境变量。只需要添加`--filter`参数即可: ~~~ torm jar myTopology-0.1.0-SNAPSHOT.jar org.apache.storm.flux.Flux --local my_config.yaml --filter dev.properties ~~~ 以YAML文件中的Kafka主机为例,YAML文件修改如下: ~~~ - id: "zkHosts" className: "org.apache.storm.kafka.ZkHosts" constructorArgs: - "${kafka.zookeeper.hosts}" ~~~ 而dev.properties问价如下: ~~~ kafka.zookeeper.hosts: localhost:2181 ~~~ > 注:YAML文件中也可以解析系统环境变量${ENV-VARIABLE} ### 多语言协议的支持 多语言特性的支持比较简单,只需要修改YAML文件中构造参数,如下面是一个由Python写成的bolts: ~~~ bolts: - id: "splitsentence" className: "org.apache.storm.flux.bolts.GenericShellBolt" constructorArgs: # command line - ["python", "splitsentence.py"] # output fields - ["word"] parallelism: 1 ~~~ ###  展望 Flux虽然可以加方便拓扑的修改与部署,但这仍然不支持动态的修改拓扑结构,在修改拓扑时仍要中断并重启。不过现在在开发中的几个特性有望改善这个情况。 本文由 DRFish([http://www.drfish.me/](http://www.drfish.me/))原创,转载请写明原链接,谢谢。 参考内容: [Flux github](https://github.com/apache/storm/blob/a4f9f8bc5b4ca85de487a0a868e519ddcb94e852/external/flux/README.md)
';

分布式基础之二阶段提交

最后更新于:2022-04-01 10:00:48

# 分布式基础之二阶段提交 二阶段提交(Two Phase Commit)在分布式事务处理中非常常见。它主要用来保证分布式事务处理的一致性,决定事务的提交或回滚。目前二阶段提交广泛应用于关系型数据库的分布式事务处理中,它是分布式系统中的一个常见协议。 ### 需求 为什么要二阶段提交?因为在分布式系统中,每个节点只知道自己的事务是否执行成功了,而分布式系统要求一致性,也就是所有的节点的状态都应该一致。如果某一个事务只在部分节点执行成功,那么势必会导致各分布式节点不一致。二阶段提交就是用来保证要么所有的节点都执行了该事务,要么都不执行。 ### 简介 顾名思义,二阶段提交要通过两个阶段来完成。此外在二阶段中主要分为两种角色:协调节点与普通节点。而且协议中还假设节点不会发生永久性故障,而且任意两个节点都可以互相通信。 ### 提交阶段 在提交阶段,协调节点将事务分发给普通节点,普通节点进行事务的处理并返回响应。具体过程如下: 1. 协调节点向各普通节点发送事务内容,等待普通节点的回应 1. 各普通节点执行事务,并记录下日志(用来事务中断时进行回滚) 1. 各普通节点返回响应,判断事务是否执行成功 > 注:协调节点需要设置超时时限,如果有普通节点在超时时限内都没有响应,认为事务执行失败。 ### 执行阶段 在执行阶段,根据各普通节点的响应结果来进行事务的提交或回滚。 **执行事务** 1. 所有的普通节点都返回说事务执行成功 1. 协调者向所有的普通节点发送执行命令 1. 普通节点在收到执行命令后对事务结果进行提交,并释放整个事务执行期间占用的资源 1. 提交完成后向协调节点发送回应消息(ack) 1. 协调节点接收到所有的回应消息后完成事务处理 **中断事务** 1. 有普通节点执行事务失败或者超时 1. 协调者向所有普通节点发送回滚请求 1. 普通节点根据日志执行回滚操作,并释放在整个事务执行期间占用的资源 1. 回滚完成后向协调节点发送回应消息(ack) 1. 协调节点接收到所有的回应消息后完成事务中断 下面是事务提交成功的说明图: ![二阶段事务成功](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-03-21_56efa3c6b9f62.jpg "") 下面是事务中断的说明图: ![二阶段事务中断](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-03-21_56efa3c6d0325.jpg "") > 注:图片来源于网络 ### 节点运行状况 我们来分类讨论一下各节点出现问题时会对二阶段提交产生什么影响: **协调节点正常,普通节点失效**: 如果普通节点在提交阶段失效会导致因超时而中断事务;如果在执行阶段失效,那么会导致这个节点与其他的节点出现不一致现象。这时可以通过协调者对该节点进行事务的重新提交,或者该节点主动询问协调者或其他节点进行同步。 **协调节点失效,普通节点正常**: 如果调节节点失效,那么会导致收到提交请求的普通节点占用了事务资源,并一直在等待调节节点作出反馈来决定是执行事务还是回滚。可以等待协调节点恢复或者设定一种备用协调节点机制。 **协调节点失效,普通节点失效**: 在这种情况下会出现一种非常危险的错误,如果协调节点在发出了部分执行命令后失效,且接收到该执行命令的节点后来也失效了,那么就无法知道整个分布式系统的最终决定要不要执行该事务操作。这时只能进行数据订正或者等节点恢复正常后再进行相关操作。 ### 优缺点 **优点** 二阶段明显的优点就是逻辑很简单,易于在系统中实现。 **缺点** 就是因为二阶段提交协议的简单,也使得它存在一些缺陷: 单点问题: 显然协调节点在整个协议中处在一个中心位置,一旦协调节点出现问题,那么整个二阶段操作都会受到严重影响。 数据不一致: 也就是协调节点失效,普通节点失效的情况,这时候可能部分普通节点执行了事务,而部分普通节点没有执行事务。 阻塞引起的性能问题: 由于在二阶段提交中需要等到所有的节点都作出反应,需要花费大量的开销在阻塞操作上,此时普通节点不能进行其他事物操作,性能会受到影响。
';

Storm异常之RuntimeException: Found multiple defaults.yaml resources

最后更新于:2022-04-01 10:00:45

# Storm异常之RuntimeException: Found multiple defaults.yaml resources ### 异常 今天在运行Storm与Kafka集成的Topology时抛出如下异常: ~~~ java.lang.RuntimeException: Found multiple defaults.yaml resources at backtype.storm.utils.Utils.findAndReadConfigFile(Utils.java:106) at backtype.storm.utils.Utils.readDefaultConfig(Utils.java:126) at backtype.storm.utils.Utils.readStormConfig(Utils.java:146) at backtype.storm.config$read_storm_config.invoke(config.clj:66) at backtype.storm.testing$mk_local_storm_cluster.doInvoke(testing.clj:103) at clojure.lang.RestFn.invoke(RestFn.java:398) at backtype.storm.LocalCluster$_init.invoke(LocalCluster.clj:10) at backtype.storm.LocalCluster.<init>(Unknown Source) at TopologyMain.main(TopologyMain.java:29) ~~~ ### 解决方案 搜索了一下依赖包,发现storm-core下面也存在一个defaults.yaml文件。修改Maven配置文件,添加scope属性并设为provided: ~~~ <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-core</artifactId> <version>0.9.6</version> <scope>provided</scope> </dependency> ~~~ 重新打包运行,异常消失。 ### 知识点回忆 下面罗列一下scope的使用场景和说明: > 注:以下内容摘录自acooly的博客文章 [MAVEN Scope使用](http://acooly.iteye.com/blog/1788890) **1.compile** 编译范围,默认scope,在工程环境的classpath(编译环境)和打包(如果是WAR包,会包含在WAR包中)时候都有效。 **2.provided** 容器或JDK已提供范围,表示该依赖包已经由目标容器(如tomcat)和JDK提供,只在编译的classpath中加载和使用,打包的时候不会包含在目标包中。最常见的是j2ee规范相关的servlet-api和jsp-api等jar包,一般由servlet容器提供,无需在打包到war包中,如果不配置为provided,把这些包打包到工程war包中,在tomcat6以上版本会出现冲突无法正常运行程序(版本不符的情况)。 **3.runtime** 一般是运行和测试环境使用,编译时候不用加入classpath,打包时候会打包到目标包中。一般是通过动态加载或接口反射加载的情况比较多。也就是说程序只使用了接口,具体的时候可能有多个,运行时通过配置文件或jar包扫描动态加载的情况。典型的包括:JDBC驱动等。 **4.test** 测试范围,一般是单元测试场景使用,在编译环境加入classpath,但打包时不会加入,如junit等。 **5.system** 系统范围,与provided类似,只是标记为该scope的依赖包需要明确指定基于文件系统的jar包路径。因为需要通过systemPath指定本地jar文件路径,所以该scope是不推荐的。如果是基于组织的,一般会建立本地镜像,会把本地的或组织的基础组件加入本地镜像管理,避过使用该scope的情况。 **实践:** provided是没有传递性的,也就是说,如果你依赖的某个jar包,它的某个jar的范围是provided,那么该jar不会在你的工程中依靠jar依赖传递加入到你的工程中。 provided具有继承性,上面的情况,如果需要统一配置一个组织的通用的provided依赖,可以使用parent,然后在所有工程中继承。
';

Zookeeper集群的部署

最后更新于:2022-04-01 10:00:43

# Zookeeper集群的部署 标签(空格分隔): Zookeeper ZooKeeper是一个开源的分布式应用程序协调服务,是Google的Chubby一个开源的实现,是Hadoop、Hbase、Kafka等流行开源框架的重要组件。 以下实验环境为Ubuntu14.04,局域网内的三台普通计算机,虚拟机可以进行相同的配置。 ### 配置IP映射 为了方便后续的操作,以及容易修改配置信息,我们先做个IP地址的映射。在/etc/hosts文件中添加如下内容: ~~~ 192.168.0.100 zk1 192.168.0.101 zk2 192.168.0.102 zk3 ~~~ 注意每台机器都要进行相同的配置。 ### 下载及配置 我们先在zk1的机器上进行配置,我下载的是 [zookeeper-3.4.7](http://www.eu.apache.org/dist/zookeeper/zookeeper-3.4.7/zookeeper-3.4.7.tar.gz)。解压完成后要复制该目录下的 `conf/zoo_sample.cfg` 为 `conf/zoo.cfg` ,并对 `zoo.cfg` 进行配置: ~~~ tickTime=2000 # 修改成任意想要存放数据的位置,建议使用绝对路径 dataDir=/home/user/storage/zookeeper clientPort=2181 initLimit=5 syncLimit=2 server.1=zk1:2888:3888 server.2=zk2:2888:3888 server.3=zk3:2888:3888 ~~~ 在dataDir指定的目录下创建文件myid,里面添加一个数字1。 ### 远程复制配置 现在把zk1上的文件复制到zk2和zk3上: ~~~ scp -r zookeeper-3.4.7/ user@zk2:/home/user/ scp -r zookeeper-3.4.7/ user@zk3:/home/user/ ~~~ 在这两台机器上也要有myid文件,内容改成相应的数字。 ### 启动集群 在每台机器上zookeeper目录下执行以下命令来启动zookeeper服务: ~~~ bin/zkServer.sh start ~~~ 启动zookeeper时每个节点都会试图去连接集群中的其他节点,所以在开启前面两台上的服务时会记录一些异常,等所有集群上的机器都启动完毕就恢复正常了。 ### 日志查看 zookeeper的日志在目录下的zookeeper.out文件中,可以通过查看日志来了解启动的状况与节点运行情况,如下面的命令可以查看日志的最新部分: ~~~ tail -200f zookeeper.out ~~~ ### 运行情况 我们还可以通过脚本来检测一下节点的运行情况: ~~~ bin/zkServer.sh status ~~~ 下面是一个节点的输出,节点模式分为leader和follower: ~~~ ZooKeeper JMX enabled by default Using config: /home/user/zookeeper-3.4.7/bin/../conf/zoo.cfg Mode: follower ~~~
';

Kafka系统工具

最后更新于:2022-04-01 10:00:41

# Kafka系统工具 ### 前言 Kafka为我们提供了很多有用的系统工具,这些工具都放在kafka.tools包中,具体的类如下图: ![kafka-tools](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-03-21_56efa3c6987e7.jpg "") ### 工具介绍 下面简要的介绍一下一些比较常用的工具: **1. Consumer Offset Checker** 用来展示消费者组、话题、分区、指针偏移量(offset)等值。 可选描述: ~~~ --broker-info 打印broker的信息 --group 消费者组 --help 打印帮助信息 --retry.backoff.ms <Integer> 错误查询重新尝试间隔(默认为3000) --socket.timeout.ms <Integer> 连接超时时间 (默认为6000) --topic 消费者的某一个话题,缺失时默认包含所有的话题 --zookeeper zookeeper的地址(默认为localhost:2181) ~~~ **2. Dump Log Segments** 从日志文件中打印消息或者验证日志下标是否正确。 可选描述: ~~~ --deep-iteration 使用深迭代而不是浅迭代 --files <file1, file2, ...> 输入的文件 --key-decoder-class 用自己定义的反序列化方式反序列化关键字 --max-message-size <Integer: size> 消息最大的字节数(默认为5242880) --print-data-log 同时打印出日志消息 --value-decoder-class 用自己定义的序列化方式来序列化关键字 --verify-index-only 只是认证下标 ~~~ **3. Export Zookeeper Offsets** 把不同Kafka节点分区中的指针偏移量输出到如下格式的文件中: > /consumers/group1/offsets/topic1/1-0:286894308 /consumers/group1/offsets/topic1/2-0:284803985 可选描述: ~~~ --group 消费者组 --help 打印帮助信息 --output-file 导出的文件名 --zkconnect zookeeper的地址(默认为localhost:2181) ~~~ **4. Get Offset Shell** 获得某一个消息的指针偏移量。 可选描述: ~~~ --broker-list <hostname:port,..., 每个broker的主机名和端口号 hostname:port> --max-wait-ms <Integer: ms> 每次抓取最长的等待时间 (默认为1000) --offsets <Integer: count> 返回的偏移量的数目(默认为1) --partitions <partition ids> 需要在哪些分区中查询,默认为所有的分区 --time <Long: timestamp/-1(latest)/-2 设定时间区间 (earliest)> --topic <topic> 设定特定的话题 ~~~ **5. Import Zookeeper Offsets** 把导出的指针偏移量文件再倒入并放到对应的分区中 必须的参数: group 可选描述: ~~~ --help 打印帮助信息 --input-file 需要导入的文件 --zkconnect zookeeper的地址(默认为localhost:2181) ~~~ **6. JMX Tool** 通过JMX管理来打印metrics 可选描述: ~~~ --attributes <name> 需要查询的属性的白名单 --date-format <format> 用来格式化时间信息 --help 打印帮助信息 --jmx-url <service-url> 获取JMX信息的URL --object-name <name> 查询特定的JMX对象信息,可以设置多个值, 如果不设置会查询所有的JMX对象 --reporting-interval <Integer: ms> 多久获取一次JMX信息 (默认为2000) ~~~ **7. Kafka Migration** 可以将Kafka从0.7版本迁移到0.8版本 可选描述: ~~~ --blacklist <Java regex (String)> 不需要复制的话题的黑名单 --consumer.config <config file> 消费者配置文件 --help 打印帮助信息 --kafka.07.jar <kafka 0.7 jar> kafka 0.7版本的压缩包 --num.producers <Integer: Number of 生产者实例数目(默认为1) producers> --num.streams <Integer: Number of 消费者实例数目(默认为1) consumer threads> --producer.config <config file> 生产者配置文件 --queue.size <Integer: Queue size in 在版本间迁移时消息的缓冲数目 terms of number of messages> --whitelist <Java regex (String)> 需要从旧集群复制过去的话题的白名单 --zkclient.01.jar <zkClient 0.1 jar zookeeper 0.1版本压缩包 file required by Kafka 0.7> ~~~ **8. Mirror Maker** 提供Kafka集群之间的映射关系,实现跨集群的同步。 可选描述: ~~~ --abort.on.send.failure <Stop the 出错时是否终止操作(默认为true) entire mirror maker when a send failure occurs> --blacklist <Java regex (String)> 不需要同步的话题的黑名单 --consumer.config <config file> 消费者配置文件 --consumer.rebalance.listener <A 消费者复杂均衡监听器 custom rebalance listener of type ConsumerRebalanceListener> --help 打印帮助信息 --message.handler <A custom message 在生产者和消费者之间来处理消息的处理器 handler of type MirrorMakerMessageHandler> --message.handler.args <Arguments 消费者负载均衡参数 passed to message handler constructor.> --new.consumer 在mirror maker时使用新的消费者 --num.streams <Integer: Number of 指定消费者的线程数(默认为1) threads> --offset.commit.interval.ms <Integer: 偏移量提交间隔(默认为60000) offset commit interval in millisecond> --producer.config <config file> 生产者配置文件 --rebalance.listener.args <Arguments 消费者负载均衡参数 passed to custom rebalance listener constructor as a string.> --whitelist <Java regex (String)> 需要同步的话题的白名单 ~~~ **9. State Change Log Merger** 状态转变日志整合工具,可以把来自不同节点不同时间的日志进行整合。 可选描述: ~~~ --end-time <end timestamp in the 需要整合的日志的截止时间,在这之前的 format java.text. 日志都要整合(默认为 9999-12-31 23:59:59,999) SimpleDateFormat@f17a63e7> --logs <file1,file2,...> 需要整合的日志 --logs-regex <for example: /tmp/state- 日志名的正则表达式 change.log*> --partitions <0,1,2,...> 哪些分区的日志需要整合 --start-time <start timestamp in the 需要整合的日志的开始时间,在这之后的 format java.text. 日志都要整合(默认为 0000-00-00 00:00:00,000) SimpleDateFormat@f17a63e7> --topic <topic> 哪些话题的日志需要整合 ~~~ **10. Verify Consumer Rebalance** 确认消费者是否平衡,确保每个分区只有一个消费者,因为Kafka不支持多个消费者同时对一个分区进行读写。 可选描述: ~~~ --group 消费者组 --help 打印帮助信息 --zookeeper.connect zookeeper的地址(默认为localhost:2181) ~~~ ### 使用方式 ### 脚本命令 在${KAFKA_HOME}/bin目录下(windows用户可以在${KAFKA_HOME}/bin/windows中找到对应的bat脚本)为我们封装好了很多基本的命令脚本,可以直接调用,如: ~~~ drfish@kafka:~/kafka_2.11-0.9.0.0$ bin/kafka-consumer-offset-checker.sh --zookeeper localhost:2181 --group group-1 ~~~ ### 类调用 在${KAFKA_HOME}/bin目录下还有一个特殊的脚本kafka-run-class.sh,它可以调用所需的类来运行类中的方法,具体方法如下: ~~~ drfish@kafka:~/kafka_2.11-0.9.0.0$ bin/kfka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper localhost:2181 --group=group-1 ~~~ 需要注意的是,当通过调用类来使用工具时,需要使用完整的类名,而且有些类名使用了缩写,如kafka.tools.ImportZkOffsets。 ### Java代码 同样我们可以用Java代码直接通过类来调用相应的工具: ~~~ String[] arg = new String[] { "--zookeeper=10.64.253.238:2181", "--group=group-1" }; ConsumerOffsetChecker.main(arg); ~~~ ### 运行结果 上面三种不同的调用方式,最后都会返回类似如下的结果: ~~~ Group Topic Pid Offset LogSize Lag Owner group-1 test 0 255229 255229 0 none ~~~ ### 总结 本文介绍了一下Kafka提供的系统工具的常用工具,并给出了通过不同方式来运用工具的方法,能够帮助大家更好地来管理Kafka集群。
';

Kafka实战二

最后更新于:2022-04-01 10:00:39

# Kafka实战二 ### 前言 在上一章 [Kafka实战](http://blog.csdn.net/u013291394/article/details/50231681) 中我们在局域网中搭建了一个Kafka节点,并尝试了通过命令行脚本来实现本地消息的发布与接收,了解了主从节点之间的关系等。这一章主要实现在本机通过Java代码实现对局域网中的Kafka节点进行消息的发布与接收。 ### 准备工作 在Java中进行Kafka编程需要依赖kafka和kafka-clients两个包,下面直接提供maven配置文件pom.xml,不要忘记修改工程名: ~~~ <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>Test</groupId> <artifactId>Test</artifactId> <version>0.0.1-SNAPSHOT</version> <build> <sourceDirectory>src</sourceDirectory> <plugins> <plugin> <artifactId>maven-compiler-plugin</artifactId> <version>3.3</version> <configuration> <source>1.7</source> <target>1.7</target> </configuration> </plugin> </plugins> </build> <dependencies> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.9.0.0</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.10</artifactId> <version>0.9.0.0</version> </dependency> </dependencies> </project> ~~~ ### 生产者 生产者有两种发布方式,同步和异步。异步方式增加了一个Callback参数来实现在消息成功发送后,开展后续的工作。 ~~~ import java.util.Properties; import java.util.concurrent.ExecutionException; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; public class Producer extends Thread { private final KafkaProducer<Integer, String> producer; private final String topic; // 是否需要异步发送 private final Boolean isAsync; // 装有Kafka的机器的IP地址 private final String serverIp = "10.64.***.***"; public Producer(String topic, Boolean isAsync) { Properties props = new Properties(); props.put("bootstrap.servers", serverIp+":9092"); props.put("client.id", "DemoProducer"); props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); producer = new KafkaProducer<Integer, String>(props); this.topic = topic; this.isAsync = isAsync; } public void run() { int messageNo = 1; while(true) { String messageStr = "Message_" + messageNo; long startTime = System.currentTimeMillis(); if (isAsync) { producer.send(new ProducerRecord<Integer, String>(topic, messageNo, messageStr), new DemoCallBack(startTime, messageNo, messageStr)); } else { try { producer.send(new ProducerRecord<Integer, String>(topic, messageNo, messageStr)).get(); System.out.println("Sent message: (" + messageNo + ", " + messageStr + ")"); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } ++messageNo; } } } class DemoCallBack implements Callback { private long startTime; private int key; private String message; public DemoCallBack(long startTime, int key, String message) { this.startTime = startTime; this.key = key; this.message = message; } /** * 当异步发送完成后需要进行的处理 **/ public void onCompletion(RecordMetadata metadata, Exception exception) { long elapsedTime = System.currentTimeMillis() - startTime; if (metadata != null) { System.out.println( "message(" + key + ", " + message + ") sent to partition(" + metadata.partition() + "), " + "offset(" + metadata.offset() + ") in " + elapsedTime + " ms"); } else { exception.printStackTrace(); } } } ~~~ 调用方式: ~~~ // 开启生产者线程后,会向Kafka节点中对应的topic发送Message_**类型的消息 boolean isAsync = true; Producer producerThread = new Producer(KafkaProperties.topic, isAsync); producerThread.start(); ~~~ ### 消费者 消费者用来接收特定话题的消息。 ~~~ import java.util.Collections; import java.util.Properties; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import kafka.utils.ShutdownableThread; public class Consumer extends ShutdownableThread { private final KafkaConsumer<Integer, String> consumer; private final String topic; private final String serverIp = "10.64.***.***"; public Consumer(String topic) { super("KafkaConsumerExample", false); Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, serverIp+":9092"); props.put(ConsumerConfig.GROUP_ID_CONFIG, "DemoConsumer"); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer"); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); consumer = new KafkaConsumer<>(props); this.topic = topic; } @Override public void doWork() { consumer.subscribe(Collections.singletonList(this.topic)); ConsumerRecords<Integer, String> records = consumer.poll(1000); for (ConsumerRecord<Integer, String> record : records) { System.out.println("Received message: (" + record.key() + ", " + record.value() + ") at offset " + record.offset()); } } @Override public String name() { return null; } @Override public boolean isInterruptible() { return false; } } ~~~ 调用方式: ~~~ //开启消费者线程后,会接收到之前生产者发送的消息 Consumer consumerThread = new Consumer(KafkaProperties.topic); consumerThread.start(); ~~~ ### 总结 通过上面的简单的例子,我们就可以在自己的工程中向Kafka发送消息,并接收到自己订阅的消息了。
';

Kafka实战

最后更新于:2022-04-01 10:00:36

# Kafka实战 实验环境为Ubuntu。 ### 安装 Kafka的安装非常简单,只需要下载解压就可以了。需要注意的是Kafka依赖于Java环境,所以确保你的系统中装有JDK。 ~~~ //安装sun默认JDK drfish@kafka-5934:~# sudo apt-get install default-jdk //下载Kafka并解压 drfish@kafka-5934:~$ wget http://apache.mirrors.lucidnetworks.net/kafka/0.9.0.0/kafka_2.11-0.9.0.0.tgz jshen4@kafka-5934:~$ tar -xzf kafka_2.11-0.9.0.0.tgz jshen4@kafka-5934:~$ cd kafka_2.11-0.9.0.0/ ~~~ ### 启动 Kafka的运行是依赖于Zookeeper的,Zookeeper是一个为分布式应用提供一致性服务的软件,提供的功能包括:配置维护、名字服务、分布式同步、组服务等。在这里我们直接用Kafka自带的Zookeeper即可。 ~~~ //启动Zookeeper drfish@kafka-5934:~/kafka_2.11-0.9.0.0$ bin/zookeeper-server-start.sh config/zookeeper.properties & //启动Kafka drfish@kafka-5934:~/kafka_2.11-0.9.0.0$ bin/kafka-server-start.sh config/server. properties & ~~~ ### 创建话题 所有消息要发布到相应的话题下,我们来创建一个测试话题。 ~~~ //创建一个名为test的topic drfish@kafka-5934:~/kafka_2.11-0.9.0.0$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test //查看现有的topic,正常情况应输出test和一些日志 drfish@kafka-5934:~/kafka_2.11-0.9.0.0$ bin/kafka-topics.sh --list --zookeeper l ocalhost:2181 ~~~ ### 发送消息 现在可以通过生产者脚本来发布消息了,运行脚本后就可在命令行输入消息。 ~~~ drfish@kafka-5934:~/kafka_2.11-0.9.0.0$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test This is a message. ~~~ ### 接受消息 创建一个消费者来接收消息,通过自带的消费者脚本可以简单的把接受的消息输出,在命令行中可以看见刚刚输入的那条信息“This is a message.”。 ~~~ drfish@kafka-5934:~/kafka_2.11-0.9.0.0$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning This is a message. ~~~ 到这里一条最基本的流程就走通了,下面来尝试一下Kafka的多节点集群。 ### 多节点集群尝试 在启动新节点之前,首先要修改配置文件,因为已经有一个Kafka进程在运行中,我们要保证新的Kafka节点不与它冲突。运行下面的命令,对相应文件做如下修改: config/server-1.properties: broker.id=1 listeners=PLAINTEXT://:9093 log.dir=/tmp/kafka-logs-1 config/server-2.properties: broker.id=2 listeners=PLAINTEXT://:9094 log.dir=/tmp/kafka-logs-2 ~~~ //配置节点启动文件 drfish@kafka-5934:~/kafka_2.11-0.9.0.0$ cp config/server.properties config/server-1.properties drfish@kafka-5934:~/kafka_2.11-0.9.0.0$ cp config/server.properties config/serve r-2.properties drfish@kafka-5934:~/kafka_2.11-0.9.0.0$ vi config/server-1.properties drfish@kafka-5934:~/kafka_2.11-0.9.0.0$ vi config/server-2.properties //启动新节点 drfish@kafka-5934:~/kafka_2.11-0.9.0.0$ bin/kafka-server-start.sh config/server-1.properties & drfish@kafka-5934:~/kafka_2.11-0.9.0.0$ bin/kafka-server-start.sh config/server-2.properties & ~~~ 接下去要创建能够被多个节点接收的话题。从返回的结果可以看出,主节点是节点0,节点1和2是从节点。而Isr后面的节点是指与主节点同步的节点。 ~~~ //创建一个叫做my-replicated-topic的话题,只有一个分区,但备份因子为3 drfish@kafka-5934:~/kafka_2.11-0.9.0.0$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic //查看新创建话题的结果 drfish@kafka-5934:~/kafka_2.11-0.9.0.0$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs: Topic: my-replicated-topic Partition: 0 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2 ~~~ 测试一下消息的发布与接收。 ~~~ //向新的话题发布消息 drfish@kafka-5934:~/kafka_2.11-0.9.0.0$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic new message //消费者读取消息 drfish@kafka-5934:~/kafka_2.11-0.9.0.0$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic new message ~~~ 既然有了备份节点,我们就来看一下它们的实际作用,我们通过关闭主节点进程来模拟主节点异常的情况,测试从节点能否继续正常完成主节点应该做的工作。 ~~~ //查看我们的主节点运行在哪一个后台命令中,由于主节点是0,即2号命令 drfish@kafka-5934:~/kafka_2.11-0.9.0.0$ jobs [1] Running bin/zookeeper-server-start.sh config/zookeeper.properties & [2] Running bin/kafka-server-start.sh config/server.properties & [3]- Running bin/kafka-server-start.sh config/server-1.properties & [4]+ Running bin/kafka-server-start.sh config/server-2.properties & //将对应的命令调到前台运行,并通过^C终止命令 drfish@kafka-5934:~/kafka_2.11-0.9.0.0$ fg 2 //在终止了上面一条命令后发现如下日志,新的主节点是原来的从节点1 [2015-12-07 23:55:53,118] INFO [Kafka Server 0], shut down completed (kafka.server.KafkaServer) [2015-12-07 23:55:53,158] INFO New leader is 1 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener) //查看话题消息来验证我们的发现 drfish@kafka-5934:~/kafka_2.11-0.9.0.0$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs: Topic: my-replicated-topic Partition: 0 Leader: 1 Replicas: 0,1,2 Isr: 1,2 //看新的主节点能否处理消息 drfish@kafka-5934:~/kafka_2.11-0.9.0.0$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic new message ~~~ 从上面的实验结果可以看出,当主节点0发生异常时,从节点1变为主节点,此时节点0仍在这个备份组里,但它已经不与其它节点同步(通过Isr属性看出)。接下去做最后一个实验 ,如果现在我们重启节点一会怎么样呢? ~~~ //重启节点1 drfish@kafka-5934:~/kafka_2.11-0.9.0.0$ bin/kafka-server-start.sh config/server. properties & //查看话题 drfish@kafka-5934:~/kafka_2.11-0.9.0.0$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs: Topic: my-replicated-topic Partition: 0 Leader: 1 Replicas: 0,1,2 Isr: 1,2,0 ~~~ 我们可以看出来节点1又处在同步状态了,我们可以看出Kafka节点的可用性是非常好的,如果节点出现异常,它会临时把该节点废弃,一旦当节点恢复正常,它又使节点进行正常的备份工作。
';

Kafka基础概念

最后更新于:2022-04-01 10:00:34

# Kafka基础概念 ### 前言 最近项目中要使用到流式数据处理,借此机会学习一下Kafka的相关知识。 ### 简介 Apache Kafka是一个分布式的、分区的、可重复提交的日志系统。它以独特的设计提供了消息系统的功能。作为一个Apache开源项目,Kafka已经得到很多业内人士的关注,并且已经在很多公司内部进行使用,如Linkin、eBay等。 Kafka作为一个分布式发布-订阅系统,也遵循着一般的发布-订阅系统的基础架构。 ![kafka-pubsub](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-03-21_56efa3c64ed50.jpg "") Kafka中将不同的消息通过不同的主题(topic)来区分,在图中,生产者向Kafka集群发布消息,消费者通过向某一个主题注册,进而接收相关主题的消息。Kafka可以运行在一台或多台的服务器上,每台服务器成为一个经纪人(broker)。 ### 主要概念 ### 分区(Partition) Kafka会为每个主题分一个区,发布的消息都会存到相应的分区中。每个分区是一个有序的、不可变的序列,每条消息通过偏移量来定位。具体情形如下: ![kafka-partition](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2016-03-21_56efa3c670f10.jpg "") 这样做的好处是,因为一个主题可以有多个分区,所以可以容易地拓展主题的容量到多台机器。同时分区也为高并发、高容错性提供了可能性。 ### 主题(Topics)和日志(Logs) 需要注意的是,Kafka保存数据是根据时间来决定的,而不是它是否被消费者消费。如果在数据的生命周期内,它被消费后仍然会存储在Kafka中。但如果它在生命周期内没有被消费,同样它也会在生命周期结束时被丢弃。而消费者是通过分区中的偏移量来定位消费的数据的,通常是顺序读下去,但也可以通过改变偏移量的值来随意读取信息。 ### 分布式(Distribution) 分布式跟分区是紧密相连的,正是数据是以分区的形式存储的,才给分布式提供了可能。分区的数据会在多个服务器上进行备份以便容错。每份数据都会以主分区的形式存储在某一台服务器上,它的副本称为从分区,存储在其他服务器上。主分区处理来自外界的读写请求,从分区会被动的复制数据的变化。如果主分区出现了故障,某一个从分区会变为主分区。此外一台服务器可以拥有一个主分区和多个其他分区的从分区,这样能充分利用资源,又保证了容错性。 ### 生产者(Producers) 生产者决定发布什么消息、消息属于哪个主题及在该主题的哪个分区下,关于分区的选择可以通过自己设定的方法来实现不同的方案。 ### 消费者(Consumers) 消费者会被被划分成一个个消费者组,属于某一个主题的消息会被分派到它的一个分区下,而该分区与订阅了该主题的消费组中的某一个消费者相对应,也就是说消息只会发送给订阅的消费者组中的一个消费者。可以把消费者组理解成消息的真正的订阅者,而它下面的消费者只是处理消息的线程池,这样做可以保证系统的扩展性和容错性。而消费者与分区的关系是,每个分区只能有一个消费者,这样保证了在这一分区中的所有消息都能按序处理。但不同分区中的消息处理顺序不能保证,如果要保证所有的数据都按序处理,可以使每个话题只有一个分区,每个消费者组只有一个消费者。 ### 保证(Guarantees) Kafka可以保证: - 由生产者发送给特定主题分区的消息会以发送的顺序添加。也就是说,后发送的消息一定在先发送的消息之后。 - 消费者会按消息在日志中的存储顺序看到它们。 - 对于副本因子为N的主题,我们可以承受N-1个服务器发生故障而保证提交到日志的消息不会丢失。 ### 应用场景 ### 消息队列 跟大多数的消息系统相比,Kafka有更好的吞吐量和容错性,这让Kafka成为了一个很好的大规模消息处理应用的解决方案。消息系统一般吞吐量相对较低,但是需要更小的端到端延时,而Kafka能提供这些保障。 ### 网页行为跟踪 Kafka的另一个应用场景是跟踪用户浏览页面、搜索操作及其他用户行为,以发布-订阅的模式实时记录到对应的话题中。这些结果被订阅者拿到后就可以做进一步的处理。 ### 数据监控 Kafka也可以作为操作记录的监控模块来使用,即汇集记录一些操作信息,对系统进行监控。 ### 日志聚合 日志聚合一般来说是从服务器上收集日志文件,然后放到一个集中的位置进行处理。然而Kafka忽略掉文件的细节,将其更清晰地抽象成一个个日志或事件的消息流。这就让Kafka处理过程延迟更低,更容易支持多数据源和分布式数据处理。比起以日志为中心的系统比如Scribe或者Flume来说,Kafka提供同样高效的性能和因为冗余达到的更高的可用性保证,以及更低的端到端延迟。 ### 流式处理 这是Kafka的一个重要应用场景。保存收集流数据,以提供给之后对接的Storm或其他流式计算框架进行处理。很多用户会对那些从原始话题来的数据进行阶段性处理、汇总、扩充或者以其他的方式转换到新的话题下再继续后面的处理。 ### 事件源 事件源是一种应用程序设计的方式,该方式的状态转移被记录为按时间顺序排序的记录序列。Kafka可以存储大量的日志数据,这使得它成为一个对这种方式的应用来说绝佳的后台。 ### 持久性日志 Kafka可以为一种外部的持久性日志的分布式系统提供服务。这种日志可以在节点间备份数据,并为故障节点数据回复提供一种重新同步的机制。Kafka中日志压缩功能为这种用法提供了条件。在这种用法中,Kafka类似于Apache BookKeeper项目。 ### 总结 这篇文章主要介绍了Apache Kafka中的几个基本概念,及其应用场景。
';

在Ubuntu上部署Hadoop单节点

最后更新于:2022-04-01 10:00:32

# 在Ubuntu上部署Hadoop单节点 ### 安装Java环境 Hadoop框架是依赖Java环境的,直接用最简单的方式,直接安装OpenJDK。 ~~~ # 更新一下源列表 fish@hadoop:~$ sudo apt-get update # 安装OpenJDK fish@hadoop:~$ sudo apt-get install default-jdk # 测试Java环境 fish@hadoop:~$ java -version java version "1.7.0_65" OpenJDK Runtime Environment (IcedTea 2.5.3) (7u71-2.5.3-0ubuntu0.14.04.1) OpenJDK 64-Bit Server VM (build 24.65-b04, mixed mode) ~~~ ### 安装SSH Hadoop要依赖于SSH来管理节点,下面安装SSH。 ~~~ # 安装SSH fish@hadoop:~$ sudo apt-get install ssh # 测试SSH fish@hadoop:~$ which ssh /usr/bin/ssh fish@hadoop:~$ which sshd /usr/sbin/sshd ~~~ ### 添加Hadoop用户 为Hadoop添加专门的用户。 ~~~ # 添加用户 fish@hadoop:~$ sudo addgroup hadoop Adding group `hadoop' (GID 1002) ... Done. fish@hadoop:~$ sudo adduser --ingroup hadoop hduser Adding user `hduser' ... Adding new user `hduser' (1001) with group `hadoop' ... Creating home directory `/home/hduser' ... Copying files from `/etc/skel' ... Enter new UNIX password: Retype new UNIX password: passwd: password updated successfully Changing the user information for hduser Enter the new value, or press ENTER for the default Full Name []: Room Number []: Work Phone []: Home Phone []: Other []: Is the information correct? [Y/n] Y # 添加root权限 fish@hadoop:~$ sudo adduser hduser sudo [sudo] password for fish: Adding user `hduser' to group `sudo' ... Adding user hduser to group sudo Done. ~~~ ### 设置SSH密钥 ~~~ # 切换用户 fish@hadoop:/home$ su hduser Password: hduser@hadoop:/home$ cd ~ # 生成密钥 hduser@hadoop:~$ ssh-keygen -t rsa -P "" Generating public/private rsa key pair. Enter file in which to save the key (/home/hduser/.ssh/id_rsa): Created directory '/home/hduser/.ssh'. Your identification has been saved in /home/hduser/.ssh/id_rsa. Your public key has been saved in /home/hduser/.ssh/id_rsa.pub. The key fingerprint is: 93:94:8f:3b:8e:c8:2b:c0:52:96:6c:8c:61:54:46:03 hduser@hadoop-7260 The key's randomart image is: +--[ RSA 2048]----+ |.E+= | |... . . | |.= . o | |. B . + | |.+ S . | |o. o | |.. o | | .. . o . | | .+.. . | +-----------------+ ' # 添加到认证密钥中,避免每次都要输密码 hduser@hadoop:~$ cat $HOME/.ssh/id_rsa.pub >> $HOME/.ssh/authorized_keys ~~~ ### 安装Hadoop ~~~ # 下载解压Hadoop hduser@hadoop:~$ wget http://mirrors.sonic.net/apache/hadoop/common/hadoop-2.7.1/hadoop-2.7.1.tar.gz hduser@hadoop:~$ tar xvzf hadoop-2.7.1.tar.gz # 创建存放目录 hduser@hadoop:~$ sudo mkdir /usr/local/hadoop hduser@hadoop:~$ cd hadoop-2.7.1/ hduser@hadoop:~/hadoop-2.7.1$ sudo mv * /usr/local/hadoop hduser@hadoop:~/hadoop-2.7.1$ sudo chown -R hduser:hadoop /usr/local/hadoop ~~~ ### 设置配置文件 Hadoop还有很多配置文件要设置。 ##### 1. ~/.bashrc 在bashrc文件中添加如下内容: ~~~ #HADOOP VARIABLES START export JAVA_HOME=/usr/lib/jvm/java-7-openjdk-amd64 export HADOOP_INSTALL=/usr/local/hadoop export PATH=$PATH:$HADOOP_INSTALL/bin export PATH=$PATH:$HADOOP_INSTALL/sbin export HADOOP_MAPRED_HOME=$HADOOP_INSTALL export HADOOP_COMMON_HOME=$HADOOP_INSTALL export HADOOP_HDFS_HOME=$HADOOP_INSTALL export YARN_HOME=$HADOOP_INSTALL export HADOOP_COMMON_LIB_NATIVE_DIR=$HADOOP_INSTALL/lib/native export HADOOP_OPTS="-Djava.library.path=$HADOOP_INSTALL/lib" #HADOOP VARIABLES END ~~~ ~~~ # 查看Java路径 hduser@hadoop:~$ update-alternatives --config java There is only one alternative in link group java (providing /usr/bin/java): /usr/lib/jvm/java-7-openjdk-amd64/jre/bin/java Nothing to configure. # 编辑~/.bashrc hduser@hadoop:~$ vi ~/.bashrc hduser@hadoop:~$ source ~/.bashrc ~~~ ##### 2. /usr/local/hadoop/etc/hadoop/hadoop-env.sh 在hadoop-env.sh文件中添加如下内容: ~~~ export JAVA_HOME=/usr/lib/jvm/java-7-openjdk-amd64 ~~~ ~~~ hduser@hadoop:~$ vi /usr/local/hadoop/etc/hadoop/hadoop-env.sh ~~~ ##### 3. /usr/local/hadoop/etc/hadoop/core-site.xml 将以下内容添加到core-site.xml文件中的`<configuration>`标签中: ~~~ <property> <name>hadoop.tmp.dir</name> <value>/app/hadoop/tmp</value> <description>A base for other temporary directories.</description> </property> <property> <name>fs.default.name</name> <value>hdfs://localhost:54310</value> <description>The name of the default file system. A URI whose scheme and authority determine the FileSystem implementation. The uri's scheme determines the config property (fs.SCHEME.impl) naming the FileSystem implementation class. The uri's authority is used to determine the host, port, etc. for a filesystem.</description> </property> ~~~ ~~~ hduser@hadoop:~$ sudo mkdir -p /app/hadoop/tmp hduser@hadoop:~$ sudo chown hduser:hadoop /app/hadoop/tmp hduser@hadoop:~$ vi /usr/local/hadoop/etc/hadoop/core-site.xml ~~~ ##### 4. /usr/local/hadoop/etc/hadoop/mapred-site.xml 将以下内容添加到mapred-site.xml文件中的`<configuration>`标签中: ~~~ <property> <name>mapred.job.tracker</name> <value>localhost:54311</value> <description>The host and port that the MapReduce job tracker runs at. If "local", then jobs are run in-process as a single map and reduce task. </description> </property> ~~~ ~~~ hduser@hadoop:~$ cp /usr/local/hadoop/etc/hadoop/mapred-site.xml.template /usr/local/hadoop/etc/hadoop/mapred-site.xml hduser@hadoop:~$ vi /usr/local/hadoop/etc/hadoop/mapred-site.xml ~~~ ##### 5. /usr/local/hadoop/etc/hadoop/hdfs-site.xml 将以下内容添加到hdfs-site.xml文件中的`<configuration>`标签中: ~~~ <property> <name>dfs.replication</name> <value>1</value> <description>Default block replication. The actual number of replications can be specified when the file is created. The default is used if replication is not specified in create time. </description> </property> <property> <name>dfs.namenode.name.dir</name> <value>file:/usr/local/hadoop_store/hdfs/namenode</value> </property> <property> <name>dfs.datanode.data.dir</name> <value>file:/usr/local/hadoop_store/hdfs/datanode</value> </property> ~~~ ~~~ hduser@hadoop:~$ sudo mkdir -p /usr/local/hadoop_store/hdfs/namenode hduser@hadoop:~$ sudo mkdir -p /usr/local/hadoop_store/hdfs/datanode hduser@hadoop:~$ sudo chown -R hduser:hadoop /usr/local/hadoop_store hduser@hadoop:~$ vi /usr/local/hadoop/etc/hadoop/hdfs-site.xml ~~~ ### 格式化Hadoop文件系统 在开始使用之前,我们要格式化Hadoop文件系统。 ~~~ hduser@hadoop:~$ hadoop namenode -format DEPRECATED: Use of this script to execute hdfs command is deprecated. Instead use the hdfs command for it. 15/11/29 19:49:27 INFO namenode.NameNode: STARTUP_MSG: /************************************************************ STARTUP_MSG: Starting NameNode STARTUP_MSG: host = hadoop/10.64.253.22 STARTUP_MSG: args = [-format] STARTUP_MSG: version = 2.7.1 ...... 15/11/29 19:49:29 INFO util.ExitUtil: Exiting with status 0 15/11/29 19:49:29 INFO namenode.NameNode: SHUTDOWN_MSG: /************************************************************ SHUTDOWN_MSG: Shutting down NameNode at hadoop-7260.lvs01.dev.ebayc3.com/10.64.253.22 ************************************************************/ ~~~ ### 启动Hadoop ~~~ hduser@hadoop-7260:~$ cd /usr/local/hadoop/sbin/ // 查看所有的命令集 hduser@hadoop-7260:/usr/local/hadoop/sbin$ ls distribute-exclude.sh start-all.cmd stop-balancer.sh hadoop-daemon.sh start-all.sh stop-dfs.cmd hadoop-daemons.sh start-balancer.sh stop-dfs.sh hdfs-config.cmd start-dfs.cmd stop-secure-dns.sh hdfs-config.sh start-dfs.sh stop-yarn.cmd httpfs.sh start-secure-dns.sh stop-yarn.sh kms.sh start-yarn.cmd yarn-daemon.sh mr-jobhistory-daemon.sh start-yarn.sh yarn-daemons.sh refresh-namenodes.sh stop-all.cmd slaves.sh stop-all.sh // 启动Hadoop hduser@hadoop:/usr/local/hadoop/sbin$ start-all.sh This script is Deprecated. Instead use start-dfs.sh and start-yarn.sh 15/11/29 19:56:23 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable Starting namenodes on [localhost] ...... # 检查启动情况 hduser@hadoop-7260:/usr/local/hadoop/sbin$ jps 5814 ResourceManager 5620 SecondaryNameNode 5401 DataNode 5951 NodeManager 5239 NameNode 6837 Jps ~~~ 至此Hadoop在Ubuntu中的搭建工作完成。
';

前言

最后更新于:2022-04-01 10:00:29

> 原文出处:[大数据开源框架](http://blog.csdn.net/column/details/bigdata-open.html) 作者:[u013291394](http://blog.csdn.net/u013291394) **本系列文章经作者授权在看云整理发布,未经作者允许,请勿转载!** # 大数据开源框架 > 介绍大数据常用的开源框架,zookeeper、kafka、storm、hadoop,后续会添加spark相关知识。
';