You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
424 lines
16 KiB
Python
424 lines
16 KiB
Python
import requests
|
|
import hashlib
|
|
|
|
access_key = "ccd3f9d4-9e6f-4bd2-8f594402b5a7-3646-48fe"
|
|
video_library_id = 106867
|
|
|
|
def create_video(title):
|
|
url = f"https://video.bunnycdn.com/library/{video_library_id}/videos"
|
|
|
|
payload = f"{{\"title\":\"{title}\"}}"
|
|
headers = {
|
|
"accept": "application/json",
|
|
"content-type": "application/*+json",
|
|
"AccessKey": access_key
|
|
}
|
|
|
|
response = requests.post(url, data=payload, headers=headers)
|
|
|
|
return response
|
|
|
|
def generate_signature(library_id, api_key, expiration_time, video_id):
|
|
signature = hashlib.sha256((library_id + api_key + str(expiration_time) + video_id).encode()).hexdigest()
|
|
return signature
|
|
|
|
def upload_video_process(file_path, video_id):
|
|
url = f"https://video.bunnycdn.com/library/{video_library_id}/videos/{video_id}"
|
|
|
|
headers = {
|
|
"accept": "application/json",
|
|
"AccessKey": access_key
|
|
}
|
|
|
|
with open(file_path, "rb") as file:
|
|
file_data = file.read()
|
|
|
|
response = requests.put(url, headers=headers, data=file_data)
|
|
|
|
return response.status_code
|
|
|
|
def upload_video(file_path, title=None):
|
|
video_item = create_video(title)
|
|
if video_item.status_code != 200:
|
|
return False
|
|
|
|
video_id = video_item.json()['guid']
|
|
upload_video_process(file_path, video_id)
|
|
|
|
return {
|
|
"embed_link": f"https://vz-58ca89f1-986.b-cdn.net/{video_id}/playlist.m3u8",
|
|
"animated_thumbnail": f"https://vz-58ca89f1-986.b-cdn.net/{video_id}/preview.webp",
|
|
"default_thumbnail": f"https://vz-58ca89f1-986.b-cdn.net/{video_id}/thumbnail.jpg",
|
|
}
|
|
|
|
|
|
def upload_video_recurbate(videoInfo):
|
|
title = f"{videoInfo['username']} {videoInfo['platform']}"
|
|
video_item = create_video(title)
|
|
if video_item.status_code != 200:
|
|
return False
|
|
|
|
video_id = video_item.json()['guid']
|
|
upload_video_process(videoInfo['filename'], video_id)
|
|
|
|
videoInfo["embed_link"] = f"https://vz-58ca89f1-986.b-cdn.net/{video_id}/playlist.m3u8"
|
|
videoInfo["animated_thumbnail"] = f"https://vz-58ca89f1-986.b-cdn.net/{video_id}/preview.webp"
|
|
videoInfo["default_thumbnail"] = f"https://vz-58ca89f1-986.b-cdn.net/{video_id}/thumbnail.jpg"
|
|
|
|
return True
|
|
|
|
def delete_video(video_id):
|
|
video_id = video_id.replace('https://vz-58ca89f1-986.b-cdn.net/', '').replace('/playlist.m3u8', '')
|
|
|
|
url = f"https://video.bunnycdn.com/library/{video_library_id}/videos/{video_id}"
|
|
|
|
headers = {
|
|
"accept": "application/json",
|
|
"AccessKey": access_key
|
|
}
|
|
|
|
response = requests.delete(url, headers=headers)
|
|
|
|
return response.status_code
|
|
|
|
def list_videos():
|
|
url = f"https://video.bunnycdn.com/library/{video_library_id}/videos?page=1&itemsPerPage=2147483647&orderBy=date"
|
|
|
|
headers = {
|
|
"accept": "application/json",
|
|
"AccessKey": access_key
|
|
}
|
|
|
|
response = requests.get(url, headers=headers)
|
|
|
|
return response.json()['items']
|
|
|
|
def get_heatmap(video_id):
|
|
url = "https://video.bunnycdn.com/library/libraryId/videos/videoId/heatmap"
|
|
url = url.replace('libraryId', str(video_library_id)).replace('videoId', str(video_id))
|
|
|
|
headers = {
|
|
"accept": "application/json",
|
|
"AccessKey": access_key
|
|
}
|
|
|
|
response = requests.get(url, headers=headers).json()
|
|
|
|
return response
|
|
|
|
def get_video(video_id):
|
|
url = "https://video.bunnycdn.com/library/libraryId/videos/videoId"
|
|
url = url.replace('libraryId', str(video_library_id)).replace('videoId', str(video_id))
|
|
|
|
headers = {
|
|
"accept": "application/json",
|
|
"AccessKey": access_key
|
|
}
|
|
|
|
response = requests.get(url, headers=headers).json()
|
|
|
|
return response
|
|
|
|
|
|
import os
|
|
import requests
|
|
from requests.exceptions import HTTPError
|
|
from urllib import parse
|
|
|
|
class Storage:
|
|
def __init__(self, api_key, storage_zone, storage_zone_region="de"):
|
|
"""
|
|
Creates an object for using BunnyCDN Storage API
|
|
Parameters
|
|
----------
|
|
api_key : String
|
|
Your bunnycdn storage
|
|
Apikey/FTP password of
|
|
storage zone
|
|
|
|
storage_zone : String
|
|
Name of your storage zone
|
|
|
|
storage_zone_region(optional parameter) : String
|
|
The storage zone region code
|
|
as per BunnyCDN
|
|
"""
|
|
self.headers = {
|
|
# headers to be passed in HTTP requests
|
|
"AccessKey": api_key,
|
|
"Content-Type": "application/json",
|
|
"Accept": "applcation/json",
|
|
}
|
|
|
|
# applying constraint that storage_zone must be specified
|
|
assert storage_zone != "", "storage_zone is not specified/missing"
|
|
|
|
# For generating base_url for sending requests
|
|
if storage_zone_region == "de" or storage_zone_region == "":
|
|
self.base_url = "https://storage.bunnycdn.com/" + storage_zone + "/"
|
|
else:
|
|
self.base_url = (
|
|
"https://"
|
|
+ storage_zone_region
|
|
+ ".storage.bunnycdn.com/"
|
|
+ storage_zone
|
|
+ "/"
|
|
)
|
|
|
|
def DownloadFile(self, storage_path, download_path=os.getcwd()):
|
|
"""
|
|
This function will get the files and subfolders of storage zone mentioned in path
|
|
and download it to the download_path location mentioned
|
|
Parameters
|
|
----------
|
|
storage_path : String
|
|
The path of the directory
|
|
(including file name and excluding storage zone name)
|
|
from which files are to be retrieved
|
|
download_path : String
|
|
The directory on local server to which downloaded file must be saved
|
|
Note:For download_path instead of '\' '\\' should be used example: C:\\Users\\XYZ\\OneDrive
|
|
"""
|
|
|
|
assert (
|
|
storage_path != ""
|
|
), "storage_path must be specified" # to make sure storage_path is not null
|
|
# to build correct url
|
|
if storage_path[0] == "/":
|
|
storage_path = storage_path[1:]
|
|
if storage_path[-1] == "/":
|
|
storage_path = storage_path[:-1]
|
|
url = self.base_url + parse.quote(storage_path)
|
|
file_name = url.split("/")[-1] # For storing file name
|
|
|
|
# to return appropriate help messages if file is present or not and download file if present
|
|
try:
|
|
response = requests.get(url, headers=self.headers, stream=True)
|
|
response.raise_for_status()
|
|
except HTTPError as http:
|
|
return {
|
|
"status": "error",
|
|
"HTTP": response.status_code,
|
|
"msg": f"Http error occured {http}",
|
|
}
|
|
except Exception as err:
|
|
return {
|
|
"status": "error",
|
|
"HTTP": response.status_code,
|
|
"msg": f"error occured {err}",
|
|
}
|
|
else:
|
|
download_path = os.path.join(download_path, file_name)
|
|
# Downloading file
|
|
with open(download_path, "wb") as file:
|
|
|
|
for chunk in response.iter_content(chunk_size=1024):
|
|
if chunk:
|
|
file.write(chunk)
|
|
return {
|
|
"status": "success",
|
|
"HTTP": response.status_code,
|
|
"msg": "File downloaded Successfully",
|
|
}
|
|
|
|
def PutFile(
|
|
self,
|
|
file_name,
|
|
storage_path=None,
|
|
local_upload_file_path=os.getcwd(),
|
|
):
|
|
|
|
"""
|
|
This function uploads files to your BunnyCDN storage zone
|
|
Parameters
|
|
----------
|
|
storage_path : String
|
|
The path of directory in storage zone
|
|
(including the name of file as desired and excluding storage zone name)
|
|
to which file is to be uploaded
|
|
file_name : String
|
|
The name of the file as stored in local server
|
|
local_upload_file_path : String
|
|
The path of file as stored in local server(excluding file name)
|
|
from where file is to be uploaded
|
|
Examples
|
|
--------
|
|
file_name : 'ABC.txt'
|
|
local_upload_file_path : 'C:\\User\\Sample_Directory'
|
|
storage_path : '<Directory name in storage zone>/<file name as to be uploaded on storage zone>.txt'
|
|
#Here .txt because the file being uploaded in example is txt
|
|
"""
|
|
local_upload_file_path = os.path.join(local_upload_file_path, file_name)
|
|
|
|
# to build correct url
|
|
if storage_path is not None and storage_path != "":
|
|
if storage_path[0] == "/":
|
|
storage_path = storage_path[1:]
|
|
if storage_path[-1] == "/":
|
|
storage_path = storage_path[:-1]
|
|
url = self.base_url + parse.quote(storage_path)
|
|
else:
|
|
url = self.base_url + parse.quote(file_name)
|
|
with open(local_upload_file_path, "rb") as file:
|
|
file_data = file.read()
|
|
response = requests.put(url, data=file_data, headers=self.headers)
|
|
try:
|
|
response.raise_for_status()
|
|
except HTTPError as http:
|
|
return {
|
|
"status": "error",
|
|
"HTTP": response.status_code,
|
|
"msg": f"Upload Failed HTTP Error Occured: {http}",
|
|
}
|
|
else:
|
|
return {
|
|
"status": "success",
|
|
"HTTP": response.status_code,
|
|
"msg": "The File Upload was Successful",
|
|
}
|
|
|
|
def DeleteFile(self, storage_path=""):
|
|
"""
|
|
This function deletes a file or folder mentioned in the storage_path from the storage zone
|
|
Parameters
|
|
----------
|
|
storage_path : The directory path to your file (including file name) or folder which is to be deleted.
|
|
If this is the root of your storage zone, you can ignore this parameter.
|
|
"""
|
|
# Add code below
|
|
assert (
|
|
storage_path != ""
|
|
), "storage_path must be specified" # to make sure storage_path is not null
|
|
# to build correct url
|
|
if storage_path[0] == "/":
|
|
storage_path = storage_path[1:]
|
|
url = self.base_url + parse.quote(storage_path)
|
|
|
|
try:
|
|
response = requests.delete(url, headers=self.headers)
|
|
response.raise_for_status
|
|
except HTTPError as http:
|
|
return {
|
|
"status": "error",
|
|
"HTTP": response.raise_for_status(),
|
|
"msg": f"HTTP Error occured: {http}",
|
|
}
|
|
except Exception as err:
|
|
return {
|
|
"status": "error",
|
|
"HTTP": response.status_code,
|
|
"msg": f"Object Delete failed ,Error occured:{err}",
|
|
}
|
|
else:
|
|
return {
|
|
"status": "success",
|
|
"HTTP": response.status_code,
|
|
"msg": "Object Successfully Deleted",
|
|
}
|
|
|
|
def GetStoragedObjectsList(self, storage_path=None):
|
|
"""
|
|
This functions returns a list of files and directories located in given storage_path.
|
|
Parameters
|
|
----------
|
|
storage_path : The directory path that you want to list.
|
|
"""
|
|
# to build correct url
|
|
if storage_path is not None:
|
|
if storage_path[0] == "/":
|
|
storage_path = storage_path[1:]
|
|
if storage_path[-1] != "/":
|
|
url = self.base_url + parse.quote(storage_path) + "/"
|
|
else:
|
|
url = self.base_url
|
|
# Sending GET request
|
|
try:
|
|
response = requests.get(url, headers=self.headers)
|
|
response.raise_for_status()
|
|
except HTTPError as http:
|
|
return {
|
|
"status": "error",
|
|
"HTTP": response.status_code,
|
|
"msg": f"http error occured {http}",
|
|
}
|
|
else:
|
|
storage_list = []
|
|
for dictionary in response.json():
|
|
temp_dict = {}
|
|
for key in dictionary:
|
|
if key == "ObjectName" and dictionary["IsDirectory"] is False:
|
|
temp_dict["File_Name"] = dictionary[key]
|
|
if key == "ObjectName" and dictionary["IsDirectory"]:
|
|
temp_dict["Folder_Name"] = dictionary[key]
|
|
storage_list.append(temp_dict)
|
|
return storage_list
|
|
|
|
def MoveFile(self, old_path, new_path):
|
|
"""
|
|
Moves a file by downloading from the old path and uploading to the new path,
|
|
then deleting from the old path. Uses existing PutFile and DeleteFile methods.
|
|
|
|
Parameters
|
|
----------
|
|
old_path : str
|
|
The current path (relative to storage zone root) of the file to move.
|
|
new_path : str
|
|
The new path (relative to storage zone root) for the file.
|
|
|
|
Returns
|
|
-------
|
|
dict
|
|
A dictionary containing 'status', 'msg', and optionally 'HTTP'.
|
|
"""
|
|
# Validate arguments
|
|
if not old_path or not new_path:
|
|
return {
|
|
"status": "error",
|
|
"msg": "Both old_path and new_path must be provided."
|
|
}
|
|
|
|
# 1. Download from old_path to a temporary local directory
|
|
# If you already have the file locally, you can skip this download step.
|
|
download_response = self.DownloadFile(old_path, download_path="temp")
|
|
if download_response.get("status") != "success":
|
|
return {
|
|
"status": "error",
|
|
"msg": f"Failed to download file for moving. Reason: {download_response.get('msg', 'unknown')}",
|
|
"HTTP": download_response.get("HTTP")
|
|
}
|
|
|
|
# Extract the filename from old_path to know what we downloaded
|
|
filename = os.path.basename(old_path)
|
|
|
|
# 2. Upload to new_path using existing PutFile
|
|
# We'll assume new_path includes the desired filename. If it does not, adjust logic.
|
|
put_response = self.PutFile(
|
|
file_name=filename,
|
|
storage_path=new_path, # e.g. "folder/newfile.jpg"
|
|
local_upload_file_path="temp" # where we downloaded it
|
|
)
|
|
if put_response.get("status") != "success":
|
|
return {
|
|
"status": "error",
|
|
"msg": f"Failed to upload file to new path. Reason: {put_response.get('msg', 'unknown')}",
|
|
"HTTP": put_response.get("HTTP")
|
|
}
|
|
|
|
# 3. Delete the original file using existing DeleteFile
|
|
delete_response = self.DeleteFile(old_path)
|
|
if delete_response.get("status") != "success":
|
|
return {
|
|
"status": "error",
|
|
"msg": f"Failed to delete old file. Reason: {delete_response.get('msg', 'unknown')}",
|
|
"HTTP": delete_response.get("HTTP")
|
|
}
|
|
|
|
# (Optional) Clean up the local temp file
|
|
local_temp_path = os.path.join("temp", filename)
|
|
if os.path.exists(local_temp_path):
|
|
os.remove(local_temp_path)
|
|
|
|
return {
|
|
"status": "success",
|
|
"msg": f"File successfully moved from '{old_path}' to '{new_path}'."
|
|
} |