import json
import os
import shutil
import subprocess
from datetime import datetime, timedelta, timezone

from VideoManager import get_duration


def is_file_empty(filepath):
    return os.stat(filepath).st_size == 0


def format_datetime(datetime_str):
    """Parse a "YYYY-MM-DD HH:MM:SS" string into a datetime object."""
    return datetime.strptime(datetime_str, "%Y-%m-%d %H:%M:%S")


def get_file_size_in_mb(file_path):
    return os.path.getsize(file_path) / (1024 ** 2)


def get_file_size_gb(file_path):
    return os.path.getsize(file_path) / (1024 ** 3)


def get_data(data_path):
    try:
        with open(data_path, 'r') as file:
            return json.load(file)
    except Exception as e:
        print(f"Error loading {data_path}: {e}")
        return None


def update_video_data(data_path, data):
    """Update or create a JSON file for the video metadata."""
    if os.path.exists(data_path):
        with open(data_path, "r") as f:
            existing_data = json.load(f)

        if existing_data == data:
            return  # No update needed if data hasn't changed.

    data["updatedAt"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    with open(data_path, "w") as f:
        json.dump(data, f)  # Write to file if new or if data has changed.


def is_recent(updated_at_str, minutes=30):
    updated_at = format_datetime(updated_at_str).replace(tzinfo=timezone.utc)
    now = datetime.now(timezone.utc)
    return now - updated_at < timedelta(minutes=minutes)


def is_file_size_bigger_than(file_size_in_mb, max_size_gb):
    """Check if the file size is bigger than the specified max size in GB."""
    max_size_megabytes = max_size_gb * 1024  # Convert GB to MB
    return file_size_in_mb > max_size_megabytes


def cleanup_data_files(folder_path):
    """Delete orphaned .json metadata files whose .mp4 no longer exists."""
    json_files = [f for f in os.listdir(folder_path) if f.endswith(".json")]
    for filename in json_files:
        json_path = os.path.join(folder_path, filename)
        video_path = json_path.replace(".json", ".mp4")
        if not os.path.exists(video_path):
            os.remove(json_path)


def get_video_data(json_path):
    """Load video metadata from its JSON sidecar file."""
    with open(json_path, "r") as f:
        return json.load(f)


def get_videos(folder_path):
    """Retrieve video metadata from the JSON files in a specified folder."""
    video_list = []

    # List all .mp4 files and their corresponding .json metadata files
    videos = [f for f in os.listdir(folder_path) if f.endswith(".mp4")]

    for video_filename in videos:
        video_path = os.path.join(folder_path, video_filename)
        json_path = video_path.replace(".mp4", ".json")

        if not os.path.exists(json_path):
            continue

        data = get_video_data(json_path)
        data['size'] = get_file_size_in_mb(video_path)  # Include size in MB for further processing
        data['filepath'] = video_path
        data['jsonpath'] = json_path  # Keep the sidecar path so later steps (e.g. process_videos) can update it

        video_list.append(data)

    return video_list


def group_videos(video_list, sort_by="count", order="desc"):
    """Group video data by username and site, and sort the groups by video creation time."""
    video_data = {}
    is_desc = order == "desc"

    for video in video_list:
        key = (video["username"], video["site"])
        if key not in video_data:
            video_data[key] = []
        video_data[key].append(video)

    # Ensure videos for each user and site are sorted by creation date
    for key in video_data:
        video_data[key].sort(key=lambda x: format_datetime(x["createdAt"]))

    # Further sort groups if required based on size or count
    if sort_by == "size":
        video_data = dict(sorted(video_data.items(), key=lambda x: sum(item['size'] for item in x[1]), reverse=is_desc))
    elif sort_by == "count":
        video_data = dict(sorted(video_data.items(), key=lambda x: len(x[1]), reverse=is_desc))

    return video_data
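
# Illustrative usage sketch (not part of the pipeline): the metadata schema
# assumed here is the one populated by get_videos, i.e. each entry carries
# "username", "site", "createdAt", and "size".
#
#   groups = group_videos(get_videos("downloads"), sort_by="size", order="desc")
#   for (username, site), vids in groups.items():
#       total_mb = sum(v['size'] for v in vids)
#       print(f"{username}@{site}: {len(vids)} videos, {total_mb:.0f} MB")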


def process_videos(video_data):
    processed_videos = []
    failed_directory = "failed"
    os.makedirs(failed_directory, exist_ok=True)  # shutil.move fails if the target folder is missing

    for video in video_data:
        is_updated = False
        video_path = video["filepath"]
        data_path = video["jsonpath"]

        if 'size' not in video:
            video['size'] = get_file_size_in_mb(video_path)
            is_updated = True

        # Probe the duration whenever it is missing, not only when the size was just added
        if 'duration' not in video:
            video['duration'] = get_duration(video_path)
            is_updated = True

        # Move corrupted videos to the failed folder
        if video['duration'] == 0:
            print(f"{video['videoID']} is corrupted, moving to failed folder")
            failed_video_path = os.path.join(failed_directory, video["videoID"] + ".mp4")
            failed_data_path = failed_video_path.replace(".mp4", ".json")

            shutil.move(video_path, failed_video_path)
            shutil.move(data_path, failed_data_path)

            continue  # Skip further processing for this video

        if is_updated:
            update_video_data(data_path, video)

        processed_videos.append(video)

    return processed_videos


def group_for_concatenation(videos, time_limit=30):
    """
    Groups videos into lists where:
    - total group size <= 9GB (9216 MB),
    - time gap between consecutive videos <= time_limit minutes,
    - AND all have the same resolution/fps/codecs for no-reencode concat.
    """
    concatenated_video_groups = []
    current_group = []
    current_size_mb = 0
    last_video_end = None
    reference_params = None  # ffprobe params of the first video in the current group

    for video in videos:
        video_start = format_datetime(video['createdAt'])
        video_end = video_start + timedelta(seconds=video['duration'])

        # Probe the video to get its stream parameters
        video_path = video['filepath']
        params = get_video_params(video_path)
        if params is None:
            # If ffprobe fails, skip this video
            print(f"Skipping {video_path}, failed to get ffprobe info.")
            continue

        if current_group:
            # Check if adding this video breaks the size or time-gap limits
            time_difference = (video_start - last_video_end).total_seconds() / 60
            size_exceeded = (current_size_mb + video['size'] > 9216)
            time_exceeded = (time_difference > time_limit)

            # Check if the video parameters match the group's reference
            param_mismatch = False
            if reference_params:
                # Compare the fields relevant for stream-copy concatenation
                for field in ['video_codec', 'width', 'height', 'pix_fmt', 'fps',
                              'audio_codec', 'audio_sample_rate', 'audio_channels', 'audio_channel_layout']:
                    if params[field] != reference_params[field]:
                        param_mismatch = True
                        break

            # If we exceed size, exceed time gap, or mismatch in parameters => start a new group
            if size_exceeded or time_exceeded or param_mismatch:
                concatenated_video_groups.append(current_group)
                current_group = []
                current_size_mb = 0
                reference_params = None  # Reset for the new group

        # If we're starting a new group, set the reference parameters
        if not current_group:
            reference_params = params

        # Add the current video to the group
        current_group.append(video)
        current_size_mb += video['size']
        last_video_end = video_end

    # Add the last group if not empty
    if current_group:
        concatenated_video_groups.append(current_group)

    # Drop the last group if its newest video is too recent: more footage may
    # still arrive within the time window, so it is not ready for upload yet.
    if concatenated_video_groups:
        last_group = concatenated_video_groups[-1]
        last_video = last_group[-1]
        last_created_at = format_datetime(last_video['createdAt'])
        if datetime.now() - last_created_at <= timedelta(minutes=time_limit):
            print("Last group is not ready for upload. Removing from final groups.")
            concatenated_video_groups.pop()

    # Only keep groups that actually need concatenation
    concatenated_video_groups = [group for group in concatenated_video_groups if len(group) > 1]

    return concatenated_video_groups
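
# A minimal usage sketch under the module's own assumptions ('size' in MB,
# 'duration' in seconds, files on disk so get_video_params can probe them):
#
#   ordered = sorted(parsed_videos, key=lambda v: format_datetime(v['createdAt']))
#   for group in group_for_concatenation(ordered, time_limit=30):
#       print("concat candidate:", [v['videoID'] for v in group])
#
# The function expects its input to be time-ordered per stream; the per-key
# lists produced by group_videos already satisfy that.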


def get_video_params(video_path):
    """
    Run ffprobe on a given video path to extract:
    - codec_name (video + audio)
    - width, height
    - pix_fmt
    - r_frame_rate (frame rate)
    - sample_rate, channel_layout (audio)
    Returns a dict with these parameters or None if there's an error.
    """
    cmd = [
        'ffprobe', '-v', 'error',
        '-print_format', 'json',
        '-show_streams',
        '-show_format',
        video_path
    ]

    try:
        result = subprocess.run(cmd, capture_output=True, text=True, check=True)
        info = json.loads(result.stdout)

        # Parse out the first video & audio streams we find.
        video_stream = next((s for s in info['streams'] if s['codec_type'] == 'video'), None)
        audio_stream = next((s for s in info['streams'] if s['codec_type'] == 'audio'), None)

        if not video_stream:
            raise ValueError(f"No video stream found in {video_path}")

        # Frame rate can be something like "30000/1001" - convert to float
        r_frame_rate = video_stream.get('r_frame_rate', '0/0')
        try:
            num, den = r_frame_rate.split('/')
            fps = float(num) / float(den) if float(den) != 0 else 0.0
        except (ValueError, ZeroDivisionError):
            fps = 0.0

        # Gather the key parameters
        params = {
            'video_codec': video_stream.get('codec_name', 'unknown'),
            'width': video_stream.get('width', 0),
            'height': video_stream.get('height', 0),
            'pix_fmt': video_stream.get('pix_fmt', 'unknown'),
            'fps': fps,
            'audio_codec': audio_stream.get('codec_name', 'none') if audio_stream else 'none',
            'audio_sample_rate': audio_stream.get('sample_rate', '0') if audio_stream else '0',
            'audio_channels': audio_stream.get('channels', 0) if audio_stream else 0,
            'audio_channel_layout': audio_stream.get('channel_layout', 'none') if audio_stream else 'none'
        }

        return params

    except (subprocess.CalledProcessError, ValueError, json.JSONDecodeError, KeyError) as e:
        # CalledProcessError: ffprobe itself failed; the others cover
        # malformed output or missing stream info.
        print(f"Failed to run ffprobe on {video_path}: {e}")
        return None
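
# For reference, a typical return value has this shape (values here are
# illustrative, not taken from a real probe):
#
#   {
#       'video_codec': 'h264', 'width': 1920, 'height': 1080,
#       'pix_fmt': 'yuv420p', 'fps': 29.97,
#       'audio_codec': 'aac', 'audio_sample_rate': '44100',
#       'audio_channels': 2, 'audio_channel_layout': 'stereo'
#   }
#
# Two videos must match on every one of these fields before
# group_for_concatenation will place them in the same no-reencode group.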


def generate_list_file(videos):
    """Write an ffmpeg concat-demuxer list file next to the videos.

    Entries use bare filenames, which works because the concat demuxer
    resolves relative paths against the list file's own directory.
    """
    directory = os.path.dirname(videos[0]["filepath"])
    list_filename = os.path.join(directory, f"{videos[0]['videoID']}.txt")
    with open(list_filename, "w") as list_file:
        for video in videos:
            list_file.write(f"file '{video['videoID']}.mp4'\n")
    return list_filename
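
# The generated list follows the ffmpeg concat-demuxer format, e.g.:
#
#   file 'abc123.mp4'
#   file 'abc124.mp4'
#
# which concatenate_videos then feeds to ffmpeg roughly as:
#
#   ffmpeg -f concat -safe 0 -i abc123.txt -c copy abc123_temp.mp4
#
# Stream copy (-c copy) only works when all inputs share codecs and
# parameters, which is exactly what group_for_concatenation enforces.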


def concatenate_videos(grouped_videos, directory):
    """Concatenate pre-grouped videos, updating metadata and managing file operations.

    The list file location is derived from the videos themselves, so the
    directory argument is retained only for backward compatibility.
    """
    processed_videos = []

    for group in grouped_videos:
        if len(group) == 1:
            processed_videos.append(group[0])
            continue

        # Set up paths based on the first video in the group
        first_video = group[0]
        video_path = first_video["filepath"]
        data_path = video_path.replace(".mp4", ".json")
        temp_path = video_path.replace(".mp4", "_temp.mp4")

        # Generate a list file for ffmpeg concatenation
        list_filename = generate_list_file(group)

        # Run ffmpeg to concatenate videos without re-encoding
        result = subprocess.run(["ffmpeg", "-f", "concat", "-safe", "0", "-i", list_filename, "-c", "copy", temp_path])
        if result.returncode != 0:
            # Keep the source files if concatenation failed
            print(f"ffmpeg concat failed for {video_path}, keeping source files")
            os.remove(list_filename)
            if os.path.exists(temp_path):
                os.remove(temp_path)  # Discard any partial output
            continue

        # Remove individual video files and their metadata
        for v in group:
            os.remove(v["filepath"])
            os.remove(v["filepath"].replace(".mp4", ".json"))
        os.remove(list_filename)

        os.rename(temp_path, video_path)

        # Update the metadata for the concatenated video
        first_video["filepath"] = video_path
        first_video["size"] = get_file_size_in_mb(video_path)
        first_video["duration"] = get_duration(video_path)
        update_video_data(data_path, first_video)  # Persist the merged metadata
        processed_videos.append(first_video)

    return processed_videos


def get_all_videos(directory):
    # Find all .mp4 files in the directory and its subdirectories
    videos = []
    for root, dirs, files in os.walk(directory):
        for file in files:
            if file.endswith(".mp4"):
                videos.append(os.path.join(root, file))
    return videos


def get_all_data(directory):
    # Find all .json files in the directory and its subdirectories
    data = []
    for root, dirs, files in os.walk(directory):
        for file in files:
            if file.endswith(".json"):
                data.append(os.path.join(root, file))
    return data


def match_data_to_video_fast(videos, data):
    """Pair each video with its JSON sidecar by basename (videoID) via a dict lookup."""
    data_dict = {os.path.splitext(os.path.basename(d))[0]: d for d in data}
    matched, unmatched = [], []
    for v in videos:
        video_id = os.path.splitext(os.path.basename(v))[0]
        if video_id in data_dict:
            matched.append((v, data_dict[video_id]))
        else:
            unmatched.append(v)
    return parse_video_data(matched), unmatched


def parse_video_data(matched_videos):
    """Load metadata for each (video_path, json_path) pair, annotating file paths."""
    import tqdm  # Imported locally so the dependency stays optional until needed

    video_list = []

    with tqdm.tqdm(total=len(matched_videos), desc="Parsing video data") as pbar:
        for video_path, json_path in matched_videos:
            pbar.update(1)

            data = get_video_data(json_path)
            data['filepath'] = video_path
            data['jsonpath'] = json_path

            video_list.append(data)

    return video_list


def get_videos_matched(video_dirs, data_dirs):
    # Gather all videos
    videos = []
    for d in video_dirs:
        videos += get_all_videos(d)

    # Gather all metadata files
    data = []
    for d in data_dirs:
        data += get_all_data(d)

    # Match the data to the videos
    parsed_videos, unmatched = match_data_to_video_fast(videos, data)

    return parsed_videos, unmatched


def calculate_file_hash(file_path):
    import hashlib

    # Hash in chunks so multi-gigabyte videos aren't loaded into memory at once
    sha256 = hashlib.sha256()
    with open(file_path, 'rb') as f:
        for chunk in iter(lambda: f.read(1024 * 1024), b''):
            sha256.update(chunk)
    return sha256.hexdigest()
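

if __name__ == "__main__":
    # Minimal end-to-end sketch of the intended pipeline. The directory names
    # are assumptions for illustration; the metadata JSON is expected to carry
    # at least "videoID", "username", "site", and "createdAt".
    video_dirs = ["videos"]
    data_dirs = ["videos"]

    parsed_videos, unmatched = get_videos_matched(video_dirs, data_dirs)
    if unmatched:
        print(f"{len(unmatched)} videos have no metadata and will be skipped")

    usable = process_videos(parsed_videos)
    grouped = group_videos(usable, sort_by="count", order="desc")

    for (username, site), vids in grouped.items():
        groups = group_for_concatenation(vids)
        concatenate_videos(groups, video_dirs[0])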