Skip to content

Commit

Permalink
Change detailed_parse_failures into detailed_response
Browse files Browse the repository at this point in the history
And other nit improvements.
  • Loading branch information
rdettai committed Jan 20, 2025
1 parent 52377c6 commit 7855e44
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 48 deletions.
16 changes: 9 additions & 7 deletions docs/reference/rest-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,9 @@ POST api/v1/<index id>/ingest?commit=wait_for -d \
```

:::info
The payload size is limited to 10MB [by default](../configuration/node-config.md#ingest-api-configuration) as this endpoint is intended to receive documents in batch.

The payload size is limited to 10MB [by default](../configuration/node-config.md#ingest-api-configuration) since this endpoint is intended to receive documents in batches.

:::

#### Path variable
Expand All @@ -204,10 +206,10 @@ The payload size is limited to 10MB [by default](../configuration/node-config.md

#### Query parameters

| Variable | Type | Description | Default value |
|---------------------------|------------|----------------------------------------------------|---------------|
| `commit` | `String` | The commit behavior: `auto`, `wait_for` or `force` | `auto` |
| `detailed_parse_failures` | `bool` | Enable `parse_failures` in the response. Setting to `true` might impact performances negatively. | `false` |
| Variable | Type | Description | Default value |
|---------------------|------------|----------------------------------------------------|---------------|
| `commit` | `String` | The commit behavior: `auto`, `wait_for` or `force` | `auto` |
| `detailed_response` | `bool` | Enable `parse_failures` in the response. Setting it to `true` might negatively impact performance. | `false` |

#### Response

Expand All @@ -218,11 +220,11 @@ The response is a JSON object, and the content type is `application/json; charse
| `num_docs_for_processing` | Total number of documents submitted for processing. The documents may not have been processed. | `number` |
| `num_ingested_docs` | Number of documents successfully persisted in the write ahead log | `number` |
| `num_rejected_docs` | Number of documents that couldn't be parsed (invalid json, bad schema...) | `number` |
| `parse_failures` | List detailing parsing failures. Only available if `detailed_parse_failures` is set to `true`. | `list(object)` |
| `parse_failures` | List detailing parsing failures. Only available if `detailed_response` is set to `true`. | `list(object)` |

The parse failure objects contain the following fields:
- `message`: a detailed message explaining the error
- `reason`: on of `invalid_json`, `invalid_schema` or `unspecified`
- `reason`: one of `invalid_json`, `invalid_schema` or `unspecified`
- `document`: the utf-8 decoded string of the document byte chunk that generated the error


Expand Down
6 changes: 3 additions & 3 deletions quickwit/quickwit-ingest/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ pub enum IngestServiceError {
#[error("ingest service is unavailable ({0})")]
Unavailable(String),
#[error("bad request ({0})")]
Unsupported(String),
BadRequest(String),
}

impl From<AskError<IngestServiceError>> for IngestServiceError {
Expand Down Expand Up @@ -163,7 +163,7 @@ impl ServiceError for IngestServiceError {
}
Self::RateLimited(_) => ServiceErrorCode::TooManyRequests,
Self::Unavailable(_) => ServiceErrorCode::Unavailable,
Self::Unsupported(_) => ServiceErrorCode::BadRequest,
Self::BadRequest(_) => ServiceErrorCode::BadRequest,
}
}
}
Expand Down Expand Up @@ -207,7 +207,7 @@ impl From<IngestServiceError> for tonic::Status {
IngestServiceError::IoError { .. } => tonic::Code::Internal,
IngestServiceError::RateLimited(_) => tonic::Code::ResourceExhausted,
IngestServiceError::Unavailable(_) => tonic::Code::Unavailable,
IngestServiceError::Unsupported(_) => tonic::Code::InvalidArgument,
IngestServiceError::BadRequest(_) => tonic::Code::InvalidArgument,
};
let message = error.to_string();
tonic::Status::new(code, message)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ impl ClusterSandbox {
let node_config = self.find_node_for_service(QuickwitService::Indexer);

QuickwitClientBuilder::new(transport_url(node_config.rest_config.listen_addr))
.detailed_parse_failures(true)
.detailed_response(true)
.build()
}

Expand Down
19 changes: 10 additions & 9 deletions quickwit/quickwit-rest-client/src/rest_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ pub struct QuickwitClientBuilder {
/// Forces use of ingest v1.
use_legacy_ingest: bool,
/// Request detailed parse failures report from the ingest api.
detailed_parse_failures: bool,
detailed_response: bool,
}

