第 3 部分

最后更新于:2022-04-01 03:37:29

> 原文出处:https://www.ibm.com/developerworks/cn/opensource/os-cn-spark-practice3/ > 作者:王 龙, 软件开发工程师, IBM # 使用Spark SQL 对结构化数据进行统计分析 本文将通过两个例子向读者展示如何使用 Spark SQL/DataFrame API 编写应用程序来对结构化的大数据进行统计分析,并且还会通过分析程序运行日志以及利用 Spark Web Console 向读者介绍 Spark 应用程序运行的基本过程和原理。通过本文的阅读,读者将会对 Spark SQL 模块有较为深入的认识和理解。 [TOC=2,3] ## 引言 在很多领域,如电信,金融等,每天都会产生大量的结构化数据,当数据量不断变大,传统的数据存储 (DBMS) 和计算方式 (单机程序) 已经不能满足企业对数据存储,统计分析以及知识挖掘的需要。在过去的数年里,传统的软件开发和维护人员已经积累了大量的基于 DBMS 的操作数据知识和经验,他们已经习惯了通过编写 SQL 语句来对数据记录进行统计分析。于是大数据工程师们开始探索如何使用类 SQL 的方式来操作和分析大数据,通过大量的努力,目前业界已经出现很多 SQL on Hadoop 的方案,如 Hive, Impala 等。Spark SQL 就是其中的一个,实际上 Spark SQL 并不是一开始就存在于 Spark 生态系统里的,它的前身是 Shark。随着 Spark 自身的发展,Spark 团队开始试图放弃 Shark 这个对于 Hive 有太多依赖 (查询优化,语法解析) 的东西,于是才有了 Spark SQL 这个全新的模块,通过几个版本的发展,目前 Spark SQL 已经趋于稳定,功能也逐渐丰富。本文将以 Spark1.4.1 版本为基础,由浅入深地向读者介绍 Spark SQL/DataFrame 的基本概念和原理,并且通过实例向读者展示如何使用 Spark SQL/DataFrame API 开发应用程序。接下来,就让我们开始 Spark SQL 的体验之旅吧。 ## 关于 Spark SQL/DataFrame Spark SQL 是 Spark 生态系统里用于处理结构化大数据的模块,该模块里最重要的概念就是 DataFrame, 相信熟悉 R 语言的工程师对此并不陌生。Spark 的 DataFrame 是基于早期版本中的 SchemaRDD,所以很自然的使用分布式大数据处理的场景。Spark DataFrame 以 RDD 为基础,但是带有 Schema 信息,它类似于传统数据库中的二维表格。 Spark SQL 模块目前支持将多种外部数据源的数据转化为 DataFrame,并像操作 RDD 或者将其注册为临时表的方式处理和分析这些数据。当前支持的数据源有: * Json * 文本文件 * RDD * 关系数据库 * Hive * Parquet 一旦将 DataFrame 注册成临时表,我们就可以使用类 SQL 的方式操作这些数据,我们将在下文的案例中详细向读者展示如何使用 Spark SQL/DataFrame 提供的 API 完成数据读取,临时表注册,统计分析等步骤。 ## 案例介绍与编程实现 ### 案例一 **a.案例描述与分析** 本案例中,我们将使用 Spark SQL 分析包含 5 亿条人口信息的结构化数据,数据存储在文本文件上,总大小为 7.81G。文件总共包含三列,第一列是 ID,第二列是性别信息 (F -> 女,M -> 男),第三列是人口的身高信息,单位是 cm。实际上这个文件与我们在本系列文章第一篇中的案例三使用的格式是一致的,读者可以参考相关章节,并使用提供的测试数据生成程序,生成 5 亿条数据,用于本案例中。为了便于读者理解,本案例依然把用于分析的文本文件的内容片段贴出来,具体格式如下。 图 1\. 案例一测试数据格式预览 ![图 1\. 案例一测试数据格式预览](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2015-09-23_56023f0d11e54.png) 生成该测试文件后,读者需要把文件上传到 HDFS 上,您可以选择使用 HDFS shell 命令或者 HDSF 的 eclipse 插件。上传到 HDFS 后,我们可以通过访问 HDFS web console(http://namenode:50070),查看文件具体信息。 图 2\. 案例一测试数据文件基本信息 ![图 2\. 案例一测试数据文件基本信息](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2015-09-23_56023f0f473a4.png) 本例中,我们的统计任务如下: * 用 SQL 语句的方式统计男性中身高超过 180cm 的人数。 * 用 SQL 语句的方式统计女性中身高超过 170cm 的人数。 * 对人群按照性别分组并统计男女人数。 * 用类 RDD 转换的方式对 DataFrame 操作来统计并打印身高大于 210cm 的前 50 名男性。 * 对所有人按身高进行排序并打印前 50 名的信息。 * 统计男性的平均身高。 * 统计女性身高的最大值。 读者可以看到,上述统计任务中有些是相似的,但是我们要用不同的方式实现它,以向读者展示不同的语法。 **b.编码实现** 清单 1\. 案例一示例程序源代码 ~~~ import org.apache.spark.SparkConf import org.apache.spark.SparkContext import org.apache.spark.sql.SQLContext import org.apache.spark.sql.types._ import org.apache.spark.sql.Row import org.apache.spark.rdd.RDD object PeopleDataStatistics2 { private val schemaString = "id,gender,height" def main(args: Array[String]) { if (args.length < 1) { println("Usage:PeopleDataStatistics2 filePath") System.exit(1) } val conf = new SparkConf().setAppName("Spark Exercise:People Data Statistics 2") val sc = new SparkContext(conf) val peopleDataRDD = sc.textFile(args(0)) val sqlCtx = new SQLContext(sc) // this is used to implicitly convert an RDD to a DataFrame. import sqlCtx.implicits._ val schemaArray = schemaString.split(",") val schema = StructType(schemaArray.map(fieldName = > StructField(fieldName, StringType, true))) val rowRDD: RDD[Row] = peopleDataRDD.map(_.split(" ")).map( eachRow => Row(eachRow(0), eachRow(1), eachRow(2))) val peopleDF = sqlCtx.createDataFrame(rowRDD, schema) peopleDF.registerTempTable("people") //get the male people whose height is more than 180 val higherMale180 = sqlCtx.sql("select id,gender, height from people where height > 180 and gender='M'") println("Men whose height are more than 180: " + higherMale180.count()) println("<Display #1>") //get the female people whose height is more than 170 val higherFemale170 = sqlCtx.sql("select id,gender, height from people where height > 170 and gender='F'") println("Women whose height are more than 170: " + higherFemale170.count()) println("<Display #2>") //Grouped the people by gender and count the number peopleDF.groupBy(peopleDF("gender")).count().show() println("People Count Grouped By Gender") println("<Display #3>") // peopleDF.filter(peopleDF("gender").equalTo("M")).filter( peopleDF("height") > 210).show(50) println("Men whose height is more than 210") println("<Display #4>") // peopleDF.sort($"height".desc).take(50).foreach { row => println(row(0) + "," + row(1) + "," + row(2)) } println("Sorted the people by height in descend order,Show top 50 people") println("<Display #5>") // peopleDF.filter(peopleDF("gender").equalTo("M")).agg(Map("height" -> "avg")).show() println("The Average height for Men") println("<Display #6>") // peopleDF.filter(peopleDF("gender").equalTo("F")).agg("height" -> "max").show() println("The Max height for Women:") println("<Display #7>") //...... println("All the statistics actions are finished on structured People data.") } } ~~~ **c.提交并运行** 编码完成后,把项目打成 jar 包,在这里,我们将源码打成名为 spark_exercise-1.0.jar, 笔者使用 Maven 来管理项目,也建议读者可以尝试下 Maven 管理您的 Scala 项目。 清单 2\. 案例一示例程序执行命令 ~~~ ./spark-submit --class com.ibm.spark.exercise.sql.PeopleDataStatistics2 \ --master spark://<spark_master_node_ip>:7077 \ --driver-memory 8g \ --executor-memory 2g \ --total-executor-cores 12 \ /home/fams/spark_exercise-1.0.jar \ hdfs://<hdfs_namenode_ip>:9000/ user/fams/inputfiles/sample_people_info2.txt ~~~ **d.监控执行过程** 在提交后,我们可以在 Spark web console(http://:8080) 中监控程序执行过程。下面我们将分别向读者展示如何监控程序产生的 Jobs,Stages,以及 D 可视化的查看 DAG 信息。 图 3\. 案例一程序监控图 1 ![图 3\. 案例一程序监控图 1](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2015-09-23_56023f13ee9a6.png) 图 4\. 案例一程序监控图 2 ![图 4\. 案例一程序监控图 2](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2015-09-23_56023f15da997.png) 图 5\. 案例一程序监控图 3 ![图 5\. 案例一程序监控图 3](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2015-09-23_56023f16e67b2.png) 图 6\. 案例一程序监控图 4 ![图 6\. 案例一程序监控图 4](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2015-09-23_56023f17f1396.png) 其实在 Spark web console 中还可以查看很多信息,如运行环境信息,Executor 进程信息,读者可以在界面上一一查看,在这里不再赘述。 **e.运行结果** 图 7\. 案例一程序运行结果 (部分) ![图 7\. 案例一程序运行结果 (部分)](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2015-09-23_56023f18e7547.png) ### 案例二 **a.案例描述与分析** 在案例一中,我们将存储于 HDFS 中的文件转换成 DataFrame 并进行了一系列的统计,细心的读者会发现,都是一个关联于一个 DataFrame 的简单查询和统计,那么在案例二中,我们将向读者展示如何连接多个 DataFrame 做更复杂的统计分析。 在本案例中,我们将统计分析 1 千万用户和 1 亿条交易数据。对于用户数据,它是一个包含 6 个列 (ID, 性别, 年龄, 注册日期, 角色 (从事行业), 所在区域) 的文本文件,具有以下格式。 图 8\. 案例二测试用户数据格式预览 ![图 8\. 案例二测试用户数据格式预览](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2015-09-23_56023f1af381a.png) 我们使用以下 Scala 程序来生成本案例所需的测试数据。 清单 3\. 案例二用户测试数据生成类源代码 ~~~ import java.io.{File, FileWriter} import scala.util.Random object UserDataGenerator { private val FILE_PATH = "C:\\LOCAL_DISK_D\\sample_user_data.txt" private val ROLE_ID_ARRAY = Array[String]("ROLE001","ROLE002","ROLE003","ROLE004","ROLE005") private val REGION_ID_ARRAY = Array[String]("REG001","REG002","REG003","REG004","REG005") private val MAX_USER_AGE = 60 //how many records to be generated private val MAX_RECORDS = 10000000 def main(args:Array[String]): Unit = { generateDataFile(FILE_PATH , MAX_RECORDS) } private def generateDataFile(filePath : String, recordNum: Int): Unit = { var writer:FileWriter = null try { writer = new FileWriter(filePath,true) val rand = new Random() for (i <- 1 to recordNum) { //generate the gender of the user var gender = getRandomGender // var age = rand.nextInt(MAX_USER_AGE) if (age < 10) { age = age + 10 } //generate the registering date for the user var year = rand.nextInt(16) + 2000 var month = rand.nextInt(12)+1 //to avoid checking if it is a valid day for specific month //we always generate a day which is no more than 28 var day = rand.nextInt(28) + 1 var registerDate = year + "-" + month + "-" + day //generate the role of the user var roleIndex:Int = rand.nextInt(ROLE_ID_ARRAY.length) var role = ROLE_ID_ARRAY(roleIndex) //generate the region where the user is var regionIndex:Int = rand.nextInt(REGION_ID_ARRAY.length) var region = REGION_ID_ARRAY(regionIndex) writer.write(i + " " + gender + " " + age + " " + registerDate + " " + role + " " + region) writer.write(System.getProperty("line.separator")) } writer.flush() } catch { case e:Exception => println("Error occurred:" + e) } finally { if (writer != null) writer.close() } println("User Data File generated successfully.") } private def getRandomGender() :String = { val rand = new Random() val randNum = rand.nextInt(2) + 1 if (randNum % 2 == 0) { "M" } else { "F" } } } ~~~ 对于交易数据,它是一个包含 5 个列 (交易单号, 交易日期, 产品种类, 价格, 用户 ID) 的文本文件,具有以下格式。 图 9\. 案例二测试交易数据格式预览 ![图 9\. 案例二测试交易数据格式预览](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2015-09-23_56023f1c96e1e.png) 对于交易数据,我们使用以下 Scala 程序来生成。 清单 4\. 案例二交易测试数据生成类源代码 ~~~ import java.io.{File, FileWriter} import scala.util.Random object ConsumingDataGenerator { private val FILE_PATH = "C:\\LOCAL_DISK_D\\sample_consuming_data.txt" // how many records to be generated private val MAX_RECORDS = 100000000 // we suppose only 10 kinds of products in the consuming data private val PRODUCT_ID_ARRAY = Array[Int](1,2,3,4,5,6,7,8,9,10) // we suppose the price of most expensive product will not exceed 2000 RMB private val MAX_PRICE = 2000 // we suppose the price of cheapest product will not be lower than 10 RMB private val MIN_PRICE = 10 //the users number which should be same as the one in UserDataGenerator object private val USERS_NUM = 10000000 def main(args:Array[String]): Unit = { generateDataFile(FILE_PATH,MAX_RECORDS); } private def generateDataFile(filePath : String, recordNum: Int): Unit = { var writer:FileWriter = null try { writer = new FileWriter(filePath,true) val rand = new Random() for (i <- 1 to recordNum) { //generate the buying date var year = rand.nextInt(16) + 2000 var month = rand.nextInt(12)+1 //to avoid checking if it is a valid day for specific // month,we always generate a day which is no more than 28 var day = rand.nextInt(28) + 1 var recordDate = year + "-" + month + "-" + day //generate the product ID var index:Int = rand.nextInt(PRODUCT_ID_ARRAY.length) var productID = PRODUCT_ID_ARRAY(index) //generate the product price var price:Int = rand.nextInt(MAX_PRICE) if (price == 0) { price = MIN_PRICE } // which user buys this product val userID = rand.nextInt(10000000)+1 writer.write(i + " " + recordDate + " " + productID + " " + price + " " + userID) writer.write(System.getProperty("line.separator")) } writer.flush() } catch { case e:Exception => println("Error occurred:" + e) } finally { if (writer != null) writer.close() } println("Consuming Data File generated successfully.") } } ~~~ **b.编码实现** 清单 5\. 案例二示例程序源代码 ~~~ import org.apache.spark.sql.SQLContext import org.apache.spark.storage.StorageLevel import org.apache.spark.{SparkContext, SparkConf} //define case class for user case class User(userID: String, gender: String, age: Int, registerDate: String,role: String, region: String) //define case class for consuming data case class Order(orderID: String, orderDate: String, productID: Int, price: Int, userID: String) object UserConsumingDataStatistics { def main(args: Array[String]) { if (args.length < 1) { println("Usage:UserConsumingDataStatistics userDataFilePath consumingDataFilePath") System.exit(1) } val conf = new SparkConf().setAppName("Spark Exercise:User Consuming Data Statistics") //Kryo serializer is more quickly by default java serializer conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") val ctx = new SparkContext(conf) val sqlCtx = new SQLContext(ctx) import sqlCtx.implicits._ //Convert user data RDD to a DataFrame and register it as a temp table val userDF = ctx.textFile(args(0)).map(_.split(" ")).map( u => User(u(0), u(1), u(2).toInt,u(3),u(4),u(5))).toDF() userDF.registerTempTable("user") //Convert consuming data RDD to a DataFrame and register it as a temp table val orderDF = ctx.textFile(args(1)).map(_.split(" ")).map(o => Order( o(0), o(1), o(2).toInt,o(3).toInt,o(4))).toDF() orderDF.registerTempTable("orders") //cache the DF in memory with serializer should make the program run much faster userDF.persist(StorageLevel.MEMORY_ONLY_SER) orderDF.persist(StorageLevel.MEMORY_ONLY_SER) //The number of people who have orders in the year 2015 val count = orderDF.filter(orderDF("orderDate").contains("2015")).join( userDF, orderDF("userID").equalTo(userDF("userID"))).count() println("The number of people who have orders in the year 2015:" + count) //total orders produced in the year 2014 val countOfOrders2014 = sqlCtx.sql("SELECT * FROM orders where orderDate like '2014%'").count() println("total orders produced in the year 2014:" + countOfOrders2014) //Orders that are produced by user with ID 1 information overview val countOfOrdersForUser1 = sqlCtx.sql("SELECT o.orderID,o.productID, o.price,u.userID FROM orders o,user u where u.userID = 1 and u.userID = o.userID").show() println("Orders produced by user with ID 1 showed.") //Calculate the max,min,avg prices for the orders that are producted by user with ID 10 val orderStatsForUser10 = sqlCtx.sql("SELECT max(o.price) as maxPrice, min(o.price) as minPrice,avg(o.price) as avgPrice,u.userID FROM orders o, user u where u.userID = 10 and u.userID = o.userID group by u.userID") println("Order statistic result for user with ID 10:") orderStatsForUser10.collect().map(order => "Minimum Price=" + order.getAs("minPrice") + ";Maximum Price=" + order.getAs("maxPrice") + ";Average Price=" + order.getAs("avgPrice") ).foreach(result => println(result)) } } ~~~ **c.提交并执行** ~~~ ./spark-submit –class com.ibm.spark.exercise.sql.UserConsumingDataStatistics \ --master spark://<spark_master_node_ip>:7077 \ --num-executors 6 \ --driver-memory 8g \ --executor-memory 2g \ --executor-cores 2 \ /home/fams/spark_exercise-1.0.jar \ hdfs://<hdfs_namenode_ip>:9000/user/fams/inputfiles/sample_user_data.txt \ hdfs://<hdfs_namenode_ip>:9000/user/fams/inputfiles/sample_consuming_data.txt ~~~ **d.监控执行过程** 程序提交后,读者可以用案例一描述的方式在 Spark web console 监控执行过程,这样也能帮助您深入的理解 Spark SQL 程序的执行过程。 **e.运行结果** 图 10\. 案例二程序运行结果 (部分) ![图 10\. 案例二程序运行结果 (部分)](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2015-09-23_56023f1d3ed2d.png) ## 总结 关于 Spark SQL 程序开发,我们通常有以下需要注意的地方。 1. Spark SQL 程序开发过程中,我们有两种方式确定 schema,第一种是反射推断 schema,如本文的案例二,这种方式下,我们需要定义样本类 (case class) 来对应数据的列;第二种方式是通过编程方式来确定 schema,这种方式主要是通过 Spark SQL 提供的 StructType 和 StructField 等 API 来编程实现,这种方式下我们不需要定义样本类,如本文中的案例一。 在程序实现中,我们需要使用![](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2015-09-23_56023f1e156f6.png)以便隐式的把 RDD 转化成 DataFrame 来操作。 2. 本文展示的 DataFrame API 使用的方法只是其中很小的一部分,但是一旦读者掌握了开发的基本流程,就能通过参考 [DataFrame API 文档](http://spark.apache.org/docs/1.4.1/api/scala/index.html#org.apache.spark.sql.DataFrame)写出更为复杂的程序。 3. 通常来说,我们有两种方式了解 Spark 程序的执行流程。第一种是通过在控制台观察输出日志,另一种则更直观,就是通过 Spark Web Console 来观察 Driver 程序里各个部分产生的 job 信息以及 job 里包含的 stages 信息。 4. 需要指出的是,熟练的掌握 Spark SQL/DataFrame 的知识对学习最新的 Spark 机器学习库 ML Pipeline 至关重要,因为 ML Pipeline 使用 DataFrame 作为数据集来支持多种的数据类型。 5. 笔者在测试的过程中发现,处理相同的数据集和类似的操作,Spark SQL/DataFrame 比传统的 RDD 转换操作具有更好的性能。这是由于 SQL 模块的 Catalyst 对用户 SQL 做了很好的查询优化。在以后的文章中会向读者详细的介绍该组件。 ## 结束语 本文通过两个案例向读者详细的介绍了使用 Spark SQL/DataFrame 处理结构化数据的过程,限于篇幅,我们并没有在文中向读者详细介绍 Spark SQL/DataFrame 程序的执行流程,以及 Catalyst SQL 解析和查询优化引擎。这个将会在本系列后面的文章中介绍。其实文中提供的测试数据还可以用来做更为复杂的 Spark SQL 测试,读者可以基于本文,进行更多的工作。需要指出的是,由于我们用到的数据是靠程序随机生成的,所以部分数据难免有不符合实际的情况,读者应该关注在使用 Spark SQL/DataFrame 处理这些数据的过程。最后,感谢您耐心的阅读本文,如果您有任何问题或者想法,请在文末留言,我们可以进行深入的讨论。让我们互相学习,共同进步。 ## 参考资料 ### 学习 * 参考[Spark SQL/DataFrame 官网文档](http://spark.apache.org/docs/latest/sql-programming-guide.html),了解 Spark SQL/DataFrame 的基本原理和编程模型。 * 参考[Spark Scala API 文档](http://spark.apache.org/docs/1.4.1/api/scala/index.html#package),了解 Spark SQL/DataFrame 相关 API 的使用。 * [developerWorks 开源技术主题](http://www.ibm.com/developerworks/cn/opensource/):查找丰富的操作信息、工具和项目更新,帮助您掌握开源技术并将其用于 IBM 产品。 ### 讨论 * 加入 [developerWorks 中文社区](http://www.ibm.com/developerworks/cn/community/),查看开发人员推动的博客、论坛、组和维基,并与其他 developerWorks 用户交流。
';

