import subprocess
import json
import os
import tempfile
import shutil

from video_funcs import get_video_info, get_target_bitrate, get_target_resolution, get_fps

TEMP_DIR = ".temp"
CONCATED_DIR = "concated"


# --- helpers --------------------------------------------------------------- #
def get_signature(fp: str) -> tuple:
    """
    Return the stream 'signature' of a media file, as reported by ffprobe.

    A signature is everything that has to match for a bit-perfect concat:
      - video: codec, width, height, fps (as a float), pix_fmt, color_range
      - audio: codec, sample_rate, channels, channel_layout

    The audio entries are all ``None`` when the file has no audio stream.

    Raises:
        StopIteration: if ffprobe reports no video stream.
        subprocess.CalledProcessError: if ffprobe exits with an error.
    """

    def ffprobe_json(fp: str) -> dict:
        """Return the full ffprobe-JSON for a media file."""
        cmd = [
            "ffprobe", "-v", "quiet",
            "-print_format", "json",
            "-show_streams", "-show_format",
            fp,
        ]
        return json.loads(subprocess.check_output(cmd, text=True))

    def fps(stream) -> float:
        """Convert ffprobe's 'num/den' r_frame_rate into a rounded float."""
        fr = stream.get("r_frame_rate", "0/0")
        num, den = map(int, fr.split("/"))
        # A zero denominator means the rate is unknown; report 0.0 instead of dividing.
        return round(num / den, 3) if den else 0.0

    info = ffprobe_json(fp)

    v_stream = next(s for s in info["streams"] if s["codec_type"] == "video")
    a_stream = next((s for s in info["streams"] if s["codec_type"] == "audio"), None)

    sig = (
        v_stream["codec_name"],
        int(v_stream["width"]),
        int(v_stream["height"]),
        fps(v_stream),
        v_stream.get("pix_fmt"),
        v_stream.get("color_range"),
        a_stream["codec_name"] if a_stream else None,
        int(a_stream["sample_rate"]) if a_stream else None,
        a_stream.get("channels") if a_stream else None,
        a_stream.get("channel_layout") if a_stream else None,
    )
    return sig


def all_signatures_equal(videos) -> bool:
    """
    Return True when every video has the same signature as the first one.

    ``videos`` is a non-empty sequence of dicts carrying a ``"filepath"`` key.
    """
    ref = get_signature(videos[0]["filepath"])
    return all(get_signature(v["filepath"]) == ref for v in videos[1:])


# --- concat functions --------------------------------------------------------------- #
def _concat_list_entry(path: str) -> str:
    """Build one ``file '...'`` line for ffmpeg's concat demuxer list file."""
    # Inside an ffmpeg single-quoted string nothing can be escaped, so an
    # embedded quote must close the string, emit an escaped quote, and reopen
    # it: ' -> '\''
    escaped = os.path.abspath(path).replace("'", "'\\''")
    return f"file '{escaped}'\n"


def concat_copy(videos, out_path) -> bool:
    """
    Lossless concat with the *concat demuxer* (-c copy).

    Writes a temporary list file naming every input, runs ffmpeg on it, and
    treats both a non-zero exit code and known corruption messages in stderr
    as failure. On failure any output written to ``out_path`` is removed.

    Returns:
        True on success, False on failure.
    """
    # ffmpeg reads the list file as UTF-8, so do not rely on the locale encoding.
    with tempfile.NamedTemporaryFile(
        "w", suffix=".txt", delete=False, encoding="utf-8"
    ) as f:
        for v in videos:
            f.write(_concat_list_entry(v["filepath"]))
        list_file = f.name

    cmd = [
        "ffmpeg", "-y",
        "-f", "concat", "-safe", "0",
        "-i", list_file,
        "-c", "copy",
        out_path,
    ]

    try:
        result = subprocess.run(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            encoding="utf-8",
            errors="replace",  # never let odd bytes in ffmpeg's log raise
        )
    finally:
        # Remove the list file even when ffmpeg could not be started.
        os.unlink(list_file)

    # Look for specific error patterns in FFmpeg's stderr
    ffmpeg_errors = [
        "corrupt input packet",
        "Invalid OBU",
        "Failed to parse temporal unit",
        "Packet corrupt",
        "partial file",
        "Non-monotonic DTS",
    ]

    if result.returncode != 0 or any(err in result.stderr for err in ffmpeg_errors):
        print("❌ FFmpeg concat failed or produced corrupted output.")
        print("FFmpeg stderr:")
        print(result.stderr)

        # Remove broken file if it exists
        if os.path.exists(out_path):
            os.remove(out_path)
            print(f"🗑️ Removed corrupt output: {out_path}")

        return False

    print("✅ FFmpeg concat completed successfully.")
    return True


def concatenate_videos(videos_list, reencode_concate = False):
    """
    Concatenate pre-grouped videos into a single file.

    A lossless stream-copy concat is attempted first. Only when that fails
    and ``reencode_concate`` is true are the inputs re-encoded and
    concatenated via ``encode_concatenate_videos``.

    Returns:
        The first entry of ``videos_list`` on success, otherwise False
        (including when fewer than two videos are given).
    """
    if len(videos_list) <= 1:
        return False

    copy_concat = copy_concatenate_videos(videos_list)

    if copy_concat:
        return copy_concat

    if not reencode_concate:
        return False

    print("Falling back to re-encoding due to concat failure.")
    return encode_concatenate_videos(videos_list)


