Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DBEX] create UpdateDocumentsStatus and Polling services #17460

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 29 additions & 2 deletions lib/lighthouse/benefits_documents/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ class Configuration < Common::Client::Configuration::REST
SYSTEM_NAME = 'VA.gov'
API_SCOPES = %w[documents.read documents.write].freeze
DOCUMENTS_PATH = 'services/benefits-documents/v1/documents'
DOCUMENTS_STATUS_PATH = 'services/benefits-documents/v1/uploads/status'
TOKEN_PATH = 'oauth2/benefits-documents/system/v1/token'
QA_TESTING_DOMAIN = 'https://dev-api.va.gov'

##
# @return [Config::Options] Settings for benefits_claims API.
Expand Down Expand Up @@ -95,13 +97,28 @@ def generate_upload_body(document_data, file_body)
payload
end

def get_documents_status(lighthouse_document_request_ids)
headers = {
'Authorization' => "Bearer #{documents_status_access_token}",
'Content-Type' => 'application/json'
}

body = {
data: {
requestIds: lighthouse_document_request_ids
}
}.to_json

documents_status_api_connection.post(DOCUMENTS_STATUS_PATH, body, headers)
end

##
# Creates a Faraday connection with parsing json and breakers functionality.
#
# @return [Faraday::Connection] a Faraday connection instance.
#
def connection
@conn ||= Faraday.new(base_api_path, headers: base_request_headers, request: request_options) do |faraday|
def connection(api_path = base_api_path)
@conn ||= Faraday.new(api_path, headers: base_request_headers, request: request_options) do |faraday|
faraday.use :breakers
faraday.use Faraday::Response::RaiseError

Expand Down Expand Up @@ -152,5 +169,15 @@ def token_service(lighthouse_client_id, lighthouse_rsa_key_path, aud_claim_url =
url, API_SCOPES, lighthouse_client_id, aud_claim_url, lighthouse_rsa_key_path, 'benefits-documents'
)
end

def documents_status_access_token
# Lighthouse requires the documents status endpoint be tested on the QA testing domain
ENV['RAILS_ENV'] == 'test' ? access_token(nil, nil, { host: QA_TESTING_DOMAIN }) : access_token
end

def documents_status_api_connection
# Lighthouse requires the documents status endpoint be tested on the QA testing domain
ENV['RAILS_ENV'] == 'test' ? connection(QA_TESTING_DOMAIN) : connection
end
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# frozen_string_literal: true

require 'common/client/base'
require 'lighthouse/benefits_documents/configuration'

module BenefitsDocuments
module Form526
class DocumentsStatusPollingService < Common::Client::Base
configuration BenefitsDocuments::Configuration

def self.call(args)
new(args).check_documents_status
end

def initialize(document_request_ids)
@document_request_ids = document_request_ids
super()
end

def check_documents_status
fetch_documents_status
end

private

def fetch_documents_status
config.get_documents_status(@document_request_ids)
end
end
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
# frozen_string_literal: true

require 'lighthouse/benefits_documents/form526/documents_status_polling_service'
require 'lighthouse/benefits_documents/form526/upload_status_updater'

module BenefitsDocuments
module Form526
class UpdateDocumentsStatusService
# Queries the Lighthouse Benefits Documents API's '/uploads/status' endpoint
# to check the progression of Lighthouse526DocumentUpload records submitted to Lighthouse.
#
# Once submitted to Lighthouse, they are forwarded to VBMS and, if successful, to BGS as well.
# All steps must complete before a document is considered to have been successfully processed.
# Whenever a document is submitted to Lighthouse, a 'request_id' corresponding to that document is returned,
# which is saved as lighthouse_document_request_id on a Lighthouse526DocumentUpload record.
#
# Lighthouse provides an endpoint, '/uploads/status', which takes an array of request ids and ressponds with
# a detailed JSON representiation of each document's upload progress to VBMS and BGS, including failures.
# This service class uses those data to update the status of each Lighthouse526DocumentUpload record,
# and log success, failure and timeout metrics to StatsD, where the data are used to drive dashboards.
#
# Documentation for Lighthouse's Benefits Documents API and '/uploads/status' endpoint is available here:
# https://dev-developer.va.gov/explore/api/benefits-documents/docs?version=current

