Skip to content

Commit

Permalink
wip: issue 20 (broken)
Browse files Browse the repository at this point in the history
  • Loading branch information
patmikista committed Apr 5, 2024
1 parent 4f9d2b4 commit c757a50
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 24 deletions.
47 changes: 47 additions & 0 deletions pyPTE/core/delay_estimations.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import numpy as np
import numpy.typing as npt


def delay_by_hillebrand(phase: npt.NDArray) -> npt.NDArray:
"""
Original method to compute the overall delay for all given channels.
Parameters:
----------
phase : numpy.ndarray
m x n ndarray : m: number of channels, n: number of samples.
Returns:
-------
delay : int
The computed delay.
"""
m, n = phase.shape
c1 = n * m
r_phase = np.roll(phase, 1, axis=0)
phase_product = np.multiply(phase, r_phase)
c2 = (phase_product < 0).sum()
delay = int(np.round(c1 / c2))
delay_matrix = np.full((m, m), delay, dtype=int)
return delay_matrix

def delay_by_crosscorrelation(phase_matrix: npt.NDArray) -> npt.NDArray:
m, _ = phase_matrix.shape
delay_matrix = np.zeros((m, m), dtype=int)

for i in range(m):
for j in range(i+1, m):
unwrapped_phase_i = np.unwrap(phase_matrix[i])
unwrapped_phase_j = np.unwrap(phase_matrix[j])

cross_corr = np.correlate(unwrapped_phase_i - np.mean(unwrapped_phase_i),
unwrapped_phase_j - np.mean(unwrapped_phase_j),
mode='full')

delay = np.argmax(cross_corr) - (len(unwrapped_phase_i) - 1)
delay_matrix[i, j] = delay

delay_matrix = delay_matrix - delay_matrix.T

return delay_matrix

67 changes: 44 additions & 23 deletions pyPTE/core/pyPTE.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,29 +4,31 @@
import numpy.typing as npt
from scipy.signal import hilbert

from .delay_estimations import delay_by_crosscorrelation, delay_by_hillebrand

def get_delay(phase: npt.NDArray) -> int:

def get_delay(phase: npt.NDArray, method: str = 'hillebrand') -> npt.NDArray:
"""
Computes the overall delay for a all given channels
Computes the delay using specified method.
Parameters
Parameters:
----------
phase : numpy.ndarray
m x n ndarray : m: number of channels, n: number of samples
m x n ndarray : m: number of channels, n: number of samples.
method : str
The method to use for delay calculation ('original', 'cross_correlation').
Returns
Returns:
-------
delay : int
The computed delay.
"""
phase = phase
m, n = phase.shape
c1 = n * m
r_phase = np.roll(phase, 1, axis=0)
phase_product = np.multiply(phase, r_phase)
c2 = (phase_product < 0).sum()
delay = int(np.round(c1 / c2))

return delay
if method == 'hillebrand':
return delay_by_hillebrand(phase)
elif method == 'cross_correlation':
return delay_by_crosscorrelation(phase)
else:
raise ValueError(f"Unknown method: {method}")


def get_phase(time_series: npt.ArrayLike) -> npt.NDArray:
Expand Down Expand Up @@ -110,7 +112,7 @@ def get_bincount(binsize: float) -> int:
return bincount


def compute_PTE(phase: npt.NDArray, delay: int) -> npt.NDArray:
def compute_PTE(phase: npt.NDArray, delay_matrix: npt.NDArray) -> npt.NDArray:
"""
For each channel pair (x, y) containing the individual discretized phase,
which is obtained by pyPTE.pyPTE.get_discretized_phase,
Expand All @@ -137,10 +139,29 @@ def compute_PTE(phase: npt.NDArray, delay: int) -> npt.NDArray:

for i in range(0, m):
for j in range(0, m):

ypr = phase[delay:, j]
y = phase[:-delay, j]
x = phase[:-delay, i]
if i == j:
continue # Skip self-comparison

delay = delay_matrix[i, j]
delay = int(max(min(delay, n-1), -n+1))
print(delay_matrix)
print(delay)

if delay > 0:
x = phase[i, :-delay]
y = phase[j, delay:]
# Adjust y for prediction, shifting by one more sample
ypr = phase[j, delay+1:]
elif delay < 0:
x = phase[i, -delay:-1]
# Adjust x to match the prediction shift when delay is negative
y = phase[j, :delay]
# Keep ypr aligned with y, considering the negative delay
ypr = phase[j, :delay+1]
else:
x = phase[i, :-1]
y = phase[j, :-1]
ypr = phase[j, 1:]

P_y = np.zeros([y.max() + 1])
np.add.at(P_y, [y], 1)
Expand Down Expand Up @@ -170,7 +191,7 @@ def compute_PTE(phase: npt.NDArray, delay: int) -> npt.NDArray:


def compute_dPTE_rawPTE(
phase: npt.NDArray, delay: int
phase: npt.NDArray, delay_matrix: npt.NDArray
) -> Tuple[npt.NDArray, npt.NDArray]:
"""
This function calls pyPTE.pyPTE.compute_PTE to obtain a PTE matrix and performs a
Expand All @@ -195,7 +216,7 @@ def compute_dPTE_rawPTE(
dPTE : normalized PTE matrix, raw_PTE: original PTE values
"""
raw_PTE = compute_PTE(phase, delay)
raw_PTE = compute_PTE(phase, delay_matrix)

tmp = np.triu(raw_PTE) + np.tril(raw_PTE).T
with np.errstate(divide="ignore", invalid="ignore"):
Expand Down Expand Up @@ -227,9 +248,9 @@ def PTE(time_series: npt.ArrayLike) -> Tuple[npt.NDArray, npt.NDArray]:
"""
phase = get_phase(time_series)
delay = get_delay(phase)
delay_matrix = get_delay(phase)
phase_inc = phase + np.pi
binsize = get_binsize(phase_inc)
d_phase = get_discretized_phase(phase_inc, binsize)

return compute_dPTE_rawPTE(d_phase, delay)
return compute_dPTE_rawPTE(d_phase, delay_matrix)
4 changes: 3 additions & 1 deletion tests/test_pyPTE.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import numpy as np
import pytest

from pyPTE.core.pyPTE import (
PTE,
Expand Down Expand Up @@ -47,7 +48,8 @@ def test_PTE_with_independent_signals():
binsize = get_binsize(phase_inc)
d_phase = get_discretized_phase(phase_inc, binsize)

pte_matrix = compute_PTE(d_phase, 1)
delay_matrix = np.ones((2, 2))
pte_matrix = compute_PTE(d_phase, delay_matrix)

# Check off-diagonal elements for significant PTE
# (s1 -> s2 and s2 -> s1 should be low)
Expand Down

0 comments on commit c757a50

Please sign in to comment.