天道酬勤,学无止境

Spark中数据的高效分组(Efficient grouping of data in Spark)

问题

我需要在 Spark (Scala) 中执行简单的数据分组。 特别是,这是我的初始数据:

1, a, X
1, b, Y
2, a, Y
1, a, Y

val seqs = Seq((1, "a", "X"),(1, "b", "Y"),(2, "a", "Y"),(1, "a", "Y"))

我需要按如下方式按第一个键对其进行分组:

1, (a, X), (b, Y), (a, Y)
2, (a, Y)

我最初的 idia 是使用DataFramegroupBy ,但我读到这个操作非常昂贵,需要对所有数据进行完全重新洗牌。

那么,执行分组的成本较低的选项是什么? 一个具体的例子将不胜感激。

回答1

你可能会做这样的事情:

  val rdd = sc.parallelize(List((1, "a", "X"),(1, "b", "Y"),(2, "a", "Y"),(1, "a", "Y")))
  val mapping = rdd.map(x=>(x._1,List((x._2,x._3))))
  val result = mapping.reduceByKey((x,y) => (x ++ y)) 

这使用了reduceByKey,但是所有reduce 过程的问题,您必须以每组1 个键值对结束。 因此,在这种情况下,您需要将每个值显式转换为列表,以便 reduce 过程可以合并它们。

您还可以考虑查看 combineByKey,它使用内部减少过程

======编辑======

正如 zero323 所指出的,减少这里不会提高效率,相反 - 过程将失去 groupByKey 的优化。

受限制的 HTML

  • 允许的HTML标签:<a href hreflang> <em> <strong> <cite> <blockquote cite> <code> <ul type> <ol start type> <li> <dl> <dt> <dd> <h2 id> <h3 id> <h4 id> <h5 id> <h6 id>
  • 自动断行和分段。
  • 网页和电子邮件地址自动转换为链接。

