diff options
Diffstat (limited to 'co-purchase-analysis/src')
| -rw-r--r-- | co-purchase-analysis/src/main/scala/Main.scala | 8 | 
1 file changed, 6 insertions, 2 deletions
diff --git a/co-purchase-analysis/src/main/scala/Main.scala b/co-purchase-analysis/src/main/scala/Main.scala
index e4938d8..d3679b2 100644
--- a/co-purchase-analysis/src/main/scala/Main.scala
+++ b/co-purchase-analysis/src/main/scala/Main.scala
@@ -118,6 +118,7 @@ object CoPurchaseAnalysis {
   ): RDD[String] = {
     val pairs = data
       .map(order => (order.orderId, order.productId))
+      .partitionBy(new HashPartitioner(partitionsNumber))
       .groupByKey()
       .flatMap { case (_, productIds) =>
         val products = productIds.toSeq
@@ -130,9 +131,12 @@ object CoPurchaseAnalysis {
 
     val coProducts = pairs.reduceByKey(_ + _)
 
-    coProducts.map { case (ProductPair(product1, product2), count) =>
-      s"${product1},${product2},${count}"
+    val result = coProducts.map {
+      case (ProductPair(product1, product2), count) =>
+        s"${product1},${product2},${count}"
     }
+
+    result.repartition(1)
   }
 
   /** Main entry point for the application.