Spark job submission (7)
Published: 2019-05-26


When a task finishes running on an executor, the TaskRunner eventually calls execBackend.statusUpdate to report the task's state back to the driver.

spark-master\core\src\main\scala\org\apache\spark\executor\CoarseGrainedExecutorBackend.scala

statusUpdate simply wraps the update in a StatusUpdate message and delivers it with driverRef.send:

    override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
      val msg = StatusUpdate(executorId, taskId, state, data)
      driver match {
        case Some(driverRef) => driverRef.send(msg)
        case None => logWarning(s"Drop $msg because has not yet connected to driver")
      }
    }

Under the RPC messaging mechanism, a message delivered with send is handled by the receiving endpoint's receive method.

spark-master\core\src\main\scala\org\apache\spark\scheduler\cluster\CoarseGrainedSchedulerBackend.scala

    override def receive: PartialFunction[Any, Unit] = {
      case StatusUpdate(executorId, taskId, state, data) =>
        // Delegate to TaskScheduler.statusUpdate
        scheduler.statusUpdate(taskId, state, data.value)
        if (TaskState.isFinished(state)) {
          executorDataMap.get(executorId) match {
            case Some(executorInfo) =>
              executorInfo.freeCores += scheduler.CPUS_PER_TASK
              makeOffers(executorId)
            case None =>
              // Ignoring the update since we don't know about the executor.
              logWarning(s"Ignored task status update ($taskId state $state) " +
                s"from unknown executor with ID $executorId")
          }
        }
    }
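To make the one-way send/receive flow above concrete, here is a minimal, self-contained sketch of the same fire-and-forget pattern. The Endpoint class, its inbox, and the simplified StatusUpdate case class are invented for illustration; they are not Spark's actual RpcEndpoint / RpcEndpointRef API, which dispatches messages on a thread pool inside the RpcEnv.

    import java.util.concurrent.LinkedBlockingQueue

    // Hypothetical stand-ins for Spark's RPC endpoint classes, for illustration only.
    final case class StatusUpdate(executorId: String, taskId: Long, state: String)

    class Endpoint(name: String) {
      private val inbox = new LinkedBlockingQueue[Any]()

      // Handler in the same shape as CoarseGrainedSchedulerBackend.receive.
      def receive: PartialFunction[Any, Unit] = {
        case StatusUpdate(execId, taskId, state) =>
          println(s"[$name] task $taskId on $execId is now $state")
      }

      // One-way, fire-and-forget delivery: no reply is expected.
      def send(msg: Any): Unit = inbox.put(msg)

      // A real RpcEnv runs this loop on dispatcher threads.
      def processOne(): Unit = {
        val msg = inbox.take()
        if (receive.isDefinedAt(msg)) receive(msg)
        else println(s"[$name] unexpected message: $msg")
      }
    }

    object SendReceiveDemo extends App {
      val driver = new Endpoint("driver")
      driver.send(StatusUpdate("exec-1", taskId = 42L, state = "FINISHED"))
      driver.processOne() // handled by the receive partial function
    }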
On the driver, scheduler.statusUpdate lands in TaskSchedulerImpl, which checks whether the task reached a terminal state and hands the serialized result to TaskResultGetter, on the success or the failure path as appropriate.

spark-master\core\src\main\scala\org\apache\spark\scheduler\TaskSchedulerImpl.scala

    def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
      var failedExecutor: Option[String] = None
      var reason: Option[ExecutorLossReason] = None
      synchronized {
        try {
          taskIdToTaskSetManager.get(tid) match {
            case Some(taskSet) =>
              if (state == TaskState.LOST) {
                // TaskState.LOST is only used by the deprecated Mesos fine-grained scheduling mode,
                // where each executor corresponds to a single task, so mark the executor as failed.
                val execId = taskIdToExecutorId.getOrElse(tid, throw new IllegalStateException(
                  "taskIdToTaskSetManager.contains(tid) <=> taskIdToExecutorId.contains(tid)"))
                if (executorIdToRunningTaskIds.contains(execId)) {
                  reason = Some(
                    SlaveLost(s"Task $tid was lost, so marking the executor as lost as well."))
                  removeExecutor(execId, reason.get)
                  failedExecutor = Some(execId)
                }
              }
              // When the task has finished, hand the result over to TaskResultGetter
              if (TaskState.isFinished(state)) {
                cleanupTaskState(tid)
                taskSet.removeRunningTask(tid)
                if (state == TaskState.FINISHED) {
                  taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData)
                } else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) {
                  taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)
                }
              }
            // ... (remaining cases and error handling omitted)
          }
        } // ...
      }
    }
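The branching above reduces to a small routing rule: only terminal states are acted on, and successes and failures go to different TaskResultGetter queues. The sketch below restates that rule in isolation; TaskState in Spark is actually an Enumeration, so the sealed trait and the route function here are simplified stand-ins rather than Spark code.

    // Simplified stand-in for org.apache.spark.TaskState, to illustrate the routing
    // performed in TaskSchedulerImpl.statusUpdate.
    sealed trait TaskState
    case object LAUNCHING extends TaskState
    case object RUNNING   extends TaskState
    case object FINISHED  extends TaskState
    case object FAILED    extends TaskState
    case object KILLED    extends TaskState
    case object LOST      extends TaskState

    object RoutingSketch {
      private val terminal: Set[TaskState] = Set(FINISHED, FAILED, KILLED, LOST)

      def isFinished(state: TaskState): Boolean = terminal.contains(state)

      // Successes and failures end up in different TaskResultGetter queues.
      def route(state: TaskState, tid: Long): String = state match {
        case FINISHED               => s"enqueueSuccessfulTask(tid=$tid)"
        case FAILED | KILLED | LOST => s"enqueueFailedTask(tid=$tid, state=$state)"
        case _                      => s"task $tid still running, nothing to enqueue"
      }

      def main(args: Array[String]): Unit = {
        Seq(RUNNING, FINISHED, FAILED).foreach(s => println(route(s, tid = 7L)))
      }
    }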
spark-master\core\src\main\scala\org\apache\spark\scheduler\TaskResultGetter.scala

enqueueSuccessfulTask runs on a dedicated thread pool and distinguishes two cases: a DirectTaskResult, whose bytes came back with the status update, and an IndirectTaskResult, whose bytes were stored in the BlockManager on the worker node and must be fetched remotely.

    def enqueueSuccessfulTask(
        taskSetManager: TaskSetManager,
        tid: Long,
        serializedData: ByteBuffer): Unit = {
      getTaskResultExecutor.execute(new Runnable {
        override def run(): Unit = Utils.logUncaughtExceptions {
          try {
            // The final computation result
            val (result, size) = serializer.get().deserialize[TaskResult[_]](serializedData) match {
              case directResult: DirectTaskResult[_] =>
                if (!taskSetManager.canFetchMoreResults(serializedData.limit())) {
                  return
                }
                // deserialize "value" without holding any lock so that it won't block other threads.
                // We should call it here, so that when it's called again in
                // "TaskSetManager.handleSuccessfulTask", it does not need to deserialize the value.
                directResult.value(taskResultSerializer.get())
                (directResult, serializedData.limit())
              // The result was stored in the BlockManager on the worker node
              case IndirectTaskResult(blockId, size) =>
                if (!taskSetManager.canFetchMoreResults(size)) {
                  // dropped by executor if size is larger than maxResultSize
                  sparkEnv.blockManager.master.removeBlock(blockId)
                  return
                }
                logDebug("Fetching indirect task result for TID %s".format(tid))
                scheduler.handleTaskGettingResult(taskSetManager, tid)
                // Fetch the result from the remote worker
                val serializedTaskResult = sparkEnv.blockManager.getRemoteBytes(blockId)
                if (!serializedTaskResult.isDefined) {
                  /* We won't be able to get the task result if the machine that ran the task failed
                   * between when the task ended and when we tried to fetch the result, or if the
                   * block manager had to flush the result. */
                  scheduler.handleFailedTask(
                    taskSetManager, tid, TaskState.FINISHED, TaskResultLost)
                  return
                }
                // Deserialize the fetched result
                val deserializedResult = serializer.get().deserialize[DirectTaskResult[_]](
                  serializedTaskResult.get.toByteBuffer)
                // force deserialization of referenced value
                deserializedResult.value(taskResultSerializer.get())
                sparkEnv.blockManager.master.removeBlock(blockId)
                (deserializedResult, size)
            }
            // Process the result that was obtained
            scheduler.handleSuccessfulTask(taskSetManager, tid, result)
          } catch {
            // ... (exception handling omitted)
          }
        }
      })
    }

spark-master\core\src\main\scala\org\apache\spark\scheduler\TaskSchedulerImpl.scala

    def handleSuccessfulTask(
        taskSetManager: TaskSetManager,
        tid: Long,
        taskResult: DirectTaskResult[_]): Unit = synchronized {
      // Delegate to the TaskSetManager to handle the successful task
      taskSetManager.handleSuccessfulTask(tid, taskResult)
    }

TaskSetManager.handleSuccessfulTask in turn notifies the DAGScheduler:

    def handleSuccessfulTask(tid: Long, result: DirectTaskResult[_]): Unit = {
      // Call DAGScheduler.taskEnded
      sched.dagScheduler.taskEnded(tasks(index), Success, result.value(), result.accumUpdates, info)
      maybeFinishTaskSet()
    }

spark-master\core\src\main\scala\org\apache\spark\scheduler\DAGScheduler.scala

    def taskEnded(
        task: Task[_],
        reason: TaskEndReason,
        result: Any,
        accumUpdates: Seq[AccumulatorV2[_, _]],
        taskInfo: TaskInfo): Unit = {
      // post puts a CompletionEvent on the event queue; it is later picked up by onReceive
      eventProcessLoop.post(
        CompletionEvent(task, reason, result, accumUpdates, taskInfo))
    }

    override def onReceive(event: DAGSchedulerEvent): Unit = {
      val timerContext = timer.time()
      try {
        // Continue processing in doOnReceive
        doOnReceive(event)
      } finally {
        timerContext.stop()
      }
    }

    private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
      case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
        dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
      // The CompletionEvent posted above ends up here and is handled by dagScheduler.handleTaskCompletion
      case completion: CompletionEvent =>
        dagScheduler.handleTaskCompletion(completion)
    }
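eventProcessLoop follows a classic single-threaded event-loop pattern: post enqueues an event and returns immediately, while a dedicated thread dequeues events and calls onReceive. The sketch below shows that pattern in a self-contained form; SketchEventLoop, SketchEvent, and CompletionLike are invented names and this is not Spark's actual EventLoop implementation.

    import java.util.concurrent.LinkedBlockingQueue

    // Simplified sketch of the post/onReceive event-loop pattern used by DAGScheduler.
    sealed trait SketchEvent
    final case class CompletionLike(taskId: Long) extends SketchEvent
    case object Stop extends SketchEvent

    class SketchEventLoop(name: String) {
      private val queue = new LinkedBlockingQueue[SketchEvent]()

      private val thread = new Thread(name) {
        override def run(): Unit = {
          var running = true
          while (running) {
            queue.take() match {
              case Stop                  => running = false
              case event: CompletionLike => onReceive(event)
            }
          }
        }
      }
      thread.setDaemon(true)

      // Producers (e.g. taskEnded) just enqueue; the loop thread does the work.
      def post(event: SketchEvent): Unit = queue.put(event)

      protected def onReceive(event: CompletionLike): Unit =
        println(s"[$name] handling completion of task ${event.taskId}")

      def start(): Unit = thread.start()
      def stop(): Unit = post(Stop)
    }

    object EventLoopDemo extends App {
      val loop = new SketchEventLoop("dag-scheduler-event-loop")
      loop.start()
      loop.post(CompletionLike(taskId = 3L))
      loop.stop()
      Thread.sleep(100) // give the daemon thread a moment to drain the queue
    }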
handleTaskCompletion then updates the job and stage bookkeeping. For a ResultTask it marks the partition as finished and, once every partition of the job has completed, marks the stage as finished and posts SparkListenerJobEnd; for a ShuffleMapTask it removes the partition from the stage's pending set.

    private[scheduler] def handleTaskCompletion(event: CompletionEvent) {
      event.reason match {
        case Success =>
          task match {
            // Handle a ResultTask
            case rt: ResultTask[_, _] =>
              // Cast to ResultStage here because it's part of the ResultTask
              // TODO Refactor this out to a function that accepts a ResultStage
              val resultStage = stage.asInstanceOf[ResultStage]
              resultStage.activeJob match {
                case Some(job) =>
                  if (!job.finished(rt.outputId)) {
                    job.finished(rt.outputId) = true
                    job.numFinished += 1
                    // If the whole job has finished, remove it
                    // Check whether every partition of the job has been processed
                    if (job.numFinished == job.numPartitions) {
                      markStageAsFinished(resultStage)
                      cleanupStateForJobAndIndependentStages(job)
                      listenerBus.post(
                        SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobSucceeded))
                    }
                    // taskSucceeded runs some user code that might throw an exception. Make sure
                    // we are resilient against that.
                    try {
                      // Notify the JobWaiter (the job listener) that this partition has succeeded
                      job.listener.taskSucceeded(rt.outputId, event.result)
                    } catch {
                      case e: Exception =>
                        // TODO: Perhaps we want to mark the resultStage as failed?
                        job.listener.jobFailed(new SparkDriverExecutionException(e))
                    }
                  }
                case None =>
                  logInfo("Ignoring result from " + rt + " because its job has finished")
              }
            // Handle a ShuffleMapTask
            case smt: ShuffleMapTask =>
              val shuffleStage = stage.asInstanceOf[ShuffleMapStage]
              val status = event.result.asInstanceOf[MapStatus]
              val execId = status.location.executorId
              logDebug("ShuffleMapTask finished on " + execId)
              if (stageIdToStage(task.stageId).latestInfo.attemptNumber == task.stageAttemptId) {
                // This task was for the currently running attempt of the stage. Since the task
                // completed successfully from the perspective of the TaskSetManager, mark it as
                // no longer pending (the TaskSetManager may consider the task complete even
                // when the output needs to be ignored because the task's epoch is too small below.
                // In this case, when pending partitions is empty, there will still be missing
                // output locations, which will cause the DAGScheduler to resubmit the stage below.)
                shuffleStage.pendingPartitions -= task.partitionId
              }
            // ... (map-output registration and stage completion handling omitted)
          }
        // ... (failure cases omitted)
      }
    }
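job.listener above is the JobWaiter registered when the job was submitted; its taskSucceeded callback is what eventually unblocks the caller of the action. The sketch below is a heavily simplified, hypothetical stand-in for that idea (SimpleJobWaiter and its fields are invented, not Spark's JobWaiter): it counts finished partitions and completes a Future once a result for every partition has arrived.

    import java.util.concurrent.atomic.AtomicInteger
    import scala.concurrent.{Await, Future, Promise}
    import scala.concurrent.duration._

    // Collects one result per partition and completes a Future when the job is done.
    class SimpleJobWaiter[T](totalPartitions: Int, resultHandler: (Int, T) => Unit) {
      private val finishedTasks = new AtomicInteger(0)
      private val jobPromise = Promise[Unit]()

      def completionFuture: Future[Unit] = jobPromise.future

      // Called when a ResultTask for one partition succeeds
      // (compare job.listener.taskSucceeded(rt.outputId, event.result) above).
      def taskSucceeded(index: Int, result: Any): Unit = {
        resultHandler(index, result.asInstanceOf[T])
        if (finishedTasks.incrementAndGet() == totalPartitions) {
          jobPromise.trySuccess(()) // all partitions done: the job has finished
        }
      }

      // Called if driver-side result handling fails.
      def jobFailed(exception: Exception): Unit =
        jobPromise.tryFailure(exception)
    }

    object JobWaiterDemo extends App {
      val results = Array.fill(2)(0)
      val waiter = new SimpleJobWaiter[Int](2, (i, r) => results(i) = r)
      waiter.taskSucceeded(0, 10)
      waiter.taskSucceeded(1, 32)
      Await.result(waiter.completionFuture, 1.second)
      println(s"job finished, sum = ${results.sum}")
    }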

 

Reprinted from: http://jsnmi.baihongyu.com/
