Flink Runtime Environment Source Code Analysis

StreamExecutionEnvironment

Environment property configuration

  public static final String DEFAULT_JOB_NAME = "Flink Streaming Job";
	private static final TimeCharacteristic DEFAULT_TIME_CHARACTERISTIC = TimeCharacteristic.ProcessingTime;
	private static final long DEFAULT_NETWORK_BUFFER_TIMEOUT = 100L;
	/** Context environment factory. */
	private static StreamExecutionEnvironmentFactory contextEnvironmentFactory = null;
	private static final ThreadLocal<StreamExecutionEnvironmentFactory> threadLocalContextEnvironmentFactory = new ThreadLocal<>();
	// The default local parallelism is the number of cores of the current machine
	private static int defaultLocalParallelism = Runtime.getRuntime().availableProcessors();
	// Execution configuration of this environment: parallelism, serialization settings, etc.
	private final ExecutionConfig config = new ExecutionConfig();
	// Configuration controlling checkpoint behavior
	private final CheckpointConfig checkpointCfg = new CheckpointConfig();
	/** List of transformations, recording the logical chain from the initial transformations to the final ones. */
	protected final List<Transformation<?>> transformations = new ArrayList<>();
	// Buffer flush interval
	private long bufferTimeout = DEFAULT_NETWORK_BUFFER_TIMEOUT;
	// Whether operator chaining is enabled: one-to-one operators with the same parallelism are chained into the same task, saving network I/O
	protected boolean isChainingEnabled = true;
	// Default state backend, used to store keyed state and state snapshots
	private StateBackend defaultStateBackend;
	/** Default time characteristic: processing time. */
	private TimeCharacteristic timeCharacteristic = DEFAULT_TIME_CHARACTERISTIC;
	// Distributed cache files
	protected final List<Tuple2<String, DistributedCache.DistributedCacheEntry>> cacheFile = new ArrayList<>();
	/** Executor service loader: loads the executors for YARN, local, Kubernetes, etc. */
	private final PipelineExecutorServiceLoader executorServiceLoader;
	private final Configuration configuration;
	// User-specified class loader
	private final ClassLoader userClassloader;
	/** Job listeners, notified of job status changes. */
	private final List<JobListener> jobListeners = new ArrayList<>();

Data stream operations

File reading operations

  • Under the hood this relies on a checkpoint of file modification times: the last modification time is recorded, and only files modified after it are read (see the sketch below).
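
A minimal sketch of a continuously monitored file source (the input path and scan interval are assumptions): with FileProcessingMode.PROCESS_CONTINUOUSLY, the source periodically scans the path and only re-reads files whose modification timestamp is newer than the last one recorded.

import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

public class ContinuousFileReadExample {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		String inputPath = "/tmp/flink-input"; // assumed directory
		TextInputFormat format = new TextInputFormat(new Path(inputPath));

		// Scan the directory every 10 seconds; files whose modification time is not newer are skipped.
		DataStream<String> lines = env.readFile(
			format, inputPath, FileProcessingMode.PROCESS_CONTINUOUSLY, 10_000L);

		lines.print();
		env.execute("continuous-file-read");
	}
}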

StreamEnv execution

  • Transformations->StreamGraph->JobGraph

  • Core method

	public JobClient executeAsync(StreamGraph streamGraph) throws Exception {
		checkNotNull(streamGraph, "StreamGraph cannot be null.");
		checkNotNull(configuration.get(DeploymentOptions.TARGET), "No execution.target specified in your configuration file.");

		final PipelineExecutorFactory executorFactory =
			executorServiceLoader.getExecutorFactory(configuration);

		checkNotNull(
			executorFactory,
			"Cannot find compatible factory for specified execution.target (=%s)",
			configuration.get(DeploymentOptions.TARGET));

		// obtain the executor for the given configuration from the executorFactory
		CompletableFuture<JobClient> jobClientFuture = executorFactory
			.getExecutor(configuration)
			.execute(streamGraph, configuration);

		try {
			JobClient jobClient = jobClientFuture.get();
			jobListeners.forEach(jobListener -> jobListener.onJobSubmitted(jobClient, null));
			return jobClient;
		} catch (ExecutionException executionException) {
			final Throwable strippedException = ExceptionUtils.stripExecutionException(executionException);
			jobListeners.forEach(jobListener -> jobListener.onJobSubmitted(null, strippedException));

			throw new FlinkException(
				String.format("Failed to execute job '%s'.", streamGraph.getJobName()),
				strippedException);
		}
	}
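
A minimal usage sketch (job name and listener body are assumptions): executeAsync submits the job without blocking and returns a JobClient, and any JobListener registered on the environment receives the onJobSubmitted callback shown above.

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.JobListener;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ExecuteAsyncExample {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// The listeners registered here are the jobListeners notified in executeAsync().
		env.registerJobListener(new JobListener() {
			@Override
			public void onJobSubmitted(JobClient jobClient, Throwable throwable) {
				if (throwable == null) {
					System.out.println("Submitted job " + jobClient.getJobID());
				}
			}

			@Override
			public void onJobExecuted(JobExecutionResult jobExecutionResult, Throwable throwable) {
				// invoked once the job result is retrieved
			}
		});

		env.fromElements(1, 2, 3).print();

		// Non-blocking submission: returns a JobClient instead of waiting for the job to finish.
		JobClient client = env.executeAsync("async-demo");
		System.out.println("Job status: " + client.getJobStatus().get());
	}
}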

Execution environment creation

The matching execution environment is created based on the runtime environment.

	public static StreamExecutionEnvironment getExecutionEnvironment() {
		// resolve the execution environment factory (differs per runtime environment); if none is set, create a local execution environment
		return Utils.resolveFactory(threadLocalContextEnvironmentFactory, contextEnvironmentFactory)
			.map(StreamExecutionEnvironmentFactory::createExecutionEnvironment)
			.orElseGet(StreamExecutionEnvironment::createLocalEnvironment);
	}

Local execution environment

public static LocalStreamEnvironment createLocalEnvironment(int parallelism, Configuration configuration) {
		final LocalStreamEnvironment currentEnvironment;

		// create the local execution environment with the given configuration; execution.target is set to local internally
		currentEnvironment = new LocalStreamEnvironment(configuration);
		currentEnvironment.setParallelism(parallelism);

		return currentEnvironment;
	}

Local execution environment with Web UI
  	public static StreamExecutionEnvironment createLocalEnvironmentWithWebUI(Configuration conf) {
		checkNotNull(conf, "conf");

		if (!conf.contains(RestOptions.PORT)) {
			// explicitly set this option so that it's not set to 0 later
			conf.setInteger(RestOptions.PORT, RestOptions.PORT.defaultValue());
		}

		return createLocalEnvironment(defaultLocalParallelism, conf);
	}
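
A minimal usage sketch (the port below is an assumption): starting a local environment with the embedded Web UI requires the flink-runtime-web dependency on the classpath.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LocalWebUIExample {
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		conf.setInteger(RestOptions.PORT, 8081); // otherwise the default REST port is used

		StreamExecutionEnvironment env =
			StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);

		env.fromElements("a", "b", "c").print();
		env.execute("local-webui-demo");
	}
}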

Remote execution environment

	private static Configuration getEffectiveConfiguration(
			final Configuration baseConfiguration,
			final String host,
			final int port,
			final String[] jars,
			final List<URL> classpaths,
			final SavepointRestoreSettings savepointRestoreSettings) {

		// merge in the configuration passed by the client
		final Configuration effectiveConfiguration = new Configuration(baseConfiguration);

		// set the JobManager address
		RemoteEnvironmentConfigUtils.setJobManagerAddressToConfig(host, port, effectiveConfiguration);
		// set the paths of the job jars
		RemoteEnvironmentConfigUtils.setJarURLsToConfig(jars, effectiveConfiguration);
		ConfigUtils.encodeCollectionToConfig(effectiveConfiguration, PipelineOptions.CLASSPATHS, classpaths, URL::toString);

		if (savepointRestoreSettings != null) {
			// set the savepoint restore settings
			SavepointRestoreSettings.toConfiguration(savepointRestoreSettings, effectiveConfiguration);
		} else {
			SavepointRestoreSettings.toConfiguration(SavepointRestoreSettings.none(), effectiveConfiguration);
		}

		// these should be set in the end to overwrite any values from the client config provided in the constructor.
		effectiveConfiguration.setString(DeploymentOptions.TARGET, "remote");
		effectiveConfiguration.setBoolean(DeploymentOptions.ATTACHED, true);

		return effectiveConfiguration;
	}
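
A minimal usage sketch (host, port, and jar path are assumptions): createRemoteEnvironment internally builds the effective configuration shown above, i.e. the JobManager address, the pipeline jars, execution.target=remote and attached mode.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RemoteEnvExample {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
			"jobmanager-host",        // assumed JobManager hostname
			8081,                     // assumed REST port
			"/path/to/user-job.jar"); // assumed jar containing the job classes

		env.fromElements(1, 2, 3).print();
		env.execute("remote-demo");
	}
}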

Registering distributed cache files

registerCachedFile

  • Registers a file from the local or a distributed file system with the distributed cache; if needed, the runtime copies the file temporarily to a local cache.

  • It can be accessed via RuntimeContext#getDistributedCache (see the sketch below).

	public void registerCachedFile(String filePath, String name, boolean executable) {
		// the mapping is stored as a Tuple2 of name and cache entry
		this.cacheFile.add(new Tuple2<>(name, new DistributedCache.DistributedCacheEntry(filePath, executable)));
	}
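
A minimal sketch (the HDFS path and the logical name are assumptions): a file registered here is shipped to the workers, where it can be retrieved through RuntimeContext#getDistributedCache.

import java.io.File;
import java.nio.file.Files;
import java.util.List;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DistributedCacheExample {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// Register a file under the logical name "dictionary" (path is an assumption).
		env.registerCachedFile("hdfs:///data/dictionary.txt", "dictionary");

		env.fromElements("a", "b").map(new RichMapFunction<String, String>() {
			private transient List<String> dictionary;

			@Override
			public void open(Configuration parameters) throws Exception {
				// Retrieve the locally cached copy via the RuntimeContext.
				File cached = getRuntimeContext().getDistributedCache().getFile("dictionary");
				dictionary = Files.readAllLines(cached.toPath());
			}

			@Override
			public String map(String value) {
				return dictionary.contains(value) ? value : "unknown";
			}
		}).print();

		env.execute("distributed-cache-demo");
	}
}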

ExecutionEnvironment

Environment property configuration

private static ExecutionEnvironmentFactory contextEnvironmentFactory = null;

	/** The ThreadLocal used to store {@link ExecutionEnvironmentFactory}. */
	private static final ThreadLocal<ExecutionEnvironmentFactory> threadLocalContextEnvironmentFactory = new ThreadLocal<>();

	/** The default parallelism used by local environments. */
	private static int defaultLocalDop = Runtime.getRuntime().availableProcessors();


	// list of data sinks
	private final List<DataSink<?>> sinks = new ArrayList<>();

	private final List<Tuple2<String, DistributedCacheEntry>> cacheFile = new ArrayList<>();

	private final ExecutionConfig config = new ExecutionConfig();

	/** Result from the latest execution, to make it retrievable when using eager execution methods. */
	protected JobExecutionResult lastJobExecutionResult;

	/** Flag to indicate whether sinks have been cleared in previous executions. */
	private boolean wasExecuted = false;

	private final PipelineExecutorServiceLoader executorServiceLoader;

	private final Configuration configuration;

	private final ClassLoader userClassloader;

	private final List<JobListener> jobListeners = new ArrayList<>();

Job execution

  • dataSinks -> Plan -> JobGraph (same as the streaming API: everything is eventually converted into a JobGraph and submitted to the target cluster environment)

Converting dataSinks into a Plan
public Plan createProgramPlan(String jobName, boolean clearSinks) {
		checkNotNull(jobName);

		if (this.sinks.isEmpty()) {
			if (wasExecuted) {
				throw new RuntimeException("No new data sinks have been defined since the " +
						"last execution. The last execution refers to the latest call to " +
						"'execute()', 'count()', 'collect()', or 'print()'.");
			} else {
				throw new RuntimeException("No data sinks have been created yet. " +
						"A program needs at least one sink that consumes data. " +
						"Examples are writing the data set or printing it.");
			}
		}

		final PlanGenerator generator = new PlanGenerator(
				sinks, config, getParallelism(), cacheFile, jobName);
		final Plan plan = generator.generate();

		// clear all the sinks such that the next execution does not redo everything
		if (clearSinks) {
			this.sinks.clear();
			wasExecuted = true;
		}

		return plan;
	}
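
A minimal DataSet sketch illustrating the sink handling above: print() is an eager sink, so after it runs the sinks list is cleared and wasExecuted is set, which is why a later execute() without new sinks fails with the "No new data sinks" error.

import org.apache.flink.api.java.ExecutionEnvironment;

public class DataSetPlanExample {
	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		// print() adds a sink and eagerly triggers createProgramPlan() plus execution
		env.fromElements(1, 2, 3).print();

		// env.execute(); // would now throw: no new sinks defined since the last execution
	}
}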

PlanGenerator

public class PlanGenerator {

	private static final Logger LOG = LoggerFactory.getLogger(PlanGenerator.class);

	private final List<DataSink<?>> sinks;
	private final ExecutionConfig config;
	private final int defaultParallelism;
	private final List<Tuple2<String, DistributedCache.DistributedCacheEntry>> cacheFile;
	private final String jobName;

	public PlanGenerator(
			List<DataSink<?>> sinks,
			ExecutionConfig config,
			int defaultParallelism,
			List<Tuple2<String, DistributedCache.DistributedCacheEntry>> cacheFile,
			String jobName) {
		this.sinks = checkNotNull(sinks);
		this.config = checkNotNull(config);
		this.cacheFile = checkNotNull(cacheFile);
		this.jobName = checkNotNull(jobName);
		this.defaultParallelism = defaultParallelism;
	}

	public Plan generate() {
		final Plan plan = createPlan();
		registerGenericTypeInfoIfConfigured(plan);
		registerCachedFiles(plan);

		logTypeRegistrationDetails();
		return plan;
	}

	/**
	 * Create plan.
	 *
	 * @return the generated plan.
	 */
	private Plan createPlan() {
		final OperatorTranslation translator = new OperatorTranslation();
		final Plan plan = translator.translateToPlan(sinks, jobName);

		if (defaultParallelism > 0) {
			plan.setDefaultParallelism(defaultParallelism);
		}
		plan.setExecutionConfig(config);
		return plan;
	}

	/**
	 * Check plan for GenericTypeInfo's and register the types at the serializers.
	 *
	 * @param plan the generated plan.
	 */
	private void registerGenericTypeInfoIfConfigured(Plan plan) {
		if (!config.isAutoTypeRegistrationDisabled()) {
			plan.accept(new Visitor<Operator<?>>() {

				private final Set<Class<?>> registeredTypes = new HashSet<>();
				private final Set<org.apache.flink.api.common.operators.Operator<?>> visitedOperators = new HashSet<>();

				@Override
				public boolean preVisit(org.apache.flink.api.common.operators.Operator<?> visitable) {
					if (!visitedOperators.add(visitable)) {
						return false;
					}
					OperatorInformation<?> opInfo = visitable.getOperatorInfo();
					Serializers.recursivelyRegisterType(opInfo.getOutputType(), config, registeredTypes);
					return true;
				}

				@Override
				public void postVisit(org.apache.flink.api.common.operators.Operator<?> visitable) {
				}
			});
		}
	}

	private void registerCachedFiles(Plan plan) {
		try {
			registerCachedFilesWithPlan(plan);
		} catch (Exception e) {
			throw new RuntimeException("Error while registering cached files: " + e.getMessage(), e);
		}
	}

	/**
	 * Registers all files that were registered at this execution environment's cache registry of the
	 * given plan's cache registry.
	 *
	 * @param p The plan to register files at.
	 * @throws IOException Thrown if checks for existence and sanity fail.
	 */
	private void registerCachedFilesWithPlan(Plan p) throws IOException {
		for (Tuple2<String, DistributedCache.DistributedCacheEntry> entry : cacheFile) {
			p.registerCachedFile(entry.f0, entry.f1);
		}
	}

	private void logTypeRegistrationDetails() {
		int registeredTypes = getNumberOfRegisteredTypes();
		int defaultKryoSerializers = getNumberOfDefaultKryoSerializers();

		LOG.info("The job has {} registered types and {} default Kryo serializers", registeredTypes, defaultKryoSerializers);

		if (config.isForceKryoEnabled() && config.isForceAvroEnabled()) {
			LOG.warn("In the ExecutionConfig, both Avro and Kryo are enforced. Using Kryo serializer for serializing POJOs");
		} else if (config.isForceKryoEnabled()) {
			LOG.info("Using KryoSerializer for serializing POJOs");
		} else if (config.isForceAvroEnabled()) {
			LOG.info("Using AvroSerializer for serializing POJOs");
		}

		if (LOG.isDebugEnabled()) {
			logDebuggingTypeDetails();
		}
	}

	private int getNumberOfRegisteredTypes() {
		return config.getRegisteredKryoTypes().size() +
				config.getRegisteredPojoTypes().size() +
				config.getRegisteredTypesWithKryoSerializerClasses().size() +
				config.getRegisteredTypesWithKryoSerializers().size();
	}

	private int getNumberOfDefaultKryoSerializers() {
		return config.getDefaultKryoSerializers().size() +
				config.getDefaultKryoSerializerClasses().size();
	}

	private void logDebuggingTypeDetails() {
		LOG.debug("Registered Kryo types: {}", config.getRegisteredKryoTypes().toString());
		LOG.debug("Registered Kryo with Serializers types: {}",
				config.getRegisteredTypesWithKryoSerializers().entrySet().toString());
		LOG.debug("Registered Kryo with Serializer Classes types: {}",
				config.getRegisteredTypesWithKryoSerializerClasses().entrySet().toString());
		LOG.debug("Registered Kryo default Serializers: {}",
				config.getDefaultKryoSerializers().entrySet().toString());
		LOG.debug("Registered Kryo default Serializers Classes {}",
				config.getDefaultKryoSerializerClasses().entrySet().toString());
		LOG.debug("Registered POJO types: {}", config.getRegisteredPojoTypes().toString());

		// print information about static code analysis
		LOG.debug("Static code analysis mode: {}", config.getCodeAnalysisMode());
	}
}
  • Execution environment creation is similar to the streaming API

TableEnvironment

  • Connects to external systems

  • Registers and retrieves tables and other metadata objects via catalogs

  • Executes SQL statements

  • Sets configuration parameters

Interface methods

public interface TableEnvironment {
  // Creates a Table execution environment; the settings determine the executor to use and the streaming/batch mode
  static TableEnvironment create(EnvironmentSettings settings) {
		return TableEnvironmentImpl.create(settings);
	}
  /**
  Usage: constructs a Table from the given values
  *	 <pre>{@code
	 *  tEnv.fromValues(
	 *      row(1, "ABC"),
	 *      row(2L, "ABCDE")
	 *  )
	 * }</pre>
  */
	Table fromValues(Expression... values);
  
  /*  tEnv.fromValues(
	 *      DataTypes.ROW(
	 *          DataTypes.FIELD("id", DataTypes.DECIMAL(10, 2)),
	 *          DataTypes.FIELD("name", DataTypes.STRING())
	 *      ),
	 *      row(1, "ABC"),
	 *      row(2L, "ABCDE")
	  )
   */  
	Table fromValues(AbstractDataType<?> rowType, Expression... values);
  
  // Variants that take an Iterable
  Table fromValues(Iterable<?> values);
  Table fromValues(AbstractDataType<?> rowType, Iterable<?> values);
  // Registers a catalog
  void registerCatalog(String catalogName, Catalog catalog);
  // Gets a catalog by catalogName
  Optional<Catalog> getCatalog(String catalogName);
  // Loads a Module under moduleName; modules are resolved in the order they were loaded, and an exception is thrown if the name already exists. Modules mainly provide catalog functions and Flink built-in functions
  void loadModule(String moduleName, Module module);
  	void unloadModule(String moduleName);
  // Registers temporary UDFs
  	void createTemporarySystemFunction(String name, Class<? extends UserDefinedFunction> functionClass);
  void createTemporarySystemFunction(String name, UserDefinedFunction functionInstance);
  boolean dropTemporarySystemFunction(String name);
  void createFunction(String path, Class<? extends UserDefinedFunction> functionClass);
	void createFunction(String path, Class<? extends UserDefinedFunction> functionClass, boolean ignoreIfExists);
  boolean dropFunction(String path);
  void createTemporaryFunction(String path, Class<? extends UserDefinedFunction> functionClass);
  boolean dropTemporaryFunction(String path);
  void createTemporaryView(String path, Table view);
  Table from(String path);
  String[] listCatalogs();
  String[] listModules();
  String[] listDatabases();
  String[] listTables();
  String[] listViews();
  // Explains the execution plan of a SQL statement
  String explainSql(String statement, ExplainDetail... extraDetails);
  Table sqlQuery(String query);
  TableResult executeSql(String statement);
  String getCurrentCatalog();
  void useCatalog(String catalogName);
  String getCurrentDatabase();
  void useDatabase(String databaseName);
  TableConfig getConfig();
  StatementSet createStatementSet();
}
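
A minimal usage sketch (the planner/mode settings are assumptions): create() builds a TableEnvironmentImpl from the EnvironmentSettings, after which the interface methods above can be used.

import static org.apache.flink.table.api.Expressions.row;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class TableEnvExample {
	public static void main(String[] args) {
		EnvironmentSettings settings = EnvironmentSettings.newInstance()
			.useBlinkPlanner()
			.inStreamingMode()
			.build();
		TableEnvironment tEnv = TableEnvironment.create(settings);

		// fromValues builds a Table directly from expressions, as in the javadoc example above
		Table values = tEnv.fromValues(row(1, "ABC"), row(2L, "ABCDE"));
		values.execute().print();
	}
}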

TableEnvironmentInternal

  • Internal interface of the table environment

public interface TableEnvironmentInternal extends TableEnvironment {

	Parser getParser();

	CatalogManager getCatalogManager();

	TableResult executeInternal(List<ModifyOperation> operations);

	TableResult executeInternal(QueryOperation operation);

	String explainInternal(List<Operation> operations, ExplainDetail... extraDetails);

	void registerTableSourceInternal(String name, TableSource<?> tableSource);

	void registerTableSinkInternal(String name, TableSink<?> configuredSink);
}

Core method implementations

fromValues

// Internally delegates to the createTable method
public Table fromValues(AbstractDataType<?> rowType, Expression... values) {
		// resolve the rowType into a DataType
		final DataType resolvedDataType = catalogManager.getDataTypeFactory().createDataType(rowType);
		// convert the expressions into a QueryOperation
		return createTable(operationTreeBuilder.values(resolvedDataType, values));
	}

registerCatalog

  public void registerCatalog(String catalogName, Catalog catalog) {
		catalogManager.registerCatalog(catalogName, catalog);
	}
	
	public void registerCatalog(String catalogName, Catalog catalog) {
		// validate that the catalog name is legal
		checkArgument(!StringUtils.isNullOrWhitespaceOnly(catalogName), "Catalog name cannot be null or empty.");
		// validate the catalog
		checkNotNull(catalog, "Catalog cannot be null");

		// check whether the catalog already exists
		if (catalogs.containsKey(catalogName)) {
			throw new CatalogException(format("Catalog %s already exists.", catalogName));
		}

		// put the catalog into the catalogs LinkedHashMap (preserves insertion order)
		catalogs.put(catalogName, catalog);
		// initialize the catalog connection
		catalog.open();
	}
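
A minimal sketch of registering an in-memory catalog (names are assumptions); registerCatalog delegates to CatalogManager#registerCatalog, which also opens the catalog as shown above.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;

public class RegisterCatalogExample {
	public static void main(String[] args) {
		TableEnvironment tEnv = TableEnvironment.create(
			EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

		// Register and switch to a new catalog; catalog.open() is called during registration.
		tEnv.registerCatalog("my_catalog", new GenericInMemoryCatalog("my_catalog", "my_db"));
		tEnv.useCatalog("my_catalog");
		tEnv.useDatabase("my_db");

		System.out.println(String.join(", ", tEnv.listCatalogs()));
	}
}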

loadModule

public void loadModule(String moduleName, Module module) {
		moduleManager.loadModule(moduleName, module);
	}
	
		public void loadModule(String name, Module module) {
		checkArgument(!StringUtils.isNullOrWhitespaceOnly(name), "name cannot be null or empty string");
		checkNotNull(module, "module cannot be null");

		// analogous to the catalog registration above
		if (!modules.containsKey(name)) {
			modules.put(name, module);

			LOG.info("Loaded module {} from class {}", name, module.getClass().getName());
		} else {
			throw new ValidationException(
				String.format("A module with name %s already exists", name));
		}
	}

createTemporarySystemFunction

	public void createTemporarySystemFunction(String name, UserDefinedFunction functionInstance) {
		// register the temporary system function
		functionCatalog.registerTemporarySystemFunction(
			name,
			functionInstance,
			false);
	}

private void registerTemporarySystemFunction(
			String name,
			CatalogFunction function,
			boolean ignoreIfExists) {
		// normalize the function name to lower case
		final String normalizedName = FunctionIdentifier.normalizeName(name);

		try {
			// validate the function
			validateAndPrepareFunction(function);
		} catch (Throwable t) {
			throw new ValidationException(
				String.format(
					"Could not register temporary system function '%s' due to implementation errors.",
					name),
				t);
		}

		if (!tempSystemFunctions.containsKey(normalizedName)) {
			tempSystemFunctions.put(normalizedName, function);
		} else if (!ignoreIfExists) {
			throw new ValidationException(
				String.format(
					"Could not register temporary system function. A function named '%s' does already exist.",
					name));
		}
	}
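
A minimal sketch of a scalar UDF registered as a temporary system function (the function and class names are assumptions); note that the registered name is normalized to lower case, as in registerTemporarySystemFunction above.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;

public class TemporaryFunctionExample {

	/** Simple UDF upper-casing a string. */
	public static class UpperUdf extends ScalarFunction {
		public String eval(String s) {
			return s == null ? null : s.toUpperCase();
		}
	}

	public static void main(String[] args) {
		TableEnvironment tEnv = TableEnvironment.create(
			EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

		// Registered under the normalized (lower-case) name "my_upper".
		tEnv.createTemporarySystemFunction("MY_UPPER", UpperUdf.class);

		tEnv.sqlQuery("SELECT MY_UPPER(name) FROM (VALUES ('hello'), ('flink')) AS t(name)")
			.execute()
			.print();
	}
}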

createFunction

	public void createFunction(String path, Class<? extends UserDefinedFunction> functionClass, boolean ignoreIfExists) {
		final UnresolvedIdentifier unresolvedIdentifier = parser.parseIdentifier(path);
		functionCatalog.registerCatalogFunction(
			unresolvedIdentifier,
			functionClass,
			ignoreIfExists);
	}
	
		public void registerCatalogFunction(
			UnresolvedIdentifier unresolvedIdentifier,
			Class<? extends UserDefinedFunction> functionClass,
			boolean ignoreIfExists) {
		final ObjectIdentifier identifier = catalogManager.qualifyIdentifier(unresolvedIdentifier);
		final ObjectIdentifier normalizedIdentifier = FunctionIdentifier.normalizeObjectIdentifier(identifier);

		try {
			UserDefinedFunctionHelper.validateClass(functionClass);
		} catch (Throwable t) {
			throw new ValidationException(
				String.format(
					"Could not register catalog function '%s' due to implementation errors.",
					identifier.asSummaryString()),
				t);
		}

		final Catalog catalog = catalogManager.getCatalog(normalizedIdentifier.getCatalogName())
			.orElseThrow(IllegalStateException::new);
		final ObjectPath path = identifier.toObjectPath();

		// we force users to deal with temporary catalog functions first
		// check whether a temporary function with this identifier already exists in memory
		if (tempCatalogFunctions.containsKey(normalizedIdentifier)) {
			if (ignoreIfExists) {
				return;
			}
			throw new ValidationException(
				String.format(
					"Could not register catalog function. A temporary function '%s' does already exist. " +
						"Please drop the temporary function first.",
					identifier.asSummaryString()));
		}

		// check whether the function already exists in the catalog
		if (catalog.functionExists(path)) {
			if (ignoreIfExists) {
				return;
			}
			throw new ValidationException(
				String.format(
					"Could not register catalog function. A function '%s' does already exist.",
					identifier.asSummaryString()));
		}

		final CatalogFunction catalogFunction = new CatalogFunctionImpl(
			functionClass.getName(),
			FunctionLanguage.JAVA);
		try {
			// create the function through the catalog
			catalog.createFunction(path, catalogFunction, ignoreIfExists);
		} catch (Throwable t) {
			throw new TableException(
				String.format(
					"Could not register catalog function '%s'.",
					identifier.asSummaryString()),
				t);
		}
	}

dropFunction

public boolean dropCatalogFunction(
			UnresolvedIdentifier unresolvedIdentifier,
			boolean ignoreIfNotExist) {
		final ObjectIdentifier identifier = catalogManager.qualifyIdentifier(unresolvedIdentifier);
		final ObjectIdentifier normalizedIdentifier = FunctionIdentifier.normalizeObjectIdentifier(identifier);

		final Catalog catalog = catalogManager.getCatalog(normalizedIdentifier.getCatalogName())
			.orElseThrow(IllegalStateException::new);
		final ObjectPath path = identifier.toObjectPath();

		// we force users to deal with temporary catalog functions first
		// temporary catalog functions kept in memory must be dropped first
		if (tempCatalogFunctions.containsKey(normalizedIdentifier)) {
			throw new ValidationException(
				String.format(
					"Could not drop catalog function. A temporary function '%s' does already exist. " +
						"Please drop the temporary function first.",
					identifier.asSummaryString()));
		}

		if (!catalog.functionExists(path)) {
			if (ignoreIfNotExist) {
				return false;
			}
			throw new ValidationException(
				String.format(
					"Could not drop catalog function. A function '%s' doesn't exist.",
					identifier.asSummaryString()));
		}

		try {
			catalog.dropFunction(path, ignoreIfNotExist);
		} catch (Throwable t) {
			throw new TableException(
				String.format(
					"Could not drop catalog function '%s'.",
					identifier.asSummaryString()),
				t);
		}
		return true;
	}

createTemporaryFunction

public void registerTemporaryCatalogFunction(
			UnresolvedIdentifier unresolvedIdentifier,
			CatalogFunction catalogFunction,
			boolean ignoreIfExists) {
		// resolve and normalize the function identifier
		final ObjectIdentifier identifier = catalogManager.qualifyIdentifier(unresolvedIdentifier);
		final ObjectIdentifier normalizedIdentifier = FunctionIdentifier.normalizeObjectIdentifier(identifier);

		try {
			// validate and pre-process the function
			validateAndPrepareFunction(catalogFunction);
		} catch (Throwable t) {
			throw new ValidationException(
				String.format(
					"Could not register temporary catalog function '%s' due to implementation errors.",
					identifier.asSummaryString()),
				t);
		}

		// put it into the in-memory tempCatalogFunctions map
		if (!tempCatalogFunctions.containsKey(normalizedIdentifier)) {
			tempCatalogFunctions.put(normalizedIdentifier, catalogFunction);
		} else if (!ignoreIfExists) {
			throw new ValidationException(
				String.format(
					"Could not register temporary catalog function. A function '%s' does already exist.",
					identifier.asSummaryString()));
		}
	}

from

//from->scanInternal
public Optional<TableLookupResult> getTable(ObjectIdentifier objectIdentifier) {
		Preconditions.checkNotNull(schemaResolver, "schemaResolver should not be null");
		// look up the temporary table first; if it does not exist, fall back to the catalog
		CatalogBaseTable temporaryTable = temporaryTables.get(objectIdentifier);
		if (temporaryTable != null) {
			TableSchema resolvedSchema = resolveTableSchema(temporaryTable);
			return Optional.of(TableLookupResult.temporary(temporaryTable, resolvedSchema));
		} else {
			return getPermanentTable(objectIdentifier);
		}
	}
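
A minimal sketch (the view name is an assumption): createTemporaryView registers the table under a path, and from() resolves it through the getTable lookup above, where temporary tables take precedence over catalog tables.

import static org.apache.flink.table.api.Expressions.row;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class FromExample {
	public static void main(String[] args) {
		TableEnvironment tEnv = TableEnvironment.create(
			EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

		Table values = tEnv.fromValues(row(1, "ABC"), row(2, "DEF"));
		tEnv.createTemporaryView("my_view", values);

		// Resolved as a temporary table first; otherwise the current catalog/database is consulted.
		Table fromView = tEnv.from("my_view");
		fromView.execute().print();
	}
}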

sqlQuery

@Override
	public Table sqlQuery(String query) {
		// parse the query SQL into a list of Operations
		List<Operation> operations = parser.parse(query);

		if (operations.size() != 1) {
			throw new ValidationException(
				"Unsupported SQL query! sqlQuery() only accepts a single SQL query.");
		}

		Operation operation = operations.get(0);

		// make sure it is a QueryOperation
		if (operation instanceof QueryOperation && !(operation instanceof ModifyOperation)) {
			// create the Table
			return createTable((QueryOperation) operation);
		} else {
			throw new ValidationException(
				"Unsupported SQL query! sqlQuery() only accepts a single SQL query of type " +
					"SELECT, UNION, INTERSECT, EXCEPT, VALUES, and ORDER_BY.");
		}
	}
	
		protected TableImpl createTable(QueryOperation tableOperation) {
		return TableImpl.createTable(
			this,
			tableOperation,
			operationTreeBuilder,
			functionCatalog.asLookup(parser::parseIdentifier));
	}
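
A minimal sqlQuery sketch (the view name is an assumption): the statement is parsed into a single QueryOperation and wrapped into a Table via createTable as shown above.

import static org.apache.flink.table.api.Expressions.row;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class SqlQueryExample {
	public static void main(String[] args) {
		TableEnvironment tEnv = TableEnvironment.create(
			EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

		tEnv.createTemporaryView("orders", tEnv.fromValues(row(1, 10L), row(2, 20L)));

		// A single SELECT statement: parsed into one QueryOperation and turned into a Table.
		Table result = tEnv.sqlQuery("SELECT * FROM orders");
		result.execute().print();
	}
}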

executeSql

public TableResult executeSql(String statement) {
		List<Operation> operations = parser.parse(statement);

		if (operations.size() != 1) {
			throw new TableException(UNSUPPORTED_QUERY_IN_EXECUTE_SQL_MSG);
		}
		// dispatch to different handling depending on the operation type
		return executeOperation(operations.get(0));
	}
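
A minimal executeSql sketch: unlike sqlQuery, executeSql also accepts non-query statements (DDL, SHOW, INSERT) and immediately returns a TableResult for the single parsed operation.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;

public class ExecuteSqlExample {
	public static void main(String[] args) {
		TableEnvironment tEnv = TableEnvironment.create(
			EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

		// SHOW statements are dispatched by executeOperation() and return their result as a table.
		TableResult catalogs = tEnv.executeSql("SHOW CATALOGS");
		catalogs.print();

		TableResult databases = tEnv.executeSql("SHOW DATABASES");
		databases.print();
	}
}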

TableFactory

public interface TableFactory {

 // Required properties, e.g. connector.type, format.type
	Map<String, String> requiredContext();

/**
* Supported properties, e.g.
*   - schema.#.type
*   - schema.#.name
*   - connector.topic
*   - format.line-delimiter
*   - format.ignore-parse-errors
*   - format.fields.#.type
*   - format.fields.#.name
* Keys like these represent lists of values, where '#' stands for one or more digits.
* In some cases it can be useful to declare the wildcard '*'. A wildcard can only be declared at the end of a property key.
*/
	List<String> supportedProperties();
}
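
A hypothetical implementation sketch (the connector type and property keys are invented for illustration) showing how requiredContext and supportedProperties are typically filled:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.flink.table.factories.TableFactory;

public class MyDemoTableFactory implements TableFactory {

	@Override
	public Map<String, String> requiredContext() {
		Map<String, String> context = new HashMap<>();
		context.put("connector.type", "my-demo"); // hypothetical connector type used for factory matching
		context.put("connector.property-version", "1");
		return context;
	}

	@Override
	public List<String> supportedProperties() {
		List<String> properties = new ArrayList<>();
		properties.add("connector.topic");   // fixed key
		properties.add("schema.#.name");     // '#' stands for one or more digits
		properties.add("schema.#.type");
		properties.add("format.*");          // trailing wildcard
		return properties;
	}
}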

ComponentFactory

  • Factory interface for components; allows further disambiguation if multiple matching implementations exist

public interface ComponentFactory extends TableFactory {

  // Optional context properties
	Map<String, String> optionalContext();

  // Required context properties
	@Override
	Map<String, String> requiredContext();

  // Supported properties
	@Override
	List<String> supportedProperties();
}

ExecutorFactory

  • Executor factory

public interface ExecutorFactory extends ComponentFactory {

  // Creates an executor from the given properties
	Executor create(Map<String, String> properties);
}

BlinkExecutorFactory

  • Blink executor factory

public class BlinkExecutorFactory implements ExecutorFactory {

	/**
	 * Creates a corresponding {@link ExecutorBase}.
	 *
	 * @param properties Static properties of the {@link Executor}, the same that were used for factory lookup.
	 * @param executionEnvironment a {@link StreamExecutionEnvironment} to use while executing Table programs.
	 * @return instance of a {@link Executor}
	 */
	public Executor create(Map<String, String> properties, StreamExecutionEnvironment executionEnvironment) {
		// create the executor for the configured mode (streaming or batch)
		if (Boolean.valueOf(properties.getOrDefault(EnvironmentSettings.STREAMING_MODE, "true"))) {
			return new StreamExecutor(executionEnvironment);
		} else {
			return new BatchExecutor(executionEnvironment);
		}
	}

	@Override
	public Executor create(Map<String, String> properties) {
		return create(properties, StreamExecutionEnvironment.getExecutionEnvironment());
	}

	@Override
	public Map<String, String> requiredContext() {
		DescriptorProperties properties = new DescriptorProperties();
		return properties.asMap();
	}

	@Override
	public List<String> supportedProperties() {
		return Arrays.asList(EnvironmentSettings.STREAMING_MODE, EnvironmentSettings.CLASS_NAME);
	}

	@Override
	public Map<String, String> optionalContext() {
		Map<String, String> context = new HashMap<>();
		context.put(EnvironmentSettings.CLASS_NAME, this.getClass().getCanonicalName());
		return context;
	}
}

Executor

  • createPipeline(List<Transformation<?>> transformations, TableConfig tableConfig, String jobName);

    • Converts the given transformations into a Pipeline

  • execute(Pipeline pipeline) throws Exception;

  • executeAsync(Pipeline pipeline) throws Exception;

ExecutorBase

public abstract class ExecutorBase implements Executor {
	// Default job name
	private static final String DEFAULT_JOB_NAME = "Flink Exec Table Job";

	// Execution environment used to run the job
	private final StreamExecutionEnvironment executionEnvironment;
	// Table configuration
	protected TableConfig tableConfig;

	public ExecutorBase(StreamExecutionEnvironment executionEnvironment) {
		this.executionEnvironment = executionEnvironment;
	}

	public StreamExecutionEnvironment getExecutionEnvironment() {
		return executionEnvironment;
	}

	@Override
	public JobExecutionResult execute(Pipeline pipeline) throws Exception {
		return executionEnvironment.execute((StreamGraph) pipeline);
	}

	@Override
	public JobClient executeAsync(Pipeline pipeline) throws Exception {
		return executionEnvironment.executeAsync((StreamGraph) pipeline);
	}

	protected String getNonEmptyJobName(String jobName) {
		if (StringUtils.isNullOrWhitespaceOnly(jobName)) {
			return DEFAULT_JOB_NAME;
		} else {
			return jobName;
		}
	}
}

StreamExecutor

@Internal
public class StreamExecutor extends ExecutorBase {

	@VisibleForTesting
	public StreamExecutor(StreamExecutionEnvironment executionEnvironment) {
		super(executionEnvironment);
	}

	/**
	 * Converts the given transformations into a StreamGraph.
	 * @param transformations list of transformations
	 * @param tableConfig table configuration
	 * @param jobName what should be the name of the job
	 * @return the generated StreamGraph pipeline
	 */
	@Override
	public Pipeline createPipeline(List<Transformation<?>> transformations, TableConfig tableConfig, String jobName) {
		// convert the transformations into a StreamGraph
		StreamGraph streamGraph = ExecutorUtils.generateStreamGraph(getExecutionEnvironment(), transformations);
		// set the job name
		streamGraph.setJobName(getNonEmptyJobName(jobName));
		return streamGraph;
	}
}

BatchExecutor

public class BatchExecutor extends ExecutorBase {

	@VisibleForTesting
	public BatchExecutor(StreamExecutionEnvironment executionEnvironment) {
		super(executionEnvironment);
	}

	@Override
	public Pipeline createPipeline(List<Transformation<?>> transformations, TableConfig tableConfig, String jobName) {
		StreamExecutionEnvironment execEnv = getExecutionEnvironment();
		// apply batch-specific properties from the table config to the execution environment
		ExecutorUtils.setBatchProperties(execEnv, tableConfig);
		StreamGraph streamGraph = ExecutorUtils.generateStreamGraph(execEnv, transformations);
		streamGraph.setJobName(getNonEmptyJobName(jobName));
		ExecutorUtils.setBatchProperties(streamGraph, tableConfig);
		return streamGraph;
	}
}

Parser

  • Parses SQL strings into Table API objects such as Operations

  • List<Operation> parse(String statement);

  • UnresolvedIdentifier parseIdentifier(String identifier);

  • ResolvedExpression parseSqlExpression(String sqlExpression, TableSchema inputSchema);

ParserImpl

public class ParserImpl implements Parser {

	// catalog manager
	private final CatalogManager catalogManager;

	// we use supplier pattern here in order to use the most up to
	// date configuration. Users might change the parser configuration in a TableConfig in between
	// multiple statements parsing
	// validator (planner) supplier
	private final Supplier<FlinkPlannerImpl> validatorSupplier;
	private final Supplier<CalciteParser> calciteParserSupplier;
	private final Function<TableSchema, SqlExprToRexConverter> sqlExprToRexConverterCreator;

	public ParserImpl(
			CatalogManager catalogManager,
			Supplier<FlinkPlannerImpl> validatorSupplier,
			Supplier<CalciteParser> calciteParserSupplier,
			Function<TableSchema, SqlExprToRexConverter> sqlExprToRexConverterCreator) {
		this.catalogManager = catalogManager;
		this.validatorSupplier = validatorSupplier;
		this.calciteParserSupplier = calciteParserSupplier;
		this.sqlExprToRexConverterCreator = sqlExprToRexConverterCreator;
	}

	@Override
	public List<Operation> parse(String statement) {
		CalciteParser parser = calciteParserSupplier.get();
		FlinkPlannerImpl planner = validatorSupplier.get();
		// parse the SQL query
		SqlNode parsed = parser.parse(statement);

		Operation operation = SqlToOperationConverter.convert(planner, catalogManager, parsed)
			.orElseThrow(() -> new TableException("Unsupported query: " + statement));
		return Collections.singletonList(operation);
	}

	@Override
	public UnresolvedIdentifier parseIdentifier(String identifier) {
		CalciteParser parser = calciteParserSupplier.get();
		SqlIdentifier sqlIdentifier = parser.parseIdentifier(identifier);
		return UnresolvedIdentifier.of(sqlIdentifier.names);
	}

	@Override
	public ResolvedExpression parseSqlExpression(String sqlExpression, TableSchema inputSchema) {
		SqlExprToRexConverter sqlExprToRexConverter = sqlExprToRexConverterCreator.apply(inputSchema);
		RexNode rexNode = sqlExprToRexConverter.convertToRexNode(sqlExpression);
		// convert to a logical type
		LogicalType logicalType = FlinkTypeFactory.toLogicalType(rexNode.getType());
		return new RexNodeExpression(rexNode, TypeConversions.fromLogicalToDataType(logicalType));
	}
}

Planner

  • Converts a SQL string into Table API specific objects, such as Operation

  • Relational planner that plans, optimizes, and translates a tree of ModifyOperations into a runnable form (Transformations)

public interface Planner {
	Parser getParser();

	// Translates a relational tree of ModifyOperations into a set of runnable Transformations.
	// This method accepts a list of ModifyOperations so that common sub-trees of multiple relational queries can be reused.
	// The top node of each query should be a ModifyOperation in order to pass on the expected properties of the output
	// Transformation, such as the output mode (append, retract, upsert) or the expected output type.
	List<Transformation<?>> translate(List<ModifyOperation> modifyOperations);
	// Explains the execution plan of the given operations
	String explain(List<Operation> operations, ExplainDetail... extraDetails);

  // Returns completion hints for the given statement at the given cursor position. Completion is case-insensitive.
	String[] getCompletionHints(String statement, int position);
}
