From 2bb1d30882187e66f24c6211d7cde1c0641a6a18 Mon Sep 17 00:00:00 2001
From: Santo Cariotti <santo@dcariotti.me>
Date: Thu, 13 Feb 2025 21:33:18 +0100
Subject: Multi repartition

---
 co-purchase-analysis/src/main/scala/Main.scala | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

(limited to 'co-purchase-analysis/src')

diff --git a/co-purchase-analysis/src/main/scala/Main.scala b/co-purchase-analysis/src/main/scala/Main.scala
index e4938d8..d3679b2 100644
--- a/co-purchase-analysis/src/main/scala/Main.scala
+++ b/co-purchase-analysis/src/main/scala/Main.scala
@@ -118,6 +118,7 @@ object CoPurchaseAnalysis {
   ): RDD[String] = {
     val pairs = data
       .map(order => (order.orderId, order.productId))
+      .partitionBy(new HashPartitioner(partitionsNumber))
       .groupByKey()
       .flatMap { case (_, productIds) =>
         val products = productIds.toSeq
@@ -130,9 +131,12 @@ object CoPurchaseAnalysis {
 
     val coProducts = pairs.reduceByKey(_ + _)
 
-    coProducts.map { case (ProductPair(product1, product2), count) =>
-      s"${product1},${product2},${count}"
+    val result = coProducts.map {
+      case (ProductPair(product1, product2), count) =>
+        s"${product1},${product2},${count}"
     }
+
+    result.repartition(1)
   }
 
   /** Main entry point for the application.
-- 
cgit v1.2.3-18-g5258