You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

201 lines
6.3 KiB
Python

import os
import cv2
import hashlib
import requests
import imagehash
import numpy as np
from PIL import Image
from moviepy.editor import VideoFileClip
# Default HTTP request headers: a desktop Chrome User-Agent string.
headers = {
    "User-Agent": (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/111.0.0.0 Safari/537.36"
    ),
}

# Proxy mapping in the {"scheme": "proxy-url"} form; both schemes share one URL.
# NOTE(review): the proxy credentials are hard-coded in source — consider
# loading them from the environment or a config file instead.
proxies = dict.fromkeys(
    ("http", "https"),
    "http://yehyuxsl-rotate:4tl5bvrwkz5e@p.webshare.io:80/",
)
def get_file_extension(url, timeout=10):
    """Guess a file extension for *url* from the Content-Type of a HEAD request.

    Args:
        url: Address of the media resource to probe.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        '.jpg' when the Content-Type mentions 'image', '.mp4' when it
        mentions 'video', and None when the request fails, the final status
        is not 200, or the content type is neither.
    """
    try:
        # requests.head() does not follow redirects unless asked to, so a
        # redirected media URL would otherwise be reported as a failure.
        # The module-level headers are sent for parity with download_file().
        response = requests.head(
            url,
            headers=headers,
            timeout=timeout,
            allow_redirects=True,
        )
    except requests.RequestException as e:
        print(f"Failed to access media {url}: {e}")
        return None

    if response.status_code != 200:
        print(f"Failed to access media {url}")
        return None

    # Media type names are case-insensitive, so normalise before matching.
    content_type = response.headers.get('Content-Type', '').lower()
    if 'image' in content_type:
        return '.jpg'
    if 'video' in content_type:
        return '.mp4'

    print(f"Unknown content type for media {url}")
    return None
def generate_phash(image_path):
    """Compute the perceptual hash of the image at *image_path*.

    Args:
        image_path: Path of the image file to hash.

    Returns:
        The string form of ``imagehash.phash`` for the image, or False when
        the image cannot be opened or hashed (the error is printed).
    """
    try:
        # Open through a context manager so the file handle is released even
        # when hashing fails.
        with Image.open(image_path) as image:
            return str(imagehash.phash(image))
    except Exception as e:
        print(f"Error generating phash for {image_path}: {e}")
        return False
def clean_empty_folders(path):
    """Delete every empty directory under *path*, including *path* itself.

    Directories are visited bottom-up, so a directory whose only contents
    were empty sub-directories is itself removed once those are gone.

    Args:
        path: Root directory to prune. A non-existent path is a no-op.
    """
    # topdown=False yields children before their parents, so one pass is
    # enough; no recursive re-walk of each sub-tree is needed.
    for root, _dirs, _files in os.walk(path, topdown=False):
        # Re-list at visit time: the names os.walk collected may be stale
        # because child directories were just removed.
        if not os.listdir(root):
            os.rmdir(root)
def get_files(directory):
    """Collect the paths of all non-hidden files beneath *directory*.

    Files whose name begins with a dot are skipped; the directories
    themselves are not filtered.

    Args:
        directory: Root directory to walk recursively.

    Returns:
        A list of file paths, each joined onto the directory it was found in.
    """
    return [
        os.path.join(parent, name)
        for parent, _subdirs, names in os.walk(directory)
        for name in names
        if not name.startswith('.')
    ]
