martes, 23 de octubre de 2012

De OrangeHRM a OpenERP con OpenETL.

Después de mis últimos post Migración de datos entre distintas instancias de OpenERP usando OpenETL y Carga de datos en OpenERP usando OpenETL y aprovechando que estamos realizando una migración en la empresa desde OrangeHRM a OpenERP que mejor forma de cerrar el círculo que presentar en forma de post el proceso de migración que hemos seguido.

El escenario:

Para migrar los datos hemos partido de los archivos .csv que se generan desde OrangeHRM. Como dichos archivos contienen información de la empresa no los voy a adjuntar con el post, simplemente me limitaré a describir los campos. También partimos de un OpenERP que ya tiene cargados datos como países, provincias, y empresa. Para realizar las llamadas al xmlrpc he usado al usuario admin, el cual ya pertenece a la empresa. De esta forma el campo company_id se ha ajustado automáticamente.
Los archivos .csv contienen los siguientes campos:
Empleados.csv:
  • empID: Identificación del empleado en Orange.
  • lastName: Apellidos del empleado.
  • firstName: Primer nombre del empleado.
  • middleName: Segundo nombre del empleado.
  • street1: Dirección del empleado.
  • street2: Campo de apoyo para street1.
  • city: Municipio.
  • state: Provincia.
  • zip: Código postal.
  • gender: Género.
  • birthDate: Fecha de nacimiento.
  • ssn: Número de la seguridad social.
  • workStation: Departamento al que pertenece el empleado.
Los campos middleName, street1, street2, city, state, zip, workStation,birthDate pueden estar vacíos en el archivo .csv, por lo que hay que controlar estos casos.
dptos.csv:
  • workStation: Nombre del departamento en el sistema.
cargos.csv:
  • empId: Identificación del empleado en Orange.
  • empStatus: Cargo del empleado en la empresa.
El modelo de datos relacionados afectado de OpenERP se presenta a continuación. Lo he simplificado bastante y sólo he puesto los campos de los objetos que se van a cargar con los valores de los .csv.

Si te alejas de la pantalla y te pones bizco, verás a un tio bailando. En realidad es el diagrama de clases simplificado de objetos OpenERP.


A tener en cuenta:

Observando el modelo de datos se puede ver que muchos objetos están relacionados entre sí. Esto implica que si queremos cargar los empleados, antes tenemos que tener cargados en el sistema las direcciones. Este mismo comportamiento nos sucede con los departamentos, puestos de trabajo, etc.
Para solucionar este inconveniente he usado subtareas de OpenETL. El archivo subjob_example.py contiene un ejemplo para el uso de subtareas con OpenETL. El funcionamiento es bastante sencillo. Simplemente en vez de ejecutar la tarea que queremos convertir en subtarea, crearemos un nuevo componente de tipo subtarea con ella. Después a la tarea padre le pasamos como parámetro dicha subtarea.
En el código:
job_ = openetl.job([csv_in1,datos_ajustados,openobject_out2])  # Para poder relacionar direcciones con personas, las direcciones deben estar cargadas
subjob = openetl.component.transform.subjob(job_)              # en el sistema. Las cargo previamente en una subtarea.

job1=openetl.job([subjob_cargos,subjob_dptos,subjob_paises,subjob,csv_in1,datos_ajustados,openobject_out1])

job1.run()

Las subtareas implicadas son son:
  • subjob_cargos: Carga las categorías de los empleados.
  • subjob_dptos: Carga los departamentos de la empresa.
  • subjo_paises: Realiza correspondencia de países de OpenERP con Orange.
  • subjob: Carga las direcciones de los empleados.
Y los diagramas de cada subtarea:


Diagramas de subjob_cargos y subjob_dptos.



Diagramas de subjob_paises y subjob.


Para relacionar los objetos de las subtareas con la carga final, la de los empleados, he usado un pequeño truco. Vamos a fijarnos en la lista de categorías de empleados (subjob_cargos).
Al leer las categorías desde el csv inicial, las he pasado por una transformación que ejecuta una función (preprocess_cargos):
lista_cargos = {}
def preprocess_cargos(self, channels):
    for trans in channels['carga_cargos']:
        for d in trans:
            lista_cargos[d['empId']] = d['empStatus']
    return None

pres_cargos=openetl.component.transform.map({},preprocess_cargos)
Dicha función lo único que hace es cargar en un diccionario una relación empId-empStatus, es decir, relaciona id de empleado con su categoría.
Más adelante en el código, al realizar la carga de los empleados, consulto dicho diccionario:
def preprocess(self, channels):
    cdict = {}
    for trans in channels['modificacion']:
        for d in trans:            
            . 
            .
            .   
            # Ajuste de cargo    
            d['cargo'] = lista_cargos[d['empId']]
            
    return {'resultado':cdict}  

