I mitt senaste webinar pratade jag om skiftet som skett från ETL till ELT för data pipelines. Allt fler pipelines är idag byggda som ELT. Förenklat så beror detta på att ELT pipelines är mycket mindre resurskrävande på både kort och lång sikt.
Målet med den här artikeln är att ge dig som läsare förståelse för hur en ELT-Pipeline fungerar i praktiken.
Jag kommer att gå igenom förutsättningarna och koden för ett påhittat exempel, så att det blir enklare för dig att förstå. Det här är guiden jag önskade att jag hade när jag började utforska området. För mig trillade poletten ner först när jag sett ett exempel på en implementation.
Innan jag börjar med det så vill jag gå igenom skillnaderna mellan ETL & ELT på ett högre plan, samt dela några tankar kring vad förändringen drivits av.
Skillnad mellan ETL och ELT
I det tidigare paradigmet flyttades data mellan källor (apier, databaser, filsystem) och analysplattformar med hjälp av vad som kallas ETL pipelines. Där data behandlas i den här ordningen:
- Extract. Datat hämtas ut ur sin källa
- Transform. Datat tvättas och transformeras.
- Load. Datat laddas in i analysplattform för att kunna skapa värde
Generellt gäller att steg 1 (Extract) och 3 (Load) är triviala steg. Det är steg som alla företag som har sin data i samma källa kommer att behöva bygga och underhålla.
Det som skiljer företag A från företag B är oftast bara vad de gör med datan i steg 2 (Transform).
Här sker det som är unikt för företagets affär och hur de vill skapa värde med hjälp av datan.
Men eftersom alla tre steg tidigare låg i samma tjänst/program, var det inte helt lätt att separera dem. Fristående bibliotek/SDK:er för API:er för Extract och Load stegen fanns såklart redan, men eftersom alla företag ville ha sin egen Transformering var de också tvungna att själva koppla ihop delarna till ett program. Vi behöver också komma ihåg att detta var innan massivt parallella databaser i Cloud (RedShift, BigQuery & SnowFlake) fanns. Vilket gjorde att det även fanns en prestandamässig anledning att hålla kvar transformeringen i samma flöde som extract:andet och load:andet. Datan som landade i analysplattformen skulle vara så färdig för analys som möjligt.
Skiftet från ETL till ELT påbörjades när AWS släppte RedShift till världen.
AWS var först ut med att göra en managed service av den sortens teknik. Helt plötsligt blev en massa kraft tillgänglig i Cloud för en billig peng. Det som tidigare varit ett argument för att lägga Transformeringen utanför data warehouset – den begränsade kraften i data warehouset – blev nu ett argument mot. Kan vi bara få datan till DW:t så kan vi göra allt vi vill med den där.
Men det behövdes fortfarande ett verktyg för att orkestrera transformeringen av datan i DW:t. Ett verktyg där vi kan ha koll på vart den relativt råa indatan landar och vad vi sedan gör med den. Ett verktyg som var populärt bland Data Engineers för att bygga ETL pipelines var Airflow. Airflow är ett verktyg för orkestrering och schemaläggning, som verkligen excellerar när komplexa arbetsflöden behöver orkestrera över flera delar av IT-arkitekturen. Nackdelen är att Airflow är ett relativt resurskrävande verktyg att komma igång med.
Några som såg begränsningen med att använda verktyg likt Airflow för att transformera data i data warehouset var Fishtown Analytics (numera dbt Labs). Dom såg istället möjligheten med att bygga ett verktyg fokuserat på att göra det så enkelt och robust som möjligt att transformera data inom ett cloud data warehouse.
Resten av artikeln tänker jag lägga på att förklara hur T:et i ELT kan byggas i dbt. Jag hoppas att det blir tydligt hur mycket det förändrar kraven på EL stegen.
En detaljerad titt på T:et i ELT
Vad är det då som behöver göras i Transformations-steget? Det mest grundläggande är följande:
- Läsa in fält från ostrukturerad källdata
- Casta dessa fält till korrekta typer och namnge dessa fält på ett konsekvent sätt
- Rensa bort dubbletter
- Tester / Felhantering
För att visa och förklara hur vi kan göra utföra detta i dbt så behöver vi exempeldata att utgå från. I tabellen nedan har jag lagt in data som vi kan tänka oss genereras från en tjänst som tar hand om EL. Varje timme laddas data från de tre senaste dygnen från en källa och sparas i tabellen helt orört. Datan kommer från en tjänst som hanterar in och utgående samtal för kundsupport.
_loaded_at | _loaded_data |
---|---|
2021-12-06 13:04:14 UTC | {”call_time”: ”2021-12-06T09:10:12”, ”number”: ”0708887950”, ”Incoming.”, ”Outcome”: Answered”, ”minutes”:”0.7″} |
2021-12-06 13:04:14 UTC | {”call_time”: ”2021-12-06T09:08:12”, ”number”: ”0703857940”, ”direction”: ”Incoming”, ”Outcome”: Answered”, ”minutes”:”4.2″}} |
2021-12-06 13:04:14 UTC | {”call_time”: ”2021-12-06T09:07:58”, ”number”: ”0765831292”, ”direction”: ”Outgoing”, ”Outcome”: Unanswered”, ”minutes”:”0″} |
2021-12-05 13:02:10 UTC | {”call_time”: ”2021-12-06T09:10:12”, ”number”: ”0708887950”, ”Incoming.”, ”Outcome”: Answered”, ”minutes”:”0.7″} |
2021-12-05 13:02:10 UTC | {”call_time”: ”2021-12-06T09:08:12”, ”number”: ”0703857940”, ”direction”: ”Incoming”, ”Outcome”: Answered”, ”minutes”:”4.2″} |
där _loaded_at innehåller en tidsstämpel på när datan laddades, och _loaded_data innehåller datan i JSON format.
Steg 1: Läsa in fält från källdata i i JSON-format
Det första vi behöver göra är att packa upp json-datan i _loaded_data till kolumner. Att behandla en sträng som JSON och hämta värdet för en specifik nyckel (eller en nyckel i en lista) stödjer alla vanliga DW:n. Exemplet nedan är skrivet för BigQuery, men jag tror att du rätt lätt kan översätta det till ditt DW:s syntax
som genererar följande tabell:
call_time | caller_number | call_type | call_outcome | call_duration | _loaded_at | _loaded_data |
---|---|---|---|---|---|---|
2021-12-06T09:10:12 | 0708887950 | Incoming | Answered | 0.7 | 2021-12-06 13:04:14 UTC | {”call_time”: ”2021-12-06T09:10:12”, ”number”: ”0708887950”, ”Incoming.”, ”Outcome”: Answered”, ”minutes”:”0.7″} |
2021-12-06T09:08:12 | 0703857940 | Incoming | Answered | 4.2 | 2021-12-06 13:04:14 UTC | {”call_time”: ”2021-12-06T09:10:12”, ”number”: ”0708887950”, ”direction”: ”Incoming”, ”Outcome”: Answered”, ”minutes”:”0.7″} |
2021-12-06T09:07:58 | 0765831292 | Outgoing | Unanswered | 0 | 2021-12-06 13:04:14 UTC | {”call_time”: ”2021-12-06T09:07:58”, ”number”: ”0765831292”, ”direction”: ”Outgoing”, ”Outcome”: Unanswered”, ”minutes”:”0″} |
2021-12-06T09:10:12 | 0708887950 | Incoming | Answered | 0.7 | 2021-12-05 13:02:10 UTC | {”call_time”: ”2021-12-06T09:10:12”, ”number”: ”0708887950”, ”Incoming.”, ”Outcome”: Answered”, ”minutes”:”0.7″} |
2021-12-06T09:08:12 | 0703857940 | Incoming | Answered | 4.2 | 2021-12-05 13:02:10 UTC | {”call_time”: ”2021-12-06T09:10:12”, ”number”: ”0708887950”, ”direction”: ”Incoming”, ”Outcome”: Answered”, ”minutes”:”0.7″} |
Sedär! Nu börjar det likna något. Notera att jag även gett kolumner nya namn, för att vara mer konsekvent med övriga data i vår (påhittade) organisation.
Steg 2: Casta fält till korrekta typer
Men, vi kan definitivt göra en del med datakvalitén. T.ex. är datumet fortfarande inläst som en sträng, värdena i type och outcome kan standardiseras och göra enklare att querya, och måttet duration blir enklare att arbeta med om det definieras i sekunder. Såhär ser koden för att fixa det ut:
som genererar följande tabell:
call_time | caller_number | call_type | call_outcome | call_duration | _loaded_at | _loaded_data |
---|---|---|---|---|---|---|
2021-12-06 08:10:12 UTC | 0708887950 | incoming | answered | 42.0 | 2021-12-06 13:04:14 UTC | {”call_time”: ”2021-12-06T09:10:12”, ”number”: ”0708887950”, ”Incoming.”, ”Outcome”: Answered”, ”minutes”:”0.7″} |
2021-12-06 08:08:12 UTC | 0703857940 | incoming | answered | 252.0 | 2021-12-06 13:04:14 UTC | {”call_time”: ”2021-12-06T09:10:12”, ”number”: ”0708887950”, ”direction”: ”Incoming”, ”Outcome”: Answered”, ”minutes”:”0.7″} |
2021-12-06 08:07:58 UTC | 0765831292 | outgoing | unanswered | 0.0 | 2021-12-06 13:04:14 UTC | {”call_time”: ”2021-12-06T09:07:58”, ”number”: ”0765831292”, ”direction”: ”Outgoing”, ”Outcome”: Unanswered”, ”minutes”:”0″} |
2021-12-06 08:10:12 UTC | 0708887950 | incoming | answered | 42.0 | 2021-12-05 13:02:10 UTC | {”call_time”: ”2021-12-06T09:10:12”, ”number”: ”0708887950”, ”Incoming.”, ”Outcome”: Answered”, ”minutes”:”0.7″} |
2021-12-06 08:08:12 UTC | 0703857940 | incoming | answered | 252.0 | 2021-12-05 13:02:10 UTC | {”call_time”: ”2021-12-06T09:10:12”, ”number”: ”0708887950”, ”direction”: ”Incoming”, ”Outcome”: Answered”, ”minutes”:”0.7″} |
Steg 3: Rensa bort dubbletter
Men, ett problem som vi skjutit på framtiden är att vi har dubbletter i datan. Dags att ta tag i det nu.
Att vi har dubbletter beror på att vår EL-pipeline är relativt dum, ”Varje timme syncas de senaste 3 dygnens data från källan”. EL pipelinen bryr sig alltså inte om att kolla huruvida en rad redan syncats från källan till DW:t. Det gör ju EL-delen av pipelinen mindre komplex, men vi har flyttat problemet till Transformerings-steget.
För att lösa detta kan vi använda oss av en metod som förlitar sig på surrogate keys. En surrogate key är en nyckel som vi kan använda för att identifiera unika rader. För att skapa en sådan behöver vi först ta reda på hur vi kan identifiera att en rad i indatan är unik. För detta exempel kan vi säga att varje unik rad ska ha en unik kombination av värden i kolumn_a, b, c, d osv. Såhär gör vi för att räkna ut varje rads surrogate_key
call_surr_key | call_surr_key_rn | call_time | caller_number | call_type | call_outcome | call_duration | _loaded_at | _loaded_data |
---|---|---|---|---|---|---|---|---|
0cc175b9… | 2 | 2021-12-06 08:10:12 UTC | 0708887950 | incoming | answered | 42.0 | 2021-12-06 13:04:14 UTC | {”call_time”: ”2021-12-06T09:10:12”, ”number”: ”0708887950”, ”Incoming.”, ”Outcome”: Answered”, ”minutes”:”0.7″} |
92eb5ff… | 2 | 2021-12-06 08:07:58 UTC | 0703857940 | incoming | answered | 252.0 | 2021-12-06 13:04:14 UTC | {”call_time”: ”2021-12-06T09:10:12”, ”number”: ”0708887950”, ”direction”: ”Incoming”, ”Outcome”: Answered”, ”minutes”:”0.7″} |
4a8a08f09… | 1 | 2021-12-06 08:07:58 UTC | 0765831292 | outgoing | unanswered | 0.0 | 2021-12-06 13:04:14 UTC | {”call_time”: ”2021-12-06T09:07:58”, ”number”: ”0765831292”, ”direction”: ”Outgoing”, ”Outcome”: Unanswered”, ”minutes”:”0″} |
0cc175b9… | 1 | 2021-12-06 08:10:12 UTC | 0708887950 | incoming | answered | 42.0 | 2021-12-05 13:02:10 UTC | {”call_time”: ”2021-12-06T09:10:12”, ”number”: ”0708887950”, ”Incoming.”, ”Outcome”: Answered”, ”minutes”:”0.7″} |
92eb5ff… | 1 | 2021-12-06 08:08:12 UTC | 0703857940 | incoming | answered | 252.0 | 2021-12-05 13:02:10 UTC | {”call_time”: ”2021-12-06T09:10:12”, ”number”: ”0708887950”, ”direction”: ”Incoming”, ”Outcome”: Answered”, ”minutes”:”0.7″} |
Som vi kan se har vi två rader där surrogate_keyn är ”0cc175b9…” respektive ”92eb5ff…”. En av dessa rader är en dubblett och kan slängas. För att identifiera dem har jag använt mig av en fönsterfunktionen (window function) row_number, som ger varje rad med samma surrogate key en siffra från ett och uppåt. Vilken siffra varje rad får beror på hur vi sorterar fönstret.
Vilken rad vi slänger är i detta fall irrelevant, då båda raderna har exakt samma värde. Men det finns situationer där vi har preferens för vilken vi vill spara. Tänk tillexempel om vi hade arbetat med data från Facebook Ads. Vi hade haft två eller fler rader för en kampanjs utfall en dag, med olika antal konverteringar baserat på när datan hämtades från Facebook. Eftersom Facebook använder sig av ett attribueringsfönster som låter konverteringar trillar in på en aktivitet ett visst antal dagar i framtiden. I det fallet hade vi velat slänga alla rader förutom den senast laddade för en unik surrogate key.
Men, i vårt fall så är nu nästan färdiga med att göra datan redo för analys. Det enda som återstår är att faktiskt filtrera bort dubbletterna och slänga fält som hör till källan.
call_surr_key | call_time | caller_number | call_type | call_outcome | call_duration |
---|---|---|---|---|---|
0cc175b9… | 2021-12-06 08:10:12 UTC | 0708887950 | incoming | answered | 42.0 |
92eb5ff… | 2021-12-06 08:07:58 UTC | 0703857940 | incoming | answered | 252.0 |
4a8a08f09… | 2021-12-06T09:07:58 | 0765831292 | outgoing | unanswered | 0.0 |
Snyggt va? 🙂
Steg 4: Tester / Felhantering i dbt
Nu är vi nästan hemma. Tabellen som vi skapat är tvättad och redo att användas för analys. I det här skedet är det bra att också få ner de förväntningar vi har på datan i tester. Tester gör att vi kan lita på att datan fortsätter att flöda in och håller samma format som tidigare.
Tester jag hade velat sätta upp i detta fall är:
- Fräschhet. Hur gammal är den senaste datapunkten vi har inladdad just nu? Vi vill få en varning om att det verkar som att ny indata inte laddas in.
- Förväntade värden. Vilka värden förväntar vi oss i olika kolumner? Tänk om indatan skulle börja innehålla fler värden än incoming eller outgoing för call_type.
- Unikhet för surrogate_keyn. Vi vill vara helt säkra på att vi inte råkar läsa in dubbletter för samma rad i framtiden. Detta test fångar eventuella fel om någon skulle ändra i koden och glömma det kravet.
dbt gör detta väldigt enkelt. Vi kan med hjälp av en .yml fil enkelt mappa ut vilka fält som finns i vår tabell och vilka antaganden vi har på dem. dbt kommer sedan att testa vår tabell varje gång den byggts om på ny indata.
Allt tillsammans
Svårare än såhär är det inte. Med en väldigt simpel integration som bara trycker in data i en tabell kan vi göra datan korrekt och redo för analys med hjälp av relativt lite SQL och dbt.
Vill du ha all kod som referens? Här hittar du dbt-modellen (sql-koden), och här dess tillhörande dokumentation (yml filen).
Vad innebär detta?
Som du ser är det både robust och enkelt att sätta upp T:et i ELT med dbt. Vilket gör att vi kan sänka komplexiteten i/kraven på EL-stegen rejält. Det har gått så långt att du kan få rådata levererad till ert DW med hjälp av SaaS-aktörer som Fivetran eller Stitch.
Dessutom finns det två open source-projekt som vill demokratisera tillgången på integrationer helt och hållet. Ett av deras argument är att det är det enda sättet att hantera ”den långa svansen av integrationer”, med vilket dem menar att FiveTran/Stitch alltid kommer att ha svårt att motivera underhållet att mindre populära integrationer ur en ekonomisk synvinkel. Genom att göra integrationskoden fullt tillgänglig möjliggör Airbyte och Meltano för alla företag att gå in och fixa/underhålla eventuella integrationer som är unika för dom, medans de får resten ”gratis”.
Det är ett intressant space där mycket händer. En allt större del av det som tidigare varit dyr och komplex data engineering går att lösa med spetsiga verktyg skapade av communities. En ökande del av vår tankekraft kan vi lägga på att skapa värde från datan.
Jag hoppas att du lärde dig något nytt av den här artikeln, oavsett vilken roll du har.
Har du någon fråga eller är du intresserad av att prata mer? Lägg till mig på LinkedIn eller skriv i dbt-slacken 🙂