在数据集成中,可以通过编辑脚本的方式实现主数据的转换和服务模型的数据转换和数据分发逻辑。

数据转换即将数据库中的原始数据(增量和全量数据)通过编辑脚本代码的方式进行格式的转换。

数据分发即将数据转换的结果通过脚本的方式实现分发,分发的目的地可以是数字工厂的主数据,也可以是服务模型的API,这些都可以在脚本中自行实现,下面介绍脚本的使用方法。

脚本使用JavaScript语言进行编写,语法上支持ES5以及ES6的const和let语法。

ES5的语法及常见API可以参考地址: http://www.ecma-international.org/ecma-262/5.1/index.html

ES6语法

  • const
    • const声明一个只读的常量,一旦声明,常量的值就就不能改变;
    • const的作用域和let一样,只在声明所在的块级作用域内有效。
  • let
    • 在ES5中,只有两种作用域,一种是函数作用域,另一种是全局作用域,这个导致在写脚本的时候容易出现变量覆盖的情况;
    • 所以ES6增加了let语法,用于声明变量,用法类似于var,但是所声明的变量只在let所在的代码块内有效;
    • 建议在脚本开发的时候函数内部尽量使用let来声明变量,全局的采用var。

扩展的API

我们对标准的JavaScript API进行了相应的扩展,以提供访问数据库,调用IoT服务模型,同步数据到主数据的能力。

说明:如下API的使用都需要引用标准扩展插件包

var iotPluginApi = require('./iotPlugin.js');

数据库Select接口

API:iotSqlSelect(statement)

用途:用于在JS脚本中执行sql select语句。

参数 类型 说明
statement String 需要执行的select sql语句,底层已经适配了数据库,所以这里只需要填写对应的sql语句。

返回值

参数 类型 说明
code Integer

200代表sql语句执行成功

其他代表sql执执行失败,

在使用时需要判断该返回值

data

RowValueObject[]

其中RowValueObject的类型为:

Map<String,Object>

一个数组对象,数组成员为一个Map<String,Object>对象。

对应sql select出来的每一行内容。

Map的Key为列名,Object为该列对应的值。

使用的时候需要判断该数组的长度。

示例:

[

{"key1":1,"key2":2},{"key1":3,"key2":4}

]

使用示例

var iotPluginApi = require('./iotPlugin.js');
function jsSqlTest(){
            var sqlselect = "select MainId,cInvCode,fQuantity from PP_POMain where ID = 100";
            var sqlResult = iotPluginApi.iotSqlSelect(sqlselect);
            var invCode ="";
            if (sqlResult.code == 200 && sqlResult.data.length > 0 ) {
                var packingList = new Array();
                for (var i = 0; i < sqlResult.data.length; i++) {
                    var itemData = new Map();
                    //返回值code为200代表执行js api 调用成功
                    invCode = (sqlResult.data)[i].cInvCode;
                    itemData["rowCode"] = (sqlResult.data)[i].MainId;
                    itemData["materialCode"] = (sqlResult.data)[i].cInvCode;
                    itemData["quantity"] = (sqlResult.data)[i].fQuantity;
            }      
          }
   }

日志打印

API: log(format,..args)

用途:格式化日志打印

参数 类型 说明
format String 支持%s,%d,%c,%f等通用的格式转换符号。
args Object... 需要打印的参数,可以为多个。

返回值

参数 类型 说明

使用示例

var iotPluginApi = require('./iotPlugin.js');
function transform(eventType,msgPayLoadList){
   iotPluginApi.log("toJSONString %s",utils.toJSONString(msgPayLoadList));
}

API:log(object)

用途: 日志打印

参数 类型 说明
object Object 需要打印的对象,可以为String,也可以为其他类型,如果是其他类型,则默认转成相应的String打印出来。

返回值

参数 类型 说明

使用示例

var iotPluginApi = require('./iotPlugin.js');
function transform(eventType,msgPayLoadList){
    iotPluginApi.log(eventType);
  iotPluginApi.log(msgPayLoadList);
  iotPluginApi.log('this is msgPlayLoad:' +msgPayLoadList);
}

主数据同步

API: iotMasterDataUpload(param);

用途:用于在JS脚本中执行sql select语句。

参数 类型 说明
param

RowValueObject[]

其中RowValueObject的类型为:

Map<String,Object>

param为一个数组对象,

数组中的对象为Map<String,Object>类型。

String为主数据的属性标识,

Object为主数据的赋值。

返回值

