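"""Ingest downloaded JSON metadata files into the local `videos` table,
then move successfully ingested files from DOWNLOAD_DIR into DATA_DIR.
"""
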
from archiveConfig import get_local_db_connection
from psycopg2.extras import execute_values
from datetime import datetime
import uuid, shutil, json, os
from tqdm import tqdm

DATA_DIR = 'data'
DOWNLOAD_DIR = 'downloaded'

conn, cursor = get_local_db_connection()

def is_valid_uuid(val: str, version=None) -> bool:
    try:
        u = uuid.UUID(val, version=version) if version else uuid.UUID(val)
        return str(u) == val.lower()  # Match exact input (handles casing)
    except (ValueError, AttributeError, TypeError):
        return False

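# Note: uuid.UUID() also accepts un-dashed hex, but str(u) is always the
# canonical dashed form, so the round-trip comparison above additionally
# rejects inputs that are not in canonical form.
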
def parse_json_file(filepath):
    with open(filepath, 'r', encoding='utf-8') as f:
        data = json.load(f)

    # Prefer "createdAt", fall back to "date"; defaulting to None here avoids
    # an unbound-variable error when neither key is present.
    date = data.get("createdAt") or data.get("date")

    if date:
        created_at = datetime.strptime(date, "%Y-%m-%d %H:%M:%S")
    else:
        created_at = None
        print(f"⚠️ No createdAt or date found in {filepath}")

    if "updatedAt" in data:
        updated_at = datetime.strptime(data["updatedAt"], "%Y-%m-%d %H:%M:%S")
    else:
        updated_at = created_at

    # The file name (without extension) is expected to be the video's UUID.
    video_id = os.path.splitext(os.path.basename(filepath))[0]
    if not is_valid_uuid(video_id):
        print(f"⚠️ Invalid video_id: {video_id}")
        return None

    parsed_data = {
        'video_id': video_id,
        'username': data.get("username"),
        'site': data.get("site"),
        'gender': data.get("gender"),
        'size': data.get("size") or 0,
        'duration': data.get("duration") or 0,
        'filepath': data.get("filepath"),
        'jsonpath': data.get("jsonpath"),
        'hash': None,  # You can add hash calculation here if needed
        'created_at': created_at,
        'updated_at': updated_at
    }

    return parsed_data

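# For reference, a representative input file (hypothetical values) that
# parse_json_file() would accept, named <uuid>.json:
#
# {
#     "username": "alice",
#     "site": "example",
#     "gender": "f",
#     "size": 104857600,
#     "duration": 1800,
#     "filepath": "downloaded/<uuid>.mp4",
#     "jsonpath": "downloaded/<uuid>.json",
#     "createdAt": "2024-01-01 12:00:00",
#     "updatedAt": "2024-01-02 12:00:00"
# }
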
def insert_data(all_data):
    query = """
        INSERT INTO videos (
            video_id, username, site, gender, size, duration,
            filepath, hash, created_at, updated_at
        )
        VALUES %s
        ON CONFLICT (video_id) DO NOTHING;
    """
    values = [
        (
            d['video_id'], d['username'], d['site'], d['gender'],
            d['size'], d['duration'], d['filepath'],
            d['hash'], d['created_at'], d['updated_at']
        )
        for d in all_data
    ]
    execute_values(cursor, query, values)
    conn.commit()
    print(f"✅ Inserted {cursor.rowcount} new records.")

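# Caveat: execute_values() sends rows in pages (page_size defaults to 100),
# and cursor.rowcount reflects only the last page, so the count printed by
# insert_data() can undercount large batches.
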
def get_files(directory):
    files = []
    for root, _, filenames in os.walk(directory):
        for filename in filenames:
            if filename.endswith('.json'):
                files.append(os.path.join(root, filename))
    return files

def main():
    all_records = []

    # get_files() already returns only .json paths, so no extra filter is needed.
    data_files = get_files(DOWNLOAD_DIR)

    for filepath in tqdm(data_files, desc="Processing files", unit="file"):
        try:
            record = parse_json_file(filepath)
            if record is not None:  # skip files with invalid video IDs
                all_records.append(record)
        except Exception as e:
            print(f"❌ Failed to process {filepath}: {e}")

    if all_records:
        insert_data(all_records)
    else:
        print("⚠️ No new records to insert.")

def check_and_move():
    db_ids = get_video_ids_from_db()
    moved = 0

    # Make sure the destination exists before moving anything into it.
    os.makedirs(DATA_DIR, exist_ok=True)

    for path in get_json_files(DOWNLOAD_DIR):
        video_id = os.path.splitext(os.path.basename(path))[0]
        if video_id in db_ids:
            output_path = os.path.join(DATA_DIR, os.path.basename(path))
            if os.path.exists(output_path):
                print(f"⚠️ Skipping {path} because it already exists in {DATA_DIR}/")
                continue
            shutil.move(path, output_path)
            moved += 1

    print(f"✅ Moved {moved} files to {DATA_DIR}/")

# Get all existing video IDs.
def get_video_ids_from_db():
    cursor.execute("SELECT video_id FROM videos;")
    # Assumes get_local_db_connection() yields a dict-style cursor
    # (e.g. psycopg2.extras.RealDictCursor); with a plain cursor this
    # would need row[0] instead of row['video_id'].
    return {row['video_id'] for row in cursor.fetchall()}

# Lazily iterate over all .json files under a directory.
def get_json_files(directory):
    for root, _, files in os.walk(directory):
        for file in files:
            if file.endswith('.json'):
                yield os.path.join(root, file)

if __name__ == '__main__':
    main()
    check_and_move()