В предыдущей статье я описала как создать простую лямбду на Golang, которая принимает на вход простой объект из двух полей и такой же простой объект отдает на выходе. Теперь немного усложним задачу, подсоединив к лямбде в качестве источника данных Kinesis, а результат обработки записей Kinesis мы будем перекидывать в CloudSearch. Никакой особенной логики в лямбде не будет для упрощения: просто примем запросы от Kinesis, залогируем их в CloudWatch, преобразуем и отправим в CloudSearch.
Событие Kinesis, которое мы ожидаем получить в функции выглядит следующим образом:
{
"Records": [
{
"awsRegion": "us-east-1",
"eventID": "<event-id>",
"eventName": "aws:kinesis:record",
"eventSource": "aws:kinesis",
"eventSourceARN": "arn:aws:kinesis:us-east-1:<xxxxxxxx>:stream/<stream>",
"eventVersion": "1.0",
"invokeIdentityArn": "arn:aws:iam::<xxxxxx>:role/<role>",
"kinesis": {
"approximateArrivalTimestamp": <timestamp>,
"data": <data>,
"partitionKey": "<partionkey>",
"sequenceNumber": "<sequenceNumber>",
"kinesisSchemaVersion": "1.0"
}
}
]
}
Здесь нас интересует поле data. Код функции Lambda, которая получает события из Kinesis и логирует данные поля data описан ниже: (Код взят здесь):
package main
import (
"context"
"fmt"
"github.com/aws/aws-lambda-go/events"
"github.com/aws/aws-lambda-go/lambda"
)
func handler(ctx context.Context, kinesisEvent events.KinesisEvent) error {
for _, record := range kinesisEvent.Records {
kinesisRecord := record.Kinesis
dataBytes := kinesisRecord.Data
dataText := string(dataBytes)
fmt.Printf("%s Data = %s n", record.EventName, dataText)
}
return nil
}
func main() {
lambda.Start(handler)
}
Теперь необходимо дополнить код, чтобы записывать измененные данные в CloudSearch
Здесь мы будем формировать полученные данные от Kinesis в наше представление для поискового домена (CloudSearch).
Данные от Kinesis приходят в закодированном base64 виде в поле data. После декодирования данные содержат следующие поля:
type KinesisEventData struct {
FilePath string `json:"filePath"`
Id int `json:"id"`
}
В CloudSearch мы отправляем данные следующего вида:
type CloudSearchDocument struct {
Directory string `json:"dir"`
FileName string `json:"name"`
FileExtension string `json:"ext"`
}
Поле id при этом мы сохраним в идетификаторе документа. С подготовкой данных для загрузки в CloudSearch можно подробно ознакомиться здесь. Если вкратце, то в CloudSearch мы оправляем json следующего вида:
[
{"type": "add",
"id": "12345",
"fields": {
"dir": "С:",
"name": "file.txt",
"ext": "txt"
}
}
]
где type — тип запросы, который принимает два значения: add или delete; id — идентификатор документа, а в нашем случае значение, сохраненное в объекте из кинесис в поле Id; fields — пары имя-значения, которые мы сохраняем в поисковом домене, в нашем случае — объект типа CloudSearchDocument.
Код ниже преобразует данные из коллекции Records объекта, пришедшего из Kinesis, в коллекцию данных, готовых для загрузки в CloudSearch:
var amasonDocumentsBatch []AmazonDocumentUploadRequest
//Preparing data
for _, record := range kinesisEvent.Records {
kinesisRecord := record.Kinesis
dataBytes := kinesisRecord.Data
fmt.Printf("%s Data = %s n", record.EventName, string(dataBytes))
//Deserialize data from kinesis to KinesisEventData
var eventData KinesisEventData
err := json.Unmarshal(dataBytes, &eventData)
if err != nil {
return failed(), err
}
//Convert data to CloudSearch format
document := ConvertToCloudSearchDocument(eventData)
request := CreateAmazonDocumentUploadRequest(eventData.Id, document)
amasonDocumentsBatch = append(amasonDocumentsBatch, request)
}
Следующий код подключается к поисковому домену для загрузки в него подготовленных ранее данных:
if len(amasonDocumentsBatch) > 0 {
fmt.Print("Connecting to cloudsearch...n")
svc := cloudsearchdomain.New(session.New(&aws.Config{
Region: aws.String(os.Getenv("SearchRegion")),
Endpoint: aws.String(os.Getenv("SearchEndpoint")),
MaxRetries: aws.Int(6),
}))
fmt.Print("Creating request...n")
batch, err := json.Marshal(amasonDocumentsBatch)
if err != nil {
return failed(), err
}
fmt.Printf("Search document = %s n", batch)
params := &cloudsearchdomain.UploadDocumentsInput{
ContentType: aws.String("application/json"),
Documents: strings.NewReader(string(batch)),
}
fmt.Print("Starting to upload...n")
req, resp := svc.UploadDocumentsRequest(params)
fmt.Print("Send request...n")
err = req.Send()
if err != nil {
return failed(), err
}
fmt.Println(resp)
}
Для того, чтобы собрать код, необходимо подгрузить билиотеки aws-sdk-go и aws-lambda-go:
go get -u github.com/aws/aws-lambda-go/cmd/build-lambda-zip
go get -d github.com/aws/aws-sdk-go/
Как собрать и задеплоить лямбду описано в предыдущей статье, здесь только необходимо добавить переменные среды через консоль Лямбда и подготовить новые тестовые данные:
os.Getenv("SearchRegion")
os.Getenv("SearchEndpoint")
Полный код доступен по ссылке.
Но вначале откроем консоль CloudSearch и создадим поисковый домен. Для домена я буду выбирать самый минимальный инстанс и количество репликаций = 1. Далее необходимо создать поля dir, name, ext. Для данных полей я выберу тип string, но некоторые из них могут иметь и другой тип, например, литеральное поле. Но все зависит от того, как вы будете манипулировать этими полями. Для более подробной информации лучше ознакомиться с документацией Amazon.
Создаем поисковый домен(кнопка Create a new Domain), заполняем имя и выбираем тип инстанса:
Создаем поля:
Домен создается около 10 минут, после того, как он станет активным, у вас будет Url поискового домена, который необходимо ввести в переменные среды в консоли Lambda, не забывайте перед Url указывать протокол как на изображении ниже, а также укажите регион, в котором находится поисковый домен:
Теперь не забудьте выдать права лямбде через консоль IAM для работы с Kinesis, CloudWatch и CloudSearch. Kinesis можно подключить через консоль Lambda: для этого необходимо выбрать его в блоке Add triggers и заполнить поля, указав существующий в данном регионе стрим, количество записей в батче и позицию в стриме, с которой будет начинаться считывание. Мы можем протестировать работу лямбды, не подключая ее к кинесис, для этого нужно создать тест и добавить в него json следующего вида:
{
"Records": [
{
"awsRegion": "us-east-1",
"eventID": "shardId-000000000001:1",
"eventName": "aws:kinesis:record",
"eventSource": "aws:kinesis",
"eventSourceARN": "arn:aws:kinesis:us-east-1:xxx",
"eventVersion": "1.0",
"invokeIdentityArn": "arn:aws:iam::xxx",
"kinesis": {
"approximateArrivalTimestamp": 1522222222.06,
"data": "eyJpZCI6IDEyMzQ1LCJmaWxlUGF0aCI6ICJDOlxcZmlsZS50eHQifQ==",
"partitionKey": "key",
"sequenceNumber": "1",
"kinesisSchemaVersion": "1.0"
}
},
{
"awsRegion": "us-east-1",
"eventID": "shardId-000000000001:1",
"eventName": "aws:kinesis:record",
"eventSource": "aws:kinesis",
"eventSourceARN": "arn:aws:kinesis:us-east-1:xxx",
"eventVersion": "1.0",
"invokeIdentityArn": "arn:aws:iam::xxx",
"kinesis": {
"approximateArrivalTimestamp": 1522222222.06,
"data": "eyJpZCI6IDEyMzQ2LCJmaWxlUGF0aCI6ICJDOlxcZm9sZGVyXFxmaWxlLnR4dCJ9",
"partitionKey": "key",
"sequenceNumber": "2",
"kinesisSchemaVersion": "1.0"
}
}
]
}
Для генерации других значений поля data можно воспользоваться ссылкой.
Результат работы также можно посмотреть в поисковом домене:
Дополнительные материалы:
Код поиска документов в поисковом домене на Go.
В следующей статье планируется рассмотрение CloudFormation скрипта для автоматического создания и подключения Lambda, Kinesis, CloudSearch.
Автор: GolovinskayaAlbina