import asyncio
import random
import re
import threading
import time
from datetime import datetime

import config
from services.session_manager import SessionManager
from services.agent_loop import run_agent_loop
from scripts.persona import PERSONALITY
from tools.roleplayer import _name_mentioned


def _ts():
    """Return the current local time as ``HH:MM:SS`` for log prefixes."""
    return datetime.now().strftime('%H:%M:%S')


def _strip_bot_mention(text: str, bot_username: str) -> str:
    """Remove every ``@bot_username`` mention from *text* (case-insensitive).

    Only whole usernames are removed: ``@foo`` is not stripped out of
    ``@foobar``. The result is stripped of leading/trailing whitespace.
    When *bot_username* is empty, *text* is returned unchanged.
    """
    if not bot_username:
        return text
    # The negative lookahead keeps the match from ending in the middle of a
    # longer username (letters, digits and underscores continue a username).
    pattern = re.compile(
        r'@' + re.escape(bot_username) + r'(?![A-Za-z0-9_])',
        re.IGNORECASE,
    )
    return pattern.sub('', text).strip()


class TelegramClient:
    """Telegram front-end that feeds incoming chat messages to the agent loop.

    The bot polls for updates on an asyncio event loop. Each accepted message
    is processed on its own daemon thread, and replies are scheduled back onto
    the event loop from that thread.
    """

    def __init__(self, token, llm_client, tools_definition, TOOLS, TOOL_HANDLERS,
                 build_system_prompt, agent_max_iterations, allowed_group_ids=None):
        """Store collaborators and build the telegram ``Application``.

        Args:
            token: Bot token handed to ``Application.builder()``.
            llm_client: Passed through to ``run_agent_loop``.
            tools_definition: Passed to ``build_system_prompt`` as
                ``tools_definition``.
            TOOLS: Passed through to ``run_agent_loop``.
            TOOL_HANDLERS: Passed through to ``run_agent_loop``.
            build_system_prompt: Callable invoked with ``tools_definition``,
                ``character`` and ``skills`` keyword arguments.
            agent_max_iterations: Passed through to ``run_agent_loop``.
            allowed_group_ids: Optional collection of group chat ids. Incoming
                ids are compared as strings; when empty or ``None`` every
                group is accepted.
        """
        self._token = token
        self._llm = llm_client
        self._tools_def = tools_definition
        self._TOOLS = TOOLS
        self._TOOL_HANDLERS = TOOL_HANDLERS
        self._build_system_prompt = build_system_prompt
        self._max_iterations = agent_max_iterations
        self._allowed_group_ids = allowed_group_ids or []
        self._skill = config.AGENT_SKILL
        self._bot_username = ""
        self._session_mgr = SessionManager()
        # Both are set by _async_run() once the event loop is running.
        self._loop = None
        self._stopped = None
        # Imported here so importing this module does not require telegram.
        from telegram.ext import Application
        self._app = Application.builder().token(token).build()

    def start(self):
        """Run the bot until :meth:`stop` is called (blocks the caller)."""
        print(f'[{_ts()}] Starting Telegram service...')
        asyncio.run(self._async_run())

    def stop(self):
        """Request shutdown from any thread; no-op if the loop is not running."""
        if self._loop and not self._loop.is_closed():
            asyncio.run_coroutine_threadsafe(self._async_stop(), self._loop)

    async def _async_stop(self):
        """Set the stop event that ``_async_run`` is waiting on."""
        self._stopped.set()

    async def _async_run(self):
        """Initialise the application, poll until stopped, then shut down."""
        self._loop = asyncio.get_running_loop()
        self._stopped = asyncio.Event()

        # Initialise before the first API call so the bot's request machinery
        # is set up through the documented entry point.
        await self._app.initialize()
        bot_user = await self._app.bot.get_me()
        self._bot_username = bot_user.username or ""
        print(f'[{_ts()}] Telegram bot: @{self._bot_username}')

        self._register_handlers()
        await self._app.start()
        await self._app.updater.start_polling()
        print(f'[{_ts()}] Telegram bot is polling')

        try:
            await self._stopped.wait()
        except (asyncio.CancelledError, KeyboardInterrupt):
            # Cancellation/interrupt is treated as a normal stop request.
            pass

        await self._app.updater.stop()
        await self._app.stop()
        await self._app.shutdown()
        print(f'[{_ts()}] Telegram service stopped')

    def _register_handlers(self):
        """Attach the private, group and command handlers to the application."""
        from telegram.ext import MessageHandler, filters, CommandHandler

        plain_text = filters.TEXT & ~filters.COMMAND
        self._app.add_handler(
            MessageHandler(
                plain_text & filters.ChatType.PRIVATE,
                self._on_private_message
            )
        )
        self._app.add_handler(
            MessageHandler(
                plain_text & filters.ChatType.GROUPS,
                self._on_group_message
            )
        )
        self._app.add_handler(CommandHandler('start', self._on_start_command))
        self._app.add_handler(CommandHandler('new', self._on_new_command))

    async def _on_start_command(self, update, context):
        """Reply to ``/start`` with the greeting text."""
        message = update.message
        if message is None:
            # e.g. an edited message: there is no new message to answer.
            return
        await message.reply_text(
            'Halo! Aku adalah asisten AI. Kirim pesan untuk memulai percakapan.'
        )

    async def _on_new_command(self, update, context):
        """Reset the chat's session on ``/new`` and confirm to the user."""
        message = update.message
        if message is None:
            # e.g. an edited message: ignore rather than reset the session.
            return
        chat_id = str(update.effective_chat.id)
        self._session_mgr.reset(chat_id)
        await message.reply_text('Memulai sesi baru. Ada yang bisa dibantu?')

    async def _on_private_message(self, update, context):
        """Hand a non-empty direct message to a worker thread."""
        message = update.message
        if message is None or not message.text:
            # update.message is None for updates such as edited messages.
            return
        chat_id = update.effective_chat.id
        text = message.text.strip()
        if not text:
            return
        print(f'[{_ts()}] Telegram DM from {chat_id}: {text[:60]}')
        self._spawn_worker(chat_id, text, 'private', '', message.message_id)

    async def _on_group_message(self, update, context):
        """Process a group message only when it is addressed to the bot.

        A message counts as addressed when it replies to one of the bot's
        messages, mentions the bot, or ``_name_mentioned`` reports the persona
        name in the text. Groups outside a non-empty allow-list are ignored.
        """
        message = update.message
        if message is None or not message.text:
            # update.message is None for updates such as edited messages.
            return

        chat_id = update.effective_chat.id
        if self._allowed_group_ids and str(chat_id) not in self._allowed_group_ids:
            return

        text = message.text.strip()
        if not text:
            return

        bot_id = context.bot.id
        replied = message.reply_to_message
        replied_to_bot = bool(
            replied and replied.from_user and replied.from_user.id == bot_id
        )
        has_mention = self._mentions_bot(message, bot_id)
        name_mentioned = _name_mentioned(PERSONALITY.name, text)

        if not (replied_to_bot or has_mention or name_mentioned):
            print(f'[{_ts()}] Telegram Group [{chat_id}] NO-REPLY: {text[:60]}')
            return

        if has_mention and self._bot_username:
            text = _strip_bot_mention(text, self._bot_username)
            if not text:
                # The message contained nothing but the mention.
                return

        sender = update.effective_user
        if sender:
            sender_name = sender.full_name or sender.username or str(sender.id)
        else:
            sender_name = str(chat_id)
        print(f'[{_ts()}] Telegram Group [{chat_id}] <{sender_name}>: {text[:60]}')
        self._spawn_worker(chat_id, text, 'group', sender_name, message.message_id)

    def _mentions_bot(self, message, bot_id):
        """Return True if *message* has an entity that mentions this bot."""
        expected = f'@{self._bot_username.lower()}' if self._bot_username else None
        for ent in message.entities or ():
            if ent.type == 'mention' and expected:
                # Entity offsets are UTF-16 code units, so slicing the Python
                # string directly is wrong after emoji; parse_entity handles it.
                if message.parse_entity(ent).lower() == expected:
                    return True
            elif ent.type == 'text_mention' and ent.user and ent.user.id == bot_id:
                return True
        return False

    def _spawn_worker(self, chat_id, text, chat_type, sender_name, msg_id):
        """Run ``_process_message`` on a daemon thread."""
        threading.Thread(
            target=self._process_message,
            args=(chat_id, text, chat_type, sender_name, msg_id),
            daemon=True
        ).start()

    def _process_message(self, chat_id, body, chat_type, sender_name, reply_to_msg_id):
        """Run one conversation turn for *chat_id* (called on a worker thread).

        Args:
            chat_id: Telegram chat id; its string form keys the session.
            body: Message text, already stripped.
            chat_type: ``'private'`` or ``'group'``; selects the idle timeout.
            sender_name: Display name of the sender ('' for private chats).
            reply_to_msg_id: Id of the message the bot's answers reply to.
        """
        session = self._session_mgr.get_or_create(
            str(chat_id),
            self._build_system_prompt(
                tools_definition=self._tools_def,
                character=config.AGENT_CHARACTER or None,
                skills=config.AGENT_SKILLS.split(",") if config.AGENT_SKILLS else None,
            )
        )
        session.cancel_timer()

        if body in (':new', '/new'):
            self._session_mgr.reset(str(chat_id))
            print(f'[{_ts()}] Session reset for {chat_id}')
            self._schedule_send(
                chat_id, 'Memulai sesi baru. Ada yang bisa dibantu?', reply_to_msg_id
            )
            return

        session.add_message('user', body)
        try:
            self._run_turn(session, chat_id, body, sender_name, reply_to_msg_id)
        finally:
            # The timer was cancelled above; re-arm it even when the turn
            # raised, otherwise the session would never time out.
            # Timeout values are presumably seconds (24 h vs 5 min) -- confirm
            # against Session.start_timer.
            timeout = 86400 if chat_type == 'private' else 300
            session.start_timer(timeout, self._timeout_session, chat_id)

    def _run_turn(self, session, chat_id, body, sender_name, reply_to_msg_id):
        """Run the agent loop for one message and send the resulting reply."""
        is_roleplay = self._skill == 'roleplayer'

        # Best-effort "typing..." indicator; its result is intentionally ignored.
        asyncio.run_coroutine_threadsafe(
            self._app.bot.send_chat_action(chat_id=chat_id, action='typing'),
            self._loop
        )
        time.sleep(random.uniform(config.READ_DELAY_MIN, config.READ_DELAY_MAX))

        def on_tool_calls(tnames):
            info = f'Using: {", ".join(tnames)}'
            print(f'[{_ts()}] {info}')
            # Tool usage is logged but not sent to the chat in roleplay mode.
            if not is_roleplay:
                self._schedule_send(chat_id, info, reply_to_msg_id)

        final_content = run_agent_loop(
            session, self._llm, self._TOOLS, self._TOOL_HANDLERS,
            self._max_iterations, on_tool_calls=on_tool_calls
        )

        if final_content is None:
            self._schedule_send(
                chat_id, 'Max iterations reached without final answer.', reply_to_msg_id
            )
            return

        if is_roleplay and not self._roleplay_should_reply(
            session, chat_id, body, sender_name
        ):
            return
        self._schedule_send(chat_id, final_content, reply_to_msg_id)

    def _roleplay_should_reply(self, session, chat_id, body, sender_name):
        """Decide (and log) whether the roleplay persona answers this message."""
        my_name = PERSONALITY.name

        if not config.TELEGRAM_SELECTIVE_RESPONSE:
            if _name_mentioned(my_name, body):
                print(f'[{_ts()}] Name mentioned → sending response')
                return True
            print(f'[{_ts()}] Name not mentioned → staying silent')
            return False

        # Build a short transcript from the last six session messages.
        recent_msgs = []
        for msg in session.messages[-6:]:
            role = msg.get('role')
            if role == 'user':
                recent_msgs.append(f"User: {msg.get('content', '')}")
            elif role == 'assistant' and msg.get('content'):
                recent_msgs.append(f"{my_name}: {msg.get('content', '')}")
        recent_history = "\n".join(recent_msgs)

        from tools.roleplayer import should_respond
        if should_respond(
            message=body,
            sender_nickname=sender_name or str(chat_id),
            recent_history=recent_history,
            my_name=my_name,
        ):
            print(f'[{_ts()}] need_response=True → sending response')
            return True
        print(f'[{_ts()}] need_response=False → staying silent')
        return False

    def _schedule_send(self, chat_id, text, reply_to_msg_id=None):
        """Sleep for a simulated typing delay, then queue *text* for sending.

        Blocks the calling thread for the delay. The send itself runs on the
        event loop; failures are logged instead of being silently dropped.
        """
        if not self._loop or self._loop.is_closed():
            print(f'[{_ts()}] WARNING: cannot send to {chat_id} — loop unavailable')
            return

        char_count = len(text) if text else 0
        # At least 1 s, at most config.TYPING_MAX; TYPING_SPEED is presumably
        # characters per second -- confirm in config.
        sleep_delay = max(1.0, min(char_count / config.TYPING_SPEED, config.TYPING_MAX))
        print(f'[{_ts()}] Typing delay: {sleep_delay:.1f}s ({char_count} chars)')
        time.sleep(sleep_delay)

        future = asyncio.run_coroutine_threadsafe(
            self._app.bot.send_message(
                chat_id=chat_id, text=text, reply_to_message_id=reply_to_msg_id
            ),
            self._loop
        )
        future.add_done_callback(
            lambda done: self._report_send_failure(chat_id, done)
        )

    @staticmethod
    def _report_send_failure(chat_id, future):
        """Log a cancelled or failed send; do nothing on success."""
        if future.cancelled():
            print(f'[{_ts()}] WARNING: send to {chat_id} was cancelled')
            return
        error = future.exception()
        if error is not None:
            print(f'[{_ts()}] WARNING: send to {chat_id} failed: {error!r}')

    def _timeout_session(self, chat_id):
        """Timer callback: announce the session's end, then reset it."""
        print(f'[{_ts()}] Session timeout: {chat_id}')
        self._schedule_send(chat_id, 'Sesi ditutup. Sampai jumpa')
        self._session_mgr.reset(str(chat_id))