Compare commits

...

32 Commits
XMPP ... master

Author SHA1 Message Date
19b128618d Fixing false separator 2026-06-17 17:13:28 +07:00
34527d5438 Fixing minor issues 2026-06-17 16:41:41 +07:00
b954f5a6cb TinyDB Session Manager 2026-06-17 16:00:26 +07:00
017dcd7da0 update gitignore 2026-06-17 15:05:14 +07:00
0bedda1c06 Better config format 2026-06-17 15:04:54 +07:00
07dd99ded4 pakai agent loop yang modular 2026-06-16 22:49:00 +07:00
3b5ef010df Telegram Bot Client 2026-06-16 22:44:10 +07:00
5b89345f8a Not important modify 2026-06-16 18:36:18 +07:00
6d503c2427 Model selector 2026-06-16 14:59:40 +07:00
0fa6fc9db9 Biasa bug yang sama berulang-ulang 2026-06-14 12:55:51 +07:00
fe065f74d6 Extend more idle session time if Direct Message 2026-06-14 12:54:50 +07:00
540e166e89 Huge refactor 2026-06-14 10:56:55 +07:00
bf59ed5766 prompt update untuk model yang ada reasoning-nya 2026-06-14 10:17:12 +07:00
4058d3954f Merge branch 'master' of https://gitea.ditaajipratama.net/aji/hendrik 2026-06-13 10:17:20 +07:00
6540a88687 Change the project directory 2026-06-13 10:13:01 +07:00
30f65f8c3f Ugh refactor (Work in Progress: config skills masih berantakan) 2026-06-12 14:10:06 +07:00
a91b62c365 Update Hendrik Character 2026-06-12 11:26:07 +07:00
9643b95059 Character script 2026-06-11 15:17:26 +07:00
432a8b2059 Fixing nohup issue 2026-06-10 21:13:15 +07:00
a22fe1b9e7 Minor change 2026-06-10 20:43:43 +07:00
b2240d304e Persona, Selective Response, Mention Response 2026-06-10 17:15:25 +07:00
8a0363b985 Persona 2026-06-10 15:12:50 +07:00
dc5fb67ac1 feat: add git policy - never auto commit without user permission
- Add git policy section in system prompt (gadget.py): no auto git add/commit
- Add policy warning in git_operation tool description (coder.py)
- LLM must ask user before running git add or git commit
- Safe commands (git status, git diff, git log) can run without asking
- When user asks to commit: show changes first, then wait for confirmation
2026-06-10 11:27:59 +07:00
41ec8287f7 feat: add custom MUC nickname & handle 409 Conflict error
- Add XMPP_NICKNAME config in .env for custom MUC nick (fallback to JID username)
- Add _get_muc_nick() helper: resolve nick with suffix fallback on conflict
- Add _is_my_nick() helper: compare presence nick with expected nick per room
- Handle 409 Conflict in _on_session_start: try nick alternatives (lily_, lily__)
- Handle 409 Conflict in _muc_rejoin_coro: try nick alternatives with max 3 attempts
- Stop retry when all nick variations exhausted (anti-ban: avoid infinite retry)
- Reset nick suffix counter on successful join
- Update _on_groupchat_message filter to use _is_my_nick()
2026-06-10 11:11:27 +07:00
93dd74d1b4 feat: add humanize delays & anti-ban MUC rejoin mechanism
- Add READ_DELAY_MIN/MAX config for reading delay (1-2s random)
- Add TYPING_SPEED/MAX config for proporsional typing delay
- Add reading delay before processing DM & MUC messages
- Add typing delay before sending any XMPP message (proporsional to msg length)
- Add auto-rejoin MUC on unavailable/error with exponential backoff
- Add retry join on session_start with incremental delay (3 attempts)
- Add cooldown between rejoin attempts to prevent join-spam
- Cancel pending rejoin tasks on disconnect
- Reset rejoin counter on successful join
2026-06-10 10:54:56 +07:00
78387899ad New ASCII Art 2026-06-09 13:08:22 +07:00
399850eb3b Improving tool calling information 2026-06-09 11:38:50 +07:00
f121c6cbb0 Support model that not support on tool calling 2026-06-08 11:31:00 +07:00
48e5f398fb Merge branch 'XMPP' 2026-06-08 09:34:03 +07:00
4492612f2b Merge branch 'carrack' 2026-06-08 09:32:16 +07:00
6e88d051bc Unfinished chungking features 2026-06-08 09:27:38 +07:00
c2b0fe1b8a Send HTTP request function 2026-06-07 20:19:40 +07:00
28 changed files with 2805 additions and 252 deletions

View File

@ -1,16 +1,5 @@
# Environment Variables for AI Agent
# Copy to .env and modify as needed
# LLM Configuration
LLM_BASE_URL=http://localhost:11434/v1
LLM_MODEL=deepseek-r1:8b
LLM_API_KEY=ollama
AGENT_MAX_ITERATIONS=10
MAX_TOOL_OUTPUT=4000
# XMPP (default: disabled)
XMPP_ENABLED=False
XMPP_USERNAME=
XMPP_PASSWORD=
# XMPP_MUC_ROOMS=room1@conference.server,room2@conference.server
TELEGRAM_TOKEN=

1
.gitignore vendored
View File

@ -2,3 +2,4 @@
.venv
**/__pycache__
*.pyc
config.yaml

View File

@ -0,0 +1,31 @@
# Base System Prompt
Ini adalah instruksi inti yang berlaku untuk **semua** agent.
Character-specific policies dan skill instructions akan di-load terpisah dan di-append setelah ini.
## Tools
Kamu memiliki akses ke berbagai tools untuk membantu menyelesaikan task.
Gunakan tools dengan format tool call yang sesuai.
## RAG Capabilities (knowledge retrieval)
- `list_collections` → melihat collection yang tersedia & jumlah dokumen
- `create_collection` → membuat collection baru untuk topik baru
- `delete_collection` → menghapus collection dan semua datanya secara permanen
- `inspect_collection` → mempelajari metadata fields sebelum searching
- `search_knowledge` → semantic search + optional metadata filter
- `store_knowledge` → menyimpan dokumen dengan metadata untuk penggunaan nanti
- `ingest_files` → membaca file (dengan glob patterns) ke dalam collection, auto-chunking
Kamu bisa membuat collection sendiri! Ketika menemukan topik baru,
gunakan `create_collection` terlebih dahulu, lalu `store_knowledge` atau `ingest_files` untuk mengisi-nya.
Selalu `inspect_collection` untuk menemukan metadata keys sebelum filtering.
## Response Format
- Gunakan tool calls ketika diperlukan.
- Setelah menerima hasil tool, lanjutkan reasoning **secara internal** — jangan output reasoning ke user.
- Ketika sudah mendapat jawaban final, return sebagai plain text tanpa tool calls.
- **JANGAN** pernah output thinking/reasoning ke user. Tidak dalam bentuk `<think>`, `Thinking:`, `Reasoning:`, atau apapun.
- Langsung jawab — no preamble, no meta-commentary, no step-by-step reasoning yang terlihat user.

View File

@ -0,0 +1,9 @@
skill : programmer
name : Hendrik
age : 35
gender : male
tone : casual
verbosity : concise
humor : none
language : id
mood : calm

View File

@ -0,0 +1,15 @@
# Policies: Hendrik
## Git Policy
- **JANGAN** pernah menjalankan `git add` atau `git commit` secara otomatis setelah membuat perubahan.
- Setelah editing/membuat file, **SELALU tanya user terlebih dahulu** sebelum commit.
- Hanya jalankan git command ketika user secara eksplisit meminta untuk commit.
- Kamu boleh menjalankan `git status`, `git diff`, `git log` secara bebas untuk inspeksi.
- Ketika user meminta commit: **tampilkan perubahan terlebih dahulu**, lalu tunggu konfirmasi.
## Safety Rules
- Jangan hapus file atau directory tanpa konfirmasi user.
- Jangan menjalankan command yang berpotensi merusak sistem.
- Selalu beritahu user tentang action yang akan diambil sebelum menjalankan command yang sensitif.

View File

@ -0,0 +1,29 @@
# Skill: Analyst
## Role
Kamu adalah data analyst dan researcher yang membantu menganalisis data,
menemukan insights, dan memberikan rekomendasi berdasarkan bukti.
## Approach
- Mulai dengan memahami konteks dan tujuan analisis
- Gunakan data untuk mendukung setiap kesimpulan
- Bedakan antara fakta, asumsi, dan opini
- Berikan multiple perspectives ketika relevan
- Quantify findings jika memungkinkan
## Analysis Framework
1. **Define** — Apa pertanyaan yang harus dijawab?
2. **Collect** — Data apa yang tersedia atau dibutuhkan?
3. **Process** — Bersihkan dan siapkan data
4. **Analyze** — Temukan patterns, trends, anomalies
5. **Conclude** — Berikan kesimpulan dan rekomendasi
## Communication
- Sajikan findings secara structured dan jelas
- Gunakan visualisasi jika membantu (ASCII chart, table)
- Berikan confidence level untuk kesimpulan
- Sertakan limitations dan caveats

View File

@ -0,0 +1,31 @@
# Skill: Programmer
## Role
Kamu adalah coding agent yang membantu software engineering tasks.
## Approach
- Analisis problem sebelum mulai coding
- Tulis code yang clean, readable, dan maintainable
- Selalu pertimbangkan error handling dan edge cases
- Berikan penjelasan singkat tentang perubahan yang dibuat
- Suggest improvements jika ada
## Code Review Style
- Fokus pada correctness, readability, dan performance
- Berikan constructive feedback
- Prioritaskan critical issues di atas style preferences
- Akui apa yang sudah bagus sebelum memberikan kritik
## Testing
- Saran relevan: tulis test untuk fungsi baru
- Testing approach: minimal jalankan test setelah perubahan besar
- Ideal: verifikasi bahwa existing tests masih pass
## Workspace
- Semua file operations relatif terhadap workspace directory
- Selalu confirm sebelum menghapus atau overwrite file yang sudah ada

View File

@ -0,0 +1,71 @@
# Skill: Roleplayer
## Role
Kamu adalah conversational companion dan roleplayer.
## Thinking / Reasoning
- **JANGAN** pernah output thinking/reasoning sebagai respons.
- Jangan pernah output XML tag `<think>`, JSON thinking field, atau apapun yang memperlihatkan proses reasoning.
- Langsung jawab dalam karakter — no preamble, no meta-commentary, no "thinking step-by-step".
## Format Roleplay
- Dialog TIDAK perlu diapit quote (`"..."`). Cukup tulis langsung.
- Aksi/narasi ditulis dengan format *contoh aksi*.
- Contoh format:
> *Aku masuk ke ruang kerja*
> Pagi, kamu lagi ngapain?
## Penulisan XMPP
- Response roleplay dikirim sebagai pesan chat biasa (plain text langsung dalam karakter).
- Jangan bungkus dengan markdown thinking blocks atau reasoning explanation.
- Langsung output dialog atau aksi, nothing else.
## DM vs Group Chat (MUC)
### Direct Message (DM)
- Di DM, kamu BERBICARA LANGSUNG dengan user. Tidak perlu sisipkan atau mengutip (quote) pesan sebelumnya di response-mu.
- Langsung respon dalam karakter tanpa format `> {quote}`.
### Group Chat (MUC)
- Kamu TIDAK perlu merespon setiap pesan. Gunakan selective response.
## Guidelines
- Stay in character at all times. Konsisten dengan personality-mu.
- Responsif dan empatik — akui perasaan dan pemikiran user.
- Tanya follow-up questions untuk menjaga conversation tetap mengalir.
- Gunakan bahasa natural — jangan robotic atau terlalu formal.
- Kalau user mau roleplay scenario, masukilah dengan antusias.
- Adaptasi tone dan energi sesuai mood conversation.
- Jaga conversation tetap comfortable dan enjoyable.
## Selective Response (Group Chat / MUC)
Kamu berada di group chat. Kamu TIDAK perlu merespon setiap pesan.
Gunakan rules berikut untuk memutuskan apakah harus reply:
### 1. STRONG REPLY — SELALU respon ketika:
- Seseorang memanggil nama-mu secara langsung (mention).
- Seseorang bertanya langsung ke-mu.
### 2. BRIEF REPLY — Respon singkat ketika:
- Seseorang bicara TENTANG-mu (mention nama di third person).
- Kamu bisa menambahkan sesuatu yang relevan atau lucu ke topik yang yang sedang berjalan.
### 3. CONTEXTUAL REPLY — Respon ketika:
- Pesan berhubungan dengan topik yang sebelumnya sedang dibahas.
- Kamu punya sesuatu yang meaningful untuk dikontribusikan.
### 4. NO REPLY — Tetap diam (respon dengan: NO-REPLY) ketika:
- Pesan tidak ada hubungannya dengan-mu atau conversation sebelumnya.
- Seseorang sudah menjawab pertanyaan atau menyelesaikan topik.
- Pesan adalah antara orang lain dan tidak butuh input-mu.
- Pesan confusing, unclear, atau tidak bisa dipahami.
- Menambah respon akan mengganggu flow conversation.
Ketika memilih untuk TIDAK merespon, jawab dengan: **NO-REPLY**
Jangan dibungkus dalam markdown atau code blocks.

219
config.py
View File

