跳到主要内容

Spark基础与架构

Spark是一个开源的分布式计算系统,它提供了一个快速、通用的集群计算平台


Spark 是什么?

  1. 快速
  • Spark的核心是一个分布式内存计算框架,它能够将数据加载到内存中进行处理,这使得它在处理速度上比Hadoop MapReduce快100倍以上。
  • 支持迭代计算和交互式数据挖掘,适合机器学习等需要多次迭代计算的场景。
  1. 易用性
  • Spark提供了丰富的API,支持Scala、Java、Python和R等多种编程语言,使得开发者可以快速上手。
  • 它还提供了DataFrame和Dataset API,这些高级抽象使得数据操作更加直观和易于管理。
  1. 通用性
  • Spark不仅仅是一个MapReduce模型的替代品,它还提供了SQL查询(通过Spark SQL)、机器学习(通过MLlib)、图计算(通过GraphX)和流式数据处理(通过Spark Streaming)等多种计算功能。
  • 这使得Spark可以处理不同类型的数据处理和分析任务,而不需要依赖于多个不同的系统。
  1. 可扩展性
  • Spark可以在多种集群管理器上运行,如Hadoop YARN、Apache Mesos以及Spark自己的Standalone模式。
  • 它可以轻松地从单个服务器扩展到数千个节点。
  1. 容错性
  • Spark提供了RDD(弹性分布式数据集),这是一种可以容错的数据结构,能够在节点失败时重新计算丢失的数据分区。
  • 通过血统(Lineage)机制,Spark能够优化计算和存储资源的使用。
  1. 内存计算
  • Spark的内存计算能力使其在处理需要快速迭代和复杂转换的大数据集时具有显著优势。
  1. 集成性
  • Spark可以与Hadoop生态系统中的其他组件(如HDFS、HBase)以及Apache Kafka等数据源集成,方便数据的读写。
  1. 丰富的库支持
  • Spark拥有强大的库支持,包括用于SQL查询的Spark SQL、用于机器学习的MLlib和用于图计算的GraphX。
  1. 社区支持
  • Spark拥有一个活跃的开源社区,不断有新的功能和改进被加入,同时社区也提供了大量的文档和教程。
  1. 商业支持
  • 除了开源社区,Spark也有商业支持,例如Databricks,它提供了一个基于Spark的统一分析平台。

Spark 和 Hadoop 的区别

SparkHadoop
计算模型以内存计算为核心,重要计算任务主要在内存中进行,避免了大量的磁盘 I/O 操作,极大地提升了速度。主要采用磁盘计算模式,数据存储和计算过程中需要多次读写磁盘,速度较慢,但适合处理批量性和持久化要求高的任务。
执行引擎与框架Spark 具备独立的 DAG(有向无环图)任务调度引擎,可以优化任务的执行顺序,灵活调度资源。Spark 兼容 Hadoop 的 HDFS 与 YARN,但本身也有独立的部署模式。Hadoop 核心是 HDFS(分布式文件系统)和 MapReduce(分布式计算框架)。MapReduce 采用“Job Tracker + Task Tracker”模式,执行效率相对不如 Spark。
编程模型与语言支持提供 RDD(弹性分布式数据集)编程模型,以及 Dataset 和 DataFrame 高层次抽象接口。支持多语言(Scala、Java、Python 和 R)。以 MapReduce 的键值对编程模型为主,多数情况下需要开发者手动处理复杂的任务拆分与合并。Hadoop 原生支持的编程语言是 Java,其他语言需要配合 Hadoop Streaming 实现。
容错机制通过RDD的血统(Lineage)机制实现容错通过HDFS的复制机制实现数据的高可用性和容错
适用场景适用于机器学习、实时数据处理、交互式查询、高频迭代任务等需要高计算速度的场景适合处理大规模离线的批处理任务,且对数据持久化要求高,适用于数据存储和归档等场景。

Spark 为什么快?

  • Spark之所以快,主要归功于以下几个关键因素:

    1. 内存计算
      • Spark支持将数据保留在内存中进行处理,这大大减少了与磁盘I/O操作相关的延迟。内存中的数据访问速度比磁盘快得多,从而加快了数据处理速度。
    2. 惰性计算和智能优化
      • Spark使用惰性计算(Lazy Evaluation),意味着它不会立即执行计算,而是等到需要结果时才进行。这种机制允许Spark优化整个计算图,合并多个操作以减少任务数量和数据传输。
    3. 高效的执行引擎
      • Spark的执行引擎能够智能地优化执行计划,包括预测任务的执行顺序和数据分区,以减少数据的移动和提高并行处理效率。
    4. 血统(Lineage)机制
      • 当数据因为节点故障而丢失时,Spark可以通过血统信息重新计算丢失的数据,而不是像Hadoop那样重新启动整个作业,这减少了容错所需的时间。
    提示

    Spark比MapReduce快,主要是因为它以内存计算为核心,优化了数据处理流程,减少了对磁盘I/O的依赖,而MapReduce主要设计为磁盘计算,即使在有内存资源的情况下也倾向于将数据写入磁盘。

