Mysql To ES By Flink-CDC
Mysql To ES By Flink-CDC

В этой статье рассказывается, как реализовать синхронизацию данных CDC почти в реальном времени из Mysql в ES через Flink.

CDC — это аббревиатура (Change Data Capture). Основная идея состоит в том, чтобы отслеживать и фиксировать изменения в базе данных (включая вставку данных или таблицы данных INSERT,

Обновить UPDATE, удалить DELETE и т. д.), полностью записать эти изменения в том порядке, в котором они происходят, и записать их в промежуточное программное обеспечение сообщений, чтобы другие службы могли подписаться и использовать их.

В настоящее время большинство методов flink cdc в ES, представленных на рынке, заключаются в использовании клиента flink sql для создания исходной таблицы и синхронизации таблицы mysql, построения терминальной таблицы для синхронизации связанного индекса ES и постановки задачи синхронизации.

Язык кода:javascript
копировать
insert into es_table select * from mysql_table;

Полная синхронизация данных из Mysql в ES в реальном времени, полагаясь на ядро ​​flink, что очень просто. Однако если вам необходимо выполнить обработку данных во время процесса CDC, вам необходимо вручную установить CDC.

1. Экологическая подготовка

Mysql 8.0

ElasticSearch 7.16

Flink 1.14.4

JDK 1.8

pom-файл

Язык кода:javascript
копировать
<dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-elasticsearch7</artifactId>
            <version>1.15.0</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>2.0.17</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.27</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-base</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>2.4.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>   <artifactId>flink-walkthrough-common_${scala.binary.version}
            </artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>
2. Подключитесь к Mysql, чтобы получить бинлог. Datastream
Язык кода:javascript
копировать
RestHighLevelClient  client = new RestHighLevelClient(RestClient.builder(new HttpHost("es-ip", 9200, "http")));
MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("ip")
                .port(3306)
                .databaseList("database_name")
                .tableList("table_name")
                .username("root")
                .password("password")
             .deserializer(newJsonDebeziumDeserializationSchema())
                .build();

Запрашивать mysql для нового binlog каждые три секунды

Язык кода:javascript
копировать
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(3000);
        DataStream<String> input = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
                .setParallelism(1);
        List<HttpHost> httpHosts = new ArrayList<>();
        httpHosts.add(new HttpHost("es-ip", 9200, "http"));
3. Парсинг бинлога - соответствует погружению в ES после обработки данных в ES
Язык кода:javascript
копировать
//Создаем ElasticsearchSink sink to es
        ElasticsearchSink.Builder<String> esSinkBuilder = 
        new ElasticsearchSink.Builder<>(httpHosts,
        new ElasticsearchSinkFunction<String>() {
        //Добавляем данные в ES
        public IndexRequest createIndexRequest(
        String index,HashMap<String,Object> map) 
       {
       return Requests.indexRequest().index(index).source(map);
       }
        //Удалить данные в ES
       public void delete(String index,String element,
       String before_after) throws IOException {
       System.out.println("Удалить эти данные");
       client.delete(Requests.deleteRequest(index)
       .id(getID(element,before_after)),
       RequestOptions.DEFAULT);
       }
        //В соответствии с полями в бинлоге выполняем множественное сопоставление для запроса идентификатора данных в ES
        public String getID(String element,String before_after) throws IOException {
        JSONObject object = JSON.parseObject(element);
        JSONObject json_value =object.getJSONObject(before_after);
        if(json_value.toString().equals("null")){
           System.out.println("Это удаленный бинлог, данные удалены и не могут быть найдены");
              return "";
           }
        int i = 0;
Set<Map.Entry<String, Object>> entrySet = json_value.entrySet();

        HashMap<String,Object> map = new HashMap<>();
        //Получаем все значения ключей в этом json через итератор
        for (Map.Entry<String, Object> entry : entrySet) {
            map.put("field"+i,entry.getKey());
            map.put("value"+i,entry.getValue());
            i++;
            }
        //Добавляем запрос на сопоставление полей
        MultiSearchRequest request = new MultiSearchRequest();
        SearchRequest firstSearchRequest = new SearchRequest();
        for (i = 0; i < entrySet.size(); i++) {
            SearchSourceBuilder searchSourceBuilder = 
            new SearchSourceBuilder();
        //Несколько запросов
searchSourceBuilder.query(QueryBuilders.matchQuery(map.get("field"+i).toString(), map.get("value"+i).toString()));
                              firstSearchRequest.source(searchSourceBuilder);
        request.add(firstSearchRequest);
        }
        //Получаем соответствующий идентификатор данных в ответе
        MultiSearchResponse response = client.msearch
        (request, RequestOptions.DEFAULT);
        MultiSearchResponse.Item firstResponse = response
        .getResponses()[0];
        SearchResponse searchResponse=firstResponse.getResponse();
        SearchHits hits = searchResponse.getHits();
        return firstResponse.getResponse().toString()
        .contains("\"hits\":[]") ? «пустые данные» : hits.getHits()[0].getId();
        }
        @Override
        public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
        // Оцениваем бинлог и «инкрементируем» данные ES | "удалить" | Действие «Изменить»
        String index = "mysql_es";
        if(element.contains("\"before\":null")){   
        //Разбираем и добавляем данные в binlog
        JSONObject json_value = JSON.parseObject(element)
        .getJSONObject("after");
        int i = 0;
        Set<Map.Entry<String, Object>> entrySet = json_value
        .entrySet();
        HashMap<String,Object> map = new HashMap<>();
        //Получаем все ключи в этом json через итератор value
        for (Map.Entry<String, Object> entry : entrySet) {
                map.put(entry.getKey(),entry.getValue());
            }
                                      indexer.add(createIndexRequest(index,map));
         }else if (element.contains("\"after\":null")){         
         //Разбираем и удаляем данные binlog
         try {
             delete(index,element,"before");
            } catch (IOException e) {
             System.out.println("Выполняется исключение");
             throw new RuntimeException(e);
            }
          }else if (!element.contains("\"after\":null") && !element.contains("\"before\":null)")){
          try {
              delete(index,element,"before");  
              //Разбираем бинлог данных обновления
             } catch (IOException e) {
               throw new RuntimeException(e);
            }

           JSONObject json_value = JSON.parseObject(element)
           .getJSONObject("after");

           Set<Map.Entry<String, Object>> entrySet = json_value
           .entrySet();

           HashMap<String,Object> map = new HashMap<>();
            //Получаем все значения ключей в этом json через итератор
           for (Map.Entry<String, Object> entry : entrySet) {
                  map.put(entry.getKey(),entry.getValue());
               }
                            indexer.add(createIndexRequest(index,map));
           }else {
           System.out.println("binlog находится за пределами допустимого диапазона");
             }
            }
          }
        );
