Spark 面试题
1、Apache Spark有哪些常见的稳定版本,Spark1.6.0的数字分别代表什么意思?
常见的大的稳定版本有Spark 1.3,Spark1.6, Spark 2.0 ,Spark1.6.0的数字含义
1)第一个数字:1
major version : 代表大版本更新,一般都会有一些 api 的变化,以及大的优化或是一些结构的改变;
2)第二个数字:6
minor version : 代表小版本更新,一般会新加 api,或者是对当前的 api 就行优化,或者是其他内容的更新,比如说 WEB UI 的更新等等;
3)第三个数字:0
patch version , 代表修复当前小版本存在的一些 bug,基本不会有任何 api 的改变和功能更新;记得有一个大神曾经说过,如果要切换 spark 版本的话,最好选 patch version 非 0 的版本,因为一般类似于 1.2.0, … 1.6.0 这样的版本是属于大更新的,有可能会有一些隐藏的 bug 或是不稳定性存在,所以最好选择 1.2.1, … 1.6.1 这样的版本。更新的,有可能会有一些隐藏的 bug 或是不稳定性存在,所以最好选择 1.2.1, … 1.6.1 这样的版本。
spark在1.6之前使用的是Akka进行通信,1.6及以后是基于netty。
现阶段的Flink是基于Akka+Netty
2、Spark技术栈有哪些组件,每个组件都有什么功能,适合什么应用场景?
Spark 技术栈包含以 下几个核心组件:
- Spark Core:Spark 的基础组件,提供了任务调度、内存管理和错误恢复等功能。它还定义了 RDD(Resilient Distributed Datasets)数据结构,用于在集群上进行分布式计算。
- Spark SQL:用于处理结构化数据的组件,支持使用 SQL 查询数据。它提供了 DataFrame 和 Dataset 两个 API,可以方便地进行数据处理和分析。适合处理大规模的结构化数据。
- Spark Streaming:用于实时数据处理的组件,可以将实时数据流划分为小批次进行处理。它支持各种数据源,如 Kafka、Flume 和 HDFS,并提供了窗口操作和状态管理等功能。适合实时数据分析和流式处理。
- Spark MLlib:用于机器学习的组件,提供了常见的机器学习算法和工具。它支持分类、回归、聚类和推荐等任务,并提供了特征提取、模型评估和模型调优等功能。适合大规模的机器学习任务。
- Spark GraphX:用于图计算的组件,提供了图结构的抽象和常见的图算法。它支持图的构建、遍历和计算,并提供了图分析和图挖掘等功能。适合社交网络分析和图计算任务。
- SparkR:用于在 R 语言中使用 Spark 的组件,提供了在 R 中进行分布式计算和数据处理的能力。它支持使用 R 语法进行数据操作,并提供了与 Spark SQL 和 MLlib 的集成。适合 R 语言用户进行大规模数据处理和分析。
这些组件可以根据具体的应用场景进行组合和使用,例如:
- 如果需要处理大规模的结构化数据,并进行复杂的数据分析和查询操作,可以使用 Spark SQL。
- 如果需要进行实时数据处理和流式计算,可以使用 Spark Streaming。
- 如果需要进行大规模的机器学习任务,可以使用 Spark MLlib。
- 如果需要进行图计算和图分析,可以使用 Spark GraphX。
- 如果需要在 R 语言中进行分布式计算和数据处理,可以使用 SparkR。
总的来说,Spark 技术栈提供了一套强大的工具和组件,可以满足不同场景下的大规模数据处理和分析需求。
3、Spark为什么比mapreduce快?为什么快呢? 快在哪里呢?
Spark计算比MapReduce快的根本原因在于DAG计算模型。一般而言,DAG相比Hadoop的MapReduce在大多数情况下可以减少shuffle次数
具体来说:
- Spark的DAGScheduler相当于一个改进版的MapReduce,如果计算不涉及与其他节点进行数据交换,Spark可以在内存中一次性完成这些操作,也就是中间结果无须落盘,减少了磁盘IO的操作。但是,如果计算过程中涉及数据交换,Spark也是会把shuffle的数据写磁盘的!!!
- 有同学提到,Spark是基于内存的计算,所以快,这也不是主要原因,要对数据做计算,必然得加载到内存,Hadoop也是如此,只不过Spark支持将需要反复用到的数据给Cache到内存中,减少数据加载耗时,所以Spark跑机器学习算法比较在行(需要对数据进行反复迭代)。Spark基于磁盘的计算依然也是比Hadoop快。
- 刚刚提到了Spark的DAGScheduler是个改进版的MapReduce,所以Spark天生适合做批处理的任务。而不是某些同学说的:Hadoop更适合做批处理,Spark更适合做需要反复迭代的计算。
- Hadoop的MapReduce相比Spark真是没啥优 势了。但是Hadoop的HDFS还是业界的大数据存储标准。
4、聊聊:Spark和MR的区别?
Spark vs MapReduce ≠ 内存 vs 磁盘
具体解答:
其实Spark和MapReduce的计算都发生在内存中,区别在于:
- MapReduce通常需要将计算的中间结果写入磁盘,然后还要读取磁盘,从而导致了频繁的磁盘IO。
- Spark则不需要将计算的中间结果写入磁盘,这得益于Spark的RDD(弹性分布式数据集,很强大)和DAG(有向无环图),其中DAG记录了job的stage以及在job执行过程中父RDD和子RDD之间的依赖关系。中间结果能够以RDD的形式存放在内存中,且能够从DAG中恢复,大大减少了磁盘IO。
5、Mapreduce和Spark的都是并行计算,那么他们有什么相同和区别
MapReduce和Spark都是用于并行计算的框架。
相同点:
- 并行计算:两者都支持将大规模的数据集划分为多个小任务,并在分布式环境中并行执行这些任务。
- 可扩展性:它们都可以在大规模集群上运行,通过添加更多的计算节点来扩展计算能力。
- 容错性:它们都具备故障恢复机制,能够处理计算节点的故障,并保证计算的正确性。
区别:
- 内存使用:MapReduce将中间数据写入磁盘,而Spark将中间数据存储在内存中,这使得Spark在某些情况下比MapReduce更快,尤其是对于迭代计算和交互式查询等需要多次读写数据的场景。
- 数据处理模型:MapReduce采用了经典的"map"和"reduce"操作模型,而Spark引入了更多的数据处理操作,如过滤、排序、连接等,使得编写数据处理逻辑更加灵活。
- 实时计算支持:Spark提供了实时流处理功能,可以对数据进行实时处理和分析,而MapReduce主要用于离线批处理。
- 编程接口:MapReduce使用Java编程接口,而Spark支持多种编程语言接口,包括Java、Scala、Python和R,使得开发者可以使用自己熟悉的语言进行开发。
总体而言,Spark相对于MapReduce来说更加灵活和高效,尤其适用于需要实时计算和复杂数据处理的场景。但对于一些传统的离线批处理任务,MapReduce仍然是一个可靠的选择。
6、Spark SQL为什么比hive快呢?
Spark SQL 相对于 Hive 具有以下几个方面的优势,使其在性能上更快:
- 内存计算:Spark SQL 使用内存计算作为其核心计算引擎,而 Hive 则是基于磁盘的计算模型。内存计算可以大大提高计算速度,因为内存的读写速度比磁盘快得多。
- 数据存储格式:Spark SQL 支持多种数据存储格式,如 Parquet、ORC 等,这些格式在压缩和列式存储方面具有优势。而 Hive 默认使用的存储格式是文本格式,对于大规模数据的查询和分析来说效率较低。
- 数据分区和索引:Spark SQL 具有更灵活的数据分区和索引机制,可以根据数据的特点进行自定义的分区和索引策略,从而提高查询性能。而 Hive 的分区和索引功能相对较为简单。
- 执行计划优化:Spark SQL 使用 Catalyst 优化器来生成更高效的执行计划,该优化器可以在查询执行之前对查询进行优化和重写。而 Hive 使用的是基于规则的优化器,优化能力相对较弱。
- 并行计算:Spark SQL 可以通过并行计算来加速查询和分析任务。Spark 的分布式计算框架可以将任务划分为多个并行的任务,在多个节点上同时执行,从而提高计算速度。而 Hive 的并行计算能力相对较弱。
需要注意的是,Spark SQL 适用于大规模数据分析和处理,特别是在需要实时计算和迭代计算的场景下表现出色。而 Hive 更适合于批处理和离线计算,特别是在大规模数据仓库中使用较多。
7、Spark为什么快,Spark SQL一定比Hive快吗
Spark为什么快? 主要答案:
- 消除了冗余的 HDFS 读写: Hadoop 每次 shuffle 操作后,必须写到磁盘,而 Spark 在 shuffle 后不一定落盘,可以 cache 到内存中,以便迭代时使用。如果操作复杂,很多的 shufle 操作,那么 Hadoop 的读写 IO 时间会大大增加,也是 Hive 更慢的主要原因了
- 消除了冗余的 MapReduce 阶段: Hadoop 的 shuffle 操作一定连着完整的 MapReduce 操作,冗余繁琐。而 Spark 基于 RDD 提供了丰富的算子操作,且 reduce 操作产生 shuffle 数据,可以缓存在内存中
- JVM 的优化: Hadoop 每次 MapReduce 操作,启动一个 Task 便会启动一次 JVM,基于进程的操作。而 Spark 每次 MapReduce 操作是基于线程的,只在启动 Executor 是启动一次 JVM,内存的 Task 操作是在线程复用的。每次启动 JVM 的时间可能就需要几秒甚至十几秒,那么当 Task 多了,这个时间 Hadoop 不知道比 Spark 慢了多少
极端反例:
Select month_id, sum(sales) from T group by month_id;
这个查询只有一次 shuffle 操作,此时,也许 Hive HQL 的运行时间也许比 Spark 还快,反正 shuffle 完了都会落一次盘,或者都不落盘
8、var、val、def三个关键字的区别?
- val:常量申明关键字
- var:变量声明关键字,类似于Java中的变量,变量值可更改,但是变量类型不能更改
- def:关键字,用于创建方法
var x = 3 // x是Int类型
x = 4 //
x = "error" // 类型变化,编译器报错'error: type mismatch'
val y = 3
y = 4 //常量值不可更改,报错 'error: reassignment to val'
def fun(name: String) = "Hey! My name is: " + name
fun("Scala") // "Hey! My name is: Scala"
//注意scala中函数式编程一切都是表达式
lazy val x = {
println("computing x")
3
}
val y = {
println("computing y")
10
}
x+x //
y+y // x 没有计算, 打印结果"computing y"
9、伴生类和伴生对象 有何区别?
伴生对象和伴生类:单例对象与类同名时,这个单例对象被称为这个类的伴生对象,而这个类被称之为这个单例对象的伴生类
伴生类和伴生对象要在同一个源文件中定义,伴生对象和伴生类可以互相访问其私有成员,不与伴生类同名的单例对象称之为孤立对象
1. 当在同一个文件中,有 class ScalaPerson 和 object ScalaPerson
2. class ScalaPerson 称为伴生类,将非静态的内容写到该类中
3. object ScalaPerson 称为伴生对象,将静态的内容写入到该对象(类)
10、case class是什么,与case object的区别是什么
case class:样例类,样例类用case关键字申明;样例类是为模式匹配而优化的类;
构造器中的每一个参数都成为val-除非被显式的申明为var;将自动生成toString、equals、hashCode 和 copy 方法11、描述一下你对RDD的理解
- 一组分片(Partition):数据集的基本组成单位,对于RDD来说,每个分片都会被一个计算任务处理,并决定并行计算的粒度。用户可以在创建RDD时指定RDD的分片个数,若没有指定,则会采用默认值,即程序所分配到的CPU Core数
- 一个计算每个分区的函数:Spark中RDD的计算是以分片为单位的,每个RDD都会实现compite函数以达到这个目的。cpmpute函数会对迭代器进行复合,不需要保存每次计算的结果
- RDD之间的依赖关系:RDD的每次转换都会生成一个新的RDD,所以RDD之间就会形成类似于流水线一样的前后依赖关系。在部分分区数据丢失时,Spark可以通过这个依赖关系重新计算丢失的分区数据,而不是对RDD的所有分区进行重新计算
- 一个Partitioner:即RDD的分片函数。当前Apark中实现了两种类型的分片函数,一个是基于哈希的HashPartitioner,另外一个时基于范围的RangePatititon。只有对于是key-value 的 RDD,才会有 Partitioner,非 key-value 的 RDD 的 Parititioner 的值是 None。Partitioner 函数不但决定了 RDD 本身的分片数量,也决定了 parent RDD Shuffle 输出时的分片数量
- 一个列表:存储存取每个 Partition 的优先位置(preferred location)。对于一个 HDFS 文件来说,这个列表保存的就是每个 Partition 所在的块的位置。按照“移动数据不如移动计算”的理念,Spark 在进行任务调度的时候,会尽可能地将计算任务分配到其所要处理数据块的存储位置
11、描述以下算子的区别与联系:
一、groupByKey、reduceByKey、aggreageByKey
reduceByKey:可以在每个分区移动数据之前将输出数据与一个共用的key
结合,适用于大数据集计算
groupByKey:所有的键值对(key-value pair) 都会被移动,在网络上传输这些数据非常没必要,因此避免使用 groupByKey
aggreageByKey:
二、cache、presist
都用于将RDD进行缓存
cache:只有一个默认的缓存级别MEMORY_ONLY
presist:persist有一个 StorageLevel 类型的参数,总共有12种缓存级别
三、repartition、coalesce
都是RDD的分区进行重新划分
repartition:repartition只是coalesce接口中shuffle为true的实现
coalesce:如果shuff为false时,如果传入的参数大于现有的分区 数目,RDD的分区数不变,也就是说不经过shuffle,是无法将RDD的分区数变多的
四、map、flatMap
map:将函数用于RDD中的每个元素,将返回值构成新的RDD
flatMap:将函数应用于RDD中的每个元素,将返回的迭代器的所有内容构成新的RDD
12、简述Spark中的缓存机制与checkpoint机制,说明两者的区别与联系
两者都是做RDD持久化的
一、RDD的缓存机制
RDD通过cache方法或者persist方法可以将前面的计算结果缓存,但并不是立即缓存,而是在接下来调用Action类的算子的时候,该RDD将会被缓存在计算节点的内存中,并供后面使用。它既不是transformation也不是action类的算子。
注意:缓存结束后,不会产生新的RDD
缓存有可能丢失,或者存储存储于内存的数据由于内存不足而被删除,RDD的缓存容错机制保证了即使缓存丢失也能保证计算的正确执行。通过基于RDD的一系列转换,丢失的数据会被重算,由于RDD的各个Partition是相对独立的,因此只需要计算丢失的部分即可,并不需要重算全部Partition。
使用缓存的条件:(或者说什么时候进行缓存)
- 要求的计算速度快,对效率要求高的时候
- 集群的资源要足够大,能容得下要被缓存的数据
- 被缓存的数据会多次的触发Action(多次调用Action类的算子)
- 先进行过滤,然后将缩小范围后的数据缓存到内存中
在使用完数据之后,要释放缓存,否则会一直在内存中占用资源
二、CheckPoint机制(容错机制)
RDD的缓存容错机制能保证数据丢失也能正常的运行,是因为在每一个RDD中,都存有上一个RDD的信息,当RDD丢失以后,可以追溯到元数据,再进行计算
检查点(本质是通过将RDD写入高可用的地方(例如 hdfs)做检查点)是为了通过lineage(血统)做容错的辅助,lineage过长会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果之后有节点出现问题而丢失分区,从做检查点的RDD开始重做Lineage,就会减少开销
设置checkpoint的目录,可以是本地的文件夹、也可以是HDFS。一般是在具有容错能力,高可靠的文件系统上(比如HDFS, S3等)设置一个检查点路径,用于保存检查点数据
在设置检查点之后,该RDD之前的有依赖关系的父RDD都会被销毁,下次调用的时候直接从检查点开始计算。 checkPoint和cache一样,都是通过调用一个Action类的算子才能运行
checkPoint减少运行时间的原因:第一次调用检查点的时候,会产生两个executor,两个 进程分别是从hdfs读文件和计算(调用的Action类的算子),在第二次调用的时候会发现,运行的时间大大减少,是由于第二次调用算子的时候,不会再从hdfs读文件,而读取的是缓存到的数据,同样是从hdfs上读取
13、说说RDD、DataFrame、DataSet三者的区别与联系
一、RDD
RDD叫做弹性分布式数据集,是Spark中最基本的数据处理模型。代码中是一个抽象类,它代表了一个弹性的、不可变的、可分区、里面的元素可进行计算的集合
RDD封装了计算逻辑,并不保存数据;RDD是一个抽象类,需要子类具体实现;RDD封装了计算逻辑,是不可以改变的,想要改变,只能产生新的RDD,在新的RDD里面封装计算逻辑;可分区、并行计算
二、DataFrame
DataFrame是一种以RDD为基础的分布式数据集,类似于传统数据库中的二维表格
三、RDD与DataFrame的区别
两者均为懒执行
- DataFrame
- 带有schema元信息,即 DataFrame 所表示的二维表数据集的每一列都带有名称和类型,便于Spark SQL的操作
- 支持嵌套数据类型(struct、array和Map),从易用性来说,DataFrame提供的是一套高层的关系操作,比函数式的RDD API更加友好
- 因为优化的查询执行计划,导致DataFrame执行效率优于RDD
- RDD
- 无法得到所存数据元素的具体结构,SparkCore只能在Stage层面进行简单、通用的流水线优化
- 操作门槛高
四、DataSet
DataSet是分布式数据集,是DataFrame的一个扩展。提供了RDD的优势(强类型)以及Spark SQL优化执行引擎的优点
DataFrame是DataSet的特例:DataFrame=DataSet[Row]
,
14、介绍Spark核心组件及功能
Spark是一个快速、通用的大数据处理框架,它提供了丰富的核心组件和功能,用于处理和分析大规模数据集。下面是Spark的核心组件及其功能的详细介绍:
- Spark Core:Spark的核心组件,提供了分布式任务调度、内存管理和错误恢复等基本功能。它还定义了RDD(弹性分布式数据集)的概念,RDD是Spark中的基本数据结构,用于表示可并行处理的数据集。
- Spark SQL:用于处理结构化数据的模块,支持SQL查询和DataFrame API。它可以将数据从多种数据源(如Hive、Avro、Parquet等) 加载到Spark中,并提供了强大的查询优化和执行功能。
- Spark Streaming:用于实时数据流处理的模块,支持高吞吐量的数据流处理和复杂事件处理。它可以将实时数据流分成小批次,并在每个批次上应用批处理操作。
- MLlib:Spark的机器学习库,提供了常见的机器学习算法和工具,如分类、回归、聚类、推荐等。它支持分布式训练和推理,并提供了丰富的特征提取、转换和选择功能。
- GraphX:用于图计算的模块,支持图的创建、操作和分析。它提供了一组高级图算法和操作符,可以用于社交网络分析、推荐系统等领域。
- SparkR:用于在R语言中使用Spark的接口。它提供了与Spark Core和Spark SQL的集成,可以在R中使用Spark的分布式计算和数据处理功能。
除了以上核心组件,Spark还提供了一些其他功能和工具,如:
- Spark Streaming可以与Kafka、Flume等数据源集成,实现实时数据流处理。
- Spark可以与Hadoop、Hive、HBase等大数据生态系统中的其他工具和框架无缝集成。
- Spark提供了交互式Shell(spark-shell)和Web界面(Spark UI)等开发和监控工具。
- Spark支持在本地模式、独立模式和集群模式下运行,可以根据需求进行灵活部署和扩展。
总之,Spark的核心组件和功能使其成为处理和分析大规模数据的强大工具,可以满足各种数据处理和机器学习任务的需求。
15、说说Spark Yarn任务提交流程
一、Yarn Client模式
Client模式一般用于测试
- Driver在任务提交的本地机器上运行
- Driver启动后会向ResourceManager通讯申请启动ApplicationMaster
- ResourceManager分配Container,在合适的NodeManager上启动ApplicationMaster,负责向ResourceManager申请Executor内存
- ResourceManager接到ApplicationMaster的资源申请后会分配Container,然后ApplicationMaster在资源分配指定的NodeManager上驱动Executor进程
- Executor进程启动后会向Driver反向注册,Executor全部注册完成之后,Driver开始执行Main函数
- 之后执行到Action算子,触发一个Job,并根据宽依赖开始划分Stage,每个Stage生成对应的TaskSet,之后将Task分发到各个Executor上执行
二、Yarn Cluster模式
- 任务提交后,Client会先和ResourceManager通讯,申请启动ApplicationMaster
- ResourceManager分配Container,在合适的NodeManager上启动ApplicationMaster,此时的ApplicationMaster就是Driver
- Driver启动后会向ResourceManager申请Executor内存,ResourceManager接到ApplicationMaster的资源申请后会分配Container,然后在合适的NodeManager上启动Executor进程
- Executor进程启动后会向Driver反向注册,Executor全部注册完成之后,Driver开始执行Main函数
- 之后执行到Action算子,触发一个Job,并根据宽依赖开始划分Stage,每个Stage生成对应的TaskSet,之后将Task分发到各个Executor上执行
16、什么是迭代计算,迭代与循环的区别是什么?
迭代计算的基本思想是逐次逼近,先取一个粗糙的近似值,然后用同一个递推公式,反复校正此初值,直至达到预定精度要求为止;也可以说是将输出作为输入,再次进行处理。如KMeans对中心点的计算就是迭代
循环:是最基础的概念,凡是重复执行一段代码,都可以称之为循环
迭代:一定是循环,但循环不一定是迭代
17、MapReduce和Spark多是并行计算,区别是什么
两者都是使用 MapReduce 模型来进行并行计算的。
- Hadoop的一个作业称为job,job里面包含Map task和Reduce task(也有Map-Only的Task),每个task都在自己的进程中运行,当task结束时,进程也会结束
- Spark用户提交的任务称为Application,一个Application对应一个SparkContext。Application中可能存在多个job,一个Action操作产生一个job。这些job串行执行,每个job包含一个或多个Stage,Stage是DAGScheduler 通过RDD之间的依赖关系划分而来的。每个Stage里面有多个Task,这些Task被TaskScheduler 分发到 Executor 中执行,Executor的生命周期与Application是一样的,即使没有job运行也是存在的,Task可以快速启动读取数据进行计算
- Hadoop的job只有map和reduce操作,表达能力比较欠缺;而且job之间会重复的读写HDFS,造成大量的IO操作,多个job需要自己管理它们之间的关系
- Spark API提供了丰富的RDD操作,如join、groupBy等,并通过DAG图实现良好的容错
18、Spark算子可分为那两类,这两类算子的区别是什么,分别距离6个这两类算子,举例6个会产生Shuffle的算子
Spark的算子可以分为两类:Transformation、Action
- Transformation:从现有的数据集创建一个新的数据集,返回一个新的 RDD 操作。Transformation都是惰性的,它们并不会立刻执行,只是记住了这些应用到 RDD 上的转换动作
- Action:触发在 RDD 上的计算,这些计算可以是向应用程序返回结果,也可以是向存储系统保存数据
Transformation 最重要的特点:延迟执行、返回 RDD
Action最重要的特点:触发 Job ,返回的结果一定不是 RDD
- 常见的 Transformation 包括:map、mapVaules、filter、flatMap、mapPartitions、uoin、join、distinct、xxxByKey
- 常见的 Action 包括:count、collect、collectAsMap、first、reduce、fold、aggregate、saveAsTextFile
有Shuffle的 Transformation 包括:
- 一堆的 xxxByKey(sortBykey、groupByKey、reduceByKey、foldByKey、aggrea geByKey、combineByKey)。备注:不包括countByKey
- join相关(join、leftOuterJoin、rightOuterJoin、fullOuterJoin、cogroup)
- distinct、intersection、subtract、partionBy、repartition
19、简述Spark中共享变量(广播变量和累加器)的基本原理和用途
通常情况下,一个传递给 RDD 操作(如map、reduceByKey)的 func 是在远程节点上执行的。函数 func 在多个节点执行过程中使用的变量,是Driver上同一个变量的多个副本。这些变量以副本的方式拷贝到每个task中,并且各task中变量的更新并不会返回 Driver
为了解决以上问题,Spark 提供了两种特定类型的共享变量 : 广播变量 和 累加器。广播变量主要用于高效分发较大的数据对象,累加器主要用于对信息进行聚合
广播变量的好处,不需要每个task带上一份变量副本,而是变成每个节点的executor才一份副本。这样的话, 就可以让变量产生的副本大大减少。而且 Spark 使用高效广播算法(BT协议)分发广播变量以降低通信成本
累加器是 Spark 中提供的一种分布式的变量机制,在Driver端进行初始化,task中对变量进行累 加操作
广播变量典型的使用案例是Map Side Join;累加器经典的应用场景是用来在 Spark 应用中记录某些事件/信息的数量
20、说说Spark提交作业参数
提交作业时的重要参数:
- executor_cores 缺省值:1 in YARN mode, all the available cores on the worker in standalone and Mesos coarse-grained modes 生产环境中不宜设置为1!否则 work 进程中线程数过少,一般 2~5 为宜
- executor_memory 缺省值:1g 该参数与executor分配的core有关,分配的core越多 executor_memory 值就应该越大; core与memory的比值一般在 1:2 到 1:4 之间,即每个core可分配2~4G内存。如 executor_cores 为4,那么executor_memory 可以分配 8G ~ 16G; 单个Executor内存大小一般在 20G 左右(经验值),单个JVM内存太高易导致GC代价过高,或资源浪费
- executor_cores * num_executors 表示的是能够并行执行Task的数目。不宜太小或太大!理想情况下,一般给每个core分配 2-3 个task,由此可反推 num_executors 的个数
- driver-memory driver 不做任何计算和存储,只是下发任务与yarn资源管理器和task交互,一般设为 1-2G 即可;
增加每个executor的内存量,增加了内存量以后,对性能的提升,有三点:
- 如果需要对RDD进行cache,那么更多的内存,就可以缓存更多的数据,将更少的数据写入磁盘,甚至不写入磁盘。减少了磁盘IO
- 对于shuffle操作,reduce端,会需要内存来存放拉取的数据并进行聚合。如果内存不够,也会写入磁盘。如果给executor分配更多内存,会减少磁盘的写入操作,进而提升性能
- task的执行,可能会创建很多对象。如果内存比较小,可能会频繁导致JVM堆内存满了,然后频繁GC,垃圾回收,minor GC和full GC。内存加大以后,带来更少的GC,垃圾回收,避免了速度变慢,性能提升;
21、Spark的宽窄依赖,以及Spark如何划分Stage,如何确定每个Stage中Task个数
RDD之间的依赖关系分为窄依赖(narrow dependency)和宽依赖(Wide Depencency,也称为Shuffle Depencency)
- 窄依赖:指父RDD的每个分区只被子RDD的一个分区所使用,子RDD分区通常对应常数个父RDD分区(O(1),与数据规模无关)
- 宽依赖:是指父RDD的每个分区都可能被多个子RDD分区所使用,子RDD分区通常对应所有的父RDD分区(O(n),与数据规模有关)
相比于依赖,窄依赖对优化很有利,主要基于以下几点:
- 宽依赖往往对应着Shuffle操作,需要在运行过程中将同一个父RDD的分区传入到不同的子RDD分区中,中间可能涉及多个节点之间的数据传输;而窄依赖的每个父RDD的分区只会传入到一个子RDD分区中,通常可以在一个节点内完成转换
- 当RDD分区丢失时(某个节点故障),Spark会对数据进行重算
- 对于窄依赖,由于父RDD的一个分区只对应一个子RDD分区,这样只需要重算和子RDD分区对应的父RDD分区即可,所以这个重算对数 据的利用率是100%的
- 对于宽依赖,重算的父RDD分区对应多个子RDD分区的,这样实际上父RDD中只有一部分的数据是被用于恢复这个丢失的子RDD分区的,另一部分对应子RDD的其他未丢失分区,这就造成了多余的计算;更一般的,宽依赖中子RDD分区通常来自多个父RDD分区,所有的父RDD分区都要进行重新计算
Stage:根据RDD之间的依赖关系将Job划分成不同的Stage,遇到一个宽依赖则划分一个Stage。
Task:Stage是一个TaskSet,将Stage根据分区数划分成一个个的Task
22、如何理解Spark中Lineage
首先要明确一下 Spark 中 RDD 的容错机制。每一个 RDD 都是一个不可变的分布式可重算的数据集,其记录着确定性的操作继承关系( lineage ),所以只要输入数据是可容错的,那么任意一个 RDD 的分区( Partition )出错或不可用,都是可以利用原始输入数据通过转换操作而重新算出的。如果一个分区出错了:
- 对于窄依赖,则只要把丢失的父 RDD 分区重算即可,不依赖于其他分区
- 对于宽依赖,则父 RDD 的所有分区都需要重算,代价昂贵
所以在长“血统”链特别是有宽依赖的时候,需要在适当的时机设置数据检查点