fix: test on windows (#2462)
* fix: test on windows

* fix: fix windows root

* fix: use relative path instead of root

* fix: remove incorrect replace

* fix: fix tests

---------

Co-authored-by: WenyXu <[email protected]>
fengjiachun and WenyXu authored Sep 21, 2023
1 parent c7b490e commit 40781ec
Showing 10 changed files with 122 additions and 78 deletions.
69 changes: 34 additions & 35 deletions .github/workflows/develop.yml
@@ -194,38 +194,37 @@ jobs:
runs-on: windows-latest-8-cores
timeout-minutes: 60
steps:
- run: 'echo "temporary disabled"'
# - run: git config --global core.autocrlf false
# - uses: actions/checkout@v3
# - uses: arduino/setup-protoc@v1
# with:
# repo-token: ${{ secrets.GITHUB_TOKEN }}
# - name: Install Rust toolchain
# uses: dtolnay/rust-toolchain@master
# with:
# toolchain: ${{ env.RUST_TOOLCHAIN }}
# components: llvm-tools-preview
# - name: Rust Cache
# uses: Swatinem/rust-cache@v2
# - name: Install Cargo Nextest
# uses: taiki-e/install-action@nextest
# - name: Install Python
# uses: actions/setup-python@v4
# with:
# python-version: '3.10'
# - name: Install PyArrow Package
# run: pip install pyarrow
# - name: Install WSL distribution
# uses: Vampire/setup-wsl@v2
# with:
# distribution: Ubuntu-22.04
# - name: Running tests
# run: cargo nextest run -F pyo3_backend,dashboard
# env:
# RUST_BACKTRACE: 1
# CARGO_INCREMENTAL: 0
# GT_S3_BUCKET: ${{ secrets.S3_BUCKET }}
# GT_S3_ACCESS_KEY_ID: ${{ secrets.S3_ACCESS_KEY_ID }}
# GT_S3_ACCESS_KEY: ${{ secrets.S3_ACCESS_KEY }}
# GT_S3_REGION: ${{ secrets.S3_REGION }}
# UNITTEST_LOG_DIR: "__unittest_logs"
- run: git config --global core.autocrlf false
- uses: actions/checkout@v3
- uses: arduino/setup-protoc@v1
with:
repo-token: ${{ secrets.GITHUB_TOKEN }}
- name: Install Rust toolchain
uses: dtolnay/rust-toolchain@master
with:
toolchain: ${{ env.RUST_TOOLCHAIN }}
components: llvm-tools-preview
- name: Rust Cache
uses: Swatinem/rust-cache@v2
- name: Install Cargo Nextest
uses: taiki-e/install-action@nextest
- name: Install Python
uses: actions/setup-python@v4
with:
python-version: '3.10'
- name: Install PyArrow Package
run: pip install pyarrow
- name: Install WSL distribution
uses: Vampire/setup-wsl@v2
with:
distribution: Ubuntu-22.04
- name: Running tests
run: cargo nextest run -F pyo3_backend,dashboard
env:
RUST_BACKTRACE: 1
CARGO_INCREMENTAL: 0
GT_S3_BUCKET: ${{ secrets.S3_BUCKET }}
GT_S3_ACCESS_KEY_ID: ${{ secrets.S3_ACCESS_KEY_ID }}
GT_S3_ACCESS_KEY: ${{ secrets.S3_ACCESS_KEY }}
GT_S3_REGION: ${{ secrets.S3_REGION }}
UNITTEST_LOG_DIR: "__unittest_logs"
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions src/common/datasource/Cargo.toml
@@ -22,6 +22,7 @@ common-runtime = { workspace = true }
datafusion.workspace = true
derive_builder.workspace = true
futures.workspace = true
lazy_static.workspace = true
object-store = { workspace = true }
orc-rust = "0.2"
paste = "1.0"
3 changes: 2 additions & 1 deletion src/common/datasource/src/error.rs
@@ -30,10 +30,11 @@ pub enum Error {
location: Location,
},

#[snafu(display("Unsupported backend protocol: {}", protocol))]
#[snafu(display("Unsupported backend protocol: {}, url: {}", protocol, url))]
UnsupportedBackendProtocol {
protocol: String,
location: Location,
url: String,
},

#[snafu(display("Unsupported format protocol: {}", format))]
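For context, a minimal stand-alone sketch of how a snafu variant behaves once the `url` field is added, so the point of the new display message is visible. The enum below is an illustrative example rather than the crate's real `Error` type, and it omits the `location` field.

```rust
use snafu::Snafu;

#[derive(Debug, Snafu)]
enum Error {
    // Mirrors the updated variant: the message now carries both the protocol and the URL.
    #[snafu(display("Unsupported backend protocol: {}, url: {}", protocol, url))]
    UnsupportedBackendProtocol { protocol: String, url: String },
}

fn main() {
    // Errors are built through the generated context selector, just like `.fail()` in the diff.
    let err: Result<(), Error> = UnsupportedBackendProtocolSnafu {
        protocol: "ftp",
        url: "ftp://example.com/data.csv",
    }
    .fail();

    // The offending URL is now part of the message, which is what the change is for.
    assert_eq!(
        err.unwrap_err().to_string(),
        "Unsupported backend protocol: ftp, url: ftp://example.com/data.csv"
    );
}
```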
25 changes: 12 additions & 13 deletions src/common/datasource/src/lister.rs
@@ -27,21 +27,21 @@ pub enum Source {
pub struct Lister {
object_store: ObjectStore,
source: Source,
path: String,
root: String,
regex: Option<Regex>,
}

impl Lister {
pub fn new(
object_store: ObjectStore,
source: Source,
path: String,
root: String,
regex: Option<Regex>,
) -> Self {
Lister {
object_store,
source,
path,
root,
regex,
}
}
@@ -51,9 +51,9 @@ impl Lister {
Source::Dir => {
let streamer = self
.object_store
.lister_with(&self.path)
.lister_with("/")
.await
.context(error::ListObjectsSnafu { path: &self.path })?;
.context(error::ListObjectsSnafu { path: &self.root })?;

streamer
.try_filter(|f| {
@@ -66,22 +66,21 @@
})
.try_collect::<Vec<_>>()
.await
.context(error::ListObjectsSnafu { path: &self.path })
.context(error::ListObjectsSnafu { path: &self.root })
}
Source::Filename(filename) => {
// make sure this file exists
let file_full_path = format!("{}{}", self.path, filename);
let _ = self.object_store.stat(&file_full_path).await.context(
error::ListObjectsSnafu {
path: &file_full_path,
},
)?;
let _ = self.object_store.stat(filename).await.with_context(|_| {
error::ListObjectsSnafu {
path: format!("{}{}", &self.root, filename),
}
})?;

Ok(self
.object_store
.list_with(&self.path)
.list_with("/")
.await
.context(error::ListObjectsSnafu { path: &self.path })?
.context(error::ListObjectsSnafu { path: &self.root })?
.into_iter()
.find(|f| f.name() == filename)
.map(|f| vec![f])
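As an aside, the `Source::Dir` branch above filters listed entries by an optional regular expression before collecting them. A rough stand-alone sketch of that filtering rule (not the crate's `Lister`; only the `regex` crate is assumed):

```rust
use regex::Regex;

// Keep only the entry names that match the optional pattern; with no pattern, keep everything.
// This mirrors the rule applied inside the `try_filter` of the `Source::Dir` branch.
fn filter_entries(names: Vec<&str>, pattern: Option<&Regex>) -> Vec<String> {
    names
        .into_iter()
        .filter(|name| pattern.map(|re| re.is_match(name)).unwrap_or(true))
        .map(str::to_string)
        .collect()
}

fn main() {
    let only_parquet = Regex::new(r"\.parquet$").unwrap();
    let entries = vec!["a.parquet", "b.csv", "c.parquet"];
    assert_eq!(
        filter_entries(entries.clone(), Some(&only_parquet)),
        vec!["a.parquet", "c.parquet"]
    );
    // Without a pattern, nothing is filtered out.
    assert_eq!(filter_entries(entries, None).len(), 3);
}
```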
48 changes: 44 additions & 4 deletions src/common/datasource/src/object_store.rs
@@ -16,19 +16,29 @@ pub mod fs;
pub mod s3;
use std::collections::HashMap;

use lazy_static::lazy_static;
use object_store::ObjectStore;
use regex::Regex;
use snafu::{OptionExt, ResultExt};
use url::{ParseError, Url};

use self::fs::build_fs_backend;
use self::s3::build_s3_backend;
use crate::error::{self, Result};
use crate::util::find_dir_and_filename;

pub const FS_SCHEMA: &str = "FS";
pub const S3_SCHEMA: &str = "S3";

/// Returns `(schema, Option<host>, path)`
pub fn parse_url(url: &str) -> Result<(String, Option<String>, String)> {
#[cfg(windows)]
{
// On Windows, the url may start with `C:/`.
if let Some(_) = handle_windows_path(url) {
return Ok((FS_SCHEMA.to_string(), None, url.to_string()));
}
}
let parsed_url = Url::parse(url);
match parsed_url {
Ok(url) => Ok((
@@ -44,17 +44,47 @@ pub fn parse_url(url: &str) -> Result<(String, Option<String>, String)> {
}

pub fn build_backend(url: &str, connection: &HashMap<String, String>) -> Result<ObjectStore> {
let (schema, host, _path) = parse_url(url)?;
let (schema, host, path) = parse_url(url)?;
let (root, _) = find_dir_and_filename(&path);

match schema.to_uppercase().as_str() {
S3_SCHEMA => {
let host = host.context(error::EmptyHostPathSnafu {
url: url.to_string(),
})?;
Ok(build_s3_backend(&host, "/", connection)?)
Ok(build_s3_backend(&host, &root, connection)?)
}
FS_SCHEMA => Ok(build_fs_backend("/")?),
FS_SCHEMA => Ok(build_fs_backend(&root)?),

_ => error::UnsupportedBackendProtocolSnafu { protocol: schema }.fail(),
_ => error::UnsupportedBackendProtocolSnafu {
protocol: schema,
url,
}
.fail(),
}
}

lazy_static! {
static ref DISK_SYMBOL_PATTERN: Regex = Regex::new("^([A-Za-z]:/)").unwrap();
}

pub fn handle_windows_path(url: &str) -> Option<String> {
DISK_SYMBOL_PATTERN
.captures(url)
.map(|captures| captures[0].to_string())
}

#[cfg(test)]
mod tests {
use super::handle_windows_path;

#[test]
fn test_handle_windows_path() {
assert_eq!(
handle_windows_path("C:/to/path/file"),
Some("C:/".to_string())
);
assert_eq!(handle_windows_path("https://google.com"), None);
assert_eq!(handle_windows_path("s3://bucket/path/to"), None);
}
}
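For readers unfamiliar with the failure mode on Windows, here is a rough stand-alone sketch of the parsing order this file now uses: check for a drive-letter prefix first, and only then fall back to URL parsing. The function name `parse_location` and the concrete paths are illustrative; only the regex mirrors `DISK_SYMBOL_PATTERN` above.

```rust
use regex::Regex;
use url::Url;

// Matches a Windows drive prefix such as `C:/`, like DISK_SYMBOL_PATTERN in the diff.
fn windows_drive_prefix(location: &str) -> Option<String> {
    let pattern = Regex::new("^([A-Za-z]:/)").unwrap();
    pattern.captures(location).map(|c| c[0].to_string())
}

// Illustrative only: returns (schema, host, path), applying the Windows short-circuit first.
fn parse_location(location: &str) -> (String, Option<String>, String) {
    if windows_drive_prefix(location).is_some() {
        // Without this check, `C:/data/file.csv` would be accepted by the URL parser
        // with the single-letter scheme `c` and then rejected as an unsupported backend.
        return ("FS".to_string(), None, location.to_string());
    }
    match Url::parse(location) {
        Ok(url) => (
            url.scheme().to_string(),
            url.host_str().map(|h| h.to_string()),
            url.path().to_string(),
        ),
        // Anything that is not a URL is treated as a local file-system path.
        Err(_) => ("FS".to_string(), None, location.to_string()),
    }
}

fn main() {
    assert_eq!(parse_location("C:/data/file.csv").0, "FS");
    assert_eq!(parse_location("s3://bucket/data/file.csv").0, "s3");
    assert_eq!(
        parse_location("s3://bucket/data/file.csv").1,
        Some("bucket".to_string())
    );
}
```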
5 changes: 3 additions & 2 deletions src/operator/src/statement/copy_table_from.rs
@@ -29,6 +29,7 @@ use common_datasource::object_store::{build_backend, parse_url};
use common_datasource::util::find_dir_and_filename;
use common_recordbatch::adapter::ParquetRecordBatchStreamAdapter;
use common_recordbatch::DfSendableRecordBatchStream;
use common_telemetry::debug;
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::datasource::physical_plan::{FileOpener, FileScanConfig, FileStream};
@@ -75,10 +76,10 @@ impl StatementExecutor {
Source::Dir
};

let lister = Lister::new(object_store.clone(), source, dir, regex);
let lister = Lister::new(object_store.clone(), source.clone(), dir.to_string(), regex);

let entries = lister.list().await.context(error::ListObjectsSnafu)?;

debug!("Copy from dir: {dir:?}, {source:?}, entries: {entries:?}");
Ok((object_store, entries))
}

14 changes: 10 additions & 4 deletions src/operator/src/statement/copy_table_to.rs
@@ -19,16 +19,18 @@ use common_datasource::file_format::csv::stream_to_csv;
use common_datasource::file_format::json::stream_to_json;
use common_datasource::file_format::Format;
use common_datasource::object_store::{build_backend, parse_url};
use common_datasource::util::find_dir_and_filename;
use common_query::Output;
use common_recordbatch::adapter::DfRecordBatchStreamAdapter;
use common_recordbatch::SendableRecordBatchStream;
use common_telemetry::debug;
use datafusion::datasource::DefaultTableSource;
use datafusion_common::TableReference as DfTableReference;
use datafusion_expr::LogicalPlanBuilder;
use object_store::ObjectStore;
use query::plan::LogicalPlan;
use session::context::QueryContextRef;
use snafu::ResultExt;
use snafu::{OptionExt, ResultExt};
use storage::sst::SstInfo;
use storage::{ParquetWriter, Source};
use table::engine::TableReference;
@@ -89,7 +91,7 @@
) -> Result<usize> {
let table_ref = TableReference::full(&req.catalog_name, &req.schema_name, &req.table_name);
let table = self.get_table(&table_ref).await?;

let table_id = table.table_info().table_id();
let format = Format::try_from(&req.with).context(error::ParseFileFormatSnafu)?;

let df_table_ref = DfTableReference::from(table_ref);
@@ -132,11 +134,15 @@
};

let (_schema, _host, path) = parse_url(&req.location).context(error::ParseUrlSnafu)?;
let (_, filename) = find_dir_and_filename(&path);
let filename = filename.context(error::UnexpectedSnafu {
violated: format!("Expected filename, path: {path}"),
})?;
let object_store =
build_backend(&req.location, &req.connection).context(error::BuildBackendSnafu)?;

debug!("Copy table: {table_id} to path: {path}");
let rows_copied = self
.stream_to_file(stream, &format, object_store, &path)
.stream_to_file(stream, &format, object_store, &filename)
.await?;

Ok(rows_copied)
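The net effect of the change above is that `COPY TO` now splits the destination into a directory, used as the backend root, and a bare file name passed to `stream_to_file`. A rough sketch of that split, under the assumption that `find_dir_and_filename` separates a path at its last `/` (the real helper lives in common-datasource and is not shown in this diff):

```rust
// Everything up to and including the last '/' is treated as the backend root;
// the remainder, if any, is the file name.
fn split_dir_and_filename(path: &str) -> (String, Option<String>) {
    match path.rfind('/') {
        Some(idx) => {
            let (dir, file) = path.split_at(idx + 1);
            let file = if file.is_empty() {
                None
            } else {
                Some(file.to_string())
            };
            (dir.to_string(), file)
        }
        // No separator: treat the whole string as a bare file name under the root.
        None => ("/".to_string(), Some(path.to_string())),
    }
}

fn main() {
    assert_eq!(
        split_dir_and_filename("/tmp/export/data.parquet"),
        ("/tmp/export/".to_string(), Some("data.parquet".to_string()))
    );
    // A trailing '/' means there is no file name, only a directory,
    // which is why the code above reports `UnexpectedSnafu` in that case.
    assert_eq!(
        split_dir_and_filename("/tmp/export/"),
        ("/tmp/export/".to_string(), None)
    );
}
```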
15 changes: 5 additions & 10 deletions tests-integration/src/tests/instance_test.rs
@@ -32,7 +32,7 @@ use session::context::{QueryContext, QueryContextRef};
use crate::test_util::check_output_stream;
use crate::tests::test_util::{
both_instances_cases, check_unordered_output_stream, distributed, find_testing_resource,
standalone, standalone_instance_case, MockInstance,
prepare_path, standalone, standalone_instance_case, MockInstance,
};

#[apply(both_instances_cases)]
@@ -538,7 +538,7 @@ async fn test_execute_external_create(instance: Arc<dyn MockInstance>) {
let instance = instance.frontend();

let tmp_dir = temp_dir::create_temp_dir("test_execute_external_create");
let location = tmp_dir.path().to_str().unwrap();
let location = prepare_path(tmp_dir.path().to_str().unwrap());

let output = execute_sql(
&instance,
@@ -575,12 +575,7 @@ async fn test_execute_external_create_infer_format(instance: Arc<dyn MockInstance>) {
let instance = instance.frontend();

let tmp_dir = temp_dir::create_temp_dir("test_execute_external_create_infer_format");
let location = tmp_dir
.path()
.to_str()
.unwrap()
.replace(':', "")
.replace('\\', "/");
let location = prepare_path(tmp_dir.path().to_str().unwrap());

let output = execute_sql(
&instance,
@@ -595,7 +590,7 @@ async fn test_execute_external_create_without_ts(instance: Arc<dyn MockInstance>) {
let instance = instance.frontend();

let tmp_dir = temp_dir::create_temp_dir("test_execute_external_create_without_ts");
let location = tmp_dir.path().to_str().unwrap();
let location = prepare_path(tmp_dir.path().to_str().unwrap());

let result = try_execute_sql(
&instance,
@@ -616,7 +611,7 @@ async fn test_execute_external_create_with_invalid_ts(instance: Arc<dyn MockInstance>) {
let instance = instance.frontend();

let tmp_dir = temp_dir::create_temp_dir("test_execute_external_create_with_invalid_ts");
let location = tmp_dir.path().to_str().unwrap();
let location = prepare_path(tmp_dir.path().to_str().unwrap());

let result = try_execute_sql(
&instance,
19 changes: 10 additions & 9 deletions tests-integration/src/tests/test_util.rs
@@ -108,20 +108,21 @@ pub(crate) async fn check_unordered_output_stream(output: Output, expected: &str
assert_eq!(pretty_print, expected);
}

/// Find the testing file resource under workspace root to be used in object store.
pub fn find_testing_resource(path: &str) -> String {
let p = find_workspace_path(path).display().to_string();

pub fn prepare_path(p: &str) -> String {
#[cfg(windows)]
let p = {
// We need unix style path even in the Windows, because the path is used in object-store, must
// be delimited with '/'. Inside the object-store, it will be converted to file system needed
// path in the end.
let p = p.replace(':', "").replace('\\', "/");

// Prepend a '/' to indicate it's a file system path when parsed as object-store url in Windows.
format!("/{p}")
p.replace('\\', "/")
};

p
p.to_string()
}

/// Find the testing file resource under workspace root to be used in object store.
pub fn find_testing_resource(path: &str) -> String {
let p = find_workspace_path(path).display().to_string();

prepare_path(&p)
}
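Taken together with the object_store.rs change, the simplified `prepare_path` only needs to flip backslashes; the drive letter can stay because `handle_windows_path` now recognizes it. A hedged illustration with a made-up Windows temp path:

```rust
// Same normalization as the new `prepare_path`: only backslashes change.
fn normalize_for_object_store(p: &str) -> String {
    p.replace('\\', "/")
}

fn main() {
    // Hypothetical value a temp dir could return on a Windows runner.
    let raw = r"C:\Users\runner\Temp\test_execute_external_create";
    let normalized = normalize_for_object_store(raw);

    assert_eq!(normalized, "C:/Users/runner/Temp/test_execute_external_create");
    // The result starts with `C:/`, so parse_url short-circuits to the FS backend
    // instead of misreading `C:` as a URL scheme.
    assert!(normalized.starts_with("C:/"));
}
```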
