You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

270 lines
7.3 KiB
Python

11 months ago
from snapchat import get_stories, get_highlight_stories, get_all_users_data
11 months ago
from datetime import datetime
from uuid import uuid4
import requests
import config
import funcs
import json
import cv2
import os
11 months ago
# Local folder that media file paths are built under and that is scanned for
# leftover files on each run.
directory = "snapchat"
# Local folder where each user's raw fetched data is archived as JSON.
data_directory = "data"
def find_duplicate_snap(existing_snaps, snap_id, username):
    """
    Look up a snap among the media rows already stored in the database.

    Each row is indexed positionally: ``row[1]`` is the filename and
    ``row[2]`` is the username. A row matches when the username is equal and
    ``snap_id`` occurs anywhere inside the filename.

    Returns the first matching row, or ``False`` when there is none.
    """
    matching_rows = (
        row
        for row in existing_snaps
        if username == row[2] and snap_id in row[1]
    )
    return next(matching_rows, False)
def archive_data(data, username):
    """
    Save ``data`` as JSON inside the data directory.

    The file is named ``<username>~<YYYY-MM-DD_HH-MM-SS>.json`` using the
    current local time, so every call produces a separate snapshot.
    """
    stamp = datetime.now().strftime('%Y-%m-%d_%H-%M-%S')
    target_path = os.path.join(data_directory, f"{username}~{stamp}.json")
    with open(target_path, 'w') as handle:
        serialized = json.dumps(data)
        handle.write(serialized)
def get_file_extension(url):
    """
    Guess a media file extension from the Content-Type of a HEAD request.

    Args:
        url: Address of the media resource.

    Returns:
        ``'.jpg'`` when the Content-Type mentions "image", ``'.mp4'`` when it
        mentions "video", and ``None`` when the request fails, the status is
        not 200, or the content type is neither.
    """
    try:
        # Without a timeout, requests can block forever on a stalled server.
        response = requests.head(url, timeout=30)
    except requests.RequestException as exc:
        # Network errors are reported the same way as a bad status: None.
        print(f"Failed to access media {url}: {exc}")
        return None

    if response.status_code != 200:
        print(f"Failed to access media {url}")
        return None

    # Media types are case-insensitive, so normalise before matching.
    content_type = response.headers.get('Content-Type', '').lower()
    if 'image' in content_type:
        return '.jpg'
    if 'video' in content_type:
        return '.mp4'

    print(f"Unknown content type for media {url}")
    return None
def extract_file_type(url):
    """
    Derive the media file extension from the numeric code embedded in a snap URL.

    The query string is dropped, the last path segment is split on ``.`` and
    the piece right after the first dot is looked up in a table of known
    codes.

    Args:
        url: Media URL whose last path segment looks like ``<id>.<code>...``.

    Returns:
        ``'.jpg'`` or ``'.mp4'`` for a known code; ``None`` when the segment
        has no dot-separated code or the code is not in the table. Both
        failure cases print a diagnostic.
    """
    file_types = {
        '400': '.jpg',
        '1322': '.mp4',
        '1325': '.mp4',
        '1034': '.mp4',
        '1023': '.jpg',
    }

    base_url = url.split("?")[0]  # Remove query string
    snap_data = base_url.split('/')[-1]

    # Extract the file type number
    data_parts = snap_data.split('.')
    if len(data_parts) < 2:
        print(f"Unexpected URL format: {base_url}")
        return None

    file_type_number = data_parts[1]
    extension = file_types.get(file_type_number)
    if extension is None:
        # Report unknown codes too, so new ones can be added to the table.
        print(f"Unknown file type code {file_type_number!r} in URL: {base_url}")
    return extension
11 months ago
def download_media(url, filepath):
    """
    Download ``url`` to ``filepath`` unless the file is already present.

    Args:
        url: Source address of the media. May be ``None`` or empty for
            entries that were built from files already on disk.
        filepath: Destination path of the media file.

    Returns:
        ``filepath`` when the file already exists or was downloaded, ``None``
        when there is no URL, the request fails, or the status is not 200.
    """
    if os.path.exists(filepath):
        # File already exists, skip download and return the filepath as if it was downloaded.
        return filepath

    if not url:
        # Nothing to fetch from; avoid handing an empty URL to requests.
        print(f"Failed to download media {url}")
        return None

    # Stream into a sibling temp file and rename it into place afterwards, so
    # an interrupted download never leaves a partial file at `filepath` that
    # the exists-check above would accept on the next run. The temp name has
    # no '~', so it does not look like a "<user>~<timestamp>~<id>" media file.
    temp_path = os.path.join(os.path.dirname(filepath), f".{uuid4().hex}.part")
    try:
        response = requests.get(url, stream=True, timeout=30)
        try:
            if response.status_code != 200:
                print(f"Failed to download media {url}")
                return None
            with open(temp_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=64 * 1024):
                    f.write(chunk)
        finally:
            response.close()
        os.replace(temp_path, filepath)
    except requests.RequestException as exc:
        print(f"Failed to download media {url}: {exc}")
        return None
    finally:
        # Only present when the download did not complete.
        if os.path.exists(temp_path):
            os.remove(temp_path)

    return filepath
