2025-03-30 02:58:50
在《Shuffle实现框架》一节中对Shuffle框架的组成进行了介绍,本节介绍基于排序的Shuffle框架(SortShuffleManager类)的组成和特性。
在spark2中,ShuffleManager接口的实现只有一种:SortShuffleManager,即基于排序的Shuffle(Sort-based shuffle)。也可以自己实现ShuffleManager接口,然后通过配置参数spark.shuffle.manager来指定使用这个Shuffle Manager。
在基于排序的Shuffle中,输入的数据会根据其目标分区ID进行排序,然后写入单个map输出文件。 Reducers获取此文件的连续区域,以便读取它们map的输出部分。若map输出数据太大而无法放入内存,则会将输出的已排序数据的一部分写到磁盘,并合并那些磁盘上的文件以生成最终输出文件。
基于排序的Shuffle有两种不同的方式来产生map的输出文件:在同时满足以下三个条件时使用:
(1)Shuffle依赖指定没有聚合或输出排序操作
(2)Shuffle序列化程序支持重新定位序列化值(目前这是由KryoSerializer和Spark SQL的自定义序列化程序支持)。
(3)Shuffle产生的输出分区少于16777216。
在序列化排序模式下,输入记录会在它被传递给shuffler writer时被序列化,并在排序期间以序列化形式进行缓冲。 该写入模式实现了几个优化:它的排序操作是序列化的二进制数据而不是Java对象,这样可以减少内存消耗和GC开销。 此优化要求记录序列化程序具有某些属性,以允许重新排序序列化记录,而无需反序列化。 有关详细信息,请参见SPARK-4550,其中首次提出并实施了此优化。
它使用专门的缓存高效排序器([[ShuffleExternalSorter]])来排序压缩记录指针和分区ID的数组。 通过在排序数组中每个记录仅使用8个字节的空间,这样就可以把更多的数组放到内存中。
(3)溢出合并过程对属于同一分区的序列化记录块进行操作,并且在合并期间不需要对记录进行反序列化。
(4)当溢出压缩编解码器支持压缩数据的连接时,溢出合并只是简单地连接序列化和压缩的溢出分区以生成最终输出分区。 这允许使用高效的数据复制方法,如NIO的transferTo,并避免在合并期间分配解压缩或复制缓冲区的需要。
2. 反序列化排序模式
用于处理所有其他情况。
Shuffle Manager负责管理Shuffle相关的组件,通过它可以很方便的获取Shuffle的各个成员对象和元数据。根据实现方式的不同,有多种类型的Shuffle Manager,在早期的Spark版本中实现过hash shuffle manager,后来全部统一成了sort shuffle manager。
总的来说Shuffle Manager主要有以下功能:1)获取一个ShuffleHandle对象(它包含Shuffle的依赖信息),该对象会被传给Task。2)获取一个ShuffleWriter对象。3)获取一个ShuffleReader对象。4)获取一个ShuffleBlockResolver对象。5)清除以上已经注册的信息,删除shuffle过程中产生的中间数据。
从实现层面来说,Spark定义了一个可插拔的接口:ShuffleManager,这样就可以根据需要来实现不同的Shuffle Manager。前面提到过,有多种类型的Shuffle Manager,也就是说有多个类实现了该接口,但为了更好的性能,以前的实现都不使用了,目前的实现类只有:SortShuffleManager。
Shuffle Manager的功能是通过实现ShuffleManager接口函数来实现的,这些接口函数的说明如下表:
初始化:Shuffle Manager的对象在SparkEnv中进行创建。可以通过参数:spark.shuffle.manager来设置启用的Shuffle Manager的名称。该参数的默认值是:sort,也可以设置:tungsten-sort,但对应的实现类都是:org.apache.spark.shuffle.sort.SortShuffleManager。
Shuffle Manager对象的创建在SparkEnv#create函数中,代码如下:Shuffle Manager对象创建完成后,可以通过SparkEnv来获取获取,该对象的引用保存在SparkEnv的shuffleManager变量中,通过shuffleManager可以获取到ShuffleWriter和ShuffleReader对象:
Shuffle Handle的成员:该类主要用来承载Shuffle Manager的相关信息,ShuffleManager可以通过它来把这些信息传递给Task。该类包含以下成员:Shuffle Handle的类型:根据参数:ShuffleDependency和conf(Spark的配置信息)的不同,把Shuffle Handle分成三类,Spark会根据不同类型的Shuffle Handle来选择不同的Shuffle Writer实现类。
Shuffle Writer:Shuffle Writer负责将Map任务的输出,写出到Shuflle系统中。Spark定义了一个ShuffleWriter的抽象类,所有的Shuffle Writer的实现都必须继承该抽象类,并实现write函数。该抽象类的代码如下:目前Spark中有三个类实现了该抽象类,对应三种不同类型的Shuffle Writer,它们是:
BypassMergeSortShuffleWriter简介:这种Shuffle Writer将传入的数据写入单独的文件,每个Reduce分区生成一个文件,再把这些文件进行合并,形成单个输出文件。选择这种Shuffle Writer必须同时满足以下三个条件:1)Map端不需要进行聚合操作 2)没有排序操作 3)分区数小于参数:spark.shuffle.sort.bypassMergeThreshold的值,该参数的默认是:200。
UnsafeShuffleWriter简介:使用这种shuffle writer的条件是:1)序列化工具类支持对象的重定位 2)不需要在map端进行聚合操作 3)分区数小于:PackedRecordPointer.MAXIMUM_PARTITION_ID + 1。
SortShuffleWriter简介:若以上两种shuffle writer都不能选择,则使用该shuffle writer类。这也是相对比较常用的一种shuffle writer。这种Shuffle Writer在写出数据时会使用外部排序对数据进行排序。
Shuffle Reader:Shuffle Reader负责在Reduce任务中读取来自多个map任务的合并数据。从实现层面来讲,Spark定义了一个接口:ShuffleReader,所有Shuffle Reader都必须实现该接口。在该接口中只定义了一个read函数,代码如下:目前,该接口的实现类只有一个:BlockStoreShuffleReader。该类的详细实现会在分析ShuffleReader的实现时详细分析。
BlockStoreShuffleReader简介:该类实现了ShuffleReader接口,其实就只是实现了一个read函数,该函数返回一个迭代器。该类的主要功能是:在一个Shuffle过程中,向其他节点的块存储系统(block store)请求,并获取范围在[startPartition,endPartition)的分区数据。
ShuffleBlockResolver:该接口的实现类通过逻辑的shuffle块标识(比如:map,reduce,shuffle的id),来获取shuffle的数据块。实现类可以通过文件或文件段来封装shuffle数据。这样,当获取shuffle数据时,BlockStore就可以使用它来抽象不同的shufle实现。该接口的实现类为:IndexShuffleBlockResolver。
小结:本文讲述了spark2的shuffle框架的实现概要分析。主要分析了spark2中的shuffle框架的实现接口和相关实现类的大概实现逻辑。接下来的文章会对shuffle框架的三个部分的详细实现进行分析。