Y por último en el mapeado del objeto, antes de cargarlo en OpenERP:
openobject_out1 = openetl.component.output.openobject_out( ooconnector, 'hr.employee', { . . . 'job_id':'cargo', } )
En el diagrama también aparece una paso previo por el componente unique. Dicho componente quita los elementos duplicados antes de cargarlos en el sistema. Hay un ejemplo de uso de dicho componente en el fichero unique_csv_data.py.
Otra cosa interesante de esta migración es como se han mapeado los datos del csv a los objetos. El fichero join_example.py contiene un ejemplo que usa map_keys. Dicho ejemplo está muy bien y funciona siempre y cuando se usen componentes “openetl.component.input.data” definidos en el propio archivo de script. El problema es que cuando se lee un archivo de csv no se está usando una entrada “estática”, sino secuencial. De modo que el map_keys es ignorado. La solución en este caso ha sido pasar por parámetro un map_key vacío y realizar el mapeo de datos desde el propio código de la función preprocess.
En el caso de países:
pre_paises=openetl.component.transform.map({},preprocess_paises)



def preprocess(self, channels):
    cdict = {}
    for trans in channels['modificacion']:
        for d in trans:
           
           .
           .
           .     
            # Ajuste de paises
            if d['state'] == "Santo Domingo": 
                d['state'] = lista_paises[62]  # Codigo de Republica Dominicana
            elif d['state'] == "Distrito Nacional":
                d['state'] = lista_paises[62]
            else:
                d['state'] = lista_paises[69] # Codigo de Espagna
            .
            .
            .                            
    return {'resultado':cdict}  

El resultado:

Como en mis anteriores post presento el código completo de la solución obtenida. Evidentemente este script de migración cubre nuestras necesidades concretas, pero es fácil adaptarlo si se necesitan migrar datos diferentes. También se podría haber realizado un script de migración atacando directamente a la base de datos de Orange, aunque por seguir con el ejemplo planteado en el primer post de OpenETL se han usado archivos .csv. En cualquier caso OpenETL también contiene conectores para consultas SQL. El archivo sql_in_example.py contiene un ejemplo con el que se podrían sustituir las llamadas a los csv con consultas sql.
import sys
sys.path.append('..')
 
import openetl
  
#===============================================================================
# Conectores
#===============================================================================
fileconnector_orange=openetl.connector.localfile('/home/carlos/Escritorio/Orange/Empleados.csv')
fileconnector_orange_dptos=openetl.connector.localfile('/home/carlos/Escritorio/Orange/dptos.csv') # Con tratamiento previo
fileconnector_orange_cargos=openetl.connector.localfile('/home/carlos/Escritorio/Orange/cargos.csv') # Con tratamiento previo
ooconnector = openetl.connector.openobject_connector('http://localhost:8069', 'master_viavansi', 'admin', 'admin', con_type='xmlrpc')


#===============================================================================
# Componentes
#===============================================================================
csv_in1= openetl.component.input.csv_in(fileconnector_orange,name='Datos de Orange')
csv_in_dptos= openetl.component.input.csv_in(fileconnector_orange_dptos,name='Departamentos')
csv_in_cargos= openetl.component.input.csv_in(fileconnector_orange_cargos,name='Cargos')



openobject_out1 = openetl.component.output.openobject_out(
     ooconnector,
     'hr.employee',
     {
      'name':'name_csv',
      'ssnid':'ssn',
      'gender':'gender',
      'birthday':'birthDate',
      'address_home_id':'name_csv', # Nombre de la relacion
      'department_id':'workStation',
      'job_id':'cargo',
      }
    )


openobject_out2 = openetl.component.output.openobject_out(
     ooconnector,
     'res.partner.address',
     {
      'name':'name_csv',
      'street':'street1',
      'street2':'street2',
      'zip':'zip',
      'city':'city',
      'country_id':'state',
      }
    )


openobject_out3 = openetl.component.output.openobject_out(
     ooconnector,
     'hr.department',
     {
      'name':'workStation',
      }
    )

# Soporte para carga de datos de cargo de empleado. El Diccionario se carga en subtarea previa
lista_cargos = {}
openobject_out4 = openetl.component.output.openobject_out(
     ooconnector,
     'hr.job',
     {
      'name':'empStatus',
      }
    )

def preprocess_cargos(self, channels):
    for trans in channels['carga_cargos']:
        for d in trans:
            lista_cargos[d['empId']] = d['empStatus']
    return None

pres_cargos=openetl.component.transform.map({},preprocess_cargos)

# Soporte para carga de datos de paises. El Diccionario se carga en subtarea previa
lista_paises = {}

