Merge pull request #357 from HSF/dev
Dev
wguanicedew authored Oct 16, 2024
2 parents 1ea71bb + 00ff919 commit 224356d
Showing 6 changed files with 141 additions and 30 deletions.
6 changes: 4 additions & 2 deletions main/lib/idds/agents/carrier/utils.py
@@ -1936,7 +1936,8 @@ def sync_collection_status(request_id, transform_id, workload_id, work, input_ou
                 coll_status[content['coll_id']]['bytes'] += content['bytes']
             elif content['status'] in [ContentStatus.New]:
                 coll_status[content['coll_id']]['new_files'] += 1
-            elif content['status'] in [ContentStatus.Failed, ContentStatus.FinalFailed]:
+            elif content['status'] in [ContentStatus.Failed, ContentStatus.FinalFailed,
+                                       ContentStatus.SubAvailable, ContentStatus.FinalSubAvailable]:
                 coll_status[content['coll_id']]['failed_files'] += 1
             elif content['status'] in [ContentStatus.Lost, ContentStatus.Deleted, ContentStatus.Missing]:
                 coll_status[content['coll_id']]['missing_files'] += 1
@@ -1958,7 +1959,8 @@ def sync_collection_status(request_id, transform_id, workload_id, work, input_ou
                                        ContentStatus.FakeAvailable, ContentStatus.FakeAvailable.value]:
                 coll_status[content['coll_id']]['processed_ext_files'] += 1
             # elif content['status'] in [ContentStatus.Failed, ContentStatus.FinalFailed]:
-            elif content['status'] in [ContentStatus.Failed, ContentStatus.FinalFailed]:
+            elif content['status'] in [ContentStatus.Failed, ContentStatus.FinalFailed,
+                                       ContentStatus.SubAvailable, ContentStatus.FinalSubAvailable]:
                 coll_status[content['coll_id']]['failed_ext_files'] += 1
             elif content['status'] in [ContentStatus.Lost, ContentStatus.Deleted, ContentStatus.Missing]:
                 coll_status[content['coll_id']]['missing_ext_files'] += 1
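The two hunks above widen the "failed" bucket: contents in SubAvailable or FinalSubAvailable now count toward failed_files and failed_ext_files when collection status is synchronized. A minimal sketch of the resulting classification rule, using a stand-in enum (the real ContentStatus lives in idds.common.constants and has more members and different values):

# Illustrative only: stand-in enum for idds.common.constants.ContentStatus.
from enum import Enum

class ContentStatus(Enum):
    New = 1
    Available = 2
    Failed = 3
    FinalFailed = 4
    SubAvailable = 5
    FinalSubAvailable = 6
    Lost = 7
    Deleted = 8
    Missing = 9

FAILED = {ContentStatus.Failed, ContentStatus.FinalFailed,
          ContentStatus.SubAvailable, ContentStatus.FinalSubAvailable}
MISSING = {ContentStatus.Lost, ContentStatus.Deleted, ContentStatus.Missing}

def bucket(status):
    # Map a content status to the collection counter it increments.
    if status is ContentStatus.New:
        return 'new_files'
    if status in FAILED:
        return 'failed_files'   # SubAvailable/FinalSubAvailable land here after this PR
    if status in MISSING:
        return 'missing_files'
    return 'processed_files'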
102 changes: 101 additions & 1 deletion main/lib/idds/agents/transformer/transformer.py
@@ -19,13 +19,15 @@
                                    TransformStatus, TransformLocking,
                                    CollectionType, CollectionStatus,
                                    CollectionRelationType,
+                                   ContentStatus, ContentRelationType,
                                    CommandType, ProcessingStatus, WorkflowType,
                                    ConditionStatus,
                                    get_processing_type_from_transform_type,
                                    get_transform_status_from_processing_status)
 from idds.common.utils import setup_logging, truncate_string
 from idds.core import (transforms as core_transforms,
                        processings as core_processings,
+                       catalog as core_catalog,
                        throttlers as core_throttlers,
                        conditions as core_conditions)
 from idds.agents.common.baseagent import BaseAgent
@@ -103,7 +105,7 @@ def __init__(self, num_threads=1, max_number_workers=8, poll_period=1800, retrie
         if hasattr(self, 'cache_expire_seconds'):
             self.cache_expire_seconds = int(self.cache_expire_seconds)
         else:
-            self.cache_expire_seconds = 3600
+            self.cache_expire_seconds = 300
 
     def is_ok_to_run_more_transforms(self):
         if self.number_workers >= self.max_number_workers:
@@ -117,6 +119,9 @@ def show_queue_size(self):
         self.logger.debug(q_str)
 
     def get_throttlers(self):
+        """
+        Use throttler
+        """
         cache = get_redis_cache()
         throttlers = cache.get("throttlers", default=None)
         if throttlers is None:
@@ -133,6 +138,101 @@ def get_throttlers(self):
             cache.set("throttlers", throttlers, expire_seconds=self.cache_expire_seconds)
         return throttlers
 
+    def get_num_active_transforms(self, site_name):
+        cache = get_redis_cache()
+        num_transforms = cache.get("num_transforms", default=None)
+        if num_transforms is None:
+            num_transforms = {}
+            active_status = [TransformStatus.New, TransformStatus.Ready]
+            active_status1 = [TransformStatus.Transforming, TransformStatus.Terminating]
+            rets = core_transforms.get_num_active_transforms(active_status + active_status1)
+            for ret in rets:
+                status, site, count = ret
+                if site is None:
+                    site = 'Default'
+                if site not in num_transforms:
+                    num_transforms[site] = {'new': 0, 'processing': 0}
+                if status in active_status:
+                    num_transforms[site]['new'] += count
+                elif status in active_status1:
+                    num_transforms[site]['processing'] += count
+            cache.set("num_transforms", num_transforms, expire_seconds=self.cache_expire_seconds)
+        default_value = {'new': 0, 'processing': 0}
+        return num_transforms.get(site_name, default_value)
+
+    def get_num_active_processings(self, site_name):
+        cache = get_redis_cache()
+        num_processings = cache.get("num_processings", default=None)
+        active_transforms = cache.get("active_transforms", default={})
+        if num_processings is None:
+            num_processings = {}
+            active_transforms = {}
+            active_status = [ProcessingStatus.New]
+            active_status1 = [ProcessingStatus.Submitting, ProcessingStatus.Submitted,
+                              ProcessingStatus.Running, ProcessingStatus.Terminating, ProcessingStatus.ToTrigger,
+                              ProcessingStatus.Triggering]
+            rets = core_processings.get_active_processings(active_status + active_status1)
+            for ret in rets:
+                req_id, trf_id, pr_id, site, status = ret
+                if site is None:
+                    site = 'Default'
+                if site not in num_processings:
+                    num_processings[site] = {'new': 0, 'processing': 0}
+                    active_transforms[site] = []
+                if status in active_status:
+                    num_processings[site]['new'] += 1
+                elif status in active_status1:
+                    num_processings[site]['processing'] += 1
+                active_transforms[site].append(trf_id)
+            cache.set("num_processings", num_processings, expire_seconds=self.cache_expire_seconds)
+            cache.set("active_transforms", active_transforms, expire_seconds=self.cache_expire_seconds)
+        default_value = {'new': 0, 'processing': 0}
+        return num_processings.get(site_name, default_value), active_transforms
+
+    def get_num_active_contents(self, site_name, active_transform_ids):
+        cache = get_redis_cache()
+        # 1. input contents not terminated
+        # 2. output contents not terminated
+        tf_id_site_map = {}
+        all_tf_ids = []
+        for site in active_transform_ids:
+            all_tf_ids += active_transform_ids[site]
+            for tf_id in active_transform_ids[site]:
+                tf_id_site_map[tf_id] = site
+
+        num_input_contents = cache.get("num_input_contents", default=None)
+        num_output_contents = cache.get("num_output_contents", default=None)
+        if num_input_contents is None or num_output_contents is None:
+            num_input_contents, num_output_contents = {}, {}
+            if all_tf_ids:
+                ret = core_catalog.get_content_status_statistics_by_relation_type(all_tf_ids)
+                for item in ret:
+                    status, relation_type, transform_id, count = item
+                    site = tf_id_site_map[transform_id]
+                    if site not in num_input_contents:
+                        num_input_contents[site] = {'new': 0, 'activated': 0, 'processed': 0}
+                        num_output_contents[site] = {'new': 0, 'activated': 0, 'processed': 0}
+                    if status in [ContentStatus.New]:
+                        if relation_type == ContentRelationType.Input:
+                            num_input_contents[site]['new'] += count
+                        elif relation_type == ContentRelationType.Output:
+                            num_output_contents[site]['new'] += count
+                    if status in [ContentStatus.Activated]:
+                        if relation_type == ContentRelationType.Input:
+                            num_input_contents[site]['activated'] += count
+                        elif relation_type == ContentRelationType.Output:
+                            num_output_contents[site]['activated'] += count
+                    else:
+                        if relation_type == ContentRelationType.Input:
+                            num_input_contents[site]['processed'] += count
+                        elif relation_type == ContentRelationType.Output:
+                            num_output_contents[site]['processed'] += count
+
+            cache.set("num_input_contents", num_input_contents, expire_seconds=self.cache_expire_seconds)
+            cache.set("num_output_contents", num_output_contents, expire_seconds=self.cache_expire_seconds)
+        default_value = {'new': 0, 'activated': 0, 'processed': 0}
+        return num_input_contents.get(site_name, default_value), num_output_contents.get(site_name, default_value)
+
     def whether_to_throttle(self, transform):
         try:
             site = transform['site']
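The body of whether_to_throttle is truncated in this view. For orientation only, here is a hedged sketch of how the three per-site counters above could feed the throttle decision; it assumes the throttler mapping is keyed by site, and the threshold keys (max_new_transforms, max_new_processings) are invented for illustration, not the committed logic:

# Hypothetical illustration, not the committed whether_to_throttle body.
def whether_to_throttle_sketch(self, transform):
    site = transform.get('site') or 'Default'
    throttler = self.get_throttlers().get(site, {})      # assumed: keyed by site

    num_transforms = self.get_num_active_transforms(site)
    num_processings, active_transforms = self.get_num_active_processings(site)
    num_inputs, num_outputs = self.get_num_active_contents(site, active_transforms)

    # Throttle once queued ("new") work reaches the site's assumed limits.
    max_new_tf = throttler.get('max_new_transforms')     # assumed key
    if max_new_tf is not None and num_transforms['new'] >= max_new_tf:
        return True
    max_new_pr = throttler.get('max_new_processings')    # assumed key
    if max_new_pr is not None and num_processings['new'] >= max_new_pr:
        return True
    return False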
@@ -34,17 +34,17 @@ def upgrade() -> None:
     op.add_column('requests', sa.Column('locking_hostname', sa.String(50)), schema=schema)
     op.add_column('requests', sa.Column('locking_pid', sa.BigInteger()), schema=schema)
     op.add_column('requests', sa.Column('locking_thread_id', sa.BigInteger()), schema=schema)
-    op.add_column('requests', sa.Column('locking_thread_name', sa.String(50)), schema=schema)
+    op.add_column('requests', sa.Column('locking_thread_name', sa.String(100)), schema=schema)
 
     op.add_column('transforms', sa.Column('locking_hostname', sa.String(50)), schema=schema)
     op.add_column('transforms', sa.Column('locking_pid', sa.BigInteger()), schema=schema)
     op.add_column('transforms', sa.Column('locking_thread_id', sa.BigInteger()), schema=schema)
-    op.add_column('transforms', sa.Column('locking_thread_name', sa.String(50)), schema=schema)
+    op.add_column('transforms', sa.Column('locking_thread_name', sa.String(100)), schema=schema)
 
     op.add_column('processings', sa.Column('locking_hostname', sa.String(50)), schema=schema)
     op.add_column('processings', sa.Column('locking_pid', sa.BigInteger()), schema=schema)
     op.add_column('processings', sa.Column('locking_thread_id', sa.BigInteger()), schema=schema)
-    op.add_column('processings', sa.Column('locking_thread_name', sa.String(50)), schema=schema)
+    op.add_column('processings', sa.Column('locking_thread_name', sa.String(100)), schema=schema)
 
 
 def downgrade() -> None:
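The hunk above is from the Alembic migration that adds the locking columns (its file name is not shown in this view). Together with the models.py hunks below, it settles locking_thread_name at String(100) on both sides: the migration previously created the column as String(50) while the models declared String(255). The migration's downgrade() is truncated above; a minimal sketch, assuming it simply drops the same columns:

def downgrade() -> None:
    # Assumed mirror of upgrade(): drop the locking columns again.
    # 'schema' would be resolved the same way as in upgrade() (not shown here).
    for table in ('requests', 'transforms', 'processings'):
        for column in ('locking_hostname', 'locking_pid',
                       'locking_thread_id', 'locking_thread_name'):
            op.drop_column(table, column, schema=schema)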
6 changes: 3 additions & 3 deletions main/lib/idds/orm/base/models.py
@@ -163,7 +163,7 @@ class Request(BASE, ModelBase):
     locking_hostname = Column(String(50))
     locking_pid = Column(BigInteger, autoincrement=False)
     locking_thread_id = Column(BigInteger, autoincrement=False)
-    locking_thread_name = Column(String(255))
+    locking_thread_name = Column(String(100))
     campaign = Column(String(50))
     campaign_group = Column(String(250))
     campaign_tag = Column(String(20))
@@ -324,7 +324,7 @@ class Transform(BASE, ModelBase):
     locking_hostname = Column(String(50))
     locking_pid = Column(BigInteger, autoincrement=False)
     locking_thread_id = Column(BigInteger, autoincrement=False)
-    locking_thread_name = Column(String(255))
+    locking_thread_name = Column(String(100))
     name = Column(String(NAME_LENGTH))
     has_previous_conditions = Column(Integer())
     loop_index = Column(Integer())
@@ -441,7 +441,7 @@ class Processing(BASE, ModelBase):
     locking_hostname = Column(String(50))
     locking_pid = Column(BigInteger, autoincrement=False)
     locking_thread_id = Column(BigInteger, autoincrement=False)
-    locking_thread_name = Column(String(255))
+    locking_thread_name = Column(String(100))
     errors = Column(JSONString(1024))
     _processing_metadata = Column('processing_metadata', JSON())
     _running_metadata = Column('running_metadata', JSON())
13 changes: 8 additions & 5 deletions main/lib/idds/orm/collections.py
@@ -330,10 +330,13 @@ def get_collections_by_request_ids(request_ids, session=None):
         if request_ids and type(request_ids) not in (list, tuple):
             request_ids = [request_ids]
 
-        query = session.query(models.Collection.coll_id,
-                              models.Collection.request_id,
-                              models.Collection.transform_id,
-                              models.Collection.workload_id)
+        columns = [models.Collection.coll_id,
+                   models.Collection.request_id,
+                   models.Collection.transform_id,
+                   models.Collection.workload_id]
+        column_names = [column.name for column in columns]
+        query = session.query(*columns)
+
         if request_ids:
             query = query.filter(models.Collection.request_id.in_(request_ids))
 
@@ -342,7 +345,7 @@ def get_collections_by_request_ids(request_ids, session=None):
         if tmp:
             for t in tmp:
                 # rets.append(t.to_dict())
-                t2 = dict(zip(t.keys(), t))
+                t2 = dict(zip(column_names, t))
                 rets.append(t2)
         return rets
     except Exception as error:
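This pair of hunks, like the contents.py hunks below, replaces row.keys() with an explicit column-name list: the selected columns drive both the query and the dict keys, so the code no longer depends on Row.keys(), which is not available on plain Row objects in SQLAlchemy 2.x (row._mapping provides the names there). The shared idiom in isolation, using the Collection columns from the hunk above and assuming the surrounding module's models and session:

columns = [models.Collection.coll_id,
           models.Collection.request_id]
column_names = [column.name for column in columns]   # ['coll_id', 'request_id']

query = session.query(*columns)
rets = [dict(zip(column_names, row)) for row in query.all()]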
38 changes: 22 additions & 16 deletions main/lib/idds/orm/contents.py
@@ -601,8 +601,10 @@ def get_update_contents_from_others_by_dep_id(request_id=None, transform_id=None
                            .filter(models.Content.substatus != ContentStatus.New)
         subquery = subquery.subquery()
 
-        query = session.query(models.Content.content_id,
-                              subquery.c.substatus)
+        columns = [models.Content.content_id, subquery.c.substatus]
+        column_names = [column.name for column in columns]
+
+        query = session.query(*columns)
         if request_id:
             query = query.filter(models.Content.request_id == request_id)
         if transform_id:
@@ -615,7 +617,7 @@ def get_update_contents_from_others_by_dep_id(request_id=None, transform_id=None
         rets = []
         if tmp:
             for t in tmp:
-                t2 = dict(zip(t.keys(), t))
+                t2 = dict(zip(column_names, t))
                 rets.append(t2)
         return rets
     except Exception as ex:
Expand All @@ -642,10 +644,12 @@ def get_updated_transforms_by_content_status(request_id=None, transform_id=None,
subquery = subquery.filter(models.Content.content_relation_type == 1)
subquery = subquery.subquery()

query = session.query(models.Content.request_id,
models.Content.transform_id,
models.Content.workload_id,
models.Content.coll_id)
columns = [models.Content.request_id,
models.Content.transform_id,
models.Content.workload_id,
models.Content.coll_id]
column_names = [column.name for column in columns]
query = session.query(*columns)
# query = query.with_hint(models.Content, "INDEX(CONTENTS CONTENTS_REQ_TF_COLL_IDX)", 'oracle')

if request_id:
@@ -661,7 +665,7 @@ def get_updated_transforms_by_content_status(request_id=None, transform_id=None,
         rets = []
         if tmp:
             for t in tmp:
-                t2 = dict(zip(t.keys(), t))
+                t2 = dict(zip(column_names, t))
                 rets.append(t2)
         return rets
     except Exception as error:
@@ -986,13 +990,15 @@ def get_contents_ext_ids(request_id=None, transform_id=None, workload_id=None, c
         if not isinstance(status, (tuple, list)):
             status = [status]
 
-        query = session.query(models.Content_ext.request_id,
-                              models.Content_ext.transform_id,
-                              models.Content_ext.workload_id,
-                              models.Content_ext.coll_id,
-                              models.Content_ext.content_id,
-                              models.Content_ext.panda_id,
-                              models.Content_ext.status)
+        columns = [models.Content_ext.request_id,
+                   models.Content_ext.transform_id,
+                   models.Content_ext.workload_id,
+                   models.Content_ext.coll_id,
+                   models.Content_ext.content_id,
+                   models.Content_ext.panda_id,
+                   models.Content_ext.status]
+        column_names = [column.name for column in columns]
+        query = session.query(*columns)
         if request_id:
             query = query.filter(models.Content_ext.request_id == request_id)
         if transform_id:
@@ -1009,7 +1015,7 @@ def get_contents_ext_ids(request_id=None, transform_id=None, workload_id=None, c
         rets = []
         if tmp:
             for t in tmp:
-                t2 = dict(zip(t.keys(), t))
+                t2 = dict(zip(column_names, t))
                 rets.append(t2)
         return rets
     except sqlalchemy.orm.exc.NoResultFound as error:
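After this refactor, each of these helpers returns plain dicts keyed by the selected column names. A hypothetical consumer of get_contents_ext_ids, grouping content ids by PanDA job id (illustrative, not part of the patch):

from collections import defaultdict

def group_contents_by_panda_id(rows):
    # rows: dicts with keys from the column list above, e.g.
    # {'request_id': ..., 'content_id': ..., 'panda_id': ..., 'status': ...}
    by_panda = defaultdict(list)
    for row in rows:
        by_panda[row['panda_id']].append(row['content_id'])
    return dict(by_panda)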
