Przejdź te 3 kroki, by poznać podstawy Redux Observable

Rx jest biblioteką ułatwiającą programowanie asynchroniczne, implementującą paradygmaty programowania funkcyjnego.
Jest już niemalże standardem w świecie Androida, zdobywa również coraz większą popularność w świecie frontendu gdzie implementacją jest biblioteka RxJS – której pod spodem używa choćby framework Angular.
RxJS pojawił się również w świecie Reacta, użyty w bibliotece Redux Observable – dość w nim popularnej choć sama biblioteka nie jest z Reactem powiązana.
W tym poście przedstawione zostaną trzy kroki, których przejście pozwoli na zapoznanie się z podstawami bibliotek RxJS i Redux Observable – przeczytanie go będzie dobrą bazą do dalszej nauki.

Lista kroków do przejścia

  1. Poznanie czym są subjecty w RxJS
  2. Podstawowe operatory RxJS
  3. Pierwszy epic w Redux Observable

Zapoznawanie się z podstawami Redux Obervable zacznijmy od podstaw RxJS

Subjecty w RxJS

Subjecty umożliwiają nam tworzenie strumieni do których potem następowała będzie subskrypcja:

const source$ = new Subject();
// gdzieś tam kodzie ktoś subskrybuje do strumienia
source$.subscribe(data => console.log(data));

source$.next('test');

W takiej sytuacji zostanie zalogowane słowo test. Mamy klika subjectów:

Subject

Subject umożliwia dostęp do wartości, które zostały nadane już po zasubskrybowaniu – nie mamy dostępu do wcześniejszych wartości w strumieniu:

const source$ = new Subject();

source$.next('first');
source$.next('second');

source$.subscribe(data => console.log(data));

source$.next('third');

Zalogowane zostanie tylko “third”, nie ma dostępu do poprzednich wartości.

BehaviorSubject

W odróżnieniu od poprzednika, BehaviorSubject zaraz po zasubskrybowaniu da nam ostatnią wartość w strumieniu – stąd trzeba go zainicjalizować jakąś wartością przy tworzeniu:

const source$ = new BehaviorSubject('first');

source$.next('second');

source$.subscribe(data => console.log(data));
// Od razu zalogowanie "second"

source$.next('third');

Zalogowane zostaną 2 słowa: second i third. Słowo second pojawi się od razu po zasubksrybowaniu.

ReplaySubject

ReplaySubject daje nam dostęp do N ostatnich wartości:

const source$ = new ReplaySubject(2);

source$.next('first');
source$.next('second');
source$.next('third');

source$.subscribe(data => console.log(data));
// Od razu zalogowanie "second" a potem "third"

source$.next('fourth'); // Zalogowanie "fourth"

Ponieważ ReplaySubject został zainicjowany wartością 2, subskrypcja odpali się dwukrotnie od razu po zasubksrybowaniu. Jeżeli w konstruktorze nie podamy ile ostatnich wartości ma otrzymać nowy subskrybent, otrzyma on wszystkie poprzednie wartości.

Podstawowe operatory RxJS

ajax

Operator ajax umożliwia wysłanie zapytanie http i zasubskrybowania do rezultatu:

ajax('/users').subscribe(onSuccess, onError, onComplete)

map

Operator map() przemapowuje otrzymaną ze strumienia wartość na inną i zwraca ją jako strumień:

users$.pipe(
  map(user => user.name)
).subscribe(names => names.forEach(name => console.log(name)));

concatMap

Operator jest bardzo podobny do map, z tym że wartością zwracaną musi być strumień – zamiast mapowania wartości na wartość, mamy mapowanie wartości na strumień. Kolejny operator wykonuje się dopiero po tym jak poprzedni się wykona. Użyty może zostać do sekwencjonowania zapytań do serwera bez konieczności zagnieżdżania subskrypcji

