Spark 入门实例指南

因为课程的机会接触了一下 Spark,许多概念以及对应的思维方式需要改变,这里结合网上的资料以及一些自己测试的例子来进行讲解。


更新记录

  • 2016.04.11: 初稿

Spark 是基于内存的分布式计算框架,因为无需利用 HDFS 作为中间结果保存的介质,性能杠杠的。Spark 是由 Scala 实现的,所以最好学习一下 Scala(当然用 Python 和 Java 也是可以的)。

虽然网上的资料很多,但是我看了一圈,都比较冗长,这里用问答的形式,尽量在 10 分钟之内让大家对 Spark 有基本的认知。

为啥要用 Spark?

  1. 快!基于内存
  2. 易用!Scala, Java, Python 都支持,还有交互式的 Python 和 Scala 的 shell,可以快速进行原型开发
  3. 通用!批处理、交互查询、流处理、机器学习、图计算,样样精通
  4. 兼容!可以使用各种现有的技术作为底层,也可以自己独立运行

Spark 的架构是怎么样的?

  • Driver 就是用户编写的程序,需要创建 SparkContext
  • SparkContext 是用户和 Spark 集群交互的接口,它和 Cluster Manager 交互
  • Cluster Manager 负责资源管理调度
  • Worker Node 是具体执行计算的节点
  • Executer 是在 Worker Node 为某个应用启动的一个进程(相互独立)
  • Task 是被送到 Executor 上的计算任务

Spark 应用是如何工作的?

  1. 用户创建 SparkContext 之后会连接到 Cluster Manager,Cluster Manager 分配计算资源并启动对应的 Executer
  2. Driver 会将用户程序划分为不同的执行阶段,每个执行阶段由一组完全相同的 Task 组成,这些 Task 分别作用于待处理数据的不同分区。在阶段划分完成和 Task 创建后,Driver 会向 Executor 发送 Task
  3. Executor 在接收到 Task 后,会下载 Task 的运行时依赖,在准备好 Task 的执行环境后,会开始执行 Task,并且将 Task 的运行状态汇报给 Driver
  4. Driver 会根据收到的 Task 的运行状态来处理不同的状态更新。Task 分为两种:一种是 Shuffle Map Task,它实现数据的重新洗牌,洗牌的结果保存到 Executor 所在节点的文件系统中;另外一种是 Result Task,它负责生成结果数据
  5. Driver 会不断地调用 Task,将 Task发送到 Executor 执行,在所有的 Task 都正确执行或者超过执行次数的限制仍然没有执行成功时停止

Spark 生态系统有哪些组件?

  • Spark SQL: 类似 Hive,支持在不同 RDD 上进行类似 SQL 的操作
  • Spark Streaming: 对于流数据进行处理
  • MLlib: 机器学习库
  • GraphX: 图并行框架

RDD 是什么?

在 Spark 框架中,最重要的是一类新的数据抽象,叫做 Resilient Distributed Dataset - RDD。RDD 是分布式存储在集群中的内存对象,按照值的范围或者哈希结果进行划分。与此同时 RDD 会记录关于数据进行的各种操作(每次操作都会生成新的 RDD),这样即使节点挂掉,也能够根据之前的操作日志重新得到损失的 RDD

RDD 支持2种操作:

  1. 转换(transformation):从现有的数据集创建一个新的数据集
  2. 动作(actions):在数据集上运行计算后,返回一个值给驱动程序

进阶概念

Lazy Evaluation

RDD 对象分布于很多个部分,我们无法对其进行列表的标准操作,而且 RDD 本身就是为了处理分布式数据开发的。RDD 抽象方式的优势是可以让 Spark 在本地计算机运行。在本地运行时,Spark 把本地计算机的内存划分为很多部分,以模拟在许多机器上进行计算的情境,所以在本地运行时也无需改动代码。

scala> val textFile = sc.textFile("README.md")
scala> textFile.count()

Spark 把一个计算拖延到不得不运行的时候。在上面的代码中,直到运行 textFile.count(),Spark 才把 README 文件载入 RDD。当 val textFile = sc.textFile("README.md") 被调用时,创建了一个指向此文件的指针。但只有当 textFile.count() 需要此文件时,文件才真正被读取进 textFile

