De sobra es conocido Apache Spark como motor de procesamiento de datos masivo. El objetivo de este post no es hacer una introducción a Spark, sino, más bien, ver qué capacidades ofrece el servicio Spark Serverless de Google Cloud, cuáles son sus ventajas, inconvenientes y limitaciones, así como un ejemplo de productivización de un proceso en la vida real.

¿Qué es Spark Serverless?

Cuando hablamos de Spark Serverless en Google Cloud realmente nos referimos al servicio Dataproc Serverless. Nos permite ejecutar procesos batch de Spark sin tener que preocuparnos por la infraestructura subyacente ni por la gestión de recursos.

¿Cómo funciona Dataproc Serverless?

Spark Serverless utiliza un modelo de precios basado en uso. Los usuarios solo pagan por los recursos que utilizan, lo que puede ayudar a ahorrar dinero en cargas de trabajo de Spark que son de corta duración o que varían en tamaño.

A diferencia de Dataproc on Compute Engine, donde Google se encarga de la gestión y configuración de las máquinas y de los framework del ecosistema Hadoop y tienes que especificar parámetros de las máquinas, tipo de discos, etc., en Dataproc Serverless, lo único que tienes que especificar es la capacidad de cómputo que quieres para tu proceso.

Pasamos entonces de una visión genérica de configuración de cluster (donde se podrían ejecutar muchos procesos) a una visión de proceso. Aquí una tabla con las principales características del servicio Dataproc en sus diferentes sabores:

FunciónDataproc ServerlessDataproc en Compute Engine
Frameworks de procesamientoBatch: Spark 3.4 y versiones anteriores Interactivo: Kernel de PySpark para Spark 3.4 y versiones anterioresSpark 3.3 y versiones anteriores Otros frameworks de código abierto, como Hive, Flink, Trino y Kafka
ServerlessNo
Tiempo de arranque60 segundos90 segundos
Control de la infraestructuraNo
Administración de recursosBasado en SparkBasada en YARN
Asistencia de GPUPlanificado
Sesiones interactivasNo
Contenedores personalizadosNo.
Acceso a VM (por ejemplo, SSH)No
Versiones de JavaJava 17 y 11Versiones anteriores compatibles
Asistencia de OS LoginNo

Los procesos Batch que podemos ejecutar en Dataproc Serverless son:

De momento está en preview, pero también se puede escribir y ejecutar código de manera interactiva en notebooks de Jupyter teniendo disponible una Spark Session.

Productivizar un proceso en Spark Serverless

Vamos a ver un ejemplo real de cómo ejecutar un proceso de Spark sobre Dataproc Serveless y llevarlo a producción. Los ejemplos de la documentación oficial, on-hands, labs, etc., son ideales para probar la tecnología, pero a veces quedan lejos de lo que es poner un proceso en producción.

Nuestro proceso leerá 131 millones de tickets que están en formato CSV almacenado en Cloud Storage, los procesará y los insertará en BigQuery.

Proceso Bigquery

Definición de variables globales que utilizaremos:

# GLOBAL VARS
export PROJECT_ID=spark-serverless-bigquery
export REGION_GCP=europe-west1
export ARTIFACT_REPOSITORY=spark-serverless
export IMAGE=spark-serverless-bigquery-goodly

Buckets necesarios:

#BUCKETS
gcloud storage buckets create gs://metadata-dataproc-goodly \
--project=$PROJECT_ID \
--default-storage-class=STANDARD \
--location=$REGION_GCP \
--uniform-bucket-level-access

gcloud storage buckets create gs://staging-dataproc-goodly \
--project=$PROJECT_ID \
--default-storage-class=STANDARD \
--location=$REGION_GCP \
--uniform-bucket-level-access

gcloud storage buckets create gs://temp_dataproc_bigquery_indirect \
--project=$PROJECT_ID \
--default-storage-class=STANDARD \
--location=$REGION_GCP \
--uniform-bucket-level-access

Configuración de networking. En este caso, vamos a crear una VPC desde 0, pero en un entorno productivo habrá que integrarse dentro de la VPC que esté definida y habilitar el private ip google access.

# NETWORKS
gcloud compute networks create spark-serverless-vpc \
--project=$PROJECT_ID \
--subnet-mode=auto \
--mtu=1460 \
--bgp-routing-mode=regional

