Skip to content

Commit

Permalink
Send custom text and reporter in the topic payload
Browse files Browse the repository at this point in the history
  • Loading branch information
dcadenas committed Mar 27, 2024
1 parent 25fd587 commit b4b4426
Show file tree
Hide file tree
Showing 9 changed files with 106 additions and 82 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ libc = "0.2.153"
log = "0.4.21"
nostr-sdk = "0.29.0"
ractor = "0.9.7"
serde = { version = "1.0.197", features = ["derive"] }
serde_json = "1.0.115"
tokio = { version = "1.36.0", features = ["full"] }
tokio-util = { version = "0.7.10", features = ["rt"] }
Expand Down
40 changes: 26 additions & 14 deletions src/actors/event_enqueuer.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::actors::messages::EventEnqueuerMessage;
use crate::actors::messages::EventToReport;
use crate::actors::messages::ReportRequest;
use anyhow::Result;
use ractor::{Actor, ActorProcessingErr, ActorRef};
use tracing::{error, info};
Expand All @@ -21,7 +21,7 @@ pub struct State<T: PubsubPublisher> {

#[ractor::async_trait]
pub trait PubsubPublisher: Send + Sync + 'static {
async fn publish_event(&mut self, event: &EventToReport) -> Result<()>;
async fn publish_event(&mut self, event: &ReportRequest) -> Result<()>;
}

