Skip to content

Commit

Permalink
update condor to es scripts to es version 8.15 & python 3.11
Browse files Browse the repository at this point in the history
  • Loading branch information
alemsh committed Oct 30, 2024
1 parent 94b4121 commit 50721aa
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 34 deletions.
23 changes: 16 additions & 7 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,15 +1,24 @@
FROM almalinux:8
FROM almalinux:8.9

ARG CLIENT_ID
ARG CLIENT_SECRET
ARG TOKEN_URL

ENV CLIENT_ID=${CLIENT_ID}
ENV CLIENT_SECRET=${CLIENT_SECRET}
ENV TOKEN_URL=${TOKEN_URL}

RUN dnf -y install epel-release && \
yum install -y https://repo.opensciencegrid.org/osg/23-main/osg-23-main-el8-release-latest.rpm && \
yum install -y osg-ca-certs && \
dnf install -y python38 python38-pip && \
dnf install -y https://repo.opensciencegrid.org/osg/23-main/osg-23-main-el8-release-latest.rpm && \
dnf install -y osg-ca-certs && \
dnf install -y python3.11 python3.11-pip wget tar && \
dnf clean all && yum clean all && \
ln -s /usr/bin/python3.8 /usr/bin/python && \
pip3.8 install --no-cache-dir 'elasticsearch>=6.0.0,<7.0.0' 'elasticsearch-dsl>=6.0.0,<7.0.0' htcondor requests prometheus_client
ln -s /usr/bin/python3.11 /usr/bin/python && \
wget https://github.com/WIPACrepo/rest-tools/archive/refs/tags/v1.8.2.tar.gz && \
pip3.11 install --no-cache-dir elasticsearch htcondor requests prometheus_client setuptools ./v1.8.2.tar.gz

COPY . /monitoring

WORKDIR /monitoring

ENV CONDOR_CONFIG=/monitoring/condor_config
ENV CONDOR_CONFIG=/monitoring/condor_config
48 changes: 33 additions & 15 deletions condor_history_to_es.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,29 @@

import os
import glob
from optparse import OptionParser
from argparse import ArgumentParser
import logging
from functools import partial
from rest_tools.client import ClientCredentialsAuth