Spark 是怎么基于内存计算的?

  1. RDD(弹性分布式数据集) 内存存储 Spark的核心数据结构RDD支持将数据缓存在内存中,这是内存计算的基础。当对RDD执行缓存(cache())或持久化(persist())操作时,数据会优先存储在内存中,大大减少了重复计算的开销。不同的持久化级别可以根据内存资源和计算需求进行灵活选择,如MEMORY_ONLY、MEMORY_AND_DISK等。

  2. 内存数据共享 在分布式计算过程中,Spark通过将中间计算结果保存在内存中,避免了频繁的磁盘I/O。多个作业和任务可以共享这些内存中的数据,显著提高了计算效率。特别是对于迭代型算法和机器学习任务,内存共享能带来巨大的性能提升。

  3. 内存计算调度 Spark的任务调度器能智能地将计算任务分配到最靠近数据的节点上,并尽可能地利用内存进行计算。通过最小化数据移动和最大化内存使用,Spark能够显著降低计算开销。

  4. shuffle操作优化 在shuffle过程中,Spark尽可能地将中间结果保存在内存中,而不是写入磁盘。这种设计大大减少了磁盘I/O,提高了数据处理的整体性能。

  5. 内存管理与动态分配 Spark提供了灵活的内存管理机制,可以动态地在存储内存和计算内存之间进行资源分配。这种动态调整能力确保了内存资源的高效利用,并能根据实际任务需求进行实时优化。

  6. DataFrame和Dataset的内存优化 在高级API如DataFrame和Dataset中,Spark还引入了编码器(Encoder)和内存列存储等技术,进一步提升了内存计算的效率。这些技术能够对数据进行压缩和快速序列化,减少内存占用并加速计算。

  7. 广播变量 通过广播变量机制,Spark可以将只读的大数据集高效地分发到各个工作节点的内存中,避免了数据的重复传输,降低了网络开销。

Spark 运行模式

运行模式描述适用场景
独立模式Spark自带的独立集群管理模式,无需外部依赖。小型集群或开发测试环境
YARN模式基于Hadoop YARN的资源管理和调度。Hadoop生态中的大规模数据处理场景
Mesos模式使用Apache Mesos进行资源管理和任务调度。跨多个集群的复杂资源管理场景
Kubernetes模式在Kubernetes环境中运行,支持容器化部署和云原生架构。云原生环境或使用Kubernetes的场景
提示

如果你已经使用 Hadoop 集群,YARN模式可能是最常用的生产环境选择。

如果你的组织采用了容器化、微服务架构或者云原生应用,Kubernetes模式也会是一个非常合适的选择。

Spark 生态圈

  1. Spark Core
    • Spark生态圈的核心组件,提供数据分布式处理的基本功能,如内存计算、任务调度、容错处理等。
  2. Spark SQL
    • 用于处理结构化数据的组件,提供SQL查询和DataFrame API,方便用户对数据进行查询和处理。
  3. Spark Streaming
    • 用于处理实时数据流的组件,能够实时接收数据并对其进行处理和分析。
  4. Spark MLlib
    • Spark生态圈中用于机器学习的组件,提供多种机器学习算法和工具,用于数据挖掘和分析。
  5. Spark GraphX
    • 用于图处理的组件,可以对大规模图数据进行处理和分析。
  6. 数据源支持
    • Spark可以与多种数据源集成,包括Hadoop分布式文件系统(HDFS)、Apache Cassandra、Apache HBase、Amazon S3等。
  7. 资源管理器
    • Spark可以以Mesos、YARN和自身携带的Standalone作为资源管理器调度Job,完成Spark应用程序的计算。
  8. 其他组件
    • 包括BlinkDB(近似查询引擎)、Tachyon(内存分布式文件系统)等子项目,这些在Spark上层提供了更高层、更丰富的计算范式

Spark 核心组件

Master(Cluster Manager):集群中的管理节点,管理集群资源,通知 Worker 启动 Executor 或 Driver。

Worker :集群中的工作节点,负责管理本节点的资源,定期向Master汇报心跳,接收Master的命令,启动Driver

或 Executor。

Driver:执行 Spark 应用中的 main 方法,负责实际代码的执行工作。其主要任务:

  • 负责向集群申请资源,向master注册信息

  • Executor启动后向 Driver 反向注册

  • 负责作业的解析、生成Stage并调度Task到Executor上

  • 监控Task的执行情况,执行完毕后释放资源

  • 通知 Master 注销应用程序

Executor:是一个 JVM 进程,负责执行具体的Task。Spark 应用启动时, Executor节点被同时启动,并且始终伴随

着整个 Spark 应用的生命周期而存在。如果有 Executor 节点发生了故障或崩溃, 会将出错节点上的任务调度到其他

Executor 节点上继续运行。Executor 核心功能:

  • 负责运行组成 Spark 应用的任务,并将结果返回给 Driver 进程
  • 通过自身的 Block Manage 为应用程序缓存RDD

Master 启动流程

在Spark中,"master"通常指的是集群管理器(Cluster Manager),它负责资源的分配和管理。Spark可以与多种集群管理器一起工作,包括Standalone模式、Apache Mesos、Apache YARN(Yet Another Resource Negotiator)和Kubernetes。以下是Spark Standalone模式下master启动的流程:

  1. 启动Master节点:在集群的主节点上,运行sbin/start-master.sh脚本来启动Spark master。这个脚本会启动一个Master进程,它监听某个端口(默认是7077)。

  2. 注册Web UI端口:Master节点还会启动一个Web界面(默认端口是8080),用于展示集群状态、正在运行的应用程序和资源使用情况。

  3. 等待Worker注册:Master节点启动后,会等待接收来自集群中各个工作节点(Worker)的注册请求。

  4. 启动Worker节点:在集群的工作节点上,运行sbin/start-worker.sh脚本来启动Spark worker。这个脚本会启动一个Worker进程,它会连接到Master节点并注册自己。

  5. 资源注册:每个Worker节点在注册时会报告自己的资源情况(如CPU核心数、内存大小等)给Master节点。

  6. 调度器初始化:Master节点上的调度器(Scheduler)会根据注册的Worker节点资源情况初始化,准备接受来自Driver的作业提交。

  7. 作业提交和调度:用户的应用程序(Driver)提交作业后,Master节点上的调度器会根据资源情况和作业需求,将任务分配给不同的Worker节点执行。

  8. 监控和状态更新:Master节点持续监控Worker节点的状态,并更新Web UI以反映集群的最新状态。

  9. 异常处理:如果Worker节点失败,Master节点会检测到并重新调度失败的任务到其他可用的Worker节点上。

    提示

    在Mesos或YARN模式下,启动流程会有所不同,因为这些集群管理器有自己的资源管理和调度机制,Spark需要与它们进行交互以获取资源和调度任务。在这些模式下,Spark不会启动自己的Master节点,而是依赖于Mesos或YARN的资源管理器来调度资源和任务。

