from datetime import datetime
import json
import os

import requests

import config
from snapchat import get_data, get_stories, get_highlight_stories

"""
Reference (unused): alternative filename scheme for highlights.

media_url_filename = url.split('/')[-1].split('?')[0]
etag = response.headers.get('ETag', '').replace('"', '')
filename = f"{username}~{timestamp}-{media_url_filename}~{etag}{extension}"
filepath = os.path.join(directory, 'highlights', filename)
"""

directory = "snapchat"
data_directory = "data"


def get_existing_snap_ids(directory):
    """Collect snap IDs already on disk from filenames shaped like username~timestamp~snap_id.ext."""
    existing_snap_ids = set()
    for root, _, files in os.walk(directory):
        for file in files:
            if '~' not in file:
                continue
            filename, _ = os.path.splitext(file)
            parts = filename.split('~')
            if len(parts) < 3:
                continue
            existing_snap_ids.add(parts[2])
    return existing_snap_ids


def find_duplicate_snap(existing_snaps, snap_id, username):
    """Return the matching (id, filename, username) row if the snap is already in the database."""
    for snap in existing_snaps:
        if username == snap[2] and snap_id in snap[1]:
            return snap
    return False


def archive_data(data, username):
    data_filename = f"{username}~{datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}.json"
    data_filepath = os.path.join(data_directory, data_filename)
    with open(data_filepath, 'w') as f:
        f.write(json.dumps(data))
    print(f"Archived data for {username} at {data_filepath}")


def get_file_extension(url):
    """Determine the extension from the Content-Type of a HEAD request."""
    response = requests.head(url)
    if response.status_code != 200:
        print(f"Failed to access media {url}")
        return None
    content_type = response.headers.get('Content-Type', '')
    if 'image' in content_type:
        return '.jpg'
    elif 'video' in content_type:
        return '.mp4'
    else:
        print(f"Unknown content type for media {url}")
        return None


def extract_file_type(url):
    """Map the numeric type code embedded in the media URL to a file extension."""
    file_types = {
        '400': '.jpg',
        '1322': '.mp4',
        '1325': '.mp4',
        '1034': '.mp4',
        '1023': '.jpg'
    }
    base_url = url.split("?")[0]  # Remove query string
    snap_data = base_url.split('/')[-1]  # Last path segment carries the type code
    data_parts = snap_data.split('.')
    if len(data_parts) > 1:
        file_type_number = data_parts[1]
        if file_type_number in file_types:
            return file_types[file_type_number]
    else:
        print(f"Unexpected URL format: {base_url}")
    return None


def download_media(url, filepath):
    if os.path.exists(filepath):
        print(f"File {filepath} already exists. Skipping download.")
        return filepath
    response = requests.get(url)
    if response.status_code != 200:
        print(f"Failed to download media {url}")
        return None
    with open(filepath, 'wb') as f:
        f.write(response.content)
    return filepath


def main():
    os.makedirs(directory, exist_ok=True)
    os.makedirs(data_directory, exist_ok=True)

    db, cursor = config.gen_connection()
    cursor.execute("SELECT username FROM following WHERE platform = 'snapchat'")
    usernames = [row[0] for row in cursor.fetchall()]
    cursor.execute("SELECT id, filename, username FROM media WHERE filename IS NOT NULL AND platform = 'snapchat'")
    existing_medias = cursor.fetchall()

    existing_snap_ids = get_existing_snap_ids(directory)

    for username in usernames:
        print(f"Getting stories for {username}...")
        data = get_data(username)
        if not data:
            continue
        archive_data(data, username)

        print("Getting stories...")
        stories = get_stories(data)
        print("Getting highlights...")
        stories.extend(get_highlight_stories(data))

        for story in stories:
            snap_id = story['snap_id']
            url = story['url']
            timestamp = story['timestamp']

            # Check if media already exists in the database
            duplicate_snap = find_duplicate_snap(existing_medias, snap_id, username)
            if duplicate_snap:
                print(f"Media {snap_id} already exists. Skipping download.")
                continue

            # Check if media already exists on disk
            if snap_id in existing_snap_ids:
                print(f"Media {snap_id} already exists. Skipping download.")
                continue

            # Determine the file extension from the URL's type code
            # (no extra request; see get_file_extension for the HEAD-based alternative).
            extension = extract_file_type(url)
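            # Optional fallback (sketch, not required): if the URL does not carry a
            # recognized type code, the HEAD-based get_file_extension() helper above
            # can resolve the type, at the cost of one extra request per snap.
            if not extension:
                extension = get_file_extension(url)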
            if not extension:
                continue

            filename = f"{username}~{timestamp}~{snap_id}{extension}"
            filepath = os.path.join(directory, filename)

            # Check if the file already exists
            if os.path.exists(filepath):
                print(f"File {filename} already exists. Skipping download.")
                continue

            # Download the media
            filepath = download_media(url, filepath)
            if filepath:
                print(f"Downloaded {filename} at {timestamp}")


if __name__ == "__main__":
    main()