Rx jest biblioteką ułatwiającą programowanie asynchroniczne, implementującą paradygmaty programowania funkcyjnego.
Jest już niemalże standardem w świecie Androida, zdobywa również coraz większą popularność w świecie frontendu gdzie implementacją jest biblioteka RxJS – której pod spodem używa choćby framework Angular.
RxJS pojawił się również w świecie Reacta, użyty w bibliotece Redux Observable – dość w nim popularnej choć sama biblioteka nie jest z Reactem powiązana.
W tym poście przedstawione zostaną trzy kroki, których przejście pozwoli na zapoznanie się z podstawami bibliotek RxJS i Redux Observable – przeczytanie go będzie dobrą bazą do dalszej nauki.
Lista kroków do przejścia
Zapoznawanie się z podstawami Redux Obervable zacznijmy od podstaw RxJS
Subjecty w RxJS
Subjecty umożliwiają nam tworzenie strumieni do których potem następowała będzie subskrypcja:
const source$ = new Subject(); // gdzieś tam kodzie ktoś subskrybuje do strumienia source$.subscribe(data => console.log(data)); source$.next('test');
W takiej sytuacji zostanie zalogowane słowo test. Mamy klika subjectów:
Subject
Subject umożliwia dostęp do wartości, które zostały nadane już po zasubskrybowaniu – nie mamy dostępu do wcześniejszych wartości w strumieniu:
const source$ = new Subject(); source$.next('first'); source$.next('second'); source$.subscribe(data => console.log(data)); source$.next('third');
Zalogowane zostanie tylko “third”, nie ma dostępu do poprzednich wartości.
BehaviorSubject
W odróżnieniu od poprzednika, BehaviorSubject zaraz po zasubskrybowaniu da nam ostatnią wartość w strumieniu – stąd trzeba go zainicjalizować jakąś wartością przy tworzeniu:
const source$ = new BehaviorSubject('first'); source$.next('second'); source$.subscribe(data => console.log(data)); // Od razu zalogowanie "second" source$.next('third');
Zalogowane zostaną 2 słowa: second i third. Słowo second pojawi się od razu po zasubksrybowaniu.
ReplaySubject
ReplaySubject daje nam dostęp do N ostatnich wartości:
const source$ = new ReplaySubject(2); source$.next('first'); source$.next('second'); source$.next('third'); source$.subscribe(data => console.log(data)); // Od razu zalogowanie "second" a potem "third" source$.next('fourth'); // Zalogowanie "fourth"
Ponieważ ReplaySubject został zainicjowany wartością 2, subskrypcja odpali się dwukrotnie od razu po zasubksrybowaniu. Jeżeli w konstruktorze nie podamy ile ostatnich wartości ma otrzymać nowy subskrybent, otrzyma on wszystkie poprzednie wartości.
Podstawowe operatory RxJS
ajax
Operator ajax umożliwia wysłanie zapytanie http i zasubskrybowania do rezultatu:
ajax('/users').subscribe(onSuccess, onError, onComplete)
map
Operator map() przemapowuje otrzymaną ze strumienia wartość na inną i zwraca ją jako strumień:
users$.pipe( map(user => user.name) ).subscribe(names => names.forEach(name => console.log(name)));
concatMap
Operator jest bardzo podobny do map, z tym że wartością zwracaną musi być strumień – zamiast mapowania wartości na wartość, mamy mapowanie wartości na strumień. Kolejny operator wykonuje się dopiero po tym jak poprzedni się wykona. Użyty może zostać do sekwencjonowania zapytań do serwera bez konieczności zagnieżdżania subskrypcji
source$.pipe( concatMap(() => ajax('/users')) concatMap(() => ajax('/orders)), ).subscribe(logOrder);
tap
Operator służy do uruchamiania efektów ubocznych, takich jak logowanie:
source$ .pipe( tap(elements => console.log(data)), map(el => el.name), tap(elements => console.log(elements)) );
retry
W sytuacji gdy chcemy powtórzyć zapytanie, które wyrzuciło błąd możemy posłużyć się operatorem retry
:
const source$ = ajax(dataUrl); source$.pipe( retry(5) ).subscribe(data => console.log(data));
Co ten kod tak naprawdę zrobi? Jeżeli na subskrypcji z operatora ajax
odpali sie callback onError
operator retry
ponownie zasubksrybuje – co spowoduje ponowne wysłanie zapytania, po 5 próbach przestanie powtarzać to zachowanie.
Oczywiście ta implementacja jest dość naiwna – nie ma żadnych przerw pomiędzy kolejnymi próbami czy ponawiania tylko, gdy zmienią się warunki np powróci sieć.
debounceTime
Gdy piszemy np dynamiczne, pobierane z serwera podpowiedzi do pola wyszukiwania zależy nam na tym, by ograniczać ilość zapytań wysyłanych do serwera. Przykładowo, chcemy wysyłać zapytanie sekundę po tym, jak użytkownik skończy pisać w polu wyszukiwania. Operator debounceTime
daje nam taką możliwość:
const search = document.querySelector('#search'); const source$ = fromEvent(search, 'keyup'); source$ .pipe( map(e => e.currentTarget.value), debounceTime(1000), // zapytanie )
Każda wartość pojawiająca się wcześniej niż 1000 milisekund od ostatniej nie zostanie przekazana dalej. Po upłynięciu sekundy zostanie wyemitowana ostatnia wartość jaka się pojawiła.
distinctUntilKeyChanged
W sytuacji, gdy np potrzebujemy reagować na sytuację gdy wciśnięty jest inny klawisz niż poprzedni, jak sterowanie w grach, możemy użyć operatora distinctUntilKeyChanged
:
const source$ = fromEvent(document, 'keyup') .pipe( filter(e => e.code.includes('Arrow')), distinctUntilKeyChanged('code'), map(e => e.code), ).subscribe(console.log);
Emituje on wartość tylko wtedy, jeżeli wartość będąca pod podanym kluczem jest inna niż poprzednia. W przykładzie odfiltrowywujemy kliknięcia które nie są kliknięciami w strzałki na klawiaturze, następnie operator distinctUntilKeyChanged
zapewnia nas, że wyemituje wartość tylko wtedy kiedy kliknięta jest inna strzałka.
Pierwszy epic w Redux Observable
Bibliotekę instalujemy poleceniem
npm install --save redux-observable
Napiszmy prosty epic decydujący na podstawie stanu, czy akcja SAVE_USER
powinna zainicjować akcję SEND_POST
w sytuacji gdy nie ma userId
(użytkownik jest tworzony) czy też SEND_PUT
w sytuacji gdy userId
już jest (użytkownik jest edytowany):
const epic = (action$, state$) => action$.pipe( ofType("SAVE_USER"), map(action => ({ type: action.payload.userId? "SEND_UPDATE" : "SEND_POST", payload: action.payload })), );
Operator ofType
odfiltrowywuje niepożądane przez nas akcje, natomiast operator map
powoduje wysłanie kolejnej akcji.
Warto wiedzieć, że akcje wpadają do epiców po tym, jak zostaną obsłużone przez reducery. Stąd nie ma możliwości zablokowania akcji poprzez zastopowanie jej w epicu.
Nasłuchiwać możemy naraz na wiele akcji. Przykładowo, jeżeli interesuje nas akcja utworzenia / aktualizacji użytkownika, do operatora ofType
należy przekazać wiele akcji:
const epic = (action$, state$) => action$.pipe( ofType("SEND_UPDATE", "SEND_POST"), tap(action => console.log(`Action related to user: ${action}`)), );
Podsumowanie
Mając podstawy Redux Observable, można kontynuować naukę w kierunku bardziej zaawansowanych zastosowań – to, że biblioteka potrafi wiele jest widoczne już w jej dokumentacji, gdzie znajduje się m.in. przykład anulowania asynchronicznego kodu.