From 49a8c785a20ebf3986ecb2d4da9ccfcd06826033 Mon Sep 17 00:00:00 2001 From: Benjamin Webb <40066515+webb-ben@users.noreply.github.com> Date: Sat, 27 Jul 2024 16:45:26 -0400 Subject: [PATCH] Add Area query to STA-EDR --- pygeoapi_plugins/provider/sensorthings_edr.py | 200 ++++++++++++------ 1 file changed, 138 insertions(+), 62 deletions(-) diff --git a/pygeoapi_plugins/provider/sensorthings_edr.py b/pygeoapi_plugins/provider/sensorthings_edr.py index 83c3782..c190b5e 100644 --- a/pygeoapi_plugins/provider/sensorthings_edr.py +++ b/pygeoapi_plugins/provider/sensorthings_edr.py @@ -27,17 +27,15 @@ # # ================================================================= -from json.decoder import JSONDecodeError import logging -import numpy as np - -from pygeoapi.provider.base import (ProviderNoDataError, ProviderQueryError, ProviderConnectionError) +from pygeoapi.provider.base import ProviderNoDataError from pygeoapi.provider.base_edr import BaseEDRProvider from pygeoapi.provider.sensorthings import SensorThingsProvider LOGGER = logging.getLogger(__name__) + class SensorThingsEDRProvider(BaseEDRProvider, SensorThingsProvider): def __init__(self, provider_def): """ @@ -49,8 +47,8 @@ def __init__(self, provider_def): """ provider_def['entity'] = 'ObservedProperties' BaseEDRProvider.__init__(self, provider_def) - # SensorThingsProvider.__init__(self, provider_def) - self.expand['ObservedProperties'] = 'Datastreams/Thing/Locations,Datastreams/Observations' + SensorThingsProvider.__init__(self, provider_def) + self.expand['ObservedProperties'] = 'Datastreams/Thing/Locations,Datastreams/Observations' # noqa self.time_field = 'Datastreams/Observations/resultTime' self._fields = {} self.get_fields() @@ -87,7 +85,7 @@ def items(self, **kwargs): pass @BaseEDRProvider.register(output_formats=['GeoJSON']) - def locations(self, select_properties=[], bbox=[], + def locations(self, select_properties=[], bbox=[], datetime_=None, location_id=None, **kwargs): """ Extract data from collection collection @@ -99,44 +97,59 @@ def locations(self, select_properties=[], bbox=[], :returns: GeoJSON FeatureCollection """ - fc = {'type': 'FeatureCollection', 'features': []} + + fc = { + 'type': 'FeatureCollection', + 'features': [] + } + params = {} - expand = None - if location_id: - try: - location_id = int(location_id) - except ValueError: - location_id = f"'{location_id}'" + filter = '' + expand = [ + 'Datastreams($select=description,name,unitOfMeasurement)', + 'Datastreams/Thing($select=@iot.id)', + 'Datastreams/Thing/Locations($select=location)' + ] - properties = [['Datastreams/Thing/@iot.id', location_id]] - params['$filter'] = self._make_filter(properties) + if location_id: + return self.get(location_id) - elif select_properties or datetime_: + if select_properties: properties = [['@iot.id', f"'{p}'"] for p in select_properties] - _ = self._make_filter(properties, datetime_) - params['$filter'] = _.replace('and name', 'or name') + ret = [f'{name} eq {value}' for (name, value) in properties] + params['$filter'] = ' or '.join(ret) + + filter = f'$filter={self._make_dtf(datetime_)};' if datetime_ else '' + expand.append(f'Datastreams/Observations({filter}$select=result;$top=1)') # noqa if bbox: geom_filter = self._make_bbox(bbox, 'Datastreams') - expand = f'Datastreams($filter={geom_filter}),Datastreams/Thing/Locations,Datastreams/Observations' # noqa + expand[0] = f'Datastreams($filter={geom_filter})' + expand = ','.join(expand) response = self._get_response(url=self._url, params=params, entity='ObservedProperties', expand=expand) for property in response['value']: for datastream in property['Datastreams']: - feature = self._make_feature(datastream['Thing'], - entity='Things') - fc['features'].append(feature) + if len(datastream['Observations']) == 0: + continue + + fc['features'].append( + self._make_feature( + datastream['Thing'], + entity='Things' + ) + ) if location_id: return fc['features'][0] else: return fc - + @BaseEDRProvider.register(output_formats=['CoverageJSON']) - def cube(self, select_properties=[], bbox=[], + def cube(self, select_properties=[], bbox=[], datetime_=None, **kwargs): """ Extract data from collection collection @@ -148,6 +161,7 @@ def cube(self, select_properties=[], bbox=[], :returns: CovJSON CovCollection """ + cc = { 'type': 'CoverageCollection', 'domainType': 'PointSeries', @@ -156,7 +170,7 @@ def cube(self, select_properties=[], bbox=[], } params = {} - filter = '' + geom_filter = self._make_bbox(bbox, 'Datastreams') expand = [ f'Datastreams($filter={geom_filter};$select=description,name,unitOfMeasurement)', # noqa @@ -169,24 +183,64 @@ def cube(self, select_properties=[], bbox=[], ret = [f'{name} eq {value}' for (name, value) in properties] params['$filter'] = ' or '.join(ret) - if datetime_: - dtf_r = [] - if '/' in datetime_: - time_start, time_end = datetime_.split('/') - if time_start != '..': - dtf_r.append(f'phenomenonTime ge {time_start}') + filter = f'$filter={self._make_dtf(datetime_)};' if datetime_ else '' + expand.append(f'Datastreams/Observations({filter}$orderby=phenomenonTime;$select=result,phenomenonTime,resultTime)') # noqa - if time_end != '..': - dtf_r.append(f'phenomenonTime le {time_end}') + expand = ','.join(expand) + response = self._get_response(url=self._url, params=params, + entity='ObservedProperties', + expand=expand) - else: - dtf_r.append(f'phenomenonTime eq {datetime_}') + for feature in response['value']: + try: + _ = feature['Datastreams'][0] + self._make_coverage_collection(feature, cc) + except IndexError: + continue + + if cc['parameters'] == {} or cc['coverages'] == []: + msg = 'No data found' + LOGGER.warning(msg) + raise ProviderNoDataError(msg) - dtf = ' and '.join(dtf_r) - filter = f'$filter={dtf};' + return cc - expand.append(f'Datastreams/Observations({filter}$orderby=phenomenonTime;$select=result,phenomenonTime,resultTime)') # noqa + @BaseEDRProvider.register(output_formats=['CoverageJSON']) + def area(self, wkt, select_properties=[], + datetime_=None, **kwargs): + """ + Extract data from collection collection + :param wkt: `shapely.geometry` WKT geometry + :param select_properties: list of parameters + :param datetime_: temporal (datestamp or extent) + :param location_id: location identifier + + :returns: CovJSON CovCollection + """ + + cc = { + 'type': 'CoverageCollection', + 'domainType': 'PointSeries', + 'parameters': {}, + 'coverages': [] + } + + params = {} + + expand = [ + f"Datastreams($filter=st_within(Thing/Locations/location,geography'{wkt}');$select=description,name,unitOfMeasurement)", # noqa + 'Datastreams/Thing($select=@iot.id)', + 'Datastreams/Thing/Locations($select=location)', + ] + + if select_properties: + properties = [['@iot.id', f"'{p}'"] for p in select_properties] + ret = [f'{name} eq {value}' for (name, value) in properties] + params['$filter'] = ' or '.join(ret) + + filter = f'$filter={self._make_dtf(datetime_)};' if datetime_ else '' + expand.append(f'Datastreams/Observations({filter}$orderby=phenomenonTime;$select=result,phenomenonTime,resultTime)') # noqa expand = ','.join(expand) response = self._get_response(url=self._url, params=params, @@ -194,22 +248,19 @@ def cube(self, select_properties=[], bbox=[], expand=expand) for feature in response['value']: - id = feature['name'].replace(' ', '+') try: - datastream = feature['Datastreams'][0] + _ = feature['Datastreams'][0] + self._make_coverage_collection(feature, cc) except IndexError: continue - - cc['parameters'][id] = \ - self._generate_paramters(datastream, id, feature['name']) - for datastream in feature['Datastreams']: - coverage, length = self._generate_coverage(datastream, id) - if length > 0: - cc['coverages'].append(coverage) + if cc['parameters'] == {} or cc['coverages'] == []: + msg = 'No data found' + LOGGER.warning(msg) + raise ProviderNoDataError(msg) return cc - + def _generate_coverage(self, datastream, id): times, values = \ self._expand_observations(datastream) @@ -233,7 +284,7 @@ def _generate_coverage(self, datastream, id): 'coordinates': ['x', 'y'], 'system': { 'type': 'GeographicCRS', - 'id': 'http://www.opengis.net/def/crs/OGC/1.3/CRS84' + 'id': 'http://www.opengis.net/def/crs/OGC/1.3/CRS84' # noqa } }, { 'coordinates': ['t'], @@ -256,31 +307,56 @@ def _generate_coverage(self, datastream, id): }, length @staticmethod - def _generate_paramters(datastream, id, label): + def _generate_paramters(datastream, label): return { 'type': 'Parameter', - 'description': { - 'en': datastream['description'] - }, + 'description': {'en': datastream['description']}, 'observedProperty': { - 'id': id, - 'label': { - 'en': label - } + 'id': label, + 'label': {'en': label} }, 'unit': { - 'label': { - 'en': datastream['unitOfMeasurement']['name'] - }, + 'label': {'en': datastream['unitOfMeasurement']['name']}, 'symbol': datastream['unitOfMeasurement']['symbol'] } } + @staticmethod + def _make_dtf(datetime_): + dtf_r = [] + if '/' in datetime_: + time_start, time_end = datetime_.split('/') + if time_start != '..': + dtf_r.append(f'phenomenonTime ge {time_start}') + + if time_end != '..': + dtf_r.append(f'phenomenonTime le {time_end}') + + else: + dtf_r.append(f'phenomenonTime eq {datetime_}') + + return ' and '.join(dtf_r) + + def _make_coverage_collection(self, feature, cc): + + id = feature['name'].replace(' ', '+') + + for datastream in feature['Datastreams']: + coverage, length = self._generate_coverage(datastream, id) + if length > 0: + cc['coverages'].append(coverage) + cc['parameters'][id] = \ + self._generate_paramters(datastream, + feature['name']) + + return cc + @staticmethod def _expand_observations(datastream): times = [] values = [] - # TODO: Expand observations when 'Observations@iot.nextLink' or '@iot.nextLink' is present + # TODO: Expand observations when 'Observations@iot.nextLink' + # or '@iot.nextLink' is present for obs in datastream['Observations']: resultTime = obs['resultTime'] or obs['phenomenonTime'] if obs['result'] is not None and resultTime: