1- import { concat , NEVER , Observable , of , Subject } from "rxjs"
1+ import { concat , from , NEVER , Observable , of , Subject } from "rxjs"
22import { catchError , switchMap , take } from "rxjs/operators"
33import { TestScheduler } from "rxjs/testing"
44import { partitionByKey } from "./"
@@ -9,40 +9,210 @@ const scheduler = () =>
99 } )
1010
1111describe ( "partitionByKey" , ( ) => {
12- describe ( "activeKeys$ " , ( ) => {
13- it ( "emits a list with all the active keys " , ( ) => {
12+ describe ( "behaviour " , ( ) => {
13+ it ( "groups observables by using the key function " , ( ) => {
1414 scheduler ( ) . run ( ( { expectObservable, cold } ) => {
15- const source = cold ( "-ab-a-cd---" )
16- const expectedStr = "efg---hi---"
17- const [ , result ] = partitionByKey (
15+ const source = cold ( "-12-3456-" )
16+ const expectOdd = " -1--3-5--"
17+ const expectEven = " --2--4-6-"
18+
19+ const [ getInstance$ ] = partitionByKey (
20+ source ,
21+ ( v ) => Number ( v ) % 2 ,
22+ ( v$ ) => v$ ,
23+ )
24+
25+ expectObservable ( getInstance$ ( 0 ) ) . toBe ( expectEven )
26+ expectObservable ( getInstance$ ( 1 ) ) . toBe ( expectOdd )
27+ } )
28+ } )
29+
30+ it ( "unsubscribes from all streams when refcount reaches 0" , ( ) => {
31+ let innerSubs = 0
32+ const inner = new Observable < number > ( ( ) => {
33+ innerSubs ++
34+ return ( ) => {
35+ innerSubs --
36+ }
37+ } )
38+
39+ const sourceSubject = new Subject < number > ( )
40+ let sourceSubs = 0
41+ const source = new Observable < number > ( ( obs ) => {
42+ sourceSubs ++
43+ sourceSubject . subscribe ( obs )
44+ return ( ) => {
45+ sourceSubs --
46+ }
47+ } )
48+
49+ const [ getObs ] = partitionByKey (
50+ source ,
51+ ( v ) => v ,
52+ ( ) => inner ,
53+ )
54+ const observable = getObs ( 1 )
55+
56+ expect ( sourceSubs ) . toBe ( 0 )
57+ expect ( innerSubs ) . toBe ( 0 )
58+
59+ const sub1 = observable . subscribe ( )
60+
61+ expect ( sourceSubs ) . toBe ( 1 )
62+ expect ( innerSubs ) . toBe ( 0 )
63+
64+ sourceSubject . next ( 1 )
65+
66+ expect ( sourceSubs ) . toBe ( 1 )
67+ expect ( innerSubs ) . toBe ( 1 )
68+
69+ const sub2 = observable . subscribe ( )
70+
71+ expect ( sourceSubs ) . toBe ( 1 )
72+ expect ( innerSubs ) . toBe ( 1 )
73+
74+ sub1 . unsubscribe ( )
75+
76+ expect ( sourceSubs ) . toBe ( 1 )
77+ expect ( innerSubs ) . toBe ( 1 )
78+
79+ sub2 . unsubscribe ( )
80+
81+ expect ( sourceSubs ) . toBe ( 0 )
82+ expect ( innerSubs ) . toBe ( 0 )
83+ } )
84+
85+ it ( "emits a complete on the inner observable when the source completes" , ( ) => {
86+ scheduler ( ) . run ( ( { expectObservable, cold } ) => {
87+ const source = cold ( "-ab-a-|" )
88+ const expectA = " -a--a-(c|)"
89+ const expectB = " --b---(c|)"
90+
91+ const [ getInstance$ ] = partitionByKey (
1892 source ,
1993 ( v ) => v ,
20- ( ) => NEVER ,
94+ ( v$ ) => concat ( v$ , [ "c" ] ) ,
2195 )
2296
23- expectObservable ( result ) . toBe ( expectedStr , {
24- e : [ ] ,
25- f : [ "a" ] ,
26- g : [ "a" , "b" ] ,
27- h : [ "a" , "b" , "c" ] ,
28- i : [ "a" , "b" , "c" , "d" ] ,
97+ expectObservable ( getInstance$ ( "a" ) ) . toBe ( expectA )
98+ expectObservable ( getInstance$ ( "b" ) ) . toBe ( expectB )
99+ } )
100+ } )
101+
102+ it ( "emits the error on the inner observable when the source errors" , ( ) => {
103+ scheduler ( ) . run ( ( { expectObservable, cold } ) => {
104+ const source = cold ( "-ab-a-#" )
105+ const expectA = " -a--a-(e|)"
106+ const expectB = " --b---(e|)"
107+
108+ const [ getInstance$ ] = partitionByKey (
109+ source ,
110+ ( v ) => v ,
111+ ( v$ ) => v$ . pipe ( catchError ( ( ) => of ( "e" ) ) ) ,
112+ )
113+
114+ expectObservable ( getInstance$ ( "a" ) ) . toBe ( expectA )
115+ expectObservable ( getInstance$ ( "b" ) ) . toBe ( expectB )
116+ } )
117+ } )
118+
119+ it ( "handles an empty Observable" , ( ) => {
120+ scheduler ( ) . run ( ( { expectSubscriptions, expectObservable, cold } ) => {
121+ const e1 = cold ( " |" )
122+ const e1subs = " (^!)"
123+ const expectObs = "|"
124+ const expectKey = "(x|)"
125+
126+ const [ getObs , keys$ ] = partitionByKey (
127+ e1 ,
128+ ( v ) => v ,
129+ ( v$ ) => v$ ,
130+ )
131+
132+ expectObservable ( getObs ( "" ) ) . toBe ( expectObs )
133+ expectSubscriptions ( e1 . subscriptions ) . toBe ( e1subs )
134+ expectObservable ( keys$ ) . toBe ( expectKey , { x : [ ] } )
135+ } )
136+ } )
137+
138+ it ( "handles a never Observable" , ( ) => {
139+ scheduler ( ) . run ( ( { expectSubscriptions, expectObservable, cold } ) => {
140+ const e1 = cold ( " --" )
141+ const e1subs = " ^-"
142+ const expectObs = "--"
143+ const expectKey = "x-"
144+
145+ const [ getObs , keys$ ] = partitionByKey (
146+ e1 ,
147+ ( v ) => v ,
148+ ( v$ ) => v$ ,
149+ )
150+
151+ expectObservable ( getObs ( "" ) ) . toBe ( expectObs )
152+ expectSubscriptions ( e1 . subscriptions ) . toBe ( e1subs )
153+ expectObservable ( keys$ ) . toBe ( expectKey , { x : [ ] } )
154+ } )
155+ } )
156+
157+ it ( "handles a just-throw Observable" , ( ) => {
158+ scheduler ( ) . run ( ( { expectSubscriptions, expectObservable, cold } ) => {
159+ const e1 = cold ( " #" )
160+ const e1subs = " (^!)"
161+ const expectObs = "#"
162+ const expectKey = "(x#)"
163+
164+ const [ getObs , keys$ ] = partitionByKey (
165+ e1 ,
166+ ( v ) => v ,
167+ ( v$ ) => v$ ,
168+ )
169+
170+ expectObservable ( getObs ( "" ) ) . toBe ( expectObs )
171+ expectSubscriptions ( e1 . subscriptions ) . toBe ( e1subs )
172+ expectObservable ( keys$ ) . toBe ( expectKey , { x : [ ] } )
173+ } )
174+ } )
175+
176+ it ( "handles synchronous values" , ( ) => {
177+ scheduler ( ) . run ( ( { expectObservable } ) => {
178+ const e1 = from ( [ "1" , "2" , "3" , "4" , "5" ] )
179+ const expectOdd = " (135|)"
180+ const expectEven = "(24|)"
181+ const expectKeys = "(wxyz|)"
182+ const [ getObs , keys$ ] = partitionByKey (
183+ e1 ,
184+ ( v ) => Number ( v ) % 2 ,
185+ ( v$ ) => v$ ,
186+ )
187+ expectObservable ( keys$ ) . toBe ( expectKeys , {
188+ w : [ 1 ] ,
189+ x : [ 1 , 0 ] ,
190+ y : [ 0 ] ,
191+ z : [ ] ,
29192 } )
193+ expectObservable ( getObs ( 0 ) ) . toBe ( expectEven )
194+ expectObservable ( getObs ( 1 ) ) . toBe ( expectOdd )
30195 } )
31196 } )
197+ } )
32198
33- it ( "emits all the synchronous groups in a single emission" , ( ) => {
199+ describe ( "activeKeys$" , ( ) => {
200+ it ( "emits a list with all the active keys" , ( ) => {
34201 scheduler ( ) . run ( ( { expectObservable, cold } ) => {
35- const source = concat ( of ( "a" , "b" ) , cold ( "--c--" ) )
36- const expectedStr = " g-h --"
202+ const source = cold ( "-ab-a-cd---" )
203+ const expectedStr = "efg---hi- --"
37204 const [ , result ] = partitionByKey (
38205 source ,
39206 ( v ) => v ,
40207 ( ) => NEVER ,
41208 )
42209
43210 expectObservable ( result ) . toBe ( expectedStr , {
211+ e : [ ] ,
212+ f : [ "a" ] ,
44213 g : [ "a" , "b" ] ,
45214 h : [ "a" , "b" , "c" ] ,
215+ i : [ "a" , "b" , "c" , "d" ] ,
46216 } )
47217 } )
48218 } )
@@ -130,12 +300,12 @@ describe("partitionByKey", () => {
130300 } )
131301 } )
132302
133- it ( "errors when the source emits an error" , ( ) => {
303+ it ( "errors when the source emits an error and no group is active " , ( ) => {
134304 scheduler ( ) . run ( ( { expectObservable, cold } ) => {
135305 const source = cold ( "-ab--#" )
136- const a = cold ( " --1---2 " )
137- const b = cold ( " ------ " )
138- const expectedStr = "efg-- #"
306+ const a = cold ( " --1| " )
307+ const b = cold ( " -| " )
308+ const expectedStr = "efghi #"
139309 const innerStreams : Record < string , Observable < string > > = { a, b }
140310 const [ , result ] = partitionByKey (
141311 source ,
@@ -151,16 +321,18 @@ describe("partitionByKey", () => {
151321 e : [ ] ,
152322 f : [ "a" ] ,
153323 g : [ "a" , "b" ] ,
324+ h : [ "a" ] ,
325+ i : [ ] ,
154326 } )
155327 } )
156328 } )
157329
158- it ( "removes a key when its inner stream emits an error" , ( ) => {
330+ it ( "doesn't error when the source errors and its inner streams stop the error" , ( ) => {
159331 scheduler ( ) . run ( ( { expectObservable, cold } ) => {
160- const source = cold ( "-ab----- " )
161- const a = cold ( " --1-# " )
162- const b = cold ( " ------ " )
163- const expectedStr = "efg--h "
332+ const source = cold ( "-ab--# " )
333+ const a = cold ( " --1--2--3| " )
334+ const b = cold ( " ----| " )
335+ const expectedStr = "efg---h---(i|) "
164336 const innerStreams : Record < string , Observable < string > > = { a, b }
165337 const [ , result ] = partitionByKey (
166338 source ,
@@ -176,7 +348,29 @@ describe("partitionByKey", () => {
176348 e : [ ] ,
177349 f : [ "a" ] ,
178350 g : [ "a" , "b" ] ,
179- h : [ "b" ] ,
351+ h : [ "a" ] ,
352+ i : [ ] ,
353+ } )
354+ } )
355+ } )
356+
357+ it ( "errors when one of its inner stream emits an error" , ( ) => {
358+ scheduler ( ) . run ( ( { expectObservable, cold } ) => {
359+ const source = cold ( "-ab-----" )
360+ const a = cold ( " --1-#" )
361+ const b = cold ( " ------" )
362+ const expectedStr = "efg--#"
363+ const innerStreams : Record < string , Observable < string > > = { a, b }
364+ const [ , result ] = partitionByKey (
365+ source ,
366+ ( v ) => v ,
367+ ( _ , v ) => innerStreams [ v ] ,
368+ )
369+
370+ expectObservable ( result ) . toBe ( expectedStr , {
371+ e : [ ] ,
372+ f : [ "a" ] ,
373+ g : [ "a" , "b" ] ,
180374 } )
181375 } )
182376 } )
@@ -235,84 +429,29 @@ describe("partitionByKey", () => {
235429 expect ( lateNext ) . toHaveBeenCalledWith ( 1 )
236430 } )
237431
238- it ( "unsubscribes from all streams when refcount reaches 0" , ( ) => {
239- let innerSubs = 0
240- const inner = new Observable < number > ( ( ) => {
241- innerSubs ++
242- return ( ) => {
243- innerSubs --
244- }
245- } )
246-
247- const sourceSubject = new Subject < number > ( )
248- let sourceSubs = 0
249- const source = new Observable < number > ( ( obs ) => {
250- sourceSubs ++
251- sourceSubject . subscribe ( obs )
252- return ( ) => {
253- sourceSubs --
254- }
255- } )
256-
257- const [ getObs ] = partitionByKey (
258- source ,
259- ( v ) => v ,
260- ( ) => inner ,
261- )
262- const observable = getObs ( 1 )
263-
264- expect ( sourceSubs ) . toBe ( 0 )
265- expect ( innerSubs ) . toBe ( 0 )
266-
267- const sub1 = observable . subscribe ( )
268-
269- expect ( sourceSubs ) . toBe ( 1 )
270- expect ( innerSubs ) . toBe ( 0 )
271-
272- sourceSubject . next ( 1 )
273-
274- expect ( sourceSubs ) . toBe ( 1 )
275- expect ( innerSubs ) . toBe ( 1 )
276-
277- const sub2 = observable . subscribe ( )
278-
279- expect ( sourceSubs ) . toBe ( 1 )
280- expect ( innerSubs ) . toBe ( 1 )
281-
282- sub1 . unsubscribe ( )
283-
284- expect ( sourceSubs ) . toBe ( 1 )
285- expect ( innerSubs ) . toBe ( 1 )
286-
287- sub2 . unsubscribe ( )
288-
289- expect ( sourceSubs ) . toBe ( 0 )
290- expect ( innerSubs ) . toBe ( 0 )
291- } )
292-
293- it ( "emits a complete on the inner observable when the source completes" , ( ) => {
432+ it ( "lets the projection function handle completions" , ( ) => {
294433 scheduler ( ) . run ( ( { expectObservable, cold } ) => {
295434 const source = cold ( "-ab-a-|" )
296- const expectA = " -a--a-(c|)"
297- const expectB = " --b---(c|)"
435+ const concatenated = cold ( "123|" )
436+ const expectA = " -a--a-123|"
437+ const expectB = " --b---123|"
298438
299439 const [ getInstance$ ] = partitionByKey (
300440 source ,
301441 ( v ) => v ,
302- ( v$ ) => concat ( v$ , [ "c" ] ) ,
442+ ( v$ ) => concat ( v$ , concatenated ) ,
303443 )
304444
305445 expectObservable ( getInstance$ ( "a" ) ) . toBe ( expectA )
306446 expectObservable ( getInstance$ ( "b" ) ) . toBe ( expectB )
307447 } )
308448 } )
309449
310- // Do we want this behaviour?
311- it ( "emits an error when the source errors" , ( ) => {
450+ it ( "lets the projection function catch source errors" , ( ) => {
312451 scheduler ( ) . run ( ( { expectObservable, cold } ) => {
313452 const source = cold ( "-ab-a-#" )
314- const expectA = " -a--a-# "
315- const expectB = " --b---# "
453+ const expectA = " -a--a-(e|) "
454+ const expectB = " --b---(e|) "
316455
317456 const [ getInstance$ ] = partitionByKey (
318457 source ,
0 commit comments