diff --git a/bootstrap/openmetadata-ops.sh b/bootstrap/openmetadata-ops.sh
new file mode 100755
index 000000000000..ecb90428446d
--- /dev/null
+++ b/bootstrap/openmetadata-ops.sh
@@ -0,0 +1,50 @@
+#!/usr/bin/env bash
+# Copyright 2021 Collate
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Resolve links - $0 may be a softlink
+PRG="${0}"
+while [ -h "${PRG}" ]; do
+ ls=`ls -ld "${PRG}"`
+ link=`expr "$ls" : '.*-> \(.*\)$'`
+ if expr "$link" : '/.*' > /dev/null; then
+ PRG="$link"
+ else
+ PRG=`dirname "${PRG}"`/"$link"
+ fi
+done
+
+BOOTSTRAP_DIR=`dirname "${PRG}"`
+CONFIG_FILE_PATH="${BOOTSTRAP_DIR}/../conf/openmetadata.yaml"
+
+# Which java to use
+if [ -z "${JAVA_HOME}" ]; then
+ JAVA="java"
+else
+ JAVA="${JAVA_HOME}/bin/java"
+fi
+
+OPENMETADATA_SETUP_MAIN_CLASS=org.openmetadata.service.util.OpenMetadataOperations
+LIBS_DIR="${BOOTSTRAP_DIR}"/../libs/
+if [ -n "${debug}" ]; then
+ echo "${LIBS_DIR}"
+fi
+if [ -d "${LIBS_DIR}" ]; then
+ for file in "${LIBS_DIR}"*.jar;
+ do
+ CLASSPATH="$CLASSPATH":"$file"
+ done
+else
+ CLASSPATH=`mvn -pl openmetadata-service -q exec:exec -Dexec.executable=echo -Dexec.args="%classpath"`
+fi
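+
+# Example invocations (everything passed to this script is forwarded to the
+# Java CLI, so the subcommands come from OpenMetadataOperations):
+#   ./bootstrap/openmetadata-ops.sh info
+#   ./bootstrap/openmetadata-ops.sh migrate --force
+#   ./bootstrap/openmetadata-ops.sh reindex --batch-size 200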
+
+${JAVA} -Dbootstrap.dir="${BOOTSTRAP_DIR}" -cp "${CLASSPATH}" ${OPENMETADATA_SETUP_MAIN_CLASS} -c "${CONFIG_FILE_PATH}" "$@"
+
diff --git a/bootstrap/sql/migrations/flyway/com.mysql.cj.jdbc.Driver/v015__update_server_change_log.sql b/bootstrap/sql/migrations/flyway/com.mysql.cj.jdbc.Driver/v015__update_server_change_log.sql
new file mode 100644
index 000000000000..5680beac3c39
--- /dev/null
+++ b/bootstrap/sql/migrations/flyway/com.mysql.cj.jdbc.Driver/v015__update_server_change_log.sql
@@ -0,0 +1 @@
+ALTER TABLE SERVER_CHANGE_LOG ADD COLUMN metrics JSON;
\ No newline at end of file
diff --git a/bootstrap/sql/migrations/flyway/org.postgresql.Driver/v015__update_server_change_log.sql b/bootstrap/sql/migrations/flyway/org.postgresql.Driver/v015__update_server_change_log.sql
new file mode 100644
index 000000000000..93468558e872
--- /dev/null
+++ b/bootstrap/sql/migrations/flyway/org.postgresql.Driver/v015__update_server_change_log.sql
@@ -0,0 +1 @@
+ALTER TABLE SERVER_CHANGE_LOG ADD COLUMN metrics jsonb;
\ No newline at end of file
diff --git a/conf/openmetadata.yaml b/conf/openmetadata.yaml
index 921bc880f426..1706c81a0680 100644
--- a/conf/openmetadata.yaml
+++ b/conf/openmetadata.yaml
@@ -91,7 +91,19 @@ server:
 logging:
   level: ${LOG_LEVEL:-INFO}
   loggers:
-    io.swagger: DEBUG
+    org.openmetadata.service.util.OpenMetadataSetup:
+      level: INFO
+      appenders:
+        - type: console
+          logFormat: "%msg%n"
+          timeZone: UTC
+        - type: file
+          logFormat: "%level [%d{ISO8601,UTC}] [%t] %logger{5} - %msg%n"
+          currentLogFilename: ./logs/openmetadata-operations.log
+          archivedLogFilenamePattern: ./logs/openmetadata-operations-%d{yyyy-MM-dd}-%i.log.gz
+          archivedFileCount: 7
+          timeZone: UTC
+          maxFileSize: 50MB
   appenders:
     - type: console
       threshold: TRACE
@@ -249,7 +261,7 @@ pipelineServiceClientConfiguration:
   # If we don't need this, set "org.openmetadata.service.clients.pipeline.noop.NoopClient"
   className: ${PIPELINE_SERVICE_CLIENT_CLASS_NAME:-"org.openmetadata.service.clients.pipeline.airflow.AirflowRESTClient"}
   apiEndpoint: ${PIPELINE_SERVICE_CLIENT_ENDPOINT:-http://localhost:8080}
-  metadataApiEndpoint: ${SERVER_HOST_API_URL:-http://localhost:8585/api}
+  metadataApiEndpoint: ${SERVER_HOST_API_URL:-http://host.docker.internal:8585/api}
   ingestionIpInfoEnabled: ${PIPELINE_SERVICE_IP_INFO_ENABLED:-false}
   hostIp: ${PIPELINE_SERVICE_CLIENT_HOST_IP:-""}
   healthCheckInterval: ${PIPELINE_SERVICE_CLIENT_HEALTH_CHECK_INTERVAL:-300}
diff --git a/openmetadata-service/pom.xml b/openmetadata-service/pom.xml
index fdf05a1fad99..932634376afa 100644
--- a/openmetadata-service/pom.xml
+++ b/openmetadata-service/pom.xml
@@ -485,6 +485,10 @@
       <version>1.9.3</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>info.picocli</groupId>
+      <artifactId>picocli</artifactId>
+    </dependency>
diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/Entity.java b/openmetadata-service/src/main/java/org/openmetadata/service/Entity.java
index 0d2f9481da2f..06c821040759 100644
--- a/openmetadata-service/src/main/java/org/openmetadata/service/Entity.java
+++ b/openmetadata-service/src/main/java/org/openmetadata/service/Entity.java
@@ -297,7 +297,7 @@ public static void registerEntity(
ENTITY_LIST.add(entity);
Collections.sort(ENTITY_LIST);
- LOG.info("Registering entity {} {}", clazz, entity);
+ LOG.debug("Registering entity {} {}", clazz, entity);
}
public static void registerEntity(
@@ -309,7 +309,7 @@ public static void registerEntity(
ENTITY_LIST.add(entity);
Collections.sort(ENTITY_LIST);
- LOG.info("Registering entity time series {} {}", clazz, entity);
+ LOG.debug("Registering entity time series {} {}", clazz, entity);
}
public static void registerResourcePermissions(
diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexApp.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexApp.java
index bbed494d4c01..52baf77e8708 100644
--- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexApp.java
+++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexApp.java
@@ -13,6 +13,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import lombok.Getter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
@@ -54,6 +55,40 @@
@Slf4j
public class SearchIndexApp extends AbstractNativeApplication {
+
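+ // The "all" keyword in a reindex request expands to this fixed set of entity
+ // types (see init() below), so callers can ask for a full reindex without
+ // listing every index by hand.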
+ private static final String ALL = "all";
+ private static final Set<String> ALL_ENTITIES =
+ Set.of(
+ "table",
+ "dashboard",
+ "topic",
+ "pipeline",
+ "searchIndex",
+ "user",
+ "team",
+ "glossaryTerm",
+ "mlmodel",
+ "tag",
+ "classification",
+ "query",
+ "container",
+ "database",
+ "databaseSchema",
+ "testCase",
+ "testSuite",
+ "chart",
+ "dashboardDataModel",
+ "databaseService",
+ "messagingService",
+ "dashboardService",
+ "pipelineService",
+ "mlmodelService",
+ "searchService",
+ "entityReportData",
+ "webAnalyticEntityViewReportData",
+ "webAnalyticUserActivityReportData",
+ "domain",
+ "storedProcedure");
private static final String ENTITY_TYPE_ERROR_MSG = "EntityType: %s %n Cause: %s %n Stack: %s";
private final List<PaginatedEntitiesSource> paginatedEntitiesSources = new ArrayList<>();
private final List<PaginatedDataInsightSource> paginatedDataInsightSources = new ArrayList<>();
@@ -67,12 +102,14 @@ public class SearchIndexApp extends AbstractNativeApplication {
@Override
public void init(App app, CollectionDAO dao, SearchRepository searchRepository) {
super.init(app, dao, searchRepository);
-
// request for reindexing
EventPublisherJob request =
JsonUtils.convertValue(app.getAppConfiguration(), EventPublisherJob.class)
.withStats(new Stats())
.withFailure(new Failure());
+ if (request.getEntities().contains(ALL)) {
+ request.setEntities(ALL_ENTITIES);
+ }
int totalRecords = getTotalRequestToProcess(request.getEntities(), collectionDAO);
this.jobData = request;
this.jobData.setStats(
diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/clients/pipeline/PipelineServiceClientFactory.java b/openmetadata-service/src/main/java/org/openmetadata/service/clients/pipeline/PipelineServiceClientFactory.java
index 0665f14e5167..e5d8d9ff2b64 100644
--- a/openmetadata-service/src/main/java/org/openmetadata/service/clients/pipeline/PipelineServiceClientFactory.java
+++ b/openmetadata-service/src/main/java/org/openmetadata/service/clients/pipeline/PipelineServiceClientFactory.java
@@ -35,7 +35,7 @@ public static PipelineServiceClient createPipelineServiceClient(
}
String pipelineServiceClientClass = config.getClassName();
- LOG.info("Registering PipelineServiceClient: {}", pipelineServiceClientClass);
+ LOG.debug("Registering PipelineServiceClient: {}", pipelineServiceClientClass);
try {
pipelineServiceClient =
diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/MigrationDAO.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/MigrationDAO.java
index 08ccd3d82377..7fe7b393d2c1 100644
--- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/MigrationDAO.java
+++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/MigrationDAO.java
@@ -3,13 +3,20 @@
import static org.openmetadata.service.jdbi3.locator.ConnectionType.MYSQL;
import static org.openmetadata.service.jdbi3.locator.ConnectionType.POSTGRES;
+import java.sql.ResultSet;
+import java.sql.SQLException;
import java.util.List;
import java.util.Optional;
+import lombok.Builder;
import lombok.Getter;
import lombok.Setter;
+import org.jdbi.v3.core.mapper.RowMapper;
+import org.jdbi.v3.core.statement.StatementContext;
import org.jdbi.v3.core.statement.StatementException;
import org.jdbi.v3.sqlobject.SingleValue;
+import org.jdbi.v3.sqlobject.config.RegisterRowMapper;
import org.jdbi.v3.sqlobject.customizer.Bind;
+import org.jdbi.v3.sqlobject.statement.SqlQuery;
import org.openmetadata.service.jdbi3.locator.ConnectionAwareSqlQuery;
import org.openmetadata.service.jdbi3.locator.ConnectionAwareSqlUpdate;
@@ -53,26 +60,29 @@ String getSqlQuery(@Bind("version") String version, @Bind("checksum") String che
@ConnectionAwareSqlUpdate(
value =
- "INSERT INTO SERVER_CHANGE_LOG (version, migrationFileName, checksum, installed_on)"
- + "VALUES (:version, :migrationFileName, :checksum, CURRENT_TIMESTAMP) "
+ "INSERT INTO SERVER_CHANGE_LOG (version, migrationFileName, checksum, metrics, installed_on)"
+ + "VALUES (:version, :migrationFileName, :checksum, :metrics, CURRENT_TIMESTAMP) "
+ "ON DUPLICATE KEY UPDATE "
+ "migrationFileName = :migrationFileName, "
+ "checksum = :checksum, "
+ + "metrics = :metrics,"
+ "installed_on = CURRENT_TIMESTAMP",
connectionType = MYSQL)
@ConnectionAwareSqlUpdate(
value =
- "INSERT INTO server_change_log (version, migrationFileName, checksum, installed_on)"
- + "VALUES (:version, :migrationFileName, :checksum, current_timestamp) "
+ "INSERT INTO server_change_log (version, migrationFileName, checksum, metrics, installed_on)"
+ + "VALUES (:version, :migrationFileName, :checksum, to_jsonb(:metrics::text), current_timestamp) "
+ "ON CONFLICT (version) DO UPDATE SET "
+ "migrationFileName = EXCLUDED.migrationFileName, "
+ + "metrics = to_jsonb(:metrics::text),"
+ "checksum = EXCLUDED.checksum, "
+ "installed_on = EXCLUDED.installed_on",
connectionType = POSTGRES)
void upsertServerMigration(
@Bind("version") String version,
@Bind("migrationFileName") String migrationFileName,
- @Bind("checksum") String checksum);
+ @Bind("checksum") String checksum,
+ @Bind("metrics") String metrics);
@ConnectionAwareSqlUpdate(
value =
@@ -113,6 +123,11 @@ void upsertServerMigrationSQL(
connectionType = POSTGRES)
String checkIfQueryPreviouslyRan(@Bind("checksum") String checksum);
+ @SqlQuery(
+ "SELECT installed_rank, version, migrationFileName, checksum, installed_on, metrics FROM SERVER_CHANGE_LOG ORDER BY version ASC")
+ @RegisterRowMapper(FromServerChangeLogMapper.class)
+ List<ServerChangeLog> listMetricsFromDBMigrations();
+
@Getter
@Setter
class ServerMigrationSQLTable {
@@ -120,4 +135,30 @@ class ServerMigrationSQLTable {
private String sqlStatement;
private String checkSum;
}
+
+ @Getter
+ @Setter
+ @Builder
+ class ServerChangeLog {
+ private Integer installedRank;
+ private String version;
+ private String migrationFileName;
+ private String checksum;
+ private String installedOn;
+ private String metrics;
+ }
+
+ class FromServerChangeLogMapper implements RowMapper<ServerChangeLog> {
+ @Override
+ public ServerChangeLog map(ResultSet rs, StatementContext ctx) throws SQLException {
+ return ServerChangeLog.builder()
+ .installedRank(rs.getInt("installed_rank"))
+ .version(rs.getString("version"))
+ .migrationFileName(rs.getString("migrationFileName"))
+ .checksum(rs.getString("checksum"))
+ .installedOn(rs.getString("installed_on"))
+ .metrics(rs.getString("metrics"))
+ .build();
+ }
+ }
}
diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/migration/api/MigrationWorkflow.java b/openmetadata-service/src/main/java/org/openmetadata/service/migration/api/MigrationWorkflow.java
index 5f162f74a511..27891346e93a 100644
--- a/openmetadata-service/src/main/java/org/openmetadata/service/migration/api/MigrationWorkflow.java
+++ b/openmetadata-service/src/main/java/org/openmetadata/service/migration/api/MigrationWorkflow.java
@@ -7,13 +7,14 @@
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
-import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.extern.slf4j.Slf4j;
import org.jdbi.v3.core.Handle;
import org.jdbi.v3.core.Jdbi;
+import org.json.JSONObject;
import org.openmetadata.service.jdbi3.MigrationDAO;
import org.openmetadata.service.jdbi3.locator.ConnectionType;
+import org.openmetadata.service.migration.context.MigrationContext;
import org.openmetadata.service.migration.context.MigrationWorkflowContext;
import org.openmetadata.service.migration.utils.MigrationFile;
@@ -29,6 +30,8 @@ public class MigrationWorkflow {
private final boolean forceMigrations;
+ private Optional<String> currentMaxMigrationVersion;
+
public MigrationWorkflow(
Jdbi jdbi,
String nativeSQLScriptRootPath,
@@ -86,26 +89,26 @@ public List<MigrationFile> getMigrationFiles(
return Stream.concat(
availableOMNativeMigrations.stream(), availableExtensionMigrations.stream())
.sorted()
- .collect(Collectors.toList());
+ .toList();
}
public List<MigrationFile> getMigrationFilesFromPath(String path, ConnectionType connectionType) {
return Arrays.stream(Objects.requireNonNull(new File(path).listFiles(File::isDirectory)))
.map(dir -> new MigrationFile(dir, migrationDAO, connectionType))
.sorted()
- .collect(Collectors.toList());
+ .toList();
}
private List<MigrationProcess> filterAndGetMigrationsToRun(
List<MigrationFile> availableMigrations) {
LOG.debug("Filtering Server Migrations");
- Optional previousMaxMigration = migrationDAO.getMaxServerMigrationVersion();
+ currentMaxMigrationVersion = migrationDAO.getMaxServerMigrationVersion();
List<MigrationFile> applyMigrations;
- if (previousMaxMigration.isPresent() && !forceMigrations) {
+ if (currentMaxMigrationVersion.isPresent() && !forceMigrations) {
applyMigrations =
availableMigrations.stream()
- .filter(migration -> migration.biggerThan(previousMaxMigration.get()))
- .collect(Collectors.toList());
+ .filter(migration -> migration.biggerThan(currentMaxMigrationVersion.get()))
+ .toList();
} else {
applyMigrations = availableMigrations;
}
@@ -125,14 +128,16 @@ private List filterAndGetMigrationsToRun(
return processes;
}
- @SuppressWarnings("unused")
- private void initializeMigrationWorkflow() {}
-
public void runMigrationWorkflows() {
try (Handle transactionHandler = jdbi.open()) {
LOG.info("[MigrationWorkflow] WorkFlow Started");
MigrationWorkflowContext context = new MigrationWorkflowContext(transactionHandler);
- context.computeInitialContext();
+ if (currentMaxMigrationVersion.isPresent()) {
+ LOG.debug("Current Max version {}", currentMaxMigrationVersion.get());
+ context.computeInitialContext(currentMaxMigrationVersion.get());
+ } else {
+ context.computeInitialContext("1.1.0");
+ }
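+ // "1.1.0" serves as the baseline context key when the change log is empty
+ // (e.g. a fresh install).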
try {
for (MigrationProcess process : migrations) {
// Initialise Migration Steps
@@ -176,7 +181,7 @@ public void runMigrationWorkflows() {
process.getVersion(),
process.getDatabaseConnectionType(),
process.getMigrationsPath());
- updateMigrationStepInDB(process);
+ updateMigrationStepInDB(process, context);
}
} catch (Exception e) {
@@ -190,17 +195,14 @@ public void runMigrationWorkflows() {
LOG.info("[MigrationWorkflow] WorkFlow Completed");
}
- public void closeMigrationWorkflow() {
- // 1. Write to DB table the version we upgraded to
- // should be the current server version
-
- // 2. Commit Transaction on completion
- }
-
- public void updateMigrationStepInDB(MigrationProcess step) {
+ public void updateMigrationStepInDB(
+ MigrationProcess step, MigrationWorkflowContext workflowContext) {
+ MigrationContext context = workflowContext.getMigrationContext().get(step.getVersion());
+ JSONObject metrics = new JSONObject(context.getResults());
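+ // Persist the per-version op counts alongside the migration row so the
+ // `changelog` subcommand can print them later.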
migrationDAO.upsertServerMigration(
- step.getVersion(), step.getMigrationsPath(), UUID.randomUUID().toString());
+ step.getVersion(),
+ step.getMigrationsPath(),
+ UUID.randomUUID().toString(),
+ metrics.toString());
}
-
- public void migrateSearchIndexes() {}
}
diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/migration/context/MigrationContext.java b/openmetadata-service/src/main/java/org/openmetadata/service/migration/context/MigrationContext.java
index 93317522cfff..7e5c66fec722 100644
--- a/openmetadata-service/src/main/java/org/openmetadata/service/migration/context/MigrationContext.java
+++ b/openmetadata-service/src/main/java/org/openmetadata/service/migration/context/MigrationContext.java
@@ -2,7 +2,6 @@
import java.util.HashMap;
import java.util.List;
-import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
@@ -20,8 +19,7 @@ public class MigrationContext {
public MigrationContext(String version, List<MigrationOps> migrationOps, Handle handle) {
this.version = version;
this.migrationOps =
- Stream.concat(migrationOps.stream(), CommonMigrationOps.getCommonOps().stream())
- .collect(Collectors.toList());
+ Stream.concat(migrationOps.stream(), CommonMigrationOps.getCommonOps().stream()).toList();
this.handle = handle;
}
diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/migration/context/MigrationWorkflowContext.java b/openmetadata-service/src/main/java/org/openmetadata/service/migration/context/MigrationWorkflowContext.java
index 09aedfba0a07..8c610424704a 100644
--- a/openmetadata-service/src/main/java/org/openmetadata/service/migration/context/MigrationWorkflowContext.java
+++ b/openmetadata-service/src/main/java/org/openmetadata/service/migration/context/MigrationWorkflowContext.java
@@ -10,19 +10,15 @@
@Slf4j
public class MigrationWorkflowContext {
@Getter private final HashMap<String, MigrationContext> migrationContext;
- private final MigrationContext initialContext;
private final Handle handle;
public MigrationWorkflowContext(Handle handle) {
this.migrationContext = new HashMap<>();
this.handle = handle;
-
- // Initialize the context only with the common ops
- this.initialContext = new MigrationContext("initial", List.of(), handle);
}
- public void computeInitialContext() {
- computeMigrationSafely(this.initialContext);
+ public void computeInitialContext(String currentMaxMigrationVersion) {
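+ // Computes the "before" snapshot using only the common ops, keyed by the
+ // version the database currently sits at.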
+ computeMigrationSafely(new MigrationContext(currentMaxMigrationVersion, List.of(), handle));
}
public void computeMigrationContext(MigrationProcess process) {
diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/resources/apps/AppResource.java b/openmetadata-service/src/main/java/org/openmetadata/service/resources/apps/AppResource.java
index 4789d725a706..b96ec1df2efb 100644
--- a/openmetadata-service/src/main/java/org/openmetadata/service/resources/apps/AppResource.java
+++ b/openmetadata-service/src/main/java/org/openmetadata/service/resources/apps/AppResource.java
@@ -861,6 +861,8 @@ public Response deployApplicationFlow(
pipelineServiceClient.deployPipeline(ingestionPipeline, service);
if (status.getCode() == 200) {
ingestionPipelineRepository.createOrUpdate(uriInfo, ingestionPipeline);
+ } else {
+ ingestionPipeline.setDeployed(false);
}
return Response.status(status.getCode()).entity(status).build();
}
diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/resources/services/ingestionpipelines/IngestionPipelineResource.java b/openmetadata-service/src/main/java/org/openmetadata/service/resources/services/ingestionpipelines/IngestionPipelineResource.java
index 65fc8e6a5703..636f35528de7 100644
--- a/openmetadata-service/src/main/java/org/openmetadata/service/resources/services/ingestionpipelines/IngestionPipelineResource.java
+++ b/openmetadata-service/src/main/java/org/openmetadata/service/resources/services/ingestionpipelines/IngestionPipelineResource.java
@@ -885,8 +885,6 @@ private PipelineServiceClientResponse deployPipelineInternal(
UUID id, UriInfo uriInfo, SecurityContext securityContext) {
Fields fields = getFields(FIELD_OWNER);
IngestionPipeline ingestionPipeline = repository.get(uriInfo, id, fields);
- ingestionPipeline.setOpenMetadataServerConnection(
- new OpenMetadataConnectionBuilder(openMetadataApplicationConfig).build());
decryptOrNullify(securityContext, ingestionPipeline, true);
ServiceEntityInterface service =
Entity.getEntity(ingestionPipeline.getService(), "", Include.NON_DELETED);
diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/util/AsciiTable.java b/openmetadata-service/src/main/java/org/openmetadata/service/util/AsciiTable.java
new file mode 100644
index 000000000000..e3e971d5356a
--- /dev/null
+++ b/openmetadata-service/src/main/java/org/openmetadata/service/util/AsciiTable.java
@@ -0,0 +1,126 @@
+package org.openmetadata.service.util;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class AsciiTable {
+ private static final String DEFAULT_COLUMN_NAME = "(No column name)";
+ private static final String DEFAULT_NO_VALUE = "-";
+
+ private final List<String> columns;
+ private final List<List<String>> rows;
+ private final boolean printHeader;
+ private final String nullText;
+ private final String emptyText;
+
+ public AsciiTable(
+ List columns,
+ List> rows,
+ boolean printHeader,
+ String nullText,
+ String emptyText) {
+ this.columns = ensureValidColumns(columns);
+ this.rows = rows;
+ this.printHeader = printHeader;
+ this.nullText = nullText;
+ this.emptyText = emptyText;
+ }
+
+ private static List<String> ensureValidColumns(List<String> columns) {
+ List<String> validColumns = new ArrayList<>();
+ for (String column : columns) {
+ validColumns.add(column != null ? column : DEFAULT_COLUMN_NAME);
+ }
+ return validColumns;
+ }
+
+ /**
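+ * Renders the table. For example, columns ["a", "b"] with a single row
+ * ["1", "2"] produce:
+ *
+ * <pre>
+ * +---+---+
+ * | a | b |
+ * +---+---+
+ * | 1 | 2 |
+ * +---+---+
+ * </pre>
+ *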
+ * @return The table rendered with column header and row data.
+ */
+ public String render() {
+ List<Integer> widths = new ArrayList<>();
+ for (String column : columns) {
+ widths.add(column.length());
+ }
+
+ for (List<String> row : rows) {
+ for (int i = 0; i < row.size(); i++) {
+ widths.set(i, Math.max(widths.get(i), getValue(row, i).length()));
+ }
+ }
+
+ StringBuilder ruler = new StringBuilder("+");
+ for (Integer width : widths) {
+ ruler.append("-").append(trimOrPad("", width, '-')).append("-+");
+ }
+ ruler.append("\n");
+
+ StringBuilder result = new StringBuilder();
+
+ if (printHeader) {
+ StringBuilder header = new StringBuilder("|");
+ for (int i = 0; i < widths.size(); i++) {
+ header.append(" ").append(trimOrPad(columns.get(i), widths.get(i), ' ')).append(" |");
+ }
+ header.append("\n");
+
+ result.append(ruler);
+ result.append(header);
+ }
+
+ result.append(ruler);
+
+ if (rows.isEmpty()) {
+ result
+ .append("| ")
+ .append(trimOrPad(emptyText, ruler.length() - Math.min(ruler.length(), 5)))
+ .append(" |\n");
+ } else {
+ for (List<String> row : rows) {
+ StringBuilder r = new StringBuilder("|");
+ for (int i = 0; i < widths.size(); i++) {
+ r.append(" ").append(trimOrPad(getValue(row, i), widths.get(i), ' ')).append(" |");
+ }
+ r.append("\n");
+ result.append(r);
+ }
+ }
+
+ result.append(ruler);
+ return result.toString();
+ }
+
+ private String getValue(List<String> row, int i) {
+ try {
+ String value = row.get(i);
+ if (value == null) {
+ value = nullText;
+ }
+ return value;
+ } catch (IndexOutOfBoundsException e) {
+ return DEFAULT_NO_VALUE;
+ }
+ }
+
+ private String trimOrPad(String str, int length, char padChar) {
+ StringBuilder result;
+ if (str == null) {
+ result = new StringBuilder();
+ } else {
+ result = new StringBuilder(str);
+ }
+
+ if (result.length() > length) {
+ return result.substring(0, length);
+ }
+
+ while (result.length() < length) {
+ result.append(padChar);
+ }
+ return result.toString();
+ }
+
+ private String trimOrPad(String str, int length) {
+ return trimOrPad(str, length, ' ');
+ }
+}
diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/util/OpenMetadataOperations.java b/openmetadata-service/src/main/java/org/openmetadata/service/util/OpenMetadataOperations.java
new file mode 100644
index 000000000000..cc40a5559442
--- /dev/null
+++ b/openmetadata-service/src/main/java/org/openmetadata/service/util/OpenMetadataOperations.java
@@ -0,0 +1,467 @@
+package org.openmetadata.service.util;
+
+import static org.flywaydb.core.internal.info.MigrationInfoDumper.dumpToAsciiTable;
+import static org.openmetadata.service.Entity.FIELD_OWNER;
+
+import ch.qos.logback.classic.Level;
+import ch.qos.logback.classic.Logger;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.gson.Gson;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import io.dropwizard.configuration.EnvironmentVariableSubstitutor;
+import io.dropwizard.configuration.FileConfigurationSourceProvider;
+import io.dropwizard.configuration.SubstitutingSourceProvider;
+import io.dropwizard.configuration.YamlConfigurationFactory;
+import io.dropwizard.db.DataSourceFactory;
+import io.dropwizard.jackson.Jackson;
+import io.dropwizard.jersey.validation.Validators;
+import java.io.File;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Scanner;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import javax.validation.Validator;
+import lombok.extern.slf4j.Slf4j;
+import org.flywaydb.core.Flyway;
+import org.flywaydb.core.api.MigrationVersion;
+import org.jdbi.v3.core.Jdbi;
+import org.jdbi.v3.sqlobject.SqlObjectPlugin;
+import org.jdbi.v3.sqlobject.SqlObjects;
+import org.openmetadata.schema.ServiceEntityInterface;
+import org.openmetadata.schema.entity.app.App;
+import org.openmetadata.schema.entity.app.AppSchedule;
+import org.openmetadata.schema.entity.app.ScheduledExecutionContext;
+import org.openmetadata.schema.entity.services.ingestionPipelines.IngestionPipeline;
+import org.openmetadata.schema.services.connections.metadata.OpenMetadataConnection;
+import org.openmetadata.schema.system.EventPublisherJob;
+import org.openmetadata.schema.type.Include;
+import org.openmetadata.sdk.PipelineServiceClient;
+import org.openmetadata.service.Entity;
+import org.openmetadata.service.OpenMetadataApplicationConfig;
+import org.openmetadata.service.apps.scheduler.AppScheduler;
+import org.openmetadata.service.clients.pipeline.PipelineServiceClientFactory;
+import org.openmetadata.service.fernet.Fernet;
+import org.openmetadata.service.jdbi3.CollectionDAO;
+import org.openmetadata.service.jdbi3.IngestionPipelineRepository;
+import org.openmetadata.service.jdbi3.ListFilter;
+import org.openmetadata.service.jdbi3.MigrationDAO;
+import org.openmetadata.service.jdbi3.locator.ConnectionAwareAnnotationSqlLocator;
+import org.openmetadata.service.jdbi3.locator.ConnectionType;
+import org.openmetadata.service.migration.api.MigrationWorkflow;
+import org.openmetadata.service.resources.databases.DatasourceConfig;
+import org.openmetadata.service.search.SearchIndexFactory;
+import org.openmetadata.service.search.SearchRepository;
+import org.openmetadata.service.secrets.SecretsManager;
+import org.openmetadata.service.secrets.SecretsManagerFactory;
+import org.openmetadata.service.util.jdbi.DatabaseAuthenticationProviderFactory;
+import org.slf4j.LoggerFactory;
+import picocli.CommandLine;
+import picocli.CommandLine.Command;
+import picocli.CommandLine.Option;
+
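+// Entry point behind bootstrap/openmetadata-ops.sh, which supplies the -c flag.
+// Typical invocations (sketch):
+//   openmetadata-ops.sh info              -> show applied/pending Flyway migrations
+//   openmetadata-ops.sh migrate [--force] -> run Flyway plus server data migrations
+//   openmetadata-ops.sh reindex -b 500    -> trigger SearchIndexApp on demand
+//   openmetadata-ops.sh deploy-pipelines  -> redeploy all ingestion pipelines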
+@Slf4j
+@Command(
+ name = "OpenMetadataSetup",
+ mixinStandardHelpOptions = true,
+ version = "OpenMetadataSetup 1.3",
+ description =
+ "Creates or Migrates Database/Search Indexes. ReIndex the existing data into Elastic Search "
+ + "or OpenSearch. Re-Deploys the service pipelines.")
+public class OpenMetadataOperations implements Callable<Integer> {
+
+ private OpenMetadataApplicationConfig config;
+ private Flyway flyway;
+ private Jdbi jdbi;
+ private SearchRepository searchRepository;
+ private String nativeSQLScriptRootPath;
+ private String extensionSQLScriptRootPath;
+ private SecretsManager secretsManager;
+ private CollectionDAO collectionDAO;
+
+ @Option(
+ names = {"-d", "--debug"},
+ defaultValue = "false")
+ private boolean debug;
+
+ @Option(
+ names = {"-c", "--config"},
+ required = true)
+ private String configFilePath;
+
+ @Override
+ public Integer call() {
+ LOG.info(
+ "Subcommand needed: 'info', 'validate', 'repair', 'check-connection', "
+ + "'drop-create', 'migrate', 'reindex', 'deploy-pipelines'");
+ return 0;
+ }
+
+ @Command(
+ name = "info",
+ description =
+ "Shows the list of migrations applied and the pending migration "
+ + "waiting to be applied on the target database")
+ public Integer info() {
+ try {
+ parseConfig();
+ LOG.info(dumpToAsciiTable(flyway.info().all()));
+ return 0;
+ } catch (Exception e) {
+ LOG.error("Failed due to ", e);
+ return 1;
+ }
+ }
+
+ @Command(
+ name = "validate",
+ description =
+ "Checks if the all the migrations haven been applied " + "on the target database.")
+ public Integer validate() {
+ try {
+ parseConfig();
+ flyway.validate();
+ return 0;
+ } catch (Exception e) {
+ LOG.error("Database migration validation failed due to ", e);
+ return 1;
+ }
+ }
+
+ @Command(
+ name = "repair",
+ description =
+ "Repairs the DATABASE_CHANGE_LOG table which is used to track"
+ + "all the migrations on the target database This involves removing entries for the failed migrations and update"
+ + "the checksum of migrations already applied on the target database")
+ public Integer repair() {
+ try {
+ parseConfig();
+ flyway.repair();
+ return 0;
+ } catch (Exception e) {
+ LOG.error("Repair of CHANGE_LOG failed due to ", e);
+ return 1;
+ }
+ }
+
+ @Command(
+ name = "check-connection",
+ description =
+ "Checks if a connection can be successfully " + "obtained for the target database")
+ public Integer checkConnection() {
+ try {
+ parseConfig();
+ flyway.getConfiguration().getDataSource().getConnection();
+ return 0;
+ } catch (Exception e) {
+ LOG.error("Failed to check connection due to ", e);
+ return 1;
+ }
+ }
+
+ @Command(
+ name = "drop-create",
+ description =
+ "Deletes any tables in configured database and creates a new tables "
+ + "based on current version of OpenMetadata. This command also re-creates the search indexes.")
+ public Integer dropCreate() {
+ try {
+ promptUserForDelete();
+ parseConfig();
+ LOG.info("Deleting all the OpenMetadata tables.");
+ flyway.clean();
+ LOG.info("Creating the OpenMetadata Schema.");
+ flyway.migrate();
+ validateAndRunSystemDataMigrations(true);
+ LOG.info("OpenMetadata Database Schema is Updated.");
+ return 0;
+ } catch (Exception e) {
+ LOG.error("Failed to drop create due to ", e);
+ return 1;
+ }
+ }
+
+ @Command(
+ name = "migrate",
+ description = "Migrates the OpenMetadata database schema and search index mappings.")
+ public Integer migrate(
+ @Option(
+ names = {"--force"},
+ description = "Forces migrations to be run again, even if they have ran previously",
+ defaultValue = "false")
+ boolean force) {
+ try {
+ LOG.info("Migrating the OpenMetadata Schema.");
+ parseConfig();
+ flyway.migrate();
+ validateAndRunSystemDataMigrations(force);
+ printChangeLog();
+ return 0;
+ } catch (Exception e) {
+ LOG.error("Failed to db migration due to ", e);
+ return 1;
+ }
+ }
+
+ @Command(name = "changelog", description = "Prints the change log of database migration.")
+ public Integer changelog() {
+ try {
+ parseConfig();
+ printChangeLog();
+ return 0;
+ } catch (Exception e) {
+ LOG.error("Failed to fetch db change log due to ", e);
+ return 1;
+ }
+ }
+
+ @Command(name = "reindex", description = "Re Indexes data into search engine from command line.")
+ public Integer reIndex(
+ @Option(
+ names = {"-b", "--batch-size"},
+ defaultValue = "100")
+ int batchSize,
+ @Option(
+ names = {"--recreate-indexes"},
+ defaultValue = "true")
+ boolean recreateIndexes) {
+ try {
+ parseConfig();
+ AppScheduler.initialize(collectionDAO, searchRepository);
+ App searchIndexApp =
+ new App()
+ .withId(UUID.randomUUID())
+ .withName("SearchIndexApp")
+ .withClassName("org.openmetadata.service.apps.bundles.searchIndex.SearchIndexApp")
+ .withAppSchedule(
+ new AppSchedule().withScheduleType(AppSchedule.ScheduleTimeline.DAILY))
+ .withAppConfiguration(
+ new EventPublisherJob()
+ .withEntities(new HashSet<>(List.of("all")))
+ .withRecreateIndex(recreateIndexes)
+ .withBatchSize(batchSize)
+ .withSearchIndexMappingLanguage(
+ config.getElasticSearchConfiguration().getSearchIndexMappingLanguage()))
+ .withRuntime(new ScheduledExecutionContext().withEnabled(true));
+ AppScheduler.getInstance().triggerOnDemandApplication(searchIndexApp);
+ return 0;
+ } catch (Exception e) {
+ LOG.error("Failed to reindex due to ", e);
+ return 1;
+ }
+ }
+
+ @Command(name = "deploy-pipelines", description = "Deploy all the service pipelines.")
+ public Integer deployPipelines() {
+ try {
+ LOG.info("Deploying Pipelines");
+ parseConfig();
+ PipelineServiceClient pipelineServiceClient =
+ PipelineServiceClientFactory.createPipelineServiceClient(
+ config.getPipelineServiceClientConfiguration());
+ IngestionPipelineRepository pipelineRepository =
+ (IngestionPipelineRepository) Entity.getEntityRepository(Entity.INGESTION_PIPELINE);
+ List<IngestionPipeline> pipelines =
+ pipelineRepository.listAll(
+ new EntityUtil.Fields(Set.of(FIELD_OWNER, "service")),
+ new ListFilter(Include.NON_DELETED));
+ LOG.debug(String.format("Pipelines %d", pipelines.size()));
+ List<String> columns = Arrays.asList("Name", "Type", "Service Name", "Status");
+ List<List<String>> pipelineStatuses = new ArrayList<>();
+ for (IngestionPipeline pipeline : pipelines) {
+ deployPipeline(pipeline, pipelineServiceClient, pipelineStatuses);
+ }
+ printToAsciiTable(columns, pipelineStatuses, "No Pipelines Found");
+ return 0;
+ } catch (Exception e) {
+ LOG.error("Failed to deploy pipelines due to ", e);
+ return 1;
+ }
+ }
+
+ private void deployPipeline(
+ IngestionPipeline pipeline,
+ PipelineServiceClient pipelineServiceClient,
+ List<List<String>> pipelineStatuses) {
+ try {
+ LOG.debug(String.format("deploying pipeline %s", pipeline.getName()));
+ pipeline.setOpenMetadataServerConnection(new OpenMetadataConnectionBuilder(config).build());
+ secretsManager.decryptIngestionPipeline(pipeline);
+ OpenMetadataConnection openMetadataServerConnection =
+ new OpenMetadataConnectionBuilder(config).build();
+ pipeline.setOpenMetadataServerConnection(
+ secretsManager.encryptOpenMetadataConnection(openMetadataServerConnection, false));
+ ServiceEntityInterface service =
+ Entity.getEntity(pipeline.getService(), "", Include.NON_DELETED);
+ pipelineServiceClient.deployPipeline(pipeline, service);
+ } catch (Exception e) {
+ LOG.error(
+ String.format(
+ "Failed to deploy pipeline %s of type %s for service %s",
+ pipeline.getName(),
+ pipeline.getPipelineType().value(),
+ pipeline.getService().getName()),
+ e);
+ pipeline.setDeployed(false);
+ } finally {
+ LOG.debug("update the pipeline");
+ collectionDAO.ingestionPipelineDAO().update(pipeline);
+ pipelineStatuses.add(
+ Arrays.asList(
+ pipeline.getName(),
+ pipeline.getPipelineType().value(),
+ pipeline.getService().getName(),
+ pipeline.getDeployed().toString()));
+ }
+ }
+
+ private void parseConfig() throws Exception {
+ if (debug) {
+ Logger root = (Logger) LoggerFactory.getLogger(org.slf4j.Logger.ROOT_LOGGER_NAME);
+ root.setLevel(Level.DEBUG);
+ }
+ ObjectMapper objectMapper = Jackson.newObjectMapper();
+ Validator validator = Validators.newValidator();
+ YamlConfigurationFactory<OpenMetadataApplicationConfig> factory =
+ new YamlConfigurationFactory<>(
+ OpenMetadataApplicationConfig.class, validator, objectMapper, "dw");
+ config =
+ factory.build(
+ new SubstitutingSourceProvider(
+ new FileConfigurationSourceProvider(), new EnvironmentVariableSubstitutor(false)),
+ configFilePath);
+ Fernet.getInstance().setFernetKey(config);
+ DataSourceFactory dataSourceFactory = config.getDataSourceFactory();
+ if (dataSourceFactory == null) {
+ throw new IllegalArgumentException("No database in config file");
+ }
+ // Check for db auth providers.
+ DatabaseAuthenticationProviderFactory.get(dataSourceFactory.getUrl())
+ .ifPresent(
+ databaseAuthenticationProvider -> {
+ String token =
+ databaseAuthenticationProvider.authenticate(
+ dataSourceFactory.getUrl(),
+ dataSourceFactory.getUser(),
+ dataSourceFactory.getPassword());
+ dataSourceFactory.setPassword(token);
+ });
+
+ String jdbcUrl = dataSourceFactory.getUrl();
+ String user = dataSourceFactory.getUser();
+ String password = dataSourceFactory.getPassword();
+ assert user != null && password != null;
+
+ String flywayRootPath = config.getMigrationConfiguration().getFlywayPath();
+ String location =
+ "filesystem:"
+ + flywayRootPath
+ + File.separator
+ + config.getDataSourceFactory().getDriverClass();
+ flyway =
+ Flyway.configure()
+ .encoding(StandardCharsets.UTF_8)
+ .table("DATABASE_CHANGE_LOG")
+ .sqlMigrationPrefix("v")
+ .validateOnMigrate(false)
+ .outOfOrder(false)
+ .baselineOnMigrate(true)
+ .baselineVersion(MigrationVersion.fromVersion("000"))
+ .cleanOnValidationError(false)
+ .locations(location)
+ .dataSource(jdbcUrl, user, password)
+ .cleanDisabled(false)
+ .load();
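+ // cleanDisabled(false) is deliberate: the drop-create subcommand relies on
+ // flyway.clean() being allowed.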
+ nativeSQLScriptRootPath = config.getMigrationConfiguration().getNativePath();
+ extensionSQLScriptRootPath = config.getMigrationConfiguration().getExtensionPath();
+ jdbi = Jdbi.create(jdbcUrl, user, password);
+ jdbi.installPlugin(new SqlObjectPlugin());
+ jdbi.getConfig(SqlObjects.class)
+ .setSqlLocator(
+ new ConnectionAwareAnnotationSqlLocator(
+ config.getDataSourceFactory().getDriverClass()));
+
+ searchRepository =
+ new SearchRepository(config.getElasticSearchConfiguration(), new SearchIndexFactory());
+
+ // Initialize secrets manager
+ secretsManager =
+ SecretsManagerFactory.createSecretsManager(
+ config.getSecretsManagerConfiguration(), config.getClusterName());
+
+ collectionDAO = jdbi.onDemand(CollectionDAO.class);
+ Entity.setCollectionDAO(collectionDAO);
+ Entity.initializeRepositories(config, jdbi);
+ }
+
+ private void promptUserForDelete() {
+ LOG.info(
+ """
You are about to drop all the data in the database. ALL METADATA WILL BE DELETED.\s
+ This is not recommended for a Production setup or any deployment where you have collected\s
+ a lot of information from the users, such as descriptions, tags, etc.
+ """);
+ String input = "";
+ Scanner scanner = new Scanner(System.in);
+ while (!input.equals("DELETE")) {
+ LOG.info("Enter QUIT to quit. If you still want to continue, please enter DELETE: ");
+ input = scanner.next();
+ if (input.equals("QUIT")) {
+ LOG.info("Exiting without deleting data");
+ System.exit(1);
+ }
+ }
+ }
+
+ private void validateAndRunSystemDataMigrations(boolean force) {
+ ConnectionType connType = ConnectionType.from(config.getDataSourceFactory().getDriverClass());
+ DatasourceConfig.initialize(connType.label);
+ MigrationWorkflow workflow =
+ new MigrationWorkflow(
+ jdbi, nativeSQLScriptRootPath, connType, extensionSQLScriptRootPath, force);
+ workflow.loadMigrations();
+ workflow.runMigrationWorkflows();
+ Entity.cleanup();
+ }
+
+ private void printToAsciiTable(List<String> columns, List<List<String>> rows, String emptyText) {
+ LOG.info(new AsciiTable(columns, rows, true, "", emptyText).render());
+ }
+
+ private void printChangeLog() {
+ MigrationDAO migrationDAO = jdbi.onDemand(MigrationDAO.class);
+ List<MigrationDAO.ServerChangeLog> serverChangeLogs =
+ migrationDAO.listMetricsFromDBMigrations();
+ Set<String> columns = new LinkedHashSet<>(List.of("version", "installedOn"));
+ List<List<String>> rows = new ArrayList<>();
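+ // Each metrics column holds the JSON object written by
+ // MigrationWorkflow.updateMigrationStepInDB; keys vary per version, so the
+ // header set grows as rows are scanned.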
+ for (MigrationDAO.ServerChangeLog serverChangeLog : serverChangeLogs) {
+ List<String> row = new ArrayList<>();
+ JsonObject metricsJson = new Gson().fromJson(serverChangeLog.getMetrics(), JsonObject.class);
+ Set<String> keys = metricsJson.keySet();
+ columns.addAll(keys);
+ row.add(serverChangeLog.getVersion());
+ row.add(serverChangeLog.getInstalledOn());
+ row.addAll(
+ metricsJson.entrySet().stream()
+ .map(Map.Entry::getValue)
+ .map(JsonElement::toString)
+ .toList());
+ rows.add(row);
+ }
+ printToAsciiTable(columns.stream().toList(), rows, "No Server Change log found");
+ }
+
+ public static void main(String... args) {
+ int exitCode =
new CommandLine(new OpenMetadataOperations()).execute(args);
+ System.exit(exitCode);
+ }
+}
diff --git a/openmetadata-service/src/main/resources/logback.xml b/openmetadata-service/src/main/resources/logback.xml
index f1c245a14900..bc95de867619 100644
--- a/openmetadata-service/src/main/resources/logback.xml
+++ b/openmetadata-service/src/main/resources/logback.xml
@@ -2,15 +2,20 @@
   <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
-    <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
-      <level>${LOG_LEVEL:-INFO}</level>
-    </filter>
     <encoder>
-      <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{5} - %msg%n</pattern>
+      <pattern>%msg%n</pattern>
     </encoder>
   </appender>
+  <appender name="FILE" class="ch.qos.logback.core.FileAppender">
+    <file>./logs/openmetadata-operation.log</file>
+    <append>true</append>
+    <immediateFlush>true</immediateFlush>
+    <encoder>
+      <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
+    </encoder>
+  </appender>
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index a0f8fd183ec4..3f19b9ace240 100644
--- a/pom.xml
+++ b/pom.xml
@@ -150,6 +150,7 @@
5.4.0
1.29.2
2.41.1
+    <picocli.version>4.7.5</picocli.version>
@@ -480,6 +481,11 @@
      <artifactId>resilience4j-retry</artifactId>
      <version>2.1.0</version>
    </dependency>
+    <dependency>
+      <groupId>info.picocli</groupId>
+      <artifactId>picocli</artifactId>
+      <version>${picocli.version}</version>
+    </dependency>