Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
219 changes: 219 additions & 0 deletions airflow/config/webserver_config.py.tpl
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
# Keycloak Direct OIDC Authentication for Airflow
# Airflow authenticates directly with Keycloak (no proxy layer)

import os
import logging
from airflow.www.security import AirflowSecurityManager
from flask_appbuilder.security.manager import AUTH_OAUTH

log = logging.getLogger(__name__)

# Enable OAuth authentication
AUTH_TYPE = AUTH_OAUTH

# Keycloak OIDC Configuration (values are injected by Terraform when this
# template is rendered). A trailing slash is stripped so that the endpoint
# URLs built below never contain a double slash.
OIDC_ISSUER = "${keycloak_provider_url}".rstrip("/")
OIDC_CLIENT_ID = "${keycloak_client_id}"

# Client secret must be provided via the OIDC_CLIENT_SECRET environment variable.
# The placeholder keeps the webserver importable when the variable is missing or
# empty, but logins cannot succeed with it, so the problem is reported loudly
# instead of being discovered at the first failed login.
OIDC_CLIENT_SECRET = os.getenv("OIDC_CLIENT_SECRET") or "CHANGE_ME"
if OIDC_CLIENT_SECRET == "CHANGE_ME":
    log.error(
        "OIDC_CLIENT_SECRET is not set; Keycloak logins will fail until the "
        "environment variable is provided"
    )

# OAuth provider configuration
OAUTH_PROVIDERS = [
    {
        "name": "keycloak",
        "icon": "fa-key",
        "token_key": "access_token",
        "remote_app": {
            "client_id": OIDC_CLIENT_ID,
            "client_secret": OIDC_CLIENT_SECRET,
            "api_base_url": OIDC_ISSUER,
            "client_kwargs": {
                "scope": "openid email profile groups"
            },
            "access_token_url": f"{OIDC_ISSUER}/protocol/openid-connect/token",
            "authorize_url": f"{OIDC_ISSUER}/protocol/openid-connect/auth",
            "request_token_url": None,
            "server_metadata_url": f"{OIDC_ISSUER}/.well-known/openid-configuration",
        },
    }
]

# Auto-register users on first login (only if they have approved Keycloak groups)
# Users without approved groups will be rejected during authentication
AUTH_USER_REGISTRATION = True
# NOTE(review): per the Flask-AppBuilder documentation this role is granted to
# every registered user in addition to the roles resolved through
# AUTH_ROLES_MAPPING - confirm "Viewer" is the intended baseline.
AUTH_USER_REGISTRATION_ROLE = "Viewer"

# Flask-AppBuilder only applies the "role_keys" returned by oauth_user_info()
# when AUTH_ROLES_MAPPING is defined. The security manager already returns
# Airflow role names as role keys, so each key maps to the role of the same name.
AUTH_ROLES_MAPPING = {
    "Admin": ["Admin"],
    "Op": ["Op"],
    "User": ["User"],
    "Viewer": ["Viewer"],
    "Public": ["Public"],
}

# Re-evaluate roles on every login so Keycloak group changes take effect for
# users that already exist in the Airflow database.
AUTH_ROLES_SYNC_AT_LOGIN = True

# Role mapping configuration
class CustomSecurityManager(AirflowSecurityManager):
"""
Custom security manager to map Keycloak groups to Airflow roles.

IMPORTANT: Users must have at least one approved Keycloak group to access Airflow.
Users without approved groups will be denied access during authentication.
"""

def oauth_user_info(self, provider, response):
"""
Get user info from OAuth provider and map groups to roles.

Args:
provider: OAuth provider name
response: OAuth response containing tokens

Returns:
Dictionary with user information
"""
if provider == "keycloak":
import json
import base64

# Log the OAuth response structure (without sensitive token values)
log.info(f"OAuth callback from provider: {provider}")
log.info(f"OAuth response keys: {list(response.keys())}")

