Apache Spark
Apache Spark és una plataforma de codi obert orientat a l'analítica i processat de dades massives. Ofereix una interfície per a treballar amb clústers tot considerant el paral·lelisme de dades i la tolerància a fallades de forma implícita. Originàriament desenvolupat per la Universitat de Califòrnia a Berkeley el 2009, el codi font fou cedit el 2013 a l'Apache Software Foundation, qui el manté des de llavors.[1][2][3]
| |
Tipus | entorn de treball, machine learning framework (en) , informàtica al núvol i programari lliure |
---|---|
Versió inicial | 30 maig 2014 i 1r març 2014 |
Versió estable | |
Llicència | Llicència Apache, versió 2.0 llicència BSD |
Característiques tècniques | |
Sistema operatiu | Microsoft Windows, Linux i macOS |
Plataforma | Màquina Virtual Java |
Escrit en | Scala, Java, Python, R, Structured Query Language i Java Database Connectivity |
Format de fitxer de lectura | |
Format de fitxer d'escriptura | |
Equip | |
Desenvolupador(s) | Apache Software Foundation |
Fonts de codi | |
Més informació | |
Lloc web | spark.apache.org (anglès) |
Stack Exchange | Etiqueta |
Seguiment d'errors | Seguiment d'errors |
| |
Visió General
modificaL'arquitectura d'Apache Spark està basada en el concepte de RDD (Resilient Distributed Dataset), un conjunt de dades immutable distribuït al voltant d'un clúster.[4] Sobre aquesta idea fonamental, es van anar creant capes d'abstracció per a facilitar les tasques de programació i control, utilitzant per exemple el concepte de dataset (joc de dades). Així doncs, com a API es recomana la interfície orientada a datasets des de la versió Spark 2.x[5] malgrat que la orientada a RDD segueix existint.[6][7]
El concepte de RDD neix com a contraposició al paradigma MapReduce, una estratègia per lidiar amb grans volums de dades consistent a llegir dades del disc, mapejar-les seguint una funció, reduir-ne els resultats obtinguts i emmagatzemar-los de nou al disc. Malgrat l'enfocament és molt adient (i àmpliament utilitzat) per a multitud de contextos, n'hi ha d'altres en que mostra limitacions, com ara quan prima la velocitat de processament o quan el processat és iteratiu (es processa el mateix conjunt una vegada i una altra). En comptes d'emmagatzemar cada resultat al disc, els RDDs es guarden en memòria compartida, la qual cosa n'optimitza l'accés i disponibilitat.[8]
Els escenaris en que el tractament amb RDDs són beneficiosos, ha donat lloc a nous avantatges i ha permès millorar en el tractament de certs problemes. Són casos ideals per a treballar amb Spark: l'anàlisi exploratori d'un conjunt de dades, les consultes estil SQL o els processos d'aprenentatge automàtic, fent ús de la capacitat iterativa de càlcul.[9][10]
Apache Spark necessitarà en qualsevol cas coordinar-se amb el clúster de maquinari sobre el que treballi. Per a aquesta finalitat, Spark suporta Hadoop YARN, Apache Mesos or Kubernetes,[11] així com una versió pròpia o standalone que sol usar-se per a fer proves.[12]
També necessitarà poder emmagatzemar dades de forma distribuïda. En aquest cas, podria integrar-se amb gairebé qualsevol de les solucions actualment disponibles al mercat, incloent Hadoop Distributed File System (HDFS)[13] o Cassandra.[14] Igual que en el cas del gestor, per a escenaris preliminars de proves, existeix una versió senzilla que no necessita cap complement; simplement un entorn local que simularia el clúster.
Components
modificaSpark Core
modificaÉs la base en què es recolza la resta de mòduls. Considerat el nucli del framework, Spark Core és un motor distribuït d'ús general per processar dades. En ell es troben les llibreries de SQL, processament de streaming, aprenentatge automàtic i càlcul de grafs que es poden combinar en les aplicacions. Aquest nucli constitueix la base dels projectes i facilita l'enviament de tasques distribuïdes, la programació i les funcions bàsiques d'E/S.
El codi font d'Spark està codificat mitjançant el llenguatge de programació Scala i està centrat en la idea dels RDDs immutables. Té APIs per a Scala, Java, .NET, Python i R.
Un exemple típic d'aplicació centrada en l'ús d'RDDs podria ser el següent extracte, que computa la freqüència d'aparició de les paraules que apareixen en una sèrie de fitxers de text i en retorna les més comunes. Es pot observar que els conceptes clau de MapReduce són presents en la lógica d'Spark (funcions map, flatMap o reduceByKey); la diferència és a nivell d'implementació, doncs cada funció genera un nou RDD sobre la base de l'RDD anterior.
val conf = new SparkConf().setAppName("wiki_test") // crea un objecte configuració.
val sc = new SparkContext(conf) // crea un context Spark.
val data = sc.textFile("/cami/al/directori") // Llegeix els fitxers de la carpeta "directori" i els transforma en un primer RDD.
val tokens = data.flatMap(_.split(" ")) // Divideix cada fitxer en totes les seves paraules.
val wordFreq = tokens.map((_, 1)).reduceByKey(_ + _) // Afegeix un 1 a cada paraula, després suma agrupant per paraula.
wordFreq.sortBy(s => -s._2).map(x => (x._2, x._1)).top(10) // Recupera les 10 paraules amb major puntuació i ordena de forma decreixent.
Spark SQL
modificaApache Spark SQL és un mòdul complementari idealment dissenyat per a treballar en contextos en que les dades són estructurades o semi-estructurades i es desitja consultar-les seguint l'estil SQL. Proporciona una capa d'abstracció que de cara a l'usuari difumina les diferències entre treballar sobre RDDs o sobre taules relaciones clàssiques.[15]
L'exemple mostra un cas senzill en que es crea un dataframe a partir d'una taula d'una base de dades i després s'hi executen consultes de tipus SQL.
import org.apache.spark.sql.SparkSession
val url = "jdbc:mysql://yourIP:yourPort/test?user=yourUsername;password=yourPassword" // URL a la base de dades.
val spark = SparkSession.builder().getOrCreate() // Crea una sessió Spark
val df = spark
.read
.format("jdbc")
.option("url", url)
.option("dbtable", "poblacio")
.load()
df.createOrReplaceTempView("poblacio")
val countsByAge = spark.sql("SELECT persona, count(*) FROM poblacio GROUP BY edat")
Spark SQL té llibreries per a Scala, Java, .NET i Python.
Spark Streaming
modificaPer a casos en que cal lidiar amb fluxos de dades en temps real, Spark proposa el mòdul Spark Streaming. Les senyals d'entrada s'agrupen en mini-lots, de forma que es puguin aplicar sobre cada un els mateixos tractaments que quan es treballa sobre grans blocs de dades, però de forma infinitament més ràpida, la qual cosa dona la sensació de treballar quasi en temps real. Com a contraposició, altres tecnologies com Storm o Flink, sí que permeten treballar senyal a senyal (autènticament en temps real).
Spark Streaming permet alinear-se amb algunes de les eines de missatgeria més populars, com Kafka, Flume o Twitter, i consumir les dades que aquestes li entreguen.
Per a aquells fluxos que tinguin una aparença estructurada existeix Spark Structured Streaming que, de forma similar al que Apache Spark SQL permetia fer amb els grans conjunts de dades, permet tractar els fluxos com si fossin taules SQL.[16] És possible llavors combinar taules estàtiques (o jocs de dades acotats) amb fluxos continus, tractant aquests segons com si fossin taules en continu creixement. És important en aquests casos definir bé els casos d'unions obertes (outer joins) ja que la banda que equival al flux no està acotada.
L'exemple següent mostra el fragment d'un codi que crea un client http de twitter utilitzant spark streaming.
# crea una configuració spark
conf = SparkConf()
conf.setAppName("TwitterStreamApp")
# crea un context spark amb la configuració anterior
sc = SparkContext(conf=conf)
sc.setLogLevel("ERROR")
# crea elcontect Streaming des del context spark d'abans amb un interval de 2 segons
ssc = StreamingContext(sc, 2)
# estableix un punt de control per a la recuperació de RDD
ssc.checkpoint("checkpoint_TwitterApp")
# llegeix les dades
dataStream = ssc.socketTextStream("localhost",9009)
# divideix el tweet en paraules
words = dataStream.flatMap(lambda line: line.split(" "))
# filtra les paraules per obtenir només els hashtags, després mapeja a cada hashtag una parella de (hashtag, 1)
hashtags = words.filter(lambda w: '#' in w).map(lambda x: (x, 1))
# agrega el compte de cada hashtag al seu últim compte
tags_totals = hashtags.updateStateByKey(aggregate_tags_count)
# processa cada RDD generat en cada interval
tags_totals.foreachRDD(process_rdd)
# comencça la computació en streaming
ssc.start()
# espera a que la transmissió acabi
ssc.awaitTermination()
Spark Mlib
modificaMLlib és la llibreria escalable d'aprenentatge automàtic de Spark. Conté eines amb les quals les tasques pràctiques d'aprenentatge automàtic són senzilles i escalables, a més de nombrosos algoritmes d'aprenentatge d'ús habitual, com ara classificació, regressió, recomanació i agrupació en clústers. També inclou la pipeline i altres utilitats, com transformacions de característiques, creació de fluxos de processament d'aprenentatge automàtic, avaluació de models, àlgebra lineal distribuïda i estadístiques. MLlib s'integra perfectament amb altres components de Spark com ara Spark SQL, Spark Streaming i DataFrames i s'instal·la a l'entorn d'execució de Databricks. La llibreria es pot utilitzar en Java, Scala i Python com a part de les aplicacions de Spark, de manera que es pot incloure en fluxos de treball complets. També s'utilitzen models entrenats amb MLlib per fer prediccions en Structured Streaming, tots aquest processos permeten una externalització a partir d'una API.
En el següent exemple tenim un model de regressió lineal amb Spark MLlib, utilitzant VectorAssembler per concatenar les característiques, StandardScaler per estandarditzar-les, i un pipeline per organitzar les etapes de transformació i l'estimador. Les prediccions són realitzades utilitzant el model de pipeline ajustat.
from pyspark.ml.feature import VectorAssembler
vecAssembler= VectorAssembler(inputCols = trainDF.drop("SalePrice").columns,
outputCol = "features")
vecTrainDF = vecAssembler.transform(trainDF)
from pyspark.ml.feature import StandardScaler
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures", withStd=True,
withMean=False)
scalerModel = scaler.fit(vecTrainDF)
scTrainDF = scalerModel.transform(vecTrainDF)
from pyspark.ml.regression import LinearRegression
lr = (
LinearRegression(
featuresCol="features", labelCol="SalePrice", maxIter=10, regParam=0.8, elasticNetParam=0.1,)
)
lrModel = lr.fit(scTrainDF)
from pyspark.ml import Pipeline
pipeline=Pipeline(stages=[vecAssembler, scaler, lr])
pipelineModel=pipeline.fit(trainDF)
predDF=pipelineModel.transform(validationDF)
predDF.select("prediction","SalePrice","features").show(5)
Spark Graph X
modificaGraphX és el mòdul de Spark per al càlcul i processament de grafs. L'estructura de dades del graf pot ser definida utilitzant un esquema de graf o un RDD de vèrtexs i arestes, on els vèrtexs són etiquetes d'entitats i les arestes representen les relacions entre les entitats. És flexible i funciona perfectament tant amb grafs com amb col·leccions, unificant en un mateix sistema el procés d'extracció, transformació i càrrega (ETL), les anàlisis exploratòries i el càlcul iteratiu de gràfics. GraphX suporta diversos algoritmes, com ara PageRank, components connectats, camí més curt i recompte de triangles.
Si el comparem amb altre eines de processament de grafs, com per exemple Giraph, podem afirmar que GraphX és un marc de processament paral·lel de grafs que permet analitzar grafs a gran escala amb una latència mínima, i que també suporta una gran varietat d'operacions de grafs, incloent càrrega de grafs, recorregut i consulta.
En el següent exemple el codi (en llenguatge Scala) crea un graf dirigit amb vèrtexs que representen usuaris i arestes que representen relacions entre els usuaris.
// assumim que el context ja està creat
val sc: SparkContext
// crea una RDD pels vèrtexs
val users: RDD[(VertexId, (String, String))] =
sc.parallelize(Seq((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
(5L, ("franklin", "prof")), (2L, ("istoica", "prof"))))
// Crea una RDD per les arestes
val relationships: RDD[Edge[String]] =
sc.parallelize(Seq(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"),
Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi")))
// crea un nou usuari per defecte en cas que hi hagi una relació sense usuari
val defaultUser = ("John Doe", "Missing")
// genera el graf inicial
val graph = Graph(users, relationships, defaultUser)
Referències
modifica- ↑ «Apache Spark™ - What is Spark» (en anglès). https://databricks.com,+18-02-2020.+[Consulta: 18 febrer 2020].
- ↑ Pointer, Ian. «What is Apache Spark? The big data platform that crushed Hadoop» (en anglès). https://www.infoworld.com,+13-11-2017.+[Consulta: 18 febrer 2020].
- ↑ Kovachev, Dilyan. «A Beginner’s Guide to Apache Spark» (en anglès). https://towardsdatascience.com,+20-03-2019.+[Consulta: 18 febrer 2020].
- ↑ "Spark: Cluster Computing with Working Sets" a USENIX Workshop on Hot Topics in Cloud Computing (HotCloud).
- ↑ «Spark 2.2.0 Quick Start», 11-07-2017. «we highly recommend you to switch to use Dataset, which has better performance than RDD»
- ↑ «Spark 2.2.0 deprecation list», 11-07-2017.
- ↑ Damji, Jules. «A Tale of Three Apache Spark APIs: RDDs, DataFrames, and Datasets: When to use them and why», 14-07-2016.
- ↑ (2010) "Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing" a USENIX Symp. Networked Systems Design and Implementation.
- ↑ «Spark vs. Hadoop MapReduce: Which big data framework to choose» (en anglès). [Consulta: 22 març 2021].
- ↑ Harris, Derrick. «4 reasons why Spark could jolt Hadoop into hyperdrive», 28-06-2014. Arxivat de l'original el 24 d’octubre 2017. [Consulta: 22 març 2021].
- ↑ «Cluster Mode Overview - Spark 2.4.0 Documentation - Cluster Manager Types». Apache Foundation, 09-07-2019.
- ↑ «Spark Standalone Mode - Spark 3.1.1 Documentation». [Consulta: 22 març 2021].
- ↑ Figure showing Spark in relation to other open-source Software projects including Hadoop
- ↑ Plantilla:Cite mailing list
- ↑ «What is Spark SQL?» (en anglès americà). [Consulta: 22 març 2021].
- ↑ «Structured Streaming Programming Guide - Spark 3.1.1 Documentation». [Consulta: 22 març 2021].