Skip to content

Commit

Permalink
Query only recent data
Browse files Browse the repository at this point in the history
  • Loading branch information
petya-vasileva committed Jan 14, 2025
1 parent 67f0088 commit af3c356
Showing 1 changed file with 32 additions and 45 deletions.
77 changes: 32 additions & 45 deletions data_objects/MetaData.py
Original file line number Diff line number Diff line change
@@ -1,71 +1,58 @@
from elasticsearch.helpers import scan
from datetime import datetime, timedelta
from datetime import datetime, timedelta, timezone
import pandas as pd
import traceback
import urllib3
import re
from geopy.geocoders import Nominatim

import utils.queries as qrs
import utils.helpers as hp


class MetaData(object):

def __init__(self, dateFrom=None, dateTo=None):

fmt = "%Y-%m-%d %H:%M"
now = hp.roundTime(datetime.utcnow())
dateTo = datetime.strftime(now, fmt)
# Around 1 Sept, the netsites were introduced to ES
beforeChange = self.getEndpoints( ['2023-01-01 00:15', '2023-09-01 00:15'], False)
afterChange = self.getEndpoints(['2023-09-01 00:15', dateTo], True)
fmt = "%Y-%m-%d %H:%M"
now = hp.roundTime(datetime.now(timezone.utc))
dateTo = datetime.strftime(now, fmt)
dateFrom = datetime.strftime(now - timedelta(days=90), fmt)

endpointsDf = pd.merge(afterChange[['ip', '_host', '_site' ,'ipv6', 'netsite']],
beforeChange[['ip', '_host', '_site' ,'ipv6']], on=['ip', 'ipv6'], how="outer", suffixes=('_after', '_before'))
endpointsDf = self.getEndpoints([dateFrom, dateTo], True)

endpointsDf.loc[:, 'ip'] = endpointsDf['ip'].str.upper()
endpointsDf = endpointsDf[(~endpointsDf['ip'].isnull()) & ~endpointsDf['ip'].isin(['%{[DESTINATION][IPV4]}', '%{[DESTINATION][IPV6]}'])]
endpointsDf.loc[:, 'ip'] = endpointsDf['ip'].str.upper()
endpointsDf = endpointsDf[(~endpointsDf['ip'].isnull()) & ~endpointsDf['ip'].isin(['%{[DESTINATION][IPV4]}', '%{[DESTINATION][IPV6]}'])]

endpointsDf['site'] = endpointsDf.apply(lambda row: self.combine_sites(row), axis=1)
endpointsDf['netsite'] = endpointsDf['netsite'].fillna(endpointsDf['site'])
endpointsDf['host'] = endpointsDf['_host_after'].fillna(endpointsDf['_host_before'])
endpointsDf['site'] = endpointsDf['netsite']
endpointsDf['netsite'] = endpointsDf['netsite'].fillna(endpointsDf['_site'])
endpointsDf['host'] = endpointsDf['_host']

endpointsDf = self.fixUnknownSites(endpointsDf)
endpointsDf = endpointsDf[['ip', 'site', 'ipv6', 'host', 'netsite']].drop_duplicates()
endpointsDf = self.fixUnknownSites(endpointsDf)
endpointsDf = endpointsDf[['ip', 'site', 'ipv6', 'host', 'netsite']].drop_duplicates()

metaDf = self.getMeta(endpointsDf)
metaDf = self.fixUnknownWithNetsite(metaDf)

metaDf = self.getMeta(endpointsDf)
metaDf = self.fixUnknownWithNetsite(metaDf)
# metaDf.loc[:, 'site_original'] = metaDf['site']
# metaDf.loc[:, 'netsite_original'] = metaDf['netsite']
metaDf.loc[:, 'site_meta'] = metaDf['site_meta']

# metaDf.loc[:, 'ip_original'] = metaDf['ip']
metaDf.loc[:, 'site_original'] = metaDf['site']
metaDf.loc[:, 'netsite_original'] = metaDf['netsite']
metaDf.loc[:, 'site_meta'] = metaDf['site_meta']
metaDf.loc[:, 'ip'] = metaDf['ip'].str.upper()
metaDf.loc[:, 'site'] = metaDf['site'].str.upper()
metaDf.loc[:, 'netsite'] = metaDf['netsite'].str.upper()
metaDf.loc[:, 'site_meta'] = metaDf['site_meta'].str.upper()

metaDf.loc[:, 'ip'] = metaDf['ip'].str.upper()
metaDf.loc[:, 'site'] = metaDf['site'].str.upper()
metaDf.loc[:, 'netsite'] = metaDf['netsite'].str.upper()
metaDf.loc[:, 'site_meta'] = metaDf['site_meta'].str.upper()
metaDf = metaDf.sort_values(['lat', 'lon'])
metaDf = metaDf.fillna(metaDf[['host', 'lat', 'lon']].groupby('host').ffill())
metaDf = metaDf.fillna(metaDf[['ip', 'lat', 'lon']].groupby('ip').ffill())
metaDf = metaDf.fillna(metaDf[['site', 'lat', 'lon']].groupby('site').ffill())
metaDf = metaDf.fillna(metaDf[['site_meta', 'lat', 'lon']].groupby('site_meta').ffill())

# fill in empty lat and lon based on host, ip, site
metaDf = metaDf.sort_values(['lat', 'lon'])
metaDf = metaDf.fillna(metaDf[['host', 'lat', 'lon']].groupby('host').ffill())
metaDf = metaDf.fillna(metaDf[['ip', 'lat', 'lon']].groupby('ip').ffill())
metaDf = metaDf.fillna(metaDf[['site', 'lat', 'lon']].groupby('site').ffill())
metaDf = metaDf.fillna(metaDf[['site_meta', 'lat', 'lon']].groupby('site_meta').ffill())
metaDf = self.locateCountry(metaDf)

# mdf_sorted = metaDf.sort_values(by=['ip', 'site', 'netsite'])
# # when an ip has a few records, prefer one where site != netsite
# metaDf = mdf_sorted.drop_duplicates(subset='ip', keep='last')
metaDf = metaDf.drop_duplicates()

metaDf = self.locateCountry(metaDf)
print(f"Endpoints without a location: {len(metaDf[metaDf['lat'].isnull()])}")

metaDf = metaDf.drop_duplicates()

print(f"Endpoints without a location: {len(metaDf[metaDf['lat'].isnull()])}")

self.metaDf = metaDf
self.metaDf = metaDf


@staticmethod
Expand Down Expand Up @@ -390,7 +377,7 @@ def getMeta(self, endpointsDf):
not_in_meta, uk = [],[]
for ip, site, ipv6, host, netsite in endpointsDf.values.tolist():
rec, not_in = self.mostRecentMetaRecord(ip, site, host, ipv6, netsite)
if site == 'UNKNOWN':
if site == 'UNKNOWN':
uk.append(ip)
# print(ip, site, ipv6, host)
if rec:
Expand Down

0 comments on commit af3c356

Please sign in to comment.