De sobra es conocido Apache Spark como motor de procesamiento de datos masivo. El objetivo de este post no es hacer una introducción a Spark, sino, más bien, ver qué capacidades ofrece el servicio Spark Serverless de Google Cloud, cuáles son sus ventajas, inconvenientes y limitaciones, así como un ejemplo de productivización de un proceso en la vida real.
Cuando hablamos de Spark Serverless en Google Cloud realmente nos referimos al servicio Dataproc Serverless. Nos permite ejecutar procesos batch de Spark sin tener que preocuparnos por la infraestructura subyacente ni por la gestión de recursos.
Spark Serverless utiliza un modelo de precios basado en uso. Los usuarios solo pagan por los recursos que utilizan, lo que puede ayudar a ahorrar dinero en cargas de trabajo de Spark que son de corta duración o que varían en tamaño.
A diferencia de Dataproc on Compute Engine, donde Google se encarga de la gestión y configuración de las máquinas y de los framework del ecosistema Hadoop y tienes que especificar parámetros de las máquinas, tipo de discos, etc., en Dataproc Serverless, lo único que tienes que especificar es la capacidad de cómputo que quieres para tu proceso.
Pasamos entonces de una visión genérica de configuración de cluster (donde se podrían ejecutar muchos procesos) a una visión de proceso. Aquí una tabla con las principales características del servicio Dataproc en sus diferentes sabores:
Función | Dataproc Serverless | Dataproc en Compute Engine |
---|
Frameworks de procesamiento | Batch: Spark 3.4 y versiones anteriores Interactivo: Kernel de PySpark para Spark 3.4 y versiones anteriores | Spark 3.3 y versiones anteriores Otros frameworks de código abierto, como Hive, Flink, Trino y Kafka |
Serverless | Sí | No |
Tiempo de arranque | 60 segundos | 90 segundos |
Control de la infraestructura | No | Sí |
Administración de recursos | Basado en Spark | Basada en YARN |
Asistencia de GPU | Planificado | Sí |
Sesiones interactivas | Sí | No |
Contenedores personalizados | Sí | No. |
Acceso a VM (por ejemplo, SSH) | No | Sí |
Versiones de Java | Java 17 y 11 | Versiones anteriores compatibles |
Asistencia de OS Login | No | Sí |
Los procesos Batch que podemos ejecutar en Dataproc Serverless son:
- PySpark.
- Spark SQL.
- Spark R.
- Spark (Java o Scala).
De momento está en preview, pero también se puede escribir y ejecutar código de manera interactiva en notebooks de Jupyter teniendo disponible una Spark Session.
Vamos a ver un ejemplo real de cómo ejecutar un proceso de Spark sobre Dataproc Serveless y llevarlo a producción. Los ejemplos de la documentación oficial, on-hands, labs, etc., son ideales para probar la tecnología, pero a veces quedan lejos de lo que es poner un proceso en producción.
Nuestro proceso leerá 131 millones de tickets que están en formato CSV almacenado en Cloud Storage, los procesará y los insertará en BigQuery.
Definición de variables globales que utilizaremos:
# GLOBAL VARS
export PROJECT_ID=spark-serverless-bigquery
export REGION_GCP=europe-west1
export ARTIFACT_REPOSITORY=spark-serverless
export IMAGE=spark-serverless-bigquery-goodly
Buckets necesarios:
#BUCKETS
gcloud storage buckets create gs://metadata-dataproc-goodly \
--project=$PROJECT_ID \
--default-storage-class=STANDARD \
--location=$REGION_GCP \
--uniform-bucket-level-access
gcloud storage buckets create gs://staging-dataproc-goodly \
--project=$PROJECT_ID \
--default-storage-class=STANDARD \
--location=$REGION_GCP \
--uniform-bucket-level-access
gcloud storage buckets create gs://temp_dataproc_bigquery_indirect \
--project=$PROJECT_ID \
--default-storage-class=STANDARD \
--location=$REGION_GCP \
--uniform-bucket-level-access
Configuración de networking. En este caso, vamos a crear una VPC desde 0, pero en un entorno productivo habrá que integrarse dentro de la VPC que esté definida y habilitar el private ip google access.
# NETWORKS
gcloud compute networks create spark-serverless-vpc \
--project=$PROJECT_ID \
--subnet-mode=auto \
--mtu=1460 \
--bgp-routing-mode=regional
gcloud compute firewall-rules create allow-internal-ingress \
--project=$PROJECT_ID \
--network="spark-serverless-vpc" \
--source-ranges="10.132.0.0/20" \
--direction="ingress" \
--action="allow" \
--rules="all"
gcloud compute networks subnets update spark-serverless-vpc \
--region=$REGION_GCP \
--enable-private-ip-google-access
Vamos a crear un repositorio de artefactos, en nuestro caso en GCP, pero puede ser cualquier otro que permita la persistencia de imágenes de docker.
# ARTIFACTORY
gcloud artifacts repositories create $ARTIFACT_REPOSITORY --repository-format=docker \
--location=$REGION_GCP --description="Docker repository"
En este punto, tenemos la infraestructura lista para lanzar por primera vez un proceso de CI sobre nuestro repositorio de código. Nosotros utilizaremos Cloud Build, pero puede utilizarse cualquier herramienta de Integración Continua.
gcloud builds submit --project=$PROJECT_ID \
--region=$REGION_GCP \
--substitutions=_REGION_GCP=$REGION_GCP,_REPOSITORY=$ARTIFACT_REPOSITORY,_IMAGE=$IMAGE \
--config=cloudbuild.yaml .
El fichero cloudbuild.yaml (donde definimos los pasos de CI y la construcción de la imagen) es tal que así:
steps:
- id: 'mvn test'
name: maven:3-jdk-11-slim
entrypoint: mvn
args: [ 'test' ]
- id: 'mvn package'
name: maven:3-jdk-11-slim
entrypoint: 'bash'
args:
- '-c'
- |
mvn package -DskipTests
echo $(mvn help:evaluate -Dexpression=project.version -q -DforceStdout) > version.txt
- id: 'Prepare provided jar'
name: 'gcr.io/cloud-builders/gsutil'
args: [ 'cp', 'gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.28.0.jar','.' ]
- id: 'Build image'
name: 'gcr.io/cloud-builders/docker'
entrypoint: 'bash'
args:
- '-c'
- |
VERSION=$(cat version.txt)
docker build -t europe-west1-docker.pkg.dev/$PROJECT_ID/${_REPOSITORY}/${_IMAGE}:$VERSION \
--build-arg JAR_FILE=target/spark-serverless-bigquery-$VERSION-jar-with-dependencies.jar .
- id: 'Push image'
name: 'gcr.io/cloud-builders/docker'
entrypoint: 'bash'
args:
- '-c'
- |
VERSION=$(cat version.txt)
docker push europe-west1-docker.pkg.dev/$PROJECT_ID/${_REPOSITORY}/${_IMAGE}:$VERSION
substitutions:
_REPOSITORY: spark-serverless # default value
_IMAGE: spark-serverless-bigquery-goodly # default value
_REGION_GCP: europe-west1
options:
substitution_option: 'ALLOW_LOOSE'
logging: CLOUD_LOGGING_ONLY
Una vez construida la imagen, deberíamos tener en nuestro artifact registry (o equivalente) una imagen de docker con nuestro código dentro:
Tip: recomendamos seguir este artículo para el fichero Dockerfile que servirá para crear el custom container que lanzaremos en el siguiente punto.
Procedemos a lanzar nuestro proceso:
gcloud dataproc batches submit spark \
--batch `echo "csv-to-bq-dataproc-serverless-$(date "+%Y-%m-%d-%H%M%S")"` \
--properties spark.driver.cores=4\
,spark.driver.memory=8g\
,spark.dataproc.driver.disk.size=250g\
,spark.executor.cores=8\
,spark.executor.memory=16g\
,spark.dataproc.executor.disk.size=500g\
,spark.dynamicAllocation.enabled=true\
,spark.dynamicAllocation.initialExecutors=8\
,spark.dynamicAllocation.minExecutors=8\
,spark.dynamicAllocation.maxExecutors=8 \
--project=$PROJECT_ID \
--container-image=$REGION_GCP-docker.pkg.dev/spark-serverless-bigquery/$ARTIFACT_REPOSITORY/$IMAGE:1.0.1 \
--version=1.1 \
--region=$REGION_GCP \
--subnet=https://www.googleapis.com/compute/v1/projects/$PROJECT_ID/regions/$REGION_GCP/subnetworks/spark-serverless-vpc \
--class=com.paradigma.tech.goodly.sparkserverless.gcp.commitconf.CsvToBqDataprocServerless
Con esto hemos lanzado nuestro primer proceso de Spark y habríamos terminado aquí este post si el mundo fuese ideal, pero no lo es.
Cuando lanzas un proceso, puede fallar o, lo que es peor, puede no tener el performance que esperas. Es en ese punto donde la UI de Spark es fundamental, ya que nos permitirá ver nuestros Jobs, Stages, Tasks así como la distribución de los datos por las diferentes particiones, etc.
Por defecto, Dataproc Serverless no lo da, así que necesitamos un history server para poder tener toda la información visual que nos proporciona Spark por defecto.
# Creación History Server
gcloud dataproc clusters create phs-cluster \
--enable-component-gateway \
--bucket staging-dataproc-goodly --region $REGION_GCP \
--subnet spark-serverless-vpc --no-address \
--single-node --master-machine-type n2-standard-4 \
--master-boot-disk-size 200 --image-version 2.1-debian11 \
--properties mapred:mapreduce.jobhistory.read-only.dir-pattern=gs://metadata-dataproc-goodly/*/mapreduce-job-history/done,yarn:yarn.nodemanager.remote-app-log-dir=gs://metadata-dataproc-goodly/*/yarn-logs,spark:spark.eventLog.dir=gs://metadata-dataproc-goodly/spark-event-log/spark-job-history,spark:spark.history.fs.logDirectory=gs://metadata-dataproc-goodly/*/spark-job-history,spark:spark.history.custom.executor.log.url.applyIncompleteApplication=false,spark:spark.history.custom.executor.log.url={{YARN_LOG_SERVER_URL}}/{{NM_HOST}}:{{NM_PORT}}/{{CONTAINER_ID}}/{{CONTAINER_ID}}/{{USER}}/{{FILE_NAME}} \
--project $PROJECT_ID
Si volvemos a lanzar el proceso, pero esta vez especificando que utilice el history server que hemos definido como parámetro (“--history-server-cluster=projects/$PROJECT_ID/regions/$REGION_GCP/clusters/phs-cluster \”), ya podremos ver habilitado el botón “View Spark History Server” en la interfaz de Dataproc > Serverless > Batch y nos llevará a la UI de Spark con información sobre nuestra ejecución.
Podemos ver, entre otros resultados, el tiempo de duración total del proceso (2,8 minutos), el tiempo que cada job, número de stages por job, etc.
Spark Serverless ofrece una serie de ventajas sobre los enfoques que requieren de infraestructura dedicada, como por ejemplo:
- Fácil de usar. Spark Serverless es fácil de usar y configurar. Los usuarios pueden enviar jobs de Spark sin tener que preocuparse por la administración de la infraestructura.
- Escalabilidad. Spark Serverless se escala automáticamente según las necesidades de la carga de trabajo, siempre y cuando tengamos habilitada la propiedad de spark spark.dynamicAllocation.enabled=true. Esto ayuda a garantizar que las cargas de trabajo se ejecuten de manera eficiente, demandando y liberando recursos según el proceso lo requiera.
- Coste. Spark Serverless solo cobra por los recursos que utiliza. Esto puede ayudar a ahorrar dinero en cargas de trabajo de Spark que son de corta duración o que varían en tamaño.
- Integración con Hive-Metastore externos. Si tu organización lleva ya tiempo utilizando Spark, es posible que tenga una instalación de un hive metastore donde estén todos los metadatos de tu datalake. En ese caso, Spark Serveless permite la conexión externa en caso de seguir queriendo lanzar tus cargas en on-premise y llevarte solo algunas cargas a Google Cloud.
Pero también tiene algunos puntos de mejora:
- Procesamiento Batch. No está disponible para procesamiento en Streaming.
- Ausencia de History Server por defecto. Si bien no es un requisito indispensable para funcionar, es muy recomendable tenerlo y tiene un gran problema a priori. Es necesaria la creación de un cluster de dataproc sobre compute engine de manera adicional a la carga que vamos a ejecutar, con el coste que tiene asociado.
Dataproc sobre Compute Engine. Cluster permanente 24x7
- Cuando estés haciendo un uso intensivo de tu cluster prácticamente en horario 24x7. Por ejemplo, un cluster de 60 cores y 240 GB de RAM, que esté casi al 100% de uso durante todo el día, es más barato que tener esa misma capacidad en Dataproc Serverless durante todo el día.
- Cuando tengas que hacer procesamiento en Streaming.
Hay que tener en cuenta que, en este caso, vamos a tener que pagar el cluster, independientemente de que se esté usando o no.
Dataproc Serveless
En cargas batch recurrentes, diarias, semanales, mensuales, etc. Debería de ser la opción por defecto y si con el tiempo, se dispone de tantas cargas que siempre hay alguna corriendo la consola, puede tener sentido crear un cluster con Dataproc sobre Compute Engine.
Dataproc sobre Compute Engine. Cluster efimeros
Esta opción es un punto intermedio entre las dos anteriores y probablemente deje de tener sentido teniendo ya la posibilidad de utilizar Dataproc Serverless. Consiste básicamente en crear un cluster, lanzar tus cargas y eliminar el cluster cuando hayan terminado.
Su uso puede acotarse a cuando no podamos lanzar una carga en Dataproc Serverless por alguna limitación técnica, como por ejemplo procesamiento con soporte para GPUs o procesos batch que duran más de 24 horas.
Como conclusión, podemos decir que Dataproc Serverless se convierte en la opción por defecto si quieres lanzar procesos de Spark teniendo en cuenta siempre las limitaciones técnicas que hemos comentado anteriormente.
Tell us what you think.