1.2 更多的RDD操作
最后更新于:2022-04-01 21:41:24
RDD的transformation和action可以组成起来完成复杂的计算。 比如查找包含最多单词的一行:
~~~
scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
res4: Long = 15
~~~
第一步map一行包含的单词数到一个整数, 第二步调用reduce得到最大的单词数。map和reduce的参数都是lambda表达式(closures), 可以调用 Scala/Java库. 例如我们很容易的调用在其它地方声明的方法。 这里我们使用`Math.max()`函数简化代码:
~~~
scala> import java.lang.Math
import java.lang.Math
scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))
res5: Int = 15
~~~
一个通用的数据流模式就是MapReduce,在Hadoop中相当流行. Spark实现MapReduce流很容易:
~~~
scala> val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)
wordCounts: spark.RDD[(String, Int)] = spark.ShuffledAggregatedRDD@71f027b8
~~~
此处我们使用flatMap, map 和 reduceByKey转换来计算文件中每个单词的频度。 为了收集单词频度结果,我们可以调用collect action:
~~~
scala> wordCounts.collect()
res6: Array[(String, Int)] = Array((means,1), (under,2), (this,3), (Because,1), (Python,2), (agree,1), (cluster.,1), ...)
~~~
';