summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: Santo Cariotti <santo@dcariotti.me> 2025-02-16 11:35:42 +0100
committer: Santo Cariotti <santo@dcariotti.me> 2025-02-16 11:35:42 +0100
commit: 10fb40f056442741c7520f77ffccf137d224e191 (patch)
tree: cf04dfb6057a2ac76a84184818817642f3cec622
parent: 16801825bf5d342c02c64b496687dc30f6423120 (diff)
Remove OrderProduct case class to reduce load
-rw-r--r-- co-purchase-analysis/src/main/scala/Main.scala 25
1 files changed, 7 insertions, 18 deletions
diff --git a/co-purchase-analysis/src/main/scala/Main.scala b/co-purchase-analysis/src/main/scala/Main.scala
index a711136..fbca3ac 100644
--- a/co-purchase-analysis/src/main/scala/Main.scala
+++ b/co-purchase-analysis/src/main/scala/Main.scala
@@ -20,15 +20,6 @@ import org.apache.spark.HashPartitioner
*/
object CoPurchaseAnalysis {
- /** Represents an order-product relationship.
- *
- * @param orderId
- * The unique identifier for the order
- * @param productId
- * The unique identifier for the product
- */
- case class OrderProduct(orderId: Int, productId: Int)
-
/** Represents a pair of products that were purchased together.
*
* @param product1
@@ -81,17 +72,17 @@ object CoPurchaseAnalysis {
session.getOrCreate()
}
- /** Parses a single line from the input file into an OrderProduct instance.
- * Expects the line to be in CSV format with orderId and productId.
+ /** Parses a single line from the input file into a tuple. Expects the line to
+ * be in CSV format with orderId and productId.
*
* @param line
* Input line in format "orderId,productId"
* @return
- * OrderProduct instance containing the parsed data
+ * (orderId, productId) tuple of Ints parsed from the line
*/
- def parseLine(line: String): OrderProduct = {
+ def parseLine(line: String): (Int, Int) = {
val parts = line.split(",")
- OrderProduct(parts(0).toInt, parts(1).toInt)
+ (parts(0).toInt, parts(1).toInt)
}
/** Processes the order data to generate co-purchase statistics.
@@ -101,18 +92,17 @@ object CoPurchaseAnalysis {
* product pair
*
* @param data
- * RDD containing OrderProduct instances
+ * RDD containing (orderId, productId) pairs as (Int, Int) tuples
* @param partitionsNumber
* Number of partitions used by HashPartitioner
* @return
* RDD of strings describing the co-purchases with their purchase frequency counts
*/
def processData(
- data: RDD[OrderProduct],
+ data: RDD[(Int, Int)],
partitionsNumber: Int
): RDD[String] = {
val pairs = data
- .map(order => (order.orderId, order.productId))
.partitionBy(new HashPartitioner(partitionsNumber))
.groupByKey()
.flatMap { case (_, productIds) =>
@@ -122,7 +112,6 @@ object CoPurchaseAnalysis {
y <- products if x < y
} yield (ProductPair(x, y), 1)
}
- .partitionBy(new HashPartitioner(partitionsNumber))
val coProducts = pairs.reduceByKey(_ + _)