Compare commits

...

9 Commits

Author SHA1 Message Date
github-actions[bot]
e9bb89cf62 Updated modules.json after parse 2026-01-30 11:50:33 2026-01-30 11:50:33 +00:00
github-actions[bot]
51055e6427 Added and updated repositories 2026-01-30 11:50:03 2026-01-30 11:50:03 +00:00
88ab265755 fixed workflow 2026-01-30 14:46:22 +03:00
Macsim
baada2d019 Merge pull request #168 from MuRuLOSE/update-submodules_2dd772b52d052144384a8e0c71f2d33b7c21ff1d
Update of repositories 2026-01-30 11:40:55
2026-01-30 14:42:26 +03:00
github-actions[bot]
ad782a6f46 Updated modules.json after parse 2026-01-30 11:40:21 2026-01-30 11:40:21 +00:00
github-actions[bot]
2435df880e Added and updated repositories 2026-01-30 11:39:54 2026-01-30 11:39:54 +00:00
2dd772b52d fix (wrong json structure) 2026-01-30 14:38:18 +03:00
c9ed00bb78 removed categories 2026-01-30 14:28:51 +03:00
504d1f32e9 Limoka 1.4.0 2026-01-30 14:24:43 +03:00
12 changed files with 82607 additions and 45243 deletions

View File

@@ -10,8 +10,10 @@ on:
pull_request:
branches:
- main
types: [opened, synchronize, reopened, closed]
workflow_dispatch: # Allows manual triggering from GitHub UI
# Environment variables available to all jobs
env:
BRANCH_NAME: "update-submodules_${{ github.sha }}"
@@ -137,9 +139,8 @@ jobs:
python3 -m venv venv
source venv/bin/activate
pip install --upgrade pip
pip install requests scikit-learn tqdm
pip install requests
python3 parse.py
python3 categories.py
git add modules.json
git commit -m "Updated modules.json after parse $(date +'%Y-%m-%d %H:%M:%S')" || echo "No changes for modules.json"
git remote set-url origin "https://x-access-token:${GITHUB_TOKEN}@${REPO_URL}"
@@ -206,61 +207,53 @@ jobs:
echo "Branch ${{ env.BRANCH_NAME }} does not exist in remote repository, skipping PR creation."
fi
notify_diffs:
runs-on: ubuntu-latest
if: |
(github.event_name == 'push' && github.ref == 'refs/heads/main') ||
(github.event_name == 'pull_request' && github.event.action == 'closed' && github.event.pull_request.merged == true)
needs: parse
steps:
- name: Checkout repository
uses: actions/checkout@v4
with:
fetch-depth: ${{ env.GIT_DEPTH }}
- name: Configure Git for github-actions[bot]
run: |
git config --global user.email "github-actions[bot]@users.noreply.github.com"
git config --global user.name "github-actions[bot]"
- name: Set up Python 3.9
uses: actions/setup-python@v5
with:
python-version: '3.9'
- name: Install Python dependencies
run: pip install aiohttp
- name: Send module diffs to channel
env:
TELEGRAM_BOT_TOKEN: ${{ secrets.TELEGRAM_BOT_TOKEN }}
TELEGRAM_CHAT_ID: ${{ secrets.TELEGRAM_CHAT_ID_UPDATE }}
run: |
git fetch origin main
python3 update_diffs.py --token ${{ secrets.TELEGRAM_BOT_TOKEN }} --chat_id ${{ secrets.TELEGRAM_CHAT_ID_UPDATE }} --base_commit HEAD~1
backup:
runs-on: ubuntu-latest
if: github.ref == 'refs/heads/main' || github.event_name == 'workflow_dispatch'
needs: parse
steps:
- name: Configure Git for github-actions[bot]
run: |
git config --global user.email "github-actions[bot]@users.noreply.github.com"
git config --global user.name "github-actions[bot]"
git config --global --list
- name: Checkout repository
uses: actions/checkout@v4
with:
fetch-depth: ${{ env.GIT_DEPTH }}
- name: Create and send backup to Telegram
- name: Set up Python 3.9
uses: actions/setup-python@v5
with:
python-version: '3.9'
- name: Install Python dependencies
run: pip install aiohttp
- name: Run backup script
env:
TELEGRAM_BOT_TOKEN: ${{ secrets.TELEGRAM_BOT_TOKEN }}
TELEGRAM_CHAT_ID: ${{ secrets.TELEGRAM_CHAT_ID }}
run: |
echo "Creating .zip file of the repository with maximum compression..."
git archive --format=zip --output=repository-original.zip HEAD
zip -9 repository.zip repository-original.zip
rm repository-original.zip
echo "File size of the created .zip file:"
du -sh repository.zip
echo "Splitting the .zip file into 8 parts..."
split -b 49M repository.zip repository-part-
echo "Files created after split:"
ls repository-part-*
COMMIT_MESSAGE="$(git log -1 --pretty=%B)"
COMMIT_DATE="$(date --date="$(git log -1 --pretty=%ci)" +'%Y-%m-%d %H:%M:%S')"
COMMIT_HASH="$(git rev-parse --short=6 HEAD)"
COMMIT_URL="https://${REPO_URL}/commit/$(git rev-parse HEAD)"
MESSAGE="Commit Date: $COMMIT_DATE, Commit Message: $COMMIT_MESSAGE, Commit Hash: [\`$COMMIT_HASH\`]($COMMIT_URL)"
echo "Sending .zip file parts to Telegram..."
FIRST_PART=true
for part in $(ls repository-part-* | sort); do
if $FIRST_PART; then
TELEGRAM_API_URL="https://api.telegram.org/bot$TELEGRAM_BOT_TOKEN/sendDocument"
echo "Sending first file to Telegram: $TELEGRAM_API_URL"
curl -X POST "$TELEGRAM_API_URL" \
-F chat_id=$TELEGRAM_CHAT_ID \
-F document=@$part \
-F caption="$MESSAGE" \
-F parse_mode="Markdown"
FIRST_PART=false
else
TELEGRAM_API_URL="https://api.telegram.org/bot$TELEGRAM_BOT_TOKEN/sendDocument"
echo "Sending file to Telegram: $TELEGRAM_API_URL"
curl -X POST "$TELEGRAM_API_URL" \
-F chat_id=$TELEGRAM_CHAT_ID \
-F document=@$part
fi
done
echo "Files sent to Telegram successfully!"
python backup.py --token ${{ secrets.TELEGRAM_BOT_TOKEN }} --chat_id ${{ secrets.TELEGRAM_CHAT_ID }}

858
Limoka.py

File diff suppressed because it is too large Load Diff

View File

@@ -23,4 +23,4 @@ class K(loader.Module):
"""K"""
raise Exception("Testing error handling")
await utils.answer(message, "K")
# why
# why FUCK YOU BILL GATES

View File

