Skip to content

Commit

Permalink
Minor: Add implementation examples to ExecutionPlan::execute (apache#…
Browse files Browse the repository at this point in the history
…8013)

* Add implementation examples to ExecutionPlan::execute

* Review feedback
  • Loading branch information
tustvold authored Nov 1, 2023
1 parent 94dac76 commit 35e8e33
Showing 1 changed file with 104 additions and 0 deletions.
104 changes: 104 additions & 0 deletions datafusion/physical-plan/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,110 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
}

/// Begin execution of `partition`, returning a stream of [`RecordBatch`]es.
///
/// # Implementation Examples
///
/// ## Return Precomputed Batch
///
/// We can return a precomputed batch as a stream
///
/// ```
/// # use std::sync::Arc;
/// # use arrow_array::RecordBatch;
/// # use arrow_schema::SchemaRef;
/// # use datafusion_common::Result;
/// # use datafusion_execution::{SendableRecordBatchStream, TaskContext};
/// # use datafusion_physical_plan::memory::MemoryStream;
/// # use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
/// struct MyPlan {
/// batch: RecordBatch,
/// }
///
/// impl MyPlan {
/// fn execute(
/// &self,
/// partition: usize,
/// context: Arc<TaskContext>
/// ) -> Result<SendableRecordBatchStream> {
/// let fut = futures::future::ready(Ok(self.batch.clone()));
/// let stream = futures::stream::once(fut);
/// Ok(Box::pin(RecordBatchStreamAdapter::new(self.batch.schema(), stream)))
/// }
/// }
/// ```
///
/// ## Async Compute Batch
///
/// We can also lazily compute a RecordBatch when the returned stream is polled
///
/// ```
/// # use std::sync::Arc;
/// # use arrow_array::RecordBatch;
/// # use arrow_schema::SchemaRef;
/// # use datafusion_common::Result;
/// # use datafusion_execution::{SendableRecordBatchStream, TaskContext};
/// # use datafusion_physical_plan::memory::MemoryStream;
/// # use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
/// struct MyPlan {
/// schema: SchemaRef,
/// }
///
/// async fn get_batch() -> Result<RecordBatch> {
/// todo!()
/// }
///
/// impl MyPlan {
/// fn execute(
/// &self,
/// partition: usize,
/// context: Arc<TaskContext>
/// ) -> Result<SendableRecordBatchStream> {
/// let fut = get_batch();
/// let stream = futures::stream::once(fut);
/// Ok(Box::pin(RecordBatchStreamAdapter::new(self.schema.clone(), stream)))
/// }
/// }
/// ```
///
/// ## Async Compute Batch Stream
///
/// We can lazily compute a RecordBatch stream when the returned stream is polled
/// flattening the result into a single stream
///
/// ```
/// # use std::sync::Arc;
/// # use arrow_array::RecordBatch;
/// # use arrow_schema::SchemaRef;
/// # use futures::TryStreamExt;
/// # use datafusion_common::Result;
/// # use datafusion_execution::{SendableRecordBatchStream, TaskContext};
/// # use datafusion_physical_plan::memory::MemoryStream;
/// # use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
/// struct MyPlan {
/// schema: SchemaRef,
/// }
///
/// async fn get_batch_stream() -> Result<SendableRecordBatchStream> {
/// todo!()
/// }
///
/// impl MyPlan {
/// fn execute(
/// &self,
/// partition: usize,
/// context: Arc<TaskContext>
/// ) -> Result<SendableRecordBatchStream> {
/// // A future that yields a stream
/// let fut = get_batch_stream();
/// // Use TryStreamExt::try_flatten to flatten the stream of streams
/// let stream = futures::stream::once(fut).try_flatten();
/// Ok(Box::pin(RecordBatchStreamAdapter::new(self.schema.clone(), stream)))
/// }
/// }
/// ```
///
/// See [`futures::stream::StreamExt`] and [`futures::stream::TryStreamExt`] for further
/// combinators that can be used with streams
fn execute(
&self,
partition: usize,
Expand Down

0 comments on commit 35e8e33

Please sign in to comment.