If Comparator passed to Transformers.orderedMergeWith throws exception, the result observable produces neither more results nor any terminal event. Basically the following code just blocks forever.
How to reproduce:
try {
System.out.println("started");
Observable<Integer> o1 = Observable.range(0, 10000);
Observable<Integer> o2 = Observable.range(0, 10000);
Comparator<Integer> comparator = new Comparator<Integer>() {
AtomicInteger i = new AtomicInteger(0);
@Override
public int compare(Integer o1, Integer o2) {
if (i.getAndIncrement() > 1000) {
throw new RuntimeException("failed");
}
return o1 - o2;
}
};
Observable<Integer> res = o1.compose(Transformers.orderedMergeWith(o2, comparator));
System.out.println("finished " + res.toList().toBlocking().single().size());
} catch (Exception e) {
e.printStackTrace();
}
@davidmoten can you please take a look?
If Comparator passed to Transformers.orderedMergeWith throws exception, the result observable produces neither more results nor any terminal event. Basically the following code just blocks forever.
How to reproduce:
@davidmoten can you please take a look?