@@ -3,9 +3,7 @@
# This software is released under the MIT License.
# https://opensource.org/licenses/MIT
__version__ = (5, 7, 0) #перепешите на меня квартиру пж
#ладно
__version__ = (5, 8, 1) #фыр
# meta developer: @SenkoGuardianModules
@@ -21,10 +19,14 @@ import os
import io
import random
import socket
import base64
import uuid
import json
from PIL import Image
import asyncio
import logging
import tempfile
import httpx
import aiohttp
from datetime import datetime
from markdown_it import MarkdownIt
import pytz
@@ -59,11 +61,13 @@ DB_GAUTO_HISTORY_KEY = "gemini_gauto_conversations_v1"
DB_IMPERSONATION_KEY = "gemini_impersonation_chats"
GEMINI_TIMEOUT = 840
MAX_FFMPEG_SIZE = 90 * 1024 * 1024
DB_KEY_MAP_KEY = "gemini_key_model_map"
CHECK_MODEL = "gemini-2.5-pro"
# requires: google-genai google-api-core pytz markdown_it_py
class Gemini(loader.Module):
"""Модуль для работы с Google Gemini AI (New SDK). Поддержка видео/фото/аудио и контекста пользователей."""
"""Модуль для работы с Google Gemini AI. (Поддержка видео/фото/аудио"""
strings = {
"name": "Gemini",
"cfg_api_key_doc": "API ключи Google Gemini, разделенные запятой. Будут скрыты.",
@@ -78,6 +82,8 @@ class Gemini(loader.Module):
"cfg_impersonation_reply_chance_doc": "Вероятность ответа в режиме gauto (от 0.0 до 1.0). 0.2 = 20% шанс.",
"cfg_temperature_doc": "Температура генерации (креативность). От 0.0 до 2.0. По умолчанию 1.0.",
"cfg_google_search_doc": "Включить поиск Google (Grounding) для актуальной информации.",
"cfg_image_model_doc": "Модель Gemini для генерации изображений (например: gemini-2.5-flash-image).",
"cfg_inline_pagination_doc": "Использовать инлайн-кнопки для длинных ответов.",
"no_api_key": '❗️ <b>Api ключ(и) не настроен(ы).</b>\nПолучить Api ключ можно <a href="https://aistudio.google.com/app/apikey">здесь</a>.\n<b>Добавьте ключ(и) в конфиге модуля:</b> <code>.cfg gemini api_key</code>',
"invalid_api_key": '❗️ <b>Предоставленный API ключ недействителен.</b>\nУбедитесь, что он правильно скопирован из <a href="https://aistudio.google.com/app/apikey">Google AI Studio</a> и что для него включен Gemini API.',
"all_keys_exhausted": "❗️ <b>Все доступные API ключи ({}) исчерпали свою квоту.</b>\nПопробуйте позже или добавьте новые ключи в конфиге: <code>.cfg gemini api_key</code>",
@@ -135,7 +141,7 @@ class Gemini(loader.Module):
"gme_sent_to_saved": "💾 История экспортирована в избранное.",
"new_sdk_missing": "⚠️ <b>Для работы модуля нужна библиотека google-genai.</b>\nВыполните: <code>pip install google-genai</code>",
"gprompt_usage": " <b>Использование:</b>\n<code>.gprompt <текст></code> — установить промпт.\n<code>.gprompt -c</code> — очистить.\nИли ответьте на <b>.txt</b> файл.",
"gprompt_updated": "✅ <b>Системный промпт обновлен!</b>\nДлина: {} симв.",
"gprompt_updated": "✅ <b>Системный промпт обновлен!</b>\nДлина: {} символов.",
"gprompt_cleared": "🗑 <b>Системный промпт очищен.</b>",
"gprompt_current": "📝 <b>Текущий системный промпт:</b>",
"gprompt_file_error": "❗️ <b>Ошибка чтения файла:</b> {}",
@@ -143,6 +149,7 @@ class Gemini(loader.Module):
"gprompt_not_text": "❗️ Это не похоже на текстовый файл.(txt)",
"gmodel_no_models": "⚠️ Не удалось получить список моделей.",
"gmodel_list_error": "❗️ Ошибка получения списка: {}",
"gimg_process": "<emoji document_id=5325547803936572038>✨</emoji> <b>Генерация...</b>\n🧠 <i>Модель: {model}</i>",
}
TEXT_MIME_TYPES = {
"text/plain", "text/markdown", "text/html", "text/css", "text/csv",
@@ -174,6 +181,8 @@ class Gemini(loader.Module):
loader.ConfigValue("gauto_in_pm", False, "Разрешить авто-ответы в личных сообщениях (ЛС).", validator=loader.validators.Boolean()),
loader.ConfigValue("google_search", False, self.strings["cfg_google_search_doc"], validator=loader.validators.Boolean()),
loader.ConfigValue("temperature", 1.0, self.strings["cfg_temperature_doc"], validator=loader.validators.Float(minimum=0.0, maximum=2.0)),
loader.ConfigValue("inline_pagination", False, self.strings["cfg_inline_pagination_doc"], validator=loader.validators.Boolean()),
loader.ConfigValue("image_model_name", "gemini-2.5-flash-image", self.strings["cfg_image_model_doc"]),
)
self.conversations = {}
self.gauto_conversations = {}
@@ -181,16 +190,25 @@ class Gemini(loader.Module):
self.impersonation_chats = set()
self._lock = asyncio.Lock()
self.memory_disabled_chats = set()
self.pager_cache = {}
self.key_model_map = {}
self.prompt_presets = []
self.api_keys = []
async def client_ready(self, client, db):
self.client = client
self.db = db
self.me = await client.get_me()
api_key_str = self.config["api_key"]
self.api_keys = [k.strip() for k in api_key_str.split(",") if k.strip()] if api_key_str else []
self.key_model_map = self.db.get(self.strings["name"], DB_KEY_MAP_KEY, {})
keys_to_remove = [k for k in self.key_model_map if k not in self.api_keys]
if keys_to_remove:
for k in keys_to_remove: del self.key_model_map[k]
self.db.set(self.strings["name"], DB_KEY_MAP_KEY, self.key_model_map)
if not GOOGLE_AVAILABLE:
logger.error("Gemini: 'google-genai' library missing! pip install google-genai")
return
api_key_str = self.config["api_key"]
self.api_keys = [k.strip() for k in api_key_str.split(",") if k.strip()] if api_key_str else []
self.current_api_key_index = 0
self.conversations = self._load_history_from_db(DB_HISTORY_KEY)
self.gauto_conversations = self._load_history_from_db(DB_GAUTO_HISTORY_KEY)
@@ -332,16 +350,13 @@ class Gemini(loader.Module):
raw_hist = self._get_structured_history(chat_id, gauto=impersonation_mode)
if regeneration and raw_hist: raw_hist = raw_hist[:-2]
for item in raw_hist:
contents.append(types.Content(
role=item['role'],
parts=[types.Part(text=item['content'])]
))
contents.append(types.Content(role=item['role'], parts=[types.Part(text=item['content'])]))
request_parts = list(current_turn_parts)
if not impersonation_mode:
try: user_timezone = pytz.timezone(self.config["timezone"])
except pytz.UnknownTimeZoneError: user_timezone = pytz.utc
now = datetime.now(user_timezone)
time_note = f"[System note: Current time is {now.strftime('%Y-%m-%d %H:%M:%S %Z')}]"
time_note = f"[System Info: Current local time is {now.strftime('%Y-%m-%d %H:%M:%S %Z')}]"
if request_parts and getattr(request_parts[0], 'text', None):
request_parts[0] = types.Part(text=f"{time_note}\n\n{request_parts[0].text}")
else:
@@ -367,22 +382,19 @@ class Gemini(loader.Module):
http_opts = None
if proxy_config:
http_opts = types.HttpOptions(async_client_args={"proxies": proxy_config})
client = genai.Client(api_key=api_key, http_options=http_opts)
response = await client.aio.models.generate_content(
model=self.config["model_name"],
contents=contents,
config=gen_config
)
if response.text:
result_text = response.text
was_successful = True
if self.config["google_search"]: search_icon = " 🌐"
self.current_api_key_index = current_idx
break
else:
raise ValueError("Empty response (Safety?)")
else: raise ValueError("Empty response")
except Exception as e:
err_str = str(e).lower()
if "quota" in err_str or "exhausted" in err_str or "429" in err_str:
@@ -392,8 +404,7 @@ class Gemini(loader.Module):
last_error = e
break
try:
if not was_successful:
raise last_error or RuntimeError("Unknown generation error")
if not was_successful: raise last_error or RuntimeError("Unknown generation error")
if self._is_memory_enabled(str(chat_id)):
self._update_history(chat_id, current_turn_parts, result_text, regeneration, msg_obj, gauto=impersonation_mode)
if impersonation_mode: return result_text
@@ -406,14 +417,25 @@ class Gemini(loader.Module):
question_html = f"<blockquote>{utils.escape_html(request_text_for_display[:200])}</blockquote>"
text_to_send = f"{mem_ind}\n\n{self.strings['question_prefix']}\n{question_html}\n\n{self.strings['response_prefix']}{search_icon}\n{formatted_body}"
buttons = self._get_inline_buttons(chat_id, base_message_id) if self.config["interactive_buttons"] else None
if len(text_to_send) > 4096:
is_long_text = len(result_text) > 3500
if is_long_text and self.config["inline_pagination"]:
chunks = self._paginate_text(result_text, 3000)
uid = uuid.uuid4().hex[:6]
header = f"{mem_ind}\n\n{self.strings['question_prefix']} <blockquote>{utils.escape_html(request_text_for_display[:100])}...</blockquote>\n\n{self.strings['response_prefix']}{search_icon}\n"
self.pager_cache[uid] = {
"chunks": chunks,
"total": len(chunks),
"header": header,
"chat_id": chat_id,
"msg_id": base_message_id
}
await self._render_page(uid, 0, call or status_msg)
elif len(text_to_send) > 4096:
file_content = (f"Вопрос: {display_prompt}\n\n════════════════════\n\nОтвет Gemini:\n{result_text}")
file = io.BytesIO(file_content.encode("utf-8"))
file.name = "Gemini_response.txt"
file = io.BytesIO(file_content.encode("utf-8")); file.name = "Gemini_response.txt"
if call:
await call.answer("Ответ длинный, отправляю файлом...", show_alert=False)
await self.client.send_file(call.chat_id, file, caption=self.strings["response_too_long"], reply_to=call.message_id)
await call.edit(f"{self.strings['response_too_long']}", reply_markup=None)
elif status_msg:
await status_msg.delete()
await self.client.send_file(chat_id, file, caption=self.strings["response_too_long"], reply_to=base_message_id)
@@ -451,6 +473,78 @@ class Gemini(loader.Module):
use_url_context=use_url_context, display_prompt=clean_args or None
)
@loader.command()
async def gimg(self, message: Message):
"""<промпт> [реплай на фото] — Генерация/Редактирование изображений через Gemini."""
args = utils.get_args_raw(message)
reply = await message.get_reply_message()
input_bytes = None
if reply:
if reply.photo:
input_bytes = await self.client.download_media(reply, bytes)
elif reply.document and reply.document.mime_type.startswith("image/"):
input_bytes = await self.client.download_media(reply, bytes)
if not args and not input_bytes:
return await utils.answer(message, "🎨 <b>Введите промпт.</b>\nПример: <code>.gimg кот в космосе</code>")
prompt = args if args else "Describe/Modify this image"
model = self.config["image_model_name"]
m = await utils.answer(message, self.strings["gimg_process"].format(model=model))
try:
res = await self._call_google_rest(model, prompt, input_bytes)
if "error" in res:
err_msg = res["error"]["message"]
try: err_msg = json.loads(err_msg)["error"]["message"]
except: pass
raise ValueError(err_msg)
img_bytes = None
try:
parts = res["candidates"][0]["content"]["parts"]
for part in parts:
if "inlineData" in part:
img_bytes = base64.b64decode(part["inlineData"]["data"])
break
except Exception as e:
raise ValueError(f"Ошибка парсинга ответа: {e}")
if not img_bytes:
raise ValueError("Модель не вернула изображение (возможно, сработал Safety Filter).")
out = io.BytesIO(img_bytes)
out.name = f"gemini_{uuid.uuid4().hex[:6]}.jpg"
await self.client.send_file(
utils.get_chat_id(message),
out,
caption=f"🎨 <b>Gemini Image</b>\n🧠 <code>{model}</code>\n📜 <code>{utils.escape_html(prompt[:100])}</code>",
reply_to=message.id
)
await m.delete()
except Exception as e:
await utils.answer(m, f"❌ <b>Ошибка:</b>\n<code>{utils.escape_html(str(e))}</code>")
@loader.command()
async def gskey(self, message: Message):
"""[-h] — Сканировать ключи. -h: показать статус из кеша без проверки."""
args = utils.get_args_raw(message).strip()
if args in ["-h", "--having", "having"]:
premium = sum(1 for v in self.key_model_map.values() if v == 1)
free = sum(1 for v in self.key_model_map.values() if v == 0)
report = (
f"📊 <b>Статус ключей (кеш):</b>\n"
f"💎 <b>Premium/Active:</b> {premium}\n"
f"👻 <b>Free/Unknown:</b> {free}\n"
f"🔑 <b>Всего в конфиге:</b> {len(self.api_keys)}"
)
return await utils.answer(message, report)
await utils.answer(message, "<emoji document_id=5386367538735104399>⌛️</emoji> <b>Сканирую ключи...</b>\n<i>Это займет время (1.2 сек на ключ).</i>")
report, invalid_keys = await self._scan_keys(force=True)
if invalid_keys:
txt_keys = "\n".join(invalid_keys)
try:
await self.client.send_message("me", f"🚫 <b>Gemini: Найдены невалидные ключи:</b>\nУдали их из конфига:\n\n<code>{txt_keys}</code>")
report += "\n\n⚠️ <b>Список невалидных ключей отправлен в Избранное.</b>"
except:
report += "\n\n⚠️ <b>Найдены невалидные ключи.</b>"
await utils.answer(message, report)
@loader.command()
async def gch(self, message: Message):
"""<[id чата]> <кол-во> <вопрос> - Проанализировать историю чата."""
@@ -479,6 +573,8 @@ class Gemini(loader.Module):
entity = await self.client.get_entity(target_chat_id)
chat_name = utils.escape_html(get_display_name(entity))
chat_log = await self._get_recent_chat_text(target_chat_id, count=count, skip_last=False)
except (ValueError, TypeError, ChatAdminRequiredError, UserNotParticipantError, ChannelPrivateError) as e:
return await utils.answer(status_msg, self.strings["gch_chat_error"].format(target_chat_id, e.__class__.__name__))
except Exception as e:
return await utils.answer(status_msg, self.strings["gch_chat_error"].format(target_chat_id, e))
full_prompt = (
@@ -671,7 +767,6 @@ class Gemini(loader.Module):
import json
hist = json.loads(f)
if not isinstance(hist, list): raise ValueError
cid = utils.get_chat_id(message)
target = self.gauto_conversations if gauto else self.conversations
target[str(cid)] = hist
@@ -681,7 +776,7 @@ class Gemini(loader.Module):
@loader.command()
async def gmemfind(self, message: Message):
"""[слово] — Поиск по истории текущего чата по ключевому слову или фразе."""
"""[слово] — Поиск в памяти текущего чата по ключевому слову или фразе."""
q = utils.get_args_raw(message).lower()
if not q: return await utils.answer(message, "Укажите слово для поиска.")
cid = utils.get_chat_id(message)
@@ -755,6 +850,118 @@ class Gemini(loader.Module):
self._save_history_sync(False)
await utils.answer(message, self.strings["memory_fully_cleared"].format(n))
@loader.callback_handler()
async def gemini_callback_handler(self, call: InlineCall):
if not call.data.startswith("gemini:"): return
parts = call.data.split(":")
action = parts[1]
if action == "noop":
await call.answer()
return
if action == "pg":
uid = parts[2]
page = int(parts[3])
await self._render_page(uid, page, call)
return
async def _clear_callback(self, call: InlineCall, cid):
self._clear_history(cid, gauto=False)
await call.edit(self.strings["memory_cleared"], reply_markup=None)
async def _regenerate_callback(self, call: InlineCall, mid, cid):
key = f"{cid}:{mid}"
if key not in self.last_requests: return await call.answer(self.strings["no_last_request"], show_alert=True)
parts, disp = self.last_requests[key]
use_url_context = bool(re.search(r'https?://\S+', disp or ""))
await self._send_to_gemini(mid, parts, regeneration=True, call=call, chat_id_override=cid, display_prompt=disp, use_url_context=use_url_context)
async def _close_callback(self, call: InlineCall, uid: str):
"""Обрабатывает нажатие кнопки закрытия для пагинации"""
await call.answer()
if uid in self.pager_cache:
del self.pager_cache[uid]
try:
await self.client.delete_messages(call.chat_id, call.message_id)
except Exception:
try:
await call.edit("✔️ Сессия закрыта.", reply_markup=None)
except Exception:
pass
async def _render_page(self, uid, page_num, entity):
data = self.pager_cache.get(uid)
if not data:
if isinstance(entity, InlineCall):
await entity.edit("⚠️ <b>Сессия истекла (RAM cleared).</b>", reply_markup=None)
return
chunks = data["chunks"]
total = data["total"]
header = data.get("header", "")
raw_text_chunk = chunks[page_num]
safe_text = self._markdown_to_html(raw_text_chunk)
text_to_show = f"{header}<blockquote expandable>{safe_text}</blockquote>"
nav_row = []
if page_num > 0:
nav_row.append({"text": "◀️", "data": f"gemini:pg:{uid}:{page_num - 1}"})
nav_row.append({"text": f"{page_num + 1}/{total}", "data": "gemini:noop"})
if page_num < total - 1:
nav_row.append({"text": "▶️", "data": f"gemini:pg:{uid}:{page_num + 1}"})
extra_row = [{"text": "❌ Закрыть", "callback": self._close_callback, "args": (uid,)}]
if data.get("chat_id") and data.get("msg_id"):
extra_row.append({"text": "🔄", "callback": self._regenerate_callback, "args": (data['msg_id'], data['chat_id'])})
buttons = [nav_row, extra_row]
if isinstance(entity, Message):
await self.inline.form(text=text_to_show, message=entity, reply_markup=buttons)
elif isinstance(entity, InlineCall):
await entity.edit(text=text_to_show, reply_markup=buttons)
elif hasattr(entity, "edit"):
try: await entity.edit(text=text_to_show, reply_markup=buttons)
except: pass
def _paginate_text(self, text: str, limit: int) -> list:
pages = []
current_page_lines = []
current_len = 0
in_code_block = False
current_code_lang = ""
lines = text.split('\n')
for line in lines:
line_len = len(line) + 1
stripped = line.strip()
if stripped.startswith("```"):
if in_code_block:
in_code_block = False
current_code_lang = ""
else:
in_code_block = True
current_code_lang = stripped.replace("```", "").strip()
if current_len + line_len > limit:
if current_page_lines:
if in_code_block: current_page_lines.append("```")
pages.append("\n".join(current_page_lines))
current_page_lines = []
current_len = 0
if in_code_block:
header = f"```{current_code_lang}"
current_page_lines.append(header)
current_len += len(header) + 1
if line_len > limit:
chunks = [line[i:i+limit] for i in range(0, len(line), limit)]
for chunk in chunks:
if current_len + len(chunk) > limit:
pages.append("\n".join(current_page_lines))
current_page_lines = [chunk]
current_len = len(chunk)
else:
current_page_lines.append(chunk)
current_len += len(chunk)
continue
current_page_lines.append(line)
current_len += line_len
if current_page_lines:
pages.append("\n".join(current_page_lines))
return pages
@loader.watcher(only_incoming=True, ignore_edited=True)
async def watcher(self, message: Message):
if not hasattr(message, 'chat_id'): return
@@ -806,10 +1013,13 @@ class Gemini(loader.Module):
user_id = self.me.id
user_name = get_display_name(self.me)
message_id = getattr(message, "id", None)
if message:
if message.sender_id:
user_id = message.sender_id
try:
peer_id = get_peer_id(message)
if peer_id:
user_id = peer_id
except (TypeError, ValueError):
if message.sender_id: user_id = message.sender_id
if message.sender:
user_name = get_display_name(message.sender)
user_text = " ".join([p.text for p in user_parts if hasattr(p, "text") and p.text]) or "[ответ на медиа]"
@@ -838,7 +1048,6 @@ class Gemini(loader.Module):
"date": now,
"user_id": None
}
history.extend([user_entry, model_entry])
limit = self.config["max_history_length"]
if limit > 0 and len(history) > limit * 2:
@@ -853,8 +1062,6 @@ class Gemini(loader.Module):
del d[str(cid)]
self._save_history_sync(gauto)
def _is_memory_enabled(self, cid): return cid not in self.memory_disabled_chats
def _markdown_to_html(self, text):
text = re.sub(r"^(#+)\s+(.*)", lambda m: f"<b>{m.group(2)}</b>", text, flags=re.M)
text = re.sub(r"^([ \t]*)[-*+]\s+", r"\1• ", text, flags=re.M)
@@ -911,42 +1118,34 @@ class Gemini(loader.Module):
if txt.strip(): lines.append(f"{name}: {txt.strip()}")
except: pass
return "\n".join(reversed(lines))
def _handle_error(self, e: Exception) -> str:
logger.exception("Gemini execution error")
if isinstance(e, asyncio.TimeoutError):
return self.strings["api_timeout"]
if google_exceptions and isinstance(e, google_exceptions.GoogleAPIError):
msg = str(e)
if "quota" in msg.lower() or "exceeded" in msg.lower():
model = self.config.get("model_name", "unknown")
return (
f"❗️ <b>Превышен лимит Google Gemini API для модели <code>{utils.escape_html(model)}</code>.</b>\n"
f"<b>Детали ошибки:</b>\n<code>{utils.escape_html(msg)}</code>"
)
if "User location is not supported" in msg or "location is not supported" in msg:
return (
'❗️ <b>В данном регионе Gemini API не доступен.</b>\n'
'Используйте VPN или прокси.'
)
if "API key not valid" in msg:
return self.strings["invalid_api_key"]
if "blocked" in msg.lower():
return self.strings["blocked_error"].format(utils.escape_html(msg))
return self.strings["api_error"].format(utils.escape_html(msg))
if isinstance(e, (OSError, socket.timeout)):
return "❗️ <b>Сетевая ошибка:</b>\n<code>{}</code>".format(utils.escape_html(str(e)))
msg = str(e)
if "quota" in msg.lower() or "exhausted" in msg.lower() or "429" in msg:
model = self.config.get("model_name", "unknown")
return (
f"❗️ <b>Превышен лимит Google Gemini API для модели <code>{utils.escape_html(model)}</code>.</b>"
"\n\nЧаще всего это происходит на бесплатном тарифе. Вы можете:\n"
"• Подождать, пока лимит сбросится (обычно раз в сутки).\n"
"• Проверить свой тарифный план в <a href='https://aistudio.google.com/app/billing'>Google AI Studio</a>.\n"
"• Узнать больше о лимитах <a href='https://ai.google.dev/gemini-api/docs/rate-limits'>здесь</a>.\n\n"
f"<b>Детали ошибки:</b>\n<code>{utils.escape_html(msg)}</code>"
)
if "location" in msg.lower() or "not supported" in msg.lower():
return (
'❗️ <b>В данном регионе Gemini API не доступен.</b>\n'
'Скачайте VPN (для пк/тел) или поставьте прокси (платный/бесплатный).\n'
'Или воспользуйтесь инструкцией <a href="https://t.me/SenkoGuardianModules/23">вот тут</a>\n'
'А для тех у кого UserLand инструкция <a href="https://t.me/SenkoGuardianModules/35">тут</a>'
)
if "key" in msg.lower() and "valid" in msg.lower():
return self.strings["invalid_api_key"]
if "blocked" in msg.lower():
return self.strings["blocked_error"].format(utils.escape_html(msg))
if "500" in msg:
return (
"❗️ <b>Ошибка 500 от Google API.</b>\n"
"Это значит, что формат медиа (файл или еще что то) который ты отправил, не поддерживается.\n"
"Такое случается, по такой причине:\n "
"• Если формат файла в принципе не поддерживается Gemini/Гуглом.\n "
"• Временный сбой на серверах Google. Попробуйте повторить запрос позже."
)
return self.strings["api_error"].format(utils.escape_html(msg))
if "quota" in msg.lower() or "429" in msg: return self.strings["all_keys_exhausted"].format(len(self.api_keys))
return self.strings["generic_error"].format(utils.escape_html(msg))
def _markdown_to_html(self, text: str) -> str:
def heading_replacer(match):
@@ -1001,24 +1200,7 @@ class Gemini(loader.Module):
async def _clear_callback(self, call: InlineCall, chat_id: int):
self._clear_history(chat_id, gauto=False)
await call.edit(self.strings["memory_cleared"], reply_markup=None)
async def _regenerate_callback(self, call: InlineCall, original_message_id: int, chat_id: int):
key = f"{chat_id}:{original_message_id}"
last_request_tuple = self.last_requests.get(key)
if not last_request_tuple:
return await call.answer(self.strings["no_last_request"], show_alert=True)
last_parts, display_prompt = last_request_tuple
use_url_context = bool(re.search(r'https?://\S+', display_prompt or ""))
await self._send_to_gemini(
message=original_message_id,
parts=last_parts,
regeneration=True,
call=call,
chat_id_override=chat_id,
use_url_context=use_url_context,
display_prompt=display_prompt
)
async def _get_recent_chat_text(self, chat_id: int, count: int = None, skip_last: bool = False) -> str:
history_limit = count or self.config["impersonation_history_limit"]
@@ -1049,6 +1231,111 @@ class Gemini(loader.Module):
logger.warning(f"Не удалось получить историю для авто-ответа: {e}")
return "\n".join(reversed(chat_history_lines))
async def _scan_keys(self, force=False):
"""
Сканирует ключи на валидность.
"""
if not GOOGLE_AVAILABLE: return "Library missing", []
current_map_keys = list(self.key_model_map.keys())
for k in current_map_keys:
if k not in self.api_keys: del self.key_model_map[k]
if not force and all(k in self.key_model_map for k in self.api_keys):
return "Loaded from cache", []
if force: self.key_model_map = {}
proxy_config = self._get_proxy_config()
http_opts = types.HttpOptions(async_client_args={"proxies": proxy_config, "timeout": 10.0}) if proxy_config else None
active_keys = []
invalid_keys = []
minimal_config = types.GenerateContentConfig(
response_mime_type="text/plain",
max_output_tokens=1,
candidate_count=1,
safety_settings=[types.SafetySetting(category="HARM_CATEGORY_HARASSMENT", threshold="BLOCK_NONE")]
)
for i, key in enumerate(self.api_keys):
if i > 0: await asyncio.sleep(1.2)
try:
client = genai.Client(api_key=key, http_options=http_opts)
response = await client.aio.models.generate_content(
model=CHECK_MODEL, contents="test", config=minimal_config
)
active_keys.append(key)
self.key_model_map[key] = 1
except Exception as e:
err = str(e).lower()
if "invalid_argument" in err or "api_key_invalid" in err or "400" in err or "blocked" in err:
invalid_keys.append(key)
else:
self.key_model_map[key] = 0
self.db.set(self.strings["name"], DB_KEY_MAP_KEY, self.key_model_map)
short_report = (
f"✅ <b>Скан завершен.</b>\n"
f"💎 <b>Active:</b> {len(active_keys)}\n"
f"🗑 <b>Invalid:</b> {len(invalid_keys)}\n"
f"👻 <b>RateLimited/Other:</b> {len(self.api_keys) - len(active_keys) - len(invalid_keys)}"
)
return short_report, invalid_keys
def _get_sorted_keys(self):
valid_keys = []
for key in self.api_keys:
if key not in self.key_model_map:
if not self.key_model_map: valid_keys.append((key, 0, random.random()))
continue
tier = self.key_model_map[key]
valid_keys.append((key, tier, random.random()))
valid_keys.sort(key=lambda x: (x[1], x[2]))
return [item[0] for item in valid_keys]
async def _call_google_rest(self, model_name: str, prompt: str, input_image_bytes=None):
keys = self._get_sorted_keys()
if not keys: return {"error": {"message": "Нет доступных API ключей"}}
parts = [{"text": prompt}]
if input_image_bytes:
resized = await utils.run_sync(self._resize_image_ig, input_image_bytes)
b64_img = base64.b64encode(resized).decode('utf-8')
parts.insert(0, {"inlineData": {"mimeType": "image/jpeg", "data": b64_img}})
payload = {
"contents": [{"parts": parts}],
"safetySettings": [
{"category": "HARM_CATEGORY_HARASSMENT", "threshold": "BLOCK_NONE"},
{"category": "HARM_CATEGORY_HATE_SPEECH", "threshold": "BLOCK_NONE"},
{"category": "HARM_CATEGORY_SEXUALLY_EXPLICIT", "threshold": "BLOCK_NONE"},
{"category": "HARM_CATEGORY_DANGEROUS_CONTENT", "threshold": "BLOCK_NONE"}
],
"generationConfig": {"candidateCount": 1, "temperature": 1.0}
}
proxy = self.config['proxy'] if self.config['proxy'] else None
last_error = None
async with aiohttp.ClientSession() as session:
for i, api_key in enumerate(keys):
url = f"https://generativelanguage.googleapis.com/v1beta/models/{model_name}:generateContent?key={api_key}"
try:
if i > 0: await asyncio.sleep(1)
async with session.post(url, json=payload, proxy=proxy, timeout=60) as resp:
if resp.status == 200:
return await resp.json()
elif resp.status in [429, 503, 403]:
last_error = f"HTTP {resp.status}"
continue
else:
text = await resp.text()
return {"error": {"message": f"HTTP {resp.status}: {text}"}}
except Exception as e:
last_error = str(e)
continue
return {"error": {"message": f"All keys exhausted. Last error: {last_error}"}}
def _resize_image_ig(self, img_bytes):
try:
img = Image.open(io.BytesIO(img_bytes))
img.thumbnail((1024, 1024))
out = io.BytesIO()
if img.mode in ("RGBA", "P"): img = img.convert("RGB")
img.save(out, format='JPEG', quality=85)
return out.getvalue()
except: return img_bytes
def _is_memory_enabled(self, chat_id: str) -> bool: return chat_id not in self.memory_disabled_chats
def _disable_memory(self, chat_id: int): self.memory_disabled_chats.add(str(chat_id))
def _enable_memory(self, chat_id: int): self.memory_disabled_chats.discard(str(chat_id))