sbt 配置

类似于 maven,我们需要一个描述文件,命名为 simple.sbt

name := "Follower"
version := "1.0"
scalaVersion := "2.10.4"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.5.2"

具体打包时需要按照规矩放置文件,如下

$ find .
.
./simple.sbt
./src
./src/main
./src/main/scala
./src/main/scala/SimpleApp.scala
# 打包
$ sbt package
# 使用
$ YOUR_SPARK_HOME/bin/spark-submit \
--class "Follower" \
--master local[4] \
target/scala-2.10/follower_2.10-1.0.jar

性能调优

  1. 尽量使用对象数组和原生类型
  2. 尽量避免带有很多小对象和指针的嵌套结构
  3. 使用数值型 ID 或者枚举类来代替 string 作为主键

垃圾回收器调优

当java需要腾出空间给新的对象时,需要跟踪所有的java对象来找出没用的对象,由于垃圾回收器的代价是与java对象的数量成比例的,所以使用较少对象的数据结构(如使用ints数组代替LinkedList)会减少相应的代价。一个更好的方法是将对象序列化,这时每个RDD分片就只有一个对象了(字节数组)。

衡量 GC 影响

GC 调优的第一步是收集 GC 发生的频率和每次 GC 消耗的时间等统计信息,这个工作可以通过添加 -verbose:gc -XX:+PrintGCDetails -XX:PrintGCTimeStamps 属性到 SPARK_JAVA_OPTS 环境变量中。这样当 spark 任务运行时,worker 节点的日志就会显示 GC 每次发生的信息。

GC 当中一个重要的配置参数是缓存 RDD 需要多少内存,默认,spark 使用配置的 spark.executor.memory 值的 60% 的内存来缓存 RDD,意味着 40% 的内存可用于任务执行时对象的创建。

当发现任务执行速度减慢并且发现 GC 频繁工作或者内存溢出,需要降低该百分比以减少内存消耗,比如需要降低为 50%,则使用 conf.set("spark.storage.memoryFraction","0.5")。结合序列化缓存,使用更小的缓存大小对减轻 GC 问题是很有作用的。

  • java 堆空间是被分成年轻代和老年代两个区域,年轻代存放生命周期较短的对象而老年代存放生命周期长的对象
  • 年轻代又被分成 Eden,Survivor1,Survivor2 三个区域

  • cache 的机制
  • persist 的机制
  • 参数的意义

迭代时间变长是由于RDD的lazy computation的特性,以及RDD为了错误恢复,在迭代的过程中都保留着很长且复杂的依赖关系(lineage dependency),不过为什么lineage越长会造成迭代时间越长,揣测是因为迭代的时候会占用大量内存,这也是为什么迭代次数太多会发生内存泄露。解决这个问题,主要就是要懂得合理的cache以及unpersist, 仅仅这些还不够,还需要在适当的时候,truncate lineage设置checkpoint。

环境搭建

目前最新的 Spark 版本是 1.6.1,我们可以在官方网站下载 spark-1.6.1.tgz 并解压(如果想省事儿可以直接用编译好的),然后需要安装一些依赖,在 ubuntu 下执行以下命令即可

# 安装 Scala
sudo apt-get install scala
# 配置环境变量,在 ~/.bashrc 中添加下面语句,注意更改文件夹
export PATH=$PATH:/home/parallels/spark-1.6.1/bin
# 应用改动
source ~/.bashrc
# 进入 spark 文件夹
cd spark-1.6.1
# 启动 spark shell
spark-shell

简单测试

这里尝试一下官方教程中的一些命令

// 读取 README 文件
scala> val textFile = sc.textFile("README.md")
textFile: org.apache.spark.rdd.RDD[String] = README.md MapPartitionsRDD[1] at textFile at <console>:27
// RDD 中的文件数量
scala> textFile.count()
res0: Long = 95
// RDD 中的第一个内容
scala> textFile.first()
res1: String = # Apache Spark
// 有多少行包括 Spark
scala> textFile.filter(line => line.contains("Spark")).count()
res2: Long = 17

