from uuid import uuid4
from datetime import datetime
import os, requests, config, json, funcs, cv2, re
from snapchat import get_stories, get_highlight_stories, get_all_users_data
directory = "snapchat"
data_directory = "data"
def find_duplicate_snap(existing_snaps, snap_id, username):
    """
    Find a snap for this username in the existing_snaps rows from the database.
    Returns the matching row, or False if the snap is not already stored.
    """
    for snap in existing_snaps:
        if username == snap[2]:
            if snap_id in snap[1]:
                return snap
    return False
def archive_data(data, username):
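    """Write the user's raw Snapchat data to a timestamped JSON file in the data directory."""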
data_filename = f"{username}~{datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}.json"
data_filepath = os.path.join(data_directory, data_filename)
with open(data_filepath, 'w') as f:
f.write(json.dumps(data))
print(f"Archived data for {username} at {data_filepath}")
def get_file_extension(url):
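    """Infer a media extension (.jpg or .mp4) from the Content-Type of a HEAD request, or return None."""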
response = requests.head(url)
if response.status_code != 200:
print(f"Failed to access media {url}")
return None
content_type = response.headers.get('Content-Type', '')
if 'image' in content_type:
return '.jpg'
elif 'video' in content_type:
return '.mp4'
else:
print(f"Unknown content type for media {url}")
return None
def extract_file_type(url):
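    """Map the numeric media type code embedded in a Snapchat media URL to a file extension."""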
file_types = {
'400': '.jpg',
'1322': '.mp4',
'1325': '.mp4',
'1034': '.mp4',
'1023': '.jpg'
}
base_url = url.split("?")[0] # Remove query string
snap_data = base_url.split('/')[-1]
# Extract the file type number
data_parts = snap_data.split('.')
    if len(data_parts) > 1:
        file_type_number = data_parts[1]
        if file_type_number in file_types:
            return file_types[file_type_number]
        print(f"Unknown media type code {file_type_number} for {base_url}")
    else:
        print(f"Unexpected URL format: {base_url}")
    return None
def download_media(url, filepath):
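    """Download a URL to filepath, skipping the request if the file already exists; return the path or None."""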
if os.path.exists(filepath):
print(f"File {filepath} already exists. Skipping download.")
return filepath
response = requests.get(url)
if response.status_code != 200:
print(f"Failed to download media {url}")
return None
with open(filepath, 'wb') as f:
f.write(response.content)
return filepath
def get_all_stories(usernames):
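    """Fetch stories and highlights for each username and build download-ready media records."""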
snapchat_users_data = get_all_users_data(usernames)
all_stories = []
for username in usernames:
print(f"Getting stories for {username}...")
data = snapchat_users_data.get(username)
if not data:
print(f"Failed to get data for {username}. Skipping.")
continue
archive_data(data, username)
print("Getting stories...")
stories = get_stories(data)
print("Getting highlights...")
stories.extend(get_highlight_stories(data))
for story in stories:
snap_id = story['snap_id']
url = story['url']
timestamp = story['timestamp']
            # Determine file extension from the media type code in the URL.
extension = extract_file_type(url)
if not extension:
print(f"Failed to determine file extension for {url}. Skipping.")
continue
filename = f"{username}~{timestamp}~{snap_id}{extension}"
filepath = os.path.join(directory, filename)
media = {
'username': username,
'timestamp': timestamp,
'filepath': filepath,
'snap_id': snap_id,
'original_snap_id': story['original_snap_id'],
'media_url': url,
}
all_stories.append(media)
print(f"Media {snap_id} ready for download.")
return all_stories
def get_snapchat_stories():
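    """Read followed Snapchat usernames from the database and return records for snaps not yet saved, oldest first."""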
os.makedirs(directory, exist_ok=True)
os.makedirs(data_directory, exist_ok=True)
cursor.execute("SELECT username FROM following WHERE platform = 'snapchat' ORDER BY id DESC")
usernames = [row[0] for row in cursor.fetchall()]
cursor.execute("SELECT id, filename, username FROM media WHERE filename IS NOT NULL AND platform = 'snapchat' ORDER BY id DESC")
existing_medias = cursor.fetchall()
snapchat_users_data = get_all_users_data(usernames)
ready_stories = []
for username in usernames:
print(f"Getting stories for {username}...")
data = snapchat_users_data.get(username)
if not data:
print(f"Failed to get data for {username}. Skipping.")
continue
archive_data(data, username)
print("Getting stories...")
stories = get_stories(data)
print("Getting highlights...")
stories.extend(get_highlight_stories(data))
for story in stories:
snap_id = story['snap_id']
url = story['url']
timestamp = story['timestamp']
duplicate_snap = find_duplicate_snap(existing_medias, snap_id, username)
if duplicate_snap:
print(f"Media {snap_id} already exists. Skipping download.")
continue
            # Determine file extension from the media type code in the URL.
extension = extract_file_type(url)
if not extension:
print(f"Failed to determine file extension for {url}. Skipping.")
continue
filename = f"{username}~{timestamp}~{snap_id}{extension}"
filepath = os.path.join(directory, filename)
media = {
'username': username,
'timestamp': timestamp,
'filepath': filepath,
'snap_id': snap_id,
'original_snap_id': story['original_snap_id'],
'media_url': url,
}
ready_stories.append(media)
print(f"Media {snap_id} ready for download.")
# sort ready_stories by timestamp from oldest to newest
ready_stories.sort(key=lambda x: x['timestamp'])
return ready_stories
def download_stories(stories):
    """Download each ready story to disk and hand it off to UploadMedia."""
    for story in stories:
        # Download the media
        filepath = story['filepath']
        url = story.get('media_url')
        filename = os.path.basename(filepath)
        timestamp = story['timestamp']

        filepath = download_media(url, filepath)
        if not filepath:
            continue
        print(f"Downloaded {filename} at {timestamp}")

        story['filepath'] = filepath
        UploadMedia(story)
def main():
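    """Combine freshly fetched stories with leftover files already in the snapchat directory, then download and upload them."""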
ready_stories = get_snapchat_stories()
stories_from_files = funcs.get_files(directory)
stories_from_files = [get_media_data(filepath) for filepath in stories_from_files]
stories_from_files = [story for story in stories_from_files if story]
ready_stories.extend(stories_from_files)
download_stories(ready_stories)
def UploadMedia(media):
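    """Upload a media file to object storage, insert its metadata into the media table, and delete the local copy."""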
username = media['username']
timestamp = media['timestamp']
filepath = media['filepath']
filename = os.path.basename(filepath)
snap_id = media['snap_id']
original_snap_id = media['original_snap_id']
thumbnail_url = None
phash = None
media_type = funcs.get_media_type(filename)
file_hash = funcs.calculate_file_hash(filepath)
post_date = datetime.fromtimestamp(int(timestamp))
width, height = funcs.get_media_dimensions(filepath)
duration = funcs.get_video_duration(filepath)
if media_type == 'image':
phash = funcs.generate_phash(filepath)
elif media_type == 'video':
try:
thumb_path = generate_thumbnail(filepath)
obj_storage.PutFile(thumb_path, f'thumbnails/{filename}')
thumbnail_url = f"https://storysave.b-cdn.net/thumbnails/{filename}"
phash = funcs.generate_phash(thumb_path)
os.remove(thumb_path)
        except Exception as e:
            print(f'Error generating thumbnail: {e}. Skipping...')
return False
server_path = f'media/snaps/{username}/{filename}'
file_url = f"https://storysave.b-cdn.net/{server_path}"
obj_storage.PutFile(filepath, server_path)
query = "INSERT IGNORE INTO media (username, media_type, media_url, width, height, post_type, date, hash, filename, duration, thumbnail, phash, platform, snap_id, original_snap_id) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"
values = (username, media_type, file_url, width, height, 'story', post_date, file_hash, filename, duration, thumbnail_url, phash, 'snapchat', snap_id, original_snap_id)
cursor.execute(query, values)
db.commit()
print(f'[{cursor.rowcount}] records updated. File {filename} uploaded to {file_url}')
os.remove(filepath)
return True
def generate_thumbnail(filepath):
    """Save the first frame of a video as a temporary JPEG and return its path."""
    os.makedirs('temp', exist_ok=True)
    thumb_path = f'temp/{uuid4()}.jpg'
    cap = cv2.VideoCapture(filepath)
    ret, frame = cap.read()
    cap.release()
    if not ret:
        raise ValueError(f"Could not read a frame from {filepath}")
    cv2.imwrite(thumb_path, frame)
    return thumb_path
def get_media_data(filepath):
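    """Rebuild a story record from a downloaded file named username~timestamp~snap_id.ext."""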
filename = os.path.basename(filepath)
parts = filename.split('~')
if len(parts) < 3:
return False
username = parts[0]
timestamp = parts[1]
snap_id = parts[2]
snap_id = os.path.splitext(snap_id)[0]
data = {'username': username, 'timestamp': timestamp, 'filepath': filepath, 'snap_id': snap_id, 'original_snap_id': None}
return data
def process_snap_ids(filenames):
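    """Return the unique snap_id values parsed from username~timestamp~snap_id filenames."""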
snap_ids = []
for filename in filenames:
snap_id = filename.split('~')[2]
snap_id = os.path.splitext(snap_id)[0]
if snap_id not in snap_ids:
snap_ids.append(snap_id)
return snap_ids
if __name__ == '__main__':
print('Starting snappy...')
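    # Assumes config.gen_connection() returns a (db, cursor) pair for a MySQL-style database
    # and config.get_storage() returns a storage client exposing PutFile(), as used above.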
db, cursor = config.gen_connection()
obj_storage = config.get_storage()
main()
print("Processing completed.")