Commit 5825c90: Add celery tasks
1 parent b42f55a

10 files changed, +222 -35 lines

Diff for: Makefile

+6
@@ -79,6 +79,12 @@ run-prod:
 stop-prod:
 	docker compose down
 
+create-celery-db:
+	if ! docker compose -f docker-compose-dev.yml exec database psql -U ${DATABASE_USER} -h localhost -lqt | cut -d \| -f 1 | grep -qw ${DATABASE_CELERY_NAME}; then \
+		docker compose -f docker-compose-dev.yml exec database createdb -U ${DATABASE_USER} -W ${DATABASE_PASSWORD} -h localhost -O ${DATABASE_USER} ${DATABASE_CELERY_NAME}; \
+	fi
+
+
 init-db:
 	docker compose -f docker-compose-dev.yml exec fastapi_server python app/initial_data.py && \
 	echo "Initial data created."

Diff for: backend/app/app/api/celery_task.py

+22
@@ -1,9 +1,31 @@
+import asyncio
 import time
+from uuid import UUID
+from app import crud
 from app.core.celery import celery
+from app.models.hero_model import Hero
+from app.schemas.hero_schema import IHeroCreate, IHeroRead, IHeroReadWithTeam
+from app.db.session import SessionLocal
+from asyncer import runnify
 
 
 @celery.task(name="increment_a_value")
 def increment(value: int) -> int:
     time.sleep(5)
     new_value = value + 1
     print("new_value", new_value)
     return new_value
+
+
+async def get_hero(hero_id: UUID) -> Hero:
+    async with SessionLocal() as session:
+        await asyncio.sleep(5)  # Add a delay of 5 seconds
+        hero = await crud.hero.get(id=hero_id, db_session=session)
+        return hero
+
+
+@celery.task(name="print_hero")
+def print_hero(hero_id: UUID) -> None:
+    hero = runnify(get_hero)(hero_id=hero_id)
+    print(f"hero_id {hero.id}")
+    return hero.id
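A minimal sketch of how these two tasks might be enqueued from application code, assuming a running broker and worker; the UUID below is a placeholder, not a real hero id:

from uuid import UUID

from app.api.celery_task import increment, print_hero

# .delay() enqueues the task and returns an AsyncResult immediately.
result = increment.delay(1)
print_hero.delay(UUID("00000000-0000-0000-0000-000000000000"))  # placeholder hero id

# Block briefly for the increment outcome (the task itself sleeps for 5 seconds).
print(result.get(timeout=10))  # prints 2 once the worker has finished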

Diff for: backend/app/app/api/v1/endpoints/hero.py

+4
@@ -1,4 +1,5 @@
 from uuid import UUID
+from app.api.celery_task import print_hero
 from app.utils.exceptions import IdNotFoundException, NameNotFoundException
 from fastapi import APIRouter, Depends, HTTPException, Query
 from fastapi_pagination import Params
@@ -23,6 +24,7 @@
 )
 from app.schemas.role_schema import IRoleEnum
 from app.core.authz import is_authorized
+from fastapi.encoders import jsonable_encoder
 
 router = APIRouter()
 
@@ -68,6 +70,8 @@ async def get_hero_by_id(
     hero = await crud.hero.get(id=hero_id)
     if not hero:
         raise IdNotFoundException(Hero, hero_id)
+
+    print_hero.delay(hero.id)
     return create_response(data=hero)
 
 
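Because .delay() returns an AsyncResult without blocking the request, the endpoint could also report the queued task id; a small variation on the two lines added inside get_hero_by_id, not part of this commit:

    task = print_hero.delay(hero.id)  # enqueue; the GET response is not delayed
    print("queued print_hero task", task.id)
    return create_response(data=hero)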

Diff for: backend/app/app/deps/celery_deps.py

+12
@@ -0,0 +1,12 @@
+from typing import Generator
+from celery_sqlalchemy_scheduler.session import SessionManager
+from app.core.config import settings
+
+def get_job_db() -> Generator:
+    try:
+        session_manager = SessionManager()
+        engine, _session = session_manager.create_session(settings.SYNC_CELERY_BEAT_DATABASE_URI)
+        session = _session()
+        return session
+    finally:
+        session.close()
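As committed, get_job_db returns the session from inside a try/finally, so the finally block closes it before the caller can use it, and the Generator annotation suggests a yield-based dependency was intended. A minimal corrected sketch under that assumption, reusing the same SessionManager API shown in the diff:

from typing import Generator

from celery_sqlalchemy_scheduler.session import SessionManager

from app.core.config import settings


def get_job_db() -> Generator:
    # Create a session bound to the scheduler database.
    session_manager = SessionManager()
    engine, _session = session_manager.create_session(settings.SYNC_CELERY_BEAT_DATABASE_URI)
    session = _session()
    try:
        # Yield so FastAPI keeps the session open for the duration of the request.
        yield session
    finally:
        session.close()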

Diff for: backend/app/app/main.py

+98-15
@@ -1,7 +1,9 @@
 from datetime import datetime, timedelta
 import gc
+import json
 from typing import Any
-from fastapi import FastAPI, HTTPException
+from app.deps.celery_deps import get_job_db
+from fastapi import Depends, FastAPI, HTTPException
 from app.api.deps import get_redis_client
 from fastapi_pagination import add_pagination
 from starlette.middleware.cors import CORSMiddleware
@@ -15,6 +17,13 @@
 from transformers import pipeline
 from app.api.celery_task import increment
 from app.core.celery import celery
+from celery_sqlalchemy_scheduler.models import (
+    PeriodicTask,
+    IntervalSchedule,
+    CrontabSchedule,
+)
+from sqlmodel import select
+
 
 @asynccontextmanager
 async def lifespan(app: FastAPI):
@@ -49,7 +58,6 @@ async def lifespan(app: FastAPI):
 )
 
 
-
 app.add_middleware(
     SQLAlchemyMiddleware,
     db_url=settings.ASYNC_DATABASE_URI,
@@ -90,18 +98,16 @@ async def root():
     An example "Hello world" FastAPI route.
     """
 
-
-    val = increment.delay(1) #wait
-    increment.delay(9) #no wait
+    val = increment.delay(1)  # wait
+    increment.delay(9)  # no wait
     tomorrow = datetime.utcnow() + timedelta(seconds=20)
-    new_tomorrow = increment.apply_async(args=[7], eta=tomorrow)
+    new_tomorrow = increment.apply_async(args=[8], eta=tomorrow)
     print("task_id", new_tomorrow.task_id)
     print("result", new_tomorrow.result)
    print("status", new_tomorrow.status)
     increment.apply_async(args=[20], expires=datetime.now() + timedelta(seconds=10))
-
-
-    # if oso.is_allowed(user, "read", message):
+
+    # if oso.is_allowed(user, "read", message):
     return {"message": new_tomorrow.task_id}
 
 
@@ -112,16 +118,93 @@ async def root(task_id: Any):
     """
     # Retrieve the result using the task ID
     async_result = celery.AsyncResult(task_id)
-    print("state", async_result.state)
-    print("ready", async_result.ready())
-    print("successful", async_result.successful())
+
     if async_result.ready():
-        print(f"Task {task_id} exists and has completed.")
+        if not async_result.successful():
+            raise HTTPException(
+                status_code=404,
+                detail=f"Task {task_id} with state {async_result.state}.",
+            )
+
         result = async_result.get(timeout=1.0)
         return {"message": result}
-    else:
-        raise HTTPException(status_code=404, detail="Task {task_id} does not exist or is still running.")
+    else:
+        raise HTTPException(
+            status_code=404,
+            detail=f"Task {task_id} does not exist or is still running.",
+        )
+
+
+@app.get("/3")
+async def root(celery_session=Depends(get_job_db)):
+    """
+    An example "Hello world" FastAPI route.
+    """
+    # Retrieve the result using the task ID
+    periodic_task = PeriodicTask(
+        crontab=CrontabSchedule(
+            hour=14, minute=47, day_of_month=24, month_of_year=3, timezone="UTC"
+        ),
+        name="new_uuid",
+        args="[8]",
+        task="increment_a_value",
+        one_off=True,
+    )
+    celery_session.add(periodic_task)
+    celery_session.commit()
+    celery_session.close()
+
+    return {"message": "hello"}
+
+
+@app.get("/4")
+async def root(celery_session=Depends(get_job_db)):
+    """
+    An example "Hello world" FastAPI route.
+    """
+    # Retrieve the result using the task ID
+    query = select(PeriodicTask).where(PeriodicTask.name == "new_uuid")
+    periodic_task = celery_session.execute(query).scalar_one_or_none()
 
+    periodic_task.crontab = CrontabSchedule(
+        hour=19, minute=58, day_of_month=24, month_of_year=3, timezone="UTC"
+    )
+    celery_session.add(periodic_task)
+    celery_session.commit()
+    celery_session.close()
+
+    return {"message": "hello"}
+
+@app.get("/5")
+async def root(celery_session=Depends(get_job_db)):
+    """
+    An example "Hello world" FastAPI route.
+    """
+    # Retrieve the result using the task ID
+    periodic_task = PeriodicTask(
+        interval=IntervalSchedule(every=10, period=IntervalSchedule.SECONDS),
+        name="new_uuid_new_interval",
+        args="[8]",
+        task="increment_a_value",
+    )
+    celery_session.add(periodic_task)
+    celery_session.commit()
+    celery_session.close()
+
+@app.get("/6")
+async def root(celery_session=Depends(get_job_db)):
+    """
+    An example "Hello world" FastAPI route.
+    """
+    # Retrieve the result using the task ID
+    query = select(PeriodicTask).where(PeriodicTask.name == "new_uuid_new_interval")
+    periodic_task = celery_session.execute(query).scalar_one_or_none()
+    periodic_task.enabled = False
+    celery_session.add(periodic_task)
+    celery_session.commit()
+    celery_session.close()
+
+    return {"message": "hello"}
 
 # Add Routers
 app.include_router(api_router_v1, prefix=settings.API_V1_STR)
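For completeness, a sketch of inspecting the scheduler rows that the /3 and /5 routes create, using the same get_job_db dependency and the PeriodicTask model imported above; the /scheduled route is hypothetical and not part of this commit:

from fastapi import Depends
from sqlmodel import select
from celery_sqlalchemy_scheduler.models import PeriodicTask

from app.deps.celery_deps import get_job_db


@app.get("/scheduled")  # hypothetical route, for illustration only
async def list_scheduled(celery_session=Depends(get_job_db)):
    # Return the name, registered task, args, and enabled flag of every scheduled job.
    tasks = celery_session.execute(select(PeriodicTask)).scalars().all()
    return {
        "tasks": [
            {"name": t.name, "task": t.task, "args": t.args, "enabled": t.enabled}
            for t in tasks
        ]
    }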

Diff for: backend/app/poetry.lock

+41-1
Some generated files are not rendered by default.

Diff for: backend/app/pyproject.toml

+1
@@ -58,6 +58,7 @@ transformers = {extras = ["torch"], version = "^4.27.2"}
 celery = "^5.2.7"
 celery-sqlalchemy-scheduler = "^0.3.0"
 psycopg2-binary = "^2.9.5"
+watchdog = "^3.0.0"
 
 [tool.poetry.group.dev.dependencies]
 pytest = "^5.2"

Diff for: create-dbs.sql

+2
@@ -0,0 +1,2 @@
+CREATE DATABASE fastapi_db;
+CREATE DATABASE celery_schedule_jobs;

Diff for: docker-compose-dev.yml

+30-11
@@ -24,24 +24,44 @@ services:
       - "traefik.http.routers.fastapi_local_base.rule=Host(`${LOCAL_1}`, `${LOCAL_2}`, `${EXT_ENDPOINT1}`)"
       - traefik.http.services.fastapi_localservice.loadbalancer.server.port=8000
 
+  # database:
+  #   image: postgres
+  #   restart: always
+  #   container_name: database
+  #   env_file: ".env"
+  #   #user: root
+  #   volumes:
+  #     - ./db_docker:/var/lib/postgresql/data
+  #     #- ./db_docker:/bitnami/postgresql
+  #     - ./init-multi-postgres-databases.sh:/docker-entrypoint-initdb.d/init-multi-postgres-databases.sh
+  #   ports:
+  #     - 5454:5432 # Remove this on production
+  #   expose:
+  #     - 5432
+  #   environment:
+  #     - POSTGRES_USERNAME=${DATABASE_USER}
+  #     - POSTGRES_PASSWORD=${DATABASE_PASSWORD}
+  #     - POSTGRES_DB=fastapi_db
+  #     #- POSTGRES_DATABASE=${DATABASE_NAME},${DATABASE_CELERY_NAME}
+  #     - POSTGRES_MULTIBLE_DATABASES=fastapi_db,celery_schedule_jobs
+  #     #command: ["bash", "-c", "chmod +x /docker-entrypoint-initdb.d/init-multi-postgres-databases.sh && postgres"]
+
   database:
-    image: bitnami/postgresql:13.3.0
+    image: postgres:latest
     restart: always
     container_name: database
     env_file: ".env"
-    user: root
-    volumes:
-      - ./db_docker:/bitnami/postgresql
     ports:
       - 5454:5432 # Remove this on production
     expose:
       - 5432
+    volumes:
+      - ./db_docker:/var/lib/postgresql/data
+      - ./create-dbs.sql:/docker-entrypoint-initdb.d/create-dbs.sql
     environment:
-      - POSTGRES_USERNAME=${DATABASE_USER}
+      - POSTGRES_USER=${DATABASE_USER}
       - POSTGRES_PASSWORD=${DATABASE_PASSWORD}
-      - POSTGRES_DATABASE=${DATABASE_NAME}
-      - POSTGRES_HOST_AUTH_METHOD= "trust"
-      - POSTGRESQL_POSTGRES_PASSWORD= ${DATABASE_PASSWORD}
+
 
   redis_server:
     container_name: redis_server
@@ -59,9 +79,7 @@ services:
       context: ./backend
       args:
         INSTALL_DEV: ${INSTALL_DEV-false}
-    command: "celery -A app.core.celery worker -l info "
-      # celery -A my_celery_app worker --without-heartbeat --without-gossip --without-mingle
-      # command: " watchmedo auto-restart --directory=./ --pattern=*.py --recursive -- celery -A app.core.worker worker -l info "
+    command: "watchfiles 'celery -A app.core.celery worker -l info' "
     volumes:
       - ./backend/app:/code
       # - "${EB_LOG_BASE_DIR}/php-app:/var/log/celery"
@@ -86,6 +104,7 @@ services:
       - database
      - redis_server
     env_file: .env
+
 
   nginx_server:
     container_name: nginx_server
