Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix for CKAN issues: recvTimeTs and attrValue Integer 0 #1982

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions cygnus-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,26 @@
</reportPlugins>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<version>3.1.2</version>
<executions>
<execution>
<id>copy-dependencies</id>
<phase>package</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/dependencies</outputDirectory>
<overWriteReleases>false</overWriteReleases>
<overWriteSnapshots>true</overWriteSnapshots>
<excludeTransitive>false</excludeTransitive>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</pluginManagement>
</build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -694,4 +694,22 @@ public void setEnableNameMappings(boolean enableNameMappings) {
*/
public abstract void initialize(NGSIEvent cygnusEvent);

/**
* Returns the timestamp to use for the RECV_TIME_TS field.
* @param cygnusEvent the event to produce the timestamp for.
* @return the timestamp value to use. Default milliseconds.
*/
public long getRecvTimeTsValue(NGSIEvent cygnusEvent) {
return cygnusEvent.getRecvTimeTs();
}

/**
* Returns a possible adapted value for attribute value.
* @param attrValue The input
* @return the adapted output. Default unchanged, same object.
*/
public JsonElement adaptAttrValue(JsonElement attrValue) {
// Default: No adaptation.
return attrValue;
}
Comment on lines +702 to +714
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This shouldn't be here. This data is very specific for this solution. This is a generic class which is used by many sinks.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps, but the aggregate method is difficult to understand for non-insiders. By adding these idempotent method I was sure nothing to break and still able to inject a specific change. I do not see an alternative, even not after a fresh look.

}
Original file line number Diff line number Diff line change
Expand Up @@ -88,21 +88,21 @@ public void aggregate(NGSIEvent event) {
LOGGER.debug("[" + getName() + "] Processing context attribute (name=" + attrName + ", type="
+ attrType + ")");
// aggregate the attribute information
aggregation.get(NGSIConstants.RECV_TIME_TS).add(new JsonPrimitive(Long.toString(recvTimeTs)));
aggregation.get(NGSIConstants.RECV_TIME_TS).add(new JsonPrimitive(Long.toString(getRecvTimeTsValue(event))));
aggregation.get(NGSIConstants.RECV_TIME).add(new JsonPrimitive(recvTime));
aggregation.get(NGSIConstants.FIWARE_SERVICE_PATH).add(new JsonPrimitive(getServicePathForData()));
aggregation.get(NGSIConstants.ENTITY_ID).add(new JsonPrimitive(entityId));
aggregation.get(NGSIConstants.ENTITY_TYPE).add(new JsonPrimitive(entityType));
aggregation.get(NGSIConstants.ATTR_NAME).add(new JsonPrimitive(attrName));
aggregation.get(NGSIConstants.ATTR_TYPE).add(new JsonPrimitive(attrType));
aggregation.get(NGSIConstants.ATTR_VALUE).add(attrValue);
aggregation.get(NGSIConstants.ATTR_VALUE).add(adaptAttrValue(attrValue));
reitsma marked this conversation as resolved.
Show resolved Hide resolved
aggregation.get(NGSIConstants.ATTR_MD).add(jsonAttrMetadata);
} // for
setAggregation(aggregation);
} // aggregate

private String getName() {
return "NGSIUtils.GenericColumnAggregator";
return "NGSIUtils.GenericRowAggregator";
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonPrimitive;
import com.telefonica.iot.cygnus.aggregation.NGSIGenericAggregator;
import com.telefonica.iot.cygnus.aggregation.NGSIGenericColumnAggregator;
import com.telefonica.iot.cygnus.aggregation.NGSIGenericRowAggregator;
Expand Down Expand Up @@ -342,9 +343,30 @@ public void expirateRecords(long expirationTime) throws CygnusExpiratingError {
} // try catch
} // truncateByTime

private class NGSICKANRowAggregator extends NGSIGenericRowAggregator {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a good idea. Maybe on this class you can override the aggregation method to loop all the events processed by it's super, then you can add the / 1000 operation to theNGSIConstants.RECV_TIME_TS key of the LinkedHashMap.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see how to loop, it is too hard to understand what is happening.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with you. This aggregation is a bit difficult to understand.

First of all, here you can find a guide to debug Cygnus with docker in order to understand better how it works.

About the implementation.

In this case (Row-storage) all events are processed by the NGSIGenericRowAggregator.aggregate method on each sink. Exactly here for this sink. The output of this aggregation method is a LinkedHashMap. This LinkedHashMap contains all events processed on the batch on the following way.

  • The keys of the LinkedHashMap on Row aggregation are deffined as constants here.
  • The values for those keys are ArrayList collections, wich contains all of the values processed on the batch.

For this solution, you are interested on the RECV_TIME_TS constant. As mentioned before, a way to implement this, would be to get the key RECV_TIME_TS from the LinkedHashMap after the aggregation ends and loop each one of the elements of the list to add the / 1000 operation. Then set this new list on the LinkedHashMap for the key RECV_TIME_TS.

Hope this helps.

@Override
public long getRecvTimeTsValue(NGSIEvent cygnusEvent) {
return cygnusEvent.getRecvTimeTs() / 1000;
}

@Override
public JsonElement adaptAttrValue(JsonElement attrValue) {
JsonElement adaptedValue = attrValue;
// JR 2020-10-7: Attempt to fix a coerce error at CKAN, not coercing 0 to the nested data type.
if (attrValue.isJsonPrimitive()) {
JsonPrimitive jp = attrValue.getAsJsonPrimitive();
if (jp.isNumber() && "0".equals(jp.getAsString())) {
adaptedValue = new JsonPrimitive("0");
}
}
return adaptedValue;
}

}

protected NGSIGenericAggregator getAggregator(boolean rowAttrPersistence) {
if (rowAttrPersistence) {
return new NGSIGenericRowAggregator();
return new NGSICKANRowAggregator();
} else {
return new NGSIGenericColumnAggregator();
} // if else
Expand Down