From 42b9a96d808ad796e10d138e184d0b78b74314e5 Mon Sep 17 00:00:00 2001 From: link2xt Date: Tue, 10 Dec 2024 02:29:34 +0000 Subject: [PATCH] feat: cache HTTP GET requests --- src/net/http.rs | 198 ++++++++++++++++++++++++++++++++++++++++-- src/sql.rs | 7 ++ src/sql/migrations.rs | 15 ++++ 3 files changed, 215 insertions(+), 5 deletions(-) diff --git a/src/net/http.rs b/src/net/http.rs index 94e85f68c1..a768bfae1b 100644 --- a/src/net/http.rs +++ b/src/net/http.rs @@ -6,14 +6,18 @@ use http_body_util::BodyExt; use hyper_util::rt::TokioIo; use mime::Mime; use serde::Serialize; +use tokio::fs; +use crate::blob::BlobObject; use crate::context::Context; +use crate::log::LogExt; use crate::net::proxy::ProxyConfig; use crate::net::session::SessionStream; use crate::net::tls::wrap_rustls; +use crate::tools::{create_id, time}; /// HTTP(S) GET response. -#[derive(Debug)] +#[derive(Debug, Clone, PartialEq, Eq)] pub struct Response { /// Response body. pub blob: Vec, @@ -90,9 +94,117 @@ where Ok(sender) } +/// Converts the URL to expiration timestamp. +fn http_url_cache_expires(url: &str) -> i64 { + let now = time(); + if url.ends_with(".xdc") { + // WebXDCs expire in 5 weeks. + now + 3600 * 24 * 35 + } else { + // Cache everything else for 1 hour. + now + 3600 + } +} + +/// Places the binary into HTTP cache. +async fn http_cache_put(context: &Context, url: &str, response: &Response) -> Result<()> { + let blob = BlobObject::create( + context, + &format!("http_cache_{}", create_id()), + response.blob.as_slice(), + ) + .await?; + + context + .sql + .insert( + "INSERT OR IGNORE INTO http_cache (url, expires, blob, mimetype, encoding) + VALUES (?, ?, ?, ?, ?)", + ( + url, + http_url_cache_expires(url), + blob.as_name(), + response.mimetype.as_deref().unwrap_or_default(), + response.encoding.as_deref().unwrap_or_default(), + ), + ) + .await?; + + Ok(()) +} + +/// Retrieves the binary from HTTP cache. +async fn http_cache_get(context: &Context, url: &str) -> Result> { + let Some((blob_name, mimetype, encoding)) = context + .sql + .query_row_optional( + "SELECT blob, mimetype, encoding + FROM http_cache WHERE url=? AND expires > ?", + (url, time()), + |row| { + let blob_name: String = row.get(0)?; + let mimetype: Option = Some(row.get(1)?).filter(|s: &String| !s.is_empty()); + let encoding: Option = Some(row.get(2)?).filter(|s: &String| !s.is_empty()); + Ok((blob_name, mimetype, encoding)) + }, + ) + .await? + else { + return Ok(None); + }; + + + let blob_object = BlobObject::from_name(context, blob_name)?; + let blob_abs_path = blob_object.to_abs_path(); + let blob = fs::read(blob_abs_path).await?; + + let response = Response { + blob, + mimetype, + encoding, + }; + + // Update expiration timestamp. + context + .sql + .execute( + "UPDATE http_cache SET expires=? WHERE url=?", + (http_url_cache_expires(url), url), + ) + .await?; + + Ok(Some(response)) +} + +/// Removes expired cache entries. +pub(crate) async fn http_cache_cleanup(context: &Context) -> Result<()> { + // Remove cache entries that are already expired + // or entries that will not expire in a year + // to make sure we don't have invalid timestamps that are way forward in the future. + context + .sql + .execute( + "DELETE FROM http_cache + WHERE ?1 > expires OR expires > ?1 + 31536000", + (time(),), + ) + .await?; + Ok(()) +} + /// Retrieves the binary contents of URL using HTTP GET request. -pub async fn read_url_blob(context: &Context, url: &str) -> Result { - let mut url = url.to_string(); +pub async fn read_url_blob(context: &Context, original_url: &str) -> Result { + if let Some(response) = http_cache_get(context, original_url) + .await + .log_err(context) + .unwrap_or_default() + { + info!(context, "Returning {original_url:?} from cache."); + return Ok(response); + } + + info!(context, "Not found {original_url:?} in cache."); + let mut url = original_url.to_string(); // Follow up to 10 http-redirects for _i in 0..10 { @@ -139,11 +251,14 @@ pub async fn read_url_blob(context: &Context, url: &str) -> Result { }); let body = response.collect().await?.to_bytes(); let blob: Vec = body.to_vec(); - return Ok(Response { + let response = Response { blob, mimetype, encoding, - }); + }; + info!(context, "Inserting {original_url:?} into cache."); + http_cache_put(context, &url, &response).await?; + return Ok(response); } Err(anyhow!("Followed 10 redirections")) @@ -241,3 +356,76 @@ pub(crate) async fn post_form( let bytes = response.collect().await?.to_bytes(); Ok(bytes) } + +#[cfg(test)] +mod tests { + use super::*; + use std::time::Duration; + + use crate::test_utils::TestContext; + use crate::tools::SystemTime; + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn test_http_cache() -> Result<()> { + let t = &TestContext::new().await; + + assert_eq!(http_cache_get(t, "https://webxdc.org/").await?, None); + + let html_response = Response { + blob: b" ...".to_vec(), + mimetype: Some("text/html".to_string()), + encoding: None, + }; + + let xdc_response = Response { + blob: b"PK...".to_vec(), + mimetype: Some("application/octet-stream".to_string()), + encoding: None, + }; + let xdc_editor_url = "https://apps.testrun.org/webxdc-editor-v3.2.0.xdc"; + let xdc_pixel_url = "https://apps.testrun.org/webxdc-pixel-v2.xdc"; + + http_cache_put(t, "https://webxdc.org/", &html_response).await?; + + assert_eq!(http_cache_get(t, xdc_editor_url).await?, None); + assert_eq!(http_cache_get(t, xdc_pixel_url).await?, None); + assert_eq!( + http_cache_get(t, "https://webxdc.org/").await?, + Some(html_response.clone()) + ); + + http_cache_put(t, xdc_editor_url, &xdc_response).await?; + assert_eq!( + http_cache_get(t, xdc_editor_url).await?, + Some(xdc_response.clone()) + ); + + http_cache_cleanup(t).await?; + assert_eq!( + http_cache_get(t, "https://webxdc.org/").await?, + Some(html_response.clone()) + ); + + // HTML expires after 1 hour, but .xdc does not. + SystemTime::shift(Duration::from_secs(3600 + 100)); + assert_eq!(http_cache_get(t, "https://webxdc.org/").await?, None); + assert_eq!( + http_cache_get(t, xdc_editor_url).await?, + Some(xdc_response.clone()) + ); + + // 35 days later pixel .xdc expires because we did not request it for 35 days and 1 hour. + // But editor is still there because we did not request it for just 35 days. + SystemTime::shift(Duration::from_secs(3600 * 24 * 35 - 100)); + assert_eq!( + http_cache_get(t, xdc_editor_url).await?, + Some(xdc_response.clone()) + ); + assert_eq!( + http_cache_get(t, xdc_pixel_url).await?, + None + ); + + Ok(()) + } +} diff --git a/src/sql.rs b/src/sql.rs index 47604778d3..cd83906880 100644 --- a/src/sql.rs +++ b/src/sql.rs @@ -19,6 +19,7 @@ use crate::location::delete_orphaned_poi_locations; use crate::log::LogExt; use crate::message::{Message, MsgId}; use crate::net::dns::prune_dns_cache; +use crate::net::http::http_cache_cleanup; use crate::net::prune_connection_history; use crate::param::{Param, Params}; use crate::peerstate::Peerstate; @@ -788,6 +789,12 @@ pub async fn housekeeping(context: &Context) -> Result<()> { .log_err(context) .ok(); + http_cache_cleanup(context) + .await + .context("Failed to cleanup HTTP cache") + .log_err(context) + .ok(); + info!(context, "Housekeeping done."); Ok(()) } diff --git a/src/sql/migrations.rs b/src/sql/migrations.rs index 15d89c6f1a..8be0504ea1 100644 --- a/src/sql/migrations.rs +++ b/src/sql/migrations.rs @@ -1088,6 +1088,21 @@ CREATE INDEX msgs_status_updates_index2 ON msgs_status_updates (uid); .await?; } + inc_and_check(&mut migration_version, 125)?; + if dbversion < migration_version { + sql.execute_migration( + "CREATE TABLE http_cache ( + url TEXT PRIMARY KEY, + expires INTEGER NOT NULL, -- When the cache entry is considered expired, timestamp in seconds. + blob TEXT NOT NULL, + mimetype TEXT NOT NULL DEFAULT '', -- MIME type extracted from Content-Type header. + encoding TEXT NOT NULL DEFAULT '' -- Encoding from Content-Type header. + ) STRICT", + migration_version, + ) + .await?; + } + let new_version = sql .get_raw_config_int(VERSION_CFG) .await?