-
Notifications
You must be signed in to change notification settings - Fork 104
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix for CKAN issues: recvTimeTs and attrValue Integer 0 #1982
base: master
Are you sure you want to change the base?
Changes from all commits
a98ebf0
4380bb6
427f678
6151ad4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,6 +20,7 @@ | |
|
||
import com.google.gson.JsonElement; | ||
import com.google.gson.JsonObject; | ||
import com.google.gson.JsonPrimitive; | ||
import com.telefonica.iot.cygnus.aggregation.NGSIGenericAggregator; | ||
import com.telefonica.iot.cygnus.aggregation.NGSIGenericColumnAggregator; | ||
import com.telefonica.iot.cygnus.aggregation.NGSIGenericRowAggregator; | ||
|
@@ -342,9 +343,30 @@ public void expirateRecords(long expirationTime) throws CygnusExpiratingError { | |
} // try catch | ||
} // truncateByTime | ||
|
||
private class NGSICKANRowAggregator extends NGSIGenericRowAggregator { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is a good idea. Maybe on this class you can override the aggregation method to loop all the events processed by it's super, then you can add the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't see how to loop, it is too hard to understand what is happening. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I agree with you. This aggregation is a bit difficult to understand. First of all, here you can find a guide to debug Cygnus with docker in order to understand better how it works. About the implementation. In this case (Row-storage) all events are processed by the
For this solution, you are interested on the RECV_TIME_TS constant. As mentioned before, a way to implement this, would be to get the key RECV_TIME_TS from the Hope this helps. |
||
@Override | ||
public long getRecvTimeTsValue(NGSIEvent cygnusEvent) { | ||
return cygnusEvent.getRecvTimeTs() / 1000; | ||
} | ||
|
||
@Override | ||
public JsonElement adaptAttrValue(JsonElement attrValue) { | ||
JsonElement adaptedValue = attrValue; | ||
// JR 2020-10-7: Attempt to fix a coerce error at CKAN, not coercing 0 to the nested data type. | ||
if (attrValue.isJsonPrimitive()) { | ||
JsonPrimitive jp = attrValue.getAsJsonPrimitive(); | ||
if (jp.isNumber() && "0".equals(jp.getAsString())) { | ||
adaptedValue = new JsonPrimitive("0"); | ||
} | ||
} | ||
return adaptedValue; | ||
} | ||
|
||
} | ||
|
||
protected NGSIGenericAggregator getAggregator(boolean rowAttrPersistence) { | ||
if (rowAttrPersistence) { | ||
return new NGSIGenericRowAggregator(); | ||
return new NGSICKANRowAggregator(); | ||
} else { | ||
return new NGSIGenericColumnAggregator(); | ||
} // if else | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This shouldn't be here. This data is very specific for this solution. This is a generic class which is used by many sinks.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps, but the aggregate method is difficult to understand for non-insiders. By adding these idempotent method I was sure nothing to break and still able to inject a specific change. I do not see an alternative, even not after a fresh look.