«Невозможный» параллельный алгоритм неотрицательной суммы

в 7:02, , рубрики: forkjoin, haskell, java, java streams, MapReduce, sql, sum, ассоциативность, магия, моноид, параллельное программирование, параллельные вычисления, Программирование

Сумма целых чисел — что может быть проще? Сумма есть в SQL, в Java Stream API… в крайнем случае напишем сами. Как и всякая абстракция, она расходится с реальностью.

Вот счёт клиента в банке, по нему движения — положительные пополнения и отрицательные списания — в сумме дают текущий баланс. Так сумма работает в идеальном мире. А в реальности при большом минусе банк с отсрочкой, но предпримет нетривиальные действия вплоть до обращения в суд, чтобы закрыть финансовую брешь.

static long usualSum(LongStream changes) {
    return changes.reduce(0, (a, b) -> a + b);
}

Для склада вроде бы те же положительные приходы и отрицательные отгрузки в сумме дают текущий остаток. Но действия по увязке абстракции с реальностью придётся предпринимать безотлагательно — нельзя отгрузить товар, которого физически нет, нужно сразу считать неотрицательную сумму, поправляя отгрузку. Это касается всех материально-физических объектов — хоть наличных в кошельке, хоть овец в стаде.

static long nonNegativeSum(LongStream changes) {
    return changes.reduce(0, (a, b) -> Math.max(0, a + b));
}

А здесь уже проблемы реализации:

  • Неассоциативная бинарная операция нарушает контракт свёртки Stream.reduce(). При вычислении после Stream.parallel() результат будет некорректным, отличным от результата последовательного вычисления.

  • Внутри Math.max() есть ветвление и заранее неизвестно, хватит ли остатка на складе для отгрузки. Как разбить вычисление на независимые куски для параллельных вычислителей, если следующий кусок последовательно зависит от предыдущего?

  • В SQL нет аналога, а пользовательский агрегат, опять же, должен быть ассоциативным для параллелизации.

Fork/Join сломан неассоциативной операцией
Fork/Join сломан неассоциативной операцией

"Невозможная" параллельная реализация неотрицательной суммы всё же существует. Весь немногословный код поместим в

public class Example {
    …
}

Для тестирования создадим длинную историю псевдослучайных изменений, пусть распределённых на отрезке [-99, 99]

static LongStream changes() {
    return LongStream
            .range(1, 1000_000_000)
            .map(i -> (i * 137) % 199 - 99);
}

Чтобы на коленке оценить корректность и эффект от параллелизации, печатаем время вычисления и результат.

static void bench(ToLongFunction<LongStream> function, LongStream changes) {
    long start = System.currentTimeMillis();
    long result = function.applyAsLong(changes);
    long end = System.currentTimeMillis();
    System.out.printf("%4sms: %sn", end - start, result);
}

Проверим обычную сумму: вычисления параллелятся идеально и результат совпадает с последовательным. Догадайтесь, сколько ядер в процессоре?

bench(Example::usualSum, changes());
bench(Example::usualSum, changes().parallel());
> 1585ms: 147
>  390ms: 147

Наивная реализация считается быстро, но неправильно при параллельном вычислении.

bench(Example::nonNegativeSum, changes());
bench(Example::nonNegativeSum, changes().parallel());
> 1274ms: 300
>  390ms: 13309

Для верного параллельного решения познакомимся с группой Гротендика, которая позволяет представить целое, в том числе отрицательное, парой натуральных чисел

-2 ~ (5, 3) ~ (15, 13) ~ (105, 103) ~ …

где первый элемент пары трактуется как вычитаемое, а второй элемент — как уменьшаемое. Класс с двумя полями в представлении Гротендика будет хранить промежуточное состояние агрегата неотрицательной суммы.

static final class Gro {
    private long fall;
    private long grow;
}
Место для удара головой 🤯
y(x)=grow + max(0, x - fall)
y(x) = grow + max(0, x - fall)

Семейство функций смещённых ReLU с двумя параметрами fall и grow, задающих точку излома, замкнуто относительно композиции. То есть y₂(y₁(x)) имеет такой же вид смещённого ReLU, ибо

max(0, a + max(0, b)) = max(0, a) + max(0, b + min(0, a))

Композиция функций ассоциативна, значит, композиция смещённых ReLU тоже ассоциативна.

Неотрицательную сумму реализуем через мутабельную свёртку Stream.collect():

  • Параллельным вычислителям раздаём по пустому агрегату Gro::new;

  • Агрегируем accumulator независимые куски данных;

  • В конце попарно, сохраняя порядок кусков, объединяем через combiner агрегаты всех вычислителей в один.

  • Результатом из объединённого агрегата достаём только уменьшаемое, а про вычитаемое забываем.

    static long grosum(LongStream changes) {
        return changes.collect(Gro::new, Example::accumulator, Example::combiner)
          					  .grow;
    }
    
    static void accumulator(Gro a, long value) {
        a.grow += value;
        if (a.grow < 0) {
            a.fall -= a.grow;
            a.grow = 0;
        }
    }
    
    static void combiner(Gro a, Gro b) {
        if (a.grow < b.fall) {
            a.fall += b.fall - a.grow;
            a.grow = b.grow;
        } else {
            a.grow += b.grow - b.fall;
        }
    }

Проверяем параллельную реализацию неотрицательной суммы: вычисления параллелятся отлично и результат совпадает с последовательным.

bench(Example::grosum, changes());
bench(Example::grosum, changes().parallel());
> 1277ms: 300
>  402ms: 300
Исходники на разных языках: Java, SQL, Haskell, русский

Кроме кода на Java, можно глянуть SQL, реализованный пользовательским агрегатом на Oracle. В отличие от обычной ассоциативной и коммутативной sum, grosum ассоциативна, но некоммутативна. Поэтому требуется всегда указывать порядок.

select grosum(change) over (order by time) as balance
from changes

Есть реализация моноида на Haskell, которых хорош тем, что тесты в 2 строки.

prop_eq xs  = grosum xs == foldl' (⊞) 0 xs
prop_monoid = monoid (mempty :: Gro Int)

На русском языке можно посмотреть доклад, слайды и транскрипт.

Какая-то абстрактная магия в реальности работает на благо человека.

Автор: deb

Источник

* - обязательные к заполнению поля


https://ajax.googleapis.com/ajax/libs/jquery/3.4.1/jquery.min.js