Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions api/urls/v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
PlaylistOrderViewV2,
RebootViewV2,
RecoverViewV2,
ScreenshotViewV2,
ShutdownViewV2,
)

Expand Down Expand Up @@ -60,4 +61,9 @@ def get_url_patterns():
IntegrationsViewV2.as_view(),
name='integrations_v2',
),
path(
'v2/screenshot',
ScreenshotViewV2.as_view(),
name='screenshot_v2',
),
]
226 changes: 225 additions & 1 deletion api/views/v2.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
import hashlib
import io
import ipaddress
import logging
import time
from datetime import timedelta
from os import getenv, statvfs
from os import getenv, path, statvfs
from platform import machine

import psutil
from drf_spectacular.utils import extend_schema
from django.http import HttpResponse
from hurry.filesize import size
from rest_framework import status
from rest_framework.response import Response
Expand Down Expand Up @@ -488,6 +491,227 @@
)


class ScreenshotViewV2(APIView):
"""Capture a screenshot of what is currently displayed.

For video assets, VLC on Raspberry Pi uses a hardware overlay that
bypasses the Linux framebuffer (/dev/fb0), resulting in a black image.
This view detects video playback via viewlog.db and extracts a frame
using ffmpeg instead. For images and web pages, it reads /dev/fb0.
"""

_cache = None
_cache_time = 0
CACHE_TTL = 5 # seconds

@staticmethod
def _get_current_video():
"""Check viewlog.db for a currently-playing video asset.

Returns (file_path, seconds_elapsed) or (None, None).
"""
import sqlite3
from datetime import datetime, timezone as tz

db_path = path.join(
path.expanduser('~'), '.screenly', 'viewlog.db'
)
if not path.exists(db_path):
return None, None

try:
conn = sqlite3.connect(db_path, timeout=3)
row = conn.execute(
'SELECT asset_id, mimetype, started_at FROM viewlog '
'ORDER BY id DESC LIMIT 1'
).fetchone()
conn.close()
except Exception:
return None, None

if not row:
return None, None

asset_id, mimetype, started_at = row
if mimetype != 'video':
return None, None

try:
start_dt = datetime.fromisoformat(started_at)
elapsed = (datetime.now(tz.utc) - start_dt).total_seconds()
except Exception:
elapsed = 0

if elapsed < 0:
elapsed = 0

try:
asset = Asset.objects.get(asset_id=asset_id)
except Asset.DoesNotExist:
return None, None

if elapsed > asset.duration + 2:
return None, None

file_path = asset.uri
if not path.isfile(file_path):
from settings import settings as app_settings
assets_dir = path.join(
path.expanduser('~'), app_settings['assetdir']
)
candidate = path.join(assets_dir, f'{asset_id}.mp4')
if path.isfile(candidate):
file_path = candidate
else:
return None, None

return file_path, elapsed

@staticmethod
def _ffmpeg_frame(video_path, seek_seconds, quality=70, width=None):
"""Extract a single frame from a video file using ffmpeg.

Returns JPEG bytes or None on failure.
"""
import subprocess

seek = max(0, int(seek_seconds))
cmd = [
'ffmpeg', '-ss', str(seek),
'-i', video_path,
'-frames:v', '1',
'-q:v', str(max(1, min(31, (100 - quality) * 31 // 100))),
]
if width:
cmd += ['-vf', f'scale={int(width)}:-1']
cmd += ['-f', 'image2pipe', '-vcodec', 'mjpeg', 'pipe:1']

try:
result = subprocess.run(
cmd, capture_output=True, timeout=10,
)
if result.returncode == 0 and len(result.stdout) > 100:
return result.stdout
except Exception as e:
logging.warning('ffmpeg frame extraction failed: %s', e)
return None

@extend_schema(
summary='Take a screenshot of the current display',
responses={
200: {
'type': 'string',
'format': 'binary',
'description': 'JPEG image',
}
},
)
@authorized
def get(self, request):
width = request.query_params.get('width', None)
quality = int(request.query_params.get('quality', 70))
quality = max(10, min(100, quality))

now = time.time()
if (
ScreenshotViewV2._cache is not None
and now - ScreenshotViewV2._cache_time < self.CACHE_TTL
and width is None
):
return HttpResponse(
ScreenshotViewV2._cache,
content_type='image/jpeg',
)

# If a video is playing, extract a frame via ffmpeg
# (VLC hardware overlay bypasses /dev/fb0 on Pi)
video_path, elapsed = self._get_current_video()
if video_path:
video_width = width or '640'
video_quality = min(quality, 60)
jpeg_bytes = self._ffmpeg_frame(
video_path, elapsed, video_quality, video_width
)
if jpeg_bytes:
if width is None:
ScreenshotViewV2._cache = jpeg_bytes
ScreenshotViewV2._cache_time = now
return HttpResponse(
jpeg_bytes, content_type='image/jpeg'
)

# Fallback: framebuffer capture (works for images and web pages)
try:
from PIL import Image
except ImportError:
return Response(
{'error': 'Pillow is not installed'},
status=status.HTTP_500_INTERNAL_SERVER_ERROR,
)

fb_path = '/dev/fb0'
if not path.exists(fb_path):
return Response(
{'error': 'Framebuffer /dev/fb0 not available'},
status=status.HTTP_503_SERVICE_UNAVAILABLE,
)

try:
with open('/sys/class/graphics/fb0/virtual_size') as f:
fb_w, fb_h = [int(x) for x in f.read().strip().split(',')]
with open('/sys/class/graphics/fb0/bits_per_pixel') as f:
bpp = int(f.read().strip())

bytes_per_pixel = bpp // 8
line_length = fb_w * bytes_per_pixel

with open(fb_path, 'rb') as fb:
raw = fb.read(line_length * fb_h)

if bytes_per_pixel == 4:
img = Image.frombytes(
'RGBA', (fb_w, fb_h), raw, 'raw', 'BGRA',
)
elif bytes_per_pixel == 3:
img = Image.frombytes(
'RGB', (fb_w, fb_h), raw, 'raw', 'BGR',
)
elif bytes_per_pixel == 2:
img = Image.frombytes(
'RGB', (fb_w, fb_h), raw, 'raw', 'BGR;16',
)
else:
return Response(
{'error': f'Unsupported bits_per_pixel: {bpp}'},
status=status.HTTP_500_INTERNAL_SERVER_ERROR,
)

img = img.convert('RGB')

if width:
w = int(width)
ratio = w / img.width
h = int(img.height * ratio)
img = img.resize((w, h), Image.LANCZOS)

buf = io.BytesIO()
img.save(buf, format='JPEG', quality=quality)
jpeg_bytes = buf.getvalue()

if width is None:
ScreenshotViewV2._cache = jpeg_bytes
ScreenshotViewV2._cache_time = now

return HttpResponse(jpeg_bytes, content_type='image/jpeg')

except Exception as e:
logging.error(f'Screenshot error: {e}')
return Response(
{'error': str(e)},

Check warning

Code scanning / CodeQL

Information exposure through an exception Medium

Stack trace information
flows to this location and may be exposed to an external user.
status=status.HTTP_500_INTERNAL_SERVER_ERROR,
)


class IntegrationsViewV2(APIView):
serializer_class = IntegrationsSerializerV2

Expand Down
Loading