ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

Flink源码漫游指南<伍>ClusterEntrypoint与集群的启动

2022-03-18 18:32:03  阅读:266  来源: 互联网

标签:webMonitorEndpoint configuration Flink 源码 集群 ClusterEntrypoint new null final


        当用户用Session cli命令启动集群时,首先会在Flink集群启动脚本中调用ClusterEntrypoint抽象类中提供的main()方法,以启动和运行相应类型的集群环境。

也就是说,ClusterEntrypoint是整个集群的入口类,且带有main()方法。在运行时管理中,所有的服务都是通过CE类进行触发和启动,进而完成核心组件的创建和初始化。

我们先通过下图看一下CE抽象类的继承关系

可以看到ClusterEntrypoint分为两类

  • SessionClusterEntrypoint
    • 只建立一个集群,能够同时运行多个作业,这样资源利用率更高,但是如果集群挂掉, 会影响很多作业。
  • JobClusterEntrypoint
    • 又叫Per-job模式,为每个job单独创建一个集群,这样如果集群挂掉也只影响一个任务。

standalone对应的本地模式,mesos、yarn集群模式的不同调度器。

我们再从StandaloneSessionClusterEntrypoint中的main()方法开始,看看ClusterEntrypoint如何启动集群

public static void main(String[] args) {
		// startup checks and logging 启动配置检查和日志加载
		EnvironmentInformation.logEnvironmentInfo(LOG, StandaloneSessionClusterEntrypoint.class.getSimpleName(), args);
		SignalHandler.register(LOG);
		JvmShutdownSafeguard.installAsShutdownHook(LOG);

		EntrypointClusterConfiguration entrypointClusterConfiguration = null;
		final CommandLineParser<EntrypointClusterConfiguration> commandLineParser = new CommandLineParser<>(new EntrypointClusterConfigurationParserFactory());

		try {
			entrypointClusterConfiguration = commandLineParser.parse(args);
		} catch (FlinkParseException e) {
			LOG.error("Could not parse command line arguments {}.", args, e);
			commandLineParser.printHelp(StandaloneSessionClusterEntrypoint.class.getSimpleName());
			System.exit(1);
		}

		Configuration configuration = loadConfiguration(entrypointClusterConfiguration);

		StandaloneSessionClusterEntrypoint entrypoint = new StandaloneSessionClusterEntrypoint(configuration);

        //经过上面一系列的配置之后,通过调用CE抽象类的runClusterEntrypoint启动
		ClusterEntrypoint.runClusterEntrypoint(entrypoint);
	}

通过最后一行代码我们可以发现,经过一系列的配置和日志加载,最后调用了ClusterEntrypoint里的runClusterEntrypoint方法。我们再来看看这个方法干了什么。

	public static void runClusterEntrypoint(ClusterEntrypoint clusterEntrypoint) {

		final String clusterEntrypointName = clusterEntrypoint.getClass().getSimpleName();
		try {
			clusterEntrypoint.startCluster();//⭐通过这一行启动集群
		} catch (ClusterEntrypointException e) {
			LOG.error(String.format("Could not start cluster entrypoint %s.", clusterEntrypointName), e);
			System.exit(STARTUP_FAILURE_RETURN_CODE);
		}

		clusterEntrypoint.getTerminationFuture().whenComplete((applicationStatus, throwable) -> {
			final int returnCode;

			if (throwable != null) {
				returnCode = RUNTIME_FAILURE_RETURN_CODE;
			} else {
				returnCode = applicationStatus.processExitCode();
			}

			LOG.info("Terminating cluster entrypoint process {} with exit code {}.", clusterEntrypointName, returnCode, throwable);
			System.exit(returnCode);
		});
	}

上述代码中带⭐的代码又调用的CE.startCluster()继续启动,然后等运行结束,用clusterEntrypoint.getTerminationFuture().whenComplete()获取运行结束状态并进行对应的处理。

