Skip to content

Commit

Permalink
Merge pull request #6 from trocco-io/feature/azure_synapse_analytics
Browse files Browse the repository at this point in the history
Added Azure Synapse Analytics to supported products
  • Loading branch information
d-hrs authored Mar 13, 2024
2 parents 242db4d + db613c5 commit 31ac7b6
Show file tree
Hide file tree
Showing 9 changed files with 309 additions and 21 deletions.
36 changes: 36 additions & 0 deletions .github/workflows/gem-push.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
name: Ruby Gem

on:
workflow_dispatch:
push:
tags:
- '*'

jobs:
build:
name: Build + Publish
runs-on: ubuntu-latest
permissions:
packages: write
contents: read
strategy:
matrix:
type:
- jdbc
- mysql
- postgresql
- redshift
- sqlserver
steps:
- uses: actions/checkout@v2
- name: Set up Ruby 2.7
uses: ruby/setup-ruby@v1
with:
ruby-version: 2.7
- name: push gem
uses: trocco-io/push-gem-to-gpr-action@v2
with:
language: java
gem-path: "embulk-output-${{ matrix.type }}/build/gems/*.gem"
github-token: "${{ secrets.GITHUB_TOKEN }}"
gradle-subproject: "embulk-output-${{ matrix.type }}"
7 changes: 7 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ plugins {
id "signing"
id 'checkstyle'
id "org.embulk.embulk-plugins" version "0.6.2" apply false
id "com.palantir.git-version" version "3.0.0"
}

