Skip to content

Commit

Permalink
Merge branch 'metrics' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
jpswinski committed Nov 1, 2023
2 parents 710643b + 1b718e3 commit 29de90f
Show file tree
Hide file tree
Showing 43 changed files with 3,036 additions and 429 deletions.
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ If you are a science data user interested in using SlideRule to process Earth sc

3. For dependencies associated with a specific package, see the package readme at `packages/{package}/README.md` for additional installation instructions.

4. For debug builds, there are additional tools that are needed for static analysis:

```bash
$ sudo apt install clang clang-tidy cppcheck
```

## II. Building with CMake

Expand Down
85 changes: 50 additions & 35 deletions clients/python/sliderule/sliderule.py
Original file line number Diff line number Diff line change
Expand Up @@ -377,29 +377,6 @@ def __parse_native(data, callbacks):

return recs

#
# __build_auth_header
#
def __build_auth_header():
"""
Build authentication header for use with provisioning system
"""
global service_url, ps_access_token, ps_refresh_token, ps_token_exp
headers = None
if ps_access_token:
# Check if Refresh Needed
if time.time() > ps_token_exp:
host = "https://ps." + service_url + "/api/org_token/refresh/"
rqst = {"refresh": ps_refresh_token}
hdrs = {'Content-Type': 'application/json', 'Authorization': 'Bearer ' + ps_access_token}
rsps = session.post(host, data=json.dumps(rqst), headers=hdrs, timeout=request_timeout).json()
ps_refresh_token = rsps["refresh"]
ps_access_token = rsps["access"]
ps_token_exp = time.time() + (float(rsps["access_lifetime"]) / 2)
# Build Authentication Header
headers = {'Authorization': 'Bearer ' + ps_access_token}
return headers


###############################################################################
# Overriding DNS
Expand Down Expand Up @@ -430,7 +407,7 @@ def __dnsoverridden():
def __jamdns():
global service_url, service_org, request_timeout
url = service_org + "." + service_url
headers = __build_auth_header()
headers = buildauthheader()
host = "https://ps." + service_url + "/api/org_ip_adr/" + service_org + "/"
rsps = session.get(host, headers=headers, timeout=request_timeout).json()
if rsps["status"] == "SUCCESS":
Expand Down Expand Up @@ -505,6 +482,29 @@ def __arrowrec(rec):
# INTERNAL APIs
###############################################################################

#
# buildauthheader
#
def buildauthheader(force_refresh=False):
"""
Build authentication header for use with provisioning system
"""
global service_url, ps_access_token, ps_refresh_token, ps_token_exp
headers = None
if ps_access_token:
# Check if Refresh Needed
if time.time() > ps_token_exp or force_refresh:
host = "https://ps." + service_url + "/api/org_token/refresh/"
rqst = {"refresh": ps_refresh_token}
hdrs = {'Content-Type': 'application/json', 'Authorization': 'Bearer ' + ps_access_token}
rsps = session.post(host, data=json.dumps(rqst), headers=hdrs, timeout=request_timeout).json()
ps_refresh_token = rsps["refresh"]
ps_access_token = rsps["access"]
ps_token_exp = time.time() + (float(rsps["access_lifetime"]) / 2)
# Build Authentication Header
headers = {'Authorization': 'Bearer ' + ps_access_token}
return headers

