天道酬勤,学无止境

How to use flatMap in Android RxJava to make three web service call sequentially without using lambda function?

My API Client

public class ApiClient {
    public static final String BASE_URL = "http://baseurl.com/wp-json/";
    private static Retrofit retrofit = null;


    public static Retrofit getClient() {
        if (retrofit==null) {
            retrofit = new Retrofit.Builder()
                    .baseUrl(BASE_URL)
                    .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
                    .addConverterFactory(GsonConverterFactory.create())
                    .build();
        }
        return retrofit;
    }
}

My endpoint interface

    public interface ApiInterface {

        @GET("posts/category/politics/recent-stories/")
        Observable<ArrayList<ModelNewsPaged>> getPoliticsArrayListObservable(@Query("per_page") int perPage, @Query("page") int page);

        @GET("posts/category/economy/recent-stories/")
        Observable<ArrayList<ModelNewsPaged>> getEconomyArrayListObservable(@Query("per_page") int perPage, @Query("page") int page);

        @GET("posts/category/sports/recent-stories/")
        Observable<ArrayList<ModelNewsPaged>> getSportsArrayListObservable(@Query("per_page") int perPage, @Query("page") int page);
    }

How to make the web service call sequentially to getPoliticsArrayListObservable first and then getEconomyArrayListObservable and getSportsArrayListObservable using flatMap & without using lambda function?

I tried something like this:

final ApiInterface apiInterface = ApiClient.getClient().create(ApiInterface.class);

        Observable.just(apiInterface)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .flatMap(new Function<ApiInterface, Observable<ArrayList<ModelNewsPaged>>>() {
                    @Override
                    public Observable<ArrayList<ModelNewsPaged>> apply(ApiInterface apiInterface) throws Exception {
                        return apiInterface.getPoliticsArrayListObservable(10,1);
                    }
                })
                .subscribe(new Observer<ArrayList<ModelNewsPaged>>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onComplete() {
                        Log.d("flatMap", "onComplete");
                    }

                    @Override
                    public void onError(Throwable e) {
                        e.printStackTrace();
                        Log.d("flatMap", "onError");
                    }

                    @Override
                    public void onNext(ArrayList<ModelNewsPaged> integer) {
                        Log.d("flatMap", "Final result: " + integer.toString());
                    }
                });

But the problem here is, I'm unable to chain any more flatMap methods. Any help will be much appreciated.

评论

You can use Concat operator of RxJava to make the API call sequentially. If you don't care about the sequence you can use Zip.

Well, I might have done it something like this. The API calls should be treated as Single if you are not keeping an open connection with them.

apiService.getDailyCounts()
          .flatMap { apiService.updateCounts() }
          .flatMap { apiService.changeTrialCourtCaseName() }
          .subscribeOn(Schedulers.io())
          .observeOn(AndroidSchedulers.mainThread())
          .subscribe()

受限制的 HTML

  • 允许的HTML标签:<a href hreflang> <em> <strong> <cite> <blockquote cite> <code> <ul type> <ol start type> <li> <dl> <dt> <dd> <h2 id> <h3 id> <h4 id> <h5 id> <h6 id>
  • 自动断行和分段。
  • 网页和电子邮件地址自动转换为链接。

