From abff91c013e98dac63ed261a1f4208dcf2f1961d Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Thu, 5 Feb 2026 01:22:45 +0000 Subject: [PATCH] Added and updated repositories 2026-02-05 01:22:45 --- coddrago/modules/YaMusic.py | 130 ++- coddrago/modules/full.txt | 3 +- coddrago/modules/lastfm.py | 124 --- coddrago/modules/tagwatcher.py | 6 +- mead0wsss/mead0wsMods/SenderGifts.py | 52 +- radiocycle/Modules/LastFm.py | 214 ++++ radiocycle/Modules/Neofetch.py | 48 + radiocycle/Modules/PicToStories.py | 211 ++++ radiocycle/Modules/SpotifyMod.py | 1436 ++++++++++++++++++++++++++ radiocycle/Modules/UnbanAll.py | 74 ++ radiocycle/Modules/full.txt | 7 + radiocycle/Modules/randomanimepic.py | 65 ++ radiocycle/Modules/voicetotext.py | 77 ++ yummy1gay/limoka/yg_quotes.py | 803 +++++++------- 14 files changed, 2689 insertions(+), 561 deletions(-) delete mode 100644 coddrago/modules/lastfm.py create mode 100644 radiocycle/Modules/LastFm.py create mode 100644 radiocycle/Modules/Neofetch.py create mode 100644 radiocycle/Modules/PicToStories.py create mode 100644 radiocycle/Modules/SpotifyMod.py create mode 100644 radiocycle/Modules/UnbanAll.py create mode 100644 radiocycle/Modules/full.txt create mode 100644 radiocycle/Modules/randomanimepic.py create mode 100644 radiocycle/Modules/voicetotext.py diff --git a/coddrago/modules/YaMusic.py b/coddrago/modules/YaMusic.py index 18b514d..2279cfe 100644 --- a/coddrago/modules/YaMusic.py +++ b/coddrago/modules/YaMusic.py @@ -1,8 +1,8 @@ __version__ = (3, 1, 1) # meta banner: https://raw.githubusercontent.com/kamekuro/hikka-mods/main/banners/yamusic.png # packurl: https://raw.githubusercontent.com/coddrago/assets/refs/heads/main/modules/yamusic.yml -# meta pic: https://raw.githubusercontent.com/kamekuro/hikka-mods/main/icons/yamusic.png -# meta developer: @codrago +# meta banner: https://raw.githubusercontent.com/coddrago/modules/refs/heads/main/banner.png +# meta developer: @codrago_m # old meta dev: @kamekuro xuesos # scope: heroku_only # scope: heroku_min 1.7.2 @@ -41,6 +41,7 @@ class Banners: meta_info: str = "Music", is_liked: bool = False, repeat_mode: str = "NONE", + blur: int = 0, ): self.title = title self.artists = artists @@ -52,6 +53,7 @@ class Banners: self.meta_info = meta_info self.is_liked = is_liked self.repeat_mode = repeat_mode + self.blur = blur def ultra(self) -> io.BytesIO: WIDTH, HEIGHT = 2560, 1220 @@ -96,7 +98,9 @@ class Banners: background = background.crop((0, offset, bg_w, offset + new_h)) background = background.resize((WIDTH, HEIGHT), Image.Resampling.LANCZOS) - background = background.filter(ImageFilter.GaussianBlur(radius=0)) + + if self.blur > 0: + background = background.filter(ImageFilter.GaussianBlur(radius=self.blur)) dark_overlay = Image.new("RGBA", (WIDTH, HEIGHT), (0, 0, 0, 180)) background = Image.alpha_composite(background, dark_overlay) @@ -296,30 +300,6 @@ class Banners: (heart_x, icon_y_center + heart_size + 5), ] - if self.is_liked: - draw.ellipse(c1_box, fill="red", outline="red") - draw.ellipse(c2_box, fill="red", outline="red") - draw.polygon(tri_points, fill="red", outline="red") - else: - draw.ellipse(c1_box, fill=None, outline="red", width=3) - draw.ellipse(c2_box, fill=None, outline="red", width=3) - draw.line( - [ - (heart_x - c_r * 2 + 1, icon_y_center), - (heart_x, icon_y_center + heart_size + 5), - ], - fill="red", - width=3, - ) - draw.line( - [ - (heart_x + c_r * 2 - 1, icon_y_center), - (heart_x, icon_y_center + heart_size + 5), - ], - fill="red", - width=3, - ) - by = io.BytesIO() background.save(by, format="PNG") by.seek(0) @@ -378,8 +358,13 @@ class YaMusicMod(loader.Module): option="banner_version", default="ultra", doc=lambda: self.strings["_cfg"]["banner_version"], - validator=loader.validators.Choice(["old", "new", "ultra"]), - ),) + validator=loader.validators.Choice(["ultra"]), + ), + loader.ConfigValue( + option="blur", + default=0, + ), + ) self.ym_client = None self.device_id = "".join(random.choices(string.ascii_lowercase, k=16)) @@ -392,6 +377,7 @@ class YaMusicMod(loader.Module): #"now_play", self._now_play_placeholder, "placeholder for nowplay music" # Heroku 2.0.0 feature #) + #utils.register_placeholder("duration", self._duration_placeholder, "progress bar") if not self.get("guide_sent", False): await self.inline.bot.send_message(self._tg_id, self.strings("iguide")) @@ -437,7 +423,7 @@ class YaMusicMod(loader.Module): me = await self._client.get_me() self._premium = me.premium if hasattr(me, "premium") else False - @loader.loop(30) + @loader.loop(15) async def autobio(self): if not self.config["token"]: self.autobio.stop() @@ -547,6 +533,88 @@ class YaMusicMod(loader.Module): ), ) + + async def _duration_placeholder(self): + """Placeholder for {duration} with custom emoji bar""" + if not self.config["token"]: + return "No Token" + + try: + now = await self.__get_now_playing() + if not now or now.get("paused"): + return "Not Playing" + + duration = now.get("duration_ms", 0) + progress = now.get("progress_ms", 0) + + if duration == 0: + return "0%" + + percent = (progress / duration) * 100 + + s_less_10 = ( + "" + "" + "" + "" + "" + "" + ) + + s_10_to_20 = ( + "" + "" + "" + "" + "" + "" + ) + + s_30_to_40 = ( + "" + "" + "" + "" + "" + "" + ) + + s_over_50 = ( + "" + "" + "" + "" + "" + "" + ) + + s_over_80 = ( + "" + "" + "" + "" + "" + "" + ) + + if percent < 10: + return s_less_10 + elif percent < 20: + return s_10_to_20 + elif percent < 30: + return s_10_to_20 + elif percent < 40: + return s_30_to_40 + elif percent < 50: + return s_30_to_40 + elif percent < 80: + return s_over_50 + else: + return s_over_80 + + except Exception as e: + return f"Error: {e}" + async def _download_bytes(self, url: str) -> typing.Optional[bytes]: try: async with aiohttp.ClientSession() as session: @@ -686,8 +754,10 @@ class YaMusicMod(loader.Module): meta_info=meta_info, is_liked=is_liked, repeat_mode=repeat_mode, + blur=self.config["blur"], ) + file = await utils.run_sync( getattr(banners, self.config["banner_version"], banners.ultra) ) diff --git a/coddrago/modules/full.txt b/coddrago/modules/full.txt index 59451ea..d889594 100644 --- a/coddrago/modules/full.txt +++ b/coddrago/modules/full.txt @@ -17,10 +17,9 @@ figlet promoclaimer passwordgen send -lastfm dbmod chatmodule stats tagwatcher hardspam -YaMusic \ No newline at end of file +YaMusic diff --git a/coddrago/modules/lastfm.py b/coddrago/modules/lastfm.py deleted file mode 100644 index efd7334..0000000 --- a/coddrago/modules/lastfm.py +++ /dev/null @@ -1,124 +0,0 @@ -# --------------------------------------------------------------------------------- -#░█▀▄░▄▀▀▄░█▀▄░█▀▀▄░█▀▀▄░█▀▀▀░▄▀▀▄░░░█▀▄▀█ -#░█░░░█░░█░█░█░█▄▄▀░█▄▄█░█░▀▄░█░░█░░░█░▀░█ -#░▀▀▀░░▀▀░░▀▀░░▀░▀▀░▀░░▀░▀▀▀▀░░▀▀░░░░▀░░▒▀ -# Name: LastFM -# Description: Module for music from different services -# Author: @codrago_m -# --------------------------------------------------------------------------------- -# 🔒 Licensed under the GNU AGPLv3 -# 🌐 https://www.gnu.org/licenses/agpl-3.0.html -# --------------------------------------------------------------------------------- -# Author: @codrago -# Commands: nowplay -# scope: heroku_only -# meta developer: @codrago_m -# meta banner: https://raw.githubusercontent.com/coddrago/modules/refs/heads/main/banner.png -# meta pic: https://envs.sh/Hob.webp -# --------------------------------------------------------------------------------- - -from .. import loader, utils -from herokutl import events -import requests -import asyncio - - -@loader.tds -class lastfmmod(loader.Module): - """Module for music from different services""" - def __init__(self): - self.config = loader.ModuleConfig( - loader.ConfigValue( - "username_lastfm", - None, - lambda: self.strings["_doc_username_lastfm"], - ), - loader.ConfigValue( - "text", - "🎧 now playing...\n" - "🎶 playlist: {song_album}\n" - "🎵 track: {song_name}\n" - "🎤 artist: {song_artist}", - lambda: self.strings["_doc_text"], - ), - ) - - strings = { - "name": "LastFm", - "loading":"⌨️ Loading song...", - "bot_no_result": " Nothing found.\nTitle: {song_name}\nAuthor: {song_artist}\nAlbum:{song_album}", - "_doc_text": "The text that will be written next to the file", - "_doc_username_lastfm": "Your username from last.fm", - "nick_error": " Put your nickname from last.fm", - "tutorial": "Go to last.fm and register.\nBE SURE to remember the username and password, they will come in handy later.\nLet's look at the VK version\nAfter that, go to the @vkxci channel, download VK X and log in to your VK account, then go to settings and click «Integrations», select Last FM.\nEnter the username and password.\nThen you're almost done!\nWrite {prefix}fcfg lastfm username_lastfm {username}\nUse the {prefix}nowplay command and enjoy life!", - } - - strings_ru = { - "name": "LastFm", - "loading": "⌨️ Загрузка трека...", - "bot_no_result": " Ничего не найдено.\nНазвание: {song_name}\nИсполнитель: {song_artist}\nАльбом: {song_album}", - "_doc_text": "Текст, который будет написан рядом с файлом", - "_doc_username_lastfm": "Ваш username с last.fm", - "nick_error": " Укажите ваш никнейм с last.fm", - "tutorial": "Зайдите на last.fm и зарегистрируйтесь.\nОБЯЗАТЕЛЬНО запомните логин и пароль, они пригодятся позже.\nРассмотрим вариант для VK\nПосле этого зайдите в канал @vkxci, скачайте VK X и авторизуйтесь в своём аккаунте VK, затем зайдите в настройки и нажмите «Интеграции», выберите Last FM.\nВведите логин и пароль.\nЗатем вы почти закончили!\nНапишите {prefix}fcfg lastfm username_lastfm {username}\nИспользуйте команду {prefix}nowplay и наслаждайтесь жизнью!", - } - - @loader.command(alias="np") - async def nowplay(self, message): - """| send playing track""" - - lastfm_username = self.config["username_lastfm"] - API_KEY = "460cda35be2fbf4f28e8ea7a38580730" # Облегчение жизни школьникам - - if not lastfm_username: - response_text = self.strings["nick_error"] - await self.invoke("config", "lastfm", message=message) - await utils.answer(message, response_text) - else: - try: - current_track_url = f'http://ws.audioscrobbler.com/2.0/?method=user.getrecenttracks&nowplaying=true&user={lastfm_username}&api_key={API_KEY}&format=json' - response = requests.get(current_track_url) - data = response.json() - - if 'recenttracks' in data and 'track' in data['recenttracks'] and data['recenttracks']['track']: - nowplaying_track = None - for track in data['recenttracks']['track']: - if '@attr' in track and 'nowplaying' in track['@attr']: - nowplaying_track = track - break - - if nowplaying_track: - song_name = nowplaying_track.get('name', 'Unknown song') - song_artist = nowplaying_track.get('artist', {}).get('#text', 'Unknown Artist') - if nowplaying_track.get('album', {}).get('#text') == nowplaying_track.get('name'): - song_album = "single" - else: - song_album = nowplaying_track.get('album', {}).get('#text', 'Unknown Album') - response_text = f"/search {song_name} - {song_artist}" - - try: - async with message.client.conversation("@LyaDownbot") as conv: - await conv.send_message(response_text) - while True: - response_bot = await conv.get_response() - if "Не удалось найти трек" in response_bot.text: - await utils.answer(message, self.strings["bot_no_result"]) - return - - if "Ищем треки..." in response_bot.text: - await utils.answer(message, self.strings["loading"]) - - if response_bot.media: - await message.client.send_file(message.chat_id, response_bot.media, caption = self.config["text"].format(song_artist=song_artist, song_album=song_album, song_name=song_name)) - await message.delete() - return - except Exception as e: - await utils.answer(message, f"
{e}
") - except Exception as e: - await utils.answer(message, f"
{e}
") - - @loader.command() - async def tutorl(self, message): - """| tutorial""" - - await utils.answer(message, self.strings['tutorial'].format(prefix = self.get_prefix(), username="{username}")) diff --git a/coddrago/modules/tagwatcher.py b/coddrago/modules/tagwatcher.py index 294a559..13392ff 100644 --- a/coddrago/modules/tagwatcher.py +++ b/coddrago/modules/tagwatcher.py @@ -1,10 +1,13 @@ # meta developer: @codrago_m # scope: heroku_min 2.0.0 + import logging -from .. import utils, loader, main + from telethon.tl.functions.messages import MarkDialogUnreadRequest +from .. import loader, main, utils + logger = logging.getLogger("TagWatcher") @@ -124,6 +127,7 @@ class TagWatcher(loader.Module): description="Here will be notifications about mentions in chats.", icon_emoji_id=5409025823388741707, ) + self.xdlib = await self.import_lib( "https://raw.githubusercontent.com/coddrago/modules/refs/heads/main/libs/xdlib.py", suspend_on_error=True, diff --git a/mead0wsss/mead0wsMods/SenderGifts.py b/mead0wsss/mead0wsMods/SenderGifts.py index 74b6d22..9ea5a29 100644 --- a/mead0wsss/mead0wsMods/SenderGifts.py +++ b/mead0wsss/mead0wsMods/SenderGifts.py @@ -1,5 +1,5 @@ # -- version -- -__version__ = (1, 2, 1) +__version__ = (1, 2, 2) # -- version -- @@ -9,7 +9,7 @@ __version__ = (1, 2, 1) # ██║╚██╔╝██║██╔══╝░░██╔══██║██║░░██║██║░░██║░░████╔═████║░░╚═══██╗░╚═══██╗ # ██║░╚═╝░██║███████╗██║░░██║██████╔╝╚█████╔╝░░╚██╔╝░╚██╔╝░██████╔╝██████╔╝ # ╚═╝░░░░░╚═╝╚══════╝╚═╝░░╚═╝╚═════╝░░╚════╝░░░░╚═╝░░░╚═╝░░╚═════╝░╚═════╝░ -# © Copyright 2025 +# © Copyright 2026 # ✈ https://t.me/mead0wssMods @@ -33,14 +33,17 @@ class SenderGifts(loader.Module): "checking_user": "🔍 Проверка пользователя...", "checking_balance": "🔍 Проверка баланса...", "user_not_found": " Пользователь не найден", - "gift_menu": "🎁 Выберите категорию подарков.\n\n👤 Пользователь: {}\n📄 Текст: {}\n Баланс: {} звезд", - "category_menu": "🎁 Подарки за {} ⭐\n\n👤 Пользователь: {}\n📄 Текст: {}", + "gift_menu": "🎁 Выберите категорию подарков.\n\n👤 Пользователь: {}\n📂 Текст: {}\n⭐️ Баланс: {} звезд", + "category_menu": "🎁 Подарки за {} ⭐\n\n👤 Пользователь: {}\n📂 Текст: {}", + "privacy_menu": "🎁 Выбран подарок: {}\n\nКак отправить подарок?", "sending_gift": "🛫 Отправка подарка...", "gift_sent": " Подарок успешно отправлен!", "not_enough_stars": " Недостаточно звезд для отправки подарка {}!", "min_stars_error": " Недостаточно звезд для отправки минимального подарка!", "no_available_gifts": " Нет доступных подарков для вашего баланса", "balance_error": " Ошибка при проверке баланса", + "btn_public": "📢 Публично", + "btn_anon": "🕵️ Анонимно", } gift_categories = { @@ -57,6 +60,7 @@ class SenderGifts(loader.Module): {"id": 5170314324215857265, "emoji": "💐", "name": "Цветы"}, {"id": 5170564780938756245, "emoji": "🚀", "name": "Ракета"}, {"id": 5922558454332916696, "emoji": "🎄", "name": "Ёлка"}, + {"id": 5956217000635139069, "emoji": "🧸", "name": "Новогодний мишка"} ], 100: [ {"id": 5168043875654172773, "emoji": "🏆", "name": "Кубок"}, @@ -135,9 +139,11 @@ class SenderGifts(loader.Module): if row: buttons.append(row) + + helper_msg = await self.inline.form("🪐", balance_msg) await utils.answer( - balance_msg, + helper_msg, self.strings["gift_menu"].format( f"@{user.username}" if user.username else user.first_name, text if text else "-", @@ -153,8 +159,8 @@ class SenderGifts(loader.Module): for gift in gifts: row.append({ "text": gift["emoji"], - "callback": self._send_gift, - "args": (user_id, gift["id"], text, gift["emoji"], msg_id, balance), + "callback": self._select_privacy, + "args": (user_id, gift["id"], text, gift["emoji"], msg_id, balance, price), }) if len(row) == 3: buttons.append(row) @@ -183,6 +189,34 @@ class SenderGifts(loader.Module): reply_markup=buttons ) + async def _select_privacy(self, call, user_id, gift_id, text, gift_emoji, msg_id, balance, price): + buttons = [ + [ + { + "text": self.strings["btn_public"], + "callback": self._send_gift, + "args": (user_id, gift_id, text, gift_emoji, msg_id, balance, False) # hide_name=False публично + }, + { + "text": self.strings["btn_anon"], + "callback": self._send_gift, + "args": (user_id, gift_id, text, gift_emoji, msg_id, balance, True) # hide_name=True анонимно + } + ], + [ + { + "text": "⬅️ Назад", + "callback": self._show_category, + "args": (user_id, price, text, balance, msg_id) + } + ] + ] + + await call.edit( + self.strings["privacy_menu"].format(gift_emoji), + reply_markup=buttons + ) + async def _back_to_categories(self, call, user_id, text, balance, msg_id): try: user = await self.client.get_entity(user_id) @@ -216,7 +250,7 @@ class SenderGifts(loader.Module): reply_markup=buttons ) - async def _send_gift(self, call, user_id, gift_id, text, gift_emoji, msg_id, balance): + async def _send_gift(self, call, user_id, gift_id, text, gift_emoji, msg_id, balance, hide_name): try: await call.edit( self.strings["sending_gift"], @@ -227,11 +261,11 @@ class SenderGifts(loader.Module): self.client.parse_mode, ) text, entities = parse_mode.parse(text) - user = await self.client.get_input_entity(user_id) inv = InputInvoiceStarGift( user, gift_id, + hide_name=hide_name, message=TextWithEntities(text, entities) if text else TextWithEntities("", []) ) form = await self.client(GetPaymentFormRequest(inv)) diff --git a/radiocycle/Modules/LastFm.py b/radiocycle/Modules/LastFm.py new file mode 100644 index 0000000..7069ef3 --- /dev/null +++ b/radiocycle/Modules/LastFm.py @@ -0,0 +1,214 @@ +# ======================================= +# _ __ __ __ _ +# | |/ /___ | \/ | ___ __| |___ +# | ' // _ \ | |\/| |/ _ \ / _` / __| +# | . \ __/ | | | | (_) | (_| \__ \ +# |_|\_\___| |_| |_|\___/ \__,_|___/ +# @ke_mods +# ======================================= +# +# LICENSE: CC BY-ND 4.0 (Attribution-NoDerivatives 4.0 International) +# -------------------------------------- +# https://creativecommons.org/licenses/by-nd/4.0/legalcode +# ======================================= + +# meta developer: @ke_mods + +from .. import loader, utils +import requests +import io +import textwrap +from PIL import Image, ImageDraw, ImageEnhance, ImageFilter, ImageFont, ImageOps + +class Banners: + def __init__( + self, + title: str, + artists: list, + track_cover: bytes, + font + ): + self.title = title + self.artists = ", ".join(artists) if isinstance(artists, list) else artists + self.track_cover = track_cover + self.font_url = font + + def _get_font(self, size, font_bytes): + return ImageFont.truetype(io.BytesIO(font_bytes), size) + + def _prepare_cover(self, size, radius): + cover = Image.open(io.BytesIO(self.track_cover)).convert("RGBA") + cover = cover.resize((size, size), Image.Resampling.LANCZOS) + + mask = Image.new("L", (size, size), 0) + draw = ImageDraw.Draw(mask) + draw.rounded_rectangle((0, 0, size, size), radius=radius, fill=255) + + output = Image.new("RGBA", (size, size), (0, 0, 0, 0)) + output.paste(cover, (0, 0), mask=mask) + return output + + def _prepare_background(self, w, h): + bg = Image.open(io.BytesIO(self.track_cover)).convert("RGBA") + bg = bg.resize((w, h), Image.Resampling.BICUBIC) + bg = bg.filter(ImageFilter.GaussianBlur(radius=20)) + bg = ImageEnhance.Brightness(bg).enhance(0.4) + return bg + + def horizontal(self): + W, H = 1500, 600 + padding = 60 + cover_size = 480 + + font_bytes = requests.get(self.font_url).content + title_font = self._get_font(55, font_bytes) + artist_font = self._get_font(45, font_bytes) + lfm_font = self._get_font(35, font_bytes) + + img = self._prepare_background(W, H) + draw = ImageDraw.Draw(img) + + cover = self._prepare_cover(cover_size, 30) + img.paste(cover, (padding, (H - cover_size) // 2), cover) + + text_x = padding + cover_size + 60 + text_y_start = 100 + text_width_limit = W - text_x - padding + + display_title = self.title + while title_font.getlength(display_title) > text_width_limit and len(display_title) > 0: + display_title = display_title[:-1] + if len(display_title) < len(self.title): display_title += "…" + + display_artist = self.artists + while artist_font.getlength(display_artist) > text_width_limit and len(display_artist) > 0: + display_artist = display_artist[:-1] + if len(display_artist) < len(self.artists): display_artist += "…" + + draw.text((text_x, text_y_start), display_title, font=title_font, fill="white") + draw.text((text_x, text_y_start + 70), display_artist, font=artist_font, fill="#B3B3B3") + + bar_y = 480 + draw.text((text_x, bar_y), "last.fm", font=lfm_font, fill="white") + + by = io.BytesIO() + img.save(by, format="PNG") + by.seek(0) + by.name = "banner.png" + return by + + def vertical(self): + W, H = 1000, 1500 + padding = 80 + cover_size = 800 + + font_bytes = requests.get(self.font_url).content + title_font = self._get_font(60, font_bytes) + artist_font = self._get_font(45, font_bytes) + lfm_font = self._get_font(35, font_bytes) + + img = self._prepare_background(W, H) + draw = ImageDraw.Draw(img) + + cover = self._prepare_cover(cover_size, 40) + cover_x = (W - cover_size) // 2 + cover_y = 120 + img.paste(cover, (cover_x, cover_y), cover) + + text_area_y = cover_y + cover_size + 120 + text_width_limit = W - (padding * 2) + + display_title = self.title + while title_font.getlength(display_title) > text_width_limit and len(display_title) > 0: + display_title = display_title[:-1] + if len(display_title) < len(self.title): display_title += "…" + + display_artist = self.artists + while artist_font.getlength(display_artist) > text_width_limit and len(display_artist) > 0: + display_artist = display_artist[:-1] + if len(display_artist) < len(self.artists): display_artist += "…" + + title_w = title_font.getlength(display_title) + draw.text(((W - title_w) / 2, text_area_y), display_title, font=title_font, fill="white") + + artist_w = artist_font.getlength(display_artist) + draw.text(((W - artist_w) / 2, text_area_y + 75), display_artist, font=artist_font, fill="#B3B3B3") + + bar_y = text_area_y + 260 + + lfm_w = lfm_font.getlength("last.fm") + draw.text(((W - lfm_w) / 2, bar_y), "last.fm", font=lfm_font, fill="white") + + by = io.BytesIO() + img.save(by, format="PNG") + by.seek(0) + by.name = "banner.png" + return by + +@loader.tds +class lastfmmod(loader.Module): + """Module for music from different services""" + + strings = { + "name": "LastFm", + "no_track": " No track is currently playing", + "_doc_text": "The text that will be written next to the file", + "_doc_username": "Your username from last.fm", + "nick_error": " Put your nickname from last.fm", + "uploading": "🕔 Uploading banner...", + } + strings_ru = { + "name": "LastFm", + "no_track": " Сейчас ничего не играет", + "_doc_text": "Текст, который будет написан рядом с файлом", + "_doc_username": "Ваш username с last.fm", + "nick_error": " Укажите ваш никнейм с last.fm", + "uploading": "🕔 Загрузка баннера...", + } + strings_jp = { + "name": "LastFm", + "no_track": " 現在再生中のトラックはありません", + "_doc_text": "ファイルの横に表示されるテキスト", + "_doc_username": "Last.fmのユーザー名", + "nick_error": " Last.fmのニックネームを入力してください", + "uploading": "🕔 バナーをアップロード中...", + } + + def __init__(self): + self.config = loader.ModuleConfig( + loader.ConfigValue("username", None, lambda: self.strings["_doc_username"]), + loader.ConfigValue("custom_text", "🤩 {song_name}{song_artist}", lambda: self.strings["_doc_text"]), + loader.ConfigValue("font", "https://raw.githubusercontent.com/kamekuro/assets/master/fonts/Onest-Bold.ttf", "Custom font URL (ttf)"), + loader.ConfigValue("banner_version", "horizontal", lambda: "Banner version", validator=loader.validators.Choice(["horizontal", "vertical"])), + ) + + @loader.command(alias="np") + async def nowplay(self, message): + """| send playing track info""" + user = self.config["username"] + if not user: + await self.invoke("config", "lastfm", message=message) + return await utils.answer(message, self.strings["nick_error"]) + + try: + url = f'http://ws.audioscrobbler.com/2.0/?method=user.getrecenttracks&nowplaying=true&user={user}&api_key=460cda35be2fbf4f28e8ea7a38580730&format=json' + data = requests.get(url).json() + track = next((t for t in data.get('recenttracks', {}).get('track', []) if t.get('@attr', {}).get('nowplaying')), None) + if not track: + return await utils.answer(message, self.strings["no_track"]) + name = track.get('name', 'Unknown') + artist = track.get('artist', {}).get('#text', 'Unknown') + caption = self.config["custom_text"].format(song_artist=artist, song_name=name) + imgs = track.get('image', []) + cov_url = next((i['#text'] for i in imgs if i['size'] == 'extralarge'), imgs[-1]['#text'] if imgs else None) + + if not cov_url: + return await utils.answer(message, caption) + msg = await utils.answer(message, self.strings["uploading"]) + cov_bytes = await utils.run_sync(requests.get, cov_url) + banners = Banners(name, artist, cov_bytes.content, self.config["font"]) + file = await utils.run_sync(getattr(banners, self.config["banner_version"])) + await utils.answer(msg, caption, file=file) + + except Exception as e: + await utils.answer(message, f"
{e}
") diff --git a/radiocycle/Modules/Neofetch.py b/radiocycle/Modules/Neofetch.py new file mode 100644 index 0000000..3301c1e --- /dev/null +++ b/radiocycle/Modules/Neofetch.py @@ -0,0 +1,48 @@ +# ======================================= +# _ __ __ __ _ +# | |/ /___ | \/ | ___ __| |___ +# | ' // _ \ | |\/| |/ _ \ / _` / __| +# | . \ __/ | | | | (_) | (_| \__ \ +# |_|\_\___| |_| |_|\___/ \__,_|___/ +# @ke_mods +# ======================================= +# +# LICENSE: CC BY-ND 4.0 (Attribution-NoDerivatives 4.0 International) +# -------------------------------------- +# https://creativecommons.org/licenses/by-nd/4.0/legalcode +# ======================================= + +# meta developer: @ke_mods + +import subprocess +from .. import loader, utils + +@loader.tds +class NeofetchMod(loader.Module): + strings = { + "name": "Neofetch", + "not_installed": "Please, install Neofetch package", + } + + strings_ru = { + "not_installed": "Пожалуйста, установите пакет Neofetch", + } + + strings_ua = { + "not_installed": "Будь ласка, встановіть пакет Neofetch", + } + + @loader.command( + ru_doc="- запустить команду neofetch", + ua_doc="- запустити команду neofetch", + ) + async def neofetchcmd(self, message): + """- run neofetch command""" + try: + result = subprocess.run(["neofetch", "--stdout"], capture_output=True, text=True) + output = result.stdout + await utils.answer(message, f"
{utils.escape_html(output)}
") + + except FileNotFoundError: + await utils.answer(message, self.strings("not_installed")) + diff --git a/radiocycle/Modules/PicToStories.py b/radiocycle/Modules/PicToStories.py new file mode 100644 index 0000000..84bbd9a --- /dev/null +++ b/radiocycle/Modules/PicToStories.py @@ -0,0 +1,211 @@ +# ======================================= +# _ __ __ __ _ +# | |/ /___ | \/ | ___ __| |___ +# | ' // _ \ | |\/| |/ _ \ / _` / __| +# | . \ __/ | | | | (_) | (_| \__ \ +# |_|\_\___| |_| |_|\___/ \__,_|___/ +# @ke_mods +# ======================================= +# +# LICENSE: CC BY-ND 4.0 (Attribution-NoDerivatives 4.0 International) +# -------------------------------------- +# https://creativecommons.org/licenses/by-nd/4.0/legalcode +# ======================================= + +# meta developer: @ke_mods +# requires: pillow + +import io +import asyncio + +from telethon import functions, types +from PIL import Image + +from .. import loader, utils + + +@loader.tds +class PicToStoriesMod(loader.Module): + """Grid for stories""" + + strings = { + "name": "PicToStories", + "no_rep": ( + "❗️ " + "Reply to photo!" + ), + "work": ( + "🕔 " + "Processing..." + ), + "done": ( + " " + "Done! Check your profile." + ), + "err": ( + " " + "Error: {}" + ), + } + + strings_ru = { + "no_rep": ( + "❗️ " + "Реплай на фото!" + ), + "work": ( + "🕔 " + "Обрабатываю..." + ), + "done": ( + " " + "Готово! Проверяй профиль." + ), + "err": ( + " " + "Ошибка: {}" + ), + } + + def __init__(self): + self.config = loader.ModuleConfig( + loader.ConfigValue( + "period", + 48, + lambda: "Visibility period in hours", + validator=loader.validators.Integer(), + ), + loader.ConfigValue( + "blacklist", + [], + lambda: "Blacklisted user IDs", + validator=loader.validators.Series(loader.validators.Integer()), + ), + loader.ConfigValue( + "cooldown", + 0, + lambda: "Cooldown between stories in seconds", + validator=loader.validators.Integer(minimum=0), + ), + ) + + @loader.command(ru_doc="<реплай на фото> [название альбома] - сделать сетку") + async def ptscmd(self, message): + """ [album name] - make grid""" + args = utils.get_args_raw(message) + reply = await message.get_reply_message() + if not reply or not reply.media: + await utils.answer(message, self.strings("no_rep")) + return + + try: + image_bytes = await reply.download_media(file=bytes) + img = Image.open(io.BytesIO(image_bytes)) + except Exception as e: + await utils.answer(message, self.strings("err").format(e)) + return + + await utils.answer(message, self.strings("work")) + + w, h = img.size + curr_ratio = w / h + variants = [ + (5 / 4, 2), + (4 / 5, 3), + (3 / 5, 4), + (9 / 16, 5) + ] + best_ratio, rows = min(variants, key=lambda x: abs(curr_ratio - x[0])) + + new_h = int(w / best_ratio) + img = img.resize((w, new_h), Image.LANCZOS) + w, h = img.size + + parts = [] + pw, ph = w // 3, h // rows + for r in range(rows): + for c in range(3): + x, y = c * pw, r * ph + parts.append(img.crop((x, y, x + pw, y + ph))) + + parts.reverse() + + privacy = [types.InputPrivacyValueAllowAll()] + if self.config["blacklist"]: + entities = [] + for uid in self.config["blacklist"]: + try: + entities.append(await self.client.get_input_entity(uid)) + except Exception: + continue + if entities: + privacy.append(types.InputPrivacyValueDisallowUsers(users=entities)) + + story_ids = [] + for i, p in enumerate(parts): + out = io.BytesIO() + p.save(out, "JPEG", quality=95) + out.seek(0) + + uploaded_file = await self.client.upload_file(out, file_name="s.jpg") + res = await self.client( + functions.stories.SendStoryRequest( + peer=types.InputPeerSelf(), + media=types.InputMediaUploadedPhoto(uploaded_file), + privacy_rules=privacy, + period=self.config["period"] * 3600, + ) + ) + + sid = next( + ( + u.story_id if hasattr(u, "story_id") else u.id + for u in res.updates + if hasattr(u, "story_id") or hasattr(u, "id") + ), + None, + ) + + if sid: + story_ids.append(sid) + + if self.config["cooldown"] > 0 and i < len(parts) - 1: + await asyncio.sleep(self.config["cooldown"]) + + if not story_ids: + return + + if args: + all_albums = await self.client( + functions.stories.GetAlbumsRequest(peer=types.InputPeerSelf(), hash=0) + ) + + target = next( + (a for a in all_albums.albums if getattr(a, 'title', '') == args), + None + ) + + if target: + await self.client( + functions.stories.UpdateAlbumRequest( + peer=types.InputPeerSelf(), + album_id=target.album_id, + add_stories=story_ids, + ) + ) + else: + await self.client( + functions.stories.CreateAlbumRequest( + peer=types.InputPeerSelf(), + stories=story_ids, + title=args, + ) + ) + else: + await self.client( + functions.stories.TogglePinnedRequest( + peer=types.InputPeerSelf(), id=story_ids, pinned=True + ) + ) + + await utils.answer(message, self.strings("done")) \ No newline at end of file diff --git a/radiocycle/Modules/SpotifyMod.py b/radiocycle/Modules/SpotifyMod.py new file mode 100644 index 0000000..218588b --- /dev/null +++ b/radiocycle/Modules/SpotifyMod.py @@ -0,0 +1,1436 @@ +# █ █ ▀ █▄▀ ▄▀█ █▀█ ▀ +# █▀█ █ █ █ █▀█ █▀▄ █ +# © Copyright 2022 +# +# https://t.me/hikariatama +# +# 🔒 Licensed under the GNU AGPLv3 +# 🌐 https://www.gnu.org/licenses/agpl-3.0.html +# +# You CANNOT edit, distribute or redistribute this file without direct permission from the author. +# +# ORIGINAL MODULE: https://raw.githubusercontent.com/hikariatama/ftg/master/spotify.py + +# ======================================= +# _ __ __ __ _ +# | |/ /___ | \/ | ___ __| |___ +# | ' // _ \ | |\/| |/ _ \ / _` / __| +# | . \ __/ | | | | (_) | (_| \__ \ +# |_|\_\___| |_| |_|\___/ \__,_|___/ +# @ke_mods +# ======================================= +# +# LICENSE: CC BY-ND 4.0 (Attribution-NoDerivatives 4.0 International) +# -------------------------------------- +# https://creativecommons.org/licenses/by-nd/4.0/legalcode +# ======================================= + +# meta developer: @ke_mods +# requires: telethon spotipy pillow requests yt-dlp + +import asyncio +import contextlib +import functools +import io +import logging +import re +import textwrap +import time +import traceback +import os +from types import FunctionType + +import requests +import spotipy +from PIL import Image, ImageDraw, ImageEnhance, ImageFilter, ImageFont, ImageOps +from telethon.errors import FloodWaitError +from telethon.tl.functions.account import UpdateProfileRequest +from telethon.tl.types import Message + +from .. import loader, utils + +logger = logging.getLogger(__name__) +logging.getLogger("spotipy").setLevel(logging.CRITICAL) + +class Banners: + def __init__( + self, + title: str, + artists: list, + duration: int, + progress: int, + track_cover: bytes, + font + ): + self.title = title + self.artists = ", ".join(artists) if isinstance(artists, list) else artists + self.duration = duration + self.progress = progress + self.track_cover = track_cover + self.font_url = font + + def _get_font(self, size, font_bytes): + return ImageFont.truetype(io.BytesIO(font_bytes), size) + + def _prepare_cover(self, size, radius): + cover = Image.open(io.BytesIO(self.track_cover)).convert("RGBA") + cover = cover.resize((size, size), Image.Resampling.LANCZOS) + + mask = Image.new("L", (size, size), 0) + draw = ImageDraw.Draw(mask) + draw.rounded_rectangle((0, 0, size, size), radius=radius, fill=255) + + output = Image.new("RGBA", (size, size), (0, 0, 0, 0)) + output.paste(cover, (0, 0), mask=mask) + return output + + def _prepare_background(self, w, h): + bg = Image.open(io.BytesIO(self.track_cover)).convert("RGBA") + bg = bg.resize((w, h), Image.Resampling.BICUBIC) + bg = bg.filter(ImageFilter.GaussianBlur(radius=40)) + bg = ImageEnhance.Brightness(bg).enhance(0.4) + return bg + + def _draw_progress_bar(self, draw, x, y, w, h, progress_pct, color="white", bg_color="#5e5e5e"): + draw.rounded_rectangle((x, y, x + w, y + h), radius=h/2, fill=bg_color) + + fill_w = int(w * progress_pct) + if fill_w > 0: + draw.rounded_rectangle((x, y, x + fill_w, y + h), radius=h/2, fill=color) + + dot_radius = h * 1.2 + dot_x = x + fill_w + dot_y = y + (h / 2) + + draw.ellipse( + (dot_x - dot_radius, dot_y - dot_radius, dot_x + dot_radius, dot_y + dot_radius), + fill=color + ) + + def horizontal(self): + W, H = 1500, 600 + padding = 60 + cover_size = 480 + + font_bytes = requests.get(self.font_url).content + title_font = self._get_font(55, font_bytes) + artist_font = self._get_font(45, font_bytes) + time_font = self._get_font(25, font_bytes) + + img = self._prepare_background(W, H) + draw = ImageDraw.Draw(img) + + cover = self._prepare_cover(cover_size, 30) + img.paste(cover, (padding, (H - cover_size) // 2), cover) + + text_x = padding + cover_size + 60 + text_y_start = 100 + text_width_limit = W - text_x - padding + + display_title = self.title + while title_font.getlength(display_title) > text_width_limit and len(display_title) > 0: + display_title = display_title[:-1] + if len(display_title) < len(self.title): display_title += "…" + + display_artist = self.artists + while artist_font.getlength(display_artist) > text_width_limit and len(display_artist) > 0: + display_artist = display_artist[:-1] + if len(display_artist) < len(self.artists): display_artist += "…" + + draw.text((text_x, text_y_start), display_title, font=title_font, fill="white") + draw.text((text_x, text_y_start + 70), display_artist, font=artist_font, fill="#B3B3B3") + + cur_time = f"{(self.progress//1000//60):02}:{(self.progress//1000%60):02}" + dur_time = f"{(self.duration//1000//60):02}:{(self.duration//1000%60):02}" + + cur_w = time_font.getlength(cur_time) + dur_w = time_font.getlength(dur_time) + + bar_y = 480 + bar_h = 8 + gap = 25 + + draw.text((text_x, bar_y - 12), cur_time, font=time_font, fill="white") + + bar_start_x = text_x + cur_w + gap + bar_end_x = text_x + text_width_limit - dur_w - gap + bar_w = bar_end_x - bar_start_x + + prog_pct = self.progress / self.duration if self.duration > 0 else 0 + self._draw_progress_bar(draw, bar_start_x, bar_y, bar_w, bar_h, prog_pct) + + draw.text((bar_end_x + gap, bar_y - 12), dur_time, font=time_font, fill="white") + + by = io.BytesIO() + img.save(by, format="PNG") + by.seek(0) + by.name = "banner.png" + return by + + def vertical(self): + W, H = 1000, 1500 + padding = 80 + cover_size = 800 + + font_bytes = requests.get(self.font_url).content + title_font = self._get_font(60, font_bytes) + artist_font = self._get_font(45, font_bytes) + time_font = self._get_font(35, font_bytes) + + img = self._prepare_background(W, H) + draw = ImageDraw.Draw(img) + + cover = self._prepare_cover(cover_size, 40) + cover_x = (W - cover_size) // 2 + cover_y = 120 + img.paste(cover, (cover_x, cover_y), cover) + + text_area_y = cover_y + cover_size + 120 + text_width_limit = W - (padding * 2) + + display_title = self.title + while title_font.getlength(display_title) > text_width_limit and len(display_title) > 0: + display_title = display_title[:-1] + if len(display_title) < len(self.title): display_title += "…" + + display_artist = self.artists + while artist_font.getlength(display_artist) > text_width_limit and len(display_artist) > 0: + display_artist = display_artist[:-1] + if len(display_artist) < len(self.artists): display_artist += "…" + + title_w = title_font.getlength(display_title) + draw.text(((W - title_w) / 2, text_area_y), display_title, font=title_font, fill="white") + + artist_w = artist_font.getlength(display_artist) + draw.text(((W - artist_w) / 2, text_area_y + 75), display_artist, font=artist_font, fill="#B3B3B3") + + bar_y = text_area_y + 260 + bar_h = 8 + bar_w = W - (padding * 2) + prog_pct = self.progress / self.duration if self.duration > 0 else 0 + + self._draw_progress_bar(draw, padding, bar_y, bar_w, bar_h, prog_pct, color="white", bg_color="#5e5e5e") + + cur_time = f"{(self.progress//1000//60):02}:{(self.progress//1000%60):02}" + dur_time = f"{(self.duration//1000//60):02}:{(self.duration//1000%60):02}" + + draw.text((padding, bar_y + 40), cur_time, font=time_font, fill="#B3B3B3") + + dur_w = time_font.getlength(dur_time) + draw.text((W - padding - dur_w, bar_y + 40), dur_time, font=time_font, fill="#B3B3B3") + + by = io.BytesIO() + img.save(by, format="PNG") + by.seek(0) + by.name = "banner.png" + return by + +@loader.tds +class SpotifyMod(loader.Module): + """Card with the currently playing track on Spotify.""" + + strings = { + "name": "SpotifyMod", + "need_auth": ( + " Please execute" + " .sauth before performing this action." + ), + "on-repeat": ( + "🔄 Set on-repeat." + ), + "off-repeat": ( + "🔄 Stopped track" + " repeat." + ), + "skipped": ( + "➡️ Skipped track." + ), + "playing": "▶️ Playing...", + "back": ( + "⬅️ Switched to previous" + " track" + ), + "paused": " Pause", + "restarted": ( + "✅️ Playing track" + " from the" + " beginning" + ), + "liked": ( + "❤️ Liked current" + " playback" + ), + "unlike": ( + "" + " Unliked current playback" + ), + "err": ( + " An error occurred." + "\n{}" + ), + "already_authed": ( + " Already authorized" + ), + "authed": ( + " Authentication" + " successful" + ), + "deauth": ( + "🚪 Successfully logged out" + " of account" + ), + "auth": ( + '🔗 Follow this' + " link, allow access, then enter .scode https://... with" + " the link you received." + ), + "no_music": ( + " No music is playing!" + ), + "dl_err": ( + " Failed to download" + " track." + ), + "volume_changed": ( + "🔊" + " Volume changed to {}%." + ), + "volume_invalid": ( + " Volume level must be" + " a number between 0 and 100." + ), + "volume_err": ( + " An error occurred while" + " changing volume." + ), + "no_volume_arg": ( + " Please specify a" + " volume level between 0 and 100." + ), + "searching_tracks": ( + "🕔 Searching for tracks" + " matching {}..." + ), + "no_search_query": ( + " Please specify a" + " search query." + ), + "no_tracks_found": ( + " No tracks found for" + " {}." + ), + "search_results": ( + " Search results for" + " {}:\n\n{}" + ), + "downloading_search_track": ( + "🕔 Downloading {}..." + ), + "download_success": ( + " Successfully downloaded {} - {}" + ), + "invalid_track_number": ( + " Invalid track number." + " Please search first or provide a valid number from the list." + ), + "device_list": ( + "📄 Available devices:\n{}" + ), + "no_devices_found": ( + " No devices found." + ), + "device_changed": ( + " Playback transferred to" + " {}." + ), + "invalid_device_id": ( + " Invalid device ID." + " Use .sdevice to see available devices." + ), + "search_results_cleared": " Search results cleared", + "autobio": ( + "🎧 Spotify autobio {}" + ), + "no_ytdlp": " yt-dlp not found... Check config or install yt-dlp ({}terminal pip install yt-dlp)", + "snowt_failed": "\n\n Download failed", + "uploading_banner": "\n\n🕔 Uploading banner...", + "downloading_track": "\n\n🕔 Downloading track...", + "no_playlists": " No playlists found.", + "playlists_list": "📄 Your playlists:\n\n{}", + "added_to_playlist": " Added {} to {}", + "removed_from_playlist": " Removed {} from {}", + "invalid_playlist_index": " Invalid playlist number.", + "no_cached_playlists": " Use .splaylists first.", + "playlist_created": " Playlist {} created.", + "playlist_deleted": " Playlist {} deleted.", + "no_playlist_name": " Please specify a playlist name.", + } + + strings_ru = { + "_cls_doc": "Карточка с играющим треком в Spotify.", + "need_auth": ( + " Выполни" + " .sauth перед выполнением этого действия." + ), + "err": ( + " Произошла ошибка." + "\n{}" + ), + "on-repeat": ( + "🔄 Включен повтор трека." + ), + "off-repeat": ( + "🔄 Повтор трека отключён." + ), + "skipped": ( + "➡️ Трек пропущен." + ), + "playing": "▶️ Играет...", + "back": ( + "⬅️ Переключено на предыдущий трек" + ), + "paused": " Пауза", + "restarted": ( + "✅️ Воспроизведение трека с начала..." + ), + "liked": ( + "❤️ Текущий трек добавлен в избранное" + ), + "unlike": ( + " Убрал лайк с текущего трека" + ), + "already_authed": ( + " Уже авторизован" + ), + "authed": ( + " Успешная аутентификация" + ), + "deauth": ( + "🚪 Успешный выход из аккаунта" + ), + "auth": ( + '🔗 Пройдите по этой ссылке, разрешите вход, затем введите .scode https://... с ссылкой которую вы получили.' + ), + "no_music": ( + " Музыка не играет!" + ), + "dl_err": ( + " Не удалось скачать трек." + ), + "volume_changed": ( + "🔊" + " Громкость изменена на {}%." + ), + "volume_invalid": ( + " Уровень громкости должен" + " быть числом от 0 до 100." + ), + "volume_err": ( + " Произошла ошибка при" + " изменении громкости." + ), + "no_volume_arg": ( + " Пожалуйста, укажите" + " уровень громкости от 0 до 100." + ), + "searching_tracks": ( + "🕔 Идет поиск треков" + " по запросу {}..." + ), + "no_search_query": ( + " Пожалуйста, укажите" + " поисковый запрос." + ), + "no_tracks_found": ( + " По запросу '{}'" + " ничего не найдено." + ), + "search_results": ( + " Результаты поиска" + " по запросу {}:\n\n{}" + ), + "downloading_search_track": ( + "🕔 Скачиваю {}..." + ), + "download_success": ( + " Трек {} - {} успешно скачан." + ), + "invalid_track_number": ( + " Некорректный номер трека." + " Сначала выполните поиск или укажите правильный номер из списка." + ), + "device_list": ( + "📄 Доступные устройства:\n{}" + ), + "no_devices_found": ( + " Устройства не найдены." + ), + "device_changed": ( + " Воспроизведение переключено на" + " {}." + ), + "invalid_device_id": ( + " Некорректный ID устройства." + " Используйте .sdevice , чтобы увидеть доступные устройства." + ), + "search_results_cleared": " Результаты поиска очищены", + "autobio": ( + "🎧 Обновление био" + " включено {}" + ), + "no_ytdlp": " yt-dlp не найден... Проверьте конфиг или установите yt-dlp ({}terminal pip install yt-dlp)", + "snowt_failed": "\n\n Ошибка скачивания.", + "uploading_banner": "\n\n🕔 Загрузка баннера...", + "downloading_track": "\n\n🕔 Скачивание трека...", + "no_playlists": " Плейлисты не найдены.", + "playlists_list": "📄 Ваши плейлисты:\n\n{}", + "added_to_playlist": " Трек {} добавлен в {}", + "removed_from_playlist": " Трек {} удален из {}", + "invalid_playlist_index": " Неверный номер плейлиста.", + "no_cached_playlists": " Сначала используйте .splaylists.", + "playlist_created": " Плейлист {} создан.", + "playlist_deleted": " Плейлист {} удален.", + "no_playlist_name": " Пожалуйста, укажите название плейлиста.", + } + strings_jp = { + "_cls_doc": "Spotify からのメッセージ", + "need_auth": ( + " この操作を行う前に " + ".sauth を実行してください。" + ), + "on-repeat": ( + "🔄 リピート再生を設定しました。" + ), + "off-repeat": ( + "🔄 リピート再生を解除しました。" + ), + "skipped": ( + "➡️ スキップしました。" + ), + "playing": "▶️ 再生中...", + "back": ( + "⬅️ 前のトラックに戻りました。" + ), + "paused": " 一時停止", + "restarted": ( + "✅️ 最初から再生します。" + ), + "liked": ( + "❤️ お気に入りに追加しました。" + ), + "unlike": ( + "" + " お気に入りから削除しました。" + ), + "err": ( + " エラーが発生しました。" + "\n{}" + ), + "already_authed": ( + " 既に認証されています。" + ), + "authed": ( + " 認証に成功しました。" + ), + "deauth": ( + "🚪 ログアウトしました。" + ), + "auth": ( + '🔗 リンクをクリックしてアクセスを許可し、取得したURLを使って .scode https://... を入力してください。' + ), + "no_music": ( + " 音楽は再生されていません!" + ), + "dl_err": ( + " トラックのダウンロードに失敗しました。" + ), + "volume_changed": ( + "🔊" + " 音量を {}% に変更しました。" + ), + "volume_invalid": ( + " 音量は0から100の数字で指定してください。" + ), + "volume_err": ( + " 音量の変更中にエラーが発生しました。" + ), + "no_volume_arg": ( + " 0から100の間で音量を指定してください。" + ), + "searching_tracks": ( + "🕔 {} を検索中..." + ), + "no_search_query": ( + " 検索キーワードを指定してください。" + ), + "no_tracks_found": ( + " {} は見つかりませんでした。" + ), + "search_results": ( + " {} の検索結果:\n\n{}" + ), + "downloading_search_track": ( + "🕔 {} をダウンロード中..." + ), + "download_success": ( + " {} - {} のダウンロードに成功しました。" + ), + "invalid_track_number": ( + " トラック番号が無効です。" + " 先に検索するか、リストから有効な番号を指定してください。" + ), + "device_list": ( + "📄 利用可能なデバイス:\n{}" + ), + "no_devices_found": ( + " デバイスが見つかりません。" + ), + "device_changed": ( + " 再生デバイスを" + " {} に切り替えました。" + ), + "invalid_device_id": ( + " デバイスIDが無効です。" + " .sdevice で利用可能なデバイスを確認してください。" + ), + "search_results_cleared": " 検索結果をクリアしました。", + "autobio": ( + "🎧 Spotify AutoBio: {}" + ), + "no_ytdlp": " yt-dlpが見つかりません... 設定を確認するか、インストールしてください ({}terminal pip install yt-dlp)", + "snowt_failed": "\n\n ダウンロードに失敗しました。", + "uploading_banner": "\n\n🕔 バナーをアップロード中...", + "downloading_track": "\n\n🕔 トラックをダウンロード中...", + "no_playlists": " プレイリストが見つかりません。", + "playlists_list": "📄 あなたのプレイリスト:\n\n{}", + "added_to_playlist": " {} を {} に追加しました。", + "removed_from_playlist": " {} を {} から削除しました。", + "invalid_playlist_index": " プレイリスト番号が無効です。", + "no_cached_playlists": " 先に .splaylists を使用してください。", + "playlist_created": " プレイリスト {} を作成しました。", + "playlist_deleted": " プレイリスト {} を削除しました。", + "no_playlist_name": " プレイリスト名を指定してください。", + } + + def __init__(self): + self._client_id = "e0708753ab60499c89ce263de9b4f57a" + self._client_secret = "80c927166c664ee98a43a2c0e2981b4a" + self.scope = ( + "user-read-playback-state playlist-read-private playlist-read-collaborative" + " user-modify-playback-state user-library-modify" + " playlist-modify-public playlist-modify-private" + ) + self.sp_auth = spotipy.oauth2.SpotifyOAuth( + client_id=self._client_id, + client_secret=self._client_secret, + redirect_uri="https://thefsch.github.io/spotify/", + scope=self.scope, + ) + self.config = loader.ModuleConfig( + loader.ConfigValue( + "show_banner", + True, + "Show banner with track info", + validator=loader.validators.Boolean(), + ), + loader.ConfigValue( + "custom_text", + ( + "🎧 Now playing: {track} — {artists}\n" + "🔗 song.link" + ), + """Custom text, supports {track}, {artists}, {album}, {playlist}, {playlist_owner}, {spotify_url}, {songlink}, {progress}, {duration}, {device} placeholders""", + validator=loader.validators.String(), + ), + loader.ConfigValue( + "font", + "https://raw.githubusercontent.com/kamekuro/assets/master/fonts/Onest-Bold.ttf", + "Custom font. Specify URL to .ttf file", + validator=loader.validators.String(), + ), + loader.ConfigValue( + "auto_bio_template", + "🎧 {}", + lambda: "Template for Spotify AutoBio", + ), + loader.ConfigValue( + "ytdlp_path", + "", + "Path to ytdlp binary", + validator=loader.validators.String(), + ), + loader.ConfigValue( + "banner_version", + "horizontal", + lambda: "Banner version", + validator=loader.validators.Choice(["horizontal", "vertical"]), + ), + ) + + async def client_ready(self, client, db): + self.font_ready = asyncio.Event() + + self._premium = getattr(await client.get_me(), "premium", False) + try: + self.sp = spotipy.Spotify(auth=self.get("acs_tkn")["access_token"]) + except Exception: + self.set("acs_tkn", None) + self.sp = None + + if self.get("autobio", False): + self.autobio.start() + + def tokenized(func) -> FunctionType: + @functools.wraps(func) + async def wrapped(*args, **kwargs): + if not args[0].get("acs_tkn", False) or not args[0].sp: + await utils.answer(args[1], args[0].strings("need_auth")) + return + + return await func(*args, **kwargs) + + wrapped.__doc__ = func.__doc__ + wrapped.__module__ = func.__module__ + + return wrapped + + def error_handler(func) -> FunctionType: + @functools.wraps(func) + async def wrapped(*args, **kwargs): + try: + return await func(*args, **kwargs) + except Exception: + logger.exception(traceback.format_exc()) + with contextlib.suppress(Exception): + await utils.answer( + args[1], + args[0].strings("err").format(traceback.format_exc()), + ) + + wrapped.__doc__ = func.__doc__ + wrapped.__module__ = func.__module__ + + return wrapped + + + @loader.loop(interval=90) + async def autobio(self): + try: + current_playback = self.sp.current_playback() + track = current_playback["item"]["name"] + track = re.sub(r"([(].*?[)])", "", track).strip() + except Exception: + return + + bio = self.config["auto_bio_template"].format(f"{track}") + + try: + await self._client( + UpdateProfileRequest(about=bio[: 140 if self._premium else 70]) + ) + except FloodWaitError as e: + logger.info(f"Sleeping {max(e.seconds, 60)} bc of floodwait") + await asyncio.sleep(max(e.seconds, 60)) + return + + async def _download_track(self, message, query: str, caption: str = ""): + dl_dir = os.path.join(os.getcwd(), "spotifymod") + if not os.path.exists(dl_dir): + os.makedirs(dl_dir, exist_ok=True) + + for f in os.listdir(dl_dir): + try: + os.remove(os.path.join(dl_dir, f)) + except: + pass + + try: + squery = query.replace('"', '').replace("'", "") + + cmd = ( + f'{self.config["ytdlp_path"]} -x --audio-format mp3 --add-metadata ' + f'-o "{dl_dir}/%(title)s [%(id)s].%(ext)s" ' + f'"ytsearch1:{squery}"' + ) + + proc = await asyncio.create_subprocess_shell( + cmd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE + ) + await proc.communicate() + + files = [f for f in os.listdir(dl_dir) if f.endswith(".mp3")] + + if files: + target_file = os.path.join(dl_dir, files[0]) + await utils.answer(message, caption, file=target_file) + else: + await utils.answer(message, self.strings("snowt_failed")) + + except Exception as e: + logger.error(e) + await utils.answer(message, self.strings("dl_err")) + + finally: + if os.path.exists(dl_dir): + for f in os.listdir(dl_dir): + try: + os.remove(os.path.join(dl_dir, f)) + except: + pass + + + @error_handler + @tokenized + @loader.command( + ru_doc="- ➕ Добавить текущий трек в плейлист (используйте номер из .splaylists)" + ) + async def splaylistadd(self, message: Message): + """- ➕ Add current track to playlist (use number from .splaylists)""" + args = utils.get_args_raw(message) + if not args or not args.isdigit(): + await utils.answer(message, self.strings("invalid_playlist_index")) + return + + index = int(args) - 1 + playlists = self.get("last_playlists", []) + + if not playlists: + await utils.answer(message, self.strings("no_cached_playlists")) + return + + if index < 0 or index >= len(playlists): + await utils.answer(message, self.strings("invalid_playlist_index")) + return + + current = self.sp.current_playback() + if not current or not current.get("item"): + await utils.answer(message, self.strings("no_music")) + return + + track_uri = current["item"]["uri"] + track_name = current["item"]["name"] + artists = ", ".join([a["name"] for a in current["item"]["artists"]]) + full_track_name = f"{artists} - {track_name}" + + playlist_id = playlists[index]["id"] + playlist_name = playlists[index]["name"] + + try: + self.sp.playlist_add_items(playlist_id, [track_uri]) + except spotipy.exceptions.SpotifyException as e: + if e.http_status == 403 and "Insufficient client scope" in str(e): + await utils.answer(message, self.strings("need_auth")) + return + raise e + + await utils.answer(message, self.strings("added_to_playlist").format(utils.escape_html(full_track_name), utils.escape_html(playlist_name))) + + @error_handler + @tokenized + @loader.command( + ru_doc="- ➖ Удалить текущий трек из плейлиста (используйте номер из .splaylists)" + ) + async def splaylistrem(self, message: Message): + """- ➖ Remove current track from playlist (use number from .splaylists)""" + args = utils.get_args_raw(message) + if not args or not args.isdigit(): + await utils.answer(message, self.strings("invalid_playlist_index")) + return + + index = int(args) - 1 + playlists = self.get("last_playlists", []) + + if not playlists: + await utils.answer(message, self.strings("no_cached_playlists")) + return + + if index < 0 or index >= len(playlists): + await utils.answer(message, self.strings("invalid_playlist_index")) + return + + current = self.sp.current_playback() + if not current or not current.get("item"): + await utils.answer(message, self.strings("no_music")) + return + + track_uri = current["item"]["uri"] + track_name = current["item"]["name"] + artists = ", ".join([a["name"] for a in current["item"]["artists"]]) + full_track_name = f"{artists} - {track_name}" + + playlist_id = playlists[index]["id"] + playlist_name = playlists[index]["name"] + + try: + self.sp.playlist_remove_all_occurrences_of_items(playlist_id, [track_uri]) + except spotipy.exceptions.SpotifyException as e: + if e.http_status == 403 and "Insufficient client scope" in str(e): + await utils.answer(message, self.strings("need_auth")) + return + raise e + + await utils.answer(message, self.strings("removed_from_playlist").format(utils.escape_html(full_track_name), utils.escape_html(playlist_name))) + + @error_handler + @tokenized + @loader.command( + ru_doc="- 🆕 Создать новый плейлист" + ) + async def splaylistcreate(self, message: Message): + """- 🆕 Create a new playlist""" + name = utils.get_args_raw(message) + if not name: + await utils.answer(message, self.strings("no_playlist_name")) + return + + user_id = self.sp.me()["id"] + self.sp.user_playlist_create(user_id, name) + await utils.answer(message, self.strings("playlist_created").format(utils.escape_html(name))) + + @error_handler + @tokenized + @loader.command( + ru_doc="- 🗑 Удалить плейлист (используйте номер из .splaylists)" + ) + async def splaylistdelete(self, message: Message): + """- 🗑 Delete playlist (use number from .splaylists)""" + args = utils.get_args_raw(message) + if not args or not args.isdigit(): + await utils.answer(message, self.strings("invalid_playlist_index")) + return + + index = int(args) - 1 + playlists = self.get("last_playlists", []) + + if not playlists: + await utils.answer(message, self.strings("no_cached_playlists")) + return + + if index < 0 or index >= len(playlists): + await utils.answer(message, self.strings("invalid_playlist_index")) + return + + playlist_id = playlists[index]["id"] + playlist_name = playlists[index]["name"] + + self.sp.current_user_unfollow_playlist(playlist_id) + await utils.answer(message, self.strings("playlist_deleted").format(utils.escape_html(playlist_name))) + + @error_handler + @tokenized + @loader.command( + ru_doc="- 📃 Получить все плейлисты" + ) + async def splaylists(self, message: Message): + """- 📃 Get all playlists""" + user_id = self.sp.me()["id"] + playlists = self.sp.current_user_playlists() + + editable_playlists = [] + for playlist in playlists["items"]: + if playlist["owner"]["id"] == user_id or playlist["collaborative"]: + editable_playlists.append(playlist) + + self.set("last_playlists", editable_playlists) + + playlist_list_text = "" + for i, playlist in enumerate(editable_playlists): + name = utils.escape_html(playlist["name"]) + url = playlist["external_urls"]["spotify"] + count = playlist["tracks"]["total"] + playlist_list_text += f"{i + 1}. {name} ({count} tracks)\n" + + if not playlist_list_text: + await utils.answer(message, self.strings("no_playlists")) + else: + await utils.answer(message, self.strings("playlists_list").format(playlist_list_text)) + + @error_handler + @tokenized + @loader.command( + ru_doc="- ℹ️ Переключить стриминг воспроизведения в био" + ) + async def sbiocmd(self, message: Message): + """- ℹ️ Toggle bio playback streaming""" + current = self.get("autobio", False) + new = not current + self.set("autobio", new) + await utils.answer( + message, + self.strings("autobio").format("enabled" if new else "disabled"), + ) + + if new: + self.autobio.start() + else: + self.autobio.stop() + + @error_handler + @tokenized + @loader.command( + ru_doc="- 🔊 Изменить громкость. .svolume <0-100>" + ) + async def svolume(self, message: Message): + """- 🔊 Change playback volume. .svolume <0-100>""" + try: + args = utils.get_args_raw(message) + if not args: + await utils.answer(message, self.strings("no_volume_arg")) + return + + volume_percent = int(args) + if 0 <= volume_percent <= 100: + self.sp.volume(volume_percent) + await utils.answer(message, self.strings("volume_changed").format(volume_percent)) + else: + await utils.answer(message, self.strings("volume_invalid")) + except ValueError: + await utils.answer(message, self.strings("volume_invalid")) + except Exception: + await utils.answer(message, self.strings("volume_err")) + + @error_handler + @tokenized + @loader.command( + ru_doc=( + "- 🎵 Выбрать устройство для воспроизведения. Например: .sdevice \n" + "- 📝 Показать список устройств: .sdevice" + ) + ) + async def sdevicecmd(self, message: Message): + """- 🎵 Set preferred playback device. Usage: .sdevice or .sdevice to list devices""" + args = utils.get_args_raw(message) + devices = self.sp.devices()["devices"] + + if not args: + if not devices: + await utils.answer(message, self.strings("no_devices_found")) + return + + device_list_text = "" + for i, device in enumerate(devices): + is_active = "(active)" if device["is_active"] else "" + device_list_text += ( + f"{i+1}. {device['name']}" + f" ({device['type']}) {is_active}\n" + ) + + await utils.answer(message, self.strings("device_list").format(device_list_text.strip())) + return + + device_id = None + try: + device_number = int(args) + if 0 < device_number <= len(devices): + device_id = devices[device_number - 1]["id"] + device_name = devices[device_number - 1]["name"] + else: + await utils.answer(message, self.strings("invalid_device_id")) + return + except ValueError: + found_device = next((d for d in devices if d["id"] == args.strip()), None) + if found_device: + device_id = found_device["id"] + device_name = found_device["name"] + else: + await utils.answer(message, self.strings("invalid_device_id")) + return + + self.sp.transfer_playback(device_id=device_id) + await utils.answer(message, self.strings("device_changed").format(device_name)) + + @error_handler + @tokenized + @loader.command( + ru_doc="- 💫 Включить повтор трека" + ) + async def srepeatcmd(self, message: Message): + """- 💫 Repeat""" + self.sp.repeat("track") + await utils.answer(message, self.strings("on-repeat")) + + @error_handler + @tokenized + @loader.command( + ru_doc="- ✋ Остановить повтор" + ) + async def sderepeatcmd(self, message: Message): + """- ✋ Stop repeat""" + self.sp.repeat("context") + await utils.answer(message, self.strings("off-repeat")) + + @error_handler + @tokenized + @loader.command( + ru_doc="- 👉 Следующий трек" + ) + async def snextcmd(self, message: Message): + """- 👉 Next track""" + self.sp.next_track() + await utils.answer(message, self.strings("skipped")) + + @error_handler + @tokenized + @loader.command( + ru_doc="- 🤚 Продолжить воспроизведение" + ) + async def sresumecmd(self, message: Message): + """- 🤚 Resume""" + self.sp.start_playback() + await utils.answer(message, self.strings("playing")) + + @error_handler + @tokenized + @loader.command( + ru_doc="- 🤚 Пауза" + ) + async def spausecmd(self, message: Message): + """- 🤚 Pause""" + self.sp.pause_playback() + await utils.answer(message, self.strings("paused")) + + @error_handler + @tokenized + @loader.command( + ru_doc="- ⏮ Предыдущий трек" + ) + async def sbackcmd(self, message: Message): + """- ⏮ Previous track""" + self.sp.previous_track() + await utils.answer(message, self.strings("back")) + + @error_handler + @tokenized + @loader.command( + ru_doc="- ⏪ Перезапустить трек" + ) + async def sbegincmd(self, message: Message): + """- ⏪ Restart track""" + self.sp.seek_track(0) + await utils.answer(message, self.strings("restarted")) + + @error_handler + @tokenized + @loader.command( + ru_doc="- ❤️ Лайкнуть играющий трек" + ) + async def slikecmd(self, message: Message): + """- ❤️ Like current track""" + cupl = self.sp.current_playback() + self.sp.current_user_saved_tracks_add([cupl["item"]["id"]]) + await utils.answer(message, self.strings("liked")) + + @error_handler + @tokenized + @loader.command( + ru_doc="- 💔 Убрать лайк с играющего трека" + ) + async def sunlikecmd(self, message: Message): + """- 💔 Unlike current track""" + cupl = self.sp.current_playback() + self.sp.current_user_saved_tracks_delete([cupl["item"]["id"]]) + await utils.answer(message, self.strings("unlike")) + + @error_handler + @loader.command( + ru_doc="- Получить ссылку для авторизации" + ) + async def sauthcmd(self, message: Message): + """- Get authorization link""" + if self.get("acs_tkn", False) and not self.sp: + await utils.answer(message, self.strings("already_authed")) + else: + self.sp_auth.get_authorize_url() + await utils.answer( + message, + self.strings("auth").format(self.sp_auth.get_authorize_url()), + ) + + @error_handler + @loader.command( + ru_doc="- Вставить код авторизации" + ) + async def scodecmd(self, message: Message): + """- Paste authorization code""" + url = message.message.split(" ")[1] + code = self.sp_auth.parse_auth_response_url(url) + self.set("acs_tkn", self.sp_auth.get_access_token(code, True, False)) + self.sp = spotipy.Spotify(auth=self.get("acs_tkn")["access_token"]) + await utils.answer(message, self.strings("authed")) + + @error_handler + @loader.command( + ru_doc="- Выйти из аккаунта" + ) + async def unauthcmd(self, message: Message): + """- Log out of account""" + self.set("acs_tkn", None) + del self.sp + await utils.answer(message, self.strings("deauth")) + + @error_handler + @tokenized + @loader.command( + ru_doc="- Обновить токен авторизации" + ) + async def stokrefreshcmd(self, message: Message): + """- Refresh authorization token""" + self.set( + "acs_tkn", + self.sp_auth.refresh_access_token(self.get("acs_tkn")["refresh_token"]), + ) + self.set("NextRefresh", time.time() + 45 * 60) + self.sp = spotipy.Spotify(auth=self.get("acs_tkn")["access_token"]) + await utils.answer(message, self.strings("authed")) + + @error_handler + @tokenized + @loader.command( + ru_doc="- 🎧 Показать карточку играющего трека" + ) + async def snowcmd(self, message: Message): + """- 🎧 View current track card.""" + current_playback = self.sp.current_playback() + if not current_playback or not current_playback.get("is_playing", False): + await utils.answer(message, self.strings("no_music")) + return + + track = current_playback["item"]["name"] + track_id = current_playback["item"]["id"] + artists = ", ".join([a["name"] for a in current_playback["item"]["artists"]]) + album_name = current_playback["item"]["album"].get("name", "Unknown Album") + duration_ms = current_playback["item"].get("duration_ms", 0) + progress_ms = current_playback.get("progress_ms", 0) + + duration = f"{duration_ms//1000//60}:{duration_ms//1000%60:02}" + progress = f"{progress_ms//1000//60}:{progress_ms//1000%60:02}" + + spotify_url = f"https://open.spotify.com/track/{track_id}" + songlink = f"https://song.link/s/{track_id}" + + try: + device_raw = ( + current_playback["device"]["name"] + + " " + + current_playback["device"]["type"].lower() + ) + device = device_raw.replace("computer", "").replace("smartphone", "").strip() + except Exception: + device = None + + try: + playlist_id = current_playback["context"]["uri"].split(":")[-1] + playlist = self.sp.playlist(playlist_id) + playlist_name = playlist.get("name", None) + try: + playlist_owner = ( + f'' + f'{playlist["owner"]["display_name"]}' + ) + except KeyError: + playlist_owner = playlist.get("owner", {}).get("display_name", "") + except Exception: + playlist_name = "" + playlist_owner = "" + + text = self.config["custom_text"].format( + track=utils.escape_html(track), + artists=utils.escape_html(artists), + album=utils.escape_html(album_name), + duration=duration, + progress=progress, + device=device, + spotify_url=spotify_url, + songlink=songlink, + playlist=utils.escape_html(playlist_name) if playlist_name else "", + playlist_owner=playlist_owner or "", + ) + + if self.config["show_banner"]: + cover_url = current_playback["item"]["album"]["images"][0]["url"] + + tmp_msg = await utils.answer(message, text + self.strings("uploading_banner")) + + banners = Banners( + title=track, + artists=artists, + duration=duration_ms, + progress=progress_ms, + track_cover=requests.get(cover_url).content, + font=self.config["font"], + ) + file = getattr(banners, self.config["banner_version"], banners.horizontal)() + + await utils.answer(tmp_msg, text, file=file) + else: + await utils.answer(message, text) + + @error_handler + @tokenized + @loader.command( + ru_doc="- 🎧 Скачать играющий трек" + ) + async def snowtcmd(self, message: Message): + """- 🎧 Download current track.""" + current_playback = self.sp.current_playback() + if not current_playback or not current_playback.get("is_playing", False): + await utils.answer(message, self.strings("no_music")) + return + + track = current_playback["item"]["name"] + artists = ", ".join([a["name"] for a in current_playback["item"]["artists"]]) + album_name = current_playback["item"]["album"].get("name", "Unknown Album") + duration_ms = current_playback["item"].get("duration_ms", 0) + progress_ms = current_playback.get("progress_ms", 0) + + duration = f"{duration_ms//1000//60}:{duration_ms//1000%60:02}" + progress = f"{progress_ms//1000//60}:{progress_ms//1000%60:02}" + + spotify_url = f"https://open.spotify.com/track/{current_playback['item']['id']}" + songlink = f"https://song.link/s/{current_playback['item']['id']}" + + try: + device_raw = ( + current_playback["device"]["name"] + + " " + + current_playback["device"]["type"].lower() + ) + device = device_raw.replace("computer", "").replace("smartphone", "").strip() + except Exception: + device = None + + try: + playlist_id = current_playback["context"]["uri"].split(":")[-1] + playlist = self.sp.playlist(playlist_id) + playlist_name = playlist.get("name", None) + try: + playlist_owner = ( + f'' + f'{playlist["owner"]["display_name"]}' + ) + except KeyError: + playlist_owner = playlist.get("owner", {}).get("display_name", "") + except Exception: + playlist_name = "" + playlist_owner = "" + + text = self.config["custom_text"].format( + track=utils.escape_html(track), + artists=utils.escape_html(artists), + album=utils.escape_html(album_name), + duration=duration, + progress=progress, + device=device, + spotify_url=spotify_url, + songlink=songlink, + playlist=utils.escape_html(playlist_name) if playlist_name else "", + playlist_owner=playlist_owner or "", + ) + + msg = await utils.answer(message, text + self.strings("downloading_track")) + + await self._download_track(msg, f"{artists} {track}", caption=text) + + @error_handler + @tokenized + @loader.command( + ru_doc=( + "- 🔍 Поиск треков. Например: .ssearch Imagine Dragons Believer\n" + "- 🎧 Скачать трек: .ssearch 1 (где 1 — номер трека из списка)" + ) + ) + async def ssearchcmd(self, message: Message): + """🔍 Search for tracks. Usage: .ssearch or .ssearch to download""" + args = utils.get_args_raw(message) + if not args: + await utils.answer(message, self.strings("no_search_query")) + return + + try: + track_number = int(args) + search_results = self.get("last_search_results", []) + + if not search_results: + await utils.answer(message, self.strings("no_tracks_found")) + return + + if track_number <= 0 or track_number > len(search_results): + raise ValueError + + msg = await utils.answer(message, self.strings("downloading_track")) + + track_info = search_results[track_number - 1] + track_name = track_info["name"] + artists = ", ".join([a["name"] for a in track_info["artists"]]) + + caption_text = self.strings("download_success").format( + utils.escape_html(track_name), + utils.escape_html(artists) + ) + + await self._download_track(msg, f"{artists} {track_name}", caption=caption_text) + return + + except ValueError: + await utils.answer(message, self.strings("searching_tracks").format(args)) + + results = self.sp.search(q=args, limit=5, type="track") + + if not results or not results["tracks"]["items"]: + await utils.answer(message, self.strings("no_tracks_found").format(args)) + return + + self.set("last_search_results", results["tracks"]["items"]) + + tracks_list = [] + for i, track in enumerate(results["tracks"]["items"]): + track_name = track["name"] + artists = ", ".join([artist["name"] for artist in track["artists"]]) + track_url = track["external_urls"]["spotify"] + tracks_list.append( + "{number}. {track_name} — {artists}\n🔗 Spotify".format( + number=i + 1, + track_name=utils.escape_html(track_name), + artists=utils.escape_html(artists), + track_url=track_url, + ) + ) + + text = "\n".join(tracks_list) + await utils.answer(message, self.strings("search_results").format(args, text)) + + + @loader.command( + ru_doc="- 🔄 Сброс результатов поиска по трекам" + ) + async def ssearchresetcmd(self, message: Message): + """- 🔄 Reset track search results""" + self.set("last_search_results", []) + await utils.answer(message, self.strings["search_results_cleared"]) + + async def watcher(self, message: Message): + """Watcher is used to update token""" + if not self.sp: + return + + if self.get("NextRefresh", False): + ttc = self.get("NextRefresh", 0) + crnt = time.time() + if ttc < crnt: + self.set( + "acs_tkn", + self.sp_auth.refresh_access_token( + self.get("acs_tkn")["refresh_token"] + ), + ) + self.set("NextRefresh", time.time() + 45 * 60) + self.sp = spotipy.Spotify(auth=self.get("acs_tkn")["access_token"]) + else: + self.set( + "acs_tkn", + self.sp_auth.refresh_access_token(self.get("acs_tkn")["refresh_token"]), + ) + self.set("NextRefresh", time.time() + 45 * 60) + self.sp = spotipy.Spotify(auth=self.get("acs_tkn")["access_token"]) diff --git a/radiocycle/Modules/UnbanAll.py b/radiocycle/Modules/UnbanAll.py new file mode 100644 index 0000000..922fc3d --- /dev/null +++ b/radiocycle/Modules/UnbanAll.py @@ -0,0 +1,74 @@ +# ======================================= +# _ __ __ __ _ +# | |/ /___ | \/ | ___ __| |___ +# | ' // _ \ | |\/| |/ _ \ / _` / __| +# | . \ __/ | | | | (_) | (_| \__ \ +# |_|\_\___| |_| |_|\___/ \__,_|___/ +# @ke_mods +# ======================================= +# +# LICENSE: CC BY-ND 4.0 (Attribution-NoDerivatives 4.0 International) +# -------------------------------------- +# https://creativecommons.org/licenses/by-nd/4.0/legalcode +# ======================================= + +# meta developer: @ke_mods + +from .. import loader, utils +from telethon.tl.types import ChatBannedRights +from telethon.tl.functions.channels import EditBannedRequest +from telethon.tl.types import ChannelParticipantsKicked + +@loader.tds +class UnbanAllMod(loader.Module): + strings = { + "name": "UnbanAll", + "no_rights": "❌ I don't have administrator rights to remove restrictions.", + "success": "✅ All banned chat members have been unbanned.", + "unban_in_process": "👀 Unbanning users...", + "no_banned": "ℹ️ There are no banned members in this chat.", + "error_occured": "💢 An error occurred while unbanning user {}:\n{}", + } + strings_ru = { + "no_rights": "❌ У меня нет прав администратора для снятия ограничений.", + "success": "✅ Все забаненные участники чата были разблокированы.", + "unban_in_process": "👀 Разбаниваю пользователей...", + "no_banned": "ℹ️ В этом чате нет забаненных участников.", + "error_occured": "💢 Произошла ошибка при разблокировке пользователя {}:\n{}", + } + + @loader.command(ru_doc="- Разбанить всех забаненных пользователей") + async def unbanallcmd(self, message): + """- Unban all banned users""" + chat = await message.get_chat() + + if not chat.admin_rights and not chat.creator: + await utils.answer(message, self.strings("no_rights")) + return + + await utils.answer(message, self.strings("unban_in_process")) + + no_banned = True + + async for user in self.client.iter_participants( + message.chat_id, filter=ChannelParticipantsKicked + ): + + no_banned = False + + try: + await self.client(EditBannedRequest( + message.chat_id, + user.id, + ChatBannedRights(until_date=0) + )) + + except Exception as e: + await utils.answer(message, self.strings("error_occured").format(user.id, e)) + pass + + if no_banned: + await utils.answer(message, self.strings("no_banned")) + return + + await utils.answer(message, self.strings("success")) diff --git a/radiocycle/Modules/full.txt b/radiocycle/Modules/full.txt new file mode 100644 index 0000000..699e87a --- /dev/null +++ b/radiocycle/Modules/full.txt @@ -0,0 +1,7 @@ +Neofetch +randomanimepic +SpotifyMod +UnbanAll +voicetotext +LastFm +PicToStories diff --git a/radiocycle/Modules/randomanimepic.py b/radiocycle/Modules/randomanimepic.py new file mode 100644 index 0000000..aa7766b --- /dev/null +++ b/radiocycle/Modules/randomanimepic.py @@ -0,0 +1,65 @@ +# ======================================= +# _ __ __ __ _ +# | |/ /___ | \/ | ___ __| |___ +# | ' // _ \ | |\/| |/ _ \ / _` / __| +# | . \ __/ | | | | (_) | (_| \__ \ +# |_|\_\___| |_| |_|\___/ \__,_|___/ +# @ke_mods +# ======================================= +# +# LICENSE: CC BY-ND 4.0 (Attribution-NoDerivatives 4.0 International) +# -------------------------------------- +# https://creativecommons.org/licenses/by-nd/4.0/legalcode +# ======================================= + +# meta developer: @ke_mods + +import requests +import asyncio +import logging +import traceback +from logging import basicConfig +from .. import loader, utils + +basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +@loader.tds +class RandomAnimePicMod(loader.Module): + strings = { + "name": "RandomAnimePic", + "img": " Your anime pic\n🔗 URL: {}", + "loading": " Loading image...", + "error": "🚫 An unexpected error occurred...", + } + + strings_ru = { + "img": " Ваша аниме-картинка\n🔗 Ссылка: {}", + "loading": " Загрузка изображения...", + "error": "🚫 Произошла непредвиденная ошибка...", + } + + @loader.command( + ru_doc="- получить рандомную аниме-картинку 👀" + ) + async def rapiccmd(self, message): + """- fetch random anime-pic 👀""" + + await utils.answer(message, self.strings("loading")) + + try: + res = requests.get("https://api.nekosia.cat/api/v1/images/cute?count=1") + res.raise_for_status() + data = res.json() + image_url = data['image']['original']['url'] + + await asyncio.sleep(2) + + await utils.answer(message, self.strings("img").format(image_url), file=image_url, reply_to=message.reply_to_msg_id) + + except Exception: + logger.error("Error fetching random anime pic: %s", traceback.format_exc()) + + await utils.answer(message, self.strings("error")) + + await asyncio.sleep(5) diff --git a/radiocycle/Modules/voicetotext.py b/radiocycle/Modules/voicetotext.py new file mode 100644 index 0000000..0e4cfd5 --- /dev/null +++ b/radiocycle/Modules/voicetotext.py @@ -0,0 +1,77 @@ +# ======================================= +# _ __ __ __ _ +# | |/ /___ | \/ | ___ __| |___ +# | ' // _ \ | |\/| |/ _ \ / _` / __| +# | . \ __/ | | | | (_) | (_| \__ \ +# |_|\_\___| |_| |_|\___/ \__,_|___/ +# @ke_mods +# ======================================= +# +# LICENSE: CC BY-ND 4.0 (Attribution-NoDerivatives 4.0 International) +# -------------------------------------- +# https://creativecommons.org/licenses/by-nd/4.0/legalcode +# ======================================= + +# meta developer: @ke_mods +# scope: ffmpeg +# requires: pydub SpeechRecognition + +from .. import loader, utils +import os +import speech_recognition as sr +from pydub import AudioSegment + +@loader.tds +class VoiceToTextMod(loader.Module): + strings = { + "name": "VoiceToText", + "process_text": " Recognizing the message text...", + "vtt_success": "🔥 Recognized text:\n
{}
", + "vtt_failure": "🚫 Failed to recognize the message.", + "vtt_request_error": "🚫 Error when contacting the recognition service:\n{}", + "vtt_invalid": "🚫 Please reply to a voice or video message with the command {}vtt", + "vtt_successful": " Text recognized successfully", + } + + strings_ru = { + "process_text": " Распознаю текст сообщения...", + "vtt_success": "🔥 Распознанный текст:\n
{}
", + "vtt_failure": "🚫 Не удалось распознать сообщение.", + "vtt_request_error": "🚫 Ошибка при обращении к сервису распознавания:\n{}", + "vtt_invalid": "🚫 Пожалуйста, ответьте на голосовое или видеосообщение командой {}vtt", + "vtt_successful": " Текст успешно распознан", + } + + @loader.command( + ru_doc="- распознает текст из голосового или видеосообщения.", + ) + async def vttcmd(self, message): + """- recognizes text from voice or video messages.""" + reply = await message.get_reply_message() + + if not reply or not (reply.voice or reply.video_note): + await utils.answer(message, self.strings["vtt_invalid"].format(self.get_prefix())) + return + + msg = await utils.answer( + message, self.strings["process_text"], reply_to=message.id + ) + + media_file = await reply.download_media() + wav_file = media_file.replace('.mp4', '.wav') if reply.video_note else media_file.replace('.oga', '.wav') + + try: + AudioSegment.from_file(media_file).export(wav_file, format='wav') + recognizer = sr.Recognizer() + with sr.AudioFile(wav_file) as source: + audio_data = recognizer.record(source) + try: + text = recognizer.recognize_google(audio_data, language='ru-RU') + await utils.answer(msg, self.strings["vtt_success"].format(text)) + except sr.UnknownValueError: + await utils.answer(msg, self.strings["vtt_failure"]) + except sr.RequestError as e: + await utils.answer(msg, self.strings["vtt_request_error"].format(e)) + finally: + os.remove(media_file) + os.remove(wav_file) diff --git a/yummy1gay/limoka/yg_quotes.py b/yummy1gay/limoka/yg_quotes.py index b2f6231..28f89ac 100644 --- a/yummy1gay/limoka/yg_quotes.py +++ b/yummy1gay/limoka/yg_quotes.py @@ -1,396 +1,409 @@ -__version__ = (1, 1, 1, 1) - -# This file is a part of Hikka Userbot! -# This product includes software developed by t.me/Fl1yd and t.me/spypm. -# Based on the "SQuotes" module. - -# 🌐 https://github.com/hikariatama/Hikka - -# You CAN edit this file without direct permission from the author. -# You can redistribute this file with any modifications. - -# thx to t.me/LyoSU for github.com/LyoSU/quote-api - -# meta developer: @yg_modules -# scope: hikka_only -# scope: hikka_min 1.6.3 - -# █▄█ █░█ █▀▄▀█ █▀▄▀█ █▄█   █▀▄▀█ █▀█ █▀▄ █▀ -# ░█░ █▄█ █░▀░█ █░▀░█ ░█░   █░▀░█ █▄█ █▄▀ ▄█ - -import base64, io, requests, telethon -from time import gmtime -from typing import List, Optional, Tuple, Union -from PIL import Image, ImageDraw -from telethon.tl import types -from telethon.extensions import html -from telethon.tl.patched import Message - -from .. import loader, utils - -class Dick: - @staticmethod - def ents(es: types.TypeMessageEntity) -> List[dict]: - out: List[dict] = [] - if not es: return out - for e in es: - try: - d = e.to_dict(); t = d.pop("_","").replace("MessageEntity","").lower() - if not t: continue - mt = {"bold": "bold","italic": "italic","underline": "underline","strikethrough": "strikethrough", - "code": "code","pre": "pre","texturl": "text_link","url": "url","email": "email", - "phone": "phone_number","mention": "mention", - "mentionname": "text_mention","hashtag": "hashtag","cashtag": "cashtag", - "botcommand": "bot_command","spoiler": "spoiler","customemoji": "custom_emoji"}.get(t,t) - it = {"type": mt,"offset": d.get("offset",0),"length": d.get("length",0)} - if t=="texturl": it["url"]=d.get("url","") - elif t=="mentionname": it["user"]={"id": d.get("user_id",0)} - elif t=="customemoji": it["custom_emoji_id"]=str(d.get("document_id","")) - elif t=="pre": it["language"]=d.get("language","") - out.append(it) - except Exception: continue - return out - - @staticmethod - def dur(s: Union[int,float]) -> str: - t=gmtime(s); return (f"{t.tm_hour:02d}:" if t.tm_hour>0 else "")+f"{t.tm_min:02d}:{t.tm_sec:02d}" - - @staticmethod - def desc(m: Message, rep: bool=False) -> str: - return ( - "📷 Фото" if m.photo and rep else - (m.file.emoji+" Стикер") if m.sticker and rep else - "📹 Видеосообщение" if m.video_note and rep else - "📹 Видео" if m.video and rep else - "🖼 GIF" if m.gif else - "📊 Опрос" if m.poll else - "📍 Местоположение" if m.geo else - "👤 Контакт" if m.contact else - (f"🎵 Голосовое сообщение: {Dick.dur(m.voice.attributes[0].duration)}" if m.voice else - (f"🎧 Музыка: {Dick.dur(m.audio.attributes[0].duration)} | {m.audio.attributes[0].performer} - {m.audio.attributes[0].title}" if m.audio else - (f"💾 Файл: {m.file.name}" if isinstance(m.media, types.MessageMediaDocument) and not Dick.pick(m) else - (f"{m.media.emoticon} Кость: {m.media.value}" if isinstance(m.media, types.MessageMediaDice) else - (f"Сервисное сообщение: {m.action.to_dict().get('_')}" if isinstance(m, types.MessageService) else "")))))) #))) - - @staticmethod - def split(name: Optional[str]) -> Tuple[str,str]: - if not name: return "","" - p=name.split(); return (p[0], " ".join(p[1:]) if len(p)>1 else "") - - @staticmethod - def pick(m: Message): - if m and m.media: - return m.photo or m.sticker or m.video or m.video_note or m.gif or m.web_preview - return None - - @staticmethod - def wf(b: Optional[bytes]) -> List[int]: - if not b: return [] - n=(len(b)*8)//5 - if not n: return [] - out: List[int]=[] - last=n-1 - for i in range(last): - j=i*5; bi,sh=j//8,j%8 - v=int.from_bytes(b[bi:bi+2],"little") if bi+1>sh)&0b11111) - j=last*5; bi,sh=j//8,j%8 - v=int.from_bytes(b[bi:bi+2],"little") if bi+1>sh)&0b11111) - return out - - @staticmethod - async def img(b: bytes, circle: bool=False) -> Optional[str]: - try: - im=Image.open(io.BytesIO(b)) - if im.mode!="RGBA": im=im.convert("RGBA") - if circle: - size=min(im.size) - mask=Image.new("L",(size,size),0); ImageDraw.Draw(mask).ellipse((0,0,size,size),fill=255) - sq=Image.new("RGBA",(size,size),(0,0,0,0)) - off=((size-im.width)//2,(size-im.height)//2); sq.paste(im,off) - im=Image.composite(sq,Image.new("RGBA",(size,size),(0,0,0,0)),mask) - o=io.BytesIO(); im.save(o,format="PNG") - return f"data:image/png;base64,{base64.b64encode(o.getvalue()).decode()}" - except Exception: - return None - - @staticmethod - async def stc(b: bytes) -> Optional[str]: - try: - im=Image.open(io.BytesIO(b)) - if im.mode not in ("RGBA","LA"): im=im.convert("RGBA") - elif im.mode=="LA": im=im.convert("RGBA") - o=io.BytesIO(); im.save(o,format="PNG") - return f"data:image/png;base64,{base64.b64encode(o.getvalue()).decode()}" - except Exception: - return None - - @staticmethod - async def proc(cli, obj, m: Message) -> Optional[dict]: - try: - if m.voice: - for a in m.voice.attributes or []: - if getattr(a,"voice",False) and hasattr(a,"waveform"): - return {"voice":{"waveform":Dick.wf(a.waveform)}} - b: bytes = await cli.download_media(obj, bytes, thumb=-1) - if not b: return None - if m.sticker: - u=await Dick.stc(b); return {"url": u} if u else None - u=await Dick.img(b, circle=bool(m.video_note)) - return {"url": u} if u else None - except Exception: - return None - - @staticmethod - async def ava(cli, uid: int) -> Optional[str]: - try: - b=await cli.download_profile_photo(uid, bytes) - if b: return f"data:image/jpeg;base64,{base64.b64encode(b).decode()}" - except Exception: pass - return None - - @staticmethod - async def post(url: str, data: dict): - try: - return await utils.run_sync(requests.post, url, json=data, timeout=30) - except Exception: - return None - -@loader.tds -class Quotes(loader.Module): - """Модуль для создания цитат из сообщений""" - - strings = {"name": "yg_quotes", - "no_reply": "🏳️‍🌈 Нет реплая на сообщение", - "processing": "🏳️‍🌈 Обработка…", - "api_processing": "🏳️‍🌈 Ожидание ответа API…", - "api_error": "🏳️‍🌈 Ошибка API: {}", - "loading_media": "🏳️‍🌈 Отправка…", - "no_args_or_reply": "🏳️‍🌈 Нет аргументов или реплая", - "args_error": "🏳️‍🌈 Ошибка разбора аргументов. Запрос: {}", - "too_many_messages": "🏳️‍🌈 Слишком много сообщений. Максимум: {}"} - - def __init__(self): - self.config=loader.ModuleConfig( - loader.ConfigValue("type","quote", - lambda:"Тип цитаты", - validator=loader.validators.Choice(["quote", "stories"])), - loader.ConfigValue("bg_color","#162330", - lambda:"Цвет фона цитаты (например, #1a1a1a или red)"), - loader.ConfigValue("width",512, - lambda:"Ширина цитаты (px)", - validator=loader.validators.Integer(minimum=200,maximum=2000)), - loader.ConfigValue("height",768, - lambda:"Высота цитаты (px)", - validator=loader.validators.Integer(minimum=200,maximum=2000)), - loader.ConfigValue("scale",2, - lambda:"Масштаб рендера", - validator=loader.validators.Choice([1, 2, 3])), - loader.ConfigValue("emoji_brand","apple", - lambda:"Стиль эмодзи (apple, google, twitter и т.д.)"), - loader.ConfigValue("max_messages",15, - lambda:"Максимальное число сообщений в цитате", - validator=loader.validators.Integer(minimum=1,maximum=50)), - loader.ConfigValue("endpoint","https://kok.gay/gayotes/generate", - lambda:"URL API-эндпоинта (можешь поднять локально - github.com/yummy1gay/quote-api)", - validator=loader.validators.Link())) - - async def client_ready(self, client, db): - self.client=client; self.db=db - - async def qcmd(self, m: Message): - """ - Обычные цитаты: - • .q — процитировать одно сообщение из реплая - • .q 2 — процитировать 2 сообщения - • .q 3 #2d2d2d — 3 сообщения на тёмном фоне - • .q pink — фон по имени цвета - • .q !file — отправить как файл (PNG) - """ - try: - args=utils.get_args(m); rep=await m.get_reply_message() - if not rep: return await utils.answer(m,self.strings["no_reply"]) - st=await utils.answer(m,self.strings["processing"]) - doc="!file" in args - n=next((int(a) for a in args if a.isdigit() and int(a)>0),1) - bg=next((a for a in args if a!="!file" and not a.isdigit()), self.config["bg_color"]) - if n>self.config["max_messages"]: - return await utils.answer(st,self.strings["too_many_messages"].format(self.config["max_messages"])) - - js=await self.parse(m,n) - if not js: return await utils.answer(st,self.strings["api_error"].format("Не удалось собрать сообщения")) - - pay={"backgroundColor":bg,"width":self.config["width"],"height":self.config["height"], - "scale":self.config["scale"],"emojiBrand":self.config["emoji_brand"],"messages":js, - "format": "webp" if not doc else "png", "type": self.config["type"]} - - await utils.answer(st,self.strings["api_processing"]) - r=await Dick.post(f"{self.config['endpoint']}.webp",pay) - if not r or r.status_code!=200: - try: err=r.json().get("error",f"HTTP {r.status_code}") if r else "Нетворк еррорь" - except Exception: err=f"HTTP {r.status_code}" if r else "Нетворк еррорь" - return await utils.answer(st,self.strings["api_error"].format(err)) - - buf=io.BytesIO(r.content); buf.name="YgQuote"+(".png" if doc else ".webp") - await utils.answer(st,buf,force_document=doc) - except Exception as e: - return await utils.answer(m,f"🏳️‍🌈 Ошибка: {e}") - - async def fqcmd(self, m: Message): - """ - Фейковые цитаты: - • .fq <@ или ID> <текст> — цитата от пользователя - • .fq <текст> — цитата от автора реплая - • .fq <@/ID> <текст> -r <@/ID> <текст> — с ответом - • .fq user1 текст; user2 текст — несколько сообщений - """ - try: - raw=utils.get_args_html(m); rep=await m.get_reply_message() - if not (raw or rep): return await utils.answer(m,self.strings["no_args_or_reply"]) - st= await utils.answer(m,self.strings["processing"]) - try: js=await self.fake(raw,rep) - except (IndexError,ValueError): return await utils.answer(st,self.strings["args_error"].format(m.text)) - if len(js)>self.config["max_messages"]: - return await utils.answer(st,self.strings["too_many_messages"].format(self.config["max_messages"])) - - dickk={"backgroundColor":self.config["bg_color"],"width":self.config["width"],"height":self.config["height"], - "scale":self.config["scale"],"emojiBrand":self.config["emoji_brand"],"messages":js, - "format": "webp","type":self.config["type"]} - - await utils.answer(st,self.strings["api_processing"]) - r=await Dick.post(f"{self.config['endpoint']}.webp",dickk) - if not r or r.status_code!=200: - try: err=r.json().get("error",f"HTTP {r.status_code}") if r else "Нетворк еррорь" - except Exception: err=f"HTTP {r.status_code}" if r else "Нетворк еррорь" - return await utils.answer(st,self.strings["api_error"].format(err)) - - buf=io.BytesIO(r.content); buf.name="YgQuote.webp" - await utils.answer(st,buf) - except Exception as e: - return await utils.answer(m,f"🏳️‍🌈 Ошибка: {e}") - - async def parse(self, trg: Message, n: int) -> Optional[List[dict]]: - try: - rep= await trg.get_reply_message() - lst: List[Message]=[mm async for mm in self.client.iter_messages(trg.chat_id,limit=n,reverse=True,add_offset=1,offset_id=rep.id if rep else None)] - except Exception: - return None - - out: List[dict]=[] - for mm in lst: - try: - u=await self.who(mm) - if not u: continue - name=telethon.utils.get_display_name(u); f,l=Dick.split(name) - ava=await Dick.ava(self.client,getattr(u,"id",0)) if getattr(u,"id",None) else None - - rb=None - try: - r=await mm.get_reply_message() - if r: - rname=telethon.utils.get_display_name(r.sender) - rtxt=Dick.desc(r,True) - if r.raw_text: rtxt=(rtxt+". "+r.raw_text) if rtxt else r.raw_text - rb={"name":rname,"text":rtxt or "","entities":Dick.ents(r.entities), - "chatId": r.sender.id if r.sender else mm.chat_id,"from":{"name":rname}} - except Exception: rb=None - - med=None; obj=Dick.pick(mm) - if obj: med=await Dick.proc(self.client,obj,mm) - - txt=mm.raw_text or ""; ad=Dick.desc(mm) - if ad: txt=f"{txt}\n\n{ad}" if txt else ad - - item={"from":{"id":getattr(u,"id", 0),"first_name":getattr(u,"first_name","") or f,"last_name":getattr(u,"last_name","") or l, - "username":getattr(u,"username",None),"name":name,"photo":{"url":ava} if ava else {}}, - "text":txt,"entities":Dick.ents(mm.entities),"avatar":True} - - try: - if mm.voice: - a = next((a for a in mm.voice.attributes or [] - if getattr(a, "voice", False) and hasattr(a, "waveform")), None) - if a: item["voice"] = {"waveform": Dick.wf(a.waveform)} - except Exception: pass - - if med: item["voice" if "voice" in med else "media"] = med.get("voice", med) - - es=getattr(u,"emoji_status",None) - if getattr(es,"document_id",None): item["from"]["emoji_status"]=str(es.document_id) - if rb: item["replyMessage"]=rb - out.append(item) - except Exception: continue - return out - - async def who(self, m: Message): - try: - if m.fwd_from: - if m.fwd_from.from_id: - pid=m.fwd_from.from_id - uid=pid.channel_id if isinstance(pid, types.PeerChannel) else pid.user_id - try: return await self.client.get_entity(uid) - except Exception: return m.sender - if m.fwd_from.from_name: - return types.User( - id=hash(m.fwd_from.from_name)%2147483647, first_name=m.fwd_from.from_name, - username=None, phone=None, bot=False, verified=False, restricted=False, - scam=False, fake=False, premium=False) - return m.sender - except Exception: - return m.sender - - async def fake(self, args: str, rep: Optional[Message]) -> List[dict]: - async def tok(ch: str): - p=ch.split() - if not p: return None,"" - who=p[0]; tx=ch.split(maxsplit=1)[1] if len(p)>1 else "" - try: - u=await self.client.get_entity(int(who) if who.isdigit() else who) - return u,tx - except Exception: - return None,tx - - if rep and not args: - u=rep.sender; name=telethon.utils.get_display_name(u); f,l=Dick.split(name) - ava=await Dick.ava(self.client,u.id) if getattr(u,"id",None) else None - msg={"from":{"id":u.id,"first_name":getattr(u,"first_name","") or f,"last_name":getattr(u,"last_name","") or l, - "username":getattr(u,"username",None),"name":name,"photo":{"url":ava} if ava else {}}, - "text":"","entities":[], "avatar":True} - es=getattr(u,"emoji_status",None) - if getattr(es,"document_id", None): msg["from"]["emoji_status"]=str(es.document_id) - return [msg] - - if rep and args: - u=rep.sender - return await self.fake(f"{getattr(u,'id','')} {args}", None) - - out: List[dict]=[] - for part in args.split("; "): - try: - rb=None - if " -r " in part: - a,b=part.split(" -r ",1); u1,t1=await tok(a); u2,t2=await tok(b) - else: - u1,t1=await tok(part); u2,t2=None,None - if not u1: continue - - txt1, ents1 = html.parse(t1) if t1 else ("", []) - - name=telethon.utils.get_display_name(u1); f,l=Dick.split(name) - ava=await Dick.ava(self.client,u1.id) - - if u2: - txt2, ents2 = html.parse(t2) if t2 else ("", []) - name2=telethon.utils.get_display_name(u2); ava2=await Dick.ava(self.client,u2.id) - rb={"name":name2,"text":txt2,"entities":Dick.ents(ents2),"chatId":u2.id,"from":{"name":name2,"photo":{"url":ava2} if ava2 else {}}} - - msg={"from":{"id":u1.id,"first_name":getattr(u1,"first_name","") or f,"last_name":getattr(u1,"last_name","") or l, - "username":getattr(u1,"username",None),"name":name,"photo":{"url":ava} if ava else {}}, - "text":txt1,"entities":Dick.ents(ents1), "avatar":True} - - es=getattr(u1,"emoji_status",None) - if getattr(es,"document_id",None): msg["from"]["emoji_status"]=str(es.document_id) - if rb: msg["replyMessage"]=rb - out.append(msg) - except Exception: continue +__version__ = (1, 2, 0, 0) + +# This file is a part of Hikka Userbot! +# This product includes software developed by t.me/Fl1yd and t.me/spypm. +# Based on the "SQuotes" module. + +# 🌐 https://github.com/hikariatama/Hikka + +# You CAN edit this file without direct permission from the author. +# You can redistribute this file with any modifications. + +# thx to t.me/LyoSU for github.com/LyoSU/quote-api + +# meta developer: @yg_modules +# scope: hikka_only +# scope: hikka_min 1.6.3 + +# Changelog v1.2: +# - Added: Proxy for users from RF +# - Fixed: Correct reply author resolving for forwarded messages + +# █▄█ █░█ █▀▄▀█ █▀▄▀█ █▄█   █▀▄▀█ █▀█ █▀▄ █▀ +# ░█░ █▄█ █░▀░█ █░▀░█ ░█░   █░▀░█ █▄█ █▄▀ ▄█ + +import base64, io, requests, telethon +from time import gmtime +from typing import List, Optional, Tuple, Union +from PIL import Image, ImageDraw +from telethon.tl import types +from telethon.extensions import html +from telethon.tl.patched import Message + +from .. import loader, utils + +class Dick: + @staticmethod + def ents(es: types.TypeMessageEntity) -> List[dict]: + out: List[dict] = [] + if not es: return out + for e in es: + try: + d = e.to_dict(); t = d.pop("_","").replace("MessageEntity","").lower() + if not t: continue + mt = {"bold": "bold","italic": "italic","underline": "underline","strikethrough": "strikethrough", + "code": "code","pre": "pre","texturl": "text_link","url": "url","email": "email", + "phone": "phone_number","mention": "mention", + "mentionname": "text_mention","hashtag": "hashtag","cashtag": "cashtag", + "botcommand": "bot_command","spoiler": "spoiler","customemoji": "custom_emoji"}.get(t,t) + it = {"type": mt,"offset": d.get("offset",0),"length": d.get("length",0)} + if t=="texturl": it["url"]=d.get("url","") + elif t=="mentionname": it["user"]={"id": d.get("user_id",0)} + elif t=="customemoji": it["custom_emoji_id"]=str(d.get("document_id","")) + elif t=="pre": it["language"]=d.get("language","") + out.append(it) + except Exception: continue + return out + + @staticmethod + def dur(s: Union[int,float]) -> str: + t=gmtime(s); return (f"{t.tm_hour:02d}:" if t.tm_hour>0 else "")+f"{t.tm_min:02d}:{t.tm_sec:02d}" + + @staticmethod + def desc(m: Message, rep: bool=False) -> str: + return ( + "📷 Фото" if m.photo and rep else + (m.file.emoji+" Стикер") if m.sticker and rep else + "📹 Видеосообщение" if m.video_note and rep else + "📹 Видео" if m.video and rep else + "🖼 GIF" if m.gif else + "📊 Опрос" if m.poll else + "📍 Местоположение" if m.geo else + "👤 Контакт" if m.contact else + (f"🎵 Голосовое сообщение: {Dick.dur(m.voice.attributes[0].duration)}" if m.voice else + (f"🎧 Музыка: {Dick.dur(m.audio.attributes[0].duration)} | {m.audio.attributes[0].performer} - {m.audio.attributes[0].title}" if m.audio else + (f"💾 Файл: {m.file.name}" if isinstance(m.media, types.MessageMediaDocument) and not Dick.pick(m) else + (f"{m.media.emoticon} Кость: {m.media.value}" if isinstance(m.media, types.MessageMediaDice) else + (f"Сервисное сообщение: {m.action.to_dict().get('_')}" if isinstance(m, types.MessageService) else "")))))) #))) + + @staticmethod + def split(name: Optional[str]) -> Tuple[str,str]: + if not name: return "","" + p=name.split(); return (p[0], " ".join(p[1:]) if len(p)>1 else "") + + @staticmethod + def pick(m: Message): + if m and m.media: + return m.photo or m.sticker or m.video or m.video_note or m.gif or m.web_preview + return None + + @staticmethod + def wf(b: Optional[bytes]) -> List[int]: + if not b: return [] + n=(len(b)*8)//5 + if not n: return [] + out: List[int]=[] + last=n-1 + for i in range(last): + j=i*5; bi,sh=j//8,j%8 + v=int.from_bytes(b[bi:bi+2],"little") if bi+1>sh)&0b11111) + j=last*5; bi,sh=j//8,j%8 + v=int.from_bytes(b[bi:bi+2],"little") if bi+1>sh)&0b11111) + return out + + @staticmethod + async def img(b: bytes, circle: bool=False) -> Optional[str]: + try: + im=Image.open(io.BytesIO(b)) + if im.mode!="RGBA": im=im.convert("RGBA") + if circle: + size=min(im.size) + mask=Image.new("L",(size,size),0); ImageDraw.Draw(mask).ellipse((0,0,size,size),fill=255) + sq=Image.new("RGBA",(size,size),(0,0,0,0)) + off=((size-im.width)//2,(size-im.height)//2); sq.paste(im,off) + im=Image.composite(sq,Image.new("RGBA",(size,size),(0,0,0,0)),mask) + o=io.BytesIO(); im.save(o,format="PNG") + return f"data:image/png;base64,{base64.b64encode(o.getvalue()).decode()}" + except Exception: + return None + + @staticmethod + async def stc(b: bytes) -> Optional[str]: + try: + im=Image.open(io.BytesIO(b)) + if im.mode not in ("RGBA","LA"): im=im.convert("RGBA") + elif im.mode=="LA": im=im.convert("RGBA") + o=io.BytesIO(); im.save(o,format="PNG") + return f"data:image/png;base64,{base64.b64encode(o.getvalue()).decode()}" + except Exception: + return None + + @staticmethod + async def proc(cli, obj, m: Message) -> Optional[dict]: + try: + if m.voice: + for a in m.voice.attributes or []: + if getattr(a,"voice",False) and hasattr(a,"waveform"): + return {"voice":{"waveform":Dick.wf(a.waveform)}} + b: bytes = await cli.download_media(obj, bytes, thumb=-1) + if not b: return None + if m.sticker: + u=await Dick.stc(b); return {"url": u} if u else None + u=await Dick.img(b, circle=bool(m.video_note)) + return {"url": u} if u else None + except Exception: + return None + + @staticmethod + async def ava(cli, uid: int) -> Optional[str]: + try: + b=await cli.download_profile_photo(uid, bytes) + if b: return f"data:image/jpeg;base64,{base64.b64encode(b).decode()}" + except Exception: pass + return None + + @staticmethod + async def post(url: str, data: dict): + try: + return await utils.run_sync(requests.post, url, json=data, timeout=30) + except Exception: + return None + +@loader.tds +class Quotes(loader.Module): + """Модуль для создания цитат из сообщений""" + + strings = {"name": "yg_quotes", + "no_reply": "🏳️‍🌈 Нет реплая на сообщение", + "processing": "🏳️‍🌈 Обработка…", + "api_processing": "🏳️‍🌈 Ожидание ответа API…", + "api_error": "🏳️‍🌈 Ошибка API: {}", + "loading_media": "🏳️‍🌈 Отправка…", + "no_args_or_reply": "🏳️‍🌈 Нет аргументов или реплая", + "args_error": "🏳️‍🌈 Ошибка разбора аргументов. Запрос: {}", + "too_many_messages": "🏳️‍🌈 Слишком много сообщений. Максимум: {}"} + + def __init__(self): + self.config=loader.ModuleConfig( + loader.ConfigValue("type","quote", + lambda:"Тип цитаты", + validator=loader.validators.Choice(["quote", "stories"])), + loader.ConfigValue("bg_color","#162330", + lambda:"Цвет фона цитаты (например, #1a1a1a или red)"), + loader.ConfigValue("width",512, + lambda:"Ширина цитаты (px)", + validator=loader.validators.Integer(minimum=200,maximum=2000)), + loader.ConfigValue("height",768, + lambda:"Высота цитаты (px)", + validator=loader.validators.Integer(minimum=200,maximum=2000)), + loader.ConfigValue("scale",2, + lambda:"Масштаб рендера", + validator=loader.validators.Choice([1, 2, 3])), + loader.ConfigValue("emoji_brand","apple", + lambda:"Стиль эмодзи (apple, google, twitter и т.д.)"), + loader.ConfigValue("max_messages",15, + lambda:"Максимальное число сообщений в цитате", + validator=loader.validators.Integer(minimum=1,maximum=50)), + loader.ConfigValue("endpoint","https://kok.gay/gayotes/generate", + lambda:"URL API-эндпоинта (можешь поднять локально - github.com/yummy1gay/quote-api)", + validator=loader.validators.Link()), + loader.ConfigValue("use_rf_proxy", False, + lambda:'Включает прокси для РФ, если основной эндпоинт возвращает ошибку "Нетворк еррорь", и при этом сервер с юзерботом находится в России или ты сам сидишь в России с ограниченным доступом к зарубежным ресурсам (Termux / UserLAnd)', + validator=loader.validators.Boolean()), + loader.ConfigValue("rf_endpoint", "https://ru.kok.gay/gayotes/generate", + lambda:"URL API-эндпоинта для РФ", + validator=loader.validators.Link())) + + async def client_ready(self, client, db): + self.client=client; self.db=db + + async def qcmd(self, m: Message): + """ + Обычные цитаты: + • .q — процитировать одно сообщение из реплая + • .q 2 — процитировать 2 сообщения + • .q 3 #2d2d2d — 3 сообщения на тёмном фоне + • .q pink — фон по имени цвета + • .q !file — отправить как файл (PNG) + """ + try: + args=utils.get_args(m); rep=await m.get_reply_message() + if not rep: return await utils.answer(m,self.strings["no_reply"]) + st=await utils.answer(m,self.strings["processing"]) + doc="!file" in args + n=next((int(a) for a in args if a.isdigit() and int(a)>0),1) + bg=next((a for a in args if a!="!file" and not a.isdigit()), self.config["bg_color"]) + if n>self.config["max_messages"]: + return await utils.answer(st,self.strings["too_many_messages"].format(self.config["max_messages"])) + + js=await self.parse(m,n) + if not js: return await utils.answer(st,self.strings["api_error"].format("Не удалось собрать сообщения")) + + pay={"backgroundColor":bg,"width":self.config["width"],"height":self.config["height"], + "scale":self.config["scale"],"emojiBrand":self.config["emoji_brand"],"messages":js, + "format": "webp" if not doc else "png", "type": self.config["type"]} + + await utils.answer(st,self.strings["api_processing"]) + endpoint=self.config['rf_endpoint'] if self.config['use_rf_proxy'] else self.config['endpoint'] + r=await Dick.post(f"{endpoint}.webp",pay) + if not r or r.status_code!=200: + try: err=r.json().get("error",f"HTTP {r.status_code}") if r else "Нетворк еррорь (попробуй включить use_rf_proxy в конфиге)" + except Exception: err=f"HTTP {r.status_code}" if r else "Нетворк еррорь (попробуй включить use_rf_proxy в конфиге)" + return await utils.answer(st,self.strings["api_error"].format(err)) + + buf=io.BytesIO(r.content); buf.name="YgQuote"+(".png" if doc else ".webp") + await utils.answer(st,buf,force_document=doc) + except Exception as e: + return await utils.answer(m,f"🏳️‍🌈 Ошибка: {e}") + + async def fqcmd(self, m: Message): + """ + Фейковые цитаты: + • .fq <@ или ID> <текст> — цитата от пользователя + • .fq <текст> — цитата от автора реплая + • .fq <@/ID> <текст> -r <@/ID> <текст> — с ответом + • .fq user1 текст; user2 текст — несколько сообщений + """ + try: + raw=utils.get_args_html(m); rep=await m.get_reply_message() + if not (raw or rep): return await utils.answer(m,self.strings["no_args_or_reply"]) + st= await utils.answer(m,self.strings["processing"]) + try: js=await self.fake(raw,rep) + except (IndexError,ValueError): return await utils.answer(st,self.strings["args_error"].format(m.text)) + if len(js)>self.config["max_messages"]: + return await utils.answer(st,self.strings["too_many_messages"].format(self.config["max_messages"])) + + dickk={"backgroundColor":self.config["bg_color"],"width":self.config["width"],"height":self.config["height"], + "scale":self.config["scale"],"emojiBrand":self.config["emoji_brand"],"messages":js, + "format": "webp","type":self.config["type"]} + + await utils.answer(st,self.strings["api_processing"]) + endpoint=self.config['rf_endpoint'] if self.config['use_rf_proxy'] else self.config['endpoint'] + r=await Dick.post(f"{endpoint}.webp",dickk) + if not r or r.status_code!=200: + try: err=r.json().get("error",f"HTTP {r.status_code}") if r else "Нетворк еррорь (попробуй включить use_rf_proxy в конфиге)" + except Exception: err=f"HTTP {r.status_code}" if r else "Нетворк еррорь (попробуй включить use_rf_proxy в конфиге)" + return await utils.answer(st,self.strings["api_error"].format(err)) + + buf=io.BytesIO(r.content); buf.name="YgQuote.webp" + await utils.answer(st,buf) + except Exception as e: + return await utils.answer(m,f"🏳️‍🌈 Ошибка: {e}") + + async def parse(self, trg: Message, n: int) -> Optional[List[dict]]: + try: + rep= await trg.get_reply_message() + lst: List[Message]=[mm async for mm in self.client.iter_messages(trg.chat_id,limit=n,reverse=True,add_offset=1,offset_id=rep.id if rep else None)] + except Exception: + return None + + out: List[dict]=[] + for mm in lst: + try: + u=await self.who(mm) + if not u: continue + name=telethon.utils.get_display_name(u); f,l=Dick.split(name) + ava=await Dick.ava(self.client,getattr(u,"id",0)) if getattr(u,"id",None) else None + + rb=None + try: + r=await mm.get_reply_message() + if r: + ruser = await self.who(r) + rname=telethon.utils.get_display_name(ruser) + rtxt=Dick.desc(r,True) + if r.raw_text: rtxt=(rtxt+". "+r.raw_text) if rtxt else r.raw_text + rb={"name":rname,"text":rtxt or "","entities":Dick.ents(r.entities), + "chatId": r.sender.id if r.sender else mm.chat_id,"from":{"name":rname}} + except Exception: rb=None + + med=None; obj=Dick.pick(mm) + if obj: med=await Dick.proc(self.client,obj,mm) + + txt=mm.raw_text or ""; ad=Dick.desc(mm) + if ad: txt=f"{txt}\n\n{ad}" if txt else ad + + item={"from":{"id":getattr(u,"id", 0),"first_name":getattr(u,"first_name","") or f,"last_name":getattr(u,"last_name","") or l, + "username":getattr(u,"username",None),"name":name,"photo":{"url":ava} if ava else {}}, + "text":txt,"entities":Dick.ents(mm.entities),"avatar":True} + + try: + if mm.voice: + a = next((a for a in mm.voice.attributes or [] + if getattr(a, "voice", False) and hasattr(a, "waveform")), None) + if a: item["voice"] = {"waveform": Dick.wf(a.waveform)} + except Exception: pass + + if med: item["voice" if "voice" in med else "media"] = med.get("voice", med) + + es=getattr(u,"emoji_status",None) + if getattr(es,"document_id",None): item["from"]["emoji_status"]=str(es.document_id) + if rb: item["replyMessage"]=rb + out.append(item) + except Exception: continue + return out + + async def who(self, m: Message): + try: + if m.fwd_from: + if m.fwd_from.from_id: + pid=m.fwd_from.from_id + uid=pid.channel_id if isinstance(pid, types.PeerChannel) else pid.user_id + try: return await self.client.get_entity(uid) + except Exception: return m.sender + if m.fwd_from.from_name: + return types.User( + id=hash(m.fwd_from.from_name)%2147483647, first_name=m.fwd_from.from_name, + username=None, phone=None, bot=False, verified=False, restricted=False, + scam=False, fake=False, premium=False) + return m.sender + except Exception: + return m.sender + + async def fake(self, args: str, rep: Optional[Message]) -> List[dict]: + async def tok(ch: str): + p=ch.split() + if not p: return None,"" + who=p[0]; tx=ch.split(maxsplit=1)[1] if len(p)>1 else "" + try: + u=await self.client.get_entity(int(who) if who.isdigit() else who) + return u,tx + except Exception: + return None,tx + + if rep and not args: + u=rep.sender; name=telethon.utils.get_display_name(u); f,l=Dick.split(name) + ava=await Dick.ava(self.client,u.id) if getattr(u,"id",None) else None + msg={"from":{"id":u.id,"first_name":getattr(u,"first_name","") or f,"last_name":getattr(u,"last_name","") or l, + "username":getattr(u,"username",None),"name":name,"photo":{"url":ava} if ava else {}}, + "text":"","entities":[], "avatar":True} + es=getattr(u,"emoji_status",None) + if getattr(es,"document_id", None): msg["from"]["emoji_status"]=str(es.document_id) + return [msg] + + if rep and args: + u=rep.sender + return await self.fake(f"{getattr(u,'id','')} {args}", None) + + out: List[dict]=[] + for part in args.split("; "): + try: + rb=None + if " -r " in part: + a,b=part.split(" -r ",1); u1,t1=await tok(a); u2,t2=await tok(b) + else: + u1,t1=await tok(part); u2,t2=None,None + if not u1: continue + + txt1, ents1 = html.parse(t1) if t1 else ("", []) + + name=telethon.utils.get_display_name(u1); f,l=Dick.split(name) + ava=await Dick.ava(self.client,u1.id) + + if u2: + txt2, ents2 = html.parse(t2) if t2 else ("", []) + name2=telethon.utils.get_display_name(u2); ava2=await Dick.ava(self.client,u2.id) + rb={"name":name2,"text":txt2,"entities":Dick.ents(ents2),"chatId":u2.id,"from":{"name":name2,"photo":{"url":ava2} if ava2 else {}}} + + msg={"from":{"id":u1.id,"first_name":getattr(u1,"first_name","") or f,"last_name":getattr(u1,"last_name","") or l, + "username":getattr(u1,"username",None),"name":name,"photo":{"url":ava} if ava else {}}, + "text":txt1,"entities":Dick.ents(ents1), "avatar":True} + + es=getattr(u1,"emoji_status",None) + if getattr(es,"document_id",None): msg["from"]["emoji_status"]=str(es.document_id) + if rb: msg["replyMessage"]=rb + out.append(msg) + except Exception: continue return out \ No newline at end of file