天道酬勤,学无止境

project-reactor

Split a flux into two fluxes - head and tail

I want to split a flux into two fluxes where the first one has the first item of the original flux and the second one will takes the rest of items. After applying a custom transformation myLogic on each flux I want to combine them into one flux preserving the order of the original flux. Example: S: student S': student after applying myLogic Emitted flux: s1 -> s2 -> s3 -> s4 The first splited flux: s1' => myLogic The second splited flux: s2' -> s3' -> s4' => myLogic The combined flux: s1' -> s2' -> s3' -> s4'

2021-06-15 04:33:30    分类:问答    java   spring-boot   spring-webflux   project-reactor

Execute blocking JDBC call in Spring Webflux

I am using Spring Webflux with Spring data jpa using PostgreSql as backend db. I don't want to block the main thread while making db calls like find and save. To achieve the same, I have a main scheduler in Controller class and a jdbcScheduler service classes. The way I have defined them is: @Configuration @EnableJpaAuditing public class CommonConfig { @Value("${spring.datasource.hikari.maximum-pool-size}") int connectionPoolSize; @Bean public Scheduler scheduler() { return Schedulers.parallel(); } @Bean public Scheduler jdbcScheduler() { return Schedulers.fromExecutor(Executors

2021-06-14 09:18:50    分类:问答    java   spring-boot   spring-webflux   project-reactor

“Operator called default onErrorDropped” on Mono timeout

In my Production code, I am getting errors in my logs when a Mono times out. I have managed to recreate these errors with the following code: @Test public void testScheduler() { Mono<String> callableMethod1 = callableMethod(); callableMethod1.block(); Mono<String> callableMethod2 = callableMethod(); callableMethod2.block(); } private Mono<String> callableMethod() { return Mono.fromCallable(() -> { Thread.sleep(60); return "Success"; }) .subscribeOn(Schedulers.elastic()) .timeout(Duration.ofMillis(50)) .onErrorResume(throwable -> Mono.just("Timeout")); } In the Mono.fromCallable I am making a

2021-06-13 07:18:09    分类:问答    project-reactor

Project Reactor: How to control Flux emission

I have a flux that emits some Date. This Date is mapped to 1024 simulated HTTP requests that I'm running on some Executer. What I'd like to do is waiting for all the 1024 HTTP requests before emitting the next Date. Currently when running, onNext() is called for many times and then it is stabilised on some steady rate. How can I change this behaviour? P.S. I'm willing to change to architecture, if needed. private void run() throws Exception { Executor executor = Executors.newFixedThreadPool(2); Flux<Date> source = Flux.generate(emitter -> emitter.next(new Date()) ); source .log() .limitRate(1)

2021-06-12 15:36:57    分类:问答    java   reactive-programming   project-reactor   reactive-streams

Reactor Schedulers keep running long after main thread is done?How to deal with this?