#[ractor::async_trait]
Expand Down Expand Up @@ -50,13 +50,16 @@ where
state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
match message {
Self::Msg::Enqueue(event_to_report) => {
if let Err(e) = state.pubsub_publisher.publish_event(&event_to_report).await {
Self::Msg::Enqueue(report_request) => {
if let Err(e) = state.pubsub_publisher.publish_event(&report_request).await {
error!("Failed to publish event: {}", e);
return Ok(());
}

info!("Event {} enqueued for moderation", event_to_report.id());
info!(
"Event {} enqueued for moderation",
report_request.reported_event.id()
);
}
}

Expand All @@ -67,14 +70,16 @@ where
#[cfg(test)]
mod tests {
use nostr_sdk::prelude::{EventBuilder, Keys};
use nostr_sdk::JsonUtil;
use ractor::cast;
use serde_json::json;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;

#[derive(Clone)]
struct TestGooglePublisher {
published_events: Arc<Mutex<Vec<EventToReport>>>,
published_events: Arc<Mutex<Vec<ReportRequest>>>,
}
impl TestGooglePublisher {
fn new() -> Self {
Expand All @@ -86,7 +91,7 @@ mod tests {

#[ractor::async_trait]
impl PubsubPublisher for TestGooglePublisher {
async fn publish_event(&mut self, event: &EventToReport) -> Result<()> {
async fn publish_event(&mut self, event: &ReportRequest) -> Result<()> {
self.published_events.lock().await.push(event.clone());
Ok(())
}
Expand All @@ -105,14 +110,21 @@ mod tests {
.await
.unwrap();

let event_to_report = EventToReport::new(
EventBuilder::text_note("First event", [])
.to_event(&Keys::generate())
.unwrap(),
);
let event_to_report = EventBuilder::text_note("First event", [])
.to_event(&Keys::generate())
.unwrap();

let report_request_string = json!({
"reportedEvent": event_to_report,
"reporterText": "This is hateful. Report it!"
})
.to_string();

let report_request: ReportRequest = serde_json::from_str(&report_request_string).unwrap();

cast!(
event_enqueuer_ref,
EventEnqueuerMessage::Enqueue(event_to_report.clone())
EventEnqueuerMessage::Enqueue(report_request.clone())
)
.unwrap();

Expand All @@ -125,7 +137,7 @@ mod tests {

assert_eq!(
test_google_publisher.published_events.lock().await.as_ref(),
[event_to_report]
[report_request]
);
}
}
65 changes: 39 additions & 26 deletions src/actors/gift_unwrapper.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::actors::messages::{EventToReport, GiftUnwrapperMessage};
use crate::actors::messages::{GiftUnwrapperMessage, ReportRequest};
use anyhow::Result;
use nostr_sdk::prelude::*;
use ractor::{Actor, ActorProcessingErr, ActorRef, OutputPort};
Expand All @@ -8,7 +8,7 @@ use tracing::{error, info};
pub struct GiftUnwrapper;
pub struct State {
keys: Keys, // Keys used for decrypting messages.
message_parsed_output_port: OutputPort<EventToReport>, // Port for publishing the events to report parsed from gift wrapped payload
message_parsed_output_port: OutputPort<ReportRequest>, // Port for publishing the events to report parsed from gift wrapped payload
}

#[ractor::async_trait]
Expand Down Expand Up @@ -40,31 +40,29 @@ impl Actor for GiftUnwrapper {
) -> Result<(), ActorProcessingErr> {
match message {
// Decrypts and forwards private messages to the output port.
GiftUnwrapperMessage::UnwrapEvent(event) => {
let unwrapped_gift = match event.extract_rumor(&state.keys) {
GiftUnwrapperMessage::UnwrapEvent(gift_wrap) => {
let unwrapped_gift = match gift_wrap.extract_rumor(&state.keys) {
Ok(gift) => gift,
Err(e) => {
error!("Error extracting rumor: {}", e);
return Ok(());
}
};

match Event::from_json(&unwrapped_gift.rumor.content) {
Ok(event_to_report) => {
match serde_json::from_str::<ReportRequest>(&unwrapped_gift.rumor.content) {
Ok(report_request) => {
info!(
"Request from {} to moderate event {}",
unwrapped_gift.sender,
event_to_report.id()
report_request.reported_event.id()
);

if let Err(e) = event_to_report.verify() {
if let Err(e) = report_request.reported_event.verify() {
error!("Error verifying event: {}", e);
return Ok(());
}

state
.message_parsed_output_port
.send(EventToReport::new(event_to_report))
state.message_parsed_output_port.send(report_request)
}
Err(e) => {
error!("Error parsing event from {}, {}", unwrapped_gift.sender, e);
Expand All @@ -86,21 +84,29 @@ impl Actor for GiftUnwrapper {
// properly implement the nip like created_at treatment. The nip itself is not
// finished at this time so hopefully in the future this can be done through the
// nostr crate.
#[allow(dead_code)]
#[allow(dead_code)] // Besides the tests, it's used from the giftwrapper utility binary
pub async fn create_private_dm_message(
message: &str,
sender_keys: &Keys,
report_request: &ReportRequest,
reporter_keys: &Keys,
receiver_pubkey: &PublicKey,
) -> Result<Event> {
if let Some(reporter_pubkey) = &report_request.reporter_pubkey {
if reporter_pubkey != &reporter_keys.public_key() {
return Err(anyhow::anyhow!(
"Reporter public key doesn't match the provided keys"
));
}
}
// Compose rumor
let kind_14_rumor = EventBuilder::sealed_direct(receiver_pubkey.clone(), message)
.to_unsigned_event(sender_keys.public_key());
let kind_14_rumor =
EventBuilder::sealed_direct(receiver_pubkey.clone(), report_request.as_json())
.to_unsigned_event(reporter_keys.public_key());

// Compose seal
let content: String = NostrSigner::Keys(sender_keys.clone())
let content: String = NostrSigner::Keys(reporter_keys.clone())
.nip44_encrypt(receiver_pubkey.clone(), kind_14_rumor.as_json())
.await?;
let kind_13_seal = EventBuilder::new(Kind::Seal, content, []).to_event(&sender_keys)?;
let kind_13_seal = EventBuilder::new(Kind::Seal, content, []).to_event(&reporter_keys)?;

// Compose gift wrap
let kind_1059_gift_wrap: Event =
Expand All @@ -115,6 +121,7 @@ mod tests {
use crate::actors::messages::GiftWrap;
use crate::actors::TestActor;
use ractor::{cast, Actor};
use serde_json::json;
use std::sync::Arc;
use tokio::sync::Mutex;
use tokio::time::{sleep, Duration};
Expand All @@ -131,19 +138,25 @@ mod tests {
let sender_keys = Keys::parse(sender_secret).unwrap();

let bad_guy_keys = Keys::generate();
let event_to_report = EventToReport::new(
EventBuilder::text_note("I hate you!!", [])
.to_event(&bad_guy_keys)
.unwrap(),
);

let event_to_report = EventBuilder::text_note("I hate you!!", [])
.to_event(&bad_guy_keys)
.unwrap();

let report_request_string = json!({
"reportedEvent": event_to_report,
"reporterText": "This is hateful. Report it!"
})
.to_string();
let report_request: ReportRequest = serde_json::from_str(&report_request_string).unwrap();

let gift_wrapped_event = GiftWrap::new(
create_private_dm_message(&event_to_report.as_json(), &sender_keys, &receiver_pubkey)
create_private_dm_message(&report_request, &sender_keys, &receiver_pubkey)
.await
.unwrap(),
);

let messages_received = Arc::new(Mutex::new(Vec::<EventToReport>::new()));
let messages_received = Arc::new(Mutex::new(Vec::<ReportRequest>::new()));
let (receiver_actor_ref, receiver_actor_handle) =
Actor::spawn(None, TestActor::default(), messages_received.clone())
.await
Expand Down Expand Up @@ -175,6 +188,6 @@ mod tests {
parser_handle.await.unwrap();
receiver_actor_handle.await.unwrap();

assert_eq!(messages_received.lock().await.as_ref(), [event_to_report]);
assert_eq!(messages_received.lock().await.as_ref(), [report_request]);
}
}
56 changes: 19 additions & 37 deletions src/actors/messages.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use crate::actors::utilities::OutputPortSubscriber;
use anyhow::{Context, Result};
use nostr_sdk::prelude::*;
use std::fmt::{Debug, Display, Formatter};
use serde::{Deserialize, Serialize};
use std::fmt::Debug;

//Newtype
#[derive(Debug, Clone, PartialEq, Eq)]
Expand All @@ -27,47 +28,39 @@ pub enum RelayEventDispatcherMessage {
#[derive(Debug)]
pub enum GiftUnwrapperMessage {
UnwrapEvent(GiftWrap),
SubscribeToEventUnwrapped(OutputPortSubscriber<EventToReport>),
SubscribeToEventUnwrapped(OutputPortSubscriber<ReportRequest>),
}

// How to subscribe to actors that publish Event messages like RelayEventDispatcher
// How to subscribe to actors that publish DM messages like RelayEventDispatcher
impl From<GiftWrap> for GiftUnwrapperMessage {
fn from(gift_wrap: GiftWrap) -> Self {
GiftUnwrapperMessage::UnwrapEvent(gift_wrap)
}
}

//Newtype
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct EventToReport(Event);
impl EventToReport {
pub fn new(event: Event) -> Self {
EventToReport(event)
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ReportRequest {
pub reported_event: Event,
pub reporter_pubkey: Option<PublicKey>,
pub reporter_text: Option<String>,
}

impl ReportRequest {
pub fn as_json(&self) -> String {
self.0.as_json()
}

pub fn id(&self) -> String {
self.0.id().to_string()
}
}
impl Display for EventToReport {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0.as_json())
serde_json::to_string(self).expect("Failed to serialize ReportRequest to JSON")
}
}

#[derive(Debug)]
pub enum EventEnqueuerMessage {
Enqueue(EventToReport),
Enqueue(ReportRequest),
}

// How to subscribe to actors that publish EventToReport messages like GiftUnwrapper
impl From<EventToReport> for EventEnqueuerMessage {
fn from(event_to_report: EventToReport) -> Self {
EventEnqueuerMessage::Enqueue(event_to_report)
impl From<ReportRequest> for EventEnqueuerMessage {
fn from(report_request: ReportRequest) -> Self {
EventEnqueuerMessage::Enqueue(report_request)
}
}

Expand All @@ -76,19 +69,8 @@ pub enum TestActorMessage<T> {
EventHappened(T),
}

impl From<EventToReport> for TestActorMessage<EventToReport> {
fn from(event: EventToReport) -> Self {
impl From<ReportRequest> for TestActorMessage<ReportRequest> {
fn from(event: ReportRequest) -> Self {
TestActorMessage::EventHappened(event)
}
}

#[derive(Debug, Clone)]
pub enum LogActorMessage {
Info(String),
}

impl From<EventToReport> for LogActorMessage {
fn from(event: EventToReport) -> Self {
LogActorMessage::Info(event.as_json())
}
}
8 changes: 5 additions & 3 deletions src/adapters/google_publisher.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::actors::messages::EventToReport;
use crate::actors::messages::ReportRequest;
use crate::actors::PubsubPublisher;
use anyhow::{Context, Result};
use gcloud_sdk::{
Expand Down Expand Up @@ -33,9 +33,11 @@ impl GooglePublisher {

#[ractor::async_trait]
impl PubsubPublisher for GooglePublisher {
async fn publish_event(&mut self, event: &EventToReport) -> Result<()> {
async fn publish_event(&mut self, report_request: &ReportRequest) -> Result<()> {
let pubsub_message = PubsubMessage {
data: event.as_json().as_bytes().into(),
data: serde_json::to_vec(report_request)
.context("Failed to serialize event to JSON")?
.into(),
..Default::default()
};

Expand Down
3 changes: 2 additions & 1 deletion src/adapters/nostr_subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::actors::Subscribe;
use nostr_sdk::prelude::*;
use ractor::{cast, concurrency::Duration, ActorRef};
use tokio_util::sync::CancellationToken;
use tracing::{debug, error};
use tracing::{debug, error, info};

#[derive(Clone)]
pub struct NostrSubscriber {
Expand Down Expand Up @@ -38,6 +38,7 @@ impl Subscribe for NostrSubscriber {

client.disconnect().await?;
client.connect().await;
info!("Subscribing to {:?}", self.filters.clone());
client.subscribe(self.filters.clone(), None).await;

let client_clone = client.clone();
Expand Down
Loading

0 comments on commit b4b4426

Please sign in to comment.