# Get access token
access_token = response.get("access_token")
if not access_token:
log.error(f"No access token in OAuth response. Response keys: {list(response.keys())}")
log.error(f"Full response (for debugging): {response}")
return {}

try:
# Decode JWT to get user info and groups
# JWT structure: header.payload.signature
parts = access_token.split('.')
if len(parts) != 3:
log.error(f"Invalid JWT format. Expected 3 parts, got {len(parts)}")
return {}

payload = parts[1]
# Add padding if needed
payload += '=' * (4 - len(payload) % 4)
decoded = json.loads(base64.urlsafe_b64decode(payload))

# Log what we received from Keycloak (useful for debugging)
log.info(f"JWT payload keys: {list(decoded.keys())}")
log.info(f"Available claims: username={decoded.get('preferred_username')}, email={decoded.get('email')}")
log.info(f"Groups in token: {decoded.get('groups', [])}")

# Extract user information (with fallbacks for different claim names)
username = decoded.get("preferred_username") or decoded.get("username") or decoded.get("sub")
email = decoded.get("email", f"{username}@example.com")
first_name = decoded.get("given_name") or decoded.get("first_name") or username
last_name = decoded.get("family_name") or decoded.get("last_name") or ""

# Groups might be in different formats depending on Keycloak mapper config
groups = decoded.get("groups", [])
if isinstance(groups, str):
groups = [groups]

# Some Keycloak configs put groups in realm_access or resource_access
if not groups and "realm_access" in decoded:
groups = decoded["realm_access"].get("roles", [])
if not groups and "resource_access" in decoded:
client_access = decoded["resource_access"].get(OIDC_CLIENT_ID, {})
groups = client_access.get("roles", [])

user_info = {
"username": username,
"email": email,
"first_name": first_name,
"last_name": last_name,
"groups": groups,
}

log.info(f"Keycloak user login: username={user_info['username']}, email={user_info['email']}, groups={user_info['groups']}")

# Map groups to roles
user_info["role_keys"] = self._map_groups_to_roles(user_info["groups"])
log.info(f"Mapped to Airflow roles: {user_info['role_keys']}")

return user_info

except Exception as e:
log.error(f"Error decoding access token: {e}", exc_info=True)
log.error(f"Token (first 50 chars): {access_token[:50]}...")
return {}

return {}

def _map_groups_to_roles(self, keycloak_groups):
"""
Map Keycloak groups to Airflow roles.

Role mapping (configured via Terraform):
%{ for group, roles in keycloak_role_mapping ~}
- ${group} → ${join(", ", roles)}
%{ endfor ~}

Users with multiple groups get the highest priority role.
Priority: Admin > Op > User > Viewer > Public

IMPORTANT: Users without any approved Keycloak groups will be rejected.

Args:
keycloak_groups: List of Keycloak group names from OIDC token

Returns:
List of Airflow role names

Raises:
Exception: If user has no approved Keycloak groups (access denied)
"""
# Keycloak group to Airflow role mapping (from Terraform configuration)
group_role_mapping = {
%{ for group, roles in keycloak_role_mapping ~}
'${group}': '${roles[0]}',
%{ endfor ~}
}

log.debug(f"Group role mapping: {group_role_mapping}")
log.debug(f"User's Keycloak groups: {keycloak_groups}")

# Role priority (higher index = higher priority)
role_priority = ['Public', 'Viewer', 'User', 'Op', 'Admin']

# Find highest priority role from user's groups
highest_role_name = None
highest_priority = -1

for group in keycloak_groups:
# Handle group paths (e.g., "/airflow/admin" or "airflow_admin")
group_name = group.split('/')[-1] # Get last part of path

if group_name in group_role_mapping:
role_name = group_role_mapping[group_name]
if role_name in role_priority:
priority = role_priority.index(role_name)
if priority > highest_priority:
highest_priority = priority
highest_role_name = role_name
log.info(f"Group '{group}' maps to role '{role_name}' (priority {priority})")

