Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Weather sync fix maybe #76

Merged
merged 1 commit into from
Jan 1, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 9 additions & 10 deletions freezing/sync/data/athlete.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,12 @@
from datetime import datetime

from freezing.model import meta
from freezing.model.orm import (
Athlete,
Team,
)
from freezing.model.orm import Athlete, Team
from stravalib import model as sm

from freezing.sync.config import config
from freezing.sync.exc import (
MultipleTeamsError,
NoTeamsError,
)
from freezing.sync.exc import MultipleTeamsError, NoTeamsError

from . import BaseSync, StravaClientForAthlete


Expand Down Expand Up @@ -44,9 +39,13 @@ def sync_athletes(self, max_records: int = None):
if not self.all_done():
self.register_athlete_team(strava_athlete, athlete)
except NoTeamsError as ex:
self.logger.info(f'Athlete "{athlete}" is not on a registered team: {ex}')
self.logger.info(
f'Athlete "{athlete}" is not on a registered team: {ex}'
)
except MultipleTeamsError as ex:
self.logger.info(f'Athlete "{athlete}" is on multiple competition teams: {ex}')
self.logger.info(
f'Athlete "{athlete}" is on multiple competition teams: {ex}'
)
except Exception:
self.logger.exception(
"Error registering athlete {0}".format(athlete), exc_info=True
Expand Down
41 changes: 32 additions & 9 deletions freezing/sync/data/weather.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import logging
from datetime import timedelta
from datetime import datetime, timedelta
from decimal import Decimal
from statistics import mean

from freezing.model import meta, orm
from pytz import timezone
from sqlalchemy import text

from freezing.sync.config import config
Expand Down Expand Up @@ -45,14 +46,15 @@ def sync_weather(
sess.query(orm.RideWeather)
q = text(
"""
select R.id from rides R
select R.id, ST_AsText(G.start_geo) AS start_geo from rides R
join ride_geo G on G.ride_id = R.id
left join ride_weather W on W.ride_id = R.id
where W.ride_id is null
and date(R.start_date) < CURDATE() -- Only include rides from yesterday or before
and time(R.start_date) != '00:00:00' -- Exclude bad entries.
and date_add(CONVERT_TZ(R.start_date, R.timezone, '{0}'), INTERVAL R.elapsed_time SECOND) < (NOW() - INTERVAL 1 HOUR) -- Only include rides that ended over an hour ago
;
"""
""".format(
config.TIMEZONE
)
)

visual_crossing = HistoVisualCrossing(
Expand All @@ -71,15 +73,18 @@ def sync_weather(
break

ride = sess.query(orm.Ride).get(r["id"])
start_geo_wkt = r["start_geo"]
self.logger.info(
"Processing ride: {0} ({1}/{2})".format(ride.id, i, num_rides)
"Processing ride: {0} ({1}/{2}) ({3})".format(
ride.id, i, num_rides, start_geo_wkt
)
)

try:
# If you can't reproduce the ancient infrastructure required by all this and so can't run any of the
# geoalchemy stuff you can hardcode this to debug
# start_geo_wkt = "POINT(-76.96 38.96)"
start_geo_wkt = meta.scoped_session().scalar(ride.geo.start_geo.wkt)
# start_geo_wkt = meta.scoped_session().scalar(ride.geo.start_geo.wkt)
point = parse_point_wkt(start_geo_wkt)

# We round lat/lon to decrease the granularity and allow better re-use of cache data.
Expand All @@ -93,13 +98,31 @@ def sync_weather(
)
)

ride_today = datetime.now(timezone(ride.timezone))
start_date = ride.start_date.replace(tzinfo=timezone(ride.timezone))
fetch_date = start_date + timedelta(seconds=ride.elapsed_time)
# For caching purposes we're saying we want weather as of the end of the ride, so if
# we have weather from earlier in the day we don't use it. Because we're lame and
# don't want to handle rides that span midnight, we max to 23:59 of the day. If
# we are fetching old weather, we also just ask for the end of the day so we will
# use the latest cache file.
if (
fetch_date.date() < ride_today.date()
or fetch_date.date() != start_date.date()
):
fetch_date = start_date.replace(
hour=23, minute=59, second=0, microsecond=0
)

# VC gives us back weather in the timezone of the lat/lon that we asked for. So we ask for
# weather in the ride-local date and interpret times accordingly.
hist = visual_crossing.histo_forecast(
time=ride.start_date, latitude=lat, longitude=lon
time=fetch_date, latitude=lat, longitude=lon
)

self.logger.debug("Got response in timezone {0}".format(hist.timezone))

ride_start = ride.start_date.replace(tzinfo=hist.timezone)
ride_start = start_date.astimezone(tz=hist.timezone)
ride_end = ride_start + timedelta(seconds=ride.elapsed_time)

# NOTE: if elapsed_time is significantly more than moving_time then we need to assume
Expand Down
6 changes: 3 additions & 3 deletions freezing/sync/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,10 @@ def segmented_sync_activities():
# This should generally not pick up anything.
scheduler.add_job(activity_sync.sync_rides_detail, "cron", minute="20")

# Sync weather at 8am UTC
scheduler.add_job(weather_sync.sync_weather, "cron", hour="8")
# Sync weather every hour
scheduler.add_job(weather_sync.sync_weather, "cron", minute="45")

# Sync athletes once a day at 6am UTC
# Sync athletes every hour
scheduler.add_job(athlete_sync.sync_athletes, "cron", minute="30")

scheduler.start()
Expand Down
2 changes: 1 addition & 1 deletion freezing/sync/wx/visualcrossing/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ def _cache_file(self, time: datetime, longitude: float, latitude: float):
if not self.cache_dir:
return None # where are all the monads
directory = os.path.join(self.cache_dir, f"{longitude}x{latitude}")
return os.path.join(directory, f'{time.strftime("%Y-%m-%d")}.json')
return os.path.join(directory, f'{time.strftime("%Y-%m-%dT%H")}.json')

def _get_cached(self, path: str, fetch):
if not path:
Expand Down
Loading