diff --git a/CHANGELOG.md b/CHANGELOG.md index 32afdd9b..b116ba8c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) - octave_tools - Added `get_calibration_parameters_from_db` get the most up-to-date correction parameters in the calibration database for the specified values of the Octave LO frequency, intermediate frequency and Octave gain. - octave_tools - Added `set_correction_parameters_to_opx` set the most up-to-date correction parameters from the calibration database for the specified values of the Octave LO frequency, intermediate frequency and Octave gain. - octave_tools - Added `get_correction_for_each_LO_and_IF` get the correction matrix elements for a set of intermediate frequencies picked equally spaced in a given list in order to update the correction matrix while performing the IF sweep in QUA. +- digital_filters - Added library of functions allowing the derivation of the digital filter taps to correct distortions. - macros - Added `long_wait` convenience macro to simplify waiting for longer than the maximum wait time. ### Changed diff --git a/README.md b/README.md index 7702b9f5..568c9e5e 100644 --- a/README.md +++ b/README.md @@ -45,6 +45,7 @@ storing them in the usual configuration file. It allows defining waveforms in a * [ManualOutputControl](qualang_tools/control_panel/README_manual_output_control.md) - This module allows controlling the outputs from the OPX in CW mode. Once created, it has an API for defining which channels are on. Analog channels also have an API for defining their amplitude and frequency. * [VNA](qualang_tools/control_panel/README_vna.md) - This module allows to configure the OPX as a VNA for a given element (readout resonator for instance) and operation (readout pulse for instance) already defined in the configuration. Once created, it has an API for defining which measurements are to be run depending on the down-conversion solution used (ED: envelope detector, IR: image rejection mixer, IQ: IQ mixer). * [Macros](qualang_tools/macros/README.md) - This module includes convenience functions for encapsulating common QUA code. +* [Digital filters](qualang_tools/digital_filters/README.md) - Library of functions allowing the derivation of the digital filter taps to correct distortions. ## Installation diff --git a/qualang_tools/digital_filters/README.md b/qualang_tools/digital_filters/README.md new file mode 100644 index 00000000..4de247f4 --- /dev/null +++ b/qualang_tools/digital_filters/README.md @@ -0,0 +1,95 @@ +# Digital filter tools +This library includes tools for deriving the taps of the OPX digital filters (IIR and FIR). + +Such filters are generally used to correct for the high-pass filtering occurring on the fast line of a bias-tee, +or to correct flux pulses for possible distortions happening on the flux lines of superconducting qubit chips. + +More details about these types of filter and how they are implemented on the OPX can be found [here](https://docs.quantum-machines.co/1.1.7/qm-qua-sdk/docs/Guides/output_filter/?h=iir#output-filter) + +The goal of the following functions is to allow users to easily implement such filters by deriving the IIR and FIR taps +from the measured distortions: +* [Single exponential correction](#singleexponentialcorrection): correct for a low-pass exponential decay `1 + A * exp(-t/tau)`. +* [Highpass correction](#highpasscorrection): correct for a high pass exponential decay `exp(-t/tau)`. +* [Bounce and delay correction](#bounceanddelaycorrection): correct for reflections and delays. +* [Calc filter taps](#calcfiltertaps): correct for any combination of the aforementioned compensations. + +## Usage examples + +### single_exponential_correction +Calculate the best FIR and IIR filter taps to correct for an exponential decay (undershoot or overshoot) of the shape +`1 + A * exp(-t/tau)`. You can use the `exponential_decay` function for fitting your data as shown below. + +#### +```python +from scipy import optimize +from qualang_tools.digital_filters import exponential_decay, single_exponential_correction + +# Fit your data with the exponential_decay function +[A_lpf, tau_lpf_ns], _ = optimize.curve_fit( + exponential_decay, + x_data, + y_data, +) +# Derive the corresponding taps +feedforward_taps, feedback_tap = single_exponential_correction(A_lpf, tau_lpf_ns) +# Update the config with the digital filter parameters +config["controllers"]["con1"]["analog_outputs"][port_number] = { + "offset": 0.0, + "filter": {"feedforward": feedforward_taps, "feedback": feedback_tap}} +``` + +### highpass_correction +Calculate the best FIR and IIR filter taps to correct for a highpass decay (high-pass filter) of the shape `exp(-t/tau)`. +You can use the `high_pass_exponential` function for fitting your data as shown below. + +#### +```python +from scipy import optimize +from qualang_tools.digital_filters import high_pass_exponential, highpass_correction + +# Fit your data with the exponential_decay function +[tau_hpf_ns], _ = optimize.curve_fit( + high_pass_exponential, + x_data, + y_data, +) +# Derive the taps from the time constant of the exponential highpass decay tau +feedforward_taps, feedback_tap = highpass_correction(tau_hpf_ns) +# Update the config with the digital filter parameters +config["controllers"]["con1"]["analog_outputs"][port_number] = { + "offset": 0.0, + "filter": {"feedforward": feedforward_taps, "feedback": feedback_tap}} +``` + +### bounce_and_delay_correction +Calculate the FIR filter taps to correct for reflections (bounce corrections) and to add a delay. + +#### +```python +from qualang_tools.digital_filters import bounce_and_delay_correction + + + +``` + +### calc_filter_taps +Calculate the best FIR and IIR filter taps for a system with any combination of FIR corrections, exponential +corrections (undershoot or overshoot), high pass compensation, reflections (bounce corrections) and a needed delay on the line. + +#### +```python +from qualang_tools.digital_filters import calc_filter_taps + +# Derive the taps for correction all the identified distortions (high-pass, low-pass, reflection and delay) +feedforward_taps, feedback_tap = calc_filter_taps( + fir=None, + exponential=list(zip([A_lpf_1, A_lpf_2,...], [tau_lpf_ns_1, tau_lpf_ns_2,...])), + highpass=[tau_hpf_ns], + bounce=[(a_bounce, tau_bounce),], + delay=20, +) +# Update the config with the digital filter parameters +config["controllers"]["con1"]["analog_outputs"][port_number] = { + "offset": 0.0, + "filter": {"feedforward": feedforward_taps, "feedback": feedback_tap}} +``` diff --git a/qualang_tools/digital_filters/__init__.py b/qualang_tools/digital_filters/__init__.py new file mode 100644 index 00000000..71d0f65e --- /dev/null +++ b/qualang_tools/digital_filters/__init__.py @@ -0,0 +1,19 @@ +from qualang_tools.digital_filters.filters import ( + QOPVersion, + calc_filter_taps, + exponential_decay, + high_pass_exponential, + single_exponential_correction, + highpass_correction, + bounce_and_delay_correction, +) + +__all__ = [ + "QOPVersion", + "calc_filter_taps", + "exponential_decay", + "high_pass_exponential", + "single_exponential_correction", + "highpass_correction", + "bounce_and_delay_correction", +] diff --git a/qualang_tools/digital_filters/filters.py b/qualang_tools/digital_filters/filters.py new file mode 100644 index 00000000..6a88b2f6 --- /dev/null +++ b/qualang_tools/digital_filters/filters.py @@ -0,0 +1,310 @@ +import warnings +from typing import Tuple, List +import numpy as np +import scipy.signal as sig +from warnings import warn +from enum import Enum + + +class QOPVersion(Enum): + NONE = { + "feedforward_max": np.inf, + "feedback_max": np.inf, + "feedforward_length": lambda feedback_len: np.inf - 0 * feedback_len, + } + QOP222 = { + "feedforward_max": 2 - 2**-16, + "feedback_max": 1 - 2**-20, + "feedforward_length": lambda feedback_len: 44 - 7 * feedback_len, + } + QOP220 = { + "feedforward_max": 2 - 2**-16, + "feedback_max": 1 - 2**-20, + "feedforward_length": lambda feedback_len: 44 - 7 * feedback_len, + } + + @classmethod + def get_latest(cls): + """Return the latest QOP version.""" + return cls.QOP222 + + @classmethod + def get_options(cls): + """Return the list of implemented QOP versions""" + return [cls.NONE.name, cls.QOP220.name, cls.QOP222.name] + + +def calc_filter_taps( + fir: List[float] = None, + exponential: List[Tuple[float, float]] = None, + highpass: List[float] = None, + bounce: List[Tuple[float, float]] = None, + delay: float = None, + Ts: float = 1, + qop_version: QOPVersion = QOPVersion.get_latest(), +) -> Tuple[List[float], List[float]]: + """ + Calculate the best FIR and IIR filter taps for a system with any combination of FIR corrections, exponential + corrections (undershoot or overshoot), high pass compensation, reflections (bounce corrections) and a needed delay on the line. + The OPX has hardware constraints that may limit the filter implementation and this is why the running QOP version can be specified as an enum of the class QOPVersion. + The possible options are returned by the `QOPVersion.get_options()` method and the default value is given by `QOPVersion.get_latest()`. + + Args: + fir: A list of the needed FIR taps. These would be convoluted with whatever other FIR taps + are required by the other filters. + exponential: A list of tuples (A, tau), each tuple represents an exponential decay of the shape + `1 + A * exp(-t/tau)`. `tau` is in ns. Each exponential correction requires 1 IIR tap and 2 FIR taps. + highpass: A list of taus, each tau represents a highpass decay of the shape `exp(-t/tau)`. + `tau` is in ns. Each highpass correction requires 1 IIR tap and 2 FIR taps. + bounce: A list of tuples (a, tau), each tuple represents a reflection of amplitude `a` happening at time `tau`. + `tau` is in ns. Note, if `tau` is not a multiple of the sampling rate, multiple FIR taps will be created. + If `tau` is smaller than 5 taps, accuracy might be lost. + delay: A global delay to apply using the FIR filters. + `delay` is in ns. Note, if `delay` is not a multiple of the sampling rate, multiple FIR taps will be + created. If `delay` is smaller than 5 taps, accuracy might be lost. + Ts: The sampling rate (in ns) of the system and filter taps. + qop_version: running QOP version used to format the taps according to the corresponding hardware limitations (ex: QOPVersion.QOP222). + Returns: + A tuple of two lists. + The first is a list of FIR (feedforward) taps starting at 0 and spaced `Ts` apart. + The second is a list of IIR (feedback) taps. + """ + feedforward_taps = np.array([1.0]) + feedback_taps = np.array([]) + + if highpass is not None: + feedforward_taps, feedback_taps = _iir_correction(highpass, "highpass", feedforward_taps, feedback_taps, Ts) + + if exponential is not None: + feedforward_taps, feedback_taps = _iir_correction( + exponential, "exponential", feedforward_taps, feedback_taps, Ts + ) + + if fir is not None: + feedforward_taps = np.convolve(feedforward_taps, fir) + + if bounce is not None or delay is not None: + feedforward_taps = bounce_and_delay_correction(bounce, delay, feedforward_taps, Ts, qop_version=QOPVersion.NONE) + + return _check_hardware_limitation(qop_version, feedforward_taps, list(feedback_taps)) + + +def exponential_decay(x, a, t): + """Function representing the exponential decay defined as 1 + a * np.exp(-x / t). + + :param x: numpy array for the time vector in ns + :param a: float for the exponential amplitude + :param t: float for the exponential decay time in ns + :return: numpy array for the exponential decay + """ + return 1 + a * np.exp(-x / t) + + +def high_pass_exponential(x, t): + """Function representing the exponential decay defined as np.exp(-x / t). + + :param x: numpy array for the time vector in ns + :param t: float for the exponential decay time in ns + :return: numpy array for the exponential decay + """ + return np.exp(-x / t) + + +def single_exponential_correction( + A: float, tau: float, Ts: float = 1, qop_version: QOPVersion = QOPVersion.get_latest() +): + """ + Calculate the best FIR and IIR filter taps to correct for an exponential decay (undershoot or overshoot) of the shape + `1 + A * exp(-t/tau)`. + The OPX has hardware constraints that may limit the filter implementation and this is why the running QOP version can be specified as an enum of the class QOPVersion. + The possible options are returned by the `QOPVersion.get_options()` method and the default value is given by `QOPVersion.get_latest()`. + + Args: + A: The exponential decay pre-factor. + tau: The time constant for the exponential decay, given in ns. + Ts: The sampling rate (in ns) of the system and filter taps. + qop_version: running QOP version used to format the taps according to the corresponding hardware limitations (ex: QOPVersion.QOP222). + Returns: + A tuple of two items. + The first is a list of 2 FIR (feedforward) taps starting at 0 and spaced `Ts` apart. + The second is a single IIR (feedback) tap. + """ + tau *= 1e-9 + Ts *= 1e-9 + k1 = Ts + 2 * tau * (A + 1) + k2 = Ts - 2 * tau * (A + 1) + c1 = Ts + 2 * tau + c2 = Ts - 2 * tau + feedback_tap = [-k2 / k1] + feedforward_taps = list(np.array([c1, c2]) / k1) + return _check_hardware_limitation(qop_version, feedforward_taps, feedback_tap) + + +def highpass_correction(tau: float, Ts: float = 1, qop_version: QOPVersion = QOPVersion.get_latest()): + """ + Calculate the best FIR and IIR filter taps to correct for a highpass decay (HPF) of the shape `exp(-t/tau)`. + The OPX has hardware constraints that may limit the filter implementation and this is why the running QOP version can be specified as an enum of the class QOPVersion. + The possible options are returned by the `QOPVersion.get_options()` method and the default value is given by `QOPVersion.get_latest()`. + + Args: + tau: The time constant for the exponential decay, given in ns. + Ts: The sampling rate (in ns) of the system and filter taps. + qop_version: running QOP version used to format the taps according to the corresponding hardware limitations (ex: QOPVersion.QOP222). + Returns: + A tuple of two items. + The first is a list of 2 FIR (feedforward) taps starting at 0 and spaced `Ts` apart. + The second is a single IIR (feedback) tap. + """ + Ts *= 1e-9 + flt = sig.butter(1, np.array([1 / tau / Ts]), btype="highpass", analog=True) + ahp2, bhp2 = sig.bilinear(flt[1], flt[0], 1e9) + feedforward_taps = list(np.array([ahp2[0], ahp2[1]])) + feedback_tap = [bhp2[0]] + return _check_hardware_limitation(qop_version, feedforward_taps, feedback_tap) + + +def bounce_and_delay_correction( + bounce_values: list = (), + delay: float = 0, + feedforward_taps: list = (1.0,), + Ts: float = 1, + qop_version: QOPVersion = QOPVersion.get_latest(), +): + """ + Calculate the FIR filter taps to correct for reflections (bounce corrections) and to add a delay. + The OPX has hardware constraints that may limit the filter implementation and this is why the running QOP version can be specified as an enum of the class QOPVersion. + The possible options are returned by the `QOPVersion.get_options()` method and the default value is given by `QOPVersion.get_latest()`. + + Args: + bounce_values: A list of tuples (a, tau), each tuple represents a reflection of amplitude `a` happening at time + `tau`. `tau` is in ns. Note, if `tau` is not a multiple of the sampling rate, multiple FIR taps will be + created. If `tau` is smaller than 5 taps, accuracy might be lost. + delay: A global delay to apply using the FIR filters. + `delay` is in ns. Note, if `delay` is not a multiple of the sampling rate, multiple FIR taps will be + created. If `delay` is smaller than 5 taps, accuracy might be lost. + feedforward_taps: Existing FIR (feedforward) taps to be convoluted with the resulting taps. + Ts: The sampling rate (in ns) of the system and filter taps. + qop_version: running QOP version used to format the taps according to the corresponding hardware limitations (ex: QOPVersion.QOP222). + Returns: + A list of FIR (feedforward) taps starting at 0 and spaced `Ts` apart. + """ + if bounce_values is None: + bounce_values = [] + if delay is None: + delay = 0 + + if feedforward_taps is None: + feedforward_taps = [1.0] + n_extra = 10 + n_taps = 101 + long_taps_x = np.linspace((0 - n_extra) * Ts, (n_taps + n_extra) * Ts, n_taps + 1 + 2 * n_extra)[0:-1] + feedforward_taps_x = np.linspace(0, (len(feedforward_taps) - 1) * Ts, len(feedforward_taps)) + + delay_taps = _get_coefficients_for_delay(delay, long_taps_x, Ts) + + feedforward_taps = np.convolve(feedforward_taps, delay_taps) + feedforward_taps_x = np.linspace( + min(feedforward_taps_x) + min(long_taps_x), + max(feedforward_taps_x) + max(long_taps_x), + len(feedforward_taps), + ) + for i, (a, tau) in enumerate(bounce_values): + bounce_taps = -a * _get_coefficients_for_delay(tau, long_taps_x, Ts) + bounce_taps[n_extra] += 1 + feedforward_taps = np.convolve(feedforward_taps, bounce_taps) + feedforward_taps_x = np.linspace( + min(feedforward_taps_x) + min(long_taps_x), + max(feedforward_taps_x) + max(long_taps_x), + len(feedforward_taps), + ) + + feedforward_taps = _round_taps_close_to_zero(feedforward_taps) + index_start = np.nonzero(feedforward_taps_x == 0)[0][0] + index_end = np.nonzero(feedforward_taps)[0][-1] + 1 + final_taps = feedforward_taps[index_start:index_end] + + return _check_hardware_limitation(qop_version, final_taps, [])[0] + + +def _iir_correction(values, filter_type, feedforward_taps, feedback_taps, Ts=1.0): + b = np.zeros((2, len(values))) + feedback_taps = np.append(np.zeros(len(values)), feedback_taps) + + if filter_type == "highpass": + for i, tau in enumerate(values): + b[:, i], [feedback_taps[i]] = highpass_correction(tau, Ts, qop_version=QOPVersion.NONE) + elif filter_type == "exponential": + for i, (A, tau) in enumerate(values): + b[:, i], [feedback_taps[i]] = single_exponential_correction(A, tau, Ts, qop_version=QOPVersion.NONE) + else: + raise Exception("Unknown filter type") + + for i in range(len(values)): + feedforward_taps = np.convolve(feedforward_taps, b[:, i]) + + return feedforward_taps, feedback_taps + + +def _get_coefficients_for_delay(tau, full_taps_x, Ts=1.0): + full_taps = np.sinc((full_taps_x - tau) / Ts) + full_taps = _round_taps_close_to_zero(full_taps) + return full_taps + + +def _round_taps_close_to_zero(taps, accuracy=1e-6): + taps[np.abs(taps) < accuracy] = 0 + return taps + + +def _check_hardware_limitation(qop_version: QOPVersion, feedforward_taps: List, feedback_taps: List): + def _warning_on_one_line(message, category, filename, lineno, file=None, line=None): + return "%s:%s: %s: %s\n" % (filename, lineno, category.__name__, message) + + warnings.formatwarning = _warning_on_one_line + warnings.simplefilter("always", UserWarning) + + feedback_taps = np.array(feedback_taps) + feedforward_taps = np.array(feedforward_taps) + + # Check limitation on the number of feedforward taps + max_feedforward_len = qop_version.value["feedforward_length"](len(feedback_taps)) + if len(feedforward_taps) > max_feedforward_len: + extra_taps = feedforward_taps[max_feedforward_len:] + final_taps = feedforward_taps[:max_feedforward_len] + if np.max(np.sum(extra_taps)) / np.max(np.sum(final_taps)) > 0.10: # Contribution is more than 2% + raise RuntimeError( + f"The number of feedforward taps exceed the maximum length of " + f"{qop_version.value['feedforward_length'](len(feedback_taps))}.\nRemoved all taps afterwards." + f" The contribution from the removed taps was " + f"{np.max(np.sum(extra_taps)) / np.max(np.sum(final_taps)) * 100:.1f}%." + ) + elif np.max(np.sum(extra_taps)) / np.max(np.sum(final_taps)) > 0.02: # Contribution is more than 2% + warnings.warn( + f"The number of feedforward taps exceed the maximum length of " + f"{qop_version.value['feedforward_length'](len(feedback_taps))}.\nRemoved all taps afterwards." + f" The contribution from the removed taps was " + f"{np.max(np.sum(extra_taps)) / np.max(np.sum(final_taps)) * 100:.1f}%." + ) + feedforward_taps = final_taps + + # Check limitation on the max value of a feedback tap + if np.any(np.abs(feedback_taps) > qop_version.value["feedback_max"]): + feedback_taps[feedback_taps > qop_version.value["feedback_max"]] = qop_version.value["feedback_max"] + feedback_taps[feedback_taps < -qop_version.value["feedback_max"]] = -qop_version.value["feedback_max"] + warn( + f"The feedback taps reached the maximum value of {qop_version.value['feedback_max']} which may result in " + f"a non-optimal filter implementation." + ) + + # Check limitation on the max value of a feedforward tap + max_value = max(np.abs(feedforward_taps)) + if max_value > qop_version.value["feedforward_max"]: + feedforward_taps = qop_version.value["feedforward_max"] * feedforward_taps / max_value + warn( + f"The feedforward taps reached the maximum value of {qop_version.value['feedforward_max']}.\n" + f"The coefficients are scaled down to stay within the valid range which reduces the outputted amplitude of" + f" the pulses played through the filtered port by a factor of " + f"{max_value/qop_version.value['feedforward_max']:.3f}." + ) + + return list(feedforward_taps), list(feedback_taps) diff --git a/tests/test_digital_filters.py b/tests/test_digital_filters.py new file mode 100644 index 00000000..2a1f0d56 --- /dev/null +++ b/tests/test_digital_filters.py @@ -0,0 +1,171 @@ +import pytest +import numpy as np +from qualang_tools.digital_filters import * + + +@pytest.mark.parametrize("low_pass_exp", [ + [500, -0.25, 200], + [500, 0.25, 200], +]) +def test_exponential_decay(low_pass_exp): + t_max = low_pass_exp[0] + A = low_pass_exp[1] + tau = low_pass_exp[2] + t = np.arange(0, t_max, 1) + assert np.all(exponential_decay(t, A, tau) == 1 + A * np.exp(-t/tau)) + +@pytest.mark.parametrize("high_pass_exp", [ + [500, 200], +]) +def test_high_pass_exponential(high_pass_exp): + t_max = high_pass_exp[0] + tau = high_pass_exp[1] + t = np.arange(0, t_max, 1) + assert np.all(high_pass_exponential(t, tau) == np.exp(-t/tau)) + + +@pytest.mark.parametrize("single_exp_correction", [ + [-0.25, 200, 1], + [0.25, 200, 1], + [0.25, 200, 2], + [-0.01, 2000, 1], +]) +def test_single_exp_correction(single_exp_correction): + A = single_exp_correction[0] + tau = single_exp_correction[1] + ts = single_exp_correction[2] + feedforward, feedback = single_exponential_correction(A, tau, ts) + feedforward = np.array(feedforward) + print(f"\nfeedback: {feedback}") + print(f"feedforward: {feedforward}") + assert -1 < feedback[0] < 1 # Must be within (-1, 1) + assert 0 <= feedback[0] # Low pass correction must be > 0 + assert np.all(-2 < np.array(feedforward)) # Must be within (-2, 2) + assert np.all(np.array(feedforward) < 2) # Must be within (-2, 2) + assert np.all(feedforward == single_exponential_correction(A, tau*2, 2*ts)[0]) + assert feedback == single_exponential_correction(A, tau*2, 2*ts)[1] + + +@pytest.mark.parametrize("hp_correction", [ + [20000, 1], + [2000000, 2], + [10, 1], +]) +def test_hp_correction(hp_correction): + tau = hp_correction[0] + ts = hp_correction[1] + feedforward, feedback = highpass_correction(tau, ts) + print(f"\nfeedback: {feedback}") + print(f"feedforward: {feedforward}") + assert -1 < feedback[0] < 1 # Must be within (-1, 1) + assert 0 <= feedback[0] # High pass correction must be > 0 + assert np.all(-2 < np.array(feedforward)) # Must be within (-2, 2) + assert np.all(np.array(feedforward) < 2) # Must be within (-2, 2) + assert np.all(feedforward == highpass_correction(tau/2, 2*ts)[0]) + assert feedback == highpass_correction(tau/2, 2*ts)[1] + +@pytest.mark.parametrize("calc_correction", [ + [[(0.25, 200)], None, 1], + [[(-0.25, 200)], None, 1], + [[(-0.25, 200)], None, 2], + [None, [20_000], 1], + [None, [20_000], 2], +]) +def test_single_calc_filter_taps(calc_correction): + low_pass = calc_correction[0] + high_pass = calc_correction[1] + ts = calc_correction[2] + feedforward, feedback = calc_filter_taps(fir=None, exponential=low_pass, highpass=high_pass, bounce=None, delay=None, Ts=ts) + print(f"\nfeedback: {feedback}") + print(f"feedforward: {feedforward}") + if low_pass is None: + assert np.all(feedforward == highpass_correction(high_pass[0], ts)[0]) + assert feedback[0] == highpass_correction(high_pass[0], ts)[1] + elif high_pass is None: + assert np.all(feedforward == single_exponential_correction(low_pass[0][0], low_pass[0][1], ts)[0]) + assert feedback[0] == single_exponential_correction(low_pass[0][0], low_pass[0][1], ts)[1] + + +@pytest.mark.parametrize("static_calc_correction", [ + [[(-0.25, 200)], None, 1, [[1.3322259136212624, -1.325581395348837], [0.9933554817275746]]], + [None, [20_000], 1, [[1.000025, -0.999975], [0.9999990463256836]]], +]) +def test_single_calc_filter_taps_static(static_calc_correction): + low_pass = static_calc_correction[0] + high_pass = static_calc_correction[1] + ts = static_calc_correction[2] + static_values = static_calc_correction[3] + feedforward, feedback = calc_filter_taps(fir=None, exponential=low_pass, highpass=high_pass, bounce=None, delay=None, Ts=ts) + print(f"\nfeedback: {feedback}") + print(f"feedforward: {feedforward}") + assert np.all(feedforward == static_values[0]) + assert feedback[0] == static_values[1][0] + +@pytest.mark.parametrize("calc_correction", [ + [[(0.1, 100)], [10_000], 1, False], + [[(0.1, 100)], [10_000], 2, False], + [[(-0.1, 100)], [10_000], 1, True], + [[(-0.1, 100)], [10_000], 2, True], +]) +def test_double_calc_filter_taps(calc_correction): + low_pass = calc_correction[0] + high_pass = calc_correction[1] + ts = calc_correction[2] + warning = calc_correction[3] + if warning: + with pytest.warns(expected_warning=UserWarning): + feedforward, feedback = calc_filter_taps(fir=None, exponential=low_pass, highpass=high_pass, bounce=None, + delay=None, Ts=ts) + else: + feedforward, feedback = calc_filter_taps(fir=None, exponential=low_pass, highpass=high_pass, bounce=None, + delay=None, Ts=ts) + print(f"\nfeedback: {feedback}") + print(f"feedforward: {feedforward}") + assert feedback[1] == highpass_correction(high_pass[0], ts)[1] + assert feedback[0] == single_exponential_correction(low_pass[0][0], low_pass[0][1], ts)[1] + assert np.all(np.abs(feedforward) < 2) + + +@pytest.mark.parametrize("calc_correction", [ + [None, None, 1, 10], + [None, None, 2, 10], + [None, None, 0.5, 11], + [[(0.1, 100)], [10_000], 1, 10], + [[(0.1, 100)], [10_000], 0.5, 10], +]) +def test_delay_calc_filter_taps(calc_correction): + low_pass = calc_correction[0] + high_pass = calc_correction[1] + ts = calc_correction[2] + delay = calc_correction[3] + + feedforward, feedback = calc_filter_taps(fir=None, exponential=low_pass, highpass=high_pass, bounce=None, + delay=delay, Ts=ts) + print(f"\nfeedback: {feedback}") + print(f"feedforward: {feedforward}") + + assert feedforward[:int(delay/ts)] == [0.0] * int(delay/ts) + assert feedforward[int(delay/ts):] == calc_filter_taps(fir=None, exponential=low_pass, highpass=high_pass, bounce=None, + delay=0, Ts=ts)[0] + assert feedback == calc_filter_taps(fir=None, exponential=low_pass, highpass=high_pass, bounce=None, + delay=0, Ts=ts)[1] + +@pytest.mark.parametrize("calc_correction", [ + [None, None, 1], + [[(0.1, 100)], [10_000], 1], +]) +def test_fir_calc_filter_taps(calc_correction): + from scipy.signal.windows import hann + + low_pass = calc_correction[0] + high_pass = calc_correction[1] + ts = calc_correction[2] + + feedforward, feedback = calc_filter_taps(fir=hann(20)/np.sum(hann(20)), exponential=low_pass, highpass=high_pass, bounce=None, + delay=None, Ts=ts) + print(f"\nfeedback: {feedback}") + print(f"feedforward: {feedforward}") + + assert feedforward == list(np.convolve(calc_filter_taps(fir=None, exponential=low_pass, highpass=high_pass, bounce=None, + delay=None, Ts=ts)[0], hann(20)/np.sum(hann(20)))) + diff --git a/tests_against_server/test_digital_filters_server.py b/tests_against_server/test_digital_filters_server.py new file mode 100644 index 00000000..7ab0388c --- /dev/null +++ b/tests_against_server/test_digital_filters_server.py @@ -0,0 +1,545 @@ +""" +A simple sandbox to showcase different QUA functionalities during the installation. +""" + +from qm.qua import * +from qm import QuantumMachinesManager +from qm import SimulationConfig +from qualang_tools.digital_filters import * +from pathlib import Path +import numpy as np +from qualang_tools.config.waveform_tools import drag_gaussian_pulse_waveforms +from qualang_tools.units import unit +import matplotlib.pyplot as plt +####################### +# AUXILIARY FUNCTIONS # +####################### +u = unit(coerce_to_integer=True) + + +# IQ imbalance matrix +def IQ_imbalance(g, phi): + """ + Creates the correction matrix for the mixer imbalance caused by the gain and phase imbalances, more information can + be seen here: + https://docs.qualang.io/libs/examples/mixer-calibration/#non-ideal-mixer + :param g: relative gain imbalance between the 'I' & 'Q' ports. (unit-less), set to 0 for no gain imbalance. + :param phi: relative phase imbalance between the 'I' & 'Q' ports (radians), set to 0 for no phase imbalance. + """ + c = np.cos(phi) + s = np.sin(phi) + N = 1 / ((1 - g**2) * (2 * c**2 - 1)) + return [float(N * x) for x in [(1 - g) * c, (1 + g) * s, (1 - g) * s, (1 + g) * c]] + + +###################### +# Network parameters # +###################### +qop_ip = "172.16.33.101" # Write the QM router IP address +cluster_name = "Cluster_83" # Write your cluster_name if version >= QOP220 +qop_port = None # Write the QOP port if version < QOP220 + +# Path to save data +save_dir = Path().absolute() / "QM" / "INSTALLATION" / "data" + +##################### +# OPX configuration # +##################### +# Set octave_config to None if no octave are present +octave_config = None + +############################################# +# Qubits # +############################################# +qubit_LO = 7.4 * u.GHz # Used only for mixer correction and frequency rescaling for plots or computation +qubit_IF = 110 * u.MHz +mixer_qubit_g = 0.0 +mixer_qubit_phi = 0.0 + +qubit_T1 = int(10 * u.us) +thermalization_time = 5 * qubit_T1 + +# Continuous wave +const_len = 100 +const_amp = 0.1 +# Saturation_pulse +saturation_len = 10 * u.us +saturation_amp = 0.1 +# Square pi pulse +square_pi_len = 100 +square_pi_amp = 0.1 +# Drag pulses +drag_coef = 0 +anharmonicity = -200 * u.MHz +AC_stark_detuning = 0 * u.MHz + +x180_len = 40 +x180_sigma = x180_len / 5 +x180_amp = 0.35 +x180_wf, x180_der_wf = np.array( + drag_gaussian_pulse_waveforms(x180_amp, x180_len, x180_sigma, drag_coef, anharmonicity, AC_stark_detuning) +) +x180_I_wf = x180_wf +x180_Q_wf = x180_der_wf +# No DRAG when alpha=0, it's just a gaussian. + +x90_len = x180_len +x90_sigma = x90_len / 5 +x90_amp = x180_amp / 2 +x90_wf, x90_der_wf = np.array( + drag_gaussian_pulse_waveforms(x90_amp, x90_len, x90_sigma, drag_coef, anharmonicity, AC_stark_detuning) +) +x90_I_wf = x90_wf +x90_Q_wf = x90_der_wf +# No DRAG when alpha=0, it's just a gaussian. + +minus_x90_len = x180_len +minus_x90_sigma = minus_x90_len / 5 +minus_x90_amp = -x90_amp +minus_x90_wf, minus_x90_der_wf = np.array( + drag_gaussian_pulse_waveforms( + minus_x90_amp, + minus_x90_len, + minus_x90_sigma, + drag_coef, + anharmonicity, + AC_stark_detuning, + ) +) +minus_x90_I_wf = minus_x90_wf +minus_x90_Q_wf = minus_x90_der_wf +# No DRAG when alpha=0, it's just a gaussian. + +y180_len = x180_len +y180_sigma = y180_len / 5 +y180_amp = x180_amp +y180_wf, y180_der_wf = np.array( + drag_gaussian_pulse_waveforms(y180_amp, y180_len, y180_sigma, drag_coef, anharmonicity, AC_stark_detuning) +) +y180_I_wf = (-1) * y180_der_wf +y180_Q_wf = y180_wf +# No DRAG when alpha=0, it's just a gaussian. + +y90_len = x180_len +y90_sigma = y90_len / 5 +y90_amp = y180_amp / 2 +y90_wf, y90_der_wf = np.array( + drag_gaussian_pulse_waveforms(y90_amp, y90_len, y90_sigma, drag_coef, anharmonicity, AC_stark_detuning) +) +y90_I_wf = (-1) * y90_der_wf +y90_Q_wf = y90_wf +# No DRAG when alpha=0, it's just a gaussian. + +minus_y90_len = y180_len +minus_y90_sigma = minus_y90_len / 5 +minus_y90_amp = -y90_amp +minus_y90_wf, minus_y90_der_wf = np.array( + drag_gaussian_pulse_waveforms( + minus_y90_amp, + minus_y90_len, + minus_y90_sigma, + drag_coef, + anharmonicity, + AC_stark_detuning, + ) +) +minus_y90_I_wf = (-1) * minus_y90_der_wf +minus_y90_Q_wf = minus_y90_wf +# No DRAG when alpha=0, it's just a gaussian. + +############################################# +# Resonators # +############################################# +resonator_LO = 4.8 * u.GHz # Used only for mixer correction and frequency rescaling for plots or computation +resonator_IF = 60 * u.MHz +mixer_resonator_g = 0.0 +mixer_resonator_phi = 0.0 + +readout_len = 5000 +readout_amp = 0.2 + +time_of_flight = 24 +depletion_time = 2 * u.us + +opt_weights = False +if opt_weights: + from qualang_tools.config.integration_weights_tools import convert_integration_weights + + weights = np.load("optimal_weights.npz") + opt_weights_real = convert_integration_weights(weights["weights_real"]) + opt_weights_minus_imag = convert_integration_weights(weights["weights_minus_imag"]) + opt_weights_imag = convert_integration_weights(weights["weights_imag"]) + opt_weights_minus_real = convert_integration_weights(weights["weights_minus_real"]) +else: + opt_weights_real = [(1.0, readout_len)] + opt_weights_minus_imag = [(1.0, readout_len)] + opt_weights_imag = [(1.0, readout_len)] + opt_weights_minus_real = [(1.0, readout_len)] + +########################################## +# Flux line # +########################################## +max_frequency_point = 0.0 +flux_settle_time = 100 * u.ns + +# Resonator frequency versus flux fit parameters according to resonator_spec_vs_flux +# amplitude * np.cos(2 * np.pi * frequency * x + phase) + offset +amplitude_fit, frequency_fit, phase_fit, offset_fit = [0, 0, 0, 0] + +# FLux pulse parameters +const_flux_len = 2000 +const_flux_amp = 0.1 + +# IQ Plane Angle +rotation_angle = (0 / 180) * np.pi +# Threshold for single shot g-e discrimination +ge_threshold = 0.0 + +############################################# +# Config # +############################################# +config = { + "version": 1, + "controllers": { + "con1": { + "analog_outputs": { + 1: {"offset": 0.0}, # I qubit + 2: {"offset": 0.0}, # Q qubit + 3: {"offset": 0.0}, # I resonator + 4: {"offset": 0.0}, # Q resonator + 5: {"offset": max_frequency_point}, # flux line + }, + "digital_outputs": { + 1: {}, + }, + "analog_inputs": { + 1: {"offset": 0.0, "gain_db": 0}, # I from down-conversion + 2: {"offset": 0.0, "gain_db": 0}, # Q from down-conversion + }, + }, + }, + "elements": { + "qubit": { + "mixInputs": { + "I": ("con1", 1), + "Q": ("con1", 2), + "lo_frequency": qubit_LO, + "mixer": "mixer_qubit", + }, + "intermediate_frequency": qubit_IF, + "operations": { + "cw": "const_pulse", + "saturation": "saturation_pulse", + "pi": "pi_pulse", + "pi_half": "pi_half_pulse", + "x180": "x180_pulse", + "x90": "x90_pulse", + "-x90": "-x90_pulse", + "y90": "y90_pulse", + "y180": "y180_pulse", + "-y90": "-y90_pulse", + }, + }, + "resonator": { + "mixInputs": { + "I": ("con1", 3), + "Q": ("con1", 4), + "lo_frequency": resonator_LO, + "mixer": "mixer_resonator", + }, + "intermediate_frequency": resonator_IF, + "operations": { + "cw": "const_pulse", + "readout": "readout_pulse", + }, + "outputs": { + "out1": ("con1", 1), + "out2": ("con1", 2), + }, + "time_of_flight": time_of_flight, + "smearing": 0, + }, + "flux_line": { + "singleInput": { + "port": ("con1", 5), + }, + "operations": { + "const": "const_flux_pulse", + }, + }, + "flux_line_sticky": { + "singleInput": { + "port": ("con1", 5), + }, + "sticky": {"analog": True, "duration": 20}, + "operations": { + "const": "const_flux_pulse", + }, + }, + }, + "pulses": { + "const_single_pulse": { + "operation": "control", + "length": const_len, + "waveforms": { + "single": "const_wf", + }, + }, + "const_flux_pulse": { + "operation": "control", + "length": const_flux_len, + "waveforms": { + "single": "const_flux_wf", + }, + }, + "const_pulse": { + "operation": "control", + "length": const_len, + "waveforms": { + "I": "const_wf", + "Q": "zero_wf", + }, + }, + "saturation_pulse": { + "operation": "control", + "length": saturation_len, + "waveforms": {"I": "saturation_drive_wf", "Q": "zero_wf"}, + }, + "pi_pulse": { + "operation": "control", + "length": square_pi_len, + "waveforms": { + "I": "pi_wf", + "Q": "zero_wf", + }, + }, + "pi_half_pulse": { + "operation": "control", + "length": square_pi_len, + "waveforms": { + "I": "pi_half_wf", + "Q": "zero_wf", + }, + }, + "x90_pulse": { + "operation": "control", + "length": x90_len, + "waveforms": { + "I": "x90_I_wf", + "Q": "x90_Q_wf", + }, + }, + "x180_pulse": { + "operation": "control", + "length": x180_len, + "waveforms": { + "I": "x180_I_wf", + "Q": "x180_Q_wf", + }, + }, + "-x90_pulse": { + "operation": "control", + "length": minus_x90_len, + "waveforms": { + "I": "minus_x90_I_wf", + "Q": "minus_x90_Q_wf", + }, + }, + "y90_pulse": { + "operation": "control", + "length": y90_len, + "waveforms": { + "I": "y90_I_wf", + "Q": "y90_Q_wf", + }, + }, + "y180_pulse": { + "operation": "control", + "length": y180_len, + "waveforms": { + "I": "y180_I_wf", + "Q": "y180_Q_wf", + }, + }, + "-y90_pulse": { + "operation": "control", + "length": minus_y90_len, + "waveforms": { + "I": "minus_y90_I_wf", + "Q": "minus_y90_Q_wf", + }, + }, + "readout_pulse": { + "operation": "measurement", + "length": readout_len, + "waveforms": { + "I": "readout_wf", + "Q": "zero_wf", + }, + "integration_weights": { + "cos": "cosine_weights", + "sin": "sine_weights", + "minus_sin": "minus_sine_weights", + "rotated_cos": "rotated_cosine_weights", + "rotated_sin": "rotated_sine_weights", + "rotated_minus_sin": "rotated_minus_sine_weights", + "opt_cos": "opt_cosine_weights", + "opt_sin": "opt_sine_weights", + "opt_minus_sin": "opt_minus_sine_weights", + }, + "digital_marker": "ON", + }, + }, + "waveforms": { + "const_wf": {"type": "constant", "sample": const_amp}, + "saturation_drive_wf": {"type": "constant", "sample": saturation_amp}, + "pi_wf": {"type": "constant", "sample": square_pi_amp}, + "pi_half_wf": {"type": "constant", "sample": square_pi_amp / 2}, + "const_flux_wf": {"type": "constant", "sample": const_flux_amp}, + "zero_wf": {"type": "constant", "sample": 0.0}, + "x90_I_wf": {"type": "arbitrary", "samples": x90_I_wf.tolist()}, + "x90_Q_wf": {"type": "arbitrary", "samples": x90_Q_wf.tolist()}, + "x180_I_wf": {"type": "arbitrary", "samples": x180_I_wf.tolist()}, + "x180_Q_wf": {"type": "arbitrary", "samples": x180_Q_wf.tolist()}, + "minus_x90_I_wf": {"type": "arbitrary", "samples": minus_x90_I_wf.tolist()}, + "minus_x90_Q_wf": {"type": "arbitrary", "samples": minus_x90_Q_wf.tolist()}, + "y90_Q_wf": {"type": "arbitrary", "samples": y90_Q_wf.tolist()}, + "y90_I_wf": {"type": "arbitrary", "samples": y90_I_wf.tolist()}, + "y180_Q_wf": {"type": "arbitrary", "samples": y180_Q_wf.tolist()}, + "y180_I_wf": {"type": "arbitrary", "samples": y180_I_wf.tolist()}, + "minus_y90_Q_wf": {"type": "arbitrary", "samples": minus_y90_Q_wf.tolist()}, + "minus_y90_I_wf": {"type": "arbitrary", "samples": minus_y90_I_wf.tolist()}, + "readout_wf": {"type": "constant", "sample": readout_amp}, + }, + "digital_waveforms": { + "ON": {"samples": [(1, 0)]}, + }, + "integration_weights": { + "cosine_weights": { + "cosine": [(1.0, readout_len)], + "sine": [(0.0, readout_len)], + }, + "sine_weights": { + "cosine": [(0.0, readout_len)], + "sine": [(1.0, readout_len)], + }, + "minus_sine_weights": { + "cosine": [(0.0, readout_len)], + "sine": [(-1.0, readout_len)], + }, + "opt_cosine_weights": { + "cosine": opt_weights_real, + "sine": opt_weights_minus_imag, + }, + "opt_sine_weights": { + "cosine": opt_weights_imag, + "sine": opt_weights_real, + }, + "opt_minus_sine_weights": { + "cosine": opt_weights_minus_imag, + "sine": opt_weights_minus_real, + }, + "rotated_cosine_weights": { + "cosine": [(np.cos(rotation_angle), readout_len)], + "sine": [(np.sin(rotation_angle), readout_len)], + }, + "rotated_sine_weights": { + "cosine": [(-np.sin(rotation_angle), readout_len)], + "sine": [(np.cos(rotation_angle), readout_len)], + }, + "rotated_minus_sine_weights": { + "cosine": [(np.sin(rotation_angle), readout_len)], + "sine": [(-np.cos(rotation_angle), readout_len)], + }, + }, + "mixers": { + "mixer_qubit": [ + { + "intermediate_frequency": qubit_IF, + "lo_frequency": qubit_LO, + "correction": IQ_imbalance(mixer_qubit_g, mixer_qubit_phi), + } + ], + "mixer_resonator": [ + { + "intermediate_frequency": resonator_IF, + "lo_frequency": resonator_LO, + "correction": IQ_imbalance(mixer_resonator_g, mixer_resonator_phi), + } + ], + }, +} + + +# +# feedforward = [1.000055, -0.999945] +t_hp = 10_000 +A = -0.1 +tau_lp = 100 +# feedforward, feedback = single_exponential_correction(A, tau_lp) +from scipy.signal.windows import hann +# feedforward, feedback = calc_filter_taps(delay=11, Ts=2) +# feedforward, feedback = calc_filter_taps(exponential=[(A, tau_lp)], highpass=[t_hp], delay=5.1) +feedforward, feedback = calc_filter_taps(delay=10.1, highpass=[t_hp]) +# feedforward, feedback = calc_filter_taps(exponential=[(A, tau_lp)]) +# feedforward, feedback = highpass_correction(t_hp) +# feedforward = [1.0002, -0.9998] +# feedback = [0.9999990463225004, 0.9999990463225004] +# feedforward, feedback = calc_filter_taps(exponential=[(-0.1, tau_lp)]) +# feedback = [-feedback[0]] +config["controllers"]["con1"]["analog_outputs"][5] = { + "offset": 0.0, + "filter": {"feedforward": feedforward, "feedback": feedback}} + +################### +# The QUA program # +################### +with program() as hello_qua: + a = declare(fixed) + play("const" * amp(1), "flux_line") + wait(25, "flux_line") + +##################################### +# Open Communication with the QOP # +##################################### +qmm = QuantumMachinesManager(host=qop_ip, port=qop_port, cluster_name=cluster_name, octave=octave_config) + +########################### +# Run or Simulate Program # +########################### + +simulate = True + +if simulate: + # Simulates the QUA program for the specified duration + simulation_config = SimulationConfig(duration=2_000) # In clock cycles = 4ns + # Simulate blocks python until the simulation is done + job = qmm.simulate(config, hello_qua, simulation_config) + # Plot the simulated samples + # job.get_simulated_samples().con1.plot() + samples = job.get_simulated_samples().con1 + t = np.arange(0, len(samples.analog["5"]), 1) + dt = np.where(samples.analog["5"] != 0)[0][0] + plt.figure() + plt.plot(t[dt:const_flux_len+dt], const_flux_amp * (np.exp(-t[:const_flux_len] / t_hp)), 'b',label="Pulse before correction") + plt.plot(t, samples.analog["5"], 'g--', label="OPX pulse with correction") + plt.plot(t[dt:const_flux_len+dt], samples.analog["5"][dt:const_flux_len+dt] * (np.exp(-t[:const_flux_len] / t_hp)), 'r',label="Pulse after correction") + plt.legend() + + # plt.figure() + # plt.plot(t[dt:const_flux_len+dt], const_flux_amp * (1 + A*np.exp(-t[:const_flux_len] / tau_lp)), 'b',label="Pulse before correction") + # plt.plot(t, samples.analog["5"], 'g--', label="OPX pulse with correction") + # plt.plot(t[dt:const_flux_len+dt], samples.analog["5"][dt:const_flux_len+dt] * (1 + A*np.exp(-t[:const_flux_len] / tau_lp)), 'r',label="Pulse after correction") + # plt.legend() + + # plt.figure() + # plt.plot(t[dt:const_flux_len+dt], const_flux_amp * (np.exp(-t[:const_flux_len] / t_hp)) * (1 + A*np.exp(-t[:const_flux_len] / tau_lp)), 'b',label="Pulse before correction") + # plt.plot(t, samples.analog["5"], 'g--', label="OPX pulse with correction") + # plt.plot(t[dt:const_flux_len+dt], samples.analog["5"][dt:const_flux_len+dt] * (np.exp(-t[:const_flux_len] / t_hp)) * (1 + A*np.exp(-t[:const_flux_len] / tau_lp)), 'r',label="Pulse after correction") + # plt.legend() +else: + # Open a quantum machine to execute the QUA program + qm = qmm.open_qm(config) + qm.set_output_filter_by_element() + # Send the QUA program to the OPX, which compiles and executes it - Execute does not block python! + job = qm.execute(hello_qua)