Skip to content

Commit

Permalink
Refactor usage of InventoryDumperContextSplitter (#32592)
Browse files Browse the repository at this point in the history
  • Loading branch information
terrymanu authored Aug 18, 2024
1 parent 7c5c88f commit a51323f
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ public List<InventoryTask> split(final TransmissionJobItemContext jobItemContext
List<InventoryTask> result = new LinkedList<>();
long startTimeMillis = System.currentTimeMillis();
TransmissionProcessContext processContext = jobItemContext.getJobProcessContext();
for (InventoryDumperContext each : new InventoryDumperContextSplitter(sourceDataSource, dumperContext).split(jobItemContext)) {
InventoryDumperContextSplitter dumperContextSplitter = new InventoryDumperContextSplitter(sourceDataSource, dumperContext);
for (InventoryDumperContext each : dumperContextSplitter.split(jobItemContext)) {
AtomicReference<IngestPosition> position = new AtomicReference<>(each.getCommonContext().getPosition());
PipelineChannel channel = InventoryChannelCreator.create(processContext.getProcessConfiguration().getStreamChannel(), importerConfig.getBatchSize(), position);
Dumper dumper = new InventoryDumper(each, channel, sourceDataSource, jobItemContext.getSourceMetaDataLoader());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,9 @@ private void initInventoryTasks(final CDCJobItemContext jobItemContext, final At
CDCTaskConfiguration taskConfig = jobItemContext.getTaskConfig();
ImporterConfiguration importerConfig = taskConfig.getImporterConfig();
TransmissionProcessContext processContext = jobItemContext.getJobProcessContext();
for (InventoryDumperContext each : new InventoryDumperContextSplitter(jobItemContext.getSourceDataSource(), new InventoryDumperContext(taskConfig.getDumperContext().getCommonContext()))
.split(jobItemContext)) {
InventoryDumperContextSplitter dumperContextSplitter = new InventoryDumperContextSplitter(
jobItemContext.getSourceDataSource(), new InventoryDumperContext(taskConfig.getDumperContext().getCommonContext()));
for (InventoryDumperContext each : dumperContextSplitter.split(jobItemContext)) {
AtomicReference<IngestPosition> position = new AtomicReference<>(each.getCommonContext().getPosition());
PipelineChannel channel = InventoryChannelCreator.create(processContext.getProcessConfiguration().getStreamChannel(), importerConfig.getBatchSize(), position);
if (!(position.get() instanceof IngestFinishedPosition)) {
Expand Down

0 comments on commit a51323f

Please sign in to comment.