RDD con Spark: el componente básico de Apache Spark



Este blog sobre RDD usando Spark le proporcionará un conocimiento detallado y completo de RDD, que es la unidad fundamental de Spark y lo útil que es.

, La palabra en sí es suficiente para generar una chispa en la mente de todos los ingenieros de Hadoop. A n en memoria herramienta de procesamiento que es increíblemente rápido en la computación en clúster. En comparación con MapReduce, el intercambio de datos en memoria hace que los RDD 10-100x Más rápido que la red y el uso compartido de discos y todo esto es posible gracias a los RDD (conjuntos de datos distribuidos resistentes). Los puntos clave en los que nos enfocamos hoy en este artículo de RDD usando Spark son:

¿Necesita RDD?

¿Por qué necesitamos RDD? -RDD usando Spark





El mundo evoluciona con y Ciencia de los datos debido al avance en . Algoritmos Residencia en Regresión , , y que se ejecuta en Repartido Computación iterativa ación moda que incluye la reutilización y el intercambio de datos entre varias unidades informáticas.

Lo tradicional técnicas necesitaban un almacenamiento intermedio estable y distribuido como HDFS que comprende cálculos repetitivos con replicaciones de datos y serialización de datos, lo que hizo que el proceso fuera mucho más lento. Encontrar una solución nunca fue fácil.



Aquí es donde RDD (Conjuntos de datos distribuidos resilientes) llega al panorama general.

RDD Los correos electrónicos son fáciles de usar y de crear sin esfuerzo, ya que los datos se importan de fuentes de datos y se colocan en los RDD. Además, las operaciones se aplican para procesarlas. Son un colección distribuida de memoria con permisos como Solo lectura y lo más importante, son Tolerante a fallos .



Si alguna partición de datos de el RDD es perdió , se puede regenerar aplicando el mismo transformación operación en esa partición perdida en linaje , en lugar de procesar todos los datos desde cero. Este tipo de enfoque en escenarios en tiempo real puede hacer que sucedan milagros en situaciones de pérdida de datos o cuando un sistema no funciona.

¿Qué son los RDD?

RDD o ( Conjunto de datos distribuidos resilientes ) es fundamental estructura de datos en Spark. El termino Elástico define la capacidad que genera los datos automáticamente o los datos retroceder al estado original cuando ocurre una calamidad inesperada con una probabilidad de pérdida de datos.

Los datos escritos en RDD son particionado y almacenado en múltiples nodos ejecutables . Si un nodo en ejecución falla en el tiempo de ejecución, luego instantáneamente obtiene la copia de seguridad del siguiente nodo ejecutable . Esta es la razón por la que los RDD se consideran un tipo avanzado de estructuras de datos en comparación con otras estructuras de datos tradicionales. Los RDD pueden almacenar datos estructurados, no estructurados y semiestructurados.

Avancemos con nuestro blog RDD using Spark y conozcamos las características únicas de los RDD que le dan una ventaja sobre otros tipos de estructuras de datos.

Características de RDD

  • En memoria (RAM) Cálculos : El concepto de computación en memoria lleva el procesamiento de datos a una etapa más rápida y eficiente donde el actuación del sistema es actualizado.
  • L su evaluación : El término evaluación perezosa dice que transformaciones se aplican a los datos en RDD, pero la salida no se genera. En cambio, las transformaciones aplicadas son registrado.
  • Persistencia : Los RDD resultantes son siempre reutilizable.
  • Operaciones de grano grueso : El usuario puede aplicar transformaciones a todos los elementos en conjuntos de datos a través de mapa, filtrar o agrupar por operaciones.
  • Tolerante a fallos : Si hay una pérdida de datos, el sistema puede Retroceder a su estado original utilizando el registro transformaciones .
  • Inmutabilidad : Los datos definidos, recuperados o creados no pueden cambiado una vez que se registra en el sistema. En caso de que necesite acceder y modificar el RDD existente, debe crear un nuevo RDD aplicando un conjunto de Transformación funciona en el RDD actual o anterior.
  • Fraccionamiento : Es el unidad crucial de paralelismo en Spark RDD. De forma predeterminada, el número de particiones creadas se basa en su fuente de datos. Incluso puede decidir la cantidad de particiones que desea hacer usando partición personalizada funciones.

