author    Santo Cariotti <santo@dcariotti.me>  2024-12-19 22:04:19 +0100
committer Santo Cariotti <santo@dcariotti.me>  2024-12-19 22:04:19 +0100
commit    cda680f67fc81860e46e442e6d17c68e8c863ecd (patch)
tree      1b1f59089d205d92d943825faedb968beb908135
Init code
-rw-r--r--  co-purchase-analysis/.gitignore                   20
-rw-r--r--  co-purchase-analysis/.scalafmt.conf                2
-rw-r--r--  co-purchase-analysis/build.sbt                    11
-rw-r--r--  co-purchase-analysis/input.txt                     9
-rw-r--r--  co-purchase-analysis/src/main/scala/Main.scala   209
5 files changed, 251 insertions, 0 deletions
diff --git a/co-purchase-analysis/.gitignore b/co-purchase-analysis/.gitignore
new file mode 100644
index 0000000..206c638
--- /dev/null
+++ b/co-purchase-analysis/.gitignore
@@ -0,0 +1,20 @@
+*.class
+*.log
+# Simple Build Tool
+# http://www.scala-sbt.org/release/docs/Getting-Started/Directories.html#configuring-version-control
+
+dist/*
+target/
+lib_managed/
+src_managed/
+project/boot/
+project/plugins/project/
+.history
+.cache
+.lib/
+
+.bloop/
+.metals/
+.sbt/
+.bsp/
+project/
diff --git a/co-purchase-analysis/.scalafmt.conf b/co-purchase-analysis/.scalafmt.conf
new file mode 100644
index 0000000..ee7753a
--- /dev/null
+++ b/co-purchase-analysis/.scalafmt.conf
@@ -0,0 +1,2 @@
+version = "3.7.15"
+runner.dialect = scala213
\ No newline at end of file
diff --git a/co-purchase-analysis/build.sbt b/co-purchase-analysis/build.sbt
new file mode 100644
index 0000000..bd0d706
--- /dev/null
+++ b/co-purchase-analysis/build.sbt
@@ -0,0 +1,11 @@
+scalaVersion := "2.13.12"
+
+name := "co-purchase-analysis"
+organization := "it.unibo.cs.scp"
+version := "1.0"
+
+libraryDependencies += "org.scala-lang.modules" %% "scala-parser-combinators" % "2.3.0"
+libraryDependencies += "org.apache.spark" %% "spark-core" % "3.5.3"
+libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.5.3"
+
+
diff --git a/co-purchase-analysis/input.txt b/co-purchase-analysis/input.txt
new file mode 100644
index 0000000..4c05d8d
--- /dev/null
+++ b/co-purchase-analysis/input.txt
@@ -0,0 +1,9 @@
+1,12
+1,14
+2,8
+2,12
+2,14
+3,8
+3,12
+3,14
+3,16
diff --git a/co-purchase-analysis/src/main/scala/Main.scala b/co-purchase-analysis/src/main/scala/Main.scala
new file mode 100644
index 0000000..ad79e9f
--- /dev/null
+++ b/co-purchase-analysis/src/main/scala/Main.scala
@@ -0,0 +1,209 @@
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.rdd.RDD
+import java.nio.file.{Paths, Files}
+
+/** A Spark application that analyzes co-purchased products.
+ *
+ * This object processes a CSV input file containing order-product pairs and
+ * generates a report of products that are frequently purchased together. The
+ * analysis helps identify product relationships and potential recommendations
+ * for a shopping system.
+ *
+ * Input format: CSV file with each line containing "orderId,productId"
+ * Output format: CSV file with each line containing "product1,product2,count"
+ *
+ * @example
+ * {{{
+ * // Run the application with input and output paths
+ * spark-submit co-purchase-analysis.jar input.csv output/
+ * }}}
+ */
+object CoPurchaseAnalysis {
+
+ /** Represents an order-product relationship.
+ *
+ * @param orderId
+ * The unique identifier for the order
+ * @param productId
+ * The unique identifier for the product
+ */
+ case class OrderProduct(orderId: Int, productId: Int)
+
+ /** Represents a pair of products that were purchased together.
+ *
+ * @param product1
+ * The identifier of the first product
+ * @param product2
+ * The identifier of the second product
+ */
+ case class ProductPair(product1: Int, product2: Int)
+
+ /** Represents the co-purchase relationship between two products.
+ *
+ * @param product1
+ * The identifier of the first product
+ * @param product2
+ * The identifier of the second product
+ * @param count
+ * The number of times these products were purchased together
+ */
+ case class CoPurchase(product1: Int, product2: Int, count: Int)
+
+ /** Validates command line arguments and checks file existence.
+ *
+ * @param args
+ * Command line arguments array containing input file path and output
+ * directory path
+ * @return
+ * Some(errorMessage) if validation fails, None if validation succeeds
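+   * @example
+   *   {{{
+   *   // Illustrative checks; assume `input.txt` exists and `output/` does not:
+   *   checkArguments(Array("input.txt", "output/")) // => None
+   *   checkArguments(Array("input.txt")) // => Some("You must define ...")
+   *   }}}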
+ */
+ def checkArguments(args: Array[String]): Option[String] = {
+ if (args.length != 2) {
+ Some("You must define input file and output folder.")
+ } else if (!Files.exists(Paths.get(args(0)))) {
+ Some(s"Input file `${args(0)}` does not exist.")
+ } else if (Files.exists(Paths.get(args(1)))) {
+ Some(s"Output folder `${args(1)}` already exists.")
+ } else {
+ None
+ }
+ }
+
+ /** Creates and configures a SparkSession.
+ *
+ * @param appName
+ * The name of the Spark application
+ * @param master
+ * The Spark master URL (e.g., "local", "yarn")
+ * @return
+ * Configured SparkSession instance
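+   * @example
+   *   {{{
+   *   // As used in main() below:
+   *   val spark = createSparkSession("Co-Purchase Analysis", "local[*]")
+   *   }}}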
+ */
+ def createSparkSession(appName: String, master: String): SparkSession = {
+ SparkSession.builder
+ .appName(appName)
+ .config("spark.master", master)
+ .getOrCreate()
+ }
+
+ /** Parses a single line from the input file into an OrderProduct instance.
+ * Expects the line to be in CSV format with orderId and productId.
+ *
+ * @param line
+ * Input line in format "orderId,productId"
+ * @return
+ * OrderProduct instance containing the parsed data
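+   * @example
+   *   {{{
+   *   // e.g. the first line of the sample input.txt:
+   *   parseLine("1,12") // => OrderProduct(1, 12)
+   *   }}}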
+ */
+ def parseLine(line: String): OrderProduct = {
+ val parts = line.split(",")
+ OrderProduct(parts(0).toInt, parts(1).toInt)
+ }
+
+ /** Generates all possible pairs of products from a list of products in an
+ * order. Products are sorted before pairing to ensure consistent ordering.
+ *
+ * @param products
+ * List of product IDs from a single order
+ * @return
+ * List of ProductPair instances representing all possible product
+ * combinations
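+   * @example
+   *   {{{
+   *   // n products yield n * (n - 1) / 2 ordered pairs:
+   *   generateProductPairs(List(14, 8, 12))
+   *   // => List(ProductPair(8, 12), ProductPair(8, 14), ProductPair(12, 14))
+   *   }}}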
+ */
+  def generateProductPairs(products: List[Int]): List[ProductPair] = {
+    val sortedProducts = products.sorted
+    for {
+      i <- sortedProducts.indices.toList
+      j <- (i + 1) until sortedProducts.length
+    } yield ProductPair(sortedProducts(i), sortedProducts(j))
+  }
+
+ /** Processes the order data to generate co-purchase statistics.
+ *
+ * The processing pipeline includes:
+   *   1. Grouping orders by orderId
+   *   2. Generating product pairs for each order
+   *   3. Counting occurrences of each product pair
+ *
+ * @param data
+ * RDD containing OrderProduct instances
+ * @return
+ * RDD containing CoPurchase instances with purchase frequency counts
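+   * @example
+   *   {{{
+   *   // With the sample input.txt, products 12 and 14 appear together in
+   *   // all three orders, so the result includes CoPurchase(12, 14, 3).
+   *   }}}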
+ */
+ def processData(data: RDD[OrderProduct]): RDD[CoPurchase] = {
+ data
+ .groupBy(_.orderId)
+ .flatMap { case (_, orders) =>
+ generateProductPairs(orders.map(_.productId).toList)
+ }
+ .map(pair => (pair, 1))
+ .reduceByKey(_ + _)
+ .map { case (ProductPair(p1, p2), count) =>
+ CoPurchase(p1, p2, count)
+ }
+ }
+
+ /** Main entry point for the application.
+ *
+ * @param args
+ * Command line arguments array
+ */
+ def main(args: Array[String]): Unit = {
+ val argsError = checkArguments(args)
+
+    if (argsError.isDefined) {
+ println(argsError.get)
+ return
+ }
+
+    // Configuration values for this run; paths come from the command line
+ val config = Map(
+ "appName" -> "Co-Purchase Analysis",
+ "master" -> "local[*]",
+ "inputPath" -> args(0),
+ "outputPath" -> args(1)
+ )
+
+    // Create the session, run the pipeline, and clean up afterwards
+ val spark = createSparkSession(config("appName"), config("master"))
+ try {
+ spark.sparkContext.setLogLevel("ERROR")
+
+ val inputRDD = spark.sparkContext
+ .textFile(config("inputPath"))
+ .map(parseLine)
+
+      processData(inputRDD)
+        .map { coPurchase =>
+          s"${coPurchase.product1},${coPurchase.product2},${coPurchase.count}"
+        }
+        .saveAsTextFile(config("outputPath"))
+ } finally {
+ spark.stop()
+ }
+ }
+}