Data science и качественный код

в 9:53, , рубрики: big data, data mining, data science, machine learning, python, машинное обучение

Обычно модели машинного обучения строят в jupyter-ноутбуках, код которых выглядит, мягко говоря, не очень — длинные простыни из лапши выражений и вызовов "на коленке" написанных функций. Понятно, что такой код почти невозможно поддерживать, поэтому каждый проект переписывается чуть ли не с нуля. А о внедрении этого кода в production даже подумать страшно.

Поэтому сегодня представляем на ваш строгий суд превью библиотеки по работе с датасетами и data science моделями. С ее помощью ваш код может выглядеть так:

my_dataset.
    load('/some/path').
    normalize().
    resize(shape=(256, 256, 256)).
    random_rotate(angle=(-30, 30))
    random_crop(shape=(64, 64, 64))

for i in range(MAX_ITER):
    batch = my_dataset.next_batch(BATCH_SIZE, shuffle=True)
    # обучаем модель, подавая ей батчи с данными    

В этой статье вы узнаете об основных классах и методах, которые помогут сделать ваш код простым, понятным и удобным.

Библиотека пока проходит финальную полировку и еще не выложена в открытый доступ.
Данная статья не является полной документацией, а лишь кратким описанием библиотеки и примеров ее использования.
Ваши комментарии помогут доработать библиотеку и включить в нее нужные вам возможности.

Датасет

Объем данных может быть очень большим, да и к началу обработки данных у вас может вообще не быть всех данных, например, если они поступают постепенно. Поэтому класс Dataset и не хранит в себе данные. Он включает в себя индекс — перечень элементов ваших данных (это могут быть идентификаторы или просто порядковые номера), а также Batch-класс, в котором определены методы работы с данными.

dataset = Dataset(index = some_index, batch_class=DataFrameBatch)

Основное назначение Dataset — формирование батчей.

batch = dataset.next_batch(BATCH_SIZE, shuffle=True)
# batch - объект класса DataFrameBatch,
# содержащий BATCH_SIZE элементов датасета

или можно вызвать генератор:

for batch in dataset.gen_batch(BATCH_SIZE, shuffle=False, one_pass=True):
    # batch - объект класса DataFrameBatch

Батчи можно собирать строго упорядоченно или хаотически, итерировать бесконечно или сделать ровно 1 цикл по вашим данным. Можно даже на каждом шаге создавать батчи разного размера, если в вашей ситуации это имеет смысл.

Кроме итерирования в Dataset доступна еще одна полезная операция — cv_split — которая делит датасет на train, test и validation. И, что особенно удобно, каждый из них снова является датасетом.

dataset.cv_split([0.7, 0.2, 0.1])  # делим в пропорции 70 / 20 / 10
# а дальше все обычно
for i in range(MAX_ITER):
    batch = dataset.train.next_batch(BATCH_SIZE, shuffle=True)
    # обучаем модель, подавая ей батчи с данными

Индекс

Адресация элементов датасета осуществляется с помощью индекса. Это может быть набор идентификаторов (клиентов, транзакций, КТ-снимков) или просто порядковые номера (например, numpy.arange(N)).
Датасет может быть (почти) сколь угодно большим и не помещаться в оперативную память. Но это и не требуется. Ведь обработка данных выполняется батчами.

Создать индекс очень просто:

ds_index = DatasetIndex(sequence_of_item_ids)

В качестве последовательности может выступать список, numpy-массив, pandas.Series или любой другой итерируемый тип данных.

Когда исходные данные хранятся в отдельных файлах, то удобно строить индекс сразу из списка этих файлов:

