Установление двухэтапной подачи основано на следующих предположениях:
Kafka реализует семантику Exactly Once, которая в основном основана на идемпотентности на стороне производителя и гарантиях транзакций на стороне сервера Kafka.
Продуценты Реализация идемпотентности происходит главным образом через Последовательность Номер) идентифицирует последовательность сообщений раздела:
Роль порядкового номера:
Kafka представляет координатор транзакций (аналогичный компоненту TC в режиме Seata AT) для координации транзакций и управления ими.
Псевдокод выглядит следующим образом:
// создавать Producer экземпляр и укажите transaction id
KafkaProducer producer = createKafkaProducer(
“bootstrap.servers”, “localhost:9092”,
“transactional.id”, “my-transactional-id”);
// Инициализируйте дела, здесь будет TC Сервисное приложение producer id
producer.initTransactions();
// создавать Consumer экземпляр и подписаться на topic
KafkaConsumer consumer = createKafkaConsumer(
“bootstrap.servers”, “localhost:9092”,
“group.id”, “my-group-id”,
"isolation.level", "read_committed");
consumer.subscribe(singleton(“inputTopic”));
while (true) {
ConsumerRecords records = consumer.poll(Long.MAX_VALUE);
// Начать новые дела
producer.beginTransaction();
for (ConsumerRecord record : records) {
// Отправить сообщение в раздел
producer.send(producerRecord(“outputTopic_1”, record));
producer.send(producerRecord(“outputTopic_2”, record));
}
// представлять на рассмотрение offset
producer.sendOffsetsToTransaction(currentOffsets(consumer), "my-group-id");
// представлять на рассмотрениедела
producer.commitTransaction();
}
первый этап После получения запроса на отправку транзакции служба TC сначала сохранит информацию об отправке в теме транзакции. После успешного сохранения сервер немедленно отправляет ответ об успехе производителю. Затем найдите все разделы, участвующие в транзакции, сгенерируйте запрос на фиксацию для каждого раздела и сохраните его в очереди, ожидающей отправки. В настоящее время статус сообщения о транзакции — представление транзакции.
второй этап Фоновый поток будет постоянно извлекать запросы из очереди и отправлять их в раздел. Когда раздел получает сообщение о результате транзакции, он сохраняет результат в разделе и возвращает ответ об успехе службе TC. Когда служба TC получает успешные ответы от всех разделов, она сохраняет сообщение о завершении транзакции в теме транзакции. На этом этапе завершается полный процесс транзакции.
В отличие от обычного двухэтапного представления, координатору необходимо получить ответы от всех участников, прежде чем он сможет оценить, успешна ли транзакция, и, наконец, вернуть результат клиенту.
Логика обработки Kafka такова: если служба TC зависает перед отправкой запроса в раздел после отправки ответа производителю. Поскольку информация о каждой транзакции будет сохраняться, после зависания службы TC и ее перезапуска информация о транзакции будет сначала загружена из темы транзакции. Если будет обнаружено, что имеется только информация об отправке транзакции, но нет информации о завершении последующей транзакции, она будет сохранена. означает, что существует информация о результате транзакции, которая не была отправлена в раздел.
Сообщение о транзакции здесь представляет собой журнал транзакций.
ссылка
Принцип реализации транзакции Kafka
Семантика Exactly Once и принципы механизма транзакций
Flink абстрагирует общую логику протокола двухфазной фиксации в класс TwoPhaseCommitSinkFunction.
Когда мы реализуем сквозное приложение, работающее ровно один раз, нам нужно реализовать всего 4 метода этого класса:
первый этап
Начало Checkpoint представляет собой фазу «предварительной фиксации» протокола двухфазной фиксации. Когда Checkpoint срабатывает, Flink JobManager вводит барьер в поток данных (он делит записи в потоке данных на часть, которая входит в поток данных). текущая контрольная точка и часть, которая входит в следующую часть контрольной точки). Барьер будет передаваться между операторами вместе с потоком данных. Для каждого оператора его серверная часть состояния будет запускаться для сохранения данных о его состоянии.
Фаза предварительной фиксации заканчивается после успешного завершения Checkpoint. В конце первого этапа данные записываются на внешнее хранилище.
второй этап
Когда все экземпляры завершат создание моментального снимка и выполнят preCommit, сообщение о завершении моментального снимка будет отправлено в JobManager. После его получения JobManager (координатор TC) посчитает, что контрольная точка завершена, и отправит сообщение о завершении контрольной точки всем экземплярам. (Уведомление о завершении контрольной точки). Когда оператор Sink получит это уведомление, он выполнит метод фиксации для формальной отправки.
Серверное/внешнее хранилище состояния здесь соответствует журналу транзакций. Используется для сохранения информации журнала.
Flink Механизм контрольных точек также реализован на основе двухфазной фиксации и журнала транзакций. ссылка <<Flink Ядропринципи реализация>>Книга №113глава,См. подробное описание
ссылка