相关推荐
  • 如何在 Android RxJava Observable 中按顺序运行 2 个查询?(How to run 2 queries sequentially in a Android RxJava Observable?)
    问题 我想运行 2 个异步任务,一个然后是另一个(按顺序)。 我读过一些关于 ZIP 或 Flat 的东西,但我不太明白…… 我的目的是从本地 SQLite 加载数据,当它完成时,它将查询调用到服务器(远程)。 有人可以建议我,一种实现这一目标的方法吗? 这是我正在使用的 RxJava Observable 框架(单个任务): // RxJava Observable Observable.OnSubscribe<Object> onSubscribe = subscriber -> { try { // Do the query or long task... subscriber.onNext(object); subscriber.onCompleted(); } catch (Exception e) { subscriber.onError(e); } }; // RxJava Observer Subscriber<Object> subscriber = new Subscriber<Object>() { @Override public void onCompleted() { // Handle the completion } @Override public void onError(Throwable e) { // Handle the error }
  • 在 android 中使用 RxJava 和 Retrofit 处理列表(Handling lists with RxJava and Retrofit in android)
    问题 我有几个 api 调用(顺序,异步),其中一些返回列表。 我的api接口如下。 @GET("/users/settings") Observable<UserWrapper> getUserSettings(); @GET("/{username}/collections") Observable<List<Item>> getItems(@Path("username") String userName); @GET("/item/{id}") Observable<ItemInfo> getItemInfo(@Path("id") int id); @GET("/{username}/friends") Observable<List<Friend>> getFriends(@Path("username") String userName); 这是我想要按顺序执行的操作: 通过调用getUserSettings()获取UserWrapper 通过调用saveUser(userWrapper)保存用户通过调用getItems(userWrapper.getUserName())获取用户的物品通过调用getItemInfo(item.getId())获取每个项目的信息通过调用saveItem(itemInfo)保存每个itemInfo 通过调用getFriends
  • 使用 RxJava 和 Retrofit 链接两个 Web 服务调用(Chaining two web service calls using RxJava and Retrofit)
    问题 我正在使用 RxJava 和 Retrofit。我的基本要求是,我想链接两个 api 调用,它们将一个接一个地被调用。 在调用第二个 api 时,将从第一个 api 收到的响应用作输入。 在互联网上阅读了一些东西后,我曾经使用 flatmap 来实现这一点。 在执行此操作时,我正在显示加载程序。有时它运行顺利,但在某些情况下此加载程序冻结。 DDMS 显示“跳过 300 帧,应用程序可能在其主线程上做了太多工作”的日志。 我怀疑我的一个网络调用正在主线程上运行。 我无法弄清楚如何链接这两个调用,以便可以在后台顺利调用它们而不会妨碍我的主线程。 任何帮助是极大的赞赏 。 提前致谢 这是我迄今为止尝试过的 private CompositeSubscription mSubscriptions = new CompositeSubscription(); Subscription subscription = Observable.just(getAddress()) .subscribeOn(Schedulers.newThread()) .flatMap(address -> mPlatformApi.secondWebService(address.getLatitude(),address.getLongitude()) .observeOn
  • Handling lists with RxJava and Retrofit in android
    I have couple of api calls to make (sequentially, asynchronously) and some of them returns lists. My api interface is given below. @GET("/users/settings") Observable<UserWrapper> getUserSettings(); @GET("/{username}/collections") Observable<List<Item>> getItems(@Path("username") String userName); @GET("/item/{id}") Observable<ItemInfo> getItemInfo(@Path("id") int id); @GET("/{username}/friends") Observable<List<Friend>> getFriends(@Path("username") String userName); Here's what I want to do sequentially: Get UserWrapper by calling getUserSettings() Save the user by calling saveUser(userWrapper
  • Chaining two web service calls using RxJava and Retrofit
    I am using RxJava and Retrofit.My basic requirement is,i want to chain two api calls, which will get called one after one another. Response received from first api is used as input while calling second api. After reading some stuff on internet i used to flatmap to achieve this. While carrying out this operation i am showing loader.Sometimes it runs smoothly but on some occasions this loader freezes. DDMS shows log of "skipped 300 frames,Application may be doing too much work on its main thread". I suspect one of my network call is running on main thread. I am not able to figure out how to
  • 如何使用 RxJava 处理分页?(How to handle paging with RxJava?)
    问题 我正在考虑将我的 android 应用程序转换为使用 Rxjava 进行网络请求。 我目前访问的网络服务类似于: getUsersByKeyword(String query, int limit, int offset) 据我了解,Observables 是一个“推”而不是“拉”接口。 所以这就是我如何理解要解决的问题: 应用向服务注册,获取 Observable 以供查询结果被推送到应用程序应用程序处理结果当应用程序想要更多结果时......? 这就是我崩溃的地方。 以前,我只会向网络服务询问我想要的内容,然后使用偏移量再次进行查询。 但在这种情况下,这将涉及创建另一个 Observable 并订阅它,这有点违背了这一点。 我应该如何处理我的应用程序中的分页? (这是一个 android 应用程序,但我认为这无关紧要)。 回答1 这是硬摇滚!)所以我们有网络请求: getUsersByKeyword(String query, int limit, int offset) 例如这个请求返回List< Result > 如果我们使用 RetroFit 进行网络连接,该请求将如下所示: Observable< List< Result >> getUsersByKeyword(String query, int limit, int offset) 因此
  • RxJava调度程序的用例(Use cases for RxJava schedulers)
    问题 在RxJava中,有5种不同的调度程序可供选择: Immediate() :创建并返回一个Scheduler,该Scheduler立即在当前线程上执行工作。 trampoline() :创建并返回一个调度程序,该调度程序在当前工作完成后将要在当前线程上执行的工作排队。 newThread() :创建并返回一个Scheduler,该Scheduler为每个工作单元创建一个新的Thread。 Calculation() :创建并返回用于计算工作的Scheduler。 这可以用于事件循环,处理回调和其他计算工作。 不要在此调度程序上执行IO绑定工作。 使用计划程序。 io()代替。 io() :创建并返回用于IO绑定工作的Scheduler。 该实现由Executor线程池支持,该线程池将根据需要增长。 这可用于异步执行阻塞IO。 不要在此调度程序上执行计算工作。 使用计划程序。 Calculation()代替。 问题: 前三个调度程序很容易说明。 但是,我对计算和io有点困惑。 什么是“ IO绑定工作”? 它用于处理流( java.io )和文件( java.nio.files )吗? 它用于数据库查询吗? 它用于下载文件或访问REST API吗? 如何计算()从newThread不同的()? 是否所有的Calculation()调用都在单个(后台)线程上,而不是每次都在一个新的
  • RxJava和观察者代码的并行执行(RxJava and parallel execution of observer code)
    问题 我正在使用RxJava Observable api使用以下代码: Observable<Info> observable = fileProcessor.processFileObservable(processedFile.getAbsolutePath()); observable .buffer(10000) .observeOn(Schedulers.computation()) .subscribe(recordInfo -> { _logger.info("Running stage2 on thread with id : " + Thread.currentThread().getId()); for(Info info : recordInfo) { // some I/O operation logic } }, exception -> { }, () -> { }); 我的期望是观察代码,即subscribe()方法中的代码,将在我指定了计算调度程序后并行执行。 相反,该代码仍在单线程上按顺序执行。 如何使用RxJava api使代码并行运行。 回答1 当涉及到异步/多线程方面时,RxJava常常被误解。 多线程操作的编码很简单,但是了解抽象是另一回事。 关于RxJava的一个常见问题是如何实现并行化,或如何从Observable同时发出多个项目。
  • Android 程序员的技术栈大全
    AndroidAll 项目地址:chiclaim/AndroidAll 简介: Android 程序员的技术栈大全 更多:作者 提 Bug 标签: 内容涵盖绝大部分 Android 程序员所需要的技能:「设计模式」「Flutter」「ReactNative」「Kotlin」「RxJava」「Dagger2」「Retrofit」「OkHttp」「ButterKnife」「Router」「NDK」「Android 架构」「数据结构与算法」「自定义 View」「性能优化」「Android 源码分析」 「经典书籍」等。 我编写了一份详细的 Android 技术栈思维导图,由于 GitHub 图片国内展示不稳定,所以下面使用的树形结构目录。你可以点击查看详细的 -> 思维导图 如果您有任何问题可以提 Issues ,本项目也欢迎各位开发者朋友来分享自己的一些想法和实践经验,欢迎 Pull Request。 计算机基础 协议 OSI 模型、TCP/IP 模型HTTP / HTTP2 / HTTPS操作系统 Unix / LinuxWindows数据结构与算法 Java Java 基础 面向对象思想类和接口注解与反射泛型多线程 多线程通信 volatile / synchronizedawait / notify / notifyAll线程池 线程池入门 Callable 和
  • Rx Observable 周期性地发射值(Rx Observable emitting values periodically)
    问题 我必须定期轮询一些 RESTful 端点以刷新我的 android 应用程序的数据。 我还必须根据连接情况暂停和恢复它(如果手机处于离线状态,甚至无需尝试)。 我当前的解决方案正在运行,但它使用标准 Java 的ScheduledExecutorService来执行定期任务,但我想留在 Rx 范式中。 这是我当前的代码,为简洁起见,跳过了其中的一部分。 userProfileObservable = Observable.create(new Observable.OnSubscribe<UserProfile>() { @Override public void call(final Subscriber<? super UserProfile> subscriber) { final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); final Runnable runnable = new Runnable() { @Override public void run() { // making http request here } }; final List<ScheduledFuture<?>> futures = new
  • Retrofit + RxJava中的链接请求(Chaining requests in Retrofit + RxJava)
    问题 我有2个API,我想按顺序发出请求并将它们的数据存储在SQLite中。 首先,我想向API A发出请求,并将其数据存储在SQL表a 。 然后向API B发出请求,并将其数据存储在表b并将一些数据存储在表a_b 。 存储在a_b的数据仅来自请求B 我如何使用RxJava做到这一点。 我读过一些关于为此使用flatMap的信息,像这样 apiService.A() // store in DB here? How? maybe use map()? .flatMap(modelA -> { // or maybe store modelA in DB here? return apiService.B().map(modelB -> { storeInDB()l // store B here ? return modelB; }); }); 如果我不使用lambda函数,则它看起来像普通的嵌套调用一样丑陋。 这是更好的方法吗? 回答1 我不认为使用map运算符是处理诸如存储api调用的结果之类的最佳方法。 我想做的是将doOnNext运算符中的这些内容分开。 因此,您的示例将如下所示: apiService.A() .doOnNext(modelA -> db.store(modelA)) .flatMap(modelA -> apiService.B()) .doOnNext
  • Retrofit和RxJava:如何合并两个请求并获得两个结果的访问权?(Retrofit and RxJava: How to combine two requests and get access to both results?)
    问题 我需要提出两个服务请求并将其结果合并: ServiceA()=> [{"id":1,"name":"title"},{"id":1,"name":"title"}] ServiceB( id )=> {"field":"value","field1":"value"} 目前,我已经设法合并了结果,但是我需要将id作为参数传递给ServiceB并获得对第一个结果的访问权限。 到目前为止我尝试过的是: Retrofit repo = new Retrofit.Builder() .baseUrl("https://api.themoviedb.org/3/genre/") .addConverterFactory(GsonConverterFactory.create()) .addCallAdapterFactory(RxJavaCallAdapterFactory.create()) .build(); Observable<GenerosResponse> Genres = repo .create(services.class) .getAllGeneros("movie","list","da0d692f7f62a1dc687580f79dc1e6a0") .subscribeOn(Schedulers.newThread()) .observeOn
  • 如何使用RxJava-Android等待多个嵌套的异步调用?(How to wait multiple nested async calls by using of RxJava-Android?)
    问题 我是RxJava的新手,这是我的情况, 发送请求A并返回List<A> 对于每个A,发送请求AA,然后返回AA,然后将A和AA绑定有逻辑相似的B&BB 仅在所有请求完成后才做某事 例子: request(url1, callback(List<A> listA) { for (A a : listA) { request(url2, callback(AA aa) { a.set(aa); } } } A和B是独立的 代码的结构如何? 我还使用Retrofit作为网络客户端。 回答1 好的,我认为这应该可以解决您的问题的第一部分: 请注意,对flatMap的第二次调用给了2个参数-有一个flatMap版本,它不仅为每个输入项生成一个Observable,而且还带了第二个函数,该函数又将结果Observable中的每个项与相应的输入进行组合物品。 看一下该标题下的第三幅图形,以直观地了解: https://github.com/ReactiveX/RxJava/wiki/Transforming-Observables#flatmap-concatmap-and-flatmapiterable Observable<A> obeservableOfAs = retrofitClient.getListOfAs() .flatMap(new Func1<List<A>
  • Android:使用 Retrofit 轮询服务器(Android: Polling a server with Retrofit)
    问题 我正在 Android 上构建 2 Player 游戏。 游戏是轮流进行的,所以玩家 1 一直等到玩家 2 输入,反之亦然。 我有一个网络服务器,我在其中运行带有 Slim 框架的 API。 在客户端上,我使用 Retrofit。 因此,在客户端上,我想每 X 秒轮询一次我的网络服务器(我知道这不是最好的方法),以检查是否有来自玩家 2 的输入,如果是,则更改 UI(游戏板)。 处理 Retrofit 我遇到了 RxJava。 我的问题是弄清楚我是否需​​要使用 RxJava? 如果是的话,是否有任何非常简单的改装轮询示例? (因为我只发送了几个键/值对)如果不是,如何通过改造来实现呢? 我在这里找到了这个线程,但它也没有帮助我,因为我仍然不知道我是否需要 Retrofit + RxJava,有没有更简单的方法? 回答1 假设您为 Retrofit 定义的接口包含这样的方法: public Observable<GameState> loadGameState(@Query("id") String gameId); 可以通过以下三种方式之一定义改造方法: 1.) 一个简单的同步: public GameState loadGameState(@Query("id") String gameId); 2.) 一个采用Callback进行异步处理的: public void
  • 为什么 RxJava 经常与 Retrofit 一起使用?(Why is RxJava often used with Retrofit?)
    问题 将 Retrofit 与 Rxjava 结合使用有什么好处? 回答1 问题 改造已经在后台线程上运行。 那为什么还需要另一个后台任务 RxJava? 我认为最重要的是,避免嵌套回调( callback hell )。 例如)回调地狱(改造) public interface MyService { @GET("users") Call<List<UserModel>> getUser(); @GET("userinfo") Call<UserInfoModel> getUserInfoById(@Query("id") Integer id); } service.getUser().enqueue(new Callback<UserModel>() { @Override public void onResponse(Call<UserModel> call, Response<UserModel> response) { //process UserModel UserModel data = response.body(); //if you want user infomation from server service.getUserInfo(data.getId()).enqueue(new Callback<UserInfoModel>(){ //... is
  • 如何忽略错误并继续无限流?(How to ignore error and continue infinite stream?)
    问题 我想知道如何忽略异常并继续无限流(在我的情况下是位置流)? 我正在获取当前用户位置(使用Android-ReactiveLocation),然后将其发送到我的API(使用Retrofit)。 在我的情况下,当在网络调用期间发生异常(例如超时)时,将调用onError方法,并且流将自行停止。 如何避免呢? 活动: private RestService mRestService; private Subscription mSubscription; private LocationRequest mLocationRequest = LocationRequest.create() .setPriority(LocationRequest.PRIORITY_HIGH_ACCURACY) .setInterval(100); ... private void start() { mRestService = ...; ReactiveLocationProvider reactiveLocationProvider = new ReactiveLocationProvider(this); mSubscription = reactiveLocationProvider.getUpdatedLocation(mLocationRequest) .buffer(50)
  • RxJava中的concatMap和flatMap有什么区别(What is the difference between concatMap and flatMap in RxJava)
    问题 似乎这两个功能非常相似。 它们具有相同的签名(接受rx.functions.Func1<? super T, ? extends Observable<? extends R>> func ),并且它们的大理石图看起来完全相同。 无法在此处粘贴图片,但这是用于concatMap的图片,这是用于flatMap的图片。 似乎有在所得的描述一些细微的差别Observable ,其中一个由产生concatMap包含的项目,从concatinating所得观测量,并通过所产生的一个结果flatMap包含的项目,从第一合并所得的观测量,并且发射的结果结果那合并。 但是,这种微妙之处对我来说还不清楚。 任何人都可以对这种差异进行更好的解释,理想情况下可以提供一些示例来说明这种差异。 回答1 如您所写,这两个函数非常相似,而细微的差别是创建输出的方式(应用映射功能之后)。 平面地图使用合并运算符,而concatMap使用concat运算符。 如您所见,concatMap输出序列是有序的-第一个Observable发出的所有项目都在第二个Observable发出的所有项目之前发出, 合并flatMap输出序列时-合并的Observable发出的项目可能以任何顺序出现,而不管它们来自哪个源Observable。 回答2 即使这里的答案很好,但没有示例也不容易发现差异。 因此
  • CompletableFuture,Future和RxJava的Observable之间的区别(Difference between CompletableFuture, Future and RxJava's Observable)
    问题 我想知道CompletableFuture , Future和Observable RxJava之间的区别。 我所知道的都是异步的,但是 Future.get()阻塞线程 CompletableFuture提供了回调方法 RxJava Observable与CompletableFuture类似,但有其他好处(不确定) 例如:如果客户端需要进行多个服务调用,并且当我们使用Futures (Java)时, Future.get()将按顺序执行。 并且文档http://reactivex.io/intro.html说 使用Future来最佳地组合条件异步执行流是困难的(或者是不可能的,因为每个请求的等待时间在运行时会有所不同)。 当然可以做到这一点,但是很快就会变得复杂(因此容易出错),或者过早地在Future.get()上阻塞,这消除了异步执行的好处。 真正有兴趣了解RxJava如何解决此问题。 我发现很难从文档中了解。 回答1 期货 在Java 5(2004)中引入了期货。 对于尚未完成的操作,它们基本上是占位符。 操作完成后, Future将包含该结果。 例如,操作可以是提交给ExecutorService的Runnable或Callable实例。 操作的提交者可以使用Future对象检查该操作是否为isDone(),或使用阻塞的get()方法等待操作完成。 例子: /**
  • Andorid rxJava:如何从缓存中获取数据并同时在后台更新它?(Andorid rxJava: how to get data from cache and and the same time update it in the background?)
    问题 我刚开始学习rxJava for Android 并想实现常见的用例: 从缓存中请求数据并显示给用户从网络请求数据服务器更新存储中的数据并自动显示给用户 传统上最好的方案是使用CursorLoader从缓存中获取数据,在单独的线程中运行 Web 请求并通过内容提供程序将数据保存到磁盘,内容提供程序自动通知侦听器和CursorLoader自动更新 UI。 在 rxJava 中,我可以通过运行两个不同的观察者来实现,如下面的代码所示,但我没有找到如何将这两个调用组合到一个调用中以达到我的目标的方法。 谷歌搜索显示了这个线程,但看起来它只是从缓存中获取数据或从 Web 服务器获取数据,但不要同时执行 RxJava 和缓存数据 代码片段: @Override public Observable<SavingsGoals> getCachedSavingsGoal() { return observableGoal.getSavingsGoals() .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()); } @Override public Observable<SavingsGoals> getRecentSavingsGoal() { return api.getSavingsGoals(
  • 使用 RxJava 链接两个改造可观察量(Chain two retrofit observables w/ RxJava)
    问题 我想一个接一个地执行 2 个网络调用。 两个网络调用都返回 Observable。 从第一个呼叫,方法在第二个呼叫的成功的结果的成功的结果第二呼叫使用数据从第二个呼叫的第一个的两个成功的结果,并使用数据。 此外,我应该能够以不同的方式处理两个onError“事件”。 我如何才能避免回调地狱,如下例所示: API().auth(email, password) .subscribeOn(Schedulers.newThread()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Action1<AuthResponse>() { @Override public void call(final AuthResponse authResponse) { API().getUser(authResponse.getAccessToken()) .subscribe(new Action1<List<User>>() { @Override public void call(List<User> users) { doSomething(authResponse, users); } }, new Action1<Throwable>() { @Override public void call(Throwable