From be47e59d977e28421ba154e5bdc5abf7f97d372a Mon Sep 17 00:00:00 2001 From: John Doe Date: Sat, 18 Apr 2026 13:19:24 +0300 Subject: [PATCH] fix: some broken strings feat: install button and new limoka version update notification drop: watcher that installs module from bot (temporary) --- Limoka.py | 453 +++++++++++++++++++++++++++++++++++------------------- 1 file changed, 291 insertions(+), 162 deletions(-) diff --git a/Limoka.py b/Limoka.py index d44a446..7138e71 100644 --- a/Limoka.py +++ b/Limoka.py @@ -25,15 +25,37 @@ from telethon import TelegramClient from telethon.errors.rpcerrorlist import YouBlockedUserError from telethon import functions +from aiogram.types import InlineKeyboardMarkup, InlineKeyboardButton + +import ast + try: from aiogram.utils.exceptions import BadRequest except ImportError: from aiogram.exceptions import TelegramBadRequest as BadRequest + from .. import utils, loader -from ..types import InlineCall +from ..types import BotInlineCall, InlineCall logger = logging.getLogger("Limoka") -__version__ = (1, 4, 5) +__version__ = (1, 5, 0) + + +def _parse_version_from_source(source: str): + try: + tree = ast.parse(source) + except SyntaxError: + return None + + for node in tree.body: + if isinstance(node, ast.Assign): + for target in node.targets: + if isinstance(target, ast.Name) and target.id == "__version__": + try: + return ast.literal_eval(node.value) + except (ValueError, SyntaxError): + return None + return None WEIGHTS = { "inline.token_obtainment": 15, @@ -236,9 +258,11 @@ class ModuleContentBuilder: module_info: Dict[str, Any], query: str, filters: Dict[str, List[str]], + url: str, include_categories: bool = True, module_path: Optional[str] = None, lang: str = "en", + ) -> tuple: """ Build complete formatted module content. @@ -260,39 +284,48 @@ class ModuleContentBuilder: categories_text = self._build_categories_text(filters) commands = self.formatter.format_commands(module_info, lang) - header = self._build_header(query, name, description, dev_username, module_path) + header = self._build_header(query, name, description, dev_username, module_path, url) footer = self._build_footer(module_path) body_pages = self._paginate_commands(commands) return header, body_pages, footer, categories_text - def _build_header(self, query: str, name: str, description: str, dev_username: str, module_path: Optional[str]) -> str: + def _build_header(self, query: str, name: str, description: str, dev_username: str, module_path: Optional[str], url: str) -> str: """Build message header with module info and tags.""" tags_list = self.repository.get_tags_for_module(module_path) if module_path else [] tags_text = ", ".join(self.strings["tags"].get(tag, tag) for tag in tags_list) header_template = self.strings["found_header"] if not tags_text: + # Replace the tags line but keep blockquote closure at the end header_template = header_template.replace( - "
🏷 Tags: {tags}
\n\n", - "" + "\n\n🏷 Tags: {tags}\n\n", + "\n\n" ) header_template = header_template.replace( - "
🏷 Теги: {tags}
\n\n", - "" + "\n\n🏷 Теги: {tags}\n\n", + "\n\n" ) - return header_template.format( + header = header_template.format( query=html.escape(query), name=name, description=description, username=dev_username, tags=tags_text, + module_path=module_path, + url=url ) + + # Remove extra newlines + header = re.sub(r'\n+', '\n', header) + + return header def _build_footer(self, module_path: Optional[str]) -> str: """Build message footer with download command.""" clean_path = (module_path or "").replace("\\", "/") + return "" # unused for now, but may be used in the future for additional info return self.strings["found_footer"].format( url=html.escape(self.formatter.strings.get("limokaurl", "")), module_path=html.escape(clean_path), @@ -351,22 +384,20 @@ class Limoka(loader.Module): "found_header": ( "
🔍 Found module {name} " "by query: {query}\n\n" + "🌐 Source\n" "ℹ️ Description: {description}\n" "🧑‍💻 Developer: {username}\n\n" - "
🏷 Tags: {tags}
\n\n" + "🏷 Tags: {tags}
\n\n" ), "found_body": ("{commands}"), - "found_footer": ( - "
\n🪄 {prefix}dlm {url}{module_path}
" - ), "caption_short": ( "
🔍 {safe_name}\n" + "🌐 Source\n" "ℹ️ Description: {safe_desc}\n" - "🧑‍💻 Dev: {dev_username}\n" - "🪄 {prefix}dlm {module_path}
" + "🧑‍💻 Dev: {dev_username}\n" ), - "command_template": "{emoji} {prefix}{command} — {description}\n", - "inline_handler_template": "{inline_bot} {command} — {description}\n", + "command_template": "
{emoji} {prefix}{command} — {description}
\n", + "inline_handler_template": "
{inline_bot} {command} — {description}
\n", "emojis": { 1: "1️⃣", 2: "2️⃣", @@ -414,6 +445,7 @@ class Limoka(loader.Module): "global_button": "🌍 Results", "filtered_button": "🏷️ Filtered search", "inline_search": "🔍 Search in Limoka", + "install_button": "🪄 Install", "inline_no_results": "
❌ No modules found
", "inline_error": "
❌ Search error occurred
", "inline_short_query": "
❌ Query too short (min 2 chars)
", @@ -450,7 +482,13 @@ class Limoka(loader.Module): "If error persists again, report to developers" ), "body_page": "Commands", - "limokaurl": "https://raw.githubusercontent.com/MuRuLOSE/limoka/refs/heads/main/" + "install_failed": "Installation failed. Check logs for details.", + "install_succeeded": "Module installed successfully!", + "update_available": ( + "🔔 New update available!\n\n" + "New Limoka Version {version} already available. Please update for better performance, bug fixes, and new features.\n" + "Press the button below to update the module." + ), } strings_ru = { "name": "Limoka", @@ -461,23 +499,21 @@ class Limoka(loader.Module): ), "found_header": ( "
🔍 Найден модуль {name} " - "по запросу: {query}
\n\n" - "
ℹ️ Описание: {description}
\n" - "
🧑‍💻 Разработчик: {username}
\n\n" - "
🏷 Теги: {tags}
\n\n" + "по запросу: {query}\n\n" + "🌐 Исходный код\n" + "ℹ️ Описание: {description}\n" + "🧑‍💻 Разработчик: {username}\n\n" + "🏷 Теги: {tags}\n\n" ), "found_body": ("{commands}"), - "found_footer": ( - "\n
🪄 {prefix}dlm {url}{module_path}
" - ), "caption_short": ( "
🔍 {safe_name}\n" + "🌐 Исходный код\n" "ℹ️ Описание: {safe_desc}\n" - "🧑‍💻 Разработчик: {dev_username}\n" - "🪄 {prefix}dlm {module_path}
" + "🧑‍💻 Разработчик: {dev_username}\n" ), "command_template": "
{emoji} {prefix}{command} — {description}
\n", - "inline_handler_template": "{inline_bot} {command} — {description}\n", + "inline_handler_template": "
{inline_bot} {command} — {description}
\n", "emojis": { 1: "1️⃣", 2: "2️⃣", @@ -530,6 +566,7 @@ class Limoka(loader.Module): "global_button": "🌍 Результаты", "filtered_button": "🏷️ Поиск с фильтрами", "inline_search": "🔍 Поиск в Limoka", + "install_button": "🪄 Установить", "inline_no_results": "
❌ Модули не найдены
", "inline_error": "
❌ Ошибка поиска
", "inline_short_query": "
❌ Запрос слишком короткий (мин. 2 символа)
", @@ -566,6 +603,15 @@ class Limoka(loader.Module): "Если ошибка сохраняется снова, сообщите разработчикам" ), "body_page": "Команды", + "install_failed": "Установка не удалась. Проверьте логи для деталей.", + "install_succeeded": "Модуль успешно установлен!", + "update_available": ( + "🔔 Доступно новое обновление!\n\n" + "Новая версия Limoka {version} уже доступна. Пожалуйста, обновитесь для лучшей производительности, исправления багов и новых функций.\n" + "Нажмите кнопку ниже, чтобы обновить модуль." + ), + "no_updates_available": "Нет доступных обновлений. У вас установлена последняя версия Limoka.", + "module_update_available": "Уведомление об обновлении модуля было отправлено, проверьте {bot}.", "_cls_doc": "Модули теперь в одном месте с простым и удобным поиском!", } @@ -590,11 +636,18 @@ class Limoka(loader.Module): lambda: "If enabled, modules from developers with newbies tag will be not shown.", validator=loader.validators.Boolean(), ), + loader.ConfigValue( + "auto_update_check", + True, + lambda: "If enabled, Limoka will periodically check for updates and notify you when a new version is available.", + validator=loader.validators.Boolean(), + ) ) self.name = self.strings["name"] self._invalid_banners = set() self._bot_username = "limoka_bbot" self._base_url = self.config["limokaurl"] + self._self_bot_username = None self.SEARCH_STATES = { "no_banner": "no_banner", @@ -613,6 +666,35 @@ class Limoka(loader.Module): def _filter_newbies(self, modules: Dict[str, Any]) -> Dict[str, Any]: """[DEPRECATED] Use ModuleRepository.apply_newbie_filter instead.""" return self.repository.apply_newbie_filter(self.config.get("filter_newbies_modules", False)) + + @loader.loop(interval=3600*24) + async def periodic_update_check(self): + """Periodically check for module updates if auto_update_check is enabled.""" + if self.config["auto_update_check"]: + await self.check_for_module_update() + + async def check_for_module_update(self): + async with aiohttp.ClientSession() as session: + try: + async with session.get(self._base_url + "Limoka.py", timeout=10) as response: + if response.status == 200: + version = _parse_version_from_source(await response.text()) + if version is not None and version > __version__: + # TODO: TODO + await self.inline.bot.send_message( + self._tg_id, + self.strings["update_available"].format(version=version), + reply_markup=InlineKeyboardMarkup().add(InlineKeyboardButton("Update Now", callback_data="limoka:update_module")) + ) + return True + return False + except Exception as e: + logger.error(f"Error checking for module update: {e}") + + @loader.callback_handler() + async def callback_handler(self, call: BotInlineCall): + if call.data == "limoka:update_module": + await self._install_module_limoka(call) def _create_search_session( self, @@ -668,8 +750,8 @@ class Limoka(loader.Module): self.repository = ModuleRepository(raw_modules, repositories) self.modules = self.repository.apply_newbie_filter(self.config["filter_newbies_modules"]) - self._userbot_bot_username = (await self.inline.bot.get_me()).username - self.formatter = CommandFormatter(self.strings, self._userbot_bot_username, self.get_prefix()) + self._self_bot_username = (await self.inline.bot.get_me()).username + self.formatter = CommandFormatter(self.strings, self._self_bot_username, self.get_prefix()) self.content_builder = ModuleContentBuilder(self.strings, self.formatter, self.repository) self._service_bot_id = (await self.client.get_entity(self._bot_username)).id @@ -805,13 +887,15 @@ class Limoka(loader.Module): module_info: Dict[str, Any], query: str, filters: Dict[str, List[str]], + url: str, include_categories: bool = True, module_path: Optional[str] = None, - lang: str = "en", + lang: str = "en" + ) -> tuple: """[DEPRECATED] Use ModuleContentBuilder.build_content instead.""" return self.content_builder.build_content( - module_info, query, filters, include_categories, module_path, lang + module_info, query, filters, url, include_categories, module_path, lang ) def _build_navigation_markup(self, session: Dict[str, Any]) -> list: @@ -951,6 +1035,15 @@ class Limoka(loader.Module): }, ] ) + markup.append( + [ + { + "text": self.strings["install_button"], + "callback": self._install_module, + "args": (session,), + }, + ] + ) markup.append( [{"text": self.strings.get("close", "❌ Close"), "action": "close", "style": "danger"}] ) @@ -1024,6 +1117,7 @@ class Limoka(loader.Module): include_categories=True, module_path=module_path, lang=lang, + url=self._base_url ) current_body = body_pages[min(page_body, len(body_pages) - 1)] full_message = header + current_body + footer + categories_text @@ -1070,12 +1164,38 @@ class Limoka(loader.Module): include_categories=True, module_path=module_path, lang=self.user_lang, + url=self.config["limokaurl"] ) new_page_body = min(page_body + 1, len(body_pages) - 1) await self._display_module( call, module_info, module_path, session, page_body=new_page_body ) + async def _install_module(self, call: InlineCall, session: Dict[str, Any]): + try: + loader = self.lookup("Loader") + await loader.download_and_install(f"{self.config['limokaurl']}{session['results'][session['current_index']]}") + if getattr(loader, "fully_loaded", False): + loader.update_modules_in_db() + + except Exception: + await call.answer(f"❌ {self.strings['install_failed']}", alert=True) + else: + await call.answer(f"✅ {self.strings['install_succeeded']}", alert=True) + + async def _install_module_limoka(self, call: InlineCall): + try: + loader = self.lookup("Loader") + await loader.download_and_install(f"{self.config['limokaurl']}Limoka.py") + logger.info(f"Downloading and installing: {self.config['limokaurl']}Limoka.py") + if getattr(loader, "fully_loaded", False): + loader.update_modules_in_db() + + except Exception: + await call.answer(f"❌ {self.strings['install_failed']}", alert=True) + else: + await call.answer(f"✅ {self.strings['install_succeeded']}", alert=True) + async def _display_filter_menu(self, call: InlineCall, session: Dict[str, Any]): query = session["query"] current_filters = session["filters"] @@ -1589,6 +1709,15 @@ class Limoka(loader.Module): message, module_info, module_path, display_session, 0 ) + @loader.command(ru_doc="Проверить наличие обновлений модуля") + async def limokaupdatecmd(self, message: Message): + """Check for module updates""" + is_update_available = await self.check_for_module_update() + if is_update_available: + await utils.answer(message, self.strings["module_update_available"].format(bot=self._self_bot_username)) + else: + await utils.answer(message, self.strings["no_updates_available"]) + async def _show_global_form(self, call: InlineCall, message: Message): markup = [ [ @@ -1742,135 +1871,135 @@ class Limoka(loader.Module): self.strings["history"].format(history="\n".join(formatted_history)), ) - @loader.watcher(from_dl=False) - async def secure_install_watcher(self, message: Message): - if not message.text: - return - if not hasattr(message, "from_id") or not message.from_id: - return - sender_id = None - if hasattr(message.from_id, "user_id"): - sender_id = message.from_id.user_id - elif hasattr(message.from_id, "channel_id"): - sender_id = message.from_id.channel_id - if sender_id != self._service_bot_id: - # logger.debug("Message not from official bot, ignoring") - return - if not self.config["external_install_allowed"]: - return - try: - clean_text = ( - getattr(message, "raw_text", None) - or getattr(message, "message", None) - or message.text - or "" - ) - if message.entities: - from html import unescape + # @loader.watcher(from_dl=False) + # async def secure_install_watcher(self, message: Message): + # if not message.text: + # return + # if not hasattr(message, "from_id") or not message.from_id: + # return + # sender_id = None + # if hasattr(message.from_id, "user_id"): + # sender_id = message.from_id.user_id + # elif hasattr(message.from_id, "channel_id"): + # sender_id = message.from_id.channel_id + # if sender_id != self._service_bot_id: + # # logger.debug("Message not from official bot, ignoring") + # return + # if not self.config["external_install_allowed"]: + # return + # try: + # clean_text = ( + # getattr(message, "raw_text", None) + # or getattr(message, "message", None) + # or message.text + # or "" + # ) + # if message.entities: + # from html import unescape - clean_text = unescape(clean_text) - clean_text = re.sub(r"<[^>]+>", "", clean_text) - match = re.search(r"#limoka:([^\s\"'<>]+)", clean_text) - if not match: - logger.debug( - "No #limoka tag found in cleaned text; leaving original message intact" - ) - return - tag_content = match.group(1) - parts = tag_content.split(":", 1) - if len(parts) != 2: - logger.error("Invalid tag format after cleaning") - await utils.answer(message, self.strings["watcher_invalid_format"]) - return - module_path, signature_hex = parts - module_path = re.sub(r"[<>\"']", "", module_path).strip() - if module_path.startswith("href="): - module_path = module_path[5:].strip('"').strip("'") - if module_path not in self.modules: - found = False - for db_path in self.modules.keys(): - if module_path in db_path or db_path in module_path: - module_path = db_path - found = True - break - if not found: - logger.warning(f"Module not found after cleanup: {module_path}") - await utils.answer( - message, - self.strings["watcher_module_not_found"].format( - path=html.escape(module_path) - ), - ) - return - try: - import base64 - from cryptography.hazmat.primitives.asymmetric import ed25519 + # clean_text = unescape(clean_text) + # clean_text = re.sub(r"<[^>]+>", "", clean_text) + # match = re.search(r"#limoka:([^\s\"'<>]+)", clean_text) + # if not match: + # logger.debug( + # "No #limoka tag found in cleaned text; leaving original message intact" + # ) + # return + # tag_content = match.group(1) + # parts = tag_content.split(":", 1) + # if len(parts) != 2: + # logger.error("Invalid tag format after cleaning") + # await utils.answer(message, self.strings["watcher_invalid_format"]) + # return + # module_path, signature_hex = parts + # module_path = re.sub(r"[<>\"']", "", module_path).strip() + # if module_path.startswith("href="): + # module_path = module_path[5:].strip('"').strip("'") + # if module_path not in self.modules: + # found = False + # for db_path in self.modules.keys(): + # if module_path in db_path or db_path in module_path: + # module_path = db_path + # found = True + # break + # if not found: + # logger.warning(f"Module not found after cleanup: {module_path}") + # await utils.answer( + # message, + # self.strings["watcher_module_not_found"].format( + # path=html.escape(module_path) + # ), + # ) + # return + # try: + # import base64 + # from cryptography.hazmat.primitives.asymmetric import ed25519 - PUB_KEY_B64 = ( - "MCowBQYDK2VwAyEA1ltSnqtf3pGBuctuAYqHivCXsaRtKOVxavai7yin7ZE=" - ) - der_bytes = base64.b64decode(PUB_KEY_B64) - raw_pubkey = der_bytes[-32:] - module_url = self.config["limokaurl"] + module_path - async with aiohttp.ClientSession() as session: - async with session.get(module_url, timeout=10) as resp: - if resp.status != 200: - logger.error( - f"Failed to fetch module for verification: {module_url} (HTTP {resp.status})" - ) - await utils.answer( - message, self.strings["watcher_loader_missing"] - ) - return - module_bytes = await resp.read() - sha256 = hashlib.sha256(module_bytes).hexdigest() - public_key = ed25519.Ed25519PublicKey.from_public_bytes( - raw_pubkey - ) - signature = bytes.fromhex(signature_hex) - signed_payload = f"{module_path}|{sha256}".encode() - public_key.verify(signature, signed_payload) - except Exception as e: - logger.error(f"Signature verification failed for {module_path}: {e}") - await utils.answer(message, self.strings["watcher_signature_invalid"]) - return - loader_mod = self.lookup("loader") - if not loader_mod: - logger.error("Loader module not found") - await utils.answer(message, self.strings["watcher_loader_missing"]) - return - module_url = self.config["limokaurl"] + module_path - status = await loader_mod.download_and_install(module_url, None) - if getattr(loader_mod, "fully_loaded", False): - loader_mod.update_modules_in_db() - try: - await message.delete() - except Exception as e: - logger.error(f"Failed to delete message: {e}") - if status: - try: - bot_peer = await self.client.get_entity(self._service_bot_id) - await self.client.send_message( - bot_peer, f"#limoka:sucsess:{message.id}" - ) - except Exception as e: - logger.error(f"Failed to send success confirmation: {e}") - else: - logger.error(f"Installation failed with status: {status}") - try: - bot_peer = await self.client.get_entity(self._service_bot_id) - await self.client.send_message( - bot_peer, f"#limoka:failed:{message.id}" - ) - except Exception as e: - logger.error(f"Failed to send failure notification: {e}") - except Exception as e: - logger.exception(f"CRITICAL ERROR in secure_install_watcher: {e}") - try: - await utils.answer( - message, self.strings["watcher_critical"].format(error=str(e)[:100]) - ) - await asyncio.sleep(5) - await message.delete() - except Exception: - pass \ No newline at end of file + # PUB_KEY_B64 = ( + # "MCowBQYDK2VwAyEA1ltSnqtf3pGBuctuAYqHivCXsaRtKOVxavai7yin7ZE=" + # ) + # der_bytes = base64.b64decode(PUB_KEY_B64) + # raw_pubkey = der_bytes[-32:] + # module_url = self.config["limokaurl"] + module_path + # async with aiohttp.ClientSession() as session: + # async with session.get(module_url, timeout=10) as resp: + # if resp.status != 200: + # logger.error( + # f"Failed to fetch module for verification: {module_url} (HTTP {resp.status})" + # ) + # await utils.answer( + # message, self.strings["watcher_loader_missing"] + # ) + # return + # module_bytes = await resp.read() + # sha256 = hashlib.sha256(module_bytes).hexdigest() + # public_key = ed25519.Ed25519PublicKey.from_public_bytes( + # raw_pubkey + # ) + # signature = bytes.fromhex(signature_hex) + # signed_payload = f"{module_path}|{sha256}".encode() + # public_key.verify(signature, signed_payload) + # except Exception as e: + # logger.error(f"Signature verification failed for {module_path}: {e}") + # await utils.answer(message, self.strings["watcher_signature_invalid"]) + # return + # loader_mod = self.lookup("loader") + # if not loader_mod: + # logger.error("Loader module not found") + # await utils.answer(message, self.strings["watcher_loader_missing"]) + # return + # module_url = self.config["limokaurl"] + module_path + # status = await loader_mod.download_and_install(module_url, None) + # if getattr(loader_mod, "fully_loaded", False): + # loader_mod.update_modules_in_db() + # try: + # await message.delete() + # except Exception as e: + # logger.error(f"Failed to delete message: {e}") + # if status: + # try: + # bot_peer = await self.client.get_entity(self._service_bot_id) + # await self.client.send_message( + # bot_peer, f"#limoka:sucsess:{message.id}" + # ) + # except Exception as e: + # logger.error(f"Failed to send success confirmation: {e}") + # else: + # logger.error(f"Installation failed with status: {status}") + # try: + # bot_peer = await self.client.get_entity(self._service_bot_id) + # await self.client.send_message( + # bot_peer, f"#limoka:failed:{message.id}" + # ) + # except Exception as e: + # logger.error(f"Failed to send failure notification: {e}") + # except Exception as e: + # logger.exception(f"CRITICAL ERROR in secure_install_watcher: {e}") + # try: + # await utils.answer( + # message, self.strings["watcher_critical"].format(error=str(e)[:100]) + # ) + # await asyncio.sleep(5) + # await message.delete() + # except Exception: + # pass \ No newline at end of file