SCS внесла большие изменения в 3.x, упразднив такие классы, как @StreamListener, @Input, @Output и т. д., сохранив Binder и Binding и предоставив поддержку пакетного потребления. В соответствии с принципом изучения нового, а не старого, в этой статье будет представлен контент, связанный с SCS 3.x. Поскольку документации по потоку весенних облаков Kafka относительно достаточно, в этой статье она используется в качестве примера для представления SCS.
Binder — это компонент, обеспечивающий интеграцию с внешним промежуточным программным обеспечением сообщений. Он предоставляет два метода для привязки, а именно:bindConsumer иbindProducer, которые используются для создания производителей и потребителей. Привязка — это мост, соединяющий приложения и промежуточное программное обеспечение для сообщений, который используется для потребления и создания сообщений.
Не пытайтесь повторить попытку и зафиксировать мертвые буквы в транзакции. При повторной попытке транзакция могла быть возвращена. Если вы хотите отправить мертвое письмо о последствиях, вы можете использовать DefaultAfterRollbackProcessor
отправлять мертвые письма после отката.
Связующее устройство будет использовать канал ошибок для передачи исключений потребителю. В то же время асинхронный производитель может быть настроен на передачу исключения в канал ошибок при возникновении исключения.
По умолчанию очередь недоставленных писем для темы будет находиться в том же разделе, что и исходная запись.
Сообщения в очереди недоставленных писем разрешено восстанавливать, но следует избегать повторных сбоев обработки сообщений, приводящих к множественным зацикливаниям в очереди недоставленных писем.
Для обработки информации в этих очередях недоставленных писем следует использовать специальный обработчик.
Как следует из названия, Consumer определяет потребителя, который представляет собой функциональный интерфейс, предоставляющий методы для получения сообщений. Мы можем добиться этого непосредственно в объявлении компонента, используя лямбда-выражение.
Стоит отметить, что Consumer также является универсальным интерфейсом, который связывает типы сообщений через дженерики. Мы будем использовать класс KStream для получения типа сообщения. Он будет соответствовать KStream, определенному при отправке сообщения. Это абстрактный поток записей, состоящий из пар ключ-значение, но записи с тем же ключом не будут перезаписаны.
@Bean
public Consumer<KStream<Object, String>> consumer() {
return input -> input.foreach((key, value) -> {
do consume;
});
}
Когда мы заявляем в приложении о возврате Consumer из Бин, тогда это Bean Он автоматически получит доступ к очереди сообщений. Кроме того, нам необходимо использовать spring.cloud.stream.bindings.{beanName}-in-{idx}={topic}
Настроить подписку из темы сообщения. По умолчанию тема и beanName То же имя.
spring.cloud.stream.bindings.consumer-in-0 = userBuy
Когда сообщение получено, оно называется Consumer Определение accept Метод потребления сообщений.
SCS и нет ответа на Отправить сообщение делает специальную инкапсуляцию, но рекомендует поддерживать ее через отдельные очереди сообщений client или template Отправить сообщение.
kafkaTemplate.send(message);
Но иногда нам необходимо обработать данные и отправить их обратно в очередь сообщений. В этом случае будет использоваться функция.
это и Consumer Аналогично, но метод имеет дополнительное возвращаемое значение. Аналогично, это возвращаемое значение необходимо использовать KStream класс, чтобы он мог поддерживать возврат обработанных данных в очередь сообщений.
@Bean
public Function<KStream<Object, String>, KStream<Object, Stream>> processor() {
return input -> input.map((key, value) -> {
do process;
return new KeyValue(key, value);
})
}
spring.cloud.stream.bindings.{beanName}-out-{idx}={topic}
Установить экспорт из темы сообщения. По умолчанию тема и beanName То же имя.
Function По сравнению с производителем или потребителем, это больше похоже на обработку сообщения. Этот процесс может выполнять серию операций над сообщением, включая разделение сообщения, фильтрацию сообщений, расчет промежуточных результатов и т. д. Обычное использование — международный обмен сообщениями и уведомления на нескольких платформах.
Интернационализированные сообщения — это локализованные сообщения. Функция Это похоже на функцию переводчика, передающую переведенное сообщение потребителю.
Иногда нам также необходимо отправлять уведомления на несколько платформ одновременно, например, по электронной почте, текстовым сообщениям и т. д. Вообще говоря, почтовые серверы и серверы SMS не имеют жестко закодированных шаблонов сообщений для повышения универсальности. В настоящее время необходим посредник для обработки сообщений и внедрения их в соответствующие шаблоны платформы.
О разделении сообщений говорилось выше, Функция Разрешить несколько topic из Отправка сообщения, будет использоваться в возвращаемом значении KStream Array, то конфигурация будет использоваться так, как показано только что. spring.cloud.stream.bindings.{beanName}-out-{idx}={topic}
,idx Это означает, что из — возвращаемое значение KStream существоватьв массивеизиндекс。
Привязка нескольких входовсуществовать редко используется в обычных приложениях и обычно используется для распределенных вычислений. Например, для расчета деления требуются как делитель, так и делимое. Распределенные вычисления также SCS из Одно из великих применений,Слепое пятно знаний,существования Я не буду здесь много рассказывать.
Выше много раз упоминалось KStream,По сути, это последовательный и растущий набор данных.,Это своего рода поток данных.
KTable и KStream Похоже, но и KStream Разница в том, что он не позволяет key повторить. Столкнувшись с тем же самым key изdata, предпочтет обновить, а не вставить. KTable По сути, это тоже поток данных, и другие классы реализации также наследуют AbstractStream。 В какой-то момент вы можете увидеть его как KStream. из Последний снимок.