Skip to content

Commit f918ea8

Browse files
authored
Feature/data redaction (#6)
1 parent c412f9f commit f918ea8

18 files changed

+301
-18
lines changed

Pipfile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ name = "pypi"
77
six = "*"
88
websocket-client = "*"
99
pystache = "*"
10+
cachetools= "*"
1011

1112
[dev-packages]
1213

setup.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,8 @@ def ReadConfig(section, value, default):
7373

7474
install_requires = ['six >= 1.11',
7575
'websocket-client >= 0.56.0',
76-
'pystache >= 0.6.0'
76+
'pystache >= 0.6.0',
77+
'cachetools >= 5.2.0'
7778
]
7879

7980
if sys.version_info[0] == 2:

tracepointdebug/__init__.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,21 +4,28 @@
44
from .broker.broker_manager import BrokerManager
55
from .probe.trace_point_manager import TracePointManager
66
from .probe.log_point_manager import LogPointManager
7+
from .probe.error_stack_manager import ErrorStackManager
8+
79
'''
810
After importing ConfigProvider for the first time, the __init__.py has been run by interpreter and
911
whole configuration is reflected to configs.
1012
'''
11-
from .config.config_provider import ConfigProvider
1213

1314

15+
tracepoint_data_redaction_callback = None
16+
log_data_redaction_callback = None
17+
1418
import logging
1519
logger = logging.getLogger(__name__)
1620

17-
def start():
21+
def start(tracepoint_data_redaction_callback=None, log_data_redaction_callback=None):
1822
cdbg_native.InitializeModule(None)
1923
_broker_manager = BrokerManager().instance()
24+
_broker_manager.initialize(tracepoint_data_redaction_callback, log_data_redaction_callback)
2025
tpm = TracePointManager(broker_manager=_broker_manager)
2126
lpm = LogPointManager(broker_manager=_broker_manager)
22-
_broker_manager.initialize()
27+
esm = ErrorStackManager(broker_manager=_broker_manager)
28+
esm.start()
2329
atexit.register(tpm.remove_all_trace_points)
2430
atexit.register(lpm.remove_all_log_points)
31+
atexit.register(esm.shutdown)

tracepointdebug/broker/broker_client.py

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
from time import sleep
66

77
import websocket
8+
from tracepointdebug.config import config_names
9+
from tracepointdebug.config.config_provider import ConfigProvider
810

911
from tracepointdebug.utils import debug_logger
1012
from tracepointdebug.broker.ws_app import WSApp
@@ -38,6 +40,7 @@ def __init__(self, host, port, broker_credentials, message_callback, initial_req
3840
self.connection_timer = None
3941
self.connection_timeout = 10
4042
self.reconnect_interval = 3
43+
self.error_printed = False
4144
self.connected = threading.Event()
4245
self.initial_request_to_broker = initial_request_to_broker
4346

@@ -133,25 +136,37 @@ def on_error(self, ws, msg):
133136
self._running = False
134137
if self.ws:
135138
self.ws.close()
136-
logger.error("Error on connection, msg: {}".format(msg))
139+
if not self.error_printed:
140+
logger.error("Error on connection, msg: {}".format(msg))
141+
self.error_printed = True
142+
else:
143+
debug_logger("Error on connection, msg: {}".format(msg))
137144

138145
def on_close(self, ws):
146+
self.error_printed = False
139147
debug_logger("Connection closed")
140148

141149
def on_open(self, ws):
142150
debug_logger("Connection open")
151+
self.error_printed = False
143152
self.connected.set()
144153
connection_set = self.connected.wait() #TODO Timeout
145154
if connection_set:
146155
self.initial_request_to_broker()
147156

148157
def send(self, data):
149158
try:
150-
self.ws.send(data)
159+
if self.ws.sock.connected:
160+
self.ws.send(data)
161+
else:
162+
if ConfigProvider.get(config_names.SIDEKICK_PRINT_CLOSED_SOCKET_DATA, False):
163+
print("Socket is already closed while sending data: %s" % data)
164+
debug_logger("Socket is already closed while sending data to see data set SIDEKICK_PRINT_DEBUG_DATA to True!")
151165
except websocket.WebSocketConnectionClosedException as e:
152-
logger.error("Error sending %s" % e)
166+
debug_logger("Error sending %s" % e)
153167

154168
def close(self):
169+
self.error_printed = False
155170
self._running = False
156171
if self.ws:
157172
self.ws.close()

tracepointdebug/broker/broker_manager.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ def __init__(self):
4343
self.initialized = False
4444
self._event_executor = ThreadPoolExecutor()
4545
self._request_executor = ThreadPoolExecutor()
46+
self._tracepoint_data_redaction_callback = None
47+
self._log_data_redaction_callback = None
4648
import sys
4749
if sys.version_info[0] >= 3:
4850
self.application_status_thread = Thread(target=self.application_status_sender, daemon=True)
@@ -57,10 +59,14 @@ def instance():
5759
return BrokerManager() if BrokerManager.__instance is None else BrokerManager.__instance
5860

5961

60-
def initialize(self):
62+
def initialize(self, tracepoint_data_redaction_callback=None, log_data_redaction_callback=None):
6163
if not self.initialized:
6264
self.connect_to_broker()
6365
self.initialized = True
66+
if callable(tracepoint_data_redaction_callback):
67+
self._tracepoint_data_redaction_callback = tracepoint_data_redaction_callback
68+
if callable(log_data_redaction_callback):
69+
self._log_data_redaction_callback = log_data_redaction_callback
6470

6571

6672
def connect_to_broker(self):
@@ -98,7 +104,6 @@ def prepare_event(event):
98104
event.application_instance_id = application_info['applicationInstanceId']
99105
event.application_name = application_info['applicationName']
100106

101-
102107
def do_publish_event(self, event):
103108
self.prepare_event(event)
104109
try:
@@ -168,6 +173,5 @@ def publish_application_status(self, client=None):
168173

169174
for status_provider in self.application_status_providers:
170175
status_provider.provide(application_status, client)
171-
172176
event = ApplicationStatusEvent(client=client, application=application_status)
173177
self.publish_event(event)

tracepointdebug/broker/broker_message_callback.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
from tracepointdebug.probe.handler.request.logPoint.remove_log_point_request_handler import RemoveLogPointRequestHandler
1111
from tracepointdebug.probe.handler.request.logPoint.update_log_point_request_handler import UpdateLogPointRequestHandler
1212
from tracepointdebug.probe.handler.response.filter_logpoints_response_handler import FilterLogPointsResponseHandler
13+
from tracepointdebug.utils import debug_logger
1314

1415
MESSAGE_REQUEST_TYPE = "Request"
1516
MESSAGE_RESPONSE_TYPE = "Response"
@@ -50,16 +51,15 @@ def on_message(self, broker_client, message):
5051
serialized = to_json(response)
5152
broker_client.send(serialized)
5253
else:
53-
print("No request handler could be found for message with name {}: {}".format(message.get("name"),
54+
debug_logger("No request handler could be found for message with name {}: {}".format(message.get("name"),
5455
message))
5556
elif message_type == MESSAGE_RESPONSE_TYPE:
5657
handler = RESPONSE_HANDLER_MAP.get(message.get("name"))
5758
if handler is not None:
5859
response = handler.get_response_cls()(**message)
5960
handler.handle_response(response)
6061
else:
61-
print("No response handler could be found for message with name {}: {}".format(message.get("name"),
62+
debug_logger("No response handler could be found for message with name {}: {}".format(message.get("name"),
6263
message))
63-
6464
except Exception as e:
65-
print(e)
65+
debug_logger(e)

tracepointdebug/config/config_metadata.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,18 @@
88
'type': 'boolean',
99
'defaultValue': False,
1010
},
11+
config_names.SIDEKICK_ERROR_STACK_ENABLE: {
12+
'type': 'boolean',
13+
'defaultValue': False,
14+
},
15+
config_names.SIDEKICK_ERROR_FRAME_COLLECTION_ENABLE: {
16+
'type': 'boolean',
17+
'defaultValue': False,
18+
},
19+
config_names.SIDEKICK_PRINT_CLOSED_SOCKET_DATA: {
20+
'type': 'boolean',
21+
'defaultValue': False,
22+
},
1123
config_names.SIDEKICK_APPLICATION_ID: {
1224
'type': 'string',
1325
},

tracepointdebug/config/config_names.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,4 +12,7 @@
1212
SIDEKICK_APPLICATION_CLASS_NAME = 'sidekick.application.classname'
1313
SIDEKICK_APPLICATION_VERSION = 'sidekick.application.version'
1414
SIDEKICK_APPLICATION_TAG_PREFIX = 'sidekick.application.tag.prefix'
15-
SIDEKICK_APPLICATION_REGION = 'sidekick.application.region'
15+
SIDEKICK_APPLICATION_REGION = 'sidekick.application.region'
16+
SIDEKICK_ERROR_STACK_ENABLE = 'sidekick.error.stack.enable'
17+
SIDEKICK_ERROR_FRAME_COLLECTION_ENABLE = 'sidekick.error.frame.collection.enable'
18+
SIDEKICK_PRINT_CLOSED_SOCKET_DATA = 'sidekick.print.closed.socket.data'

tracepointdebug/probe/encoder.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import json
2-
2+
from tracepointdebug.utils import debug_logger
33

44
class JSONEncoder(json.JSONEncoder):
55
def default(self, z):
@@ -11,7 +11,7 @@ def default(self, z):
1111
else:
1212
return super(JSONEncoder, self).default(z)
1313
except Exception as e:
14-
print(e)
14+
debug_logger(e)
1515

1616

1717
def to_json(data, separators=None):
Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
import time, traceback
2+
from tracepointdebug.probe.coded_exception import CodedException
3+
from tracepointdebug.probe.coded_exception import CodedException
4+
from tracepointdebug.probe.event.errorstack.error_stack_rate_limit_event import ErrorStackRateLimitEvent
5+
from tracepointdebug.probe.event.errorstack.error_stack_snapshot_event import ErrorStackSnapshotEvent
6+
from tracepointdebug.probe.event.errorstack.error_stack_snapshot_failed_event import ErrorStackSnapshotFailedEvent
7+
from tracepointdebug.probe.ratelimit.rate_limit_result import RateLimitResult
8+
from tracepointdebug.probe.ratelimit.rate_limiter import RateLimiter
9+
from tracepointdebug.probe.snapshot_collector import SnapshotCollector
10+
import logging, sys, threading
11+
from tracepointdebug.config import config_names
12+
from tracepointdebug.config.config_provider import ConfigProvider
13+
from datetime import datetime as dt
14+
from cachetools import TTLCache
15+
import datetime, os
16+
17+
logger = logging.getLogger(__name__)
18+
19+
_MAX_SNAPSHOT_SIZE = 32768
20+
_MAX_FRAMES = 10
21+
_MAX_EXPAND_FRAMES = 2
22+
_MAX_TIME_TO_ALIVE_MIN = 5
23+
24+
class ErrorStackManager(object):
25+
__instance = None
26+
27+
def __init__(self, broker_manager):
28+
self.broker_manager = broker_manager
29+
self.old_settrace = sys.gettrace()
30+
self.old_threading = threading._trace_hook
31+
self.condition = None
32+
self.timer = None
33+
self.sidekick_exception = "sidekickException"
34+
self.rate_limiter = RateLimiter()
35+
self.ttl_cache = TTLCache(maxsize=2048, ttl=datetime.timedelta(minutes=_MAX_TIME_TO_ALIVE_MIN), timer=datetime.datetime.now)
36+
ErrorStackManager.__instance = self
37+
38+
@staticmethod
39+
def instance(*args, **kwargs):
40+
return ErrorStackManager(*args,
41+
**kwargs) if ErrorStackManager.__instance is None else ErrorStackManager.__instance
42+
43+
@staticmethod
44+
def get_id(file, line):
45+
return '{}:{}:{}'.format(file, line, str(dt.now()))
46+
47+
def _get_point_cache_id(self, frame):
48+
return frame.f_code.co_filename + ":::" + str(frame.f_lineno)
49+
50+
def _check_point_inserted(self, frame):
51+
error_point_id = self._get_point_cache_id(frame)
52+
item = self.ttl_cache.get(error_point_id, None)
53+
if item is None:
54+
self.ttl_cache[error_point_id] = True
55+
return False
56+
return True
57+
58+
def _white_list_exceptions(self, frame):
59+
frame_file_path = os.path.abspath(frame.f_code.co_filename)
60+
blacklist = ["python", "site-packages", "importlib", "tracepointdebug"]
61+
for black in blacklist:
62+
if black in frame_file_path:
63+
return False
64+
return True
65+
66+
def trace_hook(self, frame, event, arg):
67+
if not self._white_list_exceptions(frame):
68+
return
69+
frame.f_trace = self._frame_hook
70+
71+
def _frame_hook(self, frame, event, arg):
72+
try:
73+
if event != "exception":
74+
return
75+
frame_file_name = frame.f_code.co_filename
76+
frame_line_no = frame.f_lineno
77+
rate_limit_result_for_frame_call = self.rate_limiter.check_rate_limit(time.time())
78+
check_point_already_inserted = self._check_point_inserted(frame)
79+
80+
if (check_point_already_inserted):
81+
return
82+
83+
if (rate_limit_result_for_frame_call == RateLimitResult.HIT):
84+
event = ErrorStackRateLimitEvent(frame_file_name, frame_line_no)
85+
self._publish_event(event)
86+
87+
if (rate_limit_result_for_frame_call == RateLimitResult.EXCEEDED):
88+
return
89+
90+
frames = []
91+
if ConfigProvider.get(config_names.SIDEKICK_ERROR_FRAME_COLLECTION_ENABLE, False):
92+
snapshot_collector = SnapshotCollector(_MAX_SNAPSHOT_SIZE, _MAX_FRAMES, _MAX_EXPAND_FRAMES)
93+
snapshot = snapshot_collector.collect(frame)
94+
frames = snapshot.frames
95+
error_stack_id = self.get_id(frame_file_name, frame_line_no)
96+
error = {
97+
"name": str(arg[0]) or "Error",
98+
"message": str(arg[1]),
99+
"stack": str(traceback.extract_tb(arg[2]))
100+
}
101+
event = ErrorStackSnapshotEvent(error_stack_id, frame_file_name, frame_line_no, method_name=frame.f_code.co_name,
102+
error=error, frames=frames)
103+
self._publish_event(event)
104+
except Exception as exc:
105+
logger.warning('Error on error stack snapshot %s' % exc)
106+
code = 0
107+
if isinstance(exc, CodedException):
108+
code = exc.code
109+
event = ErrorStackSnapshotFailedEvent(frame.f_code.co_filename, frame.f_lineno, code, str(exc))
110+
self._publish_event(event)
111+
112+
def start(self):
113+
if ConfigProvider.get(config_names.SIDEKICK_ERROR_STACK_ENABLE):
114+
sys.settrace(self.trace_hook)
115+
threading.settrace(self.trace_hook)
116+
117+
def shutdown(self):
118+
if ConfigProvider.get(config_names.SIDEKICK_ERROR_STACK_ENABLE):
119+
sys.settrace(self.old_settrace)
120+
threading.settrace(self.old_threading)
121+
122+
def _publish_event(self, event):
123+
self.broker_manager.publish_event(event)

0 commit comments

Comments
 (0)