中级测试

这里我们选用 twitter 的数据,样例如下,数据的意思是左边的人(id),关注了右边的人(id):

10000018 19941587 10000018 18348492 10000018 18007448 10000018 14539055 10000018 13942840 10000018 13603904 10000018 12797198 10000018 10000029 10000018 10000017 10000018 10000017

需要注意的是,这里的数据进行了修改,理论上来说应该不会有两条一样的记录,我们启动 spark-shell 完成下面的任务:

  1. 统计一共有多少个不同的 id
  2. 统计一共又多少个不同的 (id, id) 对
// 载入测试数据,这里只有十行,如上面所示
scala> val textFile = sc.textFile("tg10.txt")
// 统计不同的 id 数目
// 先只用 Mapper
scala> val userCount = textFile.flatMap(line => line.split("\t")).map(id => (id, 1))
scala> userCount.count()
// 只用 Mapper,会生成 20 项 res0: Long = 20
// 配合上 Reducer 就可以知道有多少不同的的 id
// 顺带可以统计每个 id 出现了多少次(在左在右都算)
scala> val userCount = textFile.flatMap(line => line.split("\t")).map(id => (id, 1)).reduceByKey(_ + _)
scala> userCount.count()
// 有了 Reducer,就可以知道是 10
res0: Long = 10
// 都打印出来看看,发现和数据相符
scala> userCount.collect() res8: Array[(String, Int)] = Array((10000029,1), (13603904,1), (14539055,1), (19941587,1), (12797198,1), (18007448,1), (10000018,10), (10000017,2), (18348492,1), (13942840,1))
// 统计有多少不同的 (id, id) 组,直接用每行做 key 即可
scala> val edgeCount = textFile.map(id => (id, 1)).reduceByKey(_ + _)
scala> edgeCount.count()
// 结果应当为 9,说明测试正确
res11: Long = 9
// 打印出来看看,无误
scala> edgeCount.collect() res12: Array[(String, Int)] = Array((10000018 13942840,1), (10000018 10000017,2), (10000018 18348492,1), (10000018 19941587,1), (10000018 13603904,1), (10000018 12797198,1), (10000018 14539055,1), (10000018 18007448,1), (10000018 10000029,1))

进阶测试

换一些数据数据,我们需要统计每个人的粉丝数量,也就是需要对第二列进行统计,看看每个 id 各出现了多少次。

具体的数据为,这里修改了一点,理论上来说,id 为 19941587 的用户有俩粉丝:

10000018 19941587 10000017 19941587 10000018 18007448 10000018 14539055 10000018 13942840 10000018 13603904 10000018 12797198 10000018 10000029 10000018 10000017 10000018 5658436 10000018 4869533 10000018 3588965 10000018 40813667 10000018 40184678 10000018 40042307 10000018 39314362 10000018 38198153 10000018 37867561 10000018 36243138 10000018 35679228

代码为

// 载入测试数据,这里有 20 行,如上面所示
scala> val textFile = sc.textFile("tg20.txt")
// 选取第二列,用类似前面的例子来进行统计
scala> val fansCount = textFile.map(line => line.split("\t")(1)).map(id => (id, 1)).reduceByKey(_ + _)
// 看看结果
scala> fansCount.collect()
// 可以看到已经统计完成
res3: Array[(String, Int)] = Array((10000029,1), (13603904,1), (38198153,1), (14539055,1), (40042307,1), (19941587,2), (36243138,1), (12797198,1), (18007448,1), (40184678,1), (35679228,1), (4869533,1), (3588965,1), (39314362,1), (40813667,1), (10000017,1), (13942840,1), (5658436,1), (37867561,1))
// 也可以只看几个
scala> fansCount.take(5) res4: Array[(String, Int)] = Array((10000029,1), (13603904,1), (38198153,1), (14539055,1), (40042307,1))
// 引入头文件
import java.io.PrintWriter
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path
val fs = FileSystem.get(new Configuration())
val writer = new PrintWriter(fs.create(new Path("result")))
// for 循环输出
writer.close
scala> for((k,v)<-fansCount) println(k + "\t" + v)

参考资料

捧个钱场?