Cached parsed forecasts
kellpossible committed Jan 19, 2024
1 parent 5f8c56f commit 290a41f
Showing 5 changed files with 144 additions and 26 deletions.
20 changes: 13 additions & 7 deletions forecast-spreadsheet/src/lib.rs
@@ -164,7 +164,7 @@ impl Display for ParseCellError {
}
}

#[derive(Debug, Serialize)]
#[derive(Debug, Serialize, Deserialize, Copy, Clone, PartialEq)]
pub struct Version {
pub major: u8,
pub minor: u8,
@@ -179,6 +179,12 @@ pub enum ParseVersionError {
ParseInt(#[from] ParseIntError),
}

impl std::fmt::Display for Version {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}.{}.{}", self.major, self.minor, self.patch)
}
}

impl FromStr for Version {
type Err = ParseVersionError;

@@ -209,7 +215,7 @@ impl FromStr for Version {
}
}

#[derive(Debug, Serialize)]
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct ElevationRange {
pub upper: Option<i64>,
pub lower: Option<i64>,
@@ -370,12 +376,12 @@ fn parse_aspects(input: &str) -> std::result::Result<IndexSet<Aspect>, ParseAspe
.collect()
}

#[derive(Debug, Serialize, Clone)]
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct AspectElevation {
pub aspects: IndexSet<Aspect>,
}

#[derive(Debug, Serialize)]
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct AvalancheProblem {
pub kind: ProblemKind,
pub aspect_elevation: IndexMap<ElevationBandId, AspectElevation>,
@@ -552,14 +558,14 @@ mod size {
pub use size::Size;
use time_tz::{Offset, TimeZone};

#[derive(Debug, Serialize, Clone)]
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct HazardRating {
pub value: Option<HazardRatingValue>,
pub trend: Option<Trend>,
pub confidence: Option<Confidence>,
}

#[derive(Debug, Serialize)]
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct Forecast {
pub template_version: Version,
pub area: AreaId,
@@ -581,7 +587,7 @@ pub struct Forecast {
pub elevation_bands: IndexMap<ElevationBandId, ElevationRange>,
}

#[derive(Debug, Serialize, Clone)]
#[derive(Debug, Serialize, Clone, Deserialize)]
pub struct Forecaster {
pub name: String,
pub organisation: Option<String>,
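The new Display impl pairs with the existing FromStr so that a schema version can round-trip through the TEXT column added by the migration below. A minimal stand-in sketch, not the crate's real definitions (which also carry the serde derives and a dedicated ParseVersionError):

use std::{num::ParseIntError, str::FromStr};

// Stand-in for forecast_spreadsheet::Version, reduced to the round-trip behaviour.
#[derive(Debug, Copy, Clone, PartialEq)]
struct Version {
    major: u8,
    minor: u8,
    patch: u8,
}

impl std::fmt::Display for Version {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}.{}.{}", self.major, self.minor, self.patch)
    }
}

impl FromStr for Version {
    // The real crate wraps this in ParseVersionError; ParseIntError keeps the sketch short.
    type Err = ParseIntError;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let mut parts = s.splitn(3, '.');
        let major = parts.next().unwrap_or("0").parse()?;
        let minor = parts.next().unwrap_or("0").parse()?;
        let patch = parts.next().unwrap_or("0").parse()?;
        Ok(Version { major, minor, patch })
    }
}

fn main() -> Result<(), ParseIntError> {
    // Placeholder version value, chosen only for the example.
    let v = Version { major: 0, minor: 3, patch: 1 };
    let text = v.to_string(); // what would be written to the schema_version TEXT column
    assert_eq!(Version::from_str(&text)?, v);
    Ok(())
}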
5 changes: 5 additions & 0 deletions src/database/migrations/mod.rs
@@ -74,6 +74,11 @@ fn list_migrations() -> Vec<Migration> {
name: "forecast_files",
kind: MigrationKind::Sql(include_str!("v4_forecast_files.sql")),
},
Migration {
version: 5,
name: "forecast_json_cache",
kind: MigrationKind::Sql(include_str!("v5_forecast_json_cache.sql")),
},
]
}

4 changes: 4 additions & 0 deletions src/database/migrations/v5_forecast_json_cache.sql
@@ -0,0 +1,4 @@
ALTER TABLE forecast_files
ADD COLUMN parsed_forecast JSON;
ALTER TABLE forecast_files
ADD COLUMN schema_version TEXT;
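As a rough sanity check, the migration can be exercised against an in-memory SQLite database with rusqlite. The CREATE TABLE below is a trimmed stand-in for the real forecast_files schema, and every inserted value is a placeholder:

use rusqlite::{params, Connection};

fn main() -> rusqlite::Result<()> {
    let conn = Connection::open_in_memory()?;

    // Trimmed stand-in for the pre-existing forecast_files table.
    conn.execute_batch(
        "CREATE TABLE forecast_files (
            google_drive_id TEXT PRIMARY KEY,
            last_modified TEXT NOT NULL,
            file_blob BLOB NOT NULL
        );
        -- v5_forecast_json_cache.sql
        ALTER TABLE forecast_files ADD COLUMN parsed_forecast JSON;
        ALTER TABLE forecast_files ADD COLUMN schema_version TEXT;",
    )?;

    // SQLite stores the JSON column as TEXT, so a serialised forecast goes straight in.
    conn.execute(
        "INSERT INTO forecast_files (google_drive_id, last_modified, file_blob, parsed_forecast, schema_version)
         VALUES (?1, ?2, ?3, ?4, ?5)",
        params!["drive-id", "2024-01-19T00:00:00Z", Vec::<u8>::new(), r#"{"cached":true}"#, "0.3.1"],
    )?;

    let version: Option<String> = conn.query_row(
        "SELECT schema_version FROM forecast_files WHERE google_drive_id = ?1",
        params!["drive-id"],
        |row| row.get(0),
    )?;
    assert_eq!(version.as_deref(), Some("0.3.1"));
    Ok(())
}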
40 changes: 35 additions & 5 deletions src/forecasts/files.rs
@@ -1,4 +1,6 @@
use rusqlite::Row;
use std::str::FromStr;

use rusqlite::{types::Type, Row};
use sea_query::SimpleExpr;

use crate::{database::DATETIME_FORMAT, types};
@@ -9,23 +11,32 @@ pub struct ForecastFiles {
pub google_drive_id: String,
pub last_modified: types::Time,
pub file_blob: Vec<u8>,
pub parsed_forecast: Option<forecast_spreadsheet::Forecast>,
pub schema_version: Option<forecast_spreadsheet::Version>,
}

impl ForecastFiles {
pub const COLUMNS: [ForecastFilesIden; 3] = [
pub const COLUMNS: [ForecastFilesIden; 5] = [
ForecastFilesIden::GoogleDriveId,
ForecastFilesIden::LastModified,
ForecastFilesIden::FileBlob,
ForecastFilesIden::ParsedForecast,
ForecastFilesIden::SchemaVersion,
];
pub const TABLE: ForecastFilesIden = ForecastFilesIden::Table;

pub fn values(self) -> [SimpleExpr; 3] {
pub fn values(self) -> eyre::Result<[SimpleExpr; 5]> {
let last_modified = self.last_modified_value();
[
Ok([
self.google_drive_id.into(),
last_modified,
self.file_blob.into(),
]
self.parsed_forecast
.map(|f| serde_json::to_string(&f))
.transpose()?
.into(),
self.schema_version.map(|v| v.to_string()).into(),
])
}

pub fn last_modified_value(&self) -> SimpleExpr {
@@ -43,6 +54,8 @@ impl AsRef<str> for ForecastFilesIden {
Self::GoogleDriveId => "google_drive_id",
Self::LastModified => "last_modified",
Self::FileBlob => "file_blob",
Self::ParsedForecast => "parsed_forecast",
Self::SchemaVersion => "schema_version",
}
}
}
@@ -53,11 +66,28 @@ impl TryFrom<&Row<'_>> for ForecastFiles {
let google_drive_id = row.get(ForecastFilesIden::GoogleDriveId.as_ref())?;
let last_modified = row.get(ForecastFilesIden::LastModified.as_ref())?;
let file_blob = row.get(ForecastFilesIden::FileBlob.as_ref())?;
let parsed_forecast: Option<serde_json::Value> =
row.get(ForecastFilesIden::ParsedForecast.as_ref())?;
let schema_version: Option<String> = row.get(ForecastFilesIden::SchemaVersion.as_ref())?;

Ok(ForecastFiles {
google_drive_id,
last_modified,
file_blob,
parsed_forecast: parsed_forecast
.map(|f| {
serde_json::from_value(f).map_err(|e| {
rusqlite::Error::FromSqlConversionFailure(4, Type::Text, Box::new(e))
})
})
.transpose()?,
schema_version: schema_version
.map(|v| {
forecast_spreadsheet::Version::from_str(&v).map_err(|e| {
rusqlite::Error::FromSqlConversionFailure(5, Type::Text, Box::new(e))
})
})
.transpose()?,
})
}
}
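The serialisation that values() and the TryFrom<&Row> impl rely on is plain serde_json, with the optional column threaded through Option::transpose. A reduced sketch using a stand-in struct (the real Forecast has many more fields, and "example-area" is a placeholder):

use serde::{Deserialize, Serialize};

// Stand-in for forecast_spreadsheet::Forecast; the real struct is much larger.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
struct Forecast {
    area: String,
}

fn main() -> eyre::Result<()> {
    let parsed: Option<Forecast> = Some(Forecast { area: "example-area".into() });

    // Writing: Option<Forecast> -> Option<String>, failing early if serialisation
    // fails, mirroring ForecastFiles::values().
    let column: Option<String> = parsed
        .as_ref()
        .map(|forecast| serde_json::to_string(forecast))
        .transpose()?;

    // Reading: Option<String> -> Option<Forecast>, mirroring the TryFrom<&Row> impl.
    let roundtripped: Option<Forecast> = column
        .as_deref()
        .map(|json| serde_json::from_str::<Forecast>(json))
        .transpose()?;

    assert_eq!(parsed, roundtripped);
    Ok(())
}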
101 changes: 87 additions & 14 deletions src/forecasts/mod.rs
@@ -449,7 +449,7 @@ pub async fn get_forecast_data(
}
}
let google_drive_id = file_metadata.id.clone();
let cached_forecast_file: Option<Vec<u8>> = database
let cached_forecast_file: Option<ForecastFiles> = database
.interact(move |conn| {
let mut query = sea_query::Query::select();

@@ -477,19 +477,25 @@ pub async fn get_forecast_data(
// This logic is a bit buggy on google's side it seems, sometimes they change document
// but don't update modified time.
if cached_last_modified == *server_last_modified {
Some(cached_forecast_file.file_blob)
Some(cached_forecast_file)
} else {
tracing::debug!("Found cached forecast file, but it's outdated");
None
}
});

let forecast_file_bytes: Vec<u8> = if let Some(cached_forecast_file) = cached_forecast_file {
let forecast_file: ForecastFiles = if let Some(cached_forecast_file) = cached_forecast_file {
tracing::debug!("Using cached forecast file");
cached_forecast_file
} else {
tracing::debug!("Fetching updated/new forecast file");
let forecast_file_bytes: Vec<u8> = match requested {
let (forecast_file_bytes, forecast): (
Vec<u8>,
Option<(
forecast_spreadsheet::Forecast,
forecast_spreadsheet::Version,
)>,
) = match requested {
RequestedForecastData::Forecast => {
let file = google_drive::export_file(
&file_metadata.id,
@@ -498,42 +504,69 @@ pub async fn get_forecast_data(
client,
)
.await?;
file.bytes().await?.into()
let forecast_file_bytes: Vec<u8> = file.bytes().await?.into();
let forecast: forecast_spreadsheet::Forecast =
forecast_spreadsheet::parse_excel_spreadsheet(
&forecast_file_bytes,
&*FORECAST_SCHEMA,
)
.context("Error parsing forecast spreadsheet")?;

(
forecast_file_bytes,
Some((forecast, FORECAST_SCHEMA.schema_version.clone())),
)
}
RequestedForecastData::File => {
let file =
google_drive::get_file(&file_metadata.id, google_drive_api_key, client).await?;
file.bytes().await?.into()
(file.bytes().await?.into(), None)
}
};
let forecast_files_db = ForecastFiles {
let forecast_file_db = ForecastFiles {
google_drive_id: file_metadata.id.clone(),
last_modified: file_metadata.modified_time.clone().into(),
file_blob: forecast_file_bytes.clone(),
parsed_forecast: forecast.as_ref().map(|f| f.0.clone()),
schema_version: forecast.as_ref().map(|f| f.1.clone()),
};
let forecast_file_db_query = forecast_file_db.clone();
tracing::debug!("Updating cached forecast file");
database
.interact(move |conn| {
let mut query = sea_query::Query::insert();

let values = forecast_files_db.values();
let values = forecast_file_db_query.values()?;
query
.into_table(ForecastFiles::TABLE)
.columns(ForecastFiles::COLUMNS)
.values(values)?;

let excluded_table: Alias = Alias::new("excluded");
let excluded_column: Alias = Alias::new("excluded");
query.on_conflict(
OnConflict::column(ForecastFilesIden::GoogleDriveId)
.values([
(
ForecastFilesIden::LastModified,
(excluded_table.clone(), ForecastFilesIden::LastModified)
(excluded_column.clone(), ForecastFilesIden::LastModified)
.into_column_ref()
.into(),
),
(
ForecastFilesIden::FileBlob,
(excluded_table, ForecastFilesIden::FileBlob)
(excluded_column.clone(), ForecastFilesIden::FileBlob)
.into_column_ref()
.into(),
),
(
ForecastFilesIden::ParsedForecast,
(excluded_column.clone(), ForecastFilesIden::ParsedForecast)
.into_column_ref()
.into(),
),
(
ForecastFilesIden::SchemaVersion,
(excluded_column, ForecastFilesIden::SchemaVersion)
.into_column_ref()
.into(),
),
@@ -546,21 +579,61 @@ pub async fn get_forecast_data(
Result::<_, eyre::Error>::Ok(())
})
.await??;
forecast_file_bytes
forecast_file_db
};

match requested {
RequestedForecastData::Forecast => {
if let Some(forecast) = forecast_file.parsed_forecast {
if forecast_file.schema_version == Some(FORECAST_SCHEMA.schema_version) {
tracing::debug!("Re-using parsed forecast");
return Ok(ForecastData::Forecast(forecast));
} else {
tracing::warn!(
"Cached forecast schema version {:?} doesn't match current {:?}",
forecast_file.schema_version,
FORECAST_SCHEMA.schema_version
);
}
}
tracing::debug!("Re-parsing forecast");
let forecast: forecast_spreadsheet::Forecast =
forecast_spreadsheet::parse_excel_spreadsheet(
&forecast_file_bytes,
&forecast_file.file_blob,
&*FORECAST_SCHEMA,
)
.context("Error parsing forecast spreadsheet")?;

let json_forecast: serde_json::Value = serde_json::to_value(forecast.clone())?;
tracing::debug!("Updating cached parsed forecast and schema version");
database
.interact(move |conn| {
let mut query = sea_query::Query::update();
query
.table(ForecastFiles::TABLE)
.values([
(
ForecastFilesIden::ParsedForecast,
Some(json_forecast).into(),
),
(
ForecastFilesIden::SchemaVersion,
Some(FORECAST_SCHEMA.schema_version.to_string()).into(),
),
])
.and_where(
Expr::col(ForecastFilesIden::GoogleDriveId)
.eq(forecast_file.google_drive_id),
);
let (sql, values) = query.build_rusqlite(SqliteQueryBuilder);
conn.execute(&sql, &*values.as_params())?;
Result::<_, eyre::Error>::Ok(())
})
.await??;

Ok(ForecastData::Forecast(forecast))
}
RequestedForecastData::File => Ok(ForecastData::File(forecast_file_bytes)),
RequestedForecastData::File => Ok(ForecastData::File(forecast_file.file_blob)),
}
}

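Stripped of the database plumbing, the decision in the Forecast branch is: reuse the cached parse only when the stored schema_version matches the schema compiled into the binary, otherwise re-parse the spreadsheet blob (and, in the real code, write the fresh result and version back to the row). A condensed sketch with placeholder types standing in for the forecast_spreadsheet ones:

// Forecast/Version stand in for the forecast_spreadsheet types and parse() for
// parse_excel_spreadsheet; the version value is a placeholder.
#[derive(Debug, Clone, PartialEq)]
struct Version(u8, u8, u8);

#[derive(Debug, Clone)]
struct Forecast;

struct ForecastFiles {
    file_blob: Vec<u8>,
    parsed_forecast: Option<Forecast>,
    schema_version: Option<Version>,
}

const CURRENT_SCHEMA_VERSION: Version = Version(0, 3, 1);

fn parse(_bytes: &[u8]) -> Result<Forecast, String> {
    // Placeholder for forecast_spreadsheet::parse_excel_spreadsheet(bytes, &FORECAST_SCHEMA).
    Ok(Forecast)
}

fn forecast_from_cache_or_reparse(file: ForecastFiles) -> Result<Forecast, String> {
    if let Some(forecast) = file.parsed_forecast {
        if file.schema_version == Some(CURRENT_SCHEMA_VERSION) {
            // Cache hit: schema versions match, so the stored JSON is still usable.
            return Ok(forecast);
        }
        // Schema has moved on since the row was cached; fall through and re-parse.
    }
    // Cache miss or stale schema: parse the spreadsheet blob again.
    parse(&file.file_blob)
}

fn main() -> Result<(), String> {
    let cached = ForecastFiles {
        file_blob: Vec::new(),
        parsed_forecast: Some(Forecast),
        schema_version: Some(CURRENT_SCHEMA_VERSION),
    };
    let _forecast = forecast_from_cache_or_reparse(cached)?;
    Ok(())
}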
