You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

213 lines
6.7 KiB
Python

11 months ago
import os
import cv2
import hashlib
import requests
11 months ago
import imagehash
11 months ago
import numpy as np
from PIL import Image
from moviepy.editor import VideoFileClip
11 months ago
11 months ago
# Default HTTP headers sent with requests: a desktop Chrome User-Agent string.
headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36",
}

# Proxy mapping in the shape accepted by the ``proxies=`` argument of requests;
# both schemes point at the same endpoint.
# NOTE(review): this URL embeds a username and password directly in the
# source -- consider loading it from configuration or the environment.
_PROXY_URL = "http://yehyuxsl-rotate:4tl5bvrwkz5e@p.webshare.io:80/"
proxies = {
    "http": _PROXY_URL,
    "https": _PROXY_URL,
}
11 months ago
9 months ago
def get_file_extension(url, timeout=10):
    """Guess a file extension for a remote media URL from its Content-Type.

    Issues a HEAD request and inspects the ``Content-Type`` response header.

    Args:
        url: Address of the media resource.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        ``'.jpg'`` when the content type contains ``image``, ``'.mp4'`` when
        it contains ``video``, and ``None`` when the request fails, the final
        status is not 200, or the content type is neither.
    """
    try:
        # requests.head() does not follow redirects unless asked to, so a
        # redirected media URL would otherwise be reported as inaccessible.
        # The timeout keeps an unresponsive host from blocking forever.
        response = requests.head(url, allow_redirects=True, timeout=timeout)
    except requests.RequestException as e:
        print(f"Failed to access media {url}. Error: {e}")
        return None

    if response.status_code != 200:
        print(f"Failed to access media {url}")
        return None

    content_type = response.headers.get('Content-Type', '')
    if 'image' in content_type:
        return '.jpg'
    if 'video' in content_type:
        return '.mp4'

    print(f"Unknown content type for media {url}")
    return None
11 months ago
def generate_phash(image_path):
    """Compute the perceptual hash of an image file.

    Args:
        image_path: Path of the image to hash.

    Returns:
        The perceptual hash rendered as a string, or ``False`` when the image
        cannot be opened or hashed (the error is printed).
    """
    try:
        # Open via a context manager so the underlying file handle is
        # released as soon as hashing finishes, even on failure.
        with Image.open(image_path) as image:
            return str(imagehash.phash(image))
    except Exception as e:
        print(f"Error generating phash for {image_path}: {e}")
        return False
9 months ago
def clean_empty_folders(path):
    """Remove every empty directory under ``path``, including ``path`` itself.

    Directories that only become empty once their empty children are removed
    are deleted as well. A non-existent ``path`` is a no-op.

    Args:
        path: Root directory to prune.

    Raises:
        OSError: If an empty directory cannot be removed.
    """
    # Walk bottom-up so children are handled before their parent; one pass is
    # enough and no directory is visited more than once. os.listdir() is
    # re-read here because os.walk's own listing predates the removals.
    for root, _dirs, _files in os.walk(path, topdown=False):
        if not os.listdir(root):
            os.rmdir(root)
def get_files(directory):
    """Collect the paths of all non-hidden files beneath ``directory``.

    Files whose name starts with a dot are skipped. Each returned path is the
    walked folder joined with the file name.

    Args:
        directory: Root folder to scan recursively.

    Returns:
        A list of file paths in ``os.walk`` order.
    """
    return [
        os.path.join(folder, name)
        for folder, _subfolders, names in os.walk(directory)
        for name in names
        if not name.startswith('.')
    ]
