summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author Santo Cariotti <santo@dcariotti.me> 2024-12-31 18:32:27 +0100
committer Santo Cariotti <santo@dcariotti.me> 2024-12-31 18:32:27 +0100
commit d6ee6dae981d99f339f80ed6e65deea91eeff2ce (patch)
tree cc2860dabeae54a2f4285a41cf94ddc6bd8223a0
parent 861400563abee500682bea89161eabfcb86a3e60 (diff)
Use hash partitioner
-rw-r--r-- co-purchase-analysis/src/main/scala/Main.scala 16
1 files changed, 11 insertions, 5 deletions
diff --git a/co-purchase-analysis/src/main/scala/Main.scala b/co-purchase-analysis/src/main/scala/Main.scala
index a12c850..0acce5b 100644
--- a/co-purchase-analysis/src/main/scala/Main.scala
+++ b/co-purchase-analysis/src/main/scala/Main.scala
@@ -1,5 +1,6 @@
import org.apache.spark.sql.SparkSession
import org.apache.spark.rdd.RDD
+import org.apache.spark.HashPartitioner
import java.nio.file.{Paths, Files}
/** A Spark application that analyzes co-purchased products.
@@ -106,18 +107,23 @@ object CoPurchaseAnalysis {
* RDD containing CoPurchase instances with purchase frequency counts
*/
def processData(data: RDD[OrderProduct]): RDD[String] = {
- data
+ val groups = data
.groupBy(_.orderId)
+ .partitionBy(new HashPartitioner(200))
+ .persist()
+
+ val pairs = groups
.flatMap { case (_, orders) =>
orders.map(_.productId).toSeq.sorted.combinations(2).map {
case List(p1, p2) =>
ProductPair(Math.min(p1, p2), Math.max(p1, p2)) -> 1
}
}
- .reduceByKey(_ + _)
- .map { case (ProductPair(product1, product2), count) =>
- s"${product1},${product2},${count}"
- }
+ val coProducts = pairs.reduceByKey(_ + _)
+
+ coProducts.map { case (ProductPair(product1, product2), count) =>
+ s"${product1},${product2},${count}"
+ }
}
/** Main entry point for the application.