I try to learn Rxjs by coding below example.
example is for accessing userinfo with usage pipe operator.
const rxjs =require('rxjs');
const { Observable, fromEvent, switchMap, take, pipe, map, interval, of, delay, tap, from, toArray, mergeMap } = rxjs;
const userList = [{
id : 1,
name : "Amanda",
address : "LA",
follows: [ 2, 3]
},{
id : 2,
name : "Harry",
address : "London",
follows : [ 3 ]
},{
id : 3,
name : "Daniel",
address : "Cologne",
follows : [ 1, 2 , 4 , 5 ,6]
},{
id : 4,
name : "John",
address : "Pusan",
follows : [ 2, 3, 4]
},{
id : 5,
name : "Hopper",
address : "Tokyo",
follows : [ 2, 3, 4, 6]
},{
id : 6,
name : "Jim",
address : "Paris",
follows : [ 5 ]
}
]
const getUserById = async (id) => { //(*1)
console.log("getUserById with " , id);
//await new Promise( (resolve, _) => setTimeout( () => { resolve(); console.log("[promise] resoved : " , id) }, id*500));
const ret = userList.find( (e) => e.id === id);
console.log("getUserById ret : ", ret);
return ret;
};
async function test(){
const request$ = of([1,2],[3,4],[5,6]);
console.log("Begin");
const userInfos$ = request$.pipe(
//delay(1000), (*2)
tap( x => console.log('----------[handling]----------')),
switchMap((ids) => {
console.log("Observable start with : ", ids);
const infoObservables = ids.map( id => from(getUserById(id)));
const infos$ = rxjs.combineLatest(infoObservables);
console.log("done :", ids);
infos$.subscribe( x => console.log("#Inner", x));
return infos$;
}),
tap( v => console.log("---------[end]---------"))
);
userInfos$.subscribe( x => console.log("observer",x));
console.log("End");
}
test();
Results
I expected that observer executes n times when input n of Observable. But result is only 1 time subscriber execution.
Expected
\[1 , 2\] ---------- \[3 , 4\] ---------- \[5 , 6\]------------------- |--\>
| switchMap |
\[usr1, usr2\] ---------- \[usr3, usr4\] ---------- \[usr5, usr6\]------------------- |--\>
Actual
\[1 , 2\] ---------- \[3 , 4\] ---------- \[5 , 6\]------------------- |--\>
| switchMap |
------------------------------------------------------\[usr5, usr6\]------------------- |--\>
My Environment
OS :windows10
runtime : nodejs v18.17.1
package : [email protected]
What I found
I tried to log for finding what I miss. But, I can't still find it.
What I found is that something defers for switchMap() to return output Observable. I checked this result by subscribing output Observable to log it inside SwtichMap().
I considered nodejs featrues such as event loop, call stack, microtask queue, .. etc. But these consideration is not fit to below sub-question. If problem is related to nodejs features, case when using delay() operator , which is commented (*2), doesn't effect result, since delay() operator just delays emiting source from request$ variable.
So, I hope advice to recognize me what I miss.
Here is process I understand when request$ is only Observable([1,2]). If I misunderstand, please give me advice.
Expected ) case: requests$ = Observable([1,2])
requests$ = Observable([1,2])
Inside switchMap() {
ids = [ 1, 2 ]
infoObservables = [ Observable(usr1) , Observable(usr2) ]
info$ = Observable([usr1, usr2])
}
userInfos$ = Observable([usr1, usr2])
Inside observer {
x = [usr1,usr2]
}
sub-question
when I modify (*1) and (*2) which I comment at code I can get expected result. But I can't understand result niether.
I tried to relate these results from modification (*1) and (*2) and actual result, But I failed. I hope explanation to fully understand Observable and Rxjs Operator concept.
modification (*1)
// fix return type to just origin value from Promise<value>
const getUserById = (id) => { //(*1)
console.log("getUserById with " , id);
//await new Promise( (resolve, _) => setTimeout( () => { resolve(); console.log("[promise] resoved : " , id) }, id*500));
const ret = userList.find( (e) => e.id === id);
console.log("getUserById ret : ", ret);
return ret;
};
// then change operator from() to of() because getUserById return type is not Promise.
switchMap((ids) => {
...
const infoObservables = ids.map( id => of(getUserById(id)));
...
}),
modification (*2)
// insert delay() operator
const userInfos$ = request$.pipe(
delay(1000), //(*2)
tap( x => console.log('----------[handling]----------')),
switchMap((ids) => {
...
}),
tap( v => console.log("---------[end]---------"))
);