Содержание этой статьи:
1. Обзор настройки 2. Явление, когда возникает перекос данных 3. Принцип перекоса данных 4. Как найти код, вызывающий искажение данных 5. Выполнение определенной задачи происходит особенно медленно. 6. Память определенной задачи необъяснимым образом переполняется. 7. Проверьте распределение данных по ключам, вызывающим искажение данных. 8. Решения проблемы искажения данных:
Иногда мы можем столкнуться с одной из самых сложных проблем при обработке больших данных — неравномерностью данных. В это время производительность заданий Spark будет намного хуже, чем ожидалось. Настройка перекоса данных заключается в использовании различных технических решений для решения различных типов проблем перекоса данных, чтобы обеспечить производительность заданий Spark.
Большинство задач выполняются очень быстро, но отдельные задачи выполняются крайне медленно. Например, всего задач 1000, и все 997 задач выполняются в течение одной минуты, а оставшиеся две или три задачи занимают час или два. Эта ситуация очень распространена. Задание Spark, которое изначально выполнялось нормально, однажды внезапно сообщило об исключении OOM (недостаточно памяти). Судя по стеку исключений, оно было вызвано написанным нами бизнес-кодом. Такая ситуация относительно редка.
Принцип асимметрии данных прост:существоватьруководитьshuffleкогда,Один и тот же ключ на каждом узле необходимо подтянуть к задаче на определенном узле для обработки,Например, выполните операции агрегации или соединения на основе ключа. В это время, если объем данных, соответствующий определенному ключу, особенно велик,,Произойдет перекос данных. Например, большинство ключей соответствуют 10 фрагментам данных.,Однако отдельные ключи соответствуют 1 миллиону фрагментов данных.,Тогда большинству задач может быть выделено только 10 фрагментов данных.,Тогда он будет запущен за 1 секунду, но отдельным задачам может быть выделено 1 миллион данных;,Бег занимает час или два. поэтому,Ход выполнения всего задания Spark определяется задачей с наибольшим временем выполнения.
Таким образом, при возникновении неравномерности данных задание Spark будет выполняться очень медленно и может даже вызвать переполнение памяти, поскольку объем данных, обрабатываемых определенной задачей, слишком велик.
На рисунке ниже показан очень наглядный пример: ключ hello соответствует в общей сложности 7 фрагментам данных на трех узлах, и эти данные будут задействованы в одной задаче для обработки, в то время как два ключа world и you соответствуют друг другу соответственно; . 1 фрагмент данных, поэтому двум другим задачам необходимо обрабатывать только 1 фрагмент данных соответственно. При этом время выполнения первой задачи может быть в 7 раз больше, чем у двух других задач, а скорость работы всего этапа также определяется самой медленной задачей.
Рассогласование данных происходит только во время процесса перемешивания. Вот список некоторых часто используемых операторов, которые могут запускать операции перемешивания: Different, groupByKey, уменьшитьByKey,агрегатByKey, join, cogroup, перераспределение и т. д. Неравномерность данных может быть вызвана использованием одного из этих операторов в вашем коде.
Первое, на что стоит обратить внимание,Это этап, на котором происходит перекос данных. Если отправлено в режиме Yarn-client,Затем вы можете напрямую просмотреть журнал локально.,В журнале можно узнать, какой этап выполняется в данный момент.,Вы можете прочитать то, что я написал ранееHiveПерекос данных:Hive Обнаружение, устранение и устранение проблем с неравномерностью данных
Если вы отправляете заявку в режиме кластера пряжи, вы можете использовать веб-интерфейс Spark, чтобы проверить, какой этап выполняется в данный момент.
Кроме того, независимо от того, используется ли режим пряжи-клиента или режим пряжи-кластера, мы можем подробно изучить объем данных, выделенных для каждой задачи на текущем этапе в веб-интерфейсе Spark, чтобы дополнительно определить, является ли неравномерное распределение данных задач вызвало наклон данных.
Например, на рисунке ниже в предпоследнем столбце показано время выполнения каждой задачи. Хорошо видно, что некоторые задачи выполняются очень быстро и занимают всего несколько секунд; в то время как некоторые задачи выполняются очень медленно и занимают несколько минут. В этом случае это можно определить только по времени выполнения. перекошенный. Кроме того, в предпоследнем столбце показан объем данных, обработанных каждой задачей. Хорошо видно, что задаче с особенно коротким временем выполнения требуется обработать всего несколько сотен КБ данных, а задаче с особенно длительным временем выполнения. необходимо обработать несколько тысяч КБ данных, объем обрабатываемых данных в 10 раз хуже. На данный момент более вероятно, что произошел перекос данных.
Зная, на каком этапе происходит перекос данных, нам нужно вычислить, какая часть кода соответствует этапу, на котором происходит перекос, исходя из принципа разделения этапов. В этой части кода обязательно будет оператор перемешивания.
Точно рассчитать соответствующую взаимосвязь между этапами и кодами,Требуется глубокое понимание исходного кода Spark.,Здесь мы можем представить относительно простой и практичный метод расчета.:Пока вы видите оператор перемешивания или Spark Существуют операторы, которые вызывают перемешивание операторов SQL (например, group по утверждению), то можно определить, что передняя и задняя стадии разделены, используя это место в качестве границы.。Здесь мы возьмемSparkСамая базовая программа начального уровня——Количество слов в качестве примера,Как с помощью простейшего метода примерно рассчитать код, соответствующий этапу. Пример, как показано ниже,по всему коду,Существует только один оператор уменьшенияByKey, который вызывает перемешивание.,Поэтому его можно считать,Возьмите этот оператор как предел,Он будет разделен на два этапа: передний и задний.
val conf = new SparkConf()
val sc = new SparkContext(conf)
val lines = sc.textFile("hdfs://...")
val words = lines.flatMap(_.split(" "))
val pairs = words.map((_, 1))
val wordCounts = pairs.reduceByKey(_ + _)
wordCounts.collect().foreach(println(_))
Я надеюсь, что посредством анализа программы подсчета слов каждый сможет понять самый основной принцип разделения этапов и то, как выполняется операция перемешивания на границе двух этапов после разделения этапов. Тогда мы знаем, как быстро определить, какая часть кода соответствует этапу, на котором происходит перекос данных. Например, в веб-интерфейсе Spark или в локальном журнале мы обнаруживаем, что определенные задачи на этапе 1 выполняются очень медленно, и определяем, что на этапе 1 имеется неравномерность данных. Затем мы можем вернуться к коду и обнаружить, что этап 1 в основном включает в себя оператор перемешивания уменьшитьByKey. На этом этапе можно в принципе определить, что проблема с искажением данных вызвана оператором сокращениеByKey. Например, если одно слово встречается 1 миллион раз, а другие слова встречаются только 10 раз, то задача на этапе 1 обработает 1 миллион данных, и эта задача замедлит скорость всего этапа.
В этом случае легче найти проблемный код. Мы рекомендуем напрямую просматривать стек исключений в локальном журнале в режиме клиента Yarn или просматривать стек исключений в журнале в режиме кластера пряжи через YARN. Вообще говоря, информацию стека исключений можно использовать для определения того, какая строка вашего кода имеет переполнение памяти. Затем просмотрите эту строку кода. Обычно там присутствует оператор перемешивания. Вероятно, этот оператор вызывает искажение данных. Однако всем следует отметить, что перекос в данных не может быть обусловлен просто случайным переполнением памяти. Из-за ошибок в написанном вами коде и случайных аномалий данных также может произойти переполнение памяти. Таким образом, вам все равно необходимо следовать методу, упомянутому выше, и проверять время выполнения каждой задачи и объем выделенных данных на этапе, когда об ошибке сообщалось через веб-интерфейс Spark, чтобы определить, вызвано ли переполнение памяти неравномерностью данных.
Узнав, где происходит перекос данных, обычно необходимо проанализировать таблицу RDD/Hive, которая выполнила операцию перемешивания и вызвала перекос данных, и проверить распределение ключей в ней. В основном это необходимо для того, чтобы обеспечить основу для выбора технического решения в дальнейшем. В различных ситуациях, когда комбинируются разные распределения ключей и разные операторы тасования, для решения проблемы может потребоваться выбор разных технических решений. На данный момент, в зависимости от ситуации, в которой вы выполняете операцию, существует множество способов просмотра распределения ключей:
Например, для упомянутой выше программы подсчета слов, если определено, что оператор уменьшитьByKey на этапе 1 вызывает искажение данных, вам следует посмотреть распределение ключей в RDD, где выполняется операция уменьшенияByKey. В этом примере она относится к. пары. В следующем примере мы можем сначала выбрать 10% выборочных данных для пар, затем использовать оператор countByKey для подсчета количества вхождений каждого ключа и, наконец, просмотреть и распечатать количество вхождений каждого ключа в выборочных данных на клиент.
val sampledPairs = pairs.sample(false, 0.1)
val sampledWordCounts = sampledPairs.countByKey()
sampledWordCounts.foreach(println(_))
Применимые сценарии решения:导致Перекос данных的是Hiveповерхность。ЕслиHiveповерхность中的数据本身很不均匀(Например, определенныйkeyПереписка100Тысячи данных,Остальные ключи соответствуют только 10 фрагментам данных),А бизнес-сценарии требуют частого использования Spark для выполнения определенных операций анализа над таблицами Hive.,Тогда целесообразнее использовать это техническое решение.
Идеи реализации схемы:На этом этапе вы можете оценить,Можно ли пройтиHive来руководить数据预处理(то есть пройтиHive ETL заранее агрегирует данные в соответствии с ключом или заранее объединяет их с другими таблицами), а затем источником данных, целевым в задании Spark, является не исходная таблица Hive, а предварительно обработанная таблица Hive. В настоящее время, поскольку данные были агрегированы или объединены заранее, нет необходимости использовать исходный оператор перемешивания для выполнения таких операций в задании Spark.
Принцип реализации решения:这种方案从根源上解决Понятно Перекос данных,Потому что он полностью позволяет избежать выполнения операторов перемешивания в Spark.,Тогда точно не будет проблемы перекоса данных. Но я также хочу напомнить всем здесь,Такой подход лечит симптомы, а не первопричину. Потому что, в конце концов, сами данные имеют проблему неравномерного распределения.,такHive Группировка в ETL Во время операций перемешивания, таких как by или join, асимметрия данных по-прежнему будет происходить, что приводит к тому, что Hive ETL работает очень медленно. Мы просто заранее перенесли возникновение неравномерности данных в Hive. В ETL это делается просто для того, чтобы избежать искажения данных в программе Spark.
Преимущества решения:Просто и удобно реализовать,Эффект очень хороший,Полностью исключает искажение данных,Производительность заданий Spark будет значительно улучшена.
Недостатки решения:Борьба с симптомами, а не с первопричиной,Hive В ETL по-прежнему происходит перекос данных.
Практический опыт программы:в некоторыхJavaСистема иSpark结合использовать的项目中,Будут сценарии, в которых код Java часто вызывает задания Spark.,Более того, производительность выполнения заданий Spark очень требовательна.,就比较适合использовать这种方案。Воля Перекос данных提前到上游的Hive ETL выполняется только один раз в день, и только этот раз относительно медленно. После этого каждый раз, когда Java вызывает задание Spark, скорость выполнения будет очень высокой, что может обеспечить лучшее взаимодействие с пользователем.
Практический опыт проекта:В Мэйтуане·点评的交互式用户行为分析系统中использовать Понятно这种方案,Система в основном позволяет пользователям использовать Java Веб-система отправляет задачи анализа данных и статистики, а серверная часть отправляет задания Spark через Java для анализа данных и статистики. Задания Spark должны выполняться быстро и стараться выполнять их в течение 10 минут. В противном случае скорость будет слишком низкой, и взаимодействие с пользователем будет плохим. Поэтому мы заранее перенесли операции перемешивания некоторых заданий Spark в Hive. В ETL Spark может напрямую использовать предварительно обработанную промежуточную таблицу Hive, максимально сокращая операции перемешивания Spark, значительно повышая производительность и повышая производительность некоторых заданий более чем в 6 раз.
Применимые сценарии решения:Если установлено, что причина наклонаkeyВсего несколько,И если на сам расчет это не оказывает большого влияния,,Поэтому использование этого решения очень удобно. Например, 99% ключей соответствуют 10 фрагментам данных.,Но только один ключ соответствует 1 миллиону данных,Это приводит к искажению данных.
Идеи реализации схемы:Если мы оценим тех немногих, у которых есть особенно большой объем данных,key,Если результаты выполнения и расчета задания не имеют особого значения,Затем просто отфильтруйте эти несколько ключей. например,В Искре Вы можете использовать предложениеwhere в SQL для фильтрации этих ключей или в Spark. Ядро выполняет оператор фильтра на RDD, чтобы отфильтровать эти ключи. Если вам нужно динамически определять, какие ключи содержат наибольший объем данных каждый раз при выполнении задания, а затем фильтровать его, вы можете использовать оператор выборки для выборки RDD, затем вычислить количество каждого ключа и отфильтровать ключ с помощью самый большой объем данных.
Принцип реализации решения:Воля导致Перекос данных的keyПосле фильтрации,Эти ключи не будут участвовать в расчете.,Естественно, невозможно создать искажение данных.
Преимущества решения:Просто реализовать,И эффект тоже очень хороший,Искажения данных можно полностью избежать.
Недостатки решения:Не так много применимых сценариев,большую часть времени,Есть еще много клавиш, вызывающих перекосы,Это не просто несколько человек.
Практический опыт программы:существовать项目中我们也采用过这种方案解决Перекос данных。Однажды я обнаружил, что однаждыSpark作业существовать运行когда突然OOMПонятно,После расследования было обнаружено,Это определенный ключ в таблице Hive, который содержит аномальные данные в этот день.,Это приводит к резкому увеличению объема данных. Таким образом, выборка выполняется перед каждым выполнением.,После расчета ключей с наибольшим объемом данных в выборке,,Отфильтруйте эти ключи прямо в программе.
Применимые сценарии решения:如果我们必须要верно Перекос данных迎难而上,Тогда рекомендуется отдать приоритет этому решению.,Потому что это самое простое решение проблемы неравномерности данных.
Идеи реализации схемы:существоватьверноRDDосуществлятьshuffleВремя оператора,Передайте параметр оператору перемешивания,Например, уменьшитьByKey(1000),该参数就设置Понятно这个shuffle算子осуществлять时shuffle read Количество задач. Для Искры Операторы перемешивания в SQL, такие как группа by, join и т. д., вам необходимо установить параметр, а именно spark.sql.shuffle.partitions, который представляет перетасовку read Параллелизм задач, значение по умолчанию — 200, что слишком мало для многих сценариев.
Принцип реализации решения:Увеличиватьshuffle read Количество задач позволяет назначать несколько клавиш, изначально назначенных одной задаче, нескольким задачам, что позволяет каждой задаче обрабатывать меньше данных, чем изначально. Например, если изначально имеется 5 ключей, каждый ключ соответствует 10 фрагментам данных, и эти 5 ключей назначены задаче, то эта задача будет обрабатывать 50 фрагментов данных. И добавил перемешивание read После задачи каждой задаче присваивается ключ, то есть каждая задача обрабатывает 10 фрагментов данных, поэтому, естественно, время выполнения каждой задачи будет сокращено. Конкретный принцип показан на рисунке ниже.
Преимущества решения:Это относительно просто реализовать,Это может эффективно смягчить и смягчить влияние неравномерности данных.
Недостатки решения:只是缓解Понятно Перекос данных而已,Полностью проблему не устранил,На основе практического опыта,Его эффект ограничен.
Практический опыт программы:该方案通常无法彻底解决Перекос данных,Потому что если возникнут какие-то экстремальные ситуации,Например, объем данных, соответствующий определенному ключу, равен 1 миллиону.,Тогда независимо от того, насколько увеличится количество ваших задач,Этот ключ, соответствующий 1 миллиону данных, обязательно будет назначен задаче на обработку.,Таким образом, неизбежно произойдет искажение данных. Таким образом, это решение можно назвать только первым методом, используемым при обнаружении асимметрии данных.,Просто попробуйте использовать простые методы, чтобы уменьшить искажение данных.,Или его можно использовать в сочетании с другими решениями.
Применимые сценарии решения:верноRDDосуществлятьreduceByKeyКласс агрегацииshuffle算子或者В Искре Использование групп в SQL Это решение больше подходит, когда оператор by используется для агрегации групп.
Идеи реализации схемы:这个方案的核心实现思路就是руководитьдвухэтапная агрегация。第一次是локальная агрегация,Сначала дайте каждому ключу случайное число,Например, случайное число в пределах 10,В это время исходный ключ становится другим.,например(hello, 1) (hello, 1) (hello, 1) (hello, 1), станет (1_привет, 1) (1_hello, 1) (2_hello, 1) (2_hello, 1). Затем выполните операции агрегирования, такие как сокращение по ключу, для данных после добавления случайных чисел для выполнения локальной агрегации. Тогда результат локальной агрегации станет (1_hello, 2) (2_hello, 2). Затем удалите префикс каждого ключа, и он станет (привет, 2) (привет, 2). Выполните операцию глобальной агрегации еще раз, чтобы получить окончательный результат, например (привет, 2). 4)。
Принцип реализации решения:Воля原本相同的keyДобавляя случайный префикс,Станьте несколькими разными ключами,Это позволяет распределять данные, первоначально обработанные одной задачей, по нескольким задачам для локального агрегирования.,Это решает проблему чрезмерного объема данных, обрабатываемых одной задачей. Затем удалите случайный префикс,Снова глобальная агрегация,Вы можете получить окончательный результат. Конкретный принцип смотрите на рисунке ниже.
Преимущества решения:верно于聚合类的shuffleдействовать导致的Перекос данных,Эффект очень хороший. Неравномерность данных обычно можно устранить.,Или, по крайней мере, значительно уменьшить искажение данных.,Улучшите производительность заданий Spark более чем в несколько раз.
Недостатки решения:Применяется только к агрегированным классамshuffleдействовать,Область применения относительно узкая. Если это операция перемешивания класса соединения,Придется использовать другие решения.
// Первым шагом является присвоение каждому ключу в RDD случайного префикса.
JavaPairRDD<String, Long> randomPrefixRdd = rdd.mapToPair(
new PairFunction<Tuple2<Long,Long>, String, Long>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<String, Long> call(Tuple2<Long, Long> tuple)
throws Exception {
Random random = new Random();
int prefix = random.nextInt(10);
return new Tuple2<String, Long>(prefix + "_" + tuple._1, tuple._2);
}
});
// Второй шаг — выполнить частичную агрегацию ключей со случайными префиксами.
JavaPairRDD<String, Long> localAggrRdd = randomPrefixRdd.reduceByKey(
new Function2<Long, Long, Long>() {
private static final long serialVersionUID = 1L;
@Override
public Long call(Long v1, Long v2) throws Exception {
return v1 + v2;
}
});
// Третий шаг — удалить случайный префикс каждого ключа в RDD.
JavaPairRDD<Long, Long> removedRandomPrefixRdd = localAggrRdd.mapToPair(
new PairFunction<Tuple2<String,Long>, Long, Long>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<Long, Long> call(Tuple2<String, Long> tuple)
throws Exception {
long originalKey = Long.valueOf(tuple._1.split("_")[1]);
return new Tuple2<Long, Long>(originalKey, tuple._2);
}
});
// Четвертый шаг — выполнить глобальную агрегацию на RDD с удалением случайных префиксов.
JavaPairRDD<Long, Long> globalAggrRdd = removedRandomPrefixRdd.reduceByKey(
new Function2<Long, Long, Long>() {
private static final long serialVersionUID = 1L;
@Override
public Long call(Long v1, Long v2) throws Exception {
return v1 + v2;
}
});
Применимые сценарии решения:существоватьверноRDDиспользоватьjoin类действовать,或者是В Искре Если в SQL используется оператор соединения, а объем данных в RDD или таблице в операции соединения относительно невелик (например, несколько сотен M или один или два G), это решение является более подходящим.
Идеи реализации схемы:不использоватьjoin算子руководить连接действовать,И используйте широковещательные переменные и операторы отображения для реализации операций соединения.,Это полностью позволяет избежать операций перемешивания.,Полностью избегать возникновения и появления перекоса данных. Перенесите данные из меньшего RDD непосредственно в память стороны драйвера через оператор сбора.,Затем создайте для него переменную Broadcast, а затем выполните оператор карты на другом RDD;,внутри операторной функции,Получите полные данные меньшего RDD из переменной Broadcast.,Сравните каждый фрагмент данных с текущим СДР в соответствии с ключом подключения.,Если ключи подключения совпадают,Затем соедините данные двух RDD нужным вам способом.
Принцип реализации решения:ОбычныйjoinДа, могу ходитьshuffleпроцедурный,И однажды перетасовать,就相当于会Воля相同keyДанные переносятся вshuffle read Соединение выполняется снова в задаче, которая в данный момент сокращается. присоединиться. Но если RDD относительно небольшой, вы можете использовать широковещательную рассылку всех данных небольшого оператора RDD + Map, чтобы добиться того же эффекта, что и соединение, то есть Map join,В это время операция перемешивания не будет выполняться.,也就不会发生Перекос данных。Конкретный принцип показан на рисунке ниже.。Преимущества решения:верноjoinдействовать导致的Перекос данных,Эффект очень хороший,Потому что тасования вообще не произойдет,也就根本不会发生Перекос данных。Недостатки решения:Несколько применимых сценариев,Потому что это решение применимо только к одной большой таблице и одной маленькой таблице. Ведь нам нужно транслировать небольшую таблицу,Это потребует больше ресурсов памяти.,Полные данные небольшого RDD будут храниться в памяти драйвера и каждого исполнителя. Если передаваемые нами данные RDD относительно велики,,Например, 10G или более.,Тогда может произойти переполнение памяти. Поэтому он не подходит для ситуаций, когда обе таблицы большие.
// Сначала соберите данные RDD с относительно небольшим объемом данных в драйвере.
List<Tuple2<Long, Row>> rdd1Data = rdd1.collect()
// Затем используйте функцию широковещания Spark для преобразования небольших данных RDD в широковещательные переменные, чтобы у каждого исполнителя была только одна копия данных RDD.
// Это может максимально сэкономить место в памяти и снизить накладные расходы на производительность передачи по сети.
final Broadcast<List<Tuple2<Long, Row>>> rdd1DataBroadcast = sc.broadcast(rdd1Data);
// Выполните операцию сопоставления с другим RDD вместо операции соединения.
JavaPairRDD<String, Tuple2<String, Row>> joinedRdd = rdd2.mapToPair(
new PairFunction<Tuple2<Long,String>, String, Tuple2<String, Row>>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<String, Tuple2<String, Row>> call(Tuple2<Long, String> tuple)
throws Exception {
// В операторной функции данные rdd1 в локальном Исполнителе получаются путем широковещательной передачи переменных.
List<Tuple2<Long, Row>> rdd1Data = rdd1DataBroadcast.value();
// Данные rdd1 можно преобразовать в карту для облегчения последующих операций соединения.
Map<Long, Row> rdd1DataMap = new HashMap<Long, Row>();
for(Tuple2<Long, Row> data : rdd1Data) {
rdd1DataMap.put(data._1, data._2);
}
// Получите ключ и значение текущих данных RDD.
String key = tuple._1;
String value = tuple._2;
// Из карты данных rdd1 данные, которые можно объединить, получаются в соответствии с ключом.
Row rdd1Value = rdd1DataMap.get(key);
return new Tuple2<String, String>(key, new Tuple2<String, Row>(value, rdd1Value));
}
});
// Вот напоминание.
// Вышеупомянутый подход применим только к сценарию, где ключи в rdd1 не повторяются и все уникальны.
// Если в rdd1 имеется несколько одинаковых ключей, вам необходимо использовать операцию класса FlatMap. Вы не можете использовать карту при выполнении соединения. Вместо этого вам нужно просмотреть все данные в rdd1, чтобы присоединиться.
// Каждый фрагмент данных в rdd2 может возвращать несколько фрагментов объединенных данных.
Применимые сценарии решения:дваRDD/Hiveповерхностьруководитьjoinкогда,Если объем данных относительно велик,Невозможно принять «Решение пятое».,Затем вы можете взглянуть на распределение ключей в двух таблицах RDD/Hive. Если происходит перекос данных,Это связано с тем, что объем данных нескольких ключей в одной из таблиц RDD/Hive слишком велик.,Все ключи в другой таблице RDD/Hive распределены равномерно.,Тогда целесообразнее принять это решение.
Идеи реализации схемы:
Принцип реализации решения:верно于join导致的Перекос данных,Если только несколько клавиш вызывают перекос,Несколько ключей можно разделить на независимые RDD.,И добавьте случайные префиксы, чтобы разбить их на n частей для объединения.,В настоящее время данные, соответствующие этим ключам, не будут сосредоточены на нескольких задачах.,Вместо этого он распределяется по нескольким задачам для присоединения. Конкретный принцип смотрите на рисунке ниже.
Преимущества решения:верно于join导致的Перекос данных,Если только несколько клавиш вызывают перекос,Этот метод можно использовать для наиболее эффективного разделения ключей для соединения. И ему нужно расширить только n раз для данных, соответствующих нескольким клавишам наклона.,Нет необходимости расширять все данные. Не занимайте слишком много памяти.
Недостатки решения:Если это вызывает наклонkeyОчень много слов,Например, тысячи ключей приведут к искажению данных.,Тогда этот способ тоже не подходит.
// Сначала выберите 10 % выборочных данных из rdd1, который содержит несколько ключей, вызывающих искажение данных.
JavaPairRDD<Long, String> sampledRDD = rdd1.sample(false, 0.1);
// Подсчитайте количество вхождений каждого ключа в выборку данных RDD и отсортируйте их в порядке убывания вхождений.
// Для данных, отсортированных по убыванию, уберите верхнюю часть 1 или топ Данные 100 — это первые n данных с наибольшим количеством ключей.
// Каждый сам решает, сколько ключей с наибольшим объемом данных мы возьмем здесь в качестве примера.
JavaPairRDD<Long, Long> mappedSampledRDD = sampledRDD.mapToPair(
new PairFunction<Tuple2<Long,String>, Long, Long>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<Long, Long> call(Tuple2<Long, String> tuple)
throws Exception {
return new Tuple2<Long, Long>(tuple._1, 1L);
}
});
JavaPairRDD<Long, Long> countedSampledRDD = mappedSampledRDD.reduceByKey(
new Function2<Long, Long, Long>() {
private static final long serialVersionUID = 1L;
@Override
public Long call(Long v1, Long v2) throws Exception {
return v1 + v2;
}
});
JavaPairRDD<Long, Long> reversedSampledRDD = countedSampledRDD.mapToPair(
new PairFunction<Tuple2<Long,Long>, Long, Long>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<Long, Long> call(Tuple2<Long, Long> tuple)
throws Exception {
return new Tuple2<Long, Long>(tuple._2, tuple._1);
}
});
final Long skewedUserid = reversedSampledRDD.sortByKey(false).take(1).get(0)._2;
// Разделите ключ, вызывающий искажение данных, из rdd1, чтобы сформировать независимый RDD.
JavaPairRDD<Long, String> skewedRDD = rdd1.filter(
new Function<Tuple2<Long,String>, Boolean>() {
private static final long serialVersionUID = 1L;
@Override
public Boolean call(Tuple2<Long, String> tuple) throws Exception {
return tuple._1.equals(skewedUserid);
}
});
// Разделите обычные ключи, которые не вызывают искажения данных из rdd1, в независимый RDD.
JavaPairRDD<Long, String> commonRDD = rdd1.filter(
new Function<Tuple2<Long,String>, Boolean>() {
private static final long serialVersionUID = 1L;
@Override
public Boolean call(Tuple2<Long, String> tuple) throws Exception {
return !tuple._1.equals(skewedUserid);
}
});
// Rdd2 — это RDD, в котором все ключи распределены относительно равномерно.
// Здесь данные, соответствующие ранее полученному ключу в rdd2, отфильтровываются, разбиваются на отдельные rdd, а оператор FlatMap используется для расширения данных в rdd в 100 раз.
// Каждому фрагменту данных, который необходимо расширить, присваивается префикс числа от 0 до 100.
JavaPairRDD<String, Row> skewedRdd2 = rdd2.filter(
new Function<Tuple2<Long,Row>, Boolean>() {
private static final long serialVersionUID = 1L;
@Override
public Boolean call(Tuple2<Long, Row> tuple) throws Exception {
return tuple._1.equals(skewedUserid);
}
}).flatMapToPair(new PairFlatMapFunction<Tuple2<Long,Row>, String, Row>() {
private static final long serialVersionUID = 1L;
@Override
public Iterable<Tuple2<String, Row>> call(
Tuple2<Long, Row> tuple) throws Exception {
Random random = new Random();
List<Tuple2<String, Row>> list = new ArrayList<Tuple2<String, Row>>();
for(int i = 0; i < 100; i++) {
list.add(new Tuple2<String, Row>(i + "_" + tuple._1, tuple._2));
}
return list;
}
});
// Разделите независимый rdd, вызывающий искажение ключа, от rdd1 и добавьте случайный префикс в пределах 100 к каждому фрагменту данных.
// Затем соедините независимое разделение rdd из rdd1 с независимым разделением rdd из rdd2 выше.
JavaPairRDD<Long, Tuple2<String, Row>> joinedRDD1 = skewedRDD.mapToPair(
new PairFunction<Tuple2<Long,String>, String, String>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<String, String> call(Tuple2<Long, String> tuple)
throws Exception {
Random random = new Random();
int prefix = random.nextInt(100);
return new Tuple2<String, String>(prefix + "_" + tuple._1, tuple._2);
}
})
.join(skewedUserid2infoRDD)
.mapToPair(new PairFunction<Tuple2<String,Tuple2<String,Row>>, Long, Tuple2<String, Row>>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<Long, Tuple2<String, Row>> call(
Tuple2<String, Tuple2<String, Row>> tuple)
throws Exception {
long key = Long.valueOf(tuple._1.split("_")[1]);
return new Tuple2<Long, Tuple2<String, Row>>(key, tuple._2);
}
});
// Разделите независимый rdd, содержащий обычные ключи, из rdd1 и напрямую соедините его с rdd2.
JavaPairRDD<Long, Tuple2<String, Row>> joinedRDD2 = commonRDD.join(rdd2);
// наклонит ключ Результат после соединения такой же, как и при использовании обычного ключа. Результат после соединения — uinon.
// Это окончательный результат соединения.
JavaPairRDD<Long, Tuple2<String, Row>> joinedRDD = joinedRDD1.union(joinedRDD2);
Применимые сценарии решения:如果существоватьруководитьjoinдействовать时,В RDD имеется большое количество ключей, что приводит к искажению данных.,Тогда нет смысла разбивать ключ.,На этом этапе последнее решение можно использовать только для решения проблемы.
Идеи реализации схемы:
Принцип реализации решения:Воля原先一样的keyСтаньте другим, добавив случайный префиксkey,Затем вы можете распределить эти обработанные «разные ключи» по нескольким задачам обработки.,Вместо того, чтобы одна задача обрабатывала большое количество одинаковых ключей. Отличие этого плана от «Решения шестого» состоит в том, что,Предыдущее решение — попытаться выполнить специальную обработку только данных, соответствующих нескольким ключам перекоса.,Поскольку процесс обработки требует расширения СДР,Таким образом, предыдущее решение не занимает большой объем памяти после расширения RDD, и это решение предназначено для ситуаций, когда имеется большое количество перекошенных ключей.,Невозможно выделить некоторые ключи для отдельной обработки,Таким образом, расширение данных может быть выполнено только на всем СДР.,Высокие требования к ресурсам памяти.
Преимущества решения:верноjoin类型的Перекос данных基本都可以处理,И эффект относительно значительный,Улучшение производительности очень хорошее.
Недостатки решения:该方案更多的是缓解Перекос данных,Вместо того, чтобы полностью избежать искажения данных. И весь РДД надо расширять,Высокие требования к ресурсам памяти.
Практический опыт программы:曾经开发一个数据需求когда,Было обнаружено, что объединение приводило к искажению данных. До оптимизации,Время выполнения задания после оптимизации с использованием данного решения составляет около 60 минут;,Время выполнения сокращено примерно до 10 минут.,Производительность улучшилась в 6 раз.
// Сначала один из RDD с относительно равномерным распределением ключей расширяется в 100 раз.
JavaPairRDD<String, Row> expandedRDD = rdd1.flatMapToPair(
new PairFlatMapFunction<Tuple2<Long,Row>, String, Row>() {
private static final long serialVersionUID = 1L;
@Override
public Iterable<Tuple2<String, Row>> call(Tuple2<Long, Row> tuple)
throws Exception {
List<Tuple2<String, Row>> list = new ArrayList<Tuple2<String, Row>>();
for(int i = 0; i < 100; i++) {
list.add(new Tuple2<String, Row>(0 + "_" + tuple._1, tuple._2));
}
return list;
}
});
// Во-вторых, добавьте еще один RDD с ключом асимметрии данных и добавьте случайный префикс в пределах 100 к каждому фрагменту данных.
JavaPairRDD<String, String> mappedRDD = rdd2.mapToPair(
new PairFunction<Tuple2<Long,String>, String, String>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<String, String> call(Tuple2<Long, String> tuple)
throws Exception {
Random random = new Random();
int prefix = random.nextInt(100);
return new Tuple2<String, String>(prefix + "_" + tuple._1, tuple._2);
}
});
// Просто соедините два обработанных RDD.
JavaPairRDD<String, Tuple2<String, Row>> joinedRDD = mappedRDD.join(expandedRDD);
На практике обнаруживается, что во многих случаях, если вы имеете дело только с относительно простыми сценариями искажения данных, вы можете решить его, используя одно из вышеперечисленных решений. Однако если вы хотите иметь дело с более сложным сценарием неравномерности данных, вам может потребоваться объединить несколько решений. Например, для заданий Spark с несколькими неравномерными связями данных мы можем сначала использовать решения первое и второе для предварительной обработки части данных и фильтрации части данных для ее устранения, во-вторых, мы можем улучшить параллелизм некоторых операций перемешивания и оптимизировать их; Производительность; наконец, вы можете выбрать решение для оптимизации производительности для различных операций агрегации или соединения. Каждому необходимо хорошо понимать идеи и принципы этих решений, а затем гибко использовать различные решения для решения собственных проблем перекоса данных в соответствии с различными практическими ситуациями.