parser = OptionParser('usage: %prog [options] history_files')
parser.add_option('-a','--address',help='elasticsearch address')
parser.add_option('-n','--indexname',default='condor',
parser = ArgumentParser(usage='%(prog)s [options] history_files')
parser.add_argument('-a','--address',help='elasticsearch address')
parser.add_argument('-n','--indexname',default='condor',
help='index name (default condor)')
parser.add_option('--dailyindex', default=False, action='store_true',
parser.add_argument('--dailyindex', default=False, action='store_true',
help='Index pattern daily')
parser.add_option("-y", "--dry-run", default=False,
parser.add_argument("-y", "--dry-run", default=False,
action="store_true",
help="query jobs, but do not ingest into ES",)
parser.add_option('--collectors', default=False, action='store_true',
parser.add_argument('--collectors', default=False, action='store_true',
help='Args are collector addresses, not files')
(options, args) = parser.parse_args()
if not args:
parser.add_argument('--client_id',help='oauth2 client id')
parser.add_argument('--client_secret',help='oauth2 client secret')
parser.add_argument('--token_url',help='oauth2 realm token url')
parser.add_argument("positionals", nargs='+')

options = parser.parse_args()
if not options.positionals:
parser.error('no condor history files or collectors')

logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(name)s : %(message)s')
Expand All @@ -36,7 +42,6 @@ def es_generator(entries):
data['_index'] = options.indexname
if options.dailyindex:
data['_index'] += '-'+(data['date'].split('T')[0].replace('-','.'))
data['_type'] = 'job_ad'
data['_id'] = data['GlobalJobId'].replace('#','-').replace('.','-')
if not data['_id']:
continue
Expand All @@ -50,10 +55,21 @@ def es_generator(entries):
if '://' in address:
prefix,address = address.split('://')

api = ClientCredentialsAuth(address='https://elasticsearch.icecube.aq',
token_url=options.token_url,
client_secret=options.client_secret,
client_id=options.client_id)
token = api.make_access_token()

print(token)

url = '{}://{}'.format(prefix, address)
logging.info('connecting to ES at %s',url)
es = Elasticsearch(hosts=[url],
timeout=5000)
request_timeout=5000,
bearer_auth=token)

print('made it here')

def es_import(document_generator):
if options.dry_run:
Expand All @@ -63,21 +79,23 @@ def es_import(document_generator):
json.dump(hit, sys.stdout)
success = True
else:
success, _ = bulk(es, document_generator, max_retries=20, initial_backoff=2, max_backoff=3600)
success, _ = bulk(es, document_generator, max_retries=20, initial_backoff=2, max_backoff=360)
return success

failed = False
if options.collectors:
for coll_address in args:
for coll_address in options.positionals:
try:
gen = es_generator(read_from_collector(coll_address, history=True))
success = es_import(gen)
except htcondor.HTCondorIOError as e:
failed = e
logging.error('Condor error', exc_info=True)
else:
for path in args:
for filename in glob.iglob(path):
print(options.positionals)
for path in options.positionals:
print(path)
for filename in glob.glob(path):
gen = es_generator(read_from_file(filename))
success = es_import(gen)
logging.info('finished processing %s', filename)
Expand Down
47 changes: 35 additions & 12 deletions condor_queue_to_es.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,32 @@

import os
import glob
from optparse import OptionParser
from argparse import ArgumentParser
import logging
from functools import partial
from rest_tools.client import ClientCredentialsAuth

parser = OptionParser('usage: %prog [options] history_files')
parser.add_option('-a','--address',help='elasticsearch address')
parser.add_option('-n','--indexname',default='job_queue',
help='index name (default job_queue)')
parser.add_option('--collectors', default=False, action='store_true',
parser = ArgumentParser(usage='%(prog)s [options] history_files')
parser.add_argument('-a','--address',help='elasticsearch address')
parser.add_argument('-n','--indexname',default='condor',
help='index name (default condor)')
parser.add_argument('--dailyindex', default=False, action='store_true',
help='Index pattern daily')
parser.add_argument("-y", "--dry-run", default=False,
action="store_true",
help="query jobs, but do not ingest into ES",)
parser.add_argument('--collectors', default=False, action='store_true',
help='Args are collector addresses, not files')
(options, args) = parser.parse_args()
if not args:
parser.add_argument('--client_id',help='oauth2 client id')
parser.add_argument('--client_secret',help='oauth2 client secret')
parser.add_argument('--token_url',help='oauth2 realm token url')
parser.add_argument("positionals", nargs='+')

options = parser.parse_args()
if not options.positionals:
parser.error('no condor history files or collectors')


logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(name)s : %(message)s')

import htcondor
Expand All @@ -39,35 +51,46 @@ def es_generator(entries):
add_classads(data)
data = {k:data[k] for k in keys if k in data} # do filtering
data['_index'] = options.indexname
data['_type'] = 'job_ad'
data['_id'] = data['GlobalJobId'].replace('#','-').replace('.','-') + data['@timestamp']
yield data

from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
from rest_tools.client import ClientCredentialsAuth

prefix = 'http'
address = options.address



if '://' in address:
prefix,address = address.split('://')

url = '{}://{}'.format(prefix, address)

api = ClientCredentialsAuth(address='https://elasticsearch.icecube.aq',
token_url=options.token_url,
client_secret=options.client_secret,
client_id=options.client_id)
token = api.make_access_token()

logging.info('connecting to ES at %s',url)
es = Elasticsearch(hosts=[url],
timeout=5000)
request_timeout=5000,
bearer_auth=token)
es_import = partial(bulk, es, max_retries=20, initial_backoff=2, max_backoff=3600)

failed = False
if options.collectors:
for coll_address in args:
for coll_address in options.positionals:
try:
gen = es_generator(read_from_collector(coll_address))
success, _ = es_import(gen)
except htcondor.HTCondorIOError as e:
failed = e
logging.error('Condor error', exc_info=True)
else:
for path in args:
for path in options.positionals:
for filename in glob.iglob(path):
gen = es_generator(read_from_file(filename))
success, _ = es_import(gen)
Expand Down

0 comments on commit 50721aa

Please sign in to comment.