
saveAsTextFile() to write the final RDD as a single text file - Apache Spark

Question

I am working on a batch application using Apache Spark. I want to write the final RDD as a single text file; currently I am using the saveAsTextFile("filePath") method available on RDD.

My text file contains fields delimited with the \u0001 delimiter, so in the model class's toString() method I joined all the fields with the \u0001 delimiter.
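For example, my approach looks roughly like this (the class, field, and RDD names here are just placeholders, not my real model):

// Hypothetical model class; fields are joined with \u0001 in toString().
public class Record implements java.io.Serializable {
    private String id;
    private String name;

    @Override
    public String toString() {
        return id + "\u0001" + name;
    }
}

// Each element's toString() becomes one line of the output text files.
recordRdd.saveAsTextFile("hdfs:///output/path");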

Is this the correct way to handle this, or is there a better approach?

Also, what if I iterate over the RDD and write the file content using the FileWriter class available in Java?

Please advise on this.

Regards, Shankar

Answer1

To write as a single file there are a few options. If you're writing to HDFS or a similar distributed store, you can first coalesce your RDD down to a single partition (note that your data must then fit on a single worker), or you can collect the data to the driver and then use a FileWriter.
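A minimal sketch of both options, assuming a JavaRDD<String> named rdd and placeholder output paths:

// Option 1: coalesce to a single partition, then save.
// The whole dataset must fit on a single worker.
rdd.coalesce(1).saveAsTextFile("hdfs:///output/single");

// Option 2: collect to the driver and write with a plain FileWriter.
// The whole dataset must fit in driver memory.
try (java.io.FileWriter writer = new java.io.FileWriter("/local/output.txt")) {
    for (String line : rdd.collect()) {
        writer.write(line);
        writer.write("\n");
    }
}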

Answer2
// Requires org.apache.hadoop.conf.Configuration, org.apache.hadoop.fs.{FileSystem, FileUtil, Path},
// org.apache.spark.api.java.{JavaRDD, JavaSparkContext}, java.net.URI.
public static boolean copyMerge(JavaSparkContext sc, JavaRDD<String> rdd, String dstPath) throws IOException, URISyntaxException {
    // hadoopConfiguration() lives on the Spark context, not on SparkConf
    Configuration hadoopConf = sc.hadoopConfiguration();
    hadoopConf.set("fs.s3.awsAccessKeyId", awsAccessKey);        // credentials defined elsewhere
    hadoopConf.set("fs.s3.awsSecretAccessKey", awsSecretKey);
    String tempFolder = "s3://bucket/folder";                    // intermediate output folder
    rdd.saveAsTextFile(tempFolder);                              // step 1: write the part files
    FileSystem fs = FileSystem.get(new URI(tempFolder), hadoopConf);
    // step 2: merge every part file under tempFolder into a single file at dstPath
    return FileUtil.copyMerge(fs, new Path(tempFolder), fs, new Path(dstPath), false, hadoopConf, null);
}

This solution works for S3 or any HDFS-compatible file system. It is done in two steps:

  1. Save the RDD with saveAsTextFile; this generates multiple part files in the folder.

  2. Run Hadoop's "copyMerge" to combine them into a single file (a call sketch follows).
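A possible call site for the method above, assuming a JavaSparkContext sc and a JavaRDD<String> outputRdd; the destination path is a placeholder:

// Merges the part files written to the temp folder into one file at the destination.
boolean merged = copyMerge(sc, outputRdd, "s3://bucket/final/output.txt");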

Answer3

Instead of doing a collect and pulling everything to the driver, I would suggest using coalesce, which helps avoid memory problems on the driver.
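For example (the output path is a placeholder):

// coalesce(1) shuffles the data into one partition on an executor, so a single
// part file is written without routing the whole dataset through the driver.
rdd.coalesce(1).saveAsTextFile("hdfs:///output/final");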