gcloud compute firewall-rules create allow-internal-ingress \
--project=$PROJECT_ID \
--network="spark-serverless-vpc" \
--source-ranges="10.132.0.0/20" \
--direction="ingress" \
--action="allow" \
--rules="all"

gcloud compute networks subnets update spark-serverless-vpc \
--region=$REGION_GCP \
--enable-private-ip-google-access

Vamos a crear un repositorio de artefactos, en nuestro caso en GCP, pero puede ser cualquier otro que permita la persistencia de imágenes de docker.

# ARTIFACTORY
gcloud artifacts repositories create $ARTIFACT_REPOSITORY --repository-format=docker \
    --location=$REGION_GCP --description="Docker repository"

En este punto, tenemos la infraestructura lista para lanzar por primera vez un proceso de CI sobre nuestro repositorio de código. Nosotros utilizaremos Cloud Build, pero puede utilizarse cualquier herramienta de Integración Continua.

gcloud builds submit --project=$PROJECT_ID \
--region=$REGION_GCP \
--substitutions=_REGION_GCP=$REGION_GCP,_REPOSITORY=$ARTIFACT_REPOSITORY,_IMAGE=$IMAGE \
--config=cloudbuild.yaml .

El fichero cloudbuild.yaml (donde definimos los pasos de CI y la construcción de la imagen) es tal que así:

steps:
 - id: 'mvn test'
   name: maven:3-jdk-11-slim
   entrypoint: mvn
   args: [ 'test' ]

 - id: 'mvn package'
   name: maven:3-jdk-11-slim
   entrypoint: 'bash'
   args:
     - '-c'
     - |
       mvn package -DskipTests
       echo $(mvn help:evaluate -Dexpression=project.version -q -DforceStdout) > version.txt

 - id: 'Prepare provided jar'
   name: 'gcr.io/cloud-builders/gsutil'
   args: [ 'cp', 'gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.28.0.jar','.' ]

 - id: 'Build image'
   name: 'gcr.io/cloud-builders/docker'
   entrypoint: 'bash'
   args:
     - '-c'
     - |
       VERSION=$(cat version.txt)
       docker build -t europe-west1-docker.pkg.dev/$PROJECT_ID/${_REPOSITORY}/${_IMAGE}:$VERSION \
       --build-arg JAR_FILE=target/spark-serverless-bigquery-$VERSION-jar-with-dependencies.jar .

 - id: 'Push image'
   name: 'gcr.io/cloud-builders/docker'
   entrypoint: 'bash'
   args:
     - '-c'
     - |
       VERSION=$(cat version.txt)
       docker push europe-west1-docker.pkg.dev/$PROJECT_ID/${_REPOSITORY}/${_IMAGE}:$VERSION

substitutions:
 _REPOSITORY: spark-serverless # default value
 _IMAGE: spark-serverless-bigquery-goodly # default value
 _REGION_GCP: europe-west1

options:
 substitution_option: 'ALLOW_LOOSE'
 logging: CLOUD_LOGGING_ONLY

Una vez construida la imagen, deberíamos tener en nuestro artifact registry (o equivalente) una imagen de docker con nuestro código dentro:

Imagen del docker con nuestro código.

Tip: recomendamos seguir este artículo para el fichero Dockerfile que servirá para crear el custom container que lanzaremos en el siguiente punto.

Procedemos a lanzar nuestro proceso:

gcloud dataproc batches submit spark \
--batch `echo "csv-to-bq-dataproc-serverless-$(date "+%Y-%m-%d-%H%M%S")"` \
--properties spark.driver.cores=4\
,spark.driver.memory=8g\
,spark.dataproc.driver.disk.size=250g\
,spark.executor.cores=8\
,spark.executor.memory=16g\
,spark.dataproc.executor.disk.size=500g\
,spark.dynamicAllocation.enabled=true\
,spark.dynamicAllocation.initialExecutors=8\
,spark.dynamicAllocation.minExecutors=8\
,spark.dynamicAllocation.maxExecutors=8 \
--project=$PROJECT_ID \
--container-image=$REGION_GCP-docker.pkg.dev/spark-serverless-bigquery/$ARTIFACT_REPOSITORY/$IMAGE:1.0.1 \
--version=1.1 \
--region=$REGION_GCP \
--subnet=https://www.googleapis.com/compute/v1/projects/$PROJECT_ID/regions/$REGION_GCP/subnetworks/spark-serverless-vpc \
--class=com.paradigma.tech.goodly.sparkserverless.gcp.commitconf.CsvToBqDataprocServerless

