Merge branch 'datahub-project:master' into master
anshbansal authored Nov 22, 2024
2 parents a9a4beb + 86b8175 commit 2ae9a67
Showing 34 changed files with 2,331 additions and 859 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/pr-labeler.yml
@@ -45,7 +45,8 @@ jobs:
"Salman-Apptware",
"mayurinehate",
"noggi",
"skrydal"
"skrydal",
"kevinkarchacryl"
]'),
github.actor
)
.../com/linkedin/datahub/graphql/analytics/service/AnalyticsUtil.java
@@ -1,19 +1,14 @@
package com.linkedin.datahub.graphql.analytics.service;

import static com.linkedin.metadata.Constants.CORP_USER_EDITABLE_INFO_ASPECT_NAME;
import static com.linkedin.metadata.Constants.CORP_USER_ENTITY_NAME;
import static com.linkedin.metadata.Constants.CORP_USER_INFO_ASPECT_NAME;

import com.google.common.collect.ImmutableSet;
import com.linkedin.common.urn.Urn;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.dashboard.DashboardInfo;
import com.linkedin.datahub.graphql.generated.BarSegment;
import com.linkedin.datahub.graphql.generated.Cell;
import com.linkedin.datahub.graphql.generated.Entity;
import com.linkedin.datahub.graphql.generated.EntityProfileParams;
import com.linkedin.datahub.graphql.generated.LinkParams;
import com.linkedin.datahub.graphql.generated.NamedBar;
import com.linkedin.datahub.graphql.generated.Row;
import com.linkedin.datahub.graphql.generated.SearchParams;
import com.linkedin.datahub.graphql.generated.*;
import com.linkedin.datahub.graphql.types.common.mappers.UrnToEntityMapper;
import com.linkedin.dataplatform.DataPlatformInfo;
import com.linkedin.dataset.DatasetProperties;
@@ -22,6 +17,7 @@
import com.linkedin.entity.EnvelopedAspect;
import com.linkedin.entity.client.EntityClient;
import com.linkedin.glossary.GlossaryTermInfo;
import com.linkedin.identity.CorpUserEditableInfo;
import com.linkedin.identity.CorpUserInfo;
import com.linkedin.metadata.Constants;
import com.linkedin.metadata.key.GlossaryTermKey;
@@ -35,6 +31,7 @@
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import lombok.extern.slf4j.Slf4j;
@@ -169,36 +166,79 @@ public static void convertToUserInfoRows(
final Map<Urn, EntityResponse> gmsResponseByUser =
entityClient.batchGetV2(
opContext,
CORP_USER_INFO_ASPECT_NAME,
CORP_USER_ENTITY_NAME,
userUrns,
ImmutableSet.of(CORP_USER_INFO_ASPECT_NAME));
final Map<Urn, CorpUserInfo> urnToCorpUserInfo =
ImmutableSet.of(CORP_USER_INFO_ASPECT_NAME, CORP_USER_EDITABLE_INFO_ASPECT_NAME));
final Stream<Map.Entry<Urn, EntityResponse>> entityStream =
gmsResponseByUser.entrySet().stream()
.filter(
entry ->
entry.getValue() != null
&& entry.getValue().getAspects().containsKey(CORP_USER_INFO_ASPECT_NAME))
.collect(
Collectors.toMap(
Map.Entry::getKey,
entry ->
&& (entry.getValue().getAspects().containsKey(CORP_USER_INFO_ASPECT_NAME)
|| entry
.getValue()
.getAspects()
.containsKey(CORP_USER_EDITABLE_INFO_ASPECT_NAME)));
final Map<Urn, Pair<CorpUserInfo, CorpUserEditableInfo>> urnToCorpUserInfo =
entityStream.collect(
Collectors.toMap(
Map.Entry::getKey,
entry -> {
CorpUserInfo userInfo = null;
CorpUserEditableInfo editableInfo = null;
try {
userInfo =
new CorpUserInfo(
entry
.getValue()
.getAspects()
.get(CORP_USER_INFO_ASPECT_NAME)
.getValue()
.data())));
.data());
} catch (Exception e) {
// CorpUserInfo aspect is missing or unparseable; leave it null
}
try {
editableInfo =
new CorpUserEditableInfo(
entry
.getValue()
.getAspects()
.get(CORP_USER_EDITABLE_INFO_ASPECT_NAME)
.getValue()
.data());
} catch (Exception e) {
// CorpUserEditableInfo aspect is missing or unparseable; leave it null
}

return Pair.of(userInfo, editableInfo);
}));
// Populate a row with the user link, title, and email.
rows.forEach(
row -> {
Urn urn = UrnUtils.getUrn(row.getCells().get(0).getValue());
EntityResponse response = gmsResponseByUser.get(urn);
String maybeDisplayName = response != null ? getUserName(response).orElse(null) : null;
String maybeEmail =
urnToCorpUserInfo.containsKey(urn) ? urnToCorpUserInfo.get(urn).getEmail() : null;
String maybeTitle =
urnToCorpUserInfo.containsKey(urn) ? urnToCorpUserInfo.get(urn).getTitle() : null;
String maybeEmail = null;
String maybeTitle = null;
if (urnToCorpUserInfo.containsKey(urn)) {
Pair<CorpUserInfo, CorpUserEditableInfo> pair = urnToCorpUserInfo.get(urn);
if (pair.getLeft() != null) {
CorpUserInfo userInfo = pair.getLeft();
maybeEmail = userInfo.getEmail();
maybeTitle = userInfo.getTitle();
}
if (pair.getRight() != null) {
CorpUserEditableInfo userInfo = pair.getRight();
if (maybeEmail == null) {
maybeEmail = userInfo.getEmail();
}
if (maybeTitle == null) {
maybeTitle = userInfo.getTitle();
}
}
}
if (maybeDisplayName != null) {
row.getCells().get(0).setValue(maybeDisplayName);
}
.../com/linkedin/datahub/graphql/utils/AnalyticsUtilTest.java
@@ -0,0 +1,108 @@
package com.linkedin.datahub.graphql.utils;

import static org.mockito.ArgumentMatchers.anySet;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;

import com.linkedin.common.urn.Urn;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.data.DataMap;
import com.linkedin.datahub.graphql.analytics.service.AnalyticsUtil;
import com.linkedin.datahub.graphql.generated.Cell;
import com.linkedin.datahub.graphql.generated.Row;
import com.linkedin.entity.EntityResponse;
import com.linkedin.entity.EnvelopedAspect;
import com.linkedin.entity.EnvelopedAspectMap;
import com.linkedin.entity.client.EntityClient;
import com.linkedin.identity.CorpUserEditableInfo;
import com.linkedin.identity.CorpUserInfo;
import com.linkedin.metadata.Constants;
import io.datahubproject.metadata.context.OperationContext;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

public class AnalyticsUtilTest {

@Mock private OperationContext mockOpContext;

@Mock private EntityClient mockEntityClient;

final String TEST_CORP_USER_INFO_TEST_USER = "Corp User";
final String TEST_CORP_USER_EDITABLE_INFO_TEST_TITLE = "Editable Info Title";
final String TEST_CORP_USER_EDITABLE_INFO_TEST_EMAIL = "Editable Info Email";

@BeforeMethod
public void setUp() {
MockitoAnnotations.openMocks(this);
}

@Test
public void testConvertToUserInfoRows() throws Exception {
List<Row> rows = new ArrayList<>();
rows.add(new Row(null, Arrays.asList(new Cell("urn:li:corpuser:testuser", null, null))));

// create a CorpUserInfo with only display name set
CorpUserInfo corpUserInfo = new CorpUserInfo();
corpUserInfo.setActive(true);
corpUserInfo.setDisplayName(TEST_CORP_USER_INFO_TEST_USER);

// create an editableInfo with the email and title set
CorpUserEditableInfo corpUserEditableInfo = new CorpUserEditableInfo();
corpUserEditableInfo.setEmail(TEST_CORP_USER_EDITABLE_INFO_TEST_EMAIL); // Overriding email
corpUserEditableInfo.setTitle(TEST_CORP_USER_EDITABLE_INFO_TEST_TITLE); // Overriding title

DataMap corpUserInfoDataMap = new DataMap();
corpUserInfoDataMap.put("name", Constants.CORP_USER_INFO_ASPECT_NAME);
corpUserInfoDataMap.put("type", "VERSIONED");
corpUserInfoDataMap.put("value", corpUserInfo.data());

DataMap corpUserEditableInfoDataMap = new DataMap();
corpUserEditableInfoDataMap.put("name", Constants.CORP_USER_EDITABLE_INFO_ASPECT_NAME);
corpUserEditableInfoDataMap.put("type", "VERSIONED");
corpUserEditableInfoDataMap.put("value", corpUserEditableInfo.data());

EnvelopedAspect corpUserInfoEnvelopedAspect = new EnvelopedAspect(corpUserInfoDataMap);
EnvelopedAspect corpUserEditableInfoEnvelopedAspect =
new EnvelopedAspect(corpUserEditableInfoDataMap);

EnvelopedAspectMap aspectMap = new EnvelopedAspectMap();
aspectMap.put(Constants.CORP_USER_INFO_ASPECT_NAME, corpUserInfoEnvelopedAspect);
aspectMap.put(
Constants.CORP_USER_EDITABLE_INFO_ASPECT_NAME, corpUserEditableInfoEnvelopedAspect);

EntityResponse entityResponse = new EntityResponse();
entityResponse.setAspects(aspectMap);

Map<Urn, EntityResponse> entityResponseMap = new HashMap<>();
Urn userUrn = UrnUtils.getUrn("urn:li:corpuser:testuser");
entityResponseMap.put(userUrn, entityResponse);

// mock the entity client method that returns the response map
when(mockEntityClient.batchGetV2(
eq(mockOpContext), eq(Constants.CORP_USER_ENTITY_NAME), anySet(), anySet()))
.thenReturn(entityResponseMap);

// function we are testing
AnalyticsUtil.convertToUserInfoRows(mockOpContext, mockEntityClient, rows);

Row updatedRow = rows.get(0);
List<Cell> updatedCells = updatedRow.getCells();

// assert that the display name comes from CorpUserInfo while email and title come from CorpUserEditableInfo
assertEquals(updatedCells.get(0).getValue(), TEST_CORP_USER_INFO_TEST_USER);
assertEquals(
updatedCells.get(1).getValue(),
TEST_CORP_USER_EDITABLE_INFO_TEST_TITLE); // Overriding title
assertEquals(
updatedCells.get(2).getValue(),
TEST_CORP_USER_EDITABLE_INFO_TEST_EMAIL); // Overriding email
}
}
Editor.tsx
@@ -50,26 +50,26 @@ export const Editor = forwardRef((props: EditorProps, ref) => {
const { manager, state, getContext } = useRemirror({
extensions: () => [
new BlockquoteExtension(),
new BoldExtension(),
new BulletListExtension(),
new BoldExtension({}),
new BulletListExtension({}),
new CodeBlockExtension({ syntaxTheme: 'base16_ateliersulphurpool_light' }),
new CodeExtension(),
new DataHubMentionsExtension(),
new DropCursorExtension(),
new DataHubMentionsExtension({}),
new DropCursorExtension({}),
new HardBreakExtension(),
new HeadingExtension(),
new HistoryExtension(),
new HorizontalRuleExtension(),
new HeadingExtension({}),
new HistoryExtension({}),
new HorizontalRuleExtension({}),
new ImageExtension({ enableResizing: !readOnly }),
new ItalicExtension(),
new LinkExtension({ autoLink: true, defaultTarget: '_blank' }),
new ListItemExtension(),
new ListItemExtension({}),
new MarkdownExtension({ htmlSanitizer: DOMPurify.sanitize, htmlToMarkdown, markdownToHtml }),
new OrderedListExtension(),
new UnderlineExtension(),
new StrikeExtension(),
new TableExtension({ resizable: false }),
...(readOnly ? [] : [new HistoryExtension()]),
...(readOnly ? [] : [new HistoryExtension({})]),
],
content,
stringHandler: 'markdown',
16 changes: 12 additions & 4 deletions docs-website/yarn.lock
@@ -1827,7 +1827,7 @@
"@docusaurus/theme-search-algolia" "2.4.3"
"@docusaurus/types" "2.4.3"

"@docusaurus/[email protected]", "react-loadable@npm:@docusaurus/[email protected]":
"@docusaurus/[email protected]":
version "5.5.2"
resolved "https://registry.yarnpkg.com/@docusaurus/react-loadable/-/react-loadable-5.5.2.tgz#81aae0db81ecafbdaee3651f12804580868fa6ce"
integrity sha512-A3dYjdBGuy0IGT+wyLIGIKLRE+sAk1iNk0f1HjNDysO7u8lhL4N3VEm+FAubmJbAztn94F7MxBTPmnixbiyFdQ==
@@ -4757,9 +4757,9 @@ cross-fetch@^3.1.5:
node-fetch "^2.6.12"

cross-spawn@^7.0.3:
version "7.0.3"
resolved "https://registry.yarnpkg.com/cross-spawn/-/cross-spawn-7.0.3.tgz#f73a85b9d5d41d045551c177e2882d4ac85728a6"
integrity sha512-iRDPJKUPVEND7dHPO8rkbOnPpyDygcDFtWjpeWNCgy8WP2rXcxXL8TskReQl6OrB2G7+UJrags1q15Fudc7G6w==
version "7.0.6"
resolved "https://registry.yarnpkg.com/cross-spawn/-/cross-spawn-7.0.6.tgz#8a58fe78f00dcd70c370451759dfbfaf03e8ee9f"
integrity sha512-uV2QOWP2nWzsy2aMp8aRibhi9dlzF5Hgh5SHaB9OiTGEyDTiJJyx0uy51QXdyWbtAHNua4XJzUKca3OzKUd3vA==
dependencies:
path-key "^3.1.0"
shebang-command "^2.0.0"
@@ -9713,6 +9713,14 @@ react-loadable-ssr-addon-v5-slorber@^1.0.1:
dependencies:
"@babel/runtime" "^7.10.3"

"react-loadable@npm:@docusaurus/[email protected]":
version "5.5.2"
resolved "https://registry.yarnpkg.com/@docusaurus/react-loadable/-/react-loadable-5.5.2.tgz#81aae0db81ecafbdaee3651f12804580868fa6ce"
integrity sha512-A3dYjdBGuy0IGT+wyLIGIKLRE+sAk1iNk0f1HjNDysO7u8lhL4N3VEm+FAubmJbAztn94F7MxBTPmnixbiyFdQ==
dependencies:
"@types/react" "*"
prop-types "^15.6.2"

react-markdown@^8.0.6:
version "8.0.7"
resolved "https://registry.yarnpkg.com/react-markdown/-/react-markdown-8.0.7.tgz#c8dbd1b9ba5f1c5e7e5f2a44de465a3caafdf89b"
22 changes: 22 additions & 0 deletions metadata-ingestion/docs/sources/kafka/kafka.md
@@ -102,7 +102,29 @@ source:
connection:
bootstrap: "broker:9092"
schema_registry_url: http://localhost:8081
```

### OAuth Callback
The OAuth callback function can be configured via `config.connection.consumer_config.oauth_cb`.

Specify the callback as a Python function reference in the format `<python-module>:<function-name>`.

For example, with `oauth_cb: "oauth:create_token"`, `create_token` is a function defined in `oauth.py`, and `oauth.py` must be accessible on the PYTHONPATH.

```YAML
source:
type: "kafka"
config:
# Set the custom schema registry implementation class
schema_registry_class: "datahub.ingestion.source.confluent_schema_registry.ConfluentSchemaRegistry"
# Coordinates
connection:
bootstrap: "broker:9092"
schema_registry_url: http://localhost:8081
consumer_config:
security.protocol: "SASL_PLAINTEXT"
sasl.mechanism: "OAUTHBEARER"
oauth_cb: "oauth:create_token"
# sink configs
```
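
For reference, here is a minimal sketch of what `oauth.py` might contain. confluent-kafka invokes `oauth_cb` with the value of `sasl.oauthbearer.config` and expects a `(token, expiry_epoch_seconds)` tuple in return; the token endpoint, credentials, and response fields below are placeholders to adapt to your identity provider.

```python
# oauth.py — a sketch only; the IdP endpoint, credentials, and response
# fields are placeholders, not part of the DataHub source.
import time

import requests


def create_token(oauth_config):
    # confluent-kafka passes the `sasl.oauthbearer.config` string here.
    resp = requests.post(
        "https://idp.example.com/oauth/token",  # placeholder endpoint
        data={"grant_type": "client_credentials"},
        auth=("client-id", "client-secret"),  # placeholder credentials
        timeout=30,
    )
    resp.raise_for_status()
    payload = resp.json()
    # Return the bearer token plus its absolute expiry in epoch seconds.
    return payload["access_token"], time.time() + payload["expires_in"]
```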

4 changes: 2 additions & 2 deletions metadata-ingestion/setup.py
@@ -741,8 +741,8 @@
"hive = datahub.ingestion.source.sql.hive:HiveSource",
"hive-metastore = datahub.ingestion.source.sql.hive_metastore:HiveMetastoreSource",
"json-schema = datahub.ingestion.source.schema.json_schema:JsonSchemaSource",
"kafka = datahub.ingestion.source.kafka:KafkaSource",
"kafka-connect = datahub.ingestion.source.kafka_connect:KafkaConnectSource",
"kafka = datahub.ingestion.source.kafka.kafka:KafkaSource",
"kafka-connect = datahub.ingestion.source.kafka.kafka_connect:KafkaConnectSource",
"ldap = datahub.ingestion.source.ldap:LDAPSource",
"looker = datahub.ingestion.source.looker.looker_source:LookerDashboardSource",
"lookml = datahub.ingestion.source.looker.lookml_source:LookMLSource",
13 changes: 12 additions & 1 deletion metadata-ingestion/src/datahub/configuration/kafka.py
@@ -1,6 +1,7 @@
from pydantic import Field, validator

from datahub.configuration.common import ConfigModel
from datahub.configuration.common import ConfigModel, ConfigurationError
from datahub.configuration.kafka_consumer_config import CallableConsumerConfig
from datahub.configuration.validate_host_port import validate_host_port


@@ -36,6 +37,16 @@ class KafkaConsumerConnectionConfig(_KafkaConnectionConfig):
description="Extra consumer config serialized as JSON. These options will be passed into Kafka's DeserializingConsumer. See https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#deserializingconsumer and https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md .",
)

@validator("consumer_config")
@classmethod
def resolve_callback(cls, value: dict) -> dict:
if CallableConsumerConfig.is_callable_config(value):
try:
value = CallableConsumerConfig(value).callable_config()
except Exception as e:
raise ConfigurationError(e)
return value


class KafkaProducerConnectionConfig(_KafkaConnectionConfig):
"""Configuration class for holding connectivity information for Kafka producers"""
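
`CallableConsumerConfig` itself is not shown in this diff, but resolving a `<python-module>:<function-name>` reference such as `oauth:create_token` into a callable generally looks like the sketch below (a plausible approach, not necessarily the project's exact implementation):

```python
import importlib


def resolve_callable(ref: str):
    """Turn a "<module>:<function>" string into the function it names."""
    module_name, sep, func_name = ref.rpartition(":")
    if not sep or not module_name or not func_name:
        raise ValueError(f"expected '<module>:<function>', got {ref!r}")
    # The module must be importable, i.e. reachable via PYTHONPATH.
    module = importlib.import_module(module_name)
    return getattr(module, func_name)


# e.g. consumer_config["oauth_cb"] = resolve_callable("oauth:create_token")
```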