4. Настроить ES sink
Язык кода:javascript
копировать
// configuration for the bulk requests; this instructs the sink to emit after every element, otherwise they would be buffered
        esSinkBuilder.setBulkFlushMaxActions(1);

// provide a RestClientFactory for custom configuration on the internally created REST client
        esSinkBuilder.setRestClientFactory(
                restClientBuilder -> {}
        );

// finally, build and add the sink to the job's pipeline
        input.addSink(esSinkBuilder.build());

        env.execute();
Ограничения программы
  1. Неприменимо в сценариях, где в базе данных MySQL есть одни и те же данные, таблица MySQL должна иметь первичный ключ.
  2. Точки останова не поддерживаются, и программа будет повторно синхронизироваться при каждом запуске.
  3. Сценарии вложения полей (многослойный JSON) не рассматриваются.

я участвуюНа третьем этапе специального тренировочного лагеря Tencent Technology Creation 2023 года будет проводиться конкурс сочинений. Соберите команду, чтобы выиграть приз!

boy illustration
Учебное пособие по Jetpack Compose для начинающих, базовые элементы управления и макет
boy illustration
Код js веб-страницы, фон частицы, код спецэффектов
boy illustration
【новый! Суперподробное】Полное руководство по свойствам компонентов Figma.
boy illustration
🎉Обязательно к прочтению новичкам: полное руководство по написанию мини-программ WeChat с использованием программного обеспечения Cursor.
boy illustration
[Забавный проект Docker] VoceChat — еще одно приложение для мгновенного чата (IM)! Может быть встроен в любую веб-страницу!
boy illustration
Как реализовать переход по странице в HTML (html переходит на указанную страницу)
boy illustration
Как решить проблему зависания и низкой скорости при установке зависимостей с помощью npm. Существуют ли доступные источники npm, которые могут решить эту проблему?
boy illustration
Серия From Zero to Fun: Uni-App WeChat Payment Practice WeChat авторизует вход в систему и украшает страницу заказа, создает интерфейс заказа и инициирует запрос заказа
boy illustration
Серия uni-app: uni.navigateЧтобы передать скачок значения
boy illustration
Апплет WeChat настраивает верхнюю панель навигации и адаптируется к различным моделям.
boy illustration
JS-время конвертации
boy illustration
Обеспечьте бесперебойную работу ChromeDriver 125: советы по решению проблемы chromedriver.exe не найдены
boy illustration
Поле комментария, щелчок мышью, специальные эффекты, js-код
boy illustration
Объект массива перемещения объекта JS
boy illustration
Как открыть разрешение на позиционирование апплета WeChat_Как использовать WeChat для определения местонахождения друзей
boy illustration
Я даю вам два набора из 18 простых в использовании фонов холста Power BI, так что вам больше не придется возиться с цветами!
boy illustration
Получить текущее время в js_Как динамически отображать дату и время в js
boy illustration
Вам необходимо изучить сочетания клавиш vsCode для форматирования и организации кода, чтобы вам больше не приходилось настраивать формат вручную.
boy illustration
У ChatGPT большое обновление. Всего за 45 минут пресс-конференция показывает, что OpenAI сделал еще один шаг вперед.
boy illustration
Copilot облачной разработки — упрощение разработки
boy illustration
Микросборка xChatGPT с низким кодом, создание апплета чат-бота с искусственным интеллектом за пять шагов
boy illustration
CUDA Out of Memory: идеальное решение проблемы нехватки памяти CUDA
boy illustration
Анализ кластеризации отдельных ячеек, который должен освоить каждый&MarkerгенетическийВизуализация
boy illustration
vLLM: мощный инструмент для ускорения вывода ИИ
boy illustration
CodeGeeX: мощный инструмент генерации кода искусственного интеллекта, который можно использовать бесплатно в дополнение к второму пилоту.
boy illustration
Машинное обучение Реальный бой LightGBM + настройка параметров случайного поиска: точность 96,67%
boy illustration
Бесшовная интеграция, мгновенный интеллект [1]: платформа больших моделей Dify-LLM, интеграция без кодирования и встраивание в сторонние системы, более 42 тысяч звезд, чтобы стать свидетелями эксклюзивных интеллектуальных решений.
boy illustration
LM Studio для создания локальных больших моделей
boy illustration
Как определить количество слоев и нейронов скрытых слоев нейронной сети?
boy illustration
[Отслеживание целей] Подробное объяснение ByteTrack и детали кода