| author | Santo Cariotti <santo@dcariotti.me> | 2025-02-14 23:24:57 +0100 |
| --- | --- | --- |
| committer | Santo Cariotti <santo@dcariotti.me> | 2025-02-14 23:24:57 +0100 |
| commit | 16801825bf5d342c02c64b496687dc30f6423120 | |
| tree | 23b2733564762fd9bed26602b4c07c045c3abd52 | |
| parent | 2bb1d30882187e66f24c6211d7cde1c0641a6a18 | |
Auto partitioner
-rw-r--r-- | README.md | 12
-rw-r--r-- | co-purchase-analysis/src/main/scala/Main.scala | 38
-rwxr-xr-x | scripts/05-dataproc-submit.sh | 8
3 files changed, 25 insertions, 33 deletions
diff --git a/README.md b/README.md
--- a/README.md
+++ b/README.md
@@ -41,10 +41,10 @@ $ sbt
 sbt:co-purchase-analysis> run input/ output/
 ```
 
-The above commands will generate three files in the output/ folder that can be merged:
+The above commands will generate two files in the output/ folder that can be merged:
 
 ```bash
-$ cat output/_SUCCESS output/part-00000 output/part-00001
+$ cat output/_SUCCESS output/part-00000
 8,14,2
 12,16,1
 14,16,1
@@ -101,13 +101,13 @@ $ scripts/00-create-service-account.sh; \
     scripts/02-dataproc-copy-jar.sh; \
     scripts/03-update-network-for-dataproc.sh; \
     scripts/04-dataproc-create-cluster.sh 1 n1-standard-4 n1-standard-4; \
-    scripts/05-dataproc-submit.sh 200; \
+    scripts/05-dataproc-submit.sh; \
     scripts/06-dataproc-update-cluster.sh 2 n1-standard-4 n1-standard-4; \
-    scripts/05-dataproc-submit.sh 200; \
+    scripts/05-dataproc-submit.sh; \
     scripts/06-dataproc-update-cluster.sh 3 n1-standard-4 n1-standard-4; \
-    scripts/05-dataproc-submit.sh 200; \
+    scripts/05-dataproc-submit.sh; \
     scripts/06-dataproc-update-cluster.sh 4 n1-standard-4 n1-standard-4; \
-    scripts/05-dataproc-submit.sh 200
+    scripts/05-dataproc-submit.sh
 ```
 
 After that, you can also check the created 4 jobs.
diff --git a/co-purchase-analysis/src/main/scala/Main.scala b/co-purchase-analysis/src/main/scala/Main.scala
index d3679b2..a711136 100644
--- a/co-purchase-analysis/src/main/scala/Main.scala
+++ b/co-purchase-analysis/src/main/scala/Main.scala
@@ -1,7 +1,6 @@
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.rdd.RDD
 import org.apache.spark.HashPartitioner
-import scala.util.Try
 
 /** A Spark application that analyzes co-purchased products.
  *
@@ -15,8 +14,8 @@ import scala.util.Try
  *
  * @example
  * {{{
- * // Run the application with input path, output path and number of partitions
- * spark-submit co-purchase-analysis.jar input.csv output/ 50
+ * // Run the application with input path and output path
+ * spark-submit co-purchase-analysis.jar input.csv output/
 * }}}
 */
 object CoPurchaseAnalysis {
@@ -42,18 +41,14 @@ object CoPurchaseAnalysis {
   /** Validates command line arguments and checks file existence.
    *
    * @param args
-   *   Command line arguments array containing input file path, output
-   *   directory path and partitions number
+   *   Command line arguments array containing input file path and output
+   *   directory path
    * @return
    *   Some(errorMessage) if validation fails, None if validation succeeds
    */
   def checkArguments(args: Array[String]): Option[String] = {
-    if (args.length != 3) {
-      return Some("Need params: <inputPath> <outputFolder> <numPartitions>")
-    }
-
-    if (Try(args(2).toInt).isFailure) {
-      return Some(s"'${args(2)}' is not a valid integer")
+    if (args.length != 2) {
+      return Some("Need params: <inputPath> <outputFolder>")
     }
 
     return None
@@ -63,15 +58,15 @@ object CoPurchaseAnalysis {
    *
    * @param appName
    *   The name of the Spark application
-   * @param master
-   *   The Spark master URL (e.g., "local", "yarn")
    * @return
    *   Configured SparkSession instance
    */
-  def createSparkSession(appName: String, master: String): SparkSession = {
+  def createSparkSession(appName: String): SparkSession = {
     var session = SparkSession.builder
       .appName(appName)
-      .config("spark.master", master)
+      .config("spark.executor.memory", "6g")
+      .config("spark.executor.cores", "4")
+      .config("spark.driver.memory", "4g")
 
     val creds = System.getenv("GOOGLE_APPLICATION_CREDENTIALS")
     if (creds != null) {
@@ -155,14 +150,12 @@ object CoPurchaseAnalysis {
     // Configuration values should be passed as parameters
     val config = Map(
       "appName" -> "Co-Purchase Analysis",
-      "master" -> "local[*]",
       "inputPath" -> args(0),
-      "outputPath" -> args(1),
-      "partitionsNumber" -> args(2)
+      "outputPath" -> args(1)
     )
 
     // Program execution composed of pure functions
-    val spark = createSparkSession(config("appName"), config("master"))
+    val spark = createSparkSession(config("appName"))
 
     try {
       spark.sparkContext.setLogLevel("ERROR")
@@ -170,7 +163,12 @@ object CoPurchaseAnalysis {
        .textFile(config("inputPath"))
        .map(parseLine)
 
-      val result = processData(inputRDD, config("partitionsNumber").toInt)
+      val cores = spark.conf.get("spark.executor.cores", "4").toInt
+      val nodes = spark.conf.get("spark.executor.instances", "4").toInt
+      val partitionsNumber =
+        math.max(cores * nodes * 2, spark.sparkContext.defaultParallelism * 2)
+
+      val result = processData(inputRDD, partitionsNumber)
        .saveAsTextFile(config("outputPath"))
     } finally {
       spark.stop()
diff --git a/scripts/05-dataproc-submit.sh b/scripts/05-dataproc-submit.sh
index 5b2153e..6da9a29 100755
--- a/scripts/05-dataproc-submit.sh
+++ b/scripts/05-dataproc-submit.sh
@@ -6,12 +6,6 @@ if [ ${DEBUG:+1} ]; then
     set -xo pipefail
 fi
 
-if [ "$#" -ne 1 ]; then
-    echo "Usage: 'sh ${PWD}/$0 <num-partitions>'"
-    exit 1
-fi
-
-NUM_PARTITIONS="$1"
 
 INPUT_PATH="gs://${BUCKET_NAME}/input/"
 OUTPUT_PATH="gs://${BUCKET_NAME}/output"
@@ -45,4 +39,4 @@ gcloud dataproc jobs submit spark \
     --jar="gs://${BUCKET_NAME}/scala/co-purchase-analysis_2.12-1.0.jar" \
     --region="${REGION}" \
     --properties="spark.hadoop.fs.gs.impl=com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem" \
-    -- "${INPUT_PATH}" "${OUTPUT_PATH}" "${NUM_PARTITIONS}"
+    -- "${INPUT_PATH}" "${OUTPUT_PATH}"
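For reference, the sizing rule introduced in `Main.scala` above can be tried out in isolation. The sketch below is not part of the commit: the object name `PartitionSizingSketch` and the `local[*]` master are made up for illustration, while the config keys and the `"4"` fallbacks mirror the values hard-coded in `createSparkSession`. It only prints the partition count that `processData` would receive.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical standalone sketch of the auto-partitioning heuristic.
object PartitionSizingSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("partition-sizing-sketch")
      .master("local[*]") // assumption: local run; on Dataproc the cluster supplies the master
      .getOrCreate()

    // Fall back to 4 cores / 4 executors when the properties are not set,
    // matching the defaults used in Main.scala.
    val cores = spark.conf.get("spark.executor.cores", "4").toInt
    val nodes = spark.conf.get("spark.executor.instances", "4").toInt

    // Target roughly two tasks per available core, but never fewer than
    // twice the context's default parallelism.
    val partitionsNumber =
      math.max(cores * nodes * 2, spark.sparkContext.defaultParallelism * 2)

    println(s"cores=$cores, executors=$nodes -> partitions=$partitionsNumber")
    spark.stop()
  }
}
```

Because the values are read from `spark.conf`, the computed count follows whatever `--properties` or cluster defaults a `gcloud dataproc jobs submit` run provides, which is why `scripts/05-dataproc-submit.sh` no longer needs a `<num-partitions>` argument.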