By Mohammad Ismail


2018-08-15 16:51:05 8 Comments

I have an observable that receives data from a stream. Each next delivers a block of size 512, which I have to break up into chunks of 200 chars for another observable, keeping the remaining [112] chars in a buffer to concatenate with the next block. I solved it using a new Subject and a for loop, but I believe there may be a better, prettier solution.

received Observable ----------------------------------------

  • 1st next [512] -------> [112] [200] [200] -------> [200] [200]
  • 2nd next [512][112] --> [24] [200] [200] [88+112] --> [200] [200] [200]
  • 3rd next [512][24] --> [136] [200] [176+24] .....
  • nth iteration [512][194] --> [106] [200] [200] [6+194] --> [200] [200] [200]
  • n+1th [512][106].......
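
The carry-over arithmetic behind the diagram can be checked with a few lines of TypeScript. This is only a sketch: remainderSequence is a hypothetical helper, and it assumes that every incoming block is exactly 512 long and that a total which divides evenly leaves nothing to carry.

```typescript
// Hypothetical helper: for a series of equally sized blocks, compute how many
// full chunks each block produces and how much is carried to the next block.
function remainderSequence(
    blockSize: number,
    chunkSize: number,
    blocks: number
): Array<{ fullChunks: number; carry: number }> {
    const result: Array<{ fullChunks: number; carry: number }> = [];
    let carry = 0;
    for (let i = 0; i < blocks; i++) {
        const total = carry + blockSize;                  // previous tail + new block
        const fullChunks = Math.floor(total / chunkSize); // chunks that can be emitted
        carry = total % chunkSize;                        // tail kept for the next block
        result.push({ fullChunks, carry });
    }
    return result;
}

const steps = remainderSequence(512, 200, 3);
// steps[0] -> { fullChunks: 2, carry: 112 }   (512 = 2 * 200 + 112)
// steps[1] -> { fullChunks: 3, carry: 24 }    (512 + 112 = 624 = 3 * 200 + 24)
// steps[2] -> { fullChunks: 2, carry: 136 }   (512 + 24 = 536 = 2 * 200 + 136)
```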

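import { Subject } from 'rxjs'

// Declarations assumed from the description (not shown in the original snippet)
const maxValueSize = 200
// Subject that re-emits the incoming data in chunks of maxValueSize
const bufferStreamer = new Subject<Buffer>()
// Tail of the previous block, prepended to the next block
let completationBuffer: Buffer = Buffer.alloc(0)

this._sreamRecord$.subscribe(
    {
        next: (val: Buffer) => {
            const bufferToSend: Buffer = Buffer.concat([completationBuffer, val])
            for (let i = 0; i < bufferToSend.length; i += maxValueSize) {
                if (bufferToSend.length - i > maxValueSize) {
                    // A full chunk with more data after it: emit it
                    bufferStreamer.next(bufferToSend.slice(i, i + maxValueSize))
                } else {
                    // Last piece (at most maxValueSize long): keep it for the next block
                    completationBuffer = bufferToSend.slice(i, i + maxValueSize)
                }
            }
        },
        complete() {
            // Flush whatever is left before completing
            if (completationBuffer.length) {
                bufferStreamer.next(completationBuffer)
            }
            bufferStreamer.complete()
        }
    })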

1 comments

@Picci 2018-08-19 13:17:01

You may want to consider a solution along these lines

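// Assumed imports (RxJS 6 and lodash)
import { from } from 'rxjs';
import { switchMap } from 'rxjs/operators';
import * as _ from 'lodash';

const splitInChunksWithRemainder = (remainder: Array<any>) => {
    return (streamRecord: Array<any>) => {
        const streamRecordWithRemainder = remainder.concat(streamRecord);
        const chunks = _.chunk(streamRecordWithRemainder, maxValueSize);
        let newRemainder: Array<any> = [];
        // An incomplete last chunk is held back as the remainder for the next record.
        // The length check guards against an empty input, where there is no last chunk.
        if (chunks.length > 0 && chunks[chunks.length - 1].length !== maxValueSize) {
            newRemainder = chunks[chunks.length - 1];
            chunks.length = chunks.length - 1;
        }
        return {chunks, newRemainder};
    };
}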

let f = splitInChunksWithRemainder([]);

this._sreamRecord$.pipe(
    switchMap(s => {
        const res = f(s);
        f = splitInChunksWithRemainder(res.newRemainder);
        return from(res.chunks);
    })
)
.subscribe(console.log);

The idea is to split each streamRecord with lodash's chunk function after concatenating the previous remainder, i.e. the array left over as the tail of the previous streamRecord's split.
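
To make the split step concrete, here is a minimal lodash-free sketch of the same idea. The name splitWithRemainder and the SplitResult interface are hypothetical, and the example uses a chunk size of 3 instead of 200 only to keep the values readable.

```typescript
// Hypothetical lodash-free variant of the split step described above.
interface SplitResult<T> {
    chunks: T[][];       // full chunks, each exactly `size` long
    newRemainder: T[];   // tail shorter than `size`, carried to the next record
}

function splitWithRemainder<T>(remainder: T[], record: T[], size: number): SplitResult<T> {
    const all = remainder.concat(record);
    // Number of elements that fit into full chunks.
    const fullLength = all.length - (all.length % size);
    const chunks: T[][] = [];
    for (let i = 0; i < fullLength; i += size) {
        chunks.push(all.slice(i, i + size));
    }
    return { chunks, newRemainder: all.slice(fullLength) };
}

// First record: nothing carried over yet.
const first = splitWithRemainder<number>([], [1, 2, 3, 4, 5, 6, 7, 8], 3);
// first.chunks       -> [[1, 2, 3], [4, 5, 6]]
// first.newRemainder -> [7, 8]

// Second record: the previous remainder is prepended before splitting.
const second = splitWithRemainder<number>(first.newRemainder, [9, 10], 3);
// second.chunks       -> [[7, 8, 9]]
// second.newRemainder -> [10]
```

Because the remainder is taken with slice, an empty input simply yields no chunks and an empty remainder, without a special case.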

This is done by the function splitInChunksWithRemainder, which is a higher-order function, i.e. a function that returns a function, in this case after capturing the remainder from the previous split.
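
Since the question works with Buffer rather than plain arrays, the same "function that returns a function" pattern could also be sketched on bytes. This is an assumption-laden illustration, not the answer's code: makeByteChunker is a hypothetical name, it uses Uint8Array (which Node's Buffer extends), and it keeps the remainder inside the closure instead of returning it.

```typescript
// Hypothetical higher-order function: `makeByteChunker` returns a function
// that remembers the remainder between calls in its closure.
function makeByteChunker(size: number): (block: Uint8Array) => Uint8Array[] {
    let remainder: Uint8Array = new Uint8Array(0);
    return (block: Uint8Array): Uint8Array[] => {
        // Join the carried-over bytes with the new block.
        const all = new Uint8Array(remainder.length + block.length);
        all.set(remainder, 0);
        all.set(block, remainder.length);

        // Cut off full chunks for as long as enough bytes are left.
        const chunks: Uint8Array[] = [];
        let offset = 0;
        while (all.length - offset >= size) {
            chunks.push(all.slice(offset, offset + size));
            offset += size;
        }
        // Whatever is left is held back for the next call.
        remainder = all.slice(offset);
        return chunks;
    };
}

const chunk200 = makeByteChunker(200);
const sizesFirst = chunk200(new Uint8Array(512)).map(c => c.length);
// sizesFirst  -> [200, 200]        (112 bytes are kept back)
const sizesSecond = chunk200(new Uint8Array(512)).map(c => c.length);
// sizesSecond -> [200, 200, 200]   (112 + 512 = 624, 24 bytes are kept back)
```

Note that this sketch has no way to flush the final remainder, so on its own it would drop the tail when the stream completes.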

UPDATE after comment

If you also need to emit the last newRemainder, then you can consider a slightly more complex solution such as the following

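// Assumed imports (RxJS 6 and lodash)
import { Observable, defer, from, of } from 'rxjs';
import { concat, switchMap } from 'rxjs/operators';
import * as _ from 'lodash';

const splitInChunksWithRemainder = (remainder: Array<any>) => {
    return (streamRecord: Array<any>) => {
        const streamRecordWithRemainder = remainder.concat(streamRecord);
        const chunks = _.chunk(streamRecordWithRemainder, maxValueSize);
        let newRemainder: Array<any> = [];
        // An incomplete last chunk is held back as the remainder for the next record.
        // The length check guards against an empty input, where there is no last chunk.
        if (chunks.length > 0 && chunks[chunks.length - 1].length !== maxValueSize) {
            newRemainder = chunks[chunks.length - 1];
            chunks.length = chunks.length - 1;
        }
        return {chunks, newRemainder};
    };
}

const pipeableChain = () => (source: Observable<any>) => {
    let f = splitInChunksWithRemainder([]);
    let lastRemainder: any[];
    return source.pipe(
        switchMap(s => {
            const res = f(s);
            // Remember the remainder so it can be emitted after the source completes
            lastRemainder = res.newRemainder;
            f = splitInChunksWithRemainder(lastRemainder);
            return from(res.chunks);
        }),
        // defer postpones reading lastRemainder until the source has completed
        concat(defer(() => of(lastRemainder)))
    )
}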

this._sreamRecord$.pipe(
    pipeableChain()
)
.subscribe(console.log);

We have introduced the pipeableChain function. In this function we save the remainder returned by each execution of splitInChunksWithRemainder. Once the source Observable completes, we add a last notification via the concat operator. As you can see, we also have to use the defer operator to make sure the Observable is created only when the Observer subscribes to it, i.e. after the source Observable completes. Without defer, the Observable passed to concat as a parameter would be created when the source Observable is initially subscribed, i.e. when lastRemainder is still undefined.
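
The timing issue that defer addresses can be illustrated without RxJS. The following is only a plain TypeScript analogy, not RxJS code: the state object and the eager/lazy names are hypothetical, and the arrow function plays the role of the factory passed to defer.

```typescript
// Plain TypeScript analogy (no RxJS): a value captured eagerly versus one
// read lazily through a function.
const state: { lastRemainder?: number[] } = {};

// Eager: the value is read right away, while lastRemainder is still undefined.
const eager = state.lastRemainder;

// Lazy: the value is read only when the function is called.
const lazy = () => state.lastRemainder;

// Simulate the source emitting and updating the remainder before it completes.
state.lastRemainder = [7, 8];

const eagerResult = eager;   // undefined: captured too early
const lazyResult = lazy();   // [7, 8]: read after the update
```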

@Mohammad Ismail 2018-08-27 07:52:28

At complete, the newRemainder will not be emitted.

@Picci 2018-08-28 13:13:16

The fastest way to get the last newRemainder is to change the code from return from(res.chunks); to return of(res); (of rather than from, since res is a plain object and not something from can convert into an Observable). In this way, for every notification of the Observable (and therefore also for the last one) you would get both the chunks and the remainder. If you really need an Observable which emits the last newRemainder as its last value before completing, then you can look at my updated answer.
