
Spark Scala Delete rows in one RDD based on columns of another RDD

By : Grzegorz Tworkowski
Date : November 20 2020, 03:01 PM
Hope that helps. From the question: "I'm very new to Scala and Spark and not sure how to start." The approach below keys both RDDs on their first two columns, left-outer-joins them, and keeps only the rows that found no match.
code :
// input rdds
val rdd1 = spark.sparkContext.makeRDD(Seq((1,2,3,11), (2,1,3,12), (1,4,5,13), (3,5,6,12)))
val rdd2 = spark.sparkContext.makeRDD(Seq((1,2), (2,1)))

// turn the two RDDs into (key, value) pairs
// the key of the first RDD is a tuple of its first two fields; the value is the whole record
// the key of the second RDD is its (two-field) record itself; the value is just null
// we can then join them on the key
val rdd1_key = rdd1.map(record => ((record._1, record._2), record))
val rdd2_key = rdd2.map(record => (record, null))

// 1. perform the left outer join; each record becomes (key, (val1, val2)) where val2 is an Option
// 2. filter, keeping only the records that did not find a match:
//    if there is no match, val2 is None; otherwise it is Some(null), null being the value we hardcoded in the previous step
// 3. extract val1
rdd1_key.leftOuterJoin(rdd2_key)
  .filter(record => record._2._2.isEmpty)
  .map(record => record._2._1)
  .collect().foreach(println(_))

// result
(1,4,5,13)
(3,5,6,12)
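
A shorter equivalent, not part of the original answer, is sketched here using subtractByKey on the same keyed RDDs:

// equivalent shortcut: drop every record of rdd1_key whose key also appears in rdd2_key
rdd1_key.subtractByKey(rdd2_key)
  .map(record => record._2)
  .collect().foreach(println(_))
// prints (1,4,5,13) and (3,5,6,12); ordering may differ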


Transform rows to columns in Spark Scala SQL


By : Vimal t d
Date : March 29 2020, 07:55 AM
I wish this helps you. This is a pretty simple groupByKey scenario, although if you want to do something else with it afterwards, I would suggest using a more efficient PairRDDFunctions method, since groupByKey is inefficient for follow-up queries.
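
No code accompanied this answer; below is a minimal sketch of the groupByKey idea with hypothetical (key, value) rows, since the question's actual data is not shown here (sc is assumed to be an existing SparkContext):

// hypothetical data: one (key, value) pair per input row
val rows = sc.parallelize(Seq(("k1", "a"), ("k1", "b"), ("k2", "c")))

// gather all values sharing a key onto one record, i.e. the rows of a key become its "columns"
val grouped = rows.groupByKey().mapValues(_.toList)

grouped.collect().foreach(println)
// (k1,List(a, b))
// (k2,List(c))   -- ordering of the output may differ
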
Splitting columns in a Spark dataframe into new rows [Scala]


By : Julio Jose dos Santo
Date : March 29 2020, 07:55 AM
Wish this helps you. From the question: "I have output from a Spark data frame like below." Is this the thing you're looking for?
code :
import org.apache.spark.sql._
import org.apache.spark.sql.functions._

val sparkSession = ...
import sparkSession.implicits._

val input = sc.parallelize(Seq(
  (43.45, 19840, "A345", Seq("2014-12-26", "2013-12-12"), Seq(232323,45466)),
  (43.45, 19840, "A345", Seq("2010-03-16", "2013-16-12"), Seq(34343,45454))
)).toDF("amt", "id", "num", "start_date", "identifier")

// UDF that zips the two arrays element-wise into an array of (date, identifier) pairs
val zipArrays = udf { (dates: Seq[String], identifiers: Seq[Int]) =>
  dates.zip(identifiers)
}

// explode the zipped array into one row per pair, then unpack the struct fields
val output = input.select($"amt", $"id", $"num", explode(zipArrays($"start_date", $"identifier")))
  .select($"amt", $"id", $"num", $"col._1".as("start_date"), $"col._2".as("identifier"))

output.show()
+-----+-----+----+----------+----------+
|  amt|   id| num|start_date|identifier|
+-----+-----+----+----------+----------+
|43.45|19840|A345|2014-12-26|    232323|
|43.45|19840|A345|2013-12-12|     45466|
|43.45|19840|A345|2010-03-16|     34343|
|43.45|19840|A345|2013-16-12|     45454|
+-----+-----+----+----------+----------+
// a second input with one more array column, to show a version that generalises to N columns
val input = sc.parallelize(Seq(
  (43.45, 19840, "A345", Seq("2014-12-26", "2013-12-12"), Seq("232323","45466"), Seq("123", "234")),
  (43.45, 19840, "A345", Seq("2010-03-16", "2013-16-12"), Seq("34343","45454"), Seq("345", "456"))
)).toDF("amt", "id", "num", "start_date", "identifier", "another_column")

// generalised UDF: zips any number of equally sized arrays element-wise
val zipArrays = udf { seqs: Seq[Seq[String]] =>
  for (i <- seqs.head.indices) yield seqs.fold(Seq.empty)((accu, seq) => accu :+ seq(i))
}

// columns kept as-is, columns zipped together, and the final projection
val columnsToSelect = Seq($"amt", $"id", $"num")
val columnsToZip = Seq($"start_date", $"identifier", $"another_column")
val outputColumns = columnsToSelect ++ columnsToZip.zipWithIndex.map { case (column, index) =>
  $"col".getItem(index).as(column.toString())
}

val output = input.select($"amt", $"id", $"num", explode(zipArrays(array(columnsToZip: _*)))).select(outputColumns: _*)

output.show()

/*
+-----+-----+----+----------+----------+--------------+
|  amt|   id| num|start_date|identifier|another_column|
+-----+-----+----+----------+----------+--------------+
|43.45|19840|A345|2014-12-26|    232323|           123|
|43.45|19840|A345|2013-12-12|     45466|           234|
|43.45|19840|A345|2010-03-16|     34343|           345|
|43.45|19840|A345|2013-16-12|     45454|           456|
+-----+-----+----+----------+----------+--------------+
*/
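
As a side note, on Spark 2.4 or later (an assumption, the answer does not state a version), the built-in arrays_zip function can replace the hand-written UDF; a rough sketch, assuming the zipped struct fields keep the input column names:

import org.apache.spark.sql.functions.{arrays_zip, explode}

val zipped = input
  .withColumn("zipped", explode(arrays_zip($"start_date", $"identifier")))
  .select($"amt", $"id", $"num",
    $"zipped.start_date".as("start_date"),
    $"zipped.identifier".as("identifier"))
zipped.show()
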
How to convert rows into columns in a Spark dataframe, Scala


By : Doug
Date : March 29 2020, 07:55 AM
Hope this helps. I am not sure that what you need is what you actually asked; yet, just in case, here is an idea:
code :
// keep only the rows that actually carry a non-empty entry value
val entries = inputDF.where('entry.isNotNull)
    .where('entry =!= "")
    .select("MemberID", "entry").distinct

// collect every EncounterID of a member into a list, then attach that member's entry
val df = inputDF.groupBy("MemberID")
    .agg(collect_list("EncounterID") as "encounterList")
    .join(entries, Seq("MemberID"))
df.show
+--------+------------------+-----+
|MemberID|     encounterList|entry|
+--------+------------------+-----+
|    pid1|[enc2, enc1, enc3]|  bat|
|    pid3|      [enc2, enc1]|  cat|
+--------+------------------+-----+
// alternative: pivot the EncounterIDs into columns, with the entry value in each cell
inputDF
    .groupBy("MemberID")
    .pivot("EncounterID", Seq("enc1", "enc2", "enc3"))
    .agg(first("entry")).show

+--------+----+----+----+
|MemberID|enc1|enc2|enc3|
+--------+----+----+----+
|    pid1| bat|    |    |
|    pid3| cat|    |    |
+--------+----+----+----+
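
If a member can have more than one entry per encounter, first would silently keep a single value; a hedged variation (only relevant under that assumption) swaps in collect_list so every entry survives the pivot:

inputDF
  .groupBy("MemberID")
  .pivot("EncounterID", Seq("enc1", "enc2", "enc3"))
  .agg(collect_list("entry"))
  .show()
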
How can I arrange the rows and the columns in Spark using scala


By : Kedar
Date : March 29 2020, 07:55 AM
Hope this fixes your issue. From the question: "I want from a text file in format: ..." You should be doing the following (commented for clarity):
code :
//creating schema
import org.apache.spark.sql.types._
val schema = StructType(Array(
  StructField("col1", StringType, true),
  StructField("col2", StringType, true),
  StructField("col3", StringType, true)
))

//reading text file and finding total lines
val textFile = sc.textFile("*.txt")
val total = textFile.count()

//indexing lines for filtering the first and the last lines
import org.apache.spark.sql.Row
val rows = textFile.zipWithIndex()
  .filter(x => x._2 != 0 && x._2 < total - 1)
  .map(x => Row.fromSeq(x._1.split(";").toSeq))   //converting the lines to Row of sequences

//finally creating dataframe
val df = sqlContext.createDataFrame(rows, schema)
df.show(false)
+-------+-------+-------+
|col1   |col2   |col3   |
+-------+-------+-------+
|column1|column2|column3|
|column1|column2|column3|
+-------+-------+-------+
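
The snippet above uses the pre-2.0 entry points (sc and sqlContext); on Spark 2.x the same steps go through a single SparkSession, sketched here under the assumption that it is named spark:

// assumes Spark 2.x and an existing SparkSession called `spark` (not shown in the answer)
val textFile = spark.sparkContext.textFile("*.txt")
val total = textFile.count()
val rows = textFile.zipWithIndex()
  .filter(x => x._2 != 0 && x._2 < total - 1)
  .map(x => Row.fromSeq(x._1.split(";").toSeq))
val df = spark.createDataFrame(rows, schema)
df.show(false)
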
Scala spark: Sum all columns across all rows


By : user3731156
Date : March 29 2020, 07:55 AM
To fix this issue: if you check the physical plan of both queries, Spark internally generates the same plan, so we can use either of them.
I think using df.groupBy().sum() is handy, as we don't need to specify all the column names.
code :
val df=Seq((1,2,3),(4,5,6)).toDF("id","j","k")

scala> df.groupBy().sum().explain
== Physical Plan ==
*(2) HashAggregate(keys=[], functions=[sum(cast(id#7 as bigint)), sum(cast(j#8 as bigint)), sum(cast(k#9 as bigint))])
+- Exchange SinglePartition
   +- *(1) HashAggregate(keys=[], functions=[partial_sum(cast(id#7 as bigint)), partial_sum(cast(j#8 as bigint)), partial_sum(cast(k#9 as bigint))])
      +- LocalTableScan [id#7, j#8, k#9]

scala> df.agg(sum("id"),sum("j"),sum("k")).explain
== Physical Plan ==
*(2) HashAggregate(keys=[], functions=[sum(cast(id#7 as bigint)), sum(cast(j#8 as bigint)), sum(cast(k#9 as bigint))])
+- Exchange SinglePartition
   +- *(1) HashAggregate(keys=[], functions=[partial_sum(cast(id#7 as bigint)), partial_sum(cast(j#8 as bigint)), partial_sum(cast(k#9 as bigint))])
      +- LocalTableScan [id#7, j#8, k#9]
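
For completeness, a quick usage sketch of the first form on the same two-row DataFrame; the totals below follow directly from the sample data (1+4, 2+5, 3+6):

df.groupBy().sum().show()
// +-------+------+------+
// |sum(id)|sum(j)|sum(k)|
// +-------+------+------+
// |      5|     7|     9|
// +-------+------+------+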