def compare_images(image_path1, image_path2, ratio=0.6, min_match_count=15, min_inliers=10):
    """Decide whether two images show the same content via SIFT matching.

    Both images are loaded in grayscale, SIFT descriptors are matched with a
    FLANN matcher, filtered with a ratio test, and the surviving matches are
    checked for geometric consistency with a RANSAC homography.

    Args:
        image_path1: Path of the first image.
        image_path2: Path of the second image.
        ratio: Ratio-test factor; a match is kept when its best distance is
            below ``ratio`` times the second-best distance.
        min_match_count: More than this many ratio-test survivors are needed
            before a homography is attempted.
        min_inliers: More than this many homography inliers are needed for
            the images to count as matching.

    Returns:
        ``True`` when enough matches agree on a single homography, otherwise
        ``False`` (including when an image cannot be loaded or has too few
        features).
    """
    img1 = cv2.imread(image_path1, cv2.IMREAD_GRAYSCALE)
    img2 = cv2.imread(image_path2, cv2.IMREAD_GRAYSCALE)
    if img1 is None or img2 is None:
        print("Error loading images!")
        return False

    sift = cv2.SIFT_create()
    kp1, des1 = sift.detectAndCompute(img1, None)
    kp2, des2 = sift.detectAndCompute(img2, None)
    if des1 is None or des2 is None:
        return False
    # k=2 matching needs at least two candidate descriptors to compare against.
    if len(des2) < 2:
        return False

    # FLANN matcher configuration (algorithm=1 with 5 trees, 50 checks).
    index_params = dict(algorithm=1, trees=5)
    search_params = dict(checks=50)
    flann = cv2.FlannBasedMatcher(index_params, search_params)
    matches = flann.knnMatch(des1, des2, k=2)

    # Ratio test. Entries with fewer than two neighbours cannot be tested and
    # are skipped rather than crashing the tuple unpacking.
    good = [
        pair[0]
        for pair in matches
        if len(pair) == 2 and pair[0].distance < ratio * pair[1].distance
    ]
    if len(good) <= min_match_count:
        return False

    src_pts = np.float32([kp1[m.queryIdx].pt for m in good]).reshape(-1, 1, 2)
    dst_pts = np.float32([kp2[m.trainIdx].pt for m in good]).reshape(-1, 1, 2)

    _homography, mask = cv2.findHomography(src_pts, dst_pts, cv2.RANSAC, 5.0)
    # No mask means no homography could be estimated: treat as "no match".
    if mask is None:
        return False

    # The mask flags the matches that agree with the estimated homography.
    return int(np.sum(mask)) > min_inliers
def download_file(url, filePath, timeout=30):
    """Download ``url`` to ``filePath`` unless the file is already present.

    The body is streamed into a temporary ``<filePath>.part`` file and moved
    into place only after the transfer completes, so an interrupted download
    never leaves a truncated file that a later call would mistake for a
    finished one.

    Args:
        url: Address to fetch; a falsy value is rejected.
        filePath: Destination path. Missing parent directories are created.
        timeout: Seconds to wait on the server before giving up.

    Returns:
        ``filePath`` when the file already existed or was downloaded,
        ``False`` on an invalid URL, a non-200 status, or any error.
    """
    partial_path = None
    try:
        if os.path.exists(filePath):
            print(f"File already exists: {filePath}")
            return filePath

        if not url:
            print(f"Invalid URL: {url}")
            return False

        # The context manager releases the streamed connection on every path.
        with requests.get(url, stream=True, headers=headers, timeout=timeout) as response:
            if response.status_code != 200:
                print(f"Failed to download {url}. Status code: {response.status_code}")
                return False

            # A bare file name has no directory part, and os.makedirs('')
            # would raise, so only create the parent when there is one.
            directory = os.path.dirname(filePath)
            if directory:
                os.makedirs(directory, exist_ok=True)

            partial_path = f"{filePath}.part"
            with open(partial_path, "wb") as out_file:
                for chunk in response.iter_content(chunk_size=8192):
                    out_file.write(chunk)

        os.replace(partial_path, filePath)
        return filePath
    except Exception as e:
        print(f"Failed to download {url}. Error: {e}")
        if partial_path is not None:
            try:
                os.remove(partial_path)
            except OSError:
                # Best-effort cleanup: the partial file may never have been
                # created, and the failure has already been reported above.
                pass
        return False
11 months ago
def get_media_type(filename):
    """Classify a file name as image or video by its extension.

    The extension is compared case-insensitively.

    Args:
        filename: File name or path to classify.

    Returns:
        ``"image"``, ``"video"``, or ``None`` when the extension is unknown
        or absent.
    """
    image_suffixes = frozenset((
        ".jpg", ".jpeg", ".png", ".gif", ".webp", ".bmp", ".tiff", ".tif",
        ".svg", ".eps", ".raw", ".cr2", ".nef", ".orf", ".sr2", ".heic",
        ".indd", ".ai", ".psd",
    ))
    video_suffixes = frozenset((
        ".mp4", ".mov", ".avi", ".mkv", ".wmv", ".flv", ".webm", ".vob",
        ".ogg", ".ts",
    ))

    _stem, suffix = os.path.splitext(filename.lower())
    if suffix in image_suffixes:
        return "image"
    if suffix in video_suffixes:
        return "video"
    return None
