From 4cd3983055cd3522198cb2a242f541a55126c799 Mon Sep 17 00:00:00 2001 From: oscar Date: Sat, 30 Nov 2024 02:05:29 +0200 Subject: [PATCH] updated snappy master --- snappy_master.py | 144 +++++++++++++++++++------- snappy_master_v1.py | 239 -------------------------------------------- 2 files changed, 109 insertions(+), 274 deletions(-) delete mode 100644 snappy_master_v1.py diff --git a/snappy_master.py b/snappy_master.py index 2bd2969..81cea1e 100644 --- a/snappy_master.py +++ b/snappy_master.py @@ -1,24 +1,15 @@ from uuid import uuid4 from datetime import datetime -import os, requests, config, json, funcs, cv2 +import os, requests, config, json, funcs, cv2, re from snapchat import get_stories, get_highlight_stories, get_all_users_data directory = "snapchat" data_directory = "data" -def get_existing_snap_ids(directory): - existing_snap_ids = set() - for root, _, files in os.walk(directory): - for file in files: - if '~' not in file: - continue - - filename, _ = os.path.splitext(file) - snap_id = filename.split('~')[2] - existing_snap_ids.add(snap_id) - return existing_snap_ids - def find_duplicate_snap(existing_snaps, snap_id, username): + """ + Find a snap in the existing_snaps list from the database. + """ for snap in existing_snaps: if username == snap[2]: if snap_id in snap[1]: @@ -47,6 +38,30 @@ def get_file_extension(url): print(f"Unknown content type for media {url}") return None +def extract_file_type(url): + file_types = { + '400': '.jpg', + '1322': '.mp4', + '1325': '.mp4', + '1034': '.mp4', + '1023': '.jpg' + } + + base_url = url.split("?")[0] # Remove query string + + snap_data = base_url.split('/')[-1] + + # Extract the file type number + data_parts = snap_data.split('.') + if len(data_parts) > 1: + file_type_number = data_parts[1] + if file_type_number in file_types: + return file_types[file_type_number] + else: + print(f"Unexpected URL format: {base_url}") + return None + + def download_media(url, filepath): if os.path.exists(filepath): print(f"File 
{filepath} already exists. Skipping download.") @@ -61,7 +76,56 @@ def download_media(url, filepath): f.write(response.content) return filepath -def main(): +def get_all_stories(usernames): + snapchat_users_data = get_all_users_data(usernames) + + all_stories = [] + for username in usernames: + print(f"Getting stories for {username}...") + data = snapchat_users_data.get(username) + if not data: + print(f"Failed to get data for {username}. Skipping.") + continue + + archive_data(data, username) + + print("Getting stories...") + stories = get_stories(data) + + print("Getting highlights...") + stories.extend(get_highlight_stories(data)) + + for story in stories: + snap_id = story['snap_id'] + url = story['url'] + timestamp = story['timestamp'] + + # Determine file extension from the type code in the URL. + extension = extract_file_type(url) + if not extension: + print(f"Failed to determine file extension for {url}. Skipping.") + continue + + filename = f"{username}~{timestamp}~{snap_id}{extension}" + filepath = os.path.join(directory, filename) + + media = { + 'username': username, + 'timestamp': timestamp, + 'filepath': filepath, + 'snap_id': snap_id, + 'original_snap_id': story['original_snap_id'], + 'media_url': url, + } + + all_stories.append(media) + print(f"Media {snap_id} ready for download.") + + all_stories.extend(stories) + + return all_stories + +def get_snapchat_stories(): os.makedirs(directory, exist_ok=True) os.makedirs(data_directory, exist_ok=True) @@ -70,9 +134,7 @@ def main(): cursor.execute("SELECT id, filename, username FROM media WHERE filename IS NOT NULL AND platform = 'snapchat' ORDER BY id DESC") existing_medias = cursor.fetchall() - - existing_snap_ids = get_existing_snap_ids(directory) - + snapchat_users_data = get_all_users_data(usernames) ready_stories = [] @@ -84,7 +146,7 @@ def main(): if not data: print(f"Failed to get data for {username}. 
Skipping.") continue - + archive_data(data, username) print("Getting stories...") @@ -103,24 +165,15 @@ def main(): print(f"Media {snap_id} already exists. Skipping download.") continue - # Check if media already exists - if snap_id in existing_snap_ids: - print(f"Media {snap_id} already exists. Skipping download.") - continue - # Determine file extension using HEAD request. - extension = get_file_extension(url) + extension = extract_file_type(url) if not extension: + print(f"Failed to determine file extension for {url}. Skipping.") continue filename = f"{username}~{timestamp}~{snap_id}{extension}" filepath = os.path.join(directory, filename) - # Check if file already exists - if os.path.exists(filepath): - print(f"File {filename} already exists. Skipping download.") - continue - media = { 'username': username, 'timestamp': timestamp, @@ -132,19 +185,40 @@ def main(): ready_stories.append(media) print(f"Media {snap_id} ready for download.") - + + # sort ready_stories by timestamp from oldest to newest + ready_stories.sort(key=lambda x: x['timestamp']) + + return ready_stories - for media in ready_stories: +def download_stories(stories): + for story in stories: # Download the media + filepath = story['filepath'] + url = story['media_url'] if 'media_url' in story else None + filename = os.path.basename(filepath) + timestamp = story['timestamp'] + filepath = download_media(url, filepath) print(f"Downloaded {filename} at {timestamp}") if not filepath: continue - media['filepath'] = filepath + story['filepath'] = filepath + + UploadMedia(story) - UploadMedia(media) +def main(): + ready_stories = get_snapchat_stories() + + stories_from_files = funcs.get_files(directory) + stories_from_files = [get_media_data(filepath) for filepath in stories_from_files] + stories_from_files = [story for story in stories_from_files if story] + + ready_stories.extend(stories_from_files) + + download_stories(ready_stories) def UploadMedia(media): username = media['username'] @@ -160,7 
+234,7 @@ def UploadMedia(media): file_hash = funcs.calculate_file_hash(filepath) - post_date = datetime.fromtimestamp(int(timestamp)) if timestamp else datetime.now() + post_date = datetime.fromtimestamp(int(timestamp)) width, height = funcs.get_media_dimensions(filepath) @@ -214,7 +288,7 @@ def get_media_data(filepath): snap_id = parts[2] snap_id = os.path.splitext(snap_id)[0] - data = {'username': username, 'timestamp': timestamp, 'filepath': filepath, 'media_id': snap_id} + data = {'username': username, 'timestamp': timestamp, 'filepath': filepath, 'snap_id': snap_id, 'original_snap_id': None} return data diff --git a/snappy_master_v1.py b/snappy_master_v1.py deleted file mode 100644 index 2bd2969..0000000 --- a/snappy_master_v1.py +++ /dev/null @@ -1,239 +0,0 @@ -from uuid import uuid4 -from datetime import datetime -import os, requests, config, json, funcs, cv2 -from snapchat import get_stories, get_highlight_stories, get_all_users_data - -directory = "snapchat" -data_directory = "data" - -def get_existing_snap_ids(directory): - existing_snap_ids = set() - for root, _, files in os.walk(directory): - for file in files: - if '~' not in file: - continue - - filename, _ = os.path.splitext(file) - snap_id = filename.split('~')[2] - existing_snap_ids.add(snap_id) - return existing_snap_ids - -def find_duplicate_snap(existing_snaps, snap_id, username): - for snap in existing_snaps: - if username == snap[2]: - if snap_id in snap[1]: - return snap - return False - -def archive_data(data, username): - data_filename = f"{username}~{datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}.json" - data_filepath = os.path.join(data_directory, data_filename) - with open(data_filepath, 'w') as f: - f.write(json.dumps(data)) - print(f"Archived data for {username} at {data_filepath}") - -def get_file_extension(url): - response = requests.head(url) - if response.status_code != 200: - print(f"Failed to access media {url}") - return None - - content_type = 
response.headers.get('Content-Type', '') - if 'image' in content_type: - return '.jpg' - elif 'video' in content_type: - return '.mp4' - else: - print(f"Unknown content type for media {url}") - return None - -def download_media(url, filepath): - if os.path.exists(filepath): - print(f"File {filepath} already exists. Skipping download.") - return filepath - - response = requests.get(url) - if response.status_code != 200: - print(f"Failed to download media {url}") - return None - - with open(filepath, 'wb') as f: - f.write(response.content) - return filepath - -def main(): - os.makedirs(directory, exist_ok=True) - os.makedirs(data_directory, exist_ok=True) - - cursor.execute("SELECT username FROM following WHERE platform = 'snapchat' ORDER BY id DESC") - usernames = [row[0] for row in cursor.fetchall()] - - cursor.execute("SELECT id, filename, username FROM media WHERE filename IS NOT NULL AND platform = 'snapchat' ORDER BY id DESC") - existing_medias = cursor.fetchall() - - existing_snap_ids = get_existing_snap_ids(directory) - - snapchat_users_data = get_all_users_data(usernames) - - ready_stories = [] - - for username in usernames: - print(f"Getting stories for {username}...") - - data = snapchat_users_data.get(username) - if not data: - print(f"Failed to get data for {username}. Skipping.") - continue - - archive_data(data, username) - - print("Getting stories...") - stories = get_stories(data) - - print("Getting highlights...") - stories.extend(get_highlight_stories(data)) - - for story in stories: - snap_id = story['snap_id'] - url = story['url'] - timestamp = story['timestamp'] - - duplicate_snap = find_duplicate_snap(existing_medias, snap_id, username) - if duplicate_snap: - print(f"Media {snap_id} already exists. Skipping download.") - continue - - # Check if media already exists - if snap_id in existing_snap_ids: - print(f"Media {snap_id} already exists. Skipping download.") - continue - - # Determine file extension using HEAD request. 
- extension = get_file_extension(url) - if not extension: - continue - - filename = f"{username}~{timestamp}~{snap_id}{extension}" - filepath = os.path.join(directory, filename) - - # Check if file already exists - if os.path.exists(filepath): - print(f"File {filename} already exists. Skipping download.") - continue - - media = { - 'username': username, - 'timestamp': timestamp, - 'filepath': filepath, - 'snap_id': snap_id, - 'original_snap_id': story['original_snap_id'], - 'media_url': url, - } - - ready_stories.append(media) - print(f"Media {snap_id} ready for download.") - - - for media in ready_stories: - # Download the media - filepath = download_media(url, filepath) - print(f"Downloaded {filename} at {timestamp}") - - if not filepath: - continue - - media['filepath'] = filepath - - UploadMedia(media) - -def UploadMedia(media): - username = media['username'] - timestamp = media['timestamp'] - filepath = media['filepath'] - filename = os.path.basename(filepath) - snap_id = media['snap_id'] - original_snap_id = media['original_snap_id'] - thumbnail_url = None - phash = None - - media_type = funcs.get_media_type(filename) - - file_hash = funcs.calculate_file_hash(filepath) - - post_date = datetime.fromtimestamp(int(timestamp)) if timestamp else datetime.now() - - width, height = funcs.get_media_dimensions(filepath) - - duration = funcs.get_video_duration(filepath) - - if media_type == 'image': - phash = funcs.generate_phash(filepath) - elif media_type == 'video': - try: - thumb_path = generate_thumbnail(filepath) - obj_storage.PutFile(thumb_path, f'thumbnails/{filename}') - thumbnail_url = f"https://storysave.b-cdn.net/thumbnails/{filename}" - phash = funcs.generate_phash(thumb_path) - os.remove(thumb_path) - except: - print('Error generating thumbnail. 
Skipping...') - return False - - server_path = f'media/snaps/{username}/{filename}' - file_url = f"https://storysave.b-cdn.net/{server_path}" - - obj_storage.PutFile(filepath, server_path) - - query = "INSERT IGNORE INTO media (username, media_type, media_url, width, height, post_type, date, hash, filename, duration, thumbnail, phash, platform, snap_id, original_snap_id) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)" - values = (username, media_type, file_url, width, height, 'story', post_date, file_hash, filename, duration, thumbnail_url, phash, 'snapchat', snap_id, original_snap_id) - - cursor.execute(query, values) - db.commit() - print(f'[{cursor.rowcount}] records updated. File {filename} uploaded to {file_url}') - - os.remove(filepath) - - return True - -def generate_thumbnail(filepath): - thumb_path = f'temp/{uuid4()}.jpg' - cap = cv2.VideoCapture(filepath) - ret, frame = cap.read() - cv2.imwrite(thumb_path, frame) - cap.release() - return thumb_path - -def get_media_data(filepath): - filename = os.path.basename(filepath) - parts = filename.split('~') - if len(parts) < 3: - return False - - username = parts[0] - timestamp = parts[1] - snap_id = parts[2] - snap_id = os.path.splitext(snap_id)[0] - - data = {'username': username, 'timestamp': timestamp, 'filepath': filepath, 'media_id': snap_id} - - return data - -def process_snap_ids(filenames): - snap_ids = [] - for filename in filenames: - snap_id = filename.split('~')[2] - snap_id = os.path.splitext(snap_id)[0] - if snap_id not in snap_ids: - snap_ids.append(snap_id) - - return snap_ids - -if __name__ == '__main__': - print('Starting snappy...') - - db, cursor = config.gen_connection() - obj_storage = config.get_storage() - - main() - - print("Processing completed.") \ No newline at end of file