diff options
author | Santo Cariotti <santo@dcariotti.me> | 2025-02-13 21:33:18 +0100 |
---|---|---|
committer | Santo Cariotti <santo@dcariotti.me> | 2025-02-13 21:50:43 +0100 |
commit | 2bb1d30882187e66f24c6211d7cde1c0641a6a18 (patch) | |
tree | 8076e61a1e938555c12b8fabac5b8f0bc4741be9 | |
parent | cc1e3b52cf769e371931646b741ffff7be6588f3 (diff) |
Multi repartition
-rw-r--r-- | co-purchase-analysis/src/main/scala/Main.scala | 8 |
1 files changed, 6 insertions, 2 deletions
```diff
diff --git a/co-purchase-analysis/src/main/scala/Main.scala b/co-purchase-analysis/src/main/scala/Main.scala
index e4938d8..d3679b2 100644
--- a/co-purchase-analysis/src/main/scala/Main.scala
+++ b/co-purchase-analysis/src/main/scala/Main.scala
@@ -118,6 +118,7 @@ object CoPurchaseAnalysis {
   ): RDD[String] = {
     val pairs = data
       .map(order => (order.orderId, order.productId))
+      .partitionBy(new HashPartitioner(partitionsNumber))
       .groupByKey()
       .flatMap { case (_, productIds) =>
         val products = productIds.toSeq
@@ -130,9 +131,12 @@ object CoPurchaseAnalysis {
 
     val coProducts = pairs.reduceByKey(_ + _)
 
-    coProducts.map { case (ProductPair(product1, product2), count) =>
-      s"${product1},${product2},${count}"
+    val result = coProducts.map {
+      case (ProductPair(product1, product2), count) =>
+        s"${product1},${product2},${count}"
     }
+
+    result.repartition(1)
   }
 
   /** Main entry point for the application.
```