跳到主要内容

DataX Transformer 开发


复制粘贴开始干 -> transformer开发

Transformer定义

在数据同步、传输过程中,存在用户对于数据传输进行特殊定制化的需求场景,包括裁剪列、转换列等工作,可以借助ETL的T过程实现(Transformer)。DataX包含了完整的E(Extract)、T(Transformer)、L(Load)支持。

UDF手册

1. dx_substr

  • 参数:3个
    • 第一个参数:字段编号,对应record中第几个字段。
    • 第二个参数:字段值的开始位置。
    • 第三个参数:目标字段长度。
  • 返回: 从字符串的指定位置(包含)截取指定长度的字符串。如果开始位置非法抛出异常。如果字段为空值,直接返回(即不参与本transformer)
  • 举例:
dx_substr(1,"2","5")  column 1的value为“dataxTest”=>"taxTe"
dx_substr(1,"5","10") column 1的value为“dataxTest”=>"Test"

2. dx_pad

  • 参数:4个
    • 第一个参数:字段编号,对应record中第几个字段。
    • 第二个参数:"l","r", 指示是在头进行pad,还是尾进行pad。
    • 第三个参数:目标字段长度。
    • 第四个参数:需要pad的字符。
  • 返回: 如果源字符串长度小于目标字段长度,按照位置添加pad字符后返回。如果长于,直接截断(都截右边)。如果字段为空值,转换为空字符串进行pad,即最后的字符串全是需要pad的字符
  • 举例:
         dx_pad(1,"l","4","A"), 如果column 1 的值为 xyz=> Axyz, 值为 xyzzzzz => xyzz
dx_pad(1,"r","4","A"), 如果column 1 的值为 xyz=> xyzA, 值为 xyzzzzz => xyzz

3. dx_replace

  • 参数:4个
    • 第一个参数:字段编号,对应record中第几个字段。
    • 第二个参数:字段值的开始位置。
    • 第三个参数:需要替换的字段长度。
    • 第四个参数:需要替换的字符串。
  • 返回: 从字符串的指定位置(包含)替换指定长度的字符串。如果开始位置非法抛出异常。如果字段为空值,直接返回(即不参与本transformer)
  • 举例:

dx_replace(1,"2","4","****") column 1的value为“dataxTest”=>"da****est"
dx_replace(1,"5","10","****") column 1的value为“dataxTest”=>"data****"

4. dx_filter

(关联filter暂不支持,即多个字段的联合判断,函参太过复杂,用户难以使用。)

  • 参数:

    • 第一个参数:字段编号,对应record中第几个字段。
    • 第二个参数:运算符,支持一下运算符:like, not like, >, =, <, >=, !=, <=
    • 第三个参数:正则表达式(java正则表达式)、值。
  • 返回:

    • 如果匹配正则表达式,返回Null,表示过滤该行。不匹配表达式时,表示保留该行。(注意是该行)。对于>=<都是对字段直接compare的结果.
    • like , not like是将字段转换成String,然后和目标正则表达式进行全匹配。
    • >, =, <, >=, !=, <= 对于DoubleColumn比较double值,对于LongColumn和DateColumn比较long值,其他StringColumn,BooleanColumn以及ByteColumn均比较的是StringColumn值。
    • 如果目标colunn为空(null),对于 = null的过滤条件,将满足条件,被过滤。!=null的过滤条件,null不满足过滤条件,不被过滤。 like,字段为null不满足条件,不被过滤,和not like,字段为null满足条件,被过滤。
    • 举例:
dx_filter(1,"like","dataTest")  
dx_filter(1,">=","10")

5. dx_groovy

  • 参数。
    • 第一个参数: groovy code
    • 第二个参数(列表或者为空):extraPackage
  • 备注:
    • dx_groovy只能调用一次。不能多次调用。
    • groovy code中支持java.lang, java.util的包,可直接引用的对象有record,以及element下的各种column(BoolColumn.class,BytesColumn.class,DateColumn.class,DoubleColumn.class,LongColumn.class,StringColumn.class)。不支持其他包,如果用户有需要用到其他包,可设置extraPackage,注意extraPackage不支持第三方jar包。
    • groovy code中,返回更新过的Record(比如record.setColumn(columnIndex, new StringColumn(newValue));),或者null。返回null表示过滤此行。
    • 用户可以直接调用静态的Util方式(GroovyTransformerStaticUtil),目前GroovyTransformerStaticUtil的方法列表 (按需补充):
  • 举例:
