Azure: MS Fabric i Spark Notebooks

2024-10-03

Apache Spark (spark) pracuje na obiektach data frame podobnych do tych z Pandas, ale zoptymalizowanych do pracy z Spark engine. Tutaj wczytujemy (read) plik csv (format), który w pierwszym wierszu ma nagłówek (option) z pliku (load):

df = spark.read.format("csv").option("header","true").load("Files/orders/2019.csv")
# df now is a Spark DataFrame containing CSV data from "Files/orders/2019.csv".
display(df)

Plik CSV może nie mieć nagłówka i wtedy po jego wczytaniu kolumny będą miały nazwy _c01, _c02, … Dlatego można „nałożyć” na plik CSV strukturę. W takim przypadku będziemy pracować z czymś, co przypomina tabelę SQL. Rzeczywiście w spark mamy definicje typów SQL. Tutaj wczytujemy bibliotekę pyspark.sql.types, a następnie tworzymy strukturę (StructType), która składa się z kolumn (StructField), a te kolumny mogą być różnego typu, np.: StringType, IntegerType, DateType, FloatType. Zmienia się też sposób wczytania danych. Nadal czytamy (read) dane w postaci csv (format), ale teraz nadajemy im strukturę (schema), a same dane pobieramy z pliku (load).

from pyspark.sql.types import *

orderSchema = StructType([
    StructField("SalesOrderNumber", StringType()),
    StructField("SalesOrderLineNumber", IntegerType()),
    StructField("OrderDate", DateType()),
    StructField("CustomerName", StringType()),
    StructField("Email", StringType()),
    StructField("Item", StringType()),
    StructField("Quantity", IntegerType()),
    StructField("UnitPrice", FloatType()),
    StructField("Tax", FloatType())
    ])

df = spark.read.format("csv").schema(orderSchema).load("Files/orders/2019.csv")
display(df)

Jeśli masz kilka plików CSV o takiej samej strukturze, to możesz je załadować jednocześnie zamieniając nazwę pliku na maskę:

df = spark.read.format("csv").schema(orderSchema).load("Files/orders/*.csv")

W oparciu o jeden data frame można tworzyć kolejne. Można np. wybrać tylko niektóre kolumny wymieniając je w nawiasach kwadratowych. Data frame ma automatycznie kilka przydatnych funkcji. Np. count() policzy ile wierszy ma data frame, distinct() zwróci tylko wiersze unikalne, a co za tym idzie distinct().count() policzy ile wierszy jest unikalnych:

customers = df['CustomerName', 'Email']
print(customers.count())
print(customers.distinct().count())
display(customers.distinct())

Zamiast wymieniania nazw kolumn w nawiasie kwadratowym, można też użyć jawnej funkcji select:

customers = df.select("CustomerName", "Email")

Data frame ma metodę where, która pozwala odfiltrować wiersze spełniające określone warunki. Warunek definiuje się odwołując się do kolumn danych. Np. tutaj filtr wskazuje, że należy wyświetlić te wiersze, które w kolumnie Item mają określony tekst:

customers = df.select("CustomerName", "Email").where(df['Item']=='Road-250 Red, 52')
print(customers.count())
print(customers.distinct().count())
display(customers.distinct())

Dane z data frame można grupować. Wystarczy wskazać, które kolumny zawierają dane „po których” ma się odbywać grupowanie, oraz wybrać funkcję agregującą:

productSales = df.select("Item", "Quantity").groupBy("Item").sum()

Spark może się posługiwać funkcjami występującymi w SQL. Te funkcje znajdują się w module pyspark.sql.functions. Dzięki temu można uruchomić funkcję, np. tutaj year, przekazać do niej kolumnę (col), oraz nadać jej alias (alias). Dodatkowo dane można posortować (orderBy):

from pyspark.sql.functions import *yearlySales = df.select(year(col("OrderDate")).alias("Year")).groupBy("Year").count().orderBy("Year")display(yearlySales)

Podczas transformacji można wyznaczać nowe kolumny, których wartość jest zbudowana w oparciu o istniejące kolumny. W tym celu korzysta się z withColumn, której argumentem jest nazwa nowo tworzonej kolumny oraz wyrażenie wyznaczające wartość tej kolumny. W wyrażeniach można używać funkcji takich, jak: year, month, split, getItem. Kolumny można eliminować lub zmieniać ich kolejność korzystając z metody select lub z nawiasów kwadratowych, w których zostaną wymienione we właściwej kolejności tylko te kolumny, które mają pozostać.

from pyspark.sql.functions import *

