You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

115 lines
3.7 KiB
Python

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import subprocess
import json
import os
import tempfile
# --- helpers --------------------------------------------------------------- #
def ffprobe_json(fp: str) -> dict:
"""Return the full ffprobe-JSON for a media file."""
cmd = [
"ffprobe", "-v", "quiet", "-print_format", "json",
"-show_streams", "-show_format", fp
]
return json.loads(subprocess.check_output(cmd, text=True))
def get_signature(fp: str) -> tuple:
"""
A signature is everything that has to match for a bit-perfect concat:
video: codec, width, height, fps (as a float), pix_fmt, color_range
audio: codec, sample_rate, channels, channel_layout
"""
info = ffprobe_json(fp)
v_stream = next(s for s in info["streams"] if s["codec_type"] == "video")
a_stream = next((s for s in info["streams"] if s["codec_type"] == "audio"), None)
def fps(stream):
fr = stream.get("r_frame_rate", "0/0")
num, den = map(int, fr.split("/"))
return round(num / den, 3) if den else 0.0
sig = (
v_stream["codec_name"],
int(v_stream["width"]), int(v_stream["height"]),
fps(v_stream),
v_stream.get("pix_fmt"),
v_stream.get("color_range"),
a_stream["codec_name"] if a_stream else None,
int(a_stream["sample_rate"]) if a_stream else None,
a_stream.get("channels") if a_stream else None,
a_stream.get("channel_layout") if a_stream else None,
)
return sig
def all_signatures_equal(videos):
ref = get_signature(videos[0]["filepath"])
return all(get_signature(v["filepath"]) == ref for v in videos[1:])
def concat_copy(videos, out_path):
"""Lossless concat with the *concat demuxer* (-c copy)."""
with tempfile.NamedTemporaryFile("w", suffix=".txt", delete=False) as f:
for v in videos:
f.write(f"file '{os.path.abspath(v['filepath']).replace('\'', '\\\'')}'\n")
list_file = f.name
cmd = [
"ffmpeg", "-y",
"-f", "concat", "-safe", "0",
"-i", list_file,
"-c", "copy",
out_path,
]
print("Running FFmpeg concat...")
result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
os.unlink(list_file)
# Look for specific error patterns in FFmpeg's stderr
ffmpeg_errors = [
"corrupt input packet",
"Invalid OBU",
"Failed to parse temporal unit",
"Packet corrupt",
"partial file",
"Non-monotonic DTS"
]
if result.returncode != 0 or any(err in result.stderr for err in ffmpeg_errors):
print("❌ FFmpeg concat failed or produced corrupted output.")
# Remove broken file if it exists
if os.path.exists(out_path):
os.remove(out_path)
print(f"🗑️ Removed corrupt output: {out_path}")
return False
print("✅ FFmpeg concat completed successfully.")
return True
def copy_concatenate_videos(videos_list):
if not (len(videos_list) > 1 and all_signatures_equal(videos_list)):
print("Streams are not compatible for lossless concat.")
return False
print("All streams are compatible attempting lossless concat …")
main_video = videos_list[0]
video_path = main_video["filepath"]
output_path = os.path.join("temp", os.path.basename(video_path))
os.makedirs("concated", exist_ok=True)
success = concat_copy(videos_list, output_path)
if not success:
print("Falling back to re-encoding due to concat failure.")
return False
# Remove originals
for v in videos_list:
os.remove(v["filepath"])
# move temp to concated folder
os.rename(output_path, os.path.join("concated", os.path.basename(video_path)))
return main_video