diff --git a/Cargo.lock b/Cargo.lock index e7c72eebc..71bd6c8f6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -398,17 +398,6 @@ dependencies = [ "pin-project-lite", ] -[[package]] -name = "async-recursion" -version = "1.0.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5fd55a5ba1179988837d24ab4c7cc8ed6efdeff578ede0416b4225a5fca35bd0" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.39", -] - [[package]] name = "async-stream" version = "0.3.5" @@ -2874,6 +2863,7 @@ dependencies = [ "kitsune-jobs", "kitsune-language", "kitsune-mastodon", + "kitsune-messaging", "kitsune-observability", "kitsune-oidc", "kitsune-search", @@ -2920,7 +2910,6 @@ dependencies = [ name = "kitsune-activitypub" version = "0.0.1-pre.4" dependencies = [ - "async-recursion", "async-trait", "autometrics", "base64-simd", @@ -3085,7 +3074,6 @@ dependencies = [ "askama_axum", "diesel", "diesel-async", - "iso8601-timestamp", "kitsune-db", "kitsune-url", "lettre", diff --git a/crates/kitsune-activitypub/Cargo.toml b/crates/kitsune-activitypub/Cargo.toml index 0a7aa2246..ae4ae3f04 100644 --- a/crates/kitsune-activitypub/Cargo.toml +++ b/crates/kitsune-activitypub/Cargo.toml @@ -5,7 +5,6 @@ edition.workspace = true version.workspace = true [dependencies] -async-recursion = "1.0.5" async-trait = "0.1.74" autometrics = { version = "0.6.0", default-features = false } base64-simd = { version = "0.8.0", features = ["unstable"] } diff --git a/crates/kitsune-activitypub/src/error.rs b/crates/kitsune-activitypub/src/error.rs index b86d9ee8a..375b14049 100644 --- a/crates/kitsune-activitypub/src/error.rs +++ b/crates/kitsune-activitypub/src/error.rs @@ -30,6 +30,15 @@ pub enum Error { #[error(transparent)] FederationFilter(#[from] kitsune_federation_filter::error::Error), + #[error(transparent)] + FetchAccount(BoxError), + + #[error(transparent)] + FetchEmoji(BoxError), + + #[error(transparent)] + FetchPost(BoxError), + #[error(transparent)] Http(#[from] http::Error), diff --git a/crates/kitsune-activitypub/src/fetcher/mod.rs b/crates/kitsune-activitypub/src/fetcher/mod.rs index 279b9ba55..548f709e6 100644 --- a/crates/kitsune-activitypub/src/fetcher/mod.rs +++ b/crates/kitsune-activitypub/src/fetcher/mod.rs @@ -6,7 +6,10 @@ use kitsune_cache::ArcCache; use kitsune_core::{ consts::USER_AGENT, error::BoxError, - traits::{fetcher::AccountFetchOptions, Fetcher as FetcherTrait, Resolver}, + traits::{ + fetcher::{AccountFetchOptions, PostFetchOptions}, + Fetcher as FetcherTrait, Resolver, + }, }; use kitsune_db::{ model::{account::Account, custom_emoji::CustomEmoji, post::Post}, @@ -130,7 +133,7 @@ impl FetcherTrait for Fetcher { Ok(self.fetch_emoji(url).await?) } - async fn fetch_post(&self, url: &str) -> Result, BoxError> { - Ok(self.fetch_object(url).await?) + async fn fetch_post(&self, opts: PostFetchOptions<'_>) -> Result, BoxError> { + Ok(self.fetch_object(opts.url, opts.call_depth).await?) } } diff --git a/crates/kitsune-activitypub/src/fetcher/object.rs b/crates/kitsune-activitypub/src/fetcher/object.rs index ea4eb7bf1..2976321c9 100644 --- a/crates/kitsune-activitypub/src/fetcher/object.rs +++ b/crates/kitsune-activitypub/src/fetcher/object.rs @@ -1,6 +1,5 @@ use super::Fetcher; use crate::{error::Result, process_new_object, ProcessNewObject}; -use async_recursion::async_recursion; use autometrics::autometrics; use diesel::{ExpressionMethods, OptionalExtension, QueryDsl, SelectableHelper}; use diesel_async::RunQueryDsl; @@ -13,12 +12,9 @@ use scoped_futures::ScopedFutureExt; pub const MAX_FETCH_DEPTH: u32 = 30; impl Fetcher { - #[async_recursion] - pub(crate) async fn fetch_object_inner( - &self, - url: &str, - call_depth: u32, - ) -> Result> { + #[instrument(skip(self))] + #[autometrics(track_concurrency)] + pub(crate) async fn fetch_object(&self, url: &str, call_depth: u32) -> Result> { if call_depth > MAX_FETCH_DEPTH { return Ok(None); } @@ -65,10 +61,4 @@ impl Fetcher { Ok(Some(post)) } - - #[instrument(skip(self))] - #[autometrics(track_concurrency)] - pub(crate) async fn fetch_object(&self, url: &str) -> Result> { - self.fetch_object_inner(url, 0).await - } } diff --git a/crates/kitsune-activitypub/src/lib.rs b/crates/kitsune-activitypub/src/lib.rs index b19fc6f73..351ab0e3d 100644 --- a/crates/kitsune-activitypub/src/lib.rs +++ b/crates/kitsune-activitypub/src/lib.rs @@ -11,6 +11,7 @@ use diesel_async::{AsyncPgConnection, RunQueryDsl}; use futures_util::{stream, FutureExt, StreamExt, TryStreamExt}; use http::Uri; use iso8601_timestamp::Timestamp; +use kitsune_core::traits::{fetcher::PostFetchOptions, Fetcher as FetcherTrait}; use kitsune_db::{ model::{ account::Account, @@ -79,7 +80,7 @@ async fn handle_mentions( async fn handle_custom_emojis( db_conn: &mut AsyncPgConnection, post_id: Uuid, - fetcher: &Fetcher, + fetcher: &dyn FetcherTrait, tags: &[Tag], ) -> Result<()> { let emoji_iter = tags.iter().filter(|tag| tag.r#type == TagType::Emoji); @@ -107,7 +108,8 @@ async fn handle_custom_emojis( emoji_text: emoji_tag.name.clone(), }) .try_collect::>() - .await?; + .await + .map_err(Error::FetchEmoji)?; diesel::insert_into(posts_custom_emojis::table) .values(emojis) @@ -171,7 +173,7 @@ pub struct ProcessNewObject<'a> { db_pool: &'a PgPool, embed_client: Option<&'a EmbedClient>, object: Box, - fetcher: &'a Fetcher, + fetcher: &'a dyn FetcherTrait, search_backend: &'a AnySearchBackend, } @@ -184,7 +186,7 @@ struct PreprocessedObject<'a> { content_lang: Language, db_pool: &'a PgPool, object: Box, - fetcher: &'a Fetcher, + fetcher: &'a dyn FetcherTrait, search_backend: &'a AnySearchBackend, } @@ -208,7 +210,11 @@ async fn preprocess_object( return Err(Error::InvalidDocument); } - let Some(author) = fetcher.fetch_actor(attributed_to.into()).await? else { + let Some(author) = fetcher + .fetch_account(attributed_to.into()) + .await + .map_err(Error::FetchAccount)? + else { return Err(Error::NotFound); }; @@ -218,8 +224,14 @@ async fn preprocess_object( let visibility = Visibility::from_activitypub(&user, &object).unwrap(); let in_reply_to_id = if let Some(ref in_reply_to) = object.in_reply_to { fetcher - .fetch_object_inner(in_reply_to, call_depth + 1) - .await? + .fetch_post( + PostFetchOptions::builder() + .url(in_reply_to) + .call_depth(call_depth + 1) + .build(), + ) + .await + .map_err(Error::FetchPost)? .map(|post| post.id) } else { None diff --git a/crates/kitsune-activitypub/tests/fetcher/basic.rs b/crates/kitsune-activitypub/tests/fetcher/basic.rs index 58965fc87..0b7b9163e 100644 --- a/crates/kitsune-activitypub/tests/fetcher/basic.rs +++ b/crates/kitsune-activitypub/tests/fetcher/basic.rs @@ -139,7 +139,7 @@ async fn fetch_note() { .build(); let note = fetcher - .fetch_post("https://corteximplant.com/@0x0/109501674056556919") + .fetch_post("https://corteximplant.com/@0x0/109501674056556919".into()) .await .expect("Fetch note") .unwrap(); diff --git a/crates/kitsune-activitypub/tests/fetcher/filter.rs b/crates/kitsune-activitypub/tests/fetcher/filter.rs index 5eaa70c32..92442104c 100644 --- a/crates/kitsune-activitypub/tests/fetcher/filter.rs +++ b/crates/kitsune-activitypub/tests/fetcher/filter.rs @@ -47,7 +47,7 @@ async fn federation_allow() { assert!(matches!( *fetcher - .fetch_post("https://example.com/fakeobject") + .fetch_post("https://example.com/fakeobject".into()) .await .unwrap_err() .downcast_ref() @@ -57,7 +57,7 @@ async fn federation_allow() { assert!(matches!( *fetcher - .fetch_post("https://other.badstuff.com/otherfake") + .fetch_post("https://other.badstuff.com/otherfake".into()) .await .unwrap_err() .downcast_ref() @@ -77,7 +77,7 @@ async fn federation_allow() { assert!(matches!( fetcher - .fetch_post("https://corteximplant.com/@0x0/109501674056556919") + .fetch_post("https://corteximplant.com/@0x0/109501674056556919".into()) .await, Ok(..) )); @@ -118,7 +118,7 @@ async fn federation_deny() { assert!(matches!( fetcher - .fetch_post("https://example.com/fakeobject") + .fetch_post("https://example.com/fakeobject".into()) .await .unwrap_err() .downcast_ref() @@ -127,7 +127,7 @@ async fn federation_deny() { )); assert!(matches!( *fetcher - .fetch_post("https://other.badstuff.com/otherfake") + .fetch_post("https://other.badstuff.com/otherfake".into()) .await .unwrap_err() .downcast_ref() diff --git a/crates/kitsune-activitypub/tests/fetcher/infinite.rs b/crates/kitsune-activitypub/tests/fetcher/infinite.rs index 7e5eec8fe..f8c84a1e8 100644 --- a/crates/kitsune-activitypub/tests/fetcher/infinite.rs +++ b/crates/kitsune-activitypub/tests/fetcher/infinite.rs @@ -110,7 +110,7 @@ async fn fetch_infinitely_long_reply_chain() { .build(); assert!(fetcher - .fetch_post("https://example.com/notes/0") + .fetch_post("https://example.com/notes/0".into()) .await .is_ok()); }) diff --git a/crates/kitsune-activitypub/tests/fetcher/origin.rs b/crates/kitsune-activitypub/tests/fetcher/origin.rs index 7df829ce3..f293feb32 100644 --- a/crates/kitsune-activitypub/tests/fetcher/origin.rs +++ b/crates/kitsune-activitypub/tests/fetcher/origin.rs @@ -70,7 +70,7 @@ async fn check_ap_id_authority() { .build(); let _ = fetcher - .fetch_post("https://example.com/@0x0/109501674056556919") + .fetch_post("https://example.com/@0x0/109501674056556919".into()) .await .unwrap_err(); }) @@ -109,7 +109,7 @@ async fn check_ap_content_type() { assert!(matches!( *fetcher - .fetch_post("https://corteximplant.com/users/0x0") + .fetch_post("https://corteximplant.com/users/0x0".into()) .await .unwrap_err() .downcast_ref() diff --git a/crates/kitsune-core/src/traits/fetcher.rs b/crates/kitsune-core/src/traits/fetcher.rs index 9fc3699af..e5c02ec99 100644 --- a/crates/kitsune-core/src/traits/fetcher.rs +++ b/crates/kitsune-core/src/traits/fetcher.rs @@ -12,7 +12,7 @@ pub struct AccountFetchOptions<'a> { #[builder(default, setter(strip_option))] pub acct: Option<(&'a str, &'a str)>, - /// Refetch the ActivityPub entity + /// Refetch the account /// /// This is mainly used to refresh possibly stale actors /// @@ -20,7 +20,7 @@ pub struct AccountFetchOptions<'a> { #[builder(default = false)] pub refetch: bool, - /// URL of the ActivityPub entity + /// URL of the account pub url: &'a str, } @@ -30,6 +30,22 @@ impl<'a> From<&'a str> for AccountFetchOptions<'a> { } } +#[derive(Clone, Copy, Debug, TypedBuilder)] +pub struct PostFetchOptions<'a> { + /// Call depth of recursive calls of the post fetch logic + #[builder(default)] + pub call_depth: u32, + + /// URL of the object + pub url: &'a str, +} + +impl<'a> From<&'a str> for PostFetchOptions<'a> { + fn from(value: &'a str) -> Self { + Self::builder().url(value).build() + } +} + #[async_trait] pub trait Fetcher: Send + Sync + 'static { fn resolver(&self) -> Arc; @@ -41,7 +57,7 @@ pub trait Fetcher: Send + Sync + 'static { async fn fetch_emoji(&self, url: &str) -> Result, BoxError>; - async fn fetch_post(&self, url: &str) -> Result, BoxError>; + async fn fetch_post(&self, opts: PostFetchOptions<'_>) -> Result, BoxError>; } #[async_trait] @@ -61,8 +77,8 @@ impl Fetcher for Arc { (**self).fetch_emoji(url).await } - async fn fetch_post(&self, url: &str) -> Result, BoxError> { - (**self).fetch_post(url).await + async fn fetch_post(&self, opts: PostFetchOptions<'_>) -> Result, BoxError> { + (**self).fetch_post(opts).await } } @@ -98,9 +114,9 @@ where Ok(None) } - async fn fetch_post(&self, url: &str) -> Result, BoxError> { + async fn fetch_post(&self, opts: PostFetchOptions<'_>) -> Result, BoxError> { for fetcher in self { - if let Some(post) = fetcher.fetch_post(url).await? { + if let Some(post) = fetcher.fetch_post(opts).await? { return Ok(Some(post)); } } diff --git a/crates/kitsune-email/Cargo.toml b/crates/kitsune-email/Cargo.toml index 0cc408cca..1352ef724 100644 --- a/crates/kitsune-email/Cargo.toml +++ b/crates/kitsune-email/Cargo.toml @@ -12,7 +12,6 @@ askama = "0.12.1" askama_axum = "0.3.0" # Damn it, cargo. Because "kitsune" uses "askama" with the axum feature, we have to have the crate available here as well.. diesel = "2.1.4" diesel-async = "0.4.1" -iso8601-timestamp = "0.2.12" kitsune-db = { path = "../kitsune-db" } kitsune-url = { path = "../kitsune-url" } lettre = { version = "0.11.2", default-features = false, features = [ diff --git a/crates/kitsune-email/src/service.rs b/crates/kitsune-email/src/service.rs index 3cf33cb15..9e4c84986 100644 --- a/crates/kitsune-email/src/service.rs +++ b/crates/kitsune-email/src/service.rs @@ -1,8 +1,7 @@ use crate::{error::Result, mails::confirm_account::ConfirmAccount, MailSender}; -use diesel::{ExpressionMethods, QueryDsl}; +use diesel::{ExpressionMethods, NullableExpressionMethods, QueryDsl}; use diesel_async::RunQueryDsl; -use iso8601_timestamp::Timestamp; -use kitsune_db::{model::user::User, schema::users, PgPool}; +use kitsune_db::{function::now, model::user::User, schema::users, PgPool}; use kitsune_url::UrlService; use lettre::{AsyncSmtpTransport, Tokio1Executor}; use scoped_futures::ScopedFutureExt; @@ -22,11 +21,16 @@ impl Mailing { self.sender.is_some() } + #[must_use] + pub fn sender(&self) -> Option>> { + self.sender.clone() + } + pub async fn mark_as_confirmed(&self, user_id: Uuid) -> Result<()> { self.db_pool .with_connection(|db_conn| { diesel::update(users::table.find(user_id)) - .set(users::confirmed_at.eq(Timestamp::now_utc())) + .set(users::confirmed_at.eq(now().nullable())) .execute(db_conn) .scoped() }) @@ -43,7 +47,7 @@ impl Mailing { .filter(users::confirmation_token.eq(confirmation_token)) .filter(users::confirmed_at.is_null()), ) - .set(users::confirmed_at.eq(Timestamp::now_utc())) + .set(users::confirmed_at.eq(now().nullable())) .execute(db_conn) .scoped() }) diff --git a/crates/kitsune-service/src/search.rs b/crates/kitsune-service/src/search.rs index f6f7ee91c..3e870f60c 100644 --- a/crates/kitsune-service/src/search.rs +++ b/crates/kitsune-service/src/search.rs @@ -38,7 +38,7 @@ pub struct Search<'a> { max_id: Option, } -#[derive(TypedBuilder)] +#[derive(Clone, TypedBuilder)] pub struct SearchService { db_pool: PgPool, fetcher: Arc, @@ -71,7 +71,7 @@ impl SearchService { } } - match self.fetcher.fetch_post(searched_url.as_str()).await { + match self.fetcher.fetch_post(searched_url.as_str().into()).await { Ok(Some(post)) => results.push(SearchResult::Post(post)), Ok(None) => debug!("no post found"), Err(error) => { diff --git a/crates/kitsune-service/src/user.rs b/crates/kitsune-service/src/user.rs index 0900db7e8..8a9ee01fd 100644 --- a/crates/kitsune-service/src/user.rs +++ b/crates/kitsune-service/src/user.rs @@ -4,11 +4,9 @@ use super::{ }; use crate::error::{Error, Result, UserError}; use argon2::{password_hash::SaltString, Argon2, PasswordHasher}; -use diesel::{ExpressionMethods, QueryDsl}; use diesel_async::RunQueryDsl; use futures_util::future::OptionFuture; use garde::Validate; -use iso8601_timestamp::Timestamp; use kitsune_captcha::ChallengeStatus; use kitsune_db::{ model::{ diff --git a/kitsune/Cargo.toml b/kitsune/Cargo.toml index 74b1e7cb9..ff3d552e7 100644 --- a/kitsune/Cargo.toml +++ b/kitsune/Cargo.toml @@ -45,6 +45,7 @@ human-panic = "1.2.2" hyper = { version = "0.14.27", features = ["deprecated"] } include_dir = "0.7.3" iso8601-timestamp = "0.2.12" +kitsune-activitypub = { path = "../crates/kitsune-activitypub" } kitsune-blocking = { path = "../crates/kitsune-blocking" } kitsune-cache = { path = "../crates/kitsune-cache" } kitsune-captcha = { path = "../crates/kitsune-captcha" } @@ -59,6 +60,7 @@ kitsune-http-signatures = { path = "../crates/kitsune-http-signatures" } kitsune-job-runner = { path = "../kitsune-job-runner" } kitsune-jobs = { path = "../crates/kitsune-jobs" } kitsune-language = { path = "../crates/kitsune-language" } +kitsune-messaging = { path = "../crates/kitsune-messaging" } kitsune-observability = { path = "../crates/kitsune-observability" } kitsune-search = { path = "../crates/kitsune-search" } kitsune-service = { path = "../crates/kitsune-service" } @@ -123,7 +125,6 @@ kitsune-oidc = { path = "../crates/kitsune-oidc", optional = true } [dev-dependencies] deadpool-redis = "0.13.0" -kitsune-activitypub = { path = "../crates/kitsune-activitypub" } kitsune-http-client = { path = "../crates/kitsune-http-client" } kitsune-test = { path = "../crates/kitsune-test" } pretty_assertions = "1.4.0" diff --git a/kitsune/src/error.rs b/kitsune/src/error.rs index dec29d7f4..aa216a6a9 100644 --- a/kitsune/src/error.rs +++ b/kitsune/src/error.rs @@ -5,6 +5,7 @@ use axum::{ response::{IntoResponse, Response}, }; use http::StatusCode; +use kitsune_core::error::{BoxError, HttpError}; use kitsune_service::error::{Error as ServiceError, PostError}; use std::str::ParseBoolError; use thiserror::Error; @@ -14,6 +15,9 @@ pub type Result = std::result::Result; #[derive(Debug, Error)] #[non_exhaustive] pub enum Error { + #[error(transparent)] + ActivityPub(#[from] kitsune_activitypub::error::Error), + #[error(transparent)] Blocking(#[from] kitsune_blocking::Error), @@ -32,6 +36,9 @@ pub enum Error { #[error(transparent)] Der(#[from] der::Error), + #[error(transparent)] + Fetcher(BoxError), + #[error(transparent)] Http(#[from] http::Error), @@ -42,6 +49,9 @@ pub enum Error { #[error(transparent)] Mastodon(#[from] kitsune_mastodon::error::Error), + #[error(transparent)] + Messaging(kitsune_messaging::BoxError), + #[error(transparent)] Multipart(#[from] MultipartError), @@ -125,7 +135,7 @@ impl IntoResponse for Error { Self::Service(ServiceError::Validate(report)) => { (StatusCode::BAD_REQUEST, Json(report)).into_response() } - err @ Self::Service(ServiceError::NotFound) => { + err @ Self::CoreHttp(HttpError::NotFound) => { (StatusCode::NOT_FOUND, err.to_string()).into_response() } err @ Self::Service(ServiceError::Post(PostError::BadRequest)) => { diff --git a/kitsune/src/http/extractor/auth.rs b/kitsune/src/http/extractor/auth.rs index 835ad5870..7d00a644a 100644 --- a/kitsune/src/http/extractor/auth.rs +++ b/kitsune/src/http/extractor/auth.rs @@ -62,7 +62,7 @@ impl FromRequestParts } let (user, account) = state - .db_pool() + .db_pool .with_connection(|db_conn| { user_account_query .select(<(User, Account)>::as_select()) diff --git a/kitsune/src/http/extractor/signed_activity.rs b/kitsune/src/http/extractor/signed_activity.rs index 3ae29627d..b20a7334b 100644 --- a/kitsune/src/http/extractor/signed_activity.rs +++ b/kitsune/src/http/extractor/signed_activity.rs @@ -62,25 +62,32 @@ impl FromRequest for SignedActivity { }; let ap_id = activity.actor(); - let remote_user = state - .fetcher() - .fetch_actor(ap_id.into()) + let Some(remote_user) = state + .fetcher + .fetch_account(ap_id.into()) .await - .map_err(Error::from)?; + .map_err(Error::Fetcher)? + else { + return Err(Error::CoreHttp(HttpError::BadRequest).into()); + }; - if !verify_signature(&parts, state.db_pool(), Some(&remote_user)).await? { + if !verify_signature(&parts, &state.db_pool, Some(&remote_user)).await? { // Refetch the user and try again. Maybe they rekeyed let opts = AccountFetchOptions::builder() .refetch(true) .url(ap_id) .build(); - let remote_user = state - .fetcher() - .fetch_actor(opts) + + let Some(remote_user) = state + .fetcher + .fetch_account(opts) .await - .map_err(Error::from)?; + .map_err(Error::Fetcher)? + else { + return Err(Error::CoreHttp(HttpError::BadRequest).into()); + }; - if !verify_signature(&parts, state.db_pool(), Some(&remote_user)).await? { + if !verify_signature(&parts, &state.db_pool, Some(&remote_user)).await? { return Err(StatusCode::UNAUTHORIZED.into_response()); } } diff --git a/kitsune/src/http/handler/custom_emojis.rs b/kitsune/src/http/handler/custom_emojis.rs index afa687f9a..d96da6c5a 100644 --- a/kitsune/src/http/handler/custom_emojis.rs +++ b/kitsune/src/http/handler/custom_emojis.rs @@ -1,6 +1,6 @@ use crate::{error::Result, http::responder::ActivityPubJson, state::Zustand}; use axum::{debug_handler, extract::Path, extract::State, routing, Router}; -use kitsune_core::mapping::IntoObject; +use kitsune_activitypub::mapping::IntoObject; use kitsune_service::custom_emoji::CustomEmojiService; use kitsune_type::ap::emoji::Emoji; use speedy_uuid::Uuid; @@ -12,8 +12,9 @@ async fn get( Path(id): Path, ) -> Result> { let custom_emoji = emoji_service.get_by_id(id).await?; + Ok(ActivityPubJson( - custom_emoji.into_object(&state.core).await?, + custom_emoji.into_object(state.ap_state()).await?, )) } diff --git a/kitsune/src/http/handler/posts/activity.rs b/kitsune/src/http/handler/posts/activity.rs index 8f0d99956..448bf0518 100644 --- a/kitsune/src/http/handler/posts/activity.rs +++ b/kitsune/src/http/handler/posts/activity.rs @@ -3,7 +3,7 @@ use axum::{ debug_handler, extract::{Path, State}, }; -use kitsune_core::mapping::IntoActivity; +use kitsune_activitypub::mapping::IntoActivity; use kitsune_type::ap::Activity; use speedy_uuid::Uuid; @@ -12,6 +12,6 @@ pub async fn get( State(state): State, Path(id): Path, ) -> Result> { - let post = state.service().post.get_by_id(id, None).await?; - Ok(ActivityPubJson(post.into_activity(&state.core).await?)) + let post = state.service.post.get_by_id(id, None).await?; + Ok(ActivityPubJson(post.into_activity(state.ap_state()).await?)) } diff --git a/kitsune/src/http/handler/posts/mod.rs b/kitsune/src/http/handler/posts/mod.rs index 9edea8b10..1161d4aa9 100644 --- a/kitsune/src/http/handler/posts/mod.rs +++ b/kitsune/src/http/handler/posts/mod.rs @@ -1,6 +1,6 @@ use crate::{error::Result, http::responder::ActivityPubJson, state::Zustand}; use axum::{debug_handler, extract::Path, extract::State, routing, Router}; -use kitsune_core::mapping::IntoObject; +use kitsune_activitypub::mapping::IntoObject; use kitsune_service::post::PostService; use kitsune_type::ap::Object; use speedy_uuid::Uuid; @@ -14,7 +14,7 @@ async fn get( Path(id): Path, ) -> Result> { let post = post.get_by_id(id, None).await?; - Ok(ActivityPubJson(post.into_object(&state.core).await?)) + Ok(ActivityPubJson(post.into_object(state.ap_state()).await?)) } pub fn routes() -> Router { diff --git a/kitsune/src/http/handler/users/inbox.rs b/kitsune/src/http/handler/users/inbox.rs index 94b495ab5..a5a48b9ce 100644 --- a/kitsune/src/http/handler/users/inbox.rs +++ b/kitsune/src/http/handler/users/inbox.rs @@ -7,13 +7,12 @@ use axum::{debug_handler, extract::State}; use diesel::{BoolExpressionMethods, ExpressionMethods, QueryDsl, SelectableHelper}; use diesel_async::RunQueryDsl; use iso8601_timestamp::Timestamp; +use kitsune_activitypub::{ + error::Error as ApError, process_new_object, update_object, ProcessNewObject, +}; use kitsune_core::{ - activitypub::{process_new_object, update_object, ProcessNewObject}, - error::Error as CoreError, + error::HttpError, event::{post::EventType, PostEvent}, - job::deliver::accept::DeliverAccept, - service::{federation_filter::FederationFilterService, job::Enqueue}, - try_join, }; use kitsune_db::{ model::{ @@ -27,14 +26,18 @@ use kitsune_db::{ post_permission_check::{PermissionCheck, PostPermissionCheckExt}, schema::{accounts_follows, accounts_preferences, notifications, posts, posts_favourites}, }; +use kitsune_federation_filter::FederationFilter; +use kitsune_jobs::deliver::accept::DeliverAccept; +use kitsune_service::job::Enqueue; use kitsune_type::ap::{Activity, ActivityType}; +use kitsune_util::try_join; use scoped_futures::ScopedFutureExt; use speedy_uuid::Uuid; use std::ops::Not; async fn accept_activity(state: &Zustand, activity: Activity) -> Result<()> { state - .db_pool() + .db_pool .with_connection(|db_conn| { diesel::update( accounts_follows::table.filter(accounts_follows::url.eq(activity.object())), @@ -49,10 +52,17 @@ async fn accept_activity(state: &Zustand, activity: Activity) -> Result<()> { } async fn announce_activity(state: &Zustand, author: Account, activity: Activity) -> Result<()> { - let reposted_post = state.fetcher().fetch_object(activity.object()).await?; + let Some(reposted_post) = state + .fetcher + .fetch_post(activity.object().into()) + .await + .map_err(Error::Fetcher)? + else { + return Err(HttpError::BadRequest.into()); + }; state - .db_pool() + .db_pool .with_connection(|db_conn| { diesel::insert_into(posts::table) .values(NewPost { @@ -83,23 +93,23 @@ async fn create_activity(state: &Zustand, author: Account, activity: Activity) - if let Some(object) = activity.object.into_object() { let process_data = ProcessNewObject::builder() .author(&author) - .db_pool(state.db_pool()) - .embed_client(state.embed_client()) - .fetcher(state.fetcher()) + .db_pool(&state.db_pool) + .embed_client(state.embed_client.as_ref()) + .fetcher(&state.fetcher) .object(Box::new(object)) - .search_backend(state.service().search.backend()) + .search_backend(state.service.search.backend()) .build(); let new_post = process_new_object(process_data).await?; state - .event_emitter() + .event_emitter .post .emit(PostEvent { r#type: EventType::Create, post_id: new_post.id, }) .await - .map_err(CoreError::Event)?; + .map_err(Error::Messaging)?; } Ok(()) @@ -107,7 +117,7 @@ async fn create_activity(state: &Zustand, author: Account, activity: Activity) - async fn delete_activity(state: &Zustand, author: Account, activity: Activity) -> Result<()> { let post_id = state - .db_pool() + .db_pool .with_connection(|db_conn| { async move { let post_id = posts::table @@ -128,27 +138,32 @@ async fn delete_activity(state: &Zustand, author: Account, activity: Activity) - .await?; state - .event_emitter() + .event_emitter .post .emit(PostEvent { r#type: EventType::Delete, post_id, }) .await - .map_err(CoreError::Event)?; + .map_err(Error::Messaging)?; Ok(()) } async fn follow_activity(state: &Zustand, author: Account, activity: Activity) -> Result<()> { - let followed_user = state - .fetcher() - .fetch_actor(activity.object().into()) - .await?; + let Some(followed_user) = state + .fetcher + .fetch_account(activity.object().into()) + .await + .map_err(Error::Fetcher)? + else { + return Err(HttpError::BadRequest.into()); + }; + let approved_at = followed_user.locked.not().then(Timestamp::now_utc); let follow_id = state - .db_pool() + .db_pool .with_connection(|db_conn| { diesel::insert_into(accounts_follows::table) .values(NewFollow { @@ -168,7 +183,7 @@ async fn follow_activity(state: &Zustand, author: Account, activity: Activity) - if followed_user.local { let preferences = state - .db_pool() + .db_pool .with_connection(|mut db_conn| { accounts_preferences::table .find(followed_user.id) @@ -190,7 +205,7 @@ async fn follow_activity(state: &Zustand, author: Account, activity: Activity) - .follow(author.id) }; state - .db_pool() + .db_pool .with_connection(|mut db_conn| { diesel::insert_into(notifications::table) .values(notification) @@ -201,7 +216,7 @@ async fn follow_activity(state: &Zustand, author: Account, activity: Activity) - .await?; } state - .service() + .service .job .enqueue(Enqueue::builder().job(DeliverAccept { follow_id }).build()) .await?; @@ -216,7 +231,7 @@ async fn like_activity(state: &Zustand, author: Account, activity: Activity) -> .build(); state - .db_pool() + .db_pool .with_connection(|db_conn| { async move { let post = posts::table @@ -248,7 +263,7 @@ async fn like_activity(state: &Zustand, author: Account, activity: Activity) -> async fn reject_activity(state: &Zustand, author: Account, activity: Activity) -> Result<()> { state - .db_pool() + .db_pool .with_connection(|db_conn| { diesel::delete( accounts_follows::table.filter( @@ -267,7 +282,7 @@ async fn reject_activity(state: &Zustand, author: Account, activity: Activity) - async fn undo_activity(state: &Zustand, author: Account, activity: Activity) -> Result<()> { state - .db_pool() + .db_pool .with_connection(|db_conn| { async move { // An undo activity can apply for likes and follows and announces @@ -311,23 +326,23 @@ async fn update_activity(state: &Zustand, author: Account, activity: Activity) - if let Some(object) = activity.object.into_object() { let process_data = ProcessNewObject::builder() .author(&author) - .db_pool(state.db_pool()) - .embed_client(state.embed_client()) - .fetcher(state.fetcher()) + .db_pool(&state.db_pool) + .embed_client(state.embed_client.as_ref()) + .fetcher(&state.fetcher) .object(Box::new(object)) - .search_backend(state.service().search.backend()) + .search_backend(state.service.search.backend()) .build(); let modified_post = update_object(process_data).await?; state - .event_emitter() + .event_emitter .post .emit(PostEvent { r#type: EventType::Update, post_id: modified_post.id, }) .await - .map_err(CoreError::Event)?; + .map_err(Error::Messaging)?; } Ok(()) @@ -342,14 +357,14 @@ async fn update_activity(state: &Zustand, author: Account, activity: Activity) - #[debug_handler(state = Zustand)] pub async fn post( State(state): State, - State(federation_filter): State, + State(federation_filter): State, SignedActivity(author, activity): SignedActivity, ) -> Result<()> { increment_counter!("received_activities"); if !federation_filter .is_entity_allowed(&activity) - .map_err(CoreError::from)? + .map_err(ApError::from)? { return Ok(()); } diff --git a/kitsune/src/http/handler/users/mod.rs b/kitsune/src/http/handler/users/mod.rs index 7f78e9efd..e5893d752 100644 --- a/kitsune/src/http/handler/users/mod.rs +++ b/kitsune/src/http/handler/users/mod.rs @@ -4,7 +4,8 @@ use axum::{ routing::{self, post}, Router, }; -use kitsune_core::{error::HttpError, mapping::IntoObject}; +use kitsune_activitypub::mapping::IntoObject; +use kitsune_core::error::HttpError; use kitsune_service::account::AccountService; use kitsune_type::ap::actor::Actor; use speedy_uuid::Uuid; @@ -24,7 +25,9 @@ async fn get( .await? .ok_or(HttpError::NotFound)?; - Ok(ActivityPubJson(account.into_object(&state.core).await?)) + Ok(ActivityPubJson( + account.into_object(state.ap_state()).await?, + )) } pub fn routes() -> Router { diff --git a/kitsune/src/http/handler/users/outbox.rs b/kitsune/src/http/handler/users/outbox.rs index 44390e8de..304a75ea8 100644 --- a/kitsune/src/http/handler/users/outbox.rs +++ b/kitsune/src/http/handler/users/outbox.rs @@ -3,7 +3,7 @@ use axum::extract::{OriginalUri, Path, Query, State}; use axum_extra::either::Either; use diesel::{BelongingToDsl, ExpressionMethods, QueryDsl, SelectableHelper}; use futures_util::{stream, StreamExt, TryStreamExt}; -use kitsune_core::mapping::IntoActivity; +use kitsune_activitypub::mapping::IntoActivity; use kitsune_db::{ model::{account::Account, post::Post}, post_permission_check::{PermissionCheck, PostPermissionCheckExt}, @@ -38,7 +38,7 @@ pub async fn get( Query(query): Query, ) -> Result>, ActivityPubJson>> { let account = state - .db_pool() + .db_pool .with_connection(|db_conn| { use diesel_async::RunQueryDsl; @@ -79,7 +79,7 @@ pub async fn get( posts.last().map_or(Uuid::nil(), |post| post.id) ); let ordered_items = stream::iter(posts) - .then(|post| post.into_activity(&state.core)) + .then(|post| post.into_activity(state.ap_state())) .try_collect() .await?; @@ -94,7 +94,7 @@ pub async fn get( }))) } else { let public_post_count = state - .db_pool() + .db_pool .with_connection(|db_conn| { use diesel_async::RunQueryDsl; diff --git a/kitsune/src/http/handler/well_known/webfinger.rs b/kitsune/src/http/handler/well_known/webfinger.rs index af8522add..aec8edcd6 100644 --- a/kitsune/src/http/handler/well_known/webfinger.rs +++ b/kitsune/src/http/handler/well_known/webfinger.rs @@ -134,9 +134,9 @@ mod tests { .unwrap(), ) .search_backend(NoopSearchService) + .account_cache(Arc::new(NoopCache.into())) .post_cache(Arc::new(NoopCache.into())) - .user_cache(Arc::new(NoopCache.into())) - .webfinger(Webfinger::new(Arc::new(NoopCache.into()))) + .resolver(Arc::new(Webfinger::new(Arc::new(NoopCache.into())))) .build(); let context_repo = KitsuneContextRepo::builder() @@ -156,7 +156,7 @@ mod tests { .fetcher(fetcher) .job_service(job_service) .url_service(url_service) - .webfinger(Webfinger::new(Arc::new(NoopCache.into()))) + .resolver(Arc::new(Webfinger::new(Arc::new(NoopCache.into())))) .build() } diff --git a/kitsune/src/lib.rs b/kitsune/src/lib.rs index 9c316c9f0..34f4869c3 100644 --- a/kitsune/src/lib.rs +++ b/kitsune/src/lib.rs @@ -4,6 +4,7 @@ #![allow( clippy::cast_sign_loss, clippy::missing_errors_doc, + clippy::missing_panics_doc, clippy::module_name_repetitions, forbidden_lint_groups )] @@ -53,6 +54,7 @@ use kitsune_url::UrlService; #[cfg(feature = "oidc")] use {futures_util::future::OptionFuture, kitsune_oidc::OidcService}; +#[allow(clippy::too_many_lines)] pub async fn initialise_state( config: &Configuration, db_pool: PgPool, @@ -193,7 +195,7 @@ pub async fn initialise_state( let search_service = SearchService::builder() .db_pool(db_pool.clone()) - .fetcher(fetcher) + .fetcher(fetcher.clone()) .search_backend(search_backend) .build(); @@ -215,6 +217,7 @@ pub async fn initialise_state( post: status_event_emitter, }, federation_filter, + fetcher, #[cfg(feature = "mastodon-api")] mastodon_mapper, oauth2: oauth2_service, diff --git a/kitsune/src/main.rs b/kitsune/src/main.rs index fc26caae5..55f9191ca 100644 --- a/kitsune/src/main.rs +++ b/kitsune/src/main.rs @@ -10,6 +10,7 @@ use eyre::Context; use kitsune::consts::STARTUP_FIGLET; use kitsune_config::Configuration; use kitsune_core::consts::VERSION; +use kitsune_job_runner::JobDispatcherState; use std::{ borrow::Cow, env, future, @@ -109,9 +110,18 @@ async fn boot() -> eyre::Result<()> { } } }); + + let dispatcher_state = JobDispatcherState::builder() + .attachment_service(state.service.attachment.clone()) + .db_pool(state.db_pool.clone()) + .federation_filter(state.federation_filter.clone()) + .mail_sender(state.service.mailing.sender()) + .url_service(state.service.url.clone()) + .build(); + tokio::spawn(kitsune_job_runner::run_dispatcher( job_queue, - state.core.clone(), + dispatcher_state, config.job_queue.num_workers.get(), )); diff --git a/kitsune/src/state.rs b/kitsune/src/state.rs index 5586e30ad..95864c4c0 100644 --- a/kitsune/src/state.rs +++ b/kitsune/src/state.rs @@ -1,6 +1,6 @@ use crate::oauth2::{OAuth2Service, OAuthEndpoint}; use axum_extra::extract::cookie; -use kitsune_core::event::PostEventEmitter; +use kitsune_core::{event::PostEventEmitter, traits::Fetcher}; use kitsune_db::PgPool; use kitsune_email::MailingService; use kitsune_embed::Client as EmbedClient; @@ -45,7 +45,7 @@ impl_from_ref! { impl_from_ref! { Zustand; [ - MastodonMapper => |input: &Zustand| input.mastodon_mapper().clone() + MastodonMapper => |input: &Zustand| input.mastodon_mapper.clone() ] } @@ -55,7 +55,7 @@ impl_from_ref! { AccountService => |input: &Zustand| input.service.account.clone(), AttachmentService => |input: &Zustand| input.service.attachment.clone(), CustomEmojiService => |input: &Zustand| input.service.custom_emoji.clone(), - FederationFilter => |input: &Zustand| input.service.federation_filter.clone(), + FederationFilter => |input: &Zustand| input.federation_filter.clone(), JobService => |input: &Zustand| input.service.job.clone(), MailingService => |input: &Zustand| input.service.mailing.clone(), NotificationService => |input: &Zustand| input.service.notification.clone(), @@ -71,10 +71,11 @@ impl_from_ref! { impl_from_ref! { Zustand; [ - PostEventEmitter => |input: &Zustand| input.event_emitter().post.clone() + PostEventEmitter => |input: &Zustand| input.event_emitter.post.clone() ] } +#[derive(Clone)] pub struct SessionConfig { pub cookie_key: cookie::Key, pub flash_config: axum_flash::Config, @@ -158,6 +159,7 @@ pub struct ZustandInner { pub embed_client: Option, pub event_emitter: EventEmitter, pub federation_filter: FederationFilter, + pub fetcher: Arc, #[cfg(feature = "mastodon-api")] pub mastodon_mapper: MastodonMapper, pub oauth2: OAuth2Service, @@ -168,6 +170,21 @@ pub struct ZustandInner { pub session_config: SessionConfig, } +impl ZustandInner { + #[must_use] + pub fn ap_state(&self) -> kitsune_activitypub::mapping::State<'_> { + kitsune_activitypub::mapping::State::builder() + .db_pool(&self.db_pool) + .service( + kitsune_activitypub::mapping::Service::builder() + .attachment(&self.service.attachment) + .url(&self.service.url) + .build(), + ) + .build() + } +} + #[derive(Clone)] pub struct Zustand(Arc);