mirror of
https://github.com/MuRuLOSE/limoka.git
synced 2026-06-18 15:14:18 +02:00
Added and updated repositories 2026-01-10 01:09:56
This commit is contained in:
@@ -26,20 +26,21 @@
|
||||
# scope: Api TikTokDownloader 0.0.1
|
||||
# ---------------------------------------------------------------------------------
|
||||
|
||||
import aiohttp
|
||||
import asyncio
|
||||
import re
|
||||
import os
|
||||
import warnings
|
||||
import functools
|
||||
import logging
|
||||
|
||||
from dataclasses import dataclass
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
from typing import List, Optional, Union
|
||||
from urllib.parse import urljoin
|
||||
from typing import Union, Optional, List
|
||||
|
||||
import aiohttp
|
||||
from tqdm import tqdm
|
||||
|
||||
from .. import loader, utils
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@dataclass
|
||||
class data:
|
||||
@@ -51,8 +52,11 @@ class data:
|
||||
class TikTok:
|
||||
def __init__(self, host: Optional[str] = None):
|
||||
self.headers = {
|
||||
"User-Agent": "Mozilla/5.0 (iPad; U; CPU OS 3_2 like Mac OS X; en-us) AppleWebKit/531.21.10 (KHTML, like Gecko) "
|
||||
"Version/4.0.4 Mobile/7B334b Safari/531.21.10"
|
||||
"User-Agent": (
|
||||
"Mozilla/5.0 (iPad; U; CPU OS 3_2 like Mac OS X; en-us) "
|
||||
"AppleWebKit/531.21.10 (KHTML, like Gecko) Version/4.0.4 "
|
||||
"Mobile/7B334b Safari/531.21.10"
|
||||
)
|
||||
}
|
||||
self.host = host or "https://www.tikwm.com/"
|
||||
self.session = aiohttp.ClientSession()
|
||||
@@ -73,42 +77,25 @@ class TikTok:
|
||||
self.logger.addHandler(handler)
|
||||
self.logger.setLevel(logging.INFO)
|
||||
|
||||
def _warn(reason: str = "This function is NOT used but may be useful"):
|
||||
def decorator(func):
|
||||
@functools.wraps(func)
|
||||
def wrapper(*args, **kwargs):
|
||||
warnings.warn(
|
||||
f"Warning! Deprecated: {func.__name__}\nReason: {reason}",
|
||||
category=DeprecationWarning,
|
||||
stacklevel=2,
|
||||
)
|
||||
return func(*args, **kwargs)
|
||||
|
||||
return wrapper
|
||||
|
||||
return decorator
|
||||
|
||||
async def close_session(self):
|
||||
await self.session.close()
|
||||
|
||||
async def _ensure_data(self, link: str):
|
||||
try:
|
||||
if self.result is None or self.link != link:
|
||||
self.link = link
|
||||
self.result = await self.fetch(link)
|
||||
self.logger.info("Successfully ensured data from the link")
|
||||
except Exception as e:
|
||||
self.logger.error(f"Error occurred when trying to get data from tikwm: {e}")
|
||||
raise
|
||||
async def __ensure_data(self, link: str):
|
||||
if self.link != link:
|
||||
self.link = link
|
||||
self.result = await self._fetch_data(link)
|
||||
self.logger.info("Successfully ensured data from the link")
|
||||
|
||||
async def __getimages(self, download_dir: Optional[str] = None):
|
||||
async def __get_images(self, download_dir: Optional[str] = None):
|
||||
download_dir = download_dir or self.result["id"]
|
||||
os.makedirs(download_dir, exist_ok=True)
|
||||
|
||||
tasks = [
|
||||
self._download_file(url, os.path.join(download_dir, f"image_{i + 1}.jpg"))
|
||||
for i, url in enumerate(self.result["images"])
|
||||
]
|
||||
await asyncio.gather(*tasks)
|
||||
|
||||
self.logger.info(f"Images - Downloaded and saved photos to {download_dir}")
|
||||
|
||||
return data(
|
||||
@@ -120,7 +107,7 @@ class TikTok:
|
||||
type="images",
|
||||
)
|
||||
|
||||
async def __getvideo(self, video_filename: Optional[str] = None, hd: bool = False):
|
||||
async def __get_video(self, video_filename: Optional[str] = None, hd: bool = False):
|
||||
video_url = self.result["hdplay"] if hd else self.result["play"]
|
||||
video_filename = video_filename or f"{self.result['id']}.mp4"
|
||||
|
||||
@@ -141,7 +128,50 @@ class TikTok:
|
||||
dir_name=os.path.dirname(video_filename), media=video_filename, type="video"
|
||||
)
|
||||
|
||||
async def _makerequest(self, endpoint: str, params: dict) -> dict:
|
||||
async def _fetch_data(self, link: str) -> dict:
|
||||
url = self.get_url(link)
|
||||
params = {"url": url, "hd": 1}
|
||||
return await self._make_request(self.data_endpoint, params=params)
|
||||
|
||||
async def _download_file(self, url: str, path: str):
|
||||
async with self.session.get(url) as response:
|
||||
response.raise_for_status()
|
||||
with open(path, "wb") as file:
|
||||
while chunk := await response.content.read(1024):
|
||||
file.write(chunk)
|
||||
|
||||
async def download_sound(
|
||||
self,
|
||||
link: str,
|
||||
audio_filename: Optional[str] = None,
|
||||
audio_ext: Optional[str] = ".mp3",
|
||||
):
|
||||
await self.__ensure_data(link)
|
||||
|
||||
if not audio_filename:
|
||||
audio_filename = f"{self.result['music_info']['title']}{audio_ext}"
|
||||
else:
|
||||
audio_filename += audio_ext
|
||||
|
||||
await self._download_file(self.result["music_info"]["play"], audio_filename)
|
||||
self.logger.info(f"Sound - Downloaded and saved sound as {audio_filename}")
|
||||
return audio_filename
|
||||
|
||||
async def download(
|
||||
self, link: str, video_filename: Optional[str] = None, hd: bool = True
|
||||
):
|
||||
await self.__ensure_data(link)
|
||||
|
||||
if "images" in self.result:
|
||||
return await self.__get_images(video_filename)
|
||||
|
||||
if "hdplay" in self.result or "play" in self.result:
|
||||
return await self.__get_video(video_filename, hd)
|
||||
|
||||
self.logger.error("No downloadable content found in the provided link.")
|
||||
raise Exception("No downloadable content found in the provided link.")
|
||||
|
||||
async def _make_request(self, endpoint: str, params: dict) -> dict:
|
||||
async with self.session.get(
|
||||
urljoin(self.host, endpoint), params=params, headers=self.headers
|
||||
) as response:
|
||||
@@ -154,87 +184,12 @@ class TikTok:
|
||||
urls = re.findall(r"http[s]?://[^\s]+", text)
|
||||
return urls[0] if urls else None
|
||||
|
||||
@_warn()
|
||||
async def convert_share_urls(self, url: str) -> Optional[str]:
|
||||
url = self.get_url(url)
|
||||
if "@" in url:
|
||||
return url
|
||||
async with self.session.get(
|
||||
url, headers=self.headers, allow_redirects=False
|
||||
) as response:
|
||||
if response.status == 301:
|
||||
return response.headers["Location"].split("?")[0]
|
||||
return None
|
||||
|
||||
@_warn()
|
||||
async def get_tiktok_video_id(self, original_url: str) -> Optional[str]:
|
||||
original_url = await self.convert_share_urls(original_url)
|
||||
matches = re.findall(r"/video|v|photo/(\d+)", original_url)
|
||||
return matches[0] if matches else None
|
||||
|
||||
async def fetch(self, link: str) -> dict:
|
||||
url = self.get_url(link)
|
||||
params = {"url": url, "hd": 1}
|
||||
return await self._makerequest(self.data_endpoint, params=params)
|
||||
|
||||
async def _download_file(self, url: str, path: str):
|
||||
async with self.session.get(url) as response:
|
||||
response.raise_for_status()
|
||||
with open(path, "wb") as file:
|
||||
while chunk := await response.content.read(1024):
|
||||
file.write(chunk)
|
||||
|
||||
async def download_sound(
|
||||
self,
|
||||
link: Union[str],
|
||||
audio_filename: Optional[str] = None,
|
||||
audio_ext: Optional[str] = ".mp3",
|
||||
):
|
||||
await self._ensure_data(link)
|
||||
|
||||
if not audio_filename:
|
||||
audio_filename = f"{self.result['music_info']['title']}{audio_ext}"
|
||||
else:
|
||||
audio_filename += audio_ext
|
||||
|
||||
await self._download_file(self.result["music_info"]["play"], audio_filename)
|
||||
self.logger.info(f"Sound - Downloaded and saved sound as {audio_filename}")
|
||||
return audio_filename
|
||||
|
||||
async def download(
|
||||
self, link: Union[str], video_filename: Optional[str] = None, hd: bool = True
|
||||
) -> data:
|
||||
"""
|
||||
Asynchronously downloads a TikTok video or photo post.
|
||||
|
||||
Args:
|
||||
video_filename (Optional[str]): The name of the file for the TikTok video or photo. If None, the file will be named based on the video or photo ID.
|
||||
hd (bool): If True, downloads the video in HD format. Defaults to False.
|
||||
|
||||
Returns:
|
||||
dir_name (str): Directory name
|
||||
media (Union[str, List[str]]): Full list of downloaded media
|
||||
type (str): The type of downloaded objects: Images or video
|
||||
|
||||
Raises:
|
||||
Exception: No downloadable content found in the provided link.
|
||||
|
||||
"""
|
||||
await self._ensure_data(link)
|
||||
if "images" in self.result:
|
||||
self.logger.info("Starting to download images")
|
||||
return await self.__getimages(video_filename)
|
||||
elif "hdplay" in self.result or "play" in self.result:
|
||||
self.logger.info("Starting to download video.")
|
||||
return await self.__getvideo(video_filename, hd)
|
||||
else:
|
||||
self.logger.error("No downloadable content found in the provided link.")
|
||||
raise Exception("No downloadable content found in the provided link.")
|
||||
|
||||
def _get_video_link(self, unique_id: str, aweme_id: str) -> str:
|
||||
@staticmethod
|
||||
def _get_video_link(unique_id: str, aweme_id: str) -> str:
|
||||
return f"https://www.tiktok.com/@{unique_id}/video/{aweme_id}"
|
||||
|
||||
def _get_uploader_link(self, unique_id: str) -> str:
|
||||
@staticmethod
|
||||
def _get_uploader_link(unique_id: str) -> str:
|
||||
return f"https://www.tiktok.com/@{unique_id}"
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user