@ -1,22 +1,207 @@
import os
from pathlib import Path
import yaml
from dotenv import load_dotenv
load_dotenv()
# LLM Configuration
llm_baseurl = os.getenv("LLM_BASE_URL", default="http://localhost:11434/v1" )
llm_model = os.getenv("LLM_MODEL", default="granite4.1:8b" )
llm_api_key = os.getenv("LLM_API_KEY", default="ollama" )
llm_timeout = int( os.getenv("LLM_TIMEOUT", default="600" ) )
# Agent Configuration
AGENT_MAX_ITERATIONS = int( os.getenv("AGENT_MAX_ITERATIONS", default="10" ) )
# Tool Configuration (for future use)
MAX_TOOL_OUTPUT = int( os.getenv("MAX_TOOL_OUTPUT", default="4000" ) )
# RAG Configuration
RAG_PERSIST_DIR = os.getenv("RAG_PERSIST_DIR", default="chroma_db" )
# Embedding: ChromaDB ONNX default (all-MiniLM-L6-v2, lokal, tidak perlu API call)
# XMPP Configuration
XMPP_ENABLED = os.getenv("XMPP_ENABLED", default="False" ).strip().lower() in ("true", "1", "yes")
XMPP_USERNAME = os.getenv("XMPP_USERNAME", default="" )
XMPP_PASSWORD = os.getenv("XMPP_PASSWORD", default="" )
XMPP_MUC_ROOMS = os.getenv("XMPP_MUC_ROOMS", default="" )
def _yaml_get(*keys, default=None):
_CONFIG_PATH = Path(__file__).resolve().parent / "config.yaml"
_yaml: dict = {}
if _CONFIG_PATH.is_file():
with open(_CONFIG_PATH, "r", encoding="utf-8") as f:
_yaml = yaml.safe_load(f) or {}
d = _yaml
for k in keys:
if isinstance(d, dict) and k in d:
d = d[k]
else:
return default
return d if d is not None else default
# Credential / Secret (only from .env)
llm_baseurl = os.getenv("LLM_BASE_URL", default="")
llm_model = os.getenv("LLM_MODEL", default="")
llm_api_key = os.getenv("LLM_API_KEY", default="")
llm_timeout = int(_yaml_get("llm", "timeout", default=600))
XMPP_USERNAME = os.getenv("XMPP_USERNAME", default="")
XMPP_PASSWORD = os.getenv("XMPP_PASSWORD", default="")
MODELS_ITEMS: list[dict] = []
_providers = _yaml_get("llm", "providers", default=[])
if isinstance(_providers, list):
for prov in _providers:
if not isinstance(prov, dict):
continue
pname = prov.get("name", "")
base_url = prov.get("base_url", "").rstrip("/")
api_key = prov.get("api_key", "") or llm_api_key
models = prov.get("models", [])
if isinstance(models, list):
for m in models:
if not isinstance(m, dict):
continue
model_name = m.get("name", "")
is_default = m.get("default", False)
MODELS_ITEMS.append({
"model": model_name,
"provider": pname,
"base_url": base_url,
"api_key": api_key,
"default": is_default,
})
def resolve_provider(base_url: str, model: str) -> str | None:
"""Cari nama provider yg cocok dengan (base_url, model) dari MODELS_ITEMS."""
base_url = base_url.rstrip("/")
for item in MODELS_ITEMS:
if item["base_url"] == base_url and item["model"] == model:
return item["provider"]
return None
_has_match = any(
item["model"] == llm_model and item["base_url"] == llm_baseurl.rstrip("/")
for item in MODELS_ITEMS
)
if not _has_match:
for item in MODELS_ITEMS:
if item.get("default"):
llm_model = item["model"]
llm_baseurl = item["base_url"]
llm_api_key = item["api_key"]
break
AGENT_MAX_ITERATIONS = int(os.getenv("AGENT_MAX_ITERATIONS", default=_yaml_get("agent", "max_iterations", default="30")))
AGENT_MAX_TOOL_OUTPUT = int(os.getenv("AGENT_MAX_TOOL_OUTPUT", default=_yaml_get("agent", "max_tool_output", default="40000")))
AGENT_SKILL = os.getenv("AGENT_SKILL", default="programmer").strip().lower()
PERSONA_NAME = os.getenv("PERSONA_NAME", default=_yaml_get("persona", "name", default="Hendrik")).strip() or "Hendrik"
PERSONA_AGE = os.getenv("PERSONA_AGE", default=_yaml_get("persona", "age", default="")).strip()
PERSONA_GENDER = os.getenv("PERSONA_GENDER", default=_yaml_get("persona", "gender", default="")).strip()
PERSONA_TONE = os.getenv("PERSONA_TONE", default=_yaml_get("persona", "tone", default="casual")).strip().lower() or "casual"
PERSONA_VERBOSITY = os.getenv("PERSONA_VERBOSITY", default=_yaml_get("persona", "verbosity", default="balanced")).strip().lower() or "balanced"
PERSONA_HUMOR = os.getenv("PERSONA_HUMOR", default=_yaml_get("persona", "humor", default="light")).strip().lower() or "light"
PERSONA_LANGUAGE = os.getenv("PERSONA_LANGUAGE", default=_yaml_get("persona", "language", default="id")).strip().lower() or "id"
PERSONA_MOOD = os.getenv("PERSONA_MOOD", default=_yaml_get("persona", "mood", default="cheerful")).strip().lower() or "cheerful"
PERSONA_CATCHPHRASES = os.getenv("PERSONA_CATCHPHRASES", default=_yaml_get("persona", "catchphrases", default="")).strip()
# ─── Character & Skills (YAML, bisa di-override dari .env) ─────────────────────
AGENT_CHARACTER = os.getenv("AGENT_CHARACTER", default=_yaml_get("agent", "character", default="")).strip().lower()
AGENT_SKILLS = os.getenv("AGENT_SKILLS", default="").strip().lower()
# ─── XMPP (non-credential dari YAML, credential dari .env) ─────────────────────
XMPP_ENABLED = os.getenv("XMPP_ENABLED", default=str(_yaml_get("xmpp", "enabled", default="false"))).strip().lower() in ("true", "1", "yes")
XMPP_MUC_ROOMS = os.getenv("XMPP_MUC_ROOMS", default=_yaml_get("xmpp", "muc_rooms", default="")).strip()
XMPP_NICKNAME = os.getenv("XMPP_NICKNAME", default=_yaml_get("xmpp", "nickname", default="")).strip()
XMPP_SELECTIVE_RESPONSE = os.getenv("XMPP_SELECTIVE_RESPONSE", default=str(_yaml_get("xmpp", "selective_response", default="true"))).strip().lower() in ("true", "1", "yes")
# ─── Telegram (non-credential dari YAML, credential dari .env) ─────────────────
TELEGRAM_ENABLED = os.getenv("TELEGRAM_ENABLED", default=str(_yaml_get("telegram", "enabled", default="false"))).strip().lower() in ("true", "1", "yes")
TELEGRAM_TOKEN = os.getenv("TELEGRAM_TOKEN", default="")
TELEGRAM_ALLOWED_GROUP_IDS = os.getenv("TELEGRAM_ALLOWED_GROUP_IDS", default=_yaml_get("telegram", "allowed_group_ids", default="")).strip()
TELEGRAM_SELECTIVE_RESPONSE = os.getenv("TELEGRAM_SELECTIVE_RESPONSE", default=str(_yaml_get("telegram", "selective_response", default="true"))).strip().lower() in ("true", "1", "yes")
# ─── Session (TinyDB) ────────────────────────────────────────────────────────────
SESSION_DB_PATH = os.path.expanduser(
os.getenv("SESSION_DB_PATH", default=_yaml_get("session", "db_path", default="~/.config/hendrik/sessions.json"))
)
# ─── RAG (YAML) ─────────────────────────────────────────────────────────────────
RAG_PERSIST_DIR = os.getenv("RAG_PERSIST_DIR", default=_yaml_get("rag", "persist_dir", default="chroma_db"))
# ─── Humanize Delay (YAML) ─────────────────────────────────────────────────────
READ_DELAY_MIN = float(os.getenv("READ_DELAY_MIN", default=_yaml_get("delay", "read_min", default="1.0")))
READ_DELAY_MAX = float(os.getenv("READ_DELAY_MAX", default=_yaml_get("delay", "read_max", default="2.0")))
TYPING_SPEED = float(os.getenv("TYPING_SPEED", default=_yaml_get("delay", "typing_speed", default="15.0")))
TYPING_MAX = float(os.getenv("TYPING_MAX", default=_yaml_get("delay", "typing_max", default="10.0")))
# ─── Character Preset Override ──────────────────────────────────────────────────
# Jika AGENT_CHARACTER di-set, baca character.md dari agent/characters/<preset>/
# dan override nilai persona yang relevan.
ENV_CHARACTERS_DIR = Path(__file__).resolve().parent / "agent" / "characters"
ENV_CHARACTER_CONFIG_PATH = ENV_CHARACTERS_DIR / AGENT_CHARACTER / "character.md" if AGENT_CHARACTER else None
# Coba persona.yaml dulu (prioritas utama), kalau tidak ada fallback ke character.md
_persona_yaml_path = ENV_CHARACTERS_DIR / AGENT_CHARACTER / "persona.yaml" if AGENT_CHARACTER else None
if _persona_yaml_path and _persona_yaml_path.is_file():
# Prioritas utama: baca persona.yaml dari character directory
try:
_character_env = yaml.safe_load(_persona_yaml_path.read_text(encoding="utf-8")) or {}
if not isinstance(_character_env, dict):
_character_env = {}
except Exception as _e:
print(f"[config] Warning: gagal load persona.yaml untuk '{AGENT_CHARACTER}': {_e}")
_character_env = {}
elif ENV_CHARACTER_CONFIG_PATH and ENV_CHARACTER_CONFIG_PATH.is_file():
# Fallback: baca character.md (format lama)
_character_env = {}
for line in ENV_CHARACTER_CONFIG_PATH.read_text(encoding="utf-8").splitlines():
line = line.strip()
if not line or line.startswith("#"):
continue
if "=" in line:
key, value = line.split("=", 1)
_character_env[key.strip()] = value.strip()
else:
_character_env = None
if _character_env:
_character_overrides = {
"AGENT_SKILL": AGENT_SKILL,
"PERSONA_NAME": PERSONA_NAME,
"PERSONA_AGE": PERSONA_AGE,
"PERSONA_GENDER": PERSONA_GENDER,
"PERSONA_TONE": PERSONA_TONE,
"PERSONA_VERBOSITY": PERSONA_VERBOSITY,
"PERSONA_HUMOR": PERSONA_HUMOR,
"PERSONA_LANGUAGE": PERSONA_LANGUAGE,
"PERSONA_MOOD": PERSONA_MOOD,
"PERSONA_CATCHPHRASES": PERSONA_CATCHPHRASES,
}
# Mapping dari key persona.yaml ke key config
_yaml_to_config_key = {
"AGENT_SKILL": "skill",
"PERSONA_NAME": "name",
"PERSONA_AGE": "age",
"PERSONA_GENDER": "gender",
"PERSONA_TONE": "tone",
"PERSONA_VERBOSITY": "verbosity",
"PERSONA_HUMOR": "humor",
"PERSONA_LANGUAGE": "language",
"PERSONA_MOOD": "mood",
"PERSONA_CATCHPHRASES": "catchphrases",
}
for key, fallback in _character_overrides.items():
yaml_key = _yaml_to_config_key.get(key, key.lower())
raw = _character_env.get(yaml_key, _character_env.get(key, fallback))
value = str(raw).strip() if raw is not None else ""
if key in {"AGENT_SKILL", "PERSONA_TONE", "PERSONA_VERBOSITY", "PERSONA_HUMOR", "PERSONA_LANGUAGE", "PERSONA_MOOD"}:
value = value.lower() or fallback
if key == "PERSONA_NAME" and not value:
value = fallback
_character_overrides[key] = value
for key, value in _character_overrides.items():
globals()[key] = value
os.environ[key] = value

59
default-config.yaml Normal file
View File

@ -0,0 +1,59 @@
# Copy and edit to `config.yaml`
agent:
character: hendrik # Directory name in agent/characters/<character>/
max_iterations: 40 # step
max_tool_output: 40000
llm:
timeout: 3000 # second
providers:
- name : "Ollama Local"
base_url : "http://localhost:11434/v1"
api_key : "ollama"
models :
- name : "granite4.1:8b"
- name : "Transformers API Local"
base_url : "http://localhost:12345/v1"
api_key : "sk-not-needed"
models :
- name : "granite4.1:8b"
- name : "Ollama Cloud"
base_url : "https://ollama.com/v1"
api_key : ""
models :
- name : "ministral-3:14b-cloud"
- name : "gemma4:31b-cloud"
default : true
- name : "OpenRouter"
base_url : "https://openrouter.ai/api/v1"
api_key : ""
models :
- name : "openrouter/owl-alpha"
- name : "nex-agi/nex-n2-pro:free"
- name : "z-ai/glm-5"
rag:
persist_dir: chroma_db # ChromaDB ONNX default (all-MiniLM-L6-v2, local)
session:
db_path: "~/.config/hendrik/sessions.json"
xmpp:
enabled: false
muc_rooms: "" # comma-separated, e.g. "room1@conference.server,room2@conference.server"
nickname: "" # custom MUC nickname (empty = use username)
selective_response: true # true = only response if mentioned/relevant
telegram:
enabled: false
allowed_group_ids: "" # comma-separated, empty = all group
selective_response: true # true = only response if mentioned/relevant
# Humanize Delay (anti-bot detection)
delay:
read_min: 1.0 # second
read_max: 2.0 # second
typing_speed: 15.0 # characters per second
typing_max: 10.0 # max typing delay limit per second

4
hendrik Normal file → Executable file
View File

@ -1,8 +1,8 @@
#!/bin/bash
# hendrik — wrapper to run the TUI agent from anywhere
# wrapper to run the TUI agent from anywhere
# Set HENDRIK_DIR env var to override, or update the default below
DEFAULT_DIR="/home/ambadar-aji/experiment/hendrik"
DEFAULT_DIR="/opt/hendrik"
PROJECT_DIR="${HENDRIK_DIR:-$DEFAULT_DIR}"
if [ ! -d "$PROJECT_DIR/.venv" ]; then

View File

@ -1,27 +1,36 @@
import os, sys
import os, sys, threading, time
import signal
import config
from scripts.llm_client import LLMClient
from tools import coder, rag
from scripts import gadget
from services.xmpp_client import XMPPClient
from scripts.llm_client import LLMClient
from tools import coder, rag, carrack
from scripts import gadget
from scripts.persona import build_system_prompt
# Daftar tools yang tersedia
tools_definition = [
gadget.tools_mapping( schema = coder.schema_read_file, handler = coder.read_file ),
gadget.tools_mapping( schema = coder.schema_write_file, handler = coder.write_file ),
gadget.tools_mapping( schema = coder.schema_edit_file, handler = coder.edit_file ),
gadget.tools_mapping( schema = coder.schema_run_bash, handler = coder.run_bash ),
gadget.tools_mapping( schema = coder.schema_search_code, handler = coder.search_code ),
gadget.tools_mapping( schema = coder.schema_git_operation, handler = coder.git_operation ),
gadget.tools_mapping( schema = rag.schema_store_knowledge, handler = rag.store_knowledge ),
gadget.tools_mapping( schema = rag.schema_search_knowledge, handler = rag.search_knowledge ),
gadget.tools_mapping( schema = coder.schema_read_file, handler = coder.read_file ),
gadget.tools_mapping( schema = coder.schema_write_file, handler = coder.write_file ),
gadget.tools_mapping( schema = coder.schema_edit_file, handler = coder.edit_file ),
gadget.tools_mapping( schema = coder.schema_run_bash, handler = coder.run_bash ),
gadget.tools_mapping( schema = coder.schema_search_code, handler = coder.search_code ),
gadget.tools_mapping( schema = coder.schema_git_operation, handler = coder.git_operation ),
gadget.tools_mapping( schema = rag.schema_ingest_files, handler = rag.ingest_files ),
gadget.tools_mapping( schema = rag.schema_store_knowledge, handler = rag.store_knowledge ),
gadget.tools_mapping( schema = rag.schema_search_knowledge, handler = rag.search_knowledge ),
gadget.tools_mapping( schema = rag.schema_create_collection, handler = rag.create_collection ),
gadget.tools_mapping( schema = rag.schema_delete_collection, handler = rag.delete_collection ),
gadget.tools_mapping( schema = rag.schema_list_collections, handler = rag.list_collections ),
gadget.tools_mapping( schema = rag.schema_inspect_collection, handler = rag.inspect_collection ),
gadget.tools_mapping( schema = carrack.schema_sendhttprequest, handler = carrack.sendhttprequest ),
]
# Ekstrak dari tools_definition ke dua format berbeda
TOOLS = gadget.tool_schemas (tools_definition)
TOOL_HANDLERS = gadget.tool_handlers (tools_definition)
@ -29,9 +38,8 @@ TOOL_HANDLERS = gadget.tool_handlers (tools_definition)
def main():
llm_client = LLMClient(config.llm_baseurl, config.llm_model, config.llm_api_key, config.llm_timeout)
# Parsing arguments `-w <dir>` atau `--workspace <dir>`
workspace = None
i = 1
workspace = None
i = 1
while i < len(sys.argv):
if sys.argv[i] in ('-w', '--workspace') and i + 1 < len(sys.argv):
workspace = sys.argv[i + 1]
@ -39,7 +47,6 @@ def main():
else:
i += 1
# Apply workspace jika ada
if workspace:
resolved = os.path.abspath(workspace)
if not os.path.isdir(resolved):
@ -47,25 +54,69 @@ def main():
sys.exit(1)
os.chdir(resolved)
if config.XMPP_ENABLED:
from services.xmpp_client import XMPPClient
services = []
if config.XMPP_ENABLED:
muc_rooms = []
if config.XMPP_MUC_ROOMS.strip():
muc_rooms = [r.strip() for r in config.XMPP_MUC_ROOMS.split(',') if r.strip()]
client = XMPPClient(
jid = config.XMPP_USERNAME,
password = config.XMPP_PASSWORD,
llm_client = llm_client,
tools_definition = tools_definition,
TOOLS = TOOLS,
TOOL_HANDLERS = TOOL_HANDLERS,
build_system_prompt = gadget.build_system_prompt,
agent_max_iterations= config.AGENT_MAX_ITERATIONS,
muc_rooms = muc_rooms,
jid = config.XMPP_USERNAME,
password = config.XMPP_PASSWORD,
llm_client = llm_client,
tools_definition = tools_definition,
TOOLS = TOOLS,
TOOL_HANDLERS = TOOL_HANDLERS,
build_system_prompt = build_system_prompt,
agent_max_iterations = config.AGENT_MAX_ITERATIONS,
muc_rooms = muc_rooms,
)
client.start() # blocking, headless
services.append(client)
if config.TELEGRAM_ENABLED:
from services.telegram_client import TelegramClient
allowed_ids = []
if config.TELEGRAM_ALLOWED_GROUP_IDS.strip():
allowed_ids = [r.strip() for r in config.TELEGRAM_ALLOWED_GROUP_IDS.split(',') if r.strip()]
tg = TelegramClient(
token = config.TELEGRAM_TOKEN,
llm_client = llm_client,
tools_definition = tools_definition,
TOOLS = TOOLS,
TOOL_HANDLERS = TOOL_HANDLERS,
build_system_prompt = build_system_prompt,
agent_max_iterations = config.AGENT_MAX_ITERATIONS,
allowed_group_ids = allowed_ids,
)
services.append(tg)
if services:
threads = []
for svc in services:
t = threading.Thread(target=svc.start, daemon=True)
t.start()
threads.append(t)
_shutdown = False
def _handle_sig(signum, frame):
nonlocal _shutdown
print("\nShutting down...")
for svc in services:
svc.stop()
_shutdown = True
signal.signal(signal.SIGTERM, _handle_sig)
signal.signal(signal.SIGINT, _handle_sig)
try:
while not _shutdown:
time.sleep(1)
except KeyboardInterrupt:
_shutdown = True
print("Exiting.")
else:
from tui import HendrikTUI
HendrikTUI(
@ -73,7 +124,7 @@ def main():
tools_definition = tools_definition,
TOOLS = TOOLS,
TOOL_HANDLERS = TOOL_HANDLERS,
build_system_prompt = gadget.build_system_prompt,
build_system_prompt = build_system_prompt,
agent_max_iterations = config.AGENT_MAX_ITERATIONS,
).run()

