Skip to content

Commit

Permalink
Initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
sasgit-hub committed Oct 17, 2024
1 parent ba17b02 commit 590f407
Show file tree
Hide file tree
Showing 11 changed files with 657 additions and 26 deletions.
10 changes: 10 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
__pycache__
app/__pycache__
app/routers/__pycache__
schema/__pycache__
schema/model.py
schema/__init__.py
tests/
Dockerfile
http_ca.crt
Dockerfile
Empty file added app/__init__.py
Empty file.
55 changes: 55 additions & 0 deletions app/database.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
from ipaddress import ip_address, IPv6Address, IPv4Address
from datetime import datetime, timezone, timedelta
from elasticsearch import Elasticsearch
from opensearchpy import OpenSearch
import json
from fastapi import HTTPException
import os

def map_dbspec(record):

ip_version = set()

for ip in record['addresses']:
if type(ip_address(ip)) is IPv6Address:
ip_version.add(6)
elif type(ip_address(ip)) is IPv4Address:
ip_version.add(4)

record['ip_versions'] = list(ip_version)
record['@timestamp'] = datetime.now(timezone.utc)
record['expires'] = record['@timestamp'] + timedelta(hours=24)

return record

def post_to_elastic(record):
# Create the client instance
esclient = Elasticsearch(
os.environ['ELASTIC_HOST'],
ca_certs=os.environ.get('ELASTIC_CA_CERT'),
basic_auth=(os.environ['ELASTIC_USER'], os.environ['ELASTIC_PASS'])
)
record_id = record['host']['client_uuid']
for ip in record['addresses']:
record_id += '-' + str(ip)
try:
if not esclient.indices.exists(index=os.environ['ELASTIC_INDEX']):
with open('app/mapping/es_mapping.json') as file:
es_mapping = json.load(file)

with open('app/mapping/es_settings.json') as file:
es_settings = json.load(file)

settings = {
"settings": es_settings,
"mappings": es_mapping
}

esclient.indices.create(index=os.environ['ELASTIC_INDEX'], ignore=400, body=settings)
resp = esclient.index(index=os.environ['ELASTIC_INDEX'], document=record, id=record_id)

except Exception as e:
raise HTTPException(status_code=500, detail="Error handling elasticsearch connection - " + str(e))


return resp
28 changes: 28 additions & 0 deletions app/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
from fastapi import FastAPI

from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_ipaddr
from slowapi.errors import RateLimitExceeded
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import ConsoleSpanExporter
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.elasticsearch import ElasticsearchInstrumentor
from .routers import records

app = FastAPI()

limiter = Limiter(key_func=get_ipaddr)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

# Acquire a tracer
trace.set_tracer_provider(TracerProvider())
tracer = trace.get_tracer(__name__)
trace.get_tracer_provider().add_span_processor(
BatchSpanProcessor(ConsoleSpanExporter()))
FastAPIInstrumentor().instrument_app(app)
ElasticsearchInstrumentor().instrument()

