В этой статье рассказывается, как реализовать синхронизацию данных CDC почти в реальном времени из Mysql в ES через Flink.
CDC — это аббревиатура (Change Data Capture). Основная идея состоит в том, чтобы отслеживать и фиксировать изменения в базе данных (включая вставку данных или таблицы данных INSERT,
Обновить UPDATE, удалить DELETE и т. д.), полностью записать эти изменения в том порядке, в котором они происходят, и записать их в промежуточное программное обеспечение сообщений, чтобы другие службы могли подписаться и использовать их.
В настоящее время большинство методов flink cdc в ES, представленных на рынке, заключаются в использовании клиента flink sql для создания исходной таблицы и синхронизации таблицы mysql, построения терминальной таблицы для синхронизации связанного индекса ES и постановки задачи синхронизации.
insert into es_table select * from mysql_table;
Полная синхронизация данных из Mysql в ES в реальном времени, полагаясь на ядро flink, что очень просто. Однако если вам необходимо выполнить обработку данных во время процесса CDC, вам необходимо вручную установить CDC.
Mysql 8.0
ElasticSearch 7.16
Flink 1.14.4
JDK 1.8
pom-файл
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch7</artifactId>
<version>1.15.0</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>2.0.17</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.27</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>2.4.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId> <artifactId>flink-walkthrough-common_${scala.binary.version}
</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>
RestHighLevelClient client = new RestHighLevelClient(RestClient.builder(new HttpHost("es-ip", 9200, "http")));
MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
.hostname("ip")
.port(3306)
.databaseList("database_name")
.tableList("table_name")
.username("root")
.password("password")
.deserializer(newJsonDebeziumDeserializationSchema())
.build();
Запрашивать mysql для нового binlog каждые три секунды
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(3000);
DataStream<String> input = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
.setParallelism(1);
List<HttpHost> httpHosts = new ArrayList<>();
httpHosts.add(new HttpHost("es-ip", 9200, "http"));
//Создаем ElasticsearchSink sink to es
ElasticsearchSink.Builder<String> esSinkBuilder =
new ElasticsearchSink.Builder<>(httpHosts,
new ElasticsearchSinkFunction<String>() {
//Добавляем данные в ES
public IndexRequest createIndexRequest(
String index,HashMap<String,Object> map)
{
return Requests.indexRequest().index(index).source(map);
}
//Удалить данные в ES
public void delete(String index,String element,
String before_after) throws IOException {
System.out.println("Удалить эти данные");
client.delete(Requests.deleteRequest(index)
.id(getID(element,before_after)),
RequestOptions.DEFAULT);
}
//В соответствии с полями в бинлоге выполняем множественное сопоставление для запроса идентификатора данных в ES
public String getID(String element,String before_after) throws IOException {
JSONObject object = JSON.parseObject(element);
JSONObject json_value =object.getJSONObject(before_after);
if(json_value.toString().equals("null")){
System.out.println("Это удаленный бинлог, данные удалены и не могут быть найдены");
return "";
}
int i = 0;
Set<Map.Entry<String, Object>> entrySet = json_value.entrySet();
HashMap<String,Object> map = new HashMap<>();
//Получаем все значения ключей в этом json через итератор
for (Map.Entry<String, Object> entry : entrySet) {
map.put("field"+i,entry.getKey());
map.put("value"+i,entry.getValue());
i++;
}
//Добавляем запрос на сопоставление полей
MultiSearchRequest request = new MultiSearchRequest();
SearchRequest firstSearchRequest = new SearchRequest();
for (i = 0; i < entrySet.size(); i++) {
SearchSourceBuilder searchSourceBuilder =
new SearchSourceBuilder();
//Несколько запросов
searchSourceBuilder.query(QueryBuilders.matchQuery(map.get("field"+i).toString(), map.get("value"+i).toString()));
firstSearchRequest.source(searchSourceBuilder);
request.add(firstSearchRequest);
}
//Получаем соответствующий идентификатор данных в ответе
MultiSearchResponse response = client.msearch
(request, RequestOptions.DEFAULT);
MultiSearchResponse.Item firstResponse = response
.getResponses()[0];
SearchResponse searchResponse=firstResponse.getResponse();
SearchHits hits = searchResponse.getHits();
return firstResponse.getResponse().toString()
.contains("\"hits\":[]") ? «пустые данные» : hits.getHits()[0].getId();
}
@Override
public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
// Оцениваем бинлог и «инкрементируем» данные ES | "удалить" | Действие «Изменить»
String index = "mysql_es";
if(element.contains("\"before\":null")){
//Разбираем и добавляем данные в binlog
JSONObject json_value = JSON.parseObject(element)
.getJSONObject("after");
int i = 0;
Set<Map.Entry<String, Object>> entrySet = json_value
.entrySet();
HashMap<String,Object> map = new HashMap<>();
//Получаем все ключи в этом json через итератор value
for (Map.Entry<String, Object> entry : entrySet) {
map.put(entry.getKey(),entry.getValue());
}
indexer.add(createIndexRequest(index,map));
}else if (element.contains("\"after\":null")){
//Разбираем и удаляем данные binlog
try {
delete(index,element,"before");
} catch (IOException e) {
System.out.println("Выполняется исключение");
throw new RuntimeException(e);
}
}else if (!element.contains("\"after\":null") && !element.contains("\"before\":null)")){
try {
delete(index,element,"before");
//Разбираем бинлог данных обновления
} catch (IOException e) {
throw new RuntimeException(e);
}
JSONObject json_value = JSON.parseObject(element)
.getJSONObject("after");
Set<Map.Entry<String, Object>> entrySet = json_value
.entrySet();
HashMap<String,Object> map = new HashMap<>();
//Получаем все значения ключей в этом json через итератор
for (Map.Entry<String, Object> entry : entrySet) {
map.put(entry.getKey(),entry.getValue());
}
indexer.add(createIndexRequest(index,map));
}else {
System.out.println("binlog находится за пределами допустимого диапазона");
}
}
}
);
// configuration for the bulk requests; this instructs the sink to emit after every element, otherwise they would be buffered
esSinkBuilder.setBulkFlushMaxActions(1);
// provide a RestClientFactory for custom configuration on the internally created REST client
esSinkBuilder.setRestClientFactory(
restClientBuilder -> {}
);
// finally, build and add the sink to the job's pipeline
input.addSink(esSinkBuilder.build());
env.execute();