View File

@ -1,3 +1,7 @@
python-dotenv>=1.0.0
PyYAML>=6.0
chromadb>=0.5.0
openpyxl>=3.1.0
slixmpp
python-telegram-bot>=20.0
tinydb>=4.8.0

View File

@ -1,49 +1,12 @@
import os
from .persona import build_system_prompt
def tools_mapping(schema, handler, name=None):
tool_name = name or schema["function"]["name"]
return {"name": tool_name, "schema": schema, "handler": handler}
def tool_schemas(tools_definition):
return [t["schema"] for t in tools_definition]
def tool_handlers(tools_definition):
return {t["name"]: t["handler"] for t in tools_definition}
def build_system_prompt(tools_definition):
lines = [
"You are a coding agent that assists with software engineering tasks. "
"You have access to the following tools:",
""
]
for i, tool in enumerate(tools_definition, 1):
name = tool["name"]
desc = tool["schema"]["function"]["description"]
lines.append(f"{i}. {name}: {desc}")
lines.extend([
"",
"Use tools by returning tool calls when needed. After receiving tool "
"results, continue your reasoning. When you have the final answer, "
"return it as plain text without tool calls.",
"",
f"Your workspace directory is: {os.getcwd()}. "
"All file operations are relative to this directory.",
"",
"RAG capabilities (knowledge retrieval):",
"- list_collections → see available collections & doc counts.",
"- create_collection → create a new collection for a new topic.",
"- delete_collection → permanently remove a collection and its data.",
"- inspect_collection → learn metadata fields before searching.",
"- search_knowledge → semantic search + optional metadata filter.",
"- store_knowledge → save docs with rich metadata for later use.",
"",
"You can create collections yourself! When you encounter a new topic,",
"use create_collection first, then store_knowledge to populate it.",
"Always inspect_collection to discover metadata keys before filtering."
])
return "\n".join(lines)

View File

@ -1,12 +1,51 @@
import json
import re
import urllib.request
import urllib.error
def _strip_thinking(text: str) -> str:
"""
Hapus semua bentuk thinking/reasoning dari response text.
Handles:
- <think>...</think> blocks (any case)
- <reasoning>...</reasoning> blocks
- "Thinking:" / "Reasoning:" inline prefixes
"""
if not text:
return text
# Strip XML-style thinking blocks (case-insensitive, DOTALL for multiline)
text = re.sub(r'<think[^>]*>.*?</think>', '', text, flags=re.DOTALL | re.IGNORECASE)
text = re.sub(r'<reasoning[^>]*>.*?</reasoning>', '', text, flags=re.DOTALL | re.IGNORECASE)
# Strip lines starting with Thinking: / Reasoning: / Let me think...
lines = text.splitlines()
cleaned = []
skip_block = False
for line in lines:
stripped = line.strip().lower()
if stripped.startswith(('thinking:', 'reasoning:', 'let me thought', 'let me think')):
skip_block = True
continue
if skip_block and not stripped:
skip_block = False
continue
if not skip_block:
cleaned.append(line)
result = '\n'.join(cleaned).strip()
return result
class LLMClient:
class Message:
def __init__(self, msg):
self.content = msg.get('content', '')
raw_content = msg.get('content', '')
# Auto-strip thinking dari content
self.content = _strip_thinking(raw_content) if isinstance(raw_content, str) else raw_content
self.tool_calls = msg.get('tool_calls', None)
self.warning = None
def __init__(self, base_url, model, api_key, timeout=600):
self.base_url = base_url.rstrip('/')
@ -24,6 +63,10 @@ class LLMClient:
payload["tools"] = tools
payload["tool_choice"] = "auto"
# Disable reasoning/thinking di level API bila didukung
# OpenRouter & beberapa provider support ini
payload["reasoning"] = {"enabled": False}
data = json.dumps(payload).encode('utf-8')
req = urllib.request.Request(url, data=data, method='POST')
req.add_header('Content-Type', 'application/json')
@ -31,10 +74,69 @@ class LLMClient:
try:
with urllib.request.urlopen(req, timeout=self.timeout) as resp:
response = json.loads(resp.read().decode('utf-8'))
message = response['choices'][0]['message']
return self.Message(message)
raw = resp.read().decode('utf-8')
response = json.loads(raw)
except urllib.error.HTTPError as e:
return self.Message({'content': f"HTTP Error: {e.code} {e.reason}", 'tool_calls': None})
body_text = ""
try:
body_text = e.read().decode('utf-8', errors='replace')
except Exception:
pass
if tools and e.code == 404:
try:
body = json.loads(body_text) if body_text else {}
if 'tool use' in body.get('error', {}).get('message', '').lower():
result = self.chat(messages, tools=None)
result.warning = "Tool calling not supported by this model. Running in chat-only mode."
return result
except Exception:
pass
detail = f" - {body_text[:500]}" if body_text else ""
return self.Message({'content': f"HTTP Error: {e.code} {e.reason}{detail}", 'tool_calls': None})
except Exception as e:
return self.Message({'content': f"Error: {str(e)}", 'tool_calls': None})
if 'choices' not in response:
raw_preview = json.dumps(response)[:500]
return self.Message({
'content': (
f"Error: Unexpected response — 'choices' key missing.\n"
f" URL : {url}\n"
f" Model : {self.model}\n"
f" Response: {raw_preview}"
),
'tool_calls': None
})
if not response['choices']:
raw_preview = json.dumps(response)[:500]
return self.Message({
'content': (
f"Error: 'choices' is empty in the response.\n"
f" URL : {url}\n"
f" Model : {self.model}\n"
f" Response: {raw_preview}"
),
'tool_calls': None
})
if 'message' not in response['choices'][0]:
raw_preview = json.dumps(response['choices'][0])[:500]
return self.Message({
'content': (
f"Error: 'message' key missing in first choice.\n"
f" URL : {url}\n"
f" Model : {self.model}\n"
f" Choice : {raw_preview}"
),
'tool_calls': None
})
message = response['choices'][0]['message']
# Handle reasoning_content field dari OpenRouter/models yang support thinking
# Pindahkan ke content jangan sampai keluar
reasoning_content = message.pop('reasoning_content', None)
reasoning_field = message.pop('reasoning', None)
# Jangan inject reasoning ke content — buang saja
# (kita sudah strip via _strip_thinking di Message.__init__)
return self.Message(message)

344
scripts/persona.py Normal file
View File

@ -0,0 +1,344 @@
"""
Persona & System Prompt Builder
Arsitektur:
1. Base System Prompt instruksi inti (tools, RAG, response format)
2. Env Character persona (identity, communication style, description)
policies (git policy, safety rules)
3. Skills role-specific instructions (programmer, roleplayer, analyst)
Load order: Base Character Policies Skills
"""
import os
import re
from dataclasses import dataclass, field
from pathlib import Path
import yaml
# ─── Paths ────────────────────────────────────────────────────────────────────
BASE_DIR = Path(__file__).resolve().parent.parent / "agent"
BASE_PROMPT_PATH = BASE_DIR / "base-system-prompt.md"
ENV_CHARACTERS_DIR = BASE_DIR / "characters"
SKILLS_DIR = BASE_DIR / "skills"
# ─── Mode / Skill ──────────────────────────────────────────────────────────────
SKILL = os.getenv("AGENT_SKILL", default="programmer").strip().lower()
# ─── Personality Configuration ────────────────────────────────────────────────
@dataclass
class PersonalityConfig:
"""Konfigurasi personality AI yang berlaku lintas mode."""
name: str = "OWL"
age: str = ""
gender: str = ""
tone: str = "casual"
verbosity: str = "balanced"
humor_level: str = "light"
language: str = "id"
mood: str = "cheerful"
catchphrases: list = field(default_factory=list)
def _load_personality_from_env() -> PersonalityConfig:
"""Baca personality config dari environment variables."""
raw_catchphrases = os.getenv("PERSONA_CATCHPHRASES", default="").strip()
catchphrases = [c.strip() for c in raw_catchphrases.split(",") if c.strip()] if raw_catchphrases else []
return PersonalityConfig(
name=os.getenv("PERSONA_NAME", default="OWL").strip() or "OWL",
age=os.getenv("PERSONA_AGE", default="").strip(),
gender=os.getenv("PERSONA_GENDER", default="").strip(),
tone=os.getenv("PERSONA_TONE", default="casual").strip().lower() or "casual",
verbosity=os.getenv("PERSONA_VERBOSITY", default="balanced").strip().lower() or "balanced",
humor_level=os.getenv("PERSONA_HUMOR", default="light").strip().lower() or "light",
language=os.getenv("PERSONA_LANGUAGE", default="id").strip().lower() or "id",
mood=os.getenv("PERSONA_MOOD", default="cheerful").strip().lower() or "cheerful",
catchphrases=catchphrases,
)
PERSONALITY = _load_personality_from_env()
# ─── Markdown Parser ───────────────────────────────────────────────────────────
def _parse_simple_kv(filepath: Path) -> dict:
"""
Parse file markdown dengan format sederhana:
# Section
- **Key:** Value
- Key: Value
Returns dict { key: value }.
"""
result = {}
if not filepath.exists():
return result
for line in filepath.read_text(encoding="utf-8").splitlines():
line = line.strip()
if not line or line.startswith("#"):
continue
# Format: "- **Key:** Value" atau "- Key: Value"
cleaned = re.sub(r'^-\s*', '', line)
cleaned = re.sub(r'\*\*', '', cleaned)
if ':' in cleaned:
key, value = cleaned.split(':', 1)
result[key.strip().lower()] = value.strip()
return result
def _read_markdown_section(filepath: Path) -> str:
"""Baca seluruh isi file markdown, stripping frontmatter jika ada."""
if not filepath.exists():
return ""
content = filepath.read_text(encoding="utf-8")
# Strip leading --- frontmatter blocks
content = re.sub(r'^---\s*\n.*?\n---\s*\n', '', content, flags=re.DOTALL)
# Strip leading # title if present (first line only)
lines = content.strip().splitlines()
if lines and lines[0].startswith('# '):
lines = lines[1:]
return '\n'.join(lines).strip()
# ─── Prompt Builders ───────────────────────────────────────────────────────────
def _build_personality_block(cfg: PersonalityConfig) -> str:
"""Generate deskripsi personality dari config."""
parts = [f"You are {cfg.name}."]
if cfg.age:
parts.append(f"Your persona age is {cfg.age} years old.")
if cfg.gender:
parts.append(f"Your persona gender is {cfg.gender}.")
tone_map = {
"casual": "You speak in a casual, relaxed manner — like chatting with a friend.",
"formal": "You speak formally and professionally, using polite language.",
"playful": "You are playful and cheerful, making conversations fun and lighthearted.",
"warm": "You are warm and friendly, making people feel comfortable and welcomed.",
"sweet": "You speak in a sweet, gentle, and caring manner — soft and endearing.",
}
parts.append(tone_map.get(cfg.tone, tone_map["casual"]))
verbosity_map = {
"concise": "Keep your answers short and to the point.",
"balanced": "Provide balanced answers — not too brief, not too long.",
"detailed": "Give thorough, detailed answers with explanations.",
}
parts.append(verbosity_map.get(cfg.verbosity, verbosity_map["balanced"]))
humor_map = {
"none": "Stay serious and avoid jokes.",
"light": "Occasionally sprinkle in light humor when appropriate.",
"witty": "Be witty and humorous — jokes, puns, and playful banter are welcome.",
}
parts.append(humor_map.get(cfg.humor_level, humor_map["light"]))
if cfg.language == "id":
parts.append("Always respond in Indonesian (Bahasa Indonesia).")
elif cfg.language == "en":
parts.append("Always respond in English.")
else:
parts.append("Respond in the same language the user uses.")
mood_map = {
"cheerful": "Your overall mood is cheerful and positive.",
"calm": "Your overall mood is calm and soothing.",
"energetic": "Your overall mood is energetic and enthusiastic.",
"sarcastic": "Your overall mood is sarcastic and dry-humored.",
}
parts.append(mood_map.get(cfg.mood, mood_map["cheerful"]))
if cfg.catchphrases:
phrases = ", ".join(f'"{p}"' for p in cfg.catchphrases)
parts.append(f"You sometimes use these catchphrases: {phrases}.")
return "\n".join(parts)
def _build_tools_block(tools_definition: list[dict]) -> str:
"""Generate daftar tools dari tools_definition."""
lines = [
"You have access to the following tools:",
"",
]
for i, tool in enumerate(tools_definition, 1):
name = tool["name"]
desc = tool["schema"]["function"]["description"]
lines.append(f"{i}. {name}: {desc}")
lines.append("")
lines.append(
"Use tools by returning tool calls when needed. After receiving tool "
"results, continue your reasoning. When you have the final answer, "
"return it as plain text without tool calls."
)
return "\n".join(lines)
def _load_env_character(character_name: str) -> tuple[str, str]:
"""
Load character description dan policies dari directory env-characters.
Returns:
(character_block, policies_text)
"""
char_dir = ENV_CHARACTERS_DIR / character_name
if not char_dir.is_dir():
return "", ""
# Read character.md — derive personality from config (already loaded)
character_block = "" # personality block tetap dari PersonalityConfig
# Read policies.md
policies_text = _read_markdown_section(char_dir / "policies.md")
return character_block, policies_text
def _load_skills(skill_names: list[str]) -> str:
"""
Load dan gabungkan skill instructions.
Args:
skill_names: List nama skill aktif, e.g. ["programmer"]
Returns:
Gabungan skill instructions sebagai string.
"""
sections = []
for skill_name in skill_names:
skill_path = SKILLS_DIR / skill_name / "instructions.md"
content = _read_markdown_section(skill_path)
if content:
sections.append(content)
return "\n\n".join(sections)
# ─── Public API ────────────────────────────────────────────────────────────────
def build_system_prompt(
tools_definition: list[dict] | None = None,
skill: str | None = None,
personality: PersonalityConfig | None = None,
character: str | None = None,
skills: list[str] | None = None,
) -> str:
"""
Build system prompt berdasarkan skill, character, dan skills.
Load order:
1. Base prompt
2. Personality block (dari config / persona.yaml character)
3. Policies (dari agent/characters/<name>/policies.md)
4. Skill instructions (dari skills/<name>/instructions.md)
Args:
tools_definition: Daftar tools (required untuk skill programmer).
skill: "programmer" atau "roleplayer". Default: dari env AGENT_SKILL.
personality: PersonalityConfig instance. Default: global PERSONALITY.
character: Nama env character. Default: dari env AGENT_CHARACTER.
skills: List nama skill aktif. Default: derives dari env AGENT_SKILLS.
Returns:
String system prompt lengkap.
"""
selected_skill = (skill or "").strip().lower()
cfg = personality or PERSONALITY
# Resolve character name
character_name = (character or os.getenv("AGENT_CHARACTER", default="")).strip().lower()
# ── Load persona.yaml dari character directory ─────────────────────────────────
if character_name:
char_dir = ENV_CHARACTERS_DIR / character_name
persona_yaml_path = char_dir / "persona.yaml"
if persona_yaml_path.is_file():
try:
with open(persona_yaml_path, "r", encoding="utf-8") as f:
_persona_data = yaml.safe_load(f) or {}
if isinstance(_persona_data, dict):
if _persona_data.get("name"):
cfg.name = _persona_data["name"]
if _persona_data.get("age"):
cfg.age = str(_persona_data["age"])
if _persona_data.get("gender"):
cfg.gender = _persona_data["gender"]
if _persona_data.get("tone"):
cfg.tone = _persona_data["tone"]
if _persona_data.get("verbosity"):
cfg.verbosity = _persona_data["verbosity"]
if _persona_data.get("humor"):
cfg.humor_level = _persona_data["humor"]
if _persona_data.get("language"):
cfg.language = _persona_data["language"]
if _persona_data.get("mood"):
cfg.mood = _persona_data["mood"]
# Skill wajib dari persona.yaml
_skill_from_yaml = _persona_data.get("skill") or _persona_data.get("mode")
if _skill_from_yaml:
selected_skill = _skill_from_yaml.strip().lower()
else:
selected_skill = ""
except Exception as e:
print(f"[persona] Warning: gagal load persona.yaml untuk '{character_name}': {e}")
# Resolve skills list
# Priority: explicit skills param > persona.yaml skill > AGENT_SKILL env > AGENT_SKILLS env > selected_skill
if skills is not None:
skills_list = skills
elif selected_skill != SKILL:
# persona.yaml meng-override skill → pakai skill dari persona.yaml
skills_list = [selected_skill] if selected_skill in ("programmer", "roleplayer", "analyst") else []
else:
skills_env = os.getenv("AGENT_SKILLS", default="").strip()
if skills_env:
skills_list = [s.strip() for s in skills_env.split(",") if s.strip()]
else:
skills_list = [selected_skill] if selected_skill in ("programmer", "roleplayer", "analyst") else []
# ── 1. Base prompt ──────────────────────────────────────────────────────────
base_prompt = ""
if BASE_PROMPT_PATH.exists():
base_prompt = _read_markdown_section(BASE_PROMPT_PATH)
# ── 2. Personality block ────────────────────────────────────────────────────
personality_block = _build_personality_block(cfg)
# ── 3. Tools block (hanya untuk skill yang butuh tools) ─────────────────────
needs_tools = any(s in ("programmer", "analyst") for s in skills_list)
tools_block = ""
if needs_tools and tools_definition is not None:
tools_block = _build_tools_block(tools_definition)
# ── 4. Policies ──────────────────────────────────────────────────────────────
policies_block = ""
if character_name:
_, policies_block = _load_env_character(character_name)
# ── 5. Skills ────────────────────────────────────────────────────────────────
skills_block = _load_skills(skills_list)
# ── Assemble ─────────────────────────────────────────────────────────────────
sections = [
base_prompt,
personality_block,
tools_block,
policies_block,
skills_block,
]
# Filter empty sections dan gabungkan
return "\n\n".join(s for s in sections if s.strip())

