Apache Spark

plataforma d'anàlisi de dades de forma distribuïda

Apache Spark és una plataforma de codi obert orientat a l'analítica i processat de dades massives. Ofereix una interfície per a treballar amb clústers tot considerant el paral·lelisme de dades i la tolerància a fallades de forma implícita. Originàriament desenvolupat per la Universitat de Califòrnia a Berkeley el 2009, el codi font fou cedit el 2013 a l'Apache Software Foundation, qui el manté des de llavors.[1][2][3]

Apache Spark
Modifica el valor a Wikidata
Modifica el valor a Wikidata
Tipusentorn de treball, machine learning framework (en) Tradueix, informàtica al núvol i programari lliure Modifica el valor a Wikidata
Versió inicial30 maig 2014 i 1r març 2014 Modifica el valor a Wikidata
Versió estable
3.5.2 (6 agost 2024) Modifica el valor a Wikidata
LlicènciaLlicència Apache, versió 2.0
llicència BSD Modifica el valor a Wikidata
Característiques tècniques
Sistema operatiuMicrosoft Windows, Linux i macOS Modifica el valor a Wikidata
PlataformaMàquina Virtual Java Modifica el valor a Wikidata
Escrit enScala, Java, Python, R, Structured Query Language i Java Database Connectivity Modifica el valor a Wikidata
Format de fitxer de lectura
Format de fitxer d'escriptura
Equip
Creador/sMatei Zaharia Modifica el valor a Wikidata
Desenvolupador(s)Apache Software Foundation Modifica el valor a Wikidata
Més informació
Lloc webspark.apache.org (anglès) Modifica el valor a Wikidata
Stack ExchangeEtiqueta Modifica el valor a Wikidata
Seguiment d'errorsSeguiment d'errors Modifica el valor a Wikidata

Fig. 1 Estructura interna de l'Apache Spark

Visió General

modifica

L'arquitectura d'Apache Spark està basada en el concepte de RDD (Resilient Distributed Dataset), un conjunt de dades immutable distribuït al voltant d'un clúster.[4] Sobre aquesta idea fonamental, es van anar creant capes d'abstracció per a facilitar les tasques de programació i control, utilitzant per exemple el concepte de dataset (joc de dades). Així doncs, com a API es recomana la interfície orientada a datasets des de la versió Spark 2.x[5] malgrat que la orientada a RDD segueix existint.[6][7]

El concepte de RDD neix com a contraposició al paradigma MapReduce, una estratègia per lidiar amb grans volums de dades consistent a llegir dades del disc, mapejar-les seguint una funció, reduir-ne els resultats obtinguts i emmagatzemar-los de nou al disc. Malgrat l'enfocament és molt adient (i àmpliament utilitzat) per a multitud de contextos, n'hi ha d'altres en que mostra limitacions, com ara quan prima la velocitat de processament o quan el processat és iteratiu (es processa el mateix conjunt una vegada i una altra). En comptes d'emmagatzemar cada resultat al disc, els RDDs es guarden en memòria compartida, la qual cosa n'optimitza l'accés i disponibilitat.[8]

Els escenaris en que el tractament amb RDDs són beneficiosos, ha donat lloc a nous avantatges i ha permès millorar en el tractament de certs problemes. Són casos ideals per a treballar amb Spark: l'anàlisi exploratori d'un conjunt de dades, les consultes estil SQL o els processos d'aprenentatge automàtic, fent ús de la capacitat iterativa de càlcul.[9][10]

Apache Spark necessitarà en qualsevol cas coordinar-se amb el clúster de maquinari sobre el que treballi. Per a aquesta finalitat, Spark suporta Hadoop YARN, Apache Mesos or Kubernetes,[11] així com una versió pròpia o standalone que sol usar-se per a fer proves.[12]

