Под катом находятся заметки, в которых расписано, как реализовать в Rust хитрые concurrency паттерны, которые я с легкостью пишу в Java, и в чем различие в подходах к concurrency у этих языков. Статья будет полезна и тем, кто переходит на Rust из C#, ведь у него аналогичная модель памяти.
Пара слов о разнице в подходах Java и Rust
Java первый язык с моделью памяти (которая описывает синхронизацию операций чтения и записи в память), Rust наследует модель памяти C++11 в реализации LLVM. Так что с точки зрения терминологии есть много различий. По сути же в Java есть 3 механизма синхронизации разделяемой памяти: volatile переменные, блокировки (блоки synchronized, ReentrantLock и т.д.) и atomic. В Rust только блокировки (Mutex, Rwlock) и atomic. Volatile не нужен поскольку дублирует функционал atomic. Передачу сообщений через каналы опустим.
Ещё отличия:
- В Rust нет сборщика мусора, поэтому придется класть расшаренные между потоками объекты в атомарный счетчик ссылок Arc. Более того, Rust трепетно относится к неизменяемым объектам, поэтому что бы изменить внутреннее состояние объекта без захвата mut ссылки придется обернуть его в Cell или аналог (RefCell, UnsafeCell).
- Операции над atomic в Rust используют барьеры четырех типов, но это не сильно меняет правила игры. Используйте Relaxed, когда чтение через гонку приемлемо (например под блокировкой), Acquire на чтение + Release на запись — когда не нужно что бы все потоки видели одинаковый порядок операций, и SeqCst — когда нужно. Напоминаю, что volatile в Java дает гарантии SeqCst. Для атомарных операций, которые делают за раз и чтение и запись (например swap), предусмотрена опция AcqRel (Acquire+Release).
- В Rust нет null и его нельзя использовать как индикатор. Для этих целей используйте Option, который вокруг указателя ничего не стоит. В Java порой использование null для индикатора тоже не лишено опасностей — см. второй пример.
- AtomicPtr в Rust не то же, что AtomicReferenc в Java. Это не очевидный момент, если смотреть только на семантику, но он становится очевидным при использовании — см. последний пример. Чаще проще использовать отдельный AtomicBool для синхронизации.
- В Rust блокировка (Mutex, RwLock) неотделима от объекта, вокруг которого она берется. И доступ к объекту нельзя получить иным способом, кроме как через захват блокировки. В Java тоже нужен объект синхронизации для монитора, но обращаться к нему можно без его захвата.
- Вы должны сознательно унаследовать Sync с помощью unsafe для класса, который хотите передавать между потоками.
- В отличие от сравнительно простой идеи happens-before, которая хороша общностью, идея барьеров намного более сложная для анализа кода. Думаю, вы увидите это на примерах.
- Всё таки Rust подразумевает, что вы не будете часто шарить изменяемые данные между потоками и организуете работу иначе.
Учтите так же, что я сам только учу Rust поэтому к коду на нем стоит отнестись скептически. Я его тестировал, но в контексте concurrency это ничего не гарантирует.
Double-check lock
В Java мире очень любят double-check-lock т.к. до Java 1.5 его невозможно было реализовать корректно. Напоминаю как это выглядит:
public class Lazy<T> {
volatile T val;
final Supplier<T> supp;
public Lazy(Supplier<T> supp) {
this.supp = supp;
}
public T get() {
if (val == null) {
synchronized (this) {
if (val == null) {
val = supp.get();
}
}
}
return val;
}
}
Людей, привыкших к написанию concurrency кода, таким не удивить, однако если вдуматься тут есть масса опасных операций. Во-первых, чтение через гонку. Во-вторых, мы шарим между потоками изменяемый объект (метод get меняет внутреннее состояние объекта). Rust такого не любит. Он вообще злой. Тем не менее максимально близкий аналог в Rust выглядит так:
pub struct Lazy<T, F: Fn() -> T> {
init: AtomicBool,
val: UnsafeCell<Option<T>>,
supp: Mutex<F>
}
unsafe impl <T, F: Fn() -> T> Sync for Lazy<T, F> {}
impl <T, F: Fn() -> T> Lazy<T, F> {
fn new(func: F) -> Self {
Lazy{
init: AtomicBool::new(false),
val: UnsafeCell::new(None),
supp: Mutex::new(func)
}
}
fn get<'a>(&'a self) -> &'a T {
if !self.init.load(Ordering::Acquire) {
let func = self.supp.lock().unwrap();
if !self.init.load(Ordering::Relaxed) {
let value = unsafe { &mut *self.val.get() };
*value = Some(func());
self.init.store(true, Ordering::Release);
}
}
unsafe {
(*self.val.get()).as_ref().unwrap()
}
}
}
Выглядит необычно, но stackoverflow считает, что так и надо.
Кода тут намного больше. Не обращайте внимания на Mutex вокруг supp — он просто где-то должен быть. Из-за отсутствия null пришлось обернуть val в Option. А из-за сложностей в использовании AtomicPtr пришлось добавить AtomicBool флаг инициализации.
Здесь обратите внимание на разницу в доказательстве корректности. Например в Java, если убрать volatile, то вы просто говорите, что нет happens-before из записи в чтение, и на этом анализ заканчивается. Искать конкретный вариант исполнения, при котором проблема реализуется, не нужно.
В Rust же вы рискуете попасть в ловушку из-за разных вариантов барьеров памяти. Поэтому придется пробежать все варианты исполнения. Если вы прочитали в init true это значит вы видите запись в него и в val (по транзитивности). Поэтому все сработает нормально. Иначе вы берете блокировку, тогда чтение true там тоже гарантирует видимость записи в val. В последнем третьем случае если вы и есть тот поток, что записал значение, вы его тоже увидите т.к. другие потоки не пишут значение (и если бы вы не могли прочитать свою собственную запись — это сломало бы однониточные программы).
Double-check lock с удалением Supplier
В прошлом примере переменная supp не нужна после использования. Я встретил этот пример в докладе Шипилева о JMM.
Код в Java:
public class LazyOpt<T> {
volatile Supplier<T> supp;
T val;
final static Supplier EMPTY = new Supplier() {
@Override public Object get() {
return null;
}
};
public LazyOpt(Supplier<T> supp) {
this.supp = supp;
}
public T get() {
if (supp != EMPTY) {
synchronized (this) {
if (val == null) {
val = supp.get();
supp = EMPTY;
}
}
}
return val;
}
}
Тут все просто и знакомо: это почти double-check-lock, с той лишь разницей, что мы удаляем функцию, вычисляющую значение. Спецификация JMM позволяет нам увидеть supp == null до того как туда кто-то что-то положил, поэтому надо использовать EMPTY (подробнее в докладе Шипилева в самом конце, он просил не спрашивать это на собеседованиях). Volatile на supp нужен, что бы пробросить happens-before между операцией записи внутри synchronized и чтением вне его. Volatile на val не нужен, т.к. тот, кто видит запись в supp, видит и запись в val.
Сразу стоит сказать, что этот пример может и не иметь смысла в Rust. У структур на стеке фиксированный размер, и если supp не указатель, то ничего не выиграть, если затрете его в None. Но если supp по какой-то причине Arc, тогда можно.
Аналог в Rust:
pub struct Lazy<T, F: Fn() -> T> {
init: AtomicBool,
val: UnsafeCell<Option<T>>,
supp: Mutex<UnsafeCell<Option<Arc<F>>>>,
}
unsafe impl <T, F: Fn() -> T> Sync for Lazy<T, F> {}
impl <T, F: Fn() -> T> Lazy<T, F> {
fn new(func: F) -> Self {
Lazy{
init: AtomicBool::new(false),
val: UnsafeCell::new(None),
supp: Mutex::new(UnsafeCell::new(Some(Arc::new(func)))),
}
}
fn get<'a>(&'a self) -> &'a T {
if !self.init.load(Ordering::Acquire) {
let supp = self.supp.lock().unwrap();
if !self.init.load(Ordering::Relaxed) {
let value = unsafe { &mut *self.val.get() };
let func = unsafe { & *supp.get() };
if let Some(ref f) = *func {
*value = Some(f());
}
let func = unsafe { &mut *supp.get() };
*func = None;
self.init.store(true, Ordering::Release);
}
}
unsafe {
(*self.val.get()).as_ref().unwrap()
}
}
}
Как по мне вариант на Rust выглядит слишком сложно, что бы его использовать на практике. Однако, с точки зрения доказательства корректности ничего не поменялось т.к. все операции с функцией делаются под блокировкой.
Безопасная гонка (benign races)
Ещё один хитрый прием, из того же доклада, позволяет написать ленивую инициализацию для небольшого числа случаев, вроде подсчета хеша объекта, которая не требует блокировок.
public class LazyHash {
int hash;
@Override public int hashCode() {
int temp = hash;
if (temp == 0) {
temp = supp();
hash = temp;
}
return temp;
}
public int supp() {
return 4;
}
}
Тут главный секрет в том что:
- Нас не парит нетранзитивность видимости полей объекта если: их нет, они final, либо если это не класс, а примитив (за исключением long и double). В остальных случаях она парит, т.к. её нельзя исправить задним числом.
- Если supp вызвать много раз, он вернёт одно и тоже значение (не равное по ссылке).
- Нас не парит, что объекты могут быть разными по ссылке, если в остальном они одинаковые.
- Мы сохраняем объект в локальную копию, что бы избежать возврата null при двойном чтении.
- Оптимизация в этом месте необходима (пункт срезающий большинство ситуаций).
Хотя код выглядит так будто мы не используем иных гарантий на операции, помимо гарантий для работы однониточных программ, это не так. Ведь Java гарантирует атомарность операций чтения и записи над примитивами (включая ссылки) кроме long и double, а без атомарности этот код не корректен. Также трюком с локальной переменной мы добавляем гарантию монотонности. Rust дает в точности эти же гарантии если сделаете Relaxed операцию..
Хешы считают в Rust хитрее чем в Java, поэтому пример ниже — абстрактный getter:
pub struct LazyHash {
hash: AtomicIsize
}
impl LazyHash {
fn new() -> Self {
LazyHash{hash: AtomicIsize::new(0)}
}
fn get(&self) -> isize {
let mut temp = self.hash.load(Ordering::Relaxed);
if temp == 0 {
temp = self.supp();
self.hash.store(temp, Ordering::Relaxed);
}
temp
}
fn supp(&self) ->isize {
4
}
}
В Rust в конце можно было бы вернуть вместо temp честный self.hash.load(Ordering::Relaxed), просто в этом нет смысла. Так же в nightly сборки Rust куча атомиков (даже 128 битные), а в Java сделать такой же фокус с 64-х битным long не получится и придется лепить volatile.
Read-write структура для одного писателя
Я услышал о такой идеи в докладе computer science ещё жива, где описывались хитрые алгоритмы используемые в IDEA.
Мысль следующая: Read-write локи слишком медленные, при наличии тысячи чтецов и ровно одного писателя. Но можно ускориться если читающие потоки будут писать в ThreadLocal флажки и проверять атомарный флаг писателя. Иных подробностей не было, поэтому моя реализация будет совсем другой.
Честно говоря описание задачи похоже на mvcc для бедных. Поясню. Если у нет нужды в монотонности, то можно обойтись без блокировок в ситуации, когда имеется не более одного писателя и много чтецов. Чтецы читают из последней (на момент начала операции) копии данных, а писатель пишет в личную копию. Когда писатель заканчивает работу, он обновляет общую копию атомарно, и новые читатели будут брать уже её. Старые читатели будут работать со старой копией. Тут нарушается монотонность т.к. новый быстрый чтец может вернуть новое значение, а после него старый медленный чтец вернет старое значение. Однако заметить нарушение монотонности нельзя если получать значения в том же порядке, в котором создавались задачи или не ожидать завершения задач из других потоков. В реализации ниже это считается приемлемым.
Однако в голову приходит по меньшей мере два способа починить монотонность:
- Для случая когда нам важен throughput можно добавить блокировку: новые чтецы и писатель не сообщат о завершении своей работы пока все старые чтецы этого не сделают. Это не тоже самое что ReadWrite блокировка поскольку задачи физически выполняются одновременно и мы просто синхронизируем видимость: после снятия блокировки все готовые результаты станут видны мгновенно. Можно отслеживать последнего, кто взял текущую копию для чтения, и все, кто взял следующую копию, должны будут подождать пока он её отпустит, прежде чем отпустить свою.
- Если же мы предполагаем, что писатель у нас в приоритете и нам важна его latency и есть возможность отменять и перезапускать задачи чтецов, то можно перезапустить все задачи, которые не успели выполниться к моменту, когда писатель обновил значение — наверное это больше подходит для use case из доклада.
Код ниже не использует ThreadLocal (информации в докладе недостаточно что бы понять зачем он нужен, если можно обойтись без него). Этот код остается валидным даже если писателей больше одного. Но сломается если читатели начнут писать.
public class PoorMvcc<T extends PoorMvcc.Copyable<T>> {
volatile T currentValue;
final ReentrantLock writersSync = new ReentrantLock();
public PoorMvcc(T val) {
currentValue = val;
}
public T getReadCopy() {
return currentValue;
}
public T getWriteCopy() {
writersSync.lock();
// Copy shoud not throw any exception.
return currentValue.copy();
}
public void returnReadCopy(T oldValue) {
// No-op because we don't care about monotonic.
}
public void returnWriteCopy(T newValue) {
currentValue = newValue;
writersSync.unlock();
}
// Because Cloneable isn't generic and method clone is protected.
public static interface Copyable<T> {
T copy();
}
}
Для разработчиков Rust может показаться дикостью, что приходится объявлять собственный интерфейс для копирования. В Java метод clone объявлен protected т.е. он видим только потомкам (где можно объявить его public) и даже если объявить тип T наследником Cloneable это не решит проблему. К слову в C# с этим все тоже не слава богу: интерфейс IСloneable не generic и возвращает Object, который ещё надо кастовать к нужному типу.
Я сделал бенчмарк на JMH. Даже в случае 1 писателя и 1 читателя реализация на PoorMvcc намного быстрее реализации на ReentrantReadWriteLock. С увеличением числа писателей разрыв только растет. Я не знаю как написать аналогичный бенчмарк на Rust. Кажется там нет функционала для проверки того как методы влияют друг на друга исполняясь одновременно.
public static class LongCopy implements PoorMvcc.Copyable<LongCopy> {
public long val;
public LongCopy(long val) {
this.val = val;
}
@Override public LongCopy copy() {
return new LongCopy(val);
}
}
public final PoorMvcc<LongCopy> poorMvcc = new PoorMvcc<>(new LongCopy(1));
@Benchmark
@Group("g")
@GroupThreads(1)
public long inc() {
LongCopy copy = poorMvcc.getWriteCopy();
try {
copy.val++;
return copy.val;
} finally {
poorMvcc.returnWriteCopy(copy);
}
}
@Benchmark
@Group("g")
@GroupThreads(10)
public long read() {
LongCopy copy = poorMvcc.getReadCopy();
try {
return copy.val;
} finally {
poorMvcc.returnReadCopy(copy);
}
}
final LongCopy longCopy = new LongCopy(1);
final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
@Benchmark
@Group("g2")
@GroupThreads(1)
public long inc2() {
ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
writeLock.lock();
try {
longCopy.val++;
return longCopy.val;
} finally {
writeLock.unlock();
}
}
@Benchmark
@Group("g2")
@GroupThreads(10)
public long read2() {
ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
readLock.lock();
try {
return longCopy.val;
} finally {
readLock.unlock();
}
}
Результат:
Benchmark Mode Cnt Score Error Units
Main.g avgt 20 60,452 ± 2,956 ns/op
Main.g:inc avgt 20 344,700 ± 33,870 ns/op
Main.g:read avgt 20 32,027 ± 0,935 ns/op
Main.g2 avgt 20 4093,235 ± 3403,373 ns/op
Main.g2:inc2 avgt 20 28628,403 ± 37297,944 ns/op
Main.g2:read2 avgt 20 1639,718 ± 52,243 ns/op
Инкремент быстрее в 83 раза, а чтение в 51. Если читателей станет 100, тогда инкремент будет быстрее в 12 000 раз, а чтение в 17 000.
В Rust есть Thread local, но он нам не понадобится тут тоже.
struct PoorMvcc<T: Clone> {
current_value: AtomicPtr<T>,
write_copy: Mutex<T>
}
unsafe impl <T: Clone> Sync for PoorMvcc<T> {}
impl <T: Clone> PoorMvcc<T> {
fn new(val: T) -> Self {
PoorMvcc {
write_copy: Mutex::new(val.clone()),
current_value: AtomicPtr::new(Arc::into_raw(Arc::new(val)) as *mut _)
}
}
fn get_read_copy(&self) -> Arc<T> {
let val = unsafe {Arc::from_raw(self.current_value.load(Ordering::Acquire))};
let copy = Arc::clone(&val);
Arc::into_raw(val);
copy
}
fn return_read_copy(&self, val: Arc<T>) {
// Eliminate a link
}
fn get_write_copy(&self) -> MutexGuard<T> {
self.write_copy.lock().unwrap()
}
fn return_write_copy(&self, val: MutexGuard<T>) {
let new_val = Arc::new(val.clone());
let old_val = self.current_value.swap(Arc::into_raw(new_val) as *mut _, Ordering::AcqRel);
//To avoid a memory leak we have to eliminate the old value.
unsafe {Arc::from_raw(old_val);}
}
}
Давайте сперва рассмотрим получение значения для чтения. Работать с AtomicPtr в Rust надо аккуратно что бы и память не утекла и не освободилась слишком рано. Я кладу внутрь Arc поскольку результат будет передан между потоками. Соответственно get_read_copy достает атомарно текущую Arc ссылку и возвращает её клон (так в Rust передаются Arc ссылки). Если больше ничего не сделать, то в AtomicPtr протухнет указатель, ведь считанная оттуда Arc ссылка освободится, поэтому я делаю into_raw, что бы этого не произошло. А return_read_copy принимает Arc ссылку, уничтожая эту копию.
Получение значения на запись похитрее. Поскольку надо синхронизировать записи блокировкой, я завожу отдельную копию данных обернутую в Mutex. При вызове get_write_copy я просто возвращаю значение из Mutex (оно обернуто в MutexGuard что бы освободить потом блокировку — это полный аналог кода в Java). А в return_write_copy принимаю его назад, потом подменяю старое значение для чтения на новую копию. Тут есть хитрость с операцией swap, которую надо сделать атомарно, что бы потом освободить старую Arc ссылку (у других потоков может остаться копия).
На сегодня всё. Поиск ошибок приветствуется.
Автор: SharplEr