一、工具简介

ETL(Extract-Transform-Load)即数据的抽取、转换和加载,是数据仓库建设中很重要的一部分。在数据仓库建设中,我们需要从各个业务系统中抽取数据,再进行数据的清洗、转换和整合,最后将处理好的数据加载到数据仓库中供决策使用。目前市场上常用的ETL工具有很多,其中比较受欢迎的三种工具为:Informatica、Talend和Pentaho。

二、Informatica

Informatica是一家美国公司,提供大数据集成解决方案,包括数据仓库、云数据集成和大数据集成等。Informatica的ETL工具是业界公认的优秀工具之一,其易用性、性能和稳定性得到广泛认可。

Informatica工具使用基于图形化的设计,通过拖拽和连接组件进行ETL流程的构建,也支持手写代码。Informatica的主要特点是具有良好的扩展性、灵活的数据转换和支持多种数据源。同时,其提供基于高质量函数的数据转换,函数库非常强大且易用,支持非常细致的数据处理。

完整代码示例:

source = profect_folder + "/employee.csv"
df_emp = spark.read.format("csv").option("header", "true").load(source)

cnfg = spark.read.format("properties").load(profect_folder+"/config.properties")
target_db = cnfg.filter("key='target_db'").collect()[0]['value']
target_tbl = cnfg.filter("key='target_tbl'").collect()[0]['value']

df_cc = df_emp
df_cc = df_cc.filter("salary > 5000")
df_cc.show()

df_cc.write.jdbc(jdbc_url, target_tbl, mode='overwrite', properties=connection_properties)

三、Talend

Talend是一家法国公司,提供开源的ETL工具,其ETL工具不仅支持传统的ETL需求,还支持大数据、企业服务总线和数据质量管理等领域。Talend的ETL工具具有模块化的体系结构,易于扩展,并且支持多种数据源和目标。

Talend的ETL工具使用基于图形化的界面,依托于视觉化的拖放和配置方式,支持快速建立数据流程。同时,Talend的ETL工具还提供强大的数据处理库,数据转换能力很强,并且支持自定义业务逻辑。

完整代码示例:

tFileInputDelimited_1 ------> tMap_1 ------> tFileOutputDelimited_1
                          |
                   tLogRow_1

四、Pentaho

Pentaho是一家美国公司,提供商务智能和数据集成解决方案,包括ETL工具、数据挖掘、报表和OLAP分析等方面。Pentaho的ETL工具基于Java语言开发,可在多种操作系统下运行,支持多种数据源和目标。Pentaho的ETL工具也提供目录式的集成,以加速数据交换和数据整合性的提高。

Pentaho的ETL工具使用基于图形化的设计,支持多种操作方式,如拖拽、导入、运行和配置组件。同时,Pentaho的ETL工具还提供了丰富的数据转换方式,比如策略分离、动态脚本、表达式和正则表达式等。

完整代码示例:

job = Job("TestJob", "Test transformation job")

source_table = job.getDatabaseTable("postgres", "public", "employees")
target_table = job.getDatabaseTable("postgres", "public", "employees_transformed")
target_table.truncate()

source = Input("Table input", "Select * From " + source_table.getQuotedString())

lookup = SelectValues("Exclude fields") # exclude unwanted fields
lookup.fields = ["employee_id", "last_name", "first_name"]
lookup.metaFields = ["employee_id"]
lookup.setInput(source)
lookup.setOutput("Transformed output")

lookup2 = ReplaceNull("Replace null with default value")
lookup2.defaultValues = { "salary" : 0 }
lookup2.setInput(lookup)
lookup2.setOutput("Transformed output")

target = Output("Table output", target_table.getQuotedString())
target.setTruncate(True)
target.setInput(lookup2)

job.start()