diff options
Diffstat (limited to 'co-purchase-analysis/src/main/scala')
| -rw-r--r-- | co-purchase-analysis/src/main/scala/Main.scala | 25 | 
1 files changed, 7 insertions, 18 deletions
diff --git a/co-purchase-analysis/src/main/scala/Main.scala b/co-purchase-analysis/src/main/scala/Main.scala index a711136..fbca3ac 100644 --- a/co-purchase-analysis/src/main/scala/Main.scala +++ b/co-purchase-analysis/src/main/scala/Main.scala @@ -20,15 +20,6 @@ import org.apache.spark.HashPartitioner    */  object CoPurchaseAnalysis { -  /** Represents an order-product relationship. -    * -    * @param orderId -    *   The unique identifier for the order -    * @param productId -    *   The unique identifier for the product -    */ -  case class OrderProduct(orderId: Int, productId: Int) -    /** Represents a pair of products that were purchased together.      *      * @param product1 @@ -81,17 +72,17 @@ object CoPurchaseAnalysis {      session.getOrCreate()    } -  /** Parses a single line from the input file into an OrderProduct instance. -    * Expects the line to be in CSV format with orderId and productId. +  /** Parses a single line from the input file into a tuple. Expects the line to +    * be in CSV format with orderId and productId.      *      * @param line      *   Input line in format "orderId,productId"      * @return -    *   OrderProduct instance containing the parsed data +    *   (Int, Int) tuple containing the parsed data      */ -  def parseLine(line: String): OrderProduct = { +  def parseLine(line: String): (Int, Int) = {      val parts = line.split(",") -    OrderProduct(parts(0).toInt, parts(1).toInt) +    (parts(0).toInt, parts(1).toInt)    }    /** Processes the order data to generate co-purchase statistics. @@ -101,18 +92,17 @@ object CoPurchaseAnalysis {      * product pair      *      * @param data -    *   RDD containing OrderProduct instances +    *   RDD containing (Int, Int)      * @param partitionsNumber      *   Number of partitions used by HashPartitioner      * @return      *   RDD containing CoPurchase instances with purchase frequency counts      */    def processData( -      data: RDD[OrderProduct], +      data: RDD[(Int, Int)],        partitionsNumber: Int    ): RDD[String] = {      val pairs = data -      .map(order => (order.orderId, order.productId))        .partitionBy(new HashPartitioner(partitionsNumber))        .groupByKey()        .flatMap { case (_, productIds) => @@ -122,7 +112,6 @@ object CoPurchaseAnalysis {            y <- products if x < y          } yield (ProductPair(x, y), 1)        } -      .partitionBy(new HashPartitioner(partitionsNumber))      val coProducts = pairs.reduceByKey(_ + _)  |