[Spring Cloud]Введение в компонент Stream
[Spring Cloud]Введение в компонент Stream

SCS внесла большие изменения в 3.x, упразднив такие классы, как @StreamListener, @Input, @Output и т. д., сохранив Binder и Binding и предоставив поддержку пакетного потребления. В соответствии с принципом изучения нового, а не старого, в этой статье будет представлен контент, связанный с SCS 3.x. Поскольку документации по потоку весенних облаков Kafka относительно достаточно, в этой статье она используется в качестве примера для представления SCS.

Binder — это компонент, обеспечивающий интеграцию с внешним промежуточным программным обеспечением сообщений. Он предоставляет два метода для привязки, а именно:bindConsumer иbindProducer, которые используются для создания производителей и потребителей. Привязка — это мост, соединяющий приложения и промежуточное программное обеспечение для сообщений, который используется для потребления и создания сообщений.

Связывающая транзакция

Не пытайтесь повторить попытку и зафиксировать мертвые буквы в транзакции. При повторной попытке транзакция могла быть возвращена. Если вы хотите отправить мертвое письмо о последствиях, вы можете использовать DefaultAfterRollbackProcessor отправлять мертвые письма после отката.

Error Channel

Связующее устройство будет использовать канал ошибок для передачи исключений потребителю. В то же время асинхронный производитель может быть настроен на передачу исключения в канал ошибок при возникновении исключения.

Dead-Letter

По умолчанию очередь недоставленных писем для темы будет находиться в том же разделе, что и исходная запись.

Сообщения в очереди недоставленных писем разрешено восстанавливать, но следует избегать повторных сбоев обработки сообщений, приводящих к множественным зацикливаниям в очереди недоставленных писем.

Для обработки информации в этих очередях недоставленных писем следует использовать специальный обработчик.

Потребитель Потребитель

Как следует из названия, Consumer определяет потребителя, который представляет собой функциональный интерфейс, предоставляющий методы для получения сообщений. Мы можем добиться этого непосредственно в объявлении компонента, используя лямбда-выражение.

Стоит отметить, что Consumer также является универсальным интерфейсом, который связывает типы сообщений через дженерики. Мы будем использовать класс KStream для получения типа сообщения. Он будет соответствовать KStream, определенному при отправке сообщения. Это абстрактный поток записей, состоящий из пар ключ-значение, но записи с тем же ключом не будут перезаписаны.

Язык кода:javascript
копировать
@Bean
public Consumer<KStream<Object, String>> consumer() {
		return input -> input.foreach((key, value) -> {
				do consume;
		});
}

Когда мы заявляем в приложении о возврате Consumer из Бин, тогда это Bean Он автоматически получит доступ к очереди сообщений. Кроме того, нам необходимо использовать spring.cloud.stream.bindings.{beanName}-in-{idx}={topic} Настроить подписку из темы сообщения. По умолчанию тема и beanName То же имя.

spring.cloud.stream.bindings.consumer-in-0 = userBuy

Когда сообщение получено, оно называется Consumer Определение accept Метод потребления сообщений.

Отправить сообщение производителю

SCS и нет ответа на Отправить сообщение делает специальную инкапсуляцию, но рекомендует поддерживать ее через отдельные очереди сообщений client или template Отправить сообщение.

Язык кода:javascript
копировать
		kafkaTemplate.send(message);

Функция перерабатывающего завода

Но иногда нам необходимо обработать данные и отправить их обратно в очередь сообщений. В этом случае будет использоваться функция.

это и Consumer Аналогично, но метод имеет дополнительное возвращаемое значение. Аналогично, это возвращаемое значение необходимо использовать KStream класс, чтобы он мог поддерживать возврат обработанных данных в очередь сообщений.

Язык кода:javascript
копировать
@Bean
public Function<KStream<Object, String>, KStream<Object, Stream>> processor() {
		return input -> input.map((key, value) -> {
			do process;
			return new KeyValue(key, value);
		})
}

spring.cloud.stream.bindings.{beanName}-out-{idx}={topic} Установить экспорт из темы сообщения. По умолчанию тема и beanName То же имя.

Function По сравнению с производителем или потребителем, это больше похоже на обработку сообщения. Этот процесс может выполнять серию операций над сообщением, включая разделение сообщения, фильтрацию сообщений, расчет промежуточных результатов и т. д. Обычное использование — международный обмен сообщениями и уведомления на нескольких платформах.

Интернационализированные сообщения — это локализованные сообщения. Функция Это похоже на функцию переводчика, передающую переведенное сообщение потребителю.

Иногда нам также необходимо отправлять уведомления на несколько платформ одновременно, например, по электронной почте, текстовым сообщениям и т. д. Вообще говоря, почтовые серверы и серверы SMS не имеют жестко закодированных шаблонов сообщений для повышения универсальности. В настоящее время необходим посредник для обработки сообщений и внедрения их в соответствующие шаблоны платформы.

Несколько привязок вывода

