Skip to content

Commit

Permalink
Merge pull request #7 from trocco-io/feature/oauth-m2m
Browse files Browse the repository at this point in the history
Implementation of oauth-m2m authentication
  • Loading branch information
yu-kioo authored Dec 26, 2024
2 parents 91fa488 + 3e37a90 commit fa39bfc
Show file tree
Hide file tree
Showing 14 changed files with 269 additions and 65 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ jobs:
NON_ASCII_CATALOG_NAME: ${{ vars.TEST_NON_ASCII_CATALOG_NAME }}
NON_ASCII_SCHEMA_NAME: ${{ vars.TEST_NON_ASCII_SCHEMA_NAME }}
STAGING_VOLUME_NAME_PREFIX: ${{ vars.TEST_STAGING_VOLUME_NAME_PREFIX }}
OAUTH2_CLIENT_ID: ${{ vars.TEST_OAUTH2_CLIENT_ID }}
OAUTH2_CLIENT_SECRET: ${{ secrets.TEST_OAUTH2_CLIENT_SECRET }}
- run: ./gradlew test
env:
EMBULK_OUTPUT_DATABRICKS_TEST_CONFIG: "./test-config.yml"
Expand Down
7 changes: 6 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,12 @@ Databricks output plugin for Embulk loads records to Databricks Delta Table.
- **product_version**: product version of user agent (string, default: "0.0.0")
- **server_hostname**: The Databricks compute resource’s Server Hostname value, see [Compute settings for the Databricks JDBC Driver](https://docs.databricks.com/en/integrations/jdbc/compute.html). (string, required)
- **http_path**: The Databricks compute resource’s HTTP Path value, see [Compute settings for the Databricks JDBC Driver](https://docs.databricks.com/en/integrations/jdbc/compute.html). (string, required)
- **personal_access_token**: The Databaricks personal_access_token, see [Authentication settings for the Databricks JDBC Driver](https://docs.databricks.com/en/integrations/jdbc/authentication.html#authentication-pat). (string, required)
- **auth_type**: The Databricks authentication type, personal access token (PAT)-based or machine-to-machine (M2M) authentication. (`pat`, `oauth-m2m`, default: `pat`)
- If **auth_type** is `pat`,
- **personal_access_token**: The Databricks personal_access_token, see [Authentication settings for the Databricks JDBC Driver](https://docs.databricks.com/en/integrations/jdbc/authentication.html#authentication-pat). (string, required)
- If **auth_type** is `oauth-m2m`,
- **oauth2_client_id**: The Databricks oauth2_client_id, see [Use a service principal to authenticate with Databricks](https://docs.databricks.com/en/dev-tools/auth/oauth-m2m.html). (string, required)
- **oauth2_client_secret**: The Databricks oauth2_client_secret, see [Use a service principal to authenticate with Databricks](https://docs.databricks.com/en/dev-tools/auth/oauth-m2m.html). (string, required)
- **catalog_name**: destination catalog name (string, required)
- **schema_name**: destination schema name (string, required)
- **table**: destination table name (string, required)
Expand Down
63 changes: 15 additions & 48 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ plugins {
id "com.palantir.git-version" version "0.12.3"
id "com.diffplug.spotless" version "5.15.0"
id "com.adarshr.test-logger" version "3.0.0"
id "com.github.johnrengelman.shadow" version "6.0.0" apply false
}
repositories {
mavenCentral()
Expand All @@ -26,30 +27,20 @@ version = {
}()

dependencies {
compileOnly "org.embulk:embulk-api:0.10.31"
compileOnly "org.embulk:embulk-spi:0.10.31"
def embulkVersion = "0.10.31"
compileOnly("org.embulk:embulk-api:${embulkVersion}")
compileOnly("org.embulk:embulk-spi:${embulkVersion}")

compile("org.embulk:embulk-util-config:0.3.0") {
// They conflict with embulk-core. They are once excluded here,
// and added explicitly with versions exactly the same with embulk-core:0.10.19.
exclude group: "com.fasterxml.jackson.core", module: "jackson-annotations"
exclude group: "com.fasterxml.jackson.core", module: "jackson-core"
def jdbcVersion = "0.10.5"
compile("org.embulk:embulk-output-jdbc:$jdbcVersion")
compile("org.embulk:embulk-output-postgresql:$jdbcVersion")

compile project(path: ":shadow-databricks-jdbc", configuration: "shadow")
compile("com.databricks:databricks-sdk-java:0.20.0") {
exclude group: "org.slf4j", module: "slf4j-api"
exclude group: "com.fasterxml.jackson.core", module: "jackson-databind"
exclude group: "com.fasterxml.jackson.datatype", module: "jackson-datatype-jdk8"
exclude group: "javax.validation", module: "validation-api"
}

// They are once excluded from transitive dependencies of other dependencies,
// and added explicitly with versions exactly the same with embulk-core:0.10.19.
compile "com.fasterxml.jackson.core:jackson-annotations:2.6.7"
compile "com.fasterxml.jackson.core:jackson-core:2.6.7"
compile "com.fasterxml.jackson.core:jackson-databind:2.6.7"
compile "com.fasterxml.jackson.datatype:jackson-datatype-jdk8:2.6.7"
compile "javax.validation:validation-api:1.1.0.Final"

compile("org.embulk:embulk-util-json:0.1.0") {
exclude group: "org.msgpack", module: "msgpack-core" // Included in embulk-api.
}
testImplementation "junit:junit:4.+"
testImplementation "org.embulk:embulk-junit4:0.10.31"
testImplementation "org.embulk:embulk-core:0.10.31"
Expand All @@ -58,45 +49,19 @@ dependencies {
testImplementation "org.embulk:embulk-input-file:0.10.31"
testImplementation "org.embulk:embulk-parser-csv:0.10.31"

compile "org.embulk:embulk-output-jdbc:0.10.5"
compile "org.embulk:embulk-output-postgresql:0.10.5"
compile 'com.databricks:databricks-jdbc:2.6.36'
compile("com.databricks:databricks-sdk-java:0.20.0") {
exclude group: "org.slf4j", module: "slf4j-api"
exclude group: "com.fasterxml.jackson.core", module: "jackson-annotations"
exclude group: "com.fasterxml.jackson.core", module: "jackson-core"
exclude group: "com.fasterxml.jackson.core", module: "jackson-databind"
exclude group: "com.fasterxml.jackson.datatype", module: "jackson-datatype-jdk8"
exclude group: "javax.validation", module: "validation-api"
}

// Suppress the following logs in gradlew test.
// SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
// SLF4J: Defaulting to no-operation (NOP) logger implementation
// SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
testImplementation("org.slf4j:slf4j-simple:1.7.30")
}

embulkPlugin {
mainClass = "org.embulk.output.DatabricksOutputPlugin"
category = "output"
type = "databricks"
}
// This Gradle plugin's POM dependency modification works for "maven-publish" tasks.
//
// Note that "uploadArchives" is no longer supported. It is deprecated in Gradle 6 to be removed in Gradle 7.
// https://github.com/gradle/gradle/issues/3003#issuecomment-495025844
publishing {
publications {
embulkPluginMaven(MavenPublication) { // Publish it with "publishEmbulkPluginMavenPublicationToMavenRepository".
from components.java // Must be "components.java". The dependency modification works only for it.
}
}
repositories {
maven {
url = "${project.buildDir}/mavenPublishLocal"
}
}
}

gem {
from("LICENSE.txt")
authors = [ "" ]
Expand All @@ -105,9 +70,11 @@ gem {
homepage = "https://github.com/trocco-io/embulk-output-databricks"
licenses = [ "MIT" ]
}

test {
maxParallelForks 1
}

spotless {
java {
importOrder()
Expand Down
2 changes: 2 additions & 0 deletions ci/config_template.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
server_hostname: "$SERVER_HOSTNAME"
http_path: "$HTTP_PATH"
personal_access_token: "$PERSONAL_ACCESS_TOKEN"
oauth2_client_id: "$OAUTH2_CLIENT_ID"
oauth2_client_secret: "$OAUTH2_CLIENT_SECRET"
catalog_name: "$CATALOG_NAME"
schema_name: "$SCHEMA_NAME"
table_prefix: "$TABLE_PREFIX"
Expand Down
2 changes: 2 additions & 0 deletions example/test.yml.example
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
server_hostname:
http_path:
personal_access_token:
oauth2_client_id:
oauth2_client_secret:
catalog_name:
schema_name:
non_ascii_schema_name:
Expand Down
1 change: 0 additions & 1 deletion gradle/dependency-locks/embulkPluginRuntime.lockfile
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
# This is a Gradle generated file for dependency locking.
# Manual edits can break the build and are not advised.
# This file is expected to be part of source control.
com.databricks:databricks-jdbc:2.6.36
com.databricks:databricks-sdk-java:0.20.0
com.fasterxml.jackson.core:jackson-annotations:2.6.7
com.fasterxml.jackson.core:jackson-core:2.6.7
Expand Down
1 change: 1 addition & 0 deletions settings.gradle
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
rootProject.name = 'embulk-output-databricks'
include "shadow-databricks-jdbc"
36 changes: 36 additions & 0 deletions shadow-databricks-jdbc/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
apply plugin: "java"
apply plugin: "com.github.johnrengelman.shadow"

repositories {
mavenCentral()
}

group = "io.trocco"
version = "${rootProject.version}"
description = "A helper library for embulk-output-databricks"

sourceCompatibility = 1.8
targetCompatibility = 1.8

configurations {
runtimeClasspath {
resolutionStrategy.activateDependencyLocking()
}
shadow {
resolutionStrategy.activateDependencyLocking()
transitive = false
}
}

dependencies {
compile('com.databricks:databricks-jdbc:2.6.38')
}

shadowJar {
// suppress the following undesirable log (https://stackoverflow.com/a/61475766/24393181)
//
// ERROR StatusLogger Unrecognized format specifier [d]
// ERROR StatusLogger Unrecognized conversion specifier [d] starting at position 16 in conversion pattern.
// ...
exclude "**/Log4j2Plugins.dat"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# This is a Gradle generated file for dependency locking.
# Manual edits can break the build and are not advised.
# This file is expected to be part of source control.
com.databricks:databricks-jdbc:2.6.38
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# This is a Gradle generated file for dependency locking.
# Manual edits can break the build and are not advised.
# This file is expected to be part of source control.
57 changes: 52 additions & 5 deletions src/main/java/org/embulk/output/DatabricksOutputPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import java.sql.SQLException;
import java.util.*;
import org.embulk.config.ConfigDiff;
import org.embulk.config.ConfigException;
import org.embulk.config.ConfigSource;
import org.embulk.output.databricks.DatabricksAPIClient;
import org.embulk.output.databricks.DatabricksCopyBatchInsert;
Expand Down Expand Up @@ -31,8 +32,21 @@ public interface DatabricksPluginTask extends PluginTask {
@Config("http_path")
public String getHTTPPath();

@Config("auth_type")
@ConfigDefault("\"pat\"") // oauth-m2m or pat
public String getAuthType();

@Config("personal_access_token")
public String getPersonalAccessToken();
@ConfigDefault("null")
public Optional<String> getPersonalAccessToken();

@Config("oauth2_client_id")
@ConfigDefault("null")
public Optional<String> getOauth2ClientId();

@Config("oauth2_client_secret")
@ConfigDefault("null")
public Optional<String> getOauth2ClientSecret();

@Config("catalog_name")
public String getCatalogName();
Expand Down Expand Up @@ -65,6 +79,25 @@ public interface UserAgentEntry extends Task {
@ConfigDefault("\"0.0.0\"")
public String getProductVersion();
}

static String fetchPersonalAccessToken(DatabricksPluginTask t) {
return validatePresence(t.getPersonalAccessToken(), "personal_access_token");
}

static String fetchOauth2ClientId(DatabricksPluginTask t) {
return validatePresence(t.getOauth2ClientId(), "oauth2_client_id");
}

static String fetchOauth2ClientSecret(DatabricksPluginTask t) {
return validatePresence(t.getOauth2ClientSecret(), "oauth2_client_secret");
}
}

static <T> T validatePresence(Optional<T> val, String varName) {
if (val.isPresent()) {
return val.get();
}
throw new ConfigException(String.format("%s must not be null.", varName));
}

@Override
Expand Down Expand Up @@ -98,9 +131,22 @@ protected JdbcOutputConnector getConnector(PluginTask task, boolean retryableMet
String url = String.format("jdbc:databricks://%s:443", t.getServerHostname());
Properties props = new java.util.Properties();
props.put("httpPath", t.getHTTPPath());
props.put("AuthMech", "3");
props.put("UID", "token");
props.put("PWD", t.getPersonalAccessToken());
String authType = t.getAuthType();
switch (authType) {
case "pat":
props.put("AuthMech", "3");
props.put("UID", "token");
props.put("PWD", DatabricksPluginTask.fetchPersonalAccessToken(t));
break;
case "oauth-m2m":
props.put("AuthMech", "11");
props.put("Auth_Flow", "1");
props.put("OAuth2ClientId", DatabricksPluginTask.fetchOauth2ClientId(t));
props.put("OAuth2Secret", DatabricksPluginTask.fetchOauth2ClientSecret(t));
break;
default:
throw new ConfigException(String.format("unknown auth_type '%s'", authType));
}
props.put("SSL", "1");
props.put("ConnCatalog", t.getCatalogName());
props.put("ConnSchema", t.getSchemaName());
Expand Down Expand Up @@ -160,10 +206,11 @@ protected BatchInsert newBatchInsert(PluginTask task, Optional<MergeConfig> merg

@Override
protected void logConnectionProperties(String url, Properties props) {
List<String> maskedKeys = Arrays.asList("PWD", "OAuth2Secret");
Properties maskedProps = new Properties();
for (Object keyObj : props.keySet()) {
String key = (String) keyObj;
String maskedVal = key.equals("PWD") ? "***" : props.getProperty(key);
String maskedVal = maskedKeys.contains(key) ? "***" : props.getProperty(key);
maskedProps.setProperty(key, maskedVal);
}
super.logConnectionProperties(url, maskedProps);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,17 @@
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.UUID;
import org.embulk.output.DatabricksOutputPlugin;
import org.embulk.config.ConfigException;
import org.embulk.output.DatabricksOutputPlugin.DatabricksPluginTask;

public class DatabricksAPIClient {
public static DatabricksAPIClient create(DatabricksOutputPlugin.DatabricksPluginTask task) {
public static DatabricksAPIClient create(DatabricksPluginTask task) {
setUserAgent(task);

return new DatabricksAPIClient(createDatabricksConfig(task));
}

private static void setUserAgent(DatabricksOutputPlugin.DatabricksPluginTask task) {
private static void setUserAgent(DatabricksPluginTask task) {
String name = task.getUserAgentEntry().getProductName();
String version = task.getUserAgentEntry().getProductVersion();

Expand Down Expand Up @@ -51,11 +52,22 @@ public void deleteFile(String filePath) {
workspaceClient.files().delete(filePath);
}

public static DatabricksConfig createDatabricksConfig(
DatabricksOutputPlugin.DatabricksPluginTask task) {
return new DatabricksConfig()
.setHost(task.getServerHostname())
.setToken(task.getPersonalAccessToken());
public static DatabricksConfig createDatabricksConfig(DatabricksPluginTask task) {
DatabricksConfig config = new DatabricksConfig().setHost(task.getServerHostname());
String authType = task.getAuthType();
config.setAuthType(authType);
switch (authType) {
case "pat":
config.setToken(DatabricksPluginTask.fetchPersonalAccessToken(task));
break;
case "oauth-m2m":
config.setClientId(DatabricksPluginTask.fetchOauth2ClientId(task));
config.setClientSecret(DatabricksPluginTask.fetchOauth2ClientSecret(task));
break;
default:
throw new ConfigException(String.format("unknown auth_type '%s'", authType));
}
return config;
}

public static String createFilePath(
Expand Down
Loading

0 comments on commit fa39bfc

Please sign in to comment.