Skip to content

Commit

Permalink
Add DB support for weather data + backfill logic
Browse files Browse the repository at this point in the history
  • Loading branch information
Hedingber committed Nov 18, 2020
1 parent 69f674e commit b56a8e9
Show file tree
Hide file tree
Showing 7 changed files with 853 additions and 0 deletions.
653 changes: 653 additions & 0 deletions alembic/versions/5f15493966f7_adding_weather_data.py

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions anyway/db_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ class Views(object):
markers.km,
markers.km_raw,
markers.km_accurate,
accident_weathers.rain_rate as accident_rain_rate,
road_segments.segment_id as road_segment_id,
road_segments.segment as road_segment_number,
road_segments.from_name || ' - ' || road_segments.to_name as road_segment_name,
Expand Down Expand Up @@ -121,6 +122,7 @@ class Views(object):
LEFT JOIN road_light ON markers.road_light = road_light.id AND markers.accident_year = road_light.year AND markers.provider_code = road_light.provider_code
LEFT JOIN road_control ON markers.road_control = road_control.id AND markers.accident_year = road_control.year AND markers.provider_code = road_control.provider_code
LEFT JOIN weather ON markers.weather = weather.id AND markers.accident_year = weather.year AND markers.provider_code = weather.provider_code
LEFT JOIN accident_weathers ON accident_weathers.provider_code = markers.provider_code AND accident_weathers.accident_id = markers.id AND accident_weathers.accident_year = markers.accident_year
LEFT JOIN road_surface ON markers.road_surface = road_surface.id AND markers.accident_year = road_surface.year AND markers.provider_code = road_surface.provider_code
LEFT JOIN road_object ON markers.road_object = road_object.id AND markers.accident_year = road_object.year AND markers.provider_code = road_object.provider_code
LEFT JOIN object_distance ON markers.object_distance = object_distance.id AND markers.accident_year = object_distance.year AND markers.provider_code = object_distance.provider_code
Expand Down
44 changes: 44 additions & 0 deletions anyway/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,7 @@ class AccidentMarker(MarkerMixin, Base):
cross_direction = Column(Integer())
involved = relationship("Involved")
vehicles = relationship("Vehicle")
weather_data = relationship("AccidentWeather", uselist=False)
video_link = Column(Text())
road1 = Column(Integer())
road2 = Column(Integer())
Expand Down Expand Up @@ -649,6 +650,47 @@ def parse(cls, data):
)


class AccidentWeather(Base):
__tablename__ = "accident_weathers"
id = Column(BigInteger(), primary_key=True)
provider_and_id = Column(BigInteger())
provider_code = Column(Integer())
accident_id = Column(BigInteger())
accident_year = Column(Integer())
rain_rate = Column(Integer())
__table_args__ = (
ForeignKeyConstraint(
[accident_id, provider_code, accident_year],
[AccidentMarker.id, AccidentMarker.provider_code, AccidentMarker.accident_year],
ondelete="CASCADE",
),
Index("accident_id_idx_accident_weather", "accident_id", unique=False),
Index("provider_and_id_idx_accident_weather", "provider_and_id", unique=False),
{},
)

def serialize(self):
return {
"id": self.id,
"provider_code": self.provider_code,
"accident_id": self.accident_id,
"rain_rate": self.rain_rate,
}

# Flask-Login integration
def is_authenticated(self):
return True

def is_active(self):
return True

def is_anonymous(self):
return False

def get_id(self):
return self.id


