author    | Santo Cariotti <santo@dcariotti.me> | 2025-01-13 19:08:36 +0100
committer | Santo Cariotti <santo@dcariotti.me> | 2025-01-13 19:09:09 +0100
commit    | 80930bb9d945b2ffee0fdda78ebd8cbe1caa4dc2 (patch)
tree      | 589ff2610bab1c2d2facabc103974179d0c289e9
parent    | e6d14f7b388f4f866234d444668d8801cbf9661c (diff)
Partitions number as argument
-rw-r--r-- | README.md                                      | 10
-rw-r--r-- | co-purchase-analysis/src/main/scala/Main.scala | 35
-rwxr-xr-x | scripts/05-dataproc-submit.sh                  |  8
3 files changed, 35 insertions, 18 deletions
diff --git a/README.md b/README.md
@@ -64,7 +64,7 @@ To test on Google Cloud, execute the following shell scripts in the given order:
 - `scripts/02-dataproc-copy-jar.sh`
 - `scripts/03-update-network-for-dataproc.sh`
 - `scripts/04-dataproc-create-cluster.sh <num-workers> <master-machine> <worker-machine>`
-- `scripts/05-dataproc-submit.sh`
+- `scripts/05-dataproc-submit.sh <num-partitions>`
 - `scripts/06-dataproc-update-cluster.sh <num-workers> <master-machine> <worker-machine>`
 - `scripts/07-cleanup.sh`
@@ -93,13 +93,13 @@ $ scripts/00-create-service-account.sh; \
   scripts/02-dataproc-copy-jar.sh; \
   scripts/03-update-network-for-dataproc.sh; \
   scripts/04-dataproc-create-cluster.sh 1 n1-standard-4 n1-standard-4; \
-  scripts/05-dataproc-submit.sh; \
+  scripts/05-dataproc-submit.sh 200; \
   scripts/06-dataproc-update-cluster.sh 2 n1-standard-4 n1-standard-4; \
-  scripts/05-dataproc-submit.sh; \
+  scripts/05-dataproc-submit.sh 200; \
   scripts/06-dataproc-update-cluster.sh 3 n1-standard-4 n1-standard-4; \
-  scripts/05-dataproc-submit.sh; \
+  scripts/05-dataproc-submit.sh 200; \
   scripts/06-dataproc-update-cluster.sh 4 n1-standard-4 n1-standard-4; \
-  scripts/05-dataproc-submit.sh
+  scripts/05-dataproc-submit.sh 200
 ```
 
 After that, you can also check the created 4 jobs.
diff --git a/co-purchase-analysis/src/main/scala/Main.scala b/co-purchase-analysis/src/main/scala/Main.scala
index 8fd82cb..e4938d8 100644
--- a/co-purchase-analysis/src/main/scala/Main.scala
+++ b/co-purchase-analysis/src/main/scala/Main.scala
@@ -1,6 +1,7 @@
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.rdd.RDD
 import org.apache.spark.HashPartitioner
+import scala.util.Try
 
 /** A Spark application that analyzes co-purchased products.
   *
@@ -14,8 +15,8 @@ import org.apache.spark.HashPartitioner
   *
   * @example
   *   {{{
-  *   // Run the application with input and output paths
-  *   spark-submit co-purchase-analysis.jar input.csv output/
+  *   // Run the application with input path, output path and number of partitions
+  *   spark-submit co-purchase-analysis.jar input.csv output/ 50
   *   }}}
   */
 object CoPurchaseAnalysis {
@@ -41,17 +42,21 @@ object CoPurchaseAnalysis {
   /** Validates command line arguments and checks file existence.
     *
     * @param args
-    *   Command line arguments array containing input file path and output
-    *   directory path
+    *   Command line arguments array containing input file path, output
+    *   directory path and partitions number
     * @return
     *   Some(errorMessage) if validation fails, None if validation succeeds
     */
   def checkArguments(args: Array[String]): Option[String] = {
-    if (args.length != 2) {
-      Some("You must define input file and output folder.")
-    } else {
-      None
+    if (args.length != 3) {
+      return Some("Need params: <inputPath> <outputFolder> <numPartitions>")
     }
+
+    if (Try(args(2).toInt).isFailure) {
+      return Some(s"'${args(2)}' is not a valid integer")
+    }
+
+    return None
   }
 
   /** Creates and configures a SparkSession.
@@ -102,10 +107,15 @@ object CoPurchaseAnalysis {
    *
    * @param data
    *   RDD containing OrderProduct instances
+   * @param partitionsNumber
+   *   Number of partitions used by HashPartitioner
    * @return
    *   RDD containing CoPurchase instances with purchase frequency counts
    */
-  def processData(data: RDD[OrderProduct]): RDD[String] = {
+  def processData(
+      data: RDD[OrderProduct],
+      partitionsNumber: Int
+  ): RDD[String] = {
     val pairs = data
       .map(order => (order.orderId, order.productId))
       .groupByKey()
@@ -116,7 +126,7 @@
           y <- products if x < y
         } yield (ProductPair(x, y), 1)
       }
-      .partitionBy(new HashPartitioner(50))
+      .partitionBy(new HashPartitioner(partitionsNumber))
 
     val coProducts = pairs.reduceByKey(_ + _)
@@ -143,7 +153,8 @@
       "appName" -> "Co-Purchase Analysis",
       "master" -> "local[*]",
       "inputPath" -> args(0),
-      "outputPath" -> args(1)
+      "outputPath" -> args(1),
+      "partitionsNumber" -> args(2)
     )
 
     // Program execution composed of pure functions
@@ -155,7 +166,7 @@
         .textFile(config("inputPath"))
         .map(parseLine)
 
-      val result = processData(inputRDD)
+      val result = processData(inputRDD, config("partitionsNumber").toInt)
         .saveAsTextFile(config("outputPath"))
     } finally {
       spark.stop()
diff --git a/scripts/05-dataproc-submit.sh b/scripts/05-dataproc-submit.sh
index b70e138..b2c9e42 100755
--- a/scripts/05-dataproc-submit.sh
+++ b/scripts/05-dataproc-submit.sh
@@ -2,6 +2,12 @@
 
 set -e
 
+if [ "$#" -ne 1 ]; then
+    echo "Usage: 'sh ${PWD}/$0 <num-partitions>'"
+    exit 1
+fi
+
+NUM_PARTITIONS="$1"
 INPUT_PATH="gs://${BUCKET_NAME}/input/"
 OUTPUT_PATH="gs://${BUCKET_NAME}/output"
@@ -35,4 +41,4 @@ gcloud dataproc jobs submit spark \
     --jar="gs://${BUCKET_NAME}/scala/co-purchase-analysis_2.12-1.0.jar" \
     --region="${REGION}" \
     --properties="spark.hadoop.fs.gs.impl=com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem" \
-    -- "${INPUT_PATH}" "${OUTPUT_PATH}"
+    -- "${INPUT_PATH}" "${OUTPUT_PATH}" "${NUM_PARTITIONS}"
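The change threads the partition count from the command line (a third argument validated with `Try(args(2).toInt)`) down to the `HashPartitioner` that was previously hard-coded to 50. Below is a minimal, self-contained Scala sketch of that same flow; the object name `PartitionArgDemo`, the sample pairs, and the usage message are illustrative assumptions, not part of this commit.

```scala
import org.apache.spark.HashPartitioner
import org.apache.spark.sql.SparkSession
import scala.util.Try

// Hypothetical standalone demo: validate a CLI-supplied partition count,
// then use it to size a HashPartitioner, mirroring the commit's approach.
object PartitionArgDemo {
  def main(args: Array[String]): Unit = {
    // Reject a missing or non-integer third argument, as checkArguments does.
    val partitions = args.lift(2).flatMap(s => Try(s.toInt).toOption).getOrElse {
      System.err.println("Usage: <inputPath> <outputFolder> <numPartitions>")
      sys.exit(1)
    }

    val spark = SparkSession.builder()
      .appName("partition-arg-demo")
      .master("local[*]")
      .getOrCreate()
    try {
      // Toy (orderId, productId) pairs stand in for the parsed CSV input.
      val pairs = spark.sparkContext
        .parallelize(Seq((1, 10), (1, 20), (2, 10), (2, 20)))
        .partitionBy(new HashPartitioner(partitions)) // count now comes from the CLI
      println(s"numPartitions = ${pairs.getNumPartitions}")
    } finally {
      spark.stop()
    }
  }
}
```

When the job is submitted with `scripts/05-dataproc-submit.sh 200`, the same value reaches the application as `args(2)` and ends up as the partitioner size.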