Skip to content

Commit

Permalink
GH-55 # Replace Alexa by Tranco on API side
Browse files Browse the repository at this point in the history
  • Loading branch information
Hugo-C committed Apr 21, 2024
1 parent 0172327 commit 43db624
Show file tree
Hide file tree
Showing 10 changed files with 1,221 additions and 111 deletions.
914 changes: 815 additions & 99 deletions Cargo.lock

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ openssl = { version = "~0.10.55", features = ["vendored"] } # Required for sent
sentry = "0.32"
rocket-sentry = "0.17"
redis = "0.25"
log = "0.4"
env_logger = "0.11"
async-std = { version = "1.12", features = ["attributes", "tokio1"] }
reqwest = { version = "0.12.2", features = ["stream"] }
tempfile = "3.10.1"

[dependencies.rocket_db_pools]
version = "0.1.0"
Expand Down
10 changes: 10 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,22 @@ It takes a required `host` parameter and optionally `port` (default to 443).

### Retrieve domains from alexa top 1 million that match a jarm hash

**DEPRECATED** see `tranco-overlap` instead

````http request
GET api/v1/alexa-overlap?jarm_hash=<jarm-hash>
````

The returned list is ordered by top alexa rank first

### Retrieve domains from tranco top 1 million that match a jarm hash

````http request
GET api/v1/tranco-overlap?jarm_hash=<jarm-hash>
````

The returned list is ordered by top tranco rank first

### Retrieve recently scanned hosts

````http request
Expand Down
29 changes: 29 additions & 0 deletions examples/tranco_top1m.url
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Request for no overlap
GET {{host}}/tranco-overlap?jarm_hash=123

HTTP 200
Content-Type: application/json
[Asserts]
jsonpath "$.overlapping_domains" isEmpty

# Request with 1 overlap
GET {{host}}/tranco-overlap?jarm_hash=3fd3fd20d3fd3fd21c3fd3fd3fd3fd2b66a312d81ed1efa0f55830f7490cb2

HTTP 200
Content-Type: application/json
[Asserts]
jsonpath "$.overlapping_domains" count == 1
jsonpath "$.overlapping_domains.[0].rank" == 9
jsonpath "$.overlapping_domains.[0].domain" == "zhihu.com"

# Request with 2 overlap
GET {{host}}/tranco-overlap?jarm_hash=21d19d00021d21d00021d19d21d21d1a46380b04d662f0848f508dd171125d

HTTP 200
Content-Type: application/json
[Asserts]
jsonpath "$.overlapping_domains" count == 2
jsonpath "$.overlapping_domains.[0].rank" == 11
jsonpath "$.overlapping_domains.[0].domain" == "fake_site_1.com"
jsonpath "$.overlapping_domains.[1].rank" == 12
jsonpath "$.overlapping_domains.[1].domain" == "fake_site_2.com"
65 changes: 58 additions & 7 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,28 @@ extern crate rocket;

pub mod utils;
pub mod alexa_top1m;
pub mod tranco_top1m;

use rocket_db_pools::{Connection, deadpool_redis};
use crate::alexa_top1m::{AlexaTop1M, RankedDomain};
use crate::tranco_top1m::{TrancoTop1M};
use crate::tranco_top1m::RankedDomain as TrancoRankedDomain;

use std::env;
use std::path::Path;
use rocket::{Build, Rocket, State};
use rocket::{Build, fairing, Rocket, State};
use rocket::serde::json::Json;
use rust_jarm::Jarm;
use serde::Serialize;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use rocket::fairing::AdHoc;
use rocket::response::status::Custom;
use rocket::http::Status;
use rocket::serde::Deserialize;
use rocket::serde::json::serde_json;
use rust_jarm::error::JarmError;
use rocket_db_pools::{Database};
use rocket_db_pools::deadpool_redis::redis::{AsyncCommands};
use rocket_db_pools::deadpool_redis::redis::AsyncCommands;

pub const DEFAULT_SCAN_TIMEOUT_IN_SECONDS: u64 = 15;
pub const REDIS_LAST_SCAN_LIST_KEY: &str = "redis_last_scan_list_key";
Expand All @@ -27,11 +33,16 @@ pub const LAST_SCAN_SIZE_RETURNED: isize = 10;

#[derive(Database)]
#[database("redis_db")]
struct Db(deadpool_redis::Pool);
pub struct Db(deadpool_redis::Pool);


#[derive(Serialize)]
struct ErrorResponse {
// TODO rename in JarmErrorResponse
error: String,
}

#[derive(Serialize)]
struct JarmErrorResponse {
error_type: String,
error_message: String,
}
Expand All @@ -41,7 +52,7 @@ struct JarmResponse {
host: String,
port: String,
jarm_hash: String,
error: Option<ErrorResponse>,
error: Option<JarmErrorResponse>,
}

