summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSanto Cariotti <santo@dcariotti.me>2025-01-07 16:06:05 +0100
committerSanto Cariotti <santo@dcariotti.me>2025-01-07 16:06:05 +0100
commit83c3fa60c752486d2b4d0001b6fad3ee31fb9672 (patch)
treea2a22aa686f729ce9213689060257d157c60ea14
parent8094e6a64f166863fbeb445457364616f8bd8a7e (diff)
Use `groupByKey` instead of `groupBy`
-rw-r--r--co-purchase-analysis/src/main/scala/Main.scala17
1 files changed, 7 insertions, 10 deletions
diff --git a/co-purchase-analysis/src/main/scala/Main.scala b/co-purchase-analysis/src/main/scala/Main.scala
index 0acce5b..7fdb4ed 100644
--- a/co-purchase-analysis/src/main/scala/Main.scala
+++ b/co-purchase-analysis/src/main/scala/Main.scala
@@ -107,16 +107,13 @@ object CoPurchaseAnalysis {
* RDD containing CoPurchase instances with purchase frequency counts
*/
def processData(data: RDD[OrderProduct]): RDD[String] = {
- val groups = data
- .groupBy(_.orderId)
- .partitionBy(new HashPartitioner(200))
- .persist()
-
- val pairs = groups
- .flatMap { case (_, orders) =>
- orders.map(_.productId).toSeq.sorted.combinations(2).map {
- case List(p1, p2) =>
- ProductPair(Math.min(p1, p2), Math.max(p1, p2)) -> 1
+ val pairs = data
+ .map(order => (order.orderId, order.productId))
+ .groupByKey()
+ .partitionBy(new HashPartitioner(50))
+ .flatMap { case (_, productIds) =>
+ productIds.toSeq.sorted.combinations(2).map { case List(p1, p2) =>
+ ProductPair(Math.min(p1, p2), Math.max(p1, p2)) -> 1
}
}
val coProducts = pairs.reduceByKey(_ + _)