Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge upstream 2024 05 07 #284

Merged
merged 20 commits into from
May 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
6f02001
fix(ingest/airflow-plugin): emit the operation aspect (#10402)
dushayntAW May 3, 2024
a635620
feat(search): allow overriding case-sensitivity to zero (#10422)
david-leifker May 3, 2024
50d37fe
fix(ci): add labeled to list of pr types for ci (#10363)
david-leifker May 3, 2024
bda609b
docs(ingest): update datahub sink doc to include an acryl example (#1…
gabe-lyons May 3, 2024
fa24ca5
feat(ui) Support rich text for form descriptions (#10425)
chriscollins3456 May 3, 2024
5a686c5
feat(auth): improve authentication flow logging (#10428)
darnaut May 3, 2024
b325819
feat(upgrade): common base for mcl upgrades (#10429)
david-leifker May 3, 2024
41fa259
feat(search): autocomplete custom configuration (#10426)
david-leifker May 3, 2024
35dbbaa
fix(upgrade): fix upgrade npe (#10436)
david-leifker May 6, 2024
2766fcd
fix(docker): use distinct empty env files (#10438)
hsheth2 May 6, 2024
6a24ed2
feat(ingest/snowflake): use system sampling on very large tables (#10…
hsheth2 May 6, 2024
1dae37a
fix(ingest/bigquery): remove last modified timestamp fallback (#10431)
hsheth2 May 6, 2024
0e8fc51
feat(cli): cache sql parsing intermediates (#10399)
hsheth2 May 6, 2024
360445e
docs: fix blog link (#10441)
yoonhyejin May 7, 2024
ddb38e7
fix(ingestion/tableau): Fix tableau custom sql lineage gap (#10359)
shubhamjagtap639 May 7, 2024
28e97dd
fix(changeEvents): add description-parameter to the change-event of a…
ksrinath May 7, 2024
71759f9
feat(ci): add linting for cypress tests (#10424)
anshbansal May 7, 2024
d08f36f
feat(spark/openlineage): Use Openlineage 1.13.1 in Spark Plugin (#10433)
treff7es May 7, 2024
ae3f0fd
feat(ingestion): Copy urns from previous checkpoint state on ingestio…
shubhamjagtap639 May 7, 2024
313d42b
merge upstream resolve conflicts
anshbansal May 7, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .github/workflows/docker-unified.yml
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,8 @@ jobs:
if: ${{ steps.ci-optimize.outputs.smoke-test-change == 'true' }}
run: |
python ./.github/scripts/check_python_package.py
./gradlew :smoke-test:lint
./gradlew :smoke-test:pythonLint
./gradlew :smoke-test:cypressLint

gms_build:
name: Build and Push DataHub GMS Docker Image
Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ buildscript {
ext.hazelcastVersion = '5.3.6'
ext.ebeanVersion = '12.16.1'
ext.googleJavaFormatVersion = '1.18.1'
ext.openLineageVersion = '1.5.0'
ext.openLineageVersion = '1.13.1'
ext.logbackClassicJava8 = '1.2.12'

ext.docker_registry = 'acryldata'
Expand Down
2 changes: 2 additions & 0 deletions datahub-frontend/app/auth/sso/oidc/OidcCallbackLogic.java
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,8 @@ private Result handleOidcCallback(
"Failed to perform post authentication steps. Error message: %s", e.getMessage()));
}

log.info("OIDC callback authentication successful for user: {}", userName);

// Successfully logged in - Generate GMS login token
final String accessToken = authClient.generateSessionTokenForUser(corpUserUrn.getId());
return result
Expand Down
6 changes: 4 additions & 2 deletions datahub-frontend/app/client/AuthServiceClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ public String generateSessionTokenForUser(@Nonnull final String userId) {
CloseableHttpResponse response = null;

try {

final String protocol = this.metadataServiceUseSsl ? "https" : "http";
final HttpPost request =
new HttpPost(
Expand All @@ -86,6 +85,8 @@ public String generateSessionTokenForUser(@Nonnull final String userId) {
this.metadataServicePort,
GENERATE_SESSION_TOKEN_ENDPOINT));

log.info("Requesting session token for user: {}", userId);

// Build JSON request to generate a token on behalf of a user.
final ObjectMapper objectMapper = new ObjectMapper();
final ObjectNode objectNode = objectMapper.createObjectNode();
Expand All @@ -100,7 +101,7 @@ public String generateSessionTokenForUser(@Nonnull final String userId) {
response = httpClient.execute(request);
final HttpEntity entity = response.getEntity();
if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK && entity != null) {
// Successfully generated a token for the User
log.info("Successfully received session token for user: {}", userId);
final String jsonStr = EntityUtils.toString(entity);
return getAccessTokenFromJson(jsonStr);
} else {
Expand All @@ -110,6 +111,7 @@ public String generateSessionTokenForUser(@Nonnull final String userId) {
response.getStatusLine().toString(), response.getEntity().toString()));
}
} catch (Exception e) {
log.error("Failed to generate session token for user: {}", userId, e);
throw new RuntimeException("Failed to generate session token for user", e);
} finally {
try {
Expand Down
12 changes: 7 additions & 5 deletions datahub-frontend/app/controllers/AuthenticationController.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import play.mvc.Results;
import security.AuthenticationManager;

// TODO add logging.
public class AuthenticationController extends Controller {
public static final String AUTH_VERBOSE_LOGGING = "auth.verbose.logging";
private static final String AUTH_REDIRECT_URI_PARAM = "redirect_uri";
Expand Down Expand Up @@ -183,10 +182,12 @@ public Result logIn(Http.Request request) {
boolean loginSucceeded = tryLogin(username, password);

if (!loginSucceeded) {
_logger.info("Login failed for user: {}", username);
return Results.badRequest(invalidCredsJson);
}

final Urn actorUrn = new CorpuserUrn(username);
_logger.info("Login successful for user: {}, urn: {}", username, actorUrn);
final String accessToken = _authClient.generateSessionTokenForUser(actorUrn.getId());
return createSession(actorUrn.toString(), accessToken);
}
Expand Down Expand Up @@ -250,6 +251,7 @@ public Result signUp(Http.Request request) {
final Urn userUrn = new CorpuserUrn(email);
final String userUrnString = userUrn.toString();
_authClient.signUp(userUrnString, fullName, email, title, password, inviteToken);
_logger.info("Signed up user {} using invite tokens", userUrnString);
final String accessToken = _authClient.generateSessionTokenForUser(userUrn.getId());
return createSession(userUrnString, accessToken);
}
Expand Down Expand Up @@ -351,15 +353,15 @@ private boolean tryLogin(String username, String password) {
// First try jaas login, if enabled
if (_jaasConfigs.isJAASEnabled()) {
try {
_logger.debug("Attempting jaas authentication");
_logger.debug("Attempting JAAS authentication for user: {}", username);
AuthenticationManager.authenticateJaasUser(username, password);
_logger.debug("Jaas authentication successful. Login succeeded");
_logger.debug("JAAS authentication successful. Login succeeded");
loginSucceeded = true;
} catch (Exception e) {
if (_verbose) {
_logger.debug("Jaas authentication error. Login failed", e);
_logger.debug("JAAS authentication error. Login failed", e);
} else {
_logger.debug("Jaas authentication error. Login failed");
_logger.debug("JAAS authentication error. Login failed");
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.linkedin.datahub.upgrade.system.vianodes.ReindexDataJobViaNodesCLL;
import com.linkedin.metadata.entity.AspectDao;
import com.linkedin.metadata.entity.EntityService;
import io.datahubproject.metadata.context.OperationContext;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Conditional;
Expand All @@ -15,13 +16,14 @@ public class ReindexDataJobViaNodesCLLConfig {

// Spring factory method for the ReindexDataJobViaNodesCLL non-blocking system upgrade.
// Wires the upgrade with its OperationContext plus entity/aspect access, and binds the
// tuning knobs (enabled flag, batch size, inter-batch delay, row limit) from
// systemUpdate.dataJobNodeCLL.* configuration properties.
@Bean
public NonBlockingSystemUpgrade reindexDataJobViaNodesCLL(
final OperationContext opContext,
final EntityService<?> entityService,
final AspectDao aspectDao,
@Value("${systemUpdate.dataJobNodeCLL.enabled}") final boolean enabled,
@Value("${systemUpdate.dataJobNodeCLL.batchSize}") final Integer batchSize,
@Value("${systemUpdate.dataJobNodeCLL.delayMs}") final Integer delayMs,
@Value("${systemUpdate.dataJobNodeCLL.limit}") final Integer limit) {
// opContext is new in this revision: the upgrade step now needs it to resolve the
// retriever context when converting Ebean aspects and when producing MCLs.
return new ReindexDataJobViaNodesCLL(
opContext, entityService, aspectDao, enabled, batchSize, delayMs, limit);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
package com.linkedin.datahub.upgrade.system;

import static com.linkedin.metadata.Constants.DATA_HUB_UPGRADE_RESULT_ASPECT_NAME;

import com.linkedin.common.urn.Urn;
import com.linkedin.datahub.upgrade.UpgradeContext;
import com.linkedin.datahub.upgrade.UpgradeStep;
import com.linkedin.datahub.upgrade.UpgradeStepResult;
import com.linkedin.datahub.upgrade.impl.DefaultUpgradeStepResult;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.metadata.boot.BootstrapStep;
import com.linkedin.metadata.entity.AspectDao;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.entity.EntityUtils;
import com.linkedin.metadata.entity.restoreindices.RestoreIndicesArgs;
import com.linkedin.metadata.utils.AuditStampUtils;
import com.linkedin.util.Pair;
import io.datahubproject.metadata.context.OperationContext;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import lombok.extern.slf4j.Slf4j;

/**
 * Generic upgrade step that re-emits MCL (MetadataChangeLog) events for every stored row of a
 * single aspect, so that downstream consumers (e.g. Elasticsearch documents) are rebuilt without
 * rewriting the aspects themselves.
 *
 * <p>Subclasses supply the aspect name and an optional urn-like SQL filter; batching, MCL
 * production, and upgrade-state bookkeeping are handled here.
 */
@Slf4j
public abstract class AbstractMCLStep implements UpgradeStep {
  private final OperationContext opContext;
  private final EntityService<?> entityService;
  private final AspectDao aspectDao;

  // Tuning knobs: rows per SQL batch, sleep between batches (ms), and overall row limit.
  private final int batchSize;
  private final int batchDelayMs;
  private final int limit;

  /**
   * @param opContext operation context used to resolve the retriever context and to produce MCLs
   * @param entityService used to emit MCLs and to read/write upgrade state
   * @param aspectDao used for the raw SQL scan over stored aspect rows
   * @param batchSize number of rows fetched per batch
   * @param batchDelayMs delay between batches; values &lt;= 0 disable throttling
   * @param limit maximum number of rows scanned overall
   */
  public AbstractMCLStep(
      OperationContext opContext,
      EntityService<?> entityService,
      AspectDao aspectDao,
      Integer batchSize,
      Integer batchDelayMs,
      Integer limit) {
    this.opContext = opContext;
    this.entityService = entityService;
    this.aspectDao = aspectDao;
    this.batchSize = batchSize;
    this.batchDelayMs = batchDelayMs;
    this.limit = limit;
  }

  /** Name of the aspect whose rows are scanned and replayed as MCLs. */
  @Nonnull
  protected abstract String getAspectName();

  /** Urn under which this upgrade records its completion marker aspect. */
  protected Urn getUpgradeIdUrn() {
    return BootstrapStep.getUpgradeUrn(id());
  }

  /** Optionally apply an urn-like sql filter, otherwise all urns */
  @Nullable
  protected abstract String getUrnLike();

  @Override
  public Function<UpgradeContext, UpgradeStepResult> executable() {
    return (context) -> {

      // re-using RestoreIndicesArgs only to configure the sql scan (aspect, batch size, limit)
      RestoreIndicesArgs args =
          new RestoreIndicesArgs().aspectName(getAspectName()).batchSize(batchSize).limit(limit);

      if (getUrnLike() != null) {
        args = args.urnLike(getUrnLike());
      }

      aspectDao
          .streamAspectBatches(args)
          .forEach(
              batch -> {
                log.info("Processing batch({}) of size {}.", getAspectName(), batchSize);

                // Convert each raw Ebean row to a system aspect, then emit an UPSERT MCL for it.
                // The previous aspect/system-metadata are passed as null: consumers treat this as
                // a fresh upsert keyed to this upgrade's run id.
                List<Pair<Future<?>, Boolean>> futures =
                    EntityUtils.toSystemAspectFromEbeanAspects(
                            // NOTE(review): assumes a retriever context is always present on
                            // opContext here — Optional.get() throws NoSuchElementException
                            // otherwise. Confirm against how system-update wires the context.
                            opContext.getRetrieverContext().get(),
                            batch.collect(Collectors.toList()))
                        .stream()
                        .map(
                            systemAspect ->
                                entityService.alwaysProduceMCLAsync(
                                    opContext,
                                    systemAspect.getUrn(),
                                    systemAspect.getUrn().getEntityType(),
                                    getAspectName(),
                                    systemAspect.getAspectSpec(),
                                    null,
                                    systemAspect.getRecordTemplate(),
                                    null,
                                    systemAspect
                                        .getSystemMetadata()
                                        .setRunId(id())
                                        .setLastObserved(System.currentTimeMillis()),
                                    AuditStampUtils.createDefaultAuditStamp(),
                                    ChangeType.UPSERT))
                        .collect(Collectors.toList());

                // Block until every MCL in this batch has been produced before moving on.
                futures.forEach(
                    f -> {
                      try {
                        f.getFirst().get();
                      } catch (InterruptedException | ExecutionException e) {
                        // NOTE(review): InterruptedException is rethrown without re-interrupting
                        // the thread (Thread.currentThread().interrupt()) — acceptable for a
                        // one-shot upgrade job, but worth confirming.
                        throw new RuntimeException(e);
                      }
                    });

                // Optional throttle so the MCL consumers are not overwhelmed.
                if (batchDelayMs > 0) {
                  log.info("Sleeping for {} ms", batchDelayMs);
                  try {
                    Thread.sleep(batchDelayMs);
                  } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                  }
                }
              });

      // NOTE(review): this additionally runs the restore-indices pathway with the same args and
      // reports row counts — presumably intentional as a catch-all after the manual MCL replay;
      // confirm it is not redundant work.
      entityService
          .streamRestoreIndices(opContext, args, x -> context.report().addLine((String) x))
          .forEach(
              result -> {
                context.report().addLine("Rows migrated: " + result.rowsMigrated);
                context.report().addLine("Rows ignored: " + result.ignored);
              });

      // Persist the completion marker so future runs can skip() this step.
      BootstrapStep.setUpgradeResult(opContext, getUpgradeIdUrn(), entityService);
      context.report().addLine("State updated: " + getUpgradeIdUrn());

      return new DefaultUpgradeStepResult(id(), UpgradeStepResult.Result.SUCCEEDED);
    };
  }

  /**
   * Returns whether the upgrade should be skipped, i.e. whether a completion marker from a
   * previous successful run already exists.
   *
   * <p>Fix: the doc comment previously sat between {@code @Override} and the method, where the
   * javadoc tool does not attach it; doc comments must precede annotations.
   */
  @Override
  public boolean skip(UpgradeContext context) {
    boolean previouslyRun =
        entityService.exists(
            opContext, getUpgradeIdUrn(), DATA_HUB_UPGRADE_RESULT_ASPECT_NAME, true);
    if (previouslyRun) {
      log.info("{} was already run. Skipping.", id());
    }
    return previouslyRun;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
import com.linkedin.datahub.upgrade.system.NonBlockingSystemUpgrade;
import com.linkedin.metadata.entity.AspectDao;
import com.linkedin.metadata.entity.EntityService;
import io.datahubproject.metadata.context.OperationContext;
import java.util.List;
import javax.annotation.Nonnull;
import lombok.extern.slf4j.Slf4j;

/**
Expand All @@ -18,6 +20,7 @@ public class ReindexDataJobViaNodesCLL implements NonBlockingSystemUpgrade {
private final List<UpgradeStep> _steps;

public ReindexDataJobViaNodesCLL(
@Nonnull OperationContext opContext,
EntityService<?> entityService,
AspectDao aspectDao,
boolean enabled,
Expand All @@ -28,7 +31,7 @@ public ReindexDataJobViaNodesCLL(
_steps =
ImmutableList.of(
new ReindexDataJobViaNodesCLLStep(
entityService, aspectDao, batchSize, batchDelayMs, limit));
opContext, entityService, aspectDao, batchSize, batchDelayMs, limit));
} else {
_steps = ImmutableList.of();
}
Expand Down
Loading
Loading