diff options
author | Santo Cariotti <santo@dcariotti.me> | 2025-01-07 16:06:05 +0100 |
---|---|---|
committer | Santo Cariotti <santo@dcariotti.me> | 2025-01-07 16:06:05 +0100 |
commit | 83c3fa60c752486d2b4d0001b6fad3ee31fb9672 (patch) | |
tree | a2a22aa686f729ce9213689060257d157c60ea14 | |
parent | 8094e6a64f166863fbeb445457364616f8bd8a7e (diff) |
Use `groupByKey` instead of `groupBy`
-rw-r--r-- | co-purchase-analysis/src/main/scala/Main.scala | 17 |
1 files changed, 7 insertions, 10 deletions
diff --git a/co-purchase-analysis/src/main/scala/Main.scala b/co-purchase-analysis/src/main/scala/Main.scala index 0acce5b..7fdb4ed 100644 --- a/co-purchase-analysis/src/main/scala/Main.scala +++ b/co-purchase-analysis/src/main/scala/Main.scala @@ -107,16 +107,13 @@ object CoPurchaseAnalysis { * RDD containing CoPurchase instances with purchase frequency counts */ def processData(data: RDD[OrderProduct]): RDD[String] = { - val groups = data - .groupBy(_.orderId) - .partitionBy(new HashPartitioner(200)) - .persist() - - val pairs = groups - .flatMap { case (_, orders) => - orders.map(_.productId).toSeq.sorted.combinations(2).map { - case List(p1, p2) => - ProductPair(Math.min(p1, p2), Math.max(p1, p2)) -> 1 + val pairs = data + .map(order => (order.orderId, order.productId)) + .groupByKey() + .partitionBy(new HashPartitioner(50)) + .flatMap { case (_, productIds) => + productIds.toSeq.sorted.combinations(2).map { case List(p1, p2) => + ProductPair(Math.min(p1, p2), Math.max(p1, p2)) -> 1 } } val coProducts = pairs.reduceByKey(_ + _) |