També necessitarà poder emmagatzemar dades de forma distribuïda. En aquest cas, podria integrar-se amb gairebé qualsevol de les solucions actualment disponibles al mercat, incloent Hadoop Distributed File System (HDFS)[13] o Cassandra.[14] Igual que en el cas del gestor, per a escenaris preliminars de proves, existeix una versió senzilla que no necessita cap complement; simplement un entorn local que simularia el clúster.

Components

modifica

Spark Core

modifica

És la base en què es recolza la resta de mòduls. Considerat el nucli del framework, Spark Core és un motor distribuït d'ús general per processar dades. En ell es troben les llibreries de SQL, processament de streaming, aprenentatge automàtic i càlcul de grafs que es poden combinar en les aplicacions. Aquest nucli constitueix la base dels projectes i facilita l'enviament de tasques distribuïdes, la programació i les funcions bàsiques d'E/S.

El codi font d'Spark està codificat mitjançant el llenguatge de programació Scala i està centrat en la idea dels RDDs immutables. Té APIs per a Scala, Java, .NET, Python i R.

Un exemple típic d'aplicació centrada en l'ús d'RDDs podria ser el següent extracte, que computa la freqüència d'aparició de les paraules que apareixen en una sèrie de fitxers de text i en retorna les més comunes. Es pot observar que els conceptes clau de MapReduce són presents en la lógica d'Spark (funcions map, flatMap o reduceByKey); la diferència és a nivell d'implementació, doncs cada funció genera un nou RDD sobre la base de l'RDD anterior.

val conf = new SparkConf().setAppName("wiki_test") // crea un objecte configuració.
val sc = new SparkContext(conf) // crea un context Spark.
val data = sc.textFile("/cami/al/directori") // Llegeix els fitxers de la carpeta "directori" i els transforma en un primer RDD.
val tokens = data.flatMap(_.split(" ")) // Divideix cada fitxer en totes les seves paraules.
val wordFreq = tokens.map((_, 1)).reduceByKey(_ + _) // Afegeix un 1 a cada paraula, després suma agrupant per paraula.
wordFreq.sortBy(s => -s._2).map(x => (x._2, x._1)).top(10) // Recupera les 10 paraules amb major puntuació i ordena de forma decreixent.

Spark SQL

modifica

Apache Spark SQL és un mòdul complementari idealment dissenyat per a treballar en contextos en que les dades són estructurades o semi-estructurades i es desitja consultar-les seguint l'estil SQL. Proporciona una capa d'abstracció que de cara a l'usuari difumina les diferències entre treballar sobre RDDs o sobre taules relaciones clàssiques.[15]

L'exemple mostra un cas senzill en que es crea un dataframe a partir d'una taula d'una base de dades i després s'hi executen consultes de tipus SQL.

import org.apache.spark.sql.SparkSession

val url = "jdbc:mysql://yourIP:yourPort/test?user=yourUsername;password=yourPassword" // URL a la base de dades.
val spark = SparkSession.builder().getOrCreate() // Crea una sessió Spark

val df = spark
 .read
 .format("jdbc")
 .option("url", url)
 .option("dbtable", "poblacio")
 .load()

df.createOrReplaceTempView("poblacio")
val countsByAge = spark.sql("SELECT persona, count(*) FROM poblacio GROUP BY edat")

Spark SQL té llibreries per a Scala, Java, .NET i Python.

Spark Streaming

modifica

Per a casos en que cal lidiar amb fluxos de dades en temps real, Spark proposa el mòdul Spark Streaming. Les senyals d'entrada s'agrupen en mini-lots, de forma que es puguin aplicar sobre cada un els mateixos tractaments que quan es treballa sobre grans blocs de dades, però de forma infinitament més ràpida, la qual cosa dona la sensació de treballar quasi en temps real. Com a contraposició, altres tecnologies com Storm o Flink, sí que permeten treballar senyal a senyal (autènticament en temps real).

Spark Streaming permet alinear-se amb algunes de les eines de missatgeria més populars, com Kafka, Flume o Twitter, i consumir les dades que aquestes li entreguen.

