Про auto.offset.reset в Spring Kafka

в 12:16, , рубрики: apache kafka, spring

Так исторически сложилось, что Apache Kafka использует для своих сообщений смещения (или же offset). В зависимости от нужд для настроек консьюмера можно выставить в параметр auto.offset.reset три значения: earliest, latest, none. По умолчанию, если данный параметр не задан, используется значение latest.

В данным выпуске я хочу заострить внимание на параметре none.

Earliest

Данные параметр используется, если вам необходимо получать сообщения с самого начала партиций топика. Как правило, данная опция имеет смысл, если вам нужно наполнить данными БД, передать все значения из одного место в другое и т.п.

Latest

Параметр по умолчанию. Используется тогда, когда мы хотим получать актуальные данные, т.е. пришедшие в последний момент времени. Здесь тоже ничего необычного.

None

Параметр, ради которого я и решил написать данную статью. Этот параметр не задает правил оффсета для новых консьюмеров, он кидает исключения в таких случаях.

Передо мной стояла задача: написать микросервис, который перекладывает данные из одного сервера кафки в другой (опытный читатель быть может подметит, что есть же KafkaConnect, Apache Kafka Mirror Maker и т.п., но не я выбирал требования реализации сего мероприятия через микросервис).

Из особенных требований, может быть то, что при первоначальном запуске необходимо задать оффсет, равный "текущая дата - 24 часа" и с этого момента начать консьюмить сообщения (данных очень много приходит за сутки).

Как всегда, в голову приходит решение более легкое и более быстрое - добавить флаг первичной загрузки и передавать его при старте приложения, что и было сделано (представьте, что мы не используем никакую БД, т.к. бизнес не выделил денег). Но а вдруг приложение законсьюмит данные и упадет через час, а все данные уже были взяты? Произойдет дублирование в продюссируемом топике. Что тогда делать?

Да, можно заморочиться и после запуска сразу убрать флаг первичной загрузки, все равно оффсет для группы уже будет существовать. Но это костыльное решение, от которого нужно избавляться.

Недолго думая, я решил переписать код (на самом деле, я подумал его переписать, когда столкнулся с вышеописанной ситуацией падения консьюмера и дублирования данных). Я всегда слышал, что у параметра auto.offset.reset есть 3 параметра, с двумя параметрами из них я имел опыт построения приложений, но третий - none - оставался для меня загадкой.

Мне казалось плевым делом заиспользовать none. Но в процессе написания кода, я бомбил сильнее и сильнее. Все дело в том, что нигде нет примеров с none, а я обыскал весь гугл (если точнее всю первую страницу запроса). В официальной документации сказано:

You can also select “none” if you would rather set the initial offset yourself and you are willing to handle out of range errors manually.

Но никаких примеров правильного использования нет. И во всех остальных источниках примерно то же самое, т.к. "это специфичный параметр и подходит для редких случаев и рассматривать его мы не будем".

Спустя день я наконец-то пришел к решению, с которым я намерен поделиться. Признаю, можно лучше написать код, чем я. Я далеко не идеальный программист, и если вы подскажете, как его можно лучше структурировать (ну или вообще другое решение), то буду чрезмерно благодарен.

Код

Для начала выставим параметр в application.yml (application.properties) равный none.

spring:
  kafka:
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      group-id: lalala-group-id
      auto-offset-reset:  none
      enable-auto-commit: false
      client-id: lalala-client-id

enable.auto.commit я тоже выключил

package org.example.service;

import jakarta.annotation.PostConstruct;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.stereotype.Service;

import java.time.Duration;
import java.time.Instant;
import java.util.*;

@Service
@RequiredArgsConstructor
@Log4j2
public class SettingService {
  private final ConsumerFactory<String, String> consumerFactory;
  @Value("${spring.kafka.topic.test}")
  String topic;
  
