Skip to content

Commit

Permalink
Reduce database locking time
Browse files Browse the repository at this point in the history
  • Loading branch information
bubelov committed Sep 26, 2024
1 parent 0d6adff commit cdb58a9
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 29 deletions.
6 changes: 3 additions & 3 deletions src/rpc/sync_elements.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ pub async fn run(Params(args): Params<Args>, pool: Data<Arc<Pool>>) -> Result<Me
let elements = overpass::query_bitcoin_merchants().await?;
let mut conn = db::open_connection()?;
let res = sync::merge_overpass_elements(elements, &mut conn).await?;
if res.elements_updated + res.elements_deleted > 2 {
if res.elements_created + res.elements_updated + res.elements_deleted > 2 {
let log_message = format!(
"{} ran a sync with high number of changes (updated: {}, deleted: {})",
token.owner, res.elements_updated, res.elements_deleted,
"{} ran a sync with high number of changes (created: {}, updated: {}, deleted: {})",
token.owner, res.elements_created, res.elements_updated, res.elements_deleted,
);
info!(log_message);
discord::send_message_to_channel(&log_message, discord::CHANNEL_API).await;
Expand Down
51 changes: 25 additions & 26 deletions src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::osm::osm::{self, OsmElement};
use crate::osm::overpass::OverpassElement;
use crate::user::User;
use crate::{discord, Error, Result};
use rusqlite::{Connection, Transaction};
use rusqlite::Connection;
use serde::Serialize;
use serde_json::Value;
use std::collections::{HashMap, HashSet};
Expand All @@ -13,8 +13,9 @@ use tracing::{error, info, warn};

#[derive(Serialize)]
pub struct MergeResult {
pub elements_updated: usize,
pub elements_deleted: usize,
pub elements_created: i64,
pub elements_updated: i64,
pub elements_deleted: i64,
}

pub async fn merge_overpass_elements(
Expand All @@ -30,10 +31,11 @@ pub async fn merge_overpass_elements(
// stage 2: find and process updated elements
let updated_element_events = sync_updated_elements(&fresh_overpass_elements, conn).await?;
// stage 3: find and process new elements
sync_new_elements(&fresh_overpass_elements, conn).await?;
let created_element_evnets = sync_new_elements(&fresh_overpass_elements, conn).await?;
Ok(MergeResult {
elements_updated: updated_element_events.len(),
elements_deleted: deleted_element_events.len(),
elements_created: created_element_evnets.len() as i64,
elements_updated: updated_element_events.len() as i64,
elements_deleted: deleted_element_events.len() as i64,
})
}

Expand Down Expand Up @@ -190,9 +192,9 @@ pub async fn sync_updated_elements(
pub async fn sync_new_elements(
fresh_overpass_elements: &Vec<OverpassElement>,
conn: &mut Connection,
) -> Result<()> {
let tx: Transaction = conn.transaction()?;
let cached_elements = Element::select_all(None, &tx)?;
) -> Result<Vec<Event>> {
let mut res = vec![];
let cached_elements = Element::select_all(None, conn)?;
for fresh_element in fresh_overpass_elements {
let btcmap_id = fresh_element.btcmap_id();
let user_id = fresh_element.uid;
Expand All @@ -204,42 +206,39 @@ pub async fn sync_new_elements(
Some(_) => {}
None => {
info!(btcmap_id, "Element does not exist, inserting");

if let Some(user_id) = user_id {
insert_user_if_not_exists(user_id, &tx).await?;
insert_user_if_not_exists(user_id, conn).await?;
}

let element = Element::insert(&fresh_element, &tx)?;

let sp = conn.savepoint()?;
let element = Element::insert(&fresh_element, &sp)?;
let event = Event::insert(
user_id.unwrap().try_into().unwrap(),
element.id,
"create",
&tx,
&sp,
)?;
event::service::on_new_event(&event, &tx).await?;

res.push(event);
let category = element.overpass_data.generate_category();
let android_icon = element.overpass_data.generate_android_icon();

let element =
Element::set_tag(element.id, "category", &category.clone().into(), &tx)?;
Element::set_tag(element.id, "category", &category.clone().into(), &sp)?;
let element = Element::set_tag(
element.id,
"icon:android",
&android_icon.clone().into(),
&tx,
&sp,
)?;

info!(category, android_icon);

element::service::generate_issues(vec![&element], &tx)?;
element::service::generate_areas_mapping_old(&vec![element], &tx)?;
element::service::generate_issues(vec![&element], &sp)?;
element::service::generate_areas_mapping_old(&vec![element], &sp)?;
sp.commit()?;
}
}
}
tx.commit()?;
Ok(())
for event in &res {
event::service::on_new_event(&event, conn).await?;
}
Ok(res)
}

async fn insert_user_if_not_exists(user_id: i64, conn: &Connection) -> Result<()> {
Expand Down

0 comments on commit cdb58a9

Please sign in to comment.