You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

174 lines
5.4 KiB
Python

from moviepy.editor import VideoFileClip
import os, cv2, hashlib, requests
from PIL import Image
import numpy as np
import imagehash
headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36"}
proxies={"http": "http://yehyuxsl-rotate:4tl5bvrwkz5e@p.webshare.io:80/","https": "http://yehyuxsl-rotate:4tl5bvrwkz5e@p.webshare.io:80/"}
def generate_phash(image_path):
try:
image = Image.open(image_path)
return str(imagehash.phash(image))
except:
return False
def cleanEmptyFolders(path):
for root, dirs, fs in os.walk(path):
for d in dirs:
cleanEmptyFolders(os.path.join(root, d))
if not os.listdir(root):
os.rmdir(root)
def get_files(directory):
files = []
for root, dirs, filenames in os.walk(directory):
for filename in filenames:
files.append(os.path.join(root, filename))
return files
import cv2
import numpy as np
def compare_images(image_path1, image_path2):
# Load the images in grayscale
img1 = cv2.imread(image_path1, cv2.IMREAD_GRAYSCALE)
img2 = cv2.imread(image_path2, cv2.IMREAD_GRAYSCALE)
if img1 is None or img2 is None:
print("Error loading images!")
return False # Or you could raise an exception
# Initialize SIFT detector
sift = cv2.SIFT_create()
# Find keypoints and descriptors with SIFT
kp1, des1 = sift.detectAndCompute(img1, None)
kp2, des2 = sift.detectAndCompute(img2, None)
# Check if descriptors are None
if des1 is None or des2 is None:
return False
# FLANN parameters
index_params = dict(algorithm=1, trees=5)
search_params = dict(checks=50)
# FLANN based matcher
flann = cv2.FlannBasedMatcher(index_params, search_params)
# Matching descriptor vectors using KNN algorithm
matches = flann.knnMatch(des1, des2, k=2)
# Apply ratio test
good = []
for m, n in matches:
if m.distance < 0.6 * n.distance: # More stringent ratio
good.append(m)
# Minimum number of matches
MIN_MATCH_COUNT = 15 # Adjust this threshold as needed
if len(good) > MIN_MATCH_COUNT:
# Extract location of good matches
src_pts = np.float32([kp1[m.queryIdx].pt for m in good]).reshape(-1, 1, 2)
dst_pts = np.float32([kp2[m.trainIdx].pt for m in good]).reshape(-1, 1, 2)
# Find homography
M, mask = cv2.findHomography(src_pts, dst_pts, cv2.RANSAC, 5.0)
matchesMask = mask.ravel().tolist()
if np.sum(matchesMask) > 10: # Check if enough points agree on homography
return True
else:
return False
else:
return False
def download_file(url, filePath):
try:
response = requests.get(url, stream=True, headers=headers)
response.raise_for_status()
directory = os.path.dirname(filePath)
if not os.path.exists(directory):
os.makedirs(directory)
with open(filePath, "wb") as out_file:
for chunk in response.iter_content(chunk_size=8192):
out_file.write(chunk)
print(f"Downloaded {filePath}")
except Exception as e:
print(f"Failed to download {url}. Error: {e}")
def determine_post_type(filepath, mediatype):
if mediatype == 'image':
try:
with Image.open(filepath) as img:
width, height = img.size
except:
print(f"Error opening image {filepath}")
return False
elif mediatype == 'video':
width, height = get_video_dimensions(filepath)
else:
return False
if 0 in (width, height):
return False
aspect_ratio = width / height
if aspect_ratio > 0.5 and aspect_ratio < 0.6:
return 'stories'
else:
return 'posts'
def get_media_type(filename):
image_extensions = {".jpg", ".jpeg", ".png", ".gif", ".webp", ".bmp", ".tiff", ".tif", ".svg", ".eps", ".raw", ".cr2", ".nef", ".orf", ".sr2", ".heic", ".indd", ".ai", ".psd", ".svg"}
video_extensions = {".mp4", ".mov"}
extension = os.path.splitext(filename.lower())[1] # Get the extension and convert to lower case
if extension in image_extensions:
return 'image'
elif extension in video_extensions:
return 'video'
else:
return 'unknown'
def get_video_duration(file_path):
try:
with VideoFileClip(file_path) as video:
return video.duration
except Exception as e:
print(f"Error getting duration for {file_path}: {e}")
return 0
def get_video_dimensions(video_path):
cap = cv2.VideoCapture(video_path)
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
cap.release()
return width, height
def get_video_data(video_path):
data = {'duration': 0, 'width': 0, 'height': 0}
try:
with VideoFileClip(video_path) as video:
data['duration'] = video.duration
data['width'] = video.size[0]
data['height'] = video.size[1]
except Exception as e:
print(f"Error getting video data for {video_path}: {e}")
return data
def calculate_file_hash(file_path, hash_func='sha256'):
h = hashlib.new(hash_func)
with open(file_path, 'rb') as file:
chunk = file.read(8192)
while chunk:
h.update(chunk)
chunk = file.read(8192)
return h.hexdigest()