Пишем numpy-модуль для ускорения математических функций с помощью SIMD-инструкций

в 8:21, , рубрики: python, scipy, simd, аппаратное ускорение, высокая производительность, Программирование, метки: , ,

Пакеты numpy и scipy предоставляют прекрасные возможности для быстрого решения различных вычислительных задач. Концепция универсальных функций (ufunc), работающих как со скалярными значениями, так и с массивами различных размерностей, позволяет получить высокую производительность при сохранении присущей языку Python простоты и элегантности. Универсальная функция обычно используются для выполнения одной операции над большим массивом данных, что идеально подходит для оптимизации с помощью SIMD-инструкций, однако мне не удалось найти готового решения, основанного на свободном программном обеспечении и позволяющего использовать SIMD для вычисления в numpy таких математических функций, как синус, косинус и экспонента. Реализовывать алгоритмы вычисления этих функций с нуля совсем не хотелось, но к счастью в интернете нашлось несколько свободных библиотек на языке «С». Преодолев лень сомнения, я решил написать собственный numpy-модуль, предлагающий универсальные функции для синуса, косинуса и экспоненты. За подробностями и результатами тестов добро пожаловать под кат.

Немного о SIMD-инструкциях

SIMD-инструкции позволяют одновременно производить один и тот же набор операций над несколькими числами, записанными в одном регистре. Таким образом, за один такт можно обрабатывать сразу несколько чисел и потенциально увеличить производительность в несколько раз.
Например, набор SIMD-инструкций Advanced Vector Extensions (AVX) позволяет выполнять операции с 256-битными регистрами, каждый из которых может включать восемь 32-разрядных чисел с плавающей точкой (числа одинарной точности), либо четыре 64-разрядных (числа двойной точности). Набор операций довольно скромный, в основном это сложение, вычитание, умножение и деление. Точная и быстрая реализация тригонометрических функций с помощью этих операций — задача нетривиальная и, чтобы не изобретать велосипед, стоит использовать какую-нибудь готовую библиотеку. Помимо проприетарной Intel MKL(которая уже умеет работать с numpy) вариантов нашлось всего три (раз, два, три).
Первый вариант представляет собой заголовочный файл с очень скромным набором функций, практически без документации и тестов. Второй вариант — С++ библиотека vecmathlib, которая у меня почему-то упорно отказывалась компилироваться, несмотря на использование рекомендованного компилятора GCC-4.7. Вариант перспективный, но пока еще похоже сырой. Третий вариант — библиотеку SLEEF — удалось найти именно благодаря vecmathlib, которая использует его кодовую базу. Вариант мне сразу понравился простотой и ясностью кода, а также обилием тестов.

Небольшой тест для мотивации

Чтобы получить достаточную мотивацию для написания модуля, а заодно разобраться с использованием SLEEF, я решил сравнить скорость вычисления синуса на языке «C» при использовании SLEEF со стандартной math.h. Естественно, речь идет о поэлементном вычислении синуса для большого массива данных.
К сожалению, документации и примеров в SLEEF практически нет, но зато есть вполне понятно написанные тесты, так что разобраться с использованием библиотеки было не сложно. Исходный код SLEEF состоит из четырех директорий: java, purec, simd и tester. Кроме этого, там лежит файл README с кратким описанием библиотеки и общий Makefile, дергающий Makefile из перечисленных директорий. Меня естественно больше всего заинтересовала директория simd, в которой, как можно догадаться из названия, содержались оптимизированные с помощью SIMD-инструкций функции.
Из Makefile в директории simd понятно, что поддерживаются 4 варианта SIMD-инструкций: SSE2, AVX, AVX2 и FMA4. Прототипы функций определены в файле sleefsimd.h, а нужный набор инструкций выбирается при компиляции с помощью флагов -DENABLE_SSE2, -DENABLE_AVX, -DENABLE_AVX2 или -DENABLE_FMA4. Makefile собирает исполняемые файлы для тестирования функций с использованием каждого из наборов инструкций: iutsse2, iutavx, iutavx2 или iutfma4. Эти файлы вызываются из универсальной программы tester (из директории tester) и осуществляют выполнение получаемых от tester команд. Реализация команд находится в файле iut.c, откуда становится очевидным способ использования библиотеки.