Creación de RDD usando Spark

Los RDD se pueden crear en tres maneras:

  1. Leyendo datos de colecciones paralelizadas
val PCRDD = spark.sparkContext.parallelize (Array ('Lun', 'Mar', 'Mie', 'Jue', 'Vie', 'Sáb'), 2) val resultRDD = PCRDD.collect () resultRDD.collect ( ) .foreach (println)
  1. Aplicando transformación en RDD anteriores
val words = spark.sparkContext.parallelize (Seq ('Spark', 'is', 'a', 'very', 'poderoso', 'language')) val wordpair = words.map (w = (w.charAt ( 0), w)) wordpair.collect (). Foreach (println)
  1. Leyendo datos de almacenamiento externo o rutas de archivo como HDFS o HBase
val Sparkfile = spark.read.textFile ('/ user / edureka_566977 / spark / spark.txt.') Sparkfile.collect ()

Operaciones realizadas en RDD:

Existen principalmente dos tipos de operaciones que se realizan en RDD, a saber:

  • Transformaciones
  • Comportamiento

Transformaciones : los operaciones aplicamos en RDD para filtro, acceso y modificar los datos en el RDD principal para generar un RDD sucesivo se llama transformación . El nuevo RDD devuelve un puntero al RDD anterior asegurando la dependencia entre ellos.

Las transformaciones son Evaluaciones perezosas, en otras palabras, las operaciones aplicadas en el RDD en el que está trabajando serán registradas pero no ejecutado. El sistema arroja un resultado o una excepción después de activar la Acción .

Podemos dividir las transformaciones en dos tipos de la siguiente manera:

  • Transformaciones estrechas
  • Amplias transformaciones

Transformaciones estrechas Aplicamos transformaciones estrechas en un partición única del RDD principal para generar un nuevo RDD, ya que los datos necesarios para procesar el RDD están disponibles en una sola partición del padre ASD . Los ejemplos de transformaciones estrechas son:

  • mapa()
  • filtrar()
  • mapa plano()
  • dividir()
  • mapPartitions ()

Amplias transformaciones: Aplicamos la amplia transformación en particiones múltiples para generar un nuevo RDD. Los datos necesarios para procesar el RDD están disponibles en las múltiples particiones del padre ASD . Los ejemplos de transformaciones amplias son:

java romper el método
  • reducir por()
  • Unión()

Comportamiento : Las acciones indican a Apache Spark que aplique cálculo y pasar el resultado o una excepción al controlador RDD. Algunas de las acciones incluyen:

  • recoger()
  • contar()
  • tomar()
  • primero()

Apliquemos prácticamente las operaciones en RDD:

IPL (Liga Premier de la India) es un torneo de cricket con su hipe en un nivel máximo. Por lo tanto, hagamos hoy en nuestras manos el conjunto de datos de IPL y ejecutemos nuestro RDD usando Spark.

  • En primer lugar, descarguemos los datos de coincidencia CSV de IPL. Después de descargarlo, comienza a verse como un archivo EXCEL con filas y columnas.

En el siguiente paso, encendemos la chispa y cargamos el archivo matches.csv desde su ubicación, en mi caso micsvla ubicación del archivo es '/User/edureka_566977/test/matches.csv'

Ahora comencemos con el Transformación primera parte:

  • mapa():

Usamos Transformación de mapas para aplicar una operación de transformación específica en cada elemento de un RDD. Aquí creamos un RDD con el nombre CKfile donde almacenamos nuestrocsvarchivo. Crearemos otro RDD llamado Estados para almacenar los detalles de la ciudad .

spark2-shell val CKfile = sc.textFile ('/ user / edureka_566977 / test / matches.csv') CKfile.collect.foreach (println) val estados = CKfile.map (_. split (',') (2)) estados.collect (). foreach (println)

  • filtrar():

