-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathTor.py
More file actions
282 lines (228 loc) · 11.5 KB
/
Tor.py
File metadata and controls
282 lines (228 loc) · 11.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
"""
Tor.py
Tor network integration module for fetching Reddit RSS feeds through the Tor network.
Provides functionality for fetching content via curl through Tor SOCKS proxy and
managing Tor circuit renewal for IP rotation.
"""
# =============================================================================
# STANDARD LIBRARY IMPORTS
# =============================================================================
import traceback
import io
import random
import socket
import sqlite3
import subprocess
import threading
import time
import traceback
from timeit import default_timer as timer
from xml.parsers import expat
# =============================================================================
# THIRD-PARTY IMPORTS
# =============================================================================
import feedparser
from fake_useragent import UserAgent
# =============================================================================
# LOCAL IMPORTS
# =============================================================================
from shared import g_cs, g_logger, EXPIRE_YEARS, WORKER_PROXYING, PROXY_SERVER, PROXY_USERNAME, PROXY_PASSWORD
from browser_fetch import fetch_site_posts
from app_config import get_tor_password
# =============================================================================
# CONSTANTS AND CONFIGURATION
# =============================================================================
# =============================================================================
# GLOBAL VARIABLES AND INITIALIZATION
# =============================================================================
# Shared generator of random browser user-agent strings
ua = UserAgent()


def _seed_cache_default(key, make_value):
    """Store ``make_value()`` under *key* unless the shared cache already has it."""
    if g_cs.has(key):
        return
    g_cs.put(key, make_value(), timeout=EXPIRE_YEARS)


# Seed the Reddit user agent and the preferred fetch method on first import;
# values that are already cached are left untouched.
_seed_cache_default("REDDIT_USER_AGENT", lambda: ua.random)
_seed_cache_default("REDDIT_METHOD", lambda: "curl")

# Lock held by fetch_via_tor for the duration of its fetch/retry loop
tor_fetch_lock = threading.Lock()
# =============================================================================
# TOR NETWORK OPERATIONS
# =============================================================================
def _count_feed_entries(feed):
    """Return how many entries a parsed feed holds (0 if it has no mapping API)."""
    if feed is None or not hasattr(feed, "get"):
        return 0
    return len(feed.get("entries", []))


def _build_curl_command(url, user_agent):
    """Assemble the curl argument list that fetches *url* through the Tor SOCKS proxy."""
    cmd = [
        "curl", "-s",
        "--socks5-hostname", "127.0.0.1:9050",
        "-A", user_agent,
        "-H", "Accept: */*",
    ]
    # Add proxy headers if proxying is enabled
    if WORKER_PROXYING and PROXY_SERVER:
        cmd.extend(["-H", f"X-Forwarded-For: {PROXY_SERVER.split(':')[0]}"])
        if PROXY_USERNAME and PROXY_PASSWORD:
            import base64
            # UTF-8 rather than ASCII so non-ASCII credentials do not raise
            # UnicodeEncodeError; the output is identical for ASCII input.
            credentials = f"{PROXY_USERNAME}:{PROXY_PASSWORD}".encode("utf-8")
            auth_b64 = base64.b64encode(credentials).decode("ascii")
            cmd.extend(["-H", f"Proxy-Authorization: Basic {auth_b64}"])
    cmd.append(url)
    return cmd


def _redact_curl_command(cmd):
    """Render *cmd* for logging with any Proxy-Authorization value masked."""
    return " ".join(
        "Proxy-Authorization: Basic <redacted>"
        if arg.startswith("Proxy-Authorization:")
        else arg
        for arg in cmd
    )


def _parse_curl_payload(content_bytes):
    """
    Parse raw response bytes with feedparser, retrying with other input forms.

    Tries the raw bytes first, then the UTF-8 decoded string, and finally (only
    for payloads longer than 1000 bytes) a BytesIO wrapper.

    Returns:
        The first parse result that contains entries. If nothing yields entries,
        returns None for payloads longer than 1000 bytes, otherwise the (empty)
        string-mode parse result.
    """
    feed = feedparser.parse(content_bytes)
    entries_count = _count_feed_entries(feed)
    g_logger.debug(f"Parsed {entries_count} entries from curl result (bytes mode)")
    if entries_count:
        return feed

    # Second attempt: explicit UTF-8 decoding, undecodable bytes replaced
    content_str = content_bytes.decode('utf-8', errors='replace')
    feed = feedparser.parse(content_str)
    entries_count = _count_feed_entries(feed)
    g_logger.debug(f"Parsed {entries_count} entries from curl result (string mode)")
    if entries_count or len(content_bytes) <= 1000:
        return feed

    # Large payload that still will not parse: log its head for debugging
    g_logger.debug(f"Failed to parse content. First 200 chars: {content_str[:200]}")
    g_logger.debug(f"Content appears to be XML/RSS: {'<?xml' in content_str[:10]}")
    # Last attempt: hand feedparser a file-like BytesIO object
    feed = feedparser.parse(io.BytesIO(content_bytes))
    entries_count = _count_feed_entries(feed)
    g_logger.debug(f"BytesIO parsing attempt: {entries_count} entries")
    return feed if entries_count else None