Функция тестирования синуса из файла simd/iut.c исходного кода SLEEF

double xxsin(double d) { 
  double s[VECTLENDP];
  int i;
  for(i=0;i<VECTLENDP;i++) {
    s[i] = random()/(double)RAND_MAX*20000-10000;
  }
  int idx = random() & (VECTLENDP-1);
  s[idx] = d; 

  vdouble a = vloadu(s);
  a = xsin(a);
  vstoreu(s, a);

  return s[idx];
}

Для чисел двойной точности (double) нужно определить массив длиной VECTLENDP, заполнить аргументами интересующей нас функции и передать в функцию vloadu, которая скопирует их в необходимое для использования SIMD место и вернет значение типа vdouble. Значение vdouble мы передаем функции xsin, которая вычисляет значение синуса сразу для всех VECLENDP аргументов и возвращает опять же vdouble. Результат распаковывается в массив из double с помощью функции vstoreu.
Для желающих проверить SLEEF на своей машине привожу полный исходный код программы, которую написал для оценки потенциального ускорения от использования SIMD с помощью SLEEF.

Программа для оценки скорости вычисления синуса с помощью SLEEF

#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <math.h>
#include "sleefsimd.h"

#define TESTSIZE (VECTLENDP*10000000)
double s[TESTSIZE];
double r1[TESTSIZE];
double r2[TESTSIZE];
#define COUNT 10
int main(int argc, char *argv[])
{
	int k, i;
    clock_t t1, t2;
    double time1, time2;
    double max, rmax;

    srandom(time(NULL));
	for(i = 0; i < TESTSIZE; i++) 
    {
		s[i] = random()/(double)RAND_MAX*20000-10000;
	}

	printf("Testing sin, %d valuesn", TESTSIZE*COUNT);
	t1 = clock();
	for(k = 0; k < COUNT; k++)
	{
		for(i = 0; i < TESTSIZE; i++) 
        {
			r1[i] = sin(s[i]);
		}
	}
    t2 = clock();
    time1 = (double)(t2 - t1)/CLOCKS_PER_SEC;
	printf("Finish sin, spent time = %lg secnn", time1);

	printf("Testing xsinn");
	t1 = clock();
	for(k = 0; k < COUNT; k++)
	{
		for(i = 0; i < TESTSIZE; i += VECTLENDP) 
        {
			vdouble a = vloadu(s+i);
			a = xsin(a);
			vstoreu(r2+i, a);
		}
	}
    t2 = clock();
    time2 = (double)(t2 - t1)/CLOCKS_PER_SEC;
	printf("Finish xsin, spent time = %lg secnn", time2);

	printf("Speed ratio: %lfn", time1/time2);

	max = r1[0] - r2[0];
    rmax = (r1[0] - r2[0])/r1[0];
	for(i = 0; i < TESTSIZE; i++) 
    {
		double delta = (r1[i] - r2[i]);
		if(abs(delta) > abs(max)) max = delta;
        delta = (r1[i] - r2[i])/r1[i];
		if(abs(delta) > abs(max)) rmax = delta;
	}

	printf("Max absolute delta: %lg, relative delta %lgn", max, rmax);
	return 0;
}

Самый продвинутый из поддерживаемых на моем компьютере набор команд — это AVX, поэтому я компилировал программу (записанную в файл simd/speedtest.c в исходниках SLEEF) следующей командой:

gcc -O3 -Wall -Wno-unused -Wno-attributes -DENABLE_AVX -mavx speedtest.c sleefsimddp.c sleefsimdsp.c -o speedtest -lm

Я ожидал ускорения примерно в 4 раза, но результат превзошел все мои ожидания:

Testing sin, 400000000 values
Finish sin, spent time = 14.95 sec

Testing xsin
Finish xsin, spent time = 1.31 sec