74
backup.py Normal file
View File

@@ -0,0 +1,74 @@
import asyncio
import aiohttp
import argparse
import subprocess
import os
import glob
parser = argparse.ArgumentParser(description="Backup Script")
parser.add_argument(
"--token",
type=str,
required=True,
help="Token of Telegram bot",
)
parser.add_argument(
"--api_url",
type=str,
default="https://api.telegram.org",
help="API URL of Telegram API",
)
parser.add_argument(
"--chat_id",
type=str,
required=True,
help="Chat ID to send backup message to",
)
arguments = parser.parse_args()
async def send_file(session, file_path, caption=None):
url = f"{arguments.api_url}/bot{arguments.token}/sendDocument"
with open(file_path, 'rb') as f:
data = aiohttp.FormData()
data.add_field('chat_id', arguments.chat_id)
data.add_field('document', f, filename=os.path.basename(file_path))
if caption:
data.add_field('caption', caption)
data.add_field('parse_mode', 'Markdown')
async with session.post(url, data=data) as response:
return await response.json()
async def main():
# Get commit info
commit_message = subprocess.check_output(['git', 'log', '-1', '--pretty=%B']).decode().strip()
commit_date = subprocess.check_output(['git', 'log', '-1', '--pretty=%ci']).decode().strip()
commit_hash = subprocess.check_output(['git', 'rev-parse', '--short=6', 'HEAD']).decode().strip()
commit_url = f"https://github.com/MuRuLOSE/limoka/commit/{subprocess.check_output(['git', 'rev-parse', 'HEAD']).decode().strip()}"
message = f"Commit Date: {commit_date}, Commit Message: {commit_message}, Commit Hash: [`{commit_hash}`]({commit_url})"
# Create zip
subprocess.run(['git', 'archive', '--format=zip', '--output=repository-original.zip', 'HEAD'])
subprocess.run(['zip', '-9', 'repository.zip', 'repository-original.zip'])
os.remove('repository-original.zip')
# Split zip
subprocess.run(['split', '-b', '49M', 'repository.zip', 'repository-part-'])
# Send parts
async with aiohttp.ClientSession() as session:
parts = sorted(glob.glob('repository-part-*'))
first = True
for part in parts:
caption = message if first else None
result = await send_file(session, part, caption)
print(f"Sent {part}: {result}")
first = False
# Cleanup
os.remove('repository.zip')
for part in parts:
os.remove(part)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -1,126 +0,0 @@
import json
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.multiclass import OneVsRestClassifier
from sklearn.preprocessing import MultiLabelBinarizer
from tqdm import tqdm
import numpy as np
# Тренировочные данные (48 модулей)
training_data = {
"MuRuLOSE/HikkaModulesRepo/filters.py": ["Tools", "Chat"],
"MuRuLOSE/HikkaModulesRepo/autogiveawayjoin.py": ["Automation", "Social"],
"MuRuLOSE/HikkaModulesRepo/HTTPCat.py": ["Fun"],
"MuRuLOSE/HikkaModulesRepo/CustomPing.py": ["Tools", "Networking"],
"MuRuLOSE/HikkaModulesRepo/FuckTagOne.py": ["Moderation"],
"MuRuLOSE/HikkaModulesRepo/InlineButtons.py": ["Tools", "Chat"],
"MuRuLOSE/HikkaModulesRepo/YoutubeDL.py": ["Media"],
"MuRuLOSE/HikkaModulesRepo/youtubesearcher.py": ["Media", "Tools"],
"MuRuLOSE/HikkaModulesRepo/INumber.py": ["Fun", "Info"],
"MuRuLOSE/HikkaModulesRepo/RandomDog.py": ["Fun"],
"MuRuLOSE/HikkaModulesRepo/RemoveLinks.py": ["Moderation", "Chat"],
"MuRuLOSE/HikkaModulesRepo/SteamClient.py": ["Games", "Tools"],
"MuRuLOSE/HikkaModulesRepo/PinMoreChats.py": ["Chat", "Productivity"],
"MuRuLOSE/HikkaModulesRepo/MindGameCheat.py": ["Games", "Tools"],
"MuRuLOSE/HikkaModulesRepo/NasaImages.py": ["Media", "Info"],
"MuRuLOSE/HikkaModulesRepo/autoreader.py": ["Automation", "Chat"],
"MuRuLOSE/HikkaModulesRepo/K.py": ["Fun"],
"MuRuLOSE/HikkaModulesRepo/Genshin.py": ["Games"],
"MuRuLOSE/HikkaModulesRepo/compliments.py": ["Social", "Fun"],
"MuRuLOSE/HikkaModulesRepo/AutoLeave.py": ["Automation", "Chat"],
"MuRuLOSE/HikkaModulesRepo/ToTHosting.py": ["Tools", "Admin"],
"MuRuLOSE/HikkaModulesRepo/PasswordUtils.py": ["Security", "Tools"],
"MuRuLOSE/HikkaModulesRepo/FuckJoins.py": ["Security", "Chat"],
"MuRuLOSE/HikkaModulesRepo/SpyEVO.py": ["Tools", "Info"],
"MuRuLOSE/HikkaModulesRepo/FindID.py": ["Tools", "Admin"],
"MuRuLOSE/HikkaModulesRepo/ChannelCheck.py": ["Tools", "Social"],
"MuRuLOSE/HikkaModulesRepo/controlspam.py": ["Chat", "Tools"],
"MuRuLOSE/HikkaModulesRepo/VKMusic.py": ["Media"],
"MuRuLOSE/HikkaModulesRepo/morse.py": ["Tools", "Fun"],
"MuRuLOSE/HikkaModulesRepo/YamiManager.py": ["Chat", "Tools"],
"MuRuLOSE/HikkaModulesRepo/SearchersGenQuery.py": ["Tools", "Info"],
"MuRuLOSE/HikkaModulesRepo/Limoka.py": ["Utilities", "Tools"],
"MuRuLOSE/HikkaModulesRepo/CheckTime.py": ["Productivity", "Info"],
"MuRuLOSE/HikkaModulesRepo/ReplaceWords.py": ["Chat", "Customization"],
"MuRuLOSE/HikkaModulesRepo/TempJoinChannel.py": ["Chat", "Automation"],
"MuRuLOSE/HikkaModulesRepo/timer.py": ["Productivity", "Tools"],
"den4ikSuperOstryyPer4ik/astro-modules/astroafk.py": ["Automation", "Customization"],
"den4ikSuperOstryyPer4ik/astro-modules/akinator.py": ["Games"],
"den4ikSuperOstryyPer4ik/astro-modules/Emotions.py": ["Social", "Fun"],
"den4ikSuperOstryyPer4ik/astro-modules/RandomStatuses.py": ["Social", "Fun"],
"den4ikSuperOstryyPer4ik/astro-modules/RandomTrack.py": ["Media", "Fun"],
"den4ikSuperOstryyPer4ik/astro-modules/minesweeper.py": ["Games"],
"den4ikSuperOstryyPer4ik/astro-modules/inline_bot_manager.py": ["Tools", "Automation"],
"MuRuLOSE/HikkaModulesRepo/ReplaceWords.py": ["Customization", "Chat"],
"MuRuLOSE/HikkaModulesRepo/CheckTime.py": ["Productivity"],
"MuRuLOSE/HikkaModulesRepo/SearchersGenQuery.py": ["Utilities", "Info"]
}
all_categories = [
"Utilities", "Fun", "Admin", "Media", "Games", "Tools", "Security", "Social",
"Automation", "Info", "Chat", "Moderation", "Productivity", "Customization",
"Networking", "Education", "Finance", "Health", "Creative", "Other"
]
def get_module_text(module_path, module_data):
name = module_data.get("name", "").lower()
description = (module_data.get("description", "") or module_data.get("meta", {}).get("desc", "")).lower()
if not description or description == "desc":
description = ""
commands_text = " ".join([f"{cmd} {desc}".lower() for func in module_data.get("commands", []) for cmd, desc in func.items()])
new_commands_text = " ".join([f"{cmd} {data.get('doc', '')} {data.get('ru_doc', '') or ''}".lower()
for func in module_data.get("new_commands", []) for cmd, data in func.items()])
file_path = module_path.lower()
file_name = file_path.split("/")[-1]
return f"{file_name} {name} {description} {file_path} {commands_text} {new_commands_text}".strip()
with open("modules.json", "r", encoding="utf-8") as f:
modules = json.load(f)
# Подготовка тренировочных данных
train_texts = [get_module_text(path, modules[path]) for path in training_data.keys() if path in modules]
train_labels = [training_data[path] for path in training_data.keys() if path in modules]
# Векторизация текста
vectorizer = TfidfVectorizer(max_features=2000)
X_train = vectorizer.fit_transform(train_texts)
# Преобразование меток
mlb = MultiLabelBinarizer(classes=all_categories)
y_train = mlb.fit_transform(train_labels)
# Обучение модели с балансировкой классов
base_clf = LogisticRegression(class_weight="balanced", max_iter=1000)
clf = OneVsRestClassifier(base_clf)
clf.fit(X_train, y_train)
# Обработка всех модулей
print("Classifying all modules...")
texts = [get_module_text(path, data) for path, data in modules.items()]
X_all = vectorizer.transform(texts)
# Предсказание вероятностей
probs = clf.predict_proba(X_all)
# Присваивание категорий
threshold = 0.2 # Снижаем порог для большего разнообразия
for module_path, prob_vector in tqdm(zip(modules.keys(), probs), total=len(modules), desc="Assigning categories"):
module_data = modules[module_path]
sorted_indices = np.argsort(prob_vector)[::-1]
sorted_probs = prob_vector[sorted_indices]
sorted_labels = mlb.classes_[sorted_indices]
selected_categories = [label for label, prob in zip(sorted_labels, sorted_probs) if prob >= threshold][:2]
if not selected_categories:
selected_categories = ["Other"]
module_data["category"] = selected_categories
print(f"Module: {module_path} -> Categories: {selected_categories} (top probs: {[f'{p:.2f}' for p in sorted_probs[:3]]})")
# Сохранение результата
with open("modules.json", "w", encoding="utf-8") as f:
json.dump(modules, f, ensure_ascii=False, indent=2)
print("Done! Check modules_categorized.json.")

