from archiveConfig import get_local_db_connection
from psycopg2.extras import execute_values
from tqdm import tqdm
from datetime import datetime
import uuid
import shutil
import json
import os

DATA_DIR = 'data'
DOWNLOAD_DIR = 'downloaded'

conn, cursor = get_local_db_connection()
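# Assumption (not verified here): get_local_db_connection() returns a psycopg2
# connection and a cursor whose rows support lookup by column name (e.g.
# RealDictCursor), since get_video_ids_from_db() below reads row['video_id'].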


def is_valid_uuid(val: str, version=None) -> bool:
    try:
        u = uuid.UUID(val, version=version) if version else uuid.UUID(val)
        return str(u) == val.lower()  # Match exact input (handles casing)
    except (ValueError, AttributeError, TypeError):
        return False


def parse_json_file(filepath):
    """Parse one downloaded metadata JSON file into a row dict, or return None if invalid."""
    with open(filepath, 'r', encoding='utf-8') as f:
        data = json.load(f)

    # Some files use "createdAt", older ones use "date"; fall back to None if neither exists.
    date = data.get("createdAt") or data.get("date")
    if date:
        created_at = datetime.strptime(date, "%Y-%m-%d %H:%M:%S")
    else:
        created_at = None
        print(f"⚠️ No createdAt or date found in {filepath}")

    if "updatedAt" in data:
        updated_at = datetime.strptime(data.get("updatedAt"), "%Y-%m-%d %H:%M:%S")
    else:
        updated_at = created_at

    video_id = os.path.splitext(os.path.basename(filepath))[0]
    if not is_valid_uuid(video_id):
        print(f"⚠️ Invalid video_id: {video_id}")
        return None

    return {
        'video_id': video_id,
        'username': data.get("username"),
        'site': data.get("site"),
        'gender': data.get("gender"),
        'size': data.get("size") or 0,
        'duration': data.get("duration") or 0,
        'filepath': data.get("filepath"),
        'jsonpath': data.get("jsonpath"),
        'hash': None,  # Hash calculation could be added here; see the illustrative sketch below.
        'created_at': created_at,
        'updated_at': updated_at
    }
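

# Illustrative sketch only (not part of the original script): the 'hash' field above
# is stored as None. If you want to populate it, a helper along these lines could be
# called from parse_json_file() with the video's filepath; the helper name and the
# chunk size are assumptions.
import hashlib


def compute_file_hash(path, chunk_size=1024 * 1024):
    """Return the SHA-256 hex digest of a file, read in chunks to limit memory use."""
    digest = hashlib.sha256()
    with open(path, 'rb') as fh:
        for chunk in iter(lambda: fh.read(chunk_size), b''):
            digest.update(chunk)
    return digest.hexdigest()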


def insert_data(all_data):
    """Bulk-insert parsed rows; existing video_ids are skipped via ON CONFLICT."""
    query = """
        INSERT INTO videos (
            video_id, username, site, gender, size, duration,
            filepath, hash, created_at, updated_at
        )
        VALUES %s
        ON CONFLICT (video_id) DO NOTHING;
    """
    values = [
        (
            d['video_id'], d['username'], d['site'], d['gender'],
            d['size'], d['duration'], d['filepath'],
            d['hash'], d['created_at'], d['updated_at']
        )
        for d in all_data
    ]
    # execute_values expands VALUES %s into multi-row INSERTs, batched by page_size.
    execute_values(cursor, query, values)
    conn.commit()
    print(f"✅ Inserted {cursor.rowcount} new records.")


def get_files(dir):
    files = []
    for root, _, filenames in os.walk(dir):
        for filename in filenames:
            if filename.endswith('.json'):
                files.append(os.path.join(root, filename))
    return files


def main():
    all_records = []
    data_files = get_files(DOWNLOAD_DIR)
    for filepath in tqdm(data_files, desc="Processing files", unit="file"):
        try:
            record = parse_json_file(filepath)
            if record:  # Skip files that failed UUID validation.
                all_records.append(record)
        except Exception as e:
            print(f"❌ Failed to process {filepath}: {e}")
    if all_records:
        insert_data(all_records)
    else:
        print("⚠️ No new records to insert.")


def check_and_move():
    """Move JSON files whose video_id is already in the DB from DOWNLOAD_DIR to DATA_DIR."""
    db_ids = get_video_ids_from_db()
    moved = 0
    for path in get_json_files(DOWNLOAD_DIR):
        video_id = os.path.splitext(os.path.basename(path))[0]
        if video_id in db_ids:
            output_path = os.path.join(DATA_DIR, os.path.basename(path))
            if os.path.exists(output_path):
                print(f"⚠️ Skipping {path} because it already exists in {DATA_DIR}/")
                continue
            shutil.move(path, output_path)
            moved += 1
    print(f"✅ Moved {moved} files to {DATA_DIR}/")


# Get all existing video IDs
def get_video_ids_from_db():
    cursor.execute("SELECT video_id FROM videos;")
    return {row['video_id'] for row in cursor.fetchall()}


# Iterate files
def get_json_files(dir):
    for root, _, files in os.walk(dir):
        for file in files:
            if file.endswith('.json'):
                yield os.path.join(root, file)


if __name__ == '__main__':
    main()
    check_and_move()