Skip to content

Commit

Permalink
refactored statements into separate sub-package
Browse files Browse the repository at this point in the history
  • Loading branch information
tclose committed Apr 27, 2024
1 parent 50a2bad commit 521aecf
Show file tree
Hide file tree
Showing 15 changed files with 238 additions and 139 deletions.
34 changes: 32 additions & 2 deletions nipype2pydra/cli/pkg_gen.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from nipype2pydra.cli.base import cli
from nipype2pydra.package import PackageConverter
from nipype2pydra.workflow import WorkflowConverter
from nipype2pydra.node_factory import NodeFactoryConverter


@cli.command(
Expand Down Expand Up @@ -129,11 +130,18 @@ def pkg_gen(
for wf_path in spec["workflows"]:
parts = wf_path.split(".")
wf_name = parts[-1]
mod_path = ".".join(parts[:-1])
nipype_module_str = ".".join(parts[:-1])
nipype_module = import_module(nipype_module_str)
try:
getattr(nipype_module, wf_name)
except AttributeError:
raise RuntimeError(
f"Did not find workflow function {wf_name} in module {nipype_module_str}"
)
with open(workflows_spec_dir / (wf_path + ".yaml"), "w") as f:
f.write(
WorkflowConverter.default_spec(
wf_name, mod_path, defaults=wf_defaults
wf_name, nipype_module_str, defaults=wf_defaults
)
)

Expand Down Expand Up @@ -179,6 +187,28 @@ def pkg_gen(
with open(callables_fspath, "w") as f:
f.write(parsed.generate_callables(nipype_interface))

if "node_factories" in spec:
node_factories_spec_dir = spec_dir / "node_factories"
node_factories_spec_dir.mkdir(parents=True, exist_ok=True)
for node_factory_path in spec["node_factories"]:
parts = node_factory_path.split(".")
factory_name = parts[-1]
nipype_module_str = ".".join(parts[:-1])
nipype_module = import_module(nipype_module_str)
try:
getattr(nipype_module, factory_name)
except AttributeError:
raise RuntimeError(
f"Did not find factory function {factory_name} in module {nipype_module_str}"
)

with open(workflows_spec_dir / (wf_path + ".yaml"), "w") as f:
f.write(
NodeFactoryConverter.default_spec(
factory_name, nipype_module_str, defaults=wf_defaults
)
)

if interface_only_pkg:
with open(
pkg_dir
Expand Down
45 changes: 40 additions & 5 deletions nipype2pydra/node_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,17 @@
from importlib import import_module
from types import ModuleType
import black.report
import yaml
from .utils import (
UsedSymbols,
extract_args,
ImportStatement,
full_address,
multiline_comment,
)
from .worflow.components import (
CommentConverter,
DocStringConverter,
from .statements import (
CommentStatement,
DocStringStatement,
)
import nipype2pydra.package
import nipype2pydra.interface
Expand All @@ -27,7 +29,7 @@
@attrs.define
class NodeFactoryConverter:
"""Specifies how the semi-automatic conversion from Nipype to Pydra should
be performed
be performed for functions that build and return Nipype nodes
Parameters
----------
Expand Down Expand Up @@ -176,7 +178,7 @@ def _converted_code(self) -> ty.Tuple[str, ty.List[str]]:
# Write out the preamble (e.g. docstring, comments, etc..)
while parsed_statements and isinstance(
parsed_statements[0],
(DocStringConverter, CommentConverter, ImportStatement),
(DocStringStatement, CommentStatement, ImportStatement),
):
preamble += str(parsed_statements.pop(0)) + "\n"

Expand Down Expand Up @@ -216,3 +218,36 @@ def _converted_code(self) -> ty.Tuple[str, ty.List[str]]:
code_str = re.sub(find, replace, code_str, flags=re.MULTILINE | re.DOTALL)

return code_str, used_configs

@classmethod
def default_spec(
cls, name: str, nipype_module: str, defaults: ty.Dict[str, ty.Any]
) -> str:
"""Generates a spec for the workflow converter from the given function"""
conv = NodeFactoryConverter(
name=name,
nipype_name=name,
nipype_module=nipype_module,
input_nodes={"inputnode": ""},
output_nodes={"outputnode": ""},
**{n: eval(v) for n, v in defaults},
)
dct = attrs.asdict(conv)
dct["nipype_module"] = dct["nipype_module"].__name__
del dct["package"]
del dct["nodes"]
for k in dct:
if not dct[k]:
dct[k] = None
yaml_str = yaml.dump(dct, sort_keys=False)
for k in dct:
fld = getattr(attrs.fields(NodeFactoryConverter), k)
hlp = fld.metadata.get("help")
if hlp:
yaml_str = re.sub(
r"^(" + k + r"):",
multiline_comment(hlp) + r"\1:",
yaml_str,
flags=re.MULTILINE,
)
return yaml_str
17 changes: 17 additions & 0 deletions nipype2pydra/statements/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from .imports import ImportStatement, parse_imports, Imported # noqa: F401
from .workflow import ( # noqa: F401
AddNestedWorkflowStatement,
AddNodeStatement,
ConnectionStatement,
IterableStatement,
DynamicField,
)
from .assignment import ( # noqa: F401
NodeAssignmentStatement,
NestedWorkflowAssignmentStatement,
)
from .misc import DocStringStatement, CommentStatement, ReturnStatement # noqa: F401
from .utility import ( # noqa: F401
IdentityInterfaceNodeConverter,
FunctionNodeConverter,
)
49 changes: 49 additions & 0 deletions nipype2pydra/statements/assignment.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import attrs
import typing as ty
from .workflow import AddNodeStatement, AddNestedWorkflowStatement


@attrs.define
class NodeAssignmentStatement:

nodes: ty.List[AddNodeStatement] = attrs.field()
attribute: str = attrs.field()
value: str = attrs.field()
indent: str = attrs.field()

def __str__(self):
if not any(n.include for n in self.nodes):
return ""
node_name = self.nodes[0].name
workflow_variable = self.nodes[0].workflow_variable
assert (n.name == node_name for n in self.nodes)
assert (n.workflow_variable == workflow_variable for n in self.nodes)
return f"{self.indent}{workflow_variable}.{node_name}{self.attribute} = {self.value}"


@attrs.define
class NestedWorkflowAssignmentStatement:

nodes: ty.List[AddNestedWorkflowStatement] = attrs.field()
attribute: str = attrs.field()
value: str = attrs.field()
indent: str = attrs.field()

def __str__(self):
if not any(n.include for n in self.nodes):
return ""
node = self.nodes[0]
if not node.nested_spec:
raise NotImplementedError(
f"Need specification for nested workflow {node.workflow_name} in order to "
"assign to it"
)
nested_wf = node.nested_spec
parts = self.attribute.split(".")
nested_node_name = parts[2]
attribute_name = parts[3]
target_in = nested_wf.input_name(nested_node_name, attribute_name)
attribute = ".".join(parts[:2] + [target_in] + parts[4:])
workflow_variable = self.nodes[0].workflow_variable
assert (n.workflow_variable == workflow_variable for n in self.nodes)
return f"{self.indent}{workflow_variable}{attribute} = {self.value}"
File renamed without changes.
32 changes: 32 additions & 0 deletions nipype2pydra/statements/misc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import typing as ty
import attrs


@attrs.define
class ReturnStatement:

vars: ty.List[str] = attrs.field(converter=lambda s: s.split(", "))
indent: str = attrs.field()

def __str__(self):
return f"{self.indent}return {', '.join(self.vars)}"


@attrs.define
class CommentStatement:

comment: str = attrs.field()
indent: str = attrs.field()

def __str__(self):
return f"{self.indent}# {self.comment}"


@attrs.define
class DocStringStatement:

docstring: str = attrs.field()
indent: str = attrs.field()

def __str__(self):
return f"{self.indent}{self.docstring}"
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import re
import attrs
from .components import NodeConverter
from .workflow import AddNodeStatement


@attrs.define
class FunctionNodeConverter(NodeConverter):
class FunctionNodeConverter(AddNodeStatement):

converted_interface = "FunctionTask"

Expand All @@ -25,7 +25,7 @@ def arg_name_vals(self):


@attrs.define
class IdentityInterfaceNodeConverter(NodeConverter):
class IdentityInterfaceNodeConverter(AddNodeStatement):

converted_interface = "FunctionTask"

Expand Down
Loading

0 comments on commit 521aecf

Please sign in to comment.