Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade rs-consul to latest http, hyper, opentelemetry #46

Merged
merged 20 commits into from
Jul 24, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/format-code.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ on:
jobs:
format-code:
runs-on: "ubuntu-latest"
container: rust:1.77
container: rust:1.79

steps:
- name: Checkout the code on merge
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ on:
jobs:
lint:
runs-on: "ubuntu-latest"
container: rust:1.77
container: rust:1.79

steps:
- uses: actions/checkout@v2
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ jobs:
matrix:
features: ["", "--no-default-features --features rustls-native"]
runs-on: "ubuntu-latest"
container: rust:1.74
container: rust:1.79

steps:
- uses: actions/checkout@v2
Expand All @@ -26,7 +26,7 @@ jobs:
matrix:
features: ["", "--no-default-features --features rustls-native"]
runs-on: "ubuntu-latest"
container: rust:1.77
container: rust:1.79
services:
consul:
image: consul:1.11.11
Expand Down
6 changes: 3 additions & 3 deletions .github/workflows/publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ on:
jobs:
test:
runs-on: ubuntu-latest
container: rust:1.77
container: rust:1.79
services:
consul:
image: consul:1.11.11
Expand All @@ -25,7 +25,7 @@ jobs:

dry-run:
runs-on: ubuntu-latest
container: rust:1.77
container: rust:1.79

steps:
- uses: actions/checkout@v2
Expand All @@ -36,7 +36,7 @@ jobs:
publish:
needs: [test, dry-run]
runs-on: ubuntu-latest
container: rust:1.74
container: rust:1.79
environment: crates-publish

steps:
Expand Down
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,15 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

## Unreleased

## 0.7.0 - 2024-06-25

### Changed

- `opentelemetry` updated to version `0.23` from `0.22`.
- `http` updated to version `1.0` from `0.2`.
- `hyper` updated to version `1.0` from `0.14`.
- `hyper-rustls` updated to version `0.27` from `0.24`.

## 0.6.0 - 2024-04-01

