Stream API: универсальная промежуточная операция

в 10:23, , рубрики: java, lazy evaluation, stream api, Программирование, функциональное программирование

Я разрабатываю бесплатную библиотеку StreamEx, которая расширяет стандартное Java 8 Stream API, добавляя туда новые операции, коллекторы и источники стримов. Обычно я не добавляю всё подряд, а всесторонне рассматриваю каждую потенциальную фичу. Например, при добавлении новой промежуточной (intermediate) операции встают такие вопросы:

  1. Будет ли она действительно промежуточной, то есть не будет трогать источник до выполнения терминальной операции?
  2. Будет ли она ленивой и вытаскивать из источника не больше данных, чем требуется?
  3. Сработает ли она на бесконечном стриме? Требует ли она ограниченный объём памяти?
  4. Будет ли она хорошо параллелиться?

Минусик по любому из этих пунктов заставляет серьёзно задуматься, добавлять ли такую операцию. Минусик по первому — это сразу нет. Например, у конкурентов в jOOλ есть операция shuffle(), которая выглядит как промежуточная, но на самом деле прямо сразу потребляет весь стрим в список, перемешивает его и создаёт новый стрим. Я такое не уважаю.

Минусики по остальными пунктам не означают сразу нет, потому что есть и стандартные операции, которые их нарушают. Второй пункт нарушает flatMap(), третий — sorted(), четвёртый — всякие limit() и takeWhile() (в JDK-9). Но всё-таки я стараюсь этого избегать. Однако на днях я открыл для себя операцию, которая плохо параллелится и в зависимости от использования может не сработать на бесконечном стриме, но всё же слишком хороша. Через неё удаётся буквально в несколько строчек выразить как практически любую существующую промежуточную операцию, так и кучу несуществующих. Я назвал операцию headTail().

Метод операции принимает два параметра-функции (везде ниже опускаю PECS для краткости):

<R> StreamEx<R> headTail(BiFunction<T, StreamEx<T>, Stream<R>> mapper, Supplier<Stream<R>> supplier)

Первая функция принимает два аргумента: первый элемент исходного стрима и стрим, содержащий все остальные элементы. Функция может сделать с этим всё что угодно и вернуть новый стрим, который и будет передан нижеследующим операциям. Второй аргумент — функция, не принимающая аргументов, которая возвращает стрим того же типа, что и первая функция. Она вызывается в случае, если исходный стрим оказался пустым. По факту вызывается только одна из функций и только один раз в процессе выполнения терминальной операции всего стрима.

Часто вторая функция должна возвращать просто пустой стрим (если исходный стрим пуст, то и результат пуст), поэтому её можно опускать:

<R> StreamEx<R> headTail(BiFunction<T, StreamEx<T>, Stream<R>> mapper)

Давайте посмотрим, что с этим можно сделать. Простой вариант использования может выглядеть так:

StreamEx.of("name", "John", "Mary", "Lucy")
        .headTail((head, tail) -> tail.map(str -> head+": "+str))
        .forEach(System.out::println);

Вывод:

name: John
name: Mary
name: Lucy

Здесь мы просто откусили первый элемент стрима и использовали его для конкатенации с последующими элементами. Так можно парсить текстовый файл, у которого в первой строке заголовки. Но это довольно скучно. Играясь с этим методом я обнаружил, что он гораздо мощнее. Давайте попробуем выразить через него основные промежуточные операции из JDK.

Stream.map

Операция map применяет заданную функцию ко всем элементам исходного стрима. Вот так она будет выглядеть через headTail():

public static <T, R> StreamEx<R> map(StreamEx<T> input, Function<T, R> mapper) {
    return input.headTail((head, tail) -> map(tail, mapper).prepend(mapper.apply(head)));
}

Здесь мы пользуемся ещё одной простой операцией prepend, без которой бы ничего не вышло. Это вариация на тему конкатенации двух стримов (в стандартном API есть Stream.concat). Здесь мы вызываем сами себя для хвоста, а затем добавляем в начало стрима результат применения функции к головному элементу.