第 2 部分

最后更新于:2022-04-01 03:37:26

> 原文出处:https://www.ibm.com/developerworks/cn/opensource/os-cn-spark-practice2/ > 作者:王 龙, 软件开发工程师, IBM # 使用 Kafka 和 Spark Streaming 构建实时数据处理系统 本文旨在通过具有实际意义的案例向读者介绍如何使用 Kafka 分布式消息框架和 Spark 的 Streaming 模块构建一个实时的数据处理系统。内容将涉及数据产生,数据读取,数据处理,结果存储等数据系统处理的基本环节,也会提出一些开放式的问题,供读者一起讨论。 [TOC=2,3] ## 引言 在很多领域,如股市走向分析, 气象数据测控,网站用户行为分析等,由于数据产生快,实时性强,数据量大,所以很难统一采集并入库存储后再做处理,这便导致传统的数据处理架构不能满足需要。流计算的出现,就是为了更好地解决这类数据在处理过程中遇到的问题。与传统架构不同,流计算模型在数据流动的过程中实时地进行捕捉和处理,并根据业务需求对数据进行计算分析,最终把结果保存或者分发给需要的组件。本文将从实时数据产生和流向的各个环节出发,通过一个具有实际意义的案例,向读者介绍如何使用 Apache Kafka 和 Spark Streaming 模块构建一个实时的数据处理系统,当然本文只是抛砖引玉,因为构建一个良好健壮的实时数据处理系统并不是一篇文章可以说清楚的。在阅读本文前,假设您已经对 Apache Kafka 分布式消息系统有了基本的了解,并且可以使用 Spark Streaming API 进行简单的编程。接下来,就让我们一起看看如何构建一个简易的实时数据处理系统吧。 ## 关于 Kafka Kafka 是一个分布式的,高吞吐量,易于扩展地基于主题发布/订阅的消息系统,最早是由 Linkedin 开发,并于 2011 年开源并贡献给 Apache 软件基金会。一般来说,Kafka 有以下几个典型的应用场景: * 作为消息队列。由于 Kafka 拥有高吞吐量,并且内置消息主题分区,备份,容错等特性,使得它更适合使用在大规模,高强度的消息数据处理的系统中。 * 流计算系统的数据源。流数据产生系统作为 Kafka 消息数据的生产者将数据流分发给 Kafka 消息主题,流数据计算系统 (Storm,Spark Streaming 等) 实时消费并计算数据。这也是本文将要介绍的应用场景。 * 系统用户行为数据源。这种场景下,系统将用户的行为数据,如访问页面,停留时间,搜索日志,感兴趣的话题等数据实时或者周期性的发布到 Kafka 消息主题,作为对接系统数据的来源。 * 日志聚集。Kafka 可以作为一个日志收集系统的替代解决方案,我们可以将系统日志数据按类别汇集到不同的 Kafka 消息主题中。 * 事件源。在基于事件驱动的系统中,我们可以将事件设计成合理的格式,作为 Kafka 消息数据存储起来,以便相应系统模块做实时或者定期处理。由于 Kafka 支持大数据量存储,并且有备份和容错机制,所以可以让事件驱动型系统更加健壮和高效。 当然 Kafka 还可以支持其他的应用场景,在这里我们就不一一罗列了。关于 Kafka 更详细的介绍,请读者参考[Kafka 官网](https://kafka.apache.org/)。需要指出的是,本文使用的 Kafka 版本是基于 Scala 2.10 版本构建的 0.8.2.1 版本。 ## 关于 Spark Steaming Spark Streaming 模块是对于 Spark Core 的一个扩展,目的是为了以高吞吐量,并且容错的方式处理持续性的数据流。目前 Spark Streaming 支持的外部数据源有 Flume、 Kafka、Twitter、ZeroMQ、TCP Socket 等。 Discretized Stream 也叫 DStream) 是 Spark Streaming 对于持续数据流的一种基本抽象,在内部实现上,DStream 会被表示成一系列连续的 RDD(弹性分布式数据集),每一个 RDD 都代表一定时间间隔内到达的数据。所以在对 DStream 进行操作时,会被 Spark Stream 引擎转化成对底层 RDD 的操作。对 Dstream 的操作类型有: * **Transformations: **类似于对 RDD 的操作,Spark Streaming 提供了一系列的转换操作去支持对 DStream 的修改。如 map,union,filter,transform 等 * **Window Operations: **窗口操作支持通过设置窗口长度和滑动间隔的方式操作数据。常用的操作有 reduceByWindow,reduceByKeyAndWindow,window 等 * **Output Operations: **输出操作允许将 DStream 数据推送到其他外部系统或存储平台, 如 HDFS, Database 等,类似于 RDD 的 Action 操作,Output 操作也会实际上触发对 DStream 的转换操作。常用的操作有 print,saveAsTextFiles,saveAsHadoopFiles, foreachRDD 等。 关于 DStream Operations 的更多信息,请参考 Spark 官网的 [Spark Streaming Programing Guide](http://spark.apache.org/docs/latest/streaming-programming-guide.html)。 ## Kafka 集群搭建步骤 1\. 机器准备 本文中,我们将准备三台机器搭建 Kafka 集群,IP 地址分别是 192.168.1.1,192.168.1.2,192.168.1.3,并且三台机器网络互通。 2\. 下载并安装 kafka_2.10-0.8.2.1 下载地址: [https://kafka.apache.org/downloads.html](https://kafka.apache.org/downloads.html) 下载完成后,上传到目标机器中的一个,如 192.168.1.1 , 使用以下命令解压: 清单 1\. Kafka 安装包解压命令 ~~~ tar –xvf kafka_2.10-0.8.2.1 ~~~ 安装完成。 3\. 创建 zookeeper 数据目录并设定服务器编号 在所有三台服务器上执行下面操作。 切换到当前用户工作目录,如/home/fams , 创建 zookeeper 保存数据的目录, 然后在这个目录下新建服务器编号文件。 清单 2\. 创建数据目录和服务器编号文件命令 ~~~ mkdir zk_data cat N > myid ~~~ 注意需要保证 N 在三台服务器上取不同值,如分别取 1,2,3。 4\. 编辑 zookeeper 配置文件 Kafka 安装包中内置 zookeeper 服务。进入 Kafka 安装目录, 如/home/fams/kafka_2.10-0.8.2.1, 编辑 config/zookeeper.properties 文件,增加以下配置: 清单 3\. zookeeper 配置项 ~~~ tickTime=2000 dataDir=/home/fams/zk_data/ clientPort=2181 initLimit=5 syncLimit=2 server.1=192.168.1.1:2888:3888 server.2=192.168.1.3:2888:3888 server.3=192.168.1.3:2888:3888 ~~~ 这些配置项的解释如下: * tickTime:zookeeper 服务器之间的心跳时间间隔,以毫秒为单位。 * dataDir:zookeeper 的数据保存目录,我们也把 zookeeper 服务器的 ID 文件保存到这个目录下,下文会介绍。 * clientPort:zookeeper 服务器会监听这个端口,然后等待客户端连接。 * initLimit:zookeeper 集群中 follower 服务器和 leader 服务器之间建立初始连接时所能容忍的心跳次数的极限值。 * syncLimit:zookeeper 集群中 follower 服务器和 leader 服务器之间请求和应答过程中所能容忍的心跳次数的极限值。 * server.N:N 代表的是 zookeeper 集群服务器的编号。对于配置值,以 192.168.1.1:2888:3888 为例,192.168.1.1 表示该服务器的 IP 地址,2888 端口表示该服务器与 leader 服务器的数据交换端口,3888 表示选举新的 leader 服务器时候用到的通信端口。 5.编辑 Kafka 配置文件 **a**. 编辑 config/server.properties 文件 添加或修改以下配置。 清单 4\. Kafka Broker 配置项 ~~~ broker.id=0 port=9092 host.name=192.168.1.1 zookeeper.contact=192.168.1.1:2181,192.168.1.2:2181,192.168.1.3:2181 log.dirs=/home/fams/kafka-logs ~~~ 这些配置项解释如下: * broker.id:Kafka broker 的唯一标识,集群中不能重复。 * port: Broker 的监听端口,用于监听 Producer 或者 Consumer 的连接。 * host.name:当前 Broker 服务器的 IP 地址或者机器名。 * zookeeper.contact:Broker 作为 zookeeper 的 client,可以连接的 zookeeper 的地址信息。 * log.dirs:日志保存目录。 **b**. 编辑 config/producer.properties 文件 添加或者修改以下配置: 清单 5\. Kafka Producer 配置项 ~~~ broker.list=192.168.1.1:9092,192.168.1.2:9092,192.168.1.3:9092 producer.type=async ~~~ 这些配置项解释如下: * broker.list:集群中 Broker 地址列表。 * producer.type: Producer 类型,async 异步生产者,sync 同步生产者。 **c**. 编辑 config/consumer.properties 文件 清单 6\. Kafka Consumer 配置项 ~~~ zookeeper.contact=192.168.0.10:2181,192.168.0.11:2181,192.168.0.12:2181 ~~~ 配置项解释如下: * zookeeper.contact: Consumer 可以连接的 zookeeper 服务器地址列表。 6.上传修改好的安装包到其他机器 至此,我们已经在 192.168.1.1 机器上修改好了所有需要的配置文件,那么接下来请用以下命令打包该 Kafka 安装包,并上传至 192.168.1.2 和 192.168.1.3 两台机器上。 清单 7\. 打包并上传 Kafka 安装包的命令 ~~~ tar –cvf kafka_2.10-0.8.2.1.tar ./kafka_2.10-0.8.2.1 scp ./kafka_2.10-0.8.2.1.tar fams@192.168.1.2:/home/fams scp ./kafka_2.10-0.8.2.1.tar fams@192.168.1.3:/home/fams ~~~ 上传完成后,我们需要到 192.168.1.2 和 192.168.1.3 两台机器上解压刚才上传的 tar 包,命令如清单一。之后需要分别在两台机器上修改 config/server.properties 文件中的 broker.id 和 host.name. broker.id,可以分别复制 1 和 2,host.name 需要改成当前机器的 IP。 7\. 启动 zookeeper 和 Kafka 服务 分别在三台机器上运行下面命令启动 zookeeper 和 Kafka 服务。 清单 8\. 启动 zookeeper 服务 ~~~ nohup bin/zookeeper-server-start.sh config/zookeeper.properties & ~~~ 清单 9\. 启动 kafka 服务 ~~~ nohup bin/kafka-server-start.sh config/server.properties & ~~~ 8\. 验证安装 我们的验证步骤有两个。 第一步,分别在三台机器上使用下面命令查看是否有 Kafka 和 zookeeper 相关服务进程。 清单 10\. 查看 Kafka 和 zookeeper 服务进程 ~~~ ps –ef | grep kafka ~~~ 第二步,创建消息主题,并通过 console producer 和 console consumer 验证消息可以被正常的生产和消费。 清单 11\. 创建消息主题 ~~~ bin/kafka-topics.sh --create \ --replication-factor 3 \ --partition 3 \ --topic user-behavior-topic \ --zookeeper 192.168.1.1:2181,192.168.1.2:2181,192.168.1.3:2181 ~~~ 运行下面命令打开打开 console producer。 清单 12\. 启动 Console Producer ~~~ bin/kafka-console-producer.sh --broker-list 192.168.1.1:9092 --topic user-behavior-topic ~~~ 在另一台机器打开 console consumer。 清单 13\. 启动 Console Consumer ~~~ ./kafka-console-consumer.sh --zookeeper 192.168.1.2:2181 --topic user-behavior-topic --from-beginning ~~~ 然后如果在 producer console 输入一条消息,能从 consumer console 看到这条消息就代表安装是成功的。 ## 案例介绍与编程实现 1\. 案例介绍 该案例中,我们假设某论坛需要根据用户对站内网页的点击量,停留时间,以及是否点赞,来近实时的计算网页热度,进而动态的更新网站的今日热点模块,把最热话题的链接显示其中。 2\. 案例分析 对于某一个访问论坛的用户,我们需要对他的行为数据做一个抽象,以便于解释网页话题热度的计算过程。 首先,我们通过一个向量来定义用户对于某个网页的行为即点击的网页,停留时间,以及是否点赞,可以表示如下: (page001.html, 1, 0.5, 1) 向量的第一项表示网页的 ID,第二项表示从进入网站到离开对该网页的点击次数,第三项表示停留时间,以分钟为单位,第四项是代表是否点赞,1 为赞,-1 表示踩,0 表示中立。 其次,我们再按照各个行为对计算网页话题热度的贡献,给其设定一个权重,在本文中,我们假设点击次数权重是 0.8,因为用户可能是由于没有其他更好的话题,所以再次浏览这个话题。停留时间权重是 0.8,因为用户可能同时打开多个 tab 页,但他真正关注的只是其中一个话题。是否点赞权重是 1,因为这一般表示用户对该网页的话题很有兴趣。 最后,我们定义用下列公式计算某条行为数据对于该网页热度的贡献值。 f(x,y,z)=0.8x+0.8y+z 那么对于上面的行为数据 (page001.html, 1, 0.5, 1),利用公式可得: H(page001)=f(x,y,z)= 0.8x+0.8y+z=0.8*1+0.8*0.5+1*1=2.2 读者可以留意到,在这个过程中,我们忽略了用户本身,也就是说我们不关注用户是谁,而只关注它对于网页热度所做的贡献。 3\. 生产行为数据消息 在本案例中我们将使用一段程序来模拟用户行为,该程序每隔 5 秒钟会随机的向 user-behavior-topic 主题推送 0 到 50 条行为数据消息,显然,这个程序扮演消息生产者的角色,在实际应用中,这个功能一般会由一个系统来提供。为了简化消息处理,我们定义消息的格式如下: 网页 ID|点击次数|停留时间 (分钟)|是否点赞 并假设该网站只有 100 个网页。以下是该类的 Scala 实现源码。 清单 14\. UserBehaviorMsgProducer 类源码 ~~~ import scala.util.Random import java.util.Properties import kafka.producer.KeyedMessage import kafka.producer.ProducerConfig import kafka.producer.Producer class UserBehaviorMsgProducer(brokers: String, topic: String) extends Runnable { private val brokerList = brokers private val targetTopic = topic private val props = new Properties() props.put("metadata.broker.list", this.brokerList) props.put("serializer.class", "kafka.serializer.StringEncoder") props.put("producer.type", "async") private val config = new ProducerConfig(this.props) private val producer = new Producer[String, String](this.config) private val PAGE_NUM = 100 private val MAX_MSG_NUM = 3 private val MAX_CLICK_TIME = 5 private val MAX_STAY_TIME = 10 //Like,1;Dislike -1;No Feeling 0 private val LIKE_OR_NOT = Array[Int](1, 0, -1) def run(): Unit = { val rand = new Random() while (true) { //how many user behavior messages will be produced val msgNum = rand.nextInt(MAX_MSG_NUM) + 1 try { //generate the message with format like page1|2|7.123|1 for (i <- 0 to msgNum) { var msg = new StringBuilder() msg.append("page" + (rand.nextInt(PAGE_NUM) + 1)) msg.append("|") msg.append(rand.nextInt(MAX_CLICK_TIME) + 1) msg.append("|") msg.append(rand.nextInt(MAX_CLICK_TIME) + rand.nextFloat()) msg.append("|") msg.append(LIKE_OR_NOT(rand.nextInt(3))) println(msg.toString()) //send the generated message to broker sendMessage(msg.toString()) } println("%d user behavior messages produced.".format(msgNum+1)) } catch { case e: Exception => println(e) } try { //sleep for 5 seconds after send a micro batch of message Thread.sleep(5000) } catch { case e: Exception => println(e) } } } def sendMessage(message: String) = { try { val data = new KeyedMessage[String, String](this.topic, message); producer.send(data); } catch { case e:Exception => println(e) } } } object UserBehaviorMsgProducerClient { def main(args: Array[String]) { if (args.length < 2) { println("Usage:UserBehaviorMsgProducerClient 192.168.1.1:9092 user-behavior-topic") System.exit(1) } //start the message producer thread new Thread(new UserBehaviorMsgProducer(args(0), args(1))).start() } } ~~~ 4\. 编写 Spark Streaming 程序消费消息 在弄清楚了要解决的问题之后,就可以开始编码实现了。对于本案例中的问题,在实现上的基本步骤如下: * 构建 Spark 的 StreamingContext 实例,并且开启 checkpoint 功能。因为我们需要使用 updateStateByKey 原语去累计的更新网页话题的热度值。 * 利用 Spark 提供的 KafkaUtils.createStream 方法消费消息主题,这个方法会返回 ReceiverInputDStream 对象实例。 * 对于每一条消息,利用上文的公式计算网页话题的热度值。 * 定义一个匿名函数去把网页热度上一次的计算结果值和新计算的值相加,得到最新的热度值。 * 调用 updateStateByKey 原语并传入上面定义的匿名函数更新网页热度值。 * 最后得到最新结果后,需要对结果进行排序,最后打印热度值最高的 10 个网页。 源代码如下。 清单 15\. WebPagePopularityValueCalculator 类源码 ~~~ import org.apache.spark.SparkConf import org.apache.spark.streaming.Seconds import org.apache.spark.streaming.StreamingContext import org.apache.spark.streaming.kafka.KafkaUtils import org.apache.spark.HashPartitioner import org.apache.spark.streaming.Duration object WebPagePopularityValueCalculator { private val checkpointDir = "popularity-data-checkpoint" private val msgConsumerGroup = "user-behavior-topic-message-consumer-group" def main(args: Array[String]) { if (args.length < 2) { println("Usage:WebPagePopularityValueCalculator zkserver1:2181, zkserver2:2181,zkserver3:2181 consumeMsgDataTimeInterval(secs)") System.exit(1) } val Array(zkServers,processingInterval) = args val conf = new SparkConf().setAppName("Web Page Popularity Value Calculator") val ssc = new StreamingContext(conf, Seconds(processingInterval.toInt)) //using updateStateByKey asks for enabling checkpoint ssc.checkpoint(checkpointDir) val kafkaStream = KafkaUtils.createStream( //Spark streaming context ssc, //zookeeper quorum. e.g zkserver1:2181,zkserver2:2181,... zkServers, //kafka message consumer group ID msgConsumerGroup, //Map of (topic_name -> numPartitions) to consume. Each partition is consumed in its own thread Map("user-behavior-topic" -> 3)) val msgDataRDD = kafkaStream.map(_._2) //for debug use only //println("Coming data in this interval...") //msgDataRDD.print() // e.g page37|5|1.5119122|-1 val popularityData = msgDataRDD.map { msgLine => { val dataArr: Array[String] = msgLine.split("\\|") val pageID = dataArr(0) //calculate the popularity value val popValue: Double = dataArr(1).toFloat * 0.8 + dataArr(2).toFloat * 0.8 + dataArr(3).toFloat * 1 (pageID, popValue) } } //sum the previous popularity value and current value val updatePopularityValue = (iterator: Iterator[(String, Seq[Double], Option[Double])]) => { iterator.flatMap(t => { val newValue:Double = t._2.sum val stateValue:Double = t._3.getOrElse(0); Some(newValue + stateValue) }.map(sumedValue => (t._1, sumedValue))) } val initialRDD = ssc.sparkContext.parallelize(List(("page1", 0.00))) val stateDstream = popularityData.updateStateByKey[Double](updatePopularityValue, new HashPartitioner(ssc.sparkContext.defaultParallelism), true, initialRDD) //set the checkpoint interval to avoid too frequently data checkpoint which may //may significantly reduce operation throughput stateDstream.checkpoint(Duration(8*processingInterval.toInt*1000)) //after calculation, we need to sort the result and only show the top 10 hot pages stateDstream.foreachRDD { rdd => { val sortedData = rdd.map{ case (k,v) => (v,k) }.sortByKey(false) val topKData = sortedData.take(10).map{ case (v,k) => (k,v) } topKData.foreach(x => { println(x) }) } } ssc.start() ssc.awaitTermination() } } ~~~ ## 部署和测试 读者可以参考以下步骤部署并测试本案例提供的示例程序。 第一步,启动行为消息生产者程序, 可以直接在 Scala IDE 中启动,不过需要添加启动参数,第一个是 Kafka Broker 地址,第二个是目标消息主题的名称。 图 1\. UserBehaviorMsgProducer 类启动参数 ![图 1\. UserBehaviorMsgProducer 类启动参数](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2015-09-23_56023ce8e1524.jpg) 启动后,可以看到控制台有行为消息数据生成。 图 2\. 生成的行为消息数据预览 ![图 2\. 生成的行为消息数据预览](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2015-09-23_56023cea5024e.jpg) 第二步,启动作为行为消息消费者的 Spark Streaming 程序,需要在 Spark 集群环境中启动,命令如下: 清单 16\. WebPagePopularityValueCalculator 类启动命令 ~~~ bin/spark-submit \ --jars $SPARK_HOME/lib/spark-streaming-kafka_2.10-1.3.1.jar, \ $SPARK_HOME/lib/spark-streaming-kafka-assembly_2.10-1.3.1.jar, \ $SPARK_HOME/lib/kafka_2.10-0.8.2.1.jar, \ $SPARK_HOME/lib/kafka-clients-0.8.2.1.jar \ --class com.ibm.spark.exercise.streaming.WebPagePopularityValueCalculator --master spark://<spark_master_ip>:7077 \ --num-executors 4 \ --driver-memory 4g \ --executor-memory 2g \ --executor-cores 2 \ /home/fams/sparkexercise.jar \ 192.168.1.1:2181,192.168.1.2:2181,192.168.1.3:2181 2 ~~~ 由于程序中我们要用到或者间接调用 Kafka 的 API,并且需要调用 Spark Streaming 集成 Kafka 的 API(KafkaUtils.createStream), 所以需要提前将启动命令中的 jar 包上传到 Spark 集群的每个机器上 (本例中我们将它们上传到 Spark 安装目录的 lib 目录下,即$SPARK_HOME/lib),并在启动命令中引用它们。 启动后,我们可以看到命令行 console 下面有消息打印出来,即计算的热度值最高的 10 个网页。 图 3\. 网页话题热度当前排序预览 ![图 3\. 网页话题热度当前排序预览](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2015-09-23_56023ceba4075.jpg) 我们也可以到 Spark Web Console 上去查看当前 Spark 程序的运行状态, 默认地址为: http://spark_master_ip:8080。 图 4\. 查看 Spark Streaming 程序的运行状态 ![图 4\. 查看 Spark Streaming 程序的运行状态](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2015-09-23_56023cee760c1.jpg) ## 注意事项 利用 Spark Streaming 构建一个高效健壮的流数据计算系统,我们还需要注意以下方面。 * 需要合理的设置数据处理的间隔,即需要保证每一批数据的处理时间必须小于处理间隔,保证在处理下一批数据的时候,前一批已经处理完毕。显然这需要由您的 Spark 集群的计算能力还有 input 数据的量决定。 * 需要尽可能的提升读取 input 数据的能力。在 Spark Streaming 与外部系统如 Kafka,Flume 等集成时,为了避免接收数据环节成为系统的瓶颈,我们可以启动多个 ReceiverInputDStream 对象实例。 * 虽然本文案例中,我们只是把 (近) 实时计算结果打印出来,但是实际上很多时候这些结果会被保存到数据库,HDFS, 或者发送回 Kafka, 以供其他系统利用这些数据做进一步的业务处理。 * 由于流计算对实时性要求很高,所以任何由于 JVM Full GC 引起的系统暂停都是不可接受的。除了在程序中合理使用内存,并且定期清理不需要的缓存数据外,CMS(Concurrent Mark and Sweep) GC 也是被 Spark 官方推荐的 GC 方式,它能有效的把由于 GC 引起的暂停维持在一个在很低的水平。我们可以在使用 spark-submit 命令时通过增加 --driver-java-options 选项来添加 CMS GC 相关的参数。 * 在 Spark 官方提供关于集成 Kafka 和 Spark Streaming 的指导文档中,提到了两种方式,第一种是 Receiver Based Approach,即通过在 Receiver 里实现 Kafka consumer 的功能来接收消息数据;第二种是 Direct Approach, 即不通过 Receiver,而是周期性的主动查询 Kafka 消息分区中的最新 offset 值,进而去定义在每个 batch 中需要处理的消息的 offset 范围。本文采用的是第一种方式,因为目前第二种方式还处于试验阶段。 * 如果采用 Receiver Based Approach 集成 Kafka 和 Spark Streaming,就需要考虑到由于 Driver 或者 Worker 节点宕机而造成的数据丢失的情况,在默认配置下,是有可能造成数据丢失的,除非我们开启 Write Ahead Log(WAL) 功能。在这种情况下,从 Kafka 接收到的消息数据会同步的被写入到 WAL 并保存到可靠的分布式文件系统上,如 HDFS。可以通过在 Spark 配置文件中 (conf/spark-defaults.conf) 把 spark.streaming.receiver.writeAheadLog.enable 配置项设置成 true 开启这个功能。当然在开启 WAL 的情况下,会造成单个 Receiver 吞吐量下降,这时候,我们可能需要并行的运行多个 Receiver 来改善这种情况。 * 由于 updateStateByKey 操作需要开启 checkpoint 功能,但是频繁的 checkpoint 会造成程序处理时间增长,也会造成吞吐量下降。默认情况下,checkpoint 时间间隔会取 steaming 程序数据处理间隔或者 10 秒两者中较大的那个。官方推荐的间隔是 streaming 程序数据处理间隔的 5-10 倍。可以通过 dsteam.checkpoint(checkpointInterval) 来设置,参数需要用样本类 Duration 包装下,单位是毫秒。 ## 结束语 本文包含了集成 Spark Streaming 和 Kafka 分布式消息系统的基本知识,但是需要指出的是,在实际问题中,我们可能面临更多的问题,如性能优化,内存不足,以及其他未曾遇到的问题。希望通过本文的阅读,读者能对使用 Spark Streaming 和 Kafka 构建实时数据处理系统有一个基本的认识,为读者进行更深入的研究提供一个参考依据。读者在阅读本文的时候发现任何问题或者有任何建议,请不吝赐教,留下您的评论,我会及时回复。希望我们可以一起讨论,共同进步。 ## 参考资料 ### 学习 * 参考 Spark 官网的 [Spark Streaming](http://spark.apache.org/docs/latest/streaming-programming-guide.html)编程指导,了解 Spark Streaming 编程的基本知识和需要注意的方面。 * 查看 [Scala 官网](http://www.scala-lang.org/),了解更多关于 Scala 语言的内容。 * [developerWorks 开源技术主题](http://www.ibm.com/developerworks/cn/opensource/):查找丰富的操作信息、工具和项目更新,帮助您掌握开源技术并将其用于 IBM 产品。 ### 讨论 * 加入 [developerWorks 中文社区](http://www.ibm.com/developerworks/cn/community/),查看开发人员推动的博客、论坛、组和维基,并与其他 developerWorks 用户交流。
';