Worker 启动流程

Spark Worker的启动流程可以概括为以下几个主要步骤:

  1. 解析参数:Worker启动时首先解析传入的参数,包括配置文件、程序运行参数及环境变量。

  2. 启动RPC服务端:接着启动Worker的RPC服务端,这是Worker与Master通信的基础。

  3. 创建工作目录:Worker会创建一个工作目录,用于存放临时数据和日志。

  4. 启动外部Shuffle服务:如果配置启用,Worker会启动一个外部Shuffle服务,用于处理Shuffle过程中的数据传输。

  5. 启动Web UI:Worker启动一个Web UI界面,用于监控Worker状态和资源使用情况。

  6. 向Master注册:Worker向Master发送注册请求,包括Worker的资源信息、地址等。

  7. 注册成功并发送心跳:Master接收到Worker的注册信息后,会将Worker信息加入到集群资源管理中,并响应Worker注册成功。Worker收到注册成功的响应后,会定期向Master发送心跳以保持连接。

  8. 资源监控和调度:Worker会监控自身的资源使用情况,并根据Master的调度指令启动Executor来执行任务。

  9. 启动指标监控系统:最后,Worker会启动一个指标监控系统,用于度量和报告Worker的各种性能指标。

这些步骤构成了Spark Worker的启动和注册流程,确保Worker能够正常加入到Spark集群中,并准备好执行来自Driver的任务。

提示

Master、Worker都是RpcEndpoint,实现了 RpcEndpoint 接口。主要任务收发、处理消息;

Master、Worker的生命周期都遵循:constructor -> onStart -> receive* -> onStop

在Master的onStart方法中最重要的事情是:执行恢复

在Worker的onStart方法中最重要的事情是:向master注册

结合源码,可以参考 https://blog.csdn.net/u013771019/article/details/106800941

SparkContext 启动流程

Spark 应用程序的第一步就是创建并初始化SparkContext,SparkContext的初始化过程包含了内部组件的创建和准备,主要涉及网络通信、分布式、消息、存储、计算、调度、缓存、度量、清理、文件服务和UI等方面。

SparkContext 是 Spark 程序主要功能的入口点,链接 Spark集群,创建RDD、累加器和广播变量,一个线程只能运行一个SparkContext。

SparkContext在应用程序中将外部数据转换成RDD,建立了第一个RDD,也就是说SparkContext建立了RDD血缘关系的根,是DAG的根源。

  1. 初始设置
  2. 创建 SparkEnv
  3. 创建 SparkUI
  4. Hadoop 相关配置
  5. Executor 环境变量
  6. 注册 HeartbeatReceiver 心跳接收器
  7. 创建 TaskScheduler、SchedulerBackend
  8. 创建和启动 DAGScheduler
  9. 启动TaskScheduler、SchedulerBackend
  10. 启动测量系统 MetricsSystem
  11. 创建事件日志监听器
  12. 创建和启动 ExecutorAllocationManager
  13. ContextCleaner 的创建与启动
  14. 自定义 SparkListener 与启动事件
  15. Spark 环境更新
  16. 投递应用程序启动事件
  17. 测量系统添加Source
  18. 将 SparkContext 标记为激活

DAGScheduler

DAGScheduler是Spark架构中的核心组件之一,负责任务的逻辑调度。下面我将从功能、工作原理以及与TaskScheduler和SchedulerBackend的交互三个方面来详细解释DAGScheduler。

功能

DAGScheduler的主要功能是接收Spark应用提交的作业,根据RDD(弹性分布式数据集)的依赖关系划分调度阶段(Stage),并提交这些调度阶段给TaskScheduler进行具体处理。简而言之,DAGScheduler负责将复杂的作业拆分成可以并行执行的较小任务集。

工作原理

  1. 作业提交与阶段划分‌:当Spark应用中的某个操作触发计算时(通常是一个行动操作,如countcollect等),会向DAGScheduler提交作业。DAGScheduler从RDD依赖链的最末端出发,遍历整个RDD依赖链,根据RDD之间的依赖关系(特别是是否存在Shuffle依赖)来划分阶段。每个阶段包含一个或多个可以并行执行的任务。
  2. 阶段提交与依赖管理‌:划分好阶段后,DAGScheduler会检查每个阶段所依赖的父阶段的结果是否可用。如果所有父阶段的结果都可用,则该阶段可以被提交给TaskScheduler进行调度执行。如果有任何父阶段的结果不可用,DAGScheduler会等待这些父阶段完成后再提交当前阶段。
  3. 任务集提交与生命周期管理‌:每个阶段的提交最终会转换成一个任务集(TaskSet)的提交。DAGScheduler通过TaskScheduler接口提交任务集,TaskScheduler则负责将这些任务分发到集群的Worker节点上的Executor中执行。DAGScheduler还会监控任务的执行状态,并根据任务的生命周期信息来维护作业和阶段的状态。

