From d5dd5525bcffd283646ed9edbb0b7b9f3e077fb6 Mon Sep 17 00:00:00 2001 From: Jack Raymond <10591246+jackraymond@users.noreply.github.com> Date: Tue, 16 Dec 2025 13:47:56 -0800 Subject: [PATCH 1/3] Add an automorphism composite well matched to the SpinReversalTransformComposite. Add processor graph generators. --- dwave/experimental/automorphism/__init__.py | 1 + .../automorphism/automorphism_composite.py | 1026 +++++++++++++++++ examples/automorphism_composite.py | 163 +++ tests/test_automorphism_composite.py | 507 ++++++++ 4 files changed, 1697 insertions(+) create mode 100644 dwave/experimental/automorphism/automorphism_composite.py create mode 100644 examples/automorphism_composite.py create mode 100644 tests/test_automorphism_composite.py diff --git a/dwave/experimental/automorphism/__init__.py b/dwave/experimental/automorphism/__init__.py index 4cbf552..d77c96a 100644 --- a/dwave/experimental/automorphism/__init__.py +++ b/dwave/experimental/automorphism/__init__.py @@ -13,3 +13,4 @@ # limitations under the License. from .automorphism_generation import * +from .automorphism_composite import * diff --git a/dwave/experimental/automorphism/automorphism_composite.py b/dwave/experimental/automorphism/automorphism_composite.py new file mode 100644 index 0000000..34cdb23 --- /dev/null +++ b/dwave/experimental/automorphism/automorphism_composite.py @@ -0,0 +1,1026 @@ +# Copyright 2025 D-Wave +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +""" +This module contains an automorphism composite that can make use of +either the Schreier Sims representation of the automorphism_generation.py +module, or a (generator, degree) format where the generator is a dictionary. +This generator should later be moved to dwave.preprocessing.composites to +work alongside SpinReversalTransformComposite - since it serves a closely +related purpose. + +This module also specifies the generators for defect free Chimera, Pegasus +and Zephyr graphs either in a format suitable for sampling by sequential +application of the generators, or in the form of strongest sets. This might +be later moved to dwave_networkx to sit alongside other code related to +processor-graph theoretic properties. +""" + + +from typing import Optional, Sequence, Any +from itertools import product + +import networkx as nx +import dimod +import numpy as np + +from dimod import ComposedSampler +from dwave_networkx import pegasus_graph, pegasus_coordinates +from dwave.experimental.automorphism import ( + schreier_rep, + sample_automorphisms as sample_automorphisms_u_vector, +) + +__all__ = [ + "AutomorphismComposite", + "chimera_generators", + "pegasus_generators", + "zephyr_generators", + "sample_automorphisms_listtuple", + "listtuple_to_arrays", +] + + +def listtuple_to_arrays( + listtuple: list[tuple[dict, int]], node_to_idx: dict +) -> list[list[np.ndarray[np.intp]]]: + """Unwrap (generator, degree) pairs into an array format. + + Note, the array format is accepted by the + `automorphism_generatation.sample_automorphisms`. + It is more general than the generator pair format, so an + inverse mapping is not possible in general. + + Args: + listtuple: A list of generator, degree pairs. Each generator + is defined by a dictionary. + node_to_idx: A mapping from node labels of the graph to + contiguous integers labeled from zero. 
+ + Output: + A list of lists of numpy arrays, suitable for use by + `automorphism.sample_automorphisms` + """ + + nodeset = set(node_to_idx.keys()) + listtuple = [ + (prune_by_nodeset(g, nodeset), n) for g, n in listtuple + ] # Compress listtuple w.r.t. mapped nodes. + listtuple = [p for p in listtuple if p[0]] # Remove anything redundant + llarray = [ + [np.arange(len(node_to_idx), dtype=np.intp) for _ in range(n - 1)] + for _, n in listtuple + ] + for larray, (g, degree) in zip(llarray, listtuple): + g = {node_to_idx[k]: node_to_idx[v] for k, v in g.items()} + for k, v in g.items(): + larray[0][k] = v # First element is standard generator + for idx in range(degree - 2): + parray = larray[idx] # same generator applied up to n-1 times. + for k, v in g.items(): + larray[idx + 1][parray[k]] = parray[v] + return llarray + + +def chimera_generators( + m: int, + n: Optional[int] = None, + t: int = 4, + generator_type: str = "redundant", +) -> list[tuple[dict, int]]: + """Return generators for Chimera graphs. + + Args: + m: number or rows + n: number of columns + t: tile parameter + generator_type: The type of generator, either `redundant`, + to specify a set suitable for sequential sampling, or + 'strongest' to generate a strongest set. The size of the + group is given by the product of degrees in the redunant + representation if m!=n or t<2. And is given by the product + divided by 2 for the case t>1 and m=n. + + Returns: + A list of tuples. Each tuple includes a generator, and the + degree of the generator. 
+ """ + + if n is None: + n = m + if n == m: + # Verticals to horizontals + diagonal = [ + ( + { + (i, j, u, k): (i, j, 1 - u, k) + for i in range(m) + for j in range(n) + for u in range(2) + for k in range(t) + }, + 2, + ) + ] + else: + diagonal = [] + if m > 1: + vertical = [ + ( + { + (i, j, u, k): (m - 1 - i, j, u, k) + for i in range(m) + for j in range(n) + for u in range(2) + for k in range(t) + }, + 2, + ) + ] + else: + vertical = [] + if n > 1 and (m != n or generator_type == "redundant"): + horizontal = [ + ( + { + (i, j, u, k): (i, n - 1 - j, u, k) + for i in range(m) + for j in range(n) + for u in range(2) + for k in range(t) + }, + 2, + ) + ] + else: + horizontal = [] + # Shore Permutations correlated by row/column + # - must be ordered: (1,..,k) then (1,,k-1) .. (1,2) etc. fully mixes (permutation) + if generator_type == "redundant": + vert_shores = [ + ( + { + (i, j, 0, k): (i, j, 0, (k + 1) % kp) + for i in range(m) + for k in range(kp) + }, + kp, + ) + for kp in range(t, 1, -1) + for j in range(n) + ] + horiz_shores = [ + ( + { + (i, j, 1, k): (i, j, 1, (k + 1) % kp) + for j in range(n) + for k in range(kp) + }, + kp, + ) + for kp in range(t, 1, -1) + for i in range(m) + ] + else: + vert_shores = [ + ( + { + (i, j, 0, k + o1): (i, j, 0, k + o2) + for i in range(m) + for o1, o2 in [(0, 1), (1, 0)] + }, + 2, + ) + for k in range(t, 1, -1) + for j in range((n + 1) // 2) + ] + if m != n: + horiz_shores = [ + ( + { + (i, j, 1, k + o1): (i, j, 1, k + o2) + for j in range(n) + for o1, o2 in [(0, 1), (1, 0)] + }, + 2, + ) + for k in range(t, 1, -1) + for i in range((m + 1) // 2) + ] + else: + horiz_shores = [] + return diagonal + vertical + horizontal + vert_shores + horiz_shores + + +def _flip_zephyr(u: int, w: int, k: int, j: int, z: int, orient: bool, m: int) -> tuple: + if orient == True: + ue = u + else: + ue = 1 - u + return ( + u, + ue * (2 * m - w) + (1 - ue) * w, + k, + ue * j + (1 - ue) * (1 - j), + ue * z + (1 - ue) * (m - 1 - z), + ) + + +def 
zephyr_generators( + m: int, t: int = 4, generator_type="redundant" +) -> list[tuple[dict, int]]: + """Create generators for zephyr graphs + + Args: + m: Grid parameter for the Zephyr lattice. + t: Tile parameter for the Zephyr lattice. + generator_type: The type of generator, either `redundant`, + to specify a set suitable for sequential sampling, or + 'strongest' to generate a strongest set. The size of the + group is given by the product of degrees in the redundant. + + Returns: + A list of tuples. Each tuple includes a generator, and the + degree of the generator. + """ + + diagonal = [ + ( + { + (u, w, k, j, z): (1 - u, w, k, j, z) + for u in range(2) + for w in range(2 * m + 1) + for k in range(t) + for j in range(2) + for z in range(m) + }, + 2, + ) + ] + vertical = [ + ( + { + (u, w, k, j, z): _flip_zephyr(u, w, k, j, z, True, m) + for u in range(2) + for w in range(2 * m + 1) + for k in range(t) + for j in range(2) + for z in range(m) + }, + 2, + ) + ] + if generator_type != "strongest": + horizontal = [ + ( + { + (u, w, k, j, z): _flip_zephyr(u, w, k, j, z, False, m) + for u in range(2) + for w in range(2 * m + 1) + for k in range(t) + for j in range(2) + for z in range(m) + }, + 2, + ) + ] + else: + horizontal = [] + # Shore Permutations correlated by row/column + # - must be ordered: (1,..,k) then (1,,k-1) .. (1,2) etc. 
fully mixes (permutation) + if generator_type != "strongest": + shores = [ + ( + { + (u, w, k, j, z): (u, w, (k + 1) % kp, j, z) + for z in range(m) + for j in range(2) + for k in range(kp) + }, + kp, + ) + for kp in range(t, 1, -1) + for u in range(2) + for w in range(2 * m + 1) + ] + else: + shores = [ + ( + { + (u, w, k, j, z): (u, w, (k + 1) % kp, j, z) + for z in range(m) + for j in range(2) + for k in range(kp) + }, + kp, + ) + for kp in range(t, 1, -1) + for u in range(1) # one orientation suffices + for w in range(m + 1) # half way suffices + ] + + return diagonal + vertical + horizontal + shores + + +def pegasus_generators( + m: int, generator_type: str = "redundant" +) -> list[tuple[dict, int]]: + """Create generators for pegasus (fabric_only, for m>1) + + Args: + m: Grid parameter for the Pegasus lattice. + generator_type: The type of generator, either `redundant`, + to specify a set suitable for sequential sampling, or + 'strongest' to generate a strongest set. The size of the + group is given by the product of degrees in the redundant. + + Returns: + A list of tuples. Each tuple includes a generator, and the + degree of the generator. 
+ """ + diagonal = [ + ( + { + (u, w, k, z): (1 - u, m - 1 - w, 11 - k, m - 2 - z) + for u in range(2) + for w in range(m) + for k in range(12) + for z in range(m - 1) + }, + 2, + ) + ] + # Odd-pairs + odd_pairs = [ + ( + { + (u, w, koff + kin, z): (u, w, koff + (kin + 1) % 2, z) + for z in range(m - 1) + for kin in range(2) + }, + 2, + ) + for koff in range(0, 12, 2) + for u in range(2) + for w in range(m) + ] + + # Constrain to the standard (fabric_only) representation (the largest component): + nonfabric = { + (u, m - 1, k, z) for u in range(2) for k in range(10, 12) for z in range(m - 1) + } | {(u, 0, k, z) for u in range(2) for k in range(2) for z in range(m - 1)} + fabric = { + c for c in product(range(2), range(m), range(12), range(m - 1)) + }.difference(nonfabric) + pruned_generators = [] + for g in diagonal + odd_pairs: + new = prune_by_nodeset(g[0], nodeset=fabric) + if new: + pruned_generators.append((new, g[1])) + + return pruned_generators + + +def sample_automorphisms_listtuple(generators_listtuple, *, prng=None, mapping=None): + """Sample an automorphism by ordered application of generators + + An abelian group allows for generators to be sampled sequentially to fairly + sample the space of all automorphisms. + Generators for Chimera, Pegasus and Zephyr graphs are not abelian, but can + be ordered to allow for similar fair sampling. See the respective generator methods + of this module. + For more general cases it is recommended to fairly sample by the Screier-Sims + method of automorphism_generation.py module. + + Generators are specified as dictionaries (1:1 mappings), with some known + degree e.g. {2: 4, 4: 2}, degree 2 + + Args: + generators_listtuple: A list of generator, generator degree tuples, to be + sampled sequentially. + prng: A numpy pseudo random number generator, or seed used to initialize + the PRNG. + mapping: An initial permutation to which generators are applied, by default + the identity. Note the mapping is modified in place. 
+ """ + + prng = np.random.default_rng(prng) + if mapping is None: + vars_set = set(n for g in generators_listtuple for n in g[0].keys()) + mapping = {n: n for n in vars_set} + for generator, degree in generators_listtuple: + for _ in range(prng.integers(degree)): + mapping.update({k: mapping[v] for k, v in generator.items()}) + + return mapping + + +def prune_by_nodeset(generator_dict, nodeset): + """Restrict a generator to a subspace defined by nodeset + + Args: + generator_dict: An automorphism as a dictionary. + nodeset: A list of variables (in the subgraph). + + Returns: + a generator restricted to the subspace defined by the nodeset, + or an empty dictionary if the nodeset is not closed under the + generator. + + .. note:: The generator keys need only specify a subset of + variables in the nodeset. Variables not specified in the generator + (dictionary format) are assumed to map 1:1. + + """ + + nodeset = nodeset.intersection(set(generator_dict.keys())) + if not nodeset: + return {} + reduced_nodeset = nodeset & set({generator_dict[n] for n in nodeset}) + while reduced_nodeset != nodeset: + nodeset = reduced_nodeset + reduced_nodeset = nodeset & set({generator_dict[n] for n in nodeset}) + return {n: generator_dict[n] for n in nodeset} + + +def prune_by_edgeset(generator, edgeset): + """Restrict a generator to a subspace defined by an edgeset + + Args: + generator_dict: An automorphism as a dictionary. + nodeset: A list of edges (in the subgraph). + + Returns: + a generator restricted to the subspace defined by the edgeset, + or an empty dictionary if the edgeset is not closed under the + generator. + + .. note:: The generator keys need only specify a subset of + variables in the edges. Variables not specified in the generator + (dictionary format) are assumed to map 1:1. 
+ """ + + nodeset = {n for e in edgeset for n in e}.intersection(set(generator.keys())) + edgeset = { + frozenset(e) for e in edgeset + } # Note edges can be external to elements in the generator. + generated_edgeset = edgeset.intersection( + {frozenset(generator[i] for i, j in edgeset)} + ) + # If an edge is absent we must recursively remove the associated nodes: + bad_nodes = {n for e in generated_edgeset ^ edgeset for n in e} + return prune_by_nodeset(generator, nodeset.difference(bad_nodes)) + + +class AutomorphismComposite(ComposedSampler): + """Composite for applying automorphic preprocessing. + + The ordering of variables is permuted subject to a given pattern of + generators or an inherited sampler structure. + This can be useful to mitigate for noise, control errors or other + symmetry-breaking features of the child sampler. + + .. note:: + If you are configuring an anneal schedule, be mindful that this + composite does not recognize the ``initial_state`` parameter + used by dimod's :class:`~dwave.system.samplers.DWaveSampler` for + reverse annealing (composites do not generally process all keywords + of child samplers) and does not permute any of the configured initial + states. + .. note:: + If the generators or graph are not specified, it is by default inferred + from the graph of the structured child solver. Attention in the choice + of generators, or explicit specification of a target subgraph, can + enhance the variety of automorphisms available. + When the child graph is not structured it is assumed unconstrainted, + and the space of automorphisms is assumed to be that of a fully + connected graph (all permutations). + + Args: + sampler: A `dimod` sampler object. + + seed: As passed to :func:`numpy.random.default_rng`. + + generators_listtuple: A set of permutations compatible with the child + strcture. A list where each element is a tuple of generator (dict) + and integer cycle length. This allows for uniform sampling of some + graphs. 
If generators_listtuple is None (by default) a schreir_context + is used. + + generators_u_vector: A dwave.experimental.SchreirContext object. This allows + fair sampling of any graph. The SchreirContext for an arbitrary + graph can be created using dwave.experimental. If a schreir_context + is None, and generators_listtuple is None, then the schreir_context + compatible with the child sampler structure is created by default. + If both a generators_listtuple, and a schreir_context are provided, + the generators_listtuple is ignored. + + G: A `nx.Graph` from which automorphisms are to be inferred when neither + generators_listtuple or generators_u_vector are provided. If G is + not specified, the child sampler must be a `dimod.StructuredSampler` + and the graph is assumed from the given nodelist and edgelist. + + idx_to_node: A mapping from variables to sequential integer labels, required + only when automorphisms are determined from `generators_u_vector`. + + Examples: + This example composes a dimod ExactSolver sampler with the AutomorphismComposite + then uses it to sample an Ising problem. + + >>> from dimod import ExactSolver + >>> from dwave.preprocessing.composites import AutomorphismComposite + >>> base_sampler = ExactSolver() + >>> generators_listtuple = [({'a': 'b', 'b':'a'}, 2)] + >>> composed_sampler = AutomorphismComposite(base_sampler, generators_listtuple) + ... # Sample an Ising problem + >>> response = composed_sampler.sample_ising({'a': -0.5, 'b': 1.0}, {('a', 'b'): -1}) + >>> response.first.sample + {'a': -1, 'b': -1} + + References + ---------- + .. 
[#TODO] + """ + + _children: list[dimod.core.Sampler] + _parameters: dict[str, Sequence[str]] + _properties: dict[str, Any] + + def __init__( + self, + child: dimod.core.Sampler, + *, + seed=None, + generators_listtuple: list[tuple[dict, int]] = None, + generators_u_vector: list[list[np.ndarray[np.intp]]] = None, + G: nx.Graph = None, + idx_to_node: dict = None, + ): + self._child = child + self.rng = np.random.default_rng(seed) + self.generators_listtuple = generators_listtuple + self.generators_u_vector = generators_u_vector + self.idx_to_node = idx_to_node + if ( + generators_u_vector is None + and generators_listtuple is None + and (G is not None or isinstance(child, dimod.Structured)) + ): + if G is None: + G = nx.Graph() + G.add_nodes_from(child.nodelist) + G.add_edges_from(child.edgelist) + if self.idx_to_node is None: + self.idx_to_node = {idx: n for idx, n in enumerate(G.nodes)} + else: + if set(G.nodes()) != set(self.idx_to_node.values()): + raise ValueError( + "idx_to_node values are incompatible with G node labels" + ) + if set(range(G.number_of_nodes())) != set(self.idx_to_node.keys()): + raise ValueError("idx_to_node should have contiguous integer keys") + result = schreier_rep( + nx.relabel_nodes(G, {n: idx for idx, n in self.idx_to_node.items()}), + num_samples=G.number_of_nodes(), + ) + self.generators_u_vector = result.u_vector + elif generators_u_vector is not None: + if idx_to_node is None: + raise ValueError( + "idx_to_node must be specified in combination with generators_u_vector" + ) + else: + # Could check contiguous keys and match of values to structured sampler, user errors + pass + else: + pass # Unconstrained (permutations) are assumed + + @property + def children(self) -> list[dimod.core.Sampler]: + try: + return self._children + except AttributeError: + pass + + self._children = children = [self._child] + return children + + @property + def parameters(self) -> dict[str, Sequence[str]]: + try: + return self._parameters + except 
AttributeError: + pass + + self._parameters = parameters = dict(automorphism_variables=tuple()) + parameters.update(self._child.parameters) + return parameters + + @property + def properties(self) -> dict[str, Any]: + try: + return self._properties + except AttributeError: + pass + + self._properties = dict(child_properties=self._child.properties) + return self._properties + + class _SampleSets: + def __init__(self, samplesets: list[dimod.SampleSet]): + self.samplesets = samplesets + + def done(self) -> bool: + return all(ss.done() for ss in self.samplesets) + + @staticmethod + def _reorder_variables( + sampleset: dimod.SampleSet, order: dimod.variables.Variables + ) -> dimod.SampleSet: + """Return a sampleset with the given variable order.""" + if sampleset.variables == order: + return sampleset + + # .index(...) is O(1) for dimod's Variables objects so this isn't too bad + sampleset_order = sampleset.variables + reorder = [sampleset_order.index(v) for v in order] + + return dimod.SampleSet.from_samples( + (sampleset.record.sample[:, reorder], order), + sort_labels=False, + vartype=sampleset.vartype, + info=sampleset.info, + **sampleset.data_vectors, + ) + + @dimod.decorators.nonblocking_sample_method + def sample( + self, + bqm: dimod.BinaryQuadraticModel, + *, + mappings: Optional[list[dict]] = None, + num_automorphisms: Optional[int] = None, + **kwargs, + ): + """Sample from the binary quadratic model. + + Args: + bqm: Binary quadratic model to be sampled from. + + mappings: + A list of mappings in the form of dictionaries. + Each dictionary defines a permutation over a + subset of variables. If mappings is provided and + length 0, then a sampleset is returned subject + to no automorphism. + + num_automorphisms: + When mappings is not given, specifies teh number of mappings to + apply (create). If mappings is provided, it + is inferred as :code:`len(mappings)`, otherwise it is defaulted + to 1. + A value of ``0`` will result in sampling of an unmapped problem. 
+ If mappings is None the mappings are generated randomly using the + `generators_listtuple` class variable. + + Returns: + A sample set. Note that for a sampler that returns ``num_reads`` samples, + the sample set will contain ``num_reads*num_automorphisms`` samples. + + Examples: + This example runs 10 automorphisms of a two variable QUBO problem. + + >>> from dimod import ExactSolver + >>> from dwave.preprocessing.composites import EmbeddingComposite + >>> base_sampler = ExactSolver() + >>> composed_sampler = EmbeddingComposite(base_sampler) + ... + >>> Q = {('a', 'a'): -1, ('b', 'b'): -1, ('a', 'b'): 2} + >>> response = composed_sampler.sample_qubo(Q, + ... num_automorphisms=10) + >>> len(response) + 40 + """ + sampler = self._child + + if num_automorphisms is None: + if mappings is not None: + num_automorphisms = len(mappings) + else: + num_automorphisms = 1 + + # No SRTs, so just pass the problem through + if not num_automorphisms or not bqm.num_variables: + sampleset = sampler.sample(bqm, **kwargs) + # yield twice because we're using the @nonblocking_sample_method + yield sampleset # this one signals done()-ness + yield sampleset # this is the one actually used by the user + return + + # Check or generate mappings. 
+ if mappings is not None: + # Given permutation + if len(mappings) != num_automorphisms: + raise ValueError( + "len(mappings) should match num_automorphisms when not None" + ) + elif self.generators_listtuple is not None: + # Generator compatible (uniform random) permutation on all variables + mapping = {v: v for v in bqm.variables} + mappings = [ + sample_automorphisms_listtuple( + self.generators_listtuple, prng=self.rng, mapping=mapping + ) + for _ in range(num_automorphisms) + ] + elif self.generators_u_vector is not None: + arrays = sample_automorphisms_u_vector( + self.generators_u_vector, num_samples=num_automorphisms + ) + mappings = [ + {self.idx_to_node[k]: self.idx_to_node[v] for k, v in enumerate(array)} + for array in arrays + ] + else: + # Random permutation (no generator constraint) on all variables + var_list = list(bqm.variables) + mappings = [] + for n in range(num_automorphisms): + self.rng.shuffle(var_list) + mappings.append({v1: v2 for v1, v2 in zip(bqm.variables, var_list)}) + + samplesets: list[dimod.SampleSet] = [] + for mapping in mappings: + _bqm = bqm.copy() # Costly, but assumed not to be a bottleneck + _bqm.relabel_variables(mapping) + samplesets.append(sampler.sample(_bqm, **kwargs)) + # Yield a view of the samplesets that reports done()-ness + yield self._SampleSets(samplesets) + + # Relabel variables on samplesets + for mapping, ss in zip(mappings, samplesets): + ss.relabel_variables({v: k for k, v in mapping.items()}) + + # Reorder the variables of all the returned samplesets to match our + # original BQM + samplesets = [self._reorder_variables(ss, bqm.variables) for ss in samplesets] + + if num_automorphisms == 1: + # If one sampleset, return full information + # (info returned in full) + yield samplesets[0] + else: + # finally combine all samplesets together + yield dimod.concatenate(samplesets) + + +if __name__ == "__main__": + from dwave_networkx import chimera_graph, zephyr_graph, pegasus_graph + from networkx import 
relabel_nodes + from dimod import ExactSolver + from itertools import product + + # Chimera_cell: + from dwave.system.testing import MockDWaveSampler + from dwave_networkx import ( + chimera_coordinates, + zephyr_coordinates, + pegasus_coordinates, + ) + + try: + import pynauty # Do tests if available, can hardcode values for tests. + + def get_permutations(Gembeddable, get_generators=True): + """From latqa""" + Gpn = pynauty.Graph(Gembeddable.number_of_nodes()) + node_index_dict = {n: i for i, n in enumerate(Gembeddable.nodes)} + for u, v in Gembeddable.edges: + Gpn.connect_vertex(node_index_dict[u], node_index_dict[v]) + # Gpn.connect_vertex(node_index_dict[v], node_index_dict[u]) + X = pynauty.autgrp(Gpn) + if get_generators: + return np.array(X[0]), { + n: i for i, n in node_index_dict.items() + } # automorphism generators + else: + return X[1] + + print("Ring") + for L in range(3, 6): + graph = nx.from_edgelist({(i, (i + 1) % L) for i in range(L)}) + g1 = get_permutations(graph) + print(g1) + aut_group_size = get_permutations(graph, False) + print("pynauty size: ", aut_group_size) + print("Clique") + for L in range(3, 6): + graph = nx.from_edgelist({(i, j) for i in range(L) for j in range(i)}) + g1 = get_permutations(graph) + print(g1) + aut_group_size = get_permutations(graph, False) + print("pynauty size: ", aut_group_size) + print("BiClique (L,L)") + for L in range(1, 6): + graph = nx.from_edgelist({(i, L + j) for i in range(L) for j in range(L)}) + g1 = get_permutations(graph) + print(L, g1) + aut_group_size = get_permutations(graph, False) + print("pynauty size: ", aut_group_size) + print() + print("Pegasus") + dnxg, my_g = pegasus_graph, pegasus_generators + for m in range(2, 5): + graph = dnxg(m=m) + g1 = get_permutations(graph) + g2 = my_g(m=m) + print(f"Strongest set m={m}", len(g1[0]), len(g2)) + rep = schreier_rep( + nx.relabel_nodes(graph, {n: idx for idx, n in enumerate(graph.nodes())}) + ) + print( + "Size of rep", + rep.num_automorphisms, + 
np.prod([d for _, d in g2]), + rep.num_automorphisms <= np.prod([d for _, d in g2]), + ) + aut_group_size = get_permutations(graph, False) + print("pynauty size: ", aut_group_size) + + print("Zephyr") + dnxg, my_g = zephyr_graph, zephyr_generators + for m in range(1, 4): + for t in range(1, 5): + graph = dnxg(m=m, t=t) + g1 = get_permutations(dnxg(m=m, t=t)) + g2 = my_g(m=m, t=t) + print(f"Strongest set m={m} t={t}", len(g1[0]), len(g2)) + rep = schreier_rep(graph) + print( + "Size of rep", + rep.num_automorphisms, + np.prod([d for _, d in g2]), + rep.num_automorphisms <= np.prod([d for _, d in g2]), + ) + aut_group_size = get_permutations(graph, False) + print("pynauty size: ", aut_group_size) + + print() + print("Chimera") + dnxg, my_g = chimera_graph, chimera_generators + for m in range(1, 4): + for n in range(1, 4): + for t in range(1, 7): + graph = dnxg(m=m, n=n, t=t) + g1 = get_permutations(graph) + g2 = my_g(m=m, n=n, t=t) + rep = schreier_rep(graph) + print(f"Strongest set m={m} n={n}, t={t}", len(g1[0]), len(g2)) + print( + "Size of rep (Sebastian)", + rep.num_automorphisms, + "Mine", + np.prod([d for _, d in g2]), + rep.num_automorphisms <= np.prod([d for _, d in g2]), + ) + aut_group_size = get_permutations(graph, False) + print("pynauty size: ", aut_group_size) + + if len(g1[0]) != len(g2): + print("Problem") + if len(g1[0]) < 10: + print(g1) + print(g2) + print() + + raise ValueError("DEBUG") + except: + raise ValueError("pynauty wanted") + # Test various defect-free dwave_networkx generators: + topology_type = "chimera" + # topology_type = "zephyr" + # topology_type = "pegasus" + if topology_type == "chimera": + topology_shape = [3, 2, 4] + make_graph = chimera_graph + graph_generators = chimera_generators + coord_transform = chimera_coordinates(*topology_shape).chimera_to_linear + shapes = product([1, 3], [1, 3]) + elif topology_type == "zephyr": + topology_shape = [3, 2] + make_graph = zephyr_graph + graph_generators = zephyr_generators + 
coord_transform = zephyr_coordinates(*topology_shape).zephyr_to_linear + shapes = product([1, 3], [1, 3]) + elif topology_type == "pegasus": + topology_shape = [2] + make_graph = pegasus_graph + graph_generators = pegasus_generators + coord_transform = pegasus_coordinates(*topology_shape).pegasus_to_linear + shapes = [[2], [3]] + else: + raise ValueError("Unknown topology") + base_sampler = MockDWaveSampler( + topology_type=topology_type, topology_shape=topology_shape + ) + G = base_sampler.to_networkx_graph() + # draw_chimera(G, with_labels=True) + # G2 = relabel_nodes(G, {n: chimera_coordinates(*topology_shape).linear_to_chimera(n) for n in G.nodes()}) + # draw_chimera(G2, with_labels=True) + # plt.show() + + Gedges = set(tuple(sorted(e)) for e in G.edges()) + Gnodes = set(G.nodes()) + generators = graph_generators(*topology_shape) + generators = [ + ({coord_transform(k): coord_transform(v) for k, v in g[0].items()}, g[1]) + for g in generators + ] + pruned_generators = [] + for g in generators: + new = prune_by_nodeset(g[0], nodeset=Gnodes) + if new: + pruned_generators.append((new, g[1])) + assert set(new.keys()).issubset(Gnodes) + composed_sampler = AutomorphismComposite( + base_sampler, generators_listtuple=pruned_generators + ) + + composed_sampler2 = AutomorphismComposite(base_sampler, G=G) + print("u_vector", composed_sampler2.generators_u_vector) + print("listtuple", generators) + node_to_idx = {v: k for k, v in composed_sampler2.idx_to_node.items()} + alt_uvectors = listtuple_to_arrays(generators, node_to_idx) + print("listtuple as u_vector", alt_uvectors) + print( + "SchreierSims", + len(composed_sampler2.generators_u_vector), + [len(v) for v in composed_sampler2.generators_u_vector], + ) + print("Mine", len(alt_uvectors), [len(v) for v in alt_uvectors]) + bqm = dimod.BinaryQuadraticModel("SPIN").from_ising({}, {e: -1 for e in G.edges}) + for cs in [composed_sampler, composed_sampler2]: + response = composed_sampler.sample(bqm, num_automorphisms=2) + 
print(response.first.sample) + + for shape in shapes: + if len(shape) == 2: + m, t = shape + graph_params = {"m": shape[0], "t": shape[1]} + else: + m = shape + graph_params = {"m": shape[0]} + G = make_graph(**graph_params, coordinates=True) + Gedges = set(tuple(sorted(e)) for e in G.edges()) + Gnodes = set(G.nodes()) + generators = graph_generators(**graph_params) + pruned_generators = [] + for g in generators: + new = prune_by_nodeset(g[0], nodeset=Gnodes) + if new: + pruned_generators.append((new, g[1])) + assert set(new.keys()).issubset(Gnodes) + + for g, ol in pruned_generators: + if set(g.keys()) != set(g.values()): + print(g.keys()) + print(g.values()) + assert set(g.keys()) == set(g.values()) + Gn = relabel_nodes(G, g) + Gnedges = set(tuple(sorted(e)) for e in G.edges()) + if set(Gn.nodes()) != Gnodes: + print(g, Gnodes) + print("Gnodes Gn.nodes difference", Gnodes.difference(set(Gn.nodes()))) + print("Gnodes, Gnedges difference", set(Gn.nodes()).difference(Gnedges)) + assert set(Gn.nodes()) == Gnodes + if Gnedges != Gedges: + print(sorted(set(tuple(sorted(e)) for e in Gedges))) + print(sorted(set(G.edges()))) + print(Gnedges.difference(Gedges)) + print(Gedges.difference(Gnedges)) + assert Gnedges == Gedges + # assert list(Gn.edges()) != list(G.edges()) + + random_perm = sample_automorphisms_listtuple(pruned_generators) + assert set(random_perm.keys()) == Gnodes + assert set(random_perm.values()) == Gnodes + Gn = relabel_nodes(G, random_perm) + if set(Gn.nodes()) != Gnodes: + print("Problem") + print(Gnodes.difference(set(Gn.nodes()))) + print(set(Gn.nodes()).difference(Gnedges)) + + assert set(Gn.nodes()) == Gnodes + Gnedges = set(tuple(sorted(e)) for e in G.edges()) + assert Gnedges == Gedges diff --git a/examples/automorphism_composite.py b/examples/automorphism_composite.py new file mode 100644 index 0000000..539802a --- /dev/null +++ b/examples/automorphism_composite.py @@ -0,0 +1,163 @@ +# Copyright 2025 D-Wave +# +# Licensed under the Apache 
License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +An example to show noise mitigation by use of automorphic averaging +""" + +import os +import argparse + +import pickle + +import networkx as nx +import numpy as np +import matplotlib.pyplot as plt + +from dwave.system import DWaveSampler, FixedEmbeddingComposite +from dwave.system.testing import MockDWaveSampler # For code testing +from dwave.preprocessing import SpinReversalTransformComposite +from minorminer.subgraph import find_subgraph + +from dwave.experimental.automorphism.automorphism_composite import AutomorphismComposite + + +def main( + solver=None, + use_cache: bool = True, + L: int = 4, + subgraph_timeout: int = 60, + srts: bool = True, +): + """An example to demonstrate noise mitigation by averaging on automorphic embeddings + + Examine forward-anneal statistics for a fully-frustrated square-lattice Ising model on a Torus. + The expected energy per edge is identical up to symmetry breaking + effects (noise) resulting from calibration and bath non-idealities. Averaging automorphisms + enhances the signal to noise ratio, resulting in improved statistics. 
+ + Args: + + """ + qpu = DWaveSampler(solver=solver, profile='defaults') + + J = { + ((x, y), ((x + uv[0]) % L, (y + uv[1]) % L)): -1 + 2 * (x % 2 and uv[0] == 0) + for x in range(L) + for y in range(L) + for uv in [(0, 1), (1, 0)] + } + emb = find_subgraph( + J.keys(), qpu.edgelist, as_embedding=True, timeout=subgraph_timeout + ) + if not emb: + raise ValueError( + "For the selected solver, an embedding was not found in 60 seconds. Modifying the solver_name." + "size of lattice L or timeout in subgraph search should resolve this issue" + ) + + sampler0 = FixedEmbeddingComposite(qpu, emb) + samplerA = AutomorphismComposite( + FixedEmbeddingComposite(qpu, emb), G=nx.from_edgelist(J.keys()) + ) + + samplers = {"Fixed Embedding": sampler0, "Automorphism average": samplerA} + if srts: + samplerSRT = FixedEmbeddingComposite(SpinReversalTransformComposite(qpu), emb) + samplerASRT = AutomorphismComposite(samplerSRT, G=nx.from_edgelist(J.keys())) + samplers.update( + {"SRT average": samplerSRT, "SRT and Auto. average": samplerASRT} + ) + num_progs = 10 + fn_cache = f"cache/{solver}_automorphism_composite.pkl" + if use_cache and os.path.isfile(fn_cache): + with open(fn_cache, "rb") as f: + signed_correlations = pickle.load(f) + else: + signed_correlations = {sn: [] for sn in samplers.keys()} + + print('Employing different averaging techniques, we can decrease the ' + 'observed variance in statistics, which are expected to be equal ' + 'in the absence of symmetry breaking effects in environmental noise ' + 'device control and calibration.') + + + print('Identical signed edge expectations are anticipated based on the; ' + 'programmed model, a fully frustrated square lattice on a torus. 
' + 'Specifically = -0.5 is anticipated if samples ' + 'are restricted to the ground state space.') + for sn, sampler in samplers.items(): + for num_prog in range(num_progs): + if len(signed_correlations[sn]) < num_prog: + ss = sampler.sample_ising( + {}, J, num_reads=1000 + ) # Could be asyncrhonized + samps = ss.record.sample + no = ss.record.num_occurrences + var_to_idx = {v: idx for idx, v in enumerate(ss.variables)} + signed_correlations[sn].append( + [ + Jij + * np.sum(no * samps[:, var_to_idx[i]] * samps[:, var_to_idx[j]]) + / np.sum(no) + for (i, j), Jij in J.items() + ] + ) + + if use_cache: + os.makedirs(os.path.dirname(fn_cache), exist_ok=True) + with open(fn_cache, "wb") as f: + pickle.dump(signed_correlations, f) + plt.plot( + np.sort(np.mean(signed_correlations[sn][:num_prog], axis=0)), + np.arange(len(J)) / len(J), + label=sn, + ) + plt.xlabel(r"Edge-wise expected energy, J_{ij} ") + plt.ylabel("Cumulative Distribution Function") + plt.legend() + plt.show() + + +if __name__ == "__main__": + + print( + "Averaging over Automorphisms, and/or Spin Reversal Transforms," + " can mitigate for symmetry breaking noise in statistics." 
+ ) + + parser = argparse.ArgumentParser(description="An AutomorphismComposte example") + parser.add_argument( + "--use_cache", + action="store_true", + help="Add this flag to save experimental data, and reload when available at matched parameters", + ) + parser.add_argument( + "--solver_name", + type=str, + help="Option to specify QPU solver, the default DWaveSampler when not specified", + default=None, + ) + parser.add_argument( + "--srts", + action="store_true", + help="Add this flag to invoke additional noise mitigation with the SpinReversalTransformComposite", + ) + + args = parser.parse_args() + + main( + use_cache=args.use_cache, + solver=args.solver_name, + srts=args.srts, + ) diff --git a/tests/test_automorphism_composite.py b/tests/test_automorphism_composite.py new file mode 100644 index 0000000..fa04e71 --- /dev/null +++ b/tests/test_automorphism_composite.py @@ -0,0 +1,507 @@ +# Copyright 2025 D-Wave +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+
+import unittest
+
+import dimod
+import numpy as np
+import networkx as nx
+
+from dwave.system.testing import MockDWaveSampler
+from dwave_networkx import chimera_graph, pegasus_graph, zephyr_graph
+
+from dwave.experimental.automorphism import *
+
+
+@dimod.testing.load_sampler_bqm_tests(AutomorphismComposite(dimod.ExactSolver()))
+class TestAutomorphismComposite(unittest.TestCase):
+    def test_instantiation(self):
+        for factory in [
+            dimod.ExactSolver,
+            dimod.RandomSampler,
+            dimod.SimulatedAnnealingSampler,
+            dimod.NullSampler,
+        ]:
+            sampler = AutomorphismComposite(factory())
+
+            dimod.testing.assert_sampler_api(sampler)
+            dimod.testing.assert_composite_api(sampler)
+
+    def test_NullSampler_composition(self):
+        # Check NullSampler() works, this was a reported bug.
+        sampler = AutomorphismComposite(dimod.NullSampler())
+        sampleset = sampler.sample_ising({"a": 1}, {}, num_automorphisms=1)
+
+        self.assertTrue(len(sampleset) == 0)
+        sampleset = sampler.sample_ising({"a": 1}, {}, num_automorphisms=2)
+
+        self.assertTrue(len(sampleset) == 0)
+
+    def test_empty_bqm_composition(self):
+        # Check that sampling an empty BQM works.
+
+        sampler = AutomorphismComposite(dimod.RandomSampler())
+        bqm = dimod.BinaryQuadraticModel("SPIN")
+        sampleset = sampler.sample(bqm, num_automorphisms=1)
+        self.assertEqual(len(sampleset.variables), 0)
+
+    def test_concatenation_stripping(self):
+        # Check samplesets are not stripped of information
+        # under default operation
+
+        # Simple sampler with an info field.
+ # When multiple samplesets needn't be recombined, this should + # be maintained + class RichSampler(dimod.NullSampler): + def sample_ising(self, *args, **kwargs): + ss = super().sample_ising(*args, **kwargs) + ss.info["hello"] = "world" + return ss + + sampler = AutomorphismComposite(RichSampler()) + + sampleset = sampler.sample_ising({0: 1}, {}) + self.assertTrue(hasattr(sampleset, "info")) + + def test_sampleset_size(self): + # Check num_reads and num_automorphisms combine + # for anticipated number of samples. + + sampler = AutomorphismComposite(dimod.RandomSampler()) + for num_automorphisms in [1, 2]: + for num_reads in [1, 3]: + with self.subTest(f"{num_reads} {num_automorphisms}"): + sampleset = sampler.sample_ising( + {0: 1}, + {}, + num_automorphisms=num_automorphisms, + num_reads=num_reads, + ) + self.assertTrue( + sum(sampleset.record.num_occurrences) + == num_reads * num_automorphisms + ) + + def test_empty(self): + # Check that empty BQMs are handled + sampler = AutomorphismComposite(dimod.ExactSolver()) + bqm = dimod.BinaryQuadraticModel({}, {}, 0.0, dimod.SPIN) + sampleset = sampler.sample(bqm, num_automorphisms=3) + + self.assertEqual(sampleset.record.sample.shape, (0, 0)) + self.assertIs(sampleset.vartype, bqm.vartype) + + def test_ground_state(self): + num_variables = 10 + + bqm = dimod.BQM({v: 1 for v in range(num_variables)}, {}, 0, "SPIN") + + class Sampler: + def sample(self, bqm): + sample = {v: +1 if bias < 0 else -1 for v, bias in bqm.linear.items()} + return dimod.SampleSet.from_samples_bqm(sample, bqm) + + sampler = AutomorphismComposite(Sampler()) + sampleset = sampler.sample(bqm, num_automorphisms=num_variables) + + self.assertTrue((sampleset.record.sample == -1).all()) + + def test_async(self): + class SampleSet: + is_done = False + + def done(self): + return self.is_done + + def relabel_variables(self, mapping, inplace=True): + # Non blocking. 
+                pass
+
+            @property
+            def record(self):
+                raise Exception("boom")
+
+            @property
+            def variables(self):
+                raise Exception("boom")
+
+        class Sampler:
+            def __init__(self):
+                self.count = 0
+
+            def sample(self, bqm):
+                self.count += 1
+                return SampleSet()
+
+        sampler = AutomorphismComposite(Sampler())
+
+        # No exception because nothing has been resolved
+        sampleset = sampler.sample_ising({"a": 1}, {})
+
+        # We can test whether it's done
+        self.assertFalse(sampleset.done())
+        SampleSet.is_done = True
+        self.assertTrue(sampleset.done())
+
+        # Resolving raises the exception
+        with self.assertRaisesRegex(Exception, "boom"):
+            sampleset.resolve()
+
+    def test_seed(self):
+        """Test mirrors SpinReversalTransform, spins should be reordered w.h.p."""
+        bqm = dimod.BQM(1000, "SPIN")
+
+        class Sampler:
+            def sample(self, bqm):
+                return dimod.SampleSet.from_samples_bqm(
+                    [(-1) ** i for i in range(bqm.num_variables)], bqm
+                )
+
+        ss1 = AutomorphismComposite(Sampler(), seed=42).sample(bqm)
+        ss2 = AutomorphismComposite(Sampler(), seed=42).sample(bqm)
+        ss3 = AutomorphismComposite(Sampler(), seed=35).sample(bqm)
+        self.assertTrue((ss1.record == ss2.record).all())
+        self.assertFalse((ss1.record == ss3.record).all())
+
+    def test_variable_order(self):
+
+        class AlternatingSampler:
+            """Return the same solution, but in alternating order."""
+
+            def __init__(self):
+                self.rng = np.random.default_rng(42)
+
+            def sample(self, bqm):
+
+                order = list(bqm.variables)
+                self.rng.shuffle(order)
+
+                sample = [
+                    [-1 if bqm.linear[v] >= 0 else +1 for v in order],
+                    [+1 if bqm.linear[v] >= 0 else -1 for v in order],
+                ]
+
+                return dimod.SampleSet.from_samples_bqm(
+                    (sample, order), bqm, sort_labels=False
+                )
+
+        sampler = AutomorphismComposite(AlternatingSampler(), seed=42)
+
+        bqm = dimod.BinaryQuadraticModel(
+            {"a": -1, "b": 1, "c": -1, "d": -1.4}, {"ab": -2}, 0, "SPIN"
+        )
+
+        sampleset = sampler.sample(bqm, num_automorphisms=40)
+
+        # there should only be two unique samples after automorphisms
+        
self.assertEqual(len(sampleset.aggregate()), 2) + + def test_variable_order2(self): + class GroundStateSampler: + @staticmethod + def sample(bqm): + return dimod.ExactSolver().sample(bqm).lowest() + + bqm = dimod.BinaryQuadraticModel( + {}, + {(1, 0): 0, (4, 3): 0, (2, 0): -1, (2, 1): 1, (5, 3): -1, (5, 4): 1}, + 0, + "SPIN", + ) + + sampler = AutomorphismComposite(GroundStateSampler(), seed=42) + + sampleset = sampler.sample(bqm, num_automorphisms=3) + + self.assertEqual(len(sampleset.aggregate()), 4) + + def test_variable_order3(self): + class GroundStateSampler: + @staticmethod + def sample(bqm): + return dimod.ExactSolver().sample(bqm).lowest() + + bqm = dimod.BinaryQuadraticModel({}, {(1, 0): 1, (2, 3): 1}, 0, "SPIN") + + sampler = AutomorphismComposite(GroundStateSampler(), seed=42) + + sampleset = sampler.sample(bqm, num_automorphisms=10) + + self.assertEqual(len(sampleset.aggregate()), 4) + + def test_propagation_of_info(self): + # NB: info is not propagated when num_automorphisms is + # greater than 1, as a best general aggregation method is not obvious. 
+ class InfoRichSampler: + @staticmethod + def sample(bqm): + ss = dimod.ExactSolver().sample(bqm).lowest() + ss.info["has_some"] = True + return ss + + sampler = AutomorphismComposite(InfoRichSampler()) + + bqm = dimod.BinaryQuadraticModel({0: 1}, {(0, 1): 1}, 0, "SPIN") + + sampleset = sampler.sample(bqm, num_automorphisms=1) + + self.assertTrue(hasattr(sampleset, "info")) + self.assertEqual(sampleset.info, {"has_some": True}) + + def test_mappings_argument(self): + # All 1 ground state + class Sampler: + def sample(self, bqm): + return dimod.SampleSet.from_samples_bqm( + [-(1**i) for i in range(bqm.num_variables)], bqm + ) + + num_var = 10 + bqm = dimod.BinaryQuadraticModel({i: -1 for i in range(num_var)}, {}, 0, "SPIN") + + sampler = Sampler() + ss = sampler.sample(bqm) + samples = ss.record.sample + sampler = AutomorphismComposite(sampler) + mappings = [{v: v for v in bqm.variables}] + ss = sampler.sample(bqm, mappings=mappings) + self.assertTrue( + np.all(ss.record.sample == samples), + "Neutral mappings leaves result unpermuted.", + ) + + ss = sampler.sample(bqm, mappings=[], num_automorphisms=0) + self.assertTrue( + np.all(ss.record.sample == samples), + "Empty mappings also allows pass through " + "(just like num_spin_reversals=0).", + ) + + mapping = {v: vr for v, vr in zip(bqm.variables, reversed(bqm.variables))} + mappings = [mapping] + ss = sampler.sample(bqm, mappings=mappings) + + self.assertTrue( + np.all(ss.record.sample == (-1) ** num_var * samples), + "Flip-all mappings inverts the order", + ) + + with self.subTest("srt shape"): + mappings3 = [mapping.copy(), mapping, mapping] + num_automorphisms = len(mappings3) + mappings3[0].update({k: k for k in (0, num_var - 1)}) + ss = sampler.sample(bqm, mappings=mappings3) + self.assertEqual( + np.sum(ss.record.num_occurrences), len(mappings3), "Apply 3 mappings" + ) + self.assertTrue( + np.all( + (-1) ** m[v] == s[idx] + for m, s in zip(mappings3, ss.record.sample) + for idx, v in enumerate(ss.variables) 
+ ) + ) + + with self.assertRaises(ValueError): + # Inconsistent arguments + ss = sampler.sample( + bqm, mappings=mappings, num_automorphisms=num_automorphisms + 1 + ) + + def test_inheritance_and_u_vector(self): + qpu = MockDWaveSampler(topology_type="chimera", topology_shape=[1, 2, 3]) + G = ( + nx.Graph() + ) # to_networkx_graph() does not preserve node order, and can't be used unfortunately. + G.add_nodes_from(qpu.nodelist) + G.add_edges_from(qpu.edgelist) + h = {i: 1 for i in qpu.nodelist} + J = {e: 1 for e in qpu.edgelist} + + sampler1 = AutomorphismComposite(qpu) + ss = sampler1.sample_ising(h, J, num_automorphisms=1, num_reads=3) + self.assertEqual(np.sum(ss.record.num_occurrences), 3) + unconstrained = dimod.RandomSampler() + sampler2 = AutomorphismComposite(unconstrained, G=G) + self.assertTrue( + all( + np.all(npa1 == npa2) + for l1, l2 in zip( + sampler1.generators_u_vector, sampler2.generators_u_vector + ) + for npa1, npa2 in zip(l1, l2) + ) + ) + ss = sampler2.sample_ising(h, J, num_automorphisms=3, num_reads=1) + self.assertEqual(np.sum(ss.record.num_occurrences), 3) + idx_to_node = {idx: n for idx, n in enumerate(qpu.nodelist)} + sampler3 = AutomorphismComposite( + unconstrained, + generators_u_vector=sampler2.generators_u_vector, + idx_to_node=idx_to_node, + ) + + self.assertTrue( + all( + np.all(npa1 == npa2) + for l1, l2 in zip( + sampler3.generators_u_vector, sampler2.generators_u_vector + ) + for npa1, npa2 in zip(l1, l2) + ) + ) + + ss = sampler3.sample_ising(h, J, num_automorphisms=2, num_reads=2) + self.assertEqual(np.sum(ss.record.num_occurrences), 4) + + def test_nontrivial_sampling(self): + class Sampler: + # First 2 qubits are (+1), other qubits are (-1) + def sample(self, bqm, num_reads=1): + return dimod.SampleSet.from_samples_bqm( + np.tile(np.arange(bqm.num_variables), (num_reads, 1)), bqm + ) + + m, n, t = [1, 2, 3] # Chimera shape + nodelist = list( + range(1, m * n * t * 2) + ) # Missing first (vertical) node labeled 0 + qpu = 
MockDWaveSampler(
+            topology_type="chimera", topology_shape=[m, n, t], nodelist=nodelist
+        )
+
+        bqm = dimod.BinaryQuadraticModel("SPIN").from_ising(
+            {n: (-1) ** n for n in qpu.nodelist}, {e: 0 for e in qpu.edgelist}
+        )
+        num_automorphisms = 10
+        generators_listtuple = [
+            ({1: 2, 2: 1}, 2),
+            ({2 * t + i: 2 * t + (i % t) for i in range(t)}, t),
+        ]  # Swap of yielded verticals on first Chimera cell, and rotation of verticals on last Chimera cell -- NOTE(review): for i in range(t), ``i % t`` is the identity; a true rotation would be ``(i + 1) % t``. Confirm intent.
+        for sampler in [
+            AutomorphismComposite(qpu),
+            AutomorphismComposite(qpu, generators_listtuple=generators_listtuple),
+        ]:
+            # Uniformly sample all Chimera automorphisms with Schreier-Sims (default)
+            # Every qubit should move with high probability
+            ss = sampler.sample(bqm, num_reads=1, num_automorphisms=num_automorphisms)
+            self.assertEqual(
+                sum(ss.record.sample @ np.array([(-1) ** v for v in ss.variables])),
+                (-bqm.num_variables * num_automorphisms),
+                "All automorphisms should yield ground states -sign(h)",
+            )
+
+    def test_listtuples_to_arrays(self):
+        # Commuting subset of automorphisms on a Chimera[1,2,3]
+        m = 1
+        n = 2
+        t = 3
+        generators_listtuple = [
+            ({1: 2, 2: 1}, 2),
+            ({2 * t + i: 2 * t + (i % t) for i in range(t)}, t),
+        ]
+        node_to_idx = {n: idx for idx, n in enumerate(range(1, m * n * t * 2))}
+        generators_u_vector = listtuple_to_arrays(generators_listtuple, node_to_idx)
+        self.assertEqual(len(generators_listtuple), len(generators_u_vector))
+        res = [
+            [np.array([1, 0, 2, 3, 4, 5, 6, 7, 8, 9, 10])],
+            [
+                np.array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]),
+                np.array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]),
+            ],
+        ]
+        self.assertTrue(
+            all(
+                np.all(g1 == g2)
+                for l1, l2 in zip(res, generators_u_vector)
+                for g1, g2 in zip(l1, l2)
+            )
+        )
+
+    def test_sample_automorphisms_listtuple(self):
+        sigma = 3
+        nodes = ["a", "b"]
+        generator = {"a": "b", "b": "a"}
+        generators_listtuple = [(generator, 2)]
+        # significance test on Bernoulli, 4 sigma:
+        mapping = None
+        num_automorphisms = 100
+        
total_not_identity = 0 + for seed in range(num_automorphisms): + # seed = None + mapping = sample_automorphisms_listtuple( + generators_listtuple=generators_listtuple, prng=seed, mapping=mapping + ) + total_not_identity += int(mapping == generator) # 1 of two options. + self.assertLess( + total_not_identity, + num_automorphisms / 2 + sigma * np.sqrt(num_automorphisms / 4), + "The seeds are set so this test will not fail for convenience of automated testing." + "Seed randomization also passes with high probability per commented line.", + ) + self.assertGreater( + total_not_identity, + num_automorphisms / 2 - sigma * np.sqrt(num_automorphisms / 4), + "The seeds are set so this test will not fail for convenience of automated testing." + "Seed randomization also passes with high probability per commented line.", + ) + + pass + + def test_dnx_graph_generators(self): + dnx_shapes = { + "chimera": [(1, 1, 1), (2, 2, 4), (3, 1, 3)], + "zephyr": [(1, 1), (2, 3)], + "pegasus": [(2,), (3,), (4,)], + } + for dnx_type, shapes in dnx_shapes.items(): + if dnx_type == "chimera": + make_graph = chimera_graph + make_generators = chimera_generators + elif dnx_type == "pegasus": + make_graph = pegasus_graph + make_generators = pegasus_generators + elif dnx_type == "zephyr": + make_graph = zephyr_graph + make_generators = zephyr_generators + else: + raise ValueError("Unknown dnx graph type") + for shape in shapes: + G = make_graph(*shape) + rep = schreier_rep(G) + generators_listtuple = make_generators( + *shape, generator_type="redundant" + ) + for g, d in generators_listtuple: + self.assertEqual(type(d), int) + self.assertGreater( + d, 1 + ) # Can be as large as 't' for shore permutations. 
+ self.assertSetEqual( + set(g.keys()), + set(g.values()), + "Every automorphism defines a 1:1 mapping on a subspace", + ) + gen_sample_space = np.prod([d for _, d in generators_listtuple]) + self.assertEqual( + rep.num_automorphisms, + gen_sample_space, + "Uniform sampling anticipated", + ) + generators_listtuple2 = make_generators( + *shape, generator_type="strongest" + ) + self.assertLessEqual( + len(generators_listtuple2), len(generators_listtuple) + ) From b104524d65c810ab1988652dcfb0c929c4a8624b Mon Sep 17 00:00:00 2001 From: Jack Raymond <10591246+jackraymond@users.noreply.github.com> Date: Tue, 16 Dec 2025 14:07:31 -0800 Subject: [PATCH 2/3] Delete __main__ which is now covered by tests and the example --- .../automorphism/automorphism_composite.py | 243 +----------------- 1 file changed, 1 insertion(+), 242 deletions(-) diff --git a/dwave/experimental/automorphism/automorphism_composite.py b/dwave/experimental/automorphism/automorphism_composite.py index 34cdb23..59feab5 100644 --- a/dwave/experimental/automorphism/automorphism_composite.py +++ b/dwave/experimental/automorphism/automorphism_composite.py @@ -407,7 +407,7 @@ def sample_automorphisms_listtuple(generators_listtuple, *, prng=None, mapping=N degree e.g. {2: 4, 4: 2}, degree 2 Args: - generators_listtuple: A list of generator, generator degree tuples, to be + generators_listtuple: A list of generator, generator degree tuples, to be sampled sequentially. prng: A numpy pseudo random number generator, or seed used to initialize the PRNG. 
@@ -783,244 +783,3 @@ def sample( else: # finally combine all samplesets together yield dimod.concatenate(samplesets) - - -if __name__ == "__main__": - from dwave_networkx import chimera_graph, zephyr_graph, pegasus_graph - from networkx import relabel_nodes - from dimod import ExactSolver - from itertools import product - - # Chimera_cell: - from dwave.system.testing import MockDWaveSampler - from dwave_networkx import ( - chimera_coordinates, - zephyr_coordinates, - pegasus_coordinates, - ) - - try: - import pynauty # Do tests if available, can hardcode values for tests. - - def get_permutations(Gembeddable, get_generators=True): - """From latqa""" - Gpn = pynauty.Graph(Gembeddable.number_of_nodes()) - node_index_dict = {n: i for i, n in enumerate(Gembeddable.nodes)} - for u, v in Gembeddable.edges: - Gpn.connect_vertex(node_index_dict[u], node_index_dict[v]) - # Gpn.connect_vertex(node_index_dict[v], node_index_dict[u]) - X = pynauty.autgrp(Gpn) - if get_generators: - return np.array(X[0]), { - n: i for i, n in node_index_dict.items() - } # automorphism generators - else: - return X[1] - - print("Ring") - for L in range(3, 6): - graph = nx.from_edgelist({(i, (i + 1) % L) for i in range(L)}) - g1 = get_permutations(graph) - print(g1) - aut_group_size = get_permutations(graph, False) - print("pynauty size: ", aut_group_size) - print("Clique") - for L in range(3, 6): - graph = nx.from_edgelist({(i, j) for i in range(L) for j in range(i)}) - g1 = get_permutations(graph) - print(g1) - aut_group_size = get_permutations(graph, False) - print("pynauty size: ", aut_group_size) - print("BiClique (L,L)") - for L in range(1, 6): - graph = nx.from_edgelist({(i, L + j) for i in range(L) for j in range(L)}) - g1 = get_permutations(graph) - print(L, g1) - aut_group_size = get_permutations(graph, False) - print("pynauty size: ", aut_group_size) - print() - print("Pegasus") - dnxg, my_g = pegasus_graph, pegasus_generators - for m in range(2, 5): - graph = dnxg(m=m) - g1 = 
get_permutations(graph) - g2 = my_g(m=m) - print(f"Strongest set m={m}", len(g1[0]), len(g2)) - rep = schreier_rep( - nx.relabel_nodes(graph, {n: idx for idx, n in enumerate(graph.nodes())}) - ) - print( - "Size of rep", - rep.num_automorphisms, - np.prod([d for _, d in g2]), - rep.num_automorphisms <= np.prod([d for _, d in g2]), - ) - aut_group_size = get_permutations(graph, False) - print("pynauty size: ", aut_group_size) - - print("Zephyr") - dnxg, my_g = zephyr_graph, zephyr_generators - for m in range(1, 4): - for t in range(1, 5): - graph = dnxg(m=m, t=t) - g1 = get_permutations(dnxg(m=m, t=t)) - g2 = my_g(m=m, t=t) - print(f"Strongest set m={m} t={t}", len(g1[0]), len(g2)) - rep = schreier_rep(graph) - print( - "Size of rep", - rep.num_automorphisms, - np.prod([d for _, d in g2]), - rep.num_automorphisms <= np.prod([d for _, d in g2]), - ) - aut_group_size = get_permutations(graph, False) - print("pynauty size: ", aut_group_size) - - print() - print("Chimera") - dnxg, my_g = chimera_graph, chimera_generators - for m in range(1, 4): - for n in range(1, 4): - for t in range(1, 7): - graph = dnxg(m=m, n=n, t=t) - g1 = get_permutations(graph) - g2 = my_g(m=m, n=n, t=t) - rep = schreier_rep(graph) - print(f"Strongest set m={m} n={n}, t={t}", len(g1[0]), len(g2)) - print( - "Size of rep (Sebastian)", - rep.num_automorphisms, - "Mine", - np.prod([d for _, d in g2]), - rep.num_automorphisms <= np.prod([d for _, d in g2]), - ) - aut_group_size = get_permutations(graph, False) - print("pynauty size: ", aut_group_size) - - if len(g1[0]) != len(g2): - print("Problem") - if len(g1[0]) < 10: - print(g1) - print(g2) - print() - - raise ValueError("DEBUG") - except: - raise ValueError("pynauty wanted") - # Test various defect-free dwave_networkx generators: - topology_type = "chimera" - # topology_type = "zephyr" - # topology_type = "pegasus" - if topology_type == "chimera": - topology_shape = [3, 2, 4] - make_graph = chimera_graph - graph_generators = chimera_generators - 
coord_transform = chimera_coordinates(*topology_shape).chimera_to_linear - shapes = product([1, 3], [1, 3]) - elif topology_type == "zephyr": - topology_shape = [3, 2] - make_graph = zephyr_graph - graph_generators = zephyr_generators - coord_transform = zephyr_coordinates(*topology_shape).zephyr_to_linear - shapes = product([1, 3], [1, 3]) - elif topology_type == "pegasus": - topology_shape = [2] - make_graph = pegasus_graph - graph_generators = pegasus_generators - coord_transform = pegasus_coordinates(*topology_shape).pegasus_to_linear - shapes = [[2], [3]] - else: - raise ValueError("Unknown topology") - base_sampler = MockDWaveSampler( - topology_type=topology_type, topology_shape=topology_shape - ) - G = base_sampler.to_networkx_graph() - # draw_chimera(G, with_labels=True) - # G2 = relabel_nodes(G, {n: chimera_coordinates(*topology_shape).linear_to_chimera(n) for n in G.nodes()}) - # draw_chimera(G2, with_labels=True) - # plt.show() - - Gedges = set(tuple(sorted(e)) for e in G.edges()) - Gnodes = set(G.nodes()) - generators = graph_generators(*topology_shape) - generators = [ - ({coord_transform(k): coord_transform(v) for k, v in g[0].items()}, g[1]) - for g in generators - ] - pruned_generators = [] - for g in generators: - new = prune_by_nodeset(g[0], nodeset=Gnodes) - if new: - pruned_generators.append((new, g[1])) - assert set(new.keys()).issubset(Gnodes) - composed_sampler = AutomorphismComposite( - base_sampler, generators_listtuple=pruned_generators - ) - - composed_sampler2 = AutomorphismComposite(base_sampler, G=G) - print("u_vector", composed_sampler2.generators_u_vector) - print("listtuple", generators) - node_to_idx = {v: k for k, v in composed_sampler2.idx_to_node.items()} - alt_uvectors = listtuple_to_arrays(generators, node_to_idx) - print("listtuple as u_vector", alt_uvectors) - print( - "SchreierSims", - len(composed_sampler2.generators_u_vector), - [len(v) for v in composed_sampler2.generators_u_vector], - ) - print("Mine", 
len(alt_uvectors), [len(v) for v in alt_uvectors]) - bqm = dimod.BinaryQuadraticModel("SPIN").from_ising({}, {e: -1 for e in G.edges}) - for cs in [composed_sampler, composed_sampler2]: - response = composed_sampler.sample(bqm, num_automorphisms=2) - print(response.first.sample) - - for shape in shapes: - if len(shape) == 2: - m, t = shape - graph_params = {"m": shape[0], "t": shape[1]} - else: - m = shape - graph_params = {"m": shape[0]} - G = make_graph(**graph_params, coordinates=True) - Gedges = set(tuple(sorted(e)) for e in G.edges()) - Gnodes = set(G.nodes()) - generators = graph_generators(**graph_params) - pruned_generators = [] - for g in generators: - new = prune_by_nodeset(g[0], nodeset=Gnodes) - if new: - pruned_generators.append((new, g[1])) - assert set(new.keys()).issubset(Gnodes) - - for g, ol in pruned_generators: - if set(g.keys()) != set(g.values()): - print(g.keys()) - print(g.values()) - assert set(g.keys()) == set(g.values()) - Gn = relabel_nodes(G, g) - Gnedges = set(tuple(sorted(e)) for e in G.edges()) - if set(Gn.nodes()) != Gnodes: - print(g, Gnodes) - print("Gnodes Gn.nodes difference", Gnodes.difference(set(Gn.nodes()))) - print("Gnodes, Gnedges difference", set(Gn.nodes()).difference(Gnedges)) - assert set(Gn.nodes()) == Gnodes - if Gnedges != Gedges: - print(sorted(set(tuple(sorted(e)) for e in Gedges))) - print(sorted(set(G.edges()))) - print(Gnedges.difference(Gedges)) - print(Gedges.difference(Gnedges)) - assert Gnedges == Gedges - # assert list(Gn.edges()) != list(G.edges()) - - random_perm = sample_automorphisms_listtuple(pruned_generators) - assert set(random_perm.keys()) == Gnodes - assert set(random_perm.values()) == Gnodes - Gn = relabel_nodes(G, random_perm) - if set(Gn.nodes()) != Gnodes: - print("Problem") - print(Gnodes.difference(set(Gn.nodes()))) - print(set(Gn.nodes()).difference(Gnedges)) - - assert set(Gn.nodes()) == Gnodes - Gnedges = set(tuple(sorted(e)) for e in G.edges()) - assert Gnedges == Gedges From 
086c7e7ca20dd2a30ebfb5374542da8ba03603e6 Mon Sep 17 00:00:00 2001 From: Jack Raymond <10591246+jackraymond@users.noreply.github.com> Date: Tue, 23 Dec 2025 13:47:47 -0800 Subject: [PATCH 3/3] Apply suggestions from code review Co-authored-by: SebastianGitt <41522417+SebastianGitt@users.noreply.github.com> --- dwave/experimental/automorphism/automorphism_composite.py | 7 +++---- examples/automorphism_composite.py | 1 - 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/dwave/experimental/automorphism/automorphism_composite.py b/dwave/experimental/automorphism/automorphism_composite.py index 59feab5..c7a28f8 100644 --- a/dwave/experimental/automorphism/automorphism_composite.py +++ b/dwave/experimental/automorphism/automorphism_composite.py @@ -14,7 +14,7 @@ """ This module contains an automorphism composite that can make use of -either the Schreier Sims representation of the automorphism_generation.py +either the Schreier-Sims representation of the automorphism_generation.py module, or a (generator, degree) format where the generator is a dictionary. This generator should later be moved to dwave.preprocessing.composites to work alongside SpinReversalTransformComposite - since it serves a closely @@ -400,7 +400,7 @@ def sample_automorphisms_listtuple(generators_listtuple, *, prng=None, mapping=N Generators for Chimera, Pegasus and Zephyr graphs are not abelian, but can be ordered to allow for similar fair sampling. See the respective generator methods of this module. - For more general cases it is recommended to fairly sample by the Screier-Sims + For more general cases it is recommended to fairly sample by the Schreier-Sims method of automorphism_generation.py module. 
Generators are specified as dictionaries (1:1 mappings), with some known @@ -592,7 +592,6 @@ def __init__( raise ValueError("idx_to_node should have contiguous integer keys") result = schreier_rep( nx.relabel_nodes(G, {n: idx for idx, n in self.idx_to_node.items()}), - num_samples=G.number_of_nodes(), ) self.generators_u_vector = result.u_vector elif generators_u_vector is not None: @@ -686,7 +685,7 @@ def sample( to no automorphism. num_automorphisms: - When mappings is not given, specifies teh number of mappings to + When mappings is not given, specifies the number of mappings to apply (create). If mappings is provided, it is inferred as :code:`len(mappings)`, otherwise it is defaulted to 1. diff --git a/examples/automorphism_composite.py b/examples/automorphism_composite.py index 539802a..3913cc1 100644 --- a/examples/automorphism_composite.py +++ b/examples/automorphism_composite.py @@ -17,7 +17,6 @@ import os import argparse - import pickle import networkx as nx