feat: cache HTTP GET requests #6327

Merged (6 commits) on Dec 11, 2024
src/net/http.rs (218 additions, 5 deletions)
@@ -6,14 +6,17 @@ use http_body_util::BodyExt;
use hyper_util::rt::TokioIo;
use mime::Mime;
use serde::Serialize;
use tokio::fs;

use crate::blob::BlobObject;
use crate::context::Context;
use crate::net::proxy::ProxyConfig;
use crate::net::session::SessionStream;
use crate::net::tls::wrap_rustls;
use crate::tools::{create_id, time};

/// HTTP(S) GET response.
-#[derive(Debug)]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Response {
/// Response body.
pub blob: Vec<u8>,
@@ -90,9 +93,127 @@ where
Ok(sender)
}

/// Returns the expiration timestamp for a cached URL, based on the URL and its MIME type.
fn http_url_cache_expires(url: &str, mimetype: Option<&str>) -> i64 {
let now = time();
if url.ends_with(".xdc") {
// WebXDCs expire in 5 weeks.
now + 3600 * 24 * 35
} else if mimetype.is_some_and(|s| s.starts_with("image/")) {
// Cache images for 1 day.
now + 3600 * 24
} else {
// Cache everything else for 1 hour.
now + 3600
}
}
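
For reference, the policy above yields the following offsets. The snippet below is an illustrative sketch and not part of the diff; the URLs are made up, and it assumes `time()` returns the same value for both calls (true with the mocked clock used in the tests).

// Illustrative only, not part of this change: expected expiry offsets
// for the policy above, with made-up URLs.
let now = time();
assert_eq!(
    http_url_cache_expires("https://example.org/app.xdc", None),
    now + 3600 * 24 * 35 // .xdc bundles: 35 days
);
assert_eq!(
    http_url_cache_expires("https://example.org/logo", Some("image/png")),
    now + 3600 * 24 // images: 1 day
);
assert_eq!(
    http_url_cache_expires("https://example.org/", Some("text/html")),
    now + 3600 // everything else: 1 hour
);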

/// Places the binary into HTTP cache.
async fn http_cache_put(context: &Context, url: &str, response: &Response) -> Result<()> {
let blob = BlobObject::create(
context,
&format!("http_cache_{}", create_id()),
response.blob.as_slice(),
)
.await?;

context
.sql
.insert(
"INSERT OR IGNORE INTO http_cache (url, expires, blobname, mimetype, encoding)
VALUES (?, ?, ?, ?, ?)",
(
url,
http_url_cache_expires(url, response.mimetype.as_deref()),
blob.as_name(),
response.mimetype.as_deref().unwrap_or_default(),
response.encoding.as_deref().unwrap_or_default(),
),
)
.await?;

Ok(())
}

/// Retrieves the binary from HTTP cache.
async fn http_cache_get(context: &Context, url: &str) -> Result<Option<Response>> {
let Some((blob_name, mimetype, encoding)) = context
.sql
.query_row_optional(
"SELECT blobname, mimetype, encoding
FROM http_cache WHERE url=? AND expires > ?",
(url, time()),
|row| {
let blob_name: String = row.get(0)?;
let mimetype: Option<String> = Some(row.get(1)?).filter(|s: &String| !s.is_empty());
let encoding: Option<String> = Some(row.get(2)?).filter(|s: &String| !s.is_empty());
Ok((blob_name, mimetype, encoding))
},
)
.await?
else {
return Ok(None);
};

let blob_object = BlobObject::from_name(context, blob_name)?;
let blob_abs_path = blob_object.to_abs_path();
let blob = match fs::read(blob_abs_path)
.await
.with_context(|| format!("Failed to read blob for {url:?} cache entry."))
{
Ok(blob) => blob,
Err(err) => {
// This should not happen, but the user may remove files from the blobdir,
// an antivirus may delete the file, or there may be a bug in housekeeping.
warn!(context, "{err:?}.");
return Ok(None);
}
};

let expires = http_url_cache_expires(url, mimetype.as_deref());
let response = Response {
blob,
mimetype,
encoding,
};

// Update expiration timestamp.
context
.sql
.execute(
"UPDATE http_cache SET expires=? WHERE url=?",
(expires, url),
)
.await?;

Ok(Some(response))
}

/// Removes expired cache entries.
pub(crate) async fn http_cache_cleanup(context: &Context) -> Result<()> {
// Remove cache entries that are already expired
// as well as entries that would not expire for over a year,
// to make sure we don't keep invalid timestamps that are far in the future.
context
.sql
.execute(
"DELETE FROM http_cache
WHERE ?1 > expires OR expires > ?1 + 31536000",
(time(),),
)
.await?;
Ok(())
}
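
As a sketch of the retention window the DELETE statement above enforces: a row survives only while now <= expires <= now + 31536000, i.e. it has not expired yet and its timestamp is at most one year in the future. The helper and values below are hypothetical, not part of this change.

// Hypothetical illustration of the DELETE condition above, inverted:
// a cache row is kept only while `now <= expires <= now + 31_536_000`.
fn is_kept(now: i64, expires: i64) -> bool {
    !(now > expires || expires > now + 31_536_000)
}

assert!(!is_kept(1_700_000_000, 1_699_999_999)); // already expired: deleted
assert!(is_kept(1_700_000_000, 1_700_003_600)); // expires in an hour: kept
assert!(!is_kept(1_700_000_000, 1_731_536_001)); // bogus far-future timestamp: deleted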

/// Retrieves the binary contents of URL using HTTP GET request.
-pub async fn read_url_blob(context: &Context, url: &str) -> Result<Response> {
-let mut url = url.to_string();
pub async fn read_url_blob(context: &Context, original_url: &str) -> Result<Response> {
if let Some(response) = http_cache_get(context, original_url).await? {
info!(context, "Returning {original_url:?} from cache.");
return Ok(response);
}

info!(context, "Not found {original_url:?} in cache.");
let mut url = original_url.to_string();

// Follow up to 10 http-redirects
for _i in 0..10 {
@@ -139,11 +260,14 @@ pub async fn read_url_blob(context: &Context, url: &str) -> Result<Response> {
});
let body = response.collect().await?.to_bytes();
let blob: Vec<u8> = body.to_vec();
-return Ok(Response {
let response = Response {
blob,
mimetype,
encoding,
-});
};
info!(context, "Inserting {original_url:?} into cache.");
http_cache_put(context, &url, &response).await?;
return Ok(response);
}

Err(anyhow!("Followed 10 redirections"))
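
From the caller's perspective the cache is transparent. A minimal usage sketch, assuming an initialized `Context` named `ctx` and a made-up URL: the first call downloads and fills the cache, a repeat call within the expiry window is answered from the cache, and both return equal `Response` values, which is why `PartialEq` is derived above.

// Hypothetical call site; `ctx` is assumed to be an initialized `Context`
// and the URL is an example. The first call downloads and fills the cache.
let first = read_url_blob(&ctx, "https://example.org/logo.png").await?;
// Within the expiry window the second call is answered from the cache,
// without opening a network connection.
let second = read_url_blob(&ctx, "https://example.org/logo.png").await?;
assert_eq!(first, second);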
@@ -241,3 +365,92 @@ pub(crate) async fn post_form<T: Serialize + ?Sized>(
let bytes = response.collect().await?.to_bytes();
Ok(bytes)
}

#[cfg(test)]
mod tests {
use super::*;
use std::time::Duration;

use crate::sql::housekeeping;
use crate::test_utils::TestContext;
use crate::tools::SystemTime;

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_http_cache() -> Result<()> {
let t = &TestContext::new().await;

assert_eq!(http_cache_get(t, "https://webxdc.org/").await?, None);

let html_response = Response {
blob: b"<!DOCTYPE html> ...".to_vec(),
mimetype: Some("text/html".to_string()),
encoding: None,
};

let xdc_response = Response {
blob: b"PK...".to_vec(),
mimetype: Some("application/octet-stream".to_string()),
encoding: None,
};
let xdc_editor_url = "https://apps.testrun.org/webxdc-editor-v3.2.0.xdc";
let xdc_pixel_url = "https://apps.testrun.org/webxdc-pixel-v2.xdc";

http_cache_put(t, "https://webxdc.org/", &html_response).await?;

assert_eq!(http_cache_get(t, xdc_editor_url).await?, None);
assert_eq!(http_cache_get(t, xdc_pixel_url).await?, None);
assert_eq!(
http_cache_get(t, "https://webxdc.org/").await?,
Some(html_response.clone())
);

http_cache_put(t, xdc_editor_url, &xdc_response).await?;
assert_eq!(
http_cache_get(t, xdc_editor_url).await?,
Some(xdc_response.clone())
);

assert_eq!(
http_cache_get(t, "https://webxdc.org/").await?,
Some(html_response.clone())
);

// HTML expires after 1 hour, but .xdc does not.
SystemTime::shift(Duration::from_secs(3600 + 100));
assert_eq!(http_cache_get(t, "https://webxdc.org/").await?, None);
assert_eq!(
http_cache_get(t, xdc_editor_url).await?,
Some(xdc_response.clone())
);

// 35 days later the pixel .xdc has expired because it was not requested for 35 days and 1 hour,
// but the editor is still cached because it was last requested just under 35 days ago.
SystemTime::shift(Duration::from_secs(3600 * 24 * 35 - 100));

// Run housekeeping to test that it does not delete the blob too early.
housekeeping(t).await?;

assert_eq!(
http_cache_get(t, xdc_editor_url).await?,
Some(xdc_response.clone())
);
assert_eq!(http_cache_get(t, xdc_pixel_url).await?, None);

// Test that if the file is accidentally removed from the blobdir,
// there is no error when trying to load the cache entry.
for entry in std::fs::read_dir(t.get_blobdir())? {
let entry = entry.unwrap();
let path = entry.path();
std::fs::remove_file(path).expect("Failed to remove blob");
}

assert_eq!(
http_cache_get(t, xdc_editor_url)
.await
.context("Failed to get no cache response")?,
None
);

Ok(())
}
}
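
A behavior the test relies on but that is easy to miss: http_cache_get rewrites `expires` on every hit, so an entry's lifetime is counted from its last successful lookup, not from when it was stored. The sequence below is a hypothetical sketch of that sliding expiration; `t` and `response` are assumed to be a TestContext and a cached Response as in the test above, and the URL is made up.

// Hypothetical sequence (not part of the PR) showing the sliding expiration.
http_cache_put(t, "https://example.org/a.xdc", &response).await?; // expires in 35 days
SystemTime::shift(Duration::from_secs(3600 * 24 * 34)); // 34 days pass
// Hit: the lookup also pushes the expiry out by another 35 days.
assert!(http_cache_get(t, "https://example.org/a.xdc").await?.is_some());
SystemTime::shift(Duration::from_secs(3600 * 24 * 34)); // 34 more days pass
// Still a hit, 68 days after the entry was first stored.
assert!(http_cache_get(t, "https://example.org/a.xdc").await?.is_some());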
src/sql.rs (23 additions, 0 deletions)
@@ -19,6 +19,7 @@ use crate::location::delete_orphaned_poi_locations;
use crate::log::LogExt;
use crate::message::{Message, MsgId};
use crate::net::dns::prune_dns_cache;
use crate::net::http::http_cache_cleanup;
use crate::net::prune_connection_history;
use crate::param::{Param, Params};
use crate::peerstate::Peerstate;
@@ -720,6 +721,12 @@ pub async fn housekeeping(context: &Context) -> Result<()> {
warn!(context, "Can't set config: {e:#}.");
}

http_cache_cleanup(context)
.await
.context("Failed to cleanup HTTP cache")
.log_err(context)
.ok();

if let Err(err) = remove_unused_files(context).await {
warn!(
context,
@@ -846,6 +853,22 @@ pub async fn remove_unused_files(context: &Context) -> Result<()> {
.await
.context("housekeeping: failed to SELECT value FROM config")?;

context
.sql
.query_map(
"SELECT blobname FROM http_cache",
(),
|row| row.get::<_, String>(0),
|rows| {
for row in rows {
maybe_add_file(&mut files_in_use, &row?);
}
Ok(())
},
)
.await
.context("Failed to SELECT blobname FROM http_cache")?;

info!(context, "{} files in use.", files_in_use.len());
/* go through directories and delete unused files */
let blobdir = context.get_blobdir();
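
The ordering in housekeeping matters: http_cache_cleanup drops expired rows first, and remove_unused_files then treats only the blobs still referenced from http_cache as in use, so blobs that belonged to expired entries are swept in the same pass. Condensed, the relevant calls run in this order; the wrapper function name below is hypothetical and error handling is omitted.

// Condensed sketch of the relevant housekeeping steps (not the full function).
async fn sweep_http_cache(context: &Context) -> Result<()> {
    // 1. Delete expired (or implausibly far-future) http_cache rows.
    http_cache_cleanup(context).await?;
    // 2. Collect blob names still referenced (including SELECT blobname FROM http_cache)
    //    and delete everything else from the blobdir.
    remove_unused_files(context).await?;
    Ok(())
}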
src/sql/migrations.rs (15 additions, 0 deletions)
@@ -1088,6 +1088,21 @@ CREATE INDEX msgs_status_updates_index2 ON msgs_status_updates (uid);
.await?;
}

inc_and_check(&mut migration_version, 125)?;
if dbversion < migration_version {
sql.execute_migration(
"CREATE TABLE http_cache (
url TEXT PRIMARY KEY,
expires INTEGER NOT NULL, -- When the cache entry is considered expired, timestamp in seconds.
blobname TEXT NOT NULL,
mimetype TEXT NOT NULL DEFAULT '', -- MIME type extracted from Content-Type header.
encoding TEXT NOT NULL DEFAULT '' -- Encoding from Content-Type header.
) STRICT",
migration_version,
)
.await?;
}

let new_version = sql
.get_raw_config_int(VERSION_CFG)
.await?