Per a aquells fluxos que tinguin una aparença estructurada existeix Spark Structured Streaming que, de forma similar al que Apache Spark SQL permetia fer amb els grans conjunts de dades, permet tractar els fluxos com si fossin taules SQL.[16] És possible llavors combinar taules estàtiques (o jocs de dades acotats) amb fluxos continus, tractant aquests segons com si fossin taules en continu creixement. És important en aquests casos definir bé els casos d'unions obertes (outer joins) ja que la banda que equival al flux no està acotada.

L'exemple següent mostra el fragment d'un codi que crea un client http de twitter utilitzant spark streaming.

# crea una configuració spark
conf = SparkConf()
conf.setAppName("TwitterStreamApp")
# crea un context spark amb la configuració anterior
sc = SparkContext(conf=conf)
sc.setLogLevel("ERROR")
# crea elcontect Streaming des del context spark d'abans amb un interval de 2 segons
ssc = StreamingContext(sc, 2)
# estableix un punt de control per a la recuperació de RDD
ssc.checkpoint("checkpoint_TwitterApp")
# llegeix les dades
dataStream = ssc.socketTextStream("localhost",9009)
# divideix el tweet en paraules
words = dataStream.flatMap(lambda line: line.split(" "))
# filtra les paraules per obtenir només els hashtags, després mapeja a cada hashtag una parella de (hashtag, 1)
hashtags = words.filter(lambda w: '#' in w).map(lambda x: (x, 1))
# agrega el compte de cada hashtag al seu últim compte
tags_totals = hashtags.updateStateByKey(aggregate_tags_count)
# processa cada RDD generat en cada interval
tags_totals.foreachRDD(process_rdd)
# comencça la computació en streaming
ssc.start()
# espera a que la transmissió acabi
ssc.awaitTermination()

Spark Mlib

modifica

MLlib és la llibreria escalable d'aprenentatge automàtic de Spark. Conté eines amb les quals les tasques pràctiques d'aprenentatge automàtic són senzilles i escalables, a més de nombrosos algoritmes d'aprenentatge d'ús habitual, com ara classificació, regressió, recomanació i agrupació en clústers. També inclou la pipeline i altres utilitats, com transformacions de característiques, creació de fluxos de processament d'aprenentatge automàtic, avaluació de models, àlgebra lineal distribuïda i estadístiques. MLlib s'integra perfectament amb altres components de Spark com ara Spark SQL, Spark Streaming i DataFrames i s'instal·la a l'entorn d'execució de Databricks. La llibreria es pot utilitzar en Java, Scala i Python com a part de les aplicacions de Spark, de manera que es pot incloure en fluxos de treball complets. També s'utilitzen models entrenats amb MLlib per fer prediccions en Structured Streaming, tots aquest processos permeten una externalització a partir d'una API.

En el següent exemple tenim un model de regressió lineal amb Spark MLlib, utilitzant VectorAssembler per concatenar les característiques, StandardScaler per estandarditzar-les, i un pipeline per organitzar les etapes de transformació i l'estimador. Les prediccions són realitzades utilitzant el model de pipeline ajustat.

from pyspark.ml.feature import VectorAssembler
vecAssembler= VectorAssembler(inputCols = trainDF.drop("SalePrice").columns,
outputCol = "features")
vecTrainDF = vecAssembler.transform(trainDF)
from pyspark.ml.feature import StandardScaler
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures", withStd=True,
withMean=False)
scalerModel = scaler.fit(vecTrainDF)
scTrainDF = scalerModel.transform(vecTrainDF)
from pyspark.ml.regression import LinearRegression
lr = (
 LinearRegression(
 featuresCol="features", labelCol="SalePrice", maxIter=10, regParam=0.8, elasticNetParam=0.1,)
)
lrModel = lr.fit(scTrainDF)
from pyspark.ml import Pipeline
pipeline=Pipeline(stages=[vecAssembler, scaler, lr])
pipelineModel=pipeline.fit(trainDF)
predDF=pipelineModel.transform(validationDF)
predDF.select("prediction","SalePrice","features").show(5)

