123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050 |
- """
- *******
- GraphML
- *******
- Read and write graphs in GraphML format.
- .. warning::
- This parser uses the standard xml library present in Python, which is
- insecure - see :external+python:mod:`xml` for additional information.
- Only parse GraphML files you trust.
- This implementation does not support mixed graphs (directed and unidirected
- edges together), hyperedges, nested graphs, or ports.
- "GraphML is a comprehensive and easy-to-use file format for graphs. It
- consists of a language core to describe the structural properties of a
- graph and a flexible extension mechanism to add application-specific
- data. Its main features include support of
- * directed, undirected, and mixed graphs,
- * hypergraphs,
- * hierarchical graphs,
- * graphical representations,
- * references to external data,
- * application-specific attribute data, and
- * light-weight parsers.
- Unlike many other file formats for graphs, GraphML does not use a
- custom syntax. Instead, it is based on XML and hence ideally suited as
- a common denominator for all kinds of services generating, archiving,
- or processing graphs."
- http://graphml.graphdrawing.org/
- Format
- ------
- GraphML is an XML format. See
- http://graphml.graphdrawing.org/specification.html for the specification and
- http://graphml.graphdrawing.org/primer/graphml-primer.html
- for examples.
- """
- import warnings
- from collections import defaultdict
- import networkx as nx
- from networkx.utils import open_file
- __all__ = [
- "write_graphml",
- "read_graphml",
- "generate_graphml",
- "write_graphml_xml",
- "write_graphml_lxml",
- "parse_graphml",
- "GraphMLWriter",
- "GraphMLReader",
- ]
- @open_file(1, mode="wb")
- def write_graphml_xml(
- G,
- path,
- encoding="utf-8",
- prettyprint=True,
- infer_numeric_types=False,
- named_key_ids=False,
- edge_id_from_attribute=None,
- ):
- """Write G in GraphML XML format to path
- Parameters
- ----------
- G : graph
- A networkx graph
- path : file or string
- File or filename to write.
- Filenames ending in .gz or .bz2 will be compressed.
- encoding : string (optional)
- Encoding for text data.
- prettyprint : bool (optional)
- If True use line breaks and indenting in output XML.
- infer_numeric_types : boolean
- Determine if numeric types should be generalized.
- For example, if edges have both int and float 'weight' attributes,
- we infer in GraphML that both are floats.
- named_key_ids : bool (optional)
- If True use attr.name as value for key elements' id attribute.
- edge_id_from_attribute : dict key (optional)
- If provided, the graphml edge id is set by looking up the corresponding
- edge data attribute keyed by this parameter. If `None` or the key does not exist in edge data,
- the edge id is set by the edge key if `G` is a MultiGraph, else the edge id is left unset.
- Examples
- --------
- >>> G = nx.path_graph(4)
- >>> nx.write_graphml(G, "test.graphml")
- Notes
- -----
- This implementation does not support mixed graphs (directed
- and unidirected edges together) hyperedges, nested graphs, or ports.
- """
- writer = GraphMLWriter(
- encoding=encoding,
- prettyprint=prettyprint,
- infer_numeric_types=infer_numeric_types,
- named_key_ids=named_key_ids,
- edge_id_from_attribute=edge_id_from_attribute,
- )
- writer.add_graph_element(G)
- writer.dump(path)
- @open_file(1, mode="wb")
- def write_graphml_lxml(
- G,
- path,
- encoding="utf-8",
- prettyprint=True,
- infer_numeric_types=False,
- named_key_ids=False,
- edge_id_from_attribute=None,
- ):
- """Write G in GraphML XML format to path
- This function uses the LXML framework and should be faster than
- the version using the xml library.
- Parameters
- ----------
- G : graph
- A networkx graph
- path : file or string
- File or filename to write.
- Filenames ending in .gz or .bz2 will be compressed.
- encoding : string (optional)
- Encoding for text data.
- prettyprint : bool (optional)
- If True use line breaks and indenting in output XML.
- infer_numeric_types : boolean
- Determine if numeric types should be generalized.
- For example, if edges have both int and float 'weight' attributes,
- we infer in GraphML that both are floats.
- named_key_ids : bool (optional)
- If True use attr.name as value for key elements' id attribute.
- edge_id_from_attribute : dict key (optional)
- If provided, the graphml edge id is set by looking up the corresponding
- edge data attribute keyed by this parameter. If `None` or the key does not exist in edge data,
- the edge id is set by the edge key if `G` is a MultiGraph, else the edge id is left unset.
- Examples
- --------
- >>> G = nx.path_graph(4)
- >>> nx.write_graphml_lxml(G, "fourpath.graphml")
- Notes
- -----
- This implementation does not support mixed graphs (directed
- and unidirected edges together) hyperedges, nested graphs, or ports.
- """
- try:
- import lxml.etree as lxmletree
- except ImportError:
- return write_graphml_xml(
- G,
- path,
- encoding,
- prettyprint,
- infer_numeric_types,
- named_key_ids,
- edge_id_from_attribute,
- )
- writer = GraphMLWriterLxml(
- path,
- graph=G,
- encoding=encoding,
- prettyprint=prettyprint,
- infer_numeric_types=infer_numeric_types,
- named_key_ids=named_key_ids,
- edge_id_from_attribute=edge_id_from_attribute,
- )
- writer.dump()
- def generate_graphml(
- G,
- encoding="utf-8",
- prettyprint=True,
- named_key_ids=False,
- edge_id_from_attribute=None,
- ):
- """Generate GraphML lines for G
- Parameters
- ----------
- G : graph
- A networkx graph
- encoding : string (optional)
- Encoding for text data.
- prettyprint : bool (optional)
- If True use line breaks and indenting in output XML.
- named_key_ids : bool (optional)
- If True use attr.name as value for key elements' id attribute.
- edge_id_from_attribute : dict key (optional)
- If provided, the graphml edge id is set by looking up the corresponding
- edge data attribute keyed by this parameter. If `None` or the key does not exist in edge data,
- the edge id is set by the edge key if `G` is a MultiGraph, else the edge id is left unset.
- Examples
- --------
- >>> G = nx.path_graph(4)
- >>> linefeed = chr(10) # linefeed = \n
- >>> s = linefeed.join(nx.generate_graphml(G))
- >>> for line in nx.generate_graphml(G): # doctest: +SKIP
- ... print(line)
- Notes
- -----
- This implementation does not support mixed graphs (directed and unidirected
- edges together) hyperedges, nested graphs, or ports.
- """
- writer = GraphMLWriter(
- encoding=encoding,
- prettyprint=prettyprint,
- named_key_ids=named_key_ids,
- edge_id_from_attribute=edge_id_from_attribute,
- )
- writer.add_graph_element(G)
- yield from str(writer).splitlines()
- @open_file(0, mode="rb")
- def read_graphml(path, node_type=str, edge_key_type=int, force_multigraph=False):
- """Read graph in GraphML format from path.
- Parameters
- ----------
- path : file or string
- File or filename to write.
- Filenames ending in .gz or .bz2 will be compressed.
- node_type: Python type (default: str)
- Convert node ids to this type
- edge_key_type: Python type (default: int)
- Convert graphml edge ids to this type. Multigraphs use id as edge key.
- Non-multigraphs add to edge attribute dict with name "id".
- force_multigraph : bool (default: False)
- If True, return a multigraph with edge keys. If False (the default)
- return a multigraph when multiedges are in the graph.
- Returns
- -------
- graph: NetworkX graph
- If parallel edges are present or `force_multigraph=True` then
- a MultiGraph or MultiDiGraph is returned. Otherwise a Graph/DiGraph.
- The returned graph is directed if the file indicates it should be.
- Notes
- -----
- Default node and edge attributes are not propagated to each node and edge.
- They can be obtained from `G.graph` and applied to node and edge attributes
- if desired using something like this:
- >>> default_color = G.graph["node_default"]["color"] # doctest: +SKIP
- >>> for node, data in G.nodes(data=True): # doctest: +SKIP
- ... if "color" not in data:
- ... data["color"] = default_color
- >>> default_color = G.graph["edge_default"]["color"] # doctest: +SKIP
- >>> for u, v, data in G.edges(data=True): # doctest: +SKIP
- ... if "color" not in data:
- ... data["color"] = default_color
- This implementation does not support mixed graphs (directed and unidirected
- edges together), hypergraphs, nested graphs, or ports.
- For multigraphs the GraphML edge "id" will be used as the edge
- key. If not specified then they "key" attribute will be used. If
- there is no "key" attribute a default NetworkX multigraph edge key
- will be provided.
- Files with the yEd "yfiles" extension can be read. The type of the node's
- shape is preserved in the `shape_type` node attribute.
- yEd compressed files ("file.graphmlz" extension) can be read by renaming
- the file to "file.graphml.gz".
- """
- reader = GraphMLReader(node_type, edge_key_type, force_multigraph)
- # need to check for multiple graphs
- glist = list(reader(path=path))
- if len(glist) == 0:
- # If no graph comes back, try looking for an incomplete header
- header = b'<graphml xmlns="http://graphml.graphdrawing.org/xmlns">'
- path.seek(0)
- old_bytes = path.read()
- new_bytes = old_bytes.replace(b"<graphml>", header)
- glist = list(reader(string=new_bytes))
- if len(glist) == 0:
- raise nx.NetworkXError("file not successfully read as graphml")
- return glist[0]
- def parse_graphml(
- graphml_string, node_type=str, edge_key_type=int, force_multigraph=False
- ):
- """Read graph in GraphML format from string.
- Parameters
- ----------
- graphml_string : string
- String containing graphml information
- (e.g., contents of a graphml file).
- node_type: Python type (default: str)
- Convert node ids to this type
- edge_key_type: Python type (default: int)
- Convert graphml edge ids to this type. Multigraphs use id as edge key.
- Non-multigraphs add to edge attribute dict with name "id".
- force_multigraph : bool (default: False)
- If True, return a multigraph with edge keys. If False (the default)
- return a multigraph when multiedges are in the graph.
- Returns
- -------
- graph: NetworkX graph
- If no parallel edges are found a Graph or DiGraph is returned.
- Otherwise a MultiGraph or MultiDiGraph is returned.
- Examples
- --------
- >>> G = nx.path_graph(4)
- >>> linefeed = chr(10) # linefeed = \n
- >>> s = linefeed.join(nx.generate_graphml(G))
- >>> H = nx.parse_graphml(s)
- Notes
- -----
- Default node and edge attributes are not propagated to each node and edge.
- They can be obtained from `G.graph` and applied to node and edge attributes
- if desired using something like this:
- >>> default_color = G.graph["node_default"]["color"] # doctest: +SKIP
- >>> for node, data in G.nodes(data=True): # doctest: +SKIP
- ... if "color" not in data:
- ... data["color"] = default_color
- >>> default_color = G.graph["edge_default"]["color"] # doctest: +SKIP
- >>> for u, v, data in G.edges(data=True): # doctest: +SKIP
- ... if "color" not in data:
- ... data["color"] = default_color
- This implementation does not support mixed graphs (directed and unidirected
- edges together), hypergraphs, nested graphs, or ports.
- For multigraphs the GraphML edge "id" will be used as the edge
- key. If not specified then they "key" attribute will be used. If
- there is no "key" attribute a default NetworkX multigraph edge key
- will be provided.
- """
- reader = GraphMLReader(node_type, edge_key_type, force_multigraph)
- # need to check for multiple graphs
- glist = list(reader(string=graphml_string))
- if len(glist) == 0:
- # If no graph comes back, try looking for an incomplete header
- header = '<graphml xmlns="http://graphml.graphdrawing.org/xmlns">'
- new_string = graphml_string.replace("<graphml>", header)
- glist = list(reader(string=new_string))
- if len(glist) == 0:
- raise nx.NetworkXError("file not successfully read as graphml")
- return glist[0]
- class GraphML:
- NS_GRAPHML = "http://graphml.graphdrawing.org/xmlns"
- NS_XSI = "http://www.w3.org/2001/XMLSchema-instance"
- # xmlns:y="http://www.yworks.com/xml/graphml"
- NS_Y = "http://www.yworks.com/xml/graphml"
- SCHEMALOCATION = " ".join(
- [
- "http://graphml.graphdrawing.org/xmlns",
- "http://graphml.graphdrawing.org/xmlns/1.0/graphml.xsd",
- ]
- )
- def construct_types(self):
- types = [
- (int, "integer"), # for Gephi GraphML bug
- (str, "yfiles"),
- (str, "string"),
- (int, "int"),
- (int, "long"),
- (float, "float"),
- (float, "double"),
- (bool, "boolean"),
- ]
- # These additions to types allow writing numpy types
- try:
- import numpy as np
- except:
- pass
- else:
- # prepend so that python types are created upon read (last entry wins)
- types = [
- (np.float64, "float"),
- (np.float32, "float"),
- (np.float16, "float"),
- (np.float_, "float"),
- (np.int_, "int"),
- (np.int8, "int"),
- (np.int16, "int"),
- (np.int32, "int"),
- (np.int64, "int"),
- (np.uint8, "int"),
- (np.uint16, "int"),
- (np.uint32, "int"),
- (np.uint64, "int"),
- (np.int_, "int"),
- (np.intc, "int"),
- (np.intp, "int"),
- ] + types
- self.xml_type = dict(types)
- self.python_type = dict(reversed(a) for a in types)
- # This page says that data types in GraphML follow Java(TM).
- # http://graphml.graphdrawing.org/primer/graphml-primer.html#AttributesDefinition
- # true and false are the only boolean literals:
- # http://en.wikibooks.org/wiki/Java_Programming/Literals#Boolean_Literals
- convert_bool = {
- # We use data.lower() in actual use.
- "true": True,
- "false": False,
- # Include integer strings for convenience.
- "0": False,
- 0: False,
- "1": True,
- 1: True,
- }
- def get_xml_type(self, key):
- """Wrapper around the xml_type dict that raises a more informative
- exception message when a user attempts to use data of a type not
- supported by GraphML."""
- try:
- return self.xml_type[key]
- except KeyError as err:
- raise TypeError(
- f"GraphML does not support type {type(key)} as data values."
- ) from err
- class GraphMLWriter(GraphML):
- def __init__(
- self,
- graph=None,
- encoding="utf-8",
- prettyprint=True,
- infer_numeric_types=False,
- named_key_ids=False,
- edge_id_from_attribute=None,
- ):
- self.construct_types()
- from xml.etree.ElementTree import Element
- self.myElement = Element
- self.infer_numeric_types = infer_numeric_types
- self.prettyprint = prettyprint
- self.named_key_ids = named_key_ids
- self.edge_id_from_attribute = edge_id_from_attribute
- self.encoding = encoding
- self.xml = self.myElement(
- "graphml",
- {
- "xmlns": self.NS_GRAPHML,
- "xmlns:xsi": self.NS_XSI,
- "xsi:schemaLocation": self.SCHEMALOCATION,
- },
- )
- self.keys = {}
- self.attributes = defaultdict(list)
- self.attribute_types = defaultdict(set)
- if graph is not None:
- self.add_graph_element(graph)
- def __str__(self):
- from xml.etree.ElementTree import tostring
- if self.prettyprint:
- self.indent(self.xml)
- s = tostring(self.xml).decode(self.encoding)
- return s
- def attr_type(self, name, scope, value):
- """Infer the attribute type of data named name. Currently this only
- supports inference of numeric types.
- If self.infer_numeric_types is false, type is used. Otherwise, pick the
- most general of types found across all values with name and scope. This
- means edges with data named 'weight' are treated separately from nodes
- with data named 'weight'.
- """
- if self.infer_numeric_types:
- types = self.attribute_types[(name, scope)]
- if len(types) > 1:
- types = {self.get_xml_type(t) for t in types}
- if "string" in types:
- return str
- elif "float" in types or "double" in types:
- return float
- else:
- return int
- else:
- return list(types)[0]
- else:
- return type(value)
- def get_key(self, name, attr_type, scope, default):
- keys_key = (name, attr_type, scope)
- try:
- return self.keys[keys_key]
- except KeyError:
- if self.named_key_ids:
- new_id = name
- else:
- new_id = f"d{len(list(self.keys))}"
- self.keys[keys_key] = new_id
- key_kwargs = {
- "id": new_id,
- "for": scope,
- "attr.name": name,
- "attr.type": attr_type,
- }
- key_element = self.myElement("key", **key_kwargs)
- # add subelement for data default value if present
- if default is not None:
- default_element = self.myElement("default")
- default_element.text = str(default)
- key_element.append(default_element)
- self.xml.insert(0, key_element)
- return new_id
- def add_data(self, name, element_type, value, scope="all", default=None):
- """
- Make a data element for an edge or a node. Keep a log of the
- type in the keys table.
- """
- if element_type not in self.xml_type:
- raise nx.NetworkXError(
- f"GraphML writer does not support {element_type} as data values."
- )
- keyid = self.get_key(name, self.get_xml_type(element_type), scope, default)
- data_element = self.myElement("data", key=keyid)
- data_element.text = str(value)
- return data_element
- def add_attributes(self, scope, xml_obj, data, default):
- """Appends attribute data to edges or nodes, and stores type information
- to be added later. See add_graph_element.
- """
- for k, v in data.items():
- self.attribute_types[(str(k), scope)].add(type(v))
- self.attributes[xml_obj].append([k, v, scope, default.get(k)])
- def add_nodes(self, G, graph_element):
- default = G.graph.get("node_default", {})
- for node, data in G.nodes(data=True):
- node_element = self.myElement("node", id=str(node))
- self.add_attributes("node", node_element, data, default)
- graph_element.append(node_element)
- def add_edges(self, G, graph_element):
- if G.is_multigraph():
- for u, v, key, data in G.edges(data=True, keys=True):
- edge_element = self.myElement(
- "edge",
- source=str(u),
- target=str(v),
- id=str(data.get(self.edge_id_from_attribute))
- if self.edge_id_from_attribute
- and self.edge_id_from_attribute in data
- else str(key),
- )
- default = G.graph.get("edge_default", {})
- self.add_attributes("edge", edge_element, data, default)
- graph_element.append(edge_element)
- else:
- for u, v, data in G.edges(data=True):
- if self.edge_id_from_attribute and self.edge_id_from_attribute in data:
- # select attribute to be edge id
- edge_element = self.myElement(
- "edge",
- source=str(u),
- target=str(v),
- id=str(data.get(self.edge_id_from_attribute)),
- )
- else:
- # default: no edge id
- edge_element = self.myElement("edge", source=str(u), target=str(v))
- default = G.graph.get("edge_default", {})
- self.add_attributes("edge", edge_element, data, default)
- graph_element.append(edge_element)
- def add_graph_element(self, G):
- """
- Serialize graph G in GraphML to the stream.
- """
- if G.is_directed():
- default_edge_type = "directed"
- else:
- default_edge_type = "undirected"
- graphid = G.graph.pop("id", None)
- if graphid is None:
- graph_element = self.myElement("graph", edgedefault=default_edge_type)
- else:
- graph_element = self.myElement(
- "graph", edgedefault=default_edge_type, id=graphid
- )
- default = {}
- data = {
- k: v
- for (k, v) in G.graph.items()
- if k not in ["node_default", "edge_default"]
- }
- self.add_attributes("graph", graph_element, data, default)
- self.add_nodes(G, graph_element)
- self.add_edges(G, graph_element)
- # self.attributes contains a mapping from XML Objects to a list of
- # data that needs to be added to them.
- # We postpone processing in order to do type inference/generalization.
- # See self.attr_type
- for xml_obj, data in self.attributes.items():
- for k, v, scope, default in data:
- xml_obj.append(
- self.add_data(
- str(k), self.attr_type(k, scope, v), str(v), scope, default
- )
- )
- self.xml.append(graph_element)
- def add_graphs(self, graph_list):
- """Add many graphs to this GraphML document."""
- for G in graph_list:
- self.add_graph_element(G)
- def dump(self, stream):
- from xml.etree.ElementTree import ElementTree
- if self.prettyprint:
- self.indent(self.xml)
- document = ElementTree(self.xml)
- document.write(stream, encoding=self.encoding, xml_declaration=True)
- def indent(self, elem, level=0):
- # in-place prettyprint formatter
- i = "\n" + level * " "
- if len(elem):
- if not elem.text or not elem.text.strip():
- elem.text = i + " "
- if not elem.tail or not elem.tail.strip():
- elem.tail = i
- for elem in elem:
- self.indent(elem, level + 1)
- if not elem.tail or not elem.tail.strip():
- elem.tail = i
- else:
- if level and (not elem.tail or not elem.tail.strip()):
- elem.tail = i
- class IncrementalElement:
- """Wrapper for _IncrementalWriter providing an Element like interface.
- This wrapper does not intend to be a complete implementation but rather to
- deal with those calls used in GraphMLWriter.
- """
- def __init__(self, xml, prettyprint):
- self.xml = xml
- self.prettyprint = prettyprint
- def append(self, element):
- self.xml.write(element, pretty_print=self.prettyprint)
- class GraphMLWriterLxml(GraphMLWriter):
- def __init__(
- self,
- path,
- graph=None,
- encoding="utf-8",
- prettyprint=True,
- infer_numeric_types=False,
- named_key_ids=False,
- edge_id_from_attribute=None,
- ):
- self.construct_types()
- import lxml.etree as lxmletree
- self.myElement = lxmletree.Element
- self._encoding = encoding
- self._prettyprint = prettyprint
- self.named_key_ids = named_key_ids
- self.edge_id_from_attribute = edge_id_from_attribute
- self.infer_numeric_types = infer_numeric_types
- self._xml_base = lxmletree.xmlfile(path, encoding=encoding)
- self._xml = self._xml_base.__enter__()
- self._xml.write_declaration()
- # We need to have a xml variable that support insertion. This call is
- # used for adding the keys to the document.
- # We will store those keys in a plain list, and then after the graph
- # element is closed we will add them to the main graphml element.
- self.xml = []
- self._keys = self.xml
- self._graphml = self._xml.element(
- "graphml",
- {
- "xmlns": self.NS_GRAPHML,
- "xmlns:xsi": self.NS_XSI,
- "xsi:schemaLocation": self.SCHEMALOCATION,
- },
- )
- self._graphml.__enter__()
- self.keys = {}
- self.attribute_types = defaultdict(set)
- if graph is not None:
- self.add_graph_element(graph)
- def add_graph_element(self, G):
- """
- Serialize graph G in GraphML to the stream.
- """
- if G.is_directed():
- default_edge_type = "directed"
- else:
- default_edge_type = "undirected"
- graphid = G.graph.pop("id", None)
- if graphid is None:
- graph_element = self._xml.element("graph", edgedefault=default_edge_type)
- else:
- graph_element = self._xml.element(
- "graph", edgedefault=default_edge_type, id=graphid
- )
- # gather attributes types for the whole graph
- # to find the most general numeric format needed.
- # Then pass through attributes to create key_id for each.
- graphdata = {
- k: v
- for k, v in G.graph.items()
- if k not in ("node_default", "edge_default")
- }
- node_default = G.graph.get("node_default", {})
- edge_default = G.graph.get("edge_default", {})
- # Graph attributes
- for k, v in graphdata.items():
- self.attribute_types[(str(k), "graph")].add(type(v))
- for k, v in graphdata.items():
- element_type = self.get_xml_type(self.attr_type(k, "graph", v))
- self.get_key(str(k), element_type, "graph", None)
- # Nodes and data
- for node, d in G.nodes(data=True):
- for k, v in d.items():
- self.attribute_types[(str(k), "node")].add(type(v))
- for node, d in G.nodes(data=True):
- for k, v in d.items():
- T = self.get_xml_type(self.attr_type(k, "node", v))
- self.get_key(str(k), T, "node", node_default.get(k))
- # Edges and data
- if G.is_multigraph():
- for u, v, ekey, d in G.edges(keys=True, data=True):
- for k, v in d.items():
- self.attribute_types[(str(k), "edge")].add(type(v))
- for u, v, ekey, d in G.edges(keys=True, data=True):
- for k, v in d.items():
- T = self.get_xml_type(self.attr_type(k, "edge", v))
- self.get_key(str(k), T, "edge", edge_default.get(k))
- else:
- for u, v, d in G.edges(data=True):
- for k, v in d.items():
- self.attribute_types[(str(k), "edge")].add(type(v))
- for u, v, d in G.edges(data=True):
- for k, v in d.items():
- T = self.get_xml_type(self.attr_type(k, "edge", v))
- self.get_key(str(k), T, "edge", edge_default.get(k))
- # Now add attribute keys to the xml file
- for key in self.xml:
- self._xml.write(key, pretty_print=self._prettyprint)
- # The incremental_writer writes each node/edge as it is created
- incremental_writer = IncrementalElement(self._xml, self._prettyprint)
- with graph_element:
- self.add_attributes("graph", incremental_writer, graphdata, {})
- self.add_nodes(G, incremental_writer) # adds attributes too
- self.add_edges(G, incremental_writer) # adds attributes too
- def add_attributes(self, scope, xml_obj, data, default):
- """Appends attribute data."""
- for k, v in data.items():
- data_element = self.add_data(
- str(k), self.attr_type(str(k), scope, v), str(v), scope, default.get(k)
- )
- xml_obj.append(data_element)
- def __str__(self):
- return object.__str__(self)
- def dump(self):
- self._graphml.__exit__(None, None, None)
- self._xml_base.__exit__(None, None, None)
- # default is lxml is present.
- write_graphml = write_graphml_lxml
- class GraphMLReader(GraphML):
- """Read a GraphML document. Produces NetworkX graph objects."""
- def __init__(self, node_type=str, edge_key_type=int, force_multigraph=False):
- self.construct_types()
- self.node_type = node_type
- self.edge_key_type = edge_key_type
- self.multigraph = force_multigraph # If False, test for multiedges
- self.edge_ids = {} # dict mapping (u,v) tuples to edge id attributes
- def __call__(self, path=None, string=None):
- from xml.etree.ElementTree import ElementTree, fromstring
- if path is not None:
- self.xml = ElementTree(file=path)
- elif string is not None:
- self.xml = fromstring(string)
- else:
- raise ValueError("Must specify either 'path' or 'string' as kwarg")
- (keys, defaults) = self.find_graphml_keys(self.xml)
- for g in self.xml.findall(f"{{{self.NS_GRAPHML}}}graph"):
- yield self.make_graph(g, keys, defaults)
- def make_graph(self, graph_xml, graphml_keys, defaults, G=None):
- # set default graph type
- edgedefault = graph_xml.get("edgedefault", None)
- if G is None:
- if edgedefault == "directed":
- G = nx.MultiDiGraph()
- else:
- G = nx.MultiGraph()
- # set defaults for graph attributes
- G.graph["node_default"] = {}
- G.graph["edge_default"] = {}
- for key_id, value in defaults.items():
- key_for = graphml_keys[key_id]["for"]
- name = graphml_keys[key_id]["name"]
- python_type = graphml_keys[key_id]["type"]
- if key_for == "node":
- G.graph["node_default"].update({name: python_type(value)})
- if key_for == "edge":
- G.graph["edge_default"].update({name: python_type(value)})
- # hyperedges are not supported
- hyperedge = graph_xml.find(f"{{{self.NS_GRAPHML}}}hyperedge")
- if hyperedge is not None:
- raise nx.NetworkXError("GraphML reader doesn't support hyperedges")
- # add nodes
- for node_xml in graph_xml.findall(f"{{{self.NS_GRAPHML}}}node"):
- self.add_node(G, node_xml, graphml_keys, defaults)
- # add edges
- for edge_xml in graph_xml.findall(f"{{{self.NS_GRAPHML}}}edge"):
- self.add_edge(G, edge_xml, graphml_keys)
- # add graph data
- data = self.decode_data_elements(graphml_keys, graph_xml)
- G.graph.update(data)
- # switch to Graph or DiGraph if no parallel edges were found
- if self.multigraph:
- return G
- G = nx.DiGraph(G) if G.is_directed() else nx.Graph(G)
- # add explicit edge "id" from file as attribute in NX graph.
- nx.set_edge_attributes(G, values=self.edge_ids, name="id")
- return G
- def add_node(self, G, node_xml, graphml_keys, defaults):
- """Add a node to the graph."""
- # warn on finding unsupported ports tag
- ports = node_xml.find(f"{{{self.NS_GRAPHML}}}port")
- if ports is not None:
- warnings.warn("GraphML port tag not supported.")
- # find the node by id and cast it to the appropriate type
- node_id = self.node_type(node_xml.get("id"))
- # get data/attributes for node
- data = self.decode_data_elements(graphml_keys, node_xml)
- G.add_node(node_id, **data)
- # get child nodes
- if node_xml.attrib.get("yfiles.foldertype") == "group":
- graph_xml = node_xml.find(f"{{{self.NS_GRAPHML}}}graph")
- self.make_graph(graph_xml, graphml_keys, defaults, G)
- def add_edge(self, G, edge_element, graphml_keys):
- """Add an edge to the graph."""
- # warn on finding unsupported ports tag
- ports = edge_element.find(f"{{{self.NS_GRAPHML}}}port")
- if ports is not None:
- warnings.warn("GraphML port tag not supported.")
- # raise error if we find mixed directed and undirected edges
- directed = edge_element.get("directed")
- if G.is_directed() and directed == "false":
- msg = "directed=false edge found in directed graph."
- raise nx.NetworkXError(msg)
- if (not G.is_directed()) and directed == "true":
- msg = "directed=true edge found in undirected graph."
- raise nx.NetworkXError(msg)
- source = self.node_type(edge_element.get("source"))
- target = self.node_type(edge_element.get("target"))
- data = self.decode_data_elements(graphml_keys, edge_element)
- # GraphML stores edge ids as an attribute
- # NetworkX uses them as keys in multigraphs too if no key
- # attribute is specified
- edge_id = edge_element.get("id")
- if edge_id:
- # self.edge_ids is used by `make_graph` method for non-multigraphs
- self.edge_ids[source, target] = edge_id
- try:
- edge_id = self.edge_key_type(edge_id)
- except ValueError: # Could not convert.
- pass
- else:
- edge_id = data.get("key")
- if G.has_edge(source, target):
- # mark this as a multigraph
- self.multigraph = True
- # Use add_edges_from to avoid error with add_edge when `'key' in data`
- # Note there is only one edge here...
- G.add_edges_from([(source, target, edge_id, data)])
- def decode_data_elements(self, graphml_keys, obj_xml):
- """Use the key information to decode the data XML if present."""
- data = {}
- for data_element in obj_xml.findall(f"{{{self.NS_GRAPHML}}}data"):
- key = data_element.get("key")
- try:
- data_name = graphml_keys[key]["name"]
- data_type = graphml_keys[key]["type"]
- except KeyError as err:
- raise nx.NetworkXError(f"Bad GraphML data: no key {key}") from err
- text = data_element.text
- # assume anything with subelements is a yfiles extension
- if text is not None and len(list(data_element)) == 0:
- if data_type == bool:
- # Ignore cases.
- # http://docs.oracle.com/javase/6/docs/api/java/lang/
- # Boolean.html#parseBoolean%28java.lang.String%29
- data[data_name] = self.convert_bool[text.lower()]
- else:
- data[data_name] = data_type(text)
- elif len(list(data_element)) > 0:
- # Assume yfiles as subelements, try to extract node_label
- node_label = None
- # set GenericNode's configuration as shape type
- gn = data_element.find(f"{{{self.NS_Y}}}GenericNode")
- if gn:
- data["shape_type"] = gn.get("configuration")
- for node_type in ["GenericNode", "ShapeNode", "SVGNode", "ImageNode"]:
- pref = f"{{{self.NS_Y}}}{node_type}/{{{self.NS_Y}}}"
- geometry = data_element.find(f"{pref}Geometry")
- if geometry is not None:
- data["x"] = geometry.get("x")
- data["y"] = geometry.get("y")
- if node_label is None:
- node_label = data_element.find(f"{pref}NodeLabel")
- shape = data_element.find(f"{pref}Shape")
- if shape is not None:
- data["shape_type"] = shape.get("type")
- if node_label is not None:
- data["label"] = node_label.text
- # check all the different types of edges avaivable in yEd.
- for edge_type in [
- "PolyLineEdge",
- "SplineEdge",
- "QuadCurveEdge",
- "BezierEdge",
- "ArcEdge",
- ]:
- pref = f"{{{self.NS_Y}}}{edge_type}/{{{self.NS_Y}}}"
- edge_label = data_element.find(f"{pref}EdgeLabel")
- if edge_label is not None:
- break
- if edge_label is not None:
- data["label"] = edge_label.text
- return data
- def find_graphml_keys(self, graph_element):
- """Extracts all the keys and key defaults from the xml."""
- graphml_keys = {}
- graphml_key_defaults = {}
- for k in graph_element.findall(f"{{{self.NS_GRAPHML}}}key"):
- attr_id = k.get("id")
- attr_type = k.get("attr.type")
- attr_name = k.get("attr.name")
- yfiles_type = k.get("yfiles.type")
- if yfiles_type is not None:
- attr_name = yfiles_type
- attr_type = "yfiles"
- if attr_type is None:
- attr_type = "string"
- warnings.warn(f"No key type for id {attr_id}. Using string")
- if attr_name is None:
- raise nx.NetworkXError(f"Unknown key for id {attr_id}.")
- graphml_keys[attr_id] = {
- "name": attr_name,
- "type": self.python_type[attr_type],
- "for": k.get("for"),
- }
- # check for "default" sub-element of key element
- default = k.find(f"{{{self.NS_GRAPHML}}}default")
- if default is not None:
- # Handle default values identically to data element values
- python_type = graphml_keys[attr_id]["type"]
- if python_type == bool:
- graphml_key_defaults[attr_id] = self.convert_bool[
- default.text.lower()
- ]
- else:
- graphml_key_defaults[attr_id] = python_type(default.text)
- return graphml_keys, graphml_key_defaults
|