Это похоже на рекурсию, а все знают, что рекурсия жрёт стек. В функциональных языках программирования порой спасает оптимизация хвостовой рекурсии, но в Java её нет и не предвидится. Однако в данном случае это не совсем рекурсия: мы не вызываем метод map внутри самого себя, а только создаём функцию, которая будет вызвана позже. Оказалось, что в данном случае можно проконтролировать глубину вызовов, если изменения каждого отдельного headTail() затрагивают только начало стрима, оставляя неизменный хвост. Я не очень творчески назвал эту фичу «оптимизацией хвостовых стримов» (tail stream optimization). Она совместима с промежуточными операциями prepend (добавить что-то в начало стрима), mapFirst (изменить первый элемент стрима, не трогая остальное) и самим headTail. В принципе её можно бы было распространить на стандартные skip и dropWhile (с JDK-9), но моя библиотека обещает, что стандартные операции полностью совместимы с оригинальным Stream API, а тут возникли бы тонкие отличия.

Так или иначе, приведённая выше операция map не кушает стек или память больше константного размера и вполне применима для стримов любой длины. Давайте посмотрим на другие операции.

Stream.limit

Ограничить стрим заданной длиной. Если ограничить одним элементом, то просто создаём стрим из головы, иначе вызываем себя для хвоста с уменьшенным ограничением (обработать n <= 0 — упражнение для читателя):

public static <T> StreamEx<T> limit(StreamEx<T> input, int n) {
    return input.headTail((head, tail) -> n > 1 ? limit(tail, n - 1).prepend(head) : Stream.of(head));
}

Вначале я написал немного по-другому (как и аргумент flatMap, аргумент headTail может вернуть null вместо пустого стрима):

public static <T> StreamEx<T> limit(StreamEx<T> input, int n) {
    return input.headTail((head, tail) -> n > 0 ? limit(tail, n - 1).prepend(head) : null);
}

Но у такой реализации есть недостаток: она считает из источника на один элемент больше, чем надо (при n = 0 аргумент head считывается, но не используется). Иногда это может быть критично. Например, такой код должен работать:

limit(StreamEx.of(new Random().ints(0, 1000).boxed().distinct()), 1000).forEach(System.out::println);

Бесконечный поток случайных чисел от 0 до 999, из которого мы выбираем уникальные. 1000 уникальных есть, а вот 1001 нету, поэтому если пытаться вытащить из источника 1001-е число, то всё зависнет.

Stream.skip

Выкинуть n первых элементов. Если n = 0, вернём просто хвост с приклеенной головой, иначе вызовем себя с уменьшенным аргументом:

static <T> StreamEx<T> skip(StreamEx<T> input, int n) {
    return input.headTail((head, tail) -> n > 0 ? skip(tail, n - 1) : tail.prepend(head));
}

Stream.flatMap

Отобразить каждый элемент на стрим и сделать из них общий стрим. В нашем случае реализация такая же, как у map:

public static <T, R> StreamEx<R> flatMap(StreamEx<T> input, Function<T, Stream<R>> mapper) {
    return input.headTail((head, tail) -> flatMap(tail, mapper).prepend(mapper.apply(head)));
}

Здесь отличие только в том, что используется другой prepend, который принимает стрим (собственно, первый prepend — это частный случай этого).

Stream.peek

Выполнить дополнительное действие для каждого элемента стрима и вернуть стрим как есть. Выполняем действие и приклеиваем голову к хвосту:

public static <T> StreamEx<T> peek(StreamEx<T> input, Consumer<T> consumer) {
    return input.headTail((head, tail) -> {
        consumer.accept(head);
        return peek(tail, consumer).prepend(head);
    });
}

Stream.filter

Оставить элементы удовлетворяющие предикату. Приклеиваем голову, только если предикат выполняется:

public static <T> StreamEx<T> filter(StreamEx<T> input, Predicate<T> predicate) {
    return input.<T> headTail((head, tail) -> predicate.test(head) 
        ? filter(tail, predicate).prepend(head) 
        : filter(tail, predicate));
}

