-
Notifications
You must be signed in to change notification settings - Fork 9
/
Copy pathtest_connect.py
101 lines (85 loc) · 3.06 KB
/
test_connect.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
import argparse
import socket
import time
parser = argparse.ArgumentParser()
parser.add_argument("-s", "--server", help="run as server", action='store_true')
parser.add_argument("-c", "--client", help="run as client", action='store_true')
parser.add_argument("--host-ip", help="host ip address")
parser.add_argument("--netns-ip", help="netns ip address")
parser.add_argument("-p", "--port", help="bind or connect port")
parser.add_argument("-u", "--udp", help="use UDP", action='store_true')
parser.add_argument("--count", help="try count", default=1)
args = parser.parse_args()
TEST_MESSAGE = 'bypass4netns OK'
def server_tcp(args):
print('test server starting...')
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
sock.settimeout(3.0)
sock.bind(('0.0.0.0', int(args.port)))
sock.listen()
cnt = 0
while cnt < int(args.count):
con, _ = sock.accept()
print('connection accepted.')
recvline = con.recv(8192).decode()
if recvline == TEST_MESSAGE:
print('TEST_MESSAGE has been received.')
cnt += 1
con.close()
sock.close()
def client_tcp(args):
print('test client starting...')
for _ in (0, int(args.count)):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(3.0)
print('send TEST_MESSAGE to host')
sock.connect((args.host_ip, int(args.port)))
sock.send(TEST_MESSAGE.encode('utf-8'))
sock.close()
time.sleep(1.0)
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(3.0)
print('send TEST_MESSAGE to netns')
sock.connect((args.netns_ip, int(args.port)))
sock.send(TEST_MESSAGE.encode('utf-8'))
sock.close()
print('done.')
def server_udp(args):
print('test server starting...')
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
sock.settimeout(3.0)
sock.bind(('0.0.0.0', int(args.port)))
cnt = 0
while cnt < int(args.count):
recvline = sock.recv(8192).decode()
if recvline == TEST_MESSAGE:
print('TEST_MESSAGE has been received.')
cnt += 1
sock.close()
def client_udp(args):
print('test client starting...')
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.settimeout(3.0)
for _ in (0, int(args.count)):
print('send TEST_MESSAGE to host')
sock.connect((args.host_ip, int(args.port)))
sock.send(TEST_MESSAGE.encode('utf-8'))
time.sleep(1.0)
print('send TEST_MESSAGE to netns')
sock.connect((args.netns_ip, int(args.port)))
sock.send(TEST_MESSAGE.encode('utf-8'))
sock.close()
print('done.')
if __name__ == '__main__':
if args.udp:
if args.server:
server_udp(args)
elif args.client:
client_udp(args)
else:
if args.server:
server_tcp(args)
elif args.client:
client_tcp(args)