impl QuickwitClientBuilder {
Expand All @@ -139,7 +139,7 @@ impl QuickwitClientBuilder {
ingest_timeout: DEFAULT_CLIENT_INGEST_TIMEOUT,
commit_timeout: DEFAULT_CLIENT_COMMIT_TIMEOUT,
use_legacy_ingest: false,
detailed_parse_failures: false,
detailed_response: false,
}
}

Expand Down Expand Up @@ -169,8 +169,8 @@ impl QuickwitClientBuilder {
self
}

pub fn detailed_parse_failures(mut self, is_detailed: bool) -> Self {
self.detailed_parse_failures = is_detailed;
pub fn detailed_response(mut self, is_detailed: bool) -> Self {
self.detailed_response = is_detailed;
self
}

Expand All @@ -188,7 +188,7 @@ impl QuickwitClientBuilder {
ingest_timeout: self.ingest_timeout,
commit_timeout: self.commit_timeout,
use_legacy_ingest: self.use_legacy_ingest,
detailed_parse_failures: self.detailed_parse_failures,
detailed_response: self.detailed_response,
}
}
}
Expand All @@ -207,7 +207,7 @@ pub struct QuickwitClient {
/// Forces use of ingest v1.
use_legacy_ingest: bool,
/// Request detailed parse failures report from the ingest api.
detailed_parse_failures: bool,
detailed_response: bool,
}

