diff --git a/src/servers/src/http/prom_store.rs b/src/servers/src/http/prom_store.rs index 9b00bdeaf6dc..fbc39c825af5 100644 --- a/src/servers/src/http/prom_store.rs +++ b/src/servers/src/http/prom_store.rs @@ -237,6 +237,14 @@ pub async fn remote_read( handler.read(request, query_ctx).await } +fn try_decompress(is_zstd: bool, body: &[u8]) -> Result { + Ok(Bytes::from(if is_zstd { + zstd_decompress(body)? + } else { + snappy_decompress(body)? + })) +} + async fn decode_remote_write_request( is_zstd: bool, body: Body, @@ -247,11 +255,18 @@ async fn decode_remote_write_request( .await .context(error::HyperSnafu)?; - let buf = Bytes::from(if is_zstd { - zstd_decompress(&body[..])? + // due to vmagent's limitation, there is a chance that vmagent is + // sending content type wrong so we have to apply a fallback with decoding + // the content in another method. + // + // see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5301 + // see https://github.com/GreptimeTeam/greptimedb/issues/3929 + let buf = if let Ok(buf) = try_decompress(is_zstd, &body[..]) { + buf } else { - snappy_decompress(&body[..])? - }); + // fallback to the other compression method + try_decompress(!is_zstd, &body[..])? + }; let mut request = PROM_WRITE_REQUEST_POOL.pull(PromWriteRequest::default); request diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index 49132fbc7bf2..3e055a12367b 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -963,5 +963,30 @@ pub async fn test_vm_proto_remote_write(store_type: StorageType) { .await; assert_eq!(res.status(), StatusCode::NO_CONTENT); + // also test fallback logic, vmagent could sent data in wrong content-type + // we encode it as zstd but send it as snappy + let compressed_request = + zstd::stream::encode_all(&serialized_request[..], 1).expect("Failed to encode zstd"); + + let res = client + .post("/v1/prometheus/write") + .header("Content-Encoding", "snappy") + .body(compressed_request) + .send() + .await; + assert_eq!(res.status(), StatusCode::NO_CONTENT); + + // reversed + let compressed_request = + prom_store::snappy_compress(&serialized_request[..]).expect("Failed to encode snappy"); + + let res = client + .post("/v1/prometheus/write") + .header("Content-Encoding", "zstd") + .body(compressed_request) + .send() + .await; + assert_eq!(res.status(), StatusCode::NO_CONTENT); + guard.remove_all().await; }