Speed ratio: 11.412214
Max absolute delta: 5.55112e-17, relative delta 1.58441e-16

Ускорение более чем в 10 раз, при относительной погрешности вычисления менее 2·10-16 (порядка точности самого double), на одном ядре процессора! Конечно, в реальном приложении ускорение будет меньше, но мотивации для написания своего numpy-модуля уже достаточно.

Пару слов об универсальных функциях

В numpy данные представляются в виде многомерных массивов. Универсальные функции поэлементно работают с массивами любой размерности, причем в случае нескольких параметров их размерность может не совпадать. Параметры универсальной функции сначала приводятся к одной размерности в соответствии со специальными правилами (это называется Broadcasting), а затем необходимые вычисления проводятся поэлементно. На выходе получается массив наибольшей из размерностей.
Например, одна и та же функция add (которая автоматически применяется при использовании оператора "+" для numpy-массивов) позволяет сложить как два числа или одномерных массива, так и добавить число к массиву или одномерный массив к двумерному.

Пример

>>> from numpy import array, add

>>> add(1, 2)
3

>>> add(array([1,2]), array([4,5])) # поэлементно складываем два массива одинаковой размерности
array([5, 7])

>>> add(array([1,2]), 1) # добавляем к одномерному массиву число (т.е. массив нулевой размерности)
array([2, 3])

>>> add(array([[1,2],[3,4]]), array([1,2])) # добавляем к двумерному массиву одномерный (построчно)
array([[2, 4],
       [4, 6]])

Более подробную информацию по numpy на английском можно найти в официальной документации, на русском — например здесь.

Пишем свой numpy-модуль с универсальной функцией и SIMD-инструкциями

У numpy и skipy довольно удобный API и неплохая документация, в которой для желающих написать собственный numpy-модуль с универсальной функцией есть соответствующий tutorial. Сначала мы пишем C-функцию, в линейном цикле вычисляющую значение интересующей нас математической функции от скалярного аргумента:

Фукнция для вычисления синуса в numpy-модуле

static void double_xsin(char **args, npy_intp *dimensions,
                            npy_intp* steps, void* data)
{
    npy_intp i;
    npy_intp n = dimensions[0];
    char *in = args[0], *out = args[1];
    npy_intp in_step = steps[0], out_step = steps[1];
    double tmp[VECTLENDP];
    vdouble a;
    int slow_n = n % VECTLENDP;
    if(in_step != sizeof(double) || out_step != sizeof(double))
        slow_n = n;
    for(i = 0; i < slow_n; i += VECTLENDP)
    {
        int j;
        for(j = 0; j < VECTLENDP && i + j < slow_n; j++)
        {
            tmp[j] = *(double *)in;
            in += in_step;            
        }
        a = vloadu(tmp);
        a = xsin(a);
        vstoreu(tmp, a);
        for(j = 0; j < VECTLENDP && i + j < slow_n; j++)
        {
            *(double *)out = tmp[j];
            out += out_step;
        }        
    }
    if(n > slow_n)
    {
        double *in_array = (double *)in;
        double *out_array = (double *)out;
        for(i = 0; i < n - slow_n; i += VECTLENDP)
        {
            a = vloadu(in_array + i);
        	a = xsin(a);
	        vstoreu(out_array + i, a);    
        }
    }
}

Указатели на входные и выходные данные numpy передает нам в массиве args. В нашем случае у функции один вход и один выход, поэтому адрес входных данных args[0], выходных — args[1]. Количество элементов передается в dimensions[0]. Для перемещения по входным данным нужно увеличивать указатель на steps[0], по выходным — на steps[1] (важно, что указатель имеет тип char, поскольку в steps указаны значения в байтах). К сожалению, мне не удалось найти в документации numpy утверждения, что значения в steps должны быть равны размерам соответствующих типов данных, хотя эксперимент показал, что на моей системе для массивов ненулевой размерности это правило выполняется. В случае его нарушения вычисления будут медленнее, поскольку потребуется дополнительное копирование элементов в массив tmp и из него.
Одна и та же универсальная функция в numpy может работать с различными типами данных, но для каждого типа данных пишется отдельная C-функция. При регистрации универсальной функции мы указываем поддерживаемые типы и для каждой комбинации входных и выходных типов передаем указатель на С-функцию, которая будет с этой комбинацией работать:

