天道酬勤,学无止境

Executors

如何以线程安全的方式访问 ThreadpoolExecutor 的底层队列(How to access the underlying queue of a ThreadpoolExecutor in a thread safe way)

问题 getQueue() 方法提供对 ThreadPoolExecutor 中底层阻塞队列的访问,但这似乎并不安全。 遍历此函数返回的队列可能会错过 ThreadPoolExecutor 对队列所做的更新。 “方法 getQueue() 允许出于监视和调试的目的访问工作队列。强烈建议不要将此方法用于任何其他目的。” 如果你想遍历 ThreadPoolExecutor 使用的 workQueue,你会怎么做? 或者有替代方法吗? 这是.. 为生产者消费者问题的变体选择数据结构的延续 现在,我正在尝试多个生产者多个消费者,但我想使用一些现有的线程池,因为我不想自己管理线程池,而且我还想要在 ThreadPoolExecutor 完成执行某些任务时进行回调以及检查的能力以线程安全的方式处理“正在进行的事务”数据结构。 回答1 您可以覆盖 beforeExecute 和 afterExecute 方法,让您知道任务已经开始和完成。 您可以覆盖 execute() 以了解何时添加任务。 您遇到的问题是 Queue 不是为查询而设计的,并且可以在您看到任务之前使用它。 解决此问题的一种方法是创建您自己的队列实现(可能覆盖/包装 ConcurrentLinkedQueue) 顺便说一句:队列是线程安全的,但是不能保证您会看到每个条目。 A ConcurrentLinkedQueue

2021-11-11 06:59:09    分类:技术分享    java   Executors

How to access the underlying queue of a ThreadpoolExecutor in a thread safe way

The getQueue() method provides access to the underlying blocking queue in the ThreadPoolExecutor, but this does not seem to be safe. A traversal over the queue returned by this function might miss updates made to the queue by the ThreadPoolExecutor. "Method getQueue() allows access to the work queue for purposes of monitoring and debugging. Use of this method for any other purpose is strongly discouraged." What would you do if you wanted to traverse the workQueue used by the ThreadPoolExecutor? Or is there an alternate approach? This is a continuation of.. Choosing a data structure for a

2021-11-06 01:39:19    分类:问答    java   Executors

如何从 spark 执行程序中读取/写入配置单元表(How to read/write a hive table from within the spark executors)

