Skip to content

Commit

Permalink
Improve logging to include taskId in segment handoff notifier thread (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
hardikbajaj authored Oct 1, 2024
1 parent f33f60b commit 3d56fa6
Show file tree
Hide file tree
Showing 11 changed files with 27 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -719,7 +719,7 @@ protected boolean waitForSegmentAvailability(

try (
SegmentHandoffNotifier notifier = toolbox.getSegmentHandoffNotifierFactory()
.createSegmentHandoffNotifier(segmentsToWaitFor.get(0).getDataSource())
.createSegmentHandoffNotifier(segmentsToWaitFor.get(0).getDataSource(), getId())
) {

final ExecutorService exec = Execs.directExecutor();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1181,7 +1181,7 @@ public void testWaitForSegmentAvailabilityMultipleSegmentsTimeout()
EasyMock.expect(mockToolbox.getSegmentHandoffNotifierFactory()).andReturn(mockFactory).once();
EasyMock.expect(mockToolbox.getEmitter()).andReturn(new NoopServiceEmitter()).anyTimes();
EasyMock.expect(mockDataSegment1.getDataSource()).andReturn("MockDataSource").once();
EasyMock.expect(mockFactory.createSegmentHandoffNotifier("MockDataSource")).andReturn(mockNotifier).once();
EasyMock.expect(mockFactory.createSegmentHandoffNotifier("MockDataSource", indexTask.getId())).andReturn(mockNotifier).once();
mockNotifier.start();
EasyMock.expectLastCall().once();
mockNotifier.registerSegmentHandoffCallback(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,7 @@ private SegmentHandoffNotifierFactory setUpSegmentHandOffNotifierFactory()
return new SegmentHandoffNotifierFactory()
{
@Override
public SegmentHandoffNotifier createSegmentHandoffNotifier(String dataSource)
public SegmentHandoffNotifier createSegmentHandoffNotifier(String dataSource, String taskId)
{
return new SegmentHandoffNotifier()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -629,7 +629,7 @@ public boolean checkPointDataSourceMetadata(
final TaskActionClientFactory taskActionClientFactory = new LocalTaskActionClientFactory(
taskActionToolbox
);
final SegmentHandoffNotifierFactory handoffNotifierFactory = dataSource -> new SegmentHandoffNotifier()
final SegmentHandoffNotifierFactory handoffNotifierFactory = (dataSource, taskId) -> new SegmentHandoffNotifier()
{
@Override
public boolean registerSegmentHandoffCallback(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,22 +44,25 @@ public class CoordinatorBasedSegmentHandoffNotifier implements SegmentHandoffNot
private volatile ScheduledExecutorService scheduledExecutor;
private final Duration pollDuration;
private final String dataSource;
private final String taskId;

public CoordinatorBasedSegmentHandoffNotifier(
String dataSource,
CoordinatorClient coordinatorClient,
CoordinatorBasedSegmentHandoffNotifierConfig config
CoordinatorBasedSegmentHandoffNotifierConfig config,
String taskId
)
{
this.dataSource = dataSource;
this.coordinatorClient = coordinatorClient;
this.pollDuration = config.getPollDuration();
this.taskId = taskId;
}

@Override
public boolean registerSegmentHandoffCallback(SegmentDescriptor descriptor, Executor exec, Runnable handOffRunnable)
{
log.debug("Adding SegmentHandoffCallback for dataSource[%s] Segment[%s]", dataSource, descriptor);
log.debug("Adding SegmentHandoffCallback for dataSource[%s] Segment[%s] for task[%s]", dataSource, descriptor, taskId);
Pair<Executor, Runnable> prev = handOffCallbacks.putIfAbsent(
descriptor,
new Pair<>(exec, handOffRunnable)
Expand Down Expand Up @@ -91,30 +94,32 @@ void checkForSegmentHandoffs()
Boolean handOffComplete =
FutureUtils.getUnchecked(coordinatorClient.isHandoffComplete(dataSource, descriptor), true);
if (Boolean.TRUE.equals(handOffComplete)) {
log.debug("Segment handoff complete for dataSource[%s] segment[%s]", dataSource, descriptor);
log.debug("Segment handoff complete for dataSource[%s] segment[%s] for task[%s]", dataSource, descriptor, taskId);
entry.getValue().lhs.execute(entry.getValue().rhs);
itr.remove();
}
}
catch (Exception e) {
log.error(
e,
"Exception while checking handoff for dataSource[%s] Segment[%s]; will try again after [%s]",
"Exception while checking handoff for dataSource[%s] Segment[%s], taskId[%s]; will try again after [%s]",
dataSource,
descriptor,
taskId,
pollDuration
);
}
}
if (!handOffCallbacks.isEmpty()) {
log.info("Still waiting for handoff for [%d] segments", handOffCallbacks.size());
log.info("Still waiting for handoff for [%d] segments for task[%s]", handOffCallbacks.size(), taskId);
}
}
catch (Throwable t) {
log.error(
t,
"Exception while checking handoff for dataSource[%s]; will try again after [%s]",
"Exception while checking handoff for dataSource[%s], taskId[%s]; will try again after [%s]",
dataSource,
taskId,
pollDuration
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,13 @@ public CoordinatorBasedSegmentHandoffNotifierFactory(
}

@Override
public SegmentHandoffNotifier createSegmentHandoffNotifier(String dataSource)
public SegmentHandoffNotifier createSegmentHandoffNotifier(String dataSource, String taskId)
{
return new CoordinatorBasedSegmentHandoffNotifier(
dataSource,
client,
config
config,
taskId
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public void close()
};

@Override
public SegmentHandoffNotifier createSegmentHandoffNotifier(String dataSource)
public SegmentHandoffNotifier createSegmentHandoffNotifier(String dataSource, String taskId)
{
return NOTIFIER;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,5 @@

public interface SegmentHandoffNotifierFactory
{
SegmentHandoffNotifier createSegmentHandoffNotifier(String dataSource);
SegmentHandoffNotifier createSegmentHandoffNotifier(String dataSource, String taskId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public StreamAppenderatorDriver(
super(appenderator, segmentAllocator, segmentRetriever, dataSegmentKiller);

this.handoffNotifier = Preconditions.checkNotNull(handoffNotifierFactory, "handoffNotifierFactory")
.createSegmentHandoffNotifier(appenderator.getDataSource());
.createSegmentHandoffNotifier(appenderator.getDataSource(), appenderator.getId());
this.metrics = Preconditions.checkNotNull(metrics, "metrics");
this.objectMapper = Preconditions.checkNotNull(objectMapper, "objectMapper");
}
Expand Down Expand Up @@ -360,7 +360,7 @@ public ListenableFuture<SegmentsAndCommitMetadata> registerHandoff(SegmentsAndCo
),
Execs.directExecutor(),
() -> {
log.debug("Segment[%s] successfully handed off, dropping.", segmentIdentifier);
log.debug("Segment[%s] successfully handed off for task[%s], dropping.", segmentIdentifier, appenderator.getId());
metrics.incrementHandOffCount();

final ListenableFuture<?> dropFuture = appenderator.drop(segmentIdentifier);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ public void testHandoffCallbackNotCalled()
CoordinatorBasedSegmentHandoffNotifier notifier = new CoordinatorBasedSegmentHandoffNotifier(
"test_ds",
coordinatorClient,
notifierConfig
notifierConfig,
"test_task"
);
final AtomicBoolean callbackCalled = new AtomicBoolean(false);
notifier.registerSegmentHandoffCallback(
Expand Down Expand Up @@ -89,7 +90,8 @@ public void testHandoffCallbackCalled()
CoordinatorBasedSegmentHandoffNotifier notifier = new CoordinatorBasedSegmentHandoffNotifier(
"test_ds",
coordinatorClient,
notifierConfig
notifierConfig,
"test_task"
);

notifier.registerSegmentHandoffCallback(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -543,7 +543,7 @@ public Set<SegmentDescriptor> getHandedOffSegmentDescriptors()
}

@Override
public SegmentHandoffNotifier createSegmentHandoffNotifier(String dataSource)
public SegmentHandoffNotifier createSegmentHandoffNotifier(String dataSource, String taskId)
{
return new SegmentHandoffNotifier()
{
Expand Down

0 comments on commit 3d56fa6

Please sign in to comment.