Пакеты numpy и scipy предоставляют прекрасные возможности для быстрого решения различных вычислительных задач. Концепция универсальных функций (ufunc), работающих как со скалярными значениями, так и с массивами различных размерностей, позволяет получить высокую производительность при сохранении присущей языку Python простоты и элегантности. Универсальная функция обычно используются для выполнения одной операции над большим массивом данных, что идеально подходит для оптимизации с помощью SIMD-инструкций, однако мне не удалось найти готового решения, основанного на свободном программном обеспечении и позволяющего использовать SIMD для вычисления в numpy таких математических функций, как синус, косинус и экспонента. Реализовывать алгоритмы вычисления этих функций с нуля совсем не хотелось, но к счастью в интернете нашлось несколько свободных библиотек на языке «С». Преодолев лень сомнения, я решил написать собственный numpy-модуль, предлагающий универсальные функции для синуса, косинуса и экспоненты. За подробностями и результатами тестов добро пожаловать под кат.
Немного о SIMD-инструкциях
SIMD-инструкции позволяют одновременно производить один и тот же набор операций над несколькими числами, записанными в одном регистре. Таким образом, за один такт можно обрабатывать сразу несколько чисел и потенциально увеличить производительность в несколько раз.
Например, набор SIMD-инструкций Advanced Vector Extensions (AVX) позволяет выполнять операции с 256-битными регистрами, каждый из которых может включать восемь 32-разрядных чисел с плавающей точкой (числа одинарной точности), либо четыре 64-разрядных (числа двойной точности). Набор операций довольно скромный, в основном это сложение, вычитание, умножение и деление. Точная и быстрая реализация тригонометрических функций с помощью этих операций — задача нетривиальная и, чтобы не изобретать велосипед, стоит использовать какую-нибудь готовую библиотеку. Помимо проприетарной Intel MKL(которая уже умеет работать с numpy) вариантов нашлось всего три (раз, два, три).
Первый вариант представляет собой заголовочный файл с очень скромным набором функций, практически без документации и тестов. Второй вариант — С++ библиотека vecmathlib, которая у меня почему-то упорно отказывалась компилироваться, несмотря на использование рекомендованного компилятора GCC-4.7. Вариант перспективный, но пока еще похоже сырой. Третий вариант — библиотеку SLEEF — удалось найти именно благодаря vecmathlib, которая использует его кодовую базу. Вариант мне сразу понравился простотой и ясностью кода, а также обилием тестов.
Небольшой тест для мотивации
Чтобы получить достаточную мотивацию для написания модуля, а заодно разобраться с использованием SLEEF, я решил сравнить скорость вычисления синуса на языке «C» при использовании SLEEF со стандартной math.h
. Естественно, речь идет о поэлементном вычислении синуса для большого массива данных.
К сожалению, документации и примеров в SLEEF практически нет, но зато есть вполне понятно написанные тесты, так что разобраться с использованием библиотеки было не сложно. Исходный код SLEEF состоит из четырех директорий: java
, purec
, simd
и tester
. Кроме этого, там лежит файл README с кратким описанием библиотеки и общий Makefile, дергающий Makefile из перечисленных директорий. Меня естественно больше всего заинтересовала директория simd
, в которой, как можно догадаться из названия, содержались оптимизированные с помощью SIMD-инструкций функции.
Из Makefile в директории simd
понятно, что поддерживаются 4 варианта SIMD-инструкций: SSE2, AVX, AVX2 и FMA4. Прототипы функций определены в файле sleefsimd.h
, а нужный набор инструкций выбирается при компиляции с помощью флагов -DENABLE_SSE2
, -DENABLE_AVX
, -DENABLE_AVX2
или -DENABLE_FMA4
. Makefile собирает исполняемые файлы для тестирования функций с использованием каждого из наборов инструкций: iutsse2
, iutavx
, iutavx2
или iutfma4
. Эти файлы вызываются из универсальной программы tester (из директории tester
) и осуществляют выполнение получаемых от tester команд. Реализация команд находится в файле iut.c
, откуда становится очевидным способ использования библиотеки.
double xxsin(double d) {
double s[VECTLENDP];
int i;
for(i=0;i<VECTLENDP;i++) {
s[i] = random()/(double)RAND_MAX*20000-10000;
}
int idx = random() & (VECTLENDP-1);
s[idx] = d;
vdouble a = vloadu(s);
a = xsin(a);
vstoreu(s, a);
return s[idx];
}
Для чисел двойной точности (double
) нужно определить массив длиной VECTLENDP
, заполнить аргументами интересующей нас функции и передать в функцию vloadu
, которая скопирует их в необходимое для использования SIMD место и вернет значение типа vdouble
. Значение vdouble
мы передаем функции xsin
, которая вычисляет значение синуса сразу для всех VECLENDP
аргументов и возвращает опять же vdouble
. Результат распаковывается в массив из double
с помощью функции vstoreu
.
Для желающих проверить SLEEF на своей машине привожу полный исходный код программы, которую написал для оценки потенциального ускорения от использования SIMD с помощью SLEEF.
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <math.h>
#include "sleefsimd.h"
#define TESTSIZE (VECTLENDP*10000000)
double s[TESTSIZE];
double r1[TESTSIZE];
double r2[TESTSIZE];
#define COUNT 10
int main(int argc, char *argv[])
{
int k, i;
clock_t t1, t2;
double time1, time2;
double max, rmax;
srandom(time(NULL));
for(i = 0; i < TESTSIZE; i++)
{
s[i] = random()/(double)RAND_MAX*20000-10000;
}
printf("Testing sin, %d valuesn", TESTSIZE*COUNT);
t1 = clock();
for(k = 0; k < COUNT; k++)
{
for(i = 0; i < TESTSIZE; i++)
{
r1[i] = sin(s[i]);
}
}
t2 = clock();
time1 = (double)(t2 - t1)/CLOCKS_PER_SEC;
printf("Finish sin, spent time = %lg secnn", time1);
printf("Testing xsinn");
t1 = clock();
for(k = 0; k < COUNT; k++)
{
for(i = 0; i < TESTSIZE; i += VECTLENDP)
{
vdouble a = vloadu(s+i);
a = xsin(a);
vstoreu(r2+i, a);
}
}
t2 = clock();
time2 = (double)(t2 - t1)/CLOCKS_PER_SEC;
printf("Finish xsin, spent time = %lg secnn", time2);
printf("Speed ratio: %lfn", time1/time2);
max = r1[0] - r2[0];
rmax = (r1[0] - r2[0])/r1[0];
for(i = 0; i < TESTSIZE; i++)
{
double delta = (r1[i] - r2[i]);
if(abs(delta) > abs(max)) max = delta;
delta = (r1[i] - r2[i])/r1[i];
if(abs(delta) > abs(max)) rmax = delta;
}
printf("Max absolute delta: %lg, relative delta %lgn", max, rmax);
return 0;
}
Самый продвинутый из поддерживаемых на моем компьютере набор команд — это AVX, поэтому я компилировал программу (записанную в файл simd/speedtest.c
в исходниках SLEEF) следующей командой:
gcc -O3 -Wall -Wno-unused -Wno-attributes -DENABLE_AVX -mavx speedtest.c sleefsimddp.c sleefsimdsp.c -o speedtest -lm
Я ожидал ускорения примерно в 4 раза, но результат превзошел все мои ожидания:
Testing sin, 400000000 values
Finish sin, spent time = 14.95 sec
Testing xsin
Finish xsin, spent time = 1.31 sec
Speed ratio: 11.412214
Max absolute delta: 5.55112e-17, relative delta 1.58441e-16
Ускорение более чем в 10 раз, при относительной погрешности вычисления менее 2·10-16 (порядка точности самого double
), на одном ядре процессора! Конечно, в реальном приложении ускорение будет меньше, но мотивации для написания своего numpy-модуля уже достаточно.
Пару слов об универсальных функциях
В numpy данные представляются в виде многомерных массивов. Универсальные функции поэлементно работают с массивами любой размерности, причем в случае нескольких параметров их размерность может не совпадать. Параметры универсальной функции сначала приводятся к одной размерности в соответствии со специальными правилами (это называется Broadcasting), а затем необходимые вычисления проводятся поэлементно. На выходе получается массив наибольшей из размерностей.
Например, одна и та же функция add (которая автоматически применяется при использовании оператора "+" для numpy-массивов) позволяет сложить как два числа или одномерных массива, так и добавить число к массиву или одномерный массив к двумерному.
>>> from numpy import array, add
>>> add(1, 2)
3
>>> add(array([1,2]), array([4,5])) # поэлементно складываем два массива одинаковой размерности
array([5, 7])
>>> add(array([1,2]), 1) # добавляем к одномерному массиву число (т.е. массив нулевой размерности)
array([2, 3])
>>> add(array([[1,2],[3,4]]), array([1,2])) # добавляем к двумерному массиву одномерный (построчно)
array([[2, 4],
[4, 6]])
Более подробную информацию по numpy на английском можно найти в официальной документации, на русском — например здесь.
Пишем свой numpy-модуль с универсальной функцией и SIMD-инструкциями
У numpy и skipy довольно удобный API и неплохая документация, в которой для желающих написать собственный numpy-модуль с универсальной функцией есть соответствующий tutorial. Сначала мы пишем C-функцию, в линейном цикле вычисляющую значение интересующей нас математической функции от скалярного аргумента:
static void double_xsin(char **args, npy_intp *dimensions,
npy_intp* steps, void* data)
{
npy_intp i;
npy_intp n = dimensions[0];
char *in = args[0], *out = args[1];
npy_intp in_step = steps[0], out_step = steps[1];
double tmp[VECTLENDP];
vdouble a;
int slow_n = n % VECTLENDP;
if(in_step != sizeof(double) || out_step != sizeof(double))
slow_n = n;
for(i = 0; i < slow_n; i += VECTLENDP)
{
int j;
for(j = 0; j < VECTLENDP && i + j < slow_n; j++)
{
tmp[j] = *(double *)in;
in += in_step;
}
a = vloadu(tmp);
a = xsin(a);
vstoreu(tmp, a);
for(j = 0; j < VECTLENDP && i + j < slow_n; j++)
{
*(double *)out = tmp[j];
out += out_step;
}
}
if(n > slow_n)
{
double *in_array = (double *)in;
double *out_array = (double *)out;
for(i = 0; i < n - slow_n; i += VECTLENDP)
{
a = vloadu(in_array + i);
a = xsin(a);
vstoreu(out_array + i, a);
}
}
}
Указатели на входные и выходные данные numpy передает нам в массиве args
. В нашем случае у функции один вход и один выход, поэтому адрес входных данных args[0]
, выходных — args[1]
. Количество элементов передается в dimensions[0]
. Для перемещения по входным данным нужно увеличивать указатель на steps[0]
, по выходным — на steps[1]
(важно, что указатель имеет тип char
, поскольку в steps
указаны значения в байтах). К сожалению, мне не удалось найти в документации numpy утверждения, что значения в steps
должны быть равны размерам соответствующих типов данных, хотя эксперимент показал, что на моей системе для массивов ненулевой размерности это правило выполняется. В случае его нарушения вычисления будут медленнее, поскольку потребуется дополнительное копирование элементов в массив tmp
и из него.
Одна и та же универсальная функция в numpy может работать с различными типами данных, но для каждого типа данных пишется отдельная C-функция. При регистрации универсальной функции мы указываем поддерживаемые типы и для каждой комбинации входных и выходных типов передаем указатель на С-функцию, которая будет с этой комбинацией работать:
static PyUFuncGenericFunction funcs[] = {&double_xsin};
static char types[] = {NPY_DOUBLE, NPY_DOUBLE};
static void *data[] = {NULL};
В массиве types
указываются как входные, так и выходные типы, поэтому он длиннее массивов funcs
и data
. Массив указателей data
позволяет для каждой комбинации типов указать свой дополнительный параметр, который будет передан в C-фунцкию в качестве последнего аргумента void* data
. В частности, это можно использовать для реализации разных универсальных функций с помощью одной C-функции.
Чтобы зарегистрировать нашу универсальную функцию, нужно вызвать PyUFunc_FromFuncAndData
и передать ей описанные выше массивы (funcs
, data
и types
), а также количество входных и выходных аргументов универсальной функции, количество поддерживаемых комбинаций типов, имя функции в модуле и строку документации.
#include "Python.h"
#include "numpy/ndarraytypes.h"
#include "numpy/ufuncobject.h"
#include "numpy/npy_3kcompat.h"
#include "sleef/sleefsimd.h"
/* The loop definition must precede the PyMODINIT_FUNC. */
static void double_xsin(char **args, npy_intp *dimensions,
npy_intp* steps, void* data)
{
npy_intp i;
npy_intp n = dimensions[0];
char *in = args[0], *out = args[1];
npy_intp in_step = steps[0], out_step = steps[1];
double tmp[VECTLENDP];
vdouble a;
int slow_n = n % VECTLENDP;
if(in_step != sizeof(double) || out_step != sizeof(double))
slow_n = n;
for(i = 0; i < slow_n; i += VECTLENDP)
{
int j;
for(j = 0; j < VECTLENDP && i + j < slow_n; j++)
{
tmp[j] = *(double *)in;
in += in_step;
}
a = vloadu(tmp);
a = xsin(a);
vstoreu(tmp, a);
for(j = 0; j < VECTLENDP && i + j < slow_n; j++)
{
*(double *)out = tmp[j];
out += out_step;
}
}
if(n > slow_n)
{
double *in_array = (double *)in;
double *out_array = (double *)out;
for(i = 0; i < n - slow_n; i += VECTLENDP)
{
a = vloadu(in_array + i);
a = xsin(a);
vstoreu(out_array + i, a);
}
}
}
static PyMethodDef AvxmathMethods[] = {
{NULL, NULL, 0, NULL}
};
static PyUFuncGenericFunction funcs[1] = {&double_xsin};
static char types[] = {NPY_DOUBLE, NPY_DOUBLE};
static void *data[] = {NULL};
void register_xsin(PyObject *module)
{
PyObject *xsin, *d;
import_array();
import_umath();
xsin = PyUFunc_FromFuncAndData(funcs, data, types, 1, 1, 1,
PyUFunc_None, "sin",
"AVX-accelerated sine calculation", 0);
d = PyModule_GetDict(module);
PyDict_SetItemString(d, "sin", xsin);
Py_DECREF(xsin);
}
#if PY_VERSION_HEX >= 0x03000000
static struct PyModuleDef moduledef = {
PyModuleDef_HEAD_INIT,
"avxmath",
NULL,
-1,
AvxmathMethods,
NULL,
NULL,
NULL,
NULL
};
PyMODINIT_FUNC PyInit_avxmath(void)
{
PyObject *m;
m = PyModule_Create(&moduledef);
if (!m) {
return NULL;
}
register_xsin(m);
return m;
}
#else
PyMODINIT_FUNC initavxmath(void)
{
PyObject *m;
m = Py_InitModule("avxmath", AvxmathMethods);
if (m == NULL) {
return;
}
register_xsin(m);
}
#endif
Для сборки модуля я использовал стандартный setup.py
из документации, заменив название модуля и добавив C-файлы библиотеки SLEEF, флаги компилятора и линковщика. Приведенный выше исходный код модуля я сохранил рядом с setup.py
в файле с именем avxmath.c
, директорию simd
из исходного кода SLEEF переименовал в sleef
и также положил рядом с setup.py
.
def configuration(parent_package='', top_path=None):
import numpy
from numpy.distutils.misc_util import Configuration
config = Configuration('',
parent_package,
top_path)
config.add_extension('avxmath', ['avxmath.c', 'sleef/sleefsimddp.c', 'sleef/sleefsimdsp.c'],
extra_compile_args=['-O3', '-Wall', '-Wno-unused', '-Wno-attributes', '-DENABLE_AVX','-mavx'],
extra_link_args=['-lm'])
return config
if __name__ == "__main__":
from numpy.distutils.core import setup
setup(configuration=configuration)
Для компиляции без установки в систему нужно выполнить команду python setup.py build_ext --inplace
, результатом успешного выполнения которой должен стать готовый модуль в файле avxmath.so
. Теперь можно проверить работоспособность нашей функции. Запускаем python в той же директории, где находится файл avxmath.so
, и проверяем:
>>> from numpy import array, pi
>>> import avxmath
>>> avxmath.sin(0)
0.0
>>> avxmath.sin(pi)
1.2246467991473532e-16
>>> avxmath.sin([0, pi/2, pi, 3*pi/2, 2*pi])
array([ 0.00000000e+00, 1.00000000e+00, 1.22464680e-16,
-1.00000000e+00, -2.44929360e-16])
>>>
Убедившись, что модуль avxmath
импортируется и работает без ошибок, можно сделать небольшой тест производительности и точности новой функции.
import numpy
import avxmath
import time
from numpy import random, pi
COUNT=10
x = 2e4*random.random(40000000) - 1e4
t = time.clock()
for i in xrange(COUNT):
y1 = numpy.sin(x)
duration1 = time.clock() - t
print "numpy.sin %f sec" % duration1
t = time.clock()
for i in xrange(COUNT):
y2 = avxmath.sin(x)
duration2 = time.clock() - t
print "avxmath.sin %f sec" % duration2
delta = y2 - y1
rdelta = delta/y1
print "max absolute difference is %lg, relative %lg" % (
delta[abs(delta).argmax()], rdelta[abs(rdelta).argmax()])
print "speedup is %lg" % (duration1/duration2)
numpy.sin 15.510000 sec
avxmath.sin 2.260000 sec
max absolute difference is 2.22045e-16, relative 2.63873e-16
speedup is 6.86283
Итак, мы получили ускорение более чем в 6 раз при точности вычислений не хуже 3·10-16! Заменив вызовы функции xsin
на простое копирование памяти, нетрудно убедиться, что ускорение в 10 раз не получилось из-за того, что около 1 секунды из полученных нами 2.26 секунд выполнения ушло на накладные расходы. Аналогично, заменив функцию xsin
на обычный синус из math.h
, мы обнаружим, что времена вычислений с помощью avxmath.sin
и numpy.sin
в нашем тесте с высокой точностью совпадут.
Таким образом, с помощью SIMD инструкций можно добиться значительного ускорения выполняемых с помощью numpy и scipy вычислений, просто заменив импорты обычных функций на оптимизированные. Ну и конечно исходные коды несколько расширенного по сравнению с данной статьей модуля avxmath
доступны на Github по ссылке.
Автор: nikolaynag