diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index b8f6d93..0405e92 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -206,61 +206,53 @@ jobs:
echo "Branch ${{ env.BRANCH_NAME }} does not exist in remote repository, skipping PR creation."
fi
+ notify_diffs:
+ runs-on: ubuntu-latest
+ if: |
+ (github.event_name == 'push' && github.ref == 'refs/heads/main') ||
+ (github.event_name == 'pull_request' && github.event.action == 'closed' && github.event.pull_request.merged == true)
+ needs: parse
+ steps:
+ - name: Checkout repository
+ uses: actions/checkout@v4
+ with:
+ fetch-depth: ${{ env.GIT_DEPTH }}
+ - name: Configure Git for github-actions[bot]
+ run: |
+ git config --global user.email "github-actions[bot]@users.noreply.github.com"
+ git config --global user.name "github-actions[bot]"
+ - name: Set up Python 3.9
+ uses: actions/setup-python@v5
+ with:
+ python-version: '3.9'
+ - name: Install Python dependencies
+ run: pip install aiohttp
+ - name: Send module diffs to channel
+ env:
+ TELEGRAM_BOT_TOKEN: ${{ secrets.TELEGRAM_BOT_TOKEN }}
+ TELEGRAM_CHAT_ID: ${{ secrets.TELEGRAM_CHAT_ID_UPDATE }}
+ run: |
+ git fetch origin main
+ python3 update_diffs.py --token ${{ secrets.TELEGRAM_BOT_TOKEN }} --chat_id ${{ secrets.TELEGRAM_CHAT_ID_UPDATE }} --base_commit HEAD~1
+
backup:
runs-on: ubuntu-latest
if: github.ref == 'refs/heads/main' || github.event_name == 'workflow_dispatch'
needs: parse
steps:
- - name: Configure Git for github-actions[bot]
- run: |
- git config --global user.email "github-actions[bot]@users.noreply.github.com"
- git config --global user.name "github-actions[bot]"
- git config --global --list
- name: Checkout repository
uses: actions/checkout@v4
with:
fetch-depth: ${{ env.GIT_DEPTH }}
- - name: Create and send backup to Telegram
+ - name: Set up Python 3.9
+ uses: actions/setup-python@v5
+ with:
+ python-version: '3.9'
+ - name: Install Python dependencies
+ run: pip install aiohttp
+ - name: Run backup script
env:
TELEGRAM_BOT_TOKEN: ${{ secrets.TELEGRAM_BOT_TOKEN }}
TELEGRAM_CHAT_ID: ${{ secrets.TELEGRAM_CHAT_ID }}
run: |
- echo "Creating .zip file of the repository with maximum compression..."
- git archive --format=zip --output=repository-original.zip HEAD
- zip -9 repository.zip repository-original.zip
- rm repository-original.zip
- echo "File size of the created .zip file:"
- du -sh repository.zip
- echo "Splitting the .zip file into 8 parts..."
- split -b 49M repository.zip repository-part-
- echo "Files created after split:"
- ls repository-part-*
- COMMIT_MESSAGE="$(git log -1 --pretty=%B)"
- COMMIT_DATE="$(date --date="$(git log -1 --pretty=%ci)" +'%Y-%m-%d %H:%M:%S')"
- COMMIT_HASH="$(git rev-parse --short=6 HEAD)"
- COMMIT_URL="https://${REPO_URL}/commit/$(git rev-parse HEAD)"
- MESSAGE="Commit Date: $COMMIT_DATE, Commit Message: $COMMIT_MESSAGE, Commit Hash: [\`$COMMIT_HASH\`]($COMMIT_URL)"
-
- echo "Sending .zip file parts to Telegram..."
-
- FIRST_PART=true
- for part in $(ls repository-part-* | sort); do
- if $FIRST_PART; then
- TELEGRAM_API_URL="https://api.telegram.org/bot$TELEGRAM_BOT_TOKEN/sendDocument"
- echo "Sending first file to Telegram: $TELEGRAM_API_URL"
- curl -X POST "$TELEGRAM_API_URL" \
- -F chat_id=$TELEGRAM_CHAT_ID \
- -F document=@$part \
- -F caption="$MESSAGE" \
- -F parse_mode="Markdown"
- FIRST_PART=false
- else
- TELEGRAM_API_URL="https://api.telegram.org/bot$TELEGRAM_BOT_TOKEN/sendDocument"
- echo "Sending file to Telegram: $TELEGRAM_API_URL"
- curl -X POST "$TELEGRAM_API_URL" \
- -F chat_id=$TELEGRAM_CHAT_ID \
- -F document=@$part
- fi
- done
-
- echo "Files sent to Telegram successfully!"
+ python backup.py --token ${{ secrets.TELEGRAM_BOT_TOKEN }} --chat_id ${{ secrets.TELEGRAM_CHAT_ID }}
diff --git a/Limoka.py b/Limoka.py
index abd819f..586f4cb 100644
--- a/Limoka.py
+++ b/Limoka.py
@@ -1,65 +1,38 @@
# meta developer: @limokanews
# requires: whoosh cryptography
-# Limoka search module.
-
-# This module loads a remote `modules.json`, builds a Whoosh index and
-# exposes inline and chat commands to search and display module
-# information. It handles remote banner validation and falls back to an
-# external PNG hosted in the repository when a module banner is missing.
-# The fallback is provided as a URL (`self._fallback_banner_url`). Depending
-# on the client library the `photo` parameter may accept a URL, a file
-# path or a file-like object; this implementation prefers using the
-# external URL for the fallback.
-
-# Note: Expected `modules.json` record format:
-
-# {
-# "path/to/module.py": {
-# "name": "ModuleName",
-# "description": "Short description",
-# "meta": {"banner": "https://.../image.png", "developer": "@dev"},
-# "commands": [{"cmd1": "desc1"}, {"cmd2": "desc2"}],
-# "category": ["fun", "tools"]
-# }
-# }
-# Whoosh index in `userbotFolder/limoka_search/index`.
-
from whoosh.index import create_in, open_dir
from whoosh.fields import Schema, TEXT, ID
from whoosh.qparser import QueryParser, OrGroup
from whoosh.query import FuzzyTerm, Wildcard
-
import aiohttp
import random
import logging
import os
import html
import json
-
import re
-
import asyncio
-
from typing import Union, List, Dict, Any, Optional
import hashlib
-
from telethon.types import Message
from telethon.errors.rpcerrorlist import WebpageMediaEmptyError
-
+from telethon import TelegramClient
+from telethon.errors.rpcerrorlist import YouBlockedUserError
try:
from aiogram.utils.exceptions import BadRequest
except ImportError:
from aiogram.exceptions import TelegramBadRequest as BadRequest
-
from .. import utils, loader
from ..types import InlineCall
-
logger = logging.getLogger("Limoka")
+__version__ = (1, 3, 1)
-__version__ = (1, 3, 0)
-
+def _get_lang_value(data: Dict[str, Any], lang: str) -> str:
+ if not isinstance(data, dict):
+ return str(data) if data else ""
+ return data.get(lang, data.get("default", data.get("en", "")))
class Search:
def __init__(self, query, ix):
@@ -75,47 +48,51 @@ class Search:
query = parser.parse(self.query)
wildcard_query = Wildcard("content", f"*{self.query}*")
fuzzy_query = FuzzyTerm("content", self.query, maxdist=2, prefixlength=1)
-
for search_query in [query, wildcard_query, fuzzy_query]:
results = searcher.search(search_query)
if results:
return list(set(result["path"] for result in results))
- return []
-
+ return []
class LimokaAPI:
- async def get_all_modules(self, url):
+ async def fetch_json(self, base_url, path):
+ url = f"{base_url}{path}"
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
return json.loads(await response.text())
-
@loader.tds
class Limoka(loader.Module):
"""Modules are now in one place with easy searching!"""
-
strings = {
"name": "Limoka",
"wait": (
"Just wait\n"
"🔍 A search is underway among {count} modules "
- "for the query: {query}\n\n{fact}"
+ "for the query: {query}\n"
+ "{fact}"
),
- "found": (
+ "found_header": (
"🔍 Found module {name} "
"by query: {query}\n\n"
"ℹ️ Description: {description}\n"
"🧑💻 Developer: {username}\n\n"
- "{commands}\n"
- "🪄 {prefix}dlm {url}{module_path}"
+ "🏷 Tags: {tags}\n\n"
+ ),
+ "found_body": (
+ "{commands}"
+ ),
+ "found_footer": (
+ "\n🪄 {prefix}dlm {url}{module_path}"
),
"caption_short": (
"🔍 {safe_name}\n"
"ℹ️ Description: {safe_desc}\n"
- "🧑💻 Dev: {dev_username}\n\n"
+ "🧑💻 Dev: {dev_username}\n"
"🪄 {prefix}dlm {module_path}"
),
"command_template": "{emoji} {prefix}{command} — {description}\n",
+ "inline_handler_template": "{inline_bot} {command} — {description}\n",
"emojis": {
1: "1️⃣",
2: "2️⃣",
@@ -168,13 +145,13 @@ class Limoka(loader.Module):
"inline_short_query": "❌ Query too short (min 2 chars)",
"inline_switch_pm": "💬 Open in chat",
"inline_switch_pm_text": "🔍 Results for: {query}",
- "inline_start_message": "🔍 Limoka Search\n\nType module name or keyword",
+ "inline_start_message": "🔍 Limoka Search\nType module name or keyword",
"first_page": "This is the first page!",
"last_page": "This is the last page!",
"display_error": "Error displaying module. Please try again.",
"error_occurred": "An error occurred. Please try again.",
- "start_search_form": "🔍 Limoka Search\n\nEnter your query to search for modules:",
- "global_search_form": "🔍 Global Search\n\nEnter your query to search ALL modules without filters:",
+ "start_search_form": "🔍 Limoka Search\nEnter your query to search for modules:",
+ "global_search_form": "🔍 Global Search\nEnter your query to search ALL modules without filters:",
"history_cleared": "🧹 Search history cleared!",
"invalid_history_arg": "❌ Invalid argument for history command. Use:\n.lshistory - show history\n.lshistory clear - clear history",
"close": "❌ Close",
@@ -184,30 +161,42 @@ class Limoka(loader.Module):
"watcher_loader_missing": "❌ Loader module not found.",
"watcher_module_not_found": "❌ Module not found in Limoka database: {path}",
"watcher_critical": "❌ Critical error: {error}",
+ "tags": {
+ "herokutrusted": "Heroku Trusted",
+ "hikkatrusted": "Hikka Trusted",
+ "nonactive": "Non-Active Repository",
+ "nonlongermaintained": "No Longer Maintained Repository",
+ "newbie": "Newbie"
+ }
}
-
strings_ru = {
"name": "Limoka",
"wait": (
"Подождите\n"
- "🔍 Идёт поиск среди {count} модулей по запросу: {query}\n\n"
+ "🔍 Идёт поиск среди {count} модулей по запросу: {query}\n"
"{fact}"
),
- "found": (
+ "found_header": (
"🔍 Найден модуль {name} "
"по запросу: {query}\n\n"
"ℹ️ Описание: {description}\n"
"🧑💻 Разработчик: {username}\n\n"
- "{commands}\n"
- "🪄 {prefix}dlm {url}{module_path}"
+ "🏷 Теги: {tags}\n\n"
+ ),
+ "found_body": (
+ "{commands}"
+ ),
+ "found_footer": (
+ "\n🪄 {prefix}dlm {url}{module_path}"
),
"caption_short": (
"🔍 {safe_name}\n"
"ℹ️ Описание: {safe_desc}\n"
- "🧑💻 Разработчик: {dev_username}\n\n"
+ "🧑💻 Разработчик: {dev_username}\n"
"🪄 {prefix}dlm {module_path}"
),
"command_template": "{emoji} {prefix}{command} — {description}\n",
+ "inline_handler_template": "{inline_bot} {command} — {description}\n",
"emojis": {
1: "1️⃣",
2: "2️⃣",
@@ -227,8 +216,8 @@ class Limoka(loader.Module):
"🛡 Каталог Limoka тщательно модерируется!",
"🚀 Limoka позволяет искать модули с невероятной скоростью!",
(
- "🔎 Limoka имеет лучший поиск*!"
- "\n * В сравнении с предыдущей версией Limoka"
+ "🔎 Limoka имеет лучший поиск*!\n"
+ "* В сравнении с предыдущей версией Limoka"
)
],
"inline404": "Не найдено",
@@ -264,13 +253,13 @@ class Limoka(loader.Module):
"inline_short_query": "❌ Запрос слишком короткий (мин. 2 символа)",
"inline_switch_pm": "💬 Открыть в чате",
"inline_switch_pm_text": "🔍 Результаты для: {query}",
- "inline_start_message": "🔍 Limoka Поиск\n\nВведите название модуля или ключевое слово",
+ "inline_start_message": "🔍 Limoka Поиск\nВведите название модуля или ключевое слово",
"first_page": "Это первая страница!",
"last_page": "Это последняя страница!",
"display_error": "Ошибка отображения модуля. Пожалуйста, попробуйте еще раз.",
"error_occurred": "Произошла ошибка. Пожалуйста, попробуйте еще раз.",
- "start_search_form": "🔍 Limoka Поиск\n\nВведите ваш запрос для поиска модулей:",
- "global_search_form": "🔍 Глобальный Поиск\n\nВведите запрос для поиска ВСЕХ модулей без фильтров:",
+ "start_search_form": "🔍 Limoka Поиск\nВведите ваш запрос для поиска модулей:",
+ "global_search_form": "🔍 Глобальный Поиск\nВведите запрос для поиска ВСЕХ модулей без фильтров:",
"history_cleared": "🧹 История поиска очищена!",
"invalid_history_arg": "❌ Неверный аргумент для команды истории. Используйте:\n.lshistory - показать историю\n.lshistory clear - очистить историю",
"close": "❌ Закрыть",
@@ -280,6 +269,13 @@ class Limoka(loader.Module):
"watcher_loader_missing": "❌ Модуль загрузчика не найден.",
"watcher_module_not_found": "❌ Модуль не найден в базе Limoka: {path}",
"watcher_critical": "❌ Критическая ошибка: {error}",
+ "tags": {
+ "herokutrusted": "Heroku Trusted",
+ "hikkatrusted": "Hikka Trusted",
+ "nonactive": "Неактивный репозиторий",
+ "nonlongermaintained": "Неподдерживаемый репозиторий",
+ "newbie": "Новичок"
+ },
"_cls_doc": "Модули теперь в одном месте с простым и удобным поиском!",
}
@@ -297,44 +293,153 @@ class Limoka(loader.Module):
True,
lambda: "If enabled, module installation can be handled via external Limoka bot (@limoka_bbot) for better reliability.",
validator=loader.validators.Boolean(),
- )
+ ),
+ loader.ConfigValue(
+ "filter_newbies_modules",
+ False,
+ lambda: "If enabled, modules from developers with newbies tag will be not shown.",
+ validator=loader.validators.Boolean(),
+ ),
)
self.name = self.strings["name"]
self._invalid_banners = set()
- # Also keep a convenient external fallback URL for plain search display
- # (used when no valid banner is available and no filters are applied).
- self._fallback_banner_url = "https://raw.githubusercontent.com/MuRuLOSE/limoka/refs/heads/main/assets/limoka404.png"
+ self._service_bot_id = 8581621390
+ self._base_url = self.config["limokaurl"]
+
+ # Search session states
+ self.SEARCH_STATES = {
+ "no_banner": "no_banner", # 404 - Нет баннера
+ "global_search": "global_search", # Глобальный поиск
+ "not_found": "not_found", # Не найдено (модуль)
+ "filter_select": "filter_select", # Выбор категорий (фильтров)
+ }
+
+ # State banners - placeholders for now
+ self.state_banners = {
+ "no_banner": "https://raw.githubusercontent.com/MuRuLOSE/hikka-assets/refs/heads/main/Limoka%20-%20No%20banner.png",
+ "global_search": "https://raw.githubusercontent.com/MuRuLOSE/hikka-assets/main/Limoka%20-%20Global%20Search.png",
+ "not_found": "https://raw.githubusercontent.com/MuRuLOSE/hikka-assets/main/Limoka%20-%20Not%20Found.png",
+ "filter_select": "https://raw.githubusercontent.com/MuRuLOSE/hikka-assets/main/Limoka%20-%20Categories.png",
+ }
+
+ def _filter_newbies(self, modules: Dict[str, Any]) -> Dict[str, Any]:
+ """Filter out modules which belong to repositories tagged as 'newbie'.
+ Returns the original dict when the feature is disabled or repositories
+ metadata is not available.
+ """
+ try:
+ if not self.config.get("filter_newbies_modules"):
+ return modules
+ except Exception:
+ return modules
+
+ if not getattr(self, "repositories", None):
+ return modules
+
+ filtered: Dict[str, Any] = {}
+ for path, info in modules.items():
+ repo_key = "/".join(path.split("/")[:2]) if "/" in path else path
+ repo = self.repositories.get(repo_key)
+ tags = repo.get("tags", []) if repo else []
+ if "newbie" in tags:
+ continue
+ filtered[path] = info
+ return filtered
+
+ def _create_search_session(
+ self,
+ state: str,
+ query: str = "",
+ filters: Optional[Dict[str, List[str]]] = None,
+ results: Optional[List[str]] = None,
+ current_index: int = 0,
+ ) -> Dict[str, Any]:
+ """Create a search session dictionary to track state across callbacks.
+
+ Args:
+ state: Current search state (one of SEARCH_STATES values)
+ query: Current search query
+ filters: Active category filters
+ results: Search results list
+ current_index: Index of current result being displayed
+ banner_url: Banner image URL for current state
+
+ Returns:
+ Dictionary containing the complete session state
+ """
+ return {
+ "state": state,
+ "query": query,
+ "filters": filters or {},
+ "results": results or [],
+ "current_index": current_index,
+ }
+
+ def _get_banner_for_state(self, state: str) -> str:
+ return self.state_banners.get(state)
async def client_ready(self, client, db):
- self.client = client
+ self.client: TelegramClient = client
self.db = db
self.api = LimokaAPI()
self.schema = Schema(
title=TEXT(stored=True), path=ID(stored=True), content=TEXT(stored=True)
)
-
os.makedirs("limoka_search", exist_ok=True)
if not os.path.exists("limoka_search/index"):
self.ix = create_in("limoka_search", self.schema)
else:
self.ix = open_dir("limoka_search")
-
self._history = self.pointer("history", [])
-
- self.modules = await self.api.get_all_modules(
- f"{self.config['limokaurl']}modules.json"
+ self.modules = await self.api.fetch_json(
+ self._base_url, "modules.json"
)
+ raw = (await self.api.fetch_json(
+ self._base_url, "repositories.json"
+ )).get("repositories", [])
+ self.repositories = {
+ repo["path"]: repo
+ for repo in raw
+ }
+ # Apply newbie filter if enabled
+ try:
+ self.modules = self._filter_newbies(self.modules)
+ except Exception:
+ pass
+ self._bot_username = (await self.client.get_entity(self._service_bot_id)).username
+ await self._update_index()
+ if self.config["external_install_allowed"]:
+ try:
+ message = await self.client.get_messages(self._bot_username, limit=1)
+ if not message:
+ message = await self.client.send_message(self._service_bot_id, "/start")
+ await message.delete()
+ except YouBlockedUserError:
+ logger.warning(f"Please unblock {self._bot_username} to enable external installation feature. Or disable external_install_allowed in Limoka settings to get rid of this message.")
+ self._userbot_bot_username = (await self.inline.bot.get_me()).username
+
+ @loader.loop(interval=3600)
+ async def _update_modules_loop(self):
+ self.modules = await self.api.fetch_json(
+ self._base_url, "modules.json"
+ )
+ # Re-apply newbie filter after modules refresh
+ try:
+ self.modules = self._filter_newbies(self.modules)
+ except Exception:
+ pass
await self._update_index()
async def _update_index(self):
writer = self.ix.writer()
- for module_path, module_data in self.modules.items():
+ modules_to_index = self._filter_newbies(self.modules)
+ for module_path, module_data in modules_to_index.items():
writer.add_document(
title=module_data["name"],
path=module_path,
- content=module_data["name"] + " " + (module_data["description"] or ""),
+ content=module_data["name"] + " " + (module_data.get("description") or "" + " " + ((module_data.get("meta").get("developer") or "") if module_data.get("meta") else "")),
)
- for func in module_data["commands"]:
+ for func in module_data.get("commands", []):
for command, description in func.items():
writer.add_document(
title=module_data["name"],
@@ -344,51 +449,33 @@ class Limoka(loader.Module):
writer.commit()
async def _validate_url(self, url: str) -> Optional[str]:
- """Validate a remote URL points to an image.
-
- Args:
- url: Remote URL to validate.
-
- Returns:
- The same URL if it points to an image and is reachable, otherwise
- ``None``.
-
- Side effects:
- Adds invalid URLs to ``self._invalid_banners`` to avoid repeated
- checks.
- """
- # Return the url if valid, otherwise None. Do not return or use
- # a global fallback here; fallback handling is done by the caller
- # based on display context.
if not url or url in self._invalid_banners:
return None
try:
async with aiohttp.ClientSession() as session:
- async with session.head(url, timeout=5) as response:
+ async with session.head(url, timeout=5, allow_redirects=True) as response:
if response.status != 200:
self._invalid_banners.add(url)
return None
- content_type = response.headers.get("Content-Type", "").lower()
- if not content_type.startswith("image/"):
+ ct = response.headers.get("Content-Type", "").lower()
+ if not ct.startswith("image/"):
self._invalid_banners.add(url)
return None
return url
- except (aiohttp.ClientError, asyncio.TimeoutError):
+ except Exception as e:
if url:
self._invalid_banners.add(url)
return None
+
+ def user_lang(self) -> str:
+ self.db.get("heroku.translations", "lang")
- def generate_commands(self, module_info):
+ def generate_commands(self, module_info, lang: str = "en"):
commands = []
- for i, func in enumerate(module_info["commands"], 1):
- if i > 9:
- commands.append("…\n")
- break
+ for i, func in enumerate(module_info.get("commands", []), 1):
for command, description in func.items():
emoji = self.strings["emojis"].get(i, "")
desc = description or self.strings["no_info"]
- if len(desc) > 150:
- desc = desc[:147] + "…"
commands.append(
self.strings["command_template"].format(
prefix=self.get_prefix(),
@@ -397,7 +484,18 @@ class Limoka(loader.Module):
description=html.escape(desc),
)
)
- return commands[:5]
+ for i, handler in enumerate(module_info.get("inline_handlers", []), 1):
+ name = handler.get("name", "")
+ desc_map = handler.get("description", {})
+ desc = _get_lang_value(desc_map, lang) or self.strings["no_info"]
+ commands.append(
+ self.strings["inline_handler_template"].format(
+ inline_bot=self._userbot_bot_username,
+ command=html.escape(name),
+ description=html.escape(desc),
+ )
+ )
+ return commands
def _format_module_content(
self,
@@ -406,98 +504,96 @@ class Limoka(loader.Module):
filters: Dict[str, List[str]],
include_categories: bool = True,
module_path: Optional[str] = None,
+ lang: str = "en",
) -> tuple:
- """Formats the module content for display."""
name = html.escape(module_info.get("name") or self.strings["no_info"])
+ cls_doc = module_info.get("cls_doc", {})
description = html.escape(
- module_info.get("description") or self.strings["no_info"]
+ _get_lang_value(cls_doc, lang) or
+ _get_lang_value(module_info.get("description", ""), lang) or
+ self.strings["no_info"]
)
dev_username = html.escape(module_info["meta"].get("developer", "Unknown"))
-
- # Prefer explicit module_path argument (caller provides the key),
- # otherwise fall back to module_info['path'] if present.
- raw_path = (
- module_path if module_path is not None else module_info.get("path", "")
- )
+ raw_path = module_path if module_path is not None else module_info.get("path", "")
clean_module_path = (raw_path or "").replace("\\", "/")
- commands = self.generate_commands(module_info)
-
+ commands = self.generate_commands(module_info, lang)
categories_text = ""
if include_categories:
categories = filters.get("category", [])
if categories:
- categories_text = "\n\n" + self.strings["selected_categories"].format(
+ categories_text = "\n" + self.strings["selected_categories"].format(
categories=", ".join(html.escape(c) for c in categories)
)
-
if len(description) > 300:
description = description[:297] + "…"
-
- core_message = self.strings["found"].format(
+ repo_key = "/".join(module_path.split("/")[:2]) if "/" in module_path else module_path
+ tags_list = []
+ for x in self.repositories:
+ if x == repo_key:
+ tags_list = self.repositories.get(x, {}).get("tags", [])
+ break
+ tags_text = ", ".join(self.strings["tags"].get(tag, tag) for tag in tags_list)
+ header = self.strings["found_header"].format(
query=html.escape(query),
name=name,
description=description,
- url=html.escape(self.config["limokaurl"]),
username=dev_username,
- commands="".join(commands),
- prefix=html.escape(self.get_prefix()),
- module_path=html.escape(clean_module_path),
+ tags=tags_text,
)
-
- full_message = core_message[:4096] + categories_text[:100]
-
- caption_message = full_message
- if len(caption_message) > 1024:
- safe_name = name[:40] + ("..." if len(name) > 40 else "")
- safe_desc = description[:100] + ("…" if len(description) > 100 else "")
-
- caption_message = self.strings["caption_short"].format(
- safe_name=safe_name,
- safe_desc=safe_desc,
- dev_username=dev_username,
- prefix=self.get_prefix(),
- module_path=html.escape(self.config["limokaurl"] + clean_module_path),
- )[:1024]
-
- if categories_text:
- remaining_space = 1024 - len(caption_message)
- if remaining_space > 0:
- caption_message += categories_text[:remaining_space]
-
- return caption_message, full_message
+ commands_text = "".join(commands)
+ if len(commands_text) <= 500:
+ body_pages = [commands_text] if commands_text else [""]
+ else:
+ body_pages = []
+ current_page = []
+ current_length = 0
+ for cmd in commands:
+ if current_length + len(cmd) > 500:
+ if current_page:
+ body_pages.append("".join(current_page))
+ current_page = []
+ current_length = 0
+ current_page.append(cmd)
+ current_length += len(cmd)
+ if current_page:
+ body_pages.append("".join(current_page))
+ if not body_pages:
+ body_pages = [""]
+ footer = self.strings["found_footer"].format(
+ url=html.escape(self.config["limokaurl"]),
+ module_path=html.escape(clean_module_path),
+ prefix=html.escape(self.get_prefix()),
+ )
+ return header, body_pages, footer, categories_text
def _build_navigation_markup(
- self, result: List[str], index: int, query: str, filters: Dict[str, List[str]]
+ self, session: Dict[str, Any]
) -> list:
- """Create navigation markup for inline results."""
+ result = session["results"]
+ index = session["current_index"]
+ query = session["query"]
+ filters = session["filters"]
+
page = index + 1
markup = [
[
{
"text": "⏪" if index > 0 else "🚫",
"callback": self._previous_page if index > 0 else self._inline_void,
- "args": (result, index, query, filters) if index > 0 else (),
+ "args": (session,) if index > 0 else (),
},
{"text": f"{page}/{len(result)}", "callback": self._inline_void},
{
"text": "⏩" if index + 1 < len(result) else "🚫",
- "callback": (
- self._next_page
- if index + 1 < len(result)
- else self._inline_void
- ),
- "args": (
- (result, index, query, filters)
- if index + 1 < len(result)
- else ()
- ),
+ "callback": self._next_page if index + 1 < len(result) else self._inline_void,
+ "args": (session,) if index + 1 < len(result) else (),
},
],
[
{
"text": "🔍 " + self.strings["filter_menu"].split(":")[0],
"callback": self._display_filter_menu,
- "args": (query, filters),
+ "args": (session,),
},
{
"text": "🔄 " + self.strings["change_query"],
@@ -508,11 +604,70 @@ class Limoka(loader.Module):
{
"text": self.strings["global_button"],
"callback": self._show_global_results,
- "args": (query,),
+ "args": (session,),
},
],
]
- # Add a universal close button to the navigation markup
+ markup.append(
+ [{"text": self.strings.get("close", "❌ Close"), "action": "close"}]
+ )
+ return markup
+
+ def _build_module_markup(
+ self, session: Dict[str, Any], body_pages: List[str], page_body: int, module_path: str
+ ) -> list:
+ result = session["results"]
+ index = session["current_index"]
+ query = session["query"]
+ filters = session["filters"]
+
+ markup = []
+ if len(body_pages) > 1:
+ markup.append([
+ {
+ "text": "◀️" if page_body > 0 else "🚫",
+ "callback": self._previous_body_page if page_body > 0 else self._inline_void,
+ "args": (session, module_path, page_body) if page_body > 0 else (),
+ },
+ {"text": f"Body {page_body + 1}/{len(body_pages)}", "callback": self._inline_void},
+ {
+ "text": "▶️" if page_body + 1 < len(body_pages) else "🚫",
+ "callback": self._next_body_page if page_body + 1 < len(body_pages) else self._inline_void,
+ "args": (session, module_path, page_body) if page_body + 1 < len(body_pages) else (),
+ },
+ ])
+ page = index + 1
+ markup.append([
+ {
+ "text": "⏪" if index > 0 else "🚫",
+ "callback": self._previous_page if index > 0 else self._inline_void,
+ "args": (session,) if index > 0 else (),
+ },
+ {"text": f"{page}/{len(result)}", "callback": self._inline_void},
+ {
+ "text": "⏩" if index + 1 < len(result) else "🚫",
+ "callback": self._next_page if index + 1 < len(result) else self._inline_void,
+ "args": (session,) if index + 1 < len(result) else (),
+ },
+ ])
+ markup.append([
+ {
+ "text": "🔍 " + self.strings["filter_menu"].split(":")[0],
+ "callback": self._display_filter_menu,
+ "args": (session,),
+ },
+ {
+ "text": "🔄 " + self.strings["change_query"],
+ "callback": self._enter_query,
+ },
+ ])
+ markup.append([
+ {
+ "text": self.strings["global_button"],
+ "callback": self._show_global_results,
+ "args": (session,),
+ },
+ ])
markup.append(
[{"text": self.strings.get("close", "❌ Close"), "action": "close"}]
)
@@ -525,15 +680,12 @@ class Limoka(loader.Module):
markup: list,
photo: Optional[Any] = None,
):
- """Safely display module information, handling potential errors."""
try:
if message_or_call is None:
logger.error("message_or_call is None in _safe_display")
return
-
if isinstance(message_or_call, Message):
if photo is not None:
- # photo can be a URL/str, file path or a file-like object
await self.inline.form(
text=text,
message=message_or_call,
@@ -546,9 +698,7 @@ class Limoka(loader.Module):
)
else:
if photo is not None:
- await message_or_call.edit(
- text=text, reply_markup=markup, photo=photo
- )
+ await message_or_call.edit(text=text, reply_markup=markup, photo=photo)
else:
await message_or_call.edit(text=text, reply_markup=markup)
except (BadRequest, WebpageMediaEmptyError) as e:
@@ -563,63 +713,37 @@ class Limoka(loader.Module):
message_or_call: Union[Message, InlineCall],
module_info: Dict[str, Any],
module_path: str,
- query: str,
- result: List[str],
- index: int,
- filters: Dict[str, List[str]],
+ session: Dict[str, Any],
+ page_body: int = 0,
):
- """Display module information with banner and formatted content.
-
- Args:
- message_or_call: Message or InlineCall object where the module
- will be displayed.
- module_info: Dictionary with module metadata (name, description,
- meta.banner, commands, category).
- module_path: Path key of the module in `self.modules`.
- query: Original search query string.
- result: Full list of matched module paths.
- index: Index of the current module in `result`.
- filters: Active filters (e.g., categories). If ``filters`` is
- empty and no valid remote banner exists, the external fallback
- URL (`self._fallback_banner_url`) will be used.
-
- Notes:
- The method attempts to validate a remote banner URL via
- :meth:`_validate_url`. If validation succeeds the remote URL is
- passed to the messaging client. If validation fails and ``filters``
- is empty, the external fallback URL (`self._fallback_banner_url`)
- will be used. Behavior may vary depending on the messaging client
- used (Telethon/aiogram/etc.).
- """
try:
- banner_url = await self._validate_url(module_info["meta"].get("banner"))
+ query = session["query"]
+ filters = session["filters"]
+
+ lang = self.user_lang()
+ module_banner_raw = module_info.get("meta", {}).get("banner")
+ photo = await self._validate_url(module_banner_raw)
- caption_message, full_message = self._format_module_content(
+ if not photo:
+ state_banner_raw = self._get_banner_for_state("no_banner")
+ photo = await self._validate_url(state_banner_raw)
+
+ header, body_pages, footer, categories_text = self._format_module_content(
module_info,
query,
filters,
include_categories=True,
module_path=module_path,
+ lang=lang,
)
+ current_body = body_pages[min(page_body, len(body_pages) - 1)]
+ full_message = header + current_body + footer + categories_text
- markup = self._build_navigation_markup(result, index, query, filters)
+ markup = self._build_module_markup(session, body_pages, page_body, module_path)
- # Determine which banner to use. If banner_url is valid, use it.
- # If no valid banner and no filters are applied (normal search display),
- # create an in-memory BytesIO from the embedded base64 and use it.
- banner_to_use = None
- if banner_url:
- banner_to_use = banner_url
- else:
- if not filters:
- # Use external fallback URL for plain search display.
- banner_to_use = getattr(self, "_fallback_banner_url", None)
-
- display_text = caption_message if banner_to_use else full_message
await self._safe_display(
- message_or_call, display_text, markup, banner_to_use
+ message_or_call, full_message, markup, photo
)
-
except Exception as e:
logger.exception(f"Error in _display_module: {e}")
if isinstance(message_or_call, Message):
@@ -627,57 +751,79 @@ class Limoka(loader.Module):
elif hasattr(message_or_call, "edit"):
await message_or_call.edit(self.strings["error_occurred"])
- async def _display_filter_menu(
- self, call: InlineCall, query: str, current_filters: dict
+ async def _previous_body_page(
+ self, call: InlineCall, session: Dict[str, Any], module_path: str, page_body: int
):
+ module_info = self.modules[module_path]
+ new_page_body = max(page_body - 1, 0)
+ await self._display_module(call, module_info, module_path, session, page_body=new_page_body)
+
+ async def _next_body_page(
+ self, call: InlineCall, session: Dict[str, Any], module_path: str, page_body: int
+ ):
+ module_info = self.modules[module_path]
+ query = session["query"]
+ filters = session["filters"]
+ header, body_pages, footer, categories_text = self._format_module_content(
+ module_info, query, filters, include_categories=True, module_path=module_path, lang=self.user_lang()
+ )
+ new_page_body = min(page_body + 1, len(body_pages) - 1)
+ await self._display_module(call, module_info, module_path, session, page_body=new_page_body)
+
+ async def _display_filter_menu(
+ self, call: InlineCall, session: Dict[str, Any]
+ ):
+ query = session["query"]
+ current_filters = session["filters"]
+
categories = current_filters.get("category", [])
filters_text = self.strings["selected_categories"].format(
categories=(
", ".join(categories) if categories else self.strings["no_category"]
)
)
-
markup = [
[
{
"text": self.strings["filter_cat"],
"callback": self._select_category,
- "args": (query, current_filters),
+ "args": (session,),
},
],
[
{
"text": self.strings["apply_filters"],
"callback": self._apply_filters,
- "args": (query, current_filters),
+ "args": (session,),
},
{
"text": self.strings["clear_filters"],
"callback": self._clear_filters,
- "args": (query,),
+ "args": (session,),
},
],
[
{
"text": self.strings["back_to_results"],
"callback": self._show_results,
- "args": (query, {}, True),
+ "args": (session, True),
},
],
[{"text": self.strings.get("close", "❌ Close"), "action": "close"}],
]
-
- text = self.strings["filter_menu"].format(query=query) + f"\n\n{filters_text}"
- await call.edit(text, reply_markup=markup)
+ text = self.strings["filter_menu"].format(query=query) + f"\n{filters_text}"
+ await call.edit(text, reply_markup=markup, photo=self._get_banner_for_state("filter_select"))
async def _select_category(
- self, call: InlineCall, query: str, current_filters: dict
+ self, call: InlineCall, session: Dict[str, Any]
):
+ query = session["query"]
+ current_filters = session["filters"]
+
all_categories = set()
for module_data in self.modules.values():
all_categories.update(module_data.get("category", ["No category"]))
categories = sorted(all_categories)
-
if not categories:
await call.edit(
self.strings["no_categories"],
@@ -686,17 +832,15 @@ class Limoka(loader.Module):
{
"text": self.strings["back"],
"callback": self._display_filter_menu,
- "args": (query, current_filters),
+ "args": (session,),
}
]
],
)
return
-
selected_categories = current_filters.get("category", [])
buttons = []
row = []
-
for i, cat in enumerate(categories):
button_text = (
self.strings["category"].format(category=cat)
@@ -705,71 +849,74 @@ class Limoka(loader.Module):
)
if cat in selected_categories:
button_text = "✅ " + button_text
-
+
+ # Create new session with updated filters
+ new_session = session.copy()
row.append(
{
"text": button_text,
"callback": self._toggle_category,
- "args": (query, current_filters, cat),
+ "args": (session, cat),
}
)
-
if (i + 1) % 3 == 0 or i == len(categories) - 1:
buttons.append(row)
row = []
-
buttons.append(
[
{
"text": self.strings["back"],
"callback": self._display_filter_menu,
- "args": (query, current_filters),
+ "args": (session,),
}
]
)
-
- # Add close button to category selector
buttons.append(
[{"text": self.strings.get("close", "❌ Close"), "action": "close"}]
)
-
text = self.strings["select_category"].format(query=query)
await call.edit(text, reply_markup=buttons)
async def _toggle_category(
- self, call: InlineCall, query: str, current_filters: dict, category: str
+ self, call: InlineCall, session: Dict[str, Any], category: str
):
+ query = session["query"]
+ current_filters = session["filters"]
+
new_filters = current_filters.copy()
selected_categories = new_filters.get("category", [])
-
if category in selected_categories:
selected_categories.remove(category)
else:
selected_categories.append(category)
-
if selected_categories:
new_filters["category"] = selected_categories
else:
new_filters.pop("category", None)
+ new_session = session.copy()
+ new_session["filters"] = new_filters
+ await self._select_category(call, new_session)
- await self._select_category(call, query, new_filters)
+ async def _apply_filters(self, call: InlineCall, session: Dict[str, Any]):
+ await self._show_results(call, session, from_filters=True)
- async def _apply_filters(self, call: InlineCall, query: str, filters: dict):
- await self._show_results(call, query, filters, from_filters=True)
-
- async def _clear_filters(self, call: InlineCall, query: str):
- await self._show_results(call, query, {}, from_filters=True)
+ async def _clear_filters(self, call: InlineCall, session: Dict[str, Any]):
+ new_session = session.copy()
+ new_session["filters"] = {}
+ await self._show_results(call, new_session, from_filters=True)
async def _show_results(
- self, call: InlineCall, query: str, filters: dict, from_filters: bool = False
+ self, call: InlineCall, session: Dict[str, Any], from_filters: bool = False
):
+ query = session["query"]
+ filters = session["filters"]
+
searcher = Search(query.lower(), self.ix)
try:
result = searcher.search_module()
except Exception:
await call.edit(self.strings["?"], reply_markup=[])
return
-
if not result:
markup = (
[
@@ -777,14 +924,13 @@ class Limoka(loader.Module):
{
"text": self.strings["back"],
"callback": self._display_filter_menu,
- "args": (query, filters),
+ "args": (session,),
}
]
]
if from_filters
else []
)
- # Always provide a close button on empty-result screens
markup.append(
[{"text": self.strings.get("close", "❌ Close"), "action": "close"}]
)
@@ -792,7 +938,6 @@ class Limoka(loader.Module):
self.strings["404"].format(query=query), reply_markup=markup
)
return
-
if filters.get("category"):
filtered_result = [
path
@@ -804,7 +949,6 @@ class Limoka(loader.Module):
]
else:
filtered_result = result
-
if not filtered_result:
markup = (
[
@@ -812,14 +956,13 @@ class Limoka(loader.Module):
{
"text": self.strings["back"],
"callback": self._display_filter_menu,
- "args": (query, filters),
+ "args": (session,),
}
]
]
if from_filters
else []
)
- # Add close button when filtered results are empty
markup.append(
[{"text": self.strings.get("close", "❌ Close"), "action": "close"}]
)
@@ -827,40 +970,36 @@ class Limoka(loader.Module):
self.strings["404"].format(query=query), reply_markup=markup
)
return
-
module_path = filtered_result[0]
module_info = self.modules[module_path]
+
+ # Create session for displaying module
+ display_session = self._create_search_session(
+ state=self.SEARCH_STATES["global_search"],
+ query=query,
+ filters=filters,
+ results=filtered_result,
+ current_index=0,
+ )
await self._display_module(
- call, module_info, module_path, query, filtered_result, 0, filters
+ call, module_info, module_path, display_session, 0
)
async def _enter_query_handler(
self, call_or_query, query: Optional[str] = None, *args, **kwargs
):
- """Handler for inline query input.
-
- This handler is tolerant to different calling conventions used by the
- framework: some callers provide `(call, query)`, others may provide
- `(query,)` or `(query, call)` depending on context. Normalize the
- inputs so the handler works from menus and forms alike.
- """
- # Normalize parameters: try to find `call` (message or InlineCall)
call = None
if query is None and isinstance(call_or_query, str):
- # Called as (query, ...) — search text is first argument
query = call_or_query
for a in args:
if hasattr(a, "edit") or isinstance(a, Message):
call = a
break
else:
- # Expected calling convention: (call, query, ...)
call = call_or_query
-
if call is None:
logger.error("_enter_query_handler: missing call/context")
return
-
if not query:
await call.edit(
self.strings["?"],
@@ -874,7 +1013,6 @@ class Limoka(loader.Module):
],
)
return
-
if len(query) <= 1:
await call.edit(
self.strings["?"],
@@ -888,7 +1026,6 @@ class Limoka(loader.Module):
],
)
return
-
searcher = Search(query.lower(), self.ix)
try:
result = searcher.search_module()
@@ -905,7 +1042,6 @@ class Limoka(loader.Module):
],
)
return
-
if not result:
await call.edit(
self.strings["404"].format(query=query),
@@ -925,17 +1061,20 @@ class Limoka(loader.Module):
],
)
return
-
module_path = result[0]
module_info = self.modules[module_path]
- await self._display_module(call, module_info, module_path, query, result, 0, {})
+
+ # Create session for displaying module
+ display_session = self._create_search_session(
+ state=self.SEARCH_STATES["global_search"],
+ query=query,
+ filters={},
+ results=result,
+ current_index=0,
+ )
+ await self._display_module(call, module_info, module_path, display_session, 0)
async def _enter_query(self, call: InlineCall, query: Optional[str] = None):
- """Show input form for new query.
-
- Accepts an optional `query` when called from other menus so the
- "back to results" button can restore the previous search context.
- """
markup = [
[
{
@@ -948,7 +1087,11 @@ class Limoka(loader.Module):
{
"text": self.strings["back_to_results"],
"callback": self._show_results,
- "args": (query or "", {}),
+ "args": (self._create_search_session(
+ state=self.SEARCH_STATES["global_search"],
+ query=query or "",
+ filters={},
+ ),),
}
],
[
@@ -958,17 +1101,17 @@ class Limoka(loader.Module):
}
],
]
-
await call.edit(self.strings["enter_query"], reply_markup=markup)
- async def _show_global_results(self, call: InlineCall, query: str):
+ async def _show_global_results(self, call: InlineCall, session: Dict[str, Any]):
+ query = session["query"]
+
searcher = Search(query.lower(), self.ix)
try:
result = searcher.search_module()
except Exception:
await call.edit(self.strings["?"], reply_markup=[])
return
-
if not result:
await call.edit(
self.strings["404"].format(query=query),
@@ -982,7 +1125,6 @@ class Limoka(loader.Module):
],
)
return
-
text = self.strings["global_search"].format(
query=html.escape(query), count=len(result)
)
@@ -992,55 +1134,72 @@ class Limoka(loader.Module):
if not info:
continue
name = info.get("name", "Unknown")
+
+ global_session = self._create_search_session(
+ state=self.SEARCH_STATES["global_search"],
+ query=query,
+ filters={},
+ results=result,
+ current_index=i,
+ )
buttons.append(
[
{
"text": f"{i+1}. {name}",
"callback": self._display_module_from_global,
- "args": (path, query, result),
+ "args": (path, global_session),
}
]
)
buttons.append(
[{"text": self.strings["change_query"], "callback": self._enter_query}]
)
-
await call.edit(text=text[:4096], reply_markup=buttons)
async def _display_module_from_global(
- self, call: InlineCall, module_path: str, query: str, result: list
+ self, call: InlineCall, module_path: str, session: Dict[str, Any]
):
module_info = self.modules[module_path]
await self._display_module(
- call, module_info, module_path, query, result, result.index(module_path), {}
+ call, module_info, module_path, session, 0
)
async def _next_page(
- self, call: InlineCall, result: list, index: int, query: str, filters: dict
+ self, call: InlineCall, session: Dict[str, Any]
):
+ result = session["results"]
+ index = session["current_index"]
+
if index + 1 >= len(result):
await call.answer(self.strings["last_page"])
return
-
index += 1
module_path = result[index]
module_info = self.modules[module_path]
+
+ new_session = session.copy()
+ new_session["current_index"] = index
await self._display_module(
- call, module_info, module_path, query, result, index, filters
+ call, module_info, module_path, new_session, 0
)
async def _previous_page(
- self, call: InlineCall, result: list, index: int, query: str, filters: dict
+ self, call: InlineCall, session: Dict[str, Any]
):
+ result = session["results"]
+ index = session["current_index"]
+
if index - 1 < 0:
await call.answer(self.strings["first_page"])
return
-
index -= 1
module_path = result[index]
module_info = self.modules[module_path]
+
+ new_session = session.copy()
+ new_session["current_index"] = index
await self._display_module(
- call, module_info, module_path, query, result, index, filters
+ call, module_info, module_path, new_session, 0
)
async def _inline_void(self, call: InlineCall):
@@ -1050,7 +1209,6 @@ class Limoka(loader.Module):
async def limokacmd(self, message: Message):
"""[query / nothing] - Search modules"""
args = utils.get_args_raw(message)
-
if not args:
markup = [
[
@@ -1068,24 +1226,21 @@ class Limoka(loader.Module):
}
],
]
- # Close button on the main no-args form
markup.append(
[{"text": self.strings.get("close", "❌ Close"), "action": "close"}]
)
-
await self.inline.form(
text=self.strings["start_search_form"],
message=message,
reply_markup=markup,
+ photo=self._get_banner_for_state("global_search")
)
return
-
history = self.get("history", [])
if len(history) >= 10:
history = history[-9:]
history.append(args)
self.set("history", history)
-
await utils.answer(
message,
self.strings["wait"].format(
@@ -1094,21 +1249,25 @@ class Limoka(loader.Module):
query=args,
),
)
-
searcher = Search(args.lower(), self.ix)
try:
result = searcher.search_module()
except Exception:
return await utils.answer(message, self.strings["?"])
-
if not result:
return await utils.answer(message, self.strings["404"].format(query=args))
-
module_path = result[0]
module_info = self.modules[module_path]
- await self._display_module(
- message, module_info, module_path, args, result, 0, {}
+
+ # Create session for displaying module
+ display_session = self._create_search_session(
+ state=self.SEARCH_STATES["global_search"],
+ query=args,
+ filters={},
+ results=result,
+ current_index=0,
)
+ await self._display_module(message, module_info, module_path, display_session, 0)
async def _show_global_form(self, call: InlineCall, message: Message):
markup = [
@@ -1133,12 +1292,18 @@ class Limoka(loader.Module):
}
],
]
-
await call.edit(self.strings["global_search_form"], reply_markup=markup)
async def _global_search_handler(
self, call: InlineCall, query: str, message: Message, *args, **kwargs
):
+ global_session = self._create_search_session(
+ state=self.SEARCH_STATES["global_search"],
+ query=query,
+ filters={},
+ results=[],
+ current_index=0,
+ ) # idk what is that crap but it works lol
if len(query) <= 1:
await call.edit(
self.strings["?"],
@@ -1158,7 +1323,6 @@ class Limoka(loader.Module):
],
)
return
-
searcher = Search(query.lower(), self.ix)
try:
result = searcher.search_module()
@@ -1181,7 +1345,6 @@ class Limoka(loader.Module):
],
)
return
-
if not result:
await call.edit(
self.strings["404"].format(query=query),
@@ -1201,7 +1364,6 @@ class Limoka(loader.Module):
],
)
return
-
text = self.strings["global_search"].format(
query=html.escape(query), count=len(result)
)
@@ -1216,7 +1378,7 @@ class Limoka(loader.Module):
{
"text": f"{i+1}. {name}",
"callback": self._display_module_from_global,
- "args": (path, query, result),
+ "args": (path, global_session),
}
]
)
@@ -1231,29 +1393,23 @@ class Limoka(loader.Module):
buttons.append(
[{"text": self.strings.get("close", "❌ Close"), "action": "close"}]
)
-
await call.edit(text=text[:4096], reply_markup=buttons)
@loader.command(ru_doc="[clear] — Показать или очистить историю поиска")
async def lshistorycmd(self, message: Message):
"""[clear] - Show or clear search history"""
args = utils.get_args_raw(message).strip().lower()
-
if args == "clear":
self.set("history", [])
await utils.answer(message, self.strings["history_cleared"])
return
-
if args:
await utils.answer(message, self.strings["invalid_history_arg"])
return
-
history = self.get("history", [])
-
if not history:
await utils.answer(message, self.strings["empty_history"])
return
-
formatted_history = [
f"{i+1}. {utils.escape_html(h)}"
for i, h in enumerate(history[-10:])
@@ -1265,80 +1421,44 @@ class Limoka(loader.Module):
@loader.watcher(from_dl=False)
async def secure_install_watcher(self, message: Message):
- """Secure install watcher for official Limoka bot.
-
- This watcher cleans HTML from incoming messages, extracts a
- signed #limoka:: tag, verifies the signature and
- triggers the loader to download and install the module if valid.
- """
if not message.text:
return
-
- # Verify sender id is present and comes from the official Limoka bot
if not hasattr(message, "from_id") or not message.from_id:
return
-
sender_id = None
if hasattr(message.from_id, "user_id"):
sender_id = message.from_id.user_id
elif hasattr(message.from_id, "channel_id"):
sender_id = message.from_id.channel_id
-
- if sender_id != 8581621390:
+ if sender_id != self._service_bot_id:
logger.debug("Message not from official bot, ignoring")
return
-
- # Only act when external installs are enabled
if not self.config["external_install_allowed"]:
return
-
try:
- # Prefer raw_text/message when available to preserve original
- # formatting (some clients provide parsed .text that loses
- # tags/links). Fall back to .text if needed.
clean_text = getattr(message, "raw_text", None) or getattr(
message, "message", None
) or message.text or ""
-
if message.entities:
from html import unescape
-
clean_text = unescape(clean_text)
- # Remove HTML tags but keep their inner text so we don't
- # accidentally remove the tag content when it's wrapped
- # in an or similar.
- clean_text = re.sub(r"<[^>]+>", "", clean_text)
-
- # Extract the first #limoka: occurrence. Allow for
- # characters until whitespace or HTML/quote delimiters.
+ clean_text = re.sub(r"<[^>]+>", "", clean_text)
match = re.search(r"#limoka:([^\s\"'<>]+)", clean_text)
if not match:
logger.debug(
"No #limoka tag found in cleaned text; leaving original message intact"
)
- # Do not send a user-visible reply for missing tag; simply exit.
return
-
tag_content = match.group(1)
-
- # Expect format: :
parts = tag_content.split(":", 1)
if len(parts) != 2:
logger.error("Invalid tag format after cleaning")
await utils.answer(message, self.strings["watcher_invalid_format"])
- # Do not delete the original message on parse errors.
return
-
module_path, signature_hex = parts
-
- # Strip leftover quote characters and whitespace
module_path = re.sub(r"[<>\"']", "", module_path).strip()
-
- # Handle possible href= artifacts
if module_path.startswith("href="):
module_path = module_path[5:].strip('"').strip("'")
-
- # Try to resolve the module key in database
if module_path not in self.modules:
found = False
for db_path in self.modules.keys():
@@ -1346,30 +1466,18 @@ class Limoka(loader.Module):
module_path = db_path
found = True
break
-
if not found:
logger.warning(f"Module not found after cleanup: {module_path}")
await utils.answer(
message, self.strings["watcher_module_not_found"].format(path=html.escape(module_path))
)
- # Keep original message in chat for inspection.
return
-
- # logger.info(f"Module found in database: {module_path}")
-
- # Verify signature using embedded public key — signature covers
- # the module path AND the SHA256 of the module content (format:
- # "{module_path}|{sha256}"). Download module, compute hash and
- # verify signature against that combined payload.
try:
import base64
from cryptography.hazmat.primitives.asymmetric import ed25519
-
PUB_KEY_B64 = "MCowBQYDK2VwAyEA1ltSnqtf3pGBuctuAYqHivCXsaRtKOVxavai7yin7ZE="
der_bytes = base64.b64decode(PUB_KEY_B64)
raw_pubkey = der_bytes[-32:]
-
- # Download module content to compute SHA256
module_url = self.config["limokaurl"] + module_path
async with aiohttp.ClientSession() as session:
async with session.get(module_url, timeout=10) as resp:
@@ -1378,65 +1486,41 @@ class Limoka(loader.Module):
await utils.answer(message, self.strings["watcher_loader_missing"])
return
module_bytes = await resp.read()
-
- sha256 = hashlib.sha256(module_bytes).hexdigest()
-
- public_key = ed25519.Ed25519PublicKey.from_public_bytes(raw_pubkey)
- signature = bytes.fromhex(signature_hex)
- signed_payload = f"{module_path}|{sha256}".encode()
- public_key.verify(signature, signed_payload)
- # logger.info(f"Signature verified for {module_path} (sha256={sha256})")
+ sha256 = hashlib.sha256(module_bytes).hexdigest()
+ public_key = ed25519.Ed25519PublicKey.from_public_bytes(raw_pubkey)
+ signature = bytes.fromhex(signature_hex)
+ signed_payload = f"{module_path}|{sha256}".encode()
+ public_key.verify(signature, signed_payload)
except Exception as e:
logger.error(f"Signature verification failed for {module_path}: {e}")
await utils.answer(message, self.strings["watcher_signature_invalid"])
- # Keep original message so admins can inspect the signed payload.
return
-
- # Perform install via loader
loader_mod = self.lookup("loader")
if not loader_mod:
logger.error("Loader module not found")
await utils.answer(message, self.strings["watcher_loader_missing"])
- # Do not delete the original message on loader problems.
return
-
module_url = self.config["limokaurl"] + module_path
- # logger.info(f"Installing from URL: {module_url}")
-
status = await loader_mod.download_and_install(module_url, None)
-
if getattr(loader_mod, "fully_loaded", False):
loader_mod.update_modules_in_db()
-
- # Attempt to remove the original message
try:
await message.delete()
- # logger.info("Original message deleted")
except Exception as e:
logger.error(f"Failed to delete message: {e}")
-
- #logger.info(status)
-
if status:
- # module_name = module_path.split("/")[-1].replace(".py", "")
- # Notify official bot about success
try:
- bot_peer = await self.client.get_entity(8581621390)
+ bot_peer = await self.client.get_entity(self._service_bot_id)
await self.client.send_message(bot_peer, f"#limoka:sucsess:{message.id}")
- # logger.info(f"Sent success confirmation to bot for message {message.id}")
except Exception as e:
logger.error(f"Failed to send success confirmation: {e}")
-
- # logger.info(f"Module {module_name} installed successfully")
else:
logger.error(f"Installation failed with status: {status}")
try:
- bot_peer = await self.client.get_entity(8581621390)
+ bot_peer = await self.client.get_entity(self._service_bot_id)
await self.client.send_message(bot_peer, f"#limoka:failed:{message.id}")
- # logger.info(f"Sent failure notification to bot for message {message.id}")
except Exception as e:
logger.error(f"Failed to send failure notification: {e}")
-
except Exception as e:
logger.exception(f"CRITICAL ERROR in secure_install_watcher: {e}")
try:
diff --git a/backup.py b/backup.py
new file mode 100644
index 0000000..1dc16f1
--- /dev/null
+++ b/backup.py
@@ -0,0 +1,74 @@
+import asyncio
+import aiohttp
+import argparse
+import subprocess
+import os
+import glob
+
+parser = argparse.ArgumentParser(description="Backup Script")
+parser.add_argument(
+ "--token",
+ type=str,
+ required=True,
+ help="Token of Telegram bot",
+)
+parser.add_argument(
+ "--api_url",
+ type=str,
+ default="https://api.telegram.org",
+ help="API URL of Telegram API",
+)
+parser.add_argument(
+ "--chat_id",
+ type=str,
+ required=True,
+ help="Chat ID to send backup message to",
+)
+
+arguments = parser.parse_args()
+
+async def send_file(session, file_path, caption=None):
+ url = f"{arguments.api_url}/bot{arguments.token}/sendDocument"
+ with open(file_path, 'rb') as f:
+ data = aiohttp.FormData()
+ data.add_field('chat_id', arguments.chat_id)
+ data.add_field('document', f, filename=os.path.basename(file_path))
+ if caption:
+ data.add_field('caption', caption)
+ data.add_field('parse_mode', 'Markdown')
+ async with session.post(url, data=data) as response:
+ return await response.json()
+
+async def main():
+ # Get commit info
+ commit_message = subprocess.check_output(['git', 'log', '-1', '--pretty=%B']).decode().strip()
+ commit_date = subprocess.check_output(['git', 'log', '-1', '--pretty=%ci']).decode().strip()
+ commit_hash = subprocess.check_output(['git', 'rev-parse', '--short=6', 'HEAD']).decode().strip()
+ commit_url = f"https://github.com/MuRuLOSE/limoka/commit/{subprocess.check_output(['git', 'rev-parse', 'HEAD']).decode().strip()}"
+ message = f"Commit Date: {commit_date}, Commit Message: {commit_message}, Commit Hash: [`{commit_hash}`]({commit_url})"
+
+ # Create zip
+ subprocess.run(['git', 'archive', '--format=zip', '--output=repository-original.zip', 'HEAD'])
+ subprocess.run(['zip', '-9', 'repository.zip', 'repository-original.zip'])
+ os.remove('repository-original.zip')
+
+ # Split zip
+ subprocess.run(['split', '-b', '49M', 'repository.zip', 'repository-part-'])
+
+ # Send parts
+ async with aiohttp.ClientSession() as session:
+ parts = sorted(glob.glob('repository-part-*'))
+ first = True
+ for part in parts:
+ caption = message if first else None
+ result = await send_file(session, part, caption)
+ print(f"Sent {part}: {result}")
+ first = False
+
+ # Cleanup
+ os.remove('repository.zip')
+ for part in parts:
+ os.remove(part)
+
+if __name__ == "__main__":
+ asyncio.run(main())
\ No newline at end of file
diff --git a/parse.py b/parse.py
index 4d10721..4b1b657 100644
--- a/parse.py
+++ b/parse.py
@@ -1,171 +1,353 @@
-import os
import ast
import json
+import os
+import logging
+from typing import Dict, Any, Optional, List
-from clone_repos import repos
-from typing import Dict
+logging.basicConfig(level=logging.WARNING, format="%(message)s")
+logger = logging.getLogger(__name__)
-# TODO: ADD VENV IGNORE
+def safe_unparse(node: ast.AST) -> str:
+ try:
+ return ast.unparse(node)
+ except AttributeError:
+ return getattr(node, 'id', str(type(node).__name__))
+def load_blacklist(file_path):
+ with open(file_path, "r", encoding="utf-8") as f:
+ data = json.load(f)
+ repositories = data.get("repositories", [])
+ blacklisted_modules = {}
-def get_module_info(module_path):
- """Парсит Python-модуль и извлекает информацию о нем."""
- with open(module_path, "r", encoding="utf-8") as f:
- module_content = f.read()
+ for i in repositories:
+ path = i.get("path", "")
+ blacklist = i.get("blacklist", [])
+ if path and blacklist:
+ blacklisted_modules[path] = blacklist
- meta_info = {"pic": None, "banner": None}
- for line in module_content.split("\n"):
- if line.startswith("# meta"):
- key, value = line.replace("# meta ", "").split(": ")
- meta_info[key] = value
+ return blacklisted_modules
- tree = ast.parse(module_content)
+def extract_string_value(node: ast.AST) -> Optional[str]:
+ try:
+ if isinstance(node, ast.Constant) and isinstance(node.value, str):
+ return node.value
+ if isinstance(node, ast.Str):
+ return node.s
+ if isinstance(node, ast.Name):
+ return node.id
+ if isinstance(node, ast.Attribute):
+ return f"{safe_unparse(node.value)}.{node.attr}"
+ return str(node)
+ except Exception:
+ return None
- def get_decorator_names(decorator_list):
- return [ast.unparse(decorator) for decorator in decorator_list]
-
- def extract_loader_command_args(decorator):
- """Извлекает аргументы `ru_doc` и `en_doc` из `@loader.command`."""
- if (
- isinstance(decorator, ast.Call)
- and hasattr(decorator.func, "attr")
- and decorator.func.attr == "command"
- ):
- ru_doc = None
- en_doc = None
- for keyword in decorator.keywords:
- if keyword.arg == "ru_doc":
- ru_doc = ast.literal_eval(keyword.value)
- elif keyword.arg == "en_doc":
- en_doc = ast.literal_eval(keyword.value)
- return ru_doc, en_doc
- return None, None
-
- result = {}
- for node in ast.walk(tree):
- if isinstance(node, ast.ClassDef):
- decorators = get_decorator_names(node.decorator_list)
- is_tds_mod = [d for d in decorators if "loader.tds" in d]
- if "Mod" not in node.name and not is_tds_mod:
+def extract_loader_command_args(decorator: ast.Call) -> Dict[str, Any]:
+ args = {"lang_docs": {}, "aliases": [], "usage": None}
+ try:
+ for kw in decorator.keywords:
+ arg_name = kw.arg
+ if not arg_name:
continue
+ if arg_name.endswith("_doc"):
+ lang = arg_name[:-4]
+ args["lang_docs"][lang] = extract_string_value(kw.value)
+ elif arg_name == "aliases":
+ try:
+ val = ast.literal_eval(kw.value)
+ if isinstance(val, (list, tuple)):
+ args["aliases"] = list(val)
+ except (ValueError, SyntaxError):
+ pass
+ elif arg_name == "usage":
+ args["usage"] = extract_string_value(kw.value)
+ except Exception:
+ pass
+ return args
- class_docstring = ast.get_docstring(node)
- class_info = {
- "name": node.name,
- "description": class_docstring,
- "meta": meta_info,
- "commands": [],
- "new_commands": [],
- }
+def get_module_info(module_path: str) -> Optional[Dict[str, Any]]:
+ try:
+ with open(module_path, "r", encoding="utf-8") as f:
+ source = f.read()
+ except Exception as e:
+ logger.warning(f"Skipping {module_path}: read failed — {e}")
+ return None
- for class_body_node in node.body:
- if isinstance(class_body_node, (ast.FunctionDef, ast.AsyncFunctionDef)):
- decorators = get_decorator_names(class_body_node.decorator_list)
- is_loader_command = [d for d in decorators if "command" in d]
- if not is_loader_command and "cmd" not in class_body_node.name:
- continue
+ source = source.lstrip('\ufeff')
+ source = ''.join(c for c in source if ord(c) >= 32 or c in '\n\r\t') if source else source
- method_docstring = ast.get_docstring(class_body_node)
- command_name = class_body_node.name
- ru_doc, en_doc = None, None
+ meta = {"pic": None, "banner": None, "developer": None}
+ for line in source.splitlines():
+ line = line.strip()
+ if line.startswith("# meta "):
+ try:
+ key, val = line[len("# meta "):].split(":", 1)
+ meta[key.strip()] = val.strip()
+ except ValueError:
+ pass
- for decorator in class_body_node.decorator_list:
- ru_doc_tmp, en_doc_tmp = extract_loader_command_args(decorator)
- if ru_doc_tmp:
- ru_doc = ru_doc_tmp
- if en_doc_tmp:
- en_doc = en_doc_tmp
+ try:
+ tree = ast.parse(source, filename=module_path)
+ except SyntaxError as e:
+ logger.warning(f"Skipping {module_path}: syntax error — {e}")
+ return {
+ "name": module_path.split(os.sep)[-1].replace(".py", ""),
+ "description": "",
+ "cls_doc": {},
+ "meta": meta,
+ "commands": [],
+ "new_commands": [],
+ "inline_handlers": [],
+ "strings": {},
+ "has_on_load": False,
+ "has_on_unload": False,
+ "class_cmd_names": {},
+ }
- descriptions = []
- if method_docstring:
- descriptions.append(method_docstring)
- if ru_doc:
- descriptions.append(ru_doc)
- if en_doc:
- descriptions.append(en_doc)
+ module_data = None
- class_info["commands"].append(
- {command_name: ' '.join(descriptions)}
- )
-
- command_name = command_name.replace('cmd', '')
-
- class_info["new_commands"].append(
- {
- command_name: {
- "ru_doc": ru_doc,
- "en_doc": en_doc,
- "doc": method_docstring,
- }
- }
- )
-
- result = class_info
-
- return result
-
-def parse_developers(base_dir: str) -> Dict[str, list]:
- developers = {
- "repo": set(), # используем set внутри функции
- "channel": set()
- }
-
- for repo_url in repos:
- repo_path = repo_url.replace("https://github.com/", "")
- try:
- owner, repo_name = repo_path.split("/")
- developers["repo"].add(owner)
- except ValueError:
- print(f"Incorrect URL of repository: {repo_url}")
+ for node in ast.walk(tree):
+ if not isinstance(node, ast.ClassDef):
continue
- for root, _, files in os.walk(base_dir):
- for file in files:
- if file.endswith(".py"):
- file_path = os.path.join(root, file)
- try:
- module_info = get_module_info(file_path)
- if module_info and "meta" in module_info:
- developer = module_info["meta"].get('developer')
- if developer: # Проверяем, что developer не None
- # Разделяем строки с запятыми, &, | и пробелами
- for dev in developer.replace(',', ' ').replace('&', ' ').replace('|', ' ').split():
- # Добавляем только элементы, начинающиеся с @
- if dev.startswith('@'):
- developers["channel"].add(dev.strip())
- except Exception as e:
- print(f"Ошибка при парсинге файла {file_path}: {e}")
+ is_module_class = (
+ "Mod" in node.name or
+ any(isinstance(d, ast.Attribute) and safe_unparse(d).startswith("loader.tds") for d in node.decorator_list) or
+ any(isinstance(d, ast.Name) and d.id == "loader" for d in node.decorator_list)
+ )
- # Преобразуем set в list перед возвратом
- return {
- "repo": list(developers["repo"]),
- "channel": list(developers["channel"])
+ if not is_module_class:
+ continue
+
+ info = {
+ "name": node.name,
+ "description": ast.get_docstring(node) or "",
+ "cls_doc": {},
+ "meta": meta,
+ "commands": [],
+ "new_commands": [],
+ "inline_handlers": [],
+ "strings": {},
+ "has_on_load": False,
+ "has_on_load": False,
+ "has_on_unload": False,
+ "class_cmd_names": {},
+ }
+
+ for item in node.body:
+ if isinstance(item, ast.Assign):
+ for target in item.targets:
+ if isinstance(target, ast.Name) and (target.id == "strings" or target.id.startswith("strings_")):
+ try:
+ lit = ast.literal_eval(item.value)
+ if isinstance(lit, dict):
+ if target.id == "strings":
+ info["strings"].update(lit)
+ if "_cls_doc" in lit:
+ info["cls_doc"]["default"] = lit["_cls_doc"]
+ else:
+ lang = target.id.split("_", 1)[1] if "_" in target.id else None
+ if lang:
+ for k, v in lit.items():
+ if isinstance(k, str) and isinstance(v, str):
+ if k == "_cls_doc":
+ info["cls_doc"][lang] = v
+ elif k.startswith("_cmd_doc_"):
+ rest = k[len("_cmd_doc_"):]
+ info["strings"][f"_cmd_doc_{lang}_{rest}"] = v
+ info["strings"][f"_cmd_doc_{rest}_{lang}"] = v
+ elif k.startswith("_ihandle_doc_"):
+ rest = k[len("_ihandle_doc_"):]
+ info["strings"][f"_ihandle_doc_{lang}_{rest}"] = v
+ info["strings"][f"_ihandle_doc_{rest}_{lang}"] = v
+ elif k.startswith("_cls_cmd_"):
+ info["class_cmd_names"][lang] = v
+ else:
+ info["strings"][f"{k}_{lang}"] = v
+ except Exception:
+ pass
+
+ if "_cls_doc" in info["strings"]:
+ info["cls_doc"]["default"] = info["strings"]["_cls_doc"]
+
+ for func in node.body:
+ if not isinstance(func, (ast.FunctionDef, ast.AsyncFunctionDef)):
+ continue
+
+ name = func.name
+ if name == "on_load":
+ info["has_on_load"] = True
+ continue
+ if name == "on_unload":
+ info["has_on_unload"] = True
+ continue
+
+ is_decorated = any(
+ isinstance(d, ast.Call) and hasattr(d.func, 'attr') and
+ d.func.attr in ("command", "inline_handler", "unrestricted", "owner")
+ for d in func.decorator_list
+ )
+
+ if name.startswith("_") and not is_decorated:
+ continue
+
+ cmd = {
+ "name": name,
+ "doc": ast.get_docstring(func) or "",
+ "lang_docs": {},
+ "aliases": [],
+ "usage": None,
+ "inline": False,
+ "is_inline_handler": False,
+ "decorators": [],
+ "cmd_names": {},
+ }
+
+ for dec in func.decorator_list:
+ if isinstance(dec, ast.Call) and hasattr(dec.func, 'attr'):
+ attr = dec.func.attr
+ if attr == "command":
+ cmd.update(extract_loader_command_args(dec))
+ elif attr == "inline_handler":
+ cmd["inline"] = True
+ cmd["is_inline_handler"] = True
+ elif attr in ("unrestricted", "owner", "support"):
+ cmd["decorators"].append(attr)
+
+ for stmt in func.body:
+ if isinstance(stmt, ast.Assign):
+ for target in stmt.targets:
+ if isinstance(target, ast.Attribute):
+ attr = target.attr
+ val = extract_string_value(stmt.value)
+ if not val:
+ continue
+ if attr == "_cmd":
+ cmd["name"] = val
+ elif attr == "_doc":
+ cmd["doc"] = val
+ elif attr == "_cls_doc":
+ info["cls_doc"]["default"] = val
+ elif attr.startswith("_cls_doc_"):
+ lang = attr[len("_cls_doc_"):]
+ info["cls_doc"][lang] = val
+ elif attr.startswith("_cmd_"):
+ lang = attr[len("_cmd_"):]
+ cmd["cmd_names"][lang] = val
+
+ is_command_name = "cmd" in name and not name.startswith("__")
+ if not (is_decorated or is_command_name):
+ continue
+
+ clean_name = cmd["name"].replace("cmd", "").replace("_", "")
+
+ descs = []
+ legacy_key = f"_cmd_doc_{clean_name}"
+ legacy_doc = info["strings"].get(legacy_key)
+ base_doc = legacy_doc if legacy_doc else cmd["doc"]
+ if base_doc:
+ descs.append(base_doc)
+
+ for lang, text in cmd["lang_docs"].items():
+ if text:
+ descs.append(f"({lang.upper()}) {text}")
+
+ for k, v in info["strings"].items():
+ if k.startswith("_cmd_doc_") and clean_name in k and v:
+ if k.endswith(f"_{clean_name}"):
+ lang_part = k[len("_cmd_doc_"):-len(f"_{clean_name}")-1]
+ if lang_part:
+ descs.append(f"({lang_part.upper()}) {v}")
+ elif k.startswith(f"_cmd_doc_{clean_name}_"):
+ lang_part = k[len(f"_cmd_doc_{clean_name}_"):]
+ if lang_part:
+ descs.append(f"({lang_part.upper()}) {v}")
+
+ full_desc = " | ".join(filter(None, descs))
+ info["commands"].append({clean_name: full_desc})
+
+ desc_map = {"default": legacy_doc or cmd["doc"]}
+ desc_map.update(cmd["lang_docs"])
+
+ for k, v in info["strings"].items():
+ if k.startswith("_cmd_doc_") and clean_name in k and v:
+ if k.endswith(f"_{clean_name}"):
+ lang_part = k[len("_cmd_doc_"):-len(f"_{clean_name}")-1]
+ if lang_part:
+ desc_map[lang_part] = v
+ elif k.startswith(f"_cmd_doc_{clean_name}_"):
+ lang_part = k[len(f"_cmd_doc_{clean_name}_"):]
+ if lang_part:
+ desc_map[lang_part] = v
+
+ info["new_commands"].append({
+ "name": clean_name,
+ "original_name": cmd["name"],
+ "description": desc_map,
+ "cmd_names": cmd["cmd_names"],
+ "aliases": cmd["aliases"],
+ "usage": cmd["usage"],
+ "inline": cmd["inline"],
+ "is_inline_handler": cmd["is_inline_handler"],
+ "decorators": cmd["decorators"],
+ })
+
+ if cmd["is_inline_handler"]:
+ inline_desc_map = {"default": cmd["doc"]}
+ inline_desc_map.update(cmd["lang_docs"])
+
+ for k, v in info["strings"].items():
+ if k.startswith("_ihandle_doc_") and clean_name in k and v:
+ if k.endswith(f"_{clean_name}"):
+ lang_part = k[len("_ihandle_doc_"):-len(f"_{clean_name}")-1]
+ if lang_part:
+ inline_desc_map[lang_part] = v
+ elif k.startswith(f"_ihandle_doc_{clean_name}_"):
+ lang_part = k[len(f"_ihandle_doc_{clean_name}_"):]
+ if lang_part:
+ inline_desc_map[lang_part] = v
+
+ info["inline_handlers"].append({
+ "name": clean_name,
+ "description": inline_desc_map,
+ "decorators": cmd["decorators"],
+ })
+
+ module_data = info
+ break
+
+ return module_data
+
+def main():
+ base_dir = os.getcwd()
+ modules = {}
+ blacklisted_modules = load_blacklist("repositories.json")
+
+ for root, dirs, files in os.walk(base_dir):
+ dirs[:] = [d for d in dirs if d not in ("venv", ".venv", "env", ".env", ".git")]
+
+ for file in files:
+ if file.endswith(".py") and not file.startswith("_") and file not in blacklisted_modules.get(os.path.relpath(root, base_dir), []):
+ path = os.path.join(root, file)
+ try:
+ data = get_module_info(path)
+ if data:
+ rel = os.path.relpath(path, base_dir).replace("\\", "/")
+ modules[rel] = data
+ except Exception as e:
+ logger.error(f"Error processing {path}: {e}")
+
+ output = {
+ "modules": modules,
+ "meta": {
+ "total_modules": len(modules),
+ "generated_at": __import__("datetime").datetime.now().isoformat(),
+ }
}
+ with open("modules.json", "w", encoding="utf-8") as f:
+ json.dump(output, f, ensure_ascii=False, indent=2)
-modules_data = {}
-base_dir = os.getcwd()
+ print(f"modules.json written ({len(modules)} modules)")
-for root, _, files in os.walk(base_dir):
- for file in files:
- if file.endswith(".py"):
- file_path = os.path.join(root, file)
- try:
- module_info = get_module_info(file_path)
- if module_info:
- relative_path = os.path.relpath(file_path, base_dir)
- modules_data[relative_path] = module_info
- except Exception as e:
- print(f"Ошибка при парсинге файла {file_path}: {e}")
-
-developers = parse_developers(base_dir)
-
-with open("modules.json", "w", encoding="utf-8") as json_file:
- json.dump(modules_data, json_file, ensure_ascii=False, indent=2)
-
-print("Файл modules.json создан!")
-
-with open("developers.json", "w", encoding="utf-8") as json_file:
- json.dump(developers, json_file, ensure_ascii=False, indent=2)
-
-print("Файл developers.json создан!")
\ No newline at end of file
+if __name__ == "__main__":
+ main()
\ No newline at end of file
diff --git a/repositories.json b/repositories.json
index aabbb9a..43cfa2f 100644
--- a/repositories.json
+++ b/repositories.json
@@ -2,191 +2,238 @@
"repositories": [
{
"path": "DziruModules/hikkamods",
- "tags": []
+ "tags": [],
+ "blacklist": []
},
{
"path": "kamolgks/Hikkamods",
- "tags": []
+ "tags": [],
+ "blacklist": []
},
{
"path": "thomasmod/hikkamods",
- "tags": []
+ "tags": [],
+ "blacklist": []
},
{
"path": "SkillsAngels/Modules",
- "tags": []
+ "tags": [],
+ "blacklist": []
},
{
"path": "Sad0ff/modules-ftg",
- "tags": []
+ "tags": [],
+ "blacklist": []
},
{
"path": "Yahikoro/Modules-for-FTG",
- "tags": []
+ "tags": [],
+ "blacklist": []
},
{
"path": "KeyZenD/modules",
- "tags": []
+ "tags": [],
+ "blacklist": []
},
{
"path": "AlpacaGang/ftg-modules",
- "tags": []
+ "tags": [],
+ "blacklist": []
},
{
"path": "trololo65/Modules",
- "tags": []
+ "tags": [],
+ "blacklist": []
},
{
"path": "Ijidishurka/modules",
- "tags": []
+ "tags": [],
+ "blacklist": []
},
{
"path": "Fl1yd/FTG-Modules",
- "tags": []
+ "tags": [],
+ "blacklist": []
},
{
"path": "D4n13l3k00/FTG-Modules",
- "tags": []
+ "tags": [],
+ "blacklist": []
},
{
"path": "iamnalinor/FTG-modules",
- "tags": ["hikkatrusted", "nonactive"]
+ "tags": ["hikkatrusted", "nonactive"],
+ "blacklist": []
},
{
"path": "SekaiYoneya/modules",
- "tags": []
+ "tags": [],
+ "blacklist": []
},
{
"path": "GeekTG/FTG-Modules",
- "tags": []
+ "tags": [],
+ "blacklist": []
},
{
"path": "Den4ikSuperOstryyPer4ik/Astro-modules",
- "tags": ["hikkatrusted", "herokutrusted"]
+ "tags": ["hikkatrusted", "herokutrusted"],
+ "blacklist": []
},
{
"path": "vsecoder/hikka_modules",
- "tags": ["hikkatrusted", "herokutrusted"]
+ "tags": ["hikkatrusted", "herokutrusted"],
+ "blacklist": []
},
{
"path": "sqlmerr/hikka_mods",
- "tags": ["hikkatrusted", "herokutrusted"]
+ "tags": ["hikkatrusted", "herokutrusted"],
+ "blacklist": []
},
{
"path": "N3rcy/modules",
- "tags": ["hikkatrusted", "herokutrusted"]
+ "tags": ["hikkatrusted", "herokutrusted"],
+ "blacklist": []
},
{
"path": "KorenbZla/HikkaModules",
- "tags": ["hikkatrusted", "herokutrusted"]
+ "tags": ["hikkatrusted", "herokutrusted"],
+ "blacklist": []
},
{
"path": "MuRuLOSE/HikkaModulesRepo",
- "tags": []
+ "tags": [],
+ "blacklist": []
},
{
"path": "coddrago/modules",
- "tags": ["herokutrusted", "hikkatrusted"]
+ "tags": ["herokutrusted", "hikkatrusted"],
+ "blacklist": []
},
{
"path": "1jpshiro/hikka-modules",
- "tags": []
+ "tags": [],
+ "blacklist": []
},
{
"path": "MoriSummerz/ftg-mods",
- "tags": ["hikkatrusted", "nonactive"]
+ "tags": ["hikkatrusted", "nonactive"],
+ "blacklist": []
},
{
"path": "anon97945/hikka-mods",
- "tags": ["hikkatrusted", "nonactive"]
+ "tags": ["hikkatrusted", "nonactive"],
+ "blacklist": []
},
{
"path": "dorotorothequickend/DorotoroModules",
- "tags": ["hikkatrusted", "nonlongermaintained"]
+ "tags": ["hikkatrusted", "nonlongermaintained"],
+ "blacklist": []
},
{
"path": "AmoreForever/amoremods",
- "tags": []
+ "tags": [],
+ "blacklist": []
},
{
"path": "idiotcoders/idiotmodules",
- "tags": ["hikkatrusted", "herokutrusted", "nonactive"]
+ "tags": ["hikkatrusted", "herokutrusted", "nonactive"],
+ "blacklist": []
},
{
"path": "CakesTwix/Hikka-Modules",
- "tags": ["hikkatrusted", "nonactive"]
+ "tags": ["hikkatrusted", "nonactive"],
+ "blacklist": []
},
{
"path": "archquise/H.Modules",
- "tags": ["hikkatrusted", "nonactive"]
+ "tags": ["hikkatrusted", "nonactive"],
+ "blacklist": []
},
{
"path": "GD-alt/mm-hikka-mods",
- "tags": ["hikkatrusted", "herokutrusted", "nonactive"]
+ "tags": ["hikkatrusted", "herokutrusted", "nonactive"],
+ "blacklist": []
},
{
"path": "HitaloSama/FTG-modules-repo",
- "tags": []
+ "tags": [],
+ "blacklist": []
},
{
"path": "SekaiYoneya/Friendly-telegram",
- "tags": []
+ "tags": [],
+ "blacklist": []
},
{
"path": "blazedzn/ftg-modules",
- "tags": []
+ "tags": [],
+ "blacklist": []
},
{
"path": "hikariatama/ftg",
- "tags": ["hikkatrusted", "nonactive"]
+ "tags": ["hikkatrusted", "nonactive"],
+ "blacklist": []
},
{
"path": "m4xx1m/FTG",
- "tags": []
+ "tags": [],
+ "blacklist": []
},
{
"path": "skillzmeow/skillzmods_hikka",
- "tags": []
+ "tags": [],
+ "blacklist": []
},
{
"path": "fajox1/famods",
- "tags": ["hikkatrusted", "herokutrusted"]
+ "tags": ["hikkatrusted", "herokutrusted"],
+ "blacklist": []
},
{
"path": "TheKsenon/MyHikkaModules",
- "tags": ["hikkatrusted", "herokutrusted"]
+ "tags": ["hikkatrusted", "herokutrusted"],
+ "blacklist": []
},
{
"path": "cryptexctl/modules-mirror",
- "tags": []
+ "tags": [],
+ "blacklist": []
},
{
"path": "Ruslan-Isaev/modules",
- "tags": ["herokutrusted"]
+ "tags": ["herokutrusted"],
+ "blacklist": []
},
{
"path": "shadowhikka/sh.modules",
- "tags": []
+ "tags": [],
+ "blacklist": []
},
{
"path": "fiksofficial/python-modules",
- "tags": ["herokutrusted"]
+ "tags": ["herokutrusted"],
+ "blacklist": []
},
{
"path": "mead0wsss/mead0wsMods",
- "tags": ["herokutrusted"]
+ "tags": ["herokutrusted"],
+ "blacklist": []
},
{
"path": "SenkoGuardian/SenModules",
- "tags": ["herokutrusted"]
+ "tags": ["herokutrusted"],
+ "blacklist": []
},
{
"path": "ZetGoHack/nullmod",
- "tags": ["herokutrusted"]
+ "tags": ["herokutrusted"],
+ "blacklist": []
},
{
"path": "yummy1gay/limoka",
- "tags": []
+ "tags": [],
+ "blacklist": []
}
]
}
\ No newline at end of file
diff --git a/update_diffs.py b/update_diffs.py
new file mode 100644
index 0000000..d039d53
--- /dev/null
+++ b/update_diffs.py
@@ -0,0 +1,193 @@
+import asyncio
+import aiohttp
+import argparse
+import subprocess
+import os
+import tempfile
+from pathlib import Path
+
+
+parser = argparse.ArgumentParser(description="Update Diffs Script")
+parser.add_argument(
+ "--token",
+ type=str,
+ required=True,
+ help="Token of Telegram bot",
+)
+parser.add_argument(
+ "--api_url",
+ type=str,
+ default="https://api.telegram.org",
+ help="API URL of Telegram API",
+)
+parser.add_argument(
+ "--chat_id",
+ type=str,
+ required=True,
+ help="Chat ID to send updates to",
+)
+parser.add_argument(
+ "--base_commit",
+ type=str,
+ default="HEAD~1",
+ help="Base commit to compare against",
+)
+
+arguments = parser.parse_args()
+
+async def send_message(session, text):
+ """Send a text message to the channel"""
+ url = f"{arguments.api_url}/bot{arguments.token}/sendMessage"
+ data = {
+ 'chat_id': arguments.chat_id,
+ 'text': text,
+ 'parse_mode': 'Markdown',
+ }
+ async with session.post(url, data=data) as response:
+ return await response.json()
+
+async def send_document(session, file_path, caption=None):
+ """Send a document to the channel"""
+ url = f"{arguments.api_url}/bot{arguments.token}/sendDocument"
+ with open(file_path, 'rb') as f:
+ data = aiohttp.FormData()
+ data.add_field('chat_id', arguments.chat_id)
+ data.add_field('document', f, filename=os.path.basename(file_path))
+ if caption:
+ data.add_field('caption', caption)
+ data.add_field('parse_mode', 'Markdown')
+ async with session.post(url, data=data) as response:
+ return await response.json()
+
+def get_changed_files(base_commit):
+ """Get list of changed files between commits"""
+ try:
+ result = subprocess.check_output(
+ ['git', 'diff', '--name-only', base_commit, 'HEAD'],
+ cwd=os.getcwd()
+ ).decode().strip().split('\n')
+ return [f for f in result if f]
+ except subprocess.CalledProcessError:
+ return []
+
+def get_file_diff(file_path, base_commit):
+ """Get diff for a specific file"""
+ try:
+ diff = subprocess.check_output(
+ ['git', 'diff', base_commit, 'HEAD', '--', file_path],
+ cwd=os.getcwd()
+ ).decode()
+ return diff
+ except subprocess.CalledProcessError:
+ return ""
+
+def is_module_file(file_path):
+ """Check if file is a Python module in a modules directory"""
+ # Check if it's a .py file and in a modules-like directory
+ return file_path.endswith('.py') and any(
+ part in file_path.lower() for part in [
+ 'modules', 'mods', 'ftg', 'hikka'
+ ]
+ )
+
+def extract_module_name(file_path):
+ """Extract module name from file path"""
+ return Path(file_path).stem
+
+async def main():
+ changed_files = get_changed_files(arguments.base_commit)
+
+ if not changed_files:
+ print("No changes detected")
+ return
+
+ # Filter for module files only
+ module_files = [f for f in changed_files if is_module_file(f)]
+
+ if not module_files:
+ print("No module changes detected")
+ return
+
+ async with aiohttp.ClientSession() as session:
+ for file_path in module_files:
+ try:
+ module_name = extract_module_name(file_path)
+
+ # Create message with raw GitHub URL
+ github_url = f"https://raw.githubusercontent.com/MuRuLOSE/limoka/refs/heads/main/{file_path}"
+ try:
+ new_hash = subprocess.check_output(
+ ['git', 'rev-list', '-n', '1', 'HEAD', '--', file_path],
+ cwd=os.getcwd()
+ ).decode().strip()
+ except Exception:
+ new_hash = 'HEAD'
+
+ try:
+ old_hash = subprocess.check_output(
+ ['git', 'rev-list', '-n', '1', arguments.base_commit, '--', file_path],
+ cwd=os.getcwd()
+ ).decode().strip()
+ except Exception:
+ old_hash = arguments.base_commit
+
+ diff_url = f"https://github.com/MuRuLOSE/limoka/compare/{old_hash}...{new_hash}.diff"
+ message = (
+ f"🪼 Module {module_name} changes approved\n\n"
+ f"[File URL]({github_url}) | [Diff URL]({diff_url})\n\n"
+ )
+
+ # Get diff
+ diff = get_file_diff(file_path, arguments.base_commit)
+
+ if not diff:
+ print(f"Skipping {file_path} - no diff content")
+ continue
+
+ # Create temporary file with diff using only module name
+ diff_filename = f"{module_name}.diff"
+ with tempfile.NamedTemporaryFile(
+ mode='w',
+ suffix='',
+ prefix='',
+ delete=False,
+ encoding='utf-8',
+ dir=tempfile.gettempdir()
+ ) as tmp_file:
+ tmp_file.write(diff)
+ tmp_file_path = tmp_file.name
+
+ try:
+ # Rename temp file to have proper name
+ final_path = os.path.join(tempfile.gettempdir(), diff_filename)
+ os.rename(tmp_file_path, final_path)
+
+ # Send diff as document with full message as caption
+ doc_result = await send_document(
+ session,
+ final_path,
+ caption=message
+ )
+ print(f"Sent diff for {module_name}: {doc_result}")
+
+ except Exception as e:
+ print(f"Error sending {module_name}: {e}")
+ finally:
+ # Cleanup temp files
+ if os.path.exists(tmp_file_path):
+ try:
+ os.remove(tmp_file_path)
+ except:
+ pass
+ final_path = os.path.join(tempfile.gettempdir(), diff_filename)
+ if os.path.exists(final_path):
+ try:
+ os.remove(final_path)
+ except:
+ pass
+
+ except Exception as e:
+ print(f"Error processing {file_path}: {e}")
+
+if __name__ == "__main__":
+ asyncio.run(main())
\ No newline at end of file