Garantire le scritture simultanee affidabili con opzioni di riproduzione
Delta Lake è uno strato di archiviazione resiliente che offre transazioni acide, applicazione dello schema e versione dei dati. Tuttavia, le scritture simultanee generano contese poiché diversi processi stanno tentando di scrivere, aggiornare o eliminare contemporaneamente. Questo processo offre un meccanismo di riproduzione strutturato con backoff esponenziale per gestire la concorrenza nelle tabelle delta.
La tabella delta concorrente scrive problemi
Fallimenti di concorrenza Si verificano quando più processi tentano contemporaneamente di scrivere sulla stessa tabella delta. Gli scenari di fallimento comuni sono i seguenti:
- ConcurrentAppendException – Quando i lavori simultanei aggiungono document contemporaneamente, con aggiunte in conflitto.
- ConcurrentDeleTeAdeAxception – Se un processo sta cercando di leggere i dati eliminati da un altro processo.
- ConcurrentDeletedeleTexception – Quando due processi tentano di eliminare gli stessi dati contemporaneamente.
Questi problemi devono avere una struttura di pensionamento sempre presente che fa sì che le scritture avvengano con successo con coerenza.
Proposto meccanismo di riproduzione per le scritture della tabella delta
Viene utilizzata una procedura di ritenzione di scrittura in streaming per mitigare i guasti di scrittura simultanei utilizzando il backoff esponenziale.
Il seguente codice Python descrive il processo:
from datetime import datetime
from time import sleep
from delta.exceptions import (
ConcurrentAppendException,
ConcurrentDeleteReadException,
ConcurrentDeleteDeleteException,
)
import math
def streaming_write_with_concurrent_retry(
stream, max_attempts=3, indefinite=False, desk=None, path=None
):
"""
Handles concurrent write operations to a Delta desk or path by retrying the operation
in case of particular concurrent exceptions.
:param stream: The information stream to be written.
:param max_attempts: The utmost variety of retry makes an attempt. Default is 3.
:param indefinite: If True, will hold retrying indefinitely. Default is False.
:param desk: The Delta desk to jot down to.
:param path: The trail to jot down to.
:return: The results of author.awaitTermination().
"""
try = 0 # Initialize try counter
whereas True:
strive:
# Select the author based mostly on whether or not desk or path is offered
if desk:
author = stream.desk(desk)
elif path:
author = stream.begin(path)
else:
author = stream.begin()
# Try to jot down and await termination
return author.awaitTermination()
# Deal with concurrent exceptions
besides (
ConcurrentAppendException,
ConcurrentDeleteReadException,
ConcurrentDeleteDeleteException,
) as e:
# Increment try counter
try += 1
# If indefinite is False and makes an attempt have reached max_attempts, elevate the exception
if not indefinite and try >= max_attempts:
elevate e from None
# Calculate sleep time utilizing exponential backoff technique
sleep_time = min(120, math.pow(2, try))
# Log the retry try
print(f"Retrying {try}/{max_attempts if not indefinite else '∞'} after {sleep_time} seconds as a result of {kind(e).__name__}")
# Sleep for the calculated time earlier than retrying
sleep(sleep_time)
Ritratta la spiegazione della strategia
La politica di pensionamento segue una politica di backoff esponenziale:
1. Identificazione delle eccezioni
La funzione cattura ConcurrentAppendException
, ConcurrentDeleteReadException
E ConcurrentDeleteDeleteException
.
2. Riprovare tentativi e limitazioni
La funzione si rivolge a un massimo di max_attempts
volte prima che fallisca. IL indefinite=True
Il parametro consente i tentativi infiniti fino al successo.
3. Calcolo del backoff esponenziale
System di backoff: sleep_time = min(120, 2^try)
. Ciò garantisce che i tempi di attesa di attesa crescano esponenzialmente ma sono limitati a un massimo di 120 secondi.
Esempio di tempi di attesa di riproduzione:
- Tentativo 1 → Aspetta 2 secondi
- Tentativo 2 → Aspetta 4 secondi
- Tentativo 3 → Aspetta 8 secondi
- (fino a un massimo di 120 secondi)
Riprendendo il flusso
Una volta che il tentativo ha successo, author.awaitTermination()
consente al lavoro di streaming di continuare a funzionare.
Strategie different per la gestione della concorrenza della tabella delta
Oltre alla risoluzione dei conflitti basata sulla pensione, Lago Delta Offre tecniche aggiuntive:
1. Controllo ottimista di concorrenza (OCC)
Delta Lake controlla i conflitti prima di commettere una transazione. Se c’è un conflitto, lo farà Riprova automaticamente l’operazione.
2. Dati di partizionamento per l’isolamento
Le operazioni di scrittura dovrebbero essere mirate advert altre partizioni per evitare la collisione.
df.write.format("delta").mode("overwrite").possibility("replaceWhere", "date="2025-02-17"").save("/mnt/delta/desk")
Ciò limita gli aggiornamenti a una singola partizione, riducendo la contesa.
3. Streaming e ottimizzare automaticamente
Abilitare Auto-Optimize
E Auto-Compact
:
ALTER TABLE delta.`/mnt/delta/table_name` SET TBLPROPERTIES (
'delta.autoOptimize.optimizeWrite' = 'true',
'delta.autoOptimize.autoCompact' = 'true'
)
Questa modifica della configurazione riduce i piccoli file e migliora le prestazioni di scrittura simultanee.
4. Upserts basati su unione
Invece di inserti diretti, usa MERGE
Per gestire i conflitti:
from delta.tables import DeltaTable
delta_table = DeltaTable.forPath(spark, "/mnt/delta/table_name")
delta_table.alias("goal").merge(
df.alias("supply"),
"goal.id = supply.id"
).whenMatchedUpdate(set={"goal.worth": "supply.worth"}).whenNotMatchedInsertAll().execute()
Questo processo garantisce la risoluzione dei conflitti a livello di riga.
Monitoraggio dei problemi di debug di debug
1. Controllare la cronologia delle transazioni di carico dei dati:
DESCRIBE HISTORY delta.`/mnt/delta/table_name`;
2. Visualizza/controlla eventuali blocchi attivi:
SHOW TBLPROPERTIES delta.`/mnt/delta/table_name`;
3. Abilita il feed di dati di modifica (CDF) per il monitoraggio delle modifiche:
ALTER TABLE delta.`/mnt/delta/table_name` SET TBLPROPERTIES ('delta.enableChangeDataFeed' = true);
Conclusione
Le tabelle delta consentono le scritture simultanee con garanzie acide, ma sono possibili conflitti.
- Una strategia basata su pensionati con backoff esponenziale aiuta a mitigare i problemi di concorrenza.
- Partizionamento,
MERGE
EAuto-Optimize
Migliora anche le prestazioni di scrittura simultanea. - Meccanismi di monitoraggio come
DESCRIBE HISTORY
ECDF
Seguire i conflitti.
Aderendo a queste migliori pratiche, il processo può raggiungere in modo efficiente le scritture della tabella Delta, mantenere l’integrità dei dati e ottenere ottimizzazioni alle prestazioni.