本文通过示例的方式为您介绍自定义输入与输出组件的开发案例。
输入组件开发
通过以下示例代码构建Java工程,并打为JAR包。
Maven的依赖如下。
<dependency> <groupId>com.alibaba.dt.pipeline</groupId> <artifactId>plugin.center.base</artifactId> <version>0.0.1-SNAPSHOT</version> </dependency>
代码示例如下。
package demo; import com.alibaba.dt.pipeline.plugin.center.base.Reader; import com.alibaba.dt.pipeline.plugin.center.base.RecordSender; import com.alibaba.dt.pipeline.plugin.center.conf.Configuration; import com.alibaba.dt.pipeline.plugin.center.element.*; import com.alibaba.dt.pipeline.plugin.center.record.Record; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.List; import java.util.Random; import java.util.stream.Collectors; import java.util.stream.IntStream; /** * 用户入口类ReaderDemo * 该类必须继承自com.alibaba.dt.pipeline.plugin.center.base.Reader * 该类中暂时不需要定义方法,但是必须要定义两个public的静态子类:Job和Task,名字必须是Job和Task,大小写敏感,否则系统找不到类 * * 系统会首先初始化Job类,调用init做初始化,再调用prepare做准备工作,然后调用split,把job的configuration拆分成用户指定的并发度个数的 * configuration。然后执行task。当所有的task全部执行完毕,再执行post,最后再执行destroy。destroy和post方法的不同是,destroy总是 * 会执行,哪怕出现了异常。 * * 当系统调用Job的split方法得到很多configuration后,会用每一个configuration实例化一个Task,Task的方法依次执行的顺序是:init, * prepare,getInputRowMeta,startRead,post,destroy,同样的,destroy和post方法的不同是,destroy总是会执行,哪怕出现了异常。 * */ public class ReaderDemo extends Reader { // 这个是用户在自定义数据源中定义的key,这是一个demo数据源的key,用户自定义的话,这个key值应该不同 public static final String DS_KEY = "demo_ds"; // 代码内部定义的task编号,没有使用到可以不用 public static final String TASK_INDEX = "taskIndex"; // 用户在离线管道中配置该组件的一个参数 public static final String USER_KEY = "user_param"; /** * public的静态子类Job必须继承自Reader.Job */ public static class Job extends Reader.Job { private static final Logger logger = LoggerFactory.getLogger(Job.class); Configuration jobConfig; @Override public void init() { logger.info("job init"); //通过这个方法拿到用户的输入组件配置,这些参数就是用户在"输入组件"界面上配置的参数 this.jobConfig = super.getPluginJobConf(); String value = jobConfig.getString(USER_KEY, "default_value"); String ds = jobConfig.getString(DS_KEY, "default_ds"); logger.info("user_param:{} ds:{}", value, ds); } @Override public void prepare() { super.prepare(); logger.info("job prepare"); } @Override public List<Configuration> split(int i) { logger.info("job split:{}", i); return IntStream.range(0, i).boxed().map(x -> { Configuration tmpConfiguration = jobConfig.clone(); // 写入configuration的编号 tmpConfiguration.set(TASK_INDEX, x); return tmpConfiguration; }).collect(Collectors.toList()); } @Override public void post() { super.post(); logger.info("job post"); } @Override public void destroy() { logger.info("job destroy"); } } /** * public的静态子类Task必须继承自Reader.Task */ public static class Task extends Reader.Task { private static final Logger logger = LoggerFactory.getLogger(Task.class); private Configuration taskConfig; private int index; private RowMeta rowMeta; @Override public void init() { // 获取Job split出来的configuration this.taskConfig = super.getPluginJobConf(); // 获取Task的编号 index = taskConfig.getInt(TASK_INDEX, -1); logger.info("task init:{}", index); } @Override public void prepare() { super.prepare(); logger.info("task prepare"); } @Override public void startRead(RecordSender recordSender) { logger.info("task start"); Random random = new Random(); // 读取数据,封装成Record,发送到系统内部 for(int i = 0; i < 10; i++) { Record record = recordSender.createRecord(); // 只是3个列,这个列的类型需要和getInputRowMeta函数的meta对其,如果是真实数据源db,需要把读取到的数据转换成特定的column record.addColumn(new LongColumn(i)); record.addColumn(new StringColumn("name_" + i)); record.addColumn(new DoubleColumn(random.nextDouble())); recordSender.sendToWriter(record); logger.info("read record:{}", i); } } @Override public RowMeta getInputRowMeta(){ logger.info("task column meta"); rowMeta = new RowMeta(); /** * 在这里定义输入组件读取到数据后,往下游写出的数据的schema,一般而言,用户可能需要连接到db,获取到真实数据源的schema * 特别注意:这里的column名字必须和输入组件配置页面的column名字完全一样,顺序也要一样。比如:这里定义了id、name、score * 那么,该组件的配置页面也必须配置上一样的列名:id、name、score */ ColumnMeta columnMeta1 = new ColumnMeta(); columnMeta1.setName("id"); columnMeta1.setType(Column.Type.LONG); rowMeta.addColumnMeta(columnMeta1); ColumnMeta columnMeta2 = new ColumnMeta(); columnMeta2.setName("name"); columnMeta2.setType(Column.Type.STRING); rowMeta.addColumnMeta(columnMeta2); ColumnMeta columnMeta3 = new ColumnMeta(); columnMeta3.setName("score"); columnMeta3.setType(Column.Type.DOUBLE); rowMeta.addColumnMeta(columnMeta3); return rowMeta; } @Override public void post() { super.post(); logger.info("task post"); } @Override public void destroy() { logger.info("task destroy"); } } }
请参见新建离线自定义源类型,新建自定义数据源。
说明通过配置自定义数据源类型,Dataphin将自动为您生成对应的数据源类型和组件。
参数示例如下。
参数
示例
基本配置
类型
其他数据库。
名称
demo_reader。
类型编码
组件的唯一标识。供后端使用,创建后不可编辑。
数据源JSON
数据源JSON示例如下:
[ { "columnName":"demo_ds", "columnType":"NORMAL", "text":{ "zh_CN":"数据源", "en_US":"数据源", "zh_TW":"数据源" }, "placeholder":{ "zh_CN":"abc", "en_US":"abc", "zh_TW":"abc" } } ]
上述JSON示例解释说明如下:
columnName:系统会生成demo_reader的数据源模板,该模板只包含一个参数demo_ds。
columnType:参数类型为NORMAL。密码类的,需使用ENCRYPT。
placeholder:用户输入框中的默认值在三种语言情况下均为abc。不填写,则用户输入框为空。
text:参数名称在三种语言情况下均显示为数据源。
资源配置
读写插件
勾选读取插件。
ClassName:插件的类名。该示例为demo.ReaderDemo。
上传文件:上传打包完成的JAR文件。
描述信息
描述
请输入组件的简单描述信息。不可超过128个字符。
单击创建。
当自定义组件创建成功后,数据源管理会自动生成一个DEMO_READER的数据源类型;且在离线管道的组件库中也将生成一个DEMO_READER输入组件。
输出组件开发
通过以下示例代码构建Java工程,并打为JAR包。
Maven的依赖如下。
<dependency> <groupId>com.alibaba.dt.pipeline</groupId> <artifactId>plugin.center.base</artifactId> <version>0.0.1-SNAPSHOT</version> </dependency>
代码示例如下。
package demo; import com.alibaba.dt.pipeline.plugin.center.base.RecordReceiver; import com.alibaba.dt.pipeline.plugin.center.base.Writer; import com.alibaba.dt.pipeline.plugin.center.conf.Configuration; import com.alibaba.dt.pipeline.plugin.center.record.Record; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.List; import java.util.stream.Collectors; import java.util.stream.IntStream; /** * 用户入口类WriterDemo * 该类必须继承自com.alibaba.dt.pipeline.plugin.center.base.Writer * 该类中暂时不需要定义方法,但是必须要定义两个public的静态子类:Job和Task,名字必须是Job和Task,大小写敏感,否则系统找不到类 * * 系统会首先初始化Job类,调用init做初始化,再调用prepare做准备工作,然后调用split,把job的configuration拆分成用户指定的并发度个数的 * configuration。然后执行task。当所有的task全部执行完毕,再执行post,最后再执行destroy。destroy和post方法的不同是,destroy总是 * 会执行,哪怕出现了异常。 * * 当系统调用Job的split方法得到很多configuration后,会用每一个configuration实例化一个Task,Task的方法依次执行的顺序是:init, * prepare,startWrite,post,destroy,同样的,destroy和post方法的不同是,destroy总是会执行,哪怕出现了异常。 * */ public class WriterDemo extends Writer { // 这个是用户在自定义数据源中定义的key,这是一个demo数据源的key,用户自定义的话,这个key值应该不同 public static final String DS_KEY = "demo_ds"; // 代码内部定义的task编号,没有使用到可以不用 public static final String TASK_INDEX = "taskIndex"; // 用户在离线管道中配置该组件的一个参数 public static final String USER_KEY = "user_param"; /** * public的静态子类Job必须继承自Writer.Job */ public static class Job extends Writer.Job { private static final Logger logger = LoggerFactory.getLogger(Job.class); Configuration jobConfig; @Override public void init() { logger.info("job init"); //通过这个方法拿到用户的输出组件配置,这些参数就是用户在输出组件界面上配置的参数 this.jobConfig = super.getPluginJobConf(); String value = jobConfig.getString(USER_KEY, "default_value"); String ds = jobConfig.getString(DS_KEY, "default_ds"); logger.info("user_param:{} ds:{}", value, ds); } @Override public void prepare() { super.prepare(); logger.info("job prepare"); } @Override public List<Configuration> split(int i) { logger.info("job split:{}", i); return IntStream.range(0, i).boxed().map(x -> { Configuration tmpConfiguration = jobConfig.clone(); // 写入configuration的编号 tmpConfiguration.set(TASK_INDEX, x); return tmpConfiguration; }).collect(Collectors.toList()); } @Override public void post() { super.post(); logger.info("job post"); } @Override public void destroy() { logger.info("job destroy"); } } /** * public的静态子类Task必须继承自Writer.Task */ public static class Task extends Writer.Task { private static final Logger logger = LoggerFactory.getLogger(Task.class); private Configuration taskConfig; private int index; @Override public void init() { // 获取Job split出来的configuration this.taskConfig = super.getPluginJobConf(); // 获取Task的编号 index = taskConfig.getInt(TASK_INDEX, -1); logger.info("task init:{}", index); } @Override public void prepare() { super.prepare(); logger.info("task prepare"); } @Override public void startWrite(RecordReceiver recordReceiver) { logger.info("task start"); Record record; while ((record = recordReceiver.getFromReader()) != null) { logger.info("======: " + record.toString()); } } @Override public void post() { super.post(); logger.info("task post"); } @Override public void destroy() { logger.info("task destroy"); } } }
请参见新建离线自定义源类型,新建自定义组件。
说明通过配置自定义数据源类型,Dataphin将自动为您生成对应的数据源类型和组件。
参数示例如下。
参数
示例
基本配置
类型
其他数据库。
名称
demo_reader。
类型编码
组件的唯一标识。供后端使用,创建后不可编辑。
数据源JSON
数据源JSON示例如下:
[ { "columnName":"demo_ds", "columnType":"NORMAL", "text":{ "zh_CN":"数据源", "en_US":"数据源", "zh_TW":"数据源" }, "placeholder":{ "zh_CN":"abc", "en_US":"abc", "zh_TW":"abc" } } ]
上述JSON示例解释说明如下:
columnName:系统会生成demo_reader的数据源模板,该模板只包含一个参数demo_ds。
columnType:参数类型为NORMAL。密码类的,需使用ENCRYPT。
placeholder:用户输入框中的默认值在三种语言情况下均为abc。不填写,则用户输入框为空。
text:参数名称在三种语言情况下均显示为数据源。
资源配置
读写插件
勾选写入插件。
ClassName:插件的类名。该示例为demo.ReaderDemo。
上传文件:上传打包完成的JAR文件。
描述信息
描述
请输入组件的简单描述信息。不可超过128个字符。
单击创建。
当自定义组件创建成功后,数据源管理会自动生成一个DEMO_READER的数据源类型;且在离线管道的组件库中也将生成一个DEMO_READER输出组件。
- 本页导读 (0)