Merge commit 'd4228feca341cd707a3a26372cae71a94a93b4fd' into chunchun/update-df-june-week-2-2
appletreeisyellow committed Jun 24, 2024
2 parents 0165366 + d4228fe commit 2bf07c9
Showing 98 changed files with 2,033 additions and 1,530 deletions.
3 changes: 2 additions & 1 deletion benchmarks/.gitignore
@@ -1,2 +1,3 @@
data
results
results
venv
24 changes: 18 additions & 6 deletions benchmarks/bench.sh
@@ -37,6 +37,7 @@ DATA_DIR=${DATA_DIR:-$SCRIPT_DIR/data}
#CARGO_COMMAND=${CARGO_COMMAND:-"cargo run --release"}
CARGO_COMMAND=${CARGO_COMMAND:-"cargo run --profile release-nonlto"} # for faster iterations
PREFER_HASH_JOIN=${PREFER_HASH_JOIN:-true}
VIRTUAL_ENV=${VIRTUAL_ENV:-$SCRIPT_DIR/venv}

usage() {
echo "
@@ -46,6 +47,7 @@ Usage:
$0 data [benchmark]
$0 run [benchmark]
$0 compare <branch1> <branch2>
$0 venv
**********
Examples:
@@ -62,6 +64,7 @@ DATAFUSION_DIR=/source/datafusion ./bench.sh run tpch
data: Generates or downloads data needed for benchmarking
run: Runs the named benchmark
compare: Compares results from benchmark runs
venv: Creates a new venv (unless one already exists) and installs compare's requirements into it
**********
* Benchmarks
@@ -84,7 +87,8 @@ DATA_DIR directory to store datasets
CARGO_COMMAND command that runs the benchmark binary
DATAFUSION_DIR directory to use (default $DATAFUSION_DIR)
RESULTS_NAME folder where the benchmark files are stored
PREFER_HASH_JOIN Prefer hash join algorithm(default true)
PREFER_HASH_JOIN Prefer hash join algorithm (default true)
VIRTUAL_ENV Python venv to use for compare and venv commands (default ./venv; override by activating <your-venv>/bin/activate)
"
exit 1
}
@@ -243,6 +247,9 @@ main() {
compare)
compare_benchmarks "$ARG2" "$ARG3"
;;
venv)
setup_venv
;;
"")
usage
;;
@@ -302,7 +309,7 @@ data_tpch() {
else
echo " creating parquet files using benchmark binary ..."
pushd "${SCRIPT_DIR}" > /dev/null
$CARGO_COMMAND --bin tpch -- convert --input "${TPCH_DIR}" --prefer_hash_join ${PREFER_HASH_JOIN} --output "${TPCH_DIR}" --format parquet
$CARGO_COMMAND --bin tpch -- convert --input "${TPCH_DIR}" --output "${TPCH_DIR}" --format parquet
popd > /dev/null
fi
}
@@ -405,23 +412,23 @@ run_clickbench_1() {
RESULTS_FILE="${RESULTS_DIR}/clickbench_1.json"
echo "RESULTS_FILE: ${RESULTS_FILE}"
echo "Running clickbench (1 file) benchmark..."
$CARGO_COMMAND --bin dfbench -- clickbench --iterations 5 --path "${DATA_DIR}/hits.parquet" --prefer_hash_join ${PREFER_HASH_JOIN} --queries-path "${SCRIPT_DIR}/queries/clickbench/queries.sql" -o ${RESULTS_FILE}
$CARGO_COMMAND --bin dfbench -- clickbench --iterations 5 --path "${DATA_DIR}/hits.parquet" --queries-path "${SCRIPT_DIR}/queries/clickbench/queries.sql" -o ${RESULTS_FILE}
}

# Runs the clickbench benchmark with the partitioned parquet files
run_clickbench_partitioned() {
RESULTS_FILE="${RESULTS_DIR}/clickbench_partitioned.json"
echo "RESULTS_FILE: ${RESULTS_FILE}"
echo "Running clickbench (partitioned, 100 files) benchmark..."
$CARGO_COMMAND --bin dfbench -- clickbench --iterations 5 --path "${DATA_DIR}/hits_partitioned" --prefer_hash_join ${PREFER_HASH_JOIN} --queries-path "${SCRIPT_DIR}/queries/clickbench/queries.sql" -o ${RESULTS_FILE}
$CARGO_COMMAND --bin dfbench -- clickbench --iterations 5 --path "${DATA_DIR}/hits_partitioned" --queries-path "${SCRIPT_DIR}/queries/clickbench/queries.sql" -o ${RESULTS_FILE}
}

# Runs the clickbench "extended" benchmark with a single large parquet file
run_clickbench_extended() {
RESULTS_FILE="${RESULTS_DIR}/clickbench_extended.json"
echo "RESULTS_FILE: ${RESULTS_FILE}"
echo "Running clickbench (1 file) extended benchmark..."
$CARGO_COMMAND --bin dfbench -- clickbench --iterations 5 --path "${DATA_DIR}/hits.parquet" --prefer_hash_join ${PREFER_HASH_JOIN} --queries-path "${SCRIPT_DIR}/queries/clickbench/extended.sql" -o ${RESULTS_FILE}
$CARGO_COMMAND --bin dfbench -- clickbench --iterations 5 --path "${DATA_DIR}/hits.parquet" --queries-path "${SCRIPT_DIR}/queries/clickbench/extended.sql" -o ${RESULTS_FILE}
}

compare_benchmarks() {
@@ -448,13 +455,18 @@ compare_benchmarks() {
echo "--------------------"
echo "Benchmark ${bench}"
echo "--------------------"
python3 "${SCRIPT_DIR}"/compare.py "${RESULTS_FILE1}" "${RESULTS_FILE2}"
PATH=$VIRTUAL_ENV/bin:$PATH python3 "${SCRIPT_DIR}"/compare.py "${RESULTS_FILE1}" "${RESULTS_FILE2}"
else
echo "Note: Skipping ${RESULTS_FILE1} as ${RESULTS_FILE2} does not exist"
fi
done

}

setup_venv() {
python3 -m venv $VIRTUAL_ENV
PATH=$VIRTUAL_ENV/bin:$PATH python3 -m pip install -r requirements.txt
}

# And start the process up
main
2 changes: 1 addition & 1 deletion benchmarks/compare.py
@@ -29,7 +29,7 @@
from rich.console import Console
from rich.table import Table
except ImportError:
print("Try `pip install rich` for using this script.")
print("Couldn't import modules -- run `./bench.sh venv` first")
raise


18 changes: 18 additions & 0 deletions benchmarks/requirements.txt
@@ -0,0 +1,18 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

rich
2 changes: 1 addition & 1 deletion datafusion-cli/src/main.rs
@@ -133,7 +133,7 @@ struct Args {

#[clap(
long,
help = "The max number of rows to display for 'Table' format\n[default: 40] [possible values: numbers(0/10/...), inf(no limit)]",
help = "The max number of rows to display for 'Table' format\n[possible values: numbers(0/10/...), inf(no limit)]",
default_value = "40"
)]
maxrows: MaxRows,
11 changes: 6 additions & 5 deletions datafusion/core/src/dataframe/mod.rs
@@ -3100,10 +3100,7 @@ mod tests {
let join_schema = physical_plan.schema();

match join_type {
JoinType::Inner
| JoinType::Left
| JoinType::LeftSemi
| JoinType::LeftAnti => {
JoinType::Left | JoinType::LeftSemi | JoinType::LeftAnti => {
let left_exprs: Vec<Arc<dyn PhysicalExpr>> = vec![
Arc::new(Column::new_with_schema("c1", &join_schema)?),
Arc::new(Column::new_with_schema("c2", &join_schema)?),
@@ -3113,7 +3110,10 @@
&Partitioning::Hash(left_exprs, default_partition_count)
);
}
JoinType::Right | JoinType::RightSemi | JoinType::RightAnti => {
JoinType::Inner
| JoinType::Right
| JoinType::RightSemi
| JoinType::RightAnti => {
let right_exprs: Vec<Arc<dyn PhysicalExpr>> = vec![
Arc::new(Column::new_with_schema("c2_c1", &join_schema)?),
Arc::new(Column::new_with_schema("c2_c2", &join_schema)?),
@@ -3133,6 +3133,7 @@

Ok(())
}

#[tokio::test]
async fn nested_explain_should_fail() -> Result<()> {
let ctx = SessionContext::new();
@@ -384,7 +384,7 @@ mod test {
let access_plan = ParquetAccessPlan::new(vec![
RowGroupAccess::Scan,
RowGroupAccess::Selection(
// select / skip all 20 rows in row group 1
// specifies all 20 rows in row group 1
vec![
RowSelector::select(5),
RowSelector::skip(7),
@@ -463,7 +463,7 @@
fn test_invalid_too_few() {
let access_plan = ParquetAccessPlan::new(vec![
RowGroupAccess::Scan,
// select 12 rows, but row group 1 has 20
// specify only 12 rows in selection, but row group 1 has 20
RowGroupAccess::Selection(
vec![RowSelector::select(5), RowSelector::skip(7)].into(),
),
@@ -484,7 +484,7 @@
fn test_invalid_too_many() {
let access_plan = ParquetAccessPlan::new(vec![
RowGroupAccess::Scan,
// select 22 rows, but row group 1 has only 20
// specify 22 rows in selection, but row group 1 has only 20
RowGroupAccess::Selection(
vec![
RowSelector::select(10),
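The invariant these tests exercise is that the selectors in a RowGroupAccess::Selection must account for exactly the number of rows in the row group, whether selected or skipped. Below is a minimal, self-contained sketch of that check: RowSelector here is a simplified stand-in for the parquet crate's type, and validate_selection is a hypothetical helper written for illustration, not DataFusion's actual validation code.

    /// Simplified stand-in for the `parquet` crate's `RowSelector`: for the
    /// row-count invariant only the number of rows matters, not whether the
    /// rows are selected or skipped.
    #[derive(Debug, Clone, Copy)]
    struct RowSelector {
        row_count: usize,
    }

    impl RowSelector {
        fn select(row_count: usize) -> Self {
            Self { row_count }
        }
        fn skip(row_count: usize) -> Self {
            Self { row_count }
        }
    }

    /// Hypothetical helper mirroring the checks the tests above exercise:
    /// the selectors must cover exactly `total_rows` rows of the row group.
    fn validate_selection(selection: &[RowSelector], total_rows: usize) -> Result<(), String> {
        let covered: usize = selection.iter().map(|s| s.row_count).sum();
        if covered == total_rows {
            Ok(())
        } else {
            Err(format!(
                "selection covers {covered} rows, but the row group has {total_rows}"
            ))
        }
    }

    fn main() {
        // 5 selected + 7 skipped + 8 selected = 20 rows: valid for a 20 row row group
        let ok = [RowSelector::select(5), RowSelector::skip(7), RowSelector::select(8)];
        assert!(validate_selection(&ok, 20).is_ok());

        // 5 + 7 = 12 rows: the `test_invalid_too_few` case above
        let too_few = [RowSelector::select(5), RowSelector::skip(7)];
        assert!(validate_selection(&too_few, 20).is_err());
    }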
5 changes: 2 additions & 3 deletions datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -156,9 +156,8 @@ pub use writer::plan_to_parquet;
/// used to implement external indexes on top of parquet files and select only
/// portions of the files.
///
/// The `ParquetExec` will try and further reduce any provided
/// `ParquetAccessPlan` further based on the contents of `ParquetMetadata` and
/// other settings.
/// The `ParquetExec` will try and reduce any provided `ParquetAccessPlan`
/// further based on the contents of `ParquetMetadata` and other settings.
///
/// ## Example of providing a ParquetAccessPlan
///
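The example that follows in the source is collapsed in this view. For illustration, here is a hedged sketch of constructing such a plan, using only names that appear elsewhere in this diff (ParquetAccessPlan::new, RowGroupAccess::Scan, RowGroupAccess::Selection, RowSelector); the use paths are assumptions, not confirmed by this commit.

    // Assumed module paths; adjust to wherever your DataFusion version
    // exports these types.
    use datafusion::datasource::physical_plan::parquet::{ParquetAccessPlan, RowGroupAccess};
    use parquet::arrow::arrow_reader::RowSelector;

    // Scan all of row group 0; in row group 1 (20 rows), skip the first
    // 5 rows, read the next 7, and skip the remaining 8.
    let access_plan = ParquetAccessPlan::new(vec![
        RowGroupAccess::Scan,
        RowGroupAccess::Selection(
            vec![
                RowSelector::skip(5),
                RowSelector::select(7),
                RowSelector::skip(8),
            ]
            .into(),
        ),
    ]);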
@@ -238,6 +238,8 @@ fn create_initial_plan(

// check row group count matches the plan
return Ok(access_plan.clone());
} else {
debug!("ParquetExec Ignoring unknown extension specified for {file_name}");
}
}

11 changes: 9 additions & 2 deletions datafusion/core/src/datasource/physical_plan/parquet/reader.rs
@@ -16,7 +16,7 @@
// under the License.

//! [`ParquetFileReaderFactory`] and [`DefaultParquetFileReaderFactory`] for
//! creating parquet file readers
//! low level control of parquet file readers
use crate::datasource::physical_plan::{FileMeta, ParquetFileMetrics};
use bytes::Bytes;
@@ -33,12 +33,19 @@ use std::sync::Arc;
///
/// The combined implementations of [`ParquetFileReaderFactory`] and
/// [`AsyncFileReader`] can be used to provide custom data access operations
/// such as pre-cached data, I/O coalescing, etc.
/// such as pre-cached metadata, I/O coalescing, etc.
///
/// See [`DefaultParquetFileReaderFactory`] for a simple implementation.
pub trait ParquetFileReaderFactory: Debug + Send + Sync + 'static {
/// Provides an `AsyncFileReader` for reading data from the parquet file specified
///
/// # Notes
///
/// If the resulting [`AsyncFileReader`] returns `ParquetMetaData` without
/// page index information, the reader will load it on demand. Thus it is important
/// to ensure that the returned `ParquetMetaData` has the necessary information
/// if you wish to avoid a subsequent I/O.
///
/// # Arguments
/// * partition_index - Index of the partition (for reporting metrics)
/// * file_meta - The file to be read
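To make the note about pre-cached metadata concrete, here is a self-contained sketch of the factory pattern this trait describes. Every type below is a simplified stand-in defined for illustration; none of this is DataFusion's actual ParquetFileReaderFactory or the parquet crate's AsyncFileReader API. The factory parses a file's metadata once and hands the cached copy to each reader it creates, so creating a reader performs no further metadata I/O.

    use std::collections::HashMap;
    use std::sync::Arc;

    /// Stand-in for parsed parquet footer metadata.
    #[derive(Debug)]
    struct FileMetadata {
        num_rows: usize,
    }

    /// Stand-in for a per-file reader that reuses cached metadata.
    struct CachedReader {
        metadata: Arc<FileMetadata>,
    }

    impl CachedReader {
        /// Return the cached metadata; no footer read happens here.
        fn metadata(&self) -> Arc<FileMetadata> {
            Arc::clone(&self.metadata)
        }
    }

    /// Stand-in for a reader factory that pre-caches metadata per file path.
    struct CachingReaderFactory {
        cache: HashMap<String, Arc<FileMetadata>>,
    }

    impl CachingReaderFactory {
        fn new() -> Self {
            Self { cache: HashMap::new() }
        }

        /// Parse and cache a file's metadata once (simulated here).
        fn warm(&mut self, path: &str, num_rows: usize) {
            self.cache
                .entry(path.to_string())
                .or_insert_with(|| Arc::new(FileMetadata { num_rows }));
        }

        /// Create a reader backed by the cached metadata, if present.
        fn create_reader(&self, path: &str) -> Option<CachedReader> {
            self.cache.get(path).map(|metadata| CachedReader {
                metadata: Arc::clone(metadata),
            })
        }
    }

    fn main() {
        let mut factory = CachingReaderFactory::new();
        factory.warm("hits.parquet", 20);

        let reader = factory.create_reader("hits.parquet").expect("metadata was cached");
        assert_eq!(reader.metadata().num_rows, 20);
    }

In a real implementation the same idea applies to the page index: include it in the `ParquetMetaData` you return so the reader does not have to fetch it on demand.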
