Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
143 changes: 80 additions & 63 deletions backend/python/app/dependencies/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@
import firebase_admin.auth
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from sqlalchemy import exists, select as sql_select
from sqlalchemy.ext.asyncio import AsyncSession

from app.config import settings
from app.models import get_session
from app.models.driver_assignment import DriverAssignment
from app.services.implementations.auth_service import AuthService
from app.services.implementations.driver_service import DriverService
from app.services.implementations.email_service import EmailService
Expand Down Expand Up @@ -70,84 +72,97 @@ async def check_role(
return check_role


def require_authorization_by_user_id(user_id: str) -> Callable:
async def require_self_driver_or_admin(
driver_id: UUID,
access_token: str = Depends(get_access_token),
session: AsyncSession = Depends(get_session),
) -> bool:
"""
Create a dependency that checks if the user is authorized for a specific user_id

:param user_id: The user ID to check authorization for
:return: FastAPI dependency function
Allow access if user is an admin, or if the authenticated user is the driver
with the given driver_id. Used for GET /drivers/{driver_id}.
"""
try:
decoded_token = firebase_admin.auth.verify_id_token(
access_token, check_revoked=True
)
except Exception as e:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid or expired token",
) from e

async def check_user_id(
access_token: str = Depends(get_access_token),
session: AsyncSession = Depends(get_session),
) -> bool:
try:
authorized = await auth_service.is_authorized_by_driver_id(
session, access_token, UUID(user_id)
)
if not authorized:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="You are not authorized to make this request.",
)
return True
except Exception as e:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="You are not authorized to make this request.",
) from e
if decoded_token.get("role") == "admin":
return True

return check_user_id
authorized = await auth_service.is_authorized_by_driver_id(
session, access_token, driver_id
)
if not authorized:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="You are not authorized to access this resource.",
)
return True


def require_authorization_by_email(email: str) -> Callable:
async def require_route_assigned_or_admin(
route_id: UUID,
access_token: str = Depends(get_access_token),
session: AsyncSession = Depends(get_session),
) -> bool:
"""
Create a dependency that checks if the user is authorized for a specific email

:param email: The email to check authorization for
:return: FastAPI dependency function
Allow access if user is an admin, or if the authenticated driver is assigned
to the given route. Used for GET /routes/{route_id}.
"""
try:
decoded_token = firebase_admin.auth.verify_id_token(
access_token, check_revoked=True
)
except Exception as e:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid or expired token",
) from e

if decoded_token.get("role") == "admin":
return True

# Get the driver_id for the authenticated user
token_driver_id = await driver_service.get_driver_id_by_auth_id(
session, decoded_token["uid"]
)
if token_driver_id is None:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="You are not authorized to access this resource.",
)

def check_email(access_token: str = Depends(get_access_token)) -> bool:
try:
authorized = auth_service.is_authorized_by_email(access_token, email)
if not authorized:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="You are not authorized to make this request.",
# Check if this driver has any assignment for this route
result = await session.execute(
sql_select(
exists(
sql_select(1)
.select_from(DriverAssignment)
.where(
DriverAssignment.driver_id == token_driver_id,
DriverAssignment.route_id == route_id,
)
return True
except Exception as e:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="You are not authorized to make this request.",
) from e
)
)
)
is_assigned = result.scalar()

return check_email
if not is_assigned:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="You are not authorized to access this resource.",
)
return True


# Common authorization dependencies
require_admin = require_authorization_by_role({"admin"})


def get_current_user_id(access_token: str = Depends(get_access_token)) -> str:
"""
Get the current user ID from the access token

:param access_token: JWT access token
:return: User ID
"""
try:
decoded_token: dict[str, str] = firebase_admin.auth.verify_id_token(
access_token, check_revoked=True
)
return str(decoded_token["uid"])
except Exception as e:
logger.error(f"Failed to decode access token: {e}")
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid or expired token"
) from e
require_driver_or_admin = require_authorization_by_role({"driver", "admin"})


def get_current_user_email(access_token: str = Depends(get_access_token)) -> str:
Expand Down Expand Up @@ -195,6 +210,8 @@ async def get_current_database_user_id(
status_code=status.HTTP_401_UNAUTHORIZED, detail="User not found"
)
return database_user_id
except HTTPException:
raise
except Exception as e:
logger.error(f"Failed to get database user ID from access token: {e}")
raise HTTPException(
Expand Down
4 changes: 0 additions & 4 deletions backend/python/app/routers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
admin_routes,
announcement_routes,
auth_routes,
database_routes,
driver_assignment_routes,
driver_history_routes,
driver_routes,
Expand All @@ -15,7 +14,6 @@
note_chain_routes,
route_group_routes,
route_routes,
simple_entity_routes,
upload_routes,
)

Expand All @@ -24,13 +22,11 @@ def init_app(app: FastAPI) -> None:
"""Initialize all routers with the FastAPI app"""
app.include_router(admin_routes.router)
app.include_router(announcement_routes.router)
app.include_router(database_routes.router)
app.include_router(driver_assignment_routes.router)
app.include_router(auth_routes.router)
app.include_router(driver_history_routes.router)
app.include_router(driver_routes.router)
app.include_router(entity_routes.router)
app.include_router(simple_entity_routes.router)
app.include_router(location_group_routes.router)
app.include_router(route_group_routes.router)
app.include_router(route_routes.router)
Expand Down
173 changes: 0 additions & 173 deletions backend/python/app/routers/database_routes.py

This file was deleted.

Loading
Loading