source$.pipe(
  concatMap(() => ajax('/users'))
  concatMap(() => ajax('/orders)),
).subscribe(logOrder);

tap

Operator służy do uruchamiania efektów ubocznych, takich jak logowanie:

source$
  .pipe(
    tap(elements => console.log(data)),
    map(el => el.name),
    tap(elements => console.log(elements))
);

retry

W sytuacji gdy chcemy powtórzyć zapytanie, które wyrzuciło błąd możemy posłużyć się operatorem retry :

const source$ = ajax(dataUrl);

source$.pipe(
  retry(5)
).subscribe(data => console.log(data));

Co ten kod tak naprawdę zrobi? Jeżeli na subskrypcji z operatora ajax odpali sie callback onError operator retry ponownie zasubksrybuje – co spowoduje ponowne wysłanie zapytania, po 5 próbach przestanie powtarzać to zachowanie.
Oczywiście ta implementacja jest dość naiwna – nie ma żadnych przerw pomiędzy kolejnymi próbami czy ponawiania tylko, gdy zmienią się warunki np powróci sieć.

debounceTime

Gdy piszemy np dynamiczne, pobierane z serwera podpowiedzi do pola wyszukiwania zależy nam na tym, by ograniczać ilość zapytań wysyłanych do serwera. Przykładowo, chcemy wysyłać zapytanie sekundę po tym, jak użytkownik skończy pisać w polu wyszukiwania. Operator debounceTime daje nam taką możliwość:

const search = document.querySelector('#search');

const source$ = fromEvent(search, 'keyup');

source$
  .pipe(
    map(e => e.currentTarget.value),
    debounceTime(1000),
    // zapytanie
)

Każda wartość pojawiająca się wcześniej niż 1000 milisekund od ostatniej nie zostanie przekazana dalej. Po upłynięciu sekundy zostanie wyemitowana ostatnia wartość jaka się pojawiła.

distinctUntilKeyChanged

W sytuacji, gdy np potrzebujemy reagować na sytuację gdy wciśnięty jest inny klawisz niż poprzedni, jak sterowanie w grach, możemy użyć operatora distinctUntilKeyChanged:

const source$ = fromEvent(document, 'keyup')
  .pipe(
    filter(e => e.code.includes('Arrow')),
    distinctUntilKeyChanged('code'),
    map(e => e.code),
).subscribe(console.log);

Emituje on wartość tylko wtedy, jeżeli wartość będąca pod podanym kluczem jest inna niż poprzednia. W przykładzie odfiltrowywujemy kliknięcia które nie są kliknięciami w strzałki na klawiaturze, następnie operator distinctUntilKeyChanged zapewnia nas, że wyemituje wartość tylko wtedy kiedy kliknięta jest inna strzałka.

Pierwszy epic w Redux Observable

Bibliotekę instalujemy poleceniem

npm install --save redux-observable

Napiszmy prosty epic decydujący na podstawie stanu, czy akcja SAVE_USER powinna zainicjować akcję SEND_POST w sytuacji gdy nie ma userId (użytkownik jest tworzony) czy też SEND_PUT w sytuacji gdy userId już jest (użytkownik jest edytowany):

const epic = (action$, state$) =>
  action$.pipe(
    ofType("SAVE_USER"),
    map(action => ({
      type: action.payload.userId? "SEND_UPDATE" : "SEND_POST",
      payload: action.payload
    })),
);

Operator ofType odfiltrowywuje niepożądane przez nas akcje, natomiast operator map powoduje wysłanie kolejnej akcji.
Warto wiedzieć, że akcje wpadają do epiców po tym, jak zostaną obsłużone przez reducery. Stąd nie ma możliwości zablokowania akcji poprzez zastopowanie jej w epicu.

Nasłuchiwać możemy naraz na wiele akcji. Przykładowo, jeżeli interesuje nas akcja utworzenia / aktualizacji użytkownika, do operatora ofType należy przekazać wiele akcji:

const epic = (action$, state$) =>
  action$.pipe(
    ofType("SEND_UPDATE", "SEND_POST"),
    tap(action => console.log(`Action related to user: ${action}`)),
);

Podsumowanie

Mając podstawy Redux Observable, można kontynuować naukę w kierunku bardziej zaawansowanych zastosowań – to, że biblioteka potrafi wiele jest widoczne już w jej dokumentacji, gdzie znajduje się m.in. przykład anulowania asynchronicznego kodu.