groovy 实现的subStr:
String code = "Column column = record.getColumn(1);\n" +
" String oriValue = column.asString();\n" +
" String newValue = oriValue.substring(0, 3);\n" +
" record.setColumn(1, new StringColumn(newValue));\n" +
" return record;";
dx_groovy(record);
groovy 实现的Replace
String code2 = "Column column = record.getColumn(1);\n" +
" String oriValue = column.asString();\n" +
" String newValue = \"****\" + oriValue.substring(3, oriValue.length());\n" +
" record.setColumn(1, new StringColumn(newValue));\n" +
" return record;";
groovy 实现的Pad
String code3 = "Column column = record.getColumn(1);\n" +
" String oriValue = column.asString();\n" +
" String padString = \"12345\";\n" +
" String finalPad = \"\";\n" +
" int NeedLength = 8 - oriValue.length();\n" +
" while (NeedLength > 0) {\n" +
"\n" +
" if (NeedLength >= padString.length()) {\n" +
" finalPad += padString;\n" +
" NeedLength -= padString.length();\n" +
" } else {\n" +
" finalPad += padString.substring(0, NeedLength);\n" +
" NeedLength = 0;\n" +
" }\n" +
" }\n" +
" String newValue= finalPad + oriValue;\n" +
" record.setColumn(1, new StringColumn(newValue));\n" +
" return record;";

Job定义

  • 本例中,配置3个UDF。
{
"job": {
"setting": {
"speed": {
"channel": 1
},
"errorLimit": {
"record": 0
}
},
"content": [
{
"reader": {
"name": "streamreader",
"parameter": {
"column": [
{
"value": "DataX",
"type": "string"
},
{
"value": 19890604,
"type": "long"
},
{
"value": "1989-06-04 00:00:00",
"type": "date"
},
{
"value": true,
"type": "bool"
},
{
"value": "test",
"type": "bytes"
}
],
"sliceRecordCount": 100000
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"print": false,
"encoding": "UTF-8"
}
},
"transformer": [
{
"name": "dx_substr",
"parameter":
{
"columnIndex":5,
"paras":["1","3"]
}
},
{
"name": "dx_replace",
"parameter":
{
"columnIndex":4,
"paras":["3","4","****"]
}
},
{
"name": "dx_groovy",
"parameter":
{
"code": "//groovy code//",
"extraPackage":[
"import somePackage1;",
"import somePackage2;"
]
}
}
]
}
]
}
}

计量和脏数据

Transform过程涉及到数据的转换,可能造成数据的增加或减少,因此更加需要精确度量,包括:

  • Transform的入参Record条数、字节数。
  • Transform的出参Record条数、字节数。
  • Transform的脏数据Record条数、字节数。
  • 如果是多个Transform,某一个发生脏数据,将不会再进行后面的transform,直接统计为脏数据。
  • 目前只提供了所有Transform的计量(成功,失败,过滤的count,以及transform的消耗时间)。

涉及到运行过程的计量数据展现定义如下:

Total 1000000 records, 22000000 bytes | Transform 100000 records(in), 10000 records(out) | Speed 2.10MB/s, 100000 records/s | Error 0 records, 0 bytes | Percentage 100.00%

注意,这里主要记录转换的输入输出,需要检测数据输入输出的记录数量变化。

涉及到最终作业的计量数据展现定义如下:

任务启动时刻                    : 2015-03-10 17:34:21
任务结束时刻 : 2015-03-10 17:34:31
任务总计耗时 : 10s
任务平均流量 : 2.10MB/s
记录写入速度 : 100000rec/s
转换输入总数 : 1000000
转换输出总数 : 1000000
读出记录总数 : 1000000
同步失败总数 : 0

注意,这里主要记录转换的输入输出,需要检测数据输入输出的记录数量变化。

实现一个Transformer

需求:数据在源端库使用了国密SM4加密,抽取后再解密相当于做了 2 次转换,因此需要在 DataX 中实现一个 SM4 解密的 Transformer,在抽取过程中就可以解密数据。

1. Transformer接口

package com.alibaba.datax.core.transport.transformer;

