Spark基础与架构
Spark是一个开源的分布式计算系统,它提供了一个快速、通用的集群计算平台
Spark 是什么?
- 快速:
- Spark的核心是一个分布式内存计算框架,它能够将数据加载到内存中进行处理,这使得它在处理速度上比Hadoop MapReduce快100倍以上。
- 支持迭代计算和交互式数据挖掘,适合机器学习等需要多次迭代计算的场景。
- 易用性:
- Spark提供了丰富的API,支持Scala、Java、Python和R等多种编程语言,使得开发者可以快速上手。
- 它还提供了DataFrame和Dataset API,这些高级抽象使得数据操作更加直观和易于管理。
- 通用性:
- Spark不仅仅是一个MapReduce模型的替代品,它还提供了SQL查询(通过Spark SQL)、机器学习(通过MLlib)、图计算(通过GraphX)和流式数据处理(通过Spark Streaming)等多种计算功能。
- 这使得Spark可以处理不同类型的数据处理和分析任务,而不需要依赖于多个不同的系统。
- 可扩展性:
- Spark可以在多种集群管理器上运行,如Hadoop YARN、Apache Mesos以及Spark自己的Standalone模式。
- 它可以轻松地从单个服务器扩展到数千个节点。
- 容错性:
- Spark提供了RDD(弹性分布式数据集),这是一种可以容错的数据结构,能够在节点失败时重新计算丢失的数据分区。
- 通过血统(Lineage)机制,Spark能够优化计算和存储资源的使用。
- 内存计算:
- Spark的内存计算能力使其在处理需要快速迭代和复杂转换的大数据集时具有显著优势。
- 集成性:
- Spark可以与Hadoop生态系统中的其他组件(如HDFS、HBase)以及Apache Kafka等数据源集成,方便数据的读写。
- 丰富的库支持:
- Spark拥有强大的库支持,包括用于SQL查询的Spark SQL、用于机器学习的MLlib和用于图计算的GraphX。
- 社区支持:
- Spark拥有一个活跃的开源社区,不断有新的功能和改进被加入,同时社区也提供了大量的文档和教程。
- 商业支持:
- 除了开源社区,Spark也有 商业支持,例如Databricks,它提供了一个基于Spark的统一分析平台。
Spark 和 Hadoop 的区别
Spark | Hadoop | |
---|---|---|
计算模型 | 以内存计算为核心,重要计算任务主要在内存中进行,避免了大量的磁盘 I/O 操作,极大地提升了速度。 | 主要采用磁盘计算模式,数据存储和计算过程中需要多次读写磁盘,速度较慢,但适合处理批量性和持久化要求高的任务。 |
执行引擎与框架 | Spark 具备独立的 DAG(有向无环图)任务调度引擎,可以优化任务的执行顺序,灵活调度资源。Spark 兼容 Hadoop 的 HDFS 与 YARN,但本身也有独立的部署模式。 | Hadoop 核心是 HDFS(分布式文件系统)和 MapReduce(分布式计算框架)。MapReduce 采用“Job Tracker + Task Tracker”模式,执行效率相对不如 Spark。 |
编程模型与语言支持 | 提供 RDD(弹性分布式数据集)编程模型,以及 Dataset 和 DataFrame 高层次抽象接口。支持多语言(Scala、Java、Python 和 R)。 | 以 MapReduce 的键值对编程模型为主,多数情况下需要开发者手动处理复杂的任务拆分与合并。Hadoop 原生支持的编程语言是 Java,其他语言需要配合 Hadoop Streaming 实现。 |
容错机制 | 通过RDD的血统(Lineage)机制实现容错 | 通过HDFS的复制机制实现数据的高可用性和容错 |
适用场景 | 适用于机器学习、实时数据处理、交互式查询、高频迭代任务等需要高计算速度的场景 | 适合处理大规模离线的批处理任务,且对 数据持久化要求高,适用于数据存储和归档等场景。 |
Spark 为什么快?
-
Spark之所以快,主要归功于以下几个关键因素:
- 内存计算:
- Spark支持将数据保留在内存中进行处理,这大大减少了与磁盘I/O操作相关的延迟。内存中的数据访问速度比磁盘快得多,从而加快了数据处理速度。
- 惰性计算和智能优化:
- Spark使用惰性计算(Lazy Evaluation),意味着它不会立即执行计算,而是等到需要结果时才进行。这种机制允许Spark优化整个计算图,合并多个操作以减少任务数量和数据传输。
- 高效的执行引擎:
- Spark的执行引擎能够智能地优化执行计划,包括预测任务的执行顺序和数据分区,以减少数据的移动和提高并行处理效率。
- 血统(Lineage)机制:
- 当数据因为节点故障而丢失时,Spark可以通过血统信息重新计算丢失的数据,而不是像Hadoop那样重新启动整个作业,这减少了容错所需的时间。
提示Spark比MapReduce快,主要是因为它以内存计算为核心,优化了数据处理流程,减少了对磁盘I/O的依赖,而MapReduce主要设计为磁盘计算,即使在有内存资源的情况下也倾向于将数据写入磁盘。
- 内存计算:
Spark 是怎么基于内存计算的?
-
RDD(弹性分布式数据集) 内存存储 Spark的核心数据结构RDD支持将数据缓存在内存中,这是内存计算的基础。当对RDD执行缓存(cache())或持久化(persist())操作时,数据会优先存储在内存中,大大减少了重复计算的开销。不同的持久化级别可以根据内存资源和计算需求进行灵活选择,如MEMORY_ONLY、MEMORY_AND_DISK等。
-
内存数据共享 在分布式计算过程中,Spark通过将中间计算结果保存在内存中,避免了频繁的磁盘I/O。多个作业和任务可以共享这些内存中的数据,显著提高了计算效率。特别是对于迭代型算法和机器学习任务,内存共享能带来巨大的性能提升。
-
内存计算调度 Spark的任务调度器能智能地将计算任务分配到最靠近数据的节点上,并尽可能地利用内存进行计算。通过最小化数据移动和最大化内存使用,Spark能够显著降低计算开销。
-
shuffle操作优化 在shuffle过程中,Spark尽可能地将中间结果保存在内存中,而不是写入磁盘。这种设 计大大减少了磁盘I/O,提高了数据处理的整体性能。
-
内存管理与动态分配 Spark提供了灵活的内存管理机制,可以动态地在存储内存和计算内存之间进行资源分配。这种动态调整能力确保了内存资源的高效利用,并能根据实际任务需求进行实时优化。
-
DataFrame和Dataset的内存优化 在高级API如DataFrame和Dataset中,Spark还引入了编码器(Encoder)和内存列存储等技术,进一步提升了内存计算的效率。这些技术能够对数据进行压缩和快速序列化,减少内存占用并加速计算。
-
广播变量 通过广播变量机制,Spark可以将只读的大数据集高效地分发到各个工作节点的内存中,避免了数据的重复传输,降低了网络开销。
Spark 运行模式
运行模式 | 描述 | 适用场景 |
---|---|---|
独立模式 | Spark自带的独立集群管理模式,无需外部依赖。 | 小型集群或开发测试环境 |
YARN模式 | 基于Hadoop YARN的资源管理和调度。 | Hadoop生态中的大规模数据处理场景 |
Mesos模式 | 使用Apache Mesos进行资源管理和任务调度。 | 跨多个集群的复杂资源管理场景 |
Kubernetes模式 | 在Kubernetes环境中运行,支持容器化部署和云原生架构。 | 云原生环境或使用Kubernetes的场景 |
如果你已经使用 Hadoop 集群,YARN模式可能是最常用的生产环境选择。
如果你的组织采用了容器化、微服务架构或者云原生应用,Kubernetes模式也会是一个非常合适的选择。
Spark 生态圈
- Spark Core:
- Spark生态圈的核心组件,提供数据分布式处理的基本功能,如内存计算、任务调度、容错处理等。
- Spark SQL:
- 用于处理结构化数据的组件,提供SQL查询和DataFrame API,方便用户对数据进行查询和处理。
- Spark Streaming:
- 用于处理实时数据流的组件,能够实时接收数据并对其进行处理和分析。
- Spark MLlib:
- Spark生态圈中用于机器学习的组件,提供多种机器学习算法和工具,用于数据挖掘和分析。
- Spark GraphX:
- 用于图处理的组件,可以对大规模图数据进行处理和分析。
- 数据源支持:
- Spark可以与多种数据源集成,包括Hadoop分布式文件系统(HDFS)、Apache Cassandra、Apache HBase、Amazon S3等。
- 资源管理器:
- Spark可以以Mesos、YARN和自身携带的Standalone作为资源管理器调度Job,完成Spark应用程序的计算。
- 其他组件:
- 包括BlinkDB(近似查询引擎)、Tachyon(内存分布式文件系统)等子项目,这些在Spark上层提供了更高层、更丰富的计算范式
Spark 核心组件
Master(Cluster Manager):集群中的管理节点,管理集群资源,通知 Worker 启动 Executor 或 Driver。
Worker :集群中的工作节点,负责管理本节点的资源,定期向Master汇报心跳,接收Master的命令,启动Driver
或 Executor。
Driver:执行 Spark 应用中的 main 方法,负责实际代码的执行工作。其主要任务:
-
负责向集群申请资源,向master注册信息
-
Executor启动后向 Driver 反向注册
-
负责作业的解析、生成Stage并调度Task到Executor上
-
监控Task的执行情况,执行完毕后释放资源
-
通知 Master 注销应用程序
Executor:是一个 JVM 进程,负责执行具体的Task。Spark 应用启动时, Executor节点被同时启动,并且始终伴随
着整个 Spark 应用的生命周期而存在。如果有 Executor 节点发生了故障或崩溃, 会将出错节点上的任务调度到其他
Executor 节点上继续运行。Executor 核心功能:
- 负责运行组成 Spark 应用的任务,并将结果返回给 Driver 进程
- 通过自身的 Block Manage 为应 用程序缓存RDD
Master 启动流程
在Spark中,"master"通常指的是集群管理器(Cluster Manager),它负责资源的分配和管理。Spark可以与多种集群管理器一起工作,包括Standalone模式、Apache Mesos、Apache YARN(Yet Another Resource Negotiator)和Kubernetes。以下是Spark Standalone模式下master启动的流程:
-
启动Master节点:在集群的主节点上,运行
sbin/start-master.sh
脚本来启动Spark master。这个脚本会启动一个Master进程,它监听某个端口(默认是7077)。 -
注册Web UI端口:Master节点还会启动一个Web界面(默认端口是8080),用于展示集群状态、正在运行的应用程序和资源使用情况。
-
等待Worker注册:Master节点启动后,会等待接收来自集群中各个工作节点(Worker)的注册请求。
-
启动Worker节点:在集群的工作节点上,运行
sbin/start-worker.sh
脚本来启动Spark worker。这个脚本会启动一个Worker进程,它会连接到Master节点并注册自己。 -
资源注册:每个Worker节点在注册时会报告自己的资源情况(如CPU核心数、内存大小等)给Master节点。
-
调度器初始化:Master节点上的调度器(Scheduler)会根据注册的Worker节点资源情况初始化,准备接受来自Driver的作业提交。
-
作业提交和调度:用户的应用程序(Driver)提交作业后,Master节点上的调度器会根据资源情况和作业需求,将任务分配给不同的Worker节点执行。
-
监控和状态更新:Master节点持续监控Worker节点的状态,并更新Web UI以反映集群的最新状态。
-
异常处理:如果Worker节点失败,Master节点会检测到并重新调度失败的任务到其他可用的Worker节点上。
提示在Mesos或YARN模式下,启动流程会有所不同,因为这些集群管理器有自己的资源管理和调度机制,Spark需要与它们进行交互以获取资源和调度任务。在这些模式下,Spark不会启动自己的Master节点,而是依赖于Mesos或YARN的资源管理器来调度资源和任务。
Worker 启动流程
Spark Worker的启动流程可以概括为以下几个主要步骤:
-
解析参数:Worker启动时首先解析传入的参数,包括配置文件、程序运行参数及环境变量。
-
启动RPC服务端:接着启动Worker的RPC服务端,这是Worker与Master通信的基础。
-
创建工 作目录:Worker会创建一个工作目录,用于存放临时数据和日志。
-
启动外部Shuffle服务:如果配置启用,Worker会启动一个外部Shuffle服务,用于处理Shuffle过程中的数据传输。
-
启动Web UI:Worker启动一个Web UI界面,用于监控Worker状态和资源使用情况。
-
向Master注册:Worker向Master发送注册请求,包括Worker的资源信息、地址等。
-
注册成功并发送心跳:Master接收到Worker的注册信息后,会将Worker信息加入到集群资源管理中,并响应Worker注册成功。Worker收到注册成功的响应后,会定期向Master发送心跳以保持连接。
-
资源监控和调度:Worker会监控自身的资源使用情况,并根据Master的调度指令启动Executor来执行任务。
-
启动指标监控系统:最后,Worker会启动一个指标监控系统,用于度量和报告Worker的各种性能指标。
这些步骤构成了Spark Worker的启动和注册流程,确保Worker能够正常加入到Spark集群中,并准备好执行来自Driver的任务。
Master、Worker都是RpcEndpoint,实现了 RpcEndpoint 接口。主要任务收发、处理消息;
Master、Worker的生命周期都遵循:constructor -> onStart -> receive* -> onStop
在Master的onStart方法中最重要的事情是:执行恢复
在Worker的onStart方法中最重要的事情是:向master注册
结合源码,可以参考 https://blog.csdn.net/u013771019/article/details/106800941
SparkContext 启动流程
Spark 应用程序的第一步就是创建并初始化SparkContext,SparkContext的初始化过程包含了内部组件的创建和准备,主要涉及网络通信、分布式、消息、存储、计算、调度、缓存、度量、清理、文件服务和UI等方面。
SparkContext
是 Spark 程序主要功能的入口点,链接 Spark集群,创建RDD、累加器和广播变量,一个线程只能运行一个SparkContext。
SparkContext
在应用程序中将外部数据转换成RDD,建立了第一个RDD,也就是说SparkContext
建立了RDD血缘关系的根,是DAG的根源。
- 初始设置
- 创建 SparkEnv
- 创建 SparkUI
- Hadoop 相关配置
- Executor 环境变量
- 注册 HeartbeatReceiver 心跳接收器
- 创建 TaskScheduler、SchedulerBackend
- 创建和启动 DAGScheduler
- 启动TaskScheduler、SchedulerBackend
- 启动测量系统 MetricsSystem
- 创建事件日志监听器
- 创建和启动 ExecutorAllocationManager
- ContextCleaner 的创建与启动
- 自定义 SparkListener 与启动事件
- Spark 环境更新
- 投递应用程序启动事件
- 测量系统添加Source
- 将 SparkContext 标记为激活
DAGScheduler
DAGScheduler是Spark架构中的核心组件之一,负责任务的逻辑调度。下面我将从功能、工作原理以及与TaskScheduler和SchedulerBackend的交互三个方面来详细解释DAGScheduler。
功能
DAGScheduler的主要功能是接收Spark应用提交的作业,根据RDD(弹性分布式数据集)的依赖关系划分调度阶段(Stage),并提交这些调度阶段给TaskScheduler进行具体处理。简而言之,DAGScheduler负责将复杂的作业拆分成可以并行执行的较小任务集。
工作原理
- 作业提交与阶段划分:当Spark应用中的某个操作触发计算时(通常是一个行动操作,如
count
、collect
等),会向DAGScheduler提交作业。DAGScheduler从RDD依赖链的最末端出发,遍历整个RDD依赖链,根据RDD之间的依赖关系(特别是是否存在Shuffle依赖)来划分阶段。每个阶段包含一个或多个可以并行执行的任务。 - 阶段提交与依赖管理:划分好阶段后,DAGScheduler会检查每个阶段所依赖的父阶段的结果是否可用。如果所有父阶段的结果都可用,则该阶段可以被提交给TaskScheduler进行调度执行。如果有任何父阶 段的结果不可用,DAGScheduler会等待这些父阶段完成后再提交当前阶段。
- 任务集提交与生命周期管理:每个阶段的提交最终会转换成一个任务集(TaskSet)的提交。DAGScheduler通过TaskScheduler接口提交任务集,TaskScheduler则负责将这些任务分发到集群的Worker节点上的Executor中执行。DAGScheduler还会监控任务的执行状态,并根据任务的生命周期信息来维护作业和阶段的状态。
与TaskScheduler和SchedulerBackend的交互
- 与TaskScheduler的交互:DAGScheduler与TaskScheduler之间有着紧密的交互关系。DAGScheduler负责将划分好的阶段提交给TaskScheduler,TaskScheduler则负责将这些阶段中的任务分发到具体的Executor上执行。同时,TaskScheduler还会将任务的执行状态反馈给DAGScheduler,以便DAGScheduler能够监控作业的执行进度并作出相应的调整。
- 与SchedulerBackend的交互:虽然DAGScheduler不直接与SchedulerBackend交互,但它们都是Spark调度系统的重要组成部分。SchedulerBackend负责应用程序运行期间与底层资源调度系统的交互,为任务的执行提供必要的资源和支持。而DAGScheduler则利用这些资源来优化任务的执行顺序和方式,确保作业能够高效、正确地完成。
总之,DAGScheduler在Spark架构中扮演着至关重要的角色,它负责将复杂的作业拆分成可以并行执行的较小任务集,并与TaskScheduler和SchedulerBackend紧密协作,确保这些任务能够在集群中高效、正确地执行。
Spark的stage如何划分?
在Spark中,stage的划分是基于RDD(弹性分布式数据集)之间的依赖关系进行的,特别是是否存在shuffle依赖。下面我将从划分依据、划分过程以及与DAGScheduler的关系三个方面来详细解释Spark中stage的划分。
划分依据
Spark中stage的划分主要依据是RDD之间的依赖关系,特别是宽依赖(Shuffle Dependency)和窄依赖(Narrow Dependency)。
- 宽依赖:父RDD的一个分区会被子RDD的多个分区使用。这种依赖关系会导致数据的重新分布(即shuffle),因此宽依赖是划分stage的关键点。
- 窄依赖:父RDD的分区最多只会被子RDD的一个分区使用。窄依赖不会导致数据的重新分布,因此可以在同一个stage内处理。
划分过程
Spark中stage的划分过程是从作业的最后一个RDD开始,向前追溯依赖关系链,根据遇到的依赖类型(宽依赖或窄依赖)来划分stage。
- 从最后一个RDD开始:当Spark应用中的某个行动操作(如
collect
、count
等)触发计算时,会形成一个作业(Job)。作业的最后一个RDD是结果RDD,从这个RDD开始向前追溯依赖关系。 - 遇到窄依赖则继续追溯:在追溯过程中,如果遇到窄依赖,则继续将当前RDD的父RDD纳入同一个stage,继续向前追溯。
- 遇到宽依赖则划分stage:如果追溯过程中遇到宽依赖,则停止追溯,并将当前已经追溯到的RDD及其父RDD划分为一个stage。然后,从宽依赖的父RDD开始,继续向前追溯,直到遇到下一个宽依赖或作业的开始。
- 重复上述过程:重复上述过程,直到作业的所有RDD都被划分到相应的stage中。
与DAGScheduler的关系
DAGScheduler是Spark中的作业调度器,它负责将作业划分成多个stage,并管理这些stage的执行。
- 作业划分成stage:DAGScheduler会根据RDD之间的依赖关系(特别是宽依赖)来划分作业中的stage。
- 管理stage的执行:划分好stage后,DAGScheduler会按照stage的依赖关系(即后一个stage依赖于前一个stage的结果)来管理stage的执行。它会根据集群的资源情况和stage的优先级来决定何时执行哪个stage。
- 提交任务给TaskScheduler:当某个stage准备好执行时,DAGScheduler会将其转换成一个或多个任务集(TaskSet),并提交给TaskScheduler进行具体的任务调度和执行。
stage的数量等于作业中宽依赖(Shuffle依赖)的数量加1。这是因为Spark的作业调度器(DAGScheduler)会根据RDD之间的依赖关系将作业划分为多个stage,每个stage代表作业执行的一个阶段。