diff --git a/cygnus-common/pom.xml b/cygnus-common/pom.xml index 58584212a..fc1a52ec1 100644 --- a/cygnus-common/pom.xml +++ b/cygnus-common/pom.xml @@ -230,6 +230,26 @@ + + org.apache.maven.plugins + maven-dependency-plugin + 3.1.2 + + + copy-dependencies + package + + copy-dependencies + + + ${project.build.directory}/dependencies + false + true + false + + + + diff --git a/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/aggregation/NGSIGenericAggregator.java b/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/aggregation/NGSIGenericAggregator.java index f7efe515e..0551fc6cd 100644 --- a/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/aggregation/NGSIGenericAggregator.java +++ b/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/aggregation/NGSIGenericAggregator.java @@ -694,4 +694,22 @@ public void setEnableNameMappings(boolean enableNameMappings) { */ public abstract void initialize(NGSIEvent cygnusEvent); + /** + * Returns the timestamp to use for the RECV_TIME_TS field. + * @param cygnusEvent the event to produce the timestamp for. + * @return the timestamp value to use. Default milliseconds. + */ + public long getRecvTimeTsValue(NGSIEvent cygnusEvent) { + return cygnusEvent.getRecvTimeTs(); + } + + /** + * Returns a possible adapted value for attribute value. + * @param attrValue The input + * @return the adapted output. Default unchanged, same object. + */ + public JsonElement adaptAttrValue(JsonElement attrValue) { + // Default: No adaptation. + return attrValue; + } } diff --git a/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/aggregation/NGSIGenericRowAggregator.java b/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/aggregation/NGSIGenericRowAggregator.java index da42a4c40..d1270d9cd 100644 --- a/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/aggregation/NGSIGenericRowAggregator.java +++ b/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/aggregation/NGSIGenericRowAggregator.java @@ -88,21 +88,21 @@ public void aggregate(NGSIEvent event) { LOGGER.debug("[" + getName() + "] Processing context attribute (name=" + attrName + ", type=" + attrType + ")"); // aggregate the attribute information - aggregation.get(NGSIConstants.RECV_TIME_TS).add(new JsonPrimitive(Long.toString(recvTimeTs))); + aggregation.get(NGSIConstants.RECV_TIME_TS).add(new JsonPrimitive(Long.toString(getRecvTimeTsValue(event)))); aggregation.get(NGSIConstants.RECV_TIME).add(new JsonPrimitive(recvTime)); aggregation.get(NGSIConstants.FIWARE_SERVICE_PATH).add(new JsonPrimitive(getServicePathForData())); aggregation.get(NGSIConstants.ENTITY_ID).add(new JsonPrimitive(entityId)); aggregation.get(NGSIConstants.ENTITY_TYPE).add(new JsonPrimitive(entityType)); aggregation.get(NGSIConstants.ATTR_NAME).add(new JsonPrimitive(attrName)); aggregation.get(NGSIConstants.ATTR_TYPE).add(new JsonPrimitive(attrType)); - aggregation.get(NGSIConstants.ATTR_VALUE).add(attrValue); + aggregation.get(NGSIConstants.ATTR_VALUE).add(adaptAttrValue(attrValue)); aggregation.get(NGSIConstants.ATTR_MD).add(jsonAttrMetadata); } // for setAggregation(aggregation); } // aggregate private String getName() { - return "NGSIUtils.GenericColumnAggregator"; + return "NGSIUtils.GenericRowAggregator"; } } diff --git a/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/sinks/NGSICKANSink.java b/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/sinks/NGSICKANSink.java index 12b5bba53..9ae53b29c 100644 --- a/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/sinks/NGSICKANSink.java +++ b/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/sinks/NGSICKANSink.java @@ -20,6 +20,7 @@ import com.google.gson.JsonElement; import com.google.gson.JsonObject; +import com.google.gson.JsonPrimitive; import com.telefonica.iot.cygnus.aggregation.NGSIGenericAggregator; import com.telefonica.iot.cygnus.aggregation.NGSIGenericColumnAggregator; import com.telefonica.iot.cygnus.aggregation.NGSIGenericRowAggregator; @@ -342,9 +343,30 @@ public void expirateRecords(long expirationTime) throws CygnusExpiratingError { } // try catch } // truncateByTime + private class NGSICKANRowAggregator extends NGSIGenericRowAggregator { + @Override + public long getRecvTimeTsValue(NGSIEvent cygnusEvent) { + return cygnusEvent.getRecvTimeTs() / 1000; + } + + @Override + public JsonElement adaptAttrValue(JsonElement attrValue) { + JsonElement adaptedValue = attrValue; + // JR 2020-10-7: Attempt to fix a coerce error at CKAN, not coercing 0 to the nested data type. + if (attrValue.isJsonPrimitive()) { + JsonPrimitive jp = attrValue.getAsJsonPrimitive(); + if (jp.isNumber() && "0".equals(jp.getAsString())) { + adaptedValue = new JsonPrimitive("0"); + } + } + return adaptedValue; + } + + } + protected NGSIGenericAggregator getAggregator(boolean rowAttrPersistence) { if (rowAttrPersistence) { - return new NGSIGenericRowAggregator(); + return new NGSICKANRowAggregator(); } else { return new NGSIGenericColumnAggregator(); } // if else