### Changed
Expand Down
12 changes: 7 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "rs-consul"
version = "0.6.0"
version = "0.7.0"
authors = ["Roblox"]
edition = "2021"
description = "This crate provides access to a set of strongly typed apis to interact with consul (https://www.consul.io/)"
Expand All @@ -20,11 +20,13 @@ trace = ["dep:opentelemetry"]
[dependencies]
base64 = "0.22"
futures = "0.3"
http = "0.2"
hyper = { version = "0.14", features = ["full"] }
hyper-rustls = { version = "0.24" }
http = "1"
http-body-util = "0.1"
hyper = { version = "1", features = ["full"] }
hyper-rustls = { version = "0.27" }
hyper-util = { version = "0.1", features = ["client", "client-legacy", "tokio", "http2"] }
lazy_static = { version = "1", optional = true }
opentelemetry = { version = "0.22", optional = true }
opentelemetry = { version = "0.23", optional = true }
prometheus = { version = "0.13", optional = true }
quick-error = "2"
serde = { version = "1.0", features = ["derive"] }
Expand Down
2 changes: 1 addition & 1 deletion rust-toolchain
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.77
1.79
1 change: 0 additions & 1 deletion src/hyper_wrapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/
#![cfg(feature = "trace")]
use hyper::Version;
use opentelemetry::{
global::{BoxedSpan, BoxedTracer},
Expand Down
92 changes: 60 additions & 32 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,18 @@ SOFTWARE.
//! This crate provides access to a set of strongly typed apis to interact with consul (https://www.consul.io/)
#![deny(missing_docs)]

use http_body_util::BodyExt;
use std::collections::HashMap;
use std::convert::Infallible;
use std::time::{Duration, Instant};
use std::{env, str::Utf8Error};

use base64::Engine;
use hyper::{body::Buf, client::HttpConnector, Body, Method};
use http_body_util::combinators::BoxBody;
use http_body_util::{Empty, Full};
use hyper::body::Bytes;
use hyper::{body::Buf, Method};
use hyper_util::client::legacy::{connect::HttpConnector, Builder, Client};
#[cfg(any(feature = "rustls-native", feature = "rustls-webpki"))]
#[cfg(feature = "metrics")]
use lazy_static::lazy_static;
Expand Down Expand Up @@ -66,7 +72,7 @@ quick_error! {
/// The request was invalid and could not be converted into a proper http request.
RequestError(err: http::Error) {}
/// The consul server response could not be converted into a proper http response.
ResponseError(err: hyper::Error) {}
ResponseError(err: hyper_util::client::legacy::Error) {}
/// The consul server response was invalid.
InvalidResponse(err: hyper::Error) {}
/// The consul server response could not be deserialized from json.
Expand Down Expand Up @@ -151,7 +157,7 @@ const GET_SESSION_METHOD_NAME: &str = "get_session";
pub(crate) type Result<T> = std::result::Result<T, ConsulError>;

/// The config necessary to create a new consul client.
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Config {
/// The address of the consul server. This must include the protocol to connect over eg. http or https.
pub address: String,
Expand All @@ -160,7 +166,12 @@ pub struct Config {

/// The hyper builder for the internal http client.
#[serde(skip)]
pub hyper_builder: hyper::client::Builder,
#[serde(default = "default_builder")]
pub hyper_builder: hyper_util::client::legacy::Builder,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is the right way to get rid of legacy?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no replacement at the moment. The client has some issues which is why it has been moved out of hyper to hyper-util. A new client will be built at some point.

}

fn default_builder() -> Builder {
Builder::new(hyper_util::rt::TokioExecutor::new())
}

impl Config {
Expand All @@ -176,7 +187,7 @@ impl Config {
Config {
address: addr,
token: Some(token),
hyper_builder: Default::default(),
hyper_builder: Builder::new(hyper_util::rt::TokioExecutor::new()),
}
}
}
Expand Down Expand Up @@ -224,42 +235,44 @@ impl Drop for Lock<'_> {
#[derive(Debug)]
/// This struct defines the consul client and allows access to the consul api via method syntax.
pub struct Consul {
https_client: hyper::Client<hyper_rustls::HttpsConnector<HttpConnector>, Body>,
https_client: Client<hyper_rustls::HttpsConnector<HttpConnector>, BoxBody<Bytes, Infallible>>,
config: Config,
#[cfg(feature = "trace")]
tracer: BoxedTracer,
}

fn https_connector() -> hyper_rustls::HttpsConnector<HttpConnector> {
fn https_connector() -> Result<hyper_rustls::HttpsConnector<HttpConnector>> {
#[cfg(feature = "rustls-webpki")]
return hyper_rustls::HttpsConnectorBuilder::new()
return Ok(hyper_rustls::HttpsConnectorBuilder::new()
.with_webpki_roots()
.https_or_http()
.enable_http1()
.build();
.build());
#[allow(unreachable_code)]
// Clippy doesn't realize if the feature is disabled, this code would execute.
hyper_rustls::HttpsConnectorBuilder::new()
.with_native_roots()
Ok(hyper_rustls::HttpsConnectorBuilder::new()
.with_native_roots()?
.https_or_http()
.enable_http1()
.build()
.build())
}

impl Consul {
/// Creates a new instance of [`Consul`](consul::Consul).
/// This is the entry point for this crate.
/// #Arguments:
/// - [Config](consul::Config)
pub fn new(config: Config) -> Self {
let https = https_connector();
let https_client = config.hyper_builder.build::<_, hyper::Body>(https);
Consul {
pub fn new(config: Config) -> Result<Self> {
let https = https_connector()?;
let https_client = config
.hyper_builder
.build::<_, BoxBody<Bytes, Infallible>>(https);
Ok(Consul {
https_client,
config,
#[cfg(feature = "trace")]
tracer: global::tracer("consul"),
}
})
}

/// Reads a key from Consul's KV store. See the [consul docs](https://www.consul.io/api-docs/kv#read-key) for more information.
Expand All @@ -270,7 +283,12 @@ impl Consul {
pub async fn read_key(&self, request: ReadKeyRequest<'_>) -> Result<Vec<ReadKeyResponse>> {
let req = self.build_read_key_req(request);
let (mut response_body, _index) = self
.execute_request(req, hyper::Body::empty(), None, READ_KEY_METHOD_NAME)
.execute_request(
req,
BoxBody::new(http_body_util::Empty::<Bytes>::new()),
None,
READ_KEY_METHOD_NAME,
)
.await?;
let bytes = response_body.copy_to_bytes(response_body.remaining());
serde_json::from_slice::<Vec<ReadKeyResponse>>(&bytes)
Expand Down Expand Up @@ -310,7 +328,7 @@ impl Consul {
let (mut response_body, index) = self
.execute_request(
req,
Body::from(value),
BoxBody::new(Full::<Bytes>::new(Bytes::from(value))),
None,
CREATE_OR_UPDATE_KEY_METHOD_NAME,
)
Expand Down Expand Up @@ -398,7 +416,12 @@ impl Consul {
url = add_namespace_and_datacenter(url, request.namespace, request.datacenter);
req = req.uri(url);
let (mut response_body, _index) = self
.execute_request(req, hyper::Body::empty(), None, DELETE_KEY_METHOD_NAME)
.execute_request(
req,
BoxBody::new(Empty::<Bytes>::new()),
None,
DELETE_KEY_METHOD_NAME,
)
.await?;
let bytes = response_body.copy_to_bytes(response_body.remaining());
serde_json::from_slice(&bytes).map_err(ConsulError::ResponseDeserializationFailed)
Expand Down Expand Up @@ -444,7 +467,7 @@ impl Consul {
let (_watch, index) = self
.execute_request(
lock_index_req,
hyper::Body::empty(),
BoxBody::new(http_body_util::Empty::<Bytes>::new()),
None,
GET_LOCK_METHOD_NAME,
)
Expand Down Expand Up @@ -486,7 +509,7 @@ impl Consul {
let payload = serde_json::to_string(payload).map_err(ConsulError::InvalidRequest)?;
self.execute_request(
request,
payload.into(),
BoxBody::new(Full::<Bytes>::new(Bytes::from(payload.into_bytes()))),
Some(Duration::from_secs(5)),
REGISTER_ENTITY_METHOD_NAME,
)
Expand All @@ -506,7 +529,7 @@ impl Consul {
let payload = serde_json::to_string(payload).map_err(ConsulError::InvalidRequest)?;
self.execute_request(
request,
payload.into(),
BoxBody::new(Full::<Bytes>::new(Bytes::from(payload.into_bytes()))),
Some(Duration::from_secs(5)),
DEREGISTER_ENTITY_METHOD_NAME,
)
Expand Down Expand Up @@ -534,7 +557,7 @@ impl Consul {
let (mut response_body, index) = self
.execute_request(
request,
hyper::Body::empty(),
BoxBody::new(Empty::<Bytes>::new()),
query_opts.timeout,
GET_ALL_REGISTERED_SERVICE_NAMES_METHOD_NAME,
)
Expand Down Expand Up @@ -566,7 +589,7 @@ impl Consul {
let (mut response_body, index) = self
.execute_request(
req,
hyper::Body::empty(),
BoxBody::new(Empty::<Bytes>::new()),
query_opts.timeout,
GET_SERVICE_NODES_METHOD_NAME,
)
Expand Down Expand Up @@ -684,7 +707,9 @@ impl Consul {
let (mut response_body, _index) = self
.execute_request(
req,
hyper::Body::from(create_session_json),
BoxBody::new(Full::<Bytes>::new(Bytes::from(
create_session_json.into_bytes(),
))),
None,
GET_SESSION_METHOD_NAME,
)
Expand Down Expand Up @@ -718,7 +743,7 @@ impl Consul {
async fn execute_request<'a>(
&self,
req: http::request::Builder,
body: hyper::Body,
body: BoxBody<Bytes, Infallible>,
duration: Option<std::time::Duration>,
request_name: &str,
) -> Result<(Box<dyn Buf>, u64)> {
Expand Down Expand Up @@ -764,9 +789,12 @@ impl Consul {
if status != hyper::StatusCode::OK {
record_failure_metric_if_enabled(&method, request_name);

let mut response_body = hyper::body::aggregate(response.into_body())
let mut response_body = response
.into_body()
.collect()
.await
.map_err(|e| ConsulError::UnexpectedResponseCode(status, e.to_string()))?;
.map_err(|e| ConsulError::UnexpectedResponseCode(status, e.to_string()))?
.aggregate();
let bytes = response_body.copy_to_bytes(response_body.remaining());
let resp = std::str::from_utf8(&bytes)
.map_err(|e| ConsulError::UnexpectedResponseCode(status, e.to_string()))?;
Expand All @@ -780,7 +808,7 @@ impl Consul {
None => 0,
};

match hyper::body::aggregate(response.into_body()).await {
match response.into_body().collect().await.map(|b| b.aggregate()) {
Ok(body) => Ok((Box::new(body), index)),
Err(e) => {
record_failure_metric_if_enabled(&method, request_name);
Expand Down Expand Up @@ -974,7 +1002,7 @@ mod tests {
.iter()
.map(|sn| sn.service.address.clone())
.collect();
let expected_addresses = vec![
let expected_addresses = [
"1.1.1.1".to_string(),
"2.2.2.2".to_string(),
"3.3.3.3".to_string(),
Expand Down Expand Up @@ -1261,7 +1289,7 @@ mod tests {

fn get_client() -> Consul {
let conf: Config = Config::from_env();
Consul::new(conf)
Consul::new(conf).unwrap()
}

async fn create_or_update_key_value(
Expand Down
Loading