Создаём свой реестр данных на основе ФГИС «Аршин». Часть 1 — добавляем данные в PostgreSQL и сокращаем размер БД

в 8:15, , рубрики: pandas, postgresql, sqlalchemy, АРШИН, обработка данных, реестр данных

Всем привет. Этот практический цикл статей рассчитан на начинающих. Я решил поделиться своим опытом создания реестра данных на основе государственного. Данные будут храниться в базе данных PostgreSQL, доступ к ним будет осуществляться через Fast API. В текущей статье займёмся загрузкой данных в базу и уменьшением её размеров.

Введение

Нам предстоит работать с данными о результатах поверки средств измерений, которые будем загружать из раздела сайта «Результаты поверок СИ» ФГИС АРШИН. Вкратце — каждый измерительный прибор из лаборатории должен считать с определенной точностью. Для проверки погрешностей измерительного прибора на соответствие допустимым пределам, он периодически отправляется в метрологический орган, который затем через «АРШИН» размещает информацию об устройстве и результаты поверки (пройдена или нет).

Вот пример данных, которые хранятся в реестре:

Результаты поверок СИ

Результаты поверок СИ

Создание и заполнение БД

На основе этих данных я создал базу данных "Habrdb" с таблицей "Info":

CREATE TABLE IF NOT EXISTS "Info"
(
    id bigint NOT NULL GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
    "org_title" TEXT,
    "mit_number" TEXT,
    "mit_title" TEXT,
    "mi_number" TEXT,
    "mit_notation" TEXT,
    "mi_modification" TEXT,
    "verification_date" date,
    "valid_date" date,
    "result_docnum" TEXT,
    "applicability" boolean,
    "vri_id" bigint
);

АРШИН предоставляет API для доступа к данным, также периодически выгружает данные в zip архивах по этому пути. Необходимо скачать последний файл poverki.snapshot…zip, который содержит csv файлы с данными о поверке за всё время работы ФГИС. Этот архив имеет такую вложенность:

Структура вложенности архивов

Структура вложенности архивов

В скачанном архиве есть 3 папки, в которых лежит около 30 небольших архивов, каждый из которых содержит один csv файл с информацией о поверенных устройствах за один день. Ещё в нём лежит большой архив poverki.snapshot, который содержит 3 папки, 30 небольших архивов за следующий месяц и большой архив poverki.snapshot, далее аналогично. Каждый csv файл зачастую содержит данные такого вида:

Отрывок данных из csv файла
Отрывок данных из csv файла

Сразу скажу, что загружать данные без обработки не получится, так как во-первых нам пока не нужны колонки под номером 1, 12, 13 и 15 (эту колонку на скрине не видно, обычно она пустая или содержит ненужную нам информацию). Во-вторых, нередко при отправке данных на поверку задавались некорректные значения, впоследствии в базе такие строки удаляются или меняются.

К тому же в некоторых файлах наименования колонок не соответствуют содержимому, такие строки придётся отбрасывать. Приведу примеры невалидных строк (имена колонкам присвоены для удобства, 1 и 0 уже заменены на булевые True и False).

Пример некорректных значений

Пример некорректных значений

Проблемы:

  1. В этих строках из файла значения в колонке 'result_docnum' это тире, однако колонка должна содержать значения из букв и цифр.

  2. Здесь не заданы значения 'verification_date', при этом 'appliciability' = True. То есть даты поверки нет, но зато указано, что оно прошло поверку. Такие строки тоже со временем удаляются из Аршин. Ко всему прочему не стоит забывать, что поле date в postgresql имеет единый формат "YYYY-MM-DD", а в файлах могут попадаться значения с другим форматом.

Таким образом, придётся обрабатывать данные перед добавлением в БД, для этого используем библиотеки pandas и sqlalchemy. Так как данных слишком много возьмем только за 2023 и 2024 год.

Код загрузки данных в БД
import pandas as pd
import re, os, zipfile
from sqlalchemy import BigInteger, TEXT, Boolean, Date, create_engine, create_engine
from sqlalchemy.orm import sessionmaker

# Создаём сессию к БД
engine = create_engine('postgresql://ИМЯ:ПАРОЛЬ@localhost:5432/Habrdb')
Session = sessionmaker(bind=engine)
session = Session()


