【Flink源码】四、YarnJobClusterEntrypoint

2021/6/18 20:27:29

本文主要是介绍【Flink源码】四、YarnJobClusterEntrypoint,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

一、YarnJobClusterEntrypoint

进入main方法

SignalHandler.register(LOG);
		JvmShutdownSafeguard.installAsShutdownHook(LOG);

		Map<String, String> env = System.getenv();

		final String workingDirectory = env.get(ApplicationConstants.Environment.PWD.key());
		Preconditions.checkArgument(
			workingDirectory != null,
			"Working directory variable (%s) not set",
			ApplicationConstants.Environment.PWD.key());

		try {
			YarnEntrypointUtils.logYarnEnvironmentInformation(env, LOG);
		} catch (IOException e) {
			LOG.warn("Could not log YARN environment information.", e);
		}

		final Configuration dynamicParameters = ClusterEntrypointUtils.parseParametersOrExit(
			args,
			new DynamicParametersConfigurationParserFactory(),
			YarnJobClusterEntrypoint.class);
		final Configuration configuration = YarnEntrypointUtils.loadConfiguration(workingDirectory, dynamicParameters, env);

		YarnJobClusterEntrypoint yarnJobClusterEntrypoint = new YarnJobClusterEntrypoint(configuration);
          //执行程序的入口
		ClusterEntrypoint.runClusterEntrypoint(yarnJobClusterEntrypoint);

  

ClusterEntrypoint.runClusterEntrypoint(yarnJobClusterEntrypoint);

clusterEntrypoint.startCluster();

securityContext.runSecured((Callable<Void>) () -> {
   runCluster(configuration, pluginManager);

   return null;
});


synchronized (lock) {
  //初始化rpc等相关服务
   initializeServices(configuration, pluginManager);

   // write host information into configuration
   configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress());
   configuration.setInteger(JobManagerOptions.PORT, commonRpcService.getPort());

   final DispatcherResourceManagerComponentFactory dispatcherResourceManagerComponentFactory = createDispatcherResourceManagerComponentFactory(configuration);

   //创建ResourceManager,创建并启动Dispatcher,启动ResourceManager
   clusterComponent = dispatcherResourceManagerComponentFactory.create(
      configuration,
      ioExecutor,
      commonRpcService,
      haServices,
      blobServer,
      heartbeatServices,
      metricRegistry,
      archivedExecutionGraphStore,
      new RpcMetricQueryServiceRetriever(metricRegistry.getMetricQueryServiceRpcService()),
      this);

   clusterComponent.getShutDownFuture().whenComplete(
      (ApplicationStatus applicationStatus, Throwable throwable) -> {
         if (throwable != null) {
            shutDownAsync(
               ApplicationStatus.UNKNOWN,
               ExceptionUtils.stringifyException(throwable),
               false);
         } else {
            // This is the general shutdown path. If a separate more specific shutdown was
            // already triggered, this will do nothing
            shutDownAsync(
               applicationStatus,
               null,
               true);
         }
      });
}

  创建ResourceManager、Dispatcher,并启动

clusterComponent = dispatcherResourceManagerComponentFactory.create(
				configuration,
				ioExecutor,
				commonRpcService,
				haServices,
				blobServer,
				heartbeatServices,
				metricRegistry,
				archivedExecutionGraphStore,
				new RpcMetricQueryServiceRetriever(metricRegistry.getMetricQueryServiceRpcService()),
				this); 

具体实现:创建并启动REST端点(即Web页面服务)

webMonitorEndpoint = restEndpointFactory.createRestEndpoint(
				configuration,
				dispatcherGatewayRetriever,
				resourceManagerGatewayRetriever,
				blobServer,
				executor,
				metricFetcher,
				highAvailabilityServices.getClusterRestEndpointLeaderElectionService(),
				fatalErrorHandler);

			log.debug("Starting Dispatcher REST endpoint.");
			webMonitorEndpoint.start();

 ResourceManager的创建(创建之后才会启动)

resourceManager = resourceManagerFactory.createResourceManager(
				configuration,
				ResourceID.generate(),
				rpcService,
				highAvailabilityServices,
				heartbeatServices,
				fatalErrorHandler,
				new ClusterInformation(hostname, blobServer.getPort()),
				webMonitorEndpoint.getRestBaseUrl(),
				metricRegistry,
				hostname,
				ioExecutor);

  创建Dispatcher

dispatcherRunner = dispatcherRunnerFactory.createDispatcherRunner(
				highAvailabilityServices.getDispatcherLeaderElectionService(),
				fatalErrorHandler,
				new HaServicesJobGraphStoreFactory(highAvailabilityServices),
				ioExecutor,
				rpcService,
				partialDispatcherServices);

  选举服务:每个组件都有自己的选举服务,最终都会调用下面的grantLeadership方法

contender.grantLeadership(HighAvailabilityServices.DEFAULT_LEADER_ID);

  具体实现:

 

 最终执行下面这行代码(这里用的是方法引用,作用与lambda表达式类似):

previousDispatcherLeaderProcessTerminationFuture.thenRun(newDispatcherLeaderProcess::start));

  start的实现

 

 

runIfStateIs(
			State.CREATED,
			this::startInternal);

  

final DispatcherGatewayService dispatcherService = dispatcherGatewayServiceFactory.create(
			DispatcherId.fromUuid(getLeaderSessionId()),
			Collections.singleton(jobGraph),
			ThrowingJobGraphWriter.INSTANCE);

  最终创建dispatcher,并启动

final Dispatcher dispatcher;
		try {
			dispatcher = dispatcherFactory.createDispatcher(
				rpcService,
				fencingToken,
				recoveredJobs,
				(dispatcherGateway, scheduledExecutor, errorHandler) -> new NoOpDispatcherBootstrap(),
				PartialDispatcherServicesWithJobGraphStore.from(partialDispatcherServices, jobGraphWriter));
		} catch (Exception e) {
			throw new FlinkRuntimeException("Could not create the Dispatcher rpc endpoint.", e);
		}

		dispatcher.start();

  最终通过rpc调用(组件之间基于akka通信)触发对应组件的onStart方法

rpcServer.start();

  

 



这篇关于【Flink源码】四、YarnJobClusterEntrypoint的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!


扫一扫关注最新编程教程