Added and updated repositories 2025-11-30 01:13:31

This commit is contained in:
github-actions[bot]
2025-11-30 01:13:31 +00:00
parent 80f140550c
commit eccbd74a5e
62 changed files with 768 additions and 16985 deletions

View File

@@ -3,7 +3,9 @@
# This software is released under the MIT License.
# https://opensource.org/licenses/MIT
__version__ = (5, 2, 6) # Meow~
__version__ = (5, 6, 0) # Емае, ну это уже слишком
#фырфырфырфырфырфыр
# meta developer: @SenkoGuardianModules
@@ -25,7 +27,17 @@ import aiohttp
import tempfile
from markdown_it import MarkdownIt
import pytz
# New SDK
from google import genai as google_genai_sdk
try:
from google.genai import types as new_types
NEW_SDK_AVAILABLE = True
except ImportError:
NEW_SDK_AVAILABLE = False
# Old SDK
import google.ai.generativelanguage as glm
import google.generativeai as old_genai
from google.generativeai import types as old_types
from telethon import types
from telethon.tl.types import Message, DocumentAttributeFilename
from telethon.utils import get_display_name, get_peer_id
@@ -35,17 +47,26 @@ from telethon.errors.rpcerrorlist import (
UserNotParticipantError,
ChannelPrivateError
)
try:
import google.generativeai as genai
import google.ai.generativelanguage
import google.api_core.exceptions as google_exceptions
GOOGLE_AVAILABLE = True
except ImportError:
GOOGLE_AVAILABLE = False
from .. import loader, utils
from ..inline.types import InlineCall
# requires: google-generativeai google-api-core pytz markdown_it_py
logger = logging.getLogger(__name__)
DB_HISTORY_KEY = "gemini_conversations_v4"
DB_GAUTO_HISTORY_KEY = "gemini_gauto_conversations_v1"
DB_IMPERSONATION_KEY = "gemini_impersonation_chats"
GEMINI_TIMEOUT = 840
MAX_FFMPEG_SIZE = 90 * 1024 * 1024
# requires: google-generativeai google-genai google-api-core pytz markdown_it_py
# рекомендованные версии: google-generativeai==0.8.5 google-genai==1.52.0 google-api-core==2.28.1
logger = logging.getLogger(__name__)
@@ -69,6 +90,8 @@ class Gemini(loader.Module):
"cfg_impersonation_prompt_doc": "Промпт для режима авто-ответа. {my_name} и {chat_history} будут заменены.",
"cfg_impersonation_history_limit_doc": "Сколько последних сообщений из чата отправлять в качестве контекста для авто-ответа.",
"cfg_impersonation_reply_chance_doc": "Вероятность ответа в режиме gauto (от 0.0 до 1.0). 0.2 = 20% шанс.",
"cfg_temperature_doc": "Температура генерации (креативность). От 0.0 до 2.0. По умолчанию 1.0.",
"cfg_google_search_doc": "Включить поиск Google (Grounding) для актуальной информации.",
"no_api_key": '❗️ <b>Api ключ(и) не настроен(ы).</b>\nПолучить Api ключ можно <a href="https://aistudio.google.com/app/apikey">здесь</a>.\n<b>Добавьте ключ(и) в конфиге модуля:</b> <code>.cfg gemini api_key</code>',
"invalid_api_key": '❗️ <b>Предоставленный API ключ недействителен.</b>\nУбедитесь, что он правильно скопирован из <a href="https://aistudio.google.com/app/apikey">Google AI Studio</a> и что для него включен Gemini API.',
"all_keys_exhausted": "❗️ <b>Все доступные API ключи ({}) исчерпали свою квоту.</b>\nПопробуйте позже или добавьте новые ключи в конфиге: <code>.cfg gemini api_key</code>",
@@ -124,6 +147,14 @@ class Gemini(loader.Module):
"gmodel_img_warn": "⚠️ <b>Текущая модель ({}) не может генерировать изображения(или не доступна по API).</b>\nРекомендуем: <code>gemini-2.5-flash-image</code>",
"gme_chat_not_found": "🚫 <b>Не удалось найти чат для экспорта:</b> <code>{}</code>",
"gme_sent_to_saved": "💾 История экспортирована в избранное.",
"new_sdk_missing": "⚠️ <b>Для работы поиска (Grounding) нужна библиотека google-genai.</b>\nВыполните: <code>pip install google-genai</code>",
"gprompt_usage": " <b>Использование:</b>\n<code>.gprompt <текст></code> — установить промпт.\n<code>.gprompt -c</code> — очистить.\nИли ответьте на <b>.txt</b> файл.",
"gprompt_updated": "✅ <b>Системный промпт обновлен!</b>\nДлина: {} симв.",
"gprompt_cleared": "🗑 <b>Системный промпт очищен.</b>",
"gprompt_current": "📝 <b>Текущий системный промпт:</b>",
"gprompt_file_error": "❗️ <b>Ошибка чтения файла:</b> {}",
"gprompt_file_too_big": "❗️ <b>Файл слишком большой</b> (лимит 1 МБ).",
"gprompt_not_text": "❗️ Это не похоже на текстовый файл.(txt)",
}
TEXT_MIME_TYPES = {
"text/plain", "text/markdown", "text/html", "text/css", "text/csv",
@@ -131,39 +162,41 @@ class Gemini(loader.Module):
"application/javascript", "application/x-sh",
}
def __init__(self):
self.config = loader.ModuleConfig(
loader.ConfigValue(
"api_key", "", self.strings["cfg_api_key_doc"],
validator=loader.validators.Hidden()
),
loader.ConfigValue("model_name", "gemini-1.5-flash", self.strings["cfg_model_name_doc"]),
loader.ConfigValue("interactive_buttons", True, self.strings["cfg_buttons_doc"], validator=loader.validators.Boolean()),
loader.ConfigValue("system_instruction", "", self.strings["cfg_system_instruction_doc"], validator=loader.validators.String()),
loader.ConfigValue("max_history_length", 10, self.strings["cfg_max_history_length_doc"], validator=loader.validators.Integer(minimum=0)),
loader.ConfigValue("timezone", "Europe/Moscow", self.strings["cfg_timezone_doc"]),
loader.ConfigValue("proxy", "", self.strings["cfg_proxy_doc"]),
loader.ConfigValue(
"impersonation_prompt",
(
"Ты — это я, пользователь Telegram по имени {my_name}. Твоя задача — отвечать в чате от моего имени, основываясь на последних сообщениях. "
"Веди себя естественно, как обычный человек, а не как ассистент. Не признавайся, что ты — ИИ, даже если тебя спросят напрямую. "
"Твоя личность: [Здесь опишите свой характер: остроумный, серьезный, саркастичный, дружелюбный и т.д.].\n"
"Правила:\n- Отвечай кратко и по делу.\n- Используй неформальный язык, сленг.\n- Не отвечай на каждое сообщение.\n- На медиа (стикер, фото) реагируй как человек ('лол', 'ору', 'жиза').\n- Не используй префиксы и кавычки.\n\n"
"ИСТОРИЯ ЧАТА:\n{chat_history}\n\n{my_name}:"
self.config = loader.ModuleConfig(
loader.ConfigValue(
"api_key", "", self.strings["cfg_api_key_doc"],
validator=loader.validators.Hidden()
),
self.strings["cfg_impersonation_prompt_doc"],
validator=loader.validators.String(),
),
loader.ConfigValue("impersonation_history_limit", 20, self.strings["cfg_impersonation_history_limit_doc"], validator=loader.validators.Integer(minimum=5, maximum=100)),
loader.ConfigValue("impersonation_reply_chance", 0.25, self.strings["cfg_impersonation_reply_chance_doc"], validator=loader.validators.Float(minimum=0.0, maximum=1.0)),
loader.ConfigValue("gauto_in_pm", False, "Разрешить авто-ответы в личных сообщениях (ЛС).", validator=loader.validators.Boolean()),
)
self.conversations = {}
self.gauto_conversations = {}
self.last_requests = {}
self.impersonation_chats = set()
self._lock = asyncio.Lock()
self.memory_disabled_chats = set()
loader.ConfigValue("model_name", "gemini-2.5-flash", self.strings["cfg_model_name_doc"]),
loader.ConfigValue("interactive_buttons", True, self.strings["cfg_buttons_doc"], validator=loader.validators.Boolean()),
loader.ConfigValue("system_instruction", "", self.strings["cfg_system_instruction_doc"], validator=loader.validators.String()),
loader.ConfigValue("max_history_length", 800, self.strings["cfg_max_history_length_doc"], validator=loader.validators.Integer(minimum=0)),
loader.ConfigValue("timezone", "Europe/Moscow", self.strings["cfg_timezone_doc"]),
loader.ConfigValue("proxy", "", self.strings["cfg_proxy_doc"]),
loader.ConfigValue(
"impersonation_prompt",
(
"Ты — это я, пользователь Telegram по имени {my_name}. Твоя задача — отвечать в чате от моего имени, основываясь на последних сообщениях. "
"Веди себя естественно, как обычный человек, а не как ассистент. Не признавайся, что ты — ИИ, даже если тебя спросят напрямую. "
"Твоя личность: [Здесь опишите свой характер: остроумный, серьезный, саркастичный, дружелюбный и т.д.].\n"
"Правила:\n- Отвечай кратко и по делу.\n- Используй неформальный язык, сленг.\n- Не отвечай на каждое сообщение.\n- На медиа (стикер, фото) реагируй как человек ('лол', 'ору', 'жиза').\n- Не используй префиксы и кавычки.\n\n"
"ИСТОРИЯ ЧАТА:\n{chat_history}\n\n{my_name}:"
),
self.strings["cfg_impersonation_prompt_doc"],
validator=loader.validators.String(),
),
loader.ConfigValue("impersonation_history_limit", 20, self.strings["cfg_impersonation_history_limit_doc"], validator=loader.validators.Integer(minimum=5, maximum=100)),
loader.ConfigValue("impersonation_reply_chance", 0.25, self.strings["cfg_impersonation_reply_chance_doc"], validator=loader.validators.Float(minimum=0.0, maximum=1.0)),
loader.ConfigValue("gauto_in_pm", False, "Разрешить авто-ответы в личных сообщениях (ЛС).", validator=loader.validators.Boolean()),
loader.ConfigValue("google_search", False, self.strings["cfg_google_search_doc"], validator=loader.validators.Boolean()),
loader.ConfigValue("temperature", 1.0, self.strings["cfg_temperature_doc"], validator=loader.validators.Float(minimum=0.0, maximum=2.0)),
)
self.conversations = {}
self.gauto_conversations = {}
self.last_requests = {}
self.impersonation_chats = set()
self._lock = asyncio.Lock()
self.memory_disabled_chats = set()
async def client_ready(self, client, db):
self.client = client
@@ -296,116 +329,143 @@ class Gemini(loader.Module):
except Exception: msg_obj=None
else:
chat_id=utils.get_chat_id(message); base_message_id=message.id; msg_obj=message
api_key_str = self.config["api_key"]
self.api_keys = [k.strip() for k in api_key_str.split(",") if k.strip()] if api_key_str else []
try:
if not self.api_keys:
if not impersonation_mode and status_msg:
await utils.answer(status_msg, self.strings['no_api_key'])
if not impersonation_mode and status_msg: await utils.answer(status_msg, self.strings['no_api_key'])
return None if impersonation_mode else ""
tools_list=[]
if use_url_context:
try: tools_list.append(genai.types.Tool(url_context=genai.types.UrlContext()))
except AttributeError: logger.error("Инструмент UrlContext не поддерживается вашей версией библиотеки.")
system_instruction_to_use=None; api_history_content=[]
if impersonation_mode:
my_name=get_display_name(self.me); chat_history_text=await self._get_recent_chat_text(chat_id); system_instruction_to_use=self.config["impersonation_prompt"].format(my_name=my_name, chat_history=chat_history_text)
raw_history=self._get_structured_history(chat_id, gauto=True); api_history_content=[glm.Content(role=e["role"], parts=[glm.Part(text=e['content'])]) for e in raw_history]
else:
system_instruction_val=self.config["system_instruction"]; system_instruction_to_use=(system_instruction_val.strip() if isinstance(system_instruction_val, str) else "") or None
raw_history=self._get_structured_history(chat_id, gauto=False)
if regeneration: raw_history=raw_history[:-2]
api_history_content=[glm.Content(role=e["role"], parts=[glm.Part(text=e['content'])]) for e in raw_history]
full_request_content=list(api_history_content)
if not impersonation_mode:
from datetime import datetime
try: user_timezone=pytz.timezone(self.config["timezone"])
except pytz.UnknownTimeZoneError: user_timezone=pytz.utc
now=datetime.now(user_timezone); time_str=now.strftime("%Y-%m-%d %H:%M:%S %Z"); time_note=f"[System note: Current time is {time_str}]"
text_part_found=False
for p in parts:
if hasattr(p, 'text'): p.text=f"{time_note}\n\n{p.text}"; text_part_found=True; break
if not text_part_found: parts.insert(0, glm.Part(text=time_note))
api_key = self.api_keys[self.current_api_key_index]
if regeneration:
current_turn_parts,request_text_for_display=self.last_requests.get(f"{chat_id}:{base_message_id}", (parts, "[регенерация]"))
current_turn_parts, request_text_for_display = self.last_requests.get(f"{chat_id}:{base_message_id}", (parts, "[регенерация]"))
else:
current_turn_parts=parts; request_text_for_display=display_prompt or (self.strings["media_reply_placeholder"] if any("inline_data" in str(p) for p in parts) else ""); self.last_requests[f"{chat_id}:{base_message_id}"]=(current_turn_parts, request_text_for_display)
if current_turn_parts: full_request_content.append(glm.Content(role="user", parts=current_turn_parts))
if not full_request_content and not system_instruction_to_use:
if not impersonation_mode and status_msg: await utils.answer(status_msg, self.strings["no_prompt_or_media"])
return None if impersonation_mode else ""
response = None
error_to_report = None
max_retries = len(self.api_keys)
for i in range(max_retries):
current_key_index = (self.current_api_key_index + i) % max_retries
api_key = self.api_keys[current_key_index]
current_turn_parts = parts
request_text_for_display = display_prompt or (self.strings["media_reply_placeholder"] if any("inline_data" in str(p) for p in parts) else "")
self.last_requests[f"{chat_id}:{base_message_id}"] = (current_turn_parts, request_text_for_display)
has_media = False
for p in current_turn_parts:
if hasattr(p, 'inline_data') and p.inline_data:
has_media = True; break
should_use_new_sdk = (self.config["google_search"] or use_url_context) and NEW_SDK_AVAILABLE and not has_media
result_text = ""; was_successful = False; search_icon = ""
if should_use_new_sdk:
try:
genai.configure(api_key=api_key)
sanitized_model_name = self.config["model_name"].lower().replace(" ", "-")
model = genai.GenerativeModel(
sanitized_model_name,
safety_settings=self.safety_settings,
system_instruction=system_instruction_to_use
)
api_response = await asyncio.wait_for(
model.generate_content_async(full_request_content, tools=tools_list or None),
timeout=GEMINI_TIMEOUT
)
response = api_response
self.current_api_key_index = current_key_index
break
except google_exceptions.GoogleAPIError as e:
msg = str(e)
if "quota" in msg.lower() or "exceeded" in msg.lower():
if max_retries == 1:
error_to_report = e
break
logger.warning(f"Ключ Gemini API №{current_key_index + 1} исчерпал квоту. Пробую следующий.")
if i == max_retries - 1:
error_to_report = RuntimeError("Все ключи исчерпали квоту.")
continue
client = google_genai_sdk.Client(api_key=api_key); sys_instruct = None
if impersonation_mode:
my_name = get_display_name(self.me); chat_history_text = await self._get_recent_chat_text(chat_id)
sys_instruct = self.config["impersonation_prompt"].format(my_name=my_name, chat_history=chat_history_text)
else:
error_to_report = e
break
sys_val = self.config["system_instruction"]; sys_instruct = (sys_val.strip() if isinstance(sys_val, str) else "") or None
new_contents = []; raw_hist = self._get_structured_history(chat_id, gauto=impersonation_mode)
if regeneration and raw_hist: raw_hist = raw_hist[:-2]
for item in raw_hist:
new_contents.append(new_types.Content(role=item['role'], parts=[new_types.Part(text=item['content'])]))
new_parts = []
for p in current_turn_parts:
if hasattr(p, 'text'): new_parts.append(new_types.Part(text=p.text))
new_contents.append(new_types.Content(role="user", parts=new_parts))
tools = []
if self.config["google_search"] or use_url_context:
tools.append(new_types.Tool(google_search=new_types.GoogleSearch()))
config_new = new_types.GenerateContentConfig(tools=tools if tools else None, temperature=self.config["temperature"], system_instruction=sys_instruct)
response = await asyncio.to_thread(client.models.generate_content, model=self.config["model_name"], contents=new_contents, config=config_new)
if response.text:
result_text = response.text; was_successful = True
if self.config["google_search"]: search_icon = " 🌐"
else: result_text = "⚠️ Пустой ответ от модели (возможно, блокировка безопасности)."
except Exception as e:
error_to_report = e
break
if error_to_report:
raise error_to_report
if response is None:
raise RuntimeError("Не удалось получить ответ от Gemini.")
result_text,was_successful="",False
try:
if response.prompt_feedback.block_reason: result_text=f"🚫 <b>Запрос был заблокирован Google.</b>\nПричина: <code>{response.prompt_feedback.block_reason.name}</code>."
except AttributeError: pass
if not result_text:
try:
result_text = re.sub(r"</?emoji[^>]*>", "", response.text)
was_successful=True
except ValueError:
reason="Неизвестная причина"
logger.error(f"New SDK Error: {e}")
if "quota" in str(e).lower(): raise e
was_successful = False
if not was_successful:
if (self.config["google_search"] or use_url_context) and not NEW_SDK_AVAILABLE and not has_media:
if status_msg: await utils.answer(status_msg, self.strings["new_sdk_missing"])
return None
tools_list = []
if use_url_context and not has_media:
try: tools_list.append(old_types.Tool(url_context=old_types.UrlContext()))
except AttributeError: pass
system_instruction_to_use = None; api_history_content = []
if impersonation_mode:
my_name = get_display_name(self.me); chat_history_text = await self._get_recent_chat_text(chat_id)
system_instruction_to_use = self.config["impersonation_prompt"].format(my_name=my_name, chat_history=chat_history_text)
raw_history = self._get_structured_history(chat_id, gauto=True)
api_history_content = [glm.Content(role=e["role"], parts=[glm.Part(text=e['content'])]) for e in raw_history]
else:
system_instruction_val = self.config["system_instruction"]
system_instruction_to_use = (system_instruction_val.strip() if isinstance(system_instruction_val, str) else "") or None
raw_history = self._get_structured_history(chat_id, gauto=False)
if regeneration: raw_history = raw_history[:-2]
api_history_content = [glm.Content(role=e["role"], parts=[glm.Part(text=e['content'])]) for e in raw_history]
full_request_content = list(api_history_content)
if not impersonation_mode:
from datetime import datetime
try: user_timezone = pytz.timezone(self.config["timezone"])
except pytz.UnknownTimeZoneError: user_timezone = pytz.utc
now = datetime.now(user_timezone); time_str = now.strftime("%Y-%m-%d %H:%M:%S %Z"); time_note = f"[System note: Current time is {time_str}]"
request_content_parts = list(current_turn_parts)
if request_content_parts and hasattr(request_content_parts[0], 'text'):
original_text = request_content_parts[0].text
request_content_parts[0] = glm.Part(text=f"{time_note}\n\n{original_text}")
else: request_content_parts.insert(0, glm.Part(text=time_note))
else: request_content_parts = current_turn_parts
if request_content_parts: full_request_content.append(glm.Content(role="user", parts=request_content_parts))
if not full_request_content and not system_instruction_to_use:
if not impersonation_mode and status_msg: await utils.answer(status_msg, self.strings["no_prompt_or_media"])
return None if impersonation_mode else ""
error_to_report = None; max_retries = len(self.api_keys)
generation_cfg = old_types.GenerationConfig(temperature=self.config["temperature"])
for i in range(max_retries):
current_key_index = (self.current_api_key_index + i) % max_retries
api_key = self.api_keys[current_key_index]
try:
if response.candidates: reason=response.candidates[0].finish_reason.name
except(IndexError, AttributeError): pass
result_text=f"❗️ Gemini не смог сгенерировать ответ.\nПричина завершения: <code>{reason}</code>."
if was_successful and self._is_memory_enabled(str(chat_id)): self._update_history(chat_id, current_turn_parts, result_text, regeneration, msg_obj, gauto=impersonation_mode)
old_genai.configure(api_key=api_key)
sanitized_model_name = self.config["model_name"].lower().replace(" ", "-")
model = old_genai.GenerativeModel(sanitized_model_name, safety_settings=self.safety_settings, system_instruction=system_instruction_to_use)
api_response = await asyncio.wait_for(model.generate_content_async(full_request_content, tools=tools_list or None, generation_config=generation_cfg), timeout=GEMINI_TIMEOUT)
response = api_response; self.current_api_key_index = current_key_index; break
except google_exceptions.GoogleAPIError as e:
msg = str(e)
if "quota" in msg.lower() or "exceeded" in msg.lower():
if max_retries == 1: error_to_report = e; break
logger.warning(f"Ключ №{current_key_index + 1} исчерпал квоту.")
if i == max_retries - 1: error_to_report = RuntimeError("Все ключи исчерпали квоту."); continue
else: error_to_report = e; break
except Exception as e: error_to_report = e; break
if error_to_report: raise error_to_report
if response is None: raise RuntimeError("Не удалось получить ответ.")
try:
if response.prompt_feedback.block_reason: result_text = f"🚫 <b>Запрос заблокирован.</b>\nПричина: <code>{response.prompt_feedback.block_reason.name}</code>."
except AttributeError: pass
if not result_text:
try:
result_text = re.sub(r"</?emoji[^>]*>", "", response.text); was_successful = True
except ValueError: result_text = "❗️ Gemini не смог сгенерировать ответ (возможно, только safety ratings)."
if was_successful and self._is_memory_enabled(str(chat_id)):
self._update_history(chat_id, current_turn_parts, result_text, regeneration, msg_obj, gauto=impersonation_mode)
if impersonation_mode: return result_text if was_successful else None
hist_len_pairs=len(self._get_structured_history(chat_id, gauto=False)) // 2; limit=self.config["max_history_length"]; mem_indicator=self.strings["memory_status_unlimited"].format(hist_len_pairs) if limit <= 0 else self.strings["memory_status"].format(hist_len_pairs, limit)
question_html=f"<blockquote>{utils.escape_html(request_text_for_display[:200])}</blockquote>"; response_html=self._markdown_to_html(result_text); formatted_body=self._format_response_with_smart_separation(response_html)
header=f"{mem_indicator}\n\n{self.strings['question_prefix']}\n{question_html}\n\n{self.strings['response_prefix']}\n"; text_to_send=f"{header}{formatted_body}"
buttons=self._get_inline_buttons(chat_id, base_message_id) if self.config["interactive_buttons"] else None
hist_len_pairs = len(self._get_structured_history(chat_id, gauto=False)) // 2
limit = self.config["max_history_length"]
mem_indicator = self.strings["memory_status_unlimited"].format(hist_len_pairs) if limit <= 0 else self.strings["memory_status"].format(hist_len_pairs, limit)
question_html = f"<blockquote>{utils.escape_html(request_text_for_display[:200])}</blockquote>"
response_html = self._markdown_to_html(result_text)
formatted_body = self._format_response_with_smart_separation(response_html)
header = f"{mem_indicator}\n\n{self.strings['question_prefix']}\n{question_html}\n\n{self.strings['response_prefix']}{search_icon}\n"
text_to_send = f"{header}{formatted_body}"
buttons = self._get_inline_buttons(chat_id, base_message_id) if self.config["interactive_buttons"] else None
if len(text_to_send) > 4096:
file_content=(f"Вопрос: {display_prompt}\n\n════════════════════\n\nОтвет Gemini:\n{result_text}")
file=io.BytesIO(file_content.encode("utf-8")); file.name="Gemini_response.txt"
file_content = (f"Вопрос: {display_prompt}\n\n════════════════════\n\nОтвет Gemini:\n{result_text}")
file = io.BytesIO(file_content.encode("utf-8")); file.name = "Gemini_response.txt"
if call:
await call.answer("Ответ слишком длинный, отправляю файлом...", show_alert=False); await self.client.send_file(call.chat_id, file, caption=self.strings["response_too_long"], reply_to=call.message_id); await call.edit(f"{self.strings['response_too_long']}", reply_markup=None)
await call.answer("Ответ длинный, отправляю файлом...", show_alert=False); await self.client.send_file(call.chat_id, file, caption=self.strings["response_too_long"], reply_to=call.message_id); await call.edit(f"{self.strings['response_too_long']}", reply_markup=None)
elif status_msg:
await status_msg.delete(); await self.client.send_file(chat_id, file, caption=self.strings["response_too_long"], reply_to=base_message_id)
else:
if call: await call.edit(text_to_send, reply_markup=buttons)
elif status_msg: await utils.answer(status_msg, text_to_send, reply_markup=buttons)
except Exception as e:
error_text=self._handle_error(e)
if impersonation_mode: logger.error(f"Gauto | Ошибка авто-ответа: {error_text}")
error_text = self._handle_error(e)
if impersonation_mode: logger.error(f"Gauto | Ошибка: {error_text}")
elif call: await call.edit(error_text, reply_markup=None)
elif status_msg: await utils.answer(status_msg, error_text)
return None if impersonation_mode else ""
@@ -491,10 +551,13 @@ class Gemini(loader.Module):
current_key_index = (self.current_api_key_index + i) % max_retries
api_key = self.api_keys[current_key_index]
try:
genai.configure(api_key=api_key)
old_genai.configure(api_key=api_key)
sanitized_model_name = self.config["model_name"].lower().replace(" ", "-")
model = genai.GenerativeModel(sanitized_model_name, safety_settings=self.safety_settings)
api_response = await asyncio.wait_for(model.generate_content_async(full_prompt), timeout=GEMINI_TIMEOUT)
generation_cfg = old_types.GenerationConfig(
temperature=self.config["temperature"]
)
model = old_genai.GenerativeModel(sanitized_model_name, safety_settings=self.safety_settings)
api_response = await asyncio.wait_for(model.generate_content_async(full_prompt, generation_config=generation_cfg), timeout=GEMINI_TIMEOUT)
response = api_response
self.current_api_key_index = current_key_index
break
@@ -526,6 +589,41 @@ class Gemini(loader.Module):
except Exception as e:
await utils.answer(status_msg, self._handle_error(e))
@loader.command()
async def gprompt(self, message: Message):
"""[текст / -c / ответ на файл] — [-c (очистить)] / (ничего. увидеть промпт) Установить системный промпт (инструкцию/system_instruction)."""
args = utils.get_args_raw(message)
reply = await message.get_reply_message()
if args == "-c":
self.config["system_instruction"] = ""
return await utils.answer(message, self.strings["gprompt_cleared"])
new_prompt = None
if reply and reply.file:
if reply.file.size > 1024 * 1024:
return await utils.answer(message, self.strings["gprompt_file_too_big"])
try:
file_data = await self.client.download_file(reply.media, bytes)
try:
new_prompt = file_data.decode("utf-8")
except UnicodeDecodeError:
return await utils.answer(message, self.strings["gprompt_not_text"])
except Exception as e:
return await utils.answer(message, self.strings["gprompt_file_error"].format(e))
elif args:
new_prompt = args
if new_prompt is not None:
self.config["system_instruction"] = new_prompt
return await utils.answer(message, self.strings["gprompt_updated"].format(len(new_prompt)))
current_prompt = self.config["system_instruction"]
if not current_prompt:
return await utils.answer(message, self.strings["gprompt_usage"])
if len(current_prompt) > 4000:
file = io.BytesIO(current_prompt.encode("utf-8"))
file.name = "system_instruction.txt"
await utils.answer(message, self.strings["gprompt_current"], file=file)
else:
await utils.answer(message, f"{self.strings['gprompt_current']}\n<code>{utils.escape_html(current_prompt)}</code>")
@loader.command()
async def gauto(self, message: Message):
"""<on/off/[id]> — Вкл/выкл авто-ответ в чате."""
@@ -811,9 +909,9 @@ class Gemini(loader.Module):
status_msg = await utils.answer(message, self.strings["processing"])
try:
api_key = self.api_keys[self.current_api_key_index]
genai.configure(api_key=api_key)
old_genai.configure(api_key=api_key)
models_list = []
for model_obj in genai.list_models():
for model_obj in old_genai.list_models():
model_name = model_obj.name
display_name = model_obj.display_name or "Неизвестно"
methods = ", ".join(model_obj.supported_generation_methods) if model_obj.supported_generation_methods else "Нет"
@@ -875,8 +973,9 @@ class Gemini(loader.Module):
if message.is_private and not self.config["gauto_in_pm"]:
return
is_from_self_user = isinstance(message.from_id, types.PeerUser) and message.from_id.user_id == self.me.id
is_command = message.text and message.text.startswith(self.get_prefix())
if message.out or is_from_self_user or is_command:
if message.out or is_from_self_user:
return
if message.text and message.text.startswith(self.get_prefix()):
return
sender = await message.get_sender()
is_sender_a_bot = isinstance(sender, types.User) and sender.bot
@@ -891,8 +990,20 @@ class Gemini(loader.Module):
return
response_text = await self._send_to_gemini(message=message, parts=parts, impersonation_mode=True)
if response_text and response_text.strip():
await asyncio.sleep(random.uniform(1.0, 2.5))
await message.reply(response_text.strip())
clean_text = response_text.strip()
reaction_delay = random.uniform(2.0, 10.0)
await asyncio.sleep(reaction_delay)
try:
await self.client.send_read_acknowledge(message.chat_id, message=message)
except Exception:
pass
char_count = len(clean_text)
typing_speed = random.uniform(0.1, 0.25)
typing_duration = char_count * typing_speed
typing_duration = max(1.5, min(typing_duration, 25.0))
async with message.client.action(message.chat_id, "typing"):
await asyncio.sleep(typing_duration)
await message.reply(clean_text)
def _load_history_from_db(self, db_key: str) -> dict:
raw_conversations=self.db.get(self.strings["name"], db_key, {})