STATSD_BASE_KEY = 'api.form_526.lighthouse_document_upload_processing_status'
STATSD_DOCUMENT_COMPLETE_KEY = 'complete'
STATSD_DOCUMENT_FAILED_KEY = 'failed'
STATSD_PROCESSING_TIMEOUT_KEY = 'processing_timeout'
STATSD_DOCUMENT_TYPE_KEY_MAP = {
Lighthouse526DocumentUpload::VETERAN_UPLOAD_DOCUMENT_TYPE => 'veteran_upload',
Lighthouse526DocumentUpload::BDD_INSTRUCTIONS_DOCUMENT_TYPE => 'bdd_instructions',
Lighthouse526DocumentUpload::FORM_0781_DOCUMENT_TYPE => 'form_0781',
Lighthouse526DocumentUpload::FORM_0781A_DOCUMENT_TYPE => 'form_0781a'
}.freeze

def self.call(*)
new(*).call
end

# @param lighthouse526_document_uploads [Lighthouse526DocumentUpload] a collection of
# Lighthouse526DocumentUpload records polled for status updates on Lighthouse's '/uploads/status' endpoint
def initialize(lighthouse526_document_uploads, lighthouse_status_response)
@lighthouse526_document_uploads = lighthouse526_document_uploads
@lighthouse_status_response = lighthouse_status_response
end

def call
update_documents_status
end

private

def update_documents_status
@lighthouse_status_response.dig('data', 'statuses').each do |status|
update_document_status(status)
end
end

def update_document_status(status)
document_upload = @lighthouse526_document_uploads.find_by!(lighthouse_document_request_id: status['requestId'])

# UploadStatusUpdater encapsulates all parsing of a status response from Lighthouse
status_updater = BenefitsDocuments::Form526::UploadStatusUpdater.new(status, document_upload)
status_updater.update_status

statsd_document_type_key = STATSD_DOCUMENT_TYPE_KEY_MAP[document_upload.document_type]
statsd_document_base_key = "#{STATSD_BASE_KEY}.#{statsd_document_type_key}"

if document_upload.completed?
# ex. 'api.form_526.lighthouse_document_upload_processing_status.bdd_instructions.complete'
StatsD.increment("#{statsd_document_base_key}.#{STATSD_DOCUMENT_COMPLETE_KEY}")
elsif document_upload.failed?
# Because Lighthouse's processing steps are subject to change, these metrics must be dyanmic.
# Currently, this should return either claims_evidence or benefits_gateway_service
failure_step_key = status_updater.get_failure_step.downcase

# ex. 'api.form_526.lighthouse_document_upload_processing_status.bdd_instructions.failed.claims_evidence'
StatsD.increment("#{statsd_document_base_key}.#{STATSD_DOCUMENT_FAILED_KEY}.#{failure_step_key}")
elsif status_updater.processing_timeout?
# Triggered when a document is still pending more than 24 hours after processing began
# ex. 'api.form_526.lighthouse_document_upload_processing_status.bdd_instructions.processing_timeout'
StatsD.increment("#{statsd_document_base_key}.#{STATSD_PROCESSING_TIMEOUT_KEY}")
end
end
end
end
end
132 changes: 132 additions & 0 deletions lib/lighthouse/benefits_documents/form526/upload_status_updater.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
# frozen_string_literal: true

module BenefitsDocuments
module Form526
class UploadStatusUpdater
# Parses the status of a Lighthouse526DocumentUpload submitted to Lighthouse
# using metadata from the Lighthouse Benefits Documents API '/uploads/status' endpoint.
# Provides methods to determine if a document has completed all steps or failed,
# abstracting away the details of the Lighthouse status data structure.
#
# Additionally, updates the state of a Lighthouse526DocumentUpload in vets-api to reflect
# the current status of a document as it transitions from Lighthouse > VBMS > BGS
#
# Documentation on the Lighthouse '/uploads/status' endpoint is available here:
# https://dev-developer.va.gov/explore/api/benefits-documents/docs?version=current

