diff --git a/qopt_best_practices/cost_function/cost_utils.py b/qopt_best_practices/cost_function/cost_utils.py index 483d5be..ac4bb96 100644 --- a/qopt_best_practices/cost_function/cost_utils.py +++ b/qopt_best_practices/cost_function/cost_utils.py @@ -1,5 +1,7 @@ """QAOA Cost function utils""" +from collections.abc import Callable, Iterable + import numpy as np from qiskit.quantum_info import SparsePauliOp @@ -14,8 +16,18 @@ def evaluate_sparse_pauli(state: int, observable: SparsePauliOp) -> complex: return np.sum(observable.coeffs * _PARITY[reduced]) -def qaoa_sampler_cost_fun(params, ansatz, hamiltonian, sampler): - """Standard sampler-based QAOA cost function to be plugged into optimizer routines.""" +def qaoa_sampler_cost_fun(params, ansatz, hamiltonian, sampler, aggregation=None): + """Standard sampler-based QAOA cost function to be plugged into optimizer routines. + + Args: + params (np.ndarray): Parameters for the ansatz. + ansatz (QuantumCircuit): Ansatz circuit. + hamiltonian (SparsePauliOp): Hamiltonian to be minimized. + sampler (QAOASampler): Sampler to be used. + aggregation (Callable | float | None): Aggregation function to be applied to + the sampled results. If None, the sum of the expectation values is returned. + If float, the CVaR with the given alpha is used. + """ job = sampler.run(ansatz, params) sampler_result = job.result() sampled = sampler_result.quasi_dists[0] @@ -26,6 +38,43 @@ def qaoa_sampler_cost_fun(params, ansatz, hamiltonian, sampler): for state, probability in sampled.items() } - result = sum(probability * value for probability, value in evaluated.values()) + # If aggregation is None, return the sum of the expectation values. + # If aggregation is a float, return the CVaR with the given alpha. + # Otherwise, use the aggregation function. + if aggregation is None: + result = sum(probability * value for probability, value in evaluated.values()) + elif isinstance(aggregation, float): + cvar_aggregation = _get_cvar_aggregation(aggregation) + result = cvar_aggregation(evaluated.values()) + else: + result = aggregation(evaluated.values()) return result + + +def _get_cvar_aggregation(alpha: float | None) -> Callable: + """Return the CVaR aggregation function with the given alpha. + + Args: + alpha (float | None): Alpha value for the CVaR aggregation. If None, 1 is used + by default. + """ + if alpha is None: + alpha = 1 + elif not 0 <= alpha <= 1: + raise ValueError(f"alpha must be in [0, 1], but {alpha} was given.") + + def cvar_aggregation(measurements: Iterable[tuple[float, float]]) -> float: + """Return the CVaR of the given measurements.""" + sorted_measurements = sorted(measurements, key=lambda x: x[1]) + # accumulate the probabilities until alpha is reached + accumulated_percent = 0.0 + cvar = 0.0 + for probability, value in sorted_measurements: + cvar += value * min(probability, alpha - accumulated_percent) + accumulated_percent += probability + if accumulated_percent >= alpha: + break + return cvar / alpha + + return cvar_aggregation diff --git a/qopt_best_practices/utils/__init__.py b/qopt_best_practices/utils/__init__.py index 7a87e00..2c092a7 100644 --- a/qopt_best_practices/utils/__init__.py +++ b/qopt_best_practices/utils/__init__.py @@ -1,6 +1,5 @@ """Utils""" - from .graph_utils import build_max_cut_graph, build_max_cut_paulis __all__ = ["build_max_cut_graph", "build_max_cut_paulis"]