DBInputFormat para transferir datos de SQL a la base de datos NoSQL



El objetivo de este blog es aprender cómo transferir datos de bases de datos SQL a HDFS, cómo transferir datos de bases de datos SQL a bases de datos NoSQL.

En este blog exploraremos las capacidades y posibilidades de uno de los componentes más importantes de la tecnología Hadoop, es decir, MapReduce.

Hoy en día, las empresas están adoptando el marco de trabajo Hadoop como su primera opción para el almacenamiento de datos debido a sus capacidades para manejar grandes datos de manera efectiva. Pero también sabemos que los datos son versátiles y existen en varias estructuras y formatos. Para controlar una variedad tan grande de datos y sus diferentes formatos, debe haber un mecanismo para acomodar todas las variedades y, sin embargo, producir un resultado efectivo y consistente.





El componente más poderoso en el marco de Hadoop es MapReduce que puede proporcionar el control sobre los datos y su estructura mejor que sus otras contrapartes. Aunque requiere una sobrecarga de la curva de aprendizaje y la complejidad de la programación, si puede manejar estas complejidades, seguramente podrá manejar cualquier tipo de datos con Hadoop.

El framework MapReduce divide todas sus tareas de procesamiento en básicamente dos fases: Map y Reduce.



La preparación de sus datos sin procesar para estas fases requiere la comprensión de algunas clases e interfaces básicas. La superclase para estos reprocesos es InputFormat.

los InputFormat class es una de las clases principales de la API de Hadoop MapReduce. Esta clase es responsable de definir dos cosas principales:

  • Divisiones de datos
  • Lector de registros

División de datos es un concepto fundamental en el marco de Hadoop MapReduce que define tanto el tamaño de las tareas de mapas individuales como su servidor de ejecución potencial. los Lector de registros es responsable de leer los registros reales del archivo de entrada y enviarlos (como pares clave / valor) al asignador.



que es el curso de ciencia de datos

El número de mapeadores se decide en función del número de divisiones. El trabajo de InputFormat es crear las divisiones. La mayoría de las veces, el tamaño de la división es equivalente al tamaño del bloque, pero no siempre las divisiones se crearán en función del tamaño del bloque HDFS. Depende totalmente de cómo se haya anulado el método getSplits () de su InputFormat.

Existe una diferencia fundamental entre MR split y HDFS block. Un bloque es un fragmento físico de datos, mientras que una división es solo un fragmento lógico que lee un asignador. Una división no contiene los datos de entrada, solo contiene una referencia o dirección de los datos. Una división tiene básicamente dos cosas: una longitud en bytes y un conjunto de ubicaciones de almacenamiento, que son solo cadenas.

Para entender esto mejor, tomemos un ejemplo: Procesando datos almacenados en su MySQL usando MR. Dado que no existe el concepto de bloques en este caso, la teoría: 'las divisiones siempre se crean en base al bloque HDFS',falla. Una posibilidad es crear divisiones basadas en rangos de filas en su tabla MySQL (y esto es lo que hace DBInputFormat, un formato de entrada para leer datos de bases de datos relacionales). Podemos tener k número de divisiones que constan de n filas.

Es solo para los InputFormats basados ​​en FileInputFormat (un InputFormat para manejar datos almacenados en archivos) que las divisiones se crean en función del tamaño total, en bytes, de los archivos de entrada. Sin embargo, el tamaño de bloque de FileSystem de los archivos de entrada se trata como un límite superior para las divisiones de entrada. Si tiene un archivo más pequeño que el tamaño del bloque HDFS, obtendrá solo 1 asignador para ese archivo. Si desea tener un comportamiento diferente, puede usar mapred.min.split.size. Pero nuevamente depende únicamente de getSplits () de su InputFormat.

Tenemos tantos formatos de entrada preexistentes disponibles en el paquete org.apache.hadoop.mapreduce.lib.input.

CombineFileInputFormat.html

CombineFileRecordReader.html

CombineFileRecordReaderWrapper.html

CombineFileSplit.html

CombineSequenceFileInputFormat.html

CombineTextInputFormat.html

FileInputFormat.html

FileInputFormatCounter.html

FileSplit.html

FixedLengthInputFormat.html

InvalidInputException.html

KeyValueLineRecordReader.html

KeyValueTextInputFormat.html

MultipleInputs.html

NLineInputFormat.html

SequenceFileAsBinaryInputFormat.html

SequenceFileAsTextInputFormat.html

