summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSanto Cariotti <santo@dcariotti.me>2025-02-13 21:33:18 +0100
committerSanto Cariotti <santo@dcariotti.me>2025-02-13 21:50:43 +0100
commit2bb1d30882187e66f24c6211d7cde1c0641a6a18 (patch)
tree8076e61a1e938555c12b8fabac5b8f0bc4741be9
parentcc1e3b52cf769e371931646b741ffff7be6588f3 (diff)
Multi repartition
-rw-r--r--co-purchase-analysis/src/main/scala/Main.scala8
1 files changed, 6 insertions, 2 deletions
diff --git a/co-purchase-analysis/src/main/scala/Main.scala b/co-purchase-analysis/src/main/scala/Main.scala
index e4938d8..d3679b2 100644
--- a/co-purchase-analysis/src/main/scala/Main.scala
+++ b/co-purchase-analysis/src/main/scala/Main.scala
@@ -118,6 +118,7 @@ object CoPurchaseAnalysis {
): RDD[String] = {
val pairs = data
.map(order => (order.orderId, order.productId))
+ .partitionBy(new HashPartitioner(partitionsNumber))
.groupByKey()
.flatMap { case (_, productIds) =>
val products = productIds.toSeq
@@ -130,9 +131,12 @@ object CoPurchaseAnalysis {
val coProducts = pairs.reduceByKey(_ + _)
- coProducts.map { case (ProductPair(product1, product2), count) =>
- s"${product1},${product2},${count}"
+ val result = coProducts.map {
+ case (ProductPair(product1, product2), count) =>
+ s"${product1},${product2},${count}"
}
+
+ result.repartition(1)
}
/** Main entry point for the application.