static PyUFuncGenericFunction funcs[] = {&double_xsin};
static char types[] = {NPY_DOUBLE, NPY_DOUBLE};
static void *data[] = {NULL};

В массиве types указываются как входные, так и выходные типы, поэтому он длиннее массивов funcs и data. Массив указателей data позволяет для каждой комбинации типов указать свой дополнительный параметр, который будет передан в C-фунцкию в качестве последнего аргумента void* data. В частности, это можно использовать для реализации разных универсальных функций с помощью одной C-функции.
Чтобы зарегистрировать нашу универсальную функцию, нужно вызвать PyUFunc_FromFuncAndData и передать ей описанные выше массивы (funcs, data и types), а также количество входных и выходных аргументов универсальной функции, количество поддерживаемых комбинаций типов, имя функции в модуле и строку документации.

Полный исходный код модуля

#include "Python.h"
#include "numpy/ndarraytypes.h"
#include "numpy/ufuncobject.h"
#include "numpy/npy_3kcompat.h"
#include "sleef/sleefsimd.h"

/* The loop definition must precede the PyMODINIT_FUNC. */
static void double_xsin(char **args, npy_intp *dimensions,
                            npy_intp* steps, void* data)
{
    npy_intp i;
    npy_intp n = dimensions[0];
    char *in = args[0], *out = args[1];
    npy_intp in_step = steps[0], out_step = steps[1];
    double tmp[VECTLENDP];
    vdouble a;
    int slow_n = n % VECTLENDP;
    if(in_step != sizeof(double) || out_step != sizeof(double))
        slow_n = n;
    for(i = 0; i < slow_n; i += VECTLENDP)
    {
        int j;
        for(j = 0; j < VECTLENDP && i + j < slow_n; j++)
        {
            tmp[j] = *(double *)in;
            in += in_step;            
        }
        a = vloadu(tmp);
        a = xsin(a);
        vstoreu(tmp, a);
        for(j = 0; j < VECTLENDP && i + j < slow_n; j++)
        {
            *(double *)out = tmp[j];
            out += out_step;
        }        
    }
    if(n > slow_n)
    {
        double *in_array = (double *)in;
        double *out_array = (double *)out;
        for(i = 0; i < n - slow_n; i += VECTLENDP)
        {
            a = vloadu(in_array + i);
        	a = xsin(a);
	        vstoreu(out_array + i, a);    
        }
    }
}

static PyMethodDef AvxmathMethods[] = {
        {NULL, NULL, 0, NULL}
};

static PyUFuncGenericFunction funcs[1] = {&double_xsin};
static char types[] = {NPY_DOUBLE, NPY_DOUBLE};
static void *data[] = {NULL};

void register_xsin(PyObject *module)
{
    PyObject *xsin, *d;
    import_array();
    import_umath();

    xsin = PyUFunc_FromFuncAndData(funcs, data, types, 1, 1, 1,
                                    PyUFunc_None, "sin",
                                    "AVX-accelerated sine calculation", 0);
    d = PyModule_GetDict(module);
    PyDict_SetItemString(d, "sin", xsin);
    Py_DECREF(xsin);
}

#if PY_VERSION_HEX >= 0x03000000
static struct PyModuleDef moduledef = {
    PyModuleDef_HEAD_INIT,
    "avxmath",
    NULL,
    -1,
    AvxmathMethods,
    NULL,
    NULL,
    NULL,
    NULL
};

PyMODINIT_FUNC PyInit_avxmath(void)
{
    PyObject *m;
    m = PyModule_Create(&moduledef);
    if (!m) {
        return NULL;
    }
    register_xsin(m);
    return m;
}
#else
PyMODINIT_FUNC initavxmath(void)
{
    PyObject *m;
    m = Py_InitModule("avxmath", AvxmathMethods);
    if (m == NULL) {
        return;
    }
    register_xsin(m);
}
#endif