# Return the highest priority role
if highest_role_name:
return [highest_role_name]
else:
# Reject users who don't have any approved Keycloak groups
log.error(f"Access denied: User has no approved Keycloak groups. User groups: {keycloak_groups}")
log.error("User must be assigned to one of these Keycloak groups to access Airflow:")
log.error(f" Approved groups: {list(group_role_mapping.keys())}")
raise Exception(
"Access denied: You are not assigned to any approved Keycloak groups. "
"Please contact your administrator to request access."
)

# Register the Keycloak-aware security manager defined in this file
SECURITY_MANAGER_CLASS = CustomSecurityManager

# Session configuration: lifetime in seconds (8 hours)
PERMANENT_SESSION_LIFETIME = 8 * 60 * 60

# Security settings: CSRF protection on, with no separate token time limit
WTF_CSRF_TIME_LIMIT = None
WTF_CSRF_ENABLED = True

# Record the effective (non-secret) OIDC settings at startup
log.info("Airflow webserver configured for direct Keycloak OIDC authentication")
log.info(f"Keycloak provider: {OIDC_ISSUER}")
log.info(f"Keycloak client: {OIDC_CLIENT_ID}")
11 changes: 11 additions & 0 deletions airflow/helm/values.tmpl.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,10 @@ webserverSecretKeySecretName: ${webserver_secret_name}
webserver:
replicas: 3

# Keycloak OIDC Authentication Configuration
webserverConfig: |-
${webserver_config}

startupProbe:
timeoutSeconds: 20
failureThreshold: 60 # Number of tries before giving up (10 minutes with periodSeconds of 10)
Expand Down Expand Up @@ -368,3 +372,10 @@ extraEnv: |
value: "1024"
- name: AIRFLOW__WEBSERVER__EXPOSE_CONFIG
value: "True"
- name: AIRFLOW__WEBSERVER__BASE_URL
value: "${airflow_base_url}"
- name: OIDC_CLIENT_SECRET
valueFrom:
secretKeyRef:
name: airflow-oidc-secret
key: client-secret
2 changes: 2 additions & 0 deletions terraform-unity/modules/terraform-unity-sps-airflow/locals.tf
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,6 @@ locals {
"dev" = "#58cc35"
"sbg-dev" = "#58cc35"
}[var.venue]
# BASE_URL uses placeholder initially, updated by null_resource after LB is created
airflow_base_url = "http://placeholder:${local.load_balancer_port}"
}
61 changes: 55 additions & 6 deletions terraform-unity/modules/terraform-unity-sps-airflow/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,23 @@ resource "kubernetes_secret" "airflow_webserver" {
}
}

# Keycloak OIDC client secret for direct authentication.
# Copies the value read by data.aws_ssm_parameter.keycloak_client_secret into a
# Kubernetes secret named "airflow-oidc-secret" (key "client-secret") in the
# service-area namespace.
# NOTE(review): this secret only exists when var.enable_oidc_auth is true, while
# the Helm values in this change reference airflow-oidc-secret/client-secret for
# the OIDC_CLIENT_SECRET env var unconditionally - confirm pods can still start
# when the flag is false.
resource "kubernetes_secret" "airflow_oidc" {
  # Created only when OIDC authentication is enabled
  count = var.enable_oidc_auth ? 1 : 0
  metadata {
    name = "airflow-oidc-secret"
    namespace = data.kubernetes_namespace.service_area.metadata[0].name
  }
  data = {
    # Index [0] is valid because the data source uses the same count condition
    "client-secret" = data.aws_ssm_parameter.keycloak_client_secret[0].value
  }
}

# Reads the Keycloak client secret from the SSM parameter whose name is given by
# var.keycloak_client_secret_ssm_param. Only looked up when OIDC authentication
# is enabled, matching the count on kubernetes_secret.airflow_oidc.
data "aws_ssm_parameter" "keycloak_client_secret" {
  count = var.enable_oidc_auth ? 1 : 0
  name = var.keycloak_client_secret_ssm_param
}