transformed_df = df.withColumn("Year", year(col("OrderDate"))).withColumn("Month", month(col("OrderDate")))

transformed_df = transformed_df.withColumn("FirstName", split(col("CustomerName"), " ").getItem(0)).withColumn("LastName", split(col("CustomerName"), " ").getItem(1))

transformed_df = transformed_df["SalesOrderNumber", "SalesOrderLineNumber", "OrderDate", "Year", "Month", "FirstName", "LastName", "Email", "Item", "Quantity", "UnitPrice", "Tax"]

display(transformed_df.limit(5))

Przetworzone dane można „zwrócić” zapisując je (write) do pliku. Można zdecydować, że plik ma być nadpisany (mode), oraz wskazać na format tworzonego pliku (parquet).

transformed_df.write.mode("overwrite").parquet('Files/transformed_data/orders')
print ("Transformed data saved!")

Dane zapisane w formacie parquet można wczytać, wskazując na format i lokalizację. Słowo order w ścieżce oznacza nazwę katalogu. W tym katalogu w formacie parquet są przechowywane dane.

orders_df = spark.read.format("parquet").load("Files/transformed_data/orders")
display(orders_df)

Podczas zapisu danych, można je partycjonować. Tutaj we wskazanym w metodzie parquet katalogu powstaną podkatalogi Year i Month wskazujące na konkretną partycję, np. Year=2022/Month=1:

orders_df.write.partitionBy("Year","Month").mode("overwrite").parquet("Files/partitioned_data")
print ("Transformed data saved!")

Następnie w intuicyjny sposób można wczytać tylko część danych:

orders_2021_df = spark.read.format("parquet").load("Files/partitioned_data/Year=2021/Month=*")
display(orders_2021_df)

Jeśli preferujesz pracować z danymi z wykorzystaniem SQL, to możesz zapisać dane „jako tabela” (saveAsTable). Opisanie danych strukturą SQL może być wykonane w wewnętrznym metastore Sparka (tzw managed table) lub może odnosić się do pliku, który znajduje się w data lake (external table). W tej metodzie nie podajemy pełnej ścieżki dostępu do pliku, co oznacza, że będzie on przechowywany w metastore. „delta” to jeden z dostępnych formatów, inne to: csv, parquet, avro itd. Te formaty mogą wprowadzać typowo bazo-danowe funkcjonalności jak transakcje, wersjonowanie rekordów itp.

df.write.format("delta").saveAsTable("salesorders")spark.sql("DESCRIBE EXTENDED salesorders").show(truncate=False)

Mając tak zdefiniowaną tabelę, można pisać do niej zapytania SQL i zapisywać wynik w data frame spark:

df = spark.sql("SELECT * FROM xyz_lakehouse.salesorders LIMIT 1000")
display(df)

Ponieważ spark pozwala na pracę z różnymi językami, to można przełączyć się do języka SQL komendą %%sql

%%sql
SELECT YEAR(OrderDate) AS OrderYear,
       SUM((UnitPrice * Quantity) + Tax) AS GrossRevenue
       FROM salesorders       
       GROUP BY YEAR(OrderDate)
       ORDER BY OrderYear;

Przełączając się między widokami, można dokonać wstępnej analizy danych na automatycznie generowanych wykresach:

Z drugiej strony Spark to nadal notebook, w którym jest uruchamiany kod Python, dlatego można korzystać z licznych funkcjonalności, które występują w tym języku, np. można tworzyć wykresy z wykorzystaniem popularnego modułu matplotlib:

sqlQuery = "SELECT CAST(YEAR(OrderDate) AS CHAR(4)) AS OrderYear, \
                SUM((UnitPrice * Quantity) + Tax) AS GrossRevenue \
                FROM salesorders \
                GROUP BY CAST(YEAR(OrderDate) AS CHAR(4)) \
               ORDER BY OrderYear"
df_spark = spark.sql(sqlQuery)
df_spark.show()

from matplotlib import pyplot as pl

tdf_sales = df_spark.toPandas()
plt.bar(x=df_sales['OrderYear'], height=df_sales['GrossRevenue'])
plt.show()

Mając pythonowe funkcje możemy wpływać na to, jak taki wykres ma wyglądać:

from matplotlib import pyplot as plt

plt.clf()
plt.bar(x=df_sales['OrderYear'], height=df_sales['GrossRevenue'], color='orange')
plt.title('Revenue by Year')
plt.xlabel('Year')
plt.ylabel('Revenue')
plt.grid(color='#95a5a6', linestyle='--', linewidth=2, axis='y', alpha=0.7)
plt.xticks(rotation=45)
plt.show()

