Clojure — трансдьюсеры, редьюсеры и прочая муть

в 4:28, , рубрики: clojure, fold, lazy collections, reduce, reducers, transducers, муть всякая, Программирование, редьюсеры, трансдьюсеры, функциональное программирование

Clojure — трансдьюсеры, редьюсеры и прочая муть - 1 В последнее время определенную известность получили transducers — новая фишка из еще не вышедшей Clojure 1.7. На момент написания статьи актуальна Сlojure 1.7-alpha5, но уже успело появиться изрядное количество портов трансдьюсеров на разнообразные языки: Python, Ruby, JavaScript, PHP, Java, C++, Lua, Erlang. И… это, по правде говоря, немного обескураживает. Ведь довольно давно (еще в Clojure 1.5) добавили библиотеку reducers. Так вот про редьюсеры никто особо не говорил, никуда ничего не портировал, хотя, вроде как, делают они схожие вещи… Или нет?

Давайте разберемся, для чего нам в Clojure понадобились все эти reducers & transducers (они нам правда нужны?), как они работают, как их использовать… И выясним наконец, не пора ли выкидывать reducers на свалку.

Будет неправильным описывать концепции, возникшие в Clojure, вне контекста данного языка. Потому дальше будет много листингов на Clojure. Но зато не будет никакого матана. В общем начальные знания Clojure уместны (особенно представление о sequences), а вот знать Haskell не обязательно. Также заранее предупреждаю, что все приведенные листинги стандартных функций на самом деле очень сильно изменены, иногда даже «слегка» поломаны. Все во благо упрощения. Ах да, на картинке тот самый буррито.

Сворачиваемся…

Итак, Clojure язык функциональный, а значит обыкновенный императивный цикл не есть хорошо.
Ну и ладно, не очень нам хотелось — есть же функциональноугодный reduce!

(defn my-reduce
 ([rf coll]  ;; этот вариант для удобства
  (if-let [s (seq coll)]
    (my-reduce rf (first s) (next s))
    (rf)))
 ([rf acc coll]
   (if-let [[x & xs] (seq coll)]
     (recur rf (rf acc x) xs)
     acc)))

В действительности reduce, конечно же, реализован несколько иначе, но нам это сейчас не важно, забудем. Функция rf (назовем ее редукт-функция) тут принимает два аргумента: первый — некое «скользящее состояние»; второй — элемент из последовательности coll. Если начальное состояние не задано, то используется (first coll) или (rf). Пробегаемся по всей коллекции coll, для каждого элемента вызываем rf, при этом «протаскивая» состояние acc. Когда элементы закончились, то просто возвращаем acc.

Небольшой пример. Допустим, у нас есть список строк, хотим подсчитать их суммарную длину.
Вот императивный код с циклом:

(defn length-of-strings [strings]
  (with-local-vars [acc 0]  ;; да-да, в Clojure есть локальные переменные!
    (doseq [c strings]
      (var-set
        acc
        (+ @acc (count c))))  ;; собственно вся работа
    @acc))

Состояние цикла — простой счетчик acc (число). На каждой итерации мы полагаем его равным (+ @acc (count c)).
А теперь еще разок, только через reduce:

(defn length-of-strings [coll]
  (my-reduce
    (fn ([acc c] (+ acc (count c))))  ;; наша редукт-функция
    0                                 ;; начальное значение
    coll))

В случае, если временно «подзабыть» о ленивости, то можно многие примитивные операции реализовать, вроде map или filter.

(defn my-map [f coll]
  (my-reduce
    (fn [acc c] (conj acc (f c)))
    []
    coll))

(defn my-filter [p coll]
  (my-reduce
    (fn [acc c] (if (p c) (conj acc c) acc))
    []
    coll))

Для реализации take приведенный вариант reduce уже не сгодится — цикл всегда пробегает по всей последовательности (это не какой то там Haskell, где все ленивое).

Дабы побороть сей недостаток, в версию 1.5 добавили специальный костыль маркер reduced и соответствующий ему предикат reduced?. Заодно переписали reduce, получив что-то наподобие этого:

