Skip to content

Commit

Permalink
Add other starmap_indexed tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Boudewijn26 committed Feb 16, 2023
1 parent 953ce32 commit cab78b4
Showing 1 changed file with 172 additions and 0 deletions.
172 changes: 172 additions & 0 deletions tests/test_observable/test_starmap.py
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,54 @@ def mapper(x, y):
assert xs.subscriptions == [subscribe(200, 290)]
assert invoked[0] == 3

def test_starmap_with_index_throws(self):
with self.assertRaises(RxException):
mapper = ops.starmap_indexed(lambda x, y, index: x)

return return_value((1, 10)).pipe(mapper).subscribe(lambda x: _raise("ex"))

with self.assertRaises(RxException):
return (
throw("ex").pipe(mapper).subscribe(lambda x: x, lambda ex: _raise(ex))
)

with self.assertRaises(RxException):
return (
empty()
.pipe(mapper)
.subscribe(lambda x: x, lambda ex: None, lambda: _raise("ex"))
)

with self.assertRaises(RxException):
return create(lambda o, s: _raise("ex")).pipe(mapper).subscribe()

def test_starmap_with_index_dispose_inside_mapper(self):
scheduler = TestScheduler()
xs = scheduler.create_hot_observable(
on_next(100, (4, 40)), on_next(200, (3, 30)), on_next(500, (2, 20)), on_next(600, (1, 10))
)
invoked = [0]
results = scheduler.create_observer()
d = SerialDisposable()

def projection(x, y, index):
invoked[0] += 1
if scheduler.clock > 400:
d.dispose()

return x + y + index * 100

d.disposable = xs.pipe(ops.starmap_indexed(projection)).subscribe(results)

def action(scheduler, state):
return d.dispose()

scheduler.schedule_absolute(disposed, action)
scheduler.start()
assert results.messages == [on_next(100, 44), on_next(200, 133)]
assert xs.subscriptions == [subscribe(0, 500)]
assert invoked[0] == 3

def test_starmap_with_index_completed(self):
scheduler = TestScheduler()
invoked = [0]
Expand Down Expand Up @@ -438,6 +486,130 @@ def projection(x, y, index):
assert xs.subscriptions == [subscribe(200, 400)]
assert invoked[0] == 4

def test_starmap_with_index_default_mapper(self):
scheduler = TestScheduler()
xs = scheduler.create_hot_observable(
on_next(180, (5, 50)),
on_next(210, (4, 40)),
on_next(240, (3, 30)),
on_next(290, (2, 20)),
on_next(350, (1, 10)),
on_completed(400),
on_next(410, (-1, -10)),
on_completed(420),
on_error(430, "ex"),
)

def factory():
return xs.pipe(ops.starmap_indexed())

results = scheduler.start(factory)
assert results.messages == [
on_next(210, (4, 40)),
on_next(240, (3, 30)),
on_next(290, (2, 20)),
on_next(350, (1, 10)),
on_completed(400),
]

assert xs.subscriptions == [subscribe(200, 400)]

def test_starmap_with_index_not_completed(self):
scheduler = TestScheduler()
invoked = [0]
xs = scheduler.create_hot_observable(
on_next(180, (5, 50)),
on_next(210, (4, 40)),
on_next(240, (3, 30)),
on_next(290, (2, 20)),
on_next(350, (1, 10)),
)

def factory():
def projection(x, y, index):
invoked[0] += 1
return (x + 1) + (y + 10) + (index * 100)

return xs.pipe(ops.starmap_indexed(projection))

results = scheduler.start(factory)
assert results.messages == [
on_next(210, 55),
on_next(240, 144),
on_next(290, 233),
on_next(350, 322),
]
assert xs.subscriptions == [subscribe(200, 1000)]
assert invoked[0] == 4

def test_starmap_with_index_error(self):
scheduler = TestScheduler()
ex = "ex"
invoked = [0]
xs = scheduler.create_hot_observable(
on_next(180, (5, 50)),
on_next(210, (4, 40)),
on_next(240, (3, 30)),
on_next(290, (2, 20)),
on_next(350, (1, 10)),
on_error(400, ex),
on_next(410, (-1, -10)),
on_completed(420),
on_error(430, "ex"),
)

def factory():
def projection(x, y, index):
invoked[0] += 1
return (x + 1) + (y + 10) + (index * 100)

return xs.pipe(ops.starmap_indexed(projection))

results = scheduler.start(factory)

assert results.messages == [
on_next(210, 55),
on_next(240, 144),
on_next(290, 233),
on_next(350, 322),
on_error(400, ex),
]
assert xs.subscriptions == [subscribe(200, 400)]

def test_starmap_with_index_mapper_throws(self):
scheduler = TestScheduler()
invoked = [0]
ex = "ex"
xs = scheduler.create_hot_observable(
on_next(180, (5, 50)),
on_next(210, (4, 40)),
on_next(240, (3, 30)),
on_next(290, (2, 20)),
on_next(350, (1, 10)),
on_completed(400),
on_next(410, (-1, -10)),
on_completed(420),
on_error(430, "ex"),
)

def factory():
def projection(x, y, index):
invoked[0] += 1
if invoked[0] == 3:
raise Exception(ex)
return (x + 1) + (y + 10) + (index * 100)

return xs.pipe(ops.starmap_indexed(projection))

results = scheduler.start(factory)
assert results.messages == [
on_next(210, 55),
on_next(240, 144),
on_error(290, ex),
]
assert xs.subscriptions == [subscribe(200, 290)]
assert invoked[0] == 3


if __name__ == "__main__":
unittest.main()

0 comments on commit cab78b4

Please sign in to comment.