# TODO evaluate if this role is still necessary
resource "kubernetes_role" "airflow_pod_creator" {
metadata {
Expand Down Expand Up @@ -413,6 +430,13 @@ resource "helm_release" "airflow" {
unity_cluster_name = data.aws_eks_cluster.cluster.name
karpenter_node_pools = join(",", var.karpenter_node_pools)
cwl_dag_ecr_uri = "${data.aws_caller_identity.current.account_id}.dkr.ecr.us-west-2.amazonaws.com"
airflow_base_url = local.airflow_base_url
# Keycloak Direct OIDC authentication configuration
webserver_config = indent(4, templatefile("${path.module}/../../../airflow/config/webserver_config.py.tpl", {
keycloak_role_mapping = var.keycloak_role_mapping
keycloak_provider_url = var.keycloak_provider_url
keycloak_client_id = var.keycloak_client_id
}))
})
]
set_sensitive {
Expand All @@ -431,6 +455,30 @@ resource "helm_release" "airflow" {
]
}

# Update Airflow BASE_URL after LoadBalancer is created.
# Sets AIRFLOW__WEBSERVER__BASE_URL on the airflow-webserver and
# airflow-scheduler deployments to http://<load balancer hostname>:<port>
# using kubectl.
# NOTE(review): this changes Helm-managed deployments out of band. The Helm
# values render the placeholder from local.airflow_base_url, and the trigger
# below only fires when the hostname changes - confirm a later Helm upgrade
# does not silently restore the placeholder.
resource "null_resource" "update_airflow_base_url" {
  # Re-run the provisioner whenever the load balancer hostname changes
  triggers = {
    lb_hostname = data.kubernetes_service.airflow_ingress_internal.status[0].load_balancer[0].ingress[0].hostname
  }

  # local-exec: kubectl must be available and pointed at the target cluster
  # in the environment where Terraform runs
  provisioner "local-exec" {
    command = <<EOT
kubectl set env deployment/airflow-webserver \
-n ${data.kubernetes_namespace.service_area.metadata[0].name} \
AIRFLOW__WEBSERVER__BASE_URL=http://${data.kubernetes_service.airflow_ingress_internal.status[0].load_balancer[0].ingress[0].hostname}:${local.load_balancer_port}
kubectl set env deployment/airflow-scheduler \
-n ${data.kubernetes_namespace.service_area.metadata[0].name} \
AIRFLOW__WEBSERVER__BASE_URL=http://${data.kubernetes_service.airflow_ingress_internal.status[0].load_balancer[0].ingress[0].hostname}:${local.load_balancer_port}
EOT
  }

  # The deployments and the load balancer must exist before the env is patched
  depends_on = [
    helm_release.airflow,
    kubernetes_service.airflow_ingress_internal,
    time_sleep.wait_for_airflow_lb
  ]
}