参数 类型 说明
code Integer

200代表调用API成功

但不代表主数据同步成功,主数据是否成功需要判断data的值

data Boolean

true代表同步主数据成功,

false代表同步主数据失败

message String 失败时的log信息

使用示例

function distribute(eventType, transformMsgDataList){
    //eventType为事件类型,对于数据库事件有 insert,update,delete
    //transformMsgDataList 为上面的transform转换后的结果,注意是一个数组

    var masterDataOpenApiParam = new Map();
    masterDataOpenApiParam["eventType"]=eventType;
    masterDataOpenApiParam["msgList"]=transformMsgDataList;
    var result = iotPluginApi.iotOpenApiInvoke("/industry/octopus/adapter/sync/masterdata","1.0.0",masterDataOpenApiParam);
    return result;
}

服务模型调用

API:iotServiceModelInvoke(path,version,param);

用途:用于在JS脚本中执行sql select语句。

参数 类型 说明
path String

服务模型API的path路径

例如:

"/ManufacturingExecutionService/getWorkplan"

version String

服务模型API的版本号

例如“1.0.0”

param Map<String,Object>

服务模型API的需要同步的数据

Map的key为参数名

Map的value为参数的值

返回值

参数 类型 说明
code Integer 200代表调用服务模型API调用成功,其他值为调用失败
message String 失败时的log信息

使用示例

function distribute(eventType, transformMsgDataList){
    //eventType为事件类型,对于数据库事件有 insert,update,delete
    //transformMsgDataList 为上面的transform转换后的结果,注意是一个数组
    for(j = 0; j < transformMsgDataList.length; j++) {
        var transformMsgData = transformMsgDataList[j];
        iotPluginApi.log('transform :'+transformMsgData);
        var result = iotPluginApi.iotServiceModelInvoke("/ManufacturingExecutionService/getWorkplan","1.0",transformMsgData);
        iotPluginApi.log("result="+result);
        if (result.code == 200) {
            print('js 服务模型 接口调用成功');
        } else {
            print('js 服务模型 接口调用失败');
        }
    }
    return result;
}

工具函数

API: sleep(ms);

用途: 用于在JS脚本中休眠sleep

参数 类型 说明
ms Integer 需要休眠的毫秒数字

返回值

参数 类型 说明

使用示例

//引用iot扩展Api模块,必须引用
var iotPluginApi = require('./iotPlugin.js');

function transform(eventType,msgPayLoadList){
    utils.sleep(1000);
    iotPluginApi.log("sleep 1 seconds");
}

API: toJSONString(param);

用途: 用于在JS中将一个JS对象转成JSON字符串

参数 类型 说明
param Object 需要转换的JS对象,比如为Map,List可以JSON序列化的对象

返回值

参数 类型 说明
String 返回一个JSON字符串

//引用iot扩展Api模块,必须引用
var iotPluginApi = require('./iotPlugin.js');

function transform(eventType,msgPayLoadList){
     iotPluginApi.log("toJSONString %s",utils.toJSONString(msgPayLoadList));
}

常见问题

  1. JS的变量提升问题

说明:在脚本中经常需要在函数中实现较多的转换逻辑,而这里需要注意的是JavaScript(ES5)中的var变量的作用域跟Java和C的作用域不同,否则会出现函数中的变量被覆盖的情况,导致各种问题,例如死循环。

在大多数编程语言中,会用花括号{}来形成一个作用域,俗称“块作用域”,例如C语言、C++等。但是在JS中{}并不能产生块作用域,JS中的作用域是依靠函数形成的。

在ECMAScript5中,JS只有两类作用域:全局作用域、函数作用域。

  • 全局作用域:全局对象的作用域,在代码的任何地方都可访问,但有时会被函数作用域覆盖
  • 函数作用域:作用于整个函数范围内,不管到底是在函数中的何处进行声明

在最新的数字工厂脚本引擎版本中,已经支持了ES6的let和const语法,建议在编写脚本的时候使用let和const以避免该问题的产生。

调试方法

查看脚本函数执行的日志

在数字工厂中,支持在云端调试脚本运行,支持通过mock参数调试和打印日志。

用于在实现一个正式脚本发布前的调试和确认。

步骤:

选择某个元数据(对于调试服务模型,可以任意新建一个自定义的主数据进行调试),点击编辑脚本:

说明:在实现一个脚本的时候,可以点击“参考模版”,选择一个模版的脚本文件进行参考和修改。

