百度360必应搜狗淘宝本站头条
当前位置:网站首页 > IT知识 > 正文

Spark源码阅读:DataFrame.collect 作业提交流程思维导图

liuian 2025-04-09 17:52 12 浏览

本文分为两个部分:

  1. 作业提交流程思维导图
  2. 关键函数列表

作业提交流程思维导图

collect后Job的提交流程

点击「链接」查看DataFrame.collect触发的作业提交流程思维导图。

关键函数列表

Dataset.collect

def collect(): Array[T] = withAction("collect", queryExecution)(collectFromPlan)

Dataset.withAction

Dataset.collectFromPlan

触发物理计划的执行,其中 plan 的类型是 SparkPlan

private def collectFromPlan(plan: SparkPlan): Array[T] = {
  val fromRow = resolvedEnc.createDeserializer()
  plan.executeCollect().map(fromRow)
}

Spark 有很多action 函数,比如:

  • collect
  • count
  • show

最终都是通过 collectFromPlan 去创建 Job

SparkPlan.executeCollect


executeCollect

这个函数分为三部:

  • getByteArrayRdd函数 将UnsafeRow RDD 转化为 byte array RDD,加速序列化
  • 然后调用了 RDD.collect
  • 解析 collect 结果,并返回

RDD.collect

Resilient Distributed Dataset (RDD), 是一种不可变、支持分区的数据集合。由于支持分区,该数据集支持并行访问。

class RDD是一个基类,它有很多子类:

  • ShuffledRDD:存储shuffle结果数据,parent RDD 是 Java key-value 对
  • ShuffledRowRDD:存储shuffle结果数据,parent RDD 是 InternalRow,SparkSQL使用
  • MapPartitionsRDD:算子会被应用到 parent RDD 的所有分区
  • UnionRDD:存储 union 的结果数据
  • 其他 RDD 子类

collect 方法的主要职能是提交 Spark 作业,该功能代理给了 SparkContext 去支持:

SparkContext.runJob

runJob 方法有很多重载,我们只关心最复杂的一个:

从功能上来说,它实现了

  • 准备 callSite,以便出问题知道是哪一行代码出错了
  • 通过 DAGScheduler.runJob提交作业
  • progressBar: 命令行里 stage的进度条显示
  • doCheckpoint 将 RDD的中间和最后结果缓存下来

从代码上来说,方法声明如下:

def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    resultHandler: (Int, U) => Unit): Unit 

它有两个泛型类型参数:

  • T: ClassTag 输入RDD的类型
  • U: ClassTag 输出数据的类型

参数列表:

  • rdd: RDD[T] 指输入RDD类型,比如 RDD[(Long, Array[Byte])]
  • func: (TaskContext, Iterator[T]) => U。func会被作用到 rdd的每个分区,返回U
  • partitions: Seq[Int]。分区下标列表
  • resultHandler: (Int, U)。这是一个回调函数。处理func执行完返回的数据,第一个参数是分区index,第二个是func的返回值

返回值: Unit 表示没有任何返回值

DAGScheduler.runJob

对于 DAGScheduler 而言,Stage是最小的调度单元。它会

  • 给Job生成以Stage为调度单位的DAG图
  • 追踪RDD和Stage的输出状态,比如哪些已经被物化,并基于这些信息提供一个最优的调度方案
  • 提交Stage,以TaskSet的形式提交给 TasksetManager

DAGScheduler 对Job的调度是围绕
DAGSchedulerEventProcessLoop 展开的。这是一个经典的EventLoop使用场景。runJob 方法的执行流程如下:

  1. 提交任务本质上是向 EventLoop 发送一个 JobSubmitted 事件
  2. 通过一个JobWaiter对象等待结果

在 EventLoop 的另一端,onReceive 接收到 JobSubmitted事件,交给成员函数 handleJobSubmitted 处理该事件。

JobWaiter 内部有一个 Promise 对象,它会不停接收到 taskSucceeded,增加计数,知道成功task的数量等于task的总数量,将promise置为成功。

DAGSchedulerEventProcessLoop.onReceive

onReceive 负责接收各类事件,并分发给特定的 handler 函数处理,具体可以看思维导图或spark代码。

这里我们只看 handleJobSubmitted,它做了五件事情:

  • 创建Stage:递归式地创建,先创建parent stage
  • 注册Stage
  • 创建Job
  • 注册Job
  • 提交Stage

由于 stage 是一个有向无环图,所以创建和执行都遵循 topological order。

DAGScheduler.createResultStage

在 SparkPlan 对象调用 execute 时,会递归地生成 RDD,从而构成了 RDD Lineage Graph,它是一个有向无环图。那么在 RDD Lineage 上如何切分 stage 呢?

RDD依赖分为宽依赖和窄依赖,代码体现为两个类ShuffleDependency和NarrowDependency。在构建 RDD Lineage时,相邻的两个RDD必须有其中一种依赖关系。Spark通过这种依赖关系划分 Stage。根节点的RDD必须分配到 ResultStage里,而之前所有的Stage,不管有多少级依赖,都是 ShuffleMapStage。

