MapReduce — это модель программирования (без концепции кластера задачи будут передаваться для запуска в кластер Yarn), используемая для параллельных операций с крупномасштабными наборами данных (более 1 ТБ). Понятия «Карта» и «Сокращение», являющиеся их основными идеями, заимствованы из функциональных языков программирования, а также функции, заимствованные из языков векторного программирования. Это значительно облегчает программистам запуск собственных программ в распределенных системах без знания распределенного параллельного программирования.
Текущая реализация программного обеспечения определяет функцию Map для сопоставления набора пар ключ-значение с новым набором пар ключ-значение и определяет параллельную функцию уменьшения, чтобы гарантировать, что все сопоставленные пары ключ-значение имеют один и тот же набор. ключей. (MapReduce практически больше не используется на предприятиях, просто разберитесь в этом немного).
Описание: В чем MapReduce не хорош (медленно!)
Сейчас MapReduce постепенно заменяется такими фреймворками, как Spark и Flink. Но идеи важны и заслуживают изучения. Подробнее о больших данных Hadoopрядиз Учебные статьи,Видеть:Атака на серию больших данных,Эта серия постоянно обновляется.
Примечание. В принципе, MapReduce делится на два этапа: Задача сопоставления и Задача сокращения. Однако, поскольку этап перетасовки очень важен, этот этап искусственно разделен между задачами сопоставления и задачами сокращения. понимается как вторая половина сегмента задачи «Карта» и первая половина задачи «Сокращение».
Тогда должно быть что-то, чтобы решить, как его разделить, а именно InputFormat, а размер разделения обычно определяется размером блока HDFS.
По поводу четвертого пункта: Например, если есть 3 файла, один 300М, второй 50М, третий 50М, то всего будет вырезано 5 MapTasks.
Для каждого файла первые 300 МБ были разрезаны на 3 файла, вторые 50 МБ были разрезаны на один файл, а третьи 50 МБ были разрезаны на один файл, всего получилось 5 файлов.
А если есть только один файл размером 128М+1КБ,Тогда будет разделен только один,Поскольку правило оценки нарезки->Если файл меньше размера фрагмента1.1раз,Сразуи上один切片将Сразу放существовать一起了,Это предотвратит слишком малыйиз Срез в действииизкогда,Планирование ресурсовиз Время превышает время выполненияиз Состояние。
Процесс выполнения задания в основном включает в себя следующие этапы:
Блок-схема процесса выполнения конкретного задания показана ниже:
существоватьMRиз Вызов по кодуwaitForCompletion()
метод,Инкапсулированный внутриJob.submit()
метод,иJob.submit()
метод里面会创建одинJobSubmmiterобъект。当我们существоватьwaitForCompletion(true)
час,ноwaitForCompletion
метод会每秒轮询Операцияиз Ход выполнения,Если вы обнаружили разницу между статусом из и последним запросом,затем распечатайте детали на консоли. Если задание выполнено успешно,Показать счетчик вакансий,否но将导致Операция失败из Записыватьвыходутешать。
1. Когда диспетчер ресурсов вызывается через метод submitApplication, запрос передается планировщику пряжи, а затем планировщик выделяет контейнер (container0) в диспетчере узлов для запуска приложения. мастер-процесс (основной класс — MRAppMaster). После запуска процесса он зарегистрируется у менеджера ресурсов и сообщит свою собственную информацию. Мастер также может отслеживать текущее состояние карты и уменьшать ее. Поэтому применение masterверно Инициализация задания Создав несколько книгобъект以保持верно Операция进度изотслеживать。
2. Мастер приложения получает файлы ресурсов, jar-файлы, информацию о сегментировании, информацию о конфигурации и т. д. во временном общем каталоге HDFS при отправке задания. И создайте объект карты для каждого шарда, а количество редюсеров определите через параметр mapreduce.job.reduces (задание задается через метод setNumReduceTasks()).
3. Мастер приложения определит, использовать ли режим uber (задание и мастер приложения выполняются на одной и той же JVM, то есть Maptask и Reductask выполняются на одном и том же узле) для запуска задания. Условия работы режима uber: количество карт меньше 10, 1 сокращение, а входные данные меньше одного блока HDFS.
Вы можете передать параметры:
mapreduce.job.ubertask.enable #Включить ли режим uber
mapreduce.job.ubertask.maxmaps #Максимальное количество карт для ubertask
mapreduce.job.ubertask.maxreduces #Максимальное количество редюсеров для ubertask
mapreduce.job.ubertask.maxbytes #ubertask максимальный размер задания
4. Мастер приложения вызывает метод setupJob, чтобы установить для OutputCommiter и FileOutputCommiter значение по умолчанию, что означает создание конечного выходного каталога и временной рабочей области для вывода задач.
Подробнее о больших данных Hadoopрядиз Учебные статьи,Видеть:Атака на серию больших данных,Эта серия постоянно обновляется.
1. Когда мастер приложения определяет, что задание не соответствует режиму uber, мастер приложения подает заявку на контейнеры ресурсов для задач карты и сокращения от менеджера ресурсов.
2. Первым шагом является выдача запроса на приложение ресурсов для задачи карты. Когда 5% задач карты будут выполнены, будет сделан запрос на ресурсы, необходимые для задачи сокращения.
3. В процессе распределения задачи задача сокращения может выполняться на любом узле узла данных, но при выполнении задачи сопоставления необходимо учитывать механизм локализации данных. При указании ресурсов для задачи каждая карта и сокращение по умолчанию имеют объем памяти 1 ГБ. который можно настроить с помощью следующих параметров:
mapreduce.map.memory.mb
mapreduce.map.cpu.vcores
mapreduce.reduce.memory.mb
mapreduce.reduce.cpu.vcores
После того как мастер приложения отправляет приложение, менеджер ресурсов выделяет ресурсы по требованию. В это время мастер приложения связывается с менеджером узлов для запуска контейнера. Эту задачу выполняет Java-приложение основного класса YarnChild. Прежде чем запускать задачу, сначала локализуйте необходимые ресурсы, включая конфигурацию задания, jar-файлы и т. д. Следующий шаг — запустить карту и сократить задачи. YarnChild работает в отдельной JVM.
Каждое задание и каждая его задача имеет статус: статус задания или задачи (выполняется, успешно, не удалось и т. д.), ход выполнения карты и сокращения, значение счетчика заданий, сообщение о состоянии или описание при Когда задание выполняется, клиент может напрямую взаимодействовать с мастером приложения и каждую секунду опрашивать статус выполнения, ход выполнения и другую информацию о задании (можно установить с помощью параметра mapreduce.client.progressmonitor.pollinterval).
Mapreduce гарантирует, что входные данные каждого сокращения сортируются в соответствии со значением ключа. Система выполняет сортировку и использует входные данные карты в качестве входных данных сокращения. Этот процесс называется процессом перемешивания. Перемешивание также является ключевой частью нашей оптимизации. Блок-схема перемешивания показана ниже:
Меньше размера осколка*1,1
)создатьmapОперация,Затем настройтеизmapметод进行自定义излогический расчет,После завершения расчета он будет записан на локальный диск.。mapreduce.map.sort.spill.percent
Модификация элемента конфигурации),Будет запущен поток сброса содержимого буфера памяти на диск (spill to disk),Этот поток переполнения независим,не влияетmap向буфер写результатизнить,существовать溢写приезжать磁盘изпроцесссередина,карта продолжает вводиться в буфер,Если буфер заполнен во время,ноmapЗапись будет заблокирована на переполненный диск.процесс Заканчивать。Перезапись осуществляется путем опросаиз方式将буферв Запись памяти в локальныйmapreduce.cluster.local.dir
в каталоге。существовать溢写приезжать磁盘之前,Мы будем знать количество сокращения,Тогда разделы будут разделены по количеству сокращаемых,По умолчанию на основеhashpartitionверно溢写изданные写入приезжать相верно应из Раздел。существоватькаждый Разделсередина,Фоновый поток будет сортироваться по ключу,Итак, переполнение записывается на дискиздокумент是Раздел且排序из。если естьcombinerфункция,它существовать排序后извыходбегать,делатьmapвыходболее компактный。Уменьшить запись на дискизданныеипередано вreduceизданные。mapreduce.task.io.sort.factor
контроль每次可以合并多少个документ。mapreduce.map.output.compress.codec
Контроль параметров。mapreduce.reduce.shuffle.parallelcopies
контроль。mapreduce.reduce.shuffle.input.buffer.percent
обозначение。один разReducer所существовать节点из内存буфер达приезжать阀值,Или количество файлов в буфере достигает порога,Переполнение слияния записывается на диск.Подробнее о больших данных Hadoopрядиз Учебные статьи,Видеть:Атака на серию больших данных,Эта серия постоянно обновляется.
Wordcount, который подсчитывает количество вхождений каждого слова в большом количестве файлов, часто используется в качестве вводного примера MapReduce. Основной процесс обработки выглядит следующим образом:
MapReduce делит весь процесс выполнения задания на два этапа: этап карты и этап сокращения.
Mapper отвечает за «разделение», то есть разложение сложных задач на несколько «простых задач» для обработки. «Простая задача» имеет три значения:
Фаза карты состоит из определенного количества задач карты и включает в себя следующие шаги:
Редуктор отвечает за обобщение результатов этапа карты.
Фаза сокращения состоит из определенного количества задач сокращения и включает в себя следующие шаги:
Если взять в качестве примера Wordcount, то внутренний процесс выполнения MapReduce показан на рисунке ниже:
Внешняя физическая структура показана на рисунке ниже:
Комбайнер можно рассматривать как локальный редуктор. После завершения расчета Mapper значения, соответствующие одному и тому же ключу, объединяются (пример Wordcount), как показано на следующем рисунке:
Комбайнер обычно имеет ту же логику, что и Редуктор. Использование Комбайнера имеет следующие преимущества:
Следует отметить, что не все сценарии MapReduce могут использовать Объединитель. Обычно можно использовать сценарии, в которых можно накапливать результаты вычислений, например Sum. Другие сценарии, например усреднение, не могут использовать Объединитель.
<property>
<name>fs.defaultFS</name>
<value>hdfs://localhost:9000</value>
</property>
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
<property>
<name>dfs.namenode.name.dir</name>
<value>D:\software\hadoop\data\namenode</value>
</property>
<property>
<name>dfs.datanode.data.dir</name>
<value>D:\software\hadoop\data\datanode</value>
</property>
<!-- mapred-site.xml-->
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
<property>
<name>yarn.nodemanager.aux-services.mapreduce.shuffle.class</name>
<value>org.apache.hadoop.mapred.ShuffleHandler</value>
</property>
set JAVA_HOME=C:\PROGRA~1\Java\jdk1.8.0_271
set HADOOP_LOG_DIR=%HADOOP_LOG_DIR%\log
Один MapReduce делится на: Mapper, Редюсер. и Driver。 Несколько MapReduce также могут выполняться последовательно.
1. Этап картографа
(1) Пользовательский Mapper должен наследовать свой собственный родительский класс.
(2) Входные данные Mapper представлены в виде пар KV (тип KV можно настроить).
(3) Бизнес-логика Mapper записана в методе map().
(4) Выходные данные Mapper представлены в виде пар KV (тип KV можно настроить)
(5)map()метод(MapTaskпроцесс)верно每один<K,V>позвони один раз
2. Ступень редуктора
(2) Тип входных данных Редюсера соответствует типу выходных данных Mapper, который также является KV.
(3) Бизнес-логика Редюсера написана в методе сокращения().
(4)ReduceTaskпроцессверно每一组相同kиз<k,v>组позвони один разreduce()метод
3. Водительский этап
Эквивалент клиента кластера YARN, используемый для отправки всей нашей программы в кластер YARN.
Объект задания, инкапсулирующий соответствующие рабочие параметры программы MapReduce.
Описание случая: Прочитайте файл, разбейте его содержимое на слова на основе пробелов и, наконец, выведите количество вхождений каждого слова в отсортированном порядке.
<!-- MAVENСумка-->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
//2. Написание локальных тестов кода mapreduce (два распространенных способа).
//3. Загрузите jar-пакет на сервер для запуска.
hadoop jar apache-hadoop-1.0-SNAPSHOT.jar com.wxl.hadoop.mapReduce.wordCount.WordCountDriver /mapreduce/input/word.txt /mapreduce/output
package com.wxl.hadoop.mapReduce.wordCount;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.util.Date;
public class WordCountDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
///Локальное тестирование, формальную среду необходимо закомментировать//
Date date = new Date();//Убедитесь, что выходной каталог не повторяется
args = new String[]{"D:\\ideawork\\bigdata\\apache-hadoop\\src\\main\\resources\\mapreduce\\input\\word.txt",
"D:\\ideawork\\bigdata\\apache-hadoop\\src\\main\\resources\\mapreduce\\output\\" + date.getTime()};
// 1 Получите информацию о конфигурации и получите job объект
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
// 2 Связанная версия Driver процедурный jar
job.setJarByClass(WordCountDriver.class);
// 3 ассоциация Mapper и Reducer из jar
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
// 4 настраивать Mapper выходиз kv тип
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 5 настраиватьфинальныйвыход kv тип
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 6 настраиватьвходитьивыходпуть
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 7 представлять на рассмотрение job
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}
//Этап карты
class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
Text k = new Текст(); //Разделить слова
IntWritable v = new IntWritable(1);//Счет
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 1 Получить ряд
String line = value.toString();
// 2 Вырезано пространством
String[] words = line.split(" ");
// 3 выход
for (String word : words) {
//Исключаем нулевые значения
if (word.trim() == "" || word.length() == 0) {
continue;
}
System.out.println("mapвыход>>>" + word);
// настройкивыходизkey - это сокращение слов
k.set(word);
// Считать по словам
context.write(k, v);
}
}
}
//Reducer
class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
int sum;
IntWritable v = new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
// 1 Собрать и найти
sum = 0;
for (IntWritable count : values) {
sum += count.get();
}
// 2 выход
v.set(sum);
// 得приезжатьфинальныйизрезультат
context.write(key, v);
}
}
package com.wxl.hadoop.mapReduce.wordCount;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;
import java.util.Date;
public class WordCountMapReduce extends Configured implements Tool {
public static void main(String[] args) throws Exception {
///Локальное тестирование, формальную среду необходимо закомментировать//
Date date = new Date();//Убедитесь, что выходной каталог не повторяется
args = new String[]{"D:\\ideawork\\bigdata\\apache-hadoop\\src\\main\\resources\\mapreduce\\input\\word.txt",
"D:\\ideawork\\bigdata\\apache-hadoop\\src\\main\\resources\\mapreduce\\output\\" + date.getTime()};
// run job
int status = ToolRunner.run(new WordCountMapReduce(), args);
// exit program
System.exit(status);
}
// Driver
public int run(String[] args) throws Exception {
// 1 Получите информацию о конфигурации и получите job объект
Configuration conf = super.getConf();
//настраиватьjobимя
Job job = Job.getInstance(conf, this.getClass().getSimpleName());
// 2 Связанная версия Driver процедурный jar
job.setJarByClass(this.getClass());
// 3 ассоциация Mapper и Reducer из jar
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
// 4 настраивать Mapper выходиз kv тип
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 5 настраиватьфинальныйвыход kv тип
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 6 настраиватьвходитьивыходпуть
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// submit job
boolean status = job.waitForCompletion(true);
return status ? 0 : 1;
}
//Этап карты
public static class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
Text k = new Текст(); //Разделить слова
IntWritable v = new IntWritable(1);//Счет
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 1 Получить ряд
String line = value.toString();
// 2 Вырезано пространством
String[] words = line.split(" ");
// 3 выход
for (String word : words) {
//Исключаем нулевые значения
if (word.trim() == "" || word.length() == 0) {
continue;
}
System.out.println("mapвыход>>>" + word);
// настройкивыходизkey - это сокращение слов
k.set(word);
// Считать по словам
context.write(k, v);
}
}
}
//Reducer
public static class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
int sum;
IntWritable v = new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
// 1 Собрать и найти
sum = 0;
for (IntWritable count : values) {
sum += count.get();
}
// 2 выход
v.set(sum);
// 得приезжатьфинальныйизрезультат
context.write(key, v);
}
}
}
Подробнее о больших данных Hadoopрядиз Учебные статьи,Видеть:Атака на серию больших данных,Эта серия постоянно обновляется.
MapReduce предоставляет пользователям простой интерфейс программирования, а уровень инфраструктуры автоматически выполняет сложные базовые детали обработки, такие как хранение распределения данных, передача данных и отказоустойчивая обработка. Пользователям нужно использовать интерфейс только для реализации собственной логики обработки данных. Другими словами, написание распределенной программы — это то же самое, что написание простой последовательной программы. Именно благодаря этой особенности программирование MapReduce стало очень популярным.
Когда ваши вычислительные ресурсы не могут быть удовлетворены, вы можете расширить его вычислительные возможности, просто добавив машины.
Первоначальное намерение MapReduce заключалось в том, чтобы обеспечить возможность развертывания программ на дешевых ПК, что требует от него высокой отказоустойчивости. Например, если одна из машин зависает, она может передать для выполнения вышеуказанную вычислительную задачу другому узлу, чтобы задача не вышла из строя. Более того, этот процесс не требует ручного участия и полностью выполняется Hadoop.
Он подходит для автономной обработки больших объемов данных уровня PB или выше. Он может реализовать одновременную работу тысяч кластеров серверов и предоставить возможности обработки данных.
MapReduce не может возвращать результаты в течение миллисекунд или секунд, как MySQL.
Входные данные потоковых вычислений являются динамическими, тогда как набор входных данных MapReduce является статическим и не может изменяться динамически. Это связано с тем, что конструктивные характеристики MapReduce определяют, что источник данных должен быть статическим.
Несколько приложений имеют зависимости, и входные данные последнего приложения являются выходными данными предыдущего приложения. В этом случае дело не в том, что MapReduce нельзя использовать, но после использования выходные результаты каждого задания MapReduce будут записываться на диск, что приведет к большому количеству дисковых операций ввода-вывода, что приведет к очень низкой производительности.
Справочная статья: https://www.jianshu.com/p/a61fd904e2c5. cnblogs.com/liugp/p/16101242.html cnblogs.com/ttzzyy/p/12323259.html blog.csdn.net/qq_38414334/article/details/12047614