Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add functions to get all the subjects, and all versions of a subject.… #97

Merged
merged 1 commit into from
Oct 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ from the diagram.

## Example with consumer and producer using Avro (blocking)

Examples which does both consuming/decoding and producing/encoding. To use structs with Avro they must have an implementation
Examples which does both consuming/decoding and producing/encoding. To use structs with Avro they must have an
implementation
of either the `serde::Deserialize` or `serde::Serialize` trait to work. The examples are especially useful to update
from the 1.x.x version, when starting you probably want to use the async versions.

Expand Down Expand Up @@ -177,6 +178,13 @@ fn get_future_record_from_struct<'a>(
}
```

## Direct interaction with schema registry

Some functions have been opened so this library can be used to directly get all the subjects, all the version of a
subject, or the raw schema with a subject and version. For these see the
either [async](tests/async_impl/schema_registry_calls.rs) or [blocking](tests/blocking/schema_registry_calls.rs) version
of the integration tests.

## Example using to post schema to schema registry

```rust
Expand Down
118 changes: 105 additions & 13 deletions src/async_impl/schema_registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ use std::time::Duration;
use dashmap::DashMap;
use futures::future::{BoxFuture, FutureExt};
use futures::stream::{self, StreamExt};
use reqwest::header;
use reqwest::header::{HeaderName, ACCEPT, CONTENT_TYPE};
use reqwest::{header, RequestBuilder, Response};
use reqwest::{Client, ClientBuilder};
use serde_json::{json, Map, Value};

Expand Down Expand Up @@ -399,7 +399,7 @@ fn post_reference<'a>(
.boxed()
}

