Skip to content

Commit

Permalink
Opensearch support
Browse files Browse the repository at this point in the history
  • Loading branch information
sasgit-hub committed Dec 17, 2024
1 parent 4663432 commit ff02613
Show file tree
Hide file tree
Showing 5 changed files with 329 additions and 11 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@ schema/__pycache__
schema/model.py
schema/__init__.py
tests/
http_ca.crt
http_ca.crt
root-ca.pem
67 changes: 63 additions & 4 deletions app/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from datetime import datetime, timezone, timedelta
from elasticsearch import Elasticsearch
import elasticsearch
import opensearchpy
from opensearchpy import OpenSearch
import json
from fastapi import HTTPException
Expand All @@ -27,6 +28,63 @@ def map_dbspec(record):

return record

def map_record_id(record):
record_id = record['host']['client_uuid']
for ip in record['addresses']:
record_id += '-' + str(ip)
return record_id

def post_to_opensearch(record):
# Create the client instance
verify_certs_val = os.environ.get('OS_VERIFY_CERTS', 'false').lower()
if verify_certs_val == '1' or verify_certs_val.startswith('t') or verify_certs_val.startswith('y'):
verify_certs=True
else:
verify_certs=False

os_client = OpenSearch(
os.environ['OS_HOST'],
ca_certs=os.environ['OS_CA_CERT'],
verify_certs=verify_certs,
http_auth=(os.environ['OS_USER'], os.environ['OS_PASS'])
)

try:
if not os_client.indices.exists(index=os.environ['OS_INDEX']):
with open('app/mapping/os_mapping.json') as file:
os_mapping = json.load(file)

with open('app/mapping/os_settings.json') as file:
os_settings = json.load(file)

settings = {
"settings": os_settings,
"mappings": os_mapping
}

os_client.indices.create(index=os.environ['OS_INDEX'], body=settings)

#Generate record id
record_id = map_record_id(record)

#Delete record if exists
try:
resp = os_client.delete(index=os.environ['OS_INDEX'], id=record_id)
if not resp.get('result') == 'deleted':
logger.debug('record with id {} not deleted'.format(record_id))
except opensearchpy.exceptions.NotFoundError:
logger.debug('Record id - {} not found for deletion. Proceeding to add the record'.format(record_id))
pass

#Create the record
resp = os_client.index(index=os.environ['OS_INDEX'], body=record, id=record_id, refresh=True)
logger.info('Record with id {} submitted for creation with the result {}'.format(record_id, resp))

except Exception as e:
raise HTTPException(status_code=500, detail="Error handling opensearch connection - " + str(e))

return resp

def post_to_elastic(record):
# Create the client instance
verify_certs_val = os.environ.get('ELASTIC_VERIFY_CERTS', 'false').lower()
Expand All @@ -47,9 +105,7 @@ def post_to_elastic(record):
verify_certs=verify_certs,
basic_auth=(os.environ['ELASTIC_USER'], os.environ['ELASTIC_PASS'])
)
record_id = record['host']['client_uuid']
for ip in record['addresses']:
record_id += '-' + str(ip)

try:
if not esclient.indices.exists(index=os.environ['ELASTIC_INDEX']):
with open('app/mapping/es_mapping.json') as file:
Expand All @@ -65,9 +121,12 @@ def post_to_elastic(record):

esclient.indices.create(index=os.environ['ELASTIC_INDEX'], ignore=400, body=settings)

#Generate record id
record_id = map_record_id(record)