  @PostConstruct
  public void checkAndResetOffsetsIfNeeded() {
      Properties consumerProps = new Properties();
      // Передаем конфигурацию через ConsumerFactory
      consumerProps.putAll(consumerFactory.getConfigurationProperties());
      try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
          // Создаем временного консьюмера и берем партиции для нужного топика
          List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
          Set<TopicPartition> topicPartitions = new HashSet<>();
  
          for (PartitionInfo partitionInfo : partitionInfos) {
              topicPartitions.add(new TopicPartition(topic, partitionInfo.partition()));
          }

          // присваеваем консьюмеру партиции
          consumer.assign(topicPartitions);

          // смотрим нет ли уже для группы консьюмера закомиченных оффсетов в партициях
          Map<TopicPartition, OffsetAndMetadata> commitedOffsets = consumer.committed(topicPartitions);
          Instant resetTime = Instant.now().minus(Duration.ofHours(24));
  
          Map<TopicPartition, Long> latestOffsets = consumer.endOffsets(topicPartitions);
  
          for (TopicPartition topicPartition : topicPartitions) {
              // Берем закомиченные оффсеты из мапы
              OffsetAndMetadata commitedOffset = commitedOffsets.get(topicPartition);
              Long latestOffset = latestOffsets.get(topicPartition);

              // В следующем блоке смотрим, есть ли закомиченные оффсеты, если нет, то ставим оффсет, равный resetTime
              if (commitedOffset == null || commitedOffset.offset() == -1L) {
                  Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes = consumer
                          .offsetsForTimes(Collections.singletonMap(topicPartition, resetTime.toEpochMilli()));
                  OffsetAndTimestamp offsetAndTimestamp = offsetsForTimes.get(topicPartition);
  
                  long newOffset = offsetAndTimestamp != null ? offsetAndTimestamp.offset() : latestOffset;
                  log.info("Resetting offset for partition {}", topicPartition.partition());

                  // Переходим на новый оффсет и коммитим, т.к. у меня вырублен автокоммит
                  consumer.seek(topicPartition, newOffset);
                  consumer.commitSync();
                // Если есть закомиченный оффсет уже, то ничего не делаем
              } else {
                  log.info("Offset for partition {} is already commited at {}",
                          topicPartition.partition(), commitedOffset.offset());
              }
          }
  
          log.info("closing consumer setting");
      }
}

KafkaListener настроен стандартно:

@KafkaListener(
            groupId = "${spring.kafka.consumer.group-id}",
            topics = "${spring.kafka.topic.test}"
    )
public void listenTopic(ConsumerRecord<String, String> message, Acknowledgment acknowledgment) {
  try {
    // какие-нибудь операции
    acknowledgment.acknowledge();
  } catch (ProducerException e) {
    log.error(e.getMessage());
    acknowledgment.nack(Duration.ofSeconds(3));
  }
}

Приведенный выше код создаст перед запуском KafkaListener временного консьюмера, через который мы уже проведем настройки оффсетов. Метод для работы с временным консьюмером вызывается через аннотацию @PostConstruct,который сначала выполнит манипуляции с временным консьюмером, закроет его и потом запустится наш KafkaListener. Конфликтов точка нет.

Через интерфейс CommandLineRunner вызов метода не сработает должным образом, т.к. KafkaListener будет запускаться в одно время с временным консьюмером и будет пробрасываться экспепшн, связанный с оффсетами (точно не помню какой, поэкспериментируйте). Я пробовал еще отключать автоматический запуск, но он требует задания KafkaListener параметра id, и я не стал с этим экспериментировать, так еще больше кода получилось бы (и не знаю, можно ли было бы легко масштабировать количество консьюмеров).

Также пробовал через обработку возникающих ошибок делать манипуляции с оффсетами, но видимо я там чего-то недоглядел и у меня не получилось это реализовать (не исключаю, что такой подход вполне возможен).

Вот таким образом можно работать с none (мне кажется я первый, кто привел пример как работать с этим параметром). Если у вас уже есть примеры работы с данным параметром, то буду рад глянуть, оставляйте в комментах :-)

Всего хорошего, всем!

Автор: beer_is_my_love

Источник

* - обязательные к заполнению поля


https://ajax.googleapis.com/ajax/libs/jquery/3.4.1/jquery.min.js