Skip to content

Commit

Permalink
[JBPM-9696] ClusteredRuntimeManagerLockFactory based on infinispan
Browse files Browse the repository at this point in the history
  • Loading branch information
elguardian committed Apr 12, 2021
1 parent dd9f0ea commit c623b33
Show file tree
Hide file tree
Showing 7 changed files with 188 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@
<groupId>org.jbpm</groupId>
<artifactId>jbpm-executor</artifactId>
</dependency>
<dependency>
<groupId>org.jbpm</groupId>
<artifactId>jbpm-runtime-manager</artifactId>
</dependency>
<dependency>
<groupId>org.jboss.spec.javax.ejb</groupId>
<artifactId>jboss-ejb-api_3.2_spec</artifactId>
Expand Down Expand Up @@ -58,6 +62,11 @@
<artifactId>infinispan-core</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.infinispan</groupId>
<artifactId>infinispan-clustered-lock</artifactId>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>javax.annotation</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@

@Singleton
@Startup
public class EJBCacheInitializer {
public class EJBCacheBean {

public static final String CACHE_NAME_LOOKUP = "java:jboss/infinispan/container/jbpm";

Expand All @@ -41,6 +41,10 @@ public class EJBCacheInitializer {
public static final String CACHE_JOBS_NAME_LOOKUP = "java:jboss/infinispan/cache/jbpm/jobs";

// this enforce the cache initializer
public static final String CACHE_LOCKS_NAME_LOOKUP = "java:jboss/infinispan/cache/jbpm";

@Resource(lookup = CACHE_LOCKS_NAME_LOOKUP)
private EmbeddedCacheManager cacheContainer;

@Resource(lookup = CACHE_NAME_LOOKUP)
private EmbeddedCacheManager cacheManager;
Expand All @@ -57,4 +61,9 @@ public void init() {
((InfinispanClusterAwareService) clusterService).init(cacheManager);
}


public EmbeddedCacheManager getCacheContainer() {
return cacheContainer;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* Copyright 2021 Red Hat, Inc. and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.kie.server.services.jbpm.cluster.lock;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import org.infinispan.lock.api.ClusteredLockManager;
import org.jbpm.runtime.manager.spi.RuntimeManagerLock;
import org.jgroups.util.UUID;


public class ClusteredRuntimeManagerLock implements RuntimeManagerLock {

private String clusteredLockId;

private ClusteredLockManager clusteredLockManager;

public ClusteredRuntimeManagerLock(ClusteredLockManager clusteredLockManager) {
this.clusteredLockManager = clusteredLockManager;
clusteredLockId = UUID.randomUUID().toString();
this.clusteredLockManager.defineLock(clusteredLockId);
}

@Override
public void lock() {
clusteredLockManager.get(clusteredLockId).lock();
}

@Override
public boolean tryLock(long units, TimeUnit milliseconds) throws InterruptedException {
try {
return clusteredLockManager.get(clusteredLockId).tryLock(units, milliseconds).get();
} catch(ExecutionException e) {
if(e.getCause() instanceof InterruptedException) {
throw (InterruptedException) e.getCause();
} else {
throw new RuntimeException(e);
}
}
}

@Override
public void lockInterruptible() throws InterruptedException {
throw new UnsupportedOperationException("this lock does not support interruptible lock");
}

@Override
public void forceUnlock() {
try {
clusteredLockManager.forceRelease(clusteredLockId).get();
} catch(Exception e) {
throw new RuntimeException(e);
}
}

@Override
public void unlock() {
try {
clusteredLockManager.get(clusteredLockId).unlock().get();
} catch(Exception e) {
throw new RuntimeException(e);
}
}

@Override
public boolean hasQueuedThreads() {
try {
return clusteredLockManager.get(clusteredLockId).isLocked().get();
} catch(Exception e) {
throw new RuntimeException(e);
}
}

@Override
public boolean isHeldByCurrentThread() {
try {
return clusteredLockManager.get(clusteredLockId).isLockedByMe().get();
} catch(Exception e) {
throw new RuntimeException(e);
}
}

@Override
public int getQueueLength() {
try {
return clusteredLockManager.get(clusteredLockId).isLocked().get() ? 0 : 1;
} catch(Exception e) {
throw new RuntimeException(e);
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright 2021 Red Hat, Inc. and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.kie.server.services.jbpm.cluster.lock;

import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

import org.infinispan.lock.EmbeddedClusteredLockManagerFactory;
import org.infinispan.lock.api.ClusteredLockManager;
import org.jbpm.runtime.manager.spi.RuntimeManagerLock;
import org.jbpm.runtime.manager.spi.RuntimeManagerLockFactory;
import org.kie.server.services.jbpm.cluster.EJBCacheBean;

public class ClusteredRuntimeManagerLockFactory implements RuntimeManagerLockFactory {

private EJBCacheBean mbean;

private ClusteredLockManager clusterecLockManager;

public ClusteredRuntimeManagerLockFactory() {
try {
Context context = new InitialContext();
mbean = (EJBCacheBean) context.lookup("java:global/EJBCacheBean");
clusterecLockManager = EmbeddedClusteredLockManagerFactory.from(mbean.getCacheContainer());
} catch (NamingException e) {
throw new RuntimeException(e);
}
}

@Override
public RuntimeManagerLock newRuntimeManagerLock() {
return new ClusteredRuntimeManagerLock(clusterecLockManager);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
org.kie.server.services.jbpm.cluster.lock.ClusteredRuntimeManagerLockFactory
20 changes: 10 additions & 10 deletions process-migration-service/frontend/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion process-migration-service/frontend/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
"jest-fetch-mock": "^2.1.2",
"jest-junit": "^6.4.0",
"lint-staged": "^8.2.1",
"lock-treatment-tool": "^0.4.1",
"lock-treatment-tool": "^0.4.2",
"moment": "^2.24.0",
"patternfly-react": "^2.36.2",
"prettier": "^1.18.2",
Expand Down

0 comments on commit c623b33

Please sign in to comment.