与TaskScheduler和SchedulerBackend的交互

  1. 与TaskScheduler的交互‌:DAGScheduler与TaskScheduler之间有着紧密的交互关系。DAGScheduler负责将划分好的阶段提交给TaskScheduler,TaskScheduler则负责将这些阶段中的任务分发到具体的Executor上执行。同时,TaskScheduler还会将任务的执行状态反馈给DAGScheduler,以便DAGScheduler能够监控作业的执行进度并作出相应的调整。
  2. 与SchedulerBackend的交互‌:虽然DAGScheduler不直接与SchedulerBackend交互,但它们都是Spark调度系统的重要组成部分。SchedulerBackend负责应用程序运行期间与底层资源调度系统的交互,为任务的执行提供必要的资源和支持。而DAGScheduler则利用这些资源来优化任务的执行顺序和方式,确保作业能够高效、正确地完成。

总之,DAGScheduler在Spark架构中扮演着至关重要的角色,它负责将复杂的作业拆分成可以并行执行的较小任务集,并与TaskScheduler和SchedulerBackend紧密协作,确保这些任务能够在集群中高效、正确地执行。

Spark的stage如何划分?

在Spark中,stage的划分是基于RDD(弹性分布式数据集)之间的依赖关系进行的,特别是是否存在shuffle依赖。下面我将从划分依据、划分过程以及与DAGScheduler的关系三个方面来详细解释Spark中stage的划分。

划分依据

Spark中stage的划分主要依据是RDD之间的依赖关系,特别是宽依赖(Shuffle Dependency)和窄依赖(Narrow Dependency)。

  • 宽依赖‌:父RDD的一个分区会被子RDD的多个分区使用。这种依赖关系会导致数据的重新分布(即shuffle),因此宽依赖是划分stage的关键点。
  • 窄依赖‌:父RDD的分区最多只会被子RDD的一个分区使用。窄依赖不会导致数据的重新分布,因此可以在同一个stage内处理。

划分过程

Spark中stage的划分过程是从作业的最后一个RDD开始,向前追溯依赖关系链,根据遇到的依赖类型(宽依赖或窄依赖)来划分stage。

  1. 从最后一个RDD开始‌:当Spark应用中的某个行动操作(如collectcount等)触发计算时,会形成一个作业(Job)。作业的最后一个RDD是结果RDD,从这个RDD开始向前追溯依赖关系。
  2. 遇到窄依赖则继续追溯‌:在追溯过程中,如果遇到窄依赖,则继续将当前RDD的父RDD纳入同一个stage,继续向前追溯。
  3. 遇到宽依赖则划分stage‌:如果追溯过程中遇到宽依赖,则停止追溯,并将当前已经追溯到的RDD及其父RDD划分为一个stage。然后,从宽依赖的父RDD开始,继续向前追溯,直到遇到下一个宽依赖或作业的开始。
  4. 重复上述过程‌:重复上述过程,直到作业的所有RDD都被划分到相应的stage中。

与DAGScheduler的关系

DAGScheduler是Spark中的作业调度器,它负责将作业划分成多个stage,并管理这些stage的执行。

  • 作业划分成stage‌:DAGScheduler会根据RDD之间的依赖关系(特别是宽依赖)来划分作业中的stage。
  • 管理stage的执行‌:划分好stage后,DAGScheduler会按照stage的依赖关系(即后一个stage依赖于前一个stage的结果)来管理stage的执行。它会根据集群的资源情况和stage的优先级来决定何时执行哪个stage。
  • 提交任务给TaskScheduler‌:当某个stage准备好执行时,DAGScheduler会将其转换成一个或多个任务集(TaskSet),并提交给TaskScheduler进行具体的任务调度和执行。

stage的数量等于作业中宽依赖(Shuffle依赖)的数量加1。这是因为Spark的作业调度器(DAGScheduler)会根据RDD之间的依赖关系将作业划分为多个stage,每个stage代表作业执行的一个阶段。

RDD的宽依赖和窄依赖

在Spark中,RDD(弹性分布式数据集)之间的依赖关系分为两种类型:宽依赖(Wide Dependency)和窄依赖(Narrow Dependency)。这两种依赖关系对于理解Spark的作业调度和执行至关重要。下面我将详细解释它们的定义、区别以及在Spark作业调度中的作用。

定义

  1. 窄依赖‌:
    • 窄依赖指的是每一个父RDD的Partition最多被子RDD的一个Partition使用。
    • 形象地说,窄依赖可以比喻为“独生子女”,即一个父RDD的分区只对应一个子RDD的分区。
  2. 宽依赖‌:
    • 宽依赖指的是多个子RDD的Partition会依赖同一个父RDD的Partition。
    • 宽依赖通常涉及Shuffle操作,即数据在父RDD和子RDD之间需要重新分布。

区别

  1. 依赖关系‌:
    • 窄依赖中,父RDD的每个分区只被子RDD的一个分区所使用,关系简单且一对一。
    • 宽依赖中,父RDD的每个分区可能被子RDD的多个分区所使用,关系复杂且可能涉及多对多的数据交换。
  2. 执行方式‌:
    • 窄依赖支持在一个节点上管道化执行,可以高效地利用计算资源。
    • 宽依赖由于涉及Shuffle操作,通常需要在不同的节点之间传输数据,执行效率相对较低。
  3. 容错性‌:
    • 窄依赖中,如果某个分区数据丢失,只需要重新计算该分区的父RDD即可,容错性较高。
    • 宽依赖中,一个节点的故障可能导致来自所有父RDD的分区丢失,因此需要完全重新执行,容错性相对较低。

窄依赖示例:map操作

工作原理‌:

  • 窄依赖是指父RDD的每个分区最多被子RDD的一个分区所使用。
  • 在map操作中,每个输入分区的数据会直接映射到一个输出分区中,每个分区的数据处理是独立的。

示例‌:

