天道酬勤,学无止境

rx-java2

RxJava order of execution confusion

I have this very simple RxJava example List<Integer> arrayIntegers = new ArrayList<>(Arrays.asList(1, 2, 3, 4, 5)); Observable.fromIterable(arrayIntegers).map(i -> { Log.d("RxJava", "map i = " + i); return i; }).subscribeOn(Schedulers.newThread()).observeOn(AndroidSchedulers.mainThread()). subscribe(new DisposableObserver<Integer>() { @Override public void onNext(Integer i) { Log.d("RxJava", "next i = " + i); } @Override public void onError(Throwable e) {} @Override public void onComplete() { Log.d("RxJava", "Completed"); } }); Which gives this result.. D/RxJava: map i = 1 D/RxJava: map i = 2

2022-01-25 03:57:23    分类:问答    java   rx-java   rx-java2

android rxjava2/retrofit2 chaining calls with pagination token

I'm using a REST API to query for a list of Person objects. Max limit is 100 people in response. I need to fetch all people, and the total amount is unknown. There is a field in the first response called "next", containing url for the next page. I need to chain these calls using RxJava/RxAndroid and Retrofit until the last response has an empty "next" field. Since the "next" field contains a pagination url, all subsequent calls will have different url from the first one. What is the most convenient way to do this?

2022-01-25 02:11:27    分类:问答    android   pagination   retrofit   rx-android   rx-java2

Rx Observable onCompleted 在处理后调用(Rx Observable onCompleted called after disposed)

