天道酬勤,学无止境

Spark Framework: Listen for server stop

Is there a way to listen for when Spark framework is shutting down, to run some cleanup? For example, I want to close out my ElasticSearch client.

评论

As @Martin Eden explains an approach is to use Runtime.getRuntime().addShutdownHook(...); but this has nothing to do with spark server (jetty) lifecycle. Actually, you could stop the server without stopping the app and this shutdown hook wouldn't run any hook-cleanup added to the runtime. So this would cleanup your app if you stop it.

Another option is to add a Lifecycle (managed) bean bean in Jetty and set the property stop at shutdown as true server.setStopAtShutdown(true);.

EmbeddedServers.add(EmbeddedServers.Identifiers.JETTY, new MyJettyFactory());

static class MyJettyFactory implements EmbeddedServerFactory {

      public EmbeddedServer create(Routes routeMatcher, StaticFilesConfiguration staticFilesConfiguration, boolean hasMultipleHandler) {
          MatcherFilter matcherFilter = new MatcherFilter(routeMatcher, staticFilesConfiguration, false, hasMultipleHandler);
          matcherFilter.init(null);

          final Handler handler = new JettyHandler(matcherFilter);

          return new EmbeddedJettyServer((maxThreads, minThreads, threadTimeoutMillis) -> {
            final Server server = new Server();
            server.setStopAtShutdown(true);
            server.setStopTimeout(Duration.of(30, ChronoUnit.SECONDS).toMillis());
            server.addBean(new ManagedObjects(new Managed() {
                @Override
                public void doStart() {

                }

                @Override
                public void doStop() {
                    System.out.println("Good bye!");
                }
            }));
            return server;
        }, handler);

    }
}

If you don't set a stop shutdown you could register when the spark service stops:

final Service ignite = Service.ignite();
Runtime.getRuntime().addShutdownHook(service::stop);

One approach is to use Runtime.getRuntime().addShutdownHook().

This is a general Java mechanism for running code when the program exits. Since a Spark Framework web application is just a normal web application, this will work. See the Java documentation.

However, this hook will not be run if the VM process is aborted using the SIGKILL signal on Unix or the TerminateProcess call on Microsoft Windows. Note that this applies to pressing the "Stop" button in most IDEs.

受限制的 HTML

  • 允许的HTML标签:<a href hreflang> <em> <strong> <cite> <blockquote cite> <code> <ul type> <ol start type> <li> <dl> <dt> <dd> <h2 id> <h3 id> <h4 id> <h5 id> <h6 id>
  • 自动断行和分段。
  • 网页和电子邮件地址自动转换为链接。