Для сборки модуля я использовал стандартный setup.py из документации, заменив название модуля и добавив C-файлы библиотеки SLEEF, флаги компилятора и линковщика. Приведенный выше исходный код модуля я сохранил рядом с setup.py в файле с именем avxmath.c, директорию simd из исходного кода SLEEF переименовал в sleef и также положил рядом с setup.py.

Файл setup.py для сборки модуля avxmath

def configuration(parent_package='', top_path=None):
    import numpy
    from numpy.distutils.misc_util import Configuration

    config = Configuration('',
                           parent_package,
                           top_path)
    config.add_extension('avxmath', ['avxmath.c', 'sleef/sleefsimddp.c', 'sleef/sleefsimdsp.c'], 
                         extra_compile_args=['-O3', '-Wall', '-Wno-unused', '-Wno-attributes', '-DENABLE_AVX','-mavx'], 
                         extra_link_args=['-lm'])

    return config

if __name__ == "__main__":
    from numpy.distutils.core import setup
    setup(configuration=configuration)

Для компиляции без установки в систему нужно выполнить команду python setup.py build_ext --inplace, результатом успешного выполнения которой должен стать готовый модуль в файле avxmath.so. Теперь можно проверить работоспособность нашей функции. Запускаем python в той же директории, где находится файл avxmath.so, и проверяем:

>>> from numpy import array, pi
>>> import avxmath
>>> avxmath.sin(0)
0.0
>>> avxmath.sin(pi)
1.2246467991473532e-16
>>> avxmath.sin([0, pi/2, pi, 3*pi/2, 2*pi])
array([  0.00000000e+00,   1.00000000e+00,   1.22464680e-16,
        -1.00000000e+00,  -2.44929360e-16])
>>> 

Убедившись, что модуль avxmath импортируется и работает без ошибок, можно сделать небольшой тест производительности и точности новой функции.

Программа проверки функции sin модуля avxmath и результат ее выполнения

import numpy 
import avxmath
import time
from numpy import random, pi

COUNT=10

x = 2e4*random.random(40000000) - 1e4
t = time.clock()
for i in xrange(COUNT):
    y1 = numpy.sin(x)
duration1 = time.clock() - t
print "numpy.sin %f sec" % duration1

t = time.clock()
for i in xrange(COUNT):
    y2 = avxmath.sin(x)
duration2 = time.clock() - t
print "avxmath.sin %f sec" % duration2

delta = y2 - y1
rdelta = delta/y1
print "max absolute difference is %lg, relative %lg" % (
        delta[abs(delta).argmax()], rdelta[abs(rdelta).argmax()])
print "speedup is %lg" % (duration1/duration2)

numpy.sin 15.510000 sec
avxmath.sin 2.260000 sec
max absolute difference is 2.22045e-16, relative 2.63873e-16
speedup is 6.86283

Итак, мы получили ускорение более чем в 6 раз при точности вычислений не хуже 3·10-16! Заменив вызовы функции xsin на простое копирование памяти, нетрудно убедиться, что ускорение в 10 раз не получилось из-за того, что около 1 секунды из полученных нами 2.26 секунд выполнения ушло на накладные расходы. Аналогично, заменив функцию xsin на обычный синус из math.h, мы обнаружим, что времена вычислений с помощью avxmath.sin и numpy.sin в нашем тесте с высокой точностью совпадут.
Таким образом, с помощью SIMD инструкций можно добиться значительного ускорения выполняемых с помощью numpy и scipy вычислений, просто заменив импорты обычных функций на оптимизированные. Ну и конечно исходные коды несколько расширенного по сравнению с данной статьей модуля avxmath доступны на Github по ссылке.

Автор: nikolaynag

Источник

* - обязательные к заполнению поля


https://ajax.googleapis.com/ajax/libs/jquery/3.4.1/jquery.min.js