Skip to content

Commit

Permalink
[Mobile Config] Handle entity duplicates in Mobile Radio Tracker (#923)
Browse files Browse the repository at this point in the history
  • Loading branch information
kurotych authored Jan 13, 2025
1 parent cd3f79a commit 8ced78b
Show file tree
Hide file tree
Showing 4 changed files with 222 additions and 143 deletions.
37 changes: 20 additions & 17 deletions mobile_config/src/mobile_radio_tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,11 @@ impl MobileRadio {
}

#[derive(Debug, sqlx::FromRow)]
struct TrackedMobileRadio {
entity_key: EntityKey,
hash: String,
last_changed_at: DateTime<Utc>,
last_checked_at: DateTime<Utc>,
pub struct TrackedMobileRadio {
pub entity_key: EntityKey,
pub hash: String,
pub last_changed_at: DateTime<Utc>,
pub last_checked_at: DateTime<Utc>,
}

impl TrackedMobileRadio {
Expand Down Expand Up @@ -152,7 +152,7 @@ impl MobileRadioTracker {
}
}

async fn track_changes(pool: &Pool<Postgres>, metadata: &Pool<Postgres>) -> anyhow::Result<()> {
pub async fn track_changes(pool: &Pool<Postgres>, metadata: &Pool<Postgres>) -> anyhow::Result<()> {
tracing::info!("looking for changes to radios");
let tracked_radios = get_tracked_radios(pool).await?;
let all_mobile_radios = get_all_mobile_radios(metadata);
Expand Down Expand Up @@ -183,7 +183,7 @@ async fn identify_changes(
.await
}

async fn get_tracked_radios(
pub async fn get_tracked_radios(
pool: &Pool<Postgres>,
) -> anyhow::Result<HashMap<EntityKey, TrackedMobileRadio>> {
sqlx::query_as::<_, TrackedMobileRadio>(
Expand All @@ -209,20 +209,23 @@ fn get_all_mobile_radios(metadata: &Pool<Postgres>) -> impl Stream<Item = Mobile
sqlx::query_as::<_, MobileRadio>(
r#"
SELECT
kta.entity_key,
mhi.refreshed_at,
mhi.location::bigint,
mhi.is_full_hotspot::int,
mhi.num_location_asserts,
mhi.is_active::int,
mhi.dc_onboarding_fee_paid::bigint,
mhi.device_type::text,
mhi.deployment_info::text
DISTINCT ON (kta.entity_key, mhi.asset)
kta.entity_key,
mhi.asset,
mhi.refreshed_at,
mhi.location::bigint,
mhi.is_full_hotspot::int,
mhi.num_location_asserts,
mhi.is_active::int,
mhi.dc_onboarding_fee_paid::bigint,
mhi.device_type::text,
mhi.deployment_info::text
FROM key_to_assets kta
INNER JOIN mobile_hotspot_infos mhi ON
kta.asset = mhi.asset
kta.asset = mhi.asset
WHERE kta.entity_key IS NOT NULL
AND mhi.refreshed_at IS NOT NULL
ORDER BY kta.entity_key, mhi.asset, refreshed_at DESC
"#,
)
.fetch(metadata)
Expand Down
133 changes: 133 additions & 0 deletions mobile_config/tests/common/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
use bs58;
use chrono::{DateTime, Duration, Utc};
use helium_crypto::PublicKeyBinary;
use helium_crypto::{KeyTag, Keypair};
use sqlx::PgPool;

pub async fn add_mobile_tracker_record(
pool: &PgPool,
key: PublicKeyBinary,
last_changed_at: DateTime<Utc>,
) {
let b58 = bs58::decode(key.to_string()).into_vec().unwrap();

sqlx::query(
r#"
INSERT INTO
"mobile_radio_tracker" ("entity_key", "hash", "last_changed_at", "last_checked_at")
VALUES
($1, $2, $3, $4);
"#,
)
.bind(b58)
.bind("hash")
.bind(last_changed_at)
.bind(last_changed_at + Duration::hours(1))
.execute(pool)
.await
.unwrap();
}

#[allow(clippy::too_many_arguments)]
pub async fn add_db_record(
pool: &PgPool,
asset: &str,
location: i64,
device_type: &str,
key: PublicKeyBinary,
created_at: DateTime<Utc>,
refreshed_at: Option<DateTime<Utc>>,
deployment_info: Option<&str>,
) {
add_mobile_hotspot_infos(
pool,
asset,
location,
device_type,
created_at,
refreshed_at,
deployment_info,
)
.await;
add_asset_key(pool, asset, key).await;
}

pub async fn add_mobile_hotspot_infos(
pool: &PgPool,
asset: &str,
location: i64,
device_type: &str,
created_at: DateTime<Utc>,
refreshed_at: Option<DateTime<Utc>>,
deployment_info: Option<&str>,
) {
sqlx::query(
r#"
INSERT INTO
"mobile_hotspot_infos" ("asset", "location", "device_type", "created_at", "refreshed_at", "deployment_info")
VALUES
($1, $2, $3::jsonb, $4, $5, $6::jsonb);
"#,
)
.bind(asset)
.bind(location)
.bind(device_type)
.bind(created_at)
.bind(refreshed_at)
.bind(deployment_info)
.execute(pool)
.await
.unwrap();
}

pub async fn add_asset_key(pool: &PgPool, asset: &str, key: PublicKeyBinary) {
let b58 = bs58::decode(key.to_string()).into_vec().unwrap();
sqlx::query(
r#"
INSERT INTO
"key_to_assets" ("asset", "entity_key")
VALUES ($1, $2);
"#,
)
.bind(asset)
.bind(b58)
.execute(pool)
.await
.unwrap();
}

pub async fn create_db_tables(pool: &PgPool) {
sqlx::query(
r#"
CREATE TABLE mobile_hotspot_infos (
asset character varying(255) NULL,
location numeric NULL,
device_type jsonb NOT NULL,
created_at timestamptz NOT NULL DEFAULT NOW(),
refreshed_at timestamptz,
deployment_info jsonb,
is_full_hotspot bool NULL,
num_location_asserts integer NULL,
is_active bool NULL,
dc_onboarding_fee_paid numeric NULL
);"#,
)
.execute(pool)
.await
.unwrap();

sqlx::query(
r#"
CREATE TABLE key_to_assets (
asset character varying(255) NULL,
entity_key bytea NULL
);"#,
)
.execute(pool)
.await
.unwrap();
}

pub fn make_keypair() -> Keypair {
Keypair::generate(KeyTag::default(), &mut rand::rngs::OsRng)
}
131 changes: 5 additions & 126 deletions mobile_config/tests/gateway_service.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use std::vec;

use chrono::{DateTime, Duration, Utc};
use chrono::{Duration, Utc};
use futures::stream::StreamExt;

use helium_crypto::{KeyTag, Keypair, PublicKey, PublicKeyBinary, Sign};
use helium_crypto::{Keypair, PublicKey, Sign};
use helium_proto::services::mobile_config::{
self as proto, gateway_metadata_v2::DeploymentInfo, DeviceType, GatewayClient,
GatewayInfoStreamReqV1, GatewayInfoStreamReqV2, GatewayInfoStreamResV2,
Expand All @@ -18,6 +18,9 @@ use sqlx::PgPool;
use tokio::net::TcpListener;
use tonic::{transport, Code};

pub mod common;
use common::*;

#[sqlx::test]
async fn gateway_info_authorization_errors(pool: PgPool) -> anyhow::Result<()> {
// NOTE(mj): The information we're requesting does not exist in the DB for
Expand Down Expand Up @@ -800,130 +803,6 @@ async fn gateway_stream_info_v2_deployment_info(pool: PgPool) {
}
}

async fn add_mobile_tracker_record(
pool: &PgPool,
key: PublicKeyBinary,
last_changed_at: DateTime<Utc>,
) {
let b58 = bs58::decode(key.to_string()).into_vec().unwrap();

sqlx::query(
r#"
INSERT INTO
"mobile_radio_tracker" ("entity_key", "hash", "last_changed_at", "last_checked_at")
VALUES
($1, $2, $3, $4);
"#,
)
.bind(b58)
.bind("hash")
.bind(last_changed_at)
.bind(last_changed_at + Duration::hours(1))
.execute(pool)
.await
.unwrap();
}

#[allow(clippy::too_many_arguments)]
async fn add_db_record(
pool: &PgPool,
asset: &str,
location: i64,
device_type: &str,
key: PublicKeyBinary,
created_at: DateTime<Utc>,
refreshed_at: Option<DateTime<Utc>>,
deployment_info: Option<&str>,
) {
add_mobile_hotspot_infos(
pool,
asset,
location,
device_type,
created_at,
refreshed_at,
deployment_info,
)
.await;
add_asset_key(pool, asset, key).await;
}

async fn add_mobile_hotspot_infos(
pool: &PgPool,
asset: &str,
location: i64,
device_type: &str,
created_at: DateTime<Utc>,
refreshed_at: Option<DateTime<Utc>>,
deployment_info: Option<&str>,
) {
sqlx::query(
r#"
INSERT INTO
"mobile_hotspot_infos" ("asset", "location", "device_type", "created_at", "refreshed_at", "deployment_info")
VALUES
($1, $2, $3::jsonb, $4, $5, $6::jsonb);
"#,
)
.bind(asset)
.bind(location)
.bind(device_type)
.bind(created_at)
.bind(refreshed_at)
.bind(deployment_info)
.execute(pool)
.await
.unwrap();
}

async fn add_asset_key(pool: &PgPool, asset: &str, key: PublicKeyBinary) {
let b58 = bs58::decode(key.to_string()).into_vec().unwrap();
sqlx::query(
r#"
INSERT INTO
"key_to_assets" ("asset", "entity_key")
VALUES ($1, $2);
"#,
)
.bind(asset)
.bind(b58)
.execute(pool)
.await
.unwrap();
}

async fn create_db_tables(pool: &PgPool) {
sqlx::query(
r#"
CREATE TABLE mobile_hotspot_infos (
asset character varying(255) NULL,
location numeric NULL,
device_type jsonb NOT NULL,
created_at timestamptz NOT NULL DEFAULT NOW(),
refreshed_at timestamptz,
deployment_info jsonb
);"#,
)
.execute(pool)
.await
.unwrap();

sqlx::query(
r#"
CREATE TABLE key_to_assets (
asset character varying(255) NULL,
entity_key bytea NULL
);"#,
)
.execute(pool)
.await
.unwrap();
}

fn make_keypair() -> Keypair {
Keypair::generate(KeyTag::default(), &mut rand::rngs::OsRng)
}

fn make_gateway_stream_signed_req_v2(
signer: &Keypair,
device_types: &[DeviceType],
Expand Down
Loading

0 comments on commit 8ced78b

Please sign in to comment.