Spark. Быстрый старт

Spark - это аналитический движок для крупномасштабной обработки данных. Предоставляет быструю и универсальную платформу для обработки данных. Hadoop Spark ускоряет работу программ в памяти более чем в 100 раз, а на диске – более чем в 10 раз.

Spark-shell

Для открытия оболочки Spark введите в команду:
$spark-shell
С помощью spark-shell можно работать с уже существующими базами данных.
Пример Select-запроса к ранее созданной таблице:

Увеличить

Преобразования RDD

RDD – это разновидность датасета (простого набора данных), который разделён на множество машин, работающих в кластере.
Ниже приведен список преобразований RDD.
ПреобразованиеОписание
map(func)
Возвращает новый распределённый набор данных, сформированный путём передачи каждого элемента источника через функцию func
filter(func)
Возвращает новый набор данных, сформированный путем выбора тех элементов источника, для которых func возвращает true
flatMap(func)
Подобно map, но каждый элемент ввода может быть сопоставлен с 0 или более элементами вывода (поэтому func должен возвращать Seq, а не один элемент)
mapPartitions(func)
Подобно map, но запускается отдельно для каждого раздела (блока) RDD, поэтому func должен иметь тип Iterator <T> ⇒ Iterator <U> при запуске в RDD типа T
mapPartitionsWithIndex(func)
Подобно map Partitions, но также предоставляет func целочисленным значением, представляющим индекс раздела, поэтому func должен иметь тип (Int, Iterator <T>) ⇒ Iterator <U> при работе с RDD типа T
sample(withReplacement, fraction, seed)
Произведите выборку части данных с заменой или без нее, используя заданное начальное число генератора случайных чисел
union(otherDataset)
Возвращает новый набор данных, содержащий объединение элементов исходного набора данных и аргумента
intersection(otherDataset)
Возвращает новый RDD, который содержит пересечение элементов в исходном наборе данных и аргументе
distinct([numTasks])
Возвращает новый набор данных, содержащий отдельные элементы исходного набора данных
groupByKey([numTasks])
При вызове набора данных из пар (K, V) возвращает набор данных из пар (K, Iterable <V>). Примечание: если вы группируете для выполнения агрегирования (например, суммы или среднего) по каждому ключу, использование reduceByKey или aggregateByKey даст гораздо лучшую производительность
reduceByKey(func, [numTasks])
При вызове для набора данных из пар (K, V) возвращает набор данных из пар (K, V), где значения для каждого ключа агрегируются с использованием заданной функции сокращения func, которая должна иметь тип (V, V) ⇒ V. Как и в groupByKey, количество задач сокращения настраивается с помощью необязательного второго аргумента
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])
При вызове набора данных из пар (K, V) возвращает набор данных из пар (K, U), где значения для каждого ключа агрегированы с использованием заданных функций объединения и нейтрального «нулевого» значения. Позволяет использовать тип агрегированного значения, отличный от типа входного значения, избегая при этом ненужного выделения. Как и в groupByKey, количество задач уменьшения настраивается с помощью необязательного второго аргумента
sortByKey([ascending], [numTasks])
При вызове набора данных из пар (K, V), где K реализует Ordered, возвращает набор данных из пар (K, V), отсортированных по ключам в порядке возрастания или убывания, как указано в логическом аргументе ascending
join(otherDataset, [numTasks])
При вызове наборов данных типа (K, V) и (K, W) возвращает набор данных из пар (K, (V, W)) со всеми парами элементов для каждого ключа. Внешние соединения поддерживаются через leftOuterJoin, rightOuterJoin и fullOuterJoin
cogroup(otherDataset, [numTasks])
При вызове наборов данных типа (K, V) и (K, W) возвращает набор данных из кортежей (K, (Iterable <V>, Iterable <W>))
cartesian(otherDataset)
При вызове наборов данных типов T и U возвращает набор данных из пар (T, U) (все пары элементов)
pipe(command, [envVars])
Передайте каждый раздел RDD через команду оболочки, например, сценарий Perl или bash. Элементы RDD записываются в стандартный ввод процесса, а строки, выводимые на его стандартный вывод, возвращаются как RDD-строки
coalesce(numPartitions)
Уменьшите количество разделов в RDD до numPartitions. Полезно для более эффективного выполнения операций после фильтрации большого набора данных
repartition(numPartitions)
Перемешивайте данные в RDD случайным образом, чтобы создать больше или меньше разделов и сбалансировать их между собой. Это всегда перемешивает все данные по сети
repartitionAndSortWithinPartitions(partitioner)
Переразбейте RDD в соответствии с заданным разделителем и в каждом разделе отсортируйте записи по их ключам. Это более эффективно, чем вызов повторного разбиения, а затем сортировка внутри каждого раздела, поскольку он может подтолкнуть сортировку к механизму перемешивания

