diff options
Diffstat (limited to 'co-purchase-analysis/src')
| -rw-r--r-- | co-purchase-analysis/src/main/scala/Main.scala | 35 | 
1 files changed, 23 insertions, 12 deletions
diff --git a/co-purchase-analysis/src/main/scala/Main.scala b/co-purchase-analysis/src/main/scala/Main.scala index 8fd82cb..e4938d8 100644 --- a/co-purchase-analysis/src/main/scala/Main.scala +++ b/co-purchase-analysis/src/main/scala/Main.scala @@ -1,6 +1,7 @@  import org.apache.spark.sql.SparkSession  import org.apache.spark.rdd.RDD  import org.apache.spark.HashPartitioner +import scala.util.Try  /** A Spark application that analyzes co-purchased products.    * @@ -14,8 +15,8 @@ import org.apache.spark.HashPartitioner    *    * @example    *   {{{ -  * // Run the application with input and output paths -  * spark-submit co-purchase-analysis.jar input.csv output/ +  * // Run the application with input path, output path and number of partitions +  * spark-submit co-purchase-analysis.jar input.csv output/ 50    *   }}}    */  object CoPurchaseAnalysis { @@ -41,17 +42,21 @@ object CoPurchaseAnalysis {    /** Validates command line arguments and checks file existence.      *      * @param args -    *   Command line arguments array containing input file path and output -    *   directory path +    *   Command line arguments array containing input file path, output +    *   directory path and partitions number      * @return      *   Some(errorMessage) if validation fails, None if validation succeeds      */    def checkArguments(args: Array[String]): Option[String] = { -    if (args.length != 2) { -      Some("You must define input file and output folder.") -    } else { -      None +    if (args.length != 3) { +      return Some("Need params: <inputPath> <outputFolder> <numPartitions>")      } + +    if (Try(args(2).toInt).isFailure) { +      return Some(s"'${args(2)}' is not a valid integer") +    } + +    return None    }    /** Creates and configures a SparkSession. @@ -102,10 +107,15 @@ object CoPurchaseAnalysis {      *      * @param data      *   RDD containing OrderProduct instances +    * @param partitionsNumber +    *   Number of partitions used by HashPartitioner      * @return      *   RDD containing CoPurchase instances with purchase frequency counts      */ -  def processData(data: RDD[OrderProduct]): RDD[String] = { +  def processData( +      data: RDD[OrderProduct], +      partitionsNumber: Int +  ): RDD[String] = {      val pairs = data        .map(order => (order.orderId, order.productId))        .groupByKey() @@ -116,7 +126,7 @@ object CoPurchaseAnalysis {            y <- products if x < y          } yield (ProductPair(x, y), 1)        } -      .partitionBy(new HashPartitioner(50)) +      .partitionBy(new HashPartitioner(partitionsNumber))      val coProducts = pairs.reduceByKey(_ + _) @@ -143,7 +153,8 @@ object CoPurchaseAnalysis {        "appName" -> "Co-Purchase Analysis",        "master" -> "local[*]",        "inputPath" -> args(0), -      "outputPath" -> args(1) +      "outputPath" -> args(1), +      "partitionsNumber" -> args(2)      )      // Program execution composed of pure functions @@ -155,7 +166,7 @@ object CoPurchaseAnalysis {          .textFile(config("inputPath"))          .map(parseLine) -      val result = processData(inputRDD) +      val result = processData(inputRDD, config("partitionsNumber").toInt)          .saveAsTextFile(config("outputPath"))      } finally {        spark.stop()  |