El escenario:
Para migrar los datos hemos partido de los archivos .csv que se generan desde OrangeHRM. Como dichos archivos contienen información de la empresa no los voy a adjuntar con el post, simplemente me limitaré a describir los campos. También partimos de un OpenERP que ya tiene cargados datos como países, provincias, y empresa. Para realizar las llamadas al xmlrpc he usado al usuario admin, el cual ya pertenece a la empresa. De esta forma el campo company_id se ha ajustado automáticamente.Los archivos .csv contienen los siguientes campos:
Empleados.csv:
- empID: Identificación del empleado en Orange.
- lastName: Apellidos del empleado.
- firstName: Primer nombre del empleado.
- middleName: Segundo nombre del empleado.
- street1: Dirección del empleado.
- street2: Campo de apoyo para street1.
- city: Municipio.
- state: Provincia.
- zip: Código postal.
- gender: Género.
- birthDate: Fecha de nacimiento.
- ssn: Número de la seguridad social.
- workStation: Departamento al que pertenece el empleado.
dptos.csv:
- workStation: Nombre del departamento en el sistema.
- empId: Identificación del empleado en Orange.
- empStatus: Cargo del empleado en la empresa.
Si te alejas de la pantalla y te pones bizco, verás a un tio bailando. En realidad es el diagrama de clases simplificado de objetos OpenERP.
A tener en cuenta:
Observando el modelo de datos se puede ver que muchos objetos están relacionados entre sí. Esto implica que si queremos cargar los empleados, antes tenemos que tener cargados en el sistema las direcciones. Este mismo comportamiento nos sucede con los departamentos, puestos de trabajo, etc.Para solucionar este inconveniente he usado subtareas de OpenETL. El archivo subjob_example.py contiene un ejemplo para el uso de subtareas con OpenETL. El funcionamiento es bastante sencillo. Simplemente en vez de ejecutar la tarea que queremos convertir en subtarea, crearemos un nuevo componente de tipo subtarea con ella. Después a la tarea padre le pasamos como parámetro dicha subtarea.
En el código:
job_ = openetl.job([csv_in1,datos_ajustados,openobject_out2]) # Para poder relacionar direcciones con personas, las direcciones deben estar cargadas subjob = openetl.component.transform.subjob(job_) # en el sistema. Las cargo previamente en una subtarea. job1=openetl.job([subjob_cargos,subjob_dptos,subjob_paises,subjob,csv_in1,datos_ajustados,openobject_out1]) job1.run()Las subtareas implicadas son son:
- subjob_cargos: Carga las categorías de los empleados.
- subjob_dptos: Carga los departamentos de la empresa.
- subjo_paises: Realiza correspondencia de países de OpenERP con Orange.
- subjob: Carga las direcciones de los empleados.
Diagramas de subjob_cargos y subjob_dptos.
Diagramas de subjob_paises y subjob.
Para relacionar los objetos de las subtareas con la carga final, la de los empleados, he usado un pequeño truco. Vamos a fijarnos en la lista de categorías de empleados (subjob_cargos).
Al leer las categorías desde el csv inicial, las he pasado por una transformación que ejecuta una función (preprocess_cargos):
lista_cargos = {} def preprocess_cargos(self, channels): for trans in channels['carga_cargos']: for d in trans: lista_cargos[d['empId']] = d['empStatus'] return None pres_cargos=openetl.component.transform.map({},preprocess_cargos)Dicha función lo único que hace es cargar en un diccionario una relación empId-empStatus, es decir, relaciona id de empleado con su categoría.
Más adelante en el código, al realizar la carga de los empleados, consulto dicho diccionario:
def preprocess(self, channels): cdict = {} for trans in channels['modificacion']: for d in trans: . . . # Ajuste de cargo d['cargo'] = lista_cargos[d['empId']] return {'resultado':cdict} Y por último en el mapeado del objeto, antes de cargarlo en OpenERP:En el diagrama también aparece una paso previo por el componente unique. Dicho componente quita los elementos duplicados antes de cargarlos en el sistema. Hay un ejemplo de uso de dicho componente en el fichero unique_csv_data.py.
openobject_out1 = openetl.component.output.openobject_out( ooconnector, 'hr.employee', { . . . 'job_id':'cargo', } )
Otra cosa interesante de esta migración es como se han mapeado los datos del csv a los objetos. El fichero join_example.py contiene un ejemplo que usa map_keys. Dicho ejemplo está muy bien y funciona siempre y cuando se usen componentes “openetl.component.input.data” definidos en el propio archivo de script. El problema es que cuando se lee un archivo de csv no se está usando una entrada “estática”, sino secuencial. De modo que el map_keys es ignorado. La solución en este caso ha sido pasar por parámetro un map_key vacío y realizar el mapeo de datos desde el propio código de la función preprocess.
En el caso de países:
pre_paises=openetl.component.transform.map({},preprocess_paises) def preprocess(self, channels): cdict = {} for trans in channels['modificacion']: for d in trans: . . . # Ajuste de paises if d['state'] == "Santo Domingo": d['state'] = lista_paises[62] # Codigo de Republica Dominicana elif d['state'] == "Distrito Nacional": d['state'] = lista_paises[62] else: d['state'] = lista_paises[69] # Codigo de Espagna . . . return {'resultado':cdict}
El resultado:
Como en mis anteriores post presento el código completo de la solución obtenida. Evidentemente este script de migración cubre nuestras necesidades concretas, pero es fácil adaptarlo si se necesitan migrar datos diferentes. También se podría haber realizado un script de migración atacando directamente a la base de datos de Orange, aunque por seguir con el ejemplo planteado en el primer post de OpenETL se han usado archivos .csv. En cualquier caso OpenETL también contiene conectores para consultas SQL. El archivo sql_in_example.py contiene un ejemplo con el que se podrían sustituir las llamadas a los csv con consultas sql.import sys sys.path.append('..') import openetl #=============================================================================== # Conectores #=============================================================================== fileconnector_orange=openetl.connector.localfile('/home/carlos/Escritorio/Orange/Empleados.csv') fileconnector_orange_dptos=openetl.connector.localfile('/home/carlos/Escritorio/Orange/dptos.csv') # Con tratamiento previo fileconnector_orange_cargos=openetl.connector.localfile('/home/carlos/Escritorio/Orange/cargos.csv') # Con tratamiento previo ooconnector = openetl.connector.openobject_connector('http://localhost:8069', 'master_viavansi', 'admin', 'admin', con_type='xmlrpc') #=============================================================================== # Componentes #=============================================================================== csv_in1= openetl.component.input.csv_in(fileconnector_orange,name='Datos de Orange') csv_in_dptos= openetl.component.input.csv_in(fileconnector_orange_dptos,name='Departamentos') csv_in_cargos= openetl.component.input.csv_in(fileconnector_orange_cargos,name='Cargos') openobject_out1 = openetl.component.output.openobject_out( ooconnector, 'hr.employee', { 'name':'name_csv', 'ssnid':'ssn', 'gender':'gender', 'birthday':'birthDate', 'address_home_id':'name_csv', # Nombre de la relacion 'department_id':'workStation', 'job_id':'cargo', } ) openobject_out2 = openetl.component.output.openobject_out( ooconnector, 'res.partner.address', { 'name':'name_csv', 'street':'street1', 'street2':'street2', 'zip':'zip', 'city':'city', 'country_id':'state', } ) openobject_out3 = openetl.component.output.openobject_out( ooconnector, 'hr.department', { 'name':'workStation', } ) # Soporte para carga de datos de cargo de empleado. El Diccionario se carga en subtarea previa lista_cargos = {} openobject_out4 = openetl.component.output.openobject_out( ooconnector, 'hr.job', { 'name':'empStatus', } ) def preprocess_cargos(self, channels): for trans in channels['carga_cargos']: for d in trans: lista_cargos[d['empId']] = d['empStatus'] return None pres_cargos=openetl.component.transform.map({},preprocess_cargos) # Soporte para carga de datos de paises. El Diccionario se carga en subtarea previa lista_paises = {} openobject_in1 = openetl.component.input.openobject_in( ooconnector,'res.country', fields=['id','name'], ) def preprocess_paises(self, channels): for trans in channels['carga_paises']: for d in trans: lista_paises[d['id']] = d['name'] return None pre_paises=openetl.component.transform.map({},preprocess_paises) # Soporte transformaciones y componentes def preprocess(self, channels): cdict = {} for trans in channels['modificacion']: for d in trans: # name: no existia,lo creo yo con la suma de los campos if d['middleName'] == "": # En OpenERP, no se separan los campos, hay un unico campo name d["name_csv"] = d["firstName"] + str(" ")+ d["lastName"] else: d["name_csv"] = d["firstName"] + str(" ")+ d["middleName"] +str(" ")+ d["lastName"] if d['gender'] == "M": # Adaptacion de nomencaltura de datos de Orange a OpenERP d['gender'] = 'male' else: d['gender'] ='female' # Ajuste de paises if d['state'] == "Santo Domingo": d['state'] = lista_paises[62] # Codigo de Republica Dominicana elif d['state'] == "Distrito Nacional": d['state'] = lista_paises[62] else: d['state'] = lista_paises[69] # Codigo de Espagna # Ajuste de cargo d['cargo'] = lista_cargos[d['empId']] return {'resultado':cdict} datos_ajustados=openetl.component.transform.map({},preprocess) # Como leo un flujo de datos, no hay key_map. key_maps es para diccionarios #=============================================================================== # Transiciones, Definicion de trabajo y ejecucion. Operaciones de Carga #=============================================================================== log_cargos=openetl.component.transform.logger(name='Log de cargos') unique_job = openetl.component.transform.unique() openetl.transition(csv_in_cargos,pres_cargos,channel_destination='carga_cargos') openetl.transition(pres_cargos,log_cargos) openetl.transition(csv_in_cargos,unique_job) openetl.transition(unique_job,openobject_out4) job_cargos=openetl.job([csv_in_cargos,unique_job,openobject_out4,log_cargos]) subjob_cargos = openetl.component.transform.subjob(job_cargos) unique = openetl.component.transform.unique() log_dptos=openetl.component.transform.logger(name='Log departamentos') openetl.transition(csv_in_dptos,unique) openetl.transition(unique,log_dptos,channel_source='main') openetl.transition(unique,openobject_out3) job_dptos=openetl.job([log_dptos,openobject_out3]) subjob_dptos = openetl.component.transform.subjob(job_dptos) openetl.transition(openobject_in1,pre_paises, channel_destination='carga_paises') job_paises = openetl.job([openobject_in1,pre_paises]) subjob_paises = openetl.component.transform.subjob(job_paises) openetl.transition(csv_in1,datos_ajustados, channel_destination='modificacion') # Leo datos aplicando preprocesamiento openetl.transition(csv_in1,openobject_out2) # Direcciones openetl.transition(csv_in1,openobject_out1) # Personas job_ = openetl.job([csv_in1,datos_ajustados,openobject_out2]) # Para poder relacionar direcciones con personas, las direcciones deben estar cargadas subjob = openetl.component.transform.subjob(job_) # en el sistema. Las cargo previamente en una subtarea. job1=openetl.job([subjob_cargos,subjob_dptos,subjob_paises,subjob,csv_in1,datos_ajustados,openobject_out1]) job1.run()Con esto concluye la parte técnica del post. Creo que OpenETL es una tecnología muy interesante, que permite realizar trabajos de ETL de forma bastante cómoda e intuitiva. También os comento que he echado en falta algo más de documentación técnica sobre OpenETL, ya que he tenido que recurrir al código fuente de muchos componentes, transformaciones, etc. para averiguar que es lo que hacían.
A pesar de ello la línea de aprendizaje de esta tecnología es bastante sencilla una vez que sabes que hay que hacer, y se pueden lograr grandes cosas en poco tiempo.
Para finalizar os comentaré que mi impresión final sobre OpenETL es muy buena. No sólo porque se adapte perfectamente a operaciones ETL sobre OpenERP, sino porque tiene un amplio abanico de conectores (sql, facebook, xmlrpc,csv, gdoc, gcalendar, etc) que permiten usar OpenETL en muchos proyectos con distintas tecnologías.
No hay comentarios:
Publicar un comentario