I have a question on how to clean up the Scheduler worker threads while using Reactor 3 Flux.range(1, 10000) .publishOn(Schedulers.newElastic("Y")) .doOnComplete(() -> { // WHAT should one do to ensure the worker threads are cleaned up logger.info("Shut down all Scheduler worker threads"); }) .subscribe(x -> logger.debug(x+ "**")); What I see when I execute the above code is that once the main thread has finished running the worker thread(s) is/are still in WAITING state for some time. sun.misc.Unsafe.park(Native Method) java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) java

2021-06-12 08:57:38    分类:问答    java   project-reactor

在 Spring 中使用什么样的“EventBus”? 内置,Reactor,Akka?(What kind of “EventBus” to use in Spring? Built-in, Reactor, Akka?)

问题 我们将在几周内启动一个新的 Spring 4 应用程序。 我们想使用一些事件驱动的架构。 今年我到处阅读有关“Reactor”的文章,在网上寻找它时,我偶然发现了“Akka”。 所以现在我们有3个选择: Spring 的ApplicationEvent :http://docs.spring.io/spring/docs/4.0.0.RELEASE/javadoc-api/org/springframework/context/ApplicationEvent.html Reactor :https://github.com/reactor/reactor#reactor Akka :http://akka.io/ 我找不到这些的真正比较。 现在我们只需要这样的东西: X注册监听Event E Y注册监听Event E Z发送Event E 然后X和Y将接收并处理该事件。 我们很可能会以异步方式使用它,但肯定也会有一些同步场景。 我们很可能总是发送一个类作为事件。 (Reactor 示例主要使用字符串和字符串模式,但它也支持对象)。 据我了解, ApplicationEvent默认是同步工作的,而Reactor是异步工作的。 Reactor还允许使用await()方法使其有点同步。 Akka提供或多或少与Reactor相同,但也支持 Remoting。 关于 Reactor

2021-06-11 07:52:06    分类:技术分享    multithreading   spring   akka   event-driven-design   project-reactor

What is correct way to generate and handle exceptions when using Flux from projectreactor

I am using io.projectreactor 3 (reactor-core 3.2.6.RELEASE) and I have noticed some discrepancies regarding error handling. Unfortunately, official documentation does not provide enough details to solve my problems. I have following 4 snippets. In some cases exception will be ignored and in other cases it will thrown further. What is the way to actually produce and consume exceptions? Snippet 1 In this case, exception will be ignored and main() will complete without receiving exception. import reactor.core.publisher.Flux; class Scratch { public static void main(String[] args) throws Throwable

2021-06-10 23:42:47    分类:问答    java   reactive-programming   project-reactor

How to limit the number of active Spring WebClient calls

I have a requirement where I read a bunch of rows (thousands) from a SQL DB using Spring Batch and call a REST Service to enrich content before writing them on a Kafka topic. When using the Spring Reactive webClient, how do I limit the number of active non-blocking service calls? Should I somehow introduce a Flux in the loop after I read data using Spring Batch? (I understand the usage of delayElements and that it serves a different purpose, as when a single Get Service Call brings in lot of data and you want the server to slow down -- here though, my use case is a bit different in that I have

2021-06-10 07:40:03    分类:问答    java   webclient   project-reactor   reactor

Akka 或 Reactor [关闭](Akka or Reactor [closed])

问题 关闭。 这个问题是基于意见的。 它目前不接受答案。 想改善这个问题吗? 更新问题,以便通过编辑这篇文章用事实和引文来回答问题。 4年前关闭。 改进这个问题 我正在启动一个新项目(基于 Java)。 我需要将其构建为模块化、分布式和弹性架构。 因此,我希望业务流程能够相互通信,可以互操作,但也可以独立。 我现在正在研究两个框架,除了它们的年龄差异外,还表达了两种不同的观点: 阿卡 (http://akka.io) 反应器(https://github.com/reactor/reactor) 在选择上述框架之一时,我应该考虑什么? 到目前为止,据我所知,Akka 仍然以某种方式耦合(在某种程度上,我必须“选择”我想要向其发送消息的演员),但非常有弹性。 而 Reactor 是松散的(基于事件发布)。 有人可以帮助我了解如何做出正确的决定吗? 更新 在仔细阅读了 Akka 的 Event Bus 之后,我相信 Reactor 表达的功能在某种程度上已经包含在 Akka 中。 例如订阅和事件发布,记录在 https://github.com/reactor/reactor#events-selectors-and-consumers,可以用 Akka 表示如下: final ActorSystem system = ActorSystem.create("system")

2021-06-10 05:56:51    分类:技术分享    java   spring   akka   reactor   project-reactor

How to convert Reactor Flux<String> to InputStream

Given that I have a Flux<String> of unknown size, how can I convert it into InputStream that other library is expecting? For example with WebClient I can achieve that using this approach WebClient.get('example.com').exchange.flatMap { it.bodyToMono(InputStreamResource::class.java) }.map { it.inputStream } but I can't figure out how to do the same when I have Flux<String> as an input?

2021-06-09 18:56:36    分类:问答    kotlin   spring-webflux   project-reactor   reactive-streams