Una delle maggiori sfide nella gestione dei dati al giorno d’oggi è la sua natura in costante cambiamento. Di recente ci siamo imbattuti in un requisito per ingerire i dati del sondaggio e inviarli per analisi sentimentali. I dati del sondaggio sono molto dinamici e anche per i sondaggi esistenti, verranno aggiunte, modificate o eradicate nuove domande.
Man mano che l’origine dati si evolve e si adatta a nuove modifiche, la sfida è come gestire i dati che cambiano dinamicamente da rappresentare in un formato della tabella. Abbiamo implementato questo sfruttando la funzione Evolution Schema nelle tabelle delta.
Perché Delta Lake?
Lago Delta (o Delta Desk) è progettato per portare affidabilità e robustezza ai laghi di dati. Delta Lake è un formato da tavolo open supply che supporta Transazioni acideGestione dei metadati scalabili, un’affidabilità dei dati migliorata, prestazioni più veloci di question e integrazione senza soluzione di continuità con Spark.
Delta Lake supporta anche l’evoluzione dello schema. Nella gestione dei dati, la capacità di modificare la struttura dei dati nel tempo è un requisito comune, noto come evoluzione dello schema.
Dati di origine e ingestione
I dati di origine sono dati semi-strutturati e, in questo caso, riceviamo il file in formato di testo. Di solito, questi tipi di dati sono disponibili nei formati di file XML o JSON. Il primo passo è leggere i dati dall’origine e appiattirli di conseguenza agli customary di governance dei dati.
In questo esempio, riceviamo dati di origine in quattro lotti e ogni batch verrà ingerito nella tabella delta uno dopo l’altro. La tabella Delta viene creata inizialmente con questa struttura.
CREATE TABLE IF NOT EXISTS surveydb.CampaignTable
(
CustomerID STRING
,Tier STRING
,CampaignQn1 STRING
)
USING DELTA
LOCATION '/mnt/blobname/DeltaLake/CampaignTable'
Questo è il codice PysPark che viene utilizzato per caricare il telaio di dati in arrivo nella tabella Delta e questo viene eseguito per ogni batch. Delta non ci consente di aggiungere dati con lo schema non corrispondente per impostazione predefinita e questa funzione è nota come applicazione dello schema. Quindi, facciamo un body di dati vuoto aggiunto con lo schema unire e poi facciamo unione normale.
from delta.tables import DeltaTable
MergeColumns=(Col for Col in SourceDF.columns)
SourceDF.restrict(0).write.format("delta")
.mode("append")
.possibility("mergeSchema",True)
.save(TargetPath)
DeltaTable.forPath(spark,TargetPath).alias("TGT")
.merge(SourceDF.alias("STG")
,f"TGT.CustomerID=STG.CustomerID")
.whenNotMatchedInsert(values={Col:f"STG.{Col}" for Col in MergeColumns})
.execute()
Evoluzione dello schema in ogni lotto
Esaminiamo i dati di origine e la struttura della tabella Delta dopo l’esecuzione di ciascun batch. Nel batch iniziale, i dati vengono ricevuti nella stessa struttura della tabella, ma ciò non è obbligatorio in questo approccio.
Batch 1
Dati di origine per il batch iniziale.
CustomerID |
Livello |
Campagnaqn1 |
1001 |
Tier2 |
Q1_1001 |
Tabella di destinazione dopo il caricamento del lotto iniziale.
Batch 2
Nel secondo batch, una nuova colonna viene aggiunta ai dati di origine. Il comando di scrittura con il mergeSchema
opzione e restrict(0)
Aprirà solo lo schema, quindi la nuova colonna CampaignQn2
viene aggiunto automaticamente alla tabella.
IL merge
Il comando che segue successivo inserirà solo colonne specifiche disponibili nell’elenco. IL MergeColumns
L’elenco ha solo le colonne disponibili nel body dati in arrivo. In questo modo, la nuova colonna viene creata e caricata con dati.
CustomerID |
Livello |
Campagnaqn1 |
Campagnaqn2 |
1003 |
Tierl |
Q1_1003 |
Q2_1003 |
Batch 3
A differenza dei lotti precedenti, qui, i dati in arrivo hanno CampaignQn1
colonna mancante. Anche se il mergeSchema
corse, non c’è cambio di schema, quindi il merge
comando con MergeColumns
L’elenco inserisce solo le colonne disponibili nel body dati di origine.
CustomerID |
Livello |
Campagnaqn2 |
1005 |
Tier1 |
Q2_1005 |
Batch 4
Qui, le posizioni della colonna sono mescolate e c’è una nuova colonna, CampaignQn3
. IL mergeSchema
Fa un body di dati vuoto e aggiunge unione con lo schema e questo crea la nuova colonna.
La modifica della posizione della colonna viene gestita nominando le colonne durante merge
usando il MergeColumns
lista. In questo modo, i dati vengono aggiunti nelle colonne corrette nella tabella Delta.
CustomerID |
Livello |
Campagnaqn3 |
Campagnaqn2 |
1007 |
Tier1 |
Q3_1007 |
Q2_1007 |
Questo ci consente di modificare lo schema della tabella senza riscrivere una riscrittura completa dei dati. Questa funzione offre molta flessibilità, ma dobbiamo usarla attentamente. In Delta, il registro Delta tiene traccia di tutte le modifiche allo schema e ai dati. Oltre a ciò, possiamo tracciare la storia di questi cambiamenti usando il comando di cronologia descrivi.
DESCRIBE HISTORY surveydb.CampaignTable
Lago Delta Ci consente di aggiungere, modificare o eliminare le colonne in una tabella senza influire con pipeline di dati e quindi semplifica l’evoluzione dello schema. Inoltre, tiene traccia delle modifiche, rendendo più facile capire come i dati si sono evoluti nel tempo. L’applicazione dello schema e l’evoluzione in Delta aiutano a mantenere la coerenza e l’integrità dei dati prevenendo gli errori, con conseguente migliore qualità dei dati e governance dei dati più efficiente.
ConCrusca
L’evoluzione automatica dello schema in Delta è una caratteristica molto utile quando si tratta di fonti di dati dinamiche e supporta scenari più complessi. Quando le fonti di dati si evolvono, possiamo evitare l’intervento manuale di aggiornamento dello schema della tabella con un comando semplice che può essere eseguito insieme al processo di carico. L’evoluzione dello schema è particolarmente utile quando si consumano più fonti di dati con i dati che cambiano dinamicamente.