Никак не доходили руки переписать go-meter. Увеличить производительность, получить более полный контроль над процессом и довести до приближения к wrk. В идеале хочется увидеть легко и удобно расширяемую альтернативу. Да, в wrk недавно появилась поддержка Lua скриптов, которые решают многие неудобства, но и там тоже есть неприятные нюансы, например, расширенную статистику собирать не получится, так как методы вывода статистики работают только на первом потоке, и к собранным данным на других потоках доступа нет, поэтому сводится опять к тому, что-бы разбираться в исходниках и делать под себя, но это не тривиальная задача. И так, готовим нагрузочный тест на Go, c плюшками. Кому интересно, прошу под кат.
Что есть и что нужно
С начала разберемся что нам нужно:
— отправка GET/POST/PUT/DELETE запросов
— перебор URL, и POST body
— контроль над открытыми соединениями
— контроль над потоками
— указание продолжительности тестирования
— ограничения по максимальному количеству запросов в секунду
— возможность исключить несколько первых секунд из статистики, чтобы избежать искажения в момент прогрева HTTP сервера
План
— пул соединений
— простые Request/Response
— статистика
— profit
Поехали
Пул соединений пишем на основе ограниченного канала. Выглядеть он будет как простой пул объектов, взяли объект из канала, поработали, положили обратно.
type Connection struct {
conn net.Conn
manager *ConnectionManager
}
type ConnectionManager struct {
conns chan *Connection
config *Config
}
func NewConnectionManager(config *Config) (result *ConnectionManager) {
result = &ConnectionManager{config: config, conns: make(chan *Connection, config.Connections)}
for i := 0; i < config.Connections; i++ {
connection := &Connection{manager: result}
if connection.Dial() != nil {
ConnectionErrors++
}
result.conns <- connection
}
return
}
func (this *ConnectionManager) Get() *Connection {
return <-this.conns
}
func (this *Connection) Dial() error {
if this.IsConnected() {
this.Disconnect()
}
conn, err := net.Dial("tcp4", this.manager.config.Url.Host)
if err == nil {
this.conn = conn
}
return err
}
func (this *Connection) Disconnect() {
this.conn.Close()
this.conn = nil
}
func (this *Connection) IsConnected() bool {
return this.conn != nil
}
func (this *Connection) Return() {
this.manager.conns <- this
}
Request/Response тут можно почитать исходники Go, посмотреть как реализовано там, и сделать упрощенную аналогию, главным отличием будет возможность получить объем трафика каждого запроса/ответа и сэкономить драгоценное время.
type Request struct {
Method string
URL *url.URL
Header map[string][]string
Body io.Reader
ContentLength int64
Host string
BufferSize int64
}
func (req *Request) Write(w io.Writer) error {
bw := &bytes.Buffer{}
fmt.Fprintf(bw, "%s %s HTTP/1.1rn", valueOrDefault(req.Method, "GET"), req.URL.RequestURI())
fmt.Fprintf(bw, "Host: %srn", req.Host)
userAgent := ""
if req.Header != nil {
if ua := req.Header["User-Agent"]; len(ua) > 0 {
userAgent = ua[0]
}
}
if userAgent != "" {
fmt.Fprintf(bw, "User-Agent: %srn", userAgent)
}
if req.Method == "POST" || req.Method == "PUT" {
fmt.Fprintf(bw, "Content-Length: %drn", req.ContentLength)
}
if req.Header != nil {
for key, values := range req.Header {
if key == "User-Agent" || key == "Content-Length" || key == "Host" {
continue
}
for _, value := range values {
fmt.Fprintf(bw, "%s: %srn", key, value)
}
}
}
io.WriteString(bw, "rn")
if req.Method == "POST" || req.Method == "PUT" {
bodyReader := bufio.NewReader(req.Body)
_, err := bodyReader.WriteTo(bw)
if err != nil {
return err
}
}
req.BufferSize = int64(bw.Len())
_, err := bw.WriteTo(w)
return err
}
type Response struct {
Status string
StatusCode int
Header map[string][]string
ContentLength int64
BufferSize int64
}
func ReadResponse(r *bufio.Reader) (*Response, error) {
tp := textproto.NewReader(r)
resp := &Response{}
line, err := tp.ReadLine()
if err != nil {
return nil, err
}
f := strings.SplitN(line, " ", 3)
resp.BufferSize += int64(len(f) + 2)
if len(f) < 2 {
return nil, errors.New("Response Header ERROR")
}
reasonPhrase := ""
if len(f) > 2 {
reasonPhrase = f[2]
}
resp.Status = f[1] + " " + reasonPhrase
resp.StatusCode, err = strconv.Atoi(f[1])
if err != nil {
return nil, errors.New("malformed HTTP status code")
}
resp.Header = make(map[string][]string)
for {
line, err := tp.ReadLine()
if err != nil {
return nil, errors.New("Response Header ERROR")
}
resp.BufferSize += int64(len(line) + 2)
if len(line) == 0 {
break
} else {
f := strings.SplitN(line, ":", 2)
resp.Header[f[0]] = append(resp.Header[strings.TrimSpace(f[0])], strings.TrimSpace(f[1]))
}
}
if cl := resp.Header["Content-Length"]; len(cl) > 0 {
i, err := strconv.ParseInt(cl[0], 10, 0)
if err == nil {
resp.ContentLength = i
}
}
buff := make([]byte, resp.ContentLength)
r.Read(buff)
resp.BufferSize += int64(resp.ContentLength)
return resp, nil
}
Для того что бы наши потоки выключились, когда время тестирования закончится, сделаем канал для завершения работы потоков и канал, по которому каждый поток будет сообщать, что он корректно завершил свою работу
WorkerQuit := make(chan bool, *_threads)
WorkerQuited := make(chan bool, *_threads)
засечем время, и также будем ждать Ctr+C(SIGTERM), чтобы наше приложение могло завершить тестирование в любой момент
//Start Ctr+C listen
signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, os.Interrupt, syscall.SIGTERM)
//Wait timers or SIGTERM
select {
case <-time.After(config.Duration):
case <-signalChan:
}
for i := 0; i < config.Threads; i++ {
config.WorkerQuit <- true
}
//Wait for threads complete
for i := 0; i < config.Threads; i++ {
<-config.WorkerQuited
}
Теперь посмотрим на сам воркер: для ограничения по количеству запросов в секунду возьмем для каждого его долю от общего количества, 4 раза в секунду будем увеличивать счетчик и ждать либо освободившиеся соединение либо завершение работы
func NewThread(config *Config) {
timerAllow := time.NewTicker(time.Duration(250) * time.Millisecond)
allow := int32(config.MRQ / 4 / config.Threads)
if config.MRQ == -1 {
allow = 2147483647
} else if allow <= 0 {
allow = 1
}
var connectionErrors int32 = 0
currentAllow := allow
for {
select {
//По таймеру выставляем счетчик на количество разрешенных запросов
case <-timerAllow.C:
currentAllow = allow
//Получаем свободное соединение
case connection := <-config.ConnectionManager.conns:
currentAllow--
//Если разрешенные запросы кончились - возвращаем соединение в пул
if currentAllow < 0 {
connection.Return()
} else {
//Формируем запрос
req := getRequest(config.Method, config.Url, config.Source.GetNext())
//Если нужно переподключаться на каждом запросе
if config.Reconnect && connection.IsConnected() {
connection.Disconnect()
}
//Если соединение разорвано, то пробуем его восстановить
if !connection.IsConnected() {
if connection.Dial() != nil {
connectionErrors++
}
}
//Отправляем запрос если есть соединение, иначе возвращаем соединение
if connection.IsConnected() {
go writeSocket(connection, req, config.RequestStats)
} else {
connection.Return()
}
}
//Ждем завершения
case <-config.WorkerQuit:
//Записываем ошибки по соединениям
atomic.AddInt32(&ConnectionErrors, connectionErrors)
//Подтверждаем завершение
config.WorkerQuited <- true
return
}
}
}
Как только соединение освободится, формируем следующий запрос и запускаем асинхронно отправку его, так по кругу пока не кончится время. После того как запрос отправлен, а ответ прочитан, соединение возвращаем в пул, и поток снова подхватит его.
func writeSocket(connection *Connection, req *http.Request, read chan *RequestStats) {
result := &RequestStats{}
//По окончанию обязательно отправляем статус и отдаем соединение в пул
defer func() {
connection.Return()
read <- result
}()
now := time.Now()
conn := connection.conn
bw := bufio.NewWriter(conn)
//Пишем запрос
err := req.Write(bw)
if err != nil {
result.WriteError = err
return
}
err = bw.Flush()
if err != nil {
result.WriteError = err
return
}
//Ждем ответа
res, err := http.ReadResponse(bufio.NewReader(conn))
if err != nil {
result.ReadError = err
return
}
//Собираем нужную информацию
result.Duration = time.Now().Sub(now)
result.NetOut = req.BufferSize
result.NetIn = res.BufferSize
result.ResponseCode = res.StatusCode
req.Body = nil
}
Осталось дело за малым, собрать статистику из объектов RequestStats и оформить ее
//Вся статистика
type StatsSource struct {
Readed int64
Writed int64
Requests int
Skiped int
Min time.Duration
Max time.Duration
Sum int64
Codes map[int]int
DurationPercent map[time.Duration]int
ReadErrors int
WriteErrors int
Work time.Duration
}
//Статистика для посекундных отчетов
type StatsSourcePerSecond struct {
Readed int64
Writed int64
Requests int
Skiped int
Sum int64
}
//Агрегатор статистики
func StartStatsAggregator(config *Config) {
allowStore := true
allowStoreTime := time.After(config.ExcludeSeconds)
if config.ExcludeSeconds.Seconds() > 0 {
allowStore = false
}
verboseTimer := time.NewTicker(time.Duration(1) * time.Second)
if config.Verbose {
fmt.Printf("%s %s %s %s %s %sn",
newSpancesFormatRightf("Second", 10, "%s"),
newSpancesFormatRightf("Total", 10, "%s"),
newSpancesFormatRightf("Req/sec", 10, "%s"),
newSpancesFormatRightf("Avg/sec", 10, "%s"),
newSpancesFormatRightf("In/sec", 10, "%s"),
newSpancesFormatRightf("Out/sec", 10, "%s"),
)
} else {
verboseTimer.Stop()
}
source = StatsSource{
Codes: make(map[int]int),
DurationPercent: make(map[time.Duration]int),
}
perSecond := StatsSourcePerSecond{}
start := time.Now()
for {
select {
//Таймер для посекундных отчетов
case <-verboseTimer.C:
if perSecond.Requests-perSecond.Skiped > 0 && config.Verbose {
//Считаем среднее время ответа
avgMilliseconds := perSecond.Sum / int64(perSecond.Requests-perSecond.Skiped)
avg := time.Duration(avgMilliseconds) * time.Millisecond
//Пишем статистику
fmt.Printf("%s %s %s %s %s %sn",
newSpancesFormatRightf(roundToSecondDuration(time.Now().Sub(start)), 10, "%v"),
newSpancesFormatRightf(source.Requests, 10, "%d"),
newSpancesFormatRightf(perSecond.Requests, 10, "%d"),
newSpancesFormatRightf(avg, 10, "%v"),
newSpancesFormatRightf(Bites(perSecond.Readed), 10, "%s"),
newSpancesFormatRightf(Bites(perSecond.Writed), 10, "%s"),
)
}
//Сбрасываем данные
perSecond = StatsSourcePerSecond{}
//Таймер для разрешения сбора статистики нужен для пропуска на старте
case <-allowStoreTime:
allowStore = true
//Получаем ответ от сервера
case res := <-config.RequestStats:
//Если были ошибки - просто их записываем, остальная информация нам не интересна
if res.ReadError != nil {
source.ReadErrors++
continue
} else if res.WriteError != nil {
source.WriteErrors++
continue
}
//Инкрементируем счетчики
source.Requests++
perSecond.Requests++
perSecond.Readed += res.NetIn
perSecond.Writed += res.NetOut
source.Readed += res.NetIn
source.Writed += res.NetOut
//Собираем статистику по запросам в разрезе HTTP кодов
source.Codes[res.ResponseCode]++
if !allowStore {
perSecond.Skiped++
source.Skiped++
continue
}
//Для среднего времени ответа
sum := int64(res.Duration.Seconds() * 1000)
source.Sum += sum
perSecond.Sum += sum
//Максимальное и минимальное время ответа
if source.Min > res.Duration {
source.Min = roundDuration(res.Duration)
}
if source.Max < res.Duration {
source.Max = roundDuration(res.Duration)
}
//Количество запросов в разрезе времени ответа округленная до 10 миллисекунд
duration := time.Duration(res.Duration.Nanoseconds()/10000000) * time.Millisecond * 10
source.DurationPercent[duration]++
//Завершение сбора статистики
case <-config.StatsQuit:
//Записываем общее время теста
source.Work = time.Duration(time.Now().Sub(start).Seconds()*1000) * time.Millisecond
if config.Verbose {
s := ""
for {
if len(s) >= 61 {
break
}
s += "-"
}
fmt.Println(s)
}
//Подтверждаем завершение
config.StatsQuit <- true
return
}
}
}
Подводим итоги
Как парсить аргументы запуска и форматировать вывод статистики я опущу, так как это не интересно. А теперь давайте проверим, что у нас получилось. Для пробы натравим wrk на Node.js кластер
% ./wrk -c 21 -t 7 -d 30s -L http://localhost:3001/index.html
Running 30s test @ http://localhost:3001/index.html
7 threads and 21 connections
Thread Stats Avg Stdev Max +/- Stdev
Latency 1.09ms 6.55ms 152.07ms 99.63%
Req/Sec 5.20k 3.08k 14.33k 58.75%
Latency Distribution
50% 490.00us
75% 0.89ms
90% 1.83ms
99% 5.04ms
1031636 requests in 30.00s, 153.48MB read
Requests/sec: 34388.25
Transfer/sec: 5.12MB
и тоже самое на go с GOMAXPROCS=1
% ./go-meter -t 7 -c 21 -d 30s -u http://localhost:3001/index.html
Running test threads: 7, connections: 21 in 30s GET http://localhost:3001/index.html
Stats: Min Avg Max
Latency 0 0 83ms
843183 requests in 30s, net: in 103MB, out 62MB
HTTP Codes:
200 100.00%
Latency:
0 99.99%
10ms - 80ms 0.01%
Requests: 28106.10/sec
Net In: 27MBit/sec
Net Out: 17MBit/sec
Transfer: 5.5MB/sec
Получаем 28106 против 34388 запросов в секунду — это примерно на 20% меньше, по сравнению с чистым Cи + event loop + nio. Довольно неплохо, при изменении GOMAXPROCS разницы практически нет, так как большую часть процессорного времени отбирает Node.js.
Минусы:
— потеря 20% производительности, можно попробовать упростить Request/Response, может дать немного производительности
— еще нет поддержи HTTPS
— еще нельзя указать пользовательские HTTP заголовки и timeout
Все исходники тут — Github
Как пользоваться
% go get github.com/a696385/go-meter
% $GOPATH/bin/go-meter -h
Спасибо за внимание!
Автор: a696385