Source code for jUFLP_cavemen

"""Proof-of-concept experiment: joint UFLP over special class of instances.

Implements DD-based solution procedures for j-UFLP instances, along with the
instance generation. Note that the up-to-date experiment corresponding to
Section 4.2 of the paper is presented in :py:mod:`UFLP_2_cav`, but it calls
solution procedures implemented in this module.
"""
import pytest
import numpy as np
import gurobipy as gp
import json
from time import time

from darkcloud import ptscloud, DDSolver, gen_caveman_inst
from BDD import intersect
from varseq import VarSeq
from BB_search import BBSearch
from UFL import add_BDD_to_MIP
from UFLP_fullDD import create_cover_DD
from UFLPOrder import UFLP_greedy_order


[docs]def gen_cavemen_jUFLP_inst(n=10, M=7, L=0.25, verbose=False, linking="default"): """Generates an instance with the related metadata (info on caves). Args: n (int): number of caves, M (int): number of points in a cave, L (float): edge sparsity parameter (share of missing edges) verbose (Bool): print debug info linking (str): linking constraints type Returns: inst1, inst2, join_map: sub-instances and caves description. Note: Each sub-instance is parameterized by [S,f,c,caves]. (See :py:func:`darkcloud.gen_caveman_inst` for details.) """ S, f, c, caves = gen_caveman_inst(n, M, L) S2, f2, c2, caves2 = gen_caveman_inst(n, M, L) join_map = dict() if linking == "uniform": join_map = dict( zip([j for j in range(1, len(S) + 1)], np.random.permutation([j for j in range(1, len(S) + 1)]))) elif linking == "consecutive": ca1 = [ca.S for ca in caves] ca2 = [ca.S for ca in caves2] for k in range(len(ca1)): join_map.update(dict(zip(ca1[k], np.random.permutation(ca2[k])))) else: ca1 = [ca.S for ca in caves] ca2 = np.random.permutation([ca.S for ca in caves2]) for k in range(len(ca1)): join_map.update(dict(zip(ca1[k], np.random.permutation(ca2[k])))) if verbose: print(f"S={S};\nf={f}\n;c={c}") return [[S, f, c, caves], [S2, f2, c2, caves2], join_map]
# encoding numpy numbers # save and load instances
[docs]class jUFLPEncoder(json.JSONEncoder): """A technical implementation needed to save an instance as ``.json``"""
[docs] def default(self, obj): if isinstance(obj, np.int64): return int(obj) elif isinstance(obj, dict): return {int(k): int(obj[k]) for k in obj} return json.JSONEncoder.default(self, obj)
[docs]def save_inst(i1, i2, join_map, filename): """Saves the jUFLP instance to ``.json`` file. (old instance format).""" with open(filename, "w") as fout: fout.write( json.dumps( { 'inst1': { 'S': i1[0], 'f': i1[1], 'c': i1[2], 'ptsclouds': [i1[3][j].__dict__ for j in range(len(i1[3]))] }, 'inst2': { 'S': i2[0], 'f': i2[1], 'c': i2[2], 'ptsclouds': [i2[3][j].__dict__ for j in range(len(i2[3]))] }, 'jmap': {int(j): join_map[j] for j in join_map} }, cls=jUFLPEncoder))
[docs]def load_inst(filename): """Loads a jUFLP instance from ``.json`` file. Returns: [[S,f,c,ptsclouds], [S2,f2,c2,pltsclouds2], jmap] """ with open(filename, "r") as fin: json_inst = fin.read() json_dct = json.loads(json_inst) return [[ json_dct[f'inst{i}']['S'], json_dct[f'inst{i}']['f'], json_dct[f'inst{i}']['c'], [ ptscloud(ptc['e1'], ptc['e2'], ptc['S']) for ptc in json_dct[f'inst{i}']['ptsclouds'] ] ] for i in [1, 2] ] + [{int(j1): json_dct['jmap'][j1] for j1 in json_dct['jmap']}]
[docs]def show_inst(inst1, inst2, join_map, filename="./tmp/jUFLP.dot"): """Shows the supplied j-UFLP instance / saves it to a .dot file. Args: inst1, inst2 (list): instances description, [S,f,c,caves] filename (str): filename to save to (default "./tmp/jUFLP.dot") """ added = set([]) with open(filename, "w") as fout: fout.write("graph G {\n") fout.write(""" rankdir="LR";\n""") S, f, c, caves = inst2 join_pts = sum([list(c.e1 + c.e2) for c in caves], []) join_pts = list(np.unique([j for j in join_pts if j is not None])) for i in range(len(S)): for j in S[i]: if ((i + 1) != j) and not (((j, (i + 1)) in added) or ((i + 1, j) in added)): if (i + 1) in join_pts: pref_i = "j" else: pref_i = "f" if j in join_pts: pref_j = "j" else: pref_j = "f" fout.write( f" {pref_i}{i+1}--{pref_j}{j}[penwidth=3];\n") added.add(((i + 1), j)) if (i + 1) not in join_pts: fout.write(f" f{i+1}[penwidth=3];\n") S, f, c, caves = inst1 join_pts = sum([list(c.e1 + c.e2) for c in caves], []) join_pts = list(np.unique([j for j in join_pts if j is not None])) added = set([]) for i in range(len(S)): for j in S[i]: if ((i + 1) != j) and not (((j, (i + 1)) in added) or ((i + 1, j) in added)): if (i + 1) in join_pts: a = f"j{join_map[i+1]}" else: a = f"s{i+1}" if j in join_pts: b = f"j{join_map[j]}" else: b = f"s{j}" fout.write(f" {a}--{b}[color=red];\n") added.add(((i + 1), j)) if (i + 1) not in join_pts: fout.write(f" s{i+1}[color=red, textcolor=red];\n") for j in join_pts: fout.write( f" j{join_map[j]}[style=filled, fillcolor=yellow, penwidth=5];\n" ) fout.write("}")
[docs]def solve_cm_jUFLP_MIP(i1, i2, jmap): """Solves the special jUFLP instance with MIP.""" m = gp.Model() m.modelSense = gp.GRB.MINIMIZE m.setParam("OutputFlag", 0) x = dict() y = dict() # add the first instance S, f, c, caves = i1 # create variables for j in range(1, len(S) + 1): x[(1, j)] = m.addVar(vtype=gp.GRB.BINARY, name=f"x1_{j}", obj=c[j - 1]) for a in range(1, len(S[j - 1]) + 1): y[(1, j, a)] = m.addVar(vtype=gp.GRB.BINARY, name=f"y1_{j}_{a}", obj=f[j - 1][a] - f[j - 1][a - 1]) # create constraints for j in range(1, len(S) + 1): m.addConstr( gp.quicksum(x[(1, k)] for k in S[j - 1]) == gp.quicksum( y[(1, j, a)] for a in range(1, len(S[j - 1]) + 1))) for a in range(1, len(S[j - 1])): m.addConstr(y[(1, j, a)] >= y[(1, j, a + 1)]) # add the second instance S, f, c, caves = i2 # create variables for j in range(1, len(S) + 1): x[(2, j)] = m.addVar(vtype=gp.GRB.BINARY, name=f"x2_{j}", obj=c[j - 1]) for a in range(1, len(S[j - 1]) + 1): y[(2, j, a)] = m.addVar(vtype=gp.GRB.BINARY, name=f"y2_{j}_{a}", obj=f[j - 1][a] - f[j - 1][a - 1]) # create constraints for j in range(1, len(S) + 1): m.addConstr( gp.quicksum(x[(2, k)] for k in S[j - 1]) == gp.quicksum( y[(2, j, a)] for a in range(1, len(S[j - 1]) + 1))) for a in range(1, len(S[j - 1])): m.addConstr(y[(2, j, a)] >= y[(2, j, a + 1)]) # link the two instances for j in jmap: m.addConstr(x[(1, j)] == x[(2, jmap[j])]) m.update() m.optimize() assert m.status == gp.GRB.OPTIMAL return m.objVal + sum([fs[0] for fs in i1[1]]) + sum([fs[0] for fs in i2[1]])
[docs]def solve_cm_jUFLP_DDs(i1, i2, jmap, intmode="toA", ret_int=False): """Solves the jUFLP cavemen instance with DDs. Args: i1, i2 (list): instances intmode (str): alignment mode, 'toA', 'toB', or 'VS' Notes: Instance is parameterized as per :py:func:`darkcloud.gen_caveman_inst`, The diagrams are built with :py:class:`darkcloud.DDSolver`. """ S, f, c, caves = i1 S2, f2, c2, caves2 = i2 sol = DDSolver(S, f, c, caves) B1 = sol.build_cover_DD() sol = DDSolver(S2, f2, c2, caves2) B2 = sol.build_cover_DD() B1.make_reduced() B2.make_reduced() B1.rename_vars(jmap) if intmode == "toA": target = B1.vars elif intmode == "toB": target = B2.vars elif intmode == 'VS': vs1 = VarSeq(B1.vars, [len(L) for L in B1.layers[:-1]]) vs2 = VarSeq(B2.vars, [len(L) for L in B2.layers[:-1]]) assert set(vs1.layer_var) == set( vs2.layer_var), f"1:{vs1.layer_var}, 2:{vs2.layer_var}" b = BBSearch(vs1, vs2) status = b.search() assert status == "optimal" or status == "timeout" target = b.Ap_cand.layer_var else: print(f"Wrong mode: '{intmode}'. Expected: 'toA', 'toB', or 'VS'.") B1.align_to(target, inplace=True) B2.align_to(target, inplace=True) int_DD = intersect(B1, B2) sp = int_DD.shortest_path() if ret_int: return sp[0], int_DD.size() else: return sp[0]
[docs]def solve_cm_jUFLP_CPPMIP(i1, i2, jmap): """Solves a jUFLP on cavemen with CPPMIP. Args: i1, i2 (list): instances jmap (dict): joining dict. Notes: Instance is parameterized as per :py:func:`darkcloud.gen_caveman_inst`, The diagrams are built with :py:class:`darkcloud.DDSolver`. Returns: objective (float) """ S, f, c, caves = i1 S2, f2, c2, caves2 = i2 sol = DDSolver(S, f, c, caves) B1 = sol.build_cover_DD() sol = DDSolver(S2, f2, c2, caves2) B2 = sol.build_cover_DD() B1.make_reduced() B2.make_reduced() B1.rename_vars(jmap) m, c, v, x = add_BDD_to_MIP(B1, prefix="B1_") m, c, v, x = add_BDD_to_MIP(B2, model=m, x=x, prefix="B2_") m.update() m.setParam("OutputFlag", 0) m.optimize() return m.objVal
[docs]def solve_cm_jUFLP_CPPMIP_fullDDs(i1, i2, jmap): """Solves a jUFLP on cavemen (full-DDs) with CPPMIP. Args: i1, i2 (list): instances jmap (dict): joining dict. Notes: Instance is parameterized as per :py:func:`darkcloud.gen_caveman_inst`, The diagrams are built with :py:func:`UFLP_fullDD.create_cover_DD`. Returns: objective (float) """ S, f, c, caves = i1 S2, f2, c2, caves2 = i2 B1, _ = create_cover_DD(S, f, c, UFLP_greedy_order(S, True)) B2, _ = create_cover_DD(S2, f2, c2, UFLP_greedy_order(S2, False)) B1.make_reduced() B2.make_reduced() B1.rename_vars(jmap) m, c, v, x = add_BDD_to_MIP(B1, prefix="B1_") m, c, v, x = add_BDD_to_MIP(B2, model=m, x=x, prefix="B2_") m.update() m.setParam("OutputFlag", 0) m.optimize() return m.objVal
[docs]def solve_cm_jUFLP_fullDDs(i1, i2, jmap, intmode, ret_int=False): """Solves the jUFLP cavemen instance with DDs. Args: i1, i2 (list): instances Notes: Instance is parameterized as per :py:func:`darkcloud.gen_caveman_inst`. """ S, f, c, caves = i1 S2, f2, c2, caves2 = i2 o1 = UFLP_greedy_order(S, True) o2 = UFLP_greedy_order(S2, False) B1, _ = create_cover_DD(S, f, c, o1) B2, _ = create_cover_DD(S2, f2, c2, o2) B1.make_reduced() B2.make_reduced() B1.rename_vars(jmap) if intmode == "toA": target = B1.vars elif intmode == "toB": target = B2.vars elif intmode == 'VS': vs1 = VarSeq(B1.vars, [len(L) for L in B1.layers[:-1]]) vs2 = VarSeq(B2.vars, [len(L) for L in B2.layers[:-1]]) assert set(vs1.layer_var) == set( vs2.layer_var), f"1:{vs1.layer_var}, 2:{vs2.layer_var}" b = BBSearch(vs1, vs2) status = b.search() assert status == "optimal" or status == "timeout" target = b.Ap_cand.layer_var else: print(f"Wrong mode: '{intmode}'. Expected: 'toA', 'toB', or 'VS'.") B1.align_to(target, inplace=True) B2.align_to(target, inplace=True) int_DD = intersect(B1, B2) sp = int_DD.shortest_path() if ret_int: return sp[0], int_DD.size() else: return sp[0]
###
[docs]def dump_instance(S, caves, filename="tmp/S.dot"): """Dumps a graph implied by S into a `.dot` file. """ added = set([]) with open(filename, "w") as fout: fout.write("graph G {\n") for i in range(len(S)): for j in S[i]: if ((i + 1) != j) and not (((j, (i + 1)) in added) or ((i + 1, j) in added)): fout.write(f" n{i+1} -- n{j};\n") added.add(((i + 1), j)) fout.write("}")
[docs]def solve_with_MIP(S, S2, f, f2, c, caves): """Generates and solves a MIP for the supplied jUFL cavemen instance. Note: The instance is parameterized as per :py:func:`darkcloud.gen_caveman_inst`. Most of the code is adapted form :py:func:`darkcloud.solve_with_MIP`. """ m = gp.Model() m.modelSense = gp.GRB.MINIMIZE m.setParam("OutputFlag", 0) x = dict() y = dict() y2 = dict() # create variables for j in range(1, len(S) + 1): x[j] = m.addVar(vtype=gp.GRB.BINARY, name=f"x_{j}", obj=c[j - 1]) for a in range(1, len(S[j - 1]) + 1): y[(j, a)] = m.addVar(vtype=gp.GRB.BINARY, name=f"y_{j}_{a}", obj=f[j - 1][a] - f[j - 1][a - 1]) for a in range(1, len(S2[j - 1]) + 1): y2[(j, a)] = m.addVar(vtype=gp.GRB.BINARY, name=f"y2_{j}_{a}", obj=f2[j - 1][a] - f2[j - 1][a - 1]) # create constraints for j in range(1, len(S) + 1): # first-graph constraints m.addConstr( gp.quicksum(x[k] for k in S[j - 1]) == gp.quicksum( y[(j, a)] for a in range(1, len(S[j - 1]) + 1))) for a in range(1, len(S[j - 1])): m.addConstr(y[(j, a)] >= y[(j, a + 1)]) # second-graph constraints m.addConstr( gp.quicksum(x[k] for k in S2[j - 1]) == gp.quicksum( y2[(j, a)] for a in range(1, len(S2[j - 1]) + 1))) for a in range(1, len(S2[j - 1])): m.addConstr(y2[(j, a)] >= y2[(j, a + 1)]) m.update() print("Optimizing the model...", end="", flush=True) m.optimize() print("done.") assert m.status == gp.GRB.OPTIMAL return m, (m.objVal + sum(fs[0] for fs in f) + sum(fs2[0] for fs2 in f2)), x, y, y2
[docs]def solve_with_DDs(S, S2, f, f2, c, caves, intmode="toA"): """Solves the jUFLP cavemen instance with DDs. Args: intmode (str): intersection mode, 'toA' or 'VS' Notes: Instance is parameterized as per :py:func:`darkcloud.gen_caveman_inst`, The diagrams are built with :py:class:`darkcloud.DDSolver`. """ print("Building the first DD...", end="", flush=True) sol = DDSolver(S, f, c, caves) B1 = sol.build_cover_DD() print("done.") print("Building the second DD...", end="", flush=True) sol = DDSolver(S2, f2, [0.0 for _ in c], caves) B2 = sol.build_cover_DD() print("done.") if intmode == "toA": target = B1.vars elif intmode == 'VS': vs1 = VarSeq(B1.vars, [len(L) for L in B1.layers[:-1]]) vs2 = VarSeq(B2.vars, [len(L) for L in B2.layers[:-1]]) assert set(vs1.layer_var) == set( vs2.layer_var), f"1:{vs1.layer_var}, 2:{vs2.layer_var}" b = BBSearch(vs1, vs2) status = b.search() assert status == "optimal" or status == "timeout" target = b.Ap_cand.layer_var else: print(f"Wrong intersection mode '{intmode}'. Expected: 'toA' or 'VS'.") B1.align_to(target, inplace=True) B2.align_to(target, inplace=True) int_DD = intersect(B1, B2) sp = int_DD.shortest_path() return sp[0]
# Testing code ######################################################
[docs]@pytest.mark.parametrize("test_inst", [gen_cavemen_jUFLP_inst(7, 7) for _ in range(5)]) def test_jUFL_DDs(test_inst): i1, i2, jmap = test_inst obj1 = solve_cm_jUFLP_DDs(i1, i2, jmap, 'toA') obj2 = solve_cm_jUFLP_DDs(i1, i2, jmap, 'toB') obj3 = solve_cm_jUFLP_DDs(i1, i2, jmap, 'VS') assert abs(obj1 - obj2) < 0.001 assert abs(obj1 - obj3) < 0.001
[docs]@pytest.mark.parametrize("test_inst", [gen_cavemen_jUFLP_inst(7, 7) for _ in range(5)]) def test_cm_jUFL_DDvsMIP(test_inst): i1, i2, jmap = test_inst obj1 = solve_cm_jUFLP_MIP(i1, i2, jmap) obj2 = solve_cm_jUFLP_DDs(i1, i2, jmap) obj3 = solve_cm_jUFLP_CPPMIP(i1, i2, jmap) assert abs(obj1 - obj2) < 0.01 assert abs(obj1 - obj3) < 0.01
# NOTE: it seems sometimes Gurobi yields # rounding-off (?) errors --- e.g., 77.5055 instead of 77.5000 # Seems not a big deal, but I needed to decrease the required # tolerance to 0.01 (from 0.001)
[docs]@pytest.mark.parametrize("test_inst", [gen_cavemen_jUFLP_inst(5, 5) for _ in range(10)]) def test_load_save(test_inst): i1, i2, jmap = test_inst save_inst(i1, i2, jmap, "./tmp/jUFLP-loadsave-test.json") [inst1, inst2, jm] = load_inst("./tmp/jUFLP-loadsave-test.json") obj1 = solve_cm_jUFLP_DDs(i1, i2, jmap) obj2 = solve_cm_jUFLP_DDs(inst1, inst2, jm) assert abs(obj1 - obj2) < 0.001
[docs]def compare_runtimes(): """Performs a quick runtimes comparison (toA vs VS).""" s, s2, f, f2, c, caves = gen_caveman_inst() sol = DDSolver(s, f, c, caves) B1 = sol.build_cover_DD() sol = DDSolver(s2, f2, [0.0 for _ in c], caves) B2 = sol.build_cover_DD() B2.shuffle_vars() # renames the variables, without reordering B1.make_reduced() B2.make_reduced() t0 = time() B2p = B2.align_to(B1.vars, inplace=False) int_toA = intersect(B1, B2p) obj_toA = int_toA.shortest_path()[0] t_toA = time() - t0 print(f"{t_toA:.2f}, {int_toA.size()},", end="", flush=True) t0 = time() vs1 = VarSeq(B1.vars, [len(L) for L in B1.layers[:-1]]) vs2 = VarSeq(B2.vars, [len(L) for L in B2.layers[:-1]]) assert set(vs1.layer_var) == set( vs2.layer_var), f"1:{vs1.layer_var}, 2:{vs2.layer_var}" b = BBSearch(vs1, vs2) status = b.search() assert status == "optimal" or status == "timeout" target = b.Ap_cand.layer_var B1p = B1.align_to(target, inplace=False) B2p = B2.align_to(target, inplace=False) int_VS = intersect(B1p, B2p) obj_VS = int_VS.shortest_path()[0] t_VS = time() - t0 print(f"{t_VS:.2f}, {int_VS.size()}") assert (abs(obj_VS - obj_toA) < 0.01), f"{obj_VS:.2f} vs {obj_toA:.2f}"
[docs]def main(): print("t_toA, intsize_toA, t_VS, intsize_VS") for _ in range(250): compare_runtimes()
if __name__ == '__main__': main()