Update plugin based on new mapping input #1

Merged: 2 commits, Jun 14, 2024
2 changes: 2 additions & 0 deletions docker/default.env
@@ -15,6 +15,8 @@ WIS2BOX_DOCKER_API_URL=http://wis2box-api:80/oapi
WIS2BOX_LOGGING_LOGLEVEL=ERROR
WIS2BOX_LOGGING_LOGFILE=stdout

WIS2BOX_UI_CLUSTER=True

# PubSub
WIS2BOX_BROKER_USERNAME=wis2box
WIS2BOX_BROKER_PASSWORD=wis2box
5 changes: 3 additions & 2 deletions docker/sta.env
@@ -22,8 +22,9 @@ http_cors_allowed_origins=*
# MQTT
bus_mqttBroker=tcp://${WIS2BOX_BROKER_HOST}:${WIS2BOX_BROKER_PORT}
bus_busImplementationClass=de.fraunhofer.iosb.ilt.sta.messagebus.MqttMessageBus
bus_sendQueueSize=500
bus_sendQueueSize=1000
bus_sendWorkerPoolSize=3

# Plugins
# plugins.multiDatastream.enable=true
plugins.coreModel.idType=STRING
plugins_coreModel_idType=STRING
2 changes: 1 addition & 1 deletion requirements.txt
@@ -1,7 +1,7 @@
isodate
minio
OWSLib
paho-mqtt
paho-mqtt<2
pygeometa
PyYAML
requests
4 changes: 4 additions & 0 deletions wis2box/api/backend/sensorthings.py
@@ -124,6 +124,10 @@ def delete_collection_item(self, collection_id: str, item_id: str) -> str:

LOGGER.debug(f'Deleting {item_id} from {collection_id}')
sta_index = self.sta_id(collection_id)
try:
item_id = int(item_id)
except ValueError:
item_id = f"'{item_id}'"
try:
self.http.delete(f'{sta_index}({item_id})')
except Exception as err:
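For context, a minimal standalone sketch (not part of the diff) of the id normalisation this hunk adds: SensorThings addresses entities as Collection(id), with numeric ids left bare and string ids single-quoted. The helper name below is illustrative only.

```python
# Sketch of the id handling added to delete_collection_item(); the helper
# name format_entity_url is hypothetical, used here only for illustration.
def format_entity_url(collection: str, item_id: str) -> str:
    try:
        item_id = int(item_id)        # numeric ids stay bare: Things(42)
    except ValueError:
        item_id = f"'{item_id}'"      # string ids get quoted: Things('abc-1')
    return f'{collection}({item_id})'


print(format_entity_url('Things', '42'))     # Things(42)
print(format_entity_url('Things', 'abc-1'))  # Things('abc-1')
```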
148 changes: 101 additions & 47 deletions wis2box/data/csv2sta.py
@@ -25,10 +25,12 @@
from io import StringIO
import logging
from pathlib import Path
from requests import Session
from typing import Union

from wis2box.env import NLDI_URL
from wis2box.data.geojson import ObservationDataGeoJSON
from wis2box.util import make_uuid
from wis2box.util import make_uuid, url_join

LOGGER = logging.getLogger(__name__)

@@ -45,29 +47,69 @@ def transform(self, input_data: Union[Path, bytes],
fh = StringIO(input_bytes.decode())
reader = DictReader(fh)

http = Session()

for row in reader:
monitoring_location_identifier = \
row['MonitoringLocationIdentifier']
url = url_join(NLDI_URL, monitoring_location_identifier)
try:
result = http.get(url)
feature = result.json()['features'][0]
except KeyError:
msg = f'Could not discover {monitoring_location_identifier}'
LOGGER.info(msg)
continue

identifier = row['ResultIdentifier']
unitOfMeasurement = row['ResultMeasure/MeasureUnitCode'] or row['DetectionQuantitationLimitMeasure/MeasureUnitCode'] # noqa
datastream = make_uuid(f"{row['CharacteristicName']}-{row['MonitoringLocationIdentifier']}-{unitOfMeasurement}") # noqa

_ = f"{row['ActivityStartDate']} {row['ActivityStartTime/Time']}"
isodate = datetime.strptime(
_, '%Y-%m-%d %H:%M:%S'
).replace(tzinfo=timezone(row['ActivityStartTime/TimeZoneCode']))
_ = ' '.join([row['ActivityStartDate'], row['ActivityStartTime/Time']]) # noqa
try:
isodate = datetime.strptime(_, '%Y-%m-%d %H:%M:%S')
except ValueError:
isodate = datetime.strptime(_, '%Y-%m-%d ')
try:
isodate = isodate.replace(
tzinfo=timezone(row['ActivityStartTime/TimeZoneCode']))
except Exception:
LOGGER.info('Could not apply time zone information')

rowdate = isodate.strftime('%Y-%m-%dT%H:%M:%SZ')
isodate = isodate.strftime('%Y%m%dT%H%M%S')

LongitudeMeasure = row['ActivityLocation/LongitudeMeasure'] # noqa
LatitudeMeasure = row['ActivityLocation/LatitudeMeasure'] # noqa
try:
analysisStartDate = datetime.strptime(
row['AnalysisStartDate'], '%Y-%m-%d'
).strftime('%Y-%m-%dT%H:%M:%SZ')
except ValueError:
analysisStartDate = rowdate

try:
LongitudeMeasure = float(row['ActivityLocation/LongitudeMeasure']) # noqa
LatitudeMeasure = float(row['ActivityLocation/LatitudeMeasure']) # noqa
geom = {
'type': 'Point',
'coordinates': [LongitudeMeasure, LatitudeMeasure]
}
except ValueError:
geom = feature['geometry']

try:
result = float(row['ResultMeasureValue'])
except ValueError:
result = row['ResultDetectionConditionText']

if not result:
LOGGER.warning(f'No results for {identifier}')
continue

resultQuality = {
'detectionCondition': row['ResultDetectionConditionText'],
'precision': row['DataQuality/PrecisionValue'],
'accuracy': row['DataQuality/BiasValue'],
'detectionLimit': {
'value': row['DetectionQuantitationLimitMeasure/MeasureValue'], # noqa
'unit': row['DetectionQuantitationLimitMeasure/MeasureUnitCode'] # noqa
}
}
resultQuality = (row['MeasureQualifierCode'] or row['ResultStatusIdentifier']) or ' '.join([ # noqa
row['ResultDetectionQuantitationLimitUrl'],
row['DetectionQuantitationLimitMeasure/MeasureValue'],
@@ -82,57 +124,69 @@ def transform(self, input_data: Union[Path, bytes],
},
'geojson': {
'phenomenonTime': rowdate,
'resultTime': rowdate,
'resultTime': analysisStartDate,
'result': result,
'resultQuality': resultQuality,
'parameters': {
'ResultCommentText': row['ResultCommentText'],
'HydrologicCondition': row['HydrologicCondition'],
'HydrologicEvent': row['HydrologicEvent']
'hydrologicCondition': row['HydrologicCondition'],
'hydrologicEvent': row['HydrologicEvent'],
'modified': row['LastUpdated'],
'status': row['ResultStatusIdentifier'],
'publisher': row['ProviderName'],
'valueType': row['ResultValueTypeName'],
'comment': row['ResultCommentText']
},
'Datastream': {'@iot.id': datastream},
'FeatureOfInterest': {
'@iot.id': datastream,
'name': row['MonitoringLocationName'],
'description': row['MonitoringLocationName'],
'encodingType': 'application/vnd.geo+json',
'feature': {
'type': 'Point',
'coordinates': [LongitudeMeasure, LatitudeMeasure]
},
'encodingType': 'application/geo+json',
'feature': geom,
},
}
}

try:
depth = float(row['ActivityDepthHeightMeasure/MeasureValue'])
LOGGER.info('Adding samplings')
deployment_info = row['ActivityTypeCode'] in (
'Field Msr/Obs-Portable Data Logger', 'Field Msr/Obs')
if not deployment_info:
LOGGER.info('Adding Sampling Entity')
sampling_name = '-'.join([
row['MonitoringLocationIdentifier'],
row['ActivityIdentifier']
])
samplingProcedure_id = '-'.join([
row['SampleCollectionMethod/MethodIdentifierContext'],
row['SampleCollectionMethod/MethodIdentifier']
])
featureOfInterest = self.output_data[identifier]['geojson']['FeatureOfInterest'] # noqa
featureOfInterest['Samplings'] = [{
'name': row['ActivityTypeCode'],
'description': row['ActivityTypeCode'] + row['ActivityRelativeDepthName'], # noqa
'atDepth': depth, # noqa
'depthUom': row['ActivityDepthHeightMeasure/MeasureUnitCode'], # noqa
'encodingType': 'application/vnd.geo+json',
'samplingLocation': {
'type': 'Point',
'coordinates': [LongitudeMeasure, LatitudeMeasure]
},
'Thing': {
'@iot.id': row['MonitoringLocationIdentifier']
},
'Sampler': {
'name': row['OrganizationFormalName'],
'SamplingProcedure': {
'name': row['ActivityTypeCode']

try:
featureOfInterest['Samplings'] = [{
'name': sampling_name,
'description': row['ActivityTypeCode'] + row['ActivityRelativeDepthName'], # noqa
'depthUom': row['ResultDepthHeightMeasure/MeasureUnitCode'], # noqa
'encodingType': 'application/geo+json',
# 'samplingLocation': geom,
'Thing': {
'@iot.id': row['MonitoringLocationIdentifier']
},
'Sampler': {
'name': row['OrganizationFormalName'],
'SamplingProcedure': {
'@iot.id': make_uuid(samplingProcedure_id),
'name': row['SampleCollectionMethod/MethodName'], # noqa
'definition': row['SampleCollectionMethod/MethodDescriptionText'], # noqa
'description': row['SampleCollectionMethod/MethodDescriptionText'] # noqa
}
}
},
'SamplingProcedure': {
'name': row['ActivityTypeCode']
}
}]
except (TypeError, ValueError):
LOGGER.info('No Sampling detected')
}]
if row['ActivityDepthHeightMeasure/MeasureValue']:
featureOfInterest['Samplings'][0]['atDepth'] = \
row['ActivityDepthHeightMeasure/MeasureValue']

except (TypeError, ValueError):
LOGGER.error('No Sampling detected')

def __repr__(self):
return '<ObservationDataCSV>'
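A condensed, standalone sketch of the timestamp fallback the rewritten transform() uses: rows that carry only an activity date (no time) are still parsed rather than discarded. The sample values below are hypothetical.

```python
# Sketch of the date/time fallback pattern from csv2sta.py above; the sample
# inputs are made up for illustration.
from datetime import datetime


def parse_activity_datetime(date_str: str, time_str: str) -> datetime:
    combined = ' '.join([date_str, time_str])
    try:
        return datetime.strptime(combined, '%Y-%m-%d %H:%M:%S')
    except ValueError:
        # Some WQP rows have no ActivityStartTime/Time; fall back to date only.
        return datetime.strptime(combined, '%Y-%m-%d ')


print(parse_activity_datetime('2024-05-01', '13:45:00'))  # 2024-05-01 13:45:00
print(parse_activity_datetime('2024-05-01', ''))          # 2024-05-01 00:00:00
```

Time zone handling is applied afterwards inside its own try/except, so a missing or unrecognised ActivityStartTime/TimeZoneCode only logs a message instead of failing the row.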
1 change: 1 addition & 0 deletions wis2box/env.py
@@ -95,6 +95,7 @@
THINGS = 'Things'

GEOCONNEX = 'https://geoconnex.us/'
NLDI_URL = 'https://labs.waterdata.usgs.gov/api/nldi/linked-data/wqp'
WQP_URL = 'https://www.waterqualitydata.us'
STATION_URL = url_join(WQP_URL, 'data/Station/search')
RESULTS_URL = url_join(WQP_URL, 'data/Result/search')
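The new NLDI_URL constant backs the station lookup added in csv2sta.py. A minimal sketch of that lookup using plain requests, with a hypothetical monitoring location identifier:

```python
# Sketch of the NLDI lookup performed per row in csv2sta.py; the identifier
# below is a hypothetical example, not taken from the diff.
import requests

NLDI_URL = 'https://labs.waterdata.usgs.gov/api/nldi/linked-data/wqp'
monitoring_location_identifier = 'USGS-05114000'  # hypothetical

response = requests.get(f'{NLDI_URL}/{monitoring_location_identifier}')
try:
    feature = response.json()['features'][0]
    # GeoJSON geometry used as a fallback when the CSV row lacks usable coordinates
    print(feature['geometry'])
except (KeyError, IndexError):
    print(f'Could not discover {monitoring_location_identifier}')
```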