summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSanto Cariotti <santo@dcariotti.me>2024-12-29 22:07:13 +0100
committerSanto Cariotti <santo@dcariotti.me>2024-12-29 22:07:13 +0100
commit4107698a14f4b711393385fc9a919b97ac5fb79e (patch)
treec37dfe1b145ae21ca15bf6850f191ca53b51a82d
parentebc7927407d9423c65df657da77b9e6bcdf930a9 (diff)
Simpler code
Remove `CoPurchase` mapping type
-rw-r--r--co-purchase-analysis/.gitignore1
-rw-r--r--co-purchase-analysis/src/main/scala/Main.scala57
2 files changed, 17 insertions, 41 deletions
diff --git a/co-purchase-analysis/.gitignore b/co-purchase-analysis/.gitignore
index 206c638..ad892e3 100644
--- a/co-purchase-analysis/.gitignore
+++ b/co-purchase-analysis/.gitignore
@@ -18,3 +18,4 @@ project/plugins/project/
.sbt/
.bsp/
project/
+input/order_products.csv
diff --git a/co-purchase-analysis/src/main/scala/Main.scala b/co-purchase-analysis/src/main/scala/Main.scala
index 8bd54e6..a55be03 100644
--- a/co-purchase-analysis/src/main/scala/Main.scala
+++ b/co-purchase-analysis/src/main/scala/Main.scala
@@ -38,17 +38,6 @@ object CoPurchaseAnalysis {
*/
case class ProductPair(product1: Int, product2: Int)
- /** Represents the co-purchase relationship between two products.
- *
- * @param product1
- * The identifier of the first product
- * @param product2
- * The identifier of the second product
- * @param count
- * The number of times these products were purchased together
- */
- case class CoPurchase(product1: Int, product2: Int, count: Int)
-
/** Validates command line arguments and checks file existence.
*
* @param args
@@ -105,27 +94,6 @@ object CoPurchaseAnalysis {
OrderProduct(parts(0).toInt, parts(1).toInt)
}
- /** Generates all possible pairs of products from a list of products in an
- * order. Products are sorted before pairing to ensure consistent ordering.
- *
- * @param products
- * List of product IDs from a single order
- * @return
- * List of ProductPair instances representing all possible product
- * combinations
- */
- def generateProductPairs(products: List[Int]): List[ProductPair] = {
- for {
- i <- products.indices.toList
- j <- (i + 1) until products.length
- } yield {
- val (p1, p2) =
- if (products(i) < products(j)) (products(i), products(j))
- else (products(j), products(i))
- ProductPair(p1, p2)
- }
- }
-
/** Processes the order data to generate co-purchase statistics.
*
* The processing pipeline includes: (1) Grouping orders by orderId, (2)
@@ -137,16 +105,27 @@ object CoPurchaseAnalysis {
* @return
* RDD containing CoPurchase instances with purchase frequency counts
*/
- def processData(data: RDD[OrderProduct]): RDD[CoPurchase] = {
+ def processData(data: RDD[OrderProduct]): RDD[String] = {
+ data
+ .groupBy(_.orderId)
+ .flatMap { case (_, orders) =>
+ orders.map(_.productId).toSeq.sorted.combinations(2).map {
+ case List(p1, p2) =>
+ ProductPair(Math.min(p1, p2), Math.max(p1, p2)) -> 1
+ }
+ }
+ .foreach(println)
data
.groupBy(_.orderId)
.flatMap { case (_, orders) =>
- generateProductPairs(orders.map(_.productId).toList)
+ orders.map(_.productId).toSeq.sorted.combinations(2).map {
+ case List(p1, p2) =>
+ ProductPair(Math.min(p1, p2), Math.max(p1, p2)) -> 1
+ }
}
- .map(pair => (pair, 1))
.reduceByKey(_ + _)
- .map { case (ProductPair(p1, p2), count) =>
- CoPurchase(p1, p2, count)
+ .map { case (ProductPair(product1, product2), count) =>
+ s"${product1},${product2},${count}"
}
}
@@ -181,10 +160,6 @@ object CoPurchaseAnalysis {
.map(parseLine)
val result = processData(inputRDD)
- .map { case (coPurchase: CoPurchase) =>
- s"${coPurchase.product1},${coPurchase.product2},${coPurchase.count}"
-
- }
.saveAsTextFile(config("outputPath"))
} finally {
spark.stop()