class: center, middle  .img-2nd[] # Reactive Programming with RxJS --- class:center, middle # About me Thorben Ziemek .img-inline[] thorben_z .img-inline[] BörseGo .img-inline[] Guidants --- class: center, middle, fullbg background-image: url(assets/guidants-2.png) --- class: center, middle  # Reactive Programming with RxJS --- class: center, middle, oneword # Reactive programming --- # 🎓Reactive programming -- - declarative programming paradigm -- - automatic data propagation --
(https://en.wikipedia.org/wiki/Reactive_programming)
--- class: center, middle  # RxJS = JavaScript implementation of ReactiveX --- class: center, middle, fullbg background-image: url(assets/reactivex.png) --- class: center, middle, oneword # Asynchronous programming --- class: middle # Asynchronous programming -- - DOM events -- - timers -- - message queues -- - callbacks -- - HTTP requests -- - WebSocket connections -- ...*all things web* are async -- this might be useful! --- class: center, middle, oneword # Observable streams --- class: center, middle .center.guidants-img[] --- class: center, middle .center.guidants-img[] --- class: center, middle .center.guidants-img[] --- class: center, middle .center.guidants-img[] --- class: center, middle .center.guidants-img[] --- class: center, middle
Single
Multiple
Pull
Function
Iterator
Push
Promise
--- class: center, middle
Single
Multiple
Pull
Function
Iterator
Push
Promise
👉Observable👈
--- # Observable pushes data to it's *observer* until it terminates ``` subscription = observable.subscribe(observer) ``` -- # Observer ``` { next(data) { /* observable emitted data */ } error(err) { /* observable ended with error */ } complete() { /* observable completed */ } } ``` --- class: middle, center # programming with observable streams -- .img-inline[] Create -- .img-inline[] Combine/Transform -- .img-inline[] Listen --- class: center,middle,oneword # .img-inline[] Create Create event or data streams. --- class: center,middle,oneword # let's create a simple observable --- # Get RxJS
$ npm install rxjs + rxjs@6.4.0
or https://github.com/ReactiveX/rxjs/releases `` -- # Use it ``` const {Observable} = require('rxjs'); /* Node.js */ ``` or ``` import {Observable} from 'rxjs'; /* e.g. webpack bundling */ ``` --- class: codeslide example: basicObservable0 ``` import {Observable} from 'rxjs'; const random$ = Observable.create(observer => { /* TODO: send things to observer */ }); const subscription = random$.subscribe(observer); ```
-- # 💡 LogObserver ```js /* example observer in this talk */ function createLogObserver(name) { return { next(value) { console.log(name, ' next ', value); }, error(err) { console.error(name, ' ended with error ', err); }, complete() { console.log(name, ' complete'); } } } ``` --- class: codeslide example: basicObservable1 ``` import {Observable} from 'rxjs'; const random$ = Observable.create(observer => { log.info("observable started"); observer.next(Math.random()); observer.complete(); }); const s1 = random$.subscribe(createLogObserver("#1")); ```
--- class: codeslide example: basicObservable2 ``` import {Observable} from 'rxjs'; const random$ = Observable.create(observer => { log.info("observable started"); observer.next(Math.random()); observer.complete(); }); const s1 = random$.subscribe(createLogObserver("#1")); const s2 = random$.subscribe(createLogObserver("#2")); ```
--- class: codeslide example: basicObservable3 ``` import {Observable} from 'rxjs'; const random$ = Observable.create(observer => { log.info("observable started"); setTimeout(function() { log.info('timeout execution'); observer.next(Math.random()); observer.complete(); }, 1000); }); const s1 = random$.subscribe(createLogObserver("#1")); const s2 = random$.subscribe(createLogObserver("#2")); ```
--- class: codeslide example: basicObservable4 ``` import {Observable} from 'rxjs'; const random$ = Observable.create(observer => { log.info("observable started"); setTimeout(function() { log.info('timeout execution'); observer.next(Math.random()); observer.complete(); }, 1000); }); const s1 = random$.subscribe(createLogObserver("#1")); s1.unsubscribe(); const s2 = random$.subscribe(createLogObserver("#2")); ```
--- class: codeslide example: basicObservable5 ``` import {Observable} from 'rxjs'; const random$ = Observable.create(observer => { log.info("observable started"); const timeout = setTimeout(function() { log.info('timeout execution'); observer.next(Math.random()); observer.complete(); }, 1000); return function unsubscribe() { log.info('unsubscribed from observable'); clearTimeout(timeout); } }); const s1 = random$.subscribe(createLogObserver("#1")); s1.unsubscribe(); const s2 = random$.subscribe(createLogObserver("#2")); ```
--- # Observable Lifecycle -- - lazyness: computation does not start until subscription -- - `observable` calls - `observer.next`: `0-∞ times` - `observer.error` *or* `observable.complete`: `0-1 times` -- - `observable.subscribe` returns a `subscription` -- - `subscription.unsubscribe` ends the subscription - `observer` does not get any more notifications, independent of what the Observable does (enforced by RxJS) - `observable` cleans up resources --- # 🔋 Batteries included ```js rxjs.of(1, 2, 3) rxjs.from([1, 2, 3]) rxjs.from(promise) rxjs.timer(1000) rxjs.interval(100) rxjs.fromEvent(document, "click") rxjs.ajax.getJSON(request) rxjs.bindCallback(fn) rxjs.bindNodeCallback(fn) ``` --- # 🔋 Batteries included ```js rxjs.of(1, 2, 3) // ~ next(1); next(2); next(3); rxjs.from([1, 2, 3]) // ~ next(1); next(2); next(3); rxjs.from(promise) // ~ promise.then(v=>next(v);complete(), error) rxjs.timer(1000) // ~ setTimeout(complete) rxjs.interval(100) // ~ setInterval(next) rxjs.fromEvent(document, "click") // ~ document.on('click', next) rxjs.ajax.getJSON(request) // ~ xhr(...) -> next(response); complete() rxjs.bindCallback(fn) // fn(v=>next(v);complete()) rxjs.bindNodeCallback(fn) // fn((err,v)=> err ? error(err) : next(v); complete() ) ``` --- class: middle, center # programming with observable streams .img-inline[] Create ✅ .img-inline[] Combine/Transform .img-inline[] Listen --- class: center,middle,oneword # .img-inline[] Combine Compose and transform streams with query-like operators. --- class: center, middle, oneword # let's create a *search-as-you-type* box --- class: codeslide example: naiveSearch65 ``` import { fromEvent } from 'rxjs'; const inputEl = document.getElementById('query'); const search$ = fromEvent(inputEl, 'keyup'); search$.subscribe(createLogObserver()); ```
--- class: center,middle,oneword # operators --- class: center, middle # marble diagrams .w100[] --- class: center, middle, oneword # >100 operators! --- class: center, middle
rxmarbles.com
rxjs.dev/operator-decision-tree
--- class: codeslide example: naiveSearch70 ``` import { fromEvent } from 'rxjs'; import { map } from 'rxjs/operations'; const inputEl = document.getElementById('query'); const search$ = fromEvent(inputEl, 'keyup').pipe( map(event => inputEl.value) ); search$.subscribe(createLogObserver()); ```
--- class: center, middle, .w100[] --- class: codeslide example: naiveSearch75 ``` import { fromEvent } from 'rxjs'; import { map, startWith } from 'rxjs/operations'; const inputEl = document.getElementById('query'); const search$ = fromEvent(inputEl, 'keyup').pipe( map(event => inputEl.value), startWith(inputEl.value) ); search$.subscribe(createLogObserver()); ```
--- class: center, middle, .w100[] --- class: codeslide example: naiveSearch80 ``` import { fromEvent } from 'rxjs'; import { map, startWith, distinctUntilChanged } from 'rxjs/operations'; const inputEl = document.getElementById('query'); const search$ = fromEvent(inputEl, 'keyup').pipe( map(event => inputEl.value), startWith(inputEl.value), distinctUntilChanged() ); search$.subscribe(createLogObserver()); ```
--- class: center, middle, .w100[] --- class: codeslide example: naiveSearch85 ``` import { fromEvent } from 'rxjs'; import { map, startWith, debounceTime, distinctUntilChanged } from 'rxjs/operations'; const inputEl = document.getElementById('query'); const search$ = fromEvent(inputEl, 'keyup').pipe( map(event => inputEl.value), startWith(inputEl.value), distinctUntilChanged(), debounceTime(500) ); search$.subscribe(createLogObserver()); ```
--- class: codeslide example: naiveSearch90 ``` import { fromEvent } from 'rxjs'; import { map, startWith, debounceTime, distinctUntilChanged } from 'rxjs/operations'; const inputEl = document.getElementById('query'); const search$ = fromEvent(inputEl, 'keyup').pipe( map(event => inputEl.value), startWith(inputEl.value), debounceTime(500), distinctUntilChanged() ); search$.subscribe(createLogObserver()); ```
--- class: codeslide example: naiveSearch95 ``` import { fromEvent } from 'rxjs'; import { map, startWith, debounceTime, distinctUntilChanged } from 'rxjs/operations'; const inputEl = document.getElementById('query'); const search$ = fromEvent(inputEl, 'keyup').pipe( map(event => inputEl.value), startWith(inputEl.value), debounceTime(100), distinctUntilChanged() ); let ajaxSubscription = null; search$.subscribe(query => { /* TODO cancel ajax request */ /* TODO perform new ajax request */ }); ```
--- class: codeslide example: naiveSearch100 ``` import { fromEvent } from 'rxjs'; import { map, startWith, debounceTime, distinctUntilChanged } from 'rxjs/operations'; import { ajax } from 'rxjs/ajax'; const inputEl = document.getElementById('query'); const search$ = fromEvent(inputEl, 'keyup').pipe( map(event => inputEl.value), startWith(inputEl.value), debounceTime(100), distinctUntilChanged() ); let ajaxSubscription = null; search$.subscribe(query => { if (ajaxSubscription) { ajaxSubscription.unsubscribe(); } ajaxSubscription = createAjaxRequest(query) .subscribe(createOutputObserver()); }); function createAjaxRequest(query) { return ajax("http://localhost:3000/movies?q=" + encodeURIComponent(query) + "&_page=1&_limit=15" ).pipe( map(ajaxResult => ajaxResult.response.map(movie => movie.title + " (" + movie.year + ")" )) ); } ```
--- class: center, middle # emit an Observable whenever the query changes --- class: center, middle, background-image: url(assets/c3po.jpg) --- class: center, middle # higher order observables --- class: center, middle, .w100[] --- class: center, middle, oneword # switchMap --- class: codeslide example: search90 ``` import { fromEvent, concat, timer, of } from 'rxjs'; import { map, distinctUntilChanged, startWith, switchMap } from 'rxjs/operators'; import { ajax } from 'rxjs/ajax'; const inputEl = document.getElementById('query'); const search$ = fromEvent(inputEl, 'keyup').pipe( map(event => inputEl.value), startWith(inputEl.value), distinctUntilChanged(), debounceTime(100), switchMap(createAjaxRequest), map(data=>JSON.stringify(data,null,1)) ); search$.subscribe(createOutputObserver()); function createAjaxRequest(query) { return ajax("http://localhost:3000/movies?q=" + encodeURIComponent(query) + "&_page=1&_limit=15" ).pipe( map(ajaxResult => ajaxResult.response.map(movie => movie.title + " (" + movie.year + ")" )) ); } ```
--- class: codeslide example: search95 ``` import { fromEvent, concat, timer, of } from 'rxjs'; import { map, distinctUntilChanged, startWith, switchMap } from 'rxjs/operators'; import { ajax } from 'rxjs/ajax'; const inputEl = document.getElementById("query"); const search$ = fromEvent(inputEl, "keyup").pipe( map(event => inputEl.value), startWith(inputEl.value), distinctUntilChanged(), switchMap(debouncedMovieSearch), map(data => JSON.stringify(data, null, 1)) ); search$.subscribe(createOutputObserver()); function debouncedMovieSearch(query) { return concat( of({ status: "is " + query + " final?" }), timer(100), of({ status: "loading " + query }), createAjaxRequest(query) ); } function createAjaxRequest(query) { return ajax("http://localhost:3000/movies?q=" + encodeURIComponent(query) + "&_page=1&_limit=15" ).pipe( map(ajaxResult => ajaxResult.response.map(movie => movie.title + " (" + movie.year + ")" )) ); } ```
--- class: codeslide example: search100 ``` import { fromEvent, concat, timer, of } from 'rxjs'; import { map, distinctUntilChanged, startWith, switchMap, catchError } from 'rxjs/operators'; import { ajax } from 'rxjs/ajax'; const inputEl = document.getElementById("query"); const search$ = fromEvent(inputEl, "keyup").pipe( map(event => inputEl.value), startWith(inputEl.value), distinctUntilChanged(), switchMap(debouncedMovieSearch), map(data => JSON.stringify(data, null, 1)) ); search$.subscribe(createOutputObserver()); function debouncedMovieSearch(query) { return concat( of({ status: "is " + query + " final?" }), timer(100), of({ status: "loading " + query }), createAjaxRequest(query) ).pipe(catchError(err => of('Error occured:' + err))); } function createAjaxRequest(query) { return ajax("http://localhost:3000/movies?q=" + encodeURIComponent(query) + "&_page=1&_limit=15" ).pipe( map(ajaxResult => ajaxResult.response.map(movie => movie.title + " (" + movie.year + ")" )) ); } ```
--- class: center, oneword # custom operators --- class: codeslide example: search200 ``` import { fromEvent, concat, timer, of } from 'rxjs'; import { map, distinctUntilChanged, startWith, switchMap, catchError } from 'rxjs/operators'; import { ajax } from 'rxjs/ajax'; const inputEl = document.getElementById("query"); const search$ = fromEvent(inputEl, "keyup").pipe( map(event => inputEl.value), startWith(inputEl.value), distinctUntilChanged(), searchAsYouType(createAjaxRequest), map(data => JSON.stringify(data, null, 1)) ); search$.subscribe(createOutputObserver()); function searchAsYouType(performSearch) { return switchMap(query => concat( of({ status: "is " + query + " final?" }), timer(100), of({ status: "loading " + query }), performSearch(query) ).pipe(catchError(err => of('Error occured:' + err))) ); } function createAjaxRequest(query) { return ajax("http://localhost:3000/movies?q=" + encodeURIComponent(query) + "&_page=1&_limit=15" ).pipe( map(ajaxResult => ajaxResult.response.map(movie => movie.title + " (" + movie.year + ")" )) ); } ```
--- ``` import { ajax } from 'rxjs/ajax'; import createLogObserver from './createLogObserver'; const observable$ = ajax('http://localhost:3000/movies?q=star%20wars'); observable$.subscribe(createLogObserver('observer 1')); observable$.subscribe(createLogObserver('observer 2')); ``` -- .center.guidants-img[] -- # (aka ❄️ cold observable) --- .center.guidants-img[] --- class: center, middle, oneemoji #❄️🔥️️️ --- class: center, middle, oneemoji #❄️☕️🔥️️️ --- class: center, oneword # multicast / share observables --- class: codeslide example: multicast10 ``` import { Observable } from 'rxjs'; import { share } from 'rxjs/operators'; const inner$ = Observable.create(observer => { observer.next(42); observer.next(Math.random()); observer.complete(); }); const shared$ = inner$.pipe(share()); s1 = shared$.subscribe(createLogObserver('share()-1')); s2 = shared$.subscribe(createLogObserver('share()-2')); ```
--- class: center, middle, oneword # ❌only shared while still active --- class: codeslide example: multicast20 ``` import { Observable } from 'rxjs'; import { share } from 'rxjs/operators'; const inner$ = Observable.create(observer => { observer.next(42); observer.next(Math.random()); setTimeout(function() { observer.complete(); }, 1); }); const shared$ = inner$.pipe(share()); s1 = shared$.subscribe(createLogObserver('#1')); s2 = shared$.subscribe(createLogObserver('#2')); ```
--- class: codeslide example: multicast30 ``` import { Observable } from 'rxjs'; import { shareReplay } from 'rxjs/operators'; const inner$ = Observable.create(observer => { observer.next(42); observer.next(Math.random()); observer.complete(); }); const shared$ = inner$.pipe(shareReplay()); s1 = shared$.subscribe(createLogObserver('#1')); s2 = shared$.subscribe(createLogObserver('#2')); ```
--- class: codeslide example: multicast35 ``` import { Observable } from 'rxjs'; import { shareReplay } from 'rxjs/operators'; const inner$ = Observable.create(observer => { observer.next(42); observer.next(Math.random()); observer.complete(); }); const shared$ = inner$.pipe(shareReplay(1)); s1 = shared$.subscribe(createLogObserver('#1')); s2 = shared$.subscribe(createLogObserver('#2')); ```
--- class: codeslide example: multicast40 ``` import { Observable } from 'rxjs'; import { publish } from 'rxjs/operators'; const inner$ = Observable.create(observer => { observer.next(42); observer.next(Math.random()); observer.complete(); }); const publish$ = inner$.pipe(publish()); s1 = publish$.subscribe(createLogObserver('#1')); s2 = publish$.subscribe(createLogObserver('#2')); ```
--- class: codeslide example: multicast45 ``` import { Observable } from 'rxjs'; import { publish } from 'rxjs/operators'; const inner$ = Observable.create(observer => { observer.next(42); observer.next(Math.random()); observer.complete(); }); const publish$ = inner$.pipe(publish()); s1 = publish$.subscribe(createLogObserver('#1')); s2 = publish$.subscribe(createLogObserver('#2')); publish$.connect(); ```
--- class: codeslide example: multicast50 ``` import { Observable } from 'rxjs'; import { publish } from 'rxjs/operators'; const inner$ = Observable.create(observer => { observer.next(42); observer.next(Math.random()); observer.complete(); }); const publish$ = inner$.pipe(publish()); s1 = publish$.subscribe(createLogObserver('#1')); publish$.connect(); s2 = publish$.subscribe(createLogObserver('#2')); ```
--- # Subjects - Subject (like EventEmitter) ``` s = new Subject(); s.next(42); s.subscribe(observer); ``` - BehaviorSubject (emits its current value whenever it is subscribed to) ``` s = new BehaviorSubject(InitialValue); s.subscribe(observer); s.next(42); ``` - AsyncSubject (only emits its last value when it completes to all observers) ``` s = new AsyncSubject(InitialValue); s.subscribe(observer); s.next(42); s.complete(); ``` --- class: center, middle, oneword # ↺ Retry --- class: codeslide example: retry ``` import {Observable} from 'rxjs'; const unstableApi$ = Observable.create(observer => { log.info('trying unstable API access...') let timeout = setTimeout(function(){ if (Math.random() > 0.7) { observer.next(true); observer.complete(); } else { log.error('failed...'); observer.error(new Error("request failed")); } }, 500); return function unsubscribe() { clearTimeout(timeout); } }); unstableApi$.pipe(rxjs.operators.retry(3)).subscribe( createLogObserver() ) ```
--- class: middle, center # programming with observable streams .img-inline[] Create ✅ .img-inline[] Combine/Transform ✅ .img-inline[] Listen --- class: center,middle,oneword # .img-inline[] Listen Subscribe to any observable stream to perform side effects. --- # 👂subscribing to Observable - we did that (`createLogObserver`, `createOutputObserver`) - `toPromise` - `forEach` --- class: middle, center # programming with observable streams .img-inline[] Create ✅ .img-inline[] Combine/Transform ✅ .img-inline[] Listen ✅ --- # 🕰 advanced: schedulers > An execution context and a data structure to order tasks and schedule their execution. Provides a notion of (potentially virtual) time. -- ``` //value will be emitted at once — synchronously of(1).subscribe(console.log) //value will be emitted at once — synchronously of(1, queueScheduler).subscribe(console.log) // value will be emitted in mIcrotask just after current mAcrotask of(1, asapScheduler).subscribe(console.log) // value will be emitted in another mAcrotask of(1, asyncScheduler).subscribe(console.log) // value will be emitted in another mAcrotask just before browser repaint of(1, animationFrameScheduler).subscribe(console.log) ```
(https://blog.cloudboost.io/so-how-does-rx-js-queuescheduler-actually-work-188c1b46526e)
--- # ✅ advanced: testing - like any async code... - or with marble diagrams ``` it('should multiply by "2" each value emitted', () => { const values = { a: 1, b: 2, c: 3, x: 2, y: 4, z: 6}; const source = cold('-a-b-c-|', values); const expected = cold('-x-y-z-|', values); const result = source.pipe(map(x => x*2)); expect(result).toBeObservable(expected); }); ```
(https://medium.com/@bencabanes/marble-testing-observable-introduction-1f5ad39231c)
--- # conclusion -- ⚡️ one abstraction for everything: *Observable* ([TC39 Proposal](https://github.com/tc39/proposal-observable)) -- 🔋utilities for converting from/to Observable included -- 🏆 combining/transforming Observables with operators: powerful -- .img-inline2[]
-- .img-inline[] -- .img-inline[] --- # sources & resources [ReactiveX](https://reactivex.io) - the official ReactiveX website [rxjs.dev](https://rxjs.dev) - the official RxJS website [RxMarbles](https://rxmarbles.com) - example app for exploring operators [Rx Visualizer](https://rxviz.com/) - Animated playground for Rx Observables [Angular Observable Docs](https://angular.io/guide/observables) - part of the Angular documentation [Awesome RxJS](https://github.com/ichpuchtli/awesome-rxjs) - A collection of awesome RxJS tools, frameworks and resources [Angular In Depth: Understanding the publish and share Operators](https://blog.angularindepth.com/rxjs-understanding-the-publish-and-share-operators-16ea2f446635) - RxJS: Understanding the publish and share Operators [RxJS 5 Thinking Reactively | Ben Lesh](https://www.youtube.com/watch?v=3LKMwkuK0ZE&t=1154s) - Talk by Ben Lesh [Comprehensive Guide to Higher-Order RxJs Mapping Operators](https://blog.angular-university.io/rxjs-higher-order-mapping/) --- class: center, middle, oneword # happy coding! 🚀