allprojects {
Expand All @@ -15,11 +16,16 @@ allprojects {
description = "Inserts or updates records to a table."
}

ext {
troccoVersion = "0.0.1"
}

subprojects {
apply plugin: 'java'
apply plugin: "maven-publish"
apply plugin: "signing"
apply plugin: "org.embulk.embulk-plugins"
apply plugin: 'com.palantir.git-version'
//apply plugin: 'jacoco'

repositories {
Expand Down Expand Up @@ -106,6 +112,7 @@ subprojects {
summary = "JDBC output plugin for Embulk"
homepage = "https://github.com/embulk/embulk-output-jdbc"
licenses = [ "Apache-2.0" ]
archiveVersion = "${project.version}.trocco.${project.troccoVersion}"

into("default_jdbc_driver") {
from configurations.defaultJdbcDriver
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,7 @@ public enum Mode {
MERGE,
MERGE_DIRECT,
TRUNCATE_INSERT,
UPDATE_INSERT,
REPLACE;

@JsonValue
Expand All @@ -381,6 +382,8 @@ public static Mode fromString(String value)
return MERGE_DIRECT;
case "truncate_insert":
return TRUNCATE_INSERT;
case "update_insert":
return UPDATE_INSERT;
case "replace":
return REPLACE;
default:
Expand All @@ -401,15 +404,15 @@ public boolean isDirectModify()
*/
public boolean isMerge()
{
return this == MERGE || this == MERGE_DIRECT;
return this == MERGE || this == MERGE_DIRECT || this == UPDATE_INSERT;
}

/**
* True if this mode creates intermediate table for each tasks.
*/
public boolean tempTablePerTask()
{
return this == INSERT || this == MERGE || this == TRUNCATE_INSERT /*this == REPLACE_VIEW*/;
return this == INSERT || this == MERGE || this == TRUNCATE_INSERT || this == UPDATE_INSERT /*this == REPLACE_VIEW*/;
}

/**
Expand Down Expand Up @@ -896,6 +899,16 @@ protected void doCommit(JdbcOutputConnection con, PluginTask task, int taskCount
con.collectInsert(task.getIntermediateTables().get(), schema, task.getActualTable(), true, task.getBeforeLoad(), task.getAfterLoad());
break;

case UPDATE_INSERT:
// aggregate update & insert into target
if (task.getNewTableSchema().isPresent()) {
con.createTableIfNotExists(task.getActualTable(), task.getNewTableSchema().get(),
task.getCreateTableConstraint(), task.getCreateTableOption());
}
con.collectUpdateInsert(task.getIntermediateTables().get(), schema, task.getActualTable(),
new MergeConfig(task.getMergeKeys().get(), task.getMergeRule()), task.getBeforeLoad(), task.getAfterLoad());
break;

case MERGE:
// aggregate merge into target
if (task.getNewTableSchema().isPresent()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,46 @@ protected String buildCollectInsertSql(List<TableIdentifier> fromTables, JdbcSch
return sb.toString();
}

protected void collectUpdateInsert(List<TableIdentifier> fromTables, JdbcSchema schema, TableIdentifier toTable,
MergeConfig mergeConfig, Optional<String> preSql, Optional<String> postSql) throws SQLException
{
if (fromTables.isEmpty()) {
return;
}

Statement stmt = connection.createStatement();
try {
if (preSql.isPresent()) {
execute(stmt, preSql.get());
}

executeUpdate(stmt, buildCollectUpdateSql(fromTables, schema, toTable, mergeConfig));
executeUpdate(stmt, buildCollectInsertSql(fromTables, schema, toTable, mergeConfig));

if (postSql.isPresent()) {
execute(stmt, postSql.get());
}

commitIfNecessary(connection);
} catch (SQLException ex) {
throw safeRollback(connection, ex);
} finally {
stmt.close();
}
}

protected String buildCollectUpdateSql(List<TableIdentifier> fromTables, JdbcSchema schema, TableIdentifier toTable,
MergeConfig mergeConfig) throws SQLException
{
throw new UnsupportedOperationException("not implemented");
}

protected String buildCollectInsertSql(List<TableIdentifier> fromTables, JdbcSchema schema, TableIdentifier toTable,
MergeConfig mergeConfig) throws SQLException
{
throw new UnsupportedOperationException("not implemented");
}

protected void collectMerge(List<TableIdentifier> fromTables, JdbcSchema schema, TableIdentifier toTable, MergeConfig mergeConfig,
Optional<String> preSql, Optional<String> postSql) throws SQLException
{
Expand Down
4 changes: 4 additions & 0 deletions embulk-output-sqlserver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ embulk "-J-Djava.library.path=C:\drivers" run input-sqlserver.yml
* Behavior: Same with `insert` mode excepting that it truncates the target table (with SQL `DELETE FROM`, not `TRUNCATE`) right before the last `INSERT ...` query.
* Transactional: Yes.
* Resumable: No.
* **update_insert**:
* Behavior: Same with `merge` mode excepting that it runs separated `UPDATE` and `INSERT` statements instead of `MERGE` statement.
* Transactional: Yes.
* Resumable: No.
* **replace**:
* Behavior: This mode writes rows to an intermediate table first. If all those tasks run correctly, drops the target table and alters the name of the intermediate table into the target table name.
* Transactional: No. If fails, the target table could be dropped (because SQL Server can't rollback DDL).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.embulk.output.jdbc.setter.ColumnSetterFactory;
import org.embulk.output.sqlserver.InsertMethod;
import org.embulk.output.sqlserver.NativeBatchInsert;
import org.embulk.output.sqlserver.Product;
import org.embulk.output.sqlserver.SQLServerOutputConnector;
import org.embulk.output.sqlserver.setter.SQLServerColumnSetterFactory;
import org.embulk.util.config.Config;
Expand Down Expand Up @@ -103,6 +104,14 @@ public interface SQLServerPluginTask
@Config("socket_timeout")
@ConfigDefault("null")
public Optional<Integer> getSocketTimeout();

@Config("product")
@ConfigDefault("\"sql_server\"")
public Product getProduct();

@Config("host_name_in_certificate")
@ConfigDefault("null")
public Optional<String> getHostNameInCertificate();
}

private static class UrlAndProperties {
Expand Down Expand Up @@ -138,7 +147,7 @@ protected Features getFeatures(PluginTask task)
return new Features()
.setMaxTableNameLength(128)
.setSupportedModes(Collections.unmodifiableSet(new HashSet<Mode>(Arrays.asList(
Mode.INSERT, Mode.INSERT_DIRECT, Mode.MERGE, Mode.TRUNCATE_INSERT, Mode.REPLACE))))
Mode.INSERT, Mode.INSERT_DIRECT, Mode.MERGE, Mode.TRUNCATE_INSERT, Mode.UPDATE_INSERT, Mode.REPLACE))))
.setIgnoreMergeKeys(false);
}

Expand Down Expand Up @@ -175,7 +184,7 @@ protected JdbcOutputConnector getConnector(PluginTask task, boolean retryableMet
UrlAndProperties urlProps = getUrlAndProperties(sqlServerTask, useJtdsDriver);
logConnectionProperties(urlProps.getUrl(), urlProps.getProps());
return new SQLServerOutputConnector(urlProps.getUrl(), urlProps.getProps(), sqlServerTask.getSchema().orElse(null),
sqlServerTask.getTransactionIsolation());
sqlServerTask.getTransactionIsolation(), sqlServerTask.getProduct());
}

private UrlAndProperties getUrlAndProperties(SQLServerPluginTask sqlServerTask, boolean useJtdsDriver)
Expand Down Expand Up @@ -269,6 +278,19 @@ private UrlAndProperties getUrlAndProperties(SQLServerPluginTask sqlServerTask,
props.setProperty("socketTimeout", String.valueOf(sqlServerTask.getSocketTimeout().get() * 1000L)); // milliseconds
}

// https://learn.microsoft.com/en-us/azure/synapse-analytics/sql-data-warehouse/sql-data-warehouse-connect-overview#jdbc-connection-string-example
// jdbc:sqlserver://yourserver.database.windows.net:1433;database=yourdatabase;user={your_user_name};password={your_password_here};encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.database.windows.net;loginTimeout=30;
// https://learn.microsoft.com/en-us/azure/synapse-analytics/sql/connect-overview#jdbc-connection-string-example
// jdbc:sqlserver://yourserver.sql.azuresynapse.net:1433;database=yourdatabase;user={your_user_name};password={your_password_here};encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.sql.azuresynapse.net;loginTimeout=30;
// https://learn.microsoft.com/en-us/sql/connect/jdbc/setting-the-connection-properties?view=azure-sqldw-latest#properties
if (sqlServerTask.getProduct() == Product.AZURE_SYNAPSE_ANALYTICS) {
urlBuilder.append(";encrypt=true");
urlBuilder.append(";trustServerCertificate=false");
String host = sqlServerTask.getHost().get();
String hostNameInCertificate = sqlServerTask.getHostNameInCertificate().orElse(host.replaceFirst("^[^.]+\\.(.*)$", "*.$1"));
urlBuilder.append(";hostNameInCertificate=").append(hostNameInCertificate);
}

url = urlBuilder.toString();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package org.embulk.output.sqlserver;

import java.util.Locale;

import org.embulk.config.ConfigException;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonValue;

public enum Product
{
// https://learn.microsoft.com/en-us/sql/sql-server/sql-docs-navigation-guide?view=sql-server-ver16#applies-to
SQL_SERVER,
AZURE_SYNAPSE_ANALYTICS;

@JsonValue
@Override
public String toString()
{
return name().toLowerCase(Locale.ENGLISH);
}

@JsonCreator
public static Product fromString(String value)
{
for (Product product : Product.values()) {
if (product.toString().equals(value)) {
return product;
}
}
throw new ConfigException(String.format("Unknown product '%s'.", value));
}
}
Loading

0 comments on commit 31ac7b6

Please sign in to comment.