SequenceFileAsTextRecordReader.html

SequenceFileInputFilter.html

SequenceFileInputFormat.html

SequenceFileRecordReader.html

TextInputFormat.html

El valor predeterminado es TextInputFormat.

Del mismo modo, tenemos tantos formatos de salida que leen los datos de los reductores y los almacenan en HDFS:

FileOutputCommitter.html

FileOutputFormat.html

FileOutputFormatCounter.html

FilterOutputFormat.html

LazyOutputFormat.html

MapFileOutputFormat.html

MultipleOutputs.html

NullOutputFormat.html

PartialFileOutputCommitter.html

PartialOutputCommitter.html

SequenceFileAsBinaryOutputFormat.html

SequenceFileOutputFormat.html

que es keyerror en python

TextOutputFormat.html

El valor predeterminado es TextOutputFormat.

Cuando termines de leer este blog, habrás aprendido:

  • Cómo escribir un programa de reducción de mapas
  • Acerca de los diferentes tipos de InputFormats disponibles en Mapreduce
  • ¿Cuál es la necesidad de InputFormats?
  • Cómo escribir InputFormats personalizados
  • Cómo transferir datos de bases de datos SQL a HDFS
  • Cómo transferir datos de bases de datos SQL (aquí MySQL) a bases de datos NoSQL (aquí Hbase)
  • Cómo transferir datos de una base de datos SQL a otra tabla en bases de datos SQL (Quizás esto no sea tan importante si hacemos esto en la misma base de datos SQL. Sin embargo, no hay nada de malo en tener conocimiento de la misma. Nunca se sabe cómo puede entrar en uso)

Requisito previo:

  • Hadoop preinstalado
  • SQL preinstalado
  • Hbase preinstalado
  • Comprensión básica de Java
  • Conocimiento de MapReduce
  • Conocimientos básicos del framework Hadoop

Entendamos el enunciado del problema que vamos a resolver aquí:

Tenemos una tabla de empleados en MySQL DB en nuestra base de datos relacional Edureka. Ahora, según el requisito comercial, tenemos que cambiar todos los datos disponibles en la base de datos relacional al sistema de archivos Hadoop, es decir, HDFS, NoSQL DB conocido como Hbase.

cuál es la diferencia entre java y c ++

Tenemos muchas opciones para realizar esta tarea:

  • Sqoop
  • Canal artificial
  • Mapa reducido

Ahora bien, no desea instalar y configurar ninguna otra herramienta para esta operación. Solo le queda una opción, que es MapReduce, el marco de procesamiento de Hadoop. El marco MapReduce le daría un control total sobre los datos durante la transferencia. Puede manipular las columnas y colocarlas directamente en cualquiera de las dos ubicaciones de destino.

Nota:

  • Necesitamos descargar y poner el conector MySQL en la ruta de clase de Hadoop para obtener tablas de la tabla MySQL. Para hacer esto, descargue el conector com.mysql.jdbc_5.1.5.jar y guárdelo en el directorio Hadoop_home / share / Hadoop / MaPreduce / lib.
cp Downloads / com.mysql.jdbc_5.1.5.jar $ HADOOP_HOME / share / hadoop / mapreduce / lib /
  • Además, coloque todos los archivos jar de Hbase en Hadoop classpath para que su programa de MR acceda a Hbase. Para hacer esto, ejecute el siguiente comando :
cp $ HBASE_HOME / lib / * $ HADOOP_HOME / share / hadoop / mapreduce / lib /

Las versiones de software que he utilizado en la ejecución de esta tarea son:

  • Hadooop-2.3.0
  • HBase 0.98.9-Hadoop2
  • Eclipse Luna

Para evitar el programa en cualquier problema de compatibilidad, prescribo a mis lectores que ejecuten el comando en un entorno similar.

DBInputWritable personalizado:

