123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641 |
- """
- Unit tests for dedensification and graph summarization
- """
- import pytest
- import networkx as nx
class TestDirectedDedensification:
    """Exercise ``nx.dedensify`` on a small directed graph.

    The class carries a hand-written expected compressed graph and a local
    ``densify`` helper that undoes the compression, so both directions can
    be compared against known-good fixtures.
    """

    def build_original_graph(self):
        """Return the uncompressed directed fixture graph.

        Each entry pairs a source node with an iterable of targets.  A plain
        string such as ``"ABC"`` is iterated character by character, so it
        stands for the three targets ``"A"``, ``"B"`` and ``"C"``.
        """
        adjacency = [
            ("1", "BC"),
            ("2", "ABC"),
            ("3", ["A", "B", "6"]),
            ("4", "ABC"),
            ("5", "AB"),
            ("6", ["5"]),
            ("A", ["6"]),
        ]
        graph = nx.DiGraph()
        graph.add_edges_from(
            (source, target) for source, targets in adjacency for target in targets
        )
        return graph

    def build_compressed_graph(self):
        """Return the expected dedensified form of the directed fixture.

        ``["ABC"]`` (a one-element list) denotes an edge to the single
        compressor node ``"ABC"``, whereas the bare string ``"ABC"`` expands
        to the three separate targets ``"A"``, ``"B"`` and ``"C"``.
        """
        adjacency = [
            ("1", "BC"),
            ("2", ["ABC"]),
            ("3", ["A", "B", "6"]),
            ("4", ["ABC"]),
            ("5", "AB"),
            ("6", ["5"]),
            ("A", ["6"]),
            ("ABC", "ABC"),
        ]
        expected = nx.DiGraph()
        expected.add_edges_from(
            (source, target) for source, targets in adjacency for target in targets
        )
        return expected

    def test_empty(self):
        """
        An empty directed graph must yield an empty set of compressor nodes
        """
        empty = nx.DiGraph()
        _, compressors = nx.dedensify(empty, threshold=2)
        assert compressors == set()

    @staticmethod
    def densify(G, compressor_nodes, copy=True):
        """
        Reconstructs the original graph from a dedensified, directed graph

        Parameters
        ----------
        G: dedensified graph
            A networkx graph
        compressor_nodes: iterable
            Iterable of compressor nodes in the dedensified graph
        copy: bool, optional (default: True)
            When true the work is done on ``G.copy()`` and ``G`` is left
            untouched; when false ``G`` itself is modified

        Returns
        -------
        G: graph
            A densified networkx graph
        """
        graph = G.copy() if copy else G
        for compressor in compressor_nodes:
            successors = set(graph.neighbors(compressor))
            # Predecessors are taken as "every neighbor that is not a successor".
            predecessors = set(nx.all_neighbors(graph, compressor)) - successors
            for target in successors:
                graph.remove_edge(compressor, target)
            for source in predecessors:
                graph.remove_edge(source, compressor)
                # Restore the direct edges that the compressor node stood in for.
                for target in successors:
                    graph.add_edge(source, target)
            graph.remove_node(compressor)
        return graph

    def setup_method(self):
        """Record the compressor nodes expected for the directed fixture."""
        self.c_nodes = ("ABC",)

    def test_dedensify_edges(self):
        """
        Every edge produced by dedensify must exist in the expected compressed
        graph, and the number of compressor nodes must match
        """
        original = self.build_original_graph()
        expected = self.build_compressed_graph()
        actual, compressors = nx.dedensify(original, threshold=2)
        for source, target in actual.edges():
            # Sort the characters of each label so a generated compressor name
            # can be looked up under the canonical spelling used by the
            # expected graph (presumably the member order is not fixed).
            canonical_source = "".join(sorted(source))
            canonical_target = "".join(sorted(target))
            assert expected.has_edge(canonical_source, canonical_target)
        assert len(compressors) == len(self.c_nodes)

    def test_dedensify_edge_count(self):
        """
        Dedensify must not increase the edge count of the directed graph and
        must produce exactly as many edges as the expected compressed graph
        """
        original = self.build_original_graph()
        edges_before = original.number_of_edges()
        actual, _ = nx.dedensify(original, threshold=2)
        edges_after = actual.number_of_edges()
        assert edges_after <= edges_before
        expected = self.build_compressed_graph()
        assert edges_after == expected.number_of_edges()

    def test_densify_edges(self):
        """
        Densifying the expected compressed graph must restore every edge of
        the original directed graph
        """
        compressed = self.build_compressed_graph()
        rebuilt = self.densify(compressed, self.c_nodes, copy=True)
        original = self.build_original_graph()
        for source, target in original.edges():
            assert rebuilt.has_edge(source, target)

    def test_densify_edge_count(self):
        """
        Densifying must not reduce the edge count and must yield exactly as
        many edges as the original directed graph
        """
        compressed = self.build_compressed_graph()
        edges_compressed = compressed.number_of_edges()
        rebuilt = self.densify(compressed, self.c_nodes)
        edges_rebuilt = rebuilt.number_of_edges()
        assert edges_compressed <= edges_rebuilt
        original = self.build_original_graph()
        assert edges_rebuilt == original.number_of_edges()
