Skip to content

Commit

Permalink
Merge pull request PanDAWMS#451 from tmaeno/master
Browse files Browse the repository at this point in the history
improved executemany for postgres
  • Loading branch information
tmaeno authored Nov 18, 2024
2 parents 02abe5d + b2d51e0 commit 9e351be
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 18 deletions.
2 changes: 1 addition & 1 deletion pandaserver/taskbuffer/OraDBProxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -19628,7 +19628,7 @@ def loadSWTags(self, sw_tags):
self.cur.execute(sql_delete + comment)
tmp_log.debug("done cleaning up SW_TAGS table")

sql_insert = "INSERT INTO ATLAS_PANDA.SW_TAGS (panda_queue, data, last_update)" "VALUES (:pq, :data, :last_update)"
sql_insert = "INSERT INTO ATLAS_PANDA.SW_TAGS (panda_queue, data, last_update) VALUES (:pq, :data, :last_update)"
tmp_log.debug("start filling up SW_TAGS table")
for shard in create_shards(var_map_tags, 100): # insert in batches of 100 rows
self.cur.executemany(sql_insert + comment, shard)
Expand Down
45 changes: 28 additions & 17 deletions pandaserver/taskbuffer/WrappedCursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,15 @@


# convert SQL and parameters in_printf format
def convert_query_in_printf_format(sql, var_dict, sql_conv_map):
def convert_query_in_printf_format(sql, var_dict_list, sql_conv_map):
if sql in sql_conv_map:
sql = sql_conv_map[sql]
else:
old_sql = sql
if var_dict_list:
var_dict = var_dict_list[0]
else:
var_dict = {}
# %
sql = re.sub(r"%", r"%%", sql)
# current date except for being used for interval
Expand Down Expand Up @@ -128,16 +132,18 @@ def convert_query_in_printf_format(sql, var_dict, sql_conv_map):
# cache
sql_conv_map[old_sql] = sql
# extract placeholders
paramList = []
params_list = []
items = re.findall(r":[^ $,)\+\-\n]+", sql)
for item in items:
if item not in var_dict:
raise KeyError(f"{item} is missing in SQL parameters")
if item not in paramList:
paramList.append(var_dict[item])
for var_dict in var_dict_list:
params = []
for item in items:
if item not in var_dict:
raise KeyError(f"{item} is missing in SQL parameters")
params.append(var_dict[item])
params_list.append(params)
# using the printf style syntax
sql = re.sub(":[^ $,)\+\-]+", "%s", sql)
return sql, paramList
return sql, params_list


# proxy
Expand All @@ -159,6 +165,13 @@ def __init__(self, connection):
self.dump = False
# SQL conversion map
self.sql_conv_map = {}
# executemany
if self.backend == "postgres":
from psycopg2.extras import execute_batch

self.alt_executemany = execute_batch
else:
self.alt_executemany = None

# __iter__
def __iter__(self):
Expand Down Expand Up @@ -216,18 +229,16 @@ def execute(self, sql, varDict=None, cur=None): # , returningInto=None
cur = self.cur
ret = None
# schema names
sql = re.sub("ATLAS_PANDA\.", panda_config.schemaPANDA + ".", sql)
sql = re.sub("ATLAS_PANDAMETA\.", panda_config.schemaMETA + ".", sql)
sql = re.sub("ATLAS_GRISLI\.", panda_config.schemaGRISLI + ".", sql)
sql = re.sub("ATLAS_PANDAARCH\.", panda_config.schemaPANDAARCH + ".", sql)
sql = self.change_schema(sql)
# remove `
sql = re.sub("`", "", sql)
if self.backend == "oracle":
ret = cur.execute(sql, varDict)
elif self.backend == "postgres":
if self.dump:
_logger.debug(f"OLD: {sql} {str(varDict)}")
sql, varList = convert_query_in_printf_format(sql, varDict, self.sql_conv_map)
sql, vars_list = convert_query_in_printf_format(sql, [varDict], self.sql_conv_map)
varList = vars_list[0]
if self.dump:
_logger.debug(f"NEW: {sql} {str(varList)}")
ret = cur.execute(sql, varList)
Expand Down Expand Up @@ -412,11 +423,11 @@ def executemany(self, sql, params):
if sql is None:
sql = self.statement
sql = self.change_schema(sql)
if self.backend == "oracle":
self.cur.executemany(sql, params)
if self.backend == "postgres":
sql, vars_list = convert_query_in_printf_format(sql, params, self.sql_conv_map)
self.alt_executemany(self.cur, sql, vars_list)
else:
for paramsItem in params:
self.execute(sql, paramsItem)
self.cur.executemany(sql, params)

# get_description
@property
Expand Down

0 comments on commit 9e351be

Please sign in to comment.