summaryrefslogtreecommitdiff
path: root/co-purchase-analysis/src/main/scala/Main.scala
diff options
context:
space:
mode:
Diffstat (limited to 'co-purchase-analysis/src/main/scala/Main.scala')
-rw-r--r--  co-purchase-analysis/src/main/scala/Main.scala  35
1 file changed, 23 insertions(+), 12 deletions(-)
diff --git a/co-purchase-analysis/src/main/scala/Main.scala b/co-purchase-analysis/src/main/scala/Main.scala
index 8fd82cb..e4938d8 100644
--- a/co-purchase-analysis/src/main/scala/Main.scala
+++ b/co-purchase-analysis/src/main/scala/Main.scala
@@ -1,6 +1,7 @@
import org.apache.spark.sql.SparkSession
import org.apache.spark.rdd.RDD
import org.apache.spark.HashPartitioner
+import scala.util.Try
/** A Spark application that analyzes co-purchased products.
*
@@ -14,8 +15,8 @@ import org.apache.spark.HashPartitioner
*
* @example
* {{{
- * // Run the application with input and output paths
- * spark-submit co-purchase-analysis.jar input.csv output/
+ * // Run the application with input path, output path and number of partitions
+ * spark-submit co-purchase-analysis.jar input.csv output/ 50
* }}}
*/
object CoPurchaseAnalysis {
@@ -41,17 +42,21 @@ object CoPurchaseAnalysis {
/** Validates command line arguments and checks file existence.
*
* @param args
- * Command line arguments array containing input file path and output
- * directory path
+ * Command line arguments array containing input file path, output
+ * directory path and partitions number
* @return
* Some(errorMessage) if validation fails, None if validation succeeds
*/
def checkArguments(args: Array[String]): Option[String] = {
- if (args.length != 2) {
- Some("You must define input file and output folder.")
- } else {
- None
+ if (args.length != 3) {
+ return Some("Need params: <inputPath> <outputFolder> <numPartitions>")
}
+
+ if (Try(args(2).toInt).isFailure) {
+ return Some(s"'${args(2)}' is not a valid integer")
+ }
+
+ return None
}
/** Creates and configures a SparkSession.
@@ -102,10 +107,15 @@ object CoPurchaseAnalysis {
*
* @param data
* RDD containing OrderProduct instances
+ * @param partitionsNumber
+ * Number of partitions used by HashPartitioner
* @return
* RDD containing CoPurchase instances with purchase frequency counts
*/
- def processData(data: RDD[OrderProduct]): RDD[String] = {
+ def processData(
+ data: RDD[OrderProduct],
+ partitionsNumber: Int
+ ): RDD[String] = {
val pairs = data
.map(order => (order.orderId, order.productId))
.groupByKey()
@@ -116,7 +126,7 @@ object CoPurchaseAnalysis {
y <- products if x < y
} yield (ProductPair(x, y), 1)
}
- .partitionBy(new HashPartitioner(50))
+ .partitionBy(new HashPartitioner(partitionsNumber))
val coProducts = pairs.reduceByKey(_ + _)
@@ -143,7 +153,8 @@ object CoPurchaseAnalysis {
"appName" -> "Co-Purchase Analysis",
"master" -> "local[*]",
"inputPath" -> args(0),
- "outputPath" -> args(1)
+ "outputPath" -> args(1),
+ "partitionsNumber" -> args(2)
)
// Program execution composed of pure functions
@@ -155,7 +166,7 @@ object CoPurchaseAnalysis {
.textFile(config("inputPath"))
.map(parseLine)
- val result = processData(inputRDD)
+ val result = processData(inputRDD, config("partitionsNumber").toInt)
.saveAsTextFile(config("outputPath"))
} finally {
spark.stop()