def compare_images(image_path1, image_path2, ratio=0.6, min_match_count=15,
                   min_inliers=10):
    """Decide whether two images match, using SIFT features and a homography.

    Both images are loaded in grayscale, SIFT descriptors are matched with a
    FLANN KNN matcher (k=2), matches are filtered with a ratio test, and a
    RANSAC homography is fitted to the surviving matches.

    Args:
        image_path1: Path of the first (query) image.
        image_path2: Path of the second (train) image.
        ratio: Ratio-test factor; a match is kept when its distance is below
            ``ratio`` times the distance of the second-best match.
        min_match_count: More than this many ratio-test survivors are
            required before a homography is attempted.
        min_inliers: More than this many RANSAC inliers are required for the
            images to count as matching.

    Returns:
        True when the images are considered a match, otherwise False
        (including when either image cannot be loaded or has too few
        features).
    """
    FLANN_INDEX_KDTREE = 1  # the value the original passed as algorithm=1

    # Load the images in grayscale
    img1 = cv2.imread(image_path1, cv2.IMREAD_GRAYSCALE)
    img2 = cv2.imread(image_path2, cv2.IMREAD_GRAYSCALE)
    if img1 is None or img2 is None:
        print("Error loading images!")
        return False

    # Find keypoints and descriptors with SIFT
    sift = cv2.SIFT_create()
    kp1, des1 = sift.detectAndCompute(img1, None)
    kp2, des2 = sift.detectAndCompute(img2, None)

    # A k=2 nearest-neighbour search needs at least two train descriptors.
    if des1 is None or des2 is None or len(des2) < 2:
        return False

    # FLANN based matcher
    index_params = dict(algorithm=FLANN_INDEX_KDTREE, trees=5)
    search_params = dict(checks=50)
    flann = cv2.FlannBasedMatcher(index_params, search_params)
    matches = flann.knnMatch(des1, des2, k=2)

    # Ratio test. Entries with fewer than two neighbours are skipped instead
    # of crashing on tuple unpacking.
    good = [
        pair[0]
        for pair in matches
        if len(pair) == 2 and pair[0].distance < ratio * pair[1].distance
    ]
    if len(good) <= min_match_count:
        return False

    # Extract location of good matches
    src_pts = np.float32([kp1[m.queryIdx].pt for m in good]).reshape(-1, 1, 2)
    dst_pts = np.float32([kp2[m.trainIdx].pt for m in good]).reshape(-1, 1, 2)

    # Find homography; no mask is returned when estimation fails.
    _, mask = cv2.findHomography(src_pts, dst_pts, cv2.RANSAC, 5.0)
    if mask is None:
        return False

    # Enough points must agree on the homography.
    return int(mask.sum()) > min_inliers
def download_file(url, filePath, timeout=30):
    """Download *url* to *filePath* unless the file already exists.

    The body is streamed into a temporary ``<filePath>.part`` file and moved
    into place only after the transfer finished, so an interrupted download
    is never mistaken for a complete file by a later call.

    Args:
        url: Address to fetch; an empty/None value is rejected.
        filePath: Destination path. Missing parent directories are created.
        timeout: Seconds to wait on the server before giving up.

    Returns:
        ``filePath`` on success or when the file already existed, otherwise
        False (the reason is printed).
    """
    partial_path = None
    try:
        if os.path.exists(filePath):
            return filePath
        if not url:
            print(f"Invalid URL: {url}")
            return False

        # The context manager releases the streamed connection on every path.
        with requests.get(url, stream=True, headers=headers, timeout=timeout) as response:
            if response.status_code != 200:
                print(f"Failed to download {url}. Status code: {response.status_code}")
                return False

            # dirname is '' for a bare filename, and os.makedirs('') raises.
            parent_dir = os.path.dirname(filePath)
            if parent_dir:
                os.makedirs(parent_dir, exist_ok=True)

            partial_path = f"{filePath}.part"
            with open(partial_path, "wb") as out_file:
                for chunk in response.iter_content(chunk_size=8192):
                    out_file.write(chunk)

        os.replace(partial_path, filePath)
        return filePath
    except Exception as e:
        print(f"Failed to download {url}. Error: {e}")
        return False
    finally:
        # Best-effort cleanup of a leftover partial file after a failure.
        if partial_path and os.path.exists(partial_path):
            try:
                os.remove(partial_path)
            except OSError:
                pass
def get_media_type(filename):
    """Classify *filename* as 'image' or 'video' from its extension.

    The comparison is case-insensitive and looks only at the final
    extension of the name.

    Args:
        filename: File name or path to classify.

    Returns:
        'image', 'video', or None when the extension is in neither set.
    """
    image_suffixes = {
        ".jpg", ".jpeg", ".png", ".gif", ".webp", ".bmp", ".tiff", ".tif",
        ".svg", ".eps", ".raw", ".cr2", ".nef", ".orf", ".sr2", ".heic",
        ".indd", ".ai", ".psd",
    }
    video_suffixes = {
        ".mp4", ".mov", ".avi", ".mkv", ".wmv", ".flv", ".webm", ".vob",
        ".ogg", ".ts",
    }

    # Lower-case first so that "PHOTO.JPG" and "photo.jpg" classify alike.
    _stem, suffix = os.path.splitext(filename.lower())
    if suffix in image_suffixes:
        return "image"
    if suffix in video_suffixes:
        return "video"
    return None