我们再看看startCluster()干了什么

	public void startCluster() throws ClusterEntrypointException {
		LOG.info("Starting {}.", getClass().getSimpleName());

		try {
			configureFileSystems(configuration);//配置文件系统

			SecurityContext securityContext = installSecurityContext(configuration);

			securityContext.runSecured((Callable<Void>) () -> {
				runCluster(configuration);//⭐在securityContext安全环境里继续启动

				return null;
			});
		} catch (Throwable t) {
			final Throwable strippedThrowable = ExceptionUtils.stripException(t, UndeclaredThrowableException.class);

			try {
				// clean up any partial state
				shutDownAsync(
					ApplicationStatus.FAILED,
					ExceptionUtils.stringifyException(strippedThrowable),
					false).get(INITIALIZATION_SHUTDOWN_TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
			} catch (InterruptedException | ExecutionException | TimeoutException e) {
				strippedThrowable.addSuppressed(e);
			}

			throw new ClusterEntrypointException(
				String.format("Failed to initialize the cluster entrypoint %s.", getClass().getSimpleName()),
				strippedThrowable);
		}
	}

注意⭐号的代码,这里是SecurityContext在继续runCluster,而不是ClusterEntrypoint在做,继续看runCluster

	private void runCluster(Configuration configuration) throws Exception {
		synchronized (lock) {
			//⭐初始化运行时集群需要创建的基础组件服务,如HAServices、CommonRPCService等。
			initializeServices(configuration);

			// write host information into configuration 把host信息写入配置
			configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress());
			configuration.setInteger(JobManagerOptions.PORT, commonRpcService.getPort());

			final DispatcherResourceManagerComponentFactory<?> dispatcherResourceManagerComponentFactory = createDispatcherResourceManagerComponentFactory(configuration);

			//⭐创建集群组件clusterComponent
			//⭐其中包含了resourceManager、dispatcher、webMonitorEndpoint
			clusterComponent = dispatcherResourceManagerComponentFactory.create(
				configuration,
				commonRpcService,
				haServices,
				blobServer,
				heartbeatServices,
				metricRegistry,
				archivedExecutionGraphStore,
				new AkkaQueryServiceRetriever(
					metricQueryServiceActorSystem,
					Time.milliseconds(configuration.getLong(WebOptions.TIMEOUT))),
				this);

			clusterComponent.getShutDownFuture().whenComplete(
				(ApplicationStatus applicationStatus, Throwable throwable) -> {
					if (throwable != null) {
						shutDownAsync(
							ApplicationStatus.UNKNOWN,
							ExceptionUtils.stringifyException(throwable),
							false);
					} else {
						// This is the general shutdown path. If a separate more specific shutdown was
						// already triggered, this will do nothing
						shutDownAsync(
							applicationStatus,
							null,
							true);
					}
				});
		}
	}

