author    | Santo Cariotti <santo@dcariotti.me> | 2024-12-31 18:32:27 +0100
committer | Santo Cariotti <santo@dcariotti.me> | 2024-12-31 18:32:27 +0100
commit    | d6ee6dae981d99f339f80ed6e65deea91eeff2ce (patch)
tree      | cc2860dabeae54a2f4285a41cf94ddc6bd8223a0
parent    | 861400563abee500682bea89161eabfcb86a3e60 (diff)
Use hash partitioner
-rw-r--r-- | co-purchase-analysis/src/main/scala/Main.scala | 16
1 file changed, 11 insertions, 5 deletions
diff --git a/co-purchase-analysis/src/main/scala/Main.scala b/co-purchase-analysis/src/main/scala/Main.scala
index a12c850..0acce5b 100644
--- a/co-purchase-analysis/src/main/scala/Main.scala
+++ b/co-purchase-analysis/src/main/scala/Main.scala
@@ -1,5 +1,6 @@
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.rdd.RDD
+import org.apache.spark.HashPartitioner
 import java.nio.file.{Paths, Files}
 
 /** A Spark application that analyzes co-purchased products.
@@ -106,18 +107,23 @@ object CoPurchaseAnalysis {
    *   RDD containing CoPurchase instances with purchase frequency counts
    */
   def processData(data: RDD[OrderProduct]): RDD[String] = {
-    data
+    val groups = data
       .groupBy(_.orderId)
+      .partitionBy(new HashPartitioner(200))
+      .persist()
+
+    val pairs = groups
       .flatMap { case (_, orders) =>
         orders.map(_.productId).toSeq.sorted.combinations(2).map {
           case List(p1, p2) =>
             ProductPair(Math.min(p1, p2), Math.max(p1, p2)) -> 1
         }
       }
-      .reduceByKey(_ + _)
-      .map { case (ProductPair(product1, product2), count) =>
-        s"${product1},${product2},${count}"
-      }
+    val coProducts = pairs.reduceByKey(_ + _)
+
+    coProducts.map { case (ProductPair(product1, product2), count) =>
+      s"${product1},${product2},${count}"
+    }
   }
 
   /** Main entry point for the application.
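For context, below is a minimal, self-contained sketch of the pattern this commit introduces: group the input by order, spread the grouped RDD over a fixed number of hash partitions, cache it, then generate and count product pairs. The `CoPurchaseSketch` object, the simplified `OrderProduct` and `ProductPair` case classes, and the sample `main` are assumptions made for illustration only; the real definitions live elsewhere in `Main.scala` and may differ.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.rdd.RDD
import org.apache.spark.HashPartitioner

// Hypothetical standalone object; the commit itself touches CoPurchaseAnalysis in Main.scala.
object CoPurchaseSketch {
  case class OrderProduct(orderId: Int, productId: Int) // assumed shape
  case class ProductPair(product1: Int, product2: Int)  // assumed shape

  def processData(data: RDD[OrderProduct]): RDD[String] = {
    // groupBy yields RDD[(Int, Iterable[OrderProduct])]. Hash-partitioning it
    // into 200 partitions fixes how the keys are distributed, and persist()
    // keeps the grouped data in memory so the shuffle is not recomputed if
    // this RDD is reused.
    val groups = data
      .groupBy(_.orderId)
      .partitionBy(new HashPartitioner(200))
      .persist()

    // Emit each unordered product pair within an order once, counted as 1.
    val pairs = groups.flatMap { case (_, orders) =>
      orders.map(_.productId).toSeq.sorted.combinations(2).map {
        case Seq(p1, p2) =>
          ProductPair(Math.min(p1, p2), Math.max(p1, p2)) -> 1
      }
    }

    // Sum per-order counts into co-purchase frequencies and format as CSV lines.
    pairs
      .reduceByKey(_ + _)
      .map { case (ProductPair(p1, p2), count) => s"$p1,$p2,$count" }
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("co-purchase-sketch")
      .master("local[*]") // local run for illustration only
      .getOrCreate()
    val sample = spark.sparkContext.parallelize(Seq(
      OrderProduct(1, 10), OrderProduct(1, 20),
      OrderProduct(2, 10), OrderProduct(2, 20)
    ))
    processData(sample).collect().foreach(println) // expected: "10,20,2"
    spark.stop()
  }
}
```

Splitting the chain into named `groups`, `pairs`, and `coProducts` stages, as the diff does, makes the shuffle boundaries explicit; the partition count of 200 comes straight from the commit and would normally be tuned to the cluster and data size.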