logo
down
shadow

RxJs split stream into multiple streams


RxJs split stream into multiple streams

By : Abdel rachid MECHAKR
Date : November 19 2020, 03:01 PM
wish help you to fix your issue You can use window and share the source Observable. There's also a little trick with bufferCount(2, 1):
code :
const str = 'a-a-a-a-a-b-b-b-b-c-c-c-c-d-d-d-e';
const source = Observable.from(str.split('-'), Rx.Scheduler.async).share();

source
    .bufferCount(2, 1) // delay emission by one item
    .map(arr => arr[0])
    .window(source
        .bufferCount(2, 1) // keep the previous and current item
        .filter(([oldValue, newValue]) => oldValue !== newValue)
    )
    .concatMap(obs => obs.toArray())
    .subscribe(console.log);
[ 'a', 'a', 'a', 'a', 'a' ]
[ 'b', 'b', 'b', 'b' ]
[ 'c', 'c', 'c', 'c' ]
[ 'd', 'd', 'd' ]
[ 'e' ]
const str = 'a-a-a-a-a-b-b-b-b-c-c-c-c-d-d-d-e';
const source = Observable.from(str.split('-'), Rx.Scheduler.async).share();

const connectable = source.publish();

connectable
    .window(source
        .bufferCount(2, 1) // keep the previous and current item
        .filter(([oldValue, newValue]) => oldValue !== newValue)
    )
    .concatMap(obs => obs.toArray())
    .subscribe(console.log);

connectable.connect();


Share : facebook icon twitter icon
Combine initial stream with modify streams with RxJS

Combine initial stream with modify streams with RxJS


By : Lingling
Date : March 29 2020, 07:55 AM
I hope this helps you . I'm trying to think functional reactive and set up a list of elements on a page as a stream. This is using Angular2 but the problem should be similar to any stream-based architecture. So, I have two streams currently, the initial stream (http call to get a list of users from github) and a remove user stream (occurs when the remove user button is clicked). I believe the marble diagram would look like the following:
code :
ngOnInit() { 
   this.users$ = new BehaviorSubject([]); // init with empty user array
   this._githubUserService.getUsers()
       .map((res) => res.json())
       .subscribe(this.users$);

   this.removeUser$ = new Subject();
   this.removeUser$.subscribe((user) => { 
       this.users$.take(1)  // get current users, as users$ is a BehaviorSubject
           .map(users => users.filter(u => u != user))   // remove `user`
           .subscribe(this.users$); // update users$ stream
   });
}
.....
   this.removeUser$.withLatestFrom(user$, 
       (toRemove, users) => users.filter(u => u != toRemove)   // remove `toRemove`
   ).subscribe(this.users$); // update users$ stream
Add n+1 streams to a RXJS stream with combineLatest

Add n+1 streams to a RXJS stream with combineLatest


By : Mattia Mello
Date : March 29 2020, 07:55 AM
it should still fix some issue Taking Oles' answer, simplifying a little and adding test data as given in question update
code :
const Subject = Rx.ReplaySubject
const ReplaySubject = Rx.ReplaySubject

const newStream =  new Subject()

// Set up output, no streams yet
const streamOfStreams = newStream
  .scan( (acc, stream) => {
    acc.push(stream);
    return acc; 
  }, [])
  .switchMap(vs => Observable.combineLatest(vs))
  .map(arrayOfValues => arrayOfValues.join(''))    // declutter
  .subscribe(console.log)

// Add a stream
const s1 = new ReplaySubject() 
newStream.next(s1)
// emit on streams
s1.next('A'); s1.next('B')

// Add a stream
const s2 = new ReplaySubject() 
newStream.next(s2)
// emit on streams
s2.next('0'); s1.next('C')
s1.next('D'); s2.next('1'); s1.next('E'); 

// Add a stream
const s3 = new ReplaySubject() 
newStream.next(s3)
// emit on streams
s3.next('a'); 
s1.next('F'); s2.next('2'); s1.next('G'); s2.next('3'); s1.next('H'); 
s3.next('b'); s1.next('I')
const streamA = Rx.Observable.timer(0,800).map(x => String.fromCharCode(x+ 65));
const streamB = Rx.Observable.timer(0,1300).map(x => x);
const streamC = Rx.Observable.timer(1100, 2000).map(x => String.fromCharCode(x+ 97));

setTimeout(() => newStream.next(streamA), 500);
setTimeout(() => newStream.next(streamB), 2000);
setTimeout(() => newStream.next(streamC), 3000);
  .switchMap(vs => Observable.combineLatest(vs))
const asHot = (stream) => {
  const hot = stream.multicast(() => new Rx.Subject())
  hot.connect()
  return hot
}
const asBuffered = (stream) => {
  const bufferOne = new ReplaySubject(1)
  stream.subscribe(value => bufferOne.next(value))
  return bufferOne
}
const addStreamOnFirstEmit = (stream) => {
  const buffered = asBuffered(stream)
  buffered.first().subscribe( _ => {
    newStream.next(buffered)
  })
}
const streamC = Rx.Observable.timer(1800, 2000).map(x => String.fromCharCode(x+ 97));
Rxjs: Merge two streams with one stream that depends of another

Rxjs: Merge two streams with one stream that depends of another


