-
Notifications
You must be signed in to change notification settings - Fork 244
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: SQL implementation of the EDR index store (#4025)
- Loading branch information
Showing
18 changed files
with
786 additions
and
36 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
30 changes: 30 additions & 0 deletions
30
extensions/common/store/sql/edr-index-sql/build.gradle.kts
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,30 @@ | ||
/* | ||
* Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) | ||
* | ||
* This program and the accompanying materials are made available under the | ||
* terms of the Apache License, Version 2.0 which is available at | ||
* https://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* Contributors: | ||
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation | ||
* | ||
*/ | ||
|
||
plugins { | ||
`java-library` | ||
} | ||
|
||
dependencies { | ||
api(project(":spi:common:core-spi")) | ||
api(project(":spi:common:transaction-spi")) | ||
|
||
implementation(project(":extensions:common:sql:sql-core")) | ||
implementation(project(":spi:common:edr-store-spi")) | ||
implementation(project(":spi:common:transaction-datasource-spi")) | ||
testImplementation(project(":core:common:junit")) | ||
testImplementation(testFixtures(project(":extensions:common:sql:sql-core"))) | ||
testImplementation(testFixtures(project(":spi:common:edr-store-spi"))) | ||
|
||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
|
||
CREATE TABLE IF NOT EXISTS edc_edr_entry | ||
( | ||
transfer_process_id VARCHAR NOT NULL PRIMARY KEY, | ||
agreement_id VARCHAR NOT NULL, | ||
asset_id VARCHAR NOT NULL, | ||
provider_id VARCHAR NOT NULL, | ||
contract_negotiation_id VARCHAR, | ||
created_at BIGINT NOT NULL | ||
); | ||
|
161 changes: 161 additions & 0 deletions
161
...main/java/org/eclipse/edc/connector/store/sql/edr/SqlEndpointDataReferenceEntryIndex.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,161 @@ | ||
/* | ||
* Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) | ||
* | ||
* This program and the accompanying materials are made available under the | ||
* terms of the Apache License, Version 2.0 which is available at | ||
* https://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* Contributors: | ||
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation | ||
* | ||
*/ | ||
|
||
package org.eclipse.edc.connector.store.sql.edr; | ||
|
||
import com.fasterxml.jackson.databind.ObjectMapper; | ||
import org.eclipse.edc.connector.store.sql.edr.schema.EndpointDataReferenceEntryStatements; | ||
import org.eclipse.edc.edr.spi.store.EndpointDataReferenceEntryIndex; | ||
import org.eclipse.edc.edr.spi.types.EndpointDataReferenceEntry; | ||
import org.eclipse.edc.spi.persistence.EdcPersistenceException; | ||
import org.eclipse.edc.spi.query.QuerySpec; | ||
import org.eclipse.edc.spi.result.StoreResult; | ||
import org.eclipse.edc.sql.QueryExecutor; | ||
import org.eclipse.edc.sql.store.AbstractSqlStore; | ||
import org.eclipse.edc.transaction.datasource.spi.DataSourceRegistry; | ||
import org.eclipse.edc.transaction.spi.TransactionContext; | ||
import org.jetbrains.annotations.Nullable; | ||
|
||
import java.sql.Connection; | ||
import java.sql.ResultSet; | ||
import java.sql.SQLException; | ||
import java.util.List; | ||
import java.util.Objects; | ||
import java.util.stream.Collectors; | ||
|
||
import static java.lang.String.format; | ||
|
||
public class SqlEndpointDataReferenceEntryIndex extends AbstractSqlStore implements EndpointDataReferenceEntryIndex { | ||
|
||
private final EndpointDataReferenceEntryStatements statements; | ||
|
||
public SqlEndpointDataReferenceEntryIndex(DataSourceRegistry dataSourceRegistry, String dataSourceName, TransactionContext transactionContext, | ||
ObjectMapper objectMapper, EndpointDataReferenceEntryStatements statements, QueryExecutor queryExecutor) { | ||
super(dataSourceRegistry, dataSourceName, transactionContext, objectMapper, queryExecutor); | ||
this.statements = statements; | ||
} | ||
|
||
@Override | ||
public @Nullable EndpointDataReferenceEntry findById(String transferProcessId) { | ||
Objects.requireNonNull(transferProcessId); | ||
return transactionContext.execute(() -> { | ||
try (var connection = getConnection()) { | ||
return findById(connection, transferProcessId); | ||
} catch (Exception exception) { | ||
throw new EdcPersistenceException(exception); | ||
} | ||
}); | ||
} | ||
|
||
@Override | ||
public StoreResult<List<EndpointDataReferenceEntry>> query(QuerySpec querySpec) { | ||
return transactionContext.execute(() -> { | ||
Objects.requireNonNull(querySpec); | ||
try { | ||
var queryStmt = statements.createQuery(querySpec); | ||
try (var stream = queryExecutor.query(getConnection(), true, this::mapResultSet, queryStmt.getQueryAsString(), queryStmt.getParameters())) { | ||
return StoreResult.success(stream.collect(Collectors.toList())); | ||
} | ||
} catch (SQLException exception) { | ||
throw new EdcPersistenceException(exception); | ||
} | ||
}); | ||
} | ||
|
||
@Override | ||
public StoreResult<Void> save(EndpointDataReferenceEntry entry) { | ||
return transactionContext.execute(() -> { | ||
try (var connection = getConnection()) { | ||
if (existsById(connection, entry.getTransferProcessId())) { | ||
updateInternal(connection, entry); | ||
} else { | ||
insertInternal(connection, entry); | ||
} | ||
return StoreResult.success(); | ||
} catch (Exception e) { | ||
throw new EdcPersistenceException(e.getMessage(), e); | ||
} | ||
}); | ||
} | ||
|
||
@Override | ||
public StoreResult<EndpointDataReferenceEntry> delete(String transferProcessId) { | ||
Objects.requireNonNull(transferProcessId); | ||
return transactionContext.execute(() -> { | ||
try (var connection = getConnection()) { | ||
var entity = findById(connection, transferProcessId); | ||
if (entity != null) { | ||
queryExecutor.execute(connection, statements.getDeleteByIdTemplate(), transferProcessId); | ||
return StoreResult.success(entity); | ||
} else { | ||
return StoreResult.notFound(format(ENDPOINT_DATA_REFERENCE_ENTRY_FOUND, transferProcessId)); | ||
} | ||
} catch (Exception e) { | ||
throw new EdcPersistenceException(e.getMessage(), e); | ||
} | ||
}); | ||
} | ||
|
||
private EndpointDataReferenceEntry findById(Connection connection, String id) { | ||
var sql = statements.getFindByTemplate(); | ||
return queryExecutor.single(connection, false, this::mapResultSet, sql, id); | ||
} | ||
|
||
private boolean existsById(Connection connection, String definitionId) { | ||
var sql = statements.getCountTemplate(); | ||
try (var stream = queryExecutor.query(connection, false, this::mapCount, sql, definitionId)) { | ||
return stream.findFirst().orElse(0L) > 0; | ||
} | ||
} | ||
|
||
private long mapCount(ResultSet resultSet) throws SQLException { | ||
return resultSet.getLong(1); | ||
} | ||
|
||
private void insertInternal(Connection connection, EndpointDataReferenceEntry entry) { | ||
transactionContext.execute(() -> { | ||
queryExecutor.execute(connection, statements.getInsertTemplate(), | ||
entry.getTransferProcessId(), | ||
entry.getAssetId(), | ||
entry.getProviderId(), | ||
entry.getAgreementId(), | ||
entry.getContractNegotiationId(), | ||
entry.getCreatedAt()); | ||
}); | ||
} | ||
|
||
private void updateInternal(Connection connection, EndpointDataReferenceEntry entry) { | ||
transactionContext.execute(() -> { | ||
queryExecutor.execute(connection, statements.getUpdateTemplate(), | ||
entry.getTransferProcessId(), | ||
entry.getAssetId(), | ||
entry.getProviderId(), | ||
entry.getAgreementId(), | ||
entry.getContractNegotiationId(), | ||
entry.getCreatedAt(), | ||
entry.getTransferProcessId()); | ||
}); | ||
} | ||
|
||
private EndpointDataReferenceEntry mapResultSet(ResultSet resultSet) throws Exception { | ||
return EndpointDataReferenceEntry.Builder.newInstance() | ||
.createdAt(resultSet.getLong(statements.getCreatedAtColumn())) | ||
.assetId(resultSet.getString(statements.getAssetIdColumn())) | ||
.transferProcessId(resultSet.getString(statements.getTransferProcessIdColumn())) | ||
.agreementId(resultSet.getString(statements.getAgreementIdColumn())) | ||
.providerId(resultSet.getString(statements.getProviderIdColumn())) | ||
.contractNegotiationId(resultSet.getString(statements.getContractNegotiationIdColumn())) | ||
.build(); | ||
} | ||
} |
71 changes: 71 additions & 0 deletions
71
.../org/eclipse/edc/connector/store/sql/edr/SqlEndpointDataReferenceEntryIndexExtension.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,71 @@ | ||
/* | ||
* Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) | ||
* | ||
* This program and the accompanying materials are made available under the | ||
* terms of the Apache License, Version 2.0 which is available at | ||
* https://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* Contributors: | ||
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation | ||
* | ||
*/ | ||
|
||
package org.eclipse.edc.connector.store.sql.edr; | ||
|
||
|
||
import org.eclipse.edc.connector.store.sql.edr.schema.EndpointDataReferenceEntryStatements; | ||
import org.eclipse.edc.connector.store.sql.edr.schema.postgres.PostgresDialectStatements; | ||
import org.eclipse.edc.edr.spi.store.EndpointDataReferenceEntryIndex; | ||
import org.eclipse.edc.runtime.metamodel.annotation.Extension; | ||
import org.eclipse.edc.runtime.metamodel.annotation.Inject; | ||
import org.eclipse.edc.runtime.metamodel.annotation.Provides; | ||
import org.eclipse.edc.runtime.metamodel.annotation.Setting; | ||
import org.eclipse.edc.spi.system.ServiceExtension; | ||
import org.eclipse.edc.spi.system.ServiceExtensionContext; | ||
import org.eclipse.edc.spi.types.TypeManager; | ||
import org.eclipse.edc.sql.QueryExecutor; | ||
import org.eclipse.edc.transaction.datasource.spi.DataSourceRegistry; | ||
import org.eclipse.edc.transaction.spi.TransactionContext; | ||
|
||
@Provides({ EndpointDataReferenceEntryIndex.class }) | ||
@Extension(value = "SQL edr entry store") | ||
public class SqlEndpointDataReferenceEntryIndexExtension implements ServiceExtension { | ||
|
||
/** | ||
* Name of the datasource to use for accessing edr entries. | ||
*/ | ||
@Setting(required = true) | ||
public static final String DATASOURCE_SETTING_NAME = "edc.datasource.edr.name"; | ||
|
||
@Inject | ||
private DataSourceRegistry dataSourceRegistry; | ||
|
||
@Inject | ||
private TransactionContext transactionContext; | ||
|
||
@Inject(required = false) | ||
private EndpointDataReferenceEntryStatements statements; | ||
|
||
@Inject | ||
private QueryExecutor queryExecutor; | ||
|
||
@Inject | ||
private TypeManager typeManager; | ||
|
||
@Override | ||
public void initialize(ServiceExtensionContext context) { | ||
var dataSourceName = context.getConfig().getString(DATASOURCE_SETTING_NAME, DataSourceRegistry.DEFAULT_DATASOURCE); | ||
|
||
var sqlStore = new SqlEndpointDataReferenceEntryIndex(dataSourceRegistry, dataSourceName, transactionContext, typeManager.getMapper(), | ||
getStatementImpl(), queryExecutor); | ||
|
||
context.registerService(EndpointDataReferenceEntryIndex.class, sqlStore); | ||
} | ||
|
||
private EndpointDataReferenceEntryStatements getStatementImpl() { | ||
return statements == null ? new PostgresDialectStatements() : statements; | ||
} | ||
|
||
} |
81 changes: 81 additions & 0 deletions
81
...rc/main/java/org/eclipse/edc/connector/store/sql/edr/schema/BaseSqlDialectStatements.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,81 @@ | ||
/* | ||
* Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) | ||
* | ||
* This program and the accompanying materials are made available under the | ||
* terms of the Apache License, Version 2.0 which is available at | ||
* https://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* Contributors: | ||
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation | ||
* | ||
*/ | ||
|
||
package org.eclipse.edc.connector.store.sql.edr.schema; | ||
|
||
import org.eclipse.edc.connector.store.sql.edr.schema.postgres.EndpointDataReferenceEntryMapping; | ||
import org.eclipse.edc.spi.query.QuerySpec; | ||
import org.eclipse.edc.sql.translation.SqlOperatorTranslator; | ||
import org.eclipse.edc.sql.translation.SqlQueryStatement; | ||
|
||
import static java.lang.String.format; | ||
|
||
public class BaseSqlDialectStatements implements EndpointDataReferenceEntryStatements { | ||
|
||
protected final SqlOperatorTranslator operatorTranslator; | ||
|
||
public BaseSqlDialectStatements(SqlOperatorTranslator operatorTranslator) { | ||
this.operatorTranslator = operatorTranslator; | ||
} | ||
|
||
@Override | ||
public String getDeleteByIdTemplate() { | ||
return executeStatement().delete(getEdrEntryTable(), getTransferProcessIdColumn()); | ||
} | ||
|
||
@Override | ||
public String getFindByTemplate() { | ||
return format("SELECT * FROM %s WHERE %s = ?", getEdrEntryTable(), getTransferProcessIdColumn()); | ||
} | ||
|
||
@Override | ||
public String getInsertTemplate() { | ||
return executeStatement() | ||
.column(getTransferProcessIdColumn()) | ||
.column(getAssetIdColumn()) | ||
.column(getProviderIdColumn()) | ||
.column(getAgreementIdColumn()) | ||
.column(getContractNegotiationIdColumn()) | ||
.column(getCreatedAtColumn()) | ||
.insertInto(getEdrEntryTable()); | ||
} | ||
|
||
@Override | ||
public String getCountTemplate() { | ||
return format("SELECT COUNT (%s) FROM %s WHERE %s = ?", | ||
getTransferProcessIdColumn(), | ||
getEdrEntryTable(), | ||
getTransferProcessIdColumn()); | ||
} | ||
|
||
@Override | ||
public String getUpdateTemplate() { | ||
return executeStatement() | ||
.column(getTransferProcessIdColumn()) | ||
.column(getAssetIdColumn()) | ||
.column(getProviderIdColumn()) | ||
.column(getAgreementIdColumn()) | ||
.column(getContractNegotiationIdColumn()) | ||
.column(getCreatedAtColumn()) | ||
.update(getEdrEntryTable(), getTransferProcessIdColumn()); | ||
|
||
} | ||
|
||
@Override | ||
public SqlQueryStatement createQuery(QuerySpec querySpec) { | ||
var select = format("SELECT * FROM %s", getEdrEntryTable()); | ||
return new SqlQueryStatement(select, querySpec, new EndpointDataReferenceEntryMapping(this), operatorTranslator); | ||
} | ||
|
||
} |
Oops, something went wrong.