val rdd1 = sc.parallelize(Seq(1, 2, 3, 4), 2) // 创建一个包含4个元素、2个分区的RDD
val rdd2 = rdd1.map(_ * 2) // 对rdd1中的每个元素乘以2

在这个例子中,rdd1的每个分区直接传递给rdd2的对应分区,并对每个元素进行乘以2的操作。由于每个分区的处理是独立的,没有跨分区的数据依赖,因此这是一个窄依赖。

宽依赖示例:reduceByKey操作

工作原理‌:

  • 宽依赖是指父RDD的一个分区被子RDD的多个分区所使用。
  • 在reduceByKey操作中,具有相同key的元素需要从多个分区中读取并聚合到一个分区中,这通常涉及shuffle操作。

示例‌:

val rdd1 = sc.parallelize(Seq((1, "a"), (2, "b"), (1, "c")), 2) // 创建一个包含3个元素(键值对)、2个分区的RDD
val rdd2 = rdd1.reduceByKey(_ + _) // 对rdd1中具有相同key的元素进行聚合

在这个例子中,rdd1中的元素需要按照key进行聚合。由于具有相同key的元素可能分布在不同的分区中,因此需要将这些元素重新分组到一个分区中,这涉及shuffle操作。因此,rdd1rdd2之间的依赖关系是宽依赖。

区别

  • 依赖关系‌:窄依赖中,父RDD的每个分区只被子RDD的一个分区所使用;而宽依赖中,父RDD的一个分区可能被子RDD的多个分区所使用。
  • 执行方式‌:窄依赖支持在一个节点上管道化执行,效率高;而宽依赖涉及shuffle操作,需要在不同的节点之间传输数据,执行效率相对较低。
  • 容错性‌:窄依赖中,如果某个分区数据丢失,只需要重新计算该分区的父RDD即可;而宽依赖中,一个节点的故障可能导致来自所有父RDD的分区丢失,因此需要完全重新执行。

在Spark作业调度中的作用

  1. Stage划分‌:
    • Spark的作业调度器(DAGScheduler)会根据RDD之间的依赖关系来划分作业中的Stage。
    • 宽依赖是划分Stage的关键点,因为宽依赖涉及Shuffle操作,需要在不同的Stage中执行。
  2. 任务调度‌:
    • 划分好Stage后,DAGScheduler会按照Stage的依赖关系来管理Stage的执行。
    • 对于每个Stage,DAGScheduler会将其转换成一个或多个任务集(TaskSet),并提交给TaskScheduler进行具体的任务调度和执行。
  3. 资源利用‌:
    • 通过合理划分Stage和任务调度,Spark可以合理规划资源利用,做到尽可能用最少的资源高效地完成任务计算。

TaskScheduler

TaskScheduler是Spark中的核心组件之一,负责具体任务的调度和执行。下面我将从实现原理、工作流程和架构三个方面来详细解释TaskScheduler。

实现原理

  1. 抽象与可插拔性‌:
    • TaskScheduler是一个面向接口的调度器,这使得底层资源调度器具有可插拔性。Spark可以因此运行在多种资源调度器模式上,如Standalone、Yarn、Mesos、local等。
  2. 创建与初始化‌:
    • 在SparkContext实例化时,通过createTaskScheduler方法创建TaskSchedulerImpl和对应的SchedulerBackend(如StandaloneSchedulerBackend)。
    • TaskSchedulerImpl在初始化时会接收SchedulerBackend的实例,并在其start方法中调用backend.start
  3. 核心任务‌:
    • TaskScheduler的核心任务是提交TaskSet到集群运算并汇报结果。
    • 它为TaskSet创建和维护TaskSetManager,追踪任务的本地性、错误信息,并在遇到Straggle任务时尝试在其他节点重试。
    • 同时,它向DAGScheduler汇报执行情况,包括Shuffle输出丢失时的fetch failed错误等。
  4. 与SchedulerBackend的交互‌:
    • TaskScheduler内部持有SchedulerBackend的实例,通过它与底层资源调度系统交互。
    • 在Standalone模式下,StandaloneSchedulerBackend负责收集Worker上的资源信息,并通过DriverEndpoint与Master注册和通信。

工作流程

  1. 任务提交‌:
    • DAGScheduler将划分的一系列stage按照先后顺序依次提交给底层的TaskScheduler执行。
    • 当stage创建完成后,DAGScheduler调用submitMissingTasks方法,将stage转换成TaskSet,并提交给TaskScheduler。
  2. 任务调度‌:
    • TaskScheduler接收到任务集后,为其创建一个TaskSetManager,并将其添加到调度池中。
    • 调度池可以通过FIFOSchedulableBuilderFairSchedulableBuilder构建,默认为FIFO。
    • TaskScheduler调用backend.reviveOffers()方法,请求资源来运行任务。
  3. 任务执行‌:
    • SchedulerBackend接收到任务后,通过reviveOffers方法分配运行资源,并启动运行节点的Executor。
    • Executor以多线程的方式运行任务,每个线程负责一个任务。
  4. 任务监控与汇报‌:
    • TaskScheduler接收任务运行状态,如果任务运行完毕或失败,则进行相应的处理,如重试或继续分配下一个任务。
    • 最终,当应用程序所有任务运行完毕后,释放相关资源。