By : Vlad Opuchenik
Date : March 29 2020, 07:55 AM
like below fixes the issue I have a stream that calls a service that return a stream that needs the info of the original stream for the request. , Use the selector function in the switchMap operation:
code :
this.messages$ = this.selectedChat$
.switchMap(chat => this.chatService.subscribeMessages(chat.room), 
(chat,messages)=>({chat,messages})) // create a new object, using the inner observable and the mapped one
.map(data =>
  data.messages.map((message: any) => {
    // Here data.chat has the chat information
    return message;
  })
);
How to split a stream into multiple streams in sequence

How to split a stream into multiple streams in sequence


By : Dyrathror
Date : March 29 2020, 07:55 AM
To fix this issue You can iterate over N and split your stream in two at each iteration using partition:
code :
import { from, merge } from 'rxjs';
import { partition, map } from 'rxjs/operators';

const source = from(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i']);

function split(source, n) {
  const streams = [];
  let toSplit = source;
  for (let k = n; k > 0; --k) {
    const [stream, rest] = toSplit.pipe(
      partition((_, i) => i % k === 0)
    );
    streams.push(stream);
    toSplit = rest;
  }
  return streams;
}

const obs = split(source, 3);

const subscribe = merge(
  obs[0].pipe(map(val => `1: ${val}`)),
  obs[1].pipe(map(val => `2: ${val}`)),
  obs[2].pipe(map(val => `3: ${val}`)),
).subscribe(val => console.log(val));
How to split a stream and recombine the sub-streams final results in RxJs

How to split a stream and recombine the sub-streams final results in RxJs


By : user2968790
Date : March 29 2020, 07:55 AM
Any of those help About splitting the stream: https://www.learnrxjs.io/operators/transformation/partition.html
About "emit on complete", can't you just use complete callback instead? .subscribe(() => console.log('Emitted"), null, () => console.log('Completed'));
code :
const [evens, odds] = source.pipe(partition(val => val % 2 === 0));
evens = evens.pipe(startWith(undefined)); // This will emit undefined before everything, so forkJoin will surely emit
forkJoin(evens.pipe(startWith(undefined)), odds.pipe(startWith(undefined)))
  .subscribe(console.log))
Related Posts Related Posts :
  • Bootstrap DatePicker format mm/yyyy set max month
  • Firebase cloud firestore + auth: write only for signed in users
  • Getting jQuery.data functionality without jQuery
  • How to get incrementing serial numbers for new and removed items for jQuery sortable?
  • Highlighting a word or sentence in iframe, using javascript/Jquery
  • Calculate what percentage of a specific element has been scrolled into view
  • Knockout autocomplete with jquery doesn't allow to select custom value
  • react native - react-native-maps performance slow on iOS
  • ajax fallback when no internet connection
  • Show Textbox based on RadioButton selection or value when Page Loads
  • JS maximum call stack exceeded
  • Resetting object key values
  • How can i disable the toggle, preventing user to tap/click it?
  • How to include javascript file into LOV popups on apex oracle?
  • Javascript Angular 4 eventEmitter with ngClass
  • Webpack - module not found even though module exists
  • How to display following values using vue js?
  • Regexp: Allow only use of a few words and only once per word
  • CCapture.js webm video blacked out
  • Using a HTML hyperlink to call a JS function on the parent element
  • Return undefined from existing property in javascript model
  • What is the Difference Between These two jQuery Code Snippets?
  • How to get Network Speed in WebRTC
  • How to get text from selected value in a dropdownlist which is js based
  • window is not defined angular universal third library
  • Angularjs ng-repeat stylization depending on previous value
  • Trying to implement Fittext.js
  • Calculate number of match in array Lodash
  • Jquery Smooth Scroll Using Offset.top
  • How to extract data to React state from CSV file using Papa Parse?
  • How to add unique links to google maps markers
  • How to use if condition in a tool bar in java script
  • Ajax filter in django not showing in HTML
  • data collection with Javascript
  • Rotate image on lightbox2 load
  • Prevent body from scrolling when a Pop-Up is open
  • How to copy files that do not need to be compiled in Gulp?
  • Array not assigned to variable? How does this work and what exactly is it doing?
  • Sorting associative array of objects in javascript
  • Changing Icon in Sap.m.tree having CustomTreeItem
  • Merge two array of objects based on a key
  • javascript in css not working
  • Passing only clicked element to onClick function - reactjs
  • React boilerplate doesn't load js files in the index.html
  • is Child service inside child component visible in the Parent component?
  • Check if data attribute value equals a string
  • How to get value of child tag of a button tag
  • How to access subjects of selected mails in Apple Mail using JavaScript?
  • How to get all dynamically set inline-style CSS in jQuery?
  • Error: Module "html" does not provide a view engine (Express)
  • Random Image in <Div> from array
  • Slider with touch function
  • ReactJS Component Architecture Problems / Nested Components or Single Component Manager
  • Javascript: Caching within Closure doesn't work
  • HTM5 Canvas Drawing App: How Do I Select The Color?
  • Assigning Events using HTML DOM
  • html5 getUserMedia() portrait mode
  • How to avoid 'headers already sent' within Promise chain?
  • Get a result from a react native app integrated into an existing android app
  • Why does the value of input field return undefined
  • shadow
    Privacy Policy - Terms - Contact Us © voile276.org