第 1 部分

最后更新于:2022-04-01 03:37:24

> 原文出处:https://www.ibm.com/developerworks/cn/opensource/os-cn-spark-practice1/ > 作者:王 龙, 软件开发工程师, IBM # 使用 Scala 语言开发 Spark 应用程序 本文旨在通过具有实际意义的案例向读者介绍如何使用 Scala 语言开发 Spark 应用程序并在 Spark 集群上运行。本文涉及的所有源数据都将从 HDFS(Hadoop Distributed File System)读取,部分案例的输出结果也会写入到 HDFS, 所以通过阅读本文,读者也会学习到 Spark 和 HDFS 交互的一些知识。 [TOC=2,3] ## 引言 在当前这个信息时代里,大数据所蕴含的价值已经被绝大多数的企业所认知。在 IT 的世界里,往往都是需求驱动技术的发展和革新。Hadoop 在这个大背景下应运而生,它给我们提供了一个存储和处理大数据的良好的解决方案,短短的几年时间里,它已无处不在,事实上它已经成了大数据技术的代名词。然而在人们越来越多的使用 Hadoop 提供的 MapReduce 框架处理大数据的时候,却发现它存在许多天生的缺陷, 如效率低,编程模型不够灵活,只适合做离线计算等。Spark 的出现无疑让诸多大数据计算的从业者和爱好者眼前一亮,它基于内存,并且提供了更加丰富的算子使得我们可以更高效和灵活的处理大数据。本文将从实例出发,向读者介绍如何使用 Scala 语言 (Spark 框架的开发语言) 开发 Spark 应用程序并且将其运行在 Spark 集群环境里。本文假设读者已经对 Spark 基本原理和编程模型有了基本的了解,并且已经掌握了 Scala 语言开发的基础知识,那么通过阅读本文,相信您一定会对 Spark 应用程序的开发有更深入的认识。接下来,就让我们开始 Spark 应用程序的开发之旅吧。 ## 关于 Spark Spark 由加州大学伯克利分校 AMP 实验室 (Algorithms, Machines, and People Lab) 开发,可用来构建大型的、低延迟的大数据处理的应用程序。并且提供了用于机器学习 (MLlib), 流计算(Streaming), 图计算 (GraphX) 等子模块,最新的 1.4.0 版本更是提供了与 R 语言的集成,这使得 Spark 几乎成为了多领域通吃的全能技术。Spark 对数据的存储,转换,以及计算都是基于一个叫 RDD(Resilient Distributed Dataset) 分布式内存的抽象,应用程序对需要计算的数据的操作都是通过对 RDD 的一系列转化 (Transformation) 和动作 (Action) 算子完成的,其中转化算子可以把一个 RDD 转成另一个 RDD,如 filter 算子可以通过添加过滤条件生成一个只包含符合条件的数据的新的 RDD。动作算子负责完成最终的计算,如 count 算子可以计算出整个 RDD 表示的数据集中元素的个数。关于 Spark 所支持的算子 以及使用方法请参考 [Spark 官方网站](http://spark.apache.org/docs/latest/programming-guide.html)。本文所使用的 Spark 的发行版是 1.3.1,读者可根据需要下载相应的版本。 ## 关于 Scala Scala 语言是一门类 Java 的多范式语言,其设计初衷就是为了继承函数式编程的面向对象编程的各种特性,正如 [Scala 语言官网](http://www.scala-lang.org/) 描述的那样:Object-Oriented Meets Functional, 就是给出了一个关于 Scala 语言特性的最简单明了的概括。 Spark 框架使用 Scala 语言开发,那么使用 Scala 语言开发 Spark 应用程序就变成一件很自然的事情,虽然 Spark 提供了面向 Python,Java 等语言的编程接口,但是从各个方面来看使用 Scala 编程都是最简单最容易理解的,特别是当程序出现异常或者是需要通过学习源码来定位问题时,您会发现学习 Scala 语言来编写 Spark 应用程序是多么有意义的事情。关于 Scala 语言,如果您还没有基础,请参考 * [Scala 语言官网](http://www.scala-lang.org/) * [Scala 中文网](http://scalachina.com/node/61) * Twitter 提供的 [Scala 课堂](http://twitter.github.io/scala_school/zh_cn/index.html) * [面向 Java 开发人员的 Scala 指南系列](http://www.ibm.com/developerworks/cn/java/j-scala/) 由于 Spark 1.3.1 版本使用的是 Scala 2.10.x 版本,所以本文将使用 Scala 2.10.5 版本。 ## 搭建开发环境 1. 安装 Scala IDE 搭建 Scala 语言开发环境很容易,[Scala IDE 官网](http://scala-ide.org/download/current.html) 下载合适的版本并解压就可以完成安装,本文使用的版本是 4.1.0。 2. 安装 Scala 语言包 如果下载的 Scala IDE 自带的 Scala 语言包与 Spark 1.3.1 使用的 Scala 版本 (2.10.x) 不一致,那么就需要下载和本文所使用的 Spark 所匹配的版本,以确保实现的 Scala 程序不会因为版本问题而运行失败。 请下载并安装 [Scala 2.10.5 版本](http://www.scala-lang.org/download/2.10.5.html) 3. 安装 JDK 如果您的机器上没有安装 JDK,请下载并安装 1.6 版本以上的 JDK。 4. 创建并配置 Spark 工程 打开 Scala IDE,创建一个名称为 spark-exercise 的 Scala 工程。 图 1\. 创建 scala 工程 ![图 1\. 创建 scala 工程](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2015-09-23_56023bc18b8c1.jpg) 在工程目录下创建一个 lib 文件夹,并且把您的 Spark 安装包下的 spark-assembly jar 包拷贝到 lib 目录下。 图 2\. Spark 开发 jar 包 ![图 2\. Spark 开发 jar 包](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2015-09-23_56023bc2ca48f.jpg) 并且添加该 jar 包到工程的 classpath 并配置工程使用刚刚安装的 Scala 2.10.5 版本.,工程目录结构如下。 图 3\. 添加 jar 包到 classpath ![图 3\. 添加 jar 包到 classpath](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2015-09-23_56023bc4587c0.jpg) ## 运行环境介绍 为了避免读者对本文案例运行环境产生困惑,本节会对本文用到的集群环境的基本情况做个简单介绍。 * 本文所有实例数据存储的环境是一个 8 个机器的 Hadoop 集群,文件系统总容量是 1.12T,NameNode 叫 hadoop036166, 服务端口是 9000。读者可以不关心具体的节点分布,因为这个不会影响到您阅读后面的文章。 * 本文运行实例程序使用的 Spark 集群是一个包含四个节点的 Standalone 模式的集群, 其中包含一个 Master 节点 (监听端口 7077) 和三个 Worker 节点,具体分布如下: | Server Name | Role | | --- | --- | | hadoop036166 | Master | | hadoop036187 | Worker | | hadoop036188 | Worker | | hadoop036227 | Worker | * Spark 提供一个 Web UI 去查看集群信息并且监控执行结果,默认地址是:http://:8080 ,对于该实例提交后我们也可以到 web 页面上去查看执行结果,当然也可以通过查看日志去找到执行结果。 图 4\. Spark 的 web console ![图 4\. Spark 的 web console](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2015-09-23_56023bc55d64d.jpg) ## 案例分析与编程实现 ### 案例一 a. 案例描述 提起 Word Count(词频数统计),相信大家都不陌生,就是统计一个或者多个文件中单词出现的次数。本文将此作为一个入门级案例,由浅入深的开启使用 Scala 编写 Spark 大数据处理程序的大门。 b.案例分析 对于词频数统计,用 Spark 提供的算子来实现,我们首先需要将文本文件中的每一行转化成一个个的单词, 其次是对每一个出现的单词进行记一次数,最后就是把所有相同单词的计数相加得到最终的结果。 对于第一步我们自然的想到使用 flatMap 算子把一行文本 split 成多个单词,然后对于第二步我们需要使用 map 算子把单个的单词转化成一个有计数的 Key-Value 对,即 word -> (word,1). 对于最后一步统计相同单词的出现次数,我们需要使用 reduceByKey 算子把相同单词的计数相加得到最终结果。 c. 编程实现 清单 1.SparkWordCount 类源码 ~~~ import org.apache.spark.SparkConf import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ object SparkWordCount { def FILE_NAME:String = "word_count_results_"; def main(args:Array[String]) { if (args.length < 1) { println("Usage:SparkWordCount FileName"); System.exit(1); } val conf = new SparkConf().setAppName("Spark Exercise: Spark Version Word Count Program"); val sc = new SparkContext(conf); val textFile = sc.textFile(args(0)); val wordCounts = textFile.flatMap(line => line.split(" ")).map( word => (word, 1)).reduceByKey((a, b) => a + b) //print the results,for debug use. //println("Word Count program running results:"); //wordCounts.collect().foreach(e => { //val (k,v) = e //println(k+"="+v) //}); wordCounts.saveAsTextFile(FILE_NAME+System.currentTimeMillis()); println("Word Count program running results are successfully saved."); } } ~~~ d. 提交到集群执行 本实例中, 我们将统计 HDFS 文件系统中/user/fams 目录下所有 txt 文件中词频数。其中 spark-exercise.jar 是 Spark 工程打包后的 jar 包,这个 jar 包执行时会被上传到目标服务器的/home/fams 目录下。运行此实例的具体命令如下: 清单 2.SparkWordCount 类执行命令 ~~~ ./spark-submit \ --class com.ibm.spark.exercise.basic.SparkWordCount \ --master spark://hadoop036166:7077 \ --num-executors 3 \ --driver-memory 6g --executor-memory 2g \ --executor-cores 2 \ /home/fams/sparkexercise.jar \ hdfs://hadoop036166:9000/user/fams/*.txt ~~~ e. 监控执行状态 该实例把最终的结果存储在了 HDFS 上,那么如果程序运行正常我们可以在 HDFS 上找到生成的文件信息 图 5\. 案例一输出结果 ![图 5\. 案例一输出结果](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2015-09-23_56023bc6e5d2b.jpg) 打开 Spark 集群的 Web UI, 可以看到刚才提交的 job 的执行结果。 图 6\. 案例一完成状态 ![图 6\. 案例一完成状态](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2015-09-23_56023bc8f083a.jpg) 如果程序还没运行完成,那么我们可以在 Running Applications 列表里找到它。 ### 案例二 a. 案例描述 该案例中,我们将假设我们需要统计一个 1000 万人口的所有人的平均年龄,当然如果您想测试 Spark 对于大数据的处理能力,您可以把人口数放的更大,比如 1 亿人口,当然这个取决于测试所用集群的存储容量。假设这些年龄信息都存储在一个文件里,并且该文件的格式如下,第一列是 ID,第二列是年龄。 图 7\. 案例二测试数据格式预览 ![图 7\. 案例二测试数据格式预览](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2015-09-23_56023bca97617.jpg) 现在我们需要用 Scala 写一个生成 1000 万人口年龄数据的文件,源程序如下: 清单 3\. 年龄信息文件生成类源码 ~~~ import java.io.FileWriter import java.io.File import scala.util.Random object SampleDataFileGenerator { def main(args:Array[String]) { val writer = new FileWriter(new File("C: \\sample_age_data.txt"),false) val rand = new Random() for ( i <- 1 to 10000000) { writer.write( i + " " + rand.nextInt(100)) writer.write(System.getProperty("line.separator")) } writer.flush() writer.close() } } ~~~ b. 案例分析 要计算平均年龄,那么首先需要对源文件对应的 RDD 进行处理,也就是将它转化成一个只包含年龄信息的 RDD,其次是计算元素个数即为总人数,然后是把所有年龄数加起来,最后平均年龄=总年龄/人数。 对于第一步我们需要使用 map 算子把源文件对应的 RDD 映射成一个新的只包含年龄数据的 RDD,很显然需要对在 map 算子的传入函数中使用 split 方法,得到数组后只取第二个元素即为年龄信息;第二步计算数据元素总数需要对于第一步映射的结果 RDD 使用 count 算子;第三步则是使用 reduce 算子对只包含年龄信息的 RDD 的所有元素用加法求和;最后使用除法计算平均年龄即可。 由于本例输出结果很简单,所以只打印在控制台即可。 c. 编程实现 清单 4.AvgAgeCalculator 类源码 ~~~ import org.apache.spark.SparkConf import org.apache.spark.SparkContext object AvgAgeCalculator { def main(args:Array[String]) { if (args.length < 1){ println("Usage:AvgAgeCalculator datafile") System.exit(1) } val conf = new SparkConf().setAppName("Spark Exercise:Average Age Calculator") val sc = new SparkContext(conf) val dataFile = sc.textFile(args(0), 5); val count = dataFile.count() val ageData = dataFile.map(line => line.split(" ")(1)) val totalAge = ageData.map(age => Integer.parseInt( String.valueOf(age))).collect().reduce((a,b) => a+b) println("Total Age:" + totalAge + ";Number of People:" + count ) val avgAge : Double = totalAge.toDouble / count.toDouble println("Average Age is " + avgAge) } } ~~~ d. 提交到集群执行 要执行本实例的程序,需要将刚刚生成的年龄信息文件上传到 HDFS 上,假设您刚才已经在目标机器上执行生成年龄信息文件的 Scala 类,并且文件被生成到了/home/fams 目录下。 那么您需要运行一下 HDFS 命令把文件拷贝到 HDFS 的/user/fams 目录。 清单 5\. 年龄信息文件拷贝到 HDFS 目录的命令 ~~~ hdfs dfs –copyFromLocal /home/fams /user/fams ~~~ 清单 6.AvgAgeCalculator 类的执行命令 ~~~ ./spark-submit \ --class com.ibm.spark.exercise.basic.AvgAgeCalculator \ --master spark://hadoop036166:7077 \ --num-executors 3 \ --driver-memory 6g \ --executor-memory 2g \ --executor-cores 2 \ /home/fams/sparkexercise.jar \ hdfs://hadoop036166:9000/user/fams/inputfiles/sample_age_data.txt ~~~ e. 监控执行状态 在控制台您可以看到如下所示信息: 图 8\. 案例二输出结果 ![图 8\. 案例二输出结果](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2015-09-23_56023bcc52f63.jpg) 我们也可以到 Spark Web Console 去查看 Job 的执行状态 图 9\. 案例二完成状态 ![图 9\. 案例二完成状态](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2015-09-23_56023bcd7ccb5.jpg) ### 案例三 a. 案例描述 本案例假设我们需要对某个省的人口 (1 亿) 性别还有身高进行统计,需要计算出男女人数,男性中的最高和最低身高,以及女性中的最高和最低身高。本案例中用到的源文件有以下格式, 三列分别是 ID,性别,身高 (cm)。 图 10\. 案例三测试数据格式预览 ![图 10\. 案例三测试数据格式预览](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2015-09-23_56023bd4317be.jpg) 我们将用以下 Scala 程序生成这个文件,源码如下: 清单 7\. 人口信息生成类源码 ~~~ import java.io.FileWriter import java.io.File import scala.util.Random object PeopleInfoFileGenerator { def main(args:Array[String]) { val writer = new FileWriter(new File("C:\\LOCAL_DISK_D\\sample_people_info.txt"),false) val rand = new Random() for ( i <- 1 to 100000000) { var height = rand.nextInt(220) if (height < 50) { height = height + 50 } var gender = getRandomGender if (height < 100 && gender == "M") height = height + 100 if (height < 100 && gender == "F") height = height + 50 writer.write( i + " " + getRandomGender + " " + height) writer.write(System.getProperty("line.separator")) } writer.flush() writer.close() println("People Information File generated successfully.") } def getRandomGender() :String = { val rand = new Random() val randNum = rand.nextInt(2) + 1 if (randNum % 2 == 0) { "M" } else { "F" } } } ~~~ b. 案例分析 对于这个案例,我们要分别统计男女的信息,那么很自然的想到首先需要对于男女信息从源文件的对应的 RDD 中进行分离,这样会产生两个新的 RDD,分别包含男女信息;其次是分别对男女信息对应的 RDD 的数据进行进一步映射,使其只包含身高数据,这样我们又得到两个 RDD,分别对应男性身高和女性身高;最后需要对这两个 RDD 进行排序,进而得到最高和最低的男性或女性身高。 对于第一步,也就是分离男女信息,我们需要使用 filter 算子,过滤条件就是包含”M” 的行是男性,包含”F”的行是女性;第二步我们需要使用 map 算子把男女各自的身高数据从 RDD 中分离出来;第三步我们需要使用 sortBy 算子对男女身高数据进行排序。 c. 编程实现 在实现上,有一个需要注意的点是在 RDD 转化的过程中需要把身高数据转换成整数,否则 sortBy 算子会把它视为字符串,那么排序结果就会受到影响,例如 身高数据如果是:123,110,84,72,100,那么升序排序结果将会是 100,110,123,72,84,显然这是不对的。 清单 8.PeopleInfoCalculator 类源码 ~~~ object PeopleInfoCalculator { def main(args:Array[String]) { if (args.length < 1){ println("Usage:PeopleInfoCalculator datafile") System.exit(1) } val conf = new SparkConf().setAppName("Spark Exercise:People Info(Gender & Height) Calculator") val sc = new SparkContext(conf) val dataFile = sc.textFile(args(0), 5); val maleData = dataFile.filter(line => line.contains("M")).map( line => (line.split(" ")(1) + " " + line.split(" ")(2))) val femaleData = dataFile.filter(line => line.contains("F")).map( line => (line.split(" ")(1) + " " + line.split(" ")(2))) //for debug use //maleData.collect().foreach { x => println(x)} //femaleData.collect().foreach { x => println(x)} val maleHeightData = maleData.map(line => line.split(" ")(1).toInt) val femaleHeightData = femaleData.map(line => line.split(" ")(1).toInt) //for debug use //maleHeightData.collect().foreach { x => println(x)} //femaleHeightData.collect().foreach { x => println(x)} val lowestMale = maleHeightData.sortBy(x => x,true).first() val lowestFemale = femaleHeightData.sortBy(x => x,true).first() //for debug use //maleHeightData.collect().sortBy(x => x).foreach { x => println(x)} //femaleHeightData.collect().sortBy(x => x).foreach { x => println(x)} val highestMale = maleHeightData.sortBy(x => x, false).first() val highestFemale = femaleHeightData.sortBy(x => x, false).first() println("Number of Male Peole:" + maleData.count()) println("Number of Female Peole:" + femaleData.count()) println("Lowest Male:" + lowestMale) println("Lowest Female:" + lowestFemale) println("Highest Male:" + highestMale) println("Highest Female:" + highestFemale) } } ~~~ d. 提交到集群执行 在提交该程序到集群执行之前,我们需要将刚才生成的人口信息数据文件上传到 HDFS 集群,具体命令可以参照上文。 清单 9.PeopleInfoCalculator 类的执行命令 ~~~ ./spark-submit \ --class com.ibm.spark.exercise.basic.PeopleInfoCalculator \ --master spark://hadoop036166:7077 \ --num-executors 3 \ --driver-memory 6g \ --executor-memory 3g \ --executor-cores 2 \ /home/fams/sparkexercise.jar \ hdfs://hadoop036166:9000/user/fams/inputfiles/sample_people_info.txt ~~~ e. 监控执行状态 对于该实例,如程序中打印的一样,会在控制台显示如下信息: 图 11\. 案例三输出结果 ![图 11\. 案例三输出结果](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2015-09-23_56023bd575c2a.jpg) 在 Spark Web Console 里可以看到具体的执行状态信息 图 12\. 案例三完成状态 ![图 12\. 案例三完成状态](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2015-09-23_56023bd6b42ca.jpg) ### 案例四 a. 案例描述 该案例中我们假设某搜索引擎公司要统计过去一年搜索频率最高的 K 个科技关键词或词组,为了简化问题,我们假设关键词组已经被整理到一个或者多个文本文件中,并且文档具有以下格式。 图 13\. 案例四测试数据格式预览 ![图 13\. 案例四测试数据格式预览](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2015-09-23_56023bd92197a.jpg) 我们可以看到一个关键词或者词组可能出现多次,并且大小写格式可能不一致。 b. 案例分析 要解决这个问题,首先我们需要对每个关键词出现的次数进行计算,在这个过程中需要识别不同大小写的相同单词或者词组,如”Spark”和“spark” 需要被认定为一个单词。对于出现次数统计的过程和 word count 案例类似;其次我们需要对关键词或者词组按照出现的次数进行降序排序,在排序前需要把 RDD 数据元素从 (k,v) 转化成 (v,k);最后取排在最前面的 K 个单词或者词组。 对于第一步,我们需要使用 map 算子对源数据对应的 RDD 数据进行全小写转化并且给词组记一次数,然后调用 reduceByKey 算子计算相同词组的出现次数;第二步我们需要对第一步产生的 RDD 的数据元素用 sortByKey 算子进行降序排序;第三步再对排好序的 RDD 数据使用 take 算子获取前 K 个数据元素。 c. 编程实现 清单 10.TopKSearchKeyWords 类源码 ~~~ import org.apache.spark.SparkConf import org.apache.spark.SparkContext object TopKSearchKeyWords { def main(args:Array[String]){ if (args.length < 2) { println("Usage:TopKSearchKeyWords KeyWordsFile K"); System.exit(1) } val conf = new SparkConf().setAppName("Spark Exercise:Top K Searching Key Words") val sc = new SparkContext(conf) val srcData = sc.textFile(args(0)) val countedData = srcData.map(line => (line.toLowerCase(),1)).reduceByKey((a,b) => a+b) //for debug use //countedData.foreach(x => println(x)) val sortedData = countedData.map{ case (k,v) => (v,k) }.sortByKey(false) val topKData = sortedData.take(args(1).toInt).map{ case (v,k) => (k,v) } topKData.foreach(println) } } ~~~ d. 提交到集群执行 清单 11.TopKSearchKeyWords 类的执行命令 ~~~ ./spark-submit \ --class com.ibm.spark.exercise.basic.TopKSearchKeyWords \ --master spark://hadoop036166:7077 \ --num-executors 3 \ --driver-memory 6g \ --executor-memory 2g \ --executor-cores 2 \ /home/fams/sparkexercise.jar \ hdfs://hadoop036166:9000/user/fams/inputfiles/search_key_words.txt ~~~ e. 监控执行状态 如果程序成功执行,我们将在控制台看到以下信息。当然读者也可以仿照案例二和案例三那样,自己尝试使用 Scala 写一段小程序生成此案例需要的源数据文件,可以根据您的 HDFS 集群的容量,生成尽可能大的文件,用来测试本案例提供的程序。 图 14\. 案例四输出结果 ![图 14\. 案例四输出结果](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2015-09-23_56023bda5ae90.jpg) 图 15\. 案例四完成状态 ![图 15\. 案例四完成状态](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2015-09-23_56023bdc5650b.jpg) ## Spark job 的执行流程简介 我们可以发现,Spark 应用程序在提交执行后,控制台会打印很多日志信息,这些信息看起来是杂乱无章的,但是却在一定程度上体现了一个被提交的 Spark job 在集群中是如何被调度执行的,那么在这一节,将会向大家介绍一个典型的 Spark job 是如何被调度执行的。 我们先来了解以下几个概念: **DAG**: 即 Directed Acyclic Graph,有向无环图,这是一个图论中的概念。如果一个[有向图](http://baike.baidu.com/view/807915.htm)无法从某个顶点出发经过若干条边回到该点,则这个图是一个有向无环图。 **Job:**我们知道,Spark 的计算操作是 lazy 执行的,只有当碰到一个动作 (Action) 算子时才会触发真正的计算。一个 Job 就是由动作算子而产生包含一个或多个 Stage 的计算作业。 **Stage**:Job 被确定后,Spark 的调度器 (DAGScheduler) 会根据该计算作业的计算步骤把作业划分成一个或者多个 Stage。Stage 又分为 ShuffleMapStage 和 ResultStage,前者以 shuffle 为输出边界,后者会直接输出结果,其边界可以是获取外部数据,也可以是以一个 ShuffleMapStage 的输出为边界。每一个 Stage 将包含一个 TaskSet。 **TaskSet: **代表一组相关联的没有 shuffle 依赖关系的任务组成任务集。一组任务会被一起提交到更加底层的 TaskScheduler。 **Task**:代表单个数据分区上的最小处理单元。分为 ShuffleMapTask 和 ResultTask。ShuffleMapTask 执行任务并把任务的输出划分到 (基于 task 的对应的数据分区) 多个 bucket(ArrayBuffer) 中,ResultTask 执行任务并把任务的输出发送给驱动程序。 Spark 的作业任务调度是复杂的,需要结合源码来进行较为详尽的分析,但是这已经超过本文的范围,所以这一节我们只是对大致的流程进行分析。 Spark 应用程序被提交后,当某个动作算子触发了计算操作时,SparkContext 会向 DAGScheduler 提交一个作业,接着 DAGScheduler 会根据 RDD 生成的依赖关系划分 Stage,并决定各个 Stage 之间的依赖关系,Stage 之间的依赖关系就形成了 DAG。Stage 的划分是以 ShuffleDependency 为依据的,也就是说当某个 RDD 的运算需要将数据进行 Shuffle 时,这个包含了 Shuffle 依赖关系的 RDD 将被用来作为输入信息,进而构建一个新的 Stage。我们可以看到用这样的方式划分 Stage,能够保证有依赖关系的数据可以以正确的顺序执行。根据每个 Stage 所依赖的 RDD 数据的 partition 的分布,会产生出与 partition 数量相等的 Task,这些 Task 根据 partition 的位置进行分布。其次对于 finalStage 或是 mapStage 会产生不同的 Task,最后所有的 Task 会封装到 TaskSet 内提交到 TaskScheduler 去执行。有兴趣的读者可以通过阅读 DAGScheduler 和 TaskScheduler 的源码获取更详细的执行流程。 ## 结束语 通过本文,相信读者对如何使用 Scala 编写 Spark 应用程序处理大数据已经有了较为深入的了解。当然在处理实际问题时,情况可能比本文举得例子复杂很多,但是解决问题的基本思想是一致的。在碰到实际问题的时候,首先要对源数据结构格式等进行分析,然后确定如何去使用 Spark 提供的算子对数据进行转化,最终根据实际需求选择合适的算子操作数据并计算结果。本文并未介绍其它 Spark 模块的知识,显然这不是一篇文章所能完成的,希望以后会有机会总结更多的 Spark 应用程序开发以及性能调优方面的知识,写成文章与更多的 Spark 技术爱好者分享,一起进步。由于时间仓促并且本人知识水平有限,文章难免有未考虑周全的地方甚至是错误,希望各位朋友不吝赐教。有任何问题,都可以在文末留下您的评论,我会及时回复。 ## 参考资料 * 参考[Spark 官网的编程指导](http://spark.apache.org/docs/1.3.1/programming-guide.html),查看 Spark 各种转换和动作算子的描述。 * 查看[Scala 官网](http://www.scala-lang.org/),了解更多关于 Scala 语言的内容。 * 查看文章“[Spark 之任务调度](http://www.aboutyun.com/thread-8548-1-1.html)”,确认并加深对于 Spark 任务调度原理的理解。 * [developerWorks 开源技术主题](http://www.ibm.com/developerworks/cn/opensource/):查找丰富的操作信息、工具和项目更新,帮助您掌握开源技术并将其用于 IBM 产品。
';