#[derive(Serialize, Deserialize)]
Expand All @@ -61,6 +72,11 @@ struct AlexaOverlapResponse {
overlapping_domains: Vec<RankedDomain>,
}

#[derive(Serialize)]
struct TrancoOverlapResponse {
overlapping_domains: Vec<TrancoRankedDomain>,
}

pub fn scan_timeout_in_seconds() -> u64 {
env::var("SCAN_TIMEOUT_IN_SECONDS")
.unwrap_or(DEFAULT_SCAN_TIMEOUT_IN_SECONDS.to_string())
Expand Down Expand Up @@ -129,6 +145,16 @@ fn alexa_overlap(alexa_top1m: &State<AlexaTop1M>, jarm_hash: String) -> Json<Ale
Json(AlexaOverlapResponse { overlapping_domains: overlap })
}

#[get("/?<jarm_hash>")]
async fn tranco_overlap(redis_client: Connection<Db>, jarm_hash: String) -> Result<Json<TrancoOverlapResponse>, Custom<Json<ErrorResponse>>> {
let mut tranco = TrancoTop1M::from(redis_client);
if !tranco.is_initialized().await {
return Err(Custom(Status::ServiceUnavailable, Json(ErrorResponse { error: "db not yet loaded".to_string()})))
}
let overlapping_domains = tranco.get(jarm_hash).await;
Ok(Json(TrancoOverlapResponse { overlapping_domains }))
}

fn build_error_json(jarm_error: JarmError) -> Json<JarmResponse> {
// error_message is a debug view of a an unknown error, to be improved.
let (error_type, error_message) = match jarm_error {
Expand All @@ -146,17 +172,42 @@ fn build_error_json(jarm_error: JarmError) -> Json<JarmResponse> {
host: "".to_string(),
port: "".to_string(),
jarm_hash: "".to_string(),
error: Some(ErrorResponse { error_type, error_message }),
error: Some(JarmErrorResponse { error_type, error_message }),
})
}

