From d6ee6dae981d99f339f80ed6e65deea91eeff2ce Mon Sep 17 00:00:00 2001 From: Santo Cariotti Date: Tue, 31 Dec 2024 18:32:27 +0100 Subject: Use hash partitioner --- co-purchase-analysis/src/main/scala/Main.scala | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/co-purchase-analysis/src/main/scala/Main.scala b/co-purchase-analysis/src/main/scala/Main.scala index a12c850..0acce5b 100644 --- a/co-purchase-analysis/src/main/scala/Main.scala +++ b/co-purchase-analysis/src/main/scala/Main.scala @@ -1,5 +1,6 @@ import org.apache.spark.sql.SparkSession import org.apache.spark.rdd.RDD +import org.apache.spark.HashPartitioner import java.nio.file.{Paths, Files} /** A Spark application that analyzes co-purchased products. @@ -106,18 +107,23 @@ object CoPurchaseAnalysis { * RDD containing CoPurchase instances with purchase frequency counts */ def processData(data: RDD[OrderProduct]): RDD[String] = { - data + val groups = data .groupBy(_.orderId) + .partitionBy(new HashPartitioner(200)) + .persist() + + val pairs = groups .flatMap { case (_, orders) => orders.map(_.productId).toSeq.sorted.combinations(2).map { case List(p1, p2) => ProductPair(Math.min(p1, p2), Math.max(p1, p2)) -> 1 } } - .reduceByKey(_ + _) - .map { case (ProductPair(product1, product2), count) => - s"${product1},${product2},${count}" - } + val coProducts = pairs.reduceByKey(_ + _) + + coProducts.map { case (ProductPair(product1, product2), count) => + s"${product1},${product2},${count}" + } } /** Main entry point for the application. -- cgit v1.2.3-18-g5258