3 深入了解

最后更新于:2022-04-01 21:41:31

亲爱的读者,恭喜你运行了你的第一个Spark应用程序! 你肯定不仅仅满足于此,以下是更多的深入学习的资料: * 深度学习API和其它组件, 请参照[Spark开发指南](https://spark.apache.org/docs/latest/programming-guide.html) * 学习在集群中运行程序,访问 [发布概览](https://spark.apache.org/docs/latest/cluster-overview.html). * 最后, Spark发布包中的examples文件夹下包含几个例子 ([Scala](https://github.com/apache/spark/tree/master/examples/src/main/scala/org/apache/spark/examples), [Java](https://github.com/apache/spark/tree/master/examples/src/main/java/org/apache/spark/examples), [Python](https://github.com/apache/spark/tree/master/examples/src/main/python)). 你可以运行它们: ~~~ # For Scala and Java, use run-example: ./bin/run-example SparkPi # For Python examples, use spark-submit directly: ./bin/spark-submit examples/src/main/python/pi.py ~~~ 翻译自 [Quick Start](https://spark.apache.org/docs/latest/quick-start.html)
';

2 独立应用

最后更新于:2022-04-01 21:41:29

下面我们想说一下怎样使用Spark API编写一个独立的应用程序。 这里使用Scala (SBT构建工具)和Java举例。 (Python官方文档中有,译者未翻译) ~~~ /* SimpleApp.scala */ import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark.SparkConf object SimpleApp { def main(args: Array[String]) { val logFile = "YOUR_SPARK_HOME/README.md" // Should be some file on your system val conf = new SparkConf().setAppName("Simple Application") val sc = new SparkContext(conf) val logData = sc.textFile(logFile, 2).cache() val numAs = logData.filter(line => line.contains("a")).count() val numBs = logData.filter(line => line.contains("b")).count() println("Lines with a: %s, Lines with b: %s".format(numAs, numBs)) } } ~~~ 这个程序统计Spark README文件中包含字符`a`和`b`的行数。 注意你需要用你实际的Spark路径替换 YOUR_SPARK_HOME。 不像上面的Spark shell的例子, 我们初始化一个SparkContext 作为程序的一部分. 我们将一个SparkConf对象传给SparkContext的构造函数, 它包含了我们程序的信息。 我们的程序依赖Spark API,所以我们包含一个sbt配置文件:simple.sbt 指明Spark是一个依赖, 这个文件也增加了Spark依赖的仓库(repository): ~~~ name := "Simple Project" version := "1.0" scalaVersion := "2.10.4" libraryDependencies += "org.apache.spark" %% "spark-core" % "1.1.1" ~~~ 为了保证sbt工作正常,我们需要将SimpleApp.scala和simple.sbt放入典型的sbt项目布局的文件夹中。 如此一来我们将应用代码可以打包成一个jar文件, 然后使用spark-submit脚本来运行此程序。 ~~~ # Your directory layout should look like this $ find . . ./simple.sbt ./src ./src/main ./src/main/scala ./src/main/scala/SimpleApp.scala # Package a jar containing your application $ sbt package ... [info] Packaging {..}/{..}/target/scala-2.10/simple-project_2.10-1.0.jar # Use spark-submit to run your application $ YOUR_SPARK_HOME/bin/spark-submit \ --class "SimpleApp" \ --master local[4] \ target/scala-2.10/simple-project_2.10-1.0.jar ... Lines with a: 46, Lines with b: 23 ~~~ 或者使用Java ~~~ /* SimpleApp.java */ import org.apache.spark.api.java.*; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.Function; public class SimpleApp { public static void main(String[] args) { String logFile = "YOUR_SPARK_HOME/README.md"; // Should be some file on your system SparkConf conf = new SparkConf().setAppName("Simple Application"); JavaSparkContext sc = new JavaSparkContext(conf); JavaRDD logData = sc.textFile(logFile).cache(); long numAs = logData.filter(new Function() { public Boolean call(String s) { return s.contains("a"); } }).count(); long numBs = logData.filter(new Function() { public Boolean call(String s) { return s.contains("b"); } }).count(); System.out.println("Lines with a: " + numAs + ", lines with b: " + numBs); } } ~~~ 这个程序统计Spark README文件中包含字符`a`和`b`的行数。. 注意你需要用你实际的Spark路径替换 YOUR_SPARK_HOME。 不像上面的Spark shell的例子, 我们需要一个JavaSparkContext对象. 我们也创建了RDD (JavaRDD)然后运行transformations. 最后我们传递给Spark一个function对象, 这个function对象是一个匿名类,继承于 spark.api.java.function.Function. Spark开发指南描述了细节. (译者注: 这是Java 7的语法, 通过Java 8 Lambda表达式,上面的代码和scala一样的简化) 为了编译此程序,我们需要写一个Maven pom.xml文件, 增加Spark作为依赖. 注意Spark artifact带有Scala的版本. ~~~ project> groupId>edu.berkeleygroupId> artifactId>simple-projectartifactId> modelVersion>4.0.0modelVersion> name>Simple Projectname> packaging>jarpackaging> version>1.0version> dependencies> dependency> groupId>org.apache.sparkgroupId> artifactId>spark-core_2.10artifactId> version>1.1.1version> dependency> dependencies> project> ~~~ 使用Maven项目的布局: ~~~ $ find . ./pom.xml ./src ./src/main ./src/main/java ./src/main/java/SimpleApp.java ~~~ 现在,我们使用Maven打包并使用./bin/spark-submit执行此程序. ~~~ # Package a jar containing your application $ mvn package ... [INFO] Building jar: {..}/{..}/target/simple-project-1.0.jar # Use spark-submit to run your application $ YOUR_SPARK_HOME/bin/spark-submit \ --class "SimpleApp" \ --master local[4] \ target/simple-project-1.0.jar ... Lines with a: 46, Lines with b: 23 ~~~
';

1.3 缓存

最后更新于:2022-04-01 21:41:26

Spark也支持将数据集放入集群的内存中缓存起来. 当数据重复访问时特别有用, 比如查询一个小的 “hot”数据集或者运行一个交互式算法PageRank. 看一个简单的例子, 我们把上面的linesWithSpark数据集缓存起来: ~~~ scala> linesWithSpark.cache() res7: spark.RDD[String] = spark.FilteredRDD@17e51082 scala> linesWithSpark.count() res8: Long = 15 scala> linesWithSpark.count() res9: Long = 15 ~~~ 当然使用Spark缓存一个100行的文本文件看起来有些傻,我们只是做个示范。 你可以将它用在非常大的数据集上,即使它们可能横跨几十甚至上百个节点。你也可以使用bin/spark-shell交互式实现此功能, 就像开发指南中描述的那样。
';

1.2 更多的RDD操作

最后更新于:2022-04-01 21:41:24

RDD的transformation和action可以组成起来完成复杂的计算。 比如查找包含最多单词的一行: ~~~ scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b) res4: Long = 15 ~~~ 第一步map一行包含的单词数到一个整数, 第二步调用reduce得到最大的单词数。map和reduce的参数都是lambda表达式(closures), 可以调用 Scala/Java库. 例如我们很容易的调用在其它地方声明的方法。 这里我们使用`Math.max()`函数简化代码: ~~~ scala> import java.lang.Math import java.lang.Math scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b)) res5: Int = 15 ~~~ 一个通用的数据流模式就是MapReduce,在Hadoop中相当流行. Spark实现MapReduce流很容易: ~~~ scala> val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b) wordCounts: spark.RDD[(String, Int)] = spark.ShuffledAggregatedRDD@71f027b8 ~~~ 此处我们使用flatMap, map 和 reduceByKey转换来计算文件中每个单词的频度。 为了收集单词频度结果,我们可以调用collect action: ~~~ scala> wordCounts.collect() res6: Array[(String, Int)] = Array((means,1), (under,2), (this,3), (Because,1), (Python,2), (agree,1), (cluster.,1), ...) ~~~
';

