From e7b10823f5937cfcf32c76f165651300e6216166 Mon Sep 17 00:00:00 2001 From: Alberto Miranda Date: Wed, 18 Dec 2024 13:43:44 +0100 Subject: [PATCH 01/13] feat(nf-tower): Add `TowerFusionEnv` provider to set required env vars Signed-off-by: Alberto Miranda --- plugins/nf-tower/build.gradle | 2 + .../seqera/tower/plugin/TowerFusionEnv.groovy | 266 +++++++++++ .../exception/BadResponseException.groovy | 7 + .../exception/UnauthorizedException.groovy | 7 + .../exchange/LicenseTokenRequest.groovy | 22 + .../exchange/LicenseTokenResponse.groovy | 23 + .../src/resources/META-INF/extensions.idx | 1 + .../tower/plugin/TowerFusionEnvTest.groovy | 420 ++++++++++++++++++ 8 files changed, 748 insertions(+) create mode 100644 plugins/nf-tower/src/main/io/seqera/tower/plugin/TowerFusionEnv.groovy create mode 100644 plugins/nf-tower/src/main/io/seqera/tower/plugin/exception/BadResponseException.groovy create mode 100644 plugins/nf-tower/src/main/io/seqera/tower/plugin/exception/UnauthorizedException.groovy create mode 100644 plugins/nf-tower/src/main/io/seqera/tower/plugin/exchange/LicenseTokenRequest.groovy create mode 100644 plugins/nf-tower/src/main/io/seqera/tower/plugin/exchange/LicenseTokenResponse.groovy create mode 100644 plugins/nf-tower/src/test/io/seqera/tower/plugin/TowerFusionEnvTest.groovy diff --git a/plugins/nf-tower/build.gradle b/plugins/nf-tower/build.gradle index 347d127762..797e6fb538 100644 --- a/plugins/nf-tower/build.gradle +++ b/plugins/nf-tower/build.gradle @@ -39,4 +39,6 @@ dependencies { testImplementation(testFixtures(project(":nextflow"))) testImplementation "org.apache.groovy:groovy:4.0.24" testImplementation "org.apache.groovy:groovy-nio:4.0.24" + // wiremock required by TowerFusionEnvTest + testImplementation "org.wiremock:wiremock:3.5.4" } diff --git a/plugins/nf-tower/src/main/io/seqera/tower/plugin/TowerFusionEnv.groovy b/plugins/nf-tower/src/main/io/seqera/tower/plugin/TowerFusionEnv.groovy new file mode 100644 index 0000000000..4a36fda8b2 --- /dev/null +++ b/plugins/nf-tower/src/main/io/seqera/tower/plugin/TowerFusionEnv.groovy @@ -0,0 +1,266 @@ +package io.seqera.tower.plugin + +import com.google.gson.Gson +import dev.failsafe.Failsafe +import dev.failsafe.RetryPolicy +import dev.failsafe.event.EventListener +import dev.failsafe.event.ExecutionAttemptedEvent +import dev.failsafe.function.CheckedSupplier +import groovy.transform.CompileStatic +import groovy.util.logging.Slf4j +import io.seqera.tower.plugin.exception.BadResponseException +import io.seqera.tower.plugin.exception.UnauthorizedException +import io.seqera.tower.plugin.exchange.LicenseTokenRequest +import io.seqera.tower.plugin.exchange.LicenseTokenResponse +import nextflow.Global +import nextflow.Session +import nextflow.SysEnv +import nextflow.exception.AbortOperationException +import nextflow.fusion.FusionConfig +import nextflow.fusion.FusionEnv +import nextflow.util.Threads +import org.pf4j.Extension + +import java.net.http.HttpClient +import java.net.http.HttpRequest +import java.net.http.HttpResponse +import java.time.Duration +import java.time.temporal.ChronoUnit +import java.util.concurrent.Executors +import java.util.function.Predicate + +/** + * Environment provider for Platform-specific environment variables. + * + * @author Alberto Miranda + */ +@Slf4j +@Extension +@CompileStatic +class TowerFusionEnv implements FusionEnv { + + // The endpoint where license-scoped JWT tokens are obtained + private static final String LICENSE_TOKEN_ENDPOINT = 'license/token/' + + // Server errors that should trigger a retry + private static final List SERVER_ERRORS = [429, 500, 502, 503, 504] + + // Default connection timeout for HTTP requests + private static final Duration DEFAULT_CONNECTION_TIMEOUT = Duration.of(30, ChronoUnit.SECONDS) + + // Default retry policy settings for HTTP requests: delay, max delay, attempts, and jitter + private static final Duration DEFAULT_RETRY_POLICY_DELAY = Duration.of(450, ChronoUnit.MILLIS) + private static final Duration DEFAULT_RETRY_POLICY_MAX_DELAY = Duration.of(90, ChronoUnit.SECONDS) + private static final int DEFAULT_RETRY_POLICY_MAX_ATTEMPTS = 10 + private static final double DEFAULT_RETRY_POLICY_JITTER = 0.5 + + // The HttpClient instance used to send requests + private final HttpClient httpClient = newDefaultHttpClient() + + // The RetryPolicy instance used to retry requests + private final RetryPolicy retryPolicy = newDefaultRetryPolicy(SERVER_ERRORS) + + // Nextflow session + private final Session session + + // Platform endpoint to use for requests + private final String endpoint + + // Platform access token to use for requests + private final String accessToken + + /** + * Constructor for the class. It initializes the session, endpoint, and access token. + */ + TowerFusionEnv() { + this.session = Global.session as Session + final towerConfig = session.config.navigate('tower') as Map ?: [:] + final env = SysEnv.get() + this.endpoint = endpoint0(towerConfig, env) + this.accessToken = accessToken0(towerConfig, env) + } + + /** + * Return any environment variables relevant to Fusion execution. This method is called + * by {@link nextflow.fusion.FusionEnvProvider#getEnvironment} to determine which + * environment variables are needed for the current run. + * + * @param scheme The scheme for which the environment variables are needed (currently unused) + * @param config The Fusion configuration object + * @return A map of environment variables + */ + @Override + Map getEnvironment(String scheme, FusionConfig config) { + + // TODO(amiranda): Hardcoded for now. We need to find out how to obtain + // the concrete product SKU and version. Candidate: FusionConfig? + final product = 'fusion' + final version = '2.4' + + try { + final token = getLicenseToken(product, version) + return [ + 'FUSION_LICENSE_TOKEN': token, + ] + } catch (Exception e) { + log.warn("Error retrieving Fusion license information: ${e.message}") + return [:] + } + } + + /** + * Send a request to Platform to obtain a license-scoped JWT for Fusion. The request is authenticated using the + * Platform access token provided in the configuration of the current session. + * + * @throws AbortOperationException if a Platform access token cannot be found + * + * @return The signed JWT token + */ + protected String getLicenseToken(product, version) { + // FIXME(amiranda): Find out how to obtain the product and version + // Candidate: FusionConfig? + + if (accessToken == null) { + throw new AbortOperationException("Missing personal access token -- Make sure there's a variable TOWER_ACCESS_TOKEN in your environment") + } + + final req = HttpRequest.newBuilder() + .uri(URI.create("${endpoint}/${LICENSE_TOKEN_ENDPOINT}").normalize()) + .header('Content-Type', 'application/json') + .header('Authorization', "Bearer ${accessToken}") + .POST( + HttpRequest.BodyPublishers.ofString( + new Gson().toJson( + new LicenseTokenRequest( + product: product, + version: version + ), + LicenseTokenRequest.class + ), + ) + ) + .build() + + try { + final resp = safeHttpSend(req, retryPolicy) + + if (resp.statusCode() == 200) { + return new Gson().fromJson(resp.body(), LicenseTokenResponse.class).signedToken + } + + if (resp.statusCode() == 401) { + throw new UnauthorizedException("Unauthorized [401] - Verify you have provided a valid access token") + } + + throw new BadResponseException("Invalid response: ${req.method()} ${req.uri()} [${resp.statusCode()}] ${resp.body()}") + + } catch (IOException e) { + throw new IllegalStateException("Unable to send request to '${req.uri()}' : ${e.message}") + } + } + + /************************************************************************** + * Helper methods + *************************************************************************/ + + /** + * Get the configured Platform API endpoint: if the endpoint is not provided in the configuration, we fallback to the + * environment variable `TOWER_API_ENDPOINT`. If neither is provided, we fallback to the default endpoint. + * + * @param opts the configuration options for Platform + * @param env the applicable environment variables + * @return the Platform API endpoint + */ + protected static String endpoint0(Map opts, Map env) { + def result = opts.endpoint as String + if (!result || result == '-') { + result = env.get('TOWER_API_ENDPOINT') ?: TowerClient.DEF_ENDPOINT_URL + } + return result.stripEnd('/') + } + + /** + * Get the configured Platform access token: if `TOWER_WORKFLOW_ID` is provided in the environment, we are running + * in a Platform-made run and we should ONLY retrieve the token from the environment. Otherwise, check + * the configuration file or fallback to the environment. If no token is found, returns null. + * + * @param opts the configuration options for Platform + * @param env the applicable environment variables + * @return the Platform access token + */ + protected static String accessToken0(Map opts, Map env) { + def token = env.get('TOWER_WORKFLOW_ID') + ? env.get('TOWER_ACCESS_TOKEN') + : opts.containsKey('accessToken') ? opts.accessToken as String : env.get('TOWER_ACCESS_TOKEN') + return token + } + + /** + * Create a new HttpClient instance with default settings + * @return The new HttpClient instance + */ + private static HttpClient newDefaultHttpClient() { + final builder = HttpClient.newBuilder() + .version(HttpClient.Version.HTTP_1_1) + .followRedirects(HttpClient.Redirect.NEVER) + .cookieHandler(new CookieManager()) + .connectTimeout(DEFAULT_CONNECTION_TIMEOUT) + // use virtual threads executor if enabled + if ( Threads.useVirtual() ) { + builder.executor(Executors.newVirtualThreadPerTaskExecutor()) + } + // build and return the new client + return builder.build() + } + + /** + * Create a new RetryPolicy instance with default settings and the given list of retryable errors. With this policy, + * a request is retried on IOExceptions and any server errors defined in errorsToRetry. The number of retries, delay, + * max delay, and jitter are controlled by the corresponding values defined at class level. + * + * @return The new RetryPolicy instance + */ + private static RetryPolicy> newDefaultRetryPolicy(List errorsToRetry) { + + final retryOnException = (e -> e instanceof IOException) as Predicate + final retryOnStatusCode = ((HttpResponse resp) -> resp.statusCode() in errorsToRetry) as Predicate> + + final listener = new EventListener>>() { + @Override + void accept(ExecutionAttemptedEvent event) throws Throwable { + def msg = "connection failure - attempt: ${event.attemptCount}" + if (event.lastResult != null) + msg += "; response: ${event.lastResult}" + if (event.lastFailure != null) + msg += "; exception: [${event.lastFailure.class.name}] ${event.lastFailure.message}" + log.debug(msg) + } + } + return RetryPolicy.> builder() + .handleIf(retryOnException) + .handleResultIf(retryOnStatusCode) + .withBackoff(DEFAULT_RETRY_POLICY_DELAY.toMillis(), DEFAULT_RETRY_POLICY_MAX_DELAY.toMillis(), ChronoUnit.MILLIS) + .withMaxAttempts(DEFAULT_RETRY_POLICY_MAX_ATTEMPTS) + .withJitter(DEFAULT_RETRY_POLICY_JITTER) + .onRetry(listener) + .build() + } + + /** + * Send an HTTP request and return the response. This method automatically retries the request according to the + * given RetryPolicy. + * + * @param req The HttpRequest to send + * @return The HttpResponse received + */ + private HttpResponse safeHttpSend(HttpRequest req, RetryPolicy policy) { + return Failsafe.with(policy).get( + () -> { + log.debug "Request: method:=${req.method()}; uri:=${req.uri()}; request:=${req}" + final resp = httpClient.send(req, HttpResponse.BodyHandlers.ofString()) + log.debug "Response: statusCode:=${resp.statusCode()}; body:=${resp.body()}" + return resp + } as CheckedSupplier + ) as HttpResponse + } +} diff --git a/plugins/nf-tower/src/main/io/seqera/tower/plugin/exception/BadResponseException.groovy b/plugins/nf-tower/src/main/io/seqera/tower/plugin/exception/BadResponseException.groovy new file mode 100644 index 0000000000..4de9a30882 --- /dev/null +++ b/plugins/nf-tower/src/main/io/seqera/tower/plugin/exception/BadResponseException.groovy @@ -0,0 +1,7 @@ +package io.seqera.tower.plugin.exception + +import groovy.transform.InheritConstructors + +@InheritConstructors +class BadResponseException extends RuntimeException{ +} diff --git a/plugins/nf-tower/src/main/io/seqera/tower/plugin/exception/UnauthorizedException.groovy b/plugins/nf-tower/src/main/io/seqera/tower/plugin/exception/UnauthorizedException.groovy new file mode 100644 index 0000000000..6269a825b5 --- /dev/null +++ b/plugins/nf-tower/src/main/io/seqera/tower/plugin/exception/UnauthorizedException.groovy @@ -0,0 +1,7 @@ +package io.seqera.tower.plugin.exception + +import groovy.transform.InheritConstructors + +@InheritConstructors +class UnauthorizedException extends RuntimeException { +} diff --git a/plugins/nf-tower/src/main/io/seqera/tower/plugin/exchange/LicenseTokenRequest.groovy b/plugins/nf-tower/src/main/io/seqera/tower/plugin/exchange/LicenseTokenRequest.groovy new file mode 100644 index 0000000000..831b6641a8 --- /dev/null +++ b/plugins/nf-tower/src/main/io/seqera/tower/plugin/exchange/LicenseTokenRequest.groovy @@ -0,0 +1,22 @@ +package io.seqera.tower.plugin.exchange + +import groovy.transform.CompileStatic +import groovy.transform.EqualsAndHashCode +import groovy.transform.ToString + +/** + * Models a REST request to obtain a license-scoped JWT token from Platform + * + * @author Alberto Miranda + */ +@EqualsAndHashCode +@ToString(includeNames = true, includePackage = false) +@CompileStatic +class LicenseTokenRequest { + + /** The product code */ + String product + + /** The product version */ + String version +} diff --git a/plugins/nf-tower/src/main/io/seqera/tower/plugin/exchange/LicenseTokenResponse.groovy b/plugins/nf-tower/src/main/io/seqera/tower/plugin/exchange/LicenseTokenResponse.groovy new file mode 100644 index 0000000000..a7c41454e9 --- /dev/null +++ b/plugins/nf-tower/src/main/io/seqera/tower/plugin/exchange/LicenseTokenResponse.groovy @@ -0,0 +1,23 @@ +package io.seqera.tower.plugin.exchange + +import groovy.transform.CompileStatic +import groovy.transform.ToString + +/** + * Models a REST response containing a license-scoped JWT token from Platform + * + * @author Alberto Miranda + */ +@CompileStatic +@ToString(includeNames = true, includePackage = false) +class LicenseTokenResponse { + /** + * The signed JWT token + */ + String signedToken + + /** + * The expiration date of the token + */ + Date expirationDate +} diff --git a/plugins/nf-tower/src/resources/META-INF/extensions.idx b/plugins/nf-tower/src/resources/META-INF/extensions.idx index 68286228bc..8a61ce9a9c 100644 --- a/plugins/nf-tower/src/resources/META-INF/extensions.idx +++ b/plugins/nf-tower/src/resources/META-INF/extensions.idx @@ -10,3 +10,4 @@ # io.seqera.tower.plugin.TowerFactory +io.seqera.tower.plugin.TowerFusionEnv diff --git a/plugins/nf-tower/src/test/io/seqera/tower/plugin/TowerFusionEnvTest.groovy b/plugins/nf-tower/src/test/io/seqera/tower/plugin/TowerFusionEnvTest.groovy new file mode 100644 index 0000000000..cdd35d49de --- /dev/null +++ b/plugins/nf-tower/src/test/io/seqera/tower/plugin/TowerFusionEnvTest.groovy @@ -0,0 +1,420 @@ +package io.seqera.tower.plugin + +import com.github.tomakehurst.wiremock.WireMockServer +import com.github.tomakehurst.wiremock.client.WireMock +import io.seqera.tower.plugin.exception.UnauthorizedException +import nextflow.Global +import nextflow.Session +import nextflow.SysEnv +import nextflow.exception.AbortOperationException +import nextflow.fusion.FusionConfig +import spock.lang.Shared +import spock.lang.Specification + +/** + * Test cases for the TowerFusionEnv class. + * + * @author Alberto Miranda + */ +class TowerFusionEnvTest extends Specification { + + @Shared + WireMockServer wireMockServer + + def setupSpec() { + wireMockServer = new WireMockServer(18080) + wireMockServer.start() + } + + def cleanupSpec() { + wireMockServer.stop() + } + + def setup() { + wireMockServer.resetAll() + SysEnv.push([:]) // <-- ensure the system host env does not interfere + } + + def cleanup() { + SysEnv.pop() // <-- restore the system host env + } + + + def 'should return the endpoint from the config'() { + given: 'a session' + Global.session = Mock(Session) { + config >> [ + tower: [ + endpoint: 'https://tower.nf' + ] + ] + } + + when: 'the provider is created' + def provider = new TowerFusionEnv() + + then: 'the endpoint has the expected value' + provider.endpoint == 'https://tower.nf' + } + + def 'should return the endpoint from the environment'() { + setup: + SysEnv.push(['TOWER_API_ENDPOINT': 'https://tower.nf']) + Global.session = Mock(Session) { + config >> [:] + } + + when: 'the provider is created' + def provider = new TowerFusionEnv() + + then: 'the endpoint has the expected value' + provider.endpoint == 'https://tower.nf' + + cleanup: + SysEnv.pop() + } + + def 'should return the default endpoint'() { + when: 'session config is empty' + Global.session = Mock(Session) { + config >> [ + tower: [:] + ] + } + def provider = new TowerFusionEnv() + + then: 'the endpoint has the expected value' + provider.endpoint == TowerClient.DEF_ENDPOINT_URL + + when: 'session config is null' + Global.session = Mock(Session) { + config >> null + } + + then: 'the endpoint has the expected value' + provider.endpoint == TowerClient.DEF_ENDPOINT_URL + + when: 'session config is missing' + Global.session = Mock(Session) { + config >> [:] + } + + then: 'the endpoint has the expected value' + provider.endpoint == TowerClient.DEF_ENDPOINT_URL + + when: 'session.config.tower.endpoint is not defined' + Global.session = Mock(Session) { + config >> [ + tower: [:] + ] + } + + then: 'the endpoint has the expected value' + provider.endpoint == TowerClient.DEF_ENDPOINT_URL + + when: 'session.config.tower.endpoint is null' + Global.session = Mock(Session) { + config >> [ + tower: [ + endpoint: null + ] + ] + } + + then: 'the endpoint has the expected value' + + when: 'session.config.tower.endpoint is empty' + Global.session = Mock(Session) { + config >> [ + tower: [ + endpoint: '' + ] + ] + } + + then: 'the endpoint has the expected value' + + when: 'session.config.tower.endpoint is defined as "-"' + Global.session = Mock(Session) { + config >> [ + tower: [ + endpoint: '-' + ] + ] + } + + then: 'the endpoint has the expected value' + } + + def 'should return the access token from the config'() { + given: 'a session' + Global.session = Mock(Session) { + config >> [ + tower: [ + accessToken: 'abc123' + ] + ] + } + + when: 'the provider is created' + def provider = new TowerFusionEnv() + + then: 'the access token has the expected value' + provider.accessToken == 'abc123' + } + + def 'should return the access token from the environment'() { + setup: + Global.session = Mock(Session) { + config >> [:] + } + SysEnv.push(['TOWER_ACCESS_TOKEN': 'abc123']) + + when: 'the provider is created' + def provider = new TowerFusionEnv() + + then: 'the access token has the expected value' + provider.accessToken == 'abc123' + + cleanup: + SysEnv.pop() + } + + def 'should prefer the access token from the config'() { + setup: + Global.session = Mock(Session) { + config >> [ + tower: [ + accessToken: 'abc123' + ] + ] + } + SysEnv.push(['TOWER_ACCESS_TOKEN': 'xyz789']) + + when: 'the provider is created' + def provider = new TowerFusionEnv() + + then: 'the access token has the expected value' + provider.accessToken == 'abc123' + + cleanup: + SysEnv.pop() + } + + def 'should prefer the access token from the config despite being null'() { + setup: + Global.session = Mock(Session) { + config >> [ + tower: [ + accessToken: null + ] + ] + } + SysEnv.push(['TOWER_ACCESS_TOKEN': 'xyz789']) + + when: 'the provider is created' + def provider = new TowerFusionEnv() + + then: 'the access token has the expected value' + provider.accessToken == null + + cleanup: + SysEnv.pop() + } + + def 'should prefer the access token from the environment if TOWER_WORKFLOW_ID is set'() { + setup: + Global.session = Mock(Session) { + config >> [ + tower: [ + accessToken: 'abc123' + ] + ] + } + SysEnv.push(['TOWER_ACCESS_TOKEN' : 'xyz789', 'TOWER_WORKFLOW_ID': '123']) + + when: 'the provider is created' + def provider = new TowerFusionEnv() + + then: 'the access token has the expected value' + provider.accessToken == 'xyz789' + + cleanup: + SysEnv.pop() + } + + def 'should get a license token'() { + given: 'a TowerFusionEnv provider' + Global.session = Mock(Session) { + config >> [ + tower: [ + endpoint : 'http://localhost:18080', + accessToken: 'abc123' + ] + ] + } + def provider = new TowerFusionEnv() + + and: 'a mock endpoint returning a valid token' + wireMockServer.stubFor( + WireMock.post(WireMock.urlEqualTo("/license/token/")) + .withHeader('Authorization', WireMock.equalTo('Bearer abc123')) + .willReturn( + WireMock.aResponse() + .withStatus(200) + .withHeader('Content-Type', 'application/json') + .withBody('{"signedToken":"xyz789"}') + ) + ) + + when: 'a license token is requested' + final token = provider.getLicenseToken(PRODUCT, VERSION) + + then: 'the token has the expected value' + token == 'xyz789' + + and: 'the request is correct' + wireMockServer.verify(1, WireMock.postRequestedFor(WireMock.urlEqualTo("/license/token/")) + .withHeader('Authorization', WireMock.equalTo('Bearer abc123'))) + + where: + PRODUCT | VERSION + 'some-product' | 'some-version' + 'some-product' | null + null | 'some-version' + null | null + } + + def 'should fail getting a token if the Platform configuration is missing'() { + given: 'a TowerFusionEnv provider' + Global.session = Mock(Session) { + config >> [:] + } + def provider = new TowerFusionEnv() + + when: 'a license token is requested' + provider.getLicenseToken('some-product', 'some-version') + + then: 'an exception is thrown' + final ex = thrown(AbortOperationException) + ex.message == 'Missing personal access token -- Make sure there\'s a variable TOWER_ACCESS_TOKEN in your environment' + } + + def 'should fail getting a token if the Platform configuration is empty'() { + given: 'a TowerFusionEnv provider' + Global.session = Mock(Session) { + config >> [ + tower: [:] + ] + } + def provider = new TowerFusionEnv() + + when: 'a license token is requested' + provider.getLicenseToken('some-product', 'some-version') + + then: 'an exception is thrown' + final ex = thrown(AbortOperationException) + ex.message == 'Missing personal access token -- Make sure there\'s a variable TOWER_ACCESS_TOKEN in your environment' + } + + def 'should fail getting a token if the Platform access token is missing'() { + given: 'a TowerFusionEnv provider' + Global.session = Mock(Session) { + config >> [ + tower: [ + endpoint: 'http://localhost:18080' + ] + ] + } + def provider = new TowerFusionEnv() + + when: 'a license token is requested' + provider.getLicenseToken('some-product', 'some-version') + + then: 'an exception is thrown' + final ex = thrown(AbortOperationException) + ex.message == 'Missing personal access token -- Make sure there\'s a variable TOWER_ACCESS_TOKEN in your environment' + } + + def 'should throw UnauthorizedException if getting a token fails with 401'() { + given: 'a TowerFusionEnv provider' + Global.session = Mock(Session) { + config >> [ + tower: [ + endpoint : 'http://localhost:18080', + accessToken: 'abc123' + ] + ] + } + def provider = new TowerFusionEnv() + + and: 'a mock endpoint returning an error' + wireMockServer.stubFor( + WireMock.post(WireMock.urlEqualTo("/license/token/")) + .withHeader('Authorization', WireMock.equalTo('Bearer abc123')) + .willReturn( + WireMock.aResponse() + .withStatus(401) + .withHeader('Content-Type', 'application/json') + .withBody('{"error":"Unauthorized"}') + ) + ) + + when: 'a license token is requested' + provider.getLicenseToken('some-product', 'some-version') + + then: 'an exception is thrown' + thrown(UnauthorizedException) + } + + def 'should return a valid environment' () { + given: 'a TowerFusionEnv provider' + Global.session = Mock(Session) { + config >> [:] + } + def provider = Spy(TowerFusionEnv) + + when: 'the environment is requested' + def env = provider.getEnvironment('s3', Mock(FusionConfig)) + + then: 'the environment has the expected values' + 1 * provider.getLicenseToken(_, _) >> 'xyz789' + env == [FUSION_LICENSE_TOKEN: 'xyz789'] + } + + def 'should return an empty environment if no Platform config is available' () { + given: 'a session with no config for Platform' + Global.session = Mock(Session) { + config >> [:] + } + + when: 'the environment is requested' + def provider = new TowerFusionEnv() + def env = provider.getEnvironment('-', Mock(FusionConfig)) + + then: 'the environment is empty' + env == [:] + } + + def 'should return an empty environment if the license token cannot be obtained' () { + given: 'a TowerFusionEnv provider' + Global.session = Mock(Session) { + config >> [ + tower: [ + endpoint : 'http://localhost:18080', + accessToken: 'abc123' + ] + ] + } + def provider = Spy(TowerFusionEnv) + + when: 'the environment is requested' + def env = provider.getEnvironment('s3', Mock(FusionConfig)) + + then: 'the environment has the expected values' + 1 * provider.getLicenseToken(_, _) >> { + throw new Exception('error') + } + env == [:] + } +} From c1147cdc922a9a5423e1ea5131cbf7c5a70ce497 Mon Sep 17 00:00:00 2001 From: Alberto Miranda Date: Wed, 18 Dec 2024 16:55:02 +0100 Subject: [PATCH 02/13] refactor: Move common Platform code to `nextflow.platform.PlatformHelper` Signed-off-by: Alberto Miranda --- .../nextflow/platform/PlatformHelper.groovy | 74 +++++++++++++++++++ .../seqera/tower/plugin/TowerFusionEnv.groovy | 37 +--------- .../wave/plugin/config/TowerConfig.groovy | 50 ++----------- 3 files changed, 82 insertions(+), 79 deletions(-) create mode 100644 modules/nextflow/src/main/groovy/nextflow/platform/PlatformHelper.groovy diff --git a/modules/nextflow/src/main/groovy/nextflow/platform/PlatformHelper.groovy b/modules/nextflow/src/main/groovy/nextflow/platform/PlatformHelper.groovy new file mode 100644 index 0000000000..afe51fddce --- /dev/null +++ b/modules/nextflow/src/main/groovy/nextflow/platform/PlatformHelper.groovy @@ -0,0 +1,74 @@ +package nextflow.platform + +import groovy.transform.CompileStatic + +/** + * Helper methods for Platform-related operations + * + * @author Alberto Miranda + */ +@CompileStatic +class PlatformHelper { + + /** + * Get the configured Platform API endpoint: if the endpoint is not provided in the configuration, we fallback to the + * environment variable `TOWER_API_ENDPOINT`. If neither is provided, we fallback to the default endpoint. + * + * @param opts the configuration options for Platform (e.g. `session.config.navigate('tower')`) + * @param env the applicable environment variables + * @return the Platform API endpoint + */ + static String getEndpoint(Map opts, Map env) { + def result = opts.endpoint as String + if( !result || result=='-' ) + result = env.get('TOWER_API_ENDPOINT') ?: 'https://api.cloud.seqera.io' + return result.stripEnd('/') + } + + /** + * Return the configured Platform access token: if `TOWER_WORKFLOW_ID` is provided in the environment, it means + * we are running in a Platform-made run and we should ONLY retrieve the token from the environment. Otherwise, + * check the configuration or fallback to the environment. If no token is found, return null. + * + * @param opts the configuration options for Platform (e.g. `session.config.navigate('tower')`) + * @param env the applicable environment variables + * @return the Platform access token + */ + static String getAccessToken(Map opts, Map env) { + def token = env.get('TOWER_WORKFLOW_ID') + ? env.get('TOWER_ACCESS_TOKEN') + : opts.containsKey('accessToken') ? opts.accessToken as String : env.get('TOWER_ACCESS_TOKEN') + return token + } + + /** + * Return the configured Platform refresh token: if `TOWER_WORKFLOW_ID` is provided in the environment, it means + * we are running in a Platform-made run and we should ONLY retrieve the token from the environment. Otherwise, + * check the configuration or fallback to the environment. If no token is found, return null. + * + * @param opts the configuration options for Platform (e.g. `session.config.navigate('tower')`) + * @param env the applicable environment variables + * @return the Platform refresh token + */ + static String getRefreshToken(Map opts, Map env) { + def token = env.get('TOWER_WORKFLOW_ID') + ? env.get('TOWER_REFRESH_TOKEN') + : opts.containsKey('refreshToken') ? opts.refreshToken as String : env.get('TOWER_REFRESH_TOKEN') + return token + } + + /** + * Return the Platform Workspace ID: if `TOWER_WORKFLOW_ID` is provided in the environment, it means we are running + * in a Platform-made run and we should ONLY retrieve the workspace ID from the environment. Otherwise, check the + * configuration or fallback to the environment. If no workspace ID is found, return null. + * @param opts + * @param env + * @return + */ + static String getWorkspaceId(Map opts, Map env) { + def workspaceId = env.get('TOWER_WORKFLOW_ID') + ? env.get('TOWER_WORKSPACE_ID') + : opts.workspaceId as Long ?: env.get('TOWER_WORKSPACE_ID') as Long + return workspaceId + } +} diff --git a/plugins/nf-tower/src/main/io/seqera/tower/plugin/TowerFusionEnv.groovy b/plugins/nf-tower/src/main/io/seqera/tower/plugin/TowerFusionEnv.groovy index 4a36fda8b2..b29e18140b 100644 --- a/plugins/nf-tower/src/main/io/seqera/tower/plugin/TowerFusionEnv.groovy +++ b/plugins/nf-tower/src/main/io/seqera/tower/plugin/TowerFusionEnv.groovy @@ -18,6 +18,7 @@ import nextflow.SysEnv import nextflow.exception.AbortOperationException import nextflow.fusion.FusionConfig import nextflow.fusion.FusionEnv +import nextflow.platform.PlatformHelper import nextflow.util.Threads import org.pf4j.Extension @@ -76,8 +77,8 @@ class TowerFusionEnv implements FusionEnv { this.session = Global.session as Session final towerConfig = session.config.navigate('tower') as Map ?: [:] final env = SysEnv.get() - this.endpoint = endpoint0(towerConfig, env) - this.accessToken = accessToken0(towerConfig, env) + this.endpoint = PlatformHelper.getEndpoint(towerConfig, env) + this.accessToken = PlatformHelper.getAccessToken(towerConfig, env) } /** @@ -163,38 +164,6 @@ class TowerFusionEnv implements FusionEnv { * Helper methods *************************************************************************/ - /** - * Get the configured Platform API endpoint: if the endpoint is not provided in the configuration, we fallback to the - * environment variable `TOWER_API_ENDPOINT`. If neither is provided, we fallback to the default endpoint. - * - * @param opts the configuration options for Platform - * @param env the applicable environment variables - * @return the Platform API endpoint - */ - protected static String endpoint0(Map opts, Map env) { - def result = opts.endpoint as String - if (!result || result == '-') { - result = env.get('TOWER_API_ENDPOINT') ?: TowerClient.DEF_ENDPOINT_URL - } - return result.stripEnd('/') - } - - /** - * Get the configured Platform access token: if `TOWER_WORKFLOW_ID` is provided in the environment, we are running - * in a Platform-made run and we should ONLY retrieve the token from the environment. Otherwise, check - * the configuration file or fallback to the environment. If no token is found, returns null. - * - * @param opts the configuration options for Platform - * @param env the applicable environment variables - * @return the Platform access token - */ - protected static String accessToken0(Map opts, Map env) { - def token = env.get('TOWER_WORKFLOW_ID') - ? env.get('TOWER_ACCESS_TOKEN') - : opts.containsKey('accessToken') ? opts.accessToken as String : env.get('TOWER_ACCESS_TOKEN') - return token - } - /** * Create a new HttpClient instance with default settings * @return The new HttpClient instance diff --git a/plugins/nf-wave/src/main/io/seqera/wave/plugin/config/TowerConfig.groovy b/plugins/nf-wave/src/main/io/seqera/wave/plugin/config/TowerConfig.groovy index 5a0877039f..ee8a126857 100644 --- a/plugins/nf-wave/src/main/io/seqera/wave/plugin/config/TowerConfig.groovy +++ b/plugins/nf-wave/src/main/io/seqera/wave/plugin/config/TowerConfig.groovy @@ -19,6 +19,7 @@ package io.seqera.wave.plugin.config import groovy.transform.CompileStatic import groovy.transform.ToString +import nextflow.platform.PlatformHelper /** * Model Tower config accessed by Wave @@ -40,51 +41,10 @@ class TowerConfig { final String workflowId TowerConfig(Map opts, Map env) { - this.accessToken = accessToken0(opts, env) - this.refreshToken = refreshToken0(opts, env) - this.workspaceId = workspaceId0(opts, env) as Long - this.endpoint = endpoint0(opts, env) + this.accessToken = PlatformHelper.getAccessToken(opts, env) + this.refreshToken = PlatformHelper.getRefreshToken(opts, env) + this.workspaceId = PlatformHelper.getWorkspaceId(opts, env) as Long + this.endpoint = PlatformHelper.getEndpoint(opts, env) this.workflowId = env.get('TOWER_WORKFLOW_ID') } - - private String endpoint0(Map opts, Map env) { - def result = opts.endpoint as String - if( !result || result=='-' ) - result = env.get('TOWER_API_ENDPOINT') ?: 'https://api.cloud.seqera.io' - return result.stripEnd('/') - } - - private String accessToken0(Map opts, Map env) { - // when 'TOWER_WORKFLOW_ID' is provided in the env, it's a tower made launch - // therefore the access token should only be taken from the env - // otherwise check into the config file and fallback in the env - // see also - // https://github.com/nextflow-io/nextflow/blob/master/plugins/nf-tower/src/main/io/seqera/tower/plugin/TowerClient.groovy#L369-L377 - def token = env.get('TOWER_WORKFLOW_ID') - ? env.get('TOWER_ACCESS_TOKEN') - : opts.containsKey('accessToken') ? opts.accessToken as String : env.get('TOWER_ACCESS_TOKEN') - return token - } - - private String refreshToken0(Map opts, Map env) { - // when 'TOWER_WORKFLOW_ID' is provided in the env, it's a tower made launch - // therefore the access token should only be taken from the env - // otherwise check into the config file and fallback in the env - // see also - // https://github.com/nextflow-io/nextflow/blob/master/plugins/nf-tower/src/main/io/seqera/tower/plugin/TowerClient.groovy#L369-L377 - def token = env.get('TOWER_WORKFLOW_ID') - ? env.get('TOWER_REFRESH_TOKEN') - : opts.containsKey('refreshToken') ? opts.refreshToken as String : env.get('TOWER_REFRESH_TOKEN') - return token - } - - private String workspaceId0(Map opts, Map env) { - // when 'TOWER_WORKFLOW_ID' is provided in the env, it's a tower made launch - // therefore the workspace should only be taken from the env - // otherwise check into the config file and fallback in the env - def workspaceId = env.get('TOWER_WORKFLOW_ID') - ? env.get('TOWER_WORKSPACE_ID') - : opts.workspaceId as Long ?: env.get('TOWER_WORKSPACE_ID') as Long - return workspaceId - } } From 20c8a535dca9cb55e84f19682cfe3031baa75309 Mon Sep 17 00:00:00 2001 From: Alberto Miranda Date: Wed, 18 Dec 2024 16:56:11 +0100 Subject: [PATCH 03/13] chore: Rename `LICENSE_TOKEN_ENDPOINT` to `LICENSE_TOKEN_PATH` Signed-off-by: Alberto Miranda --- .../src/main/io/seqera/tower/plugin/TowerFusionEnv.groovy | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/plugins/nf-tower/src/main/io/seqera/tower/plugin/TowerFusionEnv.groovy b/plugins/nf-tower/src/main/io/seqera/tower/plugin/TowerFusionEnv.groovy index b29e18140b..45df5186ef 100644 --- a/plugins/nf-tower/src/main/io/seqera/tower/plugin/TowerFusionEnv.groovy +++ b/plugins/nf-tower/src/main/io/seqera/tower/plugin/TowerFusionEnv.groovy @@ -40,8 +40,8 @@ import java.util.function.Predicate @CompileStatic class TowerFusionEnv implements FusionEnv { - // The endpoint where license-scoped JWT tokens are obtained - private static final String LICENSE_TOKEN_ENDPOINT = 'license/token/' + // The path relative to the Platform endpoint where license-scoped JWT tokens are obtained + private static final String LICENSE_TOKEN_PATH = 'license/token/' // Server errors that should trigger a retry private static final List SERVER_ERRORS = [429, 500, 502, 503, 504] @@ -126,7 +126,7 @@ class TowerFusionEnv implements FusionEnv { } final req = HttpRequest.newBuilder() - .uri(URI.create("${endpoint}/${LICENSE_TOKEN_ENDPOINT}").normalize()) + .uri(URI.create("${endpoint}/${LICENSE_TOKEN_PATH}").normalize()) .header('Content-Type', 'application/json') .header('Authorization', "Bearer ${accessToken}") .POST( From f32f0ca129e35e1305cb8389f0d83d527441ee16 Mon Sep 17 00:00:00 2001 From: Alberto Miranda Date: Wed, 18 Dec 2024 16:58:18 +0100 Subject: [PATCH 04/13] style: Remove quotes from key name Signed-off-by: Alberto Miranda --- .../src/main/io/seqera/tower/plugin/TowerFusionEnv.groovy | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/nf-tower/src/main/io/seqera/tower/plugin/TowerFusionEnv.groovy b/plugins/nf-tower/src/main/io/seqera/tower/plugin/TowerFusionEnv.groovy index 45df5186ef..4f35b69402 100644 --- a/plugins/nf-tower/src/main/io/seqera/tower/plugin/TowerFusionEnv.groovy +++ b/plugins/nf-tower/src/main/io/seqera/tower/plugin/TowerFusionEnv.groovy @@ -101,7 +101,7 @@ class TowerFusionEnv implements FusionEnv { try { final token = getLicenseToken(product, version) return [ - 'FUSION_LICENSE_TOKEN': token, + FUSION_LICENSE_TOKEN: token, ] } catch (Exception e) { log.warn("Error retrieving Fusion license information: ${e.message}") From 25cd5ae9987a5bc518a0abed150997ca759b0991 Mon Sep 17 00:00:00 2001 From: Alberto Miranda Date: Wed, 18 Dec 2024 16:58:49 +0100 Subject: [PATCH 05/13] style: Prefer `Map.of()` to `[:]` Signed-off-by: Alberto Miranda --- .../src/main/io/seqera/tower/plugin/TowerFusionEnv.groovy | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/nf-tower/src/main/io/seqera/tower/plugin/TowerFusionEnv.groovy b/plugins/nf-tower/src/main/io/seqera/tower/plugin/TowerFusionEnv.groovy index 4f35b69402..9f467148d7 100644 --- a/plugins/nf-tower/src/main/io/seqera/tower/plugin/TowerFusionEnv.groovy +++ b/plugins/nf-tower/src/main/io/seqera/tower/plugin/TowerFusionEnv.groovy @@ -105,7 +105,7 @@ class TowerFusionEnv implements FusionEnv { ] } catch (Exception e) { log.warn("Error retrieving Fusion license information: ${e.message}") - return [:] + return Map.of() } } From 2f11171ac1ab67d4bc79227563ddfddcc40ad6bc Mon Sep 17 00:00:00 2001 From: Alberto Miranda Date: Wed, 18 Dec 2024 17:24:34 +0100 Subject: [PATCH 06/13] refactor: Encapsulate request/response serialization/deserialization Signed-off-by: Alberto Miranda --- .../seqera/tower/plugin/TowerFusionEnv.groovy | 40 ++++++++++++++----- 1 file changed, 31 insertions(+), 9 deletions(-) diff --git a/plugins/nf-tower/src/main/io/seqera/tower/plugin/TowerFusionEnv.groovy b/plugins/nf-tower/src/main/io/seqera/tower/plugin/TowerFusionEnv.groovy index 9f467148d7..9a79587ea9 100644 --- a/plugins/nf-tower/src/main/io/seqera/tower/plugin/TowerFusionEnv.groovy +++ b/plugins/nf-tower/src/main/io/seqera/tower/plugin/TowerFusionEnv.groovy @@ -117,7 +117,7 @@ class TowerFusionEnv implements FusionEnv { * * @return The signed JWT token */ - protected String getLicenseToken(product, version) { + protected String getLicenseToken(String product, String version) { // FIXME(amiranda): Find out how to obtain the product and version // Candidate: FusionConfig? @@ -131,13 +131,7 @@ class TowerFusionEnv implements FusionEnv { .header('Authorization', "Bearer ${accessToken}") .POST( HttpRequest.BodyPublishers.ofString( - new Gson().toJson( - new LicenseTokenRequest( - product: product, - version: version - ), - LicenseTokenRequest.class - ), + makeLicenseTokenRequest(product, version) ) ) .build() @@ -146,7 +140,8 @@ class TowerFusionEnv implements FusionEnv { final resp = safeHttpSend(req, retryPolicy) if (resp.statusCode() == 200) { - return new Gson().fromJson(resp.body(), LicenseTokenResponse.class).signedToken + final ret = parseLicenseTokenResponse(resp) + return ret.signedToken } if (resp.statusCode() == 401) { @@ -232,4 +227,31 @@ class TowerFusionEnv implements FusionEnv { } as CheckedSupplier ) as HttpResponse } + + /** + * Create a JSON string representing a {@link LicenseTokenRequest} object + * + * @param product The product SKU + * @param version The version + * @return The resulting JSON string + */ + private static String makeLicenseTokenRequest(String product, String version) { + return new Gson().toJson( + new LicenseTokenRequest( + product: product, + version: version + ), + LicenseTokenRequest.class + ) + } + + /** + * Parse a JSON string into a {@link LicenseTokenResponse} object + * + * @param stringHttpResponse The HttpResponse containing the JSON string + * @return The resulting LicenseTokenResponse object + */ + private static LicenseTokenResponse parseLicenseTokenResponse(HttpResponse resp) { + return new Gson().fromJson(resp.body(), LicenseTokenResponse.class) + } } From 94f6075fa27cdba2db7d75eee5740cd9e6318d8b Mon Sep 17 00:00:00 2001 From: Alberto Miranda Date: Fri, 20 Dec 2024 13:11:11 +0100 Subject: [PATCH 07/13] chore: Get fusion's SKU from `FusionConfig` Signed-off-by: Alberto Miranda --- .../main/groovy/nextflow/fusion/FusionConfig.groovy | 12 ++++++++++++ .../io/seqera/tower/plugin/TowerFusionEnv.groovy | 6 ++---- 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/modules/nextflow/src/main/groovy/nextflow/fusion/FusionConfig.groovy b/modules/nextflow/src/main/groovy/nextflow/fusion/FusionConfig.groovy index b17812baea..6ec3aab344 100644 --- a/modules/nextflow/src/main/groovy/nextflow/fusion/FusionConfig.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/fusion/FusionConfig.groovy @@ -40,6 +40,7 @@ class FusionConfig { final static public String FUSION_PATH = '/usr/bin/fusion' + final static private PRODUCT_NAME = 'fusion' final static private Pattern VERSION_JSON = ~/https:\/\/.*\/releases\/v(\d+(?:\.\w+)*)-(\w*)\.json$/ final private Boolean enabled @@ -122,6 +123,17 @@ class FusionConfig { return null } + /** + * Return the Fusion SKU string + * + * @return A string representing the Fusion SKU + */ + String sku() { + return enabled + ? PRODUCT_NAME + : null + } + String version() { return enabled ? retrieveFusionVersion(this.containerConfigUrl ?: DEFAULT_FUSION_AMD64_URL) diff --git a/plugins/nf-tower/src/main/io/seqera/tower/plugin/TowerFusionEnv.groovy b/plugins/nf-tower/src/main/io/seqera/tower/plugin/TowerFusionEnv.groovy index 9a79587ea9..e7ce13d608 100644 --- a/plugins/nf-tower/src/main/io/seqera/tower/plugin/TowerFusionEnv.groovy +++ b/plugins/nf-tower/src/main/io/seqera/tower/plugin/TowerFusionEnv.groovy @@ -93,10 +93,8 @@ class TowerFusionEnv implements FusionEnv { @Override Map getEnvironment(String scheme, FusionConfig config) { - // TODO(amiranda): Hardcoded for now. We need to find out how to obtain - // the concrete product SKU and version. Candidate: FusionConfig? - final product = 'fusion' - final version = '2.4' + final product = config.sku() + final version = config.version() try { final token = getLicenseToken(product, version) From 1fed85918c016495c598fc805f4bbce4d1d3fce4 Mon Sep 17 00:00:00 2001 From: Alberto Miranda Date: Fri, 20 Dec 2024 13:12:07 +0100 Subject: [PATCH 08/13] chore: Improve javadoc for `getLicenseToken()` Signed-off-by: Alberto Miranda --- .../src/main/io/seqera/tower/plugin/TowerFusionEnv.groovy | 3 +++ 1 file changed, 3 insertions(+) diff --git a/plugins/nf-tower/src/main/io/seqera/tower/plugin/TowerFusionEnv.groovy b/plugins/nf-tower/src/main/io/seqera/tower/plugin/TowerFusionEnv.groovy index e7ce13d608..ba6b8a2144 100644 --- a/plugins/nf-tower/src/main/io/seqera/tower/plugin/TowerFusionEnv.groovy +++ b/plugins/nf-tower/src/main/io/seqera/tower/plugin/TowerFusionEnv.groovy @@ -112,6 +112,9 @@ class TowerFusionEnv implements FusionEnv { * Platform access token provided in the configuration of the current session. * * @throws AbortOperationException if a Platform access token cannot be found + * @throws UnauthorizedException if the access token is invalid + * @throws BadResponseException if the response is not as expected + * @throws IllegalStateException if the request cannot be sent * * @return The signed JWT token */ From 6ce5be3dc2a152d2642aba4af779bf3254d8b2c4 Mon Sep 17 00:00:00 2001 From: Alberto Miranda Date: Fri, 20 Dec 2024 16:10:08 +0100 Subject: [PATCH 09/13] chore: Ensure requests are retried on 408 error Signed-off-by: Alberto Miranda --- .../src/main/io/seqera/tower/plugin/TowerFusionEnv.groovy | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/nf-tower/src/main/io/seqera/tower/plugin/TowerFusionEnv.groovy b/plugins/nf-tower/src/main/io/seqera/tower/plugin/TowerFusionEnv.groovy index ba6b8a2144..c6f51e6f91 100644 --- a/plugins/nf-tower/src/main/io/seqera/tower/plugin/TowerFusionEnv.groovy +++ b/plugins/nf-tower/src/main/io/seqera/tower/plugin/TowerFusionEnv.groovy @@ -44,7 +44,7 @@ class TowerFusionEnv implements FusionEnv { private static final String LICENSE_TOKEN_PATH = 'license/token/' // Server errors that should trigger a retry - private static final List SERVER_ERRORS = [429, 500, 502, 503, 504] + private static final List SERVER_ERRORS = [408, 429, 500, 502, 503, 504] // Default connection timeout for HTTP requests private static final Duration DEFAULT_CONNECTION_TIMEOUT = Duration.of(30, ChronoUnit.SECONDS) From a9849f55891aebac3bb04a4a75b98e18ad39e5a5 Mon Sep 17 00:00:00 2001 From: Alberto Miranda Date: Fri, 20 Dec 2024 16:41:49 +0100 Subject: [PATCH 10/13] refactor: Cache license token responses Signed-off-by: Alberto Miranda --- .../seqera/tower/plugin/TowerFusionEnv.groovy | 134 ++++++++++++------ 1 file changed, 88 insertions(+), 46 deletions(-) diff --git a/plugins/nf-tower/src/main/io/seqera/tower/plugin/TowerFusionEnv.groovy b/plugins/nf-tower/src/main/io/seqera/tower/plugin/TowerFusionEnv.groovy index c6f51e6f91..75c3b10d61 100644 --- a/plugins/nf-tower/src/main/io/seqera/tower/plugin/TowerFusionEnv.groovy +++ b/plugins/nf-tower/src/main/io/seqera/tower/plugin/TowerFusionEnv.groovy @@ -1,6 +1,9 @@ package io.seqera.tower.plugin +import com.google.common.cache.Cache +import com.google.common.cache.CacheBuilder import com.google.gson.Gson +import com.google.gson.JsonSyntaxException import dev.failsafe.Failsafe import dev.failsafe.RetryPolicy import dev.failsafe.event.EventListener @@ -61,6 +64,14 @@ class TowerFusionEnv implements FusionEnv { // The RetryPolicy instance used to retry requests private final RetryPolicy retryPolicy = newDefaultRetryPolicy(SERVER_ERRORS) + // Time-to-live for cached tokens + private Duration tokenTTL = Duration.of(1, ChronoUnit.HOURS) + + // Cache used for storing license tokens + private Cache tokenCache = CacheBuilder.newBuilder() + .expireAfterWrite(tokenTTL) + .build() + // Nextflow session private final Session session @@ -112,48 +123,32 @@ class TowerFusionEnv implements FusionEnv { * Platform access token provided in the configuration of the current session. * * @throws AbortOperationException if a Platform access token cannot be found - * @throws UnauthorizedException if the access token is invalid - * @throws BadResponseException if the response is not as expected - * @throws IllegalStateException if the request cannot be sent * * @return The signed JWT token */ - protected String getLicenseToken(String product, String version) { - // FIXME(amiranda): Find out how to obtain the product and version - // Candidate: FusionConfig? - + protected String getLicenseToken(String product, String version) throws AbortOperationException { if (accessToken == null) { throw new AbortOperationException("Missing personal access token -- Make sure there's a variable TOWER_ACCESS_TOKEN in your environment") } - final req = HttpRequest.newBuilder() - .uri(URI.create("${endpoint}/${LICENSE_TOKEN_PATH}").normalize()) - .header('Content-Type', 'application/json') - .header('Authorization', "Bearer ${accessToken}") - .POST( - HttpRequest.BodyPublishers.ofString( - makeLicenseTokenRequest(product, version) - ) - ) - .build() - - try { - final resp = safeHttpSend(req, retryPolicy) - - if (resp.statusCode() == 200) { - final ret = parseLicenseTokenResponse(resp) - return ret.signedToken - } - - if (resp.statusCode() == 401) { - throw new UnauthorizedException("Unauthorized [401] - Verify you have provided a valid access token") - } + final req = new LicenseTokenRequest( + product: product, + version: version + ) - throw new BadResponseException("Invalid response: ${req.method()} ${req.uri()} [${resp.statusCode()}] ${resp.body()}") + final key = '${product}-${version}' + def resp = tokenCache.get( + key, + () -> sendRequest(req) + ) as LicenseTokenResponse - } catch (IOException e) { - throw new IllegalStateException("Unable to send request to '${req.uri()}' : ${e.message}") + if( resp.expirationDate.before(new Date()) ) { + log.debug "Cached token already expired; refreshing" + resp = sendRequest(req) + tokenCache.put(key, resp) } + + return resp.signedToken } /************************************************************************** @@ -230,29 +225,76 @@ class TowerFusionEnv implements FusionEnv { } /** - * Create a JSON string representing a {@link LicenseTokenRequest} object + * Create a {@link HttpRequest} representing a {@link LicenseTokenRequest} object + * + * @param req The LicenseTokenRequest object + * @return The resulting HttpRequest object + */ + private HttpRequest makeHttpRequest(LicenseTokenRequest req) { + return HttpRequest.newBuilder() + .uri(URI.create("${endpoint}/${LICENSE_TOKEN_PATH}").normalize()) + .header('Content-Type', 'application/json') + .header('Authorization', "Bearer ${accessToken}") + .POST( + HttpRequest.BodyPublishers.ofString( + serializeToJson(req) + ) + ) + .build() + } + + /** + * Serialize a {@link LicenseTokenRequest} object into a JSON string * - * @param product The product SKU - * @param version The version + * @param req The LicenseTokenRequest object * @return The resulting JSON string */ - private static String makeLicenseTokenRequest(String product, String version) { - return new Gson().toJson( - new LicenseTokenRequest( - product: product, - version: version - ), - LicenseTokenRequest.class - ) + private static String serializeToJson(LicenseTokenRequest req) { + return new Gson().toJson(req) } /** * Parse a JSON string into a {@link LicenseTokenResponse} object * - * @param stringHttpResponse The HttpResponse containing the JSON string + * @param resp The String containing the JSON representation of the LicenseTokenResponse object * @return The resulting LicenseTokenResponse object + * + * @throws JsonSyntaxException if the JSON string is not well-formed + */ + private static LicenseTokenResponse parseLicenseTokenResponse(String resp) throws JsonSyntaxException { + return new Gson().fromJson(resp, LicenseTokenResponse.class) + } + + /** + * Request a license token from Platform. + * + * @param req The LicenseTokenRequest object + * @return The LicenseTokenResponse object + * + * @throws AbortOperationException if a Platform access token cannot be found + * @throws UnauthorizedException if the access token is invalid + * @throws BadResponseException if the response is not as expected + * @throws IllegalStateException if the request cannot be sent */ - private static LicenseTokenResponse parseLicenseTokenResponse(HttpResponse resp) { - return new Gson().fromJson(resp.body(), LicenseTokenResponse.class) + private LicenseTokenResponse sendRequest(LicenseTokenRequest req) throws AbortOperationException, UnauthorizedException, BadResponseException, IllegalStateException { + + final httpReq = makeHttpRequest(req) + + try { + final resp = safeHttpSend(httpReq, retryPolicy) + + if( resp.statusCode() == 200 ) { + final ret = parseLicenseTokenResponse(resp.body()) + return ret + } + + if( resp.statusCode() == 401 ) { + throw new UnauthorizedException("Unauthorized [401] - Verify you have provided a valid access token") + } + + throw new BadResponseException("Invalid response: ${httpReq.method()} ${httpReq.uri()} [${resp.statusCode()}] ${resp.body()}") + } catch (IOException e) { + throw new IllegalStateException("Unable to send request to '${httpReq.uri()}' : ${e.message}") + } } } From eaa985ee92951b9ba7a3fb11965da9ca21d22d0d Mon Sep 17 00:00:00 2001 From: Alberto Miranda Date: Fri, 20 Dec 2024 16:58:59 +0100 Subject: [PATCH 11/13] chore: Scope license-related errors as debug level Signed-off-by: Alberto Miranda --- .../src/main/io/seqera/tower/plugin/TowerFusionEnv.groovy | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/nf-tower/src/main/io/seqera/tower/plugin/TowerFusionEnv.groovy b/plugins/nf-tower/src/main/io/seqera/tower/plugin/TowerFusionEnv.groovy index 75c3b10d61..b89a7ff9aa 100644 --- a/plugins/nf-tower/src/main/io/seqera/tower/plugin/TowerFusionEnv.groovy +++ b/plugins/nf-tower/src/main/io/seqera/tower/plugin/TowerFusionEnv.groovy @@ -113,7 +113,7 @@ class TowerFusionEnv implements FusionEnv { FUSION_LICENSE_TOKEN: token, ] } catch (Exception e) { - log.warn("Error retrieving Fusion license information: ${e.message}") + log.debug("Error retrieving Fusion license information: ${e.message}") return Map.of() } } From 43f8bba068db13895486f1222d5b343d3aa89762 Mon Sep 17 00:00:00 2001 From: Alberto Miranda Date: Fri, 20 Dec 2024 17:30:22 +0100 Subject: [PATCH 12/13] chore: Fix failing tests Signed-off-by: Alberto Miranda --- .../seqera/tower/plugin/TowerFusionEnv.groovy | 28 +++++++++++-------- .../tower/plugin/TowerFusionEnvTest.groovy | 7 ++++- 2 files changed, 22 insertions(+), 13 deletions(-) diff --git a/plugins/nf-tower/src/main/io/seqera/tower/plugin/TowerFusionEnv.groovy b/plugins/nf-tower/src/main/io/seqera/tower/plugin/TowerFusionEnv.groovy index b89a7ff9aa..0fd858bbcd 100644 --- a/plugins/nf-tower/src/main/io/seqera/tower/plugin/TowerFusionEnv.groovy +++ b/plugins/nf-tower/src/main/io/seqera/tower/plugin/TowerFusionEnv.groovy @@ -2,6 +2,7 @@ package io.seqera.tower.plugin import com.google.common.cache.Cache import com.google.common.cache.CacheBuilder +import com.google.common.util.concurrent.UncheckedExecutionException import com.google.gson.Gson import com.google.gson.JsonSyntaxException import dev.failsafe.Failsafe @@ -136,19 +137,22 @@ class TowerFusionEnv implements FusionEnv { version: version ) - final key = '${product}-${version}' - def resp = tokenCache.get( - key, - () -> sendRequest(req) - ) as LicenseTokenResponse - - if( resp.expirationDate.before(new Date()) ) { - log.debug "Cached token already expired; refreshing" - resp = sendRequest(req) - tokenCache.put(key, resp) + try { + final key = '${product}-${version}' + def resp = tokenCache.get( + key, + () -> sendRequest(req) + ) as LicenseTokenResponse + + if( resp.expirationDate.before(new Date()) ) { + log.debug "Cached token already expired; refreshing" + resp = sendRequest(req) + tokenCache.put(key, resp) + } + return resp.signedToken + } catch (UncheckedExecutionException e) { + throw e.getCause() } - - return resp.signedToken } /************************************************************************** diff --git a/plugins/nf-tower/src/test/io/seqera/tower/plugin/TowerFusionEnvTest.groovy b/plugins/nf-tower/src/test/io/seqera/tower/plugin/TowerFusionEnvTest.groovy index cdd35d49de..a7e47e683b 100644 --- a/plugins/nf-tower/src/test/io/seqera/tower/plugin/TowerFusionEnvTest.groovy +++ b/plugins/nf-tower/src/test/io/seqera/tower/plugin/TowerFusionEnvTest.groovy @@ -2,6 +2,7 @@ package io.seqera.tower.plugin import com.github.tomakehurst.wiremock.WireMockServer import com.github.tomakehurst.wiremock.client.WireMock +import groovy.json.JsonOutput import io.seqera.tower.plugin.exception.UnauthorizedException import nextflow.Global import nextflow.Session @@ -11,6 +12,8 @@ import nextflow.fusion.FusionConfig import spock.lang.Shared import spock.lang.Specification +import java.time.temporal.ChronoUnit + /** * Test cases for the TowerFusionEnv class. * @@ -256,6 +259,8 @@ class TowerFusionEnvTest extends Specification { def provider = new TowerFusionEnv() and: 'a mock endpoint returning a valid token' + final now = new Date().toInstant() + final expirationDate = JsonOutput.toJson(Date.from(now.plus(1, ChronoUnit.DAYS))) wireMockServer.stubFor( WireMock.post(WireMock.urlEqualTo("/license/token/")) .withHeader('Authorization', WireMock.equalTo('Bearer abc123')) @@ -263,7 +268,7 @@ class TowerFusionEnvTest extends Specification { WireMock.aResponse() .withStatus(200) .withHeader('Content-Type', 'application/json') - .withBody('{"signedToken":"xyz789"}') + .withBody('{"signedToken":"xyz789", "expirationDate":' + expirationDate + '}') ) ) From 073ebdef1f8635308253b527db7a25c61d4eff7a Mon Sep 17 00:00:00 2001 From: Paolo Di Tommaso Date: Tue, 21 Jan 2025 18:03:28 +0100 Subject: [PATCH 13/13] Minor changes Signed-off-by: Paolo Di Tommaso --- .../src/main/groovy/nextflow/fusion/FusionConfig.groovy | 2 +- .../src/main/groovy/nextflow/platform/PlatformHelper.groovy | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/modules/nextflow/src/main/groovy/nextflow/fusion/FusionConfig.groovy b/modules/nextflow/src/main/groovy/nextflow/fusion/FusionConfig.groovy index 6ec3aab344..45d1bd489a 100644 --- a/modules/nextflow/src/main/groovy/nextflow/fusion/FusionConfig.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/fusion/FusionConfig.groovy @@ -40,7 +40,7 @@ class FusionConfig { final static public String FUSION_PATH = '/usr/bin/fusion' - final static private PRODUCT_NAME = 'fusion' + final static private String PRODUCT_NAME = 'fusion' final static private Pattern VERSION_JSON = ~/https:\/\/.*\/releases\/v(\d+(?:\.\w+)*)-(\w*)\.json$/ final private Boolean enabled diff --git a/modules/nextflow/src/main/groovy/nextflow/platform/PlatformHelper.groovy b/modules/nextflow/src/main/groovy/nextflow/platform/PlatformHelper.groovy index afe51fddce..52461fde91 100644 --- a/modules/nextflow/src/main/groovy/nextflow/platform/PlatformHelper.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/platform/PlatformHelper.groovy @@ -35,7 +35,7 @@ class PlatformHelper { * @return the Platform access token */ static String getAccessToken(Map opts, Map env) { - def token = env.get('TOWER_WORKFLOW_ID') + final token = env.get('TOWER_WORKFLOW_ID') ? env.get('TOWER_ACCESS_TOKEN') : opts.containsKey('accessToken') ? opts.accessToken as String : env.get('TOWER_ACCESS_TOKEN') return token @@ -51,7 +51,7 @@ class PlatformHelper { * @return the Platform refresh token */ static String getRefreshToken(Map opts, Map env) { - def token = env.get('TOWER_WORKFLOW_ID') + final token = env.get('TOWER_WORKFLOW_ID') ? env.get('TOWER_REFRESH_TOKEN') : opts.containsKey('refreshToken') ? opts.refreshToken as String : env.get('TOWER_REFRESH_TOKEN') return token @@ -66,7 +66,7 @@ class PlatformHelper { * @return */ static String getWorkspaceId(Map opts, Map env) { - def workspaceId = env.get('TOWER_WORKFLOW_ID') + final workspaceId = env.get('TOWER_WORKFLOW_ID') ? env.get('TOWER_WORKSPACE_ID') : opts.workspaceId as Long ?: env.get('TOWER_WORKSPACE_ID') as Long return workspaceId