|
1 |
| -from typing import Callable, Iterable, Self |
| 1 | +from typing import Callable, Iterable, Mapping, Optional, Self |
2 | 2 |
|
3 | 3 | from aiohttp import web
|
4 | 4 | from aiohttp.typedefs import Handler
|
@@ -30,31 +30,29 @@ def __init__(
|
30 | 30 |
|
31 | 31 | @classmethod
|
32 | 32 | def from_config(
|
33 |
| - cls, request_policy_config: list[str], response_policy_config: list[str] |
| 33 | + cls, |
| 34 | + request_policy_config: list[str], |
| 35 | + response_policy_config: list[str], |
| 36 | + csp_config: Optional[Mapping[str, Optional[list[str]]]] = None, |
34 | 37 | ) -> Self:
|
35 | 38 | request_policy_map = {
|
36 | 39 | "reject_metadata_local_link_policy": reject_metadata_local_link_policy,
|
37 | 40 | "reject_access_for_unsafe_file_policy": reject_access_for_unsafe_file_policy,
|
38 | 41 | }
|
39 | 42 | response_policy_map = {
|
40 |
| - "add_self_content_security_policy": add_self_content_security_policy, |
41 | 43 | "set_content_type_nosniff_policy": set_content_type_nosniff_policy,
|
42 | 44 | }
|
43 | 45 | try:
|
44 | 46 | request_policies = [
|
45 | 47 | request_policy_map[policy_name] for policy_name in request_policy_config
|
46 | 48 | ]
|
47 |
| - response_policies = [ |
48 |
| - response_policy_map[policy_name] for policy_name in response_policy_config |
49 |
| - ] |
| 49 | + response_policies: list[ResponsePolicy] = [] |
| 50 | + for policy_name in response_policy_config: |
| 51 | + response_policies.append(response_policy_map[policy_name]) |
50 | 52 | except KeyError as e:
|
51 | 53 | raise ValueError(f"Unknown security policy name: {e}")
|
52 |
| - return cls(request_policies, response_policies) |
53 |
| - |
54 |
| - @classmethod |
55 |
| - def default_policy(cls) -> Self: |
56 |
| - request_policies = [reject_metadata_local_link_policy, reject_access_for_unsafe_file_policy] |
57 |
| - response_policies = [add_self_content_security_policy, set_content_type_nosniff_policy] |
| 54 | + if csp_config is not None: |
| 55 | + response_policies.append(csp_policy_builder(csp_config)) |
58 | 56 | return cls(request_policies, response_policies)
|
59 | 57 |
|
60 | 58 | def check_request_policies(self, request: web.Request) -> None:
|
@@ -102,6 +100,19 @@ def add_self_content_security_policy(response: web.StreamResponse) -> web.Stream
|
102 | 100 | return response
|
103 | 101 |
|
104 | 102 |
|
| 103 | +def csp_policy_builder(csp_config: Mapping[str, Optional[list[str]]]) -> ResponsePolicy: |
| 104 | + csp = [key + " " + " ".join(value) for key, value in csp_config.items() if value] |
| 105 | + csp_str = "; ".join(csp) |
| 106 | + if csp_str: |
| 107 | + csp_str = csp_str + ";" |
| 108 | + |
| 109 | + def policy(response: web.StreamResponse) -> web.StreamResponse: |
| 110 | + response.headers["Content-Security-Policy"] = csp_str |
| 111 | + return response |
| 112 | + |
| 113 | + return policy |
| 114 | + |
| 115 | + |
105 | 116 | def set_content_type_nosniff_policy(response: web.StreamResponse) -> web.StreamResponse:
|
106 | 117 | response.headers["X-Content-Type-Options"] = "nosniff"
|
107 | 118 | return response
|
0 commit comments