Что такое CDC?
CDC — это аббревиатура (Change Data Capture). Основная идея состоит в том, чтобы отслеживать и фиксировать изменения в базе данных (включая INSERT данных или таблиц данных, обновление UPDATE, удаление DELETE и т. д.), полностью записывать эти изменения в том порядке, в котором они происходят, и записывать их в файл промежуточное программное обеспечение для других служб. Подписка и потребление.
Flink_CDC
Примечание. Если Hadoop не установлен, вы можете использовать автономную среду Flink напрямую, без пряжи.
Загрузите пакеты зависимостей flink со следующих двух адресов и поместите их в каталог lib.
Если у вас Flink другой версии, вы можете скачать его [здесь] https://repo.maven.apache.org/maven2/org/apache/flink.
Примечание. Моя версия куста — 2.1.1. Почему я выбрал здесь номер версии 2.2.0? Это соответствие версий, указанное в официальном документе:
Metastore version | Maven dependency | SQL Client JAR |
---|---|---|
1.0.0 - 1.2.2 | flink-sql-connector-hive-1.2.2 | Download |
2.0.0 - 2.2.0 | flink-sql-connector-hive-2.2.0 | Download |
2.3.0 - 2.3.6 | flink-sql-connector-hive-2.3.6 | Download |
3.0.0 - 3.1.2 | flink-sql-connector-hive-3.1.2 | Download |
Официальный адрес документа: https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/connectors/table/hive/overview/, вы можете просмотреть его самостоятельно.
bin/yarn-session.sh -d -s 2 -jm 1024 -tm 2048 -qu root.sparkstreaming -nm flink-cdc-hive
bin/sql-client.sh embedded -s flink-cdc-hive
img
1) Предпочтительно создать каталог
CREATE CATALOG hive_catalog WITH (
'type' = 'hive',
'hive-conf-dir' = '/etc/hive/conf.cloudera.hive'
);
Обратите внимание: hive-conf-dir — это адрес файла конфигурации вашего куста, который должен содержать основной файл конфигурации hive-site.xml. Вы можете скопировать эти файлы конфигурации с узла куста на этот компьютер.
2) Запрос
На этом этапе нам следует выполнить несколько обычных операций DDL, чтобы проверить, есть ли какие-либо проблемы с конфигурацией:
use catalog hive_catalog;
show databases;
Запросить любую таблицу
use test
show tables;
select * from people;
Об ошибке может быть сообщено:
Поместите Hadoop-mapreduce-client-core-3.0.0.jar в каталог Lib файла Flink. Фактический выбор зависит от вашей версии Hadoop.
Примечание. Это очень важно. После размещения этого jar-пакета в Lib вам необходимо перезапустить приложение, а затем снова запустить приложение с помощью Yarn-session. Поскольку я обнаружил, что кеш, похоже, есть, закройте приложение и перезапустите. это:
Затем данные можно запросить, и результаты запроса будут следующими:
Данные Mysql не могут быть напрямую импортированы в улей в Flink sql. Их необходимо разделить на два этапа:
Что касается инкрементной синхронизации данных MySQL с Kafka, то анализ есть в предыдущей статье, и здесь я не буду делать обзор на синхронизацию данных Kafka с Hive;
1) Создайте таблицу и свяжите ее с Кафкой:
Раньше MySQL синхронизировался с Kafka, и в Flink sql создавалась таблица с разъемом = 'upsert-kafka'. Здесь есть разница:
CREATE TABLE product_view_mysql_kafka_parser(
`id` int,
`user_id` int,
`product_id` int,
`server_id` int,
`duration` int,
`times` string,
`time` timestamp
) WITH (
'connector' = 'kafka',
'topic' = 'flink-cdc-kafka',
'properties.bootstrap.servers' = 'kafka-001:9092',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json'
);
2) Создайте таблицу ульев
создаватьhiveНеобходимо указатьSET table.sql-dialect=hive;
,В противном случае моргните sql Командная строка не распознает этот синтаксис создания таблицы. Зачем это нужно? Вы можете прочитать этот документ Hive? диалект.
-- Создание операции куста пользователя каталога
CREATE CATALOG hive_catalog WITH (
'type' = 'hive',
'hive-conf-dir' = '/etc/hive/conf.cloudera.hive'
);
use catalog hive_catalog;
-- Вы можете увидеть, какие базы данных находятся в нашем улье
show databases;
use test;
show tables;
Выше мы теперь можем видеть, какие базы данных и таблицы находятся в кусте дальше, создайте таблицу куста:
CREATE TABLE product_view_kafka_hive_cdc (
`id` int,
`user_id` int,
`product_id` int,
`server_id` int,
`duration` int,
`times` string,
`time` timestamp
) STORED AS parquet TBLPROPERTIES (
'sink.partition-commit.trigger'='partition-time',
'sink.partition-commit.delay'='0S',
'sink.partition-commit.policy.kind'='metastore,success-file',
'auto-compaction'='true',
'compaction.file-size'='128MB'
);
Затем выполните синхронизацию данных:
insert into hive_catalog.test.product_view_kafka_hive_cdc
select *
from
default_catalog.default_database.product_view_mysql_kafka_parser;
Примечание. Здесь указывается имя таблицы. Я использую каталог.база данных.таблица. Поскольку это две разные библиотеки, этот формат требует явного указания каталога-базы данных.
В Интернете есть и другие решения, касающиеся добавочной синхронизации MySQL с Hive в реальном времени:
Я увидел в Интернете статью о плане архитектуры хранилища данных в реальном времени и подумал, что все в порядке:
Ссылки
https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/connectors/table/hive/hive_dialect/
Отказ от ответственности:Статьи, опубликованные в этом общедоступном аккаунте, являются оригинальными для этого общедоступного аккаунта.,или представляет собой редактирование и компиляцию отличных статей, найденных в Интернете.,Авторские права на статью принадлежат первоначальному автору,Он предназначен только для изучения и ознакомления читателей. Для неоригинальных статей, которыми поделились,Некоторые потому, что настоящий источник не может быть найден.,В случае неправильного указания источника ответственность за использованные в статье картинки, ссылки и т.п., включая, помимо прочего, программное обеспечение, материалы и т.п., несет автор.,Если есть нарушения,Пожалуйста, свяжитесь с серверной частью напрямую,Опишите конкретные статьи,Серверная часть устранит неудобства как можно скорее.,Глубоко сожалею.