11 months ago
def get_video_duration(file_path):
    """Return the duration of a video file as reported by ``VideoFileClip``.

    Args:
        file_path: Path of the video file.

    Returns:
        The clip duration, with a reported duration of ``0`` replaced by
        ``1``. Returns ``0`` when the file does not exist, is not classified
        as a video by ``get_media_type``, or cannot be read.
    """
    if not os.path.exists(file_path):
        print(f"File not found: {file_path}")
        return 0

    if get_media_type(file_path) != 'video':
        return 0

    try:
        with VideoFileClip(file_path) as clip:
            length = clip.duration
            # A zero duration is reported as 1 instead.
            return 1 if length == 0 else length
    except Exception as e:
        print(f"Error getting duration for {file_path}: {e}")
        return 0
11 months ago
9 months ago
def get_media_dimensions(media_path):
    """Return the dimensions of a media file.

    Files classified as video by ``get_media_type`` are measured with
    ``get_video_dimensions``; everything else goes to
    ``get_image_dimensions``.

    Args:
        media_path: Path of the image or video file.

    Returns:
        Whatever the selected helper returns for ``media_path``.
    """
    is_video = get_media_type(media_path) == 'video'
    measure = get_video_dimensions if is_video else get_image_dimensions
    return measure(media_path)
11 months ago
def get_video_dimensions(video_path):
    """Read the frame width and height of a video through OpenCV.

    Args:
        video_path: Path of the video file.

    Returns:
        A ``(width, height)`` tuple of ints taken from the capture's
        frame-size properties.
    """
    capture = cv2.VideoCapture(video_path)
    frame_size = (
        int(capture.get(cv2.CAP_PROP_FRAME_WIDTH)),
        int(capture.get(cv2.CAP_PROP_FRAME_HEIGHT)),
    )
    capture.release()
    return frame_size
9 months ago
def get_image_dimensions(image_path):
    """Return the size of an image file.

    Args:
        image_path: Path of the image.

    Returns:
        The image's ``size`` value, or ``(0, 0)`` when the file cannot be
        opened as an image.
    """
    try:
        with Image.open(image_path) as img:
            return img.size
    except Exception:
        # Deliberately best-effort, but no longer a bare ``except`` so that
        # KeyboardInterrupt and SystemExit still propagate.
        return 0, 0
11 months ago
def get_video_data(video_path):
    """Gather duration and frame size of a video via ``VideoFileClip``.

    Args:
        video_path: Path of the video file.

    Returns:
        A dict with keys ``'duration'``, ``'width'`` and ``'height'``. Fields
        that could not be read keep their default of ``0`` (the error is
        printed).
    """
    info = dict(duration=0, width=0, height=0)
    try:
        with VideoFileClip(video_path) as clip:
            # Filled one field at a time so a failure part-way through keeps
            # whatever was already read.
            info['duration'] = clip.duration
            info['width'] = clip.size[0]
            info['height'] = clip.size[1]
    except Exception as e:
        print(f"Error getting video data for {video_path}: {e}")
    return info
11 months ago
def calculate_file_hash(file_path, hash_func='sha256'):
    """Hash a file's contents without loading it into memory at once.

    Args:
        file_path: Path of the file to hash.
        hash_func: Algorithm name passed to ``hashlib.new``.

    Returns:
        The hexadecimal digest of the file contents.
    """
    digest = hashlib.new(hash_func)
    with open(file_path, 'rb') as stream:
        # iter() with a sentinel keeps reading 8 KiB blocks until read()
        # returns the empty bytes object at end of file.
        for block in iter(lambda: stream.read(8192), b''):
            digest.update(block)
    return digest.hexdigest()
def files_are_identical(file1, file2):
    """Report whether two files have exactly the same bytes.

    The files are read in parallel, 4096 bytes at a time, and the comparison
    stops at the first differing block.

    Args:
        file1: Path of the first file.
        file2: Path of the second file.

    Returns:
        ``True`` when the contents match, ``False`` otherwise.
    """
    with open(file1, "rb") as first, open(file2, "rb") as second:
        for block in iter(lambda: first.read(4096), b""):
            if block != second.read(4096):
                return False
        # The first file is exhausted; they match only if the second is too.
        return second.read(4096) == b""