Transformación de filtro, el nombre en sí describe su uso. Usamos esta operación de transformación para filtrar los datos selectivos de una colección de datos dada. Aplicamos operación de filtro aquí para obtener los registros de los partidos de IPL del año 2017 y guárdelo en fil RDD.

cómo usar recortar en java
val fil = CKfile.filter (línea => línea.contains ('2017')) fil.collect (). foreach (println)

  • mapa plano():

Aplicamos flatMap es una operación de transformación a cada uno de los elementos de un RDD para crear un nuevo RDD. Es similar a la transformación de mapas. aquí aplicamosMapa planoa escupir los partidos de la ciudad de Hyderabad y almacenar los datos enfilRDDRDD.

val filRDD = fil.flatMap (línea => línea.split ('Hyderabad')). collect ()

  • dividir():

Cada dato que escribimos en un RDD se divide en un cierto número de particiones. Usamos esta transformación para encontrar el número de particiones los datos se dividen en realidad.

val fil = CKfile.filter (line => line.contains ('2017')) fil.partitions.size

  • mapPartitions ():

Consideramos MapPatitions como una alternativa de Map () ypara cada() juntos. Usamos mapPartitions aquí para encontrar el número de filas tenemos en nuestro fil RDD.

val fil = CKfile.filter (line => line.contains ('2016')) fil.mapPartitions (idx => Array (idx.size) .iterator) .collect

  • reducir por():

UsamosReducir por() en Pares clave-valor . Usamos esta transformación en nuestrocsvarchivo para encontrar el reproductor con el hombre más alto de los partidos .

val ManOfTheMatch = CKfile.map (_. split (',') (13)) val MOTMcount = ManOfTheMatch.map (WINcount => (WINcount, 1)) val ManOTH = MOTMcount.reduceByKey ((x, y) => x + y) .map (tup => (tup._2, tup._1)) sortByKey (falso) ManOTH.take (10) .foreach (println)

  • Unión():

El nombre lo explica todo, usamos transformación sindical para club dos RDD juntos . Aquí estamos creando dos RDD, a saber, fil y fil2. fil RDD contiene los registros de coincidencias de IPL de 2017 y fil2 RDD contiene el registro de coincidencias de IPL de 2016.

val fil = CKfile.filter (línea => línea.contains ('2017')) val fil2 = CKfile.filter (línea => línea.contains ('2016')) val uninRDD = fil.union (fil2)

Empecemos por el Acción parte donde mostramos la salida real:

  • recoger():

Recoger es la acción que usamos para mostrar el contenido en el RDD.

val CKfile = sc.textFile ('/ user / edureka_566977 / test / matches.csv') CKfile.collect.foreach (println)

  • contar():

Contares una acción que usamos para contar el número de registros presente en el RDD.Aquíestamos usando esta operación para contar el número total de registros en nuestro archivo Match.csv.

val CKfile = sc.textFile ('/ usuario / edureka_566977 / test / matches.csv') CKfile.count ()

  • tomar():

Tomar es una operación de acción similar a recopilar, pero la única diferencia es que puede imprimir cualquier número selectivo de filas según la solicitud del usuario. Aquí aplicamos el siguiente código para imprimir el diez informes principales.

val statecountm = Scount.reduceByKey ((x, y) => x + y) .map (tup => (tup._2, tup._1)) sortByKey (false) statecountm.collect (). foreach (println) statecountm. tomar (10) .foreach (println)

  • primero():

First () es una operación de acción similar a collect () y take ()esoutilizado para imprimir el informe superior s la salida Aquí usamos la primera operación () para encontrar el número máximo de partidos jugados en una ciudad en particular y obtenemos Mumbai como salida.

val CKfile = sc.textFile ('/ user / edureka_566977 / test / matches.csv') val estados = CKfile.map (_. split (',') (2)) val Scount = estados.map (Scount => ( Scount, 1)) scala & gt val statecount = Scount.reduceByKey ((x, y) => x + y) .collect.foreach (println) Scount.reduceByKey ((x, y) => x + y) .collect.foreach (println) val statecountm = Scount.reduceByKey ((x, y) => x + y) .map (tup => (tup._2, tup._1)) sortByKey (falso) statecountm.first ()