def get_video_duration(file_path):
    """Return the duration of the video at *file_path*.

    Args:
        file_path: Path of the video file.

    Returns:
        The clip's ``duration`` as reported by VideoFileClip, with a reported
        duration of 0 replaced by 1. Returns 0 when the file does not exist,
        is not classified as a video by get_media_type(), or cannot be read.
    """
    if not os.path.exists(file_path):
        print(f"File not found: {file_path}")
        return 0

    if get_media_type(file_path) != 'video':
        return 0

    try:
        with VideoFileClip(file_path) as clip:
            seconds = clip.duration
            # A zero-length clip is reported as 1 rather than 0.
            return 1 if seconds == 0 else seconds
    except Exception as e:
        print(f"Error getting duration for {file_path}: {e}")
        return 0
def get_media_dimensions(media_path):
    """Return the dimensions of the media file at *media_path*.

    Files that get_media_type() classifies as 'video' are measured with
    get_video_dimensions(); everything else goes to get_image_dimensions().

    Args:
        media_path: Path of the image or video file.

    Returns:
        Whatever the selected helper returns for the path.
    """
    is_video = get_media_type(media_path) == 'video'
    measure = get_video_dimensions if is_video else get_image_dimensions
    return measure(media_path)
def get_video_dimensions(video_path):
    """Read the frame width and height of a video through OpenCV.

    Args:
        video_path: Path of the video file.

    Returns:
        A ``(width, height)`` tuple of ints taken from the capture's
        CAP_PROP_FRAME_WIDTH and CAP_PROP_FRAME_HEIGHT properties.
    """
    capture = cv2.VideoCapture(video_path)
    frame_size = (
        int(capture.get(cv2.CAP_PROP_FRAME_WIDTH)),
        int(capture.get(cv2.CAP_PROP_FRAME_HEIGHT)),
    )
    capture.release()
    return frame_size
def get_image_dimensions(image_path):
    """Return the size of the image at *image_path*.

    Args:
        image_path: Path of the image file.

    Returns:
        The image's ``size`` as reported by PIL, or ``(0, 0)`` when the file
        cannot be opened as an image.
    """
    try:
        with Image.open(image_path) as img:
            return img.size
    except Exception:
        # Best-effort lookup: any failure to read the image yields (0, 0).
        # Catching Exception rather than using a bare except lets
        # KeyboardInterrupt and SystemExit propagate.
        return 0, 0
def get_video_data(video_path):
    """Gather duration, width and height of a video via VideoFileClip.

    Args:
        video_path: Path of the video file.

    Returns:
        A dict with the keys 'duration', 'width' and 'height'. Each value
        stays 0 unless it was read before any error occurred; errors are
        printed, not raised.
    """
    info = dict(duration=0, width=0, height=0)
    try:
        with VideoFileClip(video_path) as clip:
            # Filled in one field at a time so values read before a failure
            # are kept in the result.
            info['duration'] = clip.duration
            info['width'] = clip.size[0]
            info['height'] = clip.size[1]
    except Exception as e:
        print(f"Error getting video data for {video_path}: {e}")
    return info
def calculate_file_hash(file_path, hash_func='sha256'):
    """Hash the contents of a file, reading it in 8 KiB chunks.

    Args:
        file_path: Path of the file to hash.
        hash_func: Algorithm name accepted by ``hashlib.new``.

    Returns:
        The hexadecimal digest of the file's bytes.
    """
    digest = hashlib.new(hash_func)
    with open(file_path, 'rb') as stream:
        # iter() with a sentinel stops at the first empty read (end of file).
        for block in iter(lambda: stream.read(8192), b''):
            digest.update(block)
    return digest.hexdigest()