hendrik/services/telegram_client.py

276 lines
10 KiB
Python

import asyncio
import random
import re
import threading
import time
from datetime import datetime
import config
from services.session_manager import SessionManager
from services.agent_loop import run_agent_loop
from scripts.persona import PERSONALITY
from tools.roleplayer import _name_mentioned
def _ts():
return datetime.now().strftime('%H:%M:%S')
def _strip_bot_mention(text: str, bot_username: str) -> str:
if not bot_username:
return text
pattern = re.compile(r'@' + re.escape(bot_username), re.IGNORECASE)
return pattern.sub('', text).strip()
class TelegramClient:
def __init__(self, token, llm_client, tools_definition, TOOLS,
TOOL_HANDLERS, build_system_prompt, agent_max_iterations,
allowed_group_ids=None):
self._token = token
self._llm = llm_client
self._tools_def = tools_definition
self._TOOLS = TOOLS
self._TOOL_HANDLERS = TOOL_HANDLERS
self._build_system_prompt = build_system_prompt
self._max_iterations = agent_max_iterations
self._allowed_group_ids = allowed_group_ids or []
self._skill = config.AGENT_SKILL
self._bot_username = ""
self._session_mgr = SessionManager()
self._loop = None
self._stopped = None
from telegram.ext import Application
self._app = Application.builder().token(token).build()
def start(self):
print(f'[{_ts()}] Starting Telegram service...')
asyncio.run(self._async_run())
def stop(self):
if self._loop and not self._loop.is_closed():
asyncio.run_coroutine_threadsafe(self._async_stop(), self._loop)
async def _async_stop(self):
self._stopped.set()
async def _async_run(self):
self._loop = asyncio.get_running_loop()
self._stopped = asyncio.Event()
bot_user = await self._app.bot.get_me()
self._bot_username = bot_user.username or ""
print(f'[{_ts()}] Telegram bot: @{self._bot_username}')
self._register_handlers()
await self._app.initialize()
await self._app.start()
await self._app.updater.start_polling()
print(f'[{_ts()}] Telegram bot is polling')
try:
await self._stopped.wait()
except (asyncio.CancelledError, KeyboardInterrupt):
pass
await self._app.updater.stop()
await self._app.stop()
await self._app.shutdown()
print(f'[{_ts()}] Telegram service stopped')
def _register_handlers(self):
from telegram.ext import MessageHandler, filters, CommandHandler
self._app.add_handler(
MessageHandler(
filters.TEXT & ~filters.COMMAND & filters.ChatType.PRIVATE,
self._on_private_message
)
)
self._app.add_handler(
MessageHandler(
filters.TEXT & ~filters.COMMAND & filters.ChatType.GROUPS,
self._on_group_message
)
)
self._app.add_handler(CommandHandler('start', self._on_start_command))
self._app.add_handler(CommandHandler('new', self._on_new_command))
async def _on_start_command(self, update, context):
await update.message.reply_text(
'Halo! Aku adalah asisten AI. Kirim pesan untuk memulai percakapan.'
)
async def _on_new_command(self, update, context):
chat_id = str(update.effective_chat.id)
self._session_mgr.reset(chat_id)
await update.message.reply_text('Memulai sesi baru. Ada yang bisa dibantu?')
async def _on_private_message(self, update, context):
chat_id = update.effective_chat.id
text = update.message.text.strip()
if not text:
return
msg_id = update.message.message_id
print(f'[{_ts()}] Telegram DM from {chat_id}: {text[:60]}')
threading.Thread(
target=self._process_message,
args=(chat_id, text, 'private', '', msg_id),
daemon=True
).start()
async def _on_group_message(self, update, context):
chat_id = update.effective_chat.id
chat_id_str = str(chat_id)
if self._allowed_group_ids and chat_id_str not in self._allowed_group_ids:
return
text = update.message.text.strip()
if not text:
return
replied_to_bot = (
update.message.reply_to_message
and update.message.reply_to_message.from_user
and update.message.reply_to_message.from_user.id == context.bot.id
)
has_mention = False
if update.message.entities:
for ent in update.message.entities:
if ent.type == 'mention' and self._bot_username:
start = ent.offset
end = ent.offset + ent.length
mention_text = update.message.text[start:end]
if mention_text.lower() == f'@{self._bot_username.lower()}':
has_mention = True
break
elif ent.type == 'text_mention' and ent.user.id == context.bot.id:
has_mention = True
break
name_mentioned = _name_mentioned(PERSONALITY.name, text)
if not replied_to_bot and not has_mention and not name_mentioned:
print(f'[{_ts()}] Telegram Group [{chat_id}] NO-REPLY: {text[:60]}')
return
if has_mention and self._bot_username:
text = _strip_bot_mention(text, self._bot_username)
if not text:
return
msg_id = update.message.message_id
sender = update.effective_user
sender_name = sender.full_name or sender.username or str(sender.id) if sender else str(chat_id)
print(f'[{_ts()}] Telegram Group [{chat_id}] <{sender_name}>: {text[:60]}')
threading.Thread(
target=self._process_message,
args=(chat_id, text, 'group', sender_name, msg_id),
daemon=True
).start()
def _process_message(self, chat_id, body, chat_type, sender_name, reply_to_msg_id):
session = self._session_mgr.get_or_create(
str(chat_id), self._build_system_prompt(
tools_definition=self._tools_def,
character=config.AGENT_CHARACTER or None,
skills=config.AGENT_SKILLS.split(",") if config.AGENT_SKILLS else None,
)
)
session.cancel_timer()
if body in (':new', '/new'):
self._session_mgr.reset(str(chat_id))
print(f'[{_ts()}] Session reset for {chat_id}')
self._schedule_send(chat_id, 'Memulai sesi baru. Ada yang bisa dibantu?', reply_to_msg_id)
return
session.add_message('user', body)
is_roleplay = self._skill == 'roleplayer'
asyncio.run_coroutine_threadsafe(
self._app.bot.send_chat_action(chat_id=chat_id, action='typing'),
self._loop
)
delay = random.uniform(config.READ_DELAY_MIN, config.READ_DELAY_MAX)
time.sleep(delay)
def on_tool_calls(tnames):
info = f'Using: {", ".join(tnames)}'
print(f'[{_ts()}] {info}')
if not is_roleplay:
self._schedule_send(chat_id, info, reply_to_msg_id)
final_content = run_agent_loop(
session, self._llm, self._TOOLS, self._TOOL_HANDLERS,
self._max_iterations, on_tool_calls=on_tool_calls
)
if final_content is not None:
if is_roleplay:
my_name = PERSONALITY.name
if config.TELEGRAM_SELECTIVE_RESPONSE:
recent_msgs = []
for msg in session.messages[-6:]:
if msg.get('role') == 'user':
recent_msgs.append(f"User: {msg.get('content', '')}")
elif msg.get('role') == 'assistant' and msg.get('content'):
recent_msgs.append(f"{my_name}: {msg.get('content', '')}")
recent_history = "\n".join(recent_msgs)
from tools.roleplayer import should_respond
if should_respond(
message=body,
sender_nickname=sender_name or str(chat_id),
recent_history=recent_history,
my_name=my_name,
):
print(f'[{_ts()}] need_response=True → sending response')
self._schedule_send(chat_id, final_content, reply_to_msg_id)
else:
print(f'[{_ts()}] need_response=False → staying silent')
else:
from tools.roleplayer import _name_mentioned
if _name_mentioned(my_name, body):
print(f'[{_ts()}] Name mentioned → sending response')
self._schedule_send(chat_id, final_content, reply_to_msg_id)
else:
print(f'[{_ts()}] Name not mentioned → staying silent')
else:
self._schedule_send(chat_id, final_content, reply_to_msg_id)
else:
msg = 'Max iterations reached without final answer.'
self._schedule_send(chat_id, msg, reply_to_msg_id)
timeout = 86400 if chat_type == 'private' else 300
session.start_timer(timeout, self._timeout_session, chat_id)
def _schedule_send(self, chat_id, text, reply_to_msg_id=None):
if self._loop and not self._loop.is_closed():
char_count = len(text) if text else 0
sleep_delay = max(1.0, min(char_count / config.TYPING_SPEED, config.TYPING_MAX))
print(f'[{_ts()}] Typing delay: {sleep_delay:.1f}s ({char_count} chars)')
time.sleep(sleep_delay)
asyncio.run_coroutine_threadsafe(
self._app.bot.send_message(
chat_id=chat_id,
text=text,
reply_to_message_id=reply_to_msg_id
),
self._loop
)
else:
print(f'[{_ts()}] WARNING: cannot send to {chat_id} — loop unavailable')
def _timeout_session(self, chat_id):
print(f'[{_ts()}] Session timeout: {chat_id}')
self._schedule_send(chat_id, 'Sesi ditutup. Sampai jumpa')
self._session_mgr.reset(str(chat_id))