Stream.distinct

Оставить уникальные элементы. Тут уже явно потребуется дополнительная память. Наивная реализация будет использовать filter (можно стандартный или объявленный выше):

public static <T> StreamEx<T> distinct(StreamEx<T> input) {
    return input.headTail((head, tail) -> distinct(tail.filter(n -> !Objects.equals(head, n))).prepend(head));
}

Но такой код всё же жрёт стек, оптимизации хвостовых стримов нет. Кроме того, каждый элемент проверяется цепочкой фильтров линейно, а хотелось бы оптимизировать. Для этого будем держать в параметрах HashSet:

private static <T> StreamEx<T> distinct(StreamEx<T> input, Set<T> observed) {
    return input.headTail((head, tail) -> observed.add(head) 
            ? distinct(tail, observed).prepend(head)
            : distinct(tail, observed));
}

Не забываем, что Set.add возвращает false, если элемент уже был в множестве. В этом случае голову не приклеиваем. Такая реализация стек уже не кушает и по памяти не уступает стандартной. Тут стоит добавить метод для запуска (с рекурсивными функциями часто бывает, что нужен отдельный публичный метод для запуска):

public static <T> StreamEx<T> distinct(StreamEx<T> input) {
    return distinct(input, new HashSet<>());
}

Stream.sorted

Отсортировать стрим. Операция особенная: здесь нельзя ничего выдать в результат, пока источник не прочитан полностью. Придётся всё буферизовать (например, в ArrayList) и здесь нам впервые пригодится второй аргумент headTail:

public static <T> StreamEx<T> sorted(StreamEx<T> input) {
    return sorted(input, new ArrayList<>());
}

private static <T> StreamEx<T> sorted(StreamEx<T> input, List<T> buf) {
    return input.headTail((head, tail) -> {
        buf.add(head);
        return sorted(tail, buf);
    }, () -> {
        buf.sort(null);
        return buf.stream();
    });
}

Когда весь исходный стрим кончился, мы сортируем буфер и возвращаем с него поток. Замечу, что такой sorted работает похоже на стандартный и он всё же лучше, чем приведённый выше shuffle. К примеру, если вы конкатенируете два сортированных стрима, второй не будет сортироваться, пока вы полностью не прочитаете первый. Кстати, заменив buf.sort(null) на Collections.shuffle(buf) вы и shuffle можете сделать более-менее нормально. А с Collections.reverse(buf) можно перевернуть стрим.

JDK-9 пока добавляет две новых промежуточных операции. Реализуем и их:

Stream.takeWhile

Обрезать стрим как только предикат вернёт false. Похоже на limit:

public static <T> StreamEx<T> takeWhile(StreamEx<T> input, Predicate<T> predicate) {
    return input.headTail((head, tail) -> predicate.test(head) 
        ? takeWhile(tail, predicate).prepend(head) : null);
}

Stream.dropWhile

Выкидывать элементы из стрима, пока предикат не вернёт false. Похоже на skip:

public static <T> StreamEx<T> dropWhile(StreamEx<T> input, Predicate<T> predicate) {
    return input.headTail((head, tail) -> predicate.test(head) ? dropWhile(tail, predicate) : tail.prepend(head));
}

Ну изобретать велосипед скучно. Давайте попробуем реализовать новые операции, которых нет в Stream API.

mirror

Добавим в конец стрима его содержимое в обратном порядке (чтобы стрим из 1, 2, 3 превратился в 1, 2, 3, 3, 2, 1). Можно сделать просто, но без хвостовой оптимизации:

public static <T> StreamEx<T> mirror(StreamEx<T> input) {
    return input.headTail((head, tail) -> mirror(tail).append(head).prepend(head));
}

С хвостовой же потребуется буфер:

public static <T> StreamEx<T> mirror(StreamEx<T> input) {
    return mirror(input, new ArrayDeque<>());
}

private static <T> StreamEx<T> mirror(StreamEx<T> input, Deque<T> buf) {
    return input.headTail((head, tail) -> {
        buf.addFirst(head);
        return mirror(tail, buf).prepend(head);
    }, buf::stream);
}