import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.core.util.container.CoreConstant;
import com.alibaba.datax.core.util.container.JarLoader;
import com.alibaba.datax.transformer.ComplexTransformer;
import com.alibaba.datax.transformer.Transformer;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
* no comments.
* Created by boomlee on 16/3/3.
*/
public class TransformerRegistry {

private static final Logger LOG = LoggerFactory.getLogger(TransformerRegistry.class);
private static Map<String, TransformerInfo> registedTransformer = new HashMap<String, TransformerInfo>();

static {
/**
* add native transformer
* local storage and from server will be delay load.
*/

registTransformer(new SubstrTransformer());
registTransformer(new PadTransformer());
registTransformer(new ReplaceTransformer());
registTransformer(new FilterTransformer());
registTransformer(new GroovyTransformer());

// 新增SM4解密Transformer
registTransformer(new SM4DecryptTransformer());

}

public static void loadTransformerFromLocalStorage() {
//add local_storage transformer
loadTransformerFromLocalStorage(null);
}


public static void loadTransformerFromLocalStorage(List<String> transformers) {

String[] paths = new File(CoreConstant.DATAX_STORAGE_TRANSFORMER_HOME).list();
if (null == paths) {
return;
}

for (final String each : paths) {
try {
if (transformers == null || transformers.contains(each)) {
loadTransformer(each);
}
} catch (Exception e) {
LOG.error(String.format("skip transformer(%s) loadTransformer has Exception(%s)", each, e.getMessage()), e);
}

}
}

public static void loadTransformer(String each) {
String transformerPath = CoreConstant.DATAX_STORAGE_TRANSFORMER_HOME + File.separator + each;
Configuration transformerConfiguration;
try {
transformerConfiguration = loadTransFormerConfig(transformerPath);
} catch (Exception e) {
LOG.error(String.format("skip transformer(%s),load transformer.json error, path = %s, ", each, transformerPath), e);
return;
}

String className = transformerConfiguration.getString("class");
if (StringUtils.isEmpty(className)) {
LOG.error(String.format("skip transformer(%s),class not config, path = %s, config = %s", each, transformerPath, transformerConfiguration.beautify()));
return;
}

String funName = transformerConfiguration.getString("name");
if (!each.equals(funName)) {
LOG.warn(String.format("transformer(%s) name not match transformer.json config name[%s], will ignore json's name, path = %s, config = %s", each, funName, transformerPath, transformerConfiguration.beautify()));
}
JarLoader jarLoader = new JarLoader(new String[]{transformerPath});

try {
Class<?> transformerClass = jarLoader.loadClass(className);
Object transformer = transformerClass.newInstance();
if (ComplexTransformer.class.isAssignableFrom(transformer.getClass())) {
((ComplexTransformer) transformer).setTransformerName(each);
registComplexTransformer((ComplexTransformer) transformer, jarLoader, false);
} else if (Transformer.class.isAssignableFrom(transformer.getClass())) {
((Transformer) transformer).setTransformerName(each);
registTransformer((Transformer) transformer, jarLoader, false);
} else {
LOG.error(String.format("load Transformer class(%s) error, path = %s", className, transformerPath));
}
} catch (Exception e) {
//错误funciton跳过
LOG.error(String.format("skip transformer(%s),load Transformer class error, path = %s ", each, transformerPath), e);
}
}

private static Configuration loadTransFormerConfig(String transformerPath) {
return Configuration.from(new File(transformerPath + File.separator + "transformer.json"));
}

public static TransformerInfo getTransformer(String transformerName) {

TransformerInfo result = registedTransformer.get(transformerName);

//if (result == null) {
//todo 再尝试从disk读取
//}

return result;
}

public static synchronized void registTransformer(Transformer transformer) {
registTransformer(transformer, null, true);
}

public static synchronized void registTransformer(Transformer transformer, ClassLoader classLoader, boolean isNative) {

checkName(transformer.getTransformerName(), isNative);

if (registedTransformer.containsKey(transformer.getTransformerName())) {
throw DataXException.asDataXException(TransformerErrorCode.TRANSFORMER_DUPLICATE_ERROR, " name=" + transformer.getTransformerName());
}

registedTransformer.put(transformer.getTransformerName(), buildTransformerInfo(new ComplexTransformerProxy(transformer), isNative, classLoader));

}

public static synchronized void registComplexTransformer(ComplexTransformer complexTransformer, ClassLoader classLoader, boolean isNative) {

checkName(complexTransformer.getTransformerName(), isNative);

if (registedTransformer.containsKey(complexTransformer.getTransformerName())) {
throw DataXException.asDataXException(TransformerErrorCode.TRANSFORMER_DUPLICATE_ERROR, " name=" + complexTransformer.getTransformerName());
}

registedTransformer.put(complexTransformer.getTransformerName(), buildTransformerInfo(complexTransformer, isNative, classLoader));
}

private static void checkName(String functionName, boolean isNative) {
boolean checkResult = true;
if (isNative) {
if (!functionName.startsWith("dx_")) {
checkResult = false;
}
} else {
if (functionName.startsWith("dx_")) {
checkResult = false;
}
}

if (!checkResult) {
throw DataXException.asDataXException(TransformerErrorCode.TRANSFORMER_NAME_ERROR, " name=" + functionName + ": isNative=" + isNative);
}

}

private static TransformerInfo buildTransformerInfo(ComplexTransformer complexTransformer, boolean isNative, ClassLoader classLoader) {
TransformerInfo transformerInfo = new TransformerInfo();
transformerInfo.setClassLoader(classLoader);
transformerInfo.setIsNative(isNative);
transformerInfo.setTransformer(complexTransformer);
return transformerInfo;
}

public static List<String> getAllSuportTransformer() {
return new ArrayList<String>(registedTransformer.keySet());
}
}

