By Viatorus


2018-11-08 23:25:07 8 Comments

I have the following problem:

Many API calls go through one API interface (the Google API) and have to be limited in requests per second and in concurrency because of the Google API's limits.

I use a subject (sink/call pool), which manages all API requests with mergeMap and returns a result to another, piped subject.

Because callers can unsubscribe from a request before it finishes, such requests shouldn't block my sink. So I have to stop the API request (task) after unsubscription.

The issue: I don't know how to capture this unsubscribed state correctly. What I currently do is overwrite subscribe and unsubscribe to catch this state. It works, but it does not look very "rxjs"ish to me.

How could I improve it?

import {Observable, Subject, Subscription, Subscribable, EMPTY} from 'rxjs';
import {mergeMap, tap} from 'rxjs/operators';

function doHeavyRequest() {
    return new Observable(subscribe => {
        // Simulate delay.
        setTimeout(() => {
            subscribe.next(1);
            subscribe.complete();
        }, 1000);
    });
}

const sink = new Subject<[Subject<any>, number]>();

sink.pipe(
    mergeMap(([subject, id]) => {
        // Stop request here if already unsubscribed.
        if (subject.closed) {
            console.log('Request cancelled:', id);
            return EMPTY;
        }
        return doHeavyRequest()
            .pipe(
                tap(res => {
                    if (!subject.closed) {
                        subject.next(res);
                        subject.complete();
                    } else {
                        console.log('Request aborted:', id);
                    }
                })
            );
    }, 2)
).subscribe();

// Insert request into sink.
// Overwrite subscribe and unsubscribe.
// Track unsubscribe over the flag alive.
function getSomething(id: number) {
    const task = new Subject();

    const ob = task.asObservable();

    ob.subscribe = (...args: any[]) => {
        const sub = Observable.prototype.subscribe.call(ob, ...args);
        sub.unsubscribe = () => {
            if (!task.isStopped)
                task.unsubscribe();
            Subscription.prototype.unsubscribe.call(sub);
        };
        return sub;
    };

    sink.next([task, id]);

    return ob;
}

// Make 3 requests and unsubscribe.
export function test() {
    const ob0 = getSomething(0);
    const ob1 = getSomething(1);
    const ob2 = getSomething(2);

    const sub0 = ob0.subscribe(e => {
        console.log('0:', e);
    });
    setTimeout(() => sub0.unsubscribe(), 1500);

    const sub1 = ob1.subscribe(e => {
        console.log('1:', e);
    });
    setTimeout(() => sub1.unsubscribe(), 900);

    const sub2 = ob2.subscribe(e => {
        console.log('2:', e);
    });
    setTimeout(() => sub2.unsubscribe(), 100);
}

See the test.ts at plunker and the console output:

https://next.plnkr.co/edit/KREjMprTrjHu2zMI?preview

2 comments

@Viatorus 2018-11-09 09:59:28

Thanks to @Badashi, using finalize worked and looks much better:

import {Observable, Subject, Subscription, Subscribable, EMPTY} from 'rxjs';
import {mergeMap, tap, finalize} from 'rxjs/operators';

function doHeavyRequest() {
    return new Observable(subscribe => {
        // Simulate delay.
        setTimeout(() => {
            subscribe.next(1);
            subscribe.complete();
        }, 1000);
    });
}

const sink = new Subject<[Subject<any>, number]>();

sink.pipe(
    mergeMap(([subject, id]) => {
        // Stop request here if already unsubscribed.
        if (subject.closed) {
            console.log('Request cancelled:', id);
            return EMPTY;
        }
        return doHeavyRequest()
            .pipe(
                tap(res => {
                    if (!subject.closed) {
                        subject.next(res);
                        subject.complete();
                    } else { 
                        console.log('Request aborted:', id);
                    }
                })
            );
    }, 2)
).subscribe();

// Insert request into sink.
// Use finalize to detect unsubscription and close the task subject,
// so the sink can check subject.closed.
function getSomething(id: number) {
    const task = new Subject();
    const ob = task.pipe(finalize(() => {
        if (!task.isStopped) {
            task.unsubscribe();
        }
    }));

    sink.next([task, id]);

    return ob;
}

// Make 3 requests and unsubscribe.
export function test() {
    const ob0 = getSomething(0);
    const ob1 = getSomething(1);
    const ob2 = getSomething(2);

    const sub0 = ob0.subscribe(e => {
        console.log('0:', e);
    });
    setTimeout(() => sub0.unsubscribe(), 1500);

    const sub1 = ob1.subscribe(e => {
        console.log('1:', e);
    });
    setTimeout(() => sub1.unsubscribe(), 900);

    const sub2 = ob2.subscribe(e => {
        console.log('2:', e);
    });
    setTimeout(() => sub2.unsubscribe(), 100);
}

Output:

0: 1
Request cancelled: 2
Request aborted: 1

@Badashi 2018-11-09 01:22:37

I'm not sure I understood it properly, but it looks like you want to do some cleanup upon unsubscribing, correct?

You can add teardown logic to a single subscription like so:

const subscription = obs.subscribe(() => {...})
subscription.add(() => { /* do cleanup here. This is executed upon unsubscribing. */})
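
To make the subscription.add approach concrete, here is a minimal sketch. The names `source` and `log` are made up for illustration and are not part of the question's code; it only assumes that `rxjs` is installed.

```typescript
import { Subject } from 'rxjs';

// Hypothetical names for illustration only.
const log: string[] = [];
const source = new Subject<number>();

const subscription = source.subscribe(value => {
    log.push(`value ${value}`);
});

// Teardown attached to this one subscription; it runs when the subscription is closed.
subscription.add(() => {
    log.push('teardown');
});

source.next(1);
subscription.unsubscribe();
source.next(2); // Ignored: the subscription is already closed.

console.log(log); // ['value 1', 'teardown']
```

This only helps when you hold the Subscription object yourself, since the teardown is tied to that one subscription and not to the observable.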

Perhaps the finalize pipeable operator might be useful as well. It runs a callback when the observable terminates, that is, when it completes or errors, or when the subscriber unsubscribes. The behaviour varies a bit for hot observables, so be aware.
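
As a rough illustration of both cases, the sketch below pipes finalize onto two subjects. The names `abandoned`, `finished` and `events` are invented for this example; it assumes an RxJS version that exports pipeable operators from `rxjs/operators`, as the question's code does.

```typescript
import { Subject } from 'rxjs';
import { finalize } from 'rxjs/operators';

// Hypothetical names for illustration only.
const events: string[] = [];

// Case 1: the subscriber unsubscribes before the source emits anything.
const abandoned = new Subject<number>();
const sub = abandoned
    .pipe(finalize(() => { events.push('abandoned: finalize'); }))
    .subscribe(value => { events.push(`abandoned: ${value}`); });
sub.unsubscribe();

// Case 2: the source completes on its own.
const finished = new Subject<number>();
finished
    .pipe(finalize(() => { events.push('finished: finalize'); }))
    .subscribe(value => { events.push(`finished: ${value}`); });
finished.next(1);
finished.complete();

console.log(events);
// ['abandoned: finalize', 'finished: 1', 'finished: finalize']
```

Note that the callback belongs to each subscription of the piped observable, so with several subscribers it would run once per subscriber. That is probably the main thing to watch for with subjects and other hot observables.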

When creating an observable, you can also add teardown logic to it by returning a function from the subscribe function you pass to the constructor, much like pipe'ing a finalize:

const obs = new Observable(subscriber => { /* subscriber.next/error/complete somewhere */
  return () => { /* cleanup resources upon unsubscribe OR complete */ }
})
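
Applied to something like the question's doHeavyRequest, the returned function could cancel the simulated work. This is only a sketch: `doCancellableRequest` and `log` are hypothetical names, and a real API client would need its own way to abort a request.

```typescript
import { Observable } from 'rxjs';

// Hypothetical names for illustration only.
const log: string[] = [];

function doCancellableRequest(id: number): Observable<number> {
    return new Observable<number>(subscriber => {
        // Simulate delay, as in the question.
        const timer = setTimeout(() => {
            subscriber.next(id);
            subscriber.complete();
        }, 1000);

        // Teardown: runs on unsubscribe, complete or error.
        return () => {
            clearTimeout(timer);
            log.push(`teardown ${id}`);
        };
    });
}

const request = doCancellableRequest(7).subscribe(value => {
    log.push(`value ${value}`);
});

// Unsubscribing before the timer fires cancels the pending work.
request.unsubscribe();

console.log(log); // ['teardown 7']
```

Inside mergeMap with a concurrency limit, unsubscribing from the inner observable this way would also free its slot, because the timer is cleared instead of running to the end.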

@Viatorus 2018-11-09 07:05:31

Yes, something like a cleanup. I want to prevent an API call that is stuck in the sink's pipeline (because of the limited concurrency) from running if it has already been unsubscribed.

@Viatorus 2018-11-09 07:15:30

Your first example (subscription.add) does not work for me because I don't know where the observable gets subscribed. The second example looks better to me, but a subject (task) doesn't have a finalize method?

@Badashi 2018-11-09 09:41:56

Since Subjects are observables, you can pipe the finalize operator on them: const subWithFinalize = subject.pipe(finalize(cleanupFunction)).
