diff options
Diffstat (limited to 'co-purchase-analysis/src/main/scala/Main.scala')
-rw-r--r-- | co-purchase-analysis/src/main/scala/Main.scala | 16 |
1 file changed, 11 insertions(+), 5 deletions(-)
diff --git a/co-purchase-analysis/src/main/scala/Main.scala b/co-purchase-analysis/src/main/scala/Main.scala
index a12c850..0acce5b 100644
--- a/co-purchase-analysis/src/main/scala/Main.scala
+++ b/co-purchase-analysis/src/main/scala/Main.scala
@@ -1,5 +1,6 @@
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.rdd.RDD
+import org.apache.spark.HashPartitioner
 import java.nio.file.{Paths, Files}
 
 /** A Spark application that analyzes co-purchased products.
@@ -106,18 +107,23 @@ object CoPurchaseAnalysis {
     *   RDD containing CoPurchase instances with purchase frequency counts
     */
   def processData(data: RDD[OrderProduct]): RDD[String] = {
-    data
+    val groups = data
       .groupBy(_.orderId)
+      .partitionBy(new HashPartitioner(200))
+      .persist()
+
+    val pairs = groups
       .flatMap { case (_, orders) =>
         orders.map(_.productId).toSeq.sorted.combinations(2).map {
           case List(p1, p2) =>
             ProductPair(Math.min(p1, p2), Math.max(p1, p2)) -> 1
         }
       }
-      .reduceByKey(_ + _)
-      .map { case (ProductPair(product1, product2), count) =>
-        s"${product1},${product2},${count}"
-      }
+    val coProducts = pairs.reduceByKey(_ + _)
+
+    coProducts.map { case (ProductPair(product1, product2), count) =>
+      s"${product1},${product2},${count}"
+    }
   }
 
   /** Main entry point for the application.