1.1 基本操作

最后更新于:2022-04-01 21:41:22

Spark shell提供了一个简单方式去学习API,它也是一个交互式分析数据的强大工具。 你既可以使用Scala(运行在JVM之上,所以可以使用众多的Java库),也可以使用Python。运行Spark文件夹下的的命令: ~~~ ./bin/spark-shell ~~~ Spark最主要的一个抽象出来的概念就是分布式的数据集合, 也就是弹性分布式数据集Resilient Distributed Dataset (RDD). RDD可以从Hadoop InputFormats (比如HDFS文件)创建, 也可以通过其它RDD转换(transforming)得到。 让我们从Spark源代码文件夹下的README文件创建一个RDD: ~~~ scala> val textFile = sc.textFile("README.md") textFile: spark.RDD[String] = spark.MappedRDD@2ee9b6e3 ~~~ RDD包含[action](https://spark.apache.org/docs/latest/programming-guide.html#actions),可以返回数据, 也包含[transformation](https://spark.apache.org/docs/latest/programming-guide.html#transformations),返回新的RDD的指针。 先看一些action的例子: ~~~ scala> textFile.count() // 此RDD中的item的数量 res0: Long = 126 scala> textFile.first() // 此RDD第一个item res1: String = # Apache Spark ~~~ 现在再看一个转换的例子。我们使用`filter`返回一个新的RDD, 新的RDD是文件中item的一个子集。 ~~~ scala> val linesWithSpark = textFile.filter(line => line.contains("Spark")) linesWithSpark: spark.RDD[String] = spark.FilteredRDD@7dd4af09 ~~~ 将transformation和action串起来: ~~~ scala> textFile.filter(line => line.contains("Spark")).count() // How many lines contain "Spark"? res3: Long = 15 ~~~
';

1 使用Spark进行交互式分析

最后更新于:2022-04-01 21:41:20

';

0 关于

最后更新于:2022-04-01 21:41:17

> 本文转自:[鸟窝](http://colobu.com/2014/12/08/spark-quick-start/) > 本教程翻译时的Spark版本为1.1.1 本教程快速介绍了Spark的使用。 首先我们介绍了通过Spark 交互式shell调用API( Python或者scala代码),然后演示如何使用Java, Scala或者Python编写独立程序。 你可以查看[Spark编程指南](http://colobu.com/2014/12/08/spark-programming-guide)了解完整的参考。 开始下面的快速入门之前,首先需要到[Spark网站](https://spark.apache.org/)下载一份打包好的spark。 既然本教程中我们不使用HDFS,你可以随便下载一个适配任何Hadoop的版本的Spark。
';