paquete com.inputFormat.copy import java.io.DataInput import java.io.DataOutput import java.io.IOException import java.sql.ResultSet import java.sql.PreparedStatement import java.sql.SQLException import org.apache.hadoop.io .Writable import org.apache.hadoop.mapreduce.lib.db.DBWritable public class DBInputWritable implementa Writable, DBWritable {private int id private String name, dept public void readFields (DataInput in) throws IOException {} public void readFields (ResultSet rs) throws SQLException // El objeto Resultset representa los datos devueltos por una declaración SQL {id = rs.getInt (1) name = rs.getString (2) dept = rs.getString (3)} public void write (DataOutput out) throws IOException { } public void write (PreparedStatement ps) lanza SQLException {ps.setInt (1, id) ps.setString (2, name) ps.setString (3, dept)} public int getId () {return id} public String getName () {nombre de retorno} public String getDept () {departamento de retorno}}

DBOutputWritable personalizado:

paquete com.inputFormat.copy import java.io.DataInput import java.io.DataOutput import java.io.IOException import java.sql.ResultSet import java.sql.PreparedStatement import java.sql.SQLException import org.apache.hadoop.io .Writable import org.apache.hadoop.mapreduce.lib.db.DBWritable public class DBOutputWritable implementa Writable, DBWritable {private String name private int id private String dept public DBOutputWritable (String name, int id, String dept) {this.name = name this.id = id this.dept = dept} public void readFields (DataInput in) throws IOException {} public void readFields (ResultSet rs) throws SQLException {} public void write (DataOutput out) throws IOException {} public void write (PreparedStatement) ps) lanza SQLException {ps.setString (1, nombre) ps.setInt (2, id) ps.setString (3, dept)}}

Tabla de entrada:

crear base de datos edureka
crear tabla emp (empid int no nulo, nombre varchar (30), dept varchar (20), clave primaria (empid))
insertar en los valores emp (1, 'abhay', 'desarrollo'), (2, 'brundesh', 'prueba')
seleccionar * de emp

Caso 1: Transferencia de MySQL a HDFS