LIGHTHOUSE_DOCUMENT_COMPLETE_STATUS = 'SUCCESS'
LIGHTHOUSE_DOCUMENT_FAILED_STATUS = 'FAILED'
PROCESSING_TIMEOUT_WINDOW_IN_HOURS = 24

# @param lighthouse526_document_status [Hash] includes a single document's status progress
# after it has been submitted to Lighthouse, while Lighthouse attempts to pass it on to
# VBMS and then BGS. These data come from Lighthouse's '/uploads/status' endpoint.
#
# @param lighthouse526_document_upload [Lighthouse526DocumentUpload] the VA.gov record of the document
# submitted to Lighthouse for tracking.
#
# example lighthouse526_document_status hash:
# {
# "requestId": 600000001,
# "time": {
# "startTime": 1502199000,
# "endTime": 1502199000
# },
# "status": "IN_PROGRESS",
# "steps": [
# {
# "name": "BENEFITS_GATEWAY_SERVICE",
# "nextStepName": "BENEFITS_GATEWAY_SERVICE",
# "description": "string",
# "status": "NOT_STARTED"
# }
# ],
# "error": {
# "detail": "string",
# "step": "BENEFITS_GATEWAY_SERVICE"
# }
# }
#

def initialize(lighthouse526_document_status, lighthouse526_document_upload)
@lighthouse526_document_status = lighthouse526_document_status
@lighthouse526_document_upload = lighthouse526_document_upload
end

def update_status
# Only save an upload's status if it has transitioned since the last Lighthouse poll
return unless status_changed?

# Ensure start time and latest status response from API are saved, regardless if document is still in progress
@lighthouse526_document_upload.update!(
lighthouse_processing_started_at: start_time,
last_status_response: @lighthouse526_document_status
)

log_status

finalize_upload if completed? || failed?

@lighthouse526_document_upload.update!(status_last_polled_at: DateTime.now.utc)
end

def get_failure_step
return unless failed? && @lighthouse526_document_status['error']

@lighthouse526_document_status['error']['step']
end

# Returns true if document is still processing in Lighthouse, and initiated more than a set number of hours ago
def processing_timeout?
return false if @lighthouse526_document_status.dig('time', 'endTime')

start_time < PROCESSING_TIMEOUT_WINDOW_IN_HOURS.hours.ago.utc
end

private

def status_changed?
@lighthouse526_document_status != @lighthouse526_document_upload.last_status_response
end

# Lighthouse returns date times as UNIX timestamps in milliseconds
def start_time
unix_start_time = @lighthouse526_document_status.dig('time', 'startTime')
Time.at(unix_start_time).utc.to_datetime
end

def end_time
unix_end_time = @lighthouse526_document_status.dig('time', 'endTime')
Time.at(unix_end_time).utc.to_datetime
end

def failed?
@lighthouse526_document_status['status'] == LIGHTHOUSE_DOCUMENT_FAILED_STATUS
end

def completed?
@lighthouse526_document_status['status'] == LIGHTHOUSE_DOCUMENT_COMPLETE_STATUS
end

def log_status
Rails.logger.info(
'BenefitsDocuments::Form526::UploadStatusUpdater',
status: @lighthouse526_document_status['status'],
status_response: @lighthouse526_document_status,
updated_at: DateTime.now.utc
)
end

def finalize_upload
@lighthouse526_document_upload.update!(lighthouse_processing_ended_at: end_time)

if completed?
@lighthouse526_document_upload.complete!
else
@lighthouse526_document_upload.update!(error_message: @lighthouse526_document_status['error'])
@lighthouse526_document_upload.fail!
end
end
end
end
end
Loading
Loading