diff options
Diffstat (limited to 'co-purchase-analysis/src/main')
| -rw-r--r-- | co-purchase-analysis/src/main/scala/Main.scala | 17 | 
1 files changed, 7 insertions, 10 deletions
diff --git a/co-purchase-analysis/src/main/scala/Main.scala b/co-purchase-analysis/src/main/scala/Main.scala index 0acce5b..7fdb4ed 100644 --- a/co-purchase-analysis/src/main/scala/Main.scala +++ b/co-purchase-analysis/src/main/scala/Main.scala @@ -107,16 +107,13 @@ object CoPurchaseAnalysis {      *   RDD containing CoPurchase instances with purchase frequency counts      */    def processData(data: RDD[OrderProduct]): RDD[String] = { -    val groups = data -      .groupBy(_.orderId) -      .partitionBy(new HashPartitioner(200)) -      .persist() - -    val pairs = groups -      .flatMap { case (_, orders) => -        orders.map(_.productId).toSeq.sorted.combinations(2).map { -          case List(p1, p2) => -            ProductPair(Math.min(p1, p2), Math.max(p1, p2)) -> 1 +    val pairs = data +      .map(order => (order.orderId, order.productId)) +      .groupByKey() +      .partitionBy(new HashPartitioner(50)) +      .flatMap { case (_, productIds) => +        productIds.toSeq.sorted.combinations(2).map { case List(p1, p2) => +          ProductPair(Math.min(p1, p2), Math.max(p1, p2)) -> 1          }        }      val coProducts = pairs.reduceByKey(_ + _)  |