[wip] Decompose spreadsheet jobs into one per row #3797

Draft · wants to merge 1 commit into base: main
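
In brief, the direction of this draft: instead of one long-running job that walks every row of the uploaded CSV, the bulk job parses the CSV once and enqueues a separate job per data row, so each row is authorized, imported, versioned, and logged independently. A minimal sketch of that fan-out, using the class names from the diff below (not the final implementation):

csv = CSV.parse(params[:csv_file], headers: true)
csv.each.with_index(2) do |csv_row, row_num| # line 1 is the header row, so data rows start at 2
  DescriptionImportRowJob.perform_later(
    csv_row: csv_row.to_h,                                     # a plain Hash serializes cleanly for ActiveJob
    headers: csv_row.headers.excluding('source_id', 'druid'),  # identifier columns are not descriptive data
    row_num:,
    bulk_action:,
    groups:
  )
end
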
107 changes: 107 additions & 0 deletions app/jobs/description_import_row_job.rb
@@ -0,0 +1,107 @@
# frozen_string_literal: true

class DescriptionImportRowJob < ApplicationJob
include Dry::Monads[:result]

##
# A job that applies descriptive updates from a single row of a bulk CSV upload
# @param [Hash] csv_row the row data, keyed by CSV header
# @param [Array<String>] headers the descriptive headers to import (druid and source_id already excluded)
# @param [Integer] row_num the line number of the row in the uploaded CSV, used for logging
# @param [BulkAction] bulk_action the BulkAction to record successes, failures, and log messages against
# @param [Array<String>] groups the groups to impersonate for the bulk action's user
def perform(csv_row:, headers:, row_num:, bulk_action:, groups:)
druid = csv_row.fetch('druid')
cocina_object = Repository.find(druid)
BulkJobLog.open(bulk_action.log_name) do |log|
success = lambda { |message|
bulk_action.with_lock do
bulk_action.increment!(:druid_count_success)
end
log.puts("Line #{row_num}: #{message} for #{druid} (#{Time.current})")
}
failure = lambda { |message|
bulk_action.with_lock do
bulk_action.increment!(:druid_count_fail)
end
log.puts("Line #{row_num}: #{message} for #{druid} (#{Time.current})")
}

user = bulk_action.user
# Since a user doesn't persist its groups, we need to pass the groups in here.
user.set_groups_to_impersonate(groups)
ability = Ability.new(user)
return failure.call('Not authorized') unless ability.can?(:update, cocina_object)

DescriptionImport.import(csv_row:, headers:)
.bind { |description| validate_changed(cocina_object, description) }
.bind { |description| open_version(cocina_object, description, user) }
.bind { |description, new_cocina_object| validate_and_save(new_cocina_object, description) }
.bind { |new_cocina_object| close_version(new_cocina_object) }
.either(
->(_updated) { success.call('Successfully updated') },
->(messages) { failure.call(messages.to_sentence) }
)
rescue StandardError => e
# Record unexpected errors as a row failure rather than silently swallowing them.
failure.call("#{e.class}: #{e.message}")
end
end

private

def validate_changed(cocina_object, description)
return Failure(['Description unchanged']) if cocina_object.description == description

Success(description)
end

def open_version(cocina_object, description, user)
cocina_object = open_new_version_if_needed(cocina_object, 'Descriptive metadata upload', user)

Success([description, cocina_object])
rescue RuntimeError => e
Failure([e.message])
end

def validate_and_save(cocina_object, description)
result = CocinaValidator.validate_and_save(cocina_object, description:)
return Success(cocina_object) if result.success?

Failure(["validate_and_save failed for #{cocina_object.externalIdentifier}"])
end

def close_version(cocina_object)
VersionService.close(identifier: cocina_object.externalIdentifier) unless StateService.new(cocina_object).object_state == :unlock_inactive
Success()
rescue RuntimeError => e
Failure([e.message])
end

# Opens a new minor version of the provided cocina object.
# @param [Cocina::Models::DROWithMetadata|CollectionWithMetadata|AdminPolicyWithMetadata] cocina_object
# @param [String] description description for the new version
# @param [User] user the user opening the new version
# @return [Cocina::Models::DROWithMetadata|CollectionWithMetadata|AdminPolicyWithMetadata] cocina object with the new version
def open_new_version(cocina_object, description, user)
wf_status = DorObjectWorkflowStatus.new(cocina_object.externalIdentifier, version: cocina_object.version)
raise 'Unable to open new version' unless wf_status.can_open_version?

VersionService.open(identifier: cocina_object.externalIdentifier,
significance: 'minor',
description:,
opening_user_name: user.to_s)
end

# Opens a new minor version of the provided cocina object unless the object is already open for modification.
# @param [Cocina::Models::DROWithMetadata|CollectionWithMetadata|AdminPolicyWithMetadata] cocina_object
# @param [String] description description for the new version
# @param [User] user the user opening the new version
# @return [Cocina::Models::DROWithMetadata|CollectionWithMetadata|AdminPolicyWithMetadata] cocina object with the new/existing version
def open_new_version_if_needed(cocina_object, description, user)
state_service = StateService.new(cocina_object)
return cocina_object if state_service.allows_modification?

open_new_version(cocina_object, description, user)
end
end
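
A note on the dry-monads pattern used in perform above: each bind step runs only while the previous step returned Success, the first Failure short-circuits the rest of the chain, and either finally dispatches to the success or failure lambda. A tiny standalone sketch of the same pattern (illustrative only, not Argo code):

require 'dry/monads'

class HalveTwice
  include Dry::Monads[:result]

  def call(n)
    halve(n)
      .bind { |m| halve(m) }  # skipped entirely if the first halve returned Failure
      .either(
        ->(value) { "result: #{value}" },
        ->(errors) { "error: #{errors.join('; ')}" }
      )
  end

  private

  # Success with half the value for even input, Failure with a message otherwise.
  def halve(n)
    return Failure(["#{n} is odd"]) unless n.even?

    Success(n / 2)
  end
end

HalveTwice.new.call(8) # => "result: 2"
HalveTwice.new.call(6) # => "error: 3 is odd"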

58 changes: 16 additions & 42 deletions app/jobs/descriptive_metadata_import_job.rb
@@ -13,48 +13,22 @@ class DescriptiveMetadataImportJob < GenericJob
def perform(bulk_action_id, params)
super
csv = CSV.parse(params[:csv_file], headers: true)
with_csv_items(csv, name: 'Import descriptive metadata', filename: params[:csv_filename]) do |cocina_object, csv_row, success, failure|
next failure.call('Not authorized') unless ability.can?(:update, cocina_object)

DescriptionImport.import(csv_row:)
.bind { |description| validate_changed(cocina_object, description) }
.bind { |description| open_version(cocina_object, description) }
.bind { |description, new_cocina_object| validate_and_save(new_cocina_object, description) }
.bind { |new_cocina_object| close_version(new_cocina_object) }
.either(
->(_updated) { success.call('Successfully updated') },
->(messages) { failure.call(messages.to_sentence) }
)
druid_column = 'druid'
update_druid_count(count: csv.size)
with_bulk_action_log do |log|
log.puts("CSV filename: #{params[:csv_filename]}")
return unless check_druid_column(csv:, druid_column:, log:, bulk_action:)

csv.each.with_index(2) do |csv_row, row_num|
druid = csv_row.fetch(druid_column)
DescriptionImportRowJob.perform_later(
csv_row: csv_row.to_h,
headers: csv_row.headers.excluding('source_id', 'druid'),
row_num:,
bulk_action:,
groups:
)
end
end
end

private

def validate_changed(cocina_object, description)
return Failure(['Description unchanged']) if cocina_object.description == description

Success(description)
end

def open_version(cocina_object, description)
cocina_object = open_new_version_if_needed(cocina_object, 'Descriptive metadata upload')

Success([description, cocina_object])
rescue RuntimeError => e
Failure([e.message])
end

def validate_and_save(cocina_object, description)
result = CocinaValidator.validate_and_save(cocina_object, description:)
return Success(cocina_object) if result.success?

Failure(["validate_and_save failed for #{cocina_object.externalIdentifier}"])
end

def close_version(cocina_object)
VersionService.close(identifier: cocina_object.externalIdentifier) unless StateService.new(cocina_object).object_state == :unlock_inactive
Success()
rescue RuntimeError => e
Failure([e.message])
end
end
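
One detail worth noting in the enqueue loop above: csv.each.with_index(2) starts the reported row numbers at 2 because CSV.parse(..., headers: true) consumes line 1 of the file as the header row, so the log messages line up with the spreadsheet the user uploaded. A standalone illustration (example data and column names only):

require 'csv'

data = "druid,title1.value\ndruid:bc123df4567,First title\ndruid:gh456jk7890,Second title\n"
csv = CSV.parse(data, headers: true)

csv.each.with_index(2) do |csv_row, row_num|
  puts "Line #{row_num}: #{csv_row.fetch('druid')}"
end
# Line 2: druid:bc123df4567
# Line 3: druid:gh456jk7890
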
8 changes: 6 additions & 2 deletions app/services/cocina_validator.rb
@@ -13,9 +13,13 @@ def self.validate(model, **args)

def self.validate_and_save(model, **args)
validate(model, **args).bind do |updated|
Try[Dor::Services::Client::UnexpectedResponse] { Repository.store(updated) }
Try[Dor::Services::Client::UnexpectedResponse] {
byebug
Repository.store(updated) }
.to_result
.or { |e| Failure(e.errors.map { |err| err['detail'] }) }
.or { |e|
byebug
Failure(e.errors.map { |err| err['detail'] }) }
end
end
end
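
For context on the Try chain being instrumented above: Try[SomeError] { ... } captures only that exception class, to_result converts the outcome into Success or Failure, and or maps the captured exception to a domain-level Failure. A standalone sketch of the mechanism (illustrative only, not Argo code):

require 'dry/monads'

class ParseQuantity
  include Dry::Monads[:result, :try]

  # Returns Success(Integer) or Failure([String]) instead of raising.
  def call(raw)
    Try[ArgumentError] { Integer(raw) }
      .to_result
      .or { |e| Failure([e.message]) }
  end
end

ParseQuantity.new.call('42')  # => Success(42)
ParseQuantity.new.call('4x2') # => Failure(["invalid value for Integer(): \"4x2\""])
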
16 changes: 7 additions & 9 deletions app/services/description_import.rb
@@ -3,23 +3,21 @@
class DescriptionImport
include Dry::Monads[:result]

def self.import(csv_row:)
new(csv_row:).import
# The source_id and druid are only there for the user to reference and should be ignored for data processing
# druid is only on the bulk sheet
def self.import(csv_row:, headers: csv_row.headers.excluding('source_id', 'druid'))
new(csv_row:, headers:).import
end

def initialize(csv_row:)
def initialize(csv_row:, headers:)
@csv_row = csv_row
@headers = headers.sort_by { |address| sortable_address(address) }
end

def import
params = {}

# The source_id and druid are only there for the user to reference and should be ignored for data processing
# druid is only on the bulk sheet
headers = @csv_row.headers.excluding('source_id', 'druid')
headers.sort_by! { |address| sortable_address(address) }

headers.each do |address|
@headers.each do |address|
visit(params, split_address(address), @csv_row[address]) if @csv_row[address]
end

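The explicit headers: keyword (rather than always deriving the headers inside import) matters because the new row job passes csv_row.to_h, a plain Hash that ActiveJob can serialize, and a Hash does not respond to #headers; callers that still hold a real CSV::Row can rely on the default. A small illustration of the difference (example data and column names only):

require 'csv'

row = CSV.parse("druid,source_id,title1.value\ndruid:bc123df4567,sul:123,Hello\n", headers: true).first

row.headers                    # => ["druid", "source_id", "title1.value"]
row.to_h.respond_to?(:headers) # => false, so the enqueuing job passes headers: explicitly
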
1 change: 1 addition & 0 deletions spec/jobs/descriptive_metadata_import_job_spec.rb
@@ -24,6 +24,7 @@
let(:state_service) { instance_double(StateService, allows_modification?: true, object_state: :unlock) }

before do
Rails.application.config.active_job.queue_adapter = :inline
allow(BulkJobLog).to receive(:open).and_yield(log_buffer)
allow(subject).to receive(:bulk_action).and_return(bulk_action)
allow(Repository).to receive(:find).with(druids[0]).and_return(item1)
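
The added line above runs enqueued jobs inline so the existing expectations in this spec still observe the per-row work. If assigning Rails.application.config.active_job.queue_adapter inside a before block turns out not to take effect after boot, one common alternative is to drain the queue explicitly. A sketch only, assuming rspec-rails and ActiveJob::TestHelper; job, bulk_action, and params stand in for whatever the spec actually defines, and log_buffer is assumed to be a StringIO:

include ActiveJob::TestHelper

it 'updates descriptive metadata for each row' do
  # Run the bulk job, then perform everything it enqueued, including each DescriptionImportRowJob.
  perform_enqueued_jobs do
    job.perform(bulk_action.id, params)
  end

  expect(log_buffer.string).to include 'Successfully updated'
end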