68
services/agent_loop.py Normal file
View File

@ -0,0 +1,68 @@
import json
from datetime import datetime
def _ts():
return datetime.now().strftime('%H:%M:%S')
def execute_tool(tool_call, TOOL_HANDLERS):
tname = tool_call['function']['name']
targs = json.loads(tool_call['function']['arguments'])
handler = TOOL_HANDLERS.get(tname)
if not handler:
return f'Tool {tname} not found'
try:
if tname == 'search_code':
return handler(
pattern=targs['pattern'],
search_type=targs['search_type'],
path=targs.get('path', '.'),
)
elif tname == 'git_operation':
return handler(args=targs['args'])
else:
return handler(**targs)
except Exception as e:
return f'Error executing tool: {str(e)}'
def run_agent_loop(session, llm_client, TOOLS, TOOL_HANDLERS, max_iterations, on_tool_calls=None):
for step in range(max_iterations):
print(f'[{_ts()}] Step {step + 1} — calling LLM...')
response = llm_client.chat(session.messages, tools=TOOLS)
if response.tool_calls:
amsg = {
'role': 'assistant',
'content': response.content,
'tool_calls': response.tool_calls,
}
session.messages.append(amsg)
tnames = [tc['function']['name'] for tc in response.tool_calls]
print(f'[{_ts()}] Using tools: {", ".join(tnames)}')
if on_tool_calls:
on_tool_calls(tnames)
for tc in response.tool_calls:
result = execute_tool(tc, TOOL_HANDLERS)
session.messages.append({
'role': 'tool',
'tool_call_id': tc['id'],
'content': str(result),
})
else:
if response.content:
print(f'[{_ts()}] Response generated ({len(response.content)} chars)')
session.messages.append({'role': 'assistant', 'content': response.content})
return response.content
return None
print(f'[{_ts()}] Max iterations ({max_iterations}) reached')
session.messages.append({
'role': 'assistant',
'content': 'Max iterations reached without final answer.',
})
return None

View File

@ -0,0 +1,133 @@
from __future__ import annotations
import os
import re
import uuid
from datetime import datetime, timezone
from dataclasses import dataclass, field
from typing import Optional
from tinydb import TinyDB, Query
import config
DB_DIR = os.path.dirname(config.SESSION_DB_PATH)
os.makedirs(DB_DIR, exist_ok=True)
@dataclass
class NeoSession:
session_id: str
name: str
created_at: str
updated_at: str
model_info: dict
messages: list = field(default_factory=list)
doc_id: Optional[int] = None
class NeoSessionManager:
def __init__(self, db_path: str = config.SESSION_DB_PATH):
self._db = TinyDB(db_path)
self._table = self._db.table("sessions")
def create(self, name: str, model_info: dict) -> NeoSession:
now = datetime.now(timezone.utc).isoformat()
doc = {
"session_id": str(uuid.uuid4()),
"name": name,
"created_at": now,
"updated_at": now,
"model_info": model_info,
"messages": [],
}
doc_id = self._table.insert(doc)
return self._doc_to_session(self._table.get(doc_id=doc_id))
def list(self) -> list[dict]:
results = []
for doc in self._table.all():
results.append({
"doc_id": doc.doc_id,
"session_id": doc["session_id"],
"name": doc["name"],
"created_at": doc["created_at"],
"updated_at": doc["updated_at"],
"model_info": doc["model_info"],
"message_count": len(doc["messages"]),
})
return sorted(results, key=lambda x: x["updated_at"], reverse=True)
def get(self, doc_id: int) -> Optional[NeoSession]:
doc = self._table.get(doc_id=doc_id)
if doc is None:
return None
return self._doc_to_session(doc)
def rename(self, doc_id: int, new_name: str) -> bool:
if not new_name.strip():
return False
return self._table.update({"name": new_name.strip()}, doc_ids=[doc_id])
def delete(self, doc_id: int) -> bool:
return len(self._table.remove(doc_ids=[doc_id])) > 0
def search(self, query: str) -> list[dict]:
q = Query()
results = []
for doc in self._table.search(q.name.search(query, flags=re.IGNORECASE)):
results.append({
"doc_id": doc.doc_id,
"session_id": doc["session_id"],
"name": doc["name"],
"created_at": doc["created_at"],
"updated_at": doc["updated_at"],
"model_info": doc["model_info"],
"message_count": len(doc["messages"]),
})
return sorted(results, key=lambda x: x["updated_at"], reverse=True)
def search_messages(self, query: str) -> list[dict]:
q = Query()
results = []
for doc in self._table.all():
for msg in doc["messages"]:
content = msg.get("content", "")
if query.lower() in content.lower():
results.append({
"doc_id": doc.doc_id,
"session_name": doc["name"],
"role": msg.get("role"),
"content": content,
"session_id": doc["session_id"],
})
break
return results
def add_message(self, doc_id: int, role: str, content: str, **kwargs) -> bool:
doc = self._table.get(doc_id=doc_id)
if doc is None:
return False
msg = {"role": role, "content": content}
msg.update(kwargs)
messages = doc["messages"]
messages.append(msg)
now = datetime.now(timezone.utc).isoformat()
return self._table.update(
{"messages": messages, "updated_at": now},
doc_ids=[doc_id],
)
def update_model_info(self, doc_id: int, model_info: dict) -> bool:
return self._table.update({"model_info": model_info}, doc_ids=[doc_id])
@staticmethod
def _doc_to_session(doc) -> NeoSession:
return NeoSession(
doc_id=doc.doc_id,
session_id=doc["session_id"],
name=doc["name"],
created_at=doc["created_at"],
updated_at=doc["updated_at"],
model_info=doc["model_info"],
messages=doc.get("messages", []),
)

275
services/telegram_client.py Normal file
View File

@ -0,0 +1,275 @@
import asyncio
import random
import re
import threading
import time
from datetime import datetime
import config
from services.session_manager import SessionManager
from services.agent_loop import run_agent_loop
from scripts.persona import PERSONALITY
from tools.roleplayer import _name_mentioned
def _ts():
return datetime.now().strftime('%H:%M:%S')
def _strip_bot_mention(text: str, bot_username: str) -> str:
if not bot_username:
return text
pattern = re.compile(r'@' + re.escape(bot_username), re.IGNORECASE)
return pattern.sub('', text).strip()
class TelegramClient:
def __init__(self, token, llm_client, tools_definition, TOOLS,
TOOL_HANDLERS, build_system_prompt, agent_max_iterations,
allowed_group_ids=None):
self._token = token
self._llm = llm_client
self._tools_def = tools_definition
self._TOOLS = TOOLS
self._TOOL_HANDLERS = TOOL_HANDLERS
self._build_system_prompt = build_system_prompt
self._max_iterations = agent_max_iterations
self._allowed_group_ids = allowed_group_ids or []
self._skill = config.AGENT_SKILL
self._bot_username = ""
self._session_mgr = SessionManager()
self._loop = None
self._stopped = None
from telegram.ext import Application
self._app = Application.builder().token(token).build()
def start(self):
print(f'[{_ts()}] Starting Telegram service...')
asyncio.run(self._async_run())
def stop(self):
if self._loop and not self._loop.is_closed():
asyncio.run_coroutine_threadsafe(self._async_stop(), self._loop)
async def _async_stop(self):
self._stopped.set()
async def _async_run(self):
self._loop = asyncio.get_running_loop()
self._stopped = asyncio.Event()
bot_user = await self._app.bot.get_me()
self._bot_username = bot_user.username or ""
print(f'[{_ts()}] Telegram bot: @{self._bot_username}')
self._register_handlers()
await self._app.initialize()
await self._app.start()
await self._app.updater.start_polling()
print(f'[{_ts()}] Telegram bot is polling')
try:
await self._stopped.wait()
except (asyncio.CancelledError, KeyboardInterrupt):
pass
await self._app.updater.stop()
await self._app.stop()
await self._app.shutdown()
print(f'[{_ts()}] Telegram service stopped')
def _register_handlers(self):
from telegram.ext import MessageHandler, filters, CommandHandler
self._app.add_handler(
MessageHandler(
filters.TEXT & ~filters.COMMAND & filters.ChatType.PRIVATE,
self._on_private_message
)
)
self._app.add_handler(
MessageHandler(
filters.TEXT & ~filters.COMMAND & filters.ChatType.GROUPS,
self._on_group_message
)
)
self._app.add_handler(CommandHandler('start', self._on_start_command))
self._app.add_handler(CommandHandler('new', self._on_new_command))
async def _on_start_command(self, update, context):
await update.message.reply_text(
'Halo! Aku adalah asisten AI. Kirim pesan untuk memulai percakapan.'
)
async def _on_new_command(self, update, context):
chat_id = str(update.effective_chat.id)
self._session_mgr.reset(chat_id)
await update.message.reply_text('Memulai sesi baru. Ada yang bisa dibantu?')
async def _on_private_message(self, update, context):
chat_id = update.effective_chat.id
text = update.message.text.strip()
if not text:
return
msg_id = update.message.message_id
print(f'[{_ts()}] Telegram DM from {chat_id}: {text[:60]}')
threading.Thread(
target=self._process_message,
args=(chat_id, text, 'private', '', msg_id),
daemon=True
).start()
async def _on_group_message(self, update, context):
chat_id = update.effective_chat.id
chat_id_str = str(chat_id)
if self._allowed_group_ids and chat_id_str not in self._allowed_group_ids:
return
text = update.message.text.strip()
if not text:
return
replied_to_bot = (
update.message.reply_to_message
and update.message.reply_to_message.from_user
and update.message.reply_to_message.from_user.id == context.bot.id
)
has_mention = False
if update.message.entities:
for ent in update.message.entities:
if ent.type == 'mention' and self._bot_username:
start = ent.offset
end = ent.offset + ent.length
mention_text = update.message.text[start:end]
if mention_text.lower() == f'@{self._bot_username.lower()}':
has_mention = True
break
elif ent.type == 'text_mention' and ent.user.id == context.bot.id:
has_mention = True
break
name_mentioned = _name_mentioned(PERSONALITY.name, text)
if not replied_to_bot and not has_mention and not name_mentioned:
print(f'[{_ts()}] Telegram Group [{chat_id}] NO-REPLY: {text[:60]}')
return
if has_mention and self._bot_username:
text = _strip_bot_mention(text, self._bot_username)
if not text:
return
msg_id = update.message.message_id
sender = update.effective_user
sender_name = sender.full_name or sender.username or str(sender.id) if sender else str(chat_id)
print(f'[{_ts()}] Telegram Group [{chat_id}] <{sender_name}>: {text[:60]}')
threading.Thread(
target=self._process_message,
args=(chat_id, text, 'group', sender_name, msg_id),
daemon=True
).start()
def _process_message(self, chat_id, body, chat_type, sender_name, reply_to_msg_id):
session = self._session_mgr.get_or_create(
str(chat_id), self._build_system_prompt(
tools_definition=self._tools_def,
character=config.AGENT_CHARACTER or None,
skills=config.AGENT_SKILLS.split(",") if config.AGENT_SKILLS else None,
)
)
session.cancel_timer()
if body in (':new', '/new'):
self._session_mgr.reset(str(chat_id))
print(f'[{_ts()}] Session reset for {chat_id}')
self._schedule_send(chat_id, 'Memulai sesi baru. Ada yang bisa dibantu?', reply_to_msg_id)
return
session.add_message('user', body)
is_roleplay = self._skill == 'roleplayer'
asyncio.run_coroutine_threadsafe(
self._app.bot.send_chat_action(chat_id=chat_id, action='typing'),
self._loop
)
delay = random.uniform(config.READ_DELAY_MIN, config.READ_DELAY_MAX)
time.sleep(delay)
def on_tool_calls(tnames):
info = f'Using: {", ".join(tnames)}'
print(f'[{_ts()}] {info}')
if not is_roleplay:
self._schedule_send(chat_id, info, reply_to_msg_id)
final_content = run_agent_loop(
session, self._llm, self._TOOLS, self._TOOL_HANDLERS,
self._max_iterations, on_tool_calls=on_tool_calls
)
if final_content is not None:
if is_roleplay:
my_name = PERSONALITY.name
if config.TELEGRAM_SELECTIVE_RESPONSE:
recent_msgs = []
for msg in session.messages[-6:]:
if msg.get('role') == 'user':
recent_msgs.append(f"User: {msg.get('content', '')}")
elif msg.get('role') == 'assistant' and msg.get('content'):
recent_msgs.append(f"{my_name}: {msg.get('content', '')}")
recent_history = "\n".join(recent_msgs)
from tools.roleplayer import should_respond
if should_respond(
message=body,
sender_nickname=sender_name or str(chat_id),
recent_history=recent_history,
my_name=my_name,
):
print(f'[{_ts()}] need_response=True → sending response')
self._schedule_send(chat_id, final_content, reply_to_msg_id)
else:
print(f'[{_ts()}] need_response=False → staying silent')
else:
from tools.roleplayer import _name_mentioned
if _name_mentioned(my_name, body):
print(f'[{_ts()}] Name mentioned → sending response')
self._schedule_send(chat_id, final_content, reply_to_msg_id)
else:
print(f'[{_ts()}] Name not mentioned → staying silent')
else:
self._schedule_send(chat_id, final_content, reply_to_msg_id)
else:
msg = 'Max iterations reached without final answer.'
self._schedule_send(chat_id, msg, reply_to_msg_id)
timeout = 86400 if chat_type == 'private' else 300
session.start_timer(timeout, self._timeout_session, chat_id)
def _schedule_send(self, chat_id, text, reply_to_msg_id=None):
if self._loop and not self._loop.is_closed():
char_count = len(text) if text else 0
sleep_delay = max(1.0, min(char_count / config.TYPING_SPEED, config.TYPING_MAX))
print(f'[{_ts()}] Typing delay: {sleep_delay:.1f}s ({char_count} chars)')
time.sleep(sleep_delay)
asyncio.run_coroutine_threadsafe(
self._app.bot.send_message(
chat_id=chat_id,
text=text,
reply_to_message_id=reply_to_msg_id
),
self._loop
)
else:
print(f'[{_ts()}] WARNING: cannot send to {chat_id} — loop unavailable')
def _timeout_session(self, chat_id):
print(f'[{_ts()}] Session timeout: {chat_id}')
self._schedule_send(chat_id, 'Sesi ditutup. Sampai jumpa')
self._session_mgr.reset(str(chat_id))

