Enkelt design af Kotlin Flow

Flow af Grant Tarrant

I en tidligere historie om ”Kolde strømme, varme kanaler” har jeg defineret kolde og varme datastrømme og vist en brugskasse til Kotlin Flows - kolde asynkrone strømme. Lad os nu kigge under hætten, undersøge deres design og se, hvordan en kombination af sprogfunktioner og et bibliotek muliggør en kraftig abstraktion med enkelt design.

En flow i Kotlin er repræsenteret af en grænseflade²:

interface Flow  {
    suspendere sjovt samle (samler: FlowCollector )
}

Alt, hvad der er til en flow, er en enkelt indsamlingsfunktion, der accepterer en forekomst af FlowCollector-interface med en enkelt emitmetode:

interface FlowCollector  {
    suspendere sjov udsendelse (værdi: T)
}

Et udsendelsesnavn skal lyde velkendt for en læser af “Kolde strømme, varme kanaler”. Der har jeg faktisk vist et eksempel på følgende flowdefinition:

val ints: Flow  = flow {
    for (i i 1..10) {
        forsinkelse (100)
        emit (i) // <- emit kaldes her
    }
}

En signatur af flowbuilder bruger også et FlowCollector-interface som en receiver³, så vi kan udsende direkte fra kroppen af ​​den tilsvarende lambda:

sjov  flow (blok: suspendere FlowCollector . () -> Enhed): Flow 

For en simpel brug af en strøm, når strømmen opsamles, sådan:

ints.collect {println (it)} // tager 1 sekund, udskriver 10 ints

hvad der sker er, at der oprettes en forekomst af FlowCollector baseret på den lambda, der er sendt til at indsamle {…} -funktionen, og netop dette tilfælde sendes derefter til flowet {…} body⁴.

Således er en vekselvirkning mellem en flowemitter og en flowkollektor den ved et simpelt funktionsopkald - en call of emit funktion. Hvis vi mentalt indlæser dette funktionskald, kan vi øjeblikkeligt forstå, hvad der sker, når vi kører denne kode⁵ - det vil svare til:

for (i i 1..10) {
    forsinkelse (100)
    println (i) // <- emit blev kaldt her
}

Operatører

En flowbuilder og en indsamlingsterminaloperatør er alt, hvad vi har brug for at vide for at begynde at skrive operatører, der transformerer flow på forskellige måder. For eksempel kan en grundlæggende kortoperator, der anvender en specificeret transformation på enhver udsendt værdi, implementeres på denne måde:

sjov  Flow  .map (transform: suspend (værdi: T) -> R) = flow {
    indsamle {emit (transform (it))}
}

Ved hjælp af denne operator kan vi nu gøre ints.map {it * it} for at definere en strøm med kvadrater af de originale heltal. Der flyder stadig elementer fra emitteren til samleren via funktionskald. Der er ganske enkelt endnu en funktion imellem nu.

Faktisk definerer kotlinx.coroutines bibliotek allerede kort og en række andre generelle formål-operatører som udvidelser på Flow-typen efter udvidelsesorienteret design-tilgang. Det, der er vigtigt i dette design, er, at det er ret let at definere domænespecifikke operatører. Der skelnes ikke mellem “indbyggede” og “brugerdefinerede” operatører - alle operatører er førsteklasses.

Rygpres

Modtryk i softwareteknik defineres som en datakonsument, der ikke kan følge med indgående data til at sende et signal til dataproducenten til at nedsætte hastigheden for dataelementerne.

Traditionelt reaktivt streams-design involverer en back-channel for at anmode om flere data fra producenterne efter behov. Håndtering af denne anmodningsprotokol fører til notorisk vanskelige implementeringer, selv for enkle operatører. Vi ser ikke noget af denne kompleksitet i designet af Kotlin-strømme eller i implementeringen af ​​operatører til dem, men alligevel understøtter Kotlin-strømme modtryk. Hvorfor?

Gennemsigtig håndtering af modtryk opnås i Kotlin-strømme ved hjælp af Kotlin-ophængningsfunktioner. Du har måske bemærket, at alle funktioner og funktionstyper i Kotlin flowdesign er markeret med Suspend modifier - disse funktioner har en superstyrke til at suspendere udførelsen af ​​opkalderen uden at blokere en tråd⁹. Så når strømmen er overvældet, kan den blot suspendere emitteren og genoptage den senere, når den er klar til at acceptere flere elementer.

Dette svarer ret til styring af modtryk i traditionelle trådbaserede synkrone datapipelinier, hvor en langsom forbruger automatisk anvender modtryk på producenten i kraft af at blokere producentens tråd. Suspendering af funktioner tager det ud over en enkelt tråd og til asynkron programmering ved at håndtere modtryk på tværs af trådene uden at blokere dem. Men det skal fortælles i en anden historie.

Yderligere læsning og fodnoter

  1. ^ Kold strømme, varme kanaler
  2. ^ Flow og relaterede typer og funktioner er stadig i preview som version 1.2.1 af biblioteket kotlinx.coroutines. Læs mere her.
  3. ^ Funktionstyper i Kotlin
  4. ^ Dette er en let forenkling. Den tager ikke højde for yderligere kontroller for at sikre bevarelse af konteksten, men dette emne er uden for denne historie. Flere detaljer i eksekveringssammenhæng med Kotlin Flows.
  5. ^ Du kan køre denne kode via Kotlin Playground her.
  6. ^ Udvidelsesorienteret design
  7. ^ Reaktive strømme
  8. ^ Implementering af operatører til [RxJava] 2.0
  9. ^ Blokerer tråde, suspenderer koroutiner