From dc87b51369030ace07f48d857dc02c1c64bc911d Mon Sep 17 00:00:00 2001 From: k-bartlett Date: Mon, 2 Dec 2024 04:23:28 -0500 Subject: [PATCH] feat(ingest): connector for Neo4j (#11526) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: kbartlett Co-authored-by: Andrew Sikowitz Co-authored-by: Jay Feldman <8128360+feldjay@users.noreply.github.com> Co-authored-by: Harshal Sheth Co-authored-by: Mayuri Nehate <33225191+mayurinehate@users.noreply.github.com> Co-authored-by: Shirshanka Das Co-authored-by: deepgarg-visa <149145061+deepgarg-visa@users.noreply.github.com> Co-authored-by: Felix Lüdin <13187726+Masterchen09@users.noreply.github.com> --- .../app/ingest/source/builder/constants.ts | 4 + .../app/ingest/source/builder/sources.json | 8 + datahub-web-react/src/images/neo4j.png | Bin 0 -> 12968 bytes .../docs/sources/neo4j/neo4j.md | 20 ++ .../docs/sources/neo4j/neo4j_recipe.yml | 12 + metadata-ingestion/setup.py | 3 + .../ingestion/source/common/subtypes.py | 2 + .../ingestion/source/neo4j/__init__.py | 0 .../ingestion/source/neo4j/neo4j_source.py | 331 ++++++++++++++++++ .../tests/unit/test_neo4j_source.py | 221 ++++++++++++ .../bootstrap_mcps/data-platforms.yaml | 11 + 11 files changed, 612 insertions(+) create mode 100644 datahub-web-react/src/images/neo4j.png create mode 100644 metadata-ingestion/docs/sources/neo4j/neo4j.md create mode 100644 metadata-ingestion/docs/sources/neo4j/neo4j_recipe.yml create mode 100644 metadata-ingestion/src/datahub/ingestion/source/neo4j/__init__.py create mode 100644 metadata-ingestion/src/datahub/ingestion/source/neo4j/neo4j_source.py create mode 100644 metadata-ingestion/tests/unit/test_neo4j_source.py diff --git a/datahub-web-react/src/app/ingest/source/builder/constants.ts b/datahub-web-react/src/app/ingest/source/builder/constants.ts index f892f0ed525d25..58525b3e88f975 100644 --- a/datahub-web-react/src/app/ingest/source/builder/constants.ts +++ 
b/datahub-web-react/src/app/ingest/source/builder/constants.ts @@ -38,6 +38,7 @@ import sigmaLogo from '../../../../images/sigmalogo.png'; import sacLogo from '../../../../images/saclogo.svg'; import cassandraLogo from '../../../../images/cassandralogo.png'; import datahubLogo from '../../../../images/datahublogo.png'; +import neo4j from '../../../../images/neo4j.png'; export const ATHENA = 'athena'; export const ATHENA_URN = `urn:li:dataPlatform:${ATHENA}`; @@ -137,6 +138,8 @@ export const DATAHUB_GC = 'datahub-gc'; export const DATAHUB_LINEAGE_FILE = 'datahub-lineage-file'; export const DATAHUB_BUSINESS_GLOSSARY = 'datahub-business-glossary'; export const DATAHUB_URN = `urn:li:dataPlatform:${DATAHUB}`; +export const NEO4J = 'neo4j'; +export const NEO4J_URN = `urn:li:dataPlatform:${NEO4J}`; export const PLATFORM_URN_TO_LOGO = { [ATHENA_URN]: athenaLogo, @@ -180,6 +183,7 @@ export const PLATFORM_URN_TO_LOGO = { [SAC_URN]: sacLogo, [CASSANDRA_URN]: cassandraLogo, [DATAHUB_URN]: datahubLogo, + [NEO4J_URN]: neo4j, }; export const SOURCE_TO_PLATFORM_URN = { diff --git a/datahub-web-react/src/app/ingest/source/builder/sources.json b/datahub-web-react/src/app/ingest/source/builder/sources.json index 44b8a37f14655d..776b6703895c35 100644 --- a/datahub-web-react/src/app/ingest/source/builder/sources.json +++ b/datahub-web-react/src/app/ingest/source/builder/sources.json @@ -325,5 +325,13 @@ "description": "Ingest databases and tables from any Iceberg catalog implementation", "docsUrl": "https://datahubproject.io/docs/generated/ingestion/sources/iceberg", "recipe": "source:\n type: \"iceberg\"\n config:\n env: dev\n # each thread will open internet connections to fetch manifest files independently, \n # this value needs to be adjusted with ulimit\n processing_threads: 1 \n # a single catalog definition with a form of a dictionary\n catalog: \n demo: # name of the catalog\n type: \"rest\" # other types are available\n uri: \"uri\"\n s3.access-key-id: \"access-key\"\n 
s3.secret-access-key: \"secret-access-key\"\n s3.region: \"aws-region\"\n profiling:\n enabled: false\n" + }, + { + "urn": "urn:li:dataPlatform:neo4j", + "name": "neo4j", + "displayName": "Neo4j", + "description": "Import Nodes and Relationships from Neo4j.", + "docsUrl": "https://datahubproject.io/docs/generated/ingestion/sources/neo4j/", + "recipe": "source:\n type: 'neo4j'\n config:\n uri: 'neo4j+ssc://host:7687'\n username: 'neo4j'\n password: 'password'\n env: 'PROD'\n\nsink:\n type: \"datahub-rest\"\n config:\n server: 'http://localhost:8080'" } ] diff --git a/datahub-web-react/src/images/neo4j.png b/datahub-web-react/src/images/neo4j.png new file mode 100644 index 0000000000000000000000000000000000000000..b03b2a4532b3ba0329fdaccbb60a3a57b111ed4f GIT binary patch literal 12968 zcmW+-1y~#177XqV1&TYxrMP>s;!xb7xNC5Cid%7aEACzhUfiWXad&(Bzx@(6Azyay z-h0lRnK_$C<*zbm$RCkGAP}1D7fDrM{{%eM5TSv;eA%%Izz)hqRYn|Yd6dZ?cmr=D zFCz);f$dSE1NaXzt(Am?vX!YF2;})QJJmy7eepy1RF{W${sfyVfp@JcJmw)uk(LU9 z1w6qRXaIEyg^~eNMY8Bq5G))j0h*MF6fO#s3KUp0C;&TH#kR~$dO^;|%jng&L}%*K}mZl5v(uOi3=kBbcqu89t8Ju9{RFa?+@p~MO&mS zbUUehuD!htBoFN5tjY%+=tQ2B(*L*du;$Q;P+{XE=)?JoHpkA&tdvwUcS}<kpE#+%N z_OK)T`LdH536{{kq=b);k5&&IBB?jMc`HKl7S`>V8~M<>36b#FrDh8)hi-4RuezCY zrO3@m=`(ZsH|$Tk<4^ps=tThMg3arLu7r!llF7lssxoI!n#H(Pa{W(l!{2&G1ob)T z!+{R9+$)`EEEkk2jC#P!8iFz|rKkydlzZOh^L{u6LSO-xxSg}pnFz|$z1p_PB~^6R z1K%VUZ}KGeRU`iWU@na)4D=6J^*oSzAni2Fa4+-{2)qTm7Knie$|pyzf@b;wI}0NF z8*poc@*5%4h-V3bK@6=7AvO>0Cx~S0b1R4?_;?GR0Q$agG%jAi+Pl4DkZWaOk zN!0oWw;7_gDC&2@zvz!*eag7F2sGk?Ul6Oqo5e$wk<^jq#bc*ira%3IcE$S`awcv% z{rQIRhE&=JTOm}#h-3-ZqIa^6l^D4;1ZP{plldpYc@Siq|AAl)-7uhbK;nY_1vN(k zBrf?qg5M;D4O5F66e*33DcpedNp$Q7e-YfFxOal?Z+ed4P6=b+-#*73)n`}-O?$?p%fVR1uZZ+<1z+J>XE-mRKakAAcM$tk4wlW>?eFDiYKsl z%yAKF1KL}Hjx)TZEcjYsHvQQ{>w}6r`a>Z#H zUQBVM(GcbkXK|Em>}{xRnuITNP$S;yE=I71}a_3YPh-OaB39FwLDLvHAVmlK%lLTeDMY}cpzPxoo>-XtXNEyeA zhxbcJB^Lym{4j~z5gU?^;*?XT0n?p8r1YStdJe_MP^7s7YcXWTphH09F*q(9S5%oX=Y4Nn^if 
zpW-^_c%Sd@4Ki0TcLF1MMUEsY%JZjn)_wl<_Z#W2PE)D+tCkh}dAr-0mElZ>brT`G zLUSBTH9OA!JPv3=sTjuQ@4EP_ip`~Vo6Fj(NGD&HUQHZqb4@nQM4Hq+K3Q75&N?BW zA(Ti|5B%eJYG^eNPmSfT5a1v=CMFZG;;+yC_>(sKHajUho=?Ps-DSJuS6io3lgs*+ z_Xe8NwJW3Ju}7K{hD)uh|KI%Wgnu(9?8jDnZIjoX_$!b_oA||umM9)b#|(I3WY{uT zDV}wdXRu}VzCJgQY_k?Zc6WcddHinFcIUit6$irHBB|rqGS0I%ve>#8OV8`gtH`74 z+wVIL220RW)Ke%@)Jo`!M@yWCABG|gv=7h>nD!G5Y)ZCDB}>LgXG+^gbdnYm48X0@JLQoI~vwI3`xdlDEN(E$gQVTY8mJkw=%fRXM7v|W^ppg>#8TIr?!S*0cwon z!#^i6BoTo)o1a<=nF`d)FaIG%_=;r7(MOR_ZOkx3;pDL6nrN!3J&U@SqC4uFaFU{z z^g>HRzV~HJ+*;Dw<|7L;%PFo>SX%$Ftd^o-ws5*}dO|LHrd3)St4r!R#Nl^PgQkTW za#=xcxp01tW0oTmSleCWZuRHI&!oy?t!vHuYW8o9E)#!s;+qU>C8|xUj}2h_O@ zrP&m2F3rygfeuIe)3&y2!UIbdY6F7vxYKTb_N$%KoV=Vj`R=qDy}MSQ`o*^UHN0@f z(%Kb$A-7nk>fQF++dJXP?q;$ozwgH_XAWd_g#S8?!G$)Chg-{8aXhSR&K+)#&G)Cr z5|D8C@PV-Nn8BEs4dcy>_AKxFp{^6~^*xV>s0h6${cYVz!c}2A*{#1VNfW6M=RWts zJ@4PkSEq%C#@q(KC$@z9ke4D`Rl5eKF_WX4Un4iMrHnaW1tFf?B1!L!H&gRdw>kdz z(SKNd8xtCFSRYtF_%VBzUtIlWglMh11iHiF#t6gYrpU!y1cyygpZ75A1`1JbG zzn8t&A19rcBA{5Ip!7i9hg?6?uS__DK=h=tl49zfE2q9*KDa~8@8xGXquq_L*r}N0 z4fOPqEU=W%lo0_}(e%y<6C)a41wmSHDd8wl5yE7iT})bVF#?WKtn_1)-w`oUqp6u> z1FERasFKVq+AID1e=5vmoE=gp4Q*LYhGXOwsp-2_9O;&R&2~BZY#b99F6fGbZ5o3q zPG}kv8I;GIN3V{FNC7Jk9dILaeL{iJCc*WsQ~?ta)A_`9cry4?IpMPT{DQJ!E|d_z zQk#v@=D#nXyci_(L7aHk`)NfP>#8)p7AAv^+uikW?ItuyC`71X(R>S$T$$21&+ALV z4U0c$gW#>R{Hz3ebC9y00lV?#ktyA5$TYbac`pGBIY>03Kv`3d`=};RVAFGU6(Wg< z1qvDFKBmBEVq&K@3hC|S;U;@#USDHLW-s|2B~GYOCZ0jb2r{xKyW0&kReC$8D;32~T zfgZl!Qvig%X3lAt01>w8t{RNa3EB_r2)CEUmNH38!NIQykS#!`DojPBh~Z$9mHER8 zWg0~8iJCzA1oL}TbO*V%os|P*q^zRtwyp|S0CHQ=fmPL0DOYBQEC2>%O`$5$x&I4E z^b50%=Xxt$2%9wGUM5cJNM;ITedmzssnzr`98ZiC~4&{1qK&0pWZ7j z%~QMEv4QXHd}pVi)*krw=yr^;Rw0W!R&}iJP|lVyIV$K!Vh-0;{o+`SRwnc-eQ^v2 z2m~F1DTaAcyhz3Wi|^tMWu3oy;CA8Tw!wkH9fEV{T&9-|&52VLOu&Ri7N_M`6B9~M zM8R+3!M35>fTQQeDd7(5c?OnUGzOVLGNCN?`xLa&NsEAGSXv}RsIgv_V0}`b5$3Ek z5pVf4^wz&`59t|U%^M~-<>uo{-qZbvHhU*dshCZ`g|D``lv2xjes$9zy{x7mv~^W{#iM%nN{1OwK=(sR0u!ulWmSVvX$ZOcf^;C 
zGVgp-hk0Mq7!|p;I(&8ZiHXnt*2Bsi#vLpB^u1B0Ni153)kQq(luhPOR5t}pjE?eoUHx_Fc}>m9$w^95)6>&a zQK5h|kRxtvY>0`8ovyaTM@1b0vHJe@3cf$Hj@R-Y0R6SK6ml$eM;Euin1U#XAOD!5Rs zd*9OYs+el7g1c@pe^|GQ?dZ|a2`y@Ge|859*J0D|=HJZK)zy%D+OLLy`_o*ohs(s9;aiL}gVM6fP>d3h-lM~npv zWm)k?9YtCESy3rv9ckl4M zJ({;$`L)f)!4^0lUQZzrMl9(0gD*r+f?{tlXWrWPv}sXXTs+fyEew^=d8-$yWtlU_ z=C2ndmAU=fNO;JP^X})kp7b9-wkjOu_BY%wdtobn3 z0A!!jadiuSppuZG=}4RgE*u@oyJ4dRTI-K9w4e^}2p!zB&iDUEi~V{#MUkE&=&X4~ zR?66_&SN+FYXna^caw9Q*R@5}a&LnT5c|mWJwVVxCMRp?isnvyFZvLylRx8teV=Yd z$H#4{cjj%UGPzLnDo?PmumBfEFS1|j4b^d^$-e%ZiK!`CT@4onO3>EcUNKiY(Kx^6 z+jcnFN2M&CwwJ$2N}{_#>=vk?&d$zhY#mEkx+3d8=kvjM+n)Eg=Pey`d|2Q>ep{iB$X?H@ie#X1VhdXp+%y(_AevNe zk`5;cGs_AJ3uR&m*uPKbOxk=Noq^Ts3r6sN+{4eBDDjh0{Ab|x zayf#4_7N8cr|>w+6dk9QTDIrq3PKex7*tqTt@zq0pT+{@smF%b0dvdRY>mDED=RCg ztfFFUYD!67US3X)A3%w(U(tC_@~fIP@b#I?%gZr@Zl-xQeFpRp1k0D7FGt7@rgCd_ z^KroX9qt;9mfTr{rWzU=ZEbCqQ#nFD4;R}5;V395`msa}X*oHu)Kt}$9j*&Q$579%d{Xi@V zT@Ku%>hLQbkx&p=KOG!!Tx4SxNVEFUeueFj0xF@Hj5pim&s$Kz+g+V5~ zcId{HkLi0kgg+mosEgN-mX)=dFM+UY58eB4x$AMZ=JopIW_nj+(A91{5cc-^yg0gn zqa@gOl#gZ|%GjW@L3o22n06)B%m{(FB_ZN<%&W}r3sa78Hto!0)C;LemVc|$ZLtRc`rw?B z=xfYnc;`1DGR(}1#3l;pY3b-VQ?xbHx}MKErqa94);rFZ>I=nAmTj9oHhp=>$$LHV zPL~^H(u8)~hVyf{?J`+bm-GPM{9k09?ni4Shq4 z%_58G9j#)fy1M#Yp3^RTB@W%!Jm2y8>D#k)A{d7ws;zm3=PZYAtIkOPmPS60REQw0 z)oXn>U(EIt!vyegh@erY`j$P_#ofuZ#;_-@qw!3KGK89jrhX;W^QWhj%FfOX>OOcZ z`P16vAeNe?e}W^IriKOpT$6c`wLNc{^BX>M$B%cb4hV}+AZbMIWT6+bWgv~u{*=ax z;Q%ou{O2ULreY{^qs1Y&@(N{7~O91n)W$Hx_7WM{LlEOWfV*a0?i#AkWb zy$#A1^qK|0Fa_WAa%ZTjxf0L+?d|y-Er8dk7laD!GC*CL^ENv>+wJ>oP1J7~j!L-m z>(TKJU>U}}26}ow=dGpmi^Wd(kfKrWS$^5AYT8`)Y5te28XD$;ta|39$@_bI`7oRF zdI~v$8Ew4SFN9lhB+myT??@~!W(f%i=H}*Q-?VK))KLtP$7`D-MKEnvFr31h)+JmJLk&uzwJugjmmn8__Zx;=Tsw#nc zf9+M}QlL`rTHNV&y3$lvH!W-KQ4z8}=Q#4%%l5exeXF*%^9q^sv+~?BEV`kE1?}*f zwQOP$r{&~@aew{fCb;T|66} zg|bV#h*^SO4UBgHotQ+$8fMh)_Tg*gCWT$I0=0(r=l&sz^sCAbG)-`xp& zoDomfga4GIF5GW`YZ;+>>{hv10x#a(y`NpulUU+(e0mYZVRV~m_Asm?LcbjX4oGdqb;m?JT76WHHFE7pM&myb} 
ztf{mpE*Yl`Gj-h?OR1<}&>y`_>v@4A#-AkOgnh8LwPljNv_Z)$i5`+vNjB%Bp@IAC zVSTYk$Y}uz7&XJrjF@%`p673h`2Iy!cH0wMYR0;$q@*Nhcqn>f=aX6q+elNS^iCKU zfS&tJf05R2@1vumc^>bGUd^kgz(O-IFw7db4j)Fu#ck9X!_-VIz_=Ebpi=fe{+^uN zFWgnn^Els7oW+zI1n6^UXei!?4;8EZWB{~y6<~{D0#ueoza2L7lcZ<$xB8t4o=sbE zijdeHJE{tggm4F8pTQcvww*yz4l+EvJ%-*o_Cf^EKp2{fTj2sx7Z7Ee-GTSHUa>Akw0GrcnN*J~# z9tp{MzC`%RiEVY=@9}YUgei7l6adXR$@bWo(Wm=;vC?D%6w5&vQysj8KBTkG)v{$6 zC?ilvgy3@`#q&4mdvqFPKEIT5*)N!|cA>w6hfEU8Ge!gfc@YqehLEZ5cyy-&F%T1j z+KE&hTmGXl-tKW;qftmCp+|Rle}4{K2FqHleut*MK9E8aN>ucVe`aStFnu%1YoMd1 zX43gphF5X&liE!LN;F>q8&1X50vCq}b7J1UG#@tL=*Zf!o&{BWgGlao9rs34n69>_ z%tlH|N}x^aA9w>|d%=8>=w01=bY66$4)@c{U?^K5CMMJyi;9MZS(AECFRy&fuw6B6 z?Z>TPjMTI=8&lGHDt2~uGM@`5t>rAoNNx&QWdVm`YUEUgk3ob0l$1@$^rq$&gAUvc-oC`Ez6AEKeEavHq*9i*lvSzGhEJ-$YN z^ZbPP8IzH#NsqUB)d0TXmhyG@o2RFzvvWgyyi#>kbbXmRs0m;D09CsO6TYKJYw+amd^Qgt6(*~PNZ=Oi)^RPZOZ;q$> z{Ric-&wVi9$R$atlsDL~xm~>Hc&Do%S9h5HbP@jooExAtxmj72T1EdhGuva%d zood+AI~17908+BS#Q}Hj>@i5oDemxa{+A<8}0feXZi28DTYf z7C7*F8CS9aAkX*mbtV)Q@K}eGLX3@#g}`^Pu~cf9Aq@>2F^skhV0`XCug>0~?!Xp> zAES^oRxK?p8&fp;Vi*MUpCbOkJVT-|RB@g>v4omwuHK7i*sy_1=52XJ#FBljq+ ziU%zCW_x(^7slzC2DnnPiAM^avuhz2{a}Um?hV%NhoI6F8+&NHtse;LFl?D$c&5Jt zitc4y+Io0Afv>HzleloBNrk_F9{uCz2(U)AW@Y8d1Jdg07uC0dzO_$HNG-H2VytY# zu#^!6pt#xx!kAR*$P}8rO}bp^TWS2khiQw(?Rg^7_05M1sC&fKp$Ey*grWLpfD8O5 z1J)2eBAGkVH^IB0Hjx-SG_rudzia?3*4moEqpv;Iv*bzjzf1z^2gK8i!Y$&)eu zT0Ld$ynx;~v%{_e)K$;h({X#L5-45Ji(?;dC!Y(*pqXJ2j$t$HuV21=>+$pb@+ATs zvt9mQ{m#=q`nDZ~#|Vg;-ku?Hh%;b@B+|&Qk%oWdc`j;w>$oEg^!8McGR)mAKWIJ? 
z7JNQ4=I*fNUjLF1%;*3>fXn_kkd|prLt|rOMMXtGajaiqUtpGqDgf|x;wPWWGyI?xs~12FWo2idQVTp{4h;<%zfn0xxJ#)(LqlU@V=q)` z@8a?WpZEgOFN?<^hto1{!a}WQm3*U)B5CvnU|$8_7th=*8Xxn+Nq2VCRFdoJ>K+~* z0A9SJQFR*#gOF(!=Zf`m?*G1eIC|Q$+3k9$x@NFu1@E!c5H(hIs5Gz@UaWGxUBhSg zu%`2VmMKq+5sGykV%#V(EbPrdmBnt+w3^NGu@JLi`4q4N=dA&@u9+2UD4ieM9Y3J% z(F=GvNKI3^yvW%C9yL`}W);TT$suT}Xi8OkUaH`mK`f3dJcn^zQQW!HtF zFr}cDP3RrQ_Z#Gng_LUfoc9ZsFGL}8wMvXc8J48eYAx9>Rze z0C$dBum}%o2Gnh%#YF$B{;HgY#(dZP#xuZqvnDKbo|BH+Ps-mzG9yRLbbwL}M40Q| zXLJbg@p;jNnp`-bM!|QsYlOehtdx?_XkY}(7!MA5hkt_LhSTX0qr_!~?jB9%{lgzU zm!lzNKD+%`DR^z2;w)WXD=XmS;D-Mqqf|Fr=uZ;8ZaN6wza3bmIg`kXW5NZhx}V9t ziFll3-HMRK=yg7w72Bk}oYRSr2vEr;(j&|5PU@avxk%nI>$iWVq!br0`<7Q%=LndC z0R0-E!rGMRZHh#Lkf$dmCCS8oOi4`*^E9b|VFawzJ@8qZB_QRajs*T~O(|u_!7kN-Fw;(CwVspcE6R*md$J9{{OE$x(5* zw|d21=Uw+<=p@!Ht6AKfo)EQI#c*D6B$j)lp9kJ+|ZtEDXWj$*3#%7{6O~e>0V)ds)GIjpYalGc@0g2na7{dHrj9jRGrfK zrWe;{;`L|HE)x}V1-3ng&;QTxw2fpC)w&SN^gFTv!B2>x8Jw1`*C{fKra ze$Qy<7-gq9F9EumnsN}W zo}VPBcia|t#PXT4TruNN%-}EwC|vrRUK-G|@qgG3r_1pS^5>BCfHIZH;@)0mI+?IDMMBdu{UcJW` zHm^cVqmNow+riAWTokamg@idt;-5@oK;`x0oS!L>LVm2v(v`lx>I+Z|ZWo)xsGJ2l zGkpU6P)O`r!(MONorz_Idweu8KSO#5lk;dQ$4wh6l1=h^*hbLGXq|5W$W;%(1OPu9 z1p{M5feY}u0ZEg_YS`_0Nm-KQ?k%ObGzgf*ZyVrq<8kuVGp@USlX}2o+X0LrH(F!_ zkcdbRsgSqh9epu?1Awsw7+yWVcQt%}Dr-fU;Mgow^$kL`*MsziO*KA5?*TOB70`OB zYHHV4R~OvztCEO)wG02=G)UaTSR8K;9`d$l*O$|Et|ems;06Q)ifytmS|r_Uq=E6P zSD4@*HzNw?sD{%=sh6YW4E;=oHP+&~ z+rSwi5rkoz0E~TLiOL2r7fTG9_J3q$0Y?22Aoo5e3I%igZY!Xl5_x~AtnIe$#PvGI z9id~rJOJ7P!w;%v+ILr1Iz4``(X8ENjI7PJ%N&kdqFEDRt)OkgSUSJ&*3>IU8W+o=A22s z#b82ufa*tF+?(9@7di@xZNPx`pm}-tcWe;7zuHx!Ji>3T4VamC!?8qSG-Z0#j*~eB z^neOq1GwK@xmrA_P}ITiu!fNo%=KnW-Kc^p0F;*-%wN>HYD(fED0(6Kmb`~x8ugCTTHW*tE8F3~N|Z8z%wsZ&Vmb@PJ2P*f zm~9i)sf2lXJr`^AfqEcm*4n@rjG~*x7#X+1_U1Meh4=on*y9K~#vzM7q4)swl~~wVcQt3Whrx zCH*dGOf|}zkJ!S&hh(Uw+IlAD=y3A0Y)w^TDiJjxqM$ZjD=FOp&BqPi!D$c%R1(b# z#0pTVE-cu;cK&<#*u)Mqc{W8J4~S&_Vpaip9+T;vJ+u=oAhE8uPdu;c@@$vQmF-vAcR$-%+&AzG|C2yjr>y^pJ@Wa54TKCQvZ z*2y}q$LW!X=Bkw+SD%3y^)SPP-RboXgA0goK3JfLZYP 
z2tXd-GK63PAUHiejZVVv3YcvS49FtM;UOz1audVBk!2^Ptudd9DJUoae=eKfjiLBv zwpczRBLg@B&;exz`iT`4*sv7jIaGTAy@qB-%QCgBH(w_b794Lv4%c%E+Ci-zjj+YC^}r`+*<@vHGh7u# zL@db}`^MnntFAQsySS(=C)l&w9}cYWv=QDqM0wm5t$k2U2fzQnJv?}tT0D5+Cd~a( zRfCYWz)jj%mFocjAED&0;QjM+44u|W5@Jx@Zg0KPJt<#G zhMJB!+6DLh>59{fq5ZS`xxEU>suwsTK+L@9R>5MjROF@oa#bSLrr0PImZ8ZC2k6U1 z&ui)#P#Cs=D7MVuRg7W+tLW#Ox9akm{QZ6I)+N~A4XAj$U-Bn+b%N0&e2Bf8j=teG zwQ1Rrk)wtb#97= zYoW3B!xY$UAZMUl6r_NS2(6sycQ&g9_G5>82n~k|eSGE!=)a0u>&Gr@fDT?-5{{3F zCt9erG<#-~#jLQsllKBRK%+YupWDk?F^7B#eU$zxYHO5?cj3yA51$e~cGOo`wUZ@$M52XaKHadqq~_HV&_8Ktas^{lQ{)Kk{#Iyl(PniAgYK<_$N_ruh5|%%_5pRn>dX7f zyZiDsw`gSXGUT(Cl<7L9FHRVP6EkA<1pEoyhf(A30aIg;BDbnkolJr*@YyF)966VgbRDh@glYvgi z@wYb+rv2UuWVACF>4~BCN{3XtnmslSrKaySMSvrjps6Wxbus+|B%-YngH!0;dzw#6 z(Rr4hp8f!|YO%!uuNP?HhV!7go?9A!#x;0xgU%0`oQYy!L=-0(#_rrYIOaU9f%2K~ zVmr;>r&S+JN=jnCCdBp*EsrK%!w|d>1)A(m*=6qxY{LqP1pkg|9pqEcYMU`Oy%?gE)gb_=ET7Xy+ z8v+YLWE&>^$oUYDta1M%{Ffy0%t} + +Neo4j metadata will be ingested into DataHub using +`CALL apoc.meta.schema() YIELD value UNWIND keys(value) AS key RETURN key, value[key] AS value;` +The data that is returned will be parsed +and will be displayed as Nodes and Relationships in DataHub. Each object will be tagged with describing what kind of DataHub +object it is. The defaults are 'Node' and 'Relationship'. These tag values can be overwritten in the recipe. 
+ + + +## Metadata Ingestion Quickstart + +### Prerequisites + +In order to ingest metadata from Neo4j, you will need: + +* Neo4j instance with APOC installed + diff --git a/metadata-ingestion/docs/sources/neo4j/neo4j_recipe.yml b/metadata-ingestion/docs/sources/neo4j/neo4j_recipe.yml new file mode 100644 index 00000000000000..463d65e7ba323b --- /dev/null +++ b/metadata-ingestion/docs/sources/neo4j/neo4j_recipe.yml @@ -0,0 +1,12 @@ +source: + type: 'neo4j' + config: + uri: 'neo4j+ssc://host:7687' + username: 'neo4j' + password: 'password' + env: 'PROD' + +sink: + type: "datahub-rest" + config: + server: 'http://localhost:8080' \ No newline at end of file diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py index d7e056b31370df..c6d55fb5bcc56e 100644 --- a/metadata-ingestion/setup.py +++ b/metadata-ingestion/setup.py @@ -525,6 +525,7 @@ "qlik-sense": sqlglot_lib | {"requests", "websocket-client"}, "sigma": sqlglot_lib | {"requests"}, "sac": sac, + "neo4j": {"pandas", "neo4j"}, } # This is mainly used to exclude plugins from the Docker image. 
@@ -673,6 +674,7 @@ "sigma", "sac", "cassandra", + "neo4j", ] if plugin for dependency in plugins[plugin] @@ -792,6 +794,7 @@ "sigma = datahub.ingestion.source.sigma.sigma:SigmaSource", "sac = datahub.ingestion.source.sac.sac:SACSource", "cassandra = datahub.ingestion.source.cassandra.cassandra:CassandraSource", + "neo4j = datahub.ingestion.source.neo4j.neo4j_source:Neo4jSource", ], "datahub.ingestion.transformer.plugins": [ "pattern_cleanup_ownership = datahub.ingestion.transformer.pattern_cleanup_ownership:PatternCleanUpOwnership", diff --git a/metadata-ingestion/src/datahub/ingestion/source/common/subtypes.py b/metadata-ingestion/src/datahub/ingestion/source/common/subtypes.py index 9fbb15500a863c..a5eecf198a9b49 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/common/subtypes.py +++ b/metadata-ingestion/src/datahub/ingestion/source/common/subtypes.py @@ -22,6 +22,8 @@ class DatasetSubTypes(StrEnum): SAC_MODEL = "Model" SAC_IMPORT_DATA_MODEL = "Import Data Model" SAC_LIVE_DATA_MODEL = "Live Data Model" + NEO4J_NODE = "Neo4j Node" + NEO4J_RELATIONSHIP = "Neo4j Relationship" # TODO: Create separate entity... 
"""DataHub ingestion source for Neo4j.

Retrieves node and relationship metadata via ``apoc.meta.schema()`` and emits
DataHub datasets (schema, subtype, and properties aspects) for each Neo4j
object.
"""
import logging
import time
from dataclasses import dataclass
from typing import Any, Dict, Iterable, List, Optional, Type, Union

import pandas as pd
from neo4j import GraphDatabase
from pydantic.fields import Field

from datahub.configuration.source_common import EnvConfigMixin
from datahub.emitter.mce_builder import make_data_platform_urn, make_dataset_urn
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.decorators import (
    SupportStatus,
    config_class,
    platform_name,
    support_status,
)
from datahub.ingestion.api.source import Source, SourceReport
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.common.subtypes import DatasetSubTypes
from datahub.metadata.com.linkedin.pegasus2avro.schema import SchemaFieldDataType
from datahub.metadata.schema_classes import (
    AuditStampClass,
    BooleanTypeClass,
    DatasetPropertiesClass,
    DateTypeClass,
    NullTypeClass,
    NumberTypeClass,
    OtherSchemaClass,
    SchemaFieldClass,
    SchemaMetadataClass,
    StringTypeClass,
    SubTypesClass,
    UnionTypeClass,
)

# NOTE(review): do not call logging.basicConfig() here — configuring the root
# logger at import time is the application's job, not a library module's.
log = logging.getLogger(__name__)

# Maps the lower-cased type names reported by apoc.meta.schema() to DataHub
# schema field type classes. Unknown types fall back to NullTypeClass.
_type_mapping: Dict[str, Type] = {
    "list": UnionTypeClass,
    "boolean": BooleanTypeClass,
    "integer": NumberTypeClass,
    "local_date_time": DateTypeClass,
    "float": NumberTypeClass,
    "string": StringTypeClass,
    "date": DateTypeClass,
    "node": StringTypeClass,
    "relationship": StringTypeClass,
}


class Neo4jConfig(EnvConfigMixin):
    username: str = Field(description="Neo4j Username")
    password: str = Field(description="Neo4j Password")
    uri: str = Field(description="The URI for the Neo4j server")
    env: str = Field(description="Neo4j env")


@dataclass
class Neo4jSourceReport(SourceReport):
    # Counters for objects successfully converted vs. failed during conversion.
    obj_failures: int = 0
    obj_created: int = 0


@platform_name("Neo4j", id="neo4j")
@config_class(Neo4jConfig)
@support_status(SupportStatus.CERTIFIED)
class Neo4jSource(Source):
    """Ingestion source that maps Neo4j nodes and relationships to datasets."""

    NODE = "node"
    RELATIONSHIP = "relationship"
    PLATFORM = "neo4j"

    def __init__(self, ctx: PipelineContext, config: Neo4jConfig):
        self.ctx = ctx
        self.config = config
        self.report = Neo4jSourceReport()

    @classmethod
    def create(cls, config_dict: dict, ctx: PipelineContext) -> "Neo4jSource":
        config = Neo4jConfig.parse_obj(config_dict)
        return cls(ctx, config)

    def get_field_type(self, attribute_type: Union[type, str]) -> SchemaFieldDataType:
        """Translate an apoc type name into a DataHub field type wrapper."""
        type_class: type = _type_mapping.get(attribute_type, NullTypeClass)
        return SchemaFieldDataType(type=type_class())

    def get_schema_field_class(
        self, col_name: str, col_type: str, **kwargs: Any
    ) -> SchemaFieldClass:
        """Build a SchemaFieldClass for one property/relationship column.

        A relationship listed inside a node's schema is surfaced as a
        node-typed field (the relationship itself gets its own dataset).
        """
        if kwargs["obj_type"] == self.NODE and col_type == self.RELATIONSHIP:
            col_type = self.NODE
        return SchemaFieldClass(
            fieldPath=col_name,
            type=self.get_field_type(col_type),
            nativeDataType=col_type,
            description=col_type.upper()
            if col_type in (self.NODE, self.RELATIONSHIP)
            else col_type,
            lastModified=AuditStampClass(
                time=round(time.time() * 1000), actor="urn:li:corpuser:ingestion"
            ),
        )

    def add_properties(
        self,
        dataset: str,
        description: Optional[str] = None,
        custom_properties: Optional[Dict[str, str]] = None,
    ) -> MetadataChangeProposalWrapper:
        """Wrap a DatasetProperties aspect (description + custom props) in an MCP."""
        dataset_properties = DatasetPropertiesClass(
            description=description,
            customProperties=custom_properties,
        )
        return MetadataChangeProposalWrapper(
            entityUrn=make_dataset_urn(
                platform=self.PLATFORM, name=dataset, env=self.config.env
            ),
            aspect=dataset_properties,
        )

    def generate_neo4j_object(
        self, dataset: str, columns: list, obj_type: Optional[str] = None
    ) -> MetadataChangeProposalWrapper:
        """Build the SchemaMetadata MCP for one Neo4j object.

        Raises:
            Exception: re-raises any conversion failure after recording it in
                the report (the original code returned an unbound local
                ``mcp`` on failure, masking the real error with an
                ``UnboundLocalError``).
        """
        try:
            fields = [
                self.get_schema_field_class(key, value.lower(), obj_type=obj_type)
                for d in columns
                for key, value in d.items()
            ]
            mcp = MetadataChangeProposalWrapper(
                entityUrn=make_dataset_urn(
                    platform=self.PLATFORM, name=dataset, env=self.config.env
                ),
                aspect=SchemaMetadataClass(
                    schemaName=dataset,
                    platform=make_data_platform_urn(self.PLATFORM),
                    version=0,
                    hash="",
                    platformSchema=OtherSchemaClass(rawSchema=""),
                    lastModified=AuditStampClass(
                        time=round(time.time() * 1000),
                        actor="urn:li:corpuser:ingestion",
                    ),
                    fields=fields,
                ),
            )
            self.report.obj_created += 1
            return mcp
        except Exception as e:
            log.error(e)
            self.report.obj_failures += 1
            raise

    def get_neo4j_metadata(self, query: str) -> pd.DataFrame:
        """Fetch and flatten Neo4j metadata into a single DataFrame.

        The APOC query returns one row per Neo4j object with two columns,
        ``key`` (object name) and ``value`` (its metadata). Relationship
        metadata largely lives on the owning node, so nodes and relationships
        are parsed into separate dataframes and a shared subset of columns is
        concatenated for downstream work-unit generation.

        Returns an empty DataFrame on failure (the failure is recorded on the
        report). See metadata-ingestion/docs/sources/neo4j/neo4j.md for
        example metadata.
        """
        driver = GraphDatabase.driver(
            self.config.uri, auth=(self.config.username, self.config.password)
        )
        df = pd.DataFrame()
        try:
            log.info(f"{query}")
            with driver.session() as session:
                data = list(session.run(query))

            node_df = self.process_nodes(data)
            rel_df = self.process_relationships(data, node_df)

            union_cols = ["key", "obj_type", "property_data_types", "description"]
            df = pd.concat([node_df[union_cols], rel_df[union_cols]])
        except Exception as e:
            self.report.failure(
                message="Failed to get neo4j metadata",
                exc=e,
            )
        finally:
            # Close the driver on success AND failure (the original leaked the
            # connection on the exception path).
            log.info("Closing Neo4j driver")
            driver.close()
        return df

    def process_nodes(self, data: list) -> pd.DataFrame:
        """Build the node dataframe, deriving helper columns from ``value``."""
        nodes = [record for record in data if record["value"]["type"] == self.NODE]
        node_df = pd.DataFrame(
            nodes,
            columns=["key", "value"],
        )
        node_df["obj_type"] = node_df["value"].apply(
            lambda record: self.get_obj_type(record)
        )
        node_df["relationships"] = node_df["value"].apply(
            lambda record: self.get_relationships(record)
        )
        node_df["properties"] = node_df["value"].apply(
            lambda record: self.get_properties(record)
        )
        node_df["property_data_types"] = node_df["properties"].apply(
            lambda record: self.get_property_data_types(record)
        )
        node_df["description"] = node_df.apply(
            lambda record: self.get_node_description(record, node_df), axis=1
        )
        return node_df

    def process_relationships(self, data: list, node_df: pd.DataFrame) -> pd.DataFrame:
        """Build the relationship dataframe; descriptions need ``node_df``."""
        rels = [
            record for record in data if record["value"]["type"] == self.RELATIONSHIP
        ]
        rel_df = pd.DataFrame(rels, columns=["key", "value"])
        rel_df["obj_type"] = rel_df["value"].apply(
            lambda record: self.get_obj_type(record)
        )
        rel_df["properties"] = rel_df["value"].apply(
            lambda record: self.get_properties(record)
        )
        rel_df["property_data_types"] = rel_df["properties"].apply(
            lambda record: self.get_property_data_types(record)
        )
        rel_df["description"] = rel_df.apply(
            lambda record: self.get_rel_descriptions(record, node_df), axis=1
        )
        return rel_df

    def get_obj_type(self, record: dict) -> str:
        """Return the apoc object type ("node" or "relationship")."""
        return record["type"]

    def get_rel_descriptions(self, record: dict, df: pd.DataFrame) -> str:
        """Describe a relationship as ``(src)-[rel]->(dst)`` lines using node metadata."""
        descriptions = []
        for _, row in df.iterrows():
            # Guard: get_relationships() returns None when a node has none.
            relationships = row.get("relationships") or {}
            for relationship, props in relationships.items():
                if record["key"] == relationship:
                    if props["direction"] == "in":
                        for prop in props["labels"]:
                            descriptions.append(
                                f"({row['key']})-[{record['key']}]->({prop})"
                            )
        return "\n".join(descriptions)

    def get_node_description(self, record: dict, df: pd.DataFrame) -> str:
        """Describe a node's in/out relationships as arrow-notation lines."""
        descriptions = []
        for _, row in df.iterrows():
            if record["key"] == row["key"]:
                # Guard: the relationships column may be None for this node.
                for relationship, props in (row["relationships"] or {}).items():
                    direction = props["direction"]
                    for node in set(props["labels"]):
                        if direction == "in":
                            descriptions.append(
                                f"({row['key']})<-[{relationship}]-({node})"
                            )
                        elif direction == "out":
                            descriptions.append(
                                f"({row['key']})-[{relationship}]->({node})"
                            )

        return "\n".join(descriptions)

    def get_property_data_types(self, record: dict) -> List[dict]:
        """Reduce the apoc properties map to ``[{name: type_name}, ...]``."""
        return [{k: v["type"]} for k, v in record.items()]

    def get_properties(self, record: dict) -> dict:
        """Return the raw apoc properties map for an object."""
        return record["properties"]

    def get_relationships(self, record: dict) -> Optional[dict]:
        """Return the apoc relationships map, or None when absent."""
        return record.get("relationships")

    def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
        """Yield schema, subtype, and properties work units per Neo4j object."""
        df = self.get_neo4j_metadata(
            "CALL apoc.meta.schema() YIELD value UNWIND keys(value) AS key RETURN key, value[key] AS value;"
        )
        for _, row in df.iterrows():
            yield MetadataWorkUnit(
                id=row["key"],
                mcp=self.generate_neo4j_object(
                    columns=row["property_data_types"],
                    dataset=row["key"],
                ),
                is_primary_source=True,
            )

            yield MetadataWorkUnit(
                id=row["key"],
                mcp=MetadataChangeProposalWrapper(
                    entityUrn=make_dataset_urn(
                        platform=self.PLATFORM,
                        name=row["key"],
                        env=self.config.env,
                    ),
                    aspect=SubTypesClass(
                        typeNames=[
                            DatasetSubTypes.NEO4J_NODE
                            if row["obj_type"] == self.NODE
                            else DatasetSubTypes.NEO4J_RELATIONSHIP
                        ]
                    ),
                ),
            )

            yield MetadataWorkUnit(
                id=row["key"],
                mcp=self.add_properties(
                    dataset=row["key"],
                    custom_properties=None,
                    description=row["description"],
                ),
            )

    def get_report(self) -> Neo4jSourceReport:
        return self.report
"node", + "properties": { + "Node1_Property1": { + "existence": False, + "type": "DATE", + "indexed": False, + "unique": False, + }, + "Node1_Property2": { + "existence": False, + "type": "STRING", + "indexed": False, + "unique": False, + }, + "Node1_Property3": { + "existence": False, + "type": "STRING", + "indexed": False, + "unique": False, + }, + }, + "labels": [], + }, + }, + { + "key": "Node_2", + "value": { + "count": 3, + "relationships": { + "RELATIONSHIP_1": { + "count": 1, + "properties": { + "Relationship1_Property1": { + "existence": False, + "type": "STRING", + "indexed": False, + "array": False, + } + }, + "direction": "out", + "labels": ["Node_2"], + } + }, + "type": "node", + "properties": { + "Node2_Property1": { + "existence": False, + "type": "DATE", + "indexed": False, + "unique": False, + }, + "Node2_Property2": { + "existence": False, + "type": "STRING", + "indexed": False, + "unique": False, + }, + "Node2_Property3": { + "existence": False, + "type": "STRING", + "indexed": False, + "unique": False, + }, + }, + "labels": [], + }, + }, + { + "key": "RELATIONSHIP_1", + "value": { + "count": 4, + "type": "relationship", + "properties": { + "Relationship1_Property1": { + "existence": False, + "type": "STRING", + "indexed": False, + "array": False, + } + }, + }, + }, + ] + + +def test_process_nodes(source): + df = source.process_nodes(data=data()) + assert type(df) is pd.DataFrame + + +def test_process_relationships(source): + df = source.process_relationships( + data=data(), node_df=source.process_nodes(data=data()) + ) + assert type(df) is pd.DataFrame + + +def test_get_obj_type(source): + results = data() + assert source.get_obj_type(results[0]["value"]) == "node" + assert source.get_obj_type(results[1]["value"]) == "node" + assert source.get_obj_type(results[2]["value"]) == "relationship" + + +def test_get_node_description(source): + results = data() + df = source.process_nodes(data=data()) + assert ( + source.get_node_description(results[0], 
df) + == "(Node_1)<-[RELATIONSHIP_1]-(Node_2)" + ) + assert ( + source.get_node_description(results[1], df) + == "(Node_2)-[RELATIONSHIP_1]->(Node_2)" + ) + + +def test_get_property_data_types(source): + results = data() + assert source.get_property_data_types(results[0]["value"]["properties"]) == [ + {"Node1_Property1": "DATE"}, + {"Node1_Property2": "STRING"}, + {"Node1_Property3": "STRING"}, + ] + assert source.get_property_data_types(results[1]["value"]["properties"]) == [ + {"Node2_Property1": "DATE"}, + {"Node2_Property2": "STRING"}, + {"Node2_Property3": "STRING"}, + ] + assert source.get_property_data_types(results[2]["value"]["properties"]) == [ + {"Relationship1_Property1": "STRING"} + ] + + +def test_get_properties(source): + results = data() + assert list(source.get_properties(results[0]["value"]).keys()) == [ + "Node1_Property1", + "Node1_Property2", + "Node1_Property3", + ] + assert list(source.get_properties(results[1]["value"]).keys()) == [ + "Node2_Property1", + "Node2_Property2", + "Node2_Property3", + ] + assert list(source.get_properties(results[2]["value"]).keys()) == [ + "Relationship1_Property1" + ] + + +def test_get_relationships(source): + results = data() + record = list( + results[0]["value"]["relationships"].keys() + ) # Get the first key from the dict_keys + assert record == ["RELATIONSHIP_1"] + + +if __name__ == "__main__": + unittest.main() diff --git a/metadata-service/configuration/src/main/resources/bootstrap_mcps/data-platforms.yaml b/metadata-service/configuration/src/main/resources/bootstrap_mcps/data-platforms.yaml index 1625df4a99540d..0b3d815c710980 100644 --- a/metadata-service/configuration/src/main/resources/bootstrap_mcps/data-platforms.yaml +++ b/metadata-service/configuration/src/main/resources/bootstrap_mcps/data-platforms.yaml @@ -727,3 +727,14 @@ displayName: Cassandra type: KEY_VALUE_STORE logoUrl: "/assets/platforms/cassandralogo.png" +- entityUrn: urn:li:dataPlatform:neo4j + entityType: dataPlatform + aspectName: 
dataPlatformInfo + changeType: UPSERT + aspect: + datasetNameDelimiter: "." + name: neo4j + displayName: Neo4j + type: OTHERS + logoUrl: "/assets/platforms/neo4j.png" +