remove batchProcessingMode from task config, remove AppenderatorImpl (#16765)

changes:
* removes `druid.indexer.task.batchProcessingMode` in favor of always using `CLOSED_SEGMENTS_SINKS`, which uses `BatchAppenderator`. This mode was intended to become the default for native batch, but that switch was missed, so `CLOSED_SEGMENTS` (using `AppenderatorImpl`) remained the default. However, MSQ has been using `BatchAppenderator` exclusively with no problems, so it seems safe to roll it out as the only option for batch ingestion everywhere (see the sketch after this list)
* with `batchProcessingMode` gone, there is no use for `AppenderatorImpl`, so it has been removed
* simplifies `Appenderator` construction, since there are only separate stream and batch versions now
* simplifies tests, since `batchProcessingMode` is gone
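
Before the file-by-file diff, here is a minimal, self-contained sketch of the shape of this refactor. It is illustrative only, not Druid code; every class and method name in it is invented for the example:

```java
// Illustrative sketch only -- not Druid code. It mirrors the shape of this
// commit: a three-way mode switch over appenderator construction collapses
// into a single batch path once the mode setting is removed.
public class BatchModeRefactorSketch
{
  enum BatchProcessingMode { OPEN_SEGMENTS, CLOSED_SEGMENTS, CLOSED_SEGMENTS_SINKS }

  interface Appenderator {}

  // Before: construction dispatched on the configured mode.
  static Appenderator createBefore(BatchProcessingMode mode)
  {
    switch (mode) {
      case OPEN_SEGMENTS:
        return new Appenderator() {}; // mmap-based, legacy streaming code path
      case CLOSED_SEGMENTS:
        return new Appenderator() {}; // no mmap, but full in-heap segment tracking
      case CLOSED_SEGMENTS_SINKS:
        return new Appenderator() {}; // smallest memory footprint
      default:
        throw new IllegalArgumentException("Invalid batchProcessingMode: " + mode);
    }
  }

  // After: one batch path, no mode parameter at all.
  static Appenderator createAfter()
  {
    return new Appenderator() {}; // always the CLOSED_SEGMENTS_SINKS behavior
  }

  public static void main(String[] args)
  {
    System.out.println(createBefore(BatchProcessingMode.CLOSED_SEGMENTS));
    System.out.println(createAfter());
  }
}
```

The real change in `Appenderators.newAppenderator` (further down) is larger only because each branch forwarded the same dozen arguments; structurally it is exactly this collapse of a mode switch into a single batch path.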
clintropolis authored Jul 22, 2024
1 parent 6a2348b commit 02b8738
Showing 39 changed files with 87 additions and 3,790 deletions.
1 change: 0 additions & 1 deletion docs/configuration/index.md
@@ -1500,7 +1500,6 @@ Additional Peon configs include:
|`druid.peon.mode`|One of `local` or `remote`. Setting this property to `local` means you intend to run the Peon as a standalone process which is not recommended.|`remote`|
|`druid.indexer.task.baseDir`|Base temporary working directory.|`System.getProperty("java.io.tmpdir")`|
|`druid.indexer.task.baseTaskDir`|Base temporary working directory for tasks.|`${druid.indexer.task.baseDir}/persistent/task`|
|`druid.indexer.task.batchProcessingMode`| Batch ingestion tasks have three operating modes that control construction and tracking of intermediary segments: `OPEN_SEGMENTS`, `CLOSED_SEGMENTS`, and `CLOSED_SEGMENTS_SINKS`. `OPEN_SEGMENTS` uses the streaming ingestion code path and performs an `mmap` on intermediary segments to build a timeline so that these segments are available to realtime queries. Batch ingestion doesn't require intermediary segments, so the default mode, `CLOSED_SEGMENTS`, eliminates the `mmap` of intermediary segments, although it still tracks the entire set of segments on heap. `CLOSED_SEGMENTS_SINKS` is the most aggressive mode and should have the smallest memory footprint: it eliminates both the in-memory tracking and the `mmap` of intermediary segments produced during segment creation. `CLOSED_SEGMENTS_SINKS` is less well tested than the other modes and is currently considered experimental. You can use `OPEN_SEGMENTS` mode if problems occur with the two newer modes. |`CLOSED_SEGMENTS`|
|`druid.indexer.task.defaultHadoopCoordinates`|Hadoop version to use with HadoopIndexTasks that do not request a particular version.|`org.apache.hadoop:hadoop-client-api:3.3.6`, `org.apache.hadoop:hadoop-client-runtime:3.3.6`|
|`druid.indexer.task.defaultRowFlushBoundary`|Highest row count before persisting to disk. Used for indexing generating tasks.|75000|
|`druid.indexer.task.directoryLockTimeout`|Wait this long for zombie Peons to exit before giving up on their replacements.|PT10M|
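For reference, a minimal sketch of the removed knob as an operator would have set it in a Peon's `runtime.properties` (the snippet is illustrative; the value had to be one of the three modes documented above):

```properties
# Removed by this commit: batch ingestion now always behaves like
# CLOSED_SEGMENTS_SINKS, backed by BatchAppenderator, so this property
# no longer has any effect.
druid.indexer.task.batchProcessingMode=CLOSED_SEGMENTS_SINKS
```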
@@ -177,9 +177,8 @@ public Pair<Integer, ReadableInput> apply(ReadableInput readableInput)

// Create directly, without using AppenderatorsManager, because we need different memory overrides due to
// using one Appenderator per processing thread instead of per task.
// Note: "createOffline" ignores the batchProcessingMode and always acts like CLOSED_SEGMENTS_SINKS.
final Appenderator appenderator =
Appenderators.createOffline(
Appenderators.createBatch(
idString,
dataSchema,
makeAppenderatorConfig(
@@ -24,7 +24,6 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.EnumUtils;
import org.apache.druid.common.config.Configs;
import org.apache.druid.common.utils.IdUtils;
import org.apache.druid.java.util.common.ISE;
@@ -65,16 +64,6 @@ public class TaskConfig
}
}

// This enum controls processing mode of batch ingestion "segment creation" phase (i.e. appenderator logic)
public enum BatchProcessingMode
{
OPEN_SEGMENTS, /* mmap segments, legacy code */
CLOSED_SEGMENTS, /* Do not mmap segments but keep most other legacy code */
CLOSED_SEGMENTS_SINKS /* Most aggressive memory optimization, do not mmap segments and eliminate sinks, etc. */
}

public static final BatchProcessingMode BATCH_PROCESSING_MODE_DEFAULT = BatchProcessingMode.CLOSED_SEGMENTS;

private static final Period DEFAULT_DIRECTORY_LOCK_TIMEOUT = new Period("PT10M");
private static final Period DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT = new Period("PT5M");
private static final boolean DEFAULT_STORE_EMPTY_COLUMNS = true;
@@ -110,12 +99,6 @@ public enum BatchProcessingMode
@JsonProperty
private final boolean ignoreTimestampSpecForDruidInputSource;

@JsonProperty
private final boolean batchMemoryMappedIndex;

@JsonProperty
private final BatchProcessingMode batchProcessingMode;

@JsonProperty
private final boolean storeEmptyColumns;

@@ -137,9 +120,6 @@ public TaskConfig(
@JsonProperty("directoryLockTimeout") Period directoryLockTimeout,
@JsonProperty("shuffleDataLocations") List<StorageLocationConfig> shuffleDataLocations,
@JsonProperty("ignoreTimestampSpecForDruidInputSource") boolean ignoreTimestampSpecForDruidInputSource,
@JsonProperty("batchMemoryMappedIndex") boolean batchMemoryMappedIndex,
// deprecated, only set to true to fall back to older behavior
@JsonProperty("batchProcessingMode") String batchProcessingMode,
@JsonProperty("storeEmptyColumns") @Nullable Boolean storeEmptyColumns,
@JsonProperty("encapsulatedTask") boolean enableTaskLevelLogPush,
@JsonProperty("tmpStorageBytesPerTask") @Nullable Long tmpStorageBytesPerTask
@@ -171,26 +151,8 @@ public TaskConfig(
);

this.ignoreTimestampSpecForDruidInputSource = ignoreTimestampSpecForDruidInputSource;
this.batchMemoryMappedIndex = batchMemoryMappedIndex;
this.encapsulatedTask = enableTaskLevelLogPush;

// Conflict resolution. Assume that if batchMemoryMappedIndex is set (since false is the default) that
// the user changed it intentionally to use legacy; in this case override batchProcessingMode and also
// set it to legacy, else just use batchProcessingMode and don't pay attention to batchMemoryMappedIndex:
if (batchMemoryMappedIndex) {
this.batchProcessingMode = BatchProcessingMode.OPEN_SEGMENTS;
} else if (EnumUtils.isValidEnum(BatchProcessingMode.class, batchProcessingMode)) {
this.batchProcessingMode = BatchProcessingMode.valueOf(batchProcessingMode);
} else {
// batchProcessingMode input string is invalid, log & use the default.
this.batchProcessingMode = BatchProcessingMode.CLOSED_SEGMENTS; // Default
log.warn(
"Batch processing mode argument value is null or not valid:[%s], defaulting to[%s] ",
batchProcessingMode, this.batchProcessingMode
);
}
log.debug("Batch processing mode:[%s]", this.batchProcessingMode);

this.storeEmptyColumns = Configs.valueOrDefault(storeEmptyColumns, DEFAULT_STORE_EMPTY_COLUMNS);
this.tmpStorageBytesPerTask = Configs.valueOrDefault(tmpStorageBytesPerTask, DEFAULT_TMP_STORAGE_BYTES_PER_TASK);
}
@@ -206,8 +168,6 @@ private TaskConfig(
Period directoryLockTimeout,
List<StorageLocationConfig> shuffleDataLocations,
boolean ignoreTimestampSpecForDruidInputSource,
boolean batchMemoryMappedIndex,
BatchProcessingMode batchProcessingMode,
boolean storeEmptyColumns,
boolean encapsulatedTask,
long tmpStorageBytesPerTask
@@ -223,8 +183,6 @@ private TaskConfig(
this.directoryLockTimeout = directoryLockTimeout;
this.shuffleDataLocations = shuffleDataLocations;
this.ignoreTimestampSpecForDruidInputSource = ignoreTimestampSpecForDruidInputSource;
this.batchMemoryMappedIndex = batchMemoryMappedIndex;
this.batchProcessingMode = batchProcessingMode;
this.storeEmptyColumns = storeEmptyColumns;
this.encapsulatedTask = encapsulatedTask;
this.tmpStorageBytesPerTask = tmpStorageBytesPerTask;
@@ -310,22 +268,6 @@ public boolean isIgnoreTimestampSpecForDruidInputSource()
return ignoreTimestampSpecForDruidInputSource;
}

@JsonProperty
public BatchProcessingMode getBatchProcessingMode()
{
return batchProcessingMode;
}

/**
* Do not use in code! Use {@link TaskConfig#getBatchProcessingMode()} instead.
*/
@Deprecated
@JsonProperty
public boolean getbatchMemoryMappedIndex()
{
return batchMemoryMappedIndex;
}

@JsonProperty
public boolean isStoreEmptyColumns()
{
@@ -366,8 +308,6 @@ public TaskConfig withBaseTaskDir(File baseTaskDir)
directoryLockTimeout,
shuffleDataLocations,
ignoreTimestampSpecForDruidInputSource,
batchMemoryMappedIndex,
batchProcessingMode,
storeEmptyColumns,
encapsulatedTask,
tmpStorageBytesPerTask
@@ -387,8 +327,6 @@ public TaskConfig withTmpStorageBytesPerTask(long tmpStorageBytesPerTask)
directoryLockTimeout,
shuffleDataLocations,
ignoreTimestampSpecForDruidInputSource,
batchMemoryMappedIndex,
batchProcessingMode,
storeEmptyColumns,
encapsulatedTask,
tmpStorageBytesPerTask
@@ -21,8 +21,6 @@

import org.apache.druid.indexing.appenderator.ActionBasedPublishedSegmentRetriever;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.segment.incremental.ParseExceptionHandler;
import org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.indexing.DataSchema;
@@ -75,54 +73,20 @@ public static Appenderator newAppenderator(
boolean useMaxMemoryEstimates
)
{
if (toolbox.getConfig().getBatchProcessingMode() == TaskConfig.BatchProcessingMode.OPEN_SEGMENTS) {
return appenderatorsManager.createOpenSegmentsOfflineAppenderatorForTask(
taskId,
dataSchema,
appenderatorConfig.withBasePersistDirectory(toolbox.getPersistDir()),
metrics,
segmentPusher,
toolbox.getJsonMapper(),
toolbox.getIndexIO(),
toolbox.getIndexMergerV9(),
rowIngestionMeters,
parseExceptionHandler,
useMaxMemoryEstimates,
toolbox.getCentralizedTableSchemaConfig()
);
} else if (toolbox.getConfig().getBatchProcessingMode() == TaskConfig.BatchProcessingMode.CLOSED_SEGMENTS) {
return appenderatorsManager.createClosedSegmentsOfflineAppenderatorForTask(
taskId,
dataSchema,
appenderatorConfig.withBasePersistDirectory(toolbox.getPersistDir()),
metrics,
segmentPusher,
toolbox.getJsonMapper(),
toolbox.getIndexIO(),
toolbox.getIndexMergerV9(),
rowIngestionMeters,
parseExceptionHandler,
useMaxMemoryEstimates,
toolbox.getCentralizedTableSchemaConfig()
);
} else if (toolbox.getConfig().getBatchProcessingMode() == TaskConfig.BatchProcessingMode.CLOSED_SEGMENTS_SINKS) {
return appenderatorsManager.createOfflineAppenderatorForTask(
taskId,
dataSchema,
appenderatorConfig.withBasePersistDirectory(toolbox.getPersistDir()),
metrics,
segmentPusher,
toolbox.getJsonMapper(),
toolbox.getIndexIO(),
toolbox.getIndexMergerV9(),
rowIngestionMeters,
parseExceptionHandler,
useMaxMemoryEstimates,
toolbox.getCentralizedTableSchemaConfig()
);
} else {
throw new IAE("Invalid batchProcesingMode[%s]", toolbox.getConfig().getBatchProcessingMode());
}
return appenderatorsManager.createBatchAppenderatorForTask(
taskId,
dataSchema,
appenderatorConfig.withBasePersistDirectory(toolbox.getPersistDir()),
metrics,
segmentPusher,
toolbox.getJsonMapper(),
toolbox.getIndexIO(),
toolbox.getIndexMergerV9(),
rowIngestionMeters,
parseExceptionHandler,
useMaxMemoryEstimates,
toolbox.getCentralizedTableSchemaConfig()
);
}

public static BatchAppenderatorDriver newDriver(
@@ -114,7 +114,6 @@ public void setUp() throws IOException
TaskConfig taskConfig = new TaskConfigBuilder()
.setBaseDir(temporaryFolder.newFile().toString())
.setDefaultRowFlushBoundary(50000)
.setBatchProcessingMode(TaskConfig.BATCH_PROCESSING_MODE_DEFAULT.name())
.build();

taskToolbox = new TaskToolboxFactory(
@@ -36,8 +36,6 @@ public class TaskConfigBuilder
private Period directoryLockTimeout;
private List<StorageLocationConfig> shuffleDataLocations;
private boolean ignoreTimestampSpecForDruidInputSource;
private boolean batchMemoryMappedIndex; // deprecated; only set to true to fall back to older behavior
private String batchProcessingMode;
private Boolean storeEmptyColumns;
private boolean enableTaskLevelLogPush;
private Long tmpStorageBytesPerTask;
@@ -102,18 +100,6 @@ public TaskConfigBuilder setIgnoreTimestampSpecForDruidInputSource(boolean ignoreTimestampSpecForDruidInputSource)
return this;
}

public TaskConfigBuilder setBatchMemoryMappedIndex(boolean batchMemoryMappedIndex)
{
this.batchMemoryMappedIndex = batchMemoryMappedIndex;
return this;
}

public TaskConfigBuilder setBatchProcessingMode(String batchProcessingMode)
{
this.batchProcessingMode = batchProcessingMode;
return this;
}

public TaskConfigBuilder setStoreEmptyColumns(Boolean storeEmptyColumns)
{
this.storeEmptyColumns = storeEmptyColumns;
@@ -145,8 +131,6 @@ public TaskConfig build()
directoryLockTimeout,
shuffleDataLocations,
ignoreTimestampSpecForDruidInputSource,
batchMemoryMappedIndex,
batchProcessingMode,
storeEmptyColumns,
enableTaskLevelLogPush,
tmpStorageBytesPerTask