Обе реализации не берут больше, чем надо: mirror(StreamEx.of(1,2,3,4,5)).limit(3) не дойдёт до точки отражения и вычитает только три элемента из источника.

scanLeft

Последовательно модифицируем стрим, выполняя заданную операцию. Например, scanLeft(StreamEx.of(1,2,3,4,5), Integer::sum) должен последовательно суммировать элементы и создать стрим 1, 3, 6, 10, 15.

public static <T> StreamEx<T> scanLeft(StreamEx<T> input, BinaryOperator<T> operator) {
    return input.headTail((head, tail) -> 
        scanLeft(tail.mapFirst(cur -> operator.apply(head, cur)), operator).prepend(head));
}

Здесь мы воспользовались методом mapFirst, который уже есть в StreamEx. Но если б и не было, мы б его легко написали даже без всякой рекурсии:

public static <T> StreamEx<T> mapFirst(StreamEx<T> input, UnaryOperator<T> operator) {
    return input.headTail((head, tail) -> tail.prepend(operator.apply(head)));
}

В любом случае хвосты оптимизируются, как с нашим mapFirst, так и с имеющимся.

takeWhileClosed

Название, возможно, не очень удачное. Иногда хочется модифицировать takeWhile, чтобы в поток попадали не только элементы, удовлетворяющие предикату, но и первый элемент, его нарушающий. Через существующие операции и через обычный takeWhile это нормально не выразить. А через headTail — легко:

public static <T> StreamEx<T> takeWhileClosed(StreamEx<T> input, Predicate<T> predicate) {
    return input.headTail((head, tail) -> predicate.test(head) 
            ? takeWhileClosed(tail, predicate).prepend(head)
            : Stream.of(head));
}

every

Брать элементы из стрима с заданным интервалом (например, каждый десятый), начиная с первого. Здесь удобно скомбинировать с операцией skip, но стандартный skip не оптимизирует хвосты, поэтому воспользуемся нашим переопределённым skip:

public static <T> StreamEx<T> every(StreamEx<T> input, int n) {
    return input.headTail((head, tail) -> every(skip(tail, n - 1), n).prepend(head));
}

couples

Разбить стрим на непересекающиеся пары элементов, применив к ним заданную функцию (если элементов нечётное количество, последний выкинуть). Здесь удобно вызвать headTail дважды:

public static <T, R> StreamEx<R> couples(StreamEx<T> input, BiFunction<T, T, R> mapper) {
    return input.headTail((left, tail1) -> 
            tail1.headTail((right, tail2) -> 
                couples(tail2, mapper).prepend(mapper.apply(left, right))));
}

pairMap

А если мы хотим с пересекающимися парами то же самое? Легко, надо только вернуть правый элемент в стрим при рекурсивном вызове:

public static <T, R> StreamEx<R> pairMap(StreamEx<T> input, BiFunction<T, T, R> mapper) {
    return input.headTail((left, tail1) -> 
        tail1.headTail((right, tail2) -> 
            pairMap(tail2.prepend(right), mapper).prepend(mapper.apply(left, right))));
}

Такая операция уже есть в StreamEx, и я про неё писал. Она, конечно, нормально распараллеливается в отличие от реализации через headTail().

batches

Ладно, с парами понятно. А если мы хотим разбить стрим на кусочки фиксированный длины (в виде списков) и не потерять нецелый кусочек в конце? Например, batches(StreamEx(1,2,3,4,5,6,7), 3) должно сделать поток из списков [1,2,3], [4,5,6], [7]. Тут поможет аргумент, содержащий промежуточный буфер:

public static <T> StreamEx<List<T>> batches(StreamEx<T> input, int size) {
    return batches(input, size, Collections.emptyList());
}

