天道酬勤,学无止境

为什么在使用扩展应用程序时火花广播效果不佳?(Why spark broadcast doesn't work well when I use extends App?)

问题

第一个代码抛出空指针异常。

object TryBroadcast extends App{
  val conf = new SparkConf().setAppName("o_o")
  val sc = new SparkContext(conf)
  val sample = sc.parallelize(1 to 1024)
  val bro = sc.broadcast(6666)
  val broSample = sample.map(x => x.toString + bro.value)
  broSample.collect().foreach(println)
}

第二个效果很好。

object TryBroadcast {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("o_o")
    val sc = new SparkContext(conf)
    val sample = sc.parallelize(1 to 1024)
    val bro = sc.broadcast(6666)
    val broSample = sample.map(x => x.toString + bro.value)
    broSample.collect().foreach(println)
  }
}

似乎火花广播与 scala.App 有冲突

Scala 版本:2.10.5 火花版本:1.4.0 堆栈跟踪:

lang.NullPointerException
    at TryBroadcast$$anonfun$1.apply(TryBroadcast.scala:11)
    at TryBroadcast$$anonfun$1.apply(TryBroadcast.scala:11)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:885)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:885)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1765)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1765)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
    at org.apache.spark.scheduler.Task.run(Task.scala:70)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
回答1

bro在这两种情况下是完全不同的。 在第一个中,它是单例类实例( TryBroadcast )上的一个字段。 在第二个中,它是一个局部变量。

局部变量被捕获、序列化并发送给执行程序。 在第一种情况下,引用是对一个字段,因此单例将被捕获并发送。 我不确定如何构建 Scala 单例以及如何捕获它。 显然,在这种情况下,在执行程序上访问它时它最终未初始化。

你可以让bro成为这样的局部变量:

object TryBroadcast extends App {
  val conf = new SparkConf().setAppName("o_o")
  val sc = new SparkContext(conf)
  val sample = sc.parallelize(1 to 1024)
  val broSample = {
    val bro = sc.broadcast(6666)
    sample.map(x => x.toString + bro.value)
  }
  broSample.collect().foreach(println)
}
回答2

它没有很好的文档记录,但建议使用def main(args: Array[String]): Unit = ??? 而不是extends App

见 https://issues.apache.org/jira/browse/SPARK-4170 和 https://github.com/apache/spark/pull/3497

受限制的 HTML

  • 允许的HTML标签:<a href hreflang> <em> <strong> <cite> <blockquote cite> <code> <ul type> <ol start type> <li> <dl> <dt> <dd> <h2 id> <h3 id> <h4 id> <h5 id> <h6 id>
  • 自动断行和分段。
  • 网页和电子邮件地址自动转换为链接。

