Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix duplicte keys creation #228

Merged
merged 1 commit into from
Jul 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 12 additions & 12 deletions operate/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ def pause_all_services_on_startup() -> None:
for service in service_hashes:
if not operate.service_manager().exists(service=service):
continue
deployment = operate.service_manager().create_or_load(service).deployment
deployment = operate.service_manager().load_or_create(service).deployment
if deployment.status == DeploymentStatus.DELETED:
continue
logger.info(f"stopping service {service}")
Expand Down Expand Up @@ -515,7 +515,7 @@ async def _create_services(request: Request) -> JSONResponse:
old_hash = manager.json[0]["hash"]
if old_hash == template["hash"]:
logger.info(f'Loading service {template["hash"]}')
service = manager.create_or_load(
service = manager.load_or_create(
hash=template["hash"],
rpc=template["configuration"]["rpc"],
on_chain_user_params=services.manage.OnChainUserParams.from_json(
Expand All @@ -536,7 +536,7 @@ async def _create_services(request: Request) -> JSONResponse:
update = True
else:
logger.info(f'Creating service {template["hash"]}')
service = manager.create_or_load(
service = manager.load_or_create(
hash=template["hash"],
rpc=template["configuration"]["rpc"],
on_chain_user_params=services.manage.OnChainUserParams.from_json(
Expand All @@ -553,7 +553,7 @@ async def _create_services(request: Request) -> JSONResponse:
schedule_healthcheck_job(service=service.hash)

return JSONResponse(
content=operate.service_manager().create_or_load(hash=service.hash).json
content=operate.service_manager().load_or_create(hash=service.hash).json
)

@app.put("/api/services")
Expand Down Expand Up @@ -587,7 +587,7 @@ async def _get_service(request: Request) -> JSONResponse:
return JSONResponse(
content=(
operate.service_manager()
.create_or_load(
.load_or_create(
hash=request.path_params["service"],
)
.json
Expand All @@ -611,7 +611,7 @@ async def _deploy_service_onchain(request: Request) -> JSONResponse:
return JSONResponse(
content=(
operate.service_manager()
.create_or_load(hash=request.path_params["service"])
.load_or_create(hash=request.path_params["service"])
.json
)
)
Expand All @@ -636,7 +636,7 @@ async def _stop_service_onchain(request: Request) -> JSONResponse:
return JSONResponse(
content=(
operate.service_manager()
.create_or_load(hash=request.path_params["service"])
.load_or_create(hash=request.path_params["service"])
.json
)
)
Expand All @@ -649,7 +649,7 @@ async def _get_service_deployment(request: Request) -> JSONResponse:
return service_not_found_error(service=request.path_params["service"])
return JSONResponse(
content=operate.service_manager()
.create_or_load(
.load_or_create(
request.path_params["service"],
)
.deployment.json
Expand All @@ -663,7 +663,7 @@ async def _build_service_locally(request: Request) -> JSONResponse:
return service_not_found_error(service=request.path_params["service"])
deployment = (
operate.service_manager()
.create_or_load(
.load_or_create(
request.path_params["service"],
)
.deployment
Expand All @@ -685,7 +685,7 @@ async def _start_service_locally(request: Request) -> JSONResponse:
manager.deploy_service_locally(hash=service, force=True)
schedule_funding_job(service=service)
schedule_healthcheck_job(service=service.hash)
return JSONResponse(content=manager.create_or_load(service).deployment)
return JSONResponse(content=manager.load_or_create(service).deployment)

@app.post("/api/services/{service}/deployment/stop")
@with_retries
Expand All @@ -694,7 +694,7 @@ async def _stop_service_locally(request: Request) -> JSONResponse:
if not operate.service_manager().exists(service=request.path_params["service"]):
return service_not_found_error(service=request.path_params["service"])
service = request.path_params["service"]
deployment = operate.service_manager().create_or_load(service).deployment
deployment = operate.service_manager().load_or_create(service).deployment
deployment.stop()
logger.info(f"Cancelling funding job for {service}")
cancel_funding_job(service=service)
Expand All @@ -709,7 +709,7 @@ async def _delete_service_locally(request: Request) -> JSONResponse:
# TODO: Drain safe before deleting service
deployment = (
operate.service_manager()
.create_or_load(
.load_or_create(
request.path_params["service"],
)
.deployment
Expand Down
59 changes: 29 additions & 30 deletions operate/services/manage.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ def get_eth_safe_tx_builder(self, service: Service) -> EthSafeTxBuilder:
contracts=CONTRACTS[service.ledger_config.chain],
)

def create_or_load(
def load_or_create(
self,
hash: str,
rpc: t.Optional[str] = None,
Expand Down Expand Up @@ -163,14 +163,23 @@ def create_or_load(
"On-chain user parameters cannot be None when creating a new service"
)

return Service.new(
service = Service.new(
hash=hash,
keys=keys or [],
rpc=rpc,
storage=self.path,
on_chain_user_params=on_chain_user_params,
)

if not service.keys:
service.keys = [
self.keys_manager.get(self.keys_manager.create())
for _ in range(service.helper.config.number_of_agents)
]
service.store()

return service

def deploy_service_onchain( # pylint: disable=too-many-statements
self,
hash: str,
Expand All @@ -183,12 +192,9 @@ def deploy_service_onchain( # pylint: disable=too-many-statements
:param update: Update the existing deployment
"""
self.logger.info("Loading service")
service = self.create_or_load(hash=hash)
service = self.load_or_create(hash=hash)
user_params = service.chain_data.user_params
keys = service.keys or [
self.keys_manager.get(self.keys_manager.create())
for _ in range(service.helper.config.number_of_agents)
]
keys = service.keys
instances = [key.address for key in keys]
ocm = self.get_on_chain_manager(service=service)
if user_params.use_staking and not ocm.staking_slots_available(
Expand Down Expand Up @@ -296,7 +302,6 @@ def deploy_service_onchain( # pylint: disable=too-many-statements
),
)
service.chain_data.on_chain_state = OnChainState.REGISTERED
service.keys = keys
service.store()

info = ocm.info(token_id=service.chain_data.token)
Expand All @@ -317,7 +322,6 @@ def deploy_service_onchain( # pylint: disable=too-many-statements
service.store()

info = ocm.info(token_id=service.chain_data.token)
service.keys = keys
service.chain_data = OnChainData(
token=service.chain_data.token,
instances=info["instances"],
Expand All @@ -340,12 +344,9 @@ def deploy_service_onchain_from_safe( # pylint: disable=too-many-statements,too
:param update: Update the existing deployment
"""
self.logger.info("Loading service")
service = self.create_or_load(hash=hash)
service = self.load_or_create(hash=hash)
user_params = service.chain_data.user_params
keys = service.keys or [
self.keys_manager.get(self.keys_manager.create())
for _ in range(service.helper.config.number_of_agents)
]
keys = service.keys
instances = [key.address for key in keys]
wallet = self.wallet_manager.load(service.ledger_config.type)
sftxb = self.get_eth_safe_tx_builder(service=service)
Expand Down Expand Up @@ -536,7 +537,6 @@ def deploy_service_onchain_from_safe( # pylint: disable=too-many-statements,too
)
).settle()
service.chain_data.on_chain_state = OnChainState.REGISTERED
service.keys = keys
service.store()

info = sftxb.info(token_id=service.chain_data.token)
Expand All @@ -554,7 +554,6 @@ def deploy_service_onchain_from_safe( # pylint: disable=too-many-statements,too
service.store()

info = sftxb.info(token_id=service.chain_data.token)
service.keys = keys
service.chain_data = OnChainData(
token=service.chain_data.token,
instances=info["instances"],
Expand All @@ -571,7 +570,7 @@ def terminate_service_on_chain(self, hash: str) -> None:

:param hash: Service hash
"""
service = self.create_or_load(hash=hash)
service = self.load_or_create(hash=hash)
ocm = self.get_on_chain_manager(service=service)
info = ocm.info(token_id=service.chain_data.token)
service.chain_data.on_chain_state = OnChainState(info["service_state"])
Expand All @@ -598,7 +597,7 @@ def terminate_service_on_chain_from_safe(self, hash: str) -> None:

:param hash: Service hash
"""
service = self.create_or_load(hash=hash)
service = self.load_or_create(hash=hash)
sftxb = self.get_eth_safe_tx_builder(service=service)
info = sftxb.info(token_id=service.chain_data.token)
service.chain_data.on_chain_state = OnChainState(info["service_state"])
Expand All @@ -622,7 +621,7 @@ def unbond_service_on_chain(self, hash: str) -> None:

:param hash: Service hash
"""
service = self.create_or_load(hash=hash)
service = self.load_or_create(hash=hash)
ocm = self.get_on_chain_manager(service=service)
info = ocm.info(token_id=service.chain_data.token)
service.chain_data.on_chain_state = OnChainState(info["service_state"])
Expand All @@ -649,7 +648,7 @@ def unbond_service_on_chain_from_safe(self, hash: str) -> None:

:param hash: Service hash
"""
service = self.create_or_load(hash=hash)
service = self.load_or_create(hash=hash)
sftxb = self.get_eth_safe_tx_builder(service=service)
info = sftxb.info(token_id=service.chain_data.token)
service.chain_data.on_chain_state = OnChainState(info["service_state"])
Expand All @@ -673,7 +672,7 @@ def stake_service_on_chain(self, hash: str) -> None:

:param hash: Service hash
"""
service = self.create_or_load(hash=hash)
service = self.load_or_create(hash=hash)
if not service.chain_data.user_params.use_staking:
self.logger.info("Cannot stake service, `use_staking` is set to false")
return
Expand Down Expand Up @@ -718,7 +717,7 @@ def stake_service_on_chain_from_safe(self, hash: str) -> None:

:param hash: Service hash
"""
service = self.create_or_load(hash=hash)
service = self.load_or_create(hash=hash)
if not service.chain_data.user_params.use_staking:
self.logger.info("Cannot stake service, `use_staking` is set to false")
return
Expand Down Expand Up @@ -775,7 +774,7 @@ def unstake_service_on_chain(self, hash: str) -> None:

:param hash: Service hash
"""
service = self.create_or_load(hash=hash)
service = self.load_or_create(hash=hash)
if not service.chain_data.user_params.use_staking:
self.logger.info("Cannot unstake service, `use_staking` is set to false")
return
Expand Down Expand Up @@ -808,7 +807,7 @@ def unstake_service_on_chain_from_safe(self, hash: str) -> None:

:param hash: Service hash
"""
service = self.create_or_load(hash=hash)
service = self.load_or_create(hash=hash)
if not service.chain_data.user_params.use_staking:
self.logger.info("Cannot unstake service, `use_staking` is set to false")
return
Expand Down Expand Up @@ -848,7 +847,7 @@ def fund_service( # pylint: disable=too-many-arguments
from_safe: bool = True,
) -> None:
"""Fund service if required."""
service = self.create_or_load(hash=hash)
service = self.load_or_create(hash=hash)
wallet = self.wallet_manager.load(ledger_type=service.ledger_config.type)
ledger_api = wallet.ledger_api(chain_type=service.ledger_config.chain, rpc=rpc)
agent_fund_threshold = (
Expand Down Expand Up @@ -902,7 +901,7 @@ async def funding_job(
) -> None:
"""Start a background funding job."""
loop = loop or asyncio.get_event_loop()
service = self.create_or_load(hash=hash)
service = self.load_or_create(hash=hash)
with ThreadPoolExecutor() as executor:
while True:
try:
Expand Down Expand Up @@ -957,7 +956,7 @@ def deploy_service_locally(self, hash: str, force: bool = True) -> Deployment:
:param force: Remove previous deployment and start a new one.
:return: Deployment instance
"""
deployment = self.create_or_load(hash=hash).deployment
deployment = self.load_or_create(hash=hash).deployment
deployment.build(force=force)
deployment.start()
return deployment
Expand All @@ -970,7 +969,7 @@ def stop_service_locally(self, hash: str, delete: bool = False) -> Deployment:
:param delete: Delete local deployment.
:return: Deployment instance
"""
deployment = self.create_or_load(hash=hash).deployment
deployment = self.load_or_create(hash=hash).deployment
deployment.stop()
if delete:
deployment.delete()
Expand All @@ -985,7 +984,7 @@ def update_service(
from_safe: bool = True, # pylint: disable=unused-argument
) -> Service:
"""Update a service."""
old_service = self.create_or_load(
old_service = self.load_or_create(
hash=old_hash,
)
# TODO code for updating service commented until safe swap transaction is implemented
Expand Down Expand Up @@ -1032,7 +1031,7 @@ def update_service(
# owner_key=str(self.keys_manager.get(key=owner).private_key), # noqa: E800
# ) # noqa: E800

new_service = self.create_or_load(
new_service = self.load_or_create(
hash=new_hash,
rpc=rpc or old_service.ledger_config.rpc,
on_chain_user_params=on_chain_user_params
Expand Down
Loading