Skip to content

Commit

Permalink
Add functions to get all the subjects, and all versions of a subject.…
Browse files Browse the repository at this point in the history
… As well as opening up the function to get a schema from schema registry.
  • Loading branch information
gklijs committed Oct 21, 2023
1 parent 13c9460 commit 96ccea2
Show file tree
Hide file tree
Showing 10 changed files with 292 additions and 36 deletions.
10 changes: 9 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ from the diagram.

## Example with consumer and producer using Avro (blocking)

Examples which does both consuming/decoding and producing/encoding. To use structs with Avro they must have an implementation
Examples which does both consuming/decoding and producing/encoding. To use structs with Avro they must have an
implementation
of either the `serde::Deserialize` or `serde::Serialize` trait to work. The examples are especially useful to update
from the 1.x.x version, when starting you probably want to use the async versions.

Expand Down Expand Up @@ -177,6 +178,13 @@ fn get_future_record_from_struct<'a>(
}
```

## Direct interaction with schema registry

Some functions have been opened so this library can be used to directly get all the subjects, all the version of a
subject, or the raw schema with a subject and version. For these see the
either [async](tests/async_impl/schema_registry_calls.rs) or [blocking](tests/blocking/schema_registry_calls.rs) version
of the integration tests.

## Example using to post schema to schema registry

```rust
Expand Down
118 changes: 105 additions & 13 deletions src/async_impl/schema_registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ use std::time::Duration;
use dashmap::DashMap;
use futures::future::{BoxFuture, FutureExt};
use futures::stream::{self, StreamExt};
use reqwest::header;
use reqwest::header::{HeaderName, ACCEPT, CONTENT_TYPE};
use reqwest::{header, RequestBuilder, Response};
use reqwest::{Client, ClientBuilder};
use serde_json::{json, Map, Value};

Expand Down Expand Up @@ -399,7 +399,7 @@ fn post_reference<'a>(
.boxed()
}