在脚本中有三个函数,一个是config函数,一个是transfrom函数,一个是distribute函数。

在使用脚本的时候这三个函数都必须实现。

mock参数说明:

mock参数是在点击调试的时候,transform函数、distribute函数的入参:msgPayLoadList 和 transformMsgDataList 的值,其类型为一个数组,数组的类型为一个Map<String,Object>类型

transform函数调试:transform函数用于将边缘应用的全量/增量数据进行数据格式的转换,例如将数据库的表的数据进行转换成主数据的格式,或者服务模型的格式。最后调用distribute函数。

distribute函数调试:入参为transform函数的返回值。用于将transform函数的转换结果进行分发,例如分发到主数据,分发到服务模型等。

查看JS的运行日志

当调试完一个脚本后,需要将该脚本正式下发到边缘应用,以正式数据集成。

数据集成时可能会产生全量数据,增量数据,进而出发脚本运行,如果需要查看运行中的JS日志,可以通过如下方法查看:

进入边缘集群管理,数据集成,单击查看日志。

输入阿里云账号和密码

打开SLS日志窗口后,输入关键字“JsScriptLog”进行搜索查看。

脚本函数说明

config()函数

必须实现的函数

说明:config函数配置了该脚本运行的配置,这里必须实现的是table的赋值,用于配置目标数据源监控的表。

function config(){
  var config= new Map();
  // 监控的数据库表的名称
  config["table"] = "数据库的表";
  //上传时最大分块
  config["uploadBlockSize"] = 256*1024;
  //全量同步时一次最多可以传多少条数据
  config["snapshotBatchSize"] = 100;
   // 失败最多重试次数
  config["maxRetryTimes"]=3;
  //失败最小重试时间间隔-毫秒
  config["minRetryDuration"]=10000;
  return config;
}

在config函数中新增了几个配置项,下面解释每一个配置项的含义:

配置项参数 默认值 含义
table 无默认值,必须填写 监控的数据库的表名
uploadBlockSize 默认为256*1024

distribute分发数据到云端时,一个接口一次最大发送的消息容量大小。

如果客户使用服务模型对接时,可以根据实际情况自定义。

snapshotBatchSize 默认为30条 全量同步时一次最多同步多少条数据
maxRetryTimes 默认为3次 发送数据失败时,最多重试几次
minRetryDuration 默认为10000ms 发送数据失败时,超时重试的时间间隔
上述参数都有默认值,除了非必填的,其他参数用于提高性能和稳定性时配置,由ISV/SI根据实际情况进行配置调优。

transform()函数

说明: transform函数是数据源数据同步过程中调用的第一个函数。

参数说明

其中eventType为事件类型,类型为String,取值有 insert、update、delete用于对应数据库事件的插入、更新、删除。

msgPayLoadList 为需要转换的目标数据,注意其格式是一个数组,对应Java的格式为 List<Map<String, Object>>,当数据源(数据库目标表即config中配置的table)发生数据变化时,会将该表的每一列的数据以Map格式传入到该函数,其中Map的key为String类型,对应数据库表的列名,Map的value为一个Object类型,对应数据库表的列的值。

返回值:

返回值必须也为 List<Map<String, Object>>类型,该结果会自动传入到下面将要介绍的distribute函数中。

/**
 * 客户实现: transform()转换函数
 * 这是脚本转换器的第一个转换函数,负责将 数据来源(如数据库变换的数据) 转换成目标数据格式,如主数据,服务模型数据,物联网数据等
 * @param eventType为事件类型,对于数据库事件有 insert,update,delete
 * @param msgPayLoadList 为需要转换的目标数据,注意其格式是一个数组,对应Java的格式为 List<Map<String, Object>>
 * @return 返回转换后的数据格式,必须也为一个数组,即一组数据输入,转换格式后还是为一组数据
 */
function transform(eventType,msgPayLoadList){
  //实现自定义的转换逻辑
}

distribute()函数

说明: distribute函数会在transform函数调用后,将transform函数的返回值作为输入参数传入并调用

参数说明:其中eventType为事件类型,类型为String,取值有 insert、update、delete用于对应数据库事件的插入、更新、删除。

transformMsgDataList 为需要分发的目标数据,为transform函数的返回值,注意其格式是一个数组,对应Java的格式为 List<Map<String, Object>>,

返回值:无(自定义,根据用户需要决定是否需要返回值)