О разделении сообщений говорилось выше, Функция Разрешить несколько topic из Отправка сообщения, будет использоваться в возвращаемом значении KStream Array, то конфигурация будет использоваться так, как показано только что. spring.cloud.stream.bindings.{beanName}-out-{idx}={topic},idx Это означает, что из — возвращаемое значение KStream существоватьв массивеизиндекс。

Привязка нескольких входов

Привязка нескольких входовсуществовать редко используется в обычных приложениях и обычно используется для распределенных вычислений. Например, для расчета деления требуются как делитель, так и делимое. Распределенные вычисления также SCS из Одно из великих применений,Слепое пятно знаний,существования Я не буду здесь много рассказывать.

KStream

Выше много раз упоминалось KStream,По сути, это последовательный и растущий набор данных.,Это своего рода поток данных.

KTable

KTable и KStream Похоже, но и KStream Разница в том, что он не позволяет key повторить. Столкнувшись с тем же самым key изdata, предпочтет обновить, а не вставить. KTable По сути, это тоже поток данных, и другие классы реализации также наследуют AbstractStream。 В какой-то момент вы можете увидеть его как KStream. из Последний снимок.

boy illustration
Углубленный анализ переполнения памяти CUDA: OutOfMemoryError: CUDA не хватает памяти. Попыталась выделить 3,21 Ги Б (GPU 0; всего 8,00 Ги Б).
boy illustration
[Решено] ошибка установки conda. Среда решения: не удалось выполнить первоначальное зависание. Повторная попытка с помощью файла (графическое руководство).
boy illustration
Прочитайте нейросетевую модель Трансформера в одной статье
boy illustration
.ART Теплые зимние предложения уже открыты
boy illustration
Сравнительная таблица описания кодов ошибок Amap
boy illustration
Уведомление о последних правилах Points Mall в декабре 2022 года.
boy illustration
Даже новички могут быстро приступить к работе с легким сервером приложений.
boy illustration
Взгляд на RSAC 2024|Защита конфиденциальности в эпоху больших моделей
boy illustration
Вы используете ИИ каждый день и до сих пор не знаете, как ИИ дает обратную связь? Одна статья для понимания реализации в коде Python общих функций потерь генеративных моделей + анализ принципов расчета.
boy illustration
Используйте (внутренний) почтовый ящик для образовательных учреждений, чтобы использовать Microsoft Family Bucket (1T дискового пространства на одном диске и версию Office 365 для образовательных учреждений)
boy illustration
Руководство по началу работы с оперативным проектом (7) Практическое сочетание оперативного письма — оперативного письма на основе интеллектуальной системы вопросов и ответов службы поддержки клиентов
boy illustration
[docker] Версия сервера «Чтение 3» — создайте свою собственную программу чтения веб-текста
boy illustration
Обзор Cloud-init и этапы создания в рамках PVE
boy illustration
Корпоративные пользователи используют пакет регистрационных ресурсов для регистрации ICP для веб-сайта и активации оплаты WeChat H5 (с кодом платежного узла версии API V3)
boy illustration
Подробное объяснение таких показателей производительности с высоким уровнем параллелизма, как QPS, TPS, RT и пропускная способность.
boy illustration
Удачи в конкурсе Python Essay Challenge, станьте первым, кто испытает новую функцию сообщества [Запускать блоки кода онлайн] и выиграйте множество изысканных подарков!
boy illustration
[Техническая посадка травы] Кровавая рвота и отделка позволяют вам необычным образом ощипывать гусиные перья! Не распространяйте информацию! ! !
boy illustration
[Официальное ограниченное по времени мероприятие] Сейчас ноябрь, напишите и получите приз
boy illustration
Прочтите это в одной статье: Учебник для няни по созданию сервера Huanshou Parlu на базе CVM-сервера.
boy illustration
Cloud Native | Что такое CRD (настраиваемые определения ресурсов) в K8s?
boy illustration
Как использовать Cloudflare CDN для настройки узла (CF самостоятельно выбирает IP) Гонконг, Китай/Азия узел/сводка и рекомендации внутреннего высокоскоростного IP-сегмента
boy illustration
Дополнительные правила вознаграждения амбассадоров акции в марте 2023 г.
boy illustration
Можно ли открыть частный сервер Phantom Beast Palu одним щелчком мыши? Супер простой урок для начинающих! (Прилагается метод обновления сервера)
boy illustration
[Играйте с Phantom Beast Palu] Обновите игровой сервер Phantom Beast Pallu одним щелчком мыши
boy illustration
Maotouhu делится: последний доступный внутри страны адрес склада исходного образа Docker 2024 года (обновлено 1 декабря)
boy illustration
Кодирование Base64 в MultipartFile
boy illustration
5 точек расширения SpringBoot, супер практично!
boy illustration
Глубокое понимание сопоставления индексов Elasticsearch.
boy illustration
15 рекомендуемых платформ разработки с нулевым кодом корпоративного уровня. Всегда найдется та, которая вам понравится.
boy illustration
Аннотация EasyExcel позволяет экспортировать с сохранением двух десятичных знаков.