Reaktiva programmeringsoperatörer i RxJava 2

Om din Android-app ska racka upp de femstjärniga recensionerna på Google Play måste den kunna fungera med flera uppgifter.

Som ett rent minimum förväntar dagens mobilanvändare att de fortfarande kan interagera med din app medan den arbetar i bakgrunden. Det här låter enkelt, men Android är enkeltgängad som standard, så om du ska uppfylla dina publikers förväntningar, måste du förr eller senare behöva skapa några ytterligare trådar.

I den tidigare artikeln i den här serien fick vi en introduktion till RxJava, ett reaktivt bibliotek för JVM som kan hjälpa dig att skapa Android-applikationer som reagerar på data och händelser som de uppstår. Men du kan också använda det här biblioteket för att reagera på data och händelser samtidigt.

I det här inlägget kommer jag att visa dig hur du kan använda RxJavas operatörer för att äntligen göra samtidighet på Android en smärtfri upplevelse. I slutet av den här artikeln kommer du att veta hur du använder RxJava-operatörer för att skapa ytterligare trådar, ange det arbete som ska uppstå på dessa trådar och sedan posta resultaten tillbaka till Android: s viktigaste huvudbruksanvisning-allt med bara en några rader av kod.

Och eftersom ingen teknik är perfekt kommer jag också att berätta om ett stort potentiellt fallgrop att lägga till RxJava-biblioteket till dina projekt - innan du visar hur du använder operatörer för att säkerställa detta problem aldrig förekommer i dina egna Android-projekt.

Introduktion av operatörer

RxJava har en enorm samling av operatörer som huvudsakligen är avsedda att hjälpa dig att modifiera, filtrera, slå samman och omvandla de data som utges av din Märkbars. Du hittar den fullständiga listan över RxJava-operatörer över på de officiella dokumenten, och även om ingen förväntar dig att memorera varje enskild operatör, Det är värt att spendera lite tid att läsa igenom den här listan, bara så att du har en grov uppfattning om de olika datatransformationerna du kan utföra.

RxJavas lista över operatörer är redan ganska uttömmande, men om du inte kan hitta den perfekta operatören för datatransformationen du hade i åtanke, kan du alltid kedja flera operatörer tillsammans. Applicera en operatör till en Märkbar returnerar vanligtvis en annan Märkbar, så du kan bara fortsätta att använda operatörer tills du får de resultat du vill ha.

Det finns alltför många RxJava-operatörer att täcka i en enda artikel, och de officiella RxJava-dokumenten gör redan ett bra jobb för att introducera alla operatörer som du kan använda för datatransformationer, så jag kommer att fokusera på två operatörer som har mest potential att göra ditt liv som en Android-utvecklare enklare: subscribeOn () och observeOn ()

Multithreading med RxJava Operatörer

Om din app ska ge bästa möjliga användarupplevelse måste den kunna utföra intensiva eller långvariga uppgifter och utföra flera uppgifter samtidigt, utan att blockera Androids viktigaste huvudbruksanvisning.

Tänk dig att din app behöver hämta viss information från två olika databaser. Om du utför båda dessa uppgifter efter varandra på Android-huvudtråden kommer det inte bara att ta mycket tid, men användargränssnittet kommer inte att svara förrän din app har slutat hämta varje enskild information från båda databaserna . Inte precis en bra användarupplevelse!

En mycket bättre lösning är att skapa ytterligare två trådar där du kan utföra båda dessa uppgifter samtidigt utan att de blockerar huvud-UI-tråden. Detta tillvägagångssätt innebär att arbetet kommer att genomföras dubbelt så snabbt, och användaren kan fortsätta att interagera med din apps användargränssnitt hela tiden. Potentiellt kan dina användare inte ens vara medvetna om att din app utför ett intensivt och långvarigt arbete i bakgrunden. All databasinformation kommer helt enkelt att visas i programmets användargränssnitt, som om det är magiskt!

Utanför lådan tillhandahåller Android några verktyg som du kan använda för att skapa ytterligare trådar, inklusive Services och IntentServices, men dessa lösningar är knepiga att implementera och kan snabbt resultera i komplex, verbos kod. Plus, om du inte implementerar multithreading korrekt, kan du hitta dig själv med ett program som läcker minne och kasta alla typer av fel.

För att göra multithreading på Android ännu mer huvudvärk-inducerande, är Android huvudinriktad tråd den enda tråden som kan uppdatera appens användargränssnitt. Om du vill uppdatera appens användargränssnitt med resultatet av arbetet som utförs på någon annan tråd, då behöver du vanligtvis skapa en Hanterare på huvudgränssnittet och använd sedan det här Hanterare för att överföra data från din bakgrundsgänga till huvudtråden. Det betyder mer kod, mer komplexitet och fler möjligheter till fel att krypa in i ditt projekt.

