Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 11 additions & 7 deletions backend/src/zimfarm_backend/common/schemas/orms.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,14 +113,18 @@ class TaskContainerProgressSchema(BaseModel):
total: int | None = None


# Memory statistics attached to a task container.
class TaskContainerMemoryStatsSchema(BaseModel):
# Maximum memory usage; defaults to None when no value was provided.
# NOTE(review): unit is not stated here (presumably bytes) — confirm.
max_usage: int | None = None
# Usage figures for one resource of a task (used below for memory and disk,
# and as the base class of the CPU schema).
class TaskResourceUsageSchema(BaseModel):
# Maximum usage; declared with the alias "max" and defaults to None.
# NOTE(review): whether the field name `max_usage` is also accepted on input
# (in addition to the alias) depends on the base model configuration, which
# is not visible here — confirm against what the worker actually sends.
max_usage: int | float | None = Field(default=None, alias="max")


# Statistics attached to a task container; only memory figures are declared.
class TaskContainerStatsSchema(BaseModel):
# Falls back to an empty TaskContainerMemoryStatsSchema when not provided.
memory: TaskContainerMemoryStatsSchema = Field(
default_factory=TaskContainerMemoryStatsSchema
)
# CPU usage figures: adds an average to the maximum inherited from
# TaskResourceUsageSchema.
class TaskCPUUsageSchema(TaskResourceUsageSchema):
# Average usage; declared with the alias "avg" and defaults to None.
avg_usage: float | None = Field(default=None, alias="avg")


# Resource statistics of a task, grouped per resource (memory, cpu, disk).
# Each field falls back to a freshly built, empty sub-schema when not provided,
# so the three attributes are always present on a validated instance.
class TaskStatsSchema(BaseModel):
memory: TaskResourceUsageSchema = Field(default_factory=TaskResourceUsageSchema)
cpu: TaskCPUUsageSchema = Field(default_factory=TaskCPUUsageSchema)
disk: TaskResourceUsageSchema = Field(default_factory=TaskResourceUsageSchema)


class TaskContainerSchema(BaseModel):
Expand All @@ -130,7 +134,7 @@ class TaskContainerSchema(BaseModel):

log: str | None = None
image: str | None = None
stats: dict[str, Any] | None = None
stats: TaskStatsSchema | None = None
artifacts: str | None = None
stderr: str | None = None
stdout: str | None = None
Expand Down
15 changes: 12 additions & 3 deletions frontend-ui/src/types/tasks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,15 @@ export interface TaskFile {
info?: TaskFileInfo
}

/** Usage figures for a single resource; `max` may be absent or null. */
export interface TaskResourceUsage {
// Maximum recorded usage. NOTE(review): unit is not stated here — confirm.
max?: number | null
}

/**
 * CPU usage figures of a task container.
 *
 * Both values are optional AND nullable, matching `TaskResourceUsage.max`:
 * the backend declares them with a `None` default, so a present-but-null
 * value has to be representable on the client side as well.
 */
export interface TaskCPUUsage {
  /** Maximum CPU usage (displayed as a percentage by the task detail view). */
  max?: number | null
  /** Average CPU usage (displayed as a percentage by the task detail view). */
  avg?: number | null
}

export interface TaskContainer {
command: string[]
exit_code?: number
Expand All @@ -46,9 +55,9 @@ export interface TaskContainer {
total: number
} | null
stats?: {
memory?: {
max_usage?: number | null
}
memory?: TaskResourceUsage
disk?: TaskResourceUsage
cpu?: TaskCPUUsage
}
}

