Миграция с Apache Kudu на Apache Hudi
Миграция с Apache Kudu на Apache Hudi

При построении локального центра обработки данных многие пользователи выберут технологический стек Impala/Spark + Kudu из-за хорошей производительности Apache Kudu, функций OLTP и OLAP, а также поддержки Impala SQL и Spark. Однако из-за зависимости Kudu от локального хранилища, приводящей к неспособности поддерживать высокую доступность данных и эластичное расширение, а также постепенной неактивности сообщества, все больше пользователей начали мигрировать на технологический стек Trino/Spark + Hudi на платформе облако. В этой статье на практическом примере рассматривается реконструкция кода и миграция данных, которые происходят в процессе миграции.

1. Текущая ситуация

Следующий случай резюмируется в реальном сценарии: Предположим, мы помогаем типичному независимому поставщику программного обеспечения для цифрового маркетинга в сфере розничной торговли (называемому Компанией C) мигрировать. Их программное обеспечение (платформа) для цифрового маркетинга использует большие данные Интернета, искусственный интеллект и другие. технологии, помогающие корпоративным пользователям создавать портретные аналитические данные, близкие к реальному поведению клиентов. Улучшите качество обслуживания клиентов и добейтесь роста производительности за счет точного контакта и взаимодействия посредством автоматизации маркетинга.

Когда большинство компаний создают свои собственные центры обработки данных, они используют Cloudera Distributed Hadoop (CDH) в качестве платформы для разработки данных. Она включает в себя широко используемые стеки технологий, такие как Spark, Impala, Kudu и т. д. Для конкретных сценариев применения см. ниже. главы. Давайте сначала взглянем на характеристики этих стеков технологий. Давайте сначала договоримся о нескольких названиях, используемых в статье:

  • • Разработчик: относится к пользователям и разработчикам сервисов AWS; в этой статье — к разработчикам компании C.
  • • Продавец: бизнес-пользователи, использующие маркетинговое программное обеспечение компании C, например компания S, которые используют маркетинговое программное обеспечение компании C для обслуживания своих клиентов.
  • • Клиенты: индивидуальные клиенты, обслуживаемые корпоративными пользователями (например, компанией S), которые используют маркетинговое программное обеспечение компании C.

Все коды в этой статье используют таблицу инвентаризации в тестовых данных TPCDS в качестве тестовых данных.

1.1. Введение в CDH

CDH — это версия Hadoop, выпущенная Cloudera, включающая общие компоненты экосистемы Apache Hadoop и специально созданная для удовлетворения потребностей предприятий. CDH предоставляет готовые услуги, необходимые для разработки предприятия. Интегрируя Hadoop с более чем дюжиной других ключевых проектов с открытым исходным кодом, CDH предоставляет полный стек технологий для разработки больших данных предприятия.

В то же время Cloudera создала программное обеспечение для управления, а именно Cloudera Manager, с автоматической установкой кластера, централизованным управлением, мониторингом кластера и функциями сигнализации, что значительно повышает эффективность управления кластером.

Поскольку CDH внесла множество оптимизаций в услуги собственного сообщества, она значительно улучшила стабильность компонентов и совместимость между несколькими компонентами, предоставив разработчикам большое удобство, что сделало CDH предпочтительной платформой для построения локальных центров обработки данных.

1.2. Знакомство с Апач Импала

Impala — это механизм запросов Cloudera, написанный на C++ и основанный на архитектуре MPP (массовая параллельная обработка). Он состоит из различных процессов-демонов, работающих в кластере CDH. Он может использовать такую ​​​​информацию, как база данных и таблица в Metastore Hive. Impala может читать данные таблицы Hive, а также может самостоятельно создавать таблицы, особенно таблицы с данными в Kudu.

Стабильность и скорость Impala, как популярного механизма синтаксического анализа SQL, при обработке запросов Ad-Hoc Query широко проверены в отрасли.

1.3. Введение в Апач Куду

Kudu и Impala — проекты высшего уровня, предоставленные Cloudera Фонду Apache. В качестве базового хранилища Kudu поддерживает запросы KV с высоким уровнем параллелизма и низкой задержкой, сохраняя при этом хорошую производительность сканирования. Эта функция позволяет теоретически учитывать запросы как OLTP, так и OLAP. В качестве механизма запросов Impala изначально поддерживала HDFS. После выпуска Kudu Impala и Kudu были глубоко интегрированы.

Как видно из рисунка ниже, первоначальная цель дизайна Куду — принять во внимание двойные преимущества Parquet (хранилище столбцов, высокая пропускная способность) и HBase (первичный ключ, низкая задержка).

