From 50721aaaeab0721cd5969ef333b3c5673f7fc321 Mon Sep 17 00:00:00 2001 From: Alec Date: Wed, 30 Oct 2024 11:33:08 -0500 Subject: [PATCH] update condor to es scripts to es version 8.15 & python 3.11 --- Dockerfile | 23 ++++++++++++++------ condor_history_to_es.py | 48 ++++++++++++++++++++++++++++------------- condor_queue_to_es.py | 47 +++++++++++++++++++++++++++++----------- 3 files changed, 84 insertions(+), 34 deletions(-) diff --git a/Dockerfile b/Dockerfile index 20a48bd..42c0f84 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,15 +1,24 @@ -FROM almalinux:8 +FROM almalinux:8.9 + +ARG CLIENT_ID +ARG CLIENT_SECRET +ARG TOKEN_URL + +ENV CLIENT_ID=${CLIENT_ID} +ENV CLIENT_SECRET=${CLIENT_SECRET} +ENV TOKEN_URL=${TOKEN_URL} RUN dnf -y install epel-release && \ - yum install -y https://repo.opensciencegrid.org/osg/23-main/osg-23-main-el8-release-latest.rpm && \ - yum install -y osg-ca-certs && \ - dnf install -y python38 python38-pip && \ + dnf install -y https://repo.opensciencegrid.org/osg/23-main/osg-23-main-el8-release-latest.rpm && \ + dnf install -y osg-ca-certs && \ + dnf install -y python3.11 python3.11-pip wget tar && \ dnf clean all && yum clean all && \ - ln -s /usr/bin/python3.8 /usr/bin/python && \ - pip3.8 install --no-cache-dir 'elasticsearch>=6.0.0,<7.0.0' 'elasticsearch-dsl>=6.0.0,<7.0.0' htcondor requests prometheus_client + ln -s /usr/bin/python3.11 /usr/bin/python && \ + wget https://github.com/WIPACrepo/rest-tools/archive/refs/tags/v1.8.2.tar.gz && \ + pip3.11 install --no-cache-dir elasticsearch htcondor requests prometheus_client setuptools ./v1.8.2.tar.gz COPY . 
/monitoring WORKDIR /monitoring -ENV CONDOR_CONFIG=/monitoring/condor_config +ENV CONDOR_CONFIG=/monitoring/condor_config \ No newline at end of file diff --git a/condor_history_to_es.py b/condor_history_to_es.py index 09d173e..1cee00b 100755 --- a/condor_history_to_es.py +++ b/condor_history_to_es.py @@ -6,23 +6,29 @@ import os import glob -from optparse import OptionParser +from argparse import ArgumentParser import logging from functools import partial +from rest_tools.client import ClientCredentialsAuth -parser = OptionParser('usage: %prog [options] history_files') -parser.add_option('-a','--address',help='elasticsearch address') -parser.add_option('-n','--indexname',default='condor', +parser = ArgumentParser('usage: %prog [options] history_files') +parser.add_argument('-a','--address',help='elasticsearch address') +parser.add_argument('-n','--indexname',default='condor', help='index name (default condor)') -parser.add_option('--dailyindex', default=False, action='store_true', +parser.add_argument('--dailyindex', default=False, action='store_true', help='Index pattern daily') -parser.add_option("-y", "--dry-run", default=False, +parser.add_argument("-y", "--dry-run", default=False, action="store_true", help="query jobs, but do not ingest into ES",) -parser.add_option('--collectors', default=False, action='store_true', +parser.add_argument('--collectors', default=False, action='store_true', help='Args are collector addresses, not files') -(options, args) = parser.parse_args() -if not args: +parser.add_argument('--client_id',help='oauth2 client id') +parser.add_argument('--client_secret',help='oauth2 client secret') +parser.add_argument('--token_url',help='oauth2 realm token url') +parser.add_argument("positionals", nargs='+') + +options = parser.parse_args() +if not options.positionals: parser.error('no condor history files or collectors') logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(name)s : %(message)s') @@ -36,7 +42,6 @@ def 
es_generator(entries): data['_index'] = options.indexname if options.dailyindex: data['_index'] += '-'+(data['date'].split('T')[0].replace('-','.')) - data['_type'] = 'job_ad' data['_id'] = data['GlobalJobId'].replace('#','-').replace('.','-') if not data['_id']: continue @@ -50,10 +55,17 @@ def es_generator(entries): if '://' in address: prefix,address = address.split('://') +api = ClientCredentialsAuth(address='https://elasticsearch.icecube.aq', + token_url=options.token_url, + client_secret=options.client_secret, + client_id=options.client_id) +token = api.make_access_token() + url = '{}://{}'.format(prefix, address) logging.info('connecting to ES at %s',url) es = Elasticsearch(hosts=[url], - timeout=5000) + request_timeout=5000, + bearer_auth=token) def es_import(document_generator): if options.dry_run: @@ -63,12 +75,12 @@ def es_import(document_generator): json.dump(hit, sys.stdout) success = True else: - success, _ = bulk(es, document_generator, max_retries=20, initial_backoff=2, max_backoff=3600) + success, _ = bulk(es, document_generator, max_retries=20, initial_backoff=2, max_backoff=360) return success failed = False if options.collectors: - for coll_address in args: + for coll_address in options.positionals: try: gen = es_generator(read_from_collector(coll_address, history=True)) success = es_import(gen) @@ -76,8 +88,8 @@ def es_import(document_generator): failed = e logging.error('Condor error', exc_info=True) else: - for path in args: - for filename in glob.iglob(path): + for path in options.positionals: + for filename in glob.glob(path): gen = es_generator(read_from_file(filename)) success = es_import(gen) logging.info('finished processing %s', filename) diff --git a/condor_queue_to_es.py b/condor_queue_to_es.py index e7b35fe..62fd695 100755 --- a/condor_queue_to_es.py +++ b/condor_queue_to_es.py @@ -3,20 +3,32 @@ import os import glob -from optparse import 
OptionParser +from argparse import ArgumentParser import logging from functools import partial +from rest_tools.client import ClientCredentialsAuth -parser = OptionParser('usage: %prog [options] history_files') -parser.add_option('-a','--address',help='elasticsearch address') -parser.add_option('-n','--indexname',default='job_queue', - help='index name (default job_queue)') -parser.add_option('--collectors', default=False, action='store_true', +parser = ArgumentParser('usage: %prog [options] history_files') +parser.add_argument('-a','--address',help='elasticsearch address') +parser.add_argument('-n','--indexname',default='job_queue', + help='index name (default job_queue)') +parser.add_argument('--dailyindex', default=False, action='store_true', + help='Index pattern daily') +parser.add_argument("-y", "--dry-run", default=False, + action="store_true", + help="query jobs, but do not ingest into ES",) +parser.add_argument('--collectors', default=False, action='store_true', help='Args are collector addresses, not files') -(options, args) = parser.parse_args() -if not args: +parser.add_argument('--client_id',help='oauth2 client id') +parser.add_argument('--client_secret',help='oauth2 client secret') +parser.add_argument('--token_url',help='oauth2 realm token url') +parser.add_argument("positionals", nargs='+') + +options = parser.parse_args() +if not options.positionals: parser.error('no condor history files or collectors') + logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(name)s : %(message)s') import htcondor @@ -39,27 +51,36 @@ def es_generator(entries): add_classads(data) data = {k:data[k] for k in keys if k in data} # do filtering data['_index'] = options.indexname - data['_type'] = 'job_ad' data['_id'] = data['GlobalJobId'].replace('#','-').replace('.','-') yield data from elasticsearch import Elasticsearch from elasticsearch.helpers import bulk prefix = 'http' address 
= options.address + + + if '://' in address: prefix,address = address.split('://') url = '{}://{}'.format(prefix, address) + +api = ClientCredentialsAuth(address='https://elasticsearch.icecube.aq', + token_url=options.token_url, + client_secret=options.client_secret, + client_id=options.client_id) +token = api.make_access_token() + logging.info('connecting to ES at %s',url) es = Elasticsearch(hosts=[url], - timeout=5000) + request_timeout=5000, + bearer_auth=token) es_import = partial(bulk, es, max_retries=20, initial_backoff=2, max_backoff=3600) failed = False if options.collectors: - for coll_address in args: + for coll_address in options.positionals: try: gen = es_generator(read_from_collector(coll_address)) success, _ = es_import(gen) @@ -67,7 +90,7 @@ def es_generator(entries): failed = e logging.error('Condor error', exc_info=True) else: - for path in args: + for path in options.positionals: for filename in glob.iglob(path): gen = es_generator(read_from_file(filename)) success, _ = es_import(gen)