Men RxJava har två operatörer som kan hjälpa dig att undvika mycket av denna komplexitet och potential för fel.

Observera att du använder dessa operatörer i samband med schemaläggare, vilka är väsentligen komponenter som låter dig ange trådar. För nu, bara tänka på schemaläggare som synonym med ordet tråd.

  • subscribeOn (Scheduler): Som standard är en Märkbar avger sin data på tråden där abonnemanget deklarerades, dvs där du ringde .prenumerera metod. I Android är det i allmänhet den viktigaste användargränssnittet. Du kan använda subscribeOn () operatör för att definiera en annan Schemaläggare där det Märkbar borde utföra och avge uppgifterna.
  • observeOn (Scheduler): Du kan använda den här operatören för att omdirigera din Märkbars utsläpp till en annan Schemaläggare, effektivt byta tråd där MärkbarMeddelanden skickas, och i förlängning tråden där uppgifterna konsumeras.

RxJava levereras med ett antal schemaläggare som du kan använda för att skapa olika trådar, inklusive:

  • Schedulers.io (): Designad för att användas för IO-relaterade uppgifter. 
  • Schedulers.computation (): Utformad för att användas för beräkningsuppgifter. Som standard är antalet trådar i beräkningsplaneringsprogrammet begränsat till antalet CPUer som är tillgängliga på din enhet.
  • Schedulers.newThread (): Skapar en ny tråd.

Nu har du en översikt över alla rörliga delar, låt oss titta på några exempel på hur subscribeOn () och observeOn () används och se några schemaläggare i åtgärd.

subscribeOn ()

I Android använder du vanligtvis subscribeOn () och en åtföljande Schemaläggare för att ändra tråden där något långvarigt eller intensivt arbete utförs, så det finns ingen risk att blockera huvud-UI-tråden. Du kan till exempel bestämma att importera en stor mängd data på io () schemaläggare eller utföra några beräkningar på beräkning() schemaläggare.

I följande kod skapar vi en ny tråd där Märkbar kommer att utföra sin verksamhet och avge värdena 1, 2, och 3.

Observable.just (1, 2, 3) .subscribeOn (Schedulers.newThread ()) .subscribe (Observer); 

Medan det här är allt du behöver för att skapa en tråd och börja överföra data på den tråden, kanske du vill ha någon bekräftelse på att detta observerbara verkligen fungerar på en ny tråd. En metod är att skriva ut namnet på den tråd som din ansökan använder för närvarande i Android StudioLogcat Monitor.

Bekvämt, i det föregående inlägget, Kom igång med RxJava, skapade vi en applikation som skickar meddelanden till Logcat Monitor vid olika stadier under Observables livscykel, så vi kan återanvända mycket av denna kod.

Öppna projektet du skapade i det inlägget och tweak din kod så att den använder ovanstående Märkbar som sin källa Märkbar. Lägg sedan till subscribeOn () Operatör och ange att meddelanden som skickas till Logcat bör innehålla namnet på den aktuella tråden.

Ditt färdiga projekt ska se ut så här:

importera android.support.v7.app.AppCompatActivity; importera android.os.Bundle; importera android.util.Log; importera io.reactivex.Observable; importera io.reactivex.Observer; importera io.reactivex.disposables.Disposable; importera io.reactivex.schedulers.Schedulers; public class MainActivity utökar AppCompatActivity public static final string TAG = "MainActivity"; @Override protected void onCreate (Bundle savedInstanceState) super.onCreate (savedInstanceState); setContentView (R.layout.activity_main); Observable.just (1, 2, 3) .subscribeOn (Schedulers.newThread ()) .subscribe (Observer);  Observatör Observer = ny Observer() @Override public void onSubscribe (Disponibel d) Log.e (TAG, "onSubscribe" + Thread.currentThread (). GetName ());  @Override public void onNext (Integer value) Log.e (TAG, "onNext:" + value + Thread.currentThread (). GetName ());  @Override public void onError (Throwable e) Log.e (TAG, "onError:");  @Override public void onComplete () Log.e (TAG, "onComplete: All Done!" + Thread.currentThread (). GetName ()); ; 

Se till att Android Studio's Logcat Monitor är öppen (genom att välja Android Monitor fliken följt av logcat) och kör sedan ditt projekt på antingen en fysisk Android-enhet eller en AVD. Du ska se följande utmatning i Logcat Monitor:

Här kan du se det .prenumerera kallas på huvud-UI-tråden, men observerbar fungerar på en helt annan tråd.

De subscribeOn () Operatören kommer att ha samma effekt oavsett var du placerar den i observerbar kedja; Du kan dock inte använda flera subscribeOn () operatörer i samma kedja. Om du inkluderar fler än en subscribeOn (), då kommer din kedja endast Använd subscribeOn () det är närmast källan observerbar.