相关推荐
  • 创建 SparkContext 失败(Fail to create SparkContext)
    问题 我正在使用 Scala 代码在 spark-shell 中测试 Spark。 我正在构建原型以使用 Kafka 和 Spark。 我运行了如下所示的spark-shell 。 spark-shell --jars ~/spark/external/kafka-assembly/target/spark-streaming-kafka-assembly_2.10-1.3.1.jar 我在 shell 中运行了下面的代码。 import kafka.serializer.StringDecoder import org.apache.spark.streaming._ import org.apache.spark.streaming.kafka._ import org.apache.spark.SparkConf // Create context with 2 second batch interval val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount") val ssc = new StreamingContext(sparkConf, Seconds(2) ) 然后我在创建ssc时发现了错误。 spark-shell告诉我如下消息。 scala> val ssc = new
  • Fail to create SparkContext
    I'm testing Spark in spark-shell with scala code. I'm building up the prototype to use Kafka and Spark. I ran the spark-shell like below. spark-shell --jars ~/spark/external/kafka-assembly/target/spark-streaming-kafka-assembly_2.10-1.3.1.jar And I ran the code below in the shell. import kafka.serializer.StringDecoder import org.apache.spark.streaming._ import org.apache.spark.streaming.kafka._ import org.apache.spark.SparkConf // Create context with 2 second batch interval val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount") val ssc = new StreamingContext(sparkConf, Seconds(2) )
  • 使用 Spark Web 框架时如何使用本机 Servlet 过滤器?(How can a native Servlet Filter be used when using Spark web framework?)
    问题 我在玩 Spark(Java Web 框架,而不是 Apache Spark)。 我发现定义路由和过滤器真的很好也很容易,但是我希望将本机 servlet 过滤器应用于我的路由,但似乎无法找到一种方法来做到这一点。 更具体地说,我想使用 Jetty 的 DoSFilter,它是一个 servlet 过滤器(与 Spark 过滤器定义相反)。 由于 Spark 使用嵌入式 Jetty,我没有 web.xml 来注册 DoSFilter。 但是,Spark 不公开服务器实例,因此我也找不到以编程方式注册过滤器的优雅方法。 有没有办法将本机 servlet 过滤器应用于我的路由? 我想将 DoSFilter 包装在我自己的 Spark 过滤器中,但这似乎是一个奇怪的想法。 回答1 你可以这样做: public class App { private static Logger LOG = LoggerFactory.getLogger(App.class); public static void main(String[] args) throws Exception { ServletContextHandler mainHandler = new ServletContextHandler(); mainHandler.setContextPath("/base/path")
  • Spark Shell 监听本地主机而不是配置的 IP 地址(Spark Shell Listens on localhost instead of configured IP address)
    问题 我正在尝试通过 spark-shell 运行一个简单的 spark 作业,它看起来像 spark-shell 的 BlockManager 侦听 localhost 而不是配置的 IP 地址,这会导致 spark 作业失败。 抛出的异常是“无法连接到本地主机”。 这是我的配置: 机器 1(ubunt64):Spark master [192.168.253.136] 机器 2(ubuntu64server):Spark Slave [192.168.253.137] 机器 3(ubuntu64server2):Spark Shell 客户端[192.168.253.138] Spark 版本: spark-1.3.0-bin-hadoop2.4环境: Ubuntu 14.04 要在 Spark Shell 中执行的源代码: import org.apache.spark.SparkConf import org.apache.spark.SparkContext var conf = new SparkConf().setMaster("spark://192.168.253.136:7077") conf.set("spark.driver.host","192.168.253.138") conf.set("spark.local.ip","192.168.253.138
  • Spark Streaming from Kafka has error numRecords must not be negative
    Its kind of strange error because I still push data to kafka and consume message from kafka and Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: numRecords must not be negative is kind of strange too. I search and don't get any resource related to. Let me explain my cluster. I have 1 server is master and agents run mesos, on that I set up 3 brokers of kafka like that. Then I run spark-job on that cluster. I am using spark 1.5.2 brokers: id: 0 active: true state: running resources: cpus:1.00, mem:1024, heap:512, port:31000 failover: delay:1m, max-delay:10m
  • 来自 Kafka 的 Spark Streaming 有错误 numRecords 不能为负(Spark Streaming from Kafka has error numRecords must not be negative)
    问题 这是一种奇怪的错误,因为我仍然将数据推送到 kafka 并Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: numRecords must not be negative使用来自 kafka 和Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: numRecords must not be negative消息Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: numRecords must not be negative也有点奇怪。 我搜索并没有得到任何相关的资源。 让我解释一下我的集群。 我有 1 个服务器是主服务器,代理运行 mesos,我在上面设置了 3 个 kafka 代理。 然后我在该集群上运行 spark-job。 我正在使用spark 1.5.2 brokers: id: 0 active: true state: running resources: cpus:1.00, mem:1024, heap:512, port
  • nodejs 没有停止(nodejs without stopping)
    问题 有没有办法让 nodejs 每次服务页面时重新加载? 我想在开发周期中这样做,这样我就可以避免在每次代码更改时都必须关闭和启动? 回答1 编辑:尝试结节及其require.reloadable()函数。 我之前的回答是关于为什么不重新加载 Node.js 的过程,在这里并不真正适用。 但我认为它仍然很重要,所以我把它留在这里。 Node.js 是事件化的 IO ,专门为避免多个线程或进程而设计。 著名的 C10k 问题询问如何同时为 1 万个客户提供服务。 这是线程不能很好地工作的地方。 Node.js 只用一个线程就可以服务 1 万个客户端。 如果您每次都重新启动 Node.js,则会严重削弱 Node.js。 事件 IO是什么意思? 以您的示例为例:提供页面。 每次 Node.js 即将服务一个页面时,事件循环都会调用一个回调。 事件循环是每个 Node.js 应用程序固有的,并在初始化完成后开始运行。 服务器端的 Node.js 与浏览器中的客户端 Javascript 完全一样。 每当事件(鼠标单击、超时等)发生时,都会调用回调 - 一个事件处理程序。 在服务器端? 让我们看一个简单的 HTTP 服务器(源代码示例取自 Node.js 文档) var http = require('http'); http.createServer(function (request
  • WSL2-Ubuntu18.04 Linux环境下部署大数据开发测试环境
    目录 hadoop3.2.1+Hive3.1.2+HBase2.2.4+Phoenix5.0.0+Zookeeper3.6.1+Kylin3.0.2+Kafka2.5.0+Scala2.12+Spark3.0.0+Flink1.10.1+Tez0.10.1 ...Windows子系统WSL 2 部署与应用系统设置初始化部分Mysql 安装部署配置部分WSL 下初始化服务安装部署JavaHadoop 3.2.1Hive 3.1.2HBase 2.2.4Phoenix 5.0.0Zookeeper 3.6.1Kylin 3.0.2Kafka 2.5.0MaxwellsScala 2.12.11Spark 3.0.0Flink 1.10.1Tez 0.10.1启动服务关闭服务 hadoop3.2.1+Hive3.1.2+HBase2.2.4+Phoenix5.0.0+Zookeeper3.6.1+Kylin3.0.2+Kafka2.5.0+Scala2.12+Spark3.0.0+Flink1.10.1+Tez0.10.1 … Windows子系统WSL 2 部署与应用 参考Windows子系统WSL 2 部署与应用 系统设置初始化部分 root@kylin:~# sudo apt update && sudo apt upgrade root@kylin:~# cd /root
  • Apache Spark and Java error - Caused by: java.lang.StringIndexOutOfBoundsException: begin 0, end 3, length 2
    I am new in spark framework. I have tried to create a sample application using spark and java. I have the following code Pom.xml <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.10</artifactId> <version>1.6.1</version> </dependency> Source import org.apache.spark.SparkConf; import org.apache.spark.api.java.*; public class SparkTest { public static void main(String[] args) { SparkConf sparkConf = new SparkConf() .setAppName("Example Spark App") .setMaster("local[*]"); // Delete this line when submitting to a cluster JavaSparkContext sparkContext = new JavaSparkContext
  • Spark ExecutorLostFailure
    I'm trying to run spark 1.5 on mesos in cluster mode. I'm able to launch the dispatcher and to run the spark-submit. But when I do so, the spark driver fails with the following: I1111 16:21:33.515130 25325 fetcher.cpp:414] Fetcher Info: {"cache_directory":"\/tmp\/mesos\/fetch\/slaves\/2bbe0c3b-433b-45e0-938b-f4d4532df129-S29","items":[{"action":"BYPASS_CACHE","uri":{"extract":true,"value":"\/home\/optimus.prime\/Test.jar"}}],"sandbox_directory":"\/tmp\/mesos\/slaves\/2bbe0c3b-433b-45e0-938b-f4d4532df129-S29\/frameworks\/2bbe0c3b-433b-45e0-938b-f4d4532df129-0114\/executors\/driver
  • Linux MySQL 常见无法启动或启动异常的解决方案
    Linux MySQL 常见无法启动或启动异常的解决方案   在 Linux 上自建 MySQL 服务器,经常遇到各种无法启动或启动后异常的问题,本文列举一些常见问题的解决办法。  注意:以下错误日志提示,都是查看 MySQL 错误日志得到,查看方法如下:  查看下 MySQL 配置文件 my.cnf 中有记录,日志记录在 /alidata/log/mysql/error.log 下   MySQL 配置文件 my.cnf 权限问题导致无法启动,错误提示:World-writable config file '/etc/my.cnf' is ignored  Binlog 丢失导致无法启动,错误日志: File './mysql-bin.000001' not found  Binlog 无法读取导致无法启动,错误日志:Failed to open log (file './mysql-bin.000001', errno 13)  不能创建 PID 导致无法启动,错误日志:Can't start server: can't create PID file: No such file or directory  不能创建临时文件导致无法启动,错误日志:mysqld: Can't create/write to file '/tmp/ibfguTtC' (Errcode: 13)
  • Spark ExecutorLostFailure(Spark ExecutorLostFailure)
    问题 我正在尝试以集群模式在 mesos 上运行 spark 1.5。 我能够启动调度程序并运行 spark-submit。 但是当我这样做时,火花驱动程序失败并显示以下内容: I1111 16:21:33.515130 25325 fetcher.cpp:414] Fetcher Info: {"cache_directory":"\/tmp\/mesos\/fetch\/slaves\/2bbe0c3b-433b-45e0-938b-f4d4532df129-S29","items":[{"action":"BYPASS_CACHE","uri":{"extract":true,"value":"\/home\/optimus.prime\/Test.jar"}}],"sandbox_directory":"\/tmp\/mesos\/slaves\/2bbe0c3b-433b-45e0-938b-f4d4532df129-S29\/frameworks\/2bbe0c3b-433b-45e0-938b-f4d4532df129-0114\/executors\/driver-20151111162132-0036\/runs\/f0e8f4d7-35cb-4b73-bb5f-1112de2d8156"} I1111 16:21:33.516376 25325 fetcher
  • Linux MySQL 常见无法启动或启动异常的解决方案-学习收藏-20210411
    Linux MySQL 常见无法启动或启动异常的解决方案 Linux MySQL 常见无法启动或启动异常的解决方案   在 Linux 上自建 MySQL 服务器,经常遇到各种无法启动或启动后异常的问题,本文列举一些常见问题的解决办法。   注意:以下错误日志提示,都是查看 MySQL 错误日志得到,查看方法如下:   查看下 MySQL 配置文件 my.cnf 中有记录,日志记录在 /alidata/log/mysql/error.log 下   MySQL 配置文件 my.cnf 权限问题导致无法启动,错误提示:World-writable config file '/etc/my.cnf' is ignored   Binlog 丢失导致无法启动,错误日志: File './mysql-bin.000001' not found   Binlog 无法读取导致无法启动,错误日志:Failed to open log (file './mysql-bin.000001', errno 13)   不能创建 PID 导致无法启动,错误日志:Can't start server: can't create PID file: No such file or directory   不能创建临时文件导致无法启动,错误日志:mysqld: Can't create/write to
  • Spark Shell Listens on localhost instead of configured IP address
    I am trying to run a simple spark job via spark-shell and it looks like BlockManager for the spark-shell listens on localhost instead of configured IP address which causes the spark job to fail. The exception thrown is "Failed to connect to localhost" . Here is the my configuration: Machine 1(ubunt64): Spark master [192.168.253.136] Machine 2(ubuntu64server): Spark Slave [192.168.253.137] Machine 3(ubuntu64server2): Spark Shell Client[192.168.253.138] Spark Version: spark-1.3.0-bin-hadoop2.4 Environment: Ubuntu 14.04 Source Code to be executed in Spark Shell: import org.apache.spark.SparkConf
  • 分布式资源调度——YARN框架
    YARN产生背景 YARN是Hadoop2.x才有的,所以在介绍YARN之前,我们先看一下MapReduce1.x时所存在的问题: 单点故障 节点压力大 不易扩展 MapReduce1.x时的架构如下: 可以看到,1.x时也是Master/Slave这种主从结构,在集群上的表现就是一个JobTracker带多个TaskTracker。 JobTracker:负责资源管理和作业调度TaskTracker:定期向JobTracker汇报本节点的健康状况、资源使用情况以及作业执行情况。还可以接收来自JobTracker的命令,例如启动任务或结束任务等。 那么这种架构存在哪些问题呢: 整个集群中只有一个JobTracker,就代表着会存在单点故障的情况 JobTracker节点的压力很大,不仅要接收来自客户端的请求,还要接收大量TaskTracker节点的请求 由于JobTracker是单节点,所以容易成为集群中的瓶颈,而且也不易域扩展 JobTracker承载的职责过多,基本整个集群中的事情都是JobTracker来管理 1.x版本的整个集群只支持MapReduce作业,其他例如Spark的作业就不支持了 由于1.x版本不支持其他框架的作业,所以导致我们需要根据不同的框架去搭建多个集群。这样就会导致资源利用率比较低以及运维成本过高,因为多个集群会导致服务环境比较复杂。如下图:
  • queue:work --daemon和queue:listen有什么区别(What is the difference between queue:work --daemon and queue:listen)
    问题 我正在设置离线作业服务器。 我已经阅读了文档,但是仍然看不到两个命令之间的区别: artisan queue:work --daemon和artisan queue:listen 。 我应该使用哪个命令来运行守护程序? 回答1 编辑更新2017-04-07: 现在有三种运行队列的方法: queue:work这是新的“守护程序”进程(不再需要该标志)。 该框架将“一次”启动-然后继续循环执行这些作业。 这将无限期地继续下去。 它使用的内存/ CPU少于queue:listen因为框架会一直保持运行状态。 您还必须记住使用queue:restart来强制队列更新在修补过程中推送的所有代码更改。 queue:work --once这将启动框架,处理一项工作,然后关闭。 在开发等过程中进行测试时很有用。 queue:listen这将在每个周期启动框架,处理一个作业,然后完全关闭,然后再次启动框架,并无限期循环。 这意味着在处理每个作业后释放所有内存/进程。 如果您的queue:work内存泄漏,请尝试一下。 --daemon标志不再对这些命令产生影响。 原始答案: 列出了两个不同的问题。 有artisan queue:work和artisan queue:listen queue:work将仅弹出queue:work的下一个作业,并仅处理该作业。 这是一个“一次性”命令
  • 根据Scala和Spark中的频率替换二元组(Replace bigrams based on their frequency in Scala and Spark)
    问题 我想用此模式(word1.concat("-").concat(word2))替换其频率计数大于阈值的所有双(word1.concat("-").concat(word2)) ,我已经尝试过: import org.apache.spark.{SparkConf, SparkContext} object replace { def main(args: Array[String]): Unit = { val conf = new SparkConf() .setMaster("local") .setAppName("replace") val sc = new SparkContext(conf) val rdd = sc.textFile("data/ddd.txt") val threshold = 2 val searchBigram=rdd.map { _.split('.').map { substrings => // Trim substrings and then tokenize on spaces substrings.trim.split(' '). // Remove non-alphanumeric characters and convert to lowercase map { _.replaceAll( """\W""", "")
  • 如何通过 REST API 提交作业?(How to submit a job via REST API?)
    问题 我正在使用 Datastax Enterprise 4.8.3。 我正在尝试实现基于 Quartz 的应用程序来远程提交 Spark 作业。 在我的研究过程中,我偶然发现了以下链接: Apache Spark 隐藏的 REST API Spark 特性 - 在独立集群模式下提供稳定的应用提交网关 为了测试这个理论,我尝试在我的 2 节点集群的主节点(IP:“spark-master-ip”;直接在 shell 上)上执行以下代码片段(如上面链接 #1 中提供的): curl -X POST http://spark-master-ip:6066/v1/submissions/create --header "Content-Type:application/json;charset=UTF-8" --data '{ "action" : "CreateSubmissionRequest", "appArgs" : [ "myAppArgument1" ], "appResource" : "file:/home/local/sparkjob.jar", "clientSparkVersion" : "1.4.2", "environmentVariables" : { "SPARK_ENV_LOADED" : "1" }, "mainClass" : "com.spark
  • 停止TcpListener的正确方法(Proper way to stop TcpListener)
    问题 我目前正在使用TcpListener来处理传入的连接,每个连接都有一个处理通信的线程,然后关闭该单个连接。 代码如下: TcpListener listener = new TcpListener(IPAddress.Any, Port); System.Console.WriteLine("Server Initialized, listening for incoming connections"); listener.Start(); while (listen) { // Step 0: Client connection TcpClient client = listener.AcceptTcpClient(); Thread clientThread = new Thread(new ParameterizedThreadStart(HandleConnection)); clientThread.Start(client.GetStream()); client.Close(); } listen变量是一个布尔值,它是类中的一个字段。 现在,当程序关闭时,我希望它停止监听客户端。 将listen设置为false将阻止它建立更多连接,但是由于AcceptTcpClient是阻塞调用,因此它至少会占用下一个客户端,然后退出。
  • Spark Streaming(一)—— Spark Streaming介绍
    文章目录 1. 什么是Spark Streaming2. Spark Streaming特点3. 常用的实时计算引擎4. Spark Streaming内部结构5. StreamingContext对象创建方式6. 离散流DStream6.1 什么是DStream6.2 DStream中的算子 7. 窗口 1. 什么是Spark Streaming Spark Streaming makes it easy to build scalable fault-tolerant streaming applications. 易于构建灵活的、高容错的流式系统。 流式计算框架 Spark Streaming是核心Spark API的扩展,可实现可扩展、高吞吐量、可容错的实时数据流处理。数据可以从诸如Kafka,Flume,Kinesis或TCP套接字等众多来源获取,并且可以使用由高级函数(如map,reduce,join和window)开发的复杂算法进行流数据处理。最后,处理后的数据可以被推送到文件系统,数据库和实时仪表板。而且还可以在数据流上应用Spark提供的机器学习和图处理算法。 2. Spark Streaming特点 易用:已经集成在Spark中容错性:底层也是RDD,RDD本身就具备了容错机制。支持多种语言:Java Scala Python 3. 常用的实时计算引擎