async fn perform_sr_call(
pub async fn perform_sr_call(
sr_settings: &SrSettings,
sr_call: SrCall<'_>,
) -> Result<RawRegisteredSchema, SRCError> {
Expand All @@ -420,6 +420,23 @@ async fn perform_sr_call(
}
}

async fn apply_authentication(
builder: RequestBuilder,
authentication: &SrAuthorization,
) -> Result<Response, reqwest::Error> {
match authentication {
SrAuthorization::None => builder.send().await,
SrAuthorization::Token(token) => builder.bearer_auth(token).send().await,
SrAuthorization::Basic(username, password) => {
let p = match password {
None => None,
Some(v) => Some(v),
};
builder.basic_auth(username, p).send().await
}
}
}

async fn perform_single_sr_call(
base_url: &str,
client: &Client,
Expand All @@ -437,17 +454,7 @@ async fn perform_single_sr_call(
.header(CONTENT_TYPE, "application/vnd.schemaregistry.v1+json")
.header(ACCEPT, "application/vnd.schemaregistry.v1+json"),
};
let call = match authentication {
SrAuthorization::None => builder.send().await,
SrAuthorization::Token(token) => builder.bearer_auth(token).send().await,
SrAuthorization::Basic(username, password) => {
let p = match password {
None => None,
Some(v) => Some(v),
};
builder.basic_auth(username, p).send().await
}
};
let call = apply_authentication(builder, authentication).await;
match call {
Ok(v) => match v.json::<RawRegisteredSchema>().await {
Ok(r) => Ok(r),
Expand All @@ -463,6 +470,91 @@ async fn perform_single_sr_call(
}
}

pub async fn get_all_subjects(sr_settings: &SrSettings) -> Result<Vec<String>, SRCError> {
let url_count = sr_settings.urls.len();
let mut n = 0;
loop {
let result = perform_single_subjects_call(
&sr_settings.urls[n],
&sr_settings.client,
&sr_settings.authorization,
)
.await;
if result.is_ok() || n + 1 == url_count {
break result;
}
n += 1
}
}

async fn perform_single_subjects_call(
base_url: &str,
client: &Client,
authentication: &SrAuthorization,
) -> Result<Vec<String>, SRCError> {
let url = format!("{}/subjects", base_url);
let builder = client.get(url);
let call = apply_authentication(builder, authentication).await;
match call {
Ok(v) => match v.json::<Vec<String>>().await {
Ok(r) => Ok(r),
Err(e) => Err(SRCError::non_retryable_with_cause(
e,
"could not parse to list of subjects, the http call failed, cause will give more information",
)),
},
Err(e) => Err(SRCError::retryable_with_cause(
e,
"http call to schema registry failed",
)),
}
}

pub async fn get_all_versions(
sr_settings: &SrSettings,
subject: String,
) -> Result<Vec<u32>, SRCError> {
let url_count = sr_settings.urls.len();
let mut n = 0;
loop {
let result = perform_single_versions_call(
&sr_settings.urls[n],
&sr_settings.client,
&sr_settings.authorization,
&subject,
)
.await;
if result.is_ok() || n + 1 == url_count {
break result;
}
n += 1
}
}

async fn perform_single_versions_call(
base_url: &str,
client: &Client,
authentication: &SrAuthorization,
subject: &String,
) -> Result<Vec<u32>, SRCError> {
let url = format!("{}/subjects/{}/versions", base_url, subject);
let builder = client.get(url);
let call = apply_authentication(builder, authentication).await;
match call {
Ok(v) => match v.json::<Vec<u32>>().await {
Ok(r) => Ok(r),
Err(e) => Err(SRCError::non_retryable_with_cause(
e,
"could not parse to list of versions, the http call failed, cause will give more information",
)),
},
Err(e) => Err(SRCError::retryable_with_cause(
e,
"http call to schema registry failed",
)),
}
}

#[cfg(test)]
mod tests {
use std::time::Duration;
Expand Down
113 changes: 100 additions & 13 deletions src/blocking/schema_registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::str;
use std::time::Duration;

use dashmap::DashMap;
use reqwest::blocking::{Client, ClientBuilder};
use reqwest::blocking::{Client, ClientBuilder, RequestBuilder, Response};
use reqwest::header;
use reqwest::header::{HeaderName, ACCEPT, CONTENT_TYPE};
use serde_json::{json, Map, Value};
Expand Down Expand Up @@ -385,7 +385,7 @@ fn post_reference(
})
}

fn perform_sr_call(
pub fn perform_sr_call(
sr_settings: &SrSettings,
sr_call: SrCall,
) -> Result<RawRegisteredSchema, SRCError> {
Expand All @@ -405,6 +405,23 @@ fn perform_sr_call(
}
}

fn apply_authentication(
builder: RequestBuilder,
authentication: &SrAuthorization,
) -> Result<Response, reqwest::Error> {
match authentication {
SrAuthorization::None => builder.send(),
SrAuthorization::Token(token) => builder.bearer_auth(token).send(),
SrAuthorization::Basic(username, password) => {
let p = match password {
None => None,
Some(v) => Some(v),
};
builder.basic_auth(username, p).send()
}
}
}

fn perform_single_sr_call(
base_url: &str,
client: &Client,
Expand All @@ -422,17 +439,7 @@ fn perform_single_sr_call(
.header(CONTENT_TYPE, "application/vnd.schemaregistry.v1+json")
.header(ACCEPT, "application/vnd.schemaregistry.v1+json"),
};
let call = match authentication {
SrAuthorization::None => builder.send(),
SrAuthorization::Token(token) => builder.bearer_auth(token).send(),
SrAuthorization::Basic(username, password) => {
let p = match password {
None => None,
Some(v) => Some(v),
};
builder.basic_auth(username, p).send()
}
};
let call = apply_authentication(builder, authentication);
match call {
Ok(v) => match v.json::<RawRegisteredSchema>() {
Ok(r) => Ok(r),
Expand All @@ -448,6 +455,86 @@ fn perform_single_sr_call(
}
}

pub fn get_all_subjects(sr_settings: &SrSettings) -> Result<Vec<String>, SRCError> {
let url_count = sr_settings.urls.len();
let mut n = 0;
loop {
let result = perform_single_subjects_call(
&sr_settings.urls[n],
&sr_settings.client,
&sr_settings.authorization,
);
if result.is_ok() || n + 1 == url_count {
break result;
}
n += 1
}
}

fn perform_single_subjects_call(
base_url: &str,
client: &Client,
authentication: &SrAuthorization,
) -> Result<Vec<String>, SRCError> {
let url = format!("{}/subjects", base_url);
let builder = client.get(url);
let call = apply_authentication(builder, authentication);
match call {
Ok(v) => match v.json::<Vec<String>>() {
Ok(r) => Ok(r),
Err(e) => Err(SRCError::non_retryable_with_cause(
e,
"could not parse to list of subjects, the http call failed, cause will give more information",
)),
},
Err(e) => Err(SRCError::retryable_with_cause(
e,
"http call to schema registry failed",
)),
}
}

pub fn get_all_versions(sr_settings: &SrSettings, subject: String) -> Result<Vec<u32>, SRCError> {
let url_count = sr_settings.urls.len();
let mut n = 0;
loop {
let result = perform_single_versions_call(
&sr_settings.urls[n],
&sr_settings.client,
&sr_settings.authorization,
&subject,
);
if result.is_ok() || n + 1 == url_count {
break result;
}
n += 1
}
}

fn perform_single_versions_call(
base_url: &str,
client: &Client,
authentication: &SrAuthorization,
subject: &String,
) -> Result<Vec<u32>, SRCError> {
let url = format!("{}/subjects/{}/versions", base_url, subject);
let builder = client.get(url);
let call = apply_authentication(builder, authentication);
match call {
Ok(v) => match v.json::<Vec<u32>>() {
Ok(r) => Ok(r),
Err(e) => Err(SRCError::non_retryable_with_cause(
e,
"could not parse to list of versions, the http call failed, cause will give more information",
)),
},
Err(e) => Err(SRCError::retryable_with_cause(
e,
"http call to schema registry failed",
)),
}
}

#[cfg(test)]
mod tests {
use std::time::Duration;
Expand Down
16 changes: 8 additions & 8 deletions src/schema_registry_common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,13 @@ pub struct RegisteredSchema {

#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub(crate) struct RawRegisteredSchema {
pub(crate) subject: Option<String>,
pub(crate) version: Option<u32>,
pub(crate) id: Option<u32>,
pub(crate) schema_type: Option<String>,
pub(crate) references: Option<Vec<RegisteredReference>>,
pub(crate) schema: Option<String>,
pub struct RawRegisteredSchema {
pub subject: Option<String>,
pub version: Option<u32>,
pub id: Option<u32>,
pub schema_type: Option<String>,
pub references: Option<Vec<RegisteredReference>>,
pub schema: Option<String>,
}

/// Intermediate result to just handle the byte transformation. When used in a decoder just the
Expand Down Expand Up @@ -158,7 +158,7 @@ impl SubjectNameStrategy {
}

#[derive(Debug, Clone, Copy)]
pub(crate) enum SrCall<'a> {
pub enum SrCall<'a> {
GetById(u32),
GetLatest(&'a str),
GetBySubjectAndVersion(&'a str, u32),
Expand Down
1 change: 1 addition & 0 deletions tests/async_impl/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
mod schema_registry_calls;
35 changes: 35 additions & 0 deletions tests/async_impl/schema_registry_calls.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
use schema_registry_converter::async_impl::schema_registry::{
get_all_subjects, get_all_versions, perform_sr_call, SrSettings,
};
use schema_registry_converter::schema_registry_common::SrCall;

fn get_schema_registry_url() -> String {
String::from("http://localhost:8081")
}

#[tokio::test]
async fn test_get_all_subjects() {
let sr_settings = SrSettings::new(get_schema_registry_url());
let result = get_all_subjects(&sr_settings).await.unwrap();
assert!(
result.contains(&String::from("testavro-value")),
"List of subjects contains testavro-value"
);
}

#[tokio::test]
async fn test_get_versions_for_testavro_value() {
let sr_settings = SrSettings::new(get_schema_registry_url());
let result = get_all_versions(&sr_settings, String::from("testavro-value"))
.await
.unwrap();
assert_eq!(vec![1], result, "List of version is just one");
}

#[tokio::test]
async fn test_get_schema_for_testavro_value() {
let sr_settings = SrSettings::new(get_schema_registry_url());
let sr_call = SrCall::GetBySubjectAndVersion("testavro-value", 1);
let result = perform_sr_call(&sr_settings, sr_call).await.unwrap();
assert_eq!(Some(1), result.version, "Returned schema has version 1");
}
1 change: 1 addition & 0 deletions tests/blocking/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@ pub mod kafka_producer;
mod proto_consumer;
#[cfg(feature = "proto_decoder")]
mod proto_tests;
mod schema_registry_calls;
Loading

0 comments on commit 96ccea2

Please sign in to comment.