diff options
author | Santo Cariotti <santo@dcariotti.me> | 2024-12-29 22:07:13 +0100 |
---|---|---|
committer | Santo Cariotti <santo@dcariotti.me> | 2024-12-29 22:07:13 +0100 |
commit | 4107698a14f4b711393385fc9a919b97ac5fb79e (patch) | |
tree | c37dfe1b145ae21ca15bf6850f191ca53b51a82d | |
parent | ebc7927407d9423c65df657da77b9e6bcdf930a9 (diff) |
Simpler code
Remove `CoPurchase` mapping type
-rw-r--r-- | co-purchase-analysis/.gitignore | 1 | ||||
-rw-r--r-- | co-purchase-analysis/src/main/scala/Main.scala | 57 |
2 files changed, 17 insertions, 41 deletions
diff --git a/co-purchase-analysis/.gitignore b/co-purchase-analysis/.gitignore index 206c638..ad892e3 100644 --- a/co-purchase-analysis/.gitignore +++ b/co-purchase-analysis/.gitignore @@ -18,3 +18,4 @@ project/plugins/project/ .sbt/ .bsp/ project/ +input/order_products.csv diff --git a/co-purchase-analysis/src/main/scala/Main.scala b/co-purchase-analysis/src/main/scala/Main.scala index 8bd54e6..a55be03 100644 --- a/co-purchase-analysis/src/main/scala/Main.scala +++ b/co-purchase-analysis/src/main/scala/Main.scala @@ -38,17 +38,6 @@ object CoPurchaseAnalysis { */ case class ProductPair(product1: Int, product2: Int) - /** Represents the co-purchase relationship between two products. - * - * @param product1 - * The identifier of the first product - * @param product2 - * The identifier of the second product - * @param count - * The number of times these products were purchased together - */ - case class CoPurchase(product1: Int, product2: Int, count: Int) - /** Validates command line arguments and checks file existence. * * @param args @@ -105,27 +94,6 @@ object CoPurchaseAnalysis { OrderProduct(parts(0).toInt, parts(1).toInt) } - /** Generates all possible pairs of products from a list of products in an - * order. Products are sorted before pairing to ensure consistent ordering. - * - * @param products - * List of product IDs from a single order - * @return - * List of ProductPair instances representing all possible product - * combinations - */ - def generateProductPairs(products: List[Int]): List[ProductPair] = { - for { - i <- products.indices.toList - j <- (i + 1) until products.length - } yield { - val (p1, p2) = - if (products(i) < products(j)) (products(i), products(j)) - else (products(j), products(i)) - ProductPair(p1, p2) - } - } - /** Processes the order data to generate co-purchase statistics. * * The processing pipeline includes: (1) Grouping orders by orderId, (2) @@ -137,16 +105,27 @@ object CoPurchaseAnalysis { * @return * RDD containing CoPurchase instances with purchase frequency counts */ - def processData(data: RDD[OrderProduct]): RDD[CoPurchase] = { + def processData(data: RDD[OrderProduct]): RDD[String] = { + data + .groupBy(_.orderId) + .flatMap { case (_, orders) => + orders.map(_.productId).toSeq.sorted.combinations(2).map { + case List(p1, p2) => + ProductPair(Math.min(p1, p2), Math.max(p1, p2)) -> 1 + } + } + .foreach(println) data .groupBy(_.orderId) .flatMap { case (_, orders) => - generateProductPairs(orders.map(_.productId).toList) + orders.map(_.productId).toSeq.sorted.combinations(2).map { + case List(p1, p2) => + ProductPair(Math.min(p1, p2), Math.max(p1, p2)) -> 1 + } } - .map(pair => (pair, 1)) .reduceByKey(_ + _) - .map { case (ProductPair(p1, p2), count) => - CoPurchase(p1, p2, count) + .map { case (ProductPair(product1, product2), count) => + s"${product1},${product2},${count}" } } @@ -181,10 +160,6 @@ object CoPurchaseAnalysis { .map(parseLine) val result = processData(inputRDD) - .map { case (coPurchase: CoPurchase) => - s"${coPurchase.product1},${coPurchase.product2},${coPurchase.count}" - - } .saveAsTextFile(config("outputPath")) } finally { spark.stop() |