天道酬勤,学无止境

Spark 框架:监听服务器停止(Spark Framework: Listen for server stop)

问题

有没有办法在 Spark 框架关闭时进行侦听,以进行一些清理? 例如,我想关闭我的 ElasticSearch 客户端。

回答1

正如@Martin Eden 解释的那样,一种方法是使用Runtime.getRuntime().addShutdownHook(...); 但这与spark服务器(码头)生命周期无关。 实际上,您可以在不停止应用程序的情况下停止服务器,并且此关闭钩子不会运行添加到runtime任何钩子清理。 所以如果你停止它,这会清理你的应用程序。

另一种选择是在 Jetty 中添加一个 Lifecycle(托管)bean bean,并将属性 stop at shutdown 设置为 true server.setStopAtShutdown(true); .

EmbeddedServers.add(EmbeddedServers.Identifiers.JETTY, new MyJettyFactory());

static class MyJettyFactory implements EmbeddedServerFactory {

      public EmbeddedServer create(Routes routeMatcher, StaticFilesConfiguration staticFilesConfiguration, boolean hasMultipleHandler) {
          MatcherFilter matcherFilter = new MatcherFilter(routeMatcher, staticFilesConfiguration, false, hasMultipleHandler);
          matcherFilter.init(null);

          final Handler handler = new JettyHandler(matcherFilter);

          return new EmbeddedJettyServer((maxThreads, minThreads, threadTimeoutMillis) -> {
            final Server server = new Server();
            server.setStopAtShutdown(true);
            server.setStopTimeout(Duration.of(30, ChronoUnit.SECONDS).toMillis());
            server.addBean(new ManagedObjects(new Managed() {
                @Override
                public void doStart() {

                }

                @Override
                public void doStop() {
                    System.out.println("Good bye!");
                }
            }));
            return server;
        }, handler);

    }
}

如果您没有设置停止关闭,您可以在 spark 服务停止时注册:

final Service ignite = Service.ignite();
Runtime.getRuntime().addShutdownHook(service::stop);
回答2

一种方法是使用Runtime.getRuntime().addShutdownHook()

这是程序退出时运行代码的通用Java机制。 由于 Spark 框架 web 应用程序只是一个普通的 web 应用程序,这将起作用。 请参阅 Java 文档。

但是,如果使用 Unix 上的 SIGKILL 信号或 Microsoft Windows 上的 TerminateProcess 调用中止 VM 进程,则不会运行此挂钩。 请注意,这适用于在大多数 IDE 中按下“停止”按钮。

受限制的 HTML

  • 允许的HTML标签:<a href hreflang> <em> <strong> <cite> <blockquote cite> <code> <ul type> <ol start type> <li> <dl> <dt> <dd> <h2 id> <h3 id> <h4 id> <h5 id> <h6 id>
  • 自动断行和分段。
  • 网页和电子邮件地址自动转换为链接。

