Иванов Максим
Младший Java программист
Рецепт легкого перекуса для «Telegram - монстра Франкенштейна»
Всем привет, это вторая часть создания телеграм-бота (ссылка на первую часть), в ней мы реализуем Kafka Consumer, который будет ловить любые колебания в силе и выдавать нам всю информацию о действиях пользователя.
Ингредиенты:
-
Создание Spring Boot проект, проще всего это сделать через Spring Initializr. (в качестве системы сборки будет использоваться Gradle)
-
PostgreSQL (для комфортной работы я использую DBeaver)
Если возникнут сложности с воссозданием туториала
Начинаем с яичных желтков:
Первостепенно нужно настроить build.gradle со всеми зависимостями
build.gradle
plugins {
id 'org.springframework.boot' version '2.5.6'
id 'io.spring.dependency-management' version '1.0.11.RELEASE'
id 'java'
}
group = 'com.demo.kafka'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '14'
repositories {
mavenCentral()
}
configurations.all {
exclude module: 'slf4j-log4j12'
}
dependencies {
implementation 'org.springframework.boot:spring-boot-starter-web:2.5.6'
implementation 'org.springframework.kafka:spring-kafka:2.7.6'
implementation 'org.projectlombok:lombok:1.18.22'
implementation 'org.springframework.boot:spring-boot-starter-jdbc:2.5.6'
implementation 'org.springframework.data:spring-data-commons:2.6.0'
implementation 'org.postgresql:postgresql:42.3.1'
implementation 'com.h2database:h2:1.4.200'
testImplementation 'org.springframework.boot:spring-boot-starter-test:2.5.6'
testImplementation 'org.springframework.kafka:spring-kafka-test:2.7.6'
compileOnly 'org.projectlombok:lombok:1.18.22'
annotationProcessor 'org.projectlombok:lombok:1.18.22'
}
test {
useJUnitPlatform()
}
Далее для работы Kafka опишем application.yml, в котором находятся настройки нашего kafka consumer
application.yml
server:
port: 9002
spring:
kafka:
consumer:
bootstrap-servers: localhost:9092
group-id: group_id
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
Ну и в конце - настройки application.properties
application.properties
# HTTP port for incoming requests
server.port=8080
# Log db
app.db.demo.url=jdbc:postgresql://localhost:5432/change-me
app.db.demo.driver=org.postgresql.Driver
app.db.demo.user=change-me
app.db.demo.password=change-me
app.db.demo.pool-size=10
# kafka-metadata-consumer
app.metadata.tag=logs
app.metadata.bootstrapServers=athena:9092
app.metadata.groupId=group_id
app.metadata.topic=users
app.metadata.autoOffsetReset=earliest
app.metadata.enableAutoCommit=false
app.metadata.maxPollRecords=10
app.metadata.concurrency=4
app.metadata.path=files
# logging
logging.level.root=INFO
logging.level.org.springframework.web=DEBUG
logging.level.ru.centerinform.webhook=TRACE
logging.file.name=change-me
Хорошо, говоря о структуре проекта, то советую придерживаться такого вида:
Пакеты:
-
config - описание бинов и конфигурации проекта
-
controller - обрабатывает запрос пользователя
-
model - хранит модель данных, а так же описывает маппер для этой модели
-
repository - логика работа с БД
-
service - основная бизнес логика проекта
Намазываем на тост, посыпаем сыром и кидаем в духовку:
Так как настройка бинов, можно сказать - шаблонный код, за основу она идентична с первой частью, так что, не удивляйтесь сходству.
Настройки бинов:
- Первым делом в пакете config прописываем конфигурацию бинов нашего приложения, тут настройки инициализации JdbcTemplate, так же, обратите внимание, что внутри нашего класса DbConfig, есть класс SpringDataJdbcProperties, который описывает настройки SpringDataJdbc
DbConfig
@Configuration
public class DbConfig extends DefaultDbConfig {
@Bean
@Qualifier("demo")
@ConfigurationProperties(prefix = "app.db.demo")
SpringDataJdbcProperties demoJdbcProperties() {
return new SpringDataJdbcProperties();
}
@Bean
@Qualifier("demo")
public DataSource demoDataSource(@Qualifier("demo") SpringDataJdbcProperties properties) {
return hikariDataSource("db", properties);
}
@Bean
@Qualifier("demo")
JdbcTemplate demoJdbcTemplate(@Qualifier("demo") DataSource dataSource) {
return new JdbcTemplate(dataSource);
}
@Data
@NoArgsConstructor
public static class SpringDataJdbcProperties {
// constants
private static final String H2_DATABASE_DRIVER = "org.h2.Driver";
/**
* JDBC URL property
*/
String url;
/**
* JDBC driver class name property
*/
String driver;
/**
* JDBC username property
*/
String user;
/**
* JDBC password property
*/
String password;
/**
* Hikari / Vertica maxPoolSize property
*/
String poolSize;
/**
* Minimum pool size
*/
int minPoolSize = 4;
/**
* Maximum pool size
*/
int maxPoolSize = 10;
/**
* This property controls the maximum amount of time (in milliseconds) that a connection is allowed to
* sit idle in the pool. A value of 0 means that idle connections are never removed from the pool.
*/
long idleTimeout;
/**
* This property controls the maximum lifetime of a connection in the pool. When a connection
* reaches this timeout, even if recently used, it will be retired from the pool.
* An in-use connection will never be retired, only when it is idle will it be removed
*/
long maxLifetime;
/**
* Bulk insert size
*/
Integer bulkSize;
/**
* All-args constructor for {@link SpringDataJdbcProperties#toString()} (logging)
*
* @param url JDBC driver class name property
* @param driver JDBC driver class name property
* @param user JDBC username property
* @param password JDBC password property
* @param poolSize Hikari / Vertica maxPoolSize property
* @param bulkSize bulk insert size
*/
public SpringDataJdbcProperties(
String url, String driver, String user, String password, String poolSize, Integer bulkSize) {
this.url = url;
this.driver = driver;
this.user = user;
this.password = password;
this.poolSize = poolSize;
this.bulkSize = bulkSize;
}
/**
* Возвращает истину, если экземпляр описывает in-memory H2 database
*
* @return истина, если экземпляр описывает in-memory H2 database
*/
public boolean isH2Database() {
return driver.equals(H2_DATABASE_DRIVER);
}
/**
* Возвращает строковое представление экземпляра объекта в формате JSON
*
* @return строковое представление экземпляра объекта в формате JSON
*/
@Override
public String toString() {
var props = new SpringDataJdbcProperties(
url, driver, user, ((password == null) || password.isEmpty()) ? "" : "*****", poolSize, bulkSize);
return Json.encode(props);
}
}
}
- Создадим базовый класс для уменьшения дублирования кода инициализации бинов
DefaultDbConfig
@Slf4j
class DefaultDbConfig {
protected DataSource hikariDataSource(String tag, DbConfig.SpringDataJdbcProperties properties) {
log.info("[{}] настройки БД: [{}]", tag, properties.toString());
HikariDataSource ds = new HikariDataSource();
ds.setJdbcUrl(properties.getUrl());
ds.setDriverClassName(properties.getDriver());
ds.setUsername(properties.getUser());
ds.setPassword(properties.getPassword());
ds.setMaximumPoolSize(Integer.parseInt(properties.getPoolSize()));
return ds;
}
}
- После, напишем утилитный класс для логирования
Json
public class Json {
static final ObjectMapper mapper = new ObjectMapper();
/**
* Encode instance as JSON
*
* @param obj instance
* @return JSON
*/
public static String encode(Object obj) {
try {
return mapper.writeValueAsString(obj);
} catch (JsonProcessingException e) {
return obj.toString();
}
}
public static <T> T decode(String json, Class<T> clazz) throws JsonProcessingException {
return mapper.readValue(json, clazz);
}
}
Создание модели данных
- Создаем модель для обработки логов ConsumerLog, а так же маппер ConsumerMapper, который понадобиться для работы с БД и описывания полей в таблице
ConsumerLog
@Data
@RequiredArgsConstructor
public class ConsumerLog {
@JsonProperty("id")
@JsonIgnoreProperties(ignoreUnknown = true)
private final int id;
@JsonProperty("message")
@JsonIgnoreProperties(ignoreUnknown = true)
private final String msg;
@JsonProperty("topic")
@JsonIgnoreProperties(ignoreUnknown = true)
private final String topic;
@JsonProperty("logDate")
@JsonIgnoreProperties(ignoreUnknown = true)
private final LocalDate logDate;
@Override
public String toString() {
return "Was added log [id=" + id + ", topic=" + topic + "log=" + msg + ", date=" + logDate.toString() + "]";
}
}
ConsumerMapper
@Slf4j
public class ConsumerMapper implements RowMapper<ConsumerLog> {
@Override
public ConsumerLog mapRow(ResultSet rs, int rowNum) throws SQLException {
var date = rs.getDate("date_time");
var entity = new ConsumerLog(
rs.getInt("id"),
rs.getString("message"),
rs.getString("topic"),
date == null ? null : date.toLocalDate()
);
log.trace("ConsumerMapper(): entity = [{}]", entity);
return entity;
}
}
После создания модели данных и ее маппера, приступаем к репозиториям
- Создадим интерфейс, который описывает методы, для работы с записями в БД
IConsumerLogRepository
public interface IConsumerLogRepository {
/**
* Возвращает список записей
*
* @return список всех записей
* @throws DbException в случае ошибки БД
*/
List<ConsumerLog> getLogsList();
/**
* Вставка новой записи
*
* @param entity новая запись
* @throws DbException в случае ошибки БД
*/
void insert(ConsumerLog entity);
}
- Теперь напишем класс, который реализует методы интерфейса
ConsumerLogRepository
@Repository
@Slf4j
public class ConsumerLogRepository implements IConsumerLogRepository {
private static final String SQL_SELECT_LIST = "SELECT id, message, date_time, topic FROM log";
private static final String SQL_INSERT = "INSERT INTO log (message, date_time, topic) VALUES (?, ?, ?)";
protected final static ConsumerMapper CONSUMER_LOG_MAPPER = new ConsumerMapper();
protected final JdbcTemplate template;
public ConsumerLogRepository(@Qualifier("demo") JdbcTemplate template) {
this.template = template;
}
/**
* Возвращает записи элемента из таблицы логов подписчика
*/
@Override
public List<ConsumerLog> getLogsList() throws DbException {
return template.query(SQL_SELECT_LIST, CONSUMER_LOG_MAPPER);
}
/**
* Заполняет записи элементами из приходящего топика логов
*/
@Override
public void insert(ConsumerLog entity) throws DbException {
var result = template.update(SQL_INSERT, entity.getMsg(), entity.getLogDate(), entity.getTopic());
if (result != 1) log.trace("ConsumerLogRepository.insert() with {} rows inserted", entity);
log.trace("insert({}) result={}", entity, result);
}
}
Ну и главный элемент бизнес логики приложения - kafka consumer
- Это класс подписчик, он получает сообщения из Kafka и обрабатывает их
Consumer
@Slf4j
@Service
@AllArgsConstructor
public class Consumer {
private static final String TOPIC_NAME = "users";
protected final IConsumerLogRepository consumerRepo;
/**
* Метод обработки сообщений от producer,
* который "отлавливает" эти самые сообщения с помощью аннотации KafkaListener и принимает их в виде параметра.
*
* @param message сообщение от producer, которое прилетает в кафка
*/
@KafkaListener(topics = TOPIC_NAME, groupId = "group_id")
public void consumeWriting(String message) {
var consumerLog = new ConsumerLog(0, message, TOPIC_NAME, LocalDate.now());
consumerRepo.insert(consumerLog);
log.info("#### Consumed received message [{}]", message);
}
/**
* Получение списка логов из БД
*/
public List<ConsumerLog> consumeLog() {
var list = consumerRepo.getLogsList();
list.forEach(msg -> log.info("#### Consumer list log [{}]", msg.toString()));
return list;
}
}
Далее, как и в прошлой статье, мы напишем контроллер, для доступа к сервису из вне
- Создаем простенький контроллер, для получения списка логов из БД
TestController
@Slf4j
@RestController
@AllArgsConstructor
@RequestMapping(value = "/kafka")
public class TestController {
private final Consumer consumerService;
/**
* Возвращает записи элемента из таблицы логов подписчика
*
*/
@GetMapping(value = "/log_list")
public String getLogList() {
log.trace("[GET] getLogList()");
return consumerService.consumeLog().toString();
}
}
В заключении, класс, который собственно и запускает все наше приложение
BotLogsApplication
@SpringBootApplication
public class BotLogsApplication {
public static void main(String[] args) {
SpringApplication.run(BotLogsApplication.class, args);
}
}
Перед тем, как вытаскивать из духовки наше блюдо, нужно подготовиться:
- Сначала проверим, запущена ли Kafka
- После, запускаем Conductor и видим, что у нас работает топик users
- Далее запускаем DBeaver и благодаря первой статье, у нас уже заранее создано 2 таблицы (log и user_table), схема создания таблиц из первой части
Отлично, вынимаем наши тосты из духовки:
- Запускаем проект, проверяем, что все настроено и корректно работает
Логи запущенного приложения
. ____ _ __ _ _
/ / ' __ _ () __ __ _
( ( )__ | '_ | '| | ' / ` |
/ )| |)| | | | | || (| | ) ) ) )
' || .__|| ||| |_, | / / / /
=========||==============|/=////
:: Spring Boot :: (v2.5.6)
2022-01-19 22:14:49.283 INFO 41808 --- [ main] c.l.kafka.consumer.BotLogsApplication : No active profile set, falling back to default profiles: default
2022-01-19 22:14:49.903 INFO 41808 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat initialized with port(s): 9002 (http)
2022-01-19 22:14:49.910 INFO 41808 --- [ main] o.apache.catalina.core.StandardService : Starting service [Tomcat]
2022-01-19 22:14:49.910 INFO 41808 --- [ main] org.apache.catalina.core.StandardEngine : Starting Servlet engine: [Apache Tomcat/9.0.54]
2022-01-19 22:14:49.974 INFO 41808 --- [ main] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring embedded WebApplicationContext
2022-01-19 22:14:49.974 INFO 41808 --- [ main] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 667 ms
2022-01-19 22:14:50.048 INFO 41808 --- [ main] c.l.k.consumer.config.DefaultDbConfig : [db] настройки БД: [{"url":"jdbc:postgresql://localhost:5432/change-me","driver":"org.postgresql.Driver","user":"change-me","password":"*","poolSize":"10","minPoolSize":4,"maxPoolSize":10,"idleTimeout":0,"maxLifetime":0,"bulkSize":null,"h2Database":false}]
2022-01-19 22:14:50.242 DEBUG 41808 --- [ main] .m.m.a.ExceptionHandlerExceptionResolver : ControllerAdvice beans: 0 @ExceptionHandler, 1 ResponseBodyAdvice2022-01-19 22:14:50.175 DEBUG 41808 --- [ main] s.w.s.m.m.a.RequestMappingHandlerAdapter : ControllerAdvice beans: 0 @ModelAttribute, 0 @InitBinder, 1 RequestBodyAdvice, 1 ResponseBodyAdvice
2022-01-19 22:14:50.214 DEBUG 41808 --- [ main] s.w.s.m.m.a.RequestMappingHandlerMapping : 3 mappings in 'requestMappingHandlerMapping'
2022-01-19 22:14:50.236 DEBUG 41808 --- [ main] o.s.w.s.handler.SimpleUrlHandlerMapping : Patterns [/webjars/, /] in 'resourceHandlerMapping'
2022-01-19 22:14:50.242 DEBUG 41808 --- [ main] .m.m.a.ExceptionHandlerExceptionResolver : ControllerAdvice beans: 0 @ExceptionHandler, 1 ResponseBodyAdvice
2022-01-19 22:14:50.367 INFO 41808 --- [ main] o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [localhost:9092]
check.crcs = true
client.dns.lookup = use_all_dns_ips
client.id = consumer-group_id-1
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = group_id
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
internal.throw.on.fetch.stable.offset.unsupported = false
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
session.timeout.ms = 10000
socket.connection.setup.timeout.max.ms = 127000
socket.connection.setup.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
2022-01-19 22:14:50.406 INFO 41808 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.7.1
2022-01-19 22:14:50.407 INFO 41808 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 61dbce85d0d41457
2022-01-19 22:14:50.407 INFO 41808 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1642619690406
2022-01-19 22:14:50.408 INFO 41808 --- [ main] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-group_id-1, groupId=group_id] Subscribed to topic(s): users
2022-01-19 22:14:50.422 INFO 41808 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 9002 (http) with context path ''
2022-01-19 22:14:50.429 INFO 41808 --- [ main] c.l.kafka.consumer.BotLogsApplication : Started BotLogsApplication in 1.413 seconds (JVM running for 1.876)
2022-01-19 22:14:50.551 INFO 41808 --- [ntainer#0-0-C-1] org.apache.kafka.clients.Metadata : [Consumer clientId=consumer-group_id-1, groupId=group_id] Cluster ID: O9iXkXIMQpKE3DgrEQtJ5w
2022-01-19 22:14:50.552 INFO 41808 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-group_id-1, groupId=group_id] Discovered group coordinator omen:9092 (id: 2147483647 rack: null)
2022-01-19 22:14:50.553 INFO 41808 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-group_id-1, groupId=group_id] (Re-)joining group
2022-01-19 22:14:50.560 INFO 41808 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-group_id-1, groupId=group_id] (Re-)joining group
2022-01-19 22:14:50.562 INFO 41808 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-group_id-1, groupId=group_id] Successfully joined group with generation Generation{generationId=17, memberId='consumer-group_id-1-cb88956f-09ef-4c33-a241-2941be87ff1b', protocol='range'}
2022-01-19 22:14:50.563 INFO 41808 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-group_id-1, groupId=group_id] Finished assignment for group at generation 17: {consumer-group_id-1-cb88956f-09ef-4c33-a241-2941be87ff1b=Assignment(partitions=[users-0])}
2022-01-19 22:14:50.632 INFO 41808 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-group_id-1, groupId=group_id] Successfully synced group in generation Generation{generationId=17, memberId='consumer-group_id-1-cb88956f-09ef-4c33-a241-2941be87ff1b', protocol='range'}
2022-01-19 22:14:50.633 INFO 41808 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-group_id-1, groupId=group_id] Notifying assignor about the new Assignment(partitions=[users-0])
2022-01-19 22:14:50.637 INFO 41808 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-group_id-1, groupId=group_id] Adding newly assigned partitions: users-0
2022-01-19 22:14:50.651 INFO 41808 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-group_id-1, groupId=group_id] Setting offset for partition users-0 to the committed offset FetchPosition{offset=4, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[omen:9092 (id: 0 rack: null)], epoch=0}}
2022-01-19 22:14:50.652 INFO 41808 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : group_id: partitions assigned: [users-0]
- Открываем телеграм и пробуем на вкус наши закуски
-
Пишем - /start, начинаем тест и видим, что бот работает !
- Давайте посмотрим, что же нам написал Spring в логах, отловил ли наш consumer данные из Kafka и сделал ли записи в БД ?
Логи нашего consumer, ошибок не наблюдается
. ____ _ __ _ _
/ / ' __ _ () __ __ _
( ( )__ | '_ | '| | ' / ` |
/ )| |)| | | | | || (| | ) ) ) )
' || .__|| ||| |_, | / / / /
=========||==============|/=////
:: Spring Boot :: (v2.5.6)
2022-01-19 22:21:26.142 INFO 42281 --- [ main] c.l.kafka.consumer.BotLogsApplication : No active profile set, falling back to default profiles: default
2022-01-19 22:21:27.195 INFO 42281 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat initialized with port(s): 9002 (http)
2022-01-19 22:21:27.201 INFO 42281 --- [ main] o.apache.catalina.core.StandardService : Starting service [Tomcat]
2022-01-19 22:21:27.201 INFO 42281 --- [ main] org.apache.catalina.core.StandardEngine : Starting Servlet engine: [Apache Tomcat/9.0.54]
2022-01-19 22:21:27.245 INFO 42281 --- [ main] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring embedded WebApplicationContext
2022-01-19 22:21:27.246 INFO 42281 --- [ main] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 1030 ms
2022-01-19 22:21:27.329 INFO 42281 --- [ main] c.l.k.consumer.config.DefaultDbConfig : [db] настройки БД: [{"url":"jdbc:postgresql://localhost:5432/postgres","driver":"org.postgresql.Driver","user":"postgres","password":"*","poolSize":"10","minPoolSize":4,"maxPoolSize":10,"idleTimeout":0,"maxLifetime":0,"bulkSize":null,"h2Database":false}]
2022-01-19 22:21:27.561 DEBUG 42281 --- [ main] .m.m.a.ExceptionHandlerExceptionResolver : ControllerAdvice beans: 0 @ExceptionHandler, 1 ResponseBodyAdvice2022-01-19 22:21:27.490 DEBUG 42281 --- [ main] s.w.s.m.m.a.RequestMappingHandlerAdapter : ControllerAdvice beans: 0 @ModelAttribute, 0 @InitBinder, 1 RequestBodyAdvice, 1 ResponseBodyAdvice
2022-01-19 22:21:27.524 DEBUG 42281 --- [ main] s.w.s.m.m.a.RequestMappingHandlerMapping : 3 mappings in 'requestMappingHandlerMapping'
2022-01-19 22:21:27.551 DEBUG 42281 --- [ main] o.s.w.s.handler.SimpleUrlHandlerMapping : Patterns [/webjars/, /] in 'resourceHandlerMapping'
2022-01-19 22:21:27.561 DEBUG 42281 --- [ main] .m.m.a.ExceptionHandlerExceptionResolver : ControllerAdvice beans: 0 @ExceptionHandler, 1 ResponseBodyAdvice
2022-01-19 22:21:27.726 INFO 42281 --- [ main] o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [localhost:9092]
check.crcs = true
client.dns.lookup = use_all_dns_ips
client.id = consumer-group_id-1
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = group_id
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
internal.throw.on.fetch.stable.offset.unsupported = false
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
session.timeout.ms = 10000
socket.connection.setup.timeout.max.ms = 127000
socket.connection.setup.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
2022-01-19 22:21:27.772 INFO 42281 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.7.1
2022-01-19 22:21:27.772 INFO 42281 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 61dbce85d0d41457
2022-01-19 22:21:27.772 INFO 42281 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1642620087771
2022-01-19 22:21:27.774 INFO 42281 --- [ main] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-group_id-1, groupId=group_id] Subscribed to topic(s): users
2022-01-19 22:21:27.787 INFO 42281 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 9002 (http) with context path ''
2022-01-19 22:21:27.794 INFO 42281 --- [ main] c.l.kafka.consumer.BotLogsApplication : Started BotLogsApplication in 2.184 seconds (JVM running for 2.825)
2022-01-19 22:21:27.964 INFO 42281 --- [ntainer#0-0-C-1] org.apache.kafka.clients.Metadata : [Consumer clientId=consumer-group_id-1, groupId=group_id] Cluster ID: O9iXkXIMQpKE3DgrEQtJ5w
2022-01-19 22:21:27.965 INFO 42281 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-group_id-1, groupId=group_id] Discovered group coordinator omen:9092 (id: 2147483647 rack: null)
2022-01-19 22:21:27.974 INFO 42281 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-group_id-1, groupId=group_id] (Re-)joining group
2022-01-19 22:21:27.988 INFO 42281 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-group_id-1, groupId=group_id] (Re-)joining group
2022-01-19 22:21:27.993 INFO 42281 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-group_id-1, groupId=group_id] Successfully joined group with generation Generation{generationId=19, memberId='consumer-group_id-1-74db2249-45e9-493e-9180-d349f0688066', protocol='range'}
2022-01-19 22:21:27.994 INFO 42281 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-group_id-1, groupId=group_id] Finished assignment for group at generation 19: {consumer-group_id-1-74db2249-45e9-493e-9180-d349f0688066=Assignment(partitions=[users-0])}
2022-01-19 22:21:28.000 INFO 42281 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-group_id-1, groupId=group_id] Successfully synced group in generation Generation{generationId=19, memberId='consumer-group_id-1-74db2249-45e9-493e-9180-d349f0688066', protocol='range'}
2022-01-19 22:21:28.002 INFO 42281 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-group_id-1, groupId=group_id] Notifying assignor about the new Assignment(partitions=[users-0])
2022-01-19 22:21:28.003 INFO 42281 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-group_id-1, groupId=group_id] Adding newly assigned partitions: users-0
2022-01-19 22:21:28.011 INFO 42281 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-group_id-1, groupId=group_id] Setting offset for partition users-0 to the committed offset FetchPosition{offset=4, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[omen:9092 (id: 0 rack: null)], epoch=0}}
2022-01-19 22:21:28.012 INFO 42281 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : group_id: partitions assigned: [users-0]
2022-01-19 22:21:29.407 INFO 42281 --- [ntainer#0-0-C-1] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Starting...
2022-01-19 22:21:29.566 INFO 42281 --- [ntainer#0-0-C-1] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Start completed.
2022-01-19 22:22:17.027 INFO 42281 --- [ntainer#0-0-C-1] c.logs.kafka.consumer.service.Consumer : #### Consumed received message [Writing in log -> команда: /start]
2022-01-19 22:22:20.652 INFO 42281 --- [ntainer#0-0-C-1] c.logs.kafka.consumer.service.Consumer : #### Consumed received message [Writing in log -> мысль: Как же хочется написать статью на Хабр !!!]
2022-01-19 22:22:25.344 INFO 42281 --- [ntainer#0-0-C-1] c.logs.kafka.consumer.service.Consumer : #### Consumed received message [Writing in log -> мысль: Может написать статью о боте в Телеграмм ?]
2022-01-19 22:22:30.394 INFO 42281 --- [ntainer#0-0-C-1] c.logs.kafka.consumer.service.Consumer : #### Consumed received message [Writing in log -> мысль: Написать статью!]
2022-01-19 22:22:35.652 INFO 42281 --- [ntainer#0-0-C-1] c.logs.kafka.consumer.service.Consumer : #### Consumed received message [Writing in log -> команда: /idea]
- Сообщения, отправленные producer в Kafka, были обработаны нашим consumer и записаны в БД
- Далее, по инструкции из первой статьи, откройте окно Сonsume from Topic, здесь показаны прилетевшие в Kafka сообщения
Как и в первой статье, мы убедились, что приложение корректно работает, сообщения благополучно прилетели в Kafka, были отловлены, обработаны и записаны в БД
Вот и все, надеюсь, что у всех получилось повторить туториал в первого раза, в будущем будет еще много интересного, всем спасибо.
Автор: Max I.