diff options
Diffstat (limited to 'co-purchase-analysis/src/main')
| -rw-r--r-- | co-purchase-analysis/src/main/scala/Main.scala | 57 | 
1 files changed, 16 insertions, 41 deletions
diff --git a/co-purchase-analysis/src/main/scala/Main.scala b/co-purchase-analysis/src/main/scala/Main.scala index 8bd54e6..a55be03 100644 --- a/co-purchase-analysis/src/main/scala/Main.scala +++ b/co-purchase-analysis/src/main/scala/Main.scala @@ -38,17 +38,6 @@ object CoPurchaseAnalysis {      */    case class ProductPair(product1: Int, product2: Int) -  /** Represents the co-purchase relationship between two products. -    * -    * @param product1 -    *   The identifier of the first product -    * @param product2 -    *   The identifier of the second product -    * @param count -    *   The number of times these products were purchased together -    */ -  case class CoPurchase(product1: Int, product2: Int, count: Int) -    /** Validates command line arguments and checks file existence.      *      * @param args @@ -105,27 +94,6 @@ object CoPurchaseAnalysis {      OrderProduct(parts(0).toInt, parts(1).toInt)    } -  /** Generates all possible pairs of products from a list of products in an -    * order. Products are sorted before pairing to ensure consistent ordering. -    * -    * @param products -    *   List of product IDs from a single order -    * @return -    *   List of ProductPair instances representing all possible product -    *   combinations -    */ -  def generateProductPairs(products: List[Int]): List[ProductPair] = { -    for { -      i <- products.indices.toList -      j <- (i + 1) until products.length -    } yield { -      val (p1, p2) = -        if (products(i) < products(j)) (products(i), products(j)) -        else (products(j), products(i)) -      ProductPair(p1, p2) -    } -  } -    /** Processes the order data to generate co-purchase statistics.      *      * The processing pipeline includes: (1) Grouping orders by orderId, (2) @@ -137,16 +105,27 @@ object CoPurchaseAnalysis {      * @return      *   RDD containing CoPurchase instances with purchase frequency counts      */ -  def processData(data: RDD[OrderProduct]): RDD[CoPurchase] = { +  def processData(data: RDD[OrderProduct]): RDD[String] = { +    data +      .groupBy(_.orderId) +      .flatMap { case (_, orders) => +        orders.map(_.productId).toSeq.sorted.combinations(2).map { +          case List(p1, p2) => +            ProductPair(Math.min(p1, p2), Math.max(p1, p2)) -> 1 +        } +      } +      .foreach(println)      data        .groupBy(_.orderId)        .flatMap { case (_, orders) => -        generateProductPairs(orders.map(_.productId).toList) +        orders.map(_.productId).toSeq.sorted.combinations(2).map { +          case List(p1, p2) => +            ProductPair(Math.min(p1, p2), Math.max(p1, p2)) -> 1 +        }        } -      .map(pair => (pair, 1))        .reduceByKey(_ + _) -      .map { case (ProductPair(p1, p2), count) => -        CoPurchase(p1, p2, count) +      .map { case (ProductPair(product1, product2), count) => +        s"${product1},${product2},${count}"        }    } @@ -181,10 +160,6 @@ object CoPurchaseAnalysis {          .map(parseLine)        val result = processData(inputRDD) -        .map { case (coPurchase: CoPurchase) => -          s"${coPurchase.product1},${coPurchase.product2},${coPurchase.count}" - -        }          .saveAsTextFile(config("outputPath"))      } finally {        spark.stop()  |