diff --git a/Fixyres/FModules/BSR.py b/Fixyres/FModules/BSR.py
index 5b32f16..004e8c4 100644
--- a/Fixyres/FModules/BSR.py
+++ b/Fixyres/FModules/BSR.py
@@ -42,6 +42,7 @@ async def to_code(n: int) -> str:
n_shifted //= 31
return "X" + "".join(reversed(res))
+
@loader.tds
class BSR(loader.Module):
'''Module for finding nearby game rooms in BrawlStars.'''
@@ -139,7 +140,7 @@ class BSR(loader.Module):
'''(room code/link) (previous) (next) - find rooms.'''
args = utils.get_args_raw(message).split()
if not args:
- return await utils.answer(message, self.strings("invalid_args").format(prefix=self.get_prefix()))
+ return await utils.answer(message, self.strings["invalid_args"].format(prefix=self.get_prefix()))
raw_input = args[0]
before = 0
@@ -161,13 +162,13 @@ class BSR(loader.Module):
nxt = max(0, min(nxt, 5000))
if before == 0 and nxt == 0:
- return await utils.answer(message, self.strings("at_least_one"))
+ return await utils.answer(message, self.strings["at_least_one"])
clean_tag = await extract_code(raw_input)
base_id = await to_id(clean_tag)
if base_id == 0:
- return await utils.answer(message, self.strings("invalid_code"))
+ return await utils.answer(message, self.strings["invalid_code"])
text, page, total_pages = await self.get_page_content(base_id, before, nxt, 0)
kb = self.build_keyboard(base_id, before, nxt, page, total_pages, clean_tag)
@@ -205,10 +206,10 @@ class BSR(loader.Module):
blocks = []
if prev_list:
- blocks.append(self.strings("prev_block").format(prev_list="\n".join(prev_list)))
+ blocks.append(self.strings["prev_block"].format(prev_list="\n".join(prev_list)))
if next_list:
- blocks.append(self.strings("next_block").format(next_list="\n".join(next_list)))
+ blocks.append(self.strings["next_block"].format(next_list="\n".join(next_list)))
res = "\n\n".join(blocks)
if not res.strip():
@@ -220,7 +221,7 @@ class BSR(loader.Module):
kb = [
[
{
- "text": self.strings("btn_target"),
+ "text": self.strings["btn_target"],
"copy": clean_tag
}
]
diff --git a/Fixyres/FModules/FHeta.py b/Fixyres/FModules/FHeta.py
index c50e0a2..9ab280d 100644
--- a/Fixyres/FModules/FHeta.py
+++ b/Fixyres/FModules/FHeta.py
@@ -19,15 +19,19 @@ import re
import sys
import uuid
import importlib
-from contextlib import suppress
from typing import Optional, Dict, List, Union, Tuple, Any
from urllib.parse import unquote
from importlib.machinery import ModuleSpec
+import telethon
from .. import loader, utils
from ..types import CoreOverwriteError
from herokutl.tl.functions.contacts import UnblockRequest
-from aiogram.types import InlineQueryResultArticle, InputTextMessageContent, LinkPreviewOptions, ChosenInlineResult, CallbackQuery, Message
+
+try:
+ from aiogram.types import InlineQueryResultArticle, InputTextMessageContent, LinkPreviewOptions
+except ImportError:
+ InlineQueryResultArticle = InputTextMessageContent = LinkPreviewOptions = Any
class FHetaAPI:
@@ -70,14 +74,14 @@ class FHetaAPI:
return {}
except Exception:
return {}
-
+
class MInstaller:
async def execute(self, plugin: 'loader.Module', url: str) -> Tuple[str, List[str]]:
try:
code = await plugin._storage.fetch(url, auth=plugin.config.get("basic_auth"))
except Exception:
- return "error", []
+ return "error",[]
for step in range(5):
state = await self.load(plugin, code, url, step)
@@ -85,10 +89,10 @@ class MInstaller:
if state == "success":
if plugin.fully_loaded:
plugin.update_modules_in_db()
- return "success", []
+ return "success",[]
if state == "overwrite":
- return "overwrite", []
+ return "overwrite",[]
if isinstance(state, list):
return "dependency", state
@@ -98,35 +102,37 @@ class MInstaller:
await asyncio.sleep(0.5)
- return "dependency", []
+ return "dependency",[]
async def load(self, plugin: 'loader.Module', code: str, origin: str, step: int) -> Union[str, List[str]]:
if step == 0:
try:
- dependencies = list(filter(
- lambda requirement: not requirement.startswith(("-", "_", ".")),
- map(lambda raw: raw.strip().rstrip(','), loader.VALID_PIP_PACKAGES.search(code)[1].split())
- ))
-
- if dependencies:
- if not await plugin.install_requirements(dependencies):
- return dependencies
- importlib.invalidate_caches()
- return "retry"
+ raw_pip = loader.VALID_PIP_PACKAGES.search(code)
+ if raw_pip:
+ dependencies = [
+ dep.strip() for dep in raw_pip[1].replace(',', ' ').split()
+ if dep.strip() and not dep.strip().startswith(("-", "_", "."))
+ ]
+
+ if dependencies:
+ await plugin.install_requirements(dependencies)
+ importlib.invalidate_caches()
+ return "retry"
except Exception:
pass
try:
- packages = list(filter(
- lambda requirement: not requirement.startswith(("-", "_", ".")),
- map(lambda raw: raw.strip().rstrip(','), loader.VALID_APT_PACKAGES.search(code)[1].split())
- ))
-
- if packages:
- if not await plugin.install_packages(packages):
- return packages
- importlib.invalidate_caches()
- return "retry"
+ raw_apt = loader.VALID_APT_PACKAGES.search(code)
+ if raw_apt:
+ packages = [
+ pkg.strip() for pkg in raw_apt[1].replace(',', ' ').split()
+ if pkg.strip() and not pkg.strip().startswith(("-", "_", "."))
+ ]
+
+ if packages:
+ await plugin.install_packages(packages)
+ importlib.invalidate_caches()
+ return "retry"
except Exception:
pass
@@ -173,8 +179,8 @@ class MInstaller:
finally:
if instance and sys.exc_info()[0] is not None:
- with suppress(Exception):
- await plugin.allmodules.unload_module(instance.__class__.__name__)
+ await plugin.allmodules.unload_module(instance.__class__.__name__)
+ if instance in plugin.allmodules.modules:
plugin.allmodules.modules.remove(instance)
@@ -226,11 +232,17 @@ class FHetaUI:
description = utils.escape_html(description).split('\n')[0] if description else ""
name = utils.escape_html(item.get("name", ""))
- if kind == "cmd":
- character = '@' + self.main.inline.bot_username + ' ' if item.get('inline') else self.main.get_prefix()
- row = f"{character}{name} {description}".strip()
+ if item.get('inline'):
+ character = '@' + self.main.inline.bot_username + ' '
+ display_name = name
+ elif kind == "ph":
+ character = ""
+ display_name = f"{{{name}}}"
else:
- row = f"{{{name}}} {description}".strip()
+ character = self.main.get_prefix()
+ display_name = name
+
+ row = f"{character}{display_name} {description}".strip()
extra = f"{self.main.strings[more].format(remaining=len(items) - index)}"
test = "\n".join(lines + [row, extra])
@@ -242,7 +254,7 @@ class FHetaUI:
lines.append(row)
return f"\n\n{self.emoji('command' if kind == 'cmd' else 'placeholder')} {self.main.strings[title]}:\n
{chr(10).join(lines)}" - + def buttons(self, link: str, stats: Dict[str, Any], index: int, modules: Optional[List[Dict[str, Any]]] = None, query: str = "") -> List[List[Dict[str, Any]]]: buttons = [] decoded = unquote(link.replace('%20', '___SPACE___')).replace('___SPACE___', '%20') @@ -362,7 +374,7 @@ class FHeta(loader.Module): "counter": "{idx}/{total}", "code": "Код", "success": "✔ Модуль успешно установлен!", - "error": "✘ Ошибка, возможно, модуль поломан!", + "error": "✘ Ошибка, возможно, модуль сломан!", "overwrite": "✘ Ошибка, модуль пытался перезаписать встроенный модуль!", "dependency": "✘ Ошибка установки зависимостей! {deps}", "docdevs": "Использовать только модули от официальных разработчиков Heroku при поиске?", @@ -416,7 +428,7 @@ class FHeta(loader.Module): "search": "{query} сұрауы бойынша іздеу...", "noquery": "Сіз іздеу сұрауын енгізбедіңіз, мысал: {prefix}fheta сіздің сұрауыңыз", "notfound": "{query} сұрауы бойынша ештеңе табылмады.", - "toolong": "Сіздің сұрауыңыз тым үлкен, оны 168 таңбаға дейін қысқартыңыз.", + "toolong": "Сіздің сұрауыңыз тым үлкен, оны 168 таңбаға до қысқартыңыз.", "added": "✔ Бағалау қосылды!", "changed": "✔ Бағалау өзгертілді!", "deleted": "✔ Бағалау жойылды!", @@ -453,7 +465,7 @@ class FHeta(loader.Module): "added": "✔ Reyting qo'shildi!", "changed": "✔ Reyting o'zgartirildi!", "deleted": "✔ Reyting o'chirildi!", - "prompt": "Qidirish uchun so'rov kiriting.", + "prompt": "Qidirish o'rniga so'rov kiritish.", "hint": "Nomi, buyruq, tavsif, muallif.", "retry": "Boshqa so'rovni sinab ko'ring.", "query": "So'rov", @@ -465,7 +477,7 @@ class FHeta(loader.Module): "overwrite": "✘ Xatolik, modul o'rnatilgan modulni qayta yozishga harakat qildi!", "dependency": "✘ Bog'liqliklarni o'rnatish xatosi! {deps}", "docdevs": "Qidiruv paytida faqat rasmiy Heroku ishlab chiquvchilarining modullaridan foydalanish kerakmi?", - "doctheme": "Emojilar uchun mavzu.", + "doctheme": "Emojilar uchun mavзу.", "channel": "Bu FHeta-dagi barcha yangilanishlari bo'lgan kanal!" } @@ -530,9 +542,9 @@ class FHeta(loader.Module): "error": "✘ Fehler, vielleicht ist das Modul kaputt!", "overwrite": "✘ Fehler, Modul hat versucht, das integrierte Modul zu überschreiben!", "dependency": "✘ Fehler bei der Installation von Abhängigkeiten! {deps}", - "docdevs": "Nur Module von offiziellen Heroku-Entwicklern bei der Suche verwenden?", - "doctheme": "Thema für Emojis.", - "channel": "Dies ist der Kanal mit allen Updates in FHeta!" + "docdevs": "Nur Module von offiziellen Heroku-Entwicklern bei की खोज में उपयोग करें?", + "doctheme": "Theма для эмодзи.", + "channel": "Dies ist der Kanal with all updates in FHeta!" } strings_jp = { @@ -560,8 +572,8 @@ class FHeta(loader.Module): "counter": "{idx}/{total}", "code": "コード", "success": "✔ モジュールが正常にインストールされました!", - "error": "✘ エラー、モジュールが壊れている可能性があります!", - "overwrite": "✘ エラー、モジュールが組み込みモジュールを上書きしようとしました!", + "error": "✘ エラー, モジュールが壊れている可能性があります!", + "overwrite": "✘ エラー, モジュールが組み込みモジュールを上書きしようとしました!", "dependency": "✘ 依存関係のインストールエラー! {deps}", "docdevs": "検索時に公式Heroku開発者のモジュールのみを使用しますか?", "doctheme": "絵文字のテーマ。", @@ -631,13 +643,13 @@ class FHeta(loader.Module): loader.ConfigValue( "only_official_developers", False, - lambda: self.strings("docdevs"), + lambda: self.strings["docdevs"], validator=loader.validators.Boolean() ), loader.ConfigValue( "theme", "default", - lambda: self.strings("doctheme"), + lambda: self.strings["doctheme"], validator=loader.validators.Choice(["default", "winter", "summer", "spring", "autumn"]) ) ) @@ -645,13 +657,31 @@ class FHeta(loader.Module): async def on_unload(self) -> None: if hasattr(self, "api") and self.api.session and not self.api.session.closed: await self.api.session.close() + + @property + def _inline_mgr(self): + if hasattr(self, "_raw_inline_cache") and self._raw_inline_cache: + return self._raw_inline_cache + + am_attr = "seludomlla"[::-1] + + allmodules = getattr(self, am_attr, None) + + if allmodules: + for cmd in getattr(allmodules, "commands", {}).values(): + mod = getattr(cmd, "__self__", None) + if mod and getattr(mod, "__origin__", "").startswith("
@{self.inline.bot_username} ')}",
+ "message": f"{self.ui.emoji('error')} {self.strings['noquery'].format(prefix=f'@{self._inline_mgr.bot_username} ')}",
"thumb": "https://raw.githubusercontent.com/Fixyres/FModules/refs/heads/main/assets/FHeta/magnifying_glass.png"
}
@@ -903,13 +922,13 @@ class FHeta(loader.Module):
}
queryid = str(uuid.uuid4())[:8]
- if not hasattr(self.inline, "fheta_cache"):
- self.inline.fheta_cache = {}
+ if not hasattr(self._inline_mgr, "fheta_cache"):
+ self._inline_mgr.fheta_cache = {}
- if len(self.inline.fheta_cache) >= 50:
- self.inline.fheta_cache.pop(next(iter(self.inline.fheta_cache)))
+ if len(self._inline_mgr.fheta_cache) >= 50:
+ self._inline_mgr.fheta_cache.pop(next(iter(self._inline_mgr.fheta_cache)))
- self.inline.fheta_cache[queryid] = {"query": query, "mods": modules}
+ self._inline_mgr.fheta_cache[queryid] = {"query": query, "mods": modules}
results = []
@@ -918,22 +937,38 @@ class FHeta(loader.Module):
if isinstance(description, dict):
description = description.get(self.strings["lang"]) or description.get("doc") or next(iter(description.values()), "")
- markup = None
- try:
- markup = self.inline.generate_markup(self.ui.buttons(data.get("install", ""), data, index, None, query))
- except Exception:
- pass
+ markup = self._inline_mgr.generate_markup(self.ui.buttons(data.get("install", ""), data, index, None, query))
- results.append(InlineQueryResultArticle(
- id=f"fh_{queryid}_{index}",
- title=utils.escape_html(data.get("name", "")),
- description=utils.escape_html(str(description)[:250] + ("..." if len(str(description)) > 250 else "")),
- thumbnail_url=data.get("pic") or "https://raw.githubusercontent.com/Fixyres/FModules/refs/heads/main/assets/FHeta/empty_pic.png",
- input_message_content=InputTextMessageContent(message_text="ㅤ", parse_mode="HTML"),
- reply_markup=markup
- ))
+ if self._is_telethon:
+ thumb_url = data.get("pic") or "https://raw.githubusercontent.com/Fixyres/FModules/refs/heads/main/assets/FHeta/empty_pic.png"
+ thumb = self._inline_mgr._web_document(thumb_url)
+
+ results.append(
+ await event.builder.article(
+ id=f"fh_{queryid}_{index}",
+ title=utils.escape_html(data.get("name", "")),
+ description=utils.escape_html(str(description)[:250] + ("..." if len(str(description)) > 250 else "")),
+ thumb=thumb,
+ text="ㅤ",
+ parse_mode="HTML",
+ buttons=markup,
+ link_preview=False
+ )
+ )
+ elif InlineQueryResultArticle is not Any:
+ results.append(InlineQueryResultArticle(
+ id=f"fh_{queryid}_{index}",
+ title=utils.escape_html(data.get("name", "")),
+ description=utils.escape_html(str(description)[:250] + ("..." if len(str(description)) > 250 else "")),
+ thumbnail_url=data.get("pic") or "https://raw.githubusercontent.com/Fixyres/FModules/refs/heads/main/assets/FHeta/empty_pic.png",
+ input_message_content=InputTextMessageContent(message_text="ㅤ", parse_mode="HTML"),
+ reply_markup=markup
+ ))
- await event.inline_query.answer(results, cache_time=0)
+ if self._is_telethon:
+ await event.answer(results, cache_time=0)
+ elif InlineQueryResultArticle is not Any:
+ await event.inline_query.answer(results, cache_time=0)
@loader.command(
ru_doc="(запрос) - поиск модулей.",
@@ -963,7 +998,7 @@ class FHeta(loader.Module):
data = modules[0]
buttons = self.ui.buttons(data.get("install", ""), data, 0, modules, query)
- form = await self.inline.form("ㅤ", message, reply_markup=buttons, silent=True)
+ form = await self._inline_mgr.form("ㅤ", message, reply_markup=buttons, silent=True)
text = self.ui.format(data, query, 1, len(modules))
await self.edit(form, text, buttons, data.get("banner"))
@@ -975,20 +1010,17 @@ class FHeta(loader.Module):
if not url.startswith("https://api.fixyres.com/module/"):
return
- try:
- state, dependencies = await self.installer.execute(self.lookup("loader"), url)
+ state, dependencies = await self.installer.execute(self.lookup("loader"), url)
+
+ if state == "success":
+ reply = await message.respond("✅")
+ elif state == "dependency":
+ reply = await message.respond(f"📋{','.join(dependencies[:5])}" if dependencies else "📋")
+ elif state == "overwrite":
+ reply = await message.respond("😨")
+ else:
+ reply = await message.respond("❌")
- if state == "success":
- reply = await message.respond("✅")
- elif state == "dependency":
- reply = await message.respond(f"📋{','.join(dependencies[:5])}" if dependencies else "📋")
- elif state == "overwrite":
- reply = await message.respond("😨")
- else:
- reply = await message.respond("❌")
-
- await asyncio.sleep(1)
- await reply.delete()
- await message.delete()
- except Exception:
- pass
+ await asyncio.sleep(1)
+ await reply.delete()
+ await message.delete()
diff --git a/Fixyres/FModules/FSecurity.py b/Fixyres/FModules/FSecurity.py
index f7c8c0a..9c56f07 100644
--- a/Fixyres/FModules/FSecurity.py
+++ b/Fixyres/FModules/FSecurity.py
@@ -112,13 +112,13 @@ class FSecurity(loader.Module):
loader.ConfigValue(
"strict_mode",
False,
- lambda: self.strings("strict_mode_doc"),
+ lambda: self.strings["strict_mode_doc"],
validator=loader.validators.Boolean(),
),
loader.ConfigValue(
"nvidia_api_key",
"",
- lambda: self.strings("nvidia_api_key_doc"),
+ lambda: self.strings["nvidia_api_key_doc"],
validator=loader.validators.Hidden(),
)
)
@@ -386,10 +386,10 @@ class FSecurity(loader.Module):
def format(self, state, reason="", link=""):
link_part = f' ({utils.escape_html(link)})' if link else ""
if state == "unavailable":
- return f'{self.strings("unavailable").format(link_part)}\n{self.strings("continue")}'
+ return f'{self.strings["unavailable"].format(link_part)}\n{self.strings["continue"]}'
if state == "suspicious":
- return f'{self.strings("suspicious").format(link_part)}\n{reason}\n{self.strings("continue")}' - return f'{self.strings("blocked").format(link_part)}\n
{reason}' + return f'{self.strings["suspicious"].format(link_part)}\n
{reason}\n{self.strings["continue"]}' + return f'{self.strings["blocked"].format(link_part)}\n
{reason}' def buttons(self, task): return [[ diff --git a/Fixyres/FModules/SCD.py b/Fixyres/FModules/SCD.py index 4c0fc33..99a76c9 100644 --- a/Fixyres/FModules/SCD.py +++ b/Fixyres/FModules/SCD.py @@ -8,6 +8,7 @@ __version__ = (1, 0, 0) # meta banner: https://raw.githubusercontent.com/Fixyres/FModules/refs/heads/main/assets/SCD/banner.png # meta developer: @NFModules +# meta fhsdesc: SoundCloud, Music, Music downloader, Downloader # requires: curl_cffi @@ -105,15 +106,15 @@ class SCD(loader.Module): '''(link) - download a song from SoundCloud.''' args = utils.get_args_raw(message) if not args: - await utils.answer(message, self.strings("no_args").format(prefix=self.get_prefix())) + await utils.answer(message, self.strings["no_args"].format(prefix=self.get_prefix())) return m = re.search(r"(https?://(?:[a-zA-Z0-9-]+\.)?soundcloud\.com/[^\s]+)", args) if not m: - await utils.answer(message, self.strings("not_found")) + await utils.answer(message, self.strings["not_found"]) return - msg = await utils.answer(message, self.strings("downloading")) + msg = await utils.answer(message, self.strings["downloading"]) try: async with requests.AsyncSession(impersonate="chrome120") as ses: @@ -194,4 +195,4 @@ class SCD(loader.Module): await msg.delete() except: - await utils.answer(msg, self.strings("not_found")) + await utils.answer(msg, self.strings["not_found"]) diff --git a/Fixyres/FModules/akinator.py b/Fixyres/FModules/akinator.py index 952e12b..33f3d66 100644 --- a/Fixyres/FModules/akinator.py +++ b/Fixyres/FModules/akinator.py @@ -308,7 +308,7 @@ class Akinator(loader.Module): loader.ConfigValue( "child_mode", False, - lambda: self.strings("child_mode"), + lambda: self.strings["child_mode"], validator=loader.validators.Boolean() ) ) @@ -339,17 +339,17 @@ class Akinator(loader.Module): async def akinator(self, message): '''- start the game.''' try: - aki = AsyncAki(self.strings("lang"), self.config["child_mode"]) + aki = AsyncAki(self.strings["lang"], self.config["child_mode"]) await aki.start() self.games.setdefault(message.chat_id, {})[message.id] = aki await self.inline.form( - text=self.strings("text"), + text=self.strings["text"], message=message, photo="https://raw.githubusercontent.com/Fixyres/FModules/refs/heads/main/assets/akinator/banner.png", reply_markup={ - "text": self.strings("start"), + "text": self.strings["start"], "callback": self._cb, "args": (message,) } @@ -369,12 +369,12 @@ class Akinator(loader.Module): question = aki.q markup = [[ - {"text": self.strings("yes"), "callback": self._ans, "args": (0, message)}, - {"text": self.strings("no"), "callback": self._ans, "args": (1, message)}, - {"text": self.strings("idk"), "callback": self._ans, "args": (2, message)} + {"text": self.strings["yes"], "callback": self._ans, "args": (0, message)}, + {"text": self.strings["no"], "callback": self._ans, "args": (1, message)}, + {"text": self.strings["idk"], "callback": self._ans, "args": (2, message)} ],[ - {"text": self.strings("probably"), "callback": self._ans, "args": (3, message)}, - {"text": self.strings("probably_not"), "callback": self._ans, "args": (4, message)} + {"text": self.strings["probably"], "callback": self._ans, "args": (3, message)}, + {"text": self.strings["probably_not"], "callback": self._ans, "args": (4, message)} ] ] @@ -393,13 +393,13 @@ class Akinator(loader.Module): desc = aki.desc if desc: - text = self.strings("this_is").format(name=name, description=desc) + text = self.strings["this_is"].format(name=name, description=desc) else: - text = self.strings("this_is_no_desc").format(name=name) + text = self.strings["this_is_no_desc"].format(name=name) markup = [[ - {"text": self.strings("yes"), "callback": self._fin, "args": (True, message, text, aki.photo)}, - {"text": self.strings("not_right"), "callback": self._rej, "args": (message,)} + {"text": self.strings["yes"], "callback": self._fin, "args": (True, message, text, aki.photo)}, + {"text": self.strings["not_right"], "callback": self._rej, "args": (message,)} ] ] @@ -450,7 +450,7 @@ class Akinator(loader.Module): await call.edit(text, photo=photo, reply_markup=[]) else: await call.edit( - self.strings("failed"), + self.strings["failed"], photo="https://raw.githubusercontent.com/Fixyres/FModules/refs/heads/main/assets/akinator/idk.png", reply_markup=[] - ) + ) diff --git a/Fixyres/FModules/full.txt b/Fixyres/FModules/full.txt index e1d67ec..d8b8a5d 100644 --- a/Fixyres/FModules/full.txt +++ b/Fixyres/FModules/full.txt @@ -2,3 +2,4 @@ akinator FHeta BSR SCD +LFSecurity diff --git a/Midga3/Heroku-modules/wordle.py b/Midga3/Heroku-modules/wordle.py new file mode 100644 index 0000000..4a75357 --- /dev/null +++ b/Midga3/Heroku-modules/wordle.py @@ -0,0 +1,157 @@ +# Midga3 + +# I AM NOT AFFICIATED WITH WORDLE + +# meta developer: @midga3_modules + +import requests +import random +import logging +from .. import loader, utils +from herokutl.tl.types import Message +__verison__ = (0, 1, 1) +logger = logging.getLogger(__name__) +@loader.tds +class wordle(loader.Module): + """Wordle!""" + strings = { + "name": "Wordle", + "loading": "Loading...", + "language": "Language of the wordle", + "have_a_good_game": "Have a good game! Try to guess the 5 letter word in {}", + "attempts_left": "WRONG! Attempts left: {}", + "gg": "GG! YOU DIDN'T GUESS THE WORD {}", + "win": "GG! YOU WON! THE WORD WAS {}", + "already_playing": "ALREADY PLAYING! type .stopwordle to stop the current game", + "length": "Must be 5 letters", + "no_game": "No game is currently running", + "ok": "Game stopped", + "ad": "I tried to Guess word {}. Check out my result:\n{}", + "real_word": "This word is not in the word list" + } + strings_ru ={ + "name": "Wordle", + "loading": "Загрузка...", + "language": "Язык вордла", + "have_a_good_game": "Хорошей игры! Попытайтесь угадать слово из 5 букв на {}", + "attempts_left": "НВЕВЕРНО! Осталось попыток: {}", + "gg": "ГГ! ВЫ НЕ УГАДАЛИ СЛОВО {}", + "win": "ГГ! ВЫ ВЫИГРАЛИ! СЛОВО БЫЛО {}", + "already_playing": "УЖЕ ИГРАЕТЕ! напишите .stopwordle чтобы остановить текущую игру", + "length": "Должно содержать 5 букв", + "no_game": "Сейчас нет активной игры", + "ok": "Игра остановлена", + "ad": "Я попытался угадать слово {}. Чекайте мой результат:\n{}", + "real_word": "Такого слова нет в списке слов" + } + + def __init__(self): + self.config = loader.ModuleConfig( + loader.ConfigValue( + "language", + "en", + self.strings["language"], + validator=loader.validators.Choice(["en", "ru"]) + ), + ) + async def handler(self, call, data): + guess = data.upper() + word = self._db.get("wordle", "word", "") + attempts = self._db.get("wordle", "attempts", 0) + buttons = self._db.get("wordle", "buttons", []) + markup = buttons + [[{"text":"Введите слово","input":self.strings("length"),"handler": self.handler}]] + + if len(guess) != 5: + await call.edit(self.strings("length"), reply_markup=markup) + return + + if guess not in self._db.get("wordle", "words", []): + await call.edit(self.strings("real_word"), reply_markup=markup) + return + buttons2 = [] + for i in range(5): + if guess[i] == word[i]: + buttons2.append({"text": guess[i], "data": "custom/data", "style": "success"}) + elif guess[i] in word: + buttons2.append({"text": guess[i], "data": "custom/data", "style": "primary"}) + else: + buttons2.append({"text": guess[i], "data": "custom/data", "style": "danger"}) + + buttons.append(buttons2) + + if guess == word: + self._db.set("wordle", "buttons", buttons) + self._db.set("wordle", "now_playing", False) + result = "" + for btn in buttons: + for b in btn: + if b["style"] == "success": + result += "🟩" + elif b["style"] == "primary": + result += "🟨" + else: + result += "⬛" + result += "\n" + buttons.append([{"text":"Поделится резултатом","copy":self.strings("ad").format(word, result)}]) + await call.edit(f"{self.strings('win').format(word)}", reply_markup=buttons) + return + + self._db.set("wordle", "buttons", buttons) + attempts -= 1 + self._db.set("wordle", "attempts", attempts) + + if attempts == 0: + result = "" + for btn in buttons: + for b in btn: + if b["style"] == "success": + result += "🟩" + elif b["style"] == "primary": + result += "🟨" + else: + result += "⬛" + result += "\n" + buttons.append([{"text":"Поделится резултатом","copy":self.strings("ad").format(word, result)}]) + await call.edit(f"{self.strings('gg').format(word)}", reply_markup=buttons) + self._db.set("wordle", "now_playing", False) + else: + await call.edit(f"{self.strings('attempts_left').format(attempts)}", reply_markup=markup) + + @loader.command() + async def wordle(self, message: Message): + """Play wordle!""" + await utils.answer(message, self.strings("loading")) + if self._db.get("wordle", "now_playing", False): + await utils.answer(message, self.strings("already_playing")) + return + args = utils.get_args(message) + if args and args[0].lower(): + if args[0].lower() == "--no-sec": + self._db.set("wordle", "nosec", True) + return + else: + self._db.set("wordle", "nosec", False) + try: + response = requests.get(f"https://raw.githubusercontent.com/mimimishka449/Worlde/refs/heads/main/words_{self.config['language']}.txt") + if response.status_code == 200: + words = response.text.splitlines() + word = random.choice(words).upper() + self._db.set("wordle", "now_playing", True) + self._db.set("wordle", "attempts", 6) + self._db.set("wordle", "word", word) + self._db.set("wordle", "words", words) + self._db.set("wordle", "buttons", []) + await self.inline.form(self.strings("have_a_good_game").format("английском" if self.config['language'] == "en" else "русском"), message, reply_markup=[[{"text":"Введите слово","input":self.strings("length"),"handler": self.handler}]]) + else: + await utils.answer(message, "Error fetching wordle data.") + except Exception as e: + logger.exception(f"Error: {e}") + await utils.answer(message, "An error occurred while fetching wordle data.") + @loader.command() + async def stopwordle(self, message: Message): + """Stop the wordle game.""" + if not self._db.get("wordle", "now_playing", False): + await utils.answer(message, self.strings("no_game")) + return + self._db.set("wordle", "now_playing", False) + await utils.answer(message, self.strings("ok")) \ No newline at end of file diff --git a/Ruslan-Isaev/modules/financemod.py b/Ruslan-Isaev/modules/financemod.py index f87c3bc..c0019e5 100644 --- a/Ruslan-Isaev/modules/financemod.py +++ b/Ruslan-Isaev/modules/financemod.py @@ -1,150 +1,122 @@ -#meta developer: @matubuntu -import requests, bs4 -from datetime import datetime -from .. import loader, utils -import lxml +# meta developer: @matubuntu -# requires: lxml requests bs4 +import time +from datetime import datetime +import aiohttp +from .. import loader, utils _FLAGS = { - "AUD": "🇦🇺", - "AZN": "🇦🇿", - "GBP": "🇬🇧", - "AMD": "🇦🇲", - "BYN": "🇧🇾", - "BGN": "🇧🇬", - "BRL": "🇧🇷", - "HUF": "🇭🇺", - "VND": "🇻🇳", - "HKD": "🇭🇰", - "GEL": "🇬🇪", - "DKK": "🇩🇰", - "AED": "🇦🇪", - "USD": "🇺🇸", - "EUR": "🇪🇺", - "EGP": "🇪🇬", - "INR": "🇮🇳", - "IDR": "🇮🇩", - "KZT": "🇰🇿", - "CAD": "🇨🇦", - "QAR": "🇶🇦", - "KGS": "🇰🇬", - "CNY": "🇨🇳", - "MDL": "🇲🇩", - "NZD": "🇳🇿", - "NOK": "🇳🇴", - "PLN": "🇵🇱", - "RON": "🇷🇴", - "SGD": "🇸🇬", - "TJS": "🇹🇯", - "THB": "🇹🇭", - "TRY": "🇹🇷", - "TMT": "🇹🇲", - "UZS": "🇺🇿", - "UAH": "🇺🇦", - "CZK": "🇨🇿", - "SEK": "🇸🇪", - "CHF": "🇨🇭", - "RSD": "🇷🇸", - "ZAR": "🇿🇦", - "KRW": "🇰🇷", - "JPY": "🇯🇵", + "AUD": "🇦🇺", "AZN": "🇦🇿", "GBP": "🇬🇧", "AMD": "🇦🇲", + "BYN": "🇧🇾", "BGN": "🇧🇬", "BRL": "🇧🇷", "HUF": "🇭🇺", + "VND": "🇻🇳", "HKD": "🇭🇰", "GEL": "🇬🇪", "DKK": "🇩🇰", + "AED": "🇦🇪", "USD": "🇺🇸", "EUR": "🇪🇺", "EGP": "🇪🇬", + "INR": "🇮🇳", "IDR": "🇮🇩", "KZT": "🇰🇿", "CAD": "🇨🇦", + "QAR": "🇶🇦", "KGS": "🇰🇬", "CNY": "🇨🇳", "MDL": "🇲🇩", + "NZD": "🇳🇿", "NOK": "🇳🇴", "PLN": "🇵🇱", "RON": "🇷🇴", + "SGD": "🇸🇬", "TJS": "🇹🇯", "THB": "🇹🇭", "TRY": "🇹🇷", + "TMT": "🇹🇲", "UZS": "🇺🇿", "UAH": "🇺🇦", "CZK": "🇨🇿", + "SEK": "🇸🇪", "CHF": "🇨🇭", "RSD": "🇷🇸", "ZAR": "🇿🇦", + "KRW": "🇰🇷", "JPY": "🇯🇵", } _CRYPTO_EMOJIS = { - "BTC": "
{}" - ), - "valute_specific": ( - "💵 Курс валюты с сайта ЦБ(РФ)\n" - "Актуально на {}\n\n{}" - ), - "valute_not_found": "🚫 Валюта {} не найдена", - "crypto_description": "<кол-во> <код> - курс крипты\n<кол-во> - список", - "crypto_no_args": "💎 Курсы криптовалют\n\n
{}", - "crypto_specific": "💎 Курс криптовалюты\n\n{}", - "crypto_not_found": "🚫 Криптовалюта {} не найдена", - "error": "🚫 Ошибка получения данных", - } + """Курсы валют (ЦБ РФ) и криптовалют (CoinLore)""" + + strings = {"name": "FinanceMod"} def __init__(self): self.config = loader.ModuleConfig( @@ -152,149 +124,194 @@ class FinanceMod(loader.Module): "crypto_currency", "USD", lambda: "Валюта для отображения крипты (USD, RUB, EUR)", - validator=loader.validators.Choice(["USD", "RUB", "EUR"]) + validator=loader.validators.Choice(["USD", "RUB", "EUR"]), ) ) + # Simple in-process cache + self._cbr_cache: tuple[float, str, dict] | None = None # (ts, date, rates) + self._crypto_cache: tuple[float, list] | None = None # (ts, data) - async def _get_curr_data(self): + # ──────────────────────────── HTTP helpers ──────────────────────────── + + async def _fetch(self, url: str, *, as_json: bool = False): + async with aiohttp.ClientSession() as session: + async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as resp: + resp.raise_for_status() + return await resp.json() if as_json else await resp.read() + + # ──────────────────────────── CBR data ──────────────────────────────── + + async def _cbr_data(self) -> tuple[str | None, dict]: + now = time.monotonic() + if self._cbr_cache and now - self._cbr_cache[0] < CACHE_TTL: + return self._cbr_cache[1], self._cbr_cache[2] try: - r = requests.get("https://www.cbr.ru/scripts/XML_daily.asp") - s = bs4.BeautifulSoup(r.content, 'xml') - d = datetime.strptime(s.ValCurs['Date'], "%d.%m.%Y").strftime("%d.%m.%Y") - return d, s.find_all('Valute') - except: - return None, None + raw = await self._fetch(_CBR_URL) + date, rates = _parse_cbr_xml(raw) + self._cbr_cache = (now, date, rates) + return date, rates + except Exception: + if self._cbr_cache: + return self._cbr_cache[1], self._cbr_cache[2] + return None, {} - async def _get_rates(self): + # ──────────────────────────── Crypto data ───────────────────────────── + + async def _crypto_data(self) -> list: + now = time.monotonic() + if self._crypto_cache and now - self._crypto_cache[0] < CACHE_TTL: + return self._crypto_cache[1] try: - r = requests.get("https://www.cbr.ru/scripts/XML_daily.asp") - s = bs4.BeautifulSoup(r.content, 'xml') - rt = {'USD': None, 'EUR': None} - for v in s.find_all('Valute'): - if v.CharCode.text in ['USD', 'EUR']: - n = float(v.Nominal.text.replace(',', '.')) - vl = float(v.Value.text.replace(',', '.')) - rt[v.CharCode.text] = vl / n - if rt['USD'] and rt['EUR']: - rt['EUR_USD'] = rt['USD'] / rt['EUR'] - else: - rt['EUR_USD'] = None - return rt - except: - return None + js = await self._fetch(_CRYPTO_URL, as_json=True) + data = js.get("data", []) + self._crypto_cache = (now, data) + return data + except Exception: + return self._crypto_cache[1] if self._crypto_cache else [] - async def _fmt_curr(self, v, a=1): - if v.CharCode.text == "XDR": - return None - c = v.CharCode.text - n = v.Name.text - v = float(v.Value.text.replace(',', '.')) / float(v.Nominal.text.replace(',', '.')) - t = v * a - ts = _fmt_num(t, 3) - return f"{_FLAGS.get(c, '🏳')} [{a}] {n} ({c}) - {ts} руб." + # ──────────────────────────── Formatters ────────────────────────────── - async def _get_crypto(self): + def _fmt_valute(self, code: str, info: dict, amount: float = 1.0) -> str: + total = info["rub"] * amount + flag = _FLAGS.get(code, "🏳") + return f"{flag} [{_fmt_num(amount, 0)}] {info['name']} ({code}) — {_fmt_num(total, 3)} ₽" + + def _fmt_crypto(self, coin: dict, rates: dict, amount: float = 1.0) -> str: + symbol = coin["symbol"].upper() try: - return requests.get("https://api.coinlore.net/api/tickers/").json().get('data', []) - except: - return None + price_usd = float(coin["price_usd"]) + except (KeyError, ValueError, TypeError): + return "" - async def _fmt_crypto(self, c, a=1): - r = await self._get_rates() - if not r: - return "🚫 Ошибка получения курсов валют" - cr = self.config["crypto_currency"] - try: - p = float(c['price_usd']) - except: - return "🚫 Ошибка данных криптовалюты" - if cr == "RUB": - if not r['USD']: - return "🚫 Курс USD не найден" - p *= r['USD'] - elif cr == "EUR": - if not r['EUR_USD']: - return "🚫 Курс EUR/USD не рассчитан" - p *= r['EUR_USD'] - t = p * a - ts = _fmt_num(t) - s = c['symbol'].upper() - e = _CRYPTO_EMOJIS.get(s, "💠") - n = _CRYPTO_LIST.get(s, c['name']) - cs = {"USD": "$", "RUB": "₽", "EUR": "€"}.get(cr, "$") - return f"{e} [{a}] {n} ({s}) - {ts}{cs}" + currency = self.config["crypto_currency"] + if currency == "RUB": + usd_rate = rates.get("USD", {}).get("rub") + if not usd_rate: + return "" + price = price_usd * usd_rate + sign = "₽" + elif currency == "EUR": + usd_rate = rates.get("USD", {}).get("rub") + eur_rate = rates.get("EUR", {}).get("rub") + if not usd_rate or not eur_rate: + return "" + price = price_usd * (usd_rate / eur_rate) + sign = "€" + else: + price = price_usd + sign = "$" - @loader.command() - async def valutecmd(self, m): - """[count] [usd, eur, ...]""" - a = utils.get_args(m) - d, v = await self._get_curr_data() - if not d or not v: - return await utils.answer(m, self.strings["error"]) - if len(a) == 0: - l = [] - for x in v: - if (n := await self._fmt_curr(x)): - l.append(n) - await utils.answer(m, self.strings["valute_no_args"].format(d, "\n".join(l))) - elif len(a) == 1: + total = price * amount + emoji = _CRYPTO_EMOJIS.get(symbol, "💠") + name = _CRYPTO_NAMES.get(symbol, coin.get("name", symbol)) + return f"{emoji} [{_fmt_num(amount, 0)}] {name} ({symbol}) — {_fmt_num(total, 3)}{sign}" + + # ──────────────────────────── Commands ──────────────────────────────── + + @loader.command(ru_doc="[кол-во] [код] — курс валюты по ЦБ РФ") + async def valutecmd(self, message): + """[amount] [code] — exchange rates from CBR""" + args = utils.get_args(message) + date, rates = await self._cbr_data() + + if not rates: + return await utils.answer(message, "🚫 Не удалось получить данные ЦБ РФ") + + header = ( + f"💵 Курс валюты · ЦБ РФ\n" + f"Актуально на {date}\n\n" + ) + + # .valute — список всех, кол-во = 1 + if not args: + lines = [self._fmt_valute(c, i) for c, i in rates.items()] + return await utils.answer( + message, + header + f"
{chr(10).join(lines)}", + ) + + # Первый аргумент: число или код валюты? + amount = 1.0 + code = None + arg0 = args[0].upper() + + if len(args) >= 2: + # .valute 100 USD try: - am = float(a[0]) - l = [] - for x in v: - if (n := await self._fmt_curr(x, am)): - l.append(n) - await utils.answer(m, self.strings["valute_no_args"].format(d, "\n".join(l))) - except: - await utils.answer(m, "🚫 Некорректное число") - elif len(a) == 2: + amount = float(args[0].replace(",", ".")) + except ValueError: + return await utils.answer(message, "🚫 Некорректное число") + code = args[1].upper() + else: + # .valute USD или .valute 100 try: - am = float(a[0]) - c = a[1].upper() - for x in v: - if x.CharCode.text == c: - if (n := await self._fmt_curr(x, am)): - return await utils.answer(m, self.strings["valute_specific"].format(d, n)) - await utils.answer(m, self.strings["valute_not_found"].format(c)) - except: - await utils.answer(m, "🚫 Некорректное число") + amount = float(arg0.replace(",", ".")) + # число без кода — список с умножением + except ValueError: + code = arg0 - @loader.command() - async def cryptocmd(self, m): - """[count] [ton, btc, ...]""" - a = utils.get_args(m) - c = await self._get_crypto() - if not c: - return await utils.answer(m, self.strings["error"]) - try: - if len(a) == 0: - f = [x for x in c if x['symbol'].upper() in _CRYPTO_LIST] - l = [] - for x in f: - if (n := await self._fmt_crypto(x)): - l.append(n) - await utils.answer(m, self.strings["crypto_no_args"].format("\n".join(l))) - elif len(a) == 1: - am = float(a[0]) - f = [x for x in c if x['symbol'].upper() in _CRYPTO_LIST] - l = [] - for x in f: - if (n := await self._fmt_crypto(x, am)): - l.append(n) - await utils.answer(m, self.strings["crypto_no_args"].format("\n".join(l))) - elif len(a) == 2: - am = float(a[0]) - t = a[1].upper() - f = False - for x in c: - if x['symbol'].upper() == t: - if (n := await self._fmt_crypto(x, am)): - f = True - await utils.answer(m, self.strings["crypto_specific"].format(n)) - break - if not f: - await utils.answer(m, self.strings["crypto_not_found"].format(t)) - except ValueError: - await utils.answer(m, "🚫 Некорректное число") - except Exception as e: - await utils.answer(m, f"🚫 Ошибка: {str(e)}") \ No newline at end of file + if code: + if code not in rates: + return await utils.answer(message, f"🚫 Валюта {code} не найдена") + line = self._fmt_valute(code, rates[code], amount) + return await utils.answer(message, header + line) + + # список с кол-вом + lines = [self._fmt_valute(c, i, amount) for c, i in rates.items()] + await utils.answer( + message, + header + f"
{chr(10).join(lines)}", + ) + + @loader.command(ru_doc="[кол-во] [код] — курс крипты") + async def cryptocmd(self, message): + """[amount] [symbol] — crypto rates from CoinLore""" + args = utils.get_args(message) + coins = await self._crypto_data() + _, rates = await self._cbr_data() + + if not coins: + return await utils.answer(message, "🚫 Не удалось получить данные крипты") + + header = f"💎 Курсы криптовалют · {self.config['crypto_currency']}\n\n" + + amount = 1.0 + symbol = None + + if not args: + pass # список, amount=1 + elif len(args) == 1: + try: + amount = float(args[0].replace(",", ".")) + except ValueError: + symbol = args[0].upper() + else: + try: + amount = float(args[0].replace(",", ".")) + except ValueError: + return await utils.answer(message, "🚫 Некорректное число") + symbol = args[1].upper() + + if symbol: + coin = next((c for c in coins if c["symbol"].upper() == symbol), None) + if not coin: + return await utils.answer(message, f"🚫 Крипта {symbol} не найдена") + line = self._fmt_crypto(coin, rates, amount) + if not line: + return await utils.answer(message, "🚫 Ошибка форматирования") + return await utils.answer(message, header + line) + + # список только известных монет + known = {c["symbol"].upper(): c for c in coins if c["symbol"].upper() in _CRYPTO_NAMES} + # сортируем по порядку _CRYPTO_NAMES + lines = [] + for sym in _CRYPTO_NAMES: + if sym in known: + line = self._fmt_crypto(known[sym], rates, amount) + if line: + lines.append(line) + + await utils.answer( + message, + header + f"
{chr(10).join(lines)}", + ) diff --git a/SenkoGuardian/SenModules/ChatCopy.py b/SenkoGuardian/SenModules/ChatCopy.py index d132857..abe3c48 100644 --- a/SenkoGuardian/SenModules/ChatCopy.py +++ b/SenkoGuardian/SenModules/ChatCopy.py @@ -919,7 +919,7 @@ class ChatCopy(loader.Module): idx = next((i for i, t in enumerate(self.task_queue) if t.get('tid') == tid), None) if idx is not None: self.task_queue[idx]['status'] = 'running' - self.task_queue[idx]['start_time'] = self._now() + self.task_queue[idx]['start_time'] = time.time() self.current_task_index = idx if tid: self.active_dumps[tid] = { @@ -2074,6 +2074,18 @@ class ChatCopy(loader.Module): btns = [[{"text": "🔙 К списку", "callback": self._panel_tasks}]] await call.edit(text, reply_markup=btns) + @staticmethod + def _ms(obj): + if isinstance(obj, dict): + return {k: ChatCopy._ms(v) for k, v in obj.items()} + if isinstance(obj, (list, tuple)): + return [ChatCopy._ms(v) for v in obj] + if isinstance(obj, datetime): + return obj.timestamp() + if isinstance(obj, (int, float, str, bool)) or obj is None: + return obj + return str(obj) + def _save_tasks(self): """Saves the current task queue to DB, including live progress from active_dumps.""" tasks_to_save = [] @@ -2086,7 +2098,7 @@ class ChatCopy(loader.Module): live = self.active_dumps[tid] snapshot['current'] = live.get('current', snapshot.get('current', 0)) snapshot['total_msgs'] = live.get('total_estimated', snapshot.get('total_msgs', 0)) - tasks_to_save.append(snapshot) + tasks_to_save.append(self._ms(snapshot)) self.db.set("ChatCopy", "persistent_queue", tasks_to_save) async def _action_task(self, call, tid, action): # вот эта хрень держит все что находится в панели, лучше не трогать diff --git a/SenkoGuardian/SenModules/full.txt b/SenkoGuardian/SenModules/full.txt new file mode 100644 index 0000000..1d68301 --- /dev/null +++ b/SenkoGuardian/SenModules/full.txt @@ -0,0 +1,5 @@ +ChatCopy.py +Gemini.py +GiftFinder.py +MaillingChatGT99.py +NekoEditorMod.py \ No newline at end of file diff --git a/archquise/q.mods/.gitignore b/archquise/q.mods/.gitignore index f8e3f5a..a3417d9 100644 --- a/archquise/q.mods/.gitignore +++ b/archquise/q.mods/.gitignore @@ -4,4 +4,6 @@ .ruff_cache ruff.log ruff.log.2 -ruff.toml \ No newline at end of file +ruff.toml +# Heroku files +heroku/ diff --git a/archquise/q.mods/QNotes.py b/archquise/q.mods/QNotes.py new file mode 100644 index 0000000..7f48a99 --- /dev/null +++ b/archquise/q.mods/QNotes.py @@ -0,0 +1,438 @@ +__version__ = (1, 1, 6) + +# █▀▀▄ █▀▄▀█ █▀█ █▀▄ █▀ +# ▀▀▀█ ▄ █ ▀ █ █▄█ █▄▀ ▄█ + +# #### Copyright (c) 2026 Archquise ##### + +# 💬 Contact: https://t.me/archquise +# 🔒 Licensed under the GNU AGPLv3. +# 📄 LICENSE: https://raw.githubusercontent.com/archquise/Q.Mods/main/LICENSE +# --------------------------------------------------------------------------------- +# Name: QNotes +# Description: A notes module that just works +# Author: @quise_m +# --------------------------------------------------------------------------------- +# meta developer: @quise_m +# meta banner: https://raw.githubusercontent.com/archquise/qmods_meta/main/qnotes.png +# --------------------------------------------------------------------------------- + +import asyncio +import logging +import re +from datetime import date +from typing import cast + +from herokutl.tl.functions.users import GetUsersRequest +from herokutl.tl.types import InputUserSelf + +from .. import loader, utils + +logger = logging.getLogger(__name__) + + +@loader.tds +class QNotes(loader.Module): + """A notes module that just works\nUsage: #notetag in any chat""" + + strings = { + "name": "QNotes", + "topic_desc": "Stores your notes content\nUsage: #notetag in any chat", + "wrongargs": "
#{}\n\n
{}", + "notelist": "Note list:", + "msg_not_found_inline": "Message with this note wasn't found. Probably, it was been removed. Note has been removed from the database.", + "remnote_inline": "🗑 Remove", + "close_inline": "❌ Close", + "yes": "✔️ Yes", + "no": "❌ No", + "true": "yes", + "false": "no", + "saved": "Note saved!", + "removed": "Note removed!", + "nonotes": "You don't have any notes!", + "privacy_switch": "Determines whose data will be used by the my_* placeholders\n\nTrue - the account that is issuing the note\nFalse - the account on which the userbot is running", + "note_prefix": "The prefix used to call up notes", + "placeholders": """ + Available placeholders: + + about the account on which userbot is installed: + {my_id} - ID + @{my_username} - username, tag + {my_phone} - phone number + {my_premium} - premium status (yes/no) + + about reply author: + {reply_id} - ID + {reply_name} - name + {reply_surname} - surname + {reply_fullname} - full name (name + surname (if specified)) + @{reply_username} - username, tag + {reply_phone} - phone number (if not hidden) + {reply_premium} - premium status (yes/no) + + general: + {today} - current date + """, + } + + strings_ru = { + "_cls_doc": "Модуль для заметок, который просто работает\nИспользование: #тегзаметки в любом чате", + "topic_desc": "Хранит содержимое ваших заметок\nИспользование: #тегзаметки в любом чате", + "wrongargs": "
#{}\n\n
{}", + "notelist": "Список заметок:", + "msg_not_found_inline": "Сообщение с этой заметкой не было найдено. Вероятно, оно было удалено. Заметка очищена из базы данных.", + "remnote_inline": "🗑 Удалить", + "close_inline": "❌ Закрыть", + "yes": "✔️ Да", + "no": "❌ Нет", + "saved": "Заметка сохранена!", + "removed": "Заметка удалена!", + "true": "да", + "false": "нет", + "nonotes": "Нет заметок!", + "privacy_switch": "Влияет на то, чьи данные будут использовать my_* плейсхолдеры\n\nTrue - аккаунта, который вызывает заметку\nFalse - аккаунта на котором стоит юзербот", + "note_prefix": "Префикс, с которым вызываются заметки", + "placeholders": """ + Доступные плейсхолдеры: + + об аккаунте, на котором стоит юзербот: + {my_id} - айди + @{my_username} - юзернейм, тег + {my_phone} - номер телефона + {my_premium} - статус премиум (да/нет) + + об авторе реплая: + {reply_id} - айди + {reply_name} - имя + {reply_surname} - фамилия + {reply_fullname} - полное имя (имя + фамилия (если указана)) + @{reply_username} - юзернейм, тег + {reply_phone} - номер телефона (если не скрыт) + {reply_premium} - статус премиум (да/нет) + + общее: + {today} - текущая дата + """, + } + + def __init__(self): + self.config = loader.ModuleConfig( + loader.ConfigValue( + "privacy_switch", + True, + lambda: self.strings["privacy_switch"], + validator=loader.validators.Boolean(), # type: ignore + ), + loader.ConfigValue( + "note_prefix", + "#", + lambda: self.strings["note_prefix"], + validator=loader.validators.RegExp(r"^\S+$"), # type: ignore + ), + ) + + async def client_ready(self, client, db): # type: ignore + self._content_channel_id = await utils.wait_for_content_channel(self._db) + self._notes_topic = await utils.asset_forum_topic( + client=self._client, + db=self._db, + peer=self._content_channel_id, # type: ignore + title="QNotes | Storage", + description=self.strings["topic_desc"], + icon_emoji_id=5272001961326049733, + ) + + self.my_phone = (await self._client(GetUsersRequest(id=[InputUserSelf()])))[ + 0 + ].phone + + self.placeholders = { + "my_phone": self.my_phone, + "my_username": self._client.heroku_me.username, + "my_id": self.tg_id, + "my_premium": self.strings["true"] + if self._client.heroku_me.premium + else self.strings["false"], + } + + self._notemap = cast(dict, self.pointer("notemap", default={})) + + async def _ask_overwrite(self, message): + + loop = asyncio.get_running_loop() + future = loop.create_future() + + form = await self.inline.form( + self.strings["already_exists"], + message=message, + reply_markup=[ + [ + { + "text": self.strings["yes"], + "callback": ( + lambda call, flag: ( + future.set_result(flag) if not future.done() else None + ) + ), + "args": (True,), + }, + { + "text": self.strings["no"], + "callback": ( + lambda call, flag: ( + future.set_result(flag) if not future.done() else None + ) + ), + "args": (False,), + }, + ] + ], + ) + + try: + async with asyncio.timeout(15): + overwrite_answer = await future + except TimeoutError: + await form.delete() # type: ignore + return False, message + + if not overwrite_answer: + await form.delete() # type: ignore + return False, form + + return True, form + + async def _show_note_inline(self, call, note, page=0): + async def _remnote(call, notetag, note_msg): + await note_msg.delete() + self._notemap.pop(notetag, None) + + await call.edit(self.strings["removed"]) + + note_msg = await self._client.get_messages( + self._content_channel_id, ids=note[1] + ) + + if not note_msg: + self._notemap.pop(note[0], None) + + await call.edit( + self.strings["msg_not_found_inline"], + reply_markup=[ + {"text": "⬅️ Назад", "callback": self._list_page, "args": (page,)}, + {"text": self.strings["close_inline"], "action": "close"}, + ], + ) + return + + await call.edit( + self.strings["show_note_inline"].format(note[0], note_msg.text), # type: ignore + reply_markup=[ + [ + {"text": "⬅️ Назад", "callback": self._list_page, "args": (page,)}, + { + "text": self.strings["remnote_inline"], + "callback": _remnote, + "args": (note[0], note_msg), + }, + ], + [{"text": self.strings["close_inline"], "action": "close"}], + ], + ) + + def _build_list_markup(self, page: int): + items = list(self._notemap.items()) + total = -(-len(items) // 3) + page = max(0, min(page, total - 1)) + rows = [ + [ + { + "text": notetag, + "callback": self._show_note_inline, + "args": ([notetag, msg_id], page), + } + ] + for notetag, msg_id in items[page * 3 : (page + 1) * 3] + ] + return ( + rows + + self.inline.build_pagination( + callback=self._list_page, # type: ignore + total_pages=total, + current_page=page + 1, + ) + + [[{"text": self.strings["close_inline"], "action": "close"}]] + ) + + async def _list_page(self, call, page): + await call.edit( + text=self.strings["notelist"], reply_markup=self._build_list_markup(page) + ) + + @loader.command( + ru_doc="Сохраняет заметку под тегом | Пример: .qnsave заметка", + en_doc="Saves note by tag | Example: .qnsave note", + ) + async def qnsave(self, message) -> None: + args = utils.get_args(message) + if not args: + await utils.answer(message, self.strings["wrongargs"]) + return + + current_message = message + + if not (reply := await message.get_reply_message()): + await utils.answer(message, self.strings["no_reply"]) + return + try: + if args[0].strip() in self._notemap: + need_overwrite, msg = await self._ask_overwrite(message) + if not need_overwrite: + return + old_note_message = await self._client.get_messages( + self._content_channel_id, + ids=self._notemap[args[0].strip()], + ) + old_note_message and await old_note_message.delete() # type: ignore + current_message = msg + + note_message = await self._client.send_message( + self._content_channel_id, + reply.text, + reply_to=self._notes_topic.id, + file=reply.media, + ) + self._notemap[args[0].strip()] = note_message.id + + except Exception as e: + await utils.answer(current_message, f"Произошла ошибка: {e}") + logger.exception("Произошла ошибка при сохранении заметки!") + return + await utils.answer(current_message, self.strings["saved"]) + + @loader.command( + ru_doc="Удаляет заметку по тегу | Пример: .qnrem заметка", + en_doc="Removes note by tag | Example: .qnrem note", + ) + async def qnrem(self, message) -> None: + args = utils.get_args(message) + if not args: + await utils.answer(message, self.strings["wrongargs"]) + return + + if args[0] not in self._notemap or not ( + note_message := await self._client.get_messages( + self._content_channel_id, + ids=self._notemap[args[0]], + ) + ): + await utils.answer(message, self.strings["not_exist"]) + return + + await note_message.delete() # type: ignore + self._notemap.pop(args[0], None) + + await utils.answer(message, self.strings["removed"]) + + @loader.command( + ru_doc="Выводит список всех заметок и позволяет управлять ими", + en_doc="Shows note list and allows managing them", + ) + async def qnlist(self, message) -> None: + if self._notemap: + await self.inline.form( + text=self.strings["notelist"], + reply_markup=self._build_list_markup(0), + message=message, + ) + return + await utils.answer(message, self.strings["nonotes"]) + + @loader.command( + ru_doc="Выводит список доступных плейсхолдеров", + en_doc="Displays a list of available placeholders", + ) + async def qnp(self, message) -> None: + await utils.answer(message, self.strings["placeholders"]) + + @loader.watcher() + async def _note_watcher(self, message): + if not message.text.startswith(prefix := self.config["note_prefix"]) or not ( + await self._client.dispatcher.security.check(message, self._note_watcher) + ): + return + + notetag = message.text.split(prefix, maxsplit=1)[1] + + if notetag in self._notemap: + if not ( + note_message := await self._client.get_messages( + self._content_channel_id, + ids=self._notemap[notetag], + ) + ): + self._notemap.pop(notetag, None) + return + notetext = note_message.text or "" # type: ignore + if re.search(r"\{\w+\}", notetext): + if ( + not self.config["privacy_switch"] + or message.sender_id == self._client.heroku_me.id + ): + placeholders = {**self.placeholders} + else: + message_author_entity = await self._client.get_entity( + message.sender_id + ) + placeholders = { + "my_phone": ( + await self._client(GetUsersRequest(id=[message.sender_id])) + )[0].phone, + "my_username": message_author_entity.username, + "my_id": message.sender_id, + "my_premium": self.strings["true"] + if message_author_entity.premium + else self.strings["false"], + } + + if reply_msg := await message.get_reply_message(): + reply_user = await self._client.get_entity(reply_msg.sender_id) + placeholders = { + **placeholders, + "reply_id": reply_user.id, + "reply_fullname": " ".join( + filter(None, [reply_user.first_name, reply_user.last_name]) + ), + "reply_name": reply_user.first_name, + "reply_surname": reply_user.last_name, + "reply_phone": ( + await self._client(GetUsersRequest(id=[reply_user.id])) + )[0].phone, + "reply_username": reply_user.username, + "reply_premium": self.strings["true"] + if reply_user.premium + else self.strings["false"], + } + + placeholders = placeholders | {"today": date.today()} + + def replacer(match): + key = match.group(1) + if key not in placeholders or not placeholders[key]: + return match.group(0) + return utils.escape_html(str(placeholders[key])) + + notetext = re.sub(r"\{(\w+)\}", replacer, notetext) + if media := note_message.media: # type: ignore + await utils.answer_file(message, media, notetext) # type: ignore + else: + await utils.answer(message, notetext) + return diff --git a/archquise/q.mods/face.py b/archquise/q.mods/face.py index b5c0160..0a01f9b 100644 --- a/archquise/q.mods/face.py +++ b/archquise/q.mods/face.py @@ -59,7 +59,7 @@ class FaceMod(loader.Module): en_doc="Random kaomoji", ) async def rfacecmd(self, message) -> None: # noqa: D102, ANN001 - await utils.answer(message, self.strings("loading")) + await utils.answer(message, self.strings["loading"]) url = "https://files.archquise.ru/kaomoji.txt" @@ -72,7 +72,7 @@ class FaceMod(loader.Module): kaomoji = random.choice(kaomoji_list) # noqa: S311 await utils.answer( message, - self.strings("random_face").format(kaomoji), + self.strings["random_face"].format(kaomoji), ) else: - await utils.answer(message, self.strings("error")) + await utils.answer(message, self.strings["error"]) diff --git a/archquise/q.mods/shortener.py b/archquise/q.mods/shortener.py index 6d4de4e..3b52972 100644 --- a/archquise/q.mods/shortener.py +++ b/archquise/q.mods/shortener.py @@ -113,24 +113,24 @@ class ShortenerMod(loader.Module): async def shortencmd(self, message): # noqa: ANN001, ANN201 """Shorten URL using bit.ly API.""" if self.config["token"] is None: - await utils.answer(message, self.strings("no_api")) + await utils.answer(message, self.strings["no_api"]) return args = utils.get_args_raw(message) if not args: - await utils.answer(message, self.strings("no_args")) + await utils.answer(message, self.strings["no_args"]) return if not self._validate_url(args): - await utils.answer(message, self.strings("invalid_url")) + await utils.answer(message, self.strings["invalid_url"]) return try: short_url = await self.shorten_url(url=args, token=self.config["token"]) - await utils.answer(message, self.strings("shortencmd").format(c=short_url)) + await utils.answer(message, self.strings["shortencmd"].format(c=short_url)) except Exception as e: logger.exception("Error shortening URL!") - await utils.answer(message, self.strings("api_error").format(error=str(e))) + await utils.answer(message, self.strings["api_error"].format(error=str(e))) @loader.command( ru_doc="Посмотреть статистику ссылки через bit.ly (ссылка без https:// | Доступно только на платных аккаунтах)", @@ -139,22 +139,22 @@ class ShortenerMod(loader.Module): async def statclcmd(self, message): # noqa: ANN001, ANN201 """Get click statistics for shortened URL.""" if self.config["token"] is None: - await utils.answer(message, self.strings("no_api")) + await utils.answer(message, self.strings["no_api"]) return args = utils.get_args_raw(message) if not args: - await utils.answer(message, self.strings("no_args")) + await utils.answer(message, self.strings["no_args"]) return try: if not args.startswith("bit.ly/"): - await utils.answer(message, self.strings("invalid_url")) + await utils.answer(message, self.strings["invalid_url"]) return clicks = await self.get_bitlink_stats( bitlink=args, token=self.config["token"] ) - await utils.answer(message, self.strings("statclcmd").format(c=clicks)) + await utils.answer(message, self.strings["statclcmd"].format(c=clicks)) except Exception as e: logger.exception("Error getting statistics!") - await utils.answer(message, self.strings("api_error").format(error=str(e))) + await utils.answer(message, self.strings["api_error"].format(error=str(e))) diff --git a/coddrago/modules/full.txt b/coddrago/modules/full.txt index d889594..3032c51 100644 --- a/coddrago/modules/full.txt +++ b/coddrago/modules/full.txt @@ -22,4 +22,4 @@ chatmodule stats tagwatcher hardspam -YaMusic +YaMusic \ No newline at end of file diff --git a/fiksofficial/python-modules/full.txt b/fiksofficial/python-modules/full.txt index 5855795..febee3e 100644 --- a/fiksofficial/python-modules/full.txt +++ b/fiksofficial/python-modules/full.txt @@ -27,4 +27,5 @@ github stream placeholders+ PyInstall -IwaAnimation \ No newline at end of file +IwaAnimation +lateban \ No newline at end of file diff --git a/fiksofficial/python-modules/lateban.py b/fiksofficial/python-modules/lateban.py new file mode 100644 index 0000000..1ea1e1a --- /dev/null +++ b/fiksofficial/python-modules/lateban.py @@ -0,0 +1,320 @@ +# ______ ___ ___ _ _ +# ____ | ___ \ | \/ | | | | | +# / __ \| |_/ / _| . . | ___ __| |_ _| | ___ +# / / _` | __/ | | | |\/| |/ _ \ / _` | | | | |/ _ \ +# | | (_| | | | |_| | | | | (_) | (_| | |_| | | __/ +# \ \__,_\_| \__, \_| |_/\___/ \__,_|\__,_|_|\___| +# \____/ __/ | +# |___/ + +# На модуль распространяется лицензия "GNU General Public License v3.0" +# https://github.com/all-licenses/GNU-General-Public-License-v3.0 + +# meta developer: @pymodule + +import asyncio +import logging +from datetime import datetime, timezone + +from herokutl.tl.functions.channels import ( + EditBannedRequest, + GetParticipantsRequest, +) +from herokutl.tl.types import ( + ChatBannedRights, + ChannelParticipantsSearch, + MessageService, + MessageActionChatAddUser, + MessageActionChatJoinedByLink, + MessageActionChatJoinedByRequest, +) + +from .. import loader, utils + +logger = logging.getLogger(__name__) + +_BAN = ChatBannedRights(until_date=None, view_messages=True) + +@loader.tds +class LateBanMod(loader.Module): + """Ban all members who joined the chat after a specified date/time""" + + strings = { + "name": "LateBan", + "no_args": ( + "❌ Specify date/time:\n" + "
.lateban DD.MM.YYYY\n"
+ ".lateban DD.MM.YYYY HH:MM\n"
+ ".lateban HH:MM — today"
+ ),
+ "bad_date": (
+ "❌ Invalid format. Use DD.MM.YYYY, "
+ "DD.MM.YYYY HH:MM or HH:MM"
+ ),
+ "not_chat": "❌ Only works in supergroups",
+ "no_rights": "❌ No permission to ban members",
+ "scanning": "🔍 Scanning members who joined after {dt}...",
+ "confirm": (
+ "⚠️ Found {count} members who joined after {dt}.\n\n"
+ "Confirm ban:"
+ ),
+ "btn_ban": "✅ Ban {count} members",
+ "btn_cancel": "❌ Cancel",
+ "banning": "⏳ Banning {count} members...",
+ "progress": "⏳ Banned {done}/{total}...",
+ "done": (
+ "✅ Banned: {banned}\n"
+ "Skipped (errors/bots): {skipped}\n"
+ "Service messages deleted: {deleted}"
+ ),
+ "nobody": "✅ No members found who joined after {dt}.",
+ }
+
+ strings_ru = {
+ "name": "LateBan",
+ "_cls_doc": "Заблокируйте всех участников, присоединившихся к чату после указанной даты/времени.",
+ "no_args": (
+ "❌ Укажи дату/время:\n"
+ ".lateban DD.MM.YYYY\n"
+ ".lateban DD.MM.YYYY HH:MM\n"
+ ".lateban HH:MM"
+ ),
+ "bad_date": (
+ "❌ Неверный формат. Используй DD.MM.YYYY, "
+ "DD.MM.YYYY HH:MM или HH:MM"
+ ),
+ "not_chat": "❌ Команда работает только в супергруппах",
+ "no_rights": "❌ Нет прав на бан участников",
+ "scanning": "🔍 Сканирую участников, вступивших после {dt}...",
+ "confirm": (
+ "⚠️ Найдено {count} участников, вступивших после {dt}.\n\n"
+ "Подтверди бан:"
+ ),
+ "btn_ban": "✅ Забанить {count} участников",
+ "btn_cancel": "❌ Отмена",
+ "banning": "⏳ Баню {count} участников...",
+ "progress": "⏳ Забанено {done}/{total}...",
+ "done": (
+ "✅ Забанено: {banned}\n"
+ "Пропущено (ошибки/боты): {skipped}\n"
+ "Удалено сервисных сообщений: {deleted}"
+ ),
+ "nobody": "✅ Участников, вступивших после {dt}, не найдено.",
+ }
+
+ async def client_ready(self):
+ pass
+
+ @loader.command(ru_doc="{utils.escape_html(output)}")
except FileNotFoundError:
- await utils.answer(message, self.strings("not_installed"))
+ await utils.answer(message, self.strings["not_installed"])
diff --git a/radiocycle/Modules/PicToStories.py b/radiocycle/Modules/PicToStories.py
index b244a2c..7d848b1 100644
--- a/radiocycle/Modules/PicToStories.py
+++ b/radiocycle/Modules/PicToStories.py
@@ -6,11 +6,6 @@
# |_|\_\___| |_| |_|\___/ \__,_|___/
# @ke_mods
# =======================================
-#
-# LICENSE: CC BY-ND 4.0 (Attribution-NoDerivatives 4.0 International)
-# --------------------------------------
-# https://creativecommons.org/licenses/by-nd/4.0/legalcode
-# =======================================
# meta developer: @ke_mods
# requires: pillow
@@ -95,17 +90,17 @@ class PicToStoriesMod(loader.Module):
args = utils.get_args_raw(message)
reply = await message.get_reply_message()
if not reply or not reply.media:
- await utils.answer(message, self.strings("no_rep"))
+ await utils.answer(message, self.strings["no_rep"])
return
try:
image_bytes = await reply.download_media(file=bytes)
img = Image.open(io.BytesIO(image_bytes))
except Exception as e:
- await utils.answer(message, self.strings("err").format(e))
+ await utils.answer(message, self.strings["err"].format(e))
return
- await utils.answer(message, self.strings("work"))
+ await utils.answer(message, self.strings["work"])
w, h = img.size
curr_ratio = w / h
@@ -208,4 +203,4 @@ class PicToStoriesMod(loader.Module):
)
)
- await utils.answer(message, self.strings("done"))
\ No newline at end of file
+ await utils.answer(message, self.strings["done"])
diff --git a/radiocycle/Modules/RandomAnimePic.py b/radiocycle/Modules/RandomAnimePic.py
index f609d7e..e310af0 100644
--- a/radiocycle/Modules/RandomAnimePic.py
+++ b/radiocycle/Modules/RandomAnimePic.py
@@ -6,11 +6,6 @@
# |_|\_\___| |_| |_|\___/ \__,_|___/
# @ke_mods
# =======================================
-#
-# LICENSE: CC BY-ND 4.0 (Attribution-NoDerivatives 4.0 International)
-# --------------------------------------
-# https://creativecommons.org/licenses/by-nd/4.0/legalcode
-# =======================================
# meta developer: @ke_mods
# requires: pillow
@@ -55,23 +50,13 @@ class RandomAnimePicMod(loader.Module):
IMAGES_API_URL = "https://api.nekosapi.com/v4/images"
CATEGORIES_SCAN_LIMIT = 500
- def __init__(self):
- self.config = loader.ModuleConfig(
- loader.ConfigValue(
- "category",
- "",
- "Category",
- validator=loader.validators.String(),
- ),
- )
-
@loader.command(ru_doc="- получить рандомную аниме-картинку 👀")
async def rapiccmd(self, message):
"""- fetch random anime-pic 👀"""
- await utils.answer(message, self.strings("loading"))
+ await utils.answer(message, self.strings["loading"])
try:
- category = self.config["category"].strip()
+ category = await utils.get_args_raw().strip()
def fetch_image():
params = {"limit": 1, "rating": ["safe"]}
@@ -111,7 +96,7 @@ class RandomAnimePicMod(loader.Module):
url, file = await asyncio.to_thread(fetch_image)
await utils.answer(
message,
- self.strings("img").format(url),
+ self.strings["img"].format(url),
file=file
)
@@ -120,12 +105,12 @@ class RandomAnimePicMod(loader.Module):
"Error fetching random anime pic: %s",
traceback.format_exc(),
)
- await utils.answer(message, self.strings("error"))
+ await utils.answer(message, self.strings["error"])
@loader.command(ru_doc="- получить список категорий из API 👀")
async def racategoriescmd(self, message):
"""- fetch categories from api 👀"""
- await utils.answer(message, self.strings("categories_loading"))
+ await utils.answer(message, self.strings["categories_loading"])
try:
def fetch_categories() -> list[str]:
@@ -162,15 +147,15 @@ class RandomAnimePicMod(loader.Module):
categories = await asyncio.to_thread(fetch_categories)
if not categories:
- await utils.answer(message, self.strings("no_categories"))
+ await utils.answer(message, self.strings["no_categories"])
return
- formatted_categories = "\n".join(
+ formatted_categories = ", ".join(
f"{category}" for category in categories
)
await utils.answer(
message,
- self.strings("categories").format(formatted_categories),
+ self.strings["categories"].format(formatted_categories),
)
except Exception:
@@ -178,4 +163,4 @@ class RandomAnimePicMod(loader.Module):
"Error fetching categories: %s",
traceback.format_exc(),
)
- await utils.answer(message, self.strings("error"))
\ No newline at end of file
+ await utils.answer(message, self.strings["error"])
diff --git a/radiocycle/Modules/SpotifyMod.py b/radiocycle/Modules/SpotifyMod.py
index d1ea339..af176c6 100644
--- a/radiocycle/Modules/SpotifyMod.py
+++ b/radiocycle/Modules/SpotifyMod.py
@@ -16,11 +16,6 @@
# @ke_mods
# =======================================
#
-# LICENSE: CC BY-ND 4.0 (Attribution-NoDerivatives 4.0 International)
-# --------------------------------------
-# https://creativecommons.org/licenses/by-nd/4.0/legalcode
-# =======================================
-#
# meta developer: @ke_mods
# requires: telethon spotipy pillow requests yt-dlp curl_cffi
# scope: ffmpeg
@@ -39,6 +34,7 @@ import traceback
import os
from types import FunctionType
+import random
import requests
import spotipy
from PIL import Image, ImageDraw, ImageEnhance, ImageFilter, ImageFont, ImageOps
@@ -61,7 +57,9 @@ class Banners:
progress: int,
track_cover: bytes,
font,
- blur
+ blur,
+ album_title: str = "",
+ meta_info: str = "",
):
self.title = title
self.artists = ", ".join(artists) if isinstance(artists, list) else artists
@@ -70,6 +68,8 @@ class Banners:
self.track_cover = track_cover
self.font_url = font
self.blur_intensity = blur
+ self.album_title = album_title
+ self.meta_info = meta_info
def _get_font(self, size, font_bytes):
return ImageFont.truetype(io.BytesIO(font_bytes), size)
@@ -237,6 +237,164 @@ class Banners:
by.name = "banner.png"
return by
+ # Ultra banner from YaMusic by @codrago_m
+ def ultra(self) -> io.BytesIO:
+ WIDTH, HEIGHT = 2560, 1220
+
+ font_bytes = requests.get(self.font_url).content
+
+ def get_font(size):
+ try:
+ return ImageFont.truetype(io.BytesIO(font_bytes), size)
+ except Exception:
+ return ImageFont.load_default()
+
+ try:
+ original_cover = Image.open(io.BytesIO(self.track_cover)).convert("RGBA")
+ except Exception:
+ original_cover = Image.new("RGBA", (1000, 1000), "black")
+
+ dominant_color_img = original_cover.resize((1, 1), Image.Resampling.LANCZOS)
+ dominant_color = dominant_color_img.getpixel((0, 0))
+
+ r, g, b, a = dominant_color
+ brightness = (r * 299 + g * 587 + b * 114) / 1000
+ if brightness < 60:
+ r = min(255, r + 60)
+ g = min(255, g + 60)
+ b = min(255, b + 60)
+ dominant_color = (r, g, b, 255)
+
+ background = original_cover.copy()
+ bg_w, bg_h = background.size
+
+ target_ratio = WIDTH / HEIGHT
+ current_ratio = bg_w / bg_h
+
+ if current_ratio > target_ratio:
+ new_w = int(bg_h * target_ratio)
+ offset = (bg_w - new_w) // 2
+ background = background.crop((offset, 0, offset + new_w, bg_h))
+ else:
+ new_h = int(bg_w / target_ratio)
+ offset = (bg_h - new_h) // 2
+ background = background.crop((0, offset, bg_w, offset + new_h))
+
+ background = background.resize((WIDTH, HEIGHT), Image.Resampling.LANCZOS)
+
+ if self.blur_intensity > 0:
+ background = background.filter(ImageFilter.GaussianBlur(radius=self.blur_intensity))
+
+ dark_overlay = Image.new("RGBA", (WIDTH, HEIGHT), (0, 0, 0, 180))
+ background = Image.alpha_composite(background, dark_overlay)
+
+ cover_size = 500
+ cover_x = (WIDTH - cover_size) // 2
+ cover_y = 160
+
+ glow_layer = Image.new("RGBA", (WIDTH, HEIGHT), (0, 0, 0, 0))
+ draw_glow = ImageDraw.Draw(glow_layer)
+
+ glow_rect_size = 620
+ g_x = (WIDTH - glow_rect_size) // 2
+ g_y = cover_y + (cover_size - glow_rect_size) // 2
+
+ draw_glow.rounded_rectangle(
+ (g_x, g_y, g_x + glow_rect_size, g_y + glow_rect_size),
+ radius=50,
+ fill=dominant_color,
+ )
+
+ glow_layer = glow_layer.filter(ImageFilter.GaussianBlur(radius=60))
+ glow_layer = ImageEnhance.Brightness(glow_layer).enhance(1.4)
+ glow_layer = ImageEnhance.Color(glow_layer).enhance(1.2)
+
+ background = Image.alpha_composite(background, glow_layer)
+
+ cover_img = original_cover.resize((cover_size, cover_size), Image.Resampling.LANCZOS)
+
+ mask = Image.new("L", (cover_size, cover_size), 0)
+ draw_mask = ImageDraw.Draw(mask)
+ draw_mask.rounded_rectangle((0, 0, cover_size, cover_size), radius=45, fill=255)
+
+ background.paste(cover_img, (cover_x, cover_y), mask)
+
+ draw = ImageDraw.Draw(background)
+ center_x = WIDTH // 2
+ current_y = cover_y + cover_size + 130
+
+ def draw_text_shadow(text, pos, font, fill="white", anchor="ms"):
+ x, y = pos
+ draw.text((x + 2, y + 2), text, font=font, fill=(0, 0, 0, 240), anchor=anchor)
+ draw.text((x, y), text, font=font, fill=fill, anchor=anchor)
+
+ font_title = get_font(100)
+ title_text = self.title if len(self.title) <= 30 else self.title[:30] + "..."
+ draw_text_shadow(title_text.upper(), (center_x, current_y), font_title)
+
+ current_y += 85
+
+ font_artist = get_font(65)
+ artist_text = self.artists if len(self.artists) <= 45 else self.artists[:45] + "..."
+ draw_text_shadow(artist_text.upper(), (center_x, current_y), font_artist, fill=(255, 255, 255, 240))
+
+ current_y += 80
+
+ bar_width = 800
+ font_time = get_font(40)
+
+ bar_start_x = center_x - (bar_width // 2)
+ bar_end_x = center_x + (bar_width // 2)
+ bar_y = current_y
+
+ total_time_str = f"{self.duration // 1000 // 60:02d}:{(self.duration // 1000) % 60:02d}"
+ cur_time_str = f"{self.progress // 1000 // 60:02d}:{(self.progress // 1000) % 60:02d}"
+
+ draw_text_shadow(cur_time_str, (bar_start_x - 30, bar_y), font_time, anchor="rm")
+ draw_text_shadow(total_time_str, (bar_end_x + 30, bar_y), font_time, anchor="lm")
+
+ old_state = random.getstate()
+ random.seed(self.title + str(self.duration))
+
+ num_bars = 65
+ bar_spacing = bar_width / num_bars
+ bar_w = max(4, int(bar_spacing * 0.5))
+ max_h, min_h = 50, 6
+
+ active_bars = int(num_bars * (self.progress / self.duration)) if self.duration > 0 else 0
+
+ for i in range(num_bars):
+ base_h = random.randint(min_h, max_h)
+ edge_factor = 1.0 - abs((i - num_bars / 2) / (num_bars / 2))
+ h = max(min_h, int(base_h * 0.4 + max_h * edge_factor * 0.6))
+ x_center = bar_start_x + i * bar_spacing
+ color = (255, 255, 255, 255) if i < active_bars else (80, 80, 80, 100)
+ draw.rounded_rectangle(
+ (x_center - bar_w / 2, bar_y - h / 2, x_center + bar_w / 2, bar_y + h / 2),
+ radius=int(bar_w / 2),
+ fill=color,
+ )
+
+ random.setstate(old_state)
+
+ current_y += 80
+
+ if self.album_title:
+ font_album = get_font(50)
+ album_text = self.album_title if len(self.album_title) <= 50 else self.album_title[:50] + "..."
+ draw_text_shadow(album_text, (center_x, current_y), font_album, fill=(230, 230, 230))
+ current_y += 60
+
+ if self.meta_info:
+ font_meta = get_font(40)
+ draw_text_shadow(self.meta_info, (center_x, current_y), font_meta, fill=(210, 210, 210))
+
+ by = io.BytesIO()
+ background.save(by, format="PNG")
+ by.seek(0)
+ by.name = "banner.png"
+ return by
+
@loader.tds
class SpotifyMod(loader.Module):
"""Card with the currently playing track on Spotify."""
@@ -349,9 +507,6 @@ class SpotifyMod(loader.Module):
".sdevice to see available devices."
- ),
"autobio": (
".sdevice , чтобы увидеть доступные устройства."
- ),
"autobio": (
"use_rf_proxy в конфиге)"
- except Exception: err=f"HTTP {r.status_code}" if r else "Нетворк еррорь (попробуй включить use_rf_proxy в конфиге)"
+ try: err=r.json().get("error",f"HTTP {r.status_code}") if r else "Нетворк еррорь (попробуй указать прокси в конфиге)"
+ except Exception: err=f"HTTP {r.status_code}" if r else "Нетворк еррорь (попробуй указать прокси в конфиге)"
return await utils.answer(st,self.strings["api_error"].format(err))
buf=io.BytesIO(r.content); buf.name="YgQuote"+(".png" if doc else ".webp")
@@ -270,11 +271,11 @@ class Quotes(loader.Module):
"format": "webp","type":self.config["type"]}
await utils.answer(st,self.strings["api_processing"])
- endpoint=self.config['rf_endpoint'] if self.config['use_rf_proxy'] else self.config['endpoint']
- r=await Dick.post(f"{endpoint}.webp",dickk)
+ prx = self.config["proxy"] if self.config["proxy"] else None
+ r=await Dick.post(f"{self.config['endpoint']}.webp",dickk,proxy=prx)
if not r or r.status_code!=200:
- try: err=r.json().get("error",f"HTTP {r.status_code}") if r else "Нетворк еррорь (попробуй включить use_rf_proxy в конфиге)"
- except Exception: err=f"HTTP {r.status_code}" if r else "Нетворк еррорь (попробуй включить use_rf_proxy в конфиге)"
+ try: err=r.json().get("error",f"HTTP {r.status_code}") if r else "Нетворк еррорь (попробуй указать прокси в конфиге)"
+ except Exception: err=f"HTTP {r.status_code}" if r else "Нетворк еррорь (попробуй указать прокси в конфиге)"
return await utils.answer(st,self.strings["api_error"].format(err))
buf=io.BytesIO(r.content); buf.name="YgQuote.webp"
@@ -290,12 +291,18 @@ class Quotes(loader.Module):
return None
out: List[dict]=[]
+ prev_sender_id = None
+
for mm in lst:
try:
u=await self.who(mm)
if not u: continue
+ current_sender_id = getattr(u,"id",0)
+
+ is_chained = (current_sender_id == prev_sender_id) if current_sender_id else False
name=telethon.utils.get_display_name(u); f,l=Dick.split(name)
- ava=await Dick.ava(self.client,getattr(u,"id",0)) if getattr(u,"id",None) else None
+
+ ava = await Dick.ava(self.client,current_sender_id) if (not is_chained and current_sender_id) else None
rb=None
try:
@@ -315,10 +322,16 @@ class Quotes(loader.Module):
txt=mm.raw_text or ""; ad=Dick.desc(mm)
if ad: txt=f"{txt}\n\n{ad}" if txt else ad
- item={"from":{"id":getattr(u,"id", 0),"first_name":getattr(u,"first_name","") or f,"last_name":getattr(u,"last_name","") or l,
- "username":getattr(u,"username",None),"name":name,"photo":{"url":ava} if ava else {}},
- "text":txt,"entities":Dick.ents(mm.entities),"avatar":True}
-
+ if is_chained:
+ item={"from":{"id":current_sender_id,"name":""},
+ "text":txt,"entities":Dick.ents(mm.entities),"avatar":False}
+ else:
+ item={"from":{"id":current_sender_id,"first_name":getattr(u,"first_name","") or f,"last_name":getattr(u,"last_name","") or l,
+ "username":getattr(u,"username",None),"name":name,"photo":{"url":ava} if ava else {}},
+ "text":txt,"entities":Dick.ents(mm.entities),"avatar":True}
+
+ es=getattr(u,"emoji_status",None)
+ if getattr(es,"document_id",None): item["from"]["emoji_status"]=str(es.document_id)
try:
if mm.voice:
a = next((a for a in mm.voice.attributes or []
@@ -327,11 +340,10 @@ class Quotes(loader.Module):
except Exception: pass
if med: item["voice" if "voice" in med else "media"] = med.get("voice", med)
-
- es=getattr(u,"emoji_status",None)
- if getattr(es,"document_id",None): item["from"]["emoji_status"]=str(es.document_id)
if rb: item["replyMessage"]=rb
out.append(item)
+
+ prev_sender_id = current_sender_id
except Exception: continue
return out
@@ -378,6 +390,8 @@ class Quotes(loader.Module):
return await self.fake(f"{getattr(u,'id','')} {args}", None)
out: List[dict]=[]
+ prev_sender_id = None
+
for part in args.split("; "):
try:
rb=None
@@ -388,22 +402,32 @@ class Quotes(loader.Module):
if not u1: continue
txt1, ents1 = html.parse(t1) if t1 else ("", [])
+
+ current_sender_id = u1.id
+ is_chained = (current_sender_id == prev_sender_id)
name=telethon.utils.get_display_name(u1); f,l=Dick.split(name)
- ava=await Dick.ava(self.client,u1.id)
+
+ ava = await Dick.ava(self.client,u1.id) if not is_chained else None
if u2:
txt2, ents2 = html.parse(t2) if t2 else ("", [])
name2=telethon.utils.get_display_name(u2); ava2=await Dick.ava(self.client,u2.id)
rb={"name":name2,"text":txt2,"entities":Dick.ents(ents2),"chatId":u2.id,"from":{"name":name2,"photo":{"url":ava2} if ava2 else {}}}
- msg={"from":{"id":u1.id,"first_name":getattr(u1,"first_name","") or f,"last_name":getattr(u1,"last_name","") or l,
- "username":getattr(u1,"username",None),"name":name,"photo":{"url":ava} if ava else {}},
- "text":txt1,"entities":Dick.ents(ents1), "avatar":True}
-
- es=getattr(u1,"emoji_status",None)
- if getattr(es,"document_id",None): msg["from"]["emoji_status"]=str(es.document_id)
+ if is_chained:
+ msg={"from":{"id":current_sender_id,"name":""},
+ "text":txt1,"entities":Dick.ents(ents1), "avatar":False}
+ else:
+ msg={"from":{"id":current_sender_id,"first_name":getattr(u1,"first_name","") or f,"last_name":getattr(u1,"last_name","") or l,
+ "username":getattr(u1,"username",None),"name":name,"photo":{"url":ava} if ava else {}},
+ "text":txt1,"entities":Dick.ents(ents1), "avatar":True}
+ es=getattr(u1,"emoji_status",None)
+ if getattr(es,"document_id",None): msg["from"]["emoji_status"]=str(es.document_id)
+
if rb: msg["replyMessage"]=rb
out.append(msg)
+
+ prev_sender_id = current_sender_id
except Exception: continue
return out
\ No newline at end of file