相关推荐
  • 火花可扩展性:我做错了什么?(spark scalability: what am I doing wrong?)
    问题 我正在使用 spark 处理数据,它可以处理一天的数据(40G),但在一周的数据上因OOM失败: import pyspark import datetime import operator sc = pyspark.SparkContext() sqc = pyspark.sql.SQLContext(sc) sc.union([sqc.parquetFile(hour.strftime('.....')) .map(lambda row:(row.id, row.foo)) for hour in myrange(beg,end,datetime.timedelta(0,3600))]) \ .reduceByKey(operator.add).saveAsTextFile("myoutput") 不同ID的数量小于10k。 每个 ID 都是一个很小的int 。 作业失败是因为太多执行程序因 OOM 而失败。 当作业成功(在小输入上)时, "myoutput"大约是 100k。 我究竟做错了什么? 我试着更换saveAsTextFile与collect (因为我实际上希望做一些切片和保存之前在python切割),但在行为上是没有任何区别,同样的失败。 这是意料之中的吗? 我曾经使用过reduce(lambda x,y: x.union(y), [sqc
  • 如何在火花中对每个执行器执行一次操作(How to perform one operation on each executor once in spark)
    问题 我有一个存储在 S3 中的 weka 模型,其大小约为 400MB。 现在,我有一组记录,我想在这些记录上运行模型并执行预测。 为了执行预测,我尝试过的是, 在驱动程序上下载并加载模型作为静态对象,将其广播给所有执行程序。 对预测 RDD 执行映射操作。 ----> 不工作,如在 Weka 中执行预测,模型对象需要修改并且广播需要只读副本。 将模型作为静态对象下载并加载到驱动程序上,并在每个映射操作中将其发送到执行程序。 -----> 工作(效率不高,因为在每个地图操作中,我传递了 400MB 对象) 在驱动程序上下载模型并将其加载到每个执行器上并将其缓存在那里。 (不知道怎么弄) 有人知道如何在每个执行器上加载模型一次并缓存它,以便我不会再次加载其他记录吗? 回答1 您有两个选择: 1. 创建一个用惰性 val 表示数据的单例对象: object WekaModel { lazy val data = { // initialize data here. This will only happen once per JVM process } } 然后,您可以在map函数中使用惰性 val。 lazy val确保每个工作 JVM 初始化他们自己的数据实例。 不会对data执行序列化或广播。 elementsRDD.map { element => // use
  • 缓存是map-reduce相对于map-reduce的唯一优势吗?(Is caching the only advantage of spark over map-reduce?)
    问题 我已经开始学习Apache Spark,并且对该框架印象深刻。 尽管困扰我的一件事是,在所有Spark演示中,他们都在谈论Spark如何缓存RDD,因此需要相同数据的多个操作比诸如Map Reduce的其他方法要快。 所以我的问题是,如果是这种情况,那么只需在MR框架(例如Yarn / Hadoop)内部添加一个缓存引擎。 为什么要完全创建一个新框架? 我敢肯定,我在这里缺少一些东西,您将能够为我提供一些文档,使我对火花有更多的了解。 回答1 内存计算中的缓存+对于火花来说绝对是一件大事,但是还有其他事情。 RDD(弹性分布式数据集):RDD是spark的主要抽象。 它允许通过重新计算DAG来恢复故障节点,同时还通过检查点支持与Hadoop更相似的恢复方式,以减少RDD的依赖性。 在DAG中存储spark作业可以进行RDD的延迟计算,还可以使spark的优化引擎以对性能产生重大影响的方式来调度流程。 Spark API:Hadoop MapReduce具有非常严格的API,因此无法提供太多的多功能性。 由于spark消除了许多底层细节,因此可以提高生产率。 同样,诸如广播变量和累加器之类的东西比DistributedCache和IMO计数器具有更多的用途。 Spark Streaming:Spark Streaming基于论文Discretized Streams
  • 为什么 spark.ml 不实现任何 spark.mllib 算法?(Why spark.ml don't implement any of spark.mllib algorithms?)
    问题 按照 Spark MLlib 指南,我们可以了解到 Spark 有两个机器学习库: spark.mllib ,建立在 RDD 之上。 spark.ml ,建立在 Dataframes 之上。 根据 StackOverflow 上的这个和这个问题,数据帧比 RDD 更好(并且更新),应该尽可能使用。 问题是我想使用常见的机器学习算法(例如:Frequent Pattern Mining、Naive Bayes 等)和spark.ml (对于数据帧)不提供这样的方法,只有spark.mllib (对于 RDDs)提供了这个算法。 如果 Dataframes 比 RDDs 更好,并且参考指南推荐使用spark.ml ,为什么没有在该库中实现常见的机器学习方法? 这里缺少什么? 回答1 火花 2.0.0 目前,Spark 正在大力转向DataFrame API,并不断弃用 RDD API。 虽然原生“ML”算法的数量在增长,但下面突出显示的要点仍然有效,并且在内部许多阶段直接使用 RDD 实现。 另请参阅:在 Spark 2.0 中将基于 RDD 的 MLlib API 切换到维护模式 火花 < 2.0.0 我想主要的缺失点是spark.ml算法通常不在 DataFrames 上运行。 因此,在实践中,拥有一个ml包装器比其他任何事情都重要。 甚至本机 ML 实现(如 ml
  • 为什么联接失败并显示“ java.util.concurrent.TimeoutException:期货在[300秒]后超时”?(Why does join fail with “java.util.concurrent.TimeoutException: Futures timed out after [300 seconds]”?)
    问题 我正在使用Spark 1.5。 我有两个形式的数据框: scala> libriFirstTable50Plus3DF res1: org.apache.spark.sql.DataFrame = [basket_id: string, family_id: int] scala> linkPersonItemLessThan500DF res2: org.apache.spark.sql.DataFrame = [person_id: int, family_id: int] libriFirstTable50Plus3DF有766,151条记录,而linkPersonItemLessThan500DF有26,694,353条记录。 请注意,我在linkPersonItemLessThan500DF上使用linkPersonItemLessThan500DF repartition(number) ,因为我打算稍后再加入这两个。 我在上面的代码跟进: val userTripletRankDF = linkPersonItemLessThan500DF .join(libriFirstTable50Plus3DF, Seq("family_id")) .take(20) .foreach(println(_)) 为此,我得到以下输出: 16/12/13 15:07:10
  • 如何在Spark Streaming中更新广播变量?(How can I update a broadcast variable in spark streaming?)
    问题 我相信,我有一个相对常见的火花流用例: 我有一些要根据一些参考数据过滤的对象流 最初,我认为使用Broadcast Variable可以很简单地实现这一目标: public void startSparkEngine { Broadcast<ReferenceData> refdataBroadcast = sparkContext.broadcast(getRefData()); final JavaDStream<MyObject> filteredStream = objectStream.filter(obj -> { final ReferenceData refData = refdataBroadcast.getValue(); return obj.getField().equals(refData.getField()); } filteredStream.foreachRDD(rdd -> { rdd.foreach(obj -> { // Final processing of filtered objects }); return null; }); } 但是,尽管很少,但我的参考数据会定期更改 我的印象是,我可以在驱动程序上修改并重新广播我的变量,并将其传播给每个工作人员,但是Broadcast对象不是Serializable ,需要是final 。
  • 在Spark 1.6中加入数据帧时未发生广播(Broadcast not happening while joining dataframes in Spark 1.6)
    问题 下面是我正在运行的示例代码。 当该火花作业运行时,使用sortmergejoin而不是broadcastjoin进行了数据框联接。 def joinedDf (sqlContext: SQLContext, txnTable: DataFrame, countriesDfBroadcast: Broadcast[DataFrame]): DataFrame = { txnTable.as("df1").join((countriesDfBroadcast.value).withColumnRenamed("CNTRY_ID", "DW_CNTRY_ID").as("countries"), $"df1.USER_CNTRY_ID" === $"countries.DW_CNTRY_ID", "inner") } joinedDf(sqlContext, txnTable, countriesDfBroadcast).write.parquet("temp") 即使我在join语句中指定了broadcast()提示,也不会发生broadcastjoin。 优化器正在对数据帧进行哈希分区,这会导致数据偏斜。 有人看到过这种行为吗? 我正在使用Spark 1.6和HiveContext作为SQLContext在yarn上运行它。 spark作业可在200个执行程序上运行。
  • 火花广播变量的大小是否有限制?(Is there any limit on size of a spark broadcast variable?)
    问题 我想知道spark中的广播变量是否有限制大小,因为我需要广播一个大对象(6GB,将来可能会更大)。 我已经在主页和该站点中进行了搜索,但没有找到任何答案。 播放大型物体时,我是否会遇到问题? 回答1 Java Integer.MAX_VALUE有一个限制。 也就是说,大约有2.3 GB。 请在Spark Jira中查看更多信息:https://issues.apache.org/jira/browse/SPARK-5739
  • 为什么 scala.beans.beanproperty 在 spark 中的工作方式不同(Why does scala.beans.beanproperty work differently in spark)
    问题 在 Scala REPL 中,以下代码 import scala.beans.BeanProperty class EmailAccount { @scala.beans.BeanProperty var accountName: String = null override def toString: String = { return s"acct ($accountName)" } } classOf[EmailAccount].getDeclaredConstructor() 结果是 res0: java.lang.reflect.Constructor[EmailAccount] = public EmailAccount() 但是在 spark 的 REPL 中我得到了 java.lang.NoSuchMethodException: EmailAccount.<init>() at java.lang.Class.getConstructor0(Class.java:2810) at java.lang.Class.getDeclaredConstructor(Class.java:2053) ... 48 elided 造成这种差异的原因是什么? 我怎样才能得到火花以匹配火花壳的行为。 我像这样启动了 REPL: /home/placey/Downloads
  • 是否可以从 Spark 流检查点恢复广播值(Is it possible to recover an broadcast value from Spark-streaming checkpoint)
    问题 我使用 hbase-spark 在我的 spark-streaming 项目中记录 pv/uv。 然后,当我杀死应用程序并重新启动它时,检查点恢复时出现以下异常: 16/03/02 10:17:21 错误 HBaseContext:无法从广播 java.lang.ClassCastException 中获取配置:[B 无法转换为 com.paitao.xmlife.contrib.hbase.HBaseContext 处的 org.apache.spark.SerializableWritable。 getConf(HBaseContext.scala:645) 在 com.paitao.xmlife.contrib.hbase.HBaseContext.com$paitao$xmlife$contrib$hbase$HBaseContext$$hbaseForeachPartition(HBaseContext.scala:627) 在 com.paitao.xmlife.contrib .hbase.HBaseContext$$anonfun$com$paitao$xmlife$contrib$hbase$HBaseContext$$bulkMutation$1.apply(HBaseContext.scala:457) 在 com.paitao.xmlife
  • SparkContext、JavaSparkContext、SQLContext 和 SparkSession 之间的区别?(Difference between SparkContext, JavaSparkContext, SQLContext, and SparkSession?)
    问题 SparkContext, JavaSparkContext, SQLContext和SparkSession之间有什么区别? 有什么方法可以使用SparkSession转换或创建 Context 吗? 我可以使用单个条目SparkSession完全替换所有上下文吗? SQLContext 、 SparkContext和JavaSparkContext所有函数是否也在SparkSession ? 一些函数(如parallelize化)在SparkContext和JavaSparkContext具有不同的行为。 它们在SparkSession表现如何? 如何使用SparkSession创建以下SparkSession ? RDD JavaRDD JavaPairRDD Dataset 是否有将JavaPairRDD转换为Dataset或将Dataset转换为JavaPairRDD ? 回答1 sparkContext是Scala实现的切入点和JavaSparkContext是一个Java包装sparkContext 。 SQLContext是SQLContext的入口点,可以从sparkContext接收。在2.xx之前,RDD、DataFrame和Data-set是三种不同的数据抽象。从Spark 2.xx开始,三种数据抽象都是统一的,
  • 在Jupyter Notebook的pyspark中添加自定义jar(Adding custom jars to pyspark in jupyter notebook)
    问题 我现在用的是Jupyter笔记本Pyspark具有以下泊坞窗图像:Jupyter全火花笔记本 现在,我想编写一个pyspark流应用程序,该应用程序使用来自Kafka的消息。 在《 Spark-Kafka集成指南》中,他们描述了如何使用spark-submit部署这样的应用程序(它需要链接一个外部jar,其解释在3.部署中)。 但是由于我使用的是Jupyter笔记本,所以我从未真正运行过spark-submit命令,因此我假设如果按执行,它将在后面运行。 在spark-submit命令中,您可以指定一些参数,其中一个是-jars ,但是我不清楚如何从笔记本(或通过环境变量在外部)设置此参数。 我假设我可以通过SparkConf或SparkContext对象动态链接此外部jar。 有没有人有过如何从笔记本电脑正确执行链接的经验? 回答1 我设法从jupyter笔记本中运行它,该笔记本从全火花容器运行。 我在jupyterhub中启动python3笔记本,并覆盖PYSPARK_SUBMIT_ARGS标志,如下所示。 Kafka使用者库是从maven存储库下载的,并放在我的主目录/ home / jovyan中: import os os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars /home/jovyan/spark-streaming
  • 在Flex中使用Spark over Halo有什么弊端?(What are the downsides to using Spark over Halo in Flex?)
    问题 是否需要更多工作或自定义外观(皮肤)所需的源代码文件? 相对于Halo,Spark的可维护性和可读性如何? 与Halo相比,总体上来说,自定义更高效,更容易自定义吗? 如果您是对Halo的外观感到满意(也许只是一些CSS调整)的99%的SDK用户,那么是否要改用Spark为您创建更多的工作? 我们现在是否需要聘请设计师来获得合理完整的外观和感觉? 回答1 恕我直言,Spark外观具有更多的可能性。 因此,在某些情况下,它需要做更多的工作,但因此皮肤也可以维护,这当然也取决于显影剂。 我还没有修改Halo皮肤,所以我开始使用Spark处理皮肤。 我不是皮肤专家,我只研究了一些皮肤。 困难还可以。 创建新皮肤似乎很困难,但是扩展现有皮肤非常容易。 如果您很满意(99%),但没有看到改用Spark的优势,那么就不应该这样做。 使用Spark组件时某些事情发生了变化,例如Spark Button中不存在在Button控件中使用图标的可能性。 当然,您可以编写自己的皮肤,并且有更多这样做的可能性,但这需要时间。 除了Button,我不后悔改用Spark。 回答2 在使用光晕和火花进行大量蒙皮之后,我可以说我发现spark更加灵活(无双关语)。 有了光环,我曾经花费大量时间编写Ac​​tionScript来绘制程序皮肤。 Flex 4引入了新的状态模型和FXG
  • 为什么 GHC 火花会发出嘶嘶声?(Why are GHC Sparks Fizzling?)
    问题 我有一个简单的例程,它采用Double向量的乘积。 我试图并行化这段代码,但许多火花最终都失败了。 这是一个独立的基准测试,它也作为要点提供: {-# LANGUAGE BangPatterns #-} {-# LANGUAGE MagicHash #-} {-# OPTIONS_GHC -O2 -Wall -threaded -fforce-recomp #-} import Criterion.Main import Control.Monad (when) import Control.Parallel.Strategies (runEval,rpar,rseq) import qualified Data.Vector.Primitive as PV main :: IO () main = do let expected = PV.product numbers when (not (serialProduct numbers == expected)) $ do fail "serialProduct implementation incorrect" defaultMain [ bgroup "product" [ bench "serial" $ whnf serialProduct numbers , bench "parallel" $ whnf
  • Spark 广播错误:超过 spark.akka.frameSize 考虑使用广播(Spark broadcast error: exceeds spark.akka.frameSize Consider using broadcast)
    问题 我有一个名为“边缘”的大数据 org.apache.spark.rdd.RDD[org.apache.spark.graphx.Edge[(String, Int)]] = MappedRDD[27] at map at <console>:52 当我在独立模式下工作时,我能够收集、计算和保存这个文件。 现在,在集群上,我收到此错误 edges.count ... Serialized task 28:0 was 12519797 bytes which exceeds spark.akka.frameSize (10485760 bytes). Consider using broadcast variables for large values. 与 .saveAsTextFile("edges") 相同 这是来自火花壳。 我试过使用该选项 --driver-java-options "-Dspark.akka.frameSize=15" 但是当我这样做时,它会无限期地挂起。 任何帮助,将不胜感激。 ** 编辑 ** 我的独立模式是 Spark 1.1.0,我的集群是 Spark 1.0.1。 此外,当我去计数、收集或另存为 * RDD 时,会发生挂起,但定义它或对其进行过滤器工作得很好。 回答1 “考虑对大值使用广播变量”错误消息通常表明您在函数闭包中捕获了一些大变量。
  • 火花:工作之间的长时间延迟(Spark: long delay between jobs)
    问题 因此,我们正在运行spark作业,该作业提取数据并进行一些扩展的数据转换并写入几个不同的文件。 一切运行良好,但是在资源密集型作业完成与下一个作业开始之间出现了随机的扩展延迟。 在下图中,我们可以看到原定于17:22:02进行的作业花了15分钟才能完成,这意味着我希望下一份作业定于17:37:02左右。 但是,下一份工作安排在22:05:59,这是工作成功后+4个小时。 当我深入研究下一个作业的Spark UI时,它会显示<1秒的调度程序延迟。 因此,我对这4个小时的延迟来自何处感到困惑。 (带有Hadoop 2的Spark 1.6.1) 更新: 我可以肯定的是,David的以下答案是关于如何在Spark中处理IO ops的,这是出乎意料的。 (有意义的是,该文件写入在考虑顺序和/或其他操作之前实际上在幕后“收集”了。)但是我对I / O时间不包括在作业执行时间这一事实感到不满意。 我猜您可以在Spark UI的“ SQL”选项卡中看到它,因为即使所有作业都成功了,查询仍在运行,但是您根本无法深入研究它。 我敢肯定还有更多的改进方法,但是下面两种方法对我来说已经足够了: 减少文件数将parquet.enable.summary-metadata设置为false 回答1 I / O操作通常带有大量开销,这些开销将在主节点上发生。 由于这项工作没有并行化,因此可能会花费很多时间。
  • 加入一个巨大而巨大的火花数据框(Joining a large and a ginormous spark dataframe)
    问题 我有两个数据帧,df1 有 600 万行,df2 有 10 亿行。 我已经尝试了标准的df1.join(df2,df1("id")<=>df2("id2")) ,但内存不足。 df1 太大,无法放入广播连接。 我什至尝试过布隆过滤器,但它也太大了,无法放入广播中,但仍然有用。 我尝试过的唯一不会出错的方法是将 df1 分成 300,000 个行块并在 foreach 循环中与 df2 连接。 但这比它可能需要的时间长一个数量级(可能是因为它太大而无法作为持久化,导致它重做分割到那个点)。 重新组合结果也需要一段时间。 你是如何解决这个问题的? 一些注意事项: df1 是 df2 的子集。 df1=df2.where("fin<1").selectExpr("id as id2").distinct()我对 df2 中的所有行都感兴趣,这些行的 ID 曾经有一个 fin<1,这意味着我不能一步到位。 df2 中大约有 2 亿个唯一 ID。 以下是一些相关的火花设置: spark.cores.max=1000 spark.executor.memory=15G spark.akka.frameSize=1024 spark.shuffle.consolidateFiles=false spark.task.cpus=1 spark.driver.cores=1 spark
  • SBT 测试不适用于火花测试(SBT test does not work for spark test)
    问题 我有一个简单的 spark 函数来测试 DF 窗口: import org.apache.spark.sql.{DataFrame, SparkSession} object ScratchPad { def main(args: Array[String]): Unit = { val spark = SparkSession.builder().master("local[*]").getOrCreate() spark.sparkContext.setLogLevel("ERROR") get_data_frame(spark).show() } def get_data_frame(spark: SparkSession): DataFrame = { import spark.sqlContext.implicits._ val hr = spark.sparkContext.parallelize(List( ("Steinbeck", "Sales", 100), ("Woolf", "IT", 99), ("Wodehouse", "Sales", 250), ("Hemingway", "IT", 349) ) ).toDF("emp", "dept", "sal") import org.apache.spark.sql.expressions
  • 提高火花应用程序的速度(Improve speed of spark app)
    问题 这是我的 python-spark 代码的一部分,它的一部分运行速度太慢,无法满足我的需要。 尤其是这部分代码,我真的很想提高它的速度,但不知道该怎么做。 目前 6000 万数据行大约需要 1 分钟,我想将其改进到 10 秒以下。 sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="axes", keyspace=source).load() 我的 spark 应用程序的更多上下文: article_ids = sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="article_by_created_at", keyspace=source).load().where(range_expr).select('article','created_at').repartition(64*2) axes = sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="axes", keyspace=source).load() speed_df = article_ids.join(axes,article_ids
  • 正确使用大型广播变量的提示?(Tips for properly using large broadcast variables?)
    问题 我正在使用一个大小约为100 MB的腌制的广播变量,与之近似: >>> data = list(range(int(10*1e6))) >>> import cPickle as pickle >>> len(pickle.dumps(data)) 98888896 在具有3个c3.2xlarge执行程序和m3.large驱动程序的群集上运行,并使用以下命令启动交互式会话: IPYTHON=1 pyspark --executor-memory 10G --driver-memory 5G --conf spark.driver.maxResultSize=5g 在RDD中,如果我坚持对该广播变量的引用,则内存使用量将激增。 对于100 MB变量的100个引用,即使将其复制100次,我也希望数据使用总量不超过10 GB(更不用说在3个节点上30 GB)。 但是,当我运行以下测试时,我看到内存不足的错误: data = list(range(int(10*1e6))) metadata = sc.broadcast(data) ids = sc.parallelize(zip(range(100), range(100))) joined_rdd = ids.mapValues(lambda _: metadata.value) joined_rdd.persist()