Многозадачность — это то, что реализовано в Go по настоящему хорошо, хоть и не идеально. Приятный синтаксис с терпким послевкусием, простые и мощные абстракции, подкупают своим изяществом по сравнению с остальными императивными языками. А попробовав лучшее, уже так не хочется скатываться к посредственности. Поэтому, если и переходить на другой язык, то он должен быть ещё более выразителен и с не менее толковой реализацией многозадачности.
Если вы уже наигрались с Go, устали от копипасты, ручного жонглирования мьютексами и всерьёз подумываете о приобретении протезов для рук, то позвольте предложить вашему вниманию перевод Tour of the Go с эквивалентным кодом на D и краткими пояснениями.
Часть пятая. Сопрограммы.
Coroutines
Go
package main
import (
"fmt"
"time"
)
func say(s string) {
for i := 0; i < 5; i++ {
time.Sleep(100 * time.Millisecond)
fmt.Println(s)
}
}
func main() {
go say("world")
say("hello")
}
Go позволяет легко и просто запустить любую функцию параллельным потоком и продолжить работу, не дожидаясь её завершения. Все гопотоки (волокна, fibers, сопрограммы, coroutines, greenlets) выполяются кооперативно на ограниченном числе нативных потоков (нитях, threads), тем самым максимально утилизируя ядра процессора (cores). Стандартная библиотека D поддерживает волокна, но лишь в рамках одной нити и не умеет балансировать волокна на несколько нитей. Но такой планировщик реализован в проекте vibe.d, правда синтаксис запуска параллельного потока всё ещё не столь лаконичен как в Go. Поэтому мы воспользуемся библиотекой go.d предоставляющией шаблон "go!" для параллельного запуска функций. Кроме того, следуя лучшим практикам, код примеров мы будем оформлять в виде тестов.
D
unittest
{
import core.time;
import std.range;
import jin.go;
__gshared static string[] log;
static void saying( string message )
{
foreach( _ ; 3.iota ) {
sleep( 100.msecs );
log ~= message;
}
}
go!saying( "hello" );
sleep( 50.msecs );
saying( "world" );
log.assertEq([ "hello" , "world" , "hello" , "world" , "hello" , "world" ]);
}
В D не принято велосипедить без надобности, поэтому цикл мы реализовали через итерирование по последовательности натуральных чисел. Функцию "saying" нам пришлось объявить статической, чтобы она не имела доступа к локальным переменным, что небезопасно при параллельном выполнении её в разных нитях. Если сделать эту функцию замыканием, убрав "static", то данный код не скомпилируется — благодаря шаблонной магии компилятор не разрешит нам направлять пистолет в собственные конечности. В Go же вопрос конкурентного доступа остаётся на совести программиста, у которого, в большинстве случаев, её нет.
Buffered Channels
Go
package main
import "fmt"
func main() {
ch := make(chan int, 2)
ch <- 1
ch <- 2
fmt.Println(<-ch)
fmt.Println(<-ch)
}
Запускать параллельные потоки было бы не так полезно, если бы не было возможности их синхронизации. Go использует довольно элегантную абстракцию для этого — каналы. Каналы представляют из себя типизированные очереди сообщений. Если поток пытается прочитать что-то из пустого канала, то он блокируется в ожидании другого потока, которые эти данные туда запишет. И наоборот, если попытается записать в переполненный канал, то заблокируется, пока другой поток не вычитает из канала хотя бы одно сообщение. Каналы легко и просто заменяют такие абстракции как ленивые генераторы, события и обещания, привнося с собой гораздо больше сценариев использования.
В стандартной библиотеке D для общения между потоками используется приём/передача абстрактных сообщений. То есть, зная id потока вы можете послать ему произвольное сообщение, а он должен его распаковать и как-то обработать. Довольно не удобный механизм. Vibe.d вводит абстракцию потока байт с поведением, аналогичным гоканалам. Но зачастую требуется не просто байты передавать, а некоторые структуры. Кроме того, что в Go, что в D, межпотоковая коммуникация реализована через захват мьютекса, что имеет небезызвестные проблемы. Поэтому мы опять воспользуемся библиотекой go.d, предоставляющей нам типизированные wait-free каналы.
D
unittest
{
import jin.go;
auto numbers = new Channel!int(2);
numbers.next = 1;
numbers.next = 2;
numbers.next.assertEq( 1 );
numbers.next.assertEq( 2 );
}
Виртуальное свойство "next", конечно, не так наглядно, как стрелочка в Go, зато компилятор пристально следит за положением нашего пистолета, и не позволяет передать через канал типы, не безопасные для параллельного использования из разных нитей. Однако, есть один момент — эти каналы требуют чтобы у них было не больше одного читателя и не больше одного писателя. К сожалению, пока за этим приходится следить вручную, но в будущем наверняка и тут компилятор перейдёт в наши союзники.
Также стоит отметить, что размер канала в Go по умолчанию равен одному элементу, а в go.d около 512 байт.
Channels
Go
package main
import "fmt"
func sum(s []int, c chan int) {
sum := 0
for _, v := range s {
sum += v
}
c <- sum // send sum to c
}
func main() {
s := []int{7, 2, 8, -9, 4, 0}
c := make(chan int)
go sum(s[:len(s)/2], c)
go sum(s[len(s)/2:], c)
x, y := <-c, <-c // receive from c
fmt.Println(x, y, x+y) // -5 17 12
}
В Go работа с каналом защищена мьютексом, так что вы можете использовать его для коммуникации сразу с несколькими потоками, когда вам не важно, в каком порядке они предоставят данные. Каналы из библиотеки go.d, напротив, неблокирующие, поэтому в таком сценарии их использовать нельзя — для каждого потока необходимо создать свой коммуникационный канал. Для упрощения работы со списками каналов библиотека предоставляет структуры-балансировщики Inputs и Outputs. В данном случае нам потребуется Inputs, которая по очереди читает из каждого непустого канала, зарегистрированного в ней.
D
unittest
{
import std.algorithm;
import std.range;
import jin.go;
static auto summing( Channel!int sums , const int[] numbers ) {
sums.next = numbers.sum;
}
immutable int[] numbers = [ 7 , 2 , 8 , -9 , 4 , 0 ];
Inputs!int sums;
go!summing( sums.make(1) , numbers[ 0 .. $/2 ] );
go!summing( sums.make(1) , numbers[ $/2 .. $ ] );
auto res = sums.take(2).array;
( res ~ res.sum ).assertEq([ 17 , -5 , 12 ]);
}
Как обычно, мы не пишем руками суммирование диапазона, а используем для этого стандартный обобщённый алгоритм "sum". Чтобы подобные алгоритмы работали с вашим типом данных достаточно реализовать один из интерфейсов диапазонов, которые, разумеется, реализованы как в Channel, так и в Inputs, и в Outputs. Алгоритм "take" выдаёт ленивый диапазон, возвращающий указанное число первых элементов исходного диапазона. А алгоритм "array" выгребает из диапазона все элементы и возвращает нативный массив с ними. Обратите внимание, что каждому потоку мы передаём отдельный канал единичной длинны и срез неизменяемого массива (привет, параллелизм!).
Range and Close
Go
package main
import (
"fmt"
)
func fibonacci(n int, c chan int) {
x, y := 0, 1
for i := 0; i < n; i++ {
c <- x
x, y = y, x+y
}
close(c)
}
func main() {
c := make(chan int, 10)
go fibonacci(cap(c), c)
for i := range c {
fmt.Println(i)
}
}
Как видно, в Go мы тоже можем итерироваться по каналу, последовательно получая из него очередные элементы. Чтобы не зависнуть в бесконечном цикле, такие каналы должны закрываться передающей стороной, чтобы принимающая могла понять, что больше данных не будет и цикл пора заканчивать. В D мы бы написали практически то же самое, разве что объявили бы ряд Фибоначчи в виде математической рекурентной формулы.
D
unittest
{
import std.range;
import jin.go;
static auto fibonacci( Channel!int numbers , int count )
{
auto range = recurrence!q{ a[n-1] + a[n-2] }( 0 , 1 ).take( count );
foreach( x ; range ) numbers.next = x;
numbers.close();
}
auto numbers = new Channel!int(10);
go!fibonacci( numbers , numbers.size );
numbers.array.assertEq([ 0 , 1 , 1 , 2 , 3 , 5 , 8 , 13 , 21 , 34 ]);
}
Но можно ещё сильнее упростить код, зная, что шаблон "go!" сам умеет перекладывать значения из диапазона в канал.
D
unittest
{
import std.range;
import jin.go;
static auto fibonacci( int limit )
{
return recurrence!q{ a[n-1] + a[n-2] }( 0 , 1 ).take( limit );
}
fibonacci( 10 ).array.assertEq([ 0 , 1 , 1 , 2 , 3 , 5 , 8 , 13 , 21 , 34 ]);
go!fibonacci( 10 ).array.assertEq([ 0 , 1 , 1 , 2 , 3 , 5 , 8 , 13 , 21 , 34 ]);
}
Таким образом, функции вовсе не обязательно знать ничего про каналы, чтобы иметь возможность запускать её параллельным потоком, а потом дожидаться от неё результата.
Select
Go
package main
import "fmt"
func fibonacci(c, quit chan int) {
x, y := 0, 1
for {
select {
case c <- x:
x, y = y, x+y
case <-quit:
fmt.Println("quit")
return
}
}
}
func main() {
c := make(chan int)
quit := make(chan int)
go func() {
for i := 0; i < 10; i++ {
fmt.Println(<-c)
}
quit <- 0
}()
fibonacci(c, quit)
}
Go имеет специальный лаконичный синтаксис для одновременной работы с несколькими каналами. D ничего такого, разумеется, не имеет. Однако, эквивалентный функционал реализуется не особо сложнее ручной реализацией цикла наблюдения.
D
unittest
{
import std.range;
import jin.go;
__gshared int[] log;
static auto fibonacci( Channel!int numbers , Channel!bool control )
{
auto range = recurrence!q{ a[n-1] + a[n-2] }( 0 , 1 );
while( !control.closed )
{
if( numbers.needed ) numbers.next = range.next;
yield;
}
log ~= -1;
numbers.close();
}
static void print( Channel!bool control , Channel!int numbers )
{
foreach( i ; 10.iota ) log ~= numbers.next;
control.close();
}
auto numbers = new Channel!int(1);
auto control = new Channel!bool(1);
go!print( control , numbers );
go!fibonacci( numbers , control );
while( !control.empty || !numbers.empty ) yield;
log.assertEq([ 0 , 1 , 1 , 2 , 3 , 5 , 8 , 13 , 21 , 34 , -1 ]);
}
Как видно, нам пришлось избавиться от замыкания, а в циклах добавить "yield", чтобы конкурирующие волокна тоже могли что-то сделать, пока текущее висит в ожидании.
Default Selection
Go
package main
import (
"fmt"
"time"
)
func main() {
tick := time.Tick(100 * time.Millisecond)
boom := time.After(500 * time.Millisecond)
for {
select {
case <-tick:
fmt.Println("tick.")
case <-boom:
fmt.Println("BOOM!")
return
default:
fmt.Println(" .")
time.Sleep(50 * time.Millisecond)
}
}
}
Специальный синтаксис в Go позволяет сделать что-то, если ни в одном из каналов не было активности. В D у вас, тем не менее, больше контроля над потоком исполнения.
D
unittest
{
import core.time;
import jin.go;
static auto after( Channel!bool channel , Duration dur )
{
sleep( dur );
if( !channel.closed ) channel.next = true;
}
static auto tick( Channel!bool channel , Duration dur )
{
while( !channel.closed ) after( channel , dur );
}
auto ticks = go!tick( 101.msecs );
auto booms = go!after( 501.msecs );
string log;
while( booms.clear )
{
while( !ticks.clear ) {
log ~= "tick";
ticks.popFront;
}
log ~= ".";
sleep( 51.msecs );
}
log ~= "BOOM!";
log.assertEq( "..tick..tick..tick..tick..BOOM!" );
}
Примечательной особенностью является то, что нам не потребовалось вручную создавать канал. Если функция первым аргументом принимает канал и мы его не передали, то он будет создан автоматически и возвращён как результат работы шаблона "go!", что весьма удобно. Функции "after" и "tick" слишком специфические, чтобы вносить их в общую библиотеку, но реализации у них весьма простые.
Mutex
В некоторых случаях без разделяемого изменяемого состояния всё же не обойтись и тут нам на помощь приходят блокировки.
Go
package main
import (
"fmt"
"sync"
"time"
)
// SafeCounter is safe to use concurrently.
type SafeCounter struct {
v map[string]int
mux sync.Mutex
}
// Inc increments the counter for the given key.
func (c *SafeCounter) Inc(key string) {
c.mux.Lock()
// Lock so only one goroutine at a time can access the map c.v.
c.v[key]++
c.mux.Unlock()
}
// Value returns the current value of the counter for the given key.
func (c *SafeCounter) Value(key string) int {
c.mux.Lock()
// Lock so only one goroutine at a time can access the map c.v.
defer c.mux.Unlock()
return c.v[key]
}
func main() {
c := SafeCounter{v: make(map[string]int)}
for i := 0; i < 1000; i++ {
go c.Inc("somekey")
}
time.Sleep(time.Second)
fmt.Println(c.Value("somekey"))
}
Да, именно так, реализация разделяемого изменяемого состояния в Go — это боль и страдания. Одно неверное движение при работе со мьютексами и вы внезапно обнаружите у себя фантомные конечности. Не говоря уж о том, что компилятор даже не намекнёт вам о том, где мьютексы необходимы. А вот компилятор D вас сильно наругает за попытку работы с незащищённым изменяемым состоянием из разных потоков. А самый простой способ защитить состояние при многопоточном выполнении — реализовать синхронизированный класс.
D
unittest
{
import core.atomic;
import core.time;
import std.range;
import std.typecons;
import jin.go;
synchronized class SafeCounter
{
private int[string] store;
void inc( string key )
{
++ store[key];
}
auto opIndex( string key )
{
return store[ key ];
}
void opIndexUnary( string op = "++" )( string key )
{
this.inc( key );
}
}
static counter = new shared SafeCounter;
static void working( int i )
{
++ counter["somekey"];
}
foreach( i ; 1000.iota ) {
go!working( i );
}
sleep( 1.seconds );
counter["somekey"].assertEq( 1000 );
}
Особенность синхронизированного класса в том, что для него автоматически создаётся мьютекс и при вызове любого публичного метода этот мьютекс захватывается, освобождаясь лишь при выходе из метода. При этом всё внутреннее состояние обязано быть приватным. Но тут есть одна неприятная особенность (а на самом деле очень опасный и досадный баг компилятора): шаблонные методы, такие как, например, "opIndexUnary!", не заворачиваются в захват мьютекса. Поэтому мы создали отдельный публичный метод "inc", который и вызываем из шаблонного метода. Внутренняя реализация получилась уже не столь красивой, зато внешний интерфейс получился как родной. Полученный "shared SafeCounter" мы уже можем спокойно передавать через канал и использовать напрямую из разных потоков.
Автор: vintage