-
Notifications
You must be signed in to change notification settings - Fork 14
/
Copy pathutils.py
166 lines (142 loc) · 5.68 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
from p2p_python.config import V, Debug
import logging
import socket
import random
import asyncio
import os
loop = asyncio.get_event_loop()
NAMES = (
"Angle", "Ant", "Apple", "Arch", "Arm", "Army", "Baby", "Bag", "Ball", "Band", "Basin", "Bath", "Bed",
"Bee", "Bell", "Berry", "Bird", "Blade", "Board", "Boat", "Bone", "Book", "Boot", "Box", "Boy", "Brain",
"Brake", "Brick", "Brush", "Bulb", "Cake", "Card", "Cart", "Cat", "Chain", "Chest", "Chin", "Clock",
"Cloud", "Coat", "Comb", "Cord", "Cow", "Cup", "Dog", "Door", "Drain", "Dress", "Drop", "Ear", "Egg",
"Eye", "Face", "Farm", "Fish", "Flag", "Floor", "Fly", "Foot", "Fork", "Fowl", "Frame", "Girl", "Glove",
"Goat", "Gun", "Hair", "Hand", "Hat", "Head", "Heart", "Hook", "Horn", "Horse", "House", "Jewel", "Key",
"Knee", "Knife", "Knot", "Leaf", "Leg", "Line", "Lip", "Lock", "Map", "Match", "Moon", "Mouth", "Nail",
"Neck", "Nerve", "Net", "Nose", "Nut", "Oven", "Pen", "Pig", "Pin", "Pipe", "Plane", "Plate", "Pot",
"Pump", "Rail", "Rat", "Ring", "Rod", "Roof", "Root", "Sail", "Screw", "Seed", "Sheep", "Shelf", "Ship",
"Shirt", "Shoe", "Skin", "Skirt", "Snake", "Sock", "Spade", "Spoon", "Stamp", "Star", "Stem", "Stick",
"Store", "Sun", "Table", "Tail", "Thumb", "Toe", "Tooth", "Town", "Train", "Tray", "Tree", "Wall",
"Watch", "Wheel", "Whip", "Wing", "Wire", "Worm"
)
def get_version():
"""get program version string"""
if Debug.P_PRINT_EXCEPTION:
return 'debug'
# read version from code
try:
from p2p_python import __version__
return __version__
except Exception:
pass
# read version from file
try:
hear = os.path.dirname(os.path.abspath(__file__))
with open(os.path.join(hear, '__init__.py'), mode='r') as fp:
for word in fp.readlines():
if word.startswith('__version__'):
return word.replace('"', "'").split("'")[-2]
except Exception:
pass
return 'unknown'
def get_name():
"""get random name for identify from others"""
return "{}:{}".format(random.choice(NAMES), random.randint(10000, 99999))
def setup_p2p_params(network_ver, p2p_port, p2p_accept=False, p2p_udp_accept=False, sub_dir=None):
""" setup general connection setting """
# directory params
if V.DATA_PATH is not None:
raise Exception('Already setup params.')
root_data_dir = os.path.join(os.path.expanduser('~'), 'p2p-python')
if not os.path.exists(root_data_dir):
os.makedirs(root_data_dir)
V.DATA_PATH = os.path.join(root_data_dir, str(p2p_port))
if not os.path.exists(V.DATA_PATH):
os.makedirs(V.DATA_PATH)
if sub_dir:
V.DATA_PATH = os.path.join(V.DATA_PATH, sub_dir)
if not os.path.exists(V.DATA_PATH):
os.makedirs(V.DATA_PATH)
# network params
V.CLIENT_VER = get_version()
V.SERVER_NAME = get_name()
V.NETWORK_VER = network_ver
V.P2P_PORT = p2p_port
V.P2P_ACCEPT = p2p_accept
V.P2P_UDP_ACCEPT = p2p_udp_accept
def setup_tor_connection(proxy_host='127.0.0.1', port=9150, f_raise_error=True):
""" client connection to onion router """
# Typically, Tor listens for SOCKS connections on port 9050.
# Tor-browser listens on port 9150.
host_port = (proxy_host, port)
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
if V.P2P_ACCEPT or V.P2P_UDP_ACCEPT:
raise ConnectionError('P2P socket accept enable? tcp={} udp={}'.format(
V.P2P_ACCEPT, V.P2P_UDP_ACCEPT))
if 0 != sock.connect_ex(host_port):
if f_raise_error:
raise ConnectionError('Cannot connect proxy by test.')
else:
V.TOR_CONNECTION = host_port
sock.close()
def setup_server_hostname(hostname: str = None):
"""
hostname displayed for others
This is useful when proxy provide different ip address
"""
V.MY_HOST_NAME = hostname
async def is_reachable(host, port):
"""check a port is opened, finish in 2s"""
future: asyncio.Future = loop.run_in_executor(
None, socket.getaddrinfo, host, port, socket.AF_UNSPEC, socket.SOCK_STREAM)
try:
await asyncio.wait_for(future, 10.0)
addrs = future.result()
except (socket.gaierror, asyncio.TimeoutError):
return False
for af, socktype, proto, canonname, host_port in addrs:
try:
sock = socket.socket(af, socktype, proto)
except OSError:
continue
sock.settimeout(2.0)
future: asyncio.Future = loop.run_in_executor(
None, sock.connect_ex, host_port)
await asyncio.wait_for(future, 3.0)
result = future.result()
loop.run_in_executor(None, sock.close)
if result == 0:
return True
else:
# create no connection
return False
def is_unbind_port(port, family=socket.AF_INET, protocol=socket.SOCK_STREAM):
"""check is bind port by server"""
try:
with socket.socket(family, protocol) as sock:
sock.bind(("127.0.0.1", port))
return True
except socket.error:
return False
def setup_logger(level=logging.DEBUG, format_str='[%(levelname)-6s] [%(threadName)-10s] [%(asctime)-24s] %(message)s'):
"""setup basic logging handler"""
logger = logging.getLogger()
for sh in logger.handlers:
logger.removeHandler(sh)
logger.setLevel(logging.DEBUG)
formatter = logging.Formatter(format_str)
sh = logging.StreamHandler()
sh.setLevel(level)
sh.setFormatter(formatter)
logger.addHandler(sh)
__all__ = [
"NAMES",
"get_version",
"get_name",
"setup_server_hostname",
"is_reachable",
"is_unbind_port",
"setup_tor_connection",
"setup_p2p_params",
"setup_logger",
]