Source code for BDD

"""Implements the Binary Decision Diagram data type.

Defines the "original" BDD-related machinery (as opposed to the
simplification defined in :py:class:`varseq.VarSeq`), including
actual simultaneous variable reordering (inspired by Rudell'93) and
other BDD alignment-related functions, along with technical utilities
(such as load/save to file, etc.). A simple auxiliary class
:py:class:`node` keeps the necessary data related to BDD nodes.

Tests coverage: :py:mod:`BDD_test`

"""

import numpy as np
import pandas as pd
from numpy.random import random as rnd
from random import choice as rnd_choose
import sys
import pytest

import itertools as iters
from graphviz import Digraph
from collections import deque

import copy
import math

# Module-level constants
GSIFTS_MAX_INCREASE_MUL = 100000  # max increase of the BDD size (e.g., 100 times) during greedy sifts
WEIGHT_TOL = 3  # number of decimal points for the arc weights tolerance (swap_up)
C_MAX = 50  # max edge cost (for random generation, non-inclusive)

# special node IDs (see the node class docstring)
NROOT = 0
NTRUE = -1
NFALSE = -2
######################################################################

[docs]def simscore(seq_A, seq_B, p_B = None): """Calculates a 'similarity score' between `A` and `B`. The function has nothing to do with other data structures, technically: it just calculates the normalized number of inversions between the two ordered lists of variables (which we call 'similarity score'). Args: seq_A (list): sequence A seq_B (list): sequence B to compare p_B (dict): dict of element positions in B (default: None, will be built) Returns: number (between 0 and 1): share of possible inversions. Examples: >>> simscore([1,2,3], [1,2,3]) 1.0 >>> simscore([1,2,3], [3,2,1]) 0.0 >>> simscore([1,2,3], [1,3,2]) 0.6666666666666667 >>> simscore([1,2,3,4,5,6,7], [5,6,3,4,2,7,1]) 0.33333333333333337 """ N = len(seq_A) no_inversions = 0 if p_B is None: p_B = {seq_B[i]: i for i in range(N)} assert set(seq_A) == set(seq_B) for i in range(N-1): for j in range(i, N): if p_B[seq_A[i]] > p_B[seq_A[j]]: no_inversions += 1 return 1 - no_inversions / (N*(N-1)/2)
[docs]class node(object): """Represents a BDD node, with the following attributes. Attributes: id: an integer node ID hi: a pointer to the ``hi``-node lo: a pointer to the ``lo``-node layer: layer number (id) Note: There are special node IDs: ``0`` for the root, ``-1`` for True sink, ``-2`` for the False sink. """ def __init__(self, id, hi=None, lo=None, layer=-1): """A simple class constructor.""" self.id = id self.hi = hi self.lo = lo self.layer = layer
######################################################################
[docs]class BDD(object): """Encodes a BDD and implements layer-ordering (revision) methods. Attributes: N (int): number of variables (equals number of layers - 1) layers (list): a list of layers (sets/hashes), including {T, F}-nodes nodes (dict): a set/hash of nodes, key by node ID vars (list): variables associated with layers T,F (node): pointers to `True` and `False` sink nodes weighted (bool): a flag whether arcs have weights weights (dict): arc weights, keys: `(id_from, id_to, arc_type)` (with the latter either 'hi', or 'lo') """ def __init__(self, N=2, vars=None, weighted=False): """Defines an empty BDD with N variables. Args: N (int): number of variables ( = number of layers except the terminal), vars (list): variable names (strings or ints), weighted (bool): a flag for weighted BDD (assigning arc weights). """ if not vars: vars = [i for i in range(1, N+1)] self.vars = vars self.weighted = weighted if weighted: self.weights = dict() self.layers = [set() for i in range(N)] self.T = node(NTRUE) self.F = node(NFALSE) self.layers.append(set([self.T, self.F])) self.max_id = -1 self.av_node_ids = deque() self.nodes = dict() self.var_pos = dict(zip(vars, [i for i in range(len(self.layers))])) self.nodes.update({NTRUE: self.T, NFALSE: self.F})
[docs] def width(self): """Returns BDD width (max no. nodes in a layer).""" return max([len(L) for L in self.layers])
# some helper functions def __len__(self): """Returns no of variables (layers, except the terminal).""" return len(self.vars)
[docs] def n(self, i): """Returns size of the i-th layer.""" return len(self.layers[i])
[docs] def p(self, a): """Returns position (0-based layer index) of variable ``a``.""" return self.var_pos[a]
[docs] def size(self): """Returns the size of the BDD (total no. of nodes).""" return len(self.nodes)-2
[docs] def new_node_name(self): """Returns a unique name for the next node to be created. Picks the one to re-cycle after some node deletions, when available; when it is not -- just takes the ``max_id+1`` (adjusting the ``max_id``) """ if len(self.av_node_ids) > 0: return self.av_node_ids.popleft() else: self.max_id += 1 return self.max_id
[docs] def dump_gv(self, layerCapt=True, x_prefix="x", node_labels=None): """Exports the BDD to the Graphviz format (``.dot``). Args: layerCapt (bool): whether to generate layer captions. x_prefix (str): a prefix to be shown for layer name (default 'x') Returns: a Digraph object (see ``graphviz`` package). """ g = Digraph() for i, layer in enumerate(self.layers): with g.subgraph(name="cluster_{}".format(i)) as s: for n in layer: if n.id == NTRUE: s.node(str(NTRUE), label="T",fillcolor = "orange",style="filled") elif n.id == NFALSE: s.node(str(NFALSE),label="F", fillcolor = "gray",style="filled") else: if node_labels is None: s.node( str(n.id) ) else: s.node(str(n.id), label=f"{n.id}: {node_labels[n.id]}") if i!=len(self.layers)-1: if layerCapt: s.attr(label=f"{x_prefix}{self.vars[i]}, sz={len(layer)}", color="lightgrey") else: s.attr(color="lightgrey") else: s.attr(color="white") for layer in self.layers: for n in layer: if not n.hi is None: if self.weighted: elabel = f"{self.weights[(n.id, n.hi.id, 'hi')]:.2f}" else: elabel = "" g.edge(str(n.id), str(n.hi.id), label=elabel) if not n.lo is None: if self.weighted: elabel = f"{self.weights[(n.id, n.lo.id, 'lo')]:.2f}" else: elabel = "" g.edge(str(n.id), str(n.lo.id), style="dashed", label=elabel) return g
[docs] def addnode(self, parent_node, arc="hi", node_id=None, edge_weight=0.0): """Adds a node and updates aux info as necessary. Args: parent_node (node): parent node pointer, arc (str): arc type, `hi` or `lo`, node_id: node name. If `None` is provided, the value is derived from :py:func:`BDD.BDD.new_node_name`, edge_weight(float): edge weight Returns: The created node as a :py:class:`BDD.node` object. """ if node_id is None: node_id = self.new_node_name() newnode = node(node_id) if parent_node is None: newnode.layer = 0 self.layers[0].add(newnode) elif arc == "hi": parent_node.hi = newnode self.layers[parent_node.layer+1].add(newnode) newnode.layer = parent_node.layer+1 if self.weighted: self.weights[(parent_node.id, parent_node.hi.id, 'hi')] = edge_weight elif arc == "lo": parent_node.lo = newnode self.layers[parent_node.layer+1].add(newnode) newnode.layer = parent_node.layer+1 if self.weighted: self.weights[(parent_node.id, parent_node.lo.id, 'lo')] = edge_weight else: print("ERROR: Wrong arc type: {}\n(allowed values are 'hi' and 'lo')".format(arc)) self.nodes.update({newnode.id: newnode}) return newnode
[docs] def swap_up(self, layer_idx): """Swaps the layer with the one immediately above it (in-place). (by *index* ! in-place operation) Args: layer_idx (int): number (0-based index) of the layer to be ''bubbled up'' Note: Operation takes O (`no-of-nodes in the upper layer`) time. Drops the layer being swapped. Then iterates through the nodes of the layer immediately above it and creates nodes as necessary (re-cycling new nodes to avoid redundancy: a quasi-reduced BDD keeps this property) """ assert layer_idx >= 1 # otherwise there's no layer to swap to assert layer_idx <= len(self.layers)-2 # can't swap-up the last layer new_nodes = dict() for n in self.layers[layer_idx]: del self.nodes[n.id] self.av_node_ids.append(n.id) self.layers[layer_idx] = set() prev_layer = self.layers[layer_idx-1] if not self.weighted: for F in prev_layer: # iterating through the nodes of the upper layer # save the pointers F_hi_hi = F.hi.hi F_hi_lo = F.hi.lo F_lo_hi = F.lo.hi F_lo_lo = F.lo.lo # creating the "hi"-node if not (F_hi_hi.id, F_lo_hi.id) in new_nodes.keys(): F_hi = self.addnode(F, "hi") F_hi.link(F_hi_hi, "hi") F_hi.link(F_lo_hi, "lo") new_nodes.update({(F_hi.hi.id, F_hi.lo.id): F_hi}) else: F_hi = new_nodes[(F_hi_hi.id, F_lo_hi.id)] # re-cycle the old node F.link(F_hi, "hi") # creating the "lo"-node if not (F_hi_lo.id, F_lo_lo.id) in new_nodes.keys(): F_lo = self.addnode(F, "lo") F_lo.link(F_hi_lo, "hi") F_lo.link(F_lo_lo, "lo") new_nodes.update({(F_lo.hi.id, F_lo.lo.id): F_lo}) else: F_lo = new_nodes[(F_hi_lo.id, F_lo_lo.id)] # re-cycle the old node F.link(F_lo, "lo") self.layers[layer_idx] = set(new_nodes.values()) else: # weighted version (with arc costs) old_weights = copy.copy(self.weights) for F in prev_layer: # save the pointers F_hi_hi = F.hi.hi F_hi_lo = F.hi.lo F_lo_hi = F.lo.hi F_lo_lo = F.lo.lo # calculating the necessary costs cF_hi = old_weights[(F.id, F.hi.id, "hi")] cF_lo = old_weights[(F.id, F.lo.id, "lo")] cF_hi_hi = old_weights[(F.hi.id, F.hi.hi.id, "hi")] cF_lo_hi = old_weights[(F.lo.id, F.lo.hi.id, "hi")] cF_hi_lo = old_weights[(F.hi.id, F.hi.lo.id, "lo")] cF_lo_lo = old_weights[(F.lo.id, F.lo.lo.id, "lo")] cH_min = np.round(min(cF_hi + cF_hi_hi, cF_lo + cF_lo_hi), decimals=WEIGHT_TOL) cH_hi = np.round((cF_hi + cF_hi_hi) - cH_min, decimals=WEIGHT_TOL) cH_lo = np.round((cF_lo + cF_lo_hi) - cH_min, decimals=WEIGHT_TOL) cL_min = np.round(min(cF_hi + cF_hi_lo, cF_lo + cF_lo_lo), decimals=WEIGHT_TOL) cL_hi = np.round((cF_hi + cF_hi_lo) - cL_min, decimals=WEIGHT_TOL) cL_lo = np.round((cF_lo + cF_lo_lo) - cL_min, decimals=WEIGHT_TOL) # ---------------------------------------------------- # creating the "hi"-node if not (F_hi_hi.id, F_lo_hi.id, cH_hi, cH_lo) in new_nodes.keys(): F_hi = self.addnode(F, "hi", edge_weight=cH_min) self.llink(F_hi, F_hi_hi, "hi", edge_weight=cH_hi) self.llink(F_hi, F_lo_hi, "lo", edge_weight=cH_lo) new_nodes.update({(F_hi.hi.id, F_hi.lo.id, cH_hi, cH_lo): F_hi}) else: F_hi = new_nodes[(F_hi_hi.id, F_lo_hi.id, cH_hi, cH_lo)] # re-cycle the old node self.llink(F, F_hi, "hi", edge_weight=cH_min) # ---------------------------------------------------- # creating the "lo"-node if (F_hi_lo.id, F_lo_lo.id, cL_hi, cL_lo) not in new_nodes.keys(): F_lo = self.addnode(F, "lo", edge_weight=cL_min) self.llink(F_lo, F_hi_lo, "hi", edge_weight=cL_hi) self.llink(F_lo, F_lo_lo, "lo", edge_weight=cL_lo) new_nodes.update({(F_hi_lo.id, F_lo_lo.id, cL_hi, cL_lo): F_lo}) else: # re-cycle the old node F_lo = new_nodes[(F_hi_lo.id, F_lo_lo.id, cL_hi, cL_lo)] self.llink(F, F_lo, "lo", edge_weight=cL_min) self.layers[layer_idx] = set(new_nodes.values()) # NOTE: old nodes in the source layer must be (physically) deleted # by the Python garbage collection # rename layers and update aux structures as necessary self.var_pos.update({ self.vars[layer_idx]: layer_idx - 1, self.vars[layer_idx - 1]: layer_idx }) v_1 = self.vars[layer_idx-1] self.vars[layer_idx-1] = self.vars[layer_idx] self.vars[layer_idx] = v_1
[docs] def sift(self, var, pos): """Sifts variable ``var`` (by name) to (0-based) position ``pos``. Uses :py:func:`BDD.sift` under the hood, in-place operation.""" assert pos >= 0 and pos < len(self.layers)-1 assert var in self.vars if pos < self.p(var): # this is a sift-up while self.p(var) > pos: self.swap_up(self.p(var)) elif pos == self.p(var): # a trivial case -- nothing to do return else: # this is a sift-down while pos > self.p(var): self.swap_up(self.p(var)+1)
[docs] def gsifts(self, with_whom, start_order=None): """Performs greedy sifts to align with ``with_whom``. Runs a simplistic implementation of Rudell'93 sifting alorithm extension to minimize \|A\|+\|B\| starts with aligning to ``self`` or ``start_order`` (if given) """ N = len(self.layers)-1 if start_order is None: start_order = self.vars else: for i in range(N): self.sift(start_order[i], i) for i in range(N): with_whom.sift(start_order[i], i) # now the pair is aligned, but possibly huge # let us see if we can compress it: processed_vars = set() while len(processed_vars) < N: i = 0 while self.vars[i] in processed_vars: i += 1 # skip processed variables best_pos = i active_var = self.vars[i] best_size = self.size() + with_whom.size() cur_size = best_size # try moving the var down for j in range(i+1, N): #if self.size()+with_whom.size() > cur_size*GSIFTS_MAX_INCREASE_MUL: # break self.swap_up(j) with_whom.swap_up(j) cur_size = self.size()+with_whom.size() if cur_size < best_size: best_size = cur_size best_pos = j # try moving the var up self.sift(active_var, i) with_whom.sift(active_var, i) cur_size = self.size()+with_whom.size() for j in reversed(range(1,i+1)): #if self.size()+with_whom.size() > cur_size*GSIFTS_MAX_INCREASE_MUL: # break self.swap_up(j) with_whom.swap_up(j) cur_size = self.size()+with_whom.size() if cur_size < best_size: best_size = cur_size best_pos = j-1 # now choose the best position for the variable self.sift(active_var, best_pos) with_whom.sift(active_var, best_pos) processed_vars.add(active_var)
# file manipulation procedures
[docs] def save(self, filename): """Saves a BDD to a text file. Args: filename (string): destination file name. Note: File format is as follows. - File header, the first two lines | N= `<no-of-layers>` | vars= `<comma-separated layer labels>` - Then, one node description = one line of the format: `id` : `hi` , `lo` where ``id`` is node's ID, ``hi`` and ``lo`` are IDs of the nodes corresponding to one- and zero- pointers of the node ``ID``. The procedure performs breadth-first search and just saves all the nodes. Three nodes have reserved ID values: - **Root** node: 0 - **True** terminal: -1 - **False** terminal: -2 """ S = deque() # a deque of node IDs to explore (FIFO) V = set() # set of visited (saved) nodes (by ID) for n in self.layers[0]: S.append(n.id) with open(filename, "w") as f: f.write("N={}\n".format(len(self.layers))) f.write("vars={}\n".format(', '.join([str(v) for v in self.vars]))) while len(S) > 0: n_id = S.popleft() # this is a new node if not (self.nodes[n_id].hi is None or self.nodes[n_id].hi is None): n_hi = self.nodes[n_id].hi.id n_lo = self.nodes[n_id].lo.id f.write("{}:{},{}\n".format(n_id, n_hi, n_lo)) if not (n_hi in V or n_hi in S or n_hi is None): S.append(n_hi) if not (n_lo in V or n_lo in S or n_lo is None): S.append(n_lo) else: f.write("{}:None,None\n".format(n_id)) V.add(n_id)
[docs] def load(self, filename, weighted=False): """Loads a BDD from a (text) file. Note: The format is described in :py:func:`save`. """ with open(filename, "r") as f: line = f.readline() while line[0] == "#" or line == "": line = f.readline() assert line[:2] == 'N=' N = int(line[2:]) assert N > 1 line = f.readline() assert line[:5] == 'vars=' line = line[5:].split(',') assert len(line) == N-1 # initialize the attributes self.layers = [set() for i in range(N)] self.nodes = dict() self.vars = [v.strip() for v in line] self.var_pos = dict(zip(self.vars, [i for i in range(N)])) self.weighted = weighted if weighted: self.weights = dict() line = f.readline() current_layer = 0 next_layer = set() while line: if weighted: id, line, weights = line.split(':') weights = weights.split(",") w_hi = float(weights[0]) w_lo = float(weights[1]) else: id, line = line.split(':') hi_id, lo_id = line.split(',') id = int(id) if id != NTRUE and id != NFALSE: hi_id = int(hi_id) lo_id = int(lo_id) if weighted: self.weights[(id, hi_id, "hi")] = w_hi self.weights[(id, lo_id, "lo")] = w_lo else: line = f.readline() continue if id in next_layer: current_layer += 1 next_layer = set() elif id not in self.nodes.keys(): self.addnode(None, node_id=id) if id != 0: print(f"WARNING: during load, just added an orphan node with id: {id}") if hi_id in next_layer: F_hi = self.nodes[hi_id] self.nodes[id].link(F_hi, "hi") else: if self.weighted: e_w = w_hi else: e_w = 0.0 F_hi = self.addnode(self.nodes[id], "hi", node_id=hi_id, edge_weight=e_w) next_layer.add(hi_id) if lo_id in next_layer: F_lo = self.nodes[lo_id] self.nodes[id].link(F_lo, "lo") else: if self.weighted: e_w = w_lo else: e_w = 0.0 F_lo = self.addnode(self.nodes[id], "lo", node_id=lo_id, edge_weight=e_w) next_layer.add(lo_id) if id not in self.nodes.keys(): if current_layer > 0: print("WARNING: a node with no source at layer {}". format(current_layer)) F = node(id, F_hi, F_lo) self.layers[current_layer].add(F) self.nodes.update({id: F}) if id == NTRUE: self.T = self.nodes[id] if id == NFALSE: self.F = self.nodes[id] line = f.readline() self.max_id = max([n.id for n in self.nodes.values()]) self.av_node_ids = deque([ i for i in range(1, self.max_id + 1) if i not in self.nodes.keys() ])
[docs] @classmethod def random(cls, N=5, p=0.5, weighted=False): """Generates a random BDD with `N` variables (N+1 layers). Layers are numbered with consecutive integers by default, starting with ``1``. Args: N (int): number of variables (results in N+1 layers, incl. T,F) p (float): tree size parameter 0 will generate a non-random exponential-sized diagram, 1 will result in a single node per layer. weighted (bool): whether to generate a weighted diagram. Returns: A generated :py:class:`BDD`. """ bdd = BDD(N, weighted=weighted) assert N > 1 bdd.addnode(parent_node=None) # create a root node for layer in range(1, N): # generate nodes (and edges) for the layer for n in bdd.layers[layer-1]: ch = np.random.randint(0, 1+C_MAX) cl = np.random.randint(0, 1+C_MAX) if rnd() <= p or len(bdd.layers[layer]) == 0: newnode = bdd.addnode(n, arc="hi", edge_weight=ch) else: bdd.link(n.id, rnd_choose(tuple(bdd.layers[layer])).id, "hi", edge_weight=ch) if rnd() <= p: newnode = bdd.addnode(n, arc="lo", edge_weight=cl) else: bdd.link(n.id, rnd_choose(tuple(bdd.layers[layer])).id, "lo", edge_weight=cl) # separately process the last layer for n in bdd.layers[-2]: bdd.link(n.id, rnd_choose(tuple(bdd.layers[-1])).id, "hi", edge_weight=np.random.randint(0, 1+C_MAX)) bdd.link(n.id, rnd_choose(tuple(bdd.layers[-1])).id, "lo", edge_weight=np.random.randint(0, 1+C_MAX)) return bdd
[docs] def profile(self): """Returns a BDD "profile" -- an (almost) BFS-ordered list of nodes. Note: This is used to determine, e.g., if the generated instance is unique. A 'profile' is a string of the format ``<vars>``:``<BFS-nodes>``, where: - ``<vars>`` : comma-separated variable names by layer; - ``<BFS-nodes>`` : comma-separated list of node-numbers (not IDs!) in a BFS-traverse order """ Q = deque() V = set() node_nums = dict() cur_node = 0 p = [] for n in self.layers[0]: node_nums.update({n.id: cur_node}) Q.append(n) p.append(str(cur_node)) cur_node += 1 p = [",".join(p) + ";"] while(len(Q) > 0): n = Q.popleft() for successor in [n.hi, n.lo]: if successor is None: continue if successor.id not in node_nums.keys(): node_nums.update({successor.id: cur_node}) cur_node += 1 p.append(str(node_nums[successor.id])) if successor.id not in V: Q.append(successor) V.add(successor.id) return p[0] + ",".join(p[1:])
[docs] def show(self, dir="tmp", filename="showfunc.dot", layerCapt=True, x_prefix="x", node_labels=None): """Shows the diagram on the screen (saving the auxiliary pdf). Generates a ``.dot`` file and compiles it to ``.pdf``. Requires ``graphviz`` (``dot`` program) Args: dir (str): directory to place files (default: ``tmp``) filename (str): `.dot` filename (default: ``showfunc.dot``) layerCapt (bool): whether to show layer caption (default: ``True``) x_prefix(str): a prefix for variable captions (default: ``x``) node_labels(dict): node labels to display (optional). """ self.dump_gv(layerCapt, x_prefix, node_labels).view(filename, directory=dir, cleanup=True)
[docs] def align_to(self, vars_order, inplace=False): """Revises the BDD to a given order. (with a series of consecutive sifts) """ if inplace: aligned = self else: aligned = copy.deepcopy(self) for i in range(len(vars_order)): aligned.sift(vars_order[i], i) return aligned
[docs] def OA_bruteforce(self, with_what, LR=False): """Finds the best (smallest) alignment with BDD ``with_what``. Uses brute-force enumeration of all possible orders. Args: with_what (BDD): target diagram. LR (bool): if set, layer-reduces each candidate BDD. Returns: A tuple of values - alternatives (int): number of alternative optima - A_aligned (list): of A-aligned (in each optimum) - B_aligned (list): of B-aligned (in respective optimum) """ # generate all possible permutations perms = iters.permutations(self.vars) min_size = math.inf A_aligned = [] B_aligned = [] alternatives = 0 for perm in perms: A_c = self.align_to(list(perm)) B_c = with_what.align_to(list(perm)) if LR: A_c.make_reduced() B_c.make_reduced() if min_size > A_c.size() + B_c.size(): A_aligned = [A_c] B_aligned = [B_c] min_size = A_c.size()+B_c.size() alternatives = 1 elif min_size == A_c.size() + B_c.size(): A_aligned.append(A_c) B_aligned.append(B_c) alternatives += 1 return [alternatives, A_aligned, B_aligned]
[docs] def is_reduced(self): """Checks if the BDD is quasi-reduced (no "redundant" nodes).""" checked_nodes = set() # quasi-reduced BDD for weighted diagrams must also check the # uniqueness of weights, not just uniqueness of (hi, lo) tuples. # Not a big issue, just a separate procedure (need to take care # of the precision, though). assert (not self.weighted), "ERROR: not implemented for weighted BDDs." for layer in range(len(self.layers)-2, -1, -1): for n in self.layers[layer]: if (n.hi.id, n.lo.id) in checked_nodes: return False else: checked_nodes.add((n.hi.id, n.lo.id)) return True
[docs] def make_reduced(self): """Makes the BDD quasi-reduced. Swaps each layer back and forth, starting from the last one, (which makes sure the results is quasi-reduced due to :py:func:`BDD.BDD.swap` implementation.) """ for i in range(len(self.vars)-1, 0, -1): self.swap_up(i) self.swap_up(i)
[docs] def rename_vars(self, ren_dict): """Helper function: renames variables. Args: ren_dict (dict): a dict of <= :py:attr:`BDD.N` labels in the form {``before``: ``after``} """ for v in ren_dict.keys(): assert v in self.vars, f"{v} is not in {self.vars}" new_vars = [(lambda v: ren_dict[v] if v in ren_dict.keys() else v)(v) for v in self.vars] self.vars = new_vars self.var_pos = dict(zip(self.vars, [i for i in range(len(self.vars))]))
[docs] def shuffle_vars(self): """Randomly renames the variables (shuffles their order).""" self.rename_vars( dict(zip(self.vars, np.random.permutation(self.vars))))
[docs] def is_aligned(self, to_what): """Checks if the BDD is aligned with ``to_what``.""" return np.array_equal(self.vars, to_what.vars)
# functions related to testing equivalence and finding truth tables
[docs] def get_value(self, x): """Finds the terminal node (T or F) -- an endpoint for ``x``. Returns a terminal node ID corresponding to the variable choices in ``x``. Args: x (dict): of {var_name: value}, where value is in {0,1}. Returns: A tuple of values - terminal (bool or `-1`): terminal node corresponding to the path implied by x, encoded as Boolean (or -1 if error) - cost (float): if the BDD is weighted, cost of the path. """ (node,) = self.layers[0] cost = 0.0 for i in range(len(x)): if x[self.vars[i]] == 0: if self.weighted: cost += self.weights[(node.id, node.lo.id, "lo")] node = node.lo elif x[self.vars[i]] == 1: if self.weighted: cost += self.weights[(node.id, node.hi.id, "hi")] node = node.hi else: print( "Wrong value ({}) for variabe {}: 0 or 1 expected".format( x[self.vars[i]], self.vars[i])) return -1 if node.id == self.T.id: if self.weighted: return [True, cost] else: return True elif node.id == self.F.id: if self.weighted: return [False, cost] else: return False else: print("Error not a True or False node reached!") print("node is {}, T is {}, F is {}".format( node.id, self.T.id, self.F.id)) return -1
[docs] def truth_table(self): """Returns a "truth table" for the encoded Boolean function. Returns: ``pandas`` dataframe: column numbers correspond to variable names with two additional columns: ``Terminal`` for the function value (of Boolean type) and ``Cost`` if the diagram is :py:attr:`BDD.weighted`. """ tt = [] ind = [] for x_num in range(2**len(self)): x = [int(j) for j in np.binary_repr(x_num, width=len(self.vars))] if not self.weighted: tt.append(x + [self.get_value(dict(zip(self.vars, x)))]) else: tt.append(x + self.get_value(dict(zip(self.vars, x)))) ind.append(np.binary_repr(x_num, width=len(self.vars))) if not self.weighted: return pd.DataFrame(tt, columns=[str(x) for x in self.vars] + ["Terminal"], index=ind) else: return pd.DataFrame(tt, columns=[str(x) for x in self.vars] + ["Terminal", "Cost"], index=ind)
[docs] def is_equivalent(self, B): """Checks if two BDDs (A and B) are equivalent. Equivalent in the sense that they define the same Boolean function and the corresponding path costs coincide. Checks: - if the corresponding truth tables coincide. - (if the BDD is weighted) if all paths have the same costs. Returns: A tuple of - result (bool): whether the diagrams are equivalent, - message (str): information regarding the difference, if found. """ assert self.weighted == B.weighted msg = None tt_self = self.truth_table() tt_B = B.truth_table() if self.weighted: tt_B = tt_B[[str(x) for x in self.vars] + ['Terminal', 'Cost']] else: tt_B = tt_B[[str(x) for x in self.vars] + ['Terminal']] if self.weighted: tt_B['new_index'] = [ c for c in tt_B.apply( lambda row: "".join([str(x) for x in row[:-2]]), axis=1) ] else: tt_B['new_index'] = [ c for c in tt_B.apply( lambda row: "".join([str(x) for x in row[:-1]]), axis=1) ] tt_B.set_index('new_index', inplace=True) for idx, row in tt_self.iterrows(): if row['Terminal'] != tt_B.loc[idx]['Terminal']: msg = "\nINFO: Discrepancy found. For x={}, value(self)={}, value(B)={}".format( idx, row['Terminal'], tt_B.loc[idx]['Terminal']) return [False, msg] if self.weighted: if abs(row['Cost'] - tt_B.loc[idx]['Cost']) >= WEIGHT_TOL: msg = "\nINFO: Discrepancy found. For x={}, cost(self)={}, cost(B)={}".format( idx, row['Cost'], tt_B.loc[idx]['Cost']) return [False, msg] return [True, msg]
[docs] def shortest_path(self): """Finds a shortest-path in O(`no-of-nodes`) time. Returns: dict: node labels (using node IDs as keys) corresponding to a shortest path from the current node to **True** terminal. Note: Expects the diagram to be :py:attr:`BDD.weighted`. (An unweighted version, in terms of 'number of hops', does not make any sense for a quasi-reduced BDD: the answer would be ``len(self)``.) """ assert self.weighted node_labels = {NTRUE: 0} for L in reversed(range(len(self.vars))): for node in self.layers[L]: node_label = None if node.hi.id in node_labels.keys(): node_label = node_labels[node.hi.id] + self.weights[ (node.id, node.hi.id, "hi")] if node.lo.id in node_labels.keys(): if node_label is not None: node_label = min( node_label, node_labels[node.lo.id] + self.weights[(node.id, node.lo.id, "lo")]) else: node_label = node_labels[node.lo.id] + self.weights[ (node.id, node.lo.id, "lo")] if node_label is not None: node_labels.update({node.id: node_label}) for node in self.nodes: if node not in node_labels.keys(): node_labels[node] = "∞" return node_labels
[docs] def simscore(self, B): """Computes a simscore with another diagram, `B` (see :py:func:`simscore`).""" return simscore(self.vars, B.vars, B.var_pos)
[docs]def intersect(A, B): """Produces an intersection BDD of ``A`` and ``B``. Notes: BDDs need to be order-associated (same vars in the same order). Returns: :py:class:`BDD` : an intersection BDD. """ assert A.vars == B.vars assert A.weighted == B.weighted weighted = A.weighted C = BDD(vars=A.vars, N=len(A.vars), weighted=weighted) root = C.addnode(parent_node=None) current_layer = {(NROOT, NROOT): root} # for all layers but the last one for i in range(len(A.vars)-1): new_layer = dict() for n in current_layer: niA, niB = n nA = A.nodes[niA] nB = B.nodes[niB] nC = current_layer[n] # yes-arc new_yes = (nA.hi.id, nB.hi.id) wA = A.weights[(nA.id, nA.hi.id, "hi")] wB = B.weights[(nB.id, nB.hi.id, "hi")] if new_yes in new_layer: C.link(nC.id, new_layer[new_yes].id, "hi", edge_weight=wA+wB) else: new_layer[new_yes] = C.addnode(nC, "hi", edge_weight=wA+wB) # no-arc new_no = (nA.lo.id, nB.lo.id) wA = A.weights[(nA.id, nA.lo.id, "lo")] wB = B.weights[(nB.id, nB.lo.id, "lo")] if new_no in new_layer: C.link(nC.id, new_layer[new_no].id, "lo", edge_weight=wA+wB) else: new_layer[new_no] = C.addnode(nC, "lo", edge_weight=wA+wB) current_layer = new_layer # a special case of the last layer for n in current_layer: niA, niB = n nA = A.nodes[niA] nB = B.nodes[niB] nC = current_layer[n] # yes-arc wA = A.weights[(nA.id, nA.hi.id, "hi")] wB = B.weights[(nB.id, nB.hi.id, "hi")] if nA.hi.id == NTRUE and nB.hi.id == NTRUE: C.link(nC.id, NTRUE, "hi", edge_weight=wA+wB) else: C.link(nC.id, NFALSE, "hi", edge_weight=wA+wB) # no-arc wA = A.weights[(nA.id, nA.lo.id, "lo")] wB = B.weights[(nB.id, nB.lo.id, "lo")] if nA.lo.id == NTRUE and nB.lo.id == NTRUE: C.link(nC.id, NTRUE, "lo", edge_weight=wA+wB) else: C.link(nC.id, NFALSE, "lo", edge_weight=wA+wB) return C