Действия RDD

Ниже приведен список действий RDD.
ДействиеОписание
reduce(func)
Агрегируйте элементы набора данных с помощью функции func (которая принимает два аргумента и возвращает один). Функция должна быть коммутативной и ассоциативной, чтобы её можно было правильно вычислить параллельно
collect()
Возвращает все элементы набора данных в виде массива в программу драйвера. Обычно это полезно после фильтра или другой операции, которая возвращает достаточно небольшое подмножество данных
count()
Возвращает количество элементов в наборе данных
first()
Возвращает первый элемент набора данных
take(n)
Возвращает массив с первыми n-элементами набора данных
takeSample (withReplacement,num, [seed])
Возвращает массив с случайной выборкой num элементами набора данных, с или без замены, при необходимости предварительно указав seed генератора случайных чисел
takeOrdered(n, [ordering])
Возвращает первые n элементов RDD, используя их естественный порядок или настраиваемый компаратор
saveAsTextFile(path)
Записывает элементы набора данных в виде текстового файла (или набора текстовых файлов) в заданном каталоге в локальной файловой системе, HDFS или любой другой файловой системе, поддерживаемой Hadoop.Spark, вызывает toString для каждого элемента, чтобы преобразовать его в строку текста в файле
saveAsSequenceFile(path) (Java and Scala)
Записывает элементы набора данных в виде файла последовательности Hadoop по заданному пути в локальной файловой системе, HDFS или любой другой файловой системе, поддерживаемой Hadoop. Это доступно в RDD пар ключ-значение, реализующих интерфейс Hadoop Writable. В Scala он также доступен для типов, которые неявно конвертируются в Writable (Spark включает преобразования для базовых типов, таких как Int, Double, String и т. д.)
saveAsObjectFile(path) (Java and Scala)
Записывает элементы набора данных в простом формате с использованием сериализации Java, которые затем можно загрузить с помощью SparkContext.objectFile()
countByKey()
Доступно только для RDD типа (K, V). Возвращает хэш-карту пар (K, Int) со счетчиком каждого ключа

Числовые операции RDD

Spark позволяет выполнять различные операции с числовыми данными, используя один из предопределенных методов API. Числовые операции Spark реализованы с помощью алгоритма потоковой передачи, который позволяет строить модель по одному элементу за раз. Эти операции вычисляются и возвращаются как объект StatusCounter путем вызова метода status(). Ниже приводится числовых методов, доступных в StatusCounter.
ОперацияОписание
count()
Количество элементов в RDD
Mean()
Среднее значение элементов в RDD
Sum()
Общая сумма элементов в RDD
Max()
Максимальное значение среди всех элементов в RDD
Min()
Минимальное значение среди всех элементов в RDD
Variance()
Дисперсия элементов
Stdev()
Стандартное отклонение

Spark-submit

Spark-submit - это программа для отправки заданий Spark или PySpark для обработки на кластер Spark.
Простейшим примером использования spark-submit является выполнение тестового скрипта, который вычисляет число π с указанной точностью:
spark-submit --class org.apache.spark.examples.SparkPi \
 --master yarn \
 --num-executors 1 \
 --driver-memory 512m \
 --executor-memory 512m \
 --executor-cores 1 \
 <path_to_spark>/examples/jars/spark-examples.jar 10
Результатом работы запроса является строка:

Увеличить

Переключение версии Spark

В состав дистрибутива SDP Hadoop входят 2 версии Spark: Spark2 и Spark3.
При исполнении команды spark-shell по умолчанию запускается Spark2. Для использования Spark3 необходимо явно указать версию перед вызовом нужной команды:
SPARK_MAJOR_VERSION=3 spark-shell
или
SPARK_MAJOR_VERSION=3 spark-submit <params>
Предыдущий раздел
Быстрый старт
Следующий раздел
Быстрый старт
Была ли страница полезной?