Недавно ко мне обратился друг с просьбой написать бота, импортирующего новости из RSS-канала на сайте в Telegram-канал. Огромнейшим плюсом данного способа оповещения являются push-уведомления, которые приходят каждому подписанному пользователю на его устройство. Уже давно хотелось заняться чем-то подобным. Недолго думая, в качестве образца я выбрал канал Хабра telegram.me/habr_ru. В качестве языка программирования был выбран Python.
В итоге, мне надо было решить следующие проблемы:
- Парсинг RSS.
- Одним из условий был отложенный постинг сообщений (если после того, как новость была выложена, в течение n часов её скрыли/удалили/переименовали, то она не должна быть опубликована, вместо нее отправляется оповещение о корректной новости)
- Постинг сообщений в телеграм.
- Сокращение целевой ссылки с помощью сервиса bit.ly
От себя добавил еще:
- Ведение логов с помощью библиотеки (logging).
- Обработка конфига (configparser).
1. Отложенный постинг сообщений
Для решения данной проблемы было принято решение использовать SQLite базу данных. Для работы с БД использовалась библиотека SQLalchemy.
Структура до банального проста — всего одна таблица. Код объекта представлен ниже:
class News(Base):
__tablename__ = 'news'
id = Column(Integer, primary_key=True) # Порядковый номер новости
text = Column(String) # Текст (Заголовок), который будет отправлен в сообщении
link = Column(String) # Ссылка на статью на сайте. Так же отправляется в сообщении
date = Column(Integer)
# Дата появления новости на сайте. Носит Чисто информационный характер. UNIX_TIME.
publish = Column(Integer)
# Планируемая дата публикации. Сообщение будет отправлено НЕ РАНЬШЕ этой даты. UNIX_TIME.
chat_id = Column(Integer)
# Информационный столбец. В данное поле логируется чат, в который было отправлено сообщение
message_id = Column(Integer)
# Информационный столбец. В данный столбец логирует внутренний идентификатор сообщения в канале.
def __init__(self, text, link, date, publish=0,chat_id=0,message_id=0):
self.link = link
self.text = text
self.date = date
self.publish = publish
self.chat_id = chat_id
self.message_id = message_id
def _keys(self):
return (self.text, self.link)
def __eq__(self, other):
return self._keys() == other._keys()
def __hash__(self):
return hash(self._keys())
def __repr__(self):
return "<News ('%s','%s', %s)>" % (base64.b64decode(self.text).decode(),
base64.b64decode(self.link).decode(),
datetime.fromtimestamp(self.publish))
# Для зрительного восприятия данные декодируются
Для хранения текстовой информации и ссылок использется base64, форматом хранения даты-времени был выбран Unix Timestamp.
Обработка данных сессии осуществляется отдельным классом.
Base = declarative_base()
class database:
"""
Класс для обработки сессии SQLAlchemy.
Также включает в себя минимальный набор методов, вызываемых в управляющем классе.
Названия методов говорящие.
"""
def __init__(self, obj):
engine = create_engine(obj, echo=False)
Session = sessionmaker(bind=engine)
self.session = Session()
def add_news(self, news):
self.session.add(news)
self.session.commit()
def get_post_without_message_id(self):
return self.session.query(News).filter(and_(News.message_id == 0,
News.publish<=int(time.mktime(time.localtime())))).all()
def update(self, link, chat, msg_id):
self.session.query(News).filter_by(link = link).update({"chat_id":chat, "message_id":msg_id})
self.session.commit()
def find_link(self,link):
if self.session.query(News).filter_by(link = link).first(): return True
else: return False
При обнаружении новости, она добавляется в базу. Сразу же задается время публикации.
Для обнаружения новостей готовых к публикации используется метод get_post_withwithout_message_id
. Фактически, мы выбираем из базы все посты, у которых message_id=0
и дата публикации меньше текущего времени.
Для проверки на новизну отправляем запрос базе данных на факт содержания ссылки на новость (метод find_link
).
Метод update
служит для обновления данных, после публикации новости в канале.
2. Парсинг RSS
Стоит признаться, что писать свой RSS парсер совсем не хотелось, поэтому в бой вступила библиотека feedparser.
import feedparser
class source(object):
def __init__(self, link):
self.link = link
self.news = []
self.refresh()
def refresh(self):
data = feedparser.parse(sв качествеelf.link)
self.news = [News(binascii.b2a_base64(i['title'].encode()).decode(),
binascii.b2a_base64(i['link'].encode()).decode(),
int(time.mktime(i['published_parsed']))) for i in data['entries']]
Код до смешного прост. При вызове метода refresh
с помощью генератора формируется список объектов класса News из последних 30 размещенных постов в rss ленте.
3. Сокращение ссылок
Как упоминалось выше, в качестве сервиса был выбран bit.ly. API не вызвает лишних вопросов.
class bitly:
def __init__(self,access_token):
self.access_token = access_token
def short_link(self, long_link):
url = 'https://api-ssl.bitly.com/v3/shorten?access_token=%s&longUrl=%s&format=json'
% (self.access_token, long_link)
try:
return json.loads(urllib.request.urlopen(url).read().decode('utf8'))['data']['url']
except:
return long_link
В инит метод передается только наш access_token. В случае неудачного получения сокращенной ссылки, метод short_link
возвращает переданную ему изначальную ссылку.
4. Управляющий класс
class export_bot:
def __init__(self):
config = configparser.ConfigParser()
config.read('./config')
log_file = config['Export_params']['log_file']
self.pub_pause = int(config['Export_params']['pub_pause'])
self.delay_between_messages = int(config['Export_params']['delay_between_messages'])
logging.basicConfig(format = u'%(filename)s[LINE:%(lineno)d]# %(levelname)-8s
[%(asctime)s] %(message)s',level = logging.INFO, filename = u'%s'%log_file)
self.db = database(config['Database']['Path'])
self.src = source(config['RSS']['link'])
self.chat_id = config['Telegram']['chat']
bot_access_token = config['Telegram']['access_token']
self.bot = telegram.Bot(token=bot_access_token)
self.bit_ly = bitly(config['Bitly']['access_token'])
def detect(self):
#получаем 30 последних постов из rss-канала
self.src.refresh()
news = self.src.news
news.reverse()
#Проверяем на наличие в базе ссылки на новость. Если нет, то добавляем в базу данных с
#отложенной публикацией
for i in news:
if not self.db.find_link(i.link):
now = int(time.mktime(time.localtime()))
i.publish = now + self.pub_pause
logging.info( u'Detect news: %s' % i)
self.db.add_news(i)
def public_posts(self):
#Получаем 30 последних записей из rss канала и новости из БД, у которых message_id=0
posts_from_db = self.db.get_post_without_message_id()
self.src.refresh()
line = [i for i in self.src.news]
#Выбор пересечний этих списков
for_publishing = list(set(line) & set(posts_from_db))
for_publishing.reverse()
print(for_publishing)
#Постинг каждого сообщений
for post in for_publishing:
text = '%s %s' % (base64.b64decode(post.text).decode('utf8'),
self.bit_ly.short_link(base64.b64decode(post.link).decode('utf-8')))
a = self.bot.sendMessage(chat_id=self.chat_id, text=text, parse_mode=telegram.ParseMode.HTML)
message_id = a.message_id
chat_id = a['chat']['id']
self.db.update(post.link, chat_id, message_id)
logging.info( u'Public: %s;%s;' % (post, message_id))
time.sleep(self.delay_between_messages)
При инциализации с помощью библиотеки configparser считываем наш конфиг-файл и настраиваем логгирование.
Чтобы детектировать новости, используем метод detect
. Получаем последние 30 опубликованных постов, поочередно проверяем наличие ссылки в базе данных.
Перед публикацией, необходимо проверить наличие постов, выгруженных из базы данных в rss-канале. В этом нам помогут множества. И после этого уже публикуем новость с помощью библиотеки telegram. Её функционал довольно широк и ориентирован на написание ботов. После публикации необходимо обновить message_id
и chat_id
.
В итоге получаем:
Стоит отметить то, что если переписать класс rss, то так же можно будет импортировать новости из других источников (VK, facebook и т.д.).
Исходники можно найти на Github: https://github.com/Vispiano/rss2telegram
Автор: Vispiano