(defn my-reduce
  ([rf coll]
   (if-let [s (seq coll)]
     (my-reduce rf (first s) (next s))
     (rf)))
  ([rf acc coll]
   (if-let [[x & xs] (seq coll)]
     (let [ret (rf acc x)]
       (if (reduced? ret)
         @ret
         (recur rf ret xs)))
     acc)))

Как только редукт-функция вернет нам (reduced ...), цикл обрывается и возвращается значение @ret.

(defn take-r [n coll]
  (my-reduce
    (fn [[n1 acc] c]
      (if (pos? n1)
        [(dec n1) (conj acc c)]
        (reduced acc)))
    [n []]
    coll))

;; функция поддерживает бесконечные последовательности!
(take-r 5 (range))
;; => [0 1 2 3 4]

Нельзя не вспомнить про замечательнейшую функцию reductions. По сути своей это аналог reduce, только возвращает ленивый список из всех промежуточных значений acc, а не только последнее. Ее весьма удобно использовать при отладке. Пишем шаг алгоритма в виде функции, запускаем reduce на коллекции с входными данными. Если вдруг что-то не так, заменяем reduce на reductions, запускаем в REPL и получаем все промежуточные шаги. С циклами так просто не получится — надо будет городить отладочные костыли, что не очень удобно.

Бывает reductions полезна и сама по себе, факториалы там какие посчитать:

