-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest_site.py
90 lines (70 loc) · 2.59 KB
/
test_site.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
import time
import argparse
import random
import threading
from concurrent.futures import ThreadPoolExecutor
def visit_site(url, visit_number):
    """
    Run one browser visit to a URL; intended as a thread-pool task.

    A new Chrome instance is launched for the visit, its cache and cookies
    are cleared, the page is loaded, and the browser is closed again.
    Errors raised while clearing data, loading the page or waiting are
    printed rather than propagated; a failure to launch Chrome itself is
    not caught here.

    Args:
        url (str): The address to load
        visit_number (int): Label used in the progress messages
    """
    print(f"\nStarting Visit #{visit_number}")

    # Configure the Chrome instance used for this visit
    opts = Options()
    for flag in ("--disable-blink-features=AutomationControlled",
                 "--window-size=1920,1080"):
        opts.add_argument(flag)

    # Every visit gets its own browser process
    browser = webdriver.Chrome(options=opts)
    try:
        # Start from an empty cache and cookie jar
        for command in ('Network.clearBrowserCache',
                        'Network.clearBrowserCookies'):
            browser.execute_cdp_cmd(command, {})

        # Pause between 3 and 5 seconds before loading the page
        initial_pause = 3 + random.random() * 2
        time.sleep(initial_pause)

        browser.get(url)

        # Keep the page open for between 10 and 13 seconds
        dwell_time = 10 + random.random() * 3
        time.sleep(dwell_time)
    except Exception as err:
        print(f"Error in visit #{visit_number}: {err!s}")
    finally:
        # Always shut the browser down, even after an error
        browser.quit()

    print(f"Completed Visit #{visit_number}")
def simulate_concurrent_visits(url, visits, max_concurrent):
    """
    Run a batch of site visits on a bounded pool of worker threads

    Each visit is submitted as a separate ``visit_site`` task, numbered
    from 1. The call returns once every task's result has been collected;
    an exception raised by a task is re-raised here when its result is
    read.

    Args:
        url (str): The website URL to visit
        visits (int): Number of times to visit the site
        max_concurrent (int): Maximum number of concurrent visits
    """
    print(f"Starting {visits} concurrent visits to {url}")
    print(f"Maximum concurrent visits: {max_concurrent}")

    # The pool size caps how many visits run at the same time
    with ThreadPoolExecutor(max_workers=max_concurrent) as pool:
        # Queue every visit up front, numbering them 1..visits
        pending = []
        for visit_number in range(1, visits + 1):
            pending.append(pool.submit(visit_site, url, visit_number))

        # Block until each visit has finished, in submission order
        for task in pending:
            task.result()
def main():
    """
    Command-line entry point: parse arguments and run the visits

    Positional arguments are the URL and the number of visits; the
    optional ``--max-concurrent`` flag (default 5) limits how many visits
    run at once.

    Raises:
        SystemExit: Raised by argparse (status 2) when the arguments are
            malformed or ``--max-concurrent`` is smaller than 1.
    """
    parser = argparse.ArgumentParser(
        description='Simulate concurrent first-time visits to a website')
    parser.add_argument('url', help='The URL to visit')
    parser.add_argument('visits',
                        type=int,
                        help='Number of times to visit the site')
    parser.add_argument(
        '--max-concurrent',
        type=int,
        default=5,
        help='Maximum number of concurrent visits (default: 5)')
    args = parser.parse_args()

    # The thread pool refuses a worker count below 1 with a ValueError;
    # report that as a normal usage error before any work is started.
    if args.max_concurrent < 1:
        parser.error('--max-concurrent must be at least 1')

    simulate_concurrent_visits(args.url, args.visits, args.max_concurrent)
    print("\nAll visits completed!")


# Run the CLI only when executed as a script, not when imported
if __name__ == "__main__":
    main()