class TestUnDirectedDedensification:
    """Exercise ``nx.dedensify`` on the undirected variant of the fixture graph."""

    def build_original_graph(self):
        """
        Builds graph shown in the original research paper

        String targets such as ``"ABC"`` are iterated per character, so they
        expand to one edge per letter.
        """
        adjacency = [
            ("1", "CB"),
            ("2", "ABC"),
            ("3", ["A", "B", "6"]),
            ("4", "ABC"),
            ("5", "AB"),
            ("6", ["5"]),
            ("A", ["6"]),
        ]
        graph = nx.Graph()
        graph.add_edges_from(
            (source, target) for source, targets in adjacency for target in targets
        )
        return graph

    def test_empty(self):
        """
        An empty undirected graph must yield an empty set of compressor nodes
        """
        empty = nx.Graph()
        _, compressors = nx.dedensify(empty, threshold=2)
        assert compressors == set()

    def setup_method(self):
        """Record the compressor nodes expected for the undirected fixture."""
        self.c_nodes = ("6AB", "ABC")

    def build_compressed_graph(self):
        """Return the expected dedensified form of the undirected fixture.

        Every target here is given as a list element, so multi-character
        names such as ``"6AB"`` and ``"ABC"`` are single (compressor) nodes.
        """
        adjacency = [
            ("1", ["B", "C"]),
            ("2", ["ABC"]),
            ("3", ["6AB"]),
            ("4", ["ABC"]),
            ("5", ["6AB"]),
            ("6", ["6AB", "A"]),
            ("A", ["6AB", "ABC"]),
            ("B", ["ABC", "6AB"]),
            ("C", ["ABC"]),
        ]
        expected = nx.Graph()
        expected.add_edges_from(
            (source, target) for source, targets in adjacency for target in targets
        )
        return expected

    def test_dedensify_edges(self):
        """
        Every edge produced by dedensify on the undirected graph must exist in
        the expected compressed graph, and the number of compressor nodes
        must match
        """
        original = self.build_original_graph()
        actual, compressors = nx.dedensify(original, threshold=2)
        expected = self.build_compressed_graph()
        for source, target in actual.edges():
            # Sort the characters of each label so a generated compressor name
            # can be looked up under the canonical spelling used by the
            # expected graph (presumably the member order is not fixed).
            canonical_source = "".join(sorted(source))
            canonical_target = "".join(sorted(target))
            assert expected.has_edge(canonical_source, canonical_target)
        assert len(compressors) == len(self.c_nodes)

    def test_dedensify_edge_count(self):
        """
        Dedensify must not increase the edge count of the undirected graph and
        must produce exactly as many edges as the expected compressed graph
        """
        original = self.build_original_graph()
        actual, _ = nx.dedensify(original, threshold=2, copy=True)
        edges_after = actual.number_of_edges()
        edges_before = original.number_of_edges()
        assert edges_after <= edges_before
        expected = self.build_compressed_graph()
        assert edges_after == expected.number_of_edges()
@pytest.mark.parametrize(
    "graph_type", [nx.Graph, nx.DiGraph, nx.MultiGraph, nx.MultiDiGraph]
)
def test_summarization_empty(graph_type):
    """Summarizing an empty graph of any type gives a graph isomorphic to it."""
    empty = graph_type()
    summary = nx.snap_aggregation(empty, node_attributes=("color",))
    assert nx.is_isomorphic(summary, empty)
