From 1104d3d8f7f167c4ff5bbfe155927afa928c2404 Mon Sep 17 00:00:00 2001 From: msaf1980 Date: Thu, 27 Oct 2022 21:02:11 +0500 Subject: [PATCH] prevent xss on dashbord queries --- webapp/graphite/dashboard/views.py | 20 ++++++++++++++++++++ webapp/graphite/errors.py | 23 +++++++++++++++++------ webapp/graphite/util.py | 15 +++++++++++++++ webapp/tests/base.py | 12 ++++++++++-- webapp/tests/test_dashboard.py | 23 ++++++++++++++++++++++- 5 files changed, 84 insertions(+), 9 deletions(-) diff --git a/webapp/graphite/dashboard/views.py b/webapp/graphite/dashboard/views.py index 5e5d69183..80750c7d5 100644 --- a/webapp/graphite/dashboard/views.py +++ b/webapp/graphite/dashboard/views.py @@ -16,6 +16,7 @@ from graphite.render.views import renderView from graphite.util import json from graphite.user_util import isAuthenticated +from graphite.errors import handleInputParameterError, str_param fieldRegex = re.compile(r'<([^>]+)>') defaultScheme = { @@ -108,7 +109,9 @@ def load(self): config = DashboardConfig() +@handleInputParameterError def dashboard(request, name=None): + name = str_param('name', name) dashboard_conf_missing = False try: @@ -155,7 +158,9 @@ def dashboard(request, name=None): return render(request, "dashboard.html", context) +@handleInputParameterError def template(request, name, val): + name = str_param('name', name) template_conf_missing = False try: @@ -221,7 +226,10 @@ def getPermissions(user): return permissions +@handleInputParameterError def save(request, name): + name = str_param('name', name) + if 'change' not in getPermissions(request.user): return json_response( dict(error="Must be logged in with appropriate permissions to save") ) # Deserialize and reserialize as a validation step @@ -238,7 +246,11 @@ def save(request, name): return json_response( dict(success=True) ) +@handleInputParameterError def save_template(request, name, key): + name = str_param('name', name) + key = str_param('key', key) + if 'change' not in getPermissions(request.user): return json_response( dict(error="Must be logged in with appropriate permissions to save the template") ) # Deserialize and reserialize as a validation step @@ -257,7 +269,9 @@ def save_template(request, name, key): return json_response( dict(success=True) ) +@handleInputParameterError def load(request, name): + name = str_param('name', name) try: dashboard = Dashboard.objects.get(name=name) except Dashboard.DoesNotExist: @@ -266,7 +280,9 @@ def load(request, name): return json_response( dict(state=json.loads(dashboard.state)) ) +@handleInputParameterError def load_template(request, name, val): + name = str_param('name', name) try: template = Template.objects.get(name=name) except Template.DoesNotExist: @@ -277,7 +293,9 @@ def load_template(request, name, val): return json_response( dict(state=state) ) +@handleInputParameterError def delete(request, name): + name = str_param('name', name) if 'delete' not in getPermissions(request.user): return json_response( dict(error="Must be logged in with appropriate permissions to delete") ) @@ -290,7 +308,9 @@ def delete(request, name): return json_response( dict(success=True) ) +@handleInputParameterError def delete_template(request, name): + name = str_param('name', name) if 'delete' not in getPermissions(request.user): return json_response( dict(error="Must be logged in with appropriate permissions to delete the template") ) diff --git a/webapp/graphite/errors.py b/webapp/graphite/errors.py index 144946309..a0144db0c 100644 --- a/webapp/graphite/errors.py +++ b/webapp/graphite/errors.py @@ -1,5 +1,6 @@ from django.http import HttpResponseBadRequest from graphite.logger import log +from graphite.util import htmlEscape, is_unsafe_str class NormalizeEmptyResultError(Exception): @@ -94,12 +95,22 @@ def __str__(self): return msg -# Replace special characters "&", "<" and ">" to HTML-safe sequences. -def escape(s): - s = s.replace("&", "&") # Must be done first! - s = s.replace("<", "<") - s = s.replace(">", ">") +def safe_param(name, s): + if is_unsafe_str(s): + raise InputParameterError("{} contain unsafe symbols".format(name)) + return s + + +def is_unclean_str(s): + for symbol in '&<>~!@#$%^*()`': + if s.find(symbol) >= 0: + return True + return False + +def str_param(name, s): + if s is not None and is_unclean_str(s): + raise InputParameterError("{} contain restricted symbols".format(name)) return s @@ -111,6 +122,6 @@ def new_f(*args, **kwargs): except InputParameterError as e: msgStr = str(e) log.warning('%s', msgStr) - return HttpResponseBadRequest(escape(msgStr)) + return HttpResponseBadRequest(htmlEscape(msgStr)) return new_f diff --git a/webapp/graphite/util.py b/webapp/graphite/util.py index 1b6203317..1b5889026 100644 --- a/webapp/graphite/util.py +++ b/webapp/graphite/util.py @@ -128,6 +128,21 @@ def find_escaped_pattern_fields(pattern_string): yield index +# Replace special characters "&", "<" and ">" to HTML-safe sequences. +def htmlEscape(s): + s = s.replace("&", "&") # Must be done first! + s = s.replace("<", "<") + s = s.replace(">", ">") + return s + + +def is_unsafe_str(s): + for symbol in '<>': + if s.find(symbol) >= 0: + return True + return False + + def load_module(module_path, member=None): module_name = splitext(basename(module_path))[0] try: # 'U' is default from Python 3.0 and deprecated since 3.9 diff --git a/webapp/tests/base.py b/webapp/tests/base.py index 512e1e6d3..844f28ead 100644 --- a/webapp/tests/base.py +++ b/webapp/tests/base.py @@ -2,6 +2,13 @@ from graphite.worker_pool.pool import stop_pools +def is_unsafe_str(s): + for symbol in '<>': + if s.find(symbol) > 0: + return True + return False + + class TestCase(OriginalTestCase): def tearDown(self): stop_pools() @@ -15,5 +22,6 @@ def assertXSS(self, response, status_code=200, msg_prefix=''): " (expected %d)" % (response.status_code, status_code) ) - xss = response.content.find(b"<") != -1 or response.content.find(b">") != -1 - self.assertFalse(xss, msg=msg_prefix+str(response.content)) + content = str(response.content) + xss = is_unsafe_str(content) + self.assertFalse(xss, msg=msg_prefix+content) diff --git a/webapp/tests/test_dashboard.py b/webapp/tests/test_dashboard.py index 160d0f949..4b2782f46 100644 --- a/webapp/tests/test_dashboard.py +++ b/webapp/tests/test_dashboard.py @@ -20,6 +20,8 @@ except ImportError: from django.contrib.auth.models import User +from graphite.util import htmlEscape + class DashboardTest(TestCase): # Set config to the test config file @@ -244,23 +246,42 @@ def test_dashboard_find_template_empty(self): self.assertEqual(response.status_code, 200) self.assertEqual(response.content, b'{"templates": []}') + def test_dashboard_save_temporary_xss_name(self): + xssStr = htmlEscape('') + url = '/graphite/dashboard/save_template/' + xssStr + '/testkey' + + request = copy.deepcopy(self.testtemplate) + response = self.client.post(url, request) + self.assertContains(response, 'name contain restricted symbols', status_code=400) + + def test_dashboard_save_temporary_xss_key(self): + xssStr = htmlEscape('') + url = '/graphite/dashboard/save_template/testtemplate/' + xssStr + + request = copy.deepcopy(self.testtemplate) + response = self.client.post(url, request) + self.assertContains(response, 'key contain restricted symbols', status_code=400) + def test_dashboard_save_template(self): url = reverse('dashboard_save_template', args=['testtemplate', 'testkey']) request = copy.deepcopy(self.testtemplate) response = self.client.post(url, request) self.assertEqual(response.status_code, 200) + self.assertEqual(response.content, b'{"success": true}') - # Save again after it now exists + # Save again after it now exists def test_dashboard_save_template_overwrite(self): url = reverse('dashboard_save_template', args=['testtemplate', 'testkey']) request = copy.deepcopy(self.testtemplate) response = self.client.post(url, request) self.assertEqual(response.status_code, 200) + self.assertEqual(response.content, b'{"success": true}') url = reverse('dashboard_save_template', args=['testtemplate', 'testkey']) request = copy.deepcopy(self.testtemplate) response = self.client.post(url, request) self.assertEqual(response.status_code, 200) + self.assertEqual(response.content, b'{"success": true}') def test_dashboard_find_template(self): url = reverse('dashboard_save_template', args=['testtemplate', 'testkey'])