Skip to content

Commit

Permalink
feat(decider): retrigger worker-spell on deal update [NET-649] (#148)
Browse files Browse the repository at this point in the history
* feat(decider): retrigger worker-spell on deal update
  • Loading branch information
justprosh authored Dec 14, 2023
1 parent e3ba478 commit ff9f826
Show file tree
Hide file tree
Showing 6 changed files with 33 additions and 52 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@
/.vscode/settings.json
/src/ts/src/aqua
/src/js/src/aqua
/src/air
10 changes: 7 additions & 3 deletions src/aqua/chain/changed_deals.aqua
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
module ChangedDeals declares poll_deal_changes_batch

import Spell, TriggerConfig from "@fluencelabs/spell/spell_service.aqua"
import PeerSpell from "@fluencelabs/spell/api.aqua"

import deal_log, spell_log, get_string from "../fluence/spell.aqua"
import Json from "../fluence/peer.aqua"
import ChainInfo from "../chain/chain.aqua"
import JoinedDeal, JsonDealState, DealState, store_deal_state, get_deal_state from "../decider/deal_storage.aqua"
import SpellId, DealId from "../types.aqua"
import WORKER_DEF_CID from "../decider/consts.aqua"
import WorkerSettings from "../fluence/worker.aqua"

import DealInfo, DealChangesReq, ChainConnector, DealChangedResult, DealChanged from "services.aqua"

Expand Down Expand Up @@ -46,7 +48,8 @@ func get_deal_changes(spell_id: SpellId, api_endpoint: string, joined: []JoinedD
<- changes

-- Update app_cid of the deal on the corresponding worker spell
func update_worker(spell_id: SpellId, deal_info: DealInfo, log: DealChanged):
-- and retrigger spell to update the worker sooner
func update_worker(spell_id: SpellId, deal_info: DealInfo, log: DealChanged, settings: WorkerSettings):
deal_id = deal_info.deal_id
worker_id = deal_info.worker_id
app_cid <- Json.stringify(log.info.app_cid)
Expand All @@ -55,6 +58,7 @@ func update_worker(spell_id: SpellId, deal_info: DealInfo, log: DealChanged):
-- stringify app_cid to be able to use it as an argument of a spell
Spell "worker-spell"
Spell.set_string(WORKER_DEF_CID, app_cid)
PeerSpell.update_trigger_config(spell_id, settings.config)

-- Move left boundary of the deal's block range to be `min(right boundary, latest_block) + 1`
func move_left_boundary(spell_id: SpellId, deal_id: DealId, right_boundary: string, latest: string):
Expand All @@ -64,7 +68,7 @@ func move_left_boundary(spell_id: SpellId, deal_id: DealId, right_boundary: stri
if left != nil:
store_deal_state(spell_id, deal_id, DealState(left_boundary = left!))

func poll_deal_changes_batch(spell_id: SpellId, chain: ChainInfo, deals: []JoinedDeal, latest_block: string):
func poll_deal_changes_batch(spell_id: SpellId, chain: ChainInfo, deals: []JoinedDeal, latest_block: string, settings: WorkerSettings):
Spell spell_id

changes <- get_deal_changes(spell_id, chain.api_endpoint, deals)
Expand All @@ -76,7 +80,7 @@ func poll_deal_changes_batch(spell_id: SpellId, chain: ChainInfo, deals: []Joine
if change.log != nil:
deal_log(spell_id, deal_id, ["found a deal changed log on block", change.log!.block_number])
-- Update app_cid of the deal on the corresponding worker spell
update_worker(spell_id, change.deal_info, change.log!)
update_worker(spell_id, change.deal_info, change.log!, settings)

-- Move block range so that log's block becomes new left boundary
log_block = change.log!.block_number
Expand Down
10 changes: 5 additions & 5 deletions src/aqua/chain/new_deals.aqua
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import ChainConnector, DealMatched from "services.aqua"

import ChainInfo from "../chain/chain.aqua"
import deal_log, spell_log, get_string from "../fluence/spell.aqua"
import is_worker_created from "../fluence/worker.aqua"
import is_worker_created, WorkerSettings from "../fluence/worker.aqua"
import Json from "../fluence/peer.aqua"
import join_deal from "../decider/join_deal.aqua"
import JoinedDeal, store_deal, store_installation_failed_deal from "../decider/deal_storage.aqua"
Expand Down Expand Up @@ -49,13 +49,13 @@ func get_left_boundary(spell_id: SpellId, chain: ChainInfo) -> ?string:
spell_log(spell_id, ["left boundary will be", left])
<- left

func join_deals(chain: ChainInfo, spell_id: SpellId, logs: []DealMatched, left: string):
func join_deals(chain: ChainInfo, spell_id: SpellId, logs: []DealMatched, left: string, settings: WorkerSettings):
Spell spell_id

-- TODO: I assume that `logs` are sorted by `block_number`. Is that a correct assumption?
for log <- logs:
deal_id = log.info.deal_id
worker_id, error <- join_deal(spell_id, log.block_number, deal_id, log.info.app_cid)
worker_id, error <- join_deal(spell_id, log.block_number, deal_id, log.info.app_cid, settings)
if worker_id == nil:
store_installation_failed_deal(spell_id, deal_id, log, error!)
else:
Expand Down Expand Up @@ -112,14 +112,14 @@ func poll_logs(spell_id: SpellId, chain: ChainInfo, left: string) -> ?Poll:

<- poll

func poll_new_deals(spell_id: SpellId, chain: ChainInfo, joined_deals: []JoinedDeal, latest_block: string):
func poll_new_deals(spell_id: SpellId, chain: ChainInfo, joined_deals: []JoinedDeal, latest_block: string, settings: WorkerSettings):
-- retrieve block number to poll from
left <- get_left_boundary(spell_id, chain)
if left != nil:
poll <- poll_logs(spell_id, chain, left!)
if poll != nil:
new_deals <- filter_new(spell_id, poll!.logs, joined_deals)
join_deals(chain, spell_id, new_deals, left!)
join_deals(chain, spell_id, new_deals, left!, settings)

-- after we have processed all logs from poll, we can conclude
-- that we have seen all logs until `min(latest_block, right_boundary)`
Expand Down
34 changes: 14 additions & 20 deletions src/aqua/decider/join_deal.aqua
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import PeerSpell from "@fluencelabs/spell/api.aqua"
import Spell, TriggerConfig from "@fluencelabs/spell/spell_service.aqua"

import deal_log from "../fluence/spell.aqua"
import get_worker_settings, WorkerArgs, WorkerSettings from "../fluence/worker.aqua"
import WorkerArgs, WorkerSettings from "../fluence/worker.aqua"
import DealState from "../decider/deal_storage.aqua"
import DealId, SpellId, WorkerId, CID, Block from "../types.aqua"
import Json from "../fluence/peer.aqua"
Expand All @@ -22,7 +22,7 @@ func install_deal_spell(deal_id: DealId, cid: CID, settings: WorkerSettings) ->

-- TODO: what should happen if join_deal has failed? We might never receive it again
-- Probably should store failed deals, and then retry them a few times?
func join_deal(spell_id: SpellId, block: string, deal_id: DealId, app_cid: CID) -> ?WorkerId, ?string:
func join_deal(spell_id: SpellId, block: string, deal_id: DealId, app_cid: CID, settings: WorkerSettings) -> ?WorkerId, ?string:
log = (msg: ⊤):
deal_log(spell_id, deal_id, msg)

Expand All @@ -38,25 +38,19 @@ func join_deal(spell_id: SpellId, block: string, deal_id: DealId, app_cid: CID)
log(["error creating worker", e])

if worker_id != nil:
settings <- get_worker_settings(spell_id)
if settings != nil:
on worker_id!:
on worker_id!:
try:
try:
try:
deal_spell <- Srv.resolve_alias("worker-spell")
log(["resolved existing worker-spell", deal_spell])
otherwise:
deal_spell <- install_deal_spell(deal_id, app_cid, settings!)
log(["created deal spell", deal_spell])
catch e:
error <<- Json.stringify(["error installing deal spell", e])
log(["error installing deal spell", e])
deal_spell <- Srv.resolve_alias("worker-spell")
log(["resolved existing worker-spell", deal_spell])
otherwise:
deal_spell <- install_deal_spell(deal_id, app_cid, settings)
log(["created deal spell", deal_spell])
catch e:
error <<- Json.stringify(["cannot create deal spell", e.message])
log(["cannot create deal spell, deal join failed", e.message])
else:
error <<- Json.stringify(["error reading worker settings"])
log(["error reading worker settings, deal join failed"])

error <<- Json.stringify(["error installing deal spell", e])
log(["error installing deal spell", e])
catch e:
error <<- Json.stringify(["cannot create deal spell", e.message])
log(["cannot create deal spell, deal join failed", e.message])

<- worker_id, error
10 changes: 5 additions & 5 deletions src/aqua/decider/poll.aqua
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,16 @@ import SpellId from "../types.aqua"
import ChainInfo from "../chain/chain.aqua"
import poll_new_deals from "../chain/new_deals.aqua"
import poll_deal_changes_batch from "../chain/changed_deals.aqua"
import spell_log from "../fluence/spell.aqua"
import get_joined_deals from "../decider/deal_storage.aqua"
import poll_mailbox from "../decider/mailbox.aqua"
import poll_txs from "./register_worker.aqua"
import store_sync_info from "./sync_info.aqua"
import poll_deal_statuses from "../chain/deal_status.aqua"
import WorkerSettings from "./fluence/worker.aqua"

use "../chain/blocks.aqua" as Blocks

func main(spell_id: SpellId, chain: ChainInfo):
func main(spell_id: SpellId, chain: ChainInfo, worker_settings: WorkerSettings):
-- Get the latest block number from chain
latest_block <- Blocks.get_latest(spell_id, chain)

Expand All @@ -27,12 +27,12 @@ func main(spell_id: SpellId, chain: ChainInfo):

try:
-- Find new deals and create workers
poll_new_deals(spell_id, chain, joined_deals, latest_block!)
poll_new_deals(spell_id, chain, joined_deals, latest_block!, worker_settings)

try:
-- Update existing deals
poll_deal_changes_batch(spell_id, chain, joined_deals, latest_block!)
poll_deal_changes_batch(spell_id, chain, joined_deals, latest_block!, worker_settings)

try:
poll_deal_statuses(spell_id, chain.api_endpoint, joined_deals)

Expand Down
20 changes: 1 addition & 19 deletions src/aqua/fluence/worker.aqua
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
module Worker declares WorkerArgs, WorkerSettings, get_worker_settings, is_worker_created, create, remove
module Worker declares WorkerArgs, WorkerSettings, is_worker_created, create, remove

import TriggerConfig, Spell from "@fluencelabs/spell/spell_service.aqua"
import PeerId from "@fluencelabs/aqua-lib/builtin.aqua"
Expand Down Expand Up @@ -27,24 +27,6 @@ data WorkerSettings:
-- IPFS API for the worker to pull info from
ipfs: string

service JsonWorkerSettings("json"):
parse(str: string) -> WorkerSettings

-- Parse worker-settings from the decider's KV
func get_worker_settings(spell_id: SpellId) -> ?WorkerSettings:
Spell spell_id

settings: *WorkerSettings
-- TODO: I want to unite these guys to one structure but it won't be backward-compatible.
settings_str <- get_string(spell_id, "worker_settings")
if settings_str != nil:
try:
settings <<- JsonWorkerSettings.parse(settings_str!)
catch e:
spell_log(spell_id, ["can't parse worker_settings", e, " original string:", settings_str!])

<- settings

-- Check if a worker for a deal is already created
func is_worker_created(spell_id: SpellId, deal_id: DealId) -> bool:
created: *bool
Expand Down

0 comments on commit ff9f826

Please sign in to comment.