Add Area query to STA-EDR
webb-ben committed Jul 27, 2024
1 parent 397f2fd commit 49a8c78
Showing 1 changed file with 138 additions and 62 deletions.
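In short, the new area() query keeps only Datastreams whose Thing's Location falls inside the request polygon, using the SensorThings st_within geospatial function, then fetches the matching Observations. Below is a minimal Python sketch of the $expand string the query assembles; the WKT polygon and datetime filter are invented example values, not taken from this commit.

# Sketch of the $expand parameter built by the new area() query.
# The polygon and datetime filter below are made-up example inputs.
wkt = 'POLYGON ((-109 37, -102 37, -102 41, -109 41, -109 37))'
dt_filter = 'phenomenonTime ge 2024-01-01'  # as produced by _make_dtf()

expand = ','.join([
    # keep only Datastreams whose Thing's Location lies inside the polygon
    f"Datastreams($filter=st_within(Thing/Locations/location,geography'{wkt}');"
    '$select=description,name,unitOfMeasurement)',
    'Datastreams/Thing/Locations($select=location)',
    # Observations ordered by time, optionally constrained by the datetime filter
    f'Datastreams/Observations($filter={dt_filter};'
    '$orderby=phenomenonTime;$select=result,phenomenonTime,resultTime)',
])

print(expand)  # sent as the $expand parameter of the ObservedProperties request

In the method itself this string is passed to self._get_response() along with any $filter built from select_properties.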
200 changes: 138 additions & 62 deletions pygeoapi_plugins/provider/sensorthings_edr.py
@@ -27,17 +27,15 @@
#
# =================================================================

from json.decoder import JSONDecodeError
import logging

import numpy as np

from pygeoapi.provider.base import (ProviderNoDataError, ProviderQueryError, ProviderConnectionError)
from pygeoapi.provider.base import ProviderNoDataError
from pygeoapi.provider.base_edr import BaseEDRProvider
from pygeoapi.provider.sensorthings import SensorThingsProvider

LOGGER = logging.getLogger(__name__)


class SensorThingsEDRProvider(BaseEDRProvider, SensorThingsProvider):
def __init__(self, provider_def):
"""
@@ -49,8 +47,8 @@ def __init__(self, provider_def):
"""
provider_def['entity'] = 'ObservedProperties'
BaseEDRProvider.__init__(self, provider_def)
# SensorThingsProvider.__init__(self, provider_def)
self.expand['ObservedProperties'] = 'Datastreams/Thing/Locations,Datastreams/Observations'
SensorThingsProvider.__init__(self, provider_def)
self.expand['ObservedProperties'] = 'Datastreams/Thing/Locations,Datastreams/Observations' # noqa
self.time_field = 'Datastreams/Observations/resultTime'
self._fields = {}
self.get_fields()
@@ -87,7 +85,7 @@ def items(self, **kwargs):
pass

@BaseEDRProvider.register(output_formats=['GeoJSON'])
def locations(self, select_properties=[], bbox=[],
datetime_=None, location_id=None, **kwargs):
"""
Extract data from the collection
@@ -99,44 +97,59 @@ def locations(self, select_properties=[], bbox=[],
:returns: GeoJSON FeatureCollection
"""
fc = {'type': 'FeatureCollection', 'features': []}

fc = {
'type': 'FeatureCollection',
'features': []
}

params = {}
expand = None
if location_id:
try:
location_id = int(location_id)
except ValueError:
location_id = f"'{location_id}'"
filter = ''
expand = [
'Datastreams($select=description,name,unitOfMeasurement)',
'Datastreams/Thing([email protected])',
'Datastreams/Thing/Locations($select=location)'
]

properties = [['Datastreams/Thing/@iot.id', location_id]]
params['$filter'] = self._make_filter(properties)
if location_id:
return self.get(location_id)

elif select_properties or datetime_:
if select_properties:
properties = [['@iot.id', f"'{p}'"] for p in select_properties]
_ = self._make_filter(properties, datetime_)
params['$filter'] = _.replace('and name', 'or name')
ret = [f'{name} eq {value}' for (name, value) in properties]
params['$filter'] = ' or '.join(ret)

filter = f'$filter={self._make_dtf(datetime_)};' if datetime_ else ''
expand.append(f'Datastreams/Observations({filter}$select=result;$top=1)') # noqa

if bbox:
geom_filter = self._make_bbox(bbox, 'Datastreams')
expand = f'Datastreams($filter={geom_filter}),Datastreams/Thing/Locations,Datastreams/Observations' # noqa
expand[0] = f'Datastreams($filter={geom_filter})'

expand = ','.join(expand)
response = self._get_response(url=self._url, params=params,
entity='ObservedProperties',
expand=expand)

for property in response['value']:
for datastream in property['Datastreams']:
feature = self._make_feature(datastream['Thing'],
entity='Things')
fc['features'].append(feature)
if len(datastream['Observations']) == 0:
continue

fc['features'].append(
self._make_feature(
datastream['Thing'],
entity='Things'
)
)

if location_id:
return fc['features'][0]
else:
return fc

@BaseEDRProvider.register(output_formats=['CoverageJSON'])
def cube(self, select_properties=[], bbox=[],
datetime_=None, **kwargs):
"""
Extract data from the collection
@@ -148,6 +161,7 @@ def cube(self, select_properties=[], bbox=[],
:returns: CovJSON CovCollection
"""

cc = {
'type': 'CoverageCollection',
'domainType': 'PointSeries',
@@ -156,7 +170,7 @@ def cube(self, select_properties=[], bbox=[],
}

params = {}
filter = ''

geom_filter = self._make_bbox(bbox, 'Datastreams')
expand = [
f'Datastreams($filter={geom_filter};$select=description,name,unitOfMeasurement)', # noqa
@@ -169,47 +183,84 @@ def cube(self, select_properties=[], bbox=[],
ret = [f'{name} eq {value}' for (name, value) in properties]
params['$filter'] = ' or '.join(ret)

if datetime_:
dtf_r = []
if '/' in datetime_:
time_start, time_end = datetime_.split('/')
if time_start != '..':
dtf_r.append(f'phenomenonTime ge {time_start}')
filter = f'$filter={self._make_dtf(datetime_)};' if datetime_ else ''
expand.append(f'Datastreams/Observations({filter}$orderby=phenomenonTime;$select=result,phenomenonTime,resultTime)') # noqa

if time_end != '..':
dtf_r.append(f'phenomenonTime le {time_end}')
expand = ','.join(expand)
response = self._get_response(url=self._url, params=params,
entity='ObservedProperties',
expand=expand)

else:
dtf_r.append(f'phenomenonTime eq {datetime_}')
for feature in response['value']:
try:
_ = feature['Datastreams'][0]
self._make_coverage_collection(feature, cc)
except IndexError:
continue

if cc['parameters'] == {} or cc['coverages'] == []:
msg = 'No data found'
LOGGER.warning(msg)
raise ProviderNoDataError(msg)

dtf = ' and '.join(dtf_r)
filter = f'$filter={dtf};'
return cc

expand.append(f'Datastreams/Observations({filter}$orderby=phenomenonTime;$select=result,phenomenonTime,resultTime)') # noqa
@BaseEDRProvider.register(output_formats=['CoverageJSON'])
def area(self, wkt, select_properties=[],
datetime_=None, **kwargs):
"""
Extract data from the collection
:param wkt: `shapely.geometry` WKT geometry
:param select_properties: list of parameters
:param datetime_: temporal (datestamp or extent)
:returns: CovJSON CovCollection
"""

cc = {
'type': 'CoverageCollection',
'domainType': 'PointSeries',
'parameters': {},
'coverages': []
}

params = {}

expand = [
f"Datastreams($filter=st_within(Thing/Locations/location,geography'{wkt}');$select=description,name,unitOfMeasurement)", # noqa
'Datastreams/Thing([email protected])',
'Datastreams/Thing/Locations($select=location)',
]

if select_properties:
properties = [['@iot.id', f"'{p}'"] for p in select_properties]
ret = [f'{name} eq {value}' for (name, value) in properties]
params['$filter'] = ' or '.join(ret)

filter = f'$filter={self._make_dtf(datetime_)};' if datetime_ else ''
expand.append(f'Datastreams/Observations({filter}$orderby=phenomenonTime;$select=result,phenomenonTime,resultTime)') # noqa

expand = ','.join(expand)
response = self._get_response(url=self._url, params=params,
entity='ObservedProperties',
expand=expand)

for feature in response['value']:
id = feature['name'].replace(' ', '+')
try:
datastream = feature['Datastreams'][0]
_ = feature['Datastreams'][0]
self._make_coverage_collection(feature, cc)
except IndexError:
continue

cc['parameters'][id] = \
self._generate_paramters(datastream, id, feature['name'])

for datastream in feature['Datastreams']:
coverage, length = self._generate_coverage(datastream, id)
if length > 0:
cc['coverages'].append(coverage)
if cc['parameters'] == {} or cc['coverages'] == []:
msg = 'No data found'
LOGGER.warning(msg)
raise ProviderNoDataError(msg)

return cc

def _generate_coverage(self, datastream, id):
times, values = \
self._expand_observations(datastream)
@@ -233,7 +284,7 @@ def _generate_coverage(self, datastream, id):
'coordinates': ['x', 'y'],
'system': {
'type': 'GeographicCRS',
'id': 'http://www.opengis.net/def/crs/OGC/1.3/CRS84'
'id': 'http://www.opengis.net/def/crs/OGC/1.3/CRS84' # noqa
}
}, {
'coordinates': ['t'],
@@ -256,31 +307,56 @@ def _generate_paramters(datastream, id, label):
}, length

@staticmethod
def _generate_paramters(datastream, id, label):
def _generate_paramters(datastream, label):
return {
'type': 'Parameter',
'description': {
'en': datastream['description']
},
'description': {'en': datastream['description']},
'observedProperty': {
'id': id,
'label': {
'en': label
}
'id': label,
'label': {'en': label}
},
'unit': {
'label': {
'en': datastream['unitOfMeasurement']['name']
},
'label': {'en': datastream['unitOfMeasurement']['name']},
'symbol': datastream['unitOfMeasurement']['symbol']
}
}

@staticmethod
def _make_dtf(datetime_):
dtf_r = []
if '/' in datetime_:
time_start, time_end = datetime_.split('/')
if time_start != '..':
dtf_r.append(f'phenomenonTime ge {time_start}')

if time_end != '..':
dtf_r.append(f'phenomenonTime le {time_end}')

else:
dtf_r.append(f'phenomenonTime eq {datetime_}')

return ' and '.join(dtf_r)

def _make_coverage_collection(self, feature, cc):

id = feature['name'].replace(' ', '+')

for datastream in feature['Datastreams']:
coverage, length = self._generate_coverage(datastream, id)
if length > 0:
cc['coverages'].append(coverage)
cc['parameters'][id] = \
self._generate_paramters(datastream,
feature['name'])

return cc

@staticmethod
def _expand_observations(datastream):
times = []
values = []
# TODO: Expand observations when '[email protected]' or '@iot.nextLink' is present
# TODO: Expand observations when '[email protected]'
# or '@iot.nextLink' is present
for obs in datastream['Observations']:
resultTime = obs['resultTime'] or obs['phenomenonTime']
if obs['result'] is not None and resultTime:

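For reference, the datetime handling factored out into _make_dtf() maps an EDR datetime parameter onto SensorThings phenomenonTime conditions. A self-contained restatement of that logic, with invented example inputs:

def make_dtf(datetime_: str) -> str:
    # Standalone restatement of the _make_dtf logic shown in the diff above.
    parts = []
    if '/' in datetime_:
        start, end = datetime_.split('/')
        if start != '..':
            parts.append(f'phenomenonTime ge {start}')
        if end != '..':
            parts.append(f'phenomenonTime le {end}')
    else:
        parts.append(f'phenomenonTime eq {datetime_}')
    return ' and '.join(parts)

print(make_dtf('2024-01-01/..'))
# phenomenonTime ge 2024-01-01
print(make_dtf('2024-01-01/2024-07-27'))
# phenomenonTime ge 2024-01-01 and phenomenonTime le 2024-07-27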