pub fn build_rocket() -> Rocket<Build> {
pub fn build_rocket_without_tranco_initialisation() -> Rocket<Build> {
let alexa_top1m = AlexaTop1M::new(&alexa_top1m_raw_data_path())
.expect("AlexaTop1M built correctly");
rocket::build()
.mount("/jarm", routes![jarm])
.mount("/last-scans", routes![last_scans])
.mount("/alexa-overlap", routes![alexa_overlap])
.mount("/tranco-overlap", routes![tranco_overlap])
.attach(Db::init())
.manage(alexa_top1m)
}

pub fn build_rocket() -> Rocket<Build> {
let rocket = build_rocket_without_tranco_initialisation();
rocket.attach(AdHoc::try_on_ignite("Initialize tranco", initialize_tranco_in_redis))
}

async fn initialize_tranco_in_redis(rocket: Rocket<Build>) -> fairing::Result {
let pool = match Db::fetch(&rocket) {
Some(db) => db.0.clone(),
None => return Err(rocket)
};

rocket::tokio::task::spawn(async move {
let connection = match pool.get().await {
Ok(connection) => connection,
Err(_) => return,
};
let mut tranco = TrancoTop1M::new(connection);
tranco.initialize().await;
});
// We don't wait for the initialization to complete.
// This means it can be stopped unexpectedly and must be able to recover from it on the next run
Ok(rocket)
}
2 changes: 2 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use ::rocket_sentry::RocketSentry;
use env_logger::Env;
use jarm_online::build_rocket;
use rocket::fairing::{Fairing, Info, Kind};
use rocket::http::Header;
Expand All @@ -25,6 +26,7 @@ impl Fairing for CORS {

#[rocket::main]
async fn main() -> Result<(), rocket::Error> {
env_logger::Builder::from_env(Env::default().default_filter_or("info")).init();
build_rocket()
.attach(CORS)
.attach(RocketSentry::fairing())
Expand Down
159 changes: 159 additions & 0 deletions src/tranco_top1m/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
use std::{env, io};
use std::error::Error;
use std::fs::File;
use csv::ReaderBuilder;
use rocket_db_pools::Connection as RocketConnection;
use rocket_db_pools::deadpool_redis::redis::{AsyncCommands, RedisError};
use serde::Serialize;
use tempfile::NamedTempFile;
use crate::rocket::futures::StreamExt;

use crate::Db;
use crate::deadpool_redis::Connection;

// Env var key
const FORCE_TRANCO_TOP1M_RAW_DATA_PATH: &str = "FORCE_TRANCO_TOP1M_RAW_DATA_PATH";
const TRANCO_TOP1M_S3_URL: &str = "TRANCO_TOP1M_S3_URL";

// Redis keys
const TRANCO_TOP_1M_JARM_PREFIX_KEY: &str = "trancotop1m:jarm:";
const TRANCO_TOP_1M_INITIALIZED_KEY: &str = "trancotop1m:initialized";

#[derive(PartialEq, Serialize, Clone, Debug)]
pub struct RankedDomain {
pub rank: u64,
pub domain: String,
}

pub struct TrancoTop1M {
redis_client: Connection,
}

impl TrancoTop1M {
pub fn new(redis_client: Connection) -> TrancoTop1M {
TrancoTop1M {
redis_client
}
}

pub fn from(redis_client: RocketConnection<Db>) -> TrancoTop1M {
TrancoTop1M {
redis_client: redis_client.into_inner()
}
}

pub async fn get(&mut self, jarm_hash: String) -> Vec<RankedDomain> {
let key = format!("{TRANCO_TOP_1M_JARM_PREFIX_KEY}:{jarm_hash}");
// Fetch all keys
let values: Vec<String> = self.redis_client.lrange(key, 0, -1).await.unwrap();
let mut res = vec![];
for value in values {
let (rank_as_str, domain_as_str) = value.split_once('#').unwrap();
let rank = rank_as_str.parse::<u64>().unwrap();
let domain = domain_as_str.to_string();
res.push(RankedDomain { rank, domain })
}
res
}

pub async fn is_initialized(&mut self) -> bool {
let result: bool = self.redis_client.exists(TRANCO_TOP_1M_INITIALIZED_KEY).await.unwrap();
result
}

pub async fn initialize(&mut self) {
if self.is_initialized().await {
info!("Tranco already initialized, skipping.");
return;
}
// tmp file get cleared once out of scope, so we need a var to hold it
let mut downloaded_s3_file = NamedTempFile::new().unwrap();

let path = if let Ok(path) = env::var(FORCE_TRANCO_TOP1M_RAW_DATA_PATH) {
info!("Forcing tranco initialisation on {path}");
path
} else {
let url = match env::var(TRANCO_TOP1M_S3_URL) {
Ok(url) => url,
Err(_) => {
warn!("TRANCO_TOP1M_S3_URL env var has to be set for tranco initialization");
warn!("Skipping tranco initialization");
return;
}
};
info!("Downloading tranco initialisation file");
let download_path = downloaded_s3_file.path();
let path = download_path.to_str().unwrap().to_string();
info!("Using temporary file path: {path}");

match Self::download_top_1m_file(url, downloaded_s3_file.as_file_mut()).await {
Ok(_) => info!("Tranco top 1M file downloaded successfully!"),
Err(err) => {
sentry::capture_error(&err);
error!("Failed to download tranco file, aborting.");
return;
}
}
path
};
self.destroy_db().await.unwrap();
let count = match self.add_domains_from_path(path).await {
Ok(count) => count,
Err(_) => {
error!("Failed to add tranco hashes from path");
return;
}
};
match count {
0..=10 => {
error!("Only {count} values found during initialisation, most likely something went wrong.");
error!("Initialisation key will not be set so as to retry on next start.");
}
_ => {
let _: () = self.redis_client.set(TRANCO_TOP_1M_INITIALIZED_KEY, 1).await.unwrap();
info!("Tranco DB successfully initialized, {count} website's jarm hashes loaded");
}
}
}

async fn download_top_1m_file(url: String, file: &mut File) -> Result<(), reqwest::Error> {
let mut byte_stream = reqwest::get(url).await?.bytes_stream();

while let Some(item) = byte_stream.next().await {
io::copy(&mut item?.as_ref(), file).unwrap();
}
Ok(())
}

/// Remove all keys related to Tranco in the Redis DB
pub async fn destroy_db(&mut self) -> Result<(), RedisError> {
// First remove the init key so the other keys are not used in a partial state
let _: () = self.redis_client.del(TRANCO_TOP_1M_INITIALIZED_KEY).await?;

// Then we remove all the jarm hash keys
let pattern = format!("{TRANCO_TOP_1M_JARM_PREFIX_KEY}*");
let keys: Vec<String> = self.redis_client.keys(pattern).await?;
for key in keys {
let _: () = self.redis_client.del(key).await?;
}
Ok(())
}

async fn add_domains_from_path(&mut self, path: String) -> Result<u64, Box<dyn Error>> {
let mut count = 0;
let mut reader = ReaderBuilder::new().has_headers(false).from_path(path)?;
for result in reader.records() {
let record = result?;

let rank = record.get(0).ok_or("No rank provided")?;
let domain = record.get(1).ok_or("No domain provided")?;
let jarm_hash = record.get(2).ok_or("No jarm hash provided")?;

let key = format!("{TRANCO_TOP_1M_JARM_PREFIX_KEY}:{jarm_hash}");
let value = format!("{rank}#{domain}");
let _: () = self.redis_client.rpush(key, value).await?;
count += 1;
}
Ok(count)
}
}
Loading

0 comments on commit 43db624

Please sign in to comment.