架构

  • TaskScheduler与DAGScheduler‌:
    • DAGScheduler负责任务的逻辑调度,将作业拆分成不同阶段的具有依赖关系的任务集。
    • TaskScheduler则负责具体任务的调度执行,接收DAGScheduler提交过来的任务集,并将其分发到集群的Worker节点上执行。
  • TaskScheduler与SchedulerBackend‌:
    • TaskScheduler是高层调度器DAGScheduler与任务执行SchedulerBackend的桥梁。
    • SchedulerBackend负责应用程序运行期间与底层资源调度系统的交互,如资源的分配、Executor的启动等。
  • TaskSchedulerImpl与子类‌:
    • TaskScheduler是一个特质类(trait),定义了任务调度相关的实现方法。
    • TaskSchedulerImpl是其最重要的子类,实现了TaskScheduler所有的接口方法。
    • 在不同的运行模式下,如Yarn模式,会有相应的子类(如YarnSchedulerYarnClusterScheduler)重写其中一两个方法以适应特定的资源调度系统。

SchedulerBackend

SchedulerBackend是Spark架构中的核心组件之一,负责应用程序运行期间与底层资源调度系统的交互。下面我将从功能、工作原理以及与TaskScheduler的交互三个方面来详细解释SchedulerBackend。

功能

SchedulerBackend作为TaskScheduler的后台进程,其主要功能包括:

  1. 与集群管理器交互‌:负责与各种平台的集群管理器(如Yarn、Mesos、Standalone等)进行交互,为应用程序申请相应的资源。
  2. 管理Executor‌:接收集群中为当前应用程序而分配的计算资源(即Executor)的注册,并管理这些Executor。
  3. 发送任务‌:负责将TaskScheduler提交的任务发送到具体的Executor上执行。

工作原理

以Spark Standalone部署方式为例,SchedulerBackend的工作原理如下:

  1. 启动与注册‌:在SparkContext启动时,SchedulerBackend会被初始化并启动。它会构造一个与集群管理器通信的客户端(如StandaloneAppClient),并向Master注册当前程序。
  2. 资源收集与管理‌:SchedulerBackend会收集Worker上的资源信息,当ExecutorBackend启动时,会发送RegisteredExecutor信息向DriverEndpoint注册。这样,SchedulerBackend就掌握了当前应用程序拥有的计算资源。
  3. 任务分配与执行‌:当TaskScheduler提交任务时,SchedulerBackend会接收到任务并通过reviveOffers方法分配运行资源。然后,它会启动运行节点的Executor,并将任务发送到Executor上执行。
  4. 状态监控与汇报‌:SchedulerBackend会监控Executor和任务的执行状态,并将状态信息汇报给TaskScheduler。如果任务执行完毕或出现故障,TaskScheduler会根据这些信息进行相应的处理。

与TaskScheduler的交互

SchedulerBackend与TaskScheduler之间有着紧密的交互关系:

  1. 初始化与启动‌:在SparkContext启动时,TaskScheduler和SchedulerBackend会同时被初始化并启动。TaskScheduler会持有SchedulerBackend的引用,以便在需要时与其进行交互。
  2. 任务提交与执行‌:当DAGScheduler将任务划分并提交给TaskScheduler后,TaskScheduler会将任务集(TaskSet)提交给SchedulerBackend。SchedulerBackend接收到任务后,会分配资源并启动Executor来执行任务。
  3. 状态监控与反馈‌:在执行过程中,SchedulerBackend会监控任务的执行状态,并将状态信息反馈给TaskScheduler。TaskScheduler根据这些信息来调整任务调度策略,确保任务的顺利执行。
  4. 资源释放与回收‌:当应用程序执行完毕后,SchedulerBackend会负责释放和回收相关的资源,包括Executor和计算资源等。

总之,SchedulerBackend在Spark架构中扮演着至关重要的角色,它负责应用程序与底层资源调度系统的交互,为任务的执行提供必要的资源和支持。

Spark 内存模型

主要分为执行内存(Execution Memory)和存储内存(Storage Memory)。下面我将详细解释Spark的内存分配、管理、优化以及内存不足时的处理机制。

内存分配

  1. 堆内内存与堆外内存‌:
    • 堆内内存(On-heap Memory):用于存储Java对象,由JVM管理。
    • 堆外内存(Off-heap Memory):不在JVM内申请,而是直接向操作系统申请内存,可以避免频繁的GC扫描和回收,提升处理性能。
  2. 执行内存与存储内存‌:
    • 执行内存:主要用于存放Shuffle、Join、Sort、Aggregation等计算过程中的临时数据。
    • 存储内存:主要用于缓存和传播内部数据,如RDD的持久化数据。
  3. 内存分配策略‌:
    • 静态内存管理(Static Memory Manager):在Spark 1.6之前使用,存储内存、执行内存和其他内存的大小在Spark应用程序运行期间均为固定。
    • 统一内存管理(Unified Memory Manager):Spark 1.6之后引入,存储内存和执行内存共享同一块空间,可以动态占用对方的空闲区域。

内存管理

  • 内存管理接口‌:Spark为执行内存和存储内存的管理提供了统一的接口——MemoryManager。
  • 动态调整‌:在统一内存管理下,存储内存和执行内存可以根据需要动态调整,提高内存利用率。

内存优化

  1. 合理使用缓存‌:对于经常需要重复计算的数据,可以使用缓存功能,减少计算开销。
  2. 选择合适的持久化级别‌:根据数据访问频率和内存大小,选择合适的持久化级别,如MEMORY_ONLY、MEMORY_AND_DISK等。
  3. 减少数据序列化‌:尽量使用Kryo序列化器,减少数据序列化开销。
  4. 避免频繁的Shuffle操作‌:合理设计数据分区,减少数据Shuffle过程。

内存不足处理机制

  • Spill to Disk‌:当执行内存不足时,Spark会将一部分数据溢写到磁盘,以释放内存空间。
  • Eviction‌:当存储内存不足时,Spark会根据LRU(Least Recently Used)算法淘汰最近最少使用的数据。
  • 动态调整内存比例‌:在Spark 2.x版本之后,支持动态调整执行内存和存储内存之间的比例,以更好地利用资源。