class DiscussionMarker(MarkerMixin, Base):
__tablename__ = "discussions"
__table_args__ = (
Expand Down Expand Up @@ -1772,6 +1814,7 @@ class AccidentMarkerView(Base):
road_segment_id = Column(Integer())
road_segment_name = Column(Text())
road_segment_number = Column(Integer())
accident_rain_rate = Column(Integer())

def serialize(self):
return {
Expand Down Expand Up @@ -1866,6 +1909,7 @@ def serialize(self):
"longitude": self.longitude,
"x": self.x,
"y": self.y,
"accident_rain_rate": self.accident_rain_rate,
}


Expand Down
3 changes: 3 additions & 0 deletions anyway/parsers/cbs/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from sqlalchemy import or_, and_

from anyway.parsers.cbs import preprocessing_cbs_files, importmail_cbs
from anyway.parsers.cbs.weather_data import ensure_accidents_weather_data
from anyway import field_names, localization
from anyway.backend_constants import BE_CONST
from anyway.models import (
Expand Down Expand Up @@ -1146,6 +1147,8 @@ def main(
)
logging.info("Total: {0} items in {1}".format(total, time_delta(started)))

ensure_accidents_weather_data()

create_views()
except Exception as ex:
print("Exception occured while loading the cbs data: {0}".format(str(ex)))
Expand Down
48 changes: 48 additions & 0 deletions anyway/parsers/cbs/weather_data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import logging

from anyway.app_and_db import db
from anyway.models import (
AccidentMarker,
AccidentWeather,
)
from anyway.parsers.cbs.weather_interpolator import get_weather


def ensure_accidents_weather_data(start_date=None, filters=None):
"""
:param start_date: Add start date filter to the query that lists accident markers to add weather data to
:param filters: additional filters to add to the query that lists accident markers to add weather data to
This is used mainly for testing - format DD-MM-YYYY
:returns: int representing the number of accidents to which weather data was added
"""
logging.info(f"Ensuring accidents weather data {start_date} {filters}")
query = db.session.query(AccidentMarker).filter(AccidentMarker.weather_data == None)
if start_date:
query = query.filter(AccidentMarker.created > start_date)
if filters is not None:
query = query.filter(*filters)
accident_markers_to_update = query.all()
if accident_markers_to_update:
logging.debug(
f"Found accident markers without weather data. {len(accident_markers_to_update)}"
)
accidents_weather_data = []
for accident_marker in query.all():
weather_data = get_weather(
accident_marker.latitude, accident_marker.longitude, accident_marker.created.isoformat()
)
accidents_weather_data.append(
{
"accident_id": accident_marker.id,
"provider_and_id": accident_marker.provider_and_id,
"provider_code": accident_marker.provider_code,
"accident_year": accident_marker.accident_year,
"rain_rate": weather_data["rain"],
}
)
if accidents_weather_data:
logging.debug(f"Adding weather data to accidents. {accidents_weather_data}")
db.session.bulk_insert_mappings(AccidentWeather, accidents_weather_data)
db.session.commit()
logging.debug("Finished filling accidents weather data")
return len(accident_markers_to_update) if accident_markers_to_update else 0
14 changes: 14 additions & 0 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,20 @@ def waze_data(from_s3, start_date, end_date):
return ingest_waze_from_api()


@process.command()
@click.option(
"--start_date", default=None, type=valid_date, help="The Start Date - format DD-MM-YYYY"
)
def weather_data(start_date):
"""
Looping on the accidents from the cbs and ensuring they have weather data
Start date can be given to filter out accident before the given date
"""

from anyway.parsers.cbs.weather_data import ensure_accidents_weather_data
ensure_accidents_weather_data(start_date)


@process.command()
@click.argument("filename", type=str, default="static/data/embedded_reports/embedded_reports.csv")
def embedded_reports(filename):
Expand Down
89 changes: 89 additions & 0 deletions tests/test_weather_data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
import logging
from datetime import datetime

from sqlalchemy import func

from anyway.app_and_db import db
from anyway.backend_constants import BE_CONST
from anyway.models import (
AccidentMarker,
AccidentWeather,
)
from anyway.parsers.cbs.weather_data import ensure_accidents_weather_data


class TestWeatherData:

def _insert_accident_marker(self, created=None):
logging.info("Inserting test accident marker")

# to not conflict with existing ids find max value and add one
accident_marker_id = db.session.query(func.max(AccidentMarker.id)).one()[0] + 1
logging.debug(f"Calculated id for accident marker: {accident_marker_id}")
accident_marker = AccidentMarker(
id=accident_marker_id,
provider_and_id=0,
provider_code=BE_CONST.CBS_ACCIDENT_TYPE_1_CODE,
accident_year=2020,
latitude=32.0580,
longitude=34.7588,
)
if created:
accident_marker.created = created

db.session.add(accident_marker)
db.session.commit()

return accident_marker_id

def test_ensure_accidents_weather_data(self):
accident_marker_id = self._insert_accident_marker()

logging.debug("Verifying accident marker does not have weather data")
accident_marker = db.session.query(AccidentMarker).filter(AccidentMarker.id == accident_marker_id).one()
assert accident_marker.weather_data is None

# the test DB may have other markers we don't want to add weather data to, so ensuring only our marker
filters = (
AccidentMarker.id == accident_marker_id,
)
number_of_accidents_updated = ensure_accidents_weather_data(filters=filters)
assert number_of_accidents_updated == 1

logging.debug("Verifying weather data added to accident marker")
accident_marker = db.session.query(AccidentMarker).filter(AccidentMarker.id == accident_marker_id).one()
assert accident_marker.weather_data is not None

logging.debug(f"Weather data verified {accident_marker.weather_data}")

logging.debug("Verifying another run of ensure weather data changes nothing")
number_of_accidents_updated = ensure_accidents_weather_data(filters=filters)
assert number_of_accidents_updated == 0

logging.debug("Removing test data")
db.session.query(AccidentMarker).filter(AccidentMarker.id == accident_marker_id).delete()
db.session.query(AccidentWeather).filter(AccidentWeather.id == accident_marker.weather_data.id).delete()
db.session.commit()

def test_ensure_accidents_weather_data_with_date_filter(self):
old_date = datetime(year=2010, day=1, month=1)
new_date = datetime(year=2020, day=1, month=1)
old_accident_marker_id = self._insert_accident_marker(old_date)
new_accident_marker_id = self._insert_accident_marker(new_date)

# the test DB may have other markers we don't want to add weather data to, so ensuring only our marker
filters = (
AccidentMarker.id.in_([old_accident_marker_id, new_accident_marker_id]),
)
filter_date = datetime(year=2015, day=1, month=1)
number_of_accidents_updated = ensure_accidents_weather_data(filter_date, filters)

# only one accident will be updated since only one is after the filter date
assert number_of_accidents_updated == 1

logging.debug("Removing test data")
accident_marker = db.session.query(AccidentMarker).filter(AccidentMarker.id == new_accident_marker_id).one()
db.session.query(AccidentWeather).filter(AccidentWeather.id == accident_marker.weather_data.id).delete()
db.session.query(AccidentMarker).filter(AccidentMarker.id == new_accident_marker_id).delete()
db.session.query(AccidentMarker).filter(AccidentMarker.id == old_accident_marker_id).delete()
db.session.commit()

0 comments on commit b56a8e9

Please sign in to comment.