ds_index = FilesIndex(path=‘/some/path/*.dat’, no_ext=True)

Тут элементами индекса станут имена файлов (без расширений) из заданной директории.

Бывает, что элементы датасета (например, 3-мерные КТ снимки) хранятся в отдельных директориях.

ds_index = FilesIndex(path=‘/ct_images_??/*’, dirs=True)

Так будет построен общий индекс всех поддиректорий из /ct_images_01, /ct_images_02, /ct_images_02 и т.д.
Файловый индекс помнит полные пути своих элементов. Поэтому позднее в методе load или save можно удобно получить путь index.get_fullpath(index_item).

Хотя чаще всего вам вообще не придется оперировать индексами — вся нужная работа выполняется внутри, а вы уже работаете только с батчем целиком.

Класс Batch

Вся логика хранения и методы обработки ваших данных определяются в Batch-классе. Давайте в качестве примера создадим класс для работы с КТ-снимками. Базовый класс Batch, потомком которого и станет наш CTImagesBatch, уже имеет атрибут index, который хранит список элементов данного батча, а также атрибут data, который инициализируется в None. И поскольку нам этого вполне хватает, то конструктор переопределять не будем.

Поэтому сразу перейдем к созданию action-метода load:

class CTImagesBatch(Batch):
    @action
    def load(self, src, fmt):
        if fmt == 'dicom':
            self.data = self._load_dicom(src)
        elif fmt == 'blosc':
            self.data = self._load_blosc(src)
        elif fmt == 'npz':
            self.data = self._load_npz(src)
        else:
            raise ValueError("Incorrect format")
       return self

Во-первых, метод обязательно должен предваряться декоратором @action (чуть позже вы узнаете зачем).
Во-вторых, он должен возвращать Batch-объект. Это может быть новый объект того же самого класса (в данном случае CTImagesBatch), или объект другого класса (но обязательно потомка Batch), или можно просто вернуть self.
Такой подход позволяет описывать цепочки действий над данными. Причем в ходе обработки данные могут меняться не только по содержанию, но и по формату и структуре.

Не будем сейчас тратить время на приватные методы _load_dicom, _load_blosc и _load_npz. Они умеют загружать данные из файлов определенного формата и возвращают 3-мерный numpy-массив — [размер батча, ширина изображения, высота изображения]. Главное, что именно здесь мы определили, как устроены данные каждого батча, и дальше будем работать с этим массивом.

Теперь напишем метод very_complicated_processing, который выполняет какую-то чрезвычайно сложную обработку снимков. Поскольку снимки в батче независимы друг от друга, то было бы удобно обрабатывать их параллельно.

class CTImagesBatch(Batch):
...
    @action
    @inbatch_parallel(target=’threads’)
    def very_complicated_processing(self, item, *args, **kwargs):
        # очень сложные вычисления...
        return processed_image_as_array

То есть метод следует писать так словно он обрабатывает один снимок, и индекс этого снимка передается в первом параметре.

Чтобы магия параллелизма сработала, метод необходимо обернуть декоратором, где задается технология параллелизма (процессы, потоки и т.д.), а также функции пре- и постпроцессинга, которые вызываются до и после распараллеливания.
Кстати, операции с интенсивным вводом-выводом лучше писать как async-методы и распараллеливать через target=’async’, что позволит значительно ускорить загрузку-выгрузку данных.

Понятно, что это все добавляет удобства при программировании, однако совсем не избавляет от "думания", нужен ли тут параллелизм, какой именно и не станет ли от этого хуже.

Когда все action-методы написаны, можно работать с батчем:

for i in range(MAX_ITER):
    batch = ct_images_dataset.next_batch(BATCH_SIZE, shuffle=True)
    processed_batch = batch.load("/some/path/", "dicom")
                           .very_complicated_processing(some_arg=some_value)
                           .resize(shape=(256, 256, 256))
                           .random_rotate(angle=(-30, 30))
                           .random_crop(shape=(64, 64, 64))

    # обучаем модель, подавая ей processed_batch с готовыми данными

Выглядит неплохо… но как-то это неправильно, что итерация по батчам смешана с обработкой данных. Да и цикл обучения модели хочется предельно сократить, чтобы там вообще ничего кроме next_batch не было.
В общем, надо вынести цепочку action-методов на уровень датасета.

Пайплайн

И это можно сделать. Мы ведь не зря городили все эти action-декораторы. В них скрывается хитрая магия переноса методов на уровень датасета. Поэтому просто пишите:

ct_images_pipeline = ct_images_dataset.pipeline().
                         .load("/some/path/", "dicom")
                         .very_complicated_processing(some_arg=some_value)
                         .resize(shape=(256, 256, 256)).
                         .random_rotate(angle=(-30, 30))
                         .random_crop(shape=(64, 64, 64))
# ...
for i in range(MAX_ITER):
    batch = ct_images_pipeline.next_batch(BATCH_SIZE, shuffle=True)
    # обучаем модель, подавая ей батчи с обработанными данными    

Вам не нужно создавать новый класс-потомок Dataset и описывать в нем все эти методы. Они есть в соответствующих Batch-классах и отмечены декоратором @action — значит вы их можете смело вызывать словно они есть в классе Dataset.

Еще одна хитрость заключается в том, что при таком подходе все action-методы становятся "ленивыми" (lazy) и выполняются отложенно. То есть загрузка, обработка, ресайз и прочие действия выполняются для каждого батча в момент формирования этого батча при вызове next_batch.

И поскольку обработка каждого батча может занимать много времени, то было бы неплохо формировать батчи заблаговременно. Это особенно важно, если обучение модели выполняется на GPU, ведь тогда простой GPU в ожидании нового батча может запросто "съесть" все преимущества ее высокой производительности.

batch = ct_images_pipeline.next_batch(BATCH_SIZE, shuffle=True, prefetch=3)

Параметр prefetch указывает, что надо параллельно считать 3 батча. Дополнительно можно указать технологию распараллеливания (процессы, потоки).

Объединяем датасеты

В реальных задачах машинного обучения вам редко придется иметь дело с единственным датасетом. Чаще всего у вас будет как минимум два набора данных: X и Y. Например, данные о параметрах домов и данные о их стоимости. В задачах компьютерного зрения кроме самих изображений еще есть метки классов, сегментирующие маски и bounding box’ы.
В общем, полезно уметь формировать параллельные батчи из нескольких датасетов. И для этого вы можете выполнить операцию join или создать JointDataset.

JointDataset

Если вам нужна лишь параллельная итерация по батчам, то удобнее будет создать единый датасет:

joint_dataset = JointDataset((ds_X, ds_Y))

Если ds_X и ds_Y основаны не на одном и том же индексе, то важно, чтобы индексы были одинаковой длины и одинаково упорядочены, то есть значение ds_Y[i] соответствовало значению ds_X[i]. В этом случае создание датасета будет выглядеть немного иначе:

joint_dataset = JointDataset((ds_X, ds_Y), align=’order’)

А дальше все происходит совершенно стандартным образом:

for i in range(MAX_ITER):
    batch_X, batch_Y = joint_dataset.next_batch(BATCH_SIZE, shuffle=True)

Только теперь next_batch возвращает не один батч, а tuple с батчами из каждого датасета.

Естественно, JointDataset можно состоять и из пайплайнов:

pl_images = ct_images_ds.pipeline()
               .load(‘/some/path’, ‘dicom’)
               .hu_normalize()
               .resize(shape=(256,256,256))
               .segment_lungs()
pl_labels = labels_ds.pipeline()
               .load(‘/other/path’, ‘csv’)
               .apply(lambda x: (x[‘diagnosis’] == ‘C’).astype(‘int’))

full_ds = JointDataset((pl_images, pl_labels), align=’same’)
for i in range(MAX_ITER):
    images_batch, labels_batch = full_ds.next_batch(BATCH_SIZE, shuffle=True)
    # и теперь обучаем нейросеть, подавая в нее изображения и метки классов

И поскольку компонентами датасета являются пайплайны, то загрузка и обработка изображений и меток запускается лишь при вызове next_batch. То есть все вычисления выполняются и батч формируется только тогда, когда он нужен.

Операция join

Однако бывают и иные ситуации, когда нужно выполнить операцию с датасетом, применяя к нему данные из другого датасета.

Это лучше продемонстрировать на примере с КТ-снимками. Загружаем координаты и размеры раковых новообразований и формируем из них 3-мерные маски.

pl_masks = nodules_ds.pipeline()
                .load(‘/other/path’, ‘csv’)
                .calculate_3d_masks()

Загружаем КТ-снимки и применяем к ним маски, чтобы выделить только раковые области.

pl_images = ct_images_ds.pipeline().
                .load(‘/some/path’, ‘dicom’)
                .hu_normalize()
                .resize(shape=(256, 256, 256))
                .join(pl_masks)
                .apply_masks(op=’mult’)

В join вы указываете датасет. Благодаря чему в следующий action-метод (в данном примере в apply_masks) в качестве первого аргумента будут передаваться батчи из этого датасета. И не какие попало батчи, а ровно те, которые и нужны. Например, если текущий батч из ct_images_ds содержит снимки 117, 234, 186 и 14, то и присоединяемый батч с масками также будет относиться к снимкам 117, 234, 186 и 14.

Естественно, метод apply_masks должен быть написан с учетом данного аргумента, ведь его можно передать и явно, без предварительного join'а. Причем в action-методе можно уже не задумываться об индексах и идентификаторах элементов батча — вы просто к массиву снимков применяете массив масок.

И снова отмечу, что никакие загрузки и вычисления, ни с изображениями, ни с масками не будут запущены, пока вы не вызовете pl_images.next_batch

Собираем все вместе

Итак, посмотрим как будет выглядет полный workflow data science проекта.

  1. Создаем индекс и датасет
    ct_images_index = FilesIndex(path=‘/ct_images_??/*’, dirs=True)
    ct_images_dataset = Dataset(index = ct_images_index, batch_class=CTImagesBatch)
  2. Выполняем препроцессинг и сохраняем обработанные снимки

    ct_images_dataset.pipeline()
       .load(None, 'dicom')     # загружаем данные в формате dicom по путям из индекса
       .hu_normalize()
       .resize(shape=(256, 256, 256))
       .segment_lungs()
       .save('/preprocessed/images', 'blosc')
       .run(BATCH_SIZE, shuffle=False, one_pass=True)

  3. Описываем подготовку и аугментацию данных для модели

    ct_preprocessed_index = FilesIndex(path='/preprocessed/images/*')
    ct_preprocessed_dataset = Dataset(index = ct_preprocessed_index, batch_class=CTImagesBatch)
    #        
    ct_images_pipeline = ct_preprocessed_dataset.pipeline()
         .load(None, 'blosc')
         .split_to_patches(shape=(64, 64, 64))
    #        
    ct_masks_ds = Dataset(index = ct_preprocessed_index, batch_class=CTImagesBatch)
    ct_masks_pipeline = ct_masks_ds.pipeline().
         .load('/preprocessed/masks', 'blosc')
         .split_to_patches(shape=(64, 64, 64))
    #       
    full_ds = JointDataset((ct_images_pipeline, ct_masks_pipeline))

  4. Формируем тренировочные батчи и обучаем модель

    full_ds.cv_split([0.8, 0.2])
    for i in range(MAX_ITER):
     images, masks = full_ds.train.next_batch(BATCH_SIZE, shuffle=True)
     # обучаем модель, подавая в нее снимки и маски

  5. Проверяем качество модели
    for images, masks in full_ds.test.gen_batch(BATCH_SIZE, shuffle=False, one_pass=True):
     # рассчитываем метрики качества модели

Вот такая вот удобная библиотека, которая помогает значительно быстрее разрабатывать понятный код высокого качества, повторно использовать ранее созданные модели со сложным препроцессингом данных и даже разрабатывать production-ready системы.

А теперь вопрос: что еще стоит добавить в библиотеку? чего вам остро не хватает при работе с данными и моделями?

Автор: Roman_Kh

Источник

* - обязательные к заполнению поля


https://ajax.googleapis.com/ajax/libs/jquery/3.4.1/jquery.min.js