Skip to content

Commit

Permalink
Consolidate Example: dataframe_output.rs into dataframe.rs (apache#13877
Browse files Browse the repository at this point in the history
)
  • Loading branch information
zhuqi-lucas committed Dec 23, 2024
1 parent b0185b2 commit d1c8e0b
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 80 deletions.
3 changes: 1 addition & 2 deletions datafusion-examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,7 @@ cargo run --example dataframe
- [`custom_datasource.rs`](examples/custom_datasource.rs): Run queries against a custom datasource (TableProvider)
- [`custom_file_format.rs`](examples/custom_file_format.rs): Write data to a custom file format
- [`dataframe-to-s3.rs`](examples/external_dependency/dataframe-to-s3.rs): Run a query using a DataFrame against a parquet file from s3 and writing back to s3
- [`dataframe.rs`](examples/dataframe.rs): Run a query using a DataFrame API against parquet files, csv files, and in-memory data
- [`dataframe_output.rs`](examples/dataframe_output.rs): Examples of methods which write data out from a DataFrame
- [`dataframe.rs`](examples/dataframe.rs): Run a query using a DataFrame API against parquet files, csv files, and in-memory data. Also demonstrates the various methods to write out a DataFrame to a table, parquet file, csv file, and json file.
- [`deserialize_to_struct.rs`](examples/deserialize_to_struct.rs): Convert query results into rust structs using serde
- [`expr_api.rs`](examples/expr_api.rs): Create, execute, simplify, analyze and coerce `Expr`s
- [`file_stream_provider.rs`](examples/file_stream_provider.rs): Run a query on `FileStreamProvider` which implements `StreamProvider` for reading and writing to arbitrary stream sources / sinks.
Expand Down
67 changes: 67 additions & 0 deletions datafusion-examples/examples/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,12 @@

use arrow::array::{ArrayRef, Int32Array, RecordBatch, StringArray};
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::dataframe::DataFrameWriteOptions;
use datafusion::error::Result;
use datafusion::prelude::*;
use datafusion_common::config::CsvOptions;
use datafusion_common::parsers::CompressionTypeVariant;
use datafusion_common::DataFusionError;
use std::fs::File;
use std::io::Write;
use std::sync::Arc;
Expand All @@ -29,13 +33,19 @@ use tempfile::tempdir;
/// * [read_parquet]: execute queries against parquet files
/// * [read_csv]: execute queries against csv files
/// * [read_memory]: execute queries against in-memory arrow data
///
/// This example also demonstrates the various methods to write out a DataFrame to local storage.
/// See datafusion-examples/examples/external_dependency/dataframe-to-s3.rs for an example
/// using a remote object store.
/// * [write_out]: write out a DataFrame to a table, parquet file, csv file, or json file
#[tokio::main]
async fn main() -> Result<()> {
    // One SessionContext (DataFusion's main high-level entry point) is created
    // here and handed by reference to every example below.
    let session = SessionContext::new();

    // Examples that read data and query it.
    read_parquet(&session).await?;
    read_csv(&session).await?;
    read_memory(&session).await?;

    // Example that writes a DataFrame back out; its outcome is the outcome of `main`.
    write_out(&session).await
}

Expand Down Expand Up @@ -139,3 +149,60 @@ async fn read_memory(ctx: &SessionContext) -> Result<()> {

Ok(())
}

/// Use the DataFrame API to:
/// 1. Write out a DataFrame to a table
/// 2. Write out a DataFrame to a parquet file
/// 3. Write out a DataFrame to a csv file
/// 4. Write out a DataFrame to a json file
///
/// Every output location is a path relative to the current working directory
/// (all of them live under `./datafusion-examples/`).
///
/// # Errors
///
/// Returns the first [`DataFusionError`] raised while planning the source query,
/// renaming its column, creating the external table, or performing any of the
/// four writes. No step panics on failure.
async fn write_out(ctx: &SessionContext) -> std::result::Result<(), DataFusionError> {
    let df = ctx
        .sql("values ('a'), ('b'), ('c')")
        .await?
        // Ensure the column names and types match the target table
        .with_column_renamed("column1", "tablecol1")?;

    // Register the external parquet table that `write_table` targets below.
    ctx.sql(
        "create external table
test(tablecol1 varchar)
stored as parquet
location './datafusion-examples/test_table/'",
    )
    .await?
    .collect()
    .await?;

    // This is equivalent to INSERT INTO test VALUES ('a'), ('b'), ('c').
    // The behavior of write_table depends on the TableProvider's implementation
    // of the insert_into method.
    // Each write consumes the DataFrame it is called on, hence the clones.
    df.clone()
        .write_table("test", DataFrameWriteOptions::new())
        .await?;

    df.clone()
        .write_parquet(
            "./datafusion-examples/test_parquet/",
            DataFrameWriteOptions::new(),
            // No format-specific writer options are supplied.
            None,
        )
        .await?;

    df.clone()
        .write_csv(
            "./datafusion-examples/test_csv/",
            // DataFrameWriteOptions contains options which control how data is written
            DataFrameWriteOptions::new(),
            // Format-specific settings, such as the compression codec, are passed
            // separately through CsvOptions.
            Some(CsvOptions::default().with_compression(CompressionTypeVariant::GZIP)),
        )
        .await?;

    // Last use of `df`, so it can be moved instead of cloned.
    df.write_json(
        "./datafusion-examples/test_json/",
        DataFrameWriteOptions::new(),
        // No format-specific writer options are supplied.
        None,
    )
    .await?;

    Ok(())
}
78 changes: 0 additions & 78 deletions datafusion-examples/examples/dataframe_output.rs

This file was deleted.

0 comments on commit d1c8e0b

Please sign in to comment.