-
Notifications
You must be signed in to change notification settings - Fork 124
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Allow for non-opid-emitting sources to be restartable. #2474
Conversation
This is useful for the aerospike source, because it uses XDR to track source offsets. Additionally, the oracle sink can now be append-only and it does not need additional metadata columns in that case. It will also not create a metadata table if the source does not emit OpIdentifiers.
@@ -56,14 +56,20 @@ pub trait SourceFactory: Send + Sync + Debug { | |||
) -> Result<Box<dyn Source>, BoxedError>; | |||
} | |||
|
|||
pub struct SourceMessage { | |||
pub id: usize, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I kind of get that id
is a unique id that's only in memory and never persisted, but I think a comment here would be helpful.
fn get_latest_op_id(&mut self) -> Result<Option<OpIdentifier>, BoxedError>; | ||
fn set_source_state_data(&mut self, source_state: &[u8]) -> Result<(), BoxedError>; | ||
fn get_source_state_data(&mut self) -> Result<Option<Vec<u8>>, BoxedError>; | ||
fn get_source_state(&mut self) -> Result<SourceState, BoxedError>; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe rename SourceState
to OperationState
? Now the names are getting really confusing.
@@ -66,11 +67,17 @@ impl Epoch { | |||
common_info: EpochCommonInfo { id, source_states }, | |||
decision_instant, | |||
source_time: None, | |||
originating_msg_id: None, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
First of all I found a bug in previous code. In #2335 I said Commit
"is generated when all sources happen to be at a transaction boundary", but apparently this commit generation doesn't have the "all sources be on boundary" check.
This might have misled you to believe that, every source's TransactionInfo::Commit
message results in a ExecutorOperation::Commit
, hence adding this field to record the original message id.
I think I should fix the problem and move originating_msg_id
as part of common_info.source_states
once this is merged.
@@ -104,23 +104,61 @@ impl OpIdentifier { | |||
} | |||
} | |||
|
|||
#[derive(Debug, Clone)] | |||
pub enum ResumeState { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
May be my oversight but I didn't find where this is used.
// Ignore the error, because the server can be down. | ||
for message in message.messages { | ||
let _ = ingestor.handle_message(message).await; | ||
let _ = ingestor.handle_message(message).await.unwrap(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we unwrap
, we don't have to write let _ =
. And the comment above should be removed.
This is useful for the aerospike source, because it uses XDR to track
source offsets.
Additionally, the oracle sink can now be append-only and it does not
need additional metadata columns in that case. It will also not create a
metadata table if the source does not emit OpIdentifiers.