Expand Down
79 changes: 73 additions & 6 deletions frontend-ui/src/views/TaskDetailView.vue
Original file line number Diff line number Diff line change
Expand Up @@ -324,13 +324,55 @@
<code class="text-pink-accent-2">{{ taskContainer.exit_code }}</code>
</td>
</tr>
<tr v-if="maxMemory != null">
<tr v-if="hasStats">
<th class="text-left w-20">Stats</th>
<td>
<v-chip size="small" class="mr-2">
<v-icon size="small" class="mr-1">mdi-memory</v-icon>
{{ maxMemory }} (max)
</v-chip>
<div class="d-flex flex-wrap ga-2">
<v-tooltip
v-if="maxMemory"
text="Maximum memory used during task execution"
>
<template #activator="{ props }">
<v-chip v-bind="props" size="small">
<v-icon size="small" class="mr-1">mdi-memory</v-icon>
{{ maxMemory }} (max)
</v-chip>
</template>
</v-tooltip>
<v-tooltip
v-if="maxDisk"
text="Maximum disk space used during task execution"
>
<template #activator="{ props }">
<v-chip v-bind="props" size="small">
<v-icon size="small" class="mr-1">mdi-harddisk</v-icon>
{{ maxDisk }} (max)
</v-chip>
</template>
</v-tooltip>
<v-tooltip
v-if="hasCpuStats && cpuStats && cpuStats.max !== null"
text="Maximum CPU usage percentage during task execution"
>
<template #activator="{ props }">
<v-chip v-bind="props" size="small">
<v-icon size="small" class="mr-1">mdi-cpu-64-bit</v-icon>
{{ cpuStats.max.toFixed(1) }}% (max)
</v-chip>
</template>
</v-tooltip>
<v-tooltip
v-if="hasCpuStats && cpuStats && cpuStats.avg !== null"
text="Average CPU usage percentage during task execution"
>
<template #activator="{ props }">
<v-chip v-bind="props" size="small">
<v-icon size="small" class="mr-1">mdi-chart-line</v-icon>
{{ cpuStats.avg.toFixed(1) }}% (avg)
</v-chip>
</template>
</v-tooltip>
</div>
</td>
</tr>
<tr v-if="taskProgress">
Expand Down Expand Up @@ -578,12 +620,37 @@ const canCancel = computed(() => {

const maxMemory = computed(() => {
try {
return formattedBytesSize(taskContainer.value?.stats?.memory?.max_usage || 0)
return formattedBytesSize(taskContainer.value?.stats?.memory?.max || 0)
} catch {
return null
}
})

// Maximum disk usage of the task container, formatted for display.
// Yields null only when reading or formatting the value throws.
// NOTE(review): a missing value is formatted as 0, so whether the
// `v-if="maxDisk"` chip gets hidden in that case depends on what
// formattedBytesSize(0) returns — confirm.
const maxDisk = computed(() => {
  try {
    const peakBytes = taskContainer.value?.stats?.disk?.max || 0
    return formattedBytesSize(peakBytes)
  } catch {
    return null
  }
})

// CPU figures of the task container with missing values normalised to null;
// null as a whole when the container carries no cpu stats object.
const cpuStats = computed(() => {
  const cpu = taskContainer.value?.stats?.cpu
  return cpu ? { max: cpu.max ?? null, avg: cpu.avg ?? null } : null
})

// Truthy when at least one CPU figure (max or avg) is available.
// Evaluates to null (not false) when there is no cpu stats object at all.
const hasCpuStats = computed(() => {
  const cpu = cpuStats.value
  return cpu && !(cpu.max === null && cpu.avg === null)
})

// Truthy as soon as any of the memory, disk or CPU figures is available;
// drives the visibility of the whole "Stats" row.
const hasStats = computed(() => maxMemory.value || maxDisk.value || hasCpuStats.value)

const monitoringUrl = computed(() => {
return `http://monitoring.openzim.org/host/${scheduleName.value}_${shortId.value}.${
task.value?.worker_name
Expand Down
79 changes: 77 additions & 2 deletions worker/src/zimfarm_worker/task/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@
from zimfarm_worker.task.zim import get_zim_info

SLEEP_INTERVAL = 60 # nb of seconds to sleep before watching
# Weight given to each new CPU sample when updating the running average; the
# previous average keeps the remaining (1 - alpha) share, so a small value
# makes the average react slowly to individual samples.
CPU_EWMA_ALPHA = 0.01 # EWMA smoothing factor for CPU percentage samples (0..1)


# NOTE(review): presumably status labels — confirm against their users.
PENDING = "pending"
DOING = "doing"
DONE = "done"
Expand Down Expand Up @@ -132,6 +135,9 @@ def __init__(
self.scraper_succeeded: bool | None = None # whether scraper succeeded

self.max_memory_usage: int = 0 # maximum memory used by scraper
self.max_disk_usage: int = 0 # maximum disk used by scraper
self.avg_cpu_usage: float = 0.0 # cpu exponential moving weighted average
self.max_cpu_usage: float = 0.0 # maximum cpu percentage used by scraper

# register stop/^C
self.register_signals()
Expand Down Expand Up @@ -192,8 +198,62 @@ def mark_scraper_completed(self, exit_code: int, stdout: str, stderr: str):
}
)

def _get_scraper_disk_usage(self) -> int:
    """
    Get disk usage of scraper container's task workdir in bytes.

    Sums the apparent size (``st_size``) of every regular file found
    recursively under the task workdir (where ZIM files and other outputs
    are written).

    The scraper may create and delete files while the tree is being walked,
    so entries that vanish or cannot be read are skipped individually rather
    than discarding the whole measurement.

    Returns:
        Total size in bytes; 0 when there is no workdir or it is not a
        directory. If walking the tree fails part-way, the error is logged
        and the size accumulated so far is returned. Never raises.
    """
    workdir = self.task_workdir
    if not workdir:
        return 0

    total_size = 0
    try:
        # is_dir() is already False for a missing path
        if not workdir.is_dir():
            return 0
        for entry in workdir.rglob("*"):
            try:
                if entry.is_file():
                    total_size += entry.stat().st_size
            except OSError:
                # file removed (or made unreadable) between listing and stat
                continue
    except Exception:
        # best-effort statistic: must never break progress reporting
        logger.exception("Failed to get scraper disk usage")
    return total_size

def _compute_scraper_cpu_stats(self, scraper_stats: dict[str, Any]) -> float:
    """
    Compute CPU usage statistics from scraper container stats.

    Derives the instantaneous CPU percentage from the difference between the
    ``cpu_stats`` and ``precpu_stats`` readings and folds it into
    ``self.avg_cpu_usage`` using EWMA smoothing, to reduce the effect of
    short spikes.

    Args:
        scraper_stats: one stats reading of the scraper container. Missing
            or null sections are treated as empty/zero.

    Returns:
        The raw (unsmoothed) CPU percentage of this reading, where 100.0
        corresponds to one fully used CPU. 0.0 when the reading does not
        allow computing a percentage.
    """
    cpu_stats = scraper_stats.get("cpu_stats") or {}
    precpu_stats = scraper_stats.get("precpu_stats") or {}
    cpu_usage = cpu_stats.get("cpu_usage") or {}
    precpu_usage = precpu_stats.get("cpu_usage") or {}

    delta_cpu = (cpu_usage.get("total_usage") or 0) - (
        precpu_usage.get("total_usage") or 0
    )
    delta_system = (cpu_stats.get("system_cpu_usage") or 0) - (
        precpu_stats.get("system_cpu_usage") or 0
    )

    # older daemons do not report online_cpus: fall back to the number of
    # per-CPU counters, as described in the Docker stats API documentation
    online_cpus = cpu_stats.get("online_cpus") or len(
        cpu_usage.get("percpu_usage") or []
    )

    cpu_sample = 0.0
    if delta_system > 0 and delta_cpu >= 0 and online_cpus > 0:
        cpu_sample = (delta_cpu / float(delta_system)) * float(online_cpus) * 100.0

    # apply EWMA smoothing to reduce effect of short spikes
    # NOTE(review): an average of exactly 0.0 is treated as "not seeded yet",
    # so the next sample replaces it outright instead of being smoothed in.
    if self.avg_cpu_usage == 0.0:
        self.avg_cpu_usage = cpu_sample
    else:
        self.avg_cpu_usage = (
            CPU_EWMA_ALPHA * cpu_sample
            + (1.0 - CPU_EWMA_ALPHA) * self.avg_cpu_usage
        )
    return cpu_sample

def submit_scraper_progress(self):
"""report last lines of scraper to the API"""
"""report scraper statistics and logs to the API"""
if not self.scraper:
logger.error("No scraper to update")
return
Expand All @@ -204,17 +264,32 @@ def submit_scraper_progress(self):
stream=False
)
scraper_stats = cast(dict[str, Any], scraper_stats)

# update statistics
self.max_memory_usage = max(
[
scraper_stats.get("memory_stats", {}).get("usage", 0),
self.max_memory_usage,
]
)

cpu_sample = self._compute_scraper_cpu_stats(scraper_stats)
self.max_cpu_usage = max([cpu_sample, self.max_cpu_usage])

disk_usage = self._get_scraper_disk_usage()
self.max_disk_usage = max([disk_usage, self.max_disk_usage])

stats: dict[str, Any] = {
"memory": {
"max_usage": self.max_memory_usage,
}
},
"cpu": {
"max_usage": self.max_cpu_usage,
"avg_usage": round(self.avg_cpu_usage, 2),
},
"disk": {
"max_usage": self.max_disk_usage,
},
}

# fetch and compute progression from progress file
Expand Down