Detailed ingest response (#5635)
* Detailed ingest response

* Fix unit tests

* Change detailed_parse_failures into detailed_response

And other nit improvements.

* Add to cli
rdettai authored Jan 21, 2025
1 parent 1cfacc0 commit 072d0fe
Showing 16 changed files with 601 additions and 107 deletions.
1 change: 1 addition & 0 deletions docs/internals/ingest-v2.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,4 @@ See [full configuration example](https://github.com/quickwit-oss/quickwit/blob/m
- but ingest V2 can also be configured with:
- `ingest_api.replication_factor`, not working yet
- ingest V1 always writes to the WAL of the node receiving the request, V2 potentially forwards it to another node, dynamically assigned by the control plane to distribute the indexing work more evenly.
- ingest V2 parses and validates input documents synchronously. Schema and JSON formatting errors are returned in the ingest response (for ingest V1 those errors were available in the server logs only).
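The synchronous validation described above can be sketched outside Quickwit. The snippet below is plain Python (not Quickwit code) scanning a hypothetical NDJSON batch: the malformed second line is the kind of document ingest V2 would reject with reason `invalid_json`, while the well-formed lines pass.

```python
import json

# Hypothetical NDJSON batch: the second line is truncated JSON and would
# be rejected synchronously; the other two lines parse cleanly.
batch = b'{"title": "doc1"}\n{"title": "doc2\n{"title": "doc3"}\n'

valid, rejected = 0, 0
for line in batch.splitlines():
    try:
        json.loads(line)
        valid += 1
    except json.JSONDecodeError:
        rejected += 1

print(valid, rejected)  # 2 valid documents, 1 rejected
```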
2 changes: 2 additions & 0 deletions docs/reference/cli.md
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,7 @@ quickwit index ingest
[--input-path <input-path>]
[--batch-size-limit <batch-size-limit>]
[--wait]
[--detailed-response]
[--force]
[--commit-timeout <commit-timeout>]
```
Expand All @@ -354,6 +355,7 @@ quickwit index ingest
| `--input-path` | Location of the input file. |
| `--batch-size-limit` | Size limit of each submitted document batch. |
| `--wait` | Wait for all documents to be committed and available for search before exiting. Applies only to the last batch, see [#5417](https://github.com/quickwit-oss/quickwit/issues/5417). |
| `--detailed-response` | Print detailed errors. Enabling might impact performance negatively. |
| `--force` | Force a commit after the last document is sent, and wait for all documents to be committed and available for search before exiting. Applies only to the last batch, see [#5417](https://github.com/quickwit-oss/quickwit/issues/5417). |
| `--commit-timeout` | Timeout for ingest operations that require waiting for the final commit (`--wait` or `--force`). This is different from the `commit_timeout_secs` indexing setting, which sets the maximum time before committing splits after their creation. |

Expand Down
15 changes: 13 additions & 2 deletions docs/reference/rest-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,9 @@ POST api/v1/<index id>/ingest?commit=wait_for -d \
```

:::info
The payload size is limited to 10MB as this endpoint is intended to receive documents in batch.

The payload size is limited to 10MB [by default](../configuration/node-config.md#ingest-api-configuration) since this endpoint is intended to receive documents in batches.

:::

#### Path variable
Expand All @@ -207,14 +209,23 @@ The payload size is limited to 10MB as this endpoint is intended to receive docu
| Variable | Type | Description | Default value |
|---------------------|------------|----------------------------------------------------|---------------|
| `commit` | `String` | The commit behavior: `auto`, `wait_for` or `force` | `auto` |
| `detailed_response` | `bool` | Enable `parse_failures` in the response. Setting to `true` might impact performance negatively. | `false` |

#### Response

The response is a JSON object, and the content type is `application/json; charset=UTF-8`.

| Field | Description | Type |
|-----------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------|:--------:|
| `num_docs_for_processing` | Total number of documents ingested for processing. The documents may not have been processed. The API will not return indexing errors, check the server logs for errors. | `number` |
| `num_docs_for_processing` | Total number of documents submitted for processing. The documents may not have been processed. | `number` |
| `num_ingested_docs` | Number of documents successfully persisted in the write-ahead log. | `number` |
| `num_rejected_docs` | Number of documents that couldn't be parsed (invalid JSON, bad schema...). | `number` |
| `parse_failures` | List detailing the parse failures. Only available if `detailed_response` is set to `true`. | `list(object)` |

The parse failure objects contain the following fields:
- `message`: a detailed message explaining the error
- `reason`: one of `invalid_json`, `invalid_schema` or `unspecified`
- `document`: the UTF-8 decoded string of the document byte chunk that generated the error
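As an illustration, here is a short Python sketch that reads such a response and prints each failure. The response body below is fabricated; only the field names follow the tables above.

```python
import json

# Fabricated example body for a request sent with detailed_response=true;
# field names match the response table above, values are invented.
raw = """
{
  "num_docs_for_processing": 3,
  "num_ingested_docs": 2,
  "num_rejected_docs": 1,
  "parse_failures": [
    {
      "message": "failed to parse JSON document",
      "reason": "invalid_json",
      "document": "{\\"title\\": \\"unterminated"
    }
  ]
}
"""

resp = json.loads(raw)
print(f"ingested: {resp['num_ingested_docs']}, rejected: {resp['num_rejected_docs']}")
for failure in resp.get("parse_failures", []):
    print(f"- {failure['reason']}: {failure['message']}")
```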


## Index API
Expand Down
48 changes: 42 additions & 6 deletions quickwit/quickwit-cli/src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ use tabled::settings::{Alignment, Disable, Format, Modify, Panel, Rotate, Style}
use tabled::{Table, Tabled};
use tracing::{debug, Level};

use crate::checklist::GREEN_COLOR;
use crate::checklist::{GREEN_COLOR, RED_COLOR};
use crate::stats::{mean, percentile, std_deviation};
use crate::{client_args, make_table, prompt_confirmation, ClientArgs};

Expand Down Expand Up @@ -143,6 +143,10 @@ pub fn build_index_command() -> Command {
.short('w')
.help("Wait for all documents to be committed and available for search before exiting. Applies only to the last batch, see [#5417](https://github.com/quickwit-oss/quickwit/issues/5417).")
.action(ArgAction::SetTrue),
Arg::new("detailed-response")
.long("detailed-response")
.help("Print detailed errors. Enabling might impact performance negatively.")
.action(ArgAction::SetTrue),
Arg::new("force")
.long("force")
.short('f')
Expand Down Expand Up @@ -228,6 +232,7 @@ pub struct IngestDocsArgs {
pub input_path_opt: Option<PathBuf>,
pub batch_size_limit_opt: Option<ByteSize>,
pub commit_type: CommitType,
pub detailed_response: bool,
}

#[derive(Debug, Eq, PartialEq)]
Expand Down Expand Up @@ -372,7 +377,7 @@ impl IndexCliCommand {
} else {
None
};

let detailed_response: bool = matches.get_flag("detailed-response");
let batch_size_limit_opt = matches
.remove_one::<String>("batch-size-limit")
.map(|limit| limit.parse::<ByteSize>())
Expand All @@ -395,6 +400,7 @@ impl IndexCliCommand {
input_path_opt,
batch_size_limit_opt,
commit_type,
detailed_response,
}))
}

Expand Down Expand Up @@ -1019,15 +1025,19 @@ pub async fn ingest_docs_cli(args: IngestDocsArgs) -> anyhow::Result<()> {
progress_bar.set_message(format!("{throughput:.1} MiB/s"));
};

let qw_client = args.client_args.client();
let mut qw_client_builder = args.client_args.client_builder();
if args.detailed_response {
qw_client_builder = qw_client_builder.detailed_response(args.detailed_response);
}
let qw_client = qw_client_builder.build();
let ingest_source = match args.input_path_opt {
Some(filepath) => IngestSource::File(filepath),
None => IngestSource::Stdin,
};
let batch_size_limit_opt = args
.batch_size_limit_opt
.map(|batch_size_limit| batch_size_limit.as_u64() as usize);
qw_client
let response = qw_client
.ingest(
&args.index_id,
ingest_source,
Expand All @@ -1038,9 +1048,35 @@ pub async fn ingest_docs_cli(args: IngestDocsArgs) -> anyhow::Result<()> {
.await?;
progress_bar.finish();
println!(
"Ingested {} documents successfully.",
"✔".color(GREEN_COLOR)
"{} Ingested {} document(s) successfully.",
"✔".color(GREEN_COLOR),
response
.num_ingested_docs
// TODO(#5604) remove unwrap
.unwrap_or(response.num_docs_for_processing),
);
if let Some(rejected) = response.num_rejected_docs {
if rejected > 0 {
println!(
"{} Rejected {} document(s).",
"✖".color(RED_COLOR),
rejected
);
}
}
if let Some(parse_failures) = response.parse_failures {
if !parse_failures.is_empty() {
println!("Detailed parse failures:");
}
for (idx, failure) in parse_failures.iter().enumerate() {
let reason_value = serde_json::to_value(failure.reason).unwrap();
println!();
println!("┌ error {}", idx + 1);
println!("├ reason: {}", reason_value.as_str().unwrap());
println!("├ message: {}", failure.message);
println!("└ document: {}", failure.document);
}
}
Ok(())
}

Expand Down
8 changes: 6 additions & 2 deletions quickwit/quickwit-cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ impl Default for ClientArgs {
}

impl ClientArgs {
pub fn client(self) -> QuickwitClient {
pub fn client_builder(self) -> QuickwitClientBuilder {
let mut builder = QuickwitClientBuilder::new(self.cluster_endpoint);
if let Some(connect_timeout) = self.connect_timeout {
builder = builder.connect_timeout(connect_timeout);
Expand All @@ -135,7 +135,11 @@ impl ClientArgs {
if let Some(commit_timeout) = self.commit_timeout {
builder = builder.commit_timeout(commit_timeout);
}
builder.build()
builder
}

pub fn client(self) -> QuickwitClient {
self.client_builder().build()
}

pub fn parse_for_ingest(matches: &mut ArgMatches) -> anyhow::Result<Self> {
Expand Down
6 changes: 6 additions & 0 deletions quickwit/quickwit-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,7 @@ mod tests {
input_path_opt: None,
batch_size_limit_opt: None,
commit_type: CommitType::Auto,
detailed_response: false,
})) if &index_id == "wikipedia"
&& client_args.timeout.is_none()
&& client_args.connect_timeout.is_none()
Expand All @@ -255,6 +256,7 @@ mod tests {
"ingest",
"--index",
"wikipedia",
"--detailed-response",
"--batch-size-limit",
"8MB",
"--force",
Expand All @@ -269,6 +271,7 @@ mod tests {
input_path_opt: None,
batch_size_limit_opt: Some(batch_size_limit),
commit_type: CommitType::Force,
detailed_response: true,
})) if &index_id == "wikipedia"
&& client_args.cluster_endpoint == Url::from_str("http://127.0.0.1:7280").unwrap()
&& client_args.timeout.is_none()
Expand Down Expand Up @@ -297,6 +300,7 @@ mod tests {
input_path_opt: None,
batch_size_limit_opt: Some(batch_size_limit),
commit_type: CommitType::WaitFor,
detailed_response: false,
})) if &index_id == "wikipedia"
&& client_args.cluster_endpoint == Url::from_str("http://127.0.0.1:7280").unwrap()
&& client_args.timeout.is_none()
Expand Down Expand Up @@ -326,6 +330,7 @@ mod tests {
input_path_opt: None,
batch_size_limit_opt: None,
commit_type: CommitType::Auto,
detailed_response: false,
})) if &index_id == "wikipedia"
&& client_args.cluster_endpoint == Url::from_str("http://127.0.0.1:7280").unwrap()
&& client_args.timeout == Some(Timeout::from_secs(10))
Expand Down Expand Up @@ -357,6 +362,7 @@ mod tests {
input_path_opt: None,
batch_size_limit_opt: None,
commit_type: CommitType::WaitFor,
detailed_response: false,
})) if &index_id == "wikipedia"
&& client_args.cluster_endpoint == Url::from_str("http://127.0.0.1:7280").unwrap()
&& client_args.timeout == Some(Timeout::none())
Expand Down
4 changes: 4 additions & 0 deletions quickwit/quickwit-ingest/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ pub enum IngestServiceError {
RateLimited(RateLimitingCause),
#[error("ingest service is unavailable ({0})")]
Unavailable(String),
#[error("bad request ({0})")]
BadRequest(String),
}

impl From<AskError<IngestServiceError>> for IngestServiceError {
Expand Down Expand Up @@ -161,6 +163,7 @@ impl ServiceError for IngestServiceError {
}
Self::RateLimited(_) => ServiceErrorCode::TooManyRequests,
Self::Unavailable(_) => ServiceErrorCode::Unavailable,
Self::BadRequest(_) => ServiceErrorCode::BadRequest,
}
}
}
Expand Down Expand Up @@ -204,6 +207,7 @@ impl From<IngestServiceError> for tonic::Status {
IngestServiceError::IoError { .. } => tonic::Code::Internal,
IngestServiceError::RateLimited(_) => tonic::Code::ResourceExhausted,
IngestServiceError::Unavailable(_) => tonic::Code::Unavailable,
IngestServiceError::BadRequest(_) => tonic::Code::InvalidArgument,
};
let message = error.to_string();
tonic::Status::new(code, message)
Expand Down
10 changes: 0 additions & 10 deletions quickwit/quickwit-ingest/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,16 +115,6 @@ pub async fn start_ingest_api_service(
init_ingest_api(universe, &queues_dir_path, config).await
}

impl CommitType {
pub fn to_query_parameter(&self) -> Option<&'static [(&'static str, &'static str)]> {
match self {
CommitType::Auto => None,
CommitType::WaitFor => Some(&[("commit", "wait_for")]),
CommitType::Force => Some(&[("commit", "force")]),
}
}
}

#[macro_export]
macro_rules! with_lock_metrics {
($future:expr, $($label:tt),*) => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@ use quickwit_rest_client::rest_client::{
CommitType, QuickwitClient, QuickwitClientBuilder, DEFAULT_BASE_URL,
};
use quickwit_serve::tcp_listener::for_tests::TestTcpListenerResolver;
use quickwit_serve::{serve_quickwit, ListSplitsQueryParams, SearchRequestQueryString};
use quickwit_serve::{
serve_quickwit, ListSplitsQueryParams, RestIngestResponse, SearchRequestQueryString,
};
use quickwit_storage::StorageResolver;
use reqwest::Url;
use serde_json::Value;
Expand Down Expand Up @@ -246,11 +248,11 @@ pub(crate) async fn ingest(
index_id: &str,
ingest_source: IngestSource,
commit_type: CommitType,
) -> anyhow::Result<()> {
client
) -> anyhow::Result<RestIngestResponse> {
let resp = client
.ingest(index_id, ingest_source, None, None, commit_type)
.await?;
Ok(())
Ok(resp)
}

/// A test environment where you can start a Quickwit cluster and use the gRPC
Expand Down Expand Up @@ -286,6 +288,15 @@ impl ClusterSandbox {
QuickwitClientBuilder::new(transport_url(node_config.rest_config.listen_addr)).build()
}

/// A client configured to ingest documents and return detailed parse failures.
pub fn detailed_ingest_client(&self) -> QuickwitClient {
let node_config = self.find_node_for_service(QuickwitService::Indexer);

QuickwitClientBuilder::new(transport_url(node_config.rest_config.listen_addr))
.detailed_response(true)
.build()
}

// TODO(#5604)
pub fn rest_client_legacy_indexer(&self) -> QuickwitClient {
let node_config = self.find_node_for_service(QuickwitService::Indexer);
Expand Down