Skip to content

Commit

Permalink
Add to cli
Browse files Browse the repository at this point in the history
  • Loading branch information
rdettai committed Jan 20, 2025
1 parent b8fdd12 commit d6be5c2
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 8 deletions.
2 changes: 2 additions & 0 deletions docs/reference/cli.md
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,7 @@ quickwit index ingest
[--input-path <input-path>]
[--batch-size-limit <batch-size-limit>]
[--wait]
[--detailed-response]
[--force]
[--commit-timeout <commit-timeout>]
```
Expand All @@ -354,6 +355,7 @@ quickwit index ingest
| `--input-path` | Location of the input file. |
| `--batch-size-limit` | Size limit of each submitted document batch. |
| `--wait` | Wait for all documents to be committed and available for search before exiting. Applies only to the last batch, see [#5417](https://github.com/quickwit-oss/quickwit/issues/5417). |
| `--detailed-response` | Print detailed errors. Enabling might impact performance negatively. |
| `--force` | Force a commit after the last document is sent, and wait for all documents to be committed and available for search before exiting. Applies only to the last batch, see [#5417](https://github.com/quickwit-oss/quickwit/issues/5417). |
| `--commit-timeout` | Timeout for ingest operations that require waiting for the final commit (`--wait` or `--force`). This is different from the `commit_timeout_secs` indexing setting, which sets the maximum time before committing splits after their creation. |

Expand Down
48 changes: 42 additions & 6 deletions quickwit/quickwit-cli/src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ use tabled::settings::{Alignment, Disable, Format, Modify, Panel, Rotate, Style}
use tabled::{Table, Tabled};
use tracing::{debug, Level};

use crate::checklist::GREEN_COLOR;
use crate::checklist::{GREEN_COLOR, RED_COLOR};
use crate::stats::{mean, percentile, std_deviation};
use crate::{client_args, make_table, prompt_confirmation, ClientArgs};

Expand Down Expand Up @@ -143,6 +143,10 @@ pub fn build_index_command() -> Command {
.short('w')
.help("Wait for all documents to be committed and available for search before exiting. Applies only to the last batch, see [#5417](https://github.com/quickwit-oss/quickwit/issues/5417).")
.action(ArgAction::SetTrue),
Arg::new("detailed-response")
.long("detailed-response")
.help("Print detailed errors. Enabling might impact performance negatively.")
.action(ArgAction::SetTrue),
Arg::new("force")
.long("force")
.short('f')
Expand Down Expand Up @@ -228,6 +232,7 @@ pub struct IngestDocsArgs {
pub input_path_opt: Option<PathBuf>,
pub batch_size_limit_opt: Option<ByteSize>,
pub commit_type: CommitType,
pub detailed_response: bool,
}

#[derive(Debug, Eq, PartialEq)]
Expand Down Expand Up @@ -372,7 +377,7 @@ impl IndexCliCommand {
} else {
None
};

let detailed_response: bool = matches.get_flag("detailed-response");
let batch_size_limit_opt = matches
.remove_one::<String>("batch-size-limit")
.map(|limit| limit.parse::<ByteSize>())
Expand All @@ -395,6 +400,7 @@ impl IndexCliCommand {
input_path_opt,
batch_size_limit_opt,
commit_type,
detailed_response,
}))
}

Expand Down Expand Up @@ -1019,15 +1025,19 @@ pub async fn ingest_docs_cli(args: IngestDocsArgs) -> anyhow::Result<()> {
progress_bar.set_message(format!("{throughput:.1} MiB/s"));
};

let qw_client = args.client_args.client();
let mut qw_client_builder = args.client_args.client_builder();
if args.detailed_response {
qw_client_builder = qw_client_builder.detailed_response(args.detailed_response);
}
let qw_client = qw_client_builder.build();
let ingest_source = match args.input_path_opt {
Some(filepath) => IngestSource::File(filepath),
None => IngestSource::Stdin,
};
let batch_size_limit_opt = args
.batch_size_limit_opt
.map(|batch_size_limit| batch_size_limit.as_u64() as usize);
qw_client
let response = qw_client
.ingest(
&args.index_id,
ingest_source,
Expand All @@ -1038,9 +1048,35 @@ pub async fn ingest_docs_cli(args: IngestDocsArgs) -> anyhow::Result<()> {
.await?;
progress_bar.finish();
println!(
"Ingested {} documents successfully.",
"✔".color(GREEN_COLOR)
"{} Ingested {} document(s) successfully.",
"✔".color(GREEN_COLOR),
response
.num_ingested_docs
// TODO(#5604) remove unwrap
.unwrap_or(response.num_docs_for_processing),
);
if let Some(rejected) = response.num_rejected_docs {
if rejected > 0 {
println!(
"{} Rejected {} document(s).",
"✖".color(RED_COLOR),
rejected
);
}
}
if let Some(parse_failures) = response.parse_failures {
if parse_failures.len() > 0 {
println!("Detailed parse failures:");
}
for (idx, failure) in parse_failures.iter().enumerate() {
let reason_value = serde_json::to_value(&failure.reason).unwrap();
println!();
println!("┌ error {}", idx + 1);
println!("├ reason: {}", reason_value.as_str().unwrap());
println!("├ message: {}", failure.message);
println!("└ document: {}", failure.document);
}
}
Ok(())
}

Expand Down
8 changes: 6 additions & 2 deletions quickwit/quickwit-cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ impl Default for ClientArgs {
}

impl ClientArgs {
pub fn client(self) -> QuickwitClient {
pub fn client_builder(self) -> QuickwitClientBuilder {
let mut builder = QuickwitClientBuilder::new(self.cluster_endpoint);
if let Some(connect_timeout) = self.connect_timeout {
builder = builder.connect_timeout(connect_timeout);
Expand All @@ -135,7 +135,11 @@ impl ClientArgs {
if let Some(commit_timeout) = self.commit_timeout {
builder = builder.commit_timeout(commit_timeout);
}
builder.build()
builder
}

pub fn client(self) -> QuickwitClient {
self.client_builder().build()
}

pub fn parse_for_ingest(matches: &mut ArgMatches) -> anyhow::Result<Self> {
Expand Down
6 changes: 6 additions & 0 deletions quickwit/quickwit-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,7 @@ mod tests {
input_path_opt: None,
batch_size_limit_opt: None,
commit_type: CommitType::Auto,
detailed_response: false,
})) if &index_id == "wikipedia"
&& client_args.timeout.is_none()
&& client_args.connect_timeout.is_none()
Expand All @@ -255,6 +256,7 @@ mod tests {
"ingest",
"--index",
"wikipedia",
"--detailed-response",
"--batch-size-limit",
"8MB",
"--force",
Expand All @@ -269,6 +271,7 @@ mod tests {
input_path_opt: None,
batch_size_limit_opt: Some(batch_size_limit),
commit_type: CommitType::Force,
detailed_response: true,
})) if &index_id == "wikipedia"
&& client_args.cluster_endpoint == Url::from_str("http://127.0.0.1:7280").unwrap()
&& client_args.timeout.is_none()
Expand Down Expand Up @@ -297,6 +300,7 @@ mod tests {
input_path_opt: None,
batch_size_limit_opt: Some(batch_size_limit),
commit_type: CommitType::WaitFor,
detailed_response: false,
})) if &index_id == "wikipedia"
&& client_args.cluster_endpoint == Url::from_str("http://127.0.0.1:7280").unwrap()
&& client_args.timeout.is_none()
Expand Down Expand Up @@ -326,6 +330,7 @@ mod tests {
input_path_opt: None,
batch_size_limit_opt: None,
commit_type: CommitType::Auto,
detailed_response: false,
})) if &index_id == "wikipedia"
&& client_args.cluster_endpoint == Url::from_str("http://127.0.0.1:7280").unwrap()
&& client_args.timeout == Some(Timeout::from_secs(10))
Expand Down Expand Up @@ -357,6 +362,7 @@ mod tests {
input_path_opt: None,
batch_size_limit_opt: None,
commit_type: CommitType::WaitFor,
detailed_response: false,
})) if &index_id == "wikipedia"
&& client_args.cluster_endpoint == Url::from_str("http://127.0.0.1:7280").unwrap()
&& client_args.timeout == Some(Timeout::none())
Expand Down

0 comments on commit d6be5c2

Please sign in to comment.