Java è un linguaggio di programmazione con molte caratteristiche del linguaggio, specifiche e API. Anche tra gli sviluppatori di Java esperti, essere consapevoli di tutti questi è piuttosto raro. Se fossero stati condotti uno studio, potremmo imbatterci in sviluppatori Java che non hanno mai lavorato con thread, non hanno mai usato JPA o non hanno mai sviluppato annotazioni personalizzate. Tuttavia, esiste uno sviluppatore Java che ha lavorato con Java 8 o successivamente ma non ha mai usato l’API di streaming? Ne dubito fortemente.
I GATTERER sono una potente estensione dell’API del flusso che introduce il supporto per le operazioni intermedie personalizzate. Inizialmente introdotto come a Funzione di anteprima in JDK 22è diventata una caratteristica commonplace in JDK 24.
Cosa sono i raccoglitori?
I raccoglitori sono stati sviluppati per modellare le operazioni intermedie nel API di streaming. Proprio come un collettore modella un’operazione terminale, un raccoglitore è un oggetto che modella un’operazione intermedia. I raccoglitori supportano le caratteristiche delle operazioni intermedie: possono spingere qualsiasi numero di elementi nel flusso che producono, mantenere uno stato mutabile interno, corto circuito Un flusso, consumo di ritardo, essere incatenato ed eseguire in parallelo.
Per questo motivo, come affermato in JEP 485:
In effetti ogni pipeline di flusso è, concettualmente, equivalente a
Supply.Collect (…) .Collect (…) .Collect (…) .Gather (…)
public interface Gatherer { … }
T
rappresenta l’elemento di enter.A
rappresenta il potenziale oggetto statale mutabile.R
rappresenta l’output che verrà spinto a valle.
Un raccoglitore è costruito su quattro elementi chiave:
Initializer
– Una funzione che produce un’istanza dello stato intermedio interno.Integrator
– Integra un nuovo elemento nel flusso prodotto dal raccoglitore.Combiner
– Una funzione che accetta due stati intermedi e li unisce in uno. Supportare l’esecuzione parallela.Finisher
– Una funzione che consente di eseguire un’azione finale alla tremendous degli elementi di enter.
Tra questi quattro elementi, solo l’integratore è obbligatorio perché ha il ruolo di integrare un nuovo elemento nel flusso prodotto dal raccoglitore. Gli altri elementi possono o non possono essere richiesti, a seconda dell’operazione che si intende modellare, rendendoli opzionali.
Creare un raccoglitore
I raccoglitori vengono creati utilizzando metodi di fabbrica o è possibile implementare l’interfaccia Gatcher. A seconda dell’operazione che si desidera modellare, è possibile utilizzare le varianti sovraccariche di Gatherer.of
E Gatherer.ofSequential
.
var uppercaseGatherer = Gatherer.of((state, factor, downstream)
-> downstream.push(factor.toUpperCase()));
Il raccoglitore di esempio sopra chiama toUpperCase
su un elemento di enter di tipo String
e spinge il risultato a valle. Questo raccoglitore è equivalente alla seguente operazione della mappa.
Stream.of("a", "b", "c", "d", "e", "f", "g")
.map(String::toUpperCase)
.forEach(System.out::print);
L’interfaccia del flusso ora embody un metodo chiamato collect()
che accetta un parametro di un raccoglitore. Possiamo usarlo passando il raccoglitore che abbiamo creato.
Stream.of("a", "b", "c", "d", "e", "f", "g")
.collect(uppercaseGatherer)
.forEach(System.out::print);
Gaters integrati
IL java.util.stream.Gatherers
La classe è una classe di fabbrica che contiene implementazioni predefinite del java.util.stream.Gatherer
interfaccia, definendo cinque diversi raccoglitori.
- finestra fissata. È un raccoglitore da molti a molti che raggruppa gli elementi di inserimento in elenchi di dimensioni fornite, emettendo le finestre a valle quando sono piene.
- Windowsliding. È un raccoglitore da molti a molti che raggruppa gli elementi di inserimento in elenchi di dimensioni fornite. Dopo la prima finestra, ogni finestra successiva viene creata da una copia del suo predecessore facendo cadere il primo elemento e aggiungendo l’elemento successivo dal flusso di enter.
- piega. È un raccoglitore molti-to-one che costruisce un aggregato in modo incrementale ed emette che si aggregano quando non esistono più elementi di enter.
- scansione. È un raccoglitore individuale che applica una funzione fornita allo stato corrente e all’elemento corrente per produrre l’elemento successivo, che passa a valle.
- MapConCurrent. È un raccoglitore individuale che invoca una funzione fornita per ciascun elemento di enter contemporaneamente, fino a un limite fornito. La funzione esegue nel thread virtuale.
Tutti i raccoglitori di cui sopra sono statali. La piega e la scansione sono molto simili al funzionamento del flusso. La differenza chiave è che entrambi possono assumere un enter di tipo T e produrre un output di tipo R e il loro elemento di identità è obbligatorio, non facoltativo.
Crea il tuo raccoglitore
Vediamo come possiamo scrivere il nostro raccoglitore personalizzato usando uno state of affairs del mondo reale. Immagina di elaborare il flusso di registro di un sistema. Ogni voce di registro rappresenta un evento ed è valutato in base a determinate regole per determinare se è anomalo. La regola e lo state of affairs sono i seguenti:
- Regola. Un evento (voce di registro) è considerato anomalo se supera una determinata soglia o contiene un errore.
- State of affairs. Se si verifica un errore e viene immediatamente seguito da diversi eventi anomali (tre di fila, advert esempio), potrebbero far parte di una catena di fallimento. Tuttavia, se un evento “normale” appare in mezzo, la catena è rotta.
In questo caso, possiamo scrivere un raccoglitore che elabora un flusso di registro e restituisce solo gli eventi anomali ininterrotti.
Informazioni, errore, errore, informazioni, avviso, errore, errore, errore, informazioni, debug
Supponiamo che l’oggetto nel nostro flusso di registro sia strutturato come segue.
class LogWrapper {
enum Stage{
INFO,
DEBUG,
WARNING,
ERROR
}
non-public Stage degree;
non-public String particulars;
}
L’oggetto ha un campo di livello che rappresenta il livello di registro. Il campo Dettagli rappresenta il contenuto della voce del registro.
Abbiamo bisogno di un raccoglitore statale perché dobbiamo conservare le informazioni sugli eventi passati per determinare se si verificano fallimenti consecutivamente. Per raggiungere questo obiettivo, lo stato interno del nostro raccoglitore può essere un Listing
static Provider> initializer() {
return ArrayList::new;
}
L’oggetto restituito dal file initializer()
corrisponde al secondo parametro spiegato in precedenza nei parametri di tipo dell’interfaccia Gatherer.
static Integrator, LogWrapper, String> integrator(remaining int threshold) {
return ((internalState, factor, downstream) -> {
if(downstream.isRejecting()){
return false;
}
if(factor.getLevel().equals(LogWrapper.Stage.ERROR)){
internalState.add(factor);
} else {
if(internalState.dimension() >= threshold){
internalState.stream().map(LogWrapper::getDetails).forEach(downstream::push);
}
internalState.clear();
}
return true;
});
}
L’integratore sarà responsabile dell’integrazione di elementi nel flusso prodotto. Il terzo parametro dell’integratore rappresenta l’oggetto a valle.
Controlliamo se sono necessari più elementi chiamando il isRejecting()
che determina se la fase successiva non vuole più ricevere elementi. Se questa condizione è soddisfatta, ritorniamo falsi.
Se l’integratore restituisce falso, esegue un file short-circuit
operazione simile alle operazioni intermedie come allMatch
, anyMatch
E noneMatch
Nell’API del flusso, indicando che non verranno più integrati elementi nel flusso.
Se isRejecting()
Restituisce false, controlliamo se il valore di livello del nostro elemento di flusso, LogWrapper, è un errore. Se il livello è un errore, aggiungiamo l’oggetto al nostro stato interno. Se il livello non è un errore, controlliamo le dimensioni del nostro stato interno.
Se la dimensione supera o è uguale alla soglia, spingiamo gli oggetti LogWrapper memorizzati nello stato interno a valle. In caso contrario, non lo facciamo.
Voglio che tu presti attenzione a due cose qui. Spingere un elemento a valle o no, secondo la regola aziendale, è simile a quello
filter()
fa. Accettare un enter di tipo logWrapper e produrre un output di stringa di tipo è simile a quellomap()
fa.
Successivamente, secondo la nostra regola aziendale, chiariamo lo stato interno e ritorniamo fedele a consentire di integrare nuovi elementi nel flusso.
static BinaryOperator> combiner() {
return (_, _) -> {
throw new UnsupportedOperationException("Can't be parallelized");
};
}
Per evitare che il nostro raccoglitore venga utilizzato in un flusso parallelo, definiamo un combinatore, anche se non è rigorosamente richiesto. Questo perché il nostro raccoglitore è intrinsecamente progettato per funzionare come previsto solo in un flusso sequenziale.
static BiConsumer, Downstream tremendous String>> finisher(remaining int threshold) {
return (state, downstream) -> {
if(!downstream.isRejecting() && state.dimension() >= threshold){
state.stream().map(LogWrapper::getDetails).forEach(downstream::push);
}
};
}
Infine, definiamo un finitore per spingere eventuali elementi di flusso rimanenti che non sono ancora stati emessi a valle.
Se isRejecting()
Restituisce falso e la dimensione dello stato interno è maggiore o uguale alla soglia, spingiamo gli oggetti LogWrapper memorizzati nello stato interno a valle.
Quando utilizziamo questo raccoglitore sui dati:
ERROR, Course of ID: 191, occasion particulars ...
INFO, Course of ID: 216, occasion particulars ...
DEBUG, Course of ID: 279, occasion particulars ...
ERROR, Course of ID: 312, occasion particulars ...
WARNING, Course of ID: 340, occasion particulars ...
ERROR, Course of ID: 367, occasion particulars ...
ERROR, Course of ID: 389, occasion particulars ...
INFO, Course of ID: 401, occasion particulars ...
ERROR, Course of ID: 416, occasion particulars ...
ERROR, Course of ID: 417, occasion particulars ...
ERROR, Course of ID: 418, occasion particulars ...
WARNING, Course of ID: 432, occasion particulars ...
ERROR, Course of ID: 444, occasion particulars ...
ERROR, Course of ID: 445, occasion particulars ...
ERROR, Course of ID: 446, occasion particulars ...
ERROR, Course of ID: 447, occasion particulars ...
Simile a quello sopra, otteniamo il seguente risultato:
Course of ID: 416, occasion particulars …
Course of ID: 417, occasion particulars …
Course of ID: 418, occasion particulars …
Course of ID: 444, occasion particulars …
Course of ID: 445, occasion particulars …
Course of ID: 446, occasion particulars …
Course of ID: 447, occasion particulars …
L’esempio del codice è accessibile in Repository github.
Conclusione
I raccoglitori sono un’API nuova e potente che migliora l’API del flusso modellando le operazioni intermedie e consentendo la definizione di operazioni intermedie personalizzate. Un raccoglitore supporta le funzionalità che hanno le operazioni intermedie; Può spingere un numero qualsiasi di elementi sul flusso risultante, mantenere uno stato mutabile interno, corto circuito Un flusso, consumo di ritardo, essere incatenato ed eseguire in parallelo.