Azure: MS Fabric i Spark Notebooks

3-paź-2024

Apache Spark (spark) pracuje na obiektach data frame podobnych do tych z Pandas, ale zoptymalizowanych do pracy z Spark engine. Tutaj wczytujemy (read) plik csv (format), który w pierwszym wierszu ma nagłówek (option) z pliku (load):

df = spark.read.format("csv").option("header","true").load("Files/orders/2019.csv")
# df now is a Spark DataFrame containing CSV data from "Files/orders/2019.csv".
display(df)

Plik CSV może nie mieć nagłówka i wtedy po jego wczytaniu kolumny będą miały nazwy _c01, _c02, … Dlatego można „nałożyć” na plik CSV strukturę. W takim przypadku będziemy pracować z czymś, co przypomina tabelę SQL. Rzeczywiście w spark mamy definicje typów SQL. Tutaj wczytujemy bibliotekę pyspark.sql.types, a następnie tworzymy strukturę (StructType), która składa się z kolumn (StructField), a te kolumny mogą być różnego typu, np.: StringType, IntegerType, DateType, FloatType. Zmienia się też sposób wczytania danych. Nadal czytamy (read) dane w postaci csv (format), ale teraz nadajemy im strukturę (schema), a same dane pobieramy z pliku (load).

from pyspark.sql.types import *

orderSchema = StructType([
    StructField("SalesOrderNumber", StringType()),
    StructField("SalesOrderLineNumber", IntegerType()),
    StructField("OrderDate", DateType()),
    StructField("CustomerName", StringType()),
    StructField("Email", StringType()),
    StructField("Item", StringType()),
    StructField("Quantity", IntegerType()),
    StructField("UnitPrice", FloatType()),
    StructField("Tax", FloatType())
    ])

df = spark.read.format("csv").schema(orderSchema).load("Files/orders/2019.csv")
display(df)

Jeśli masz kilka plików CSV o takiej samej strukturze, to możesz je załadować jednocześnie zamieniając nazwę pliku na maskę:

df = spark.read.format("csv").schema(orderSchema).load("Files/orders/*.csv")

W oparciu o jeden data frame można tworzyć kolejne. Można np. wybrać tylko niektóre kolumny wymieniając je w nawiasach kwadratowych. Data frame ma automatycznie kilka przydatnych funkcji. Np. count() policzy ile wierszy ma data frame, distinct() zwróci tylko wiersze unikalne, a co za tym idzie distinct().count() policzy ile wierszy jest unikalnych:

customers = df['CustomerName', 'Email']
print(customers.count())
print(customers.distinct().count())
display(customers.distinct())

Zamiast wymieniania nazw kolumn w nawiasie kwadratowym, można też użyć jawnej funkcji select:

customers = df.select("CustomerName", "Email")

Data frame ma metodę where, która pozwala odfiltrować wiersze spełniające określone warunki. Warunek definiuje się odwołując się do kolumn danych. Np. tutaj filtr wskazuje, że należy wyświetlić te wiersze, które w kolumnie Item mają określony tekst:

customers = df.select("CustomerName", "Email").where(df['Item']=='Road-250 Red, 52')
print(customers.count())
print(customers.distinct().count())
display(customers.distinct())

Dane z data frame można grupować. Wystarczy wskazać, które kolumny zawierają dane „po których” ma się odbywać grupowanie, oraz wybrać funkcję agregującą:

productSales = df.select("Item", "Quantity").groupBy("Item").sum()

Spark może się posługiwać funkcjami występującymi w SQL. Te funkcje znajdują się w module pyspark.sql.functions. Dzięki temu można uruchomić funkcję, np. tutaj year, przekazać do niej kolumnę (col), oraz nadać jej alias (alias). Dodatkowo dane można posortować (orderBy):

from pyspark.sql.functions import *yearlySales = df.select(year(col("OrderDate")).alias("Year")).groupBy("Year").count().orderBy("Year")display(yearlySales)

Podczas transformacji można wyznaczać nowe kolumny, których wartość jest zbudowana w oparciu o istniejące kolumny. W tym celu korzysta się z withColumn, której argumentem jest nazwa nowo tworzonej kolumny oraz wyrażenie wyznaczające wartość tej kolumny. W wyrażeniach można używać funkcji takich, jak: year, month, split, getItem. Kolumny można eliminować lub zmieniać ich kolejność korzystając z metody select lub z nawiasów kwadratowych, w których zostaną wymienione we właściwej kolejności tylko te kolumny, które mają pozostać.

from pyspark.sql.functions import *

transformed_df = df.withColumn("Year", year(col("OrderDate"))).withColumn("Month", month(col("OrderDate")))

transformed_df = transformed_df.withColumn("FirstName", split(col("CustomerName"), " ").getItem(0)).withColumn("LastName", split(col("CustomerName"), " ").getItem(1))

transformed_df = transformed_df["SalesOrderNumber", "SalesOrderLineNumber", "OrderDate", "Year", "Month", "FirstName", "LastName", "Email", "Item", "Quantity", "UnitPrice", "Tax"]

display(transformed_df.limit(5))

Przetworzone dane można „zwrócić” zapisując je (write) do pliku. Można zdecydować, że plik ma być nadpisany (mode), oraz wskazać na format tworzonego pliku (parquet).

transformed_df.write.mode("overwrite").parquet('Files/transformed_data/orders')
print ("Transformed data saved!")

