// Iterators over the results of collect()-style query jobs.
private final List<CollectResultIterator<?>> collectIterators = new ArrayList<>();

@Internal
public void registerCollectIterator(CollectResultIterator<?> iterator) {
    collectIterators.add(iterator);
}
/**
 * The default name to use for a streaming job if no other name has been specified.
 *
 * @deprecated This constant does not fit well to batch runtime mode.
 */
@Deprecated
public static final String DEFAULT_JOB_NAME = StreamGraphGenerator.DEFAULT_STREAMING_JOB_NAME;

/** The time characteristic that is used if none other is set. */
private static final TimeCharacteristic DEFAULT_TIME_CHARACTERISTIC = TimeCharacteristic.EventTime;

/** The environment of the context (local by default, cluster if invoked through command line). */
private static StreamExecutionEnvironmentFactory contextEnvironmentFactory = null;

/** The ThreadLocal used to store {@link StreamExecutionEnvironmentFactory}. */
private static final ThreadLocal<StreamExecutionEnvironmentFactory>
        threadLocalContextEnvironmentFactory = new ThreadLocal<>();

/** The default parallelism used when creating a local environment. */
private static int defaultLocalParallelism = Runtime.getRuntime().availableProcessors();

/** The execution configuration for this environment. */
// Execution configuration of the current environment: parallelism, serialization settings, etc.
protected final ExecutionConfig config = new ExecutionConfig();

/** Settings that control the checkpointing behavior. */
// Checkpoint configuration.
protected final CheckpointConfig checkpointCfg = new CheckpointConfig();

/** The list of transformations, recording the logical graph from the initial transformations to the final ones. */
protected final List<Transformation<?>> transformations = new ArrayList<>();

// Backing map for the cached-stream implementation.
private final Map<AbstractID, CacheTransformation<?>> cachedTransformations = new HashMap<>();

// How often network buffers are flushed.
private long bufferTimeout = ExecutionOptions.BUFFER_TIMEOUT.defaultValue().toMillis();

// Whether the operator-chaining optimization is enabled.
protected boolean isChainingEnabled = true;

/** The state backend used for storing k/v state and state snapshots. */
// The default state backend.
private StateBackend defaultStateBackend;

/** Whether to enable ChangelogStateBackend, default value is unset. */
// Whether changelog-based checkpointing is enabled.
private TernaryBoolean changelogStateBackendEnabled = TernaryBoolean.UNDEFINED;

/** The default savepoint directory used by the job. */
private Path defaultSavepointDirectory;

/** The time characteristic used by the data streams. */
private TimeCharacteristic timeCharacteristic = DEFAULT_TIME_CHARACTERISTIC;

protected final List<Tuple2<String, DistributedCache.DistributedCacheEntry>> cacheFile =
        new ArrayList<>();

/* Service loader for PipelineExecutors: loads the YARN, local, Kubernetes, etc. executors. */
private final PipelineExecutorServiceLoader executorServiceLoader;

/**
 * Currently, configuration is split across multiple member variables and classes such as {@link
 * ExecutionConfig} or {@link CheckpointConfig}. This architecture makes it quite difficult to
 * handle/merge/enrich configuration or restrict access in other APIs.
 *
 * <p>In the long-term, this {@link Configuration} object should be the source of truth for
 * newly added {@link ConfigOption}s that are relevant for DataStream API. Make sure to also
 * update {@link #configure(ReadableConfig, ClassLoader)}.
 */
protected final Configuration configuration;

private final ClassLoader userClassloader;

private final List<JobListener> jobListeners = new ArrayList<>();

// Records the slot sharing groups and their corresponding fine-grained ResourceProfile.
private final Map<String, ResourceProfile> slotSharingGroupResources = new HashMap<>();
public static LocalStreamEnvironment createLocalEnvironment(Configuration configuration) {
    if (configuration.getOptional(CoreOptions.DEFAULT_PARALLELISM).isPresent()) {
        return new LocalStreamEnvironment(configuration);
    } else {
        Configuration copyOfConfiguration = new Configuration();
        copyOfConfiguration.addAll(configuration);
        copyOfConfiguration.set(CoreOptions.DEFAULT_PARALLELISM, defaultLocalParallelism);
        return new LocalStreamEnvironment(copyOfConfiguration);
    }
}
// Local execution environment with a web UI.
public static StreamExecutionEnvironment createLocalEnvironmentWithWebUI(Configuration conf) {
    checkNotNull(conf, "conf");
    if (!conf.contains(RestOptions.PORT)) {
        // explicitly set this option so that it's not set to 0 later
        conf.setInteger(RestOptions.PORT, RestOptions.PORT.defaultValue());
    }
    return createLocalEnvironment(defaultLocalParallelism, conf);
}
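A minimal usage sketch, assuming flink-runtime-web is on the classpath (required for the UI); the class name and port choice are illustrative:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LocalWebUIDemo {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Pin the REST port; if unset, the method above falls back to RestOptions.PORT's default.
        conf.setInteger(RestOptions.PORT, 8081);
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
        env.fromElements(1, 2, 3).print();
        env.execute("local-webui-demo");
    }
}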
Remote execution environment
// Inside RemoteStreamEnvironment.
private static Configuration getEffectiveConfiguration(
        final Configuration baseConfiguration,
        final String host,
        final int port,
        final String[] jars,
        final List<URL> classpaths,
        final SavepointRestoreSettings savepointRestoreSettings) {
    // Merge in the configuration passed by the client.
    final Configuration effectiveConfiguration = new Configuration(baseConfiguration);
    // Set the JobManager address.
    RemoteEnvironmentConfigUtils.setJobManagerAddressToConfig(host, port, effectiveConfiguration);
    // Set the paths of the job jars.
    RemoteEnvironmentConfigUtils.setJarURLsToConfig(jars, effectiveConfiguration);
    ConfigUtils.encodeCollectionToConfig(
            effectiveConfiguration, PipelineOptions.CLASSPATHS, classpaths, URL::toString);

    if (savepointRestoreSettings != null) {
        // Apply the savepoint restore settings.
        SavepointRestoreSettings.toConfiguration(savepointRestoreSettings, effectiveConfiguration);
    } else {
        SavepointRestoreSettings.toConfiguration(
                SavepointRestoreSettings.none(), effectiveConfiguration);
    }

    // These should be set in the end to overwrite any values from the client config provided in the constructor.
    effectiveConfiguration.setString(DeploymentOptions.TARGET, "remote");
    effectiveConfiguration.setBoolean(DeploymentOptions.ATTACHED, true);
    return effectiveConfiguration;
}
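A hedged sketch of the public entry point that ends up in this method; the host, port, and jar path below are placeholders:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Connects to a running cluster; the listed jar is shipped and added to the user classpath.
StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
        "jobmanager-host", 8081, "/path/to/user-job.jar");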
// Inside ExecutionEnvironment (the batch DataSet API).
private static ExecutionEnvironmentFactory contextEnvironmentFactory = null;

/** The ThreadLocal used to store {@link ExecutionEnvironmentFactory}. */
private static final ThreadLocal<ExecutionEnvironmentFactory> threadLocalContextEnvironmentFactory =
        new ThreadLocal<>();

/** The default parallelism used by local environments. */
private static int defaultLocalDop = Runtime.getRuntime().availableProcessors();

// The list of sink operators.
private final List<DataSink<?>> sinks = new ArrayList<>();

private final List<Tuple2<String, DistributedCacheEntry>> cacheFile = new ArrayList<>();

private final ExecutionConfig config = new ExecutionConfig();

/** Result from the latest execution, to make it retrievable when using eager execution methods. */
protected JobExecutionResult lastJobExecutionResult;

/** Flag to indicate whether sinks have been cleared in previous executions. */
private boolean wasExecuted = false;

private final PipelineExecutorServiceLoader executorServiceLoader;

private final Configuration configuration;

private final ClassLoader userClassloader;

private final List<JobListener> jobListeners = new ArrayList<>();
// Converts the registered data sinks into a Plan.
public Plan createProgramPlan(String jobName, boolean clearSinks) {
    checkNotNull(jobName);
    if (this.sinks.isEmpty()) {
        if (wasExecuted) {
            throw new RuntimeException("No new data sinks have been defined since the " +
                    "last execution. The last execution refers to the latest call to " +
                    "'execute()', 'count()', 'collect()', or 'print()'.");
        } else {
            throw new RuntimeException("No data sinks have been created yet. " +
                    "A program needs at least one sink that consumes data. " +
                    "Examples are writing the data set or printing it.");
        }
    }

    final PlanGenerator generator = new PlanGenerator(
            sinks, config, getParallelism(), cacheFile, jobName);
    final Plan plan = generator.generate();

    // Clear all the sinks so that the next execution does not redo everything.
    if (clearSinks) {
        this.sinks.clear();
        wasExecuted = true;
    }
    return plan;
}
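A short sketch of how a registered sink feeds into plan creation; normally execute()/print() drives this internally, and createProgramPlan is called here only to make the flow visible:

import org.apache.flink.api.common.Plan;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.fromElements(1, 2, 3).output(new DiscardingOutputFormat<>()); // registers a DataSink
Plan plan = env.createProgramPlan("demo-job", true); // sinks are cleared afterwards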
PlanGenerator
// Translates the registered DataSinks into a Plan.
public class PlanGenerator {

    private static final Logger LOG = LoggerFactory.getLogger(PlanGenerator.class);

    private final List<DataSink<?>> sinks;
    private final ExecutionConfig config;
    private final int defaultParallelism;
    private final List<Tuple2<String, DistributedCache.DistributedCacheEntry>> cacheFile;
    private final String jobName;

    public PlanGenerator(
            List<DataSink<?>> sinks,
            ExecutionConfig config,
            int defaultParallelism,
            List<Tuple2<String, DistributedCache.DistributedCacheEntry>> cacheFile,
            String jobName) {
        this.sinks = checkNotNull(sinks);
        this.config = checkNotNull(config);
        this.cacheFile = checkNotNull(cacheFile);
        this.jobName = checkNotNull(jobName);
        this.defaultParallelism = defaultParallelism;
    }
    public Plan generate() {
        final Plan plan = createPlan();
        registerGenericTypeInfoIfConfigured(plan);
        registerCachedFiles(plan);
        logTypeRegistrationDetails();
        return plan;
    }
    /**
     * Create plan.
     *
     * @return the generated plan.
     */
    private Plan createPlan() {
        final OperatorTranslation translator = new OperatorTranslation();
        final Plan plan = translator.translateToPlan(sinks, jobName);
        if (defaultParallelism > 0) {
            plan.setDefaultParallelism(defaultParallelism);
        }
        plan.setExecutionConfig(config);
        return plan;
    }
    /**
     * Check the plan for GenericTypeInfos and register the types at the serializers.
     *
     * @param plan the generated plan.
     */
    private void registerGenericTypeInfoIfConfigured(Plan plan) {
        if (!config.isAutoTypeRegistrationDisabled()) {
            plan.accept(new Visitor<Operator<?>>() {

                private final Set<Class<?>> registeredTypes = new HashSet<>();
                private final Set<org.apache.flink.api.common.operators.Operator<?>> visitedOperators =
                        new HashSet<>();

                @Override
                public boolean preVisit(org.apache.flink.api.common.operators.Operator<?> visitable) {
                    if (!visitedOperators.add(visitable)) {
                        return false;
                    }
                    OperatorInformation<?> opInfo = visitable.getOperatorInfo();
                    Serializers.recursivelyRegisterType(opInfo.getOutputType(), config, registeredTypes);
                    return true;
                }

                @Override
                public void postVisit(org.apache.flink.api.common.operators.Operator<?> visitable) {}
            });
        }
    }
    private void registerCachedFiles(Plan plan) {
        try {
            registerCachedFilesWithPlan(plan);
        } catch (Exception e) {
            throw new RuntimeException("Error while registering cached files: " + e.getMessage(), e);
        }
    }

    /**
     * Registers all files from this execution environment's cache registry with the given plan's
     * cache registry.
     *
     * @param p The plan to register files at.
     * @throws IOException Thrown if checks for existence and sanity fail.
     */
    private void registerCachedFilesWithPlan(Plan p) throws IOException {
        for (Tuple2<String, DistributedCache.DistributedCacheEntry> entry : cacheFile) {
            p.registerCachedFile(entry.f0, entry.f1);
        }
    }
    private void logTypeRegistrationDetails() {
        int registeredTypes = getNumberOfRegisteredTypes();
        int defaultKryoSerializers = getNumberOfDefaultKryoSerializers();
        LOG.info("The job has {} registered types and {} default Kryo serializers",
                registeredTypes, defaultKryoSerializers);

        if (config.isForceKryoEnabled() && config.isForceAvroEnabled()) {
            LOG.warn("In the ExecutionConfig, both Avro and Kryo are enforced. Using Kryo serializer for serializing POJOs");
        } else if (config.isForceKryoEnabled()) {
            LOG.info("Using KryoSerializer for serializing POJOs");
        } else if (config.isForceAvroEnabled()) {
            LOG.info("Using AvroSerializer for serializing POJOs");
        }

        if (LOG.isDebugEnabled()) {
            logDebuggingTypeDetails();
        }
    }
    private int getNumberOfRegisteredTypes() {
        return config.getRegisteredKryoTypes().size()
                + config.getRegisteredPojoTypes().size()
                + config.getRegisteredTypesWithKryoSerializerClasses().size()
                + config.getRegisteredTypesWithKryoSerializers().size();
    }

    private int getNumberOfDefaultKryoSerializers() {
        return config.getDefaultKryoSerializers().size()
                + config.getDefaultKryoSerializerClasses().size();
    }
    private void logDebuggingTypeDetails() {
        LOG.debug("Registered Kryo types: {}", config.getRegisteredKryoTypes().toString());
        LOG.debug("Registered Kryo with Serializers types: {}",
                config.getRegisteredTypesWithKryoSerializers().entrySet().toString());
        LOG.debug("Registered Kryo with Serializer Classes types: {}",
                config.getRegisteredTypesWithKryoSerializerClasses().entrySet().toString());
        LOG.debug("Registered Kryo default Serializers: {}",
                config.getDefaultKryoSerializers().entrySet().toString());
        LOG.debug("Registered Kryo default Serializers Classes {}",
                config.getDefaultKryoSerializerClasses().entrySet().toString());
        LOG.debug("Registered POJO types: {}", config.getRegisteredPojoTypes().toString());

        // Print information about static code analysis.
        LOG.debug("Static code analysis mode: {}", config.getCodeAnalysisMode());
    }
}
loadModule

public void loadModule(String moduleName, Module module) {
    moduleManager.loadModule(moduleName, module);
}

public void loadModule(String name, Module module) {
    checkArgument(!StringUtils.isNullOrWhitespaceOnly(name), "name cannot be null or empty string");
    checkNotNull(module, "module cannot be null");

    // Similar to how catalogs are handled.
    if (!modules.containsKey(name)) {
        modules.put(name, module);
        LOG.info("Loaded module {} from class {}", name, module.getClass().getName());
    } else {
        throw new ValidationException(
                String.format("A module with name %s already exists", name));
    }
}
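A minimal usage sketch; the module name "core2" is arbitrary (CoreModule is already loaded as "core" by default, so a fresh name is required):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.module.CoreModule;

TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build());
tEnv.loadModule("core2", CoreModule.INSTANCE); // a HiveModule etc. is loaded the same way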
createTemporarySystemFunction
public void createTemporarySystemFunction(String name, UserDefinedFunction functionInstance) {
    // Register a temporary system function.
    functionCatalog.registerTemporarySystemFunction(
            name,
            functionInstance,
            false);
}

private void registerTemporarySystemFunction(
        String name,
        CatalogFunction function,
        boolean ignoreIfExists) {
    // Normalize the function name to lower case.
    final String normalizedName = FunctionIdentifier.normalizeName(name);

    try {
        // Validate the function.
        validateAndPrepareFunction(function);
    } catch (Throwable t) {
        throw new ValidationException(
                String.format(
                        "Could not register temporary system function '%s' due to implementation errors.",
                        name),
                t);
    }

    if (!tempSystemFunctions.containsKey(normalizedName)) {
        tempSystemFunctions.put(normalizedName, function);
    } else if (!ignoreIfExists) {
        throw new ValidationException(
                String.format(
                        "Could not register temporary system function. A function named '%s' does already exist.",
                        name));
    }
}
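A hedged usage sketch; the ToUpper UDF below is hypothetical:

import org.apache.flink.table.functions.ScalarFunction;

public static class ToUpper extends ScalarFunction {
    public String eval(String s) {
        return s == null ? null : s.toUpperCase();
    }
}

// The name is case-insensitive, matching the normalizeName() call above.
tEnv.createTemporarySystemFunction("TO_UPPER", new ToUpper());
tEnv.sqlQuery("SELECT TO_UPPER('hello')").execute().print();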
createFunction
public void createFunction(String path, Class<? extends UserDefinedFunction> functionClass, boolean ignoreIfExists) {
    final UnresolvedIdentifier unresolvedIdentifier = parser.parseIdentifier(path);
    functionCatalog.registerCatalogFunction(
            unresolvedIdentifier,
            functionClass,
            ignoreIfExists);
}
public void registerCatalogFunction(
        UnresolvedIdentifier unresolvedIdentifier,
        Class<? extends UserDefinedFunction> functionClass,
        boolean ignoreIfExists) {
    final ObjectIdentifier identifier = catalogManager.qualifyIdentifier(unresolvedIdentifier);
    final ObjectIdentifier normalizedIdentifier = FunctionIdentifier.normalizeObjectIdentifier(identifier);

    try {
        UserDefinedFunctionHelper.validateClass(functionClass);
    } catch (Throwable t) {
        throw new ValidationException(
                String.format(
                        "Could not register catalog function '%s' due to implementation errors.",
                        identifier.asSummaryString()),
                t);
    }

    final Catalog catalog = catalogManager.getCatalog(normalizedIdentifier.getCatalogName())
            .orElseThrow(IllegalStateException::new);
    final ObjectPath path = identifier.toObjectPath();

    // We force users to deal with temporary catalog functions first.
    // Check whether a temporary function with this identifier already exists in memory.
    if (tempCatalogFunctions.containsKey(normalizedIdentifier)) {
        if (ignoreIfExists) {
            return;
        }
        throw new ValidationException(
                String.format(
                        "Could not register catalog function. A temporary function '%s' does already exist. " +
                                "Please drop the temporary function first.",
                        identifier.asSummaryString()));
    }

    // Check whether the function already exists in the catalog.
    if (catalog.functionExists(path)) {
        if (ignoreIfExists) {
            return;
        }
        throw new ValidationException(
                String.format(
                        "Could not register catalog function. A function '%s' does already exist.",
                        identifier.asSummaryString()));
    }

    final CatalogFunction catalogFunction = new CatalogFunctionImpl(
            functionClass.getName(),
            FunctionLanguage.JAVA);
    try {
        // Create the function via the catalog.
        catalog.createFunction(path, catalogFunction, ignoreIfExists);
    } catch (Throwable t) {
        throw new TableException(
                String.format(
                        "Could not register catalog function '%s'.",
                        identifier.asSummaryString()),
                t);
    }
}
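A one-line usage sketch; the path "my_db.to_upper" and the ToUpper class from the earlier sketch are illustrative, and an unqualified path resolves against the current catalog/database:

// Persists the function in the catalog (unlike the temporary variants).
tEnv.createFunction("my_db.to_upper", ToUpper.class, true); // ignoreIfExists = true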
dropFunction
public boolean dropCatalogFunction(
        UnresolvedIdentifier unresolvedIdentifier,
        boolean ignoreIfNotExist) {
    final ObjectIdentifier identifier = catalogManager.qualifyIdentifier(unresolvedIdentifier);
    final ObjectIdentifier normalizedIdentifier = FunctionIdentifier.normalizeObjectIdentifier(identifier);
    final Catalog catalog = catalogManager.getCatalog(normalizedIdentifier.getCatalogName())
            .orElseThrow(IllegalStateException::new);
    final ObjectPath path = identifier.toObjectPath();

    // We force users to deal with temporary catalog functions first.
    if (tempCatalogFunctions.containsKey(normalizedIdentifier)) {
        throw new ValidationException(
                String.format(
                        "Could not drop catalog function. A temporary function '%s' does already exist. " +
                                "Please drop the temporary function first.",
                        identifier.asSummaryString()));
    }

    if (!catalog.functionExists(path)) {
        if (ignoreIfNotExist) {
            return false;
        }
        throw new ValidationException(
                String.format(
                        "Could not drop catalog function. A function '%s' doesn't exist.",
                        identifier.asSummaryString()));
    }

    try {
        catalog.dropFunction(path, ignoreIfNotExist);
    } catch (Throwable t) {
        throw new TableException(
                String.format(
                        "Could not drop catalog function '%s'.",
                        identifier.asSummaryString()),
                t);
    }
    return true;
}
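The same code path can be exercised through SQL (a sketch; the function name is illustrative):

// DROP FUNCTION on a persistent catalog function routes through dropCatalogFunction.
tEnv.executeSql("DROP FUNCTION IF EXISTS my_db.to_upper");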
createTemporaryFunction
public void registerTemporaryCatalogFunction(
        UnresolvedIdentifier unresolvedIdentifier,
        CatalogFunction catalogFunction,
        boolean ignoreIfExists) {
    // Qualify and normalize the function identifier.
    final ObjectIdentifier identifier = catalogManager.qualifyIdentifier(unresolvedIdentifier);
    final ObjectIdentifier normalizedIdentifier = FunctionIdentifier.normalizeObjectIdentifier(identifier);

    try {
        // Validate and pre-process the function.
        validateAndPrepareFunction(catalogFunction);
    } catch (Throwable t) {
        throw new ValidationException(
                String.format(
                        "Could not register temporary catalog function '%s' due to implementation errors.",
                        identifier.asSummaryString()),
                t);
    }

    // Store the function in the in-memory tempCatalogFunctions map.
    if (!tempCatalogFunctions.containsKey(normalizedIdentifier)) {
        tempCatalogFunctions.put(normalizedIdentifier, catalogFunction);
    } else if (!ignoreIfExists) {
        throw new ValidationException(
                String.format(
                        "Could not register temporary catalog function. A function '%s' does already exist.",
                        identifier.asSummaryString()));
    }
}
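Usage sketch; the path is illustrative and reuses the hypothetical ToUpper UDF:

// Unlike createFunction, this only touches the in-memory map, not the catalog.
tEnv.createTemporaryFunction("my_db.to_upper", ToUpper.class);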
from
// from -> scanInternal
public Optional<TableLookupResult> getTable(ObjectIdentifier objectIdentifier) {
    Preconditions.checkNotNull(schemaResolver, "schemaResolver should not be null");
    // Look up the temporary table first; if it does not exist, fall back to the catalog.
    CatalogBaseTable temporaryTable = temporaryTables.get(objectIdentifier);
    if (temporaryTable != null) {
        TableSchema resolvedSchema = resolveTableSchema(temporaryTable);
        return Optional.of(TableLookupResult.temporary(temporaryTable, resolvedSchema));
    } else {
        return getPermanentTable(objectIdentifier);
    }
}
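This lookup is what the public from() API resolves through (per the from -> scanInternal note above); the table name below is illustrative and must be registered beforehand:

// Resolved via temporaryTables first, then via the current catalog.
Table orders = tEnv.from("orders");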
sqlQuery
@Override
public Table sqlQuery(String query) {
    // Parse the query string into a list of Operations.
    List<Operation> operations = parser.parse(query);

    if (operations.size() != 1) {
        throw new ValidationException(
                "Unsupported SQL query! sqlQuery() only accepts a single SQL query.");
    }

    Operation operation = operations.get(0);
    // Check that it is a QueryOperation (and not a ModifyOperation).
    if (operation instanceof QueryOperation && !(operation instanceof ModifyOperation)) {
        // Create a Table from the operation.
        return createTable((QueryOperation) operation);
    } else {
        throw new ValidationException(
                "Unsupported SQL query! sqlQuery() only accepts a single SQL query of type " +
                        "SELECT, UNION, INTERSECT, EXCEPT, VALUES, and ORDER_BY.");
    }
}
protected TableImpl createTable(QueryOperation tableOperation) {
    return TableImpl.createTable(
            this,
            tableOperation,
            operationTreeBuilder,
            functionCatalog.asLookup(parser::parseIdentifier));
}
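A usage sketch, assuming a table named "orders" with a "name" column is already registered:

Table result = tEnv.sqlQuery("SELECT name, COUNT(*) AS cnt FROM orders GROUP BY name");
result.execute().print(); // triggers translation and execution of the QueryOperation tree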
Factory

public interface Factory {

    /**
     * Returns a unique identifier among same factory interfaces.
     *
     * <p>For consistency, an identifier should be declared as one lower case word (e.g. {@code
     * kafka}). If multiple factories exist for different versions, a version should be appended
     * using "-" (e.g. {@code elasticsearch-7}).
     */
    String factoryIdentifier();

    /**
     * Returns a set of {@link ConfigOption} that an implementation of this factory requires in
     * addition to {@link #optionalOptions()}.
     *
     * <p>See the documentation of {@link Factory} for more information.
     */
    Set<ConfigOption<?>> requiredOptions();

    /**
     * Returns a set of {@link ConfigOption} that an implementation of this factory consumes in
     * addition to {@link #requiredOptions()}.
     *
     * <p>See the documentation of {@link Factory} for more information.
     */
    Set<ConfigOption<?>> optionalOptions();
}
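A sketch of a custom connector factory built on this interface; the identifier "demo" and both options are made up for illustration, and the source-creation body is omitted. Such a factory is discovered via Java SPI, i.e. an entry in META-INF/services/org.apache.flink.table.factories.Factory.

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableSourceFactory;

// Hypothetical connector factory; matched when a table declares 'connector' = 'demo'.
public class DemoTableSourceFactory implements DynamicTableSourceFactory {

    public static final ConfigOption<String> HOSTNAME =
            ConfigOptions.key("hostname").stringType().noDefaultValue();
    public static final ConfigOption<Integer> PORT =
            ConfigOptions.key("port").intType().defaultValue(9999);

    @Override
    public String factoryIdentifier() {
        return "demo";
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        Set<ConfigOption<?>> options = new HashSet<>();
        options.add(HOSTNAME);
        return options;
    }

    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        return Collections.singleton(PORT);
    }

    @Override
    public DynamicTableSource createDynamicTableSource(Context context) {
        // A real factory would validate the options and build a source here.
        throw new UnsupportedOperationException("omitted in this sketch");
    }
}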