author     Santo Cariotti <santo@dcariotti.me>   2025-02-14 23:24:57 +0100
committer  Santo Cariotti <santo@dcariotti.me>   2025-02-14 23:24:57 +0100
commit     16801825bf5d342c02c64b496687dc30f6423120 (patch)
tree       23b2733564762fd9bed26602b4c07c045c3abd52
parent     2bb1d30882187e66f24c6211d7cde1c0641a6a18 (diff)
Auto partitioner

Derive the number of partitions from the executor configuration (cores and
instances) instead of taking it as a command-line argument, and drop the
<numPartitions> parameter from the job, the README, and the submit script.
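For context, a minimal, self-contained sketch of the heuristic this commit introduces (the object and method names below are illustrative and not part of the commit; only the formula mirrors the change in Main.scala):

```scala
import org.apache.spark.sql.SparkSession

object PartitionHeuristic {
  /** Derive a partition count from executor resources, falling back to the
    * cluster's default parallelism when that is larger.
    */
  def autoPartitions(spark: SparkSession): Int = {
    // Both config keys may be unset (e.g. when running locally); "4" is the fallback.
    val coresPerExecutor = spark.conf.get("spark.executor.cores", "4").toInt
    val executorCount    = spark.conf.get("spark.executor.instances", "4").toInt

    // Oversubscribe by 2x so every core has at least two tasks queued.
    math.max(
      coresPerExecutor * executorCount * 2,
      spark.sparkContext.defaultParallelism * 2
    )
  }
}
```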
-rw-r--r--  README.md                                        12
-rw-r--r--  co-purchase-analysis/src/main/scala/Main.scala   38
-rwxr-xr-x  scripts/05-dataproc-submit.sh                     8
3 files changed, 25 insertions, 33 deletions
diff --git a/README.md b/README.md
index 079b5f6..5c53535 100644
--- a/README.md
+++ b/README.md
@@ -41,10 +41,10 @@ $ sbt
sbt:co-purchase-analysis> run input/ output/
```
-The above commands will generate three files in the output/ folder that can be merged:
+The above commands will generate two files in the output/ folder that can be merged:
```bash
-$ cat output/_SUCCESS output/part-00000 output/part-00001
+$ cat output/_SUCCESS output/part-00000
8,14,2
12,16,1
14,16,1
@@ -101,13 +101,13 @@ $ scripts/00-create-service-account.sh; \
scripts/02-dataproc-copy-jar.sh; \
scripts/03-update-network-for-dataproc.sh; \
scripts/04-dataproc-create-cluster.sh 1 n1-standard-4 n1-standard-4; \
- scripts/05-dataproc-submit.sh 200; \
+ scripts/05-dataproc-submit.sh; \
scripts/06-dataproc-update-cluster.sh 2 n1-standard-4 n1-standard-4; \
- scripts/05-dataproc-submit.sh 200; \
+ scripts/05-dataproc-submit.sh; \
scripts/06-dataproc-update-cluster.sh 3 n1-standard-4 n1-standard-4; \
- scripts/05-dataproc-submit.sh 200; \
+ scripts/05-dataproc-submit.sh; \
scripts/06-dataproc-update-cluster.sh 4 n1-standard-4 n1-standard-4; \
- scripts/05-dataproc-submit.sh 200
+ scripts/05-dataproc-submit.sh
```
After that, you can also check the four jobs that were created.
diff --git a/co-purchase-analysis/src/main/scala/Main.scala b/co-purchase-analysis/src/main/scala/Main.scala
index d3679b2..a711136 100644
--- a/co-purchase-analysis/src/main/scala/Main.scala
+++ b/co-purchase-analysis/src/main/scala/Main.scala
@@ -1,7 +1,6 @@
import org.apache.spark.sql.SparkSession
import org.apache.spark.rdd.RDD
import org.apache.spark.HashPartitioner
-import scala.util.Try
/** A Spark application that analyzes co-purchased products.
*
@@ -15,8 +14,8 @@ import scala.util.Try
*
* @example
* {{{
- * // Run the application with input path, output path and number of partitions
- * spark-submit co-purchase-analysis.jar input.csv output/ 50
+ * // Run the application with input path and output path
+ * spark-submit co-purchase-analysis.jar input.csv output/
* }}}
*/
object CoPurchaseAnalysis {
@@ -42,18 +41,14 @@ object CoPurchaseAnalysis {
/** Validates command line arguments and checks file existence.
*
* @param args
- * Command line arguments array containing input file path, output
- * directory path and partitions number
+ * Command line arguments array containing input file path and output
+ * directory path
* @return
* Some(errorMessage) if validation fails, None if validation succeeds
*/
def checkArguments(args: Array[String]): Option[String] = {
- if (args.length != 3) {
- return Some("Need params: <inputPath> <outputFolder> <numPartitions>")
- }
-
- if (Try(args(2).toInt).isFailure) {
- return Some(s"'${args(2)}' is not a valid integer")
+ if (args.length != 2) {
+ return Some("Need params: <inputPath> <outputFolder>")
}
return None
@@ -63,15 +58,15 @@ object CoPurchaseAnalysis {
*
* @param appName
* The name of the Spark application
- * @param master
- * The Spark master URL (e.g., "local", "yarn")
* @return
* Configured SparkSession instance
*/
- def createSparkSession(appName: String, master: String): SparkSession = {
+ def createSparkSession(appName: String): SparkSession = {
var session = SparkSession.builder
.appName(appName)
- .config("spark.master", master)
+ .config("spark.executor.memory", "6g")
+ .config("spark.executor.cores", "4")
+ .config("spark.driver.memory", "4g")
val creds = System.getenv("GOOGLE_APPLICATION_CREDENTIALS")
if (creds != null) {
@@ -155,14 +150,12 @@ object CoPurchaseAnalysis {
// Configuration values should be passed as parameters
val config = Map(
"appName" -> "Co-Purchase Analysis",
- "master" -> "local[*]",
"inputPath" -> args(0),
- "outputPath" -> args(1),
- "partitionsNumber" -> args(2)
+ "outputPath" -> args(1)
)
// Program execution composed of pure functions
- val spark = createSparkSession(config("appName"), config("master"))
+ val spark = createSparkSession(config("appName"))
try {
spark.sparkContext.setLogLevel("ERROR")
@@ -170,7 +163,12 @@ object CoPurchaseAnalysis {
.textFile(config("inputPath"))
.map(parseLine)
- val result = processData(inputRDD, config("partitionsNumber").toInt)
+ val cores = spark.conf.get("spark.executor.cores", "4").toInt
+ val nodes = spark.conf.get("spark.executor.instances", "4").toInt
+ val partitionsNumber =
+ math.max(cores * nodes * 2, spark.sparkContext.defaultParallelism * 2)
+
+ val result = processData(inputRDD, partitionsNumber)
.saveAsTextFile(config("outputPath"))
} finally {
spark.stop()
diff --git a/scripts/05-dataproc-submit.sh b/scripts/05-dataproc-submit.sh
index 5b2153e..6da9a29 100755
--- a/scripts/05-dataproc-submit.sh
+++ b/scripts/05-dataproc-submit.sh
@@ -6,12 +6,6 @@ if [ ${DEBUG:+1} ]; then
set -xo pipefail
fi
-if [ "$#" -ne 1 ]; then
- echo "Usage: 'sh ${PWD}/$0 <num-partitions>'"
- exit 1
-fi
-
-NUM_PARTITIONS="$1"
INPUT_PATH="gs://${BUCKET_NAME}/input/"
OUTPUT_PATH="gs://${BUCKET_NAME}/output"
@@ -45,4 +39,4 @@ gcloud dataproc jobs submit spark \
--jar="gs://${BUCKET_NAME}/scala/co-purchase-analysis_2.12-1.0.jar" \
--region="${REGION}" \
--properties="spark.hadoop.fs.gs.impl=com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem" \
- -- "${INPUT_PATH}" "${OUTPUT_PATH}" "${NUM_PARTITIONS}"
+ -- "${INPUT_PATH}" "${OUTPUT_PATH}"