From 96ccea2e49102018666ab77696e04f0798d1960c Mon Sep 17 00:00:00 2001 From: Gerard Klijs Date: Sat, 21 Oct 2023 13:38:07 +0200 Subject: [PATCH] Add functions to get all the subjects, and all versions of a subject. As well as opening up the function to get a schema from schema registry. --- README.md | 10 +- src/async_impl/schema_registry.rs | 118 +++++++++++++++++++--- src/blocking/schema_registry.rs | 113 ++++++++++++++++++--- src/schema_registry_common.rs | 16 +-- tests/async_impl/mod.rs | 1 + tests/async_impl/schema_registry_calls.rs | 35 +++++++ tests/blocking/mod.rs | 1 + tests/blocking/proto_tests.rs | 2 +- tests/blocking/schema_registry_calls.rs | 30 ++++++ tests/lib.rs | 2 + 10 files changed, 292 insertions(+), 36 deletions(-) create mode 100644 tests/async_impl/mod.rs create mode 100644 tests/async_impl/schema_registry_calls.rs create mode 100644 tests/blocking/schema_registry_calls.rs diff --git a/README.md b/README.md index 33a1f74..5b00e0d 100644 --- a/README.md +++ b/README.md @@ -81,7 +81,8 @@ from the diagram. ## Example with consumer and producer using Avro (blocking) -Examples which does both consuming/decoding and producing/encoding. To use structs with Avro they must have an implementation +Examples which does both consuming/decoding and producing/encoding. To use structs with Avro they must have an +implementation of either the `serde::Deserialize` or `serde::Serialize` trait to work. The examples are especially useful to update from the 1.x.x version, when starting you probably want to use the async versions. @@ -177,6 +178,13 @@ fn get_future_record_from_struct<'a>( } ``` +## Direct interaction with schema registry + +Some functions have been opened so this library can be used to directly get all the subjects, all the version of a +subject, or the raw schema with a subject and version. For these see the +either [async](tests/async_impl/schema_registry_calls.rs) or [blocking](tests/blocking/schema_registry_calls.rs) version +of the integration tests. + ## Example using to post schema to schema registry ```rust diff --git a/src/async_impl/schema_registry.rs b/src/async_impl/schema_registry.rs index 69668a2..0084f7a 100644 --- a/src/async_impl/schema_registry.rs +++ b/src/async_impl/schema_registry.rs @@ -5,8 +5,8 @@ use std::time::Duration; use dashmap::DashMap; use futures::future::{BoxFuture, FutureExt}; use futures::stream::{self, StreamExt}; -use reqwest::header; use reqwest::header::{HeaderName, ACCEPT, CONTENT_TYPE}; +use reqwest::{header, RequestBuilder, Response}; use reqwest::{Client, ClientBuilder}; use serde_json::{json, Map, Value}; @@ -399,7 +399,7 @@ fn post_reference<'a>( .boxed() } -async fn perform_sr_call( +pub async fn perform_sr_call( sr_settings: &SrSettings, sr_call: SrCall<'_>, ) -> Result { @@ -420,6 +420,23 @@ async fn perform_sr_call( } } +async fn apply_authentication( + builder: RequestBuilder, + authentication: &SrAuthorization, +) -> Result { + match authentication { + SrAuthorization::None => builder.send().await, + SrAuthorization::Token(token) => builder.bearer_auth(token).send().await, + SrAuthorization::Basic(username, password) => { + let p = match password { + None => None, + Some(v) => Some(v), + }; + builder.basic_auth(username, p).send().await + } + } +} + async fn perform_single_sr_call( base_url: &str, client: &Client, @@ -437,17 +454,7 @@ async fn perform_single_sr_call( .header(CONTENT_TYPE, "application/vnd.schemaregistry.v1+json") .header(ACCEPT, "application/vnd.schemaregistry.v1+json"), }; - let call = match authentication { - SrAuthorization::None => builder.send().await, - SrAuthorization::Token(token) => builder.bearer_auth(token).send().await, - SrAuthorization::Basic(username, password) => { - let p = match password { - None => None, - Some(v) => Some(v), - }; - builder.basic_auth(username, p).send().await - } - }; + let call = apply_authentication(builder, authentication).await; match call { Ok(v) => match v.json::().await { Ok(r) => Ok(r), @@ -463,6 +470,91 @@ async fn perform_single_sr_call( } } +pub async fn get_all_subjects(sr_settings: &SrSettings) -> Result, SRCError> { + let url_count = sr_settings.urls.len(); + let mut n = 0; + loop { + let result = perform_single_subjects_call( + &sr_settings.urls[n], + &sr_settings.client, + &sr_settings.authorization, + ) + .await; + if result.is_ok() || n + 1 == url_count { + break result; + } + n += 1 + } +} + +async fn perform_single_subjects_call( + base_url: &str, + client: &Client, + authentication: &SrAuthorization, +) -> Result, SRCError> { + let url = format!("{}/subjects", base_url); + let builder = client.get(url); + let call = apply_authentication(builder, authentication).await; + match call { + Ok(v) => match v.json::>().await { + Ok(r) => Ok(r), + Err(e) => Err(SRCError::non_retryable_with_cause( + e, + "could not parse to list of subjects, the http call failed, cause will give more information", + )), + }, + Err(e) => Err(SRCError::retryable_with_cause( + e, + "http call to schema registry failed", + )), + } +} + +pub async fn get_all_versions( + sr_settings: &SrSettings, + subject: String, +) -> Result, SRCError> { + let url_count = sr_settings.urls.len(); + let mut n = 0; + loop { + let result = perform_single_versions_call( + &sr_settings.urls[n], + &sr_settings.client, + &sr_settings.authorization, + &subject, + ) + .await; + if result.is_ok() || n + 1 == url_count { + break result; + } + n += 1 + } +} + +async fn perform_single_versions_call( + base_url: &str, + client: &Client, + authentication: &SrAuthorization, + subject: &String, +) -> Result, SRCError> { + let url = format!("{}/subjects/{}/versions", base_url, subject); + let builder = client.get(url); + let call = apply_authentication(builder, authentication).await; + match call { + Ok(v) => match v.json::>().await { + Ok(r) => Ok(r), + Err(e) => Err(SRCError::non_retryable_with_cause( + e, + "could not parse to list of versions, the http call failed, cause will give more information", + )), + }, + Err(e) => Err(SRCError::retryable_with_cause( + e, + "http call to schema registry failed", + )), + } +} + #[cfg(test)] mod tests { use std::time::Duration; diff --git a/src/blocking/schema_registry.rs b/src/blocking/schema_registry.rs index 3da600b..f739a1c 100644 --- a/src/blocking/schema_registry.rs +++ b/src/blocking/schema_registry.rs @@ -4,7 +4,7 @@ use std::str; use std::time::Duration; use dashmap::DashMap; -use reqwest::blocking::{Client, ClientBuilder}; +use reqwest::blocking::{Client, ClientBuilder, RequestBuilder, Response}; use reqwest::header; use reqwest::header::{HeaderName, ACCEPT, CONTENT_TYPE}; use serde_json::{json, Map, Value}; @@ -385,7 +385,7 @@ fn post_reference( }) } -fn perform_sr_call( +pub fn perform_sr_call( sr_settings: &SrSettings, sr_call: SrCall, ) -> Result { @@ -405,6 +405,23 @@ fn perform_sr_call( } } +fn apply_authentication( + builder: RequestBuilder, + authentication: &SrAuthorization, +) -> Result { + match authentication { + SrAuthorization::None => builder.send(), + SrAuthorization::Token(token) => builder.bearer_auth(token).send(), + SrAuthorization::Basic(username, password) => { + let p = match password { + None => None, + Some(v) => Some(v), + }; + builder.basic_auth(username, p).send() + } + } +} + fn perform_single_sr_call( base_url: &str, client: &Client, @@ -422,17 +439,7 @@ fn perform_single_sr_call( .header(CONTENT_TYPE, "application/vnd.schemaregistry.v1+json") .header(ACCEPT, "application/vnd.schemaregistry.v1+json"), }; - let call = match authentication { - SrAuthorization::None => builder.send(), - SrAuthorization::Token(token) => builder.bearer_auth(token).send(), - SrAuthorization::Basic(username, password) => { - let p = match password { - None => None, - Some(v) => Some(v), - }; - builder.basic_auth(username, p).send() - } - }; + let call = apply_authentication(builder, authentication); match call { Ok(v) => match v.json::() { Ok(r) => Ok(r), @@ -448,6 +455,86 @@ fn perform_single_sr_call( } } +pub fn get_all_subjects(sr_settings: &SrSettings) -> Result, SRCError> { + let url_count = sr_settings.urls.len(); + let mut n = 0; + loop { + let result = perform_single_subjects_call( + &sr_settings.urls[n], + &sr_settings.client, + &sr_settings.authorization, + ); + if result.is_ok() || n + 1 == url_count { + break result; + } + n += 1 + } +} + +fn perform_single_subjects_call( + base_url: &str, + client: &Client, + authentication: &SrAuthorization, +) -> Result, SRCError> { + let url = format!("{}/subjects", base_url); + let builder = client.get(url); + let call = apply_authentication(builder, authentication); + match call { + Ok(v) => match v.json::>() { + Ok(r) => Ok(r), + Err(e) => Err(SRCError::non_retryable_with_cause( + e, + "could not parse to list of subjects, the http call failed, cause will give more information", + )), + }, + Err(e) => Err(SRCError::retryable_with_cause( + e, + "http call to schema registry failed", + )), + } +} + +pub fn get_all_versions(sr_settings: &SrSettings, subject: String) -> Result, SRCError> { + let url_count = sr_settings.urls.len(); + let mut n = 0; + loop { + let result = perform_single_versions_call( + &sr_settings.urls[n], + &sr_settings.client, + &sr_settings.authorization, + &subject, + ); + if result.is_ok() || n + 1 == url_count { + break result; + } + n += 1 + } +} + +fn perform_single_versions_call( + base_url: &str, + client: &Client, + authentication: &SrAuthorization, + subject: &String, +) -> Result, SRCError> { + let url = format!("{}/subjects/{}/versions", base_url, subject); + let builder = client.get(url); + let call = apply_authentication(builder, authentication); + match call { + Ok(v) => match v.json::>() { + Ok(r) => Ok(r), + Err(e) => Err(SRCError::non_retryable_with_cause( + e, + "could not parse to list of versions, the http call failed, cause will give more information", + )), + }, + Err(e) => Err(SRCError::retryable_with_cause( + e, + "http call to schema registry failed", + )), + } +} + #[cfg(test)] mod tests { use std::time::Duration; diff --git a/src/schema_registry_common.rs b/src/schema_registry_common.rs index 4a0e467..69a8021 100644 --- a/src/schema_registry_common.rs +++ b/src/schema_registry_common.rs @@ -72,13 +72,13 @@ pub struct RegisteredSchema { #[derive(Debug, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] -pub(crate) struct RawRegisteredSchema { - pub(crate) subject: Option, - pub(crate) version: Option, - pub(crate) id: Option, - pub(crate) schema_type: Option, - pub(crate) references: Option>, - pub(crate) schema: Option, +pub struct RawRegisteredSchema { + pub subject: Option, + pub version: Option, + pub id: Option, + pub schema_type: Option, + pub references: Option>, + pub schema: Option, } /// Intermediate result to just handle the byte transformation. When used in a decoder just the @@ -158,7 +158,7 @@ impl SubjectNameStrategy { } #[derive(Debug, Clone, Copy)] -pub(crate) enum SrCall<'a> { +pub enum SrCall<'a> { GetById(u32), GetLatest(&'a str), GetBySubjectAndVersion(&'a str, u32), diff --git a/tests/async_impl/mod.rs b/tests/async_impl/mod.rs new file mode 100644 index 0000000..51b551d --- /dev/null +++ b/tests/async_impl/mod.rs @@ -0,0 +1 @@ +mod schema_registry_calls; diff --git a/tests/async_impl/schema_registry_calls.rs b/tests/async_impl/schema_registry_calls.rs new file mode 100644 index 0000000..884756c --- /dev/null +++ b/tests/async_impl/schema_registry_calls.rs @@ -0,0 +1,35 @@ +use schema_registry_converter::async_impl::schema_registry::{ + get_all_subjects, get_all_versions, perform_sr_call, SrSettings, +}; +use schema_registry_converter::schema_registry_common::SrCall; + +fn get_schema_registry_url() -> String { + String::from("http://localhost:8081") +} + +#[tokio::test] +async fn test_get_all_subjects() { + let sr_settings = SrSettings::new(get_schema_registry_url()); + let result = get_all_subjects(&sr_settings).await.unwrap(); + assert!( + result.contains(&String::from("testavro-value")), + "List of subjects contains testavro-value" + ); +} + +#[tokio::test] +async fn test_get_versions_for_testavro_value() { + let sr_settings = SrSettings::new(get_schema_registry_url()); + let result = get_all_versions(&sr_settings, String::from("testavro-value")) + .await + .unwrap(); + assert_eq!(vec![1], result, "List of version is just one"); +} + +#[tokio::test] +async fn test_get_schema_for_testavro_value() { + let sr_settings = SrSettings::new(get_schema_registry_url()); + let sr_call = SrCall::GetBySubjectAndVersion("testavro-value", 1); + let result = perform_sr_call(&sr_settings, sr_call).await.unwrap(); + assert_eq!(Some(1), result.version, "Returned schema has version 1"); +} diff --git a/tests/blocking/mod.rs b/tests/blocking/mod.rs index c2c7296..150887d 100644 --- a/tests/blocking/mod.rs +++ b/tests/blocking/mod.rs @@ -8,3 +8,4 @@ pub mod kafka_producer; mod proto_consumer; #[cfg(feature = "proto_decoder")] mod proto_tests; +mod schema_registry_calls; diff --git a/tests/blocking/proto_tests.rs b/tests/blocking/proto_tests.rs index a6e29a1..6b4e79c 100644 --- a/tests/blocking/proto_tests.rs +++ b/tests/blocking/proto_tests.rs @@ -4,7 +4,7 @@ use protofish::decode::Value; use crate::blocking::proto_consumer::{consume_proto, DeserializedProtoRecord}; -fn get_schema_registry_url() -> String { +pub fn get_schema_registry_url() -> String { String::from("http://localhost:8081") } diff --git a/tests/blocking/schema_registry_calls.rs b/tests/blocking/schema_registry_calls.rs new file mode 100644 index 0000000..db2d39a --- /dev/null +++ b/tests/blocking/schema_registry_calls.rs @@ -0,0 +1,30 @@ +use crate::blocking::proto_tests::get_schema_registry_url; +use schema_registry_converter::blocking::schema_registry::{ + get_all_subjects, get_all_versions, perform_sr_call, SrSettings, +}; +use schema_registry_converter::schema_registry_common::SrCall; + +#[test] +fn test_get_all_subjects() { + let sr_settings = SrSettings::new(get_schema_registry_url()); + let result = get_all_subjects(&sr_settings).unwrap(); + assert!( + result.contains(&String::from("testavro-value")), + "List of subjects contains testavro-value" + ); +} + +#[test] +fn test_get_versions_for_testavro_value() { + let sr_settings = SrSettings::new(get_schema_registry_url()); + let result = get_all_versions(&sr_settings, String::from("testavro-value")).unwrap(); + assert_eq!(vec![1], result, "List of version is just one"); +} + +#[test] +fn test_get_schema_for_testavro_value() { + let sr_settings = SrSettings::new(get_schema_registry_url()); + let sr_call = SrCall::GetBySubjectAndVersion("testavro-value", 1); + let result = perform_sr_call(&sr_settings, sr_call).unwrap(); + assert_eq!(Some(1), result.version, "Returned schema has version 1"); +} diff --git a/tests/lib.rs b/tests/lib.rs index d48c005..b6a08da 100644 --- a/tests/lib.rs +++ b/tests/lib.rs @@ -1,3 +1,5 @@ +#[cfg(feature = "kafka_test")] +pub mod async_impl; #[cfg(all(feature = "blocking", feature = "kafka_test"))] pub mod blocking; #[cfg(all(feature = "proto_raw", feature = "kafka_test"))]