#Delete record if exists
try:
resp = esclient.delete(index='test_index', id=record_id)
resp = esclient.delete(index=os.environ['ELASTIC_INDEX'], id=record_id)
if not resp.get('result') == 'deleted':
logger.debug('record with id {} not deleted'.format(record_id))
except elasticsearch.NotFoundError:
Expand Down
254 changes: 254 additions & 0 deletions app/mapping/os_mapping.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,254 @@
{
"dynamic_templates": [
{
"strings_as_text": {
"match_mapping_type": "string",
"mapping": {
"type": "keyword"
}
}
}
],
"numeric_detection": true,
"date_detection": false,
"properties": {
"@timestamp": {
"type": "date"
},
"expires": {
"type": "date"
},
"capacity": {
"type": "integer"
},
"name": {
"type": "keyword"
},
"addresses": {
"type": "ip"
},
"ip_versions": {
"type": "integer"
},
"pscheduler_tests": {
"type": "keyword"
},
"mtu": {
"type": "integer"
},
"mac": {
"type": "keyword"
},
"meta": {
"type": "object"
},
"host": {
"type": "object",
"properties": {
"vm": {
"type": "boolean"
},
"processor_core_count": {
"type": "integer"
},
"os_kernel": {
"type": "keyword"
},
"os_name": {
"type": "keyword"
},
"processor_speed": {
"type": "long"
},
"os_version": {
"type": "keyword"
},
"productname": {
"type": "keyword"
},
"client_uuid": {
"type": "keyword"
},
"perfsonar_bundle": {
"type": "keyword"
},
"net_core_rmem_max": {
"type": "text"
},
"net_core_rmem_default": {
"type": "text"
},
"net_core_wmem_max": {
"type": "text"
},
"net_core_wmem_default": {
"type": "text"
},
"manufacturer": {
"type": "keyword"
},
"processor_id": {
"type": "keyword"
},
"perfsonar_version": {
"type": "keyword"
},
"net_ipv4_tcp_congestion_control": {
"type": "keyword"
},
"net_ipv4_tcp_available_congestion_control":{
"type": "keyword"
},
"net_ipv4_tcp_allowed_congestion_control":{
"type": "keyword"
},
"os_architecture": {
"type": "keyword"
},
"name": {
"type": "keyword"
},
"net_ipv4_tcp_rmem_default": {
"type": "text"
},
"net_ipv4_tcp_rmem_max":{
"type": "text"
},
"net_ipv4_tcp_rmem_min":{
"type": "text"
},
"net_ipv4_tcp_wmem_default": {
"type": "text"
},
"net_ipv4_tcp_wmem_max": {
"type": "text"
},
"net_ipv4_tcp_wmem_min": {
"type": "text"
},
"net_ipv4_tcp_no_metrics_save":{
"type": "boolean"
},
"net_ipv4_tcp_mtu_probing":{
"type": "integer"
},
"net_core_default_qdisc":{
"type": "text"
},
"net_ipv4_conf_all_arp_ignore":{
"type": "integer"
},
"net_ipv4_conf_all_arp_announce":{
"type": "integer"
},
"net_ipv4_conf_default_arp_filter":{
"type": "boolean"
},
"net_ipv4_conf_all_arp_filter":{
"type": "boolean"
},
"net_core_netdev_max_backlog":{
"type": "long"
},
"group_domains": {
"type": "keyword"
},
"memory_bytes": {
"type": "text"
},
"location": {
"type": "object",
"properties": {
"state_province_region": {
"type": "keyword"
},
"city": {
"type": "keyword"
},
"postal_code": {
"type": "keyword"
},
"country": {
"type": "keyword"
},
"coordinates": {
"type": "geo_point"
},
"sitename": {
"type": "keyword"
}
}
},
"group_communities": {
"type": "keyword"
},
"role": {
"type": "keyword"
},
"access_policy": {
"type": "keyword"
},
"access_notes": {
"type": "keyword"
},
"pscheduler_service": {
"type": "object",
"properties": {
"urls": {
"type": "keyword"
},
"version": {
"type": "keyword"
},
"tools": {
"type": "keyword"
},
"tests": {
"type": "keyword"
},
"meta": {
"type": "object"
}
}
},
"archive_service": {
"type": "object",
"properties": {
"urls": {
"type": "keyword"
},
"write_urls": {
"type": "keyword"
},
"version": {
"type": "keyword"
},
"archiver_type": {
"type": "keyword"
},
"write_type": {
"type": "keyword"
},
"meta": {
"type": "object"
}
}
},
"administrators": {
"type": "object",
"properties": {
"emails": {
"type": "keyword"
},
"meta": {
"type": "object"
}
}
},
"meta": {
"type": "object"
}
}
}
}
}
4 changes: 4 additions & 0 deletions app/mapping/os_settings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{
"number_of_shards": 1,
"number_of_replicas": 0
}
12 changes: 6 additions & 6 deletions app/routers/records.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
from fastapi import APIRouter, Request, Response, HTTPException
from ..validate_record import RecordValidation
from ..database import map_dbspec, post_to_elastic
try:
from schema.model import PerfsonarLookupServiceSchema
except Exception:
PerfsonarLookupServiceSchema = dict
from ..database import map_dbspec, post_to_elastic, post_to_opensearch
import os

router = APIRouter()
validation = RecordValidation()
Expand All @@ -20,6 +17,9 @@ def register_record(request: Request, response: Response, registration_record: d

registration_record = registration_record['record']
registration_record = map_dbspec(registration_record)
response = post_to_elastic(registration_record)
if str(os.environ.get('DATABASE')).startswith('elastic'):
response = post_to_elastic(registration_record)
elif str(os.environ.get('DATABASE')).startswith('opensearch'):
response = post_to_opensearch(registration_record)

return response

0 comments on commit ff02613

Please sign in to comment.