Spark学习笔记总结

Spring Wu 281 2021-02-03

Spark与MapReduce

Spark 运算比 Hadoop 的 MapReduce 框架快的原因是因为 Hadoop 在一次 MapReduce 运算之后,会将数据的运算结果从内存写入到磁盘中,第二次 Mapredue 运算时在从磁盘中读取数据,所以其瓶颈在2次运算间的多余 IO 消耗. Spark 则是将数据一直缓存在内存中,直到计算得到最后的结果,再将结果写入到磁盘,所以多次运算的情况下, Spark 是比较快的. 其优化了迭代式工作负载。

Spark的Master节点与Worker节点

  • Master

在集群中master节点上运行着master进程以及Driver,master进程将用户提交的job转为可并行执行的任务集Tasks。并将这些Tasks分配给集群上的Worker,Driver进程负责执行用户编写的application。

  • Worker

在集群中,除了master节点负责管理集群的调度以外,还有worker,worker中存在executer进程,每个worker可以存在一个或多个executer进程,每个executer都拥有一个线程池,线程池中的每个线程都负责一个Task的执行。
每个executer可并行执行的Task数量是根据cpu core来的。executer拥有多少cpu core就能同时并行执行多少Task。

  • Master挂了怎么办?

如果master挂了想恢复集群的正常运行,需要依靠zookeeper,zookeeper记录了集群中所有的worker、driver、application。当master挂掉后,zookeeper根据本身的选举算法,在集群中选出一个worker作为新的master。在恢复master的这段时间内,用户无法提交新的job。

  • 某个worker挂了怎么办?

在集群中如果某个正在执行task任务的worker挂掉了,master会重新把该worker负责的task分配给其他worker。在worker挂掉的这段时间内,如果worker长时间(默认是60s)没有上报master心跳。master则会将该worker从集群中移除,并标识DEAD

Spark RDD

RDD(Resilient Distributed Dataset): 一个可并行操作的有容错机制的数据集合,有 2 种方式创建 RDDs:第一种是在你的驱动程序中并行化一个已经存在的集合;另外一种是引用一个外部存储系统的数据集,例如共享的文件系统,HDFS,HBase或其他 Hadoop 数据格式的数据源。

以上描述在实操中会更好理解。在未理解它之前把它看做是一种数据集合即可。

RDD 特点

  • 它是在集群节点上的不可变的、已分区的集合对象;
  • 通过并行转换的方式来创建(如 Map、 filter、join 等);
  • 失败自动重建;
  • 可以控制存储级别(内存、磁盘等)来进行重用;
  • 必须是可序列化的;
  • 是静态类型的(只读)。

RDD的操作函数主要分为两种Transformation和Action

Transformation: 返回值为RDD,不会马上提交任务去集群执行

Action: 返回值不是RDD,形成DAG图后,将任务提交到集群执行,并返回结果。

使用RDD操作数据

统计某个文件的总字数:

// 设置appname和master,这里设置了local[2]的意思是在本地以2个CPU核心执行这个任务。或者使用:"spark://master_hostname:7077"指定任务到远程的机器执行。
SparkConf sparkConf = new SparkConf().setAppName("firstDemo").setMaster("local[2]");
// 创建SoarkContext对象
JavaSparkContext sc = new JavaSparkContext(sparkConf);

// 读取文件
final JavaRDD<String> distFile = sc.textFile("/usr/test/test.txt");
// 统计字数
Integer wordCount = distFile.map(String::length).reduce((a, b) -> a + b)

创建RDD的两种方式:

第一种,使用RDD并行化一个已有的集合:

List<Integer> integers = Arrays.asList(1, 1, 2, 2, 3, 4, 5);
// 把数据并行化为RDD
JavaRDD<Integer> parallelize = sc.parallelize(integers);
List<Integer> collect = parallelize.distinct().collect();

第二种,使用外部数据创建RDD。比如本地磁盘、HDFS、HBase等

// Spark可以根据本地文件系统、HDFS、Hbase等作为数据源,然后进行操作。
// 指定不同的数据源,只需要有对应的uri即可比如hdfs的:"hdfs://"
JavaRDD<String> distFile = sc.textFile("/usr/test/test.txt");
Integer wordCount = distFile.map(String::length).reduce((a, b) -> a + b);

Spark的懒加载

spark的懒加载与scala的lazy val是有关系的。scala Lazy vals介绍

我们把一个文件读取出来:

JavaRDD<String> lazyLoad = sc.textFile("/usr/test/test.txt");

按照一般程序的执行流程,执行这种操作spark会立马把数据从磁盘中读取出来放到内存中。但是事实却不是这样的,这里的操作,只是把地址映射了起来,并没有去把它加载到内存中去。

对数据进行map操作,转换为另外一个RDD:

JavaRDD<Integer> lineLengths = lazyLoad.map(String::length)

这里的操作也没有真正去执行,只是定义了把JavaRDD<String>转换为JavaRDD<Integer>的操作。

最后统计出test.txt的字数:

Integer count = lineLengths.reduce((a, b) -> a + b)

该操作是真正的计算,当spark的操作到这步时,才会真正将计算分解在集群中的机器上运行。

将RDD操作保存到内存中,供下次使用:

lineLengths.persist(StorageLevel.MEMORY_ONLY())

Spark 内存分配及缓存机制

spark支持将某次的RDD操作保存到内存中,以便之后其他操作复用该RDD的数据。这样使得之后的操作更快,因为复用的数据不需要重新计算,直接从缓存中取即可。如果在内存分区中,缓存的RDD数据丢失,spark会执行RDD重新计算,并放到缓存中。

当我们在代码中执行了cache/persist等持久化操作时,spark会根据我们设置的缓存级别不同,每个task计算出来的数据会保存到task所在节点的内存或磁盘中。

主要分为三块:

  1. task在执行我们写的代码时占用到的内存,默认占总内存的20%
  2. Task通过shuffle过程拉取了上一个stage的Task的输出后,进行聚合等操作时使用的内存,默认也是占总内存的20%
  3. RDD持久化使用到的内存总共占60%
  • spark RDD一共有以下几种缓存级别
Storage Leveldescription
MEMORY_ONLY默认级别,将RDD操作作为序列化的java对象存储在jvm中,如果内存放不下全部的RDD操作。那么无法缓存的RDD操作在下次需要时再重新计算,而已经缓存的部分就直接使用。该级别只使用内存,不使用磁盘。效率非常高。
MEMORY_AND_DISK将RDD操作作为序列化的java对象存储在jvm中,如果内存放不下全部的RDD操作。那么无法缓存的RDD操作会持久化到磁盘上,并在需要时从磁盘中取出来。该级别需要使用的内存和磁盘。效率中等
MEMORY_ONLY_SER将RDD存储为序列化Java对象(每个分区一个字节的数组),与反序列化对象相比,它更节省空间,特别是当它使用快速序列化器时。但它增加了CPU的开销。在此级别中,存储空间较小,CPU计算时间较长,数据存储在内存中。它不使用磁盘。
MEMORY_AND_DISK_SERMEMORY_ONLY_SER,但它会将不适合内存的分区丢弃到磁盘,而不是每次需要时重新计算。在此存储级别中,用于存储的空间较低,CPU计算时间较长,它使用内存和磁盘存储。
DISK_ONLY在此存储级别中,RDD仅存储在磁盘上。用于存储的空间很小,CPU计算时间很长,并且它利用磁盘存储。