Всем привет. Я думаю, что некоторые задавались вопросом о том, как написать свой небольшой парсер на Python. В данной статье я хочу рассмотреть достаточно простой случай - это парсинг данных уже с готового датасета stackexchange.com
. В дальнейшем уже можно будет работать с более сложными задачами. По факту, создание парсера можно разделить на два этапа: подготовка и кодинг.
Этап I. Подготовка
Для начала нам нужно создать папку для нашего парсера. В ней будет храниться наш код и файл с распарсенными данными. Лично я буду использовать программу Visual Studio Code, но особой разницы нет.
Добавляете созданную папку в рабочую область вашей программы и в ней создаете файл (например parser.py)
Открываете терминал и проверяете, что у вас выбран корректный интерпретатор и вы находитесь в своей рабочей папке (по дефолту всегда так, но на всякий случай). Дальше в терминале, используя pip
скачиваете библиотеки:
pip install requests
pip install tqdm
Важно чтобы данные библиотеки были установлены в вашу виртуальную среду venv
.
Библиотеки json
и time
являются встроенными в Python. Их устанавливать не нужно.
На этом первый этап закончен.
Этап II. Кодим
Открываем файл и начинаем кодить. Для начала нам нужно импортировать библиотеки:
import requests
import time
import json
from tqdm import tqdm
В данном случае библиотека requests
отправляет запросы используя путь запрошенный пользователем, time
используется для форматирования времени, json
для работы с .json
файлами. Библиотека tqdm
нужна для удобства и красоты использования. Она показывает прогресс бар и примерное оставшееся время.
Дальше мы пишем наши пути к серверу или сайту, откуда будем собирать инфу:
quest_url = "https://api.stackexchange.com/2.3/questions"
ans_url = "https://api.stackexchange.com/2.3/questions/{question_id}/answers"
В данном случае quest_url
идет в место где хранятся вопросы юзеров, ans_url
в место где хранятся ответы на них.
Напишем нашу основную функцию fetch_stackexchange_questions
, которая будет захватывать вопросы:
def fetch_stackexchange_questions(site="stackoverflow", page=1, pagesize=100):
params = {
"order": "desc",
"sort": "activity",
"site": site,
"pagesize": pagesize,
"page": page,
"filter": "withbody"
}
response = requests.get(quest_url, params=params)
if response.status_code == 200:
return response.json()
else:
print(f"Я ошибся {response.status_code}")
return None
В params
у нас хранятся параметры соответственно:
-
site
- сайт из сетиStackExchange
(например "stackoverflow"). -
page
- номер страницы (по умолчанию 1). -
pagesize
- количество запросов на одной странице (по умолчанию 100 и лучше такое количество и использовать, потому что при увеличении количества можете упасть в ошибку 400. Короче лучше 100). -
filter
- наш фильтр (параметрwithbody
позволяет нам захватывать тело вопроса. Да, мы можем захватить не нужную нам инфу, но лучше взять избыточно и потом использовать нужную, чем чего-то недобрать).
Дальше выполняется GET-запрос, и, если статус ответа успешный (код 200), то данные возвращаются в формате JSON. Иначе выводится сообщение об ошибке по типу Я ошибся #номер ошибки
.
Напишем функцию fetch_answers_for_question
, которая получает список ответов для конкретного вопроса по его ID:
def fetch_answers_for_question(question_id, site="stackoverflow"):
params = {
"order": "desc",
"sort": "activity",
"site": site,
"filter": "withbody"
}
response = requests.get(ans_url.format(question_id=question_id), params=params)
if response.status_code == 200:
return response.json()
else:
print(f"Я ошибся {question_id}: {response.status_code}")
return None
Выполняется аналогичный GET-запрос к API с подстановкой question_id
. Ответы возвращаются в формате JSON.
Напишем функцию parse_questions_with_answers
, которая обрабатывает вопросы и добавляет к ним соответствующие ответы.
def parse_questions_with_answers(data, site="stackoverflow"):
parsed_data = []
if "items" in data:
for item in tqdm(data["items"], desc="Parsing questions and answers"):
question = {
"question_id": item.get("question_id"),
"title": item.get("title"),
"body": item.get("body"),
"tags": item.get("tags"),
"link": item.get("link"),
"creation_date": time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime(item.get("creation_date"))),
"score": item.get("score"),
"answers": []
}
answers_data = fetch_answers_for_question(item.get("question_id"), site)
if answers_data and "items" in answers_data:
for answer in answers_data["items"]:
answer_info = {
"answer_id": answer.get("answer_id"),
"body": answer.get("body"),
"creation_date": time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime(answer.get("creation_date"))),
"score": answer.get("score"),
"is_accepted": answer.get("is_accepted")
}
question["answers"].append(answer_info)
parsed_data.append(question)
return parsed_data
Она проходит по списку вопросов, и для каждого вопроса добавляет метаданные, такие как заголовок, тело, теги, ссылка, дата создания и оценки. Дальше для каждого вопроса отправляется дополнительный запрос для получения ответов, которые затем добавляются к вопросу. Процесс обработки отображается с помощью tqdm
.
Напишем функцию save_to_json
, которая сохраняет полученные и обработанные данные в JSON-файл. Если такого файла в директории нет, то будет автоматически создан файл stackexchange_data_with_answers.json
.
def save_to_json(data, filename="stackexchange_data_with_answers.json"):
with open(filename, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=4)
print(f"Данные сохранены в {filename}")
Напишем основную логику программы:
if __name__ == "__main__":
site = "stackoverflow"
page = 1
pagesize = 100
questions_data = fetch_stackexchange_questions(site=site, page=page, pagesize=pagesize)
if questions_data:
parsed_questions = parse_questions_with_answers(questions_data, site)
save_to_json(parsed_questions, f"{site}_questions_with_answers.json")
В блоке if __name__ == "__main__"
выполняются шаги получения вопросов функцией fetch_stackexchange_questions
, парсинг вопросов и их ответов через parse_questions_with_answers
и сохранение данных в JSON-файл через save_to_json
.
Полный код:
import requests
import time
import json
from tqdm import tqdm
quest_url = "https://api.stackexchange.com/2.3/questions"
ans_url = "https://api.stackexchange.com/2.3/questions/{question_id}/answers"
def fetch_stackexchange_questions(site="stackoverflow", page=1, pagesize=100):
params = {
"order": "desc",
"sort": "activity",
"site": site,
"pagesize": pagesize,
"page": page,
"filter": "withbody"
}
response = requests.get(quest_url, params=params)
if response.status_code == 200:
return response.json()
else:
print(f"Я ошибся {response.status_code}")
return None
def fetch_answers_for_question(question_id, site="stackoverflow"):
params = {
"order": "desc",
"sort": "activity",
"site": site,
"filter": "withbody"
}
response = requests.get(ans_url.format(question_id=question_id), params=params)
if response.status_code == 200:
return response.json()
else:
print(f"Я ошибся {question_id}: {response.status_code}")
return None
def parse_questions_with_answers(data, site="stackoverflow"):
parsed_data = []
if "items" in data:
for item in tqdm(data["items"], desc="Parsing questions and answers"):
question = {
"question_id": item.get("question_id"),
"title": item.get("title"),
"body": item.get("body"),
"tags": item.get("tags"),
"link": item.get("link"),
"creation_date": time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime(item.get("creation_date"))),
"score": item.get("score"),
"answers": []
}
answers_data = fetch_answers_for_question(item.get("question_id"), site)
if answers_data and "items" in answers_data:
for answer in answers_data["items"]:
answer_info = {
"answer_id": answer.get("answer_id"),
"body": answer.get("body"),
"creation_date": time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime(answer.get("creation_date"))),
"score": answer.get("score"),
"is_accepted": answer.get("is_accepted")
}
question["answers"].append(answer_info)
parsed_data.append(question)
return parsed_data
def save_to_json(data, filename="stackexchange_data_with_answers.json"):
with open(filename, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=4)
print(f"Данные сохранены в {filename}")
if __name__ == "__main__":
site = "stackoverflow"
page = 1
pagesize = 100
questions_data = fetch_stackexchange_questions(site=site, page=page, pagesize=pagesize)
if questions_data:
parsed_questions = parse_questions_with_answers(questions_data, site)
save_to_json(parsed_questions, f"{site}_questions_with_answers.json")
Как по факту работает наш парсер?
Теперь нам нужно понять, что-же делает наш код:
Сначала код отправляет запрос к API StackExchange для получения списка вопросов. Потом для каждого вопроса отправляется еще один запрос для получения ответов. Дальше все данные структурируются и записываются в файл JSON.
Для чего в основном используется парсинг различной информации?
Парсинг информации может быть полезен в различных областях и для разных целей. Приведу для вас несколько примеров:
-
Для анализа данных
-
Для конкурентного анализа
-
Для сбора информации во время исследований
-
Для создания контента на сайтах или блогах
-
Для автоматизации
-
Для пользовательской аналитики
-
Для исследования ниши
Полезная информация и доп материалы
В данной статье используются достаточно стандартные библиотеки, но все же не все с ними ознакомлены. Для этого предложу вам ознакомиться с данными статьями :
На этом всё. Буду рад видеть ваши комментарии.
Автор: EgorKl