Dane zapisane w formacie parquet można wczytać, wskazując na format i lokalizację. Słowo order w ścieżce oznacza nazwę katalogu. W tym katalogu w formacie parquet są przechowywane dane.

orders_df = spark.read.format("parquet").load("Files/transformed_data/orders")
display(orders_df)

Podczas zapisu danych, można je partycjonować. Tutaj we wskazanym w metodzie parquet katalogu powstaną podkatalogi Year i Month wskazujące na konkretną partycję, np. Year=2022/Month=1:

orders_df.write.partitionBy("Year","Month").mode("overwrite").parquet("Files/partitioned_data")
print ("Transformed data saved!")

Następnie w intuicyjny sposób można wczytać tylko część danych:

orders_2021_df = spark.read.format("parquet").load("Files/partitioned_data/Year=2021/Month=*")
display(orders_2021_df)

Jeśli preferujesz pracować z danymi z wykorzystaniem SQL, to możesz zapisać dane „jako tabela” (saveAsTable). Opisanie danych strukturą SQL może być wykonane w wewnętrznym metastore Sparka (tzw managed table) lub może odnosić się do pliku, który znajduje się w data lake (external table). W tej metodzie nie podajemy pełnej ścieżki dostępu do pliku, co oznacza, że będzie on przechowywany w metastore. „delta” to jeden z dostępnych formatów, inne to: csv, parquet, avro itd. Te formaty mogą wprowadzać typowo bazo-danowe funkcjonalności jak transakcje, wersjonowanie rekordów itp.

df.write.format("delta").saveAsTable("salesorders")spark.sql("DESCRIBE EXTENDED salesorders").show(truncate=False)

Mając tak zdefiniowaną tabelę, można pisać do niej zapytania SQL i zapisywać wynik w data frame spark:

df = spark.sql("SELECT * FROM xyz_lakehouse.salesorders LIMIT 1000")
display(df)

Ponieważ spark pozwala na pracę z różnymi językami, to można przełączyć się do języka SQL komendą %%sql

%%sql
SELECT YEAR(OrderDate) AS OrderYear,
       SUM((UnitPrice * Quantity) + Tax) AS GrossRevenue
       FROM salesorders       
       GROUP BY YEAR(OrderDate)
       ORDER BY OrderYear;

Przełączając się między widokami, można dokonać wstępnej analizy danych na automatycznie generowanych wykresach:

Z drugiej strony Spark to nadal notebook, w którym jest uruchamiany kod Python, dlatego można korzystać z licznych funkcjonalności, które występują w tym języku, np. można tworzyć wykresy z wykorzystaniem popularnego modułu matplotlib:

sqlQuery = "SELECT CAST(YEAR(OrderDate) AS CHAR(4)) AS OrderYear, \
                SUM((UnitPrice * Quantity) + Tax) AS GrossRevenue \
                FROM salesorders \
                GROUP BY CAST(YEAR(OrderDate) AS CHAR(4)) \
               ORDER BY OrderYear"
df_spark = spark.sql(sqlQuery)
df_spark.show()

from matplotlib import pyplot as pl

tdf_sales = df_spark.toPandas()
plt.bar(x=df_sales['OrderYear'], height=df_sales['GrossRevenue'])
plt.show()

Mając pythonowe funkcje możemy wpływać na to, jak taki wykres ma wyglądać:

from matplotlib import pyplot as plt

plt.clf()
plt.bar(x=df_sales['OrderYear'], height=df_sales['GrossRevenue'], color='orange')
plt.title('Revenue by Year')
plt.xlabel('Year')
plt.ylabel('Revenue')
plt.grid(color='#95a5a6', linestyle='--', linewidth=2, axis='y', alpha=0.7)
plt.xticks(rotation=45)
plt.show()

Matplotlib ma dużo możliwości pod względem formatowania wykresów i generalnie można korzystać ze wszystkich z nich. Tu np. tworzymy dwa wykresy w jednym:

from matplotlib import pyplot as plt

plt.clf()
fig, ax = plt.subplots(1, 2, figsize = (10,4))

ax[0].bar(x=df_sales['OrderYear'], height=df_sales['GrossRevenue'], color='orange')
ax[0].set_title('Revenue by Year')

yearly_counts = df_sales['OrderYear'].value_counts()

ax[1].pie(yearly_counts)
ax[1].set_title('Orders per Year')
ax[1].legend(yearly_counts.keys().tolist())
fig.suptitle('Sales Data')

plt.show()

Matplotlib jest uniwersalny, ale też pracochłonny. Równolegle powstają też inne biblioteki do tworzenia wykresów, jak np. seaborn:

import seaborn as sns

plt.clf()
ax = sns.barplot(x="OrderYear", y="GrossRevenue", data=df_sales)
plt.show()

Seaborn pozwala na zastosowanie „tematów” definiujących z grubsza wygląd wykresu:

import seaborn as sns

plt.clf()
sns.set_theme(style="whitegrid")
ax = sns.barplot(x="OrderYear", y="GrossRevenue", data=df_sales)
plt.show()

Do dyspozycji mamy wiele typów wykresów, w tym również liniowy:

import seaborn as sns

plt.clf()
ax = sns.lineplot(x="OrderYear", y="GrossRevenue", data=df_sales)
plt.show()

Komentarze są wyłączone

Autor: Rafał Kraik