from datetime import datetime, timedelta, timezone
from MP4Manager import get_duration
import os, json, subprocess, shutil

def is_file_empty(filepath):
    return os.stat(filepath).st_size == 0

def format_datetime(datetime_str):
    """Parse a "%Y-%m-%d %H:%M:%S" datetime string into a datetime object."""
    return datetime.strptime(datetime_str, "%Y-%m-%d %H:%M:%S")

def get_file_size_in_mb(file_path):
    return os.path.getsize(file_path) / (1024 ** 2)

def get_file_size_gb(file_path):
    return os.path.getsize(file_path) / (1024 ** 3)

def get_data(data_path):
    try:
        with open(data_path, 'r') as file:
            return json.load(file)
    except Exception as e:
        print(f"Error loading {data_path}: {e}")
        return None

def update_video_data(dataPath, data):
    """Update or create a JSON file for the video metadata."""
    if os.path.exists(dataPath):
        with open(dataPath, "r") as f:
            existing_data = json.load(f)
        if existing_data == data:
            return  # No update needed if data hasn't changed.
    data["updatedAt"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    with open(dataPath, "w") as f:
        json.dump(data, f)  # Write to file if new or if data has changed.

def is_recent(updated_at_str, minutes=30):
    updated_at = format_datetime(updated_at_str).replace(tzinfo=timezone.utc)
    now = datetime.now(timezone.utc)
    return now - updated_at < timedelta(minutes=minutes)

def is_file_size_bigger_than(file_size_in_mb, max_size_gb):
    """Check whether the file size (in MB) is bigger than the specified max size in GB."""
    max_size_megabytes = max_size_gb * 1024  # Convert GB to MB
    return file_size_in_mb > max_size_megabytes

def cleanup_data_files(folder_path):
    """Remove orphaned .json metadata files whose matching .mp4 no longer exists."""
    data_files = [f for f in os.listdir(folder_path) if f.endswith(".json")]
    for filename in data_files:
        json_path = os.path.join(folder_path, filename)
        video_path = json_path.replace(".json", ".mp4")
        if not os.path.exists(video_path):
            os.remove(json_path)

def get_video_data(videoPath):
    with open(videoPath, "r") as f:
        return json.load(f)
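
# The metadata .json files are expected to carry at least the fields used below
# (an illustrative example; exact contents depend on whatever wrote the files):
#
#   {"videoID": "abc123", "username": "some_user", "site": "some_site",
#    "createdAt": "2024-01-01 12:00:00", "updatedAt": "2024-01-01 12:30:00"}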

def get_videos(folder_path):
    """Retrieve video metadata from the JSON files in a specified folder."""
    video_list = []
    # List all .mp4 files and look up their corresponding .json metadata files
    videos = [f for f in os.listdir(folder_path) if f.endswith(".mp4")]
    for video_filename in videos:
        video_path = os.path.join(folder_path, video_filename)
        json_path = video_path.replace(".mp4", ".json")
        if not os.path.exists(json_path):
            continue
        data = get_video_data(json_path)
        data['size'] = get_file_size_in_mb(video_path)  # Include size in MB for further processing
        data['filepath'] = video_path
        video_list.append(data)
    return video_list

def group_videos(video_list, sort_by="count", order="desc"):
    """Group video data by username and site, and sort the groups by video creation time."""
    video_data = {}
    is_desc = order == "desc"
    for video in video_list:
        key = (video["username"], video["site"])
        if key not in video_data:
            video_data[key] = []
        video_data[key].append(video)
    # Ensure videos for each user and site are sorted by creation date
    for key in video_data:
        video_data[key].sort(key=lambda x: format_datetime(x["createdAt"]))
    # Further sort groups if required, based on total size or video count
    if sort_by == "size":
        video_data = dict(sorted(video_data.items(), key=lambda x: sum(item['size'] for item in x[1]), reverse=is_desc))
    elif sort_by == "count":
        video_data = dict(sorted(video_data.items(), key=lambda x: len(x[1]), reverse=is_desc))
    return video_data
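
# Illustrative usage (a sketch; the "videos" folder name is an assumption, any
# directory containing paired .mp4/.json files works):
#
#   videos = get_videos("videos")
#   grouped = group_videos(videos, sort_by="size", order="desc")
#   for (username, site), items in grouped.items():
#       total_mb = sum(v['size'] for v in items)
#       print(f"{username}@{site}: {len(items)} videos, {total_mb:.1f} MB")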

def process_videos(video_data):
    processed_videos = []
    failed_directory = "failed"
    for video in video_data:
        is_updated = False
        video_path = video["filepath"]
        data_path = video["jsonpath"]
        if 'size' not in video:
            video['size'] = get_file_size_in_mb(video_path)
            is_updated = True
        if 'duration' not in video:
            video['duration'] = get_duration(video_path)
            is_updated = True
        # Move corrupted videos (zero duration) to the failed folder
        if video['duration'] == 0:
            print(f"{video['videoID']} is corrupted, moving to failed folder")
            os.makedirs(failed_directory, exist_ok=True)
            failed_video_path = os.path.join(failed_directory, video["videoID"] + ".mp4")
            failed_data_path = failed_video_path.replace(".mp4", ".json")
            shutil.move(video_path, failed_video_path)
            shutil.move(data_path, failed_data_path)
            continue  # Skip further processing for this video
        if is_updated:
            update_video_data(data_path, video)
        processed_videos.append(video)
    return processed_videos

def group_for_concatenation(videos, time_limit=30):
    """
    Groups videos into lists where:
      - total group size <= 9GB (9216 MB),
      - time gap between consecutive videos <= time_limit minutes,
      - AND all have the same resolution/fps/codecs for no-reencode concat.
    """
    concatenated_video_groups = []
    current_group = []
    current_size_mb = 0
    last_video_end = None
    reference_params = None  # ffprobe params of the first video in the current group
    for video in videos:
        video_start = format_datetime(video['createdAt'])
        video_end = video_start + timedelta(seconds=video['duration'])
        # Probe the video to get its stream parameters
        video_path = video['filepath']
        params = get_video_params(video_path)
        if params is None:
            # If ffprobe fails, skip this video
            print(f"Skipping {video_path}, failed to get ffprobe info.")
            continue
        if current_group:
            # Check whether adding this video breaks the size or time-gap limits
            time_difference = (video_start - last_video_end).total_seconds() / 60
            size_exceeded = (current_size_mb + video['size'] > 9216)
            time_exceeded = (time_difference > time_limit)
            # Check whether the video parameters match the group's reference
            param_mismatch = False
            if reference_params:
                # Compare the fields relevant for a no-reencode concat
                for field in ['video_codec', 'width', 'height', 'pix_fmt', 'fps',
                              'audio_codec', 'audio_sample_rate', 'audio_channels', 'audio_channel_layout']:
                    if params[field] != reference_params[field]:
                        param_mismatch = True
                        break
            # If we exceed the size limit, exceed the time gap, or mismatch in parameters => start a new group
            if size_exceeded or time_exceeded or param_mismatch:
                concatenated_video_groups.append(current_group)
                current_group = []
                current_size_mb = 0
                reference_params = None  # reset for the new group
        # If we're starting a new group, set the reference parameters
        if not current_group:
            reference_params = params
        # Add the current video to the group
        current_group.append(video)
        current_size_mb += video['size']
        last_video_end = video_end
    # Add the last group if not empty
    if current_group:
        concatenated_video_groups.append(current_group)
    # Ensure the last group is ready for upload: if its last video was created
    # less than time_limit minutes ago, more footage may still arrive, so drop it.
    if concatenated_video_groups:
        last_group = concatenated_video_groups[-1]
        last_video = last_group[-1]
        last_created_at = format_datetime(last_video['createdAt'])
        if datetime.now() - last_created_at <= timedelta(minutes=time_limit):
            print("Last group is not ready for upload. Removing from final groups.")
            concatenated_video_groups.pop()
    # Only keep groups that actually need concatenation (more than one video)
    concatenated_video_groups = [group for group in concatenated_video_groups if len(group) > 1]
    return concatenated_video_groups
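
# Sketch of how grouping feeds concatenation ("processed" is assumed to come
# from process_videos() above, and "recordings" is a hypothetical directory):
#
#   groups = group_for_concatenation(processed, time_limit=30)
#   merged = concatenate_videos(groups, "recordings")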

def get_video_params(video_path):
    """
    Run ffprobe on a given video path to extract:
      - codec_name (video + audio)
      - width, height
      - pix_fmt
      - r_frame_rate (frame rate)
      - sample_rate, channels, channel_layout (audio)
    Returns a dict with these parameters, or None if probing fails.
    """
    cmd = [
        'ffprobe', '-v', 'error',
        '-print_format', 'json',
        '-show_streams',
        '-show_format',
        video_path
    ]
    try:
        result = subprocess.run(cmd, capture_output=True, text=True, check=True)
        info = json.loads(result.stdout)
    except (subprocess.CalledProcessError, json.JSONDecodeError) as e:
        print(f"Failed to run ffprobe on {video_path}: {e}")
        return None
    # Parse out the first video and audio streams we find.
    video_stream = next((s for s in info.get('streams', []) if s['codec_type'] == 'video'), None)
    audio_stream = next((s for s in info.get('streams', []) if s['codec_type'] == 'audio'), None)
    if not video_stream:
        print(f"No video stream found in {video_path}")
        return None
    # Frame rate can be a fraction like "30000/1001" - convert it to a float
    r_frame_rate = video_stream.get('r_frame_rate', '0/0')
    try:
        num, den = r_frame_rate.split('/')
        fps = float(num) / float(den) if float(den) != 0 else 0.0
    except (ValueError, ZeroDivisionError):
        fps = 0.0
    # Gather the key parameters used to decide whether two files can be
    # concatenated with "-c copy" (no re-encode)
    return {
        'video_codec': video_stream.get('codec_name', 'unknown'),
        'width': video_stream.get('width', 0),
        'height': video_stream.get('height', 0),
        'pix_fmt': video_stream.get('pix_fmt', 'unknown'),
        'fps': fps,
        'audio_codec': audio_stream.get('codec_name', 'none') if audio_stream else 'none',
        'audio_sample_rate': audio_stream.get('sample_rate', '0') if audio_stream else '0',
        'audio_channels': audio_stream.get('channels', 0) if audio_stream else 0,
        'audio_channel_layout': audio_stream.get('channel_layout', 'none') if audio_stream else 'none'
    }
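
# Shape of the dict returned on success (values are illustrative only and
# depend entirely on the probed file):
#
#   {'video_codec': 'h264', 'width': 1920, 'height': 1080, 'pix_fmt': 'yuv420p',
#    'fps': 30.0, 'audio_codec': 'aac', 'audio_sample_rate': '44100',
#    'audio_channels': 2, 'audio_channel_layout': 'stereo'}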

def generate_list_file(videos):
    """Write an ffmpeg concat list file next to the first video and return its path."""
    directory = os.path.dirname(videos[0]["filepath"])
    list_filename = os.path.join(directory, f"{videos[0]['videoID']}.txt")
    with open(list_filename, "w") as list_file:
        for video in videos:
            list_file.write(f"file '{video['videoID']}.mp4'\n")
    return list_filename
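
# The generated list follows ffmpeg's concat demuxer format, one entry per line,
# with relative paths resolved against the list file's own directory, e.g.:
#
#   file 'abc123.mp4'
#   file 'abc124.mp4'
#
# It is consumed below via: ffmpeg -f concat -safe 0 -i <list> -c copy <out>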

def concatenate_videos(grouped_videos, directory):
    """Concatenate pre-grouped videos, updating metadata and managing file operations."""
    processed_videos = []
    for group in grouped_videos:
        if len(group) == 1:
            processed_videos.append(group[0])
            continue
        # Set up paths based on the first video in the group
        first_video = group[0]
        video_path = first_video["filepath"]
        data_path = video_path.replace(".mp4", ".json")
        temp_path = video_path.replace(".mp4", "_temp.mp4")
        # Generate a list file for ffmpeg concatenation
        list_filename = generate_list_file(group)
        # Run ffmpeg to concatenate the videos; check=True so the originals are
        # only removed after a successful concat
        subprocess.run(["ffmpeg", "-f", "concat", "-safe", "0", "-i", list_filename, "-c", "copy", temp_path], check=True)
        # Remove the individual video files and their metadata
        for v in group:
            os.remove(v["filepath"])
            os.remove(v["filepath"].replace(".mp4", ".json"))
        os.remove(list_filename)
        os.rename(temp_path, video_path)
        # Update the metadata for the concatenated video
        first_video["filepath"] = video_path
        first_video["size"] = get_file_size_in_mb(video_path)
        first_video["duration"] = get_duration(video_path)
        update_video_data(data_path, first_video)  # Persist the metadata of the concatenated file
        processed_videos.append(first_video)
    return processed_videos

def get_all_videos(directory):
    # Find all .mp4 files in the directory and its subdirectories
    videos = []
    for root, dirs, files in os.walk(directory):
        for file in files:
            if file.endswith(".mp4"):
                videos.append(os.path.join(root, file))
    return videos

def get_all_data(directory):
    # Find all .json files in the directory and its subdirectories
    data = []
    for root, dirs, files in os.walk(directory):
        for file in files:
            if file.endswith(".json"):
                data.append(os.path.join(root, file))
    return data

def match_data_to_video_fast(videos, data):
    data_dict = {os.path.splitext(os.path.basename(d))[0]: d for d in data}
    matched, unmatched = [], []
    for v in videos:
        video_id = os.path.splitext(os.path.basename(v))[0]
        if video_id in data_dict:
            matched.append((v, data_dict[video_id]))
        else:
            unmatched.append(v)
    return parse_video_data(matched), unmatched

def parse_video_data(matched_videos):
    """Load the JSON metadata for each (video, json) pair and attach the file paths."""
    import tqdm
    video_list = []
    with tqdm.tqdm(total=len(matched_videos), desc="Parsing video data") as pbar:
        for video_path, json_path in matched_videos:
            pbar.update(1)
            data = get_video_data(json_path)
            data['filepath'] = video_path
            data['jsonpath'] = json_path
            video_list.append(data)
    return video_list

def get_videos_matched(video_dirs, data_dirs):
    # Collect all videos
    videos = []
    for d in video_dirs:
        videos += get_all_videos(d)
    # Collect all metadata files
    data = []
    for d in data_dirs:
        data += get_all_data(d)
    # Match the metadata to the videos
    parsed_videos, unmatched = match_data_to_video_fast(videos, data)
    return parsed_videos, unmatched

def calculate_file_hash(file_path):
    import hashlib
    # Hash in chunks so large video files are not read into memory all at once
    sha256 = hashlib.sha256()
    with open(file_path, 'rb') as f:
        for chunk in iter(lambda: f.read(1024 * 1024), b''):
            sha256.update(chunk)
    return sha256.hexdigest()
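
if __name__ == "__main__":
    # Minimal end-to-end sketch, not the canonical entry point: "videos/" and
    # "metadata/" are assumed directory names, and every .json file is assumed
    # to carry the videoID/createdAt fields used above. Adjust to your setup.
    parsed, unmatched = get_videos_matched(["videos"], ["metadata"])
    if unmatched:
        print(f"{len(unmatched)} videos have no matching metadata and were skipped")
    processed = process_videos(parsed)
    groups = group_for_concatenation(processed, time_limit=30)
    concatenate_videos(groups, "videos")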