问题 我有一个要求,其中我使用 DStream 从 Kafka 检索消息。 现在,在获取消息或 RDD 之后,我使用映射操作在执行器上独立处理消息。 我面临的一个挑战是我需要从执行程序中读取/写入配置单元表,为此我需要访问 SQLContext。 但据我所知 SparkSession 仅在驱动程序端可用,不应在执行程序中使用。 现在没有 spark 会话(在 spark 2.1.1 中)我无法掌握 SQLContext。 总结一下我的驱动程序代码看起来像: if (inputDStream_obj.isSuccess) { val inputDStream = inputDStream_obj.get inputDStream.foreachRDD(rdd => { if (!rdd.isEmpty) { val rdd1 = rdd.map(idocMessage => SegmentLoader.processMessage(props, idocMessage.value(), true)) } } 所以在这个 rdd.map 之后,下一个代码在执行器上执行,我有类似的东西: val sqlContext = spark.sqlContext import sqlContext.implicits._ spark.sql("USE " + databaseName) val

2021-10-15 17:50:51    分类:技术分享    apache-spark   Executors

如何知道线程何时停止或停止?(How to know when a thread stops or is stopped?)

问题 我有下一个代码: Executor exe = Executors.newFixedThreadPool(20); while (true) { try { exe.execute(new DispatcherThread(serverSocket.accept())); continue; } catch (SocketException sExcp) { System.exit(-1); } catch (Exception excp) { System.exit(-1); } } 对于每个DispatcherThread我都创建了一个到数据库的连接(这意味着我有 20 个连接),我需要知道的是,当线程停止或停止或完成其流程时,如何关闭与数据库的连接。 回答1 您无法直接知道线程何时停止,您拥有的最接近的方法是Thread#isAlive方法,但是当线程中的run方法完成时它可能会返回false ,但线程可能不会停止,因为 JVM 无法保证它。 但是如果你的DispatcherThread类实现了Runnable接口,那么你可以在run方法的底部编写清理。 骨架代码: class DispatcherThread implements Runnable { @Override public void run() { try { //open database

2021-10-10 06:15:36    分类:技术分享    java   database   multithreading   runnable   Executors

How to know when a thread stops or is stopped?

I have the next code: Executor exe = Executors.newFixedThreadPool(20); while (true) { try { exe.execute(new DispatcherThread(serverSocket.accept())); continue; } catch (SocketException sExcp) { System.exit(-1); } catch (Exception excp) { System.exit(-1); } } For each DispatcherThread I create a connection to the database (it means I have 20 connections), what I need to know is how I can close the connection to the database when the thread is stopped or it stops or finishes its flow.

2021-10-10 05:54:45    分类:问答    java   database   multithreading   runnable   Executors

如何正确使用 ScheduledExecutorService?(How to correctly use ScheduledExecutorService?)

问题 所以这是我第一次使用 ScheduledFuture,我承认我在这里可能有点不知所措。 我似乎无法让下面的示例工作。 目标只是采取两组动作,每组动作都有自己的超时,然后再继续下一组并无限重复。 static ScheduledExecutorService executor = Executors.newScheduledThreadPool(2); ScheduledFuture<?> warm = executor.scheduleWithFixedDelay(() -> { System.out.println("warmbeans"); //do more stuff here }, 0, 1000, TimeUnit.MILLISECONDS); ScheduledFuture<?> cool = executor.scheduleWithFixedDelay(() -> { System.out.println("coolbeans"); //do more stuff here }, 0, 500, TimeUnit.MILLISECONDS); while(true) { try {warm.get(1000, TimeUnit.MILLISECONDS);} catch (InterruptedException | ExecutionException |

2021-09-20 19:01:04    分类:技术分享    java   scheduledexecutorservice   Executors

如何正确使用 ScheduledExecutorService?(How to correctly use ScheduledExecutorService?)

问题 所以这是我第一次使用 ScheduledFuture,我承认我在这里可能有点不知所措。 我似乎无法让下面的示例工作。 目标只是采取两组动作,每组动作都有自己的超时,然后再继续下一组并无限重复。 static ScheduledExecutorService executor = Executors.newScheduledThreadPool(2); ScheduledFuture<?> warm = executor.scheduleWithFixedDelay(() -> { System.out.println("warmbeans"); //do more stuff here }, 0, 1000, TimeUnit.MILLISECONDS); ScheduledFuture<?> cool = executor.scheduleWithFixedDelay(() -> { System.out.println("coolbeans"); //do more stuff here }, 0, 500, TimeUnit.MILLISECONDS); while(true) { try {warm.get(1000, TimeUnit.MILLISECONDS);} catch (InterruptedException | ExecutionException |

2021-09-20 18:59:32    分类:技术分享    java   scheduledexecutorservice   Executors

Runnable 和 Callable 上的 ThreadPool / Executor - 可以自动退出 java8(ThreadPool / Executor on both Runnable and Callable - that can exits automatically java8)

问题 我正在尝试在我的应用程序中使用 ThreadPoolExecutor/ExecutorService - 它是一个静态全局对象。 我使用:Executors.newScheduledThreadPool(corePoolSize) - 但我在关闭 executorService 时遇到问题。 如果我不调用 shutdown() + awaitTermination() - 那么我的应用程序将不会完成 - 即使所有线程都已完成。 我的应用程序具有由其他线程创建的线程 - 所以我不能在代码中的任何地方放置一个 shutdown() 而不阻塞更多要运行的线程。 有没有办法在所有线程完成后让 java shutdown() 执行程序服务。 谢谢 -- 此外,这个问题不是重复的-守护程序修复使我的应用程序在所有线程完成之前退出(请参阅评论)-对此有什么解决方法? 谢谢

2021-09-19 05:55:46    分类:技术分享    java   multithreading   executorservice   threadpoolexecutor   Executors

ThreadPool / Executor on both Runnable and Callable - that can exits automatically java8

I am trying to use a ThreadPoolExecutor/ExecutorService in my application - it is a static global object. I use: Executors.newScheduledThreadPool(corePoolSize) - but I am having problems with shutting down the executorService. If I don't call shutdown() + awaitTermination() - then my application won't finish -even if all threads are completed. My application has threads being created by other threads - so I can't put a shutdown() anywhere in the code without blocking further threads to be run. Is there a way to let java shutdown() the executor service when all threads are completed. Thanks --

2021-09-04 12:30:22    分类:问答    java   multithreading   executorservice   threadpoolexecutor   Executors

ScheduledThreadPoolExecutors 和自定义队列(ScheduledThreadPoolExecutors and custom queue)

问题 如果我使用ThreadPoolExecutor我有各种构造函数,我可以为池的工作队列传递/使用我自己的队列。 现在我看到ScheduledThreadPoolExecutor是ThreadPoolExecutor的子类,但构造函数要少得多。 有没有办法使用ScheduledThreadPoolExecutor并仍然使用我自己的工作队列? 回答1 您可以扩展ScheduledThreadPoolExecutor类并使用与绑定到当前ScheduledThreadPoolExecutor实现的DelayedWorkQueue不同的队列。 请注意, DelayedWorkQueue只是在幕后使用DelayQueue的BlockingQueue实现。 但是,如果您只需要配置 min、max、keepAlive 或其他参数(不需要更改DelayedWorkQueue ),您将只扩展ThreadPoolExecutor (类似于ScheduledThreadPoolExecutor正在执行的操作)并且在您的构造函数中您将执行类似于ScheduledThreadPoolExecutor构造函数现在正在做,委托给ThreadPoolExecutor像: super(min, max, keepAliveTime, TimeUnit.NANOSECONDS, new CustomQueue()

2021-08-11 03:08:42    分类:技术分享    java   multithreading   concurrency   executorservice   Executors