"""Implements abstract interfaces for instance storage and QPU solvers."""
import glob
import json
import logging
import os
import traceback
from abc import abstractmethod
from datetime import datetime
import numpy as np
__version__ = '2.0.2'
__debug_mode__ = False

class DataBase():
    """Implements storage/access to instances to be solved on QPUs."""

    def __init__(self, directory, file_ext='.json', filter_fun=None, sort_fun=None):
        self.directory = directory
        self.file_ext = file_ext
        self.filter_fun = filter_fun  # optional predicate: keep an instance iff filter_fun(data) is truthy
        self.sort_fun = sort_fun  # optional key function (instance_id, data) -> sort key
        self.db = {}
        self._gather_data()

    def _gather_data(self):
        """Scan the directory and index every matching instance file by its instance id."""
        self.db.clear()
        self._load_map = {}
        glob_request = f'{self.directory}/*{self.file_ext}'
        for path in glob.glob(glob_request):
            with open(path, 'r') as fh:
                data = json.load(fh)
            if self.filter_fun is None or self.filter_fun(data):
                _, filename = os.path.split(path)
                instance_id = data['description']['instance_id']
                assert instance_id not in self.db, f'duplicate instance_id: {instance_id}'
                self.db[instance_id] = data
                self._load_map[filename] = instance_id
        if self.sort_fun is not None:
            self.db = dict(sorted(self.db.items(),
                                  key=lambda item: self.sort_fun(item[0], item[1])))
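
    # Expected file layout (a sketch inferred from the loader above and from
    # get_qubo_data below; the concrete values are hypothetical): each
    # '<name>.json' in the directory carries at least
    #
    #     {"description": {"instance_id": "ex_0001"},
    #      "Q": [[0, 4], [4, 0]],
    #      "P": [1, 2]}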

    def _preprocess_qubo_data(self, Q, P, a, b):
        """Rescale the QUBO coefficients, or pass them through if a or b is None."""
        if a is None or b is None:
            return Q, P
        # Only the trivial normalisation a = b = 1 is currently supported; it
        # divides all coefficients by the largest absolute entry of Q/2 + diag(P)
        # (note that ** binds tighter than /, so this is a / (max ** b)).
        assert a == 1 and b == 1
        qubo_matrix = Q / 2 + np.diag(P)
        scaling_factor = a / np.max(np.abs(qubo_matrix)) ** b
        Q_scaled = Q * scaling_factor
        P_scaled = P * scaling_factor
        return Q_scaled, P_scaled
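
    # Worked sketch of the scaling (hypothetical 2x2 instance): with
    # Q = [[0, 4], [4, 0]] and P = [1, 2], Q/2 + diag(P) has largest absolute
    # entry 2, so a = b = 1 multiplies every coefficient by 1/2:
    #
    #     Q, P = np.array([[0., 4.], [4., 0.]]), np.array([1., 2.])
    #     db._preprocess_qubo_data(Q, P, a=1, b=1)
    #     # -> (array([[0., 2.], [2., 0.]]), array([0.5, 1. ]))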

    def get_data(self, instance_id):
        """Return the raw instance data for instance_id."""
        # self.db is keyed by instance_id, so a direct lookup suffices.
        try:
            return self.db[instance_id]
        except KeyError:
            raise ValueError(f'unknown instance_id: {instance_id}')

    def get_qubo_data(self, instance_id, a, b):
        """Return the (optionally rescaled) QUBO arrays Q and P for instance_id.

        The constant offset ('Const') stored with the instance is ignored here.
        """
        qubo_data = self.db[instance_id]
        Q = np.array(qubo_data['Q'])
        P = np.array(qubo_data['P'])
        return self._preprocess_qubo_data(Q, P, a, b)

    def get_instance_ids(self):
        """Return an iterator over all stored instance ids (in db order)."""
        return iter(self.db.keys())

    def get_filename_from_instance_id(self, instance_id):
        """Return the filename an instance was loaded from."""
        # Invert the filename -> instance_id map built in _gather_data.
        load_map_inv = {iid: filename for (filename, iid) in self._load_map.items()}
        return load_map_inv[instance_id]
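
# Usage sketch for DataBase (hypothetical directory and fields): filter_fun
# prunes instances at load time, sort_fun fixes the iteration order.
#
#     db = DataBase('instances', file_ext='.json',
#                   filter_fun=lambda d: len(d['P']) <= 64,
#                   sort_fun=lambda iid, d: len(d['P']))
#     for iid in db.get_instance_ids():
#         Q, P = db.get_qubo_data(iid, a=1, b=1)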

class Solver():
    """Implements abstract QPU Solver interface.

    See device-specific code in :py:mod:`bench_qubo`.
    """

    def __init__(self, name, db):
        self.name = name
        self.db = db  # a DataBase providing the instances to solve
        self.logger = None

    @abstractmethod
    def _run(self, instance_id, *args):
        """Solve a single instance on the device; implemented by subclasses."""
        raise NotImplementedError

    @abstractmethod
    def get_data(self, instance_id, *args):
        """Retrieve device-specific data for an instance; implemented by subclasses."""
        raise NotImplementedError
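
    # Sketch of a concrete subclass (hypothetical; the real device-specific
    # solvers live in bench_qubo). The energy follows this module's own
    # Q/2 + diag(P) convention:
    #
    #     class RandomSolver(Solver):
    #         def _run(self, instance_id, *args):
    #             Q, P = self.db.get_qubo_data(instance_id, a=None, b=None)
    #             x = np.random.randint(0, 2, size=len(P))
    #             return dict(x=x.tolist(), energy=float(x @ Q @ x / 2 + P @ x))
    #
    #         def get_data(self, instance_id, *args):
    #             return self.db.get_data(instance_id)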

    def _stringify_keys(self, d):
        """Recursively convert dict keys to strings where json.dump would reject them.

        json.dump only accepts str/int/float/bool/None keys; see
        https://stackoverflow.com/questions/12734517/json-dumping-a-dict-throws-typeerror-keys-must-be-a-string
        """
        d_new = dict()
        for key, value in d.items():
            # Recurse into nested dicts, including dicts inside lists/tuples.
            if isinstance(value, dict):
                new_value = self._stringify_keys(value)
            elif isinstance(value, (list, tuple)):
                new_value = [self._stringify_keys(v) if isinstance(v, dict) else v for v in value]
                if isinstance(value, tuple):
                    new_value = tuple(new_value)
            else:
                new_value = value
            # Keys json.dump can handle are kept; anything else is stringified.
            if isinstance(key, (str, int, float, bool)) or key is None:
                new_key = key
            else:
                try:
                    new_key = str(key)
                except Exception:
                    new_key = repr(key)
            d_new[new_key] = new_value
        return d_new
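
    # Example of the key conversion (tuple keys, as some QUBO APIs return,
    # become their string representation; values are left untouched):
    #
    #     solver._stringify_keys({(0, 1): -1.0, 'n': 2})
    #     # -> {'(0, 1)': -1.0, 'n': 2}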

    def _save(self, path, data):
        """Serialise a result dict to JSON, falling back to a plain string dump."""
        with open(path, 'w') as fh:
            try:
                new_dict = self._stringify_keys(data)
            except Exception as e:
                new_dict = dict(str_data=str(data), exception=e)  # fallback
                if __debug_mode__:
                    raise e
            json.dump(new_dict, fh, default=str)

    def _setup_logger(self, log_file, file_mode='w', log_name=__name__, log_level=logging.DEBUG, save_logger=True):
        """Attach a file handler to a logger and optionally keep it on the solver.

        Note: calling this twice with the same log_name adds a second handler.
        """
        logger = logging.getLogger(log_name)
        logger.setLevel(log_level)
        fh = logging.FileHandler(log_file, mode=file_mode)
        fh.setLevel(logging.DEBUG)
        fmt = logging.Formatter('%(asctime)s %(name)-6s %(levelname)-6s %(message)s')
        fh.setFormatter(fmt)
        logger.addHandler(fh)
        if save_logger:
            self.logger = logger
        return logger

    def _run_multiple(self, run_list, save_file_path, logger):
        """Execute a list of (instance_id, args) runs, saving each result separately."""
        num_success = 0
        if logger is not None:
            logger.info(f'started {len(run_list)} run(s).')
        for run_idx, (instance_id, args) in enumerate(run_list):
            # Guard against save_file_path=None; without it the f-string below
            # would produce literal 'None_0000' filenames.
            run_save_path = f'{save_file_path}_{run_idx:04d}' if save_file_path is not None else None
            num_success += self.run(run_save_path, logger, instance_id, *args)['success']
            if logger is not None:
                logger.info(f'run {run_idx + 1}/{len(run_list)} complete.')
        if logger is not None:
            logger.info(f'completed {len(run_list)} run(s): {num_success} successful.')
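
    # run_list layout (as consumed above): one (instance_id, args) pair per
    # run, with args splatted into the device-specific _run. The args shown
    # here are hypothetical:
    #
    #     run_list = [('ex_0001', (100,)), ('ex_0002', (100,))]
    #     solver.run_multiple(run_list, save_file_path='results', logger=None)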

    def run(self, save_file_path, logger, instance_id, *args):
        """Run a single instance, time it, and package the outcome into a result dict."""
        if logger is not None:
            logger.info(f'run start: instance_id={instance_id}, args={args}.')
        start_timestamp = datetime.now().timestamp()
        try:
            solver = self._run(instance_id, *args)
            success = True
        except Exception as e:
            fail_timestamp = datetime.now().timestamp()
            solver = dict(exception=e, traceback=traceback.format_exc(), fail_timestamp=fail_timestamp)
            success = False
            if __debug_mode__:
                raise e
        end_timestamp = datetime.now().timestamp()
        result = dict(instance_id=instance_id,
                      instance_filename=self.db.get_filename_from_instance_id(instance_id),
                      solver_name=self.name,
                      args=[str(arg) for arg in args],
                      timestamps=dict(start=start_timestamp, end=end_timestamp),
                      solver=solver,
                      success=success,
                      version=__version__)
        if save_file_path is not None:
            self._save(f'{save_file_path}.json', result)
        if logger is not None:
            saved_msg = f', saved to "{save_file_path}.json"' if save_file_path is not None else ''
            logger.info(f'run finished: {"success" if success else "failure"}{saved_msg}.')
        return result
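
    # Shape of the returned result dict (values are illustrative):
    #
    #     {'instance_id': 'ex_0001', 'instance_filename': 'ex_0001.json',
    #      'solver_name': 'random', 'args': ['100'],
    #      'timestamps': {'start': 1700000000.0, 'end': 1700000001.5},
    #      'solver': <payload from _run, or exception/traceback on failure>,
    #      'success': True, 'version': '2.0.2'}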

    def run_multiple(self, run_list, save_file_path=None, logger=None):
        """Run several instances; thin public wrapper around _run_multiple."""
        self._run_multiple(run_list, save_file_path, logger)

    def benchmark(self, run_list, save_file_path):
        """Run several instances, with results and a log file written under save_file_path."""
        logger = self._setup_logger(f'{save_file_path}.log')
        self._run_multiple(run_list, save_file_path, logger)
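
# End-to-end sketch (using the hypothetical RandomSolver from the comment
# above; writes results_0000.json, ..., plus results.log):
#
#     db = DataBase('instances')
#     solver = RandomSolver('random', db)
#     run_list = [(iid, ()) for iid in db.get_instance_ids()]
#     solver.benchmark(run_list, 'results')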