При обработке больших данных в режиме реального времени, будь то с использованием Spark или Flink, взаимодействие данных с другими компонентами должно иметь смысл. Во всей обработке потока данных производительность интерактивных компонентов определяет эффективность обработки данных. Например, при взаимодействии с промежуточным программным обеспечением кэша Redis слишком высокий QPS приведет к слишком медленному отклику, что проявится в общей задержке обработки данных. программа.
Обеспечение производительности компонентов стало главным приоритетом, поэтому при выборе компонентов мы будем использовать их проверенные показатели производительности в качестве ориентира. При потоковой обработке больших данных в реальном времени Kafka является наиболее часто используемым компонентом источника данных. Его механизм секционирования улучшает параллелизм, а механизм копирования обеспечивает высокую доступность данных.
Кроме того, нулевое копирование, последовательное чтение и запись на диск, а также индексирование файлов данных значительно повысили производительность Kafka. По мере изменения версий Kafka Kafka выросла до такой степени, что больше не использует Zookeeper для управления метаданными и контроля узлов. Поэтому сегодня я буду следовать официальной документации, использовать версию Kafka 3.7.0 и использовать Docker для сборки Kafka на облачном сервере.
Используйте docker для сборки kafka, не нужно учитывать платформу и среду, просто используйте docker pull, чтобы напрямую получить образ. В официальной документации также приведены команды.
Выполните команду, чтобы получить образ Кафки.
docker pull apache/kafka:3.7.0
Извлечение не удалось, что привело к ошибке «отсутствует ключ подписи». Сначала я подумал, что это проблема со хранилищем образов, но позже обнаружил, что версия докера на облачном хосте слишком старая.
Используйте docker --version, чтобы убедиться, что текущая версия docker — 1.13.1, поэтому удалите docker и переустановите его.
yum -y remove docker docker-client docker-client-latest docker-common docker-latest docker-latest-logrotate docker-logrotate docker-engine
Выполните приведенную выше команду, чтобы удалить пакеты и зависимости, связанные с Docker.
После завершения удаления переустанавливаем новую версию докера.
yum install -y yum-utils
yum-config-manager --add-repo https://download.docker.com/linux/centos/docker-ce.repo
yum install -y docker-ce docker-ce-cli containerd.io
systemctl restart docker
Добавьте источник docker-ce yum, установите docker-ce и перезапустите службу docker. Когда мы снова проверяем версию докера, она стала 26.1.4.
Затем мы снова успешно извлекли изображение.
Далее мы можем использовать только что полученное изображение для запуска контейнера Kafka.
docker run -d --name kafka -p 9092:9092 apache/kafka:3.7.0
Проверьте журнал запуска:
Таким образом создается новый контейнер Kafka, а также у нас есть одноузловая Kafka. Из журнала нетрудно увидеть, что в контейнере Kafka не запускается Zookeeper, а есть KafkaRaftServer (аббревиатура: KRaft). KRaft заменяет Zookeeper и запускается на узле Kafka.
В настоящее время Kafka предоставляет два метода запуска: KRaft и Zookeeper, и существуют различия как в конфигурации server.properties, так и в методе запуска кластера. Это будет обсуждаться позже при построении кластера. Здесь мы можем увидеть процесс контейнера Kafka.
Теперь, когда брокерская служба Kafka работает в Docker, и если мы хотим подключиться к этой Kafka в Linux, нам понадобятся некоторые команды Kafka. Итак, нам нужно загрузить установочный пакет Kafka. После его распаковки в каталоге bin мы найдем ряд команд Kafka для управления Kafka.
Фактически, наиболее часто используемыйkafka-topic.sh、kafka-console-consumer.sh、kafka-console.producer.shЭти три команды,соответствующий соответственноtopicуправлять、Потребление、Произведите три операции.
В Kafka данные хранятся в темах, поэтому нам необходимо создать темы. Перед созданием мы можем проверить, есть ли темы в кластере Kafka.
# Посмотреть тему
kafka-topics.sh --bootstrap-server localhost:9092 --list
# Создать тему
kafka-topics.sh --bootstrap-server localhost:9092 --create --topic aqi_test --partitions 10 --replication-factor 1
Создайте файл с именемaqi_testизtopic,Разделов установлено на 10.,Реплика установлена на 1.
Поскольку существует только один узел брокера Kafka, когда я впервые устанавливаю реплику на 2, будет сообщено об ошибке.
мы используемkafka-console-producerКaqi_testэтотtopicсерединапроизводственные данные。
kafka-console-producer.sh --topic aqi_test --bootstrap-server localhost:9092
Эта командная строка запустит интерактивный сеанс. После того, как мы введем строку данных, мы запишем ее в тему с помощью возврата каретки и перевода строки.
Как показано на рисунке, мы записали всего четыре фрагмента данных.
использоватьkafka-console-consumeПотреблениеtopicсерединапродюсер пишетизданные。
kafka-console-consumer.sh --topic aqi_test --bootstrap-server localhost:9092 --from-beginning
В механизме потребления в Kafka, если при потреблении не указана специальная конфигурация, потребитель может использовать только самые последние данные. Другими словами, потребитель может использовать данные только после запуска потребителя.
Хотетьиспользоватьfrom-beginning,или в Потреблениеили конфигурациясерединаобозначениеearliestПотребление Стратегия。
Как показано на рисунке, мы использовали четыре фрагмента данных, записанные ранее.
Это я использую докера Создайте один узел на облачном сервереKafkaизпроцесс。А потому что это облачный сервер,нуждатьсяиспользоватьЭластичный публичный IP-адреспосетить,И официальное изображениесерединаизadvertised.listenersВнешняя трансляцияиз Адрес
localhost, поэтому я не могу получить доступ к этим данным Kafka на своем ноутбуке.
Само собой разумеется, что вы можете войти в контейнер Kafka через docker exec и изменить Advertised.listeners в server.properties. Однако этот файл имеет разрешения только для чтения и не может быть изменен. Поэтому следующая статья в основном будет решать проблему недоступности из внешней сети.