Merge pull request #99 from m-lab/development
Merge changes to BigQuery processing, unit tests, etc., to bring the codebase up to v1.2 (see release notes)
collina committed Feb 22, 2016
2 parents d0fe87e + e1d8583 commit 434f0b8
Showing 13 changed files with 231 additions and 351 deletions.
3 changes: 3 additions & 0 deletions .travis.yml
@@ -6,11 +6,14 @@ install:
- pip install -r test-requirements.txt
- pip install coveralls
- pip install yapf==0.5.0
- pip install pyflakes==1.0.0
script:
# Run unit tests and calculate code coverage.
- coverage run --source telescope -m unittest discover
# Check that source has correct formatting.
- yapf --diff --recursive --style google ./
# Run static analysis for Python bugs/cruft.
- pyflakes telescope/*.py tests/*.py
after_success:
coveralls

1 change: 1 addition & 0 deletions VERSION
@@ -0,0 +1 @@
1.2
4 changes: 3 additions & 1 deletion dev/pre-commit.sh
@@ -16,4 +16,6 @@
# limitations under the License.

# Block the commit if it breaks unit tests or formatting
python -m unittest discover && yapf --diff --recursive --style google ./
python -m unittest discover && \
yapf --diff --recursive --style google ./ && \
pyflakes telescope/*.py tests/*.py
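
To run the same checks before every commit, the script can be linked into git's hooks directory. A minimal sketch in Python, assuming it is run from the repository root (the paths follow git convention and are not documented by this repo):

```
# Installs dev/pre-commit.sh as the git pre-commit hook via a symlink.
# Assumes the current working directory is the repository root.
import os

src = os.path.abspath(os.path.join('dev', 'pre-commit.sh'))
dst = os.path.join('.git', 'hooks', 'pre-commit')
if not os.path.exists(dst):
    os.symlink(src, dst)
    print('Installed pre-commit hook -> %s' % src)
```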
8 changes: 4 additions & 4 deletions documentation/selector-file-spec.md
@@ -43,7 +43,7 @@ client_providers: ['twc']
},
"sites": ["lga01"],
"client_providers": ["Verizon"],
"start_time": "2014-07-01T00:00:00Z"
"start_times": ["2014-07-01T00:00:00Z"]
}
```
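
A sketch of reading a selector file like the one above with only the standard library; the filename is a placeholder, and the check reflects just the rename this hunk makes (`start_time` string to `start_times` list):

```
# Load a selector file and confirm the renamed field is a list.
# 'example.json' is a placeholder path.
import json

with open('example.json') as selector_file:
    selector = json.load(selector_file)

assert isinstance(selector.get('start_times'), list), \
    'start_times replaced the scalar start_time field'
```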

@@ -67,7 +67,7 @@ client_providers: ['twc']

`params`: Specifies the parameters to the IP translation strategy.

`db_snapshots`: Specifies the snapshot dates (in YYYY-MM-DD format) of the MaxMind databases that are required to resolve IP addresses to providers. This field is optional. If not specified or specified as an empty list, consuming programs should use database snapshots closest in time to the snapshots specified and should suppress warnings to the user about missing snapshots.
`db_snapshots`: Specifies the snapshot dates (in YYYY-MM-DD format) of the MaxMind databases that are required to resolve IP addresses to providers.

`sites` _(optional)_: A list of M-Lab sites, where each value in the list is an M-Lab site name (e.g. lga01). Telescope will retrieve results for NDT tests that users performed against each of the specified M-Lab sites.

@@ -80,14 +80,14 @@ client_providers: ['twc']

`client_countries` _(optional)_: A list of ISO 3166-1 alpha-2 country code(s) associated with the IP address of the measurement client, as recorded within BigQuery.

`start_times`: List of start times of the window in which to collect test results (in ISO 8601 format). Start time values must end in `Z` (i.e. only UTC time zone is supported).
`start_times`: List of start times of the window in which to collect test results (in ISO 8601 format). Start time values must end in `Z` (i.e. only the UTC time zone is supported) and the date and time must be separated by `T`. For example, a start time of 2:00 AM on Jan 5, 2014 would be formatted as "2014-01-05T02:00:00Z".
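
For instance, a compliant value can be produced from a Python `datetime` (a sketch; Telescope does not require values to be generated this way):

```
# Format a UTC datetime into the required ISO 8601 form with a 'T'
# separator and trailing 'Z'.
import datetime

start = datetime.datetime(2014, 1, 5, 2, 0, 0)  # 2:00 AM, Jan 5, 2014 (UTC)
start_times = [start.strftime('%Y-%m-%dT%H:%M:%SZ')]
print(start_times)  # ['2014-01-05T02:00:00Z']
```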

# Changelog

## As of version 1.1

* Added optional `client_countries` property.
* The properties `metric`, `client_provider` and `site` are now represented by the lists `metrics`, `client_providers` and `sites`.
* The properties `metric`, `client_provider`, `start_time` and `site` are now represented by the lists `metrics`, `client_providers`, `start_times` and `sites`.
* Made `client_providers` and `sites` optional.

# Deprecated
2 changes: 1 addition & 1 deletion requirements.txt
@@ -1,2 +1,2 @@
google-api-python-client
google-api-python-client==1.3.2
python-dateutil
164 changes: 77 additions & 87 deletions telescope/external.py
@@ -16,6 +16,7 @@
# limitations under the License.

import datetime
import httplib
import httplib2
import logging
import os
@@ -25,16 +26,16 @@

from apiclient.discovery import build
from apiclient.errors import HttpError
from oauth2client.client import OAuth2WebServerFlow
from oauth2client.client import AccessTokenRefreshError
from oauth2client.client import flow_from_clientsecrets
from oauth2client.file import Storage
from oauth2client.tools import run_flow

from httplib import ResponseNotReady

class BigQueryError(Exception):
pass

class BigQueryJobFailure(Exception):

class BigQueryJobFailure(BigQueryError):
"""Indicates that a BigQuery job's result was retrieved, but the query failed.
Raised when BigQuery reports a job has failed. Additional attempts to
@@ -43,10 +44,10 @@ class BigQueryJobFailure(Exception):

def __init__(self, http_code, cause):
self.code = http_code
Exception.__init__(self, cause)
super(BigQueryJobFailure, self).__init__(cause)


class BigQueryCommunicationError(Exception):
class BigQueryCommunicationError(BigQueryError):
"""An error occurred trying to communicate with BigQuery
This error is raised when the application fails to communicate with BigQuery.
@@ -61,16 +62,16 @@ def __init__(self, message, cause):
'%s (%s)' % (message, self.cause))


class TableDoesNotExist(Exception):
class TableDoesNotExist(BigQueryError):

def __init__(self):
Exception.__init__(self)
super(TableDoesNotExist, self).__init__()


class APIConfigError(Exception):
class APIConfigError(BigQueryError):

def __init__(self):
Exception.__init__(self)
super(APIConfigError, self).__init__()
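
With this hunk, all four exception types derive from the new `BigQueryError` base class, so callers can handle the whole family with one `except` clause. A hypothetical caller sketch (`run_query` and the handlers are stand-ins, not telescope functions):

```
# Catching BigQueryError covers BigQueryJobFailure,
# BigQueryCommunicationError, TableDoesNotExist and APIConfigError.
try:
    run_query()
except BigQueryJobFailure as e:
    handle_failed_job(e.code)  # e.code carries the HTTP status
except BigQueryError as e:
    handle_bigquery_error(e)   # any other BigQuery-related failure
```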


class GoogleAPIAuthConfig:
@@ -113,7 +114,7 @@ def authenticate_with_google(self):
flags=GoogleAPIAuthConfig,
http=http)
self.logger.info(
"Successfully authenticated with Google, moving on to building query.")
'Successfully authenticated with Google, moving on to building query.')

http = credentials.authorize(http)

@@ -290,54 +291,45 @@ def _parse_query_results_response(self, results_response):
return parsed_rows, page_token


class BigQueryCall:
def get_authenticated_service(google_auth_config):
try:
authenticated_service = google_auth_config.authenticate_with_google()
except (SSLError, HttpError, httplib2.ServerNotFoundError,
httplib.ResponseNotReady) as e:
raise BigQueryCommunicationError(
'Failed to communicate with BigQuery during authentication', e)

def __init__(self, google_auth_config):
self.logger = logging.getLogger('telescope')
return authenticated_service

try:
self.authenticated_service = (
google_auth_config.authenticate_with_google())
self.project_id = google_auth_config.project_id
except (SSLError, AttributeError, HttpError,
httplib2.ServerNotFoundError, ResponseNotReady) as e:
raise BigQueryCommunicationError(None, e)

class BigQueryCall(object):

def __init__(self, authenticated_service, project_id):
self.logger = logging.getLogger('telescope')
self._authenticated_service = authenticated_service
self._project_id = project_id

def retrieve_job_data(self, job_id):
result_collector = BigQueryJobResultCollector(
self.authenticated_service.jobs(), self.project_id)
self._authenticated_service.jobs(), self._project_id)
return result_collector.collect_results(job_id)

def run_asynchronous_query(self, query_string, batch_mode=False):
def run_asynchronous_query(self, query_string):
job_reference_id = None

if self.project_id is None:
self.logger.error(
'Cannot continue since I have not found a project id.')
return None

try:
job_collection = self.authenticated_service.jobs()
job_collection = self._authenticated_service.jobs()
job_definition = {
'configuration': {'query': {'query': query_string}}
}

if batch_mode is True:
job_definition['configuration']['query']['priority'] = 'BATCH'

job_collection_insert = job_collection.insert(
projectId=self.project_id,
projectId=self._project_id,
body=job_definition).execute()
job_reference_id = job_collection_insert['jobReference']['jobId']
except (HttpError, ResponseNotReady) as caught_http_error:
self.logger.error(
'HTTP error when running asynchronous query: {error}'.format(
error=caught_http_error.resp))
except (Exception,
httplib2.ServerNotFoundError) as caught_generic_error:
self.logger.error(
'Unknown error when running asynchronous query: {error}'.format(
error=caught_generic_error))
except (HttpError, httplib.ResponseNotReady) as e:
raise BigQueryCommunicationError(
'Failed to communicate with BigQuery', e)

return job_reference_id
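
The refactor separates authentication from query execution: `get_authenticated_service` performs the OAuth exchange once, and `BigQueryCall` now receives the authenticated service and project id explicitly instead of deriving them itself. A usage sketch under the new signatures (`auth_config`, the project id, and the query string are placeholders):

```
# Authenticate once, then submit a query asynchronously.
service = get_authenticated_service(auth_config)
bigquery = BigQueryCall(service, 'my-project-id')
job_id = bigquery.run_asynchronous_query(
    'SELECT COUNT(*) FROM [measurement-lab:ndt.all]')
```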

Expand All @@ -349,49 +341,47 @@ def monitor_query_queue(self,

query_object = query_object or self

if self.project_id is not None:
started_checking = datetime.datetime.utcnow()

notification_identifier = ', '.join(filter(None,
job_metadata.values()))
self.logger.info('Queued request for %s, received job id: %s',
notification_identifier, job_id)

while True:
try:
job_collection = query_object.authenticated_service.jobs()
job_collection_state = job_collection.get(
projectId=self.project_id,
jobId=job_id).execute()
except (SSLError, Exception, AttributeError, HttpError,
httplib2.ServerNotFoundError) as caught_error:
self.logger.warn(
'Encountered error (%s) monitoring for %s, could '
'be temporary, not bailing out.', caught_error,
notification_identifier)
job_collection_state = None

if job_collection_state is not None:
time_waiting = int((datetime.datetime.utcnow() -
started_checking).total_seconds())

if job_collection_state['status']['state'] == 'RUNNING':
self.logger.info(
'Waiting for %s to complete, spent %d seconds so '
'far.', notification_identifier, time_waiting)
time.sleep(10)
elif job_collection_state['status']['state'] == 'PENDING':
self.logger.info(
'Waiting for %s to submit, spent %d seconds so '
'far.', notification_identifier, time_waiting)
time.sleep(60)
elif (
(job_collection_state['status']['state'] == 'DONE') and
callback_function is not None):
self.logger.info('Found completion status for %s.',
notification_identifier)
callback_function(job_id, query_object=self)
break
else:
raise Exception('UnknownBigQueryResponse')
started_checking = datetime.datetime.utcnow()

notification_identifier = ', '.join(filter(None, job_metadata.values()))
self.logger.info('Queued request for %s, received job id: %s',
notification_identifier, job_id)

while True:
try:
job_collection = query_object._authenticated_service.jobs()
job_collection_state = job_collection.get(
projectId=self._project_id,
jobId=job_id).execute()
except (SSLError, Exception, AttributeError, HttpError,
httplib2.ServerNotFoundError) as caught_error:
self.logger.warn(
'Encountered error (%s) monitoring for %s, could '
'be temporary, not bailing out.', caught_error,
notification_identifier)
job_collection_state = None

if job_collection_state is not None:
time_waiting = int((datetime.datetime.utcnow() -
started_checking).total_seconds())

if job_collection_state['status']['state'] == 'RUNNING':
self.logger.info(
'Waiting for %s to complete, spent %d seconds so '
'far.', notification_identifier, time_waiting)
time.sleep(10)
elif job_collection_state['status']['state'] == 'PENDING':
self.logger.info(
'Waiting for %s to submit, spent %d seconds so '
'far.', notification_identifier, time_waiting)
time.sleep(60)
elif (
(job_collection_state['status']['state'] == 'DONE') and
callback_function is not None):
self.logger.info('Found completion status for %s.',
notification_identifier)
callback_function(job_id, query_object=self)
break
else:
raise Exception('UnknownBigQueryResponse')
return None
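
A hypothetical wiring of the polling loop above, assuming the signature `monitor_query_queue(job_id, job_metadata, query_object=None, callback_function=None)` implied by the names used in the body (`on_done` is a stand-in):

```
# Poll a submitted job and collect its rows once BigQuery reports DONE.
def on_done(job_id, query_object):
    rows = query_object.retrieve_job_data(job_id)
    print('job %s returned %d rows' % (job_id, len(rows)))

bigquery.monitor_query_queue(job_id,
                             {'label': 'example-query'},
                             callback_function=on_done)
```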
