import json
import os
from datetime import datetime

import requests

import config
from snapchat import get_data, get_stories, get_highlight_stories
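
# Local helper modules (not shown in this file): based on how they are used below,
# config.gen_connection() is expected to return a (connection, cursor) pair for the
# media database, and the snapchat module provides get_data(username) plus
# get_stories() / get_highlight_stories(), which yield story dicts containing
# 'snap_id', 'url' and 'timestamp'.

# The quoted block below is an unused snippet of an alternative highlight-naming
# scheme, kept for reference; it is never executed.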
"""
media_url_filename = url.split('/')[-1].split('?')[0]
etag = response.headers.get('ETag', '').replace('"', '')
filename = f"{username}~{timestamp}-{media_url_filename}~{etag}{extension}"
filepath = os.path.join(directory, 'highlights', filename)
"""

# Output locations: downloaded media goes into `directory`,
# archived profile JSON into `data_directory`.
directory = "snapchat"
data_directory = "data"

def get_existing_snap_ids(directory):
    """Collect the snap IDs of media already saved on disk."""
    existing_snap_ids = set()
    for root, _, files in os.walk(directory):
        for file in files:
            if '~' not in file:
                continue

            # Media filenames follow the pattern "<username>~<timestamp>~<snap_id><ext>".
            filename, _ = os.path.splitext(file)
            parts = filename.split('~')
            if len(parts) < 3:
                continue
            existing_snap_ids.add(parts[2])
    return existing_snap_ids
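
# Hypothetical example: "someuser~<timestamp>~AbCdEf123.jpg" splits (after dropping the
# extension) into ['someuser', '<timestamp>', 'AbCdEf123'], so 'AbCdEf123' is collected.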

def find_duplicate_snap(existing_snaps, snap_id, username):
    """Return the matching (id, filename, username) row if snap_id already appears in a
    stored filename for this username; otherwise return False."""
    for snap in existing_snaps:
        if username == snap[2] and snap_id in snap[1]:
            return snap
    return False

def archive_data(data, username):
    """Dump the raw profile data to a timestamped JSON file in data_directory."""
    data_filename = f"{username}~{datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}.json"
    data_filepath = os.path.join(data_directory, data_filename)
    with open(data_filepath, 'w') as f:
        f.write(json.dumps(data))
    print(f"Archived data for {username} at {data_filepath}")

def get_file_extension(url):
    """Determine a file extension from the Content-Type returned by a HEAD request."""
    response = requests.head(url)
    if response.status_code != 200:
        print(f"Failed to access media {url}")
        return None

    content_type = response.headers.get('Content-Type', '')
    if 'image' in content_type:
        return '.jpg'
    elif 'video' in content_type:
        return '.mp4'
    else:
        print(f"Unknown content type for media {url}")
        return None
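
# Note: get_file_extension() above is not called anywhere in this script; main() uses
# extract_file_type() below, which parses the URL instead of issuing an extra HTTP request.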

def extract_file_type(url):
    """Guess the file extension from the numeric type segment embedded in the media URL."""
    file_types = {
        '400': '.jpg',
        '1322': '.mp4',
        '1325': '.mp4',
        '1034': '.mp4',
        '1023': '.jpg'
    }

    base_url = url.split("?")[0]  # Remove the query string
    snap_data = base_url.split('/')[-1]

    # Extract the file type number (the second '.'-separated part of the last path segment).
    data_parts = snap_data.split('.')
    if len(data_parts) > 1:
        file_type_number = data_parts[1]
        if file_type_number in file_types:
            return file_types[file_type_number]
        print(f"Unknown file type number {file_type_number} for {base_url}")
    else:
        print(f"Unexpected URL format: {base_url}")
    return None
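
# The numeric segments in file_types ('400', '1322', ...) appear to identify the media
# type inside Snapchat CDN URLs; the mapping is assumed from the URL shapes this script
# handles. Hypothetical example: a URL ending in ".../AbCd.400.Xyz?mc=..." splits into
# ['AbCd', '400', 'Xyz'], and the '400' segment maps to '.jpg'.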

def download_media(url, filepath):
    """Download a single media file to filepath, skipping it if it already exists."""
    if os.path.exists(filepath):
        print(f"File {filepath} already exists. Skipping download.")
        return filepath

    response = requests.get(url)
    if response.status_code != 200:
        print(f"Failed to download media {url}")
        return None

    with open(filepath, 'wb') as f:
        f.write(response.content)
    return filepath
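
# Note: download_media() reads response.content, which holds the whole file in memory.
# A streaming variant (sketch only, not wired in) could be swapped in for large videos:
#
#     response = requests.get(url, stream=True, timeout=30)
#     with open(filepath, 'wb') as f:
#         for chunk in response.iter_content(chunk_size=8192):
#             f.write(chunk)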

def main():
    # Make sure the output directories exist before archiving or downloading anything.
    if not os.path.exists(directory):
        os.makedirs(directory)
    if not os.path.exists(data_directory):
        os.makedirs(data_directory)

    db, cursor = config.gen_connection()

    # Accounts to scrape.
    cursor.execute("SELECT username FROM following WHERE platform = 'snapchat'")
    usernames = [row[0] for row in cursor.fetchall()]

    # Media already recorded in the database, as (id, filename, username) rows.
    cursor.execute("SELECT id, filename, username FROM media WHERE filename IS NOT NULL AND platform = 'snapchat'")
    existing_medias = cursor.fetchall()

    # Media already present on disk.
    existing_snap_ids = get_existing_snap_ids(directory)

    for username in usernames:
        print(f"Getting stories for {username}...")
        data = get_data(username)
        if not data:
            continue

        archive_data(data, username)

        print("Getting stories...")
        stories = get_stories(data)

        print("Getting highlights...")
        stories.extend(get_highlight_stories(data))

        for story in stories:
            snap_id = story['snap_id']
            url = story['url']
            timestamp = story['timestamp']

            # Skip snaps already recorded in the database.
            duplicate_snap = find_duplicate_snap(existing_medias, snap_id, username)
            if duplicate_snap:
                print(f"Media {snap_id} already exists. Skipping download.")
                continue

            # Skip snaps already present on disk.
            if snap_id in existing_snap_ids:
                print(f"Media {snap_id} already exists. Skipping download.")
                continue

            # Determine the file extension from the media URL.
            # TODO: find a better way to determine the file extension without downloading the file.
            extension = extract_file_type(url)
            if not extension:
                continue

            filename = f"{username}~{timestamp}~{snap_id}{extension}"
            filepath = os.path.join(directory, filename)

            # Skip files that already exist under the same name.
            if os.path.exists(filepath):
                print(f"File {filename} already exists. Skipping download.")
                continue

            # Download the media.
            filepath = download_media(url, filepath)
            if filepath:
                print(f"Downloaded {filename} at {timestamp}")


if __name__ == "__main__":
    main()