Spark Graph X

modifica

GraphX és el mòdul de Spark per al càlcul i processament de grafs. L'estructura de dades del graf pot ser definida utilitzant un esquema de graf o un RDD de vèrtexs i arestes, on els vèrtexs són etiquetes d'entitats i les arestes representen les relacions entre les entitats. És flexible i funciona perfectament tant amb grafs com amb col·leccions, unificant en un mateix sistema el procés d'extracció, transformació i càrrega (ETL), les anàlisis exploratòries i el càlcul iteratiu de gràfics. GraphX suporta diversos algoritmes, com ara PageRank, components connectats, camí més curt i recompte de triangles.

Si el comparem amb altre eines de processament de grafs, com per exemple Giraph, podem afirmar que GraphX és un marc de processament paral·lel de grafs que permet analitzar grafs a gran escala amb una latència mínima, i que també suporta una gran varietat d'operacions de grafs, incloent càrrega de grafs, recorregut i consulta.

En el següent exemple el codi (en llenguatge Scala) crea un graf dirigit amb vèrtexs que representen usuaris i arestes que representen relacions entre els usuaris.

// assumim que el context ja està creat
val sc: SparkContext
// crea una RDD pels vèrtexs
val users: RDD[(VertexId, (String, String))] =
  sc.parallelize(Seq((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
                       (5L, ("franklin", "prof")), (2L, ("istoica", "prof"))))
// Crea una RDD per les arestes
val relationships: RDD[Edge[String]] =
  sc.parallelize(Seq(Edge(3L, 7L, "collab"),    Edge(5L, 3L, "advisor"),
                       Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi")))
// crea un nou usuari per defecte en cas que hi hagi una relació sense usuari
val defaultUser = ("John Doe", "Missing")
// genera el graf inicial
val graph = Graph(users, relationships, defaultUser)

Referències

modifica
  1. «Apache Spark™ - What is Spark» (en anglès). https://databricks.com,+18-02-2020.+[Consulta: 18 febrer 2020].
  2. Pointer, Ian. «What is Apache Spark? The big data platform that crushed Hadoop» (en anglès). https://www.infoworld.com,+13-11-2017.+[Consulta: 18 febrer 2020].
  3. Kovachev, Dilyan. «A Beginner’s Guide to Apache Spark» (en anglès). https://towardsdatascience.com,+20-03-2019.+[Consulta: 18 febrer 2020].
  4. "Spark: Cluster Computing with Working Sets" a USENIX Workshop on Hot Topics in Cloud Computing (HotCloud).  
  5. «Spark 2.2.0 Quick Start», 11-07-2017. «we highly recommend you to switch to use Dataset, which has better performance than RDD»
  6. «Spark 2.2.0 deprecation list», 11-07-2017.
  7. Damji, Jules. «A Tale of Three Apache Spark APIs: RDDs, DataFrames, and Datasets: When to use them and why», 14-07-2016.
  8. (2010) "Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing" a USENIX Symp. Networked Systems Design and Implementation.  
  9. «Spark vs. Hadoop MapReduce: Which big data framework to choose» (en anglès). [Consulta: 22 març 2021].
  10. Harris, Derrick. «4 reasons why Spark could jolt Hadoop into hyperdrive», 28-06-2014. Arxivat de l'original el 24 d’octubre 2017. [Consulta: 22 març 2021].
  11. «Cluster Mode Overview - Spark 2.4.0 Documentation - Cluster Manager Types». Apache Foundation, 09-07-2019.
  12. «Spark Standalone Mode - Spark 3.1.1 Documentation». [Consulta: 22 març 2021].
  13. Figure showing Spark in relation to other open-source Software projects including Hadoop
  14. Plantilla:Cite mailing list
  15. «What is Spark SQL?» (en anglès americà). [Consulta: 22 març 2021].
  16. «Structured Streaming Programming Guide - Spark 3.1.1 Documentation». [Consulta: 22 març 2021].