def fetch_via_curl(url, user_agent):
    """
    Fetch Reddit RSS feeds using curl subprocess through Tor SOCKS proxy.

    curl is run with ``--socks5-hostname 127.0.0.1:9050`` and a 30 second
    subprocess timeout. When WORKER_PROXYING and PROXY_SERVER are set, an
    X-Forwarded-For header (and, if credentials are configured, a
    Proxy-Authorization header) is added to the request. The command is logged
    at debug level with the Proxy-Authorization value redacted so credentials
    never reach the log.

    Args:
        url (str): The URL to fetch via Tor network
        user_agent (str): User agent string for the request

    Returns:
        feedparser.FeedParserDict or None: Parsed RSS feed data, or None if curl
        exited non-zero, produced no output, or the content could not be parsed.
        If curl times out or cannot be started, the value cached under
        ``tor_cache:<url>`` is returned when it has entries, otherwise None.
    """
    g_logger.info("=== FETCH_VIA_CURL START ===")
    g_logger.info(f"Function called with URL: {url}")
    g_logger.info(f"Using curl TOR method for: {url}")

    cmd = _build_curl_command(url, user_agent)
    g_logger.debug(f"Executing curl command: {_redact_curl_command(cmd)}")

    try:
        start_time = timer()
        # Use text=False to get bytes directly
        process_result = subprocess.run(cmd, capture_output=True, text=False, timeout=30)
        elapsed = timer() - start_time
    except (subprocess.TimeoutExpired, FileNotFoundError, OSError) as e:
        g_logger.warning(f"Curl TOR method failed: {str(e)}, falling back to cached data")
        # NOTE(review): nothing in this module writes "tor_cache:<url>";
        # presumably another module populates it - confirm.
        cached_feed = g_cs.get(f"tor_cache:{url}")
        if cached_feed and cached_feed.get('entries'):
            g_logger.info(f"Using cached TOR data for {url}: {len(cached_feed.get('entries', []))} entries")
            return cached_feed
        return None

    if process_result.returncode != 0 or not process_result.stdout:
        stderr = process_result.stderr
        if isinstance(stderr, bytes):
            stderr = stderr.decode('utf-8', errors='replace')
        g_logger.error(f"Curl failed with error: {stderr}")
        return None

    content_bytes = process_result.stdout
    g_logger.info(f"Curl succeeded in {elapsed:.2f}s, content length: {len(content_bytes)}")
    try:
        return _parse_curl_payload(content_bytes)
    except (expat.ExpatError, TypeError) as parse_error:
        g_logger.error(f"Error parsing curl content: {str(parse_error)}")
        g_logger.error(f"Exception type: {type(parse_error).__name__}")
        g_logger.error(f"Traceback: {traceback.format_exc()}")
        return None
def renew_tor_ip():
    """
    Generate a new user agent and request a new Tor IP address.

    Stores a fresh random user agent under REDDIT_USER_AGENT, then connects to
    the Tor control port at 127.0.0.1:9051, authenticates with the configured
    password and sends SIGNAL NEWNYM to request a new circuit. Afterwards it
    sleeps 20-30 seconds to give the new circuit time to be established.

    Raises:
        SystemExit: If the control port does not answer "250 OK" to AUTHENTICATE.
        OSError: If the control port cannot be reached or does not respond
            within the socket timeout.
    """
    control_timeout_seconds = 10

    # Generate new user agent
    g_cs.put("REDDIT_USER_AGENT", ua.random, timeout=EXPIRE_YEARS)
    g_logger.info("Requesting a new TOR IP address...")

    host = "127.0.0.1"
    port = 9051
    # Create socket and connect to Tor control port
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        # Bound every socket operation: the caller may hold a lock while this
        # runs, so an unresponsive control port must not block forever.
        s.settimeout(control_timeout_seconds)
        s.connect((host, port))
        # sendall() keeps writing until the whole command is sent; send() may
        # transmit only part of the buffer.
        # NOTE(review): the password is interpolated unescaped; a password
        # containing '"' or '\' would presumably need escaping - confirm.
        s.sendall(f'AUTHENTICATE "{get_tor_password()}"\r\n'.encode())
        response = s.recv(1024).decode(errors="replace")
        if "250 OK" not in response:
            g_logger.error(f"Authentication failed: {response}")
            # Same exception exit(1) raised, without relying on the
            # site-injected `exit` builtin being available.
            raise SystemExit(1)
        s.sendall(b"SIGNAL NEWNYM\r\n")
        response = s.recv(1024).decode(errors="replace")
        g_logger.info(f"New circuit requested: {response}")

    # Wait 20-30 seconds to give time for a circuit to be re-established
    time.sleep(random.uniform(20, 30))
