SPARK RDD Between logic using scala

By : ChristosL
Date : November 20 2020, 03:01 PM
The goal is to check a "between" condition using two RDDs. One way is a cartesian join followed by a range filter:
code :
// create input rdd 
val rdd1 = spark.sparkContext.makeRDD(Array(("EMP1",0),("EMP2",1),("EMP3",2),("EMP4",3),("EMP5",4),("EMP6",5),("EMP7",6),("EMP8",7)))
val rdd2 = spark.sparkContext.makeRDD(Array((0,3,"ABC"),(3,5,"XYZ"),(5,1000,"PQR")))

// 1. perform a cross join/cartesian; each record looks like (("EMP1",0), (0,3,"ABC"))
// 2. filter out the records whose value is not within the range [low, high)
// 3. format the output as ("<employee> <label>", value)
rdd1.cartesian(rdd2)
    .filter { case ((_, value), (low, high, _)) => value >= low && value < high }
    .map { case ((emp, value), (_, _, label)) => (emp + " " + label, value) }
    .collect().foreach(println)

// result
(EMP1 ABC,0)
(EMP2 ABC,1)
(EMP3 ABC,2)
(EMP4 XYZ,3)
(EMP5 XYZ,4)
(EMP6 PQR,5)
(EMP7 PQR,6)
(EMP8 PQR,7)
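
The cartesian product grows with the size of both RDDs. When the range table is small, a broadcast lookup avoids it entirely; the following is a minimal sketch under that assumption, using the same sample data (the broadcast variable and the flatMap/collectFirst approach are not part of the original answer).
code :
// same sample data as above; the small range table is collected on the driver and broadcast
val employees = spark.sparkContext.makeRDD(Array(("EMP1",0),("EMP2",1),("EMP3",2),("EMP4",3),
                                                 ("EMP5",4),("EMP6",5),("EMP7",6),("EMP8",7)))
val ranges = Array((0,3,"ABC"),(3,5,"XYZ"),(5,1000,"PQR"))
val rangesBc = spark.sparkContext.broadcast(ranges)

employees.flatMap { case (emp, value) =>
    // look up the broadcast range table on each executor; at most one range matches
    rangesBc.value.collectFirst {
      case (low, high, label) if value >= low && value < high => (emp + " " + label, value)
    }
  }
  .collect().foreach(println)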


How to add more code logic to spark/scala's getOrElse()?

By : user3447527
Date : March 29 2020, 07:55 AM
In a Spark RDD leftOuterJoin, the right-hand value comes back as an Option, so you can build the default array inside .getOrElse():
code :
// leftOuterJoin yields (id, (data_a, Option[data_b])); build the fallback
// array of random gaussians inside getOrElse when data_b is missing
rdd_a.leftOuterJoin(rdd_b).mapValues { case (data_a, data_b) =>
    (data_a, data_b.getOrElse(Array.fill(10)(scala.util.Random.nextGaussian())))
}
scala> List().headOption.getOrElse{ println("Building an array then."); Array(1) }
Building an array then.
res1: Array[Int] = Array(1)

scala> List(1).headOption.getOrElse{ println("Building an array then."); Array(1) }
res2: Any = 1
scala> Array.fill(3)(Random.nextGaussian)
res6: Array[Double] = Array(-0.2773138805049755, -1.4178827462945545, -0.8710624835785054)
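
For context, here is a self-contained sketch of the same pattern on made-up data: keys that have no match in rdd_b get a randomly initialised vector via getOrElse. The RDD contents below are hypothetical; only the leftOuterJoin/getOrElse combination is the point.
code :
import scala.util.Random

val rdd_a = spark.sparkContext.parallelize(Seq((1, "a"), (2, "b"), (3, "c")))
val rdd_b = spark.sparkContext.parallelize(Seq((1, Array(0.1, 0.2)), (2, Array(0.3, 0.4))))

// leftOuterJoin gives (id, (label, Option[features])); id 3 has no features in rdd_b
val withDefaults = rdd_a.leftOuterJoin(rdd_b).mapValues { case (label, maybeFeatures) =>
  (label, maybeFeatures.getOrElse(Array.fill(10)(Random.nextGaussian())))
}

withDefaults.collect().foreach { case (id, (label, vec)) =>
  println(s"$id $label ${vec.length}")
}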
Implementing logic in Scala for Spark

By : Rahul Verma
Date : March 29 2020, 07:55 AM
I am not sure I understand the question, but the following approach should work.
First, create a DataFrame with the following schema:
code :
root
 |-- node: string (nullable = true)
 |-- parent: string (nullable = true)
 |-- value: integer (nullable = true)
import org.apache.spark.sql.functions._
import spark.implicits._

// sum each parent's children, then check whether every node's value equals that sum
val children = df.groupBy($"parent").agg(sum($"value").alias("csum"))
df
  .select($"node", $"value")
  .join(children, df("node") <=> children("parent"))
  .select($"node", ($"value" === $"csum").alias("holds"))
import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

val nodes: RDD[(VertexId, (String, Int))] = sc.parallelize(Array(
    (0L, ("p1", 3)),
    (1L, ("c1", 2)),
    (2L, ("c2", 1)),
    (11L, ("c11",  1)),
    (12L, ("c12", 1))
))

val relationships: RDD[Edge[String]] = sc.parallelize(Array(
    Edge(1L, 0L, "child"),
    Edge(2L, 0L, "child"),
    Edge(11L, 1L, "child"),
    Edge(12L, 1L, "child")
))

val graph = Graph(nodes, relationships)


// for every child -> parent edge, send (parent value, child value) to the parent,
// then sum the child values; a parent "holds" if its value equals that sum
graph.aggregateMessages[(Int, Int)](
    triplet => triplet.sendToDst(triplet.dstAttr._2, triplet.srcAttr._2),
    (a, b) => (a._1, a._2 + b._2)
).map { case (id, (expected, actual)) => expected == actual }.reduce(_ & _)
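
For reference, here is a hypothetical DataFrame matching the schema above, so the groupBy/join snippet can be run directly. The rows are made up: p1 is the parent of c1 and c2, and c1 is the parent of c11 and c12.
code :
import spark.implicits._

// node, parent (null for the root), value
val df = Seq(
  ("p1", null, 3),
  ("c1", "p1", 2),
  ("c2", "p1", 1),
  ("c11", "c1", 1),
  ("c12", "c1", 1)
).toDF("node", "parent", "value")

df.printSchema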
Implementing SQL logic via Dataframes using Spark and Scala

By : Adeoye Akanmu
Date : March 29 2020, 07:55 AM
You can use Column's || operator to construct logical OR conditions. Also note that when takes only two arguments (a condition and a value); to supply an alternative value when the condition isn't met, use .otherwise:
code :
import org.apache.spark.sql.functions.{when, isnull}

val df_withalias = df.withColumn("myalias",
  when(df("c1") === 0 || isnull(df("c1")),
    when(isnull(df("c2") / df("c3")), 1).otherwise(df("c2") / df("c3"))
  ).otherwise(df("c1"))
)
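
A small usage sketch on a made-up DataFrame: the columns c1, c2, c3 are the ones the snippet references, but the rows below are invented purely to hit all three branches.
code :
import org.apache.spark.sql.functions.{when, isnull}
import spark.implicits._

val df = Seq(
  (Some(5.0), Some(10.0), Some(2.0)),   // c1 present            -> myalias = 5.0
  (None,      Some(10.0), Some(4.0)),   // c1 null, ratio defined -> myalias = 2.5
  (Some(0.0), Some(10.0), None)         // c1 = 0, ratio null     -> myalias = 1.0
).toDF("c1", "c2", "c3")

df.withColumn("myalias",
  when(df("c1") === 0 || isnull(df("c1")),
    when(isnull(df("c2") / df("c3")), 1).otherwise(df("c2") / df("c3"))
  ).otherwise(df("c1"))
).show()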
Spark Scala: group by multiple columns with aggregation logic

By : user3606764
Date : March 29 2020, 07:55 AM
Given the DataFrame from the question, this is a straightforward groupBy and count:
code :
import org.apache.spark.sql.functions._

df.withColumn("new_x", round($"x" / $"width" * 100.0 ) / 100.0)
  .withColumn("new_y", round($"y" / $"height" * 100.0 ) / 100.0)
  .groupBy("name", "platform", "group_id", "new_x", "new_y")
  .count()
  .show(false)
+----+--------+--------+-----+-----+-----+
|name|platform|group_id|new_x|new_y|count|
+----+--------+--------+-----+-----+-----+
|a   |plat_a  |0       |0.5  |0.5  |2    |
|b   |plat_b  |0       |0.5  |0.5  |1    |
|b   |plat_b  |1       |0.25 |0.25 |2    |
|a   |plat_a  |0       |0.4  |0.14 |1    |
+----+--------+--------+-----+-----+-----+
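
For completeness, a hypothetical input with the columns the snippet assumes (name, platform, group_id, x, y, width, height); the values are invented and are not the ones behind the table above.
code :
import spark.implicits._

// e.g. x = 50, width = 100 -> new_x = round(50/100 * 100) / 100 = 0.5
val df = Seq(
  ("a", "plat_a", 0, 50.0, 50.0, 100.0, 100.0),
  ("a", "plat_a", 0, 50.0, 50.0, 100.0, 100.0),
  ("b", "plat_b", 1, 25.0, 25.0, 100.0, 100.0)
).toDF("name", "platform", "group_id", "x", "y", "width", "height")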
Figure out grouping logic for this Spark Scala Dataframe

By : user3723802
Date : March 29 2020, 07:55 AM
Given a Spark DataFrame like the one in the question, make an array-typed column first and then use groupBy:
code :
import org.apache.spark.sql.functions._

val df = spark.read.option("header","true").option("delimiter","\t").csv("test.csv")
df.show

// collect the (b, c) pairs per id into an array column and count the rows per id
val finalDF = df.groupBy("id").agg(collect_list(array("b", "c")).as("grouping"), count("*").as("count")).orderBy("id")
finalDF.show(false)
finalDF.printSchema
+---+--------------------------------------------------+-----+
|id |grouping                                          |count|
+---+--------------------------------------------------+-----+
|1  |[[ok, 9], [not ok, 10], [sure, 1], [sure bleh, 1]]|4    |
|2  |[[not sure, 2], [not so sure, 12]]                |2    |
|3  |[[not sure, 5], [not so sure, 25]]                |2    |
+---+--------------------------------------------------+-----+

root
 |-- id: string (nullable = true)
 |-- grouping: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: string (containsNull = true)
 |-- count: long (nullable = false)
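
Since the output shows every row, here is an in-memory version of the same data (assuming the tab-separated test.csv has the header columns id, b, c, which is what the snippet references), so the aggregation can be run without the file.
code :
import org.apache.spark.sql.functions._
import spark.implicits._

val df = Seq(
  ("1", "ok", "9"), ("1", "not ok", "10"), ("1", "sure", "1"), ("1", "sure bleh", "1"),
  ("2", "not sure", "2"), ("2", "not so sure", "12"),
  ("3", "not sure", "5"), ("3", "not so sure", "25")
).toDF("id", "b", "c")

val finalDF = df.groupBy("id")
  .agg(collect_list(array("b", "c")).as("grouping"), count("*").as("count"))
  .orderBy("id")

finalDF.show(false)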