相关推荐
  • spark中的transformation算子详解(需要进行shuffle的算子)(一)
    shuffle算子 1、groupByKey (针对于对偶元组类型的数据进行按key分组操作) (1)经groupByKey处理后的数据得到的结果泛型可以是:(String,Iterator(Int)); (2)调用groupByKey后生成一个shuffledRDD; (3)key相同的进入同一节点的同一分区中; (4)如何保证key相同的进入同一台机器的同一分区中?一个分区中只有一个key吗? ①溢写磁盘,标识key,拉取属于自己的key到同一分区;下游到上游拉取属于自己的key到同一分区的同一组内; ②一个分区中可能有0个,可能有1个,可能有多个key,取决于分区器(默认使用hashPartitioner分区器)。 (5)shuffle之前一定要获取输入切片,知道下游的分区数量; (6)hashPartitioner的底层实现: class HashPartitioner(partitions: Int) extends Partitioner { require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.") def numPartitions: Int = partitions def getPartition(key: Any): Int = key match {
  • 如何在 Spark 中找到分组数据的确切中位数(How to find exact median for grouped data in Spark)
    问题 我需要使用 Scala 计算 Spark 中 Double 数据类型分组数据集的精确中位数。 它与类似查询不同:在 spark SQL 中为多个双数据类型列查找中值。 这个问题是关于分组数据的查找数据,而另一个是关于在 RDD 级别上查找中位数。 这是我的示例数据 scala> sqlContext.sql("select * from test").show() +---+---+ | id|num| +---+---+ | A|0.0| | A|1.0| | A|1.0| | A|1.0| | A|0.0| | A|1.0| | B|0.0| | B|1.0| | B|1.0| +---+---+ 预期答案: +--------+ | Median | +--------+ | 1 | | 1 | +--------+ 我尝试了以下选项,但没有运气: 1) Hive 函数百分位,仅适用于 BigInt。 2) Hive 函数percentile_approx ,但它没有按预期工作(返回 0.25 vs 1)。 scala> sqlContext.sql("select percentile_approx(num, 0.5) from test group by id").show() +----+ | _c0| +----+ |0.25| |0.25| +----+
  • 使用 Apache Spark 实现不同的高效计数(Efficient Count Distinct with Apache Spark)
    问题 1 亿客户在几个网站(比如 100 个网站)的页面上点击了 1000 亿次。 您可以在大型数据集中使用点击流。 使用 Apache Spark 的抽象,计算每个网站不同访问者的最有效方法是什么? 回答1 visitors.distinct().count()将是显而易见的方式,第一种方式在 distinct 中,您可以指定并行度,也可以看到速度的提高。 如果可以将访问者设置为流并使用 D 流,则可以实时进行计数。 您可以直接从目录中流式传输,并使用与 RDD 相同的方法,例如: val file = ssc.textFileStream("...") file.distinct().count() 最后一个选项是使用def countApproxDistinct(relativeSD: Double = 0.05): Long但这被标记为实验性的,但如果relativeSD def countApproxDistinct(relativeSD: Double = 0.05): Long (标准偏差)更高,它会比计数快得多。 编辑:由于您想要每个网站的计数,您可以减少网站 id,这可以有效地完成(使用组合器),因为计数是聚合的。 如果您有网站名称用户 ID 元组的 RDD,您可以这样做。 visitors.countDistinctByKey()或visitors
  • 计算Spark DataFrame中分组数据的标准偏差(Calculate the standard deviation of grouped data in a Spark DataFrame)
    问题 我有从csv获取并转换为DataFrame的用户日志,以便利用SparkSQL查询功能。 一个用户每小时会创建多个条目,我想为每个用户收集一些基本统计信息; 实际上,它只是用户实例的数量,平均值以及众多列的标准偏差。 通过使用groupBy($“ user”)和带有SparkSQL函数的聚合器来计数和平均,我能够快速获取均值和计数信息: val meanData = selectedData.groupBy($"user").agg(count($"logOn"), avg($"transaction"), avg($"submit"), avg($"submitsPerHour"), avg($"replies"), avg($"repliesPerHour"), avg($"duration")) 但是,我似乎找不到一种同样优雅的方法来计算标准偏差。 到目前为止,我只能通过映射字符串,双对并使用StatCounter()。stdev实用工具来计算它: val stdevduration = duration.groupByKey().mapValues(value => org.apache.spark.util.StatCounter(value).stdev) 但是,这将返回一个RDD,我想尝试将其全部保留在DataFrame中,以便对返回的数据进行进一步的查询。
  • 计算spark Dataframe中分组数据的分位数(Calculate quantile on grouped data in spark Dataframe)
    问题 我有以下 Spark 数据框: agent_id|payment_amount| +--------+--------------+ | a| 1000| | b| 1100| | a| 1100| | a| 1200| | b| 1200| | b| 1250| | a| 10000| | b| 9000| +--------+--------------+ 我的愿望输出会是这样的 agen_id 95_quantile a whatever is 95 quantile for agent a payments b whatever is 95 quantile for agent b payments 对于每组 agent_id 我需要计算 0.95 分位数,我采用以下方法: test_df.groupby('agent_id').approxQuantile('payment_amount',0.95) 但我犯了以下错误: 'GroupedData' object has no attribute 'approxQuantile' 我需要在新列中有 0.95 分位数(百分位数),以便以后可以用于过滤目的 我正在使用 Spark 2.0.0 回答1 一种解决方案是使用percentile_approx : >>> test_df.registerTempTable(
  • Apache Hivemall:可运行在Apache Hive, Spark 和 Pig 上的可扩展
    Apache Hivemall:可运行在Apache Hive, Spark 和 Pig 上的可扩展机器学习库 iteblog 过往记忆大数据   Apache Hivemall是机器学习算法(machine learning algorithms)和多功能数据分析函数(versatile data analytics functions)的集合,它通过Apache Hive UDF / UDAF / UDTF接口提供了一些易于使用的机器学习算法。Hivemall 最初由Treasure Data 开发的,并于2016年9月捐献给 Apache 软件基金会,进入了Apache 孵化器。   Apache Hivemall提供了各种功能包括:回归(regression)、分类(classification)、推荐(recommendation)异常检测(anomaly detection)、k-最近邻(k-nearest neighbor)以及特征工程(feature engineering)。同时它还支持最先进的机器学习算法,如软信度加权(Soft Confidence Weighted)、权重向量的自适应正则化(Adaptive Regularization of Weight Vectors)、因式分解机(Factorization Machines)和AdaDelta。
  • 从行存储到 RCFile,Facebook 为什么要设计出 RCFile?
    2010年,Facebook 的工程师在 ICDC(IEEE International Conference on Data Engineering) 发表了一篇 《RCFile: A Fast and Space-efficient Data Placement Structure in MapReduce-based Warehouse Systems》 的论文,介绍了其为基于 MapReduce 的数据仓库设计的高效存储结构,这就是我们熟知的 RCFile(Record Columnar File)。下面介绍 RCFile 的一些诞生背景和设计。背景早在2010年以前,Facebook 的数据仓库每天有超过 20TB 的数据被推入数据仓库(到了 2014 年 Facebook 的仓库存储超过 300PB 的 Hive 数据,而且每天的新增数据在 600TB 左右,那时候已经开始用 ORCFile 了)。按照这么大的增长速度,存储效率是 Facebook 仓库基础设施的重要考虑因素。为了提高仓库的存储效率,Facebook 在很多方面进行了创新,比如构建冷存储数据中心,在 HDFS 中采用 RAID 等技术来降低复制比率(同时保持高可用性),以及在数据写入 HDFS 之前使用压缩进行数据压缩。但这仍然不能满足他们的需求,所以在这种背景下,RCFile
  • 从行存储到 RCFile,Facebook 为什么要设计出 RCFile?
    从行存储到 RCFile,Facebook 为什么要设计出 RCFile? 过往记忆大数据 过往记忆大数据 2010年,Facebook 的工程师在 ICDC(IEEE International Conference on Data Engineering) 发表了一篇 《RCFile: A Fast and Space-efficient Data Placement Structure in MapReduce-based Warehouse Systems》 的论文,介绍了其为基于 MapReduce 的数据仓库设计的高效存储结构,这就是我们熟知的 RCFile(Record Columnar File)。下面介绍 RCFile 的一些诞生背景和设计。 背景 早在2010年以前,Facebook 的数据仓库每天有超过 20TB 的数据被推入数据仓库(到了 2014 年 Facebook 的仓库存储超过 300PB 的 Hive 数据,而且每天的新增数据在 600TB 左右,那时候已经开始用 ORCFile 了)。按照这么大的增长速度,存储效率是 Facebook 仓库基础设施的重要考虑因素。 为了提高仓库的存储效率,Facebook 在很多方面进行了创新,比如构建冷存储数据中心,在 HDFS 中采用 RAID 等技术来降低复制比率(同时保持高可用性),以及在数据写入
  • 从行存储到 RCFile,Facebook 为什么要设计出 RCFile?
    从行存储到 RCFile,Facebook 为什么要设计出 RCFile? 过往记忆大数据 过往记忆大数据 2010年,Facebook 的工程师在 ICDC(IEEE International Conference on Data Engineering) 发表了一篇 《RCFile: A Fast and Space-efficient Data Placement Structure in MapReduce-based Warehouse Systems》 的论文,介绍了其为基于 MapReduce 的数据仓库设计的高效存储结构,这就是我们熟知的 RCFile(Record Columnar File)。下面介绍 RCFile 的一些诞生背景和设计。 背景 早在2010年以前,Facebook 的数据仓库每天有超过 20TB 的数据被推入数据仓库(到了 2014 年 Facebook 的仓库存储超过 300PB 的 Hive 数据,而且每天的新增数据在 600TB 左右,那时候已经开始用 ORCFile 了)。按照这么大的增长速度,存储效率是 Facebook 仓库基础设施的重要考虑因素。 为了提高仓库的存储效率,Facebook 在很多方面进行了创新,比如构建冷存储数据中心,在 HDFS 中采用 RAID 等技术来降低复制比率(同时保持高可用性),以及在数据写入
  • 将Spark数据框转换为Pandas / R数据框的要求(Requirements for converting Spark dataframe to Pandas/R dataframe)
    问题 我在Hadoop的YARN上运行Spark。 这种转换如何运作? 是否在转换之前进行一次collect()? 我还需要在每个从属节点上安装Python和R才能使转换正常进行吗? 我正在努力寻找有关此问题的文档。 回答1 toPandas (PySpark)/ as.data.frame (SparkR) 在创建本地数据框之前,必须先收集数据。 例如toPandas方法如下所示: def toPandas(self): import pandas as pd return pd.DataFrame.from_records(self.collect(), columns=self.columns) 您需要在每个节点上都安装有Python(最好是具有所有依赖项)。 与之对应的SparkR( as.data.frame )只是collect的别名。 总结一下,在这两种情况下,数据均collected到驱动程序节点并转换为本地数据结构(分别为Python和R中的pandas.DataFrame和base::data.frame )。 向量化的用户定义函数 从Spark 2.3.0开始, PySpark还提供了一组pandas_udf( SCALAR , GROUPED_MAP , GROUPED_AGG ),它们对由定义的数据块并行运行
  • Pyspark:显示数据框列的直方图(Pyspark: show histogram of a data frame column)
    问题 在熊猫数据框中,我使用以下代码绘制列的直方图: my_df.hist(column = 'field_1') 在 pyspark 数据框中是否有可以实现相同目标的东西? (我在 Jupyter Notebook)谢谢! 回答1 不幸的是,我认为 PySpark Dataframes API 中没有干净的plot()或hist()函数,但我希望事情最终会朝着这个方向发展。 目前,您可以在 Spark 中计算直方图,并将计算出的直方图绘制为条形图。 例子: import pandas as pd import pyspark.sql as sparksql # Let's use UCLA's college admission dataset file_name = "https://stats.idre.ucla.edu/stat/data/binary.csv" # Creating a pandas dataframe from Sample Data df_pd = pd.read_csv(file_name) sql_context = sparksql.SQLcontext(sc) # Creating a Spark DataFrame from a pandas dataframe df_spark = sql_context.createDataFrame
  • 在Spark的RDD中更新价值的有效方法是什么?(What is the efficient way to update value inside Spark's RDD?)
    问题 我正在使用Spark在Scala编写一个与图形相关的程序。 数据集有400万个节点和400万个边(您可以将其视为一棵树),但是每次(一个Iteration ),我只编辑它的一部分,即以给定节点为根的子树,以及给定节点和根之间的路径中的节点。 Iteration具有依赖性,这意味着i+1 Iteration需要来自i的结果。 因此,我需要为下一步存储每个Iteration的结果。 我正在尝试找到一种更新RDD的有效方法,但是到目前为止还没有任何线索。我发现PairRDD具有lookup功能,可以将计算时间从O(N)减少到O( M ), N表示总数RDD和M的对象数表示每个分区中的元素数。 所以我在想是否可以使用O(M)更新RDD的对象? 或更理想的情况是O(1)?(我在Spark的邮件列表中看到一封电子邮件,说可以修改lookup以实现O(1)) 另一件事是,如果我可以实现O(M)来更新RDD ,是否可以将分区增加到比我拥有的内核数大一些的数量,并获得更好的性能? 回答1 RDD是分布式数据集,分区是RDD存储的单元,要处理的单元和RDD是元素。 例如,您从HDFS中将一个大文件作为RDD读取,则该RDD的元素为String (该文件中的行),然后spark将该RDD按分区存储在整个群集中。 对于您来说,作为Spark用户,您只需要关心如何处理这些文件的行
  • Spark SQL 中按日期分组的聚合(Aggregation with Group By date in Spark SQL)
    问题 我有一个 RDD 包含一个名为time类型的时间戳: root |-- id: string (nullable = true) |-- value1: string (nullable = true) |-- value2: string (nullable = true) |-- time: long (nullable = true) |-- type: string (nullable = true) 我正在尝试按 YYYY-MM-DD 按 value1、value2 和时间分组。 我尝试按演员(时间为日期)分组,但随后出现以下错误: Exception in thread "main" java.lang.reflect.InvocationTargetException at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang
  • Spark结构化API—DataFrame,SQL和Dataset
    一、结构化API概述 1. 结构化API是处理各种数据类型的工具,可处理非结构化的日志文件、半结构化的CSV文件以及高度结构化的Parquet文件。结构化API指以下三种核心分布式集合类型的API:Dataset类型、DataFrame类型、SQL表和视图。 大多数结构化API均适用于批处理和流处理,这意味着使用结构化API编写代码时,几乎不用改动代码就可以从批处理程序转换为流处理程序(反之亦然)。 DataFrame和Dataset是具有行和列的类似于(分布式)数据表的集合类型。所有列的行数相同(可以使用null来指定缺省值),并且某一列的类型必须在所有行中保持一致。Spark中的DataFrame和Dataset代表不可变的数据集合,可以通过它指定对特定位置数据的操作,该操作将以惰性评估方式执行。当对DataFrame执行action操作时,将触发Spark执行具体transformation操作并返回结果。表和视图与DataFrame基本相同,所以通常在DataFrame上执行SQL操作,而不是用DataFrame专用的Scala代码。 Schema定义了DataFrame的列名和类型,可以手动定义或者从数据源读取模式(通常定义为模式读取)。Schema数据模式需要指定数据类型,这意味着用户需要指定在什么地方放置什么类型的数据。 2. Spark实际上有它自己的编程语¨
  • 按键分组时,Spark的内存不足(Spark runs out of memory when grouping by key)
    问题 我正在尝试使用此指南在EC2上使用Spark主机执行常见爬网数据的简单转换,我的代码如下所示: package ccminer import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ object ccminer { val english = "english|en|eng" val spanish = "es|esp|spa|spanish|espanol" val turkish = "turkish|tr|tur|turc" val greek = "greek|el|ell" val italian = "italian|it|ita|italien" val all = (english :: spanish :: turkish :: greek :: italian :: Nil).mkString("|") def langIndep(s: String) = s.toLowerCase().replaceAll(all, "*") def main(args: Array[String]): Unit = { if (args.length != 3) { System.err.println("Bad command line") System.exit(-1
  • Spark Scala - 如何对数据帧行进行分组并将复杂函数应用于组?(Spark Scala - How to group dataframe rows and apply complex function to the groups?)
    问题 我正在尝试解决这个超级简单的问题,但我已经厌倦了,我希望有人能帮我解决这个问题。 我有一个形状像这样的数据框: --------------------------- | Category | Product_ID | |------------+------------+ | a | product 1 | | a | product 2 | | a | product 3 | | a | product 1 | | a | product 4 | | b | product 5 | | b | product 6 | --------------------------- 我如何按类别对这些行进行分组并在 Scala 中应用复杂的函数? 也许是这样的: val result = df.groupBy("Category").apply(myComplexFunction) 这个 myComplexFunction 应该为每个类别生成下表,并将成对相似性上传到 Hive 表中或将其保存到 HDFS 中: +--------------------------------------------------+ | | Product_1 | Product_2 | Product_3 | +------------+------------+-----------------
  • hadoop、spark各大数据组件介绍
    hadoop相关组件 hadoop体系结构,如图: hadoop核心设计,如图 Hadoop Common Hadoop体系最底层的一个模块,为Hadoop各子项目提供各种工具,如:配置文件和日志操作等。 HDFS 是Hadoop应用程序中主要的分布式存储系统, HDFS集群包含了一个NameNode(主节点),这个节点负责管理所有文件系统的元数据及存储了真实数据的DataNode(数据节点,可以有很多)。HDFS针对海量数据所设计,所以相比传统文件系统在大批量小文件上的优化,HDFS优化的则是对小批量大型文件的访问和存储。相比于Hadoop1.0,Hadoop 2.0中的HDFS增加了两个重大特性,HA和Federaion。HA即为High Availability(高可用),用于解决NameNode单点故障问题,该特性通过热备的方式为主NameNode提供一个备用者,一旦主NameNode出现故障,可以迅速切换至备NameNode,从而实现不间断对外提供服务。Federation即为“联邦”,该特性允许一个HDFS集群中存在多个NameNode同时对外提供服务,这些NameNode分管一部分目录(水平切分),彼此之间相互隔离,但共享底层的DataNode存储资源。 文件系统。可以创建、删除、移动或重命名文件,等等。但是 HDFS 的架构是基于一组特定的节点构建的,存储在
  • 使用 pyspark 分组、排名和聚合火花数据框(Group By, Rank and aggregate spark data frame using pyspark)
    问题 我有一个看起来像的数据框: A B C --------------- A1 B1 0.8 A1 B2 0.55 A1 B3 0.43 A2 B1 0.7 A2 B2 0.5 A2 B3 0.5 A3 B1 0.2 A3 B2 0.3 A3 B3 0.4 如何将“C”列转换为每列 A 的相对排名(更高的分数-> 更好的排名)? 预期输出: A B Rank --------------- A1 B1 1 A1 B2 2 A1 B3 3 A2 B1 1 A2 B2 2 A2 B3 2 A3 B1 3 A3 B2 2 A3 B3 1 我想要达到的最终状态是聚合列 B 并存储每个 A 的排名: 例子: B Ranks B1 [1,1,3] B2 [2,2,2] B3 [3,2,1] 回答1 添加排名: from pyspark.sql.functions import * from pyspark.sql.window import Window ranked = df.withColumn( "rank", dense_rank().over(Window.partitionBy("A").orderBy(desc("C")))) 通过...分组: grouped = ranked.groupBy("B").agg(collect_list(struct("A", "rank")
  • Spark DataFrame:orderBy之后的groupBy是否保持该顺序?(Spark DataFrame: does groupBy after orderBy maintain that order?)
    问题 我有一个具有以下结构的Spark 2.0数据框example : id, hour, count id1, 0, 12 id1, 1, 55 .. id1, 23, 44 id2, 0, 12 id2, 1, 89 .. id2, 23, 34 etc. 每个ID包含24个条目(一天中的每个小时一个条目),并使用orderBy函数按id,小时进行排序。 我创建了一个聚合器groupConcat : def groupConcat(separator: String, columnToConcat: Int) = new Aggregator[Row, String, String] with Serializable { override def zero: String = "" override def reduce(b: String, a: Row) = b + separator + a.get(columnToConcat) override def merge(b1: String, b2: String) = b1 + b2 override def finish(b: String) = b.substring(1) override def bufferEncoder: Encoder[String] = Encoders.STRING override
  • 如何在Spark SQL中按时间间隔分组(How to group by time interval in Spark SQL)
    问题 我的数据集如下所示: KEY |Event_Type | metric | Time 001 |event1 | 10 | 2016-05-01 10:50:51 002 |event2 | 100 | 2016-05-01 10:50:53 001 |event3 | 20 | 2016-05-01 10:50:55 001 |event1 | 15 | 2016-05-01 10:51:50 003 |event1 | 13 | 2016-05-01 10:55:30 001 |event2 | 12 | 2016-05-01 10:57:00 001 |event3 | 11 | 2016-05-01 11:00:01 我想在验证此的键时得到所有信息: 5分钟内“特定事件的指标总和” >阈值。 在我看来,这是使用“滑动Windows功能”的理想选择。 如何使用Spark SQL做到这一点? 谢谢你。 回答1 火花> = 2.0 您可以使用窗口(不要误认为窗口功能)。 根据一个变体,它将时间戳分配给另外一个可能重叠的存储桶: df.groupBy($"KEY", window($"time", "5 minutes")).sum("metric") // +---+---------------------------------------------+--------