Skip to content

Commit

Permalink
fix: Synchronize EDR refresh (#1633)
Browse files Browse the repository at this point in the history
* Failing test and added code to make it pass

* Created module for EdrLockSql and added statements

* alter test to check for correct response

* remove debug messages

* Introduce InMemoryEdrLock

* working version

* improve test for two different edrs.

* code dup

* refactor inmem acquireLock

* handle

* handle2

* refactor in mem EDR lock

* update EdrServiceImpl to enable force refresh

* Remove non existing job from verify.yaml

* Add ComponentTests

* fix failing uts

* Removes global lock in inmem variant

* inmem release lock should be atomic

* retrigger CI

* retrigger CI
  • Loading branch information
rafaelmag110 authored Oct 23, 2024
1 parent 6072467 commit c8cd9d6
Show file tree
Hide file tree
Showing 23 changed files with 1,169 additions and 257 deletions.
3 changes: 0 additions & 3 deletions .github/workflows/verify.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,6 @@ jobs:
exit 1
fi
verify-dependencies:
uses: eclipse-edc/.github/.github/workflows/dependency-check.yml@main

verify-formatting:
runs-on: ubuntu-latest
steps:
Expand Down
4 changes: 4 additions & 0 deletions core/edr-core/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,15 @@ dependencies {
implementation(libs.edc.spi.edrstore)
implementation(libs.edc.spi.transactionspi)

implementation(libs.edc.spi.transaction.datasource)

implementation(project(":spi:tokenrefresh-spi"))
implementation(project(":spi:edr-spi"))
implementation(project(":spi:core-spi"))

testImplementation(libs.edc.junit)
testImplementation(libs.edc.core.edrstore)
testImplementation(libs.edc.lib.query)
testImplementation(libs.awaitility)
testImplementation(testFixtures(project(":spi:edr-spi")))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.eclipse.edc.spi.system.ServiceExtension;
import org.eclipse.edc.transaction.spi.TransactionContext;
import org.eclipse.tractusx.edc.edr.core.service.EdrServiceImpl;
import org.eclipse.tractusx.edc.edr.spi.index.lock.EndpointDataReferenceLock;
import org.eclipse.tractusx.edc.edr.spi.service.EdrService;
import org.eclipse.tractusx.edc.spi.tokenrefresh.common.TokenRefreshHandler;

Expand All @@ -49,13 +50,16 @@ public class EdrCoreServiceExtension implements ServiceExtension {
@Inject
private TransactionContext transactionContext;

@Inject
private EndpointDataReferenceLock edrLock;

@Override
public String name() {
return NAME;
}

@Provider
public EdrService edrService() {
return new EdrServiceImpl(edrStore, tokenRefreshHandler, transactionContext, monitor);
return new EdrServiceImpl(edrStore, tokenRefreshHandler, transactionContext, monitor, edrLock);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/

package org.eclipse.tractusx.edc.edr.core.lock;

import org.eclipse.edc.edr.spi.store.EndpointDataReferenceEntryIndex;
import org.eclipse.edc.runtime.metamodel.annotation.Extension;
import org.eclipse.edc.runtime.metamodel.annotation.Inject;
import org.eclipse.edc.runtime.metamodel.annotation.Provider;
import org.eclipse.edc.spi.system.ServiceExtension;
import org.eclipse.edc.transaction.spi.TransactionContext;
import org.eclipse.tractusx.edc.edr.spi.index.lock.EndpointDataReferenceLock;

@Extension("Provides A Default EdrLock Provider")
public class DefaultEdrLockProviderExtension implements ServiceExtension {

@Inject
EndpointDataReferenceEntryIndex entryIndex;

@Inject
TransactionContext transactionContext;

@Provider(isDefault = true)
public EndpointDataReferenceLock createInMemoryEdrLock() {
return new InMemoryEdrLock(entryIndex, transactionContext);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/

package org.eclipse.tractusx.edc.edr.core.lock;

import org.eclipse.edc.edr.spi.store.EndpointDataReferenceEntryIndex;
import org.eclipse.edc.spi.result.StoreResult;
import org.eclipse.edc.spi.types.domain.DataAddress;
import org.eclipse.edc.transaction.spi.TransactionContext;
import org.eclipse.tractusx.edc.edr.spi.index.lock.EndpointDataReferenceLock;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;


public class InMemoryEdrLock implements EndpointDataReferenceLock {

private final EndpointDataReferenceEntryIndex entryIndex;
private final TransactionContext transactionContext;
private final Map<String, ReentrantReadWriteLock> lockedEdrs = new ConcurrentHashMap<>();

public InMemoryEdrLock(EndpointDataReferenceEntryIndex entryIndex, TransactionContext transactionContext) {
this.entryIndex = entryIndex;
this.transactionContext = transactionContext;
}

@Override
public StoreResult<Boolean> acquireLock(String edrId, DataAddress edr) {

var rowLock = lockedEdrs.computeIfAbsent(edrId, k -> new ReentrantReadWriteLock());

rowLock.writeLock().lock(); // this lock synchronizes row-level access

var edrEntry = transactionContext.execute(() -> entryIndex.findById(edrId));

return StoreResult.success(isExpired(edr, edrEntry));

}


@Override
public StoreResult<Void> releaseLock(String edrId) {

lockedEdrs.computeIfPresent(edrId, (k, rowLock) -> {
if (rowLock.writeLock().isHeldByCurrentThread()) {
rowLock.writeLock().unlock();
if (!rowLock.hasQueuedThreads()) {
return null;
}
}
return rowLock;
});

return StoreResult.success();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,27 +26,27 @@
import org.eclipse.edc.spi.result.ServiceResult;
import org.eclipse.edc.spi.types.domain.DataAddress;
import org.eclipse.edc.transaction.spi.TransactionContext;
import org.eclipse.tractusx.edc.edr.spi.index.lock.EndpointDataReferenceLock;
import org.eclipse.tractusx.edc.edr.spi.service.EdrService;
import org.eclipse.tractusx.edc.edr.spi.types.RefreshMode;
import org.eclipse.tractusx.edc.spi.tokenrefresh.common.TokenRefreshHandler;

import java.time.Instant;
import java.util.List;

import static org.eclipse.tractusx.edc.edr.spi.CoreConstants.EDR_PROPERTY_EXPIRES_IN;

public class EdrServiceImpl implements EdrService {

private final EndpointDataReferenceStore edrStore;
private final TokenRefreshHandler tokenRefreshHandler;
private final TransactionContext transactionContext;
private final Monitor monitor;
private final EndpointDataReferenceLock edrLock;

public EdrServiceImpl(EndpointDataReferenceStore edrStore, TokenRefreshHandler tokenRefreshHandler, TransactionContext transactionContext, Monitor monitor) {
public EdrServiceImpl(EndpointDataReferenceStore edrStore, TokenRefreshHandler tokenRefreshHandler, TransactionContext transactionContext, Monitor monitor, EndpointDataReferenceLock edrLock) {
this.edrStore = edrStore;
this.tokenRefreshHandler = tokenRefreshHandler;
this.transactionContext = transactionContext;
this.monitor = monitor;
this.edrLock = edrLock;
}

@Override
Expand Down Expand Up @@ -74,12 +74,26 @@ private ServiceResult<DataAddress> autoRefresh(String id, DataAddress edr, Refre
if (edrEntry == null) {
return ServiceResult.notFound("An EndpointDataReferenceEntry with ID '%s' does not exist".formatted(id));
}
if (isExpired(edr, edrEntry) || mode.equals(RefreshMode.FORCE_REFRESH)) {
monitor.debug("Token expired, need to refresh.");
return tokenRefreshHandler.refreshToken(id, edr)
.compose(updated -> updateEdr(edrEntry, updated));
if (edrLock.isExpired(edr, edrEntry) || mode.equals(RefreshMode.FORCE_REFRESH)) {
var result = ServiceResult.from(edrLock.acquireLock(id, edr))
.compose(shouldRefresh -> {
if (!shouldRefresh && !mode.equals(RefreshMode.FORCE_REFRESH)) {
monitor.debug("Dont need to refresh. Will resolve existing.");
var refreshedEdr = edrStore.resolveByTransferProcess(id);
return ServiceResult.from(refreshedEdr);
} else {
monitor.debug("Token '%s' expired, need to refresh.".formatted(id));
return tokenRefreshHandler.refreshToken(id, edr)
.compose(updated -> updateEdr(edrEntry, updated));
}
});
edrLock.releaseLock(id)
.onFailure(error -> monitor.warning("Error releasing lock: %s".formatted(error)));
return result;

}
return ServiceResult.success(edr);
var refreshedEdr = edrStore.resolveByTransferProcess(id);
return ServiceResult.from(refreshedEdr);
}

private ServiceResult<DataAddress> updateEdr(EndpointDataReferenceEntry entry, DataAddress dataAddress) {
Expand All @@ -94,24 +108,12 @@ private ServiceResult<DataAddress> updateEdr(EndpointDataReferenceEntry entry, D

var updateResult = edrStore.save(newEntry, dataAddress);


if (updateResult.failed()) {
return ServiceResult.fromFailure(updateResult);
}
return ServiceResult.success(dataAddress);
}

private boolean isExpired(DataAddress edr, EndpointDataReferenceEntry metadata) {
var expiresInString = edr.getStringProperty(EDR_PROPERTY_EXPIRES_IN);
if (expiresInString == null) {
return false;
}

var expiresIn = Long.parseLong(expiresInString);
// createdAt is in millis, expires-in is in seconds
var expiresAt = metadata.getCreatedAt() / 1000L + expiresIn;
var expiresAtInstant = Instant.ofEpochSecond(expiresAt);

return expiresAtInstant.isBefore(Instant.now());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,4 @@
#################################################################################

org.eclipse.tractusx.edc.edr.core.EdrCoreServiceExtension
org.eclipse.tractusx.edc.edr.core.lock.DefaultEdrLockProviderExtension
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/

package org.eclipse.tractusx.edc.edr.core.lock;

import org.eclipse.edc.edr.store.defaults.InMemoryEndpointDataReferenceEntryIndex;
import org.eclipse.edc.junit.annotations.ComponentTest;
import org.eclipse.edc.query.CriterionOperatorRegistryImpl;
import org.eclipse.edc.transaction.spi.NoopTransactionContext;
import org.eclipse.edc.transaction.spi.TransactionContext;
import org.eclipse.tractusx.edc.edr.spi.index.lock.EndpointDataReferenceLock;
import org.eclipse.tractusx.edc.edr.spi.testfixtures.index.lock.EndpointDataReferenceLockBaseTest;
import org.junit.jupiter.api.BeforeEach;


@ComponentTest
class InMemoryEdrLockTest extends EndpointDataReferenceLockBaseTest {

private InMemoryEdrLock edrLock;
private final TransactionContext transactionContext = new NoopTransactionContext();

@BeforeEach
void setUp() {
var entryIndex = new InMemoryEndpointDataReferenceEntryIndex(CriterionOperatorRegistryImpl.ofDefaults());
edrLock = new InMemoryEdrLock(entryIndex, transactionContext);
entryIndex.save(edrEntry("mock", ACQUIRE_LOCK_TP));
}

@Override
protected EndpointDataReferenceLock getStore() {
return edrLock;
}
}
Loading

0 comments on commit c8cd9d6

Please sign in to comment.