Дизайн архитектуры хранилища Kudu основан на большом опыте HBase. См.: https://kudu.apache.org/docs/index.html. Kudu, основанный на архитектуре хранилища Kudu, предоставляет хорошую функцию Upsert без каких-либо изменений. разработчикам нравятся данные всего раздела. Например, в сценариях моделирования хранилища данных небольшой объем данных в разделе можно часто изменять вместо перезаписи всего раздела.

В то же время Kudu также имеет некоторые ограничения, такие как ограничения первичного ключа, ограничения сегментирования, ограничения размера таблицы... Подробнее об этом см.: https://kudu.apache.org/docs/known_issues.html. В частности, Kudu полагается на локальное хранилище, не может поддерживать методы хранения с высокой доступностью, такие как HDFS или объектное хранилище (например, S3), в результате чего Kudu недостаточно учитывает аварийное восстановление и резервное копирование. В то же время локальное хранилище не может обеспечить полноценное хранилище. и разделение вычислений и эластичные вычисления. Поэтому мы рекомендуем Hudi нашим клиентам как альтернативу Kudu в качестве услуги хранения.

1.4. Введение в Апач Худи.

Apache Hudi (произносится как «худи», полное название: Hadoop Update Delete Incremental, в дальнейшем именуемый Hudi) как платформа озера потоковой передачи данных нового поколения находит все более широкое применение. Это проект Uber с открытым исходным кодом, который может принимать данные с низкой задержкой и сохранять их в HDFS или объектном хранилище (например, S3).

Hudi в полной мере использует файлы хранилища столбцов (Parquet) и хранилища строк (Avro) с открытым исходным кодом в качестве форматов хранения данных и генерирует индексы при записи данных для повышения производительности запросов. Подробную информацию можно найти по адресу: https://hudi. .org/docs/indexing.

Подобно Kudu, Hudi также поддерживает вставки, обновления (Upserts) и удаления на уровне записей, что делает Hudi адаптируемым ко многим сценариям Kudu.

Amazon EMR предоставляет компоненты Hudi, начиная с версий 6.0.0 и 5.28.0. Причины и сценарии, по которым мы рекомендуем использовать Hudi вместо Kudu, включают:

  • • Spark + Hudi может реализовать Spark + Большая часть сцены Куду,НапримерUpsert
  • • Hudi Данные могут быть сохранены в хранилище объектов. (например, S3) начальство,Он обладает уникальными преимуществами для разделения хранилища и вычислений, а также резервного копирования для аварийного восстановления.,В то же время снижается стоимость хранилища (Kudu требует использования локальных SSD-хранилищ, чтобы воспользоваться преимуществами высокой пропускной способности).
  • • Стол Худи поддерживает широко используемые системы запросов, включая Hive, Presto, Трино посещает Стола Худи

Что касается других преимуществ Hudi, таких как кластеризация, индекс метаданных и т. д., мы не использовали их в этой миграции и не будем обсуждать их здесь.

Автор также провел множество тестов, связанных с производительностью. На тех же ресурсах производительность Impala + Kudu, будь то специальный запрос (Ad-Hoc Query) или случайный запрос через JDBC, лучше, чем Trino + Hudi. , но есть проблемы с производительностью, которые можно улучшить и отрегулировать с помощью эластичного расширения Amazon EMR.

Помимо производительности, необходимо также учитывать универсальность и применимость перенесенных компонентов. Например, интеграция и использование других общих компонентов, а также то, является ли стек технологий, используемый в процессах разработки, эксплуатации и обслуживания, универсальным, то есть он не потребует от разработчиков выполнения большого количества рефакторинга кода и не будет отклоняться от обычно используемые и основные технологические стеки. Мы сохраним большую часть кода Spark клиента.

Далее мы будем использовать следующие два сценария, чтобы помочь клиентам перейти с кода Spark/Impala + Kudu на Spark/Trino + Hudi.

2. Сценарий «Досье клиента»

Профили клиентов — это распространенный сценарий, используемый компанией S, продавцом, который использует платформу анализа данных компании C. Она в основном собирает статистику поведения клиентов, организует статистику поведения клиентов, реализует группировку клиентов и подготавливает данные для маркировки клиентов. Характеристики данных в этом сценарии:

  1. 1. Файл клиента состоит из 3-5 таблиц, которые создает продавец. Система генерирует единый первичный ключ для клиента.
  2. 2. Количество столбцов в таблице около 100, столбцы можно добавлять свободно.
  3. 3. Продавец будет обновлять таблицу с нерегулярной частотой. Например, первая партия обновит 1 миллион товаров в первый день, а вторая партия обновит 2 миллиона позиций на следующий день.