统一内存管理

​ Spark 2.0 之后引入统一内存管理机制,与静态内存管理的区别在于存储内存和执行内存共享同一块空间,可以动态

​ 占用对方的空闲区域,统一内存管理的堆内内存结构如下图所示:

Spark-Unified-Memory-Management

堆外内存分配较为简单,只有存储内存和执行内存。可用的执行内存和存储内存占用的空间大小直接由参数 spark.memory.storageFraction 决定

spark-Unified-off-heap

在执行过程中:执行内存的优先级 > 存储内存的优先级

凭借统一内存管理机制,Spark 在一定程度上提高了堆内和堆外内存资源的利用率,降低了开发者维护 Spark 内存的

难度,但并不意味着开发者可以高枕无忧。如果存储内存的空间太大或者说缓存的数据过多,反而会导致频繁的全量

垃圾回收,降低任务执行时的性能,因为缓存的 RDD 数据通常都是长期驻留内存的。

Spark 容错机制

Apache Spark的容错机制主要依赖于其核心数据结构——弹性分布式数据集(RDD),以及与之相关的一些策略和技术。以下是Spark容错机制的几个关键点:

  1. RDD的自我恢复特性

    • Spark的RDD具有自我恢复的能力,这意味着在任何失败阶段,RDD本身可以恢复损失。RDD通过冗余数据在自我恢复过程中发挥重要作用,如果发生任何错误或数据丢失,可以通过冗余数据来恢复丢失的数据。
  2. RDD的血统(Lineage)

    • RDD的血统是指创建RDD的确定性操作的历史记录。如果任何分区的RDD由于工作节点故障而丢失,可以使用血统信息重新计算该分区。这种基于血统的容错机制使得Spark在部分计算结果丢失时能够根据Lineage重新恢复计算。
  3. 有向无环图(DAG)

    • Spark将计算转换为一个有向无环图(DAG),这样可以通过重复执行DAG里的一部分任务来完成容错恢复。DAG保持了所有操作的记录,如果任何节点在操作中途崩溃,集群管理器会发现该节点,然后尝试分配另一个节点继续在同一位置处理。
  4. 任务失败重试

    • 当任务失败时,Spark可以自动重新执行该任务,这有助于提高作业的稳定性和可靠性。
  5. Checkpointing

    • 通过定期将RDD持久化到可靠的存储系统中(如HDFS),可以避免因节点故障而导致的数据丢失。这种方法可以增加系统的恢复能力,但也可能会带来一定的性能开销。
  6. 数据冗余

    • 通过在多个节点上存储数据的副本,确保即使某个节点发生故障,数据仍然可用。这有助于提高数据的可用性和可靠性。
  7. 外部持久性

    • 当默认的内存存储级别不满足需求时,可以考虑使用外部存储系统(如HDFS)来持久化RDD。外部持久化可以提供更好的数据可靠性和持久性,但可能会影响查询性能。
  8. 调度层的容错

    • Spark框架层面的容错机制包括调度层、RDD血统层、Checkpoint层。在调度层,如果Stage输出失败,上层调度器DAGScheduler会重试;如果Task内部任务失败,底层调度器会重试。
  9. 血统层的容错

    • 在RDD血统层,Spark处理窄依赖和宽依赖的计算。在窄依赖中,如果子RDD的分区丢失,只需重新计算父RDD的相关分区;而在宽依赖情况下,丢失一个子RDD分区可能需要重新计算父RDD的所有分区,这可能导致冗余计算。

这些机制共同确保了Spark在面对节点故障、数据丢失等异常情况时,能够保持作业的连续性和数据的完整性。

Spark shuffle 原理

Apache Spark中的Shuffle原理涉及到数据的重新分区和处理,主要发生在需要跨多个节点聚合数据的任务中。以下是Spark Shuffle的工作原理:

  1. Shuffle触发

    • Shuffle通常由宽依赖(wide dependency)触发,即一个节点的输出会成为另一个节点的输入,如在reduceByKeygroupByKey操作中。
  2. Map Stage

    • 在Map Stage,每个节点会处理其本地的数据分区,并生成一系列的中间键值对(K-V对)。
  3. Partitioning

    • Spark使用Partitioner来决定如何将键值对分配到不同的分区中。常见的Partitioner包括HashPartitioner和RangePartitioner。
  4. Serialization

    • 为了通过网络传输,中间键值对会被序列化成字节形式。
  5. Shuffle Write Path

    • 每个任务会将序列化后的键值对写入到本地磁盘的临时文件中,同时保留内存中的索引以供快速访问。
  6. Spilling

    • 如果内存中的数据超过了一定阈值,Spark会将部分数据溢写(spill)到磁盘,以避免内存溢出。
  7. Shuffle Read Path

    • 在Reduce Stage,每个任务会从其他节点拉取对应的分区文件,并将这些文件中的键值对反序列化。
  8. Merging

    • 读取过程中,Spark会合并来自不同节点的相同键的数据,以减少磁盘I/O和网络传输。
  9. Aggregation

    • 对于聚合操作(如reduceaggregate),Spark会对相同键的值进行合并。
  10. Disk and Memory Management

    • Spark会根据配置和资源使用情况,动态地在磁盘和内存之间管理数据,以优化性能。
  11. Shuffle Service

    • 在Spark 1.x版本中,Shuffle数据是由JVM进程管理的。从Spark 2.x版本开始,引入了外部Shuffle Service,可以在JVM进程之外管理Shuffle数据,提高了Shuffle的稳定性和性能。
  12. Shuffle Metrics

    • Spark提供了Shuffle操作的详细指标,如Shuffle read/write bytes、records等,可以通过这些指标监控和调优Shuffle性能。
  13. Shuffle Compression

    • 为了减少网络传输的数据量,Spark默认会对Shuffle数据进行压缩。
  14. Data Locality

    • Spark会尽量利用数据本地性,即优先在数据所在的节点上进行计算,以减少网络传输。

