Skip to content

Commit

Permalink
hardware limitation implementation in progress
Browse files Browse the repository at this point in the history
  • Loading branch information
TheoLaudatQM committed Apr 9, 2024
1 parent 59b9c6e commit d294ad1
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 31 deletions.
75 changes: 55 additions & 20 deletions qualang_tools/digital_filters/filters.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,25 @@
import numpy as np
import scipy.signal as sig
from warnings import warn
from enum import Enum


class QOP_VERSION(Enum):
QOP222 = {
"feedforward_max": 2 - 2**-16,
"feedback_max": 1 - 2**-20,
"feedforward_length": lambda feedback_len: 44 - 7 * feedback_len,
}


def calc_filter_taps(
fir: List[float] = None,
exponential: List[Tuple[float, float]] = None,
highpass: List[float] = None,
bounce: List[Tuple[float, float]] = None,
delay: int = None,
delay: float = None,
Ts: float = 1,
qop_version: Enum = QOP_VERSION.QOP222,
) -> Tuple[List[float], List[float]]:
"""
Calculate the best FIR and IIR filter taps for a system with any combination of FIR corrections, exponential
Expand Down Expand Up @@ -53,19 +63,9 @@ def calc_filter_taps(
if bounce is not None or delay is not None:
feedforward_taps = bounce_and_delay_correction(bounce, delay, feedforward_taps, Ts)

max_value = max(np.abs(feedforward_taps))

if max_value >= 2:
feedforward_taps = 1.99 * feedforward_taps / max_value

def _warning_on_one_line(message, category, filename, lineno, file=None, line=None):
return "%s:%s: %s: %s\n" % (filename, lineno, category.__name__, message)
feedforward_taps = _check_hardware_limitation_for_delay(qop_version, feedforward_taps, len(feedback_taps))

warnings.formatwarning = _warning_on_one_line
warn(
f"The feedforward taps reached the maximum value of 2. \nThe coefficients are scaled down to stay within the valid range which reduces the outputted amplitude of the pulses played through the filtered port by a factor of {max_value/1.99:.3f}."
)
return list(feedforward_taps), list(feedback_taps)
return _check_hardware_limitation(qop_version, feedforward_taps, list(feedback_taps))


def exponential_decay(x, a, t):
Expand All @@ -89,7 +89,7 @@ def high_pass_exponential(x, t):
return np.exp(-x / t)


def single_exponential_correction(A: float, tau: float, Ts: float = 1):
def single_exponential_correction(A: float, tau: float, Ts: float = 1, qop_version: Enum = QOP_VERSION.QOP222):
"""
Calculate the best FIR and IIR filter taps to correct for an exponential decay (undershoot or overshoot) of the shape
`1 + A * exp(-t/tau)`.
Expand All @@ -111,10 +111,10 @@ def single_exponential_correction(A: float, tau: float, Ts: float = 1):
c2 = Ts - 2 * tau
feedback_tap = [-k2 / k1]
feedforward_taps = list(np.array([c1, c2]) / k1)
return feedforward_taps, feedback_tap
return _check_hardware_limitation(qop_version, feedforward_taps, feedback_tap)