openobject_in1 = openetl.component.input.openobject_in(
                 ooconnector,'res.country',
                 fields=['id','name'],
                 )

def preprocess_paises(self, channels):
    for trans in channels['carga_paises']:
        for d in trans:
            lista_paises[d['id']] = d['name']
    return None

pre_paises=openetl.component.transform.map({},preprocess_paises)

# Soporte transformaciones y componentes


def preprocess(self, channels):
    cdict = {}
    for trans in channels['modificacion']:
        for d in trans:
            # name: no existia,lo creo yo con la suma de los campos 
            
            if d['middleName'] == "":  # En OpenERP, no se separan los campos, hay un unico campo name
                d["name_csv"] = d["firstName"] + str(" ")+ d["lastName"]
            else:
                d["name_csv"] = d["firstName"] + str(" ")+ d["middleName"] +str(" ")+ d["lastName"]
            
            
            if d['gender'] == "M":     # Adaptacion de nomencaltura de datos de Orange a OpenERP
                d['gender'] = 'male'
            else:
                d['gender'] ='female'
                
            # Ajuste de paises
            if d['state'] == "Santo Domingo": 
                d['state'] = lista_paises[62]  # Codigo de Republica Dominicana
            elif d['state'] == "Distrito Nacional":
                d['state'] = lista_paises[62]
            else:
                d['state'] = lista_paises[69] # Codigo de Espagna
                
            # Ajuste de cargo    
            d['cargo'] = lista_cargos[d['empId']]
            
    return {'resultado':cdict}            

datos_ajustados=openetl.component.transform.map({},preprocess)  # Como leo un flujo de datos, no hay key_map. key_maps es para diccionarios


#===============================================================================
# Transiciones, Definicion de trabajo y ejecucion. Operaciones de Carga
#===============================================================================


log_cargos=openetl.component.transform.logger(name='Log de cargos')
unique_job = openetl.component.transform.unique()
openetl.transition(csv_in_cargos,pres_cargos,channel_destination='carga_cargos')
openetl.transition(pres_cargos,log_cargos)
openetl.transition(csv_in_cargos,unique_job)
openetl.transition(unique_job,openobject_out4)
job_cargos=openetl.job([csv_in_cargos,unique_job,openobject_out4,log_cargos])
subjob_cargos = openetl.component.transform.subjob(job_cargos)  


unique = openetl.component.transform.unique()
log_dptos=openetl.component.transform.logger(name='Log departamentos')

openetl.transition(csv_in_dptos,unique)
openetl.transition(unique,log_dptos,channel_source='main')
openetl.transition(unique,openobject_out3)
job_dptos=openetl.job([log_dptos,openobject_out3])
subjob_dptos = openetl.component.transform.subjob(job_dptos)  


openetl.transition(openobject_in1,pre_paises, channel_destination='carga_paises')
job_paises = openetl.job([openobject_in1,pre_paises])
subjob_paises = openetl.component.transform.subjob(job_paises)  


openetl.transition(csv_in1,datos_ajustados, channel_destination='modificacion') # Leo datos aplicando preprocesamiento
openetl.transition(csv_in1,openobject_out2) # Direcciones 
openetl.transition(csv_in1,openobject_out1) # Personas


job_ = openetl.job([csv_in1,datos_ajustados,openobject_out2])  # Para poder relacionar direcciones con personas, las direcciones deben estar cargadas
subjob = openetl.component.transform.subjob(job_)              # en el sistema. Las cargo previamente en una subtarea.

job1=openetl.job([subjob_cargos,subjob_dptos,subjob_paises,subjob,csv_in1,datos_ajustados,openobject_out1])
job1.run()
Con esto concluye la parte técnica del post. Creo que OpenETL es una tecnología muy interesante, que permite realizar trabajos de ETL de forma bastante cómoda e intuitiva. También os comento que he echado en falta algo más de documentación técnica sobre OpenETL, ya que he tenido que recurrir al código fuente de muchos componentes, transformaciones, etc. para averiguar que es lo que hacían.
A pesar de ello la línea de aprendizaje de esta tecnología es bastante sencilla una vez que sabes que hay que hacer, y se pueden lograr grandes cosas en poco tiempo.
Para finalizar os comentaré que mi impresión final sobre OpenETL es muy buena. No sólo porque se adapte perfectamente a operaciones ETL sobre OpenERP, sino porque tiene un amplio abanico de conectores (sql, facebook, xmlrpc,csv, gdoc, gcalendar, etc) que permiten usar OpenETL en muchos proyectos con distintas tecnologías.

No hay comentarios:

Publicar un comentario

Related Posts Plugin for WordPress, Blogger...