290 changes: 208 additions & 82 deletions main.py
@@ -1,112 +1,238 @@
"""Main entry-point for the Spatial Audio Studio demo."""

from __future__ import annotations

import argparse
import contextlib
import logging
from dataclasses import dataclass, field
from typing import Tuple

import cv2
import mediapipe as mp
import numpy as np
import sounddevice as sd
from mediapipe.framework.formats import landmark_pb2


# Audio settings
BLOCKSIZE = 1024
SAMPLERATE = 44100
CHANNELS = 2

# Default device indices from query_devices()
DEFAULT_INPUT_DEVICE = 0
DEFAULT_OUTPUT_DEVICE = 1
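
# Device indices are machine-specific. A quick way to list what is available
# (standard sounddevice call, shown as a hint rather than part of this change):
#
#     python -c "import sounddevice as sd; print(sd.query_devices())"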

# Reverb constants (simple feedback delay line)
REVERB_DECAY = 0.35
REVERB_MULTIPLIER = 2

logging.basicConfig(level=logging.INFO, format="[%(levelname)s] %(message)s")


@dataclass
class SpatialAudioState:
    """Holds state shared between the tracker and the audio callback."""

    pan: float = 0.0
    distance_factor: float = 1.0
    master_volume: float = 0.8
    reverb_buffer: np.ndarray = field(
        default_factory=lambda: np.zeros(
            (BLOCKSIZE * REVERB_MULTIPLIER, CHANNELS), dtype=np.float32
        )
    )
    reverb_index: int = 0

    def update_pan_from_face(self, face_landmarks: landmark_pb2.NormalizedLandmarkList) -> None:
        """Update panning and distance factor using the Face Mesh landmarks."""
        left_ear = face_landmarks.landmark[234]
        right_ear = face_landmarks.landmark[454]

        dx = right_ear.x - left_ear.x
        dy = right_ear.y - left_ear.y
        yaw = np.degrees(np.arctan2(dy, dx))

        # Exponential smoothing keeps the pan from jittering frame to frame.
        alpha = 0.15
        target_pan = np.clip(yaw / 30.0, -0.8, 0.8)
        self.pan = alpha * target_pan + (1 - alpha) * self.pan

        ear_distance = np.sqrt(dx**2 + dy**2)
        target_distance = np.clip((ear_distance - 0.05) * 8.0, 0.4, 1.0)
        self.distance_factor = 0.2 * target_distance + 0.8 * self.distance_factor
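
    # Note: the variable is named ``yaw``, but with 2D landmarks this angle
    # tracks head tilt (roll) at least as much as true yaw; it still gives an
    # intuitive left/right pan control for the demo.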

    def update_volume_from_hand(self, hand_landmarks: landmark_pb2.NormalizedLandmarkList) -> None:
        """Update the master volume using the hand landmarks."""
        # Map Y-position to volume: lower Y (hand up) = higher volume.
        index_finger_tip = hand_landmarks.landmark[8]
        hand_y = index_finger_tip.y

        new_volume = np.clip(1.2 - hand_y * 2, 0.2, 1.0)
        volume_alpha = 0.2
        self.master_volume = volume_alpha * new_volume + (1 - volume_alpha) * self.master_volume

    def process_audio_block(self, indata: np.ndarray) -> np.ndarray:
        """Apply panning, distance attenuation, and reverb to the audio block."""
        if indata.ndim == 1:
            stereo = np.repeat(indata[:, np.newaxis], CHANNELS, axis=1)
        elif indata.shape[1] == 1:
            stereo = np.repeat(indata, CHANNELS, axis=1)
        else:
            # Use only the first two channels; copy to avoid mutating the input buffer.
            stereo = np.array(indata[:, :CHANNELS], copy=True)

        stereo = stereo.astype(np.float32, copy=False)

        left_gain = 1.0 - self.pan
        right_gain = 1.0 + self.pan
        stereo[:, 0] *= left_gain
        stereo[:, 1] *= right_gain

        stereo *= self.distance_factor

        # Feedback delay line: mix in the sample written one buffer-length ago.
        for i in range(len(stereo)):
            delayed = self.reverb_buffer[self.reverb_index]
            stereo[i] += delayed * REVERB_DECAY
            self.reverb_buffer[self.reverb_index] = stereo[i]
            self.reverb_index = (self.reverb_index + 1) % len(self.reverb_buffer)

        stereo *= self.master_volume
        return stereo
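
# A possible optimization, not part of this change: the delay line is
# BLOCKSIZE * REVERB_MULTIPLIER samples long and audio arrives BLOCKSIZE
# samples at a time, so no sample written during a block is read back within
# the same block. Under that assumption the per-sample loop above could be
# replaced by slice arithmetic (hypothetical helper, sketched here only):
#
#     def process_reverb_block(state: SpatialAudioState, stereo: np.ndarray) -> np.ndarray:
#         idx, n = state.reverb_index, len(stereo)  # n must divide the buffer length
#         stereo += state.reverb_buffer[idx:idx + n] * REVERB_DECAY
#         state.reverb_buffer[idx:idx + n] = stereo
#         state.reverb_index = (idx + n) % len(state.reverb_buffer)
#         return stereo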


def create_audio_stream(state: SpatialAudioState, input_device: int, output_device: int) -> sd.Stream:
    """Create the audio stream with the spatial audio callback."""

    def callback(indata, outdata, frames, time, status):  # pylint: disable=unused-argument
        if status:
            logging.warning("Audio callback status: %s", status)

        if not len(indata):
            outdata.fill(0)
            return

        processed = state.process_audio_block(indata)
        np.copyto(outdata, processed)

    return sd.Stream(
        samplerate=SAMPLERATE,
        blocksize=BLOCKSIZE,
        device=(input_device, output_device),
        channels=CHANNELS,
        dtype="float32",
        callback=callback,
    )
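
# If playback stutters, a larger BLOCKSIZE trades latency for stability.
# sounddevice also accepts a latency hint on sd.Stream (e.g. latency="high"),
# which this change does not set.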


def try_open_camera(preferred_index: int) -> Tuple[cv2.VideoCapture, int]:
    """Attempt to open the preferred webcam, falling back to the first available."""

    tried_indices = []
    search_order = [preferred_index] + [idx for idx in range(5) if idx != preferred_index]

    for index in search_order:
        cap = cv2.VideoCapture(index)
        if cap.isOpened():
            logging.info("Using webcam index %d", index)
            return cap, index
        tried_indices.append(index)
        cap.release()

    raise RuntimeError(f"Could not open any webcam. Tried indices: {tried_indices}")
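
# OpenCV assigns camera indices per capture backend, and they can shift when
# devices are plugged or unplugged; the fallback scan above exists for exactly
# that reason.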


def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser(description="Head-tracked spatial audio demo")
    parser.add_argument("--input-device", type=int, default=DEFAULT_INPUT_DEVICE, help="Sounddevice input index")
    parser.add_argument("--output-device", type=int, default=DEFAULT_OUTPUT_DEVICE, help="Sounddevice output index")
    parser.add_argument("--camera", type=int, default=0, help="Preferred webcam index")
    return parser.parse_args()
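
# Example invocation (device numbers are hypothetical; check query_devices()):
#
#     python main.py --input-device 2 --output-device 1 --camera 0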


def main() -> None:
    args = parse_args()
    state = SpatialAudioState()

    try:
        stream = create_audio_stream(state, args.input_device, args.output_device)
    except Exception as exc:  # noqa: BLE001
        raise RuntimeError(
            "Unable to open the audio stream. Check your device indices and loopback setup."
        ) from exc

    cap, camera_index = try_open_camera(args.camera)

    drawing_utils = mp.solutions.drawing_utils
    drawing_spec = drawing_utils.DrawingSpec(color=(0, 255, 0), thickness=1, circle_radius=1)

    with contextlib.ExitStack() as stack:
        stack.enter_context(stream)
        stack.callback(cap.release)
        stack.callback(cv2.destroyAllWindows)

        with (
            mp.solutions.face_mesh.FaceMesh(max_num_faces=1, refine_landmarks=True) as face_mesh,
            mp.solutions.hands.Hands(max_num_hands=1, min_detection_confidence=0.7) as hands,
        ):
            logging.info(
                "Spatial audio running (audio in=%d, audio out=%d, camera=%d). Press 'q' to quit.",
                args.input_device,
                args.output_device,
                camera_index,
            )

            while True:
                ret, frame = cap.read()
                if not ret:
                    logging.warning("Failed to read frame from webcam")
                    break

                frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                face_result = face_mesh.process(frame_rgb)
                hand_result = hands.process(frame_rgb)

                if face_result.multi_face_landmarks:
                    face_landmarks = face_result.multi_face_landmarks[0]
                    state.update_pan_from_face(face_landmarks)
                    drawing_utils.draw_landmarks(
                        frame,
                        face_landmarks,
                        mp.solutions.face_mesh.FACEMESH_TESSELATION,
                        landmark_drawing_spec=None,
                        connection_drawing_spec=drawing_spec,
                    )

                if hand_result.multi_hand_landmarks:
                    hand_landmarks = hand_result.multi_hand_landmarks[0]
                    state.update_volume_from_hand(hand_landmarks)
                    drawing_utils.draw_landmarks(
                        frame,
                        hand_landmarks,
                        mp.solutions.hands.HAND_CONNECTIONS,
                        drawing_spec,
                        drawing_spec,
                    )

                cv2.putText(
                    frame,
                    f"Pan: {state.pan:+.2f} Vol: {state.master_volume:.2f} Dist: {state.distance_factor:.2f}",
                    (10, 30),
                    cv2.FONT_HERSHEY_SIMPLEX,
                    0.6,
                    (255, 255, 255),
                    1,
                    cv2.LINE_AA,
                )

                cv2.imshow("Head & Hand Tracking", frame)
                if cv2.waitKey(1) & 0xFF == ord("q"):
                    break


if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        logging.info("Exiting on keyboard interrupt")
    except Exception as exc:  # noqa: BLE001
        logging.error("%s", exc)
        raise