From 8e88ed28473db169d8f16fce98c4ad3b31d7c231 Mon Sep 17 00:00:00 2001 From: Tom Close Date: Sat, 28 Dec 2024 22:47:51 +1100 Subject: [PATCH] finished off the workflow tutorial for now --- new-docs/source/tutorial/workflow.ipynb | 94 ++++++++++++++++++++++--- 1 file changed, 86 insertions(+), 8 deletions(-) diff --git a/new-docs/source/tutorial/workflow.ipynb b/new-docs/source/tutorial/workflow.ipynb index b836ec5c6..37d116d63 100644 --- a/new-docs/source/tutorial/workflow.ipynb +++ b/new-docs/source/tutorial/workflow.ipynb @@ -4,14 +4,22 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "# Workflow design" + "# Workflow design\n", + "\n", + "In Pydra, workflows are DAG of component tasks to be executed on specified inputs.\n", + "Workflow specifications are dataclasses, which interchangeable with Python and shell tasks\n", + "specifications and executed in the same way." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "Given two task specifications, `Add` and `Mul`" + "## Constructor functions\n", + "\n", + "Workflows are typically defined using the `pydra.design.workflow.define` decorator on \n", + "a \"constructor\" function that generates the workflow. For example, given two task\n", + "specifications, `Add` and `Mul`." ] }, { @@ -47,7 +55,7 @@ "outputs": [], "source": [ "@workflow.define\n", - "def MyTestWorkflow(a, b):\n", + "def BasicWorkflow(a, b):\n", " add = workflow.add(Add(a=a, b=b))\n", " mul = workflow.add(Mul(a=add.out, b=b))\n", " return mul.out" @@ -72,7 +80,7 @@ "from fileformats import image, video\n", "\n", "@workflow.define\n", - "def MyTestShellWorkflow(\n", + "def ShellWorkflow(\n", " input_video: video.Mp4,\n", " watermark: image.Png,\n", " watermark_dims: tuple[int, int] = (10, 10),\n", @@ -209,7 +217,7 @@ " return float(value)\n", "\n", "@workflow.define\n", - "class MyLibraryWorkflow(WorkflowSpec[\"MyLibraryWorkflow.Outputs\"]):\n", + "class LibraryWorkflow(WorkflowSpec[\"MyLibraryWorkflow.Outputs\"]):\n", "\n", " a: int\n", " b: float = workflow.arg(\n", @@ -248,7 +256,7 @@ " return sum(x)\n", "\n", "@workflow.define\n", - "def MySplitWorkflow(a: list[int], b: list[float]) -> list[float]:\n", + "def SplitWorkflow(a: list[int], b: list[float]) -> list[float]:\n", " # Multiply over all combinations of the elements of a and b, then combine the results\n", " # for each a element into a list over each b element\n", " mul = workflow.add(Mul()).split(x=a, y=b).combine(\"x\")\n", @@ -271,7 +279,7 @@ "outputs": [], "source": [ "@workflow.define\n", - "def MySplitThenCombineWorkflow(a: list[int], b: list[float], c: float) -> list[float]:\n", + "def SplitThenCombineWorkflow(a: list[int], b: list[float], c: float) -> list[float]:\n", " mul = workflow.add(Mul()).split(x=a, y=b)\n", " add = workflow.add(Add(x=mul.out, y=c)).combine(\"Mul.x\")\n", " sum = workflow.add(Sum(x=add.out))\n", @@ -301,7 +309,7 @@ "outputs": [], "source": [ "@workflow.define\n", - "def MyConditionalWorkflow(\n", + "def ConditionalWorkflow(\n", " input_video: video.Mp4,\n", " watermark: image.Png,\n", " watermark_dims: tuple[int, int] | None = None,\n", @@ -371,6 +379,76 @@ "placeholders see [Conditional construction](../explanation/conditional-lazy.html)" ] }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Typing\n", + "\n", + "Pydra utilizes Python type annotations to implement strong type-checking, which is performed\n", + "when values or upstream outputs are assigned to task specification inputs.\n", + "\n", + "Task input and output fields do not need to be assigned types, since they will default to `typing.Any`.\n", + "However, if they are assigned a type and a value or output from an upstream node conflicts\n", + "with the type, a `TypeError` will be raised at construction time.\n", + "\n", + "Note that the type-checking \"assumes the best\", and will pass if the upstream field is typed\n", + "by `Any` or a super-class of the field being assigned to. For example, an input of\n", + "`fileformats.generic.File` passed to a field expecting a `fileformats.image.Png` file type,\n", + "because `Png` is a subtype of `File`, where as `fileformats.image.Jpeg` input would fail\n", + "since it is clearly not the intended type.\n" + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "metadata": {}, + "outputs": [], + "source": [ + "from fileformats import generic\n", + "\n", + "Mp4Handbrake = shell.define(\n", + " \"HandBrakeCLI -i -o \"\n", + " \"--width --height \",\n", + ")\n", + "\n", + "\n", + "QuicktimeHandbrake = shell.define(\n", + " \"HandBrakeCLI -i -o \"\n", + " \"--width --height \",\n", + ")\n", + "\n", + "@workflow.define\n", + "def TypeErrorWorkflow(\n", + " input_video: video.Mp4,\n", + " watermark: generic.File,\n", + " watermark_dims: tuple[int, int] = (10, 10),\n", + ") -> video.Mp4:\n", + "\n", + " add_watermark = workflow.add(\n", + " shell.define(\n", + " \"ffmpeg -i -i \"\n", + " \"-filter_complex \"\n", + " )(\n", + " in_video=input_video,\n", + " watermark=watermark, # Type is OK because generic.File is superclass of image.Png\n", + " filter=\"overlay={}:{}\".format(*watermark_dims),\n", + " ),\n", + " name=\"add_watermark\",\n", + " )\n", + "\n", + " try:\n", + " handbrake = workflow.add(\n", + " QuicktimeHandbrake(in_video=add_watermark.out_video, width=1280, height=720),\n", + " ) # This will raise a TypeError because the input video is an Mp4\n", + " except TypeError:\n", + " handbrake = workflow.add(\n", + " Mp4Handbrake(in_video=add_watermark.out_video, width=1280, height=720),\n", + " ) # The type of the input video is now correct\n", + "\n", + " return handbrake.output_video" + ] + }, { "cell_type": "markdown", "metadata": {},