observeOn ()

Till skillnad från subscribeOn (), där du placerar observeOn () i din kedja gör materia, eftersom den här operatören bara ändrar tråden som används av observabla som visas nedströms

Om du till exempel lägger in följande i din kedja kommer alla observerbara som visas i kedjan från denna punkt att använda den nya tråden.

.observeOn (Schedulers.newThread ())

Denna kedja fortsätter att springa på den nya tråden tills den möter en annan observeOn () operatör, då kommer det att växla till den tråd som anges av den operatören. Du kan styra tråden där specifika observabla skickar sina meddelanden genom att infoga flera observeOn () operatörer i din kedja.

När du utvecklar Android-appar använder du vanligtvis observeOn () att skicka resultatet av arbete som utförs på bakgrundstrådar till Android: s huvudbruksanvisning. Det enklaste sättet att omdirigera utsläpp till Android: s huvudsakliga användargränssnitt är att använda AndroidSchedulers.mainThread Scheduler, som ingår som en del av RxAndroid-biblioteket, snarare än RxJava-biblioteket. 

RxAndroid-biblioteket innehåller Android-specifika bindningar för RxJava 2, vilket gör det till en värdefull extra resurs för Android-utvecklare (och något vi kommer att titta på i mycket mer detalj i nästa post i denna serie).

För att lägga till RxAndroid i ditt projekt, öppna din modulnivå build.gradle fil och lägg till den senaste versionen av biblioteket till avdelningen. I skrivande stund var den senaste versionen av RxAndroid 2.0.1, så jag lägger till följande:

beroenden ... kompilera 'io.reactivex.rxjava2: rxandroid: 2.0.1'

När du har lagt till det här biblioteket i ditt projekt kan du ange att resultaten av en observerbar ska skickas till din App: s huvudgränssnitt, med en enda kodkod:

.observeOn (AndroidSchedulers.mainThread ())

Med tanke på att kommunicering med din apps huvudbruksanvisning tar upp en fullständig sida av de officiella Android-dokumenten, är det en stor förbättring som kan spara mycket tid när du skapar multithreaded Android-applikationer.

RxJavas stora nackdel

Medan RxJava har mycket att erbjuda Android-utvecklare, är ingen teknik perfekt och RxJava har en stor fallgrop som har potential att krascha din app.

Som standard har RxJava ett push-baserat arbetsflöde: data produceras uppströms av en Märkbar, och pressas sedan nedströms till den tilldelade Observatör. Huvudproblemet med ett push-baserat arbetsflöde är hur enkelt det är för producenten (i detta fall, Märkbar) att skicka ut varor för snabbt för konsumenten (Observatör) att bearbeta.

En chattig Märkbar och en långsam Observatör kan snabbt resultera i en eftersläpning av oanvända föremål, vilket kommer att ge upphov till systemresurser och kan till och med resultera i en OutOfMemoryException. Detta problem är känt som mottryck.

Om du misstänker att backpressuren uppträder i din app, så finns det några möjliga lösningar, inklusive att använda en operatör för att minska antalet produkter som produceras.

Skapa provtagningsperioder med prov() och throttlefirst ()

Om en Märkbar emitterar ett stort antal objekt, då är det kanske inte nödvändigt för den tilldelade Observatör att motta varje singel en av dessa saker.

Om du säkert kan ignorera några av en Märkbars utsläpp, då finns det några operatörer som du kan använda för att skapa provtagningsperioder och sedan körsbärsval av specifika värden som emitteras under dessa perioder:

  • De prov() Operatören kontrollerar observatörens utdata med intervaller som anges av dig och tar sedan det senaste objektet som emitterades under den provtagningsperioden. Till exempel, om du inkluderar .prov (5, SECONDS) I ditt projekt kommer Observer att få det sista värdet som emitterades under varje fem sekunders intervall. 
  • De throttleFirst () Operatören tar det första värdet som emitterades under provtagningsperioden. Till exempel, om du inkluderar .gasspjäll (5, SECONDS) då kommer Observer att få det första värdet som emitteras under varje fem sekunders intervall.  

Batching Utsläpp med buffert()

Om du inte kan hoppa över några utsläpp på ett säkert sätt, kan du fortfarande ta lite press på en kamp Observatör genom att gruppera utsläpp i satser och sedan skicka vidare en massa. Bearbetning av satsvis utsläpp är vanligtvis effektivare än att bearbeta flera utsläpp separat, så detta tillvägagångssätt bör förbättra konsumtionshastigheten.

Du kan skapa batched emissioner med hjälp av buffert() operatör. Här använder vi buffert() att satsa alla poster som emitteras under en tre-sekundersperiod:

Observable.range (0, 10) .buffer (3, SECONDS) .subscribe (System.out :: println);

