Heaven rewards the diligent; there is no end to learning

rx-java2

Caching network calls using RxJava for some duration

I am making a network call using Retrofit + RxJava2 and I want to cache the response for 30 seconds. Any call made after the 30-second interval should get the latest results from the server. I tried doing this with the replay operator, but it still makes a network call every time I call subscribe. I am not an expert in RxJava, so maybe my understanding of using replay for caching like this is wrong.
    public Observable<Name> getName() {
        return retrofitBuilder.getName()
                .subscribeOn(Schedulers.io())
                .replay(30, TimeUnit.SECONDS, Schedulers.io())
                .autoConnect();
    }
and I am calling the above code like this: service

2021-09-09 01:49:12    Category: Q&A    android   caching   retrofit2   rx-java2   replay
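
The behavior asked for above (reuse a response for 30 seconds, then fetch again) can be pictured without any Rx operator. The following is only a minimal sketch in plain Java, not the asker's code and not an RxJava API: `TimedCache`, its `loader`, and the injectable `clock` are hypothetical names, and the `Supplier` merely stands in for the network call.

```java
import java.util.function.LongSupplier;
import java.util.function.Supplier;

/** Hypothetical helper: keeps one value and reloads it once it is older than ttlMillis. */
final class TimedCache<T> {
    private final Supplier<T> loader;   // stands in for the network call (assumption)
    private final long ttlMillis;       // how long a loaded value counts as fresh
    private final LongSupplier clock;   // time source in milliseconds, injectable for tests

    private T value;
    private long loadedAt;
    private boolean loaded;

    TimedCache(Supplier<T> loader, long ttlMillis, LongSupplier clock) {
        this.loader = loader;
        this.ttlMillis = ttlMillis;
        this.clock = clock;
    }

    /** Returns the cached value, calling the loader only when nothing fresh is available. */
    synchronized T get() {
        long now = clock.getAsLong();
        if (!loaded || now - loadedAt >= ttlMillis) {
            value = loader.get();
            loadedAt = now;
            loaded = true;
        }
        return value;
    }
}
```

The important design point is that one cache instance is shared by all callers, for example `new TimedCache<>(loader, 30_000, System::currentTimeMillis)` held in a field. A method that builds a fresh pipeline on every invocation has nothing to reuse between calls.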

RxJava2 - execute call synchronously

I have a TestService in which I run an async task to get my data. I would like to wait for the response before I continue.
    public List<Data> getData() {
        List<Data> data = new ArrayList<>();
        Disposable disposable = repository.getDataFromApi(false)
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(
                        newData -> { data.addAll(newData); },
                        __ -> { });
        mCompositeDisposable.add(disposable);
        // Here I want to stop till "Data" arraylist is filled with data ... do something with data
    }
In Volley I could just call req.executeSynchronously(); to make it happen. As getData() has to return data already, I've

2021-09-08 05:45:47    Category: Q&A    android   rx-java2

RxJava: how do you flush a timed buffer?

I'm creating this PublishProcessor that saves its elements to the database every 10 seconds:
    val publishProcessor = PublishProcessor.create<Entity>()
    publishProcessor
        .buffer(10, SECONDS)
        .observeOn(Schedulers.io())
        .subscribe(
            { saveToDatabase(it) },
            { Log.e("TAG", "Error: $it") })
        .addTo(compositeDisposable)
When my activity finishes, I want to flush everything that is in the current buffer instead of waiting 10 seconds. How do I do that?

2021-09-07 15:00:27    Category: Q&A    rx-java   rx-java2   rx-android

RxJava 2: flatMapCompletable is not waiting for the Completable to finish

The use case: there is a dataset of a POJO class, in this case an ArrayList. I have created a class that extends ArrayList with some helper methods that divide the list into pages, i.e. small chunks for uploading to a REST API. I have created an Iterable from an Iterator, which checks whether a page is available for uploading and then fetches that page. The page needs to be uploaded, and on a successful upload it is removed, and then the next page needs to be uploaded. This process must continue, serially uploading every page from that list

2021-09-07 07:33:40    Category: Q&A    android   rx-java   retrofit2   rx-java2   rx-android

RxJava: How to multicast a Completable?

I have a method that returns a Completable, and I want it to be multicast because a second subscriber should not re-execute the method; instead, it should get the same previously emitted values. I achieved this using replay().autoConnect() as follows, and it works as expected:
    public Completable init() {
        return repository.init()
                .subscribeOn(Schedulers.io())
                .flatMapCompletable(s -> Completable.fromAction(() -> {
                    // some action
                }))
                .toObservable()
                .replay()
                .autoConnect()
                .ignoreElements();
    }
As you can see, I'm converting the Completable to an Observable and, after applying replay().autoConnect(), back to

2021-09-06 19:18:40    Category: Q&A    android   rx-java2

retryWhen causes issues when used with Single

I am trying to make an API call using Retrofit and receive the response from the call. I am using RxJava's Single to get the response. What I need is a retry if the call fails. I have gone through lots of examples, but none of them helped (partly because of my limited knowledge of RxJava and Kotlin). Below is the function that makes the call, along with the retryWhen function I wrote:
    fun testFetch(): Single<ResponseBody> {
        return retrofitService.getCTLNetworkService().test().retryWhen {
            val ato = AtomicInteger()
            it.takeWhile { ato.incrementAndGet() < 4 }.flatMap { Flowable

2021-09-06 13:15:06    Category: Q&A    android   kotlin   rx-java   rx-java2
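
The intent described above (repeat a failing call a bounded number of times, then give up) can be sketched in plain Java. This is not the retryWhen operator and not the asker's code: `Retry.call` is a hypothetical helper, the `Callable` stands in for the Retrofit call, and the limit of three retries is only inferred from the counter visible in the excerpt.

```java
import java.util.concurrent.Callable;

/** Hypothetical helper illustrating a bounded retry loop. */
final class Retry {
    private Retry() {}

    /**
     * Runs the task once and, if it throws, up to maxRetries more times.
     * Returns the first successful result or rethrows the last failure.
     */
    static <T> T call(Callable<T> task, int maxRetries) throws Exception {
        if (maxRetries < 0) {
            throw new IllegalArgumentException("maxRetries must be >= 0");
        }
        Exception last = null;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                return task.call();
            } catch (Exception e) {
                last = e; // remember the failure and try again if attempts remain
            }
        }
        throw last;
    }
}
```

With `maxRetries` set to 3, the task runs at most four times in total. A production version would usually also wait between attempts, which this sketch leaves out.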

State machine using RxJava?

I'm trying to go all in on RxJava and solve this problem I have with it, but it seems very unsuited for it, as RxJava seems to not want to deal with having any sort of state, rather only passing along events and mutating them to handle them. The basic state machine behavior I'm trying to emulate with RxJava is this:
1. On app start event, wait for the next app pause.
2. On app pause event, start a 15-minute timer and wait for the next app resume. If the app resumes within the timer, cancel it and go back to step 1.
3. On app resume event, if the 15-minute timer has elapsed, refresh and go back to step

2021-09-06 02:12:16    Category: Q&A    android   rx-java   rx-java2
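
The pause/resume rules above can be written as an explicit two-state machine. This is a rough plain-Java sketch rather than an RxJava solution: `RefreshStateMachine` and its methods are hypothetical names, timestamps are passed in by the caller instead of using a real timer, and because the excerpt's last step is cut off, returning to "wait for the next pause" after a refresh is an assumption.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch of the pause/resume refresh rules as an explicit state machine. */
final class RefreshStateMachine {
    enum State { WAITING_FOR_PAUSE, WAITING_FOR_RESUME }

    private static final long TIMEOUT_MILLIS = TimeUnit.MINUTES.toMillis(15);

    private State state = State.WAITING_FOR_PAUSE; // initial state after app start
    private long pausedAt;
    private int refreshCount;

    /** App paused: remember when, then wait for the next resume. */
    void onPause(long nowMillis) {
        if (state == State.WAITING_FOR_PAUSE) {
            pausedAt = nowMillis;
            state = State.WAITING_FOR_RESUME;
        }
    }

    /**
     * App resumed: refresh only if at least 15 minutes passed since the pause.
     * Returns true when this resume triggered a refresh.
     * Assumption: after either outcome the machine waits for the next pause.
     */
    boolean onResume(long nowMillis) {
        if (state != State.WAITING_FOR_RESUME) {
            return false; // resume without a preceding pause is ignored
        }
        state = State.WAITING_FOR_PAUSE;
        boolean expired = nowMillis - pausedAt >= TIMEOUT_MILLIS;
        if (expired) {
            refreshCount++; // stands in for the real refresh action
        }
        return expired;
    }

    State state() { return state; }

    int refreshCount() { return refreshCount; }
}
```

Comparing timestamps on resume avoids having to cancel a running timer, since an early resume simply finds that the timeout has not elapsed.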

OkHttp Authenticator sometimes does not call authenticate on HTTP 401

I have an issue where OkHttp's Authenticator does not call the authenticate method in some cases of HTTP 401 errors. Interceptors are always called, but the authenticator is sometimes called and sometimes not. Once it fails to call it, it will not call it for several minutes, and all network calls will fail with HTTP 401. Then, after 5-6 minutes, it becomes "unstuck" and calls the authenticate method again. I use it for refreshing the token. If the token expiration is set to one hour, there are no issues, but when we set the expiration to one minute (for testing), the authenticator stops working. When quickly

2021-09-05 06:01:09    Category: Q&A    authentication   retrofit2   rx-java2   okhttp3   authenticator

Axon 4: EventSourcingHandler not triggered when applying event from a different thread

I've encountered a little issue with command handling in Axon 4. Let's say I have an aggregate that needs to call an external service when handling a command. The external service uses an asynchronous client (Vert.x TCP client + RxJava), so the response is delivered on a different thread than the one that created the aggregate instance. I want to apply an event based on the result of my service, but it does not work because the AggregateLifecycle.apply() call happens on a different thread. How can I "transfer" the scope of the aggregate? Here is a little example of what I want to do (uses rxjava 2 and

2021-09-04 14:12:03    Category: Q&A    rx-java2   axon

RxJava and Retrofit to handle empty data response

I use RxJava and Retrofit to handle the response from an HTTP server. Most of the time, the response data is in this format: { code: 'OK', data: {.....} } And sometimes, the data property doesn't exist: { code: 'OK' } I made a BaseResponse class:
    public class BaseResponse<T> implements Serializable {
        private String code;
        private String msg;
        private T data;
        public String getCode() { return code; }
        public String getMsg() { return msg; }
        public T getData() { return data; }
    }
And made a map function to parse it:
    public static <T> Function<BaseResponse<T>, T> applyUnwrapResponse () { return new

2021-09-03 00:23:07    Category: Q&A    android   rx-java   rx-android   rx-java2
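
RxJava 2 does not allow null items in a stream, so an unwrap function that returns the raw data field has nothing valid to emit when the field is missing. One common way around this is to wrap the value in an Optional. The sketch below is plain Java under stated assumptions: it uses `java.util.function.Function` rather than RxJava's own `Function` interface, `Responses.unwrap` is a hypothetical name, and the `BaseResponse` shown is a trimmed-down stand-in for the class in the question.

```java
import java.util.Optional;
import java.util.function.Function;

/** Trimmed-down stand-in for the BaseResponse class from the question (assumption). */
final class BaseResponse<T> {
    private final String code;
    private final T data; // null when the server omits the data property

    BaseResponse(String code, T data) {
        this.code = code;
        this.data = data;
    }

    String getCode() { return code; }

    T getData() { return data; }
}

/** Hypothetical helper holding the unwrap function. */
final class Responses {
    private Responses() {}

    /** Maps a response to its data, using Optional.empty() when data is absent. */
    static <T> Function<BaseResponse<T>, Optional<T>> unwrap() {
        return response -> Optional.ofNullable(response.getData());
    }
}
```

Downstream code can then branch on `isPresent()` instead of receiving a null. On Android, `java.util.Optional` needs API level 24 or library desugaring, so a small hand-written wrapper class may be preferable on older targets.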