Spark配置
最后更新于:2022-04-01 22:20:16
# Spark配置
Spark提供三个位置用来配置系统:
- Spark properties控制大部分的应用程序参数,可以用SparkConf对象或者java系统属性设置
- Environment variables可以通过每个节点的`conf/spark-env.sh`脚本设置每台机器的设置。例如IP地址
- Logging可以通过log4j.properties配置
### Spark属性
Spark属性控制大部分的应用程序设置,并且为每个应用程序分别配置它。这些属性可以直接在[SparkConf](http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.SparkConf)上配置,然后传递给`SparkContext`。`SparkConf`允许你配置一些通用的属性(如master URL、应用程序明)以及通过`set()`方法设置的任意键值对。例如,我们可以用如下方式创建一个拥有两个线程的应用程序。注意,我们用`local[2]`运行,这意味着两个线程-表示最小的并行度,它可以帮助我们检测当在分布式环境下运行的时才出现的错误。
~~~
val conf = new SparkConf()
.setMaster("local[2]")
.setAppName("CountingSheep")
.set("spark.executor.memory", "1g")
val sc = new SparkContext(conf)
~~~
注意,我们在本地模式中拥有超过1个线程。和Spark Streaming的情况一样,我们可能需要一个线程防止任何形式的饥饿问题。
### 动态加载Spark属性
在一些情况下,你可能想在`SparkConf`中避免硬编码确定的配置。例如,你想用不同的master或者不同的内存数运行相同的应用程序。Spark允许你简单地创建一个空conf。
~~~
val sc = new SparkContext(new SparkConf())
~~~
然后你在运行时提供值。
~~~
./bin/spark-submit --name "My app" --master local[4] --conf spark.shuffle.spill=false
--conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps" myApp.jar
~~~
Spark shell和`spark-submit`工具支持两种方式动态加载配置。第一种方式是命令行选项,例如`--master`,如上面shell显示的那样。`spark-submit`可以接受任何Spark属性,用`--conf`标记表示。但是那些参与Spark应用程序启动的属性要用特定的标记表示。运行`./bin/spark-submit --help`将会显示选项的整个列表。
`bin/spark-submit`也会从`conf/spark-defaults.conf`中读取配置选项,这个配置文件中,每一行都包含一对以空格分开的键和值。例如:
~~~
spark.master spark://5.6.7.8:7077
spark.executor.memory 512m
spark.eventLog.enabled true
spark.serializer org.apache.spark.serializer.KryoSerializer
~~~
任何标签(flags)指定的值或者在配置文件中的值将会传递给应用程序,并且通过`SparkConf`合并这些值。在`SparkConf`上设置的属性具有最高的优先级,其次是传递给`spark-submit`或者`spark-shell`的属性值,最后是`spark-defaults.conf`文件中的属性值。
### 查看Spark属性
在`http://:4040`上的应用程序web UI在“Environment”标签中列出了所有的Spark属性。这对你确保设置的属性的正确性是很有用的。注意,只有通过spark-defaults.conf, SparkConf以及命令行直接指定的值才会显示。对于其它的配置属性,你可以认为程序用到了默认的值。
### 可用的属性
控制内部设置的大部分属性都有合理的默认值,一些最通用的选项设置如下:
#### 应用程序属性
| Property Name | Default | Meaning |
|-----|-----|-----|
| spark.app.name | (none) | 你的应用程序的名字。这将在UI和日志数据中出现 |
| spark.master | (none) | 集群管理器连接的地方 |
| spark.executor.memory | 512m | 每个executor进程使用的内存数。和JVM内存串拥有相同的格式(如512m,2g) |
| spark.driver.memory | 512m | driver进程使用的内存数 |
| spark.driver.maxResultSize | 1g | 每个Spark action(如collect)所有分区的序列化结果的总大小限制。设置的值应该不小于1m,0代表没有限制。如果总大小超过这个限制,工作将会终止。大的限制值可能导致driver出现内存溢出错误(依赖于spark.driver.memory和JVM中对象的内存消耗)。设置合理的限制,可以避免出现内存溢出错误。 |
| spark.serializer | org.apache.spark.serializer.JavaSerializer | 序列化对象使用的类。默认的java序列化类可以序列化任何可序列化的java对象但是它很慢。所有我们建议用[org.apache.spark.serializer.KryoSerializer](http://spark.apache.org/docs/latest/tuning.html) |
| spark.kryo.classesToRegister | (none) | 如果你用Kryo序列化,给定的用逗号分隔的自定义类名列表表示要注册的类 |
| spark.kryo.registrator | (none) | 如果你用Kryo序列化,设置这个类去注册你的自定义类。如果你需要用自定义的方式注册你的类,那么这个属性是有用的。否则`spark.kryo.classesToRegister`会更简单。它应该设置一个继承自[KryoRegistrator](http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.serializer.KryoRegistrator)的类 |
| spark.local.dir | /tmp | Spark中暂存空间的使用目录。在Spark1.0以及更高的版本中,这个属性被SPARK_LOCAL_DIRS(Standalone, Mesos)和LOCAL_DIRS(YARN)环境变量覆盖。 |
| spark.logConf | false | 当SparkContext启动时,将有效的SparkConf记录为INFO。 |
#### 运行环境
| Property Name | Default | Meaning |
|-----|-----|-----|
| spark.executor.extraJavaOptions | (none) | 传递给executors的JVM选项字符串。例如GC设置或者其它日志设置。注意,在这个选项中设置Spark属性或者堆大小是不合法的。Spark属性需要用SparkConf对象或者`spark-submit`脚本用到的`spark-defaults.conf`文件设置。堆内存可以通过`spark.executor.memory`设置 |
| spark.executor.extraClassPath | (none) | 附加到executors的classpath的额外的classpath实体。这个设置存在的主要目的是Spark与旧版本的向后兼容问题。用户一般不用设置这个选项 |
| spark.executor.extraLibraryPath | (none) | 指定启动executor的JVM时用到的库路径 |
| spark.executor.logs.rolling.strategy | (none) | 设置executor日志的滚动(rolling)策略。默认情况下没有开启。可以配置为`time`(基于时间的滚动)和`size`(基于大小的滚动)。对于`time`,用`spark.executor.logs.rolling.time.interval`设置滚动间隔;对于`size`,用`spark.executor.logs.rolling.size.maxBytes`设置最大的滚动大小 |
| spark.executor.logs.rolling.time.interval | daily | executor日志滚动的时间间隔。默认情况下没有开启。合法的值是`daily`, `hourly`, `minutely`以及任意的秒。 |
| spark.executor.logs.rolling.size.maxBytes | (none) | executor日志的最大滚动大小。默认情况下没有开启。值设置为字节 |
| spark.executor.logs.rolling.maxRetainedFiles | (none) | 设置被系统保留的最近滚动日志文件的数量。更老的日志文件将被删除。默认没有开启。 |
| spark.files.userClassPathFirst | false | (实验性)当在Executors中加载类时,是否用户添加的jar比Spark自己的jar优先级高。这个属性可以降低Spark依赖和用户依赖的冲突。它现在还是一个实验性的特征。 |
| spark.python.worker.memory | 512m | 在聚合期间,每个python worker进程使用的内存数。在聚合期间,如果内存超过了这个限制,它将会将数据塞进磁盘中 |
| spark.python.profile | false | 在Python worker中开启profiling。通过`sc.show_profiles()`展示分析结果。或者在driver退出前展示分析结果。可以通过`sc.dump_profiles(path)`将结果dump到磁盘中。如果一些分析结果已经手动展示,那么在driver退出前,它们再不会自动展示 |
| spark.python.profile.dump | (none) | driver退出前保存分析结果的dump文件的目录。每个RDD都会分别dump一个文件。可以通过`ptats.Stats()`加载这些文件。如果指定了这个属性,分析结果不会自动展示 |
| spark.python.worker.reuse | true | 是否重用python worker。如果是,它将使用固定数量的Python workers,而不需要为每个任务fork()一个Python进程。如果有一个非常大的广播,这个设置将非常有用。因为,广播不需要为每个任务从JVM到Python worker传递一次 |
| spark.executorEnv.[EnvironmentVariableName] | (none) | 通过`EnvironmentVariableName`添加指定的环境变量到executor进程。用户可以指定多个`EnvironmentVariableName`,设置多个环境变量 |
| spark.mesos.executor.home | driver side SPARK_HOME | 设置安装在Mesos的executor上的Spark的目录。默认情况下,executors将使用driver的Spark本地(home)目录,这个目录对它们不可见。注意,如果没有通过`spark.executor.uri`指定Spark的二进制包,这个设置才起作用 |
| spark.mesos.executor.memoryOverhead | executor memory * 0.07, 最小384m | 这个值是`spark.executor.memory`的补充。它用来计算mesos任务的总内存。另外,有一个7%的硬编码设置。最后的值将选择`spark.mesos.executor.memoryOverhead`或者`spark.executor.memory`的7%二者之间的大者 |
#### Shuffle行为(Behavior)
| Property Name | Default | Meaning |
|-----|-----|-----|
| spark.shuffle.consolidateFiles | false | 如果设置为"true",在shuffle期间,合并的中间文件将会被创建。创建更少的文件可以提供文件系统的shuffle的效率。这些shuffle都伴随着大量递归任务。当用ext4和dfs文件系统时,推荐设置为"true"。在ext3中,因为文件系统的限制,这个选项可能机器(大于8核)降低效率 |
| spark.shuffle.spill | true | 如果设置为"true",通过将多出的数据写入磁盘来限制内存数。通过`spark.shuffle.memoryFraction`来指定spilling的阈值 |
| spark.shuffle.spill.compress | true | 在shuffle时,是否将spilling的数据压缩。压缩算法通过`spark.io.compression.codec`指定。 |
| spark.shuffle.memoryFraction | 0.2 | 如果`spark.shuffle.spill`为“true”,shuffle中聚合和合并组操作使用的java堆内存占总内存的比重。在任何时候,shuffles使用的所有内存内maps的集合大小都受这个限制的约束。超过这个限制,spilling数据将会保存到磁盘上。如果spilling太过频繁,考虑增大这个值 |
| spark.shuffle.compress | true | 是否压缩map操作的输出文件。一般情况下,这是一个好的选择。 |
| spark.shuffle.file.buffer.kb | 32 | 每个shuffle文件输出流内存内缓存的大小,单位是kb。这个缓存减少了创建只中间shuffle文件中磁盘搜索和系统访问的数量 |
| spark.reducer.maxMbInFlight | 48 | 从递归任务中同时获取的map输出数据的最大大小(mb)。因为每一个输出都需要我们创建一个缓存用来接收,这个设置代表每个任务固定的内存上限,所以除非你有更大的内存,将其设置小一点 |
| spark.shuffle.manager | sort | 它的实现用于shuffle数据。有两种可用的实现:`sort`和`hash`。基于sort的shuffle有更高的内存使用率 |
| spark.shuffle.sort.bypassMergeThreshold | 200 | (Advanced) In the sort-based shuffle manager, avoid merge-sorting data if there is no map-side aggregation and there are at most this many reduce partitions |
| spark.shuffle.blockTransferService | netty | 实现用来在executor直接传递shuffle和缓存块。有两种可用的实现:`netty`和`nio`。基于netty的块传递在具有相同的效率情况下更简单 |
#### Spark UI
| Property Name | Default | Meaning |
|-----|-----|-----|
| spark.ui.port | 4040 | 你的应用程序dashboard的端口。显示内存和工作量数据 |
| spark.ui.retainedStages | 1000 | 在垃圾回收之前,Spark UI和状态API记住的stage数 |
| spark.ui.retainedJobs | 1000 | 在垃圾回收之前,Spark UI和状态API记住的job数 |
| spark.ui.killEnabled | true | 运行在web UI中杀死stage和相应的job |
| spark.eventLog.enabled | false | 是否记录Spark的事件日志。这在应用程序完成后,重新构造web UI是有用的 |
| spark.eventLog.compress | false | 是否压缩事件日志。需要`spark.eventLog.enabled`为true |
| spark.eventLog.dir | file:///tmp/spark-events | Spark事件日志记录的基本目录。在这个基本目录下,Spark为每个应用程序创建一个子目录。各个应用程序记录日志到直到的目录。用户可能想设置这为统一的地点,像HDFS一样,所以历史文件可以通过历史服务器读取 |
#### 压缩和序列化
| Property Name | Default | Meaning |
|-----|-----|-----|
| spark.broadcast.compress | true | 在发送广播变量之前是否压缩它 |
| spark.rdd.compress | true | 是否压缩序列化的RDD分区。在花费一些额外的CPU时间的同时节省大量的空间 |
| spark.io.compression.codec | snappy | 压缩诸如RDD分区、广播变量、shuffle输出等内部数据的编码解码器。默认情况下,Spark提供了三种选择:lz4, lzf和snappy。你也可以用完整的类名来制定。`org.apache.spark.io.LZ4CompressionCodec`,`org.apache.spark.io.LZFCompressionCodec`,`org.apache.spark.io.SnappyCompressionCodec` |
| spark.io.compression.snappy.block.size | 32768 | Snappy压缩中用到的块大小。降低这个块的大小也会降低shuffle内存使用率 |
| spark.io.compression.lz4.block.size | 32768 | LZ4压缩中用到的块大小。降低这个块的大小也会降低shuffle内存使用率 |
| spark.closure.serializer | org.apache.spark.serializer.JavaSerializer | 闭包用到的序列化类。目前只支持java序列化器 |
| spark.serializer.objectStreamReset | 100 | 当用`org.apache.spark.serializer.JavaSerializer`序列化时,序列化器通过缓存对象防止写多余的数据,然而这会造成这些对象的垃圾回收停止。通过请求'reset',你从序列化器中flush这些信息并允许收集老的数据。为了关闭这个周期性的reset,你可以将值设为-1。默认情况下,每一百个对象reset一次 |
| spark.kryo.referenceTracking | true | 当用Kryo序列化时,跟踪是否引用同一对象。如果你的对象图有环,这是必须的设置。如果他们包含相同对象的多个副本,这个设置对效率是有用的。如果你知道不在这两个场景,那么可以禁用它以提高效率 |
| spark.kryo.registrationRequired | false | 是否需要注册为Kyro可用。如果设置为true,然后如果一个没有注册的类序列化,Kyro会抛出异常。如果设置为false,Kryo将会同时写每个对象和其非注册类名。写类名可能造成显著地性能瓶颈。 |
| spark.kryoserializer.buffer.mb | 0.064 | Kyro序列化缓存的大小。这样worker上的每个核都有一个缓存。如果有需要,缓存会涨到`spark.kryoserializer.buffer.max.mb`设置的值那么大。 |
| spark.kryoserializer.buffer.max.mb | 64 | Kryo序列化缓存允许的最大值。这个值必须大于你尝试序列化的对象 |
#### Networking
| Property Name | Default | Meaning |
|-----|-----|-----|
| spark.driver.host | (local hostname) | driver监听的主机名或者IP地址。这用于和executors以及独立的master通信 |
| spark.driver.port | (random) | driver监听的接口。这用于和executors以及独立的master通信 |
| spark.fileserver.port | (random) | driver的文件服务器监听的端口 |
| spark.broadcast.port | (random) | driver的HTTP广播服务器监听的端口 |
| spark.replClassServer.port | (random) | driver的HTTP类服务器监听的端口 |
| spark.blockManager.port | (random) | 块管理器监听的端口。这些同时存在于driver和executors |
| spark.executor.port | (random) | executor监听的端口。用于与driver通信 |
| spark.port.maxRetries | 16 | 当绑定到一个端口,在放弃前重试的最大次数 |
| spark.akka.frameSize | 10 | 在"control plane"通信中允许的最大消息大小。如果你的任务需要发送大的结果到driver中,调大这个值 |
| spark.akka.threads | 4 | 通信的actor线程数。当driver有很多CPU核时,调大它是有用的 |
| spark.akka.timeout | 100 | Spark节点之间的通信超时。单位是s |
| spark.akka.heartbeat.pauses | 6000 | This is set to a larger value to disable failure detector that comes inbuilt akka. It can be enabled again, if you plan to use this feature (Not recommended). Acceptable heart beat pause in seconds for akka. This can be used to control sensitivity to gc pauses. Tune this in combination of `spark.akka.heartbeat.interval` and `spark.akka.failure-detector.threshold` if you need to. |
| spark.akka.failure-detector.threshold | 300.0 | This is set to a larger value to disable failure detector that comes inbuilt akka. It can be enabled again, if you plan to use this feature (Not recommended). This maps to akka's `akka.remote.transport-failure-detector.threshold`. Tune this in combination of `spark.akka.heartbeat.pauses` and `spark.akka.heartbeat.interval` if you need to. |
| spark.akka.heartbeat.interval | 1000 | This is set to a larger value to disable failure detector that comes inbuilt akka. It can be enabled again, if you plan to use this feature (Not recommended). A larger interval value in seconds reduces network overhead and a smaller value ( ~ 1 s) might be more informative for akka's failure detector. Tune this in combination of `spark.akka.heartbeat.pauses` and `spark.akka.failure-detector.threshold` if you need to. Only positive use case for using failure detector can be, a sensistive failure detector can help evict rogue executors really quick. However this is usually not the case as gc pauses and network lags are expected in a real Spark cluster. Apart from that enabling this leads to a lot of exchanges of heart beats between nodes leading to flooding the network with those. |
#### Security
| Property Name | Default | Meaning |
|-----|-----|-----|
| spark.authenticate | false | 是否Spark验证其内部连接。如果不是运行在YARN上,请看`spark.authenticate.secret` |
| spark.authenticate.secret | None | 设置Spark两个组件之间的密匙验证。如果不是运行在YARN上,但是需要验证,这个选项必须设置 |
| spark.core.connection.auth.wait.timeout | 30 | 连接时等待验证的实际。单位为秒 |
| spark.core.connection.ack.wait.timeout | 60 | 连接等待回答的时间。单位为秒。为了避免不希望的超时,你可以设置更大的值 |
| spark.ui.filters | None | 应用到Spark web UI的用于过滤类名的逗号分隔的列表。过滤器必须是标准的[javax servlet Filter](http://docs.oracle.com/javaee/6/api/javax/servlet/Filter.html)。通过设置java系统属性也可以指定每个过滤器的参数。`spark..params='param1=value1,param2=value2'`。例如`-Dspark.ui.filters=com.test.filter1`、`-Dspark.com.test.filter1.params='param1=foo,param2=testing'` |
| spark.acls.enable | false | 是否开启Spark acls。如果开启了,它检查用户是否有权限去查看或修改job。 Note this requires the user to be known, so if the user comes across as null no checks are done。UI利用使用过滤器验证和设置用户 |
| spark.ui.view.acls | empty | 逗号分隔的用户列表,列表中的用户有查看(view)Spark web UI的权限。默认情况下,只有启动Spark job的用户有查看权限 |
| spark.modify.acls | empty | 逗号分隔的用户列表,列表中的用户有修改Spark job的权限。默认情况下,只有启动Spark job的用户有修改权限 |
| spark.admin.acls | empty | 逗号分隔的用户或者管理员列表,列表中的用户或管理员有查看和修改所有Spark job的权限。如果你运行在一个共享集群,有一组管理员或开发者帮助debug,这个选项有用 |
#### Spark Streaming
| Property Name | Default | Meaning |
|-----|-----|-----|
| spark.streaming.blockInterval | 200 | 在这个时间间隔(ms)内,通过Spark Streaming receivers接收的数据在保存到Spark之前,chunk为数据块。推荐的最小值为50ms |
| spark.streaming.receiver.maxRate | infinite | 每秒钟每个receiver将接收的数据的最大记录数。有效的情况下,每个流将消耗至少这个数目的记录。设置这个配置为0或者-1将会不作限制 |
| spark.streaming.receiver.writeAheadLogs.enable | false | Enable write ahead logs for receivers. All the input data received through receivers will be saved to write ahead logs that will allow it to be recovered after driver failures |
| spark.streaming.unpersist | true | 强制通过Spark Streaming生成并持久化的RDD自动从Spark内存中非持久化。通过Spark Streaming接收的原始输入数据也将清除。设置这个属性为false允许流应用程序访问原始数据和持久化RDD,因为它们没有被自动清除。但是它会造成更高的内存花费 |
### 环境变量
通过环境变量配置确定的Spark设置。环境变量从Spark安装目录下的`conf/spark-env.sh`脚本读取(或者windows的`conf/spark-env.cmd`)。在独立的或者Mesos模式下,这个文件可以给机器确定的信息,如主机名。当运行本地应用程序或者提交脚本时,它也起作用。
注意,当Spark安装时,`conf/spark-env.sh`默认是不存在的。你可以复制`conf/spark-env.sh.template`创建它。
可以在`spark-env.sh`中设置如下变量:
| Environment Variable | Meaning |
|-----|-----|
| JAVA_HOME | java安装的路径 |
| PYSPARK_PYTHON | PySpark用到的Python二进制执行文件路径 |
| SPARK_LOCAL_IP | 机器绑定的IP地址 |
| SPARK_PUBLIC_DNS | 你Spark应用程序通知给其他机器的主机名 |
除了以上这些,Spark [standalone cluster scripts](http://spark.apache.org/docs/latest/spark-standalone.html#cluster-launch-scripts)也可以设置一些选项。例如每台机器使用的核数以及最大内存。
因为`spark-env.sh`是shell脚本,其中的一些可以以编程方式设置。例如,你可以通过特定的网络接口计算`SPARK_LOCAL_IP`。
### 配置Logging
Spark用[log4j](http://logging.apache.org/log4j/) logging。你可以通过在conf目录下添加`log4j.properties`文件来配置。一种方法是复制`log4j.properties.template`文件。
';
在yarn上运行Spark
最后更新于:2022-04-01 22:20:13
# 在YARN上运行Spark
### 配置
大部分为`Spark on YARN`模式提供的配置与其它部署模式提供的配置相同。下面这些是为`Spark on YARN`模式提供的配置。
### Spark属性
| Property Name | Default | Meaning |
|-----|-----|-----|
| spark.yarn.applicationMaster.waitTries | 10 | ApplicationMaster等待Spark master的次数以及SparkContext初始化尝试的次数 |
| spark.yarn.submit.file.replication | HDFS默认的复制次数(3) | 上传到HDFS的文件的HDFS复制水平。这些文件包括Spark jar、app jar以及任何分布式缓存文件/档案 |
| spark.yarn.preserve.staging.files | false | 设置为true,则在作业结束时保留阶段性文件(Spark jar、app jar以及任何分布式缓存文件)而不是删除它们 |
| spark.yarn.scheduler.heartbeat.interval-ms | 5000 | Spark application master给YARN ResourceManager发送心跳的时间间隔(ms) |
| spark.yarn.max.executor.failures | numExecutors * 2,最小为3 | 失败应用程序之前最大的执行失败数 |
| spark.yarn.historyServer.address | (none) | Spark历史服务器(如host.com:18080)的地址。这个地址不应该包含一个模式([http://)。默认情况下没有设置值,这是因为该选项是一个可选选项。当Spark应用程序完成从ResourceManager](http://)。默认情况下没有设置值,这是因为该选项是一个可选选项。当Spark应用程序完成从ResourceManager) UI到Spark历史服务器UI的连接时,这个地址从YARN ResourceManager得到 |
| spark.yarn.dist.archives | (none) | 提取逗号分隔的档案列表到每个执行器的工作目录 |
| spark.yarn.dist.files | (none) | 放置逗号分隔的文件列表到每个执行器的工作目录 |
| spark.yarn.executor.memoryOverhead | executorMemory * 0.07,最小384 | 分配给每个执行器的堆内存大小(以MB为单位)。它是VM开销、interned字符串或者其它本地开销占用的内存。这往往随着执行器大小而增长。(典型情况下是6%-10%) |
| spark.yarn.driver.memoryOverhead | driverMemory * 0.07,最小384 | 分配给每个driver的堆内存大小(以MB为单位)。它是VM开销、interned字符串或者其它本地开销占用的内存。这往往随着执行器大小而增长。(典型情况下是6%-10%) |
| spark.yarn.queue | default | 应用程序被提交到的YARN队列的名称 |
| spark.yarn.jar | (none) | Spark jar文件的位置,覆盖默认的位置。默认情况下,Spark on YARN将会用到本地安装的Spark jar。但是Spark jar也可以HDFS中的一个公共位置。这允许YARN缓存它到节点上,而不用在每次运行应用程序时都需要分配。指向HDFS中的jar包,可以这个参数为"hdfs:///some/path" |
| spark.yarn.access.namenodes | (none) | 你的Spark应用程序访问的HDFS namenode列表。例如,`spark.yarn.access.namenodes=hdfs://nn1.com:8032,hdfs://nn2.com:8032`,Spark应用程序必须访问namenode列表,Kerberos必须正确配置来访问它们。Spark获得namenode的安全令牌,这样Spark应用程序就能够访问这些远程的HDFS集群。 |
| spark.yarn.containerLauncherMaxThreads | 25 | 为了启动执行者容器,应用程序master用到的最大线程数 |
| spark.yarn.appMasterEnv.[EnvironmentVariableName] | (none) | 添加通过`EnvironmentVariableName`指定的环境变量到Application Master处理YARN上的启动。用户可以指定多个该设置,从而设置多个环境变量。在yarn-cluster模式下,这控制Spark driver的环境。在yarn-client模式下,这仅仅控制执行器启动者的环境。 |
### 在YARN上启动Spark
确保`HADOOP_CONF_DIR`或`YARN_CONF_DIR`指向的目录包含Hadoop集群的(客户端)配置文件。这些配置用于写数据到dfs和连接到YARN ResourceManager。
有两种部署模式可以用来在YARN上启动Spark应用程序。在yarn-cluster模式下,Spark driver运行在application master进程中,这个进程被集群中的YARN所管理,客户端会在初始化应用程序之后关闭。在yarn-client模式下,driver运行在客户端进程中,application master仅仅用来向YARN请求资源。
和Spark单独模式以及Mesos模式不同,在这些模式中,master的地址由"master"参数指定,而在YARN模式下,ResourceManager的地址从Hadoop配置得到。因此master参数是简单的`yarn-client`和`yarn-cluster`。
在yarn-cluster模式下启动Spark应用程序。
~~~
./bin/spark-submit --class path.to.your.Class --master yarn-cluster [options] [app options]
~~~
例子:
~~~
$ ./bin/spark-submit --class org.apache.spark.examples.SparkPi \
--master yarn-cluster \
--num-executors 3 \
--driver-memory 4g \
--executor-memory 2g \
--executor-cores 1 \
--queue thequeue \
lib/spark-examples*.jar \
10
~~~
以上启动了一个YARN客户端程序用来启动默认的 Application Master,然后SparkPi会作为Application Master的子线程运行。客户端会定期的轮询Application Master用于状态更新并将更新显示在控制台上。一旦你的应用程序运行完毕,客户端就会退出。
在yarn-client模式下启动Spark应用程序,运行下面的shell脚本
~~~
$ ./bin/spark-shell --master yarn-client
~~~
### 添加其它的jar
在yarn-cluster模式下,driver运行在不同的机器上,所以离开了保存在本地客户端的文件,`SparkContext.addJar`将不会工作。为了使`SparkContext.addJar`用到保存在客户端的文件,在启动命令中加上`--jars`选项。
~~~
$ ./bin/spark-submit --class my.main.Class \
--master yarn-cluster \
--jars my-other-jar.jar,my-other-other-jar.jar
my-main-jar.jar
app_arg1 app_arg2
~~~
### 注意事项
- 在Hadoop 2.2之前,YARN不支持容器核的资源请求。因此,当运行早期的版本时,通过命令行参数指定的核的数量无法传递给YARN。在调度决策中,核请求是否兑现取决于用哪个调度器以及如何配置调度器。
- Spark executors使用的本地目录将会是YARN配置(yarn.nodemanager.local-dirs)的本地目录。如果用户指定了`spark.local.dir`,它将被忽略。
- `--files`和`--archives`选项支持指定带 * # * 号文件名。例如,你能够指定`--files localtest.txt#appSees.txt`,它上传你在本地命名为`localtest.txt`的文件到HDFS,但是将会链接为名称`appSees.txt`。当你的应用程序运行在YARN上时,你应该使用`appSees.txt`去引用该文件。
- 如果你在yarn-cluster模式下运行`SparkContext.addJar`,并且用到了本地文件, `--jars`选项允许`SparkContext.addJar`函数能够工作。如果你正在使用 HDFS, HTTP, HTTPS或FTP,你不需要用到该选项
';
独立运行Spark
最后更新于:2022-04-01 22:20:11
# Spark独立部署模式
### 安装Spark独立模式集群
安装Spark独立模式,你只需要将Spark的编译版本简单的放到集群的每个节点。你可以获得每个稳定版本的预编译版本,也可以自己编译。
### 手动启动集群
你能够通过下面的方式启动独立的master服务器。
~~~
./sbin/start-master.sh
~~~
一旦启动,master将会为自己打印出`spark://HOST:PORT` URL,你能够用它连接到workers或者作为"master"参数传递给`SparkContext`。你也可以在master web UI上发现这个URL,master web UI默认的地址是`http://localhost:8080`。
相同的,你也可以启动一个或者多个workers或者将它们连接到master。
~~~
./bin/spark-class org.apache.spark.deploy.worker.Worker spark://IP:PORT
~~~
一旦你启动了一个worker,查看master web UI。你可以看到新的节点列表以及节点的CPU数以及内存。
下面的配置参数可以传递给master和worker。
| Argument | Meaning |
|-----|-----|
| -h HOST, --host HOST | 监听的主机名 |
| -i HOST, --ip HOST | 同上,已经被淘汰 |
| -p PORT, --port PORT | 监听的服务的端口(master默认是7077,worker随机) |
| --webui-port PORT | web UI的端口(master默认是8080,worker默认是8081) |
| -c CORES, --cores CORES | Spark应用程序可以使用的CPU核数(默认是所有可用);这个选项仅在worker上可用 |
| -m MEM, --memory MEM | Spark应用程序可以使用的内存数(默认情况是你的机器内存数减去1g);这个选项仅在worker上可用 |
| -d DIR, --work-dir DIR | 用于暂存空间和工作输出日志的目录(默认是SPARK_HOME/work);这个选项仅在worker上可用 |
| --properties-file FILE | 自定义的Spark配置文件的加载目录(默认是conf/spark-defaults.conf) |
### 集群启动脚本
为了用启动脚本启动Spark独立集群,你应该在你的Spark目录下建立一个名为`conf/slaves`的文件,这个文件必须包含所有你要启动的Spark worker所在机器的主机名,一行一个。如果`conf/slaves`不存在,启动脚本默认为单个机器(localhost),这台机器对于测试是有用的。注意,master机器通过ssh访问所有的worker。在默认情况下,SSH是并行运行,需要设置无密码(采用私有密钥)的访问。如果你没有设置为无密码访问,你可以设置环境变量`SPARK_SSH_FOREGROUND`,为每个worker提供密码。
一旦你设置了这个文件,你就可以通过下面的shell脚本启动或者停止你的集群。
- sbin/start-master.sh:在机器上启动一个master实例
- sbin/start-slaves.sh:在每台机器上启动一个slave实例
- sbin/start-all.sh:同时启动一个master实例和所有slave实例
- sbin/stop-master.sh:停止master实例
- sbin/stop-slaves.sh:停止所有slave实例
- sbin/stop-all.sh:停止master实例和所有slave实例
注意,这些脚本必须在你的Spark master运行的机器上执行,而不是在你的本地机器上面。
你可以在`conf/spark-env.sh`中设置环境变量进一步配置集群。利用`conf/spark-env.sh.template`创建这个文件,然后将它复制到所有的worker机器上使设置有效。下面的设置可以起作用:
| Environment Variable | Meaning |
|-----|-----|
| SPARK_MASTER_IP | 绑定master到一个指定的ip地址 |
| SPARK_MASTER_PORT | 在不同的端口上启动master(默认是7077) |
| SPARK_MASTER_WEBUI_PORT | master web UI的端口(默认是8080) |
| SPARK_MASTER_OPTS | 应用到master的配置属性,格式是 "-Dx=y"(默认是none),查看下面的表格的选项以组成一个可能的列表 |
| SPARK_LOCAL_DIRS | Spark中暂存空间的目录。包括map的输出文件和存储在磁盘上的RDDs(including map output files and RDDs that get stored on disk)。这必须在一个快速的、你的系统的本地磁盘上。它可以是一个逗号分隔的列表,代表不同磁盘的多个目录 |
| SPARK_WORKER_CORES | Spark应用程序可以用到的核心数(默认是所有可用) |
| SPARK_WORKER_MEMORY | Spark应用程序用到的内存总数(默认是内存总数减去1G)。注意,每个应用程序个体的内存通过`spark.executor.memory`设置 |
| SPARK_WORKER_PORT | 在指定的端口上启动Spark worker(默认是随机) |
| SPARK_WORKER_WEBUI_PORT | worker UI的端口(默认是8081) |
| SPARK_WORKER_INSTANCES | 每台机器运行的worker实例数,默认是1。如果你有一台非常大的机器并且希望运行多个worker,你可以设置这个数大于1。如果你设置了这个环境变量,确保你也设置了`SPARK_WORKER_CORES`环境变量用于限制每个worker的核数或者每个worker尝试使用所有的核。 |
| SPARK_WORKER_DIR | Spark worker运行目录,该目录包括日志和暂存空间(默认是SPARK_HOME/work) |
| SPARK_WORKER_OPTS | 应用到worker的配置属性,格式是 "-Dx=y"(默认是none),查看下面表格的选项以组成一个可能的列表 |
| SPARK_DAEMON_MEMORY | 分配给Spark master和worker守护进程的内存(默认是512m) |
| SPARK_DAEMON_JAVA_OPTS | Spark master和worker守护进程的JVM选项,格式是"-Dx=y"(默认为none) |
| SPARK_PUBLIC_DNS | Spark master和worker公共的DNS名(默认是none) |
注意,启动脚本还不支持windows。为了在windows上启动Spark集群,需要手动启动master和workers。
`SPARK_MASTER_OPTS`支持一下的系统属性:
| Property Name | Default | Meaning |
|-----|-----|-----|
| spark.deploy.retainedApplications | 200 | 展示完成的应用程序的最大数目。老的应用程序会被删除以满足该限制 |
| spark.deploy.retainedDrivers | 200 | 展示完成的drivers的最大数目。老的应用程序会被删除以满足该限制 |
| spark.deploy.spreadOut | true | 这个选项控制独立的集群管理器是应该跨节点传递应用程序还是应努力将程序整合到尽可能少的节点上。在HDFS中,传递程序是数据本地化更好的选择,但是,对于计算密集型的负载,整合会更有效率。 |
| spark.deploy.defaultCores | (infinite) | 在Spark独立模式下,给应用程序的默认核数(如果没有设置`spark.cores.max`)。如果没有设置,应用程序总数获得所有可用的核,除非设置了`spark.cores.max`。在共享集群上设置较低的核数,可用防止用户默认抓住整个集群。 |
| spark.worker.timeout | 60 | 独立部署的master认为worker失败(没有收到心跳信息)的间隔时间。 |
`SPARK_WORKER_OPTS`支持的系统属性:
| Property Name | Default | Meaning |
|-----|-----|-----|
| spark.worker.cleanup.enabled | false | 周期性的清空worker/应用程序目录。注意,这仅仅影响独立部署模式。不管应用程序是否还在执行,用于程序目录都会被清空 |
| spark.worker.cleanup.interval | 1800 (30分) | 在本地机器上,worker清空老的应用程序工作目录的时间间隔 |
| spark.worker.cleanup.appDataTtl | 7 * 24 * 3600 (7天) | 每个worker中应用程序工作目录的保留时间。这个时间依赖于你可用磁盘空间的大小。应用程序日志和jar包上传到每个应用程序的工作目录。随着时间的推移,工作目录会很快的填满磁盘空间,特别是如果你运行的作业很频繁。 |
### 连接一个应用程序到集群中
为了在Spark集群中运行一个应用程序,简单地传递`spark://IP:PORT` URL到[SparkContext](http://spark.apache.org/docs/latest/programming-guide.html#initializing-spark)
为了在集群上运行一个交互式的Spark shell,运行一下命令:
~~~
./bin/spark-shell --master spark://IP:PORT
~~~
你也可以传递一个选项`--total-executor-cores `去控制spark-shell的核数。
### 启动Spark应用程序
[spark-submit脚本](#)支持最直接的提交一个Spark应用程序到集群。对于独立部署的集群,Spark目前支持两种部署模式。在`client`模式中,driver启动进程与客户端提交应用程序所在的进程是同一个进程。然而,在`cluster`模式中,driver在集群的某个worker进程中启动,只有客户端进程完成了提交任务,它不会等到应用程序完成就会退出。
如果你的应用程序通过Spark submit启动,你的应用程序jar包将会自动分发到所有的worker节点。对于你的应用程序依赖的其它jar包,你应该用`--jars`符号指定(如`--jars jar1,jar2`)。
另外,`cluster`模式支持自动的重启你的应用程序(如果程序一非零的退出码退出)。为了用这个特征,当启动应用程序时,你可以传递`--supervise`符号到`spark-submit`。如果你想杀死反复失败的应用,你可以通过如下的方式:
~~~
./bin/spark-class org.apache.spark.deploy.Client kill
~~~
你可以在独立部署的Master web UI(http://:8080)中找到driver ID。
### 资源调度
独立部署的集群模式仅仅支持简单的FIFO调度器。然而,为了允许多个并行的用户,你能够控制每个应用程序能用的最大资源数。在默认情况下,它将获得集群的所有核,这只有在某一时刻只允许一个应用程序才有意义。你可以通过`spark.cores.max`在[SparkConf](http://spark.apache.org/docs/latest/configuration.html#spark-properties)中设置核数。
~~~
val conf = new SparkConf()
.setMaster(...)
.setAppName(...)
.set("spark.cores.max", "10")
val sc = new SparkContext(conf)
~~~
另外,你可以在集群的master进程中配置`spark.deploy.defaultCores`来改变默认的值。在`conf/spark-env.sh`添加下面的行:
~~~
export SPARK_MASTER_OPTS="-Dspark.deploy.defaultCores="
~~~
这在用户没有配置最大核数的共享集群中是有用的。
### 高可用
默认情况下,独立的调度集群对worker失败是有弹性的(在Spark本身的范围内是有弹性的,对丢失的工作通过转移它到另外的worker来解决)。然而,调度器通过master去执行调度决定,这会造成单点故障:如果master死了,新的应用程序就无法创建。为了避免这个,我们有两个高可用的模式。
### 用ZooKeeper的备用master
利用ZooKeeper去支持领导选举以及一些状态存储,你能够在你的集群中启动多个master,这些master连接到同一个ZooKeeper实例上。一个被选为“领导”,其它的保持备用模式。如果当前的领导死了,另一个master将会被选中,恢复老master的状态,然后恢复调度。整个的恢复过程大概需要1到2分钟。注意,这个恢复时间仅仅会影响调度新的应用程序-运行在失败master中的应用程序不受影响。
#### 配置
为了开启这个恢复模式,你可以用下面的属性在`spark-env`中设置`SPARK_DAEMON_JAVA_OPTS`。
| System property | Meaning |
|-----|-----|
| spark.deploy.recoveryMode | 设置ZOOKEEPER去启动备用master模式(默认为none) |
| spark.deploy.zookeeper.url | zookeeper集群url(如192.168.1.100:2181,192.168.1.101:2181) |
| spark.deploy.zookeeper.dir | zookeeper保存恢复状态的目录(默认是/spark) |
可能的陷阱:如果你在集群中有多个masters,但是没有用zookeeper正确的配置这些masters,这些masters不会发现彼此,会认为它们都是leaders。这将会造成一个不健康的集群状态(因为所有的master都会独立的调度)。
#### 细节
zookeeper集群启动之后,开启高可用是简单的。在相同的zookeeper配置(zookeeper URL和目录)下,在不同的节点上简单地启动多个master进程。master可以随时添加和删除。
为了调度新的应用程序或者添加worker到集群,它需要知道当前leader的IP地址。这可以通过简单的传递一个master列表来完成。例如,你可能启动你的SparkContext指向`spark://host1:port1,host2:port2`。这将造成你的SparkContext同时注册这两个master-如果`host1`死了,这个配置文件将一直是正确的,因为我们将找到新的leader-`host2`。
"registering with a Master"和正常操作之间有重要的区别。当启动时,一个应用程序或者worker需要能够发现和注册当前的leader master。一旦它成功注册,它就在系统中了。如果错误发生,新的leader将会接触所有之前注册的应用程序和worker,通知他们领导关系的变化,所以它们甚至不需要事先知道新启动的leader的存在。
由于这个属性的存在,新的master可以在任何时候创建。你唯一需要担心的问题是新的应用程序和workers能够发现它并将它注册进来以防它成为leader master。
### 用本地文件系统做单节点恢复
zookeeper是生产环境下最好的选择,但是如果你想在master死掉后重启它,`FILESYSTEM`模式可以解决。当应用程序和worker注册,它们拥有足够的状态写入提供的目录,以至于在重启master进程时它们能够恢复。
#### 配置
为了开启这个恢复模式,你可以用下面的属性在`spark-env`中设置`SPARK_DAEMON_JAVA_OPTS`。
| System property | Meaning |
|-----|-----|
| spark.deploy.recoveryMode | 设置为FILESYSTEM开启单节点恢复模式(默认为none) |
| spark.deploy.recoveryDirectory | 用来恢复状态的目录 |
#### 细节
- 这个解决方案可以和监控器/管理器(如[monit](http://mmonit.com/monit/))相配合,或者仅仅通过重启开启手动恢复。
- 虽然文件系统的恢复似乎比没有做任何恢复要好,但对于特定的开发或实验目的,这种模式可能是次优的。特别是,通过`stop-master.sh`杀掉master不会清除它的恢复状态,所以,不管你何时启动一个新的master,它都将进入恢复模式。这可能使启动时间增加到1分钟。
- 虽然它不是官方支持的方式,你也可以创建一个NFS目录作为恢复目录。如果原始的master节点完全死掉,你可以在不同的节点启动master,它可以正确的恢复之前注册的所有应用程序和workers。未来的应用程序会发现这个新的master。
';
提交应用程序
最后更新于:2022-04-01 22:20:09
# 提交应用程序
在Spark bin目录下的`spark-submit`可以用来在集群上启动应用程序。它可以通过统一的接口使用Spark支持的所有[集群管理器](https://spark.apache.org/docs/latest/cluster-overview.html#cluster-manager-types),所有你不必为每一个管理器做相应的配置。
### 用spark-submit启动应用程序
`bin/spark-submit`脚本负责建立包含Spark以及其依赖的类路径(classpath),它支持不同的集群管理器以及Spark支持的加载模式。
~~~
./bin/spark-submit \
--class
--master \
--deploy-mode \
--conf = \
... # other options
\
[application-arguments]
~~~
一些常用的选项是:
- `--class`:你的应用程序的入口点(如org.apache.spark.examples.SparkPi)
- `--master`:集群的master URL(如spark://23.195.26.187:7077)
- `--deploy-mode`:在worker节点部署你的driver(cluster)或者本地作为外部客户端(client)。默认是client。
- `--conf`:任意的Spark配置属性,格式是key=value。
- `application-jar`:包含应用程序以及其依赖的jar包的路径。这个URL必须在集群中全局可见,例如,存在于所有节点的`hdfs://`路径或`file://`路径
- `application-arguments`:传递给主类的主方法的参数
一个通用的部署策略是从网关集群提交你的应用程序,这个网关机器和你的worker集群物理上协作。在这种设置下,`client`模式是适合的。在`client`模式下,driver直接在`spark-submit`进程中启动,而这个进程直接作为集群的客户端。应用程序的输入和输出都和控制台相连接。因此,这种模式特别适合涉及REPL的应用程序。
另一种选择,如果你的应用程序从一个和worker机器相距很远的机器上提交,通常情况下用`cluster`模式减少drivers和executors的网络迟延。注意,`cluster`模式目前不支持独立集群、mesos集群以及python应用程序。
有几个我们使用的集群管理器特有的可用选项。例如,在Spark独立集群的`cluster`模式下,你也可以指定`--supervise`用来确保driver自动重启(如果它因为非零退出码失败)。为了列举spark-submit所有的可用选项,用`--help`运行它。
~~~
# Run application locally on 8 cores
./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master local[8] \
/path/to/examples.jar \
100
# Run on a Spark Standalone cluster in client deploy mode
./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://207.184.161.138:7077 \
--executor-memory 20G \
--total-executor-cores 100 \
/path/to/examples.jar \
1000
# Run on a Spark Standalone cluster in cluster deploy mode with supervise
./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://207.184.161.138:7077 \
--deploy-mode cluster
--supervise
--executor-memory 20G \
--total-executor-cores 100 \
/path/to/examples.jar \
1000
# Run on a YARN cluster
export HADOOP_CONF_DIR=XXX
./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn-cluster \ # can also be `yarn-client` for client mode
--executor-memory 20G \
--num-executors 50 \
/path/to/examples.jar \
1000
# Run a Python application on a Spark Standalone cluster
./bin/spark-submit \
--master spark://207.184.161.138:7077 \
examples/src/main/python/pi.py \
1000
~~~
### Master URLs
传递给Spark的url可以用下面的模式
| Master URL | Meaning |
|-----|-----|
| local | 用一个worker线程本地运行Spark |
| local[K] | 用k个worker线程本地运行Spark(理想情况下,设置这个值为你的机器的核数) |
| local[*] | 用尽可能多的worker线程本地运行Spark |
| spark://HOST:PORT | 连接到给定的Spark独立部署集群master。端口必须是master配置的端口,默认是7077 |
| mesos://HOST:PORT | 连接到给定的mesos集群 |
| yarn-client | 以`client`模式连接到Yarn集群。群集位置将基于通过HADOOP_CONF_DIR变量找到 |
| yarn-cluster | 以`cluster`模式连接到Yarn集群。群集位置将基于通过HADOOP_CONF_DIR变量找到 |
';
例子
最后更新于:2022-04-01 22:20:06
# 例子
假定我们想从一些文本文件中构建一个图,限制这个图包含重要的关系和用户,并且在子图上运行page-rank,最后返回与top用户相关的属性。可以通过如下方式实现。
~~~
// Connect to the Spark cluster
val sc = new SparkContext("spark://master.amplab.org", "research")
// Load my user data and parse into tuples of user id and attribute list
val users = (sc.textFile("graphx/data/users.txt")
.map(line => line.split(",")).map( parts => (parts.head.toLong, parts.tail) ))
// Parse the edge data which is already in userId -> userId format
val followerGraph = GraphLoader.edgeListFile(sc, "graphx/data/followers.txt")
// Attach the user attributes
val graph = followerGraph.outerJoinVertices(users) {
case (uid, deg, Some(attrList)) => attrList
// Some users may not have attributes so we set them as empty
case (uid, deg, None) => Array.empty[String]
}
// Restrict the graph to users with usernames and names
val subgraph = graph.subgraph(vpred = (vid, attr) => attr.size == 2)
// Compute the PageRank
val pagerankGraph = subgraph.pageRank(0.001)
// Get the attributes of the top pagerank users
val userInfoWithPageRank = subgraph.outerJoinVertices(pagerankGraph.vertices) {
case (uid, attrList, Some(pr)) => (pr, attrList.toList)
case (uid, attrList, None) => (0.0, attrList.toList)
}
println(userInfoWithPageRank.vertices.top(5)(Ordering.by(_._2._1)).mkString("\n"))
~~~
';
图算法
最后更新于:2022-04-01 22:20:04
# 图算法
GraphX包括一组图算法来简化分析任务。这些算法包含在`org.apache.spark.graphx.lib`包中,可以被直接访问。
### PageRank算法
PageRank度量一个图中每个顶点的重要程度,假定从u到v的一条边代表v的重要性标签。例如,一个Twitter用户被许多其它人粉,该用户排名很高。GraphX带有静态和动态PageRank的实现方法,这些方法在[PageRank object](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.graphx.lib.PageRank$)中。静态的PageRank运行固定次数的迭代,而动态的PageRank一直运行,直到收敛。[GraphOps]()允许直接调用这些算法作为图上的方法。
GraphX包含一个我们可以运行PageRank的社交网络数据集的例子。用户集在`graphx/data/users.txt`中,用户之间的关系在`graphx/data/followers.txt`中。我们通过下面的方法计算每个用户的PageRank。
~~~
// Load the edges as a graph
val graph = GraphLoader.edgeListFile(sc, "graphx/data/followers.txt")
// Run PageRank
val ranks = graph.pageRank(0.0001).vertices
// Join the ranks with the usernames
val users = sc.textFile("graphx/data/users.txt").map { line =>
val fields = line.split(",")
(fields(0).toLong, fields(1))
}
val ranksByUsername = users.join(ranks).map {
case (id, (username, rank)) => (username, rank)
}
// Print the result
println(ranksByUsername.collect().mkString("\n"))
~~~
### 连通体算法
连通体算法用id标注图中每个连通体,将连通体中序号最小的顶点的id作为连通体的id。例如,在社交网络中,连通体可以近似为集群。GraphX在[ConnectedComponents object](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.graphx.lib.ConnectedComponents$)中包含了一个算法的实现,我们通过下面的方法计算社交网络数据集中的连通体。
~~~
/ Load the graph as in the PageRank example
val graph = GraphLoader.edgeListFile(sc, "graphx/data/followers.txt")
// Find the connected components
val cc = graph.connectedComponents().vertices
// Join the connected components with the usernames
val users = sc.textFile("graphx/data/users.txt").map { line =>
val fields = line.split(",")
(fields(0).toLong, fields(1))
}
val ccByUsername = users.join(cc).map {
case (id, (username, cc)) => (username, cc)
}
// Print the result
println(ccByUsername.collect().mkString("\n"))
~~~
### 三角形计数算法
一个顶点有两个相邻的顶点以及相邻顶点之间的边时,这个顶点是一个三角形的一部分。GraphX在[TriangleCount object](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.graphx.lib.TriangleCount$)中实现了一个三角形计数算法,它计算通过每个顶点的三角形的数量。需要注意的是,在计算社交网络数据集的三角形计数时,`TriangleCount`需要边的方向是规范的方向(srcId < dstId),并且图通过`Graph.partitionBy`分片过。
~~~
// Load the edges in canonical order and partition the graph for triangle count
val graph = GraphLoader.edgeListFile(sc, "graphx/data/followers.txt", true).partitionBy(PartitionStrategy.RandomVertexCut)
// Find the triangle count for each vertex
val triCounts = graph.triangleCount().vertices
// Join the triangle counts with the usernames
val users = sc.textFile("graphx/data/users.txt").map { line =>
val fields = line.split(",")
(fields(0).toLong, fields(1))
}
val triCountByUsername = users.join(triCounts).map { case (id, (username, tc)) =>
(username, tc)
}
// Print the result
println(triCountByUsername.collect().mkString("\n"))
~~~
';
顶点和边RDDs
最后更新于:2022-04-01 22:20:02
# 顶点和边RDDs
GraphX暴露保存在图中的顶点和边的RDD。然而,因为GraphX包含的顶点和边拥有优化的数据结构,这些数据结构提供了额外的功能。顶点和边分别返回`VertexRDD`和`EdgeRDD`。这一章我们将学习它们的一些有用的功能。
### VertexRDDs
`VertexRDD[A]`继承自`RDD[(VertexID, A)]`并且添加了额外的限制,那就是每个`VertexID`只能出现一次。此外,`VertexRDD[A]`代表了一组属性类型为A的顶点。在内部,这通过保存顶点属性到一个可重复使用的hash-map数据结构来获得。所以,如果两个`VertexRDDs`从相同的基本`VertexRDD`获得(如通过filter或者mapValues),它们能够在固定的时间内连接而不需要hash评价。为了利用这个索引数据结构,`VertexRDD`暴露了一下附加的功能:
~~~
class VertexRDD[VD] extends RDD[(VertexID, VD)] {
// Filter the vertex set but preserves the internal index
def filter(pred: Tuple2[VertexId, VD] => Boolean): VertexRDD[VD]
// Transform the values without changing the ids (preserves the internal index)
def mapValues[VD2](map: VD => VD2): VertexRDD[VD2]
def mapValues[VD2](map: (VertexId, VD) => VD2): VertexRDD[VD2]
// Remove vertices from this set that appear in the other set
def diff(other: VertexRDD[VD]): VertexRDD[VD]
// Join operators that take advantage of the internal indexing to accelerate joins (substantially)
def leftJoin[VD2, VD3](other: RDD[(VertexId, VD2)])(f: (VertexId, VD, Option[VD2]) => VD3): VertexRDD[VD3]
def innerJoin[U, VD2](other: RDD[(VertexId, U)])(f: (VertexId, VD, U) => VD2): VertexRDD[VD2]
// Use the index on this RDD to accelerate a `reduceByKey` operation on the input RDD.
def aggregateUsingIndex[VD2](other: RDD[(VertexId, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2]
}
~~~
举个例子,`filter`操作如何返回一个VertexRDD。过滤器实际使用一个`BitSet`实现,因此它能够重用索引以及保留和其它`VertexRDDs`做连接时速度快的能力。同样的,`mapValues`操作不允许`map`函数改变`VertexID`,因此可以保证相同的`HashMap`数据结构能够重用。当连接两个从相同的`hashmap`获取的VertexRDDs和使用线性扫描而不是昂贵的点查找实现连接操作时,`leftJoin`和`innerJoin`都能够使用。
从一个`RDD[(VertexID, A)]`高效地构建一个新的`VertexRDD`,`aggregateUsingIndex`操作是有用的。概念上,如果我通过一组顶点构造了一个`VertexRDD[B]`,而`VertexRDD[B]`是一些`RDD[(VertexID, A)]`中顶点的超集,那么我们就可以在聚合以及随后索引`RDD[(VertexID, A)]`中重用索引。例如:
~~~
val setA: VertexRDD[Int] = VertexRDD(sc.parallelize(0L until 100L).map(id => (id, 1)))
val rddB: RDD[(VertexId, Double)] = sc.parallelize(0L until 100L).flatMap(id => List((id, 1.0), (id, 2.0)))
// There should be 200 entries in rddB
rddB.count
val setB: VertexRDD[Double] = setA.aggregateUsingIndex(rddB, _ + _)
// There should be 100 entries in setB
setB.count
// Joining A and B should now be fast!
val setC: VertexRDD[Double] = setA.innerJoin(setB)((id, a, b) => a + b)
~~~
### EdgeRDDs
`EdgeRDD[ED]`继承自`RDD[Edge[ED]]`,使用定义在[PartitionStrategy](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.graphx.PartitionStrategy)的各种分区策略中的一个在块分区中组织边。在每个分区中,边属性和相邻结构被分别保存,当属性值改变时,它们可以最大化的重用。
`EdgeRDD`暴露了三个额外的函数
~~~
// Transform the edge attributes while preserving the structure
def mapValues[ED2](f: Edge[ED] => ED2): EdgeRDD[ED2]
// Revere the edges reusing both attributes and structure
def reverse: EdgeRDD[ED]
// Join two `EdgeRDD`s partitioned using the same partitioning strategy.
def innerJoin[ED2, ED3](other: EdgeRDD[ED2])(f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3]
~~~
在大多数的应用中,我们发现,EdgeRDD操作可以通过图操作者(graph operators)或者定义在基本RDD中的操作来完成。
';
图构造者
最后更新于:2022-04-01 22:20:00
# 图构造者
GraphX提供了几种方式从RDD或者磁盘上的顶点和边集合构造图。默认情况下,没有哪个图构造者为图的边重新分区,而是把边保留在默认的分区中(例如HDFS中它们的原始块)。[Graph.groupEdges](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.graphx.Graph@groupEdges((ED,ED)⇒ED):Graph[VD,ED])需要重新分区图,因为它假定相同的边将会被分配到同一个分区,所以你必须在调用groupEdges之前调用[Graph.partitionBy](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.graphx.Graph@partitionBy(PartitionStrategy):Graph[VD,ED])
~~~
object GraphLoader {
def edgeListFile(
sc: SparkContext,
path: String,
canonicalOrientation: Boolean = false,
minEdgePartitions: Int = 1)
: Graph[Int, Int]
}
~~~
[GraphLoader.edgeListFile](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.graphx.GraphLoader$@edgeListFile(SparkContext,String,Boolean,Int):Graph[Int,Int])提供了一个方式从磁盘上的边列表中加载一个图。它解析如下形式(源顶点ID,目标顶点ID)的连接表,跳过以`#`开头的注释行。
~~~
# This is a comment
2 1
4 1
1 2
~~~
它从指定的边创建一个图,自动地创建边提及的所有顶点。所有的顶点和边的属性默认都是1。`canonicalOrientation`参数允许重定向正方向(srcId < dstId)的边。这在[connected components](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.graphx.lib.ConnectedComponents$)算法中需要用到。`minEdgePartitions`参数指定生成的边分区的最少数量。边分区可能比指定的分区更多,例如,一个HDFS文件包含更多的块。
~~~
object Graph {
def apply[VD, ED](
vertices: RDD[(VertexId, VD)],
edges: RDD[Edge[ED]],
defaultVertexAttr: VD = null)
: Graph[VD, ED]
def fromEdges[VD, ED](
edges: RDD[Edge[ED]],
defaultValue: VD): Graph[VD, ED]
def fromEdgeTuples[VD](
rawEdges: RDD[(VertexId, VertexId)],
defaultValue: VD,
uniqueEdges: Option[PartitionStrategy] = None): Graph[VD, Int]
}
~~~
[Graph.apply](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.graphx.Graph$@apply[VD,ED](RDD[(VertexId,VD)],RDD[Edge[ED]],VD)(ClassTag[VD],ClassTag[ED]):Graph[VD,ED])允许从顶点和边的RDD上创建一个图。重复的顶点可以任意的选择其中一个,在边RDD中而不是在顶点RDD中发现的顶点分配默认的属性。
[Graph.fromEdges](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.graphx.Graph$@fromEdges[VD,ED](RDD[Edge[ED]],VD)(ClassTag[VD],ClassTag[ED]):Graph[VD,ED])允许仅仅从一个边RDD上创建一个图,它自动地创建边提及的顶点,并分配这些顶点默认的值。
[Graph.fromEdgeTuples](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.graphx.Graph$@fromEdgeTuples[VD](RDD[(VertexId,VertexId)],VD,Option[PartitionStrategy])(ClassTag[VD]):Graph[VD,Int])允许仅仅从一个边元组组成的RDD上创建一个图。分配给边的值为1。它自动地创建边提及的顶点,并分配这些顶点默认的值。它还支持删除边。为了删除边,需要传递一个[PartitionStrategy](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.graphx.PartitionStrategy)为值的`Some`作为`uniqueEdges`参数(如uniqueEdges = Some(PartitionStrategy.RandomVertexCut))。分配相同的边到同一个分区从而使它们可以被删除,一个分区策略是必须的。
';
Pregel API
最后更新于:2022-04-01 22:19:57
# Pregel API
图本身是递归数据结构,顶点的属性依赖于它们邻居的属性,这些邻居的属性又依赖于自己邻居的属性。所以许多重要的图算法都是迭代的重新计算每个顶点的属性,直到满足某个确定的条件。一系列的graph-parallel抽象已经被提出来用来表达这些迭代算法。GraphX公开了一个类似Pregel的操作,它是广泛使用的Pregel和GraphLab抽象的一个融合。
在GraphX中,更高级的Pregel操作是一个约束到图拓扑的批量同步(bulk-synchronous)并行消息抽象。Pregel操作者执行一系列的超级步骤(super steps),在这些步骤中,顶点从之前的超级步骤中接收进入(inbound)消息的总和,为顶点属性计算一个新的值,然后在以后的超级步骤中发送消息到邻居顶点。不像Pregel而更像GraphLab,消息作为一个边三元组的函数被并行计算,消息计算既访问了源顶点特征也访问了目的顶点特征。在超级步中,没有收到消息的顶点被跳过。当没有消息遗留时,Pregel操作停止迭代并返回最终的图。
注意,与更标准的Pregel实现不同的是,GraphX中的顶点仅仅能发送信息给邻居顶点,并利用用户自定义的消息函数构造消息。这些限制允许在GraphX进行额外的优化。
一下是[ Pregel操作](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.graphx.GraphOps@pregel[A](A,Int,EdgeDirection)((VertexId,VD,A)⇒VD,(EdgeTriplet[VD,ED])⇒Iterator[(VertexId,A)],(A,A)⇒A)(ClassTag[A]):Graph[VD,ED])的类型签名以及实现草图(注意,访问graph.cache已经被删除)
~~~
class GraphOps[VD, ED] {
def pregel[A]
(initialMsg: A,
maxIter: Int = Int.MaxValue,
activeDir: EdgeDirection = EdgeDirection.Out)
(vprog: (VertexId, VD, A) => VD,
sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
mergeMsg: (A, A) => A)
: Graph[VD, ED] = {
// Receive the initial message at each vertex
var g = mapVertices( (vid, vdata) => vprog(vid, vdata, initialMsg) ).cache()
// compute the messages
var messages = g.mapReduceTriplets(sendMsg, mergeMsg)
var activeMessages = messages.count()
// Loop until no messages remain or maxIterations is achieved
var i = 0
while (activeMessages > 0 && i < maxIterations) {
// Receive the messages: -----------------------------------------------------------------------
// Run the vertex program on all vertices that receive messages
val newVerts = g.vertices.innerJoin(messages)(vprog).cache()
// Merge the new vertex values back into the graph
g = g.outerJoinVertices(newVerts) { (vid, old, newOpt) => newOpt.getOrElse(old) }.cache()
// Send Messages: ------------------------------------------------------------------------------
// Vertices that didn't receive a message above don't appear in newVerts and therefore don't
// get to send messages. More precisely the map phase of mapReduceTriplets is only invoked
// on edges in the activeDir of vertices in newVerts
messages = g.mapReduceTriplets(sendMsg, mergeMsg, Some((newVerts, activeDir))).cache()
activeMessages = messages.count()
i += 1
}
g
}
}
~~~
注意,pregel有两个参数列表(graph.pregel(list1)(list2))。第一个参数列表包含配置参数初始消息、最大迭代数、发送消息的边的方向(默认是沿边方向出)。第二个参数列表包含用户自定义的函数用来接收消息(vprog)、计算消息(sendMsg)、合并消息(mergeMsg)。
我们可以用Pregel操作表达计算单源最短路径( single source shortest path)。
~~~
import org.apache.spark.graphx._
// Import random graph generation library
import org.apache.spark.graphx.util.GraphGenerators
// A graph with edge attributes containing distances
val graph: Graph[Int, Double] =
GraphGenerators.logNormalGraph(sc, numVertices = 100).mapEdges(e => e.attr.toDouble)
val sourceId: VertexId = 42 // The ultimate source
// Initialize the graph such that all vertices except the root have distance infinity.
val initialGraph = graph.mapVertices((id, _) => if (id == sourceId) 0.0 else Double.PositiveInfinity)
val sssp = initialGraph.pregel(Double.PositiveInfinity)(
(id, dist, newDist) => math.min(dist, newDist), // Vertex Program
triplet => { // Send Message
if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
} else {
Iterator.empty
}
},
(a,b) => math.min(a,b) // Merge Message
)
println(sssp.vertices.collect.mkString("\n"))
~~~
';
图操作符
最后更新于:2022-04-01 22:19:55
# 图操作符
正如RDDs有基本的操作map, filter和reduceByKey一样,属性图也有基本的集合操作,这些操作采用用户自定义的函数并产生包含转换特征和结构的新图。定义在[Graph](http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.graphx.Graph)中的核心操作是经过优化的实现。表示为核心操作的组合的便捷操作定义在[GraphOps](http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.graphx.GraphOps)中。然而,因为有Scala的隐式转换,定义在`GraphOps`中的操作可以作为`Graph`的成员自动使用。例如,我们可以通过下面的方式计算每个顶点(定义在GraphOps中)的入度。
~~~
val graph: Graph[(String, String), String]
// Use the implicit GraphOps.inDegrees operator
val inDegrees: VertexRDD[Int] = graph.inDegrees
~~~
区分核心图操作和`GraphOps`的原因是为了在将来支持不同的图表示。每个图表示都必须提供核心操作的实现并重用很多定义在`GraphOps`中的有用操作。
### 操作一览
一下是定义在`Graph`和`GraphOps`中(为了简单起见,表现为图的成员)的功能的快速浏览。注意,某些函数签名已经简化(如默认参数和类型的限制已删除),一些更高级的功能已经被删除,所以请参阅API文档了解官方的操作列表。
~~~
/** Summary of the functionality in the property graph */
class Graph[VD, ED] {
// Information about the Graph ===================================================================
val numEdges: Long
val numVertices: Long
val inDegrees: VertexRDD[Int]
val outDegrees: VertexRDD[Int]
val degrees: VertexRDD[Int]
// Views of the graph as collections =============================================================
val vertices: VertexRDD[VD]
val edges: EdgeRDD[ED]
val triplets: RDD[EdgeTriplet[VD, ED]]
// Functions for caching graphs ==================================================================
def persist(newLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]
def cache(): Graph[VD, ED]
def unpersistVertices(blocking: Boolean = true): Graph[VD, ED]
// Change the partitioning heuristic ============================================================
def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED]
// Transform vertex and edge attributes ==========================================================
def mapVertices[VD2](map: (VertexID, VD) => VD2): Graph[VD2, ED]
def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]
def mapEdges[ED2](map: (PartitionID, Iterator[Edge[ED]]) => Iterator[ED2]): Graph[VD, ED2]
def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
def mapTriplets[ED2](map: (PartitionID, Iterator[EdgeTriplet[VD, ED]]) => Iterator[ED2])
: Graph[VD, ED2]
// Modify the graph structure ====================================================================
def reverse: Graph[VD, ED]
def subgraph(
epred: EdgeTriplet[VD,ED] => Boolean = (x => true),
vpred: (VertexID, VD) => Boolean = ((v, d) => true))
: Graph[VD, ED]
def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED]
def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED]
// Join RDDs with the graph ======================================================================
def joinVertices[U](table: RDD[(VertexID, U)])(mapFunc: (VertexID, VD, U) => VD): Graph[VD, ED]
def outerJoinVertices[U, VD2](other: RDD[(VertexID, U)])
(mapFunc: (VertexID, VD, Option[U]) => VD2)
: Graph[VD2, ED]
// Aggregate information about adjacent triplets =================================================
def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexID]]
def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexID, VD)]]
def aggregateMessages[Msg: ClassTag](
sendMsg: EdgeContext[VD, ED, Msg] => Unit,
mergeMsg: (Msg, Msg) => Msg,
tripletFields: TripletFields = TripletFields.All)
: VertexRDD[A]
// Iterative graph-parallel computation ==========================================================
def pregel[A](initialMsg: A, maxIterations: Int, activeDirection: EdgeDirection)(
vprog: (VertexID, VD, A) => VD,
sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexID,A)],
mergeMsg: (A, A) => A)
: Graph[VD, ED]
// Basic graph algorithms ========================================================================
def pageRank(tol: Double, resetProb: Double = 0.15): Graph[Double, Double]
def connectedComponents(): Graph[VertexID, ED]
def triangleCount(): Graph[Int, ED]
def stronglyConnectedComponents(numIter: Int): Graph[VertexID, ED]
}
~~~
### 属性操作
如RDD的`map`操作一样,属性图包含下面的操作:
~~~
class Graph[VD, ED] {
def mapVertices[VD2](map: (VertexId, VD) => VD2): Graph[VD2, ED]
def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]
def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
}
~~~
每个操作都产生一个新的图,这个新的图包含通过用户自定义的map操作修改后的顶点或边的属性。
注意,每种情况下图结构都不受影响。这些操作的一个重要特征是它允许所得图形重用原有图形的结构索引(indices)。下面的两行代码在逻辑上是等价的,但是第一个不保存结构索引,所以不会从GraphX系统优化中受益。
~~~
val newVertices = graph.vertices.map { case (id, attr) => (id, mapUdf(id, attr)) }
val newGraph = Graph(newVertices, graph.edges)
~~~
另一种方法是用[mapVertices](http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.graphx.Graph@mapVertices[VD2]((VertexId,VD)⇒VD2)(ClassTag[VD2]):Graph[VD2,ED])保存索引。
~~~
val newGraph = graph.mapVertices((id, attr) => mapUdf(id, attr))
~~~
这些操作经常用来初始化的图形,用作特定计算或者用来处理项目不需要的属性。例如,给定一个图,这个图的顶点特征包含出度,我们为PageRank初始化它。
~~~
// Given a graph where the vertex property is the out degree
val inputGraph: Graph[Int, String] =
graph.outerJoinVertices(graph.outDegrees)((vid, _, degOpt) => degOpt.getOrElse(0))
// Construct a graph where each edge contains the weight
// and each vertex is the initial PageRank
val outputGraph: Graph[Double, Double] =
inputGraph.mapTriplets(triplet => 1.0 / triplet.srcAttr).mapVertices((id, _) => 1.0)
~~~
### 结构性操作
当前的GraphX仅仅支持一组简单的常用结构性操作。下面是基本的结构性操作列表。
~~~
class Graph[VD, ED] {
def reverse: Graph[VD, ED]
def subgraph(epred: EdgeTriplet[VD,ED] => Boolean,
vpred: (VertexId, VD) => Boolean): Graph[VD, ED]
def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED]
def groupEdges(merge: (ED, ED) => ED): Graph[VD,ED]
}
~~~
[reverse](http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.graphx.Graph@reverse:Graph[VD,ED])操作返回一个新的图,这个图的边的方向都是反转的。例如,这个操作可以用来计算反转的PageRank。因为反转操作没有修改顶点或者边的属性或者改变边的数量,所以我们可以在不移动或者复制数据的情况下有效地实现它。
[subgraph](http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.graphx.Graph@subgraph((EdgeTriplet[VD,ED])⇒Boolean,(VertexId,VD)⇒Boolean):Graph[VD,ED])操作利用顶点和边的谓词(predicates),返回的图仅仅包含满足顶点谓词的顶点、满足边谓词的边以及满足顶点谓词的连接顶点(connect vertices)。`subgraph`操作可以用于很多场景,如获取感兴趣的顶点和边组成的图或者获取清除断开链接后的图。下面的例子删除了断开的链接。
~~~
// Create an RDD for the vertices
val users: RDD[(VertexId, (String, String))] =
sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
(5L, ("franklin", "prof")), (2L, ("istoica", "prof")),
(4L, ("peter", "student"))))
// Create an RDD for edges
val relationships: RDD[Edge[String]] =
sc.parallelize(Array(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"),
Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi"),
Edge(4L, 0L, "student"), Edge(5L, 0L, "colleague")))
// Define a default user in case there are relationship with missing user
val defaultUser = ("John Doe", "Missing")
// Build the initial Graph
val graph = Graph(users, relationships, defaultUser)
// Notice that there is a user 0 (for which we have no information) connected to users
// 4 (peter) and 5 (franklin).
graph.triplets.map(
triplet => triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1
).collect.foreach(println(_))
// Remove missing vertices as well as the edges to connected to them
val validGraph = graph.subgraph(vpred = (id, attr) => attr._2 != "Missing")
// The valid subgraph will disconnect users 4 and 5 by removing user 0
validGraph.vertices.collect.foreach(println(_))
validGraph.triplets.map(
triplet => triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1
).collect.foreach(println(_))
~~~
注意,上面的例子中,仅仅提供了顶点谓词。如果没有提供顶点或者边的谓词,`subgraph`操作默认为true。
[mask](http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.graphx.Graph@mask[VD2,ED2](Graph[VD2,ED2])(ClassTag[VD2],ClassTag[ED2]):Graph[VD,ED])操作构造一个子图,这个子图包含输入图中包含的顶点和边。这个操作可以和`subgraph`操作相结合,基于另外一个相关图的特征去约束一个图。例如,我们可能利用缺失顶点的图运行连通体(?连通组件connected components),然后返回有效的子图。
~~~
/ Run Connected Components
val ccGraph = graph.connectedComponents() // No longer contains missing field
// Remove missing vertices as well as the edges to connected to them
val validGraph = graph.subgraph(vpred = (id, attr) => attr._2 != "Missing")
// Restrict the answer to the valid subgraph
val validCCGraph = ccGraph.mask(validGraph)
~~~
[groupEdges](http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.graphx.Graph@groupEdges((ED,ED)⇒ED):Graph[VD,ED])操作合并多重图中的并行边(如顶点对之间重复的边)。在大量的应用程序中,并行的边可以合并(它们的权重合并)为一条边从而降低图的大小。
### 连接操作
在许多情况下,有必要将外部数据加入到图中。例如,我们可能有额外的用户属性需要合并到已有的图中或者我们可能想从一个图中取出顶点特征加入到另外一个图中。这些任务可以用join操作完成。下面列出的是主要的join操作。
~~~
class Graph[VD, ED] {
def joinVertices[U](table: RDD[(VertexId, U)])(map: (VertexId, VD, U) => VD)
: Graph[VD, ED]
def outerJoinVertices[U, VD2](table: RDD[(VertexId, U)])(map: (VertexId, VD, Option[U]) => VD2)
: Graph[VD2, ED]
}
~~~
[joinVertices](http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.graphx.GraphOps@joinVertices[U](RDD[(VertexId,U)])((VertexId,VD,U)⇒VD)(ClassTag[U]):Graph[VD,ED])操作将输入RDD和顶点相结合,返回一个新的带有顶点特征的图。这些特征是通过在连接顶点的结果上使用用户定义的`map`函数获得的。在RDD中没有匹配值的顶点保留其原始值。
注意,对于给定的顶点,如果RDD中有超过1个的匹配值,则仅仅使用其中的一个。建议用下面的方法保证输入RDD的唯一性。下面的方法也会预索引返回的值用以加快后续的join操作。
~~~
val nonUniqueCosts: RDD[(VertexID, Double)]
val uniqueCosts: VertexRDD[Double] =
graph.vertices.aggregateUsingIndex(nonUnique, (a,b) => a + b)
val joinedGraph = graph.joinVertices(uniqueCosts)(
(id, oldCost, extraCost) => oldCost + extraCost)
~~~
除了将用户自定义的map函数用到所有顶点和改变顶点属性类型以外,更一般的[outerJoinVertices](http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.graphx.Graph@outerJoinVertices[U,VD2](RDD[(VertexId,U)])((VertexId,VD,Option[U])⇒VD2)(ClassTag[U],ClassTag[VD2]):Graph[VD2,ED])与`joinVertices`类似。因为并不是所有顶点在RDD中拥有匹配的值,map函数需要一个option类型。
~~~
val outDegrees: VertexRDD[Int] = graph.outDegrees
val degreeGraph = graph.outerJoinVertices(outDegrees) { (id, oldAttr, outDegOpt) =>
outDegOpt match {
case Some(outDeg) => outDeg
case None => 0 // No outDegree means zero outDegree
}
}
~~~
你可能已经注意到了,在上面的例子中用到了curry函数的多参数列表。虽然我们可以将f(a)(b)写成f(a,b),但是f(a,b)意味着b的类型推断将不会依赖于a。因此,用户需要为定义的函数提供类型标注。
~~~
val joinedGraph = graph.joinVertices(uniqueCosts,
(id: VertexID, oldCost: Double, extraCost: Double) => oldCost + extraCost)
~~~
### 相邻聚合(Neighborhood Aggregation)
图分析任务的一个关键步骤是汇总每个顶点附近的信息。例如我们可能想知道每个用户的追随者的数量或者每个用户的追随者的平均年龄。许多迭代图算法(如PageRank,最短路径和连通体)多次聚合相邻顶点的属性。
为了提高性能,主要的聚合操作从`graph.mapReduceTriplets`改为了新的`graph.AggregateMessages`。虽然API的改变相对较小,但是我们仍然提供了过渡的指南。
### 聚合消息(aggregateMessages)
GraphX中的核心聚合操作是[aggregateMessages](http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.graphx.Graph@aggregateMessages[A]((EdgeContext[VD,ED,A])⇒Unit,(A,A)⇒A,TripletFields)(ClassTag[A]):VertexRDD[A])。这个操作将用户定义的`sendMsg`函数应用到图的每个边三元组(edge triplet),然后应用`mergeMsg`函数在其目的顶点聚合这些消息。
~~~
class Graph[VD, ED] {
def aggregateMessages[Msg: ClassTag](
sendMsg: EdgeContext[VD, ED, Msg] => Unit,
mergeMsg: (Msg, Msg) => Msg,
tripletFields: TripletFields = TripletFields.All)
: VertexRDD[Msg]
}
~~~
用户自定义的`sendMsg`函数是一个[EdgeContext](http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.graphx.EdgeContext)类型。它暴露源和目的属性以及边缘属性以及发送消息给源和目的属性的函数([sendToSrc](http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.graphx.EdgeContext@sendToSrc(msg:A):Unit)和[sendToDst](http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.graphx.EdgeContext@sendToDst(msg:A):Unit))。可将`sendMsg`函数看做map-reduce过程中的map函数。用户自定义的`mergeMsg`函数指定两个消息到相同的顶点并保存为一个消息。可以将`mergeMsg`函数看做map-reduce过程中的reduce函数。[aggregateMessages](http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.graphx.Graph@aggregateMessages[A]((EdgeContext[VD,ED,A])⇒Unit,(A,A)⇒A,TripletFields)(ClassTag[A]):VertexRDD[A])操作返回一个包含聚合消息(目的地为每个顶点)的`VertexRDD[Msg]`。没有接收到消息的顶点不包含在返回的`VertexRDD`中。
另外,[aggregateMessages](http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.graphx.Graph@aggregateMessages[A]((EdgeContext[VD,ED,A])⇒Unit,(A,A)⇒A,TripletFields)(ClassTag[A]):VertexRDD[A])有一个可选的`tripletFields`参数,它指出在`EdgeContext`中,哪些数据被访问(如源顶点特征而不是目的顶点特征)。`tripletsFields`可能的选项定义在[TripletFields](http://spark.apache.org/docs/latest/api/java/org/apache/spark/graphx/TripletFields.html)中。`tripletFields`参数用来通知GraphX仅仅只需要`EdgeContext`的一部分允许GraphX选择一个优化的连接策略。例如,如果我们想计算每个用户的追随者的平均年龄,我们仅仅只需要源字段。所以我们用`TripletFields.Src`表示我们仅仅只需要源字段。
在下面的例子中,我们用`aggregateMessages`操作计算每个用户更年长的追随者的年龄。
~~~
// Import random graph generation library
import org.apache.spark.graphx.util.GraphGenerators
// Create a graph with "age" as the vertex property. Here we use a random graph for simplicity.
val graph: Graph[Double, Int] =
GraphGenerators.logNormalGraph(sc, numVertices = 100).mapVertices( (id, _) => id.toDouble )
// Compute the number of older followers and their total age
val olderFollowers: VertexRDD[(Int, Double)] = graph.aggregateMessages[(Int, Double)](
triplet => { // Map Function
if (triplet.srcAttr > triplet.dstAttr) {
// Send message to destination vertex containing counter and age
triplet.sendToDst(1, triplet.srcAttr)
}
},
// Add counter and age
(a, b) => (a._1 + b._1, a._2 + b._2) // Reduce Function
)
// Divide total age by number of older followers to get average age of older followers
val avgAgeOfOlderFollowers: VertexRDD[Double] =
olderFollowers.mapValues( (id, value) => value match { case (count, totalAge) => totalAge / count } )
// Display the results
avgAgeOfOlderFollowers.collect.foreach(println(_))
~~~
当消息(以及消息的总数)是常量大小(列表和连接替换为浮点数和添加)时,`aggregateMessages`操作的效果最好。
### Map Reduce三元组过渡指南
在之前版本的GraphX中,利用[mapReduceTriplets]操作完成相邻聚合。
~~~
class Graph[VD, ED] {
def mapReduceTriplets[Msg](
map: EdgeTriplet[VD, ED] => Iterator[(VertexId, Msg)],
reduce: (Msg, Msg) => Msg)
: VertexRDD[Msg]
}
~~~
`mapReduceTriplets`操作在每个三元组上应用用户定义的map函数,然后保存用用户定义的reduce函数聚合的消息。然而,我们发现用户返回的迭代器是昂贵的,它抑制了我们添加额外优化(例如本地顶点的重新编号)的能力。[aggregateMessages](http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.graphx.Graph@aggregateMessages[A]((EdgeContext[VD,ED,A])⇒Unit,(A,A)⇒A,TripletFields)(ClassTag[A]):VertexRDD[A])暴露三元组字段和函数显示的发送消息到源和目的顶点。并且,我们删除了字节码检测转而需要用户指明三元组的哪些字段实际需要。
下面的代码用到了`mapReduceTriplets`
~~~
val graph: Graph[Int, Float] = ...
def msgFun(triplet: Triplet[Int, Float]): Iterator[(Int, String)] = {
Iterator((triplet.dstId, "Hi"))
}
def reduceFun(a: Int, b: Int): Int = a + b
val result = graph.mapReduceTriplets[String](msgFun, reduceFun)
~~~
下面的代码用到了`aggregateMessages`
~~~
val graph: Graph[Int, Float] = ...
def msgFun(triplet: EdgeContext[Int, Float, String]) {
triplet.sendToDst("Hi")
}
def reduceFun(a: Int, b: Int): Int = a + b
val result = graph.aggregateMessages[String](msgFun, reduceFun)
~~~
### 计算度信息
最一般的聚合任务就是计算顶点的度,即每个顶点相邻边的数量。在有向图中,经常需要知道顶点的入度、出度以及总共的度。[GraphOps](http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.graphx.GraphOps)类包含一个操作集合用来计算每个顶点的度。例如,下面的例子计算最大的入度、出度和总度。
~~~
// Define a reduce operation to compute the highest degree vertex
def max(a: (VertexId, Int), b: (VertexId, Int)): (VertexId, Int) = {
if (a._2 > b._2) a else b
}
// Compute the max degrees
val maxInDegree: (VertexId, Int) = graph.inDegrees.reduce(max)
val maxOutDegree: (VertexId, Int) = graph.outDegrees.reduce(max)
val maxDegrees: (VertexId, Int) = graph.degrees.reduce(max)
~~~
### Collecting Neighbors
在某些情况下,通过收集每个顶点相邻的顶点及它们的属性来表达计算可能更容易。这可以通过[collectNeighborIds](http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.graphx.GraphOps@collectNeighborIds(EdgeDirection):VertexRDD[Array[VertexId]])和[collectNeighbors](http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.graphx.GraphOps@collectNeighbors(EdgeDirection):VertexRDD[Array[(VertexId,VD)]])操作来简单的完成
~~~
class GraphOps[VD, ED] {
def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]]
def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[ Array[(VertexId, VD)] ]
}
~~~
这些操作是非常昂贵的,因为它们需要重复的信息和大量的通信。如果可能,尽量用`aggregateMessages`操作直接表达相同的计算。
### 缓存和不缓存
在Spark中,RDDs默认是不缓存的。为了避免重复计算,当需要多次利用它们时,我们必须显示地缓存它们。GraphX中的图也有相同的方式。当利用到图多次时,确保首先访问`Graph.cache()`方法。
在迭代计算中,为了获得最佳的性能,不缓存可能是必须的。默认情况下,缓存的RDDs和图会一直保留在内存中直到因为内存压力迫使它们以LRU的顺序删除。对于迭代计算,先前的迭代的中间结果将填充到缓存中。虽然它们最终会被删除,但是保存在内存中的不需要的数据将会减慢垃圾回收。只有中间结果不需要,不缓存它们是更高效的。这涉及到在每次迭代中物化一个图或者RDD而不缓存所有其它的数据集。在将来的迭代中仅用物化的数据集。然而,因为图是由多个RDD组成的,正确的不持久化它们是困难的。对于迭代计算,我们建议使用Pregel API,它可以正确的不持久化中间结果。
';
属性图
最后更新于:2022-04-01 22:19:53
# 属性图
[属性图](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.graphx.Graph)是一个有向多重图,它带有连接到每个顶点和边的用户定义的对象。有向多重图中多个并行(parallel)的边共享相同的源和目的地顶点。支持并行边的能力简化了建模场景,这个场景中,相同的顶点存在多种关系(例如co-worker和friend)。每个顶点由一个唯一的64位长的标识符(VertexID)作为key。GraphX并没有对顶点标识强加任何排序。同样,顶点拥有相应的源和目的顶点标识符。
属性图通过vertex(VD)和edge(ED)类型参数化,这些类型是分别与每个顶点和边相关联的对象的类型。
在某些情况下,在相同的图形中,可能希望顶点拥有不同的属性类型。这可以通过继承完成。例如,将用户和产品建模成一个二分图,我们可以用如下方式
~~~
class VertexProperty()
case class UserProperty(val name: String) extends VertexProperty
case class ProductProperty(val name: String, val price: Double) extends VertexProperty
// The graph might then have the type:
var graph: Graph[VertexProperty, String] = null
~~~
和RDD一样,属性图是不可变的、分布式的、容错的。图的值或者结构的改变需要按期望的生成一个新的图来实现。注意,原始图的大部分都可以在新图中重用,用来减少这种固有的功能数据结构的成本。执行者使用一系列顶点分区试探法来对图进行分区。如RDD一样,图中的每个分区可以在发生故障的情况下被重新创建在不同的机器上。
逻辑上的属性图对应于一对类型化的集合(RDD),这个集合编码了每一个顶点和边的属性。因此,图类包含访问图中顶点和边的成员。
~~~
class Graph[VD, ED] {
val vertices: VertexRDD[VD]
val edges: EdgeRDD[ED]
}
~~~
`VertexRDD[VD]`和`EdgeRDD[ED]`类分别继承和优化自`RDD[(VertexID, VD)]`和`RDD[Edge[ED]]`。`VertexRDD[VD]`和`EdgeRDD[ED]`都支持额外的功能来建立在图计算和利用内部优化。
### 属性图的例子
在GraphX项目中,假设我们想构造一个包括不同合作者的属性图。顶点属性可能包含用户名和职业。我们可以用描述合作者之间关系的字符串标注边缘。
![属性图](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2015-08-16_55d04e997662d.png)
所得的图形将具有类型签名
~~~
val userGraph: Graph[(String, String), String]
~~~
有很多方式从一个原始文件、RDD构造一个属性图。最一般的方法是利用[Graph object](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.graphx.Graph$)。下面的代码从RDD集合生成属性图。
~~~
// Assume the SparkContext has already been constructed
val sc: SparkContext
// Create an RDD for the vertices
val users: RDD[(VertexId, (String, String))] =
sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
(5L, ("franklin", "prof")), (2L, ("istoica", "prof"))))
// Create an RDD for edges
val relationships: RDD[Edge[String]] =
sc.parallelize(Array(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"),
Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi")))
// Define a default user in case there are relationship with missing user
val defaultUser = ("John Doe", "Missing")
// Build the initial Graph
val graph = Graph(users, relationships, defaultUser)
~~~
在上面的例子中,我们用到了[Edge](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.graphx.Edge)样本类。边有一个`srcId`和`dstId`分别对应于源和目标顶点的标示符。另外,`Edge`类有一个`attr`成员用来存储边属性。
我们可以分别用`graph.vertices`和`graph.edges`成员将一个图解构为相应的顶点和边。
~~~
val graph: Graph[(String, String), String] // Constructed from above
// Count all users which are postdocs
graph.vertices.filter { case (id, (name, pos)) => pos == "postdoc" }.count
// Count all the edges where src > dst
graph.edges.filter(e => e.srcId > e.dstId).count
~~~
~~~
注意,graph.vertices返回一个VertexRDD[(String, String)],它继承于 RDD[(VertexID, (String, String))]。所以我们可以用scala的case表达式解构这个元组。另一方面,
graph.edges返回一个包含Edge[String]对象的EdgeRDD。我们也可以用到case类的类型构造器,如下例所示。
graph.edges.filter { case Edge(src, dst, prop) => src > dst }.count
~~~
除了属性图的顶点和边视图,GraphX也包含了一个三元组视图,三元视图逻辑上将顶点和边的属性保存为一个`RDD[EdgeTriplet[VD, ED]]`,它包含[EdgeTriplet](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.graphx.EdgeTriplet)类的实例。可以通过下面的Sql表达式表示这个连接。
~~~
SELECT src.id, dst.id, src.attr, e.attr, dst.attr
FROM edges AS e LEFT JOIN vertices AS src, vertices AS dst
ON e.srcId = src.Id AND e.dstId = dst.Id
~~~
或者通过下面的图来表示。
![triplet](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2015-08-16_55d04e99926f1.png)
`EdgeTriplet`类继承于`Edge`类,并且加入了`srcAttr`和`dstAttr`成员,这两个成员分别包含源和目的的属性。我们可以用一个三元组视图渲染字符串集合用来描述用户之间的关系。
~~~
val graph: Graph[(String, String), String] // Constructed from above
// Use the triplets view to create an RDD of facts.
val facts: RDD[String] =
graph.triplets.map(triplet =>
triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1)
facts.collect.foreach(println(_))
~~~
';
开始
最后更新于:2022-04-01 22:19:51
# 开始
开始的第一步是引入Spark和GraphX到你的项目中,如下面所示
~~~
mport org.apache.spark._
import org.apache.spark.graphx._
// To make some of the examples work we will also need RDD
import org.apache.spark.rdd.RDD
~~~
如果你没有用到Spark shell,你还将需要SparkContext。
';
GraphX编程指南
最后更新于:2022-04-01 22:19:48
# GraphX编程指南
GraphX是一个新的(alpha)Spark API,它用于图和并行图(graph-parallel)的计算。GraphX通过引入[Resilient Distributed Property Graph](#):带有顶点和边属性的有向多重图,来扩展Spark RDD。为了支持图计算,GraphX公开一组基本的功能操作以及Pregel API的一个优化。另外,GraphX包含了一个日益增长的图算法和图builders的集合,用以简化图分析任务。
从社交网络到语言建模,不断增长的规模和图形数据的重要性已经推动了许多新的`graph-parallel`系统(如[Giraph](http://giraph.apache.org/)和[GraphLab](http://graphlab.org/))的发展。通过限制可表达的计算类型和引入新的技术来划分和分配图,这些系统可以高效地执行复杂的图形算法,比一般的`data-parallel`系统快很多。
![data parallel vs graph parallel](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2015-08-16_55d04e9915533.png)
然而,通过这种限制可以提高性能,但是很难表示典型的图分析途径(构造图、修改它的结构或者表示跨多个图的计算)中很多重要的stages。另外,我们如何看待数据取决于我们的目标,并且同一原始数据可能有许多不同表和图的视图。
![表和图](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2015-08-16_55d04e993d65c.png)
结论是,图和表之间经常需要能够相互移动。然而,现有的图分析管道必须组成`graph-parallel`和`data- parallel`系统`,从而实现大数据的迁移和复制并生成一个复杂的编程模型。
![图分析路径](https://docs.gechiui.com/gc-content/uploads/sites/kancloud/2015-08-16_55d04e9952109.png)
GraphX项目的目的就是将`graph-parallel`和`data-parallel`统一到一个系统中,这个系统拥有一个唯一的组合API。GraphX允许用户将数据当做一个图和一个集合(RDD),而不需要数据移动或者复制。通过将最新的进展整合进`graph-parallel`系统,GraphX能够优化图操作的执行。
- [开始](#)
- [属性图](#)
- [图操作符](#)
- [Pregel API](#)
- [图构造者](#)
- [顶点和边RDDs](#)
- [图算法](#)
- [例子](#)
';
Spark SQL数据类型
最后更新于:2022-04-01 22:19:46
# Spark SQL数据类型
- 数字类型
- ByteType:代表一个字节的整数。范围是-128到127
- ShortType:代表两个字节的整数。范围是-32768到32767
- IntegerType:代表4个字节的整数。范围是-2147483648到2147483647
- LongType:代表8个字节的整数。范围是-9223372036854775808到9223372036854775807
- FloatType:代表4字节的单精度浮点数
- DoubleType:代表8字节的双精度浮点数
- DecimalType:代表任意精度的10进制数据。通过内部的java.math.BigDecimal支持。BigDecimal由一个任意精度的整型非标度值和一个32位整数组成
- StringType:代表一个字符串值
- BinaryType:代表一个byte序列值
- BooleanType:代表boolean值
- Datetime类型
- TimestampType:代表包含字段年,月,日,时,分,秒的值
- DateType:代表包含字段年,月,日的值
- 复杂类型
- ArrayType(elementType, containsNull):代表由elementType类型元素组成的序列值。`containsNull`用来指明`ArrayType`中的值是否有null值
- MapType(keyType, valueType, valueContainsNull):表示包括一组键 - 值对的值。通过keyType表示key数据的类型,通过valueType表示value数据的类型。`valueContainsNull`用来指明`MapType`中的值是否有null值
- StructType(fields):表示一个拥有`StructFields (fields)`序列结构的值
- StructField(name, dataType, nullable):代表`StructType`中的一个字段,字段的名字通过`name`指定,`dataType`指定field的数据类型,`nullable`表示字段的值是否有null值。
Spark的所有数据类型都定义在包`org.apache.spark.sql`中,你可以通过`import org.apache.spark.sql._`访问它们。
| 数据类型 | Scala中的值类型 | 访问或者创建数据类型的API |
|-----|-----|-----|
| ByteType | Byte | ByteType |
| ShortType | Short | ShortType |
| IntegerType | Int | IntegerType |
| LongType | Long | LongType |
| FloatType | Float | FloatType |
| DoubleType | Double | DoubleType |
| DecimalType | scala.math.BigDecimal | DecimalType |
| StringType | String | StringType |
| BinaryType | Array[Byte] | BinaryType |
| BooleanType | Boolean | BooleanType |
| TimestampType | java.sql.Timestamp | TimestampType |
| DateType | java.sql.Date | DateType |
| ArrayType | scala.collection.Seq | ArrayType(elementType, [containsNull]) 注意containsNull默认为true |
| MapType | scala.collection.Map | MapType(keyType, valueType, [valueContainsNull]) 注意valueContainsNull默认为true |
| StructType | org.apache.spark.sql.Row | StructType(fields) ,注意fields是一个StructField序列,相同名字的两个StructField不被允许 |
| StructField | The value type in Scala of the data type of this field (For example, Int for a StructField with the data type IntegerType) | StructField(name, dataType, nullable) |
';
编写语言集成(Language-Integrated)的相关查询
最后更新于:2022-04-01 22:19:44
# 编写语言集成(Language-Integrated)的相关查询
语言集成的相关查询是实验性的,现在暂时只支持scala。
Spark SQL也支持用领域特定语言编写查询。
~~~
// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// Importing the SQL context gives access to all the public SQL functions and implicit conversions.
import sqlContext._
val people: RDD[Person] = ... // An RDD of case class objects, from the first example.
// The following is the same as 'SELECT name FROM people WHERE age >= 10 AND age <= 19'
val teenagers = people.where('age >= 10).where('age <= 19).select('name)
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
~~~
DSL使用Scala的符号来表示在潜在表(underlying table)中的列,这些列以前缀(')标示。将这些符号隐式转换成由SQL执行引擎计算的表达式。你可以在[ScalaDoc](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.SchemaRDD)中了解详情。
';
其它SQL接口
最后更新于:2022-04-01 22:19:41
# 其它SQL接口
Spark SQL也支持直接运行SQL查询的接口,不用写任何代码。
### 运行Thrift JDBC/ODBC服务器
这里实现的Thrift JDBC/ODBC服务器与Hive 0.12中的[HiveServer2](https://cwiki.apache.org/confluence/display/Hive/Setting+Up+HiveServer2)相一致。你可以用在Spark或者Hive 0.12附带的beeline脚本测试JDBC服务器。
在Spark目录中,运行下面的命令启动JDBC/ODBC服务器。
~~~
./sbin/start-thriftserver.sh
~~~
这个脚本接受任何的`bin/spark-submit`命令行参数,加上一个`--hiveconf`参数用来指明Hive属性。你可以运行`./sbin/start-thriftserver.sh --help`来获得所有可用选项的完整列表。默认情况下,服务器监听`localhost:10000`。你可以用环境变量覆盖这些变量。
~~~
export HIVE_SERVER2_THRIFT_PORT=
export HIVE_SERVER2_THRIFT_BIND_HOST=
./sbin/start-thriftserver.sh \
--master \
...
~~~
或者通过系统变量覆盖。
~~~
./sbin/start-thriftserver.sh \
--hiveconf hive.server2.thrift.port= \
--hiveconf hive.server2.thrift.bind.host= \
--master
...
~~~
现在你可以用beeline测试Thrift JDBC/ODBC服务器。
~~~
./bin/beeline
~~~
连接到Thrift JDBC/ODBC服务器的方式如下:
~~~
beeline> !connect jdbc:hive2://localhost:10000
~~~
Beeline将会询问你用户名和密码。在非安全的模式,简单地输入你机器的用户名和空密码就行了。对于安全模式,你可以按照[Beeline文档](https://cwiki.apache.org/confluence/display/Hive/HiveServer2+Clients)的说明来执行。
### 运行Spark SQL CLI
Spark SQL CLI是一个便利的工具,它可以在本地运行Hive元存储服务、执行命令行输入的查询。注意,Spark SQL CLI不能与Thrift JDBC服务器通信。
在Spark目录运行下面的命令可以启动Spark SQL CLI。
~~~
./bin/spark-sql
~~~
';
性能调优
最后更新于:2022-04-01 22:19:39
# 性能调优
对于某些工作负载,可以在通过在内存中缓存数据或者打开一些实验选项来提高性能。
### 在内存中缓存数据
Spark SQL可以通过调用`sqlContext.cacheTable("tableName")`方法来缓存使用柱状格式的表。然后,Spark将会仅仅浏览需要的列并且自动地压缩数据以减少内存的使用以及垃圾回收的压力。你可以通过调用`sqlContext.uncacheTable("tableName")`方法在内存中删除表。
注意,如果你调用`schemaRDD.cache()`而不是`sqlContext.cacheTable(...)`,表将不会用柱状格式来缓存。在这种情况下,`sqlContext.cacheTable(...)`是强烈推荐的用法。
可以在SQLContext上使用setConf方法或者在用SQL时运行`SET key=value`命令来配置内存缓存。
| Property Name | Default | Meaning |
|-----|-----|-----|
| spark.sql.inMemoryColumnarStorage.compressed | true | 当设置为true时,Spark SQL将为基于数据统计信息的每列自动选择一个压缩算法。 |
| spark.sql.inMemoryColumnarStorage.batchSize | 10000 | 柱状缓存的批数据大小。更大的批数据可以提高内存的利用率以及压缩效率,但有OOMs的风险 |
### 其它的配置选项
以下的选项也可以用来调整查询执行的性能。有可能这些选项会在以后的版本中弃用,这是因为更多的优化会自动执行。
| Property Name | Default | Meaning |
|-----|-----|-----|
| spark.sql.autoBroadcastJoinThreshold | 10485760(10m) | 配置一个表的最大大小(byte)。当执行join操作时,这个表将会广播到所有的worker节点。可以将值设置为-1来禁用广播。注意,目前的统计数据只支持Hive Metastore表,命令`ANALYZE TABLE COMPUTE STATISTICS noscan`已经在这个表中运行。 |
| spark.sql.codegen | false | 当为true时,特定查询中的表达式求值的代码将会在运行时动态生成。对于一些拥有复杂表达式的查询,此选项可导致显著速度提升。然而,对于简单的查询,这个选项会减慢查询的执行 |
| spark.sql.shuffle.partitions | 200 | 配置join或者聚合操作shuffle数据时分区的数量 |
';
Hive表
最后更新于:2022-04-01 22:19:37
# Hive表
Spark SQL也支持从Apache Hive中读出和写入数据。然而,Hive有大量的依赖,所以它不包含在Spark集合中。可以通过`-Phive`和`-Phive-thriftserver`参数构建Spark,使其支持Hive。注意这个重新构建的jar包必须存在于所有的worker节点中,因为它们需要通过Hive的序列化和反序列化库访问存储在Hive中的数据。
当和Hive一起工作是,开发者需要提供HiveContext。HiveContext从SQLContext继承而来,它增加了在MetaStore中发现表以及利用HiveSql写查询的功能。没有Hive部署的用户也可以创建HiveContext。当没有通过`hive-site.xml`配置,上下文将会在当前目录自动地创建`metastore_db`和`warehouse`。
~~~
// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
sqlContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
// Queries are expressed in HiveQL
sqlContext.sql("FROM src SELECT key, value").collect().foreach(println)
~~~
';
JSON数据集
最后更新于:2022-04-01 22:19:35
# JSON数据集
Spark SQL能够自动推断JSON数据集的模式,加载它为一个SchemaRDD。这种转换可以通过下面两种方法来实现
- jsonFile :从一个包含JSON文件的目录中加载。文件中的每一行是一个JSON对象
- jsonRDD :从存在的RDD加载数据,这些RDD的每个元素是一个包含JSON对象的字符串
注意,作为jsonFile的文件不是一个典型的JSON文件,每行必须是独立的并且包含一个有效的JSON对象。结果是,一个多行的JSON文件经常会失败
~~~
// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// A JSON dataset is pointed to by path.
// The path can be either a single text file or a directory storing text files.
val path = "examples/src/main/resources/people.json"
// Create a SchemaRDD from the file(s) pointed to by path
val people = sqlContext.jsonFile(path)
// The inferred schema can be visualized using the printSchema() method.
people.printSchema()
// root
// |-- age: integer (nullable = true)
// |-- name: string (nullable = true)
// Register this SchemaRDD as a table.
people.registerTempTable("people")
// SQL statements can be run by using the sql methods provided by sqlContext.
val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
// Alternatively, a SchemaRDD can be created for a JSON dataset represented by
// an RDD[String] storing one JSON object per string.
val anotherPeopleRDD = sc.parallelize(
"""{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
val anotherPeople = sqlContext.jsonRDD(anotherPeopleRDD)
~~~
';
parquet文件
最后更新于:2022-04-01 22:19:32
# Parquet文件
Parquet是一种柱状(columnar)格式,可以被许多其它的数据处理系统支持。Spark SQL提供支持读和写Parquet文件的功能,这些文件可以自动地保留原始数据的模式。
### 加载数据
~~~
// sqlContext from the previous example is used in this example.
// createSchemaRDD is used to implicitly convert an RDD to a SchemaRDD.
import sqlContext.createSchemaRDD
val people: RDD[Person] = ... // An RDD of case class objects, from the previous example.
// The RDD is implicitly converted to a SchemaRDD by createSchemaRDD, allowing it to be stored using Parquet.
people.saveAsParquetFile("people.parquet")
// Read in the parquet file created above. Parquet files are self-describing so the schema is preserved.
// The result of loading a Parquet file is also a SchemaRDD.
val parquetFile = sqlContext.parquetFile("people.parquet")
//Parquet files can also be registered as tables and then used in SQL statements.
parquetFile.registerTempTable("parquetFile")
val teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
~~~
### 配置
可以在SQLContext上使用setConf方法配置Parquet或者在用SQL时运行`SET key=value`命令来配置Parquet。
| Property Name | Default | Meaning |
|-----|-----|-----|
| spark.sql.parquet.binaryAsString | false | 一些其它的Parquet-producing系统,特别是Impala和其它版本的Spark SQL,当写出Parquet模式的时候,二进制数据和字符串之间无法区分。这个标记告诉Spark SQL将二进制数据解释为字符串来提供这些系统的兼容性。 |
| spark.sql.parquet.cacheMetadata | true | 打开parquet元数据的缓存,可以提高静态数据的查询速度 |
| spark.sql.parquet.compression.codec | gzip | 设置写parquet文件时的压缩算法,可以接受的值包括:uncompressed, snappy, gzip, lzo |
| spark.sql.parquet.filterPushdown | false | 打开Parquet过滤器的pushdown优化。因为已知的Paruet错误,这个特征默认是关闭的。如果你的表不包含任何空的字符串或者二进制列,打开这个特征仍是安全的 |
| spark.sql.hive.convertMetastoreParquet | true | 当设置为false时,Spark SQL将使用Hive SerDe代替内置的支持 |
';