class AbstractSNAP:
    """Shared scaffolding for the SNAP aggregation test cases.

    Subclasses supply the input graph and the expected summary graph; the
    inherited ``test_summary_graph`` runs ``nx.snap_aggregation`` and checks
    that the result is isomorphic to the expectation.
    """

    # Node attributes handed to ``nx.snap_aggregation`` for grouping.
    node_attributes = ("color",)

    def build_original_graph(self):
        """Hook for subclasses: return the attributed graph to summarize."""

    def build_summary_graph(self):
        """Hook for subclasses: return the expected summary graph."""

    def test_summary_graph(self):
        """Summarize on ``node_attributes`` and the ``"type"`` edge attribute."""
        original_graph = self.build_original_graph()
        summary_graph = self.build_summary_graph()
        relationship_attributes = ("type",)
        generated_summary_graph = nx.snap_aggregation(
            original_graph, self.node_attributes, relationship_attributes
        )
        relabeled_summary_graph = self.deterministic_labels(generated_summary_graph)
        assert nx.is_isomorphic(summary_graph, relabeled_summary_graph)

    def deterministic_labels(self, G):
        """Return a copy of ``G`` relabeled ``Supernode-0``, ``Supernode-1``, ...

        Supernodes are numbered in ascending order of the smallest member of
        their ``"group"`` node attribute, so the numbering depends only on
        group membership and not on the labels ``G`` arrived with.

        Parameters
        ----------
        G: graph
            A summary graph whose nodes each carry a non-empty ``"group"``
            attribute

        Returns
        -------
        graph
            The relabeled graph produced by ``nx.relabel_nodes``
        """
        # The former extra ``node_labels.sort()`` re-sorted by label and threw
        # away this group-based order, so it has been dropped.
        ordered_nodes = sorted(G.nodes, key=lambda n: min(G.nodes[n]["group"]))
        label_mapping = {
            node: "Supernode-%s" % index for index, node in enumerate(ordered_nodes)
        }
        return nx.relabel_nodes(G, label_mapping)
class TestSNAPNoEdgeTypes(AbstractSNAP):
    """SNAP aggregation of an undirected graph whose edges carry no attributes."""

    # Kept for compatibility; no method visible in this class reads it.
    relationship_attributes = ()

    def test_summary_graph(self):
        """Summarize on node attributes only (no edge attributes are passed)."""
        original_graph = self.build_original_graph()
        summary_graph = self.build_summary_graph()
        generated_summary_graph = nx.snap_aggregation(
            original_graph, self.node_attributes
        )
        relabeled_summary_graph = self.deterministic_labels(generated_summary_graph)
        assert nx.is_isomorphic(summary_graph, relabeled_summary_graph)

    def build_original_graph(self):
        """Return the 12-node undirected input graph with a ``color`` per node."""
        nodes = {
            "A": {"color": "Red"},
            "B": {"color": "Red"},
            "C": {"color": "Red"},
            "D": {"color": "Red"},
            "E": {"color": "Blue"},
            "F": {"color": "Blue"},
            "G": {"color": "Blue"},
            "H": {"color": "Blue"},
            "I": {"color": "Yellow"},
            "J": {"color": "Yellow"},
            "K": {"color": "Yellow"},
            "L": {"color": "Yellow"},
        }
        edges = [
            ("A", "B"),
            ("A", "C"),
            ("A", "E"),
            ("A", "I"),
            ("B", "D"),
            ("B", "J"),
            ("B", "F"),
            ("C", "G"),
            ("D", "H"),
            ("I", "J"),
            ("J", "K"),
            ("I", "L"),
        ]
        G = nx.Graph()
        for node, attributes in nodes.items():
            G.add_node(node, **attributes)
        for source, target in edges:
            G.add_edge(source, target)
        return G

    def build_summary_graph(self):
        """Return the expected six-supernode summary, including ``group`` sets."""
        nodes = {
            "Supernode-0": {"color": "Red"},
            "Supernode-1": {"color": "Red"},
            "Supernode-2": {"color": "Blue"},
            "Supernode-3": {"color": "Blue"},
            "Supernode-4": {"color": "Yellow"},
            "Supernode-5": {"color": "Yellow"},
        }
        edges = [
            ("Supernode-0", "Supernode-0"),
            ("Supernode-0", "Supernode-1"),
            ("Supernode-0", "Supernode-2"),
            ("Supernode-0", "Supernode-4"),
            ("Supernode-1", "Supernode-3"),
            ("Supernode-4", "Supernode-4"),
            ("Supernode-4", "Supernode-5"),
        ]
        G = nx.Graph()
        for node, attributes in nodes.items():
            G.add_node(node, **attributes)
        for source, target in edges:
            G.add_edge(source, target)
        # Original nodes that each supernode stands for.
        supernodes = {
            "Supernode-0": {"A", "B"},
            "Supernode-1": {"C", "D"},
            "Supernode-2": {"E", "F"},
            "Supernode-3": {"G", "H"},
            "Supernode-4": {"I", "J"},
            "Supernode-5": {"K", "L"},
        }
        nx.set_node_attributes(G, supernodes, "group")
        return G
