From 2bb1d30882187e66f24c6211d7cde1c0641a6a18 Mon Sep 17 00:00:00 2001 From: Santo Cariotti Date: Thu, 13 Feb 2025 21:33:18 +0100 Subject: Multi repartition --- co-purchase-analysis/src/main/scala/Main.scala | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/co-purchase-analysis/src/main/scala/Main.scala b/co-purchase-analysis/src/main/scala/Main.scala index e4938d8..d3679b2 100644 --- a/co-purchase-analysis/src/main/scala/Main.scala +++ b/co-purchase-analysis/src/main/scala/Main.scala @@ -118,6 +118,7 @@ object CoPurchaseAnalysis { ): RDD[String] = { val pairs = data .map(order => (order.orderId, order.productId)) + .partitionBy(new HashPartitioner(partitionsNumber)) .groupByKey() .flatMap { case (_, productIds) => val products = productIds.toSeq @@ -130,9 +131,12 @@ object CoPurchaseAnalysis { val coProducts = pairs.reduceByKey(_ + _) - coProducts.map { case (ProductPair(product1, product2), count) => - s"${product1},${product2},${count}" + val result = coProducts.map { + case (ProductPair(product1, product2), count) => + s"${product1},${product2},${count}" } + + result.repartition(1) } /** Main entry point for the application. -- cgit v1.2.3-18-g5258