-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
1 changed file
with
74 additions
and
30 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -39,11 +39,9 @@ | |
class SensorThingsEDRProvider(BaseEDRProvider, SensorThingsProvider): | ||
def __init__(self, provider_def): | ||
""" | ||
Initialize object | ||
Initialize the SensorThingsEDRProvider instance. | ||
:param provider_def: provider definition | ||
:returns: pygeoapi.provider.rasterio_.RasterioProvider | ||
""" | ||
provider_def['entity'] = 'ObservedProperties' | ||
BaseEDRProvider.__init__(self, provider_def) | ||
|
@@ -54,6 +52,11 @@ def __init__(self, provider_def): | |
self.get_fields() | ||
|
||
def get_fields(self): | ||
""" | ||
Retrieve and store ObservedProperties field definitions. | ||
:returns: A dictionary mapping field IDs to their properties. | ||
""" | ||
if not self._fields: | ||
r = self._get_response(self._url, | ||
entity='ObservedProperties', | ||
|
@@ -82,22 +85,27 @@ def get_fields(self): | |
|
||
@BaseEDRProvider.register(output_formats=['GeoJSON']) | ||
def items(self, **kwargs): | ||
""" | ||
Retrieve a collection of items. | ||
:param kwargs: Additional parameters for the request. | ||
:returns: A GeoJSON representation of the items. | ||
""" | ||
pass | ||
|
||
@BaseEDRProvider.register(output_formats=['GeoJSON']) | ||
def locations(self, select_properties=[], bbox=[], | ||
datetime_=None, location_id=None, **kwargs): | ||
""" | ||
Extract data from collection collection | ||
Extract and return location data from ObservedProperties. | ||
:param datetime_: temporal (datestamp or extent) | ||
:param select_properties: list of parameters | ||
:param bbox: bbox geometry (for cube queries) | ||
:param location_id: location identifier | ||
:param select_properties: List of properties to include. | ||
:param bbox: Bounding box geometry for spatial queries. | ||
:param datetime_: Temporal filter for observations. | ||
:param location_id: Identifier of the location to filter by. | ||
:returns: GeoJSON FeatureCollection | ||
:returns: A GeoJSON FeatureCollection of locations. | ||
""" | ||
|
||
fc = { | ||
'type': 'FeatureCollection', | ||
'features': [] | ||
|
@@ -152,16 +160,14 @@ def locations(self, select_properties=[], bbox=[], | |
def cube(self, select_properties=[], bbox=[], | ||
datetime_=None, **kwargs): | ||
""" | ||
Extract data from collection collection | ||
Extract and return coverage data from ObservedProperties. | ||
:param datetime_: temporal (datestamp or extent) | ||
:param select_properties: list of parameters | ||
:param bbox: bbox geometry (for cube queries) | ||
:param location_id: location identifier | ||
:param select_properties: List of properties to include. | ||
:param bbox: Bounding box geometry for spatial queries. | ||
:param datetime_: Temporal filter for observations. | ||
:returns: CovJSON CovCollection | ||
:returns: A CovJSON CoverageCollection. | ||
""" | ||
|
||
cc = { | ||
'type': 'CoverageCollection', | ||
'domainType': 'PointSeries', | ||
|
@@ -197,7 +203,7 @@ def cube(self, select_properties=[], bbox=[], | |
self._make_coverage_collection(feature, cc) | ||
except IndexError: | ||
continue | ||
|
||
if cc['parameters'] == {} or cc['coverages'] == []: | ||
msg = 'No data found' | ||
LOGGER.warning(msg) | ||
|
@@ -209,16 +215,16 @@ def cube(self, select_properties=[], bbox=[], | |
def area(self, wkt, select_properties=[], | ||
datetime_=None, **kwargs): | ||
""" | ||
Extract data from collection collection | ||
Extract and return coverage data from a specified area. | ||
:param wkt: `shapely.geometry` WKT geometry | ||
:param select_properties: list of parameters | ||
:param datetime_: temporal (datestamp or extent) | ||
:param location_id: location identifier | ||
:param wkt: Well-Known Text (WKT) representation of the | ||
geometry for the area. | ||
:param select_properties: List of properties to include | ||
in the response. | ||
:param datetime_: Temporal filter for observations. | ||
:returns: CovJSON CovCollection | ||
:returns: A CovJSON CoverageCollection. | ||
""" | ||
|
||
cc = { | ||
'type': 'CoverageCollection', | ||
'domainType': 'PointSeries', | ||
|
@@ -253,7 +259,7 @@ def area(self, wkt, select_properties=[], | |
self._make_coverage_collection(feature, cc) | ||
except IndexError: | ||
continue | ||
|
||
if cc['parameters'] == {} or cc['coverages'] == []: | ||
msg = 'No data found' | ||
LOGGER.warning(msg) | ||
|
@@ -262,6 +268,14 @@ def area(self, wkt, select_properties=[], | |
return cc | ||
|
||
def _generate_coverage(self, datastream, id): | ||
""" | ||
Generate a coverage object for a datastream. | ||
:param datastream: The datastream data to generate coverage for. | ||
:param id: The ID to use for the coverage. | ||
:returns: A tuple containing the coverage object and size. | ||
""" | ||
times, values = \ | ||
self._expand_observations(datastream) | ||
thing = datastream['Thing'] | ||
|
@@ -308,6 +322,14 @@ def _generate_coverage(self, datastream, id): | |
|
||
@staticmethod | ||
def _generate_paramters(datastream, label): | ||
""" | ||
Generate parameters for a given datastream. | ||
:param datastream: The datastream data to generate parameters for. | ||
:param label: The label for the parameter. | ||
:returns: A dictionary containing the parameter definition. | ||
""" | ||
return { | ||
'type': 'Parameter', | ||
'description': {'en': datastream['description']}, | ||
|
@@ -320,9 +342,16 @@ def _generate_paramters(datastream, label): | |
'symbol': datastream['unitOfMeasurement']['symbol'] | ||
} | ||
} | ||
|
||
@staticmethod | ||
def _make_dtf(datetime_): | ||
""" | ||
Create a Date-Time filter for querying. | ||
:param datetime_: Temporal filter. | ||
:returns: A string datetime filter for use in queries. | ||
""" | ||
dtf_r = [] | ||
if '/' in datetime_: | ||
time_start, time_end = datetime_.split('/') | ||
|
@@ -336,23 +365,38 @@ def _make_dtf(datetime_): | |
dtf_r.append(f'phenomenonTime eq {datetime_}') | ||
|
||
return ' and '.join(dtf_r) | ||
|
||
def _make_coverage_collection(self, feature, cc): | ||
""" | ||
Build a CoverageCollection from a feature. | ||
:param feature: source feature from sensorthings. | ||
:param cc: The CoverageCollection object to populate. | ||
:returns: The updated CoverageCollection object. | ||
""" | ||
id = feature['name'].replace(' ', '+') | ||
|
||
for datastream in feature['Datastreams']: | ||
coverage, length = self._generate_coverage(datastream, id) | ||
if length > 0: | ||
cc['coverages'].append(coverage) | ||
cc['parameters'][id] = \ | ||
self._generate_paramters(datastream, | ||
feature['name']) | ||
self._generate_paramters( | ||
datastream, feature['name'] | ||
) | ||
|
||
return cc | ||
|
||
@staticmethod | ||
def _expand_observations(datastream): | ||
""" | ||
Expand and extract observation times and values from a datastream. | ||
:param datastream: The datastream containing observations. | ||
:returns: A tuple containing lists of times and values. | ||
""" | ||
times = [] | ||
values = [] | ||
# TODO: Expand observations when '[email protected]' | ||
|