Diffstat (limited to 'co-purchase-analysis/src/main')
-rw-r--r--  co-purchase-analysis/src/main/scala/Main.scala | 16
1 file changed, 11 insertions(+), 5 deletions(-)
diff --git a/co-purchase-analysis/src/main/scala/Main.scala b/co-purchase-analysis/src/main/scala/Main.scala
index a12c850..0acce5b 100644
--- a/co-purchase-analysis/src/main/scala/Main.scala
+++ b/co-purchase-analysis/src/main/scala/Main.scala
@@ -1,5 +1,6 @@
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.rdd.RDD
+import org.apache.spark.HashPartitioner
 import java.nio.file.{Paths, Files}
 
 /** A Spark application that analyzes co-purchased products.
@@ -106,18 +107,23 @@ object CoPurchaseAnalysis {
     *   RDD containing CoPurchase instances with purchase frequency counts
     */
   def processData(data: RDD[OrderProduct]): RDD[String] = {
-    data
+    val groups = data
       .groupBy(_.orderId)
+      .partitionBy(new HashPartitioner(200))
+      .persist()
+
+    val pairs = groups
       .flatMap { case (_, orders) =>
         orders.map(_.productId).toSeq.sorted.combinations(2).map {
           case List(p1, p2) =>
             ProductPair(Math.min(p1, p2), Math.max(p1, p2)) -> 1
         }
       }
-      .reduceByKey(_ + _)
-      .map { case (ProductPair(product1, product2), count) =>
-        s"${product1},${product2},${count}"
-      }
+    val coProducts = pairs.reduceByKey(_ + _)
+
+    coProducts.map { case (ProductPair(product1, product2), count) =>
+      s"${product1},${product2},${count}"
+    }
   }
 
   /** Main entry point for the application.
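For context, below is a minimal, self-contained sketch of how the reworked processData pipeline fits together after this patch. Only the groupBy / partitionBy / persist / reduceByKey structure and the 200-partition HashPartitioner come from the change above; the OrderProduct and ProductPair definitions, the Seq(...) pattern in place of List(...), and the local-mode driver with sample data are assumptions added here purely for illustration and need not match the rest of Main.scala.

import org.apache.spark.sql.SparkSession
import org.apache.spark.rdd.RDD
import org.apache.spark.HashPartitioner

object CoPurchaseSketch {
  // Assumed shapes; the real case classes live elsewhere in Main.scala.
  case class OrderProduct(orderId: Int, productId: Int)
  case class ProductPair(product1: Int, product2: Int)

  def processData(data: RDD[OrderProduct]): RDD[String] = {
    // Group products by order, spread the groups across 200 hash partitions,
    // and persist the result so the shuffle output can be reused.
    val groups = data
      .groupBy(_.orderId)
      .partitionBy(new HashPartitioner(200))
      .persist()

    // Emit every unordered product pair within an order, each counted once.
    val pairs = groups.flatMap { case (_, orders) =>
      orders.map(_.productId).toSeq.sorted.combinations(2).map {
        case Seq(p1, p2) =>
          ProductPair(Math.min(p1, p2), Math.max(p1, p2)) -> 1
      }
    }

    // Sum the counts per pair and render each record as a CSV line.
    pairs.reduceByKey(_ + _).map { case (ProductPair(p1, p2), count) =>
      s"$p1,$p2,$count"
    }
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("co-purchase-sketch")
      .master("local[*]")
      .getOrCreate()

    // Two orders that both contain products 10 and 20.
    val sample = spark.sparkContext.parallelize(Seq(
      OrderProduct(1, 10), OrderProduct(1, 20),
      OrderProduct(2, 10), OrderProduct(2, 20)
    ))

    processData(sample).collect().foreach(println) // expected: 10,20,2
    spark.stop()
  }
}

Splitting the chain into named vals and persisting the pre-partitioned groups is one way to keep the expensive groupBy shuffle from being recomputed if the grouped RDD is reused. An alternative (not what this patch does) would be to hand the partitioner to groupBy directly, e.g. data.groupBy(_.orderId, new HashPartitioner(200)), which avoids the extra repartitioning step after the grouping shuffle.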