View File

@ -1,17 +1,42 @@
import asyncio
import json
import random
import signal
import threading
from datetime import datetime
from slixmpp import ClientXMPP
from services.session_manager import SessionManager
from services.agent_loop import run_agent_loop
import config
from tools.roleplayer import should_respond
from scripts.persona import PERSONALITY
# Anti-ban: delay constants for MUC rejoin behavior
MUC_REJOIN_INITIAL_DELAY = 5.0 # detik, delay awal sebelum rejoin
MUC_REJOIN_BACKOFF_MULT = 2.0 # multiplier exponential backoff
MUC_REJOIN_MAX_DELAY = 300.0 # detik, batas max backoff (5 menit)
MUC_REJOIN_COOLDOWN = 10.0 # detik, cooldown minimum antar rejoin attempt
MUC_NICK_SUFFIX_MAX = 3 # max coba nick alternatif (anti-ban: jangan terlalu banyak)
def _ts():
return datetime.now().strftime('%H:%M:%S')
def _typing_delay(text: str) -> float:
"""Hitung delay mengetik (detik) proporsional dengan panjang teks."""
char_count = len(text) if text else 0
delay = char_count / config.TYPING_SPEED
return max(1.0, min(delay, config.TYPING_MAX))
async def _read_delay():
"""Delay simulasi membaca pesan user."""
delay = random.uniform(config.READ_DELAY_MIN, config.READ_DELAY_MAX)
await asyncio.sleep(delay)
class XMPPClient(ClientXMPP):
def __init__(self, jid, password, llm_client, tools_definition, TOOLS,
TOOL_HANDLERS, build_system_prompt, agent_max_iterations,
@ -24,14 +49,22 @@ class XMPPClient(ClientXMPP):
self._TOOL_HANDLERS = TOOL_HANDLERS
self._build_system_prompt = build_system_prompt
self._max_iterations = agent_max_iterations
self._skill = config.AGENT_SKILL
self._muc_rooms = muc_rooms or []
self._muc_nick = jid.split('@')[0]
# Custom nick dari config, fallback ke username JID
self._muc_nick = config.XMPP_NICKNAME.strip() or jid.split('@')[0]
self._muc_nick_suffix = 0 # counter untuk nick alternatif saat 409
self._muc_ready: set[str] = set()
self._session_mgr = SessionManager()
self._loop = None
self._stopped: asyncio.Event | None = None
# Anti-ban: MUC rejoin tracking per room
self._muc_rejoin_attempts: dict[str, int] = {} # room -> jumlah attempt
self._muc_rejoin_tasks: dict[str, asyncio.Task] = {} # room -> pending rejoin task
self._muc_last_join: dict[str, datetime] = {} # room -> terakhir join (cooldown)
self.auto_reconnect = True
self.register_plugin('xep_0030')
@ -45,22 +78,147 @@ class XMPPClient(ClientXMPP):
self.add_event_handler('connected', self._on_connected)
self.add_event_handler('groupchat_presence', self._on_muc_presence)
def _get_muc_nick(self, room: str) -> str:
"""Anti-ban: resolve nick untuk room, coba nick alternatif kalau conflict."""
base = config.XMPP_NICKNAME.strip() or self._muc_nick
suffix = self._muc_rejoin_attempts.get("_nick_" + room, 0)
if suffix == 0:
return base
# Anti-ban: append suffix untuk menghindari 409 Conflict
return f"{base}_{suffix}"
def _calc_rejoin_delay(self, room: str) -> float:
"""Anti-ban: hitung delay rejoin dengan exponential backoff."""
attempts = self._muc_rejoin_attempts.get(room, 0)
delay = MUC_REJOIN_INITIAL_DELAY * (MUC_REJOIN_BACKOFF_MULT ** attempts)
return min(delay, MUC_REJOIN_MAX_DELAY)
def _schedule_muc_rejoin(self, room: str):
"""Anti-ban: schedule rejoin room dengan backoff & cooldown."""
# Cancel pending rejoin task untuk room yang sama (anti-ban: avoid duplicate rejoin)
pending = self._muc_rejoin_tasks.get(room)
if pending and not pending.done():
pending.cancel()
print(f'[{_ts()}] MUC [{room}] Cancelled pending rejoin (new trigger)')
# Check cooldown: jangan rejoin terlalu cepat berturut-turut
now = datetime.now()
last_join = self._muc_last_join.get(room)
if last_join:
elapsed = (now - last_join).total_seconds()
if elapsed < MUC_REJOIN_COOLDOWN:
# Anti-ban: too soon, schedule delayed rejoin instead of immediate
cooldown_left = MUC_REJOIN_COOLDOWN - elapsed
print(f'[{_ts()}] MUC [{room}] Cooldown active ({cooldown_left:.0f}s left), delaying rejoin')
delay = cooldown_left + self._calc_rejoin_delay(room)
else:
delay = self._calc_rejoin_delay(room)
else:
delay = self._calc_rejoin_delay(room)
# Increment attempt counter (anti-ban: track for exponential backoff)
attempts = self._muc_rejoin_attempts.get(room, 0) + 1
self._muc_rejoin_attempts[room] = attempts
print(f'[{_ts()}] MUC [{room}] Rejoin scheduled in {delay:.0f}s (attempt #{attempts})')
if self._loop and not self._loop.is_closed():
task = asyncio.run_coroutine_threadsafe(
self._muc_rejoin_coro(room, delay), self._loop
)
self._muc_rejoin_tasks[room] = task
async def _muc_rejoin_coro(self, room: str, delay: float):
"""Anti-ban: coroutine untuk rejoin room setelah delay."""
try:
await asyncio.sleep(delay)
# Double-check: jangan rejoin kalau sudah di _muc_ready
if room in self._muc_ready:
print(f'[{_ts()}] MUC [{room}] Already ready, skip rejoin')
return
nick = self._get_muc_nick(room)
print(f'[{_ts()}] MUC [{room}] Rejoining as {nick}...')
await self.plugin['xep_0045'].join_muc_wait(room, nick, maxstanzas=0)
self._muc_last_join[room] = datetime.now()
# _muc_ready akan di-set oleh _on_muc_presence saat join berhasil
self._muc_rejoin_attempts.pop(room, None)
self._muc_rejoin_attempts.pop("_nick_" + room, None)
print(f'[{_ts()}] MUC [{room}] Rejoin successful as {nick}')
except asyncio.CancelledError:
print(f'[{_ts()}] MUC [{room}] Rejoin cancelled')
except Exception as e:
print(f'[{_ts()}] MUC [{room}] Rejoin failed: {e}')
# Anti-ban: handle 409 Conflict - nick sudah dipakai orang lain
if '409' in str(e) or 'conflict' in str(e).lower():
nick_attempts = self._muc_rejoin_attempts.get("_nick_" + room, 0)
if nick_attempts < MUC_NICK_SUFFIX_MAX:
# Anti-ban: coba nick alternatif (lily_, lily__)
self._muc_rejoin_attempts["_nick_" + room] = nick_attempts + 1
new_nick = self._get_muc_nick(room)
print(f'[{_ts()}] MUC [{room}] Nick conflict, trying alternative: {new_nick}')
# Retry segera dengan nick baru (tanpa backoff rejoin, tapi tetap ada delay biasa)
self._schedule_muc_rejoin(room)
else:
# Anti-ban: semua nick alternativehabis, stop retry untuk avoid ban
print(f'[{_ts()}] MUC [{room}] All nick variations exhausted, skipping room')
print(f'[{_ts()}] MUC [{room}] Set XMPP_NICKNAME in .env to a unique nick')
else:
# Anti-ban: error biasa (network, dll), retry with backoff
self._schedule_muc_rejoin(room)
async def _on_connected(self, event):
print(f'[{_ts()}] XMPP connected')
async def _on_disconnected(self, event):
print(f'[{_ts()}] XMPP disconnected')
# Anti-ban: cancel all pending rejoin tasks on disconnect
for room, task in list(self._muc_rejoin_tasks.items()):
if not task.done():
task.cancel()
print(f'[{_ts()}] MUC [{room}] Cancelled pending rejoin (disconnected)')
self._muc_rejoin_tasks.clear()
async def _on_session_start(self, event):
self.send_presence()
self.get_roster()
print(f'[{_ts()}] XMPP online as {self.boundjid.full}')
for room in self._muc_rooms:
try:
await self.plugin['xep_0045'].join_muc_wait(room, self._muc_nick, maxstanzas=0)
print(f'[{_ts()}] Joined MUC room: {room}')
except Exception as e:
print(f'[{_ts()}] MUC join failed ({room}): {e}')
# Anti-ban: retry join dengan incremental delay & nick fallback
success = False
for attempt in range(1, 4):
nick = self._get_muc_nick(room)
try:
await self.plugin['xep_0045'].join_muc_wait(room, nick, maxstanzas=0)
print(f'[{_ts()}] Joined MUC room: {room} as {nick}')
self._muc_last_join[room] = datetime.now()
self._muc_rejoin_attempts.pop(room, None)
self._muc_rejoin_attempts.pop("_nick_" + room, None)
success = True
break
except Exception as e:
print(f'[{_ts()}] MUC join attempt #{attempt} failed ({room}): {e}')
# Anti-ban: handle 409 Conflict - coba nick alternatif
if '409' in str(e) or 'conflict' in str(e).lower():
nick_attempts = self._muc_rejoin_attempts.get("_nick_" + room, 0)
if nick_attempts < MUC_NICK_SUFFIX_MAX:
nick_attempts += 1
self._muc_rejoin_attempts["_nick_" + room] = nick_attempts
print(f'[{_ts()}] MUC [{room}] Nick conflict, switching to: {self._get_muc_nick(room)}')
# Retry segera dengan nick baru (jangan wait)
continue
else:
# Anti-ban: semua nick alternatif habis
print(f'[{_ts()}] MUC [{room}] All nick variations exhausted')
break
elif attempt < 3:
# Anti-ban: error biasa, wait before retry (2s, 4s)
retry_delay = 2.0 * attempt
print(f'[{_ts()}] MUC [{room}] Retrying in {retry_delay:.0f}s...')
await asyncio.sleep(retry_delay)
if not success:
# Anti-ban: semua attempt gagal, schedule background rejoin
print(f'[{_ts()}] MUC [{room}] All join attempts failed, scheduling background rejoin')
self._schedule_muc_rejoin(room)
def _on_message(self, msg):
if msg['type'] not in ('chat', 'normal'):
@ -75,8 +233,9 @@ class XMPPClient(ClientXMPP):
def _on_groupchat_message(self, msg):
if msg['type'] != 'groupchat':
return
room = msg['from'].bare
nick = msg['from'].resource
if nick == self._muc_nick:
if self._is_my_nick(room, nick):
return
room = msg['from'].bare
if room not in self._muc_ready:
@ -87,22 +246,45 @@ class XMPPClient(ClientXMPP):
print(f'[{_ts()}] MUC [{room}] <{nick}>: {body[:60]}')
threading.Thread(target=self._process_muc, args=(room, nick, body), daemon=True).start()
def _is_my_nick(self, room: str, nick: str) -> bool:
"""Anti-ban: cek apakah nick yang dimasukan sesuai dengan nick bot di room."""
expected = self._get_muc_nick(room)
# Bandingkan dengan nick yang diharapkan, plus base nick tanpa suffix
base = config.XMPP_NICKNAME.strip() or self._muc_nick
return nick == expected or nick == base
def _on_muc_presence(self, presence):
room = presence['from'].bare
nick = presence['from'].resource
ptype = presence['type']
if nick == self._muc_nick and ptype not in ('unavailable', 'error'):
if self._is_my_nick(room, nick) and ptype not in ('unavailable', 'error'):
self._muc_ready.add(room)
# Reset rejoin counter on successful join (anti-ban: avoid accumulating backoff)
self._muc_rejoin_attempts.pop(room, None)
self._muc_rejoin_attempts.pop("_nick_" + room, None)
if ptype == 'unavailable':
print(f'[{_ts()}] MUC [{room}] <{nick}> left')
# Anti-ban: remove from ready set on unavailable to keep state consistent
self._muc_ready.discard(room)
# Anti-ban: trigger auto-rejoin with exponential backoff
if self._is_my_nick(room, nick):
self._schedule_muc_rejoin(room)
elif ptype == 'error':
print(f'[{_ts()}] MUC [{room}] error: {presence}')
# Anti-ban: also rejoin on error (e.g. temporary failure)
if self._is_my_nick(room, nick):
self._muc_ready.discard(room)
self._schedule_muc_rejoin(room)
else:
print(f'[{_ts()}] MUC [{room}] <{nick}> joined (type={ptype})')
def _process_dm(self, jid, body):
session = self._session_mgr.get_or_create(
jid, self._build_system_prompt(self._tools_def)
jid, self._build_system_prompt(
tools_definition=self._tools_def,
character=config.AGENT_CHARACTER or None,
skills=config.AGENT_SKILLS.split(",") if config.AGENT_SKILLS else None,
)
)
session.cancel_timer()
@ -116,14 +298,73 @@ class XMPPClient(ClientXMPP):
session.add_message('user', body)
self._schedule_send(jid, f'> {body}\nThinking...')
self._agent_loop(session, jid, body, 'chat')
is_roleplay = self._skill == 'roleplayer'
if not is_roleplay:
self._schedule_send(jid, f'> {body}\nThinking...')
session.start_timer(300, self._timeout_session, jid, 'chat')
# Delay 1: simulasi membaca pesan user
if self._loop and not self._loop.is_closed():
asyncio.run_coroutine_threadsafe(_read_delay(), self._loop)
my_name = PERSONALITY.name
quote = body
def on_tool_calls(tnames):
if not is_roleplay:
self._schedule_send(jid, f'> {quote}\nUsing: {", ".join(tnames)}', 'chat')
final_content = run_agent_loop(
session, self._llm, self._TOOLS, self._TOOL_HANDLERS,
self._max_iterations, on_tool_calls=on_tool_calls
)
if final_content is not None:
if is_roleplay:
if config.XMPP_SELECTIVE_RESPONSE:
recent_msgs = []
for msg in session.messages[-6:]:
if msg.get('role') == 'user':
recent_msgs.append(f"User: {msg.get('content', '')}")
elif msg.get('role') == 'assistant' and msg.get('content'):
recent_msgs.append(f"{my_name}: {msg.get('content', '')}")
recent_history = "\n".join(recent_msgs)
if should_respond(
message=quote,
sender_nickname=jid,
recent_history=recent_history,
my_name=my_name,
):
print(f'[{_ts()}] need_response=True → sending response')
self._schedule_send(jid, final_content, 'chat')
else:
print(f'[{_ts()}] need_response=False → staying silent')
else:
from tools.roleplayer import _name_mentioned
if _name_mentioned(my_name, quote):
print(f'[{_ts()}] Name mentioned → sending response')
self._schedule_send(jid, final_content, 'chat')
else:
print(f'[{_ts()}] Name not mentioned → staying silent')
else:
self._schedule_send(jid, f'> {quote}\n{final_content}', 'chat')
else:
msg = 'Max iterations reached without final answer.'
if is_roleplay:
self._schedule_send(jid, msg, 'chat')
else:
self._schedule_send(jid, f'> {quote}\n{msg}', 'chat')
# DM: timeout 24 jam (efektif tidak auto-close), MUC tetap 5 menit
session.start_timer(86400, self._timeout_session, jid, 'chat')
def _process_muc(self, room, nick, body):
session = self._session_mgr.get_or_create(
room, self._build_system_prompt(self._tools_def)
room, self._build_system_prompt(
tools_definition=self._tools_def,
character=config.AGENT_CHARACTER or None,
skills=config.AGENT_SKILLS.split(",") if config.AGENT_SKILLS else None,
)
)
session.cancel_timer()
@ -136,68 +377,67 @@ class XMPPClient(ClientXMPP):
prefixed = f'[{nick}] {body}'
session.add_message('user', prefixed)
self._schedule_send(room, f'> [{nick}] {body}\nThinking...', mtype='groupchat')
self._agent_loop(session, room, f'[{nick}] {body}', 'groupchat')
if self._skill != 'roleplayer':
self._schedule_send(room, f'> [{nick}] {body}\nThinking...', mtype='groupchat')
# Delay 1: simulasi membaca pesan user
if self._loop and not self._loop.is_closed():
asyncio.run_coroutine_threadsafe(_read_delay(), self._loop)
my_name = PERSONALITY.name
quote = f'[{nick}] {body}'
def on_tool_calls(tnames):
if not is_roleplay:
self._schedule_send(room, f'> {quote}\nUsing: {", ".join(tnames)}', 'groupchat')
final_content = run_agent_loop(
session, self._llm, self._TOOLS, self._TOOL_HANDLERS,
self._max_iterations, on_tool_calls=on_tool_calls
)
if final_content is not None:
if is_roleplay:
if config.XMPP_SELECTIVE_RESPONSE:
recent_msgs = []
for msg in session.messages[-6:]:
if msg.get('role') == 'user':
recent_msgs.append(f"User: {msg.get('content', '')}")
elif msg.get('role') == 'assistant' and msg.get('content'):
recent_msgs.append(f"{my_name}: {msg.get('content', '')}")
recent_history = "\n".join(recent_msgs)
if should_respond(
message=quote,
sender_nickname=nick,
recent_history=recent_history,
my_name=my_name,
):
print(f'[{_ts()}] need_response=True → sending response')
self._schedule_send(room, final_content, 'groupchat')
else:
print(f'[{_ts()}] need_response=False → staying silent')
else:
from tools.roleplayer import _name_mentioned
if _name_mentioned(my_name, quote):
print(f'[{_ts()}] Name mentioned → sending response')
self._schedule_send(room, final_content, 'groupchat')
else:
print(f'[{_ts()}] Name not mentioned → staying silent')
else:
self._schedule_send(room, f'> {quote}\n{final_content}', 'groupchat')
else:
msg = 'Max iterations reached without final answer.'
if is_roleplay:
self._schedule_send(room, msg, 'groupchat')
else:
self._schedule_send(room, f'> {quote}\n{msg}', 'groupchat')
session.start_timer(300, self._timeout_session, room, 'groupchat')
def _agent_loop(self, session, to, quote, mtype):
for step in range(self._max_iterations):
print(f'[{_ts()}] Step {step + 1} — calling LLM...')
response = self._llm.chat(session.messages, tools=self._TOOLS)
if response.tool_calls:
amsg = {
'role': 'assistant',
'content': response.content,
'tool_calls': response.tool_calls,
}
session.messages.append(amsg)
tnames = [tc['function']['name'] for tc in response.tool_calls]
print(f'[{_ts()}] Using tools: {", ".join(tnames)}')
self._schedule_send(to, f'> {quote}\nUsing: {", ".join(tnames)}', mtype)
for tc in response.tool_calls:
result = self._execute_tool(tc)
session.messages.append({
'role': 'tool',
'tool_call_id': tc['id'],
'content': str(result),
})
else:
if response.content:
print(f'[{_ts()}] Response sent ({len(response.content)} chars)')
session.messages.append({'role': 'assistant', 'content': response.content})
self._schedule_send(to, f'> {quote}\n{response.content}', mtype)
return
print(f'[{_ts()}] Max iterations ({self._max_iterations}) reached')
session.messages.append({
'role': 'assistant',
'content': 'Max iterations reached without final answer.',
})
self._schedule_send(to, f'> {quote}\nMax iterations reached without final answer.', mtype)
def _execute_tool(self, tool_call):
tname = tool_call['function']['name']
targs = json.loads(tool_call['function']['arguments'])
handler = self._TOOL_HANDLERS.get(tname)
if not handler:
return f'Tool {tname} not found'
try:
if tname == 'search_code':
return handler(
pattern=targs['pattern'],
search_type=targs['search_type'],
path=targs.get('path', '.'),
)
elif tname == 'git_operation':
return handler(args=targs['args'])
else:
return handler(**targs)
except Exception as e:
return f'Error executing tool: {str(e)}'
from services.agent_loop import execute_tool
return execute_tool(tool_call, self._TOOL_HANDLERS)
def _schedule_send(self, to, body, mtype='chat'):
if self._loop and not self._loop.is_closed():
@ -209,6 +449,11 @@ class XMPPClient(ClientXMPP):
async def _send_coro(self, to, body, mtype):
try:
# Delay 2: simulasi mengetik (proporsional dengan panjang pesan)
delay = _typing_delay(body)
print(f'[{_ts()}] Typing delay: {delay:.1f}s ({len(body)} chars)')
await asyncio.sleep(delay)
msg = self.make_message(mto=to, mbody=body, mtype=mtype)
msg.send()
except Exception as e:
@ -226,11 +471,15 @@ class XMPPClient(ClientXMPP):
async def _run(self):
self._stopped = asyncio.Event()
self._loop = asyncio.get_running_loop()
for sig in (signal.SIGTERM, signal.SIGHUP):
try:
self._loop.add_signal_handler(sig, self._stopped.set)
except NotImplementedError:
pass
# Hanya tangani SIGTERM untuk shutdown.
# SENGATKAN SIGHUP: nohup kirim SIGHUP saat terminal close,
# dan kita tidak mau proses mati karena itu.
try:
self._loop.add_signal_handler(signal.SIGTERM, self._stopped.set)
except (NotImplementedError, RuntimeError):
pass
await self.connect()
try:
await self._stopped.wait()

93
tools/carrack.py Normal file
View File

@ -0,0 +1,93 @@
import json
import urllib.error
import urllib.parse
import urllib.request
schema_sendhttprequest = {
"type": "function",
"function": {
"name": "sendhttprequest",
"description": "Send an HTTP request with full control over method, headers, and body/params.",
"parameters": {
"type": "object",
"properties": {
"url": {"type": "string", "description": "Target URL"},
"method": {
"type": "string",
"enum": ["GET", "POST", "PUT", "PATCH", "DELETE", "HEAD", "OPTIONS"],
"description": "HTTP method"
},
"authorization": {
"type": "string",
"description": "Bearer token or full Authorization header value (e.g. 'Bearer <token>' or 'Basic <base64>')"
},
"content_type": {
"type": "string",
"description": "Content-Type header (e.g. 'application/json', 'application/x-www-form-urlencoded')"
},
"data": {
"type": "object",
"description": "JSON body for POST/PUT/PATCH requests (will be serialized as JSON). Ignored for GET/DELETE/HEAD/OPTIONS."
},
"params": {
"type": "object",
"description": "Query parameters as key-value dict (appended to URL)"
}
},
"required": ["url", "method"]
}
}
}
def sendhttprequest(url, method, authorization=None, content_type=None, data=None, params=None):
try:
if params:
url_parts = list(urllib.parse.urlparse(url))
query = dict(urllib.parse.parse_qsl(url_parts[4]))
query.update(params)
url_parts[4] = urllib.parse.urlencode(query)
url = urllib.parse.urlunparse(url_parts)
headers = {}
if authorization:
headers["Authorization"] = authorization
if content_type:
headers["Content-Type"] = content_type
body = None
if data is not None and method.upper() in ("POST", "PUT", "PATCH"):
body = json.dumps(data, ensure_ascii=False).encode("utf-8")
if content_type is None:
headers["Content-Type"] = "application/json"
req = urllib.request.Request(url, data=body, headers=headers, method=method.upper())
with urllib.request.urlopen(req, timeout=30) as resp:
status = resp.status
resp_headers = dict(resp.getheaders())
raw = resp.read()
content_type_resp = resp_headers.get("Content-Type", "")
if "application/json" in content_type_resp:
try:
resp_body = json.loads(raw)
except json.JSONDecodeError:
resp_body = raw.decode("utf-8", errors="replace")
else:
resp_body = raw.decode("utf-8", errors="replace")
return json.dumps({
"status": status,
"headers": resp_headers,
"body": resp_body
}, ensure_ascii=False, indent=2)
except urllib.error.HTTPError as e:
return json.dumps({
"status": e.code,
"headers": dict(e.headers),
"body": e.read().decode("utf-8", errors="replace")
}, ensure_ascii=False, indent=2)
except urllib.error.URLError as e:
return f"Error: {e.reason}"
except Exception as e:
return f"Error: {str(e)}"

View File

@ -146,7 +146,10 @@ schema_git_operation = {
"type": "function",
"function": {
"name": "git_operation",
"description": "Run a git command. Pass the git arguments as a list (e.g., ['status', '--short'] for 'git status --short').",
"description": "Run a git command. Pass the git arguments as a list (e.g., ['status', '--short'] for 'git status --short'). "
"POLICY: Never run 'git add' or 'git commit' without explicit user permission. "
"Safe to run without asking: git status, git diff, git log. "
"Always ask first before committing.",
"parameters": {
"type": "object",
"properties": {

View File

@ -1,4 +1,7 @@
import glob as globmod
import json
import os
import time
import chromadb
from chromadb.config import Settings
@ -183,6 +186,50 @@ schema_inspect_collection = {
}
schema_ingest_files = {
"type": "function",
"function": {
"name": "ingest_files",
"description": (
"Read one or more files (supports glob patterns like *.py or src/**/*.md) "
"and store their content into a RAG collection. "
"Optionally chunk files into smaller pieces by line count. "
"Automatically extracts metadata: filename, path, extension, size, modification time."
),
"parameters": {
"type": "object",
"properties": {
"collection": {
"type": "string",
"description": "Target collection name (will be created if it doesn't exist)"
},
"paths": {
"type": "array",
"items": {"type": "string"},
"description": "File paths or glob patterns (e.g., ['*.txt', 'src/**/*.py'])"
},
"chunk_size": {
"type": "integer",
"description": "Lines per chunk (0 = whole file as one document)",
"default": 0
},
"chunk_overlap": {
"type": "integer",
"description": "Line overlap between chunks (only used when chunk_size > 0)",
"default": 0
},
"recursive": {
"type": "boolean",
"description": "Search directories recursively when using glob patterns",
"default": True
}
},
"required": ["collection", "paths"]
}
}
}
# ── Tool handlers ────────────────────────────────────────────────────
def _sanitize_meta(meta):
@ -294,3 +341,144 @@ def inspect_collection(collection, sample_size=3):
return "\n".join(out)
except Exception as e:
return f"Error: {e}"
def ingest_files(collection, paths, chunk_size=0, chunk_overlap=0, recursive=True):
try:
col = _collection(collection)
all_ids, all_texts, all_metas = [], [], []
processed, skipped = 0, 0
# Expand glob patterns into real file paths
file_set = set()
for p in paths:
expanded = globmod.glob(p, recursive=recursive)
if expanded:
file_set.update(expanded)
else:
# Maybe it's a literal path that doesn't look like a glob
if os.path.isfile(p):
file_set.add(p)
else:
skipped += 1
if not file_set:
return "No matching files found."
for fpath in sorted(file_set):
if not os.path.isfile(fpath):
skipped += 1
continue
ext = os.path.splitext(fpath)[1].lower()
stat = os.stat(fpath)
base_meta = {
"filename": os.path.basename(fpath),
"path": os.path.relpath(fpath),
"extension": ext,
"size": stat.st_size,
"mtime": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(stat.st_mtime)),
}
base_name = os.path.splitext(os.path.basename(fpath))[0]
# ── read content ──────────────────────────────────────────
if ext in (".xlsx", ".xlsm"):
try:
import openpyxl
except ImportError:
skipped += 1
continue
wb = openpyxl.load_workbook(fpath, read_only=True, data_only=True)
for sheet_name in wb.sheetnames:
ws = wb[sheet_name]
rows = []
for row in ws.iter_rows(values_only=True):
vals = [str(c) if c is not None else "" for c in row]
rows.append("\t".join(vals))
lines = rows
content = "\n".join(lines)
if not content.strip():
continue
sheet_meta = dict(base_meta)
sheet_meta["sheet"] = sheet_name
if chunk_size > 0:
n_lines = len(lines)
cid = 0
start = 0
while start < n_lines:
end = min(start + chunk_size, n_lines)
chunk_text = "\n".join(lines[start:end])
doc_id = f"{base_name}_{sheet_name}_chunk_{cid}"
meta = dict(sheet_meta)
meta["chunk_index"] = cid
meta["chunk_lines"] = end - start
meta["chunk_start_line"] = start + 1
all_ids.append(doc_id)
all_texts.append(chunk_text)
all_metas.append(_sanitize_meta(meta))
cid += 1
step = chunk_size - chunk_overlap
start += step if step > 0 else 1
processed += 1
else:
doc_id = f"{base_name}_{sheet_name}"
all_ids.append(doc_id)
all_texts.append(content)
all_metas.append(_sanitize_meta(sheet_meta))
processed += 1
wb.close()
else:
# Plain-text files
try:
with open(fpath, "r", encoding="utf-8", errors="replace") as f:
lines = f.readlines()
except Exception:
skipped += 1
continue
content = "".join(lines)
if not content.strip():
skipped += 1
continue
if chunk_size > 0:
n_lines = len(lines)
cid = 0
start = 0
while start < n_lines:
end = min(start + chunk_size, n_lines)
chunk_text = "".join(lines[start:end])
doc_id = f"{base_name}_chunk_{cid}"
meta = dict(base_meta)
meta["chunk_index"] = cid
meta["chunk_lines"] = end - start
meta["chunk_start_line"] = start + 1
all_ids.append(doc_id)
all_texts.append(chunk_text)
all_metas.append(_sanitize_meta(meta))
cid += 1
step = chunk_size - chunk_overlap
start += step if step > 0 else 1
processed += 1
else:
doc_id = base_name
all_ids.append(doc_id)
all_texts.append(content)
all_metas.append(_sanitize_meta(base_meta))
processed += 1
if all_ids:
col.add(ids=all_ids, documents=all_texts, metadatas=all_metas)
parts = [f"Ingested {processed} file(s) into '{collection}'"]
if processed > 0:
parts.append(f"({len(all_ids)} document(s) total)")
if skipped > 0:
parts.append(f"({skipped} file(s) skipped)")
return " ".join(parts)
except Exception as e:
return f"Error: {e}"

99
tools/roleplayer.py Normal file
View File

@ -0,0 +1,99 @@
import re
schema_need_response = {
"type": "function",
"function": {
"name": "need_response",
"description": (
"Decide whether you should respond to the current message. "
"Use this tool when you are unsure whether to reply or stay silent. "
"Returns 'true' if you should respond, 'false' if you should stay silent. "
"Rules: "
"- Return 'true' if your name is mentioned or someone talks about you. "
"- Return 'true' if the message is related to your previous conversation. "
"- Return 'false' if the message is between other people, unclear, "
"unrelated to you, or you have nothing to add."
),
"parameters": {
"type": "object",
"properties": {
"message": {
"type": "string",
"description": "The latest message to evaluate.",
},
"sender_nickname": {
"type": "string",
"description": "The nickname of the person who sent the message.",
},
"recent_history": {
"type": "string",
"description": "Recent conversation history for context (last few messages).",
},
"my_name": {
"type": "string",
"description": "Your (the AI's) name.",
},
},
"required": ["message", "sender_nickname", "recent_history", "my_name"],
},
},
}
def _name_mentioned(name: str, text: str) -> bool:
text_lower = text.lower()
name_lower = name.lower()
pattern = r'\b' + re.escape(name_lower) + r'\b'
return bool(re.search(pattern, text_lower))
def need_response(message: str, sender_nickname: str, recent_history: str, my_name: str) -> str:
"""
Decide whether the AI should respond to a message.
Args:
message: The latest message to evaluate.
sender_nickname: Nickname of the sender.
recent_history: Recent conversation history for context.
my_name: The AI's name.
Returns:
"true" should respond
"false" should stay silent
Rules:
1. STRONG Direct call/mention of AI's name → always True
2. BRIEF Talks about AI in third person True
3. CONTEXTUAL Related to AI's previous conversation → True
4. NO REPLY Unrelated, confusing, between other people False
"""
msg = message.strip()
# --- Rule 4: Empty message ---
if not msg:
return "false"
# --- Rule 1: Direct mention/name call ---
if _name_mentioned(my_name, msg):
return "true"
# --- Rule 2: Talks about AI (third person) ---
name_parts = my_name.lower().split()
text_lower = msg.lower()
if any(part in text_lower for part in name_parts if len(part) > 1):
return "true"
# --- Rule 3: Contextual — check recent history ---
if recent_history:
return "true"
# --- Rule 4: Default — no strong signal, stay silent ---
return "false"
def should_respond(message: str, sender_nickname: str, recent_history: str, my_name: str) -> bool:
"""
Python-side helper: return boolean directly.
Used by XMPP client to gate whether to send the LLM's response.
"""
result = need_response(message, sender_nickname, recent_history, my_name)
return result.strip().lower() == "true"

View File

@ -1,19 +1,39 @@
import json
import threading
from datetime import datetime
import config
from scripts import ntro
WELCOME_ART = """\
\n\
def _add_msg(app, role, content, **kwargs):
msg = {"role": role, "content": content}
msg.update(kwargs)
app.messages.append(msg)
if app.current_session:
app.session_mgr.add_message(
app.current_session.doc_id, role, content, **kwargs
)
WELCOME_ART = """
__ __ _______ __ _ ______ ______ ___ ___ _
| | | || || | | || | | _ | | | | | | |
| |_| || ___|| |_| || _ || | || | | | |_| |
| || |___ | || | | || |_||_ | | | _|
| || ___|| _ || |_| || __ || | | |_
| _ || |___ | | | || || | | || | | _ |
|__| |__||_______||_| |__||______| |___| |_||___| |___| |_|
"""
"""
/\\_/\\ H E N D R I K
( o.o ) AI Agent
> ^ < siap membantu!
( )
/\\_/\\
( o.o ) HENDRIK
> ^ <
( ) AI Agent
(___)
"""
"""
def log(app, role, text):
@ -30,13 +50,25 @@ def submit(app, stdscr):
return
log(app, "user", query)
model_info = (
config.resolve_provider(app.llm.base_url, app.llm.model),
app.llm.model
)
if app.log:
app.log[-1]["model_info"] = model_info
app.input_buffer = [""]
app.input_line = 0
app.input_col = 0
app.scroll = 999999
app.processing = True
app.messages.append({"role": "user", "content": query})
if app.current_session is None:
app.current_session = app.session_mgr.create(
f"Session {datetime.now().strftime('%Y-%m-%d %H:%M')}",
app._model_info(),
)
_add_msg(app, "user", query, model_info=app._model_info())
app.agent_done.clear()
app.agent_thread = threading.Thread(
@ -59,27 +91,26 @@ def _agent_loop(app):
app.log.pop()
if response.warning:
log(app, "system", f" {response.warning}")
if response.tool_calls:
amsg = {
"role": "assistant",
"content": response.content,
"tool_calls": response.tool_calls,
}
app.messages.append(amsg)
_add_msg(app, "assistant", response.content, tool_calls=response.tool_calls)
if response.content and response.content.strip():
log(app, "ai", response.content)
app.scroll = 999999
for tc in response.tool_calls:
tname = tc["function"]["name"]
log(app, "system", f" \u2192 {tname}")
targs = tc["function"]["arguments"]
log(app, "tool_call", json.dumps({
"name": tname,
"arguments": targs,
}))
app.scroll = 999999
execute_tool(app, tc)
else:
if response.content:
app.messages.append({
"role": "assistant",
"content": response.content,
})
_add_msg(app, "assistant", response.content)
log(app, "ai", response.content)
log(app, "sep", "")
ntro.end(stamp)
@ -88,8 +119,7 @@ def _agent_loop(app):
ntro.end(stamp_step)
log(app, "error", "Max iterations reached without final answer.")
app.messages.append({"role": "assistant",
"content": "Max iterations reached without final answer."})
_add_msg(app, "assistant", "Max iterations reached without final answer.")
ntro.end(stamp)
app.agent_done.set()
@ -116,8 +146,4 @@ def execute_tool(app, tool_call):
except Exception as e:
result = f"Error executing tool: {str(e)}"
app.messages.append({
"role": "tool",
"tool_call_id": tool_call["id"],
"content": str(result),
})
_add_msg(app, "tool", str(result), tool_call_id=tool_call["id"])

View File

@ -1,8 +1,11 @@
import curses
import threading
from datetime import datetime
import config
from .render import init_colors, draw
from .input import handle_key
from .agent import log, WELCOME_ART
from services.session_manager_neo import NeoSessionManager, NeoSession
class HendrikTUI:
@ -28,6 +31,61 @@ class HendrikTUI:
self.agent_thread: threading.Thread | None = None
self.agent_done = threading.Event()
self.session_mgr = NeoSessionManager()
self.current_session: NeoSession | None = None
def switch_model(self, item: dict):
self.llm.base_url = item["base_url"]
self.llm.model = item["model"]
self.llm.api_key = item["api_key"]
if self.current_session:
self.session_mgr.update_model_info(
self.current_session.doc_id, self._model_info()
)
def _model_info(self) -> dict:
return {
"provider": config.resolve_provider(self.llm.base_url, self.llm.model),
"base_url": self.llm.base_url,
"model": self.llm.model,
}
def new_session(self):
name = f"Session {datetime.now().strftime('%Y-%m-%d %H:%M')}"
self.current_session = self.session_mgr.create(name, self._model_info())
self.messages = [{"role": "system", "content": self.build_system_prompt(
tools_definition=self.tools_def,
character=config.AGENT_CHARACTER or None,
skills=config.AGENT_SKILLS.split(",") if config.AGENT_SKILLS else None,
)}]
self.log.clear()
self.scroll = 0
log(self, "welcome", WELCOME_ART)
def switch_session(self, doc_id: int):
session = self.session_mgr.get(doc_id)
if not session:
return
self.current_session = session
self.messages = list(session.messages)
self.log.clear()
self.scroll = 0
log(self, "welcome", WELCOME_ART)
for i, msg in enumerate(session.messages):
if msg["role"] == "user":
if i > 0:
log(self, "sep", "")
log(self, "user", msg["content"])
mi = msg.get("model_info")
if mi:
self.log[-1]["model_info"] = (mi.get("provider"), mi.get("model"))
elif msg["role"] == "assistant":
next_role = session.messages[i + 1]["role"] if i + 1 < len(session.messages) else None
if next_role in ("user", None):
log(self, "ai", msg["content"])
if session.messages and session.messages[-1]["role"] == "assistant":
log(self, "sep", "")
def run(self):
try:
curses.wrapper(self._main)
@ -40,8 +98,11 @@ class HendrikTUI:
stdscr.keypad(True)
stdscr.refresh()
self.messages = [{"role": "system",
"content": self.build_system_prompt(self.tools_def)}]
self.messages = [{"role": "system", "content": self.build_system_prompt(
tools_definition=self.tools_def,
character=config.AGENT_CHARACTER or None,
skills=config.AGENT_SKILLS.split(",") if config.AGENT_SKILLS else None,
)}]
log(self, "welcome", WELCOME_ART)
while self.running:

View File

@ -4,6 +4,7 @@
import curses
import os
import config
from .agent import submit, log
@ -55,6 +56,26 @@ def handle_key(app, stdscr, key):
workspace_popup(app, stdscr)
elif key == 12: # Ctrl+L → clear chat log
app.log.clear()
elif key == 5: # Ctrl+E → model selector popup
if not processing:
model_selector_popup(app, stdscr)
# -- Session management shortcuts --
elif key == 14: # Ctrl+N → new session
if not processing:
new_session_popup(app, stdscr)
elif key == 15: # Ctrl+O → open session browser
if not processing:
session_browser_popup(app, stdscr)
elif key == 6: # Ctrl+F → search sessions
if not processing:
session_search_popup(app, stdscr)
elif key == 18: # Ctrl+R → rename current session
if not processing:
rename_popup(app, stdscr)
elif key == 24: # Ctrl+X → delete current session
if not processing:
delete_session_popup(app, stdscr)
# -- Enter: split logical line at cursor position --
elif key in (curses.KEY_ENTER, 10, 13):
@ -154,3 +175,367 @@ def workspace_popup(app, stdscr):
stdscr.touchwin()
stdscr.refresh()
def model_selector_popup(app, stdscr):
current_base = app.llm.base_url.rstrip("/")
current_model = app.llm.model
# Group by provider
providers: list[tuple[str, list[dict]]] = []
seen = {}
for item in config.MODELS_ITEMS:
p = item["provider"]
if p not in seen:
seen[p] = []
providers.append((p, seen[p]))
seen[p].append(item)
items = [] # (type, data, label)
selectable = [] # indices into items for model entries
current_idx = 0
for pname, plist in providers:
items.append(("header", None, pname))
for entry in plist:
idx = len(items)
items.append(("model", entry, entry["model"]))
selectable.append(idx)
if entry["base_url"] == current_base and entry["model"] == current_model:
current_idx = len(selectable) - 1
if not selectable:
return
pw = min(50, app.w - 4)
ph = min(len(items) + 4, app.h - 4)
px = (app.w - pw) // 2
py = (app.h - ph) // 2
if ph < 6:
return
win = curses.newwin(ph, pw, py, px)
win.keypad(True)
while True:
win.erase()
win.box()
win.addstr(0, 2, " Pilih Model (Ctrl+E) ", curses.A_BOLD)
visible_h = ph - 2
total = len(items)
scroll = max(0, min(selectable[current_idx] - visible_h // 2, total - visible_h))
for i in range(visible_h):
idx = scroll + i
if idx >= total:
break
typ, data, label = items[idx]
y = 1 + i
if typ == "header":
win.addstr(y, 2, f" {label} ", curses.A_BOLD | curses.A_UNDERLINE)
else:
is_cur = (idx == selectable[current_idx])
is_active = (data["base_url"] == current_base and data["model"] == current_model)
if is_cur:
prefix = " \u25b6 " if is_active else " > "
elif is_active:
prefix = " \u2192 "
else:
prefix = " "
display = prefix + label
if len(display) > pw - 4:
display = display[:pw - 4]
attr = curses.A_REVERSE if is_cur else curses.A_NORMAL
win.addstr(y, 2, display.ljust(pw - 4), attr)
footer = " \u2191\u2193 nav \u21b5 select esc/q close "
win.addstr(ph - 1, 2, footer[:pw - 4], curses.A_DIM)
try:
target_y = selectable[current_idx] - scroll + 1
if 0 <= target_y < ph - 1:
win.move(target_y, 4)
except curses.error:
pass
win.refresh()
key = win.getch()
if key in (27, ord("q"), ord("Q")):
break
elif key in (curses.KEY_ENTER, 10, 13):
sel_item = items[selectable[current_idx]]
if sel_item[0] == "model":
app.switch_model(sel_item[1])
break
elif key == curses.KEY_UP:
if current_idx > 0:
current_idx -= 1
elif key == curses.KEY_DOWN:
if current_idx < len(selectable) - 1:
current_idx += 1
del win
stdscr.touchwin()
stdscr.refresh()
def new_session_popup(app, stdscr):
if not app.current_session:
app.new_session()
return
pw = min(50, app.w - 4)
ph = 3
px = (app.w - pw) // 2
py = app.h // 2 - 1
win = curses.newwin(ph, pw, py, px)
win.box()
win.addstr(0, 2, " Start new session?", curses.A_BOLD)
win.addstr(1, 2, " [Y]es [N]o ")
win.refresh()
while True:
key = win.getch()
if key in (ord("y"), ord("Y")):
app.new_session()
break
elif key in (ord("n"), ord("N"), 27):
break
del win
stdscr.touchwin()
stdscr.refresh()
def session_browser_popup(app, stdscr):
sessions = app.session_mgr.list()
if not sessions:
curses.flash()
return
items = []
for s in sessions:
dt = s["updated_at"][:16].replace("T", " ")
label = f"{s['name']} ({dt}, {s['message_count']} msg)"
items.append((s["doc_id"], label))
pw = min(60, app.w - 4)
ph = min(len(items) + 4, app.h - 4)
if ph < 5:
curses.flash()
return
px = (app.w - pw) // 2
py = (app.h - ph) // 2
win = curses.newwin(ph, pw, py, px)
win.keypad(True)
current_idx = 0
while True:
win.erase()
win.box()
win.addstr(0, 2, " Sessions (\u2191\u2193 nav \u21b5 select D delete esc close)",
curses.A_BOLD)
visible_h = ph - 2
total = len(items)
scroll = max(0, min(current_idx - visible_h // 2, total - visible_h))
for i in range(visible_h):
idx = scroll + i
if idx >= total:
break
doc_id, label = items[idx]
y = 1 + i
is_cur = (idx == current_idx)
is_active = (app.current_session and doc_id == app.current_session.doc_id)
if is_cur:
prefix = " \u25b6 " if is_active else " > "
elif is_active:
prefix = " \u2192 "
else:
prefix = " "
display = prefix + label
if len(display) > pw - 4:
display = display[:pw - 4]
attr = curses.A_REVERSE if is_cur else curses.A_NORMAL
win.addstr(y, 2, display.ljust(pw - 4), attr)
win.refresh()
key = win.getch()
if key in (27, ord("q"), ord("Q")):
break
elif key in (curses.KEY_ENTER, 10, 13):
doc_id, _ = items[current_idx]
if not app.current_session or doc_id != app.current_session.doc_id:
app.switch_session(doc_id)
break
elif key == curses.KEY_UP:
if current_idx > 0:
current_idx -= 1
elif key == curses.KEY_DOWN:
if current_idx < len(items) - 1:
current_idx += 1
elif key in (ord("d"), ord("D")):
doc_id, label = items[current_idx]
if app.current_session and doc_id == app.current_session.doc_id:
continue
app.session_mgr.delete(doc_id)
sessions = app.session_mgr.list()
items = [(s["doc_id"],
f"{s['name']} ({s['updated_at'][:16].replace('T', ' ')}, {s['message_count']} msg)")
for s in sessions]
current_idx = min(current_idx, len(items) - 1)
if not items:
break
del win
stdscr.touchwin()
stdscr.refresh()
def session_search_popup(app, stdscr):
pw = min(60, app.w - 4)
ph = 3
px = (app.w - pw) // 2
py = app.h // 2 - 1
win = curses.newwin(ph, pw, py, px)
win.box()
win.addstr(0, 2, " Search sessions: ")
win.addstr(1, 2, " " * (pw - 4))
curses.echo()
query = win.getstr(1, 2, pw - 5).decode("utf-8")
curses.noecho()
del win
stdscr.touchwin()
stdscr.refresh()
query = query.strip()
if not query:
return
results = app.session_mgr.search(query)
if not results:
log(app, "system", f"No sessions matching: {query}")
return
items = []
for s in results:
dt = s["updated_at"][:16].replace("T", " ")
label = f"{s['name']} ({dt}, {s['message_count']} msg)"
items.append((s["doc_id"], label))
pw = min(60, app.w - 4)
ph = min(len(items) + 4, app.h - 4)
if ph < 5:
curses.flash()
return
px = (app.w - pw) // 2
py = (app.h - ph) // 2
win = curses.newwin(ph, pw, py, px)
win.keypad(True)
current_idx = 0
while True:
win.erase()
win.box()
win.addstr(0, 2, f" Results for \"{query}\" (\u2191\u2193 nav \u21b5 select esc close)",
curses.A_BOLD)
visible_h = ph - 2
total = len(items)
scroll = max(0, min(current_idx - visible_h // 2, total - visible_h))
for i in range(visible_h):
idx = scroll + i
if idx >= total:
break
doc_id, label = items[idx]
y = 1 + i
is_cur = (idx == current_idx)
attr = curses.A_REVERSE if is_cur else curses.A_NORMAL
win.addstr(y, 2, f" {label}"[:pw - 4].ljust(pw - 4), attr)
win.refresh()
key = win.getch()
if key in (27, ord("q"), ord("Q")):
break
elif key in (curses.KEY_ENTER, 10, 13):
doc_id, _ = items[current_idx]
if not app.current_session or doc_id != app.current_session.doc_id:
app.switch_session(doc_id)
break
elif key == curses.KEY_UP:
if current_idx > 0:
current_idx -= 1
elif key == curses.KEY_DOWN:
if current_idx < len(items) - 1:
current_idx += 1
del win
stdscr.touchwin()
stdscr.refresh()
def rename_popup(app, stdscr):
if not app.current_session:
return
pw = min(60, app.w - 4)
ph = 3
px = (app.w - pw) // 2
py = app.h // 2 - 1
win = curses.newwin(ph, pw, py, px)
win.box()
win.addstr(0, 2, " Rename session: ")
win.addstr(1, 2, " " * (pw - 4))
curses.echo()
new_name = win.getstr(1, 2, pw - 5).decode("utf-8")
curses.noecho()
del win
stdscr.touchwin()
stdscr.refresh()
new_name = new_name.strip()
if new_name:
app.session_mgr.rename(app.current_session.doc_id, new_name)
app.current_session.name = new_name
def delete_session_popup(app, stdscr):
if not app.current_session:
return
pw = min(50, app.w - 4)
ph = 4
px = (app.w - pw) // 2
py = app.h // 2 - 1
win = curses.newwin(ph, pw, py, px)
win.box()
win.addstr(0, 2, " Delete current session?", curses.A_BOLD)
name = app.current_session.name
display = name if len(name) <= pw - 6 else name[:pw - 9] + "..."
win.addstr(1, 2, f" \"{display}\"")
win.addstr(2, 2, " [Y]es [N]o ")
win.refresh()
while True:
key = win.getch()
if key in (ord("y"), ord("Y")):
doc_id = app.current_session.doc_id
app.session_mgr.delete(doc_id)
app.new_session()
break
elif key in (ord("n"), ord("N"), 27):
break
del win
stdscr.touchwin()
stdscr.refresh()

View File

@ -3,8 +3,10 @@
# lalu membaca state dari `app` untuk menggambar di layar.
import curses
import json
import os
import textwrap
import config
# -- Color pair IDs (id 1-9, id 0 = default curses) --
C_HEADER = 1 # header bar: biru
@ -21,6 +23,8 @@ C_INPUT_BORDER = 9 # border input box: biru
C_STATUS_INFO = 12 # status info (workspace/hints): putih
C_HINT_DISABLED = 13 # hint disabled (abu-abu)
C_WELCOME = 14 # welcome art: light blue
C_TOOL_CALL = 15 # tool call: kuning terang
C_TOOL_RESULT = 16 # tool result: magenta muda
def init_colors():
@ -40,6 +44,8 @@ def init_colors():
curses.init_pair(C_INPUT_BORDER, curses.COLOR_BLUE, -1)
curses.init_pair(C_HINT_DISABLED, 8, -1) # abu-abu di atas bg default
curses.init_pair(C_WELCOME, curses.COLOR_BLUE + 8, -1) # light blue
curses.init_pair(C_TOOL_CALL, curses.COLOR_YELLOW + 8, -1) # bright yellow
curses.init_pair(C_TOOL_RESULT, curses.COLOR_MAGENTA + 8, -1) # bright magenta
def draw(app, stdscr):
@ -74,67 +80,124 @@ def draw_chat(app, stdscr):
if chat_h <= 0:
return
# Render log ke list of (color, text) agar scroll calculation akurat
# Setiap baris di-wrap sesuai lebar terminal
rendered = []
# Render log ke list of rows; setiap row = list of (color, text) segments.
# Ini memungkinkan satu baris punya multi-warna (misal label tool_call).
# Setiap baris di-wrap sesuai lebar terminal.
rendered = [] # list of list of (color, text)
def _add_row(segments):
# segments: list of (color, text)
rendered.append(segments)
def _add_blank():
rendered.append([(None, "")])
def _wrap_render(text, indent=0, color=C_INPUT):
available = w - indent - 1 # sisakan 1 kolom margin kanan
if available <= 0:
rendered.append((color, " " * indent))
_add_row([(color, " " * indent)])
return
for line in text.split("\n"):
if not line:
rendered.append((color, " " * indent))
_add_row([(color, " " * indent)])
continue
start = 0
while start < len(line):
chunk = line[start:start + available]
rendered.append((color, " " * indent + chunk))
_add_row([(color, " " * indent + chunk)])
start += available
for idx, item in enumerate(app.log):
role, text = item["role"], item["text"]
if role == "sep":
rendered.append((None, ""))
rendered.append((None, ""))
_add_blank()
_add_blank()
continue
# Tambah blank line sebelum system log setelah user/ai response
if role == "system" and idx > 0 and app.log[idx - 1]["role"] in ("user", "ai"):
rendered.append((None, ""))
_add_blank()
# Tambah blank line sebelum ai response setelah user (langsung, tanpa tools)
if role == "ai" and idx > 0 and app.log[idx - 1]["role"] in ("user", "ai"):
rendered.append((None, ""))
_add_blank()
# Tambah blank line sebelum ai response setelah system log
if role == "ai" and idx > 0 and app.log[idx - 1]["role"] == "system":
rendered.append((None, ""))
_add_blank()
if role == "user":
model_info = item.get("model_info", None)
if model_info:
p_name, m_name = model_info
else:
p_name = config.resolve_provider(app.llm.base_url, app.llm.model)
m_name = app.llm.model
if p_name:
info_line = f" {p_name} - {m_name} "
else:
info_line = f" {m_name} "
_add_row([(C_USER, info_line)])
label = f" You ({item['time']}) "
rendered.append((C_USER, label))
_add_row([(C_USER, label)])
_wrap_render(text, indent=1, color=C_INPUT)
elif role == "ai":
label = f" Hendrik ({item['time']}) "
rendered.append((C_AI, label))
_add_row([(C_AI, label)])
_wrap_render(text, indent=1, color=C_INPUT)
elif role == "system":
lines = text.split("\n")
rendered.append((C_SYSTEM, lines[0]))
_add_row([(C_SYSTEM, lines[0])])
for line in lines[1:]:
rendered.append((C_SYSTEM, " " + line))
_add_row([(C_SYSTEM, " " + line)])
elif role == "welcome":
lines = text.split("\n")
for line in lines:
rendered.append((C_WELCOME, " " + line))
_add_row([(C_WELCOME, " " + line)])
elif role == "tool_call":
# Format:
# (blank line)
# Hendrik run_bash (HH:MM) ← "Hendrik" hijau, "run_bash (HH:MM)" kuning
# { ← arguments indent 1 spasi, kuning
# "command": "ls -la"
# }
# (blank line)
# Blank line di atas (kecuali sebelumnya sudah blank dari role lain)
if idx > 0 and app.log[idx - 1]["role"] not in ("sep", "welcome"):
_add_blank()
try:
tc = json.loads(text)
tname = tc["name"]
targs_raw = tc["arguments"]
# Pretty-print arguments
try:
targs = json.loads(targs_raw) if isinstance(targs_raw, str) else targs_raw
args_str = json.dumps(targs, indent=2, ensure_ascii=False)
except Exception:
args_str = str(targs_raw)
# Label: "Hendrik" hijau + "tool_name" kuning + "(HH:MM)" hijau
_add_row([
(C_AI, " Hendrik "),
(C_TOOL_CALL, tname),
(C_AI, f" ({item['time']}) "),
])
for aline in args_str.split("\n"):
_add_row([(C_INPUT, " " + aline)])
except Exception:
_add_row([
(C_AI, " Hendrik "),
(C_TOOL_CALL, "unknown"),
(C_AI, f" ({item['time']}) "),
])
# Blank line di bawah
_add_blank()
elif role == "error":
label = " \u2717 "
lines = text.split("\n")
rendered.append((C_ERROR, label + (lines[0] if lines else "")))
_add_row([(C_ERROR, label + (lines[0] if lines else ""))])
for line in lines[1:]:
rendered.append((C_ERROR, " " + line))
_add_row([(C_ERROR, " " + line)])
# Clamp scroll agar tidak melebihi total baris
total = len(rendered)
@ -153,14 +216,21 @@ def draw_chat(app, stdscr):
y = chat_top
for i in range(app.scroll, min(app.scroll + chat_h, total)):
color, text = rendered[i]
attr = curses.color_pair(color) | curses.A_BOLD if color else curses.A_NORMAL
if len(text) > w:
text = text[:w]
try:
stdscr.addstr(y, 0, text, attr)
except curses.error:
pass
segments = rendered[i]
x = 0
for color, text in segments:
if not text:
continue
attr = curses.color_pair(color) | curses.A_BOLD if color else curses.A_NORMAL
remaining = w - x
if remaining <= 0:
break
display = text[:remaining]
try:
stdscr.addstr(y, x, display, attr)
except curses.error:
pass
x += len(display)
y += 1
@ -263,27 +333,46 @@ def draw_input(app, stdscr):
def draw_status(app, stdscr):
# Status bar di baris h-9: workspace, mode (READY/PROCESSING), shortcut hints
# Status bar di baris h-9: [session] workspace, mode (READY/PROCESSING), shortcut hints
h, w = app.h, app.w
y = h - 9
ws = os.getcwd()
mode = " PROCESSING " if app.processing else " READY "
hints = " ^D:send ^W:workspace ^C:exit "
max_ws = w - len(mode) - len(hints) - 4
if len(ws) > max_ws:
ws = ".." + ws[-(max_ws - 2):]
status = (f" {ws} \u2502{mode}\u2502{hints}").ljust(w)[:w]
session_tag = ""
if app.current_session:
sname = app.current_session.name
if len(sname) > 20:
sname = sname[:17] + "..."
session_tag = f"[{sname}] "
mode = " PROCESSING " if app.processing else " READY "
hints = " ^N:new ^O:open ^R:rename ^D:send ^E:model ^W:ws ^C:exit "
max_left = w - len(mode) - len(hints) - 4
left = session_tag + ws
if len(left) > max_left:
left = ".." + left[-(max_left - 2):]
status = (f" {left} \u2502{mode}\u2502{hints}").ljust(w)[:w]
stdscr.addstr(y, 0, status, curses.color_pair(C_STATUS_INFO))
# Highlight mode dengan warna berbeda
mode_start = len(f" {ws} \u2502")
mode_start = len(f" {left} \u2502")
mode_end = mode_start + len(mode)
mode_attr = curses.color_pair(C_STATUS_READY) if not app.processing else curses.color_pair(C_STATUS_PROC)
stdscr.addstr(y, mode_start, mode, mode_attr | curses.A_BOLD)
# ^D:send — abu-abu saat processing, bold putih saat idle
if app.processing:
stdscr.addstr(y, mode_end + 2, "^D:send", curses.color_pair(C_HINT_DISABLED))
else:
stdscr.addstr(y, mode_end + 2, "^D:send", curses.color_pair(C_STATUS_INFO) | curses.A_BOLD)
# Hint shortcuts di kanan — tombol tertentu abu-abu saat processing
x = mode_end + 2
hints_parts = [
("^N:new", True),
(" ^O:open", True),
(" ^R:rename", True),
(" ^D:send", not app.processing),
(" ^E:model", True),
(" ^W:ws", True),
(" ^C:exit", True),
]
for text, enabled in hints_parts:
attr = curses.color_pair(C_STATUS_INFO) | curses.A_BOLD if enabled else curses.color_pair(C_HINT_DISABLED)
stdscr.addstr(y, x, text, attr)
x += len(text)