0

I try to learn Rxjs by coding below example.

example is for accessing userinfo with usage pipe operator.

const rxjs =require('rxjs');
const { Observable, fromEvent, switchMap, take, pipe, map, interval, of, delay, tap, from, toArray, mergeMap } = rxjs;



const userList = [{
  id : 1,
  name : "Amanda",
  address : "LA",
  follows: [ 2, 3]
},{
  id : 2,
  name : "Harry",
  address : "London",
  follows : [ 3 ]
},{
  id : 3,
  name : "Daniel",
  address : "Cologne",
  follows : [ 1, 2 , 4 , 5 ,6]
},{
  id : 4,
  name : "John",
  address : "Pusan",
  follows : [ 2, 3, 4]
},{
  id : 5,
  name : "Hopper",
  address : "Tokyo",
  follows : [ 2, 3, 4, 6]
},{
 id : 6, 
 name : "Jim",
 address : "Paris",
 follows : [ 5 ]
}
]

const getUserById = async (id) => { //(*1)
    console.log("getUserById with " , id);
    //await new Promise( (resolve, _) => setTimeout( () => { resolve(); console.log("[promise] resoved : " , id) }, id*500));
    const ret = userList.find( (e) => e.id === id);

    console.log("getUserById ret : ", ret);
    return ret;
};


async function test(){
  
  const request$ = of([1,2],[3,4],[5,6]);

  console.log("Begin");

  const userInfos$ = request$.pipe(
    //delay(1000), (*2)
    tap( x => console.log('----------[handling]----------')),
    switchMap((ids) => {
      console.log("Observable start with : ", ids);
      const infoObservables = ids.map( id => from(getUserById(id)));

      const infos$ = rxjs.combineLatest(infoObservables);
      console.log("done :", ids);
      infos$.subscribe( x => console.log("#Inner", x));

      return infos$;


    }),
    tap( v => console.log("---------[end]---------"))
  );

userInfos$.subscribe( x => console.log("observer",x));


console.log("End");
}

test();

Results

I expected that observer executes n times when input n of Observable. But result is only 1 time subscriber execution.

Expected

\[1   ,    2\] ---------- \[3    ,    4\] ---------- \[5   ,   6\]------------------- |--\>

| switchMap |

\[usr1, usr2\] ---------- \[usr3, usr4\] ---------- \[usr5, usr6\]------------------- |--\>

Actual

\[1   ,    2\] ---------- \[3    ,    4\] ---------- \[5   ,   6\]------------------- |--\>

| switchMap |

------------------------------------------------------\[usr5, usr6\]------------------- |--\>

My Environment

OS :windows10

runtime : nodejs v18.17.1

package : [email protected]

What I found

I tried to log for finding what I miss. But, I can't still find it.

What I found is that something defers for switchMap() to return output Observable. I checked this result by subscribing output Observable to log it inside SwtichMap().

I considered nodejs featrues such as event loop, call stack, microtask queue, .. etc. But these consideration is not fit to below sub-question. If problem is related to nodejs features, case when using delay() operator , which is commented (*2), doesn't effect result, since delay() operator just delays emiting source from request$ variable.

So, I hope advice to recognize me what I miss.

Here is process I understand when request$ is only Observable([1,2]). If I misunderstand, please give me advice.

Expected ) case: requests$ = Observable([1,2])

requests$ = Observable([1,2]) 

Inside switchMap() { 
    ids = [ 1, 2 ] 

    infoObservables = [ Observable(usr1) , Observable(usr2) ]

    info$ = Observable([usr1, usr2])

}

userInfos$ = Observable([usr1, usr2])

Inside observer {
 x = [usr1,usr2]
}


sub-question

when I modify (*1) and (*2) which I comment at code I can get expected result. But I can't understand result niether.

I tried to relate these results from modification (*1) and (*2) and actual result, But I failed. I hope explanation to fully understand Observable and Rxjs Operator concept.

modification (*1)

// fix return type to just origin value from Promise<value>
const getUserById = (id) => { //(*1)
    console.log("getUserById with " , id);
    //await new Promise( (resolve, _) => setTimeout( () => { resolve(); console.log("[promise] resoved : " , id) }, id*500));
    const ret = userList.find( (e) => e.id === id);

    console.log("getUserById ret : ", ret);
    return ret;
};

// then change operator from() to of() because getUserById return type is not Promise.

    switchMap((ids) => {
      ...
      const infoObservables = ids.map( id => of(getUserById(id)));
      ...
    }),


modification (*2)

  // insert delay() operator
  const userInfos$ = request$.pipe(
    delay(1000), //(*2) 
    tap( x => console.log('----------[handling]----------')),
    switchMap((ids) => {
             ...
    }),
    tap( v => console.log("---------[end]---------"))
  );
3
  • I found out that problem is from not considering nodejs feature. Commented Sep 5 at 6:10
  • conclusion is need to consider microtask and event loop on this example code, when converting Promise to Observable by using from() or defer(). I thought that converted promise immediately returns values in callback of swtichMap() and returing is handled on test() execution. but, it is handled on asynchornously and seemed to be queued in microtask q. Commented Sep 5 at 6:34
  • adding delay() pipe-operator gave me hint. when I set delay 0s, I got expected result. Due to delay() queues userInfo$ execution on microtask queue, userInfo$ exeuction runs after test() execution. since there is no task on callstak , test() has been done already, converted Promise returns value immediately to userInfos$. Commented Sep 5 at 6:40

0