Skip to content Skip to sidebar Skip to footer

How To Combine Multiple Rxjava Chains Non-blocking In Error Case

My requirements: N Retrofit calls in parallel Wait for all calls to finish (success or failure) If k (0<= k < N) calls fail, they should NOT block the others. Imagine fail

Solution 1:

  1. You can not achieve parallel via combineLast or zip, rxjava will execute and emit your items in sequence in my testing.

  2. If one of your task fail, your Func2#call will not get called and onError will submitted instead. You even can not get the results of other successful tasks in this way.

  3. The solution is flatMap, it's the traditional way to achieve concurrent in rxjava. It also meet your other requirements.

Here is a small but completed example.

I use a simple website service to test.

I use a Semaphore to wait for all task done, you can completely ignore it. And I add logging to the http request for better understanding, you can complete ignore it also.

publicinterfaceWebsiteService {

    @GET
    Observable<ResponseBody> website(@Url String url);

}

Then I use the following to test the result with rxjava.

HttpLoggingInterceptorloggingInterceptor=newHttpLoggingInterceptor();
    loggingInterceptor.setLevel(HttpLoggingInterceptor.Level.BASIC);

    Retrofitretrofit=newRetrofit.Builder().baseUrl("https://www.google.com")
            .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
            .client(newOkHttpClient.Builder().addInterceptor(loggingInterceptor).build())
            .build();
    WebsiteServicewebsiteService= retrofit.create(WebsiteService.class);

    finalSemaphores=newSemaphore(1);
    try {
        s.acquire();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

    Observable<ResponseBody> first = websiteService.website("http://github.com");
    Observable<ResponseBody> second = websiteService.website("http://stackoverflow.com");
    Observable<ResponseBody> third = websiteService.website("http://notexisting.com");

    finalintnumberOfCalls=3; // testing for three calls

    Observable.just(first, second, third)
            .flatMap(newFunction<Observable<ResponseBody>, ObservableSource<ResponseBody>>() {
                @Overridepublic ObservableSource<ResponseBody> apply(@NonNull Observable<ResponseBody> responseBodyObservable)throws Exception {
                    return responseBodyObservable.subscribeOn(Schedulers.computation());
                }
            })
            .subscribeOn(Schedulers.computation())
            .subscribe(newObserver<ResponseBody>() {

                privateintcurrentDoneCalls=0;

                privatevoidcheckShouldReleaseSemaphore() {
                    if (currentDoneCalls >= numberOfCalls) {
                        s.release();
                    }
                }

                @OverridepublicvoidonSubscribe(@NonNull Disposable d) {

                }

                @OverridepublicvoidonNext(@NonNull ResponseBody responseBody) {
                    System.out.println("Retrofit call success " + responseBody.contentType());
                    synchronized (this) {
                        currentDoneCalls++;
                    }
                    checkShouldReleaseSemaphore();
                }

                @OverridepublicvoidonError(@NonNull Throwable e) {
                    System.out.println("Retrofit call failed " + e.getMessage());
                    synchronized (this) {
                        currentDoneCalls++;
                    }
                    checkShouldReleaseSemaphore();
                }

                @OverridepublicvoidonComplete() {
                    System.out.println("onComplete, All request success");
                    checkShouldReleaseSemaphore();
                }

            });

    try {
        s.acquire();
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        System.out.println("All request done");
        s.release();
    }

I use rxjava2 and retrofit adapter-rxjava2 for testing.

compile'io.reactivex.rxjava2:rxandroid:2.0.1'compile'io.reactivex.rxjava2:rxjava:2.1.0'compile'com.squareup.retrofit2:retrofit:2.3.0'compile'com.squareup.retrofit2:adapter-rxjava2:2.3.0'compile'com.squareup.okhttp3:logging-interceptor:3.8.1'

Updated

The introduction page of RxJava2 from github has pointed out the practical way to implement paralellism.

Practically, paralellism in RxJava means running independent flows and merging their results back into a single flow. The operator flatMap does this...

Although this example is based on RxJava2, the operation flatMap is already existing in RxJava.

Solution 2:

I think in your use case Zip operator it´s more suitable

Here you can see running in the main thread, but also it´s possible make it run every one of them in another thread if you use observerOn

/**
 * Since every observable into the zip is created to subscribeOn a different thread, it´s means all of them will run in parallel.
 * By default Rx is not async, only if you explicitly use subscribeOn.
  */
@Test
publicvoidtestAsyncZip() {
    scheduler = Schedulers.newThread();
    scheduler1 = Schedulers.newThread();
    scheduler2 = Schedulers.newThread();
    long start = System.currentTimeMillis();
    Observable.zip(obAsyncString(), obAsyncString1(), obAsyncString2(), (s, s2, s3) -> s.concat(s2)
                                                                                        .concat(s3))
              .subscribe(result -> showResult("Async in:", start, result));
}



private Observable<String> obAsyncString() {
    return Observable.just("")
                     .observeOn(scheduler)
                     .doOnNext(val -> {
                         System.out.println("Thread " + Thread.currentThread()
                                                              .getName());
                     })
                     .map(val -> "Hello");
}

private Observable<String> obAsyncString1() {
    return Observable.just("")
                     .observeOn(scheduler1)
                     .doOnNext(val -> {
                         System.out.println("Thread " + Thread.currentThread()
                                                              .getName());
                     })
                     .map(val -> " World");
}

private Observable<String> obAsyncString2() {
    return Observable.just("")
                     .observeOn(scheduler2)
                     .doOnNext(val -> {
                         System.out.println("Thread " + Thread.currentThread()
                                                              .getName());
                     })
                     .map(val -> "!");
}

You can see more examples here https://github.com/politrons/reactive

Solution 3:

You can use Observable.mergeDelayError(api1Call, api2Call, api3Call).

Bonus: You can also specify how many maximum parallels calls can be run at the same time. For example:

Observable .mergeDelayError(Observable.from(api1Call, api2Call, api3Call), 5).

Solution 4:

  1. For multiple parallel calls in Retrofit, we need to set it up from OkHttp layer when initializing Retrofit. Please check this.
  2. If you use combineLatest or zip operators in this case (for Retrofit call), each call only emits an item. So, both of operators will wait for all calls to finish. So, we do not need to worry about this point. For more information, check out combineLatest and zip.
  3. If you meant 1 call fail about the RxJava stream error, this error will be thrown, none of combined item will be emitted. But 1 call fail is http request fail, the stream always emit one item when 3 calls finish. we can not use combineLast or zip operator here.

Solution 5:

Thanks to @TinTran and this, here is the correct solution: (I can't put up the exact syntax for Retrofit Observables now but that shouldn't matter, logic remains the same Retrofit or not)

Observable.mergeDelayError(getData1(), getData2()).doAfterTerminate(newAction0() {
            @Overridepublicvoidcall() {
                Logger.i("end of all streams");
                tvTheText.setText("all streams finished");
            }
        }).subscribe(newPrintSubscriber<>("merge" +
                " delay w error"));

The observables (Retrofit ones should work the same way):

privateObservable<String> getData1() {
        returnObservable.create(newObservable.OnSubscribe<String>() {
            @Overridepublicvoidcall(Subscriber<? superString> singleSubscriber) {
                try {
                    long responseTime = 120 + newRandom().nextInt(30);
                    Thread.sleep(responseTime);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                singleSubscriber.onNext("data 1");
                singleSubscriber.onCompleted();
            }
        }).observeOn(AndroidSchedulers.mainThread()).subscribeOn(Schedulers.io());
    }

    privateObservable<String> getData2() {
        returnObservable.create(newObservable.OnSubscribe<String>() {
            @Overridepublicvoidcall(Subscriber<? superString> singleSubscriber) {
                try {
                    long responseTime = 100 + newRandom().nextInt(19);
                    Thread.sleep(responseTime);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                singleSubscriber.onError(newException());// this one never blocks the other Observables' streams
            }
        }).observeOn(AndroidSchedulers.mainThread()).subscribeOn(Schedulers.io());
    }

Output logs:

10-24 15:27:23.335 D:┌────────────────────────────────────────────────────────────────────────────────────────────────────────────────10-24 15:27:23.335 D:Thread:main10-24 15:27:23.335 D:├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄10-24 15:27:23.335 D:SafeSubscriber.onNext(SafeSubscriber.java:134)10-24 15:27:23.335 D:PrintSubscriber.onNext(PrintSubscriber.java:32)10-24 15:27:23.335 D:├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄10-24 15:27:23.336 D:mergedelaywerror-onNext-data110-24 15:27:23.336 D:└────────────────────────────────────────────────────────────────────────────────────────────────────────────────10-24 15:27:23.342 V:onError(e=java.lang.Exception)10-24 15:27:23.342 V:onError [0ms]
10-24 15:27:23.343 I:┌────────────────────────────────────────────────────────────────────────────────────────────────────────────────10-24 15:27:23.343 I:Thread:main10-24 15:27:23.343 I:├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄10-24 15:27:23.343 I:OperatorDoAfterTerminate$1.callAction(OperatorDoAfterTerminate.java:73)10-24 15:27:23.343 I:MainActivity$1.call(MainActivity.java:37)10-24 15:27:23.343 I:├┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄┄10-24 15:27:23.344 I:endofallstreams10-24 15:27:23.344 I:└────────────────────────────────────────────────────────────────────────────────────────────────────────────────

Post a Comment for "How To Combine Multiple Rxjava Chains Non-blocking In Error Case"