Con esto hemos lanzado nuestro primer proceso de Spark y habríamos terminado aquí este post si el mundo fuese ideal, pero no lo es.

Cuando lanzas un proceso, puede fallar o, lo que es peor, puede no tener el performance que esperas. Es en ese punto donde la UI de Spark es fundamental, ya que nos permitirá ver nuestros Jobs, Stages, Tasks así como la distribución de los datos por las diferentes particiones, etc.

Por defecto, Dataproc Serverless no lo da, así que necesitamos un history server para poder tener toda la información visual que nos proporciona Spark por defecto.

# Creación History Server

gcloud dataproc clusters create phs-cluster \
--enable-component-gateway \
--bucket staging-dataproc-goodly --region $REGION_GCP \
--subnet spark-serverless-vpc --no-address \
--single-node --master-machine-type n2-standard-4 \
--master-boot-disk-size 200 --image-version 2.1-debian11 \
--properties mapred:mapreduce.jobhistory.read-only.dir-pattern=gs://metadata-dataproc-goodly/*/mapreduce-job-history/done,yarn:yarn.nodemanager.remote-app-log-dir=gs://metadata-dataproc-goodly/*/yarn-logs,spark:spark.eventLog.dir=gs://metadata-dataproc-goodly/spark-event-log/spark-job-history,spark:spark.history.fs.logDirectory=gs://metadata-dataproc-goodly/*/spark-job-history,spark:spark.history.custom.executor.log.url.applyIncompleteApplication=false,spark:spark.history.custom.executor.log.url={{YARN_LOG_SERVER_URL}}/{{NM_HOST}}:{{NM_PORT}}/{{CONTAINER_ID}}/{{CONTAINER_ID}}/{{USER}}/{{FILE_NAME}} \
--project $PROJECT_ID

Si volvemos a lanzar el proceso, pero esta vez especificando que utilice el history server que hemos definido como parámetro (“--history-server-cluster=projects/$PROJECT_ID/regions/$REGION_GCP/clusters/phs-cluster \”), ya podremos ver habilitado el botón “View Spark History Server” en la interfaz de Dataproc > Serverless > Batch y nos llevará a la UI de Spark con información sobre nuestra ejecución.

UI de Spark

Podemos ver, entre otros resultados, el tiempo de duración total del proceso (2,8 minutos), el tiempo que cada job, número de stages por job, etc.

¿Ventajas y desventajas de Spark Serverless

Spark Serverless ofrece una serie de ventajas sobre los enfoques que requieren de infraestructura dedicada, como por ejemplo:

Pero también tiene algunos puntos de mejora:

¿Cuándo merece la pena usar Dataproc serverless frente a Dataproc sobre Compute Engine?

Dataproc sobre Compute Engine. Cluster permanente 24x7

Hay que tener en cuenta que, en este caso, vamos a tener que pagar el cluster, independientemente de que se esté usando o no.

Dataproc Serveless

En cargas batch recurrentes, diarias, semanales, mensuales, etc. Debería de ser la opción por defecto y si con el tiempo, se dispone de tantas cargas que siempre hay alguna corriendo la consola, puede tener sentido crear un cluster con Dataproc sobre Compute Engine.

Dataproc sobre Compute Engine. Cluster efimeros

Esta opción es un punto intermedio entre las dos anteriores y probablemente deje de tener sentido teniendo ya la posibilidad de utilizar Dataproc Serverless. Consiste básicamente en crear un cluster, lanzar tus cargas y eliminar el cluster cuando hayan terminado.

Su uso puede acotarse a cuando no podamos lanzar una carga en Dataproc Serverless por alguna limitación técnica, como por ejemplo procesamiento con soporte para GPUs o procesos batch que duran más de 24 horas.

Como conclusión, podemos decir que Dataproc Serverless se convierte en la opción por defecto si quieres lanzar procesos de Spark teniendo en cuenta siempre las limitaciones técnicas que hemos comentado anteriormente.

Tell us what you think.

Los comentarios serán moderados. Serán visibles si aportan un argumento constructivo. Si no estás de acuerdo con algún punto, por favor, muestra tus opiniones de manera educada.

Enviar.
Goodly logo