Оглавление
1. Принцип синхронизации данных DataX2. Полностью синхронизированная реализация3. Мысли об инкрементной синхронизации4. План реализации добавочной синхронизации5. О высокой доступности DataXссылка
DataX — это версия с открытым исходным кодом интеграции данных Alibaba Cloud DataWorks, инструмента/платформы для автономной синхронизации данных, широко используемой в Alibaba Group. DataX реализует эффективную синхронизацию данных между различными разнородными источниками данных, включая MySQL, Oracle, OceanBase, SqlServer, Postgre, HDFS, Hive, ADS, HBase, TableStore (OTS), MaxCompute (ODPS), Hologres, DRDS, databend и т. д. Функция.
Сам DataX, как платформа автономной синхронизации данных, построен с использованием архитектуры плагинов Framework +. Абстрактное чтение и запись источников данных в подключаемые модули Reader/Writer и включение их во всю структуру синхронизации.
Версия DataX 3.0 с открытым исходным кодом поддерживает многопоточный режим на одном компьютере для выполнения синхронного выполнения задания. На следующем рисунке представлена временная диаграмма жизненного цикла задания DataX. На ней кратко объясняется взаимосвязь между каждым модулем DataX в общей конструкции архитектуры. .
Например, пользователь отправил задание DataX и настроил 20 параллелизмов для синхронизации данных MySQL из 100 таблиц с ODPS. Идея принятия решений по планированию DataX заключается в следующем:
Datax использует среды Python, Java и Maven. Рекомендуется установить python2.7, jdk1.8 и maven3.6. Процесс установки не будет подробно описан.
mkdir -p /opt/tools/datax
cd /opt/tools/datax
wget http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz
tar zxf datax.tar.gz -C /usr/local/
rm -rf /usr/local/datax/plugin/*/._*
Просмотр шаблонов синхронизации:
python /usr/local/datax/bin/datax.py -r mysqlreader -w mysqlwriter
Формат выходного шаблона:
Please save the following configuration as a json file and use
python {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.json
to run the job.
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": [],
"connection": [
{
"jdbcUrl": [],
"table": []
}
],
"password": "",
"username": "",
"where": ""
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"column": [],
"connection": [
{
"jdbcUrl": "",
"table": []
}
],
"password": "",
"preSql": [],
"session": [],
"username": "",
"writeMode": ""
}
}
}
],
"setting": {
"speed": {
"channel": ""
}
}
}
}
Измените его в соответствии с форматом шаблона, замените источник данных и целевые таблицы, а также секреты учетной записи и создайте сценарий *.json в каталоге /usr/local/datax/script:
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "source_user",
"password": "pwd",
"column": ["*"],
"splitPk": "ID",
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://sourceHost:port/database"
],
"table": ["source_table"]
}
]
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"column": ["*"],
"connection": [
{
"jdbcUrl": "jdbc:mysql://targetHost:port/database",
"table": ["target_table"]
}
],
"password": "pwd",
"preSql": [
"truncate target_table"
],
"session": [
"set session sql_mode='ANSI'"
],
"username": "target_user",
"writeMode": "insert"
}
}
}
],
"setting": {
"speed": {
"channel": "5",
"byte" : 5242880
}
}
}
}
Следует отметить следующее:
Выполняем скрипт синхронизации:
python /usr/local/datax/bin/datax.py /usr/local/datax/script/xxx.json
Но есть проблема. Команда синхронная. Окно должно быть активным во время процесса выполнения, и клиент не может быть отключен, иначе задача будет прекращена. Поскольку журнал выполнения является консольным выводом, журнал выполнения невозможно просмотреть. после завершения выполнения, поэтому нам необходимо изменить команду на выполнение без приостановки и вывести журнал выполнения в указанный каталог. Замените его следующей командой:
nohup python /usr/local/datax/bin/datax.py /usr/local/datax/script/xxx.json >> /var/log/datax/xxx.log 2>&1 &
Таким образом, нет необходимости постоянно блокировать окно, и ssh-клиент не будет отключен по сетевым причинам, что приведет к завершению задачи. Если вы видите в журнале следующее содержимое, это означает, что задача синхронизации успешно выполнена:
Конечно, нам не нужно выполнять полную синхронизацию каждый раз, когда мы синхронизируем данные. Итак, если некоторые таблицы уже были синхронизированы один раз, как выполнить инкрементальную синхронизацию?
Прежде всего, Datax — это синхронизация одной таблицы, поэтому, если нам нужно выполнить инкрементальную синхронизацию, нам нужно знать, какова «сумма» приращения и каковы правила измерения.
Поскольку тенденция роста таблицы неопределенна, начальное и конечное значения id для инкрементальной синхронизации определить невозможно, а тенденцию роста id нельзя использовать в качестве правила измерения. Однако время — это инкрементный показатель, на который мы можем рассчитывать. и определить, например, синхронизацию T+1. То есть синхронизировать данные предыдущего дня 24 часа, синхронизировать раз в 5 минут и т. д. Мы можем определить моменты начала и окончания с помощью простых вычислений.
Конечно, инкрементальная синхронизация не означает синхронизацию в режиме реального времени или почти в реальном времени. Она больше используется в сценариях резервного копирования данных и автономных вычислений. Datax сам по себе не очень хорош для выполнения этих задач. требований реального времени, вы можете использовать другие методы, такие как инструменты синтаксического анализа binlog и т. д. Следовательно, инкрементную синхронизацию, о которой мы здесь говорим, также можно понимать как сценарии данных, в которых данные не будут меняться, или как данные с относительно коротким жизненным циклом.
В качестве обратного примера, для данных о возврате электронной коммерции временной цикл бизнес-сценария относительно длинный, поэтому от создания заказа на возврат до возврата товаров на склад может пройти несколько дней, а также может потребоваться десять с половиной месяцев или даже больше, чтобы пересечь границу в течение длительного времени, тогда в определенной степени или в определенном диапазоне данных использовать Da нецелесообразно. Tax выполняет инкрементную синхронизацию, поскольку данные, синхронизированные в T+1 или T+n, все равно могут измениться. Если обработка не будет выполнена, возникнут проблемы со строгостью и точностью синхронизированных данных. Если будет выполнена компенсационная обработка, процесс синхронизации завершится. изменить заново. Это чрезвычайно сложно, поэтому лучше рассмотреть другие, более качественные и подходящие решения.
Таким образом, использование Datax для выполнения инкрементной синхронизации данных в автономном режиме больше подходит для сценариев, в которых жизненный цикл данных относительно короткий, таких как пополнение счета, снятие средств, игровые заказы и т. д., а также для сценариев, в которых точность периферийных данных невысока.
Причина, по которой ее называют инкрементной синхронизацией, заключается в том, что это либо запуск в реальном времени, либо запуск с фиксированной частотой, а Datax больше подходит для запуска с фиксированной частотой. Фиксированная частота не может избежать планирования. Datax — это автономный инструмент синхронизации, поэтому мы можем рассмотреть возможность регулярного запуска на основе crontab планирования, поставляемого с системой Linux, или с использованием платформы планирования с открытым исходным кодом.
Используйте встроенную функцию планирования системы Linux crontab. Например, синхронизируйте один раз в день. Сценарий оболочки вычисляет время, а затем прозрачно передает его в условиеwhere файла конфигурации json Datax с помощью команды. можно рассчитать следующим образом:
where create_time >= UNIX_TIMESTAMP(date_sub(curdate(),interval 1 day))
and create_time < UNIX_TIMESTAMP(curdate());
Читающую часть сценария синхронизации можно изменить следующим образом:
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "source_user",
"password": "pwd",
"column": ["*"],
"splitPk": "ID",
"where": "create_time >= UNIX_TIMESTAMP(date_sub(curdate(),interval 1 day))
and create_time < UNIX_TIMESTAMP(curdate())";
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://sourceHost:port/database"
],
"table": ["source_table"]
}
]
}
},
"writer": ...
}
],
"setting": {
"speed": {
"channel": "5",
"byte" : 5242880
}
}
}
}
Затем напишите расписание crontab:
crontab -e
0 0 1 * * ? * nohup python /usr/local/datax/bin/datax.py /usr/local/datax/script/xxx.json >> /var/log/datax/xxx.log 2>&1 &
Выполняйте задачу каждый день в час ночи, чтобы синхронизировать данные предыдущего дня и тем самым добиться дополнительной синхронизации. Еще следует отметить, что условия инкрементной синхронизации требуют индекса, в противном случае можно легко загрузить процессор экземпляра базы данных.
Аналогичным образом мы можем использовать относительно зрелое в отрасли решение по планированию для запуска команд синхронизации для добавочной синхронизации. Например, xxl-job поддерживает задачи планирования оболочки.
Перейдите к машине Datax, чтобы написать сценарий оболочки:
#!/bin/bash
nohup python /usr/local/datax/bin/datax.py /usr/local/datax/script/xxx.json >> /var/log/datax/xxx.log 2>&1 &
Создайте задачу оболочки:
Напишите сценарий оболочки исполнителя:
Исходный код задачи-скрипта размещается в диспетчерском центре, а логика сценария запускается в исполнителе. Когда запускается задача сценария, исполнитель загружает исходный код сценария для создания файла сценария на компьютере исполнителя, а затем вызывает сценарий через код Java и записывает журнал вывода сценария в файл журнала задачи в режиме реального времени; что его можно просмотреть в режиме реального времени в диспетчерском центре. Следить за выполнением скрипта.
Следует отметить, что,Даже если вы просто хотите запланировать оболочку,Исполнитель по-прежнему необходим (конкретную причину можно найти в принципе выполнения задачи оболочки linkxxl-job),Вы можете написать простой исполнитель с помощью ссылкиxxl-job-executor-sample-springboot.
Но этот метод немного невзрачный. Нужно ли мне развертывать набор Java-сервисов, чтобы выполнить скрипт? В основном используется частота планирования xxl-задания, а затем, когда достигается узел времени выполнения, сценарий оболочки может выполняться удаленно, и можно решить беспарольный вход в систему и разрешения на выполнение между компьютерами в интрасети.
Datax сам по себе является автономным инструментом синхронизации и, естественно, автономен. Так как же обеспечить высокую доступность, например, микросервисов или распределенного планирования?
Конечно, мы можем рассмотреть реализацию планирования xxl-задания. Когда кластер исполнителей будет развернут, все узлы будут зарегистрированы для администратора. Затем при запуске задачи исполнитель будет выбран в соответствии с политикой для выполнения. сообщается об ошибке или выполнение завершается неудачно, таким же образом можно выполнить такое же выполнение команд синхронизации Datax. Вы можете себе представить, что мы регистрируем все машины, на которых развернут инструмент синхронизации DataX, в xxl-задании. платформу планирования, настройте стратегию планирования и отправьте команды удаленного сценария оболочки для выполнения синхронизации при запуске задачи. Если выполнение не удалось, попробуйте другое.
К счастью, кто-то уже это сделал. Веб-инструмент DataX был написан на основе DataX. Его интеграция и вторичная разработка xxl-job реализуют инкрементальную синхронизацию данных на основе времени и автоматического увеличения первичных ключей. «Исполнитель» задачи поддерживает развертывание кластера, выбор стратегии многоузловой маршрутизации исполнителя, контроль тайм-аута, повторную попытку сбоя, сигнализацию сбоя, зависимость задачи, процессор исполнителя, память, мониторинг нагрузки и т. д.
Подробности можно найти в документе по ссылке:
https://github.com/WeiYe-Jing/datax-web
https://github.com/alibaba/DataX/blob/master/introduction.md
https://blog.csdn.net/zhuyu19911016520/article/details/124143716
https://github.com/WeiYe-Jing/datax-web