summaryrefslogtreecommitdiff
path: root/co-purchase-analysis/src/main/scala/Main.scala
diff options
context:
space:
mode:
Diffstat (limited to 'co-purchase-analysis/src/main/scala/Main.scala')
-rw-r--r--co-purchase-analysis/src/main/scala/Main.scala17
1 files changed, 7 insertions, 10 deletions
diff --git a/co-purchase-analysis/src/main/scala/Main.scala b/co-purchase-analysis/src/main/scala/Main.scala
index 0acce5b..7fdb4ed 100644
--- a/co-purchase-analysis/src/main/scala/Main.scala
+++ b/co-purchase-analysis/src/main/scala/Main.scala
@@ -107,16 +107,13 @@ object CoPurchaseAnalysis {
* RDD containing CoPurchase instances with purchase frequency counts
*/
def processData(data: RDD[OrderProduct]): RDD[String] = {
- val groups = data
- .groupBy(_.orderId)
- .partitionBy(new HashPartitioner(200))
- .persist()
-
- val pairs = groups
- .flatMap { case (_, orders) =>
- orders.map(_.productId).toSeq.sorted.combinations(2).map {
- case List(p1, p2) =>
- ProductPair(Math.min(p1, p2), Math.max(p1, p2)) -> 1
+ val pairs = data
+ .map(order => (order.orderId, order.productId))
+ .groupByKey()
+ .partitionBy(new HashPartitioner(50))
+ .flatMap { case (_, productIds) =>
+ productIds.toSeq.sorted.combinations(2).map { case List(p1, p2) =>
+ ProductPair(Math.min(p1, p2), Math.max(p1, p2)) -> 1
}
}
val coProducts = pairs.reduceByKey(_ + _)