Para que nuestro proceso de aprendizaje de RDD usando Spark, sea aún más interesante, he presentado un caso de uso interesante.

RDD usando Spark: Caso de uso de Pokémon

  • En primer lugar, Vamos a descargar un archivo Pokemon.csv y cargarlo en el Spark-Shell como hicimos con el archivo Matches.csv.
val PokemonDataRDD1 = sc.textFile ('/ user / edureka_566977 / PokemonFile / PokemonData.csv') PokemonDataRDD1.collect (). foreach (println)

Los pokemons están disponibles en una gran variedad. Busquemos algunas variedades.

  • Eliminando el esquema del archivo Pokemon.csv

Puede que no necesitemos el Esquema del archivo Pokemon.csv. Por lo tanto, lo eliminamos.

val Head = PokemonDataRDD1.first () val NoHeader = PokemonDataRDD1.filter (line =>! line.equals (Head))

  • Encontrar el número de particiones nuestro pokemon.csv se distribuye en.
println ('No de particiones =' + NoHeader.partitions.size)

  • Pokemon de agua

Encontrar el número de pokemon de agua

val WaterRDD = PokemonDataRDD1.filter (línea => línea.contains ('Agua')) WaterRDD.collect (). foreach (println)

  • Pokemon de fuego

Encontrar el número de pokemon de fuego

val FireRDD = PokemonDataRDD1.filter (line => line.contains ('Fire')) FireRDD.collect (). foreach (println)

  • También podemos detectar el población de un tipo diferente de pokemon usando la función de conteo
WaterRDD.count () FireRDD.count ()

  • Ya que me gusta el juego de estrategia defensiva vamos a encontrar el pokemon con máxima defensa.
val defenceList = NoHeader.map {x => x.split (',')}. map {x => (x (6) .toDouble)} println ('Highest_Defence:' + defenceList.max ())

ejemplo de variable de instancia en java
  • Conocemos el máximo valor de la fuerza de defensa pero no sabemos qué Pokémon es. Entonces, encontremos cuál es ese pokemon.
val defWithPokemonName = NoHeader.map {x => x.split (',')}. map {x => (x (6) .toDouble, x (1))} val MaxDefencePokemon = defWithPokemonName.groupByKey.takeOrdered (1) (Pedido [Double] .reverse.on (_._ 1)) MaxDefencePokemon.foreach (println)

  • Ahora solucionemos el pokemon con menos defensa
val minDefencePokemon = defenceList.distinct.sortBy (x => x.toDouble, true, 1) minDefencePokemon.take (5) .foreach (println)

  • Ahora veamos al Pokémon con un estrategia menos defensiva.
val PokemonDataRDD2 = sc.textFile ('/ user / edureka_566977 / PokemonFile / PokemonData.csv') val Head2 = PokemonDataRDD2.first () val NoHeader2 = PokemonDataRDD2.filter (line =>! line.equals (Head)) val defWithPokemonName2 = NoHeaderName2 .map {x => x.split (',')}. map {x => (x (6) .toDouble, x (1))} val MinDefencePokemon2 = defWithPokemonName2.groupByKey.takeOrdered (1) (Ordering [Double ] .on (_._ 1)) MinDefencePokemon2.foreach (println)

Entonces, con esto, llegamos al final de este artículo de RDD usando Spark. Espero que hayamos aclarado un poco su conocimiento sobre los RDD, sus características y los diversos tipos de operaciones que se pueden realizar en ellos.

Este artículo basado en está diseñado para prepararlo para el examen de certificación de desarrollador de Cloudera Hadoop y Spark (CCA175). Obtendrá un conocimiento profundo sobre Apache Spark y el ecosistema Spark, que incluye Spark RDD, Spark SQL, Spark MLlib y Spark Streaming. Obtendrá un conocimiento completo sobre el lenguaje de programación Scala, HDFS, Sqoop, Flume, Spark GraphX ​​y sistemas de mensajería como Kafka.