From c84cd60bd6180357de31955dfe33ecb53f111af1 Mon Sep 17 00:00:00 2001 From: Aron Meszaros Date: Wed, 8 Nov 2023 16:30:52 +0100 Subject: [PATCH] [CALCITE-6135] BEARER authentication support --- bom/build.gradle.kts | 1 + core/build.gradle.kts | 1 + .../avatica/BuiltInConnectionProperty.java | 5 +- .../calcite/avatica/ConnectionConfig.java | 2 + .../calcite/avatica/ConnectionConfigImpl.java | 4 + .../avatica/remote/AuthenticationType.java | 1 + .../remote/AvaticaCommonsHttpClientImpl.java | 21 +- .../remote/AvaticaHttpClientFactoryImpl.java | 18 ++ .../remote/BearerAuthenticateable.java | 22 ++ .../avatica/remote/BearerCredentials.java | 54 ++++ .../calcite/avatica/remote/BearerScheme.java | 142 ++++++++++ .../avatica/remote/BearerSchemeFactory.java | 34 +++ .../avatica/remote/BearerTokenProvider.java | 28 ++ .../remote/FileBearerTokenProvider.java | 142 ++++++++++ .../avatica/remote/FileChangeWatcher.java | 251 +++++++++++++++++ .../apache/calcite/avatica/util/Unsafe.java | 5 + .../avatica/remote/BearerSchemeTest.java | 148 ++++++++++ .../remote/FileBearerTokenProviderTest.java | 226 +++++++++++++++ .../avatica/remote/FileChangeWatcherTest.java | 258 ++++++++++++++++++ gradle.properties | 1 + 20 files changed, 1362 insertions(+), 2 deletions(-) create mode 100644 core/src/main/java/org/apache/calcite/avatica/remote/BearerAuthenticateable.java create mode 100644 core/src/main/java/org/apache/calcite/avatica/remote/BearerCredentials.java create mode 100644 core/src/main/java/org/apache/calcite/avatica/remote/BearerScheme.java create mode 100644 core/src/main/java/org/apache/calcite/avatica/remote/BearerSchemeFactory.java create mode 100644 core/src/main/java/org/apache/calcite/avatica/remote/BearerTokenProvider.java create mode 100644 core/src/main/java/org/apache/calcite/avatica/remote/FileBearerTokenProvider.java create mode 100644 core/src/main/java/org/apache/calcite/avatica/remote/FileChangeWatcher.java create mode 100644 core/src/test/java/org/apache/calcite/avatica/remote/BearerSchemeTest.java create mode 100644 core/src/test/java/org/apache/calcite/avatica/remote/FileBearerTokenProviderTest.java create mode 100644 core/src/test/java/org/apache/calcite/avatica/remote/FileChangeWatcherTest.java diff --git a/bom/build.gradle.kts b/bom/build.gradle.kts index 294af95608..04ae013471 100644 --- a/bom/build.gradle.kts +++ b/bom/build.gradle.kts @@ -78,6 +78,7 @@ dependencies { apiv("org.ow2.asm:asm-tree", "asm") apiv("org.ow2.asm:asm-util", "asm") apiv("org.slf4j:slf4j-api", "slf4j") + apiv("commons-io:commons-io") // The log4j2 binding should be a runtime dependency but given that // some modules shade this dependency we need to keep it as api apiv("org.apache.logging.log4j:log4j-slf4j-impl", "log4j2") diff --git a/core/build.gradle.kts b/core/build.gradle.kts index 34b64ce668..441a87dead 100644 --- a/core/build.gradle.kts +++ b/core/build.gradle.kts @@ -42,6 +42,7 @@ dependencies { testImplementation("org.mockito:mockito-core") testImplementation("org.mockito:mockito-inline") testImplementation("org.hamcrest:hamcrest-core") + testImplementation("commons-io:commons-io") testRuntimeOnly("org.apache.logging.log4j:log4j-slf4j-impl") } diff --git a/core/src/main/java/org/apache/calcite/avatica/BuiltInConnectionProperty.java b/core/src/main/java/org/apache/calcite/avatica/BuiltInConnectionProperty.java index d68d39dbf5..6a0f3c2c62 100644 --- a/core/src/main/java/org/apache/calcite/avatica/BuiltInConnectionProperty.java +++ b/core/src/main/java/org/apache/calcite/avatica/BuiltInConnectionProperty.java @@ -127,7 +127,10 @@ public enum BuiltInConnectionProperty implements ConnectionProperty { * HTTP Connection Timeout in milliseconds. */ HTTP_CONNECTION_TIMEOUT("http_connection_timeout", - Type.NUMBER, Timeout.ofMinutes(3).toMilliseconds(), false); + Type.NUMBER, Timeout.ofMinutes(3).toMilliseconds(), false), + + /** File containing bearer tokens. */ + TOKEN_FILE("tokenfile", Type.STRING, "", false); private final String camelName; private final Type type; diff --git a/core/src/main/java/org/apache/calcite/avatica/ConnectionConfig.java b/core/src/main/java/org/apache/calcite/avatica/ConnectionConfig.java index adb98339b8..b3ed1a0c89 100644 --- a/core/src/main/java/org/apache/calcite/avatica/ConnectionConfig.java +++ b/core/src/main/java/org/apache/calcite/avatica/ConnectionConfig.java @@ -81,6 +81,8 @@ public interface ConnectionConfig { long getLBConnectionFailoverSleepTime(); /** @see BuiltInConnectionProperty#HTTP_CONNECTION_TIMEOUT **/ long getHttpConnectionTimeout(); + /** @see BuiltInConnectionProperty#TOKEN_FILE */ + String tokenFile(); } // End ConnectionConfig.java diff --git a/core/src/main/java/org/apache/calcite/avatica/ConnectionConfigImpl.java b/core/src/main/java/org/apache/calcite/avatica/ConnectionConfigImpl.java index 9ae4446e79..e6e96ab264 100644 --- a/core/src/main/java/org/apache/calcite/avatica/ConnectionConfigImpl.java +++ b/core/src/main/java/org/apache/calcite/avatica/ConnectionConfigImpl.java @@ -169,6 +169,10 @@ public long getHttpConnectionTimeout() { return BuiltInConnectionProperty.HTTP_CONNECTION_TIMEOUT.wrap(properties).getLong(); } + public String tokenFile() { + return BuiltInConnectionProperty.TOKEN_FILE.wrap(properties).getString(); + } + /** Converts a {@link Properties} object containing (name, value) * pairs into a map whose keys are * {@link org.apache.calcite.avatica.InternalProperty} objects. diff --git a/core/src/main/java/org/apache/calcite/avatica/remote/AuthenticationType.java b/core/src/main/java/org/apache/calcite/avatica/remote/AuthenticationType.java index f483be9bdf..8bdb7709a2 100644 --- a/core/src/main/java/org/apache/calcite/avatica/remote/AuthenticationType.java +++ b/core/src/main/java/org/apache/calcite/avatica/remote/AuthenticationType.java @@ -24,6 +24,7 @@ public enum AuthenticationType { BASIC, DIGEST, SPNEGO, + BEARER, CUSTOM; } diff --git a/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaCommonsHttpClientImpl.java b/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaCommonsHttpClientImpl.java index 1c5327f13a..9e2b469d7a 100644 --- a/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaCommonsHttpClientImpl.java +++ b/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaCommonsHttpClientImpl.java @@ -60,6 +60,9 @@ import java.net.URISyntaxException; import java.net.URL; import java.security.Principal; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; import java.util.Objects; import java.util.concurrent.TimeUnit; @@ -68,7 +71,7 @@ * sent and received across the wire. */ public class AvaticaCommonsHttpClientImpl implements AvaticaHttpClient, HttpClientPoolConfigurable, - UsernamePasswordAuthenticateable, GSSAuthenticateable { + UsernamePasswordAuthenticateable, GSSAuthenticateable, BearerAuthenticateable { private static final Logger LOG = LoggerFactory.getLogger(AvaticaCommonsHttpClientImpl.class); // SPNEGO specific settings @@ -91,6 +94,10 @@ public class AvaticaCommonsHttpClientImpl implements AvaticaHttpClient, HttpClie protected CredentialsProvider credentialsProvider = null; protected Lookup authRegistry = null; protected Object userToken; + private static final List AVATICA_SCHEME_PRIORITY = + Collections.unmodifiableList(Arrays.asList(StandardAuthScheme.BASIC, + StandardAuthScheme.DIGEST, StandardAuthScheme.SPNEGO, StandardAuthScheme.NTLM, + StandardAuthScheme.KERBEROS, "Bearer")); public AvaticaCommonsHttpClientImpl(URL url) { this.uri = toURI(Objects.requireNonNull(url)); @@ -104,6 +111,7 @@ protected void initializeClient(PoolingHttpClientConnectionManager pool, RequestConfig.Builder requestConfigBuilder = RequestConfig.custom(); RequestConfig requestConfig = requestConfigBuilder .setConnectTimeout(config.getHttpConnectionTimeout(), TimeUnit.MILLISECONDS) + .setTargetPreferredAuthSchemes(AVATICA_SCHEME_PRIORITY) .build(); HttpClientBuilder httpClientBuilder = HttpClients.custom().setConnectionManager(pool) .setDefaultRequestConfig(requestConfig); @@ -206,6 +214,17 @@ CloseableHttpResponse execute(HttpPost post, HttpClientContext context) } } + @Override public void setTokenProvider(String username, BearerTokenProvider tokenProvider) { + this.credentialsProvider = new BasicCredentialsProvider(); + ((BasicCredentialsProvider) this.credentialsProvider) + .setCredentials(anyAuthScope, new BearerCredentials(username, tokenProvider)); + + this.authRegistry = RegistryBuilder.create() + .register("Bearer", + new BearerSchemeFactory()) + .build(); + } + /** * A credentials implementation which returns null. */ diff --git a/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaHttpClientFactoryImpl.java b/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaHttpClientFactoryImpl.java index 0b9d66c07c..cc4e5b5abe 100644 --- a/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaHttpClientFactoryImpl.java +++ b/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaHttpClientFactoryImpl.java @@ -130,6 +130,24 @@ public static AvaticaHttpClientFactoryImpl getInstance() { LOG.debug("{} is not capable of kerberos authentication.", authType); } + if (client instanceof BearerAuthenticateable) { + if (AuthenticationType.BEARER == authType) { + FileBearerTokenProvider tokenProvider = new FileBearerTokenProvider(); + try { + tokenProvider.init(config); + String username = config.avaticaUser(); + if (null == username) { + username = System.getProperty("user.name"); + } + ((BearerAuthenticateable) client).setTokenProvider(username, tokenProvider); + } catch (java.io.IOException e) { + LOG.debug("Failed to initialize bearer authentication"); + } + } + } else { + LOG.debug("{} is not capable of bearer authentication.", authType); + } + if (null != kerberosUtil) { client = new DoAsAvaticaHttpClient(client, kerberosUtil); } diff --git a/core/src/main/java/org/apache/calcite/avatica/remote/BearerAuthenticateable.java b/core/src/main/java/org/apache/calcite/avatica/remote/BearerAuthenticateable.java new file mode 100644 index 0000000000..5e0094eecd --- /dev/null +++ b/core/src/main/java/org/apache/calcite/avatica/remote/BearerAuthenticateable.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.avatica.remote; + +public interface BearerAuthenticateable { + + void setTokenProvider(String username, BearerTokenProvider tokenProvider); +} diff --git a/core/src/main/java/org/apache/calcite/avatica/remote/BearerCredentials.java b/core/src/main/java/org/apache/calcite/avatica/remote/BearerCredentials.java new file mode 100644 index 0000000000..693be548d7 --- /dev/null +++ b/core/src/main/java/org/apache/calcite/avatica/remote/BearerCredentials.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.avatica.remote; + +import org.apache.hc.client5.http.auth.Credentials; +import org.apache.hc.core5.annotation.Contract; +import org.apache.hc.core5.annotation.ThreadingBehavior; +import org.apache.hc.core5.util.Args; + +import java.io.Serializable; +import java.security.Principal; + +@Contract(threading = ThreadingBehavior.IMMUTABLE) +public class BearerCredentials implements Credentials, Serializable { + + private final BearerTokenProvider tokenProvider; + + private final String userName; + + public BearerCredentials(final String userName, final BearerTokenProvider tokenProvider) { + Args.notNull(userName, "userName"); + Args.notNull(tokenProvider, "tokenProvider"); + this.tokenProvider = tokenProvider; + this.userName = userName; + } + + public String getToken() { + return tokenProvider.obtain(userName); + } + + @Override + public Principal getUserPrincipal() { + return null; + } + + @Override + public char[] getPassword() { + return null; + } +} diff --git a/core/src/main/java/org/apache/calcite/avatica/remote/BearerScheme.java b/core/src/main/java/org/apache/calcite/avatica/remote/BearerScheme.java new file mode 100644 index 0000000000..35e6aa94df --- /dev/null +++ b/core/src/main/java/org/apache/calcite/avatica/remote/BearerScheme.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.avatica.remote; + +import org.apache.hc.client5.http.auth.AuthChallenge; +import org.apache.hc.client5.http.auth.AuthScheme; +import org.apache.hc.client5.http.auth.AuthScope; +import org.apache.hc.client5.http.auth.AuthenticationException; +import org.apache.hc.client5.http.auth.Credentials; +import org.apache.hc.client5.http.auth.CredentialsProvider; +import org.apache.hc.client5.http.auth.MalformedChallengeException; +import org.apache.hc.core5.http.HttpHost; +import org.apache.hc.core5.http.HttpRequest; +import org.apache.hc.core5.http.NameValuePair; +import org.apache.hc.core5.http.protocol.HttpContext; +import org.apache.hc.core5.util.Args; +import org.apache.hc.core5.util.Asserts; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.security.Principal; +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; + +public class BearerScheme implements AuthScheme, Serializable { + private static final Logger LOG = LoggerFactory.getLogger(BearerScheme.class); + private String token; + + private final Map paramMap; + private boolean complete; + + public BearerScheme() { + super(); + this.paramMap = new HashMap<>(); + this.complete = false; + } + + @Override + public String getName() { + return "Bearer"; + } + + @Override + public boolean isConnectionBased() { + return false; + } + + @Override + public String getRealm() { + return this.paramMap.get("realm"); + } + + @Override + public void processChallenge( + final AuthChallenge authChallenge, + final HttpContext context) throws MalformedChallengeException { + this.paramMap.clear(); + final List params = authChallenge.getParams(); + if (params != null) { + for (final NameValuePair param: params) { + this.paramMap.put(param.getName().toLowerCase(Locale.ROOT), param.getValue()); + } + if (LOG.isDebugEnabled()) { + final String error = paramMap.get("error"); + if (error != null) { + final StringBuilder buf = new StringBuilder(); + buf.append(error); + final String desc = paramMap.get("error_description"); + final String uri = paramMap.get("error_uri"); + if (desc != null || uri != null) { + buf.append(" ("); + buf.append(desc).append("; ").append(uri); + buf.append(")"); + } + LOG.debug(buf.toString()); + } + } + } + this.complete = true; + } + + @Override + public boolean isChallengeComplete() { + return this.complete; + } + + @Override + public boolean isResponseReady( + final HttpHost host, + final CredentialsProvider credentialsProvider, + final HttpContext context) throws AuthenticationException { + Args.notNull(host, "Auth host"); + Args.notNull(credentialsProvider, "CredentialsProvider"); + + final Credentials credentials = credentialsProvider.getCredentials( + new AuthScope(host, null, getName()), context); + + if (!(credentials instanceof BearerCredentials)) { + return false; + } + + this.token = ((BearerCredentials) credentials).getToken(); + return null != this.token; + } + + @Override + public Principal getPrincipal() { + return null; + } + + @Override + public String generateAuthResponse( + final HttpHost host, + final HttpRequest request, + final HttpContext context) throws AuthenticationException { + Asserts.notNull(this.token, "Bearer token"); + return "Bearer " + this.token; + } + + @Override + public String toString() { + return getName() + this.paramMap; + } +} diff --git a/core/src/main/java/org/apache/calcite/avatica/remote/BearerSchemeFactory.java b/core/src/main/java/org/apache/calcite/avatica/remote/BearerSchemeFactory.java new file mode 100644 index 0000000000..ec634faa0e --- /dev/null +++ b/core/src/main/java/org/apache/calcite/avatica/remote/BearerSchemeFactory.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.avatica.remote; + +import org.apache.hc.client5.http.auth.AuthScheme; +import org.apache.hc.client5.http.auth.AuthSchemeFactory; +import org.apache.hc.core5.annotation.Contract; +import org.apache.hc.core5.annotation.ThreadingBehavior; +import org.apache.hc.core5.http.protocol.HttpContext; + +@Contract(threading = ThreadingBehavior.STATELESS) +public class BearerSchemeFactory implements AuthSchemeFactory { + public static final BearerSchemeFactory INSTANCE = new BearerSchemeFactory(); + + @Override + public AuthScheme create(final HttpContext context) { + return new BearerScheme(); + } + +} diff --git a/core/src/main/java/org/apache/calcite/avatica/remote/BearerTokenProvider.java b/core/src/main/java/org/apache/calcite/avatica/remote/BearerTokenProvider.java new file mode 100644 index 0000000000..ea291b2ee1 --- /dev/null +++ b/core/src/main/java/org/apache/calcite/avatica/remote/BearerTokenProvider.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.avatica.remote; + +import org.apache.calcite.avatica.ConnectionConfig; + +import java.io.IOException; + +public interface BearerTokenProvider { + + void init(ConnectionConfig config) throws IOException; + + String obtain(String username); +} diff --git a/core/src/main/java/org/apache/calcite/avatica/remote/FileBearerTokenProvider.java b/core/src/main/java/org/apache/calcite/avatica/remote/FileBearerTokenProvider.java new file mode 100644 index 0000000000..bfa02277b2 --- /dev/null +++ b/core/src/main/java/org/apache/calcite/avatica/remote/FileBearerTokenProvider.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.avatica.remote; + +import org.apache.calcite.avatica.BuiltInConnectionProperty; +import org.apache.calcite.avatica.ConnectionConfig; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardWatchEventKinds; +import java.nio.file.WatchEvent; +import java.util.HashMap; +import java.util.Map; +import java.util.Scanner; + +public class FileBearerTokenProvider implements BearerTokenProvider { + + private static final Logger LOG = LoggerFactory.getLogger(FileBearerTokenProvider.class); + + private final Map tokenMap = new HashMap<>(); + private String filename; + + @Override + public void init(ConnectionConfig config) throws IOException { + filename = config.tokenFile(); + if (filename == null || filename.trim().isEmpty()) { + throw new UnsupportedOperationException("Config option " + + BuiltInConnectionProperty.TOKEN_FILE + + " must be specified to use file based Token Provider"); + } + + reload(); + newFileChangeWatcher(filename).start(); + } + + @Override + public synchronized String obtain(String username) { + return tokenMap.get(username); + } + + private synchronized void reload() throws FileNotFoundException { + try (Scanner scanner = new Scanner(new File(filename), StandardCharsets.UTF_8.name())) { + tokenMap.clear(); + while (scanner.hasNextLine()) { + String line = scanner.nextLine(); + if (line.isEmpty()) { + LOG.warn("Skip empty line in: {}", filename); + continue; + } + String[] parts = line.split(","); + if (parts.length != 2 || parts[0].isEmpty() || parts[1].isEmpty()) { + LOG.warn("Skip invalid line in {}: {}", filename, line); + continue; + } + if (tokenMap.put(parts[0], parts[1]) != null) { + LOG.warn("Multiple tokens, latest takes precedence for user: {}", parts[0]); + } + } + LOG.info("OAuth Bearer tokens have been updated from file: {}", filename); + } catch (FileNotFoundException e) { + LOG.warn("File not found: {}", e.getMessage()); + } + } + + private FileChangeWatcher newFileChangeWatcher(String fileLocation) throws + IOException { + if (fileLocation == null || fileLocation.isEmpty()) { + return null; + } + final Path filePath = Paths.get(fileLocation).toAbsolutePath(); + Path parentPath = filePath.getParent(); + if (parentPath == null) { + throw new IOException( + "File path does not have a parent: " + filePath); + } + return new FileChangeWatcher( + parentPath, + watchEvent -> { + handleWatchEvent(filePath, watchEvent); + }); + } + + /** + * Handler for watch events that let us know a file we may care about has changed on disk. + * + * @param filePath the path to the file we are watching for changes. + * @param event the WatchEvent. + */ + private void handleWatchEvent(Path filePath, WatchEvent event) { + boolean shouldReload = false; + Path dirPath = filePath.getParent(); + if (event.kind().equals(StandardWatchEventKinds.OVERFLOW)) { + // If we get notified about possibly missed events, + // reload the key store / trust store just to be sure. + shouldReload = true; + } else if (event.kind().equals(StandardWatchEventKinds.ENTRY_MODIFY) + || event.kind().equals(StandardWatchEventKinds.ENTRY_CREATE)) { + Path eventFilePath = dirPath.resolve((Path) event.context()); + if (filePath.equals(eventFilePath)) { + shouldReload = true; + } + } + // Note: we don't care about delete events + if (shouldReload) { + if (LOG.isDebugEnabled()) { + LOG.debug("Attempting to reload tokens from file after receiving watch event: " + + event.kind() + " with context: " + event.context()); + } + try { + reload(); + } catch (FileNotFoundException e) { + LOG.error("Error reloading tokens from file", e); + } + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Ignoring watch event and keeping previous tokens. Event kind: " + + event.kind() + " with context: " + event.context()); + } + } + } +} diff --git a/core/src/main/java/org/apache/calcite/avatica/remote/FileChangeWatcher.java b/core/src/main/java/org/apache/calcite/avatica/remote/FileChangeWatcher.java new file mode 100644 index 0000000000..cb8f4e006c --- /dev/null +++ b/core/src/main/java/org/apache/calcite/avatica/remote/FileChangeWatcher.java @@ -0,0 +1,251 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.avatica.remote; + +import org.apache.calcite.avatica.util.Unsafe; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.file.ClosedWatchServiceException; +import java.nio.file.FileSystem; +import java.nio.file.Path; +import java.nio.file.StandardWatchEventKinds; +import java.nio.file.WatchEvent; +import java.nio.file.WatchKey; +import java.nio.file.WatchService; +import java.util.function.Consumer; + +/** + * This file has been copied from the Apache ZooKeeper project. + * "https://github.com/apache/zookeeper/blob/8148f966947d3ecf3db0b756d93c9ffa88174af9 + * /zookeeper-server/src/main/java/org/apache/zookeeper/common/FileChangeWatcher.java" + * + * Instances of this class can be used to watch a directory for file changes. + * When a file is added to, deleted from, or is modified in the given directory, + * the callback provided by the user will be called from a background thread. + * Some things to keep in mind: + *
    + *
  • The callback should be thread-safe.
  • + *
  • Changes that happen around the time the thread is started may be missed.
  • + *
  • There is a delay between a file changing and the callback firing.
  • + *
  • The watch is not recursive - changes to subdirectories will not trigger a callback.
  • + *
+ */ +public final class FileChangeWatcher { + private static final Logger LOG = LoggerFactory.getLogger(FileChangeWatcher.class); + + public enum State { + NEW, // object created but start() not called yet + STARTING, // start() called but background thread has not entered main loop + RUNNING, // background thread is running + STOPPING, // stop() called but background thread has not exited main loop + STOPPED // stop() called and background thread has exited, or background thread crashed + } + + private final WatcherThread watcherThread; + private State state; // protected by synchronized(this) + + /** + * Creates a watcher that watches dirPath + * and invokes callback on changes. + * + * @param dirPath the directory to watch. + * @param callback the callback to invoke with events. + * event.kind() will return the type of event, and + * event.context() will return the filename relative to + * dirPath. + * @throws IOException if there is an error creating the WatchService. + */ + public FileChangeWatcher(Path dirPath, Consumer> callback) throws IOException { + FileSystem fs = dirPath.getFileSystem(); + WatchService watchService = fs.newWatchService(); + if (LOG.isDebugEnabled()) { + LOG.debug("Registering with watch service: " + dirPath); + } + dirPath.register(watchService, new WatchEvent.Kind[] { StandardWatchEventKinds.ENTRY_CREATE, + StandardWatchEventKinds.ENTRY_DELETE, StandardWatchEventKinds.ENTRY_MODIFY, + StandardWatchEventKinds.OVERFLOW }); + state = State.NEW; + this.watcherThread = new WatcherThread(watchService, callback); + this.watcherThread.setDaemon(true); + } + + /** + * Returns the current {@link FileChangeWatcher.State}. + * @return the current state. + */ + public synchronized State getState() { + return state; + } + + /** + * Blocks until the current state becomes desiredState. + * Currently only used by tests, thus package-private. + * @param desiredState the desired state. + * @throws InterruptedException if the current thread gets interrupted. + */ + synchronized void waitForState(State desiredState) throws InterruptedException { + while (this.state != desiredState) { + Unsafe.wait(this); + } + } + + /** + * Sets the state to newState. + * @param newState the new state. + */ + private synchronized void setState(State newState) { + state = newState; + Unsafe.notifyAll(this); + } + + /** + * Atomically sets the state to update if and only if the + * state is currently expected. + * @param expected the expected state. + * @param update the new state. + * @return true if the update succeeds, or false if the current state + * does not equal expected. + */ + private synchronized boolean compareAndSetState(State expected, State update) { + if (state == expected) { + setState(update); + return true; + } else { + return false; + } + } + + /** + * Atomically sets the state to update if and only if the + * state is currently one of expectedStates. + * @param expectedStates the expected states. + * @param update the new state. + * @return true if the update succeeds, or false if the current state + * does not equal any of the expectedStates. + */ + private synchronized boolean compareAndSetState(State[] expectedStates, State update) { + for (State expected : expectedStates) { + if (state == expected) { + setState(update); + return true; + } + } + return false; + } + + /** + * Tells the background thread to start. Does not wait for it to be running. + * Calling this method more than once has no effect. + */ + public void start() { + if (!compareAndSetState(State.NEW, State.STARTING)) { + // If previous state was not NEW, start() has already been called. + return; + } + this.watcherThread.start(); + } + + /** + * Tells the background thread to stop. Does not wait for it to exit. + */ + public void stop() { + if (compareAndSetState(new State[] { State.RUNNING, State.STARTING }, State.STOPPING)) { + watcherThread.interrupt(); + } + } + + /** + * Inner class that implements the watcher thread logic. + */ + private class WatcherThread extends Thread { + private static final String THREAD_NAME = "FileChangeWatcher"; + + final WatchService watchService; + final Consumer> callback; + + WatcherThread(WatchService watchService, Consumer> callback) { + super(THREAD_NAME); + this.watchService = watchService; + this.callback = callback; + } + + @Override public void run() { + try { + LOG.info(getName() + " thread started"); + if (!compareAndSetState(FileChangeWatcher.State.STARTING, + FileChangeWatcher.State.RUNNING)) { + // stop() called shortly after start(), before + // this thread started running. + FileChangeWatcher.State state = FileChangeWatcher.this.getState(); + if (state != FileChangeWatcher.State.STOPPING) { + throw new IllegalStateException("Unexpected state: " + state); + } + return; + } + runLoop(); + } catch (Exception e) { + LOG.warn("Error in runLoop()", e); + throw e; + } finally { + try { + watchService.close(); + } catch (IOException e) { + LOG.warn("Error closing watch service", e); + } + LOG.info(getName() + " thread finished"); + FileChangeWatcher.this.setState(FileChangeWatcher.State.STOPPED); + } + } + + private void runLoop() { + while (FileChangeWatcher.this.getState() == FileChangeWatcher.State.RUNNING) { + WatchKey key; + try { + key = watchService.take(); + } catch (InterruptedException | ClosedWatchServiceException e) { + if (LOG.isDebugEnabled()) { + LOG.debug(getName() + " was interrupted and is shutting down ..."); + } + break; + } + for (WatchEvent event : key.pollEvents()) { + if (LOG.isDebugEnabled()) { + LOG.debug( + "Got file changed event: " + event.kind() + " with context: " + event.context()); + } + try { + callback.accept(event); + } catch (Throwable e) { + LOG.error("Error from callback", e); + } + } + boolean isKeyValid = key.reset(); + if (!isKeyValid) { + // This is likely a problem, it means that file reloading is broken, probably because the + // directory we are watching was deleted or otherwise became inaccessible + // (unmounted, permissions changed, ???). + // For now, we log an error and exit the watcher thread. + LOG.error("Watch key no longer valid, maybe the directory is inaccessible?"); + break; + } + } + } + } +} diff --git a/core/src/main/java/org/apache/calcite/avatica/util/Unsafe.java b/core/src/main/java/org/apache/calcite/avatica/util/Unsafe.java index 906651d92e..32449b1a2f 100644 --- a/core/src/main/java/org/apache/calcite/avatica/util/Unsafe.java +++ b/core/src/main/java/org/apache/calcite/avatica/util/Unsafe.java @@ -45,6 +45,11 @@ public static void wait(Object o) throws InterruptedException { o.wait(); } + /** Calls {@link Object#wait(long timeout)}. */ + public static void wait(Object o, long timeout) throws InterruptedException { + o.wait(timeout); + } + /** Returns a {@link java.util.Calendar} with the local time zone and root * locale. */ public static Calendar localCalendar() { diff --git a/core/src/test/java/org/apache/calcite/avatica/remote/BearerSchemeTest.java b/core/src/test/java/org/apache/calcite/avatica/remote/BearerSchemeTest.java new file mode 100644 index 0000000000..645ff4392e --- /dev/null +++ b/core/src/test/java/org/apache/calcite/avatica/remote/BearerSchemeTest.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.avatica.remote; + +import org.apache.calcite.avatica.BuiltInConnectionProperty; +import org.apache.calcite.avatica.ConnectionConfig; +import org.apache.calcite.avatica.ConnectionConfigImpl; + +import org.apache.hc.client5.http.auth.AuthChallenge; +import org.apache.hc.client5.http.auth.AuthScheme; +import org.apache.hc.client5.http.auth.AuthScope; +import org.apache.hc.client5.http.auth.ChallengeType; +import org.apache.hc.client5.http.auth.CredentialsProvider; +import org.apache.hc.client5.http.impl.auth.CredentialsProviderBuilder; +import org.apache.hc.core5.http.HttpHost; +import org.apache.hc.core5.http.HttpRequest; +import org.apache.hc.core5.http.message.BasicHttpRequest; +import org.apache.hc.core5.http.message.BasicNameValuePair; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.*; +import java.nio.charset.StandardCharsets; +import java.util.Properties; + +import static org.junit.Assert.*; + + +/** + * Bearer authentication test cases. + * This file has been copied from the Apache HttpComponents Client project + * https://github.com/apache/httpcomponents-client/blob/master/ + * httpclient5/src/test/java/org/apache/hc/client5/http/impl/auth/TestBearerScheme.java + */ +public class BearerSchemeTest { + File tokensFile; + ConnectionConfig conf; + @Before + public void setup() throws IOException { + tokensFile = File.createTempFile("bearertoken_", ".txt"); + + try (Writer fileWriter = new OutputStreamWriter( + new FileOutputStream(tokensFile), StandardCharsets.UTF_8)) { + fileWriter.write("testUser,token1\n"); + } + + Properties props = new Properties(); + props.put(BuiltInConnectionProperty.TOKEN_FILE.camelName(), tokensFile.getAbsolutePath()); + conf = new ConnectionConfigImpl(props); + } + + @After + public void teardown() { + tokensFile.delete(); + } + + @Test + public void testBearerAuthenticationEmptyChallenge() throws Exception { + final AuthChallenge authChallenge = new AuthChallenge(ChallengeType.TARGET, "BEARER"); + final AuthScheme authscheme = new BearerScheme(); + authscheme.processChallenge(authChallenge, null); + assertNull(authscheme.getRealm()); + } + + @Test + public void testBearerAuthentication() throws Exception { + final AuthChallenge authChallenge = new AuthChallenge(ChallengeType.TARGET, "Bearer", + new BasicNameValuePair("realm", "test")); + + final AuthScheme authscheme = new BearerScheme(); + authscheme.processChallenge(authChallenge, null); + + final HttpHost host = new HttpHost("somehost", 80); + final FileBearerTokenProvider tokenProvider = new FileBearerTokenProvider(); + tokenProvider.init(conf); + final CredentialsProvider credentialsProvider = CredentialsProviderBuilder.create() + .add(new AuthScope(host, "test", null), + new BearerCredentials("testUser", tokenProvider)) + .build(); + + final HttpRequest request = new BasicHttpRequest("GET", "/"); + assertTrue(authscheme.isResponseReady(host, credentialsProvider, null)); + assertEquals("Bearer token1", authscheme.generateAuthResponse(host, request, null)); + + assertEquals("test", authscheme.getRealm()); + assertTrue(authscheme.isChallengeComplete()); + assertFalse(authscheme.isConnectionBased()); + } + + @Test + public void testNoTokenForUser() throws Exception { + final AuthChallenge authChallenge = new AuthChallenge(ChallengeType.TARGET, "Bearer", + new BasicNameValuePair("realm", "test")); + + final AuthScheme authscheme = new BearerScheme(); + authscheme.processChallenge(authChallenge, null); + + final HttpHost host = new HttpHost("somehost", 80); + final FileBearerTokenProvider tokenProvider = new FileBearerTokenProvider(); + tokenProvider.init(conf); + final CredentialsProvider credentialsProvider = CredentialsProviderBuilder.create() + .add(new AuthScope(host, "test", null), + new BearerCredentials("testUser2", tokenProvider)) + .build(); + + final HttpRequest request = new BasicHttpRequest("GET", "/"); + assertFalse(authscheme.isResponseReady(host, credentialsProvider, null)); + } + + @Test + public void testSerialization() throws Exception { + final AuthChallenge authChallenge = new AuthChallenge(ChallengeType.TARGET, "Bearer", + new BasicNameValuePair("realm", "test"), + new BasicNameValuePair("code", "read")); + + final AuthScheme authscheme = new BearerScheme(); + authscheme.processChallenge(authChallenge, null); + + final ByteArrayOutputStream buffer = new ByteArrayOutputStream(); + final ObjectOutputStream out = new ObjectOutputStream(buffer); + out.writeObject(authscheme); + out.flush(); + final byte[] raw = buffer.toByteArray(); + final ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(raw)); + final BearerScheme authscheme2 = (BearerScheme) in.readObject(); + + assertEquals(authscheme2.getName(), authscheme2.getName()); + assertEquals(authscheme2.getRealm(), authscheme2.getRealm()); + assertEquals(authscheme.isChallengeComplete(), authscheme2.isChallengeComplete()); + } + +} diff --git a/core/src/test/java/org/apache/calcite/avatica/remote/FileBearerTokenProviderTest.java b/core/src/test/java/org/apache/calcite/avatica/remote/FileBearerTokenProviderTest.java new file mode 100644 index 0000000000..3ae164d0ac --- /dev/null +++ b/core/src/test/java/org/apache/calcite/avatica/remote/FileBearerTokenProviderTest.java @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.avatica.remote; + +import org.apache.calcite.avatica.BuiltInConnectionProperty; +import org.apache.calcite.avatica.ConnectionConfig; +import org.apache.calcite.avatica.ConnectionConfigImpl; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.io.Writer; +import java.nio.charset.StandardCharsets; +import java.util.Objects; +import java.util.Properties; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.fail; + +public class FileBearerTokenProviderTest { + File tokensFile; + ConnectionConfig conf; + @Before + public void setup() throws IOException { + tokensFile = File.createTempFile("bearertoken_", ".txt"); + + Properties props = new Properties(); + props.put(BuiltInConnectionProperty.TOKEN_FILE.camelName(), tokensFile.getAbsolutePath()); + conf = new ConnectionConfigImpl(props); + } + + @After + public void teardown() { + tokensFile.delete(); + } + + @Test + public void testTokens() throws IOException { + // Arrange + try (Writer fileWriter = new OutputStreamWriter( + new FileOutputStream(tokensFile), StandardCharsets.UTF_8)) { + fileWriter.write("user1,token1\n"); + fileWriter.write("user2,token2\n"); + fileWriter.write("user3,token3\n"); + } + FileBearerTokenProvider tokenProvider = new FileBearerTokenProvider(); + tokenProvider.init(conf); + + // Act + String token1 = tokenProvider.obtain("user1"); + String token2 = tokenProvider.obtain("user2"); + String token3 = tokenProvider.obtain("user3"); + + // Assert + assertEquals("token1", token1); + assertEquals("token2", token2); + assertEquals("token3", token3); + } + + @Test + public void testInvalidLine() throws IOException { + // Arrange + try (Writer fileWriter = new OutputStreamWriter( + new FileOutputStream(tokensFile), StandardCharsets.UTF_8)) { + fileWriter.write("user1,token1\n"); + fileWriter.write("user2,,token2\n"); + fileWriter.write("user3\n"); + } + FileBearerTokenProvider tokenProvider = new FileBearerTokenProvider(); + tokenProvider.init(conf); + + // Act + String token1 = tokenProvider.obtain("user1"); + String token2 = tokenProvider.obtain("user2"); + String token3 = tokenProvider.obtain("user3"); + + // Assert + assertEquals("token1", token1); + assertNull(token2); + assertNull(token3); + } + + @Test + public void testEmptyLine() throws IOException { + // Arrange + try (Writer fileWriter = new OutputStreamWriter( + new FileOutputStream(tokensFile), StandardCharsets.UTF_8)) { + fileWriter.write("user1,token1\n"); + fileWriter.write("\n"); + fileWriter.write("user3,token3\n"); + } + FileBearerTokenProvider tokenProvider = new FileBearerTokenProvider(); + tokenProvider.init(conf); + + // Act + String token1 = tokenProvider.obtain("user1"); + String token2 = tokenProvider.obtain("user2"); + String token3 = tokenProvider.obtain("user3"); + + // Assert + assertEquals("token1", token1); + assertNull(token2); + assertEquals("token3", token3); + } + + @Test + public void testMultiple() throws IOException { + // Arrange + try (Writer fileWriter = new OutputStreamWriter( + new FileOutputStream(tokensFile), StandardCharsets.UTF_8)) { + fileWriter.write("user1,token1\n"); + fileWriter.write("user2,token2\n"); + fileWriter.write("user1,token3\n"); + } + FileBearerTokenProvider tokenProvider = new FileBearerTokenProvider(); + tokenProvider.init(conf); + + // Act + String token1 = tokenProvider.obtain("user1"); + String token2 = tokenProvider.obtain("user2"); + + // Assert + assertEquals("token3", token1); + assertEquals("token2", token2); + } + + @Test(expected = UnsupportedOperationException.class) + public void testMissingConfig() throws IOException { + FileBearerTokenProvider tokenProvider = new FileBearerTokenProvider(); + Properties props = new Properties(); + ConnectionConfig emptyConf = new ConnectionConfigImpl(props); + tokenProvider.init(emptyConf); + } + + @Test + public void testFileChanged() throws IOException, InterruptedException { + // Arrange + try (Writer fileWriter = new OutputStreamWriter( + new FileOutputStream(tokensFile), StandardCharsets.UTF_8)) { + fileWriter.write("user1,token1\n"); + } + FileBearerTokenProvider tokenProvider = new FileBearerTokenProvider(); + tokenProvider.init(conf); + + // Act & Assert + assertEquals("token1", tokenProvider.obtain("user1")); + assertNull(tokenProvider.obtain("user2")); + try (Writer fileWriter = new OutputStreamWriter( + new FileOutputStream(tokensFile), StandardCharsets.UTF_8)) { + fileWriter.write("user2,token2\n"); + fileWriter.write("user1,token3\n"); + } + boolean success; + int attempts = 0; + do { + success = Objects.equals(tokenProvider.obtain("user1"), "token3") + && Objects.equals(tokenProvider.obtain("user2"), "token2"); + ++attempts; + Thread.sleep(1000); + } while (attempts < 5 && !success); + if (!success) { + fail("Tokens have not been reloaded from the file that we changed"); + } + } + + @Test + public void testMissingFile() throws IOException { + // Arrange + tokensFile.delete(); + FileBearerTokenProvider tokenProvider = new FileBearerTokenProvider(); + tokenProvider.init(conf); + + // Act + String token1 = tokenProvider.obtain("user1"); + + // Assert + assertNull(token1); + } + + @Test + public void testDelayedFileCreation() throws IOException, InterruptedException { + // Arrange + tokensFile.delete(); + FileBearerTokenProvider tokenProvider = new FileBearerTokenProvider(); + tokenProvider.init(conf); + + // Act & Assert + assertNull(tokenProvider.obtain("user1")); + tokensFile = new File(tokensFile.getAbsolutePath()); + tokensFile.deleteOnExit(); + try (Writer fileWriter = new OutputStreamWriter( + new FileOutputStream(tokensFile), StandardCharsets.UTF_8)) { + fileWriter.write("user1,token1\n"); + } + boolean success; + int attempts = 0; + do { + success = Objects.equals(tokenProvider.obtain("user1"), "token1"); + ++attempts; + Thread.sleep(1000); + } while (attempts < 5 && !success); + if (!success) { + fail("Token has not been reloaded from the file that we created"); + } + } +} diff --git a/core/src/test/java/org/apache/calcite/avatica/remote/FileChangeWatcherTest.java b/core/src/test/java/org/apache/calcite/avatica/remote/FileChangeWatcherTest.java new file mode 100644 index 0000000000..44fb00fb65 --- /dev/null +++ b/core/src/test/java/org/apache/calcite/avatica/remote/FileChangeWatcherTest.java @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.avatica.remote; + +import org.apache.calcite.avatica.util.Unsafe; + +import org.apache.commons.io.FileUtils; + +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.StandardWatchEventKinds; +import java.nio.file.WatchEvent; +import java.nio.file.attribute.PosixFilePermissions; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/** + * This file has been copied from the Apache ZooKeeper project. + * "https://github.com/apache/zookeeper/blob/c42c8c94085ed1d94a22158fbdfe2945118a82bc + * /zookeeper-server/src/test/java/org/apache/zookeeper/common/FileChangeWatcherTest.java" + */ +public class FileChangeWatcherTest { + private static File tempDir; + private static File tempFile; + + private static final Logger LOG = LoggerFactory.getLogger(FileChangeWatcherTest.class); + + @BeforeClass public static void createTempFile() throws IOException { + tempDir = Files.createTempDirectory("jwt_test_", + PosixFilePermissions.asFileAttribute( + PosixFilePermissions.fromString("rwx------"))).toFile(); + tempDir.deleteOnExit(); + tempFile = File.createTempFile("jwt_testfile_", "", tempDir); + tempFile.deleteOnExit(); + } + + @Test public void testCallbackWorksOnFileChanges() throws IOException, InterruptedException { + FileChangeWatcher watcher = null; + try { + final List> events = new ArrayList<>(); + watcher = new FileChangeWatcher(tempDir.toPath(), event -> { + LOG.info("Got an update: " + event.kind() + " " + event.context()); + // Filter out the extra ENTRY_CREATE events that are + // sometimes seen at the start. Even though we create the watcher + // after the file exists, sometimes we still get a create event. + if (StandardWatchEventKinds.ENTRY_CREATE.equals(event.kind())) { + return; + } + synchronized (events) { + events.add(event); + Unsafe.notifyAll(events); + } + }); + watcher.start(); + watcher.waitForState(FileChangeWatcher.State.RUNNING); + Thread.sleep(1000L); // XXX hack + for (int i = 0; i < 3; i++) { + LOG.info("Modifying file, attempt " + (i + 1)); + FileUtils.writeStringToFile(tempFile, "Hello world " + i + "\n", StandardCharsets.UTF_8, + true); + synchronized (events) { + if (events.size() < i + 1) { + Unsafe.wait(events, 3000L); + } + assertEquals("Wrong number of events", i + 1, events.size()); + WatchEvent event = events.get(i); + assertEquals(StandardWatchEventKinds.ENTRY_MODIFY, event.kind()); + assertEquals(tempFile.getName(), event.context().toString()); + } + } + } finally { + if (watcher != null) { + watcher.stop(); + watcher.waitForState(FileChangeWatcher.State.STOPPED); + } + } + } + + @Test public void testCallbackWorksOnFileTouched() throws IOException, InterruptedException { + FileChangeWatcher watcher = null; + try { + final List> events = new ArrayList<>(); + watcher = new FileChangeWatcher(tempDir.toPath(), event -> { + LOG.info("Got an update: " + event.kind() + " " + event.context()); + // Filter out the extra ENTRY_CREATE events that are + // sometimes seen at the start. Even though we create the watcher + // after the file exists, sometimes we still get a create event. + if (StandardWatchEventKinds.ENTRY_CREATE.equals(event.kind())) { + return; + } + synchronized (events) { + events.add(event); + Unsafe.notifyAll(events); + } + }); + watcher.start(); + watcher.waitForState(FileChangeWatcher.State.RUNNING); + Thread.sleep(1000L); // XXX hack + LOG.info("Touching file"); + FileUtils.touch(tempFile); + synchronized (events) { + if (events.isEmpty()) { + Unsafe.wait(events, 3000L); + } + assertFalse(events.isEmpty()); + WatchEvent event = events.get(0); + assertEquals(StandardWatchEventKinds.ENTRY_MODIFY, event.kind()); + assertEquals(tempFile.getName(), event.context().toString()); + } + } finally { + if (watcher != null) { + watcher.stop(); + watcher.waitForState(FileChangeWatcher.State.STOPPED); + } + } + } + + @Test public void testCallbackWorksOnFileAdded() throws IOException, InterruptedException { + FileChangeWatcher watcher = null; + try { + final List> events = new ArrayList<>(); + watcher = new FileChangeWatcher(tempDir.toPath(), event -> { + LOG.info("Got an update: " + event.kind() + " " + event.context()); + synchronized (events) { + events.add(event); + Unsafe.notifyAll(events); + } + }); + watcher.start(); + watcher.waitForState(FileChangeWatcher.State.RUNNING); + Thread.sleep(1000L); // XXX hack + File tempFile2 = File.createTempFile("jwt_testfile_", "", tempDir); + tempFile2.deleteOnExit(); + synchronized (events) { + if (events.isEmpty()) { + Unsafe.wait(events, 3000L); + } + assertFalse(events.isEmpty()); + WatchEvent event = events.get(0); + assertEquals(StandardWatchEventKinds.ENTRY_CREATE, event.kind()); + assertEquals(tempFile2.getName(), event.context().toString()); + } + } finally { + if (watcher != null) { + watcher.stop(); + watcher.waitForState(FileChangeWatcher.State.STOPPED); + } + } + } + + @Test public void testCallbackWorksOnFileDeleted() throws IOException, InterruptedException { + FileChangeWatcher watcher = null; + try { + final List> events = new ArrayList<>(); + watcher = new FileChangeWatcher(tempDir.toPath(), event -> { + LOG.info("Got an update: " + event.kind() + " " + event.context()); + // Filter out the extra ENTRY_CREATE events that are + // sometimes seen at the start. Even though we create the watcher + // after the file exists, sometimes we still get a create event. + if (StandardWatchEventKinds.ENTRY_CREATE.equals(event.kind())) { + return; + } + synchronized (events) { + events.add(event); + Unsafe.notifyAll(events); + } + }); + watcher.start(); + watcher.waitForState(FileChangeWatcher.State.RUNNING); + Thread.sleep(1000L); // XXX hack + tempFile.delete(); + synchronized (events) { + if (events.isEmpty()) { + Unsafe.wait(events, 3000L); + } + assertFalse(events.isEmpty()); + WatchEvent event = events.get(0); + assertEquals(StandardWatchEventKinds.ENTRY_DELETE, event.kind()); + assertEquals(tempFile.getName(), event.context().toString()); + } + } finally { + if (watcher != null) { + watcher.stop(); + watcher.waitForState(FileChangeWatcher.State.STOPPED); + } + } + } + + @Test public void testCallbackErrorDoesNotCrashWatcherThread() + throws IOException, InterruptedException { + FileChangeWatcher watcher = null; + try { + final AtomicInteger callCount = new AtomicInteger(0); + watcher = new FileChangeWatcher(tempDir.toPath(), event -> { + LOG.info("Got an update: " + event.kind() + " " + event.context()); + int oldValue; + synchronized (callCount) { + oldValue = callCount.getAndIncrement(); + Unsafe.notifyAll(callCount); + } + if (oldValue == 0) { + throw new RuntimeException("This error should not crash the watcher thread"); + } + }); + watcher.start(); + watcher.waitForState(FileChangeWatcher.State.RUNNING); + Thread.sleep(1000L); // XXX hack + LOG.info("Modifying file"); + FileUtils.writeStringToFile(tempFile, "Hello world\n", StandardCharsets.UTF_8, true); + synchronized (callCount) { + while (callCount.get() == 0) { + Unsafe.wait(callCount, 3000L); + } + } + LOG.info("Modifying file again"); + FileUtils.writeStringToFile(tempFile, "Hello world again\n", StandardCharsets.UTF_8, true); + synchronized (callCount) { + if (callCount.get() == 1) { + Unsafe.wait(callCount, 3000L); + } + } + // The value of callCount can exceed 1 only if the callback thread + // survives the exception thrown by the first callback. + assertTrue(callCount.get() > 1); + } finally { + if (watcher != null) { + watcher.stop(); + watcher.waitForState(FileChangeWatcher.State.STOPPED); + } + } + } +} diff --git a/gradle.properties b/gradle.properties index 6b59c1c6e3..eaef1980ff 100644 --- a/gradle.properties +++ b/gradle.properties @@ -76,3 +76,4 @@ protobuf.version=3.21.9 scott-data-hsqldb.version=0.1 servlet.version=4.0.1 slf4j.version=1.7.25 +commons-io.version=2.15.0