In questa lezione impareremo:
- cosa sono gli stream in Dart e come si utilizzano,
- come creare uno stream con
async*
e - come creare stream a partire da altri stream.
Cosa sono gli stream
In Dart con il termine stream ci si riferisce a dei *generatori di eventi
asincroni, si può pensare ad uno stream come ad una sequenza di Future
(vedi
lezioni su async
e await
).
Attenzione
In diversi linguaggi di programmazione (ad esempio Java e C++) il concetto di
stream viene utilizzato per indicare un flusso di dati. Anche in Dart gli
stream rappresentano un flusso di dati, ma questo flusso obbedisce alle regole
della programmazione asincrona.
Nel resto della lezione vedremo, prima come si utilizza uno stream, poi come si
crea.
Utilizzare stream
Per comprendere come utilizzare uno stream, consideriamo l’esempio proposta anche
nella documentazione Dart sull’utilizzo degli stream.
Future<int> sumStream(Stream<int> stream) async {
var sum = 0;
await for (final value in stream) {
sum += value;
}
return sum;
}
Avendo completato le lezione su async
e await
,
l’intera funzione sumStream
dovrebbe essere chiara ad eccezione del costrutto await for
.
Il miglior modo per capire il funzionamento di await for
è di immaginarlo come un normale
for each nel quale ogni iterazione del ciclo è una chiamata asincrona con await
.
Vediamo quindi cosa succede in dettaglio alla chiamata della funzione sumStream
(si noti
come tale chiamata avviene mediante await
, infatti sumStream
è una funzione async
).
void main() async {
Stream<int> stream = // ...
int sum = await sumStream(stream);
print(sum);
}
- Quando il
main
invoca la funzione asincrona sumStream
, la funziona viene
eseguita normalmente fino alla riga contenente await for
. - Quando viene incontrata l’istruzione
await for
, Dart inizia ad iterare
sull’oggetto stream
, esattamente come un normale for each, la differenza è
che ogni volta che si assegna a value
il prossimo valore, questo assegnamento
è come una chiamata asincrona a stream
. - La chiamata
await for
“attiva” lo stream che inizia a generare eventi
asincroni (sulla base di come lo stream è stato creato, vedi sotto. - Ogni evento asincrono generato dallo stream mette in
await
la funzione sumStream
esattamente come se fosse una singola chiamata await
. - Quando lo stream ha terminato di generare eventi, l’istruzione
await for
termina esattamente come termina un for each quando tutti gli elementi dell’Iterable
sono stati considerati.
Importante
La funzione sumStream
deve essere async
per poter utilizzare await for
(esattamente come ogni funzione che usa await
deve essere async
). Essendo una
funzione async
, quindi, sumStream
restituisce un Future
(in questo caso di
tipo int
). Come visto nella lezione await
async
,
la chiamata ad una funzione async
può o meno avvenire con await
, ma nel caso
si ometta, la variabile restituita (immediatamente senza attendere la terminazione)
è di tipo Future<int>
anziché di tipo int
.
Creare stream
Per creare uno stream ci sono 3 modi:
- definire una funzione
async*
, - generare uno stream a partire da un altro stream e
- usare la classe
StreamController
.
In questa lezione ci occupiamo solo dei primi due casi, per maggiori dettagli
anche sull’uso di StreamController
si può consultare la documentazione online
sulla creazione di stream.
Creazione di uno stream con async*
Anche in questa parte della lezione utilizziamo lo stesso esempio della guida online.
Stream<int> timedCounter(Duration interval, [int? maxCount]) async* {
int i = 0;
while (true) {
await Future.delayed(interval);
yield i++;
if (i == maxCount) break;
}
}
La funzione timedCounter
sotto crea uno Stream<int>
che conta fino a maxCount-1
(o all’infinito), il conteggio avviene ad intervalli regolari di durata `interval.
Come si nota, la funzione timedCounter
utilizza due costrutti propri della programmazione
asincrona await
e yield
. L’uso di await
in questo caso serve unicamente a far
trascorrere l’intervallo di tempo richiesto, infatti il risultato della
chiamata Future.delayed
non viene mai utilizzato (tanto che non viene nemmeno
memorizzato).
L’utilizzo di yield
necessita di una breve spiegazione, ogni volta che uno stream
deve emettere un nuovo valore, lo fa attraverso la parola chiave yield
. Nell’esempio
sopra ogni secondo viene emesso il valore della variabile i
che successivamente viene
incrementato (ricorda la semantica di i++
e di ++i
).
Importante
La funzione timedCounter
sopra è stata dichiarata asincrona star con la parola
chiave async*
, questa significa che questa funzione restituisce uno Stream
esattamente
come async
(senza *
) significa che la funzione restituisce un Future
.
Per vedere un esempio di utilizzo di timedCounter
, consideriamo il seguente
codice che stampa a video il conteggio dei secondi da 0
a 9
:
void main() async {
Stream<int> stream = timedCounter(Duration(seconds: 1), 10);
await for (final i in stream) {
print('$i seconds');
}
}
Esercizio
Spesso è utile avere un countdown cioè un conteggio dal valore iniziale a 0
,
scrivere una funzione timedCountdown
con gli stessi parametri di timedCounter
che restituisca uno stream per il countdown.
Creazione di uno stream a partire da un altro stream
Spesso si vuole generare uno stream a partire da un altro stream, ad esempio
uno stream che generare un evento ogni secondo lo vogliamo trasformare in uno
che genera un evento ogni 5 secondi, oppure da una sequenza di stringhe vogliamo
estrarre solo quelle che iniziano con una specifica parola.
Un modo ovvio per fare questa operazione di trasformazione è definendo una
funzione async*
che prende lo stream originale come input e genera gli eventi
“trasformati”.
Stream<int> squaredCounter(Stream<int> counter) async* {
await for (final i in counter) {
yield i*i;
}
}
Metodi di Stream
Il secondo modo per “trasformare” uno stream in un altro è attraverso dei metodi
specifici forniti dalla classe Stream
. La lista di questi metodi è troppo
lunga per essere discussa qui per esteso, ma possiamo vederne alcuni tra i più
comunemente utilizzati.
map
: converte ogni elemento dello stream originale in un nuovo eventoskip
: salta il numero di eventi indicatotake
: prende il numero di eventi indicatowhere
: seleziona solo gli eventi con soddisfano la condizione data
Nell’esempio sotto vediamo l’utilizzo di questi metodi questi metodi
// map: trasforma 'i' in 'i*i'
Stream<int> stream = timedCounter(Duration(milliseconds: 200), 5);
Stream<int> transformed = stream.map((i) => i*i);
await for (final i in transformed) {
print(i); // 0, 1, 4, 16
}
// take: prende i primi '3' elementi dello stream
stream = timedCounter(Duration(milliseconds: 200), 5);
transformed = stream.take(3);
await for (final i in transformed) {
print(i); // 0, 1, 2
}
// skip: salta i primi '3' elemento dello stream
stream = timedCounter(Duration(milliseconds: 200), 5);
transformed = stream.skip(3);
await for (final i in transformed) {
print(i); // 3, 4
}
// where: considera solo gli elementi pari (i%2 == 0) dello stream
stream = timedCounter(Duration(milliseconds: 200), 5);
transformed = stream.where((i) => (i % 2 == 0)); // numeri pari
await for (final i in transformed) {
print(i); // 0, 2, 4
}
Attenzione
Affinché uno stream possa generare eventi a partire da un stream di origine,
quest’ultimo deve essere utilizzabile nel senso che non deve essere terminato.
L’utilizzo di uno stream ormai concluso genera un errore con un messaggio
simile al seguente:
Uncaught Error: Bad state: Stream has already been listened to.
Attenzione
L’utilizzo delle funzioni di trasformazione quali map
, where
, …
“consumano” lo stream originale. Per questo motivo, dopo aver consumato lo
stream trasformato, anche lo stream originale sarà concluso e diventa un
errore usare await for
su quello stream.
void main() async {
Stream<int> stream = timedCounter(Duration(milliseconds: 200), 10);
Stream<int> transformed = stream.map((i) => i*i);
await for (final i in transformed) {
print(i); // 0, 1, 4, 9, 16, ... 81
}
// Error: 'already listened'
await for (final i in stream) {
print(i);
}
}
Utilizzo di forEach
Nella classe Stream
è disponibile il metodo forEach
che restituisce un Future
,
questo metodo può essere utilizzato al posto di await for
per generare codice
più compatto (soprattutto se l’operazione da fare nel corpo del for
è semplice
come un semplice print(i)
). Ad esempio il codice sotto è equivalente all’utilizzo
di where
visto sopra per stampare i soli numeri pari
Stream<int> stream = timedCounter(Duration(milliseconds: 200), 6);
stream.where((i) => (i % 2 == 0)).forEach(print);
Si noti che:
- si usa lo
Stream
restituito da where
in modo implicito (senza assegnarlo) ad
una variabile, - il metodo
forEach
accetta una funzione con un parametro che è l’evento emesso
dal stream, in questo esempio la funzione è print
Link utili