Как правило, в Go для безопасного доступа к общим данным используются мьютексы. Да, каналы тоже можно приспособить для изменения общих данных, так как они потокобезопасны, но это усложняет и замедляет логику.
Но в этой статье мы поговорим о другом. Современные процессоры имеют поддержку атомарных операций, что позволяет на основе них организовывать работу с общими данными до нескольких раз быстрее, чем с помощью общепринятых вариантов. Так как мьютексы реализованы на основе ОС, каналы сделаны на основе внутреннего кода Go с использованием тех же мьютексов из ОС под капотом, а атомарные операции делает сам процессор аппаратно за существенно меньшее количество тактов.
Пример кода, который при многопоточности неожиданно начинает глючить
Начнём с основ. Вначале будет код, который параллельно к переменной прибавляет и убавляет по миллиону единиц. В типе A лежит счётчик, тип B добавляет 1 (метод B.II), тип С убавляет 1 (метод C.DD). И всё это параллельно. Казалось бы, тут не нужны никакие примитивы синхронизации, так как общее число прибавлений и убавлений равно миллиону, а поэтому результат будет равен нулю.
package main
import (
"fmt"
"sync"
)
type A struct {
i int
}
func (a *A) Inc(delta int) {
a.i += delta
}
func (a *A) Get() int {
return a.i
}
type B struct {
}
func (b *B) II(c int) {
for i := 0; i < c; i++ {
a.Inc(1)
}
wg.Done()
}
type C struct {
}
func (c *C) DD(count int) {
for i := 0; i < count; i++ {
a.Inc(-1)
}
wg.Done()
}
var a A
var b B
var c C
var wg sync.WaitGroup
func main() {
var nCount = 1000000
a.i = 0
wg.Add(1)
go b.II(nCount)
wg.Add(1)
go c.DD(nCount)
wg.Wait()
fmt.Println("Sum", a.Get())
}
Однако если мы запустим этот код, то получим что угодно, но только не ноль. Пробуем!
Объяснить это можно только тем, что процессы параллельно лезут к одним и тем же ячейкам памяти, их действия накладываются друг на друга (хотя чему тут накладываться? инкремент/декремент — одна из самых примитивных операций), но компилятор не догадывается их использовать, а использует несколько регистров, вступают в действие кеши L1, L2 и L3, и на выходе вместо нуля белиберда.
Классика: мьютексы решают вопрос
Хорошо, теперь перепишем этот код классически, на базе мьютексов:
package main
import (
"fmt"
"sync"
)
type A struct {
i int
m sync.Mutex
}
func (a *A) Inc(delta int) {
a.m.Lock()
defer a.m.Unlock()
a.i += delta
}
func (a *A) Get() int {
a.m.Lock()
defer a.m.Unlock()
return a.i
}
type B struct {
}
func (b *B) II(c int) {
for i := 0; i < c; i++ {
a.Inc(1)
}
wg.Done()
}
type C struct {
}
func (c *C) DD(count int) {
for i := 0; i < count; i++ {
a.Inc(-1)
}
wg.Done()
}
var a A
var b B
var c C
var wg sync.WaitGroup
func main() {
var nCount = 1000000
a.i = 0
wg.Add(1)
go b.II(nCount)
wg.Add(1)
go c.DD(nCount)
wg.Wait()
fmt.Println("Sum", a.Get())
}
Мьютекс ожидаемо делает своё дело, и на выходе мы получаем ноль. Проверить самому. Кстати, мьютекс можно разместить в любом объекте, не только в защищаемом, и он сделает своё благородное дело. Только из соображений понятности и поддержки кода мьютексы обычно размещают в защищаемых объектах.
На сцену выходят примы: атомарные операции
Теперь же решим эту задачу неблокирующим, быстрым и красивым способом. Атомарным.
package main
import (
"fmt"
"sync"
"sync/atomic"
)
type A struct {
i int64
}
func (a *A) Inc(delta int64) {
atomic.AddInt64(&a.i, delta)
}
func (a *A) Get() int64 {
return atomic.LoadInt64(&a.i)
}
type B struct {
}
func (b *B) II(c int) {
for i := 0; i < c; i++ {
a.Inc(1)
}
wg.Done()
}
type C struct {
}
func (c *C) DD(count int) {
for i := 0; i < count; i++ {
a.Inc(-1)
}
wg.Done()
}
var a A
var b B
var c C
var wg sync.WaitGroup
func main() {
var nCount = 1000000
a.i = 0
wg.Add(1)
go b.II(nCount)
wg.Add(1)
go c.DD(nCount)
wg.Wait()
fmt.Println("Sum", a.Get())
}
Во дела! Работает! Мы тоже получили на выходе ноль после параллельной работы увеличивающей и уменьшающей процедуры.
Достаточно было только использовать специфическую атомарную функцию atomic.AddInt64. Атомарное сложение — очень полезная функция, так как неблокирующим способом мы можем корректно складывать некие результаты из разных потоков практически без затрат времени на синхронизацию. Также я использовал atomic.LoadInt64, но в данном примере она необязательна.
Специфика атомарных операций в том, что они работают только с узким набором данных — это 32-х, 64-х битные данные и указатели.
▍ Сравнение скорости мьютексов и атомарных операций
Для того чтобы сравнить скорость действия атомарных операций и мьютексов я увеличил число операций до 100 миллионов.
Итак, атомарные операции: 2.12 секунды, мьютексы 5.85 секунды.
Как и следовало ожидать, атомарные операции существенно быстрее, примерно в 3 раза.
Частый случай использования атомарных операций, когда несколько горутин что-то делают с общим аккумулятором. Например, суммы убытков и прибылей из разных источников складываются и отображаются онлайн. А вот суммирование массива несколькими горутинами через атомарное сложение не ускоришь. Так как оно существенно дороже по процессорному времени, чем операция обычного сложения из-за сбросов кешей процессора.
С точки зрения процессора (и всех его многочисленных ядер) есть мир до атомарной операции и после. На основе этого принципа из атомарных операций можно делать самопальные конструкции для синхронизации многопоточной работы. И довольно быстрые.
Важный момент: скорость атомарных операций в виртуальной среде может быть значительно ниже из-за того, что они могут эмулироваться. Похоже, что так происходит в WSL.
Самодельный мьютекс на основе CompareAndSwap
Для того, чтобы создавать самодельные мьютексы, можно использовать специальную атомарную фунцию
CompareAndSwapInt64(addr *int64, old, new int64) (swapped bool)
С ней всё ещё проще. Сделаем самодельный мьютекс.
type MyMutex struct {
i int64
}
func (m *MyMutex) get() int64 {
return atomic.LoadInt64(&m.i)
}
func (m *MyMutex) set(val int64) {
atomic.StoreInt64(&m.i, val)
}
func (m *MyMutex) Lock() {
for atomic.CompareAndSwapInt64(&m.i, 0, 1) != true {
}
return
}
func (m *MyMutex) Unlock() {
if m.get() == 0 {
panic(fmt.Errorf("Double unlocking"))
}
m.set(0)
}
Этот подход известен в системном программировании как Compare and swap. Также для этой цели можно использовать подход Test and set, но пакет sync/atomic не предоставляет функций для этого подхода. «Compare and swap» в википедии считают более хорошим подходом. Дополнительная информация. При тестировании этот самодельный мьютекс показывает на 40% худшие результаты, чем оригинальный. Из-за нескольких причин:
- Есть цикл (ниже), на который тратится много процессорного времени.
for atomic.CompareAndSwapInt64(&m.i, 0, 1) != true { }
Если бы могли в него вставить, что-то типа ассемблерных инструкций NO_OPERATION или какой-то микрозадержки, то стал бы лучше работать. Стандартная функция time.Sleep() плохо работает для этой цели. При выставлении 1 наносекунды там может быть в тысячи раз большая реальная задержка.
- Планировщик ОС, при использовании системного мьютекса, выделяет поменьше квантов времени тому потоку, который просто ждёт захвата мьютекса.
- Библиотечный мьютекс, наверняка, написан на более низком уровне.
- Обвязка для самодельного мьютекса, скорее всего, менее эффективно компилируется в ассемблерные инструкции.
Безопасная многопоточная работа с общими данными без каналов и мьютексов
Хорошо, а если нам нужны какие-то более сложные операции и структуры данных? Можно ли тут получить пользу от атомарных операций?
Да, можно! Перейдём к самому интересному. Создадим на основе атомарных операций самодельную потокобезопасную конструкцию, которая будет контролировать состояние конечного автомата. И сделаем это по другому, чем ранее в статье.
▍ Конечный автомат, обслуживающий несколько потоков на атомарных операциях
Некий автомат имеет 3 состояния: открыт, занят уменьшением, занят увеличением. Это условно. Состояний может быть больше. Они могут быть называться как угодно. Суть в том, что есть общие данные любого вида и мы организуем к ним бесконфликтный доступ разных методов в зависимости от состояния объекта. Без использования мьютексов и каналов.
Например, потому что мы знаем, что атомарные операции могут быть сильно быстрее, или просто любим ненормальное программирование.
Итак, представляю код. В нём (для демонстрации потенциальных вариантов реализации) я обошёлся без функции CompareAndSwap. Хотя с ней было бы чуть проще.
package main
import (
"fmt"
"sync"
"sync/atomic"
)
const (
Open int64 = iota
Increasing
Decreasing
)
type A struct {
state int64
i int64
}
func (a *A) SetState(state int64) {
atomic.StoreInt64(&a.state, state)
}
func (a *A) GetState() int64 {
return atomic.LoadInt64(&a.state)
}
func (a *A) AddState(delta int64) int64 {
return atomic.AddInt64(&a.state, delta)
}
// По хорошему тут должно быть 2 процедуры: для состояния Increasing и для Decreasing,
// но не хотелось плодить почти одинаковый код
func (a *A) Inc(delta int64) {
if delta > 0 {
a.SetState(Increasing)
} else {
a.SetState(Decreasing)
}
a.i += delta
a.SetState(Open)
}
func (a *A) Init() {
a.SetState(Open)
}
func (a *A) Get() int64 {
return a.i
}
type B struct {
}
func (b *B) II(c int) {
for i := 0; i < c; i++ {
i_begin:
for a.GetState() != Open {
}
if a.AddState(Increasing) == Increasing {
a.Inc(1)
} else {
goto i_begin
}
}
wg.Done()
}
type C struct {
}
func (c *C) DD(count int) {
for i := 0; i < count; i++ {
d_begin:
for a.GetState() != Open {
}
if a.AddState(Decreasing) == Decreasing {
a.Inc(-1)
} else {
goto d_begin
}
}
wg.Done()
}
var a A
var b B
var c C
var wg sync.WaitGroup
func main() {
a.Init()
var nCount = 1000000
wg.Add(1)
go b.II(nCount)
wg.Add(1)
go c.DD(nCount)
wg.Wait()
fmt.Println("Sum", a.Get())
}
Запустить в Go Playground. Если проблем с многопоточностью нет, то код должен вывести ноль как результат. И выводит.
Обратите внимание, что в этом коде операция инкремента теперь обычная, а не атомарная, но тем не менее всё корректно считается.
Логика: когда объект A объявляет о том, что он открыт, то объекты B и C пытаются переключить его состояние в Increasing или Decreasing с помощью добавления к нулю (это состояние Open) значений Increasing или Decreasing. Тот, кто первый добавил, получает, например, Increasing. Второй уже получит значение Increasing + Decreasing, которое не даст ему начать делать свою работу и переведёт в состояние ожидания, пока состояние не станет Open. Мы можем использовать сколько угодно состояний, главное, чтобы сумма номеров любых остальных не равнялась никакому номеру другого состояния.
Вот кусочек с главным хаком, который делает нашу работу потокобезопасной:
for a.GetState() != Open {
}
if a.AddState(Decreasing) == Decreasing {
a.Inc(-1)
}
Например, можно использовать степени двойки: 1, 2, 4, 8, 16, 32. Сумма любого количества состояний не будет равна никакому другому номеру состояния. Каждая степень двойки представлена бинарно отдельным битом, который не пересекается с другими битами. Каждое состояние будет представлено машинным словом со всего одной битовой единицей, только в разном месте. И поэтому при сложении состояний мы будем получать результат с несколькими единицами.
Наш переключатель реагирует только на чистые состояния, в которых только по одному включённому биту. И поэтому каждая горутина легко отличает ситуации, когда ей удалось занять очередь на исполнение, и начинает исполнять свою работу.
Для 100 миллионов циклов этот код выполнялся 14 секунд, аналогичный код с тремя состояними на мьютексах выполнялся 25.3 секунды. Код на мьютексах немного сложнее, чем мог бы быть, но только для того, чтобы имитировать объект с 3 состояниями и с минимально блокирующим доступом к нему. Наверное, более правильно его переписать на каналах, но тогда он будет выполняться ещё дольше, так как под капотом каналов всё те же мьютексы.
Сразу скажу, что в Go Playground не получится потестить мои скрипты со 100М выполнений, и функция времени там работает некорректно. Для того, чтобы воспроизвести мои эксперименты вам придётся компилировать самому.
Я не призываю переписывать все мьютексы на атомарных операциях, то что я рассказал, это скорее proof-of-concept. Но при этом нужно знать об атомарных операциях и почаще их использовать — инструмент великолепный и быстрый!
Вывод
Атомарные функции в Go дают возможность организовать как блокирующие, так и в особенности неблокирующие алгоритмы с механизмами синхронизации, которые иногда работают в 2-3 раза быстрее, чем мьютексы и каналы. В некоторых ситуациях низкоуровневого программирования это может дать существенную пользу, хотя и требует экстремальной внимательности во время разработки.
Во всех остальных ситуациях разработчики Go рекомендуют использовать каналы и мьютексы, как более понятные и наглядные в применении.
Нативные мьютексы от ОС имеют более тесную связь с планировшиком задач, поэтому нет смысла переписывать именно мьютексы на атомарных операциях, но в других задачах, в особенности в неблокирующих алгоритмах, стоит рассмотреть атомарные операции для ускорения алгоритмов.
Более детально посмотреть на атомарные функции вы можете в документации Go.
Ранее на Хабре на эту тему: Атомарные и неатомарные операции, Atomic operation.
Автор: Сергей Ю. Каменев