Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 45 additions & 15 deletions nsi_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ class Settings(BaseSettings):

allowed_client_subject_dn_path: FilePath = FilePath("/config/allowed_client_dn.txt")
ssl_client_subject_dn_header: str = "ssl-client-subject-dn"
pem_header: str = "X-Forwarded-Tls-Client-Cert"
traefik_cert_info_header: str = "X-Forwarded-Tls-Client-Cert-Info"
use_watchdog: bool = False
log_level: str = "INFO"

Expand Down Expand Up @@ -107,6 +109,27 @@ def _escape_dn_value(value: str) -> str:
return value


def _first_der_cert(data: bytes) -> bytes:
"""Return just the first DER certificate from potentially concatenated DER bytes.

Certificates are ASN.1 SEQUENCE structures (tag 0x30). Reads the length from the
tag+length header to slice off exactly the first cert and discard the rest.
"""
if not data or data[0] != 0x30:
raise ValueError("not an ASN.1 SEQUENCE")
idx = 1
b = data[idx]
if b & 0x80 == 0:
length = b
idx += 1
else:
n = b & 0x7F
idx += 1
length = int.from_bytes(data[idx : idx + n], "big")
idx += n
return data[: idx + length]


def extract_dn_from_pem_header(header_value: str) -> str | None:
"""Extract DN from Traefik's X-Forwarded-Tls-Client-Cert header (URL-encoded PEM).

Expand All @@ -115,13 +138,17 @@ def extract_dn_from_pem_header(header_value: str) -> str | None:
Returns a normalized DN string in DER field order, or None on parse failure.
"""
try:
# Traefik strips newlines from the PEM before URL-encoding (to prevent header injection),
# so load_pem_x509_certificate would fail on the re-assembled string. Instead, extract
# the base64 between the PEM markers and load as DER.
# Use unquote (not unquote_plus) to preserve '+' characters valid in base64.
# Use unquote (not unquote_plus) to preserve literal '+' which is valid in base64.
# Traefik sends raw base64 DER without PEM markers or URL-encoding, but unquote
# handles any %XX sequences if ever present.
pem_str = unquote(header_value)
b64 = re.sub(r"-----[^-]+-----", "", pem_str).replace(" ", "")
cert = x509.load_der_x509_certificate(base64.b64decode(b64))
# Support both raw base64 (Traefik) and PEM-wrapped base64.
match = re.search(r"-----BEGIN CERTIFICATE-----([^-]*)-----END CERTIFICATE-----", pem_str)
b64 = match.group(1) if match else pem_str
# Strip whitespace, then extract only the first cert — Traefik may send the full chain
# as concatenated DER bytes with no markers or separators between certs.
raw = base64.b64decode(b64.replace(" ", "").replace("\n", "").replace("\r", ""))
cert = x509.load_der_x509_certificate(_first_der_cert(raw))
except Exception as e:
app.logger.warning(f"failed to parse PEM from X-Forwarded-Tls-Client-Cert: {e!s}")
return None
Expand Down Expand Up @@ -158,21 +185,24 @@ def get_client_dn() -> tuple[str | None, str]:
Returns:
Tuple of (dn, source) where source indicates which header was used.
"""
pem_header = request.headers.get("X-Forwarded-Tls-Client-Cert")
pem_header = request.headers.get(settings.pem_header)
if pem_header:
dn = extract_dn_from_pem_header(pem_header)
if dn:
return dn, "traefik-pem"
app.logger.debug(f"extracted DN from {settings.pem_header} (PEM): {dn}")
return dn, settings.pem_header

traefik_header = request.headers.get("X-Forwarded-Tls-Client-Cert-Info")
if traefik_header:
dn = extract_dn_from_traefik_header(traefik_header)
traefik_cert_info_header = request.headers.get(settings.traefik_cert_info_header)
if traefik_cert_info_header:
dn = extract_dn_from_traefik_header(traefik_cert_info_header)
if dn:
return dn, "traefik"
app.logger.debug(f"extracted DN from {settings.traefik_cert_info_header}: {dn}")
return dn, settings.traefik_cert_info_header

nginx_header = request.headers.get(settings.ssl_client_subject_dn_header)
if nginx_header:
return nginx_header, "nginx"
app.logger.debug(f"extracted DN from {settings.ssl_client_subject_dn_header}: {nginx_header}")
return nginx_header, settings.ssl_client_subject_dn_header

return None, "none"

Expand All @@ -184,8 +214,8 @@ def validate() -> tuple[str, int]:

if not dn:
app.logger.warning(
f"no client DN found in headers (tried X-Forwarded-Tls-Client-Cert, "
f"X-Forwarded-Tls-Client-Cert-Info, {settings.ssl_client_subject_dn_header})"
f"no client DN found in headers (tried {settings.pem_header}, "
f"{settings.traefik_cert_info_header}, {settings.ssl_client_subject_dn_header})"
)
return "Forbidden", 403

Expand Down
47 changes: 47 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,20 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
from collections.abc import Generator
from pathlib import Path

from cryptography import x509
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.x509.oid import NameOID
from flask import Flask
from flask.testing import FlaskClient
from pytest import MonkeyPatch, fixture

_OID_ORGANIZATION_IDENTIFIER = x509.ObjectIdentifier("2.5.4.97")


@fixture
def allowed_client_dn(tmp_path: Path) -> Path:
Expand Down Expand Up @@ -47,3 +54,43 @@ def application(allowed_client_dn: Path, monkeypatch: MonkeyPatch) -> Generator[
def client(application: Flask) -> FlaskClient:
"""A test client for the application instance."""
return application.test_client()


@fixture(scope="session")
def test_cert() -> x509.Certificate:
"""Self-signed certificate with extended subject fields for testing."""
key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
subject = x509.Name([
x509.NameAttribute(NameOID.COUNTRY_NAME, "US"),
x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, "Michigan"),
x509.NameAttribute(NameOID.ORGANIZATION_NAME, "Test Organization"),
x509.NameAttribute(_OID_ORGANIZATION_IDENTIFIER, "NTRUS+MI-123456"),
x509.NameAttribute(NameOID.EMAIL_ADDRESS, "test@example.com"),
x509.NameAttribute(NameOID.COMMON_NAME, "Test Client"),
])
return (
x509.CertificateBuilder()
.subject_name(subject)
.issuer_name(subject)
.public_key(key.public_key())
.serial_number(x509.random_serial_number())
.not_valid_before(datetime.datetime.now(datetime.UTC))
.not_valid_after(datetime.datetime.now(datetime.UTC) + datetime.timedelta(days=1))
.sign(key, hashes.SHA256())
)


@fixture(scope="session")
def test_cert_dn() -> str:
"""Expected DN string for test_cert (DER field order, RFC 4514 escaped)."""
return r"C=US,ST=Michigan,O=Test Organization,organizationIdentifier=NTRUS\+MI-123456,emailAddress=test@example.com,CN=Test Client"


@fixture(scope="session")
def pem_header_value(test_cert: x509.Certificate) -> str:
"""Traefik X-Forwarded-Tls-Client-Cert header value for test_cert.

