From 01ec2147cc830ec8271f3423e88d82cc23a68609 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nick=20M=C3=BCller?= Date: Tue, 8 Nov 2022 16:54:24 +0100 Subject: [PATCH] Skipping of cached task outputs via execution config (#489) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Implemented skipping of cached task results via execution config CatalogClient.Put can now create or update/overwrite artifacts and their data Signed-off-by: Nick Müller * Renamed SkipCache flag to OverwriteCache Updated to latest released versions of flyteidl and flytestdlib Signed-off-by: Nick Müller * datacatalog client now handles NotFound errors gracefully while overwriting artifact Signed-off-by: Nick Müller * Refactored updating of artifact data into separate Update method of datacatalog client Signed-off-by: Nick Müller * Updated to latest released flyteplugins version Signed-off-by: Nick Müller * Use go 1.18 for CodeQL GHA Signed-off-by: Nick Müller Signed-off-by: Nick Müller --- .github/workflows/codeql-analysis.yml | 4 + go.mod | 26 +-- go.sum | 54 +++-- .../v1alpha1/execution_config.go | 2 + ...ization.multi_images.my_workflow_2_wf.yaml | 1 + ...ontainerization.raw_container.wf_2_wf.yaml | 1 + ...n.use_secrets.my_secret_workflow_2_wf.yaml | 1 + ..._flow.chain_tasks.chain_tasks_wf_2_wf.yaml | 1 + ....control_flow.checkpoint.example_2_wf.yaml | 1 + ...ntrol_flow.conditions.multiplier_2_wf.yaml | 1 + ...rol_flow.conditions.multiplier_2_2_wf.yaml | 1 + ...rol_flow.conditions.multiplier_3_2_wf.yaml | 1 + ...flow.conditions.basic_boolean_wf_2_wf.yaml | 1 + ...ol_flow.conditions.bool_input_wf_2_wf.yaml | 1 + ...low.conditions.nested_conditions_2_wf.yaml | 1 + ..._flow.conditions.consume_outputs_2_wf.yaml | 1 + ...48_core.control_flow.dynamics.wf_2_wf.yaml | 1 + ...ol_flow.map_task.my_map_workflow_2_wf.yaml | 1 + ...ntrol_flow.merge_sort.merge_sort_2_wf.yaml | 1 + ...ntrol_flow.subworkflows.my_subwf_2_wf.yaml | 1 + 
...l_flow.subworkflows.ext_workflow_2_wf.yaml | 1 + ...e.custom_task_plugin.my_workflow_2_wf.yaml | 1 + ...ore.extend_flyte.custom_types.wf_2_wf.yaml | 1 + ...lyte_basics.basic_workflow.my_wf_2_wf.yaml | 1 + ...flyte_basics.decorating_tasks.wf_2_wf.yaml | 1 + ...e_basics.decorating_workflows.wf_2_wf.yaml | 1 + ...mented_workflow.sphinx_docstring_2_wf.yaml | 1 + ...umented_workflow.numpy_docstring_2_wf.yaml | 1 + ...mented_workflow.google_docstring_2_wf.yaml | 1 + ..._basics.files.normalize_csv_file_2_wf.yaml | 1 + ...download_and_normalize_csv_files_2_wf.yaml | 1 + ...e.flyte_basics.hello_world.my_wf_2_wf.yaml | 1 + ...7_my.imperative.workflow.example_2_wf.yaml | 1 + .../120_core.flyte_basics.lp.my_wf_2_wf.yaml | 1 + ...25_core.flyte_basics.lp.go_greet_2_wf.yaml | 1 + ...flyte_basics.named_outputs.my_wf_2_wf.yaml | 1 + ..._core.flyte_basics.shell_task.wf_2_wf.yaml | 1 + ...s.task_cache.cached_dataframe_wf_2_wf.yaml | 1 + ...s.lp_schedules.date_formatter_wf_2_wf.yaml | 1 + ...rkflows.lp_schedules.positive_wf_2_wf.yaml | 1 + ...re.type_system.custom_objects.wf_2_wf.yaml | 1 + ...6_core.type_system.enums.enum_wf_2_wf.yaml | 1 + ...type_system.flyte_pickle.welcome_2_wf.yaml | 1 + ...73_core.type_system.schema.df_wf_2_wf.yaml | 1 + ..._dataset.pandas_compatibility_wf_2_wf.yaml | 1 + ..._dataset.schema_compatibility_wf_2_wf.yaml | 1 + ...core.type_system.typed_schema.wf_2_wf.yaml | 1 + .../nodes/dynamic/dynamic_workflow_test.go | 2 +- .../task/catalog/datacatalog/datacatalog.go | 194 ++++++++++++----- .../catalog/datacatalog/datacatalog_test.go | 202 +++++++++++++++++- .../nodes/task/catalog/noop_catalog.go | 8 + pkg/controller/nodes/task/handler.go | 64 +++--- pkg/controller/nodes/task/handler_test.go | 90 +++++++- .../nodes/task/pre_post_execution.go | 18 +- pkg/utils/failing_datastore.go | 4 + pkg/utils/failing_datastore_test.go | 2 + 56 files changed, 585 insertions(+), 128 deletions(-) diff --git a/.github/workflows/codeql-analysis.yml 
b/.github/workflows/codeql-analysis.yml index fd6b11af7..af39d7554 100644 --- a/.github/workflows/codeql-analysis.yml +++ b/.github/workflows/codeql-analysis.yml @@ -40,6 +40,10 @@ jobs: - name: Checkout repository uses: actions/checkout@v2 + - uses: actions/setup-go@v2 + with: + go-version: '1.18' + # Initializes the CodeQL tools for scanning. - name: Initialize CodeQL uses: github/codeql-action/init@v1 diff --git a/go.mod b/go.mod index 5884c1c35..f3ca59247 100644 --- a/go.mod +++ b/go.mod @@ -6,9 +6,9 @@ require ( github.com/DiSiqueira/GoTree v1.0.1-0.20180907134536-53a8e837f295 github.com/benlaurie/objecthash v0.0.0-20180202135721-d1e3d6079fc1 github.com/fatih/color v1.13.0 - github.com/flyteorg/flyteidl v1.1.19 - github.com/flyteorg/flyteplugins v1.0.17 - github.com/flyteorg/flytestdlib v1.0.5 + github.com/flyteorg/flyteidl v1.2.3 + github.com/flyteorg/flyteplugins v1.0.18 + github.com/flyteorg/flytestdlib v1.0.11 github.com/ghodss/yaml v1.0.0 github.com/go-redis/redis v6.15.7+incompatible github.com/go-test/deep v1.0.7 @@ -26,7 +26,7 @@ require ( github.com/spf13/pflag v1.0.5 github.com/stretchr/testify v1.7.2 golang.org/x/exp v0.0.0-20220722155223-a9213eeb770e - golang.org/x/sync v0.0.0-20210220032951-036812b2e83c + golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4 golang.org/x/time v0.0.0-20220210224613-90d013bbcef8 google.golang.org/grpc v1.46.0 google.golang.org/protobuf v1.28.0 @@ -43,12 +43,12 @@ require ( cloud.google.com/go/compute v1.6.1 // indirect cloud.google.com/go/iam v0.3.0 // indirect cloud.google.com/go/storage v1.22.0 // indirect - github.com/Azure/azure-sdk-for-go v62.3.0+incompatible // indirect - github.com/Azure/azure-sdk-for-go/sdk/azcore v0.21.1 // indirect - github.com/Azure/azure-sdk-for-go/sdk/internal v0.8.3 // indirect - github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v0.3.0 // indirect + github.com/Azure/azure-sdk-for-go v63.4.0+incompatible // indirect + github.com/Azure/azure-sdk-for-go/sdk/azcore v0.23.1 // indirect + 
github.com/Azure/azure-sdk-for-go/sdk/internal v0.9.2 // indirect + github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v0.4.0 // indirect github.com/Azure/go-autorest v14.2.0+incompatible // indirect - github.com/Azure/go-autorest/autorest v0.11.25 // indirect + github.com/Azure/go-autorest/autorest v0.11.27 // indirect github.com/Azure/go-autorest/autorest/adal v0.9.18 // indirect github.com/Azure/go-autorest/autorest/date v0.3.0 // indirect github.com/Azure/go-autorest/logger v0.2.1 // indirect @@ -74,7 +74,7 @@ require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/emicklei/go-restful v2.9.6+incompatible // indirect github.com/evanphx/json-patch v4.12.0+incompatible // indirect - github.com/flyteorg/stow v0.3.4 // indirect + github.com/flyteorg/stow v0.3.6 // indirect github.com/fsnotify/fsnotify v1.5.1 // indirect github.com/go-logr/logr v1.2.3 // indirect github.com/go-openapi/jsonpointer v0.19.5 // indirect @@ -82,7 +82,7 @@ require ( github.com/go-openapi/swag v0.19.14 // indirect github.com/gofrs/uuid v4.2.0+incompatible // indirect github.com/gogo/protobuf v1.3.2 // indirect - github.com/golang-jwt/jwt/v4 v4.2.0 // indirect + github.com/golang-jwt/jwt/v4 v4.4.1 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/google/gnostic v0.5.7-v3refs // indirect github.com/google/go-cmp v0.5.8 // indirect @@ -122,9 +122,9 @@ require ( github.com/subosito/gotenv v1.2.0 // indirect go.opencensus.io v0.23.0 // indirect golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e // indirect - golang.org/x/net v0.0.0-20220607020251-c690dde0001d // indirect + golang.org/x/net v0.0.0-20220722155237-a158d28d115b // indirect golang.org/x/oauth2 v0.0.0-20220411215720-9780585627b5 // indirect - golang.org/x/sys v0.0.0-20220608164250-635b8c9b7f68 // indirect + golang.org/x/sys v0.0.0-20220829200755-d48e67d00261 // indirect golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 // indirect golang.org/x/text v0.3.7 // indirect 
golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f // indirect diff --git a/go.sum b/go.sum index 0c7408adf..f5034b122 100644 --- a/go.sum +++ b/go.sum @@ -63,14 +63,19 @@ cloud.google.com/go/storage v1.14.0/go.mod h1:GrKmX003DSIwi9o29oFT7YDnHYwZoctc3f cloud.google.com/go/storage v1.22.0 h1:NUV0NNp9nkBuW66BFRLuMgldN60C57ET3dhbwLIYio8= cloud.google.com/go/storage v1.22.0/go.mod h1:GbaLEoMqbVm6sx3Z0R++gSiBlgMv6yUi2q1DeGFKQgE= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= -github.com/Azure/azure-sdk-for-go v62.3.0+incompatible h1:Ctfsn9UoA/BB4HMYQlbPPgNXdX0tZ4tmb85+KFb2+RE= github.com/Azure/azure-sdk-for-go v62.3.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc= -github.com/Azure/azure-sdk-for-go/sdk/azcore v0.21.1 h1:qoVeMsc9/fh/yhxVaA0obYjVH/oI/ihrOoMwsLS9KSA= +github.com/Azure/azure-sdk-for-go v63.4.0+incompatible h1:fle3M5Q7vr8auaiPffKyUQmLbvYeqpw30bKU6PrWJFo= +github.com/Azure/azure-sdk-for-go v63.4.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc= github.com/Azure/azure-sdk-for-go/sdk/azcore v0.21.1/go.mod h1:fBF9PQNqB8scdgpZ3ufzaLntG0AG7C1WjPMsiFOmfHM= -github.com/Azure/azure-sdk-for-go/sdk/internal v0.8.3 h1:E+m3SkZCN0Bf5q7YdTs5lSm2CYY3CK4spn5OmUIiQtk= +github.com/Azure/azure-sdk-for-go/sdk/azcore v0.23.1 h1:3CVsSo4mp8NDWO11tHzN/mdo2zP0CtaSK5IcwBjfqRA= +github.com/Azure/azure-sdk-for-go/sdk/azcore v0.23.1/go.mod h1:w5pDIZuawUmY3Bj4tVx3Xb8KS96ToB0j315w9rqpAg0= +github.com/Azure/azure-sdk-for-go/sdk/azidentity v0.14.0 h1:NVS/4LOQfkBpk+B1VopIzv1ptmYeEskA8w/3K/w7vjo= github.com/Azure/azure-sdk-for-go/sdk/internal v0.8.3/go.mod h1:KLF4gFr6DcKFZwSuH8w8yEK6DpFl3LP5rhdvAb7Yz5I= -github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v0.3.0 h1:Px2UA+2RvSSvv+RvJNuUB6n7rs5Wsel4dXLe90Um2n4= +github.com/Azure/azure-sdk-for-go/sdk/internal v0.9.2 h1:Px2KVERcYEg2Lv25AqC2hVr0xUWaq94wuEObLIkYzmA= +github.com/Azure/azure-sdk-for-go/sdk/internal v0.9.2/go.mod 
h1:CdSJQNNzZhCkwDaV27XV1w48ZBPtxe7mlrZAsPNxD5g= github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v0.3.0/go.mod h1:tPaiy8S5bQ+S5sOiDlINkp7+Ef339+Nz5L5XO+cnOHo= +github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v0.4.0 h1:0nJeKDmB7a1a8RDMjTltahlPsaNlWjq/LpkZleSwINk= +github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v0.4.0/go.mod h1:mbwxKc/fW+IkF0GG591MuXw0KuEQBDkeRoZ9vmVJPxg= github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78/go.mod h1:LmzpDX56iTiv29bbRTIsUNlaFfuhWRQBWjQdVyAevI8= github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E= github.com/Azure/go-autorest v14.2.0+incompatible h1:V5VMDjClD3GiElqLWO7mz2MxNAK/vTfRHdAubSIPRgs= @@ -79,8 +84,8 @@ github.com/Azure/go-autorest/autorest v0.9.0/go.mod h1:xyHB1BMZT0cuDHU7I0+g046+B github.com/Azure/go-autorest/autorest v0.11.12/go.mod h1:eipySxLmqSyC5s5k1CLupqet0PSENBEDP93LQ9a8QYw= github.com/Azure/go-autorest/autorest v0.11.17/go.mod h1:eipySxLmqSyC5s5k1CLupqet0PSENBEDP93LQ9a8QYw= github.com/Azure/go-autorest/autorest v0.11.18/go.mod h1:dSiJPy22c3u0OtOKDNttNgqpNFY/GeWa7GH/Pz56QRA= -github.com/Azure/go-autorest/autorest v0.11.25 h1:yp+V8DGur2aIUE87ebP8twPLz6k68jtJTlg61mEoByA= -github.com/Azure/go-autorest/autorest v0.11.25/go.mod h1:7l8ybrIdUmGqZMTD0sRtAr8NvbHjfofbf8RSP2q7w7U= +github.com/Azure/go-autorest/autorest v0.11.27 h1:F3R3q42aWytozkV8ihzcgMO4OA4cuqr3bNlsEuF6//A= +github.com/Azure/go-autorest/autorest v0.11.27/go.mod h1:7l8ybrIdUmGqZMTD0sRtAr8NvbHjfofbf8RSP2q7w7U= github.com/Azure/go-autorest/autorest/adal v0.5.0/go.mod h1:8Z9fGy2MpX0PvDjB1pEgQTmVqjGhiHBW7RJJEciWzS0= github.com/Azure/go-autorest/autorest/adal v0.9.5/go.mod h1:B7KF7jKIeC9Mct5spmyCB/A8CG/sEz1vwIRGv/bbw7A= github.com/Azure/go-autorest/autorest/adal v0.9.10/go.mod h1:B7KF7jKIeC9Mct5spmyCB/A8CG/sEz1vwIRGv/bbw7A= @@ -104,6 +109,7 @@ github.com/Azure/go-autorest/logger v0.2.1/go.mod h1:T9E3cAhj2VqvPOtCYAvby9aBXkZ github.com/Azure/go-autorest/tracing v0.5.0/go.mod 
h1:r/s2XiOKccPW3HrqB+W0TQzfbtp2fGCgRFtBroKn4Dk= github.com/Azure/go-autorest/tracing v0.6.0 h1:TYi4+3m5t6K48TGI9AUdb+IzbnSxvnvUMfuitfgcfuo= github.com/Azure/go-autorest/tracing v0.6.0/go.mod h1:+vhtPC754Xsa23ID7GlGsrdKBpUA79WCAKPPZVC2DeU= +github.com/AzureAD/microsoft-authentication-library-for-go v0.4.0 h1:WVsrXCnHlDDX8ls+tootqRE87/hL9S/g4ewig9RsD/c= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/DiSiqueira/GoTree v1.0.1-0.20180907134536-53a8e837f295 h1:xJ0dAkuxJXfwdH7IaSzBEbSQxEDz36YUmt7+CB4zoNA= @@ -292,16 +298,16 @@ github.com/fatih/color v1.13.0 h1:8LOYc1KYPPmyKMuN8QV2DNRWNbLo6LZ0iLs8+mlH53w= github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk= github.com/fatih/structtag v1.2.0/go.mod h1:mBJUNpUnHmRKrKlQQlmCrh5PuhftFbNv8Ys4/aAZl94= github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= -github.com/flyteorg/flyteidl v1.1.19 h1:1CtSbuFhFHwUbKdv66PqbcER01iacAJU+snh0eTsXc4= -github.com/flyteorg/flyteidl v1.1.19/go.mod h1:SLTYz2JgIKvM5MbPVlMP7uILb65fnuuZQZFHHIEYh2U= -github.com/flyteorg/flyteplugins v1.0.17 h1:yC44tbVb0RTsul4+loiz/OEAMtfTPuWlzFK/lGknM2U= -github.com/flyteorg/flyteplugins v1.0.17/go.mod h1:GfbmRByI/rSatm/Epoj3bNyrXwIQ9NOXTVwLS6Z0p84= +github.com/flyteorg/flyteidl v1.2.3 h1:4A90rFyGXiUtFnQIgSPxPzBZRy9RoAPsfxs7OWYHfFA= +github.com/flyteorg/flyteidl v1.2.3/go.mod h1:f0AFl7RFycH7+JLq2th0ReH7v+Xse+QTw4jGdIxiS8I= +github.com/flyteorg/flyteplugins v1.0.18 h1:DOyxAFaS4luv7H9XRKUpHbO09imsG4LP8Du515FGXyM= +github.com/flyteorg/flyteplugins v1.0.18/go.mod h1:ZbZVBxEWh8Icj1AgfNKg0uPzHHGd9twa4eWcY2Yt6xE= github.com/flyteorg/flytestdlib v1.0.0/go.mod h1:QSVN5wIM1lM9d60eAEbX7NwweQXW96t5x4jbyftn89c= -github.com/flyteorg/flytestdlib v1.0.5 h1:80A/vfpAJl+pgU6vxccbsYApZPrvyGhOIsCAFngsjnk= -github.com/flyteorg/flytestdlib v1.0.5/go.mod 
h1:WTe0k3DmmrKFjj3hwiIbjjdCK89X63MBzBbXhQ4Yxf0= +github.com/flyteorg/flytestdlib v1.0.11 h1:f7B8x2/zMuimEVi4Jx0zqzvNhdi7aq7+ZWoqHsbp4F4= +github.com/flyteorg/flytestdlib v1.0.11/go.mod h1:nIBmBHtjTJvhZEn3e/EwVC/iMkR2tUX8hEiXjRBpH/s= github.com/flyteorg/stow v0.3.3/go.mod h1:HBld7ud0i4khMHwJjkO8v+NSP7ddKa/ruhf4I8fliaA= -github.com/flyteorg/stow v0.3.4 h1:gJVz1LCcEQ5ESWoedRxKh4uUv/V/c1eYLVAQVy07PPY= -github.com/flyteorg/stow v0.3.4/go.mod h1:2T2f6KaIWoWCLgI6EFZQgjb83Vg5SolmBwc2O06WQU4= +github.com/flyteorg/stow v0.3.6 h1:jt50ciM14qhKBaIrB+ppXXY+SXB59FNREFgTJqCyqIk= +github.com/flyteorg/stow v0.3.6/go.mod h1:5dfBitPM004dwaZdoVylVjxFT4GWAgI0ghAndhNUzCo= github.com/form3tech-oss/jwt-go v3.2.2+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k= github.com/form3tech-oss/jwt-go v3.2.3+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k= github.com/franela/goblin v0.0.0-20200105215937-c9ffbefa60db/go.mod h1:7dvUGVsVBjqR7JHJk0brhHOZYGmfBYOrK0ZhYMEtBr4= @@ -404,9 +410,11 @@ github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zV github.com/gogo/protobuf v1.3.1/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= +github.com/golang-jwt/jwt v3.2.1+incompatible h1:73Z+4BJcrTC+KczS6WvTPvRGOp1WmfEP4Q1lOd9Z/+c= github.com/golang-jwt/jwt/v4 v4.0.0/go.mod h1:/xlHOz8bRuivTWchD4jCa+NbatV+wEUSzwAxVc6locg= -github.com/golang-jwt/jwt/v4 v4.2.0 h1:besgBTC8w8HjP6NzQdxwKH9Z5oQMZ24ThTrHp3cZ8eU= github.com/golang-jwt/jwt/v4 v4.2.0/go.mod h1:/xlHOz8bRuivTWchD4jCa+NbatV+wEUSzwAxVc6locg= +github.com/golang-jwt/jwt/v4 v4.4.1 h1:pC5DB52sCeK48Wlb9oPcdhnjkz1TKt1D/P7WKJ0kUcQ= +github.com/golang-jwt/jwt/v4 v4.4.1/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= 
github.com/golang/glog v1.0.0/go.mod h1:EWib/APOK0SL3dFbYqvxE3UYd8E6s1ouQ7iEp/0LWV4= github.com/golang/groupcache v0.0.0-20160516000752-02826c3e7903/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= @@ -575,6 +583,8 @@ github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NH github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= github.com/influxdata/influxdb1-client v0.0.0-20191209144304-8bf82d3c094d/go.mod h1:qj24IKcXYK6Iy9ceXlo3Tc+vtHo9lIhSX5JddghvEPo= github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= +github.com/jinzhu/copier v0.3.5 h1:GlvfUwHk62RokgqVNvYsku0TATCF7bAHVwEXoBh3iJg= +github.com/jinzhu/copier v0.3.5/go.mod h1:DfbEm0FYsaqBcKcFuvmOZb218JkPGtvSHsKg8S8hyyg= github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc= github.com/jinzhu/now v1.1.3/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8= github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k= @@ -620,6 +630,7 @@ github.com/kubeflow/common v0.4.3 h1:vVoOMNPOZK4wzZvQ4rsRLvC3SDi+J1fVKNHSXC/QRvU github.com/kubeflow/common v0.4.3/go.mod h1:Qb/5aON7/OWVkN8OnjRqqT0i8X/XzMekRIZ8lkLosj4= github.com/kubeflow/training-operator v1.5.0-rc.0 h1:MaxbG80SYpIbDG63tSiwav4OXczrSFA5AFnaQavzgbw= github.com/kubeflow/training-operator v1.5.0-rc.0/go.mod h1:xgcu/ZI/RwKbTvYgzU7ZWFpxbsefSey5We3KmKroALY= +github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= github.com/lightstep/lightstep-tracer-common/golang/gogo v0.0.0-20190605223551-bc2310a04743/go.mod h1:qklhhLq1aX+mtWk9cPHPzaBjWImj5ULL6C7HFJtXQMM= github.com/lightstep/lightstep-tracer-go v0.18.1/go.mod h1:jlF1pusYV4pidLvZ+XD0UBX0ZE6WURAspgAczcDHrL4= github.com/lyft/protoc-gen-validate v0.0.13/go.mod h1:XbGvPuh87YZc5TdIa2/I4pLk0QoUACkjt2znoq26NVQ= @@ -1067,8 +1078,8 @@ golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod 
h1:CfG3xpIq0wQ8r1q4Su golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/net v0.0.0-20220325170049-de3da57026de/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/net v0.0.0-20220412020605-290c469a71a5/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= -golang.org/x/net v0.0.0-20220607020251-c690dde0001d h1:4SFsTMi4UahlKoloni7L4eYzhFRifURQLw+yv0QDCx8= -golang.org/x/net v0.0.0-20220607020251-c690dde0001d/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= +golang.org/x/net v0.0.0-20220722155237-a158d28d115b h1:PxfKdU9lEEDYjdIzOtC4qFWgkU2rGHdKlKowJSMN9h0= +golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -1100,8 +1111,9 @@ golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4 h1:uVc8UZUe6tr40fFVnUP5Oj+veunVezqYl9z7DYw9xzw= +golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20170830134202-bb24a47a89ea/go.mod 
h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -1195,8 +1207,8 @@ golang.org/x/sys v0.0.0-20220209214540-3681064d5158/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220227234510-4e6760a101f9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220328115105-d36c6a25d886/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220412211240-33da011f77ad/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220608164250-635b8c9b7f68 h1:z8Hj/bl9cOV2grsOpEaQFUaly0JWN3i97mo3jXKJNp0= -golang.org/x/sys v0.0.0-20220608164250-635b8c9b7f68/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220829200755-d48e67d00261 h1:v6hYoSR9T5oet+pMXwUWkbiVqx/63mlHjefrHmxwfeY= +golang.org/x/sys v0.0.0-20220829200755-d48e67d00261/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 h1:JGgROgKl9N8DuW20oFS5gxc+lE67/N3FcwmBPMe7ArY= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= @@ -1295,7 +1307,7 @@ golang.org/x/tools v0.1.4/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.10-0.20220218145154-897bd77cd717/go.mod h1:Uh6Zz+xoGYZom868N8YTex3t7RhtHDBrE8Gzo9bV56E= golang.org/x/tools v0.1.10/go.mod h1:Uh6Zz+xoGYZom868N8YTex3t7RhtHDBrE8Gzo9bV56E= -golang.org/x/tools v0.1.11 h1:loJ25fNOEhSXfHrpoGj91eCUThwdNX6u24rO1xnNteY= +golang.org/x/tools v0.1.12 h1:VveCTK38A2rkS8ZqFY25HIDFscX5X9OoEhJd3quQmXU= golang.org/x/xerrors 
v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/pkg/apis/flyteworkflow/v1alpha1/execution_config.go b/pkg/apis/flyteworkflow/v1alpha1/execution_config.go index 97cd87336..878df4768 100644 --- a/pkg/apis/flyteworkflow/v1alpha1/execution_config.go +++ b/pkg/apis/flyteworkflow/v1alpha1/execution_config.go @@ -30,6 +30,8 @@ type ExecutionConfig struct { TaskResources TaskResources // Defines whether a workflow has been flagged as interruptible. Interruptible *bool + // Defines whether a workflow should skip all its cached results and re-compute its output, overwriting any already stored data. + OverwriteCache bool } type TaskPluginOverride struct { diff --git a/pkg/compiler/test/testdata/snacks-core/k8s/002_core.containerization.multi_images.my_workflow_2_wf.yaml b/pkg/compiler/test/testdata/snacks-core/k8s/002_core.containerization.multi_images.my_workflow_2_wf.yaml index 018b5eb18..98ea1f620 100755 --- a/pkg/compiler/test/testdata/snacks-core/k8s/002_core.containerization.multi_images.my_workflow_2_wf.yaml +++ b/pkg/compiler/test/testdata/snacks-core/k8s/002_core.containerization.multi_images.my_workflow_2_wf.yaml @@ -2,6 +2,7 @@ apiVersion: flyte.lyft.com/v1alpha1 executionConfig: Interruptible: null MaxParallelism: 0 + OverwriteCache: false RecoveryExecution: {} TaskPluginImpls: null TaskResources: diff --git a/pkg/compiler/test/testdata/snacks-core/k8s/010_core.containerization.raw_container.wf_2_wf.yaml b/pkg/compiler/test/testdata/snacks-core/k8s/010_core.containerization.raw_container.wf_2_wf.yaml index 53d3aa7d5..9a00ffdff 100755 --- a/pkg/compiler/test/testdata/snacks-core/k8s/010_core.containerization.raw_container.wf_2_wf.yaml +++ 
b/pkg/compiler/test/testdata/snacks-core/k8s/010_core.containerization.raw_container.wf_2_wf.yaml @@ -2,6 +2,7 @@ apiVersion: flyte.lyft.com/v1alpha1 executionConfig: Interruptible: null MaxParallelism: 0 + OverwriteCache: false RecoveryExecution: {} TaskPluginImpls: null TaskResources: diff --git a/pkg/compiler/test/testdata/snacks-core/k8s/015_core.containerization.use_secrets.my_secret_workflow_2_wf.yaml b/pkg/compiler/test/testdata/snacks-core/k8s/015_core.containerization.use_secrets.my_secret_workflow_2_wf.yaml index 29bc1b6b9..0d33b2aad 100755 --- a/pkg/compiler/test/testdata/snacks-core/k8s/015_core.containerization.use_secrets.my_secret_workflow_2_wf.yaml +++ b/pkg/compiler/test/testdata/snacks-core/k8s/015_core.containerization.use_secrets.my_secret_workflow_2_wf.yaml @@ -2,6 +2,7 @@ apiVersion: flyte.lyft.com/v1alpha1 executionConfig: Interruptible: null MaxParallelism: 0 + OverwriteCache: false RecoveryExecution: {} TaskPluginImpls: null TaskResources: diff --git a/pkg/compiler/test/testdata/snacks-core/k8s/019_core.control_flow.chain_tasks.chain_tasks_wf_2_wf.yaml b/pkg/compiler/test/testdata/snacks-core/k8s/019_core.control_flow.chain_tasks.chain_tasks_wf_2_wf.yaml index c30ffc272..7efbcf697 100755 --- a/pkg/compiler/test/testdata/snacks-core/k8s/019_core.control_flow.chain_tasks.chain_tasks_wf_2_wf.yaml +++ b/pkg/compiler/test/testdata/snacks-core/k8s/019_core.control_flow.chain_tasks.chain_tasks_wf_2_wf.yaml @@ -2,6 +2,7 @@ apiVersion: flyte.lyft.com/v1alpha1 executionConfig: Interruptible: null MaxParallelism: 0 + OverwriteCache: false RecoveryExecution: {} TaskPluginImpls: null TaskResources: diff --git a/pkg/compiler/test/testdata/snacks-core/k8s/022_core.control_flow.checkpoint.example_2_wf.yaml b/pkg/compiler/test/testdata/snacks-core/k8s/022_core.control_flow.checkpoint.example_2_wf.yaml index f0b17af3b..36abd7d14 100755 --- a/pkg/compiler/test/testdata/snacks-core/k8s/022_core.control_flow.checkpoint.example_2_wf.yaml +++ 
b/pkg/compiler/test/testdata/snacks-core/k8s/022_core.control_flow.checkpoint.example_2_wf.yaml @@ -2,6 +2,7 @@ apiVersion: flyte.lyft.com/v1alpha1 executionConfig: Interruptible: null MaxParallelism: 0 + OverwriteCache: false RecoveryExecution: {} TaskPluginImpls: null TaskResources: diff --git a/pkg/compiler/test/testdata/snacks-core/k8s/026_core.control_flow.conditions.multiplier_2_wf.yaml b/pkg/compiler/test/testdata/snacks-core/k8s/026_core.control_flow.conditions.multiplier_2_wf.yaml index 8e959bd0c..af54d19a9 100755 --- a/pkg/compiler/test/testdata/snacks-core/k8s/026_core.control_flow.conditions.multiplier_2_wf.yaml +++ b/pkg/compiler/test/testdata/snacks-core/k8s/026_core.control_flow.conditions.multiplier_2_wf.yaml @@ -2,6 +2,7 @@ apiVersion: flyte.lyft.com/v1alpha1 executionConfig: Interruptible: null MaxParallelism: 0 + OverwriteCache: false RecoveryExecution: {} TaskPluginImpls: null TaskResources: diff --git a/pkg/compiler/test/testdata/snacks-core/k8s/028_core.control_flow.conditions.multiplier_2_2_wf.yaml b/pkg/compiler/test/testdata/snacks-core/k8s/028_core.control_flow.conditions.multiplier_2_2_wf.yaml index 7b94c0b89..57b143920 100755 --- a/pkg/compiler/test/testdata/snacks-core/k8s/028_core.control_flow.conditions.multiplier_2_2_wf.yaml +++ b/pkg/compiler/test/testdata/snacks-core/k8s/028_core.control_flow.conditions.multiplier_2_2_wf.yaml @@ -2,6 +2,7 @@ apiVersion: flyte.lyft.com/v1alpha1 executionConfig: Interruptible: null MaxParallelism: 0 + OverwriteCache: false RecoveryExecution: {} TaskPluginImpls: null TaskResources: diff --git a/pkg/compiler/test/testdata/snacks-core/k8s/030_core.control_flow.conditions.multiplier_3_2_wf.yaml b/pkg/compiler/test/testdata/snacks-core/k8s/030_core.control_flow.conditions.multiplier_3_2_wf.yaml index eb37b0a97..27dd0868b 100755 --- a/pkg/compiler/test/testdata/snacks-core/k8s/030_core.control_flow.conditions.multiplier_3_2_wf.yaml +++ 
b/pkg/compiler/test/testdata/snacks-core/k8s/030_core.control_flow.conditions.multiplier_3_2_wf.yaml @@ -2,6 +2,7 @@ apiVersion: flyte.lyft.com/v1alpha1 executionConfig: Interruptible: null MaxParallelism: 0 + OverwriteCache: false RecoveryExecution: {} TaskPluginImpls: null TaskResources: diff --git a/pkg/compiler/test/testdata/snacks-core/k8s/035_core.control_flow.conditions.basic_boolean_wf_2_wf.yaml b/pkg/compiler/test/testdata/snacks-core/k8s/035_core.control_flow.conditions.basic_boolean_wf_2_wf.yaml index 03836a7dc..45e222160 100755 --- a/pkg/compiler/test/testdata/snacks-core/k8s/035_core.control_flow.conditions.basic_boolean_wf_2_wf.yaml +++ b/pkg/compiler/test/testdata/snacks-core/k8s/035_core.control_flow.conditions.basic_boolean_wf_2_wf.yaml @@ -2,6 +2,7 @@ apiVersion: flyte.lyft.com/v1alpha1 executionConfig: Interruptible: null MaxParallelism: 0 + OverwriteCache: false RecoveryExecution: {} TaskPluginImpls: null TaskResources: diff --git a/pkg/compiler/test/testdata/snacks-core/k8s/037_core.control_flow.conditions.bool_input_wf_2_wf.yaml b/pkg/compiler/test/testdata/snacks-core/k8s/037_core.control_flow.conditions.bool_input_wf_2_wf.yaml index c3854d679..cfc31d71e 100755 --- a/pkg/compiler/test/testdata/snacks-core/k8s/037_core.control_flow.conditions.bool_input_wf_2_wf.yaml +++ b/pkg/compiler/test/testdata/snacks-core/k8s/037_core.control_flow.conditions.bool_input_wf_2_wf.yaml @@ -2,6 +2,7 @@ apiVersion: flyte.lyft.com/v1alpha1 executionConfig: Interruptible: null MaxParallelism: 0 + OverwriteCache: false RecoveryExecution: {} TaskPluginImpls: null TaskResources: diff --git a/pkg/compiler/test/testdata/snacks-core/k8s/039_core.control_flow.conditions.nested_conditions_2_wf.yaml b/pkg/compiler/test/testdata/snacks-core/k8s/039_core.control_flow.conditions.nested_conditions_2_wf.yaml index 1b9ec7a14..bf85763b9 100755 --- a/pkg/compiler/test/testdata/snacks-core/k8s/039_core.control_flow.conditions.nested_conditions_2_wf.yaml +++ 
b/pkg/compiler/test/testdata/snacks-core/k8s/039_core.control_flow.conditions.nested_conditions_2_wf.yaml @@ -2,6 +2,7 @@ apiVersion: flyte.lyft.com/v1alpha1 executionConfig: Interruptible: null MaxParallelism: 0 + OverwriteCache: false RecoveryExecution: {} TaskPluginImpls: null TaskResources: diff --git a/pkg/compiler/test/testdata/snacks-core/k8s/042_core.control_flow.conditions.consume_outputs_2_wf.yaml b/pkg/compiler/test/testdata/snacks-core/k8s/042_core.control_flow.conditions.consume_outputs_2_wf.yaml index a2ff6638c..36db07065 100755 --- a/pkg/compiler/test/testdata/snacks-core/k8s/042_core.control_flow.conditions.consume_outputs_2_wf.yaml +++ b/pkg/compiler/test/testdata/snacks-core/k8s/042_core.control_flow.conditions.consume_outputs_2_wf.yaml @@ -2,6 +2,7 @@ apiVersion: flyte.lyft.com/v1alpha1 executionConfig: Interruptible: null MaxParallelism: 0 + OverwriteCache: false RecoveryExecution: {} TaskPluginImpls: null TaskResources: diff --git a/pkg/compiler/test/testdata/snacks-core/k8s/048_core.control_flow.dynamics.wf_2_wf.yaml b/pkg/compiler/test/testdata/snacks-core/k8s/048_core.control_flow.dynamics.wf_2_wf.yaml index 45edd8304..f7477b6e5 100755 --- a/pkg/compiler/test/testdata/snacks-core/k8s/048_core.control_flow.dynamics.wf_2_wf.yaml +++ b/pkg/compiler/test/testdata/snacks-core/k8s/048_core.control_flow.dynamics.wf_2_wf.yaml @@ -2,6 +2,7 @@ apiVersion: flyte.lyft.com/v1alpha1 executionConfig: Interruptible: null MaxParallelism: 0 + OverwriteCache: false RecoveryExecution: {} TaskPluginImpls: null TaskResources: diff --git a/pkg/compiler/test/testdata/snacks-core/k8s/053_core.control_flow.map_task.my_map_workflow_2_wf.yaml b/pkg/compiler/test/testdata/snacks-core/k8s/053_core.control_flow.map_task.my_map_workflow_2_wf.yaml index 552d1b806..46c7b0ccd 100755 --- a/pkg/compiler/test/testdata/snacks-core/k8s/053_core.control_flow.map_task.my_map_workflow_2_wf.yaml +++ 
b/pkg/compiler/test/testdata/snacks-core/k8s/053_core.control_flow.map_task.my_map_workflow_2_wf.yaml @@ -2,6 +2,7 @@ apiVersion: flyte.lyft.com/v1alpha1 executionConfig: Interruptible: null MaxParallelism: 0 + OverwriteCache: false RecoveryExecution: {} TaskPluginImpls: null TaskResources: diff --git a/pkg/compiler/test/testdata/snacks-core/k8s/059_core.control_flow.merge_sort.merge_sort_2_wf.yaml b/pkg/compiler/test/testdata/snacks-core/k8s/059_core.control_flow.merge_sort.merge_sort_2_wf.yaml index fa333434d..e340f7bd2 100755 --- a/pkg/compiler/test/testdata/snacks-core/k8s/059_core.control_flow.merge_sort.merge_sort_2_wf.yaml +++ b/pkg/compiler/test/testdata/snacks-core/k8s/059_core.control_flow.merge_sort.merge_sort_2_wf.yaml @@ -2,6 +2,7 @@ apiVersion: flyte.lyft.com/v1alpha1 executionConfig: Interruptible: null MaxParallelism: 0 + OverwriteCache: false RecoveryExecution: {} TaskPluginImpls: null TaskResources: diff --git a/pkg/compiler/test/testdata/snacks-core/k8s/062_core.control_flow.subworkflows.my_subwf_2_wf.yaml b/pkg/compiler/test/testdata/snacks-core/k8s/062_core.control_flow.subworkflows.my_subwf_2_wf.yaml index 633da9a81..4c2b3b822 100755 --- a/pkg/compiler/test/testdata/snacks-core/k8s/062_core.control_flow.subworkflows.my_subwf_2_wf.yaml +++ b/pkg/compiler/test/testdata/snacks-core/k8s/062_core.control_flow.subworkflows.my_subwf_2_wf.yaml @@ -2,6 +2,7 @@ apiVersion: flyte.lyft.com/v1alpha1 executionConfig: Interruptible: null MaxParallelism: 0 + OverwriteCache: false RecoveryExecution: {} TaskPluginImpls: null TaskResources: diff --git a/pkg/compiler/test/testdata/snacks-core/k8s/069_core.control_flow.subworkflows.ext_workflow_2_wf.yaml b/pkg/compiler/test/testdata/snacks-core/k8s/069_core.control_flow.subworkflows.ext_workflow_2_wf.yaml index 98f73f45c..b3342ba32 100755 --- a/pkg/compiler/test/testdata/snacks-core/k8s/069_core.control_flow.subworkflows.ext_workflow_2_wf.yaml +++ 
b/pkg/compiler/test/testdata/snacks-core/k8s/069_core.control_flow.subworkflows.ext_workflow_2_wf.yaml @@ -2,6 +2,7 @@ apiVersion: flyte.lyft.com/v1alpha1 executionConfig: Interruptible: null MaxParallelism: 0 + OverwriteCache: false RecoveryExecution: {} TaskPluginImpls: null TaskResources: diff --git a/pkg/compiler/test/testdata/snacks-core/k8s/077_core.extend_flyte.custom_task_plugin.my_workflow_2_wf.yaml b/pkg/compiler/test/testdata/snacks-core/k8s/077_core.extend_flyte.custom_task_plugin.my_workflow_2_wf.yaml index 91a5e738d..407584f70 100755 --- a/pkg/compiler/test/testdata/snacks-core/k8s/077_core.extend_flyte.custom_task_plugin.my_workflow_2_wf.yaml +++ b/pkg/compiler/test/testdata/snacks-core/k8s/077_core.extend_flyte.custom_task_plugin.my_workflow_2_wf.yaml @@ -2,6 +2,7 @@ apiVersion: flyte.lyft.com/v1alpha1 executionConfig: Interruptible: null MaxParallelism: 0 + OverwriteCache: false RecoveryExecution: {} TaskPluginImpls: null TaskResources: diff --git a/pkg/compiler/test/testdata/snacks-core/k8s/081_core.extend_flyte.custom_types.wf_2_wf.yaml b/pkg/compiler/test/testdata/snacks-core/k8s/081_core.extend_flyte.custom_types.wf_2_wf.yaml index 92daf8689..fdccbb84b 100755 --- a/pkg/compiler/test/testdata/snacks-core/k8s/081_core.extend_flyte.custom_types.wf_2_wf.yaml +++ b/pkg/compiler/test/testdata/snacks-core/k8s/081_core.extend_flyte.custom_types.wf_2_wf.yaml @@ -2,6 +2,7 @@ apiVersion: flyte.lyft.com/v1alpha1 executionConfig: Interruptible: null MaxParallelism: 0 + OverwriteCache: false RecoveryExecution: {} TaskPluginImpls: null TaskResources: diff --git a/pkg/compiler/test/testdata/snacks-core/k8s/085_core.flyte_basics.basic_workflow.my_wf_2_wf.yaml b/pkg/compiler/test/testdata/snacks-core/k8s/085_core.flyte_basics.basic_workflow.my_wf_2_wf.yaml index 6d52f7746..c2b156942 100755 --- a/pkg/compiler/test/testdata/snacks-core/k8s/085_core.flyte_basics.basic_workflow.my_wf_2_wf.yaml +++ 
b/pkg/compiler/test/testdata/snacks-core/k8s/085_core.flyte_basics.basic_workflow.my_wf_2_wf.yaml @@ -2,6 +2,7 @@ apiVersion: flyte.lyft.com/v1alpha1 executionConfig: Interruptible: null MaxParallelism: 0 + OverwriteCache: false RecoveryExecution: {} TaskPluginImpls: null TaskResources: diff --git a/pkg/compiler/test/testdata/snacks-core/k8s/089_core.flyte_basics.decorating_tasks.wf_2_wf.yaml b/pkg/compiler/test/testdata/snacks-core/k8s/089_core.flyte_basics.decorating_tasks.wf_2_wf.yaml index 3920835e6..baedd2afd 100755 --- a/pkg/compiler/test/testdata/snacks-core/k8s/089_core.flyte_basics.decorating_tasks.wf_2_wf.yaml +++ b/pkg/compiler/test/testdata/snacks-core/k8s/089_core.flyte_basics.decorating_tasks.wf_2_wf.yaml @@ -2,6 +2,7 @@ apiVersion: flyte.lyft.com/v1alpha1 executionConfig: Interruptible: null MaxParallelism: 0 + OverwriteCache: false RecoveryExecution: {} TaskPluginImpls: null TaskResources: diff --git a/pkg/compiler/test/testdata/snacks-core/k8s/095_core.flyte_basics.decorating_workflows.wf_2_wf.yaml b/pkg/compiler/test/testdata/snacks-core/k8s/095_core.flyte_basics.decorating_workflows.wf_2_wf.yaml index f94c0ea9a..15f19ddb1 100755 --- a/pkg/compiler/test/testdata/snacks-core/k8s/095_core.flyte_basics.decorating_workflows.wf_2_wf.yaml +++ b/pkg/compiler/test/testdata/snacks-core/k8s/095_core.flyte_basics.decorating_workflows.wf_2_wf.yaml @@ -2,6 +2,7 @@ apiVersion: flyte.lyft.com/v1alpha1 executionConfig: Interruptible: null MaxParallelism: 0 + OverwriteCache: false RecoveryExecution: {} TaskPluginImpls: null TaskResources: diff --git a/pkg/compiler/test/testdata/snacks-core/k8s/098_core.flyte_basics.documented_workflow.sphinx_docstring_2_wf.yaml b/pkg/compiler/test/testdata/snacks-core/k8s/098_core.flyte_basics.documented_workflow.sphinx_docstring_2_wf.yaml index b71c5a23b..56324a47c 100755 --- a/pkg/compiler/test/testdata/snacks-core/k8s/098_core.flyte_basics.documented_workflow.sphinx_docstring_2_wf.yaml +++ 
b/pkg/compiler/test/testdata/snacks-core/k8s/098_core.flyte_basics.documented_workflow.sphinx_docstring_2_wf.yaml @@ -2,6 +2,7 @@ apiVersion: flyte.lyft.com/v1alpha1 executionConfig: Interruptible: null MaxParallelism: 0 + OverwriteCache: false RecoveryExecution: {} TaskPluginImpls: null TaskResources: diff --git a/pkg/compiler/test/testdata/snacks-core/k8s/100_core.flyte_basics.documented_workflow.numpy_docstring_2_wf.yaml b/pkg/compiler/test/testdata/snacks-core/k8s/100_core.flyte_basics.documented_workflow.numpy_docstring_2_wf.yaml index 614fe0743..89dd7dc55 100755 --- a/pkg/compiler/test/testdata/snacks-core/k8s/100_core.flyte_basics.documented_workflow.numpy_docstring_2_wf.yaml +++ b/pkg/compiler/test/testdata/snacks-core/k8s/100_core.flyte_basics.documented_workflow.numpy_docstring_2_wf.yaml @@ -2,6 +2,7 @@ apiVersion: flyte.lyft.com/v1alpha1 executionConfig: Interruptible: null MaxParallelism: 0 + OverwriteCache: false RecoveryExecution: {} TaskPluginImpls: null TaskResources: diff --git a/pkg/compiler/test/testdata/snacks-core/k8s/102_core.flyte_basics.documented_workflow.google_docstring_2_wf.yaml b/pkg/compiler/test/testdata/snacks-core/k8s/102_core.flyte_basics.documented_workflow.google_docstring_2_wf.yaml index 4ed4f0e68..76407b7ae 100755 --- a/pkg/compiler/test/testdata/snacks-core/k8s/102_core.flyte_basics.documented_workflow.google_docstring_2_wf.yaml +++ b/pkg/compiler/test/testdata/snacks-core/k8s/102_core.flyte_basics.documented_workflow.google_docstring_2_wf.yaml @@ -2,6 +2,7 @@ apiVersion: flyte.lyft.com/v1alpha1 executionConfig: Interruptible: null MaxParallelism: 0 + OverwriteCache: false RecoveryExecution: {} TaskPluginImpls: null TaskResources: diff --git a/pkg/compiler/test/testdata/snacks-core/k8s/105_core.flyte_basics.files.normalize_csv_file_2_wf.yaml b/pkg/compiler/test/testdata/snacks-core/k8s/105_core.flyte_basics.files.normalize_csv_file_2_wf.yaml index eb6d09c68..bb99dd7b6 100755 --- 
a/pkg/compiler/test/testdata/snacks-core/k8s/105_core.flyte_basics.files.normalize_csv_file_2_wf.yaml +++ b/pkg/compiler/test/testdata/snacks-core/k8s/105_core.flyte_basics.files.normalize_csv_file_2_wf.yaml @@ -2,6 +2,7 @@ apiVersion: flyte.lyft.com/v1alpha1 executionConfig: Interruptible: null MaxParallelism: 0 + OverwriteCache: false RecoveryExecution: {} TaskPluginImpls: null TaskResources: diff --git a/pkg/compiler/test/testdata/snacks-core/k8s/109_core.flyte_basics.folders.download_and_normalize_csv_files_2_wf.yaml b/pkg/compiler/test/testdata/snacks-core/k8s/109_core.flyte_basics.folders.download_and_normalize_csv_files_2_wf.yaml index 8603a881d..5d943ca0d 100755 --- a/pkg/compiler/test/testdata/snacks-core/k8s/109_core.flyte_basics.folders.download_and_normalize_csv_files_2_wf.yaml +++ b/pkg/compiler/test/testdata/snacks-core/k8s/109_core.flyte_basics.folders.download_and_normalize_csv_files_2_wf.yaml @@ -2,6 +2,7 @@ apiVersion: flyte.lyft.com/v1alpha1 executionConfig: Interruptible: null MaxParallelism: 0 + OverwriteCache: false RecoveryExecution: {} TaskPluginImpls: null TaskResources: diff --git a/pkg/compiler/test/testdata/snacks-core/k8s/112_core.flyte_basics.hello_world.my_wf_2_wf.yaml b/pkg/compiler/test/testdata/snacks-core/k8s/112_core.flyte_basics.hello_world.my_wf_2_wf.yaml index 57fbfabf7..d2d30b315 100755 --- a/pkg/compiler/test/testdata/snacks-core/k8s/112_core.flyte_basics.hello_world.my_wf_2_wf.yaml +++ b/pkg/compiler/test/testdata/snacks-core/k8s/112_core.flyte_basics.hello_world.my_wf_2_wf.yaml @@ -2,6 +2,7 @@ apiVersion: flyte.lyft.com/v1alpha1 executionConfig: Interruptible: null MaxParallelism: 0 + OverwriteCache: false RecoveryExecution: {} TaskPluginImpls: null TaskResources: diff --git a/pkg/compiler/test/testdata/snacks-core/k8s/117_my.imperative.workflow.example_2_wf.yaml b/pkg/compiler/test/testdata/snacks-core/k8s/117_my.imperative.workflow.example_2_wf.yaml index 7bf3321b7..8c27274b0 100755 --- 
a/pkg/compiler/test/testdata/snacks-core/k8s/117_my.imperative.workflow.example_2_wf.yaml +++ b/pkg/compiler/test/testdata/snacks-core/k8s/117_my.imperative.workflow.example_2_wf.yaml @@ -2,6 +2,7 @@ apiVersion: flyte.lyft.com/v1alpha1 executionConfig: Interruptible: null MaxParallelism: 0 + OverwriteCache: false RecoveryExecution: {} TaskPluginImpls: null TaskResources: diff --git a/pkg/compiler/test/testdata/snacks-core/k8s/120_core.flyte_basics.lp.my_wf_2_wf.yaml b/pkg/compiler/test/testdata/snacks-core/k8s/120_core.flyte_basics.lp.my_wf_2_wf.yaml index 84454acd0..eb91ab4cb 100755 --- a/pkg/compiler/test/testdata/snacks-core/k8s/120_core.flyte_basics.lp.my_wf_2_wf.yaml +++ b/pkg/compiler/test/testdata/snacks-core/k8s/120_core.flyte_basics.lp.my_wf_2_wf.yaml @@ -2,6 +2,7 @@ apiVersion: flyte.lyft.com/v1alpha1 executionConfig: Interruptible: null MaxParallelism: 0 + OverwriteCache: false RecoveryExecution: {} TaskPluginImpls: null TaskResources: diff --git a/pkg/compiler/test/testdata/snacks-core/k8s/125_core.flyte_basics.lp.go_greet_2_wf.yaml b/pkg/compiler/test/testdata/snacks-core/k8s/125_core.flyte_basics.lp.go_greet_2_wf.yaml index 1e3316bff..2fa067626 100755 --- a/pkg/compiler/test/testdata/snacks-core/k8s/125_core.flyte_basics.lp.go_greet_2_wf.yaml +++ b/pkg/compiler/test/testdata/snacks-core/k8s/125_core.flyte_basics.lp.go_greet_2_wf.yaml @@ -2,6 +2,7 @@ apiVersion: flyte.lyft.com/v1alpha1 executionConfig: Interruptible: null MaxParallelism: 0 + OverwriteCache: false RecoveryExecution: {} TaskPluginImpls: null TaskResources: diff --git a/pkg/compiler/test/testdata/snacks-core/k8s/129_core.flyte_basics.named_outputs.my_wf_2_wf.yaml b/pkg/compiler/test/testdata/snacks-core/k8s/129_core.flyte_basics.named_outputs.my_wf_2_wf.yaml index cb8d7ba31..2ad1ef7ee 100755 --- a/pkg/compiler/test/testdata/snacks-core/k8s/129_core.flyte_basics.named_outputs.my_wf_2_wf.yaml +++ 
b/pkg/compiler/test/testdata/snacks-core/k8s/129_core.flyte_basics.named_outputs.my_wf_2_wf.yaml @@ -2,6 +2,7 @@ apiVersion: flyte.lyft.com/v1alpha1 executionConfig: Interruptible: null MaxParallelism: 0 + OverwriteCache: false RecoveryExecution: {} TaskPluginImpls: null TaskResources: diff --git a/pkg/compiler/test/testdata/snacks-core/k8s/140_core.flyte_basics.shell_task.wf_2_wf.yaml b/pkg/compiler/test/testdata/snacks-core/k8s/140_core.flyte_basics.shell_task.wf_2_wf.yaml index 35c747cac..15b186204 100755 --- a/pkg/compiler/test/testdata/snacks-core/k8s/140_core.flyte_basics.shell_task.wf_2_wf.yaml +++ b/pkg/compiler/test/testdata/snacks-core/k8s/140_core.flyte_basics.shell_task.wf_2_wf.yaml @@ -2,6 +2,7 @@ apiVersion: flyte.lyft.com/v1alpha1 executionConfig: Interruptible: null MaxParallelism: 0 + OverwriteCache: false RecoveryExecution: {} TaskPluginImpls: null TaskResources: diff --git a/pkg/compiler/test/testdata/snacks-core/k8s/147_core.flyte_basics.task_cache.cached_dataframe_wf_2_wf.yaml b/pkg/compiler/test/testdata/snacks-core/k8s/147_core.flyte_basics.task_cache.cached_dataframe_wf_2_wf.yaml index 6f40a115f..44d2f9562 100755 --- a/pkg/compiler/test/testdata/snacks-core/k8s/147_core.flyte_basics.task_cache.cached_dataframe_wf_2_wf.yaml +++ b/pkg/compiler/test/testdata/snacks-core/k8s/147_core.flyte_basics.task_cache.cached_dataframe_wf_2_wf.yaml @@ -2,6 +2,7 @@ apiVersion: flyte.lyft.com/v1alpha1 executionConfig: Interruptible: null MaxParallelism: 0 + OverwriteCache: false RecoveryExecution: {} TaskPluginImpls: null TaskResources: diff --git a/pkg/compiler/test/testdata/snacks-core/k8s/151_core.scheduled_workflows.lp_schedules.date_formatter_wf_2_wf.yaml b/pkg/compiler/test/testdata/snacks-core/k8s/151_core.scheduled_workflows.lp_schedules.date_formatter_wf_2_wf.yaml index 27376484c..06e20a2a9 100755 --- a/pkg/compiler/test/testdata/snacks-core/k8s/151_core.scheduled_workflows.lp_schedules.date_formatter_wf_2_wf.yaml +++ 
b/pkg/compiler/test/testdata/snacks-core/k8s/151_core.scheduled_workflows.lp_schedules.date_formatter_wf_2_wf.yaml @@ -2,6 +2,7 @@ apiVersion: flyte.lyft.com/v1alpha1 executionConfig: Interruptible: null MaxParallelism: 0 + OverwriteCache: false RecoveryExecution: {} TaskPluginImpls: null TaskResources: diff --git a/pkg/compiler/test/testdata/snacks-core/k8s/155_core.scheduled_workflows.lp_schedules.positive_wf_2_wf.yaml b/pkg/compiler/test/testdata/snacks-core/k8s/155_core.scheduled_workflows.lp_schedules.positive_wf_2_wf.yaml index 8aca784d7..064338460 100755 --- a/pkg/compiler/test/testdata/snacks-core/k8s/155_core.scheduled_workflows.lp_schedules.positive_wf_2_wf.yaml +++ b/pkg/compiler/test/testdata/snacks-core/k8s/155_core.scheduled_workflows.lp_schedules.positive_wf_2_wf.yaml @@ -2,6 +2,7 @@ apiVersion: flyte.lyft.com/v1alpha1 executionConfig: Interruptible: null MaxParallelism: 0 + OverwriteCache: false RecoveryExecution: {} TaskPluginImpls: null TaskResources: diff --git a/pkg/compiler/test/testdata/snacks-core/k8s/162_core.type_system.custom_objects.wf_2_wf.yaml b/pkg/compiler/test/testdata/snacks-core/k8s/162_core.type_system.custom_objects.wf_2_wf.yaml index 57dd92a74..570453d1a 100755 --- a/pkg/compiler/test/testdata/snacks-core/k8s/162_core.type_system.custom_objects.wf_2_wf.yaml +++ b/pkg/compiler/test/testdata/snacks-core/k8s/162_core.type_system.custom_objects.wf_2_wf.yaml @@ -2,6 +2,7 @@ apiVersion: flyte.lyft.com/v1alpha1 executionConfig: Interruptible: null MaxParallelism: 0 + OverwriteCache: false RecoveryExecution: {} TaskPluginImpls: null TaskResources: diff --git a/pkg/compiler/test/testdata/snacks-core/k8s/166_core.type_system.enums.enum_wf_2_wf.yaml b/pkg/compiler/test/testdata/snacks-core/k8s/166_core.type_system.enums.enum_wf_2_wf.yaml index 260c1c101..1eaf99845 100755 --- a/pkg/compiler/test/testdata/snacks-core/k8s/166_core.type_system.enums.enum_wf_2_wf.yaml +++ 
b/pkg/compiler/test/testdata/snacks-core/k8s/166_core.type_system.enums.enum_wf_2_wf.yaml @@ -2,6 +2,7 @@ apiVersion: flyte.lyft.com/v1alpha1 executionConfig: Interruptible: null MaxParallelism: 0 + OverwriteCache: false RecoveryExecution: {} TaskPluginImpls: null TaskResources: diff --git a/pkg/compiler/test/testdata/snacks-core/k8s/169_core.type_system.flyte_pickle.welcome_2_wf.yaml b/pkg/compiler/test/testdata/snacks-core/k8s/169_core.type_system.flyte_pickle.welcome_2_wf.yaml index 17529df04..f4466f9d5 100755 --- a/pkg/compiler/test/testdata/snacks-core/k8s/169_core.type_system.flyte_pickle.welcome_2_wf.yaml +++ b/pkg/compiler/test/testdata/snacks-core/k8s/169_core.type_system.flyte_pickle.welcome_2_wf.yaml @@ -2,6 +2,7 @@ apiVersion: flyte.lyft.com/v1alpha1 executionConfig: Interruptible: null MaxParallelism: 0 + OverwriteCache: false RecoveryExecution: {} TaskPluginImpls: null TaskResources: diff --git a/pkg/compiler/test/testdata/snacks-core/k8s/173_core.type_system.schema.df_wf_2_wf.yaml b/pkg/compiler/test/testdata/snacks-core/k8s/173_core.type_system.schema.df_wf_2_wf.yaml index 1d7f73d86..e6f0239d6 100755 --- a/pkg/compiler/test/testdata/snacks-core/k8s/173_core.type_system.schema.df_wf_2_wf.yaml +++ b/pkg/compiler/test/testdata/snacks-core/k8s/173_core.type_system.schema.df_wf_2_wf.yaml @@ -2,6 +2,7 @@ apiVersion: flyte.lyft.com/v1alpha1 executionConfig: Interruptible: null MaxParallelism: 0 + OverwriteCache: false RecoveryExecution: {} TaskPluginImpls: null TaskResources: diff --git a/pkg/compiler/test/testdata/snacks-core/k8s/179_core.type_system.structured_dataset.pandas_compatibility_wf_2_wf.yaml b/pkg/compiler/test/testdata/snacks-core/k8s/179_core.type_system.structured_dataset.pandas_compatibility_wf_2_wf.yaml index 548a07c95..4577e374a 100755 --- a/pkg/compiler/test/testdata/snacks-core/k8s/179_core.type_system.structured_dataset.pandas_compatibility_wf_2_wf.yaml +++ 
b/pkg/compiler/test/testdata/snacks-core/k8s/179_core.type_system.structured_dataset.pandas_compatibility_wf_2_wf.yaml @@ -2,6 +2,7 @@ apiVersion: flyte.lyft.com/v1alpha1 executionConfig: Interruptible: null MaxParallelism: 0 + OverwriteCache: false RecoveryExecution: {} TaskPluginImpls: null TaskResources: diff --git a/pkg/compiler/test/testdata/snacks-core/k8s/181_core.type_system.structured_dataset.schema_compatibility_wf_2_wf.yaml b/pkg/compiler/test/testdata/snacks-core/k8s/181_core.type_system.structured_dataset.schema_compatibility_wf_2_wf.yaml index ceb764fa9..4340ffef5 100755 --- a/pkg/compiler/test/testdata/snacks-core/k8s/181_core.type_system.structured_dataset.schema_compatibility_wf_2_wf.yaml +++ b/pkg/compiler/test/testdata/snacks-core/k8s/181_core.type_system.structured_dataset.schema_compatibility_wf_2_wf.yaml @@ -2,6 +2,7 @@ apiVersion: flyte.lyft.com/v1alpha1 executionConfig: Interruptible: null MaxParallelism: 0 + OverwriteCache: false RecoveryExecution: {} TaskPluginImpls: null TaskResources: diff --git a/pkg/compiler/test/testdata/snacks-core/k8s/185_core.type_system.typed_schema.wf_2_wf.yaml b/pkg/compiler/test/testdata/snacks-core/k8s/185_core.type_system.typed_schema.wf_2_wf.yaml index cd06f0454..d04c5cc46 100755 --- a/pkg/compiler/test/testdata/snacks-core/k8s/185_core.type_system.typed_schema.wf_2_wf.yaml +++ b/pkg/compiler/test/testdata/snacks-core/k8s/185_core.type_system.typed_schema.wf_2_wf.yaml @@ -2,6 +2,7 @@ apiVersion: flyte.lyft.com/v1alpha1 executionConfig: Interruptible: null MaxParallelism: 0 + OverwriteCache: false RecoveryExecution: {} TaskPluginImpls: null TaskResources: diff --git a/pkg/controller/nodes/dynamic/dynamic_workflow_test.go b/pkg/controller/nodes/dynamic/dynamic_workflow_test.go index 184b0ee6c..63359352f 100644 --- a/pkg/controller/nodes/dynamic/dynamic_workflow_test.go +++ b/pkg/controller/nodes/dynamic/dynamic_workflow_test.go @@ -498,7 +498,7 @@ func 
Test_dynamicNodeHandler_buildContextualDynamicWorkflow_withLaunchPlans(t *t composedPBStore.OnWriteRawMatch( mock.MatchedBy(func(ctx context.Context) bool { return true }), storage.DataReference("s3://my-s3-bucket/foo/bar/futures_compiled.pb"), - int64(1450), + int64(1473), storage.Options{}, mock.MatchedBy(func(rdr *bytes.Reader) bool { return true })).Return(errors.New("foo")) diff --git a/pkg/controller/nodes/task/catalog/datacatalog/datacatalog.go b/pkg/controller/nodes/task/catalog/datacatalog/datacatalog.go index b776d8c55..3009af038 100644 --- a/pkg/controller/nodes/task/catalog/datacatalog/datacatalog.go +++ b/pkg/controller/nodes/task/catalog/datacatalog/datacatalog.go @@ -28,13 +28,13 @@ var ( _ catalog.Client = &CatalogClient{} ) -// This is the client that caches task executions to DataCatalog service. +// CatalogClient is the client that caches task executions to DataCatalog service. type CatalogClient struct { client datacatalog.DataCatalogClient maxCacheAge time.Duration } -// Helper method to retrieve a dataset that is associated with the task +// GetDataset retrieves a dataset that is associated with the task represented by the provided catalog.Key. func (m *CatalogClient) GetDataset(ctx context.Context, key catalog.Key) (*datacatalog.Dataset, error) { datasetID, err := GenerateDatasetIDForTask(ctx, key) if err != nil { @@ -54,7 +54,7 @@ func (m *CatalogClient) GetDataset(ctx context.Context, key catalog.Key) (*datac return datasetResponse.Dataset, nil } -// Helper method to retrieve an artifact by the tag +// GetArtifactByTag retrieves an artifact using the provided tag and dataset. 
func (m *CatalogClient) GetArtifactByTag(ctx context.Context, tagName string, dataset *datacatalog.Dataset) (*datacatalog.Artifact, error) { logger.Debugf(ctx, "Get Artifact by tag %v", tagName) artifactQuery := &datacatalog.GetArtifactRequest{ @@ -144,6 +144,7 @@ func (m *CatalogClient) Get(ctx context.Context, key catalog.Key) (catalog.Entry return catalog.NewCatalogEntry(ioutils.NewInMemoryOutputReader(outputs, nil, nil), catalog.NewStatus(core.CatalogCacheStatus_CACHE_HIT, md)), nil } +// CreateDataset creates a Dataset in datacatalog including the associated metadata. func (m *CatalogClient) CreateDataset(ctx context.Context, key catalog.Key, metadata *datacatalog.Metadata) (*datacatalog.DatasetID, error) { datasetID, err := GenerateDatasetIDForTask(ctx, key) if err != nil { @@ -170,56 +171,15 @@ func (m *CatalogClient) CreateDataset(ctx context.Context, key catalog.Key, meta return datasetID, nil } -func (m *CatalogClient) CreateArtifact(ctx context.Context, datasetID *datacatalog.DatasetID, outputs *core.LiteralMap, md *datacatalog.Metadata) (*datacatalog.Artifact, error) { - // Create the artifact for the execution that belongs in the task - artifactDataList := make([]*datacatalog.ArtifactData, 0, len(outputs.Literals)) - for name, value := range outputs.Literals { - artifactData := &datacatalog.ArtifactData{ - Name: name, - Value: value, - } - artifactDataList = append(artifactDataList, artifactData) - } - - cachedArtifact := &datacatalog.Artifact{ - Id: string(uuid.NewUUID()), - Dataset: datasetID, - Data: artifactDataList, - Metadata: md, - } - - createArtifactRequest := &datacatalog.CreateArtifactRequest{Artifact: cachedArtifact} - _, err := m.client.CreateArtifact(ctx, createArtifactRequest) - if err != nil { - logger.Errorf(ctx, "Failed to create Artifact %+v, err: %v", cachedArtifact, err) - return cachedArtifact, err - } - logger.Debugf(ctx, "Created artifact: %v, with %v outputs from execution %v", cachedArtifact.Id, len(artifactDataList)) - return 
cachedArtifact, nil -} - -// Catalog the task execution as a cached Artifact. We associate an Artifact as the cached data by tagging the Artifact -// with the hash of the input values. -// -// The steps taken to cache an execution: -// - Ensure a Dataset exists for the Artifact. The Dataset represents the proj/domain/name/version of the task -// - Create an Artifact with the execution data that belongs to the dataset -// - Tag the Artifact with a hash generated by the input values -func (m *CatalogClient) Put(ctx context.Context, key catalog.Key, reader io.OutputReader, metadata catalog.Metadata) (catalog.Status, error) { - - // Populate Metadata for later recovery - datasetID, err := m.CreateDataset(ctx, key, GetDatasetMetadataForSource(metadata.TaskExecutionIdentifier)) - if err != nil { - return catalog.Status{}, err - } - - inputs := &core.LiteralMap{} - outputs := &core.LiteralMap{} +// prepareInputsAndOutputs reads the inputs and outputs of a task and returns them as core.LiteralMaps to be consumed by datacatalog. 
+func (m *CatalogClient) prepareInputsAndOutputs(ctx context.Context, key catalog.Key, reader io.OutputReader) (inputs *core.LiteralMap, outputs *core.LiteralMap, err error) { + inputs = &core.LiteralMap{} + outputs = &core.LiteralMap{} if key.TypedInterface.Inputs != nil && len(key.TypedInterface.Inputs.Variables) != 0 { retInputs, err := key.InputReader.Get(ctx) if err != nil { logger.Errorf(ctx, "DataCatalog failed to read inputs err: %s", err) - return catalog.Status{}, err + return nil, nil, err } logger.Debugf(ctx, "DataCatalog read inputs") inputs = retInputs @@ -229,21 +189,48 @@ func (m *CatalogClient) Put(ctx context.Context, key catalog.Key, reader io.Outp retOutputs, retErr, err := reader.Read(ctx) if err != nil { logger.Errorf(ctx, "DataCatalog failed to read outputs err: %s", err) - return catalog.Status{}, err + return nil, nil, err } if retErr != nil { logger.Errorf(ctx, "DataCatalog failed to read outputs, err :%s", retErr.Message) - return catalog.Status{}, errors.Errorf("Failed to read outputs. EC: %s, Msg: %s", retErr.Code, retErr.Message) + return nil, nil, errors.Errorf("Failed to read outputs. EC: %s, Msg: %s", retErr.Code, retErr.Message) } logger.Debugf(ctx, "DataCatalog read outputs") outputs = retOutputs } + return inputs, outputs, nil +} + +// CreateArtifact creates an Artifact in datacatalog including its associated ArtifactData and tags it with a hash of +// the provided input values for retrieval. 
+func (m *CatalogClient) CreateArtifact(ctx context.Context, key catalog.Key, datasetID *datacatalog.DatasetID, inputs *core.LiteralMap, outputs *core.LiteralMap, metadata catalog.Metadata) (catalog.Status, error) { + logger.Debugf(ctx, "Creating artifact for key %+v, dataset %+v and execution %+v", key, datasetID, metadata) + // Create the artifact for the execution that belongs in the task - cachedArtifact, err := m.CreateArtifact(ctx, datasetID, outputs, GetArtifactMetadataForSource(metadata.TaskExecutionIdentifier)) + artifactDataList := make([]*datacatalog.ArtifactData, 0, len(outputs.Literals)) + for name, value := range outputs.Literals { + artifactData := &datacatalog.ArtifactData{ + Name: name, + Value: value, + } + artifactDataList = append(artifactDataList, artifactData) + } + + cachedArtifact := &datacatalog.Artifact{ + Id: string(uuid.NewUUID()), + Dataset: datasetID, + Data: artifactDataList, + Metadata: GetArtifactMetadataForSource(metadata.TaskExecutionIdentifier), + } + + createArtifactRequest := &datacatalog.CreateArtifactRequest{Artifact: cachedArtifact} + _, err := m.client.CreateArtifact(ctx, createArtifactRequest) if err != nil { - return catalog.Status{}, errors.Wrapf(err, "failed to create dataset for ID %s", key.Identifier.String()) + logger.Errorf(ctx, "Failed to create Artifact %+v, err: %v", cachedArtifact, err) + return catalog.Status{}, err } + logger.Debugf(ctx, "Created artifact: %v, with %v outputs from execution %+v", cachedArtifact.Id, len(artifactDataList), metadata) // Tag the artifact since it is the cached artifact tagName, err := GenerateArtifactTagName(ctx, inputs) @@ -269,9 +256,110 @@ func (m *CatalogClient) Put(ctx context.Context, key catalog.Key, reader io.Outp } } + logger.Debugf(ctx, "Successfully created artifact %+v for key %+v, dataset %+v and execution %+v", cachedArtifact, key, datasetID, metadata) return catalog.NewStatus(core.CatalogCacheStatus_CACHE_POPULATED, EventCatalogMetadata(datasetID, tag, nil)), nil } 
+// UpdateArtifact overwrites the ArtifactData of an existing artifact with the provided data in datacatalog. +func (m *CatalogClient) UpdateArtifact(ctx context.Context, key catalog.Key, datasetID *datacatalog.DatasetID, inputs *core.LiteralMap, outputs *core.LiteralMap, metadata catalog.Metadata) (catalog.Status, error) { + logger.Debugf(ctx, "Updating artifact for key %+v, dataset %+v and execution %+v", key, datasetID, metadata) + + artifactDataList := make([]*datacatalog.ArtifactData, 0, len(outputs.Literals)) + for name, value := range outputs.Literals { + artifactData := &datacatalog.ArtifactData{ + Name: name, + Value: value, + } + artifactDataList = append(artifactDataList, artifactData) + } + + tagName, err := GenerateArtifactTagName(ctx, inputs) + if err != nil { + logger.Errorf(ctx, "Failed to generate artifact tag name for key %+v, dataset %+v and execution %+v, err: %+v", key, datasetID, metadata, err) + return catalog.Status{}, err + } + + updateArtifactRequest := &datacatalog.UpdateArtifactRequest{ + Dataset: datasetID, + QueryHandle: &datacatalog.UpdateArtifactRequest_TagName{TagName: tagName}, + Data: artifactDataList, + } + resp, err := m.client.UpdateArtifact(ctx, updateArtifactRequest) + if err != nil { + logger.Errorf(ctx, "Failed to update artifact for key %+v, dataset %+v and execution %+v, err: %v", key, datasetID, metadata, err) + return catalog.Status{}, err + } + + tag := &datacatalog.Tag{ + Name: tagName, + Dataset: datasetID, + ArtifactId: resp.GetArtifactId(), + } + + source, err := GetSourceFromMetadata(GetDatasetMetadataForSource(metadata.TaskExecutionIdentifier), GetArtifactMetadataForSource(metadata.TaskExecutionIdentifier), key.Identifier) + if err != nil { + return catalog.Status{}, fmt.Errorf("failed to get source from metadata. 
Error: %w", err) + } + + logger.Debugf(ctx, "Successfully updated artifact with ID %v and %d outputs for key %+v, dataset %+v and execution %+v", tag.ArtifactId, len(artifactDataList), key, datasetID, metadata) + return catalog.NewStatus(core.CatalogCacheStatus_CACHE_POPULATED, EventCatalogMetadata(datasetID, tag, source)), nil +} + +// Put stores the result of a task execution as a cached Artifact and associates it with the data by tagging it with +// the hash of the input values. +// The CatalogClient will ensure a dataset exists for the Artifact to be created. A Dataset represents the +// project/domain/name/version of the task executed. +// Lastly, CatalogClient will create an Artifact tagged with the input value hash and store the provided execution data. +func (m *CatalogClient) Put(ctx context.Context, key catalog.Key, reader io.OutputReader, metadata catalog.Metadata) (catalog.Status, error) { + // Ensure dataset exists, idempotent operations. Populate Metadata for later recovery + datasetID, err := m.CreateDataset(ctx, key, GetDatasetMetadataForSource(metadata.TaskExecutionIdentifier)) + if err != nil { + return catalog.Status{}, err + } + + inputs, outputs, err := m.prepareInputsAndOutputs(ctx, key, reader) + if err != nil { + return catalog.Status{}, err + } + + return m.CreateArtifact(ctx, key, datasetID, inputs, outputs, metadata) +} + +// Update stores the result of a task execution as a cached Artifact, overwriting any already stored data from a previous +// execution. +// The CatalogClient will ensure the referenced dataset exists and will silently create a new Artifact if the referenced +// key does not exist in datacatalog yet. +// After the operation succeeds, an artifact with the given key and data will be stored in catalog and a tag with the +// hash of the input values will exist.
+func (m *CatalogClient) Update(ctx context.Context, key catalog.Key, reader io.OutputReader, metadata catalog.Metadata) (catalog.Status, error) { + // Ensure dataset exists, idempotent operations. Populate Metadata for later recovery + datasetID, err := m.CreateDataset(ctx, key, GetDatasetMetadataForSource(metadata.TaskExecutionIdentifier)) + if err != nil { + return catalog.Status{}, err + } + + inputs, outputs, err := m.prepareInputsAndOutputs(ctx, key, reader) + if err != nil { + return catalog.Status{}, err + } + + catalogStatus, err := m.UpdateArtifact(ctx, key, datasetID, inputs, outputs, metadata) + if err != nil { + if status.Code(err) == codes.NotFound { + // No existing artifact found (e.g. initial execution of task with overwrite flag already set), + // silently ignore error and create artifact instead to make overwriting an idempotent operation. + logger.Debugf(ctx, "Artifact %+v for dataset %+v does not exist while updating, creating instead", key, datasetID) + return m.CreateArtifact(ctx, key, datasetID, inputs, outputs, metadata) + } + + logger.Errorf(ctx, "Failed to update artifact %+v for dataset %+v: %v", key, datasetID, err) + return catalog.Status{}, err + } + + logger.Debugf(ctx, "Successfully updated artifact %+v for dataset %+v", key, datasetID) + return catalogStatus, nil +} + // GetOrExtendReservation attempts to get a reservation for the cachable task. If you have // previously acquired a reservation it will be extended. If another entity holds the reservation // that is returned. 
diff --git a/pkg/controller/nodes/task/catalog/datacatalog/datacatalog_test.go b/pkg/controller/nodes/task/catalog/datacatalog/datacatalog_test.go index 6a1f9de5e..7243c45de 100644 --- a/pkg/controller/nodes/task/catalog/datacatalog/datacatalog_test.go +++ b/pkg/controller/nodes/task/catalog/datacatalog/datacatalog_test.go @@ -69,7 +69,7 @@ var typedInterface = core.TypedInterface{ } var sampleKey = catalog.Key{ - Identifier: core.Identifier{ResourceType: core.ResourceType_TASK, Project: "project", Domain: "domain", Name: "name"}, + Identifier: core.Identifier{ResourceType: core.ResourceType_TASK, Project: "project", Domain: "domain", Name: "name", Version: "version"}, TypedInterface: typedInterface, CacheVersion: "1.0.0", } @@ -537,7 +537,207 @@ func TestCatalog_Put(t *testing.T) { assert.Equal(t, core.CatalogCacheStatus_CACHE_POPULATED, s.GetCacheStatus()) assert.NotNil(t, s.GetMetadata()) }) +} + +func TestCatalog_Update(t *testing.T) { + ctx := context.Background() + + t.Run("Overwrite existing cached execution", func(t *testing.T) { + ir := &mocks2.InputReader{} + ir.On("Get", mock.Anything).Return(sampleParameters, nil, nil) + + mockClient := &mocks.DataCatalogClient{} + discovery := &CatalogClient{ + client: mockClient, + } + + mockClient.On("CreateDataset", + ctx, + mock.MatchedBy(func(o *datacatalog.CreateDatasetRequest) bool { + assert.True(t, proto.Equal(o.Dataset.Id, datasetID)) + return true + }), + ).Return(&datacatalog.CreateDatasetResponse{}, nil) + + mockClient.On("UpdateArtifact", + ctx, + mock.MatchedBy(func(o *datacatalog.UpdateArtifactRequest) bool { + assert.True(t, proto.Equal(o.Dataset, datasetID)) + assert.IsType(t, &datacatalog.UpdateArtifactRequest_TagName{}, o.QueryHandle) + assert.Equal(t, tagName, o.GetTagName()) + return true + }), + ).Return(&datacatalog.UpdateArtifactResponse{ArtifactId: "test-artifact"}, nil) + + taskID := &core.TaskExecutionIdentifier{ + TaskId: &core.Identifier{ + ResourceType: core.ResourceType_TASK, + Name: 
sampleKey.Identifier.Name, + Project: sampleKey.Identifier.Project, + Domain: sampleKey.Identifier.Domain, + Version: "version", + }, + NodeExecutionId: &core.NodeExecutionIdentifier{ + ExecutionId: &core.WorkflowExecutionIdentifier{ + Name: "wf", + Project: "p1", + Domain: "d1", + }, + NodeId: "unknown", // not set in Put request below --> defaults to "unknown" + }, + RetryAttempt: 0, + } + + newKey := sampleKey + newKey.InputReader = ir + or := ioutils.NewInMemoryOutputReader(sampleParameters, nil, nil) + s, err := discovery.Update(ctx, newKey, or, catalog.Metadata{ + WorkflowExecutionIdentifier: &core.WorkflowExecutionIdentifier{ + Name: taskID.NodeExecutionId.ExecutionId.Name, + Domain: taskID.NodeExecutionId.ExecutionId.Domain, + Project: taskID.NodeExecutionId.ExecutionId.Project, + }, + TaskExecutionIdentifier: &core.TaskExecutionIdentifier{ + TaskId: &sampleKey.Identifier, + NodeExecutionId: taskID.NodeExecutionId, + RetryAttempt: 0, + }, + }) + assert.NoError(t, err) + assert.Equal(t, core.CatalogCacheStatus_CACHE_POPULATED, s.GetCacheStatus()) + assert.NotNil(t, s.GetMetadata()) + assert.Equal(t, tagName, s.GetMetadata().ArtifactTag.Name) + sourceTID := s.GetMetadata().GetSourceTaskExecution() + assert.Equal(t, taskID.TaskId.String(), sourceTID.TaskId.String()) + assert.Equal(t, taskID.RetryAttempt, sourceTID.RetryAttempt) + assert.Equal(t, taskID.NodeExecutionId.String(), sourceTID.NodeExecutionId.String()) + }) + + t.Run("Overwrite non-existing execution", func(t *testing.T) { + ir := &mocks2.InputReader{} + ir.On("Get", mock.Anything).Return(sampleParameters, nil, nil) + + mockClient := &mocks.DataCatalogClient{} + discovery := &CatalogClient{ + client: mockClient, + } + + createDatasetCalled := false + mockClient.On("CreateDataset", + ctx, + mock.MatchedBy(func(o *datacatalog.CreateDatasetRequest) bool { + assert.True(t, proto.Equal(o.Dataset.Id, datasetID)) + createDatasetCalled = true + return true + }), + 
).Return(&datacatalog.CreateDatasetResponse{}, nil) + + updateArtifactCalled := false + mockClient.On("UpdateArtifact", ctx, mock.Anything).Run(func(args mock.Arguments) { + updateArtifactCalled = true + }).Return(nil, status.New(codes.NotFound, "missing entity of type Artifact with identifier id").Err()) + + createArtifactCalled := false + mockClient.On("CreateArtifact", + ctx, + mock.MatchedBy(func(o *datacatalog.CreateArtifactRequest) bool { + _, parseErr := uuid.Parse(o.Artifact.Id) + assert.NoError(t, parseErr) + assert.True(t, proto.Equal(o.Artifact.Dataset, datasetID)) + createArtifactCalled = true + return true + }), + ).Return(&datacatalog.CreateArtifactResponse{}, nil) + + addTagCalled := false + mockClient.On("AddTag", + ctx, + mock.MatchedBy(func(o *datacatalog.AddTagRequest) bool { + assert.EqualValues(t, "flyte_cached-BE6CZsMk6N3ExR_4X9EuwBgj2Jh2UwasXK3a_pM9xlY", o.Tag.Name) + addTagCalled = true + return true + }), + ).Return(&datacatalog.AddTagResponse{}, nil) + + taskID := &core.TaskExecutionIdentifier{ + TaskId: &core.Identifier{ + ResourceType: core.ResourceType_TASK, + Name: sampleKey.Identifier.Name, + Project: sampleKey.Identifier.Project, + Domain: sampleKey.Identifier.Domain, + Version: "version", + }, + NodeExecutionId: &core.NodeExecutionIdentifier{ + ExecutionId: &core.WorkflowExecutionIdentifier{ + Name: "wf", + Project: "p1", + Domain: "d1", + }, + NodeId: "unknown", // not set in Put request below --> defaults to "unknown" + }, + RetryAttempt: 0, + } + + newKey := sampleKey + newKey.InputReader = ir + or := ioutils.NewInMemoryOutputReader(sampleParameters, nil, nil) + s, err := discovery.Update(ctx, newKey, or, catalog.Metadata{ + WorkflowExecutionIdentifier: &core.WorkflowExecutionIdentifier{ + Name: taskID.NodeExecutionId.ExecutionId.Name, + Domain: taskID.NodeExecutionId.ExecutionId.Domain, + Project: taskID.NodeExecutionId.ExecutionId.Project, + }, + TaskExecutionIdentifier: &core.TaskExecutionIdentifier{ + TaskId: 
&sampleKey.Identifier, + NodeExecutionId: taskID.NodeExecutionId, + RetryAttempt: 0, + }, + }) + assert.NoError(t, err) + assert.Equal(t, core.CatalogCacheStatus_CACHE_POPULATED, s.GetCacheStatus()) + assert.NotNil(t, s.GetMetadata()) + assert.Equal(t, tagName, s.GetMetadata().ArtifactTag.Name) + assert.Nil(t, s.GetMetadata().GetSourceTaskExecution()) + assert.True(t, createDatasetCalled) + assert.True(t, updateArtifactCalled) + assert.True(t, createArtifactCalled) + assert.True(t, addTagCalled) + }) + + t.Run("Error while overwriting execution", func(t *testing.T) { + ir := &mocks2.InputReader{} + ir.On("Get", mock.Anything).Return(sampleParameters, nil, nil) + + mockClient := &mocks.DataCatalogClient{} + discovery := &CatalogClient{ + client: mockClient, + } + mockClient.On("CreateDataset", + ctx, + mock.MatchedBy(func(o *datacatalog.CreateDatasetRequest) bool { + assert.True(t, proto.Equal(o.Dataset.Id, datasetID)) + return true + }), + ).Return(&datacatalog.CreateDatasetResponse{}, nil) + + genericErr := errors.New("generic error") + mockClient.On("UpdateArtifact", ctx, mock.Anything).Return(nil, genericErr) + + newKey := sampleKey + newKey.InputReader = ir + or := ioutils.NewInMemoryOutputReader(sampleParameters, nil, nil) + s, err := discovery.Update(ctx, newKey, or, catalog.Metadata{ + WorkflowExecutionIdentifier: &core.WorkflowExecutionIdentifier{ + Name: "test", + }, + TaskExecutionIdentifier: nil, + }) + assert.Error(t, err) + assert.Equal(t, genericErr, err) + assert.Equal(t, core.CatalogCacheStatus_CACHE_DISABLED, s.GetCacheStatus()) + assert.Nil(t, s.GetMetadata()) + }) } var tagName = "flyte_cached-BE6CZsMk6N3ExR_4X9EuwBgj2Jh2UwasXK3a_pM9xlY" diff --git a/pkg/controller/nodes/task/catalog/noop_catalog.go b/pkg/controller/nodes/task/catalog/noop_catalog.go index 2f799e442..90302c383 100644 --- a/pkg/controller/nodes/task/catalog/noop_catalog.go +++ b/pkg/controller/nodes/task/catalog/noop_catalog.go @@ -10,6 +10,10 @@ import ( 
"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/io" ) +var ( + _ catalog.Client = &NOOPCatalog{} +) + var disabledStatus = catalog.NewStatus(core.CatalogCacheStatus_CACHE_DISABLED, nil) type NOOPCatalog struct { @@ -23,6 +27,10 @@ func (n NOOPCatalog) Put(_ context.Context, _ catalog.Key, _ io.OutputReader, _ return disabledStatus, nil } +func (n NOOPCatalog) Update(_ context.Context, _ catalog.Key, _ io.OutputReader, _ catalog.Metadata) (catalog.Status, error) { + return disabledStatus, nil +} + func (n NOOPCatalog) GetOrExtendReservation(_ context.Context, _ catalog.Key, _ string, _ time.Duration) (*datacatalog.Reservation, error) { return nil, nil } diff --git a/pkg/controller/nodes/task/handler.go b/pkg/controller/nodes/task/handler.go index a5685c6f7..7a19c6d76 100644 --- a/pkg/controller/nodes/task/handler.go +++ b/pkg/controller/nodes/task/handler.go @@ -51,6 +51,7 @@ type metrics struct { catalogPutSuccessCount labeled.Counter catalogMissCount labeled.Counter catalogHitCount labeled.Counter + catalogSkipCount labeled.Counter pluginExecutionLatency labeled.StopWatch pluginQueueLatency labeled.StopWatch reservationGetSuccessCount labeled.Counter @@ -563,39 +564,47 @@ func (t Handler) Handle(ctx context.Context, nCtx handler.NodeExecutionContext) // STEP 1: Check Cache if (ts.PluginPhase == pluginCore.PhaseUndefined || ts.PluginPhase == pluginCore.PhaseWaitingForCache) && checkCatalog { // This is assumed to be first time. we will check catalog and call handle - entry, err := t.CheckCatalogCache(ctx, tCtx.tr, nCtx.InputReader(), tCtx.ow) - if err != nil { - logger.Errorf(ctx, "failed to check catalog cache with error") - return handler.UnknownTransition, err - } - - if entry.GetStatus().GetCacheStatus() == core.CatalogCacheStatus_CACHE_HIT { - r := tCtx.ow.GetReader() - if r == nil { - return handler.UnknownTransition, errors.Errorf(errors.IllegalStateError, nCtx.NodeID(), "failed to reader outputs from a CacheHIT. 
Unexpected!") - } - - // TODO @kumare this can be optimized, if we have paths then the reader could be pipelined to a sink - o, ee, err := r.Read(ctx) + // If the cache should be skipped (requested by user for the execution), do not check datacatalog for any cached + // data, but instead always perform calculations again and overwrite the stored data after successful execution. + if nCtx.ExecutionContext().GetExecutionConfig().OverwriteCache { + logger.Info(ctx, "Execution config forced cache skip, not checking catalog") + pluginTrns.PopulateCacheInfo(catalog.NewCatalogEntry(nil, cacheSkipped)) + t.metrics.catalogSkipCount.Inc(ctx) + } else { + entry, err := t.CheckCatalogCache(ctx, tCtx.tr, nCtx.InputReader(), tCtx.ow) if err != nil { - logger.Errorf(ctx, "failed to read from catalog, err: %s", err.Error()) + logger.Errorf(ctx, "failed to check catalog cache with error") return handler.UnknownTransition, err } - if ee != nil { - logger.Errorf(ctx, "got execution error from catalog output reader? This should not happen, err: %s", ee.String()) - return handler.UnknownTransition, errors.Errorf(errors.IllegalStateError, nCtx.NodeID(), "execution error from a cache output, bad state: %s", ee.String()) - } + if entry.GetStatus().GetCacheStatus() == core.CatalogCacheStatus_CACHE_HIT { + r := tCtx.ow.GetReader() + if r == nil { + return handler.UnknownTransition, errors.Errorf(errors.IllegalStateError, nCtx.NodeID(), "failed to reader outputs from a CacheHIT. 
Unexpected!") + } - if err := nCtx.DataStore().WriteProtobuf(ctx, tCtx.ow.GetOutputPath(), storage.Options{}, o); err != nil { - logger.Errorf(ctx, "failed to write cached value to datastore, err: %s", err.Error()) - return handler.UnknownTransition, err - } + // TODO @kumare this can be optimized, if we have paths then the reader could be pipelined to a sink + o, ee, err := r.Read(ctx) + if err != nil { + logger.Errorf(ctx, "failed to read from catalog, err: %s", err.Error()) + return handler.UnknownTransition, err + } - pluginTrns.CacheHit(tCtx.ow.GetOutputPath(), nil, entry) - } else { - logger.Infof(ctx, "No CacheHIT. Status [%s]", entry.GetStatus().GetCacheStatus().String()) - pluginTrns.PopulateCacheInfo(entry) + if ee != nil { + logger.Errorf(ctx, "got execution error from catalog output reader? This should not happen, err: %s", ee.String()) + return handler.UnknownTransition, errors.Errorf(errors.IllegalStateError, nCtx.NodeID(), "execution error from a cache output, bad state: %s", ee.String()) + } + + if err := nCtx.DataStore().WriteProtobuf(ctx, tCtx.ow.GetOutputPath(), storage.Options{}, o); err != nil { + logger.Errorf(ctx, "failed to write cached value to datastore, err: %s", err.Error()) + return handler.UnknownTransition, err + } + + pluginTrns.CacheHit(tCtx.ow.GetOutputPath(), nil, entry) + } else { + logger.Infof(ctx, "No CacheHIT. 
Status [%s]", entry.GetStatus().GetCacheStatus().String()) + pluginTrns.PopulateCacheInfo(entry) + } } } @@ -880,6 +889,7 @@ func New(ctx context.Context, kubeClient executors.Client, client catalog.Client unsupportedTaskType: labeled.NewCounter("unsupported_tasktype", "No Handler plugin configured for Handler type", scope), catalogHitCount: labeled.NewCounter("discovery_hit_count", "Task cached in Discovery", scope), catalogMissCount: labeled.NewCounter("discovery_miss_count", "Task not cached in Discovery", scope), + catalogSkipCount: labeled.NewCounter("discovery_skip_count", "Task lookup skipped in Discovery", scope), catalogPutSuccessCount: labeled.NewCounter("discovery_put_success_count", "Discovery Put success count", scope), catalogPutFailureCount: labeled.NewCounter("discovery_put_failure_count", "Discovery Put failure count", scope), catalogGetFailureCount: labeled.NewCounter("discovery_get_failure_count", "Discovery Get faillure count", scope), diff --git a/pkg/controller/nodes/task/handler_test.go b/pkg/controller/nodes/task/handler_test.go index bc2487a6a..0b5bf6da7 100644 --- a/pkg/controller/nodes/task/handler_test.go +++ b/pkg/controller/nodes/task/handler_test.go @@ -734,7 +734,7 @@ func Test_task_Handle_NoCatalog(t *testing.T) { func Test_task_Handle_Catalog(t *testing.T) { - createNodeContext := func(recorder events.TaskEventRecorder, ttype string, s *taskNodeStateHolder) *nodeMocks.NodeExecutionContext { + createNodeContext := func(recorder events.TaskEventRecorder, ttype string, s *taskNodeStateHolder, overwriteCache bool) *nodeMocks.NodeExecutionContext { wfExecID := &core.WorkflowExecutionIdentifier{ Project: "project", Domain: "domain", @@ -818,7 +818,7 @@ func Test_task_Handle_Catalog(t *testing.T) { nCtx.OnEnqueueOwnerFunc().Return(nil) executionContext := &mocks.ExecutionContext{} - executionContext.OnGetExecutionConfig().Return(v1alpha1.ExecutionConfig{}) + 
executionContext.OnGetExecutionConfig().Return(v1alpha1.ExecutionConfig{OverwriteCache: overwriteCache}) executionContext.OnGetEventVersion().Return(v1alpha1.EventVersion0) executionContext.OnGetParentInfo().Return(nil) nCtx.OnExecutionContext().Return(executionContext) @@ -847,6 +847,7 @@ func Test_task_Handle_Catalog(t *testing.T) { catalogFetch bool catalogFetchError bool catalogWriteError bool + catalogSkip bool } type want struct { handlerPhase handler.EPhase @@ -898,12 +899,34 @@ func Test_task_Handle_Catalog(t *testing.T) { eventPhase: core.TaskExecution_SUCCEEDED, }, }, + { + "cache-skip-hit", + args{ + catalogFetch: true, + catalogSkip: true, + }, + want{ + handlerPhase: handler.EPhaseSuccess, + eventPhase: core.TaskExecution_SUCCEEDED, + }, + }, + { + "cache-skip-miss", + args{ + catalogFetch: false, + catalogSkip: true, + }, + want{ + handlerPhase: handler.EPhaseSuccess, + eventPhase: core.TaskExecution_SUCCEEDED, + }, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { state := &taskNodeStateHolder{} ev := &fakeBufferedTaskEventRecorder{} - nCtx := createNodeContext(ev, "test", state) + nCtx := createNodeContext(ev, "test", state, tt.args.catalogSkip) c := &pluginCatalogMocks.Client{} if tt.args.catalogFetch { or := &ioMocks.OutputReader{} @@ -918,8 +941,10 @@ func Test_task_Handle_Catalog(t *testing.T) { } if tt.args.catalogWriteError { c.OnPutMatch(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(catalog.Status{}, fmt.Errorf("failed to write to catalog")) + c.OnUpdateMatch(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(catalog.Status{}, fmt.Errorf("failed to write to catalog")) } else { c.OnPutMatch(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(catalog.NewStatus(core.CatalogCacheStatus_CACHE_POPULATED, nil), nil) + c.OnUpdateMatch(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(catalog.NewStatus(core.CatalogCacheStatus_CACHE_POPULATED, nil), nil) } tk, err 
:= New(context.TODO(), mocks.NewFakeKubeClient(), c, eventConfig, testClusterID, promutils.NewTestScope()) assert.NoError(t, err) @@ -944,14 +969,22 @@ func Test_task_Handle_Catalog(t *testing.T) { if tt.args.catalogFetch { if assert.NotNil(t, got.Info().GetInfo().TaskNodeInfo) { assert.NotNil(t, got.Info().GetInfo().TaskNodeInfo.TaskNodeMetadata) - assert.Equal(t, core.CatalogCacheStatus_CACHE_HIT, got.Info().GetInfo().TaskNodeInfo.TaskNodeMetadata.CacheStatus) + if tt.args.catalogSkip { + assert.Equal(t, core.CatalogCacheStatus_CACHE_POPULATED, got.Info().GetInfo().TaskNodeInfo.TaskNodeMetadata.CacheStatus) + } else { + assert.Equal(t, core.CatalogCacheStatus_CACHE_HIT, got.Info().GetInfo().TaskNodeInfo.TaskNodeMetadata.CacheStatus) + } } assert.NotNil(t, got.Info().GetInfo().OutputInfo) s := storage.DataReference("/output-dir/outputs.pb") assert.Equal(t, s, got.Info().GetInfo().OutputInfo.OutputURI) r, err := nCtx.DataStore().Head(context.TODO(), s) assert.NoError(t, err) - assert.True(t, r.Exists()) + assert.Equal(t, !tt.args.catalogSkip, r.Exists()) + } + if tt.args.catalogSkip { + c.AssertNotCalled(t, "Put", mock.Anything, mock.Anything, mock.Anything, mock.Anything) + c.AssertCalled(t, "Update", mock.Anything, mock.Anything, mock.Anything, mock.Anything) } } }) @@ -960,7 +993,7 @@ func Test_task_Handle_Catalog(t *testing.T) { func Test_task_Handle_Reservation(t *testing.T) { - createNodeContext := func(recorder events.TaskEventRecorder, ttype string, s *taskNodeStateHolder) *nodeMocks.NodeExecutionContext { + createNodeContext := func(recorder events.TaskEventRecorder, ttype string, s *taskNodeStateHolder, overwriteCache bool) *nodeMocks.NodeExecutionContext { wfExecID := &core.WorkflowExecutionIdentifier{ Project: "project", Domain: "domain", @@ -1046,7 +1079,7 @@ func Test_task_Handle_Reservation(t *testing.T) { nCtx.OnEnqueueOwnerFunc().Return(nil) executionContext := &mocks.ExecutionContext{} - 
executionContext.OnGetExecutionConfig().Return(v1alpha1.ExecutionConfig{}) + executionContext.OnGetExecutionConfig().Return(v1alpha1.ExecutionConfig{OverwriteCache: overwriteCache}) executionContext.OnGetEventVersion().Return(v1alpha1.EventVersion0) executionContext.OnGetParentInfo().Return(nil) executionContext.OnIncrementParallelism().Return(1) @@ -1063,6 +1096,7 @@ func Test_task_Handle_Reservation(t *testing.T) { type args struct { catalogFetch bool + catalogSkip bool pluginPhase pluginCore.Phase ownerID string } @@ -1114,12 +1148,40 @@ func Test_task_Handle_Reservation(t *testing.T) { eventPhase: core.TaskExecution_SUCCEEDED, }, }, + { + "cache-skip-miss", + args{ + catalogFetch: false, + catalogSkip: true, + pluginPhase: pluginCore.PhaseUndefined, + ownerID: "name-n1-1", + }, + want{ + pluginPhase: pluginCore.PhaseSuccess, + handlerPhase: handler.EPhaseSuccess, + eventPhase: core.TaskExecution_SUCCEEDED, + }, + }, + { + "cache-skip-hit", + args{ + catalogFetch: true, + catalogSkip: true, + pluginPhase: pluginCore.PhaseWaitingForCache, + ownerID: "name-n1-1", + }, + want{ + pluginPhase: pluginCore.PhaseSuccess, + handlerPhase: handler.EPhaseSuccess, + eventPhase: core.TaskExecution_SUCCEEDED, + }, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { state := &taskNodeStateHolder{} ev := &fakeBufferedTaskEventRecorder{} - nCtx := createNodeContext(ev, "test", state) + nCtx := createNodeContext(ev, "test", state, tt.args.catalogSkip) c := &pluginCatalogMocks.Client{} nr := &nodeMocks.NodeStateReader{} st := bytes.NewBuffer([]byte{}) @@ -1142,6 +1204,7 @@ func Test_task_Handle_Reservation(t *testing.T) { c.OnGetMatch(mock.Anything, mock.Anything).Return(catalog.NewFailedCatalogEntry(catalog.NewStatus(core.CatalogCacheStatus_CACHE_MISS, nil)), nil) } c.OnPutMatch(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(catalog.NewStatus(core.CatalogCacheStatus_CACHE_POPULATED, nil), nil) + c.OnUpdateMatch(mock.Anything, mock.Anything, 
mock.Anything, mock.Anything).Return(catalog.NewStatus(core.CatalogCacheStatus_CACHE_POPULATED, nil), nil) c.OnGetOrExtendReservationMatch(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&datacatalog.Reservation{OwnerId: tt.args.ownerID}, nil) tk, err := New(context.TODO(), mocks.NewFakeKubeClient(), c, eventConfig, testClusterID, promutils.NewTestScope()) assert.NoError(t, err) @@ -1163,6 +1226,17 @@ func Test_task_Handle_Reservation(t *testing.T) { } assert.Equal(t, tt.want.pluginPhase.String(), state.s.PluginPhase.String()) assert.Equal(t, uint32(0), state.s.PluginPhaseVersion) + // verify catalog.Put was called appropriately (overwrite param should be `true` if catalog cache is skipped) + // Put only gets called in the tests defined above that succeed and have an owner ID defined + if tt.want.pluginPhase == pluginCore.PhaseSuccess && len(tt.args.ownerID) > 0 { + if tt.args.catalogSkip { + c.AssertNotCalled(t, "Put", mock.Anything, mock.Anything, mock.Anything, mock.Anything) + c.AssertCalled(t, "Update", mock.Anything, mock.Anything, mock.Anything, mock.Anything) + } else { + c.AssertCalled(t, "Put", mock.Anything, mock.Anything, mock.Anything, mock.Anything) + c.AssertNotCalled(t, "Update", mock.Anything, mock.Anything, mock.Anything, mock.Anything) + } + } } }) } diff --git a/pkg/controller/nodes/task/pre_post_execution.go b/pkg/controller/nodes/task/pre_post_execution.go index 7ee10700e..c571d4ed4 100644 --- a/pkg/controller/nodes/task/pre_post_execution.go +++ b/pkg/controller/nodes/task/pre_post_execution.go @@ -19,7 +19,10 @@ import ( errors2 "github.com/flyteorg/flytepropeller/pkg/controller/nodes/errors" ) -var cacheDisabled = catalog.NewStatus(core.CatalogCacheStatus_CACHE_DISABLED, nil) +var ( + cacheDisabled = catalog.NewStatus(core.CatalogCacheStatus_CACHE_DISABLED, nil) + cacheSkipped = catalog.NewStatus(core.CatalogCacheStatus_CACHE_SKIPPED, nil) +) func (t *Handler) CheckCatalogCache(ctx context.Context, tr 
pluginCore.TaskReader, inputReader io.InputReader, outputWriter io.OutputWriter) (catalog.Entry, error) { tk, err := tr.Read(ctx) @@ -218,10 +221,17 @@ func (t *Handler) ValidateOutputAndCacheAdd(ctx context.Context, nodeID v1alpha1 logger.Infof(ctx, "Catalog CacheEnabled. recording execution [%s/%s/%s/%s]", tk.Id.Project, tk.Id.Domain, tk.Id.Name, tk.Id.Version) // ignores discovery write failures - s, err2 := t.catalog.Put(ctx, key, r, m) - if err2 != nil { + var s catalog.Status + if executionConfig.OverwriteCache { + // Overwrite existing artifact (will create instead of no existing data was found) + s, err = t.catalog.Update(ctx, key, r, m) + } else { + // Explicitly create new artifact + s, err = t.catalog.Put(ctx, key, r, m) + } + if err != nil { t.metrics.catalogPutFailureCount.Inc(ctx) - logger.Errorf(ctx, "Failed to write results to catalog for Task [%v]. Error: %v", tk.GetId(), err2) + logger.Errorf(ctx, "Failed to write results to catalog for Task [%v]. Error: %v", tk.GetId(), err) return catalog.NewStatus(core.CatalogCacheStatus_CACHE_PUT_FAILURE, s.GetMetadata()), nil, nil } t.metrics.catalogPutSuccessCount.Inc(ctx) diff --git a/pkg/utils/failing_datastore.go b/pkg/utils/failing_datastore.go index 95762d465..fa88be68e 100644 --- a/pkg/utils/failing_datastore.go +++ b/pkg/utils/failing_datastore.go @@ -34,3 +34,7 @@ func (FailingRawStore) ReadRaw(ctx context.Context, reference storage.DataRefere func (FailingRawStore) WriteRaw(ctx context.Context, reference storage.DataReference, size int64, opts storage.Options, raw io.Reader) error { return fmt.Errorf("failed write raw") } + +func (FailingRawStore) Delete(ctx context.Context, reference storage.DataReference) error { + return fmt.Errorf("failed to delete") +} diff --git a/pkg/utils/failing_datastore_test.go b/pkg/utils/failing_datastore_test.go index 45ac107fa..6bb9cfb1a 100644 --- a/pkg/utils/failing_datastore_test.go +++ b/pkg/utils/failing_datastore_test.go @@ -24,4 +24,6 @@ func 
TestFailingRawStore(t *testing.T) { assert.Error(t, f.WriteRaw(ctx, "", 0, storage.Options{}, bytes.NewReader(nil))) assert.Error(t, f.CopyRaw(ctx, "", "", storage.Options{})) + + assert.Error(t, f.Delete(ctx, "")) }