/**
 * 客户实现: distribute()数据分发函数
 * 这是脚本转换器的第二个数据分发函数,负责将 前一步transform转换后的数据分发到其他地方,例如数字工厂的主数据、服务模型、三方应用等
 * @param eventType为事件类型,对于数据库事件有 insert,update,delete
 * @param transformMsgDataList 为上一步transfrom函数返回的数据,注意其格式是一个数组,对应Java的格式为 List<Map<String, Object>>
 * @return Void或者
 */
function distribute(eventType, transformMsgDataList){
 //实现自定义的数据分发逻辑,例如调用主数据同步API,服务模型同步API
}

使用步骤

下面介绍如何使用脚本的几个步骤。

  1. 编辑脚本

    当需要编辑一个脚本的时候,例如使用脚本同步主数据,则进入数据集成中点击编辑脚本,选择一个参考模版进行脚本代码编写。实现脚本中的config()、transfrom()、distribute()三个函数。

  2. 调试脚本

    在完成脚本编写后,通过Mock参数的方法进行脚本调试,查看调试的返回值确认脚本是否编写正确。

  3. 下载配置

    脚本编写和调试完成后,返回数据集成页面,点击下载配置,将脚本下载到边缘应用。首次下载该脚本的时候会自动全量同步config函数中定义的数据库表的数据到云端。

  4. 查看主数据/服务模型同步结果

下载成功后,查看主数据的数据或者服务模型的数据以判断是否同步成功。

也可以通过在边缘的数据库进行数据库表的修改触发增量数据,进而脚本转换查看同步的结果。

一个完整的脚本例子

如下是一个将用友U8人员同步到数字工厂主数据“人员”的示例供参考。

//引用iot扩展Api模块,必须引用
var iotPluginApi = require('./iotPlugin.js');

var Map = Java.type('java.util.HashMap');


/*
*定义该脚本的配置
table: 监控的表名,必填,这里监控的是Person这个表
*/
function config(){
    var config= new Map();
    config["table"] = "dbo.Person";
    return config;
}

/**
 * 客户实现: transform()转换函数
 * 这是脚本转换器的第一个转换函数,负责将 数据来源(如数据库变换的数据) 转换成 目标数据格式,如主数据,服务模型数据,物联网数据等
 * 如下的代码是一个示例:仅供参考,示例代码是将用友U8/T6数据库的人员数据转换成iot工业数字工厂的人员主数据格式
 * @param eventType为事件类型,对于数据库事件有 insert,update,delete
 * @param msgPayLoadList 为需要转换的目标数据,注意其格式是一个数组,对应Java的格式为 List<Map<String, Object>>
 * @return 返回转换后的数据格式,必须也为一个数组,即一组数据输入,转换格式后还是为一组数据
 */

function transform(eventType,msgPayLoadList){
    var transformResultList = new Array();
    iotPluginApi.log("eventType="+eventType);
  iotPluginApi.log("msgPayLoadList="+msgPayLoadList);

    for(var index = 0; index < msgPayLoadList.length; index++) {

        var msgPayload = msgPayLoadList[index];
        var masterData = new Map();
        
        masterData["name"]=msgPayload["cPersonName"];
        masterData["id"]=msgPayload["cPersonCode"];
        masterData["phone"]=msgPayload["cPersonPhone"];
        masterData["postcode"]=msgPayload["cPersonEmail"];

        transformResultList.push(masterData); 
    }

    return transformResultList;
}

/**
 * 客户实现: distribute()数据分发函数
 * 这是脚本转换器的第二个数据分发函数,负责将 前一步transform转换后的数据 分发到其他地方,例如数字工厂的主数据,服务模型,三方应用等
 * 如下的代码是一个示例:仅供参考,示例代码是将数据 同步到数字工厂的主数据
 * @param eventType为事件类型,对于数据库事件有 insert,update,delete
 * @param transformMsgDataList 为上一步transfrom函数返回的数据,注意其格式是一个数组,对应Java的格式为 List<Map<String, Object>>
 * @return Void或者
 */
function distribute(eventType, transformMsgDataList){
    //eventType为事件类型,对于数据库事件有 insert,update,delete
    //transformMsgDataList 为上面的transform转换后的结果,注意是一个数组

    var masterDataOpenApiParam = new Map();
    masterDataOpenApiParam["eventType"]=eventType;
    masterDataOpenApiParam["msgList"]=transformMsgDataList;
    var result = iotPluginApi.iotOpenApiInvoke("/industry/octopus/adapter/sync/masterdata","1.0.0",masterDataOpenApiParam);
    return result;
}