Matplotlib ma dużo możliwości pod względem formatowania wykresów i generalnie można korzystać ze wszystkich z nich. Tu np. tworzymy dwa wykresy w jednym:

from matplotlib import pyplot as plt

plt.clf()
fig, ax = plt.subplots(1, 2, figsize = (10,4))

ax[0].bar(x=df_sales['OrderYear'], height=df_sales['GrossRevenue'], color='orange')
ax[0].set_title('Revenue by Year')

yearly_counts = df_sales['OrderYear'].value_counts()

ax[1].pie(yearly_counts)
ax[1].set_title('Orders per Year')
ax[1].legend(yearly_counts.keys().tolist())
fig.suptitle('Sales Data')

plt.show()

Matplotlib jest uniwersalny, ale też pracochłonny. Równolegle powstają też inne biblioteki do tworzenia wykresów, jak np. seaborn:

import seaborn as sns

plt.clf()
ax = sns.barplot(x="OrderYear", y="GrossRevenue", data=df_sales)
plt.show()

Seaborn pozwala na zastosowanie „tematów” definiujących z grubsza wygląd wykresu:

import seaborn as sns

plt.clf()
sns.set_theme(style="whitegrid")
ax = sns.barplot(x="OrderYear", y="GrossRevenue", data=df_sales)
plt.show()

Do dyspozycji mamy wiele typów wykresów, w tym również liniowy:

import seaborn as sns

plt.clf()
ax = sns.lineplot(x="OrderYear", y="GrossRevenue", data=df_sales)
plt.show()
By Rafał Kraik in SQL

Linux: Konfiguracja podtrzymania połączeń SSH

2024-09-09

Domyślnie, kiedy połączenie SSH pozostanie otwarte przez dłuższy czas bez żadnej aktywności, to serwer takie połączenie zamknie. Można skonfigurować to zachowanie:

Na serwerze w pliku /etc/ssh/sshd_config dodaj linie:

ClientAliveInterval 120
ClientAliveCountMax 720
  • ClientAliveInterval określa czas w sekundach między wysyłaniem przez serwer pakietów keep-alive do klienta. W tym przypadku jest to 120 sekund.
  • ClientAliveCountMax określa maksymalną liczbę pakietów keep-alive, które serwer wyśle bez odpowiedzi od klienta, zanim zakończy połączenie. W tym przypadku jest to 720, co daje łącznie 24 godziny (120 sekund * 720).

Po wykonaniu zmian zrestartuj SSHD:

sudo systemctl restart sshd

Potem zmiany można wykonać jeszcze na kliencie. Zmiany wprowadzamy w pliku sudo nano /etc/ssh/ssh_config:

ServerAliveInterval 120
ServerAliveCountMax 720
By Rafał Kraik in Linuxy

Linux: Ubuntu 24: Stały adres IP

2024-09-09

W Ubuntu 24 do konfiguracji sieci nie używa się Network Managera, tylko NetPlan https://netplan.io/

Konfiguracja znajduje sie w plikach yaml w /etc/netplan

Najprawdopodobniej po instalacji znajdziesz w tym katalogu jakiś plik konfiguracyjny. U mnie był to 50-cloud-init.yaml

Domyślnie w tym pliku znajduje się zapis:

network:
    ethernets:
        eth0:
            dhcp4: true
    version: 2

co oznacza, że adres IP i ustawienia sieciowe dla interfejsu sieciowego mają być pobierane z DHCP

Jeśli chcesz mieć stały adres IP, zmień zawartość pliku np. na:

network:
    ethernets:
        eth0:
            dhcp4: no
            addresses:
              - 192.168.100.100/24
            gateway4: 192.168.100.1
            nameservers:
              addresses:
                - 8.8.8.8
    version: 2

Jeśli serwer korzysta z cloud-init, to zgodnie z instrukcją należy jeszcze wyłączyć automatyczą konfiguracje sieci, co robi się umieszczając w pliku /etc/cloud/cloud.cfg.d/99-disable-network-config.cfg

network: {config: disabled}

Potem pozostaje jeszcze zaaplikować zmiany:

sudo netplan apply
By Rafał Kraik in Linuxy

PowerShell: Uruchomienie programu „jako administrator”

2024-09-06

Czasami możesz chcieć uruchomić jakiś program z poziomu linii komend „Jako administrator”. Da się to zrobić za pomocą powershellowej komendy Start-Process, która ma parametr -verb. Jeśli przekażesz wartość parameteru „RunAs” to uruchamiany proces będzie działał „jako administrator”. Oto i przykład polecenia uruchamianego w linii komend, która już wcześniej została uruchomiona jako administrator:

