Skip to content

Commit

Permalink
fix: improve logs / skip entities with no values to insert (#43)
Browse files Browse the repository at this point in the history
Signed-off-by: Benoit Orihuela <[email protected]>
  • Loading branch information
bobeal authored Oct 16, 2024
1 parent f1ab559 commit 07c4f37
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -235,18 +235,18 @@ public class NgsiLdToPostgreSQL extends AbstractSessionFactoryProcessor {
final PreparedStatement stmt = enclosure.getCachedStatement(conn);
JdbcCommon.setParameters(stmt, flowFile.getAttributes());
try {
getLogger().info("Gonna create schema {}", schemaName);
getLogger().debug("Gonna create schema {}", schemaName);
conn.createStatement().execute(postgres.createSchema(schemaName));
getLogger().info("Gonna create table {} with columns {}", tableName, updatedListOfTypedFields);
getLogger().debug("Gonna create table {} with columns {}", tableName, updatedListOfTypedFields);
conn.createStatement().execute(postgres.createTable(schemaName, tableName, updatedListOfTypedFields));
ResultSet rs = conn.createStatement().executeQuery(postgres.checkColumnNames(tableName));
Map<String, POSTGRESQL_COLUMN_TYPES> newColumns = postgres.getNewColumns(rs, updatedListOfTypedFields);
if (newColumns.size() > 0) {
getLogger().info("Identified new columns to create: {}", newColumns);
if (!newColumns.isEmpty()) {
getLogger().debug("Identified new columns to create: {}", newColumns);
conn.createStatement().execute(postgres.addColumns(schemaName, tableName, newColumns));
}
} catch (SQLException s) {
getLogger().error(s.toString(), s);
getLogger().error("Error when preparing schema: {}", s.toString(), s);
}
stmt.addBatch();
}, onFlowFileError(context, session, result))) {
Expand All @@ -255,7 +255,7 @@ public class NgsiLdToPostgreSQL extends AbstractSessionFactoryProcessor {
enclosure.addFlowFile(flowFile);
}
} catch (Exception e) {
getLogger().error(e.toString(), e);
getLogger().error("Unexpected exception processing flow file: {}", e.toString(), e);
}
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,11 @@ public String insertQuery(
List<String> valuesForInsert =
this.getValuesForInsert(entity, listOfFields, creationTime, datasetIdPrefixToTruncate, exportSysAttrs, ignoreEmptyObservedAt, flattenObservations);

return "insert into " + schemaName + "." + tableName + " " + this.getFieldsForInsert(listOfFields.keySet()) + " values " + String.join(",", valuesForInsert) + ";";
if (valuesForInsert.isEmpty()) {
logger.warn("Unable to get values to insert for {}, returning fake statement", entity.entityId);
return "select 1;";
} else
return "insert into " + schemaName + "." + tableName + " " + this.getFieldsForInsert(listOfFields.keySet()) + " values " + String.join(",", valuesForInsert) + ";";
}

public String checkColumnNames(String tableName) {
Expand Down

0 comments on commit 07c4f37

Please sign in to comment.