Let source backfill finish when there's no data from upstream #18299

Closed
Tracked by #16003
xxchan opened this issue Aug 28, 2024 · 1 comment

xxchan commented Aug 28, 2024

Current status and the problem

Previous algorithm in risingwavelabs/rfcs#72:
[Image: diagram of the previous algorithm]

We only track backfill_offset, not upstream_offset, and we finish backfill only when upstream_offset > backfill_offset. So if no data arrives from upstream, backfill can never finish.

This wasn't a problem until we wanted blocking DDL on source (#15587). The DDL would be blocked until new messages arrive, possibly forever, which would confuse users.

Solution

Since the message stream is unbounded, we have to do some kind of "compare progress" check between the backfill stream and the upstream stream. But we can improve it by checking on the backfill side instead of the upstream side, because the backfill side is more controllable in the SourceBackfillExecutor.

  1. We add a new in-memory state target_offset, which is updated to upstream_offset whenever an upstream message arrives. Then we can compare progress and finish backfill on the backfill side. (To be more precise, we can only enter the SourceCatchingUp stage and wait for upstream_offset to catch up with backfill_offset.) With this change alone, we still cannot finish backfill when there is no upstream message, but backfill may end slightly faster.
  2. Initialize target_offset to the latest available offset instead of None (in Kafka this is high_watermark-1). With this, we can finish backfill even if there is no message from upstream. Note that this relies on the connector being able to fetch latest_offset.
  3. One case is still not covered: there is no historical data to backfill at all. We still cannot finish backfill in this case. To handle it, we need the connector to provide has_message_available, and we check it before starting backfill.
  4. Perhaps we can also use has_message_available as an exit condition for backfill, but that may require larger changes in the source reader.

Diagram of new algorithm:
(Note: this applies to each split. After #18300, we no longer need the special hack of waiting for all splits to finish before finishing backfill, and can treat all splits equally.)

stateDiagram-v2
    state Init <<choice>>
    [*] --> hasMessageAvailable
    hasMessageAvailable --> Init
    Init --> Backfilling: Yes (<i>Or Unknown</i>)<br/>target_offset #colone; fetchLatestOffset() (<i>Or None</i>)
    Init --> Finished: No
    state Backfilling {
        [*] --> backfilling
        state bf_choice <<choice>>
        backfilling --> bf_choice: <b>backfill message</b><br/>update backfill_offset
        bf_choice --> [*]:  <b>if</b> backfill_offset >= target_offset<br/><b>close backfill stream</b>
        bf_choice --> backfilling : <b>if</b> backfill_offset < target_offset<br/>

        backfilling --> bf_choice: <b>upstream message</b><br/>target_offset #colone; upstream_offset
    }
    Backfilling --> SourceCatchingUp
    state SourceCatchingUp {
        [*] --> source_catching_up
        state choice <<choice>>
        source_catching_up --> source_catching_up: <b>backfill message</b><br/>ignore
        source_catching_up --> choice: <b>upstream message</b> 
        choice --> source_catching_up:   <b>if</b> upstream_offset < backfill_offset<br/>
        choice --> [*]:   <b>if</b> upstream_offset >= backfill_offset <br/>
    }
    SourceCatchingUp --> Finished
    Finished --> ForwardingUpstream
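
The per-split transitions in the diagram can be sketched as a small state machine. This is only an illustrative sketch, not the actual SourceBackfillExecutor code: the names Stage, SplitBackfill, on_backfill_message and on_upstream_message are hypothetical, offsets are assumed to be comparable i64 values (as in Kafka), and emitting or dropping messages is not modeled.

```rust
/// Stages from the state diagram. After `Finished`, upstream is simply forwarded.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Stage {
    Backfilling,
    SourceCatchingUp,
    Finished,
}

/// Hypothetical per-split state (illustrative names, i64 offsets assumed).
#[derive(Debug)]
pub struct SplitBackfill {
    pub stage: Stage,
    /// Offset of the last message read from the backfill stream.
    pub backfill_offset: Option<i64>,
    /// Offset that backfill has to reach; `None` if it is not known yet.
    pub target_offset: Option<i64>,
}

impl SplitBackfill {
    /// `has_message_available`: `Some(false)` means there is nothing to backfill;
    /// `Some(true)` or `None` (unknown) starts backfilling.
    pub fn new(has_message_available: Option<bool>, latest_offset: Option<i64>) -> Self {
        let stage = match has_message_available {
            Some(false) => Stage::Finished,
            _ => Stage::Backfilling,
        };
        Self { stage, backfill_offset: None, target_offset: latest_offset }
    }

    /// A message from the backfill stream; ignored outside the Backfilling stage.
    pub fn on_backfill_message(&mut self, offset: i64) {
        if self.stage == Stage::Backfilling {
            self.backfill_offset = Some(offset);
            self.maybe_close_backfill();
        }
    }

    /// A message from the upstream SourceExecutor.
    pub fn on_upstream_message(&mut self, upstream_offset: i64) {
        match self.stage {
            Stage::Backfilling => {
                // target_offset := upstream_offset
                self.target_offset = Some(upstream_offset);
                self.maybe_close_backfill();
            }
            Stage::SourceCatchingUp => {
                if let Some(backfill_offset) = self.backfill_offset {
                    if upstream_offset >= backfill_offset {
                        self.stage = Stage::Finished;
                    }
                }
            }
            Stage::Finished => {}
        }
    }

    /// Close the backfill stream once backfill_offset >= target_offset.
    fn maybe_close_backfill(&mut self) {
        if let (Some(b), Some(t)) = (self.backfill_offset, self.target_offset) {
            if b >= t {
                self.stage = Stage::SourceCatchingUp;
            }
        }
    }
}
```

With target_offset initialized from the latest offset, the split leaves Backfilling as soon as the backfill stream reaches that offset, without waiting for any upstream message. With target_offset left as None, it stays in Backfilling until the first upstream message sets a target.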

Connector APIs

I investigated some connectors' API docs to see whether they can provide latest_offset and has_message_available.

Note: it might be tempting to just use poll to check has_message_available, but not all connector APIs return something like NoMoreMessage from poll. In that case, we cannot distinguish a timeout from no more messages.

  • Kafka
    • latest_offset: high_watermark-1
    • has_message_available: high_watermark>low_watermark
  • Kinesis
    • Supports neither latest_offset nor has_message_available directly.
      • DescribeStream returns EndingSequenceNumber only for closed shards (after a shard merge/split), not for open ones.
    • However, GetRecords returns MillisBehindLatest. "A value of zero indicates that record processing is caught up, and there are no new records to process at this moment." So we can use this API as a workaround (we will fetch and drop data). Note that
  • Pulsar
    • supports getLastMessageIds
    • supports hasMessageAvailable. The docs explicitly mention that "This check can be used by an application to scan through a topic and stop when the reader reaches the current last published message."
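
For the Kafka case, both values follow directly from the partition watermarks. The following is a minimal sketch under the rules listed above; the Watermarks struct is hypothetical, and returning None for an empty partition is an illustrative choice rather than something the issue specifies.

```rust
/// Hypothetical view of one Kafka partition's watermarks (illustrative type).
#[derive(Debug, Clone, Copy)]
pub struct Watermarks {
    /// Offset of the earliest message still retained.
    pub low: i64,
    /// Offset that the next produced message will get.
    pub high: i64,
}

impl Watermarks {
    /// has_message_available: high_watermark > low_watermark.
    pub fn has_message_available(&self) -> bool {
        self.high > self.low
    }

    /// latest_offset: high_watermark - 1 (inclusive).
    /// Returns `None` when the partition holds no messages (an assumption of this sketch).
    pub fn latest_offset(&self) -> Option<i64> {
        if self.has_message_available() {
            Some(self.high - 1)
        } else {
            None
        }
    }
}
```

Note that low == high covers both a partition that never received data and one whose data has all been removed, so has_message_available is false in both cases.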

Note: another possibly useful ability is "fetch the last 1/N messages". But this is subtle: we need to make sure the read is "inclusive", i.e. that it includes the latest message.

  • Kafka: supports OffsetTail(n) natively (it becomes -n in the protocol), and Kafka offsets are consecutive.
  • Pulsar: supports startMessageId(MessageId.latest).startMessageIdInclusive() to read the last message, but not the last N.
  • Kinesis: does not seem to be supported.
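
To make the "inclusive" point concrete for Kafka, here is a small arithmetic sketch of which offsets "the last N messages" covers. It assumes consecutive offsets, as stated above. The function name last_n_offsets is hypothetical, and clamping to the low watermark is an assumption of this sketch, not documented OffsetTail behavior.

```rust
use std::ops::RangeInclusive;

/// Offsets of the last `n` messages of a partition, both ends inclusive.
/// `low` and `high` are the partition watermarks; offsets are assumed consecutive.
pub fn last_n_offsets(low: i64, high: i64, n: i64) -> Option<RangeInclusive<i64>> {
    if n <= 0 || high <= low {
        // Nothing requested, or the partition holds no messages.
        return None;
    }
    // Start n messages before the end, but never before the earliest retained offset.
    let start = (high - n).max(low);
    // The last existing message is at high - 1, and it must be part of the range.
    Some(start..=high - 1)
}
```

For example, with low = 0 and high = 10, the last message is offset 9, and the last three messages are offsets 7 through 9.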

xxchan commented Sep 18, 2024

We've implemented 1-3 for Kafka with:

/// Information used to determine whether we should start and finish source backfill.
///
/// XXX: if a connector cannot provide the latest offsets (but we want to make it shareable),
/// perhaps we should ban blocking DDL for it.
#[derive(Debug, Clone)]
pub enum BackfillInfo {
    HasDataToBackfill {
        /// The last available offsets for each split (**inclusive**).
        ///
        /// This will be used to determine whether source backfill is finished when
        /// there are no _new_ messages coming from upstream `SourceExecutor`. Otherwise,
        /// blocking DDL cannot finish until new messages come.
        ///
        /// When there are upstream messages, we will use the latest offsets from the upstream.
        latest_offset: String,
    },
    /// If there are no messages in the split at all, we don't need to start backfill.
    /// In this case, there will be no message from the backfill stream either.
    /// If we started backfill, we could not finish it until new messages come.
    /// So we mark this as a special case for optimization.
    NoDataToBackfill,
}
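
As a rough illustration of how such a value might be consumed, the sketch below checks whether backfill for one split has reached the inclusive latest_offset. The helper backfill_done is hypothetical and not part of the actual implementation. It assumes Kafka-style offsets, so the String parses as an i64, and it repeats the enum without doc comments only to keep the example self-contained.

```rust
use std::num::ParseIntError;

#[derive(Debug, Clone)]
pub enum BackfillInfo {
    HasDataToBackfill { latest_offset: String },
    NoDataToBackfill,
}

/// Hypothetical helper: has backfill reached the last available offset?
/// `backfill_offset` is the offset of the last backfilled message, if any.
pub fn backfill_done(
    info: &BackfillInfo,
    backfill_offset: Option<i64>,
) -> Result<bool, ParseIntError> {
    match info {
        // Nothing to read: there is no need to start backfill at all.
        BackfillInfo::NoDataToBackfill => Ok(true),
        BackfillInfo::HasDataToBackfill { latest_offset } => {
            // Assumption: the offset string is a Kafka-style integer.
            let latest: i64 = latest_offset.parse()?;
            // `latest_offset` is inclusive, so reaching it is enough.
            Ok(backfill_offset.map_or(false, |b| b >= latest))
        }
    }
}
```

Because latest_offset is a String, the comparison is connector-specific. Other connectors would need their own way to order offsets.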
