summaryrefslogtreecommitdiff
path: root/co-purchase-analysis/src/main/scala/Main.scala
diff options
context:
space:
mode:
Diffstat (limited to 'co-purchase-analysis/src/main/scala/Main.scala')
-rw-r--r--co-purchase-analysis/src/main/scala/Main.scala16
1 files changed, 11 insertions, 5 deletions
diff --git a/co-purchase-analysis/src/main/scala/Main.scala b/co-purchase-analysis/src/main/scala/Main.scala
index a12c850..0acce5b 100644
--- a/co-purchase-analysis/src/main/scala/Main.scala
+++ b/co-purchase-analysis/src/main/scala/Main.scala
@@ -1,5 +1,6 @@
import org.apache.spark.sql.SparkSession
import org.apache.spark.rdd.RDD
+import org.apache.spark.HashPartitioner
import java.nio.file.{Paths, Files}
/** A Spark application that analyzes co-purchased products.
@@ -106,18 +107,23 @@ object CoPurchaseAnalysis {
* RDD containing CoPurchase instances with purchase frequency counts
*/
def processData(data: RDD[OrderProduct]): RDD[String] = {
- data
+ val groups = data
.groupBy(_.orderId)
+ .partitionBy(new HashPartitioner(200))
+ .persist()
+
+ val pairs = groups
.flatMap { case (_, orders) =>
orders.map(_.productId).toSeq.sorted.combinations(2).map {
case List(p1, p2) =>
ProductPair(Math.min(p1, p2), Math.max(p1, p2)) -> 1
}
}
- .reduceByKey(_ + _)
- .map { case (ProductPair(product1, product2), count) =>
- s"${product1},${product2},${count}"
- }
+ val coProducts = pairs.reduceByKey(_ + _)
+
+ coProducts.map { case (ProductPair(product1, product2), count) =>
+ s"${product1},${product2},${count}"
+ }
}
/** Main entry point for the application.