SettableFuture<V>, или как выстрелить себе в ногу сферическим велосипедом в вакууме

в 18:55, , рубрики: concurrency, java, велосипеды, метки: , ,

Сегодня я расскажу про дизайн тривиального concurrent-класса в JDK. Может быть, это будет удобнее, чем абстрактно объяснять разные концепции из JMM и concurrent-кода.

Все продвинутые парни знают, что такое Future<V> — это обещание предоставить результат типа V. Future'ы удобны, чтобы предоставлять результат асинхронных задач. Например, ExecutorService возвращает Future для описания результата задачи, которая когда-нибудь выполнится в thread pool'е.

Частенько в написании хардкорного concurrent-кода требуется такой примитив, как SettableFuture<V>, который будет выполнять все функции Future<V>, но в который можно будет также выставить значение из другого потока. Эдакий асинхронный mailbox.

За свою недолгую жизнь я видел несколько вариантов реализации такого SettableFuture, рассмотрим некоторые из них, и на заботливо разложенные в них грабли. Большинство примеров реально существовали, некоторые из них были домыслены ради плавности изложения. Чтобы вам не было сильно скучно, попробуйте не читать объяснение после каждого примера, а найти грабли самостоятельно. Для уменьшения простыни мы реализуем только методы set() и get(). Все персонажи вымышлены, хотя пост и основан на реальных событиях.

Способ №1: wait()/notifyAll()

Встречается в разных вариантах, с разным количеством граблей. Если дать такую задачку человеку (назовём его Васей), который с concurrency на «уважительное Вы», он почти наверняка напишет что-нибудь в духе:

public class SettableFuture<V> implements Future<V> {
    private V slot;

    public V get() throws InterruptedException, ExecutionException {
        wait();
        return slot;
    }

    public void set(V value) {
        slot = value;
        notifyAll();
    }
}

… что, понятно, не работает, и даже валится со странным исключением, мол, IllegalMonitorStateException, в полном соответствии с Javadoc. Поставив перед собой цель побороть wait/notify, следующую версию Вася напишет так:

public class SettableFuture<V> implements Future<V> {
    private V slot;

    public V get() throws InterruptedException, ExecutionException {
        synchronized(this) {
            wait();
        }
        return slot;
    }

    public void set(V value) {
        slot = value;
        synchronized(this) {
            notifyAll();
        }
    }
}

Это тоже не работает, потому что wait() допускает spurious wakeups. То есть, читатели рискуют уйти с null'ом в руках. Изрядно поругавшись, Василий объявит вендетту wait/notify и напишет ну стопроцентно правильный код:

public class SettableFuture<V> implements Future<V> {
    private V slot;

    public V get() throws InterruptedException, ExecutionException {
        while (slot == null) {
            synchronized(this) {
                wait();
            }
        }
        return slot;
    }

    public void set(V value) {
        slot = value;
        synchronized(this) {
            notifyAll();
        }
    }
}

Как будто бы починено, но на самом деле пара set() / get() не является так называемой «точкой безопасной публикации». Так, если один поток set()'ом выставит slot в какое-нибудь значение, то второй поток может придти в get(), обнаружить slot != null и с ним вернуться. Модель памяти при этом не гарантирует, что все записи в объект, опубликованный вторым потоком, будут видны первому. Безопасная публикация есть довольно интересный феномен, который позволяет хорошо локализовывать проблемы с видимостью значений. Большая часть конкаррентных JDK'шных классов безопасно публикует объекты, например, Executor.submit() и прочие.

При достаточном тестировании Вася это поймает и поймёт, что правильно писать вот так:

public class SettableFuture<V> implements Future<V> {
    private V slot;

    public V get() throws InterruptedException, ExecutionException {
        synchronized(this) {
            while (slot == null) {
                wait();
            }
        }
        return slot;
    }

    public void set(V value) {
        slot = value;
        synchronized(this) {
            notifyAll();
        }
    }
}

В этом случае каждый читатель обязан будет синхронизироваться на том же объекте, что и писатели в set(), что как будто даёт гарантии безопасной публикации. Более того, именно такая идиома рекомендуется в Javadoc к wait(). К сожалению, с безопасной публикацией ещё не всё чисто, но Васины стресс-тесты уже проходят и теперь он может подумать о вечном. Во-первых, неплохо было бы обеспечить однократную запись. А то чего доброго, что получим в нескольких читателях разные значения.

public class SettableFuture<V> implements Future<V> {
    private V slot;

    public V get() throws InterruptedException, ExecutionException {
        synchronized(this) {
            while (slot == null) {
                wait();
            }
        }
        return slot;
    }

    public void set(V value) {
        if (slot == null) {  // ещё не записали?
            slot = value;
            synchronized(this) {
                notifyAll();
            }
        }
    }
}

Усталый, но довольный, Вася деплоит это в продакшн, а через день к нему с дрыном прибегает ПМ и начинает пороть Васю за intermittent гейзенбаги. Потирая ушибленные места, Василий осознает, что в этом коде гонка на set(): два писателя могут оба выставить значения, потому что каждый из них не увидит апдейт другого. А вот get()-ы смогут прочитать эти разные значения, если очень не повезёт. Вытащив с задворок сознания одну идейку, Вася предастся карго-культу и поставит модификатор volatile:

public class SettableFuture<V> implements Future<V> {
    private volatile V slot;

    public V get() throws InterruptedException, ExecutionException {
        synchronized(this) {
            while (slot == null) {
                wait();
            }
        }
        return slot;
    }

    public void set(V value) {
        if (slot == null) {  
            slot = value;
            synchronized(this) {
                notifyAll();
            }
        }
    }
}

Этим, кстати, Вася поправит и проблему с безопасной публикацией, но вряд ли это заметит.

На следующее утро Васю встретит уже близко знакомый ему ПМ и спросит, почему баги всё ещё проявляются, хотя и реже? Удивленный Вася пойдёт читать форумы и внезапно осознает, что пара операций над volatile не атомарна: может оказаться, что slot == null в обоих потоках-писателях, и они всё равно запишут оба значения. Просветившись, Вася сделает следующий локально-оптимальный шаг:

public class SettableFuture<V> implements Future<V> {
    private final AtomicReference<V> slot = new AtomicReference<>();

    public V get() throws InterruptedException, ExecutionException {
        synchronized(this) {
            while (slot.get() == null) {
                wait();
            }
        }
        return slot.get();
    }

    public void set(V value) {
        if (slot.compareAndSet(null, value)) {  
            synchronized(this) {
                notifyAll();
            }
        }
    }
}

Вася доволен собой. Ближе к концу рабочего дня Вася обнаруживает, что этот код весьма гадко ведёт себя при передаче null-ов: слот-то вроде выставляется, но читатели сидят себе в цикле по wait() и не думают оттуда выходить. Вася плюёт и дописывает отдельный флажок, чтобы можно было передавать null:

public class SettableFuture<V> implements Future<V> {

    private V slot;
    private final AtomicBoolean isSet = new AtomicBoolean();

    public V get() throws InterruptedException, ExecutionException {
        synchronized(this) {
            while (!isSet.get()) {
                wait();
            }
        }
        return slot;
    }

    public void set(V value) {
        if (isSet.compareAndSet(false, true)) {  
            slot = value;
            synchronized(this) {
                notifyAll();
            }
        }
    }
}

Этот код тоже поломан: читатель может успешно пройти через while, обнаружить, что флажок уже выставлен, но значение ещё не выставлено, и читатель рискует уйти с null'ом. Для видимости надо бы перенести присвоение slot перед CAS'ом, но это разрешит повторную запись. От этой гонки не спасёт даже volatile на slot. Возможным ответом будет затащить присвоение и возврат в synchronized:

public class SettableFuture<V> implements Future<V> {

    private V slot;
    private final AtomicBoolean isSet = new AtomicBoolean();

    public V get() throws InterruptedException, ExecutionException {
        synchronized(this) {
            while (!isSet.get()) {
                wait();
            }
            return slot;
        }
    }

    public void set(V value) {
        if (isSet.compareAndSet(false, true)) {  
            synchronized(this) {
                slot = value;
                notifyAll();
            }
        }
    }
}

В этом коде вроде всё круто, кроме того, что get() всегда синхронизирован. Попытка это исправить:

public class SettableFuture<V> implements Future<V> {

    private V slot;
    private final AtomicBoolean isSet = new AtomicBoolean();

    public V get() throws InterruptedException, ExecutionException {
        if (!isSet.get()) {
            synchronized(this) {
                while (!isSet.get()) {
                    wait();
                }
                return slot;
            }
        }
        return slot;
    }

    public void set(V value) {
        if (isSet.compareAndSet(false, true)) {  
            synchronized(this) {
                slot = value;
                notifyAll();
            }
        }
    }
}

… заново приводит к гонке: читатель может сразу вернуть slot = null, пока писатель ещё не вышел из критической секции, а то и вовсе ещё не записал значение. Попытки это исправить обычно заканчиваются либо введением второго флажка, либо внезапным озарением, что выставление статуса и присваивание объекта надо совмещать (кажется, это уже было раньше), и можно решить проблему с null'ами (может тут уже нет багов?):

public class SettableFuture<V> implements Future<V> {
    private static final Object NOT_SET = new Object();
    private final AtomicReference<V> slot = new AtomicReference<>((V)NOT_SET);

    public V get() throws InterruptedException, ExecutionException {
        if (slot.get() == NOT_SET) {
            synchronized(this) {
                while (slot.get() == NOT_SET) {
                    wait();
                }
            }
        }
        return slot.get();
    }

    public void set(V value) {
        if (slot.get() == NOT_SET) {
            if (slot.compareAndSet(NOT_SET, value)) {  
                synchronized(this) {
                    notifyAll();
                }
            }
        }
    }
}

Вася выдыхает. Знающие люди начинают нервно подёргивать глазом, видя такой замес на атомиках и synchronized. По крайней мере, я весьма подозрительно отнёсся к этому коду.

Способ №2: Одноэлементная BlockingQueue

Disclaimer: только не ржите, я такое видел в реале.

Живущий в другом городе Миша более просветлён по поводу concurrent-классов. Миша даже написал несколько нетривиальных паралельных программ, и знает, что правильный способ передавать данные между потоками — это очереди. Поэтому Миша пишет следующий класс:

public class SettableFuture<V> implements Future<V> {
    private final BlockingQueue<V> q = new ArrayBlockingQueue<V>(1);

    public V get() throws InterruptedException, ExecutionException {
        return q.take();
    }

    public void set(V value) {
        q.put(value);
    }
}

Миша доволен. Минимум кода, максимум профита. Довольно быстро Миша замечает, то на каждого читателя один писатель должен что-то положить в очередь. Очень требовательные у Миши читатели — каждый требует аудиенции у писателя. Почесав темечко, Миша делает первую попытку это исправить:

public class SettableFuture<V> implements Future<V> {
    private final BlockingQueue<V> q = new ArrayBlockingQueue<V>(1);

    public V get() throws InterruptedException, ExecutionException {
        V v = q.take();
        q.put(v);
        return v;
    }

    public void set(V value) {
        q.put(value);
    }
}

Хочется называть этот паттерн «подёргивание». У него есть гадкий эффект: пока читатель готовится переложить значение в уже пустую очередь, проснётся писатель, положит новое значение, и читатель застрянет навсегда. Осознав это, Миша ослабит читателей:

public class SettableFuture<V> implements Future<V> {
    private final BlockingQueue<V> q = new ArrayBlockingQueue<V>(1);

    public V get() throws InterruptedException, ExecutionException {
        V v = q.take();
        q.offer(v);
        return v;
    }

    public void set(V value) {
        q.put(value);
    }
}

Читатели теперь гарантированно не блокируются. Но несколько писателей всё равно ждут нескольких читателей. Поэтому Мише приходится ослаблять и писателей.

public class SettableFuture<V> implements Future<V> {
    private final BlockingQueue<V> q = new ArrayBlockingQueue<V>(1);

    public V get() throws InterruptedException, ExecutionException {
        V v = q.take();
        q.offer(v);
        return v;
    }

    public void set(V value) {
        q.offer(value);
    }
}

Как бы Мише не грезилось, но этот код допускает повторную запись, если писатель сможет запихнуть новое значение, пока читатель его передёргивает. Поэтому Миша делает ход конём и начинает использовать очередь только для блокировки потоков:

public class SettableFuture<V> implements Future<V> {
    private volatile V slot;
    private final BlockingQueue<V> q = new ArrayBlockingQueue<V>(1);

    public V get() throws InterruptedException, ExecutionException {
        V v = q.take();
        q.offer(v);
        return slot;
    }

    public void set(V value) {
        if (q.offer(value)) {
            if (slot != null) {
                slot = value;
            }
        }
    }
}

А дальше разворачиваются гонки по васиному сценарию. Придётся делать CAS, откуда всплывут проблемы с null'ом, откуда снова родится треш, угар и содомия. Продолжать эту экзекуцию не будем, можете сами додумать.

Способ №3: CountDownLatch

Disclaimer: так делает большинство вменяемых девелоперов.

Петя не слушает голоса, нашёптывающие ему использовать блокирующую очередь. Петя знает, что для таких нотификаций придумали CountDownLatch. Петя пишет:

public class SettableFuture<V> implements Future<V> {
    private final CountDownLatch latch = new CountDownLatch(1);
    private V slot;

    public V get() throws InterruptedException, ExecutionException {
        latch.await();
        return slot;
    }

    public void set(V value) {
        slot = value;
        latch.countDown();
    }
}

Довольно быстро Петя соображает, что этот код допускает несколько записей. Петя молниеносно исправляет код:

public class SettableFuture<V> implements Future<V> {
    private final CountDownLatch latch = new CountDownLatch(1);
    private V slot;

    public V get() throws InterruptedException, ExecutionException {
        latch.await();
        return slot;
    }

    public void set(V value) {
        if (latch.getCount() != 0) {
            slot = value;
            latch.countDown();
        }
    }
}

К сожалению, это гонка. Два потока входят в set(), оба обнаруживают латч в состоянии «можно писать», записывают значение, и опускают латч. Несколько читателей могут прочитать разные значения. Хорошо подумав, Петя исправляет этот код:

public class SettableFuture<V> implements Future<V> {
    private final CountDownLatch latch = new CountDownLatch(1);
    private final Semaphore semaphore = new Semaphore(1);
    private V slot;

    public V get() throws InterruptedException, ExecutionException {
        latch.await();
        return slot;
    }

    public void set(V value) {
        semaphore.acquire();
        try {
            if (latch.getCount() != 0) {
                slot = value;
                latch.countDown();
            }             
        } finally {
             semaphore.release();
        }
    }
}

Чисто функционально этот код работает и даже без гонок. Однако Петя всё равно не очень доволен: что если потоки будут пытаться потом выставить значение? Не задерживать же их на семафоре? Несмотря на кажущуюся невероятность такого случая, Петя не хочет давать ни единого шанса перформансным проблемам (тем самым, кстати, нарушая заповедь Кнута):

public class SettableFuture<V> implements Future<V> {

    private final CountDownLatch latch = new CountDownLatch(1);
    private final Semaphore semaphore = new Semaphore(1);
    private V slot;

    public V get() throws InterruptedException, ExecutionException {
        latch.await();
        return slot;
    }

    public void set(V value) {
        if (latch.getCount() != 0) {
            semaphore.acquire();
            try {
                if (latch.getCount() != 0) {
                    slot = value;
                    latch.countDown();
                }             
            } finally {
                semaphore.release();
            }
        }
    }
}

Теперь Петя рассчитывает, что залётные писатели сразу увидят состояние латча и вернутся. После нескольких минут разглядывания своего кода Петя обнаруживает, что это по сути замаскировавшийся double-checked locking, и Пете известно, что он не работает без volatile. Поэтому он подрывается и делает ещё один фикс:

public class SettableFuture<V> implements Future<V> {
    private final CountDownLatch latch = new CountDownLatch(1);
    private final Semaphore semaphore = new Semaphore(1);
    private volatile V slot;

    public V get() throws InterruptedException, ExecutionException {
        latch.await();
        return slot;
    }

    public void set(V value) {
        if (latch.getCount() != 0) {
            semaphore.acquire();
            try {
                if (latch.getCount() != 0) {
                    slot = value;
                    latch.countDown();
                }             
            } finally {
                semaphore.release();
            }
        }
    }
}

Хотя volatile в этом конкретном случае и не нужен. (Вопрос со звёздочкой читателю: почему?). Часто вместо семафора используют обычный RL, защищаясь tryAcquire(), который не пытается сделать set(), если обнаруживает конфликт. В этом коде багов быть не должно:

public class SettableFuture<V> implements Future<V> {
    private final CountDownLatch latch = new CountDownLatch(1);
    private final Lock lock = new ReentrantLock();
    private V slot;

    public V get() throws InterruptedException, ExecutionException {
        latch.await();
        return slot;
    }

    public void set(V value) {
        if (lock.tryAcquire()) {
            try {
                if (latch.getCount() != 0) {
                    slot = value;
                    latch.countDown();
                }             
            } finally {
                lock.release();
            }
        }
    }
}

Можно было бы обойтись и CAS'ом на флажке:

public class SettableFuture<V> implements Future<V> {
    private final CountDownLatch latch = new CountDownLatch(1);
    private final AtomicBoolean isSet = new AtomicBoolean();
    private V slot;

    public V get() throws InterruptedException, ExecutionException {
        latch.await();
        return slot;
    }

    public void set(V value) {
        if (!isSet.get() && isSet.compareAndSet(false, true)) {
            slot = value;
            latch.countDown();
        }
    }
}

… но это уже для эстетов.

Способ №4: FutureTask

Алексей куда более искушён в конкаррентных библиотеках. Он задумывается, а как же сами j.u.c.* классы выставляют значение Future'у? Рыская по документации, он обнаруживает класс FutureTask, который оборачивает Callable и реализует Future. Внимательно разглядывая его методы, Алексей замечает, что у FutureTask есть метод set(V value), но он protected, и его нужно опубликовать. Поэтому Алексей пишет:

public class SettableFuture<V> extends FutureTask<V> {
    public V get() throws InterruptedException, ExecutionException {
        return super.get();
    }

    public void set(V value) {
        super.set(value);
    }
}

К вящему удивлению Алексея, в этом коде есть досадный баг. Точнее, даже не в коде Алексея, а в самом FutureTask. Там, оказывается, есть гонка между get() и set(), в итоге которой get() может вернуть null, даже если выставлено не нулевое значение. Фокус в том, что баг не гарантирует видимость записанного значения по выходу из set(). Поэтому нам нужно руками эту видимость обеспечить. В новой версии jsr166 это исправлено, но до JDK этот фикс ещё не добрался.

Наивный способ обеспечить видимость:

public class SettableFuture<V> extends FutureTask<V> {
    private volatile V result;

    public V get() throw InterruptedException, ExecutionException {
        super.get();
        return result;
    }

    public void set(V value) {
        result = value;
        super.set(value);
    }
}

… используя свойства volatile: запись в result будет сразу видна, а читатели разблокируются только после удачного set(). Этот код вполне удачно работает, пока мы имеем исключительно методы get() и set(). Future, помимо всего прочего, также может отдать и exception, и гонка на get()/setException() может произойти по такому же сценарию. Эта гонка куда менее вероятна, ибо за время создания обёртки в виде ExecutionException значение в подавляющем большинстве случаев таки доберётся до читателя. Чтобы застраховаться от такого случая, придётся сохранять exception в volatile-поле, а это приведёт к громоздкой конструкции:

public class SettableFuture<V> extends FutureTask<V> {
    private volatile V result;
    private volatile Throwable throwable;

    public V get() throw InterruptedException, ExecutionException {
        try {
            super.get();
        } catch (Throwable t) {
            return new ExecutionException(throwable);
        }
        return result;
    }

    public void set(V value) {
        result = value;
        super.set(value);
    }

    public void setException(Throwable t) {
        throwable = t;
        super.setException(t);
    }
}

В итоге Алексей решает подождать фикса в JDK, а вместо полугромоздких конструкций полагается на правила видимости synchronized:

public class SettableFuture<V> extends FutureTask<V> {
    public V get() throw InterruptedException {
        super.get();
        synchronized(this) {
            return super.get();
        }
    }

    public void set(V value) {
        synchronized(this) {
            super.set(value);
        }
    }

    public void setException(Throwable t) {
        synchronized(this) {
            super.setException(t);
        }
    }
}

… что может нести за собой накладные расходы на раздувание мониторов, но зато не дублируются поля.

Резюме

  1. Учите Java Memory Model, читайте Herlihy/Shavit'а AoMP, узнавайте, как работает хардвар.
  2. В concurrent-коде думаем глобальнее. Локальное изменение, стопроцентно чинящее локальный баг, может вызвать сотни багов в других местах. Даже если кажущееся поведение не изменилось. Даже если однониточные тесты это не поймали.
  3. Не верим никому. Если класс не был протестирован и/или верифицирован, он априори сломан.
  4. Толкаем реализацию FutureValue в jsr166. Её можно сделать ещё более эффективно при помощи AbstractQueuedSynchronizer, но это совсем другая история.

Автор: TheShade

* - обязательные к заполнению поля


https://ajax.googleapis.com/ajax/libs/jquery/3.4.1/jquery.min.js