author    Santo Cariotti <santo@dcariotti.me>    2025-01-13 19:08:36 +0100
committer Santo Cariotti <santo@dcariotti.me>    2025-01-13 19:09:09 +0100
commit    80930bb9d945b2ffee0fdda78ebd8cbe1caa4dc2 (patch)
tree      589ff2610bab1c2d2facabc103974179d0c289e9
parent    e6d14f7b388f4f866234d444668d8801cbf9661c (diff)
Pass the number of partitions as an argument
-rw-r--r--  README.md                                       10
-rw-r--r--  co-purchase-analysis/src/main/scala/Main.scala  35
-rwxr-xr-x  scripts/05-dataproc-submit.sh                    8
3 files changed, 35 insertions, 18 deletions
diff --git a/README.md b/README.md
index 2a9f08d..f38c004 100644
--- a/README.md
+++ b/README.md
@@ -64,7 +64,7 @@ To test on Google Cloud, execute the following shell scripts in the given order:
- `scripts/02-dataproc-copy-jar.sh`
- `scripts/03-update-network-for-dataproc.sh`
- `scripts/04-dataproc-create-cluster.sh <num-workers> <master-machine> <worker-machine>`
-- `scripts/05-dataproc-submit.sh`
+- `scripts/05-dataproc-submit.sh <num-partitions>`
- `scripts/06-dataproc-update-cluster.sh <num-workers> <master-machine> <worker-machine>`
- `scripts/07-cleanup.sh`
@@ -93,13 +93,13 @@ $ scripts/00-create-service-account.sh; \
scripts/02-dataproc-copy-jar.sh; \
scripts/03-update-network-for-dataproc.sh; \
scripts/04-dataproc-create-cluster.sh 1 n1-standard-4 n1-standard-4; \
- scripts/05-dataproc-submit.sh; \
+ scripts/05-dataproc-submit.sh 200; \
scripts/06-dataproc-update-cluster.sh 2 n1-standard-4 n1-standard-4; \
- scripts/05-dataproc-submit.sh; \
+ scripts/05-dataproc-submit.sh 200; \
scripts/06-dataproc-update-cluster.sh 3 n1-standard-4 n1-standard-4; \
- scripts/05-dataproc-submit.sh; \
+ scripts/05-dataproc-submit.sh 200; \
scripts/06-dataproc-update-cluster.sh 4 n1-standard-4 n1-standard-4; \
- scripts/05-dataproc-submit.sh
+ scripts/05-dataproc-submit.sh 200
```
After that, you can also check the created 4 jobs.
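The `200` passed to `scripts/05-dataproc-submit.sh` in the example above becomes the partition count handed to Spark's `HashPartitioner` (see the `Main.scala` diff below). As a rough illustration of what that number controls — a minimal sketch, not project code — `HashPartitioner` maps every key to one of the N partitions via its `hashCode`:

```scala
import org.apache.spark.HashPartitioner

// Illustration only: with 200 partitions, every key lands in a bucket 0..199.
// The real job partitions ProductPair keys; a plain tuple stands in here.
val partitioner = new HashPartitioner(200)
val bucket = partitioner.getPartition(("someOrderId", "someProductId"))
assert(bucket >= 0 && bucket < partitioner.numPartitions)
```

A larger value means more, smaller shuffle tasks; `200` is simply the value used in the README walkthrough.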
diff --git a/co-purchase-analysis/src/main/scala/Main.scala b/co-purchase-analysis/src/main/scala/Main.scala
index 8fd82cb..e4938d8 100644
--- a/co-purchase-analysis/src/main/scala/Main.scala
+++ b/co-purchase-analysis/src/main/scala/Main.scala
@@ -1,6 +1,7 @@
import org.apache.spark.sql.SparkSession
import org.apache.spark.rdd.RDD
import org.apache.spark.HashPartitioner
+import scala.util.Try
/** A Spark application that analyzes co-purchased products.
*
@@ -14,8 +15,8 @@ import org.apache.spark.HashPartitioner
*
* @example
* {{{
- * // Run the application with input and output paths
- * spark-submit co-purchase-analysis.jar input.csv output/
+ * // Run the application with input path, output path and number of partitions
+ * spark-submit co-purchase-analysis.jar input.csv output/ 50
* }}}
*/
object CoPurchaseAnalysis {
@@ -41,17 +42,21 @@ object CoPurchaseAnalysis {
/** Validates command line arguments and checks file existence.
*
* @param args
- * Command line arguments array containing input file path and output
- * directory path
+ * Command line arguments array containing input file path, output
+ * directory path and partitions number
* @return
* Some(errorMessage) if validation fails, None if validation succeeds
*/
def checkArguments(args: Array[String]): Option[String] = {
- if (args.length != 2) {
- Some("You must define input file and output folder.")
- } else {
- None
+ if (args.length != 3) {
+ return Some("Need params: <inputPath> <outputFolder> <numPartitions>")
}
+
+ if (Try(args(2).toInt).isFailure) {
+ return Some(s"'${args(2)}' is not a valid integer")
+ }
+
+ return None
}
/** Creates and configures a SparkSession.
@@ -102,10 +107,15 @@ object CoPurchaseAnalysis {
*
* @param data
* RDD containing OrderProduct instances
+ * @param partitionsNumber
+ * Number of partitions used by HashPartitioner
* @return
* RDD containing CoPurchase instances with purchase frequency counts
*/
- def processData(data: RDD[OrderProduct]): RDD[String] = {
+ def processData(
+ data: RDD[OrderProduct],
+ partitionsNumber: Int
+ ): RDD[String] = {
val pairs = data
.map(order => (order.orderId, order.productId))
.groupByKey()
@@ -116,7 +126,7 @@ object CoPurchaseAnalysis {
y <- products if x < y
} yield (ProductPair(x, y), 1)
}
- .partitionBy(new HashPartitioner(50))
+ .partitionBy(new HashPartitioner(partitionsNumber))
val coProducts = pairs.reduceByKey(_ + _)
@@ -143,7 +153,8 @@ object CoPurchaseAnalysis {
"appName" -> "Co-Purchase Analysis",
"master" -> "local[*]",
"inputPath" -> args(0),
- "outputPath" -> args(1)
+ "outputPath" -> args(1),
+ "partitionsNumber" -> args(2)
)
// Program execution composed of pure functions
@@ -155,7 +166,7 @@ object CoPurchaseAnalysis {
.textFile(config("inputPath"))
.map(parseLine)
- val result = processData(inputRDD)
+ val result = processData(inputRDD, config("partitionsNumber").toInt)
.saveAsTextFile(config("outputPath"))
} finally {
spark.stop()
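To make the new argument contract concrete, here is a hypothetical REPL-style check of the updated `checkArguments`, assuming the `CoPurchaseAnalysis` object above is on the classpath; the behaviour is read directly off the hunk above and the sample paths are made up:

```scala
CoPurchaseAnalysis.checkArguments(Array("input.csv", "output/"))
// Some("Need params: <inputPath> <outputFolder> <numPartitions>")

CoPurchaseAnalysis.checkArguments(Array("input.csv", "output/", "abc"))
// Some("'abc' is not a valid integer")

CoPurchaseAnalysis.checkArguments(Array("input.csv", "output/", "200"))
// None: arguments accepted, and 200 becomes the HashPartitioner size
```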
diff --git a/scripts/05-dataproc-submit.sh b/scripts/05-dataproc-submit.sh
index b70e138..b2c9e42 100755
--- a/scripts/05-dataproc-submit.sh
+++ b/scripts/05-dataproc-submit.sh
@@ -2,6 +2,12 @@
set -e
+if [ "$#" -ne 1 ]; then
+ echo "Usage: 'sh ${PWD}/$0 <num-partitions>'"
+ exit 1
+fi
+
+NUM_PARTITIONS="$1"
INPUT_PATH="gs://${BUCKET_NAME}/input/"
OUTPUT_PATH="gs://${BUCKET_NAME}/output"
@@ -35,4 +41,4 @@ gcloud dataproc jobs submit spark \
--jar="gs://${BUCKET_NAME}/scala/co-purchase-analysis_2.12-1.0.jar" \
--region="${REGION}" \
--properties="spark.hadoop.fs.gs.impl=com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem" \
- -- "${INPUT_PATH}" "${OUTPUT_PATH}"
+ -- "${INPUT_PATH}" "${OUTPUT_PATH}" "${NUM_PARTITIONS}"