Spark Source Code (2): The Master receive Method
2022/9/13 1:26:08
This article walks through the Master's receive method in the Spark source code (part 2). It should be a useful reference for anyone studying or debugging Spark's standalone Master; follow along with the code below.
All cases handled by Master.receive
override def receive: PartialFunction[Any, Unit] = {
  case ElectedLeader => ......
  case CompleteRecovery => ......
  case RevokedLeadership => ......
  case WorkerDecommissioning => ......
  case DecommissionWorkers => ......
  case RegisterWorker => ......
  case RegisterApplication => ......
  case ExecutorStateChanged => ......
  case DriverStateChanged => ......
  case Heartbeat => ......
  case MasterChangeAcknowledged => ......
  case WorkerSchedulerStateResponse => ......
  case WorkerLatestState => ......
  case UnregisterApplication => ......
  case CheckForWorkerTimeOut => ......
}
1. ElectedLeader in Detail
// The call below reads the persisted driver/worker/app info back through persistenceEngine
// (persistenceEngine itself is initialized in the onStart method)
val (storedApps, storedDrivers, storedWorkers) = persistenceEngine.readPersistedData(rpcEnv)
// If there is no persisted state at all, this node starts fresh as the active master;
// otherwise it must first recover the previous master's state
state = if (storedApps.isEmpty && storedDrivers.isEmpty && storedWorkers.isEmpty) {
  RecoveryState.ALIVE
} else {
  RecoveryState.RECOVERING
}
logInfo("I have been elected leader! New state: " + state)
if (state == RecoveryState.RECOVERING) {
  // begin the master failover work, explained below
  beginRecovery(storedApps, storedDrivers, storedWorkers)
  // schedule a CompleteRecovery message to self; CompleteRecovery is covered in the next section
  recoveryCompletionTask = forwardMessageThread.schedule(new Runnable {
    override def run(): Unit = Utils.tryLogNonFatalError {
      self.send(CompleteRecovery)
    }
  }, workerTimeoutMs, TimeUnit.MILLISECONDS)
}
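The ALIVE-versus-RECOVERING decision above is easy to check in isolation. A minimal sketch, assuming a hypothetical stateAfterElection helper (the names and types here are illustrative, not Spark's):

```scala
// Minimal sketch of the election decision: a newly elected master with no
// persisted apps/drivers/workers goes straight to ALIVE, while any leftover
// state forces a RECOVERING phase first.
object ElectionSketch {
  sealed trait RecoveryState
  case object Alive extends RecoveryState
  case object Recovering extends RecoveryState

  def stateAfterElection(storedApps: Seq[String],
                         storedDrivers: Seq[String],
                         storedWorkers: Seq[String]): RecoveryState =
    if (storedApps.isEmpty && storedDrivers.isEmpty && storedWorkers.isEmpty) Alive
    else Recovering
}
```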
1.1 The start Method (zkFactory.createPersistenceEngine(), zkFactory.createLeaderElectionAgent(this)) in Detail
def createPersistenceEngine(): PersistenceEngine = {
  new ZooKeeperPersistenceEngine(conf, serializer) // step further in
}
1.2 ZooKeeperPersistenceEngine in Detail
private[master] class ZooKeeperPersistenceEngine(conf: SparkConf, val serializer: Serializer)
  extends PersistenceEngine with Logging {

  private val workingDir = conf.get(ZOOKEEPER_DIRECTORY).getOrElse("/spark") + "/master_status"
  private val zk: CuratorFramework = SparkCuratorUtil.newClient(conf)

  SparkCuratorUtil.mkdir(zk, workingDir)
  // All the remaining methods are built on the zk client object: in short, the
  // master's persisted state is read from and written to ZooKeeper
  .......
}
1.3 beginRecovery(storedApps, storedDrivers, storedWorkers) in Detail
private def beginRecovery(
    storedApps: Seq[ApplicationInfo],
    storedDrivers: Seq[DriverInfo],
    storedWorkers: Seq[WorkerInfo]): Unit = {
  for (app <- storedApps) {
    logInfo("Trying to recover app: " + app.id)
    try {
      // add every app not yet in addressToApp to the bookkeeping collections:
      // apps, idToApp, endpointToApp, addressToApp, waitingApps (detailed below)
      registerApplication(app)
      app.state = ApplicationState.UNKNOWN
      app.driver.send(MasterChanged(self, masterWebUiUrl)) // revisited when we cover the driver side
    } catch {
      case e: Exception => logInfo("App " + app.id + " had exception on reconnect")
    }
  }
  for (driver <- storedDrivers) { // register all drivers
    drivers += driver
  }
  for (worker <- storedWorkers) {
    logInfo("Trying to recover worker: " + worker.id)
    try {
      // register every worker into workers, idToWorker and addressToWorker,
      // removing any stale worker/driver entries first
      registerWorker(worker)
      worker.state = WorkerState.UNKNOWN
      worker.endpoint.send(MasterChanged(self, masterWebUiUrl))
    } catch {
      case e: Exception => logInfo("Worker " + worker.id + " had exception on reconnect")
    }
  }
}
1.3.1 Variable Operations in beginRecovery in Detail
When the Master class is first initialized, its bookkeeping collections are all empty. The (storedApps, storedDrivers, storedWorkers) arguments are pulled from ZooKeeper, so duplicate elements are avoided at the source; most of the collections are HashSets and HashMaps, which also prevent duplicates. The function touches many variables through nested calls, but everything eventually ends in a call to schedule(). The variables touched and RPC messages sent are listed here:

Variables touched:
  registerApplication:
    applicationMetricsSystem.registerSource(app.appSource)
    apps += app
    idToApp(app.id) = app
    endpointToApp(app.driver) = app
    addressToApp(appAddress) = app
    waitingApps += app
  drivers += driver
  registerWorker:
    workers -= w
    workers += worker
    idToWorker(worker.id) = worker
    addressToWorker(workerAddress) = worker
  removeWorker:
    idToWorker -= worker.id
    addressToWorker -= worker.endpoint.address
  relaunchDriver:
    drivers.add(newDriver)
    waitingDrivers += newDriver
  removeDriver:
    drivers -= driver
    completedDrivers += driver
    driver.state = finalState
    driver.exception = exception
    driver.worker.foreach(w => w.removeDriver(driver))

RPC messages:
  worker.send(MasterChanged())
  driver.send(MasterChanged())
  driver.send(ExecutorUpdated())
  driver.send(WorkerRemoved())
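Why replaying persisted state is safe to repeat can be sketched in a few lines. This is a minimal illustration with a hypothetical AppInfo type, not Spark's real classes: the collections are id-keyed, so re-registering the same app is a no-op.

```scala
import scala.collection.mutable

case class AppInfo(id: String)

object RegistrationSketch {
  val apps = mutable.HashSet[AppInfo]()
  val idToApp = mutable.HashMap[String, AppInfo]()

  def registerApplication(app: AppInfo): Unit = {
    if (idToApp.contains(app.id)) return // duplicate replay from ZK: ignore
    apps += app
    idToApp(app.id) = app
  }
}
```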
1.3.1.1 schedule() in Detail
// Allocates resources to waiting drivers and applications; this method is called from many places
private def schedule(): Unit = {
  if (state != RecoveryState.ALIVE) { // bail out unless this node is the active master
    return
  }
  val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
  val numWorkersAlive = shuffledAliveWorkers.size // all workers still alive
  var curPos = 0
  for (driver <- waitingDrivers.toList) { // loop over all waiting drivers
    var launched = false
    var isClusterIdle = true
    var numWorkersVisited = 0
    while (numWorkersVisited < numWorkersAlive && !launched) {
      val worker = shuffledAliveWorkers(curPos)
      isClusterIdle = worker.drivers.isEmpty && worker.executors.isEmpty
      numWorkersVisited += 1
      if (canLaunchDriver(worker, driver.desc)) { // does this worker have enough resources for the driver?
        val allocated = worker.acquireResources(driver.desc.resourceReqs)
        driver.withResources(allocated)
        launchDriver(worker, driver) // launch the driver; with many submissions, drivers may queue here
        waitingDrivers -= driver
        launched = true
      }
      curPos = (curPos + 1) % numWorkersAlive
    }
    if (!launched && isClusterIdle) {
      logWarning(s"Driver ${driver.id} requires more resource than any of Workers could have.")
    }
  }
  startExecutorsOnWorkers() // launch executors
}
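The curPos round-robin above can be reduced to a small sketch. This is an illustration under simplified assumptions (workers reduced to a free-core count, drivers to a core demand; the names are hypothetical): each driver walks the shuffled worker list from where the previous one left off, wrapping with modulo, so placements spread across workers.

```scala
object RoundRobinSketch {
  // Returns, for each driver demand, the index of the worker it landed on,
  // or -1 if no worker had enough free cores.
  def placeDrivers(freeCores: Array[Int], demands: Seq[Int]): Seq[Int] = {
    val cores = freeCores.clone()
    var curPos = 0
    demands.map { needed =>
      var visited = 0
      var placedAt = -1
      while (visited < cores.length && placedAt < 0) {
        if (cores(curPos) >= needed) {
          cores(curPos) -= needed
          placedAt = curPos
        }
        curPos = (curPos + 1) % cores.length // advance even after a successful placement
        visited += 1
      }
      placedAt
    }
  }
}
```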
1.3.1.1.1 launchDriver(worker, driver) in Detail
private def launchDriver(worker: WorkerInfo, driver: DriverInfo): Unit = {
  logInfo("Launching driver " + driver.id + " on worker " + worker.id)
  worker.addDriver(driver)
  driver.worker = Some(worker)
  worker.endpoint.send(LaunchDriver(driver.id, driver.desc, driver.resources))
  driver.state = DriverState.RUNNING
}
// Not much code here: the core is worker.endpoint.send(LaunchDriver(...)),
// which will be covered together with the Worker class
1.3.1.1.2 startExecutorsOnWorkers() in Detail
private def startExecutorsOnWorkers(): Unit = {
  for (app <- waitingApps) {
    val coresPerExecutor = app.desc.coresPerExecutor.getOrElse(1) // default is one core
    if (app.coresLeft >= coresPerExecutor) {
      val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
        .filter(canLaunchExecutor(_, app.desc))
        .sortBy(_.coresFree).reverse // prefer the most idle workers
      val appMayHang = waitingApps.length == 1 &&
        waitingApps.head.executors.isEmpty && usableWorkers.isEmpty
      // this check is a little puzzling: why only warn when a single app is waiting,
      // rather than checking every app?
      if (appMayHang) {
        logWarning(s"App ${app.id} requires more resource than any of Workers could have.")
      }
      // returns how many cores each usable worker should contribute
      val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)
      for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) { // allocate cores and launch executors
        allocateWorkerResourceToExecutors(
          app, assignedCores(pos), app.desc.coresPerExecutor, usableWorkers(pos))
      }
    }
  }
}
1.3.1.1.2.1 allocateWorkerResourceToExecutors() in Detail
private def allocateWorkerResourceToExecutors(
    app: ApplicationInfo,
    assignedCores: Int,
    coresPerExecutor: Option[Int],
    worker: WorkerInfo): Unit = {
  val numExecutors = coresPerExecutor.map { assignedCores / _ }.getOrElse(1)
  val coresToAssign = coresPerExecutor.getOrElse(assignedCores)
  for (i <- 1 to numExecutors) {
    val allocated = worker.acquireResources(app.desc.resourceReqsPerExecutor)
    val exec = app.addExecutor(worker, coresToAssign, allocated)
    launchExecutor(worker, exec) // launch the executor
    app.state = ApplicationState.RUNNING
  }
}
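The executor-count arithmetic above is worth pinning down with concrete numbers. A sketch with plain functions (not Spark API): when coresPerExecutor is set, the assigned cores are split into assignedCores / coresPerExecutor executors of that size; when it is unset, all assigned cores go to a single executor.

```scala
object ExecutorMath {
  def numExecutors(assignedCores: Int, coresPerExecutor: Option[Int]): Int =
    coresPerExecutor.map(assignedCores / _).getOrElse(1)

  def coresToAssign(assignedCores: Int, coresPerExecutor: Option[Int]): Int =
    coresPerExecutor.getOrElse(assignedCores)
}
```

So 8 assigned cores with coresPerExecutor = Some(2) yields four 2-core executors, while 8 cores with None yields one 8-core executor.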
1.3.1.1.2.1.1 launchExecutor in Detail
private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = {
  worker.addExecutor(exec)
  worker.endpoint.send(LaunchExecutor(masterUrl, exec.application.id, exec.id,
    exec.application.desc, exec.cores, exec.memory, exec.resources)) // tell the worker to start the executor
  exec.application.driver.send(
    ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory)) // tell the driver an executor was added
}

In the end it comes down to two messages; note that this path only requests new resources and never kills old ones:
  worker.send(LaunchExecutor)
  driver.send(ExecutorAdded)
2. CompleteRecovery in Detail
// The previous step began the failover but never cleaned up stale state; that cleanup happens here
private def completeRecovery(): Unit = { // this method finishes the master failover
  if (state != RecoveryState.RECOVERING) {
    return
  }
  state = RecoveryState.COMPLETING_RECOVERY
  workers.filter(_.state == WorkerState.UNKNOWN).foreach(
    removeWorker(_, "Not responding for recovery")) // drop every worker still in UNKNOWN state
  apps.filter(_.state == ApplicationState.UNKNOWN).foreach(finishApplication) // drop every app still in UNKNOWN state
  apps.filter(_.state == ApplicationState.WAITING).foreach(_.state = ApplicationState.RUNNING) // mark all waiting apps as RUNNING
  drivers.filter(_.worker.isEmpty).foreach { d =>
    logWarning(s"Driver ${d.id} was not found after master recovery")
    if (d.desc.supervise) {
      logWarning(s"Re-launching ${d.id}")
      relaunchDriver(d)
    } else {
      removeDriver(d.id, DriverState.ERROR, None)
      logWarning(s"Did not re-launch ${d.id} because it was not supervised")
    }
  }
  state = RecoveryState.ALIVE
  schedule()
  logInfo("Recovery complete - resuming operations!")
}
2.1 removeWorker in Detail
private def removeWorker(worker: WorkerInfo, msg: String): Unit = {
  logInfo("Removing worker " + worker.id + " on " + worker.host + ":" + worker.port)
  worker.setState(WorkerState.DEAD)
  idToWorker -= worker.id
  addressToWorker -= worker.endpoint.address // drop the stale worker entries
  for (exec <- worker.executors.values) {
    logInfo("Telling app of lost executor: " + exec.id)
    exec.application.driver.send(ExecutorUpdated(
      exec.id, ExecutorState.LOST, Some("worker lost"), None, Some(worker.host))) // notify each affected driver with ExecutorUpdated
    exec.state = ExecutorState.LOST
    exec.application.removeExecutor(exec) // remove the lost executors and reclaim their resources
  }
  for (driver <- worker.drivers.values) {
    if (driver.desc.supervise) { // the difference between the branches is whether a new driver is created
      logInfo(s"Re-launching ${driver.id}")
      relaunchDriver(driver)
    } else {
      logInfo(s"Not re-launching ${driver.id} because it was not supervised")
      removeDriver(driver.id, DriverState.ERROR, None)
    }
  }
  logInfo(s"Telling app of lost worker: " + worker.id)
  apps.filterNot(completedApps.contains(_)).foreach { app =>
    app.driver.send(WorkerRemoved(worker.id, worker.host, msg)) // send WorkerRemoved to every live driver
  }
  persistenceEngine.removeWorker(worker)
  schedule()
}
2.1.1 relaunchDriver in Detail
private def relaunchDriver(driver: DriverInfo): Unit = {
  removeDriver(driver.id, DriverState.RELAUNCHING, None) // remove the old driver and reclaim its resources
  val newDriver = createDriver(driver.desc) // create a new driver
  persistenceEngine.addDriver(newDriver)
  drivers.add(newDriver)
  waitingDrivers += newDriver
  schedule()
}
2.1.1.1 removeDriver in Detail
private def removeDriver(
    driverId: String,
    finalState: DriverState,
    exception: Option[Exception]): Unit = {
  drivers.find(d => d.id == driverId) match {
    case Some(driver) =>
      logInfo(s"Removing driver: $driverId")
      drivers -= driver
      if (completedDrivers.size >= retainedDrivers) {
        val toRemove = math.max(retainedDrivers / 10, 1)
        completedDrivers.trimStart(toRemove)
      }
      completedDrivers += driver
      persistenceEngine.removeDriver(driver)
      driver.state = finalState
      driver.exception = exception
      driver.worker.foreach(w => w.removeDriver(driver)) // detach the driver from its worker and reclaim its resources
      schedule()
    case None =>
      logWarning(s"Asked to remove unknown driver: $driverId")
  }
}
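The completedDrivers retention policy above can be sketched on its own. This is an illustration (hypothetical append helper, not Spark API): once the buffer reaches retainedDrivers, the oldest max(retainedDrivers / 10, 1) entries are dropped before appending, so trimming happens in batches rather than one entry at a time.

```scala
import scala.collection.mutable.ArrayBuffer

object RetentionSketch {
  def append(buf: ArrayBuffer[String], id: String, retained: Int): Unit = {
    if (buf.size >= retained) {
      buf.trimStart(math.max(retained / 10, 1)) // drop the oldest batch
    }
    buf += id
  }
}
```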
That concludes this look at the Master receive method in the Spark source code; hopefully it serves as a helpful reference.