相关推荐
  • 使用 Spark Web 框架时如何使用本机 Servlet 过滤器?(How can a native Servlet Filter be used when using Spark web framework?)
    问题 我在玩 Spark(Java Web 框架,而不是 Apache Spark)。 我发现定义路由和过滤器真的很好也很容易,但是我希望将本机 servlet 过滤器应用于我的路由,但似乎无法找到一种方法来做到这一点。 更具体地说,我想使用 Jetty 的 DoSFilter,它是一个 servlet 过滤器(与 Spark 过滤器定义相反)。 由于 Spark 使用嵌入式 Jetty,我没有 web.xml 来注册 DoSFilter。 但是,Spark 不公开服务器实例,因此我也找不到以编程方式注册过滤器的优雅方法。 有没有办法将本机 servlet 过滤器应用于我的路由? 我想将 DoSFilter 包装在我自己的 Spark 过滤器中,但这似乎是一个奇怪的想法。 回答1 你可以这样做: public class App { private static Logger LOG = LoggerFactory.getLogger(App.class); public static void main(String[] args) throws Exception { ServletContextHandler mainHandler = new ServletContextHandler(); mainHandler.setContextPath("/base/path")
  • Spark Shell 监听本地主机而不是配置的 IP 地址(Spark Shell Listens on localhost instead of configured IP address)
    问题 我正在尝试通过 spark-shell 运行一个简单的 spark 作业,它看起来像 spark-shell 的 BlockManager 侦听 localhost 而不是配置的 IP 地址,这会导致 spark 作业失败。 抛出的异常是“无法连接到本地主机”。 这是我的配置: 机器 1(ubunt64):Spark master [192.168.253.136] 机器 2(ubuntu64server):Spark Slave [192.168.253.137] 机器 3(ubuntu64server2):Spark Shell 客户端[192.168.253.138] Spark 版本: spark-1.3.0-bin-hadoop2.4环境: Ubuntu 14.04 要在 Spark Shell 中执行的源代码: import org.apache.spark.SparkConf import org.apache.spark.SparkContext var conf = new SparkConf().setMaster("spark://192.168.253.136:7077") conf.set("spark.driver.host","192.168.253.138") conf.set("spark.local.ip","192.168.253.138
  • Spark Framework: Listen for server stop
    Is there a way to listen for when Spark framework is shutting down, to run some cleanup? For example, I want to close out my ElasticSearch client.
  • spark-streaming总结
    sparkStreaming总结 文章目录 sparkStreaming总结1、sparkStreaming概述1.1 SparkStreaming是什么1.2 SparkStreaming的组件 2、Spark Streaming编码实践3、Spark Streaming的状态操作3.1 updateStateByKey案例:updateStateByKey3.2 Windows 1、sparkStreaming概述 1.1 SparkStreaming是什么 它是一个可扩展,高吞吐具有容错性的流式计算框架 吞吐量:单位时间内成功传输数据的数量 之前我们接触的spark-core和spark-sql都是处理属于离线批处理任务,数据一般都是在固定位置上,通常我们写好一个脚本,每天定时去处理数据,计算,保存数据结果。这类任务通常是T+1(一天一个任务),对实时性要求不高。 但在企业中存在很多实时性处理的需求,例如:双十一的京东阿里,通常会做一个实时的数据大屏,显示实时订单。这种情况下,对数据实时性要求较高,仅仅能够容忍到延迟1分钟或几秒钟。 实时计算框架对比 Storm 流式计算框架以record为单位处理数据也支持micro-batch方式(Trident) Spark 批处理计算框架以RDD为单位处理数据支持micro-batch流式处理数据(Spark Streaming)
  • nodejs 没有停止(nodejs without stopping)
    问题 有没有办法让 nodejs 每次服务页面时重新加载? 我想在开发周期中这样做,这样我就可以避免在每次代码更改时都必须关闭和启动? 回答1 编辑:尝试结节及其require.reloadable()函数。 我之前的回答是关于为什么不重新加载 Node.js 的过程,在这里并不真正适用。 但我认为它仍然很重要,所以我把它留在这里。 Node.js 是事件化的 IO ,专门为避免多个线程或进程而设计。 著名的 C10k 问题询问如何同时为 1 万个客户提供服务。 这是线程不能很好地工作的地方。 Node.js 只用一个线程就可以服务 1 万个客户端。 如果您每次都重新启动 Node.js,则会严重削弱 Node.js。 事件 IO是什么意思? 以您的示例为例:提供页面。 每次 Node.js 即将服务一个页面时,事件循环都会调用一个回调。 事件循环是每个 Node.js 应用程序固有的,并在初始化完成后开始运行。 服务器端的 Node.js 与浏览器中的客户端 Javascript 完全一样。 每当事件(鼠标单击、超时等)发生时,都会调用回调 - 一个事件处理程序。 在服务器端? 让我们看一个简单的 HTTP 服务器(源代码示例取自 Node.js 文档) var http = require('http'); http.createServer(function (request
  • 盘点2019年晋升为Apache TLP以及进去Apache孵化器的大数据相关项目
    盘点2019年晋升为Apache TLP以及进去Apache孵化器的大数据相关项目 过往记忆大数据 过往记忆大数据 今天是 2019年的最后一天了,明天就是新的一年,在这里预祝大家元旦快乐!也感谢大家过去一年对小编的支持!在过去两年,本博客盘点了当年晋升为 Apache TLP(Apache Top-Level Project) 的大数据相关项目,具体参见《盘点2017年晋升为Apache TLP的大数据相关项目》、《盘点2018年晋升为Apache TLP的大数据相关项目》,继承这个惯例,本文将给大家盘点2019年晋升为 Apache TLP 的大数据相关项目,由于今年晋升成 TLP 的大数据项目很少,只有三个,而且其中两个好像和我们日常关系不大;所以这次我把今年提交到 Apache 孵化器的大数据相关项目也加进来了。项目的介绍从孵化器毕业的时间开始排的,加上几年进入孵化的项目,一共有六个,具体如下。 Apache Airflow:开源分布式任务调度框架 Apache Airflow 是一个灵活的,可扩展的工作流自动化和调度系统,用于管理数百 PB 的大数据处理管道。该项目最初由 Airbnb 于2014年开发,于2016年03月进入提交给 Apache 孵化器,2019年01月08日 Apache 基金会正式宣布其成为 Apache 顶级项目。 Apache Airflow
  • Linux MySQL 常见无法启动或启动异常的解决方案
    Linux MySQL 常见无法启动或启动异常的解决方案   在 Linux 上自建 MySQL 服务器,经常遇到各种无法启动或启动后异常的问题,本文列举一些常见问题的解决办法。  注意:以下错误日志提示,都是查看 MySQL 错误日志得到,查看方法如下:  查看下 MySQL 配置文件 my.cnf 中有记录,日志记录在 /alidata/log/mysql/error.log 下   MySQL 配置文件 my.cnf 权限问题导致无法启动,错误提示:World-writable config file '/etc/my.cnf' is ignored  Binlog 丢失导致无法启动,错误日志: File './mysql-bin.000001' not found  Binlog 无法读取导致无法启动,错误日志:Failed to open log (file './mysql-bin.000001', errno 13)  不能创建 PID 导致无法启动,错误日志:Can't start server: can't create PID file: No such file or directory  不能创建临时文件导致无法启动,错误日志:mysqld: Can't create/write to file '/tmp/ibfguTtC' (Errcode: 13)
  • 网易Spark Kyuubi核心架构设计与源码实现剖析
    版权声明:本文为xpleaf(香飘叶子)博主原创文章,遵循CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。 本文较为系统、全面并且由浅入深地介绍了网易Spark Kyuubi出现的背景、核心架构设计与关键源码实现,是学习、应用和对Kyuubi进行二次开发不可多得的技术干货,但由于作者认知水平有限,文中难免会出现描述不准确的措辞,还请多多包容和指出。 1 概述 Kyuubi是网易数帆旗下易数大数据团队开源的一个高性能的通用JDBC和SQL执行引擎,建立在Apache Spark之上,Kyuubi的出现,较好的弥补了Spark ThriftServer在多租户、资源隔离和高可用等方面的不足,是一个真正可以满足大多数生产环境场景的开源项目。 通过分析Spark ThriftServer的设计与不足,本文会逐渐带你深入理解Kyuubi的核心设计与实现,同时会选取多个关键场景来剖析其源码,通过本文的阅读,希望能让读者对网易Kyuubi的整体架构设计有一个较为清晰的理解,并能够用在自己的生产环境中解决更多实际应用问题。 本文主要主要选取Kyuubi 1.1.0版本来对其设计与实现进行分析,后续的版本迭代社区加入了数据湖等概念和实现,本文不会对这方面的内容进行探讨。 2 Spark ThriftServer的设计、实现与不足 2.1 产生背景 在最初使用Spark时
  • 如何通过 REST API 提交作业?(How to submit a job via REST API?)
    问题 我正在使用 Datastax Enterprise 4.8.3。 我正在尝试实现基于 Quartz 的应用程序来远程提交 Spark 作业。 在我的研究过程中,我偶然发现了以下链接: Apache Spark 隐藏的 REST API Spark 特性 - 在独立集群模式下提供稳定的应用提交网关 为了测试这个理论,我尝试在我的 2 节点集群的主节点(IP:“spark-master-ip”;直接在 shell 上)上执行以下代码片段(如上面链接 #1 中提供的): curl -X POST http://spark-master-ip:6066/v1/submissions/create --header "Content-Type:application/json;charset=UTF-8" --data '{ "action" : "CreateSubmissionRequest", "appArgs" : [ "myAppArgument1" ], "appResource" : "file:/home/local/sparkjob.jar", "clientSparkVersion" : "1.4.2", "environmentVariables" : { "SPARK_ENV_LOADED" : "1" }, "mainClass" : "com.spark
  • 创建 SparkContext 失败(Fail to create SparkContext)
    问题 我正在使用 Scala 代码在 spark-shell 中测试 Spark。 我正在构建原型以使用 Kafka 和 Spark。 我运行了如下所示的spark-shell 。 spark-shell --jars ~/spark/external/kafka-assembly/target/spark-streaming-kafka-assembly_2.10-1.3.1.jar 我在 shell 中运行了下面的代码。 import kafka.serializer.StringDecoder import org.apache.spark.streaming._ import org.apache.spark.streaming.kafka._ import org.apache.spark.SparkConf // Create context with 2 second batch interval val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount") val ssc = new StreamingContext(sparkConf, Seconds(2) ) 然后我在创建ssc时发现了错误。 spark-shell告诉我如下消息。 scala> val ssc = new
  • Fail to create SparkContext
    I'm testing Spark in spark-shell with scala code. I'm building up the prototype to use Kafka and Spark. I ran the spark-shell like below. spark-shell --jars ~/spark/external/kafka-assembly/target/spark-streaming-kafka-assembly_2.10-1.3.1.jar And I ran the code below in the shell. import kafka.serializer.StringDecoder import org.apache.spark.streaming._ import org.apache.spark.streaming.kafka._ import org.apache.spark.SparkConf // Create context with 2 second batch interval val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount") val ssc = new StreamingContext(sparkConf, Seconds(2) )
  • Linux MySQL 常见无法启动或启动异常的解决方案-学习收藏-20210411
    Linux MySQL 常见无法启动或启动异常的解决方案 Linux MySQL 常见无法启动或启动异常的解决方案   在 Linux 上自建 MySQL 服务器,经常遇到各种无法启动或启动后异常的问题,本文列举一些常见问题的解决办法。   注意:以下错误日志提示,都是查看 MySQL 错误日志得到,查看方法如下:   查看下 MySQL 配置文件 my.cnf 中有记录,日志记录在 /alidata/log/mysql/error.log 下   MySQL 配置文件 my.cnf 权限问题导致无法启动,错误提示:World-writable config file '/etc/my.cnf' is ignored   Binlog 丢失导致无法启动,错误日志: File './mysql-bin.000001' not found   Binlog 无法读取导致无法启动,错误日志:Failed to open log (file './mysql-bin.000001', errno 13)   不能创建 PID 导致无法启动,错误日志:Can't start server: can't create PID file: No such file or directory   不能创建临时文件导致无法启动,错误日志:mysqld: Can't create/write to
  • 分布式资源调度——YARN框架
    YARN产生背景 YARN是Hadoop2.x才有的,所以在介绍YARN之前,我们先看一下MapReduce1.x时所存在的问题: 单点故障 节点压力大 不易扩展 MapReduce1.x时的架构如下: 可以看到,1.x时也是Master/Slave这种主从结构,在集群上的表现就是一个JobTracker带多个TaskTracker。 JobTracker:负责资源管理和作业调度TaskTracker:定期向JobTracker汇报本节点的健康状况、资源使用情况以及作业执行情况。还可以接收来自JobTracker的命令,例如启动任务或结束任务等。 那么这种架构存在哪些问题呢: 整个集群中只有一个JobTracker,就代表着会存在单点故障的情况 JobTracker节点的压力很大,不仅要接收来自客户端的请求,还要接收大量TaskTracker节点的请求 由于JobTracker是单节点,所以容易成为集群中的瓶颈,而且也不易域扩展 JobTracker承载的职责过多,基本整个集群中的事情都是JobTracker来管理 1.x版本的整个集群只支持MapReduce作业,其他例如Spark的作业就不支持了 由于1.x版本不支持其他框架的作业,所以导致我们需要根据不同的框架去搭建多个集群。这样就会导致资源利用率比较低以及运维成本过高,因为多个集群会导致服务环境比较复杂。如下图:
  • 实时计算框架:Spark集群搭建与入门案例
    一、Spark概述 1、Spark简介 Spark是专为大规模数据处理而设计的,基于内存快速通用,可扩展的集群计算引擎,实现了高效的DAG执行引擎,可以通过基于内存来高效处理数据流,运算速度相比于MapReduce得到了显著的提高。 2、运行结构 Driver 运行Spark的Applicaion中main()函数,会创建SparkContext,SparkContext负责和Cluster-Manager进行通信,并负责申请资源、任务分配和监控等。 ClusterManager 负责申请和管理在WorkerNode上运行应用所需的资源,可以高效地在一个计算节点到数千个计算节点之间伸缩计算,目前包括Spark原生的ClusterManager、ApacheMesos和HadoopYARN。 Executor Application运行在WorkerNode上的一个进程,作为工作节点负责运行Task任务,并且负责将数据存在内存或者磁盘上,每个 Application都有各自独立的一批Executor,任务间相互独立。 二、环境部署 1、Scala环境 安装包管理 [root@hop01 opt]# tar -zxvf scala-2.12.2.tgz [root@hop01 opt]# mv scala-2.12.2 scala2.12 配置变量 [root@hop01 opt]#
  • 【Spark Streaming】Spark Streaming的使用
    一、Spark Streaming引入 集群监控 一般的大型集群和平台, 都需要对其进行监控的需求。 要针对各种数据库, 包括 MySQL, HBase 等进行监控 要针对应用进行监控, 例如 Tomcat, Nginx, Node.js 等 要针对硬件的一些指标进行监控, 例如 CPU, 内存, 磁盘 等 Spark Streaming介绍 官网:http://spark.apache.org/streaming/ Spark Streaming是一个基于Spark Core之上的实时计算框架,可以从很多数据源消费数据并对数据进行实时的处理,具有高吞吐量和容错能力强等特点。 Spark Streaming的特点 1.易用 可以像编写离线批处理一样去编写流式程序,支持java/scala/python语言。 2.容错 SparkStreaming在没有额外代码和配置的情况下可以恢复丢失的工作。 3.易整合到Spark体系 流式处理与批处理和交互式查询相结合。 实时计算所处的位置 二、Spark Streaming原理 1、SparkStreaming原理 整体流程 Spark Streaming中,会有一个接收器组件Receiver,作为一个长期运行的task跑在一个Executor上。Receiver接收外部的数据流形成input DStream
  • 大数据之Spark环境搭建集群模式Standalone HA
    环境搭建-Standalone HA 高可用HA Spark Standalone集群是Master-Slaves架构的集群模式,和大部分的Master-Slaves结构集群一样,存在着Master单点故障(SPOF)的问题。 如何解决这个单点故障的问题,Spark提供了两种方案: 1.基于文件系统的单点恢复(Single-Node Recovery with Local File System)--只能用于开发或测试环境。 2.基于zookeeper的Standby Masters(Standby Masters with ZooKeeper)--可以用于生产环境。 ZooKeeper提供了一个Leader Election机制,利用这个机制可以保证虽然集群存在多个Master,但是只有一个是Active的,其他的都是Standby。当Active的Master出现故障时,另外的一个Standby Master会被选举出来。由于集群的信息,包括Worker, Driver和Application的信息都已经持久化到文件系统,因此在切换的过程中只会影响新Job的提交,对于正在进行的Job没有任何的影响。加入ZooKeeper的集群整体架构如下图所示。 基于Zookeeper实现HA 官方文档:http://spark.apache.org/docs/2.4.5/spark
  • Bookmarks(三)
    Bookmarks 书签栏 tooltips提示效果,支持点击与经过显示,位置和效果可以自定义 - CSDN博客 疯狂的小萝卜头 - 博客园 【Kettle从零开始】第九弹之Kettle定时任务介绍 - RotKang - CSDN博客 Freemarker模版 Java开源Web开发框架分类列表 HTML5模板引擎 Thymeleaf 教程 - OPEN 开发经验库 Spring MVC视图层:thymeleaf vs. JSP - OPEN 开发经验库 thymeleaf 学习笔记-基础篇 - OPEN 开发经验库 HTML5模板引擎 Thymeleaf - OPEN 开发经验库 Freemarker中Configuration的setClassForTemplateLoading方法参数问题 - format丶 - 博客园 利用freemarker 静态化网页 - ajun_studio的专栏 - CSDN博客 关于Freemarker生成静态html文件及中文乱码的问题 - it_man的专栏 - CSDN博客 FreeMarker入门教程 - Raul·Fu - 博客园 最近在看的freemarker,个人认为有助于理解 - 老夫聊发少年狂 - CSDN博客 SpringMVC处理脚本,SQL注入问题 - 逍遥叹!! - 博客园 使用freemarker生成word
  • Spark Streaming架构原理详解!
    目录 一、Spark Streaming功能介绍(1)概述(2)DStream概述(3)Storm和Spark Streaming比较 二、Spark Streaming服务架构及工作原理三、StreamingContext原理详解四、DStream和Receiver详解五、Spark Streaming基于HDFS的实时计算开发六、Spark Streaming读取并处理Socket流数据七、Spark Streaming结果数据保存到MySQL数据库八、Spark Streaming与Kafka集成进行数据处理九、Spark Streaming 集成Kafka开发- 基于Direct的方式 一、Spark Streaming功能介绍 (1)概述 Spark Streaming是一个基于Spark Core之上的实时计算框架,可以从很多数据源消费数据并对数据进行处理.Spark Streaming 是Spark核心API的一个扩展,可以实现高吞吐量的、具备容错机制的实时流数据的处理。支持从多种数据源获取数据,包括Kafk、Flume、Twitter、ZeroMQ、Kinesis 以及TCP sockets,从数据源获取数据之后,可以使用诸如map、reduce、join和window等高级函数进行复杂算法的处理。最后还可以将处理结果存储到文件系统,数据库和现场仪表盘。在“One
  • 如何停止在Spark控制台上显示的INFO消息?(How to stop INFO messages displaying on spark console?)
    问题 我想停止Spark Shell上出现的各种消息。 我试图编辑log4j.properties文件以停止这些消息。 这是log4j.properties的内容 # Define the root logger with appender file log4j.rootCategory=WARN, console log4j.appender.console=org.apache.log4j.ConsoleAppender log4j.appender.console.target=System.err log4j.appender.console.layout=org.apache.log4j.PatternLayout log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n # Settings to quiet third party logs that are too verbose log4j.logger.org.eclipse.jetty=WARN log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR log4j.logger.org.apache.spark.repl
  • Spark Streaming(一)—— Spark Streaming介绍
    文章目录 1. 什么是Spark Streaming2. Spark Streaming特点3. 常用的实时计算引擎4. Spark Streaming内部结构5. StreamingContext对象创建方式6. 离散流DStream6.1 什么是DStream6.2 DStream中的算子 7. 窗口 1. 什么是Spark Streaming Spark Streaming makes it easy to build scalable fault-tolerant streaming applications. 易于构建灵活的、高容错的流式系统。 流式计算框架 Spark Streaming是核心Spark API的扩展,可实现可扩展、高吞吐量、可容错的实时数据流处理。数据可以从诸如Kafka,Flume,Kinesis或TCP套接字等众多来源获取,并且可以使用由高级函数(如map,reduce,join和window)开发的复杂算法进行流数据处理。最后,处理后的数据可以被推送到文件系统,数据库和实时仪表板。而且还可以在数据流上应用Spark提供的机器学习和图处理算法。 2. Spark Streaming特点 易用:已经集成在Spark中容错性:底层也是RDD,RDD本身就具备了容错机制。支持多种语言:Java Scala Python 3. 常用的实时计算引擎