paquete com.inputFormat.copy import java.net.URI import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.FileSystem import org.apache.hadoop.fs.Path import org.apache.hadoop.mapreduce .Job import org.apache.hadoop.mapreduce.lib.db.DBConfiguration import org.apache.hadoop.mapreduce.lib.db.DBInputFormat import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat import org.apache.hadoop .io.Text import org.apache.hadoop.io.IntWritable public class MainDbtohdfs {public static void main (String [] args) throws Exception {Configuration conf = new Configuration () DBConfiguration.configureDB (conf, 'com.mysql.jdbc .Driver ', // clase de controlador' jdbc: mysql: // localhost: 3306 / edureka ', // db url' root ', // nombre de usuario' root ') // contraseña Job job = nuevo Job (conf) job .setJarByClass (MainDbtohdfs.class) job.setMapperClass (Map.class) job.setMapOutputKeyClass (Text.class) job.setMapOutputValueClass (IntWritable.class) job.setInputFormatClass (DBInputFormat.class) FileOutputForput, new Path (args [0])) DBInputFormat.setInput (trabajo, DBInputWritable.class, 'emp', // nombre de la tabla de entrada null, null, new String [] {'empid', 'name', 'dept'} / / columnas de tabla) Ruta p = nueva ruta (args [0]) FileSystem fs = FileSystem.get (nuevo URI (args [0]), conf) fs.delete (p) System.exit (job.waitForCompletion (true)? 0: 1)}}

Este código nos permite preparar o configurar el formato de entrada para acceder a nuestra base de datos SQL fuente. El parámetro incluye la clase del controlador, la URL tiene la dirección de la base de datos SQL, su nombre de usuario y la contraseña.

DBConfiguration.configureDB (conf, 'com.mysql.jdbc.Driver', // clase de controlador 'jdbc: mysql: // localhost: 3306 / edureka', // db url 'root', // nombre de usuario 'root') //contraseña

Este fragmento de código nos permite pasar los detalles de las tablas en la base de datos y configurarlo en el objeto de trabajo. Los parámetros incluyen, por supuesto, la instancia de trabajo, la clase de escritura personalizada que debe implementar la interfaz DBWritable, el nombre de la tabla de origen, la condición si alguna otra cosa es nula, cualquier parámetro de clasificación si no es nulo, la lista de columnas de la tabla respectivamente.

DBInputFormat.setInput (trabajo, DBInputWritable.class, 'emp', // nombre de la tabla de entrada null, null, new String [] {'empid', 'name', 'dept'} // columnas de la tabla)

Mapeador

paquete com.inputFormat.copy import java.io.IOException import org.apache.hadoop.mapreduce.Mapper import org.apache.hadoop.io.LongWritable import org.apache.hadoop.io.Text import org.apache.hadoop.io .IntWritable public class Map amplía Mapper {
mapa vacío protegido (clave LongWritable, valor DBInputWritable, ctx de contexto) {try {String name = value.getName () IntWritable id = new IntWritable (value.getId ()) String dept = value.getDept ()
ctx.write (nuevo texto (nombre + '' + id + '' + departamento), id)
} catch (IOException e) {e.printStackTrace ()} catch (InterruptedException e) {e.printStackTrace ()}}}

Reductor: Reductor de identidad utilizado

Comando para ejecutar:

hadoop jar dbhdfs.jar com.inputFormat.copy.MainDbtohdfs / dbtohdfs

Salida: tabla MySQL transferida a HDFS

hadoop dfs -ls / dbtohdfs / *

Caso 2: Transferencia de una tabla en MySQL a otra en MySQL

creando tabla de salida en MySQL

crear tabla employee1 (nombre varchar (20), id int, dept varchar (20))

paquete com.inputFormat.copy import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapreduce.Job import org.apache.hadoop.mapreduce.lib.db.DBConfiguration import org.apache.hadoop.mapreduce.lib .db.DBInputFormat import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat import org.apache.hadoop.io.Text import org.apache.hadoop.io.IntWritable import org.apache.hadoop.io.NullWritable clase pública Mainonetable_to_other_table {public static void main (String [] args) throws Exception {Configuration conf = new Configuration () DBConfiguration.configureDB (conf, 'com.mysql.jdbc.Driver', // driver class 'jdbc: mysql: // localhost : 3306 / edureka ', // db url' root ', // nombre de usuario' root ') // contraseña Job job = new Job (conf) job.setJarByClass (Mainonetable_to_other_table.class) job.setMapperClass (Map.class) job .setReducerClass (Reduce.class) job.setMapOutputKeyClass (Text.class) job.setMapOutputValueClass (IntWritable.class) job.setOutputKeyClass (DBOutputWritable.class) job.setOutputValueClass (Nul lWritable.class) job.setInputFormatClass (DBInputFormat.class) job.setOutputFormatClass (DBOutputFormat.class) DBInputFormat.setInput (job, DBInputWritable.class, 'emp', // nombre de la tabla de entrada nulo, nulo, nuevo String [] {'empid ',' name ',' dept '} // columnas de la tabla) DBOutputFormat.setOutput (job,' employee1 ', // nombre de la tabla de salida new String [] {' name ',' id ',' dept '} // table columnas) System.exit (job.waitForCompletion (true)? 0: 1)}}

Esta pieza de código nos permite configurar el nombre de la tabla de salida en SQL DB. Los parámetros son la instancia del trabajo, el nombre de la tabla de salida y los nombres de la columna de salida, respectivamente.

DBOutputFormat.setOutput (trabajo, 'empleado1', // nombre de la tabla de salida new String [] {'nombre', 'id', 'dept'} // columnas de la tabla)

Mapeador: igual que el caso 1

Reductor:

paquete com.inputFormat.copy import java.io.IOException import org.apache.hadoop.mapreduce.Reducer import org.apache.hadoop.io.Text import org.apache.hadoop.io.IntWritable import org.apache.hadoop.io .NullWritable clase pública Reducir extiende Reductor {vacío protegido reducir (clave de texto, valores iterables, contexto ctx) {int sum = 0 String line [] = key.toString (). Split ('') try {ctx.write (nuevo DBOutputWritable (línea [0] .toString (), Integer.parseInt (línea [1] .toString ()), línea [2] .toString ()), NullWritable.get ())} catch (IOException e) {e.printStackTrace ()} catch (InterruptedException e) {e.printStackTrace ()}}}

Comando para ejecutar:

hadoop jar dbhdfs.jar com.inputFormat.copy.Mainonetable_to_other_table

Salida: datos transferidos de la tabla EMP en MySQL a otra tabla Employee1 en MySQL

Caso 3: Transferencia de la tabla en MySQL a la tabla NoSQL (Hbase)

Creando una tabla Hbase para acomodar la salida de la tabla SQL:

crear 'empleado', 'official_info'

Clase de conductor:

package Dbtohbase import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapreduce.Job import org.apache.hadoop.mapreduce.lib.db.DBConfiguration import org.apache.hadoop.mapreduce.lib.db.DBInputFormat import org.apache.hadoop.hbase.mapreduce.TableOutputFormat import org.apache.hadoop.hbase.HBaseConfiguration import org.apache.hadoop.hbase.client.HTable import org.apache.hadoop.hbase.client.HTableInterface import org.apache .hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil import org.apache.hadoop.io.Text public class MainDbToHbase {public static void main (String [] args) throws Exception {Configuration conf = HBaseConfiguration.create () HTableInterface mytable = new HTable (conf, 'emp') DBConfiguration.configureDB (conf, 'com.mysql.jdbc.Driver', // clase de controlador 'jdbc: mysql: // localhost: 3306 / edureka' , // URL de la base de datos 'root', // nombre de usuario 'root') // contraseña Job job = new Job (conf, 'dbtohbase') job.setJarByClass (MainDbToHbase.class) job.s etMapperClass (Map. class) DBInputFormat.setInput (trabajo, DBInputWritable.class, 'emp', // nombre de la tabla de entrada nulo, nulo, nuevo String [] {'empid', 'name', 'dept'} // columnas de la tabla) System.exit (job.waitForCompletion (verdadero)? 0: 1)}}

Este fragmento de código le permite configurar la clase de clave de salida que en el caso de hbase es ImmutableBytesWritable

job.setMapOutputKeyClass (ImmutableBytesWritable.class) job.setMapOutputValueClass (Text.class)

Aquí estamos pasando el nombre de la tabla hbase y el reductor para actuar sobre la mesa.

TableMapReduceUtil.initTableReducerJob ('empleado', Reduce.clase, trabajo)

Mapeador:

paquete Dbtohbase import java.io.IOException import org.apache.hadoop.mapreduce.Mapper import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.util.Bytes import org.apache.hadoop.io .LongWritable import org.apache.hadoop.io.Text import org.apache.hadoop.io.IntWritable public class Map amplía Mapper {private IntWritable one = new IntWritable (1) mapa vacío protegido (LongWritable id, DBInputWritable valor, contexto de contexto) {try {String line = value.getName () String cd = value.getId () + '' String dept = value.getDept () context.write (new ImmutableBytesWritable (Bytes.toBytes (cd)), new Text (línea + ' '+ dept))} catch (IOException e) {e.printStackTrace ()} catch (InterruptedException e) {e.printStackTrace ()}}}

En este fragmento de código, tomamos valores de los captadores de la clase DBinputwritable y luego los pasamos
ImmutableBytesWritable para que alcancen el reductor en forma de bytes que Hbase entiende.

Cadena línea = valor.getName () Cadena cd = valor.getId () + '' Cadena dept = valor.getDept () context.write (nuevo ImmutableBytesWritable (Bytes.toBytes (cd)), nuevo Texto (línea + '' + dept ))

Reductor:

package Dbtohbase import java.io.IOException import org.apache.hadoop.hbase.client.Put import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.mapreduce.TableReducer import org.apache.hadoop .hbase.util.Bytes import org.apache.hadoop.io.Text public class Reduce extiende TableReducer {public void reduce (ImmutableBytesWritable key, Iterable values, Context context) throws IOException, InterruptedException {String [] cause = null // Valores de bucle for (Text val: values) {cause = val.toString (). split ('')} // Put to HBase Put put = new Put (key.get ()) put.add (Bytes.toBytes ('official_info' ), Bytes.toBytes ('nombre'), Bytes.toBytes (causa [0])) put.add (Bytes.toBytes ('official_info'), Bytes.toBytes ('departamento'), Bytes.toBytes (causa [1 ])) context.write (clave, poner)}}

Este fragmento de código nos permite decidir la fila exacta y la columna en la que estaríamos almacenando los valores del reductor. Aquí estamos almacenando cada empid en una fila separada ya que hicimos empid como clave de fila que sería única. En cada fila almacenamos la información oficial de los empleados en la familia de columnas “official_info” en las columnas “nombre” y “departamento” respectivamente.

Put put = new Put (key.get ()) put.add (Bytes.toBytes ('official_info'), Bytes.toBytes ('nombre'), Bytes.toBytes (causa [0])) put.add (Bytes. toBytes ('official_info'), Bytes.toBytes ('departamento'), Bytes.toBytes (causa [1])) context.write (clave, poner)

Datos transferidos en Hbase:

escanear empleado

Como vemos, pudimos completar con éxito la tarea de migrar nuestros datos comerciales de una base de datos SQL relacional a una base de datos NoSQL.

En el próximo blog, aprenderemos cómo escribir y ejecutar códigos para otros formatos de entrada y salida.

Siga publicando sus comentarios, preguntas o cualquier comentario. Me encantaría saber de ti.

Tienes una pregunta para nosotros? Menciónalo en la sección de comentarios y nos comunicaremos contigo.

Artículos Relacionados: