Модель памяти, существующая на данный момент в Java, гарантирует ожидаемый порядок выполнения многопоточного кода, при отсутствии в этом коде гонок потоков. И для того, чтобы обезопасить ваш код от гонок, придуманы различные способы синхронизации и обмена данными между ними.
Пакет java.util.concurrent
, входящий в состав HotSpot JDK, предоставляет следующие инструменты для написания многопоточного кода:
- Atomic
- Locks
- Collections
- Synchronization points
- Executors
- Accumulators _jdk 1.8_
Atomic
В дочернем пакете java.util.concurrent.atomic
находится набор классов для атомарной работы с примитивными типами. Контракт данных классов гарантирует выполнение операции compare-and-set
за «1 единицу процессорного времени». При установке нового значения этой переменной вы также передаете ее старое значение (подход оптимистичной блокировки). Если с момента вызова метода значение переменной отличается от ожидаемого — результатом выполнения будет false
.
Для примера возьмем два массива long
переменных [1,2,3,4,5]
и [-1,-2,-3,-4,-5]
. Каждый из потоков будет последовательно итерироваться по массиву и суммировать элементы в единую переменную. Код (groovy) с пессимистичной блокировкой выглядит так:
class Sum {
static monitor = new Object()
static volatile long sum = 0
}
class Summer implements Callable {
long[] data
Object call() throws Exception {
data.each {
synchronized (Sum.monitor) {
println("${Thread.currentThread().name}: add ${it} to ${Sum.sum}")
Sum.sum += it
}
}
}
}
Executors.newFixedThreadPool(2).invokeAll([
new Summer(data: [1,2,3,4,5]),
new Summer(data: [-1,-2,-3,-4,-5])
])
print("Sum: ${Sum.sum}")
Результат выполнения будет ожидаемым:
pool-1-thread-1: add 1 to 0
pool-1-thread-2: add -1 to 1
pool-1-thread-1: add 2 to 0
pool-1-thread-2: add -2 to 2
pool-1-thread-1: add 3 to 0
pool-1-thread-2: add -3 to 3
pool-1-thread-1: add 4 to 0
pool-1-thread-1: add 5 to 4
pool-1-thread-2: add -4 to 9
pool-1-thread-2: add -5 to 5
Sum: 0
Однако такой подход имеет существенные недостатки по производительности. В данном случае на бесполезную для нас работу, уходит больше ресурсов, чем на полезную:
- попытка блокирования монитора
- блокировка потока
- разблокировка монитора
- разблокировка потока
Рассмотрим использование AtomicLong
для реализации оптимистичной блокировки при расчете этой же суммы:
class Sum {
static volatile AtomicLong sum = new AtomicLong(0)
}
class Summer implements Callable {
long[] data
Object call() throws Exception {
data.each {
while(true) {
long localSum = Sum.sum.get()
if (Sum.sum.compareAndSet(localSum, localSum + it)) {
println("${Thread.currentThread().name}: add ${it} to ${Sum.sum}")
break;
} else {
println("[MISS!] ${Thread.currentThread().name}: add ${it} to ${Sum.sum}")
}
}
}
}
}
Executors.newFixedThreadPool(2).invokeAll([
new Summer(data: [1,2,3,4,5]),
new Summer(data: [-1,-2,-3,-4,-5])
])
print("Sum: ${Sum.sum}")
Как видно из результатов «ошибочных» попыток было не так уж и много:
[MISS!] pool-1-thread-1: add 1 to -1
pool-1-thread-2: add -1 to -1
pool-1-thread-2: add -2 to -3
[MISS!] pool-1-thread-1: add 1 to -3
pool-1-thread-2: add -3 to -6
pool-1-thread-1: add 1 to -5
[MISS!] pool-1-thread-2: add -4 to -5
pool-1-thread-1: add 2 to -7
pool-1-thread-2: add -4 to -7
pool-1-thread-1: add 3 to -9
pool-1-thread-2: add -5 to -9
pool-1-thread-1: add 4 to -5
pool-1-thread-1: add 5 to 0
Sum: 0
При решении использовать оптимистичную блокировку важно, чтобы действие с модифицируемой переменной не занимало много времени. Чем дольше это действие — тем чаще будут случаться ошибочные compare-and-set
, и тем чаще придется выполнять это действие повторно.
На основе compare-and-set
может также реализовываться неблокирующая read блокировка. В данном случае в atomic переменной будет храниться версия обрабатываемого объекта. Получив значение версии до вычислений мы можем сверить ее после вычисления. Обычные read-write
блокировки вступают в силу, только если проверка версии провалилась.
class Transaction {
long debit
}
class Account {
AtomicLong version = new AtomicLong()
ReadWriteLock readWriteLock = new ReentrantReadWriteLock()
List<Transaction> transactions = new ArrayList<Transaction>()
}
long balance(Account account) {
ReentrantReadWriteLock.ReadLock locked
while(true) {
long balance = 0
long version = account.version.get()
account.transactions.each {balance += it.debit}
//volatile write for JMM
if (account.version.compareAndSet(version, version)) {
if (locked) {locked.unlock()}
return balance
} else {
locked = account.readWriteLock.readLock()
}
}
}
void modifyTransaction(Account account, int position, long newDebit) {
def writeLock = account.readWriteLock.writeLock()
account.version.incrementAndGet()
account.transactions[position].debit = newDebit
writeLock.unlock()
}
Locks
ReentrantLock
В отличие от syncronized блокировок, ReentrantLock
позволяет более гибко выбирать моменты снятия и получения блокировки т.к. использует обычные Java вызовы. Также ReentrantLock
позволяет получить информацию о текущем состоянии блокировки, разрешает «ожидать» блокировку в течение определенного времени. Поддерживает правильное рекурсивное получение и освобождение блокировки для одного потока. Если вам необходимы честные блокировки (соблюдающие очередность при захвате монитора) — ReentrantLock
также снабжен этим механизмом.
Несмотря на то, что syncronized
и ReentrantLock
блокировки очень похожи — реализация на уровне JVM отличается довольно сильно.
Не вдаваясь в подробности JMM: использовать ReentrantLock
вместо предоставляемой JVM syncronized блокировки стоит только в том случае, если у вас очень часто происходит битва потоков за монитор. В случае, когда в syncronized метод _обычно_ попадает лишь один поток — производительность ReentrantLock
уступает механизму блокировок JVM.
ReentrantReadWriteLock
Дополняет свойства ReentrantLock
возможностью захватывать множество блокировок на чтение и блокировку на запись. Блокировка на запись может быть «опущена» до блокировки на чтение, если это необходимо.
StampedLock _jdk 1.8_
Реализовывает оптимистичные и пессимистичные блокировки на чтение-запись с возможностью их дальнейшего увеличения или уменьшения. Оптимистичная блокировка реализуется через «штамп» лока (javadoc):
double distanceFromOriginV1() { // A read-only method
long stamp;
if ((stamp = sl.tryOptimisticRead()) != 0L) { // optimistic
double currentX = x;
double currentY = y;
if (sl.validate(stamp))
return Math.sqrt(currentX * currentX + currentY * currentY);
}
stamp = sl.readLock(); // fall back to read lock
try {
double currentX = x;
double currentY = y;
return Math.sqrt(currentX * currentX + currentY * currentY);
} finally {
sl.unlockRead(stamp);
}
}
Collections
ArrayBlockingQueue
Честная очередь для передачи сообщения из одного потока в другой. Поддерживает блокирующие (put()
take()
) и неблокирующие (offer()
pool()
) методы. Запрещает null значения. Емкость очереди должна быть указанна при создании.
ConcurrentHashMap
Ключ-значение структура, основанная на hash
функции. Отсутствуют блокировки на чтение. При записи блокируется только часть карты (сегмент). Кол-во сегментов ограничено ближайшей к concurrencyLevel
степени 2.
ConcurrentSkipListMap
Сбалансированная многопоточная ключ-значение структура (O(log n)). Поиск основан на списке с пропусками. Карта должна иметь возможность сравнивать ключи.
ConcurrentSkipListSet
ConcurrentSkipListMap
без значений.
CopyOnWriteArrayList
Блокирующий на запись, не блокирующий на чтение список. Любая модификация создает новый экземпляр массива в памяти.
CopyOnWriteArraySet
CopyOnWriteArrayList
без значений.
DelayQueue
PriorityBlockingQueue
разрешающая получить элемент только после определенной задержки (задержка объявляется через Delayed
интерфейс объекта). DelayQueue
может быть использована для реализации планировщика. Емкость очереди не фиксирована.
LinkedBlockingDeque
Двунаправленная BlockingQueue
, основанная на связанности (cache-miss & cache coherence overhead). Емкость очереди не фиксирована.
LinkedBlockingQueue
Однонаправленная BlockingQueue
, основанная на связанности (cache-miss & cache coherence overhead). Емкость очереди не фиксирована.
LinkedTransferQueue
Однонаправленная `BlockingQueue`, основанная на связанности (cache-miss & cache coherence overhead). Емкость очереди не фиксирована. Данная очередь позволяет ожидать когда элемент «заберет» обработчик.
PriorityBlockingQueue
Однонаправленная `BlockingQueue`, разрешающая приоритизировать сообщения (через сравнение элементов). Запрещает null значения.
SynchronousQueue
Однонаправленная `BlockingQueue`, реализующая transfer()
логику для put()
методов.
Synchronization points
CountDownLatch
Барьер (await()
), ожидающий конкретного (или больше) кол-ва вызовов countDown()
. Состояние барьера не может быть сброшено.
CyclicBarrier
Барьер (await()
), ожидающий конкретного кол-ва вызовов await()
другими потоками. Когда кол-во потоков достигнет указанного будет вызван опциональный callback и блокировка снимется. Барьер сбрасывает свое состояние в начальное при освобождении ожидающих потоков и может быть использован повторно.
Exchanger
Барьер (`exchange()`) для синхронизации двух потоков. В момент синхронизации возможна volatile передача объектов между потоками.
Phaser
Расширение `CyclicBarrier`, позволяющая регистрировать и удалять участников на каждый цикл барьера.
Semaphore
Барьер, разрешающий только указанному кол-во потоков захватить монитор. По сути расширяет функционал `Lock` возможность находиться в блоке нескольким потокам.
Executors
ExecutorService
пришел на замену new Thread(runnable)
чтобы упростить работу с потоками. ExecutorService
помогает повторно использовать освободившиеся потоки, организовывать очереди из задач для пула потоков, подписываться на результат выполнения задачи. Вместо интерфейса Runnable
пул использует интерфейс Callable
(умеет возвращать результат и кидать ошибки).
ExecutorService pool = Executors.newFixedThreadPool(4)
Future future = pool.submit(new Callable() {
Object call() throws Exception {
println("In thread")
return "From thread"
}
})
println("From main")
println(future.get())
try {
pool.submit(new Callable() {
Object call() throws Exception {
throw new IllegalStateException()
}
}).get()
} catch (ExecutionException e) {println("Got it: ${e.cause}")}
pool.shutdown()
Метод invokeAll
отдает управление вызвавшему потоку только по завершению всех задач. Метод invokeAny
возвращает результат первой успешно выполненной задачи, отменяя все последующие.
ThreadPoolExecutor
Пул потоков с возможностью указывать рабочее и максимальное кол-во потоков в пуле, очередь для задач.
ScheduledThreadPoolExecutor
Расширяет функционал ThreadPoolExecutor
возможностью выполнять задачи отложенно или регулярно.
ThreadPoolExecutor
Более легкий пул потоков для «самовоспроизводящих» задач. Пул ожидает вызовов `fork()` и `join()` методов у дочерних задач в родительской.
class LNode {
List<LNode> childs = []
def object
}
class Finder extends RecursiveTask<LNode> {
LNode node
Object expect
protected LNode compute() {
if (node?.object?.equals(expect)) {
return node
}
node?.childs?.collect {
new Finder(node: it, expect: expect).fork()
}?.collect {
it.join()
}?.find {
it != null
}
}
}
ForkJoinPool es = new ForkJoinPool()
def invoke = es.invoke(new Finder(
node: new LNode(
childs: [
new LNode(object: "ivalid"),
new LNode(
object: "ivalid",
childs: [new LNode(object: "test")]
)
]
),
expect: "test"
))
print("${invoke?.object}")
Accumulators _jdk 1.8_
Аккумуляторы позволяют выполнять примитивные операции (сумма/поиск максимального значения) над числовыми элементами в многопоточной среде без использования CAS.
Автор: fls_welvet