
Simple POC for cross-back-end process graph processing #95

Closed
soxofaan opened this issue Mar 8, 2023 · 5 comments

Comments

@soxofaan
Member

soxofaan commented Mar 8, 2023

Follow-up of #53: build a basic POC implementation of a simple system that can split a process graph and execute it on two or more back-ends.

First step: instead of trying to build the whole system in the aggregator right away, it's probably simpler to first build a POC that does the splitting and scheduling client-side.

@m-mohr
Member

m-mohr commented Mar 10, 2023

Wouldn't a POC on the server side be more or less the same amount of work? I assume the code would be very similar, wouldn't it?

I'm also wondering whether we should first check which use case we could use for this. It needs data available on two back-ends, and the same processes, so that we can actually test it. Maybe that's the first step?

@soxofaan
Member Author

> Wouldn't a POC on the server side be more or less the same amount of work? I assume the code would be very similar, wouldn't it?

I'm indeed developing this while keeping in mind that it should port relatively easily from the client side to the aggregator.
The thing is that cross-back-end processing has a scheduling aspect to it: a job can only be started once its data "dependencies" from another back-end (to be loaded with load_result) are available. That's relatively easy to do client-side with a dumb poll-sleep loop. On the back-end you don't want that; instead you build/use a real scheduling system, which is a lot less trivial.
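The "dumb poll-sleep loop" idea can be sketched roughly as follows. This is a simplified illustration, not the actual POC code; the names `run_partitioned`, `start` and `get_state` are hypothetical stand-ins for the real client-side calls:

```python
import time

def run_partitioned(subjobs, dependencies, start, get_state, sleep=30):
    """Dumb poll-sleep scheduler sketch.

    subjobs: list of subjob ids.
    dependencies: subjob id -> list of subjob ids it must wait for.
    start: callback to create/start the batch job for a subjob.
    get_state: callback that polls the upstream back-end for a subjob's state.
    """
    states = {sid: "waiting" for sid in subjobs}
    done = lambda: all(s in ("finished", "error") for s in states.values())
    while not done():
        for sid in subjobs:
            if states[sid] == "waiting":
                deps = dependencies.get(sid, [])
                if all(states[d] == "finished" for d in deps):
                    start(sid)
                    states[sid] = "running"
                elif any(states[d] == "error" for d in deps):
                    # propagate failure to avoid waiting forever on a dead dependency
                    states[sid] = "error"
            elif states[sid] == "running":
                states[sid] = get_state(sid)  # poll upstream back-end
        if not done():
            time.sleep(sleep)
    return states
```

The point of the sketch is that the "scheduling system" is nothing more than a polling loop over subjob states, which is exactly what you would not want to hardcode inside the aggregator.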

> I'm also wondering whether we should first check which use case we could use for this. It needs data available on two back-ends, and the same processes, so that we can actually test it. Maybe that's the first step?

For now, I'm just aiming for a minimal "dummy" use case: load_collection on a couple of upstream back-ends and a merge_cubes (or mask) to combine them.
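Such a minimal dummy use case can be written down as a flat process graph along these lines. The collection id and extents below are placeholders for illustration, not an exact copy of what the POC uses:

```python
# Dummy cross-back-end use case: two load_collection nodes combined with merge_cubes.
process_graph = {
    "lc1": {"process_id": "load_collection",
            "arguments": {"id": "COLLECTION_ON_BACKEND_A",
                          "spatial_extent": {"west": 3, "south": 51, "east": 3.1, "north": 51.1},
                          "temporal_extent": ["2022-09-01", "2022-09-10"]}},
    "lc2": {"process_id": "load_collection",
            "arguments": {"id": "COLLECTION_ON_BACKEND_B",
                          "spatial_extent": {"west": 3, "south": 51, "east": 3.1, "north": 51.1},
                          "temporal_extent": ["2022-09-01", "2022-09-10"]}},
    "mc1": {"process_id": "merge_cubes",
            "arguments": {"cube1": {"from_node": "lc1"},
                          "cube2": {"from_node": "lc2"}}},
    "sr1": {"process_id": "save_result",
            "arguments": {"data": {"from_node": "mc1"}, "format": "NetCDF"},
            "result": True},
}
```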

soxofaan added a commit that referenced this issue Mar 13, 2023
more explicit distinction between processes (pg+metadata) and process graphs (flat graph representation)
soxofaan added a commit that referenced this issue Mar 17, 2023
more explicit distinction between processes (pg+metadata) and process graphs (flat graph representation)
soxofaan added a commit that referenced this issue Mar 20, 2023
more explicit distinction between processes (pg+metadata) and process graphs (flat graph representation)
soxofaan added a commit that referenced this issue Mar 22, 2023
more explicit distinction between processes (pg+metadata) and process graphs (flat graph representation)
soxofaan added a commit to Open-EO/openeo-python-driver that referenced this issue Mar 23, 2023
Triggered from debugging load_result in openeo-geopyspark-driver for Open-EO/openeo-aggregator#95
soxofaan added a commit to Open-EO/openeo-geopyspark-driver that referenced this issue Mar 23, 2023
@soxofaan
Member Author

soxofaan commented Mar 23, 2023

merged first iteration of POC now in master: 3a1ee85

see POC script at https://github.com/Open-EO/openeo-aggregator/blob/3a1ee8582fca9a9e1c9315708ed43680fc23ee1b/scripts/crossbackend-processing-poc.py

  • starts with a simple process graph with two load_collection nodes (at the moment two VITO-based collections, but splitting can still be enforced)
  • the process graph is split into two subjobs (one per load_collection):
    • one, e.g. called "vito:lc2", that isolates one load_collection, directly followed by a save_result
    • the "main" one, which loads the result of the other one with load_result
  • subjobs are scheduled/managed:
    • taking dependencies into account (the "main" job depends on "vito:lc2"): e.g. the process graph of the main job can only be made concrete once an actual batch job has been created for the "vito:lc2" subjob
    • when the "vito:lc2" batch job finishes successfully, the "main" job can be started
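The split step above can be sketched as follows. This is a hypothetical illustration of the idea, not the actual POC code: the helper name `split_out` and the fixed `"sr1"`/GTiff choices are assumptions (and the hardcoded `"sr1"` key would collide if the input graph already used it):

```python
import copy

def split_out(process_graph, node_id, backend_id):
    """Isolate one load_collection node into its own subjob and replace it
    in the "main" graph with a load_result placeholder."""
    # subjob: the isolated node, directly followed by a save_result
    sub = {
        node_id: copy.deepcopy(process_graph[node_id]),
        "sr1": {"process_id": "save_result",
                "arguments": {"data": {"from_node": node_id}, "format": "GTiff"},
                "result": True},
    }
    # main graph: same node id, but now a load_result with a placeholder job id
    # that can only be made concrete once the dependency batch job exists
    main = copy.deepcopy(process_graph)
    main[node_id] = {
        "process_id": "load_result",
        "arguments": {"id": f"_placeholder:{backend_id}:{node_id}"},
    }
    return main, sub
```

Note how the `_placeholder:vito:lc2` argument matches the `resolve_dependencies` log line below, where the placeholder is replaced with a concrete batch job id.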

As an illustration, an excerpt of the logs of a run of this POC script:

INFO:crossbackend-poc:Connecting to openeocloud-dev.vito.be: start 2023-03-23 15:28:38.156842
INFO:crossbackend-poc:Connecting to openeocloud-dev.vito.be: end 2023-03-23 15:29:18.725504, elapsed 0:00:40.568662
Authenticated using refresh token.
INFO:openeo_aggregator.partitionedjobs.crossbackend:Extracted backend usage from `load_collection` nodes: Counter({'vito': 2})
INFO:openeo_aggregator.partitionedjobs.crossbackend:Backend split: primary_backend='vito' secondary_backends=set()
INFO:crossbackend-poc:Partitioned job: PartitionedJob(process={'process_graph': {'lc1': {'process_id': 'load_collection', 'arguments': {'id': 'TERRASCOPE_S2_TOC_V2', 'temporal_extent': ['2022-09-01', '2022-09-10'], 'spatial_extent': {'west': 3, 'south': 51, 'east': 3.1, 'north': 51.1}, 'bands': ['B02', 'B03']}}, 'lc2': {'process_id': 'load_collection', 'arguments': {'id': 'TERRASCOPE_S2_TOC_V2', 'temporal_extent': ['2022-09-01', '2022-09-10'], 'spatial_extent': {'west': 3, 'south': 51, 'east': 3.1, 'north': 51.1}, 'bands': ['B04']}}, 'mc1': {'process_id': 'merge_cubes', 'arguments': {'cube1': {'from_node': 'lc1'}, 'cube2': {'from_node': 'lc2'}}}, 'sr1': {'process_id': 'save_result', 'arguments': {'data': {'from_node': 'mc1'}, 'format': 'NetCDF'}, 'result': True}}}, metadata=None, job_options=None, subjobs={'main': SubJob(process_graph={'lc1': {'process_id': 'load_collection', 'arguments': {'id': 'TERRASCOPE_S2_TOC_V2', 'temporal_extent': ['2022-09-01', '2022-09-10'], 'spatial_extent': {'west': 3, 'south': 51, 'east': 3.1, 'north': 51.1}, 'bands': ['B02', 'B03']}}, 'lc2': {'process_id': 'load_result', 'arguments': {'id': '_placeholder:vito:lc2'}}, 'mc1': {'process_id': 'merge_cubes', 'arguments': {'cube1': {'from_node': 'lc1'}, 'cube2': {'from_node': 'lc2'}}}, 'sr1': {'process_id': 'save_result', 'arguments': {'data': {'from_node': 'mc1'}, 'format': 'NetCDF'}, 'result': True}}, backend_id='vito'), 'vito:lc2': SubJob(process_graph={'lc2': {'process_id': 'load_collection', 'arguments': {'id': 'TERRASCOPE_S2_TOC_V2', 'temporal_extent': ['2022-09-01', '2022-09-10'], 'spatial_extent': {'west': 3, 'south': 51, 'east': 3.1, 'north': 51.1}, 'bands': ['B04']}}, 'sr1': {'process_id': 'save_result', 'arguments': {'data': {'from_node': 'lc2'}, 'format': 'GTiff'}, 'result': True}}, backend_id='vito')}, dependencies={'main': ['vito:lc2']})
INFO:crossbackend-poc:Running partitioned job: start 2023-03-23 15:29:39.400190
INFO:openeo_aggregator.partitionedjobs.crossbackend:subjob dependencies: {'main': ['vito:lc2']}
INFO:openeo_aggregator.partitionedjobs.crossbackend:Initial states: {'main': 'waiting', 'vito:lc2': 'waiting'}
INFO:openeo_aggregator.partitionedjobs.crossbackend:Scheduling loop: step 1, elapsed: 0:00:00.000040
INFO:openeo_aggregator.partitionedjobs.crossbackend:Current state subjob_id='main': waiting
INFO:openeo_aggregator.partitionedjobs.crossbackend:Dependency states for subjob_id='main': {'waiting'}
INFO:openeo_aggregator.partitionedjobs.crossbackend:Current state subjob_id='vito:lc2': waiting
INFO:openeo_aggregator.partitionedjobs.crossbackend:Dependency states for subjob_id='vito:lc2': set()
INFO:openeo_aggregator.partitionedjobs.crossbackend:No unfulfilled dependencies: ready to start subjob_id='vito:lc2'
INFO:openeo_aggregator.partitionedjobs.crossbackend:Starting new batch job for subjob 'vito:lc2' on backend 'vito'
INFO:openeo_aggregator.partitionedjobs.crossbackend:Started batch job 'vito-j-f1126612df78496caab555628ad96ba5' for subjob 'vito:lc2'
INFO:openeo_aggregator.partitionedjobs.crossbackend:Current state overview: states={'main': 'waiting', 'vito:lc2': 'running'} state_stats=Counter({'waiting': 1, 'running': 1}) batch_jobs={'vito:lc2': <BatchJob job_id='vito-j-f1126612df78496caab555628ad96ba5'>}
INFO:openeo_aggregator.partitionedjobs.crossbackend:Going to sleep
INFO:openeo_aggregator.partitionedjobs.crossbackend:Scheduling loop: step 2, elapsed: 0:01:32.512253
INFO:openeo_aggregator.partitionedjobs.crossbackend:Current state subjob_id='main': waiting
INFO:openeo_aggregator.partitionedjobs.crossbackend:Dependency states for subjob_id='main': {'running'}
INFO:openeo_aggregator.partitionedjobs.crossbackend:Current state subjob_id='vito:lc2': running
INFO:openeo_aggregator.partitionedjobs.crossbackend:Upstream status for subjob 'vito:lc2' (batch job 'vito-j-f1126612df78496caab555628ad96ba5'): queued
INFO:openeo_aggregator.partitionedjobs.crossbackend:Current state overview: states={'main': 'waiting', 'vito:lc2': 'running'} state_stats=Counter({'waiting': 1, 'running': 1}) batch_jobs={'vito:lc2': <BatchJob job_id='vito-j-f1126612df78496caab555628ad96ba5'>}
INFO:openeo_aggregator.partitionedjobs.crossbackend:Going to sleep
INFO:openeo_aggregator.partitionedjobs.crossbackend:Scheduling loop: step 3, elapsed: 0:02:04.353245
INFO:openeo_aggregator.partitionedjobs.crossbackend:Current state subjob_id='main': waiting
INFO:openeo_aggregator.partitionedjobs.crossbackend:Dependency states for subjob_id='main': {'running'}
INFO:openeo_aggregator.partitionedjobs.crossbackend:Current state subjob_id='vito:lc2': running
INFO:openeo_aggregator.partitionedjobs.crossbackend:Upstream status for subjob 'vito:lc2' (batch job 'vito-j-f1126612df78496caab555628ad96ba5'): queued
INFO:openeo_aggregator.partitionedjobs.crossbackend:Current state overview: states={'main': 'waiting', 'vito:lc2': 'running'} state_stats=Counter({'waiting': 1, 'running': 1}) batch_jobs={'vito:lc2': <BatchJob job_id='vito-j-f1126612df78496caab555628ad96ba5'>}
INFO:openeo_aggregator.partitionedjobs.crossbackend:Going to sleep
INFO:openeo_aggregator.partitionedjobs.crossbackend:Scheduling loop: step 4, elapsed: 0:03:04.261397
INFO:openeo_aggregator.partitionedjobs.crossbackend:Current state subjob_id='main': waiting
INFO:openeo_aggregator.partitionedjobs.crossbackend:Dependency states for subjob_id='main': {'running'}
INFO:openeo_aggregator.partitionedjobs.crossbackend:Current state subjob_id='vito:lc2': running
INFO:openeo_aggregator.partitionedjobs.crossbackend:Upstream status for subjob 'vito:lc2' (batch job 'vito-j-f1126612df78496caab555628ad96ba5'): finished
INFO:openeo_aggregator.partitionedjobs.crossbackend:Current state overview: states={'main': 'waiting', 'vito:lc2': 'finished'} state_stats=Counter({'waiting': 1, 'finished': 1}) batch_jobs={'vito:lc2': <BatchJob job_id='vito-j-f1126612df78496caab555628ad96ba5'>}
INFO:openeo_aggregator.partitionedjobs.crossbackend:No time to sleep
INFO:openeo_aggregator.partitionedjobs.crossbackend:Scheduling loop: step 5, elapsed: 0:03:04.825470
INFO:openeo_aggregator.partitionedjobs.crossbackend:Current state subjob_id='main': waiting
INFO:openeo_aggregator.partitionedjobs.crossbackend:Dependency states for subjob_id='main': {'finished'}
INFO:openeo_aggregator.partitionedjobs.crossbackend:No unfulfilled dependencies: ready to start subjob_id='main'
INFO:openeo_aggregator.partitionedjobs.crossbackend:resolve_dependencies: replace placeholder 'vito:lc2' with concrete 'vito-j-f1126612df78496caab555628ad96ba5'
INFO:openeo_aggregator.partitionedjobs.crossbackend:Starting new batch job for subjob 'main' on backend 'vito'
INFO:openeo_aggregator.partitionedjobs.crossbackend:Started batch job 'vito-j-05d619897904467ca928d4f42fae17ff' for subjob 'main'
INFO:openeo_aggregator.partitionedjobs.crossbackend:Current state subjob_id='vito:lc2': finished
INFO:openeo_aggregator.partitionedjobs.crossbackend:Current state overview: states={'main': 'running', 'vito:lc2': 'finished'} state_stats=Counter({'running': 1, 'finished': 1}) batch_jobs={'vito:lc2': <BatchJob job_id='vito-j-f1126612df78496caab555628ad96ba5'>, 'main': <BatchJob job_id='vito-j-05d619897904467ca928d4f42fae17ff'>}
INFO:openeo_aggregator.partitionedjobs.crossbackend:Going to sleep
INFO:openeo_aggregator.partitionedjobs.crossbackend:Scheduling loop: step 6, elapsed: 0:04:35.360122
INFO:openeo_aggregator.partitionedjobs.crossbackend:Current state subjob_id='main': running
INFO:openeo_aggregator.partitionedjobs.crossbackend:Upstream status for subjob 'main' (batch job 'vito-j-05d619897904467ca928d4f42fae17ff'): queued
INFO:openeo_aggregator.partitionedjobs.crossbackend:Current state subjob_id='vito:lc2': finished
INFO:openeo_aggregator.partitionedjobs.crossbackend:Current state overview: states={'main': 'running', 'vito:lc2': 'finished'} state_stats=Counter({'running': 1, 'finished': 1}) batch_jobs={'vito:lc2': <BatchJob job_id='vito-j-f1126612df78496caab555628ad96ba5'>, 'main': <BatchJob job_id='vito-j-05d619897904467ca928d4f42fae17ff'>}
INFO:openeo_aggregator.partitionedjobs.crossbackend:Going to sleep
INFO:openeo_aggregator.partitionedjobs.crossbackend:Scheduling loop: step 7, elapsed: 0:05:05.658425
INFO:openeo_aggregator.partitionedjobs.crossbackend:Current state subjob_id='main': running
INFO:openeo_aggregator.partitionedjobs.crossbackend:Upstream status for subjob 'main' (batch job 'vito-j-05d619897904467ca928d4f42fae17ff'): running
INFO:openeo_aggregator.partitionedjobs.crossbackend:Current state subjob_id='vito:lc2': finished
INFO:openeo_aggregator.partitionedjobs.crossbackend:Current state overview: states={'main': 'running', 'vito:lc2': 'finished'} state_stats=Counter({'running': 1, 'finished': 1}) batch_jobs={'vito:lc2': <BatchJob job_id='vito-j-f1126612df78496caab555628ad96ba5'>, 'main': <BatchJob job_id='vito-j-05d619897904467ca928d4f42fae17ff'>}
INFO:openeo_aggregator.partitionedjobs.crossbackend:Going to sleep
INFO:openeo_aggregator.partitionedjobs.crossbackend:Scheduling loop: step 8, elapsed: 0:05:35.932514
INFO:openeo_aggregator.partitionedjobs.crossbackend:Current state subjob_id='main': running
INFO:openeo_aggregator.partitionedjobs.crossbackend:Upstream status for subjob 'main' (batch job 'vito-j-05d619897904467ca928d4f42fae17ff'): running
INFO:openeo_aggregator.partitionedjobs.crossbackend:Current state subjob_id='vito:lc2': finished
INFO:openeo_aggregator.partitionedjobs.crossbackend:Current state overview: states={'main': 'running', 'vito:lc2': 'finished'} state_stats=Counter({'running': 1, 'finished': 1}) batch_jobs={'vito:lc2': <BatchJob job_id='vito-j-f1126612df78496caab555628ad96ba5'>, 'main': <BatchJob job_id='vito-j-05d619897904467ca928d4f42fae17ff'>}
INFO:openeo_aggregator.partitionedjobs.crossbackend:Going to sleep
INFO:openeo_aggregator.partitionedjobs.crossbackend:Scheduling loop: step 9, elapsed: 0:06:07.720188
INFO:openeo_aggregator.partitionedjobs.crossbackend:Current state subjob_id='main': running
INFO:openeo_aggregator.partitionedjobs.crossbackend:Upstream status for subjob 'main' (batch job 'vito-j-05d619897904467ca928d4f42fae17ff'): running
INFO:openeo_aggregator.partitionedjobs.crossbackend:Current state subjob_id='vito:lc2': finished
INFO:openeo_aggregator.partitionedjobs.crossbackend:Current state overview: states={'main': 'running', 'vito:lc2': 'finished'} state_stats=Counter({'running': 1, 'finished': 1}) batch_jobs={'vito:lc2': <BatchJob job_id='vito-j-f1126612df78496caab555628ad96ba5'>, 'main': <BatchJob job_id='vito-j-05d619897904467ca928d4f42fae17ff'>}
INFO:openeo_aggregator.partitionedjobs.crossbackend:Going to sleep
INFO:openeo_aggregator.partitionedjobs.crossbackend:Scheduling loop: step 10, elapsed: 0:06:48.484045
INFO:openeo_aggregator.partitionedjobs.crossbackend:Current state subjob_id='main': running
INFO:openeo_aggregator.partitionedjobs.crossbackend:Upstream status for subjob 'main' (batch job 'vito-j-05d619897904467ca928d4f42fae17ff'): finished
INFO:openeo_aggregator.partitionedjobs.crossbackend:Current state subjob_id='vito:lc2': finished
INFO:openeo_aggregator.partitionedjobs.crossbackend:Current state overview: states={'main': 'finished', 'vito:lc2': 'finished'} state_stats=Counter({'finished': 2}) batch_jobs={'vito:lc2': <BatchJob job_id='vito-j-f1126612df78496caab555628ad96ba5'>, 'main': <BatchJob job_id='vito-j-05d619897904467ca928d4f42fae17ff'>}
INFO:openeo_aggregator.partitionedjobs.crossbackend:Breaking out of loop: all jobs finished successfully.
INFO:crossbackend-poc:Running partitioned job: end 2023-03-23 15:36:41.464959, elapsed 0:07:02.064769
Run info of subjobs: {'main': {'state': 'finished', 'batch_job': <BatchJob job_id='vito-j-05d619897904467ca928d4f42fae17ff'>}, 'vito:lc2': {'state': 'finished', 'batch_job': <BatchJob job_id='vito-j-f1126612df78496caab555628ad96ba5'>}}

Process finished with exit code 0

> Breaking out of loop: all jobs finished successfully.

🎉

soxofaan added a commit to Open-EO/openeo-geopyspark-driver that referenced this issue Mar 24, 2023
soxofaan added a commit to Open-EO/openeo-geopyspark-driver that referenced this issue Mar 24, 2023
soxofaan added a commit to Open-EO/openeo-geopyspark-driver that referenced this issue Mar 24, 2023
@soxofaan
Member Author

I just tried to run a true cross-back-end job, using VITO for the main job and SentinelHub for the dependency subjob, but that is currently blocked because the job result listing of SentinelHub (GET /jobs/{job_id}/results) is not available as a signed URL.
GET /jobs/{job_id}/results lists the assets as signed URLs, but does not have a "canonical" link with a signed-URL version of itself:

```json
{
  "assets": {
    "46ce6f3b-498c-48d7-af5c-d39b42ef1549/31UES_0_3/default.tif": {
      "href": "https://s3.eu-central-1.amazonaws.com/com.sinergi...-d39b42ef1549/31UES_0_3/default.tif?...&X-Amz-Signature=0eaf023b011...",
      "roles": [
        "data"
      ]
    },
    "46ce6f3b-498c-48d7-af5c-d39b42ef1549/31UES_0_4/default.tif": {
      "href": "https://s3.eu-central-1.amazonaws.com/com.sinergi...-d39b42ef1549/31UES_0_4/default.tif?...&X-Amz-Signature=7b40541efa",
      "roles": [
        "data"
      ]
    }
  },
  "geometry": null,
  "id": "d137ea10-3d36-48c3-ae15-008c130c75a9",
  "links": [],
  "properties": {
    "datetime": null
  },
  "stac_version": "0.9.0",
  "type": "Feature"
}
```

Also note that it returns "type": "Feature", while the current load_result implementation of VITO only works with a "type": "Collection" result.

The signed URL problem could be addressed relatively easily in the aggregator, but I think the "Feature" vs "Collection" problem is a lot harder to fix in VITO's load_result.
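An aggregator-side workaround could, for example, prefer a signed "canonical" link from the job results metadata when the back-end provides one, and fall back to the plain results URL otherwise (as would currently happen for SentinelHub). A minimal sketch; `get_canonical_results_url` is a hypothetical helper, not existing aggregator code:

```python
def get_canonical_results_url(results_metadata, plain_url):
    """Pick the signed "canonical" link from STAC job results metadata,
    falling back to the plain (unsigned) results URL if there is none."""
    for link in results_metadata.get("links", []):
        if link.get("rel") == "canonical":
            return link["href"]
    return plain_url
```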

@soxofaan
Member Author

soxofaan commented Apr 4, 2023

Closing this issue for now.

There are some follow-up actions, e.g.:

  • use case with VITO and SentinelHub
  • use case with VITO and EODC

These are blocked at the moment, but it's probably better to tackle them in a next iteration, in follow-up tickets.