impl QuickwitClient {
Expand Down Expand Up @@ -275,8 +275,8 @@ impl QuickwitClient {
if self.use_legacy_ingest {
query_params.insert("use_legacy_ingest", "true");
}
if self.detailed_parse_failures {
query_params.insert("detailed_parse_failures", "true");
if self.detailed_response {
query_params.insert("detailed_response", "true");
}
let batch_size_limit = batch_size_limit_opt.unwrap_or(INGEST_CONTENT_LENGTH_LIMIT);
let mut batch_reader = match ingest_source {
Expand Down Expand Up @@ -322,7 +322,8 @@ impl QuickwitClient {
}
tokio::time::sleep(Duration::from_millis(500)).await;
} else {
cumulated_resp = cumulated_resp + response.deserialize().await?;
let current_parsed_resp = response.deserialize().await?;
cumulated_resp = cumulated_resp.merge(current_parsed_resp);
break;
}
}
Expand Down
35 changes: 14 additions & 21 deletions quickwit/quickwit-serve/src/ingest_api/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::collections::BTreeMap;
use std::ops::Add;

use bytes::Bytes;
use quickwit_ingest::{IngestResponse, IngestServiceError};
Expand All @@ -40,12 +39,12 @@ pub struct RestIngestResponse {
pub num_docs_for_processing: u64,
/// Number of docs successfully ingested
#[serde(skip_serializing_if = "Option::is_none")]
pub num_ingested_docs: Option<u64>,
pub num_ingested_docs: Option<u64>, // TODO(#5604) remove Option
/// Number of docs rejected because of parsing errors
#[serde(skip_serializing_if = "Option::is_none")]
pub num_rejected_docs: Option<u64>,
pub num_rejected_docs: Option<u64>, // TODO(#5604) remove Option
/// Detailed description of parsing errors (available if the query param
/// `detailed_parse_failures` is set to `true`)
/// `detailed_response` is set to `true`)
#[serde(skip_serializing_if = "Option::is_none")]
pub parse_failures: Option<Vec<RestParseFailure>>,
}
Expand All @@ -54,16 +53,14 @@ impl RestIngestResponse {
pub(crate) fn from_ingest_v1(ingest_response: IngestResponse) -> Self {
Self {
num_docs_for_processing: ingest_response.num_docs_for_processing,
num_ingested_docs: None,
num_rejected_docs: None,
parse_failures: None,
..Default::default()
}
}

/// Generate a detailed failure description if `doc_batch_clone_opt.is_some()`
pub(crate) fn from_ingest_v2(
mut ingest_response: IngestResponseV2,
doc_batch_clone_opt: Option<DocBatchV2>,
doc_batch_clone_opt: Option<&DocBatchV2>,
num_docs_for_processing: u64,
) -> Result<Self, IngestServiceError> {
let num_responses = ingest_response.successes.len() + ingest_response.failures.len();
Expand All @@ -84,7 +81,7 @@ impl RestIngestResponse {
parse_failures: None,
};
if let Some(doc_batch) = doc_batch_clone_opt {
let docs: BTreeMap<DocUid, Bytes> = doc_batch.into_docs().collect();
let docs: BTreeMap<DocUid, Bytes> = doc_batch.docs().collect();
let mut parse_failures = Vec::with_capacity(success_resp.parse_failures.len());
for failure in success_resp.parse_failures {
let doc = docs.get(&failure.doc_uid()).ok_or_else(|| {
Expand All @@ -94,21 +91,17 @@ impl RestIngestResponse {
))
})?;
parse_failures.push(RestParseFailure {
reason: failure.reason(),
message: failure.message,
document: String::from_utf8(doc.to_vec()).unwrap(),
reason: ParseFailureReason::from_i32(failure.reason).unwrap_or_default(),
});
}
resp.parse_failures = Some(parse_failures);
}
Ok(resp)
}
}

impl Add for RestIngestResponse {
type Output = Self;

fn add(self, other: Self) -> Self {
pub fn merge(self, other: Self) -> Self {
Self {
num_docs_for_processing: self.num_docs_for_processing + other.num_docs_for_processing,
num_ingested_docs: apply_op(self.num_ingested_docs, other.num_ingested_docs, |a, b| {
Expand Down Expand Up @@ -214,7 +207,7 @@ mod tests {
}

#[test]
fn test_add_responses() {
fn test_merge_responses() {
let response1 = RestIngestResponse {
num_docs_for_processing: 10,
num_ingested_docs: Some(5),
Expand All @@ -235,12 +228,12 @@ mod tests {
reason: ParseFailureReason::InvalidJson,
}]),
};
let combined_response = response1 + response2;
assert_eq!(combined_response.num_docs_for_processing, 25);
assert_eq!(combined_response.num_ingested_docs.unwrap(), 15);
assert_eq!(combined_response.num_rejected_docs.unwrap(), 5);
let merged_response = response1.merge(response2);
assert_eq!(merged_response.num_docs_for_processing, 25);
assert_eq!(merged_response.num_ingested_docs.unwrap(), 15);
assert_eq!(merged_response.num_rejected_docs.unwrap(), 5);
assert_eq!(
combined_response.parse_failures.unwrap(),
merged_response.parse_failures.unwrap(),
vec![
RestParseFailure {
message: "error1".to_string(),
Expand Down
18 changes: 11 additions & 7 deletions quickwit/quickwit-serve/src/ingest_api/rest_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ struct IngestOptions {
#[serde(default)]
use_legacy_ingest: bool,
#[serde(default)]
detailed_parse_failures: bool,
detailed_response: bool,
}

impl IngestOptions {
Expand Down Expand Up @@ -175,9 +175,9 @@ async fn ingest_v1(
ingest_options: IngestOptions,
ingest_service: IngestServiceClient,
) -> Result<RestIngestResponse, IngestServiceError> {
if ingest_options.detailed_parse_failures {
return Err(IngestServiceError::Unsupported(
"detailed_parse_failures is not supported in ingest v1".to_string(),
if ingest_options.detailed_response {
return Err(IngestServiceError::BadRequest(
"detailed_response is not supported in ingest v1".to_string(),
));
}
// The size of the body should be an upper bound of the size of the batch. The removal of the
Expand Down Expand Up @@ -214,7 +214,7 @@ async fn ingest_v2(
return Ok(response);
};
let num_docs_for_processing = doc_batch.num_docs() as u64;
let doc_batch_clone_opt = if ingest_options.detailed_parse_failures {
let doc_batch_clone_opt = if ingest_options.detailed_response {
Some(doc_batch.clone())
} else {
None
Expand All @@ -231,7 +231,11 @@ async fn ingest_v2(
subrequests: vec![subrequest],
};
let response = ingest_router.ingest(request).await?;
RestIngestResponse::from_ingest_v2(response, doc_batch_clone_opt, num_docs_for_processing)
RestIngestResponse::from_ingest_v2(
response,
doc_batch_clone_opt.as_ref(),
num_docs_for_processing,
)
}

pub fn tail_handler(
Expand Down Expand Up @@ -572,7 +576,7 @@ pub(crate) mod tests {
false,
);
let resp = warp::test::request()
.path("/my-index/ingest?detailed_parse_failures=true")
.path("/my-index/ingest?detailed_response=true")
.method("POST")
.json(&true)
.body(r#"{"id": 1, "message": "push"}"#)
Expand Down

0 comments on commit 7855e44

Please sign in to comment.