app.include_router(records.router)
111 changes: 85 additions & 26 deletions mapping/es_mapping.json → app/mapping/es_mapping.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,20 @@
{
"dynamic_templates": [
{
"strings_as_text": {
"match_mapping_type": "string",
"mapping": {
"type": "keyword"
}
}
}
],
"numeric_detection": true,
"date_detection": false,
"properties": {
"@timestamp": {
"type": "date"
},
"expires": {
"type": "date"
},
Expand All @@ -11,11 +24,10 @@
"name": {
"type": "keyword"
},
"addresses":
{
"addresses": {
"type": "ip"
},
"ip_versions":{
"ip_versions": {
"type": "integer"
},
"pscheduler_tests": {
Expand All @@ -32,7 +44,7 @@
},
"host": {
"type": "object",
"properties":{
"properties": {
"vm": {
"type": "boolean"
},
Expand All @@ -46,7 +58,7 @@
"type": "keyword"
},
"processor_speed": {
"type": "integer"
"type": "long"
},
"os_version": {
"type": "version"
Expand All @@ -60,25 +72,34 @@
"perfsonar_bundle": {
"type": "keyword"
},
"net_tcp_autotunemaxbuffer_send": {
"type": "integer"
"net_core_rmem_max": {
"type": "text"
},
"net_tcp_maxbuffer_recv": {
"type": "integer"
"net_core_rmem_default": {
"type": "text"
},
"net_core_wmem_max": {
"type": "text"
},
"net_core_wmem_default": {
"type": "text"
},
"manufacturer": {
"type": "keyword"
},
"processor_id": {
"type": "keyword"
},
"processor_count": {
"type": "integer"
},
"perfsonar_version": {
"type": "version"
},
"net_tcp_congestionalgorithm": {
"net_ipv4_tcp_congestion_control": {
"type": "keyword"
},
"net_ipv4_tcp_available_congestion_control":{
"type": "keyword"
},
"net_ipv4_tcp_allowed_congestion_control":{
"type": "keyword"
},
"os_architecture": {
Expand All @@ -87,28 +108,64 @@
"name": {
"type": "keyword"
},
"net_tcp_autotunemaxbuffer_recv": {
"net_ipv4_tcp_rmem_default": {
"type": "text"
},
"net_ipv4_tcp_rmem_max":{
"type": "text"
},
"net_ipv4_tcp_rmem_min":{
"type": "text"
},
"net_ipv4_tcp_wmem_default": {
"type": "text"
},
"net_ipv4_tcp_wmem_max": {
"type": "text"
},
"net_ipv4_tcp_wmem_min": {
"type": "text"
},
"net_ipv4_tcp_no_metrics_save":{
"type": "boolean"
},
"net_ipv4_tcp_mtu_probing":{
"type": "integer"
},
"group_domains": {
"type": "keyword"
"net_core_default_qdisc":{
"type": "text"
},
"memory_bytes": {
"net_ipv4_conf_all_arp_ignore":{
"type": "integer"
},
"net_tcp_maxbuffer_send": {
"net_ipv4_conf_all_arp_announce":{
"type": "integer"
},
"location":{
"net_ipv4_conf_default_arp_filter":{
"type": "boolean"
},
"net_ipv4_conf_all_arp_filter":{
"type": "boolean"
},
"net_core_netdev_max_backlog":{
"type": "long"
},
"group_domains": {
"type": "keyword"
},
"memory_bytes": {
"type": "text"
},
"location": {
"type": "object",
"properties": {
"state": {
"state_province_region": {
"type": "keyword"
},
"city": {
"type": "keyword"
},
"zip": {
"postal_code": {
"type": "keyword"
},
"country": {
Expand Down Expand Up @@ -160,6 +217,9 @@
"urls": {
"type": "keyword"
},
"write_urls": {
"type": "keyword"
},
"version": {
"type": "version"
},
Expand All @@ -176,11 +236,11 @@
},
"administrators": {
"type": "object",
"properties":{
"emails":{
"properties": {
"emails": {
"type": "keyword"
},
"meta":{
"meta": {
"type": "object"
}
}
Expand All @@ -191,5 +251,4 @@
}
}
}
}

}
4 changes: 4 additions & 0 deletions app/mapping/es_settings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{
"number_of_shards": 1,
"number_of_replicas": 0
}
Empty file added app/routers/__init__.py
Empty file.
25 changes: 25 additions & 0 deletions app/routers/records.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
from fastapi import APIRouter, Request, Response, HTTPException
from ..validate_record import RecordValidation
from ..database import map_dbspec, post_to_elastic
try:
from schema.model import PerfsonarLookupServiceSchema
except Exception:
PerfsonarLookupServiceSchema = dict

router = APIRouter()
validation = RecordValidation()

@router.post("/record/")
def register_record(request: Request, response: Response, registration_record: dict):

registration_record = validation.validate_record(registration_record)

if not registration_record['validated']:
raise HTTPException(status_code=422,
detail="Registration record Validation failed. {}".format(registration_record['error'].message))

registration_record = map_dbspec(registration_record['record'])
registration_record = map_dbspec(registration_record)
response = post_to_elastic(registration_record)

return response
21 changes: 21 additions & 0 deletions app/validate_record.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from jsonschema import validate
import json

class RecordValidation(object):

def __init__(self):
with open('schema/schema.json') as file:
self.schema = json.load(file)

def validate_record(self, record={}):
#validate
try:
validate(instance=record, schema=self.schema)
validated = True
error = None

except Exception as e:
validated = False
error = e

return {'validated': validated, 'error': error, 'record': record}
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
opensearch-py
Loading

0 comments on commit 590f407

Please sign in to comment.