Что такое Spark и с чем его едят?
Spark предоставляет быструю и универсальную платформу для обработки данных. По сравнению с Hadoop Spark ускоряет работу программ в памяти более чем в 100 раз, а на диске – более чем в 10 раз. Spark дает больше возможностей для работы с данными. Его синтаксис не так сложен, чтобы начать погружение, для сравнения приведу пример из Pandas.
Для работы с Spark, нужно создать сессию.
``` spark = SparkSession.builder.getOrCreate() ```
Во время создания сессии, происходит кластеризация.
Pandas ``` data = pd.read_csv('data.csv') ```` Spark ```` data = spark.read.csv(path=’data.csv’, header=True, sep=’,’) ````
Далее, сгруппируем данные и «сместим» в колонке на одну позицию. В Pandas это делается так:
``` data[group1] = pandas_df.groupby(group2)[group3].shift(-1) ```
В Spark
``` w = Window().partitionBy("group2").orderBy("group3") data = data.withColumn("group2", lag("group2", -1, 0).over(w)) ```
Можно использовать оконную функцию, где partitionBy отвечает за группировку данных, а orderBy сортировка. Функция lag принимает 3 параметра: это колонка, шаг смещения и значения, которые будет на месте шага. Или для группировки можно использовать обычную функцию groupBy, которая тоже есть в Spark. Разница в том, что с окном каждая строка будет связана с результатом агрегирования, вычисленным для всего окна. Однако при группировке каждая группа будет связана с результатом агрегации в этой группе (группа строк становится только одной строкой).
``` dataframe = spark.range(6).withColumn("key", 'id % 2) dataframe.show
windowing = Window.partitionBy("key") dataframe.withColumn("sum", sum(col("id")).over(windowing).show
dataframe.groupBy("key").agg(sum('id)).show
К сожалению, некоторых функций может не быть в Spark (например, factorize).
``` labels_start, uniques = pd.factorize(anomaly_time['activity_start']) anomaly_time['activity_start_code'] = labels_start ``` Spark ```` win_func = Window().partitionBy().orderBy(lit(' ')) data = data.select('name_column').distinct().withColumn('name_column', row_number().over(win_func) - 1) ````
Функция factorize закодирует объект как перечислимый тип или категориальную переменную, или присвоит объекту идентификатор.
``` codes, uniques = pd.factorize(['b', 'b', 'a', 'c', 'b']) codes array([0, 0, 1, 2, 0]...) ```
Для выполнения подобного функционала в Spark, берется колонка select (‘name_column’) и выбираются все уникальные значения, с помощью функции distinct. Далее с помощью функции withColumn создается колонка и присваивается номер строки (чтобы начиналось с 0 — я отнимаю 1).
Вывод
Apache Spark это огромная система, с множеством инструментов для разных типов задач от SQL до машинного обучения. В этой статье был показан лишь маленький кусочек от всего Spark, но даже этого хватит, чтобы начать обрабатывать данные.