本文共 9661 字,大约阅读时间需要 32 分钟。
当task在executor上运行时最终会在taskrunner中调用execBackend.statusUpdate来向driver端发送状态更新\spark-master\core\src\main\scala\org\apache\spark\executor\CoarseGrainedExecutorBackend.scala直接调用driverRef.Send函数来发送消息 override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) { val msg = StatusUpdate(executorId, taskId, state, data) driver match { case Some(driverRef) => driverRef.send(msg) case None => logWarning(s"Drop $msg because has not yet connected to driver") } }根据消息机制,send发送的消息会在receive中处理spark-master\core\src\main\scala\org\apache\spark\scheduler\cluster\CoarseGrainedSchedulerBackend.scala override def receive: PartialFunction[Any, Unit] = { case StatusUpdate(executorId, taskId, state, data) =>#调用TaskScheduler中的statusupdate方法 scheduler.statusUpdate(taskId, state, data.value) if (TaskState.isFinished(state)) { executorDataMap.get(executorId) match { case Some(executorInfo) => executorInfo.freeCores += scheduler.CPUS_PER_TASK makeOffers(executorId) case None => // Ignoring the update since we don't know about the executor. logWarning(s"Ignored task status update ($taskId state $state) " + s"from unknown executor with ID $executorId") } }}spark-master\core\src\main\scala\org\apache\spark\scheduler\TaskSchedulerImpl.scaladef statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) { var failedExecutor: Option[String] = None var reason: Option[ExecutorLossReason] = None synchronized { try { taskIdToTaskSetManager.get(tid) match { case Some(taskSet) => if (state == TaskState.LOST) { // TaskState.LOST is only used by the deprecated Mesos fine-grained scheduling mode, // where each executor corresponds to a single task, so mark the executor as failed. val execId = taskIdToExecutorId.getOrElse(tid, throw new IllegalStateException( "taskIdToTaskSetManager.contains(tid) <=> taskIdToExecutorId.contains(tid)")) if (executorIdToRunningTaskIds.contains(execId)) { reason = Some( SlaveLost(s"Task $tid was lost, so marking the executor as lost as well.")) removeExecutor(execId, reason.get) failedExecutor = Some(execId) } }#task执行成功时处理,调用taskResultGatter处理 if (TaskState.isFinished(state)) { cleanupTaskState(tid) taskSet.removeRunningTask(tid) if (state == TaskState.FINISHED) { taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData) } else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) { taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData) } }spark-master\core\src\main\scala\org\apache\spark\scheduler\TaskResultGetter.scala def enqueueSuccessfulTask( taskSetManager: TaskSetManager, tid: Long, serializedData: ByteBuffer): Unit = { getTaskResultExecutor.execute(new Runnable { override def run(): Unit = Utils.logUncaughtExceptions { try {#最终计算结果 val (result, size) = serializer.get().deserialize[TaskResult[_]](serializedData) match { case directResult: DirectTaskResult[_] => if (!taskSetManager.canFetchMoreResults(serializedData.limit())) { return } // deserialize "value" without holding any lock so that it won't block other threads. // We should call it here, so that when it's called again in // "TaskSetManager.handleSuccessfulTask", it does not need to deserialize the value. directResult.value(taskResultSerializer.get()) (directResult, serializedData.limit())#结果保存在worker节点的blockmanager中 case IndirectTaskResult(blockId, size) => if (!taskSetManager.canFetchMoreResults(size)) { // dropped by executor if size is larger than maxResultSize sparkEnv.blockManager.master.removeBlock(blockId) return } logDebug("Fetching indirect task result for TID %s".format(tid)) scheduler.handleTaskGettingResult(taskSetManager, tid)#从远程worker获得结果 val serializedTaskResult = sparkEnv.blockManager.getRemoteBytes(blockId) if (!serializedTaskResult.isDefined) { /* We won't be able to get the task result if the machine that ran the task failed * between when the task ended and when we tried to fetch the result, or if the * block manager had to flush the result. */ scheduler.handleFailedTask( taskSetManager, tid, TaskState.FINISHED, TaskResultLost) return }#反序列化获取的结果 val deserializedResult = serializer.get().deserialize[DirectTaskResult[_]]( serializedTaskResult.get.toByteBuffer) // force deserialization of referenced value deserializedResult.value(taskResultSerializer.get()) sparkEnv.blockManager.master.removeBlock(blockId) (deserializedResult, size) }#处理获取到的结果 scheduler.handleSuccessfulTask(taskSetManager, tid, result) } catch { }) }spark-master\core\src\main\scala\org\apache\spark\scheduler\TaskSchedulerImpl.scala def handleSuccessfulTask( taskSetManager: TaskSetManager, tid: Long, taskResult: DirectTaskResult[_]): Unit = synchronized {#调用tasksetmanager的方法处理成功的task taskSetManager.handleSuccessfulTask(tid, taskResult) } def handleSuccessfulTask(tid: Long, result: DirectTaskResult[_]): Unit = {#调用dagscheduler的taskend方法 sched.dagScheduler.taskEnded(tasks(index), Success, result.value(), result.accumUpdates, info) maybeFinishTaskSet() }spark-master\core\src\main\scala\org\apache\spark\scheduler\DAGScheduler.scala def taskEnded( task: Task[_], reason: TaskEndReason, result: Any, accumUpdates: Seq[AccumulatorV2[_, _]], taskInfo: TaskInfo): Unit = {#通过post方法将CompletionEvent放到事件队列中,会被同一个类中的OnReceive方法处理 eventProcessLoop.post( CompletionEvent(task, reason, result, accumUpdates, taskInfo)) } override def onReceive(event: DAGSchedulerEvent): Unit = { val timerContext = timer.time() try {#调用doOnReceive继续处理 doOnReceive(event) } finally { timerContext.stop() } } private def doOnReceive(event: DAGSchedulerEvent): Unit = event match { case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) => dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)#处理前面的CompletionEvent时间,可见这里是调用dagScheduler.handleTaskCompletion 来处理 case completion: CompletionEvent => dagScheduler.handleTaskCompletion(completion)}private[scheduler] def handleTaskCompletion(event: CompletionEvent) { event.reason match { case Success => task match {#处理ResultTask case rt: ResultTask[_, _] => // Cast to ResultStage here because it's part of the ResultTask // TODO Refactor this out to a function that accepts a ResultStage val resultStage = stage.asInstanceOf[ResultStage] resultStage.activeJob match { case Some(job) => if (!job.finished(rt.outputId)) { job.finished(rt.outputId) = true job.numFinished += 1 // If the whole job has finished, remove it#判断释放所有的job 都已经处理完毕了 if (job.numFinished == job.numPartitions) { markStageAsFinished(resultStage) cleanupStateForJobAndIndependentStages(job) listenerBus.post( SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobSucceeded)) } // taskSucceeded runs some user code that might throw an exception. Make sure // we are resilient against that. try {#通过jobwaiter,job处理完毕 job.listener.taskSucceeded(rt.outputId, event.result) } catch { case e: Exception => // TODO: Perhaps we want to mark the resultStage as failed? job.listener.jobFailed(new SparkDriverExecutionException(e)) } } case None => logInfo("Ignoring result from " + rt + " because its job has finished") }#处理shuffleMaptask case smt: ShuffleMapTask => val shuffleStage = stage.asInstanceOf[ShuffleMapStage] val status = event.result.asInstanceOf[MapStatus] val execId = status.location.executorId logDebug("ShuffleMapTask finished on " + execId) if (stageIdToStage(task.stageId).latestInfo.attemptNumber == task.stageAttemptId) { // This task was for the currently running attempt of the stage. Since the task // completed successfully from the perspective of the TaskSetManager, mark it as // no longer pending (the TaskSetManager may consider the task complete even // when the output needs to be ignored because the task's epoch is too small below. // In this case, when pending partitions is empty, there will still be missing // output locations, which will cause the DAGScheduler to resubmit the stage below.) shuffleStage.pendingPartitions -= task.partitionId } }
转载地址:http://jsnmi.baihongyu.com/