diff --git a/data_juicer/config/config_all.yaml b/data_juicer/config/config_all.yaml index a07e5bdecc..03f2ec3f39 100644 --- a/data_juicer/config/config_all.yaml +++ b/data_juicer/config/config_all.yaml @@ -529,6 +529,35 @@ process: overlap_len: 200 # Overlap length of the split texts if not split in the split pattern. tokenizer: 'gpt-4o' # The tokenizer name of Hugging Face tokenizers. The text length will be calculate as the token num if it is offered. Otherwise, the text length equals to string length. trust_remote_code: True # for loading huggingface model. + - vggt_mapper: # Input a video of a single scene, and use VGGT to extract information including Camera Pose, Depth Maps, Point Maps, and 3D Point Tracks. + vggt_model_path: "facebook/VGGT-1B" # the path to the VGGT model. + frame_num: 3 # the number of frames to be extracted uniformly from the video. If it's 1, only the middle frame will be extracted. If it's 2, only the first and the last frames will be extracted. If it's larger than 2, in addition to the first and the last frames, other frames will be extracted uniformly within the video duration. + duration: 4 # the duration of each segment in seconds. If 0, frames are extracted from the entire video. If duration > 0, the video is segmented into multiple segments based on duration, and frames are extracted from each segment. + tag_field_name: 'vggt_tags' # the field name to store the tags. It's "vggt_tags" in default. + frame_dir: None # Output directory to save extracted frames. + if_output_camera_parameters: True # Determines whether to output camera parameters. + if_output_depth_maps: True # Determines whether to output depth maps. + if_output_point_maps_from_projection: True # Determines whether to output point maps directly inferred by VGGT. + if_output_point_maps_from_unprojection: True # Determines whether to output point maps constructed from depth maps and camera parameters. + if_output_point_tracks: True # Determines whether to output point tracks. + - video_camera_calibration_static_deepcalib_mapper: # Compute the camera intrinsics and field of view (FOV) for a static camera using DeepCalib. + model_path: "weights_10_0.02.h5" # The path to the DeepCalib Regression model. + frame_num: 3 # the number of frames to be extracted uniformly from the video. If it's 1, only the middle frame will be extracted. If it's 2, only the first and the last frames will be extracted. If it's larger than 2, in addition to the first and the last frames, other frames will be extracted uniformly within the video duration. + duration: 0 # the duration of each segment in seconds. If 0, frames are extracted from the entire video. If duration > 0, the video is segmented into multiple segments based on duration, and frames are extracted from each segment. + tag_field_name: 'static_camera_calibration_deepcalib_tags' # the field name to store the tags. It's "static_camera_calibration_deepcalib_tags" in default. + frame_dir: None # Output directory to save extracted frames. + output_info_dir: None # Output directory for saving camera parameters. + - video_camera_calibration_static_moge_mapper: # Compute the camera intrinsics and field of view (FOV) for a static camera using Moge-2 (more accurate than DeepCalib). + model_path: "Ruicheng/moge-2-vitl" # The path to the Moge-2 model. + frame_num: 3 # the number of frames to be extracted uniformly from the video. If it's 1, only the middle frame will be extracted. If it's 2, only the first and the last frames will be extracted. 
If it's larger than 2, in addition to the first and the last frames, other frames will be extracted uniformly within the video duration. + duration: 0 # the duration of each segment in seconds. If 0, frames are extracted from the entire video. If duration > 0, the video is segmented into multiple segments based on duration, and frames are extracted from each segment. + tag_field_name: 'static_camera_calibration_moge_tags' # the field name to store the tags. It's "static_camera_calibration_moge_tags" in default. + frame_dir: None # Output directory to save extracted frames. + if_output_info: True # Whether to save the camera parameters results to an JSON file. + output_info_dir: None # Output directory for saving camera parameters. + if_output_points_info: True # Determines whether to output point map in OpenCV camera coordinate system (x right, y down, z forward). + if_output_depth_info: True # Determines whether to output depth maps. + if_output_mask_info: True # Determines whether to output a binary mask for valid pixels. - video_captioning_from_audio_mapper: # caption a video according to its audio streams based on Qwen-Audio model keep_original_sample: true # whether to keep the original sample. If it's set to False, there will be only captioned sample in the final datasets and the original sample will be removed. It's True in default. memory: '30GB' # This operation (Op) utilizes deep neural network models that consume a significant amount of memory for computation, hence the system's available memory might constrain the maximum number of processes that can be launched @@ -633,6 +662,20 @@ process: frame_num: 3 # the number of frames to be extracted uniformly from the video. Only works when frame_sampling_method is "uniform". If it's 1, only the middle frame will be extracted. If it's 2, only the first and the last frames will be extracted. If it's larger than 2, in addition to the first and the last frames, other frames will be extracted uniformly within the video duration. tag_field_name: 'video_frame_tags' # the key name in the meta field to store the tags. It's "video_frame_tags" in default. memory: '9GB' + - video_undistort_mapper: # Undistort raw videos with corresponding camera intrinsics and distortion coefficients. + output_video_dir: None # Output directory to save undistorted videos. + tag_field_name: 'video_undistortion_tags' # The field name to store the tags. It's "video_undistortion_tags" in default. + batch_size_each_video: 1000 # Number of frames to process and save per temporary TS file batch. + crf: 22 # Constant Rate Factor (CRF) for FFmpeg encoding quality. + - video_whole_body_pose_estimation_mapper: # Input a video containing people, and use the DWPose model to extract the body, hand, feet, and face keypoints of the human subjects in the video, i.e., 2D Whole-body Pose Estimation. + onnx_det_model: 'yolox_l.onnx' # The path to 'yolox_l.onnx'. + onnx_pose_model: 'dw-ll_ucoco_384.onnx' # The path to 'dw-ll_ucoco_384.onnx'. + frame_num: 3 # the number of frames to be extracted uniformly from the video. If it's 1, only the middle frame will be extracted. If it's 2, only the first and the last frames will be extracted. If it's larger than 2, in addition to the first and the last frames, other frames will be extracted uniformly within the video duration. + duration: 0 # the duration of each segment in seconds. If 0, frames are extracted from the entire video. 
If duration > 0, the video is segmented into multiple segments based on duration, and frames are extracted from each segment. + tag_field_name: 'pose_estimation_tags' # the field name to store the tags. It's "pose_estimation_tags" in default. + frame_dir: None # Output directory to save extracted frames. + if_save_visualization: False # Whether to save visualization results. + save_visualization_dir: None # The path for saving visualization results. - whitespace_normalization_mapper: # normalize different kinds of whitespaces to English whitespace. # Filter ops diff --git a/data_juicer/ops/mapper/__init__.py b/data_juicer/ops/mapper/__init__.py index 82d7d29c41..0264b26c36 100644 --- a/data_juicer/ops/mapper/__init__.py +++ b/data_juicer/ops/mapper/__init__.py @@ -82,6 +82,12 @@ from .text_chunk_mapper import TextChunkMapper from .text_tagging_by_prompt_mapper import TextTaggingByPromptMapper from .vggt_mapper import VggtMapper +from .video_camera_calibration_static_deepcalib_mapper import ( + VideoCameraCalibrationStaticDeepcalibMapper, +) +from .video_camera_calibration_static_moge_mapper import ( + VideoCameraCalibrationStaticMogeMapper, +) from .video_captioning_from_audio_mapper import VideoCaptioningFromAudioMapper from .video_captioning_from_frames_mapper import VideoCaptioningFromFramesMapper from .video_captioning_from_summarizer_mapper import VideoCaptioningFromSummarizerMapper @@ -101,6 +107,7 @@ from .video_split_by_scene_mapper import VideoSplitBySceneMapper from .video_tagging_from_audio_mapper import VideoTaggingFromAudioMapper from .video_tagging_from_frames_mapper import VideoTaggingFromFramesMapper +from .video_undistort_mapper import VideoUndistortMapper from .video_whole_body_pose_estimation_mapper import VideoWholeBodyPoseEstimationMapper from .whitespace_normalization_mapper import WhitespaceNormalizationMapper @@ -183,6 +190,8 @@ "TextChunkMapper", "TextTaggingByPromptMapper", "VggtMapper", + "VideoCameraCalibrationStaticDeepcalibMapper", + "VideoCameraCalibrationStaticMogeMapper", "VideoCaptioningFromAudioMapper", "VideoCaptioningFromFramesMapper", "VideoCaptioningFromSummarizerMapper", @@ -202,6 +211,7 @@ "VideoSplitBySceneMapper", "VideoTaggingFromAudioMapper", "VideoTaggingFromFramesMapper", + "VideoUndistortMapper", "VideoWholeBodyPoseEstimationMapper", "WhitespaceNormalizationMapper", ] diff --git a/data_juicer/ops/mapper/video_camera_calibration_static_deepcalib_mapper.py b/data_juicer/ops/mapper/video_camera_calibration_static_deepcalib_mapper.py new file mode 100644 index 0000000000..5bac79d148 --- /dev/null +++ b/data_juicer/ops/mapper/video_camera_calibration_static_deepcalib_mapper.py @@ -0,0 +1,180 @@ +import json +import os + +import numpy as np +from pydantic import PositiveInt + +import data_juicer +from data_juicer.ops.load import load_ops +from data_juicer.utils.cache_utils import DATA_JUICER_ASSETS_CACHE +from data_juicer.utils.constant import Fields, MetaKeys +from data_juicer.utils.lazy_loader import LazyLoader +from data_juicer.utils.mm_utils import SpecialTokens +from data_juicer.utils.model_utils import get_model, prepare_model + +from ..base_op import OPERATORS, Mapper +from ..op_fusion import LOADED_VIDEOS + +OP_NAME = "video_camera_calibration_static_deepcalib_mapper" + +cv2 = LazyLoader("cv2", "opencv-python") + + +@OPERATORS.register_module(OP_NAME) +@LOADED_VIDEOS.register_module(OP_NAME) +class VideoCameraCalibrationStaticDeepcalibMapper(Mapper): + """Compute the camera intrinsics and field of view (FOV) + for a static camera using 
DeepCalib.""" + + _accelerator = "cuda" + + def __init__( + self, + model_path: str = "weights_10_0.02.h5", + frame_num: PositiveInt = 3, + duration: float = 0, + tag_field_name: str = MetaKeys.static_camera_calibration_deepcalib_tags, + frame_dir: str = DATA_JUICER_ASSETS_CACHE, + output_info_dir: str = DATA_JUICER_ASSETS_CACHE, + *args, + **kwargs, + ): + """ + Initialization method. + + :param model_path: The path to the DeepCalib Regression model. + :param frame_num: The number of frames to be extracted uniformly from + the video. If it's 1, only the middle frame will be extracted. If + it's 2, only the first and the last frames will be extracted. If + it's larger than 2, in addition to the first and the last frames, + other frames will be extracted uniformly within the video duration. + If "duration" > 0, frame_num is the number of frames per segment. + :param duration: The duration of each segment in seconds. + If 0, frames are extracted from the entire video. + If duration > 0, the video is segmented into multiple segments + based on duration, and frames are extracted from each segment. + :param tag_field_name: The field name to store the tags. It's + "static_camera_calibration_deepcalib_tags" in default. + :param frame_dir: Output directory to save extracted frames. + :param output_info_dir: Output directory for saving camera parameters. + :param args: extra args + :param kwargs: extra args + + """ + + super().__init__(*args, **kwargs) + + LazyLoader.check_packages(["tensorflow"]) + import keras + from keras.applications.imagenet_utils import preprocess_input + + self.keras = keras + self.preprocess_input = preprocess_input + + self.video_extract_frames_mapper_args = { + "frame_sampling_method": "uniform", + "frame_num": frame_num, + "duration": duration, + "frame_dir": frame_dir, + "frame_key": MetaKeys.video_frames, + } + self.fused_ops = load_ops([{"video_extract_frames_mapper": self.video_extract_frames_mapper_args}]) + self.model_key = prepare_model(model_type="deepcalib", model_path=model_path) + + self.frame_num = frame_num + self.duration = duration + self.tag_field_name = tag_field_name + self.frame_dir = frame_dir + self.output_info_dir = output_info_dir + self.INPUT_SIZE = 299 + self.focal_start = 40 + self.focal_end = 500 + + def process_single(self, sample=None, rank=None): + + # check if it's generated already + if self.tag_field_name in sample[Fields.meta]: + return sample + + # there is no video in this sample + if self.video_key not in sample or not sample[self.video_key]: + return [] + + # load videos + ds_list = [{"text": SpecialTokens.video, "videos": sample[self.video_key]}] + + dataset = data_juicer.core.data.NestedDataset.from_list(ds_list) + dataset = self.fused_ops[0].run(dataset) + + frames_root = os.path.join(self.frame_dir, os.path.splitext(os.path.basename(sample[self.video_key][0]))[0]) + frame_names = os.listdir(frames_root) + frames_path = sorted([os.path.join(frames_root, frame_name) for frame_name in frame_names]) + model = get_model(self.model_key, rank, self.use_cuda()) + + final_k_list = [] + final_xi_list = [] + final_hfov_list = [] + final_vfov_list = [] + + for i, path in enumerate(frames_path): + image = cv2.imread(path) + height, width, channels = image.shape + + image = cv2.resize(image, (self.INPUT_SIZE, self.INPUT_SIZE)) + image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) + image = image / 255.0 + image = image - 0.5 + image = image * 2.0 + image = np.expand_dims(image, 0) + + image = self.preprocess_input(image) + + prediction = 
model.predict(image) + prediction_focal = prediction[0] + prediction_dist = prediction[1] + + # Scale the focal length based on the original width of the image. + curr_focal_pred = ( + (prediction_focal[0][0] * (self.focal_end + 1.0 - self.focal_start * 1.0) + self.focal_start * 1.0) + * (width * 1.0) + / (self.INPUT_SIZE * 1.0) + ) + curr_focal_pred = curr_focal_pred.item() + + # Following DeepCalib's official codes + curr_dist_pred = prediction_dist[0][0] * 1.2 + curr_dist_pred = curr_dist_pred.item() + + temp_k = [[curr_focal_pred, 0, width / 2], [0, curr_focal_pred, height / 2], [0, 0, 1]] + temp_xi = curr_dist_pred + + temp_hfov = 2 * np.arctan(width / 2 / curr_focal_pred) # rad + temp_vfov = 2 * np.arctan(height / 2 / curr_focal_pred) + + temp_hfov = temp_hfov.item() + temp_vfov = temp_vfov.item() + + final_k_list.append(temp_k) + final_xi_list.append(temp_xi) + final_hfov_list.append(temp_hfov) + final_vfov_list.append(temp_vfov) + + sample[Fields.meta][self.tag_field_name] = { + "frames_folder": frames_root, + "frame_names": frame_names, + "intrinsics_list": final_k_list, + "xi_list": final_xi_list, + "hfov_list": final_hfov_list, + "vfov_list": final_vfov_list, + } + + os.makedirs(self.output_info_dir, exist_ok=True) + with open( + os.path.join( + self.output_info_dir, os.path.splitext(os.path.basename(sample[self.video_key][0]))[0] + ".json" + ), + "w", + ) as f: + json.dump(sample[Fields.meta][self.tag_field_name], f) + + return sample diff --git a/data_juicer/ops/mapper/video_camera_calibration_static_moge_mapper.py b/data_juicer/ops/mapper/video_camera_calibration_static_moge_mapper.py new file mode 100644 index 0000000000..3825b1e901 --- /dev/null +++ b/data_juicer/ops/mapper/video_camera_calibration_static_moge_mapper.py @@ -0,0 +1,191 @@ +import json +import os + +import numpy as np +from pydantic import PositiveInt + +import data_juicer +from data_juicer.ops.load import load_ops +from data_juicer.utils.cache_utils import DATA_JUICER_ASSETS_CACHE +from data_juicer.utils.constant import Fields, MetaKeys +from data_juicer.utils.lazy_loader import LazyLoader +from data_juicer.utils.mm_utils import SpecialTokens +from data_juicer.utils.model_utils import get_model, prepare_model + +from ..base_op import OPERATORS, Mapper +from ..op_fusion import LOADED_VIDEOS + +OP_NAME = "video_camera_calibration_static_moge_mapper" + +cv2 = LazyLoader("cv2", "opencv-python") +torch = LazyLoader("torch") + + +@OPERATORS.register_module(OP_NAME) +@LOADED_VIDEOS.register_module(OP_NAME) +class VideoCameraCalibrationStaticMogeMapper(Mapper): + """Compute the camera intrinsics and field of view (FOV) + for a static camera using Moge-2 (more accurate + than DeepCalib).""" + + _accelerator = "cuda" + + def __init__( + self, + model_path: str = "Ruicheng/moge-2-vitl", + frame_num: PositiveInt = 3, + duration: float = 0, + tag_field_name: str = MetaKeys.static_camera_calibration_moge_tags, + frame_dir: str = DATA_JUICER_ASSETS_CACHE, + if_output_info: bool = True, + output_info_dir: str = DATA_JUICER_ASSETS_CACHE, + if_output_points_info: bool = True, + if_output_depth_info: bool = True, + if_output_mask_info: bool = True, + *args, + **kwargs, + ): + """ + Initialization method. + + :param model_path: The path to the Moge-2 model. + :param frame_num: The number of frames to be extracted uniformly from + the video. If it's 1, only the middle frame will be extracted. If + it's 2, only the first and the last frames will be extracted. 
If + it's larger than 2, in addition to the first and the last frames, + other frames will be extracted uniformly within the video duration. + If "duration" > 0, frame_num is the number of frames per segment. + :param duration: The duration of each segment in seconds. + If 0, frames are extracted from the entire video. + If duration > 0, the video is segmented into multiple segments + based on duration, and frames are extracted from each segment. + :param tag_field_name: The field name to store the tags. It's + "static_camera_calibration_moge_tags" in default. + :param frame_dir: Output directory to save extracted frames. + :param if_output_info: Whether to save the camera parameters results + to an JSON file. + :param output_info_dir: Output directory for saving camera parameters. + :param if_output_points_info: Determines whether to output point map + in OpenCV camera coordinate system (x right, y down, z forward). + For MoGe-2, the point map is in metric scale. + :param if_output_depth_info: Determines whether to output + depth maps. + :param if_output_mask_info: Determines whether to output a + binary mask for valid pixels. + :param args: extra args + :param kwargs: extra args + + """ + + super().__init__(*args, **kwargs) + + self.video_extract_frames_mapper_args = { + "frame_sampling_method": "uniform", + "frame_num": frame_num, + "duration": duration, + "frame_dir": frame_dir, + "frame_key": MetaKeys.video_frames, + } + self.fused_ops = load_ops([{"video_extract_frames_mapper": self.video_extract_frames_mapper_args}]) + self.model_key = prepare_model(model_type="moge", model_path=model_path) + + self.frame_num = frame_num + self.duration = duration + self.tag_field_name = tag_field_name + self.frame_dir = frame_dir + self.output_info_dir = output_info_dir + self.if_output_points_info = if_output_points_info + self.if_output_depth_info = if_output_depth_info + self.if_output_mask_info = if_output_mask_info + self.if_output_info = if_output_info + + def process_single(self, sample=None, rank=None): + + # check if it's generated already + if self.tag_field_name in sample[Fields.meta]: + return sample + + # there is no video in this sample + if self.video_key not in sample or not sample[self.video_key]: + return [] + + # load videos + ds_list = [{"text": SpecialTokens.video, "videos": sample[self.video_key]}] + + dataset = data_juicer.core.data.NestedDataset.from_list(ds_list) + dataset = self.fused_ops[0].run(dataset) + + frames_root = os.path.join(self.frame_dir, os.path.splitext(os.path.basename(sample[self.video_key][0]))[0]) + frame_names = os.listdir(frames_root) + frames_path = sorted([os.path.join(frames_root, frame_name) for frame_name in frame_names]) + model = get_model(self.model_key, rank, self.use_cuda()) + + final_k_list = [] + final_hfov_list = [] + final_vfov_list = [] + final_points_list = [] + final_depth_list = [] + final_mask_list = [] + + if rank is not None: + device = f"cuda:{rank}" if self.use_cuda() else "cpu" + else: + device = "cuda" if self.use_cuda() else "cpu" + + for i, path in enumerate(frames_path): + + input_image = cv2.cvtColor(cv2.imread(path), cv2.COLOR_BGR2RGB) + height, width, channels = input_image.shape + input_image = torch.tensor(input_image / 255, dtype=torch.float32, device=device).permute(2, 0, 1) + + output = model.infer(input_image) + + points = output["points"].cpu().tolist() + depth = output["depth"].cpu().tolist() + mask = output["mask"].cpu().tolist() + intrinsics = output["intrinsics"].cpu().tolist() + + temp_k = [ + [intrinsics[0][0] 
* width, 0, intrinsics[0][2] * width], + [0, intrinsics[1][1] * height, intrinsics[1][2] * height], + [0, 0, 1], + ] + + temp_hfov = 2 * np.arctan(1 / 2 / intrinsics[0][0]) # rad + temp_vfov = 2 * np.arctan(1 / 2 / intrinsics[1][1]) + + final_k_list.append(temp_k) + final_hfov_list.append(temp_hfov) + final_vfov_list.append(temp_vfov) + + if self.if_output_points_info: + final_points_list.append(points) + + if self.if_output_depth_info: + final_depth_list.append(depth) + + if self.if_output_mask_info: + final_mask_list.append(mask) + + sample[Fields.meta][self.tag_field_name] = { + "frames_folder": frames_root, + "frame_names": frame_names, + "intrinsics_list": final_k_list, + "hfov_list": final_hfov_list, + "vfov_list": final_vfov_list, + "points_list": final_points_list, + "depth_list": final_depth_list, + "mask_list": final_mask_list, + } + + if self.if_output_info: + os.makedirs(self.output_info_dir, exist_ok=True) + with open( + os.path.join( + self.output_info_dir, os.path.splitext(os.path.basename(sample[self.video_key][0]))[0] + ".json" + ), + "w", + ) as f: + json.dump(sample[Fields.meta][self.tag_field_name], f) + + return sample diff --git a/data_juicer/ops/mapper/video_hand_reconstruction_mapper.py b/data_juicer/ops/mapper/video_hand_reconstruction_mapper.py index eeb5d4310a..4308531816 100644 --- a/data_juicer/ops/mapper/video_hand_reconstruction_mapper.py +++ b/data_juicer/ops/mapper/video_hand_reconstruction_mapper.py @@ -89,7 +89,7 @@ def __init__( super().__init__(*args, **kwargs) - LazyLoader.check_packages(["chumpy @ git+https://github.com/mattloper/chumpy"]) + LazyLoader.check_packages(["chumpy@ git+https://github.com/mattloper/chumpy"]) LazyLoader.check_packages(["smplx==0.1.28", "yacs", "timm", "pyrender", "pytorch_lightning"]) LazyLoader.check_packages(["scikit-image"], pip_args=["--no-deps"]) diff --git a/data_juicer/ops/mapper/video_undistort_mapper.py b/data_juicer/ops/mapper/video_undistort_mapper.py new file mode 100644 index 0000000000..6c300399dc --- /dev/null +++ b/data_juicer/ops/mapper/video_undistort_mapper.py @@ -0,0 +1,210 @@ +import os +import subprocess + +import numpy as np + +from data_juicer.utils.cache_utils import DATA_JUICER_ASSETS_CACHE +from data_juicer.utils.constant import Fields, MetaKeys +from data_juicer.utils.lazy_loader import LazyLoader + +from ..base_op import OPERATORS, Mapper +from ..op_fusion import LOADED_VIDEOS + +OP_NAME = "video_undistort_mapper" + +cv2 = LazyLoader("cv2", "opencv-python") +ffmpeg = LazyLoader("ffmpeg", "ffmpeg-python") + + +@OPERATORS.register_module(OP_NAME) +@LOADED_VIDEOS.register_module(OP_NAME) +class VideoUndistortMapper(Mapper): + """Undistort raw videos with corresponding camera intrinsics + and distortion coefficients.""" + + def __init__( + self, + output_video_dir: str = DATA_JUICER_ASSETS_CACHE, + tag_field_name: str = MetaKeys.video_undistortion_tags, + batch_size_each_video: int = 1000, + crf: int = 22, + *args, + **kwargs, + ): + """ + Initialization method. + + :param output_video_dir: Output directory to save undistorted videos. + :param tag_field_name: The field name to store the tags. It's + "video_undistortion_tags" in default. + :param batch_size_each_video: Number of frames to process and save per + temporary TS file batch. + :param crf: Constant Rate Factor (CRF) for FFmpeg encoding quality. 
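+            Lower values mean higher quality and larger files; for libx264,
+            values around 18-28 are typical and 23 is the encoder default.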
+        :param args: extra args
+        :param kwargs: extra args
+
+        """
+
+        super().__init__(*args, **kwargs)
+
+        opencv_contrib_python_exist_code = subprocess.run(["pip", "show", "opencv-contrib-python"])
+        # This is written to fix version issues with Numpy
+        if opencv_contrib_python_exist_code.returncode == 1:  # not exist
+            LazyLoader.check_packages(["opencv-contrib-python"])
+            subprocess.run(["pip", "install", "numpy==1.26.4"], check=True)
+
+        self.output_video_dir = output_video_dir
+        self.tag_field_name = tag_field_name
+        self.batch_size_each_video = batch_size_each_video
+        self.crf = crf
+
+    def concatenate_ts_files(self, folder, video_name, batch_counts):
+        """Concatenate batch TS files into the final MP4."""
+        inputs_path = os.path.join(folder, "inputs.txt")
+
+        # Create a file list for ffmpeg
+        with open(inputs_path, "w") as f:
+            for i in range(batch_counts):
+                f.write(f"file '{video_name}_b{i:04d}.ts'\n")
+
+        # Merge using the ffmpeg concat demuxer
+        ffmpeg.input(inputs_path, format="concat", safe=0).output(
+            os.path.join(folder, f"{video_name}.mp4"), c="copy"
+        ).run()
+
+        # Clean up temporary TS files and the list file
+        for i in range(batch_counts):
+            os.remove(os.path.join(folder, f"{video_name}_b{i:04d}.ts"))
+        os.remove(inputs_path)
+
+    def create_ffmpeg_writer(self, output_path, width, height, fps, crf):
+        """Spawn an async ffmpeg encoding process for writing raw frames."""
+        return (
+            ffmpeg.output(
+                ffmpeg.input(
+                    "pipe:0",
+                    format="rawvideo",
+                    pix_fmt="rgb24",
+                    s=f"{width}x{height}",
+                    r=fps,
+                ),
+                output_path,
+                **{
+                    "preset": "medium",
+                    "pix_fmt": "yuv420p",
+                    "b:v": "0",
+                    "c:v": "libx264",
+                    "crf": str(crf),
+                    "r": fps,
+                },
+            )
+            .overwrite_output()
+            .run_async(quiet=True, pipe_stdin=True)
+        )
+
+    def process_single(self, sample, context=False):
+
+        # check if it's generated already
+        if self.tag_field_name in sample[Fields.meta]:
+            return sample
+
+        # there are no videos in this sample
+        if self.video_key not in sample or not sample[self.video_key]:
+            return []
+
+        cap = cv2.VideoCapture(sample[self.video_key][0])
+        video_name = os.path.splitext(os.path.basename(sample[self.video_key][0]))[0]
+
+        # Get video properties
+        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
+        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
+        fps = cap.get(cv2.CAP_PROP_FPS)
+
+        K = sample["intrinsics"]  # 3x3 camera intrinsics.
+        D = sample[
+            "distortion_coefficients"
+        ]  # Distortion coefficients (k1, k2, p1, p2). If D is None, zero distortion is used.
+        xi = sample["xi"]  # The parameter xi of the Mei (CMei) model.
+        R = sample[
+            "rotation_matrix"
+        ]  # Rotation transform between the original and object space. If it is None, there is no rotation.
+        new_K = sample["intrinsics_new"]  # New camera intrinsics. If new_K is None, the original intrinsics K are reused.
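+
+        # cv2.omnidir implements the Mei omnidirectional camera model: K and xi are
+        # that model's intrinsics and mirror parameter, and new_K describes the
+        # pinhole camera of the rectified output. initUndistortRectifyMap below
+        # turns them into per-pixel lookup tables (map1/map2) that cv2.remap
+        # applies to every decoded frame.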
+ + K = np.array(K, dtype=np.float32) + xi = np.array(xi, dtype=np.float32) + + if D is None: + D = np.array([0, 0, 0, 0], dtype=np.float32) + else: + D = np.array(D, dtype=np.float32) + + if R is None: + R = np.eye(3) + else: + R = np.array(R, dtype=np.float32) + + if new_K is None: + new_K = K + else: + new_K = np.array(new_K, dtype=np.float32) + + map1, map2 = cv2.omnidir.initUndistortRectifyMap( + K, D, xi, R, new_K, (width, height), cv2.CV_16SC2, cv2.omnidir.RECTIFY_PERSPECTIVE + ) + + # Initialize the first batch ffmpeg writer + os.makedirs(self.output_video_dir, exist_ok=True) + batch_number = 0 + writer = self.create_ffmpeg_writer( + os.path.join(self.output_video_dir, f"{video_name}_b{batch_number:04d}.ts"), width, height, fps, self.crf + ) + + idx = 0 + # Read and process frames + while True: + ret, frame = cap.read() + if not ret: + # End of video stream: close the last writer + writer.stdin.close() + writer.wait() + break + + # Undistort the frame + undistorted_frame = cv2.remap( + frame, map1, map2, interpolation=cv2.INTER_CUBIC, borderMode=cv2.BORDER_CONSTANT + ) + + # Convert BGR to RGB before writing to ffmpeg (FFmpeg expects RGB) + undistorted_frame = cv2.cvtColor(undistorted_frame, cv2.COLOR_BGR2RGB) + + # Write to ffmpeg stdin + writer.stdin.write(undistorted_frame.tobytes()) + + # Check if the current batch is complete (for idx + 1) + if (idx + 1) % self.batch_size_each_video == 0: + # Finalize the current batch writer + writer.stdin.close() + writer.wait() + + # Start the next batch writer + batch_number += 1 + writer = self.create_ffmpeg_writer( + os.path.join(self.output_video_dir, f"{video_name}_b{batch_number:04d}.ts"), + width, + height, + fps, + self.crf, + ) + + idx += 1 + + cap.release() + + # Merge all temporary TS chunks into the final MP4 file + self.concatenate_ts_files(self.output_video_dir, video_name, batch_number + 1) + + sample[Fields.meta][self.tag_field_name] = { + "new_video_path": os.path.join(self.output_video_dir, f"{video_name}.mp4") + } + + return sample diff --git a/data_juicer/utils/constant.py b/data_juicer/utils/constant.py index 7ee08e75fc..d82d58056a 100644 --- a/data_juicer/utils/constant.py +++ b/data_juicer/utils/constant.py @@ -85,6 +85,12 @@ class MetaKeys(object): # # pose information pose_info = "pose_info" + # # Static Camera Calibration Info (for DeepCalib) + static_camera_calibration_deepcalib_tags = "static_camera_calibration_deepcalib_tags" + # # Static Camera Calibration Info (for Moge-2) + static_camera_calibration_moge_tags = "static_camera_calibration_moge_tags" + # # Video Undistortion Info + video_undistortion_tags = "video_undistortion_tags" # === info extraction related tags === # # for event extraction diff --git a/data_juicer/utils/model_utils.py b/data_juicer/utils/model_utils.py index 547e1370fa..e748f83e1b 100644 --- a/data_juicer/utils/model_utils.py +++ b/data_juicer/utils/model_utils.py @@ -419,6 +419,60 @@ def get_processor(): return (client, processor) +def prepare_deepcalib_model(model_path, **model_params): + + device = model_params.pop("device", None) + if device is None: + raise ValueError("video_camera_calibration_static_deepcalib_mapper currently supports GPU usage only.") + device = device.replace("cuda", "/gpu") + + if not os.path.exists(model_path): + LazyLoader.check_packages(["gdown"]) + deepcalib_folder = os.path.join(DJMC, "deepcalib") + deepcalib_model_path = os.path.join(deepcalib_folder, "Regression", "Single_net", "weights_10_0.02.h5") + os.makedirs(deepcalib_folder, exist_ok=True) + + if not 
os.path.exists(deepcalib_model_path): + + deepcalib_zip_path = os.path.join(DJMC, "deepcalib_weights.zip") + subprocess.run(["gdown", "1TYZn-f2z7O0hp_IZnNfZ06ExgU9ii70T", "-O", deepcalib_zip_path], check=True) + + import zipfile + + zip_file = zipfile.ZipFile(deepcalib_zip_path) + for names in zip_file.namelist(): + zip_file.extract(names, deepcalib_folder) + zip_file.close() + + model_path = deepcalib_model_path + + LazyLoader.check_packages(["tensorflow"]) + import tensorflow as tf + from keras.applications.inception_v3 import InceptionV3 + from keras.layers import Dense, Flatten, Input + from keras.models import Model + + gpus = tf.config.list_physical_devices("GPU") + tf.config.set_visible_devices(gpus[int(device.split(":")[-1])], "GPU") + tf.config.experimental.set_memory_growth(gpus[int(device.split(":")[-1])], True) + with tf.device(device): + input_shape = (299, 299, 3) + main_input = Input(shape=input_shape, dtype="float32", name="main_input") + phi_model = InceptionV3(weights="imagenet", include_top=False, input_tensor=main_input, input_shape=input_shape) + phi_features = phi_model.output + phi_flattened = Flatten(name="phi-flattened")(phi_features) + final_output_focal = Dense(1, activation="sigmoid", name="output_focal")(phi_flattened) + final_output_distortion = Dense(1, activation="sigmoid", name="output_distortion")(phi_flattened) + + for layer in phi_model.layers: + layer.name = layer.name + "_phi" + + model = Model(inputs=main_input, outputs=[final_output_focal, final_output_distortion]) + model.load_weights(model_path) + + return model + + def prepare_diffusion_model(pretrained_model_name_or_path, diffusion_type, **model_params): """ Prepare and load an Diffusion model from HuggingFace. @@ -592,6 +646,18 @@ def prepare_kenlm_model(lang, name_pattern="{}.arpa.bin", **model_params): return kenlm_model +def prepare_moge_model(model_path, **model_params): + + device = model_params.pop("device", "cpu") + + LazyLoader.check_packages(["moge@ git+https://github.com/microsoft/MoGe.git"]) + from moge.model.v2 import MoGeModel + + model = MoGeModel.from_pretrained(model_path).to(device) + + return model + + def prepare_nltk_model(lang, name_pattern="punkt.{}.pickle", **model_params): """ Prepare and load a nltk punkt model with enhanced resource handling. @@ -1517,12 +1583,14 @@ def _download_model(local_dir): MODEL_FUNCTION_MAPPING = { "api": prepare_api_model, + "deepcalib": prepare_deepcalib_model, "diffusion": prepare_diffusion_model, "dwpose": prepare_dwpose_model, "fasttext": prepare_fasttext_model, "fastsam": prepare_fastsam_model, "huggingface": prepare_huggingface_model, "kenlm": prepare_kenlm_model, + "moge": prepare_moge_model, "nltk": prepare_nltk_model, "nltk_pos_tagger": prepare_nltk_pos_tagger, "opencv_classifier": prepare_opencv_classifier, diff --git a/docs/Operators.md b/docs/Operators.md index 090337be92..3364995ad7 100644 --- a/docs/Operators.md +++ b/docs/Operators.md @@ -46,7 +46,7 @@ Data-Juicer 中的算子分为以下 8 种类型。 | [filter](#filter) | 56 | Filters out low-quality samples. 过滤低质量样本。 | | [formatter](#formatter) | 8 | Discovers, loads, and canonicalizes source data. 发现、加载、规范化原始数据。 | | [grouper](#grouper) | 3 | Group samples to batched samples. 将样本分组,每一组组成一个批量样本。 | -| [mapper](#mapper) | 98 | Edits and transforms samples. 对数据样本进行编辑和转换。 | +| [mapper](#mapper) | 101 | Edits and transforms samples. 对数据样本进行编辑和转换。 | | [pipeline](#pipeline) | 3 | Applies dataset-level processing; both input and output are datasets. 
执行数据集级别的操作,输入和输出均为完整数据集。 | | [selector](#selector) | 5 | Selects top samples based on ranking. 基于排序选取高质量样本。 | @@ -257,6 +257,8 @@ All the specific operators are listed below, each featured with several capabili | text_chunk_mapper | 🔤Text 💻CPU 🔗API 🟢Stable | Split input text into chunks based on specified criteria. 根据指定的条件将输入文本拆分为块。 | [info](operators/mapper/text_chunk_mapper.md) | - | | text_tagging_by_prompt_mapper | 🔤Text 🚀GPU 🌊vLLM 🧩HF 🟡Beta | Mapper to generate text tags using prompt with LLM. Mapper使用带有LLM的prompt生成文本标记。 | - | - | | vggt_mapper | 🎬Video 🚀GPU 🟡Beta | Input a video of a single scene, and use VGGT to extract information including Camera Pose, Depth Maps, Point Maps, and 3D Point Tracks. 输入单个场景的视频,并使用VGGT提取包括相机姿态、深度图、点图和3D点轨迹的信息。 | [info](operators/mapper/vggt_mapper.md) | - | +| video_camera_calibration_static_deepcalib_mapper | 🎬Video 🚀GPU 🟡Beta | Compute the camera intrinsics and field of view (FOV) for a static camera using DeepCalib. 使用DeepCalib计算静态摄像机的摄像机内部和视场 (FOV)。 | - | - | +| video_camera_calibration_static_moge_mapper | 🎬Video 🚀GPU 🟡Beta | Compute the camera intrinsics and field of view (FOV) for a static camera using Moge-2 (more accurate than DeepCalib). 使用Moge-2 (比DeepCalib更准确) 计算静态摄像机的摄像机内部函数和视场 (FOV)。 | - | - | | video_captioning_from_audio_mapper | 🔮Multimodal 🚀GPU 🧩HF 🟢Stable | Mapper to caption a video according to its audio streams based on Qwen-Audio model. 映射器根据基于qwen-audio模型的音频流为视频添加字幕。 | [info](operators/mapper/video_captioning_from_audio_mapper.md) | - | | video_captioning_from_frames_mapper | 🔮Multimodal 🚀GPU 🧩HF 🟢Stable | Generates video captions from sampled frames using an image-to-text model. 使用图像到文本模型从采样帧生成视频字幕。 | [info](operators/mapper/video_captioning_from_frames_mapper.md) | - | | video_captioning_from_summarizer_mapper | 🔮Multimodal 🚀GPU 🧩HF 🟢Stable | Mapper to generate video captions by summarizing several kinds of generated texts (captions from video/audio/frames, tags from audio/frames, ...). 映射器通过总结几种生成的文本 (来自视频/音频/帧的字幕,来自音频/帧的标签,...) 来生成视频字幕。 | [info](operators/mapper/video_captioning_from_summarizer_mapper.md) | - | @@ -276,6 +278,7 @@ All the specific operators are listed below, each featured with several capabili | video_split_by_scene_mapper | 🔮Multimodal 💻CPU 🟢Stable | Splits videos into scene clips based on detected scene changes. 根据检测到的场景变化将视频拆分为场景剪辑。 | [info](operators/mapper/video_split_by_scene_mapper.md) | - | | video_tagging_from_audio_mapper | 🎬Video 🚀GPU 🧩HF 🟢Stable | Generates video tags from audio streams using the Audio Spectrogram Transformer. 使用音频频谱图转换器从音频流生成视频标签。 | [info](operators/mapper/video_tagging_from_audio_mapper.md) | - | | video_tagging_from_frames_mapper | 🎬Video 🚀GPU 🟢Stable | Generates video tags from frames extracted from videos. 从视频中提取的帧生成视频标签。 | [info](operators/mapper/video_tagging_from_frames_mapper.md) | - | +| video_undistort_mapper | 🎬Video 💻CPU 🟡Beta | Undistort raw videos with corresponding camera intrinsics and distortion coefficients. 使用相应的相机固有特性和失真系数对原始视频进行失真。 | - | - | | video_whole_body_pose_estimation_mapper | 🎬Video 🚀GPU 🟡Beta | Input a video containing people, and use the DWPose model to extract the body, hand, feet, and face keypoints of the human subjects in the video, i.e., 2D Whole-body Pose Estimation. 输入包含人的视频,并使用DWPose模型来提取视频中人类主体的身体、手、脚和面部关键点,即2D全身姿态估计。 | - | - | | whitespace_normalization_mapper | 🔤Text 💻CPU 🟢Stable | Normalizes various types of whitespace characters to standard spaces in text samples. 
将文本样本中各种类型的空白字符规范化为标准空格。 | [info](operators/mapper/whitespace_normalization_mapper.md) | - | diff --git a/tests/ops/data/video12.mp4 b/tests/ops/data/video12.mp4 new file mode 100644 index 0000000000..24fdd261d3 Binary files /dev/null and b/tests/ops/data/video12.mp4 differ diff --git a/tests/ops/mapper/test_video_camera_calibration_static_deepcalib_mapper.py b/tests/ops/mapper/test_video_camera_calibration_static_deepcalib_mapper.py new file mode 100644 index 0000000000..194afa0d55 --- /dev/null +++ b/tests/ops/mapper/test_video_camera_calibration_static_deepcalib_mapper.py @@ -0,0 +1,75 @@ +import os +import unittest +import numpy as np + +from data_juicer.core.data import NestedDataset as Dataset +from data_juicer.ops.mapper.video_camera_calibration_static_deepcalib_mapper import VideoCameraCalibrationStaticDeepcalibMapper +from data_juicer.utils.mm_utils import SpecialTokens +from data_juicer.utils.constant import Fields, MetaKeys +from data_juicer.utils.unittest_utils import DataJuicerTestCaseBase +from data_juicer.utils.cache_utils import DATA_JUICER_ASSETS_CACHE + + +class VideoCameraCalibrationStaticDeepcalibMapperTest(DataJuicerTestCaseBase): + data_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), '..', + 'data') + vid3_path = os.path.join(data_path, 'video3.mp4') + vid4_path = os.path.join(data_path, 'video4.mp4') + vid12_path = os.path.join(data_path, 'video12.mp4') + + def _run_and_assert(self, num_proc): + ds_list = [{ + 'videos': [self.vid3_path] + }, { + 'videos': [self.vid4_path] + }, { + 'videos': [self.vid12_path] + }] + + tgt_list = [{"frame_names_shape": [49], + "intrinsics_list_shape": [49, 3, 3], + "xi_list_shape": [49], + "hfov_list_shape": [49], + "vfov_list_shape": [49]}, + {"frame_names_shape": [22], + "intrinsics_list_shape": [22, 3, 3], + "xi_list_shape": [22], + "hfov_list_shape": [22], + "vfov_list_shape": [22]}, + {"frame_names_shape": [3], + "intrinsics_list_shape": [3, 3, 3], + "xi_list_shape": [3], + "hfov_list_shape": [3], + "vfov_list_shape": [3]}] + + op = VideoCameraCalibrationStaticDeepcalibMapper( + model_path="weights_10_0.02.h5", + frame_num=1, + duration=1, + frame_dir=DATA_JUICER_ASSETS_CACHE, + output_info_dir=DATA_JUICER_ASSETS_CACHE, + ) + dataset = Dataset.from_list(ds_list) + if Fields.meta not in dataset.features: + dataset = dataset.add_column(name=Fields.meta, + column=[{}] * dataset.num_rows) + dataset = dataset.map(op.process, num_proc=num_proc, with_rank=True) + res_list = dataset.to_list() + + for sample, target in zip(res_list, tgt_list): + self.assertEqual(list(np.array(sample[Fields.meta][MetaKeys.static_camera_calibration_deepcalib_tags]["frame_names"]).shape), target["frame_names_shape"]) + self.assertEqual(list(np.array(sample[Fields.meta][MetaKeys.static_camera_calibration_deepcalib_tags]["intrinsics_list"]).shape), target["intrinsics_list_shape"]) + self.assertEqual(list(np.array(sample[Fields.meta][MetaKeys.static_camera_calibration_deepcalib_tags]["xi_list"]).shape), target["xi_list_shape"]) + self.assertEqual(list(np.array(sample[Fields.meta][MetaKeys.static_camera_calibration_deepcalib_tags]["hfov_list"]).shape), target["hfov_list_shape"]) + self.assertEqual(list(np.array(sample[Fields.meta][MetaKeys.static_camera_calibration_deepcalib_tags]["vfov_list"]).shape), target["vfov_list_shape"]) + + + def test(self): + self._run_and_assert(num_proc=1) + + def test_mul_proc(self): + self._run_and_assert(num_proc=2) + + +if __name__ == '__main__': + unittest.main() \ No newline at end of file diff --git 
a/tests/ops/mapper/test_video_camera_calibration_static_moge_mapper.py b/tests/ops/mapper/test_video_camera_calibration_static_moge_mapper.py new file mode 100644 index 0000000000..1aa03a5fbd --- /dev/null +++ b/tests/ops/mapper/test_video_camera_calibration_static_moge_mapper.py @@ -0,0 +1,89 @@ +import os +import unittest +import numpy as np + +from data_juicer.core.data import NestedDataset as Dataset +from data_juicer.ops.mapper.video_camera_calibration_static_moge_mapper import VideoCameraCalibrationStaticMogeMapper +from data_juicer.utils.mm_utils import SpecialTokens +from data_juicer.utils.constant import Fields, MetaKeys +from data_juicer.utils.unittest_utils import DataJuicerTestCaseBase +from data_juicer.utils.cache_utils import DATA_JUICER_ASSETS_CACHE + + +class VideoCameraCalibrationStaticMogeMapperTest(DataJuicerTestCaseBase): + data_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), '..', + 'data') + vid3_path = os.path.join(data_path, 'video3.mp4') + vid4_path = os.path.join(data_path, 'video4.mp4') + vid12_path = os.path.join(data_path, 'video12.mp4') + + def _run_and_assert(self, num_proc): + ds_list = [{ + 'videos': [self.vid3_path] + }, { + 'videos': [self.vid4_path] + }, { + 'videos': [self.vid12_path] + }] + + tgt_list = [{"frame_names_shape": [49], + "intrinsics_list_shape": [49, 3, 3], + "hfov_list_shape": [49], + "vfov_list_shape": [49], + "points_list_shape": [49, 640, 362, 3], + "depth_list_shape": [49, 640, 362], + "mask_list_shape": [49, 640, 362]}, + {"frame_names_shape": [22], + "intrinsics_list_shape": [22, 3, 3], + "hfov_list_shape": [22], + "vfov_list_shape": [22], + "points_list_shape": [22, 360, 480, 3], + "depth_list_shape": [22, 360, 480], + "mask_list_shape": [22, 360, 480]}, + {"frame_names_shape": [3], + "intrinsics_list_shape": [3, 3, 3], + "hfov_list_shape": [3], + "vfov_list_shape": [3], + "points_list_shape": [3, 1080, 1920, 3], + "depth_list_shape": [3, 1080, 1920], + "mask_list_shape": [3, 1080, 1920]}] + + op = VideoCameraCalibrationStaticMogeMapper( + model_path="Ruicheng/moge-2-vitl", + frame_num=1, + duration=1, + frame_dir=DATA_JUICER_ASSETS_CACHE, + if_output_info=True, + output_info_dir=DATA_JUICER_ASSETS_CACHE, + if_output_points_info=True, + if_output_depth_info=True, + if_output_mask_info=True, + ) + + dataset = Dataset.from_list(ds_list) + if Fields.meta not in dataset.features: + dataset = dataset.add_column(name=Fields.meta, + column=[{}] * dataset.num_rows) + dataset = dataset.map(op.process, num_proc=num_proc, with_rank=True) + res_list = dataset.to_list() + + + for sample, target in zip(res_list, tgt_list): + self.assertEqual(list(np.array(sample[Fields.meta][MetaKeys.static_camera_calibration_moge_tags]["frame_names"]).shape), target["frame_names_shape"]) + self.assertEqual(list(np.array(sample[Fields.meta][MetaKeys.static_camera_calibration_moge_tags]["intrinsics_list"]).shape), target["intrinsics_list_shape"]) + self.assertEqual(list(np.array(sample[Fields.meta][MetaKeys.static_camera_calibration_moge_tags]["hfov_list"]).shape), target["hfov_list_shape"]) + self.assertEqual(list(np.array(sample[Fields.meta][MetaKeys.static_camera_calibration_moge_tags]["vfov_list"]).shape), target["vfov_list_shape"]) + self.assertEqual(list(np.array(sample[Fields.meta][MetaKeys.static_camera_calibration_moge_tags]["points_list"]).shape), target["points_list_shape"]) + self.assertEqual(list(np.array(sample[Fields.meta][MetaKeys.static_camera_calibration_moge_tags]["depth_list"]).shape), target["depth_list_shape"]) + 
self.assertEqual(list(np.array(sample[Fields.meta][MetaKeys.static_camera_calibration_moge_tags]["mask_list"]).shape), target["mask_list_shape"]) + + + def test(self): + self._run_and_assert(num_proc=1) + + def test_mul_proc(self): + self._run_and_assert(num_proc=2) + + +if __name__ == '__main__': + unittest.main() \ No newline at end of file diff --git a/tests/ops/mapper/test_video_undistort_mapper.py b/tests/ops/mapper/test_video_undistort_mapper.py new file mode 100644 index 0000000000..93410ecddd --- /dev/null +++ b/tests/ops/mapper/test_video_undistort_mapper.py @@ -0,0 +1,59 @@ +import os +import unittest +import numpy as np + +from data_juicer.core.data import NestedDataset as Dataset +from data_juicer.ops.mapper.video_undistort_mapper import VideoUndistortMapper +from data_juicer.utils.mm_utils import SpecialTokens +from data_juicer.utils.constant import Fields, MetaKeys +from data_juicer.utils.unittest_utils import DataJuicerTestCaseBase +from data_juicer.utils.cache_utils import DATA_JUICER_ASSETS_CACHE + +class VideoUndistortMapperTest(DataJuicerTestCaseBase): + data_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), '..', + 'data') + vid3_path = os.path.join(data_path, 'video3.mp4') + vid12_path = os.path.join(data_path, 'video12.mp4') + + def _run_and_assert(self, output_video_dir, num_proc): + ds_list = [{ + 'videos': [self.vid3_path], + 'intrinsics': [[465.4728460758426, 0, 181.0], [0, 465.4728460758426, 320.0], [0, 0, 1]], + 'distortion_coefficients': None, + 'xi': 0.203957462310791, + 'rotation_matrix': None, + 'intrinsics_new': None + }, { + 'videos': [self.vid12_path], + 'intrinsics': [[1227.3657989501953, 0, 960.0], [0, 1227.3657989501953, 540.0], [0, 0, 1]], + 'distortion_coefficients': None, + 'xi': 0.33518279, + 'rotation_matrix': None, + 'intrinsics_new': None + }] + + tgt_key_names = ["new_video_path"] + + op = VideoUndistortMapper( + output_video_dir=output_video_dir + ) + dataset = Dataset.from_list(ds_list) + if Fields.meta not in dataset.features: + dataset = dataset.add_column(name=Fields.meta, + column=[{}] * dataset.num_rows) + dataset = dataset.map(op.process, num_proc=num_proc, with_rank=True) + res_list = dataset.to_list() + + for sample in res_list: + self.assertEqual(list(sample[Fields.meta][MetaKeys.video_undistortion_tags].keys()), tgt_key_names) + + + def test(self): + self._run_and_assert(output_video_dir=os.path.join(DATA_JUICER_ASSETS_CACHE, "output_video1"), num_proc=1) + + def test_mul_proc(self): + self._run_and_assert(output_video_dir=os.path.join(DATA_JUICER_ASSETS_CACHE, "output_video2"), num_proc=2) + + +if __name__ == '__main__': + unittest.main() \ No newline at end of file
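
For reference, a minimal sketch of chaining the new static-calibration and undistortion ops directly in Python, following the sample layout of the unit tests above; it assumes a CUDA device (prepare_deepcalib_model requires one, and downloads the weights if missing), and the video path and the use of the first frame's estimate are illustrative choices.

import os

from data_juicer.core.data import NestedDataset as Dataset
from data_juicer.ops.mapper import (
    VideoCameraCalibrationStaticDeepcalibMapper,
    VideoUndistortMapper,
)
from data_juicer.utils.cache_utils import DATA_JUICER_ASSETS_CACHE
from data_juicer.utils.constant import Fields, MetaKeys

video_path = "path/to/static_camera_clip.mp4"  # illustrative path

# 1. Estimate per-frame intrinsics and the Mei-model xi with DeepCalib.
calib_op = VideoCameraCalibrationStaticDeepcalibMapper(
    model_path="weights_10_0.02.h5",
    frame_num=1,
    duration=1,
    frame_dir=DATA_JUICER_ASSETS_CACHE,
    output_info_dir=DATA_JUICER_ASSETS_CACHE,
)
dataset = Dataset.from_list([{"videos": [video_path]}])
dataset = dataset.add_column(name=Fields.meta, column=[{}] * dataset.num_rows)
dataset = dataset.map(calib_op.process, with_rank=True)
tags = dataset.to_list()[0][Fields.meta][MetaKeys.static_camera_calibration_deepcalib_tags]

# 2. Undistort the video with the first frame's estimate (for a static camera,
#    averaging the per-frame estimates would be a reasonable alternative).
undistort_op = VideoUndistortMapper(
    output_video_dir=os.path.join(DATA_JUICER_ASSETS_CACHE, "undistorted"),
)
sample = {
    "videos": [video_path],
    "intrinsics": tags["intrinsics_list"][0],
    "distortion_coefficients": None,
    "xi": tags["xi_list"][0],
    "rotation_matrix": None,
    "intrinsics_new": None,
    Fields.meta: {},
}
result = undistort_op.process_single(sample)
print(result[Fields.meta][MetaKeys.video_undistortion_tags]["new_video_path"])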