def copy_concatenate_videos(videos_list):
    """
    Losslessly concatenate videos whose stream signatures all match.

    The result is written to TEMP_DIR, the originals are deleted, and the
    result is then moved into CONCATED_DIR under the first video's file name.

    Returns:
        The first entry of ``videos_list`` on success, otherwise False.
    """
    # NOTE(review): these names are also defined in this file; confirm whether
    # concat_helper is this module or a separate copy of the helpers.
    from concat_helper import all_signatures_equal, concat_copy

    if not (len(videos_list) > 1 and all_signatures_equal(videos_list)):
        print("Streams are not compatible for lossless concat.")
        return False

    print("All streams are compatible – attempting lossless concat …")
    main_video = videos_list[0]
    video_path = main_video["filepath"]

    output_path = os.path.join(TEMP_DIR, os.path.basename(video_path))

    # ffmpeg does not create missing parent directories for its output.
    os.makedirs(TEMP_DIR, exist_ok=True)
    os.makedirs(CONCATED_DIR, exist_ok=True)

    success = concat_copy(videos_list, output_path)
    if not success:
        return False

    # Remove originals first: the final destination may be the same path as
    # one of them, so deleting after the move could delete the result.
    for v in videos_list:
        os.remove(v["filepath"])

    # move temp to concated folder (shutil.move also works across filesystems)
    shutil.move(output_path, os.path.join(CONCATED_DIR, os.path.basename(video_path)))
    return main_video


def encode_concatenate_videos(videos_list):
    """
    Encode and concatenate videos without ffmpeg spam in console.

    Every input is forced to a common frame rate and resolution, then all
    inputs are concatenated and encoded (AV1 via NVENC, AAC audio) in a
    single ffmpeg invocation. On success the originals are deleted and the
    result is moved into CONCATED_DIR under the first video's file name.

    Returns:
        The first entry of ``videos_list`` on success, False if ffmpeg fails.
    """
    main_video = videos_list[0]
    video_path = main_video["filepath"]

    os.makedirs(TEMP_DIR, exist_ok=True)
    os.makedirs(CONCATED_DIR, exist_ok=True)

    temp_path = os.path.join(TEMP_DIR, os.path.basename(video_path))
    output_path = os.path.join(CONCATED_DIR, os.path.basename(video_path))

    video_info = get_video_info(videos_list[0]['filepath'])
    current_bitrate = int(video_info.get('bitrate') or 0)

    target_width, target_height = get_target_resolution(videos_list)
    target_bitrate_kbps = get_target_bitrate(target_width, target_height)

    # Never encode above the first input's bitrate when it is known.
    if current_bitrate > 0:
        target_bitrate_kbps = min(target_bitrate_kbps, current_bitrate)
        max_bitrate_kbps = min(int(1.5 * target_bitrate_kbps), current_bitrate)
    else:
        max_bitrate_kbps = int(1.5 * target_bitrate_kbps)

    fps_float = get_fps(video_path) or video_info.get('fps') or 30.0
    if fps_float <= 0:
        fps_float = 30.0
    # NOTE(review): the keyframe interval follows the first input's fps, while
    # the filter graph below forces the output to unified_fps.
    keyframe_interval = int(fps_float)

    print(f"Concatenating {len(videos_list)} videos into {temp_path}")
    print(f"  Mode Resolution: {target_width}x{target_height}")
    print(f"  Target Bitrate: {target_bitrate_kbps}k (max ~{max_bitrate_kbps}k)")
    print(f"  Keyframe Interval: {keyframe_interval}")

    cmd = ["ffmpeg", "-y"]
    for v in videos_list:
        cmd.extend(["-i", v["filepath"]])

    filter_statements = []
    concat_streams = []
    n = len(videos_list)
    unified_fps = 30

    # Normalise each input's video, then pair it with that input's audio.
    # The graph references [i:a] for every input, so each one needs audio.
    for i in range(n):
        filter_statements.append(
            f"[{i}:v]fps={unified_fps},scale={target_width}:{target_height}[v{i}]"
        )
        concat_streams.append(f"[v{i}][{i}:a]")

    concat_line = "".join(concat_streams) + f"concat=n={n}:v=1:a=1[outv][outa]"
    filter_statements.append(concat_line)
    filter_complex = ";".join(filter_statements)

    cmd.extend([
        "-filter_complex", filter_complex,
        "-map", "[outv]",
        "-map", "[outa]",
        "-c:v", "av1_nvenc",
        "-b:v", f"{target_bitrate_kbps}k",
        "-maxrate", f"{max_bitrate_kbps}k",
        "-bufsize", f"{max_bitrate_kbps}k",
        "-preset", "p5",
        "-g", str(keyframe_interval),
        "-c:a", "aac",
        "-b:a", "192k",
        temp_path
    ])

    try:
        subprocess.run(cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
    except subprocess.CalledProcessError:
        # Drop any partial output, but never delete a file that is itself
        # one of the inputs.
        input_paths = {os.path.abspath(v["filepath"]) for v in videos_list}
        if os.path.abspath(temp_path) not in input_paths and os.path.exists(temp_path):
            os.remove(temp_path)
        return False

    # Remove originals first: the final destination may be the same path as
    # one of them, so deleting after the move could delete the result.
    for video in videos_list:
        os.remove(video["filepath"])

    shutil.move(temp_path, output_path)
    return main_video