
Serialization Exception on Spark

I am running into a very strange serialization problem on Spark. The code is as follows:

class PLSA(val sc : SparkContext, val numOfTopics : Int) extends Serializable
{
    def infer(documents: RDD[Document]): RDD[DocumentParameter] = {
      val docs = documents.map(doc => DocumentParameter(doc, numOfTopics))
      docs
    }
}

where Document is defined as:

class Document(val tokens: SparseVector[Int]) extends Serializable

and DocumentParameter is:

class DocumentParameter(val document: Document, val theta: Array[Float]) extends Serializable

object DocumentParameter extends Serializable
{
  def apply(document: Document, numOfTopics: Int) = new DocumentParameter(document, 
    Array.ofDim[Float](numOfTopics))
}

SparseVector (breeze.linalg.SparseVector) is a serializable class.

This is a simple map operation, and all of these classes are serializable, yet I get this exception:

org.apache.spark.SparkException: Task not serializable

But when I remove the numOfTopics parameter, that is:

object DocumentParameter extends Serializable
{
  def apply(document: Document) = new DocumentParameter(document, 
    Array.ofDim[Float](10))
}

and call it like this:

val docs = documents.map(DocumentParameter.apply)

then it works fine.

Is Int not serializable? Yet I have seen other code written exactly like that.

I am not sure how to fix this bug.

UPDATE:

Thank you @samthebest. Here are more details.

stack trace:
org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:1242)
    at org.apache.spark.rdd.RDD.map(RDD.scala:270)
    at com.topicmodel.PLSA.infer(PLSA.scala:13)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:30)
    at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:35)
    at $iwC$$iwC$$iwC$$iwC.<init>(<console>:37)
    at $iwC$$iwC$$iwC.<init>(<console>:39)
    at $iwC$$iwC.<init>(<console>:41)
    at $iwC.<init>(<console>:43)
    at <init>(<console>:45)
    at .<init>(<console>:49)
    at .<clinit>(<console>)
    at .<init>(<console>:7)
    at .<clinit>(<console>)
    at $print(<console>)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:483)
    at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:789)
    at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1062)
    at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:615)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:646)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:610)
    at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:814)
    at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:859)
    at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:771)
    at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:616)
    at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:624)
    at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:629)
    at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:954)
    at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902)
    at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902)
    at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
    at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:902)
    at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:997)
    at org.apache.spark.repl.Main$.main(Main.scala:31)
    at org.apache.spark.repl.Main.main(Main.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:483)
    at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
    ... 46 more

(I had originally left out the stack trace because it only gives general information about the exception.)

I run the code in the spark-shell like this:

// suppose docs is an RDD[Document] that I have already obtained
val numOfTopics = 100
val plsa = new PLSA(sc, numOfTopics)
val docPara = plsa.infer(docs)

Could you point me to some tutorials or tips on serialization?

Answers

Anonymous functions serialize their containing class. When you map {doc => DocumentParameter(doc, numOfTopics)}, the only way Spark can give that function access to numOfTopics is to serialize the PLSA class. And that class can't actually be serialized, because (as you can see from the stack trace) it contains the SparkContext, which isn't serializable (Bad Things would happen if individual cluster nodes had access to the context and could e.g. create new jobs from within a mapper).
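In other words, the map call inside infer is really a field access on this (a sketch of the desugaring, not code from the question):

// numOfTopics is a field of PLSA, so inside the closure it is really this.numOfTopics;
// serializing the closure therefore means serializing the whole PLSA instance,
// including the non-serializable SparkContext.
documents.map(doc => DocumentParameter(doc, this.numOfTopics))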

In general, try to avoid storing the SparkContext in your classes (edit: or at least, make sure it's very clear what kind of classes contain the SparkContext and what kind don't); it's better to pass it as a (possibly implicit) parameter to individual methods that need it. Alternatively, move the function {doc => DocumentParameter(doc, numOfTopics)} into a different class from PLSA, one that really can be serialized.
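As an illustration of that suggestion, here is a minimal sketch (reusing the Document and DocumentParameter types from the question) in which PLSA no longer stores the SparkContext and the closure only captures an Int:

import org.apache.spark.rdd.RDD

// Sketch only: PLSA keeps no SparkContext, and the map closure captures a local
// Int copied from the field instead of the whole PLSA instance.
class PLSA(val numOfTopics: Int) extends Serializable {
  def infer(documents: RDD[Document]): RDD[DocumentParameter] = {
    val topics = numOfTopics // local copy, so the closure does not capture `this`
    documents.map(doc => DocumentParameter(doc, topics))
  }
}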

(As multiple people have suggested, it's possible to keep the SparkContext in the class but marked as @transient so that it won't be serialized. I don't recommend this approach; it means the class will "magically" change state when serialized (losing the SparkContext), and so you might end up with NPEs when you try to access the SparkContext from inside a serialized job. It's better to maintain a clear distinction between classes that are only used in the "control" code (and might use the SparkContext) and classes that are serialized to run on the cluster (which must not have the SparkContext)).
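For completeness, the @transient variant discussed above would look roughly like this; it serializes, but sc comes back as null inside tasks, which is exactly the trap described in the previous paragraph:

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// Discouraged pattern: the class serializes because sc is marked @transient,
// but the SparkContext field is null after deserialization on executors, so any
// use of it from inside a task would fail with a NullPointerException.
class PLSA(@transient val sc: SparkContext, val numOfTopics: Int) extends Serializable {
  def infer(documents: RDD[Document]): RDD[DocumentParameter] =
    documents.map(doc => DocumentParameter(doc, numOfTopics))
}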

This is indeed a weird one, but I think I can guess the problem. First, though, you have not provided the bare minimum needed to solve it (I can only guess because I've seen hundreds of these before). Here are some problems with your question:

def infer(document: RDD[Document], numOfTopics: Int): RDD[DocumentParameter] = {
  val docs = documents.map(doc => DocumentParameter(doc, numOfTopics))
}

This method doesn't return RDD[DocumentParameter]; it returns Unit. You must have copied and pasted the code incorrectly.
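For reference, a version of that snippet that compiles and actually returns the RDD might look like this (a sketch that only fixes the return-value problem, not the serialization question itself):

def infer(documents: RDD[Document], numOfTopics: Int): RDD[DocumentParameter] =
  documents.map(doc => DocumentParameter(doc, numOfTopics))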

Secondly, you haven't provided the entire stack trace. Why? There is no reason NOT to provide it, and the full stack trace together with its message is necessary to understand the error. Usually a not-serializable exception tells you exactly which class is not serializable.

Thirdly, you haven't told us where the infer method lives. Are you running this in a shell? What is the containing object/class/trait of infer?

Anyway, my guess is that by passing in the Int you're causing a chain of things to get serialized that you don't expect. I can't tell you any more than that until you provide the bare minimum code needed to fully understand your problem.