;; => *ленивая* последовательность факториалов
(def factorials (reductions *' (cons 1 (map inc (range)))))

(nth factorials 20)
;; => 2432902008176640000

Clojure использует sequences для прохода по коллекциям. Если мы решим пробежаться по вектору, хеш-таблице или простому итератору, то в куче будет создано изрядное количество временных объектов.

Очевидная оптимизация, которая просится в такой ситуации — реализовать специализированный вариант reduce для тех коллекций, для которых это имеет смысл. Ну а если уж коллекция не поддается такой оптимизации, тогда использовать стандартную реализацию, сходную с той, что приведена в начале статьи. Для этого имеется специальный протокол clojure.core.protocol/CollReduce. Когда объект-коллекция его поддерживает, то эта реализация и будет использоваться внутри clojure.core/reduce. Поэтому reduce в Clojure обычно быстрее аналогичного цикла doseq.

Трансформеры

Трансформер — это такая функция, которая принимает одну редукт-функцию и возвращает новую.
Например, вот трансформер «увеличить на 1»:

(defn inc-t [rf]
  (fn [acc c] (rf acc (inc c))))

;; и сразу пример использования
(reduce + 0 (map inc [1 2 3 4]))
;; => 14

(reduce (inc-t +) 0 [1 2 3 4])
;; => 14

Можно это дело несколько обобщить, разрешив вместо inc указывать любую функцию:

(defn map-t [f]
  (fn [rf]
    (fn [acc c] (rf acc (f c)))))

(def inc-t (map-t inc))
(def dec-t (map-t dec))
;; ...

(reduce (inc-t +) 0 [1 2 3 4])
;; => 14

А вот, например, трансформер «фильтратор»:

(defn filter-t [pred]
  (fn [rf]
    (fn [acc c]
      (if (pred c)
         (rf acc c)
         acc))))

(def odd?-t (filter-t odd?))
(def even?-t (filter-t even?))

;; пример
(reduce (even?-t *) 1 [1 2 3 4])
;; => 8

А можно ли совместить несколько трансформеров? Разумеется!

(defn odd?-inc-t [rf]
  (odd?-t (inc-t rf)))

;; ..или чуть более канонично
(def odd?-inc-t (comp (filter-t odd?) (map-t inc)))

;; что логически эквивалентно..
(def odd?-inc-t
  (comp
    (fn [rf]
      (fn [acc c]
        (if (odd? c) (rf acc c) acc)))
    (fn [rf]
      (fn [acc c]
        (rf acc (inc c))))))

;; что даст эквивалент такой функции
(defn odd?-inc-t [rf]
  (fn [acc c]
    (if (odd? c)
      (rf acc (inc c))
      acc)))

;; пример использования
(reduce * 1 (->> [1 2 3 4 5] (map inc) (filter odd?)))
;; => 15

(reduce (odd?-inc-t *) 1 [1 2 3 4 5])
;; ==> 15

Стоит обратить внимание, что трансформеры идут в «обратном» порядке. Если мы хотим, чтобы элементы коллекции обрабатывались трансформером A до того, как они попадут в B, то склеивать их нужно как (comp A B). А теперь фокус:

(def cc (vec (range 1000000)))

(time (reduce + 0 (->> cc (filter odd?) (map inc))))
;; "Elapsed time: 171.390143 msecs"
;; => 250000500000

(time (reduce ((comp (filter-t odd?) (map-t inc)) +) 0 cc))
;; "Elapsed time: 93.246015 msecs"
;; => 250000500000

Вот как, ощутимый прирост скорости на ровном месте! Все, конечно, зависит от множества деталей и различных ньансов, поэтому в реальности выигрыш может быть иным. В общем я хочу сказать, что не стоит воспринимать этот кусок кода как бенчмарк.

Но в целом результаты совсем не удивительные. При использовании map и filter создается 2 промежуточных последовательности. Мы пробегаем по исходному вектору, создаем временный список из отфильтрованных значений. Затем пробегаем по этому списку и строим еще один, но с увеличенными элементами. И, наконец, пробегаемся уже по нему, суммируя значения.

С другой стороны, вариант с трансформерами не создает никаких временных коллекций. Вместо этого над исходными элементами сразу же применяется и odd?, и inc.

Где мои редьюсеры?

И было все хорошо, пока версия 1.5 не привнесла новую стандартную библиотеку clojure.core.reducers. Именно так, отдельная библиотека, придется ее явно импортировать. А еще в ней объявлены свои версии map, filter, take-while, и другие. И, конечно же, они не совместимы с обычными версиями из clojure.core. Поэтому лучше писать (require '[clojure.core.reducers :as r]) вместо простого (use 'clojure.core.reducers).

Итак, что же такое редьюсер? Кратко и глупо: редьюсер — это всякий объект, по которому можно редьюситься. Любая коллекция в терминах clojure.core.reducers — это редьюсер. Хеш-таблица — редьюсер. И java.lang.String редьюсер. Ну и nil, разумеется, тоже. Посмотрим определение:

(defn reducer [coll xf]
  ;; `xf` - это трансформер
  (reify
    clojure.core.protocols/CollReduce
    (coll-reduce [this f1]
      (let [f2 (xf f1)]
        (clojure.core.protocols/coll-reduce coll f2 (f2))))
    (coll-reduce [this f1 init]
      (let [f2 (xf f1)]
        (clojure.core.protocols/coll-reduce coll f2 init)))))

Тут берется коллекция coll, и возвращается новая, по которой можно запустить reduce, и только это. Ни добавить элемент, ни удалить, ни даже пройтись по элементам. Зато перед каждым запуском reduce редукт-функция будет пропущена сквозь трансформер xf.

(def nums [1 2 3 4 5])
(def nums+1 (reducer nums inc-t))

(reduce + 0 nums)
;; => 15

(reduce + 0 nums+1)
;; => 20

Как уже упоминалось, в библиотеке reducers объявлены свои варианты map, filter, take-while и подобные. Все они принимают редьюсер и возвращают новый, к которому «прикреплен» соответствующий трансформер.

Так бы могла выглядеть clojure.core.reducers/map (она, конечно же, выглядит совсем иначе):

(def map-r [f coll]
  (reducer coll (map-t f)))

И теперь несколько примеров как все это добро можно использовать:

(require '[clojure.core.reducers :as r])

(def nums [1 2 3 4 5 6 7 8])

(type (map inc nums))
;; => clojure.lang.LazySeq

(reduce conj [] (map inc nums))
;; => [2 3 4 5 6 7 8 9]

(type (r/map inc nums))
;; => clojure.core.reducers$folder$reify__1234
;; совсем-совсем не sequence

(reduce conj [] (r/map inc nums))
;; => [2 3 4 5 6 7 8 9]
;; но все еще умеет редьюситься

(reduce conj [] (r/filter odd? nums))
;; => [1 3 5 7]

(reduce + 0 (->> nums (r/map inc) (r/map inc)))
;; => 52
;; ~~ (+ 0 (inc (inc 1)) (inc (inc 2)) ...)

(reduce + 0 (->> nums (r/filter odd?) (r/map inc)))
;; => 20
;; ~~ (+ 0 (inc 1) (inc 3) ...)

Параллелимся

Если честно, то «reducers» зря так назвали. Правильнее было бы «folders». Ведь помимо протокола CollReduce (который появился задолго до reducers), в библиотеке объявлен другой, более важный, протокол CollFold:

(defprotocol CollFold
  (coll-fold [coll n combinef reducef]))

В принципе очень похоже, только редукт-функций теперь две, а еще добавился непонятный аргумент n. Идея: по некоторым коллекциям можно пробегаться в несколько потоков. Кратко: разбиваем ее на блоки размером примерно n элементов, каждый кусок сворачиваем при помощи #(reduce reducef (combinef) %), затем список результатов (по одному на блок) сворачиваем еще раз, но уже при помощи #(reduce combinef %).

Редьюсер, который умеет сворачивать себя параллельно, называется folder.
Всего 2 стандартных коллекции поддерживает протокол CollFold — вектора и хеш-таблицы.

(def v (vec (range 10000000)))

;; линейно, в 1 поток
(time (reduce + v))
;; "Elapsed time: 648.897616 msecs"
;; => 49999995000000

;; в несколько потоков
(time (r/coll-fold v 512 + +))
;; "Elapsed time: 187.414147 msecs"
;; => 49999995000000

Все стандартные редьюсеры, для которых это имеет смысл, реализуют CollFold. Это, например, r/map, r/filter, r/mapcat, r/flatten. С другой стороны, r/take, r/take-while, r/drop не поддерживают параллелизацию. Выше приводилась реализация r/map. Вот ее обновленный вариант:

(def map-r [f coll]
  ;; просто заменили `reducer` на `folder`
  (folder coll (map-t f)))

Напрямую использовать coll-fold не нужно — для повседневных нужд имеется обертка fold. В ней задано значение по умолчанию для n (размер блока) — 512. В общем намек ясен — reducers явно предназначены для больших коллекций (>1K элементов). И еще раз: не используйте coll-fold напрямую, вызывайте fold.

Ах, есть же еще foldcat. Эдакий ускоренный (за счет многопоточности) вариант #(reduce conj [] %). Возвращает эта функция объекты clojure.core.reducers.Cat, которые реализуют и Counted, и Sequable, и CollFold.

(r/map inc [1 2 3])
;; => #<reducers$folder$reify__.....
;; как же *это* использовать?

;; ну.. можно это дело через `reduce` прогнать
(reduce conj [] (r/map inc [1 2 3]))
;; => [2 3 4]

;; что там со скоростью...
(def v (vec (range 1000000)))

(time (count (reduce conj [] (r/map inc v))))
;; "Elapsed time: 90.124397 msecs"
;; => 1000000

;; что-то не очень, а если через `foldcat`
(time (count (r/foldcat (r/map inc v))))
;; "Elapsed time: 25.054988 msecs"
;; => 1000000

(time (count (r/foldcat (r/map inc (r/foldcat (r/map inc v))))))
;; "Elapsed time: 32.054988 msecs"
;; => 1000000

;; результат `foldcat`, кстати, тоже foldable (привет, многопоточность)
(satisfies? r/CollFold (r/foldcat []))
;; => true

На сцену врываются…

В отличие от редьюсеров, трансдьюсеры уже не отдельная библиотека. Это скорее концепция (читай идея), которая будет интегрирована прямо в модуль clojure.core. Ждем это добро в версии 1.7 (совсем чуть-чуть осталось).

Кратко: трансдьюсеры — это те же трансформеры, только после ребрендинга иначе названные. Ну ладно, почти: редукт-функции отныне могут принимать не только 0 и 2 аргумента, а еще и 1. А трансдьюсер, соответственно, это функция от 0-1-2-арной редукт-функции в 0-1-2-арную новую.

(def typical-transducer
  (fn [rf]
    (fn ([] ...)         ;; возвращаем начальный элемент
        ([acc] ...)      ;; непонятно...
        ([acc c] ...)))  ;; собственно, тут все самое важное, как и раньше

;; новый вариант `map-t`, на 33% лучше старого
(defn map-t-improved [f]
  (fn [rf]
    (fn ([] (rf))                    ;; пробрасываем дальше
        ([acc] (rf acc))             ;; пробрасываем дальше
        ([acc c] (rf acc (f c))))))  ;; заменяем `c` на `(f c)`

0-арная редукт-функция как и раньше, может быть вызвана, если нужен начальный элемент. Вариант 2-арный используется для, собственно, самой редукции. А 1-арный вариант вызывается в самом конце всей работы (по завершению reduce). Он нужен в тех случаях, когда нужно «дописать» новые элементы за последним.

Пример: трансдьюсер dedupe, пропускающий повторы из коллекции:

(defn my-dedupe []
  (fn [rf]
    ;; острожно, состояние!
    (let [prev (atom ::none)]
      (fn  ;; это наша редукт-функция
        ([] (rf))
        ([acc] (rf acc))
        ([acc c]
          (let [p @prev]
            (reset! prev c)
            (if (= p c)
              acc
              (rf acc c))))))))

(def rf ((my-dedupe) +))

(reduce rf 0 [1 1, 2 2, 3, 1 1])
;; => 7

(reduce rf 0 [1 1, 2 2, 3, 1 1])
;; => 6
;; упс... `rf` не чистая, нельзя ее использовать 2 раза

Тонкий момент — наш трансдьюсер возвращает новую редукт-функцию. Причем эта редукт-функция обладает мутабельным состоянием и умеет делать по сути 3 разных вещи (по 1 на арность). Ну прям объект какой-то. Но при этом сам трансдьюсер состоянием не обладает, он лишь выступает в роли некой фабрики.

Как пример использования 1-арного варианта редукт-функции приводят partition-all. Упрощенная реализация:

(defn partition-all-t [n]
  (fn [rf]
    (let [buffer (java.util.ArrayList. n)]  ;; состояние!
      (fn
        ([] (rf))
        ([acc]
          (if (.isEmpty buffer)
            ;; если буффер пустой - пробрасываем дальше
            (rf acc)
            ;; иначе...
            (let [v (vec (.toArray buffer))  ;; превращаем буфер в вектор
                  acc' (rf acc v)]           ;; сбрасываем при помощи 2-арной `rf`
              ;; а теперь можно и пробросить дальше
              (rf acc'))))
        ([acc c]
          (.add buffer c)
          (if (= n (.size buffer))
            ;; если буффер переполнился - "сбрасываем" его
            (let [v (vec (.toArray buffer))]
              (.clear buffer)
              (rf acc v))
            ;; иначе - ничего не делаем
            acc))))))

;; пользуем то, что наваяли (не указали начальный элемент, ведь (conj) => [])
(reduce ((partition-all-t 3) conj) (range 10))
; >> ClassCastException java.lang.Long cannot be cast to clojure.lang.IPersistentCollection
;; не работает...

;; ну ладно, а если указать []... 
(reduce ((partition-all-t 3) conj) [] (range 10))
;; => [[0 1 2] [3 4 5] [6 7 8]]
;; работает, но неверно...

Хм… Ни 0-арный, ни 1-арный варианты ((partition-all-t 3) conj) так и не были вызваны — обычная reduce ничего не знает про все эти новшества. Она 0-арный вариант вызывает только если коллекция пустая, 1-арный не вызывает вообще никогда.

Потому создали новую функцию transduce. Вот она уже, в отличие от «устаревшей» reduce, вызывает (rf) всегда, если только начальное состояние явно не задано. А еще эта функция гарантированно вызывает (rf acc), причем ровно один раз. А еще transduce сама вызывает наш трансдьюсер и прячет мутабельную редукт-функцию от наших глаз. Другими словами, вся грязная работа (в плане побочных эффектов) выполняется «под капотом».

;; передаем сам иммутабельный трансдьюсер, а не результат его работы
(transduce (partition-all-t 3) conj (range 10))
;; => [[0 1 2] [3 4 5] [6 7 8] [9]]
;; ии... работает!

;; композиция трансдьюсеров (опять работает)
(transduce (comp (filter odd?) (partition-all-t 3)) conj (range 10))
;; => [[1 3 5] [7 9]]

А что, если попробовать transduce вместо reduce использовать?

(reduce (identity -) 0 [1 2 3 4])
;; => -10
;; ~~ (- (- (- (- 0 1) 2) 3) 4)

(transduce identity - 0 [1 2 3 4])
;; => 10
;; не верно!

Оказывается, напрямую заменить reduce на transduce не получается — мешает новое требование 1-арной редукт-функции. В нашем примере после окончания вычислений transduce вызывает (- acc), что меняет знак результата на противоположный. Исправить ситуацию поможет completing:

((completing -) 3 2)
;; => 1

((identity -) 1)
;; => -1

((completing -) 1)
;; => 1

(transduce completing - 0 [1 2 3 4])
;; => -10
;; вот теперь правильно!

В ядре языка появились специальные функции для работы с трансформерами трансдьюсерами. Вполне ожидаемо, что также добавлен набор стандартных этих самых трансдьюсеров. А дабы не плодить множество новых функций (их и так много, запутаешься в два счета), решили проапгрейдить существующие map, filter, take, interpose, mapcat и компанию:

(map inc [1 2 3])
;; => (2 3 4)

(map inc)
;; => #<core$map$fn__4525 clojure.core$map$fn__4525@2d9e9038>
;; это трансдьюсер!

;; можно делать так
(transduce (map inc) + [1 2 3])
;; => 9

(transduce (comp (map inc) (filter even?)) + [1 2 3])
;; => 6
;; ~~ (+ (inc 1) (inc 3))  => 6

Помимо transduce есть еще несколько функций для работы с трансдьюсерами:

;; применяем трансдьюсер к коллекции
(sequence (map inc) [1 2 3])
;; => (2 3 4)

;; это равносильно такому коду
(transduce (map inc) conj [1 2 3])
;; => [2 3 4]

;; Но...
;; функция `sequence` выполняет трансдьюсер *лениво* !
;; с `transduce` такой фокус уже не сработает
(take 5 (sequence (map inc) (range)))
;; => (1 2 3 4 5)

;; в функцию `into` также добавили поддержку трансдьюсеров
(into [9] (map inc) [1 2 3])
;; => [9 2 3 4]

Но самая забавная функция, это eduction. Она возвращает прокси-объект, на котором можно вызывать seq, reduce или получить java-iterator. Ожидаемо, что этот объект просто вызовет transduce или sequnce. Мелочь, но удобно.

(def odds (eduction (filter odd?) (range)))
(def evens (eduction (remove odd?) (range)))

;; можно работать как с sequential
(take 5 odds)
;; => (1 3 5 7 9)

;; в памяти будет строится sequence из первых 100500 чисел
;; но ссылки на начало не останется - sequence соберется GC
(nth odds 100500)
;; => 2010001

;; а вот тут сразу будет запущен reduce (никаких временных LazyCol)
;; ~= (reduce ((filter even?) ((take 100500) +)) 0 (range))
(transduce (take 100500) + evens)
;; => 10100149500

Стоп, стоп, стоп. Оно ведь подозрительно напоминает clojure.core.reducers/reducer… Тот, правда, можно было только сворачивать, а тут еще seq разрешили запускать. Так что r/reducer выкидываем на помойку! Но только не r/folder, он умеет в многопоточность!

(require '[clojure.core.reducers :as r])
(def v (vec (range 1000000)))

(time (transduce (map inc) + v))
;; "Elapsed time: 120.193971 msecs"
;; => 500000500000

(time (r/fold + (r/folder v (map inc))))
;; "Elapsed time: 37.597224 msecs"
;; => 500000500000

;; но соблюдайте осторожность!
(transduce (take 100500) + v)
;; => 5050074750

(r/fold + (r/reducer v (take 100500)))
;; => 5050074750
;; верно

;; reducer устарел - лучше использовать eduction
(r/fold + (eduction (take 100500) v))
;; => 5050074750

(reduce + (r/folder v (take 100500)))
;; => 5050074750
;; даже так верно

(r/fold + (r/folder v (take 100500)))
;; => 109071345018
;; упс...
;; не всякий трансдьюсер параллелизируется (можно превратить в фолдер)

При использовании трансдьюсеров достигается как большая производительность по сравнению с обычными map/filter/etc (на основе ленивых последовательностей), так и большая гибкость/абстрактность. Замечу, что речь тут идет именно о clojur'ных последовательностях: по уровню абстракции и скорости трансдьюсеры сопоставимы с обычными итераторами/энумераторами/генераторами (в разных языках их принято звать по-разному).

Но вернемся к Clojure. Раньше в core.async была целая уйма функций вида map>, map<, filter<, filter>, и т.п. Нынче их убрали (ну как убрали, пока лишь задепрекейтили). Зато разрешили указать трансформер трансдьюсер при создании канала:

;; не забыли подключить библиотеку в project.clj
(require '[clojure.core.async :as a])

;; самый такой обычный трансдьюсер
(def xf (filter odd?))

;; и канал с буфером
(def ch (a/chan 10 xf))

;; положили в канал числа от 0 до 9 да и закрыли его
(a/onto-chan ch (range 10))

;; достаем числа из канала
(a/<!! (a/into [] ch))
;; => [1 3 5 7 9]

Трансдьюсер можно навесить только на буферизированные каналы. И перед тем, как элемент окажется в буфере, его обрабатывает наш трансдьюсер. Есть еще всякие pipeline'ы, они тоже с трансдьюсерами работают.

Подводим итоги

Разнообразные редьюсеры/трансдьюсеры — все суть обобщение операции свертки. А посему, требуют они редукт-функцию с 2-мя аргументами для работы своей полезной.

Помимо 2-арного варианта лучше заодно определить еще и 0-арный — он может быть использован, если не задано начальное состояние свертки. А может быть и не использован: если исходная коллекция не пустая, тогда reduce возьмет ее первый элемент. А вот transduce так подло не поступает — либо начальное состояние передается в него явно, либо используется 0-арный вызов редукт-функции.

С другой стороны, transduce требует от редукт-функции большего — обязательно нужен 1-арный вариант. Который, в общем случае, чаще всего совсем ничего не делает. Серьезно, обычно ([x] x) — самая осмысленная реализация в данном случае. Но ведь мы ленивые, переписывать старые функции (0/2-арные) нам лень, поэтому используем обертку completing, которая добавляет пустой 1-арный вариант.

Далее, редьюсеры основаны на трансформерах. Трансформер = функция с типом rf -> rf. Фактически редьюсер — это коллекция, к которой намертво прикрутили трансформер. И, всякий раз, как мы reduce по этой коллекции запускаем, сначала трансформер «портит» нашу редукт-функциию.

Трансдьюсер ~= трансформер, только требует еще поддержку 1-арной редукт-функции. Так что всегда определяем этот самый злополучный 1-арник, и гордо так всем заявляем: «ну конечно же я не использую устаревшие трансформеры, только трансдьюсеры».

При всем этом трансдьюсеры не ограничиваются работой с коллекциями. Можно их прикрутить к каналам, потокам ввода-вывода, очередям, обсерверам и т.п. В общем ко всему, на что фантазии хватит.

Итого:

  • если создаем новый алгоритм обработки последовательностей, то стоит попробовать его написать как трансдьюсер;
  • когда куча данных в памяти, нету I/O, не нужна ленивость — используем reducers;
  • хотим обработать коллекцию и запустить на ней reduce — неплохо бы попробовать transduce;
  • но это вовсе не обязательно — преждевременные оптимизации не есть добро;
  • фильтрованные каналы, хитрые подписчики на события… трансдьюсеры тут так и просятся;
  • нужны ленивые последовательности или не знаем что же нам использовать — старые добрые map, filter и их друзья.

Автор: anjensan

Источник

* - обязательные к заполнению поля


https://ajax.googleapis.com/ajax/libs/jquery/3.4.1/jquery.min.js