/* Note: re-enable this to allow access via the JPL network
resource "aws_security_group" "airflow_ingress_sg" {
name = "${var.project}-${var.venue}-airflow-ingress-sg"
Expand Down Expand Up @@ -563,9 +611,10 @@ resource "kubernetes_service" "airflow_ingress_internal" {
}
}
wait_for_load_balancer = true
lifecycle { # this is necessary or terraform will try to recreate this every run
ignore_changes = all
}
# Temporarily disabled to allow updating load balancer scheme to internet-facing
# lifecycle { # this is necessary or terraform will try to recreate this every run
# ignore_changes = all
# }
depends_on = [helm_release.airflow]
}

Expand Down Expand Up @@ -662,8 +711,8 @@ resource "aws_ssm_parameter" "airflow_ui_url" {
name = format("/%s", join("/", compact(["", var.project, var.venue, var.service_area, "processing", "airflow", "ui_url"])))
description = "The URL of the Airflow UI."
type = "String"
# Updated to use LoadBalancer instead of shared services domain
value = "http://${data.kubernetes_service.airflow_ingress_internal.status[0].load_balancer[0].ingress[0].hostname}:${local.load_balancer_port}/"
# Updated to use LoadBalancer instead of shared services domain (no trailing slash)
value = "http://${data.kubernetes_service.airflow_ingress_internal.status[0].load_balancer[0].ingress[0].hostname}:${local.load_balancer_port}"
tags = merge(local.common_tags, {
Name = format(local.resource_name_prefix, "endpoints-airflow_ui")
Component = "SSM"
Expand Down Expand Up @@ -738,7 +787,7 @@ resource "aws_ssm_parameter" "airflow_api_health_check_endpoint" {

resource "aws_ssm_parameter" "unity_proxy_airflow_ui" {
name = format("/%s", join("/", compact(["unity", var.project, var.venue, "cs", "management", "proxy", "configurations", "015-sps-airflow-ui"])))
description = "The unity-proxy configuration for the Airflow UI."
description = "The unity-proxy configuration for the Airflow UI"
type = "String"
value = <<-EOT

Expand Down
36 changes: 36 additions & 0 deletions terraform-unity/modules/terraform-unity-sps-airflow/variables.tf
Original file line number Diff line number Diff line change
Expand Up @@ -81,3 +81,39 @@ variable "karpenter_node_pools" {
description = "Names of the Karpenter node pools"
type = list(string)
}

# Passed to webserver_config.py.tpl as keycloak_provider_url, where it becomes
# OIDC_ISSUER and is used as the prefix of the token, authorize and metadata URLs.
# NOTE(review): the default looks like a test Keycloak host - confirm that
# non-test deployments override it.
variable "keycloak_provider_url" {
  description = "Keycloak OIDC provider URL including realm (e.g., https://keycloak.example.com/realms/MAAP)"
  type = string
  default = "https://dit.kc-test-maap.xyz/realms/MAAP"
}

# Passed to webserver_config.py.tpl as keycloak_client_id, where it becomes
# OIDC_CLIENT_ID: the OAuth client_id and the key used to look up client roles
# under the token's resource_access claim.
variable "keycloak_client_id" {
  description = "Keycloak OIDC client ID for Airflow authentication"
  type = string
  default = "airflow"
}

# Name of the SSM parameter read by data.aws_ssm_parameter.keycloak_client_secret;
# its value is stored in the airflow-oidc-secret Kubernetes secret.
variable "keycloak_client_secret_ssm_param" {
  description = "SSM parameter path containing Keycloak OIDC client secret"
  type = string
  default = "/sps/keycloak/client_secret"
}

# Controls the count of the Keycloak client-secret SSM lookup and of the
# airflow-oidc-secret Kubernetes secret.
# NOTE(review): in the visible hunks the webserver_config template and the
# OIDC_CLIENT_SECRET env var are rendered regardless of this flag - confirm
# that setting it to false actually disables OIDC.
variable "enable_oidc_auth" {
  description = "Enable Keycloak OIDC authentication for Airflow"
  type = bool
  default = true
}

# Mapping of Keycloak group names to Airflow roles, rendered into
# webserver_config.py.tpl. The template uses the first role of each list as the
# group's Airflow role, so every group must list at least one role.
variable "keycloak_role_mapping" {
  description = "Mapping of Keycloak groups to Airflow roles"
  type        = map(list(string))
  default = {
    "airflow_admin"  = ["Admin"]
    "airflow_op"     = ["Op"]
    "airflow_user"   = ["User"]
    "airflow_viewer" = ["Viewer"]
    "airflow_public" = ["Public"]
  }

  # Without this check an empty list only fails later, while the template is
  # rendered, with an index error that does not name the variable.
  validation {
    condition     = alltrue([for roles in values(var.keycloak_role_mapping) : length(roles) > 0])
    error_message = "Each Keycloak group in keycloak_role_mapping must map to at least one Airflow role."
  }
}