A Tour of the Flink Source Code <5>: ClusterEntrypoint and Cluster Startup

2022/3/18 20:29:04


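When a user starts a cluster with the session CLI command, the Flink cluster startup script invokes the main() method of a concrete ClusterEntrypoint subclass, which starts and runs the corresponding type of cluster environment.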

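In other words, ClusterEntrypoint (CE for short) is the entry point of the whole cluster: its concrete subclasses carry main(), and the abstract class holds the shared startup logic. At runtime, all services are triggered and started through the CE class, which creates and initializes the core components.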

Let's first look at the inheritance hierarchy of the CE abstract class.

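ClusterEntrypoint implementations fall into two categories:

  • SessionClusterEntrypoint
    • Only one cluster is created, and it can run multiple jobs at the same time. Resource utilization is higher, but if the cluster goes down, many jobs are affected.
  • JobClusterEntrypoint
    • Also known as per-job mode: a dedicated cluster is created for each job, so a cluster failure affects only that one job.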

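The concrete subclasses differ by deployment target: standalone corresponds to local mode, while Mesos and YARN correspond to cluster modes that use different schedulers.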
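The split between a shared abstract entrypoint and mode-specific subclasses can be sketched as a small template-method hierarchy. This is only an illustration under assumptions: every class name below is a hypothetical stand-in, not a real Flink class, and the step descriptions are invented for the example.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins for illustration only; these are NOT the real Flink classes.
abstract class EntrypointSketch {
    // Every entrypoint shares the same startup skeleton...
    final List<String> startCluster() {
        List<String> steps = new ArrayList<>();
        steps.add("initialize services");
        steps.add("start " + componentDescription());
        return steps;
    }

    // ...and subclasses only decide which kind of components get created.
    abstract String componentDescription();
}

// Session mode: one long-lived cluster that accepts many jobs.
abstract class SessionEntrypointSketch extends EntrypointSketch {
    @Override
    String componentDescription() {
        return "dispatcher shared by many jobs";
    }
}

// Per-job mode: one cluster per job.
abstract class JobEntrypointSketch extends EntrypointSketch {
    @Override
    String componentDescription() {
        return "dispatcher bound to a single job";
    }
}

// Deployment-specific leaves (standalone, YARN, ...) sit below the two categories.
final class StandaloneSessionSketch extends SessionEntrypointSketch {}

final class YarnJobSketch extends JobEntrypointSketch {}
```

Calling startCluster() on either leaf runs the same skeleton and differs only in the component step, which is roughly how the deployment-specific entrypoints relate to the shared base class.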

Next, starting from the main() method of StandaloneSessionClusterEntrypoint, let's see how ClusterEntrypoint starts a cluster.

public static void main(String[] args) {
		// startup checks and logging
		EnvironmentInformation.logEnvironmentInfo(LOG, StandaloneSessionClusterEntrypoint.class.getSimpleName(), args);
		SignalHandler.register(LOG);
		JvmShutdownSafeguard.installAsShutdownHook(LOG);

		EntrypointClusterConfiguration entrypointClusterConfiguration = null;
		final CommandLineParser<EntrypointClusterConfiguration> commandLineParser = new CommandLineParser<>(new EntrypointClusterConfigurationParserFactory());

		try {
			entrypointClusterConfiguration = commandLineParser.parse(args);
		} catch (FlinkParseException e) {
			LOG.error("Could not parse command line arguments {}.", args, e);
			commandLineParser.printHelp(StandaloneSessionClusterEntrypoint.class.getSimpleName());
			System.exit(1);
		}

		Configuration configuration = loadConfiguration(entrypointClusterConfiguration);

		StandaloneSessionClusterEntrypoint entrypoint = new StandaloneSessionClusterEntrypoint(configuration);

		// after the configuration steps above, start the cluster via runClusterEntrypoint of the CE abstract class
		ClusterEntrypoint.runClusterEntrypoint(entrypoint);
	}

As the last line shows, once configuration and logging have been set up, main() calls the runClusterEntrypoint method of ClusterEntrypoint. Let's see what that method does.

	public static void runClusterEntrypoint(ClusterEntrypoint clusterEntrypoint) {

		final String clusterEntrypointName = clusterEntrypoint.getClass().getSimpleName();
		try {
			clusterEntrypoint.startCluster(); // ⭐ this line starts the cluster
		} catch (ClusterEntrypointException e) {
			LOG.error(String.format("Could not start cluster entrypoint %s.", clusterEntrypointName), e);
			System.exit(STARTUP_FAILURE_RETURN_CODE);
		}

		clusterEntrypoint.getTerminationFuture().whenComplete((applicationStatus, throwable) -> {
			final int returnCode;

			if (throwable != null) {
				returnCode = RUNTIME_FAILURE_RETURN_CODE;
			} else {
				returnCode = applicationStatus.processExitCode();
			}

			LOG.info("Terminating cluster entrypoint process {} with exit code {}.", clusterEntrypointName, returnCode, throwable);
			System.exit(returnCode);
		});
	}

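The ⭐ line above calls CE.startCluster() to continue the startup. The method then registers a callback with clusterEntrypoint.getTerminationFuture().whenComplete(); the callback runs once the cluster has terminated, derives the exit code from the final status, and exits the process.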
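The exit-code handling can be tried out in isolation. The sketch below mirrors the shape of the whenComplete callback but returns the code instead of calling System.exit, so it can be run safely. StatusSketch is a hypothetical stand-in for ApplicationStatus, and the numeric codes are assumptions chosen for the example.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for ApplicationStatus; the exit codes are illustrative.
enum StatusSketch {
    SUCCEEDED(0),
    FAILED(1);

    private final int exitCode;

    StatusSketch(int exitCode) {
        this.exitCode = exitCode;
    }

    int processExitCode() {
        return exitCode;
    }
}

final class ExitCodeSketch {
    // Assumed value for this sketch.
    static final int RUNTIME_FAILURE_RETURN_CODE = 2;

    // Maps the outcome of the termination future to a process exit code.
    // An exceptional completion yields the runtime-failure code; otherwise
    // the code is taken from the final status.
    static CompletableFuture<Integer> exitCodeOf(CompletableFuture<StatusSketch> terminationFuture) {
        return terminationFuture.handle((status, throwable) ->
            throwable != null ? RUNTIME_FAILURE_RETURN_CODE : status.processExitCode());
    }
}
```

Nothing blocks here: the returned future stays incomplete until the termination future completes, which matches the way runClusterEntrypoint returns immediately after registering its callback.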

Now let's see what startCluster() does.

	public void startCluster() throws ClusterEntrypointException {
		LOG.info("Starting {}.", getClass().getSimpleName());

		try {
			configureFileSystems(configuration); // configure the file systems

			SecurityContext securityContext = installSecurityContext(configuration);

			securityContext.runSecured((Callable<Void>) () -> {
				runCluster(configuration); // ⭐ continue the startup inside the security context

				return null;
			});
		} catch (Throwable t) {
			final Throwable strippedThrowable = ExceptionUtils.stripException(t, UndeclaredThrowableException.class);

			try {
				// clean up any partial state
				shutDownAsync(
					ApplicationStatus.FAILED,
					ExceptionUtils.stringifyException(strippedThrowable),
					false).get(INITIALIZATION_SHUTDOWN_TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
			} catch (InterruptedException | ExecutionException | TimeoutException e) {
				strippedThrowable.addSuppressed(e);
			}

			throw new ClusterEntrypointException(
				String.format("Failed to initialize the cluster entrypoint %s.", getClass().getSimpleName()),
				strippedThrowable);
		}
	}

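Note the ⭐ line: runCluster() is still a ClusterEntrypoint method, but it is not called directly. It is handed to securityContext.runSecured(), so the rest of the startup executes inside the installed SecurityContext. Let's continue with runCluster.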
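The idea of passing the startup logic to a context that decides how to run it can be sketched as follows. The interface and both implementations are hypothetical simplifications written for this example; only the runSecured(Callable) shape is taken from the code above, and the "login"/"logout" steps are placeholders for whatever a real context would do.

```java
import java.util.concurrent.Callable;

// Hypothetical, simplified shape of a security context: it decides HOW a task is run.
interface SecurityContextSketch {
    <T> T runSecured(Callable<T> securedCallable) throws Exception;
}

// A no-op context simply invokes the task on the calling thread.
final class NoOpContextSketch implements SecurityContextSketch {
    @Override
    public <T> T runSecured(Callable<T> securedCallable) throws Exception {
        return securedCallable.call();
    }
}

// A context that wraps the task, here only recording placeholder
// "login" and "logout" steps around it.
final class RecordingContextSketch implements SecurityContextSketch {
    final StringBuilder log = new StringBuilder();

    @Override
    public <T> T runSecured(Callable<T> securedCallable) throws Exception {
        log.append("login;");
        try {
            return securedCallable.call();
        } finally {
            log.append("logout;");
        }
    }
}
```

The caller keeps ownership of the startup code and the context only controls the environment it runs in, which is why startCluster() can stay identical regardless of which security setup is installed.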

	private void runCluster(Configuration configuration) throws Exception {
		synchronized (lock) {
			// ⭐ initialize the base services the runtime cluster needs, such as HAServices and the common RpcService
			initializeServices(configuration);

			// write host information into configuration
			configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress());
			configuration.setInteger(JobManagerOptions.PORT, commonRpcService.getPort());

			final DispatcherResourceManagerComponentFactory<?> dispatcherResourceManagerComponentFactory = createDispatcherResourceManagerComponentFactory(configuration);

			// ⭐ create the cluster component (clusterComponent),
			// ⭐ which contains the resourceManager, dispatcher, and webMonitorEndpoint
			clusterComponent = dispatcherResourceManagerComponentFactory.create(
				configuration,
				commonRpcService,
				haServices,
				blobServer,
				heartbeatServices,
				metricRegistry,
				archivedExecutionGraphStore,
				new AkkaQueryServiceRetriever(
					metricQueryServiceActorSystem,
					Time.milliseconds(configuration.getLong(WebOptions.TIMEOUT))),
				this);

			clusterComponent.getShutDownFuture().whenComplete(
				(ApplicationStatus applicationStatus, Throwable throwable) -> {
					if (throwable != null) {
						shutDownAsync(
							ApplicationStatus.UNKNOWN,
							ExceptionUtils.stringifyException(throwable),
							false);
					} else {
						// This is the general shutdown path. If a separate more specific shutdown was
						// already triggered, this will do nothing
						shutDownAsync(
							applicationStatus,
							null,
							true);
					}
				});
		}
	}

This step initializes a number of services and then calls create on dispatcherResourceManagerComponentFactory to create and start the cluster components. Let's continue.

	@Override
	public DispatcherResourceManagerComponent<T> create(
			Configuration configuration,
			RpcService rpcService,
			HighAvailabilityServices highAvailabilityServices,
			BlobServer blobServer,
			HeartbeatServices heartbeatServices,
			MetricRegistry metricRegistry,
			ArchivedExecutionGraphStore archivedExecutionGraphStore,
			MetricQueryServiceRetriever metricQueryServiceRetriever,
			FatalErrorHandler fatalErrorHandler) throws Exception {

		LeaderRetrievalService dispatcherLeaderRetrievalService = null;
		LeaderRetrievalService resourceManagerRetrievalService = null;
		WebMonitorEndpoint<U> webMonitorEndpoint = null;
		ResourceManager<?> resourceManager = null;
		JobManagerMetricGroup jobManagerMetricGroup = null;
		T dispatcher = null;

		try {
			dispatcherLeaderRetrievalService = highAvailabilityServices.getDispatcherLeaderRetriever();

			resourceManagerRetrievalService = highAvailabilityServices.getResourceManagerLeaderRetriever();

			final LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever = new RpcGatewayRetriever<>(
				rpcService,
				DispatcherGateway.class,
				DispatcherId::fromUuid,
				10,
				Time.milliseconds(50L));

			final LeaderGatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever = new RpcGatewayRetriever<>(
				rpcService,
				ResourceManagerGateway.class,
				ResourceManagerId::fromUuid,
				10,
				Time.milliseconds(50L));

			final ExecutorService executor = WebMonitorEndpoint.createExecutorService(
				configuration.getInteger(RestOptions.SERVER_NUM_THREADS),
				configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY),
				"DispatcherRestEndpoint");

			final long updateInterval = configuration.getLong(MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL);
			final MetricFetcher metricFetcher = updateInterval == 0
				? VoidMetricFetcher.INSTANCE
				: MetricFetcherImpl.fromConfiguration(
					configuration,
					metricQueryServiceRetriever,
					dispatcherGatewayRetriever,
					executor);

			webMonitorEndpoint = restEndpointFactory.createRestEndpoint(
				configuration,
				dispatcherGatewayRetriever,
				resourceManagerGatewayRetriever,
				blobServer,
				executor,
				metricFetcher,
				highAvailabilityServices.getWebMonitorLeaderElectionService(),
				fatalErrorHandler); // ⭐ create the webMonitorEndpoint

			log.debug("Starting Dispatcher REST endpoint.");
			webMonitorEndpoint.start(); // ⭐ start the webMonitorEndpoint

			final String hostname = getHostname(rpcService);

			jobManagerMetricGroup = MetricUtils.instantiateJobManagerMetricGroup(
				metricRegistry,
				hostname,
				ConfigurationUtils.getSystemResourceMetricsProbingInterval(configuration));

			resourceManager = resourceManagerFactory.createResourceManager(
				configuration,
				ResourceID.generate(),
				rpcService,
				highAvailabilityServices,
				heartbeatServices,
				metricRegistry,
				fatalErrorHandler,
				new ClusterInformation(hostname, blobServer.getPort()),
				webMonitorEndpoint.getRestBaseUrl(),
				jobManagerMetricGroup); // ⭐ create the ResourceManager

			final HistoryServerArchivist historyServerArchivist = HistoryServerArchivist.createHistoryServerArchivist(configuration, webMonitorEndpoint);

			dispatcher = dispatcherFactory.createDispatcher(
				configuration,
				rpcService,
				highAvailabilityServices,
				resourceManagerGatewayRetriever,
				blobServer,
				heartbeatServices,
				jobManagerMetricGroup,
				metricRegistry.getMetricQueryServicePath(),
				archivedExecutionGraphStore,
				fatalErrorHandler,
				historyServerArchivist); // ⭐ create the dispatcher

			log.debug("Starting ResourceManager.");
			resourceManager.start(); // ⭐ start the ResourceManager
			resourceManagerRetrievalService.start(resourceManagerGatewayRetriever);

			log.debug("Starting Dispatcher.");
			dispatcher.start(); // ⭐ start the dispatcher
			dispatcherLeaderRetrievalService.start(dispatcherGatewayRetriever);

			return createDispatcherResourceManagerComponent(
				dispatcher,
				resourceManager,
				dispatcherLeaderRetrievalService,
				resourceManagerRetrievalService,
				webMonitorEndpoint,
				jobManagerMetricGroup);

		} catch (Exception exception) {
			// clean up all started components
			if (dispatcherLeaderRetrievalService != null) {
				try {
					dispatcherLeaderRetrievalService.stop();
				} catch (Exception e) {
					exception = ExceptionUtils.firstOrSuppressed(e, exception);
				}
			}

			if (resourceManagerRetrievalService != null) {
				try {
					resourceManagerRetrievalService.stop();
				} catch (Exception e) {
					exception = ExceptionUtils.firstOrSuppressed(e, exception);
				}
			}

			final Collection<CompletableFuture<Void>> terminationFutures = new ArrayList<>(3);

			if (webMonitorEndpoint != null) {
				terminationFutures.add(webMonitorEndpoint.closeAsync());
			}

			if (resourceManager != null) {
				terminationFutures.add(resourceManager.closeAsync());
			}

			if (dispatcher != null) {
				terminationFutures.add(dispatcher.closeAsync());
			}

			final FutureUtils.ConjunctFuture<Void> terminationFuture = FutureUtils.completeAll(terminationFutures);

			try {
				terminationFuture.get();
			} catch (Exception e) {
				exception = ExceptionUtils.firstOrSuppressed(e, exception);
			}

			if (jobManagerMetricGroup != null) {
				jobManagerMetricGroup.close();
			}

			throw new FlinkException("Could not create the DispatcherResourceManagerComponent.", exception);
		}
	}

The places where components are created and started are marked with ⭐.
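Besides the ⭐ lines, the catch block of create() is worth a look: if startup fails, it closes the components that already exist, waits for them, and rethrows. The sketch below reproduces that pattern in miniature. ComponentSketch, NamedComponentSketch, and StartupSketch are hypothetical names made up for this example, and plain CompletableFuture.allOf stands in for the FutureUtils helper used in the real method.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical component type for this sketch; not a Flink interface.
interface ComponentSketch {
    void start() throws Exception;

    CompletableFuture<Void> closeAsync();
}

// Test-friendly component that records what happens to it.
final class NamedComponentSketch implements ComponentSketch {
    private final String name;
    private final boolean failOnStart;
    private final List<String> events;

    NamedComponentSketch(String name, boolean failOnStart, List<String> events) {
        this.name = name;
        this.failOnStart = failOnStart;
        this.events = events;
    }

    @Override
    public void start() throws Exception {
        if (failOnStart) {
            throw new Exception(name + " failed to start");
        }
        events.add("start " + name);
    }

    @Override
    public CompletableFuture<Void> closeAsync() {
        events.add("close " + name);
        return CompletableFuture.completedFuture(null);
    }
}

final class StartupSketch {
    // Starts components in order. If one fails, every component reached so far
    // is closed asynchronously, all closings are awaited, and the failure is rethrown.
    static void startAll(List<ComponentSketch> components) throws Exception {
        List<ComponentSketch> reached = new ArrayList<>();
        try {
            for (ComponentSketch component : components) {
                reached.add(component);
                component.start();
            }
        } catch (Exception startupFailure) {
            List<CompletableFuture<Void>> terminations = new ArrayList<>();
            for (ComponentSketch component : reached) {
                terminations.add(component.closeAsync());
            }
            try {
                CompletableFuture.allOf(terminations.toArray(new CompletableFuture[0])).get();
            } catch (Exception cleanupFailure) {
                // Keep the original failure as the primary cause.
                startupFailure.addSuppressed(cleanupFailure);
            }
            throw new Exception("Could not start all components.", startupFailure);
        }
    }
}
```

Collecting the close futures first and waiting on them together lets the components shut down concurrently, while addSuppressed keeps a cleanup error from hiding the original startup error.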