pwsh -command "start-process pwsh -verb runas"

Jeśli chcesz się przekonać, czy rzeczywiście jesteś teraz administratorem użyj:

$user = [Security.Principal.WindowsIdentity]::GetCurrent();
(New-Object Security.Principal.WindowsPrincipal $user).IsInRole([Security.Principal.WindowsBuiltinRole]::Administrator)

By Rafał Kraik in Power Shell

Linux: Instalacja serwera SSH

2024-08-09

Podczas instalacji serwera Linux Ubuntu można zdecydować się nie zainstalować serwera Open SSH. Jak go doinstalować później?

Zaczynamy od instalacji pakietu

sudo apt-get update
sudo apt-get upgrade
sudo apt-get intall openssh-client
sudo apt-get install openssh-server

Potem włączamy usługę i konfigurujemy ją do startu. Warto też sprawdzić status usługi, żeby sie upewnić, że wszystko działa, jak trzeba:

sudo systemctl enable ssh
sudo ufw allow ssh
sudo systemctl start ssh
sudo systemctl status ssh

I już można zacząć korzystać z ssh:

ssh username@server.address

A gdyby tak chcieć usunąć serwer SSH:

sudo systemctl stopssh
sudosystemctl disable ssh
sudo apt-get remove openssh-server
sudo ufw delete allow ssh
By Rafał Kraik in Linuxy

Helpdesk: Wolne menu kontekstowe

2024-08-04

Fajnie by było gdyby stare komputery pracowały z taką prędkością jak wtedy, gdy były nowe. Zresztą, to powinno być całkiem wykonywalne, bo przecież to nie jest tak, że starszy komputer rdzewieje i przez to robi się wolny. Spowolnienie wynika często z tego, że w swojej historii na komputerze było instalowane mnóstwo oprogramowania, które zostawiło po sobie ślady. Jednym z takich śladów mogą być niewykorzystywane pozycje w menu pod prawym przyciskiem myszy. Takie rozbudowane menu spowalnia reakcję komputera na kliknięcie prawym przyciskiem myszy.

To w jaki sposób usunąć takie niepotrzebne pozycje? Trzeba będzie zrobić to z pozycji edytora rejestru. Wystarczy uruchomić regedit i usunąć wpisy z następujących gałęzi:

HKEY_CLASSES_ROOT\*\shell
HKEY_CLASSES_ROOT\*\shellex\ContextMenuHandlers
HKEY_CLASSES_ROOT\AllFileSystemObjects\ShellEx

HKEY_CLASSES_ROOT\Drive\shell 
HKEY_CLASSES_ROOT\Drive\shellex\ContextMenuHandlers 
HKEY_CLASSES_ROOT\Folder\shell 
HKEY_CLASSES_ROOT\Folder\shellex\ContextMenuHandlers 

By Rafał Kraik in Helpdesk

Terraform – przekazywanie wartości JSON

2024-07-14

Niektóre zasoby w Terraform wymagają przekazywania parametru w formacie JSON. Tak jest np. dla azurerm_virtual_machine_extension i dla właściwości settings (azurerm_virtual_machine_extension | Resources | hashicorp/azurerm | Terraform | Terraform Registry):

resource "azurerm_virtual_machine_extension" "example" {
  name                 = "hostname"
  virtual_machine_id   = azurerm_virtual_machine.example.id
  publisher            = "Microsoft.Azure.Extensions"
  type                 = "CustomScript"
  type_handler_version = "2.0"

  settings = <<SETTINGS
 {
  "commandToExecute": "hostname && uptime"
 }
SETTINGS
}

Pojawia sie tutaj tzw. heredoc SETTINGS. Jest to po prostu metoda na „wstrzyknięcie” do skryptu poprawnie sformatowanego jako JSON tekstu.

Na upartego można to zrobić inaczej – settings można by opisać tak:

  settings = jsonencode({
    "commandToExecute": "hostname && uptime"
  })

A jeśli masz alergię również na jsonencode, to da się też tak:

 settings = "{ \"commandToExecute\": \"hostname && uptime\"}"
 

No i tak na oko: wydaje się, że heredoc jest dosyć zgrabne. Nie ma potrzeby wywoływania funkcji, liczenia nawiasów, albo co gorsza cytowania każdego cudzysłowa, czy znaku nowej linii.

By Rafał Kraik in Azure