Skip to content

Commit

Permalink
Support parallel compilation in model manager (#2948)
Browse files Browse the repository at this point in the history
  • Loading branch information
rafaelbey authored Jul 8, 2024
1 parent 37af507 commit 5bc85ae
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@
import javax.servlet.FilterRegistration;
import javax.ws.rs.container.DynamicFeature;
import java.io.FileInputStream;
import java.util.concurrent.ForkJoinPool;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
Expand Down Expand Up @@ -268,7 +269,8 @@ public void run(T serverConfiguration, Environment environment)
DeploymentStateAndVersions.DEPLOYMENT_MODE = serverConfiguration.deployment.mode;

SDLCLoader sdlcLoader = new SDLCLoader(serverConfiguration.metadataserver, null);
ModelManager modelManager = new ModelManager(serverConfiguration.deployment.mode, sdlcLoader);
ForkJoinPool compilerPool = new ForkJoinPool();
ModelManager modelManager = new ModelManager(serverConfiguration.deployment.mode, compilerPool, sdlcLoader);

ChainFixingFilterHandler.apply(environment.getApplicationContext(), serverConfiguration.filterPriorities);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ public void testSdlcLoaderForWorkspacesWithDependency() throws Exception
configureWireMockForRetries();
SDLCLoader sdlcLoader = createSDLCLoader();

ModelManager modelManager = new ModelManager(DeploymentMode.TEST, tracer, sdlcLoader);
ModelManager modelManager = new ModelManager(DeploymentMode.TEST, null, tracer, sdlcLoader);

PureModelContextData pmcdLoaded = modelManager.loadData(pointer, CLIENT_VERSION, Identity.getAnonymousIdentity());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.opentracing.Tracer;
import io.opentracing.util.GlobalTracer;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import org.eclipse.collections.api.block.procedure.Procedure;
import org.eclipse.collections.api.list.MutableList;
Expand All @@ -29,6 +30,7 @@
import org.eclipse.collections.impl.tuple.Tuples;
import org.finos.legend.engine.language.pure.compiler.Compiler;
import org.finos.legend.engine.language.pure.compiler.toPureGraph.PureModel;
import org.finos.legend.engine.language.pure.compiler.toPureGraph.PureModelProcessParameter;
import org.finos.legend.engine.language.pure.grammar.from.PureGrammarParser;
import org.finos.legend.engine.protocol.pure.v1.model.context.PureModelContext;
import org.finos.legend.engine.protocol.pure.v1.model.context.PureModelContextData;
Expand Down Expand Up @@ -57,23 +59,32 @@ public class ModelManager
private final DeploymentMode deploymentMode;
private final MutableList<ModelLoader> modelLoaders;
private final Tracer tracer;
private final ForkJoinPool forkJoinPool;

public ModelManager(DeploymentMode mode, ModelLoader... modelLoaders)
{
this(mode, GlobalTracer.get(), modelLoaders);
this(mode, null, GlobalTracer.get(), modelLoaders);
}

public ModelManager(DeploymentMode mode, Tracer tracer, ModelLoader... modelLoaders)
public ModelManager(DeploymentMode mode, ForkJoinPool forkJoinPool, ModelLoader... modelLoaders)
{
this(mode, forkJoinPool, GlobalTracer.get(), modelLoaders);
}

public ModelManager(DeploymentMode mode, ForkJoinPool forkJoinPool, Tracer tracer, ModelLoader... modelLoaders)
{
this.tracer = tracer;
this.modelLoaders = Lists.mutable.of(modelLoaders);
this.modelLoaders.forEach((Procedure<ModelLoader>) loader -> loader.setModelManager(this));
this.deploymentMode = mode;
this.forkJoinPool = forkJoinPool;
}

// Remove clientVersion
public PureModel loadModel(PureModelContext context, String clientVersion, Identity identity, String packageOffset)
{
PureModelProcessParameter modelProcessParameter = PureModelProcessParameter.newBuilder().withPackagePrefix(packageOffset).withForkJoinPool(this.forkJoinPool).build();

if (!(context instanceof PureModelContextData) && !(context instanceof PureModelContextText))
{
ModelLoader loader = this.modelLoaderForContext(context);
Expand All @@ -82,15 +93,15 @@ public PureModel loadModel(PureModelContext context, String clientVersion, Ident
PureModelContext cacheKey = loader.cacheKey(context, identity);
try
{
return this.pureModelCache.get(cacheKey, () -> Compiler.compile(this.loadData(cacheKey, clientVersion, identity), this.deploymentMode, identity.getName(), packageOffset));
return this.pureModelCache.get(cacheKey, () -> Compiler.compile(this.loadData(cacheKey, clientVersion, identity), this.deploymentMode, identity.getName(), null, modelProcessParameter));
}
catch (ExecutionException e)
{
throw new EngineException("Engine was not able to cache", e);
}
}
}
return Compiler.compile(this.loadData(context, clientVersion, identity), this.deploymentMode, identity.getName(), packageOffset);
return Compiler.compile(this.loadData(context, clientVersion, identity), this.deploymentMode, identity.getName(), null, modelProcessParameter);
}

// Remove clientVersion
Expand Down

0 comments on commit 5bc85ae

Please sign in to comment.