#
# GeoDataFrame to Polygon
#
Expand Down Expand Up @@ -734,7 +734,7 @@ def source (api, parm={}, stream=False, callbacks={}, path="/source", silence=Fa
# Construct Request URL and Authorization
if service_org:
url = 'https://%s.%s%s/%s' % (service_org, service_url, path, api)
headers = __build_auth_header()
headers = buildauthheader()
else:
url = 'http://%s%s/%s' % (service_url, path, api)
# Perform Request
Expand Down Expand Up @@ -903,7 +903,7 @@ def update_available_servers (desired_nodes=None, time_to_live=None):
if type(desired_nodes) == int:
rsps_body = {}
requested_nodes = desired_nodes
headers = __build_auth_header()
headers = buildauthheader()

# Get boundaries of cluster and calculate nodes to request
try:
Expand Down Expand Up @@ -995,12 +995,12 @@ def scaleout(desired_nodes, time_to_live, bypass_dns):
#
# authenticate
#
def authenticate (ps_organization, ps_username=None, ps_password=None):
def authenticate (ps_organization, ps_username=None, ps_password=None, github_token=None):
'''
Authenticate to SlideRule Provisioning System
The username and password can be provided the following way in order of priority:
(1) The passed in arguments `ps_username' and 'ps_password';
(2) The O.S. environment variables 'PS_USERNAME' and 'PS_PASSWORD';
(1) The passed in arguments `github_token` or `ps_username` and `ps_password`;
(2) The O.S. environment variables `PS_GITHUB_TOKEN` or `PS_USERNAME` and `PS_PASSWORD`;
(3) The `ps.<url>` entry in the .netrc file in your home directory
Parameters
Expand All @@ -1013,6 +1013,10 @@ def authenticate (ps_organization, ps_username=None, ps_password=None):
ps_password: str
SlideRule provisioning system account password
github_token: str
GitHub access token (minimum scope/permissions require)
Returns
-------
status
Expand All @@ -1036,12 +1040,13 @@ def authenticate (ps_organization, ps_username=None, ps_password=None):
return True

# attempt retrieving from environment
if not ps_username or not ps_password:
if not github_token and not ps_username and not ps_password:
github_token = os.environ.get("PS_GITHUB_TOKEN")
ps_username = os.environ.get("PS_USERNAME")
ps_password = os.environ.get("PS_PASSWORD")

# attempt retrieving from netrc file
if not ps_username or not ps_password:
if not github_token and not ps_username and not ps_password:
try:
netrc_file = netrc.netrc()
login_credentials = netrc_file.hosts[ps_url]
Expand All @@ -1051,12 +1056,22 @@ def authenticate (ps_organization, ps_username=None, ps_password=None):
if ps_organization != PUBLIC_ORG:
logger.warning("Unable to retrieve username and password from netrc file for machine: {}".format(e))

# authenticate to provisioning system
if ps_username and ps_password:
# build authentication request
user = None
if github_token:
user = "github"
rqst = {"org_name": ps_organization, "access_token": github_token}
headers = {'Content-Type': 'application/json'}
api = "https://" + ps_url + "/api/org_token_github/"
elif ps_username or ps_password:
user = "local"
rqst = {"username": ps_username, "password": ps_password, "org_name": ps_organization}
headers = {'Content-Type': 'application/json'}
api = "https://" + ps_url + "/api/org_token/"

# authenticate to provisioning system
if user:
try:
api = "https://" + ps_url + "/api/org_token/"
rsps = session.post(api, data=json.dumps(rqst), headers=headers, timeout=request_timeout)
rsps.raise_for_status()
rsps = rsps.json()
Expand All @@ -1066,7 +1081,7 @@ def authenticate (ps_organization, ps_username=None, ps_password=None):
login_status = True
except:
if ps_organization != PUBLIC_ORG:
logger.error("Unable to authenticate user %s to %s" % (ps_username, api))
logger.error("Unable to authenticate %s user to %s" % (user, api))

# return login status
return login_status
Expand Down
7 changes: 7 additions & 0 deletions clients/python/tests/test_provisioning.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,13 @@ def test_authenticate(self, domain, organization):
status = sliderule.authenticate(organization)
assert status

def test_refresh(self, domain, organization):
sliderule.set_url(domain)
status = sliderule.authenticate(organization)
headers = sliderule.buildauthheader(force_refresh=True)
assert status
assert len(headers['Authorization']) > 8

def test_num_nodes_update(self, domain, organization):
sliderule.set_url(domain)
status = sliderule.authenticate(organization)
Expand Down
17 changes: 8 additions & 9 deletions packages/aws/CredentialStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ class CredentialStore
*--------------------------------------------------------------------*/

static const int STARTING_STORE_SIZE = 8;
static const int MAX_KEY_SIZE = 2048;

static const char* LIBRARY_NAME;
static const char* EXPIRATION_GPS_METRIC;
Expand Down Expand Up @@ -132,7 +131,7 @@ class CredentialStore
}
}
const char* access_key_id_str = LuaObject::getLuaString(L, -1);
accessKeyId = StringLib::duplicate(access_key_id_str, MAX_KEY_SIZE);
accessKeyId = StringLib::duplicate(access_key_id_str);
lua_pop(L, 1);

/* Get Secret Access Key */
Expand All @@ -146,7 +145,7 @@ class CredentialStore
}
}
const char* secret_access_key_str = LuaObject::getLuaString(L, -1);
secretAccessKey = StringLib::duplicate(secret_access_key_str, MAX_KEY_SIZE);
secretAccessKey = StringLib::duplicate(secret_access_key_str);
lua_pop(L, 1);

/* Get Session Token */
Expand All @@ -160,7 +159,7 @@ class CredentialStore
}
}
const char* session_token_str = LuaObject::getLuaString(L, -1);
sessionToken = StringLib::duplicate(session_token_str, MAX_KEY_SIZE);
sessionToken = StringLib::duplicate(session_token_str);
lua_pop(L, 1);

/* Get Expiration Date */
Expand All @@ -170,7 +169,7 @@ class CredentialStore
lua_getfield(L, index, EXPIRATION_STR1);
}
const char* expiration_str = LuaObject::getLuaString(L, -1, true, NULL);
expiration = StringLib::duplicate(expiration_str, MAX_KEY_SIZE);
expiration = StringLib::duplicate(expiration_str);
if(expiration) expirationGps = TimeLib::str2gpstime(expiration);
else expirationGps = 0;
lua_pop(L, 1);
Expand Down Expand Up @@ -226,10 +225,10 @@ class CredentialStore

void copy (const Credential& c) {
provided = c.provided;
accessKeyId = StringLib::duplicate(c.accessKeyId, MAX_KEY_SIZE);
secretAccessKey = StringLib::duplicate(c.secretAccessKey, MAX_KEY_SIZE);
sessionToken = StringLib::duplicate(c.sessionToken, MAX_KEY_SIZE);
expiration = StringLib::duplicate(c.expiration, MAX_KEY_SIZE);
accessKeyId = StringLib::duplicate(c.accessKeyId);
secretAccessKey = StringLib::duplicate(c.secretAccessKey);
sessionToken = StringLib::duplicate(c.sessionToken);
expiration = StringLib::duplicate(c.expiration);
expirationGps = c.expirationGps;
}
};
Expand Down
4 changes: 2 additions & 2 deletions packages/core/CsvDispatch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ CsvDispatch::CsvDispatch (lua_State* L, const char* outq_name, const char** _col
}

/* Send Out Header Row */
int len = StringLib::size(hdrrow, MAX_STR_SIZE) + 1;
int len = StringLib::size(hdrrow) + 1;
outQ->postCopy(hdrrow, len, SYS_TIMEOUT);
}

Expand Down Expand Up @@ -172,7 +172,7 @@ bool CsvDispatch::processRecord (RecordObject* record, okey_t key, recVec_t* rec
}

/* Send Out Row */
int len = StringLib::size(valrow, MAX_STR_SIZE) + 1;
int len = StringLib::size(valrow) + 1;
int status = outQ->postCopy(valrow, len, SYS_TIMEOUT);

/* Check and Return Status */
Expand Down
2 changes: 1 addition & 1 deletion packages/core/EndpointObject.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -178,5 +178,5 @@ int EndpointObject::buildheader (char hdr_str[MAX_HDR_SIZE], code_t code, const

StringLib::concat(hdr_str, "\r\n", MAX_HDR_SIZE);

return StringLib::size(hdr_str, MAX_HDR_SIZE);
return StringLib::size(hdr_str);
}
4 changes: 2 additions & 2 deletions packages/core/EventLib.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -342,9 +342,9 @@ void EventLib::generateMetric (event_level_t lvl, const char* name, metric_subty
/* Initialize Log Message */
event.systime = TimeLib::gpstime();
event.tid = Thread::getId();
event.id = subtype;
event.id = ORIGIN;
event.parent = ORIGIN;
event.flags = 0;
event.flags = subtype;
event.type = METRIC;
event.level = lvl;

Expand Down
3 changes: 3 additions & 0 deletions packages/core/EventLib.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@
#define stop_trace(lvl,id,...) {(void)lvl; (void)id;}
#endif

#define count_metric(lvl,name,value) EventLib::generateMetric(lvl,name,EventLib::COUNTER,value)
#define gauge_metric(lvl,name,value) EventLib::generateMetric(lvl,name,EventLib::GAUGE,value)

/******************************************************************************
* EVENT LIBRARY CLASS
******************************************************************************/
Expand Down
8 changes: 4 additions & 4 deletions packages/core/HttpClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,15 +113,15 @@ HttpClient::HttpClient(lua_State* L, const char* url):
// Parse URL
char url_buf[MAX_URL_LEN];
StringLib::copy(url_buf, url, MAX_URL_LEN);
char* proto_term = StringLib::find(url_buf, "://", MAX_URL_LEN);
char* proto_term = StringLib::find(url_buf, "://");
if(proto_term)
{
char* proto = url_buf;
char* _ip_addr = proto_term + 3;
*proto_term = '\0';
if((_ip_addr - proto) < MAX_URL_LEN)
{
char* ip_addr_term = StringLib::find(_ip_addr, ":", MAX_URL_LEN);
char* ip_addr_term = StringLib::find(_ip_addr, ":");
if(ip_addr_term)
{
char* _port_str = ip_addr_term + 1;
Expand Down Expand Up @@ -232,8 +232,8 @@ bool HttpClient::makeRequest (EndpointObject::verb_t verb, const char* resource,
int content_length = 0;
if(data)
{
content_length = StringLib::size(data, MAX_RQST_BUF_LEN);
if(content_length == MAX_RQST_BUF_LEN)
content_length = StringLib::size(data);
if(content_length >= MAX_RQST_BUF_LEN)
{
throw RunTimeException(ERROR, RTE_ERROR, "data exceeds maximum allowed size: %d > %d", content_length, MAX_RQST_BUF_LEN);
}
Expand Down
2 changes: 1 addition & 1 deletion packages/core/HttpClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
#include "EndpointObject.h"

/******************************************************************************
* HTTP SERVER CLASS
* HTTP CLIENT CLASS
******************************************************************************/

class HttpClient: public LuaObject
Expand Down
2 changes: 1 addition & 1 deletion packages/core/HttpServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,7 @@ int HttpServer::onWrite(int fd)
/* Write Chunk Header - HTTP */
unsigned long chunk_size = state->ref.size > 0 ? state->ref.size : 0;
StringLib::format((char*)state->stream_buf, STREAM_OVERHEAD_SIZE, "%lX\r\n", chunk_size);
state->stream_buf_size = StringLib::size((const char*)state->stream_buf, state->stream_mem_size);
state->stream_buf_size = StringLib::size((const char*)state->stream_buf);

if(state->ref.size > 0)
{
Expand Down
9 changes: 7 additions & 2 deletions packages/core/LuaEndpoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ void* LuaEndpoint::requestThread (void* parm)
EndpointObject::info_t* info = (EndpointObject::info_t*)parm;
EndpointObject::Request* request = info->request;
LuaEndpoint* lua_endpoint = dynamic_cast<LuaEndpoint*>(info->endpoint);
double start = TimeLib::latchtime();

/* Get Request Script */
const char* script_pathname = LuaEngine::sanitize(request->resource);
Expand Down Expand Up @@ -230,6 +231,10 @@ void* LuaEndpoint::requestThread (void* parm)
/* End Response */
rspq->postCopy("", 0);

/* Generate Metric for Endpoint */
double duration = TimeLib::latchtime() - start;
gauge_metric(INFO, request->resource, duration);

/* Clean Up */
delete rspq;
delete [] script_pathname;
Expand All @@ -238,7 +243,7 @@ void* LuaEndpoint::requestThread (void* parm)

/* Stop Trace */
stop_trace(INFO, trace_id);

/* Return */
return NULL;
}
Expand Down Expand Up @@ -284,7 +289,7 @@ void LuaEndpoint::normalResponse (const char* scriptpath, Request* request, Publ
const char* result = engine->getResult();
if(result)
{
int result_length = StringLib::size(result, MAX_SOURCED_RESPONSE_SIZE);
int result_length = StringLib::size(result);
int header_length = buildheader(header, OK, "text/plain", result_length, NULL, serverHead.c_str());
rspq->postCopy(header, header_length);
rspq->postCopy(result, result_length);
Expand Down
1 change: 0 additions & 1 deletion packages/core/LuaEndpoint.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ class LuaEndpoint: public EndpointObject
static const double DEFAULT_NORMAL_REQUEST_MEMORY_THRESHOLD;
static const double DEFAULT_STREAM_REQUEST_MEMORY_THRESHOLD;

static const int MAX_SOURCED_RESPONSE_SIZE = 0x8000000; // 128M
static const int MAX_RESPONSE_TIME_MS = 5000;
static const int MAX_EXCEPTION_TEXT_SIZE = 256;
static const char* LUA_RESPONSE_QUEUE;
Expand Down
2 changes: 1 addition & 1 deletion packages/core/LuaEngine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ LuaEngine::LuaEngine(const char* script, const char* arg, uint32_t trace_id, lua
dInfo = new directThread_t;
dInfo->engine = this;
dInfo->script = StringLib::duplicate(script);
dInfo->arg = StringLib::duplicate(arg, 0);
dInfo->arg = StringLib::duplicate(arg);

/* Start Script Thread */
engineActive = false;
Expand Down
Loading

0 comments on commit 29de90f

Please sign in to comment.