diff options
author | Santo Cariotti <santo@dcariotti.me> | 2025-02-16 11:35:42 +0100 |
---|---|---|
committer | Santo Cariotti <santo@dcariotti.me> | 2025-02-16 11:35:42 +0100 |
commit | 10fb40f056442741c7520f77ffccf137d224e191 (patch) | |
tree | cf04dfb6057a2ac76a84184818817642f3cec622 | |
parent | 16801825bf5d342c02c64b496687dc30f6423120 (diff) |
Remove OrderProduct case class to reduce load
-rw-r--r-- | co-purchase-analysis/src/main/scala/Main.scala | 25 |
1 files changed, 7 insertions, 18 deletions
diff --git a/co-purchase-analysis/src/main/scala/Main.scala b/co-purchase-analysis/src/main/scala/Main.scala index a711136..fbca3ac 100644 --- a/co-purchase-analysis/src/main/scala/Main.scala +++ b/co-purchase-analysis/src/main/scala/Main.scala @@ -20,15 +20,6 @@ import org.apache.spark.HashPartitioner */ object CoPurchaseAnalysis { - /** Represents an order-product relationship. - * - * @param orderId - * The unique identifier for the order - * @param productId - * The unique identifier for the product - */ - case class OrderProduct(orderId: Int, productId: Int) - /** Represents a pair of products that were purchased together. * * @param product1 @@ -81,17 +72,17 @@ object CoPurchaseAnalysis { session.getOrCreate() } - /** Parses a single line from the input file into an OrderProduct instance. - * Expects the line to be in CSV format with orderId and productId. + /** Parses a single line from the input file into a tuple. Expects the line to + * be in CSV format with orderId and productId. * * @param line * Input line in format "orderId,productId" * @return - * OrderProduct instance containing the parsed data + * (Int, Int) tuple containing the parsed data */ - def parseLine(line: String): OrderProduct = { + def parseLine(line: String): (Int, Int) = { val parts = line.split(",") - OrderProduct(parts(0).toInt, parts(1).toInt) + (parts(0).toInt, parts(1).toInt) } /** Processes the order data to generate co-purchase statistics. @@ -101,18 +92,17 @@ object CoPurchaseAnalysis { * product pair * * @param data - * RDD containing OrderProduct instances + * RDD containing (Int, Int) * @param partitionsNumber * Number of partitions used by HashPartitioner * @return * RDD containing CoPurchase instances with purchase frequency counts */ def processData( - data: RDD[OrderProduct], + data: RDD[(Int, Int)], partitionsNumber: Int ): RDD[String] = { val pairs = data - .map(order => (order.orderId, order.productId)) .partitionBy(new HashPartitioner(partitionsNumber)) .groupByKey() .flatMap { case (_, productIds) => @@ -122,7 +112,6 @@ object CoPurchaseAnalysis { y <- products if x < y } yield (ProductPair(x, y), 1) } - .partitionBy(new HashPartitioner(partitionsNumber)) val coProducts = pairs.reduceByKey(_ + _) |