2. Transformer实现

package com.alibaba.datax.core.transport.transformer;

import cn.hutool.crypto.symmetric.SymmetricCrypto;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.element.StringColumn;
import com.alibaba.datax.transformer.Transformer;


/**
* @author Boomlee
* @description 解密
* @date 2023/3/20 17:24
*/

public class SM4DecryptTransformer extends Transformer {

// 密钥长度
private static final int KEY_LENGTH = 16;

public SM4DecryptTransformer() {
setTransformerName("dx_decrypt_sm4");
}

@Override
public Record evaluate(Record record, Object... paras) {

String key;
int columnIndex;

columnIndex = (Integer) paras[0];
key = (String) paras[1];

// 检查密钥长度
if (key.length() != KEY_LENGTH) {
throw new IllegalArgumentException("Transformer dx_encrypt key must be 16 characters long");
}

SymmetricCrypto sm4 = new SymmetricCrypto("SM4/ECB/PKCS5Padding", key.getBytes());

// 获取列的值并加密
try {
if (record.getColumn(columnIndex).asString() != null) {
record.setColumn(columnIndex, new StringColumn(sm4.decryptStr(record.getColumn(columnIndex).asString())));
}
} catch (Exception e) {
String errorMessage = String.format(
"Transformer dx_decrypt_sm4 failed: 解密失败,请检查数据密钥是否正确以及数据是否已加密。\n" +
"出错列信息:\n" +
" - 列类型: %s\n" +
" - 列索引: %d\n" +
" - 列数据: %s",
record.getColumn(columnIndex).getType(),
columnIndex,
record.getColumn(columnIndex).asString()
);
throw new RuntimeException(errorMessage);
}

return record;
}

public static void main(String[] args) {

String key= "b7403453c95411ed";
String value = "/";


SymmetricCrypto sm4 = new SymmetricCrypto("SM4/ECB/PKCS5Padding", key.getBytes());

System.out.println(sm4.decryptStr(value));

}
}

注意:SM4DecryptTransformer类中的evaluate方法是实现解密的核心逻辑,其中的代码逻辑是对列的值进行解密操作。

3.Job 配置

{
"core":
{
"transport":
{
"channel":
{
"speed":
{
"byte": 4194304
}
}
}
},
"job":
{
"setting":
{
"speed":
{
"channel": 3,
"byte": 1048576
},
"errorLimit":
{
"record": 0,
"percentage": 0.02
}
},
"content":
[
{
"reader":
{
"name": "postgresqlreader",
"parameter":
{
"username": "*",
"password": "*",
"column":
[
"\"id\"",
"\"name\"",
"\"create_time\"",
"\"str1\"",
"\"str2\"",
"\"str3\"",
"\"str4\"",
"\"str5\"",
"\"str6\"",
"\"str7\"",
"\"str8\"",
"\"str9\"",
"\"str10\""
],
"splitPk": "",
"connection":
[
{
"table":
[
"\"public\".\"stu4\""
],
"jdbcUrl":
[
"jdbc:postgresql://IP:5432/data_center_db"
]
}
]
}
},
"writer":
{
"name": "gpdbwriter",
"parameter":
{
"username": "*",
"password": "*",
"column":
[
"\"id\"",
"\"name\"",
"\"create_time\"",
"\"str1\"",
"\"str2\"",
"\"str3\"",
"\"str4\"",
"\"str5\"",
"\"str6\"",
"\"str7\"",
"\"str8\"",
"\"str9\"",
"\"str10\""
],
"connection":
[
{
"table":
[
"\"public\".\"stu5\""
],
"jdbcUrl": "jdbc:postgresql://IP:5432/data_center_db"
}
]
}
},
"transformer":
[
{
"name": "dx_decrypt_sm4",
"parameter":
{
"columnIndex": 4,
"paras":
[
"boomleeboomlee"
]
}
},
{
"name": "dx_decrypt_sm4",
"parameter":
{
"columnIndex": 5,
"paras":
[
"boomleeboomlee"
]
}
}
]
}
]
}
}
  • name : 固定值 dx_decrypt_sm4
  • columnIndex : 需要解密的列索引,从 0 开始
  • paras: SM4 解密的密钥 ,16 位长度