# =============================================================================
# MAIN FETCH FUNCTION
# =============================================================================
def _fetch_with_method(method, url, site_url, user_agent):
    """Fetch with curl when *method* is "curl"; any other value uses fetch_site_posts."""
    if method == "curl":
        return fetch_via_curl(url, user_agent)
    return fetch_site_posts(site_url, user_agent)


def fetch_via_tor(url, site_url):
    """
    Fetch content via Tor network with automatic fallback and retry logic.

    On the first attempt the method stored under the REDDIT_METHOD cache key
    (the last method that succeeded) is tried first; later attempts start with
    "selenium". Within each attempt the other method is tried if the first one
    returns no entries. After a failed attempt the Tor circuit and user agent
    are renewed via renew_tor_ip(). Up to three attempts are made while holding
    tor_fetch_lock.

    Args:
        url (str): The RSS feed URL to fetch
        site_url (str): The site URL for selenium-based fetching

    Returns:
        dict: Parsed feed data with entries, or a dict with empty 'entries',
        'status': 'failed' and a 'bozo_exception' message if the cache cannot
        be read or all methods fail
    """
    g_logger.info("=== FETCH_VIA_TOR START ===")
    g_logger.info(f"Function called with URL: {url}, site_url: {site_url}")

    try:
        # Get current user agent from cache; regenerate it if the entry is gone
        user_agent = g_cs.get("REDDIT_USER_AGENT")
        if not user_agent:
            user_agent = ua.random
            g_cs.put("REDDIT_USER_AGENT", user_agent, timeout=EXPIRE_YEARS)
        # Read the same key that successful fetches write below
        last_success_method = g_cs.get("REDDIT_METHOD")
        g_logger.info(f"Successfully got REDDIT_METHOD: {last_success_method}")
    except (sqlite3.Error, IOError) as e:
        g_logger.error(f"CRITICAL ERROR: Failed to access g_cs.get(): {e}")
        g_logger.error(f"Exception type: {type(e).__name__}")
        g_logger.error(f"Full traceback: {traceback.format_exc()}")
        g_logger.error("This is likely the source of the 'shared' error!")
        return {
            'entries': [],
            'status': 'failed',
            'bozo_exception': f'g_cs access failed: {str(e)}'
        }
    g_logger.info(f"Using user agent: {user_agent[:50]}...")

    with tor_fetch_lock:
        max_attempts = 3  # Define how many attempts we try
        result = None
        for attempt in range(max_attempts):
            # On first try use last_success_method, otherwise start with selenium after renew_tor_ip
            first_method = last_success_method if (attempt == 0 and last_success_method) else "selenium"
            second_method = "selenium" if first_method == "curl" else "curl"

            for method in (first_method, second_method):
                candidate = _fetch_with_method(method, url, site_url, user_agent)
                if candidate is not None and len(candidate.get("entries", [])) > 0:
                    # Remember which method worked so the next call tries it first
                    g_cs.put("REDDIT_METHOD", method, timeout=EXPIRE_YEARS)
                    result = candidate
                    break
            if result is not None:
                break

            g_logger.info(f"Attempt {attempt + 1} failed, renewing TOR and trying again...")
            # This also runs after the final attempt, so the circuit is renewed
            # even when no further retry follows.
            try:
                renew_tor_ip()
            except OSError as e:
                # Control port unreachable: keep the documented contract of
                # returning a failed result instead of raising.
                g_logger.warning(f"Could not renew TOR IP: {e}")
            # Update user_agent to use the newly generated one
            user_agent = g_cs.get("REDDIT_USER_AGENT") or user_agent

        if result is None:
            g_logger.error("All TOR methods failed, returning empty result")
            result = {
                'entries': [],
                'status': 'failed',
                'bozo_exception': 'All TOR methods failed'
            }
    return result