From 345673a480f4dbe8b578232363b81ef4662949a8 Mon Sep 17 00:00:00 2001 From: oscar Date: Sat, 30 Nov 2024 02:05:17 +0200 Subject: [PATCH] old snappy master --- check_file_types.py | 79 ------------------------ snappy_master.py | 144 +++++++++++--------------------------------- 2 files changed, 35 insertions(+), 188 deletions(-) delete mode 100644 check_file_types.py diff --git a/check_file_types.py b/check_file_types.py deleted file mode 100644 index d6fb247..0000000 --- a/check_file_types.py +++ /dev/null @@ -1,79 +0,0 @@ -import os, requests, config -from snapchat import get_stories, get_highlight_stories, get_all_users_data - -def get_file_extension(url): - response = requests.head(url) - if response.status_code != 200: - print(f"Failed to access media {url}") - return None - - content_type = response.headers.get('Content-Type', '') - if 'image' in content_type: - return '.jpg' - elif 'video' in content_type: - return '.mp4' - else: - print(f"Unknown content type for media {url}") - return None - -import re -def extract_file_type(url): - # Use a regular expression to extract the file type number - match = re.search(r"/d/[^.]+\.([0-9]+)\.", url) - if match: - return match.group(1) # Return the number as a string - return None - -def map_file_type_to_extension(urls): - file_type_to_extension = {} - seen_file_types = set() - - for url in urls: - # Extract the file type number - file_type_number = extract_file_type(url) - if not file_type_number: - continue - - # Skip if we've already checked this file type - if file_type_number in seen_file_types: - continue - - # Use the get_file_extension function to determine the extension - file_extension = get_file_extension(url) - if file_extension: - file_type_to_extension[file_type_number] = file_extension - seen_file_types.add(file_type_number) - - return file_type_to_extension - -def main(): - cursor.execute("SELECT username FROM following WHERE platform = 'snapchat' ORDER BY id DESC") - usernames = [row[0] for row in cursor.fetchall()] - - snapchat_users_data = get_all_users_data(usernames) - - all_stories = [get_stories(data) + get_highlight_stories(data) for data in snapchat_users_data.values()] - - processed_stories = [] - for stories in all_stories: - processed_stories.extend(stories) - - all_urls = [story['url'] for story in processed_stories] - - # Map file type numbers to extensions - file_type_to_extension = map_file_type_to_extension(all_urls) - - # Print the mapping - print("File Type to Extension Mapping:") - for file_type, extension in file_type_to_extension.items(): - print(f"File Type {file_type}: {extension}") - -if __name__ == '__main__': - print('Starting snappy...') - - db, cursor = config.gen_connection() - obj_storage = config.get_storage() - - main() - - print("Processing completed.") \ No newline at end of file diff --git a/snappy_master.py b/snappy_master.py index 81cea1e..2bd2969 100644 --- a/snappy_master.py +++ b/snappy_master.py @@ -1,15 +1,24 @@ from uuid import uuid4 from datetime import datetime -import os, requests, config, json, funcs, cv2, re +import os, requests, config, json, funcs, cv2 from snapchat import get_stories, get_highlight_stories, get_all_users_data directory = "snapchat" data_directory = "data" +def get_existing_snap_ids(directory): + existing_snap_ids = set() + for root, _, files in os.walk(directory): + for file in files: + if '~' not in file: + continue + + filename, _ = os.path.splitext(file) + snap_id = filename.split('~')[2] + existing_snap_ids.add(snap_id) + return existing_snap_ids + def find_duplicate_snap(existing_snaps, snap_id, username): - """ - Find a snap in the existing_snaps list on database.s - """ for snap in existing_snaps: if username == snap[2]: if snap_id in snap[1]: @@ -38,30 +47,6 @@ def get_file_extension(url): print(f"Unknown content type for media {url}") return None -def extract_file_type(url): - file_types = { - '400': '.jpg', - '1322': '.mp4', - '1325': '.mp4', - '1034': '.mp4', - '1023': '.jpg' - } - - base_url = url.split("?")[0] # Remove query string - - snap_data = base_url.split('/')[-1] - - # Extract the file type number - data_parts = snap_data.split('.') - if len(data_parts) > 1: - file_type_number = data_parts[1] - if file_type_number in file_types: - return file_types[file_type_number] - else: - print(f"Unexpected URL format: {base_url}") - return None - - def download_media(url, filepath): if os.path.exists(filepath): print(f"File {filepath} already exists. Skipping download.") @@ -76,56 +61,7 @@ def download_media(url, filepath): f.write(response.content) return filepath -def get_all_stories(usernames): - snapchat_users_data = get_all_users_data(usernames) - - all_stories = [] - for username in usernames: - print(f"Getting stories for {username}...") - data = snapchat_users_data.get(username) - if not data: - print(f"Failed to get data for {username}. Skipping.") - continue - - archive_data(data, username) - - print("Getting stories...") - stories = get_stories(data) - - print("Getting highlights...") - stories.extend(get_highlight_stories(data)) - - for story in stories: - snap_id = story['snap_id'] - url = story['url'] - timestamp = story['timestamp'] - - # Determine file extension using HEAD request. - extension = extract_file_type(url) - if not extension: - print(f"Failed to determine file extension for {url}. Skipping.") - continue - - filename = f"{username}~{timestamp}~{snap_id}{extension}" - filepath = os.path.join(directory, filename) - - media = { - 'username': username, - 'timestamp': timestamp, - 'filepath': filepath, - 'snap_id': snap_id, - 'original_snap_id': story['original_snap_id'], - 'media_url': url, - } - - all_stories.append(media) - print(f"Media {snap_id} ready for download.") - - all_stories.extend(stories) - - return all_stories - -def get_snapchat_stories(): +def main(): os.makedirs(directory, exist_ok=True) os.makedirs(data_directory, exist_ok=True) @@ -134,7 +70,9 @@ def get_snapchat_stories(): cursor.execute("SELECT id, filename, username FROM media WHERE filename IS NOT NULL AND platform = 'snapchat' ORDER BY id DESC") existing_medias = cursor.fetchall() - + + existing_snap_ids = get_existing_snap_ids(directory) + snapchat_users_data = get_all_users_data(usernames) ready_stories = [] @@ -146,7 +84,7 @@ def get_snapchat_stories(): if not data: print(f"Failed to get data for {username}. Skipping.") continue - + archive_data(data, username) print("Getting stories...") @@ -165,15 +103,24 @@ def get_snapchat_stories(): print(f"Media {snap_id} already exists. Skipping download.") continue + # Check if media already exists + if snap_id in existing_snap_ids: + print(f"Media {snap_id} already exists. Skipping download.") + continue + # Determine file extension using HEAD request. - extension = extract_file_type(url) + extension = get_file_extension(url) if not extension: - print(f"Failed to determine file extension for {url}. Skipping.") continue filename = f"{username}~{timestamp}~{snap_id}{extension}" filepath = os.path.join(directory, filename) + # Check if file already exists + if os.path.exists(filepath): + print(f"File {filename} already exists. Skipping download.") + continue + media = { 'username': username, 'timestamp': timestamp, @@ -185,40 +132,19 @@ def get_snapchat_stories(): ready_stories.append(media) print(f"Media {snap_id} ready for download.") - - # sort ready_stories by timestamp from oldest to newest - ready_stories.sort(key=lambda x: x['timestamp']) - - return ready_stories + -def download_stories(stories): - for story in stories: + for media in ready_stories: # Download the media - filepath = story['filepath'] - url = story['media_url'] if 'media_url' in story else None - filename = os.path.basename(filepath) - timestamp = story['timestamp'] - filepath = download_media(url, filepath) print(f"Downloaded {filename} at {timestamp}") if not filepath: continue - story['filepath'] = filepath - - UploadMedia(story) + media['filepath'] = filepath -def main(): - ready_stories = get_snapchat_stories() - - stories_from_files = funcs.get_files(directory) - stories_from_files = [get_media_data(filepath) for filepath in stories_from_files] - stories_from_files = [story for story in stories_from_files if story] - - ready_stories.extend(stories_from_files) - - download_stories(ready_stories) + UploadMedia(media) def UploadMedia(media): username = media['username'] @@ -234,7 +160,7 @@ def UploadMedia(media): file_hash = funcs.calculate_file_hash(filepath) - post_date = datetime.fromtimestamp(int(timestamp)) + post_date = datetime.fromtimestamp(int(timestamp)) if timestamp else datetime.now() width, height = funcs.get_media_dimensions(filepath) @@ -288,7 +214,7 @@ def get_media_data(filepath): snap_id = parts[2] snap_id = os.path.splitext(snap_id)[0] - data = {'username': username, 'timestamp': timestamp, 'filepath': filepath, 'snap_id': snap_id, 'original_snap_id': None} + data = {'username': username, 'timestamp': timestamp, 'filepath': filepath, 'media_id': snap_id} return data