Telegram bot jako monitoring — alerty a příkazy pro živý systém

Máš systém, co běží 24/7. Scrapery, schedulery, pipeline. Všechno jede v Dockeru, všechno je automatizované. Krásný.
Ale pak v noci selže scraper. Databáze se na chvíli odpojí. Pipeline se zasekne. A ty se to dozvíš ráno, když otevřeš dashboard a vidíš červené čtverečky.
Řešení? Telegram bot, který ti pošle push notifikaci na telefon do sekund po problému. A jako bonus — můžeš mu přes zprávy ve skupině posílat příkazy zpátky.
30 minut práce. Nula závislostí. Nula nákladů.
Proč Telegram a ne email/Slack?
Push notifikace okamžitě. Email skončí ve spamu nebo ho přečteš za hodinu. Telegram vibrace na telefonu probudí i v noci (pokud chceš).
Zero infrastructure. Nepotřebuješ webhook server, SMTP relay, ani nic hostovat. Stačí jeden HTTP POST na Telegram API.
Skupinový chat. Přidáš bota do skupiny → celý tým vidí stejné alerty. Žádné přeposílání, žádné "viděl jsi ten email?"
Zdarma. Telegram Bot API nemá limity, které by tě v normálním provozu omezovaly. Žádný pricing tier, žádné "free plan — max 100 zpráv."
Krok 1: Vytvoření bota přes BotFather
Otevři Telegram a najdi @BotFather (ověřený účet s modrým checkmarkem).
/newbot
BotFather se zeptá na jméno a username. Jméno může být cokoliv ("Můj Monitoring Bot"), username musí končit na bot a být unikátní (muj_system_alert_bot).
Dostaneš token — dlouhý string typu:
7123456789:AAF1xYz2AbC3dEf4GhI5jKl6MnO7pQrStUv
Tohle je tvůj API klíč. Nikam ho necommituj. Uložit do .env.
Krok 2: Přidat bota do skupiny a zjistit Chat ID
- Vytvoř Telegram skupinu (nebo použij existující)
- Přidej bota jako člena skupiny
- Pošli do skupiny libovolnou zprávu
- Otevři v prohlížeči:
https://api.telegram.org/bot<TVŮJ_TOKEN>/getUpdates
V JSON odpovědi najdeš chat.id — záporné číslo typu -1001234567890. To je ID tvé skupiny.
{
"result": [{
"message": {
"chat": {
"id": -1001234567890,
"title": "System Alerts",
"type": "supergroup"
}
}
}]
}
Tip: Pokud je result prázdný, pošli do skupiny ještě jednu zprávu a refreshni URL.
Krok 3: Python modul — zero dependencies
Tohle je celý modul. Žádný pip install, žádný python-telegram-bot, žádný aiogram. Jen standardní knihovna:
# src/utils/telegram.py
from urllib.request import Request, urlopen
from urllib.error import URLError
import json
import os
import logging
logger = logging.getLogger(__name__)
BOT_TOKEN = os.environ.get("TELEGRAM_BOT_TOKEN", "")
CHAT_ID = os.environ.get("TELEGRAM_CHAT_ID", "")
API_URL = "https://api.telegram.org/bot{token}/{method}"
def send_alert(message: str, level: str = "info") -> bool:
"""Pošle zprávu do Telegram skupiny. Vrací True při úspěchu."""
if not BOT_TOKEN or not CHAT_ID:
logger.warning("Telegram není nakonfigurovaný, přeskakuji alert")
return False
prefix = {
"critical": "🚨",
"warning": "⚠️",
"success": "✅",
"info": "ℹ️",
}.get(level, "")
full_message = f"{prefix} {message}" if prefix else message
payload = json.dumps({
"chat_id": CHAT_ID,
"text": full_message,
"parse_mode": "HTML",
}).encode("utf-8")
url = API_URL.format(token=BOT_TOKEN, method="sendMessage")
req = Request(url, data=payload, headers={"Content-Type": "application/json"})
try:
with urlopen(req, timeout=10) as resp:
return resp.status == 200
except (URLError, TimeoutError) as e:
logger.error(f"Telegram alert selhal: {e}")
return False
Proč takhle?
Žádná knihovna. python-telegram-bot má 15+ závislostí a řeší polling, handlery, konverzace — to všechno nepotřebujeme pro odesílání zpráv. urllib je ve standardní knihovně a pro jeden POST request stačí.
HTML parse_mode. Telegram podporuje <b>, <i>, <code>, <pre> tagy. Formátování alertů vypadá profesionálně bez nutnosti Markdown escape hell.
Graceful degradace. Když Telegram API neodpoví, systém běží dál. Alert se zaloguje jako warning, ale nic nespadne. Monitoring nesmí nikdy shodit to, co monitoruje.
Env vars. Token a Chat ID v .env souboru, nikdy v kódu. V Dockeru se předávají přes docker-compose.yml.
Krok 4: Napojení na trigger pointy
Teď máš send_alert() — zbývá ho zavolat na správných místech. Tady jsou tři klíčové:
a) Selhání po vyčerpání retries
Tvůj scheduler má retry logiku s exponential backoff. Když úloha selže i po posledním pokusu:
# scheduler.py — po vyčerpání retries
MAX_RETRIES = 3
for attempt in range(MAX_RETRIES):
try:
result = run_source(source)
break
except Exception as e:
if attempt == MAX_RETRIES - 1:
send_alert(
f"<b>{source.name}</b> selhal po {MAX_RETRIES} pokusech!\n"
f"Poslední chyba: <code>{str(e)[:200]}</code>",
level="critical"
)
else:
wait = 60 * (2 ** attempt) # 60s → 120s → 240s
time.sleep(wait)
b) Databáze nedostupná
Connection pool detekuje, že se nedá připojit k PostgreSQL:
# db.py — connection pool
try:
conn = pool.getconn()
except OperationalError as e:
send_alert(
f"<b>DB nedostupná!</b>\n"
f"Pool: {pool.closed}/{pool.maxconn} spojení\n"
f"<code>{str(e)[:200]}</code>",
level="critical"
)
c) Ranní report — denní souhrn
Scheduler spustí report každý den v 9:00. Report se generuje z databáze a vypadá takhle:
ℹ️ Ranní report — 06.04.2026
Scraping (včera):
✅ Zdroj A (1s)
✅ Zdroj B (158s)
❌ Zdroj C — timeout po 300s
Nové záznamy: 237
Schváleno: 12 | K posouzení: 180 | Zamítnuto: 45
DB: 4190 firem, 392 leadů, 5012 kontaktů
⚠️ Zaseklé checkpointy: 1
Kód pro generování:
# daily_report.py
from datetime import date, timedelta
def generate_daily_report(db) -> str:
yesterday = date.today() - timedelta(days=1)
# Výsledky scraperů
runs = db.fetch_all(
"SELECT source, status, duration_s, error "
"FROM scheduler_runs WHERE date = %s",
(yesterday,)
)
lines = [f"<b>Ranní report — {date.today().strftime('%d.%m.%Y')}</b>\n"]
lines.append("<b>Scraping (včera):</b>")
for run in runs:
if run["status"] == "ok":
lines.append(f" ✅ {run['source']} ({run['duration_s']}s)")
else:
lines.append(f" ❌ {run['source']} — {run['error'][:50]}")
# Statistiky nových záznamů
stats = db.fetch_one(
"SELECT count(*) as total, "
"sum(case when status='approved' then 1 else 0 end) as approved, "
"sum(case when status='pending' then 1 else 0 end) as pending, "
"sum(case when status='rejected' then 1 else 0 end) as rejected "
"FROM leads WHERE created_at::date = %s",
(yesterday,)
)
lines.append(f"\n<b>Nové záznamy:</b> {stats['total']}")
lines.append(
f" Schváleno: {stats['approved']} | "
f"K posouzení: {stats['pending']} | "
f"Zamítnuto: {stats['rejected']}"
)
return "\n".join(lines)
# V scheduleru:
report = generate_daily_report(db)
send_alert(report, level="info")
Krok 5: Interaktivní příkazy — bot poslouchá zpět
Dosud bot jen posílá. Teď ho naučíme poslouchat. Princip je jednoduchý — long polling přes getUpdates endpoint:
# src/bot/command_handler.py
from urllib.request import Request, urlopen
import json
import os
import time
import logging
logger = logging.getLogger(__name__)
BOT_TOKEN = os.environ.get("TELEGRAM_BOT_TOKEN", "")
CHAT_ID = os.environ.get("TELEGRAM_CHAT_ID", "")
API_URL = "https://api.telegram.org/bot{token}/{method}"
ALLOWED_CHAT_IDS = {CHAT_ID} # whitelist — jen tvoje skupina
def get_updates(offset: int = 0, timeout: int = 30) -> list:
"""Long polling — čeká na nové zprávy."""
url = API_URL.format(token=BOT_TOKEN, method="getUpdates")
params = json.dumps({
"offset": offset,
"timeout": timeout,
"allowed_updates": ["message"],
}).encode("utf-8")
req = Request(url, data=params, headers={"Content-Type": "application/json"})
with urlopen(req, timeout=timeout + 10) as resp:
data = json.loads(resp.read())
return data.get("result", [])
def send_reply(chat_id: str, text: str) -> None:
"""Odešle odpověď do chatu."""
url = API_URL.format(token=BOT_TOKEN, method="sendMessage")
payload = json.dumps({
"chat_id": chat_id,
"text": text,
"parse_mode": "HTML",
}).encode("utf-8")
req = Request(url, data=payload, headers={"Content-Type": "application/json"})
urlopen(req, timeout=10)
Příkazy
# Handlery pro jednotlivé příkazy
def handle_status(db) -> str:
"""Aktuální stav systému."""
last_runs = db.fetch_all(
"SELECT source, status, finished_at "
"FROM scheduler_runs "
"ORDER BY finished_at DESC LIMIT 5"
)
lines = ["<b>Status systému</b>\n"]
for run in last_runs:
icon = "✅" if run["status"] == "ok" else "❌"
ago = humanize_time(run["finished_at"])
lines.append(f"{icon} {run['source']} — {ago}")
return "\n".join(lines)
def handle_health(db) -> str:
"""Zdraví databáze a systému."""
db_size = db.fetch_one("SELECT pg_database_size(current_database()) as size")
conn_count = db.fetch_one("SELECT count(*) as c FROM pg_stat_activity")
return (
f"<b>Health check</b>\n\n"
f"DB velikost: {db_size['size'] // 1024 // 1024} MB\n"
f"Aktivní spojení: {conn_count['c']}\n"
f"Pool: OK\n"
f"Uptime scheduleru: {get_uptime()}"
)
def handle_run(source_name: str, scheduler) -> str:
"""Manuální spuštění úlohy."""
available = scheduler.get_sources()
if source_name not in available:
return (
f"Neznámý zdroj: <code>{source_name}</code>\n"
f"Dostupné: {', '.join(available)}"
)
scheduler.trigger(source_name)
return f"⚡ Spouštím <b>{source_name}</b>..."
Polling loop
# src/bot/polling.py
import threading
COMMANDS = {
"/status": lambda db, _: handle_status(db),
"/health": lambda db, _: handle_health(db),
"/run": lambda db, args: handle_run(args, scheduler),
"/help": lambda *_: (
"<b>Dostupné příkazy:</b>\n\n"
"/status — poslední běhy\n"
"/health — zdraví systému\n"
"/run <zdroj> — spustit úlohu\n"
"/help — tento seznam"
),
}
def polling_loop(db, scheduler):
"""Hlavní polling loop — běží ve vlastním vlákně."""
offset = 0
while True:
try:
updates = get_updates(offset=offset)
for update in updates:
offset = update["update_id"] + 1
msg = update.get("message", {})
chat_id = str(msg.get("chat", {}).get("id", ""))
text = msg.get("text", "").strip()
# Bezpečnost: jen whitelistované skupiny
if chat_id not in ALLOWED_CHAT_IDS:
logger.warning(f"Ignoruji zprávu z neznámého chatu: {chat_id}")
continue
# Parsování příkazu
parts = text.split(maxsplit=1)
command = parts[0].lower() if parts else ""
args = parts[1] if len(parts) > 1 else ""
# Odstraní @username suffix (/status@muj_bot → /status)
command = command.split("@")[0]
if command in COMMANDS:
reply = COMMANDS[command](db, args)
send_reply(chat_id, reply)
except Exception as e:
logger.error(f"Polling error: {e}")
time.sleep(5)
def start_bot(db, scheduler):
"""Spustí polling v daemon vlákně."""
thread = threading.Thread(
target=polling_loop,
args=(db, scheduler),
daemon=True,
)
thread.start()
logger.info("Telegram bot polling spuštěn")
Bezpečnost interaktivních příkazů
Tohle je důležité — bot, co přijímá příkazy, musí mít alespoň základní ochranu:
Whitelist Chat ID. Bot reaguje jen na zprávy z tvé skupiny. Kdokoliv jiný je ignorován.
Logování. Každý přijatý příkaz zaloguj — kdo, kdy, co. Až se budeš divit proč se spustil scraper v 3 ráno, podíváš se do logů.
Žádné destruktivní operace. /run spustí úlohu, ale neexistuje /delete nebo /restart. Telegram skupina není správné místo pro destruktivní příkazy.
# Logování příkazů
logger.info(f"Příkaz: {command} {args} od chat_id={chat_id}")
Docker integrace
# docker-compose.yml
services:
scheduler:
build: ./scheduler
env_file: .env
environment:
TELEGRAM_BOT_TOKEN: ${TELEGRAM_BOT_TOKEN}
TELEGRAM_CHAT_ID: ${TELEGRAM_CHAT_ID}
restart: unless-stopped
# .env
TELEGRAM_BOT_TOKEN=7123456789:AAF1xYz2AbC3dEf4GhI5jKl6MnO7pQrStUv
TELEGRAM_CHAT_ID=-1001234567890
Spuštění polling loopu v scheduleru:
# main.py
from bot.polling import start_bot
# Spustí bot polling jako daemon vlákno
start_bot(db, scheduler)
# Scheduler pokračuje v normálním provozu
scheduler.run()
Co jsme záměrně vynechali
Rate limiting na alerty. Při kaskádovém selhání může přijít 10 zpráv za minutu. Zatím to nevadí — vidíš, že hoří, a to je přesně co chceš. Až budeš mít 50 zdrojů, přidáš deduplikaci.
Webhook místo pollingu. Webhook vyžaduje veřejně dostupný HTTPS endpoint. Polling je jednodušší, spolehlivější a pro naše objemy (desítky zpráv denně) naprosto dostačující.
Tlačítka a inline klávesnice. Telegram umí inline buttons, reply keyboards a callback queries. Pro monitoring je to overkill — textové příkazy stačí.
Výsledek
| Co | Hodnota | |---|---| | Čas implementace | ~30 minut | | Nové závislosti | 0 | | Měsíční náklady | 0 Kč | | Latence alertu | < 2 sekundy |
Operátor dostane push notifikaci na telefon do sekund po selhání. Ranní report dá přehled bez nutnosti otevírat dashboard. A když potřebuješ ručně spustit scraper, napíšeš /run zdroj do skupiny místo SSH na server.
Telegram Bot API je nejjednodušší monitoring, co jsem kdy implementoval. Žádný setup, žádná registrace, žádný pricing. Jeden Python soubor, dva env vars, a máš push monitoring na telefonu.