author | Santo Cariotti <santo@dcariotti.me> | 2024-12-19 22:04:19 +0100
---|---|---
committer | Santo Cariotti <santo@dcariotti.me> | 2024-12-19 22:04:19 +0100
commit | cda680f67fc81860e46e442e6d17c68e8c863ecd (patch) |
tree | 1b1f59089d205d92d943825faedb968beb908135 |
Init code
-rw-r--r-- | co-purchase-analysis/.gitignore | 20
-rw-r--r-- | co-purchase-analysis/.scalafmt.conf | 2
-rw-r--r-- | co-purchase-analysis/build.sbt | 11
-rw-r--r-- | co-purchase-analysis/input.txt | 9
-rw-r--r-- | co-purchase-analysis/src/main/scala/Main.scala | 179
5 files changed, 221 insertions, 0 deletions
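Note (worked example, not part of the commit): the nine sample rows in input.txt below group into orders 1 → {12, 14}, 2 → {8, 12, 14} and 3 → {8, 12, 14, 16}. Expanding each order into its sorted product pairs and summing across orders gives the expected output `8,12,2`, `8,14,2`, `8,16,1`, `12,14,3`, `12,16,1`, `14,16,1` (see the standalone sketch after the diff).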
diff --git a/co-purchase-analysis/.gitignore b/co-purchase-analysis/.gitignore
new file mode 100644
index 0000000..206c638
--- /dev/null
+++ b/co-purchase-analysis/.gitignore
@@ -0,0 +1,20 @@
+*.class
+*.log
+# Simple Build Tool
+# http://www.scala-sbt.org/release/docs/Getting-Started/Directories.html#configuring-version-control
+
+dist/*
+target/
+lib_managed/
+src_managed/
+project/boot/
+project/plugins/project/
+.history
+.cache
+.lib/
+
+.bloop/
+.metals/
+.sbt/
+.bsp/
+project/
diff --git a/co-purchase-analysis/.scalafmt.conf b/co-purchase-analysis/.scalafmt.conf
new file mode 100644
index 0000000..ee7753a
--- /dev/null
+++ b/co-purchase-analysis/.scalafmt.conf
@@ -0,0 +1,2 @@
+version = "3.7.15"
+runner.dialect = scala213
\ No newline at end of file
diff --git a/co-purchase-analysis/build.sbt b/co-purchase-analysis/build.sbt
new file mode 100644
index 0000000..bd0d706
--- /dev/null
+++ b/co-purchase-analysis/build.sbt
@@ -0,0 +1,11 @@
+scalaVersion := "2.13.12"
+
+name := "co-purchase-analysis"
+organization := "it.unibo.cs.scp"
+version := "1.0"
+
+libraryDependencies += "org.scala-lang.modules" %% "scala-parser-combinators" % "2.3.0"
+libraryDependencies += "org.apache.spark" %% "spark-core" % "3.5.3"
+libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.5.3"
+
+
diff --git a/co-purchase-analysis/input.txt b/co-purchase-analysis/input.txt
new file mode 100644
index 0000000..4c05d8d
--- /dev/null
+++ b/co-purchase-analysis/input.txt
@@ -0,0 +1,9 @@
+1,12
+1,14
+2,8
+2,12
+2,14
+3,8
+3,12
+3,14
+3,16
diff --git a/co-purchase-analysis/src/main/scala/Main.scala b/co-purchase-analysis/src/main/scala/Main.scala
new file mode 100644
index 0000000..ad79e9f
--- /dev/null
+++ b/co-purchase-analysis/src/main/scala/Main.scala
@@ -0,0 +1,179 @@
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.rdd.RDD
+import java.nio.file.{Paths, Files}
+
+/** A Spark application that analyzes co-purchased products.
+  *
+  * This object processes a CSV input file containing order-product pairs and
+  * generates a report of products that are frequently purchased together. The
+  * analysis helps identify product relationships and potential recommendations
+  * for a shopping system.
+  *
+  * Input format: CSV file with each line containing "orderId,productId"
+  * Output format: CSV file with each line containing "product1,product2,count"
+  *
+  * @example
+  *   {{{
+  *   // Run the application with input and output paths
+  *   spark-submit co-purchase-analysis.jar input.csv output/
+  *   }}}
+  */
+object CoPurchaseAnalysis {
+
+  /** Represents an order-product relationship.
+    *
+    * @param orderId
+    *   The unique identifier for the order
+    * @param productId
+    *   The unique identifier for the product
+    */
+  case class OrderProduct(orderId: Int, productId: Int)
+
+  /** Represents a pair of products that were purchased together.
+    *
+    * @param product1
+    *   The identifier of the first product
+    * @param product2
+    *   The identifier of the second product
+    */
+  case class ProductPair(product1: Int, product2: Int)
+
+  /** Represents the co-purchase relationship between two products.
+    *
+    * @param product1
+    *   The identifier of the first product
+    * @param product2
+    *   The identifier of the second product
+    * @param count
+    *   The number of times these products were purchased together
+    */
+  case class CoPurchase(product1: Int, product2: Int, count: Int)
+
+  /** Validates command line arguments and checks file existence.
+    *
+    * @param args
+    *   Command line arguments array containing input file path and output
+    *   directory path
+    * @return
+    *   Some(errorMessage) if validation fails, None if validation succeeds
+    */
+  def checkArguments(args: Array[String]): Option[String] = {
+    if (args.length != 2) {
+      Some("You must define input file and output folder.")
+    } else if (!Files.exists(Paths.get(args(0)))) {
+      Some(s"Input file `${args(0)}` does not exist.")
+    } else if (Files.exists(Paths.get(args(1)))) {
+      Some(s"Output folder `${args(1)}` already exists.")
+    } else {
+      None
+    }
+  }
+
+  /** Creates and configures a SparkSession.
+    *
+    * @param appName
+    *   The name of the Spark application
+    * @param master
+    *   The Spark master URL (e.g., "local", "yarn")
+    * @return
+    *   Configured SparkSession instance
+    */
+  def createSparkSession(appName: String, master: String): SparkSession = {
+    SparkSession.builder
+      .appName(appName)
+      .config("spark.master", master)
+      .getOrCreate()
+  }
+
+  /** Parses a single line from the input file into an OrderProduct instance.
+    * Expects the line to be in CSV format with orderId and productId.
+    *
+    * @param line
+    *   Input line in format "orderId,productId"
+    * @return
+    *   OrderProduct instance containing the parsed data
+    */
+  def parseLine(line: String): OrderProduct = {
+    val parts = line.split(",")
+    OrderProduct(parts(0).toInt, parts(1).toInt)
+  }
+
+  /** Generates all possible pairs of products from a list of products in an
+    * order. Products are sorted before pairing to ensure consistent ordering.
+    *
+    * @param products
+    *   List of product IDs from a single order
+    * @return
+    *   List of ProductPair instances representing all possible product
+    *   combinations
+    */
+  def generateProductPairs(products: List[Int]): List[ProductPair] = {
+    val sortedProducts = products.sorted
+    for {
+      i <- sortedProducts.indices.toList
+      j <- (i + 1) until sortedProducts.length
+    } yield ProductPair(sortedProducts(i), sortedProducts(j))
+  }
+
+  /** Processes the order data to generate co-purchase statistics.
+    *
+    * The processing pipeline includes:
+    *   1. Grouping orders by orderId
+    *   2. Generating product pairs for each order
+    *   3. Counting occurrences of each product pair
+    *
+    * @param data
+    *   RDD containing OrderProduct instances
+    * @return
+    *   RDD containing CoPurchase instances with purchase frequency counts
+    */
+  def processData(data: RDD[OrderProduct]): RDD[CoPurchase] = {
+    data
+      .groupBy(_.orderId)
+      .flatMap { case (_, orders) =>
+        generateProductPairs(orders.map(_.productId).toList)
+      }
+      .map(pair => (pair, 1))
+      .reduceByKey(_ + _)
+      .map { case (ProductPair(p1, p2), count) =>
+        CoPurchase(p1, p2, count)
+      }
+  }
+
+  /** Main entry point for the application.
+    *
+    * @param args
+    *   Command line arguments array
+    */
+  def main(args: Array[String]): Unit = {
+    val argsError = checkArguments(args)
+    if (argsError.isDefined) {
+      println(argsError.get)
+      return
+    }
+
+    val config = Map(
+      "appName" -> "Co-Purchase Analysis",
+      "master" -> "local[*]",
+      "inputPath" -> args(0),
+      "outputPath" -> args(1)
+    )
+
+    val spark = createSparkSession(config("appName"), config("master"))
+    try {
+      spark.sparkContext.setLogLevel("ERROR")
+
+      val inputRDD = spark.sparkContext
+        .textFile(config("inputPath"))
+        .map(parseLine)
+
+      // saveAsTextFile is an action: it triggers the computation and writes
+      // one "product1,product2,count" line per co-purchased pair.
+      processData(inputRDD)
+        .map(cp => s"${cp.product1},${cp.product2},${cp.count}")
+        .saveAsTextFile(config("outputPath"))
+    } finally {
+      spark.stop()
+    }
+  }
+}
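As a sanity check of the pairing-and-counting logic, the same computation can be replayed on the sample rows without Spark. This is a minimal sketch, not part of the commit: the object name `CoPurchaseCheck` and the plain-collections rewrite are illustrative assumptions.

```scala
// Hypothetical standalone check -- NOT part of the commit. It replays
// generateProductPairs plus the reduceByKey count from Main.scala with
// plain Scala 2.13 collections on the sample input.txt rows.
object CoPurchaseCheck extends App {
  val lines = List("1,12", "1,14", "2,8", "2,12", "2,14",
                   "3,8", "3,12", "3,14", "3,16")

  val counts = lines
    .map { l => val Array(o, p) = l.split(","); (o.toInt, p.toInt) }
    .groupMap(_._1)(_._2) // orderId -> list of productIds
    .values
    .flatMap { products =>
      val s = products.sorted // sort so each pair comes out ascending
      for {
        i <- s.indices
        j <- (i + 1) until s.length
      } yield (s(i), s(j))
    }
    .groupBy(identity) // pair -> all of its occurrences across orders
    .view
    .mapValues(_.size)
    .toMap

  counts.toList.sorted.foreach { case ((p1, p2), n) =>
    println(s"$p1,$p2,$n") // prints 8,12,2 ... 12,14,3 ... 14,16,1
  }
}
```

Running the actual job locally, e.g. `sbt "run input.txt output"` (assuming Spark in local mode, as configured by `local[*]` above), should write the same six lines across the part-files under `output/`.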