问题 我在我的 android 应用程序中使用 RxJava2。 我有一个Observable ,它在创建时被添加到CompositeDisposable中。 这个Observable正在从一些 API 中检索数据,它可能需要很长时间才能完成。 当我在下载时离开页面时,在onPause中,我正在处理这个CompositeDisposable ,但有时仍会调用Observable的onComplete 。 怎么会这样? 回答1 如 Doc 中所述, onTerminateDetach取消对上游生产者和下游的引用。 例如 mDisposable.add( apiClient.apiCallSomeAPI() .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .concatMap(this::responseObservable) .toList() .onTerminateDetach() // this is important, exactly before subscribe .doOnSuccess(this::realmBulkOperation) .subscribe(this::handleResult, this::handleError) );

2022-01-24 15:57:17    分类:技术分享    android   rx-java2

如何正确停止 rxjava Flowable?(How to properly stop rxjava Flowable?)

问题 我有以下代码结构 服务 public Flowable entryFlow() { return Flowable.fromIterable(this::getEntries) } 消费者 void start() { disposable = service .entryFlow() .observeOn(Schedulers.computation()) .subscribeOn(Schedulers.computation()) .subscribe( entry -> ..., this::onError, this::subscriptionFinished); } void stop() { disposable.dispose(); } private void onError(Throwable e) { subscriptionFinished(); } private void subscriptionFinished() { // } 当调用 stop 方法时,我需要一种方法来阻止 flowable 获取和发送数据。 通过执行以下操作,我注意到并不总是调用 doOnCancel lambda。 void start() { disposable = service .entryFlow() .observeOn(Schedulers

2022-01-23 05:32:44    分类:技术分享    rx-java   rx-java2

如何使 Vertx Reactive RecordParser 发出没有长度的消息?(How to make a Vertx Reactive RecordParser emit message without length?)

问题 我有一个响应式 vertx TCPserver,它使用 RecordParser 来处理带有 2 字节长度标头的消息。 RecordParser 的工作是理解长度部分和消息部分。 问题 1 recordparser 正确解释了 recordparser 中的数据和系统输出。 但是当我添加以下行时,系统输出不再打印。 它可能与它在不同的线程上执行有关? 我不确定。 frameParser.toFlowable() .subscribe(onNext -> System.out.println(onNext)); 问题 2 一旦添加了问题 1 中的行,onNext 就会以块的形式打印它接收到的所有数据。 我想我明白,它在接收时转发数据。 输出 5bd34970-da5a-44e1-b167-0d32d2e70bb2 成功 ? [它是一个不可打印的字节 4] 12 34 我的最终目标是只获取没有长度标头的消息。 任何指针将不胜感激。 代码 // data that is sent by client byte[] b = { 0, 4, 0x31, 0x32, 0x33, 0x34 }; import io.reactivex.Flowable; import io.reactivex.Single; import io.vertx.core.net

2022-01-23 05:00:33    分类:技术分享    java   rx-java2   vert.x

Android中的请求总是给出套接字超时异常(Request in Android always giving socket timeout exception)

问题 当用户单击按钮时,我试图发出两个请求。 但是,该请求可能需要一些时间。 当用户处于省电模式并且屏幕锁定他的设备而请求仍在完成时,请求将不会完成并且会给出套接字超时异常。 我做了一个示例项目来试试这个,你可以在这里找到它。 我正在使用改造和 RxJava 来提出这样的请求: networkFactory.request() .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe( { result -> Log.d("TAG", "The value is this $result") }, { error -> Log.e("TAG", "Ohoh an error ${Log.getStackTraceString(error)}") }) 我的 networkFactory request() 是: fun request(): Observable<Doc> { return service.request(API_KEY) } 使用以下界面: @GET("articlesearch.json") fun request(@Query("api-key") apiKey : String) : Observable<Doc> 我在这里做错了吗?

2022-01-23 03:13:34    分类:技术分享    android   kotlin   retrofit2   rx-java2

Request in Android always giving socket timeout exception

I'm trying to make two requests when user clicks on a button. However, the request might take a little while. When the user is on battery save mode and screens lock his device while the request is still being done, the request will not complete and will give a socket timeout exception. I made a sample project to try this out and you can find it here. I'm using retrofit and RxJava to make my requests like this: networkFactory.request() .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe( { result -> Log.d("TAG", "The value is this $result") }, { error -> Log.e(

2022-01-22 21:16:47    分类:问答    android   kotlin   retrofit2   rx-java2

如何取消订阅广播接收器中的 RXjava 调用(How to unsubscribe to an RXjava call in a broadcast receiver)

问题 我正在使用 RXJava2 在广播接收器中发送电子邮件,我想知道何时应该取消订阅该事件。 代码基本上是: getSmsMmsObservable() .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .doOnError(throwable -> Timber.e(throwable, "Error sending mail.")) .map(smsMmsAddress1 -> { smsMmsAddress = smsMmsAddress1; return doInBackgroundSendEmail(); }) .map(stringSingle -> { mMsgResponse = stringSingle; this.done = true; return deleteFile(); }) .subscribe(success -> { if (success) { Toast.makeText(context, "Message Status: " + mMsgResponse, Toast.LENGTH_LONG).show(); } }); 我何时取消订阅(接收器中没有 onPause 或 onDestroy)以及我如何知道 onReceive 何时完成?

2022-01-22 20:38:06    分类:技术分享    android   broadcastreceiver   rx-java2

RxJava - “一次只允许一个发射沿着 Observable 链向上传播......”(RxJava - "Only one emission is allowed to travel up the Observable chain at a time...")

问题 我在这里阅读博客文章:http://tomstechnicalblog.blogspot.com/2016/02/rxjava-understanding-observeon-and.html 据说 无论您订阅什么调度程序,一次只允许一个发射沿 Observable 运营商链上行。 下面,您可以观察到,发射必须从源一直推送到订阅者,然后才能开始下一个发射。 引用文本的正上方是一个示例,如下所示: public static void main(String[] args) { Observable<Integer> source = Observable.range(1,10); source.map(i -> i * 100) .doOnNext(i -> System.out.println("Emitting " + i + " on thread " + Thread.currentThread().getName())) .observeOn(Schedulers.computation()) .map(i -> i * 10) .subscribe(i -> System.out.println("Received " + i + " on thread " + Thread.currentThread().getName())); sleep(3000); }

2022-01-22 19:41:52    分类:技术分享    rx-java   rx-java2   reactivex