class TestSNAPUndirected(AbstractSNAP):
    """SNAP aggregation of an undirected graph with one ``type`` per edge."""

    def build_original_graph(self):
        """Return the 12-node undirected input graph with typed edges."""
        node_colors = {
            "A": {"color": "Red"},
            "B": {"color": "Red"},
            "C": {"color": "Red"},
            "D": {"color": "Red"},
            "E": {"color": "Blue"},
            "F": {"color": "Blue"},
            "G": {"color": "Blue"},
            "H": {"color": "Blue"},
            "I": {"color": "Yellow"},
            "J": {"color": "Yellow"},
            "K": {"color": "Yellow"},
            "L": {"color": "Yellow"},
        }
        typed_edges = [
            ("A", "B", "Strong"),
            ("A", "C", "Weak"),
            ("A", "E", "Strong"),
            ("A", "I", "Weak"),
            ("B", "D", "Weak"),
            ("B", "J", "Weak"),
            ("B", "F", "Strong"),
            ("C", "G", "Weak"),
            ("D", "H", "Weak"),
            ("I", "J", "Strong"),
            ("J", "K", "Strong"),
            ("I", "L", "Strong"),
        ]
        graph = nx.Graph()
        for name, attrs in node_colors.items():
            graph.add_node(name, **attrs)
        for u, v, edge_type in typed_edges:
            graph.add_edge(u, v, type=edge_type)
        return graph

    def build_summary_graph(self):
        """Return the expected summary graph.

        Each summary edge stores a ``types`` list holding one
        ``{"type": ...}`` dict.
        """
        supernode_colors = {
            "Supernode-0": {"color": "Red"},
            "Supernode-1": {"color": "Red"},
            "Supernode-2": {"color": "Blue"},
            "Supernode-3": {"color": "Blue"},
            "Supernode-4": {"color": "Yellow"},
            "Supernode-5": {"color": "Yellow"},
        }
        typed_edges = [
            ("Supernode-0", "Supernode-0", "Strong"),
            ("Supernode-0", "Supernode-1", "Weak"),
            ("Supernode-0", "Supernode-2", "Strong"),
            ("Supernode-0", "Supernode-4", "Weak"),
            ("Supernode-1", "Supernode-3", "Weak"),
            ("Supernode-4", "Supernode-4", "Strong"),
            ("Supernode-4", "Supernode-5", "Strong"),
        ]
        summary = nx.Graph()
        for name, attrs in supernode_colors.items():
            summary.add_node(name, **attrs)
        for u, v, edge_type in typed_edges:
            summary.add_edge(u, v, types=[{"type": edge_type}])
        # Original nodes that each supernode stands for.
        members = {
            "Supernode-0": {"A", "B"},
            "Supernode-1": {"C", "D"},
            "Supernode-2": {"E", "F"},
            "Supernode-3": {"G", "H"},
            "Supernode-4": {"I", "J"},
            "Supernode-5": {"K", "L"},
        }
        nx.set_node_attributes(summary, members, "group")
        return summary
