cleanup structure

parent 96ebf0daac
commit 633251c6d2
@@ -1,93 +0,0 @@

from funcs import get_files
from PIL import Image
import imagehash
import config
import os

def generate_image_phash(filepath, hash_size=8):
    try:
        # Open the image using PIL
        pil_image = Image.open(filepath)

        # Compute pHash using the imagehash library
        phash = imagehash.phash(pil_image, hash_size=hash_size)
        return phash
    except Exception as e:
        print(f"Error processing image {filepath}: {e}")
        return None

def are_phashes_duplicates(phash1, phash2, threshold=5):
    try:
        # Compute the Hamming distance between the pHashes
        distance = phash1 - phash2
        return distance <= threshold
    except TypeError as e:
        print(f"Error comparing pHashes: {e}")
        return False

def get_media_by_phash(phash, username, existing_medias, threshold=5):
    for media in existing_medias:
        existing_phash_str = media[1]
        existing_username = media[2]

        # Convert stored pHash string to ImageHash object
        existing_phash = imagehash.hex_to_hash(existing_phash_str)

        # Check if the current pHash is a duplicate
        if are_phashes_duplicates(phash, existing_phash, threshold=threshold):
            return media
    return None

def get_media_by_hash(hash, existing_medias):
    for media in existing_medias:
        existing_hash = media[1]
        if hash == existing_hash:
            return media
    return None

def get_media_by_id(media_id, existing_medias):
    for media in existing_medias:
        existing_media_id = media[1]
        if media_id == existing_media_id:
            return media
    return None

def get_data_by_filename(filename, data):
    for item in data:
        if filename in item['filepath']:
            return item
    return None

directory = 'check_if_exists'  # Directory containing user images

# Database connection
db, cursor = config.gen_connection()

# Fetch existing media with pHashes (assuming media are images, adjust media_type if needed)
cursor.execute("SELECT id, phash, username FROM media WHERE media_type = %s AND phash IS NOT NULL", ['image'])
existing_medias = cursor.fetchall()

usernames = os.listdir(directory)

for username in usernames:
    files = get_files(os.path.join(directory, username))
    for filepath in files:
        image_filename = os.path.basename(filepath)
        print(f'Processing {image_filename}...')

        # Generate pHash for the image
        phash = generate_image_phash(filepath, hash_size=8)
        if phash is None:
            continue  # Skip this image if there's an issue

        phash_str = str(phash)

        # Check if the image is a duplicate of any in the database
        duplicate_media = get_media_by_phash(phash, username, existing_medias, threshold=5)
        if duplicate_media:
            print(f'Duplicate found: https://altpins.com/pin/{duplicate_media[0]}')
            print(f'Duplicate image path: {filepath}')
            newpath = os.path.join('duplicates', duplicate_media[2], image_filename)
            os.makedirs(os.path.dirname(newpath), exist_ok=True)
            os.rename(filepath, newpath)
            print(f'Moved {image_filename} to duplicates/')
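Note: the duplicate check above leans on imagehash's operator overloading: subtracting two ImageHash objects yields the bit-level Hamming distance between them, and str() / hex_to_hash() round-trip a hash through its hex representation for database storage. A minimal sketch of those semantics (the two file names are hypothetical):

    from PIL import Image
    import imagehash

    # Two pHashes of the same hash_size can be subtracted directly;
    # the result is the number of differing bits (0..64 for hash_size=8).
    a = imagehash.phash(Image.open('a.jpg'), hash_size=8)  # hypothetical file
    b = imagehash.phash(Image.open('b.jpg'), hash_size=8)  # hypothetical file
    print(a - b)       # e.g. 3 -> a near-duplicate at threshold 5
    print(str(a))      # 16-char hex string, suitable for the phash column
    print(imagehash.hex_to_hash(str(a)) == a)  # round-trips -> True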
@@ -1,68 +0,0 @@

from funcs import generate_phash  # Assuming this function computes the pHash and returns a string
import imagehash
import os

def get_files(directory):
    # Recursively get all files in the directory
    file_list = []
    for root, dirs, files in os.walk(directory):
        for filename in files:
            file_list.append(os.path.join(root, filename))
    return file_list

# Function to compute pHashes for all images in a directory
def compute_phashes(image_paths):
    phash_dict = {}
    for image_path in image_paths:
        try:
            # Compute pHash and get it as a string
            phash_str = generate_phash(image_path)
            # Convert the hash string to an ImageHash object
            phash = imagehash.hex_to_hash(phash_str)
            phash_dict[image_path] = phash
        except Exception as e:
            print(f"Error processing {image_path}: {e}")
    return phash_dict

# Get all image files from 'ready_to_upload' and 'sorted' directories
ready_images = get_files('ready_to_upload')
ready_images = [image for image in ready_images if not image.lower().endswith('.mp4')]

sorted_images = get_files('sorted')
sorted_images = [image for image in sorted_images if not image.lower().endswith('.mp4')]

# Compute pHashes for images in 'ready_to_upload'
print("Computing pHashes for 'ready_to_upload' images...")
ready_image_phashes = compute_phashes(ready_images)

# Compute pHashes for images in 'sorted'
print("Computing pHashes for 'sorted' images...")
sorted_image_phashes = compute_phashes(sorted_images)

# Prepare the 'already_processed' directory
os.makedirs('already_processed', exist_ok=True)

# Set a Hamming distance threshold for considering images as duplicates
threshold = 5  # Adjust this value as needed

# Find and move duplicates
for sorted_image, sorted_phash in sorted_image_phashes.items():
    duplicate_found = False
    for ready_image, ready_phash in ready_image_phashes.items():
        # Compute Hamming distance between the two pHashes
        try:
            distance = sorted_phash - ready_phash
        except TypeError as e:
            print(f"Error comparing hashes for {sorted_image} and {ready_image}: {e}")
            continue

        if distance <= threshold:
            # Duplicate found
            newpath = sorted_image.replace('sorted', 'already_processed')
            os.makedirs(os.path.dirname(newpath), exist_ok=True)
            print(f"Moving {sorted_image} (duplicate of {ready_image}) to 'already_processed'")
            os.rename(sorted_image, newpath)
            duplicate_found = True
            break  # Exit the loop since a duplicate is found
    if not duplicate_found:
        print(f"No duplicate found for {sorted_image}")
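Note: the nested loop above performs O(n·m) Hamming-distance checks. When many duplicates are byte-identical re-encodes with identical pHashes, a set lookup on the hex strings can short-circuit most of that work before falling back to the fuzzy scan. A sketch of such a pre-filter (the function name is illustrative; the dict arguments mirror the phash dicts built above):

    def split_exact_and_candidates(sorted_phashes, ready_phashes):
        """Separate exact hex matches from images that still need a fuzzy scan."""
        ready_hex = {str(h) for h in ready_phashes.values()}
        exact, fuzzy = {}, {}
        for path, h in sorted_phashes.items():
            (exact if str(h) in ready_hex else fuzzy)[path] = h
        return exact, fuzzy

    # Exact matches can be moved immediately; only `fuzzy` needs the
    # threshold-based nested loop from the script above.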
@@ -1,59 +0,0 @@

import config

# Function to find the closest perceptual hash (phash) match
def find_almost_identical_phash(phash, usernames, max_distance=1):
    """
    Find a username whose phash is nearly identical to the given phash.
    :param phash: The phash to compare (e.g., from the 'unknown' image).
    :param usernames: List of tuples containing (username, phash).
    :param max_distance: Maximum Hamming distance to consider as "identical".
    :return: The matching username and phash, or None if no match is found.
    """
    for username in usernames:
        dist = hamming_distance(phash, username[1])
        if dist <= max_distance:
            return username
    return None

def hamming_distance(phash1, phash2):
    """
    Calculate the Hamming distance between two equal-length strings.
    Note: on hex strings this counts differing hex characters, not bits,
    so it is stricter than a bit-level comparison (see the note below).
    """
    if len(phash1) != len(phash2):
        raise ValueError("Hashes must be of the same length")
    return sum(c1 != c2 for c1, c2 in zip(phash1, phash2))

# Establish database connection
db, cursor = config.gen_connection()

# Fetch all images with an 'unknown' username
cursor.execute("SELECT id, username, phash FROM media WHERE username = 'unknown'")
rows = cursor.fetchall()

# Fetch all non-unknown usernames and their associated phash
cursor.execute("SELECT username, phash FROM media WHERE username != 'unknown' AND phash IS NOT NULL AND status = 'public'")
usernames = cursor.fetchall()

# Ensure there are valid usernames to compare against
if not usernames:
    print("No known usernames found in the database.")
    exit()

# Attribute each unknown image to the closest known username
for row in rows:
    media_id = row[0]
    phash = row[2]
    if not phash:
        continue  # No pHash stored for this image; nothing to compare

    # Find a nearly identical phash match
    closest = find_almost_identical_phash(phash, usernames, max_distance=2)

    if closest:
        print(f"Found match for image {media_id}: {closest[0]} with phash {closest[1]}")
        cursor.execute(
            "UPDATE media SET username = %s WHERE id = %s",
            (closest[0], media_id),
        )
        db.commit()
    else:
        print(f"No nearly identical match found for image {media_id}.")
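Note: because hamming_distance compares hex characters rather than bits, max_distance=2 here is tighter than a bit-level threshold of 2: one differing hex character can hide up to four differing bits. A small worked example of the gap (values chosen purely for illustration):

    def hex_char_distance(h1, h2):
        # Counts differing hex characters, as the script above does
        return sum(c1 != c2 for c1, c2 in zip(h1, h2))

    def bit_distance(h1, h2):
        # True bit-level Hamming distance between the two hex strings
        return bin(int(h1, 16) ^ int(h2, 16)).count('1')

    a, b = 'ffe0', 'ffe7'            # differ in one hex character...
    print(hex_char_distance(a, b))   # 1
    print(bit_distance(a, b))        # ...but three bits (0x0 ^ 0x7 = 0b111)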
@@ -1,90 +0,0 @@

from funcs import get_files  # Assuming this is defined elsewhere
from PIL import Image
import imagehash
import config
import os

def generate_image_phash(filepath, hash_size=8):
    try:
        # Open the image using PIL
        pil_image = Image.open(filepath)

        # Compute pHash using the imagehash library
        phash = imagehash.phash(pil_image, hash_size=hash_size)
        return phash
    except Exception as e:
        print(f"Error processing image {filepath}: {e}")
        return None

def are_phashes_duplicates(phash1, phash2, threshold=5):
    try:
        # Compute the Hamming distance between the pHashes
        distance = phash1 - phash2
        return distance <= threshold
    except TypeError as e:
        print(f"Error comparing pHashes: {e}")
        return False

def get_media_by_phash(phash, username, existing_medias, threshold=5):
    for media in existing_medias:
        existing_phash_str = media[1]

        # existing_username = media[2]
        # if existing_username != username:
        #     continue  # Only compare with the same user's media

        # Convert stored pHash string to ImageHash object
        existing_phash = imagehash.hex_to_hash(existing_phash_str)

        # Check if the current pHash is a duplicate
        if are_phashes_duplicates(phash, existing_phash, threshold=threshold):
            return media
    return None

# Database connection
db, cursor = config.gen_connection()

directory = 'check_if_exists'  # Directory containing user images

# Fetch existing media with pHashes (assuming media are images, adjust media_type if needed)
cursor.execute("SELECT id, phash, username FROM media WHERE media_type = %s AND phash IS NOT NULL", ['image'])
existing_medias = cursor.fetchall()

existing_phashes = [media[1] for media in existing_medias]

# Go through the directory folder where each subfolder is a username
users = os.listdir(directory)

for username in users:
    user_images_path = os.path.join(directory, username)
    if not os.path.isdir(user_images_path):
        continue  # Skip non-directory files

    # Get all images for the current user
    images = get_files(user_images_path)  # Assuming this gets all image files

    for filepath in images:
        image_filename = os.path.basename(filepath)
        print(f'Processing {image_filename}...')

        # Generate pHash for the image
        phash = generate_image_phash(filepath, hash_size=8)
        if phash is None:
            continue  # Skip this image if there's an issue

        phash_str = str(phash)

        # NOTE: this exact-match gate means an image whose pHash differs by
        # even one bit never reaches the threshold comparison below (see the
        # note after this file)
        if phash_str not in existing_phashes:
            print(f'No duplicate found for {image_filename}')
            continue

        # Check if the image is a duplicate of any in the database
        duplicate_media = get_media_by_phash(phash, username, existing_medias, threshold=5)
        if duplicate_media:
            found_username = duplicate_media[2]
            print(f'Duplicate found: https://altpins.com/pin/{duplicate_media[0]}')
            print(f'Duplicate image path: {filepath}')
            newpath = os.path.join('duplicates', found_username, image_filename)
            os.makedirs(os.path.dirname(newpath), exist_ok=True)
            os.rename(filepath, newpath)
            print(f'Moved {image_filename} to duplicates/')
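Note: the exact-match gate flagged above means get_media_by_phash only ever sees images whose pHash already matches some row verbatim, so the threshold=5 fuzzy comparison is effectively dead code. A hedged sketch of one way to keep the fast path without losing near-duplicates (find_duplicate is a hypothetical helper; the other names mirror the script above):

    existing_phash_set = set(existing_phashes)  # O(1) membership instead of a list scan

    def find_duplicate(phash, username, existing_medias, threshold=5):
        if str(phash) in existing_phash_set:
            # Fast path: an exact hex match exists, threshold 0 will find it
            return get_media_by_phash(phash, username, existing_medias, threshold=0)
        # Slow path: full scan for near-duplicates within the threshold
        return get_media_by_phash(phash, username, existing_medias, threshold=threshold)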
@@ -1,87 +0,0 @@

from PIL import Image
import imagehash
import config
import cv2
import os

def generate_thumbnail_phash(filepath, hash_size=8):
    # Grab the first frame of the video as a thumbnail
    cap = cv2.VideoCapture(filepath)
    ret, frame = cap.read()
    cap.release()

    if not ret:
        print(f"Error reading frame from {filepath}")
        return None

    # Resize frame to a standard size
    standard_size = (320, 240)
    resized_frame = cv2.resize(frame, standard_size, interpolation=cv2.INTER_AREA)

    # Convert OpenCV image (BGR) to PIL Image (RGB)
    image_rgb = cv2.cvtColor(resized_frame, cv2.COLOR_BGR2RGB)
    pil_image = Image.fromarray(image_rgb)

    # Compute pHash
    phash = imagehash.phash(pil_image, hash_size=hash_size)

    return phash

def are_phashes_duplicates(phash1, phash2, threshold=5):
    # Compute Hamming distance between the pHashes
    try:
        distance = phash1 - phash2
    except TypeError as e:
        print(f"Error comparing pHashes: {e}")
        return False

    return distance <= threshold

def get_media_by_phash(phash, username, existing_medias, threshold=5):
    for media in existing_medias:
        existing_phash_str = media[1]
        existing_username = media[2]
        if existing_username != username:
            continue

        # Convert stored phash string to ImageHash object
        existing_phash = imagehash.hex_to_hash(existing_phash_str)

        if are_phashes_duplicates(phash, existing_phash, threshold=threshold):
            return media
    return None

# Database connection
db, cursor = config.gen_connection()

# Directory containing user videos
directory = 'check_if_exists'

# Fetch existing videos with pHashes
cursor.execute("SELECT id, phash, username FROM media WHERE media_type = %s AND phash IS NOT NULL", ['video'])
existing_medias = cursor.fetchall()

users = os.listdir(directory)  # Assuming 'check_if_exists' contains user videos
for username in users:
    user_videos_path = os.path.join(directory, username)
    if not os.path.isdir(user_videos_path):
        continue

    videos = [video for video in os.listdir(user_videos_path) if video.endswith(('.mp4', '.avi', '.mov'))]
    for video in videos:
        print(f'Processing {video}...')
        filepath = os.path.join(user_videos_path, video)

        phash = generate_thumbnail_phash(filepath, hash_size=8)
        if phash is None:
            continue

        phash_str = str(phash)

        duplicate_media = get_media_by_phash(phash, username, existing_medias, threshold=5)
        if duplicate_media:
            print(f'Duplicate url found: https://altpins.com/pin/{duplicate_media[0]}')
            print(f'Duplicate video path: {filepath}')
            newpath = filepath.replace(directory, 'duplicates')
            os.makedirs(os.path.dirname(newpath), exist_ok=True)
            os.rename(filepath, newpath)
            print(f'Moved {video} to duplicates/')
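Note: hashing only the first frame makes the comparison fragile when two copies of a video begin with different intro frames. A hedged sketch of a multi-frame variant (the sampled positions and the any-pair aggregation rule are arbitrary choices, not part of the original script):

    import cv2
    import imagehash
    from PIL import Image

    def generate_multiframe_phashes(filepath, positions=(0.1, 0.5, 0.9), hash_size=8):
        """Hash frames at several relative positions; returns a list of ImageHash."""
        cap = cv2.VideoCapture(filepath)
        total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        hashes = []
        for pos in positions:
            cap.set(cv2.CAP_PROP_POS_FRAMES, int(total * pos))
            ret, frame = cap.read()
            if not ret:
                continue
            rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            hashes.append(imagehash.phash(Image.fromarray(rgb), hash_size=hash_size))
        cap.release()
        return hashes

    # Two videos are likely duplicates if any pair of sampled frames is close:
    # any(h1 - h2 <= 5 for h1 in hashes_a for h2 in hashes_b)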
@@ -1,58 +0,0 @@

from funcs import generate_phash
import os

def find_duplicates(source_dir, target_dir, extensions, max_distance):
    """Remove duplicates in target_dir that are found in source_dir based on Hamming distance."""
    source_files = {}
    target_files = {}

    # Helper function to filter files by extension
    def filter_files(files):
        return [f for f in files if os.path.splitext(f)[1].lower() in extensions]

    # Build hash map of source directory
    for dirpath, _, filenames in os.walk(source_dir):
        for filename in filter_files(filenames):
            filepath = os.path.join(dirpath, filename)
            filehash = generate_phash(filepath, str=False)
            if filehash:
                source_files[filehash] = filepath

    # Build hash map of target directory and compare
    for dirpath, _, filenames in os.walk(target_dir):
        for filename in filter_files(filenames):
            filepath = os.path.join(dirpath, filename)
            filehash = generate_phash(filepath, str=False)
            if not filehash:
                continue

            # Check if this file is similar to any of the source files
            is_duplicate = False
            for source_hash in source_files.keys():
                distance = filehash - source_hash  # Hamming distance
                if distance <= max_distance:
                    is_duplicate = True
                    break  # Found a duplicate

            if is_duplicate:
                newpath = os.path.join('duplicates', filename)
                os.makedirs(os.path.dirname(newpath), exist_ok=True)
                os.rename(filepath, newpath)
                print(f"Moved duplicate: {filepath} to duplicates/ (distance: {distance})")
            else:
                target_files[filehash] = filepath

if __name__ == '__main__':
    # Paths to the directories
    source_dir = 'D:/Crawlers/media/Coomer/sadierayxo'
    target_dir = 'sorted/sadierayxo'

    # List of accepted extensions
    extensions = {'.png', '.jpg', '.jpeg', '.webp', '.gif'}

    # Maximum Hamming distance to consider as duplicates
    MAX_DISTANCE = 5  # Adjust this threshold as needed

    find_duplicates(source_dir, target_dir, extensions, MAX_DISTANCE)

    print("Duplicate removal process completed.")
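Note: with str=False, generate_phash (from the project's funcs module) is assumed to return an imagehash.ImageHash, which is hashable and can therefore key the source_files/target_files dicts above, or None on failure. If funcs isn't available, a minimal stand-in compatible with this script might look like:

    from PIL import Image
    import imagehash

    def generate_phash(filepath, str=True):
        """Stand-in for funcs.generate_phash: pHash as hex string or ImageHash.

        The parameter is named `str` to match the call sites above, so the
        builtin is shadowed here and format() is used instead of str().
        """
        try:
            phash = imagehash.phash(Image.open(filepath))
        except Exception as e:
            print(f"Error hashing {filepath}: {e}")
            return None
        return format(phash) if str else phash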
@@ -1,112 +0,0 @@

from PIL import Image
import imagehash
import config
import funcs
import cv2
import os

directory = "old_snapchats"
duplicate_dir = 'duplicate_snaps'

def generate_video_phash(filepath):
    try:
        cap = cv2.VideoCapture(filepath)
        ret, frame = cap.read()
        cap.release()
        if not ret:
            return None
        phash = imagehash.phash(Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)))
        return str(phash)
    except Exception:  # Unreadable or corrupt video
        return None

def get_snapchat_files():
    stories = funcs.get_files(directory)
    stories = [get_media_data(filepath) for filepath in stories]
    stories = [story for story in stories if story]
    return stories

def get_media_data(filepath):
    filename = os.path.basename(filepath)
    parts = filename.split('~')
    if len(parts) < 3:
        return False

    username = parts[0]
    timestamp = parts[1]
    snap_id = parts[2]
    snap_id = os.path.splitext(snap_id)[0]

    # data = {'username': username, 'timestamp': timestamp, 'filepath': filepath, 'snap_id': snap_id, 'original_snap_id': None}
    data = {'username': username, 'timestamp': timestamp, 'filepath': filepath, 'snap_id': None, 'original_snap_id': snap_id}

    return data

def process_snap_ids(filenames):
    snap_ids = []
    for filename in filenames:
        snap_id = filename.split('~')[2]
        snap_id = os.path.splitext(snap_id)[0]
        if snap_id not in snap_ids:
            snap_ids.append(snap_id)

    return snap_ids

def find_duplicate_snap(existing_snaps, current_snap):
    filepath = current_snap['filepath']
    original_snap_id = current_snap['original_snap_id']
    username = current_snap['username']

    snap_hash = funcs.calculate_file_hash(filepath)
    phash = None  # Stays None for unsupported extensions, skipping the phash check below
    if filepath.endswith('.mp4'):
        phash = generate_video_phash(filepath)
    elif filepath.endswith('.jpg'):
        phash = funcs.generate_phash(filepath)

    for snap in existing_snaps:
        if username != snap[2]:
            continue

        if original_snap_id in snap[1]:
            return snap
        if original_snap_id == snap[5]:
            return snap
        if snap_hash == snap[3]:
            return snap
        if phash and phash == snap[4]:
            return snap

    return False

if __name__ == '__main__':
    print('Starting snappy...')

    db, cursor = config.gen_connection()
    obj_storage = config.get_storage()

    # Check the old_snapchats folder for files that already exist in the
    # media table where platform = 'snapchat'
    cursor.execute("SELECT id, filename, username, hash, phash, original_snap_id FROM media WHERE filename IS NOT NULL AND platform = 'snapchat'")
    existing_medias = cursor.fetchall()

    snap_files = get_snapchat_files()

    os.makedirs(duplicate_dir, exist_ok=True)

    for story in snap_files:
        print(f"Processing {story['username']}...")
        original_snap_id = story['original_snap_id']

        # Check if the snap is already in the database
        existing_snap = find_duplicate_snap(existing_medias, story)

        if existing_snap:
            print(f"Snap {original_snap_id} already exists in the database.")
            new_filename = os.path.basename(story['filepath'])
            new_filepath = os.path.join(duplicate_dir, new_filename)
            os.rename(story['filepath'], new_filepath)

    print("Processing completed.")