Skip to content

Commit

Permalink
Merge branch 'master' into dependabot/maven/cygnus-common/com.google.…
Browse files Browse the repository at this point in the history
…code.gson-gson-2.8.9
  • Loading branch information
AlvaroVega authored Sep 14, 2023
2 parents 7061ae9 + bee00f8 commit 07ec3f5
Show file tree
Hide file tree
Showing 57 changed files with 663 additions and 262 deletions.
10 changes: 9 additions & 1 deletion CHANGES_NEXT_RELEASE
Original file line number Diff line number Diff line change
@@ -1 +1,9 @@
[cygnus-commons] Upgrade gson dependency from 2.6.2 to 2.10.1
- [cygnus-commons] Upgrade gson dependency from 2.6.2 to 2.10.1
- [cygnus-ngsi][cygnus-common] upgrade mockito dep from 1.9.5 to 5.5.0
- [cygnus-ngsi] Fix runtime error: check access aggregator size (#2293)
- [cygnus-ngsi] Switch log level to CYGNUS_LOG_LEVEL env var if it was provided to docker (#2286)
- [cygnus-common][SQL] Fix expiration records tablename used by delete and select (#2265)
- [cygnus-common][SQL] Fix expiration records select with a limit to avoid java out of memory error (#2273)
- [cygnus-ngsi] Removes "_" in schema name for DM-schema family (#2270, #2201 reopens for Postgis)
- [cygnus-ngsi] UPGRADE: Debian version from 11.6 to 12.1 in Dockerfile
- [cygnus-common] Upgrade mongodb driver dep from 3.12.12 to 3.12.14
8 changes: 4 additions & 4 deletions cygnus-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

<groupId>com.telefonica.iot</groupId>
<artifactId>cygnus-common</artifactId>
<version>3.0.0</version>
<version>3.2.0-SNAPSHOT</version>
<packaging>jar</packaging>

<name>cygnus-common</name>
Expand All @@ -26,8 +26,8 @@
<!-- Required for testing -->
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<version>1.9.5</version>
<artifactId>mockito-core</artifactId>
<version>5.5.0</version>
<scope>test</scope>
</dependency>
<dependency>
Expand Down Expand Up @@ -123,7 +123,7 @@
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongodb-driver</artifactId>
<version>3.12.12</version>
<version>3.12.14</version>
</dependency>
<!-- Required by KafkaBackendImpl -->
<dependency>
Expand Down
17 changes: 17 additions & 0 deletions cygnus-common/spec/SPECS/cygnus-common.spec
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,23 @@ rm -rf $RPM_BUILD_ROOT
/var/run/%{_project_name}

%changelog

* Tue Jun 20 2023 Alvaro Vega <[email protected]> 3.2.0
- [cygnus-common][pSQL] Allow use of null for ST_GeomFromGeoJSON
- [cygnus-common][SQL] cache tablename with schema in pSQL for capRecords case
- [cygnus-common][SQL] Log full values in insert_error instead of placeholder
- [cygnus-common][SQL] Check keys for delete lastdata in lowercase for PSQL
- [cygnus-common][SQL] Check if delete query for lastdata was completed before use it
- [cygnus-common][cygnus-ngsi] Fix: exclude from attribute processing the alterationType sent by a subscription in all of its values (#2231, reopened)
- [cygnus-common][SQL] Fix sql syntax for expiration records (select, delete, filters) in non mysql instances (#2242)
- [cygnus-common][SQL] Exclude error table from persist policy (cap records is based on recvtime, which is not used by persistError)
- [cygnus-common][SQL] Add missed capRecords implementation for PSQL backends

* Thu May 24 2023 Alvaro Vega <[email protected]> 3.1.0
- [cygnus-common] Add: delete entity in lastData when alterationType entityDelete is received (#2231). Requires Orion 3.9+
- [cygnus-common] Add: exclude from attribute processing the alterationType sent by a subscription in all of its values (#2231)
- [cygnus-common] Fix: missing blank space in create table if not exists for postgresql

* Thu May 04 2023 Alvaro Vega <[email protected]> 3.0.0
- [cygnus-common] OracleSQL backend (#2195)
- [cygnus-common] MongoDB indexes are created depending on DM (#2204)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,13 @@ void insertContextData(String dataBase, String schema, String tableName, String
/**
* Caps records from the given table within the given database according to the given maximum number.
* @param dataBase
* @param schemaName
* @param tableName
* @param maxRecords
* @throws com.telefonica.iot.cygnus.errors.CygnusRuntimeError
* @throws com.telefonica.iot.cygnus.errors.CygnusPersistenceError
*/
void capRecords(String dataBase, String tableName, long maxRecords) throws CygnusRuntimeError, CygnusPersistenceError;
void capRecords(String dataBase, String schemaName, String tableName, long maxRecords) throws CygnusRuntimeError, CygnusPersistenceError;

/**
* Expirates records within all the cached tables based on the expiration time.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public class SQLBackendImpl implements SQLBackend{
private final int maxLatestErrors;
private static final String DEFAULT_ERROR_TABLE_SUFFIX = "_error_log";
private static final int DEFAULT_MAX_LATEST_ERRORS = 100;
private static final String DEFAULT_LIMIT_SELECT_EXP_RECORDS = "4096";
private String nlsTimestampFormat;
private String nlsTimestampTzFormat;

Expand Down Expand Up @@ -300,13 +301,23 @@ public void insertContextData(String dataBase, String schema, String table, Stri
cache.addTable(dataBase, tableName);
} // insertContextData

private CachedRowSet select(String dataBase, String tableName, String selection)
private CachedRowSet select(String dataBase, String schema, String tableName, String selection)
throws CygnusRuntimeError, CygnusPersistenceError {
Statement stmt = null;

// get a connection to the given destination
Connection con = driver.getConnection(dataBase);
String query = "select " + selection + " from `" + tableName + "` order by recvTime";
String query = "";
if (sqlInstance == SQLInstance.MYSQL) {
query = "select " + selection + " from `" + tableName + "` order by recvTime desc limit " + DEFAULT_LIMIT_SELECT_EXP_RECORDS;
} else if (sqlInstance == SQLInstance.POSTGRESQL) {
if (schema != null && (!tableName.startsWith(schema))) {
tableName = schema + '.' + tableName;
}
query = "select " + selection + " from " + tableName + " order by recvTime desc limit " + DEFAULT_LIMIT_SELECT_EXP_RECORDS;
} else {
query = "select " + selection + " from " + tableName + " order by recvTime desc limit " + DEFAULT_LIMIT_SELECT_EXP_RECORDS;
}

try {
stmt = con.createStatement();
Expand All @@ -324,26 +335,36 @@ private CachedRowSet select(String dataBase, String tableName, String selection)
// used once the statement is closed
@SuppressWarnings("restriction")
CachedRowSet crs = new CachedRowSetImpl();

crs.populate(rs); // FIXME: close Resultset Objects??
closeSQLObjects(con, stmt);
rs.close();
return crs;
} catch (SQLTimeoutException e) {
throw new CygnusPersistenceError(sqlInstance.toString().toUpperCase() + " Data select error. Query " + query, "SQLTimeoutException", e.getMessage());
} catch (SQLException e) {
closeSQLObjects(con, stmt);
persistError(dataBase, "", query, e);
persistError(dataBase, schema, query, e);
throw new CygnusPersistenceError(sqlInstance.toString().toUpperCase() + " Querying error", "SQLException", e.getMessage());
} // try catch
} // select

private void delete(String dataBase, String tableName, String filters)
private void delete(String dataBase, String schema, String tableName, String filters)
throws CygnusRuntimeError, CygnusPersistenceError {
Statement stmt = null;

// get a connection to the given destination
Connection con = driver.getConnection(dataBase);
String query = "delete from `" + tableName + "` where " + filters;
String query = "";
if (sqlInstance == SQLInstance.MYSQL) {
query = "delete from `" + tableName + "` where " + filters;
} else if (sqlInstance == SQLInstance.POSTGRESQL) {
if (schema != null && (!tableName.startsWith(schema))) {
tableName = schema + '.' + tableName;
}
query = "delete from " + tableName + " where " + filters;
} else {
query = "delete from " + tableName + " where " + filters;
}

try {
stmt = con.createStatement();
Expand All @@ -357,20 +378,22 @@ private void delete(String dataBase, String tableName, String filters)
stmt.executeUpdate(query);
} catch (SQLTimeoutException e) {
throw new CygnusPersistenceError(sqlInstance.toString().toUpperCase() + " Data delete error. Query " + query, "SQLTimeoutException", e.getMessage());
}catch (SQLException e) {
} catch (SQLException e) {
closeSQLObjects(con, stmt);
persistError(dataBase, "", query, e);
persistError(dataBase, schema, query, e);
throw new CygnusPersistenceError(sqlInstance.toString().toUpperCase() + " Deleting error", "SQLException", e.getMessage());
} // try catch

closeSQLObjects(con, stmt);
} // delete

@Override
public void capRecords(String dataBase, String tableName, long maxRecords)
public void capRecords(String dataBase, String schemaName, String tableName, long maxRecords)
throws CygnusRuntimeError, CygnusPersistenceError {
LOGGER.debug(sqlInstance.toString().toUpperCase() + " capRecords for database: " +
dataBase + " schema: " + schemaName + " tableName: " + tableName);
// Get the records within the table
CachedRowSet records = select(dataBase, tableName, "*");
CachedRowSet records = select(dataBase, schemaName, tableName, "*");

// Get the number of records
int numRecords = 0;
Expand Down Expand Up @@ -411,9 +434,9 @@ public void capRecords(String dataBase, String tableName, long maxRecords)
if (filters.isEmpty()) {
LOGGER.debug(sqlInstance.toString().toUpperCase() + " No records to be deleted");
} else {
LOGGER.debug(sqlInstance.toString().toUpperCase() + " Records must be deleted (destination=" + dataBase + ",tableName=" + tableName + ", filters="
LOGGER.debug(sqlInstance.toString().toUpperCase() + " Records must be deleted (destination=" + dataBase + ",schemaName=" + schemaName + ",tableName=" + tableName + ", filters="
+ filters + ")");
delete(dataBase, tableName, filters);
delete(dataBase, schemaName, tableName, filters);
} // if else
} // capRecords

Expand All @@ -429,8 +452,18 @@ public void expirateRecordsCache(long expirationTime) throws CygnusRuntimeError,
while (cache.hasNextTable(dataBase)) {
String tableName = cache.nextTable(dataBase);

// Get schema from tableName if PSQL, just for persistError after
String schema = null;
if (sqlInstance == SQLInstance.POSTGRESQL) {
String[] parts = tableName.split("\\.");
if (parts.length > 0) {
schema = parts[0];
}
}
LOGGER.debug(sqlInstance.toString().toUpperCase() + " expirateRecordsCache for database: " +
dataBase + " schema: " + schema + " tableName: " + tableName);
// Get the records within the table
CachedRowSet records = select(dataBase, tableName, "*");
CachedRowSet records = select(dataBase, schema, tableName, "*");

// Get the number of records
int numRecords = 0;
Expand Down Expand Up @@ -476,14 +509,16 @@ public void expirateRecordsCache(long expirationTime) throws CygnusRuntimeError,
throw new CygnusRuntimeError(sqlInstance.toString().toUpperCase() + " Data expiration error", "SQLException", e.getMessage());
} catch (ParseException e) {
throw new CygnusRuntimeError(sqlInstance.toString().toUpperCase() + " Data expiration error", "ParseException", e.getMessage());
} catch (Exception e) {
throw new CygnusRuntimeError(sqlInstance.toString().toUpperCase() + " Data expiration error", "Exception", e.getMessage());
} // try catch

if (filters.isEmpty()) {
LOGGER.debug(sqlInstance.toString().toUpperCase() + " No records to be deleted");
} else {
LOGGER.debug(sqlInstance.toString().toUpperCase() + " Records must be deleted (destination=" + dataBase + ",tableName=" + tableName + ", filters="
LOGGER.debug(sqlInstance.toString().toUpperCase() + " Records must be deleted (destination=" + dataBase + ",schemaName=" + schema + ",tableName=" + tableName + ", filters="
+ filters + ")");
delete(dataBase, tableName, filters);
delete(dataBase, schema, tableName, filters);
} // if else
} // while
} // while
Expand Down Expand Up @@ -542,15 +577,11 @@ private void closeConnection (Connection connection) {

public void createErrorTable(String dataBase, String schema)
throws CygnusRuntimeError, CygnusPersistenceError {
// the defaul table for error log will be called the same as the destination name
// the default table for error log will be called the same as the destination name
String errorTableName = dataBase + DEFAULT_ERROR_TABLE_SUFFIX;
if (sqlInstance == SQLInstance.POSTGRESQL) {
errorTableName = schema + "." + dataBase + DEFAULT_ERROR_TABLE_SUFFIX;
}
if (cache.isCachedTable(dataBase, errorTableName)) {
LOGGER.debug(sqlInstance.toString().toUpperCase() + " '" + errorTableName + "' is cached, thus it is not created");
return;
} // if
String typedFieldNames = "(" +
"timestamp TIMESTAMP" +
", error text" +
Expand All @@ -568,7 +599,7 @@ public void createErrorTable(String dataBase, String schema)
if (sqlInstance == SQLInstance.MYSQL) {
query = "create table if not exists `" + errorTableName + "`" + typedFieldNames;
} else if (sqlInstance == SQLInstance.POSTGRESQL) {
query = "CREATE TABLE IF NOT EXISTS" + errorTableName + " " + typedFieldNames;
query = "CREATE TABLE IF NOT EXISTS " + errorTableName + " " + typedFieldNames;
} else if (sqlInstance == SQLInstance.ORACLE) {
// FIXME: Add an oracle workaround for "if not exists"
query = "CREATE TABLE " + errorTableName + " " + typedFieldNamesOracle;
Expand All @@ -590,16 +621,14 @@ public void createErrorTable(String dataBase, String schema)
} // try catch

closeSQLObjects(con, stmt);

LOGGER.debug(sqlInstance.toString().toUpperCase() + " Trying to add '" + errorTableName + "' to the cache after table creation");
cache.addTable(dataBase, errorTableName);
} // createErrorTable

/**
* Upsert transaction.
*
* @param aggregation the aggregation
* @param lastData the last data
* @param lastDataDelete the last data delete
* @param dataBase the dataBase
* @param tableName the table name
* @param tableSuffix the table suffix
Expand All @@ -614,6 +643,7 @@ public void createErrorTable(String dataBase, String schema)
*/
public void upsertTransaction (LinkedHashMap<String, ArrayList<JsonElement>> aggregation,
LinkedHashMap<String, ArrayList<JsonElement>> lastData,
LinkedHashMap<String, ArrayList<JsonElement>> lastDataDelete,
String dataBase,
String schema,
String tableName,
Expand All @@ -636,6 +666,7 @@ public void upsertTransaction (LinkedHashMap<String, ArrayList<JsonElement>> agg

ArrayList<StringBuffer> upsertQuerysList = SQLQueryUtils.sqlUpsertQuery(aggregation,
lastData,
lastDataDelete,
tableName,
tableSuffix,
uniqueKey,
Expand Down Expand Up @@ -695,7 +726,9 @@ public void upsertTransaction (LinkedHashMap<String, ArrayList<JsonElement>> agg
} finally {
closeConnection(connection);
} // try catch

if (sqlInstance == SQLInstance.POSTGRESQL) {
tableName = schema + "." + tableName;
}
LOGGER.debug(sqlInstance.toString().toUpperCase() + " Trying to add '" + dataBase + "' and '" + tableName + "' to the cache after upsertion");
cache.addDataBase(dataBase);
cache.addTable(dataBase, tableName);
Expand Down Expand Up @@ -769,7 +802,9 @@ public void insertTransaction (LinkedHashMap<String, ArrayList<JsonElement>> agg
} finally {
closeConnection(connection);
} // try catch

if (sqlInstance == SQLInstance.POSTGRESQL) {
tableName = schema + "." + tableName;
}
LOGGER.debug(sqlInstance.toString().toUpperCase() + " Trying to add '" + dataBase + "' and '" + tableName + "' to the cache after insertion");
cache.addDataBase(dataBase);
cache.addTable(dataBase, tableName);
Expand All @@ -790,8 +825,6 @@ public void purgeErrorTable(String dataBase, String schema)
String errorTableName = dataBase + DEFAULT_ERROR_TABLE_SUFFIX;
if (sqlInstance == SQLInstance.POSTGRESQL) {
errorTableName = schema + "." + dataBase + DEFAULT_ERROR_TABLE_SUFFIX;
} else if (sqlInstance == SQLInstance.ORACLE) {
errorTableName = dataBase + DEFAULT_ERROR_TABLE_SUFFIX;
}
String limit = String.valueOf(maxLatestErrors);

Expand Down Expand Up @@ -857,7 +890,7 @@ private void insertErrorLog(String dataBase, String schema, String errorQuery, E
preparedStatement.setObject(1, java.sql.Timestamp.from(Instant.now()));
preparedStatement.setString(2, exception.getMessage());
preparedStatement.setString(3, errorQuery);
LOGGER.debug(sqlInstance.toString().toUpperCase() + " Executing SQL query '" + query + "'");
LOGGER.debug(sqlInstance.toString().toUpperCase() + " Executing SQL query '" + preparedStatement.toString() + "'");
preparedStatement.executeUpdate();
} catch (SQLTimeoutException e) {
throw new CygnusPersistenceError(sqlInstance.toString().toUpperCase() + " Data insertion error. Query: `" + preparedStatement, "SQLTimeoutException", e.getMessage());
Expand All @@ -867,9 +900,6 @@ private void insertErrorLog(String dataBase, String schema, String errorQuery, E
closeSQLObjects(con, preparedStatement);
} // try catch

LOGGER.debug(sqlInstance.toString().toUpperCase() + " Trying to add '" + dataBase + "' and '" + errorTableName + "' to the cache after insertion");
cache.addDataBase(dataBase);
cache.addTable(dataBase, errorTableName);
} // insertErrorLog

private void persistError(String destination, String schema, String query, Exception exception) throws CygnusPersistenceError, CygnusRuntimeError {
Expand Down
Loading

0 comments on commit 07ec3f5

Please sign in to comment.