-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathflow_graph.py
More file actions
103 lines (81 loc) · 3.19 KB
/
flow_graph.py
File metadata and controls
103 lines (81 loc) · 3.19 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
"""Manage and representation of network flow graph."""
import pandas as pd
import networkx as nx
from functools import lru_cache
from pyvis.network import Network
class FlowGraph:
"""Represent all data flows between services in the system."""
def __init__(self):
self.connections = {}
def get_connection(self, src_ip, dst_ip):
key = (src_ip, dst_ip)
con = self.connections.get(key, Connection(*key))
self.connections[key] = con
return con
def _get_adjacency_dict(self):
network = dict()
for connection in self.connections.values():
src = connection.src.name
dst = connection.dst.name
neighbours = network.get(src, [])
neighbours.append(dst)
network[src] = neighbours
return network
def plot_graph(self, out_path):
"""Create an HTML file with plotting of the flow graph."""
net = Network(notebook=True)
network_dict = self._get_adjacency_dict()
net.from_nx(nx.from_dict_of_lists(network_dict))
net.show(out_path)
def plot_table(self, out_path):
"""Create an HTML file with table of flow connections and keys."""
connections = self.connections.values()
sources = [connection.src.name for connection in connections]
dests = [connection.dst.name for connection in connections]
keys = [", ".join(connection.keys) for connection in connections]
df = pd.DataFrame({"Source": sources, "Dest": dests, "Key names": keys})
html = df.to_html()
with open(out_path, "wb") as table_file:
table_file.write(html.encode())
def print_table(self):
"""Print flow table textually."""
print(list(self.connections.values()))
class Connection:
"""Represent a data flow between 2 entities in the system."""
def __init__(self, src, dst):
self.src = Entity(src)
self.dst = Entity(dst)
self.keys = set()
def add_keys(self, json):
if json:
if type(json) is dict:
self.keys.update(json.keys())
elif type(json) is list:
for item in json:
self.add_keys(item)
def __repr__(self):
return f"{self.src} -> {self.dst}: {self.keys}\n"
class Entity:
"""Represent a service in the system."""
SERVICES_TABLE_FILE = "services.txt"
def __init__(self, ip):
self.ip = ip
self.name = self.fetch_service_name()
def fetch_service_name(self):
"""Lookup the service name that uses this ip. if not found return the ip."""
return self.get_ip_to_name_table().get(self.ip, self.ip)
@classmethod
@lru_cache(100)
def get_ip_to_name_table(cls):
"""Load translation table of ip to service name."""
with open(cls.SERVICES_TABLE_FILE, "rb") as ip_to_name:
lines = ip_to_name.readlines()
ip_to_name_table = {}
for line in lines:
table_line = line.decode("ascii").split(" ")
ip = table_line[-1].strip()
name = table_line[0].strip()
ip_to_name_table[ip] = name
return ip_to_name_table
def __repr__(self):
return self.name