async fn perform_sr_call(
pub async fn perform_sr_call(
sr_settings: &SrSettings,
sr_call: SrCall<'_>,
) -> Result<RawRegisteredSchema, SRCError> {
Expand All @@ -420,6 +420,23 @@ async fn perform_sr_call(
}
}

async fn apply_authentication(
builder: RequestBuilder,
authentication: &SrAuthorization,
) -> Result<Response, reqwest::Error> {
match authentication {
SrAuthorization::None => builder.send().await,
SrAuthorization::Token(token) => builder.bearer_auth(token).send().await,
SrAuthorization::Basic(username, password) => {
let p = match password {
None => None,
Some(v) => Some(v),
};
builder.basic_auth(username, p).send().await
}
}
}

async fn perform_single_sr_call(
base_url: &str,
client: &Client,
Expand All @@ -437,17 +454,7 @@ async fn perform_single_sr_call(
.header(CONTENT_TYPE, "application/vnd.schemaregistry.v1+json")
.header(ACCEPT, "application/vnd.schemaregistry.v1+json"),
};
let call = match authentication {
SrAuthorization::None => builder.send().await,
SrAuthorization::Token(token) => builder.bearer_auth(token).send().await,
SrAuthorization::Basic(username, password) => {
let p = match password {
None => None,
Some(v) => Some(v),
};
builder.basic_auth(username, p).send().await
}
};
let call = apply_authentication(builder, authentication).await;
match call {
Ok(v) => match v.json::<RawRegisteredSchema>().await {
Ok(r) => Ok(r),
Expand All @@ -463,6 +470,91 @@ async fn perform_single_sr_call(
}
}

pub async fn get_all_subjects(sr_settings: &SrSettings) -> Result<Vec<String>, SRCError> {
let url_count = sr_settings.urls.len();
let mut n = 0;
loop {
let result = perform_single_subjects_call(
&sr_settings.urls[n],
&sr_settings.client,
&sr_settings.authorization,
)
.await;
if result.is_ok() || n + 1 == url_count {
break result;
}
n += 1
}
}

async fn perform_single_subjects_call(
base_url: &str,
client: &Client,
authentication: &SrAuthorization,
) -> Result<Vec<String>, SRCError> {
let url = format!("{}/subjects", base_url);
let builder = client.get(url);
let call = apply_authentication(builder, authentication).await;
match call {
Ok(v) => match v.json::<Vec<String>>().await {
Ok(r) => Ok(r),
Err(e) => Err(SRCError::non_retryable_with_cause(
e,
"could not parse to list of subjects, the http call failed, cause will give more information",
)),
},
Err(e) => Err(SRCError::retryable_with_cause(
e,
"http call to schema registry failed",
)),
}
}

pub async fn get_all_versions(
sr_settings: &SrSettings,
subject: String,
) -> Result<Vec<u32>, SRCError> {
let url_count = sr_settings.urls.len();
let mut n = 0;
loop {
let result = perform_single_versions_call(
&sr_settings.urls[n],
&sr_settings.client,
&sr_settings.authorization,
&subject,
)
.await;
if result.is_ok() || n + 1 == url_count {
break result;
}
n += 1
}
}

async fn perform_single_versions_call(
base_url: &str,
client: &Client,
authentication: &SrAuthorization,
subject: &String,
) -> Result<Vec<u32>, SRCError> {
let url = format!("{}/subjects/{}/versions", base_url, subject);
let builder = client.get(url);
let call = apply_authentication(builder, authentication).await;
match call {
Ok(v) => match v.json::<Vec<u32>>().await {
Ok(r) => Ok(r),
Err(e) => Err(SRCError::non_retryable_with_cause(
e,
"could not parse to list of versions, the http call failed, cause will give more information",
)),
},
Err(e) => Err(SRCError::retryable_with_cause(
e,
"http call to schema registry failed",
)),
}
}

#[cfg(test)]
mod tests {
use std::time::Duration;
Expand Down
113 changes: 100 additions & 13 deletions src/blocking/schema_registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::str;
use std::time::Duration;

use dashmap::DashMap;
use reqwest::blocking::{Client, ClientBuilder};
use reqwest::blocking::{Client, ClientBuilder, RequestBuilder, Response};
use reqwest::header;
use reqwest::header::{HeaderName, ACCEPT, CONTENT_TYPE};
use serde_json::{json, Map, Value};
Expand Down Expand Up @@ -385,7 +385,7 @@ fn post_reference(
})
}

fn perform_sr_call(
pub fn perform_sr_call(
sr_settings: &SrSettings,
sr_call: SrCall,
) -> Result<RawRegisteredSchema, SRCError> {
Expand All @@ -405,6 +405,23 @@ fn perform_sr_call(
}
}

fn apply_authentication(
builder: RequestBuilder,
authentication: &SrAuthorization,
) -> Result<Response, reqwest::Error> {
match authentication {
SrAuthorization::None => builder.send(),
SrAuthorization::Token(token) => builder.bearer_auth(token).send(),
SrAuthorization::Basic(username, password) => {
let p = match password {
None => None,
Some(v) => Some(v),
};
builder.basic_auth(username, p).send()
}
}
}

fn perform_single_sr_call(
base_url: &str,
client: &Client,
Expand All @@ -422,17 +439,7 @@ fn perform_single_sr_call(
.header(CONTENT_TYPE, "application/vnd.schemaregistry.v1+json")
.header(ACCEPT, "application/vnd.schemaregistry.v1+json"),
};
let call = match authentication {
SrAuthorization::None => builder.send(),
SrAuthorization::Token(token) => builder.bearer_auth(token).send(),
SrAuthorization::Basic(username, password) => {
let p = match password {
None => None,
Some(v) => Some(v),
};
builder.basic_auth(username, p).send()
}
};
let call = apply_authentication(builder, authentication);
match call {
Ok(v) => match v.json::<RawRegisteredSchema>() {
Ok(r) => Ok(r),
Expand All @@ -448,6 +455,86 @@ fn perform_single_sr_call(
}
}

pub fn get_all_subjects(sr_settings: &SrSettings) -> Result<Vec<String>, SRCError> {
let url_count = sr_settings.urls.len();
let mut n = 0;
loop {
let result = perform_single_subjects_call(
&sr_settings.urls[n],
&sr_settings.client,
&sr_settings.authorization,
);
if result.is_ok() || n + 1 == url_count {
break result;
}
n += 1
}
}

fn perform_single_subjects_call(
base_url: &str,
client: &Client,
authentication: &SrAuthorization,
) -> Result<Vec<String>, SRCError> {
let url = format!("{}/subjects", base_url);
let builder = client.get(url);
let call = apply_authentication(builder, authentication);
match call {
Ok(v) => match v.json::<Vec<String>>() {
Ok(r) => Ok(r),
Err(e) => Err(SRCError::non_retryable_with_cause(
e,
"could not parse to list of subjects, the http call failed, cause will give more information",
)),
},
Err(e) => Err(SRCError::retryable_with_cause(
e,
"http call to schema registry failed",
)),
}
}

pub fn get_all_versions(sr_settings: &SrSettings, subject: String) -> Result<Vec<u32>, SRCError> {
let url_count = sr_settings.urls.len();
let mut n = 0;
loop {
let result = perform_single_versions_call(
&sr_settings.urls[n],
&sr_settings.client,
&sr_settings.authorization,
&subject,
);
if result.is_ok() || n + 1 == url_count {
break result;
}
n += 1
}
}

fn perform_single_versions_call(
base_url: &str,
client: &Client,
authentication: &SrAuthorization,
subject: &String,
) -> Result<Vec<u32>, SRCError> {
let url = format!("{}/subjects/{}/versions", base_url, subject);
let builder = client.get(url);
let call = apply_authentication(builder, authentication);
match call {
Ok(v) => match v.json::<Vec<u32>>() {
Ok(r) => Ok(r),
Err(e) => Err(SRCError::non_retryable_with_cause(
e,
"could not parse to list of versions, the http call failed, cause will give more information",
)),
},
Err(e) => Err(SRCError::retryable_with_cause(
e,
"http call to schema registry failed",
)),
}
}

#[cfg(test)]
mod tests {
use std::time::Duration;
Expand Down
16 changes: 8 additions & 8 deletions src/schema_registry_common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,13 @@ pub struct RegisteredSchema {

#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub(crate) struct RawRegisteredSchema {
pub(crate) subject: Option<String>,
pub(crate) version: Option<u32>,
pub(crate) id: Option<u32>,
pub(crate) schema_type: Option<String>,
pub(crate) references: Option<Vec<RegisteredReference>>,
pub(crate) schema: Option<String>,
pub struct RawRegisteredSchema {
pub subject: Option<String>,
pub version: Option<u32>,
pub id: Option<u32>,
pub schema_type: Option<String>,
pub references: Option<Vec<RegisteredReference>>,
pub schema: Option<String>,
}

/// Intermediate result to just handle the byte transformation. When used in a decoder just the
Expand Down Expand Up @@ -158,7 +158,7 @@ impl SubjectNameStrategy {
}

#[derive(Debug, Clone, Copy)]
pub(crate) enum SrCall<'a> {
pub enum SrCall<'a> {
GetById(u32),
GetLatest(&'a str),
GetBySubjectAndVersion(&'a str, u32),
Expand Down
1 change: 1 addition & 0 deletions tests/async_impl/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
mod schema_registry_calls;
35 changes: 35 additions & 0 deletions tests/async_impl/schema_registry_calls.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
use schema_registry_converter::async_impl::schema_registry::{
get_all_subjects, get_all_versions, perform_sr_call, SrSettings,
};
use schema_registry_converter::schema_registry_common::SrCall;

fn get_schema_registry_url() -> String {
String::from("http://localhost:8081")
}

#[tokio::test]
async fn test_get_all_subjects() {
let sr_settings = SrSettings::new(get_schema_registry_url());
let result = get_all_subjects(&sr_settings).await.unwrap();
assert!(
result.contains(&String::from("testavro-value")),
"List of subjects contains testavro-value"
);
}

#[tokio::test]
async fn test_get_versions_for_testavro_value() {
let sr_settings = SrSettings::new(get_schema_registry_url());
let result = get_all_versions(&sr_settings, String::from("testavro-value"))
.await
.unwrap();
assert_eq!(vec![1], result, "List of version is just one");
}

#[tokio::test]
async fn test_get_schema_for_testavro_value() {
let sr_settings = SrSettings::new(get_schema_registry_url());
let sr_call = SrCall::GetBySubjectAndVersion("testavro-value", 1);
let result = perform_sr_call(&sr_settings, sr_call).await.unwrap();
assert_eq!(Some(1), result.version, "Returned schema has version 1");
}
1 change: 1 addition & 0 deletions tests/blocking/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@ pub mod kafka_producer;
mod proto_consumer;
#[cfg(feature = "proto_decoder")]
mod proto_tests;
mod schema_registry_calls;
Loading