Существующая схема архитектуры выглядит следующим образом:

2.1. Реализация в Куду.

Мы сосредоточимся на двух частях. Одна — это реализация записи пользователей в Kudu в режиме реального времени через Java API. Тестовый код для этой части функции выглядит следующим образом:

Язык кода:javascript
копировать
……

try {
    KuduTable kuduTable = kuduClient.openTable(tableName);
    KuduSession kuduSession = kuduClient.newSession();

    kuduSession.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
    for(int i =0; i < dataArr.length; i++) {
        String dataStr = dataArr[i];
        Upsert upsert = kuduTable.newUpsert();
        PartialRow row = upsert.getRow();
        String[] dataInfo = dataStr.split(",");
        if(dataInfo.length == 4) {
            row.addInt("inv_item_sk", Integer.valueOf(dataInfo[0]));
            row.addInt("inv_warehouse_sk", Integer.valueOf(dataInfo[1]));
            row.addString("inv_date_sk", dataInfo[2]);
            row.addInt("inv_quantity_on_hand", Integer.valueOf(dataInfo[3]));
        }
        kuduSession.apply(upsert);
    }
    kuduSession.flush();
    kuduSession.close();
    System.out.println(" ******** KuduJavaSample -> upsert() successfule ! ");
} catch (KuduException e) {
    e.printStackTrace();
}

    ……

Обратите внимание, что при записи в таблицу Kudu через Java API вам не нужно знать полную схему таблицы Kudu. Другая часть — это пакетная обработка данных таблицы Kudu через клиент Spark Kudu. Тестовый код для этого. часть функции выглядит следующим образом:

Язык кода:javascript
копировать
……

