"""Helpers for the Bunny Stream (video) API and the Bunny Storage API."""

import hashlib
import json
import os
import posixpath
import tempfile
from urllib import parse

import requests
from requests.exceptions import HTTPError

# NOTE(review): a live API key is committed in source. It should be rotated
# and loaded from configuration or the environment instead.
access_key = "ccd3f9d4-9e6f-4bd2-8f594402b5a7-3646-48fe"
video_library_id = 106867

# Base endpoints, kept in one place instead of being repeated per function.
VIDEO_API_BASE = "https://video.bunnycdn.com"
CDN_BASE_URL = "https://vz-58ca89f1-986.b-cdn.net"

# Chunk size used when streaming a download to disk.
_DOWNLOAD_CHUNK_SIZE = 64 * 1024


def _videos_url(suffix=""):
    """Return the videos endpoint of the configured library plus *suffix*."""
    return f"{VIDEO_API_BASE}/library/{video_library_id}/videos{suffix}"


def _api_headers(extra=None):
    """Return the video API headers, reading ``access_key`` at call time."""
    headers = {"accept": "application/json", "AccessKey": access_key}
    if extra:
        headers.update(extra)
    return headers


def _is_success(status_code):
    """Return True when *status_code* is a 2xx HTTP status."""
    return 200 <= status_code < 300


def _stream_put(url, local_path, headers):
    """PUT the file at *local_path* to *url* without loading it into memory."""
    with open(local_path, "rb") as handle:
        # requests switches to chunked transfer encoding for a zero-length
        # stream, so an empty file is sent as an explicit empty body instead.
        body = handle if os.path.getsize(local_path) > 0 else b""
        return requests.put(url, data=body, headers=headers)


def _video_links(video_id):
    """Return the playlist and thumbnail URLs served by the CDN for a video."""
    return {
        "embed_link": f"{CDN_BASE_URL}/{video_id}/playlist.m3u8",
        "animated_thumbnail": f"{CDN_BASE_URL}/{video_id}/preview.webp",
        "default_thumbnail": f"{CDN_BASE_URL}/{video_id}/thumbnail.jpg",
    }


def _create_and_upload(file_path, title):
    """Create a video entry and upload the file; return its guid or None."""
    created = create_video(title)
    if created.status_code != 200:
        return None
    video_id = created.json()["guid"]
    # Do not hand out links for a video whose upload failed.
    if not _is_success(upload_video_process(file_path, video_id)):
        return None
    return video_id


def _clean_path(path):
    """Return *path* without leading/trailing slashes ('' for None)."""
    return (path or "").strip("/")


def _error_result(response, msg):
    """Build an error dict; ``HTTP`` is None when no response was received."""
    status = None
    if response is not None:
        status = response.status_code
        response.close()
    return {"status": "error", "HTTP": status, "msg": msg}


def create_video(title):
    """
    Create a video entry in the configured library.

    Parameters
    ----------
    title :
        Title of the new video. It is converted with ``str()`` and then
        JSON-encoded, so quotes and backslashes are escaped correctly.

    Returns
    -------
    requests.Response
        The raw response of the POST request.
    """
    payload = json.dumps({"title": str(title)})
    headers = _api_headers({"content-type": "application/*+json"})
    return requests.post(_videos_url(), data=payload, headers=headers)


def generate_signature(library_id, api_key, expiration_time, video_id):
    """
    Return the SHA-256 hex digest of the concatenated arguments.

    Every argument is converted to ``str`` first, so an integer library id
    (such as the module-level ``video_library_id``) is accepted.
    """
    raw = f"{library_id}{api_key}{expiration_time}{video_id}"
    return hashlib.sha256(raw.encode()).hexdigest()


def upload_video_process(file_path, video_id):
    """
    Upload the local file at *file_path* as the content of *video_id*.

    Returns
    -------
    int
        HTTP status code of the PUT request.
    """
    response = _stream_put(_videos_url(f"/{video_id}"), file_path, _api_headers())
    return response.status_code


def upload_video(file_path, title=None):
    """
    Create a video and upload *file_path* into it.

    Parameters
    ----------
    file_path : String
        Path of the local video file.
    title : String, optional
        Video title. Defaults to the file name of *file_path*.

    Returns
    -------
    dict or False
        A dict with ``embed_link``, ``animated_thumbnail`` and
        ``default_thumbnail`` on success, ``False`` when creating the video
        or uploading the file fails.
    """
    if title is None:
        title = os.path.basename(file_path)
    video_id = _create_and_upload(file_path, title)
    if video_id is None:
        return False
    return _video_links(video_id)


def upload_video_recurbate(videoInfo):
    """
    Upload the file described by *videoInfo* and store its links in it.

    *videoInfo* must provide ``username``, ``platform`` and ``filename``.
    On success the keys ``embed_link``, ``animated_thumbnail`` and
    ``default_thumbnail`` are added to *videoInfo* in place.

    Returns
    -------
    bool
        ``True`` on success, ``False`` when creating the video or uploading
        the file fails (in which case *videoInfo* is left untouched).
    """
    title = f"{videoInfo['username']} {videoInfo['platform']}"
    video_id = _create_and_upload(videoInfo["filename"], title)
    if video_id is None:
        return False
    videoInfo.update(_video_links(video_id))
    return True


