refactor(stream executor): adopt the ExecutorInner + ExecutionVars fashion in all stream executors #8882

Closed
stdrc opened this issue Mar 30, 2023 · 2 comments

stdrc commented Mar 30, 2023

In HashAggExecutor we started writing the executor in a new style:

pub struct XxxExecutor {
    input: BoxedExecutor,
    inner: ExecutorInner, // everything other than the input executors lives in `inner`
}

struct ExecutorInner {
    actor_ctx: ActorContextRef,
    info: ExecutorInfo,

    // ...Other executor arguments passed in via `XxxExecutor::new`.
    // These are expected to be immutable except for state tables.
}

struct ExecutionVars {
    xxx_cache: ExecutorCache,
    xxx_builder: ChunkBuilder,
    buffered_xxx: Foo,
    // etc

    // These are mutable variables that are modified at runtime
    // when handling incoming chunks.
}

// just like before
impl Executor for XxxExecutor {
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.execute_inner().boxed()
    }

    fn schema(&self) -> &Schema {
        &self.inner.info.schema
    }

    fn pk_indices(&self) -> PkIndicesRef<'_> {
        &self.inner.info.pk_indices
    }

    fn identity(&self) -> &str {
        &self.inner.info.identity
    }
}

impl XxxExecutor {
    pub fn new(args: XxxExecutorArgs) -> StreamResult<Self> {
        Ok(Self {
            input: args.input,
            inner: ExecutorInner {
                // ...
            }
        })
    }

    async fn handle_chunk(
        this: &mut ExecutorInner,
        vars: &mut ExecutionVars,
        chunk: StreamChunk
    ) -> StreamExecutorResult<()> {
        // blahblah
    }

    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn execute_inner(self) {
        let XxxExecutor {
            input,
            inner: mut this,
        } = self;

        let mut input = input.execute();
        let barrier = expect_first_barrier(&mut input).await?;
        // ...Initialize epoch for state tables.
        yield Message::Barrier(barrier);

        // Create execution variables according to `this`.
        let mut vars = ExecutionVars {
            // ...
        };

        #[for_await]
        for msg in input {
            match msg? {
                Message::Chunk(chunk) => {
                    Self::handle_chunk(&mut this, &mut vars, chunk).await?;
                }
                // ...
            }
        }
    }
}

I think this style can be adopted by the other stream executors as well, to improve code readability and extensibility.
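To make the split concrete outside of the streaming runtime, here is a minimal synchronous sketch. Everything in it is a hypothetical stand-in, not RisingWave code: `SumExecutor`, the simplified `Message` enum, the `chunk_size` argument, and a plain boxed iterator in place of `BoxedExecutor`. It only illustrates the shape: arguments fixed at construction go in `ExecutorInner`, runtime state goes in `ExecutionVars`, and `self` is destructured so that the input, `this`, and `vars` can be borrowed independently.

```rust
// Hypothetical, simplified stand-in for the real stream message type.
enum Message {
    Chunk(Vec<i64>),
    Barrier,
}

// Stand-in for `BoxedExecutor`: any boxed iterator of messages.
type BoxedInput = Box<dyn Iterator<Item = Message>>;

pub struct SumExecutor {
    input: BoxedInput,
    inner: ExecutorInner, // everything other than the input
}

// Arguments passed in via `new`; not modified while executing.
struct ExecutorInner {
    identity: String,
    chunk_size: usize,
}

// Mutable variables created when execution starts.
struct ExecutionVars {
    running_sum: i64,
    buffered: Vec<i64>,
}

impl SumExecutor {
    fn new(input: BoxedInput, identity: &str, chunk_size: usize) -> Self {
        assert!(chunk_size > 0, "chunk_size must be positive");
        Self {
            input,
            inner: ExecutorInner {
                identity: identity.to_string(),
                chunk_size,
            },
        }
    }

    fn identity(&self) -> &str {
        &self.inner.identity
    }

    // Associated function rather than a method: it never touches `input`,
    // so the caller can keep iterating over the input while calling it.
    fn handle_chunk(_this: &ExecutorInner, vars: &mut ExecutionVars, chunk: Vec<i64>) {
        for value in chunk {
            vars.running_sum += value;
            vars.buffered.push(vars.running_sum);
        }
    }

    // Emits the buffered running sums on every barrier, split into
    // output chunks of at most `chunk_size` rows.
    fn execute(self) -> Vec<Vec<i64>> {
        let Self { input, inner: this } = self;

        // Create execution variables once `this` is available.
        let mut vars = ExecutionVars {
            running_sum: 0,
            buffered: Vec::new(),
        };

        let mut output = Vec::new();
        for msg in input {
            match msg {
                Message::Chunk(chunk) => Self::handle_chunk(&this, &mut vars, chunk),
                Message::Barrier => {
                    for rows in vars.buffered.chunks(this.chunk_size) {
                        output.push(rows.to_vec());
                    }
                    vars.buffered.clear();
                }
            }
        }
        output
    }
}
```

Because `handle_chunk` takes `this` and `vars` as explicit parameters instead of `&mut self`, the loop can consume `input` by value while still mutating the runtime state, which is the same reason `execute_inner` above destructures `self` first.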

@github-actions github-actions bot added this to the release-0.19 milestone Mar 30, 2023
@fuyufjh fuyufjh removed this from the release-0.19 milestone Apr 4, 2023

github-actions bot commented Jun 4, 2023

This issue has been open for 60 days with no activity. Could you please update the status? Feel free to continue discussion or close as not planned.

stdrc commented Nov 4, 2024

Seems not important any more. Closing this.

@stdrc closed this as not planned on Nov 4, 2024