From 837a8f4bbff9e80a58e4d3eef72b763e834f91ca Mon Sep 17 00:00:00 2001
From: =
Date: Fri, 13 Dec 2024 11:36:38 +0100
Subject: [PATCH 01/10] Add logging for solver-rewards dry run

---
 src/pg_client.py | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/src/pg_client.py b/src/pg_client.py
index ce6255e4..be034b95 100644
--- a/src/pg_client.py
+++ b/src/pg_client.py
@@ -21,6 +21,7 @@ class MultiInstanceDBFetcher:
     """
 
     def __init__(self, db_urls: list[str]):
+        log.info("Initializing MultiInstanceDBFetcher")
         self.connections = [
             create_engine(f"postgresql+psycopg2://{url}") for url in db_urls
         ]
@@ -58,10 +59,12 @@ def get_solver_rewards(
 
         # Here, we use the convention that we run the prod query for the first connection
         # and the barn query on all other connections
+        log.info("Running prod query for first connection (in get_solver_rewards)")
         results.append(
             self.exec_query(query=batch_reward_query_prod, engine=self.connections[0])
         )
         for engine in self.connections[1:]:
+            log.info("Running barn query on other connections (in get_solver_rewards)")
             results.append(
                 self.exec_query(query=batch_reward_query_barn, engine=engine)
             )
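A note on the `log` calls this series adds: `log` is the module-level logger obtained from the project's `src.logger.set_log` helper (visible in the `src/'` snapshot under PATCH 06). The helper's real implementation is not shown anywhere in this series; a minimal sketch of what such a wrapper usually looks like, assuming it does nothing beyond configuring a standard-library logger:

    # Hypothetical sketch of a set_log-style helper; the project's actual
    # src/logger.py may differ.
    import logging
    import sys

    def set_log(name: str) -> logging.Logger:
        """Return a named logger with a single stdout handler attached."""
        logger = logging.getLogger(name)
        if not logger.handlers:  # avoid stacking handlers on repeated imports
            handler = logging.StreamHandler(sys.stdout)
            handler.setFormatter(
                logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s")
            )
            logger.addHandler(handler)
            logger.setLevel(logging.INFO)
        return logger

The point of the three added log lines is simply to bracket the long-running queries, so a hanging dry run shows where it stalled.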
From eeb09c165adca4e91630acfef392e18f36c7cf6f Mon Sep 17 00:00:00 2001
From: =
Date: Fri, 13 Dec 2024 12:03:22 +0100
Subject: [PATCH 02/10] Add pre-ping

---
 src/.pg_client.py.swp | Bin 0 -> 20480 bytes
 src/pg_client.py      |   2 +-
 2 files changed, 1 insertion(+), 1 deletion(-)
 create mode 100644 src/.pg_client.py.swp

diff --git a/src/.pg_client.py.swp b/src/.pg_client.py.swp
new file mode 100644
index 0000000000000000000000000000000000000000..b65165ea7f36a314ae75764924df6953057cb543
GIT binary patch
literal 20480
[base85-encoded vim swap-file payload omitted]

literal 0
HcmV?d00001

diff --git a/src/pg_client.py b/src/pg_client.py
index be034b95..b262bb1e 100644
--- a/src/pg_client.py
+++ b/src/pg_client.py
@@ -23,7 +23,7 @@ class MultiInstanceDBFetcher:
     def __init__(self, db_urls: list[str]):
         log.info("Initializing MultiInstanceDBFetcher")
         self.connections = [
-            create_engine(f"postgresql+psycopg2://{url}") for url in db_urls
+            create_engine(f"postgresql+psycopg2://{url}", pool_pre_ping=True) for url in db_urls
         ]
 
     @classmethod
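For context on `pool_pre_ping=True`: SQLAlchemy tests each pooled connection with a lightweight ping (for psycopg2, effectively a `SELECT 1`) at checkout time and transparently replaces it if the ping fails. A minimal sketch with a placeholder DSN:

    # Sketch of pre-ping behaviour; the DSN is a placeholder, not a real one.
    from sqlalchemy import create_engine, text

    engine = create_engine(
        "postgresql+psycopg2://user:password@localhost:5432/orderbook",
        pool_pre_ping=True,  # ping the connection on checkout, recycle if dead
    )

    with engine.connect() as conn:
        # If the pooled connection was killed while idle (e.g. by a server-side
        # timeout), pre-ping replaces it here instead of raising later.
        print(conn.execute(text("SELECT 1")).scalar())

Note that pre-ping only protects the moment of checkout; a connection dropped in the middle of a long-running query still fails, which is what the keepalive changes in the following patches try to address.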
From 5f669263a32a2e01e75f859d82d5e4fa0a560756 Mon Sep 17 00:00:00 2001
From: =
Date: Fri, 13 Dec 2024 12:21:23 +0100
Subject: [PATCH 03/10] Update connection args

---
 src/.pg_client.py.swp | Bin 20480 -> 20480 bytes
 src/pg_client.py      |  10 +++++++++-
 2 files changed, 9 insertions(+), 1 deletion(-)

diff --git a/src/.pg_client.py.swp b/src/.pg_client.py.swp
index b65165ea7f36a314ae75764924df6953057cb543..6dda3d02ac308a66cc1ba4555ddd23a94042d602 100644
GIT binary patch
delta 325
[base85-encoded delta omitted]

[second delta omitted]

diff --git a/src/pg_client.py b/src/pg_client.py
index b262bb1e..86ed8991 100644
--- a/src/pg_client.py
+++ b/src/pg_client.py
@@ -23,7 +23,15 @@ class MultiInstanceDBFetcher:
     def __init__(self, db_urls: list[str]):
         log.info("Initializing MultiInstanceDBFetcher")
         self.connections = [
-            create_engine(f"postgresql+psycopg2://{url}", pool_pre_ping=True) for url in db_urls
+            create_engine(f"postgresql+psycopg2://{url}",
+                pool_pre_ping=True,
+                connect_args={
+                    "keepalives": 1,
+                    "keepalives_idle": 30,
+                    "keepalives_interval": 10,
+                    "keepalives_count": 5,
+                }
+            ) for url in db_urls
         ]
 
     @classmethod
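The `connect_args` are forwarded verbatim to `psycopg2.connect`, so these are libpq TCP-keepalive parameters: probing starts after 30 seconds of idleness, repeats every 10 seconds, and the connection is declared dead after 5 failed probes. An equivalent raw-psycopg2 sketch (credentials are placeholders):

    # Sketch: the same keepalive settings on a bare psycopg2 connection.
    import psycopg2

    conn = psycopg2.connect(
        host="localhost",        # placeholder
        dbname="orderbook",      # placeholder
        user="user",             # placeholder
        password="password",     # placeholder
        keepalives=1,            # enable TCP keepalives
        keepalives_idle=30,      # seconds idle before the first probe
        keepalives_interval=10,  # seconds between probes
        keepalives_count=5,      # failed probes before the link is dropped
    )

These keepalives keep NAT gateways and load balancers from silently discarding a connection that looks idle while the server is still computing a long query.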
From 4ad543551a9e7324d304fe2386b2ec7e7563087c Mon Sep 17 00:00:00 2001
From: =
Date: Fri, 13 Dec 2024 13:23:54 +0100
Subject: [PATCH 04/10] Check statement timeout

---
 src/.pg_client.py.swp | Bin 20480 -> 0 bytes
 src/pg_client.py      |   4 ++++
 2 files changed, 4 insertions(+)
 delete mode 100644 src/.pg_client.py.swp

diff --git a/src/.pg_client.py.swp b/src/.pg_client.py.swp
deleted file mode 100644
index 6dda3d02ac308a66cc1ba4555ddd23a94042d602..0000000000000000000000000000000000000000
GIT binary patch
literal 0
HcmV?d00001

literal 20480
[base85-encoded vim swap-file payload omitted]

diff --git a/src/pg_client.py b/src/pg_client.py
index 86ed8991..de471c8f 100644
--- a/src/pg_client.py
+++ b/src/pg_client.py
@@ -7,6 +7,7 @@
 from pandas import DataFrame, Series
 from sqlalchemy import create_engine
 from sqlalchemy.engine import Engine
+from sqlalchemy import text
 
 from src.logger import set_log
 from src.utils.query_file import open_query
@@ -68,6 +69,9 @@ def get_solver_rewards(
         # Here, we use the convention that we run the prod query for the first connection
         # and the barn query on all other connections
+        log.info("Checking statement timeout")
+        _res = self.exec_query(query=text("SHOW statement_timeout;")).scalar()
+        log.info(f"Statement timeout for database is: {_res}")
         log.info("Running prod query for first connection (in get_solver_rewards)")
         results.append(
             self.exec_query(query=batch_reward_query_prod, engine=self.connections[0])
         )
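`statement_timeout` is the server-side setting that aborts any statement running longer than the configured value, so checking it is a reasonable first suspect for a query that dies mid-run. The new line also contains the bug the next two patches chip away at: `exec_query` requires an `engine` argument, and it returns a pandas `DataFrame`, which has no `.scalar()` method. A sketch of reading the setting without going through `pd.read_sql` at all (assuming SQLAlchemy 1.4+):

    # Sketch: read a server setting on an explicit connection.
    from sqlalchemy import text

    def show_statement_timeout(engine) -> str:
        """Return e.g. "0" (no limit) or "30s"; engine is one of self.connections."""
        with engine.connect() as conn:
            return conn.execute(text("SHOW statement_timeout;")).scalar()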
From: =
Date: Fri, 13 Dec 2024 13:25:48 +0100
Subject: [PATCH 05/10] Bug fixes

---
 src/.pg_client.py.swp | Bin 0 -> 20480 bytes
 src/pg_client.py      |   2 +-
 2 files changed, 1 insertion(+), 1 deletion(-)
 create mode 100644 src/.pg_client.py.swp

diff --git a/src/.pg_client.py.swp b/src/.pg_client.py.swp
new file mode 100644
index 0000000000000000000000000000000000000000..d884e703e5e22048038ab5d6777f8948582a3c64
GIT binary patch
literal 20480
[base85-encoded vim swap-file payload omitted]

literal 0
HcmV?d00001

diff --git a/src/pg_client.py b/src/pg_client.py
index de471c8f..144f66b4 100644
--- a/src/pg_client.py
+++ b/src/pg_client.py
@@ -69,7 +69,7 @@ def get_solver_rewards(
         # Here, we use the convention that we run the prod query for the first connection
         # and the barn query on all other connections
         log.info("Checking statement timeout")
-        _res = self.exec_query(query=text("SHOW statement_timeout;")).scalar()
+        _res = self.exec_query(query=text("SHOW statement_timeout;"), engine=self.connections[0]).scalar()
         log.info(f"Statement timeout for database is: {_res}")
         log.info("Running prod query for first connection (in get_solver_rewards)")
         results.append(
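This fixes the missing `engine` argument, but `.scalar()` is still called on the `DataFrame` that `pd.read_sql` returns, so the line would now fail with `AttributeError` instead; the next patch switches to executing on the connection object directly. If one did want the value via the DataFrame route, a sketch (placeholder DSN):

    # Sketch: pulling a single value out of the pd.read_sql result.
    import pandas as pd
    from sqlalchemy import create_engine

    engine = create_engine("postgresql+psycopg2://user:password@localhost/orderbook")
    df = pd.read_sql(sql="SHOW statement_timeout;", con=engine)
    value = df.iloc[0, 0]  # first column of the first row, e.g. "0"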
From 463061d44a34d438c2a5ad0f3916c094aa5ad1de Mon Sep 17 00:00:00 2001
From: =
Date: Fri, 13 Dec 2024 14:12:33 +0100
Subject: [PATCH 06/10] Catch error and log

---
 src/'                 | 164 ++++++++++++++++++++++++++++++++++++++++++
 src/.pg_client.py.swp | Bin 20480 -> 20480 bytes
 src/pg_client.py      |  12 ++--
 3 files changed, 172 insertions(+), 4 deletions(-)
 create mode 100644 src/'

diff --git a/src/' b/src/'
new file mode 100644
index 00000000..9be30e6a
--- /dev/null
+++ b/src/'
@@ -0,0 +1,164 @@
+"""Basic client for connecting to postgres database with login credentials"""
+
+from __future__ import annotations
+
+
+import pandas as pd
+from pandas import DataFrame, Series
+from sqlalchemy import create_engine
+from sqlalchemy.engine import Engine
+from sqlalchemy import text
+
+from src.logger import set_log
+from src.utils.query_file import open_query
+
+log = set_log(__name__)
+
+
+class MultiInstanceDBFetcher:
+    """
+    Allows identical query execution on multiple db instances (merging results).
+    Currently very specific to the CoW Protocol Orderbook DB.
+    """
+
+    def __init__(self, db_urls: list[str]):
+        log.info("Initializing MultiInstanceDBFetcher")
+        self.connections = [
+            create_engine(f"postgresql+psycopg2://{url}",
+                pool_pre_ping=True,
+                connect_args={
+                    "keepalives": 1,
+                    "keepalives_idle": 30,
+                    "keepalives_interval": 10,
+                    "keepalives_count": 5,
+                }
+            ) for url in db_urls
+        ]
+
+    @classmethod
+    def exec_query(cls, query: str, engine: Engine) -> DataFrame:
+        """Executes query on DB engine"""
+        return pd.read_sql(sql=query, con=engine)
+
+    def get_solver_rewards(
+        self,
+        start_block: str,
+        end_block: str,
+        reward_cap_upper: int,
+        reward_cap_lower: int,
+    ) -> DataFrame:
+        """
+        Returns aggregated solver rewards for accounting period defined by block range
+        """
+        batch_reward_query_prod = (
+            open_query("orderbook/prod_batch_rewards.sql")
+            .replace("{{start_block}}", start_block)
+            .replace("{{end_block}}", end_block)
+            .replace("{{EPSILON_LOWER}}", str(reward_cap_lower))
+            .replace("{{EPSILON_UPPER}}", str(reward_cap_upper))
+        )
+        batch_reward_query_barn = (
+            open_query("orderbook/barn_batch_rewards.sql")
+            .replace("{{start_block}}", start_block)
+            .replace("{{end_block}}", end_block)
+            .replace("{{EPSILON_LOWER}}", str(reward_cap_lower))
+            .replace("{{EPSILON_UPPER}}", str(reward_cap_upper))
+        )
+        results = []
+
+        # Here, we use the convention that we run the prod query for the first connection
+        # and the barn query on all other connections
+        log.info("Checking statement timeout")
+        _res = self.connections[0].execute(text("SHOW statement_timeout;"))
+        log.info(f"Statement timeout for database is: {_res}")
+        log.info("Running prod query for first connection (in get_solver_rewards)")
+        results.append(
+            self.exec_query(query=batch_reward_query_prod, engine=self.connections[0])
+        )
+        for engine in self.connections[1:]:
+            log.info("Running barn query on other connections (in get_solver_rewards)")
+            results.append(
+                self.exec_query(query=batch_reward_query_barn, engine=engine)
+            )
+
+        results_df = pd.concat(results)
+
+        # warn and merge in case of solvers in both environments
+        if not results_df["solver"].is_unique:
+            log_duplicate_rows(results_df)
+
+        results_df = (
+            results_df.groupby("solver")
+            .agg(
+                {
+                    "primary_reward_eth": "sum",
+                    "protocol_fee_eth": "sum",
+                    "network_fee_eth": "sum",
+                    # there can be duplicate entries in partner_list now
+                    "partner_list": merge_lists,
+                    "partner_fee_eth": merge_lists,
+                }
+            )
+            .reset_index()
+        )
+
+        return results_df
+
+    def get_quote_rewards(self, start_block: str, end_block: str) -> DataFrame:
+        """Returns aggregated solver quote rewards for block range"""
+        quote_reward_query = (
+            open_query("orderbook/quote_rewards.sql")
+            .replace("{{start_block}}", start_block)
+            .replace("{{end_block}}", end_block)
+        )
+        results = [
+            self.exec_query(query=quote_reward_query, engine=engine)
+            for engine in self.connections
+        ]
+        results_df = pd.concat(results)
+
+        # warn and merge in case of solvers in both environments
+        if not results_df["solver"].is_unique:
+            log_duplicate_rows(results_df)
+
+        results_df = (
+            results_df.groupby("solver").agg({"num_quotes": "sum"}).reset_index()
+        )
+
+        return results_df
+
+
+def pg_hex2bytea(hex_address: str) -> str:
+    """
+    transforms hex string (beginning with 0x) to dune
+    compatible bytea by replacing `0x` with `\\x`.
+    """
+    return hex_address.replace("0x", "\\x")
+
+
+def log_duplicate_rows(df: DataFrame) -> None:
+    """Log rows with duplicate solvers entries.
+    Printing defaults are changed to show all column entries."""
+    duplicated_entries = df[df["solver"].duplicated(keep=False)]
+    with pd.option_context(
+        "display.max_columns",
+        None,
+        "display.width",
+        None,
+        "display.max_colwidth",
+        None,
+    ):
+        log.warning(
+            f"Solvers found in both environments:\n {duplicated_entries}.\n"
+            "Merging results."
+        )
+
+
+def merge_lists(series: Series) -> list | None:
+    """Merges series containing lists into large list.
+    Returns None if the result would be an empty list."""
+    merged = []
+    for lst in series:
+        if lst is not None:
+            merged.extend(lst)
+    return merged if merged else None

diff --git a/src/.pg_client.py.swp b/src/.pg_client.py.swp
index d884e703e5e22048038ab5d6777f8948582a3c64..38a87955f394cdf4f3e20053021887f650d487c7 100644
GIT binary patch
delta 397
[base85-encoded delta omitted]

delta 236
[base85-encoded delta omitted]

diff --git a/src/pg_client.py b/src/pg_client.py
index 144f66b4..d872e33f 100644
--- a/src/pg_client.py
+++ b/src/pg_client.py
@@ -69,12 +69,16 @@ def get_solver_rewards(
         # Here, we use the convention that we run the prod query for the first connection
         # and the barn query on all other connections
         log.info("Checking statement timeout")
-        _res = self.exec_query(query=text("SHOW statement_timeout;"), engine=self.connections[0]).scalar()
+        _res = self.connections[0].execute(text("SHOW statement_timeout;")).scalar()
         log.info(f"Statement timeout for database is: {_res}")
         log.info("Running prod query for first connection (in get_solver_rewards)")
-        results.append(
-            self.exec_query(query=batch_reward_query_prod, engine=self.connections[0])
-        )
+        try:
+            results.append(
+                self.exec_query(query=batch_reward_query_prod, engine=self.connections[0])
+            )
+        except Exception as e:
+            log.info("Query execution failed, re-throwing error")
+            raise e
         for engine in self.connections[1:]:
             log.info("Running barn query on other connections (in get_solver_rewards)")
             results.append(
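Two side notes on this patch. First, the new `src/'` file looks like an editor accident (a whole working copy of `pg_client.py` written to a file literally named `'`, in line with the `.pg_client.py.swp` vim swap files that keep appearing in this series); PATCH 09 deletes it again. Second, catching an exception only to log and re-raise is more idiomatic with `log.exception`, which records the traceback, and a bare `raise`, which preserves the original one. A sketch under those assumptions:

    # Sketch: log the full traceback, then re-raise unchanged.
    # `fetcher` is a MultiInstanceDBFetcher, `query` the prod rewards SQL,
    # and `log` the module logger from set_log(__name__).
    def run_prod_query(fetcher, query):
        try:
            return fetcher.exec_query(query=query, engine=fetcher.connections[0])
        except Exception:
            log.exception("Prod batch-rewards query failed")
            raise  # bare raise keeps the original traceback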
From 526afb69fdf4fe127ead485d2b82333fab7dd0c2 Mon Sep 17 00:00:00 2001
From: =
Date: Fri, 13 Dec 2024 14:32:41 +0100
Subject: [PATCH 07/10] Set idle timeout to 900

---
 src/.pg_client.py.swp | Bin 20480 -> 20480 bytes
 src/pg_client.py      |   5 ++---
 2 files changed, 2 insertions(+), 3 deletions(-)

diff --git a/src/.pg_client.py.swp b/src/.pg_client.py.swp
index 38a87955f394cdf4f3e20053021887f650d487c7..70de8267dfd560664a1b9e31bfc2effb79254276 100644
GIT binary patch
delta 357
[base85-encoded delta omitted]

delta 398
[base85-encoded delta omitted]

diff --git a/src/pg_client.py b/src/pg_client.py
index d872e33f..2f3359eb 100644
--- a/src/pg_client.py
+++ b/src/pg_client.py
@@ -68,9 +68,8 @@ def get_solver_rewards(
 
         # Here, we use the convention that we run the prod query for the first connection
         # and the barn query on all other connections
-        log.info("Checking statement timeout")
-        _res = self.connections[0].execute(text("SHOW statement_timeout;")).scalar()
-        log.info(f"Statement timeout for database is: {_res}")
+        log.info("Setting tcp_keepalives_idle to 900 for prod connection")
+        self.connections[0].execute(text("SET tcp_keepalives_idle = 900;"))
         log.info("Running prod query for first connection (in get_solver_rewards)")
         try:
             results.append(
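`tcp_keepalives_idle` is the server-side counterpart of the client keepalives from PATCH 03: it makes the PostgreSQL backend probe a client that has gone quiet. Two caveats apply to the line above: `SET` affects only the session it runs in, and `Engine.execute(...)` (legacy SQLAlchemy 1.x API) runs on whatever connection the pool hands out, which is not guaranteed to be the one that later executes the rewards query. A sketch of pinning such a setting to every new connection instead, via SQLAlchemy's event hooks:

    # Sketch: apply a session-level SET on each new pooled connection, so it
    # also covers the connection that eventually runs the long query.
    from sqlalchemy import create_engine, event

    engine = create_engine(
        "postgresql+psycopg2://user:password@localhost/orderbook"  # placeholder
    )

    @event.listens_for(engine, "connect")
    def set_server_keepalives(dbapi_connection, connection_record):
        with dbapi_connection.cursor() as cursor:
            cursor.execute("SET tcp_keepalives_idle = 900;")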
From 2fb2fcde7571588b27b5769a1bdbfff97410af27 Mon Sep 17 00:00:00 2001
From: =
Date: Fri, 13 Dec 2024 14:37:14 +0100
Subject: [PATCH 08/10] Remove try catch

---
 src/.pg_client.py.swp | Bin 20480 -> 20480 bytes
 src/pg_client.py      |  10 +++-------
 2 files changed, 3 insertions(+), 7 deletions(-)

diff --git a/src/.pg_client.py.swp b/src/.pg_client.py.swp
index 70de8267dfd560664a1b9e31bfc2effb79254276..b321352d4bd2016e2fca70fcdd12c228a14efb08 100644
GIT binary patch
delta 247
[base85-encoded delta omitted]

delta 328
[base85-encoded delta omitted]

diff --git a/src/pg_client.py b/src/pg_client.py
index 2f3359eb..3f79cc22 100644
--- a/src/pg_client.py
+++ b/src/pg_client.py
@@ -71,13 +71,9 @@ def get_solver_rewards(
         log.info("Setting tcp_keepalives_idle to 900 for prod connection")
         self.connections[0].execute(text("SET tcp_keepalives_idle = 900;"))
         log.info("Running prod query for first connection (in get_solver_rewards)")
-        try:
-            results.append(
-                self.exec_query(query=batch_reward_query_prod, engine=self.connections[0])
-            )
-        except Exception as e:
-            log.info("Query execution failed, re-throwing error")
-            raise e
+        results.append(
+            self.exec_query(query=batch_reward_query_prod, engine=self.connections[0])
+        )
         for engine in self.connections[1:]:
             log.info("Running barn query on other connections (in get_solver_rewards)")
             results.append(
From 83e2b0bdcefa1eb6691b3c51940888588a6c4581 Mon Sep 17 00:00:00 2001
From: =
Date: Fri, 13 Dec 2024 16:08:47 +0100
Subject: [PATCH 09/10] Remove unnecessary files

---
 src/'                 | 164 ------------------------------------------
 src/.pg_client.py.swp | Bin 20480 -> 0 bytes
 2 files changed, 164 deletions(-)
 delete mode 100644 src/'
 delete mode 100644 src/.pg_client.py.swp

diff --git a/src/' b/src/'
deleted file mode 100644
index 9be30e6a..00000000
--- a/src/'
+++ /dev/null
@@ -1,164 +0,0 @@
[164 deleted lines omitted; apart from the leading "-" markers, they are identical to the file added in PATCH 06]

diff --git a/src/.pg_client.py.swp b/src/.pg_client.py.swp
deleted file mode 100644
index b321352d4bd2016e2fca70fcdd12c228a14efb08..0000000000000000000000000000000000000000
GIT binary patch
literal 0
HcmV?d00001

literal 20480
[base85-encoded vim swap-file payload omitted]
From 6bf737c0a7eb1b6e08bd8bde64c49f694ed3314c Mon Sep 17 00:00:00 2001
From: =
Date: Fri, 13 Dec 2024 16:37:51 +0100
Subject: [PATCH 10/10] Reformatting

---
 src/pg_client.py | 20 +++++++++++---------
 1 file changed, 11 insertions(+), 9 deletions(-)

diff --git a/src/pg_client.py b/src/pg_client.py
index 3f79cc22..4338d489 100644
--- a/src/pg_client.py
+++ b/src/pg_client.py
@@ -24,15 +24,17 @@ class MultiInstanceDBFetcher:
     def __init__(self, db_urls: list[str]):
         log.info("Initializing MultiInstanceDBFetcher")
         self.connections = [
-            create_engine(f"postgresql+psycopg2://{url}",
-                pool_pre_ping=True,
-                connect_args={
-                    "keepalives": 1,
-                    "keepalives_idle": 30,
-                    "keepalives_interval": 10,
-                    "keepalives_count": 5,
-                }
-            ) for url in db_urls
+            create_engine(
+                f"postgresql+psycopg2://{url}",
+                pool_pre_ping=True,
+                connect_args={
+                    "keepalives": 1,
+                    "keepalives_idle": 30,
+                    "keepalives_interval": 10,
+                    "keepalives_count": 5,
+                },
+            )
+            for url in db_urls
         ]
 
     @classmethod
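After the series, the net effect on `src/pg_client.py` is the engine configuration above (pre-ping plus client-side TCP keepalives) and the added log lines; the statement-timeout probing and the try/except experiment were reverted along the way. A usage sketch of the resulting fetcher; DSNs, block range, and caps are illustrative values only:

    # Sketch: exercising the fetcher as configured after this series.
    from src.pg_client import MultiInstanceDBFetcher

    fetcher = MultiInstanceDBFetcher(
        [
            "prod_user:password@prod-host:5432/orderbook",  # first URL: prod query
            "barn_user:password@barn-host:5432/orderbook",  # the rest: barn query
        ]
    )
    rewards_df = fetcher.get_solver_rewards(
        start_block="20000000",
        end_block="20010000",
        reward_cap_upper=12 * 10**15,
        reward_cap_lower=10 * 10**15,
    )
    print(rewards_df.head())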