Shuffle是Spark中一个复杂的过程,它涉及到数据的序列化、传输、合并等多个步骤。优化Shuffle操作可以显著提高Spark作业的性能,尤其是在处理大规模数据时。

spark3.x的优化

  1. 自适应查询执行(AQE‌):
    • AQE 允许 Spark 在运行时根据实际数据优化查询计划。这包括动态合并shuffle分区、动态调整join策略和优化倾斜的join,从而显著提高查询性能。
  2. 动态分区裁剪‌:
    • 在运行时基于推断信息进一步进行分区裁剪,特别是在星型模型中,可以显著减少需要读取的数据量,提高查询速度。
  3. 改进的广播连接‌:
    • 增强了广播连接功能,使得广播表的选择更加智能。这可以减少不必要的广播操作,并更好地处理大表与小表之间的连接。
  4. Shuffle操作优化‌:
    • 通过减少磁盘IO和网络传输,加速数据交换过程,提高了shuffle操作的效率。
  5. 内存模型优化‌:
    • 对内存模型进行了优化,包括堆内和堆外内存的更好利用,以及storage和execution内存的相互抢占机制,提高了内存使用效率。

Spark 3.0在TPC-DS基准测试中表现出约两倍于Spark 2.4的速度

spark对比mr性能提升的细节

  1. 内存计算‌:
    • Spark采用内存计算模型,将数据存储在内存中进行高速计算,而不是像MapReduce那样将所有计算结果写入磁盘。这种内存计算方式显著减少了磁盘I/O操作,从而提高了计算速度。
  2. DAG(有向无环图)调度‌:
    • Spark将任务分解成多个可并行执行的阶段,并使用DAG调度器来优化任务之间的依赖关系。通过优化任务的执行顺序和并行执行,Spark能够减少数据读取和写入操作,提高整体计算效率。
  3. 更少的IO开销‌:
    • 在MapReduce中,每个Map和Reduce阶段都需要将数据从磁盘读取到内存,再将结果写回磁盘,产生大量的IO开销。而Spark由于数据存储在内存中,避免了频繁的磁盘读写操作,从而降低了IO开销。
  4. 数据共享‌:
    • Spark支持中间结果的内存存储和共享,避免了MapReduce中每个任务都需要从磁盘中读取相同数据的重复操作。这种数据共享机制减少了数据传输和计算的时间,提高了效率。
  5. 丰富的算子操作‌:
    • Spark提供了更为丰富的算子操作,能够构建DAG来减少数据落地和shuffle次数。相比之下,MapReduce通常只支持Map和Reduce两种操作,限制了其处理复杂数据流的能力。
  6. 线程方式运行‌:
    • Spark以线程的方式运行,线程之间共享数据,减少了数据的开销。而MapReduce以进程方式运行,进程之间数据通信开销较大。

Spark中的join操作有哪些

Spark的join类型

  1. Inner Join(内连接)‌:

    • 仅保留两个数据集中键匹配的记录。
    • 示例:df1.join(df2, on='id', how='inner')
  2. Outer Join(外连接)‌:

    • 保留两个数据集中的所有记录,如果匹配不上则用null填充。

    • ‌Left Outer Join(左外连接):保留左边数据集中的所有记录,如果匹配不上则用null填充右边数据集的字段。

      • 示例:df1.join(df2, on='id', how='left')
    • ‌Right Outer Join(右外连接)

      ‌:保留右边数据集中的所有记录,如果匹配不上则用null填充左边数据集的字段。

      • 示例:df1.join(df2, on='id', how='right')
    • ‌Full Outer Join(全外连接)‌:保留两个数据集中的所有记录,如果匹配不上则用null填充未匹配的字段。

      • 示例:df1.join(df2, on='id', how='outer')
  3. Left Semi Join(左半连接)‌:

    • 只返回左边数据集中存在匹配的记录,不返回右边数据集的字段。
    • 示例:df1.join(df2, on='id', how='left_semi')
  4. Left Anti Join(左反连接)‌:

    • 只返回左边数据集中不存在匹配的记录,不返回右边数据集的字段。
    • 示例:df1.join(df2, on='id', how='left_anti')

Spark的join策略

Spark根据数据集的大小和分区情况选择合适的join策略来执行连接操作,包括:

  1. Broadcast Hash Join(广播哈希连接)‌:
    • 当一个数据集可以全部放入内存时,将其广播到其他节点,然后使用哈希连接算法进行连接操作。
    • 适用于小数据集和大数据集之间的连接操作,可以减少数据的传输和网络开销。
  2. ‌Shuffle Hash Join(洗牌哈希连接)‌:
    • 当两个数据集都无法全部放入内存时,将数据按照连接键进行分区,并将相同键的数据分发到相同节点,然后在每个节点上进行哈希连接操作。
    • 适用于大规模数据集的连接操作,但会涉及到数据的重新分区和网络传输。
  3. ‌Sort Merge Join(排序合并连接)‌:
    • 当两个数据集都可以按照连接键进行排序时,先对数据进行排序,然后按照排序顺序进行合并操作。
    • 适用于数据集较大且已经排序的情况下,可以减少数据的传输和网络开销。

在选择join类型和策略时,需要根据数据集的大小、分区情况以及具体的业务需求进行权衡。

可以通过设置Spark的配置参数(如spark.sql.autoBroadcastJoinThresholdspark.sql.shuffle.partitions等)来优化join操作的性能。