From 0303b44b9cb4c2d6d723b64f81a7044489229855 Mon Sep 17 00:00:00 2001 From: Sriharsha Chintalapani Date: Thu, 28 Dec 2023 23:22:58 -0800 Subject: [PATCH] OpenMetadata Operations Command Line Utility (#14504) * Add new OpenMetadataSetup command line application to migrate/deploy and re-index * Add new OpenMetadataSetup command line application to migrate/deploy and re-index * Add deployPipelines option * Add reIndex option * add subcommands * add provision to store upgrade metrics * rename bootstrap script * fix styling checks * Add changelog and store metrics into SERVER_CHANGE_LOG * Cast jsonb * Cast jsonb --------- Co-authored-by: Pere Miquel Brull --- bootstrap/openmetadata-ops.sh | 50 ++ .../v015__update_server_change_log.sql | 1 + .../v015__update_server_change_log.sql | 1 + conf/openmetadata.yaml | 16 +- openmetadata-service/pom.xml | 4 + .../java/org/openmetadata/service/Entity.java | 4 +- .../bundles/searchIndex/SearchIndexApp.java | 39 +- .../PipelineServiceClientFactory.java | 2 +- .../service/jdbi3/MigrationDAO.java | 51 +- .../migration/api/MigrationWorkflow.java | 48 +- .../migration/context/MigrationContext.java | 4 +- .../context/MigrationWorkflowContext.java | 8 +- .../service/resources/apps/AppResource.java | 2 + .../IngestionPipelineResource.java | 2 - .../openmetadata/service/util/AsciiTable.java | 126 +++++ .../service/util/OpenMetadataOperations.java | 467 ++++++++++++++++++ .../src/main/resources/logback.xml | 15 +- pom.xml | 6 + 18 files changed, 796 insertions(+), 50 deletions(-) create mode 100755 bootstrap/openmetadata-ops.sh create mode 100644 bootstrap/sql/migrations/flyway/com.mysql.cj.jdbc.Driver/v015__update_server_change_log.sql create mode 100644 bootstrap/sql/migrations/flyway/org.postgresql.Driver/v015__update_server_change_log.sql create mode 100644 openmetadata-service/src/main/java/org/openmetadata/service/util/AsciiTable.java create mode 100644 openmetadata-service/src/main/java/org/openmetadata/service/util/OpenMetadataOperations.java diff --git a/bootstrap/openmetadata-ops.sh b/bootstrap/openmetadata-ops.sh new file mode 100755 index 000000000000..ecb90428446d --- /dev/null +++ b/bootstrap/openmetadata-ops.sh @@ -0,0 +1,50 @@ +#!/usr/bin/env bash +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
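+# Thin launcher for the OpenMetadataOperations CLI added below: resolves the real
+# script location through symlinks, builds the classpath from the bundled ../libs
+# jars (or from Maven in a source checkout), and invokes the picocli entry point
+# against ../conf/openmetadata.yaml.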
+ +# Resolve links - $0 may be a softlink +PRG="${0}" +while [ -h "${PRG}" ]; do + ls=`ls -ld "${PRG}"` + link=`expr "$ls" : '.*-> \(.*\)$'` + if expr "$link" : '/.*' > /dev/null; then + PRG="$link" + else + PRG=`dirname "${PRG}"`/"$link" + fi +done + +BOOTSTRAP_DIR=`dirname ${PRG}` +CONFIG_FILE_PATH=${BOOTSTRAP_DIR}/../conf/openmetadata.yaml + +# Which java to use +if [ -z "${JAVA_HOME}" ]; then + JAVA="java" +else + JAVA="${JAVA_HOME}/bin/java" +fi + +OPENMETADATA_SETUP_MAIN_CLASS=org.openmetadata.service.util.OpenMetadataOperations +LIBS_DIR="${BOOTSTRAP_DIR}"/../libs/ +if [ ${debug} ] ; then + echo $LIBS_DIR +fi +if [ -d "${LIBS_DIR}" ]; then + for file in "${LIBS_DIR}"*.jar; + do + CLASSPATH="$CLASSPATH":"$file" + done +else + CLASSPATH=`mvn -pl openmetadata-service -q exec:exec -Dexec.executable=echo -Dexec.args="%classpath"` +fi + +${JAVA} -Dbootstrap.dir=$BOOTSTRAP_DIR -cp ${CLASSPATH} ${OPENMETADATA_SETUP_MAIN_CLASS} -c $CONFIG_FILE_PATH "$@" + diff --git a/bootstrap/sql/migrations/flyway/com.mysql.cj.jdbc.Driver/v015__update_server_change_log.sql b/bootstrap/sql/migrations/flyway/com.mysql.cj.jdbc.Driver/v015__update_server_change_log.sql new file mode 100644 index 000000000000..5680beac3c39 --- /dev/null +++ b/bootstrap/sql/migrations/flyway/com.mysql.cj.jdbc.Driver/v015__update_server_change_log.sql @@ -0,0 +1 @@ +ALTER TABLE SERVER_CHANGE_LOG ADD COLUMN metrics JSON; \ No newline at end of file diff --git a/bootstrap/sql/migrations/flyway/org.postgresql.Driver/v015__update_server_change_log.sql b/bootstrap/sql/migrations/flyway/org.postgresql.Driver/v015__update_server_change_log.sql new file mode 100644 index 000000000000..93468558e872 --- /dev/null +++ b/bootstrap/sql/migrations/flyway/org.postgresql.Driver/v015__update_server_change_log.sql @@ -0,0 +1 @@ +ALTER TABLE SERVER_CHANGE_LOG ADD COLUMN metrics jsonb; \ No newline at end of file diff --git a/conf/openmetadata.yaml b/conf/openmetadata.yaml index 921bc880f426..1706c81a0680 100644 --- a/conf/openmetadata.yaml +++ b/conf/openmetadata.yaml @@ -91,7 +91,19 @@ server: logging: level: ${LOG_LEVEL:-INFO} loggers: - io.swagger: DEBUG + org.openmetadata.service.util.OpenMetadataSetup: + level: INFO + appenders: + - type: console + logFormat: "%msg%n" + timeZone: UTC + - type: file + logFormat: "%level [%d{ISO8601,UTC}] [%t] %logger{5} - %msg%n" + currentLogFilename: ./logs/openmetadata-operations.log + archivedLogFilenamePattern: ./logs/openmetadata-operations-%d{yyyy-MM-dd}-%i.log.gz + archivedFileCount: 7 + timeZone: UTC + maxFileSize: 50MB appenders: - type: console threshold: TRACE @@ -249,7 +261,7 @@ pipelineServiceClientConfiguration: # If we don't need this, set "org.openmetadata.service.clients.pipeline.noop.NoopClient" className: ${PIPELINE_SERVICE_CLIENT_CLASS_NAME:-"org.openmetadata.service.clients.pipeline.airflow.AirflowRESTClient"} apiEndpoint: ${PIPELINE_SERVICE_CLIENT_ENDPOINT:-http://localhost:8080} - metadataApiEndpoint: ${SERVER_HOST_API_URL:-http://localhost:8585/api} + metadataApiEndpoint: ${SERVER_HOST_API_URL:-http://host.docker.internal:8585/api} ingestionIpInfoEnabled: ${PIPELINE_SERVICE_IP_INFO_ENABLED:-false} hostIp: ${PIPELINE_SERVICE_CLIENT_HOST_IP:-""} healthCheckInterval: ${PIPELINE_SERVICE_CLIENT_HEALTH_CHECK_INTERVAL:-300} diff --git a/openmetadata-service/pom.xml b/openmetadata-service/pom.xml index fdf05a1fad99..932634376afa 100644 --- a/openmetadata-service/pom.xml +++ b/openmetadata-service/pom.xml @@ -485,6 +485,10 @@ 1.9.3 test + + info.picocli + picocli + diff --git 
a/openmetadata-service/src/main/java/org/openmetadata/service/Entity.java b/openmetadata-service/src/main/java/org/openmetadata/service/Entity.java index 0d2f9481da2f..06c821040759 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/Entity.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/Entity.java @@ -297,7 +297,7 @@ public static void registerEntity( ENTITY_LIST.add(entity); Collections.sort(ENTITY_LIST); - LOG.info("Registering entity {} {}", clazz, entity); + LOG.debug("Registering entity {} {}", clazz, entity); } public static void registerEntity( @@ -309,7 +309,7 @@ public static void registerEntity( ENTITY_LIST.add(entity); Collections.sort(ENTITY_LIST); - LOG.info("Registering entity time series {} {}", clazz, entity); + LOG.debug("Registering entity time series {} {}", clazz, entity); } public static void registerResourcePermissions( diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexApp.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexApp.java index bbed494d4c01..52baf77e8708 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexApp.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexApp.java @@ -13,6 +13,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import lombok.Getter; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; @@ -54,6 +55,40 @@ @Slf4j public class SearchIndexApp extends AbstractNativeApplication { + + private static final String ALL = "all"; + private static final Set ALL_ENTITIES = + Set.of( + "table", + "dashboard", + "topic", + "pipeline", + "searchIndex", + "user", + "team", + "glossaryTerm", + "mlmodel", + "tag", + "classification", + "query", + "container", + "database", + "databaseSchema", + "testCase", + "testSuite", + "chart", + "dashboardDataModel", + "databaseService", + "messagingService", + "dashboardService", + "pipelineService", + "mlmodelService", + "searchService", + "entityReportData", + "webAnalyticEntityViewReportData", + "webAnalyticUserActivityReportData", + "domain", + "storedProcedure"); private static final String ENTITY_TYPE_ERROR_MSG = "EntityType: %s %n Cause: %s %n Stack: %s"; private final List paginatedEntitiesSources = new ArrayList<>(); private final List paginatedDataInsightSources = new ArrayList<>(); @@ -67,12 +102,14 @@ public class SearchIndexApp extends AbstractNativeApplication { @Override public void init(App app, CollectionDAO dao, SearchRepository searchRepository) { super.init(app, dao, searchRepository); - // request for reindexing EventPublisherJob request = JsonUtils.convertValue(app.getAppConfiguration(), EventPublisherJob.class) .withStats(new Stats()) .withFailure(new Failure()); + if (request.getEntities().contains(ALL)) { + request.setEntities(ALL_ENTITIES); + } int totalRecords = getTotalRequestToProcess(request.getEntities(), collectionDAO); this.jobData = request; this.jobData.setStats( diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/clients/pipeline/PipelineServiceClientFactory.java b/openmetadata-service/src/main/java/org/openmetadata/service/clients/pipeline/PipelineServiceClientFactory.java index 0665f14e5167..e5d8d9ff2b64 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/clients/pipeline/PipelineServiceClientFactory.java +++ 
b/openmetadata-service/src/main/java/org/openmetadata/service/clients/pipeline/PipelineServiceClientFactory.java @@ -35,7 +35,7 @@ public static PipelineServiceClient createPipelineServiceClient( } String pipelineServiceClientClass = config.getClassName(); - LOG.info("Registering PipelineServiceClient: {}", pipelineServiceClientClass); + LOG.debug("Registering PipelineServiceClient: {}", pipelineServiceClientClass); try { pipelineServiceClient = diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/MigrationDAO.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/MigrationDAO.java index 08ccd3d82377..7fe7b393d2c1 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/MigrationDAO.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/MigrationDAO.java @@ -3,13 +3,20 @@ import static org.openmetadata.service.jdbi3.locator.ConnectionType.MYSQL; import static org.openmetadata.service.jdbi3.locator.ConnectionType.POSTGRES; +import java.sql.ResultSet; +import java.sql.SQLException; import java.util.List; import java.util.Optional; +import lombok.Builder; import lombok.Getter; import lombok.Setter; +import org.jdbi.v3.core.mapper.RowMapper; +import org.jdbi.v3.core.statement.StatementContext; import org.jdbi.v3.core.statement.StatementException; import org.jdbi.v3.sqlobject.SingleValue; +import org.jdbi.v3.sqlobject.config.RegisterRowMapper; import org.jdbi.v3.sqlobject.customizer.Bind; +import org.jdbi.v3.sqlobject.statement.SqlQuery; import org.openmetadata.service.jdbi3.locator.ConnectionAwareSqlQuery; import org.openmetadata.service.jdbi3.locator.ConnectionAwareSqlUpdate; @@ -53,26 +60,29 @@ String getSqlQuery(@Bind("version") String version, @Bind("checksum") String che @ConnectionAwareSqlUpdate( value = - "INSERT INTO SERVER_CHANGE_LOG (version, migrationFileName, checksum, installed_on)" - + "VALUES (:version, :migrationFileName, :checksum, CURRENT_TIMESTAMP) " + "INSERT INTO SERVER_CHANGE_LOG (version, migrationFileName, checksum, metrics, installed_on)" + + "VALUES (:version, :migrationFileName, :checksum, :metrics, CURRENT_TIMESTAMP) " + "ON DUPLICATE KEY UPDATE " + "migrationFileName = :migrationFileName, " + "checksum = :checksum, " + + "metrics = :metrics," + "installed_on = CURRENT_TIMESTAMP", connectionType = MYSQL) @ConnectionAwareSqlUpdate( value = - "INSERT INTO server_change_log (version, migrationFileName, checksum, installed_on)" - + "VALUES (:version, :migrationFileName, :checksum, current_timestamp) " + "INSERT INTO server_change_log (version, migrationFileName, checksum, metrics, installed_on)" + + "VALUES (:version, :migrationFileName, :checksum, to_jsonb(:metrics::text), current_timestamp) " + "ON CONFLICT (version) DO UPDATE SET " + "migrationFileName = EXCLUDED.migrationFileName, " + + "metrics = to_jsonb(:metrics::text)," + "checksum = EXCLUDED.checksum, " + "installed_on = EXCLUDED.installed_on", connectionType = POSTGRES) void upsertServerMigration( @Bind("version") String version, @Bind("migrationFileName") String migrationFileName, - @Bind("checksum") String checksum); + @Bind("checksum") String checksum, + @Bind("metrics") String metrics); @ConnectionAwareSqlUpdate( value = @@ -113,6 +123,11 @@ void upsertServerMigrationSQL( connectionType = POSTGRES) String checkIfQueryPreviouslyRan(@Bind("checksum") String checksum); + @SqlQuery( + "SELECT installed_rank, version, migrationFileName, checksum, installed_on, metrics FROM SERVER_CHANGE_LOG ORDER BY version ASC") + 
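+ // Returns each applied migration together with its recorded metrics JSON; + // consumed by printChangeLog() in OpenMetadataOperations to render the change log.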
@RegisterRowMapper(FromServerChangeLogMapper.class) + List listMetricsFromDBMigrations(); + @Getter @Setter class ServerMigrationSQLTable { @@ -120,4 +135,30 @@ class ServerMigrationSQLTable { private String sqlStatement; private String checkSum; } + + @Getter + @Setter + @Builder + class ServerChangeLog { + private Integer installedRank; + private String version; + private String migrationFileName; + private String checksum; + private String installedOn; + private String metrics; + } + + class FromServerChangeLogMapper implements RowMapper { + @Override + public ServerChangeLog map(ResultSet rs, StatementContext ctx) throws SQLException { + return ServerChangeLog.builder() + .installedRank(rs.getInt("installed_rank")) + .version(rs.getString("version")) + .migrationFileName(rs.getString("migrationFileName")) + .checksum(rs.getString("checksum")) + .installedOn(rs.getString("installed_on")) + .metrics(rs.getString("metrics")) + .build(); + } + } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/migration/api/MigrationWorkflow.java b/openmetadata-service/src/main/java/org/openmetadata/service/migration/api/MigrationWorkflow.java index 5f162f74a511..27891346e93a 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/migration/api/MigrationWorkflow.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/migration/api/MigrationWorkflow.java @@ -7,13 +7,14 @@ import java.util.Objects; import java.util.Optional; import java.util.UUID; -import java.util.stream.Collectors; import java.util.stream.Stream; import lombok.extern.slf4j.Slf4j; import org.jdbi.v3.core.Handle; import org.jdbi.v3.core.Jdbi; +import org.json.JSONObject; import org.openmetadata.service.jdbi3.MigrationDAO; import org.openmetadata.service.jdbi3.locator.ConnectionType; +import org.openmetadata.service.migration.context.MigrationContext; import org.openmetadata.service.migration.context.MigrationWorkflowContext; import org.openmetadata.service.migration.utils.MigrationFile; @@ -29,6 +30,8 @@ public class MigrationWorkflow { private final boolean forceMigrations; + private Optional currentMaxMigrationVersion; + public MigrationWorkflow( Jdbi jdbi, String nativeSQLScriptRootPath, @@ -86,26 +89,26 @@ public List getMigrationFiles( return Stream.concat( availableOMNativeMigrations.stream(), availableExtensionMigrations.stream()) .sorted() - .collect(Collectors.toList()); + .toList(); } public List getMigrationFilesFromPath(String path, ConnectionType connectionType) { return Arrays.stream(Objects.requireNonNull(new File(path).listFiles(File::isDirectory))) .map(dir -> new MigrationFile(dir, migrationDAO, connectionType)) .sorted() - .collect(Collectors.toList()); + .toList(); } private List filterAndGetMigrationsToRun( List availableMigrations) { LOG.debug("Filtering Server Migrations"); - Optional previousMaxMigration = migrationDAO.getMaxServerMigrationVersion(); + currentMaxMigrationVersion = migrationDAO.getMaxServerMigrationVersion(); List applyMigrations; - if (previousMaxMigration.isPresent() && !forceMigrations) { + if (currentMaxMigrationVersion.isPresent() && !forceMigrations) { applyMigrations = availableMigrations.stream() - .filter(migration -> migration.biggerThan(previousMaxMigration.get())) - .collect(Collectors.toList()); + .filter(migration -> migration.biggerThan(currentMaxMigrationVersion.get())) + .toList(); } else { applyMigrations = availableMigrations; } @@ -125,14 +128,16 @@ private List filterAndGetMigrationsToRun( return processes; } - 
@SuppressWarnings("unused") - private void initializeMigrationWorkflow() {} - public void runMigrationWorkflows() { try (Handle transactionHandler = jdbi.open()) { LOG.info("[MigrationWorkflow] WorkFlow Started"); MigrationWorkflowContext context = new MigrationWorkflowContext(transactionHandler); - context.computeInitialContext(); + if (currentMaxMigrationVersion.isPresent()) { + LOG.debug("Current Max version {}", currentMaxMigrationVersion.get()); + context.computeInitialContext(currentMaxMigrationVersion.get()); + } else { + context.computeInitialContext("1.1.0"); + } try { for (MigrationProcess process : migrations) { // Initialise Migration Steps @@ -176,7 +181,7 @@ public void runMigrationWorkflows() { process.getVersion(), process.getDatabaseConnectionType(), process.getMigrationsPath()); - updateMigrationStepInDB(process); + updateMigrationStepInDB(process, context); } } catch (Exception e) { @@ -190,17 +195,14 @@ public void runMigrationWorkflows() { LOG.info("[MigrationWorkflow] WorkFlow Completed"); } - public void closeMigrationWorkflow() { - // 1. Write to DB table the version we upgraded to - // should be the current server version - - // 2. Commit Transaction on completion - } - - public void updateMigrationStepInDB(MigrationProcess step) { + public void updateMigrationStepInDB( + MigrationProcess step, MigrationWorkflowContext workflowContext) { + MigrationContext context = workflowContext.getMigrationContext().get(step.getVersion()); + JSONObject metrics = new JSONObject(context.getResults()); migrationDAO.upsertServerMigration( - step.getVersion(), step.getMigrationsPath(), UUID.randomUUID().toString()); + step.getVersion(), + step.getMigrationsPath(), + UUID.randomUUID().toString(), + metrics.toString()); } - - public void migrateSearchIndexes() {} } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/migration/context/MigrationContext.java b/openmetadata-service/src/main/java/org/openmetadata/service/migration/context/MigrationContext.java index 93317522cfff..7e5c66fec722 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/migration/context/MigrationContext.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/migration/context/MigrationContext.java @@ -2,7 +2,6 @@ import java.util.HashMap; import java.util.List; -import java.util.stream.Collectors; import java.util.stream.Stream; import lombok.Getter; import lombok.extern.slf4j.Slf4j; @@ -20,8 +19,7 @@ public class MigrationContext { public MigrationContext(String version, List migrationOps, Handle handle) { this.version = version; this.migrationOps = - Stream.concat(migrationOps.stream(), CommonMigrationOps.getCommonOps().stream()) - .collect(Collectors.toList()); + Stream.concat(migrationOps.stream(), CommonMigrationOps.getCommonOps().stream()).toList(); this.handle = handle; } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/migration/context/MigrationWorkflowContext.java b/openmetadata-service/src/main/java/org/openmetadata/service/migration/context/MigrationWorkflowContext.java index 09aedfba0a07..8c610424704a 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/migration/context/MigrationWorkflowContext.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/migration/context/MigrationWorkflowContext.java @@ -10,19 +10,15 @@ @Slf4j public class MigrationWorkflowContext { @Getter private final HashMap migrationContext; - private final MigrationContext initialContext; private final Handle handle; 
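+ // The eagerly-built "initial" context is gone; the starting context is now + // seeded from the current max migration version via computeInitialContext.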
public MigrationWorkflowContext(Handle handle) { this.migrationContext = new HashMap<>(); this.handle = handle; - - // Initialize the context only with the common ops - this.initialContext = new MigrationContext("initial", List.of(), handle); } - public void computeInitialContext() { - computeMigrationSafely(this.initialContext); + public void computeInitialContext(String currentMaxMigrationVersion) { + computeMigrationSafely(new MigrationContext(currentMaxMigrationVersion, List.of(), handle)); } public void computeMigrationContext(MigrationProcess process) { diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/resources/apps/AppResource.java b/openmetadata-service/src/main/java/org/openmetadata/service/resources/apps/AppResource.java index 4789d725a706..b96ec1df2efb 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/resources/apps/AppResource.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/resources/apps/AppResource.java @@ -861,6 +861,8 @@ public Response deployApplicationFlow( pipelineServiceClient.deployPipeline(ingestionPipeline, service); if (status.getCode() == 200) { ingestionPipelineRepository.createOrUpdate(uriInfo, ingestionPipeline); + } else { + ingestionPipeline.setDeployed(false); } return Response.status(status.getCode()).entity(status).build(); } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/resources/services/ingestionpipelines/IngestionPipelineResource.java b/openmetadata-service/src/main/java/org/openmetadata/service/resources/services/ingestionpipelines/IngestionPipelineResource.java index 65fc8e6a5703..636f35528de7 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/resources/services/ingestionpipelines/IngestionPipelineResource.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/resources/services/ingestionpipelines/IngestionPipelineResource.java @@ -885,8 +885,6 @@ private PipelineServiceClientResponse deployPipelineInternal( UUID id, UriInfo uriInfo, SecurityContext securityContext) { Fields fields = getFields(FIELD_OWNER); IngestionPipeline ingestionPipeline = repository.get(uriInfo, id, fields); - ingestionPipeline.setOpenMetadataServerConnection( - new OpenMetadataConnectionBuilder(openMetadataApplicationConfig).build()); decryptOrNullify(securityContext, ingestionPipeline, true); ServiceEntityInterface service = Entity.getEntity(ingestionPipeline.getService(), "", Include.NON_DELETED); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/util/AsciiTable.java b/openmetadata-service/src/main/java/org/openmetadata/service/util/AsciiTable.java new file mode 100644 index 000000000000..e3e971d5356a --- /dev/null +++ b/openmetadata-service/src/main/java/org/openmetadata/service/util/AsciiTable.java @@ -0,0 +1,126 @@ +package org.openmetadata.service.util; + +import java.util.ArrayList; +import java.util.List; + +public class AsciiTable { + private static final String DEFAULT_COLUMN_NAME = "(No column name)"; + private static final String DEFAULT_NO_VALUE = "-"; + + private final List columns; + private final List> rows; + private final boolean printHeader; + private final String nullText; + private final String emptyText; + + public AsciiTable( + List columns, + List> rows, + boolean printHeader, + String nullText, + String emptyText) { + this.columns = ensureValidColumns(columns); + this.rows = rows; + this.printHeader = printHeader; + this.nullText = nullText; + this.emptyText = emptyText; + } + + private static List 
ensureValidColumns(List columns) { + List validColumns = new ArrayList<>(); + for (String column : columns) { + validColumns.add(column != null ? column : DEFAULT_COLUMN_NAME); + } + return validColumns; + } + + /** + * @return The table rendered with column header and row data. + */ + public String render() { + List widths = new ArrayList<>(); + for (String column : columns) { + widths.add(column.length()); + } + + for (List row : rows) { + for (int i = 0; i < row.size(); i++) { + widths.set(i, Math.max(widths.get(i), getValue(row, i).length())); + } + } + + StringBuilder ruler = new StringBuilder("+"); + for (Integer width : widths) { + ruler.append("-").append(trimOrPad("", width, '-')).append("-+"); + } + ruler.append("\n"); + + StringBuilder result = new StringBuilder(); + + if (printHeader) { + StringBuilder header = new StringBuilder("|"); + for (int i = 0; i < widths.size(); i++) { + header.append(" ").append(trimOrPad(columns.get(i), widths.get(i), ' ')).append(" |"); + } + header.append("\n"); + + result.append(ruler); + result.append(header); + } + + result.append(ruler); + + if (rows.isEmpty()) { + result + .append("| ") + .append(trimOrPad(emptyText, ruler.length() - Math.min(ruler.length(), 5))) + .append(" |\n"); + } else { + for (List row : rows) { + StringBuilder r = new StringBuilder("|"); + for (int i = 0; i < widths.size(); i++) { + r.append(" ").append(trimOrPad(getValue(row, i), widths.get(i), ' ')).append(" |"); + } + r.append("\n"); + result.append(r); + } + } + + result.append(ruler); + return result.toString(); + } + + private String getValue(List row, int i) { + try { + String value = row.get(i); + if (value == null) { + value = nullText; + } + return value; + } catch (IndexOutOfBoundsException e) { + return DEFAULT_NO_VALUE; + } + } + + private String trimOrPad(String str, int length, char padChar) { + StringBuilder result; + if (str == null) { + result = new StringBuilder(); + } else { + result = new StringBuilder(str); + } + + if (result.length() > length) { + return result.substring(0, length); + } + + while (result.length() < length) { + result.append(padChar); + } + return result.toString(); + } + + private String trimOrPad(String str, int length) { + return trimOrPad(str, length, ' '); + } +} diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/util/OpenMetadataOperations.java b/openmetadata-service/src/main/java/org/openmetadata/service/util/OpenMetadataOperations.java new file mode 100644 index 000000000000..cc40a5559442 --- /dev/null +++ b/openmetadata-service/src/main/java/org/openmetadata/service/util/OpenMetadataOperations.java @@ -0,0 +1,467 @@ +package org.openmetadata.service.util; + +import static org.flywaydb.core.internal.info.MigrationInfoDumper.dumpToAsciiTable; +import static org.openmetadata.service.Entity.FIELD_OWNER; + +import ch.qos.logback.classic.Level; +import ch.qos.logback.classic.Logger; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.gson.Gson; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import io.dropwizard.configuration.EnvironmentVariableSubstitutor; +import io.dropwizard.configuration.FileConfigurationSourceProvider; +import io.dropwizard.configuration.SubstitutingSourceProvider; +import io.dropwizard.configuration.YamlConfigurationFactory; +import io.dropwizard.db.DataSourceFactory; +import io.dropwizard.jackson.Jackson; +import io.dropwizard.jersey.validation.Validators; +import java.io.File; +import java.nio.charset.StandardCharsets; +import 
java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Scanner; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.Callable; +import javax.validation.Validator; +import lombok.extern.slf4j.Slf4j; +import org.flywaydb.core.Flyway; +import org.flywaydb.core.api.MigrationVersion; +import org.jdbi.v3.core.Jdbi; +import org.jdbi.v3.sqlobject.SqlObjectPlugin; +import org.jdbi.v3.sqlobject.SqlObjects; +import org.openmetadata.schema.ServiceEntityInterface; +import org.openmetadata.schema.entity.app.App; +import org.openmetadata.schema.entity.app.AppSchedule; +import org.openmetadata.schema.entity.app.ScheduledExecutionContext; +import org.openmetadata.schema.entity.services.ingestionPipelines.IngestionPipeline; +import org.openmetadata.schema.services.connections.metadata.OpenMetadataConnection; +import org.openmetadata.schema.system.EventPublisherJob; +import org.openmetadata.schema.type.Include; +import org.openmetadata.sdk.PipelineServiceClient; +import org.openmetadata.service.Entity; +import org.openmetadata.service.OpenMetadataApplicationConfig; +import org.openmetadata.service.apps.scheduler.AppScheduler; +import org.openmetadata.service.clients.pipeline.PipelineServiceClientFactory; +import org.openmetadata.service.fernet.Fernet; +import org.openmetadata.service.jdbi3.CollectionDAO; +import org.openmetadata.service.jdbi3.IngestionPipelineRepository; +import org.openmetadata.service.jdbi3.ListFilter; +import org.openmetadata.service.jdbi3.MigrationDAO; +import org.openmetadata.service.jdbi3.locator.ConnectionAwareAnnotationSqlLocator; +import org.openmetadata.service.jdbi3.locator.ConnectionType; +import org.openmetadata.service.migration.api.MigrationWorkflow; +import org.openmetadata.service.resources.databases.DatasourceConfig; +import org.openmetadata.service.search.SearchIndexFactory; +import org.openmetadata.service.search.SearchRepository; +import org.openmetadata.service.secrets.SecretsManager; +import org.openmetadata.service.secrets.SecretsManagerFactory; +import org.openmetadata.service.util.jdbi.DatabaseAuthenticationProviderFactory; +import org.slf4j.LoggerFactory; +import picocli.CommandLine; +import picocli.CommandLine.Command; +import picocli.CommandLine.Option; + +@Slf4j +@Command( + name = "OpenMetadataSetup", + mixinStandardHelpOptions = true, + version = "OpenMetadataSetup 1.3", + description = + "Creates or migrates database/search indexes. Re-indexes the existing data into Elasticsearch " + + "or OpenSearch. 
Re-deploys the service pipelines.") +public class OpenMetadataOperations implements Callable { + + private OpenMetadataApplicationConfig config; + private Flyway flyway; + private Jdbi jdbi; + private SearchRepository searchRepository; + private String nativeSQLScriptRootPath; + private String extensionSQLScriptRootPath; + private SecretsManager secretsManager; + private CollectionDAO collectionDAO; + + @Option( + names = {"-d", "--debug"}, + defaultValue = "false") + private boolean debug; + + @Option( + names = {"-c", "--config"}, + required = true) + private String configFilePath; + + @Override + public Integer call() { + LOG.info( + "Subcommand needed: 'info', 'validate', 'repair', 'check-connection', " + + "'drop-create', 'migrate', 'changelog', 'reindex', 'deploy-pipelines'"); + return 0; + } + + @Command( + name = "info", + description = + "Shows the list of migrations applied and the pending migrations " + + "waiting to be applied on the target database") + public Integer info() { + try { + parseConfig(); + LOG.info(dumpToAsciiTable(flyway.info().all())); + return 0; + } catch (Exception e) { + LOG.error("Failed to fetch migration info due to ", e); + return 1; + } + } + + @Command( + name = "validate", + description = + "Checks if all the migrations have been applied " + "on the target database.") + public Integer validate() { + try { + parseConfig(); + flyway.validate(); + return 0; + } catch (Exception e) { + LOG.error("Database migration validation failed due to ", e); + return 1; + } + } + + @Command( + name = "repair", + description = + "Repairs the DATABASE_CHANGE_LOG table, which is used to track " + + "all the migrations on the target database. This involves removing entries for failed migrations and updating " + + "the checksums of migrations already applied on the target database") + public Integer repair() { + try { + parseConfig(); + flyway.repair(); + return 0; + } catch (Exception e) { + LOG.error("Repair of CHANGE_LOG failed due to ", e); + return 1; + } + } + + @Command( + name = "check-connection", + description = + "Checks if a connection can be successfully " + "obtained for the target database") + public Integer checkConnection() { + try { + parseConfig(); + flyway.getConfiguration().getDataSource().getConnection(); + return 0; + } catch (Exception e) { + LOG.error("Failed to check connection due to ", e); + return 1; + } + } + + @Command( + name = "drop-create", + description = + "Deletes all tables in the configured database and creates new tables " + + "based on the current version of OpenMetadata. 
This command also re-creates the search indexes.") + public Integer dropCreate() { + try { + promptUserForDelete(); + parseConfig(); + LOG.info("Deleting all the OpenMetadata tables."); + flyway.clean(); + LOG.info("Creating the OpenMetadata Schema."); + flyway.migrate(); + validateAndRunSystemDataMigrations(true); + LOG.info("OpenMetadata Database Schema is Updated."); + return 0; + } catch (Exception e) { + LOG.error("Failed to drop-create due to ", e); + return 1; + } + } + + @Command( + name = "migrate", + description = "Migrates the OpenMetadata database schema and search index mappings.") + public Integer migrate( + @Option( + names = {"--force"}, + description = "Forces migrations to be run again, even if they have run previously", + defaultValue = "false") + boolean force) { + try { + LOG.info("Migrating the OpenMetadata Schema."); + parseConfig(); + flyway.migrate(); + validateAndRunSystemDataMigrations(force); + printChangeLog(); + return 0; + } catch (Exception e) { + LOG.error("Failed to run db migration due to ", e); + return 1; + } + } + + @Command(name = "changelog", description = "Prints the change log of database migrations.") + public Integer changelog() { + try { + parseConfig(); + printChangeLog(); + return 0; + } catch (Exception e) { + LOG.error("Failed to fetch db change log due to ", e); + return 1; + } + } + + @Command(name = "reindex", description = "Re-indexes data into the search engine from the command line.") + public Integer reIndex( + @Option( + names = {"-b", "--batch-size"}, + defaultValue = "100") + int batchSize, + @Option( + names = {"--recreate-indexes"}, + defaultValue = "true") + boolean recreateIndexes) { + try { + parseConfig(); + AppScheduler.initialize(collectionDAO, searchRepository); + App searchIndexApp = + new App() + .withId(UUID.randomUUID()) + .withName("SearchIndexApp") + .withClassName("org.openmetadata.service.apps.bundles.searchIndex.SearchIndexApp") + .withAppSchedule( + new AppSchedule().withScheduleType(AppSchedule.ScheduleTimeline.DAILY)) + .withAppConfiguration( + new EventPublisherJob() + .withEntities(new HashSet<>(List.of("all"))) + .withRecreateIndex(recreateIndexes) + .withBatchSize(batchSize) + .withSearchIndexMappingLanguage( + config.getElasticSearchConfiguration().getSearchIndexMappingLanguage())) + .withRuntime(new ScheduledExecutionContext().withEnabled(true)); + AppScheduler.getInstance().triggerOnDemandApplication(searchIndexApp); + return 0; + } catch (Exception e) { + LOG.error("Failed to reindex due to ", e); + return 1; + } + } + + @Command(name = "deploy-pipelines", description = "Deploys all the service pipelines.") + public Integer deployPipelines() { + try { + LOG.info("Deploying Pipelines"); + parseConfig(); + PipelineServiceClient pipelineServiceClient = + PipelineServiceClientFactory.createPipelineServiceClient( + config.getPipelineServiceClientConfiguration()); + IngestionPipelineRepository pipelineRepository = + (IngestionPipelineRepository) Entity.getEntityRepository(Entity.INGESTION_PIPELINE); + List pipelines = + pipelineRepository.listAll( + new EntityUtil.Fields(Set.of(FIELD_OWNER, "service")), + new ListFilter(Include.NON_DELETED)); + LOG.debug(String.format("Pipelines %d", pipelines.size())); + List columns = Arrays.asList("Name", "Type", "Service Name", "Status"); + List> pipelineStatuses = new ArrayList<>(); + for (IngestionPipeline pipeline : pipelines) { + deployPipeline(pipeline, pipelineServiceClient, pipelineStatuses); + } + printToAsciiTable(columns, pipelineStatuses, "No Pipelines Found"); + return 0; + 
} catch (Exception e) { + LOG.error("Failed to deploy pipelines due to ", e); + return 1; + } + } + + private void deployPipeline( + IngestionPipeline pipeline, + PipelineServiceClient pipelineServiceClient, + List> pipelineStatuses) { + try { + LOG.debug(String.format("deploying pipeline %s", pipeline.getName())); + pipeline.setOpenMetadataServerConnection(new OpenMetadataConnectionBuilder(config).build()); + secretsManager.decryptIngestionPipeline(pipeline); + OpenMetadataConnection openMetadataServerConnection = + new OpenMetadataConnectionBuilder(config).build(); + pipeline.setOpenMetadataServerConnection( + secretsManager.encryptOpenMetadataConnection(openMetadataServerConnection, false)); + ServiceEntityInterface service = + Entity.getEntity(pipeline.getService(), "", Include.NON_DELETED); + pipelineServiceClient.deployPipeline(pipeline, service); + } catch (Exception e) { + LOG.error( + String.format( + "Failed to deploy pipeline %s of type %s for service %s", + pipeline.getName(), + pipeline.getPipelineType().value(), + pipeline.getService().getName()), + e); + pipeline.setDeployed(false); + } finally { + LOG.debug("update the pipeline"); + collectionDAO.ingestionPipelineDAO().update(pipeline); + pipelineStatuses.add( + Arrays.asList( + pipeline.getName(), + pipeline.getPipelineType().value(), + pipeline.getService().getName(), + pipeline.getDeployed().toString())); + } + } + + private void parseConfig() throws Exception { + if (debug) { + Logger root = (Logger) LoggerFactory.getLogger(org.slf4j.Logger.ROOT_LOGGER_NAME); + root.setLevel(Level.DEBUG); + } + ObjectMapper objectMapper = Jackson.newObjectMapper(); + Validator validator = Validators.newValidator(); + YamlConfigurationFactory factory = + new YamlConfigurationFactory<>( + OpenMetadataApplicationConfig.class, validator, objectMapper, "dw"); + config = + factory.build( + new SubstitutingSourceProvider( + new FileConfigurationSourceProvider(), new EnvironmentVariableSubstitutor(false)), + configFilePath); + Fernet.getInstance().setFernetKey(config); + DataSourceFactory dataSourceFactory = config.getDataSourceFactory(); + if (dataSourceFactory == null) { + throw new IllegalArgumentException("No database in config file"); + } + // Check for db auth providers. 
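+    // If the JDBC URL matches a registered provider, the generated auth token
+    // replaces the configured password before Flyway and Jdbi connect.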
+ DatabaseAuthenticationProviderFactory.get(dataSourceFactory.getUrl()) + .ifPresent( + databaseAuthenticationProvider -> { + String token = + databaseAuthenticationProvider.authenticate( + dataSourceFactory.getUrl(), + dataSourceFactory.getUser(), + dataSourceFactory.getPassword()); + dataSourceFactory.setPassword(token); + }); + + String jdbcUrl = dataSourceFactory.getUrl(); + String user = dataSourceFactory.getUser(); + String password = dataSourceFactory.getPassword(); + assert user != null && password != null; + + String flywayRootPath = config.getMigrationConfiguration().getFlywayPath(); + String location = + "filesystem:" + + flywayRootPath + + File.separator + + config.getDataSourceFactory().getDriverClass(); + flyway = + Flyway.configure() + .encoding(StandardCharsets.UTF_8) + .table("DATABASE_CHANGE_LOG") + .sqlMigrationPrefix("v") + .validateOnMigrate(false) + .outOfOrder(false) + .baselineOnMigrate(true) + .baselineVersion(MigrationVersion.fromVersion("000")) + .cleanOnValidationError(false) + .locations(location) + .dataSource(jdbcUrl, user, password) + .cleanDisabled(false) + .load(); + nativeSQLScriptRootPath = config.getMigrationConfiguration().getNativePath(); + extensionSQLScriptRootPath = config.getMigrationConfiguration().getExtensionPath(); + jdbi = Jdbi.create(jdbcUrl, user, password); + jdbi.installPlugin(new SqlObjectPlugin()); + jdbi.getConfig(SqlObjects.class) + .setSqlLocator( + new ConnectionAwareAnnotationSqlLocator( + config.getDataSourceFactory().getDriverClass())); + + searchRepository = + new SearchRepository(config.getElasticSearchConfiguration(), new SearchIndexFactory()); + + // Initialize secrets manager + secretsManager = + SecretsManagerFactory.createSecretsManager( + config.getSecretsManagerConfiguration(), config.getClusterName()); + + collectionDAO = jdbi.onDemand(CollectionDAO.class); + Entity.setCollectionDAO(collectionDAO); + Entity.initializeRepositories(config, jdbi); + } + + private void promptUserForDelete() { + LOG.info( + """ + You are about to drop all the data in the database. ALL METADATA WILL BE DELETED.\s + This is not recommended for a Production setup or any deployment where you have collected\s + a lot of information from the users, such as descriptions, tags, etc. + """); + String input = ""; + Scanner scanner = new Scanner(System.in); + while (!input.equals("DELETE")) { + LOG.info("Enter QUIT to quit. 
If you still want to continue, please enter DELETE: "); + input = scanner.next(); + if (input.equals("QUIT")) { + LOG.info("Exiting without deleting data"); + System.exit(1); + } + } + } + + private void validateAndRunSystemDataMigrations(boolean force) { + ConnectionType connType = ConnectionType.from(config.getDataSourceFactory().getDriverClass()); + DatasourceConfig.initialize(connType.label); + MigrationWorkflow workflow = + new MigrationWorkflow( + jdbi, nativeSQLScriptRootPath, connType, extensionSQLScriptRootPath, force); + workflow.loadMigrations(); + workflow.runMigrationWorkflows(); + Entity.cleanup(); + } + + private void printToAsciiTable(List columns, List> rows, String emptyText) { + LOG.info(new AsciiTable(columns, rows, true, "", emptyText).render()); + } + + private void printChangeLog() { + MigrationDAO migrationDAO = jdbi.onDemand(MigrationDAO.class); + List serverChangeLogs = + migrationDAO.listMetricsFromDBMigrations(); + Set columns = new LinkedHashSet<>(Set.of("version", "installedOn")); + List> rows = new ArrayList<>(); + for (MigrationDAO.ServerChangeLog serverChangeLog : serverChangeLogs) { + List row = new ArrayList<>(); + JsonObject metricsJson = new Gson().fromJson(serverChangeLog.getMetrics(), JsonObject.class); + Set keys = metricsJson.keySet(); + columns.addAll(keys); + row.add(serverChangeLog.getVersion()); + row.add(serverChangeLog.getInstalledOn()); + row.addAll( + metricsJson.entrySet().stream() + .map(Map.Entry::getValue) + .map(JsonElement::toString) + .toList()); + rows.add(row); + } + printToAsciiTable(columns.stream().toList(), rows, "No Server Change log found"); + } + + public static void main(String... args) { + int exitCode = + new CommandLine(new org.openmetadata.service.util.OpenMetadataOperations()).execute(args); + System.exit(exitCode); + } +} diff --git a/openmetadata-service/src/main/resources/logback.xml b/openmetadata-service/src/main/resources/logback.xml index f1c245a14900..bc95de867619 100644 --- a/openmetadata-service/src/main/resources/logback.xml +++ b/openmetadata-service/src/main/resources/logback.xml @@ -2,15 +2,20 @@ - - ${LOG_LEVEL:-INFO} - - %d{HH:mm:ss.SSS} [%thread] %-5level %logger{5} - %msg%n - + %msg%n + + + + ./logs/openmetadata-operation.log + true + true + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + \ No newline at end of file diff --git a/pom.xml b/pom.xml index a0f8fd183ec4..3f19b9ace240 100644 --- a/pom.xml +++ b/pom.xml @@ -150,6 +150,7 @@ 5.4.0 1.29.2 2.41.1 + 4.7.5 @@ -480,6 +481,11 @@ resilience4j-retry 2.1.0 + + info.picocli + picocli + ${picocli.version} +
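For reference, a usage sketch of the new utility (illustrative and not part of the patch; the subcommands and flags are the ones defined in OpenMetadataOperations above, and the wrapper script already injects the -c config path):

    # list applied and pending schema migrations
    ./bootstrap/openmetadata-ops.sh info

    # run pending database migrations; add --force to re-run system data migrations
    ./bootstrap/openmetadata-ops.sh migrate

    # rebuild search indexes, overriding the default batch size of 100
    ./bootstrap/openmetadata-ops.sh reindex --batch-size 500

    # re-deploy every ingestion pipeline and print a status table
    ./bootstrap/openmetadata-ops.sh deploy-pipelines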