test_edgelist.py 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314
  1. """
  2. Unit tests for edgelists.
  3. """
  4. import io
  5. import os
  6. import tempfile
  7. import textwrap
  8. import pytest
  9. import networkx as nx
  10. from networkx.utils import edges_equal, graphs_equal, nodes_equal
  11. edges_no_data = textwrap.dedent(
  12. """
  13. # comment line
  14. 1 2
  15. # comment line
  16. 2 3
  17. """
  18. )
  19. edges_with_values = textwrap.dedent(
  20. """
  21. # comment line
  22. 1 2 2.0
  23. # comment line
  24. 2 3 3.0
  25. """
  26. )
  27. edges_with_weight = textwrap.dedent(
  28. """
  29. # comment line
  30. 1 2 {'weight':2.0}
  31. # comment line
  32. 2 3 {'weight':3.0}
  33. """
  34. )
  35. edges_with_multiple_attrs = textwrap.dedent(
  36. """
  37. # comment line
  38. 1 2 {'weight':2.0, 'color':'green'}
  39. # comment line
  40. 2 3 {'weight':3.0, 'color':'red'}
  41. """
  42. )
  43. edges_with_multiple_attrs_csv = textwrap.dedent(
  44. """
  45. # comment line
  46. 1, 2, {'weight':2.0, 'color':'green'}
  47. # comment line
  48. 2, 3, {'weight':3.0, 'color':'red'}
  49. """
  50. )
  51. _expected_edges_weights = [(1, 2, {"weight": 2.0}), (2, 3, {"weight": 3.0})]
  52. _expected_edges_multiattr = [
  53. (1, 2, {"weight": 2.0, "color": "green"}),
  54. (2, 3, {"weight": 3.0, "color": "red"}),
  55. ]
  56. @pytest.mark.parametrize(
  57. ("data", "extra_kwargs"),
  58. (
  59. (edges_no_data, {}),
  60. (edges_with_values, {}),
  61. (edges_with_weight, {}),
  62. (edges_with_multiple_attrs, {}),
  63. (edges_with_multiple_attrs_csv, {"delimiter": ","}),
  64. ),
  65. )
  66. def test_read_edgelist_no_data(data, extra_kwargs):
  67. bytesIO = io.BytesIO(data.encode("utf-8"))
  68. G = nx.read_edgelist(bytesIO, nodetype=int, data=False, **extra_kwargs)
  69. assert edges_equal(G.edges(), [(1, 2), (2, 3)])
  70. def test_read_weighted_edgelist():
  71. bytesIO = io.BytesIO(edges_with_values.encode("utf-8"))
  72. G = nx.read_weighted_edgelist(bytesIO, nodetype=int)
  73. assert edges_equal(G.edges(data=True), _expected_edges_weights)
  74. @pytest.mark.parametrize(
  75. ("data", "extra_kwargs", "expected"),
  76. (
  77. (edges_with_weight, {}, _expected_edges_weights),
  78. (edges_with_multiple_attrs, {}, _expected_edges_multiattr),
  79. (edges_with_multiple_attrs_csv, {"delimiter": ","}, _expected_edges_multiattr),
  80. ),
  81. )
  82. def test_read_edgelist_with_data(data, extra_kwargs, expected):
  83. bytesIO = io.BytesIO(data.encode("utf-8"))
  84. G = nx.read_edgelist(bytesIO, nodetype=int, **extra_kwargs)
  85. assert edges_equal(G.edges(data=True), expected)
  86. @pytest.fixture
  87. def example_graph():
  88. G = nx.Graph()
  89. G.add_weighted_edges_from([(1, 2, 3.0), (2, 3, 27.0), (3, 4, 3.0)])
  90. return G
  91. def test_parse_edgelist_no_data(example_graph):
  92. G = example_graph
  93. H = nx.parse_edgelist(["1 2", "2 3", "3 4"], nodetype=int)
  94. assert nodes_equal(G.nodes, H.nodes)
  95. assert edges_equal(G.edges, H.edges)
  96. def test_parse_edgelist_with_data_dict(example_graph):
  97. G = example_graph
  98. H = nx.parse_edgelist(
  99. ["1 2 {'weight': 3}", "2 3 {'weight': 27}", "3 4 {'weight': 3.0}"], nodetype=int
  100. )
  101. assert nodes_equal(G.nodes, H.nodes)
  102. assert edges_equal(G.edges(data=True), H.edges(data=True))
  103. def test_parse_edgelist_with_data_list(example_graph):
  104. G = example_graph
  105. H = nx.parse_edgelist(
  106. ["1 2 3", "2 3 27", "3 4 3.0"], nodetype=int, data=(("weight", float),)
  107. )
  108. assert nodes_equal(G.nodes, H.nodes)
  109. assert edges_equal(G.edges(data=True), H.edges(data=True))
  110. def test_parse_edgelist():
  111. # ignore lines with less than 2 nodes
  112. lines = ["1;2", "2 3", "3 4"]
  113. G = nx.parse_edgelist(lines, nodetype=int)
  114. assert list(G.edges()) == [(2, 3), (3, 4)]
  115. # unknown nodetype
  116. with pytest.raises(TypeError, match="Failed to convert nodes"):
  117. lines = ["1 2", "2 3", "3 4"]
  118. nx.parse_edgelist(lines, nodetype="nope")
  119. # lines have invalid edge format
  120. with pytest.raises(TypeError, match="Failed to convert edge data"):
  121. lines = ["1 2 3", "2 3", "3 4"]
  122. nx.parse_edgelist(lines, nodetype=int)
  123. # edge data and data_keys not the same length
  124. with pytest.raises(IndexError, match="not the same length"):
  125. lines = ["1 2 3", "2 3 27", "3 4 3.0"]
  126. nx.parse_edgelist(
  127. lines, nodetype=int, data=(("weight", float), ("capacity", int))
  128. )
  129. # edge data can't be converted to edge type
  130. with pytest.raises(TypeError, match="Failed to convert"):
  131. lines = ["1 2 't1'", "2 3 't3'", "3 4 't3'"]
  132. nx.parse_edgelist(lines, nodetype=int, data=(("weight", float),))
  133. def test_comments_None():
  134. edgelist = ["node#1 node#2", "node#2 node#3"]
  135. # comments=None supported to ignore all comment characters
  136. G = nx.parse_edgelist(edgelist, comments=None)
  137. H = nx.Graph([e.split(" ") for e in edgelist])
  138. assert edges_equal(G.edges, H.edges)
  139. class TestEdgelist:
  140. @classmethod
  141. def setup_class(cls):
  142. cls.G = nx.Graph(name="test")
  143. e = [("a", "b"), ("b", "c"), ("c", "d"), ("d", "e"), ("e", "f"), ("a", "f")]
  144. cls.G.add_edges_from(e)
  145. cls.G.add_node("g")
  146. cls.DG = nx.DiGraph(cls.G)
  147. cls.XG = nx.MultiGraph()
  148. cls.XG.add_weighted_edges_from([(1, 2, 5), (1, 2, 5), (1, 2, 1), (3, 3, 42)])
  149. cls.XDG = nx.MultiDiGraph(cls.XG)
  150. def test_write_edgelist_1(self):
  151. fh = io.BytesIO()
  152. G = nx.Graph()
  153. G.add_edges_from([(1, 2), (2, 3)])
  154. nx.write_edgelist(G, fh, data=False)
  155. fh.seek(0)
  156. assert fh.read() == b"1 2\n2 3\n"
  157. def test_write_edgelist_2(self):
  158. fh = io.BytesIO()
  159. G = nx.Graph()
  160. G.add_edges_from([(1, 2), (2, 3)])
  161. nx.write_edgelist(G, fh, data=True)
  162. fh.seek(0)
  163. assert fh.read() == b"1 2 {}\n2 3 {}\n"
  164. def test_write_edgelist_3(self):
  165. fh = io.BytesIO()
  166. G = nx.Graph()
  167. G.add_edge(1, 2, weight=2.0)
  168. G.add_edge(2, 3, weight=3.0)
  169. nx.write_edgelist(G, fh, data=True)
  170. fh.seek(0)
  171. assert fh.read() == b"1 2 {'weight': 2.0}\n2 3 {'weight': 3.0}\n"
  172. def test_write_edgelist_4(self):
  173. fh = io.BytesIO()
  174. G = nx.Graph()
  175. G.add_edge(1, 2, weight=2.0)
  176. G.add_edge(2, 3, weight=3.0)
  177. nx.write_edgelist(G, fh, data=[("weight")])
  178. fh.seek(0)
  179. assert fh.read() == b"1 2 2.0\n2 3 3.0\n"
  180. def test_unicode(self):
  181. G = nx.Graph()
  182. name1 = chr(2344) + chr(123) + chr(6543)
  183. name2 = chr(5543) + chr(1543) + chr(324)
  184. G.add_edge(name1, "Radiohead", **{name2: 3})
  185. fd, fname = tempfile.mkstemp()
  186. nx.write_edgelist(G, fname)
  187. H = nx.read_edgelist(fname)
  188. assert graphs_equal(G, H)
  189. os.close(fd)
  190. os.unlink(fname)
  191. def test_latin1_issue(self):
  192. G = nx.Graph()
  193. name1 = chr(2344) + chr(123) + chr(6543)
  194. name2 = chr(5543) + chr(1543) + chr(324)
  195. G.add_edge(name1, "Radiohead", **{name2: 3})
  196. fd, fname = tempfile.mkstemp()
  197. pytest.raises(
  198. UnicodeEncodeError, nx.write_edgelist, G, fname, encoding="latin-1"
  199. )
  200. os.close(fd)
  201. os.unlink(fname)
  202. def test_latin1(self):
  203. G = nx.Graph()
  204. name1 = "Bj" + chr(246) + "rk"
  205. name2 = chr(220) + "ber"
  206. G.add_edge(name1, "Radiohead", **{name2: 3})
  207. fd, fname = tempfile.mkstemp()
  208. nx.write_edgelist(G, fname, encoding="latin-1")
  209. H = nx.read_edgelist(fname, encoding="latin-1")
  210. assert graphs_equal(G, H)
  211. os.close(fd)
  212. os.unlink(fname)
  213. def test_edgelist_graph(self):
  214. G = self.G
  215. (fd, fname) = tempfile.mkstemp()
  216. nx.write_edgelist(G, fname)
  217. H = nx.read_edgelist(fname)
  218. H2 = nx.read_edgelist(fname)
  219. assert H is not H2 # they should be different graphs
  220. G.remove_node("g") # isolated nodes are not written in edgelist
  221. assert nodes_equal(list(H), list(G))
  222. assert edges_equal(list(H.edges()), list(G.edges()))
  223. os.close(fd)
  224. os.unlink(fname)
  225. def test_edgelist_digraph(self):
  226. G = self.DG
  227. (fd, fname) = tempfile.mkstemp()
  228. nx.write_edgelist(G, fname)
  229. H = nx.read_edgelist(fname, create_using=nx.DiGraph())
  230. H2 = nx.read_edgelist(fname, create_using=nx.DiGraph())
  231. assert H is not H2 # they should be different graphs
  232. G.remove_node("g") # isolated nodes are not written in edgelist
  233. assert nodes_equal(list(H), list(G))
  234. assert edges_equal(list(H.edges()), list(G.edges()))
  235. os.close(fd)
  236. os.unlink(fname)
  237. def test_edgelist_integers(self):
  238. G = nx.convert_node_labels_to_integers(self.G)
  239. (fd, fname) = tempfile.mkstemp()
  240. nx.write_edgelist(G, fname)
  241. H = nx.read_edgelist(fname, nodetype=int)
  242. # isolated nodes are not written in edgelist
  243. G.remove_nodes_from(list(nx.isolates(G)))
  244. assert nodes_equal(list(H), list(G))
  245. assert edges_equal(list(H.edges()), list(G.edges()))
  246. os.close(fd)
  247. os.unlink(fname)
  248. def test_edgelist_multigraph(self):
  249. G = self.XG
  250. (fd, fname) = tempfile.mkstemp()
  251. nx.write_edgelist(G, fname)
  252. H = nx.read_edgelist(fname, nodetype=int, create_using=nx.MultiGraph())
  253. H2 = nx.read_edgelist(fname, nodetype=int, create_using=nx.MultiGraph())
  254. assert H is not H2 # they should be different graphs
  255. assert nodes_equal(list(H), list(G))
  256. assert edges_equal(list(H.edges()), list(G.edges()))
  257. os.close(fd)
  258. os.unlink(fname)
  259. def test_edgelist_multidigraph(self):
  260. G = self.XDG
  261. (fd, fname) = tempfile.mkstemp()
  262. nx.write_edgelist(G, fname)
  263. H = nx.read_edgelist(fname, nodetype=int, create_using=nx.MultiDiGraph())
  264. H2 = nx.read_edgelist(fname, nodetype=int, create_using=nx.MultiDiGraph())
  265. assert H is not H2 # they should be different graphs
  266. assert nodes_equal(list(H), list(G))
  267. assert edges_equal(list(H.edges()), list(G.edges()))
  268. os.close(fd)
  269. os.unlink(fname)