Spark. Быстрый старт
Spark - это аналитический движок для крупномасштабной обработки данных. Предоставляет быструю и универсальную платформу для обработки данных. Hadoop Spark ускоряет работу программ в памяти более чем в 100 раз, а на диске – более чем в 10 раз.
Для открытия оболочки Spark введите в команду:
$spark-shell
С помощью
spark-shell
можно работать с уже существующими базами данных.Пример Select-запроса к ранее созданной таблице:

Увеличить
Преобразования RDD
RDD – это разновидность датасета (простого набора данных), который разделён на множество машин, работающих в кластере.
Ниже приведен список преобразований RDD.
Преобразование | Описание |
---|---|
map(func) | Возвращает новый распределённый набор данных, сформированный путём передачи каждого элемента источника через функцию func |
filter(func) | Возвращает новый набор данных, сформированный путем выбора тех элементов источника, для которых func возвращает true |
flatMap(func) | Подобно map, но каждый элемент ввода может быть сопоставлен с 0 или более элементами вывода (поэтому func должен возвращать Seq, а не один элемент) |
mapPartitions(func) | Подобно map, но запускается отдельно для каждого раздела (блока) RDD, поэтому func должен иметь тип Iterator <T> ⇒ Iterator <U> при запуске в RDD типа T |
mapPartitionsWithIndex(func) | Подобно map Partitions, но также предоставляет func целочисленным значением, представляющим индекс раздела, поэтому func должен иметь тип (Int, Iterator <T>) ⇒ Iterator <U> при работе с RDD типа T |
sample(withReplacement, fraction, seed) | Произведите выборку части данных с заменой или без нее, используя заданное начальное число генератора случайных чисел |
union(otherDataset) | Возвращает новый набор данных, содержащий объединение элементов исходного набора данных и аргумента |
intersection(otherDataset) | Возвращает новый RDD, который содержит пересечение элементов в исходном наборе данных и аргументе |
distinct([numTasks]) | Возвращает новый набор данных, содержащий отдельные элементы исходного набора данных |
groupByKey([numTasks]) | При вызове набора данных из пар (K, V) возвращает набор данных из пар (K, Iterable <V>) . Примечание: если вы группируете для выполнения агрегирования (например, суммы или среднего) по каждому ключу, использование reduceByKey или aggregateByKey даст гораздо лучшую производительность |
reduceByKey(func, [numTasks]) | При вызове для набора данных из пар (K, V) возвращает набор данных из пар (K, V) , где значения для каждого ключа агрегируются с использованием заданной функции сокращения func, которая должна иметь тип (V, V) ⇒ V . Как и в groupByKey , количество задач сокращения настраивается с помощью необязательного второго аргумента |
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) | При вызове набора данных из пар (K, V) возвращает набор данных из пар (K, U) , где значения для каждого ключа агрегированы с использованием заданных функций объединения и нейтрального «нулевого» значения. Позволяет использовать тип агрегированного значения, отличный от типа входного значения, избегая при этом ненужного выделения. Как и в groupByKey , количество задач уменьшения настраивается с помощью необязательного второго аргумента |
sortByKey([ascending], [numTasks]) | При вызове набора данных из пар (K, V) , где K реализует Ordered , возвращает набор данных из пар (K, V) , отсортированных по ключам в порядке возрастания или убывания, как указано в логическом аргументе ascending |
join(otherDataset, [numTasks]) | При вызове наборов данных типа (K, V) и (K, W) возвращает набор данных из пар (K, (V, W)) со всеми парами элементов для каждого ключа. Внешние соединения поддерживаются через leftOuterJoin , rightOuterJoin и fullOuterJoin |
cogroup(otherDataset, [numTasks]) | При вызове наборов данных типа (K, V) и (K, W) возвращает набор данных из кортежей (K, (Iterable <V>, Iterable <W>)) |
cartesian(otherDataset) | При вызове наборов данных типов T и U возвращает набор данных из пар (T, U) (все пары элементов) |
pipe(command, [envVars]) | Передайте каждый раздел RDD через команду оболочки, например, сценарий Perl или bash . Элементы RDD записываются в стандартный ввод процесса, а строки, выводимые на его стандартный вывод, возвращаются как RDD-строки |
coalesce(numPartitions) | Уменьшите количество разделов в RDD до numPartitions . Полезно для более эффективного выполнения операций после фильтрации большого набора данных |
repartition(numPartitions) | Перемешивайте данные в RDD случайным образом, чтобы создать больше или меньше разделов и сбалансировать их между собой. Это всегда перемешивает все данные по сети |
repartitionAndSortWithinPartitions(partitioner) | Переразбейте RDD в соответствии с заданным разделителем и в каждом разделе отсортируйте записи по их ключам. Это более эффективно, чем вызов повторного разбиения, а затем сортировка внутри каждого раздела, поскольку он может подтолкнуть сортировку к механизму перемешивания |
Действия RDD
Ниже приведен список действий RDD.
Действие | Описание |
---|---|
reduce(func) | Агрегируйте элементы набора данных с помощью функции func (которая принимает два аргумента и возвращает один). Функция должна быть коммутативной и ассоциативной, чтобы её можно было правильно вычислить параллельно |
collect() | Возвращает все элементы набора данных в виде массива в программу драйвера. Обычно это полезно после фильтра или другой операции, которая возвращает достаточно небольшое подмножество данных |
count() | Возвращает количество элементов в наборе данных |
first() | Возвращает первый элемент набора данных |
take(n) | Возвращает массив с первыми n-элементами набора данных |
takeSample (withReplacement,num, [seed]) | Возвращает массив с случайной выборкой num элементами набора данных, с или без замены, при необходимости предварительно указав seed генератора случайных чисел |
takeOrdered(n, [ordering]) | Возвращает первые n элементов RDD, используя их естественный порядок или настраиваемый компаратор |
saveAsTextFile(path) | Записывает элементы набора данных в виде текстового файла (или набора текстовых файлов) в заданном каталоге в локальной файловой системе, HDFS или любой другой файловой системе, поддерживаемой Hadoop.Spark, вызывает toString для каждого элемента, чтобы преобразовать его в строку текста в файле |
saveAsSequenceFile(path) (Java and Scala) | Записывает элементы набора данных в виде файла последовательности Hadoop по заданному пути в локальной файловой системе, HDFS или любой другой файловой системе, поддерживаемой Hadoop. Это доступно в RDD пар ключ-значение, реализующих интерфейс Hadoop Writable. В Scala он также доступен для типов, которые неявно конвертируются в Writable (Spark включает преобразования для базовых типов, таких как Int, Double, String и т. д.) |
saveAsObjectFile(path) (Java and Scala) | Записывает элементы набора данных в простом формате с использованием сериализации Java, которые затем можно загрузить с помощью SparkContext.objectFile() |
countByKey() | Доступно только для RDD типа (K, V) . Возвращает хэш-карту пар (K, Int) со счетчиком каждого ключа |
Числовые операции RDD
Spark позволяет выполнять различные операции с числовыми данными, используя один из предопределенных методов API. Числовые операции Spark реализованы с помощью алгоритма потоковой передачи, который позволяет строить модель по одному элементу за раз. Эти операции вычисляются и возвращаются как объект
StatusCounter
путем вызова метода status()
. Ниже приводится числовых методов, доступных в StatusCounter
.Операция | Описание |
---|---|
count() | Количество элементов в RDD |
Mean() | Среднее значение элементов в RDD |
Sum() | Общая сумма элементов в RDD |
Max() | Максимальное значение среди всех элементов в RDD |
Min() | Минимальное значение среди всех элементов в RDD |
Variance() | Дисперсия элементов |
Stdev() | Стандартное отклонение |
Spark-submit - это программа для отправки заданий Spark или PySpark для обработки на кластер Spark.
Простейшим примером использования
spark-submit
является выполнение тестового скрипта, который вычисляет число π с указанной точностью:spark-submit --class org.apache.spark.examples.SparkPi \ --master yarn \ --num-executors 1 \ --driver-memory 512m \ --executor-memory 512m \ --executor-cores 1 \ <path_to_spark>/examples/jars/spark-examples.jar 10
Результатом работы запроса является строка:


Увеличить
В состав дистрибутива SDP Hadoop входят 2 версии Spark: Spark2 и Spark3.
При исполнении команды
spark-shell
по умолчанию запускается Spark2. Для использования Spark3 необходимо явно указать версию перед вызовом нужной команды:SPARK_MAJOR_VERSION=3 spark-shell
или
SPARK_MAJOR_VERSION=3 spark-submit <params>