massive update
parent 55484ebf11
commit a65cc43999
File diff suppressed because one or more lines are too long
@@ -1 +0,0 @@
-gAAAAABmRUff7c9t9gngWj_2cwvaTBrUDJ_JUyYVUfG-p3SvDV7qOSHddJ4eHADiJeRtJNtY9UxkohSB5I1MmLahAb_hxxwIVA==
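(Note: the deleted blob above carries the "gAAAAA" prefix of a Fernet token from Python's cryptography library, consistent with an encrypted secrets file. A minimal round-trip sketch of that format, using a freshly generated key since the real key is not part of this commit:)

from cryptography.fernet import Fernet

key = Fernet.generate_key()           # placeholder; the real key is not in this repo
f = Fernet(key)
token = f.encrypt(b"secret config")   # Fernet tokens always begin with b"gAAAAA"
assert token.startswith(b"gAAAAA")
print(f.decrypt(token))               # b'secret config'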
@@ -1,20 +1,41 @@
 from storysave_api import get_hd_profile_picture
-import config, funcs, os
+import config, funcs, os, time
 
+known_phashes = {'e7c51a904b69d366': 'default empty profile picture',
+                 'cb3ce46194c335dc': 'default empty profile picture',
+                 }
+
+known_hashes = {
+    '09c3cf34d4f117d99fa6285f4bfd3a0d888d7ab2cbca665b16097f6b93ca0de6': 'default empty profile picture',
+    '2b9c0914d8f3f0aa6cf86705df70b7b21e9ca2f9013a346463788e7cebd0158f': 'default empty profile picture',
+}
+
 db, cursor = config.gen_connection()
 
-cursor.execute(f"SELECT DISTINCT username, user_id FROM media WHERE user_id IS NOT NULL AND username IN (SELECT username FROM following WHERE platform = 'instagram');")
+cursor.execute("SELECT DISTINCT username, user_id, favorite FROM following WHERE user_id IS NOT NULL AND platform = 'instagram' ORDER BY favorite DESC;")
 usernames = cursor.fetchall()
 
-for username, user_id in usernames:
+for username, user_id, favorite in usernames:
     profilepicurl = get_hd_profile_picture(user_id=user_id)
     if not profilepicurl:
         print(f'Failed for {username}')
         continue
 
     filename = os.path.basename(profilepicurl).split('?')[0]
     user_dir = os.path.join('media', 'instagram', 'profile', username)
     filepath = os.path.join(user_dir, filename)
 
-    funcs.download_file(profilepicurl, filepath)
-    print(f"Downloaded profile picture for {username}.")
+    filepath = funcs.download_file(profilepicurl, filepath)
+
+    if not filepath:
+        continue
+
+    phash = funcs.generate_phash(filepath)
+    if phash in known_phashes:
+        print(f"Profile picture for {username} is the default empty profile picture.")
+        os.remove(filepath)
+        continue
+
+    print(f"Downloaded profile picture for {username}.")
+
+    time.sleep(1)
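(The default-avatar check above hinges on funcs.generate_phash, which is not part of this diff. A hypothetical stand-in built on the Pillow and imagehash libraries, for orientation only:)

from PIL import Image
import imagehash

def generate_phash(filepath):
    """Return the 64-bit perceptual hash of an image as a hex string."""
    with Image.open(filepath) as img:
        return str(imagehash.phash(img))

# Near-identical images (e.g. the default avatar re-encoded at different
# sizes) hash to the same or nearby values, so an exact dict lookup like
# `phash in known_phashes` catches straight re-encodes.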
@@ -1,153 +0,0 @@
from concurrent.futures import ThreadPoolExecutor, as_completed
from bs4 import BeautifulSoup
import requests
import json

headers = {"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36"}

snap_types = {
    27: ['spotlight', 'video'],
    256: ['thumbnail', 'image'],
    400: ['idk', 'image'],
    1023: ['idk', 'image'],
    1034: ['downscaled_video', 'video'],
    1322: ['idk', 'video'],
    1325: ['idk', 'video'],
}

def get_data(username):
    url = f"https://www.snapchat.com/add/{username}"
    response = requests.get(url, headers=headers)
    soup = BeautifulSoup(response.text, "html.parser")
    data_script = soup.find("script", id="__NEXT_DATA__")
    if not data_script:
        print(f"No data found for {username}.")
        return None
    data = json.loads(data_script.string)
    return data

def get_social_medias(data):
    website_url = None
    try:
        website_url = data['props']['pageProps']['userProfile']['publicProfileInfo']['websiteUrl']
    except KeyError:
        pass
    return website_url

def get_related_profiles(data):
    related_profiles = []
    try:
        related_profiles_data = data['props']['pageProps']['userProfile']['relatedProfiles']
        for profile in related_profiles_data:
            related_profiles.append(profile['username'])
    except KeyError:
        pass
    return related_profiles

def get_all_users_data(usernames):
    all_data = {}

    # Define a helper function for threading
    def fetch_data(username):
        return username, get_data(username)

    # Use ThreadPoolExecutor for concurrent fetching
    with ThreadPoolExecutor() as executor:
        futures = {executor.submit(fetch_data, username): username for username in usernames}

        for future in as_completed(futures):
            username = futures[future]
            try:
                username, data = future.result()
                all_data[username] = data
            except Exception as e:
                print(f"Error fetching data for {username}: {e}")
                all_data[username] = None

    return all_data

def parse_stories(stories):
    parsed_stories = []
    for story in stories:
        parsed_story = parse_story(story)
        parsed_stories.append(parsed_story)
    return parsed_stories

def get_stories(data):
    """Extract story list from the JSON data."""
    try:
        stories = data['props']['pageProps']['story']['snapList']

        if not isinstance(stories, list):
            return []

        stories.sort(key=lambda x: x.get('snapIndex'), reverse=True)
        return stories
    except Exception:
        return []

def get_highlights(data):
    """Extract highlights from possible highlight keys in JSON data."""
    highlights = []

    page_props = data.get('props', {}).get('pageProps', {})
    possible_highlight_keys = ['curatedHighlights', 'savedHighlights', 'highlights']

    for key in possible_highlight_keys:
        highlight_data = page_props.get(key, [])
        if highlight_data:
            highlights.extend(highlight_data)

    return highlights

def parse_story(story):
    original_snap_id = story.get('snapId', {}).get('value', '')
    snap_url = story.get('snapUrls', {}).get('mediaUrl', '')
    timestamp = story.get('timestampInSec', {}).get('value', '')
    media_type = story.get('snapMediaType')
    media_type = 'image' if media_type == 0 else 'video'

    return {
        "original_snap_id": original_snap_id,
        "snap_id": get_snap_id(snap_url),
        "url": snap_url,
        "timestamp": timestamp,
        "platform": "snapchat",
        "type": "story",
        "username": story.get('username', ''),
        "media_type": media_type,
    }

def get_snap_id(url):
    return url.split('?')[0].split('/')[-1].split('.')[0]

def get_highlight_stories(data):
    stories = []
    highlights = get_highlights(data)

    for highlight in highlights:
        snap_list = highlight.get('snapList', [])

        for snap in snap_list:
            story = parse_story(snap)
            stories.append(story)

    return stories

def get_spotlight_metadata(data):
    """Extract spotlight metadata from JSON data."""
    try:
        return data['props']['pageProps']['spotlightStoryMetadata']
    except KeyError:
        return []

def get_username(data):
    """Extract username from JSON data."""
    try:
        return data['props']['pageProps']['userProfile']['publicProfileInfo']['username']
    except KeyError:
        return None
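(For orientation, the deleted module above was consumed roughly like this; "someusername" is a placeholder profile name:)

data = get_data("someusername")                  # profile page JSON, or None
if data:
    stories = parse_stories(get_stories(data))   # current stories, normalized
    stories += get_highlight_stories(data)       # plus highlight snaps
    for story in stories:
        print(story["snap_id"], story["media_type"], story["url"])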
@@ -1,126 +0,0 @@
import os
import json
from tqdm import tqdm

from funcs import get_files
from snapchat import get_stories, get_highlights, get_spotlight_metadata, get_username

# import config as altpinsConfig
import altpinsConfig

def get_data(filepath):
    try:
        with open(filepath, 'r', encoding='utf-8') as f:
            return json.load(f)
    except Exception:
        print(f"Error reading {filepath}")
        return None

def process_story(story, username, story_type, db, cursor):
    snap_urls = story.get('snapUrls', {})
    media_url = snap_urls.get('mediaUrl', '').split('?')[0]
    media_id = media_url.split('/')[-1].split('.')[0].split('?')[-1]

    if media_id in existing_media_ids:
        return False

    media_url = f"https://cf-st.sc-cdn.net/d/{media_url.split('/')[-1]}"

    media_preview_url = snap_urls.get('mediaPreviewUrl', {}).get('value', '').split('?')[0]
    media_preview_url = f"https://cf-st.sc-cdn.net/d/{media_preview_url.split('/')[-1]}"

    timestamp = story.get('timestampInSec', {}).get('value', '')
    media_type = story.get('snapMediaType')
    snap_id = story.get('snapId', {}).get('value', '')

    query = "INSERT IGNORE INTO snapchat_stories (snapId, mediaUrl, mediaPreviewUrl, timestampInSec, snapMediaType, storyType, username, media_id) VALUES (%s, %s, %s, %s, %s, %s, %s, %s)"
    cursor.execute(query, (snap_id, media_url, media_preview_url, timestamp, media_type, story_type, username, media_id))
    db.commit()

    existing_media_ids.add(media_id)

    print_emoji = '✅' if cursor.rowcount else '❌'
    print(f"{print_emoji} Inserted story {media_id}")

def process_json(json_path, db, cursor):
    """
    Given a path to a JSON file, parse it and insert relevant data
    into the database.
    """

    # Load JSON data
    data = get_data(json_path)
    username = get_username(data)

    ready_stories = []

    # Insert stories (regular)
    stories = get_stories(data)
    for story in stories:
        story['storyType'] = 'story'
        ready_stories.append(story)

    # Insert stories (highlights)
    highlights = get_highlights(data)
    highlight_stories = [story for highlight in highlights for story in highlight.get('snapList', [])]
    highlight_stories.sort(key=lambda x: x.get('snapIndex'), reverse=True)
    for story in highlight_stories:
        story['storyType'] = 'highlight'
        ready_stories.append(story)

    for story in ready_stories:
        story_type = story.get('storyType')
        process_story(story, username, story_type, db, cursor)

    # Insert spotlight metadata
    spotlight_metadata = get_spotlight_metadata(data)
    for story in spotlight_metadata:
        try:
            media_id = story['videoMetadata']['contentUrl'].split('/')[-1].split('.')[0].split('?')[-1]
            deepLinkUrl = story['oneLinkParams']['deepLinkUrl'].split('?')[0]
        except Exception:
            continue

        if not all((media_id, deepLinkUrl)):
            continue

        if deepLinkUrl in existing_spotlights:
            continue

        deepLinkId = deepLinkUrl.split('/')[-1]
        description = story['description']

        insert_query = "INSERT IGNORE INTO snapchat_metadata (media_id, deepLinkUrl, description, username, deepLinkId) VALUES (%s, %s, %s, %s, %s)"
        cursor.execute(insert_query, (media_id, deepLinkUrl, description, username, deepLinkId))
        db.commit()

        existing_spotlights.add(deepLinkUrl)

        print_emoji = '✅' if cursor.rowcount else '❌'
        print(f"{print_emoji} Inserted spotlight {media_id}")

    os.remove(json_path)

db, cursor = altpinsConfig.gen_connection()

cursor.execute("SELECT media_id FROM snapchat_stories WHERE media_id != '';")
existing_media_ids = {row[0] for row in cursor.fetchall()}

cursor.execute("SELECT deepLinkUrl FROM snapchat_metadata;")
existing_spotlights = {row[0] for row in cursor.fetchall()}

data_dir = 'data'
files = [f for f in get_files(data_dir) if f.endswith('.json')]

# Wrap the file list with tqdm to show a progress bar
for filepath in tqdm(files, desc="Processing files", unit="file"):
    process_json(filepath, db, cursor)

db.close()
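(The media_id extraction chain above is easiest to follow on a concrete value; the URL below is hypothetical but follows the cf-st.sc-cdn.net shape the script expects:)

media_url = "https://cf-st.sc-cdn.net/d/kKLRNQ18yj0aQ.1034.mp4?mo=abc".split('?')[0]
media_id = media_url.split('/')[-1].split('.')[0].split('?')[-1]
# media_url -> "https://cf-st.sc-cdn.net/d/kKLRNQ18yj0aQ.1034.mp4"
# media_id  -> "kKLRNQ18yj0aQ"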
@@ -1,66 +0,0 @@
from snapchat import get_all_users_data, get_stories, get_highlight_stories, get_social_medias, get_related_profiles, parse_stories
import os, config

snapchat_directory = "snapchat"
media_directory = "media"
temp_directory = ".temp"
data_directory = "data"

directory = os.path.join(media_directory, snapchat_directory)

def get_snapchat_stories(usernames):
    usernames = usernames[:5]  # limit to the first five users
    snapchat_users_data = get_all_users_data(usernames)
    snapchat_users_data = dict(sorted(snapchat_users_data.items()))

    ready_stories = []

    for username, data in snapchat_users_data.items():
        print(f"Getting stories for {username}...")

        if not data:
            print(f"Failed to get data for {username}. Skipping.")
            continue

        website_url = get_social_medias(data)
        related_profiles = get_related_profiles(data)

        stories = parse_stories(get_stories(data))
        stories.extend(get_highlight_stories(data))

        for story in stories:
            snap_id = story['snap_id']
            url = story['url']
            timestamp = story['timestamp']

            # Determine file extension
            extension = '.jpg' if story['media_type'] == 'image' else '.mp4'

            filename = f"{username}~{timestamp}~{snap_id}{extension}"
            filepath = os.path.join(directory, filename)

            story['media_url'] = url
            story['filepath'] = filepath
            story['username'] = username

            ready_stories.append(story)

    # sort ready_stories by timestamp from oldest to newest
    ready_stories.sort(key=lambda x: x['timestamp'])

    return ready_stories

db, cursor = config.gen_connection()

cursor.execute("SELECT username FROM following WHERE platform = 'snapchat' ORDER BY id DESC")
usernames = [row[0] for row in cursor.fetchall()]

stories = get_snapchat_stories(usernames)
@@ -1,243 +0,0 @@
from snapchat import get_stories, get_highlight_stories, get_all_users_data, parse_stories
from datetime import datetime
from uuid import uuid4
import config
import funcs
import cv2
import os
import json

UPLOAD_MODE = True

media_directory = "media"
snapchat_directory = "snapchat"
temp_directory = ".temp"
data_directory = "data"

directory = os.path.join(media_directory, snapchat_directory)

os.makedirs(media_directory, exist_ok=True)
os.makedirs(directory, exist_ok=True)
os.makedirs(temp_directory, exist_ok=True)
os.makedirs(data_directory, exist_ok=True)

def find_duplicate_snap(existing_snap_ids, snap_id):
    return snap_id in existing_snap_ids

def archive_data(data, username):
    try:
        current_timestamp = int(datetime.now().timestamp())
        data_filename = f"{username}~{current_timestamp}.json"
        data_filepath = os.path.join(data_directory, data_filename)
        with open(data_filepath, 'w') as f:
            f.write(json.dumps(data, indent=4))
    except Exception:
        print(f"Failed to archive data for {username}.")
        return False

def get_snapchat_stories(usernames):
    snapchat_users_data = get_all_users_data(usernames)
    snapchat_users_data = dict(sorted(snapchat_users_data.items()))

    ready_stories = []

    for username, data in snapchat_users_data.items():
        print(f"Getting stories for {username}...")

        if not data:
            print(f"Failed to get data for {username}. Skipping.")
            continue

        archive_data(data, username)

        stories = get_stories(data)
        stories = parse_stories(stories)
        stories.extend(get_highlight_stories(data))

        for story in stories:
            snap_id = story['snap_id']
            url = story['url']
            timestamp = story['timestamp']

            # Determine file extension
            file_exts = {'image': '.jpg', 'video': '.mp4'}
            extension = file_exts.get(story['media_type'])
            if not extension:
                print(f"Failed to determine file extension for {url}. Skipping.")
                continue

            filename = f"{username}~{timestamp}~{snap_id}{extension}"
            filepath = os.path.join(directory, filename)

            story['media_url'] = url
            story['filepath'] = filepath

            ready_stories.append(story)

    ready_stories.sort(key=lambda x: x['timestamp'])

    return ready_stories

def get_snapchat_files():
    stories = funcs.get_files(directory)
    stories = [get_media_data(filepath) for filepath in stories]
    stories = [story for story in stories if story]
    return stories

def main():
    print('Initializing snappy...')
    ready_stories = []

    stories_from_files = get_snapchat_files()

    cursor.execute("SELECT username FROM following WHERE platform = 'snapchat' ORDER BY id DESC")
    usernames = [row[0] for row in cursor.fetchall()]

    print(f"Getting stories for {len(usernames)} users...")
    new_stories = get_snapchat_stories(usernames)

    cleaned_stories = []
    print("Checking for duplicates...")
    for story in new_stories:
        duplicate_snap = find_duplicate_snap(existing_snap_ids, story['snap_id'])
        if duplicate_snap:
            print(f"Snap {story['filepath']} already exists in the database. Removing...")
            continue
        cleaned_stories.append(story)

    cleaned_stories = download_stories(cleaned_stories)

    ready_stories.extend(cleaned_stories)
    ready_stories.extend(stories_from_files)

    for story in ready_stories:
        UploadMedia(story)

def download_stories(stories):
    downloaded_stories = []
    for story in stories:
        filepath = story['filepath']
        url = story['media_url']

        filepath = funcs.download_file(url, filepath)
        if not filepath:
            continue
        print(f"Downloaded {os.path.basename(filepath)}")

        story['hash'] = funcs.calculate_file_hash(filepath)
        story['size'] = os.path.getsize(filepath)

        downloaded_stories.append(story)

    return downloaded_stories

def UploadMedia(media):
    file_size = media['size']
    file_hash = media['hash']
    filepath = media['filepath']
    filename = os.path.basename(filepath)

    username = media['username']
    timestamp = media['timestamp']
    media_type = media['media_type']
    snap_id = media['snap_id']
    original_snap_id = media['original_snap_id']
    thumbnail_url = None
    phash = None

    duplicate_snap = find_duplicate_snap(existing_snap_ids, media['snap_id'])
    if duplicate_snap:
        print(f"Snap {filename} already exists in the database. Removing...")
        os.remove(filepath)
        return False

    post_date = datetime.fromtimestamp(int(timestamp))

    width, height = funcs.get_media_dimensions(filepath)
    duration = funcs.get_video_duration(filepath)

    if media_type == 'image':
        phash = funcs.generate_phash(filepath)
    elif media_type == 'video':
        try:
            thumb_path = generate_thumbnail(filepath)
            obj_storage.PutFile(thumb_path, f'thumbnails/{filename}')
            thumbnail_url = f"https://storysave.b-cdn.net/thumbnails/{filename}"
            phash = funcs.generate_phash(thumb_path)
            os.remove(thumb_path)
        except Exception:
            print('Error generating thumbnail. Skipping...')
            return False

    server_path = f'media/snaps/{username}/{filename}'
    file_url = f"https://storysave.b-cdn.net/{server_path}"

    obj_storage.PutFile(filepath, server_path)

    query = "INSERT IGNORE INTO media (username, media_type, media_url, width, height, post_type, date, hash, filename, duration, thumbnail, phash, platform, snap_id, original_snap_id, file_size) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"
    values = (username, media_type, file_url, width, height, 'story', post_date, file_hash, filename, duration, thumbnail_url, phash, 'snapchat', snap_id, original_snap_id, file_size)

    cursor.execute(query, values)
    db.commit()
    print(f'[{cursor.rowcount}] records updated. File {filename} uploaded to {file_url}')

    os.remove(filepath)

    return True

def generate_thumbnail(filepath):
    thumb_path = os.path.join(temp_directory, f'{uuid4()}.jpg')
    cap = cv2.VideoCapture(filepath)
    ret, frame = cap.read()
    cv2.imwrite(thumb_path, frame)
    cap.release()
    return thumb_path

def get_media_data(filepath):
    filename = os.path.basename(filepath)
    parts = filename.split('~')
    if len(parts) < 3:
        return False

    username = parts[0]
    timestamp = parts[1]
    snap_id = parts[2]
    snap_id = os.path.splitext(snap_id)[0]

    file_size = os.path.getsize(filepath)
    file_hash = funcs.calculate_file_hash(filepath)
    media_type = funcs.get_media_type(filename)  # needed by UploadMedia; inferred from the extension

    data = {
        "username": username,
        "timestamp": timestamp,
        "filepath": filepath,
        "snap_id": snap_id,
        "original_snap_id": None,
        "media_url": None,
        "media_type": media_type,
        "size": file_size,
        "hash": file_hash
    }

    return data

if __name__ == '__main__':
    print('Starting snappy...')

    db, cursor = config.gen_connection()
    obj_storage = config.get_storage()

    cursor.execute("SELECT snap_id FROM media WHERE filename IS NOT NULL AND platform = 'snapchat' ORDER BY id DESC")
    existing_snap_ids = {row[0] for row in cursor.fetchall()}

    main()

    print("Processing completed.")
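(funcs.calculate_file_hash is referenced throughout but not part of this diff; the 64-hex-character values stored in the hash column and in known_hashes suggest SHA-256. A hypothetical stand-in:)

import hashlib

def calculate_file_hash(filepath, chunk_size=1 << 20):
    """Stream the file in 1 MiB chunks and return its SHA-256 hex digest."""
    h = hashlib.sha256()
    with open(filepath, "rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            h.update(chunk)
    return h.hexdigest()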
@@ -1,147 +0,0 @@
from datetime import datetime
from uuid import uuid4
import funcs
import config
import cv2
import os

media_directory = "media/ready_for_upload"
platform = "instagram"

working_directory = os.path.join(media_directory, platform)

def UploadMedia(media):
    username = media['username']
    user_id = media['user_id']
    filepath = media['filepath']
    platform = media['platform']

    media_id = media['media_id']

    thumbnail_url = None
    phash = None

    filename = os.path.basename(filepath)
    file_extension = os.path.splitext(filename)[1].lower()

    media_type = funcs.get_media_type(filename)
    if not media_type:
        print(f'Error determining media type for {filename}. Skipping...')
        return False

    post_type = funcs.determine_post_type(filepath)
    if not post_type:
        print(f'Error determining post type for {filename}. Skipping...')
        return False

    file_hash = funcs.calculate_file_hash(filepath)

    post_date = datetime.now()

    width, height = funcs.get_media_dimensions(filepath)
    duration = funcs.get_video_duration(filepath)

    if media_type == 'image':
        phash = funcs.generate_phash(filepath)
    elif media_type == 'video':
        try:
            thumb_path = generate_thumbnail(filepath)
            obj_storage.PutFile(thumb_path, f'thumbnails/{file_hash}.jpg')  # this might be a problem in case of duplicate hashes
            thumbnail_url = f"https://storysave.b-cdn.net/thumbnails/{file_hash}.jpg"
            phash = funcs.generate_phash(thumb_path)
            os.remove(thumb_path)
        except Exception as e:
            print(f'Error generating thumbnail. Skipping... {e}')
            return False

    newFilename = f'{file_hash}{file_extension}'
    server_path = f'media/{post_type}/{username}/{newFilename}'

    file_url = f"https://storysave.b-cdn.net/{server_path}"

    obj_storage.PutFile(filepath, server_path)  # slow as fuck

    post_type = 'story' if post_type == 'stories' else 'post'
    query = "INSERT IGNORE INTO media (username, media_type, media_url, width, height, post_type, date, user_id, hash, filename, duration, thumbnail, phash, platform, media_id) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"
    values = (username, media_type, file_url, width, height, post_type, post_date, user_id, file_hash, filename, duration, thumbnail_url, phash, platform, media_id)

    newCursor.execute(query, values)  # slower
    newDB.commit()
    print(f'[{newCursor.rowcount}] records updated. File {filename} uploaded to {file_url}')

    os.remove(filepath)

    return True

def generate_thumbnail(filepath):
    thumb_path = f'.temp/{uuid4()}.jpg'
    cap = cv2.VideoCapture(filepath)
    ret, frame = cap.read()
    cv2.imwrite(thumb_path, frame)
    cap.release()
    return thumb_path

def get_user_id(username):
    username = username.lower()
    if username in existing_users:
        return existing_users[username]

    return None

def get_media(folder_path):
    medias = []

    user_folders = os.listdir(folder_path)
    for user_folder in user_folders:
        user_folder_path = os.path.join(folder_path, user_folder)

        if not os.path.isdir(user_folder_path):
            continue

        files = os.listdir(user_folder_path)
        for filename in files:
            filepath = os.path.join(folder_path, user_folder, filename)

            # skip file if it's hidden
            if filename.startswith('.'):
                continue

            try:
                media_id = int(filename.split('.')[0])
            except ValueError:
                media_id = None

            media = {
                'username': user_folder,
                'filepath': filepath,
                'user_id': get_user_id(user_folder),
                'media_id': media_id,
                'platform': platform
            }

            medias.append(media)

    return medias

def dump_instagram(folder_path):
    medias = get_media(folder_path)

    for media in medias:
        UploadMedia(media)

if __name__ == '__main__':
    print('Starting processing...')

    newDB, newCursor = config.gen_connection()

    obj_storage = config.get_storage()

    newCursor.execute("SELECT DISTINCT username, user_id FROM media WHERE user_id IS NOT NULL")
    existing_users = {user[0].lower(): user[1] for user in newCursor.fetchall()}

    dump_instagram(working_directory)

    print("Processing completed.")
@@ -1,140 +0,0 @@
from datetime import datetime
from uuid import uuid4
import funcs
import config
import cv2
import os

directory = 'processed_tiktoks'

def UploadMedia(media):
    platform = 'TikTok'
    username = media['username']
    filepath = media['filepath']
    file_size = os.path.getsize(filepath)
    thumbnail_url = None
    phash = None

    filename = os.path.basename(filepath)
    file_extension = os.path.splitext(filename)[1].lower()

    media_type = funcs.get_media_type(filename)
    if not media_type:
        print(f'Error determining media type for {filename}. Skipping...')
        return False

    post_type = funcs.determine_post_type(filepath)
    if not post_type:
        print(f'Error determining post type for {filename}. Skipping...')
        return False

    file_hash = funcs.calculate_file_hash(filepath)
    if file_hash in existing_hashes:
        print(f'File {filename} already exists. Skipping...')
        return False

    post_date = datetime.now()

    width, height = funcs.get_media_dimensions(filepath)
    duration = funcs.get_video_duration(filepath)

    if media_type == 'image':
        phash = funcs.generate_phash(filepath)
    elif media_type == 'video':
        try:
            thumb_path = generate_thumbnail(filepath)
            obj_storage.PutFile(thumb_path, f'thumbnails/{file_hash}.jpg')  # this might be a problem in case of duplicate hashes
            thumbnail_url = f"https://storysave.b-cdn.net/thumbnails/{file_hash}.jpg"
            phash = funcs.generate_phash(thumb_path)
            os.remove(thumb_path)
        except Exception:
            print('Error generating thumbnail. Skipping...')
            return False

    newFilename = f'{file_hash}{file_extension}'
    server_path = f'media/tiktoks/{username}/{newFilename}'

    file_url = f"https://storysave.b-cdn.net/{server_path}"

    obj_storage.PutFile(filepath, server_path)  # slow as fuck

    post_type = 'story' if post_type == 'stories' else 'post'
    query = "INSERT IGNORE INTO media (username, media_type, media_url, width, height, post_type, date, hash, filename, duration, thumbnail, phash, platform, file_size) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"
    values = (username, media_type, file_url, width, height, post_type, post_date, file_hash, filename, duration, thumbnail_url, phash, platform, file_size)

    newCursor.execute(query, values)  # slower
    newDB.commit()
    print(f'[{newCursor.rowcount}] records updated. File {filename} uploaded to {file_url}')

    os.remove(filepath)

    return True

def generate_thumbnail(filepath):
    thumb_path = f'temp/{uuid4()}.jpg'
    cap = cv2.VideoCapture(filepath)
    ret, frame = cap.read()
    cv2.imwrite(thumb_path, frame)
    cap.release()
    return thumb_path

def get_media_data(filepath):
    filename = os.path.basename(filepath)
    parts = filename.split('~')

    if len(parts) == 3:
        username, title, tiktok_id = parts
    elif len(parts) == 2:
        username, title = parts
        tiktok_id = None
    else:
        return False

    data = {'username': username, 'filepath': filepath, 'tiktok_id': tiktok_id, 'title': title}

    return data

def get_media(folder_path):
    medias = []

    users = os.listdir(folder_path)
    for user in users:
        user_folder = os.path.join(folder_path, user)
        if not os.path.isdir(user_folder):
            print(f"Skipping {user}")
            continue

        files = os.listdir(user_folder)
        for filename in files:
            filepath = os.path.join(user_folder, filename)

            data = get_media_data(filepath)
            if data:
                medias.append(data)

    return medias

def dump_tiktok(folder_path):
    medias = get_media(folder_path)

    for media in medias:
        UploadMedia(media)

if __name__ == '__main__':
    print('Starting processing...')

    if not os.listdir(directory):
        print('No files to process. Exiting...')
        exit()

    newDB, newCursor = config.gen_connection()

    obj_storage = config.get_storage()

    newCursor.execute("SELECT hash FROM media WHERE hash IS NOT NULL AND platform = 'TikTok'")
    existing_hashes = {row[0] for row in newCursor.fetchall()}

    dump_tiktok(directory)

    print("Processing completed.")
@@ -1,123 +0,0 @@
from selenium.webdriver.common.by import By
import undetected_chromedriver as uc
import requests
import base64
import re
import os

def format_url(url):
    clean_url = re.sub(r'%[0-9A-F]{2}', '', url)
    return clean_url

def encode_offset(offset_num):
    offset_base64 = str(offset_num).encode('utf-8')
    offset_base64 = base64.b64encode(offset_base64).decode('utf-8')
    return offset_base64

def get_clips(username):
    url = 'https://gql.twitch.tv/gql'

    offset_num = 20
    offset_base64 = encode_offset(offset_num)

    user_agent = 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36'

    headers = {
        'client-id': 'kimne78kx3ncx6brgo4mv6wki5h1ko',
        'Content-Type': 'text/plain;charset=UTF-8',
        'User-Agent': user_agent
    }

    data = {
        "operationName": "ClipsCards__User",
        "variables": {"login": username, "limit": 100},
        "extensions": {"persistedQuery": {"version": 1, "sha256Hash": "4eb8f85fc41a36c481d809e8e99b2a32127fdb7647c336d27743ec4a88c4ea44"}}
    }

    response = requests.post(url, headers=headers, json=data)

    clips = response.json()
    clips = clips['data']['user']['clips']['edges']

    cleaned_clips = parse_clips(clips)

    return cleaned_clips

def parse_clips(clips):
    """
    clips is a list of dictionaries
    """

    cleaned_clips = []
    for clip in clips:
        clip = clip['node']

        clip_id = clip['id']
        clip_url = clip['url']
        clip_title = clip['title']
        clip_view_count = clip['viewCount']
        clip_duration = clip['durationSeconds']

        cleaned_clip = {
            'id': clip_id,
            'url': clip_url,
            'title': clip_title,
            'views': clip_view_count,
            'duration': clip_duration
        }

        cleaned_clips.append(cleaned_clip)

    return cleaned_clips

def get_video_url(video_url, driver):
    driver.get(video_url)

    # Get the video element
    video = driver.find_element(By.TAG_NAME, 'video')

    # Get the video source
    video_src = video.get_attribute('src')

    return video_src

def download_video(video_url, filepath):
    if os.path.exists(filepath):
        return filepath

    video = requests.get(video_url, stream=True)

    # Download in chunks
    with open(filepath, 'wb') as f:
        for chunk in video.iter_content(chunk_size=1024):
            f.write(chunk)

    return filepath

# Set up an undetected Chrome driver in headless mode
opts = uc.ChromeOptions()
opts.add_argument("--headless")
opts.add_argument("--window-size=1920,1080")

driver = uc.Chrome(use_subprocess=True, options=opts)

username = 'didicandy666'
clips = get_clips(username)

os.makedirs('clips', exist_ok=True)

for clip in clips:
    clip_url = clip['url']

    filename = f"{clip['id']}.mp4"
    filepath = os.path.join('clips', filename)

    if os.path.exists(filepath):
        print(f"Already downloaded {filename}")
        continue

    video_url = get_video_url(clip_url, driver)

    download_video(video_url, filepath)
    print(f"Downloaded {filename}")
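(encode_offset is defined above but its result is never sent, so it presumably was meant for paging. A hedged sketch of threading a cursor into the same persisted query, reusing the script's data/url/headers; the "cursor" variable name is an assumption about the ClipsCards__User schema, not confirmed by this file:)

data["variables"]["cursor"] = encode_offset(20)   # base64("20") == "MjA="
next_page = requests.post(url, headers=headers, json=data).json()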
@@ -0,0 +1,143 @@
import os
import time
import requests
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.chrome.options import Options

# --- Configuration ---
USERNAME = "maorshabakov"      # your Instagram username
PASSWORD = "PeyxCU%MD*Zq9p"    # your Instagram password
TARGET_USER = "cata.leyah"     # the username of the profile to scrape
DOWNLOAD_DIR = "downloads"     # directory to save media
SCROLL_PAUSE_TIME = 2          # seconds to wait after each scroll

# --- Helper functions ---
def login_instagram(driver, username, password):
    driver.get("https://www.instagram.com/accounts/login/")
    time.sleep(3)  # wait for the login page to load

    # Accept cookies if prompted (may need to adjust for your region)
    try:
        accept_button = driver.find_element(By.XPATH, "//button[text()='Allow all cookies']")
        accept_button.click()
        time.sleep(2)
    except Exception:
        pass

    # Check if already logged in: the login URL redirects to the home page
    if driver.current_url == "https://www.instagram.com/":
        print("Already logged in.")
        return

    # Enter username and password
    username_input = driver.find_element(By.NAME, "username")
    password_input = driver.find_element(By.NAME, "password")
    username_input.send_keys(username)
    password_input.send_keys(password)
    password_input.send_keys(Keys.RETURN)
    time.sleep(5)  # wait for login to complete

def scroll_to_load_posts(driver, post_count=12):
    post_links = dict()

    last_height = driver.execute_script("return document.body.scrollHeight")
    while True:
        driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
        time.sleep(SCROLL_PAUSE_TIME)
        new_height = driver.execute_script("return document.body.scrollHeight")

        new_posts = get_post_links(driver)
        for link in new_posts:
            if link not in post_links:
                post_links[link] = True

        if len(post_links) >= post_count:
            break

        if new_height == last_height:
            break
        last_height = new_height

def get_post_links(driver):
    # Find all post links on the profile page.
    # Instagram posts are links with hrefs that contain '/p/'
    post_elements = driver.find_elements(By.XPATH, "//a[contains(@href, '/p/')]")
    links = [elem.get_attribute("href") for elem in post_elements]
    # Remove duplicates
    return list(set(links))

def download_media(url, download_folder, filename):
    response = requests.get(url, stream=True)
    if response.status_code == 200:
        filepath = os.path.join(download_folder, filename)
        with open(filepath, 'wb') as f:
            for chunk in response.iter_content(1024):
                f.write(chunk)
        print(f"Downloaded: {filename}")
    else:
        print(f"Failed to download: {url}")

def extract_media_url(driver):
    # Try to get video first
    try:
        video = driver.find_element(By.TAG_NAME, "video")
        media_url = video.get_attribute("src")
        if media_url:
            return media_url, "mp4"
    except Exception:
        pass

    # Fallback to image extraction
    try:
        # Sometimes the post image is inside a div with role="button"
        image = driver.find_element(By.XPATH, "//img[contains(@src, 'scontent')]")
        media_url = image.get_attribute("src")
        if media_url:
            return media_url, "jpg"
    except Exception:
        pass

    return None, None

# --- Main script ---
def main():
    os.makedirs(DOWNLOAD_DIR, exist_ok=True)

    chrome_options = Options()
    chrome_options.add_argument("--user-data-dir=.profiles/thenigga")
    driver = webdriver.Chrome(options=chrome_options)
    driver.maximize_window()

    try:
        # Log in to Instagram
        login_instagram(driver, USERNAME, PASSWORD)

        # Navigate to the target user's profile
        driver.get(f"https://www.instagram.com/{TARGET_USER}/")
        time.sleep(5)  # let the page load

        # Scroll down to load all posts
        scroll_to_load_posts(driver)

        # Gather all post links from the profile page
        post_links = get_post_links(driver)
        print(f"Found {len(post_links)} posts.")

        # Process each post
        for idx, post_link in enumerate(post_links):
            driver.get(post_link)
            time.sleep(3)  # wait for post to load

            # Click the download button (div with class post-download-all-button)
            download_button = driver.find_element(By.XPATH, "//div[@class='post-download-all-button']")
            driver.execute_script("arguments[0].click();", download_button)

            time.sleep(1)

    finally:
        driver.quit()

if __name__ == "__main__":
    main()