DAGScheduler.getShuffleDependenciesAndResourceProfiles

方法中,通过一个栈来记录分配到当前stage中的 RDD(窄依赖中的rdd都会被push到栈里),碰到宽依赖,则加到 shuffleDeps 中。

getShuffleDependenciesAndResourceProfiles

相关推荐

GANs为何引爆机器学习?这篇基于TensorFlow的实例教程为你解惑!

「机器人圈导览」:生成对抗网络无疑是机器学习领域近三年来最火爆的研究领域,相关论文层出不求,各种领域的应用层出不穷。那么,GAN到底如何实践?本文编译自Medium,该文作者以一朵玫瑰花为例,详细阐...

高丽大学等机构联合发布StarGAN:可自定义表情和面部特征

原文来源:arXiv、GitHub作者:YunjeyChoi、MinjeChoi、MunyoungKim、Jung-WooHa、SungKim、JaegulChoo「雷克世界」编译:嗯~...

TensorFlow和PyTorch相继发布最新版,有何变化

原文来源:GitHub「机器人圈」编译:嗯~阿童木呀、多啦A亮Tensorflow主要特征和改进在Tensorflow库中添加封装评估量。所添加的评估量列表如下:1.深度神经网络分类器(DNNCl...

「2022 年」崔庆才 Python3 爬虫教程 - 深度学习识别滑动验证码缺口

上一节我们使用OpenCV识别了图形验证码躯壳欧。这时候就有朋友可能会说了,现在深度学习不是对图像识别很准吗?那深度学习可以用在识别滑动验证码缺口位置吗?当然也是可以的,本节我们就来了解下使用深度...

20K star!搞定 LLM 微调的开源利器

LLM(大语言模型)微调一直都是老大难问题,不仅因为微调需要大量的计算资源,而且微调的方法也很多,要去尝试每种方法的效果,需要安装大量的第三方库和依赖,甚至要接入一些框架,可能在还没开始微调就已经因为...

大模型DeepSeek本地部署后如何进行自定义调整?

1.理解模型架构a)查看深度求索官方文档或提供的源代码文件,了解模型的结构、输入输出格式以及支持的功能。模型是否为预训练权重?如果是,可以在预训练的基础上进行微调(Fine-tuning)。是否需要...

因配置不当,约5000个AI模型与数据集在公网暴露

除了可访问机器学习模型外,暴露的数据还可能包括训练数据集、超参数,甚至是用于构建模型的原始数据。前情回顾·人工智能安全动态向ChatGPT植入恶意“长期记忆”,持续窃取用户输入数据多模态大语言模型的致...

基于pytorch的深度学习人员重识别

基于pytorch的深度学习人员重识别Torchreid是一个库。基于pytorch的深度学习人员重识别。特点:支持多GPU训练支持图像的人员重识别与视频的人员重识别端到端的训练与评估简单的re...

DeepSeek本地部署:轻松训练你的AI模型

引言:为什么选择本地部署?在AI技术飞速发展的今天,越来越多的企业和个人希望将AI技术应用于实际场景中。然而,对于一些对数据隐私和计算资源有特殊需求的用户来说,云端部署可能并不是最佳选择。此时,本地部...

谷歌今天又开源了,这次是Sketch-RNN

前不久,谷歌公布了一项最新技术,可以教机器画画。今天,谷歌开源了代码。在我们研究其代码之前,首先先按要求设置Magenta环境。(https://github.com/tensorflow/magen...

Tensorflow 使用预训练模型训练的完整流程

前面已经介绍了深度学习框架Tensorflow的图像的标注和训练数据的准备工作,本文介绍一下使用预训练模型完成训练并导出训练的模型。1.选择预训练模型1.1下载预训练模型首先需要在Tensorf...

30天大模型调优学习计划(30分钟训练大模型)

30天大模型调优学习计划,结合Unsloth和Lora进行大模型微调,掌握大模型基础知识和调优方法,熟练应用。第1周:基础入门目标:了解大模型基础并熟悉Unsloth等工具的基本使用。Day1:大模...

python爬取喜马拉雅音频,json参数解析

一.抓包分析json,获取加密方式1.抓包获取音频界面f12打开抓包工具,播放一个(非vip)视频,点击“媒体”单击打开可以复制URL,发现就是我们要的音频。复制“CKwRIJEEXn-cABa0Tg...

五、JSONPath使用(Python)(json数据python)

1.安装方法pipinstalljsonpath2.jsonpath与Xpath下面表格是jsonpath语法与Xpath的完整概述和比较。Xpathjsonpath概述/$根节点.@当前节点...

Python网络爬虫的时候json=就是让你少写个json.dumps()

大家好,我是皮皮。一、前言前几天在Python白银交流群【空翼】问了一个Python网络爬虫的问题,提问截图如下:登录请求地址是这个:二、实现过程这里【甯同学】给了一个提示,如下所示:估计很多小伙伴和...