From 70127a813506ca7e57fa09542e4929cbc3e9ad74 Mon Sep 17 00:00:00 2001
From: TriDefender
Date: Mon, 29 Sep 2025 23:12:07 +0800
Subject: [PATCH] Refactor spatial audio pipeline and improve robustness

---
 main.py | 290 ++++++++++++++++++++++++++++++++++++++++----------------
 1 file changed, 208 insertions(+), 82 deletions(-)

diff --git a/main.py b/main.py
index 4d9c42a..64995c5 100644
--- a/main.py
+++ b/main.py
@@ -1,112 +1,238 @@
+"""Main entry-point for the Spatial Audio Studio demo."""
+
+from __future__ import annotations
+
+import argparse
+import contextlib
+import logging
+from dataclasses import dataclass, field
+from typing import Tuple
+
 import cv2
 import mediapipe as mp
 import numpy as np
 import sounddevice as sd
+
 # Audio settings
 BLOCKSIZE = 1024
 SAMPLERATE = 44100
 CHANNELS = 2
 
-# Device indices from query_devices()
-blackhole_input_device = 0
-headphones_output_device = 1
+# Default device indices from query_devices()
+DEFAULT_INPUT_DEVICE = 0
+DEFAULT_OUTPUT_DEVICE = 1
 
-# Mediapipe setup
-mp_face_mesh = mp.solutions.face_mesh
-face_mesh = mp_face_mesh.FaceMesh(max_num_faces=1)
+# Reverb constants
+REVERB_DECAY = 0.35
+REVERB_MULTIPLIER = 2
 
-mp_hands = mp.solutions.hands
-hands = mp_hands.Hands(max_num_hands=1, min_detection_confidence=0.7)
-# Webcam
-cap = cv2.VideoCapture(1)
+logging.basicConfig(level=logging.INFO, format="[%(levelname)s] %(message)s")
 
-# Shared panning, distance values, and master volume
-pan = 0.0
-distance_factor = 1.0
-master_volume = 0.8
-
-# Reverb buffer (simple delay line)
-reverb_buffer = np.zeros((BLOCKSIZE * 2, 2))
-reverb_index = 0
-REVERB_DECAY = 0.35
+
+@dataclass
+class SpatialAudioState:
+    """Holds state shared between the tracker and the audio callback."""
+
+    pan: float = 0.0
+    distance_factor: float = 1.0
+    master_volume: float = 0.8
+    reverb_buffer: np.ndarray = field(
+        default_factory=lambda: np.zeros((BLOCKSIZE * REVERB_MULTIPLIER, CHANNELS), dtype=np.float32)
+    )
+    reverb_index: int = 0
 
-# Audio callback
-def audio_callback(indata, outdata, frames, time, status):
-    global pan, distance_factor, reverb_buffer, reverb_index, master_volume
-    if status:
-        print(status)
-
-    left = indata[:, 0] * (1 - pan)
-    right = indata[:, 1] * (1 + pan)
-    stereo = np.column_stack((left, right))
-    stereo *= distance_factor
-
-    for i in range(len(stereo)):
-        delayed = reverb_buffer[reverb_index]
-        stereo[i] += delayed * REVERB_DECAY
-        reverb_buffer[reverb_index] = stereo[i]
-        reverb_index = (reverb_index + 1) % len(reverb_buffer)
-
-    stereo *= master_volume
-    outdata[:] = stereo
-
-# Start audio stream
-stream = sd.Stream(
-    samplerate=SAMPLERATE,
-    blocksize=BLOCKSIZE,
-    device=(blackhole_input_device, headphones_output_device),
-    channels=CHANNELS,
-    callback=audio_callback
-)
-stream.start()
-
-# Head and hand tracking loop
-while True:
-    ret, frame = cap.read()
-    if not ret:
-        break
-    frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
-
-    face_result = face_mesh.process(frame_rgb)
-    hand_result = hands.process(frame_rgb)
-
-    if face_result.multi_face_landmarks:
-        landmarks = face_result.multi_face_landmarks[0]
-        left_ear = landmarks.landmark[234]
-        right_ear = landmarks.landmark[454]
+    def update_pan_from_face(self, face_landmarks: mp.framework.formats.landmark_pb2.NormalizedLandmarkList) -> None:
+        """Update panning and distance factor using the Face Mesh landmarks."""
+
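+        # Landmarks 234 and 454 sit near the left and right ears in the Face
+        # Mesh topology; the code treats the angle of the ear-to-ear vector
+        # as a head-yaw estimate. The alpha blend below is an exponential
+        # moving average that damps frame-to-frame landmark jitter.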
+        left_ear = face_landmarks.landmark[234]
+        right_ear = face_landmarks.landmark[454]
         dx = right_ear.x - left_ear.x
         dy = right_ear.y - left_ear.y
-        yaw = np.arctan2(dy, dx) * (180.0 / np.pi)
+        yaw = np.degrees(np.arctan2(dy, dx))
         alpha = 0.15
-        pan = alpha * np.clip(yaw / 30.0, -0.8, 0.8) + (1 - alpha) * pan
+        target_pan = np.clip(yaw / 30.0, -0.8, 0.8)
+        self.pan = alpha * target_pan + (1 - alpha) * self.pan
         ear_distance = np.sqrt(dx**2 + dy**2)
-        distance_factor = np.clip((ear_distance - 0.05) * 8.0, 0.4, 1.0)
+        target_distance = np.clip((ear_distance - 0.05) * 8.0, 0.4, 1.0)
+        self.distance_factor = 0.2 * target_distance + 0.8 * self.distance_factor
 
-    if hand_result.multi_hand_landmarks:
-        hand_landmarks = hand_result.multi_hand_landmarks[0]
-        index_finger_tip = hand_landmarks.landmark[8]  # index fingertip
+    def update_volume_from_hand(self, hand_landmarks: mp.framework.formats.landmark_pb2.NormalizedLandmarkList) -> None:
+        """Update the master volume using the hand landmarks."""
 
-        # Map Y-position to volume: lower Y (hand up) = higher volume
+        index_finger_tip = hand_landmarks.landmark[8]
         hand_y = index_finger_tip.y
-        volume_alpha = 0.2
         new_volume = np.clip(1.2 - hand_y * 2, 0.2, 1.0)
-        master_volume = volume_alpha * new_volume + (1 - volume_alpha) * master_volume
+        volume_alpha = 0.2
+        self.master_volume = volume_alpha * new_volume + (1 - volume_alpha) * self.master_volume
+
+    def process_audio_block(self, indata: np.ndarray) -> np.ndarray:
+        """Apply panning, distance attenuation, and reverb to the audio block."""
+
+        if indata.ndim == 1:
+            stereo = np.repeat(indata[:, np.newaxis], CHANNELS, axis=1)
+        elif indata.shape[1] == 1:
+            stereo = np.repeat(indata, CHANNELS, axis=1)
+        else:
+            # Use only the first two channels, copy to avoid mutating input buffer.
+            stereo = np.array(indata[:, :CHANNELS], copy=True)
 
-        # Draw hand landmarks
-        mp.solutions.drawing_utils.draw_landmarks(
-            frame, hand_landmarks, mp_hands.HAND_CONNECTIONS)
+        stereo = stereo.astype(np.float32, copy=False)
 
-    cv2.imshow('Head & Hand Tracking', frame)
-    if cv2.waitKey(1) & 0xFF == ord('q'):
-        break
+        left_gain = 1.0 - self.pan
+        right_gain = 1.0 + self.pan
+        stereo[:, 0] *= left_gain
+        stereo[:, 1] *= right_gain
+
+        stereo *= self.distance_factor
 
-# Cleanup
-cap.release()
-cv2.destroyAllWindows()
-stream.stop()
-stream.close()
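+        # Feedback delay line: each sample mixes in the sample written
+        # BLOCKSIZE * REVERB_MULTIPLIER frames earlier (about 46 ms at
+        # 44.1 kHz), scaled by REVERB_DECAY, and the wet result is written
+        # back so repeats decay geometrically. The per-sample Python loop
+        # runs inside the audio callback; vectorising it is a natural
+        # follow-up if blocks start underrunning.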
+        for i in range(len(stereo)):
+            delayed = self.reverb_buffer[self.reverb_index]
+            stereo[i] += delayed * REVERB_DECAY
+            self.reverb_buffer[self.reverb_index] = stereo[i]
+            self.reverb_index = (self.reverb_index + 1) % len(self.reverb_buffer)
+
+        stereo *= self.master_volume
+        return stereo
+
+
+def create_audio_stream(state: SpatialAudioState, input_device: int, output_device: int) -> sd.Stream:
+    """Create the audio stream with the spatial audio callback."""
+
+    def callback(indata, outdata, frames, time, status):  # pylint: disable=unused-argument
+        if status:
+            logging.warning("Audio callback status: %s", status)
+
+        if not len(indata):
+            outdata.fill(0)
+            return
+
+        processed = state.process_audio_block(indata)
+        np.copyto(outdata, processed)
+
+    return sd.Stream(
+        samplerate=SAMPLERATE,
+        blocksize=BLOCKSIZE,
+        device=(input_device, output_device),
+        channels=CHANNELS,
+        dtype="float32",
+        callback=callback,
+    )
+
+
+def try_open_camera(preferred_index: int) -> Tuple[cv2.VideoCapture, int]:
+    """Attempt to open the preferred webcam, falling back to the first available."""
+
+    tried_indices = []
+    search_order = [preferred_index] + [idx for idx in range(5) if idx != preferred_index]
+
+    for index in search_order:
+        cap = cv2.VideoCapture(index)
+        if cap.isOpened():
+            logging.info("Using webcam index %d", index)
+            return cap, index
+        tried_indices.append(index)
+        cap.release()
+
+    raise RuntimeError(f"Could not open any webcam. Tried indices: {tried_indices}")
+
+
+def parse_args() -> argparse.Namespace:
+    parser = argparse.ArgumentParser(description="Head-tracked spatial audio demo")
+    parser.add_argument("--input-device", type=int, default=DEFAULT_INPUT_DEVICE, help="Sounddevice input index")
+    parser.add_argument("--output-device", type=int, default=DEFAULT_OUTPUT_DEVICE, help="Sounddevice output index")
+    parser.add_argument("--camera", type=int, default=0, help="Preferred webcam index")
+    return parser.parse_args()
+
+
+def main() -> None:
+    args = parse_args()
+    state = SpatialAudioState()
+
+    try:
+        stream = create_audio_stream(state, args.input_device, args.output_device)
+    except Exception as exc:  # noqa: BLE001
+        raise RuntimeError("Unable to open the audio stream. Check your device indices and loopback setup.") from exc
+
+    cap, camera_index = try_open_camera(args.camera)
+
+    drawing_utils = mp.solutions.drawing_utils
+    drawing_spec = drawing_utils.DrawingSpec(color=(0, 255, 0), thickness=1, circle_radius=1)
+
+    with contextlib.ExitStack() as stack:
+        stack.enter_context(stream)
+        stack.callback(cap.release)
+        stack.callback(cv2.destroyAllWindows)
+
+        with (
+            mp.solutions.face_mesh.FaceMesh(max_num_faces=1, refine_landmarks=True) as face_mesh,
+            mp.solutions.hands.Hands(max_num_hands=1, min_detection_confidence=0.7) as hands,
+        ):
+            logging.info(
+                "Spatial audio running (audio in=%d, audio out=%d, camera=%d). Press 'q' to quit.",
+                args.input_device,
+                args.output_device,
+                camera_index,
+            )
+
+            while True:
+                ret, frame = cap.read()
+                if not ret:
+                    logging.warning("Failed to read frame from webcam")
+                    break
+
+                frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
+                face_result = face_mesh.process(frame_rgb)
+                hand_result = hands.process(frame_rgb)
+
+                if face_result.multi_face_landmarks:
+                    face_landmarks = face_result.multi_face_landmarks[0]
+                    state.update_pan_from_face(face_landmarks)
+                    drawing_utils.draw_landmarks(
+                        frame,
+                        face_landmarks,
+                        mp.solutions.face_mesh.FACEMESH_TESSELATION,
+                        landmark_drawing_spec=None,
+                        connection_drawing_spec=drawing_spec,
+                    )
+
+                if hand_result.multi_hand_landmarks:
+                    hand_landmarks = hand_result.multi_hand_landmarks[0]
+                    state.update_volume_from_hand(hand_landmarks)
+                    drawing_utils.draw_landmarks(
+                        frame,
+                        hand_landmarks,
+                        mp.solutions.hands.HAND_CONNECTIONS,
+                        drawing_spec,
+                        drawing_spec,
+                    )
+
+                cv2.putText(
+                    frame,
+                    f"Pan: {state.pan:+.2f} Vol: {state.master_volume:.2f} Dist: {state.distance_factor:.2f}",
+                    (10, 30),
+                    cv2.FONT_HERSHEY_SIMPLEX,
+                    0.6,
+                    (255, 255, 255),
+                    1,
+                    cv2.LINE_AA,
+                )
+
+                cv2.imshow("Head & Hand Tracking", frame)
+                if cv2.waitKey(1) & 0xFF == ord("q"):
+                    break
+
+
+if __name__ == "__main__":
+    try:
+        main()
+    except KeyboardInterrupt:
+        logging.info("Exiting on keyboard interrupt")
+    except Exception as exc:  # noqa: BLE001
+        logging.error("%s", exc)
+        raise
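--
Usage sketch (the device indices below are illustrative, not from the patch;
find yours with sounddevice, e.g. a BlackHole loopback as the input device):

    # List host audio devices to pick the loopback input and headphone output:
    import sounddevice as sd
    print(sd.query_devices())

    # Then run the demo with the indices reported above, for example:
    #   python main.py --input-device 2 --output-device 1 --camera 0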