diff --git a/nigsp/blocks.py b/nigsp/blocks.py index 5db5df9..41d434a 100644 --- a/nigsp/blocks.py +++ b/nigsp/blocks.py @@ -9,13 +9,25 @@ """ import logging +import os +import typing as ty -from nigsp import io, viz +import numpy as np +import pydra + +# TODO: clean import +from nigsp import io, operations +from nigsp import surrogates as surr +from nigsp import timeseries as ts +from nigsp import viz +from nigsp.objects import SCGraph from nigsp.operations import nifti +from nigsp.operations.metrics import SUPPORTED_METRICS LGR = logging.getLogger(__name__) +# @pydra.mark.task def nifti_to_timeseries(fname, atlasname): """ Read a nifti file and returns a normalised timeseries from an atlas. @@ -48,6 +60,7 @@ def nifti_to_timeseries(fname, atlasname): return timeseries, atlas, img +# @pydra.mark.task def export_metric(scgraph, outext, outprefix): """ Export the metrics computed within the library. @@ -61,6 +74,45 @@ def export_metric(scgraph, outext, outprefix): outprefix : str The desired prefix for the export. + Returns + ------- + 0 + If everything goes well + """ + return export_metric_base( + scgraph.atlas, scgraph.img, scgraph.sdi, scgraph.gsdi, outext, outprefix + ) + + +# @pydra.mark.task +# @pydra.mark.annotate( +# { +# "atlas": ty.Any, +# "img": ty.Any, +# "sdi": ty.Any, +# "gsdi": ty.Any, +# "outext": ty.Any +# } +# ) +def export_metric_base(atlas, img, sdi, gsdi, outext, outprefix): + """ + Export the metrics computed within the library. + + Parameters + ---------- + atlas : np.array + ATLAS + img : np.array + IMG + sdi : np.array + SDI + gsdi : np.array + GSDI + outext : str + The desired extension for export - it will force the type of file created. + outprefix : str + The desired prefix for the export. + Returns ------- 0 @@ -75,38 +127,222 @@ def export_metric(scgraph, outext, outprefix): "was not found. Exporting metrics in TSV format instead." ) outext = ".tsv.gz" - if scgraph.img is None: + if img is None: LGR.warning( "A necessary atlas nifti image was not found. " "Exporting metrics in TSV format instead." ) outext = ".tsv.gz" - if scgraph.sdi is not None: + if sdi is not None: if outext in io.EXT_NIFTI: - data = nifti.unfold_atlas(scgraph.sdi, scgraph.atlas) - io.export_nifti(data, scgraph.img, f"{outprefix}sdi{outext}") + data = nifti.unfold_atlas(sdi, atlas) + io.export_nifti(data, img, f"{outprefix}sdi{outext}") else: - io.export_mtx(scgraph.sdi, f"{outprefix}sdi{outext}") - elif scgraph.gsdi is not None: - for k in scgraph.gsdi.keys(): + io.export_mtx(sdi, f"{outprefix}sdi{outext}") + elif gsdi is not None: + for k in gsdi.keys(): if outext in io.EXT_NIFTI: - data = nifti.unfold_atlas(scgraph.gsdi[k], scgraph.atlas) - io.export_nifti(data, scgraph.img, f"{outprefix}gsdi_{k}{outext}") + data = nifti.unfold_atlas(gsdi[k], atlas) + io.export_nifti(data, img, f"{outprefix}gsdi_{k}{outext}") else: - io.export_mtx(scgraph.gsdi[k], f"{outprefix}gsdi_{k}{outext}") + io.export_mtx(gsdi[k], f"{outprefix}gsdi_{k}{outext}") return 0 -def plot_metric(scgraph, outprefix, atlas=None, thr=None): +@pydra.mark.task +@pydra.mark.annotate( + { + "scname": ty.Any, + "fname": ty.Any, + "atlasname": ty.Any, + "index": ty.Any, + "method": ty.Any, + "surr_type": ty.Any, + "p": ty.Any, + "comp_metric": ty.Any, + "return": { + "sc_is": ty.Any, + "func_is": ty.Any, + "atlas_is": ty.Any, + "comp_metric": ty.Any, + }, + } +) +def check_input(scname, fname, atlasname, index, method, surr_type, p, comp_metric): + # Check data files + LGR.info(f"Input structural connectivity file: {scname}") + sc_is = dict.fromkeys(io.EXT_DICT.keys(), False) + LGR.info(f"Input functional file(s): {fname}") + func_is = dict.fromkeys(io.EXT_DICT.keys(), "") + atlas_is = dict.fromkeys(io.EXT_DICT.keys(), False) + if atlasname: + LGR.info(f"Input atlas file: {atlasname}") + + # Check inputs type + for k in io.EXT_DICT.keys(): + func_is[k] = [] + for f in fname: + func_is[k] += [io.check_ext(io.EXT_DICT[k], f)[0]] + # Check that func files are all of the same kind + func_is[k] = all(func_is[k]) + + sc_is[k], _ = io.check_ext(io.EXT_DICT[k], scname) + + if atlasname is not None: + atlas_is[k], _ = io.check_ext(io.EXT_DICT[k], atlasname) + + # Check that other inputs are supported + if index != "median" and type(index) is not int: + raise ValueError(f"Index {index} of type {type(index)} is not valid.") + if method not in surr.STAT_METHOD and method is not None: + raise NotImplementedError( + f"Method {method} is not supported. Supported " + f"methods are: {surr.STAT_METHOD}" + ) + if surr_type not in surr.SURR_TYPE and surr_type is not None: + raise NotImplementedError( + f"Surrogate type {surr_type} is not supported. " + f"Supported types are: {surr.SURR_TYPE}" + ) + if p < 0 or p > 1: + raise ValueError( + "P value must be between 0 and 1, but {p} was provided instead." + ) + + # Check what metric to compute + if comp_metric not in [[], None]: + for item in comp_metric: + if item not in SUPPORTED_METRICS: + raise NotImplementedError( + f"Metric {item} is not supported. " + f"Supported metrics are: {SUPPORTED_METRICS}" + ) + else: + comp_metric = SUPPORTED_METRICS + + return sc_is, func_is, atlas_is, comp_metric + + +@pydra.mark.task +@pydra.mark.annotate( + { + "fname": ty.Any, + "scname": ty.Any, + "atlasname": ty.Any, + "sc_is": ty.Any, + "func_is": ty.Any, + "atlas_is": ty.Any, + "return": {"mtx": ty.Any, "timeseries": ty.Any, "atlas": ty.Any, "img": ty.Any}, + } +) +def read_data(fname, scname, atlasname, sc_is, func_is, atlas_is, cwd=None): + # TODO: review this dirty quick code + if cwd: + scname = os.path.normpath(os.path.join(cwd, scname)) + atlasname = os.path.normpath(os.path.join(cwd, atlasname)) + + # Read in structural connectivity matrix + + if sc_is["1D"]: + mtx = io.load_txt(scname, shape="square") + elif sc_is["mat"]: + mtx = io.load_mat(scname, shape="square") + elif sc_is["xls"]: + mtx = io.load_xls(scname, shape="square") + else: + raise NotImplementedError(f"Input file {scname} is not of a supported type.") + + # Read in atlas, if defined + if atlasname is not None: + if ( + atlas_is["1D"] or atlas_is["mat"] or atlas_is["xls"] or atlas_is["nifti"] + ) is False: + raise NotImplementedError( + f"Input file {atlasname} is not of a supported type." + ) + elif atlas_is["1D"]: + atlas = io.load_txt(atlasname) + elif atlas_is["nifti"]: + atlas, _, img = io.load_nifti_get_mask(atlasname, ndim=3) + elif atlas_is["mat"]: + atlas = io.load_mat(atlasname) + elif atlas_is["xls"]: + atlas = io.load_xls(atlasname) + else: + LGR.warning("Atlas not provided. Some functionalities might not work.") + atlas, img = None, None + + # Read in functional timeseries, join them, and normalise them + timeseries = [] + for f in fname: + # TODO: review this dirty quick line of code + f = os.path.normpath(os.path.join(cwd, f)) + if func_is["nifti"] and atlas_is["nifti"]: + t, atlas, img = nifti_to_timeseries(f, atlasname) + elif func_is["nifti"] and atlas_is["nifti"] is False: + raise NotImplementedError( + "To work with functional file(s) of nifti format, " + "specify an atlas file in nifti format." + ) + elif func_is["1D"]: + t = io.load_txt(f) + elif func_is["mat"]: + t = io.load_mat(f) + elif func_is["xls"]: + t = io.load_xls(f) + else: + raise NotImplementedError(f"Input file {f} is not of a supported type.") + + timeseries += [t[..., np.newaxis]] + + timeseries = np.concatenate(timeseries, axis=-1).squeeze() + timeseries = ts.normalise_ts(timeseries) + return mtx, timeseries, atlas, img + + +@pydra.mark.task +@pydra.mark.annotate( + { + "outdir": ty.AnyStr, + "outname": ty.AnyStr, + "return": {"outprefix": ty.Any, "outext": ty.Any}, + } +) +def prepare_output(outdir, outname=None): + if outname is not None: + _, outprefix, outext = io.check_ext(io.EXT_ALL, outname, remove=True) + outprefix = os.path.join(outdir, f"{os.path.split(outprefix)[1]}_") + else: + outprefix = f"{outdir}{os.sep}" + + return outprefix, outext + + +# TODO: might be deleted for no usage at the moment +def plot_metric(scgraph, **kwargs): """ - If possible, plot metrics as markerplot. + Call plot_metric_base with scgraph object Parameters ---------- scgraph : SCGraph object The internal object containing all data. + """ + return plot_metric_base(scgraph.sdi, scgraph.gsdi, **kwargs) + + +def plot_metric_base(sdi, gsdi, outprefix, atlas=None, thr=None): + """ + If possible, plot metrics as markerplot. + + Parameters + ---------- + sdi : + Structural Decoupling Index. + gsdi: + Generalised SDI outprefix : str The prefix of the png file to export img : 3DNiftiImage or None, optional @@ -133,14 +369,12 @@ def plot_metric(scgraph, outprefix, atlas=None, thr=None): # If it is, plot. if atlas_plot is not None: - if scgraph.sdi is not None: - viz.plot_nodes( - scgraph.sdi, atlas_plot, filename=f"{outprefix}sdi.png", thr=thr - ) - elif scgraph.gsdi is not None: - for k in scgraph.gsdi.keys(): + if sdi is not None: + viz.plot_nodes(sdi, atlas_plot, filename=f"{outprefix}sdi.png", thr=thr) + elif gsdi is not None: + for k in gsdi.keys(): viz.plot_nodes( - scgraph.gsdi[k], + gsdi[k], atlas_plot, filename=f"{outprefix}gsdi_{k}.png", thr=thr, @@ -149,6 +383,322 @@ def plot_metric(scgraph, outprefix, atlas=None, thr=None): return 0 +@pydra.mark.task +@pydra.mark.annotate( + { + "mtx": ty.Any, + "return": {"eigenval": ty.Any, "eigenvec": ty.Any, "lapl_mtx": ty.Any}, + } +) +def laplacian(mtx): + # Perform Laplacian on the structural connectivity matrix + # Return the Laplacian matrix + # operation done by : scgraph.structural_decomposition() + LGR.info("Run laplacian decomposition of structural graph.") + + lapl_mtx = operations.symmetric_normalised_laplacian(mtx) + eigenval, eigenvec = operations.decomposition(lapl_mtx) + + return eigenval, eigenvec, lapl_mtx + + +@pydra.mark.task +@pydra.mark.annotate( + { + "timeseries": ty.Any, + "eigenvec": ty.Any, + "return": {"energy": ty.Any}, + } +) +def timeseries_proj(timeseries, eigenvec, energy=True, mean=True): + # Perform timeseries projection on the graph + # Return the energy + LGR.info("Compute the energy of the graph.") + + energy = operations.graph_fourier_transform(timeseries, eigenvec, energy, mean) + return energy + + +@pydra.mark.task +@pydra.mark.annotate( + { + "energy": ty.Any, + "return": {"index": ty.Any}, + } +) +def cutoffDetection(energy, index="median"): + # Perform frequency cutoff detection + # Return the detected cutoff frequency + # if index is None: + # return index + LGR.info("Compute Cutoff Frequency.") + index = operations.median_cutoff_frequency_idx(energy) + return index + + +@pydra.mark.task +@pydra.mark.annotate( + { + "timeseries": ty.Any, + "eigenvec": ty.Any, + "index": ty.Any, + "return": {"evec_split": ty.Any, "ts_split": ty.Any}, + } +) +def filteringGSP(timeseries, eigenvec, index, keys=["low", "high"]): + # Perform filtering using GSP + # Return the filtered result + evec_split, ts_split = operations.graph_filter(timeseries, eigenvec, index, keys) + return evec_split, ts_split + + +@pydra.mark.task +@pydra.mark.annotate( + { + "timeseries": ty.Any, + "ts_split": ty.Any, + "outprefix": ty.AnyStr, + "outext": ty.AnyStr, + "return": {"fc": ty.Any, "fc_split": ty.Any}, + } +) +def functionalConnectivity(timeseries, ts_split, outprefix, outext, mean=True): + """Implement functional_connectivity as task.""" + # Perform functional connectivity analysis + if timeseries is not None: + LGR.info("Compute FC of original timeseries.") + fc = operations.functional_connectivity(timeseries, mean) + if ts_split is not None: + fc_split = dict.fromkeys(ts_split.keys()) + LGR.info("Compute FC of split timeseries.") + for k in ts_split.keys(): + LGR.info(f"Compute FC of {k} timeseries.") + fc_split[k] = operations.functional_connectivity(ts_split[k], mean) + + # IO: Save to file + for k in ts_split.keys(): + LGR.info(f"Export {k} FC (data).") + io.export_mtx(fc_split[k], f"{outprefix}fc_{k}", ext=outext) + # Export fc + LGR.info("Export original FC (data).") + io.export_mtx(fc, f"{outprefix}fc", ext=outext) + + return fc, fc_split + + +@pydra.mark.task +@pydra.mark.annotate( + { + "ts_split": ty.Any, + "outprefix": ty.AnyStr, + "outext": ty.AnyStr, + "return": {"sdi": ty.Any, "gsdi": ty.Any}, + } +) +def structuralDecouplingIndex( + ts_split, outprefix, outext, mean=False, keys=None +): # # pragma: no cover + """Implement metrics.gsdi as class method.""" + + sdi, gsdi = None, None + + # TODO: make it conditional + # This should not happen in this moment. + if len(ts_split.keys()) == 2: + metric_name = "sdi" + sdi = operations.sdi(ts_split, mean, keys) + elif len(ts_split.keys()) > 2: + metric_name = "gsdi" + gsdi = operations.gsdi(ts_split, mean, keys) + # # Export non-thresholded metrics + LGR.info(f"Export non-thresholded version of {metric_name}.") + # TODO: check if it useful to run export_metric() here and in surrogate + # export_metric(scgraph, outext, outprefix) + + return sdi, gsdi + + +@pydra.mark.task +@pydra.mark.annotate( + { + "ts_split": ty.Any, + "evec_split": ty.Any, + "eigenvec": ty.Any, + "eigenval": ty.Any, + "outprefix": ty.AnyStr, + "outext": ty.AnyStr, + } +) +def export(ts_split, evec_split, eigenvec, eigenval, outprefix, outext): + # Export eigenvalues, eigenvectors, and split timeseries and eigenvectors + for k in ts_split.keys(): + LGR.info(f"Export {k} timeseries.") + io.export_mtx(ts_split[k], f"{outprefix}timeseries_{k}", ext=outext) + LGR.info(f"Export {k} eigenvectors.") + io.export_mtx(evec_split[k], f"{outprefix}eigenvec_{k}", ext=outext) + LGR.info("Export original eigenvectors.") + io.export_mtx(eigenvec, f"{outprefix}eigenvec", ext=outext) + LGR.info("Export original eigenvalues.") + io.export_mtx(eigenval, f"{outprefix}eigenval", ext=outext) + + +@pydra.mark.task +@pydra.mark.annotate( + { + "lapl_mtx": ty.Any, + "mtx": ty.Any, + "timeseries": ty.Any, + "ts_split": ty.Any, + "fc": ty.Any, + "fc_split": ty.Any, + "img": ty.Any, + "atlas": ty.Any, + "sdi": ty.Any, + "gsdi": ty.Any, + "outprefix": ty.AnyStr, + } +) +def visualize( + lapl_mtx, mtx, timeseries, ts_split, fc, fc_split, img, atlas, sdi, gsdi, outprefix +): + # If possible, create plots! + try: + import matplotlib as _ + import nilearn as _ + + except ImportError: + LGR.warning( + "The necessary libraries for graphics (nilearn, matplotlib) " + "were not found. Skipping graphics." + ) + + # Plot original SC and Laplacian + LGR.info("Plot laplacian matrix.") + viz.plot_connectivity(lapl_mtx, f"{outprefix}laplacian.png") + LGR.info("Plot structural connectivity matrix.") + viz.plot_connectivity(mtx, f"{outprefix}sc.png") + + # Plot timeseries + LGR.info("Plot original timeseries.") + viz.plot_greyplot(timeseries, f"{outprefix}greyplot.png") + for k in ts_split.keys(): + LGR.info(f"Plot {k} timeseries.") + viz.plot_greyplot(ts_split[k], f"{outprefix}greyplot_{k}.png") + + # TODO: make it conditional + # if "dfc" in comp_metric or "fc" in comp_metric: + # Plot FC + LGR.info("Plot original functional connectivity matrix.") + viz.plot_connectivity(fc, f"{outprefix}fc.png") + for k in ts_split.keys(): + LGR.info(f"Plot {k} functional connectivity matrix.") + viz.plot_connectivity(fc_split[k], f"{outprefix}fc_{k}.png") + + # TODO: make it conditional + # if "sdi" in comp_metric or "gsdi" in comp_metric: + # if atlasname is not None: + metric_name = "sdi" if len(ts_split.keys()) == 2 else "gsdi" + LGR.info(f"Plot {metric_name} markerplot.") + if img is not None: + plot_metric_base(sdi, gsdi, outprefix, img) + elif atlas is not None: + plot_metric_base(sdi, gsdi, outprefix, atlas) + + +@pydra.mark.task +@pydra.mark.annotate( + { + "timeseries": ty.Any, + "eigenvec": ty.Any, + "lapl_mtx": ty.Any, + "index": ty.Any, + "sdi": ty.Any, + "gsdi": ty.Any, + "img": ty.Any, + "atlas": ty.Any, + "ts_split": ty.Any, + "p": ty.Any, + "method": ty.Any, + "n_surr": ty.Any, + "surr_type": ty.Any, + } +) +def surrogate( + timeseries, + eigenvec, + lapl_mtx, + index, + sdi, + gsdi, + img, + atlas, + ts_split, + p, + method, + n_surr, + surr_type=None, + seed=None, +): + # TODO: make surrogate optional + # if surr_type is not None: + outprefix = "mkd_" + # Todo: should be comperate with the version without Pydra, it was `outprefix += "mkd_"` before update + metric_name = "sdi" if len(ts_split.keys()) == 2 else "gsdi" + LGR.info( + f"Test significant {metric_name} against {n_surr} structurally " + f"{surr_type} surrogates." + ) + # scgraph.create_surrogates(sc_type=surr_type, n_surr=n_surr, seed=seed) + surr = SCGraph.create_surrogates_base( + timeseries, eigenvec, lapl_mtx, sc_type=surr_type, n_surr=n_surr, seed=seed + ) + + _, surr_split = operations.graph_filter(surr, eigenvec, index) + # TODO: make it conditional + # if "sdi" in comp_metric or "gsdi" in comp_metric: + # scgraph.test_significance(method=method, p=p, return_masked=True) + + mean = False # TODO: should be check wasn't there before update + if sdi is not None: + surr_sdi = operations.sdi(surr_split, mean, keys=None) + sdi = operations.test_significance( + surr=surr_sdi, + data=sdi, + method=method, + p=p, + return_masked=True, + mean=False, + ) + if gsdi is not None: + surr_sdi = operations.gsdi(surr_split, mean, keys=None) + gsdi = operations.test_significance( + surr=surr_sdi, + data=gsdi, + method=method, + p=p, + return_masked=True, + mean=False, + ) + + # Export thresholded metrics + # blocks.export_metric(scgraph, outext, outprefix) + # export_metric_base(atlas, img, sdi, gsdi, outext, outprefix) + + try: + import matplotlib as _ + import nilearn as _ + + if atlas is not None: + LGR.info(f"Plot {metric_name} markerplot.") + if img is not None: + plot_metric_base(sdi, gsdi, outprefix, atlas=img, thr=0) + elif atlas is not None: + plot_metric_base(sdi, gsdi, outprefix, atlas=atlas, thr=0) + + except ImportError: + pass + + """ Copyright 2022, Stefano Moia. diff --git a/nigsp/objects.py b/nigsp/objects.py index 489fb10..a5665ef 100644 --- a/nigsp/objects.py +++ b/nigsp/objects.py @@ -170,23 +170,34 @@ def compute_gsdi(self, mean=False, keys=None): # pragma: no cover def create_surrogates(self, sc_type="informed", n_surr=1000, seed=None): """Implement surrogates.sc_informed and sc_uninformed as class method.""" - sc_args = {"timeseries": self.timeseries, "n_surr": n_surr} + self.surr = SCGraph.create_surrogates_base( + self.timeseries, self.eigenvec, self.lapl_mtx, sc_type, n_surr, seed + ) + return self + + @staticmethod + def create_surrogates_base( + timeseries, eigenvec, lapl_mtx, sc_type="informed", n_surr=1000, seed=None + ): + """Implement surrogates sc_informed and sc_uninformed as static method.""" + sc_args = {"timeseries": timeseries, "n_surr": n_surr} if seed is not None: sc_args["seed"] = seed if sc_type == "informed": - sc_args["eigenvec"] = self.eigenvec - self.surr = operations.sc_informed(**sc_args) + sc_args["eigenvec"] = eigenvec + surr = operations.sc_informed(**sc_args) elif sc_type == "uninformed": - sc_args["lapl_mtx"] = self.lapl_mtx - self.surr = operations.sc_uninformed(**sc_args) + sc_args["lapl_mtx"] = lapl_mtx + surr = operations.sc_uninformed(**sc_args) else: - raise ValueError( + LGR.warning( f"Unknown option {sc_type} for surrogate creation. " "Declared type must be either 'informed' or " "'uninformed'" ) - return self + return None + return surr def test_significance( self, method="Bernoulli", p=0.05, return_masked=False, mean=False diff --git a/nigsp/workflow.py b/nigsp/workflow.py index 43afad6..db4ec7e 100644 --- a/nigsp/workflow.py +++ b/nigsp/workflow.py @@ -15,8 +15,9 @@ import sys import numpy as np +import pydra -from nigsp import _version, blocks, io, references +from nigsp import _version, blocks, io, operations, references from nigsp import surrogates as surr from nigsp import timeseries as ts from nigsp import utils, viz @@ -231,236 +232,226 @@ def nigsp( version_number = _version.get_versions()["version"] LGR.info(f"Currently running nigsp version {version_number}") - # #### Check input #### # - - # Check data files - LGR.info(f"Input structural connectivity file: {scname}") - sc_is = dict.fromkeys(io.EXT_DICT.keys(), False) - LGR.info(f"Input functional file(s): {fname}") - func_is = dict.fromkeys(io.EXT_DICT.keys(), "") - atlas_is = dict.fromkeys(io.EXT_DICT.keys(), False) - if atlasname: - LGR.info(f"Input atlas file: {atlasname}") - - # Check inputs type - for k in io.EXT_DICT.keys(): - func_is[k] = [] - for f in fname: - func_is[k] += [io.check_ext(io.EXT_DICT[k], f)[0]] - # Check that func files are all of the same kind - func_is[k] = all(func_is[k]) - - sc_is[k], _ = io.check_ext(io.EXT_DICT[k], scname) - - if atlasname is not None: - atlas_is[k], _ = io.check_ext(io.EXT_DICT[k], atlasname) - - # Check that other inputs are supported - if index != "median" and type(index) is not int: - raise ValueError(f"Index {index} of type {type(index)} is not valid.") - if method not in surr.STAT_METHOD and method is not None: - raise NotImplementedError( - f"Method {method} is not supported. Supported " - f"methods are: {surr.STAT_METHOD}" + wf1 = pydra.Workflow( + name="Input Workflow", + input_spec=[ + "scname", + "fname", + "atlasname", + "index", + "method", + "surr_type", + "p", + "comp_metric", + "outdir", + "outname", + "cwd", + ], + scname=scname, + fname=fname, + atlasname=atlasname, + index=index, + method=method, + surr_type=surr_type, + p=p, + comp_metric=comp_metric, + outdir=outdir, + outname=outname, + cwd=os.getcwd(), + ) + + wf1.add( + blocks.check_input( + name="check_input", + scname=wf1.lzin.scname, + fname=wf1.lzin.fname, + atlasname=wf1.lzin.atlasname, + index=wf1.lzin.index, + method=wf1.lzin.method, + surr_type=wf1.lzin.surr_type, + p=wf1.lzin.p, + comp_metric=wf1.lzin.comp_metric, ) - if surr_type not in surr.SURR_TYPE and surr_type is not None: - raise NotImplementedError( - f"Surrogate type {surr_type} is not supported. " - f"Supported types are: {surr.SURR_TYPE}" + ) + + wf1.add( + blocks.read_data( + name="read_data", + fname=wf1.lzin.fname, + scname=wf1.lzin.scname, + atlasname=wf1.lzin.atlasname, + sc_is=wf1.check_input.lzout.sc_is, + func_is=wf1.check_input.lzout.func_is, + atlas_is=wf1.check_input.lzout.atlas_is, + cwd=wf1.lzin.cwd, ) - if p < 0 or p > 1: - raise ValueError( - "P value must be between 0 and 1, but {p} was provided instead." + ) + + wf1.add( + blocks.prepare_output( + name="prepare_output", outdir=wf1.lzin.outdir, outname=wf1.lzin.outname ) + ) - # Check what metric to compute - if comp_metric not in [[], None]: - for item in comp_metric: - if item not in SUPPORTED_METRICS: - raise NotImplementedError( - f"Metric {item} is not supported. " - f"Supported metrics are: {SUPPORTED_METRICS}" - ) - else: - comp_metric = SUPPORTED_METRICS + wf1.set_output( + [ + # check_input + ("sc_is", wf1.check_input.lzout.sc_is), + ("func_is", wf1.check_input.lzout.func_is), + ("atlas_is", wf1.check_input.lzout.atlas_is), + ("comp_metric", wf1.check_input.lzout.comp_metric), + # read_data + ("mtx", wf1.read_data.lzout.mtx), + ("timeseries", wf1.read_data.lzout.timeseries), + ("atlas", wf1.read_data.lzout.atlas), + ("img", wf1.read_data.lzout.img), + # prepare_output + ("outprefix", wf1.prepare_output.lzout.outprefix), + ("outext", wf1.prepare_output.lzout.outext), + ] + ) - # #### Prepare Outputs #### # - if outname is not None: - _, outprefix, outext = io.check_ext(io.EXT_ALL, outname, remove=True) - outprefix = os.path.join(outdir, f"{os.path.split(outprefix)[1]}_") - else: - outprefix = f"{outdir}{os.sep}" + with pydra.Submitter(plugin="cf") as sub: + sub(wf1) + + out = wf1.result().output + + mtx = out.mtx + timeseries = out.timeseries + outprefix = out.outprefix + outext = out.outext + img = out.img + atlas = out.atlas + + wf2 = pydra.Workflow( + name="GSP+SDI Workflow", + input_spec=["mtx", "timeseries", "outprefix", "outext", "img", "atlas"], + mtx=mtx, + timeseries=timeseries, + # IO File Export + outprefix=outprefix, + outext=outext, + # Visualize + img=img, + atlas=atlas, + ) + wf2.add(blocks.laplacian(name="laplacian", mtx=wf2.lzin.mtx)) + wf2.add( + blocks.timeseries_proj( + name="timeseries_proj", + timeseries=wf2.lzin.timeseries, + eigenvec=wf2.laplacian.lzout.eigenvec, + ) + ) + wf2.add( + blocks.cutoffDetection( + name="cutoffDetection", energy=wf2.timeseries_proj.lzout.energy + ) + ) + wf2.add( + blocks.filteringGSP( + name="filteringGSP", + timeseries=wf2.lzin.timeseries, + eigenvec=wf2.laplacian.lzout.eigenvec, + index=wf2.cutoffDetection.lzout.index, + ) + ) + # TODO: make it optional + # if "dfc" in comp_metric or "fc" in comp_metric: + wf2.add( + blocks.functionalConnectivity( + name="functionalConnectivity", + timeseries=wf2.lzin.timeseries, + ts_split=wf2.filteringGSP.lzout.ts_split, + outprefix=wf2.lzin.outprefix, + outext=wf2.lzin.outext, + ) + ) - # #### Read in data #### # + wf2.add( + blocks.structuralDecouplingIndex( + name="SDI", + ts_split=wf2.filteringGSP.lzout.ts_split, + outprefix=wf2.lzin.outprefix, + outext=wf2.lzin.outext, + ) + ) - # Read in structural connectivity matrix - if sc_is["1D"]: - mtx = io.load_txt(scname, shape="square") - elif sc_is["mat"]: - mtx = io.load_mat(scname, shape="square") - elif sc_is["xls"]: - mtx = io.load_xls(scname, shape="square") - else: - raise NotImplementedError(f"Input file {scname} is not of a supported type.") - - # Read in atlas, if defined - if atlasname is not None: - if ( - atlas_is["1D"] or atlas_is["mat"] or atlas_is["xls"] or atlas_is["nifti"] - ) is False: - raise NotImplementedError( - f"Input file {atlasname} is not of a supported type." - ) - elif atlas_is["1D"]: - atlas = io.load_txt(atlasname) - elif atlas_is["nifti"]: - atlas, _, img = io.load_nifti_get_mask(atlasname, ndim=3) - elif atlas_is["mat"]: - atlas = io.load_mat(atlasname) - elif atlas_is["xls"]: - atlas = io.load_xls(atlasname) - else: - LGR.warning("Atlas not provided. Some functionalities might not work.") - atlas, img = None, None - - # Read in functional timeseries, join them, and normalise them - timeseries = [] - for f in fname: - if func_is["nifti"] and atlas_is["nifti"]: - t, atlas, img = blocks.nifti_to_timeseries(f, atlasname) - elif func_is["nifti"] and atlas_is["nifti"] is False: - raise NotImplementedError( - "To work with functional file(s) of nifti format, " - "specify an atlas file in nifti format." - ) - elif func_is["1D"]: - t = io.load_txt(f) - elif func_is["mat"]: - t = io.load_mat(f) - elif func_is["xls"]: - t = io.load_xls(f) - else: - raise NotImplementedError(f"Input file {f} is not of a supported type.") - - timeseries += [t[..., np.newaxis]] - - timeseries = np.concatenate(timeseries, axis=-1).squeeze() - timeseries = ts.normalise_ts(timeseries) - - # #### Assign SCGraph object #### # - scgraph = SCGraph(mtx, timeseries, atlas=atlas, img=img) - - # #### Compute SDI (split low vs high timeseries) and FC and output them #### # - - # Run laplacian decomposition and actually filter timeseries. - LGR.info("Run laplacian decomposition of structural graph.") - scgraph.structural_decomposition() - LGR.info("Compute the energy of the graph and split it in parts.") - scgraph.compute_graph_energy(mean=True).split_graph() - - if "sdi" in comp_metric or "gsdi" in comp_metric: - # If there are more than two splits in the timeseries, compute Generalised SDI - # This should not happen in this moment. - if len(scgraph.split_keys) == 2: - metric_name = "sdi" - scgraph.compute_sdi() - elif len(scgraph.split_keys) > 2: - metric_name = "gsdi" - scgraph.compute_gsdi() - # Export non-thresholded metrics - LGR.info(f"Export non-thresholded version of {metric_name}.") - blocks.export_metric(scgraph, outext, outprefix) - - if "dfc" in comp_metric or "fc" in comp_metric: - scgraph.compute_fc(mean=True) - for k in scgraph.split_keys: - LGR.info(f"Export {k} FC (data).") - io.export_mtx(scgraph.fc_split[k], f"{outprefix}fc_{k}", ext=outext) - # Export fc - LGR.info("Export original FC (data).") - io.export_mtx(scgraph.fc, f"{outprefix}fc", ext=outext) - - # #### Output more results (pt. 1) #### # - - # Export eigenvalues, eigenvectors, and split timeseries and eigenvectors - for k in scgraph.split_keys: - LGR.info(f"Export {k} timeseries.") - io.export_mtx(scgraph.ts_split[k], f"{outprefix}timeseries_{k}", ext=outext) - LGR.info(f"Export {k} eigenvectors.") - io.export_mtx(scgraph.evec_split[k], f"{outprefix}eigenvec_{k}", ext=outext) - LGR.info("Export original eigenvectors.") - io.export_mtx(scgraph.eigenvec, f"{outprefix}eigenvec", ext=outext) - LGR.info("Export original eigenvalues.") - io.export_mtx(scgraph.eigenval, f"{outprefix}eigenval", ext=outext) - - # #### Additional steps #### # - - # If possible, create plots! - try: - import matplotlib as _ - import nilearn as _ - - # Plot original SC and Laplacian - LGR.info("Plot laplacian matrix.") - viz.plot_connectivity(scgraph.lapl_mtx, f"{outprefix}laplacian.png") - LGR.info("Plot structural connectivity matrix.") - viz.plot_connectivity(scgraph.mtx, f"{outprefix}sc.png") - - # Plot timeseries - LGR.info("Plot original timeseries.") - viz.plot_greyplot(scgraph.timeseries, f"{outprefix}greyplot.png") - for k in scgraph.split_keys: - LGR.info(f"Plot {k} timeseries.") - viz.plot_greyplot(scgraph.ts_split[k], f"{outprefix}greyplot_{k}.png") - - if "dfc" in comp_metric or "fc" in comp_metric: - # Plot FC - LGR.info("Plot original functional connectivity matrix.") - viz.plot_connectivity(scgraph.fc, f"{outprefix}fc.png") - for k in scgraph.split_keys: - LGR.info(f"Plot {k} functional connectivity matrix.") - viz.plot_connectivity(scgraph.fc_split[k], f"{outprefix}fc_{k}.png") - if "sdi" in comp_metric or "gsdi" in comp_metric: - if atlasname is not None: - LGR.info(f"Plot {metric_name} markerplot.") - if img is not None: - blocks.plot_metric(scgraph, outprefix, img) - elif atlas is not None: - blocks.plot_metric(scgraph, outprefix, atlas) - - except ImportError: - LGR.warning( - "The necessary libraries for graphics (nilearn, matplotlib) " - "were not found. Skipping graphics." + wf2.add( + blocks.export( + name="export", + ts_split=wf2.filteringGSP.lzout.ts_split, + evec_split=wf2.filteringGSP.lzout.evec_split, + eigenvec=wf2.laplacian.lzout.eigenvec, + eigenval=wf2.laplacian.lzout.eigenval, + outprefix=wf2.lzin.outprefix, + outext=wf2.lzin.outext, ) + ) + + wf2.add( + blocks.visualize( + name="visualize", + img=wf2.lzin.img, + atlas=wf2.lzin.atlas, + timeseries=wf2.lzin.timeseries, + mtx=wf2.lzin.mtx, + lapl_mtx=wf2.laplacian.lzout.lapl_mtx, + ts_split=wf2.filteringGSP.lzout.ts_split, + fc=wf2.functionalConnectivity.lzout.fc, + fc_split=wf2.functionalConnectivity.lzout.fc_split, + sdi=wf2.SDI.lzout.sdi, + gsdi=wf2.SDI.lzout.gsdi, + outprefix=wf2.lzin.outprefix, + outext=wf2.lzin.outext, + ) + ) - # If required, create surrogates, test, and export masked metrics if surr_type is not None: - outprefix += "mkd_" - LGR.info( - f"Test significant {metric_name} against {n_surr} structurally " - f"{surr_type} surrogates." + wf2.add( + blocks.surrogate( + name="surrogate", + img=wf2.lzin.img, + atlas=wf2.lzin.atlas, + timeseries=wf2.lzin.timeseries, + mtx=wf2.lzin.mtx, + lapl_mtx=wf2.laplacian.lzout.lapl_mtx, + ts_split=wf2.filteringGSP.lzout.ts_split, + fc=wf2.functionalConnectivity.lzout.fc, + fc_split=wf2.functionalConnectivity.lzout.fc_split, + sdi=wf2.SDI.lzout.sdi, + gsdi=wf2.SDI.lzout.gsdi, + outprefix=wf2.lzin.outprefix, + outext=wf2.lzin.outext, + ) ) - scgraph.create_surrogates(sc_type=surr_type, n_surr=n_surr, seed=seed) - # #!# Export surrogates! - if "sdi" in comp_metric or "gsdi" in comp_metric: - scgraph.test_significance(method=method, p=p, return_masked=True) - # Export thresholded metrics - blocks.export_metric(scgraph, outext, outprefix) - - try: - import matplotlib as _ - import nilearn as _ - - if atlasname is not None: - LGR.info(f"Plot {metric_name} markerplot.") - if img is not None: - blocks.plot_metric(scgraph, outprefix, atlas=img, thr=0) - elif atlas is not None: - blocks.plot_metric(scgraph, outprefix, atlas=atlas, thr=0) - - except ImportError: - pass + + # setting multiple workflow output + wf2.set_output( + [ + # laplacian output + ("lapl_mtx", wf2.laplacian.lzout.lapl_mtx), + ("eigenval", wf2.laplacian.lzout.eigenval), + ("eigenvec", wf2.laplacian.lzout.eigenvec), + # timeseries projection output + ("energy", wf2.timeseries_proj.lzout.energy), + # cutoff output + ("index", wf2.cutoffDetection.lzout.index), + # filtering GSP output + ("evec_split", wf2.filteringGSP.lzout.evec_split), + ("ts_split", wf2.filteringGSP.lzout.ts_split), + # fc : functional connectity + ("fc", wf2.functionalConnectivity.lzout.fc), + ("fc_split", wf2.functionalConnectivity.lzout.fc_split), + # sdi : Structural Decoupling Index + ("sdi", wf2.SDI.lzout.sdi), + # gsdi : Generalized Structural Decoupling Index + ("gsdi", wf2.SDI.lzout.gsdi), + ] + ) + + with pydra.Submitter(plugin="cf") as sub: + sub(wf2) LGR.info(f"End of workflow, find results in {outdir}.") diff --git a/setup.cfg b/setup.cfg index 4cc67d2..0e66aa9 100644 --- a/setup.cfg +++ b/setup.cfg @@ -27,6 +27,7 @@ python_requires = >=3.6 install_requires = numpy>=1.17 duecredit + pydra>=0.22 tests_require = pytest >=5.3 test_suite = pytest