From d649ec87ce9352b0e03357c92a6829df98ec90ef Mon Sep 17 00:00:00 2001 From: Jari Maijenburg Date: Tue, 5 Mar 2024 18:29:02 +0100 Subject: [PATCH] Copied recent changes over to R4B and STU3 --- crates/fhir-sdk/src/client/r4b/mod.rs | 41 ++--- crates/fhir-sdk/src/client/r4b/patch.rs | 10 +- crates/fhir-sdk/src/client/r4b/search/mod.rs | 62 ++++++- .../fhir-sdk/src/client/r4b/search/paging.rs | 165 ++++++++--------- crates/fhir-sdk/src/client/r4b/transaction.rs | 8 +- crates/fhir-sdk/src/client/stu3/mod.rs | 43 ++--- crates/fhir-sdk/src/client/stu3/patch.rs | 10 +- crates/fhir-sdk/src/client/stu3/search/mod.rs | 62 ++++++- .../fhir-sdk/src/client/stu3/search/paging.rs | 167 +++++++++--------- .../fhir-sdk/src/client/stu3/search/params.rs | 32 ++-- .../fhir-sdk/src/client/stu3/transaction.rs | 8 +- crates/fhir-sdk/tests/client-r4b.rs | 62 ++++++- crates/fhir-sdk/tests/client-stu3.rs | 64 ++++++- 13 files changed, 475 insertions(+), 259 deletions(-) diff --git a/crates/fhir-sdk/src/client/r4b/mod.rs b/crates/fhir-sdk/src/client/r4b/mod.rs index 52a968bb..c451293c 100644 --- a/crates/fhir-sdk/src/client/r4b/mod.rs +++ b/crates/fhir-sdk/src/client/r4b/mod.rs @@ -14,7 +14,6 @@ use fhir_model::{ TypedResource, }, types::Reference, - JSON_MIME_TYPE, }, ParsedReference, }; @@ -28,28 +27,26 @@ use self::{ patch::{PatchViaFhir, PatchViaJson}, transaction::BatchTransaction, }; -use super::{misc, Client, Error, FhirR4B}; - -/// FHIR MIME-type this client uses. -const MIME_TYPE: &str = JSON_MIME_TYPE; +use super::{misc, Client, Error, FhirR4B, FhirVersion}; impl Client { /// Get the server's capabilities. Fails if the respective FHIR version is /// not supported at all. pub async fn capabilities(&self) -> Result { let url = self.url(&["metadata"]); - let request = self.0.client.get(url).header(header::ACCEPT, MIME_TYPE); - let response = self.run_request(request).await?; + self.fetch_resource(url).await + } + + async fn fetch_resource>(&self, url: Url) -> Result { + let response = self.fetch_url(url).await?; response.body().await } /// Read any resource from any URL. async fn read_generic>(&self, url: Url) -> Result, Error> { - let request = self.0.client.get(url).header(header::ACCEPT, MIME_TYPE); - - let response = self.run_request(request).await?; + let response = self.fetch_url(url).await?; if [StatusCode::NOT_FOUND, StatusCode::GONE].contains(&response.status()) { return Ok(None); @@ -118,8 +115,8 @@ impl Client { .0 .client .post(url) - .header(header::ACCEPT, MIME_TYPE) - .header(header::CONTENT_TYPE, MIME_TYPE) + .header(header::ACCEPT, FhirR4B::JSON_MIME_TYPE) + .header(header::CONTENT_TYPE, FhirR4B::JSON_MIME_TYPE) .json(resource); let response = self.run_request(request).await?; @@ -147,8 +144,8 @@ impl Client { .0 .client .put(url) - .header(header::ACCEPT, MIME_TYPE) - .header(header::CONTENT_TYPE, MIME_TYPE) + .header(header::ACCEPT, FhirR4B::JSON_MIME_TYPE) + .header(header::CONTENT_TYPE, FhirR4B::JSON_MIME_TYPE) .json(resource); if conditional { let version_id = resource @@ -186,7 +183,7 @@ impl Client { /// Delete a FHIR resource on the server. pub async fn delete(&self, resource_type: ResourceType, id: &str) -> Result<(), Error> { let url = self.url(&[resource_type.as_str(), id]); - let request = self.0.client.delete(url).header(header::ACCEPT, MIME_TYPE); + let request = self.0.client.delete(url).header(header::ACCEPT, FhirR4B::JSON_MIME_TYPE); let response = self.run_request(request).await?; @@ -207,22 +204,16 @@ impl Client { /// resources for an `Encounter` record. 
pub async fn operation_encounter_everything(&self, id: &str) -> Result { let url = self.url(&["Encounter", id, "$everything"]); - let request = self.0.client.get(url).header(header::ACCEPT, MIME_TYPE); - let response = self.run_request(request).await?; - - response.body().await + self.fetch_resource(url).await } /// Operation `$everything` on `Patient`, returning a Bundle with all /// resources for an `Patient` record. pub async fn operation_patient_everything(&self, id: &str) -> Result { let url = self.url(&["Patient", id, "$everything"]); - let request = self.0.client.get(url).header(header::ACCEPT, MIME_TYPE); - let response = self.run_request(request).await?; - - response.body().await + self.fetch_resource(url).await } /// Operation `$match` on `Patient`, returning matches for Patient records @@ -266,8 +257,8 @@ impl Client { .0 .client .post(url) - .header(header::ACCEPT, MIME_TYPE) - .header(header::CONTENT_TYPE, MIME_TYPE) + .header(header::ACCEPT, FhirR4B::JSON_MIME_TYPE) + .header(header::CONTENT_TYPE, FhirR4B::JSON_MIME_TYPE) .json(¶meters); let response = self.run_request(request).await?; diff --git a/crates/fhir-sdk/src/client/r4b/patch.rs b/crates/fhir-sdk/src/client/r4b/patch.rs index 7176e90e..a25482b5 100644 --- a/crates/fhir-sdk/src/client/r4b/patch.rs +++ b/crates/fhir-sdk/src/client/r4b/patch.rs @@ -6,7 +6,9 @@ use fhir_model::r4b::resources::{ use reqwest::header::{self, HeaderValue}; use serde::Serialize; -use super::{Client, Error, FhirR4B, MIME_TYPE}; +use crate::client::FhirVersion; + +use super::{Client, Error, FhirR4B}; /// Builder for a PATCH request via FHIRPath for a FHIR resource. #[derive(Debug, Clone)] @@ -234,8 +236,8 @@ impl<'a> PatchViaFhir<'a> { .0 .client .patch(url) - .header(header::ACCEPT, MIME_TYPE) - .header(header::CONTENT_TYPE, HeaderValue::from_static(MIME_TYPE)) + .header(header::ACCEPT, FhirR4B::JSON_MIME_TYPE) + .header(header::CONTENT_TYPE, HeaderValue::from_static(FhirR4B::JSON_MIME_TYPE)) .json(¶meters); let response = self.client.run_request(request).await?; @@ -359,7 +361,7 @@ impl<'a> PatchViaJson<'a> { .0 .client .patch(url) - .header(header::ACCEPT, MIME_TYPE) + .header(header::ACCEPT, FhirR4B::JSON_MIME_TYPE) .header(header::CONTENT_TYPE, HeaderValue::from_static("application/json-patch+json")) .json(&self.operations); diff --git a/crates/fhir-sdk/src/client/r4b/search/mod.rs b/crates/fhir-sdk/src/client/r4b/search/mod.rs index 636a3d2c..141705b8 100644 --- a/crates/fhir-sdk/src/client/r4b/search/mod.rs +++ b/crates/fhir-sdk/src/client/r4b/search/mod.rs @@ -1,27 +1,77 @@ //! Client search implementation. 
+use crate::client::search::NextPageCursor; use crate::client::{search::SearchExecutor, Client, Error, FhirR4B, SearchParameters}; -use fhir_model::r4b::resources::{DomainResource, NamedResource, Resource}; -use futures::Stream; -use paging::Paged; +use async_trait::async_trait; +use fhir_model::r4b::resources::{Bundle, DomainResource, NamedResource, Resource}; +use paging::{Page, Unpaged}; #[allow(unused_imports)] pub use params::*; +use reqwest::Url; +use tracing::warn; mod paging; mod params; +#[async_trait] impl SearchExecutor for Client where R: NamedResource + DomainResource + TryFrom + 'static, { - fn execute_search( + #[allow(refining_impl_trait)] + async fn search_paged( self, params: SearchParameters, - ) -> impl Stream> + Send + 'static { + page_size: Option, + ) -> Result<(Page, Option>), Error> { let mut url = self.url(&[R::TYPE.as_str()]); url.query_pairs_mut().extend_pairs(params.into_queries()).finish(); - Paged::new(self, url) + if let Some(page_size) = page_size { + url.query_pairs_mut().append_pair("_count", &page_size.to_string()); + } + + self.fetch_next_page(url).await } + + #[allow(refining_impl_trait)] + async fn fetch_next_page( + self, + url: Url, + ) -> Result<(Page, Option>), Error> { + let searchset: Bundle = self.fetch_resource(url).await?; + + let cursor = match find_next_page_url(&searchset) { + Some(Ok(u)) => Some(NextPageCursor::new(self.clone(), u)), + Some(Err(e)) => { + warn!("Unable to parse next page URL: {e}"); + + None + } + _ => None, + }; + + let page = Page::from_searchset(self, searchset); + + Ok((page, cursor)) + } + + #[allow(refining_impl_trait)] + async fn search_unpaged(self, params: SearchParameters) -> Result, Error> { + let mut url = self.url(&[R::TYPE.as_str()]); + url.query_pairs_mut().extend_pairs(params.into_queries()).finish(); + + let searchset: Bundle = self.fetch_resource(url.clone()).await?; + + Ok(Unpaged::from_searchset(self, searchset)) + } +} + +/// Find the URL of the next page of the results returned in the Bundle. +pub(self) fn find_next_page_url(bundle: &Bundle) -> Option> { + let url_str = + bundle.link.iter().flatten().find(|link| link.relation == "next").map(|link| &link.url)?; + + Some(Url::parse(url_str).map_err(|_| Error::UrlParse(url_str.to_string()))) } diff --git a/crates/fhir-sdk/src/client/r4b/search/paging.rs b/crates/fhir-sdk/src/client/r4b/search/paging.rs index 0dceeb9a..133c8a93 100644 --- a/crates/fhir-sdk/src/client/r4b/search/paging.rs +++ b/crates/fhir-sdk/src/client/r4b/search/paging.rs @@ -6,37 +6,37 @@ use fhir_model::r4b::{ codes::{BundleType, SearchEntryMode}, resources::{Bundle, BundleEntry, DomainResource, NamedResource, Resource, TypedResource}, }; -use futures::{future::BoxFuture, ready, FutureExt, Stream, StreamExt}; +use futures::{future::BoxFuture, ready, Future, FutureExt, Stream, StreamExt}; use reqwest::Url; -use crate::client::r4b::references::populate_reference_targets_internal; +use crate::client::{r4b::references::populate_reference_targets_internal, search::Paged}; -use super::{Client, Error, FhirR4B}; +use super::{find_next_page_url, Client, Error, FhirR4B}; -/// Results of a query that can be paged or given via URL only. The resources +/// Unwraps search pages into a single stream of resources. The resources /// can be consumed via the `Stream`/`StreamExt` traits. -pub struct Paged { +pub struct Unpaged { /// The FHIR client to make further requests for the next pages. client: Client, - /// The URL of the next page. This is opaque and can be anything the server - /// wants. 
The client ensures it accesses the same server only. - next_url: Option, - /// The current set of search matches. - matches: Option>, + /// The current page of matches. + page: Page, /// Current future to retrieve the next page. - future_next_page: Option>>, + future_next_page: Option, Error>>>, } -impl Paged { - /// Start up a new Paged stream. - pub(crate) fn new(client: Client, url: Url) -> Self { - let future_next_page = Some(fetch_resource(client.clone(), url).boxed()); +impl Unpaged +where + R: NamedResource + DomainResource + TryFrom + 'static, +{ + /// Start up a new Unpaged stream. + pub fn from_searchset(client: Client, searchset: Bundle) -> Self { + let page = Page::from_searchset(client.clone(), searchset); - Self { client, next_url: None, matches: None, future_next_page } + Self { client, page, future_next_page: None } } } -impl Stream for Paged +impl Stream for Unpaged where R: NamedResource + DomainResource + TryFrom + 'static, { @@ -46,95 +46,70 @@ where mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> Poll> { - let span = tracing::trace_span!("Paged::poll_next"); + let span = tracing::trace_span!("Unpaged::poll_next"); let _span_guard = span.enter(); // If there are still matches left, get the next one - if let Some(matches) = self.matches.as_mut() { - tracing::trace!("Paged::matches is set, polling for next match"); - if let Poll::Ready(res) = matches.poll_next_unpin(cx) { + if !self.page.is_empty() { + tracing::trace!("Unpaged::page is not empty, polling for next match"); + if let Poll::Ready(res) = self.page.poll_next_unpin(cx) { if let Some(r) = res { - tracing::debug!("Next match in Paged::matches available"); + tracing::debug!("Next match in Unpaged::matches available"); return Poll::Ready(Some(r)); } else { - tracing::debug!("Paged::matches is empty, waiting for next page"); - self.matches = None; + tracing::debug!("Unpaged::matches is empty, waiting for next page"); } } - // If there are no more matches and there is a next page future, check if it's ready - } else if let Some(future_next_page) = self.future_next_page.as_mut() { - tracing::trace!("Paged::future_next_page is set, polling for next page"); + } + + if let Some(future_next_page) = self.future_next_page.as_mut() { + tracing::trace!("Unpaged::future_next_page is set, polling for next page"); if let Poll::Ready(next_page) = future_next_page.as_mut().poll(cx) { self.future_next_page = None; tracing::debug!("Next page fetched and ready"); - // Get the Bundle or error out. - let bundle = match next_page { - Ok(bundle) => bundle, - Err(err) => return Poll::Ready(Some(Err(err))), - }; - - // Parse the next page's URL or error out. - if let Some(next_url_string) = find_next_page_url(&bundle) { - let Ok(next_url) = Url::parse(next_url_string) else { - tracing::error!("Could not parse next page URL"); - return Poll::Ready(Some(Err(Error::UrlParse(next_url_string.clone())))); - }; - self.next_url = Some(next_url); - } + self.page = match next_page { + Ok(page) => page, + Err(e) => { + tracing::error!("Fetching next page returned error: {}", e); - // Save the `BundleEntry`s. - self.matches = Some(SearchMatches::from_searchset(self.client.clone(), bundle)); + return Poll::Ready(Some(Err(e))); + } + }; } // Start retrieving the next page if we have a next URL and there is no next page being fetched. 
- } else if let Some(next_url) = self.next_url.as_ref() { + } else if let Some(next_page) = self.page.next_page() { tracing::debug!("Current page has next URL, starting to fetch next page"); - self.future_next_page = - Some(fetch_resource(self.client.clone(), next_url.clone()).boxed()); - self.next_url = None; + self.future_next_page = Some(next_page.boxed()); } // Else check if all resources were consumed or if we are waiting for new // resources to arrive. - if self.matches.is_some() { - tracing::trace!("Paged results waiting for remaining resources in current page"); + if !self.page.is_empty() { + tracing::trace!("Unpaged results waiting for remaining resources in current page"); cx.waker().wake_by_ref(); Poll::Pending } else if self.future_next_page.is_some() { - tracing::trace!("Paged results waiting for response to next URL fetch"); - cx.waker().wake_by_ref(); - Poll::Pending - } else if self.next_url.is_some() { - tracing::trace!("Paged results waiting to fetch next URL"); + tracing::trace!("Unpaged results waiting for response to next URL fetch"); cx.waker().wake_by_ref(); Poll::Pending } else { - tracing::debug!("Paged results exhausted"); + tracing::debug!("Unpaged results exhausted"); Poll::Ready(None) } } fn size_hint(&self) -> (usize, Option) { - if let Some(matches) = &self.matches { - let (page_min, page_max) = matches.size_hint(); + let (page_min, page_max) = self.page.size_hint(); - if self.next_url.is_some() { - return (page_min, None); - } else { - return (page_min, page_max); - } + if self.page.next_page().is_some() { + (page_min, None) + } else { + (page_min, page_max) } - - (0, None) } } -/// Find the URL of the next page of the results returned in the Bundle. -fn find_next_page_url(bundle: &Bundle) -> Option<&String> { - bundle.link.iter().flatten().find(|link| link.relation == "next").map(|link| &link.url) -} - -/// Query a resource from a given URL. async fn fetch_resource>( client: Client, url: Url, @@ -151,12 +126,11 @@ async fn fetch_resource>( resource.ok_or_else(|| Error::ResourceNotFound(url_str)) } -impl std::fmt::Debug for Paged { +impl std::fmt::Debug for Unpaged { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("Paged") + f.debug_struct("Unpaged") .field("client", &self.client) - .field("next_url", &self.next_url) - .field("matches", &self.matches.as_ref().map(|_| "_")) + .field("page", &self.page) .field("future_next_page", &self.future_next_page.as_ref().map(|_| "_")) .finish() } @@ -165,18 +139,18 @@ impl std::fmt::Debug for Paged { /// Stream that yields each match entry in a searchset Bundle. Tries to fetch entries from the fullUrl property /// if the resource field is empty. Fills resource reference target fields with the referred to resources, /// if available in the Bundle. 
-struct SearchMatches { +pub struct Page { client: Client, bundle: Bundle, matches: VecDeque, future_resource: Option>>, } -impl SearchMatches +impl Page where - R: NamedResource + DomainResource + TryFrom, + R: NamedResource + DomainResource + TryFrom + 'static, { - pub fn from_searchset(client: Client, bundle: Bundle) -> SearchMatches { + pub fn from_searchset(client: Client, bundle: Bundle) -> Page { assert!( bundle.r#type == BundleType::Searchset, "unable to get search matches from non-searchset Bundles" @@ -199,9 +173,31 @@ where Self { client, bundle, matches, future_resource: None } } + + fn is_empty(&self) -> bool { + self.matches.is_empty() + } } -impl Stream for SearchMatches +impl Paged for Page +where + R: NamedResource + DomainResource + TryFrom + 'static, +{ + fn next_page(&self) -> Option> + 'static> { + let next_url = find_next_page_url(&self.bundle)?; + let client = self.client.clone(); + + let fut = async move { + let searchset: Bundle = client.fetch_resource(next_url?).await?; + + Ok(Page::from_searchset(client, searchset)) + }; + + Some(fut.boxed()) + } +} + +impl Stream for Page where R: NamedResource + DomainResource + TryFrom + 'static, { @@ -278,3 +274,14 @@ where (size, Some(size)) } } + +impl std::fmt::Debug for Page { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Page") + .field("client", &self.client) + .field("bundle", &self.bundle) + .field("matches", &self.matches) + .field("future_resource", &self.future_resource.as_ref().map(|_| "_")) + .finish() + } +} diff --git a/crates/fhir-sdk/src/client/r4b/transaction.rs b/crates/fhir-sdk/src/client/r4b/transaction.rs index 8938650c..d1eef4b3 100644 --- a/crates/fhir-sdk/src/client/r4b/transaction.rs +++ b/crates/fhir-sdk/src/client/r4b/transaction.rs @@ -8,7 +8,9 @@ use fhir_model::r4b::{ use reqwest::header::{self, HeaderValue}; use uuid::Uuid; -use super::{Client, Error, FhirR4B, MIME_TYPE}; +use crate::client::FhirVersion; + +use super::{Client, Error, FhirR4B}; /// A batch/transaction request builder. #[derive(Debug, Clone)] @@ -145,8 +147,8 @@ impl BatchTransaction { .0 .client .post(url) - .header(header::ACCEPT, MIME_TYPE) - .header(header::CONTENT_TYPE, HeaderValue::from_static(MIME_TYPE)) + .header(header::ACCEPT, FhirR4B::JSON_MIME_TYPE) + .header(header::CONTENT_TYPE, HeaderValue::from_static(FhirR4B::JSON_MIME_TYPE)) .json(&bundle); let response = self.client.run_request(request).await?; diff --git a/crates/fhir-sdk/src/client/stu3/mod.rs b/crates/fhir-sdk/src/client/stu3/mod.rs index 0e2f56bd..82948596 100644 --- a/crates/fhir-sdk/src/client/stu3/mod.rs +++ b/crates/fhir-sdk/src/client/stu3/mod.rs @@ -1,4 +1,4 @@ -//! FHIR Stu3 client implementation. +//! FHIR STU3 client implementation. mod patch; mod references; @@ -13,7 +13,6 @@ use fhir_model::{ ParametersParameter, ParametersParameterValue, Patient, Resource, ResourceType, }, types::Reference, - JSON_MIME_TYPE, }, ParsedReference, }; @@ -27,28 +26,26 @@ use self::{ patch::{PatchViaFhir, PatchViaJson}, transaction::BatchTransaction, }; -use super::{misc, Client, Error, FhirStu3}; - -/// FHIR MIME-type this client uses. -const MIME_TYPE: &str = JSON_MIME_TYPE; +use super::{misc, Client, Error, FhirStu3, FhirVersion}; impl Client { /// Get the server's capabilities. Fails if the respective FHIR version is /// not supported at all. 
pub async fn capabilities(&self) -> Result { let url = self.url(&["metadata"]); - let request = self.0.client.get(url).header(header::ACCEPT, MIME_TYPE); - let response = self.run_request(request).await?; + self.fetch_resource(url).await + } + + async fn fetch_resource>(&self, url: Url) -> Result { + let response = self.fetch_url(url).await?; response.body().await } /// Read any resource from any URL. async fn read_generic>(&self, url: Url) -> Result, Error> { - let request = self.0.client.get(url).header(header::ACCEPT, MIME_TYPE); - - let response = self.run_request(request).await?; + let response = self.fetch_url(url).await?; if [StatusCode::NOT_FOUND, StatusCode::GONE].contains(&response.status()) { return Ok(None); @@ -109,8 +106,8 @@ impl Client { .0 .client .post(url) - .header(header::ACCEPT, MIME_TYPE) - .header(header::CONTENT_TYPE, MIME_TYPE) + .header(header::ACCEPT, FhirStu3::JSON_MIME_TYPE) + .header(header::CONTENT_TYPE, FhirStu3::JSON_MIME_TYPE) .json(resource); let response = self.run_request(request).await?; @@ -138,8 +135,8 @@ impl Client { .0 .client .put(url) - .header(header::ACCEPT, MIME_TYPE) - .header(header::CONTENT_TYPE, MIME_TYPE) + .header(header::ACCEPT, FhirStu3::JSON_MIME_TYPE) + .header(header::CONTENT_TYPE, FhirStu3::JSON_MIME_TYPE) .json(resource); if conditional { let version_id = resource @@ -177,7 +174,7 @@ impl Client { /// Delete a FHIR resource on the server. pub async fn delete(&self, resource_type: ResourceType, id: &str) -> Result<(), Error> { let url = self.url(&[resource_type.as_str(), id]); - let request = self.0.client.delete(url).header(header::ACCEPT, MIME_TYPE); + let request = self.0.client.delete(url).header(header::ACCEPT, FhirStu3::JSON_MIME_TYPE); let response = self.run_request(request).await?; @@ -198,22 +195,16 @@ impl Client { /// resources for an `Encounter` record. pub async fn operation_encounter_everything(&self, id: &str) -> Result { let url = self.url(&["Encounter", id, "$everything"]); - let request = self.0.client.get(url).header(header::ACCEPT, MIME_TYPE); - let response = self.run_request(request).await?; - - response.body().await + self.fetch_resource(url).await } /// Operation `$everything` on `Patient`, returning a Bundle with all /// resources for an `Patient` record. pub async fn operation_patient_everything(&self, id: &str) -> Result { let url = self.url(&["Patient", id, "$everything"]); - let request = self.0.client.get(url).header(header::ACCEPT, MIME_TYPE); - let response = self.run_request(request).await?; - - response.body().await + self.fetch_resource(url).await } /// Operation `$match` on `Patient`, returning matches for Patient records @@ -257,8 +248,8 @@ impl Client { .0 .client .post(url) - .header(header::ACCEPT, MIME_TYPE) - .header(header::CONTENT_TYPE, MIME_TYPE) + .header(header::ACCEPT, FhirStu3::JSON_MIME_TYPE) + .header(header::CONTENT_TYPE, FhirStu3::JSON_MIME_TYPE) .json(¶meters); let response = self.run_request(request).await?; diff --git a/crates/fhir-sdk/src/client/stu3/patch.rs b/crates/fhir-sdk/src/client/stu3/patch.rs index 4ae6c6e0..9abd6d05 100644 --- a/crates/fhir-sdk/src/client/stu3/patch.rs +++ b/crates/fhir-sdk/src/client/stu3/patch.rs @@ -6,7 +6,9 @@ use fhir_model::stu3::resources::{ use reqwest::header::{self, HeaderValue}; use serde::Serialize; -use super::{Client, Error, FhirStu3, MIME_TYPE}; +use crate::client::FhirVersion; + +use super::{Client, Error, FhirStu3}; /// Builder for a PATCH request via FHIRPath for a FHIR resource. 
#[derive(Debug, Clone)] @@ -234,8 +236,8 @@ impl<'a> PatchViaFhir<'a> { .0 .client .patch(url) - .header(header::ACCEPT, MIME_TYPE) - .header(header::CONTENT_TYPE, HeaderValue::from_static(MIME_TYPE)) + .header(header::ACCEPT, FhirStu3::JSON_MIME_TYPE) + .header(header::CONTENT_TYPE, HeaderValue::from_static(FhirStu3::JSON_MIME_TYPE)) .json(¶meters); let response = self.client.run_request(request).await?; @@ -359,7 +361,7 @@ impl<'a> PatchViaJson<'a> { .0 .client .patch(url) - .header(header::ACCEPT, MIME_TYPE) + .header(header::ACCEPT, FhirStu3::JSON_MIME_TYPE) .header(header::CONTENT_TYPE, HeaderValue::from_static("application/json-patch+json")) .json(&self.operations); diff --git a/crates/fhir-sdk/src/client/stu3/search/mod.rs b/crates/fhir-sdk/src/client/stu3/search/mod.rs index 37b3f402..361bea18 100644 --- a/crates/fhir-sdk/src/client/stu3/search/mod.rs +++ b/crates/fhir-sdk/src/client/stu3/search/mod.rs @@ -1,27 +1,77 @@ //! Client search implementation. +use crate::client::search::NextPageCursor; use crate::client::{search::SearchExecutor, Client, Error, FhirStu3, SearchParameters}; -use fhir_model::stu3::resources::{DomainResource, NamedResource, Resource}; -use futures::Stream; -use paging::Paged; +use async_trait::async_trait; +use fhir_model::stu3::resources::{Bundle, DomainResource, NamedResource, Resource}; +use paging::{Page, Unpaged}; #[allow(unused_imports)] pub use params::*; +use reqwest::Url; +use tracing::warn; mod paging; mod params; +#[async_trait] impl SearchExecutor for Client where R: NamedResource + DomainResource + TryFrom + 'static, { - fn execute_search( + #[allow(refining_impl_trait)] + async fn search_paged( self, params: SearchParameters, - ) -> impl Stream> + Send + 'static { + page_size: Option, + ) -> Result<(Page, Option>), Error> { let mut url = self.url(&[R::TYPE.as_str()]); url.query_pairs_mut().extend_pairs(params.into_queries()).finish(); - Paged::new(self, url) + if let Some(page_size) = page_size { + url.query_pairs_mut().append_pair("_count", &page_size.to_string()); + } + + self.fetch_next_page(url).await } + + #[allow(refining_impl_trait)] + async fn fetch_next_page( + self, + url: Url, + ) -> Result<(Page, Option>), Error> { + let searchset: Bundle = self.fetch_resource(url).await?; + + let cursor = match find_next_page_url(&searchset) { + Some(Ok(u)) => Some(NextPageCursor::new(self.clone(), u)), + Some(Err(e)) => { + warn!("Unable to parse next page URL: {e}"); + + None + } + _ => None, + }; + + let page = Page::from_searchset(self, searchset); + + Ok((page, cursor)) + } + + #[allow(refining_impl_trait)] + async fn search_unpaged(self, params: SearchParameters) -> Result, Error> { + let mut url = self.url(&[R::TYPE.as_str()]); + url.query_pairs_mut().extend_pairs(params.into_queries()).finish(); + + let searchset: Bundle = self.fetch_resource(url.clone()).await?; + + Ok(Unpaged::from_searchset(self, searchset)) + } +} + +/// Find the URL of the next page of the results returned in the Bundle. +pub(self) fn find_next_page_url(bundle: &Bundle) -> Option> { + let url_str = + bundle.link.iter().flatten().find(|link| link.relation == "next").map(|link| &link.url)?; + + Some(Url::parse(url_str).map_err(|_| Error::UrlParse(url_str.to_string()))) } diff --git a/crates/fhir-sdk/src/client/stu3/search/paging.rs b/crates/fhir-sdk/src/client/stu3/search/paging.rs index 08f124b9..1652be93 100644 --- a/crates/fhir-sdk/src/client/stu3/search/paging.rs +++ b/crates/fhir-sdk/src/client/stu3/search/paging.rs @@ -1,4 +1,4 @@ -//! 
FHIR paging functionality, e.g. for search results. +//! FHIR search paging functionality use std::{collections::VecDeque, pin::Pin, task::Poll}; @@ -6,37 +6,37 @@ use fhir_model::stu3::{ codes::{BundleType, SearchEntryMode}, resources::{Bundle, BundleEntry, DomainResource, NamedResource, Resource, TypedResource}, }; -use futures::{future::BoxFuture, ready, FutureExt, Stream, StreamExt}; +use futures::{future::BoxFuture, ready, Future, FutureExt, Stream, StreamExt}; use reqwest::Url; -use crate::client::stu3::references::populate_reference_targets_internal; +use crate::client::{stu3::references::populate_reference_targets_internal, search::Paged}; -use super::{Client, Error, FhirStu3}; +use super::{find_next_page_url, Client, Error, FhirStu3}; -/// Results of a query that can be paged or given via URL only. The resources +/// Unwraps search pages into a single stream of resources. The resources /// can be consumed via the `Stream`/`StreamExt` traits. -pub struct Paged { +pub struct Unpaged { /// The FHIR client to make further requests for the next pages. client: Client, - /// The URL of the next page. This is opaque and can be anything the server - /// wants. The client ensures it accesses the same server only. - next_url: Option, - /// The current set of search matches. - matches: Option>, + /// The current page of matches. + page: Page, /// Current future to retrieve the next page. - future_next_page: Option>>, + future_next_page: Option, Error>>>, } -impl Paged { - /// Start up a new Paged stream. - pub(crate) fn new(client: Client, url: Url) -> Self { - let future_next_page = Some(fetch_resource(client.clone(), url).boxed()); +impl Unpaged +where + R: NamedResource + DomainResource + TryFrom + 'static, +{ + /// Start up a new Unpaged stream. + pub fn from_searchset(client: Client, searchset: Bundle) -> Self { + let page = Page::from_searchset(client.clone(), searchset); - Self { client, next_url: None, matches: None, future_next_page } + Self { client, page, future_next_page: None } } } -impl Stream for Paged +impl Stream for Unpaged where R: NamedResource + DomainResource + TryFrom + 'static, { @@ -46,95 +46,70 @@ where mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> Poll> { - let span = tracing::trace_span!("Paged::poll_next"); + let span = tracing::trace_span!("Unpaged::poll_next"); let _span_guard = span.enter(); // If there are still matches left, get the next one - if let Some(matches) = self.matches.as_mut() { - tracing::trace!("Paged::matches is set, polling for next match"); - if let Poll::Ready(res) = matches.poll_next_unpin(cx) { + if !self.page.is_empty() { + tracing::trace!("Unpaged::page is not empty, polling for next match"); + if let Poll::Ready(res) = self.page.poll_next_unpin(cx) { if let Some(r) = res { - tracing::debug!("Next match in Paged::matches available"); + tracing::debug!("Next match in Unpaged::matches available"); return Poll::Ready(Some(r)); } else { - tracing::debug!("Paged::matches is empty, waiting for next page"); - self.matches = None; + tracing::debug!("Unpaged::matches is empty, waiting for next page"); } } - // If there are no more matches and there is a next page future, check if it's ready - } else if let Some(future_next_page) = self.future_next_page.as_mut() { - tracing::trace!("Paged::future_next_page is set, polling for next page"); + } + + if let Some(future_next_page) = self.future_next_page.as_mut() { + tracing::trace!("Unpaged::future_next_page is set, polling for next page"); if let Poll::Ready(next_page) = 
future_next_page.as_mut().poll(cx) { self.future_next_page = None; tracing::debug!("Next page fetched and ready"); - // Get the Bundle or error out. - let bundle = match next_page { - Ok(bundle) => bundle, - Err(err) => return Poll::Ready(Some(Err(err))), - }; - - // Parse the next page's URL or error out. - if let Some(next_url_string) = find_next_page_url(&bundle) { - let Ok(next_url) = Url::parse(next_url_string) else { - tracing::error!("Could not parse next page URL"); - return Poll::Ready(Some(Err(Error::UrlParse(next_url_string.clone())))); - }; - self.next_url = Some(next_url); - } + self.page = match next_page { + Ok(page) => page, + Err(e) => { + tracing::error!("Fetching next page returned error: {}", e); - // Save the `BundleEntry`s. - self.matches = Some(SearchMatches::from_searchset(self.client.clone(), bundle)); + return Poll::Ready(Some(Err(e))); + } + }; } // Start retrieving the next page if we have a next URL and there is no next page being fetched. - } else if let Some(next_url) = self.next_url.as_ref() { + } else if let Some(next_page) = self.page.next_page() { tracing::debug!("Current page has next URL, starting to fetch next page"); - self.future_next_page = - Some(fetch_resource(self.client.clone(), next_url.clone()).boxed()); - self.next_url = None; + self.future_next_page = Some(next_page.boxed()); } // Else check if all resources were consumed or if we are waiting for new // resources to arrive. - if self.matches.is_some() { - tracing::trace!("Paged results waiting for remaining resources in current page"); + if !self.page.is_empty() { + tracing::trace!("Unpaged results waiting for remaining resources in current page"); cx.waker().wake_by_ref(); Poll::Pending } else if self.future_next_page.is_some() { - tracing::trace!("Paged results waiting for response to next URL fetch"); - cx.waker().wake_by_ref(); - Poll::Pending - } else if self.next_url.is_some() { - tracing::trace!("Paged results waiting to fetch next URL"); + tracing::trace!("Unpaged results waiting for response to next URL fetch"); cx.waker().wake_by_ref(); Poll::Pending } else { - tracing::debug!("Paged results exhausted"); + tracing::debug!("Unpaged results exhausted"); Poll::Ready(None) } } fn size_hint(&self) -> (usize, Option) { - if let Some(matches) = &self.matches { - let (page_min, page_max) = matches.size_hint(); + let (page_min, page_max) = self.page.size_hint(); - if self.next_url.is_some() { - return (page_min, None); - } else { - return (page_min, page_max); - } + if self.page.next_page().is_some() { + (page_min, None) + } else { + (page_min, page_max) } - - (0, None) } } -/// Find the URL of the next page of the results returned in the Bundle. -fn find_next_page_url(bundle: &Bundle) -> Option<&String> { - bundle.link.iter().flatten().find(|link| link.relation == "next").map(|link| &link.url) -} - -/// Query a resource from a given URL. 
async fn fetch_resource>( client: Client, url: Url, @@ -151,12 +126,11 @@ async fn fetch_resource>( resource.ok_or_else(|| Error::ResourceNotFound(url_str)) } -impl std::fmt::Debug for Paged { +impl std::fmt::Debug for Unpaged { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("Paged") + f.debug_struct("Unpaged") .field("client", &self.client) - .field("next_url", &self.next_url) - .field("matches", &self.matches.as_ref().map(|_| "_")) + .field("page", &self.page) .field("future_next_page", &self.future_next_page.as_ref().map(|_| "_")) .finish() } @@ -165,18 +139,18 @@ impl std::fmt::Debug for Paged { /// Stream that yields each match entry in a searchset Bundle. Tries to fetch entries from the fullUrl property /// if the resource field is empty. Fills resource reference target fields with the referred to resources, /// if available in the Bundle. -struct SearchMatches { +pub struct Page { client: Client, bundle: Bundle, matches: VecDeque, future_resource: Option>>, } -impl SearchMatches +impl Page where - R: NamedResource + DomainResource + TryFrom, + R: NamedResource + DomainResource + TryFrom + 'static, { - pub fn from_searchset(client: Client, bundle: Bundle) -> SearchMatches { + pub fn from_searchset(client: Client, bundle: Bundle) -> Page { assert!( bundle.r#type == BundleType::Searchset, "unable to get search matches from non-searchset Bundles" @@ -199,9 +173,31 @@ where Self { client, bundle, matches, future_resource: None } } + + fn is_empty(&self) -> bool { + self.matches.is_empty() + } } -impl Stream for SearchMatches +impl Paged for Page +where + R: NamedResource + DomainResource + TryFrom + 'static, +{ + fn next_page(&self) -> Option> + 'static> { + let next_url = find_next_page_url(&self.bundle)?; + let client = self.client.clone(); + + let fut = async move { + let searchset: Bundle = client.fetch_resource(next_url?).await?; + + Ok(Page::from_searchset(client, searchset)) + }; + + Some(fut.boxed()) + } +} + +impl Stream for Page where R: NamedResource + DomainResource + TryFrom + 'static, { @@ -278,3 +274,14 @@ where (size, Some(size)) } } + +impl std::fmt::Debug for Page { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Page") + .field("client", &self.client) + .field("bundle", &self.bundle) + .field("matches", &self.matches) + .field("future_resource", &self.future_resource.as_ref().map(|_| "_")) + .finish() + } +} diff --git a/crates/fhir-sdk/src/client/stu3/search/params.rs b/crates/fhir-sdk/src/client/stu3/search/params.rs index 31a1224e..be663910 100644 --- a/crates/fhir-sdk/src/client/stu3/search/params.rs +++ b/crates/fhir-sdk/src/client/stu3/search/params.rs @@ -356,6 +356,22 @@ impl<'a> SearchParameter for UriParam<'a> { } } +/// Search on any item whether it is a missing field using the `missing` +/// modifier. +#[derive(Debug, Clone, Copy)] +pub struct MissingParam<'a> { + /// Name of the field. + pub name: &'a str, + /// Whether to search for the absent field (or the present). + pub missing: bool, +} + +impl<'a> SearchParameter for MissingParam<'a> { + fn into_query(self) -> (String, String) { + (format!("{}:missing", self.name), self.missing.to_string()) + } +} + /// Include referred to resources in the search response #[derive(Debug, Clone, Copy)] pub struct IncludeParam<'a> { @@ -393,22 +409,6 @@ impl<'a> SearchParameter for IncludeParam<'a> { } } -/// Search on any item whether it is a missing field using the `missing` -/// modifier. 
-#[derive(Debug, Clone, Copy)] -pub struct MissingParam<'a> { - /// Name of the field. - pub name: &'a str, - /// Whether to search for the absent field (or the present). - pub missing: bool, -} - -impl<'a> SearchParameter for MissingParam<'a> { - fn into_query(self) -> (String, String) { - (format!("{}:missing", self.name), self.missing.to_string()) - } -} - #[cfg(test)] mod tests { use super::*; diff --git a/crates/fhir-sdk/src/client/stu3/transaction.rs b/crates/fhir-sdk/src/client/stu3/transaction.rs index ae82fdef..ecbb8358 100644 --- a/crates/fhir-sdk/src/client/stu3/transaction.rs +++ b/crates/fhir-sdk/src/client/stu3/transaction.rs @@ -8,7 +8,9 @@ use fhir_model::stu3::{ use reqwest::header::{self, HeaderValue}; use uuid::Uuid; -use super::{Client, Error, FhirStu3, MIME_TYPE}; +use crate::client::FhirVersion; + +use super::{Client, Error, FhirStu3}; /// A batch/transaction request builder. #[derive(Debug, Clone)] @@ -145,8 +147,8 @@ impl BatchTransaction { .0 .client .post(url) - .header(header::ACCEPT, MIME_TYPE) - .header(header::CONTENT_TYPE, HeaderValue::from_static(MIME_TYPE)) + .header(header::ACCEPT, FhirStu3::JSON_MIME_TYPE) + .header(header::CONTENT_TYPE, HeaderValue::from_static(FhirStu3::JSON_MIME_TYPE)) .json(&bundle); let response = self.client.run_request(request).await?; diff --git a/crates/fhir-sdk/tests/client-r4b.rs b/crates/fhir-sdk/tests/client-r4b.rs index 25bd277c..612d6dec 100644 --- a/crates/fhir-sdk/tests/client-r4b.rs +++ b/crates/fhir-sdk/tests/client-r4b.rs @@ -260,6 +260,7 @@ async fn search_inner() -> Result<()> { }) .and(TokenParam::Standard { name: "active", system: None, code: Some("false"), not: false }) .send() + .await? .try_collect() .await?; @@ -340,11 +341,11 @@ async fn transaction_inner() -> Result<()> { } #[test] -fn paging() -> Result<()> { - common::RUNTIME.block_on(paging_inner()) +fn unpaged() -> Result<()> { + common::RUNTIME.block_on(unpaged_inner()) } -async fn paging_inner() -> Result<()> { +async fn unpaged_inner() -> Result<()> { let client = client().await?; let date = "5123-05-10"; @@ -367,6 +368,7 @@ async fn paging_inner() -> Result<()> { .search() .with(DateParam { name: "birthdate", comparator: Some(SearchComparator::Eq), value: date }) .send() + .await? 
.try_collect() .await?; assert_eq!(patients.len(), n); @@ -380,6 +382,58 @@ async fn paging_inner() -> Result<()> { Ok(()) } +#[test] +fn paged() -> Result<()> { + common::RUNTIME.block_on(paged_inner()) +} + +async fn paged_inner() -> Result<()> { + let client = client().await?; + + let date = "5123-06-10"; + let n = 30; + let page_size = 20; + + println!("Preparing.."); + let patient = Patient::builder() + .active(false) + .birth_date(Date::from_str(date).expect("parse Date")) + .build() + .unwrap(); + let mut batch = client.batch(); + for _ in 0..n { + batch.create(patient.clone()); + } + + let patients = batch.send().await?; + ensure_batch_succeeded(patients.clone()); + + println!("Starting searches.."); + let (page1, next) = client + .search::() + .with(DateParam { name: "birthdate", comparator: Some(SearchComparator::Eq), value: date }) + .paged(Some(page_size)) + .send() + .await?; + + let patients1: Vec = page1.try_collect().await?; + assert_eq!(patients1.len() as u32, page_size); + + let (page2, next) = next.expect("second page not found").next_page().await?; + let patients2: Vec = page2.try_collect().await?; + + assert_eq!(patients2.len() as u32, n - page_size); + assert!(next.is_none()); + + println!("Cleaning up.."); + let mut batch = client.batch(); + for patient in patients1.iter().chain(patients2.iter()) { + batch.delete(ResourceType::Patient, patient.id.as_ref().expect("Patient.id")); + } + ensure_batch_succeeded(batch.send().await?); + Ok(()) +} + #[test] fn paging_with_includes() -> Result<()> { common::RUNTIME.block_on(paging_with_includes_inner()) @@ -429,6 +483,7 @@ async fn paging_with_includes_inner() -> Result<()> { }) .and_raw("_include", "Observation:subject") .send() + .await? .try_collect() .await?; @@ -442,6 +497,7 @@ async fn paging_with_includes_inner() -> Result<()> { value: birthdate, }) .send() + .await? .try_collect() .await?; diff --git a/crates/fhir-sdk/tests/client-stu3.rs b/crates/fhir-sdk/tests/client-stu3.rs index 87a551e8..cbd2e002 100644 --- a/crates/fhir-sdk/tests/client-stu3.rs +++ b/crates/fhir-sdk/tests/client-stu3.rs @@ -260,6 +260,7 @@ async fn search_inner() -> Result<()> { }) .and(TokenParam::Standard { name: "active", system: None, code: Some("false"), not: false }) .send() + .await? .try_collect() .await?; @@ -340,11 +341,11 @@ async fn transaction_inner() -> Result<()> { } #[test] -fn paging() -> Result<()> { - common::RUNTIME.block_on(paging_inner()) +fn unpaged() -> Result<()> { + common::RUNTIME.block_on(unpaged_inner()) } -async fn paging_inner() -> Result<()> { +async fn unpaged_inner() -> Result<()> { let client = client().await?; let date = "5123-05-10"; @@ -367,6 +368,7 @@ async fn paging_inner() -> Result<()> { .search() .with(DateParam { name: "birthdate", comparator: Some(SearchComparator::Eq), value: date }) .send() + .await? 
.try_collect() .await?; assert_eq!(patients.len(), n); @@ -380,6 +382,58 @@ async fn paging_inner() -> Result<()> { Ok(()) } +#[test] +fn paged() -> Result<()> { + common::RUNTIME.block_on(paged_inner()) +} + +async fn paged_inner() -> Result<()> { + let client = client().await?; + + let date = "5123-06-10"; + let n = 30; + let page_size = 20; + + println!("Preparing.."); + let patient = Patient::builder() + .active(false) + .birth_date(Date::from_str(date).expect("parse Date")) + .build() + .unwrap(); + let mut batch = client.batch(); + for _ in 0..n { + batch.create(patient.clone()); + } + + let patients = batch.send().await?; + ensure_batch_succeeded(patients.clone()); + + println!("Starting searches.."); + let (page1, next) = client + .search::() + .with(DateParam { name: "birthdate", comparator: Some(SearchComparator::Eq), value: date }) + .paged(Some(page_size)) + .send() + .await?; + + let patients1: Vec = page1.try_collect().await?; + assert_eq!(patients1.len() as u32, page_size); + + let (page2, next) = next.expect("second page not found").next_page().await?; + let patients2: Vec = page2.try_collect().await?; + + assert_eq!(patients2.len() as u32, n - page_size); + assert!(next.is_none()); + + println!("Cleaning up.."); + let mut batch = client.batch(); + for patient in patients1.iter().chain(patients2.iter()) { + batch.delete(ResourceType::Patient, patient.id.as_ref().expect("Patient.id")); + } + ensure_batch_succeeded(batch.send().await?); + Ok(()) +} + #[test] fn paging_with_includes() -> Result<()> { common::RUNTIME.block_on(paging_with_includes_inner()) @@ -429,6 +483,7 @@ async fn paging_with_includes_inner() -> Result<()> { }) .and_raw("_include", "Observation:subject") .send() + .await? .try_collect() .await?; @@ -442,6 +497,7 @@ async fn paging_with_includes_inner() -> Result<()> { value: birthdate, }) .send() + .await? .try_collect() .await?; @@ -512,7 +568,7 @@ async fn operation_encounter_everything_inner() -> Result<()> { } #[test] -#[ignore = "Server does not support this"] +#[ignore = "HAPI server does not support this"] fn operation_patient_match() -> Result<()> { common::RUNTIME.block_on(operation_patient_match_inner()) }
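
A condensed sketch of the two search flows this change ports to R4B and STU3, adapted from the new unpaged/paged tests in crates/fhir-sdk/tests/client-r4b.rs. It is a sketch only: client construction and imports are elided, and the exact re-export paths of `Patient`, `DateParam` and `SearchComparator` are assumed to match what the tests use. Note that `send()` is now async for both flows.

    // Sketch, not part of the patch: mirrors tests/client-r4b.rs; `client` is the R4B client,
    // imports and error handling are assumed.

    // Unpaged: the returned `Unpaged` stream follows the Bundle "next" links internally.
    let all: Vec<Patient> = client
        .search::<Patient>()
        .with(DateParam { name: "birthdate", comparator: Some(SearchComparator::Eq), value: "5123-05-10" })
        .send()
        .await?
        .try_collect()
        .await?;

    // Paged: request an explicit `_count` and walk the pages through the returned cursor.
    let (page, cursor) = client
        .search::<Patient>()
        .with(DateParam { name: "birthdate", comparator: Some(SearchComparator::Eq), value: "5123-05-10" })
        .paged(Some(20))
        .send()
        .await?;
    let mut patients: Vec<Patient> = page.try_collect().await?;
    if let Some(cursor) = cursor {
        let (next_page, _next) = cursor.next_page().await?;
        patients.extend(next_page.try_collect::<Vec<Patient>>().await?);
    }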
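
The `MissingParam` relocated in the STU3 params diff maps to the standard `:missing` search modifier; a small illustration of the query pair it produces, following the `into_query` implementation shown above (field name chosen for illustration):

    // MissingParam encodes `?birthdate:missing=true`.
    let query = MissingParam { name: "birthdate", missing: true }.into_query();
    assert_eq!(query, ("birthdate:missing".to_string(), "true".to_string()));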