def delete_video(video_id):
    """
    Delete a video from the library.

    *video_id* may be a bare guid or the video's ``playlist.m3u8`` embed link.

    Returns
    -------
    int
        HTTP status code of the DELETE request.
    """
    video_id = video_id.replace(f"{CDN_BASE_URL}/", "").replace("/playlist.m3u8", "")
    response = requests.delete(_videos_url(f"/{video_id}"), headers=_api_headers())
    return response.status_code


def list_videos():
    """Return the ``items`` list of the library's video listing."""
    url = _videos_url("?page=1&itemsPerPage=2147483647&orderBy=date")
    response = requests.get(url, headers=_api_headers())
    return response.json()["items"]


def get_heatmap(video_id):
    """Return the decoded JSON of the heatmap endpoint for *video_id*."""
    url = _videos_url(f"/{video_id}/heatmap")
    return requests.get(url, headers=_api_headers()).json()


def get_video(video_id):
    """Return the decoded JSON of the video endpoint for *video_id*."""
    url = _videos_url(f"/{video_id}")
    return requests.get(url, headers=_api_headers()).json()


class Storage:
    """Small client for the Bunny Storage HTTP API of one storage zone."""

    def __init__(self, api_key, storage_zone, storage_zone_region="de"):
        """
        Creates an object for using BunnyCDN Storage API

        Parameters
        ----------
        api_key : String
            Your bunnycdn storage Apikey/FTP password of storage zone
        storage_zone : String
            Name of your storage zone
        storage_zone_region(optional parameter) : String
            The storage zone region code as per BunnyCDN

        Raises
        ------
        AssertionError
            If storage_zone is empty.
        """
        self.headers = {
            # headers to be passed in HTTP requests
            "AccessKey": api_key,
            "Content-Type": "application/json",
            "Accept": "application/json",
        }

        # Raised explicitly (rather than with `assert`) so the check also
        # runs under `python -O`; the exception type is unchanged.
        if not storage_zone:
            raise AssertionError("storage_zone is not specified/missing")

        # "de" (or an empty region) is served from the un-prefixed host.
        if storage_zone_region in ("de", ""):
            host = "storage.bunnycdn.com"
        else:
            host = f"{storage_zone_region}.storage.bunnycdn.com"
        self.base_url = f"https://{host}/{storage_zone}/"

    def DownloadFile(self, storage_path, download_path=None):
        """
        Download one file from the storage zone into a local directory.

        Parameters
        ----------
        storage_path : String
            The path of the file (including file name and excluding
            storage zone name) which is to be retrieved
        download_path : String
            The existing local directory in which the downloaded file is
            saved. Defaults to the current working directory at call time.

        Returns
        -------
        dict
            'status', 'HTTP' and 'msg'. 'HTTP' is None when the request
            failed before any response was received.

        Raises
        ------
        AssertionError
            If storage_path is empty.
        """
        storage_path = _clean_path(storage_path)
        if not storage_path:
            raise AssertionError("storage_path must be specified")
        if download_path is None:
            download_path = os.getcwd()

        url = self.base_url + parse.quote(storage_path)
        # Taken from the unquoted path so the local name is not URL-encoded.
        file_name = posixpath.basename(storage_path)

        response = None
        try:
            response = requests.get(url, headers=self.headers, stream=True)
            response.raise_for_status()
        except HTTPError as http:
            return _error_result(response, f"Http error occured {http}")
        except Exception as err:
            return _error_result(response, f"error occured {err}")

        target = os.path.join(download_path, file_name)
        try:
            with open(target, "wb") as file:
                for chunk in response.iter_content(chunk_size=_DOWNLOAD_CHUNK_SIZE):
                    if chunk:
                        file.write(chunk)
        finally:
            # A streamed response keeps its connection until closed.
            response.close()

        return {
            "status": "success",
            "HTTP": response.status_code,
            "msg": "File downloaded Successfully",
        }

    def PutFile(
        self,
        file_name,
        storage_path=None,
        local_upload_file_path=None,
    ):
        """
        This function uploads files to your BunnyCDN storage zone

        Parameters
        ----------
        file_name : String
            The name of the file as stored in local server
        storage_path : String
            The path in the storage zone (including the name of file as
            desired and excluding storage zone name) to which the file is
            uploaded. When empty, file_name is used.
        local_upload_file_path : String
            The local directory (excluding file name) containing the file.
            Defaults to the current working directory at call time.

        Returns
        -------
        dict
            'status', 'HTTP' and 'msg'.

        Examples
        --------
        file_name : 'ABC.txt'
        local_upload_file_path : '/home/user/sample_directory'
        storage_path : 'folder/ABC.txt'
        """
        if local_upload_file_path is None:
            local_upload_file_path = os.getcwd()
        local_file = os.path.join(local_upload_file_path, file_name)

        remote_path = _clean_path(storage_path) or file_name
        url = self.base_url + parse.quote(remote_path)

        # The body is raw file content, not JSON.
        headers = {**self.headers, "Content-Type": "application/octet-stream"}
        response = _stream_put(url, local_file, headers)

        try:
            response.raise_for_status()
        except HTTPError as http:
            return {
                "status": "error",
                "HTTP": response.status_code,
                "msg": f"Upload Failed HTTP Error Occured: {http}",
            }
        else:
            return {
                "status": "success",
                "HTTP": response.status_code,
                "msg": "The File Upload was Successful",
            }

    def DeleteFile(self, storage_path=""):
        """
        This function deletes a file or folder mentioned in the storage_path
        from the storage zone

        Parameters
        ----------
        storage_path : String
            The path to the file (including file name) or folder which is
            to be deleted. It must not be empty.

        Returns
        -------
        dict
            'status', 'HTTP' and 'msg'. 'HTTP' is None when the request
            failed before any response was received.

        Raises
        ------
        AssertionError
            If storage_path is empty.
        """
        # Only leading slashes are removed: a trailing slash is left as the
        # caller wrote it.
        storage_path = (storage_path or "").lstrip("/")
        if not storage_path:
            raise AssertionError("storage_path must be specified")

        url = self.base_url + parse.quote(storage_path)

        response = None
        try:
            response = requests.delete(url, headers=self.headers)
            response.raise_for_status()
        except HTTPError as http:
            return _error_result(response, f"HTTP Error occured: {http}")
        except Exception as err:
            return _error_result(
                response, f"Object Delete failed ,Error occured:{err}"
            )

        return {
            "status": "success",
            "HTTP": response.status_code,
            "msg": "Object Successfully Deleted",
        }

    def GetStoragedObjectsList(self, storage_path=None):
        """
        This functions returns a list of files and directories located in
        given storage_path.

        Parameters
        ----------
        storage_path : String
            The directory path that you want to list. When omitted or
            empty, the root of the storage zone is listed.

        Returns
        -------
        list or dict
            A list of dicts holding either 'File_Name' or 'Folder_Name',
            or an error dict with 'status', 'HTTP' and 'msg'.
        """
        directory = _clean_path(storage_path)
        url = self.base_url
        if directory:
            url += parse.quote(directory) + "/"

        try:
            response = requests.get(url, headers=self.headers)
            response.raise_for_status()
        except HTTPError as http:
            return {
                "status": "error",
                "HTTP": response.status_code,
                "msg": f"http error occured {http}",
            }

        storage_list = []
        for entry in response.json():
            item = {}
            if "ObjectName" in entry:
                key = "Folder_Name" if entry.get("IsDirectory") else "File_Name"
                item[key] = entry["ObjectName"]
            storage_list.append(item)
        return storage_list

    def MoveFile(self, old_path, new_path):
        """
        Moves a file by downloading from the old path and uploading to the
        new path, then deleting from the old path.
        Uses existing DownloadFile, PutFile and DeleteFile methods.

        Parameters
        ----------
        old_path : str
            The current path (relative to storage zone root) of the file.
        new_path : str
            The new path (relative to storage zone root) for the file,
            including the desired file name.

        Returns
        -------
        dict
            A dictionary containing 'status', 'msg', and optionally 'HTTP'.
        """
        source = _clean_path(old_path)
        if not source or not _clean_path(new_path):
            return {
                "status": "error",
                "msg": "Both old_path and new_path must be provided.",
            }

        # Same name DownloadFile gives the local copy.
        filename = posixpath.basename(source)

        # A private temporary directory always exists and is removed on
        # every exit path, including failures.
        with tempfile.TemporaryDirectory() as workdir:
            download_response = self.DownloadFile(source, download_path=workdir)
            if download_response.get("status") != "success":
                return {
                    "status": "error",
                    "msg": f"Failed to download file for moving. Reason: {download_response.get('msg', 'unknown')}",
                    "HTTP": download_response.get("HTTP"),
                }

            put_response = self.PutFile(
                file_name=filename,
                storage_path=new_path,
                local_upload_file_path=workdir,
            )
            if put_response.get("status") != "success":
                return {
                    "status": "error",
                    "msg": f"Failed to upload file to new path. Reason: {put_response.get('msg', 'unknown')}",
                    "HTTP": put_response.get("HTTP"),
                }

        # The original is removed only after the new copy was stored.
        delete_response = self.DeleteFile(source)
        if delete_response.get("status") != "success":
            return {
                "status": "error",
                "msg": f"Failed to delete old file. Reason: {delete_response.get('msg', 'unknown')}",
                "HTTP": delete_response.get("HTTP"),
            }

        return {
            "status": "success",
            "msg": f"File successfully moved from '{old_path}' to '{new_path}'.",
        }