Traefik sends raw base64 DER with no PEM markers and no URL-encoding.
"""
import base64
return base64.b64encode(test_cert.public_bytes(serialization.Encoding.DER)).decode("ascii")
94 changes: 94 additions & 0 deletions tests/functional/test_application.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from urllib.parse import quote_plus

import pytest
from flask.testing import FlaskClient

Expand Down Expand Up @@ -71,3 +73,95 @@ def test_validate_rejects_non_get_methods(client: FlaskClient, method: str) -> N
"""Verify that the /validate endpoint only accepts GET requests."""
response = getattr(client, method)("/validate")
assert response.status_code == 405


# ---------------------------------------------------------------------------
# PEM header (X-Forwarded-Tls-Client-Cert)
# ---------------------------------------------------------------------------


def test_validate_pem_header_allowed(client: FlaskClient, pem_header_value: str, test_cert_dn: str) -> None:
"""PEM header with DN in allow-list returns 200."""
from nsi_auth import state

state.allowed_client_subject_dn = [test_cert_dn]
response = client.get("/validate", headers={"X-Forwarded-Tls-Client-Cert": pem_header_value})
assert response.status_code == 200
assert response.data == b"OK"


def test_validate_pem_header_not_in_allowlist(client: FlaskClient, pem_header_value: str) -> None:
"""PEM header with DN not in allow-list returns 403."""
from nsi_auth import state

state.allowed_client_subject_dn = ["CN=SomeoneElse,C=NL"]
response = client.get("/validate", headers={"X-Forwarded-Tls-Client-Cert": pem_header_value})
assert response.status_code == 403


# ---------------------------------------------------------------------------
# Traefik Info header (X-Forwarded-Tls-Client-Cert-Info)
# ---------------------------------------------------------------------------


def test_validate_traefik_info_header_allowed(client: FlaskClient) -> None:
"""URL-encoded Subject= info header with DN in allow-list returns 200."""
from nsi_auth import state

dn = "CN=Test,O=Org,C=US"
state.allowed_client_subject_dn = [dn]
encoded = quote_plus(f'Subject="{dn}"')
response = client.get("/validate", headers={"X-Forwarded-Tls-Client-Cert-Info": encoded})
assert response.status_code == 200
assert response.data == b"OK"


def test_validate_traefik_info_header_not_in_allowlist(client: FlaskClient) -> None:
"""URL-encoded Subject= info header with DN not in allow-list returns 403."""
from nsi_auth import state

state.allowed_client_subject_dn = ["CN=SomeoneElse,C=NL"]
encoded = quote_plus('Subject="CN=Test,O=Org,C=US"')
response = client.get("/validate", headers={"X-Forwarded-Tls-Client-Cert-Info": encoded})
assert response.status_code == 403


# ---------------------------------------------------------------------------
# Priority and fallback behaviour
# ---------------------------------------------------------------------------


def test_validate_pem_takes_priority_over_info(
client: FlaskClient, pem_header_value: str, test_cert_dn: str
) -> None:
"""When both headers are present, PEM DN is used (not Info DN)."""
from nsi_auth import state

# Only the PEM cert's DN is allowed; Info header carries a different DN
state.allowed_client_subject_dn = [test_cert_dn]
info_encoded = quote_plus('Subject="CN=Different,C=NL"')
response = client.get(
"/validate",
headers={
"X-Forwarded-Tls-Client-Cert": pem_header_value,
"X-Forwarded-Tls-Client-Cert-Info": info_encoded,
},
)
assert response.status_code == 200


def test_validate_pem_parse_failure_falls_back_to_info(client: FlaskClient) -> None:
"""Garbage PEM header falls back to Info header for DN extraction."""
from nsi_auth import state

dn = "CN=Test,O=Org,C=US"
state.allowed_client_subject_dn = [dn]
info_encoded = quote_plus(f'Subject="{dn}"')
response = client.get(
"/validate",
headers={
"X-Forwarded-Tls-Client-Cert": "not-a-valid-pem",
"X-Forwarded-Tls-Client-Cert-Info": info_encoded,
},
)
assert response.status_code == 200
Loading