Следим за Telegram по-деловому

в 22:11, , рубрики: aiogram, business mode, logger, python, telegram
Следим за Telegram по-деловому - 1

Приветствую!
Сегодня будем писать бота для хранения истории личных сообщений

Безусловно, идея не уникальная: часть людей уже использует неофициальные клиенты, другая – юзерботов (например, на pyrogram)

Тогда что здесь?

Официальный бизнес-режим бота, не нарушающий TOS, работающий 24/7 и за который не сносят аккаунты (!)

Ближе к делу

Вот что мы получим по итогу

Вот что мы получим по итогу

Создание бота (Для новичков)

  1. Зайдите в бота @BotFather

  2. Напишите /newbot и как-нибудь обзовите бота, например "Уведомления"

  3. Придумайте ему @юзернейм, подходят только латинские буквы, цифры и нижнее подчеркивание, обязательно напишите "bot" в конце

  4. Бот готов – скопируйте и сохраните токен

  5. Настройте бота:

    1. Напишите /mybots и выберите своего бота

    2. Edit Bot -> Edit Botpic -> Отправьте аватарку для бота

    3. Back to Bot -> Bot Settings -> Business Mode -> Turn On

Написание бота

Стек: python 3.12, aiogram, redis, docker
(Ссылка на GitHub репозиторий внизу статьи)

Для начала придумываем файловую структуру, что-то типа:

.
├── main.py              # Запуск бота
├── pyproject.toml       # Управление зависимостями Poetry
├── Dockerfile           # Инструкции для сборки Docker образа
├── docker-compose.yml   # Конфигурация Docker контейнеров (бот + redis)
├── .env                 # Переменные окружения (токен, настройки)
└── src/
    ├── bot.py           # Основная логика бота, хендлеры
    ├── keyboards.py     # Клавиатуры и коллбэки
    └── settings.py      # Pydantic парсер .env файла

Теперь нужно установить библиотеки:

poetry add aiogram pydantic pydantic_settings loguru redis

pyproject.toml должен получится таким:

[tool.poetry]
name = "business-bot"
version = "0.1.0"
description = ""
authors = []
readme = "README.md"

[tool.poetry.dependencies]
python = "^3.12"
aiogram = "^3.13.1"
loguru = "^0.7.2"
redis = "^5.2.0"
pydantic = "^2.9.2"
pydantic-settings = "^2.6.0"

