一:通过Spark-shell运行程序来观察TaskScheduler内幕
1,当我们启动Spark-shell本身的时候命令终端反馈回来的主要是ClientEndpoint和SparkDeploySchedulerBackend,这是因为此时还没有任何Job的触发,这是启动Application本身而已,所以主要就是实例化SparkContext并注册当前的应用程序给Master且从集群中获得ExecutorBackend计算资源;
2,DAGScheduler划分好Stage后会通过TaskSchedulerImpl中的TaskSetManager来管理当前要运行的Stage中的所有任务TaskSet,TaskSetManager会根据locality aware来为Task分配计算资源、监控Task的执行状态(例如重试、慢任务进行推测式执行等)
TaskSet.scala - L9 |
/**
只是一个简单的数据结构。 包含了高层调度器交给底层调度器的包含了哪些成员; val priority: Int, //Pool调度池中规定了Stage的优先级
|
TaskSetManager.scala |
private[spark] class TaskSetManager(
|
|
我们期望最好 是 PROCESS_LOCAL ,最大化利用内存资源 |
二:TaskScheduler与SchedulerBackend
<!--[if !supportLists]-->1, <!--[endif]-->总体的底层任务调度的过程如下:
<!--[if !supportLists]-->a) <!--[endif]-->TaskSchedulerImpl.submitTasks:主要的作用是将TaskSet加入到TaskSetManager中进行管理;
|
if (tasks.size > 0) {
|
|
|
override def submitTasks(taskSet: TaskSet) {
|
private[spark] class TaskSchedulerImpl( def this(sc: SparkContext) = this(sc, sc.conf.getInt("spark.task.maxFailures", 4)) //任务默认失败重试次数是 4次 |
<!--[if !supportLists]-->b) <!--[endif]-->SchedulableBuilder.addTaskSetManager:SchedulableBuilder会确定TaskSetManager的调度顺序,然后按照TaskSetManager的locality aware来确定每个Task具体运行在哪个ExecutorBackend中;
<!--[if !supportLists]-->c) <!--[endif]-->CoarseGrainedSchedulerBackend.reviveOffers:给DriverEndpoint发送ReviveOffers,ReviveOffers本身是一个空的case object对象,只是起到触发底层资源调度的作用,在有Task提交或者计算资源变动的时候会发送ReviveOffers这个消息作为触发器;
<!--[if !supportLists]-->d) <!--[endif]-->在DriverEndpoint接受ReviveOffers消息并路由到makeOffers具体的方法中:在makeOffers方法中首先准备好所有可以用于计算的workOffers(代表了所有可用ExecutorBackend中可以使用的Cores等信息)
<!--[if !supportLists]-->e) <!--[endif]-->TaskSchedulerImpl.resourceOffers:为每一个Task具体分配计算资源,输入是ExecutorBackend及其上可用的Cores,输出TaskDescription的二维数组,在其中确定了每个Task具体运行在哪个ExecutorBackend;resourceOffers到底是如何确定Task具体运行在哪个ExecutorBackend上的呢?算法的实现具体如下:
<!--[if !supportLists]-->i. <!--[endif]-->通过Random.shuffle方法重新洗牌所有的计算资源以寻求计算的负载均衡;
<!--[if !supportLists]-->ii. <!--[endif]-->根据每个ExecutorBackend的cores的个数声明类型为TaskDescription的ArrayBuffer数组;
<!--[if !supportLists]-->iii. <!--[endif]-->如果有新的ExecutorBackend分配给我们的Job,此时会调用executorAdded来获得最新的完整的可用计算计算资源;
<!--[if !supportLists]-->iv. <!--[endif]-->通过下述代码最求最高级别的优先级本地性:
for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) {
do {
launchedTask = resourceOfferSingleTaskSet(
taskSet, maxLocality, shuffledOffers, availableCpus, tasks)
} while (launchedTask)
}
<!--[if !supportLists]-->v. <!--[endif]-->通过调用TaskSetManager的resourceOffer最终确定每个Task具体运行在哪个ExecutorBackend的具体的Locality Level;
f)通过launchTasks把任务发送给ExecutorBackend去执行;
补充:
<!--[if !supportLists]-->1, <!--[endif]-->Task默认的最大重试次数是4次:
def this(sc: SparkContext) = this(sc, sc.conf.getInt("spark.task.maxFailures", 4))
<!--[if !supportLists]-->2, <!--[endif]-->Spark应用程序目前支持两种调度器:FIFO、FAIR,可以通过spark-env.sh中spark.scheduler.mode进行具体的设置,默认情况下是FIFO的方式:
private val schedulingModeConf = conf.get("spark.scheduler.mode", "FIFO")
val schedulingMode: SchedulingMode = try {
SchedulingMode.withName(schedulingModeConf.toUpperCase)
<!--[if !supportLists]-->3, <!--[endif]-->TaskScheduler中要负责为Task分配计算资源:此时程序已经具备集群中的计算资源了,根据计算本地性原则确定Task具体要运行在哪个ExecutorBackend中;
<!--[if !supportLists]-->4, <!--[endif]-->TaskDescription中已经确定好了Task具体要运行在哪个ExecutorBackend上:
private[spark] class TaskDescription(
val taskId: Long,
val attemptNumber: Int,
val executorId: String,
val name: String,
val index: Int, // Index within this task's TaskSet
_serializedTask: ByteBuffer)
extends Serializable {
而确定Task具体运行在哪个ExecutorBackend上的算法是由TaskSetManager的resourceOffer方法决定
<!--[if !supportLists]-->5, <!--[endif]-->数据本地优先级从高到底以此为:优先级高低排: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY,其中NO_PREF是指机器本地性
<!--[if !supportLists]-->6, <!--[endif]-->每个Task默认是采用一个线程进行计算的:
// CPUs to request per task
val CPUS_PER_TASK = conf.getInt("spark.task.cpus", 1)
<!--[if !supportLists]-->7, <!--[endif]-->DAGScheduler是从数据层面考虑preferedLocation的,而TaskScheduler是从具体计算Task角度考虑计算的本地性;
<!--[if !supportLists]-->8, <!--[endif]-->Task进行广播时候的AkkFrameSize大小是128MB,如果任务大于等于128MB-200K的话则Task会直接被丢弃掉;如果小于128MB-200K的话会通过CoarseGrainedSchedulerBackend去launchTask到具体的ExecutorBackend上;
相关推荐
TaskSchedulerTaskScheduler,它决定了task该如何被调度,而在.net framework中有两种系统定义Scheduler,第一个是Task默认的ThreadPoolTaskScheduler,还是一种SynchronizationContextTaskScheduler,以及这两种...
Arduino-TaskScheduler.zip,Arduino、ESPX和STM32微控制器任务调度器的协同多任务处理,Arduino是一家开源软硬件公司和制造商社区。Arduino始于21世纪初,深受电子制造商的欢迎,Arduino通过开源系统提供了很多灵活性...
使用编程方式添加计划任务的.net组件taskscheduler
C#利用Interop.TaskScheduler.dll添加删除计划任务,可实现程序随Windows系统自动启动; 项目用VS2017打开,需要.net 2.0支持,需要管理员权限;支持win7 win10;不支持xp。
Basic class for using the Microsoft Task Scheduler(33KB)
Task scheduler based on algorithms with absolute priorities
您所需要做的就是通过简单的配置定义参数组合,在我们的框架中编写测试代码,然后让GPU Task Scheduler为您并行运行测试。 如果您已经有一个测试代码,请不要担心。 将您的测试代码迁移到我们的框架非常容易。 ...
里面有两个文件夹,TaskScheduler1.0对应的是windows xp/2000/2003系统中的任务计划,TaskScheduler2.0对应的是所有的windows系统中的任务计划。每个文件夹中都有C#源码、库文档说明和dll文件。方便C#开发windows...
Calamus.TaskScheduler 基于Asp.Net Core 5.0采用Quartz.Net编写的开源任务调度Web管理平台 部署步骤 1,创建持久化数据库(以MySQL为例) -创建数据库石英,Charset为utf8mb4 -根据数据库/表/表_mysql_innodb.sql...
任务计划程序使用Spring Boot的TaskScheduler
用粒子群算法实现资源任务的调度,并且能够优化调度
Windows Task Scheduler的原始.NET包装器聚集了多个版本并提供了用于编辑的本地化控件。 快速链接 示例代码,库使用方法,故障排除等。 -类/方法/属性文档和示例 -使用搜索框查看您的问题是否已经回答。 -帮助...
一个简单的项目,可以帮助我创建必须使用Windows Task Scheduler运行的任务的jar文件。 有可能的使用 创建一个可执行的jar文件,然后从Windows Tasks Scheduler运行。 入门 创建一个新的过程 创建一个实现Processor ...
.archMicrosoft.Win32.TaskScheduler.dll
一个简洁、实用、方便的Android异步处理库,已应用到百万日活的线上项目中
Advanced Task Scheduler提供了一整套的计划任务工具,允许多种方式自动运行计划好的任务,如只运行一次、每分钟运行一次、每小时运行一次、每天运行一次、每月运行一次、每年运行一次或者是在打开计算机后的指定...
GM_Scheduler 一个简单的Javascript Task Scheduler,主要编写为在GreaseMonkey脚本上运行,但也可以用于其他情况。 调度程序确保它在给定的时间间隔内仅运行一次任务,而与并发运行的页面数无关。 您可以使用它在想...
Basic class for using the Microsoft Task Scheduler (26KB)
主要介绍了Spring TaskScheduler使用实例解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下