diff --git a/qualang_tools/digital_filters/filters.py b/qualang_tools/digital_filters/filters.py index ed08a075..deab5012 100644 --- a/qualang_tools/digital_filters/filters.py +++ b/qualang_tools/digital_filters/filters.py @@ -3,6 +3,15 @@ import numpy as np import scipy.signal as sig from warnings import warn +from enum import Enum + + +class QOP_VERSION(Enum): + QOP222 = { + "feedforward_max": 2 - 2**-16, + "feedback_max": 1 - 2**-20, + "feedforward_length": lambda feedback_len: 44 - 7 * feedback_len, + } def calc_filter_taps( @@ -10,8 +19,9 @@ def calc_filter_taps( exponential: List[Tuple[float, float]] = None, highpass: List[float] = None, bounce: List[Tuple[float, float]] = None, - delay: int = None, + delay: float = None, Ts: float = 1, + qop_version: Enum = QOP_VERSION.QOP222, ) -> Tuple[List[float], List[float]]: """ Calculate the best FIR and IIR filter taps for a system with any combination of FIR corrections, exponential @@ -53,19 +63,9 @@ def calc_filter_taps( if bounce is not None or delay is not None: feedforward_taps = bounce_and_delay_correction(bounce, delay, feedforward_taps, Ts) - max_value = max(np.abs(feedforward_taps)) - - if max_value >= 2: - feedforward_taps = 1.99 * feedforward_taps / max_value - - def _warning_on_one_line(message, category, filename, lineno, file=None, line=None): - return "%s:%s: %s: %s\n" % (filename, lineno, category.__name__, message) + feedforward_taps = _check_hardware_limitation_for_delay(qop_version, feedforward_taps, len(feedback_taps)) - warnings.formatwarning = _warning_on_one_line - warn( - f"The feedforward taps reached the maximum value of 2. \nThe coefficients are scaled down to stay within the valid range which reduces the outputted amplitude of the pulses played through the filtered port by a factor of {max_value/1.99:.3f}." - ) - return list(feedforward_taps), list(feedback_taps) + return _check_hardware_limitation(qop_version, feedforward_taps, list(feedback_taps)) def exponential_decay(x, a, t): @@ -89,7 +89,7 @@ def high_pass_exponential(x, t): return np.exp(-x / t) -def single_exponential_correction(A: float, tau: float, Ts: float = 1): +def single_exponential_correction(A: float, tau: float, Ts: float = 1, qop_version: Enum = QOP_VERSION.QOP222): """ Calculate the best FIR and IIR filter taps to correct for an exponential decay (undershoot or overshoot) of the shape `1 + A * exp(-t/tau)`. @@ -111,10 +111,10 @@ def single_exponential_correction(A: float, tau: float, Ts: float = 1): c2 = Ts - 2 * tau feedback_tap = [-k2 / k1] feedforward_taps = list(np.array([c1, c2]) / k1) - return feedforward_taps, feedback_tap + return _check_hardware_limitation(qop_version, feedforward_taps, feedback_tap) -def highpass_correction(tau: float, Ts: float = 1): +def highpass_correction(tau: float, Ts: float = 1, qop_version: Enum = QOP_VERSION.QOP222): """ Calculate the best FIR and IIR filter taps to correct for a highpass decay (HPF) of the shape `exp(-t/tau)`. @@ -130,15 +130,17 @@ def highpass_correction(tau: float, Ts: float = 1): flt = sig.butter(1, np.array([1 / tau / Ts]), btype="highpass", analog=True) ahp2, bhp2 = sig.bilinear(flt[1], flt[0], 1e9) feedforward_taps = list(np.array([ahp2[0], ahp2[1]])) - feedback_tap = [min(bhp2[0], 0.9999990463225004)] # Maximum value for the iir tap - return feedforward_taps, feedback_tap + feedback_tap = [bhp2[0]] + # feedback_tap = [min(bhp2[0], 0.9999990463225004)] # Maximum value for the iir tap + return _check_hardware_limitation(qop_version, feedforward_taps, feedback_tap) def bounce_and_delay_correction( bounce_values: list = (), - delay: int = 0, + delay: float = 0, feedforward_taps: list = (1.0,), Ts: float = 1, + qop_version: Enum = QOP_VERSION.QOP222, ): """ Calculate the FIR filter taps to correct for reflections (bounce corrections) and to add a delay. @@ -159,6 +161,8 @@ def bounce_and_delay_correction( bounce_values = [] if delay is None: delay = 0 + # elif delay % Ts != 0: + # raise ValueError("The provided delay is not a multiple of the sampling time.") if feedforward_taps is None: feedforward_taps = [1.0] n_extra = 10 @@ -188,9 +192,12 @@ def bounce_and_delay_correction( index_start = np.nonzero(feedforward_taps_x == 0)[0][0] index_end = np.nonzero(feedforward_taps)[0][-1] + 1 extra_taps = np.abs(np.concatenate((feedforward_taps[:index_start], feedforward_taps[-index_end:]))) + import matplotlib.pyplot as plt + + plt.plot(extra_taps) if np.any(extra_taps > 0.02): # Contribution is more than 2% warnings.warn(f"Contribution from missing taps is not negligible. {max(extra_taps)}") # todo: improve message - return feedforward_taps[index_start:index_end] + return _check_hardware_limitation(qop_version, feedforward_taps[index_start:index_end], [])[0] def _iir_correction(values, filter_type, feedforward_taps, feedback_taps, Ts=1.0): @@ -221,3 +228,31 @@ def _get_coefficients_for_delay(tau, full_taps_x, Ts=1.0): def _round_taps_close_to_zero(taps, accuracy=1e-6): taps[np.abs(taps) < accuracy] = 0 return taps + + +def _check_hardware_limitation(qop_version: Enum, feedforward_taps: List, feedback_taps: List): + feedforward_taps = np.array(feedforward_taps) + feedback_taps = np.array(feedback_taps) + feedback_taps[feedback_taps > qop_version.value["feedback_max"]] = qop_version.value["feedback_max"] + feedback_taps[feedback_taps < -qop_version.value["feedback_max"]] = -qop_version.value["feedback_max"] + + max_value = max(np.abs(feedforward_taps)) + if max_value > qop_version.value["feedforward_max"]: + feedforward_taps = qop_version.value["feedforward_max"] * feedforward_taps / max_value + + def _warning_on_one_line(message, category, filename, lineno, file=None, line=None): + return "%s:%s: %s: %s\n" % (filename, lineno, category.__name__, message) + + warnings.formatwarning = _warning_on_one_line + warn( + f"The feedforward taps reached the maximum value of 2. \nThe coefficients are scaled down to stay within the valid range which reduces the outputted amplitude of the pulses played through the filtered port by a factor of {max_value/1.99:.3f}." + ) + return list(feedforward_taps), list(feedback_taps) + + +def _check_hardware_limitation_for_delay( + qop_version: Enum, feedforward_taps: List, feedback_taps_len: int): + max_feedforward_len = qop_version.value["feedforward_length"](feedback_taps_len) + if len(feedforward_taps) > max_feedforward_len: + feedforward_taps = feedforward_taps[:max_feedforward_len] + return feedforward_taps diff --git a/tests_against_server/test_digital_filters_server.py b/tests_against_server/test_digital_filters_server.py index 8a1b56f2..7ab0388c 100644 --- a/tests_against_server/test_digital_filters_server.py +++ b/tests_against_server/test_digital_filters_server.py @@ -187,7 +187,7 @@ def IQ_imbalance(g, phi): amplitude_fit, frequency_fit, phase_fit, offset_fit = [0, 0, 0, 0] # FLux pulse parameters -const_flux_len = 200 +const_flux_len = 2000 const_flux_amp = 0.1 # IQ Plane Angle @@ -478,8 +478,9 @@ def IQ_imbalance(g, phi): tau_lp = 100 # feedforward, feedback = single_exponential_correction(A, tau_lp) from scipy.signal.windows import hann -feedforward, feedback = calc_filter_taps(delay=11, Ts=1) -# feedforward, feedback = calc_filter_taps(exponential=[(A, tau_lp)], highpass=[t_hp]) +# feedforward, feedback = calc_filter_taps(delay=11, Ts=2) +# feedforward, feedback = calc_filter_taps(exponential=[(A, tau_lp)], highpass=[t_hp], delay=5.1) +feedforward, feedback = calc_filter_taps(delay=10.1, highpass=[t_hp]) # feedforward, feedback = calc_filter_taps(exponential=[(A, tau_lp)]) # feedforward, feedback = highpass_correction(t_hp) # feedforward = [1.0002, -0.9998] @@ -519,18 +520,18 @@ def IQ_imbalance(g, phi): samples = job.get_simulated_samples().con1 t = np.arange(0, len(samples.analog["5"]), 1) dt = np.where(samples.analog["5"] != 0)[0][0] - # plt.figure() - # plt.plot(t[dt:const_flux_len+dt], const_flux_amp * (np.exp(-t[:const_flux_len] / t_hp)), 'b',label="Pulse before correction") - # plt.plot(t, samples.analog["5"], 'g--', label="OPX pulse with correction") - # plt.plot(t[dt:const_flux_len+dt], samples.analog["5"][dt:const_flux_len+dt] * (np.exp(-t[:const_flux_len] / t_hp)), 'r',label="Pulse after correction") - # plt.legend() - plt.figure() - plt.plot(t[dt:const_flux_len+dt], const_flux_amp * (1 + A*np.exp(-t[:const_flux_len] / tau_lp)), 'b',label="Pulse before correction") + plt.plot(t[dt:const_flux_len+dt], const_flux_amp * (np.exp(-t[:const_flux_len] / t_hp)), 'b',label="Pulse before correction") plt.plot(t, samples.analog["5"], 'g--', label="OPX pulse with correction") - plt.plot(t[dt:const_flux_len+dt], samples.analog["5"][dt:const_flux_len+dt] * (1 + A*np.exp(-t[:const_flux_len] / tau_lp)), 'r',label="Pulse after correction") + plt.plot(t[dt:const_flux_len+dt], samples.analog["5"][dt:const_flux_len+dt] * (np.exp(-t[:const_flux_len] / t_hp)), 'r',label="Pulse after correction") plt.legend() + # plt.figure() + # plt.plot(t[dt:const_flux_len+dt], const_flux_amp * (1 + A*np.exp(-t[:const_flux_len] / tau_lp)), 'b',label="Pulse before correction") + # plt.plot(t, samples.analog["5"], 'g--', label="OPX pulse with correction") + # plt.plot(t[dt:const_flux_len+dt], samples.analog["5"][dt:const_flux_len+dt] * (1 + A*np.exp(-t[:const_flux_len] / tau_lp)), 'r',label="Pulse after correction") + # plt.legend() + # plt.figure() # plt.plot(t[dt:const_flux_len+dt], const_flux_amp * (np.exp(-t[:const_flux_len] / t_hp)) * (1 + A*np.exp(-t[:const_flux_len] / tau_lp)), 'b',label="Pulse before correction") # plt.plot(t, samples.analog["5"], 'g--', label="OPX pulse with correction") @@ -539,5 +540,6 @@ def IQ_imbalance(g, phi): else: # Open a quantum machine to execute the QUA program qm = qmm.open_qm(config) + qm.set_output_filter_by_element() # Send the QUA program to the OPX, which compiles and executes it - Execute does not block python! job = qm.execute(hello_qua)