View File

@@ -11,7 +11,7 @@ def parse_repos(file_path: str) -> list:
with open(file_path, "r", encoding="utf-8") as f:
data = json.load(f)
for repo in data.get("repositories", []):
repos.append(repo["path"])
repos.append(repo["url"])
return repos
repos = parse_repos("repositories.json")

View File

@@ -106,7 +106,7 @@ class DeviceInfo(loader.Module):
self.config = loader.ModuleConfig(
loader.ConfigValue(
"api_base_url",
"https://mobilespecs.fiksofficial.fun",
"https://gmsarena.vercel.app/",
lambda: "API Url",
validator=loader.validators.String()
),

125348
modules.json

File diff suppressed because one or more lines are too long

478
parse.py
View File

@@ -1,171 +1,353 @@
import os
import ast
import json
import os
import logging
from typing import Dict, Any, Optional, List
from clone_repos import repos
from typing import Dict
logging.basicConfig(level=logging.WARNING, format="%(message)s")
logger = logging.getLogger(__name__)
# TODO: ADD VENV IGNORE
def safe_unparse(node: ast.AST) -> str:
try:
return ast.unparse(node)
except AttributeError:
return getattr(node, 'id', str(type(node).__name__))
def load_blacklist(file_path):
with open(file_path, "r", encoding="utf-8") as f:
data = json.load(f)
repositories = data.get("repositories", [])
blacklisted_modules = {}
def get_module_info(module_path):
"""Парсит Python-модуль и извлекает информацию о нем."""
with open(module_path, "r", encoding="utf-8") as f:
module_content = f.read()
for i in repositories:
path = i.get("path", "")
blacklist = i.get("blacklist", [])
if path and blacklist:
blacklisted_modules[path] = blacklist
meta_info = {"pic": None, "banner": None}
for line in module_content.split("\n"):
if line.startswith("# meta"):
key, value = line.replace("# meta ", "").split(": ")
meta_info[key] = value
return blacklisted_modules
tree = ast.parse(module_content)
def extract_string_value(node: ast.AST) -> Optional[str]:
try:
if isinstance(node, ast.Constant) and isinstance(node.value, str):
return node.value
if isinstance(node, ast.Str):
return node.s
if isinstance(node, ast.Name):
return node.id
if isinstance(node, ast.Attribute):
return f"{safe_unparse(node.value)}.{node.attr}"
return str(node)
except Exception:
return None
def get_decorator_names(decorator_list):
return [ast.unparse(decorator) for decorator in decorator_list]
def extract_loader_command_args(decorator):
"""Извлекает аргументы `ru_doc` и `en_doc` из `@loader.command`."""
if (
isinstance(decorator, ast.Call)
and hasattr(decorator.func, "attr")
and decorator.func.attr == "command"
):
ru_doc = None
en_doc = None
for keyword in decorator.keywords:
if keyword.arg == "ru_doc":
ru_doc = ast.literal_eval(keyword.value)
elif keyword.arg == "en_doc":
en_doc = ast.literal_eval(keyword.value)
return ru_doc, en_doc
return None, None
result = {}
for node in ast.walk(tree):
if isinstance(node, ast.ClassDef):
decorators = get_decorator_names(node.decorator_list)
is_tds_mod = [d for d in decorators if "loader.tds" in d]
if "Mod" not in node.name and not is_tds_mod:
def extract_loader_command_args(decorator: ast.Call) -> Dict[str, Any]:
args = {"lang_docs": {}, "aliases": [], "usage": None}
try:
for kw in decorator.keywords:
arg_name = kw.arg
if not arg_name:
continue
if arg_name.endswith("_doc"):
lang = arg_name[:-4]
args["lang_docs"][lang] = extract_string_value(kw.value)
elif arg_name == "aliases":
try:
val = ast.literal_eval(kw.value)
if isinstance(val, (list, tuple)):
args["aliases"] = list(val)
except (ValueError, SyntaxError):
pass
elif arg_name == "usage":
args["usage"] = extract_string_value(kw.value)
except Exception:
pass
return args
class_docstring = ast.get_docstring(node)
class_info = {
"name": node.name,
"description": class_docstring,
"meta": meta_info,
"commands": [],
"new_commands": [],
}
def get_module_info(module_path: str) -> Optional[Dict[str, Any]]:
try:
with open(module_path, "r", encoding="utf-8") as f:
source = f.read()
except Exception as e:
logger.warning(f"Skipping {module_path}: read failed — {e}")
return None
for class_body_node in node.body:
if isinstance(class_body_node, (ast.FunctionDef, ast.AsyncFunctionDef)):
decorators = get_decorator_names(class_body_node.decorator_list)
is_loader_command = [d for d in decorators if "command" in d]
if not is_loader_command and "cmd" not in class_body_node.name:
continue
source = source.lstrip('\ufeff')
source = ''.join(c for c in source if ord(c) >= 32 or c in '\n\r\t') if source else source
method_docstring = ast.get_docstring(class_body_node)
command_name = class_body_node.name
ru_doc, en_doc = None, None
meta = {"pic": None, "banner": None, "developer": None}
for line in source.splitlines():
line = line.strip()
if line.startswith("# meta "):
try:
key, val = line[len("# meta "):].split(":", 1)
meta[key.strip()] = val.strip()
except ValueError:
pass
for decorator in class_body_node.decorator_list:
ru_doc_tmp, en_doc_tmp = extract_loader_command_args(decorator)
if ru_doc_tmp:
ru_doc = ru_doc_tmp
if en_doc_tmp:
en_doc = en_doc_tmp
try:
tree = ast.parse(source, filename=module_path)
except SyntaxError as e:
logger.warning(f"Skipping {module_path}: syntax error — {e}")
return {
"name": module_path.split(os.sep)[-1].replace(".py", ""),
"description": "",
"cls_doc": {},
"meta": meta,
"commands": [],
"new_commands": [],
"inline_handlers": [],
"strings": {},
"has_on_load": False,
"has_on_unload": False,
"class_cmd_names": {},
}
descriptions = []
if method_docstring:
descriptions.append(method_docstring)
if ru_doc:
descriptions.append(ru_doc)
if en_doc:
descriptions.append(en_doc)
module_data = None
class_info["commands"].append(
{command_name: ' '.join(descriptions)}
)
command_name = command_name.replace('cmd', '')
class_info["new_commands"].append(
{
command_name: {
"ru_doc": ru_doc,
"en_doc": en_doc,
"doc": method_docstring,
}
}
)
result = class_info
return result
def parse_developers(base_dir: str) -> Dict[str, list]:
developers = {
"repo": set(), # используем set внутри функции
"channel": set()
}
for repo_url in repos:
repo_path = repo_url.replace("https://github.com/", "")
try:
owner, repo_name = repo_path.split("/")
developers["repo"].add(owner)
except ValueError:
print(f"Incorrect URL of repository: {repo_url}")
for node in ast.walk(tree):
if not isinstance(node, ast.ClassDef):
continue
for root, _, files in os.walk(base_dir):
for file in files:
if file.endswith(".py"):
file_path = os.path.join(root, file)
try:
module_info = get_module_info(file_path)
if module_info and "meta" in module_info:
developer = module_info["meta"].get('developer')
if developer: # Проверяем, что developer не None
# Разделяем строки с запятыми, &, | и пробелами
for dev in developer.replace(',', ' ').replace('&', ' ').replace('|', ' ').split():
# Добавляем только элементы, начинающиеся с @
if dev.startswith('@'):
developers["channel"].add(dev.strip())
except Exception as e:
print(f"Ошибка при парсинге файла {file_path}: {e}")
is_module_class = (
"Mod" in node.name or
any(isinstance(d, ast.Attribute) and safe_unparse(d).startswith("loader.tds") for d in node.decorator_list) or
any(isinstance(d, ast.Name) and d.id == "loader" for d in node.decorator_list)
)
# Преобразуем set в list перед возвратом
return {
"repo": list(developers["repo"]),
"channel": list(developers["channel"])
if not is_module_class:
continue
info = {
"name": node.name,
"description": ast.get_docstring(node) or "",
"cls_doc": {},
"meta": meta,
"commands": [],
"new_commands": [],
"inline_handlers": [],
"strings": {},
"has_on_load": False,
"has_on_load": False,
"has_on_unload": False,
"class_cmd_names": {},
}
for item in node.body:
if isinstance(item, ast.Assign):
for target in item.targets:
if isinstance(target, ast.Name) and (target.id == "strings" or target.id.startswith("strings_")):
try:
lit = ast.literal_eval(item.value)
if isinstance(lit, dict):
if target.id == "strings":
info["strings"].update(lit)
if "_cls_doc" in lit:
info["cls_doc"]["default"] = lit["_cls_doc"]
else:
lang = target.id.split("_", 1)[1] if "_" in target.id else None
if lang:
for k, v in lit.items():
if isinstance(k, str) and isinstance(v, str):
if k == "_cls_doc":
info["cls_doc"][lang] = v
elif k.startswith("_cmd_doc_"):
rest = k[len("_cmd_doc_"):]
info["strings"][f"_cmd_doc_{lang}_{rest}"] = v
info["strings"][f"_cmd_doc_{rest}_{lang}"] = v
elif k.startswith("_ihandle_doc_"):
rest = k[len("_ihandle_doc_"):]
info["strings"][f"_ihandle_doc_{lang}_{rest}"] = v
info["strings"][f"_ihandle_doc_{rest}_{lang}"] = v
elif k.startswith("_cls_cmd_"):
info["class_cmd_names"][lang] = v
else:
info["strings"][f"{k}_{lang}"] = v
except Exception:
pass
if "_cls_doc" in info["strings"]:
info["cls_doc"]["default"] = info["strings"]["_cls_doc"]
for func in node.body:
if not isinstance(func, (ast.FunctionDef, ast.AsyncFunctionDef)):
continue
name = func.name
if name == "on_load":
info["has_on_load"] = True
continue
if name == "on_unload":
info["has_on_unload"] = True
continue
is_decorated = any(
isinstance(d, ast.Call) and hasattr(d.func, 'attr') and
d.func.attr in ("command", "inline_handler", "unrestricted", "owner")
for d in func.decorator_list
)
if name.startswith("_") and not is_decorated:
continue
cmd = {
"name": name,
"doc": ast.get_docstring(func) or "",
"lang_docs": {},
"aliases": [],
"usage": None,
"inline": False,
"is_inline_handler": False,
"decorators": [],
"cmd_names": {},
}
for dec in func.decorator_list:
if isinstance(dec, ast.Call) and hasattr(dec.func, 'attr'):
attr = dec.func.attr
if attr == "command":
cmd.update(extract_loader_command_args(dec))
elif attr == "inline_handler":
cmd["inline"] = True
cmd["is_inline_handler"] = True
elif attr in ("unrestricted", "owner", "support"):
cmd["decorators"].append(attr)
for stmt in func.body:
if isinstance(stmt, ast.Assign):
for target in stmt.targets:
if isinstance(target, ast.Attribute):
attr = target.attr
val = extract_string_value(stmt.value)
if not val:
continue
if attr == "_cmd":
cmd["name"] = val
elif attr == "_doc":
cmd["doc"] = val
elif attr == "_cls_doc":
info["cls_doc"]["default"] = val
elif attr.startswith("_cls_doc_"):
lang = attr[len("_cls_doc_"):]
info["cls_doc"][lang] = val
elif attr.startswith("_cmd_"):
lang = attr[len("_cmd_"):]
cmd["cmd_names"][lang] = val
is_command_name = "cmd" in name and not name.startswith("__")
if not (is_decorated or is_command_name):
continue
clean_name = cmd["name"].replace("cmd", "").replace("_", "")
descs = []
legacy_key = f"_cmd_doc_{clean_name}"
legacy_doc = info["strings"].get(legacy_key)
base_doc = legacy_doc if legacy_doc else cmd["doc"]
if base_doc:
descs.append(base_doc)
for lang, text in cmd["lang_docs"].items():
if text:
descs.append(f"({lang.upper()}) {text}")
for k, v in info["strings"].items():
if k.startswith("_cmd_doc_") and clean_name in k and v:
if k.endswith(f"_{clean_name}"):
lang_part = k[len("_cmd_doc_"):-len(f"_{clean_name}")-1]
if lang_part:
descs.append(f"({lang_part.upper()}) {v}")
elif k.startswith(f"_cmd_doc_{clean_name}_"):
lang_part = k[len(f"_cmd_doc_{clean_name}_"):]
if lang_part:
descs.append(f"({lang_part.upper()}) {v}")
full_desc = " | ".join(filter(None, descs))
info["commands"].append({clean_name: full_desc})
desc_map = {"default": legacy_doc or cmd["doc"]}
desc_map.update(cmd["lang_docs"])
for k, v in info["strings"].items():
if k.startswith("_cmd_doc_") and clean_name in k and v:
if k.endswith(f"_{clean_name}"):
lang_part = k[len("_cmd_doc_"):-len(f"_{clean_name}")-1]
if lang_part:
desc_map[lang_part] = v
elif k.startswith(f"_cmd_doc_{clean_name}_"):
lang_part = k[len(f"_cmd_doc_{clean_name}_"):]
if lang_part:
desc_map[lang_part] = v
info["new_commands"].append({
"name": clean_name,
"original_name": cmd["name"],
"description": desc_map,
"cmd_names": cmd["cmd_names"],
"aliases": cmd["aliases"],
"usage": cmd["usage"],
"inline": cmd["inline"],
"is_inline_handler": cmd["is_inline_handler"],
"decorators": cmd["decorators"],
})
if cmd["is_inline_handler"]:
inline_desc_map = {"default": cmd["doc"]}
inline_desc_map.update(cmd["lang_docs"])
for k, v in info["strings"].items():
if k.startswith("_ihandle_doc_") and clean_name in k and v:
if k.endswith(f"_{clean_name}"):
lang_part = k[len("_ihandle_doc_"):-len(f"_{clean_name}")-1]
if lang_part:
inline_desc_map[lang_part] = v
elif k.startswith(f"_ihandle_doc_{clean_name}_"):
lang_part = k[len(f"_ihandle_doc_{clean_name}_"):]
if lang_part:
inline_desc_map[lang_part] = v
info["inline_handlers"].append({
"name": clean_name,
"description": inline_desc_map,
"decorators": cmd["decorators"],
})
module_data = info
break
return module_data
def main():
base_dir = os.getcwd()
modules = {}
blacklisted_modules = load_blacklist("repositories.json")
for root, dirs, files in os.walk(base_dir):
dirs[:] = [d for d in dirs if d not in ("venv", ".venv", "env", ".env", ".git")]
for file in files:
if file.endswith(".py") and not file.startswith("_") and file not in blacklisted_modules.get(os.path.relpath(root, base_dir), []):
path = os.path.join(root, file)
try:
data = get_module_info(path)
if data:
rel = os.path.relpath(path, base_dir).replace("\\", "/")
modules[rel] = data
except Exception as e:
logger.error(f"Error processing {path}: {e}")
output = {
"modules": modules,
"meta": {
"total_modules": len(modules),
"generated_at": __import__("datetime").datetime.now().isoformat(),
}
}
with open("modules.json", "w", encoding="utf-8") as f:
json.dump(output, f, ensure_ascii=False, indent=2)
modules_data = {}
base_dir = os.getcwd()
print(f"modules.json written ({len(modules)} modules)")
for root, _, files in os.walk(base_dir):
for file in files:
if file.endswith(".py"):
file_path = os.path.join(root, file)
try:
module_info = get_module_info(file_path)
if module_info:
relative_path = os.path.relpath(file_path, base_dir)
modules_data[relative_path] = module_info
except Exception as e:
print(f"Ошибка при парсинге файла {file_path}: {e}")
developers = parse_developers(base_dir)
with open("modules.json", "w", encoding="utf-8") as json_file:
json.dump(modules_data, json_file, ensure_ascii=False, indent=2)
print("Файл modules.json создан!")
with open("developers.json", "w", encoding="utf-8") as json_file:
json.dump(developers, json_file, ensure_ascii=False, indent=2)
print("Файл developers.json создан!")
if __name__ == "__main__":
main()

View File

@@ -1,192 +1,239 @@
{
"repositories": [
{
"path": "DziruModules/hikkamods",
"tags": []
"url": "https://github.com/DziruModules/hikkamods",
"tags": [],
"blacklist": []
},
{
"path": "kamolgks/Hikkamods",
"tags": []
"url": "https://github.com/kamolgks/Hikkamods",
"tags": [],
"blacklist": []
},
{
"path": "thomasmod/hikkamods",
"tags": []
"url": "https://github.com/thomasmod/hikkamods",
"tags": [],
"blacklist": []
},
{
"path": "SkillsAngels/Modules",
"tags": []
"url": "https://github.com/SkillsAngels/Modules",
"tags": [],
"blacklist": []
},
{
"path": "Sad0ff/modules-ftg",
"tags": []
"url": "https://github.com/Sad0ff/modules-ftg",
"tags": [],
"blacklist": []
},
{
"path": "Yahikoro/Modules-for-FTG",
"tags": []
"url": "https://github.com/Yahikoro/Modules-for-FTG",
"tags": [],
"blacklist": []
},
{
"path": "KeyZenD/modules",
"tags": []
"url": "https://github.com/KeyZenD/modules",
"tags": [],
"blacklist": []
},
{
"path": "AlpacaGang/ftg-modules",
"tags": []
"url": "https://github.com/AlpacaGang/ftg-modules",
"tags": [],
"blacklist": []
},
{
"path": "trololo65/Modules",
"tags": []
"url": "https://github.com/trololo65/Modules",
"tags": [],
"blacklist": []
},
{
"path": "Ijidishurka/modules",
"tags": []
"url": "https://github.com/Ijidishurka/modules",
"tags": [],
"blacklist": []
},
{
"path": "Fl1yd/FTG-Modules",
"tags": []
"url": "https://github.com/Fl1yd/FTG-Modules",
"tags": [],
"blacklist": []
},
{
"path": "D4n13l3k00/FTG-Modules",
"tags": []
"url": "https://github.com/D4n13l3k00/FTG-Modules",
"tags": [],
"blacklist": []
},
{
"path": "iamnalinor/FTG-modules",
"tags": ["hikkatrusted", "nonactive"]
"url": "https://github.com/iamnalinor/FTG-modules",
"tags": ["hikkatrusted", "nonactive"],
"blacklist": []
},
{
"path": "SekaiYoneya/modules",
"tags": []
"url": "https://github.com/SekaiYoneya/modules",
"tags": [],
"blacklist": []
},
{
"path": "GeekTG/FTG-Modules",
"tags": []
"url": "https://github.com/GeekTG/FTG-Modules",
"tags": [],
"blacklist": []
},
{
"path": "Den4ikSuperOstryyPer4ik/Astro-modules",
"tags": ["hikkatrusted", "herokutrusted"]
"url": "https://github.com/Den4ikSuperOstryyPer4ik/Astro-modules",
"tags": ["hikkatrusted", "herokutrusted"],
"blacklist": []
},
{
"path": "vsecoder/hikka_modules",
"tags": ["hikkatrusted", "herokutrusted"]
"url": "https://github.com/vsecoder/hikka_modules",
"tags": ["hikkatrusted", "herokutrusted"],
"blacklist": []
},
{
"path": "sqlmerr/hikka_mods",
"tags": ["hikkatrusted", "herokutrusted"]
"url": "https://github.com/sqlmerr/hikka_mods",
"tags": ["hikkatrusted", "herokutrusted"],
"blacklist": []
},
{
"path": "N3rcy/modules",
"tags": ["hikkatrusted", "herokutrusted"]
"url": "https://github.com/N3rcy/modules",
"tags": ["hikkatrusted", "herokutrusted"],
"blacklist": []
},
{
"path": "KorenbZla/HikkaModules",
"tags": ["hikkatrusted", "herokutrusted"]
"url": "https://github.com/KorenbZla/HikkaModules",
"tags": ["hikkatrusted", "herokutrusted"],
"blacklist": []
},
{
"path": "MuRuLOSE/HikkaModulesRepo",
"tags": []
"url": "https://github.com/MuRuLOSE/HikkaModulesRepo",
"tags": [],
"blacklist": []
},
{
"path": "coddrago/modules",
"tags": ["herokutrusted", "hikkatrusted"]
"url": "https://github.com/coddrago/modules",
"tags": ["herokutrusted", "hikkatrusted"],
"blacklist": []
},
{
"path": "1jpshiro/hikka-modules",
"tags": []
"url": "https://github.com/1jpshiro/hikka-modules",
"tags": [],
"blacklist": []
},
{
"path": "MoriSummerz/ftg-mods",
"tags": ["hikkatrusted", "nonactive"]
"url": "https://github.com/MoriSummerz/ftg-mods",
"tags": ["hikkatrusted", "nonactive"],
"blacklist": []
},
{
"path": "anon97945/hikka-mods",
"tags": ["hikkatrusted", "nonactive"]
"url": "https://github.com/anon97945/hikka-mods",
"tags": ["hikkatrusted", "nonactive"],
"blacklist": []
},
{
"path": "dorotorothequickend/DorotoroModules",
"tags": ["hikkatrusted", "nonlongermaintained"]
"url": "https://github.com/dorotorothequickend/DorotoroModules",
"tags": ["hikkatrusted", "nonlongermaintained"],
"blacklist": []
},
{
"path": "AmoreForever/amoremods",
"tags": []
"url": "https://github.com/AmoreForever/amoremods",
"tags": [],
"blacklist": []
},
{
"path": "idiotcoders/idiotmodules",
"tags": ["hikkatrusted", "herokutrusted", "nonactive"]
"url": "https://github.com/idiotcoders/idiotmodules",
"tags": ["hikkatrusted", "herokutrusted", "nonactive"],
"blacklist": []
},
{
"path": "CakesTwix/Hikka-Modules",
"tags": ["hikkatrusted", "nonactive"]
"url": "https://github.com/CakesTwix/Hikka-Modules",
"tags": ["hikkatrusted", "nonactive"],
"blacklist": []
},
{
"path": "archquise/H.Modules",
"tags": ["hikkatrusted", "nonactive"]
"url": "https://github.com/archquise/H.Modules",
"tags": ["hikkatrusted", "nonactive"],
"blacklist": []
},
{
"path": "GD-alt/mm-hikka-mods",
"tags": ["hikkatrusted", "herokutrusted", "nonactive"]
"url": "https://github.com/GD-alt/mm-hikka-mods",
"tags": ["hikkatrusted", "herokutrusted", "nonactive"],
"blacklist": []
},
{
"path": "HitaloSama/FTG-modules-repo",
"tags": []
"url": "https://github.com/HitaloSama/FTG-modules-repo",
"tags": [],
"blacklist": []
},
{
"path": "SekaiYoneya/Friendly-telegram",
"tags": []
"url": "https://github.com/SekaiYoneya/Friendly-telegram",
"tags": [],
"blacklist": []
},
{
"path": "blazedzn/ftg-modules",
"tags": []
"url": "https://github.com/blazedzn/ftg-modules",
"tags": [],
"blacklist": []
},
{
"path": "hikariatama/ftg",
"tags": ["hikkatrusted", "nonactive"]
"url": "https://github.com/hikariatama/ftg",
"tags": ["hikkatrusted", "nonactive"],
"blacklist": []
},
{
"path": "m4xx1m/FTG",
"tags": []
"url": "https://github.com/m4xx1m/FTG",
"tags": [],
"blacklist": []
},
{
"path": "skillzmeow/skillzmods_hikka",
"tags": []
"url": "https://github.com/skillzmeow/skillzmods_hikka",
"tags": [],
"blacklist": []
},
{
"path": "fajox1/famods",
"tags": ["hikkatrusted", "herokutrusted"]
"url": "https://github.com/fajox1/famods",
"tags": ["hikkatrusted", "herokutrusted"],
"blacklist": []
},
{
"path": "TheKsenon/MyHikkaModules",
"tags": ["hikkatrusted", "herokutrusted"]
"url": "https://github.com/TheKsenon/MyHikkaModules",
"tags": ["hikkatrusted", "herokutrusted"],
"blacklist": []
},
{
"path": "cryptexctl/modules-mirror",
"tags": []
"url": "https://github.com/cryptexctl/modules-mirror",
"tags": [],
"blacklist": []
},
{
"path": "Ruslan-Isaev/modules",
"tags": ["herokutrusted"]
"url": "https://github.com/Ruslan-Isaev/modules",
"tags": ["herokutrusted"],
"blacklist": []
},
{
"path": "shadowhikka/sh.modules",
"tags": []
"url": "https://github.com/shadowhikka/sh.modules",
"tags": [],
"blacklist": []
},
{
"path": "fiksofficial/python-modules",
"tags": ["herokutrusted"]
"url": "https://github.com/fiksofficial/python-modules",
"tags": ["herokutrusted"],
"blacklist": []
},
{
"path": "mead0wsss/mead0wsMods",
"tags": ["herokutrusted"]
"url": "https://github.com/mead0wsss/mead0wsMods",
"tags": ["herokutrusted"],
"blacklist": []
},
{
"path": "SenkoGuardian/SenModules",
"tags": ["herokutrusted"]
"url": "https://github.com/SenkoGuardian/SenModules",
"tags": ["herokutrusted"],
"blacklist": []
},
{
"path": "ZetGoHack/nullmod",
"tags": ["herokutrusted"]
"url": "https://github.com/ZetGoHack/nullmod",
"tags": ["herokutrusted"],
"blacklist": []
},
{
"path": "yummy1gay/limoka",
"tags": []
"url": "https://github.com/yummy1gay/limoka",
"tags": [],
"blacklist": []
}
]
}

193
update_diffs.py Normal file
View File

@@ -0,0 +1,193 @@
import asyncio
import aiohttp
import argparse
import subprocess
import os
import tempfile
from pathlib import Path
parser = argparse.ArgumentParser(description="Update Diffs Script")
parser.add_argument(
"--token",
type=str,
required=True,
help="Token of Telegram bot",
)
parser.add_argument(
"--api_url",
type=str,
default="https://api.telegram.org",
help="API URL of Telegram API",
)
parser.add_argument(
"--chat_id",
type=str,
required=True,
help="Chat ID to send updates to",
)
parser.add_argument(
"--base_commit",
type=str,
default="HEAD~1",
help="Base commit to compare against",
)
arguments = parser.parse_args()
async def send_message(session, text):
"""Send a text message to the channel"""
url = f"{arguments.api_url}/bot{arguments.token}/sendMessage"
data = {
'chat_id': arguments.chat_id,
'text': text,
'parse_mode': 'Markdown',
}
async with session.post(url, data=data) as response:
return await response.json()
async def send_document(session, file_path, caption=None):
"""Send a document to the channel"""
url = f"{arguments.api_url}/bot{arguments.token}/sendDocument"
with open(file_path, 'rb') as f:
data = aiohttp.FormData()
data.add_field('chat_id', arguments.chat_id)
data.add_field('document', f, filename=os.path.basename(file_path))
if caption:
data.add_field('caption', caption)
data.add_field('parse_mode', 'Markdown')
async with session.post(url, data=data) as response:
return await response.json()
def get_changed_files(base_commit):
"""Get list of changed files between commits"""
try:
result = subprocess.check_output(
['git', 'diff', '--name-only', base_commit, 'HEAD'],
cwd=os.getcwd()
).decode().strip().split('\n')
return [f for f in result if f]
except subprocess.CalledProcessError:
return []
def get_file_diff(file_path, base_commit):
"""Get diff for a specific file"""
try:
diff = subprocess.check_output(
['git', 'diff', base_commit, 'HEAD', '--', file_path],
cwd=os.getcwd()
).decode()
return diff
except subprocess.CalledProcessError:
return ""
def is_module_file(file_path):
"""Check if file is a Python module in a modules directory"""
# Check if it's a .py file and in a modules-like directory
return file_path.endswith('.py') and any(
part in file_path.lower() for part in [
'modules', 'mods', 'ftg', 'hikka'
]
)
def extract_module_name(file_path):
"""Extract module name from file path"""
return Path(file_path).stem
async def main():
changed_files = get_changed_files(arguments.base_commit)
if not changed_files:
print("No changes detected")
return
# Filter for module files only
module_files = [f for f in changed_files if is_module_file(f)]
if not module_files:
print("No module changes detected")
return
async with aiohttp.ClientSession() as session:
for file_path in module_files:
try:
module_name = extract_module_name(file_path)
# Create message with raw GitHub URL
github_url = f"https://raw.githubusercontent.com/MuRuLOSE/limoka/refs/heads/main/{file_path}"
try:
new_hash = subprocess.check_output(
['git', 'rev-list', '-n', '1', 'HEAD', '--', file_path],
cwd=os.getcwd()
).decode().strip()
except Exception:
new_hash = 'HEAD'
try:
old_hash = subprocess.check_output(
['git', 'rev-list', '-n', '1', arguments.base_commit, '--', file_path],
cwd=os.getcwd()
).decode().strip()
except Exception:
old_hash = arguments.base_commit
diff_url = f"https://github.com/MuRuLOSE/limoka/compare/{old_hash}...{new_hash}.diff"
message = (
f"🪼 Module {module_name} changes approved\n\n"
f"[File URL]({github_url}) | [Diff URL]({diff_url})\n\n"
)
# Get diff
diff = get_file_diff(file_path, arguments.base_commit)
if not diff:
print(f"Skipping {file_path} - no diff content")
continue
# Create temporary file with diff using only module name
diff_filename = f"{module_name}.diff"
with tempfile.NamedTemporaryFile(
mode='w',
suffix='',
prefix='',
delete=False,
encoding='utf-8',
dir=tempfile.gettempdir()
) as tmp_file:
tmp_file.write(diff)
tmp_file_path = tmp_file.name
try:
# Rename temp file to have proper name
final_path = os.path.join(tempfile.gettempdir(), diff_filename)
os.rename(tmp_file_path, final_path)
# Send diff as document with full message as caption
doc_result = await send_document(
session,
final_path,
caption=message
)
print(f"Sent diff for {module_name}: {doc_result}")
except Exception as e:
print(f"Error sending {module_name}: {e}")
finally:
# Cleanup temp files
if os.path.exists(tmp_file_path):
try:
os.remove(tmp_file_path)
except:
pass
final_path = os.path.join(tempfile.gettempdir(), diff_filename)
if os.path.exists(final_path):
try:
os.remove(final_path)
except:
pass
except Exception as e:
print(f"Error processing {file_path}: {e}")
if __name__ == "__main__":
asyncio.run(main())