Alternativt kan du använda buffert() att skapa en sats bestående av ett visst antal utsläpp. Här säger vi till exempel buffert() att bunta utsläpp i grupper om fyra:

Observable.range (0, 10) .buffer (4) .subscribe (System.out :: println);

Byta observables with Flowables

En alternativ metod för att minska antalet utsläpp är att ersätta Märkbar det orsakar problem med a flytande.

I RxJava 2 bestämde RxJava-laget att dela upp standarden Märkbar in i två typer: den vanliga sorten vi har tittat på i hela serien, och flytandes.

flytandes funktion på ungefär samma sätt som Märkbars, men med en stor skillnad: flytandes skicka bara så många saker som observatörsförfrågningarna. Om du har en Märkbar som sänder ut fler objekt än vad den tilldelade observatören kan konsumera, så kanske du vill överväga att byta till en flytande istället.

Innan du kan börja använda flytandes i dina projekt måste du lägga till följande importdeklaration:

importera io.reactivex.Flowable;

Du kan då skapa flytandes använder exakt samma tekniker som används för att skapa Märkbars. Till exempel skapar varje av följande kodutdrag en flytande som kan skicka data:

flytande flytbar = Flowable.fromArray (ny sträng [] "south", "north", "west", "east"); ... flowable.subscribe ()
flytande flytbar = Flowable.range (0, 20); ... flowable.subscribe ()

Vid denna tidpunkt kanske du undrar: varför skulle jag någonsin använda Märkbars när jag bara kan använda flytandes och behöver inte oroa sig för backpressure? Svaret är att a flytande förekommer mer av en overhead än en vanlig Märkbar, så för att skapa en högpresterande app borde du hålla fast vid det Märkbars om du inte misstänker att din ansökan kämpar med backpress.

Singel

en flytande är inte den enda variationen på Märkbar som du hittar i RxJava, eftersom biblioteket också innehåller Enda klass.

Singel är användbara när du bara behöver sända ett värde. I dessa scenarier skapar du en Märkbar kan kännas som overkill, men a Enda är utformad för att helt enkelt avge ett enda värde och sedan slutföra, antingen genom att ringa:

  • onSuccess (): The Enda avger sitt enda värde.  
  • onerror (): Om Enda kan inte avge sitt föremål, då kommer det att passera den här metoden den resulterande Throwable.

en Enda kommer bara att ringa en av dessa metoder, och avsluta sedan omedelbart.

Låt oss titta på ett exempel på a Enda i åtgärd-igen, för att spara tid vi använder kod igen:

importera android.os.Bundle; importera android.support.v7.app.AppCompatActivity; importera android.util.Log; importera io.reactivex.Single; importera io.reactivex.SingleObserver; importera io.reactivex.disposables.Disposable; public class MainActivity utökar AppCompatActivity public static final string TAG = "MainActivity"; @Override protected void onCreate (Bundle savedInstanceState) super.onCreate (savedInstanceState); setContentView (R.layout.activity_main); Single.just ("Hello World") .subscribe (getSingleObserver ());  privat SingleObserver getSingleObserver () returnera nya SingleObserver() @Override public void onSubscribe (Disposable d) Log.e (TAG, "unsubscribe");  @Override public void onSuccess (String value) Log.e (TAG, "onSuccess:" + värde);  @Override public void onError (Throwable e) Log.e (TAG, "onError:"); ; 

Kör ditt projekt på en AVD eller en fysisk Android-enhet, så ser du följande utmatning i Android Studio: s Logcat Monitor:

Om du ändrar dig och vill konvertera en Enda in i en Märkbar Vid någon tidpunkt har RxJava än en gång alla operatörer du behöver, inklusive:

  • slå ihop med(): Sammanfogar flera Singel in i en singel Märkbar
  • concatWith (): Kedjer de objekt som emitteras av flera Singel tillsammans för att bilda en Märkbar utsläpp. 
  • toObservable (): Konverterar a Enda in i en Märkbar som avger objektet som ursprungligen emitterades av singeln, och slutför sedan.

Sammanfattning

I det här inlägget utforskade vi några RxJava-operatörer som du kan använda för att skapa och hantera flera trådar, utan komplexiteten och möjligheten till fel som traditionellt åtföljs av multithreading på Android. Vi såg också hur du kan använda RxAndroid-biblioteket för att kommunicera med Androids viktigaste huvudbruksanvändningsgänga med en enda kodrad, och hur man säkerställer mottryck blir inte ett problem i din ansökan.

Vi har berört RxAndroid-biblioteket några gånger i hela serien, men det här biblioteket är fyllt med Android-specifika RxJava-bindningar som kan vara ovärderliga när vi arbetar med RxJava på Android-plattformen, så i sista inlägget i serien kommer vi titta på RxAndroid biblioteket i mycket mer detalj.

Innan dess, kolla in några av våra andra inlägg om kodning för Android!