Привет всем, это технологическое сообщество 857.
Сегодня я хотел бы поделиться с вами концептуальной статьей о Spark объемом 10 000 слов, которую легко понять новичкам. Эта статья стремится быть краткой и простой для понимания. Я надеюсь, что это поможет новичкам начать обучение, начиная с базовых концепций и затем углубляя принципы, чтобы они могли легко освоить Spark от более поверхностного к более глубокому.
Большие данные, искусственный интеллект ( Artificial Intelligence ), как нефть и электроэнергетика в прошлом, влияет на все отрасли с беспрецедентной широтой и глубиной. Основными барьерами нынешних и будущих компаний являются основные конкурентные преимущества, возникающие в результате конкуренции, основанной на искусственном интеллекте.
Spark на сегодняшний день является самой активной, популярной и эффективной вычислительной платформой в области больших данных.
Родился в 2009 году в AMP, Калифорнийский университет, Беркли, США. лаборатория,
Открытый исходный код выпущен под лицензией BSD в 2010 году.
В 2013 году он сделал пожертвование в Apache Software Foundation и переключил лицензионное соглашение с открытым исходным кодом на Apache2.0.
В феврале 2014 года Spark стал проектом Apache верхнего уровня.
ноябрь 2014 г., Команда Databricks, материнской компании Sparkiz, использует Spark для обновления мирового рекорда по сортировке данных
Spark успешно создал интегрированную, диверсифицированную и крупномасштабную систему обработки данных. В любом масштабе изданные расчеты, Spark имеет больше преимуществ в производительности и масштабируемости.
(1) Hadoop Дуг Каттинг отметил: используйте of MapReduce engine for Big Data projects will decline, replaced by Apache Spark (Большой проект данных, использующий движок MapReduce, будет прекращен на базе Apache. Spark заменять)
(2)Hadoop Коммерческое распространение от лидера рынка Cloudera 、HortonWorks 、MapR Все они перешли на Spark и считали Spark лучшим выбором и основной вычислительной системой для крупномасштабных решений.
из такого бенчмарктеста в 2014 году, Spark мгновенно убивает Hadoop , в случае использования одной десятой вычислительных ресурсов, в том же порядке, Spark в 3 раза лучше MapReduceбыстрый! Официального ПБ нет. Сортировка вернее, чем в других случаях, в первый раз будет S park Перенесено в IPB данные(Десять триллионов записей) из сортировки, используя 190 узлов, рабочая нагрузка была выполнена за 4 часа, Это также намного превышает предыдущий рекорд Yahoo в 16 часов при использовании 3800 хостов.
В июне 2015 года наибольшее количество узлов Spark поступило от Tencent — 8000 узлов, а наибольшее количество одиночных работ — у Alibaba и Databricks — 1PB, что шокирует! При этом количество SparkизContributors утроилось с 2014 года до 730: общее количество строк кода также увеличилось более чем вдвое с 2014 года до 40 строк.
ИБМ в 2015 году 6 класс Ежемесячное обязательство по улучшению Apache Spark проект, Его еще называют проектом «Должен»: возглавляемый данными, это будет самый важный новый проект с открытым исходным кодом в ближайшие десять лет. Это-обещание из Кореда Искроет Встроить IBM Ведущая в отрасли аналитическая бизнес-платформа, интегрирующая Spark как услуга,существоватьIBMBПредоставляется клиентам на платформе。IBMбудет инвестировать больше, чем3500знаменитое исследованиеиразвиватьперсоналсуществоватьпо всему миру10Создано более 10 лабораторий.иSparkСвязанныйизпроект,и будетSparkЭкосистема с открытым исходным кодом бесплатно обеспечивает прорывыизтехнология машинного обучения–IBM СистемаML. В то же время IBM также обучит более 1 миллиона ученых и инженеров по данным Spark.
В 2016 году на всемирно известной сортировке, известной как «Компьютерные олимпиады». В глобальном рейтинговом соревновании Benchmark NADSort, участвующая команда, состоящая из Лаборатории компьютерных наук и технологий PASA Нанкинского университета и компании Alibaba Databricks, завершила процесс сортировки стандартного набора объемом 100 ТБ стоимостью 144 доллара США, установив новый рекорд Стоимость сортировки за ТБ составляет 1,44 доллара США (последний мировой рекорд) по сравнению с 2014 годом. Выиграв чемпионат в 2017 году, команда TritonSort из Калифорнийского университета в Сан-Диего стоила 4,51 доллара за ТБ и снизила стоимость почти на 70%. Однако в этом соревновании по-прежнему используется Apache. Платформа вычислений больших данных Spark провела множество оптимизаций в крупномасштабном алгоритме параллельной сортировки и нижнем уровне системы Spark, чтобы улучшить производительность вычислений сортировки и максимально снизить нагрузку на ресурсы хранения, чтобы гарантировать, что она в конечном итоге выиграет конкуренцию. .
Под руководством идеалов FullStack Spark in Spark SQL 、SparkStreaming 、MLLib 、GraphX Пять основных подплатформ и библиотек R могут беспрепятственно обмениваться операциями с данными, Это не только создает непревзойденные преимущества Spark перед другими вычислительными платформами в современной большой вычислительной области, И Spark постепенно становится предпочтительной вычислительной платформой для крупных вычислительных центров.
Причина 1: Отличные изданные модели и вычислительная абстракция.
Spark До его появления уже существовали очень зрелые вычислительные системы, такие как MapReduce, которые предоставляли API высокого уровня (map/reduce) для выполнения вычислений в системе и обеспечения отказоустойчивости, тем самым достигая распределенных вычислений.
Хотя MapReduce обеспечивает абстракцию доступа и вычислений, повторное использование в файловой системе так же просто, как запись промежуточных данных в стабильную файловую систему (например, H DFS), поэтому будет генерироваться резервное копирование данных, дисковый ввод-вывод и сериализация данных, поэтому эффективность будет очень низкой при возникновении необходимости повторного использования промежуточных результатов операций между несколькими вычислениями. Этот тип операций очень распространен, например, итеративные вычисления, интерактивный анализ, графовые вычисления и т. д.
Осознав эту проблему, академическое сообщество AMPLab предложило новую модель под названием RDD. RDD — это отказоустойчивая и параллельная структура (на самом деле ее можно понимать как распределенную коллекцию, и ее работа так же проста, как и работа с локальной коллекцией. Она позволяет пользователям явно преобразовывать промежуточные результаты). Набор данных хранится в памяти и позволяет управлять набором данных и разделять его для оптимизации обработки хранения данных. В то же время RDD также предоставляет богатые возможности. API (map, уменьшить, foreach, уменьшитьByKey...) для работы с набором данных. позже RDD используются AMPLab в программе под названием Spark Предоставляется в рамках и с открытым исходным кодом.
Короче говоря, Spark был разработан на основе идей MapReduce, сохранив его преимущества распределенных параллельных вычислений и улучшив его очевидные недостатки. Разрешение хранить промежуточные данные в памяти повышает скорость работы и предоставляет богатый API-интерфейс для работы с данными для повышения скорости разработки.
Причина 2: Улучшить экосистему
В настоящее время Spark превратился в набор нескольких подпроектов, включая SparkSQL, Spark Streaming, GraphX, MLlib и другие подпроекты
Spark Ядро: реализует основные функции Spark, включая такие модули, как RDD, планирование задач, управление памятью, восстановление после ошибок и взаимодействие с системой хранения.
Spark SQL: Spark используется для работы с пакетами структурированных данных. нанести Искру SQL, мы можем использовать данные операций SQL.
Spark Потоковая передача: Spark предоставляет компоненты потоковых вычислений в реальном времени. Предоставляет API для работы с потоками данных.
Spark MLlib: предоставляет общие функции и библиотеки машинного обучения (ML). Включает классификацию, регрессию, кластеризацию, совместную фильтрацию и т. д., а также предоставляет дополнительные функции поддержки, такие как оценка модели и импорт данных.
GraphX (Graph Computing): API Spark для графовых вычислений, имеет хорошую производительность, имеет богатые функции и операторы и может свободно запускать сложные графовые алгоритмы на огромных объемах данных.
Менеджер кластера: Spark предназначен для эффективного масштабирования вычислений между одним вычислительным узлом и тысячами вычислительных узлов.
Дальнейшее чтение:Spark VS Hadoop
★Уведомление:
Хотя Spark имеет большие преимущества перед Hadoop, Spark не может полностью заменить Hadoop. Spark в основном используется для замены вычислительной модели MapReduce в Hadoop. HDFS по-прежнему можно использовать для хранения, но промежуточные результаты можно хранить в памяти; также можно использовать встроенное планирование Spark или более зрелые системы планирования, такие как YARN.
Фактически, Spark хорошо интегрирован в экосистему Hadoop и стал ее важным членом. Он может реализовать управление планированием ресурсов с помощью YARN и обеспечить распределенное хранилище с помощью HDFS.
Кроме того, Hadoop может использовать дешевые и гетерогенные машины для распределенного хранения и вычислений, однако аппаратные требования Spark несколько выше, а к его памяти и ЦП предъявляются определенные требования.
Apache Единая аналитическая система Sparkda для крупномасштабной обработки данных
Spark основан на вычислениях в памяти, что повышает производительность обработки в реальном времени в больших средах.
В то же время он обеспечивает высокую отказоустойчивость и высокую масштабируемость, позволяя пользователям развертывать Spark на большом количестве оборудования для формирования кластера.
Официальный сайт Искры
http://spark.apachecn.org
http://spark.apachecn.org
1.Spark Ядро: содержит основные функции Spark, особенно определение RDDAPI, операций и того и другого. Другие библиотеки Spark построены на основе RDD Spark. Ядро выше из.
2.Spark SQL:поставлятьпроходитьApache Hive и вариант SQL Hive Query Language (HiveQL) и Spark взаимодействуют с API. Каждая таблица базы данных рассматривается как RDD, Spark. SQL-запросы обрабатываются Конвертировать как Spark.
3.Spark Потоковая передача: верная потоковая передача данных в реальном времени для обработки и управления. Искра Потоковая передача позволяет программам обрабатывать данные в реальном времени так же, как обычные RDD.
4.MLlib: общая библиотека алгоритмов машинного обучения, алгоритм реализован как операция trueRDDизSpark. Эта библиотека содержит масштабируемые алгоритмы обучения, такие как классификация, регрессия и т. д., которые требуют больших наборов данных для итеративных операций.
5. GraphX: граф управления, параллельная работа и расчет графа, набор алгоритмов и набор инструментов. GraphX расширяет RDD API, включая граф управления, подграф создания и все операции вершин на пути доступа.
Схема композиции архитектуры Spark выглядит следующим образом:
Фотографии доступны,Мастерда Спаркиз Главный узел управления,В реальной производственной среде будет несколько мастеров.,Активен только один Мастер. WorkerдаSparkизрабочий узел,Сообщать капитану об изменениях в собственных ресурсах и исполнительном статусе.,И примите Мастера из команды запускать Исполнитель или Водитель. Драйвер-приложение из драйвера, каждое приложение включает в себя множество мелких задач. Драйвер отвечает за упорядоченное выполнение этих небольших задач.
Исполнитель да Спаркиз рабочего процесса, контролируемый Рабочим, отвечает за конкретные задачи изосуществлять.
Мастер-Работник (соответствующие функции и отношения)
По всему Спаркластеру,Разделен на главный узел и рабочий узел.,один одновременнокластер Есть несколькоmasterузели Несколькоworkerузел。
1. Мастер: Мастер-нода, Должен-узел отвечает за управление рабочими узлами. Мы подаем заявки с главного узла, который отвечает за превращение последовательных задач в параллельные изучаиз набора задач, а также отвечает за обработку ошибок и т. д.;
2. Рабочий: подчиненный узел, Должен узел и главный узел связи, отвечающий за изучение задач и управляющий процессом-исполнителем. Это любой узел, на котором может выполняться код приложения. В автономной модели это означает издапроходить подчиненный файл. Конфигурация Рабочий узел в Spark. on Модель пряжи под узлом даNoteManager.
1.Application
Приложение относится к пользователю, пишущему приложение Spark.,ввключатьодинDriverФункцияизкодираспределенныйсуществоватькластерсередина НесколькоузелначальствобегатьизExecutorкод
2.Driver
Driverdasparkиздрайверный узел,Используется в миссии «Изучить искру» из основного метода.,Ответственный за реальностькодизосуществлять Работа。В основном отвечает за следующие задачи:
1) Преобразование пользовательской программы в задание
2) Планирование задач между исполнителями
3) Отследить ситуацию изосуществования Исполнителя
4)проходить UI Показать статус выполнения запроса
С точки зрения непрофессионала, Driver — это программа, которая управляет запуском всего приложения, также известная как класс Driver.
3.Executor
Понимание 1:
Spark В рабочем узле (Worker) в Executor есть процесс JVM, который отвечает за выполнение определенных задач (Task) в заданиях Spark. Задачи независимы друг от друга. Когда приложение Spark запущено, узел Executor работает одновременно и всегда существует на протяжении всего жизненного цикла приложения Spark. Если узел Executor выходит из строя или выходит из строя, приложение Spark может продолжить работу, а задачи на неисправном узле будут перенаправлены на другие узлы Executor для продолжения работы.
1) Отвечает за выполнение задач, составляющих приложение Spark, и передачу результатов процессу драйвера.
2) Они осуществляют себя из менеджера блоков (Block Manager) кэширует RDD, необходимые в пользовательских программах. Обеспечивает хранение в памяти. RDD кэшируется непосредственно в процессе Executor, поэтому задача может полностью использовать кэш для ускорения операций во время выполнения.
Понимание 2:
Изучение — это процесс, выполняемый на рабочем узле для определенного Приложения. Процесс Должен отвечает за выполнение определенных Заданий и сохранение данных в памяти или на диске. Каждое Приложение имеет свой собственный независимый пакет процессов-Исполнителей. Исполнитель размещается на рабочем узле, и каждый Worker На компьютере имеется один или несколько процессов-исполнителей. Каждый исполнитель содержит пул потоков, и каждый поток может содержать задачу. в соответствии Количество ядер ЦП на сExecutor позволяет каждый раз выполнять несколько задач параллельно с одним и тем же количеством ядер. Задача Задача — это конкретная задача программы «Изучить Spark». После того как исполнитель изучает выполнение задачи, он отправляет результат драйверу. Каждый исполнитель принадлежит одному и тому же приложению. Кроме того, у исполнителя также есть функция запроса кеширования в приложении. RDD Предоставляя хранилище в памяти, RDD кэшируется непосредственно в процессе-исполнителе, поэтому задачи могут в полной мере использовать кеш для ускорения операций во время выполнения.
Когда мы выполняем операции сохранения, такие как кэширование/сохранение в коде, в соответствии с Мы выбираем разные уровни персистентности. Расчет каждой задачи также будет сохранен в памяти процесса-исполнителя или в файле диска узла, где он расположен.
Таким образом, память Исполнителя в основном разделена на три блока: первый блок позволяет задаче изучать собственный код. По умолчанию на ее долю приходится 20% всей памяти Исполнителя, второй блок позволяет выполнить задачу. После того, как процесс перемешивания извлекает выходные данные задачи предыдущего этапа, он используется для агрегации и других операций. По умолчанию он занимает 20% от общего объема памяти Исполнителя. Третий блок используется при сохранении RDD и учитывает. По умолчанию 60% от общей памяти Исполнителя.
задача изосуществлять скорость с каждым процессом-исполнителем из ЦП Количество ядер напрямую связано. процессор Ядро может иметь только один поток одновременно. Каждому процессу-исполнителю назначается несколько задач, и каждая задача выполняется в одном потоке, причем несколько потоков выполняются одновременно. Если процессор Количество ядер относительно достаточно, и количество задач, выделяемых на него, является разумным. Вообще говоря, эти потоки задач могут выполняться относительно быстро и эффективно.
4.Cluter Manager
Менеджер кластера, относится к изда получения ресурсов из внешних сервисов на кластере. На данный момент существует три типа:
1)Standalone : Управление собственными ресурсами Spark, мастер отвечает за распределение ресурсов, легко построить кластер
2)Apache Mesos:Универсальныйизкластеруправлять,иhadoop MR-совместимостьхорошая — платформа планирования ресурсов, на которой можно запускать Hadoop. MapReduceи некоторые сервисные приложения
3)Hadoop Yarn: Main да относится к Yarn изResourceManager.
Когда кластер не особенно велик и одновременно не работают карты MapReduce и Spark, использование автономной модели является наиболее эффективным.
5. Задача
Реальный расчет из части. Stage эквивалентен TaskSet.,Каждый этап содержит несколько задач,Отправьте каждое Задание каждому Исполнителю для расчета.
Логика обработки каждой задачи совершенно одинакова, изданные должны обрабатываться по-разному. То есть: мобильные компьютеры без мобильных данных.
Task действительно выполняет работу, поэтому не будет преувеличением сказать, что он косвенно определяет скорость работы программы Spark.
6. Работа
Sparkв соответствии Операция действия запускает отправку задания и использует операцию действия для разделения нашего кода на несколько заданий.
7. Этап (этап планирования)
В каждой работе будет соответствии При широкой зависимости Job будет разделяться на несколько этапов (включая ShuffleMapStage и ResultStage).
Соответствующие отношения между заданием, этапом и задачей следующие:
8.DAGScheduler
в соответствии сJob build основан на StageизDAG (Directed Acyclic Ациклический граф, ориентированный на граф), и отправьте сцену в TASKScheduler. Его разделение Stageiz основано на независимости отношений между dRDD, чтобы найти минимальные накладные расходы метода планирования.
Расширение (принцип разделения стадий DAG):
Spark В распределенной среде данные Раздел, Затем преобразуйте задание в DAG, и действовать поэтапно DAG из Scheduling и Task из Распределенной параллельной обработки. DAG отправляет расписание в DAGScheduler, Когда DAGScheduler планирует, он будет соответствии сда Нужно ли проходить процесс перемешивания? Работаразделять состоит из нескольких этапов.
Чтобы облегчить понимание принципа изDAGchedulerразделятьStage, давайте посмотрим на типичную диаграмму изDAGразделятьStage, как показано на рисунке.
На приведенном выше рисунке RDD a приезжать Между ShuffledRDD, а такжеUnionRDDприезжатьCoGroupedRDDмеждуизданныенужно пройтиShuffleпроцесс, Поэтому РОД a иUnionRDD соответственно даStage 1 Следуйте за этапом 3иStage 2 Следуйте за этапом 3изразделятьточка。иShuffledRDDприезжатьCoGroupedRDDмежду,а такжеRDD bприезжатьMappedRDDприезжатьUnionRDDиRDD c приезжатьUnionRDDмеждуизданные Нетнужно пройтиShuffleпроцесс。поэтому,ShuffledRDDиCoGroupedRDDиззависимостейузкие зависимости,Два RDD принадлежат одному и тому же Stage3.,Оставшееся разделение RDD составляет 2 этапа. Этап1 Этап2 дафазно независимый,Может работать параллельно. Этап3 зависит от текущих результатов Этапа1и Этапа2из,такStage3наконецосуществлять。
Видно, что в процессе планирования DAGScheduler этап Stage меняется в зависимости от того, имеет ли задание процесс Shuffle, то есть при наличии ShuffleDependency и широкой зависимости требуется Shuffle, и только тогда задание будет разделен на несколько этапов.
9.TASKSedulter
Концепция TaskScheduler:
Отправьте TaskSET работнику для запуска и назначьте здесь любую задачу, которую будет выполнять каждый исполнитель. TaskScheduler поддерживает все наборы задач. Когда Исполнитель отправляет контрольный сигнал драйверу, TaskScheduler будет это делать. соответствии с Остальные ресурсы распределяются соответственно из Task. Кроме того, TaskScheduler также сохраняет метки всех выполняемых задач и повторяет неудачные задачи.
TaskScheduler принцип:
1) Когда DAGScheduler отправляет Taskset базовому планировщику, он сталкивается с интерфейсом TaskScheduler, который соответствует принципу объектно-ориентированной абстракции зависимостей и обеспечивает возможность подключения базового планировщика ресурсов. В результате Spark может работать на многих серверах ресурсов. Например: Автономный, Yarn, Mesos, local. EC2 и другие планировщики ресурсов.
2) При создании экземпляра SparkContext используется createTaskScheduler. TaskSchedulerImpliStandaloneSchedulerBackend. TaskSchedulerImplizinitializeметод Поместите StandaloneSchedulerBackend. Передайте его и назначьте TaskSchedulerImplizbackend, он будет вызываться, когда TaskSchedulerImple вызывает startметодиз; backend.startметод。
3) TaskScheduler из основной задачи отправляет операцию TaskSet приехатькластер и сообщает о результатах.
a) Поддерживать TaskSetManager для TaskSetсоздавать и отслеживать местоположение задач и информацию об ошибках;
b)сталкиватьсяприезжать StraggleЗадача будет выпущенаприезжатьдругойизузелруководить Попробуйте еще раз。
в) в DAYScheduler Отчет о ситуации, включая получение отчетов при потере выходных данных Shuffle ошибка неудачи и другая информация.
4) TaskScheduler будет хранить SchedulerBackend внутри себя. С точки зрения Standalone и модели, он конкретно реализует StandaloneSchedulerBackend.
5) StandaloneSchedulerBackend создает цикл сообщений ClientEndpoint при запускеизсоздавать экземпляр StandaloneAppClient, а когда экземпляр Должен начинаетиззапускать, ClientEndpoint регистрирует текущую программу в Master при запуске. Родительский класс StandaloneSchedulerBackendизCoarseGrainedSchedulerBackend при запуске создаст экземпляр тела цикла сообщений типа DriverEndpoint. StandaloneSchedulerBackend конкретно отвечает за сбор информации о ресурсах в Workers. Когда ExecutorBackendзапускает, он отправит информацию RegisteredExecutor для регистрации в DriverEndpoint. В это время StandaloneSchedulerBackend знает, что у текущего приложения есть вычислительные ресурсы, поэтому он может StandaloneSchedulerBackend располагает вычислительными ресурсами для выполнения задачи.
6) SparkContext, DAGScheduler, TaskSchedulerImpl и StandaloneSchedulerBackend создаются только один раз в процессе приложения, и эти объекты всегда будут существовать в течение всего существования приложения.
На следующем рисунке показана роль TaskScheduler:
● Быстро
иHadoopизMapReduceComparison,Операции Spark с памятью выполняются более чем в 100 раз быстрее.,
Расчет на основе жесткого диска происходит более чем в 10 раз быстрее. Spark реализует эффективный движок изDAGосуществлять,
поток может эффективно обрабатывать потоки данных на основе памяти.
● Простота использования (множество алгоритмов)
MR поддерживает только один вычислительный алгоритм, а Spark — несколько алгоритмов.
Spark поддерживает Java, Python, RиScalaизAPI и более 80 продвинутых алгоритмов.
Позволяет пользователям быстро создавать различные приложения. А Spark поддерживает интерактивность из Python иScalaиз. shell,
В этих оболочках очень удобно использовать Sparklaster для проверки решения вопросизметода.
● Универсальный
Spark предлагает единое решение. Spark можно использовать для пакетной обработки.、Интерактивный запрос (Spark SQL)、
Обработка потока в реальном времени (Spark потоковая передача), машинное обучение (Spark MLlib) и графовые вычисления (GraphX).
Эти различные типы обработки могут легко использоваться в одном приложении. Искра Унифицированное решение очень привлекательно,
Ведь любая компания желает использовать единыйизплатформа для обработкисталкиватьсяприезжатьизвопрос,Меньшеразвивать техническое обслуживание с учетом человеческих затрат и развертывания платформы с учетом материальных затрат.
● Совместимость
Spark можно легко интегрировать с другими продуктами с открытым исходным кодом. Например, Spark может использовать Hadoop и зYARN и Apache. Mesos, как управление ресурсами и планировщик,
И может обрабатывать всю поддержку Hadoop, включая HDFS, HBase и Cassandra и т. д.
Это особенно важно для пользователей, которые уже развернули Hadoop, поскольку они могут использовать мощные возможности обработки Spark без выполнения миграции.
Spark также не полагается на сторонние средства управления ресурсами и планировщики. Он реализует автономный режим в качестве встроенной среды управления ресурсами и планирования.
Это еще больше снижает порог использования Spark, делая его развертывание и использование очень простым для всех. Spark。
Кроме того, Spark также предоставляет инструменты для развертывания Standalone изSparkкластериз на EC2.
1.local Локальная модель (автономная) – использование улучшения
Разделяется на локальный однопоточный и локально-кластерный многопоточный.
2.режим автономного независимого кластера – используется для разработки и тестирования.
Типичная модель «Master/Slave»
3.standalone-HA Режим высокой доступность – для использования в производственных условиях
На основе автономной модели используйте zk для обеспечения высокой доступности, чтобы избежать единой точки отказа в Masterdaiz.
4.on yarn режим кластера – для использования производственной среды
При работе на пряже Yarn отвечает за управление ресурсами, а Spark — за планирование и расчет задач.
Преимущества: вычислительные ресурсы можно масштабировать вверх и вниз по требованию, обеспечивая высокий уровень использования, общее базовое хранилище и избежание трансграничной миграции.
5.on mesos режим кластера - реже используется внутри страны
Работая на платформе менеджера ресурсов mesos, mesos отвечает за управление ресурсами, а Spark — за планирование и расчет задач.
6.on cloud режим кластера – В будущем малые и средние компании будут больше использовать облачные сервисы
Например, AWS изEC2, используя эту модель, может легко получить доступ к Amazon изS3.
режим автономного независимого кластера – используется для разработки и тестирования.
Sparkda основана на вычислениях в памяти и структуре параллельных вычислений с большими данными.
На практике использование режима при выполнении вычислительных задач совершенно необходимо. кластера,
Итак, давайте сначала узнаем о встроенном автономном режиме Spark. кластера Узнайте о его архитектуре и механизме работы.
Автономный кластер использует модель «главный-подчиненный» в распределенных вычислениях.
мастердакластер содержит главный процесс из узла
Рабочий узел в рабакластере содержит процесс Executor9.
Схема архитектуры Spark выглядит следующим образом (сначала разберитесь):
node01:master
node02:slave/worker
node03:slave/worker
1) Измените файл конфигурации Spark (введите распакованный каталог Spark).
cd /export/servers/spark/conf
mv spark-env.sh.template spark-env.sh
vim spark-env.sh
#Конфигурацияjavaпеременные среда(Если вы ранее проходили Конфигурацию, то переезжать не нужно)
export JAVA_HOME=/export/servers/jdk1.8
#specifiedspark MasterизIP
export SPARK_MASTER_HOST=node01
#specifiedspark Мастер из порта
export SPARK_MASTER_PORT=7077
mv slaves.template slaves
vim slaves
node02
node03
2) Настройте переменные среды Spark (рекомендуется не добавлять их во избежание конфликтов с командами Hadoop)
Воляsparkдобавить вприезжатьпеременные среды,добавить вк Вниз Внутри Позволятьприезжать /etc/profile
export SPARK_HOME=/export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0
export PATH=$PATH:$SPARK_HOME/bin
Уведомление:
hadoop/sbin из Оглавлениеи spark/sbin Могут быть конфликты команд:
start-all.sh stop-all.sh
Решение:
1. Ставим один из кадров изsbin из переменных удален из среды;
2.Изменить имя hadoop/sbin/start-all.sh Изменить на: start-all-hadoop.sh
3) Распространите файл конфигурации на другие машины с помощью команды scp.
scp -r /export/servers/spark node02:/export/servers
scp -r /export/servers/spark node03:/export/servers
scp /etc/profile root@node02:/etc
scp /etc/profile root@node03:/etc
source /etc/profile обновить Конфигурация
Запуск и остановка кластера
На главном узле запускаетsparklaster
/export/servers/spark/sbin/start-all.sh
Остановить спаркластер на главном узле.
/export/servers/spark/sbin/stop-all.sh
Запуск и остановка индивидуально
На главном узле установки начать и остановитьmaster:
start-master.sh
stop-master.sh
На узле, где находится Мастер и stopworker (работа относится к имени хоста в файле)
start-slaves.sh
stop-slaves.sh
нормальныйзапускатьsparkкластерназад,Проверятьsparkизwebинтерфейс,Проверять сопутствующую информацию.
http://узел01:8080/
нуждаться
использоватьрежим кластера Запустите программу Spark для чтения файлов на HDFS и изученияWordCount
Запуск искровой оболочки в режиме кластера
/export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0/bin/spark-shell --master spark://node01:7077
Запустить программу
sc.textFile("hdfs://node01:8020/wordcount/input/words.txt")
.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
.saveAsTextFile("hdfs://node01:8020/wordcount/output2")
SparkContext web UI
http://node01:4040/jobs/
Уведомление
режим кластера запускается на кластере, не читайте локальные файлы напрямую, вам следует прочитать на hdfs из Поскольку программа работает на кластере, мы не знаем, на каком узле она работает. На других узлах может не быть этого файла данных.
Spark Вырезан фреймворк StandaloneclusterdaMaster-Slaves Как и в большинстве структур Master-Slave, у Master есть одна точка отказа.
Чтобы решить эту единственную точку отказа, Spark предлагает два решения:
1. Одноточечное восстановление на основе файловой системы (Single-Node). Recovery with Local File System) — может использоваться только в средах разработки или тестирования.
2.На базе Zookeeperиз Standby Masters(Standby Masters with ZooKeeper) — может использоваться в производственных средах.
Должен HA Решение очень простое в использовании. Сначала запустите. ZooKeeper кластер, а затем запустить на разных узлах Мастер, Уведомление Эти узлы должны иметь одинаковые zookeeper конфигурация.
1) Сначала остановите Спраккластер
/export/servers/spark/sbin/stop-all.sh
2)существоватьnode01начальство Конфигурация:
vim /export/servers/spark/conf/spark-env.sh
3) Закомментировать Мастер Конфигурация
#export SPARK_MASTER_HOST=node01
4) Добавьте SPARK_DAEMON_JAVA_OPTS в spark-env.sh со следующим содержимым:
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER
-Dspark.deploy.zookeeper.url=node01:2181,node02:2181,node03:2181
-Dspark.deploy.zookeeper.dir=/spark"
параметриллюстрировать
spark.deploy.recoveryMode: восстановить модель
spark.deploy.zookeeper.url: ZooKeeperиз Адрес сервера
spark.deploy.zookeeper.dir: сохраняет файлы и каталоги данных элементов кластера. Включая информацию о работнике, драйвере и приложении.
zkServer.sh status
zkServer.sh stop
zkServer.sh start
1)node01начальствозапускатьSparkкластеросуществлять
/export/servers/spark/sbin/start-all.sh
2) Запустите отдельный мастер на узле 02:
/export/servers/spark/sbin/start-master.sh
3)Уведомление:
существоватьобычномодель Вниззапускатьsparkкластер
Просто нужно изучитьstart-all.sh на главном узле Вот и все
существовать Режим высокой доступности Вниззапускатьsparkкластер
Сначала вам нужно запустить-all.sh на любом мастер-ноде.
Затем запустите отдельный изучитьstart-master.sh на другом главном узле.
4)Проверятьnode01иnode02
http://node01:8080/
http://node02:8080/
Вы можете заметить, что приезжать имеет статус StandBy.
Активное и резервное тестовое переключение
1) Используйте идентификатор процесса jpsПроверятьmaster на узле 01.
2) Используйте убийство -9 Идентификационный номер приводит к завершению процесса Должена.
3) Подождите некоторое время, обновите веб-интерфейс node02 и обнаружите, что node02 активен.
1.Запуск в режиме кластераspark-shell
/export/servers/spark/bin/spark-shell --master
spark://node01:7077,node02:7077
2.Запустить программу
sc.textFile("hdfs://node01:8020/wordcount/input/words.txt")
.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
.saveAsTextFile("hdfs://node01:8020/wordcount/output3")
Официальная документация http://spark.apache.org/docs/latest/running-on-yarn.html.
1. Устанавливаем запускать Hadoop (требуется HDFS и YARN, уже ок)
2. Установите автономную версию Spark (уже ок)
Уведомление: В кластере нет необходимости, поскольку отправка программы Spark в YARN для запуска — это, по сути, передача байт-кода в кластер YARN и из JVM для запуска.
Но мне нужно что-то, что поможет мне отправлять задачи в YARN, поэтому мне нужна автономная версия Spark.
В нем есть команда искровой оболочки и команда искровой отправки.
3. Измените конфигурацию:
в искровом-env.sh , добавьте HADOOP_CONF_DIRКонфигурация, укажите расположение файла Hadoop
vim /export/servers/spark/conf/spark-env.sh
export HADOOP_CONF_DIR=/export/servers/hadoop/etc/hadoop
1)иллюстрировать
В большинстве корпоративных производственных сред дакластер развертывается для запуска приложений Spark.
Spark On YARNизClusterмодель Относится к программе издаDriver, работающей на YARNкластере.
2) Что добавить в Драйверда:
Запустите приложение из функции main() и создайте процесс SparkContextiz.
3)Иллюстрация
4) Запустите пример программы
Spark-Shellдапростой изиспользуется для тестированияинтерактивного окна
spark-submit используется для отправки задач в jar-пакеты.
/export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0/bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode cluster \
--driver-memory 1g \
--executor-memory 1g \
--executor-cores 2 \
--queue default \
/export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0/examples/jars/spark-examples_2.11-2.2.0.jar \
10
5)Проверятьинтерфейс
http://node01:8088/cluster
1)иллюстрировать
Используйте его при изучении теста, а не развития, просто поймите его.
Spark On YARNизClientмодель Относится к программе издаDriver, работающей на клиенте, который отправляет задачу.
2)Иллюстрация
3) Запустите пример программы
/export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0/bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--мастер пряжи \
--клиент режима развертывания \
--driver-memory 1g \
--executor-memory 1g \
--executor-cores 2 \
--queue по умолчанию \
/export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0/examples/jars/spark-examples_2.11-2.2.0.jar \
10
Самая существенная разница между режимами Кластера и Клиента заключается в том, где запускается программа-драйвер!
В кластере YARN работает модель даCluster,
Запуск на клиенте — даClientмодель
Конечно, есть и различия, выходящие за пределы существенных различий. Можно просто вкратце упомянуть несколько моментов во время собеседования.
кластерная модель: используйте Долженмодель в производственной среде.
1. Программа Драйвер находится в YARNкластере.
2. Результаты запуска приложения не могут отображаться на клиенте.
3. В процессе запуска ApplicationMaster под Долженмоделью, если появится вопрос, Yarn перезапустит ApplicationMaster (Драйвер)
clientмодель:
1. Драйвер работает на клиенте и в процессе SparkSubmit.
2. Результаты запуска приложения будут отображаться на клиенте.
spark-shell да Spark Встроенный интерактивный Shell Программа, которая удобна для пользователей для выполнения интерактивного программирования. Пользователи могут использовать ее из командной строки Должен. scala писать spark Программа, пригодная для использования при подготовке к тестам!
Пример:
искровая оболочка может содержать параметры
spark-shell --master local[N] Число N указывает, что локально моделируется N потоков для выполнения текущей задачи.
spark-shell --master local[*] * означает использование всех доступных ресурсов на текущем компьютере
По умолчанию да--master используется без параметров. local[*]
spark-shell --master spark://node01:7077,node02:7077 Указывает на работу в кластере
spark-submit команда для отправки jar гарантированный spark Кластер/ПРЯЖА spark-shell Интерактивное программирование действительно нам очень удобно изучать, но на практике мы обычно используем IDEA развивать Spark заявление с печатью jar Пакет передается в Spark Кластер/ПРЯЖА выполнить. Искра-отправка Команда да мы развиваемся часто использует из!!!
Пример:вычислитьπ
компакт-диск /экспорт/серверы/искра
/export/servers/spark/bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://node01:7077 \
--executor-memory 1g \
--total-executor-cores 2 \
/export/servers/spark/examples/jars/spark-examples_2.11-2.2.0.jar \
10
Форма основного параметра
Другие примеры параметров
--master spark://node01:7077 Укажите адрес Мастера
--name "appName" Укажите имя запуска программы
--class Программа измейнметод находится в классе
--jars xx.jar Программа дополнительно использует пакет изjar
--driver-memory 512m Память, необходимая для запуска драйвера, По умолчанию 1 г
--executor-memory 2g Укажите доступную память для каждого исполнителя как 2g, По умолчанию 1 г
--executor-cores 1 Укажите количество доступных ядер для каждого исполнителя
--total-executor-cores 2 Укажите задачу запуска всего кластера с помощью iscup Количество ядер — 2.
--queue default Укажите задачу изверно в столбце
--deploy-mode Укажите действующую модель (клиент/кластер)
Уведомление:
еслиworkerузелиз Недостаточно памяти,Затем, когда запускаемspark-submitiz,Вы не можете выделить исполнителю больше памяти, чем доступно рабочему процессу.
Если --executor-cores превышает количество доступных ядер на одного работника, задача находится в состоянии ожидания.
Если --total-executor-cores превышает доступные ядра, по умолчанию используются все ядра. В дальнейшем, когда другие ресурсы будут освобождены, они будут использоваться программой Должен.
Если недостаточно памяти или один исполнитель из ядер, запускspark-submit сообщит об ошибке, задача находится в состоянии ожидания и не может нормально работать.
RDD(Resilient Distributed Dataset) называется устойчивым распределенным набором данных. ,да Spark Самая базовая абстракция данных в , представляющая неизменяемую разделяемую коллекцию, элементы которой можно вычислять параллельно. 。
демонтаж слова
Resilient : Он гибкий, и RDD можно хранить в памяти или на диске.
Distributed : содержит элемент распределенного хранилища, который можно использовать для распределенных вычислений.
Dataset: Это коллекция, которая может хранить множество элементов.
Во многих итерационных алгоритмах (таких как машинное обучение)、графовый алгоритмждать)и Интерактивный майнинг данныхсередина,Промежуточные результаты повторно используются на разных этапах расчета.,Прямо сейчасодинэтапизвыходрезультатвстречаделатьдля Внизодинэтапизвходить。нода,Ранее в платформе MapReduce использовалась модель нециклических изданных потоков.,ПучоксерединамеждурезультатписатьприезжатьHDFSсередина,принес многоизданныекопировать、дискIOисериализациянакладные расходы。И эти фреймворки могут поддерживать только некоторые конкретныеизвычислитьмодель(map/reduce),ибезиметьпоставлятьодиндобрый Универсальныйизабстракция данных。
AMP Labs опубликовала статью о RDD: «Устойчивый Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Вычисления》Чтобы решить эти проблемы
RDD предоставляет абстрактную изданную модель, поэтому нам не нужно беспокоиться о базовых распределенных характеристиках данных. Нам нужно только выразить конкретную логику приложения в виде серии операций конвертации (функции), которые входят в число операций конвертации между ними. разные РДД. Зависимости также можно формировать в пространстве, тем самым реализуя конвейер, тем самым избегая хранения промежуточных результатов, значительно сокращая накладные расходы на копирование данных и дисковый ввод-вывод, а также предоставляя больше изAPI (map/reduec/filter/groupBy...)
1)A list of partitions :
Набор разделов (Partition)/список разделов, то есть базовая единица раздела.
верно Для RDD каждый шард будет обрабатываться вычислительной задачей, а количество шардов определяет степень параллелизма.
Пользователи могут указать число сегментов RDD при созданииRDD. Если не указано, будет использоваться значение по умолчанию.
2)A function for computing each split :
Функция будет применена к каждому Разделу.
SparkсерединаRDDизвычислитьдак Разделкак единицаиз,computeфункциябудет затронутоприезжатькаждый Разделначальство
3)A list of dependencies on other RDDs:
RDD зависит от множества других RDD.
RDDиз будет генерировать новый изRDD каждый раз при Конвертировать, поэтому между RDD будет формироваться похожее. на: Линия сборки одинакова с передней и задней стороны. Когда часть Разделенных отсутствует, Spark может применить эту Зависимость и пересчитать недостающие из Разделданные вместо даверноRDDиз всех Разделов. (Искраизмеханизм отказоустойчивости)
4)Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned):
В Spark одно да основано на хеше и зHashPartitioner, а другое да основано на диапазоне и зRangePartitioner.
верно Для типа КВ изRDD будет функция Partitioner, то есть RDDиз Разделфункция (опционально)
Только с ключом-значением и зRDD будет Partitioner, без ключ-значения и зRDD и зParititioner и з значением даNone. Функция Partitioner определяет номер самого RDD, а также определяет родительский RDD Перемешать вывод из Раздел количества.
5)Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file):
Необязательно, список, в котором хранится каждая позиция раздела (предпочтительно location)。
Для файла HDFS в этом списке хранится расположение блоков каждого раздела. Согласно концепции «мобильные вычисления не так хороши, как мобильные вычисления», когда Spark выполняет планирование задач, он старается изо всех сил выбрать те узлы с рабочими узлами для расчета задач.
Подвести итог
Набор данных RDD,Не только выражает набор данных,Он также рассказал, откуда взялся этот набор данных.,Как рассчитать.
Основные атрибутывключать
1.много Раздел
2.вычислитьфункция
3.Зависимости
4.Разделфункция(по умолчаниюдаhash)
5. Лучшее расположение
1) Устанавливается внешней системой хранения данных создателем.,включатьместныйизфайловая система,Также имеются все изданные наборы поддержки Hadoop,напримерHDFS、Cassandra、HBaseждать
val rdd1 = sc.textFile(“hdfs://node01:8020/wordcount/input/words.txt”)
2)проходить Уже естьизRDDпройтиоператор Конвертироватьгенерировать новыеизRDD
val rdd2=rdd1.flatMap(_.split(" "))
3) Из уже существующей коллекции Scala создателя.
val rdd3 = sc.parallelize(Array(1,2,3,4,5,6,7,8))
или
val rdd4 = sc.makeRDD(List(1,2,3,4,5,6,7,8))
Нижний уровень метода makeRDD вызывает метод распараллеливания.
2.2.1 Классификация
Операторы RDD делятся на две категории:
1)TransformationКонвертироватьдействовать:вернуть новыйRDD
2)Actionдействиедействовать:возвращаться Стоит ли оно того?даRDD(никтовозвращатьсяценность иливозвращатьсядругойиз)
Уведомление:
RDD на самом деле не хранит то, что на самом деле рассчитывается.,И да записал местоположение данныхиз,данныеизконвертировать отношение (то, что называется методом,Что будет)
Все данные в RDD обрабатываются с отложенной оценкой/задержкой, то есть они не рассчитываются напрямую. Только когда возникает запрос, результат передается Driveriz. Действие-действие, эти Конвертировать действительно будут запущены.
Причина использования ленивой оценки/задержки изучения,да Потому что это может сформировать ациклический граф, направленный DAG, во время операции Action RDD для параллельной оптимизации Stage.,Такая конструкция позволяет Spark работать более эффективно.
2.2.2 Оператор преобразования преобразования
Конвертировать | значение |
---|---|
map(func) | вернуть новый RDD,Должен RDD прошло через каждый входной элемент func функция Конвертироватьназад Группастановиться |
filter(func) | вернуть новый RDD,Должен RDD проходя мимо func Возвращаемое значение после вычисления функции: true Входные элементы состоят из |
flatMap(func) | Похоже на: карта, но каждый входной элемент может быть сопоставлен с 0 или несколько выходных элементов (поэтому func Долженвозвращаться должна быть последовательность, а не отдельный элемент) |
mapPartitions(func) | Похоже на: карту, но самостоятельно в RDD работает на каждом фрагменте T из RDD При работе функция func тип функции должен быть да Iterator[T] => Iterator[U] |
mapPartitionsWithIndex(func) | Похоже на: MapPartitions, но func Принимает целочисленный параметр, представляющий значение индекса сегмента, поэтому в типе T из RDD При работе функция func тип функции должен быть да(Int, Interator[T]) => Iterator[U] |
sample(withReplacement, fraction, seed) | в соответствии с fraction Укажите соотношение верных данных для выборки, вы можете выбрать, использовать ли случайные числа для замены, начального числа. Используется для указания начального числа генератора случайных чисел. |
union(otherDataset) | к источнику RDD и параметры RDD Найдите союз после возврата новый RDD |
intersection(otherDataset) | к источнику RDD и параметры RDD Найдите перекрёсток после возврата новый RDD |
distinct([numTasks])) | к источнику RDD После дедупликации вернуть новый RDD |
groupByKey([numTasks]) | в a(K,V)из RDD Вызов выше, возвращает (K, Iterator[V])из RDD |
reduceByKey(func, [numTasks]) | в a(K,V)из RDD Вызов выше, возвращает (K,V)из СДР, использование указано из reduce функция будет такой же key изагрегирование значенийприезжать Вместе,и groupByKey Похоже на: уменьшить Задание «изчисло» можно задать, указав второй необязательный параметр «из». |
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) | Групповые операции. вызов groupByKey,Обычно используется。Похоже на: агрегат, тип эксплуатации изданный. |
sortByKey([ascending], [numTasks]) | в a(K,V)из RDD позвони, К. Должно быть достигнуто Ordered интерфейс, возвращает key Сортировать из(K,V)из RDD |
sortBy(func,[ascending], [numTasks]) | и sortByKey аналогично,нодаболее гибкий |
join(otherDataset, [numTasks]) | В типе (K,V)и(K,W)из RDD Вызывается, возвращает то же самое key верно, все элементы верны вместе из(K,(V,W))из RDD |
cogroup(otherDataset, [numTasks]) | В типе (K,V)и(K,W)из RDD Вызов выше, возвращает (K,(Итерируемый,Итерируемый))Типиз RDD |
cartesian(otherDataset) | Декартово произведение |
pipe(command, [envVars]) | Трубопроводные операции на РДД |
coalesce(numPartitions) | уменьшать RDD из Разделчислоприезжать Укажите значение。существовать Большой фильтрколичестводанные Изназад,Ты можешь сделать это |
repartition(numPartitions) | Перераспределение RDD |
2.2.3 Оператор действия действия
действие | значение |
---|---|
reduce(func) | проходить func агрегирование функций RDD Для всех элементов из эта функция должна быть коммутативной и параллельной. |
collect() | в драйвере,Набор из всех элементов массива имеет форму возникших |
count() | в драйвере,Набор из всех элементов массива имеет форму возникших |
first() | возвращаться RDD из Первыйэлементы(Похоже на: take(1)) |
take(n) | возвращатьсяодин Зависит отданныенаборизвперед n элементы, состоящие из массива |
takeSample(withReplacement,num, [seed]) | Получив группу чисел, массив Должен состоит из случайных выборок из набора данных из num элементов, вы можете выбрать, заменять ли недостающую часть случайными числами, начальным числом Используется для указания начального числа генератора случайных чисел. |
takeOrdered(n, [ordering]) | возвращатьсяестественный порядокили Индивидуальный заказизвперед n элементы |
saveAsTextFile(path) | Замените набор данных из элемента на textfile изформадержатьприезжать HDFS Файловая система другой поддерживает файловую систему, верно для каждого элемента, Spark будет вызван toString метод, конвертируем его в текст из файла |
saveAsSequenceFile(path) | Сосредоточьте данные из элементов на Hadoop sequencefile из Форматдержатьприезжатьобозначениеиз Оглавление Вниз,Можеткделать HDFS или другой Hadoop Поддержка файловой системы. |
saveAsObjectFile(path) | Установите данные элемента на Java сериализацияиз Способдержатьприезжатьобозначениеиз Оглавление Вниз |
countByKey() | Иголкаверно(K,V) Тип из РДД, получился (К,Int)из карта,представляющая каждый key верноотвечатьизэлементчисло。 |
foreach(func) | существоватьданныенаборизкаждыйэлементыначальство,функция запуска func Сделайте обновление. |
foreachPartition(func) | Для каждого набора данных из каждого раздела запустите функцию func. |
Статистические операции
оператор | значение |
---|---|
count | число |
mean | иметь в виду |
sum | Сумма |
max | максимальное значение |
min | минимальное значение |
variance | дисперсия |
sampleVariance | Вычислить дисперсию по выборкам |
stdev | Стандартное отклонение: мера дисперсии |
sampleStdev | Выборка по стандартному отклонению |
stats | Посмотреть статистические результаты |
2.3.1 Подготовка
Запуск в режиме кластера
запускатьSparkкластер
/export/servers/spark/sbin/start-all.sh
запускатьspark-shell
/export/servers/spark/bin/spark-shell \
--master spark://node01:7077 \
--executor-memory 1g \
--total-executor-cores 2
Или запустите в локальном режиме
/export/servers/spark/bin/spark-shell
2.3.2 WordCount
val res = sc.textFile("hdfs://node01:8020/wordcount/input/words.txt")
.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)
//Приведенный выше код сразу изучать не буду, потому что все да Transformation Конвертироватьдействовать //Следующий код действительно будет отправлен и изучен, потому что да Action действие/операция действия
res.collect
2.3.3 Создание СДРД
val rdd1 = sc.parallelize(List(5,6,4,7,3,8,2,9,1,10))
val rdd2 = sc.makeRDD(List(5,6,4,7,3,8,2,9,1,10))
2.3.4 Проверять Должен RDD из Разделчислоколичество
sc.parallelize(List(5,6,4,7,3,8,2,9,1,10)).partitions.length
//Количество разделов не указано, значение по умолчанию да2
sc.parallelize(List(5,6,4,7,3,8,2,9,1,10),3).partitions.length
//Количество разделов указано равным 3
sc.textFile("hdfs://node01:8020/wordcount/input/words.txt").partitions.length
//2
РДД Разделизданные зависит от каких факторов?
RDDРазделизв принципедаделать Разделизчислостарайся изо всех силждать ВкластерсерединаизCPUосновной(core)числоглаз,так Можеткдостаточная прибыльиспользоватьCPUизвычислитьресурс,нодасуществоватьдействительныйсерединадля Понятно ДажедобавлятьполныйизнажиматьCPUизвычислитьресурс,установит параллелизм наcpuядерныйчислоиз2~3раз。RDDРазделчислоизапускатьчасобозначениеизядерныйчисло、Укажите количество из Разделов при вызове метода、например, сам файл Разделчисло Это как-то связано с
Принцип зонирования
1) При указании номера ядра ЦП определяется значение параметра:
spark.default.parallelism=Укажите количество ядер процессора (режим кластерасамый маленький2)
2) метод VernoВScala setparallelize(set,sectionNumber),
Если количество разделов не указано, используйте spark.default.parallelism,
Если указано, используйте указанный номер раздела (не указывайте число больше, чем spark.default.parallelism)
3)vernoВtextFile(использовать, номер раздела) defaultMinPartitions
Если количество разделов не указано, sc.defaultMinPartitions=min(defaultParallelism,2)
Если указано, используйте указанный номер из Раздел sc.defaultMinPartitions=Укажите номер из Раздел.
рдд из раздела
верно для локальных файлов:
номер рддиз Раздел = max (количество локальных фрагментов файла, sc.defaultMinPartitions)
верно для файлов HDFS:
номер рддиз Раздел = max(hdfs файл из номера блока, sc.defaultMinPartitions)
Таким образом, если количество выделенных ядер многократно и данныесоздаватьRDD считываются из файла, даже если файл hdfs имеет только 1 слайс, окончательное количество изSparkizRDDизразделов может быть да2.
2.3.5 Различные значения и применение Конвертироватьоператориз
1)map
верноRDD из каждого элемента выполняет операции и начинает операции с результатов
//проходимпараллельно для генерации rdd
val rdd1 = sc.parallelize(List(5, 6, 4, 7, 3, 8, 2, 9, 1, 10))
//каждый элемент vernordd1
rdd1.map(_ * 2).collect
//collectmethod означает сбор, операцию действия
2)filter
Уведомление:функциясерединавозвращатьсяTrueизодеяло Держать Вниз,появляется Falseиз отфильтровывается
val rdd2 = sc.parallelize(List(5, 6, 4, 7, 3, 8, 2, 9, 1, 10))
val rdd3 = rdd2.filter(_ >= 10)
rdd3.collect
//10
3)flatmap
В верноRDD каждый элемент сначала сопоставляется, затем сглаживается, и, наконец, выполняется операция появления и результат
val rdd1 = sc.parallelize(Array(“a b c”, “d e f”, “h i j”))
//Сначала вырезаем каждый элемент внутри rdd1, а затем выравниваем его
val rdd2 = rdd1.flatMap(_.split(’ '))
rdd2.collect
//Array[String] = Array(a, b, c, d, e, f, h, i, j)
4)sortBy
val rdd1 = sc.parallelize(List(5, 6, 4, 7, 3, 8, 2, 9, 1, 10))
val rdd2 = rdd1.sortBy(x=>x,true)
// x=>x Указывает сортировку по самим элементам, True указывает на возрастающий порядок.
rdd2.collect
//1,2,3,…
val rdd2 = rdd1.sortBy(x=>x+"",true)
//x=>x+""значит в соответствии сxизнитьформасортировать Изменятьстановиться Понятнонить,Результат в лексикографическом порядке
rdd2.collect
//1,10,2,3…
5) Пересечение, объединение, разностное множество, Декартово. произведение
Типы уведомлений должны быть последовательными
val rdd1 = sc.parallelize(List(5, 6, 4, 3))
val rdd2 = sc.parallelize(List(1, 2, 3, 4))
//union не будет удалять дубликаты
val rdd3 = rdd1.union(rdd2)
rdd3.collect
//удаляем дублирование
rdd3.distinct.collect
//Находим пересечение
val rdd4 = rdd1.intersection(rdd2)
rdd4.collect
//Находим набор различий
val rdd5 = rdd1.subtract(rdd2)
rdd5.collect
//Декартово произведение
val rdd1 = sc.parallelize(List(“jack”, "том"))//ученик
val rdd2 = sc.parallelize(List(“java”, “python”, "скала"))//Курс
val rdd3 = rdd1.cartesian(rdd2)//Указывает всех студентов и все выбранные курсы
rdd3.collect
//Array[(String, String)] = Array((jack,java), (jack,python), (jack,scala), (tom,java), (tom,python), (tom,scala))
6)join
соединение (внутреннее соединение) объединяет кортежи с одинаковым ключом и значением
val rdd1 = sc.parallelize(List((“tom”, 1), (“jerry”, 2), (“kitty”, 3)))
val rdd2 = sc.parallelize(List((“jerry”, 9), (“tom”, 8), (“shuke”, 7), (“tom”, 2)))
val rdd3 = rdd1.join(rdd2)
rdd3.collect
//Array[(String, (Int, Int))] = Array((tom,(1,8)), (tom,(1,2)), (jerry,(2,9)))
Иллюстрация 1
val rdd4 = rdd1.leftOuterJoin(rdd2) //Левое внешнее соединение, оставляем все левое и оставляем только правое, если оно соответствует условиям
rdd4.collect
//Array[(String, (Int, Option[Int]))] = Array((tom,(1,Some(2))), (tom,(1,Some(8))), (jerry,(2,Some(9))), (kitty,(3,None)))
Иллюстрация 2
val rdd5 = rdd1.rightOuterJoin(rdd2)
rdd5.collect
//Array[(String, (Option[Int], Int))] = Array((tom,(Some(1),2)), (tom,(Some(1),8)), (jerry,(Some(2),9)), (shuke,(None,7)))
val rdd6 = rdd1.union(rdd2)
rdd6.collect
//Array[(String, Int)] = Array((tom,1), (jerry,2), (kitty,3), (jerry,9), (tom,8), (shuke,7), (tom,2))
7)groupbykey
groupByKey() из функции да, верно группирует значения с одним и тем же ключом из.
Например, верно четыре значения ключа: верно("spark",1), ("spark",2), ("hadoop",3)и("hadoop",5),
использоватьgroupByKey()назадпридетсяприезжатьизрезультатда:(“spark”,(1,2))и(“hadoop”,(3,5))。
//Группируем по ключу
val rdd6 = sc.parallelize(Array((“tom”,1), (“jerry”,2), (“kitty”,3), (“jerry”,9), (“tom”,8), (“shuke”,7), (“tom”,2)))
val rdd7=rdd6.groupByKey
rdd7.collect
//Array[(String, Iterable[Int])] = Array((tom,CompactBuffer(1, 8, 2)), (jerry,CompactBuffer(2, 9)), (shuke,CompactBuffer(7)), (kitty,CompactBuffer(3)))
8) коллега [понимать]
cogroupда сначала группирует внутри RDD, затем группирует между RDD
val rdd1 = sc.parallelize(List((“tom”, 1), (“tom”, 2), (“jerry”, 3), (“kitty”, 2)))
val rdd2 = sc.parallelize(List((“jerry”, 2), (“tom”, 1), (“shuke”, 2)))
val rdd3 = rdd1.cogroup(rdd2)
rdd3.collect
// Array((tom,(CompactBuffer(1, 2),CompactBuffer(1))), (jerry,(CompactBuffer(3),CompactBuffer(2))), (shuke,(CompactBuffer(),CompactBuffer(2))), (kitty,(CompactBuffer(2),CompactBuffer())))
9)groupBy
в соответствии с Укажите середину функции из правила/ключа для группировки
val intRdd = sc.parallelize(List(1,2,3,4,5,6))
val result = intRdd.groupBy(x=>{if(x%2 == 0)“even” else “odd”}).collect
//Array[(String, Iterable[Int])] = Array((even,CompactBuffer(2, 4, 6)), (odd,CompactBuffer(1, 3, 5)))
10)reduce
val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5))
//уменьшаем агрегацию
val result = rdd1.reduce(_ + )
// Первый Результат последней операции из, второй_ На этот раз приходит из элемента
★Вопросы на собеседовании
уменьшитьByKeyдаTransformationвозвращатьсядаAction? --Transformation
уменьшитьда Преобразованиевозвращатьсяда Действие? --Action
11)reducebykey
Уведомлениеreducebykeyда Конвертироватьоператор
уменьшитьByKey(func) из функции да, использует funcфункцию для объединения значений с одним и тем же ключом из.
Например, сокращениеByKey((a,b) => a+b), есть четыре ключевых значения верно("spark",1), ("spark",2), ("hadoop",3)и("hadoop",5)
верно имеет то же значение ключа верно. После слияния результат: ("spark", 3), ("hadoop", 8).
Видно, что (а, б) => В лямбда-выражении a+b ab относится к значению.
Например, если верно имеет одно и то же значение ключа верно («искра»,1), («искра»,2), то a равно да1, а b равно да2.
val rdd1 = sc.parallelize(List((“tom”, 1), (“jerry”, 3), (“kitty”, 2), (“shuke”, 1)))
val rdd2 = sc.parallelize(List((“jerry”, 2), (“tom”, 3), (“shuke”, 2), (“kitty”, 5)))
val rdd3 = rdd1.union(rdd2) //Союз
rdd3.collect
//Array[(String, Int)] = Array((tom,1), (jerry,3), (kitty,2), (shuke,1), (jerry,2), (tom,3), (shuke,2), (kitty,5))
//Агрегация по ключу
val rdd4 = rdd3.reduceByKey(_ + _)
rdd4.collect
//Array[(String, Int)] = Array((tom,4), (jerry,5), (shuke,3), (kitty,7))
12)repartition
Изменить количество Разделов
val rdd1 = sc.parallelize(1 to 10,3) //Указываем 3 раздела
//Используем перераспределение для изменения количества rdd1Раздел
//уменьшать Раздел
rdd1.repartition(2).partitions.length //Количество вновь созданных изрдд Раздел равно 2
rdd1.partitions.length //3 //Уведомление: Исходное количество изрдд Раздел остается неизменным
//Добавить раздел
rdd1.repartition(4).partitions.length
//уменьшать Раздел
rdd1.repartition(3).partitions.length
//Используем объединение, чтобы изменить количество rdd1Раздел
//уменьшать Раздел
rdd1.coalesce(2).partitions.size
rdd1.coalesce(4).partitions.size
★Уведомление:
Передел может увеличить количество из Разделов в иуменьшатьrdd,
Число объединений по умолчанию — уменьшение rddРаздел. Увеличение числа rddРаздел не вступит в силу.
Нет管增добавлятьвозвращатьсядауменьшать Разделчисло ОригиналrddРазделчисло Нет Изменять,изменение изда нового поколения изномер рддиз Раздел
★Сценарии применения:
существовать Пучокиметь дело срезультатдержатьприезжатьhdfsначальство Извперед Можеткуменьшать Разделчисло(Объединение небольших файлов)
sc.textFile(“hdfs://node01:8020/wordcount/input/words.txt”)
.flatMap(.split(" ")).map((,1)).reduceByKey(+)
.repartition(1)
//существоватьдержатьприезжатьHDFSИзвпередруководить Тяжелый Разделдля1,ТакдержатьсуществоватьHDFSначальствоизрезультатдокумент Толькоиметь1индивидуальный
.saveAsTextFile(“hdfs://node01:8020/wordcount/output5”)
13)collect
val rdd1 = sc.parallelize(List(6,1,2,3,4,5), 2)
rdd1.collect
14)count
подсчитать элементы в статистической коллекции изчисло
rdd1.count //6
Найдите изчисло элемента в самом внешнем наборе RDD.
val rdd3 = sc.parallelize(List(List(“a b c”, “a b b”),List(“e f g”, “a f g”), List(“h i j”, “a a b”)))
rdd3.count //3
15)distinct
val rdd = sc.parallelize(Array(1,2,3,4,5,5,6,7,8,1,2,3,4), 3)
rdd.distinct.collect
16)top
//Получаем N верхних элементов самого большого из
val rdd1 = sc.parallelize(List(3,6,1,2,4,5))
rdd1.top(2)
17)take
//Возьмем первые N элементов в исходном порядке
rdd1.take(2) //3 6
//нуждаться: вынуть минимум из2
rdd1.sortBy(x=>x,true).take(2)
18)first
//Согласно оригиналуиз Приведи в порядоквперед Первыйиндивидуальный
rdd1.first
19)keys、values
val rdd1 = sc.parallelize(List(“dog”, “tiger”, “lion”, “cat”, “panther”, “eagle”), 2)
val rdd2 = rdd1.map(x => (x.length, x))
rdd2.collect
//Array[(Int, String)] = Array((3,dog), (5,tiger), (4,lion), (3,cat), (7,panther), (5,eagle))
rdd2.keys.collect
//Array[Int] = Array(3, 5, 4, 3, 7, 5)
rdd2.values.collect
//Array[String] = Array(dog, tiger, lion, cat, panther, eagle)
20)mapValues
MapValues означает, что элементы из верноRDD работают, Key остается неизменным, а Value меняется на после операции.
val rdd1 = sc.parallelize(List((1,10),(2,20),(3,30)))
val rdd2 = rdd1.mapValues(_*2).collect //_ представляет каждое значение , ключ остается неизменным, примените функцию к значению
//(1,20),(2,40),(3,60)
21)collectAsMap
КонвертироватьстановитьсяMap
val rdd = sc.parallelize(List((“a”, 1), (“b”, 2)))
rdd.collectAsMap
//scala.collection.Map[String,Int] = Map(b -> 2, a -> 1)
Вопросы на собеседовании: foreach и foreachPartition
val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9), 3)
rdd1.foreach(x => println(x*100)) //xдакаждыйэлементы
rdd1.foreachPartition(x => println(x.reduce(_ + _))) //xdaeach Раздел
Уведомление:foreachиforeachPartitionВседаActionдействовать,нодакначальствокодсуществоватьspark-shellсерединаосуществлять Не могу видетьприезжатьвыходрезультат,
Причина да передается в foreach и foreachPartitioniz, чтобы вычислить Функцию в каждом Разделосуществлятьиз, то есть в кластере по каждому Worker изучаиз.
Сценарии применения:напримерсуществоватьфункциясерединахотеть ВоляRDDсерединаизэлементдержатьприезжатьданные Библиотека
foreach:встреча ВоляфункцияэффектприезжатьRDDсерединаизкаждыйполоскаданные,Так сколько же данных?,действоватьданные Библиотекасоединятьиз Просто включи и выключи егоосуществлятьмногонемного Второсортный
foreachPartition:Воляфункцияэффектприезжать Каждыйодин Раздел,Затем каждое Разделосуществлять после открытия и закрытия соединения с библиотекой данных.,Есть несколько разделов, которые будут изучать данные о подключении библиотеки: открываются и закрываются.
import org.apache.spark.{SparkConf, SparkContext}
object Test {
def main(args: Array[String]): Unit = {
val config = new SparkConf().setMaster("local[*]").setAppName("WordCount")
val sc = new SparkContext(config)
//Устанавливаем уровень вывода журнала
sc.setLogLevel("WARN")
val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9), 3)
//Applies a function f to all elements of this RDD.
//Применяем Functionf ко всем элементам этого RDDiz
rdd1.foreach(x => println(x*100))
//Передаем функцию в каждый Раздел и циклически перебираем элементы в Должен Раздел внутри Раздела
//x — это каждый элемент, то есть один из числа
println("==========================")
//Applies a function f to each partition of this RDD.
//Применяем Functionf к каждому разделу этого RDDiz
rdd1.foreachPartition(x => println(x.reduce(_ + _)))
//Передаем каждый раздел в функцию существования
//xdaeach Раздел
}
}
Вопрос на собеседовании: карта и mapPartitions
Передайте каждый раздел в функцию
val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9), 3)
rdd1.mapPartitions(x=>x.map(y=>y*2)).collect
// Каждый Раздел в xда, из элемента в yда Раздел
Расширение: mapPartitionsWithIndex (одновременно получить номер раздела)
Функция: Возьмите Раздел в верные и изданные,Так же вы можете вывести номер Разделиз,так Сразу Можетк Знатьданныедарод Вгдеиндивидуальный Разделиз
val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 3)
//Долженфункцияиз функции да вынесет правильный раздел Раздел изданные и выведет номер Раздела
// индекс Номер раздела
// одинiterРаздел Внутриизданные
val func = (index: Int, iter: Iterator[Int]) => {
iter.map(x => “[partID:” + index + ", val: " + x + “]”)
}
rdd1.mapPartitionsWithIndex(func).collect
//Array[String] = Array(
[partID:0, val: 1], [partID:0, val: 2], [partID:0, val: 3],
[partID:1, val: 4], [partID:1, val: 5], [partID:1, val: 6],
[partID:2, val: 7], [partID:2, val: 8], [partID:2, val: 9]
)
Расширение: агрегат
val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 3)
//0 представляет начальное значение
//Первыйиндивидуальный_+,Указывает внутрирегиональную агрегацию,Первыйиндивидуальный_Представляет историческую ценность,Второй _ представляет текущее значение
//Второй +_,Представляет интервальную агрегацию,Первыйиндивидуальный_Представляет историческую ценность,Второй _ представляет текущее значение
val result1: Int = rdd1.aggregate(0)( _ + _, _ + _) //45 ==> 6 + 15 + 24 = 45
//10 представляет начальное значение. Каждый раздел имеет начальное значение, а также начальное значение во время агрегации интервалов.
val result2: Int = rdd1.aggregate(10)( _ + _ , _ + _) //85 ==> 10+ (10+6 + 10+15 + 10+24)=85
Расширение: joinByKey
val rdd1 = sc.textFile(“hdfs://node01:8020/wordcount/input/words.txt”).flatMap(.split(" ")).map((, 1))
//Array((hello,1), (me,1), (hello,1), (you,1), (hello,1), (her,1))
//x => x означает, что ключ остается неизменным
//(a: Int, b: Int) => a + b: указывает на внутриобластное агрегирование
//(m: Int, n: Int) => m + n: представляет собой агрегацию интервалов
val rdd2 = rdd1.combineByKey(x => x, (a: Int, b: Int) => a + b, (m: Int, n: Int) => m + n)
//val rdd2 = rdd1.combineByKey(x => x, _ + _ , _ + _ )//Уведомление Аббревиатура здесь неправильная, принцип такой: экономь, если можешь, не ленись, если не можешь.
rdd2.collect
//Array[(String, Int)] = Array((hello,3), (me,1), (you,1), (her,1))
val rddData1: RDD[(String, Float)] = sc.parallelize(
Array(
(«Класс 1», 95f),
(«Класс 2», 80f),
(«Класс 1», 75f),
(«Класс 3», 97f),
(«Класс 2», 88f)),
2)
val rddData2 = rddData1.combineByKey(
grade => (grade, 1),
(gc: (Float, Int), grade) => (gc._1 + grade, gc._2 + 1),
(gc1: (Float, Int), gc2: (Float, Int)) => (gc1._1 + gc2._1, gc1._2 + gc2._2)
)
val rddData3 = rddData2.map(t => (t._1, t._2._1 / t._2._2))
rddData3.collect
Расширение: агрегатByKey
val pairRDD = sc.parallelize(List( (“cat”,2), (“cat”, 5), (“mouse”, 4),(“cat”, 12), (“dog”, 12), (“mouse”, 2)), 2)
def func(index: Int, iter: Iterator[(String, Int)]) : Iterator[String] = {
iter.map(x => “[partID:” + index + ", val: " + x + “]”)
}
pairRDD.mapPartitionsWithIndex(func).collect
//Array(
[partID:0, val: (cat,2)], [partID:0, val: (cat,5)], [partID:0, val: (mouse,4)],
[partID:1, val: (cat,12)], [partID:1, val: (dog,12)], [partID:1, val: (mouse,2)]
)
pairRDD.aggregateByKey(0)(math.max( _ , _ ), _ + _ ).collect
// Array[(String, Int)] = Array((dog,12), (cat,17), (mouse,6))
//100 представляет начальное значение в области, агрегирование интервалов отсутствует
pairRDD.aggregateByKey(100)(math.max(_, _), _ + _).collect
//Array[(String, Int)] = Array((dog,100), (cat,200), (mouse,200))
pairRDD.aggregateByKey(5)(math.max(_, _), _ + _).collect
//Array[(String, Int)] = Array((dog,12), (cat,17), (mouse,10))
pairRDD.aggregateByKey(10)(math.max(_, _), _ + _).collect
//Array[(String, Int)] = Array((dog,12), (cat,22), (mouse,20))
val rddData1 = sc.parallelize(
Array(
(«Пользователь 1», «Интерфейс 1»),
(«Пользователь 2», «Интерфейс 1»),
(«Пользователь 1», «Интерфейс 1»),
(«Пользователь 1», «Интерфейс 2»),
(«Пользователь 2», «Интерфейс 3»)),
2)
val rddData2 = rddData1.aggregateByKey(collection.mutable.SetString)(
(urlSet, url) => urlSet += url,
(urlSet1, urlSet2) => urlSet1 ++= urlSet2)
rddData2.collect
небольшое упражнение
нуждаться
Учитывая ключевое значение верноRDD
val rdd = sc.parallelize(Array((“spark”,2),(“hadoop”,6),(“hadoop”,4),(“spark”,6)))
ключ представляет название книги,
значение представляет продажи книг в определенный день,
Пожалуйста, подсчитайте, насколько правильно должен выглядеть каждый ключ. в То есть да вычисляет средний дневной объем продаж каждого типа книг.
Конечный результат: ("искра",4),("хадуп",5)
val rdd1 = rdd.groupByKey
rdd1.collect
//Array((spark,CompactBuffer(6, 2)), (hadoop,CompactBuffer(4, 6)))
val rdd2 = rdd1.mapValues(v => v.sum / v.size)
rdd2.collect
Отвечать
val rdd = sc.parallelize(Array((“spark”,2),(“hadoop”,6),(“hadoop”,4),(“spark”,6)))
val rdd2 = rdd.groupByKey()
rdd2.collect
//Array[(String, Iterable[Int])] = Array((spark,CompactBuffer(2, 6)), (hadoop,CompactBuffer(6, 4)))
val rdd3 = rdd2.map(t=>(t._1,t._2.sum /t._2.size))
rdd3.collect
//Array[(String, Int)] = Array((spark,4), (hadoop,5))
Подвести итог
1) Классификация
Изоператор РДД делится на две категории.,Класс операций даTransformationКонвертировать.,Разновидность операции даActionдействие
2) Как отличить Трансформацию и Действие
возвращатьсяценитьдаRDDиздляTransformationКонвертироватьдействовать,Задерживатьосуществлять/ленивыйосуществлять/инерцияосуществлять
возвращаться Стоит ли оно того?даRDD(нравитьсяUnit、Array、Int)издляActionдействиедействовать
3) Вопросы на собеседовании:
1.Что такое операции преобразования и API? --map/flatMap/filter…
2.Что такое API-интерфейсы действий? --collect/reduce/saveAsTextFile…
3.reduceByKeyдаTransformationвозвращатьсядаAction? --Transformation
4.ReduceTransformationвозвращатьсяdaAction? – Action
5. В чем разница между foreach и foreachPartition? foreach действует на каждый элемент, foreachPartition действует на каждый раздел
4)Уведомление:
RDD на самом деле не хранит то, что на самом деле рассчитывается.,И только да записывает отношение RDD из Конвертировать (называемое какой метод,Что будет,От чего зависят RDD,Разделустройстводачто,числоколичествомашина с исходным кодом блоковустройство Списокповерхность)
Все операции Конвертировать в RDD задерживаются, то есть не рассчитываются напрямую. Эти файлы действительно запускаются только при выполнении операции Action.
В реальном развитии некоторые RDD Из расчета или Конвертировать может потребоваться больше времени, если эти RDD назад Продолжениевозвращатьсявстречачастоизодеялоиспользоватьприезжать,Тогда эти РДД могут быть Сохранение/кэширование.,так Вниз Второсортный Сноваиспользоватьприезжатьизчасждать Сразу Нетиспользовать Снова Тяжелый新вычислить Понятно,Повышена эффективность работы программы.
метод persist и метод кэша
RDD методpersist или кэш-метод могут кэшировать предыдущие результаты вычислений.,нода Нетдаэти двоеиндивидуальныйметод Кэшировать сразу при вызове,И когда да запускает следующее действие,ДолженRDD будет кэшироваться в памяти вычислительного узла из,идляназадлапша Тяжелыйиспользовать。
Проверять Исходный кодRDDиз обнаружил, что кэш в конечном итоге вызывается методом persist без параметров (хранилище по умолчанию существует только в памяти)
1) Запускаем кластер и искровую оболочку
/export/servers/spark/sbin/start-all.sh
/export/servers/spark/bin/spark-shell \
--master spark://node01:7077,node02:7077 \
--executor-memory 1g \
--total-executor-cores 2
2) Поставьте RDD Настойчивость, последующие операции Должен RDD Вы можете получить его прямо из кеша
val rdd1 = sc.textFile("hdfs://node01:8020/wordcount/input/words.txt")
val rdd2 = rdd1.flatMap(x=>x.split(" ")).map((_,1)).reduceByKey(_+_)
rdd2.cache //кэширование/постоянство
Действие rdd2.sortBy(_._2,false).collect//trigger прочитает файл HDFS, и rdd2 будет действительно постоянным.
rdd2.sortBy(_._2,false).collect//триггер действия,Прочитаю кэш изданные,осуществлятьскоростьстепеньвстреча比Извпередбыстрый,потому чтодляrdd2Уже сохраняетсяприезжать Внутрижитьсередина Понятно
3) Уровень хранения
Уровень хранения по умолчанию — хранить в памяти только одну копию.,Для Spark существует множество различных уровней хранения.,жить储级别существоватьobject StorageLevel, определенный в
Подвести итог
1.RDD Сохранение/кэшированиеиззда с целью повышения скорости последующих операций.
2. Уровней кэша много. По умолчанию он существует только в памяти. Memory_and_disk используется в разработке.
3. Только после выполнения операции действия СДР будет фактически обработан.
4. В реальном развитии, если определенный RDD будет часто использоваться в будущем, вы можете использовать ДолженRDD для Сохранение/кэширование.
1) Ограничения настойчивости
Сохранение/кэширование может помещать данные в память.,Хотядабыстрыйскоростьиз,нодатакжеданаименее надежныйиз;также Можетк Пучокданныепомещатьсуществоватьдискначальство,также Нетда Абсолютно надежныйиз!Напримердискбудет поврежденждать。
2) Решение проблем
Контрольная точка создана для более надежного сохранения,Во время Checkpoint данные обычно размещаются в HDFS.,Естественно, это зависит от присущей HDFS высокой отказоустойчивости и высокой надежности для достижения максимальной безопасности.,Реализован RDD для обеспечения отказоустойчивости и высокой доступности.
3) Этапы использования
1.SparkContext.setCheckpointDir("каталог") //HDFSиз Каталог
2.RDD.checkpoint()
sc.setCheckpointDir(“hdfs://node01:8020/ckpdir”)
//Установим каталог контрольной точки, который немедленно создаст пустой каталог в HDFS
val rdd1 = sc.textFile(“hdfs://node01:8020/wordcount/input/words.txt”).flatMap(.split(" ")).map(( _ , 1)).reduceByKey( _ +)
rdd1.checkpoint() //вернордд1 выполняет сохранение контрольной точки
rdd1.collect //Операция действия действительно будет контрольной точкой
//назад Продолжениееслихотетьиспользоватьприезжатьrdd1Можеткотcheckpointсерединачитать
Посмотреть результаты
hdfs dfs -ls /
илипроходитьwebинтерфейс Проверять
http://192.168.1.101:50070/dfshealth.html#tab-overview
1) Как обеспечить безопасность и эффективность чтения данных в разработке.
Могут использоваться часто и важные изданные,Сначала выполните кеширование/сохранение,Повторите операцию проверки
2) Настойчивость и Checkpoint изразница
1. Местоположение
Persist и Cache может толькодержатьсуществоватьместныйиздиски Внутрижитьсередина(или За пределами кучи Внутрижить–экспериментсередина)
Checkpoint Можеткдержатьданныеприезжать HDFS Этот тип надежного хранилища
2. Жизненный цикл
CacheиPersistизRDDвстречасуществовать Конец программыназадвстречаодеяло Прозрачныйили Ручной вызовunpersistметод
CheckpointизRDD все еще существует после завершения работы программы и не будет удален
3.Lineage (родословная, цепочка зависимостей – собственно да Зависимости)
PersistиCache не будет выбрасывать цепочку зависимостей/Зависимости между RDD, поскольку этот кеш ненадежен при возникновении каких-либо ошибок (например Executor Время простоя), вам необходимо пересчитать трассировку цепочки зависимостей.
Checkpoint Это разорвет цепочку зависимостей, потому что Checkpoint сохранит результаты в HDFS этотдобрыйжить储середина,Более безопасный и надежный,Обычно нет необходимости отслеживать цепочку зависимостей.
Дополнение: Lineage RDD изLineage (происхождение, цепочка зависимостей) будет записывать RDD и информацию о данных элемента и конвертировать поведение. Когда часть ДолженRDDiz Разделенные потеряна, она может быть изменена. соответствии с Эта информация используется для повторных вычислений и восстановления потерянных данных.
При выполнении восстановления после сбоя Spark считывает стоимость Checkpoint и пересчитывает стоимость RDD для сравнения, тем самым автоматически выбирая оптимальную стратегию восстановления.
1) Два типа зависимостей
RDDи зависит от родителя. RDD имеет два разных типа отношений, а именно:
широкая зависимость (широкая dependency/shuffle dependency)
узкая зависимость dependency)
2) Иллюстрация
3) Как отличить широкие и узкие зависимости
Узкая зависимость: родительский RDDиз a Раздел будет зависеть только от дочернего RDDiz a Раздел.
Ширинаполагаться:отецRDDизодин Раздел Можно стегатьRDDиз Несколько Разделполагаться(с участиемприезжатьshuffle)
4) Вопросы для собеседования
Зависит ли один дочерний RDD от нескольких родительских RDD? Это широкая или узкая зависимость?
Не уверен, тоже да Зависимость от ширины и шириныизразделятьв соответствии сдаотецRDDизодин Разделда Нет одеялаRDDиз Несколько Раздел Местополагаться,да,Просто широкая зависимость,илиотshuffleиз Суждение под углом,Идет перемешивание Просто широкая зависимость
1) верно опирается на узкие зависимости
Spark может выполнять параллельные вычисления
Если один Разделданный потерян, нужно пересчитать только 1 Раздел из родительского RDDизверно. Пересчитывать всю задачу не нужно, что повышает отказоустойчивость.
2) верно полагается на Куана
даразделятьStageиз На основе
одинсложныйизбизнес-логикаеслииметьshuffle,Тогда это означает, что после того, как предыдущий этап дал результаты,,Талант изучить следующий этап,То есть расчет следующего этапа зависит от предыдущего этапа. Далее действуем в порядке перемешивания разделять (то есть просто следуем широкой зависимости разделять),Вы можете преобразовать DAGразделить в несколько этапов/стадий.,на той же стадии,Будет несколько операций оператора,Может сформировать линию сборки трубопровода,Несколько параллелей в конвейере могут выполняться параллельно.
верно полагаться на узкую зависимость,обработка раздела из Конвертировать завершает расчет на этапе,Не разделять(По возможности размещайте узкие зависимости на одном этапе.,Можетквыполнитьпоток Ватерлиниявычислить)
Из-за широкой зависимости, из-за существования перемешивания, следующий расчет может быть запущен только после завершения обработки родительского СДР, а это означает, что требуется этап разделять (широкая зависимость означает расщепление)
Spark будет в соответствии с shuffle/wide Dependency использовать алгоритм обратного отслеживания, чтобы верноDAG для Stageразделять.,сзади вперед,сталкиватьсяприезжать Ширинаполагаться Сразуотключиться,сталкиватьсяприезжатьузкийполагаться Сразу ПучоккогдавпередизRDDприсоединитьсяприезжатькогдавпередизstage/этапсередина
Подробную информацию об алгоритме разделения можно найти в статье, опубликованной AMP Lab.
《Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing》
http://xueshu.baidu.com/usercenter/paper/show?paperid=b33564e60f0a7e7a1889a9da10963461&site=xueshu_se
Spark запускает основные процессы
1. При подаче заявки Spark ее сначала необходимо Приложение создает базовую рабочую среду, состоящую из узла управления задачами (Драйвер) и SparkContext.
2.SparkContext регистрируется в диспетчере ресурсов и применяется для запуска ресурсов Executor;
3.ресурсуправлятьустройстводляExecutorраспределять ресурсы изапускатьExecutorпроцесс,Executorбегать Состояние Воля Отправлено с большим удовольствиемприезжатьресурсуправлятьустройствоначальство;
4.SparkContextв соответствии сRDDиз Зависимости конструируется в граф DAG и передается в DAGScheduler для анализа разделять в Stage, а Taskset, состоящий из Task в ДолженStage, отправляется в TaskScheduler.
5.TaskScheduler передает задачу Исполнителю для запуска, а SparkContext передает код приложения Исполнителю.
6.ExecutorВоляTaskвброситьприезжатьпул потоковсерединаосуществлять,Отправьте результаты в планировщик задач.,Затем вернитесь к планировщику DAG.,После запуска запишите данные и освободите все ресурсы.
1.Sparkотвечатьиспользоватьодеялопредставлять на рассмотрение–>SparkContext向ресурсуправлятьустройствозарегистрироватьсяи申请ресурс–>запускатьExecutor
2.RDD–>СтроитьDAG–>DAGSchedulerразделятьStageформастановитьсяTaskSet–>TaskSchedulerпредставлять на рассмотрениеTask–>WorkerначальствоизExecutorосуществлятьTask
по умолчанию,Когда Spark запускает функцию параллельно на нескольких разных узлах из нескольких задач,это будетфункциясерединас участиемприезжатьизкаждый Изменятьколичество,существоватькаждый Задачаначальство Всерожденныйстановитьсяодинкопировать。нода,Иногда необходимо разделить переменные между несколькими задачами.,илисуществовать Задача(Task)и Задача控制узел(Driver Программа) общие переменные.
Чтобы удовлетворить эту нуждаться, Искра Предусмотрено два типа переменных:
1.аккумуляторaccumulators:аккумуляторподдерживатьсуществовать Местоиметьдругойузелмеждуруководитьусталыйдобавлятьвычислить(напримерсчитатьчислоили Сумма)
2.Широковещательные переменныеbroadcast variables:Широковещательные переменные используются для совместного использования переменных в памяти всех узлов и кэширования переменной, доступной только для чтения, на каждой машине вместо создания копии для каждой задачи на машине.
8.1.1 Не использовать аккумуляторы
8.1.2 Использование аккумуляторов
обычно для Spark При передаче функции, например, с помощью map() функцияилииспользовать filter() При прохождении условий,Вы можете использовать программу драйвера для определения переменных,нодакластерсерединабегатьизкаждый Задача Всевстречапридетсяприезжатьэтот些Изменятьколичествоизновыйизкопировать,Обновление этих значений реплик не повлияет на величину изверно нагрузки в приводе. На данный момент с помощью аккумулятора можно добиться желаемого эффекта.
val xx: Аккумулятор[Int] = sc.accumulator(0)
8.1.3 демо-код
package cn.itcast.core
import org.apache.spark.rdd.RDD
import org.apache.spark.{Accumulator, SparkConf, SparkContext}
object AccumulatorTest {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("wc").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
sc.setLogLevel("WARN")
//Используем коллекцию scala для завершения накопления
var counter1: Int = 0;
var data = Seq(1,2,3)
data.foreach(x => counter1 += x )
println(counter1)//6
println("+++++++++++++++++++++++++")
//Используем RDD для накопления
var counter2: Int = 0;
val dataRDD: RDD[Int] = sc.parallelize(data) //Распределенная коллекция из[1,2,3]
dataRDD.foreach(x => counter2 += x)
println(counter2)//0
//Уведомление: Приведенные выше результаты операции изRDD да0
//потому чтодляforeachсерединаизфункциядаперешел кWorkerсерединаизExecutorосуществлять,использоватьприезжать Понятноcounter2Изменятьколичество
//Переменная counter2 определяется на стороне драйвера. При передаче исполнителю каждый исполнитель имеет копию счетчика2.
//наконецкаждыйExecutorВолякаждыйсиндивидуальныйxдобавлятьприезжать Собственныйизcounter2начальстволапша Понятно,иDriverконецизcounter2без Это как-то связано с
//Тогда эту проблему нужно решить! Не может быть, чтобы мы даже сложить не могли только потому, что используем Spark!
//Если решено?---Использовать аккумулятор
val counter3: Accumulator[Int] = sc.accumulator(0)
dataRDD.foreach(x => counter3 += x)
println(counter3)//6
}
}
8.2.1 Не используйте широковещательные переменные
8.2.2 Использование широковещательных переменных
8.2.3 демо-код
package cn.itcast.core
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf,SparkContext}
object BroadcastVariablesTest {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("wc").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
sc.setLogLevel("WARN")
//Нетиспользовать Широковещательные переменные
val kvFruit: RDD[(Int, String)] = sc.parallelize(List((1,"apple"),(2,"orange"),(3,"banana"),(4,"grape")))
val fruitMap: collection.Map[Int, String] =kvFruit.collectAsMap
//scala.collection.Map[Int,String] = Map(2 -> orange, 4 -> grape, 1 -> apple, 3 -> banana)
val fruitIds: RDD[Int] = sc.parallelize(List(2,4,1,3))
//в соответствии с Номер фрукта, чтобы узнать название фрукта
val fruitNames: RDD[String] = fruitIds.map(x=>fruitMap(x))
fruitNames.foreach(println)
//Уведомление:кначальствокодсмотреть似одинточкавопросбезиметь,нодаучитыватьприезжатьданныеколичествоесли Больше,И количество Заданий большое,//Это приведет к,одеялокаждыйTaskобщийиспользоватьприезжатьизfruitMapвстречаодеяломного Второсортныйпередача инфекции
//Если вы хотите перенести FruitMap из одной машины на одну, Должениз Таск может использовать ее на всех машинах.
//нравиться Что делатьприезжать?---использовать Широковещательные переменные
println("=====================")
val BroadcastFruitMap: Broadcast[collection.Map[Int, String]] = sc.broadcast(fruitMap)
val fruitNames2: RDD[String] = fruitIds.map(x=>BroadcastFruitMap.value(x))
fruitNames2.foreach(println)
}
}
sc.textFile("./dir/*.txt")
Если передан каталог, все файлы в нем будут читаться как RDD. Пути к файлам поддерживают подстановочные знаки.
нодатакверно Вбольшойколичествоиз Маленькийдокументчитатьэффективность Нетвысокий,отвечать ДолжениспользоватьwholeTextFiles
def wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String,String)])
возвращатьсяценитьRDD[(String, String)], где Key — имя файла, Value — содержимое.
Spark поддерживатьпроходить Java JDBC Доступ к реляционным базам данных. Нужно использовать JdbcRDD
демо-код
package cn.itcast.core
import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.spark.rdd.{JdbcRDD, RDD}
import org.apache.spark.{SparkConf, SparkContext}
/**
* Desc ДемоиспользоватьSparkдействоватьJDBC-APIвыполнить Воляданные ДепозитприезжатьMySQLи прочитай это
*/
object JDBCDataSourceTest {
def main(args: Array[String]): Unit = {
//1.создаватьSparkContext
val config = new SparkConf().setAppName("JDBCDataSourceTest").setMaster("local[*]")
val sc = new SparkContext(config)
sc.setLogLevel("WARN")
//2.вставлятьданные val data: RDD[(String, Int)] = sc.parallelize(List(("jack", 18), ("tom", 19), ("rose", 20)))
//Вызов foreachPartition для выполнения операций над каждым разделом
//data.foreachPartition(saveToMySQL)
//3.читатьданные def getConn():Connection={
DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8", "root", "root")
}
val studentRDD: JdbcRDD[(Int, String, Int)] = new JdbcRDD(sc,
getConn,
"select * from t_student where id >= ? and id <= ? ",
4,
6,
2,
rs => {
val id: Int = rs.getInt("id")
val name: String = rs.getString("name")
val age: Int = rs.getInt("age")
(id, name, age)
}
)
println(studentRDD.collect().toBuffer)
}
/*
CREATE TABLE `t_student` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`name` varchar(255) DEFAULT NULL,
`age` int(11) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=4 DEFAULT CHARSET=utf8;
*/
def saveToMySQL(partitionData:Iterator[(String, Int)] ):Unit = {
//Воляданные ДепозитприезжатьMySQL
//Получаем соединение
val conn: Connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8", "root", "root")
partitionData.foreach(data=>{
//Волякаждыйполоскаданные ДепозитприезжатьMySQL
val sql = "INSERT INTO `t_student` (`id`, `name`, `age`) VALUES (NULL, ?, ?);"
val ps: PreparedStatement = conn.prepareStatement(sql)
ps.setString(1,data._1)
ps.setInt(2,data._2)
ps.execute()//preparedStatement.addBatch()
})
//ps.executeBatch()
conn.close()
}
}
https://blog.csdn.net/leen0304/article/details/78854530
Вся экосистема Spark и Hadoop полностью совместима.,такверно ВHadoopМестоподдерживатьиз Тип файлаилиданные Библиотекадобрыйформа,Sparkтакжетакой жеподдерживать。
HadoopRDD、newAPIHadoopRDD、saveAsHadoopFile、saveAsNewAPIHadoopFile да Базовый API
Другие интерфейсы API настроены для облегчения окончательной разработки программы Spark, и эти два интерфейса представляют собой высокоэффективные версии реализации.
Файл SequenceFile — это плоский файл (Flat File), разработанный Hadoop для хранения двоичного значения ключа.
https://blog.csdn.net/bitcarmanlee/article/details/78111289
Чтение sc.sequenceFile keyClass, valueClass
Файл RDD.saveAsSequenceFile(путь)
Требуется, чтобы ключ и значение могли быть автоматически преобразованы в тип Writable.
вернофайл изображенияда Волявернослонсериализацияназаддержатьиздокумент
читатьsc.objectFilek,v //Поскольку да сериализуется, необходимо указать тип
Функция RDD.saveAsObjectFile().
потому что org.apache.hadoop.hbase.mapreduce.TableInputFormat Класс реализации, Spark Доступ к HBase можно получить в формате ввода Hadoop.
Этот формат ввода станет значением ключа верные,
Тип ключа — org. apache.hadoop.hbase.io.ImmutableBytesWritable,
Тип значения — org.apache.hadoop.hbase.client.Result.
https://github.com/teeyog/blog/issues/22
package cn.itcast.core
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object DataSourceTest {
def main(args: Array[String]): Unit = {
val config = new SparkConf().setAppName("DataSourceTest").setMaster("local[*]")
val sc = new SparkContext(config)
sc.setLogLevel("WARN")
System.setProperty("HADOOP_USER_NAME", "root")
//1.HadoopAPI
println("HadoopAPI")
val dataRDD = sc.parallelize(Array((1,"hadoop"), (2,"hive"), (3,"spark")))
dataRDD.saveAsNewAPIHadoopFile("hdfs://node01:8020/spark_hadoop/",
classOf[LongWritable],
classOf[Text],
classOf[TextOutputFormat[LongWritable, Text]])
val inputRDD: RDD[(LongWritable, Text)] = sc.newAPIHadoopFile(
"hdfs://node01:8020/spark_hadoop/*",
classOf[TextInputFormat],
classOf[LongWritable],
classOf[Text],
conf = sc.hadoopConfiguration
)
inputRDD.map(_._2.toString).foreach(println)
//2. Чтение небольших файлов.
println("Прочитать небольшой файл")
val filesRDD: RDD[(String, String)] = sc.wholeTextFiles("D:\\data\\spark\\files", minPartitions = 3)
val linesRDD: RDD[String] = filesRDD.flatMap(_._2.split("\\r\\n"))
val wordsRDD: RDD[String] = linesRDD.flatMap(_.split(" "))
wordsRDD.map((_, 1)).reduceByKey(_ + _).collect().foreach(println)
//3.Работа с файлом последовательности
println("SequenceFile")
val dataRDD2: RDD[(Int, String)] = sc.parallelize(List((2, "aa"), (3, "bb"), (4, "cc"), (5, "dd"), (6, "ee")))
dataRDD2.saveAsSequenceFile("D:\\data\\spark\\SequenceFile")
val sdata: RDD[(Int, String)] = sc.sequenceFile[Int, String]("D:\\data\\spark\\SequenceFile\\*")
sdata.collect().foreach(println)
//4.Операция ObjectFile
println("ObjectFile")
val dataRDD3 = sc.parallelize(List((2, "aa"), (3, "bb"), (4, "cc"), (5, "dd"), (6, "ee")))
dataRDD3.saveAsObjectFile("D:\\data\\spark\\ObjectFile")
val objRDD = sc.objectFile[(Int, String)]("D:\\data\\spark\\ObjectFile\\*")
objRDD.collect().foreach(println)
sc.stop()
}
}
package cn.itcast.core
import org.apache.hadoop.hbase.client.{HBaseAdmin, Put, Result}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object DataSourceTest2 {
def main(args: Array[String]): Unit = {
val config = new SparkConf().setAppName("DataSourceTest").setMaster("local[*]")
val sc = new SparkContext(config)
sc.setLogLevel("WARN")
val conf = HBaseConfiguration.create()
conf.set("hbase.zookeeper.quorum", "node01:2181,node02:2181,node03:2181")
val fruitTable = TableName.valueOf("fruit")
val tableDescr = new HTableDescriptor(fruitTable)
tableDescr.addFamily(new HColumnDescriptor("info".getBytes))
val admin = new HBaseAdmin(conf)
if (admin.tableExists(fruitTable)) {
admin.disableTable(fruitTable)
admin.deleteTable(fruitTable)
}
admin.createTable(tableDescr)
def convert(triple: (String, String, String)) = {
val put = new Put(Bytes.toBytes(triple._1))
put.addImmutable(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(triple._2))
put.addImmutable(Bytes.toBytes("info"), Bytes.toBytes("price"), Bytes.toBytes(triple._3))
(new ImmutableBytesWritable, put)
}
val dataRDD: RDD[(String, String, String)] = sc.parallelize(List(("1","apple","11"), ("2","banana","12"), ("3","pear","13")))
val targetRDD: RDD[(ImmutableBytesWritable, Put)] = dataRDD.map(convert)
val jobConf = new JobConf(conf)
jobConf.setOutputFormat(classOf[TableOutputFormat])
jobConf.set(TableOutputFormat.OUTPUT_TABLE, "fruit")
//писатьданные targetRDD.saveAsHadoopDataset(jobConf)
println("Запись в данные успешна")
//читатьданные conf.set(TableInputFormat.INPUT_TABLE, "fruit")
val hbaseRDD: RDD[(ImmutableBytesWritable, Result)] = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
classOf[org.apache.hadoop.hbase.client.Result])
val count: Long = hbaseRDD.count()
println("hBaseRDD RDD Count:"+ count)
hbaseRDD.foreach {
case (_, result) =>
val key = Bytes.toString(result.getRow)
val name = Bytes.toString(result.getValue("info".getBytes, "name".getBytes))
val color = Bytes.toString(result.getValue("info".getBytes, "price".getBytes))
println("Row key:" + key + " Name:" + name + " Color:" + color)
}
sc.stop()
}
}
Официальный сайт
http://spark.apache.org/sql/
Spark SQLдаSpark — модуль, используемый для обработки структурированных данных.
Spark SQL также предоставляет различные методы использования, включая DataFrames. APIиDatasets API。ноникто Аргументда Какой видAPIилидаязык программирования,Они оба основаны на одном и том же движке.,Таким образом, вы можете переключаться между различными API по своему желанию.,Каждый из них имеет свои особенности.
1.легкий Интегрировать
Вы можете использовать Java, Scala, Python, R и другие языки операций API.
2. Унификация изданных визитов
Подключите приезжатьлюбой источник данныеизируются таким же образом.
3. Совместимость с Ульем
Поддержка синтаксиса hiveHQLиз.
Совместимость с hive (библиотека метаданных, синтаксис SQL, UDF, сериализация, механизм десериализации)
4. Стандартные изданные соединения
Можно использовать стандартные соединения JDBC или ODBC.
1) Преимущества SQLиз
Выражение очень ясное, Например, этот абзац SQL Очевидно, что для запроса трех полей условием является запрос возраста, превышающего 10 возрастиз
Низкая сложность и легкость в освоении.
2) SQLиз Недостатки
Сложный анализ, больше вложенности SQL: только представьте себе 3 уровня вложенности Поддержка SQL должна быть довольно сложной, не так ли?
Машинное обучение — это сложно: только представьте, насколько сложно было бы реализовать алгоритм машинного обучения с использованием SQL.
Hiveда конвертирует SQL в MapReduce
SparkSQL можно понимать как синтаксический анализ SQL в RDD. + Оптимизируйте еще разосуществлять
1.5.1 DataFrame
Какой даDataFrame?
DataFrameда Разновидность RDD на основе метаинформации схемы из распределенного набора данных, Похоже на:Традиционная библиотека данных из двумерной таблицы 。
1.5.2 DataSet
Какой даDataSet?
DataSetда сохраняет больше информации описания, типа информации из распределенного набора данных.
По сравнению с RDD он сохраняет больше описательной информации и концептуально эквивалентен двумерной таблице в реляционной базе данных.
DataFrame сохраняет информацию о типе, обеспечивает более строгую типизацию, обеспечивает проверку типов во время компиляции,Вызов Datasetismetod сначала сгенерирует логический план.,Затем оптимизируется спаркиз оптимизатором,Окончательное создание физического плана,Ранназадпредставлять на рассмотрениеприезжатькластерсерединабегать!
DataSet содержит функцию DataFrameиз,
В Spark2.0 они объединены, а DataFrame представлен как DataSet[Row], который является подмножеством DataSetиз.
DataFrame на самом деле — это просто даDateset[Row]
1.5.3 Различия между RDD, DataFrame и DataSetструктура Иллюстрация
СДР[Человек]
Возьмите Person в качестве параметра типа,но Нет Понятноразвязать его внутреннее строение.
DataFrame
Предоставляет подробную информацию о столбце схемы структурной информации, а также имени и типе. Это похоже на стол
DataSet[Person]
![](https://files.mdnice.com/user/37735/a21c2a86-9dbc-41cc-837f-2ff8c538d01a.png)
Не только информация о схеме, но и информация о типе
До версии Spark2.0
SQLContextдасоздаватьDataFrameиосуществлятьSQLиз Вход
HiveContextпроходитьhive Оператор sql управляет данными таблицы hive и совместим с операциями hiveContext, наследуемыми от SQLContext.
После искры 2.0
SparkSession Инкапсуляция ПонятноSqlContextиHiveContextМестоиметь Функция。проходитьSparkSessionвозвращаться МожеткполучатьприезжатьSparkConetxt。
SparkSessionМожеткосуществлятьSparkSQLтакже МожеткосуществлятьHiveSQL.
2.2.1. Создание и чтение текстовых файлов.
1. Файл в локальном создателе,Есть три столбца: идентификатор, имя и возраст.,разделенные пробелами,Ранназадначальствопроходитьприезжатьhdfsначальство
vim /root/person.txt
1 zhangsan 20
2 lisi 29
3 wangwu 25
4 zhaoliu 30
5 tianqi 35
6 kobe 40
начальствопроходитьданныедокументприезжатьHDFSначальство:
hadoop fs -put /root/person.txt /
2. в искре Следующая команда в шеллосуществлять считывает данные и разделяет каждую выделенную строку с помощью разделителей столбцов.
Открытая искровая оболочка
/export/servers/spark/bin/spark-shell
создаватьRDD
val lineRDD= sc.textFile("hdfs://node01:8020/person.txt").map(_.split(" "))
//RDD[Array[String]]
3. Определите случай класс (эквивалент таблицы изсхемы)
case class Person(id:Int, name:String, age:Int)
4. Дело СДР классовая ассоциация
val personRDD = lineRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt)) //RDD[Person]
5.ВоляRDDКонвертироватьстановитьсяDataFrame
val personDF = personRDD.toDF
//DataFrame
6.Проверятьданныеиschema
personDF.show
+---+--------+---+
| id| name|age|
+---+--------+---+
| 1|zhangsan| 20|
| 2| lisi| 29|
| 3| wangwu| 25|
| 4| zhaoliu| 30|
| 5| tianqi| 35|
| 6| kobe| 40|
+---+--------+---+
personDF.printSchema
7.Регистрационная форма
personDF.createOrReplaceTempView("t_person")
8.осуществлятьSQL
spark.sql("select id,name from t_person where id > 3").show
9. Вы также можете использовать SparkSession для сборки DataFrame.
val dataFrame=spark.read.text("hdfs://node01:8020/person.txt")
dataFrame.show //Уведомление: чтение текстового файла напрямую без полной информации о схеме.
dataFrame.printSchema
2.2.2 Чтение файла JSON
1.данныедокумент
Используйте файл isjson в установочном пакете Spark.
more /export/servers/spark/examples/src/main/resources/people.json
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
2. в искре Shellосуществлять следующую команду, прочитать данные
val jsonDF= spark.read.json("file:///export/servers/spark/examples/src/main/resources/people.json")
3. Далее вы можете использовать операцию DataFrameизфункции.
jsonDF.show //Уведомление:прямой Чтение файла В JSON есть информация о схеме. Поскольку сам файл json содержит информацию о схеме, SparkSQL может автоматически проанализировать ее.
2.2.3 Чтение файлов паркета.
1.данныедокумент
Используйте файл isparquet в установочном пакете Spark.
more /export/servers/spark/examples/src/main/resources/users.parquet
2. в искре Shellосуществлять следующую команду, прочитать данные
val parquetDF=spark.read.parquet("file:///export/servers/spark/examples/src/main/resources/users.parquet")
3. Далее вы можете использовать операцию DataFrameизфункции.
parquetDF.show //Уведомление:прямой Чтение файла паркет содержит информацию о схеме, поскольку информация о столбцах сохраняется в файле паркета.
1.проходитьspark.createDatasetсоздаватьDataset
val fileRdd = sc.textFile("hdfs://node01:8020/person.txt") //RDD[String]
val ds1 = spark.createDataset(fileRdd) //DataSet[String]
ds1.show
2. Сгенерируйте набор данных с помощью метода RDD.toDS.
case class Person(name:String, age:Int)
val data = List(Person("zhangsan",20),Person("lisi",30)) //List[Person]
val dataRDD = sc.makeRDD(data)
val ds2 = dataRDD.toDS //Dataset[Person]
ds2.show
3. Проходить преобразование DataFrame.as[generic] для создания набора данных.
case class Person(name:String, age:Long)
val jsonDF= spark.read.json("file:///export/servers/spark/examples/src/main/resources/people.json")
val jsonDS = jsonDF.as[Person] //DataSet[Person]
jsonDS.show
4.DataSet также можно зарегистрировать как таблицу для запроса.
jsonDS.createOrReplaceTempView("t_person")
spark.sql("select * from t_person").show
2.4.1 Подготовка
Прочтите файл и преобразуйте его в DataFrame или DataSet.
val lineRDD= sc.textFile("hdfs://node01:8020/person.txt").map(_.split(" "))
case class Person(id:Int, name:String, age:Int)
val personRDD = lineRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt))
val personDF = personRDD.toDF
personDF.show
//val personDS = personRDD.toDS
//personDS.show
2.4.2 Стиль DSL
SparkSQL предоставляет предметно-ориентированный язык (DSL) для облегчения манипулирования структурированными данными.
1.ПроверятьnameПолеизданные
personDF.select(personDF.col("name")).show
personDF.select(personDF("name")).show
personDF.select(col("name")).show
personDF.select("name").show
2.ПроверятьnameиageПоледанные
personDF.select("name", "age").show
3. Запросите все имена и возраст и добавьте age+1.
personDF.select(personDF.col("name"), personDF.col("age") + 1).show
personDF.select(personDF("name"), personDF("age") + 1).show
personDF.select(col("name"), col("age") + 1).show
personDF.select("name","age").show
//personDF.select("name", "age"+1).show
personDF.select($"name",$"age",$"age"+1).show
4. Возраст фильтра больше или равен 25из, для фильтрации используйте метод filter.
personDF.filter(col("age") >= 25).show
personDF.filter($"age" >25).show
5. Подсчитайте количество людей старше 30 лет.
personDF.filter(col("age")>30).count()
personDF.filter($"age" >30).count()
6. Группируем по возрасту и подсчитываем количество людей одного возраста.
personDF.groupBy("age").count().show
2.4.3 Стиль SQL
Одной из мощных особенностей DataFrame является то, что мы можем думать о нем как о реляционной таблице.,Затем вы можете использовать spark.sql() в программе для выполнения SQL-запросов.,Результат будет как DataFrameвозвращаться
еслидуматьиспользоватьстиль Согласно синтаксису SQL, DataFrame необходимо зарегистрировать как таблицу, используя следующий метод:
personDF.createOrReplaceTempView("t_person")
spark.sql("select * from t_person").show
1. Отобразить информацию описания таблицы.
spark.sql("desc t_person").show
2. Опросите двух самых пожилых людей.
spark.sql("select * from t_person order by age desc limit 2").show
3. Запросить информацию о людях старше 30 лет.
spark.sql("select * from t_person where age > 30 ").показывать
4. Используйте стиль Завершение SQL DSL изнуждаться
spark.sql("select name, age + 1 from t_person").show
spark.sql("select name, age from t_person where age > 25").show
spark.sql("select count(age) from t_person where age > 30").show
spark.sql("select age, count(age) from t_person group by age").show
1.DataFrame и DataSet могут обрабатываться создателем RDD.
2. Обычный текст также можно прочитать путем прохождения создателя–Уведомления: прямое чтение без полного изложения требует прохождения RDD+Schema.
3. Проведение/паркет будет полностью ограничен
4. И даDataFrame, и даDataSet можно зарегистрировать как таблицы, а затем использовать SQL для запросов! DSL также доступен!
Spark будет в соответствии с информацией о файле и попытается вывести DataFrame/DataSetизSchema.,Конечно, мы также можем вручную указать,Существует несколько способов вручную указать из:
Тип 1: Укажите имя столбца для добавления схемы
Нет.2добрый:проходитьStructTypeобозначениеSchema
Тип 3: написать образец класса, используя механизм отражения для вывода схемы.
3.1.1 Добавьте схему, указав имя столбца
package cn.itcast.sql
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
object CreateDFDS {
def main(args: Array[String]): Unit = {
//1.создаватьSparkSession
val spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL").getOrCreate()
val sc: SparkContext = spark.sparkContext
sc.setLogLevel("WARN")
//2.Читаем файл
val fileRDD: RDD[String] = sc.textFile("D:\\data\\person.txt")
val linesRDD: RDD[Array[String]] = fileRDD.map(_.split(" "))
val rowRDD: RDD[(Int, String, Int)] = linesRDD.map(line =>(line(0).toInt,line(1),line(2).toInt))
//3.Конвертируем RDD в DF
//Уведомление: изначально в RDD нет метода toDF. Чтобы добавить к нему метод в новой версии, вы можете использовать неявное Конвертировать.
import spark.implicits._
val personDF: DataFrame = rowRDD.toDF("id","name","age")
personDF.show(10)
personDF.printSchema()
sc.stop()
spark.stop()
}
}
3.1.2 StructType определяет понимание схемы
package cn.itcast.sql
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
object CreateDFDS2 {
def main(args: Array[String]): Unit = {
//1.создаватьSparkSession
val spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL").getOrCreate()
val sc: SparkContext = spark.sparkContext
sc.setLogLevel("WARN")
//2.Читаем файл
val fileRDD: RDD[String] = sc.textFile("D:\\data\\person.txt")
val linesRDD: RDD[Array[String]] = fileRDD.map(_.split(" "))
val rowRDD: RDD[Row] = linesRDD.map(line =>Row(line(0).toInt,line(1),line(2).toInt))
//3.Конвертируем RDD в DF
//Уведомление: изначально в RDD нет метода toDF. Чтобы добавить к нему метод в новой версии, вы можете использовать неявное Конвертировать.
//import spark.implicits._
val schema: StructType = StructType(Seq(
StructField("id", IntegerType, true), //разрешить быть пустым
StructField("name", StringType, true),
StructField("age", IntegerType, true))
)
val personDF: DataFrame = spark.createDataFrame(rowRDD,schema)
personDF.show(10)
personDF.printSchema()
sc.stop()
spark.stop()
}
}
3.1.3 Схема рефлексивного вывода – мастерство
package cn.itcast.sql
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
object CreateDFDS3 {
case class Person(id:Int,name:String,age:Int)
def main(args: Array[String]): Unit = {
//1.создаватьSparkSession
val spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL")
.getOrCreate()
val sc: SparkContext = spark.sparkContext
sc.setLogLevel("WARN")
//2.Читаем файл
val fileRDD: RDD[String] = sc.textFile("D:\\data\\person.txt")
val linesRDD: RDD[Array[String]] = fileRDD.map(_.split(" "))
val rowRDD: RDD[Person] = linesRDD.map(line =>Person(line(0).toInt,line(1),line(2).toInt))
//3.Конвертируем RDD в DF
//Уведомление: изначально в RDD нет метода toDF. Чтобы добавить к нему метод в новой версии, вы можете использовать неявное Конвертировать.
import spark.implicits._
//Уведомление: приведенный выше изrowRDDizgeneric даPerson содержит информацию о схеме.
//Со СпаркSQLМожеткпроходить Отражение автоматически полученоприезжатьидобавить в ДаватьDF
val personDF: DataFrame = rowRDD.toDF
personDF.show(10)
personDF.printSchema()
sc.stop()
spark.stop()
}
}
package cn.itcast.sql
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
object QueryDemo {
case class Person(id:Int,name:String,age:Int)
def main(args: Array[String]): Unit = {
//1.создаватьSparkSession
val spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL")
.getOrCreate()
val sc: SparkContext = spark.sparkContext
sc.setLogLevel("WARN")
//2.Читаем файл
val fileRDD: RDD[String] = sc.textFile("D:\\data\\person.txt")
val linesRDD: RDD[Array[String]] = fileRDD.map(_.split(" "))
val rowRDD: RDD[Person] = linesRDD.map(line =>Person(line(0).toInt,line(1),line(2).toInt))
//3.Конвертируем RDD в DF
//Уведомление: изначально в RDD нет метода toDF. Чтобы добавить к нему метод в новой версии, вы можете использовать неявное Конвертировать.
import spark.implicits._
//Уведомление: приведенный выше изrowRDDizgeneric даPerson содержит информацию о схеме.
//Со СпаркSQLМожеткпроходить Отражение автоматически полученоприезжатьидобавить в ДаватьDF
val personDF: DataFrame = rowRDD.toDF
personDF.show(10)
personDF.printSchema()
//======================SQL-запрос===================== =
//0.Таблица регистрации
personDF.createOrReplaceTempView("t_person")
//1. Запросить все данные.
spark.sql("select * from t_person").show()
//2. Возраст запроса+1.
spark.sql("select age,age+1 from t_person").show()
//3. Запросить максимальный возраст двух человек.
spark.sql("select name,age from t_person order by age desc limit 2").show()
//4. Запросить количество людей каждого возраста из
spark.sql("select age,count(*) from t_person group by age").show()
//5. Возраст запроса больше 30из.
spark.sql("select * from t_person where age > 30").show()
//=======================Запрос режима DSL==================== = =
//1. Запросить все данные.
personDF.select("name","age")
//2. Возраст запроса+1.
personDF.select($"name",$"age" + 1)
//3. Запросить максимальный возраст двух человек.
personDF.sort($"age".desc).show(2)
//4. Запросить количество людей каждого возраста из
personDF.groupBy("age").count().show()
//5. Возраст запроса больше 30из.
personDF.filter($"age" > 30).show()
sc.stop()
spark.stop()
}
}
Существует множество способов взаимодействия с RDD, DF и DS (6 типов).,ноданасдействительныйдействовать Сразу Толькоиметь2добрый:
1) Работа с использованием RDDоператор
2) Используйте DSL/SQLверно табличные операции.
package cn.itcast.sql
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
object TransformDemo {
case class Person(id:Int,name:String,age:Int)
def main(args: Array[String]): Unit = {
//1.создаватьSparkSession
val spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL").getOrCreate()
val sc: SparkContext = spark.sparkContext
sc.setLogLevel("WARN")
//2.Читаем файл
val fileRDD: RDD[String] = sc.textFile("D:\\data\\person.txt")
val linesRDD: RDD[Array[String]] = fileRDD.map(_.split(" "))
val personRDD: RDD[Person] = linesRDD.map(line =>Person(line(0).toInt,line(1),line(2).toInt))
//3.Конвертируем RDD в DF
//Уведомление: изначально в RDD нет метода toDF. Чтобы добавить к нему метод в новой версии, вы можете использовать неявное Конвертировать.
import spark.implicits._
//Уведомление: приведенный выше изrowRDDizgeneric даPerson содержит информацию о схеме.
//Со СпаркSQLМожеткпроходить Отражение автоматически полученоприезжатьидобавить в ДаватьDF
//=========================взаимный Конвертировать====================== //1.RDD-->DF
val personDF: DataFrame = personRDD.toDF
//2.DF-->RDD
val rdd: RDD[Row] = personDF.rdd
//3.RDD-->DS
val DS: Dataset[Person] = personRDD.toDS()
//4.DS-->RDD
val rdd2: RDD[Person] = DS.rdd
//5.DF-->DS
val DS2: Dataset[Person] = personDF.as[Person]
//6.DS-->DF
val DF: DataFrame = DS2.toDF()
sc.stop()
spark.stop()
}
}
3.4.1 Стиль SQL
package cn.itcast.sql
import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
object WordCount {
def main(args: Array[String]): Unit = {
//1.создаватьSparkSession
val spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL").getOrCreate()
val sc: SparkContext = spark.sparkContext
sc.setLogLevel("WARN")
//2.Читаем файл
val fileDF: DataFrame = spark.read.text("D:\\data\\words.txt")
val fileDS: Dataset[String] = spark.read.textFile("D:\\data\\words.txt")
//fileDF.show()
//fileDS.show()
//3.верно Каждая строка разделена по пробелам и сплющена
//fileDF.flatMap(_.split(" ")) //Уведомление: Ошибка, так как у DF нет дженериков, я не знаю _даString
import spark.implicits._
val wordDS: Dataset[String] = fileDS.flatMap(_.split(" "))//Уведомление: Правильно, потому что DS имеет дженерики и знает _даString
//wordDS.show()
/*
+-----+
|value|
+-----+
|hello|
| me|
|hello|
| you|
...
*/
//4.верно выше изданные выполнить WordCount
wordDS.createOrReplaceTempView("t_word")
val sql =
"""
|select value ,count(value) as count
|from t_word
|group by value
|order by count desc
""".stripMargin
spark.sql(sql).show()
sc.stop()
spark.stop()
}
}
3.4.2 Стиль DSL
package cn.itcast.sql
import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
object WordCount2 {
def main(args: Array[String]): Unit = {
//1.создаватьSparkSession
val spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL").getOrCreate()
val sc: SparkContext = spark.sparkContext
sc.setLogLevel("WARN")
//2.Читаем файл
val fileDF: DataFrame = spark.read.text("D:\\data\\words.txt")
val fileDS: Dataset[String] = spark.read.textFile("D:\\data\\words.txt")
//fileDF.show()
//fileDS.show()
//3.верно Каждая строка разделена по пробелам и сплющена
//fileDF.flatMap(_.split(" ")) //Уведомление: Ошибка, так как у DF нет дженериков, я не знаю _даString
import spark.implicits._
val wordDS: Dataset[String] = fileDS.flatMap(_.split(" "))//Уведомление: Правильно, потому что DS имеет дженерики и знает _даString
//wordDS.show()
/*
+-----+
|value|
+-----+
|hello|
| me|
|hello|
| you|
...
*/
//4.верно выше изданные выполнить WordCount
wordDS.groupBy("value").count().orderBy($"count".desc).show()
sc.stop()
spark.stop()
}
}
Spark SQL может найти данные взаимодействия, такие как обычный текст, json, паркет, csv, MySQL и т. д.
1.Напишите другой источник данных
2. Прочтите другой источник данных
package cn.itcast.sql
import java.util.Properties
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
object WriterDataSourceDemo {
case class Person(id:Int,name:String,age:Int)
def main(args: Array[String]): Unit = {
//1.создаватьSparkSession
val spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL")
.getOrCreate()
val sc: SparkContext = spark.sparkContext
sc.setLogLevel("WARN")
//2.Читаем файл
val fileRDD: RDD[String] = sc.textFile("D:\\data\\person.txt")
val linesRDD: RDD[Array[String]] = fileRDD.map(_.split(" "))
val rowRDD: RDD[Person] = linesRDD.map(line =>Person(line(0).toInt,line(1),line(2).toInt))
//3.Конвертируем RDD в DF
//Уведомление: изначально в RDD нет метода toDF. Чтобы добавить к нему метод в новой версии, вы можете использовать неявное Конвертировать.
import spark.implicits._
//Уведомление: приведенный выше изrowRDDizgeneric даPerson содержит информацию о схеме.
//Со СпаркSQLМожеткпроходить Отражение автоматически полученоприезжатьидобавить в ДаватьDF
val personDF: DataFrame = rowRDD.toDF
//==================ВоляDFписатьприезжатьдругойисточник данных===================
//Text data source supports only a single column, and you have 3 columns.;
//personDF.write.text("D:\\data\\output\\text")
personDF.write.json("D:\\data\\output\\json")
personDF.write.csv("D:\\data\\output\\csv")
personDF.write.parquet("D:\\data\\output\\parquet")
val prop = new Properties()
prop.setProperty("user","root")
prop.setProperty("password","root")
personDF.write.mode(SaveMode.Overwrite).jdbc(
"jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8","person",prop)
println("Запись прошла успешно")
sc.stop()
spark.stop()
}
}
package cn.itcast.sql
import java.util.Properties
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession
object ReadDataSourceDemo {
def main(args: Array[String]): Unit = {
//1.создаватьSparkSession
val spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL")
.getOrCreate()
val sc: SparkContext = spark.sparkContext
sc.setLogLevel("WARN")
//2.Читаем файл
spark.read.json("D:\\data\\output\\json").show()
spark.read.csv("D:\\data\\output\\csv").toDF("id","name","age").show()
spark.read.parquet("D:\\data\\output\\parquet").show()
val prop = new Properties()
prop.setProperty("user","root")
prop.setProperty("password","root")
spark.read.jdbc(
"jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8","person",prop).show()
sc.stop()
spark.stop()
}
}
1.SparkSQLзаписать данные:
DataFrame/DataSet.write.json/csv/jdbc
2.SparkSQLЧтение данных:
SparkSession.read.json/csv/text/jdbc/format
Похоже на:hiveсерединаизпользовательскаяфункция, Spark также может использовать пользовательскую функцию для реализации новых функций.
В Spark есть следующие 3 категории пользовательских функций.
1.UDF(User-Defined-Function)
Введите одну строку, выведите одну строку
2.UDAF(User-Defined Aggregation Funcation)
Введите несколько строк и выведите одну строку
3.UDTF(User-Defined Table-Generating Functions)
Введите одну строку, выведите несколько строк
нуждаться
Есть формат udf.txtданные:
Привет
абв
изучать
маленький
проходить Пользовательская пользовательская функцияфункция Напишите каждую строку с заглавной буквы
select value,smallToBig(value) from t_word
демо-код
package cn.itcast.sql
import org.apache.spark.SparkContext
import org.apache.spark.sql.{Dataset, SparkSession}
object UDFDemo {
def main(args: Array[String]): Unit = {
//1.создаватьSparkSession
val spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL").getOrCreate()
val sc: SparkContext = spark.sparkContext
sc.setLogLevel("WARN")
//2.Читаем файл
val fileDS: Dataset[String] = spark.read.textFile("D:\\data\\udf.txt")
fileDS.show()
/*
+----------+
| value|
+----------+
|helloworld|
| abc|
| study|
| smallWORD|
+----------+
*/
/*
Напишите каждую строку с заглавной буквы
select value,smallToBig(value) from t_word
*/
//Зарегистрируем имя функции как smallToBig, функция да передаст строку, изString получится заглавная буква
spark.udf.register("smallToBig",(str:String) => str.toUpperCase())
fileDS.createOrReplaceTempView("t_word")
//Используем наше собственное определение функции
spark.sql("select value,smallToBig(value) from t_word").show()
/*
+----------+---------------------+
| value|UDF:smallToBig(value)|
+----------+---------------------+
|helloworld| HELLOWORLD|
| abc| ABC|
| study| STUDY|
| smallWORD| SMALLWORD|
+----------+---------------------+
*/
sc.stop()
spark.stop()
}
}
нуждаться
Содержимое udaf.json имеет следующий вид:
{"name":"Michael","salary":3000}
{"name":"Andy","salary":4500}
{"name":"Justin","salary":3500}
{"name":"Berta","salary":4000}
Найдите среднюю зарплату
Наследовать инструкции переопределения метода UserDefinedAggregateFunction
inputSchema: тип входных данных
bufferSchema: выдает промежуточный результат типа изданные
dataType: окончательный результат из типа результата
детерминированный: обеспечить согласованность, обычно используйте true
инициализировать: указать начальное значение
обновление: обновлять промежуточный результат каждый раз, когда происходит операция с параметром данных (обновление эквивалентно операции в каждом Разделе)
merge: глобальная агрегация (агрегировать каждый результат Разделиза)
оценить: подсчитать окончательный результат
демо-код
package cn.itcast.sql
import org.apache.spark.SparkContext
importorg.apache.spark.sql.expressions{MutableAggregationBuffer,UserDefinedAggregateFunction}
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
object UDAFDemo {
def main(args: Array[String]): Unit = {
//1. Получить искровую сессию.
val spark: SparkSession = SparkSession.builder().appName("SparkSQL").master("local[*]").getOrCreate()
val sc: SparkContext = spark.sparkContext
sc.setLogLevel("WARN")
//2.Читаем файл
val employeeDF: DataFrame = spark.read.json("D:\\data\\udaf.json")
//3.создавать временную таблицу
employeeDF.createOrReplaceTempView("t_employee")
//4.Регистрируем функцию UDAF
spark.udf.register("myavg",new MyUDAF)
//5. Использовать пользовательскую функцию UDAF.
spark.sql("select myavg(salary) from t_employee").show()
//6. Используем встроенную функцию экспорта.
spark.sql("select avg(salary) from t_employee").show()
}
}
class MyUDAF extends UserDefinedAggregateFunction{
//Ввод изданных типов изсхемы
override def inputSchema: StructType = {
StructType(StructField("input",LongType)::Nil)
}
//буферданныедобрыйформаschema,Да Конвертировать Изназадизданныеизschema
override def bufferSchema: StructType = {
StructType(StructField("sum",LongType)::StructField("total",LongType)::Nil)
}
//возвращатьсяценитьизданныедобрыйформа override def dataType: DataType = {
DoubleType
}
//Определяем, является ли да одинаковым на входе и будет ли оно иметь одинаковый выходной сигнал
override def deterministic: Boolean = {
true
}
//Инициализируем внутреннюю структуру данных
override def initialize(buffer: MutableAggregationBuffer): Unit = {
buffer(0) = 0L
buffer(1) = 0L
}
//Обновляем внутреннюю структуру данных и выполняем вычисления внутри области
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
//Добавляем все суммы
buffer(0) = buffer.getLong(0) + input.getLong(0)
//Сколько всего данных?
buffer(1) = buffer.getLong(1) + 1
}
//Слияние из разных Разделизданных, глобальное слияние
override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
buffer1(0) =buffer1.getLong(0) + buffer2.getLong(0)
buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
}
//Рассчитываем значение выходных данных
override def evaluate(buffer: Row): Any = {
buffer.getLong(0).toDouble / buffer.getLong(1)
}
}
https://www.cnblogs.com/qiuting/p/7880500.html
представлять
открытое окнофункцииизпредставлятьдадля Понятно Он показывает, как собираютсянаборвпередизданные,Также показано сборнаборназадизданные。Прямо сейчассуществоватькаждый ХОРОШОизнаконецодин Списокдобавить вполимеризацияфункцияизрезультат。
Окно используется для определения окна для строки (здесь изWindowда относится к набору строк, над которыми будет работать операция), которое работает с набором значений без использования GROUP BY Предложение верные используется для группировки, позволяющей получить как базовые строки, так и з столбцов, и агрегировать столбцы в одной строке.
Функции агрегирования и оконные функции
Функция агрегирования превращает несколько строк в одну, счетчик, среднее значение…
Функция открытия окна меняет одну строку на несколько строк;
полимеризацияфункцияеслихотеть显示другойиз Списокдолжен Воля Списокприсоединитьсяприезжатьgroup в
Вы можете открыть окно без использования группы by, напрямую отображать всю информацию
Классификация оконных функций
1.Агрегатная оконная функция
Функция агрегирования(столбец) OVER (опция), здесь опцией может быть даPARTITION BY пункт,но Нет Можеткда ORDER BY пункт.
2.функция окна сортировки
Сортировать по функции (столбец) OVER (опция), здесь опция может быть даORDER BY Пункт, также может быть да OVER(PARTITION BY пункт ORDER BY пункт),но Нет Можеткда PARTITION BY пункт.
/export/servers/spark/bin/spark-shell --master
spark://node01:7077,node02:7077
case class Score(name: String, clazz: Int, score: Int)
val scoreDF = spark.sparkContext.makeRDD(Array(
Score("a1", 1, 80),
Score("a2", 1, 78),
Score("a3", 1, 95),
Score("a4", 2, 74),
Score("a5", 2, 92),
Score("a6", 3, 99),
Score("a7", 3, 99),
Score("a8", 3, 45),
Score("a9", 3, 55),
Score("a10", 3, 78),
Score("a11", 3, 100))
).toDF("name", "class", "score")
scoreDF.createOrReplaceTempView("scores")
scoreDF.show()
+----+-----+-----+
|name|class|score|
+----+-----+-----+
| a1| 1| 80|
| a2| 1| 78|
| a3| 1| 95|
| a4| 2| 74|
| a5| 2| 92|
| a6| 3| 99|
| a7| 3| 99|
| a8| 3| 45|
| a9| 3| 55|
| a10| 3| 78|
| a11| 3| 100|
+----+-----+-----+
Пример 1
OVER Ключевое слово означает трактовку агрегатной функции как Агрегатной. оконная Функция без агрегирования функции. Стандарт SQL позволяет использовать все функции агрегатов в качестве Агрегатной функции. оконная функция。
spark.sql("select count(name) from scores").show
spark.sql("select name, class, score, count(name) over() name_count from scores").show
Результаты запроса следующие:
+----+-----+-----+----------+
|name|class|score|name_count|
+----+-----+-----+----------+
| a1| 1| 80| 11|
| a2| 1| 78| 11|
| a3| 1| 95| 11|
| a4| 2| 74| 11|
| a5| 2| 92| 11|
| a6| 3| 99| 11|
| a7| 3| 99| 11|
| a8| 3| 45| 11|
| a9| 3| 55| 11|
| a10| 3| 78| 11|
| a11| 3| 100| 11|
+----+-----+-----+----------+
Пример 2
OVER Параметры также можно добавить в скобках после ключевого слова, чтобы изменить диапазон окна для операций агрегирования.
если OVER Если опция из в скобках после ключевого слова пуста, то оконная функция будет выполнять операции агрегирования для всех строк в наборе результатов.
открытое окнофункциииз OVER из можно использовать в скобках после ключевого слова PARTITION BY пунктопределить строкииз Раздел Приходитьдляруководитьполимеризациявычислить。и GROUP BY пунктдругой,PARTITION BY пунктсоздаватьиз Разделданезависимый Врезультатнабориз,Создатьиз Раздел доступен только для совокупных расчетов из,иидругойизоткрытое окнофункция Местосоздаватьиз Разделтакже Нет互相影响。
Ниже из SQL Оператор используется для отображения количества людей в каждой группе после группировки по классам:
OVER(PARTITION BY class) означает, что набор результатов основан на class Выполните Раздел и вычислите совокупный результат расчета из группы, к которой принадлежит текущая строка.
spark.sql(“select name, class, score, count(name) over(partition by class) name_count from scores”).show
Результаты запроса следующие:
+----+-----+-----+----------+
|name|class|score|name_count|
+----+-----+-----+----------+
| a1| 1| 80| 3|
| a2| 1| 78| 3|
| a3| 1| 95| 3|
| a6| 3| 99| 6|
| a7| 3| 99| 6|
| a8| 3| 45| 6|
| a9| 3| 55| 6|
| a10| 3| 78| 6|
| a11| 3| 100| 6|
| a4| 2| 74| 2|
| a5| 2| 92| 2|
+----+-----+-----+----------+
6.4.1 Сортировка заказов ROW_NUMBERrow_number() over(order by score) as rownum значит по баллам Отсортируйте по возрастанию и получите результат сортировки по порядковому номеру.
Уведомление:существоватьфункция окна сортировкисерединаиспользовать PARTITION BY Пункт необходимо расположить в ПОРЯДКЕ. BY перед пунктом.
Пример 1
spark.sql("select name, class, score, row_number() over(order by score) rank from scores").show()
+----+-----+-----+----+
|name|class|score|rank|
+----+-----+-----+----+
| a8| 3| 45| 1|
| a9| 3| 55| 2|
| a4| 2| 74| 3|
| a2| 1| 78| 4|
| a10| 3| 78| 5|
| a1| 1| 80| 6|
| a5| 2| 92| 7|
| a3| 1| 95| 8|
| a6| 3| 99| 9|
| a7| 3| 99| 10|
| a11| 3| 100| 11|
+----+-----+-----+----+
spark.sql("select name, class, score, row_number() over(partition by class order by score) rank from scores").show()
+----+-----+-----+----+
|name|class|score|rank|
+----+-----+-----+----+
| a2| 1| 78| 1|
| a1| 1| 80| 2|
| a3| 1| 95| 3|
| a8| 3| 45| 1|
| a9| 3| 55| 2|
| a10| 3| 78| 3|
| a6| 3| 99| 4|
| a7| 3| 99| 5|
| a11| 3| 100| 6|
| a4| 2| 74| 1|
| a5| 2| 92| 2|
+----+-----+-----+----+
6.4.2 Сортировка по РАНГУ
rank() over(order by score) as ранг означает прессу Отсортируйте по баллам в порядке возрастания и получите результат сортировки по номеру рейтинга.
этотиндивидуальныйфункция Попроси об этомиз Рядимярезультат Можетксопоставлять(сопоставлять Первый/сопоставлять Нет.два),После ничьего рейтинга рейтинг будет равен да-ничье плюс количество ничьих.
Проще говоря, у каждого есть только один рейтинг.,Ранназад Появляются двоеиндивидуальныйсопоставлять Первыйимяиз Состояние,этотчасждать Рядсуществоватьдваиндивидуальный Первыйимяназадлапшаизлюди Воляда Нет.триимя,Это значит, что для да нет второго места.,нодаиметьдваиндивидуальный Первыйимя
Пример 2
spark.sql("select name, class, score, rank() over(order by score) rank from scores").show()
+----+-----+-----+----+
|name|class|score|rank|
+----+-----+-----+----+
| a8| 3| 45| 1|
| a9| 3| 55| 2|
| a4| 2| 74| 3|
| a10| 3| 78| 4|
| a2| 1| 78| 4|
| a1| 1| 80| 6|
| a5| 2| 92| 7|
| a3| 1| 95| 8|
| a6| 3| 99| 9|
| a7| 3| 99| 9|
| a11| 3| 100| 11|
+----+-----+-----+----+
spark.sql("select name, class, score, rank() over(partition by class order by score) rank from scores").show()
+----+-----+-----+----+
|name|class|score|rank|
+----+-----+-----+----+
| a2| 1| 78| 1|
| a1| 1| 80| 2|
| a3| 1| 95| 3|
| a8| 3| 45| 1|
| a9| 3| 55| 2|
| a10| 3| 78| 3|
| a6| 3| 99| 4|
| a7| 3| 99| 4|
| a11| 3| 100| 6|
| a4| 2| 74| 1|
| a5| 2| 92| 2|
+----+-----+-----+----+
6.4.3 Непрерывная сортировка DENSE_RANK
dense_rank() over(order by score) as dense_rank значит по баллам Отсортируйте по возрастанию и получите рейтинг.
После того, как эта функция связала рейтинг, изранг дает равный рейтинг плюс 1.
Проще говорякаждыйлюди Толькоиметьодиндобрый Рядимя,Ранназад Появляются двоеиндивидуальныйсопоставлять Первыйимяиз Состояние,этотчасждать Рядсуществоватьдваиндивидуальный Первыйимяназадлапшаизлюди Воляда Нет.дваимя,также Дадваиндивидуальный Первыйимя,второе место
Пример 3
spark.sql("select name, class, score, dense_rank() over(order by score) rank from scores").show()
+----+-----+-----+----+
|name|class|score|rank|
+----+-----+-----+----+
| a8| 3| 45| 1|
| a9| 3| 55| 2|
| a4| 2| 74| 3|
| a2| 1| 78| 4|
| a10| 3| 78| 4|
| a1| 1| 80| 5|
| a5| 2| 92| 6|
| a3| 1| 95| 7|
| a6| 3| 99| 8|
| a7| 3| 99| 8|
| a11| 3| 100| 9|
+----+-----+-----+----+
spark.sql("select name, class, score, dense_rank() over(partition by class order by score) rank from scores").show()
+----+-----+-----+----+
|name|class|score|rank|
+----+-----+-----+----+
| a2| 1| 78| 1|
| a1| 1| 80| 2|
| a3| 1| 95| 3|
| a8| 3| 45| 1|
| a9| 3| 55| 2|
| a10| 3| 78| 3|
| a6| 3| 99| 4|
| a7| 3| 99| 4|
| a11| 3| 100| 5|
| a4| 2| 74| 1|
| a5| 2| 92| 2|
+----+-----+-----+----+
6.4.4 Рейтинг группы NTILE [понимать]
ntile(6) over(order by score)as ntile означает нажатие score Отсортируйте по возрастанию, затем 6 разделить на равные части 6 группу, и отобразится номер группы.
Пример 4
spark.sql("select name, class, score, ntile(6) over(order by score) rank from scores").show()
+----+-----+-----+----+
|name|class|score|rank|
+----+-----+-----+----+
| a8| 3| 45| 1|
| a9| 3| 55| 1|
| a4| 2| 74| 2|
| a2| 1| 78| 2|
| a10| 3| 78| 3|
| a1| 1| 80| 3|
| a5| 2| 92| 4|
| a3| 1| 95| 4|
| a6| 3| 99| 5|
| a7| 3| 99| 5|
| a11| 3| 100| 6|
+----+-----+-----+----+
spark.sql("select name, class, score, ntile(6) over(partition by class order by score) rank from scores").show()
+----+-----+-----+----+
|name|class|score|rank|
+----+-----+-----+----+
| a2| 1| 78| 1|
| a1| 1| 80| 2|
| a3| 1| 95| 3|
| a8| 3| 45| 1|
| a9| 3| 55| 2|
| a10| 3| 78| 3|
| a6| 3| 99| 4|
| a7| 3| 99| 5|
| a11| 3| 100| 6|
| a4| 2| 74| 1|
| a5| 2| 92| 2|
+----+-----+-----+----+
Официальный сайт
http://spark.apache.org/docs/latest/sql-data-sources-hive-tables.html
Configuration of Hive is done by placing your hive-site.xml, core-site.xml (for security configuration), and hdfs-site.xml (for HDFS configuration) file in conf/.
Процесс и принцип запроса Hive
осуществлятьHQLчас,ПервыйприезжатьMySQLЮаньданные Библиотекасередина Найти описаниеинформация,Ранназадразвязать析HQLив соответствии с Описание задачи генерации информации MR
Hive медленно конвертирует SQL в MapReduceосуществлять
Использование SparkSQL интегрирует Hive на самом деле просто просит SparkSQL загрузить Hive изданные библиотеки, а затем применить Spark SQLосуществлять движок для работы с изданными таблицами Hive.
Итак, сначала вам нужно открыть службу библиотеки метаданных Hive, чтобы SparkSQL мог загрузить метаданные.
1: Исправлять hive/conf/hive-site.xml Добавлена следующая конфигурация
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>hive.metastore.warehouse.dir</name>
<value>/user/hive/warehouse</value>
</property>
<property>
<name>hive.metastore.local</name>
<value>false</value>
</property>
<property>
<name>hive.metastore.uris</name>
<value>thrift://node01:9083</value>
</property>
</configuration>
2: назадбашнязапускать Hive Сервис Метастора
nohup /export/servers/hive/bin/hive --service metastore 2>&1 >> /var/log.log &
Spark Есть встроенный MateStore, используйте Derby Встроенныйданные Библиотекадержатьданные,нодаэтотдобрый Способ Нет适合рожденный产环境,потому чтодляэтотдобрыймодель同одинчасмеждуможет толькоиметьодин SparkSession Использование, поэтому более рекомендуется использовать в производственных средах. Hive из MetaStore
SparkSQL Интегрировать Hive из MetaStore Основная идея состоит в том, чтобы иметь возможность получить к нему доступ, и иметь возможность использовать HDFS держать WareHouse, поэтому вы можете скопировать его напрямую Hadoop и Hive из Конфигурациядокументприезжать Spark из каталога конфигураций
hive-site.xml Юань данные склада о местоположении и другая информация
core-site.xml Изконфигурация, связанная с безопасностью
hdfs-site.xml HDFS Похожие из Конфигурация
Используйте локальный тест IDEA, чтобы напрямую разместить указанные выше файлы конфигурации в каталоге ресурсов.
package cn.itcast.sql
import org.apache.spark.sql.SparkSession
object HiveSupport {
def main(args: Array[String]): Unit = {
//создаватьsparkSession
val spark = SparkSession
.builder()
.appName("HiveSupport")
.master("local[*]")
//.config("spark.sql.warehouse.dir", "hdfs://node01:8020/user/hive/warehouse")
//.config("hive.metastore.uris", "thrift://node01:9083")
.enableHiveSupport()//Включить поддержку синтаксиса куста
.getOrCreate()
spark.sparkContext.setLogLevel("WARN")
//Какие часы в Проверять
spark.sql("show tables").show()
//создаватьповерхность spark.sql("CREATE TABLE person (id int, name string, age int) row format delimited fields terminated by ' '")
//Загружаем данные, данные в текущий каталог проекта SparkDemo изperson.txt (уровень isrc)
spark.sql("LOAD DATA LOCAL INPATH 'SparkDemo/person.txt' INTO TABLE person")
//Запросданные spark.sql("select * from person ").show()
spark.stop()
}
}
Кластерный мониторинг
Общая платформа больших кластеров, Все они необходимы для мониторинга.
Чтобы настроить таргетинг на различные библиотеки данных, включать MySQL, HBase Ждите мониторинга
Чтобы отслеживать верные приложения, Например Tomcat, Nginx, Node.js ждать
Необходимо следить за некоторыми показателями оборудования, Например CPU, Память, диск ждать
Есть много, много больше
Официальный сайт
http://spark.apache.org/streaming/
Обзор
Spark Стримингдаа на базе Spark из среды вычислений в реальном времени поверх ядра,
Доступно во многих источниках данные потребляют данные, а верданные обрабатываются в реальном времени,
Инструментиметьвысокий吞吐количествои Высокая отказоустойчивостьждатьособенныйточка。
Spark Streamingиз Возможностей
1. Простота в использовании
Вы можете перейти к программе потоковой передачи, например, к написанию автономной пакетной обработки, поддерживающей язык Java/Scala/Python.
2. Отказоустойчивость
SparkStreaming может восстановить потерянную работу без дополнительного кода.
3.легкий ИнтегрироватьприезжатьSparkсистема
Сочетание потоковой обработки, пакетной обработки и интерактивного запроса.
Spark В потоковой передаче будет компонент-получатель Receiver, который запускается на Executor как долговременная задача. Ресивер получает внешний изданный поток для формирования входных данных. DStream
DStreamвстречаодеялов соответствии счасмеждумежду隔разделятьстановитьсяодинпартияодинпартияизRDD,Когда интервал между партиями сокращаетсяприезжать Второй уровеньчас,Его можно использовать для обработки потоков данных в реальном времени. Размер временного интервала можно указать с помощью параметров.,один般设существовать500миллисекундаприезжатьнесколько секундмежду。
верноDStream работает на даверноRDD, и результаты обработки вычислений могут передаваться во внешние системы.
Spark Streamingиз Работапоток Ченг Сян Ниже изкартина Место示один样,перениматьприезжать Реальностьчасданныеназад,Давать пакеты данных,Ранназадпроходить ДаватьSpark Обработка двигателя наконец генерирует пакетные результаты.
Spark Потоковая передачаиз Базовая абстракциядаDStream(Дискретизированный Stream, поток дискретизированных данных, непрерывный изданный поток), представляет собой постоянство изданного потока и результирующего потока данных после различных операций Sparkoperator.
Вы можете получить более глубокое понимание DStream со следующих точек зрения:
1.DStream — это, по сути, серия непрерывных во времени RDD.
2. верноDStreamизданныеиз действует в соответствии с РДД как единое целое.
3. Отказоустойчивость
Между базовым RDD есть Зависимость, а DStream напрямую имеет Зависимость. RDD отказоустойчив, поэтому DStream тоже отказоустойчив.
Как показано на рисунке: Каждый эллипс представляет собой СДР.
Каждый кружок в эллипсе представляет собой RDD и раздел раздела.
Несколько RDD в каждом столбце представляют собой DStream (на рисунке три столбца, поэтому есть три DStream).
Последний СДР в каждой строке представляет каждую партию. Промежуточный результат RDD, сгенерированный Size
4. Квази-реальное время/почти реальное время
Spark Потоковая передача разбивает потоковые вычисления на несколько Sparks. Задание, верно, будет обрабатываться через Spark в каждый период времени. Декомпозиция графа DAG и набор задач Spark в процессе планирования.
верно в текущей версии изSpark Для потоковой передачи минимальный размер — пакетный. Размер: выберите значение от 0,5 до 5 секунд.
Со Спарк Потоковая передача может соответствовать сценариям потоковой обработки вычислений в квазиреальном времени, которые требуют очень высокой производительности в реальном времени. Например, она не подходит для сценариев высокочастотной торговли в реальном времени.
Подвести итог
Проще говоря, DStream инкапсулируется даверноRDDиз.,Вы уверены, что DStream работает,ДаверноRDDруководитьдействовать
верно, по сути, можно понимать как RDD для DataFrame/DataSet/DStream
Операции над DStream аналогичны RDD и делятся на следующие два типа:
Transformations(Конвертировать)
Output Операции(выход)/Действие
Общее преобразование — преобразование без сохранения состояния: каждый пакет обработки не зависит от предыдущего пакета обработки.
Transformation | Meaning |
---|---|
map(func) | верноDStreamсерединаизкаждыйэлементыруководитьfuncфункциядействовать,Ранназадвернуть новыйDStream |
flatMap(func) | Похоже на: imapметод,Просто каждый входной элемент может быть выведен как ноль или более выходных элементов. |
filter(func) | Отфильтровать все Значение функции funcвозвращаться равно trueиз элементов DStream и нового возврата DStream. |
union(otherStream) | Объедините входные параметры исходного DStream с другими элементами DStream и верните новый DStream. |
reduceByKey(func, [numTasks]) | Использовать функциифункции источникDStream выполняет операцию агрегации для изключа, а затем становится новым из(K, V)верно образует изDStream |
join(otherStream, [numTasks]) | Входные данные (K,V)、(K,W)ТипизDStream,вернуть new(K,(V,W)ТипизDStream |
transform(func) | Функция RDD-to-RDD действует на каждый RDD в DStream.,Может работать на любом RDD,отивернуть новыйRDD |
особенныйизTransformations—иметьсостояние Конвертировать:когдавпередпартия Второсортныйизиметь дело с需хотетьиспользовать Извпередпартия Второсортныйизданныеилисерединамеждурезультат。
иметьсостояние Конвертироватьвключатьбаза В追踪состояние Изменятьизменятьиз Конвертировать(updateStateByKey)ираздвижное окноиз Конвертировать
1.UpdateStateByKey(func)
2.Window Operations оконные операции
Output OperationsМожетк ВоляDStreamизданныевыходприезжатьвнешнийизданные Библиотекаилифайловая система
Когда выход При вызове Шефа зажигайте Потоковая программа запустит реальный процесс расчета (аналогично RDD изAction)
Output Operation | Meaning |
---|---|
print() | Печать в консоль |
saveAsTextFiles(prefix, [suffix]) | держатьпотокиз Внутри Позволятьдлятекстдокумент,Имя файла — «prefix-TIME_IN_MS[.suffix]». |
saveAsObjectFiles(prefix,[suffix]) | держатьпотокиз Внутри ПозволятьдляSequenceFile,Имя файла — «prefix-TIME_IN_MS[.suffix]». |
saveAsHadoopFiles(prefix,[suffix]) | держатьпотокиз Внутри Позволятьдляhadoopдокумент,Имя файла — «prefix-TIME_IN_MS[.suffix]». |
foreachRDD(func) | верноDstream внутри каждого RDDосуществлятьфункцию |
Иллюстрация
Сначала установите инструмент nc на Linux-сервер.
Аббревиатура ncданetcatiz изначально использовалась для настройки маршрутизаторов.,Мы можем использовать его для отправки запроса на определенный порт Отправить данные
ням установить -y nc
Запустите сервер, откройте порт 9999 и дождитесь отправки данных на этот порт.
nc -lk 9999
Отправить данные
package cn.itcast.streaming
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
object WordCount {
def main(args: Array[String]): Unit = {
//1.создаватьStreamingContext
//spark.master should be set as local[n], n > 1
val conf = new SparkConf().setAppName("wc").setMaster("local[*]")
val sc = new SparkContext(conf)
sc.setLogLevel("WARN")
val ssc = new StreamingContext(sc,Seconds(5))//5 означает, что верные данные будут разделены за 5 секунд для формирования RDD
//2. Сокет монитора для получения данных.
//ReceiverInputDStreamДаперениматьприезжатьиз Местоиметьизданные ГруппастановитьсяизRDD,Инкапсулировано в DStream,Далее, верноDStream работает на даверноRDD.
val dataDStream: ReceiverInputDStream[String] = ssc.socketTextStream("node01",9999)
//3.действоватьданные val wordDStream: DStream[String] = dataDStream.flatMap(_.split(" "))
val wordAndOneDStream: DStream[(String, Int)] = wordDStream.map((_,1))
val wordAndCount: DStream[(String, Int)] = wordAndOneDStream.reduceByKey(_+_)
wordAndCount.print()
ssc.start()//Начать
ssc.awaitTermination()//ждать Быть остановленным
}
}
1.Первыйосуществлятьnc -lk 9999
2.Ранназадосуществлятькод
3. Продолжайте вводить разные слова в 1.
hadoop spark sqoop hadoop spark hive hadoop
4. Наблюдайте за выводом консоли IDEA.
SparkStreaming вычисляет текущий интервал в 5 секунд каждые 5 секунд, а затем выводит каждый пакет.
В случае выше возникает такой вопрос:
каждыйпартия Второсортныйизслово Второсортныйчисло Всеодеялоправильныйиз Статистика,нодарезультат Нет能усталыйдобавлять!
если需хотетьусталыйдобавлять需хотетьиспользовать updateStateByKey(func) чтобы обновить статус.
package cn.itcast.streaming
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
object WordCount2 {
def main(args: Array[String]): Unit = {
//1.создаватьStreamingContext
//spark.master should be set as local[n], n > 1
val conf = new SparkConf().setAppName("wc").setMaster("local[*]")
val sc = new SparkContext(conf)
sc.setLogLevel("WARN")
val ssc = new StreamingContext(sc,Seconds(5))//5 означает, что верные данные будут разделены за 5 секунд для формирования RDD
//requirement failed: ....Please set it by StreamingContext.checkpoint().
//Уведомление:нассуществовать Внизлапшаиспользоватьприезжать ПонятноupdateStateByKeyвернокогдавпередданныеиисторияданныеруководитьусталыйдобавлять
//Так где же находятся данные истории? Нам нужно создать для них каталог контрольных точек?
ssc.checkpoint("./wc")//развиватьсерединаHDFS //2. Сокет монитора для получения данных.
//ReceiverInputDStreamДаперениматьприезжатьиз Местоиметьизданные ГруппастановитьсяизRDD,Инкапсулировано в DStream,Далее, верноDStream работает на даверноRDD.
val dataDStream: ReceiverInputDStream[String] = ssc.socketTextStream("node01",9999)
//3.действоватьданные val wordDStream: DStream[String] = dataDStream.flatMap(_.split(" "))
val wordAndOneDStream: DStream[(String, Int)] = wordDStream.map((_,1))
//val wordAndCount: DStream[(String, Int)] = wordAndOneDStream.reduceByKey(_+_)
//===================Используйте updateStateByKeyверно текущие данные и исторические данные для накопления ===================
val wordAndCount: DStream[(String, Int)] =wordAndOneDStream.updateStateByKey(updateFunc)
wordAndCount.print()
ssc.start()//Начать
ssc.awaitTermination()//ждать корректной остановки
}
//currentValues: текущее значение пакета, например: 1,1,1 (В качестве примера возьмите Hadoop в тестовых)
//historyValue:Извпередусталыйсчитатьизисторияценить,Первый Второсортныйбезиметьценитьда0,Нет.два Второсортныйда3
//Цель да использует текущие данные + исторические данныевозвращаться в качестве нового результата (в следующий раз из исторических данных)
def updateFunc(currentValues:Seq[Int], historyValue:Option[Int] ):Option[Int] ={
// currentValuesтекущее значение
// HistoryValueЗначение истории
val result: Int = currentValues.sum + historyValue.getOrElse(0)
Some(result)
}
}
1.Первыйосуществлятьnc -lk 9999
2. Затем изучите приведенный выше код
3. Продолжайте вводить разные слова в 1.,
hadoop spark sqoop hadoop spark hive hadoop
4. Наблюдайте за выводом консоли IDEA.
sparkStreaming вычисляет результаты в пределах текущих 5 секунд каждые 5 секунд, а затем накапливает результаты каждого пакета и выводит их.
Вычислить периодическую частоту в пределах короткого временного диапазона, большого периода времени и получить такой результат,Такое изануждаться,использоватьокнофункцияоченьбыстрый Сразу Можеткразвязать决Понятно。
Скользящее окно «Конвертировать» и процесс расчета показаны на рисунке ниже.
Мы можем заранее установить скользящее окно по длине (то есть продолжительность окна) и установить скользящее окно по временному интервалу (как долго вычислять каждое изучаемое),
Например, установите скользящее окно длины (то есть продолжительность окна) на 24 часа, а скользящее окно временного интервала (как долго рассчитывать каждое изучаемое) на 1 час.
Тогда смысл такой: вычислять последние 24 часа, изданные каждые 1 час.
package cn.itcast.streaming
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
object WordCount3 {
def main(args: Array[String]): Unit = {
//1.создаватьStreamingContext
//spark.master should be set as local[n], n > 1
val conf = new SparkConf().setAppName("wc").setMaster("local[*]")
val sc = new SparkContext(conf)
sc.setLogLevel("WARN")
val ssc = new StreamingContext(sc,Seconds(5))//5 означает, что верные данные будут разделены за 5 секунд для формирования RDD
//2. Сокет монитора для получения данных.
//ReceiverInputDStreamДаперениматьприезжатьиз Местоиметьизданные ГруппастановитьсяизRDD,Инкапсулировано в DStream,Далее, верноDStream работает на даверноRDD.
val dataDStream: ReceiverInputDStream[String] = ssc.socketTextStream("node01",9999)
//3.действоватьданные val wordDStream: DStream[String] = dataDStream.flatMap(_.split(" "))
val wordAndOneDStream: DStream[(String, Int)] = wordDStream.map((_,1))
val wordAndCount: DStream[(String, Int)] = wordAndOneDStream.reduceByKeyAndWindow((a:Int,b:Int)=>a+b,Seconds(10),Seconds(5))
//4. Используем функцию окна для подсчета WordCount.
//reduceFunc: (V, V) => V,setфункция
//windowDuration: Продолжительность, длина/ширина окна
//slideDuration: Продолжительность, интервал скольжения окна
//Уведомление:windowDurationиslideDurationдолжендаbatchDurationизразчисло //windowDuration=slideDuration:данные не будут потеряны или пересчитаны == будут использоваться в развитии
//windowDuration>slideDuration:данныевстреча Тяжелый复вычислить==развиватьсерединавстречаиспользовать
//windowDuration<slideDuration:данныебудет потерян
//Представление кода:
//windowDuration=10
//slideDuration=5
//Затем результат вычисляется каждые 5 секунд за последние 10 секунд
//Например, развитие позволяет считать последние изданные часы, рассчитываемые каждую 1 минуту, тогда как установить параметр Должен?
wordAndCount.print()
ssc.start()//Начать
ssc.awaitTermination()//ждать корректной остановки
}
}
1.Первыйосуществлятьnc -lk 9999
2. Затем изучите приведенный выше код
3. Продолжайте вводить разные слова в 1.
hadoop spark sqoop hadoop spark hive hadoop
4. Наблюдайте за выводом консоли IDEA.
Феномен: sparkStreaming вычисляет текущий размер окна в 10 секунд каждые 5 секунд, а затем выводит результат.
Имитация рейтинга Baidu в горячем поиске
Статистика трех самых популярных поисковых слов за последние 10 секунд, рассчитывается каждые 5 секунд.
WindowDuration = 10s
SlideDuration = 5s
package cn.itcast.streaming
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
/*
* Desc Мы собираемся смоделировать рейтинговый список горячего поиска Baidu, чтобы подсчитать 3 самых популярных поисковых слова за последние 10 секунд и вычислять его каждые 5 секунд.
*/
object WordCount4 {
def main(args: Array[String]): Unit = {
//1.создаватьStreamingContext
//spark.master should be set as local[n], n > 1
val conf = new SparkConf().setAppName("wc").setMaster("local[*]")
val sc = new SparkContext(conf)
sc.setLogLevel("WARN")
val ssc = new StreamingContext(sc,Seconds(5))//5 означает, что верные данные будут разделены за 5 секунд для формирования RDD
//2. Сокет монитора для получения данных.
//ReceiverInputDStreamДаперениматьприезжатьиз Местоиметьизданные ГруппастановитьсяизRDD,Инкапсулировано в DStream,Далее, верноDStream работает на даверноRDD.
val dataDStream: ReceiverInputDStream[String] = ssc.socketTextStream("node01",9999)
//3.действоватьданные val wordDStream: DStream[String] = dataDStream.flatMap(_.split(" "))
val wordAndOneDStream: DStream[(String, Int)] = wordDStream.map((_,1))
//4. Используем функцию окна для подсчета WordCount.
val wordAndCount: DStream[(String, Int)] = wordAndOneDStream.reduceByKeyAndWindow((a:Int,b:Int)=>a+b,Seconds(10),Seconds(5))
val sorteDStream: DStream[(String, Int)] = wordAndCount.transform(rdd => {
val sortedRDD: RDD[(String, Int)] = rdd.sortBy(_._2, false) //обратный/по убыванию порядка
println("===============top3==============")
sortedRDD.take(3).foreach(println)
println("===============top3==============")
sortedRDD
}
)
//No output operations registered, so nothing to execute
sorteDStream.print
ssc.start()//Начать
ssc.awaitTermination()//ждать корректной остановки
}
}
1.Первыйосуществлятьnc -lk 9999
2. Затем добавьте приведенный выше код в изучение.
3. Продолжайте вводить разные слова в 1.
hadoop spark sqoop hadoop spark hive hadoop
4. Наблюдайте за выводом консоли IDEA.
Иллюстрация основной концепции
Broker : Установите сервис Kafka и машина станет брокером
Producer : сообщение от продюсера,Ответственный Воляданныеписатьприезжатьbrokerсередина(push)
Потребитель: Потребитель сообщений, отвечающий за получение данных (вытягивание) из Kafka. Старая версия потребителя должна полагаться на zk, а новая версия — нет.
Topic: Тема эквивалентна даданнымиз Классификации, в разных темах хранятся разные бизнес-изданные. --Тема: Дифференциация бизнеса
Replication:копировать,данныедержатьмногонемного份(гарантироватьданныене потерян) --копировать:данные Безопасность
Раздел: Раздел, да — это физический раздел, Раздел — это файл, Тема может иметь 1~n Раздел, каждый Раздел имеет свою копию. --Раздел: Одновременное чтение и запись.
Consumer Group:группа потребителей,Тема может быть использована несколькими потребителями/группами одновременно.,Несколькопотребительеслисуществоватьодингруппа потребителейсередина,Так他们Нет能Тяжелый复Потреблениеданные --Группа потребителей: повысить скорость потребления потребителями и облегчить единое управление.
Уведомление:одинTopicМожеткодеяло Несколькопотребительили Групповая подписка,одинпотребитель/Группатакже Можеткподписка Несколькотема
Уведомление:Чтение данные можно прочитать только из Лидера, записать данные можно писать только Лидеру, а Фолловер будет синхронизировать данные от Лидера, чтобы сделать копию! ! !
Общие команды
#запускатьkafka
/export/servers/kafka/bin/kafka-server-start.sh -daemon
/export/servers/kafka/config/server.properties
#стопкафка
/export/servers/kafka/bin/kafka-server-stop.sh #Проверятьtopicинформация
/export/servers/kafka/bin/kafka-topics.sh --list --zookeeper node01:2181
#создаватьtopic
/export/servers/kafka/bin/kafka-topics.sh --create --zookeeper node01:2181 --replication-factor 3 --partitions 3 --topic test
#Проверять Информацию по определенной теме
/export/servers/kafka/bin/kafka-topics.sh --describe --zookeeper node01:2181 --topic test
#Удалить тему
/export/servers/kafka/bin/kafka-topics.sh --zookeeper node01:2181 --delete --topic test
#Начать продюсер — консоль производителя, обычно используемая для тестирования
/export/servers/kafka/bin/kafka-console-producer.sh --broker-list node01:9092 --topic spark_kafka
#запускconsumer — консоль изconsumer обычно используется для тестирования
/export/servers/kafka/bin/kafka-console-consumer.sh --zookeeper node01:2181 --topic spark_kafka--from-beginning
Потребительское подключение приезжатьборкериз адреса
/export/servers/kafka/bin/kafka-console-consumer.sh --bootstrap-server node01:9092,node02:9092,node03:9092 --topic spark_kafka --from-beginning
★Вопрос на собеседовании: Получатель & Direct
В разработке мы часто используем SparkStreaming для чтения Kafka в реальном времени, а затем его обработки.,После версии spark1.3,В kafkaUtils есть два типа создателяDStreamизметода:
1. Способ получения приемника:
KafkaUtils.createDstream(развиватьсередина Нетиспользовать,Просто пойми,нода На собеседовании могут спросить)
ReceiverделатьдляпостоянныйизTaskбегатьсуществоватьExecutorждатьобращатьсяданные,нодаодинReceiverНизкая эффективность,Нужно открыть несколько,Затем объедините данные (объединение) вручную.,Обработать еще раз,оченьбеда
Если машина-приемник зависнет, данные могут быть потеряны, поэтому необходимо включить WAL (предварительно записанный журнал), чтобы обеспечить безопасность данных, и эффективность будет снижена!
Получатель использует дапроходитьzookeeper для подключения к очереди Kafka и вызова API высокого уровня Kafka. Смещение хранится в Zookeeper и поддерживается Получателем.
Когда искра потребляет избыток, чтобы гарантировать, что данные не будут потеряны, она также сохраняет смещение в контрольной точке, что может привести к несогласованности данных.
Таким образом, независимо от любой точки зрения, модель приемника не подходит для использования в промышленных целях и была исключена.
2.Метод прямого подключения:
KafkaUtils.createDirectStream (используется в развитии, требует мастерства)
Прямой режим да напрямую подключается к разделу kafka для получения данных и напрямую считывает данные из каждого раздела, что значительно улучшает возможности параллельного выполнения.
Непосредственно вызывает низкоуровневый API Kafka (базовый API), а смещение сохраняется и поддерживается само по себе. По умолчанию Spark сохраняет его в контрольной точке, устраняя несоответствия.
Конечно, вы также можете поддерживать его вручную и сохранять смещение в MySQL и Redis.
Таким образом, его можно использовать в разработке на основе Directmodel, а с помощью функций Directmodeliz + ручного управления можно гарантировать точность данных. once Точный один раз
Подвести итог:
Метод приема получателя
1、 НесколькоReceiverприниматьданные Высокая эффективность,ноиметьпотерянныйданныеизриск。
2、 Включить журнал(WAL)Может предотвратитьданныепотерянный,нонаписать дваждыданные Низкая эффективность。
3、 Zookeeper поддерживает смещения, что может привести к повторному использованию.
4、 Используйте API высокого уровня
Метод прямого подключения
1、 НетиспользоватьReceiver,прямойприезжатьkafkaРазделсерединачитатьданные
2、 Механизм журналирования (WAL) не используется.
3、 Spark сохраняет собственное смещение
4、 Используйте низкоуровневый API
Расширение: О семантике сообщений.
Уведомление:
В разработке есть две версии интеграции SparkStreamingikafka: 0.8 и 0.10+.
0.8ВерсияиметьReceiverиDirectмодель(нода0.8Среда производства версийвопроссравниватьмного,существоватьSpark2.3Изназад Нетподдерживать0.8Версия Понятно)
После 0.10 сохранена только прямая модель (Reveiverмодель не подходит для производственных сред), а в версии 0.10 изменился API (более мощный)
в заключение:
Когда мы учимся иразвивать, мы напрямую используем версию 0.10 изdirectmodel.
нодазакрывать ВReceiverиDirectизразницалапша试изчасждатьхотеть能够答придетсяначальство Приходить
KafkaUtils.createDstream использует приемники для получения данных,Используйте высокоуровневый потребительский API Kafka.,компенсироватьколичество Поддерживается получателемсуществоватьzkсередина,верно ВМестоиметьизreceiversперениматьприезжатьизданные ВолявстречадержатьсуществоватьSpark executorsсередина,Затем выполните задание Spark Streamingзапускающее для обработки этих данных.,будет потеряно по умолчанию,Журналирование WAL может быть включено,он синхронизируется Воляприниматьприезжатьданныедержатьприезжатьраспределенный式файловая системаначальствонапримерHDFS。гарантироватьданныесуществовать Ошибкаиз Состояние Вниз Можетк恢复出Приходить。尽管этотдобрый Способ Сотрудничать сWAL机制Можеткгарантироватьданныенольпотерянныйиз Высокая надежность,ноданачинатьиспользовать ПонятноWALэффективность будет ниже,и нет никакой гарантии, что данные будут обработаны один и только один раз,Возможна обработка дважды. Потому что Spark и ZooKeeper могут быть не синхронизированы.
Чиновники больше не рекомендуют этот метод интеграции.
Подготовка
1.запускатьzookeeperкластер
zkServer.sh start
2.запускатьkafkaкластер
kafka-server-start.sh /export/servers/kafka/config/server.properties
3.создаватьtopic
kafka-topics.sh --create --zookeeper node01:2181 --replication-factor 1 --partitions 3 --topic spark_kafka
4. Команда выполнитьshell отправляет сообщение в тему
kafka-console-producer.sh --broker-list node01:9092 --topic spark_kafka
hadoop spark sqoop hadoop spark hive hadoop
5. Добавьте зависимость kafkaizpom
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
<version>2.2.0</version>
</dependency>
API
Получатель получает данные темы в Kafka и может параллельно запускать больше получателей для чтения Kafak. В теме изданные, здесь 3
val receiverDStream: immutable.IndexedSeq[ReceiverInputDStream[(String, String)]] = (1 to 3).map(x => {
val stream: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc, zkQuorum, groupId, topics)
stream
})
Если WAL включен (spark.streaming.receiver.writeAheadLog.enable=true), вы можете установить уровень хранения (по умолчанию StorageLevel.MEMORY_AND_DISK_SER_2).
демо-код
package cn.itcast.streaming
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.immutable
object SparkKafka {
def main(args: Array[String]): Unit = {
//1.создаватьStreamingContext
val config: SparkConf =
new SparkConf().setAppName("SparkStream").setMaster("local[*]")
.set("spark.streaming.receiver.writeAheadLog.enable", "true")
//Включаем предварительную запись журнала WAL, чтобы гарантировать источник данных Конечная надежность
val sc = new SparkContext(config)
sc.setLogLevel("WARN")
val ssc = new StreamingContext(sc,Seconds(5))
ssc.checkpoint("./kafka")
//==============================================
//2. Подготовьте параметры конфигурации.
val zkQuorum = "node01:2181,node02:2181,node03:2181"
val groupId = "spark"
val topics = Map("spark_kafka" -> 2) //2 означает, что каждая тема действительно должна использовать 2 потока для потребления,
//sscизrddРазделикафкаизтопик Раздел отличается Увеличение количества потребительских потоков не увеличивает количество спаркизпараллельной обработки данных.
//3. Получатель получает данные темы в Kafka и может параллельно запускать больше получателей для чтения Kafak. В теме изданные, здесь 3
val receiverDStream: immutable.IndexedSeq[ReceiverInputDStream[(String, String)]] = (1 to 3).map(x => {
val stream: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc, zkQuorum, groupId, topics)
stream
})
//4. Используем метод Union для объединения потока Dstream, сгенерированного всеми получателями.
val allDStream: DStream[(String, String)] = ssc.union(receiverDStream)
//5. Получить темыизданные(String, String) Первая строка представляет тему по названию, а вторая строка представляет тему и зданные.
val data: DStream[String] = allDStream.map(_._2)
//==============================================
//6.WordCount
val words: DStream[String] = data.flatMap(_.split(" "))
val wordAndOne: DStream[(String, Int)] = words.map((_, 1))
val result: DStream[(String, Int)] = wordAndOne.reduceByKey(_ + _)
result.print()
ssc.start()
ssc.awaitTermination()
}
}
Прямой режим будет периодически запрашивать последнее смещение из соответствующего раздела в теме Kafka.,Затем диапазон смещения обрабатывается в каждом пакете.,Spark выполняет вызовы Kafka Simple из потребительского API для чтения определенного диапазона изданных.
Недостатки Директиды невозможно использовать с инструментами мониторинга на базе Zookeeperizkafka.
Direct имеет несколько преимуществ по сравнению с методом на основе приемника.
1) Упростить параллелизм
Нет需хотетьсоздавать Несколькоkafkaвходитьпоток,Тогда объедините их,sparkStreamingВолявстречасоздаватьиkafkaРазделчислоодин样изномер рддиз Раздел,И данные будут читаться параллельно из Кафки.,sparkсерединаRDDиз Разделчислоиkafkaсерединаиз Разделданныедавсеверноотвечатьизсвязь。
2) Эффективный
Receiverвыполнитьданныеизнольпотерянныйда Воляданныепредварительно ПервыйдержатьсуществоватьWALсередина,встречакопироватьодин разданные,Приведёт к двойному копированию данных,Первый Второсортныйдаодеялоkafkaкопировать,в другой разда ПисатьприезжатьWALсередина。иDirectНетиспользоватьWALустранять Понятноэтотиндивидуальныйвопрос。
3) Семантика «Ровно один раз»\
Receiverчитатьkafkaданныедапроходитьkafkaвысокий层ВторосортныйapiПучоккомпенсироватьколичествописатьzookeeperсередина,Хотяэтотдобрыйметод МожеткпроходитьданныедержатьсуществоватьWALсерединагарантироватьданныене потерян,нода Может能встречапотому чтодляsparkStreamingиZKсерединадержатьизкомпенсироватьколичество Нетодин致ипривести кданныеодеяло Потребление Понятномного Второсортный。
DirectизExactly-once-semantics(EOS)проходитьвыполнитьkafkaНижний этаж Второсортныйapi,компенсироватьколичествотолькоодеялоsscдержатьсуществоватьcheckpointсередина,Устранено несоответствие zkиssc смещению извопрос.
4) API
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, темы)
демо-код
package cn.itcast.streaming
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
object SparkKafka2 {
def main(args: Array[String]): Unit = {
//1.создаватьStreamingContext
val config: SparkConf =
new SparkConf().setAppName("SparkStream").setMaster("local[*]")
val sc = new SparkContext(config)
sc.setLogLevel("WARN")
val ssc = new StreamingContext(sc,Seconds(5))
ssc.checkpoint("./kafka")
//==============================================
//2. Подготовьте параметры конфигурации.
val kafkaParams = Map("metadata.broker.list" -> "node01:9092,node02:9092,node03:9092", "group.id" -> "spark")
val topics = Set("spark_kafka")
val allDStream: InputDStream[(String, String)] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
//3. Получить темыизданные.
val data: DStream[String] = allDStream.map(_._2)
//==============================================
//WordCount
val words: DStream[String] = data.flatMap(_.split(" "))
val wordAndOne: DStream[(String, Int)] = words.map((_, 1))
val result: DStream[(String, Int)] = wordAndOne.reduceByKey(_ + _)
result.print()
ssc.start()
ssc.awaitTermination()
}
}
иллюстрировать
версия искровой потоковой передачи-Кафка-0-10,Есть определенные изменения в API,Более гибкая работа,развиватьсерединаиспользовать
pom.xml
<!--<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
<version>${spark.version}</version>
</dependency>-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
APIhttp://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.htmlсоздаватьtopic
/export/servers/kafka/bin/kafka-topics.sh --create --zookeeper node01:2181 --replication-factor 3 --partitions 3 --topic spark_kafka
Начать продюсер
/export/servers/kafka/bin/kafka-console-producer.sh --broker-list node01:9092,node01:9092,node01:9092 --topic spark_kafka
демо-код
package cn.itcast.streaming
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
object SparkKafkaDemo {
def main(args: Array[String]): Unit = {
//1.создаватьStreamingContext
//spark.master should be set as local[n], n > 1
val conf = new SparkConf().setAppName("wc").setMaster("local[*]")
val sc = new SparkContext(conf)3
sc.setLogLevel("WARN")
val ssc = new StreamingContext(sc,Seconds(5))//5 означает, что верные данные будут разделены за 5 секунд для формирования RDD
//Подготавливаемся к подключению параметров Kafkaиз
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "node01:9092,node02:9092,node03:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "SparkKafkaDemo",
//самый ранний: если под каждым разделом есть отправленное изсмещение, потребление начинается с отправленного изсмещения, когда нет отправленного изсмещения, потребление начинается с начала;
//latest: Когда под каждым Разделом есть отправленное изсмещение, потребление начинается с отправленного изсмещения, когда нет представленного изсмещения, потребление нового из Должен Разделизданные;
//нет: когда каждый раздел темы существует и отправил isoffset, потребление начнется со смещения, пока один раздел не существует и отправил isoffset, будет выдано исключение;
//здесь Конфигурацияlatestс动Тяжелый置компенсироватьколичестводлядо настоящего времениизкомпенсироватьколичество,Прямо сейчасеслииметькомпенсироватьколичествооткомпенсироватьколичество Расположение Начните потреблять,безиметькомпенсироватьколичествоот新Приходитьизданные Начните потреблять
"auto.offset.reset" -> "latest",
//falseповерхность示закрывать闭с动представлять на рассмотрение.Зависит отsparkпомочь тебепредставлять на рассмотрениеприезжатьCheckpointИли программисты вручную поддерживают
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val topics = Array("spark_kafka")
//2. Используйте KafkaUtil для подключения к Kafak и получения данных.
val recordDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](ssc,
LocationStrategies.PreferConsistent,//Стратегия местоположения, исходный код настоятельно рекомендует использовать стратегию Должен, что позволит SparkизExecutorиKafkaизBroker реагировать равномерно
ConsumerStrategies.Subscribe[String, String](topics, kafkaParams))//Стратегия потребления, исходный код настоятельно рекомендует использовать стратегию Должен
//3. Получить ЗНАЧЕНИЕ.
val lineDStream: DStream[String] = RecordDStream.map(_.value())//_обратитесь киздаConsumerRecord
val wrodDStream: DStream[String] = lineDStream.flatMap(_.split(" ")) //_ означает, что изда отправляется иззначение, которое представляет собой одну строку данных
val wordAndOneDStream: DStream[(String, Int)] = wrodDStream.map((_,1))
val result: DStream[(String, Int)] = wordAndOneDStream.reduceByKey(_+_)
result.print()
ssc.start()//Начать
ssc.awaitTermination()//ждать корректной остановки
}
}
APIhttp://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html
Начать продюсер
/export/servers/kafka/bin/kafka-console-producer.sh --broker-list node01:9092,node01:9092,node01:9092 --topic spark_kafka
демо-код
package cn.itcast.streaming
import java.sql.{DriverManager, ResultSet}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{OffsetRange, _}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable
object SparkKafkaDemo2 {
def main(args: Array[String]): Unit = {
//1.создаватьStreamingContext
//spark.master should be set as local[n], n > 1
val conf = new SparkConf().setAppName("wc").setMaster("local[*]")
val sc = new SparkContext(conf)
sc.setLogLevel("WARN")
val ssc = new StreamingContext(sc,Seconds(5))//5 означает, что верные данные будут разделены за 5 секунд для формирования RDD
//Подготавливаемся к подключению параметров Kafkaиз
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "node01:9092,node02:9092,node03:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "SparkKafkaDemo",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val topics = Array("spark_kafka")
//2. Используйте KafkaUtil для подключения к Kafak и получения данных.
//Уведомление:
//еслиMySQLсерединабезиметь Записыватьoffset,затем подключитесь напрямую,отlatestНачните потреблять
//еслиMySQLсерединаиметь Записыватьoffset,Тогда вам стоит начать потребление с Долженоффсета.
val offsetMap: mutable.Map[TopicPartition, Long] = OffsetUtil.getOffsetMap("SparkKafkaDemo","spark_kafka")
val recordDStream: InputDStream[ConsumerRecord[String, String]] = if(offsetMap.size > 0){//Есть смещение записи
println("Если смещение записано в MySQL, потребление начнется с Долженоффсет")
KafkaUtils.createDirectStream[String, String](ssc,
LocationStrategies.PreferConsistent,//Стратегия местоположения, исходный код настоятельно рекомендует использовать стратегию Должен, что позволит SparkизExecutorиKafkaизBroker реагировать равномерно
ConsumerStrategies.Subscribe[String, String](topics, kafkaParams,offsetMap))//Стратегия потребления, исходный код настоятельно рекомендует использовать стратегию Должен
}else{//Смещение не записано
println("Если смещение не записано, подключитесь напрямую и начните потреблять с последнего")
// /export/servers/kafka/bin/kafka-console-producer.sh --broker-list node01:9092 --topic spark_kafka
KafkaUtils.createDirectStream[String, String](ssc,
LocationStrategies.PreferConsistent,//Стратегия местоположения, исходный код настоятельно рекомендует использовать стратегию Должен, что позволит SparkизExecutorиKafkaизBroker реагировать равномерно
ConsumerStrategies.Subscribe[String, String](topics, kafkaParams))//Стратегия потребления, исходный код настоятельно рекомендует использовать стратегию Должен
}
//3.действоватьданные //Уведомление: Нашей цели да необходимо самостоятельно поддерживать смещение вручную, а это означает, что после использования небольшого пакета данных мы должны отправить смещение один раз.
//И эта небольшая порция данных представлена в DStream с помощью даRDD, поэтому нам нужно работать с вернымDStream от RDD
//И изRDD в верноDStream работает изAPI имеет преобразование(Конвертировать)иforeachRDD(действие)
recordDStream.foreachRDD(rdd=>{
if(rdd.count() > 0){//В данный момент в текущем пакете есть данные
rdd.foreach(record => println("перениматьприезжатьизKafkОтправитьизданныедля:" + record))
//перениматьприезжатьизKafkОтправитьизданныедля:ConsumerRecord(topic = spark_kafka, partition = 1, offset = 6, CreateTime = 1565400670211, checksum = 1551891492, serialized key size = -1, serialized value size = 43, key = null, value = hadoop spark ...)
//Уведомление:проходить Распечататьперениматьприезжатьизинформация Можетксмотретьприезжать,Есть смещения, которые нам нужно поддерживать,ибыть обработаннымизданные
//ловить Вниз Приходить Можеткверноданныеруководитьиметь дело с....илииспользоватьtransformвозвращатьсяи Извпередодин样иметь дело с
//После написания кода данных Должен сохраняет смещение. Поэтому, чтобы облегчить правильное обслуживание/управление смещением, Spark предоставляет класс, который помогает нам инкапсулировать данные смещения.
val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
for (o <- offsetRanges){
println(s"topic=${o.topic},partition=${o.partition},fromOffset=${o.fromOffset},доOffset=${o.untilOffset}")
}
//Вручную отправляем смещение,по умолчаниюпредставлять на рассмотрениеприезжатьCheckpointсередина
//recordDStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
//действительныйсерединакомпенсироватьколичество Можеткпредставлять на рассмотрениеприезжатьMySQL/Redisсередина
OffsetUtil.saveOffsetRanges("SparkKafkaDemo",offsetRanges)
}
})
/* val lineDStream: DStream[String] = RecordDStream.map(_.value())//_обратитесь киздаConsumerRecord
val wrodDStream: DStream[String] = lineDStream.flatMap(_.split(" ")) //_ означает, что изда отправляется иззначение, которое представляет собой одну строку данных
val wordAndOneDStream: DStream[(String, Int)] = wrodDStream.map((_,1))
val result: DStream[(String, Int)] = wordAndOneDStream.reduceByKey(_+_)
result.print()*/
ssc.start()//Начать
ssc.awaitTermination()//ждать корректной остановки
}
/*
Ручное обслуживание смещения класса инструмента
Сначала в MySQLсоздаем следующую таблицу
CREATE TABLE `t_offset` (
`topic` varchar(255) NOT NULL,
`partition` int(11) NOT NULL,
`groupid` varchar(255) NOT NULL,
`offset` bigint(20) DEFAULT NULL,
PRIMARY KEY (`topic`,`partition`,`groupid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
*/
object OffsetUtil {
//Читаем смещение из библиотеки данных
def getOffsetMap(groupid: String, topic: String) = {
val connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8", "root", "root")
val pstmt = connection.prepareStatement("select * from t_offset where groupid=? and topic=?")
pstmt.setString(1, groupid)
pstmt.setString(2, topic)
val rs: ResultSet = pstmt.executeQuery()
val offsetMap = mutable.Map[TopicPartition, Long]()
while (rs.next()) {
offsetMap += new TopicPartition(rs.getString("topic"), rs.getInt("partition")) -> rs.getLong("offset")
}
rs.close()
pstmt.close()
connection.close()
offsetMap
}
//Волякомпенсироватьколичестводержатьприезжатьданные Библиотека
def saveOffsetRanges(groupid: String, offsetRange: Array[OffsetRange]) = {
val connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8", "root", "root")
//replace «in» означает замену, если она была раньше, и вставку, если ее нет.
val pstmt = connection.prepareStatement("replace into t_offset (`topic`, `partition`, `groupid`, `offset`) values(?,?,?,?)")
for (o <- offsetRange) {
pstmt.setString(1, o.topic)
pstmt.setInt(2, o.partition)
pstmt.setString(3, groupid)
pstmt.setLong(4, o.untilOffset)
pstmt.executeUpdate()
}
pstmt.close()
connection.close()
}
}