Skip to content

Commit

Permalink
Merge branch 'datahub-project:master' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
anshbansal authored Apr 22, 2024
2 parents 0da6cee + 3668a56 commit 670967e
Show file tree
Hide file tree
Showing 18 changed files with 176 additions and 48 deletions.
2 changes: 1 addition & 1 deletion metadata-ingestion/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@
sqlglot_lib = {
# Using an Acryl fork of sqlglot.
# https://github.com/tobymao/sqlglot/compare/main...hsheth2:sqlglot:hsheth?expand=1
"acryl-sqlglot==23.2.1.dev5",
"acryl-sqlglot==23.11.2.dev2",
}

classification_lib = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,15 @@ def extend_field(
if len(terms_to_add) == 0:
terms_to_add = all_terms

new_glossary_terms = []
new_glossary_terms.extend(server_terms)
new_glossary_terms.extend(terms_to_add)

unique_gloseary_terms = []
for term in new_glossary_terms:
if term not in unique_gloseary_terms:
unique_gloseary_terms.append(term)

new_glossary_term = GlossaryTermsClass(
terms=[],
auditStamp=schema_field.glossaryTerms.auditStamp
Expand All @@ -79,11 +88,9 @@ def extend_field(
time=builder.get_sys_time(), actor="urn:li:corpUser:restEmitter"
),
)
new_glossary_term.terms.extend(terms_to_add)
new_glossary_term.terms.extend(server_terms)
new_glossary_term.terms.extend(unique_gloseary_terms)

schema_field.glossaryTerms = new_glossary_term

return schema_field

def transform_aspect(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,12 @@
"downstream": {
"table": null,
"column": "max_col",
"column_type": null,
"native_column_type": null
"column_type": {
"type": {
"com.linkedin.pegasus2avro.schema.NumberType": {}
}
},
"native_column_type": "DECIMAL"
},
"upstreams": [
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"query_type_props": {
"kind": "TABLE"
},
"query_fingerprint": "2aa655ab211e061dc8c1161e0b2a7073b38636f9ffcc4719d4e70743e3321cb2",
"query_fingerprint": "58a15c69c357905aec390867335699413678368f321bfde6f477c08a973ca3a9",
"in_tables": [
"urn:li:dataset:(urn:li:dataPlatform:snowflake,long_tail_companions.analytics.customer_last_purchase_date,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:snowflake,long_tail_companions.ecommerce.purchases,PROD)"
Expand Down Expand Up @@ -127,6 +127,6 @@
],
"debug_info": {
"confidence": 0.4,
"generalized_statement": "CREATE TABLE active_customer_ltv AS (WITH active_customers AS (SELECT * FROM customer_last_purchase_date WHERE last_purchase_date >= CURRENT_DATE - INTERVAL DAYS), purchases AS (SELECT * FROM ecommerce.purchases) SELECT active_customers.user_fk, active_customers.email, active_customers.last_purchase_date, SUM(purchases.purchase_amount) AS lifetime_purchase_amount, COUNT(DISTINCT (purchases.pk)) AS lifetime_purchase_count, SUM(purchases.purchase_amount) / COUNT(DISTINCT (purchases.pk)) AS average_purchase_amount FROM active_customers JOIN purchases ON active_customers.user_fk = purchases.user_fk GROUP BY ?, ?, ?)"
"generalized_statement": "CREATE TABLE active_customer_ltv AS (WITH active_customers AS (SELECT * FROM customer_last_purchase_date WHERE last_purchase_date >= CURRENT_DATE - INTERVAL '? DAYS'), purchases AS (SELECT * FROM ecommerce.purchases) SELECT active_customers.user_fk, active_customers.email, active_customers.last_purchase_date, SUM(purchases.purchase_amount) AS lifetime_purchase_amount, COUNT(DISTINCT (purchases.pk)) AS lifetime_purchase_count, SUM(purchases.purchase_amount) / COUNT(DISTINCT (purchases.pk)) AS average_purchase_amount FROM active_customers JOIN purchases ON active_customers.user_fk = purchases.user_fk GROUP BY ?, ?, ?)"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -278,13 +278,19 @@ private boolean isRequestGranted(
return false;
}

final ResolvedEntitySpec resolvedActorSpec =
entitySpecResolver.resolve(
new EntitySpec(actorUrn.get().getEntityType(), request.getActorUrn()));
final PolicyEngine.PolicyEvaluationResult result =
policyEngine.evaluatePolicy(
systemOpContext, policy, resolvedActorSpec, request.getPrivilege(), resourceSpec);
return result.isGranted();
try {
final ResolvedEntitySpec resolvedActorSpec =
entitySpecResolver.resolve(
new EntitySpec(actorUrn.get().getEntityType(), request.getActorUrn()));

final PolicyEngine.PolicyEvaluationResult result =
policyEngine.evaluatePolicy(
systemOpContext, policy, resolvedActorSpec, request.getPrivilege(), resourceSpec);
return result.isGranted();
} catch (RuntimeException e) {
log.error("Error evaluating policy {} for request {}", policy.getDisplayName(), request);
throw e;
}
}

private Optional<Urn> getUrnFromRequestActor(String actor) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,17 +40,22 @@ public FieldResolver getFieldResolver(

private FieldResolver.FieldValue getDataPlatformInstance(
@Nonnull OperationContext opContext, EntitySpec entitySpec) {
Urn entityUrn = UrnUtils.getUrn(entitySpec.getEntity());
// In the case that the entity is a platform instance, the associated platform instance entity
// is the instance itself
if (entityUrn.getEntityType().equals(DATA_PLATFORM_INSTANCE_ENTITY_NAME)) {
return FieldResolver.FieldValue.builder()
.values(Collections.singleton(entityUrn.toString()))
.build();
}

EnvelopedAspect dataPlatformInstanceAspect;
try {
if (entitySpec.getEntity().isEmpty()) {
return FieldResolver.emptyFieldValue();
}

Urn entityUrn = UrnUtils.getUrn(entitySpec.getEntity());
// In the case that the entity is a platform instance, the associated platform instance entity
// is the instance itself
if (entityUrn.getEntityType().equals(DATA_PLATFORM_INSTANCE_ENTITY_NAME)) {
return FieldResolver.FieldValue.builder()
.values(Collections.singleton(entityUrn.toString()))
.build();
}

EntityResponse response =
_entityClient.getV2(
opContext,
Expand All @@ -63,7 +68,7 @@ private FieldResolver.FieldValue getDataPlatformInstance(
}
dataPlatformInstanceAspect = response.getAspects().get(DATA_PLATFORM_INSTANCE_ASPECT_NAME);
} catch (Exception e) {
log.error("Error while retrieving platform instance aspect for urn {}", entityUrn, e);
log.error("Error while retrieving platform instance aspect for entitySpec {}", entitySpec, e);
return FieldResolver.emptyFieldValue();
}
DataPlatformInstance dataPlatformInstance =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,16 +82,22 @@ private Set<Urn> getBatchedParentDomains(

private FieldResolver.FieldValue getDomains(
@Nonnull OperationContext opContext, EntitySpec entitySpec) {
final Urn entityUrn = UrnUtils.getUrn(entitySpec.getEntity());
// In the case that the entity is a domain, the associated domain is the domain itself
if (entityUrn.getEntityType().equals(DOMAIN_ENTITY_NAME)) {
return FieldResolver.FieldValue.builder()
.values(Collections.singleton(entityUrn.toString()))
.build();
}

final EnvelopedAspect domainsAspect;
try {
if (entitySpec.getEntity().isEmpty()) {
return FieldResolver.emptyFieldValue();
}

final Urn entityUrn = UrnUtils.getUrn(entitySpec.getEntity());

// In the case that the entity is a domain, the associated domain is the domain itself
if (entityUrn.getEntityType().equals(DOMAIN_ENTITY_NAME)) {
return FieldResolver.FieldValue.builder()
.values(Collections.singleton(entityUrn.toString()))
.build();
}

EntityResponse response =
_entityClient.getV2(
opContext,
Expand All @@ -103,7 +109,7 @@ private FieldResolver.FieldValue getDomains(
}
domainsAspect = response.getAspects().get(DOMAINS_ASPECT_NAME);
} catch (Exception e) {
log.error("Error while retrieving domains aspect for urn {}", entityUrn, e);
log.error("Error while retrieving domains aspect for entitySpec {}", entitySpec, e);
return FieldResolver.emptyFieldValue();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
import com.datahub.authorization.FieldResolver;
import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableList;
import io.datahubproject.metadata.context.OperationContext;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import javax.annotation.Nonnull;

/** Provides field resolver for entity urn given entitySpec */
Expand All @@ -20,6 +20,13 @@ public List<EntityFieldType> getFieldTypes() {
@Override
public FieldResolver getFieldResolver(
@Nonnull OperationContext opContext, EntitySpec entitySpec) {
return FieldResolver.getResolverFromValues(Collections.singleton(entitySpec.getEntity()));
return FieldResolver.getResolverFromFunction(entitySpec, this::getUrn);
}

/**
 * Produces the field value holding the entity string of the given spec.
 *
 * @param entitySpec spec whose entity string is read
 * @return the empty field value when the entity string is empty; otherwise a field value whose
 *     value set contains exactly that entity string
 */
private FieldResolver.FieldValue getUrn(EntitySpec entitySpec) {
  final String entity = entitySpec.getEntity();
  return entity.isEmpty()
      ? FieldResolver.emptyFieldValue()
      : FieldResolver.FieldValue.builder().values(Set.of(entity)).build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,17 @@ public FieldResolver getFieldResolver(

private FieldResolver.FieldValue getGroupMembership(
@Nonnull OperationContext opContext, EntitySpec entitySpec) {
Urn entityUrn = UrnUtils.getUrn(entitySpec.getEntity());

EnvelopedAspect groupMembershipAspect;
EnvelopedAspect nativeGroupMembershipAspect;
List<Urn> groups = new ArrayList<>();
try {
if (entitySpec.getEntity().isEmpty()) {
return FieldResolver.emptyFieldValue();
}

Urn entityUrn = UrnUtils.getUrn(entitySpec.getEntity());

EntityResponse response =
_entityClient.getV2(
opContext,
Expand Down Expand Up @@ -77,7 +83,7 @@ private FieldResolver.FieldValue getGroupMembership(
groups.addAll(nativeGroupMembership.getNativeGroups());
}
} catch (Exception e) {
log.error("Error while retrieving group membership aspect for urn {}", entityUrn, e);
log.error("Error while retrieving group membership aspect for entitySpec {}", entitySpec, e);
return FieldResolver.emptyFieldValue();
}
return FieldResolver.FieldValue.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,14 @@ public FieldResolver getFieldResolver(

private FieldResolver.FieldValue getOwners(
@Nonnull OperationContext opContext, EntitySpec entitySpec) {
Urn entityUrn = UrnUtils.getUrn(entitySpec.getEntity());

EnvelopedAspect ownershipAspect;
try {
if (entitySpec.getEntity().isEmpty()) {
return FieldResolver.emptyFieldValue();
}
Urn entityUrn = UrnUtils.getUrn(entitySpec.getEntity());

EntityResponse response =
_entityClient.getV2(
opContext,
Expand All @@ -52,7 +57,7 @@ private FieldResolver.FieldValue getOwners(
}
ownershipAspect = response.getAspects().get(Constants.OWNERSHIP_ASPECT_NAME);
} catch (Exception e) {
log.error("Error while retrieving domains aspect for urn {}", entityUrn, e);
log.error("Error while retrieving ownership aspect for entitySpec {}", entitySpec, e);
return FieldResolver.emptyFieldValue();
}
Ownership ownership = new Ownership(ownershipAspect.getValue().data());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,15 @@ public FieldResolver getFieldResolver(

private FieldResolver.FieldValue getTags(
@Nonnull OperationContext opContext, EntitySpec entitySpec) {
Urn entityUrn = UrnUtils.getUrn(entitySpec.getEntity());

EnvelopedAspect globalTagsAspect;
try {
if (entitySpec.getEntity().isEmpty()) {
return FieldResolver.emptyFieldValue();
}

Urn entityUrn = UrnUtils.getUrn(entitySpec.getEntity());

EntityResponse response =
_entityClient.getV2(
opContext,
Expand All @@ -53,7 +59,7 @@ private FieldResolver.FieldValue getTags(
}
globalTagsAspect = response.getAspects().get(Constants.GLOBAL_TAGS_ASPECT_NAME);
} catch (Exception e) {
log.error("Error while retrieving tags aspect for urn {}", entityUrn, e);
log.error("Error while retrieving tags aspect for entitySpec {}", entitySpec, e);
return FieldResolver.emptyFieldValue();
}
GlobalTags globalTags = new GlobalTags(globalTagsAspect.getValue().data());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

public class DataPlatformInstanceFieldResolverProviderTest {
public class DataPlatformInstanceFieldResolverProviderTest
extends EntityFieldResolverProviderBaseTest<DataPlatformInstanceFieldResolverProvider> {

private static final String DATA_PLATFORM_INSTANCE_URN =
"urn:li:dataPlatformInstance:(urn:li:dataPlatform:s3,test-platform-instance)";
Expand All @@ -50,11 +51,15 @@ public class DataPlatformInstanceFieldResolverProviderTest {
@BeforeMethod
public void setup() {
MockitoAnnotations.initMocks(this);
dataPlatformInstanceFieldResolverProvider =
new DataPlatformInstanceFieldResolverProvider(entityClientMock);
dataPlatformInstanceFieldResolverProvider = buildFieldResolverProvider();
systemOperationContext = TestOperationContexts.systemContextNoSearchAuthorization();
}

@Override
protected DataPlatformInstanceFieldResolverProvider buildFieldResolverProvider() {
return new DataPlatformInstanceFieldResolverProvider(entityClientMock);
}

@Test
public void shouldReturnDataPlatformInstanceType() {
assertEquals(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package com.datahub.authorization.fieldresolverprovider;

import static org.mockito.Mockito.mock;

import com.linkedin.entity.client.SystemEntityClient;

/**
 * Runs the checks inherited from {@link EntityFieldResolverProviderBaseTest} against {@link
 * DomainFieldResolverProvider}.
 */
public class DomainFieldResolverProviderTest
    extends EntityFieldResolverProviderBaseTest<DomainFieldResolverProvider> {

  /**
   * Creates the provider under test.
   *
   * @return a new provider constructed with a Mockito mock of {@link SystemEntityClient}
   */
  @Override
  protected DomainFieldResolverProvider buildFieldResolverProvider() {
    final SystemEntityClient mockedClient = mock(SystemEntityClient.class);
    return new DomainFieldResolverProvider(mockedClient);
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package com.datahub.authorization.fieldresolverprovider;

import static org.mockito.Mockito.mock;
import static org.testng.Assert.assertEquals;

import com.datahub.authorization.EntitySpec;
import com.datahub.authorization.FieldResolver;
import io.datahubproject.metadata.context.OperationContext;
import java.util.Collections;
import java.util.concurrent.ExecutionException;
import org.testng.annotations.Test;

/**
 * Base class for field resolver provider tests. Subclasses supply the provider under test and
 * inherit the checks defined here.
 *
 * @param <T> concrete provider type being tested
 */
public abstract class EntityFieldResolverProviderBaseTest<T extends EntityFieldResolverProvider> {

  /**
   * Creates the provider instance that the inherited tests exercise.
   *
   * @return the provider under test
   */
  protected abstract T buildFieldResolverProvider();

  /**
   * Checks that a spec with an empty entity string resolves to the same field value as a resolver
   * built from an empty set of values.
   *
   * @throws ExecutionException if either field-value future completes exceptionally
   * @throws InterruptedException if the thread is interrupted while waiting on a future
   */
  @Test
  public void testEmpty() throws ExecutionException, InterruptedException {
    final T provider = buildFieldResolverProvider();
    final OperationContext opContext = mock(OperationContext.class);
    final EntitySpec specWithEmptyEntity = new EntitySpec("dataset", "");

    final FieldResolver actualResolver = provider.getFieldResolver(opContext, specWithEmptyEntity);
    final FieldResolver expectedResolver = FieldResolver.getResolverFromValues(Collections.emptySet());

    assertEquals(
        actualResolver.getFieldValuesFuture().get(),
        expectedResolver.getFieldValuesFuture().get());
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package com.datahub.authorization.fieldresolverprovider;

/**
 * Runs the checks inherited from {@link EntityFieldResolverProviderBaseTest} against {@link
 * EntityUrnFieldResolverProvider}.
 */
public class EntityUrnFieldResolverProviderTest
    extends EntityFieldResolverProviderBaseTest<EntityUrnFieldResolverProvider> {

  /**
   * Creates the provider under test.
   *
   * @return a new provider built with its no-argument constructor
   */
  @Override
  protected EntityUrnFieldResolverProvider buildFieldResolverProvider() {
    final EntityUrnFieldResolverProvider provider = new EntityUrnFieldResolverProvider();
    return provider;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

public class GroupMembershipFieldResolverProviderTest {
public class GroupMembershipFieldResolverProviderTest
extends EntityFieldResolverProviderBaseTest<GroupMembershipFieldResolverProvider> {

private static final String CORPGROUP_URN = "urn:li:corpGroup:groupname";
private static final String NATIVE_CORPGROUP_URN = "urn:li:corpGroup:nativegroupname";
Expand All @@ -47,11 +48,15 @@ public class GroupMembershipFieldResolverProviderTest {
@BeforeMethod
public void setup() {
MockitoAnnotations.initMocks(this);
groupMembershipFieldResolverProvider =
new GroupMembershipFieldResolverProvider(entityClientMock);
groupMembershipFieldResolverProvider = buildFieldResolverProvider();
systemOperationContext = TestOperationContexts.systemContextNoSearchAuthorization();
}

@Override
protected GroupMembershipFieldResolverProvider buildFieldResolverProvider() {
return new GroupMembershipFieldResolverProvider(entityClientMock);
}

@Test
public void shouldReturnGroupsMembershipType() {
assertEquals(
Expand Down
Loading

0 comments on commit 670967e

Please sign in to comment.