class DataLoader:

    def __unzip_file(self, zip_filename, extract_to):
        '''Распаковывает и удаляет zip архив'''
        with zipfile.ZipFile(zip_filename, 'r') as zip_ref:
            zip_ref.extractall(extract_to)
            # Удаляем распакованный архив
        os.remove(zip_filename)


    def __add_file_to_db(self, path):
        '''Читает содержимое файла, валидирует и записывает в БД'''

        def convert_to_date_with_limitation(xVal):
            '''Приводит дату к формату YYYY-MM-DD и отбрасывает года < 2023'''
            xVals = str(xVal).split(' ')
            for x in xVals:
                if len(x) == 10:
                    x = re.sub(r"[.,:;]", "-", x)
                    year, month, day = x.split("-")
                    if int(year) == 2023 or int(year) == 2024:
                        return f"{year}-{month}-{day}"
                else:
                    return pd.NaT  # Если формат не распознан, вернуть NaT (Not a Time)
                
        def convert_to_date(xVal):
            '''Приводит дату к формату YYYY-MM-DD'''
            xVals = str(xVal).split(' ')
            for x in xVals:
                if len(x) == 10:
                    x = re.sub(r"[.,:;]", "-", x)
                    year, month, day = x.split("-")
                    return f"{year}-{month}-{day}"
                else:
                    return pd.NaT

        names = ['Number', 'org_title', 'mit_number', 'mit_title', 'mit_notation', 'mi_modification',
                        'mi_number', 'verification_date', 'valid_date', 'result_docnum', 'appliciability',
                        'Date1', 'Date2', 'Pusto', 'vri_id', 'rabbish']   
        # Читаем файл частями по 100000 строк
        df = pd.read_csv(path, chunksize = 100000, on_bad_lines='skip', delimiter=';',quotechar='"', header=None, names=names)

        # Пробегаемся по частям
        for chunk in df:   
            # Удаляем ненужные столбцы 1, 12, 13, 15
            chunk = chunk.drop(columns=['Number', 'Date1', 'Date2', 'Pusto', 'rabbish'])

            # Валидируем даты
            chunk['verification_date'] = chunk['verification_date'].map(convert_to_date_with_limitation)
            chunk['valid_date'] = chunk['valid_date'].map(convert_to_date)
            
            # Будем брать только эти колонки
            allColumns = ['org_title', 'mit_number', 'mit_title', 'mit_notation', 'mi_modification',
            'mi_number', 'verification_date', 'valid_date', 'result_docnum', 'appliciability', 'vri_id']

            # Убираем двойные пробелы из всех колонок
            chunk[allColumns].replace('  ', ' ')

            # Явно задаём типы для столбцов, параметр errors='coerce' позволяет отбрасывать некорректные строки
            chunk['verification_date'] = pd.to_datetime(chunk['verification_date'], errors='coerce')
            chunk['valid_date'] = pd.to_datetime(chunk['valid_date'], errors='coerce')

            # Удаляем все NaT значения из столбца 'verification_date' 
            chunk = chunk.dropna(subset=['verification_date'])

            # Удаляем строки, где 'result_docnum' состоит только из дефисов и пробелов
            chunk = chunk[~chunk['result_docnum'].str.fullmatch(r'[-s]+', na=False)]

            # Преобразуем пригодность из 1 и 0 в True и False
            chunk['appliciability'] = chunk['appliciability'].astype('bool')
            # Преобразуем 'vri_id' из строк в bigInt 
            chunk['vri_id'] = pd.to_numeric(chunk['vri_id'], errors='coerce').astype('Int64')

            # Заменяем все значения NaN на None (чтобы они добавились в таблицу как null)
            chunk = chunk.where(pd.notnull(chunk), None)

            # Типы данных для временной таблицы
            infoTypes = {
            'mi_number': TEXT,
            'result_docnum': TEXT,
            'mit_number': TEXT,
            'mit_title': TEXT,
            'mit_notation': TEXT,
            'mi_modification': TEXT,
            'org_title': TEXT,
            'verification_date': Date,
            'valid_date': Date,
            'vri_id': BigInteger,
            'appliciability': Boolean
            }
            # Добавляем часть в таблицу
            chunk.to_sql('Info', engine, if_exists='append', index=False, dtype=infoTypes)


    def extract_and_add_to_db_old_files(self, rootPath):
        """ Разархивирует все вложенные архивы, добавляет в БД и удаляет все вложенные в них файлы"""
        
        for z, dirs, files in os.walk(rootPath):
            # Проверяем содержит ли папка файлы
            for filename in files:
                fileSpec = os.path.join(z, filename)
                
                # Если это архив
                if filename.endswith('.zip'):
                    # Проверяем на наличие 'snapshot' в имени файла
                    if 'snapshot' in fileSpec.split('\')[-1]:
                        self.__unzip_file(fileSpec, self.rootPath)
                        # Рекурсия для новых файлов внутри snapshot
                        if self.extract_and_add_to_db_old_files(self.rootPath) == 0:
                            # Удаляем пустые папки
                            self.__remove_empty_dirs(self.rootPath)
                            return 0
                    else:
                        # Распаковываем архив
                        self.__unzip_file(fileSpec, z)
                    
                    # Добавляем данные из файла в БД
                    self.__add_file_to_db(fileSpec[:-4])
                    print(f'Файл {filename[:-4]} добавлен в БД')
                    # Удаляем файл
                    os.remove(fileSpec[:-4])
                
                # Если это разархивированный файл
                elif filename.endswith('.csv'):
                    self.__add_file_to_db(fileSpec)
                    print(f'Файл {filename} добавлен в БД')
                    os.remove(fileSpec)
        return 0
    
    def __remove_empty_dirs(self, rootPath):
        """ Рекурсивно удаляет все пустые папки, начиная с самой глубокой"""
        for dirpath, dirnames, filenames in os.walk(rootPath, topdown=False):  # topdown=False позволяет идти с самого конца
            # Удаляем папку, если в ней нет файлов и папок
            if not dirnames and not filenames:
                os.rmdir(dirpath)
                print(f'Удалена пустая папка: {dirpath}')
                # Выходим, если наткнулись на родительскую папку в которой лежали остальные
                if dirpath == self.rootPath:
                    return 0
                self.__remove_empty_dirs(self.rootPath)

    def Main(self):
        self.rootPath = 'ПОЛНЫЙ_ПУТЬ_К_РАЗАРХИВИРОВАННОЙ_ПАПКЕ'
        self.extract_and_add_to_db_old_files(self.rootPath)

# Создаём экземпляр класса
ekz = DataLoader()
# Вызываем функцию, которая является точкой входа 
ekz.Main()

Посмотрим сколько весит наша база, у меня получилось 38 ГБ.

SELECT pg_size_pretty(pg_database_size('Habrdb');
Отрывок данных из таблицы "Info"

Отрывок данных из таблицы "Info"

Сокращение размера базы данных

Можно заметить, что строки в колонках "mit_title" (наименование типа устройства), "org_title" (поверяющая организация), "mit_number" (регистрационный номер устройства), "mit_notaion" (обозначение типа), "mi_modification" (модификация типа) бывают длинными и могут повторяться. Даже без анализа таблицы можно заметить - строки в колонке "mit_title" самые длинные, поэтому начнём с неё.

Создадим таблицу, в которую закинем ненулевые уникальные значения из "mit_title"

CREATE TABLE IF NOT EXISTS "UniqueMitTitles"
(
    id integer NOT NULL GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
    "mit_title" TEXT NOT NULL
);

Добавляем туда уникальные значения из столбца "mit_title" таблицы "Info"

INSERT INTO "UniqueMitTitles" ("mit_title")
SELECT DISTINCT "mit_title"
FROM "Info"
WHERE COALESCE("mit_title") IS NOT NULL;

Создаём в "Info" колонку, которая будет хранить идентификаторы из базы "UniqueMitTitles"

ALTER TABLE "Info" ADD COLUMN "mit_titleId" INT;

Добавляем туда соответствующие идентификаторы из уникальной таблицы

UPDATE "Info" SET "mit_titleId" = "UniqueMitTitles".id
FROM "UniqueMitTitles"
WHERE "Info"."mit_title" = "UniqueMitTitles"."mit_title";

Удаляем из основной таблицы лишнюю колонку

ALTER TABLE "Info" DROP COLUMN "mit_title";

Сразу скажу, что в "Info" много текстовых строк внушительного размера. В PostgreSQL строки (тип данных с переменной длинной), размер которых в сжатом виде превышает 2 Кбайта, нарезаются на части, сжимаются и хранятся в кортежах специальной TOAST таблицы. Эта таблица скрыта от пользователя, она прикрепляется к основной и наполняется автоматически. Нам важно знать лишь только то, что такие таблицы могут сильно расти в размерах, так как при UPDATE основной таблицы в TOAST добавляются новые кортежи, а старые (dead tuples) не удаляются и их приходится чистить с помощью команды VACUUM. А вообще TOAST это механизм и данные сжимаются не всегда, об этом можно почитать здесь и здесь. Самое время посмотреть сколько весит таблица "Info" и что там с TOAST

SELECT l.metric, l.nr AS bytes
     , CASE WHEN is_size THEN pg_size_pretty(nr) END AS bytes_pretty
     , CASE WHEN is_size THEN nr / NULLIF(x.ct, 0) END AS bytes_per_row
FROM  (
SELECT min(tableoid)        AS tbl      -- = 'public.tbl'::regclass::oid
     , count(*)             AS ct
     , sum(length(t::text)) AS txt_len  -- length in characters
   FROM "Info" t
   ) x
CROSS  JOIN LATERAL (
   VALUES
     (true , 'core_relation_size'               , pg_relation_size(tbl))
   , (true , 'free_space_map'                   , pg_relation_size(tbl, 'fsm'))
   , (true , 'total_size_incl_toast_and_indexes', pg_total_relation_size(tbl))
   , (true , 'live_rows_in_text_representation' , txt_len)
   , (false, 'row_count'                        , ct)
   , (false, 'live_tuples'                      , pg_stat_get_live_tuples(tbl))
   , (false, 'dead_tuples'                      , pg_stat_get_dead_tuples(tbl))
   ) l(is_size, metric, nr);
Информация о таблице после UPDATE

Информация о таблице после UPDATE
  • core_relation_size — размер основной таблицы без учета индексов и метаданных.

  • free_space_map — размер карты свободного пространства, скажет о свободном пространстве в страницах таблицы, которое образуется в результате частых вставок, удалений и обновлений.

  • total_size_incl_toast_and_indexes — общий размер, включая TOAST и индексы. Индексов мы не создавали, поэтому это значение примерно равно размеру TOAST.

  • live_rows_in_text_representation — размер "живых" текстовых строк, постепенно он будет уменьшаться за счет замены строк на идентификаторы.

  • row_count — общее количество строк в таблице.

  • live_tuples — количество "живых" кортежей (строк) в таблице.

  • dead_tuples — количество "мертвых" кортежей (строк) в таблице, они то и занимают внушительную часть пространства

Теперь таблица весит целых 77 ГБ! Нужно очистить мёртвые кортежи, как ранее отмечалось для этого используется команда VACUUM. Есть 2 варианта: подождать несколько часов пока не выполнится AUTOVACUUM или выполнить VACUUM вручную. Автоочистку придется ждать так долго если используется конфигурация postgresql по умолчанию, которая содержит специальные таймеры для задержки и уменьшения интенсивности очистки, это сделано для снижения нагрузки на базу данных.

Следить за процессом очистки можно с помощью pg_stat_progress_vacuum, на моем сервере уже через минуту автоматически запустился процесс очистки

SELECT * FROM pg_stat_progress_vacuum;
Данные о процессе VACUUM

Данные о процессе VACUUM

Более подробную информацию о фазах VACUUM и этих столбцах можно найти здесь

Мы видим, что сейчас VACUUM на стадии сканирования кучи и пока просканировано всего 10%, затем наступит фаза очистки индексов (vacuuming indexes) и только потом стадия очистки кучи (vacuuming heap). В общем вместо многочасового ожидания или изменения конфигурации очистим мёртвые кортежи и записи после DROP вручную

VACUUM;
Как видите dead_tuples=0

Как видите dead_tuples = 0

Так как идентификаторы из колонки "mit_titleId" являются связующим звеном между двумя таблицами, то создаём внешний ключ на колонку из уникальной таблицы. При удалении строк в "UniqueMitTitles" в зависимой таблице "Info" удалятся строки с таким Id.

ALTER TABLE "Info" ADD CONSTRAINT "Info_mit_titleId_fkey" FOREIGN KEY ("mit_titleId")
REFERENCES "UniqueMitTitles" ("id") ON DELETE CASCADE;

Далее повторяем те же шаги для оставшихся четырёх колонок "mit_notation", "org_title", "mit_number", "mi_modification" (создаем таблицы "UniqueMitNotations", "UniqueOrgTitles", "UniqueMitNumbers" и "UniqueMiModifications").

Отрывок данных из "Info" после всех запросов

Отрывок данных из "Info" после всех запросов

После всех операций выполняем очистку с перезаписью (VACUUM FULL), при такой очистке старая таблица не будет удаляться до полной записи новой, поэтому на вашем диске должно быть свободное пространство для новой таблицы. После полной очистки база весит 17 ГБ, что меньше в 2,2 раза изначального размера в 38 ГБ.

Минусы данного подхода сокращения размера:

  1. Необходимость использования JOIN уникальных таблиц для получения всех данных.

  2. Сложность добавления данных в таблицу.

  3. Сложность поддержки - теперь придется следить за 6-ю таблицами, а не за одной.

Если незначительное сокращение скорости выборки для вас не критично, то эти минусы с лихвой окупаются малым размером базы.

В следующих статьях будем ускорять поиск по таблицам и создавать API.

Автор: AlexOnegin

Источник

* - обязательные к заполнению поля


https://ajax.googleapis.com/ajax/libs/jquery/3.4.1/jquery.min.js