def get_snapchat_stories():
    """
    Build the list of snaps that still have to be downloaded.

    Reads the followed Snapchat usernames and the stored media rows through
    the module-level ``cursor``, fetches data for all users, archives each
    user's data, and keeps every story or highlight whose snap id is not
    found among that user's stored filenames.

    Returns:
        A list of dicts with the keys ``username``, ``timestamp``,
        ``filepath``, ``snap_id``, ``original_snap_id`` and ``media_url``,
        ordered by ``timestamp`` from oldest to newest.
    """
    for folder in (directory, data_directory):
        os.makedirs(folder, exist_ok=True)

    cursor.execute("SELECT username FROM following WHERE platform = 'snapchat' ORDER BY id DESC")
    usernames = [row[0] for row in cursor.fetchall()]

    cursor.execute("SELECT id, filename, username FROM media WHERE filename IS NOT NULL AND platform = 'snapchat' ORDER BY id DESC")
    known_rows = cursor.fetchall()

    users_payload = get_all_users_data(usernames)

    pending = []
    for username in usernames:
        print(f"Getting stories for {username}...")

        payload = users_payload.get(username)
        if not payload:
            print(f"Failed to get data for {username}. Skipping.")
            continue

        archive_data(payload, username)

        user_stories = get_stories(payload)
        user_stories.extend(get_highlight_stories(payload))

        for story in user_stories:
            snap_id, url, timestamp = story['snap_id'], story['url'], story['timestamp']

            # Snap already exists in the database
            if find_duplicate_snap(known_rows, snap_id, username):
                continue

            # The extension comes from the type code inside the URL itself.
            extension = extract_file_type(url)
            if not extension:
                print(f"Failed to determine file extension for {url}. Skipping.")
                continue

            local_name = f"{username}~{timestamp}~{snap_id}{extension}"
            pending.append(dict(
                username=username,
                timestamp=timestamp,
                filepath=os.path.join(directory, local_name),
                snap_id=snap_id,
                original_snap_id=story['original_snap_id'],
                media_url=url,
            ))
            print(f"Media {snap_id} ready for download.")

    # Oldest first, so media is processed in chronological order.
    return sorted(pending, key=lambda item: item['timestamp'])
11 months ago
11 months ago
def get_snapchat_files():
    """
    Describe the media files currently sitting in the local snap directory.

    Every path returned by ``funcs.get_files`` is parsed with
    ``get_media_data``; paths that cannot be parsed are left out.
    """
    parsed_files = []
    for path in funcs.get_files(directory):
        info = get_media_data(path)
        if info:
            parsed_files.append(info)
    return parsed_files
def main():
    """
    Collect pending snaps from the API and from local files, then process them.

    Entries found on disk are skipped when a freshly fetched entry already
    targets the same file name.
    """
    ready_stories = get_snapchat_stories()

    # A file left behind by an earlier run can show up twice: once as a fresh
    # story and once as a file on disk. The upload step deletes the local file,
    # so a second pass would find no file and no media URL to download from.
    queued_names = {os.path.basename(story['filepath']) for story in ready_stories}
    for story in get_snapchat_files():
        name = os.path.basename(story['filepath'])
        if name in queued_names:
            continue
        queued_names.add(name)
        ready_stories.append(story)

    download_stories(ready_stories)
def download_stories(stories):
    """
    Download every story's media and pass each successful one on for upload.

    Args:
        stories: Iterable of story dicts with at least the keys ``filepath``,
            ``media_url`` and ``timestamp``. ``filepath`` is rewritten with
            the path returned by the download step.
    """
    for story in stories:
        url = story['media_url']
        timestamp = story['timestamp']
        filename = os.path.basename(story['filepath'])

        # Download the media
        filepath = download_media(url, story['filepath'])
        if not filepath:
            # Report success only after the download is known to have worked.
            print(f"Skipping {filename}: download failed")
            continue

        print(f"Downloaded {filename} at {timestamp}")
        story['filepath'] = filepath
        UploadMedia(story)