class TestSNAPDirected(AbstractSNAP):
    """SNAP aggregation of a directed graph with one ``type`` per edge."""

    def build_original_graph(self):
        """Return the eight-node directed input graph with typed edges."""
        nodes = {
            "A": {"color": "Red"},
            "B": {"color": "Red"},
            "C": {"color": "Green"},
            "D": {"color": "Green"},
            "E": {"color": "Blue"},
            "F": {"color": "Blue"},
            "G": {"color": "Yellow"},
            "H": {"color": "Yellow"},
        }
        edges = [
            ("A", "C", "Strong"),
            ("A", "E", "Strong"),
            ("A", "F", "Weak"),
            ("B", "D", "Strong"),
            ("B", "E", "Weak"),
            ("B", "F", "Strong"),
            ("C", "G", "Strong"),
            ("C", "F", "Strong"),
            ("D", "E", "Strong"),
            ("D", "H", "Strong"),
            ("G", "E", "Strong"),
            ("H", "F", "Strong"),
        ]
        G = nx.DiGraph()
        for node, attributes in nodes.items():
            G.add_node(node, **attributes)
        for source, target, edge_type in edges:
            G.add_edge(source, target, type=edge_type)
        return G

    def build_summary_graph(self):
        """Return the expected four-supernode directed summary graph.

        Each summary edge stores a ``types`` list of ``{"type": ...}`` dicts.
        """
        nodes = {
            "Supernode-0": {"color": "Red"},
            "Supernode-1": {"color": "Green"},
            "Supernode-2": {"color": "Blue"},
            "Supernode-3": {"color": "Yellow"},
        }
        edges = [
            ("Supernode-0", "Supernode-1", [{"type": "Strong"}]),
            ("Supernode-0", "Supernode-2", [{"type": "Weak"}, {"type": "Strong"}]),
            ("Supernode-1", "Supernode-2", [{"type": "Strong"}]),
            ("Supernode-1", "Supernode-3", [{"type": "Strong"}]),
            ("Supernode-3", "Supernode-2", [{"type": "Strong"}]),
        ]
        G = nx.DiGraph()
        for node, attributes in nodes.items():
            G.add_node(node, **attributes)
        for source, target, types in edges:
            G.add_edge(source, target, types=types)
        # Original nodes that each supernode stands for.  The former entries
        # for "Supernode-4" and "Supernode-5" were dropped: this graph has no
        # such nodes and the input graph has no nodes I, J, K or L.
        supernodes = {
            "Supernode-0": {"A", "B"},
            "Supernode-1": {"C", "D"},
            "Supernode-2": {"E", "F"},
            "Supernode-3": {"G", "H"},
        }
        nx.set_node_attributes(G, supernodes, "group")
        return G
class TestSNAPUndirectedMulti(AbstractSNAP):
    """SNAP aggregation of an undirected multigraph with typed parallel edges."""

    def build_original_graph(self):
        """Return the nine-node input multigraph.

        Each edge entry lists the ``type`` values of the parallel edges to add
        between the two endpoints.
        """
        nodes = {
            "A": {"color": "Red"},
            "B": {"color": "Red"},
            "C": {"color": "Red"},
            "D": {"color": "Blue"},
            "E": {"color": "Blue"},
            "F": {"color": "Blue"},
            "G": {"color": "Yellow"},
            "H": {"color": "Yellow"},
            "I": {"color": "Yellow"},
        }
        edges = [
            ("A", "D", ["Weak", "Strong"]),
            ("B", "E", ["Weak", "Strong"]),
            ("D", "I", ["Strong"]),
            ("E", "H", ["Strong"]),
            ("F", "G", ["Weak"]),
            ("I", "G", ["Weak", "Strong"]),
            ("I", "H", ["Weak", "Strong"]),
            ("G", "H", ["Weak", "Strong"]),
        ]
        G = nx.MultiGraph()
        for node, attributes in nodes.items():
            G.add_node(node, **attributes)
        for source, target, types in edges:
            for edge_type in types:
                G.add_edge(source, target, type=edge_type)
        return G

    def build_summary_graph(self):
        """Return the expected six-supernode summary multigraph.

        Each edge entry lists ``{"type": ...}`` dicts; one parallel edge is
        added per entry, with that dict stored under the edge's ``type`` key.
        """
        nodes = {
            "Supernode-0": {"color": "Red"},
            "Supernode-1": {"color": "Blue"},
            "Supernode-2": {"color": "Yellow"},
            "Supernode-3": {"color": "Blue"},
            "Supernode-4": {"color": "Yellow"},
            "Supernode-5": {"color": "Red"},
        }
        edges = [
            ("Supernode-1", "Supernode-2", [{"type": "Weak"}]),
            ("Supernode-2", "Supernode-4", [{"type": "Weak"}, {"type": "Strong"}]),
            ("Supernode-3", "Supernode-4", [{"type": "Strong"}]),
            ("Supernode-3", "Supernode-5", [{"type": "Weak"}, {"type": "Strong"}]),
            ("Supernode-4", "Supernode-4", [{"type": "Weak"}, {"type": "Strong"}]),
        ]
        G = nx.MultiGraph()
        for node, attributes in nodes.items():
            G.add_node(node, **attributes)
        for source, target, types in edges:
            for edge_type in types:
                G.add_edge(source, target, type=edge_type)
        # Original nodes that each supernode stands for, matched to the colors
        # and edges declared above (e.g. the lone Weak Blue-Yellow edge is
        # F-G, and the isolated Red supernode is C).  The previous mapping was
        # copied from the 12-node fixtures and named nodes J, K and L, which
        # this input graph does not contain.
        supernodes = {
            "Supernode-0": {"C"},
            "Supernode-1": {"F"},
            "Supernode-2": {"G"},
            "Supernode-3": {"D", "E"},
            "Supernode-4": {"H", "I"},
            "Supernode-5": {"A", "B"},
        }
        nx.set_node_attributes(G, supernodes, "group")
        return G