这一步启动了多种服务和组件,并通过dispatcherResourceManagerComponentFactory调用create来启动,继续看

	@Override
	public DispatcherResourceManagerComponent<T> create(
			Configuration configuration,
			RpcService rpcService,
			HighAvailabilityServices highAvailabilityServices,
			BlobServer blobServer,
			HeartbeatServices heartbeatServices,
			MetricRegistry metricRegistry,
			ArchivedExecutionGraphStore archivedExecutionGraphStore,
			MetricQueryServiceRetriever metricQueryServiceRetriever,
			FatalErrorHandler fatalErrorHandler) throws Exception {

		LeaderRetrievalService dispatcherLeaderRetrievalService = null;
		LeaderRetrievalService resourceManagerRetrievalService = null;
		WebMonitorEndpoint<U> webMonitorEndpoint = null;
		ResourceManager<?> resourceManager = null;
		JobManagerMetricGroup jobManagerMetricGroup = null;
		T dispatcher = null;

		try {
			dispatcherLeaderRetrievalService = highAvailabilityServices.getDispatcherLeaderRetriever();

			resourceManagerRetrievalService = highAvailabilityServices.getResourceManagerLeaderRetriever();

			final LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever = new RpcGatewayRetriever<>(
				rpcService,
				DispatcherGateway.class,
				DispatcherId::fromUuid,
				10,
				Time.milliseconds(50L));

			final LeaderGatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever = new RpcGatewayRetriever<>(
				rpcService,
				ResourceManagerGateway.class,
				ResourceManagerId::fromUuid,
				10,
				Time.milliseconds(50L));

			final ExecutorService executor = WebMonitorEndpoint.createExecutorService(
				configuration.getInteger(RestOptions.SERVER_NUM_THREADS),
				configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY),
				"DispatcherRestEndpoint");

			final long updateInterval = configuration.getLong(MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL);
			final MetricFetcher metricFetcher = updateInterval == 0
				? VoidMetricFetcher.INSTANCE
				: MetricFetcherImpl.fromConfiguration(
					configuration,
					metricQueryServiceRetriever,
					dispatcherGatewayRetriever,
					executor);

			webMonitorEndpoint = restEndpointFactory.createRestEndpoint(
				configuration,
				dispatcherGatewayRetriever,
				resourceManagerGatewayRetriever,
				blobServer,
				executor,
				metricFetcher,
				highAvailabilityServices.getWebMonitorLeaderElectionService(),
				fatalErrorHandler);//⭐创建webMonitorEndpoint

			log.debug("Starting Dispatcher REST endpoint.");
			webMonitorEndpoint.start();//⭐启动webMonitorEndpoint

			final String hostname = getHostname(rpcService);

			jobManagerMetricGroup = MetricUtils.instantiateJobManagerMetricGroup(
				metricRegistry,
				hostname,
				ConfigurationUtils.getSystemResourceMetricsProbingInterval(configuration));

			resourceManager = resourceManagerFactory.createResourceManager(
				configuration,
				ResourceID.generate(),
				rpcService,
				highAvailabilityServices,
				heartbeatServices,
				metricRegistry,
				fatalErrorHandler,
				new ClusterInformation(hostname, blobServer.getPort()),
				webMonitorEndpoint.getRestBaseUrl(),
				jobManagerMetricGroup); //⭐创建ResourceManager

			final HistoryServerArchivist historyServerArchivist = HistoryServerArchivist.createHistoryServerArchivist(configuration, webMonitorEndpoint);

			dispatcher = dispatcherFactory.createDispatcher(
				configuration,
				rpcService,
				highAvailabilityServices,
				resourceManagerGatewayRetriever,
				blobServer,
				heartbeatServices,
				jobManagerMetricGroup,
				metricRegistry.getMetricQueryServicePath(),
				archivedExecutionGraphStore,
				fatalErrorHandler,
				historyServerArchivist);//⭐创建dispatcher

			log.debug("Starting ResourceManager.");
			resourceManager.start();//⭐启动ResourceManager
			resourceManagerRetrievalService.start(resourceManagerGatewayRetriever);

			log.debug("Starting Dispatcher.");
			dispatcher.start();//⭐启动dispatcher
			dispatcherLeaderRetrievalService.start(dispatcherGatewayRetriever);

			return createDispatcherResourceManagerComponent(
				dispatcher,
				resourceManager,
				dispatcherLeaderRetrievalService,
				resourceManagerRetrievalService,
				webMonitorEndpoint,
				jobManagerMetricGroup);

		} catch (Exception exception) {
			// clean up all started components
			if (dispatcherLeaderRetrievalService != null) {
				try {
					dispatcherLeaderRetrievalService.stop();
				} catch (Exception e) {
					exception = ExceptionUtils.firstOrSuppressed(e, exception);
				}
			}

			if (resourceManagerRetrievalService != null) {
				try {
					resourceManagerRetrievalService.stop();
				} catch (Exception e) {
					exception = ExceptionUtils.firstOrSuppressed(e, exception);
				}
			}

			final Collection<CompletableFuture<Void>> terminationFutures = new ArrayList<>(3);

			if (webMonitorEndpoint != null) {
				terminationFutures.add(webMonitorEndpoint.closeAsync());
			}

			if (resourceManager != null) {
				terminationFutures.add(resourceManager.closeAsync());
			}

			if (dispatcher != null) {
				terminationFutures.add(dispatcher.closeAsync());
			}

			final FutureUtils.ConjunctFuture<Void> terminationFuture = FutureUtils.completeAll(terminationFutures);

			try {
				terminationFuture.get();
			} catch (Exception e) {
				exception = ExceptionUtils.firstOrSuppressed(e, exception);
			}

			if (jobManagerMetricGroup != null) {
				jobManagerMetricGroup.close();
			}

			throw new FlinkException("Could not create the DispatcherResourceManagerComponent.", exception);
		}
	}

几个创建和启动组件的地方用⭐标注出来了。

标签:webMonitorEndpoint,configuration,Flink,源码,集群,ClusterEntrypoint,new,null,final
来源: https://blog.csdn.net/Li_DeSheng/article/details/121567901

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有