private static <T> StreamEx<List<T>> batches(StreamEx<T> input, int size, List<T> cur) {
    return input.headTail((head, tail) -> cur.size() >= size 
            ? batches(tail, size, Collections.singletonList(head)).prepend(cur) // старый буфер приклеиваем в голову и начинаем новый
            : batches(tail, size, StreamEx.of(cur).append(head).toList()), // добавляем к старому буферу
            () -> Stream.of(cur));
}

Когда источник исчерпан мы отдаём в результат последний накопленный буфер с помощью () -> Stream.of(cur), чтобы не потерять хвост. Здесь для красоты реализации я каждый раз создаю новый список через StreamEx.of(cur).append(head).toList(), а не меняю существующий. Но несложно и изменяемые списки вставить, если важна производительность.

withIndices

Потребовалось узнать индексы элементов в стриме? Можно и это. Чтобы не заводить специальный тип вроде пары индекс-элемент, примем абстрактную функцию типа BiFunction<Integer, T, R>, которая может с индексом и элементом сделать всё, что хочет:

public static <T, R> StreamEx<R> withIndices(StreamEx<T> input, BiFunction<Integer, T, R> mapper) {
    return withIndices(input, 0, mapper);
}

private static <T, R> StreamEx<R> withIndices(StreamEx<T> input, int idx, BiFunction<Integer, T, R> mapper) {
    return input.headTail((head, tail) -> withIndices(tail, idx + 1, mapper).prepend(mapper.apply(idx, head)));
}

dominators

Более экзотическая задача: будем выкидывать элементы, следующие после данного, над которыми данный «доминирует». Доминирование определяет предикат от двух элементов. Например, dominators(numbers, (a, b) -> a >= b) оставит из исходных чисел возрастающий поднабор. Реализация похожа на every, только вместо skip используется наш dropWhile:

public static <T> StreamEx<T> dominators(StreamEx<T> input, BiPredicate<T, T> isDominator) {
    return input.headTail((head, tail) -> dominators(dropWhile(tail, e -> isDominator.test(head, e)), isDominator)
            .prepend(head));
}

appendReduction

Добавить в конец стрима ещё один элемент — результат его редукции с заданной операцией. Например, appendReduction(numbers, 0, Integer::sum) допишет в стрим чисел сумму его элементов.

public static <T> StreamEx<T> appendReduction(StreamEx<T> input, T identity, BinaryOperator<T> op) {
    return input.headTail((head, tail) -> 
        appendReduction(tail, op.apply(identity, head), op).prepend(head),
        () -> Stream.of(identity));
}

Как обычно, всё лениво и хвосты оптимизируются.

primes

Скорее учебная задача. Сделать решето Эратосфена: ленивый поток простых чисел, который выкидывает те, что делятся на уже виденные ранее:

public static StreamEx<Integer> sieve(StreamEx<Integer> input) {
    return sieve(StreamEx.iterate(2, x -> x+1));
}

private static StreamEx<Integer> sieve(StreamEx<Integer> input) {
    return input.headTail((head, tail) -> sieve(tail.filter(n -> n % head != 0)).prepend(head));
}

Здесь хвостовой оптимизации не получается, хотя аналогичная штука на функциональных языках тоже, естественно, не оптимизируется. Но выглядит просто. Со стандартными настройками JVM успевает выдать простые числа до 200 000 с лишним, пока не упадёт со StackOverflowError.

Можно придумать и другие полезные операции. Например, повторить содержимое стрима в цикле заданное количество раз. Или сдублировать стрим, отфильтровав его двумя разными фильтрами (при этом не хранить в памяти то, что не прошло второй фильтр). Можно сделать бегущее окно (по аналогии с batches, но внахлёст). По факту что бы я ни придумал, мне удавалось это реализовать с помощью headTail весьма коротко (мои тесты здесь). Во всяком случае, для меня headTail точно короче и понятнее, чем писать Iterator или Spliterator. Как я понимаю, в мире функционального программирования подобные штуки — обычное дело. Приятно, что и на Java это возможно.

Программируйте с удовольствием!

Автор: lany

Источник

* - обязательные к заполнению поля


https://ajax.googleapis.com/ajax/libs/jquery/3.4.1/jquery.min.js