11 months ago
def UploadMedia(media):
    """
    Upload one local media file to object storage and record it in the database.

    Images get a perceptual hash of the file itself. Videos get a thumbnail,
    which is uploaded under ``thumbnails/`` and hashed. The media file is then
    stored under ``media/snaps/<username>/``, a row is inserted into the
    ``media`` table, and the local file is deleted.

    Args:
        media: Dict with the keys ``username``, ``timestamp``, ``filepath``,
            ``snap_id`` and ``original_snap_id``.

    Returns:
        ``True`` after the row is committed and the local file removed,
        ``False`` when creating or uploading the video thumbnail fails. In
        that case nothing else is uploaded and the local file is kept.
    """
    username = media['username']
    timestamp = media['timestamp']
    filepath = media['filepath']
    filename = os.path.basename(filepath)
    snap_id = media['snap_id']
    original_snap_id = media['original_snap_id']
    thumbnail_url = None
    phash = None

    media_type = funcs.get_media_type(filename)
    file_hash = funcs.calculate_file_hash(filepath)
    post_date = datetime.fromtimestamp(int(timestamp))
    width, height = funcs.get_media_dimensions(filepath)
    duration = funcs.get_video_duration(filepath)

    if media_type == 'image':
        phash = funcs.generate_phash(filepath)
    elif media_type == 'video':
        thumb_path = None
        try:
            thumb_path = generate_thumbnail(filepath)
            obj_storage.PutFile(thumb_path, f'thumbnails/{filename}')
            thumbnail_url = f"https://storysave.b-cdn.net/thumbnails/{filename}"
            phash = funcs.generate_phash(thumb_path)
        except Exception as exc:
            # `Exception` rather than a bare except, so Ctrl-C and SystemExit
            # still stop the run, and the actual cause is reported.
            print(f'Error generating thumbnail. Skipping... ({exc})')
            return False
        finally:
            # Remove the temporary thumbnail on both success and failure.
            if thumb_path and os.path.exists(thumb_path):
                try:
                    os.remove(thumb_path)
                except OSError as exc:
                    print(f'Could not remove temporary thumbnail {thumb_path}: {exc}')

    server_path = f'media/snaps/{username}/{filename}'
    file_url = f"https://storysave.b-cdn.net/{server_path}"
    obj_storage.PutFile(filepath, server_path)

    query = "INSERT IGNORE INTO media (username, media_type, media_url, width, height, post_type, date, hash, filename, duration, thumbnail, phash, platform, snap_id, original_snap_id) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"
    values = (username, media_type, file_url, width, height, 'story', post_date, file_hash, filename, duration, thumbnail_url, phash, 'snapchat', snap_id, original_snap_id)
    cursor.execute(query, values)
    db.commit()
    print(f'[{cursor.rowcount}] records updated. File {filename} uploaded to {file_url}')

    # The local copy is no longer needed once it is stored and recorded.
    os.remove(filepath)
    return True
def generate_thumbnail(filepath):
    """
    Write the first frame of a video to a uniquely named JPEG under ``temp/``.

    Args:
        filepath: Path of the video file.

    Returns:
        Path of the thumbnail image that was written.

    Raises:
        ValueError: If no frame could be read from the video.
        OSError: If the thumbnail could not be written.
    """
    # cv2.imwrite reports failure via its return value, so make sure the
    # target folder exists before writing.
    os.makedirs('temp', exist_ok=True)
    thumb_path = f'temp/{uuid4()}.jpg'

    cap = cv2.VideoCapture(filepath)
    try:
        ret, frame = cap.read()
    finally:
        # Release the capture even when reading fails.
        cap.release()

    if not ret or frame is None:
        raise ValueError(f"Could not read a frame from {filepath}")
    if not cv2.imwrite(thumb_path, frame):
        raise OSError(f"Could not write thumbnail to {thumb_path}")
    return thumb_path
def get_media_data(filepath):
    """
    Rebuild a media description from a ``<username>~<timestamp>~<snap_id>.<ext>`` path.

    Returns ``False`` when the file name has fewer than three ``~``-separated
    parts. Otherwise returns a dict with the parsed ``username``,
    ``timestamp`` and ``snap_id`` (extension removed), the given
    ``filepath``, and ``None`` for both ``original_snap_id`` and
    ``media_url``.
    """
    pieces = os.path.basename(filepath).split('~')
    if len(pieces) < 3:
        return False

    username, timestamp, id_with_ext = pieces[:3]
    snap_id, _extension = os.path.splitext(id_with_ext)

    return {
        'username': username,
        'timestamp': timestamp,
        'filepath': filepath,
        'snap_id': snap_id,
        'original_snap_id': None,
        'media_url': None,
    }
def process_snap_ids(filenames):
    """
    Extract the distinct snap ids from ``<username>~<timestamp>~<snap_id>.<ext>`` names.

    Args:
        filenames: Iterable of file names containing at least two ``~``.

    Returns:
        The snap ids (third ``~``-separated part with its extension removed)
        in order of first appearance, without duplicates.

    Raises:
        IndexError: If a name has fewer than three ``~``-separated parts.
    """
    seen = set()  # constant-time membership instead of rescanning the list
    snap_ids = []
    for filename in filenames:
        snap_id = os.path.splitext(filename.split('~')[2])[0]
        if snap_id not in seen:
            seen.add(snap_id)
            snap_ids.append(snap_id)
    return snap_ids
if __name__ == "__main__":
    print('Starting snappy...')

    # These are module-level names on purpose: the functions above read
    # `cursor`, `db` and `obj_storage` as globals.
    db, cursor = config.gen_connection()
    obj_storage = config.get_storage()

    main()

    print("Processing completed.")