Apache Spark (spark) pracuje na obiektach data frame podobnych do tych z Pandas, ale zoptymalizowanych do pracy z Spark engine. Tutaj wczytujemy (read) plik csv (format), który w pierwszym wierszu ma nagłówek (option) z pliku (load):
df = spark.read.format("csv").option("header","true").load("Files/orders/2019.csv")
# df now is a Spark DataFrame containing CSV data from "Files/orders/2019.csv".
display(df)
Plik CSV może nie mieć nagłówka i wtedy po jego wczytaniu kolumny będą miały nazwy _c01, _c02, … Dlatego można „nałożyć” na plik CSV strukturę. W takim przypadku będziemy pracować z czymś, co przypomina tabelę SQL. Rzeczywiście w spark mamy definicje typów SQL. Tutaj wczytujemy bibliotekę pyspark.sql.types, a następnie tworzymy strukturę (StructType), która składa się z kolumn (StructField), a te kolumny mogą być różnego typu, np.: StringType, IntegerType, DateType, FloatType. Zmienia się też sposób wczytania danych. Nadal czytamy (read) dane w postaci csv (format), ale teraz nadajemy im strukturę (schema), a same dane pobieramy z pliku (load).
from pyspark.sql.types import *
orderSchema = StructType([
StructField("SalesOrderNumber", StringType()),
StructField("SalesOrderLineNumber", IntegerType()),
StructField("OrderDate", DateType()),
StructField("CustomerName", StringType()),
StructField("Email", StringType()),
StructField("Item", StringType()),
StructField("Quantity", IntegerType()),
StructField("UnitPrice", FloatType()),
StructField("Tax", FloatType())
])
df = spark.read.format("csv").schema(orderSchema).load("Files/orders/2019.csv")
display(df)
Jeśli masz kilka plików CSV o takiej samej strukturze, to możesz je załadować jednocześnie zamieniając nazwę pliku na maskę:
df = spark.read.format("csv").schema(orderSchema).load("Files/orders/*.csv")
W oparciu o jeden data frame można tworzyć kolejne. Można np. wybrać tylko niektóre kolumny wymieniając je w nawiasach kwadratowych. Data frame ma automatycznie kilka przydatnych funkcji. Np. count() policzy ile wierszy ma data frame, distinct() zwróci tylko wiersze unikalne, a co za tym idzie distinct().count() policzy ile wierszy jest unikalnych:
customers = df['CustomerName', 'Email']
print(customers.count())
print(customers.distinct().count())
display(customers.distinct())
Zamiast wymieniania nazw kolumn w nawiasie kwadratowym, można też użyć jawnej funkcji select:
customers = df.select("CustomerName", "Email")
Data frame ma metodę where, która pozwala odfiltrować wiersze spełniające określone warunki. Warunek definiuje się odwołując się do kolumn danych. Np. tutaj filtr wskazuje, że należy wyświetlić te wiersze, które w kolumnie Item mają określony tekst:
customers = df.select("CustomerName", "Email").where(df['Item']=='Road-250 Red, 52')
print(customers.count())
print(customers.distinct().count())
display(customers.distinct())
Dane z data frame można grupować. Wystarczy wskazać, które kolumny zawierają dane „po których” ma się odbywać grupowanie, oraz wybrać funkcję agregującą:
productSales = df.select("Item", "Quantity").groupBy("Item").sum()
Spark może się posługiwać funkcjami występującymi w SQL. Te funkcje znajdują się w module pyspark.sql.functions. Dzięki temu można uruchomić funkcję, np. tutaj year, przekazać do niej kolumnę (col), oraz nadać jej alias (alias). Dodatkowo dane można posortować (orderBy):
from pyspark.sql.functions import *yearlySales = df.select(year(col("OrderDate")).alias("Year")).groupBy("Year").count().orderBy("Year")display(yearlySales)
Podczas transformacji można wyznaczać nowe kolumny, których wartość jest zbudowana w oparciu o istniejące kolumny. W tym celu korzysta się z withColumn, której argumentem jest nazwa nowo tworzonej kolumny oraz wyrażenie wyznaczające wartość tej kolumny. W wyrażeniach można używać funkcji takich, jak: year, month, split, getItem. Kolumny można eliminować lub zmieniać ich kolejność korzystając z metody select lub z nawiasów kwadratowych, w których zostaną wymienione we właściwej kolejności tylko te kolumny, które mają pozostać.
from pyspark.sql.functions import *
transformed_df = df.withColumn("Year", year(col("OrderDate"))).withColumn("Month", month(col("OrderDate")))
transformed_df = transformed_df.withColumn("FirstName", split(col("CustomerName"), " ").getItem(0)).withColumn("LastName", split(col("CustomerName"), " ").getItem(1))
transformed_df = transformed_df["SalesOrderNumber", "SalesOrderLineNumber", "OrderDate", "Year", "Month", "FirstName", "LastName", "Email", "Item", "Quantity", "UnitPrice", "Tax"]
display(transformed_df.limit(5))
Przetworzone dane można „zwrócić” zapisując je (write) do pliku. Można zdecydować, że plik ma być nadpisany (mode), oraz wskazać na format tworzonego pliku (parquet).
transformed_df.write.mode("overwrite").parquet('Files/transformed_data/orders')
print ("Transformed data saved!")
Dane zapisane w formacie parquet można wczytać, wskazując na format i lokalizację. Słowo order w ścieżce oznacza nazwę katalogu. W tym katalogu w formacie parquet są przechowywane dane.
orders_df = spark.read.format("parquet").load("Files/transformed_data/orders")
display(orders_df)
Podczas zapisu danych, można je partycjonować. Tutaj we wskazanym w metodzie parquet katalogu powstaną podkatalogi Year i Month wskazujące na konkretną partycję, np. Year=2022/Month=1:
orders_df.write.partitionBy("Year","Month").mode("overwrite").parquet("Files/partitioned_data")
print ("Transformed data saved!")
Następnie w intuicyjny sposób można wczytać tylko część danych:
orders_2021_df = spark.read.format("parquet").load("Files/partitioned_data/Year=2021/Month=*")
display(orders_2021_df)
Jeśli preferujesz pracować z danymi z wykorzystaniem SQL, to możesz zapisać dane „jako tabela” (saveAsTable). Opisanie danych strukturą SQL może być wykonane w wewnętrznym metastore Sparka (tzw managed table) lub może odnosić się do pliku, który znajduje się w data lake (external table). W tej metodzie nie podajemy pełnej ścieżki dostępu do pliku, co oznacza, że będzie on przechowywany w metastore. „delta” to jeden z dostępnych formatów, inne to: csv, parquet, avro itd. Te formaty mogą wprowadzać typowo bazo-danowe funkcjonalności jak transakcje, wersjonowanie rekordów itp.
df.write.format("delta").saveAsTable("salesorders")spark.sql("DESCRIBE EXTENDED salesorders").show(truncate=False)
Mając tak zdefiniowaną tabelę, można pisać do niej zapytania SQL i zapisywać wynik w data frame spark:
df = spark.sql("SELECT * FROM xyz_lakehouse.salesorders LIMIT 1000")
display(df)
Ponieważ spark pozwala na pracę z różnymi językami, to można przełączyć się do języka SQL komendą %%sql
%%sql
SELECT YEAR(OrderDate) AS OrderYear,
SUM((UnitPrice * Quantity) + Tax) AS GrossRevenue
FROM salesorders
GROUP BY YEAR(OrderDate)
ORDER BY OrderYear;
Przełączając się między widokami, można dokonać wstępnej analizy danych na automatycznie generowanych wykresach:
Z drugiej strony Spark to nadal notebook, w którym jest uruchamiany kod Python, dlatego można korzystać z licznych funkcjonalności, które występują w tym języku, np. można tworzyć wykresy z wykorzystaniem popularnego modułu matplotlib:
sqlQuery = "SELECT CAST(YEAR(OrderDate) AS CHAR(4)) AS OrderYear, \
SUM((UnitPrice * Quantity) + Tax) AS GrossRevenue \
FROM salesorders \
GROUP BY CAST(YEAR(OrderDate) AS CHAR(4)) \
ORDER BY OrderYear"
df_spark = spark.sql(sqlQuery)
df_spark.show()
from matplotlib import pyplot as pl
tdf_sales = df_spark.toPandas()
plt.bar(x=df_sales['OrderYear'], height=df_sales['GrossRevenue'])
plt.show()
Mając pythonowe funkcje możemy wpływać na to, jak taki wykres ma wyglądać:
from matplotlib import pyplot as plt
plt.clf()
plt.bar(x=df_sales['OrderYear'], height=df_sales['GrossRevenue'], color='orange')
plt.title('Revenue by Year')
plt.xlabel('Year')
plt.ylabel('Revenue')
plt.grid(color='#95a5a6', linestyle='--', linewidth=2, axis='y', alpha=0.7)
plt.xticks(rotation=45)
plt.show()
Matplotlib ma dużo możliwości pod względem formatowania wykresów i generalnie można korzystać ze wszystkich z nich. Tu np. tworzymy dwa wykresy w jednym:
from matplotlib import pyplot as plt
plt.clf()
fig, ax = plt.subplots(1, 2, figsize = (10,4))
ax[0].bar(x=df_sales['OrderYear'], height=df_sales['GrossRevenue'], color='orange')
ax[0].set_title('Revenue by Year')
yearly_counts = df_sales['OrderYear'].value_counts()
ax[1].pie(yearly_counts)
ax[1].set_title('Orders per Year')
ax[1].legend(yearly_counts.keys().tolist())
fig.suptitle('Sales Data')
plt.show()
Matplotlib jest uniwersalny, ale też pracochłonny. Równolegle powstają też inne biblioteki do tworzenia wykresów, jak np. seaborn:
import seaborn as sns
plt.clf()
ax = sns.barplot(x="OrderYear", y="GrossRevenue", data=df_sales)
plt.show()
Seaborn pozwala na zastosowanie „tematów” definiujących z grubsza wygląd wykresu:
import seaborn as sns
plt.clf()
sns.set_theme(style="whitegrid")
ax = sns.barplot(x="OrderYear", y="GrossRevenue", data=df_sales)
plt.show()
Do dyspozycji mamy wiele typów wykresów, w tym również liniowy:
import seaborn as sns
plt.clf()
ax = sns.lineplot(x="OrderYear", y="GrossRevenue", data=df_sales)
plt.show()