[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"

Далее прописываем конфиг в settings.py, используя pydantic_settings, который парсит .env:

  • REDIS_HOST, REDIS_PORT, REDIS_PASSWORD по умолчанию устанавливаются в docker-compose.yml, но они нужны, если Вы будете запускать бота вне контейнера, используя локальный/облачный редис

from typing import Optional

from pydantic import SecretStr
from pydantic_settings import BaseSettings, SettingsConfigDict


class Environment(BaseSettings):
    model_config = SettingsConfigDict(env_file=".env", env_file_encoding="utf-8")


class Redis(Environment):
    REDIS_HOST: str = "localhost"
    REDIS_PORT: int = 6379
    REDIS_PASSWORD: Optional[SecretStr] = None


class Bot(Environment):
    TOKEN: SecretStr = SecretStr("7285627548:FaGPjgAJFU8G524HE38hubGrgTcQcrpmbyc")


class Settings(Bot, Redis):
    USER_ID: int = 0


settings = Settings()

И соответственно настраиваем .env файл:

  • Токен бота ранее доставали из Bot Father

  • Свой ID можно получить с помощью @getmyid_bot

TOKEN="7285627548:FaGPjgAJFU8G524HE38hubGrgTcQcrpmbyc"
USER_ID=123456789

Прописываем переменные проекта в bot.py:

from aiogram import Bot, Dispatcher
from redis.asyncio import Redis

from .settings import settings

bot = Bot(token=settings.TOKEN.get_secret_value())
dp = Dispatcher()

redis = Redis(
    host=settings.REDIS_HOST,
    port=settings.REDIS_PORT,
    password=(
        settings.REDIS_PASSWORD.get_secret_value() 
        if settings.REDIS_PASSWORD else None
    ),
)

и используем их в main.py для запуска:

from loguru import logger
from src.bot import dp, bot

if __name__ == "__main__":
    logger.info("Starting...")
    dp.run_polling(
        bot,
        allowed_updates=[
            "callback_query",
            "business_message",
            "edited_business_message",
            "deleted_business_messages",
        ],
    )

Важно! aiogram по умолчанию слушает только следующие хэндлеры: message, callback_query, errors. Остальные он автоматически не парсит и их нужно добавлять вручную через allowed_updates.

В файле bot.py сохраняем все новые сообщения в Redis с авто-удалением через 3 недели:

EX_TIME = 60 * 60 * 24 * 21


async def set_message(message: types.Message):
    await redis.set(
        f"{message.chat.id}:{message.message_id}",
        message.model_dump_json(),
        ex=EX_TIME,
    )


@dp.business_message()
async def message(message: types.Message):
    await set_message(message)

Теперь нужно как-то красиво сообщать об изменении статуса сообщений. Лучшим вариантом я считаю добавление inline клавиатуры, так не будет редактироваться исходное сообщение.
Первой кнопкой будет статус сообщения – ✏️ или 🗑️
Второй – ссылка на профиль отправителя вида tg://user?id=0
И третьей – кнопка для закрытия уведомления

Для этого прописываем markup и callbacks в файле keyboards.py:

from enum import StrEnum
from aiogram.utils.keyboard import InlineKeyboardBuilder


class Callbacks(StrEnum):
    EMPTY = "empty"
    CLOSE = "close"


def link_markup(title: str, user_id: int):
    builder = InlineKeyboardBuilder()
    builder.button(text=title, callback_data=Callbacks.EMPTY)
    builder.button(text="👤", url=f"tg://user?id={user_id}")
    builder.button(text="❌", callback_data=Callbacks.CLOSE)
    return builder.adjust(3).as_markup()

В примере выше я использую StrEnum для константных callback_data. На моей практике, импортировать класс с енамами и в следствии использовать их с F.data гораздо удобнее, чем в остальных вариантах, например, с глобальными переменными.

Отправляем уведомление при изменении сообщения (здесь создается копия сообщения на основе json строки из Redis):

@dp.edited_business_message()
async def edited_message(message: types.Message):
    model_dump = await redis.get(f"{message.chat.id}:{message.message_id}")
    await set_message(message)

    if not model_dump:
        return

    original_message = types.Message.model_validate_json(model_dump)
    if not original_message.from_user:
        return

    await original_message.send_copy(
        chat_id=settings.USER_ID,
        reply_markup=link_markup("✏️", original_message.from_user.id),
    ).as_(bot)

И аналогично отправляем уведомления при удалении сообщений:

  • Одним конвейером (pipeline) достаем все удаленные сообщения по их ID и отправляем сохраненные копии

  • Если сообщений слишком много, мы можем столкнуться с flood wait от телеграма, но все удаленные сообщения должны гарантировано доставиться, поэтому дожидаемся его окончания

  • И в конце очищаем хранилище от удаленных сообщений

async def copy_message(message: types.Message):
    await message.send_copy(
        chat_id=settings.USER_ID,
    ).as_(bot)


@dp.deleted_business_messages()
async def deleted_message(business_messages: types.BusinessMessagesDeleted):
    pipe = redis.pipeline()
    for message_id in business_messages.message_ids:
        pipe.get(f"{business_messages.chat.id}:{message_id}")
    messages_data = await pipe.execute()

    keys_to_delete = []
    for message_id, model_dump in zip(business_messages.message_ids, messages_data):
        if not model_dump:
            continue

        original_message = types.Message.model_validate_json(model_dump)
        if not original_message.from_user:
            continue

        send_copy = original_message.send_copy(
            chat_id=settings.USER_ID,
            reply_markup=link_markup("🗑️", original_message.from_user.id),
        ).as_(bot)

        try:
            await send_copy
        except exceptions.TelegramRetryAfter as exp:
            logger.warning(f"Retry after {exp.retry_after} seconds")
            await asyncio.sleep(exp.retry_after + 0.1)
            await send_copy
        finally:
            await asyncio.sleep(0.1)

        keys_to_delete.append(f"{business_messages.chat.id}:{message_id}")

    if keys_to_delete:
        await redis.delete(*keys_to_delete)

Еще нужно добавить примитивную логику для inline кнопок:

@dp.callback_query(F.data == Callbacks.EMPTY)
async def empty(query: types.CallbackQuery):
    await query.answer()


@dp.callback_query(F.data == Callbacks.CLOSE)
async def close(query: types.CallbackQuery):
    await query.answer()
    if isinstance(query.message, types.Message):
        await query.message.delete()

Полный файл bot.py:

import asyncio
from aiogram import F, Bot, Dispatcher, types, exceptions
from loguru import logger
from redis.asyncio import Redis

from .settings import settings
from .keyboards import link_markup, Callbacks

bot = Bot(token=settings.TOKEN.get_secret_value())
dp = Dispatcher()

redis = Redis(
    host=settings.REDIS_HOST,
    port=settings.REDIS_PORT,
    password=(
        settings.REDIS_PASSWORD.get_secret_value() if settings.REDIS_PASSWORD else None
    ),
)

EX_TIME = 60 * 60 * 24 * 21


async def set_message(message: types.Message):
    await redis.set(
        f"{message.chat.id}:{message.message_id}",
        message.model_dump_json(),
        ex=EX_TIME,
    )


@dp.business_message()
async def message(message: types.Message):
    await set_message(message)


@dp.edited_business_message()
async def edited_message(message: types.Message):
    model_dump = await redis.get(f"{message.chat.id}:{message.message_id}")
    await set_message(message)

    if not model_dump:
        return

    original_message = types.Message.model_validate_json(model_dump)
    if not original_message.from_user:
        return

    await original_message.send_copy(
        chat_id=settings.USER_ID,
        reply_markup=link_markup("✏️", original_message.from_user.id),
    ).as_(bot)


async def copy_message(message: types.Message):
    await message.send_copy(
        chat_id=settings.USER_ID,
    ).as_(bot)


@dp.deleted_business_messages()
async def deleted_message(business_messages: types.BusinessMessagesDeleted):
    pipe = redis.pipeline()
    for message_id in business_messages.message_ids:
        pipe.get(f"{business_messages.chat.id}:{message_id}")
    messages_data = await pipe.execute()

    keys_to_delete = []
    for message_id, model_dump in zip(business_messages.message_ids, messages_data):
        if not model_dump:
            continue

        original_message = types.Message.model_validate_json(model_dump)
        if not original_message.from_user:
            continue

        send_copy = original_message.send_copy(
            chat_id=settings.USER_ID,
            reply_markup=link_markup("🗑️", original_message.from_user.id),
        ).as_(bot)

        try:
            await send_copy
        except exceptions.TelegramRetryAfter as exp:
            logger.warning(f"Retry after {exp.retry_after} seconds")
            await asyncio.sleep(exp.retry_after + 0.1)
            await send_copy
        finally:
            await asyncio.sleep(0.1)

        keys_to_delete.append(f"{business_messages.chat.id}:{message_id}")

    if keys_to_delete:
        await redis.delete(*keys_to_delete)


@dp.callback_query(F.data == Callbacks.EMPTY)
async def empty(query: types.CallbackQuery):
    await query.answer()


@dp.callback_query(F.data == Callbacks.CLOSE)
async def close(query: types.CallbackQuery):
    await query.answer()
    if isinstance(query.message, types.Message):
        await query.message.delete()

Почти закончили!
Осталось обернуть все в контейнер:

Dockerfile

FROM python:3.12-slim

RUN pip install poetry

WORKDIR /app

# Копируем и устанавливаем зависимости
COPY pyproject.toml poetry.lock ./
RUN poetry config virtualenvs.create false 
  && poetry install --no-interaction --no-ansi --no-root

# Копируем файлы проекта и переменные окружения
COPY . .
COPY .env .env

EXPOSE 80

CMD ["python3", "main.py"]

docker-compose.yml

services:
  redis:
    image: redis:alpine
    command: redis-server --requirepass ${REDIS_PASSWORD:-password} --save 60 1
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 5s
      timeout: 3s
      retries: 3
    ports:
      - "6380:6379"
    volumes:
      - redis_data:/data

  web:
    build: .
    restart: unless-stopped
    ports:
      - "8081:80"
    depends_on:
      redis:
        condition: service_healthy
    environment:
      REDIS_HOST: redis
      REDIS_PORT: 6379
      REDIS_PASSWORD: ${REDIS_PASSWORD:-password}

volumes:
  redis_data:

Вот теперь готово!

Билдим бота и проверяем, что все работает:

docker-compose up --build

Исходный код

Это первая статья на Хабре, буду рад любой конструктивной критике!

Автор: AidenDev

Источник

* - обязательные к заполнению поля


https://ajax.googleapis.com/ajax/libs/jquery/3.4.1/jquery.min.js