Allow for non-opid-emitting sources to be restartable. #2474

Closed
wants to merge 1 commit into from
Conversation

Jesse-Bakker
Contributor

This is useful for the aerospike source, because it uses XDR to track
source offsets.

Additionally, the oracle sink can now be append-only and it does not
need additional metadata columns in that case. It will also not create a
metadata table if the source does not emit OpIdentifiers.

@@ -56,14 +56,20 @@ pub trait SourceFactory: Send + Sync + Debug {
) -> Result<Box<dyn Source>, BoxedError>;
}

pub struct SourceMessage {
pub id: usize,

I kind of get that id is a unique id that's only in memory and never persisted, but I think a comment here would be helpful.

fn get_latest_op_id(&mut self) -> Result<Option<OpIdentifier>, BoxedError>;
fn set_source_state_data(&mut self, source_state: &[u8]) -> Result<(), BoxedError>;
fn get_source_state_data(&mut self) -> Result<Option<Vec<u8>>, BoxedError>;
fn get_source_state(&mut self) -> Result<SourceState, BoxedError>;

Maybe rename SourceState to OperationState? Now the names are getting really confusing.

@@ -66,11 +67,17 @@ impl Epoch {
common_info: EpochCommonInfo { id, source_states },
decision_instant,
source_time: None,
originating_msg_id: None,

First of all, I found a bug in the previous code. In #2335 I said Commit "is generated when all sources happen to be at a transaction boundary", but apparently this commit generation doesn't have the "all sources are on a boundary" check.

This might have misled you into believing that every source's TransactionInfo::Commit message results in an ExecutorOperation::Commit, hence this field recording the original message id.

I think I should fix the problem and move originating_msg_id into common_info.source_states once this is merged.
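
To make the missing check concrete, here is a rough sketch of what "commit only when all sources are on a transaction boundary" could look like. Everything in it (SourceEvent, BoundaryTracker, observe) is a hypothetical, simplified stand-in and not the actual types or logic in this repository; it also assumes a source that has not emitted anything yet counts as being on a boundary.

```rust
use std::collections::HashMap;

/// Hypothetical, simplified stand-in for what a source reports.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum SourceEvent {
    /// The source emitted an operation inside a transaction.
    Operation,
    /// The source finished its current transaction.
    Commit,
}

/// Tracks, per source id, whether that source is currently on a
/// transaction boundary. Hypothetical type, for illustration only.
struct BoundaryTracker {
    at_boundary: HashMap<usize, bool>,
}

impl BoundaryTracker {
    /// Assumption: a source that has not emitted anything is on a boundary.
    fn new(source_ids: &[usize]) -> Self {
        let at_boundary = source_ids.iter().map(|&id| (id, true)).collect();
        Self { at_boundary }
    }

    /// Records the event and returns true only if a global commit may be
    /// generated: the event is a commit and every source is on a boundary.
    fn observe(&mut self, source_id: usize, event: SourceEvent) -> bool {
        let is_commit = event == SourceEvent::Commit;
        self.at_boundary.insert(source_id, is_commit);
        is_commit && self.at_boundary.values().all(|&on_boundary| on_boundary)
    }
}
```

With two sources, a commit from source 1 while source 0 is mid-transaction would not produce a global commit; only the later commit from source 0 would. That is why a single originating message id per commit does not map cleanly onto this model.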

@@ -104,23 +104,61 @@ impl OpIdentifier {
}
}

#[derive(Debug, Clone)]
pub enum ResumeState {

Maybe it's my oversight, but I didn't find where this is used.

// Ignore the error, because the server can be down.
for message in message.messages {
let _ = ingestor.handle_message(message).await;
let _ = ingestor.handle_message(message).await.unwrap();

If we unwrap, we don't need the let _ = binding, and the comment above should be removed.
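
For reference, a small synchronous sketch of the two behaviors being discussed. The Ingestor type and its handle_message here are hypothetical stand-ins (the real method in the diff is async and takes a different message type); the point is only that unwrap() already consumes the Result, so the let _ = binding adds nothing, and that the "ignore the error" comment no longer describes the code.

```rust
/// Hypothetical stand-in for the ingestor in the diff above.
struct Ingestor {
    handled: Vec<u32>,
}

impl Ingestor {
    /// Pretend that message 0 fails, e.g. because the server is down.
    fn handle_message(&mut self, message: u32) -> Result<(), String> {
        if message == 0 {
            return Err("server is down".to_string());
        }
        self.handled.push(message);
        Ok(())
    }
}

/// Old behavior: errors are deliberately dropped, so `let _ =` is needed
/// to silence the unused-Result warning.
fn forward_ignoring_errors(ingestor: &mut Ingestor, messages: Vec<u32>) {
    for message in messages {
        // Ignore the error, because the server can be down.
        let _ = ingestor.handle_message(message);
    }
}

/// New behavior: panic on the first error. `unwrap()` consumes the Result
/// and yields `()`, so no `let _ =` binding is required.
fn forward_or_panic(ingestor: &mut Ingestor, messages: Vec<u32>) {
    for message in messages {
        ingestor.handle_message(message).unwrap();
    }
}
```

Whether panicking is the right choice here is a separate question; if the server being down is expected, logging the error might be a middle ground.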

@Jesse-Bakker Jesse-Bakker closed this by deleting the head repository Apr 15, 2024

2 participants