Observable Anatomy

An observable’s subscribe method has the following signature

  1. stream.subscribe(fnValue, fnError, fnComplete)

The first one is being demonstrated below fnValue

  1. let stream$ = Rx.Observable.create((observer) => {
  2. observer.next(1)
  3. });
  4. stream$.subscribe((data) => {
  5. console.log('Data', data);
  6. })
  7. // 1

When observer.next(<value>) is being called the fnValue is being invoked.

The second callback fnError is the error callback and is being invoked by the following code, i.e observer.error(<message>)

  1. let stream$ = Rx.Observable.create((observer) => {
  2. observer.error('error message');
  3. })
  4. stream$.subscribe(
  5. (data) => console.log('Data', data)),
  6. (error) => console.log('Error', error)

Lastly we have the fnComplete and it should be invoked when a stream is done and has no more values to emit. It is triggered by a call to observer.complete() like so:

  1. let stream$ = Rx.Observable.create((observer) => {
  2. // x calls to observer.next(<value>)
  3. observer.complete();
  4. })

Unsubscribe

So far we have been creating an irresponsible Observable, irresponsible in the sense that it doesn’t clean up after itself. So let’s look at how to do that:

  1. let stream$ = new Rx.Observable.create((observer) => {
  2. let i = 0;
  3. let id = setInterval(() => {
  4. observer.next(i++);
  5. },1000)
  6. return function(){
  7. clearInterval( id );
  8. }
  9. })
  10. let subscription = stream$.subscribe((value) => {
  11. console.log('Value', value)
  12. });
  13. setTimeout(() => {
  14. subscription.unsubscribe() // here we invoke the cleanup function
  15. }, 3000)

So ensure that you

  • Define a function that cleans up
  • Implicitely call that function by calling subscription.unsubscribe()