def highpass_correction(tau: float, Ts: float = 1):
def highpass_correction(tau: float, Ts: float = 1, qop_version: Enum = QOP_VERSION.QOP222):
"""
Calculate the best FIR and IIR filter taps to correct for a highpass decay (HPF) of the shape `exp(-t/tau)`.
Expand All @@ -130,15 +130,17 @@ def highpass_correction(tau: float, Ts: float = 1):
flt = sig.butter(1, np.array([1 / tau / Ts]), btype="highpass", analog=True)
ahp2, bhp2 = sig.bilinear(flt[1], flt[0], 1e9)
feedforward_taps = list(np.array([ahp2[0], ahp2[1]]))
feedback_tap = [min(bhp2[0], 0.9999990463225004)] # Maximum value for the iir tap
return feedforward_taps, feedback_tap
feedback_tap = [bhp2[0]]
# feedback_tap = [min(bhp2[0], 0.9999990463225004)] # Maximum value for the iir tap
return _check_hardware_limitation(qop_version, feedforward_taps, feedback_tap)


def bounce_and_delay_correction(
bounce_values: list = (),
delay: int = 0,
delay: float = 0,
feedforward_taps: list = (1.0,),
Ts: float = 1,
qop_version: Enum = QOP_VERSION.QOP222,
):
"""
Calculate the FIR filter taps to correct for reflections (bounce corrections) and to add a delay.
Expand All @@ -159,6 +161,8 @@ def bounce_and_delay_correction(
bounce_values = []
if delay is None:
delay = 0
# elif delay % Ts != 0:
# raise ValueError("The provided delay is not a multiple of the sampling time.")
if feedforward_taps is None:
feedforward_taps = [1.0]
n_extra = 10
Expand Down Expand Up @@ -188,9 +192,12 @@ def bounce_and_delay_correction(
index_start = np.nonzero(feedforward_taps_x == 0)[0][0]
index_end = np.nonzero(feedforward_taps)[0][-1] + 1
extra_taps = np.abs(np.concatenate((feedforward_taps[:index_start], feedforward_taps[-index_end:])))
import matplotlib.pyplot as plt

plt.plot(extra_taps)
if np.any(extra_taps > 0.02): # Contribution is more than 2%
warnings.warn(f"Contribution from missing taps is not negligible. {max(extra_taps)}") # todo: improve message
return feedforward_taps[index_start:index_end]
return _check_hardware_limitation(qop_version, feedforward_taps[index_start:index_end], [])[0]


def _iir_correction(values, filter_type, feedforward_taps, feedback_taps, Ts=1.0):
Expand Down Expand Up @@ -221,3 +228,31 @@ def _get_coefficients_for_delay(tau, full_taps_x, Ts=1.0):
def _round_taps_close_to_zero(taps, accuracy=1e-6):
taps[np.abs(taps) < accuracy] = 0
return taps


def _check_hardware_limitation(qop_version: Enum, feedforward_taps: List, feedback_taps: List):
feedforward_taps = np.array(feedforward_taps)
feedback_taps = np.array(feedback_taps)
feedback_taps[feedback_taps > qop_version.value["feedback_max"]] = qop_version.value["feedback_max"]
feedback_taps[feedback_taps < -qop_version.value["feedback_max"]] = -qop_version.value["feedback_max"]

max_value = max(np.abs(feedforward_taps))
if max_value > qop_version.value["feedforward_max"]:
feedforward_taps = qop_version.value["feedforward_max"] * feedforward_taps / max_value

def _warning_on_one_line(message, category, filename, lineno, file=None, line=None):
return "%s:%s: %s: %s\n" % (filename, lineno, category.__name__, message)

warnings.formatwarning = _warning_on_one_line
warn(
f"The feedforward taps reached the maximum value of 2. \nThe coefficients are scaled down to stay within the valid range which reduces the outputted amplitude of the pulses played through the filtered port by a factor of {max_value/1.99:.3f}."
)
return list(feedforward_taps), list(feedback_taps)


def _check_hardware_limitation_for_delay(
qop_version: Enum, feedforward_taps: List, feedback_taps_len: int):
max_feedforward_len = qop_version.value["feedforward_length"](feedback_taps_len)
if len(feedforward_taps) > max_feedforward_len:
feedforward_taps = feedforward_taps[:max_feedforward_len]
return feedforward_taps
24 changes: 13 additions & 11 deletions tests_against_server/test_digital_filters_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ def IQ_imbalance(g, phi):
amplitude_fit, frequency_fit, phase_fit, offset_fit = [0, 0, 0, 0]

# FLux pulse parameters
const_flux_len = 200
const_flux_len = 2000
const_flux_amp = 0.1

# IQ Plane Angle
Expand Down Expand Up @@ -478,8 +478,9 @@ def IQ_imbalance(g, phi):
tau_lp = 100
# feedforward, feedback = single_exponential_correction(A, tau_lp)
from scipy.signal.windows import hann
feedforward, feedback = calc_filter_taps(delay=11, Ts=1)
# feedforward, feedback = calc_filter_taps(exponential=[(A, tau_lp)], highpass=[t_hp])
# feedforward, feedback = calc_filter_taps(delay=11, Ts=2)
# feedforward, feedback = calc_filter_taps(exponential=[(A, tau_lp)], highpass=[t_hp], delay=5.1)
feedforward, feedback = calc_filter_taps(delay=10.1, highpass=[t_hp])
# feedforward, feedback = calc_filter_taps(exponential=[(A, tau_lp)])
# feedforward, feedback = highpass_correction(t_hp)
# feedforward = [1.0002, -0.9998]
Expand Down Expand Up @@ -519,18 +520,18 @@ def IQ_imbalance(g, phi):
samples = job.get_simulated_samples().con1
t = np.arange(0, len(samples.analog["5"]), 1)
dt = np.where(samples.analog["5"] != 0)[0][0]
# plt.figure()
# plt.plot(t[dt:const_flux_len+dt], const_flux_amp * (np.exp(-t[:const_flux_len] / t_hp)), 'b',label="Pulse before correction")
# plt.plot(t, samples.analog["5"], 'g--', label="OPX pulse with correction")
# plt.plot(t[dt:const_flux_len+dt], samples.analog["5"][dt:const_flux_len+dt] * (np.exp(-t[:const_flux_len] / t_hp)), 'r',label="Pulse after correction")
# plt.legend()

plt.figure()
plt.plot(t[dt:const_flux_len+dt], const_flux_amp * (1 + A*np.exp(-t[:const_flux_len] / tau_lp)), 'b',label="Pulse before correction")
plt.plot(t[dt:const_flux_len+dt], const_flux_amp * (np.exp(-t[:const_flux_len] / t_hp)), 'b',label="Pulse before correction")
plt.plot(t, samples.analog["5"], 'g--', label="OPX pulse with correction")
plt.plot(t[dt:const_flux_len+dt], samples.analog["5"][dt:const_flux_len+dt] * (1 + A*np.exp(-t[:const_flux_len] / tau_lp)), 'r',label="Pulse after correction")
plt.plot(t[dt:const_flux_len+dt], samples.analog["5"][dt:const_flux_len+dt] * (np.exp(-t[:const_flux_len] / t_hp)), 'r',label="Pulse after correction")
plt.legend()

# plt.figure()
# plt.plot(t[dt:const_flux_len+dt], const_flux_amp * (1 + A*np.exp(-t[:const_flux_len] / tau_lp)), 'b',label="Pulse before correction")
# plt.plot(t, samples.analog["5"], 'g--', label="OPX pulse with correction")
# plt.plot(t[dt:const_flux_len+dt], samples.analog["5"][dt:const_flux_len+dt] * (1 + A*np.exp(-t[:const_flux_len] / tau_lp)), 'r',label="Pulse after correction")
# plt.legend()

# plt.figure()
# plt.plot(t[dt:const_flux_len+dt], const_flux_amp * (np.exp(-t[:const_flux_len] / t_hp)) * (1 + A*np.exp(-t[:const_flux_len] / tau_lp)), 'b',label="Pulse before correction")
# plt.plot(t, samples.analog["5"], 'g--', label="OPX pulse with correction")
Expand All @@ -539,5 +540,6 @@ def IQ_imbalance(g, phi):
else:
# Open a quantum machine to execute the QUA program
qm = qmm.open_qm(config)
qm.set_output_filter_by_element()
# Send the QUA program to the OPX, which compiles and executes it - Execute does not block python!
job = qm.execute(hello_qua)

0 comments on commit d294ad1

Please sign in to comment.