CC-29764 Add timeout to sink put cycle #801

Merged · 3 commits · Dec 1, 2024
Changes from 2 commits
@@ -162,6 +162,14 @@ public class S3SinkConnectorConfig extends StorageSinkConnectorConfig {
+ "`errors.tolerance` should be set to 'all' for successfully writing into dlq";
public static final String REPORT_NULL_RECORDS_TO_DLQ_DISPLAY = "Report null value to dlq";

public static final String MAX_WRITE_DURATION = "max.write.duration.ms";
public static final long MAX_WRITE_DURATION_DEFAULT = Long.MAX_VALUE;
public static final String MAX_WRITE_DURATION_DOC = "The maximum duration that a task will "
+ "spend batching and writing to S3. If the write operation takes longer than this, "
+ "the task voluntarily returns from the put method. This prevents the consumer from "
+ "being revoked from the group. It also mitigates (but does not eliminate) the risk of "
+ "a zombie task continuing to write to S3 after it has been revoked.";

/**
* Maximum back-off time when retrying failed requests.
*/
@@ -631,6 +639,18 @@ public static ConfigDef newConfigDef() {
Width.SHORT,
REPORT_NULL_RECORDS_TO_DLQ_DISPLAY
);

configDef.define(
MAX_WRITE_DURATION,
Type.LONG,
MAX_WRITE_DURATION_DEFAULT,
Importance.LOW,
MAX_WRITE_DURATION_DOC,
group,
++orderInGroup,
Width.SHORT,
"Maximum write duration"
);
}

{
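For reference, a minimal sketch of how the new property could be set on a connector (illustrative only: the connector class name and the 60-second value are assumptions, not part of this diff):

    import java.util.HashMap;
    import java.util.Map;

    Map<String, String> props = new HashMap<>();
    props.put("connector.class", "io.confluent.connect.s3.S3SinkConnector"); // assumed class name
    props.put("max.write.duration.ms", "60000"); // bound each put() cycle to roughly 60s of batching/writing
    // Leaving max.write.duration.ms unset keeps the default (Long.MAX_VALUE), i.e. no deadline.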
@@ -30,8 +30,11 @@
import org.slf4j.LoggerFactory;

import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import io.confluent.common.utils.SystemTime;
@@ -211,6 +214,8 @@ private Partitioner<?> newPartitioner(S3SinkConnectorConfig config)

@Override
public void put(Collection<SinkRecord> records) throws ConnectException {
long putStartTime = time.milliseconds();

for (SinkRecord record : records) {
String topic = record.topic();
int partition = record.kafkaPartition();
@@ -228,9 +233,15 @@ public void put(Collection<SinkRecord> records) throws ConnectException {
log.debug("Read {} records from Kafka", records.size());
}

for (TopicPartition tp : topicPartitionWriters.keySet()) {
// Shuffle the topic partitions, as otherwise the last topic partition will
// always get the least amount of time to write.
List<TopicPartition> shuffledList = new ArrayList<>(topicPartitionWriters.keySet());
Collections.shuffle(shuffledList);

for (TopicPartition tp : shuffledList) {
TopicPartitionWriter writer = topicPartitionWriters.get(tp);
try {
writer.setWriteDeadline(putStartTime);
writer.write();
if (log.isDebugEnabled()) {
log.debug("TopicPartition: {}, SchemaCompatibility:{}, FileRotations: {}",
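A side note on the shuffle above: a HashMap's keySet() iteration order, while unspecified, is typically stable for a given map instance, so without shuffling the same partition would land last in every put() cycle and be the first one starved when the deadline hits. A standalone sketch of the fairness fix (hypothetical keys, not part of this diff):

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    Map<String, Integer> writers = new HashMap<>();
    writers.put("topic-0", 0);
    writers.put("topic-1", 1);
    writers.put("topic-2", 2);
    System.out.println(writers.keySet()); // same order every time for this map instance
    List<String> order = new ArrayList<>(writers.keySet());
    Collections.shuffle(order); // randomizes which partition is served last this cycle
    System.out.println(order);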
@@ -100,6 +100,11 @@ public class TopicPartitionWriter {
private static final Time SYSTEM_TIME = new SystemTime();
private ErrantRecordReporter reporter;

private final long maxWriteDurationMs;
private long writeDeadline;

boolean isPaused = false;

private final FileRotationTracker fileRotationTracker;

public TopicPartitionWriter(TopicPartition tp,
@@ -173,6 +178,9 @@ public TopicPartitionWriter(TopicPartition tp,
+ "d";
fileRotationTracker = new FileRotationTracker();

maxWriteDurationMs = connectorConfig.getLong(S3SinkConnectorConfig.MAX_WRITE_DURATION);
writeDeadline = Long.MAX_VALUE;

// Initialize scheduled rotation timer if applicable
setNextScheduledRotation();
}
@@ -200,7 +208,8 @@ public void write() {

resetExpiredScheduledRotationIfNoPendingRecords(now);

while (!buffer.isEmpty()) {

while (!buffer.isEmpty() && !isWriteDeadlineExceeded()) {
try {
executeState(now);
} catch (IllegalWorkerStateException e) {
@@ -214,7 +223,39 @@
}
}
}
commitOnTimeIfNoData(now);
if (!isWriteDeadlineExceeded()) {
commitOnTimeIfNoData(now);
}
pauseOrResumeOnBuffer();

}

private void pauseOrResumeOnBuffer() {
// If the deadline was exceeded before all buffered records were processed, pause the
// writer once the buffered records reach the flush size. This ensures that we don't
// keep receiving messages from the consumer while we are still processing the buffer,
// which can lead to memory issues.
if (buffer.size() >= Math.max(flushSize, 1)) {
pause();
} else if (isPaused) {
resume();
}
}

public void setWriteDeadline(long currentTimeMs) {
writeDeadline = currentTimeMs + maxWriteDurationMs;
// Prevent overflow: the default maxWriteDurationMs of Long.MAX_VALUE would wrap negative
if (writeDeadline < 0) {
writeDeadline = Long.MAX_VALUE;
}
}

protected boolean isWriteDeadlineExceeded() {
boolean isWriteDeadlineExceeded = time.milliseconds() > writeDeadline;
if (isWriteDeadlineExceeded) {
log.info("Deadline exceeded");
}
return isWriteDeadlineExceeded;
}

@SuppressWarnings("fallthrough")
Expand Down Expand Up @@ -262,6 +303,11 @@ private void executeState(long now) {
}
// fallthrough
case SHOULD_ROTATE:
if (isWriteDeadlineExceeded()) {
// Note: this is a best-effort attempt to rotate the file before the deadline;
// the check can pass and the deadline may still be exceeded before the rotation completes.
break;
}
commitFiles();
nextState();
// fallthrough
@@ -502,11 +548,13 @@ private boolean rotateOnSize() {
private void pause() {
log.trace("Pausing writer for topic-partition '{}'", tp);
context.pause(tp);
isPaused = true;
}

private void resume() {
log.trace("Resuming writer for topic-partition '{}'", tp);
context.resume(tp);
isPaused = true;

Member: This should be false, right?

Member Author: Thanks for pointing out. I have fixed it.

}

private RecordWriter newWriter(SinkRecord record, String encodedPartition)
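A standalone sketch of why setWriteDeadline guards against overflow: with the default max.write.duration.ms of Long.MAX_VALUE, the addition wraps around, and the guard clamps the deadline back to "no deadline":

    long now = System.currentTimeMillis(); // positive
    long deadline = now + Long.MAX_VALUE;  // two's-complement wrap: result is negative
    if (deadline < 0) {
        deadline = Long.MAX_VALUE;         // clamped, so the deadline is effectively disabled
    }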
@@ -696,6 +696,9 @@ public void testWallclockUsesBatchTimePartitionBoundary() throws Exception {
// Freeze clock passed into topicPartitionWriter, so we know what time it will use for "now"
long freezeTime = 3599000L;
EasyMock.expect(systemTime.milliseconds()).andReturn(freezeTime);

// Mock system time for deadline checks
EasyMock.expect(systemTime.milliseconds()).andReturn(System.currentTimeMillis()).anyTimes();
EasyMock.replay(systemTime);

String key = "key";
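A hypothetical companion sketch (assumes the same systemTime mock and a finite max.write.duration.ms in the connector config; not part of this diff) that would force the deadline to be exceeded on the first check, so write() should return without draining the buffer:

    // First call supplies put()'s start time; all later calls jump past any finite deadline.
    EasyMock.expect(systemTime.milliseconds()).andReturn(0L);
    EasyMock.expect(systemTime.milliseconds()).andReturn(Long.MAX_VALUE).anyTimes();
    EasyMock.replay(systemTime);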