## Current status and the problem

Previous algorithm in risingwavelabs/rfcs#72:

We only track `backfill_offset`, but not `upstream_offset`, and we finish backfill only when `upstream_offset > backfill_offset`. So if there's no data coming from upstream, we can never finish backfill.

This wasn't a problem until we wanted blocking DDL on source (#15587): the DDL would be blocked forever until new messages come, which would confuse users.
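To make the failure mode concrete, here is a minimal sketch of the old finish condition (the names are illustrative, not the actual RisingWave code; offsets are simplified to `u64`). With no upstream traffic, `upstream_offset` is never observed, so the condition can never fire:

```rust
/// Illustrative sketch of the previous finish condition.
/// `upstream_offset` is only known after at least one upstream message arrives;
/// with no upstream traffic it stays `None`, so this never returns `true`,
/// and a blocking DDL would wait forever.
fn backfill_finished(backfill_offset: u64, upstream_offset: Option<u64>) -> bool {
    match upstream_offset {
        Some(up) => up > backfill_offset,
        None => false,
    }
}
```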
## Solution
Since the message stream is unbounded, we have to do this kind of "compare progress" check between the backfill stream and the upstream stream. But we can improve it by checking on the backfill side instead of the upstream side, since the former is more controllable in the `SourceBackfillExecutor`.

We add a new in-memory state `target_offset`, which is updated to `upstream_offset` when upstream messages come. Then we can compare progress and finish backfill on the backfill side. (To be more precise, we can only enter the `SourceCachingUp` stage and wait for `upstream_offset` to become larger than it.) After this change we still cannot finish backfill when there's no upstream message, but it may end slightly faster.

Initialize `target_offset` to the latest available offset instead of `None` (in Kafka this is `high_watermark - 1`). With this, we can finish backfill even if there's no message from upstream. Note that this relies on the connector providing the ability to fetch `latest_offset`.
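The bookkeeping described above can be sketched as follows. This is a simplified illustration, not the actual executor code: offsets are reduced to `u64`, and `caught_up` stands in for "may enter `SourceCachingUp` (or finish directly, since the latest-available offset is inclusive)":

```rust
/// Illustrative sketch of the new progress tracking (not the real
/// `SourceBackfillExecutor`); offsets are simplified to `u64`.
struct BackfillProgress {
    /// Offset the backfill stream has consumed up to.
    backfill_offset: Option<u64>,
    /// Initialized to the connector's latest available offset
    /// (for Kafka, `high_watermark - 1`), then bumped whenever an
    /// upstream message with a newer offset is observed.
    target_offset: Option<u64>,
}

impl BackfillProgress {
    fn on_upstream_message(&mut self, upstream_offset: u64) {
        self.target_offset =
            Some(self.target_offset.map_or(upstream_offset, |t| t.max(upstream_offset)));
    }

    /// Whether the backfill side has caught up with the recorded target.
    fn caught_up(&self) -> bool {
        match (self.backfill_offset, self.target_offset) {
            (Some(b), Some(t)) => b >= t,
            // No known target at all: nothing to wait for.
            (_, None) => true,
            (None, Some(_)) => false,
        }
    }
}
```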
There's still a case not covered: there's no history data to backfill at all. We still cannot finish backfill in this case. To optimize it, we need the connector to provide a `has_message_available` ability, and check it before starting backfill.

Perhaps we could also use `has_message_available` as an exit condition of backfill, but that may require a larger change in the source reader.
Diagram of new algorithm:
(Note: this applies to each split. After #18300, we no longer need special hacks (waiting for all splits to finish before finishing the backfill), and can treat all splits equally.)
## Connector APIs

I investigated some connectors' API docs to see whether they can provide `latest_offset` and `has_message_available`.

Note: it might be tempting to just use `poll` to check `has_message_available`, but not all connector APIs return something like `NoMoreMessage` from `poll`, so we cannot distinguish a timeout from "no more messages".

Kafka:
- `latest_offset`: `high_watermark - 1`
- `has_message_available`: `high_watermark > low_watermark`

Kinesis: supports neither `latest_offset` nor `has_message_available` directly.
- `DescribeStream` returns `EndingSequenceNumber` only for closed shards (after a shard merge/split), not for open ones.
- However, `GetRecords` returns `MillisBehindLatest`: "A value of zero indicates that record processing is caught up, and there are no new records to process at this moment." So we can use this API as a workaround (we will fetch and drop data).

Pulsar: supports `getLastMessageIds` and `hasMessageAvailable`. The docs explicitly mention: "This check can be used by an application to scan through a topic and stop when the reader reaches the current last published message."

Note: another possibly useful ability is "fetch the last 1/N messages". But this is subtle: we need to make sure the fetched message is "inclusive".
- Kafka: supports `OffsetTail(n)` natively (it becomes `-n` in the protocol), and Kafka offsets are consecutive.
- Pulsar: supports `startMessageId(MessageId.latest).startMessageIdInclusive()` to read the last 1, but not the last N.
- Kinesis: seems not supported.
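To summarize the investigation, the two abilities could be modeled as an optional per-connector probe. The trait below is a hypothetical sketch (RisingWave's real connector abstraction differs); the mock implementation derives both abilities from Kafka-style watermarks, where `low_watermark` is the first available offset and `high_watermark` is one past the last message:

```rust
/// Hypothetical trait sketch; not the actual connector abstraction.
trait BackfillProbe {
    /// Latest available offset (**inclusive**), if the connector can report it.
    fn latest_offset(&self) -> Option<u64>;
    /// Whether the split currently has any message to read.
    fn has_message_available(&self) -> Option<bool>;
}

/// Kafka-style mock: derive both abilities from the watermarks.
struct KafkaLikeSplit {
    low_watermark: u64,  // first available offset
    high_watermark: u64, // one past the last message
}

impl BackfillProbe for KafkaLikeSplit {
    fn latest_offset(&self) -> Option<u64> {
        // Inclusive latest offset is `high_watermark - 1`; an empty split
        // (low == high) has no latest offset to report.
        (self.high_watermark > self.low_watermark).then(|| self.high_watermark - 1)
    }
    fn has_message_available(&self) -> Option<bool> {
        Some(self.high_watermark > self.low_watermark)
    }
}
```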
```rust
/// Information used to determine whether we should start and finish source backfill.
///
/// XXX: if a connector cannot provide the latest offsets (but we want to make it shareable),
/// perhaps we should ban blocking DDL for it.
#[derive(Debug, Clone)]
pub enum BackfillInfo {
    HasDataToBackfill {
        /// The last available offsets for each split (**inclusive**).
        ///
        /// This will be used to determine whether source backfill is finished when
        /// there are no _new_ messages coming from upstream `SourceExecutor`. Otherwise,
        /// blocking DDL cannot finish until new messages come.
        ///
        /// When there are upstream messages, we will use the latest offsets from the upstream.
        latest_offset: String,
    },
    /// If there are no messages in the split at all, we don't need to start backfill.
    /// In this case, there will be no message from the backfill stream too.
    /// If we started backfill, we cannot finish it until new messages come.
    /// So we mark this a special case for optimization.
    NoDataToBackfill,
}
```
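For illustration, here is how an executor might branch on this enum when deciding whether to start backfill at all. The helper name is hypothetical, and the enum is re-declared (doc comments elided) only to keep the example self-contained:

```rust
/// Re-declared here (doc comments elided) for a self-contained example.
#[derive(Debug, Clone)]
pub enum BackfillInfo {
    HasDataToBackfill { latest_offset: String },
    NoDataToBackfill,
}

/// Hypothetical helper: pick the initial `target_offset`, or return `None`
/// to skip backfill entirely when the split has no data, so a blocking DDL
/// can finish immediately instead of waiting for messages that never come.
fn initial_target_offset(info: &BackfillInfo) -> Option<String> {
    match info {
        BackfillInfo::HasDataToBackfill { latest_offset } => Some(latest_offset.clone()),
        BackfillInfo::NoDataToBackfill => None,
    }
}
```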