diff --git a/.gitignore b/.gitignore index 2c1d0a5..d831489 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,4 @@ TODO* -test* results* # Created by https://www.toptal.com/developers/gitignore/api/python,macos,direnv,visualstudiocode diff --git a/CITATION.cff b/CITATION.cff index cf72bd0..791629b 100644 --- a/CITATION.cff +++ b/CITATION.cff @@ -1,5 +1,5 @@ cff-version: 1.2.0 -title: "Quantum Circuit Designer" +title: "qcd-gym" message: >- If you use this software, please cite it using the metadata from this file. @@ -8,12 +8,14 @@ authors: - given-names: Philipp family-names: Altmann email: philipp@hyphi.co - orcid: 'https://orcid.org/0000-0003-1134-176X' repository-code: 'https://github.com/philippaltmann/qcd/' -abstract: A gymnasium-based set of environments for benchmarking reinforcement learning for quantum circuit design. +url: 'https://github.com/philippaltmann/qcd/' +abstract: >- + A gymnasium-based set of environments for benchmarking + reinforcement learning for quantum circuit design. 
keywords: - - benchmark - - reinforcement-learning - - quantum-computing - -circuit-design -license: MIT + - Reinforcement Learning + - Quantum Computing + - Circuit Optimization + - Architecture Search +license: MIT \ No newline at end of file diff --git a/QCD.png b/QCD.png index f4b8234..df8a200 100644 Binary files a/QCD.png and b/QCD.png differ diff --git a/README.md b/README.md index ee75a23..b565c69 100644 --- a/README.md +++ b/README.md @@ -1,12 +1,13 @@ # Quantum Circuit Designer +[![arXiv](https://img.shields.io/badge/arXiv-2312.11337-b31b1b.svg?logo=arxiv&logoColor=white)](https://arxiv.org/abs/2312.11337) +[![GitHub Release](https://img.shields.io/github/v/release/philippaltmann/qcd?logo=github&logoColor=white&label=GitHub)](http://github.com/philippaltmann/qcd) +[![PyPI Version](https://img.shields.io/pypi/v/qcd-gym?logo=pypi&logoColor=white)](https://pypi.org/p/qcd-gym/) -[![arXiv](https://img.shields.io/badge/arXiv-2312.11337-b31b1b.svg)](https://arxiv.org/abs/2312.11337) -[![PyPI version](https://badge.fury.io/py/qcd-gym.svg)](https://badge.fury.io/py/qcd-gym) -![alt text](QCD.png) +![QCD Overview](QCD.png) ## **Description** -This repository contains the Quantum Circuit Designer, a generic [gymnasium](https://github.com/Farama-Foundation/Gymnasium) environment to build quantum circuits gate-by-gate using [pennylane](https://github.com/PennyLaneAI/pennylane), revealing current challenges regarding: +This repository contains `qcd-gym`, a generic [gymnasium](https://github.com/Farama-Foundation/Gymnasium) environment to build quantum circuits gate-by-gate using [qiskit](https://github.com/Qiskit/qiskit), revealing current challenges regarding: - [State Preparation (SP)](#state-preparation): Find a gate sequence that turns some initial state into the target quantum state. - [Unitary Composition (UC)](#unitary-composition): Find a gate sequence that constructs an arbitrary quantum operator. 
@@ -14,9 +15,9 @@ This repository contains the Quantum Circuit Designer, a generic [gymnasium](htt ## Observations -The observation is defined by the full complex vector representation of the state of the current circuit: $s = \ket{\boldsymbol{\Psi}}\in\mathbb{C}^{2^\eta}$. +The observation is comprised of the state of the current circuit, represented by the full complex vector representation $\ket{\Psi}$ or the unitary operator $\boldsymbol{V}(\Sigma_t)$ resulting from the current sequence of operations $\Sigma_t$, as well as the intended target. While this information is only available in quantum circuit simulators efficiently (on real hardware, $\mathcal{O}(2^\eta)$ measurements would be needed), it depicts a starting point for RL from which future work should extract a sufficient, efficiently obtainable, subset of information. -This $2^\eta$-dimensional state representation is sufficient for the definition of an MDP-compliant environment, as operations on this state are required to be reversible. +This state representation is sufficient for the definition of an MDP-compliant environment, as operations on this state are required to be reversible. 
## Actions @@ -33,12 +34,11 @@ The operations $\Gamma$ are defined as: | o | Operation | Condition | Type | Arguments | Comments | | - | ------------ | ---------- | -------------------- | ---------- | :---------------------------- | -| 0 | $\mathbb{M}$ | | Meassurement | $q$ | Control and Parameter omitted | -| 1 | $\mathbb{Z}$ | $q = c$ | PhaseShift | $q,\Phi$ | Control omitted | -| 1 | $\mathbb{Z}$ | $q \neq c$ | ControlledPhaseShift | $q,c,\Phi$ | - | -| 2 | $\mathbb{X}$ | $q = c$ | X-Rotation | $q,\Phi$ | Control omitted | -| 2 | $\mathbb{X}$ | $q \neq c$ | CNOT | $q,c$ | Parameter omitted | -| 3 | $\mathbb{T}$ | | Terminate | | All agruments omitted | +| 0 | $\mathbb{Z}$ | $q = c$ | PhaseShift | $q,\Phi$ | Control omitted | +| 0 | $\mathbb{Z}$ | $q \neq c$ | ControlledPhaseShift | $q,c,\Phi$ | - | +| 1 | $\mathbb{X}$ | $q = c$ | X-Rotation | $q,\Phi$ | Control omitted | +| 1 | $\mathbb{X}$ | $q \neq c$ | CNOT | $q,c$ | Parameter omitted | +| 2 | $\mathbb{T}$ | | Terminate | | All arguments omitted | With operations according to the following unversal gate set: @@ -53,13 +53,13 @@ The reward is kept $0$ until the end of an episode is reached (either by truncat To incentivize the use of few operations, a step-cost $\mathcal{C}_t$ is applied when exceeding two-thirds of the available operations $\sigma$: $$\mathcal{C}_t=\max\left(0,\frac{3}{2\sigma}\left(t-\frac{\sigma}{3}\right)\right)$$ -Suitable task reward functions $\mathcal{R}^{\*}\in[0,1]$ are defined, s.t.: $\mathcal{R}=\mathcal{R}^{\*}(s_t,a_t)-C_t$ if $t$ is terminal, according to the following challenges: +Suitable task reward functions $\mathcal{R}^{\ast}\in[0,1]$ are defined, s.t.: $\mathcal{R}=\mathcal{R}^{\ast}(s_t,a_t)-\mathcal{C}_t$ if $t$ is terminal, according to the following objectives: -## Challenges +## Objectives ### **State Preparation** -The objective of this challenge is to construct a quantum circuit that generates a desired quantum state. 
+The task of this objective is to construct a quantum circuit that generates a desired quantum state. The reward is based on the *fidelity* between the target an the final state: $$\mathcal{R}^{SP}(s_t,a_t) = F(s_t, \Psi) = |\braket{\psi_{\text{env}}|\psi_{\text{target}}}|^2 \in [0,1]$$ Currently, the following states are defined: @@ -69,33 +69,32 @@ Currently, the following states are defined: ### **Unitary Composition** -The objective of this challenge is to construct a quantum circuit that implements a desired unitary operation. +The task of this objective is to construct a quantum circuit that implements a desired unitary operation. The reward is based on the ***Frobenius norm*** $D = |U - V(\Sigma_t)|_2$ between the taget unitary $U$ and the final unitary $V$ based on the sequence of operations $\Sigma_t = \langle a_0, \dots, a_t \rangle$: $$ R^{UC}(s_t,a_t) = 1 - \arctan (D)$$ -The following unitaries are currently available for this challenge: +The following unitaries are currently available for this objective: - `'UC-random'` (a random unitary operation on *max_qubits* ) - `'UC-hadamard'` (the single qubit Hadamard gate) - `'UC-toffoli'` (the 3-qubit Toffoli gate) -See [Outlook](#outlook-and-todos) for more challenges to come. ### *Further Objectives* -The goal of this implementation is to not only construct any circuit that fulfills a specific challenge but to also make this circuit optimal, that is to give the environment further objectives, such as optimizing: +The goal of this implementation is to not only construct any circuit that fulfills a specific objective but to also make this circuit optimal, that is to give the environment further objectives, such as optimizing: - Circuit Depth - Qubit Count -- Gate Count (or: 2-qubit Gate Count) +- Gate Count - Parameter Count - Qubit-Connectivity These circuit optimization objectives can be switched on by the parameter `punish` when initializing a new environment. 
-Currently, the only further objective implemented in this environment is the **circuit depth**, as this is one of the most important features to restrict for NISQ (noisy, intermediate-scale, quantum) devices. This metric already includes gate count and parameter count to some extent. However, further objectives can easily be added within the `Reward` class of this environment (see [Outlook](#outlook)). +Currently, the only further objective implemented in this environment is the **circuit depth**, as this is one of the most important features to restrict for NISQ (noisy, intermediate-scale, quantum) devices. This metric already includes gate count and parameter count to some extent. However, further objectives can easily be added within the `Reward` class of this environment. ## **Setup** @@ -111,7 +110,7 @@ The environment can be set up as: ```python import gymnasium as gym -env = gym.make("CircuitDesigner-v0", max_qubits=2, max_depth=10, challenge='SP-bell', render_mode='text', verbose=True) +env = gym.make("CircuitDesigner-v0", max_qubits=2, max_depth=10, objective='SP-bell', render_mode='text') observation, info = env.reset(seed=42); env.action_space.seed(42) for _ in range(9): @@ -128,7 +127,7 @@ The relevant parameters for setting up the environment are: | :----------------- | ------ | ------------------------------------------------------------ | | max_qubits $\eta$ | `int` | maximal number of qubits available | | max_depth $\delta$ | `int` | maximal circuit depth allowed (= truncation criterion) | -| challenge | `str` | RL challenge for which the circuit is to be built (see [Challenges](#challenges)) | +| objective | `str` | RL objective for which the circuit is to be built (see [Objectives](#objectives)) | | punish | `bool` | specifier for turning on multi-objectives (see [Further Objectives](#further-objectives)) | @@ -141,31 +140,28 @@ git clone https://github.com/philippaltmann/QCD.git pip install -e '.[all]' ``` -Specify the intended \ as: 
"`challenge`-q`max_qubits`-d`max_depth`": +Specify the intended \ as: "`objective`-q`max_qubits`-d`max_depth`": ```sh -# Run a specific algoritm and challenge (requires `pip install -e '.[train]'`) -python -m train [A2C | PPO | SAC | TD3] -e +# Run a specific algorithm and task (requires `pip install -e '.[train]'`) +python -m train [A2C | PPO | SAC | TD3] -e -# Generate plots from the `results` folder (requires `pip install -e '.[plot]'`) -python -m plot results +# Generate plots from the `results` folder (requires `pip install -e '.[plot]'`) +python -m plot results -b # plot all runs in `results`, add random and evo baselines # To train the provided baseline algorithms, use (pip install -e .[all]) -./run +./run.sh # Test the circuit designer (requires `pip install -e '.[test]'`) -python -m circuit_designer.test +python -m test ``` -## Results +## Results + +![Results](Results.png) -![alt text](Results.png) ## Acknowledgements The research is part of the [Munich Quantum Valley](https://www.munich-quantum-valley.de), which is supported by the Bavarian state government with funds from the [Hightech Agenda Bayern Plus](https://www.hightechagenda.de). 
- - - - diff --git a/Results.png b/Results.png index d52b42d..5685f32 100644 Binary files a/Results.png and b/Results.png differ diff --git a/algorithm/algorithm.py b/algorithm/algorithm.py index 7b8f3d5..3169fc6 100644 --- a/algorithm/algorithm.py +++ b/algorithm/algorithm.py @@ -1,8 +1,7 @@ import numpy as np; import pandas as pd; import random -import torch as th; import scipy.stats as st +import torch as th; import scipy.stats as st; import re from stable_baselines3.common.base_class import BaseAlgorithm -from stable_baselines3.common.callbacks import CallbackList -from stable_baselines3.common.policies import ActorCriticPolicy, obs_as_tensor as obs +from stable_baselines3.common.policies import ActorCriticPolicy from stable_baselines3.common.vec_env import VecNormalize from stable_baselines3.common.save_util import load_from_zip_file, recursive_setattr from torch.utils.tensorboard.writer import SummaryWriter @@ -10,41 +9,47 @@ from tqdm import tqdm; import os import gymnasium as gym import platform; import stable_baselines3 as sb3; -from algorithm.evaluation import EvaluationCallback -from algorithm.factory import factory + +from qcd_gym.wrappers.monitor import Monitor +from stable_baselines3.common.vec_env import DummyVecEnv + +def _make(record_video=False, **spec): + def _init() -> gym.Env: return Monitor(gym.make(**spec), record_video=record_video) + return _init + +def named(env): + max_qubits = int(re.search('-q(\d+)', env).group(1)); env = re.sub('-q(\d+)', '', env) + max_depth = int(re.search('-d(\d+)', env).group(1)); env = re.sub('-d(\d+)', '', env) + return {'id': 'CircuitDesigner-v0', 'max_qubits': max_qubits, 'max_depth': max_depth, 'objective': env} + +def make_vec(env, seed=None, n_envs=4, **kwargs): + spec = lambda rank: {**named(env), 'seed': seed, **kwargs} + return DummyVecEnv([_make(**spec(i)) for i in range(n_envs)]) + class TrainableAlgorithm(BaseAlgorithm): """ Generic Algorithm Class extending BaseAlgorithm with features needed by the 
training pipeline """ - def __init__(self, envs:List[str]=None, normalize:bool=False, policy:Union[str,Type[ActorCriticPolicy]]="MlpPolicy", path:Optional[str]=None, - seed=None, silent=False, stop_on_reward=False, explore=False, log_name=None, envkwargs={}, **kwargs): + def __init__(self, env:str, normalize:bool=False, policy:Union[str,Type[ActorCriticPolicy]]="MlpPolicy", path:Optional[str]=None, seed=None, silent=False, log_name=None, envkwargs={}, **kwargs): """ :param env: The environment to learn from (if registered in Gym, can be str) :param policy: The policy model to use (MlpPolicy, CnnPolicy, ...) defaults to MlpPolicy :param normalize: whether to use normalized observations, default: False - :param stop_on_reward: bool for ealry stopping, defaults to False. - :param explore: sets enviornment to explore mode, default False :param log_name: optional custom folder name for logging :param path: (str) the log location for tensorboard (if None, no logging) """ - _path = lambda seed: f"{path}/{envs[0]}/{log_name or str(self.__class__.__name__)}/{seed}" + _path = lambda seed: f"{path}/{env}/{log_name or str(self.__class__.__name__)}/{seed}" gen_seed = lambda s=random.randint(0, 999): s if not os.path.isdir(_path(s)) else gen_seed() if seed is None: seed = gen_seed() self.path = _path(seed) if path is not None else None; self.eval_frequency, self.progress_bar = None, None - if envs is not None: self.envs = factory(envs, seed=seed, **envkwargs); - self.explore = explore; self.stop_on_reward = stop_on_reward and not explore + env = make_vec(env, seed=seed, **envkwargs) self.normalize, self.silent, self.continue_training = normalize, silent, True; - super().__init__(policy=policy, seed=seed, verbose=0, env=self.envs['train'], **kwargs) + super().__init__(policy=policy, seed=seed, verbose=0, env=env, **kwargs) def _setup_model(self) -> None: if self.normalize: self.env = VecNormalize(self.env) - self._naming = {'l': 'length-100', 'r': 'return-100'}; 
self._custom_scalars = {} #, 's': 'safety-100' - self.get_actions = lambda s: self.policy.get_distribution(obs(np.expand_dims(s, axis=0), self.device)).distribution.probs - self.heatmap_iterations = { # Deterministic policy heatmaps - 'action': (lambda _,s,a,r: self.policy.predict(s.flat, deterministic=True)[0] == a, (0,1)), - # Prob distributions (coelation of porb index and action number might be misalligned) - 'policy': (lambda _, s, a, r: self.get_actions(s).cpu().detach().numpy()[0][a], (0,1))} - super(TrainableAlgorithm, self)._setup_model(); stage = '/explore' if self.explore else '/train' - self.writer, self._registered_ci = SummaryWriter(self.path + stage) if self.path and not self.silent else None, [] - if not self.silent and not self.explore: print("+-------------------------------------------------------+\n"\ + self._naming = {'l': 'length-100', 'r': 'return-100'}; self._custom_scalars = {} + super(TrainableAlgorithm, self)._setup_model() + self.writer, self._registered_ci = SummaryWriter(self.path+"/train") if self.path and not self.silent else None, [] + if not self.silent: print("+-------------------------------------------------------+\n"\ f"| System: {platform.version()} |\n" \ f"| GPU: {f'Enabled, version {th.version.cuda} on {th.cuda.get_device_name(0)}' if th.cuda.is_available() else'Disabled'} |\n"\ f"| Python: {platform.python_version()} | PyTorch: {th.__version__} | Numpy: {np.__version__} |\n" \ @@ -57,34 +62,34 @@ def _excluded_save_params(self) -> List[str]: E.g. replay buffers are skipped by default as they take up a lot of space. PyTorch variables should be excluded with this so they can be stored with ``th.save``. :return: List of parameters that should be excluded from being saved with pickle. 
""" - return super(TrainableAlgorithm, self)._excluded_save_params() + ['get_actions', 'heatmap_iterations', '_naming', '_custom_scalars', '_registered_ci', 'envs', 'writer', 'progress_bar', 'silent'] + return super(TrainableAlgorithm, self)._excluded_save_params() + ['_naming', '_custom_scalars', '_registered_ci', 'writer', 'progress_bar', 'silent'] def should_eval(self) -> bool: return self.eval_frequency is not None and self.num_timesteps % self.eval_frequency == 0 def learn(self, total_timesteps: int, eval_frequency=8192, eval_kwargs={}, **kwargs) -> "TrainableAlgorithm": """ Learn a policy :param total_timesteps: The total number of samples (env steps) to train on - :param eval_kwargs: stop_on_reward: Threshold of the mean 100 episode return to terminate training., record_video:bool=True, write_heatmaps:bool=True, run_test:bool=True :param **kwargs: further aguments are passed to the parent classes :return: the trained model """ - stop_on_reward = self.env.get_attr('reward_threshold')[0] if self.stop_on_reward else None - callback = EvaluationCallback(self, self.envs['test'], stop_on_reward=stop_on_reward, **eval_kwargs); - if 'callback' in kwargs: callback = CallbackList([kwargs.pop('callback'), callback]) - alg = self.__class__.__name__; total = self.num_timesteps+total_timesteps; stepsize = self.n_steps * self.n_envs; + total = self.num_timesteps+total_timesteps; stepsize = self.n_steps * self.n_envs; if eval_frequency is not None: self.eval_frequency = eval_frequency * self.n_envs // stepsize * stepsize or eval_frequency * self.n_envs # total = self.num_timesteps+total_timesteps # if eval_frequency is not None: self.eval_frequency = eval_frequency * self.n_envs # **2 hps = self.get_hparams(); hps.pop('seed'); hps.pop('num_timesteps'); - self.progress_bar = tqdm(total=total, unit="steps", postfix=[0,""], bar_format="{desc}[R: {postfix[0]:4.2f}][{bar}]({percentage:3.0f}%)[{n_fmt}/{total_fmt}@{rate_fmt}]") + # self.progress_bar = tqdm(total=total, 
unit="steps", postfix=[0,""], bar_format="{desc}[R: {postfix[0]:4.2f}][{bar}]({percentage:3.0f}%)[{n_fmt}/{total_fmt}@{rate_fmt}]") + metrics = "M: {postfix[0]:4.2f} | Q: {postfix[1]:4.2f} | D: {postfix[2]:4.2f}" + self.progress_bar = tqdm(total=total, unit="steps", postfix=[0,0,0,""], bar_format="{desc}["+metrics+"][{bar}]({percentage:3.0f}%)[{n_fmt}/{total_fmt}@{rate_fmt}]") self.progress_bar.update(self.num_timesteps); - model = super(TrainableAlgorithm, self).learn(total_timesteps=total_timesteps, callback=callback, **kwargs) + model = super(TrainableAlgorithm, self).learn(total_timesteps=total_timesteps, **kwargs) #callback=callback, self.progress_bar.close() return model def train(self, **kwargs) -> None: if not self.continue_training: return - # print(f"train {self.num_timesteps} | {self.eval_frequency}") - self.progress_bar.postfix[0] = np.mean([ep_info["r"] for ep_info in self.ep_info_buffer]) + self.progress_bar.postfix[0] = np.mean([ep_info["m"] for ep_info in self.ep_info_buffer]) + self.progress_bar.postfix[1] = np.mean([ep_info["q"] for ep_info in self.ep_info_buffer]) + self.progress_bar.postfix[2] = np.mean([ep_info["d"] for ep_info in self.ep_info_buffer]) + if self.should_eval(): self.progress_bar.update(self.eval_frequency); #n_steps summary, step = {}, self.num_timesteps diff --git a/algorithm/evaluation.py b/algorithm/evaluation.py deleted file mode 100644 index 2f9a133..0000000 --- a/algorithm/evaluation.py +++ /dev/null @@ -1,69 +0,0 @@ -import numpy as np -from stable_baselines3.common.base_class import BaseAlgorithm -from stable_baselines3.common.callbacks import BaseCallback -from torch.utils.tensorboard.writer import SummaryWriter -from algorithm.logging import write_hyperparameters - -class EvaluationCallback(BaseCallback): - """ Callback for evaluating an agent. - :param model: The model to be evaluated^ - :param eval_envs: A dict containing environments for testing the current model. 
- :param stop_on_reward: Whether to use early stopping. Defaults to True - :param reward_threshold: The reward threshold to stop at.""" - def __init__(self, model: BaseAlgorithm, eval_envs: dict, stop_on_reward:float=None, record_video:bool=True, run_test:bool=True): - super(EvaluationCallback, self).__init__(); self.model = model; self.writer: SummaryWriter = self.model.writer - self.eval_envs = eval_envs; self.record_video = record_video; self.run_test = run_test - self.stop_on_reward = lambda r: (stop_on_reward is not None and r >= stop_on_reward) or not self.model.continue_training - if stop_on_reward is not None: print(f"Stopping at {stop_on_reward}"); assert run_test, f"Can't stop on reward {stop_on_reward} without running test episodes" - if record_video: assert run_test, f"Can't record video without running test episodes" - - def _on_training_start(self): self.evaluate() - - def _on_rollout_end(self) -> None: - if self.writer == None: return - # Uncomment for early stopping based on 100-mean training return - mean_return = np.mean([ep_info["r"] for ep_info in self.model.ep_info_buffer]) - if self.stop_on_reward(mean_return): self.model.continue_training = False - if self.model.should_eval(): self.evaluate() - - def _on_step(self) -> bool: - """ Write timesteps to info & stop on reward threshold""" - [info['episode'].update({'t': self.model.num_timesteps}) for info in self.locals['infos'] if info.get('episode')] - return self.model.continue_training - - def _on_training_end(self) -> None: # No Early Stopping->Unkown, not reached (continue=True)->Failure, reached (stopped)->Success - if self.writer == None: return - status = 'STATUS_UNKNOWN' if not self.stop_on_reward else 'STATUS_FAILURE' if self.model.continue_training else 'STATUS_SUCCESS' - metrics = self.evaluate(); write_hyperparameters(self.model, list(metrics.keys()), status) - - def evaluate(self): - """Run evaluation & write hyperparameters, results & video to tensorboard. 
Args: - write_hp: Bool flag to use basic method for writing hyperparams for current evaluation, defaults to False - Returns: metrics: A dict of evaluation metrics, can be used to write custom hparams """ - step = self.model.num_timesteps - if not self.writer: return [] - metrics = {k:v for label, env in self.eval_envs.items() for k, v in self.run_eval(env, label, step).items()} - [self.writer.add_scalar(key, value, step) for key, value in metrics.items()]; self.writer.flush() - return metrics - - def run_eval(self, env, label: str, step: int): - metrics = {} - if self.run_test: - deterministic = False # not env.get_attr('spec')[0].nondeterministic - n_eval_episodes = 1 if not deterministic else 100 - n_envs = env.num_envs; episode_rewards = []; episode_counts = np.zeros(n_envs, dtype="int") - episode_count_targets = np.array([(n_eval_episodes + i) // n_envs for i in range(n_envs)], dtype="int") - observations = env.reset(); states = None; episode_starts = np.ones((env.num_envs,), dtype=bool) - while (episode_counts < episode_count_targets).any(): - actions, states = self.model.predict(observations, state=states, episode_start=episode_starts, deterministic=deterministic) - new_observations, _, dones, infos = env.step(actions) - for i in range(n_envs): - if episode_counts[i] < episode_count_targets[i]: - episode_starts[i] = dones[i] - if dones[i] and "episode" in infos[i].keys(): - episode_rewards.append(infos[i]["episode"]["r"]); episode_counts[i] += 1 - observations = new_observations - metrics[f"rewards/{label}"] = np.mean(episode_rewards) #np.std(episode_rewards) - if self.record_video: env.envs[0].write_video(self.writer, label, step) - self.writer.flush() - return metrics diff --git a/algorithm/factory.py b/algorithm/factory.py deleted file mode 100644 index b689612..0000000 --- a/algorithm/factory.py +++ /dev/null @@ -1,23 +0,0 @@ -import gymnasium as gym; import re -from circuit_designer.wrappers.monitor import Monitor -from stable_baselines3.common.vec_env 
import DummyVecEnv - -def _make(record_video=False, **spec): - def _init() -> gym.Env: return Monitor(gym.make(**spec), record_video=record_video) - return _init - -def named(env): - max_qubits = int(re.search('-q(\d+)', env).group(1)); env = re.sub('-q(\d+)', '', env) - max_depth = int(re.search('-d(\d+)', env).group(1)); env = re.sub('-d(\d+)', '', env) - return {'id': 'CircuitDesigner-v0', 'max_qubits': max_qubits, 'max_depth': max_depth, 'challenge': env} - -def make_vec(env, seed=None, n_envs=1, **kwargs): - spec = lambda rank: {**named(env), 'seed': seed+rank, **kwargs} - return DummyVecEnv([_make(**spec(i)) for i in range(n_envs)]) - -def factory(env_spec, n_train=4, **kwargs): - assert len(env_spec) > 0, 'Please specify at least one environment for training' - test_names = ['validation', *[f'evaluation-{i}' for i in range(len(env_spec)-1)]] - return { 'train': make_vec(env_spec[0], n_envs=n_train, **kwargs), - 'test': {name: make_vec(spec, render_mode='text', record_video=True, **kwargs) for name, spec in zip(test_names, env_spec)} - } \ No newline at end of file diff --git a/baselines/__init__.py b/baselines/__init__.py index e5c0405..66b89ad 100644 --- a/baselines/__init__.py +++ b/baselines/__init__.py @@ -2,5 +2,6 @@ from .ppo import PPO from .sac import SAC from .td3 import TD3 +from .evo.run import run_evo ALGS = ['A2C', 'PPO', 'SAC', 'TD3'] \ No newline at end of file diff --git a/baselines/evo/__init__.py b/baselines/evo/__init__.py new file mode 100644 index 0000000..059a1de --- /dev/null +++ b/baselines/evo/__init__.py @@ -0,0 +1,51 @@ +"""Genetic Algorithm Baseline from https://arxiv.org/pdf/2302.01303.pdf""" + +import datetime + +class CustomFitness: + def __init__(self): + self.custom_fitness = None + + def set_custom_fitness(self, fitness): + self.custom_fitness = fitness + +class Config: + def __init__(self, config): + for k, v in config.items(): setattr(self, k, v) + +custom_fitness = CustomFitness() + +params = Config(dict( + 
single_gate_flip_mutation_rate=0.3, + swap_control_qubit_mutation_rate=0.3, + mutate_n_qubit_mutation_rate=0.3, # rate for which the number of qbits gets mutated + mutate_n_gates_mutation_rate=0.3, # rate for which the number of gates gets mutated + swap_columns_mutation_rate=0.3, # mutation rate to swap columns + gate_parameters_mutation_rate=0.3, # rate for which gate parameters are mutated + single_point_crossover_rate=0.3, + multi_point_crossover_rate=0.3, + blockwise_crossover_rate=0, + n_generations=50, # number of generations the GA process is running + population_size=20, # size of the population for each generation + n_sub_populations=1, + offspring_rate=0.3, # the rate for which individuals an offspring gets produced + migration_rate=0.1, + migrate_every_n_generations=20, + fitness_function_name="custom", + parent_selection_method="random", # random (each individual is equally likely to be selected) / tournament + survivor_selection_method="strongest", # strongest (best n individuals always survive) / tournament + tournament_size=10, # number of individuals in a tournament + youngest_ratio=0, # percentage of the youngest individuals that should be kept per generation + single_parent=True, + calculate_diversity=False, + constant_n_qubits=None, # Only required if n_qubits must be constant + parameter_mutation='gaussian', # decides how the gate parameters are mutated, either 'uniform' or 'gaussian' + init_min_gates=0, # minimal gates that are used for each individual + init_max_gates=0, # maximal gates that are used for each individual + parameter_init=None, # constant for initialization of every parameter, if set to None then all are random + data_dim=0, # dimension of the data input + gatesets=[['cx', 'rx', 'cp', 'p']], + data_gates=[], # the parametrized gates that can be data (reuploading) gates, if empty then no reupload will happen + experiment_date=datetime.datetime.now().strftime('%Y-%m-%d_%H-%M-%S'), + log_every_n_generation=20 +)) diff --git 
a/baselines/evo/__main__.py b/baselines/evo/__main__.py new file mode 100644 index 0000000..040d423 --- /dev/null +++ b/baselines/evo/__main__.py @@ -0,0 +1,7 @@ +import sys +from evo.run import run_evo + +res = [] +print(f"Running GA Baseline for {sys.argv[1]}") +for seed in range(8): res.append(run_evo(sys.argv[1], seed)) +for k in res[0].keys(): print(f"{k}: {sum([r[k] for r in res])/len(res)}") diff --git a/baselines/evo/crossover.py b/baselines/evo/crossover.py new file mode 100644 index 0000000..a786446 --- /dev/null +++ b/baselines/evo/crossover.py @@ -0,0 +1,399 @@ +from __future__ import annotations + +import random +from copy import deepcopy +import numpy as np + +from evo import params +from evo.individual import Individual +from typing import List + +from evo.gate import * +from evo.individual import Individual + +def apply_crossover(original_parents: List[Individual]) -> tuple[List[Individual], bool]: + crossover_applied, children_solutions = False, [] + parents = get_crossover_parents(original_parents) + children = [Individual(is_empty=True), Individual(is_empty=True)] + min_gates, max_gates = get_min_max_gates_in_parents(parents) + if min_gates < 2: + return children_solutions, crossover_applied + min_qubits, max_qubits = get_min_max_qubits_in_parents(parents) + if random.random() < params.single_point_crossover_rate: + single_point_crossover_children = single_point_crossover(deepcopy(parents), deepcopy(children), + [parent.solution for parent in parents], + min_gates, min_qubits, max_qubits) + for child in single_point_crossover_children: + children_solutions.append(child) + validate_solution_metadata(child.solution) + + crossover_applied = True + if random.random() < params.multi_point_crossover_rate: + if min_gates < 3: + return children_solutions, crossover_applied + else: + multi_point_crossover_children = multi_point_crossover(deepcopy(parents), deepcopy(children), + [parent.solution for parent in parents], + min_gates, min_qubits, max_qubits) 
+ for child in multi_point_crossover_children: + children_solutions.append(child) + validate_solution_metadata(child.solution) + crossover_applied = True + if random.random() < params.blockwise_crossover_rate: + blockwise_crossover_children = blockwise_crossover(deepcopy(parents), deepcopy(children), + [parent.solution for parent in parents], + min_gates, min_qubits, max_qubits) + for child in blockwise_crossover_children: + children_solutions.append(child) + for idx_qubit, q in enumerate(child.solution): + for idx_gate, g in enumerate(q): + if g.qubit_id != idx_qubit: + print( + f"Error in blockwise_crossover_children: gate {g.name} on the qubit {idx_qubit} has inconsistent metadata . ") + for child in blockwise_crossover_children: + for idx_qubit, q in enumerate(child.solution): + for idx_gate, g in enumerate(q): + if len(g.affected_qubits) > 1: + affected = g.affected_qubits + for a in affected: + if child.solution[a][idx_gate].name == "id": + print("Inconsistent Circuit! after the blockwise crossover") + + crossover_applied = True + # choose gateset for child from random parent + for child in children_solutions: + child.gateset = parents[random.randrange(len(parents))].gateset + return children_solutions, crossover_applied + + +def get_crossover_parents(original_parents: list[Individual]): + parents = deepcopy(original_parents) + return parents + + +# def validate_solution_metadata(solution): +# for idx_qubit, q in enumerate(solution): +# for idx_gate, g in enumerate(q): +# validate_qubit_id(g, idx_qubit) +# validate_affected_qubits(solution, g, idx_gate) + + +# def validate_qubit_id(g, idx_qubit): +# if g.qubit_id != idx_qubit: +# raise AttributeError( +# f"Error during the crossover has been occurred: gate {g.name} on the qubit {idx_qubit} " +# f"has inconsistent metadata . 
def multi_point_crossover(parents: List[Individual], children: List[Individual], parent_solutions: list[list],
                          min_gates: int, min_qubits: int, max_qubits: int) -> list[Individual]:
    """
    Crossover on multiple points
        p1: |---------|          |------
        p2: --|        |---------|

    Both parents are cut at the same randomly drawn columns and the children
    take the resulting segments alternately from parent 0 and parent 1.

    :param parents: the two parent individuals
    :param children: the two child individuals that receive the new solutions
    :param parent_solutions: the parents' solutions (one list of gates per qubit)
    :param min_gates: gates per qubit of the shorter parent; must be >= 3,
        otherwise ``random.randint`` raises ``ValueError`` (empty range)
    :param min_qubits: qubit count of the smaller parent
    :param max_qubits: qubit count of the larger parent
    :return: the ``children`` list with solutions assigned
    """
    children_solutions, smaller_parent_idx, offset = init_crossover_variables(parents, min_qubits, max_qubits)
    pad_smaller_parent_with_dummy_qubits(parents, parent_solutions, smaller_parent_idx, offset)
    pad_smaller_circuit_with_additional_qubits(parent_solutions, smaller_parent_idx, parents, offset, min_qubits,
                                               max_qubits)

    indices = list(range(1, min_gates))
    num_splitting_points = random.randint(2, len(indices))
    choices = random.choices(indices, k=num_splitting_points)
    # Deduplicate AND sort: set iteration order is not guaranteed to be
    # ascending, while the segments below must be cut left to right.
    splitting_points = sorted(set(choices))

    # crossover
    for qubit in range(max_qubits):
        segments_0 = _split_list(parent_solutions[0][qubit], splitting_points)
        segments_1 = _split_list(parent_solutions[1][qubit], splitting_points)
        child_0_row, child_1_row = [], []

        for i, splits in enumerate(zip(segments_0, segments_1)):
            # even segments keep their own parent, odd segments are swapped
            child_0_row.extend(splits[i % 2])
            child_1_row.extend(splits[(i + 1) % 2])

        children_solutions[0].append(child_0_row)
        children_solutions[1].append(child_1_row)

    final_adjustments_in_each_child(children, children_solutions, max_qubits, blockwise_cross=False)

    return children
max_gates: + for _ in range(max_gates - len(qubit)): + qubit.append(get_identity_gate(i)) + + # find crossover points + # TODO: find good parameter to find a reasonable number of splits + indices = list(range(1, max_gates)) # max_gates since the smallest parent is padded + num_splitting_points = random.randint(1, len(indices)) + num_blocks = num_splitting_points + 1 + choices = random.choices(indices, k=num_splitting_points) + splitting_points = list(set(choices)) + qubits_in_blocks = [] + for b in range(num_blocks): + nr_of_concerned_qubits = random.randint(0, max_qubits) + qubits_in_block = np.random.choice(max_qubits, nr_of_concerned_qubits, replace=False) + qubits_in_blocks.append(qubits_in_block) + + # crossover + for qubit in range(max_qubits): + qubit_splits = [_split_list(deepcopy(parent_solutions[0][qubit]), splitting_points), + _split_list(deepcopy(parent_solutions[1][qubit]), splitting_points)] + + children_solutions[0].append([]) + children_solutions[1].append([]) + + for i, splits in enumerate(zip(qubit_splits[0], qubit_splits[1])): + if qubit in qubits_in_blocks[i]: + children_solutions[0][-1].extend(deepcopy(splits[1])) + children_solutions[1][-1].extend(deepcopy(splits[0])) + else: + children_solutions[0][-1].extend(deepcopy(splits[0])) + children_solutions[1][-1].extend(deepcopy(splits[1])) + + # fix multi-qubit gates + for c, solution in enumerate(children_solutions): + for q in range(max_qubits): + for g in range(len(solution[q]) - 1, -1, -1): + gate = deepcopy(solution[q][g]) + if len(gate.affected_qubits) <= 1: + continue # if this gate is not a multi-qubit gate + + modified_qubits = [] + for a in gate.affected_qubits: + if a != q: + # check if gate on qubit is another multi-qubit gate + # if yes: remove that + gate_on_affected_qubit = deepcopy(solution[a][g]) + if gate.name != gate_on_affected_qubit.name and len(gate_on_affected_qubit.affected_qubits) > 1: + for other_gate_affected_qubit in gate_on_affected_qubit.affected_qubits: + if 
def prune_identity_gates(solution: List[List[Gate]]) -> List[List[Gate]]:
    """Remove, in place, every column that holds only identity gates.

    A column is pruned when the gate at that position is named ``"id"`` on
    every qubit.

    Args:
        solution: Circuit as one list of gates per qubit. Every qubit row must
            be at least as long as the longest row, otherwise ``IndexError``
            is raised while the columns are inspected.

    Returns:
        The same ``solution`` object with the identity-only columns removed.
        An empty solution is returned unchanged.
    """
    if not solution:
        return solution

    n_gates = max(len(qubit) for qubit in solution)
    prunable_columns = [
        column for column in range(n_gates)
        if all(qubit[column].name == "id" for qubit in solution)
    ]

    for qubit in solution:
        # Delete from the right so the remaining indices stay valid, and delete
        # by index: list.remove() would drop the first *equal* gate instead of
        # the one at this position.
        for column in reversed(prunable_columns):
            del qubit[column]

    return solution
visited_gates.append(gate) + + +def pad_smaller_parent_with_dummy_qubits(parents: List[Individual], parent_solutions: list[list], + smaller_parent_idx: int, offset: int): + for column in range(parents[smaller_parent_idx].n_gates_per_qubit): + visited_gates = [] + + for q, qubit in enumerate(parent_solutions[smaller_parent_idx]): + gate = qubit[column] + if gate in visited_gates: # if this gate has been processed already: skip + continue + adjust_gate_metadata(gate, q, offset) + visited_gates.append(gate) + + +def pad_smaller_circuit_with_additional_qubits(parent_solutions: list[list], smaller_parent_idx: int, + parents: List[Individual], offset: int, + min_qubits: int, max_qubits: int): + parent_solutions[smaller_parent_idx] = [[get_identity_gate(i) for _ in + range(parents[smaller_parent_idx].n_gates_per_qubit)] for i in + range(offset)] + parent_solutions[smaller_parent_idx] + [ + [get_identity_gate(i) for _ in + range(parents[smaller_parent_idx].n_gates_per_qubit)] + for i in range(offset + min_qubits, max_qubits)] + + +def pad_qubits_with_id_gate(children_solutions: list, max_gates: int, c: int): + for i, qubit in enumerate(children_solutions[c]): + for _ in range(max_gates - len(qubit)): + qubit.append( + get_identity_gate(i)) + + +def check_qubit_and_gate_validity(child: Individual, qubit_id: int, column_id: int, parent: Individual) -> bool: + if parent.n_qubits <= qubit_id or parent.n_gates_per_qubit <= column_id: + return False + if parent.solution[qubit_id][column_id].target_id is not None and parent.solution[qubit_id][ + column_id].target_id >= child.n_qubits: + return False + return True + + +def _split_list(split_list: List, idx: List[int]): + """ + :param split_list: list to be splitted + :param idx: list of indeces where to split at + :return: list of splits + + Splits a given list at points defined in idx + """ + + l = [el for el in split_list] + + result = [] + for i in reversed(idx): + split = l[i:] + if len(split) > 0: + result.append(split) + l = 
l[:i] + if len(l) > 0: + result.append(l) + + return [el for el in reversed(result)] diff --git a/baselines/evo/evolution.py b/baselines/evo/evolution.py new file mode 100644 index 0000000..eb976a3 --- /dev/null +++ b/baselines/evo/evolution.py @@ -0,0 +1,129 @@ +import uuid + +from evo.individual import Individual +from evo.crossover import * +from evo.mutation import * +from evo.selection import tournament_selection, roulette_wheel_selection, rejuvenate_population +from evo.population import migration +import numpy as np + +from typing import List +from evo.population import update_population_statistics + +class Evolution: + + def __init__(self, population: List[Individual]): + self.population = population + self.n_children = int(int(params.population_size / params.n_sub_populations) * params.offspring_rate) + + def begin_evolution(self, metrics={}, **kw_args): + stats = {k:[] for k in metrics}; m = int(100 / params.n_generations); assert m < params.population_size + best_fitness_per_gen, avg_fitness_per_gen, avg_age_per_gen = [[] for _ in range(params.n_sub_populations)], [[] for _ in range(params.n_sub_populations)], [[] for _ in range(params.n_sub_populations)] + diversity_per_gen = [[] for _ in range(params.n_sub_populations)] if params.calculate_diversity else None + + for current_gen in range(params.n_generations): + if params.migrate_every_n_generations != 0 and current_gen % params.migrate_every_n_generations == 0: + migration(self.population, migration_rate=params.migration_rate) + + for sub_pop_indx, population in enumerate(self.population): + + # Sort population by fitness + population.sort(key=lambda x: x.calculate_fitness(**kw_args), reverse=True) + for k,f in metrics.items(): + # stats[k].append(f(population[0])) + stats[k].extend([f(i) for i in population[:m]]) + + if diversity_per_gen is not None: + update_population_statistics(population, current_gen, best_fitness_per_gen[sub_pop_indx], + avg_fitness_per_gen[sub_pop_indx], 
avg_age_per_gen[sub_pop_indx], + diversity_per_gen[sub_pop_indx]) + else: update_population_statistics(population, current_gen, best_fitness_per_gen[sub_pop_indx], + avg_fitness_per_gen[sub_pop_indx], avg_age_per_gen[sub_pop_indx], + None) + + # Produce new individuals + children = self.produce_offspring(population=population, current_gen=current_gen, **kw_args) + + if params.youngest_ratio > 0: old_population = self.population.copy() + + # Determine individuals that will be part of the next generation + population = self.replacement(population, children, **kw_args) + # keep the youngest individuals if parameter is set + if params.youngest_ratio > 0: + n_youngest = int(params.youngest_ratio * params.population_size) + population = rejuvenate_population(population, old_population, children, n_youngest, **kw_args) + + # Sort last population by fitness and return best solution + population = self.population[-1] + population.sort(key=lambda x: x.calculate_fitness(**kw_args), reverse=True) + return stats, population + # mean_fitness = np.array([i.fitness for p in self.population for i in p]).mean() + # return population, mean_fitness + + def produce_offspring(self, population: List[Individual], current_gen: int = 0, **kw_args) -> \ + List[Individual]: + children = [] + while len(children) < self.n_children: + parents = self.select_parents(population, **kw_args) + children_created = self.create_children(parents) + for child in children_created: children.append(child) + for child in children: + apply_mutations(child) + child.solution = child.solution # to sync also with qc + child.set_generated_in_generation(current_gen + 1) + child.id = uuid.uuid4() + + return children + + def select_parents(self, population, **kw_args) -> List[Individual]: + """Select parents following the selection method specified in evolution parameters. 
+ """ + if params.parent_selection_method == "random": + parents = [random.choice(population), random.choice(population)] + elif params.parent_selection_method == "tournament": + parents = tournament_selection(population, n_to_select=2, + tournament_size=params.tournament_size, + **kw_args) + elif params.parent_selection_method == "roulette_wheel": + parents = roulette_wheel_selection(population, size=2, **kw_args) + else: + raise ValueError("Invalid parent selection method!") + return parents + + @staticmethod + def create_children(parents: List[Individual]) -> List[Individual]: + """Create a new child by recombining the genome of two parent individuals + + Args: + parents (:obj:`list` of :obj:`Individual`): parent individuals + + Returns: + :obj:`Individual`: Returns the generated child individual. + """ + children, crossover_applied = apply_crossover(parents) + if not crossover_applied: + if random.random() < params.single_parent: + child = Individual() + random_parent = random.choice(parents) + child.n_gates_per_qubit = random_parent.n_gates_per_qubit + child.n_qubits = random_parent.n_qubits + child.solution = deepcopy(random_parent.solution) + child.gateset = random_parent.gateset + else: + child = Individual(max_qubits=parents[0].n_qubits) + children.append(child) + return children + + def replacement(self, population, children, **kw_args): + if params.survivor_selection_method == "strongest": + for idx in range(self.n_children): + population[-idx - 1] = children[idx] + elif params.survivor_selection_method == "tournament": + population = tournament_selection(population + children, + n_to_select=params.population_size, + tournament_size=params.tournament_size, + **kw_args) + else: + raise ValueError("Invalid survivor selection method!") + return population + diff --git a/baselines/evo/experiment.py b/baselines/evo/experiment.py new file mode 100644 index 0000000..9fcff90 --- /dev/null +++ b/baselines/evo/experiment.py @@ -0,0 +1,21 @@ +from evo import 
class Gate:
    """A single gate entry of a circuit solution together with its metadata.

    Args:
        name: Gate name.
        qubit_id: ID of the qubit row this entry sits on.
        affected_qubits: IDs of all qubits the gate acts on.
        target_qubits: IDs of the target qubits; a fresh empty list when omitted.
        parameters: Gate parameters (e.g. rotation angles) or ``None``.
        control_qubits: IDs of the control qubits; a fresh empty list when omitted.
        data_ids: Optional data ids; a gate with data ids is a "data gate".
    """

    def __init__(self, name: str, qubit_id: int, affected_qubits: list, target_qubits: list = None,
                 parameters: list = None,
                 control_qubits: list = None, data_ids=None):
        self._name = name
        self._qubit_id = qubit_id
        self._affected_qubits = affected_qubits
        # None instead of [] as default: a literal list default is created once
        # and would be shared (and mutated) across all gates built without it.
        self._target_qubits = [] if target_qubits is None else target_qubits
        self._parameters = parameters
        self._control_qubits = [] if control_qubits is None else control_qubits
        self._data_ids = data_ids

    def __str__(self):
        return self.name

    def __repr__(self):
        return self.__str__()

    @property
    def name(self) -> str:
        """ Gate name. """
        return self._name

    @name.setter
    def name(self, value):
        self._name = value

    @property
    def qubit_id(self) -> int:
        """ IDs of the qubit the gate originally acts on. """
        return self._qubit_id

    @qubit_id.setter
    def qubit_id(self, value):
        self._qubit_id = value

    @property
    def is_data_gate(self):
        """ ``True`` when data ids were assigned to the gate. """
        return self._data_ids is not None

    @property
    def data_ids(self):
        """ Data ids given at construction, or ``None``. """
        return self._data_ids

    @property
    def affected_qubits(self) -> list:
        """ IDs of the qubits the gate acts on.

        Example: `swap` gate applied on the qubits `0` and `1` -> `affected_qubits = [0, 1]`
        """
        return self._affected_qubits

    @affected_qubits.setter
    def affected_qubits(self, value):
        self._affected_qubits = value

    @property
    def target_qubits(self) -> list:
        """ IDs of target qubits. Specified only by controlled gates otherwise an empty list."""
        return self._target_qubits

    @target_qubits.setter
    def target_qubits(self, value):
        self._target_qubits = value

    @property
    def parameters(self) -> list:
        """ List of parameters for the gate. (e.g. rotation parameters).
        Specified only by parametrized gates otherwise `None`."""
        return self._parameters

    @parameters.setter
    def parameters(self, value):
        self._parameters = value

    @property
    def control_qubits(self) -> list:
        """ IDs of the control qubits for the gate.
        Specified only by controlled gates otherwise an empty list."""
        return self._control_qubits

    @control_qubits.setter
    def control_qubits(self, value):
        self._control_qubits = value
def get_max_nr_qubits_in_gate(gateset):
    """Return the largest qubit count of any gate available in ``gateset``.

    The value is read from the first metadata column (``qubits``) at the row
    where that column is maximal.
    """
    available = get_gates_metadata(gateset)
    widest_row = available.qubits.argmax()
    return available.iloc[widest_row, 0]
def choose_random_gate(nr_qubits, gateset):
    """Pick a random gate of ``gateset`` that acts on ``nr_qubits`` qubits.

    The candidates are sorted before drawing so that the choice depends only
    on the random state, not on the candidates' incoming order.

    :return: tuple ``(gate_name, nr_qubits, nr_parameters, nr_controls)`` with
        the metadata looked up for the chosen gate
    """
    candidates = sorted(get_set_of_gates_by_nr_of_qubits(nr_qubits, gateset))
    chosen = random.choice(candidates)
    return (chosen, *get_gate_metadata_by_name(chosen, gateset))
def specify_control_qubits(affected_qubits, excluded_qubits, control_qubits, nr_controls, max_random_iterations):
    """Fill ``control_qubits`` (in place) until it holds ``nr_controls`` qubits.

    Qubits are drawn at random from ``affected_qubits``; a qubit is accepted
    when it is neither already a control qubit nor listed in
    ``excluded_qubits``. After ``max_random_iterations`` draws the remaining
    slots are filled deterministically in the order of ``affected_qubits``.

    :param affected_qubits: candidate qubit ids (multi-controlled operations are allowed)
    :param excluded_qubits: qubit ids that must not be used as control
    :param control_qubits: list to extend; also returned
    :param nr_controls: required number of control qubits
    :param max_random_iterations: upper bound for the random draws
    :return: ``control_qubits``
    :raises Exception: if the required number of control qubits cannot be met
    """

    def is_free(qubit):
        return qubit not in control_qubits and qubit not in excluded_qubits

    # Random phase, bounded so that an unsatisfiable request cannot loop forever.
    it = 0
    while len(control_qubits) < nr_controls and it < max_random_iterations:
        it += 1
        q = random.choice(affected_qubits)
        if is_free(q):
            control_qubits.append(q)

    # Deterministic fallback: stop as soon as enough qubits are collected so the
    # list never overshoots nr_controls.
    for q in affected_qubits:
        if len(control_qubits) >= nr_controls:
            break
        if is_free(q):
            control_qubits.append(q)

    # validate
    if len(control_qubits) != nr_controls:
        raise Exception(
            f'ERROR: The length of the specified controlled qubits is unequal to the number of the required '
            f'controlled qubits. Control qubits: {control_qubits}, number of required control qubits: {nr_controls}')
    return control_qubits
def get_gates_for_qubit(circuit, qubit_id):
    """Collect a description of every instruction in ``circuit`` that touches ``qubit_id``.

    ``circuit.data`` is iterated; item 0 of each entry is read as the operation
    (``name``, ``num_qubits``, ``params``) and item 1 as the sequence of qubits,
    each exposing ``_index``.
    NOTE(review): presumably a qiskit ``QuantumCircuit`` - confirm against the caller.

    :return: list of dicts with the keys ``name``, ``type`` (number of qubits
        minus one), ``qubit_id``, ``affected_qubits``, ``target_qubits`` (last
        affected qubit for multi-qubit operations), ``parameters`` and
        ``control_qubits`` (all affected qubits but the last)
    """
    collected = []
    for entry in circuit.data:
        operation, qubits = entry[0], entry[1]
        touched = [qubit._index for qubit in qubits]
        if not (qubits[0]._index == qubit_id or qubit_id in touched):
            continue

        if operation.num_qubits == 1:
            targets, controls = [], []
        else:
            targets = [touched[-1]]
            controls = touched[:-1]

        collected.append({'name': operation.name, 'type': operation.num_qubits - 1,
                          'qubit_id': qubit_id, 'affected_qubits': touched, 'target_qubits': targets,
                          'parameters': operation.params, 'control_qubits': controls})
    return collected
def validate_affected_vs_control_and_target_qubits(solution, affected_qubit, len_affected, idx_gate):
    """Check that control plus target qubits add up to the affected qubits.

    The gate at ``solution[affected_qubit][idx_gate]`` is inspected.

    :raises ValueError: if ``len_affected`` differs from the number of control
        qubits plus the number of target qubits of that gate
    """
    gate = solution[affected_qubit][idx_gate]
    len_control_and_target = len(gate.control_qubits) + len(gate.target_qubits)
    if len_affected == len_control_and_target:
        return
    raise ValueError(f'Error in generating multi-qubit gates! '
                     f'Number of the affected qubits is not equal the sum of the control and target qubits: '
                     f'expected {len_affected} got {len_control_and_target}')
Gate names are inconsistent: expected {g} got ' + f'{solution[affected_qubit][idx_gate].name} ') diff --git a/baselines/evo/individual.py b/baselines/evo/individual.py new file mode 100644 index 0000000..4f4464d --- /dev/null +++ b/baselines/evo/individual.py @@ -0,0 +1,225 @@ +import random +import uuid +from copy import deepcopy +from typing import Callable +from qiskit import QuantumCircuit + +from evo import params as evo_params, custom_fitness +from evo.gate import * + + +class Individual: + + def __init__(self, max_qubits: int = 5, params: dict = None, + generated_in_generation: int = 0, is_empty=False, initial_solution: list = None, gateset: int = None): + self._id = uuid.uuid4() + params = params or evo_params + if gateset is None: + self._gateset = random.randrange(len(params.gatesets)) + else: self._gateset = gateset + self.fitness = None + assert custom_fitness.custom_fitness + self.fitness_function = custom_fitness.custom_fitness + self.generated_in_generation = generated_in_generation + if is_empty: pass + else: + self.n_qubits = params.constant_n_qubits if params.constant_n_qubits is not None else \ + random.randint(1, max_qubits) + self.n_gates_per_qubit = random.randint(params.init_min_gates, params.init_max_gates) + if initial_solution is not None: + self.n_qubits = len(initial_solution) + self.n_gates_per_qubit = len(initial_solution[0]) + self.solution = initial_solution + else: + self.solution = self.create_solution() + self._qc = self.transform_to_executable_circuit(self.solution) + + def set_fitness_function(self, fitness_function: Callable): + self.fitness_function = fitness_function + + def get_fitness_function(self): + return self.fitness_function + + def get_data_gates(self): + data_gates = [] + for qubit_id, gates in enumerate(self.solution): + for column_id, gate in enumerate(gates): + if gate.is_data_gate: + data_gates.append({'qubit_id': qubit_id, 'column_id': column_id, 'data_ids': gate.data_ids}) + return data_gates + + def 
    def generated_in_generation(self) -> int:
        """Return the generation in which this individual was generated.

        NOTE(review): ``__init__`` and ``set_generated_in_generation`` assign an
        instance attribute with this same name. On such instances the attribute
        (the stored value) shadows this method, so
        ``individual.generated_in_generation`` yields the value directly and
        calling it as a method fails. Consider renaming the stored attribute.
        """
        return self.generated_in_generation
If the generated gate uses more than one qubit -> assign also the same gate with + its metadata to the other affected qubits in this column. + + Returns: + A quantum circuit as nested `n x m` list (`n` qubits, `m` gates). + + Raises: + ValueError: If specification of the qubits a gate acts on (affected qubits) is not valid. + """ + circuit = [[None for _ in range(self.n_gates_per_qubit)] for _ in range(self.n_qubits)] + for column_id in range(self.n_gates_per_qubit): + for qubit_id in range(self.n_qubits): + if circuit[qubit_id][column_id] is None: + excluded_qubits = exclude_already_handled_qubits_in_column(circuit=circuit, qubit_id=qubit_id, + column_id=column_id) + max_affected_qubits = min(3, self.n_qubits - len(excluded_qubits)) + if max_affected_qubits < 1: + max_affected_qubits = 1 + gate = create_random_gate(n_qubits=self.n_qubits, qubit_id=qubit_id, + gateset=self._gateset, + max_affected_qubits=max_affected_qubits, + excluded_qubits=excluded_qubits) + circuit[qubit_id][column_id] = gate + if len(gate.affected_qubits) > 1: + for q in gate.affected_qubits[1:]: + if q > qubit_id: + g_copy = deepcopy(gate) + g_copy.qubit_id = q + circuit[q][column_id] = g_copy + else: + raise ValueError( + "Id of the helper qubit for multiple qubits gate should greater than qubit_id.") + return circuit + + def transform_to_executable_circuit(self, circuit_solution: list): + """Transforms circuit_solution into executable instance of qiskit.QuantumCircuit. + + Args: + circuit_solution (list): Generated individual as list of `n x m` elements, whereby `n` is the number of + qubits, `m` is the number of gates acting on each qubit. + + Returns: + QuantumCircuit: Instance of QuantumCircuit, is executable in Qiskit. + + Raises: + CircuitError: if the circuit is not valid. + TypeError: If circuit parameters are not valid. + IndexError: If the number of qubits and gates in the circuit are not valid. 
+ """ + qc = QuantumCircuit(self.n_qubits) + running_solution = deepcopy(circuit_solution) # need for saving info about already handled gates + for gate_idx in range(self.n_gates_per_qubit): + for qb_idx in range(self.n_qubits): + gate = circuit_solution[qb_idx][gate_idx] + if running_solution[qb_idx][gate_idx] is not None: + if gate.control_qubits and gate.control_qubits.__len__() > 1: + p = [] + for cq in gate.control_qubits: + p.append(cq) + for tq in gate.target_qubits: + p.append(tq) + getattr(qc, gate.name)(*p) + else: + if not gate.parameters or gate.parameters.__len__() == 0: + getattr(qc, gate.name)(*gate.affected_qubits) + elif gate.parameters.__len__() >= 1: + p = [] + for gp in gate.parameters: + p.append(gp) + for q in gate.affected_qubits: + p.append(q) + getattr(qc, gate.name)(*p) + if gate.affected_qubits and len(gate.affected_qubits) > 1: + for q in gate.affected_qubits: + if q != qb_idx: + running_solution[q][gate_idx] = None + # qc.measure_all() + return qc + + def insert_datapoint_to_solution(self, data_point: list): + """ Inserting data point into circuit via the data gates. + Args: + data_point (list): + """ + data_gates = self.get_data_gates() + for data_gate in data_gates: + qubit_id = data_gate['qubit_id'] + column_id = data_gate['column_id'] + parameters = data_gate['data_ids'] + for idx in range(len(self.solution[qubit_id][column_id].parameters)): + self.solution[qubit_id][column_id].parameters[idx] = data_point[parameters[idx]] + self.qc, _ = self.transform_to_executable_circuit(self._solution) + + def __repr__(self): + # Draw individual in the console and print inforation about the number of qubits, number of gates per qubit and + # the value of the fitness function. 
+ return "Individual: Qubits={}, Gates={}, Fitness={}, Circuit: \n{}".format(self.n_qubits, + self.n_gates_per_qubit, + self.fitness, self.qc.draw()) + + def __str__(self): + return self.__repr__() diff --git a/baselines/evo/mutation.py b/baselines/evo/mutation.py new file mode 100644 index 0000000..c40fa8e --- /dev/null +++ b/baselines/evo/mutation.py @@ -0,0 +1,212 @@ +import math +import random + +from evo import params +from evo.gate import create_random_gate +from evo.individual import Individual + + +def apply_mutations(child: Individual): + """Applies each mutation to an individual according to the probability specified in params. + Each mutation can have a different probability assigned. Probabilities do not need to sum up to 1. + + Args: + child (Individual): The individual on which the mutation should be applied. + """ + if child.n_qubits == 0 or child.n_gates_per_qubit==0: return + if random.random() < params.single_gate_flip_mutation_rate: + single_gate_flip(child) + if random.random() < params.swap_control_qubit_mutation_rate: + swap_control_qubit(child) + if params.constant_n_qubits is None and random.random() < params.mutate_n_qubit_mutation_rate: + mutate_n_qubits(child) + if random.random() < params.mutate_n_gates_mutation_rate: + mutate_n_gates(child) + if random.random() < params.swap_columns_mutation_rate: + swap_columns(child) + if random.random() < params.gate_parameters_mutation_rate: + mutate_gate_parameters(child) + + +def single_gate_flip(child: Individual): + """Randomly selects a gate and replaces it with a random gate. Also replaces gates of affected qubits with random + gates. + + Args: + child (Individual): The individual on which the mutation should be applied. + """ + + random_qubit, random_column = random.randint(0, child.n_qubits - 1), random.randint(0, child.n_gates_per_qubit - 1) + affected_qubits = child.solution[random_qubit][random_column].affected_qubits + + # Determine if it's a gate acting on more than one qubits. 
If so, also mutate that gate + if len(affected_qubits) > 1: + for i in affected_qubits: + child.solution[i][random_column] = create_random_gate(n_qubits=child.n_qubits, qubit_id=i, + gateset=child.gateset, + max_affected_qubits=True) + else: + child.solution[random_qubit][random_column] = create_random_gate(n_qubits=child.n_qubits, qubit_id=random_qubit, + gateset=child.gateset, + max_affected_qubits=True) + + +def swap_control_qubit(child: Individual, max_loop_iterations=10): + """Randomly searches for a controlled-gate and swaps control and target qubit (if one exists). + + Args: + child (Individual): The individual on which the mutation should be applied. + max_loop_iterations (int): Specifies for how many iterations maximally to search for a controlled-gate + """ + for _ in range(max_loop_iterations): + # Choose random qubit and column + qubit_id, column_id = random.randint(0, child.n_qubits - 1), random.randint(0, child.n_gates_per_qubit - 1) + # Determine if circuit contains a controlled gate + if len(child.solution[qubit_id][column_id].control_qubits) > 0: + # Found a controlled gate, now swap control and target qubits + control_qubits = child.solution[qubit_id][column_id].control_qubits + target_qubits = child.solution[qubit_id][column_id].target_qubits + random_control_qubit = random.choice(range(len(control_qubits))) + + temp = child.solution[qubit_id][column_id].control_qubits[random_control_qubit] + control_qubits[random_control_qubit] = child.solution[qubit_id][column_id].target_qubits[0] + target_qubits[0] = temp + + for idx in control_qubits + target_qubits: + child.solution[idx][column_id].control_qubits = control_qubits + child.solution[idx][column_id].target_qubits = target_qubits + + break + + +def mutate_n_qubits(child: Individual): + """Adjusts the number of qubits in a circuit. + + Args: + child (Individual): The individual on which the mutation should be applied. 
+ """ + current_qubits = child.n_qubits + if current_qubits == 2: + child.n_qubits += random.randint(1, 2) + elif current_qubits == 3: + child.n_qubits += random.choice([-1, 1]) + elif random.random() < 0.5: + child.n_qubits += random.randint(1, 2) + else: + child.n_qubits -= random.randint(1, 2) + + # Adjust circuit + if child.n_qubits < current_qubits: + # Remove qubits + for _ in range(abs(current_qubits - child.n_qubits)): + child.solution.pop() + repair_affected_qubits(child) + else: + # Add new qubits and gates (currently only adds single qubit gates) + for qubit_id in range(current_qubits, child.n_qubits): + child.solution.append([]) + for _ in range(child.n_gates_per_qubit): + child.solution[qubit_id].append( + create_random_gate(n_qubits=child.n_qubits, qubit_id=qubit_id, gateset=child.gateset, + max_affected_qubits=1)) + + +def mutate_n_gates(child: Individual): + """Adjusts the number of gates in a circuit. + + Args: + child (Individual): The individual on which the mutation should be applied. 
+ """ + current_gates = child.n_gates_per_qubit + + if current_gates < 2: + return child + + if current_gates == 2: + child.n_gates_per_qubit += random.randint(1, 2) + elif current_gates == 3: + child.n_gates_per_qubit += random.choice([-1, 1]) + elif random.random() < 0.5: + child.n_gates_per_qubit += random.randint(1, 2) + else: + child.n_gates_per_qubit -= random.randint(1, 2) + + # check max_gates and min_gates constraints + if child.n_gates_per_qubit < params.init_min_gates: + child.n_gates_per_qubit = params.init_min_gates + elif child.n_gates_per_qubit > params.init_max_gates: + child.n_gates_per_qubit = params.init_max_gates + + # Adjust circuit + if child.n_gates_per_qubit < current_gates: + # Remove gates (a column) + for _ in range(abs(current_gates - child.n_gates_per_qubit)): + for qubit_id in range(child.n_qubits): + child.solution[qubit_id].pop() + + else: + # Add new gates + for qubit_id in range(child.n_qubits): + for idx in range(current_gates, child.n_gates_per_qubit): + # Could be adjusted such that not only single gates can be added + child.solution[qubit_id].append( + create_random_gate(n_qubits=child.n_qubits, qubit_id=qubit_id, gateset=child.gateset, + max_affected_qubits=1)) + + +def repair_affected_qubits(child: Individual): + """Used as part of mutation functions in order to adjust and fix solutions that have been corrupted by a mutation. + + Args: + child (Individual): The individual on which the mutation should be applied. + """ + for column_id in range(child.n_gates_per_qubit): + for qubit_id in range(child.n_qubits): + affected_qubits = child.solution[qubit_id][column_id].affected_qubits + for qubit in affected_qubits: + if qubit >= child.n_qubits: + child.solution[qubit_id][column_id] = create_random_gate(n_qubits=child.n_qubits, qubit_id=qubit_id, + gateset=child.gateset, + max_affected_qubits=1) + + +def swap_columns(child: Individual): + """Mutation that exchanges all gates from two randomly chosen columns of a circuit. 
+ + Args: + child (Individual): The individual on which the mutation should be applied. + """ + if child.n_gates_per_qubit < 2: + return child + + # Randomly choose two columns to swap + column_1, column_2 = random.sample(range(child.n_gates_per_qubit), 2) + # print('Column 1: {} Column 2: {}'.format(column_1, column_2)) + for qubit_id in range(child.n_qubits): + temp = child.solution[qubit_id][column_1] + child.solution[qubit_id][column_1] = child.solution[qubit_id][column_2] + child.solution[qubit_id][column_2] = temp + + +def mutate_gate_parameters(child: Individual, max_loop_iterations=10): + """Randomly selects a parameterised gate and adjusts its parameter (if such a gate is found). + + Args: + child (Individual): The individual on which the mutation should be applied. + max_loop_iterations (int): Specifies for how many iterations maximally to search for a controlled-gate + """ + # Determine if circuit contains a parameterised gate + for _ in range(max_loop_iterations): + qubit_id, column_id = random.randint(0, child.n_qubits - 1), random.randint(0, child.n_gates_per_qubit - 1) + if child.solution[qubit_id][column_id].parameters is not None \ + and len(child.solution[qubit_id][column_id].parameters) > 0: + # print('Mutating qubit {} gate {}'.format(qubit_id, column_id)) + for idx in range(len(child.solution[qubit_id][column_id].parameters)): + if params.parameter_mutation == 'uniform': + child.solution[qubit_id][column_id].parameters[idx] = random.uniform(-math.pi, math.pi) + elif params.parameter_mutation == 'gaussian': + mu = child.solution[qubit_id][column_id].parameters[idx] + sigma = 0.25 * math.pi + child.solution[qubit_id][column_id].parameters[idx] = random.gauss(mu, sigma) + + return diff --git a/baselines/evo/population.py b/baselines/evo/population.py new file mode 100644 index 0000000..15ac4d7 --- /dev/null +++ b/baselines/evo/population.py @@ -0,0 +1,62 @@ +import random + +def divide_population_into_islands(initial_population, 
n_sub_populations): + full_population = [] + sub_population_size = int(len(initial_population) / n_sub_populations) + population_index = 0 + for idx in range(n_sub_populations): + sub_population = [] + for jdx in range(population_index, population_index + sub_population_size): + sub_population.append(initial_population[jdx]) + population_index += sub_population_size + full_population.append(sub_population) + return full_population + + +def migration(population: list, migration_rate): + n_individuals_to_migrate = int(len(population[0]) * migration_rate) + random.shuffle(population) + for idx in range(0, len(population)-1): + swap_individual(population[idx], population[idx+1], n_individuals_to_migrate) + + +def swap_individual(sub_population_a, sub_population_b, n_to_migrate): + random_ind_a, random_ind_b = random.sample([idx for idx in range(len(sub_population_a))], k=n_to_migrate),\ + random.sample([idx for idx in range(len(sub_population_b))], k=n_to_migrate) + temp = [sub_population_a[random_ind_a[idx]] for idx in range(n_to_migrate)] + for idx in range(n_to_migrate): + sub_population_a[random_ind_a[idx]] = sub_population_b[random_ind_b[idx]] + sub_population_b[random_ind_b[idx]] = temp[idx] + + + +def update_population_statistics(population, current_gen, best_fitness_per_gen, avg_fitness_per_gen, avg_age_per_gen, + diversity_per_gen=None): + best_fitness_per_gen.append(population[0].fitness) + avg_fitness_per_gen.append(sum([x.fitness for x in population]) / len(population)) + avg_age = sum([current_gen - x.generated_in_generation for x in population]) / len(population) + avg_age_per_gen.append(avg_age) + if diversity_per_gen is not None: + diversity = calculate_diversity_of_population(population) + diversity_per_gen.append(diversity) + +def calculate_diversity_of_population(population): + distances = [] + for ind in population: + for other_ind in population: + if ind == other_ind: + continue + distances.append(compare_circuits(ind, other_ind)) + return 
sum(distances) / len(distances) + + +# Placeholder: find better way to compare circuits +def compare_circuits(ind_a, ind_b): + distance = 0 + for row_a, row_b in zip(ind_a.solution, ind_b.solution): + for gate_a, gate_b in zip(row_a, row_b): + if gate_a != gate_b: + distance += 1 + distance += abs(ind_a.n_gates_per_qubit - ind_b.n_gates_per_qubit) + distance += abs(ind_a.n_qubits - ind_b.n_qubits) * ind_a.n_gates_per_qubit if ind_a.n_qubits > ind_b.n_qubits else abs(ind_a.n_qubits - ind_b.n_qubits) * ind_b.n_gates_per_qubit + return distance diff --git a/baselines/evo/run.py b/baselines/evo/run.py new file mode 100644 index 0000000..e638219 --- /dev/null +++ b/baselines/evo/run.py @@ -0,0 +1,63 @@ +import numpy as np +from evo.experiment import Experiment +from evo import params + +from qiskit import QuantumCircuit +from qiskit.quantum_info import Statevector, Operator, random_statevector, random_unitary +from qiskit.circuit.library import CCXGate, HGate +from qiskit.converters import circuit_to_dag + +import random + +def ghz(eta): + target = np.zeros(shape=(2**eta,), dtype=np.complex128) + target[0] = target[-1] = 1/np.sqrt(2) + return target + +def SP(individual, target): + individual.qc.remove_final_measurements() + state = Statevector.from_instruction(individual.qc) + return abs(np.vdot(state, target))**2 # Fidelity + +def UC(individual, target): + matrix = Operator(individual.qc) + norm = np.linalg.norm(target - matrix) + return 1 - 2 * np.arctan(norm)/np.pi + +def fitness(mode, delta, penalize=True, operation=None): + def F(individual, target): + R = eval(mode)(individual, target) + d = individual.qc.depth() + C = (max(0, d - delta/3)) / (delta / 2 * 3) + return R-penalize*C + return F + +def run_evo(config, seed): + random.seed(seed); np.random.seed(seed) + mode, goal, eta, delta = config.split('-') + eta, delta = int(eta[1:]), int(delta[1:]) + params.constant_n_qubits = eta + params.init_max_gates = delta + # params.init_max_gates = eta * delta * 2 + 
target = { + 'hadamard': Operator(HGate()), + 'toffoli': Operator(CCXGate()), + 'bell': Statevector(ghz(2)), + 'ghz3': Statevector(ghz(3)), + 'random': { + 'SP': random_statevector((2**eta,), seed), + 'UC': random_unitary((2**eta,), seed), + }[mode], + }[goal] + + experiment = Experiment(target=target, fitness=fitness(mode,delta), metrics={ + 'Return': lambda i: fitness(mode, delta, False)(i, target), + 'Metric': lambda i: fitness(mode, delta, True)(i, target), + 'Cost': lambda i: (max(0, i.qc.depth() - delta/3)) / (delta / 2 * 3), + 'Depth': lambda i: i.qc.depth(), + 'Qubits': lambda i: eta - len(list(circuit_to_dag(i.qc).idle_wires())) + }) + + stats, population = experiment.run_experiments() + mean = {k: np.mean(v) for k,v in stats.items()} + return mean diff --git a/baselines/evo/selection.py b/baselines/evo/selection.py new file mode 100644 index 0000000..30f1fc6 --- /dev/null +++ b/baselines/evo/selection.py @@ -0,0 +1,73 @@ +import random + + +def tournament_selection(population: list, n_to_select: int, tournament_size: int, **kw_args): + """Selects n individuals from the population according to the tournament selection approach used in genetic + algorithms. + + Args: + population (list): Contains all individuals of a population. + n_to_select (int): The number of individuals to select from the population. + tournament_size (int): Size of the subset used in a tournament. + """ + winners = [perform_tournament(population, tournament_size, **kw_args) for _ in range(n_to_select)] + winners.sort(key=lambda x: x.calculate_fitness(**kw_args), reverse=True) + return winners + + +def perform_tournament(population: list, tournament_size: int, **kwargs): + """Randomly selects a subset from a population and returns the best individual. + + Args: + population (list): Contains all individuals of a population. 
+ tournament_size (int): Number of individuals in the subset (tournament) + """ + subset = random.sample(population, k=tournament_size) + subset.sort(key=lambda x: x.calculate_fitness(**kwargs), reverse=True) + return subset[0] + + +def roulette_wheel_selection(population: list, size: int, **kw_args): + """Returns a population according to the roulette wheel selection + approach used in genetic algorithms. + + Args: + population (list): Contains all individuals of a population. + size (int): Size of the returned population + """ + new_population = [] + sum_fitness = sum([ind.calculate_fitness(**kw_args) for ind in population]) + while len(new_population) < size: + for ind in population: + selection_prob = ind.calculate_fitness(**kw_args) / sum_fitness + if random.random() < selection_prob: + new_population.append(ind) + if len(new_population) == size: + break + return new_population + + +def rejuvenate_population(population: list, old_population: list, children: list, n_youngest: list, **kw_args): + """Keeps the n_youngest individuals in the generation. + + Args: + population (list): Contains all individuals of the current population. + old_population (list): Contains all individuals of the last population. + children (list): Contains all individuals that are children of the current population. 
+ n_youngest (int): Number of youngest individuals that should be kept + """ + youngest = population[:-1-n_youngest] + old_population + children + ids = [] + + # remove duplicates + for i in list(youngest): + if i.id in ids: + youngest.remove(i) + else: + ids.append(i.id) + + youngest.sort(key=lambda x: x.generated_in_generation, reverse=True) + np = population[:len(population) - n_youngest] + youngest[:n_youngest] + np.sort(key=lambda x: x.calculate_fitness(**kw_args), reverse=True) + return np + diff --git a/circuit_designer/__init__.py b/circuit_designer/__init__.py deleted file mode 100644 index 976a98e..0000000 --- a/circuit_designer/__init__.py +++ /dev/null @@ -1,4 +0,0 @@ -from gymnasium.envs.registration import register - -def register_envs(): - register(id="CircuitDesigner-v0", entry_point="circuit_designer.env:CircuitDesigner") \ No newline at end of file diff --git a/circuit_designer/env/__init__.py b/circuit_designer/env/__init__.py deleted file mode 100644 index 764a4b6..0000000 --- a/circuit_designer/env/__init__.py +++ /dev/null @@ -1 +0,0 @@ -from circuit_designer.env.environment import CircuitDesigner diff --git a/circuit_designer/env/environment.py b/circuit_designer/env/environment.py deleted file mode 100644 index b4998bb..0000000 --- a/circuit_designer/env/environment.py +++ /dev/null @@ -1,139 +0,0 @@ -import gymnasium as gym; import pennylane as qml -import numpy as np; import re - -# disable warnings -import warnings -warnings.simplefilter(action='ignore', category=np.ComplexWarning) - -from .rewards import Reward - -# Resolution of the parameter disrectization -GATES = 3 - -class CircuitDesigner(gym.Env): - """ Quantum Circuit Environment: build a quantum circuit gate-by-gate for a desired challenge. 
- - Attributes - qubits [int]: number of available qubits for quantum circuit - depth [int]: maximum depth desired for quantum circuit - challenge [str]: RL challenge for which the circuit is to be built (see Reward class) - punish [bool]: specifies whether depth of circuit should be punished - device [qml.device]: quantum device to use (see PennyLane) - - action_space [gymnasium.spaces]: action space consisting of (gate: Box (int), target qubit: Box (int), params: Box (float)) - observation_space [gymnasium.spaces]: complex observation of state in the computational basis as a Box with [real, imag] - - Methods - reset(): resets the circuit to initial state of |0>^n with empty list of operations - step(action): updates environment for given action, returning observation and reward after that action - """ - - metadata = {"render_modes": ["image","text"], "render_fps": 30} - - def __init__(self, max_qubits: int, max_depth: int, challenge: str, punish=True, seed=None, render_mode=None, verbose=False): - super().__init__() - if seed is not None: self._np_random, seed = gym.utils.seeding.np_random(seed) - self.render_mode = render_mode; self.verbose = verbose; self.name = f"{challenge}|{max_qubits}-{max_depth}" - - # define parameters - self.qubits = max_qubits # the (maximal) number of available qubits - self.depth = max_depth # the (maximal) available circuit depth - self.max_steps = max_depth * max_qubits * 2 - self.challenge = challenge # challenge for reward computation - task = re.split("-", self.challenge)[0] - if task not in Reward.challenges: - raise ValueError(f'desired challenge {task} is not defined in this class.' - f'See attribute "challenges" for a list of available challenges') - - if 'toffoli' in challenge: assert self.qubits >= 3, "to build Toffoli gate you need at least three wires/qubits." - if 'ghz' in challenge: - n = int(challenge[-1:]) - assert n >= 2, "GHZ entangled state must have at least 2 qubits. 
" \ - "\n For N=2: GHZ state is equal to Bell state." - assert n <= self.qubits, "Target GHZ state cannot consist of more qubits " \ - "than are available within the circuit environment." - - # initialize quantum device to use for QNode (add one ancilla) - self.device = qml.device('lightning.qubit', wires=self.qubits) # default.qubit - - # Action space: Gate, Wire, Control, and Theta - m = 1e-5 # prevent gate overflow at bounds due to floor operator - self.action_space = gym.spaces.Box(np.array([0,0,0,-np.pi]), np.array([GATES-m,self.qubits-m,self.qubits-m,np.pi])) - - # define observation space - self.observation_space = gym.spaces.Box(low=-1.0, high=+1.0, shape=(2*2**max_qubits,)) #, type=np.float64 - - # initialize reward class - self.reward = Reward(self.qubits, self.depth) - self.punish = punish - - def _action_to_operation(self, action): - """ Action Converter translating values from action_space into quantum operations """ - gate, wire, cntrl, theta = action - gate, wire, cntrl = np.floor([gate, wire, cntrl]).astype(int) - # print(f"Applying {['NOOP', 'CRZ', 'CRX'][gate]}({theta/np.pi}π) to {gate}•{cntrl}") - assert wire in range(self.qubits) and cntrl in range(self.qubits), f"{action}" - - if wire in self._disabled: return None # check if wire is already disabled - if gate == 0: self._disabled.append(wire); return int(wire) # Meassurement - if wire == cntrl and gate == 1: return qml.PhaseShift(theta,wire) # PhaseShift - if wire == cntrl and gate == 2: return qml.RX(theta,wire) # RX - if cntrl in self._disabled: return None # check if control qubit already disabled - if gate == 1: return qml.ControlledPhaseShift(theta, [cntrl, wire]) # ControlledPhaseShift - if gate == 2: return qml.CNOT([cntrl, wire]) # CNOT - assert False, 'Unhandled Action on gate ' + gate - - def _build_circuit(self): - """ Quantum Circuit Function taking a list of quantum operations and returning state information """ - for op in self._operations: - if op is None: pass - elif type(op) 
== int: qml.measure(op); - else: qml.apply(op); - return qml.state() - - def _draw_circuit(self) -> np.ndarray: - """ Drawing given circuit using matplotlib.""" - circuit = qml.QNode(self._build_circuit, self.device) - return qml.draw(circuit)() - - def _get_state(self) -> tuple[np.ndarray,qml.QNode]: - """ Calculate zero-state information """ - node = qml.QNode(self._build_circuit, self.device) - state = node()#[int(2**(self.qubits+1)/2):] - observation = np.concatenate([state.real, state.imag]).astype(np.float32) - return observation, node #information - - def reset(self, seed=None, options=None): - super().reset(seed=seed) # Set seed for random number generator - self._operations = [] # Reset trajectory of operations - self._disabled = [] # Rest list of measured qubits - state, node = self._get_state() # Calculate state get node - return state, qml.specs(node)() - - def step(self, action): - terminated = action[0] == 3 or len(self._disabled) >= self.qubits - if not terminated: # conduct action & update action trajectory - operation = self._action_to_operation(action) - self._operations.append(operation) - - state, node = self._get_state(); info = qml.specs(node)() - terminated = action[0] == 3 or len(self._disabled) >= self.qubits - truncated = info["resources"].depth >= self.depth or len(self._operations) >= self.max_steps - - if self.verbose: - if isinstance(operation,int): print(f"Measure {operation}") - else: print(str(operation)+"\n"+self.render() if self.render_mode is not None else '') - print("Terminate\n" if terminated else "\n") - - if terminated: info['termination_reason'] = 'DONE' - if truncated: info['termination_reason'] = 'DEPTH' - - # sparse reward computation - if not terminated and not truncated: reward = 0 - else: reward = self.reward.compute_reward(node, self.challenge, self.punish) - - return state, reward, terminated, truncated, info - - def render(self): - if self.render_mode is None: return None - if self.render_mode == 'text': return 
self._draw_circuit() diff --git a/circuit_designer/env/rewards.py b/circuit_designer/env/rewards.py deleted file mode 100644 index f55d1de..0000000 --- a/circuit_designer/env/rewards.py +++ /dev/null @@ -1,79 +0,0 @@ -import numpy as np -import re -import pennylane as qml -from scipy.stats import unitary_group - - -class Reward: - """ Reward class for CircuitDesigner environment: - computes the reward for all available challenges. - - Attributes - qubits [int]: number of available qubits for quantum circuit - depth [int]: maximum depth desired for quantum circuit """ - - # list of available challenges: - challenges = ['SP', 'UC'] - states = ['random', 'bell', 'ghzN #N:number of qubits'] - unitaries = ['random', 'hadamard', 'toffoli'] - - def __init__(self, max_qubit, max_depth): - self.depth = max_depth - self.qubits = max_qubit - - # draw random unitary matrix - self.random_op = unitary_group.rvs(2**self.qubits) - # draw random Haar state - self.random_state = np.random.normal(size=(2**self.qubits,)) + 1.j * np.random.normal(size=(2**self.qubits,)) - self.random_state /= np.linalg.norm(self.random_state) - - def compute_reward(self, circuit, challenge, punish): - """ Wrapper function mapping challenge to corresponding reward function. """ - task, param = re.split("-", challenge) - if task == 'SP': reward = self._state_preparation(circuit, param) # StatePreparation - elif task == 'UC': reward = self._unitary_composition(circuit, param) # Unitary Composition - if punish: reward -= (max(0,qml.specs(circuit)()["resources"].depth - self.depth/3)) / (self.depth / 2 * 3) # 1/3 deph overhead to solution - return reward - - # REWARD FUNCTIONS: - def _state_preparation(self, circuit, param): - """ Compute Reward for State Preparation (SP) task - = fidelity of the state produced by circuit compared to a given target state defined by param. 
""" - # compute output state of designed circuit - state = np.array(circuit()) - # state = state[:int(state.shape[0]/2)] - # define target state based on param-string - if param == 'random': target = self.random_state - elif param == 'bell': target = np.array([1/np.sqrt(2), 0, 0, 1/np.sqrt(2)], dtype=np.complex128) - elif param[:3] == 'ghz': # n-qubit GHZ State - n = int(param[-1:]) - target = np.zeros(shape=(2**n,), dtype=np.complex128) - target[0] = target[-1] = 1/np.sqrt(2) - else: raise ValueError(f'desired target state {param} is not defined in this reward function.' - f'See attribute "states" for a list of available states.') - - # compute fidelity between target and output state within [0,1] - fidelity = abs(np.vdot(state, target))**2 - return fidelity - - def _unitary_composition(self, circuit, param): - """ Compute Reward for Unitary Composition (UC) task - = 1 - 2* arctan(norm(U_composed - U_target)) / pi with U_target defined by param. """ - # compute matrix representation of designed circuit - if qml.specs(circuit)()["resources"].num_gates == 0: return 0 - order = list(range(self.qubits)) - matrix = qml.matrix(circuit, wire_order=order)().astype(np.complex128) - # compute Frobenius norm of difference between target and output matrix - if param == 'random': target = self.random_op - elif param == 'hadamard': target = qml.matrix(qml.Hadamard(0), wire_order=order) - elif param == 'toffoli': target = qml.matrix(qml.Toffoli([0, 1, 2]), wire_order=order) - else: raise ValueError(f'desired target unitary {param} is not defined in this reward function.' 
- f'See attribute "unitaries" for a list of available operations.') - norm = np.linalg.norm(target - matrix) - return 1 - 2*np.arctan(norm)/np.pi - - @staticmethod - def _state_transform(state): - n = int(np.log2(state.shape[0])) - qml.QubitStateVector(state, wires=range(n)) - return qml.state() diff --git a/circuit_designer/test/__main__.py b/circuit_designer/test/__main__.py deleted file mode 100644 index b4be5e2..0000000 --- a/circuit_designer/test/__main__.py +++ /dev/null @@ -1,12 +0,0 @@ -from circuit_designer.test.bell import bell -from circuit_designer.test.ghz import ghz -from circuit_designer.test.hadamard import hadamard -from circuit_designer.test.toffoli import toffoli - -# State Preparation -bell() -ghz() - -# Unitary Composition -hadamard() -toffoli() \ No newline at end of file diff --git a/circuit_designer/test/ghz.py b/circuit_designer/test/ghz.py deleted file mode 100644 index 5302027..0000000 --- a/circuit_designer/test/ghz.py +++ /dev/null @@ -1,20 +0,0 @@ -import gymnasium as gym; import numpy as np - -def ghz(): - env = gym.make("CircuitDesigner-v0", max_qubits=3, max_depth=15, challenge='SP-ghz3') - env.reset() - - # H - env.step([1,0,0,np.pi/2]); env.step([2,0,0,np.pi/2]); env.step([1,0,0,np.pi/2]) - - # CX - env.step([2,1,0,np.pi]); env.step([2,2,1,np.pi]) - - # M - env.step([0,0,0,0]); env.step([0,1,0,0]) - reward = env.step([0,2,0,0])[1] - - np.testing.assert_almost_equal(reward, 1) - print("Succeeded GHZ test") - - diff --git a/circuit_designer/test/hadamard.py b/circuit_designer/test/hadamard.py deleted file mode 100644 index 62ece8f..0000000 --- a/circuit_designer/test/hadamard.py +++ /dev/null @@ -1,23 +0,0 @@ -import gymnasium as gym; import numpy as np - -def hadamard(): - # Test 1-qubit H - env = gym.make("CircuitDesigner-v0", max_qubits=1, max_depth=9, challenge='UC-hadamard') - env.reset() - env.step([1,0,0,3]) - env.step([2,0,0,3]) - env.step([1,0,0,3]) - reward = env.step([0,0,0,0])[1] # Meassure - # assert reward == 1 - - # 
Test 2-qubit H - env = gym.make("CircuitDesigner-v0", max_qubits=2, max_depth=9, challenge='UC-hadamard') - - env.reset() - env.step([1,0,0,np.pi/2]) - env.step([2,0,0,np.pi/2]) - env.step([1,0,0,np.pi/2]) - reward = env.step([0,0,0,0])[1] # Meassure - reward = env.step([0,1,0,0])[1] # Meassure - np.testing.assert_almost_equal(reward, 1) - print("Succeeded Hadamard test") diff --git a/circuit_designer/test/toffoli.py b/circuit_designer/test/toffoli.py deleted file mode 100644 index fe271fd..0000000 --- a/circuit_designer/test/toffoli.py +++ /dev/null @@ -1,33 +0,0 @@ -import gymnasium as gym; import numpy as np - -def toffoli(): - env = gym.make("CircuitDesigner-v0", max_qubits=3, max_depth=63, challenge='UC-toffoli') - env.reset() - - # V - env.step([1,2,2,np.pi/2]); env.step([2,2,2,np.pi/2]); env.step([1,2,2,np.pi/2]) - env.step([1,2,1,np.pi/2]) - env.step([1,2,2,np.pi/2]); env.step([2,2,2,np.pi/2]); env.step([1,2,2,np.pi/2]) - - # Cnot - env.step([2,1,0,np.pi/2]) - - # V- - env.step([1,2,2,np.pi/2]); env.step([2,2,2,np.pi/2]); env.step([1,2,2,np.pi/2]) - env.step([1,2,1,-np.pi/2]) - env.step([1,2,2,np.pi/2]); env.step([2,2,2,np.pi/2]); env.step([1,2,2,np.pi/2]) - - # Cnot - env.step([2,1,0,np.pi/2]) - - env.step([1,2,2,np.pi/2]); env.step([2,2,2,np.pi/2]); env.step([1,2,2,np.pi/2]) - env.step([1,2,0,np.pi/2]) - env.step([1,2,2,np.pi/2]); env.step([2,2,2,np.pi/2]); env.step([1,2,2,np.pi/2]) - # CZ pi/2 - - # Meassure - env.step([0,0,0,0]) - env.step([0,1,0,0]) - reward = env.step([0,2,0,0])[1] - np.testing.assert_almost_equal(reward, 1) - print("Succeeded Toffoli test") diff --git a/circuit_designer/wrappers/__init__.py b/circuit_designer/wrappers/__init__.py deleted file mode 100644 index 132b50d..0000000 --- a/circuit_designer/wrappers/__init__.py +++ /dev/null @@ -1 +0,0 @@ -from circuit_designer.wrappers.monitor import Monitor \ No newline at end of file diff --git a/plot/__main__.py b/plot/__main__.py index ca2d2f7..55886b4 100644 --- a/plot/__main__.py +++ 
b/plot/__main__.py @@ -7,22 +7,24 @@ 'Return': (('Return', 'rewards/return-100-mean'), process_ci, plot_ci), 'Qubits': (('Qubits', 'rewards/qbits-100-mean'), process_ci, plot_ci), 'Depth': (('Depth', 'rewards/depth-100-mean'), process_ci, plot_ci), + 'Metric': (('Metric', 'rewards/metric-100-mean'), process_ci, plot_ci), + 'Cost': (('Cost', 'rewards/cost-100-mean'), process_ci, plot_ci), } # Process commandline arguments parser = argparse.ArgumentParser() parser.add_argument('base', help='The results root') parser.add_argument('-a', dest='alg', nargs='+', help='The algorithm to vizualise') #choices=[*ALGS] -parser.add_argument('-r', dest='random_baseline', action='store_true') +parser.add_argument('-b', dest='baseline', action='store_true') parser.add_argument('-e', dest='env', help='Environment to vizualise.') parser.add_argument('-g', dest='groupby', nargs='+', default=['env'], metavar="groupby", help='Experiment keys to group plotted data by.') -parser.add_argument('-m', dest='metrics', nargs='+', default=['Return', 'Qubits', 'Depth'], choices=options.keys(), help='Experiment keys to group plotted data by.') +parser.add_argument('-m', dest='metrics', nargs='+', default=['Qubits', 'Depth', 'Metric'], choices=options.keys(), help='Experiment keys to group plotted data by.') parser.add_argument('--mergeon', help='Key to merge experiments e.g. 
algorithm.') parser.add_argument('--no-dump', dest='dump_csv', action='store_false', help='Skip csv dump') args = vars(parser.parse_args()); tryint = lambda s: int(s) if s.isdigit() else s if args['alg']: args['alg'] = ' '.join(args['alg']) -groupby = args.pop('groupby'); mergeon = args.pop('mergeon'); random_baseline = args.pop('random_baseline') +groupby = args.pop('groupby'); mergeon = args.pop('mergeon'); baseline = args.pop('baseline') mergemetrics,_ = (True, groupby.remove('metrics')) if 'metrics' in groupby else (False,None) metrics = [(metric, *options[metric]) for metric in args.pop('metrics')] @@ -30,7 +32,10 @@ experiments = fetch_experiments(**args, metrics=list(zip(titles, scalars))) experiments = group_experiments(experiments, groupby, mergeon) -if random_baseline: [fetch_random(args['base'], exp) for exp in experiments] +if baseline: + [fetch_evo(args['base'], exp, dump=args['dump_csv']) for exp in experiments] + [fetch_random(args['base'], exp, dump=args['dump_csv']) for exp in experiments] + experiments = calculate_metrics(experiments, list(zip(titles, procs))) if mergemetrics: experiments = [ {'title': t, 'metric': metrics[0][0], 'merge': True, diff --git a/plot/metrics.py b/plot/metrics.py index 97667c7..3cd3c73 100644 --- a/plot/metrics.py +++ b/plot/metrics.py @@ -1,19 +1,13 @@ import os; from os import path; import itertools; from tqdm import tqdm from tensorboard.backend.event_processing.event_accumulator import EventAccumulator as EA import pandas as pd; import numpy as np; import scipy.stats as st; import re -import gymnasium as gym; from algorithm.factory import named -from circuit_designer.wrappers import Monitor +import gymnasium as gym; from algorithm.algorithm import named +from qcd_gym.wrappers import Monitor from baselines import * -# TODO: acc pretrain scores def extract_model(exp, run): return None - if '-' in exp['algorithm']: explorer,exp['algorithm'] = exp['algorithm'].split('-') - algorithm, seed = eval(exp['algorithm']), 
int(run.name) - # TODO: load explorer if not in ['Random', 'LOAD'] - model = algorithm.load(load=run.path, seed=seed, envs=[exp['env']], path=None, device='cpu') - return model def fetch_experiments(base='./results', alg=None, env=None, metrics=[], dump_csv=False, baseline=None, random_baseline=True): """Loads and structures all tb log files Given: @@ -21,7 +15,7 @@ def fetch_experiments(base='./results', alg=None, env=None, metrics=[], dump_csv :param env (optional): the environment to load :param alg (optional): the algorithm to load :param metrics: list of (Name, Tag) tuples of metrics to load - :param save_csv: save loaded experiments to csv + :param dump_csv: save loaded experiments to csv Returns: list with dicts of experiments """ # Helper to fetch all relevant folders @@ -33,16 +27,12 @@ def fetch_experiments(base='./results', alg=None, env=None, metrics=[], dump_csv print(f"Scanning for {alg if alg else 'algorithms'} in {base}") # Second layer: Algorithms if alg: experiments = [{**exp, 'algorithm': alg, 'path': a} for exp in tqdm(experiments) for a in subdirs(exp['path']) if alg == a.name] - else: experiments = [{**exp, 'algorithm': a.name, 'path': a} for exp in tqdm(experiments) for a in subdirs(exp['path']) if any([n in ALGS for n in a.name.split('-')])] - - # Split explorer: - # experiments = [{**e, 'algorithm': e['algorithm'].split('-')[-1], 'explorer': e['algorithm'].split('-')[0] if len(e['algorithm'].split('-'))>1 else 'Random'} for e in tqdm(experiments)] + else: experiments = [{**exp, 'algorithm': a.name, 'path': a} for exp in tqdm(experiments) for a in subdirs(exp['path']) if any([n in [*ALGS] for n in a.name.split('-')])] # Third Layer: Count Runs / fetch tb files print(f"Scanning for hyperparameters in {base}") # Third layer: Hyperparameters & number of runs experiments = [{ **e, 'runs': len(subdirs(e['path'])) } for e in tqdm(experiments) if os.path.isdir(e['path'])] - # experiments = [{ **exp, 'path': e.path, 'method': e.name, 'runs': 
len(subdirs(e)) } for exp in tqdm(experiments) if os.path.isdir(exp['path']) for e in subdirs(exp['path'])] # With hp progressbar = tqdm(total=sum([exp['runs'] for exp in experiments])* len(metrics)) data_buffer = {} @@ -54,9 +44,15 @@ def fetch_data(exp, run_path, name, key): # Use buffered Event Accumulator if already open if log := data_buffer.get(run_path): extract_args = {'columns': ['Time', 'Step', 'Data'], 'index': 'Step', 'exclude': ['Time']} - # print(log.scalars.Keys()) - data = pd.DataFrame.from_records([(s.wall_time, s.step, s.value) for s in log.Scalars(key)], **extract_args) - # data = pd.DataFrame.from_records(log.Scalars(key), **extract_args) + if key == 'auto': + d = int(exp['env'].split('-d')[1]) + data = fetch_data(exp, run_path, 'Return', 'rewards/return-100-mean') + depth = fetch_data(exp, run_path, 'Depth', 'rewards/depth-100-mean') + pen = depth['Data'].apply(lambda D: max(0,D-d/3)/(d/2*3)) + if name == 'Metric': data['Data'] += pen + if name == 'Cost': data['Data'] = pen + else: + data = pd.DataFrame.from_records([(s.wall_time, s.step, s.value) for s in log.Scalars(key)], **extract_args) data = data.loc[~data.index.duplicated(keep='first')] # Remove duplicate indexes if dump_csv: data.to_csv(f'{run_path}/{name}.csv') return data @@ -83,7 +79,6 @@ def extract_data(exp, run, name, key): return experiments -# def group_experiments(experiments, groupby=['algorithm', 'env'], mergeon=None): #merge=None def group_experiments(experiments, groupby=['env'], mergeon=None): #merge=None # Graphical helpers for titles, labels forms = ['algorithm', 'env'] @@ -91,7 +86,6 @@ def label(exp): i = {key: re.sub(r'[0-9]+ ', '', exp[key]) for key in forms if key in exp and key not in groupby} check = lambda keys,base,op=all: op([k in base for k in keys]) return f"{i['algorithm']}-{i['explorer']}" if 'explorer' in i else i['algorithm'] - # return f"{i['algorithm'] if check(['algorithm', 'method'],i) and check(['Full','RAD'],i['method'], any) else ''} {'FO' if 
check(['Full'],i['method']) else i['method']}" title = lambda exp: ' '.join([exp[key] for key in forms if key in exp and key in groupby]) @@ -118,9 +112,6 @@ def calculate_metrics(plots, metrics): """ def process(metric, proc, plot): graphs = [ { **graph, 'data': proc(graph['data'][metric], graph['models']) } for graph in plot['graphs']] - if metric == 'Heatmap': - return [ { 'title': f"{plot['title']} | {graph['label']} | {key} ", 'data': data, 'metric': metric} - for graph in graphs for key, data in graph['data'].items() ] return [{ **plot, 'graphs': graphs, 'metric': metric}] return [ result for metric in metrics for plot in plots for result in process(*metric, plot)] @@ -131,7 +122,7 @@ def process_ci(data, models): reward_range = (0,1) # Prepare Data (fill until highest index) steps = [d.index[-1] for d in data]; maxsteps = np.max(steps) - for d in data: d.at[maxsteps, 'Data'] = float(d.tail(1)['Data']) + for d in data: d.at[maxsteps, 'Data'] = float(d.tail(1)['Data'].iloc[0]) data = pd.concat(data, axis=1, ignore_index=False, sort=True).bfill() # Mean 1..n | CI 1..n..1 @@ -142,15 +133,37 @@ def process_ci(data, models): def process_steps(data, models): return ([d.index[-1] for d in data], 10e5) -iterate = lambda model, envs, func: [ func(env, k,i) for env in envs for k,i in model.heatmap_iterations.items() ] -heatmap = lambda model, envs: iterate(model, envs, lambda env, k,i: env.envs[0].iterate(i[0])) +def fetch_evo(base, experiment, EPS=100, dump=False, load=True): + out = f"{base}/{experiment['title']}/GA" + if not os.path.exists(out) or not load: + print(f"Running Evo Baseline for {experiment['title']}") + data = {m: [] for m in experiment['graphs'][0]['data'].keys()} + steps = [i for g in experiment['graphs'] for i in g['data'][list(data.keys())[0]][0].index]; + index = [np.min(steps), np.max(steps)] + for s in range(8): + [d.append([]) for d in data.values()] + info = run_evo(experiment['title'], s+1) + for key,val in data.items(): + val[-1] = 
pd.DataFrame([info[key]]*2, index=index, columns=['Data']).rename_axis(index='Step') + + if dump: + os.makedirs(f"{out}", exist_ok=True) + [pd.concat(metric, axis=1).to_csv(f"{out}/{key}.csv") for key, metric in data.items()] + + else: data = { m: [ + c.to_frame('Data') for _,c in pd.read_csv(f"{out}/{m}.csv").set_index('Step').items() + ] for m in experiment['graphs'][0]['data'].keys()} + + experiment['graphs'].append({'label': 'GA', 'models': [None], 'data': data}) + -def fetch_random(base, experiment, EPS=100): - if not os.path.exists(f"{base}/{experiment['title']}/Random"): +def fetch_random(base, experiment, EPS=100, dump=False, load=True): + out = f"{base}/{experiment['title']}/Random" + if not os.path.exists(out) or not load: print(f"Running Random Baseline for {experiment['title']}") - M = {'Return': 'r', 'Depth': 'd', 'Qubits': 'q'} - env = Monitor(gym.make(**named(experiment['title']), seed=42, discrete=False)) + M = {'Return': 'r', 'Depth': 'd', 'Qubits': 'q', 'Metric': 'm', 'Cost': 'c'} + env = Monitor(gym.make(**named(experiment['title']), seed=42)) #, discrete=False data = {m: [] for m in experiment['graphs'][0]['data'].keys()} steps = [i for g in experiment['graphs'] for i in g['data'][list(data.keys())[0]][0].index]; index = [np.min(steps), np.max(steps)] @@ -160,6 +173,14 @@ def fetch_random(base, experiment, EPS=100): env.reset(); terminated = False; truncated = False while not (terminated or truncated): _, _, terminated, truncated, info = env.step(env.action_space.sample()) [val[-1].append(info['episode'][M[key]]) for key,val in data.items()] - for val in data.values(): val[-1] = pd.DataFrame([sum(val[-1])/EPS]*2, index=index, columns=['Data']) - experiment['graphs'].append({'label': 'Random', 'models': [None], 'data': data}) - else: assert False, "TODO: load random baseline" + for val in data.values(): val[-1] = pd.DataFrame([sum(val[-1])/EPS]*2, index=index, columns=['Data']).rename_axis(index='Step') + if dump: + os.makedirs(f"{out}", 
exist_ok=True) + [pd.concat(metric, axis=1).to_csv(f"{out}/{key}.csv") for key, metric in data.items()] + + else: data = { m: [ + c.to_frame('Data') for _,c in pd.read_csv(f"{out}/{m}.csv").set_index('Step').items() + ] for m in experiment['graphs'][0]['data'].keys()} + + experiment['graphs'].append({'label': 'Random', 'models': [None], 'data': data}) + \ No newline at end of file diff --git a/plot/plotting.py b/plot/plotting.py index e8d1dcb..481ec87 100644 --- a/plot/plotting.py +++ b/plot/plotting.py @@ -15,36 +15,30 @@ def plot_ci(plot): dash = lambda g: {'dash': 'dash'} if 'Random' in g['label'] else {} getmean = lambda g: scatter(g['data'][0], name=g['label'], mode='lines', line={'color': color(g), **smooth(g), **dash(g)}) getconf = lambda g: scatter(g['data'][1], fillcolor=color(g, 1), fill='toself', line={'color': 'rgba(255,255,255,0)', **smooth(g)}, showlegend=False) - # threshold = [go.Scatter(y=[plot['graphs'][0]['data'][2][1]]*2, x=[0,max([g['data'][0].tail(1).index[0] for g in plot['graphs']])], - # name='Solved', mode='lines', line={'dash':'dot', 'color':'rgb(64, 64, 64)'})] #Threshold data = [getconf(g) for g in plot['graphs']] + [getmean(g) for g in plot['graphs']] #+ threshold - # if not plot['graphs'][0]['models'][0].continue_training: data += threshold #TODO: check for any graph/model - figure = go.Figure(layout=layout( y=f'Mean {plot["metric"]}', x='Steps', legend=True, inset=len(data)<18), data=data) + metric = ('Fidelity' if 'SP' in plot['title'] else 'Similarity') if plot['metric'] == "Metric" else plot['metric'] + figure = go.Figure(layout=layout( y=f'Mean {metric}', x='Steps', legend=False, inset=False), data=data) xmax = int(math.floor(max([g['data'][0].index[-1] for g in plot['graphs']])/10))*10 ymax = 100; dtick = 1 if plot['metric'] == "Return": ymax = 1 ; dtick = 0.1 + if plot['metric'] == "Metric": ymax = 1 ; dtick = 0.1 + if plot['metric'] == "Cost": ymax = 1 ; dtick = 0.1 if plot['metric'] == "Qubits": ymax = qb(plot['title']) if 
plot['metric'] == "Depth": ymax = dp(plot['title']) if ymax > 8: dtick = 2 if ymax > 16: dtick = 8 figure.update_yaxes(range=[0, ymax], tickmode = 'linear', dtick=dtick) figure.update_xaxes(range=[2048*4*4, 128*(2048*4)]) - - # if plot['graphs'][0]['models'][0].continue_training: figure.update_xaxes(range=[0, xmax], tickmode = 'linear', dtick = 50000) return {' '.join(title(plot)): figure} -def get_heatmap(compress=False, deterministic=False,flat=True): - # def plot_heatmap(plot): return {f'Heatmaps/{plot["title"]}': - # heatmap_3D(plot['data'], compress=compress, deterministic=deterministic, flat=flat)} - # return plot_heatmap - raise(NotImplementedError) - def color(graph, dim=0): # TODO grey for random? - if 'Random' in graph['label']: return 'hsva(0,0%,{}%,{:.2f})'.format(20+dim*60, 1.0-dim*0.8) + if any(s in graph['label'] for s in ['Random', 'GA']): + return 'hsva(0,0%,{}%,{:.2f})'.format(20+dim*60, 1.0-dim*0.8) + # if 'Random' in graph['label']: return 'hsva(0,0%,{}%,{:.2f})'.format(20+dim*60, 1.0-dim*0.8) hue = { 'A2C': 40, # Orange - # '': 70, # Yellow + '': 70, # Yellow 'PPO': 200, # Light Blue # '': 230, # Blue 'SAC': 150, # Green @@ -55,10 +49,13 @@ def color(graph, dim=0): def layout(title=None, legend=True, wide=False, x='', y='', inset=False): d,m,l = 'rgb(64, 64, 64)', 'rgba(64, 64, 64, 0.32)', 'rgba(64,64,64,0.04)' axis = lambda title: {'gridcolor': m, 'linecolor': d, 'title': title, 'mirror':True, 'ticks':'outside', 'showline':True, 'zeroline': True, 'zerolinecolor': m} + lgd = {'bgcolor':l,'bordercolor':d,'borderwidth':1} + if inset: lgd = {**lgd, 'yanchor':'top', 'y':0.935, 'xanchor':'left', 'x':0.01} + else: lgd = {**lgd, 'yanchor':'bottom', 'y':1, 'orientation': 'h'} + width = 700 #600+200*wide+100*legend - return go.Layout( title=title, showlegend=legend, font=dict(size=20), - legend={'yanchor':'top', 'y':0.935, 'xanchor':'left', 'x':0.01,'bgcolor':l,'bordercolor':d,'borderwidth':1} if inset else {}, - margin=dict(l=8, r=8, t=8+(72 * (title 
is not None)), b=8), width=600+200*wide+100*legend, height=400, + return go.Layout( title=title, showlegend=legend, font=dict(size=20), legend=lgd, + margin=dict(l=8, r=8, t=8+(72 * (title is not None)), b=8), width=width, height=400, xaxis=axis(x), yaxis=axis(y), plot_bgcolor=l) #, paper_bgcolor='rgba(0,0,0,0)', def generate_figures(plots, generator): return { k:v for p in plots for k,v in generator[p['metric']](p).items()} diff --git a/qcd_gym/__init__.py b/qcd_gym/__init__.py new file mode 100644 index 0000000..dadd648 --- /dev/null +++ b/qcd_gym/__init__.py @@ -0,0 +1,5 @@ +from gymnasium.envs.registration import register, registry + +def register_envs(): + if "CircuitDesigner-v0" in registry: return + register(id="CircuitDesigner-v0", entry_point="qcd_gym.env:CircuitDesigner") \ No newline at end of file diff --git a/qcd_gym/env.py b/qcd_gym/env.py new file mode 100644 index 0000000..e8848fa --- /dev/null +++ b/qcd_gym/env.py @@ -0,0 +1,141 @@ +import numpy as np; import gymnasium as gym + +from qiskit import QuantumCircuit +from qiskit.circuit.library import PhaseGate, RXGate, CPhaseGate, CXGate, CCXGate, HGate +from qiskit.quantum_info import Statevector, Operator, random_statevector, random_unitary +from qiskit.converters import circuit_to_dag + +import warnings; warnings.simplefilter(action='ignore', category=np.ComplexWarning) + +flat = lambda s: np.concatenate([s.data.real, s.data.imag]).astype(np.float32).flatten() +GATES = 3 + +class CircuitDesigner(gym.Env): + """ Quantum Circuit Environment: build a quantum circuit gate-by-gate for a desired objective. 
+ + Attributes + qubits [int]: number of available qubits for quantum circuit + depth [int]: maximum depth desired for quantum circuit + objective [str]: RL objective for which the circuit is to be built (see Reward class) + punish [bool]: specifies whether depth of circuit should be punished + + Methods + reset(): resets the circuit to initial state of |0>^n with empty list of operations + step(action): updates environment for given action, returning observation and reward after that action + """ + + metadata = {"render_modes": ["image","text"], "render_fps": 30} + + def __init__(self, max_qubits: int, max_depth: int, objective: str, + punish=True, sparse=True, seed=None, render_mode=None): + super().__init__() + if seed is not None: self._np_random, seed = gym.utils.seeding.np_random(seed) + self.render_mode = render_mode; self.name = f"{objective}|{max_qubits}-{max_depth}" + + # define parameters, the (maximal) number of available qubits and circuit depth + self.qubits, self.depth = max_qubits, max_depth + self.max_steps = max_depth * max_qubits * 2 + self.punish = punish; self.sparse = sparse + self.objective = objective # objective for reward computation + self.target = self._target(*objective.split('-'), seed) + self._qc = QuantumCircuit(self.qubits) + + # Define observation space + self.observation_space = gym.spaces.Box(low=-1.0, high=+1.0, shape=self._state[0].shape) + + # Action space: Gate, Wire, Control, and Theta + m = 1e-5 # prevent gate overflow at bounds due to floor operator + self.action_space = gym.spaces.Box( + np.array([0,0,0,-np.pi]), np.array([GATES-m,self.qubits-m,self.qubits-m,np.pi])) + + + def _target(self, task, target, seed): + if task == 'SP': + if target == 'random': return random_statevector(2**self.qubits, seed) + if target == 'bell': return Statevector(np.array([1/np.sqrt(2), 0, 0, 1/np.sqrt(2)], dtype=np.complex128)) + if 'ghz' in target: + n = int(target[-1]) + assert 2 <= n <= self.qubits, f"GHZ entangled state must have at 
least 2 and at most {self.qubits} qubits." + target = np.zeros(shape=(2**n,), dtype=np.complex128) + target[0] = target[-1] = 1/np.sqrt(2) + return Statevector(target) + if task == 'UC': + if target == 'random': return random_unitary(2**self.qubits, seed) + _t = QuantumCircuit(self.qubits) + if target == 'hadamard': _t.append(HGate(),[0]) + if target == 'toffoli': + assert self.qubits >= 3, "to build Toffoli gate you need at least three wires/qubits." + _t.append(CCXGate(),[0,1,2]) + return Operator(_t) + assert False, f'{task}-{target} not defined.' + + + @property + def _operations(self): return sum([v for k,v in self._qc.count_ops().items()]) + + @property + def _used_wires(self): return self.qubits - len(list(circuit_to_dag(self._qc).idle_wires())) + + + @property + def _state(self): + """ Calculate zero-state information """ + if 'UC' in self.objective: state = flat(Operator(self._qc)) + if 'SP' in self.objective: state = flat(Statevector.from_instruction(self._qc)) + observation = np.concatenate([state, flat(self.target)]) + info = {'depth': self._qc.depth(), 'operations': self._operations, 'used_wires': self._used_wires} + return observation, info + + + def _operation(self, action): + """ Action Converter translating values from action_space into quantum operations """ + gate, wire, cntrl, theta = action + gate, wire, cntrl = np.floor([gate, wire, cntrl]).astype(int) + assert wire in range(self.qubits) and cntrl in range(self.qubits), f"{action}" + if wire == cntrl and gate == 0: return PhaseGate(theta), [wire] # PhaseShift + if wire == cntrl and gate == 1: return RXGate(theta), [wire] # RX + if gate == 0: return CPhaseGate(theta), [cntrl, wire] # ControlledPhaseShift + if gate == 1: return CXGate(), [cntrl, wire] # CNOT + if gate == 2: return None # Terminate + assert False, 'Unhandled Action on gate ' + gate + + def _reward_delta(self, reward, cost): + reward_delta, cost_delta = reward - self.last_reward, cost - self.last_cost + self.last_reward = reward; 
self.last_cost = cost; return reward_delta, cost_delta + + @property + def _reward(self): + if 'SP' in self.objective: # compute fidelity between target and output state within [0,1] + reward = abs(np.vdot(Statevector.from_instruction(self._qc), self.target))**2 + if 'UC' in self.objective: # 1 - 2 * arctan(norm(U_composed - U_target)) / pi with U_target defined by param. + reward = 1 - 2 * np.arctan(np.linalg.norm(self.target - Operator(self._qc)))/np.pi + cost = (max(0, self._qc.depth() - self.depth/3)) / (self.depth / 2 * 3) # 1/3 deph overhead to solution + if not self.sparse: reward, cost = self._reward_delta(reward, cost) + return reward, cost + + + def reset(self, seed=None, options=None): + super().reset(seed=seed) # Set seed for random number generator + if not self.sparse: self.last_reward = 0; self.last_cost = 0; + self._qc.clear() + return self._state + + + def step(self, action): + operation = self._operation(action) + terminated = operation is None + if not terminated: self._qc.append(*operation) + state, info = self._state + truncated = self._qc.depth() >= self.depth or self._operations >= self.max_steps + if terminated: info['termination_reason'] = 'DONE' + if truncated: info['termination_reason'] = 'DEPTH' + reward, cost = self._reward + info = {**info, 'metric': reward, 'cost': cost} + if self.sparse and not (terminated or truncated): reward, cost = 0, 0 + if self.punish: reward -= cost + return state, reward, terminated, truncated, info + + + def render(self): + if self.render_mode is None: return None + return self._qc.draw(self.render_mode) diff --git a/qcd_gym/wrappers/__init__.py b/qcd_gym/wrappers/__init__.py new file mode 100644 index 0000000..45dde96 --- /dev/null +++ b/qcd_gym/wrappers/__init__.py @@ -0,0 +1 @@ +from qcd_gym.wrappers.monitor import Monitor \ No newline at end of file diff --git a/circuit_designer/wrappers/monitor.py b/qcd_gym/wrappers/monitor.py similarity index 89% rename from circuit_designer/wrappers/monitor.py rename 
to qcd_gym/wrappers/monitor.py index 3e41ba0..db67af4 100644 --- a/circuit_designer/wrappers/monitor.py +++ b/qcd_gym/wrappers/monitor.py @@ -41,9 +41,11 @@ def step(self, action: ActType) -> tuple[ObsType, SupportsFloat, bool, bool, dic self._termination_reasons.append(info.pop('termination_reason')) self._episode_lengths.append(ep_len); self._episode_times.append(time.time() - self.t_start) - ep_info['d'] = info["resources"].depth # ['depth'] - ep_info['q'] = info["resources"].num_wires # ['num_used_wires'] - ep_info['o'] = info["resources"].num_gates # ['num_operations'] + ep_info['d'] = info["depth"] + ep_info['o'] = info["operations"] + ep_info['q'] = info["used_wires"] + ep_info['m'] = info["metric"] + ep_info['c'] = info["cost"] info["episode"] = ep_info self._total_steps += 1 @@ -56,9 +58,7 @@ def get_video(self, reset=True): def write_video(self, writer, label, step): """Adds current videobuffer to tensorboard""" - frame_buffer = self.get_video() - if self.render_mode == 'text': writer.add_text(label, frame_buffer[-2], step) - elif self.render_mode == 'image': assert False, 'Not implemented' + if self.render_mode == 'text': writer.add_text(label, str(self.env.render()), step) @property def total_steps(self) -> int: return self._total_steps diff --git a/run.sh b/run.sh index 928c82a..724e8c9 100755 --- a/run.sh +++ b/run.sh @@ -1,11 +1,14 @@ -for ITERATION in 0 2 4 6; do +BASE='results' +for ITERATION in 0 2 4 6; do #0 2 4 6 for RUN in 1 2; do # Depth = 3xoptimal solution - for CHALLENGE in 'UC-hadamard-q1-d9' 'SP-bell-q2-d12' 'SP-ghz3-q3-d15' 'UC-random-q2-d12' 'SP-random-q2-d12' 'UC-toffoli-q3-d63'; do - for ALG in 'A2C' 'PPO' 'SAC' 'TD3'; do + for TASK in 'UC-hadamard-q1-d9' 'SP-random-q2-d12'; do # Base + # for TASK in 'SP-ghz3-q3-d15' 'UC-toffoli-q3-d63'; do # Advanced + # for TASK in 'SP-bell-q2-d12' 'UC-random-q2-d12'; do # Additional + for ALG in 'PPO' 'SAC' 'A2C' 'TD3'; do SEED=$(($ITERATION + $RUN)) - O="results/out/$CHALLENGE/$ALG"; mkdir -p "$O" 
- echo "Running $ALG in $CHALLENGE [SEED $SEED]" - python -m train $ALG -e $CHALLENGE -s $SEED &> "$O/$SEED.out" & + O="$BASE/out/$TASK/$ALG"; mkdir -p "$O" + echo "Running $ALG in $TASK [SEED $SEED]" + python -m train $ALG -e $TASK -s $SEED --sparse --punish --path $BASE &> "$O/$SEED.out" & sleep 5 done done diff --git a/setup.py b/setup.py index b085ff6..4af379e 100644 --- a/setup.py +++ b/setup.py @@ -1,25 +1,22 @@ from pathlib import Path from setuptools import setup, find_packages -train = [ "stable_baselines3>=2.0.0", "tqdm>=4.65.0"] +train = [ "torch==2.0.1", "stable_baselines3>=2.0.0", "tqdm>=4.65.0"] plot = ["plotly>=5.0", "tensorboard>=2.0"] setup( - name="qcd-gym", version="0.1.0", + name="qcd-gym", version="0.2.0", description="Quantum Circuit Designer: A gymnasium-based set of environments for benchmarking reinforcement learning for quantum circuit design.", url="https://github.com/philippaltmann/qcd", author_email="philipp@hyphi.co", license="MIT", keywords="benchmark reinforcement-learning quantum-computing gymnasium circuit-design", long_description=(Path(__file__).parent / "README.md").read_text(), long_description_content_type="text/markdown", - packages=[package for package in find_packages() if package.startswith("circuit_designer")], - install_requires=[ - "gymnasium==0.29", # Bump to 1.0 once available - "pennylane-lightning==0.32.0", # Quantum Simulation (including pennylane base) - ], + packages=find_packages(include=['qcd_gym','qcd_gym.wrappers']), + install_requires=[ "gymnasium==0.29", "qiskit==1.0.2" ], extras_require = { "tests": [ "pytest", "black"], "train": train, "plot": plot, "all": train + plot }, python_requires=">=3.8", - entry_points={ "gymnasium.envs": ["__root__ = circuit_designer.__init__:register_envs"] } + entry_points={ "gymnasium.envs": ["__root__ = qcd_gym.__init__:register_envs"] } ) diff --git a/circuit_designer/test/__init__.py b/test/__init__.py similarity index 100% rename from circuit_designer/test/__init__.py 
rename to test/__init__.py diff --git a/test/__main__.py b/test/__main__.py new file mode 100644 index 0000000..964f89c --- /dev/null +++ b/test/__main__.py @@ -0,0 +1,12 @@ +from test.bell import bell +from test.ghz import ghz +from test.hadamard import hadamard +from test.toffoli import toffoli + +# State Preparation +bell() +ghz() + +# Unitary Composition +hadamard() +toffoli() \ No newline at end of file diff --git a/circuit_designer/test/bell.py b/test/bell.py similarity index 53% rename from circuit_designer/test/bell.py rename to test/bell.py index 630902d..7cb42f2 100644 --- a/circuit_designer/test/bell.py +++ b/test/bell.py @@ -1,20 +1,19 @@ import gymnasium as gym; import numpy as np def bell(): - env = gym.make("CircuitDesigner-v0", max_qubits=2, max_depth=12, challenge='SP-bell') + env = gym.make("CircuitDesigner-v0", max_qubits=2, max_depth=12, objective='SP-bell') env.reset() # H - env.step([1,0,0,np.pi/2]) - env.step([2,0,0,np.pi/2]) - env.step([1,0,0,np.pi/2]) + env.step([0,0,0,np.pi/2]) + env.step([1,0,0,np.pi/2]) + env.step([0,0,0,np.pi/2]) # CX - env.step([2,1,0,np.pi]) + env.step([1,1,0,np.pi]) - # M - env.step([0,0,0,0]) - reward = env.step([0,1,0,0])[1] + # T + reward = env.step([2,0,0,0])[1] np.testing.assert_almost_equal(reward, 1) print("Succeeded bell test") diff --git a/test/ghz.py b/test/ghz.py new file mode 100644 index 0000000..ffd2503 --- /dev/null +++ b/test/ghz.py @@ -0,0 +1,19 @@ +import gymnasium as gym; import numpy as np + +def ghz(): + env = gym.make("CircuitDesigner-v0", max_qubits=3, max_depth=15, objective='SP-ghz3') + env.reset() + + # H + env.step([0,0,0,np.pi/2]); env.step([1,0,0,np.pi/2]); env.step([0,0,0,np.pi/2]) + + # CX + env.step([1,1,0,np.pi]); env.step([1,2,1,np.pi]) + + # M + reward = env.step([2,0,0,0])[1] + + np.testing.assert_almost_equal(reward, 1) + print("Succeeded GHZ test") + + diff --git a/test/hadamard.py b/test/hadamard.py new file mode 100644 index 0000000..f0fc3e7 --- /dev/null +++ b/test/hadamard.py 
@@ -0,0 +1,22 @@ +import gymnasium as gym; import numpy as np + +def hadamard(): + # Test 1-qubit H + env = gym.make("CircuitDesigner-v0", max_qubits=1, max_depth=9, objective='UC-hadamard') + env.reset() + env.step([0,0,0,np.pi/2]) + env.step([1,0,0,np.pi/2]) + env.step([0,0,0,np.pi/2]) + reward = env.step([2,0,0,0])[1] # Meassure + np.testing.assert_almost_equal(reward, 1) + + # Test 2-qubit H + env = gym.make("CircuitDesigner-v0", max_qubits=2, max_depth=9, objective='UC-hadamard') + + env.reset() + env.step([0,0,0,np.pi/2]) + env.step([1,0,0,np.pi/2]) + env.step([0,0,0,np.pi/2]) + reward = env.step([2,0,0,0])[1] # Meassure + np.testing.assert_almost_equal(reward, 1) + print("Succeeded Hadamard test") diff --git a/test/toffoli.py b/test/toffoli.py new file mode 100644 index 0000000..b5a17c5 --- /dev/null +++ b/test/toffoli.py @@ -0,0 +1,31 @@ +import gymnasium as gym; import numpy as np + +def toffoli(): + env = gym.make("CircuitDesigner-v0", max_qubits=3, max_depth=63, objective='UC-toffoli') + env.reset() + + # V + env.step([0,2,2,np.pi/2]); env.step([1,2,2,np.pi/2]); env.step([0,2,2,np.pi/2]) + env.step([0,2,1,np.pi/2]) + env.step([0,2,2,np.pi/2]); env.step([1,2,2,np.pi/2]); env.step([0,2,2,np.pi/2]) + + # Cnot + env.step([1,1,0,np.pi/2]) + + # V- + env.step([0,2,2,np.pi/2]); env.step([1,2,2,np.pi/2]); env.step([0,2,2,np.pi/2]) + env.step([0,2,1,-np.pi/2]) + env.step([0,2,2,np.pi/2]); env.step([1,2,2,np.pi/2]); env.step([0,2,2,np.pi/2]) + + # Cnot + env.step([1,1,0,np.pi/2]) + + env.step([0,2,2,np.pi/2]); env.step([1,2,2,np.pi/2]); env.step([0,2,2,np.pi/2]) + env.step([0,2,0,np.pi/2]) + env.step([0,2,2,np.pi/2]); env.step([1,2,2,np.pi/2]); env.step([0,2,2,np.pi/2]) + # CZ pi/2 + + # Meassure + reward = env.step([2,0,0,0])[1] + np.testing.assert_almost_equal(reward, 1) + print("Succeeded Toffoli test") diff --git a/train/__main__.py b/train/__main__.py index dfb51f3..07bfaeb 100644 --- a/train/__main__.py +++ b/train/__main__.py @@ -5,35 +5,35 @@ # General 
Arguments parser = argparse.ArgumentParser() parser.add_argument('method', help='The algorithm to use', choices=[*ALGS]) -parser.add_argument( '-e', dest='envs', nargs='+', default=['Maze7Target'], metavar="Environment", help='The name and spec and of the safety environments to train and test the agent. Usage: --env NAME, CONFIG, N_TRAIN, N_TEST') +parser.add_argument( '-e', dest='env', metavar="Environment") parser.add_argument('-s', dest='seed', type=int, help='The random seed. If not specified a free seed [0;999] is randomly chosen') parser.add_argument('-t', dest='timesteps', type=int, help='The number of timesteps to explore.', default=128*(2048*4)) #~10e5 parser.add_argument('--load', type=str, help='Path to load the model.') parser.add_argument('--test', help='Run in test mode (dont write log files).', action='store_true') -parser.add_argument('--stop', dest='stop_on_reward', help='Stop at reward threshold.', action='store_true') # TODO: test parser.add_argument('--path', default='results', help='The base path, defaults to `results`') -# parser.add_argument('-d', dest='device', default='cuda', choices=['cuda','cpu']) +parser.add_argument('--punish', action='store_true') +parser.add_argument('--sparse', action='store_true') # Get arguments & extract training parameters & merge model args args = {key: value for key, value in vars(parser.parse_args()).items() if value is not None}; if args.pop('test'): args['path'] = None +args['envkwargs'] = {'punish': args.pop('punish'), 'sparse': args.pop('sparse')} timesteps = args.pop('timesteps'); presteps = 0 load = args.pop('load', None) # Init Training Model trainer = eval(args.pop('method')) -# args['policy'] = 'MultiInputPolicy' model = trainer(**args) -model._naming = {**model._naming , 'd': 'depth-100', 'q': 'qbits-100', 'o': 'ops-100'} +model._naming = {**model._naming , 'd': 'depth-100', 'q': 'qbits-100', 'o': 'ops-100', 'm': 'metric-100', 'c': 'cost-100'} if load is not None: _params = 
model.get_parameters()['policy'].copy().__str__() model.set_parameters({**model.get_parameters(), 'policy': model.policy.load(load).state_dict()}) # v2: load policy from file assert _params != model.get_parameters()['policy'].copy().__str__(), "Load failed" -print(f"Training {trainer.__name__ } in {model.envs['train'].envs[0].unwrapped.name} for {timesteps-presteps:.0f} steps.") -model.learn(total_timesteps=timesteps-presteps) #, reset_num_timesteps = not pretrain +print(f"Training {trainer.__name__ } in {args['env']} for {timesteps-presteps:.0f} steps.") +model.learn(total_timesteps=timesteps-presteps) if model.path: model.save() print(f"Done in {time.time()-start}")