class TestSNAPDirectedMulti(AbstractSNAP):
    """SNAP aggregation of a directed multigraph with typed parallel edges."""

    def build_original_graph(self):
        """Return the eight-node directed input multigraph.

        Each edge entry lists the ``type`` values of the parallel edges to add
        from source to target.
        """
        nodes = {
            "A": {"color": "Red"},
            "B": {"color": "Red"},
            "C": {"color": "Green"},
            "D": {"color": "Green"},
            "E": {"color": "Blue"},
            "F": {"color": "Blue"},
            "G": {"color": "Yellow"},
            "H": {"color": "Yellow"},
        }
        edges = [
            ("A", "C", ["Weak", "Strong"]),
            ("A", "E", ["Strong"]),
            ("A", "F", ["Weak"]),
            ("B", "D", ["Weak", "Strong"]),
            ("B", "E", ["Weak"]),
            ("B", "F", ["Strong"]),
            ("C", "G", ["Weak", "Strong"]),
            ("C", "F", ["Strong"]),
            ("D", "E", ["Strong"]),
            ("D", "H", ["Weak", "Strong"]),
            ("G", "E", ["Strong"]),
            ("H", "F", ["Strong"]),
        ]
        G = nx.MultiDiGraph()
        for node, attributes in nodes.items():
            G.add_node(node, **attributes)
        for source, target, types in edges:
            for edge_type in types:
                G.add_edge(source, target, type=edge_type)
        return G

    def build_summary_graph(self):
        """Return the expected four-supernode directed summary multigraph."""
        # Colors follow the members listed in ``supernodes`` below: A/B are
        # Red, C/D Green, E/F Blue and G/H Yellow in the input graph.  The
        # previous values (Red, Blue, Yellow, Blue) contradicted that mapping.
        nodes = {
            "Supernode-0": {"color": "Red"},
            "Supernode-1": {"color": "Green"},
            "Supernode-2": {"color": "Blue"},
            "Supernode-3": {"color": "Yellow"},
        }
        edges = [
            ("Supernode-0", "Supernode-1", ["Weak", "Strong"]),
            ("Supernode-0", "Supernode-2", ["Weak", "Strong"]),
            ("Supernode-1", "Supernode-2", ["Strong"]),
            ("Supernode-1", "Supernode-3", ["Weak", "Strong"]),
            ("Supernode-3", "Supernode-2", ["Strong"]),
        ]
        G = nx.MultiDiGraph()
        for node, attributes in nodes.items():
            G.add_node(node, **attributes)
        for source, target, types in edges:
            for edge_type in types:
                G.add_edge(source, target, type=edge_type)
        # Original nodes that each supernode stands for.
        supernodes = {
            "Supernode-0": {"A", "B"},
            "Supernode-1": {"C", "D"},
            "Supernode-2": {"E", "F"},
            "Supernode-3": {"G", "H"},
        }
        nx.set_node_attributes(G, supernodes, "group")
        return G
|