try {
  // Delete the table if it already exists.
  if(kuduContext.tableExists(tableName)) {
    kuduContext.deleteTable(tableName)
  }

……

  val write_rdd = spark.sparkContext.parallelize(write_arr)
  val write_df = spark.createDataFrame(write_arr.asJava, schema_df)

  kuduContext.insertRows(write_df, tableName)

  // Read from the table using an RDD.
  val read_cols = Seq("inv_item_sk", "inv_warehouse_sk", "inv_date_sk", "inv_quantity_on_hand")
  val rdd = kuduContext.kuduRDD(spark.sparkContext, tableName, read_cols)
  rdd.map { case Row(inv_item_sk: Int, inv_warehouse_sk: Int, inv_date_sk: String, inv_quantity_on_hand: Int) => (inv_item_sk, inv_warehouse_sk, inv_date_sk, inv_quantity_on_hand) }.
      collect().foreach(println(_))

  // Upsert some rows.
  val upsert_df = write_df.withColumn("inv_quantity_on_hand", col("inv_quantity_on_hand") + lit(1000))
  kuduContext.upsertRows(upsert_df, tableName)

  // Read the table in SparkSql.
  val read_df = spark.read.option("kudu.master", kuduMaster).
                option("kudu.table", tableName).
                format("kudu").load
  read_df.createOrReplaceTempView(tableName)
  spark.sqlContext.sql(s"select * from $tableName").show

……

} catch {
  case unknown : Throwable => log.error(s"got an exception: " + unknown)

……

Полный тестовый код см. по адресу: https://github.com/xudalei1977/cdh-example. Обратите внимание, что эти тестовые коды предназначены только для демонстрации основных функций и не включают логику бизнес-обработки.

2.2. Реализация в Худи.

Мы заменили Kudu в схеме профиля клиента на Hudi.

Схема модифицированной архитектуры выглядит следующим образом:

Реконструкция кода состоит из трех частей:

  1. 1. Java API изначально был написан непосредственно на Kudu, но теперь он написан на Kafka.
  2. 2. Добавьте часть, в которой Spark Streaming считывает данные Kafka и записывает их в Hudi.
  3. 3. Операция чтения и записи Spark Kudu заменена на чтение и запись Spark Hudi.

Код 1. см. по адресу: https://github.com/xudalei1977/cdh-example, коды 2) и 3) см. по адресу: https://github.com/xudalei1977/emr. -худи-пример.

2.3. Сравнение компонентов.

В контексте файлов клиентов сравнение компонентов Kudu и Hudi выглядит следующим образом:

Сравнить контент

Kudu

Hudi

хранилище

Локальная хранилище, метод никто не выполняет, разделение хранилища и вычислений, а также резервное копирование для аварийного восстановления.

Может хранить в HDFS и хранилище объектов (например, S3).

Адаптивность

Сопоставление с таблицей Impala для доступа к другим компонентам.

Синхронизировано с Hive Metastore для доступа к другим компонентам.

JavaAPI

Главный сервер Kudu предоставляет API

Для доступа необходимо использовать Spark/Trino JDBC.

Upsert

Выполните Upsert через JavaAPI, схема не требуется.

Схему необходимо прочитать или определить заранее.

3. Сценарий «Хранилище данных в реальном времени»

Торговой компании S необходимо создать хранилище данных о поведении клиентов, включая клики для доступа, охват, заказы и другие операции, оценить ценность данных, отсортировать данные в реальном времени, необходимые для маркетинговой деятельности, и добиться повышения коэффициентов конверсии. точная доставка и возврат данных. Характеристики данных в этом сценарии:

  1. 1. Объем данных Больше,таблица фактов (Fact) Имеет измерение времени
  2. 2. Большая часть данных — это новые данные, а источники данных включают офлайн-части и данные в реальном времени.
  3. 3. Общее количество таблиц фактов (Fact) и таблиц измерений (Dim) около 100.

Схема архитектуры выглядит следующим образом:

3.1. Реализация в Куду.

Как видно из диаграммы архитектуры, обработка данных разделена на две части. Impala JDBC записывает в Kudu, что представляет собой чистый оператор SQL; Java API записывает в Kudu в режиме реального времени. Эту часть кода см. к примеру в главе 2.1.

3.2. Реализация в Худи.

Мы будем использовать Impala при архитектурном проектировании хранилища данных реального времени. + Куду заменен Спарком + Hudi. Схема модифицированной архитектуры выглядит следующим образом:

Реконструкция кода состоит из трех частей:

  1. 1. JavaAPI изначально был написан непосредственно в Kudu, но теперь он написан в Kafka.
  2. 2. Spark Streaming Из Кафки Чтение данных и запись Стола Худи
  3. 3. Data ETL между различными уровнями хранилища данных, первоначально использовавшего Impala. Реализация операции SQL Kudu, теперь изменена на Spark. Потоковое чтение и письмо Стол Данные Худи

Код 1. см. по адресу: https://github.com/xudalei1977/cdh-example. В части 2 тестовый код для чтения данных из Kafka и записи их на уровень ODS выглядит следующим образом:

Язык кода:javascript
копировать

……

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", parmas.brokerList)
  .option("subscribe", parmas.sourceTopic)
  .option("startingOffsets", parmas.startPos)
  .option("failOnDataLoss", false)
  .load()
  .repartition(Integer.valueOf(parmas.partitionNum))

val schema = spark.read.format("hudi").load(s"${parmas.hudiBasePath}/${parmas.syncDB}/${parmas.syncTableName}").schema
val schema_bc = spark.sparkContext.broadcast(schema)
val tableType = if (parmas.hudiPartition != null && parmas.hudiPartition.length > 0) "MERGE_ON_READ" else "COPY_ON_WRITE"

val query = df.writeStream
  .queryName("MSK2Hudi")
  .foreachBatch { (batchDF: DataFrame, _: Long) =>
    if(batchDF != null && (!batchDF.isEmpty) ){
      val df = batchDF.withColumn("json", col("value").cast(StringType))
        .select(from_json(col("json"), schema_bc.value) as "data")
        .select("data.*")
        .withColumn("created_ts", lit((new Date()).getTime))
        .filter(genPrimaryKeyFilter(parmas.hudiKeyField))

      writeHudiTable(df, parmas.syncDB, parmas.syncTableName, "upsert", parmas.zookeeperUrl,
                      parmas.hudiKeyField, "created_ts", parmas.hudiPartition, parmas.hudiBasePath, tableType)
    }
  }
  .option("checkpointLocation", parmas.checkpointDir)
  .trigger(Trigger.ProcessingTime(parmas.trigger + " seconds"))
  .start

query.awaitTermination()

……

Часть 3. Тестовый код для чтения данных с уровня ODS и записи их на уровень DW выглядит следующим образом:

Язык кода:javascript
копировать
……

//use item as dimension table
spark.read.format("hudi").
          load("s3://dalei-demo/hudi/kudu_migration/item").
          createOrReplaceTempView("item")

while(true){
  Thread.sleep(parmas.hudiIntervel)

  endTime = DATE_FORMAT.format(new Date())

  spark.read.format("hudi").
        option(QUERY_TYPE.key(), QUERY_TYPE_INCREMENTAL_OPT_VAL).
        option(BEGIN_INSTANTTIME.key(), beginTime).
        option(END_INSTANTTIME.key(), endTime).
        load("s3://dalei-demo/hudi/kudu_migration/inventory").
        createOrReplaceTempView("inventory")

  val df = spark.sql(
    s"""select in.inv_item_sk, nvl(i.i_brand, 'N/A') as i_brand, in.inv_warehouse_sk, in.inv_date_sk,
      |nvl(in.inv_quantity_on_hand, 0) as inv_quantity_on_hand,
      |${(new Date()).getTime} as created_ts
      |from inventory in left join item i on in.inv_item_sk = i.i_item_sk """.stripMargin)

  if(df.count > 0)
    writeHudiTable(df, parmas.syncDB, parmas.syncTableName, "upsert", parmas.zookeeperUrl,
        "inv_item_sk,i_brand,inv_warehouse_sk", "created_ts", "inv_date_sk", parmas.hudiBasePath, "MERGE_ON_READ")

  beginTime = endTime
}

……

Полный тестовый код см. по адресу: https://github.com/xudalei1977/emr-hudi-example. Обратите внимание, что эти тестовые коды предназначены только для демонстрации основных функций и не включают логику бизнес-обработки.

3.3. Сравнение компонентов.

В сценарии хранилища данных в реальном времени сравнение компонентов Kudu и Hudi выглядит следующим образом:

Сравнить контент

Kudu

Hudi

хранилище

Локальная хранилище, метод никто не выполняет, разделение хранилища и вычислений, а также резервное копирование для аварийного восстановления.

Может хранить в HDFS и хранилище объектов (например, S3).

Эластичный расчет

никто

Может быть достигнуто с помощью автоматического масштабирования

Легко разрабатывать

Разработка Impala SQL относительно проста.

Для Spark Dataframe требуются основы программирования.

инкрементный запрос

никто, нужно фильтровать inuseSQLот полных данных

Обеспечить мгновенное Timeизинкрементный запрос

Случайное чтение и запись

Вы можете думать о Kudu как о базе данных и запрашивать данные, записанные в реальном времени, через Java API.

Нужно использовать Spark/Trino JDBCПриходитьвыполнить Случайное чтение и запись

4. Миграция данных

В предыдущей главе было представлено преобразование соответствующих кодов из Куду в Худи. В то же время необходимо также перенести существующие данные из Куду в Худи. Мы будем рекомендовать клиентам различные решения по миграции, основанные на различных типах таблиц данных и объемах данных.

4.1. План миграции

Сначала выберите разные методы миграции в зависимости от типа таблицы:

  • • Таблица фактов (Факт) : Пакетная миграция исходных данных и инкрементальная Миграция путем записи в Kafka данных
  • • Таблица измерений (Dim): данные изменяются незначительно и могут быть полностью перенесены за один раз.
  • • Таблица агрегации (Агрегация) : По таблице фактови таблица размеры могут быть рассчитаны без миграции и используются в целевой база Получается пересчетом в данные

Во-вторых, в зависимости от объема данных для пакетной миграции исходных данных можно выбрать разные методы миграции:

Объем данных

выделенная линия

Snowball

Используйте Spark для прямого чтения и записи

Использование Kudu Export + Spark для чтения и записи

< 1 TB

рекомендовать

Spark считывает данные таблицы Kudu и записывает их в таблицу Hudi.

< 1 PB

рекомендовать

рекомендовать

Spark считывает данные таблицы Kudu и записывает их в таблицу Hudi.

Kudu экспортирует данные в файлы Parquet,Мигрировать вS3начальство,использоватьSparkписать Стол Худи

> 1 PB

рекомендовать

Kudu экспортирует данные в файлы Parquet,Мигрировать вS3начальство,использоватьSparkписать Стол Худи

Блок-схема реализации миграции данных выглядит следующим образом:

4.2. Конкретные примеры.

Давайте посмотрим на практический пример,Поместите 24 таблицы тестовых данных TPCDS в Kudu.,Мигрировать в находится в S3 начальстве Стол. Худири. Поскольку величина тестовых данных составляет 100G, мы используем отEMR Spark напрямую читает таблицу Kudu и записывает ее в Стол. Худи способ переноса данных. Весь процесс миграции занимает менее 2 часов. источник миграции данныхицелевая база данныхизсредаследующее:

среда

источник данных

целевая база данных

Версия компонента

Kudu 1.10.0

Hudi 0.10.0 (указан через --packages в коде)

платформа

CDH 6.3.2

EMR 5.35.0

Другие компоненты

Impala 3.2.0, Spark 2.4.5

Presto 0.267, Spark 2.4.8

Аппаратные ресурсы

8 nodes m5.2xlarge

8 core nodes, m6g.2xlarge

После определения сетевого подключения между CDH и EMR запустите миграцию. Конкретные шаги включают в себя:

  1. 1. Пакетная миграция исходных данных с использованием EMR Чтение CDH в Spark платформаначальствоизKuduповерхность,писать Стол Худи
  2. 2. Запись дополнительных данных из таблицы Kudu в Kafka, использовать Spark в EMR считывает данные Kafka и записывает их в Стол. Худи
  3. 3. Запустите расчет сводной таблицы в реальном времени.
  4. 4. После завершения миграции сравните источник данныхицелевая база данныхсерединаиз Объем данных

Фрагмент кода для инициализации миграции на шаге 1 выглядит следующим образом:

Язык кода:javascript
копировать
……

// get all tables in the database to migrate.
val allTable = queryByJdbc(parmas.impalaJdbcUrl + parmas.kuduDatabase, "show tables")

if(allTable != null && allTable.isInstanceOf[Seq[String]] && allTable.length > 0) {
  allTable.foreach( tableName => {
    val (primaryKey, partitionKey) = getPrimaryAndPartitionKey(parmas.impalaJdbcUrl + parmas.kuduDatabase, tableName)
    val df = spark.read
      .option("kudu.master", parmas.kuduMaster)
      .option("kudu.table", "impala::" + parmas.kuduDatabase + "." + tableName)
      .format("kudu").load
      .filter(genPrimaryKeyFilter(primaryKey))
      .withColumn("created_ts", lit((new Date()).getTime))
      .repartition(parmas.partitionNum)

    val tableType = if (partitionKey != null && partitionKey.length > 0) "MERGE_ON_READ" else "COPY_ON_WRITE"

    writeHudiTable(df, parmas.syncDB, tableName, "bulk_insert", parmas.zookeeperUrl,
                      primaryKey, "created_ts", partitionKey, parmas.hudiBasePath, tableType)

  })

……

В коде сначала укажите Кудубаза Выберите таблицу в данных, а затем сгенерируйте Стол на основе определения таблицы Kudu. XudeSchema, Включает первичный ключ и ключ раздела. Здесь таблица с разделами рассматривается просто как таблица Mor, а таблица без разделов рассматривается как таблица Cow. Читатели могут самостоятельно добавлять более сложную логику. После подтверждения Стола После указания типа и схемы Худи вызовите функцию пакета, чтобы записать данные в Стол. Худи. Шаг 2. Фрагмент кода для записи дополнительных данных из таблицы Kudu в Kafka выглядит следующим образом:

Язык кода:javascript
копировать
……

val df = spark.read
  .option("kudu.master", parmas.kuduMaster)
  .option("kudu.table", "impala::" + parmas.kuduDatabase + "." + parmas.syncTableName)
  .format("kudu").load
  .withColumn("created_ts", lit((new Date()).getTime))
  .filter(parmas.filterString)
  .select(to_json(struct("*"), Map("dropFieldIfAllNull" -> "false")).as("value"))
  .selectExpr(s"cast('${UUID.randomUUID().toString}' as string)", "cast(value as string)")

df.write
  .format("kafka")
  .option("kafka.bootstrap.servers", parmas.brokerList)
  .option("topic", parmas.sourceTopic)
  .save()

……

Разработчики могут фильтровать дополнительные данные через params.filterString. Шаг 2. Фрагмент кода для чтения дополнительных данных из Kafka и записи их в Hudi выглядит следующим образом:

Язык кода:javascript
копировать
……

val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", parmas.brokerList)
      .option("subscribePattern", parmas.sourceTopic)
      .option("startingOffsets", parmas.startPos)
      .option("failOnDataLoss", false)
      .load()
      .repartition(parmas.partitionNum)

val query = df.writeStream
      .queryName("MSK2Hudi")
      .foreachBatch { (batchDF: DataFrame, _: Long) =>
        if(batchDF != null && (!batchDF.isEmpty) )
          writeMultiTable2HudiFromDF(batchDF, parmas.syncDB, "upsert", parmas.zookeeperUrl,
            parmas.hudiBasePath, parmas.impalaJdbcUrl, parmas.kuduDatabase)
      }
      .option("checkpointLocation", parmas.checkpointDir)
      .trigger(Trigger.ProcessingTime(parmas.trigger + " seconds"))
      .start

……

В приведенном выше коде режим сопоставления используется для темы Kafka, которая может одновременно считывать инкрементальные данные нескольких таблиц Kudu. Полный тестовый код см. по адресу: https://github.com/xudalei1977/emr5-hudi-example.

4.3. Краткое описание проблемы

4.3.1. Проблемы с версией.

Spark 3.x не может читать данные Kudu 1.10.0 на CDH 6.3.2, поэтому для их чтения используется EMR 5.35.0. При записи в Hudi вы можете указать версию Hudi как 0.10 с помощью опции –packages. команда искры-отправки.

4.3.2. Примечание: org.apache.hudi.Exception.HoodieException: поле (Part -) не найдено в записи. Приемлемые поля были…….

Это связано с тем, что данные, считанные из Kudu, не содержат ключа предварительного объединения. Вы можете добавить в код поле в качестве ключа предварительного объединения, и значением может быть текущее время.

4.3.3. Ошибка выполнения: вызвана: java.lang.IllegalArgumentException: невозможно создать путь из пустой строки…….

Если Kudu не использует раздел, эта ошибка появится в Spark 2.4.8 (EMR 5.35.0). Учитывая, что таблицы, не использующие Partition, относительно небольшие, вся таблица записывается в Kafka, а затем Kafka читается из Spark 3.1.2 (EMR 6.5.0) и записывается в Hudi.

4.3.4. Ошибка выполнения: To_json не содержит поле значения «null».

Поскольку данные, записанные в Kafka (Поле значения имеет формат json) Нет полей, содержащих нулевые значения, поэтому следуйте Столу Схема Худи не выровнена. При записи таблицы отKudu в Kafka можно указать поля, содержащие нулевые значения. Вы также можете сначала использовать существующий Стол Худи читает схему для анализа данных Kafka, а затем записывает ее в Стол. Худи。

5. Некоторые предложения по использованию Kudu и Hudi

5.1. Какие сценарии подходят для использования Куду?

Сценарии, подходящие для Куду, включают:

  1. 1. Предлагает как OLTP, так и OLAP. Запросизсцена
  2. 2. Случайные запросы с высокой степенью параллелизма, особенно запросы, которые запрашивают данные, записанные немедленно.
  3. 3. Используется для горячих данных, нет необходимости учитывать проблему потери данных.
  4. 4. Простая разработка хранилища данных для самостоятельного использования.
  5. 5. Обычно используется деловыми людьми, которые в значительной степени полагаются на SQL.

5.2. Какие сценарии подходят для использования Hudi?

Сценарии, подходящие для Hudi, включают:

  1. 1. Обратите внимание навыполнить Эластичный сцена расчета и безопасности данных
  2. 2. сцена склада умного озера
  3. 3. Большое количество использования инкрементных запросов, например, более сложное хранилище данных в реальном времени.
  4. 4. Сохранение данных в хранилище объектов (например, S3) начальство, проверка сцены между несколькими компонентами службы
  5. 5. использовать Разработка основных стеков технологий с открытым исходным кодомсцена

5.3. Можно ли развернуть Kudu непосредственно на EMR?

Версии Impala и Kudu, созданные сообществом, можно развернуть непосредственно на EMR. но нетрекомендоватьсделай это,Это не только увеличивает объем работ по эксплуатации и техническому обслуживанию.,Это также повлияет на автоматическое расширение и сжатие узлов EMR.

5.4. Версия Hudi, используемая в EMR.

Пакет jar, зависящий от Hudi, представлен в EMR, его версию можно найти по адресу https://docs.aws.amazon.com/emr/latest/ReleaseGuide/Hudi-release-history.html , Вообще говоря, версия Hudi, поддерживаемая в EMR, будет немного позже версии сообщества. Многие разработчики предпочитают использовать версию Hudi сообщества в EMR. В EMR это не так. 6.5.0 Раньше проблем не былоиз。послеизEMRВерсия,Изменен интерфейс Spark, работающего с классом PartitionedFile.,Вызывает несовместимость с версией сообщества Hudi.,Таким образом, Hudi от EMR использует пакет Jar.,Вместо указания версии сообщества Hudi через –packages.

5.5. Обобщить преимущества миграции.

В целом переход с Kudu на CDH на Hudi на EMR имеет следующие преимущества:

  1. 1. Cloud Native: разделение хранилища и вычислений, эластичное масштабирование, оптимизация затрат.
  2. 2. Стабильность: управляемые услуги, высокая доступность, сокращение операций и обслуживания.
  3. 3. Открытость: компоненты сообщества с открытым исходным кодом.

Справочная документация:

https://hudi.apache.org/docs/indexing

https://kudu.apache.org/docs/security.html

https://kudu.apache.org/docs/known_issues.html

https://www.cloudera.com/content/dam/www/marketing/resources/webinars/apache-kudu-technical-deep-dive.png.landing.html

https://blog.cloudera.com/how-to-use-impala-with-kudu/

https://docs.aws.amazon.com/emr/latest/ReleaseGuide/Hudi-release-history.html

https://parquet.apache.org/

https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-hudi.html

https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-presto.html

boy illustration
Углубленный анализ переполнения памяти CUDA: OutOfMemoryError: CUDA не хватает памяти. Попыталась выделить 3,21 Ги Б (GPU 0; всего 8,00 Ги Б).
boy illustration
[Решено] ошибка установки conda. Среда решения: не удалось выполнить первоначальное зависание. Повторная попытка с помощью файла (графическое руководство).
boy illustration
Прочитайте нейросетевую модель Трансформера в одной статье
boy illustration
.ART Теплые зимние предложения уже открыты
boy illustration
Сравнительная таблица описания кодов ошибок Amap
boy illustration
Уведомление о последних правилах Points Mall в декабре 2022 года.
boy illustration
Даже новички могут быстро приступить к работе с легким сервером приложений.
boy illustration
Взгляд на RSAC 2024|Защита конфиденциальности в эпоху больших моделей
boy illustration
Вы используете ИИ каждый день и до сих пор не знаете, как ИИ дает обратную связь? Одна статья для понимания реализации в коде Python общих функций потерь генеративных моделей + анализ принципов расчета.
boy illustration
Используйте (внутренний) почтовый ящик для образовательных учреждений, чтобы использовать Microsoft Family Bucket (1T дискового пространства на одном диске и версию Office 365 для образовательных учреждений)
boy illustration
Руководство по началу работы с оперативным проектом (7) Практическое сочетание оперативного письма — оперативного письма на основе интеллектуальной системы вопросов и ответов службы поддержки клиентов
boy illustration
[docker] Версия сервера «Чтение 3» — создайте свою собственную программу чтения веб-текста
boy illustration
Обзор Cloud-init и этапы создания в рамках PVE
boy illustration
Корпоративные пользователи используют пакет регистрационных ресурсов для регистрации ICP для веб-сайта и активации оплаты WeChat H5 (с кодом платежного узла версии API V3)
boy illustration
Подробное объяснение таких показателей производительности с высоким уровнем параллелизма, как QPS, TPS, RT и пропускная способность.
boy illustration
Удачи в конкурсе Python Essay Challenge, станьте первым, кто испытает новую функцию сообщества [Запускать блоки кода онлайн] и выиграйте множество изысканных подарков!
boy illustration
[Техническая посадка травы] Кровавая рвота и отделка позволяют вам необычным образом ощипывать гусиные перья! Не распространяйте информацию! ! !
boy illustration
[Официальное ограниченное по времени мероприятие] Сейчас ноябрь, напишите и получите приз
boy illustration
Прочтите это в одной статье: Учебник для няни по созданию сервера Huanshou Parlu на базе CVM-сервера.
boy illustration
Cloud Native | Что такое CRD (настраиваемые определения ресурсов) в K8s?
boy illustration
Как использовать Cloudflare CDN для настройки узла (CF самостоятельно выбирает IP) Гонконг, Китай/Азия узел/сводка и рекомендации внутреннего высокоскоростного IP-сегмента
boy illustration
Дополнительные правила вознаграждения амбассадоров акции в марте 2023 г.
boy illustration
Можно ли открыть частный сервер Phantom Beast Palu одним щелчком мыши? Супер простой урок для начинающих! (Прилагается метод обновления сервера)
boy illustration
[Играйте с Phantom Beast Palu] Обновите игровой сервер Phantom Beast Pallu одним щелчком мыши
boy illustration
Maotouhu делится: последний доступный внутри страны адрес склада исходного образа Docker 2024 года (обновлено 1 декабря)
boy illustration
Кодирование Base64 в MultipartFile
boy illustration
5 точек расширения SpringBoot, супер практично!
boy illustration
Глубокое понимание сопоставления индексов Elasticsearch.
boy illustration
15 рекомендуемых платформ разработки с нулевым кодом корпоративного уровня. Всегда найдется та, которая вам понравится.
boy illustration
Аннотация EasyExcel позволяет экспортировать с сохранением двух десятичных знаков.