菜单
一、路由条件设置
1.1根据上一节点的流入数据量
//使用上一节点的流入的数据量作为断言
function assert(engine,nodeDoc,nodeId,indoc){
if(engine.getDataTotalCount(indoc)>0){
return 1; //有数据时条件成立
}else{
return 0;//没有数据时禁止路由
}
}
1.2根据上一节点的数据来判断
如根据data里的某个值来判断。其中PrintUtil.o(“在控制台日志输出”);
//使用变量作为判断标记
function assert(engine,nodeDoc,nodeId,indoc){
var IndocData=indoc.get("data"); //获取变量
//PrintUtil.o(flag[0].scfkdlx);
var BillType = IndocData[0].scfkdlx
if(BillType===0){
return 1; //返回1表示成立
}else{
return 0;//返回0表示禁止
}
}
1.3如果上一节点有数据流入则执行
二、高级映射
2.1 将SQL结果转换成Json
{
"json": {
"head": {
"top_billid":"$data[0].top_billid",
"pk_org_v":"$data[0].pk_org_v",
"pk_tradetypeid":"$data[0].pk_tradetypeid",
"pk_deptid_v":"$data[0].pk_deptid_v.replaceAll(' ', '')",
"pk_psndoc":"$data[0].pk_psndoc.replaceAll(' ', '')",
"supplier":"$data[0].supplier.replaceAll(' ', '')",
"objtype":"$data[0].objtype",
"pk_balatype":"$data[0].pk_balatype",
"payaccount":"$data[0].payaccount",
"cashaccount":"$data[0].cashaccount.replaceAll(' ', '')",
"vdef1":"$data[0].vdef1.replaceAll(' ', '')",
"vdef2":"$data[0].vdef2"
},
"bodys": [
#foreach($item in $data)
{
"scomment":"$item.scomment",
"checktype":"$item.checktype",
"taxrate":"$item.taxrate",
"money_de":"$item.money_de",
"checkno":"$item.checkno",
"pk_subjcode":"$item.pk_subjcode"
}#if($foreach.hasNext),#end
#end
]
},
"method": "ArapPaybillInsert"
}
2.2、进阶映射去重
在实际需求中,例如 select * from talbeA a left join tableB b on a.id=b.parentId left join tableC c on a.id=c.parentId。这时tableA是公共的,但因为tabelB和tableC会出现行数不对等,转换json时,就需要进行去重操作。可参考下图例子
{
"json": {
"head": {
"code":"$data[0].code",
"pk_org_v":"$data[0].pk_org_v",
"subscribedate":"$data[0].subscribedate",
"valdate":"$data[0].valdate",
"invallidate": "$data[0].invallidate",
"cvendorvid":"$data[0].cvendorvid",
"vdef9": "$data[0].vdef9",
"pk_payterm": "$data[0].pk_payterm",
"vdef5":"$data[0].vdef5",
"vdef1":"$data[0].vdef1",
"vdef2":"$data[0].vdef2",
"vdef3":"$data[0].vdef3",
"vdef7":"$data[0].vdef7",
"vdef8":"$data[0].vdef8",
"ct_pu_b":[
#set($ct_pu_bList = [])
#foreach($item in $data)
#if(!$ct_pu_bList.contains($item.sc2Id))
#set($TempStatus1 = $ct_pu_bList.add($item.sc2Id))
{
"pk_material":"$item.pk_material",
"nastnum":"$item.nastnum.replaceAll(' ', '')",
"ntaxrate":"$item.ntaxrate",
"norigtaxprice":"$item.norigtaxprice.replaceAll(' ', '')",
"vmemo":"$item.vmemo.replaceAll(' ', '')",
"vbdef25":"$item.vbdef25.replaceAll(' ', '')",
"vbdef26":"$item.vbdef26.replaceAll(' ', '')",
}
#end
#end
],
"ct_pu_term":[
#set($ct_pu_termList = [])
#foreach($item in $data)
#if(!$ct_pu_termList.contains($item.sc3Id))
#set($TempStatus2 = $ct_pu_termList.add($item.sc3Id))
#if(!$item.sc3Id)#break#end
{
"vtermcode":"$item.vtermcode",
"vtermname":"$item.vtermname",
"vtermtypename":"$item.vtermtypename",
"vmemo":"$item.vtermvmemo.replaceAll(' ', '')",
"vtermcontent":"$item.vtermcontent.replaceAll(' ', '')",
"votherinfo":"$item.votherinfo.replaceAll(' ', '')"
}#if($foreach.hasNext),#end
#end
#end
],
"ct_pu_cha":[
{
"vchgreason":"$data[0].vchgreason.replaceAll(' ', '')",
"vmemo":"$data[0].vmemo.replaceAll(' ', '')",
"vchgcontent":"$data[0].vchgcontent.replaceAll(' ', '')",
}
]
}
}
}
三、JAVA脚本
例如实现写入OA
package cn.restcloud.etl.rule.ext;
import org.apache.commons.lang3.StringUtils;
import org.bson.Document;
import java.sql.Connection;
import cn.restcloud.framework.core.context.*;
import cn.restcloud.etl.base.IETLBaseEvent;
import cn.restcloud.etl.base.IETLBaseProcessEngine;
import cn.restcloud.framework.core.util.*;
import cn.restcloud.framework.core.util.db.rdb.*;
import cn.restcloud.framework.core.util.ConfigUtil;
import cn.restcloud.etl.log.service.ETLProcessDebugLogUtil;
import java.util.*;
import cn.hutool.core.util.CharsetUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.crypto.asymmetric.KeyType;
import cn.hutool.crypto.asymmetric.RSA;
import cn.hutool.http.HttpRequest;
import cn.hutool.json.JSONUtil;
/**
indoc为流数据
执行成功必须返回字符1,返回0表示终止流程
*/
public class ETL_T00001_2HF710VN0CC implements IETLBaseEvent {
private static final String APPID = ConfigUtil.getConfig("OASyncNCC.APPID");
private static final String OAAddress = ConfigUtil.getConfig("OASyncNCC.OAAddress");
private static final String NCC_SECRET = ConfigUtil.getConfig("OASyncNCC.SECRIT");
private static final String NCC_PUBLICKEY = ConfigUtil.getConfig("OASyncNCC.NCC_PUBLICKEY");
public static Map<String,Object> Getoken(String address){
// 从系统缓存或者数据库中获取ECOLOGY系统公钥和Secret信息
String secret = NCC_SECRET;
String spk = NCC_PUBLICKEY;
// 公钥加密,所以RSA对象私钥为null
RSA rsa = new RSA(null,spk);
//对秘钥进行加密传输,防止篡改数据
String encryptSecret = rsa.encryptBase64(secret,CharsetUtil.CHARSET_UTF_8,KeyType.PublicKey);
//调用ECOLOGY系统接口进行注册
String data = HttpRequest.post(address+ "/api/ec/dev/auth/applytoken")
.header("appid",APPID)
.header("secret",encryptSecret)
.header("time","3600")
.execute().body();
//System.out.println("Getoken():"+data);
Map<String,Object> datas = JSONUtil.parseObj(data);
return datas;
}
public static String testRestful(String address,String api,String jsonParams){
//ECOLOGY返回的token
String token= (String)Getoken(address).get("token");
String spk = NCC_PUBLICKEY;
//封装请求头参数
RSA rsa = new RSA(null,spk);
//对用户信息进行加密传输,暂仅支持传输OA用户ID
String encryptUserid = rsa.encryptBase64("1",CharsetUtil.CHARSET_UTF_8,KeyType.PublicKey);
//调用ECOLOGY系统接口
String data = HttpRequest.get(address + api)
.header("appid",APPID)
.header("token",token)
.header("userid",encryptUserid)
.body(jsonParams)
.execute().body();
System.out.println("testRestful():"+data);
return data;
}
@Override
public String execute(IETLBaseProcessEngine engine, Document modelNodeDoc, Document indoc,String fieldId,String params) throws Exception {
List<Document> docs=engine.getData(indoc);
for(Document doc:docs){
//日志控制台输出
PrintUtil.o(doc);
}
String Token = (String) Getoken(OAAddress).get("token");
//设置为局部变量
engine.put("accessToken",Token);
//进程日志输出
ETLProcessDebugLogUtil.log(engine, "调试", "调用成功,i的值为:"+Token);
String callback = testRestful(OAAddress,"/api/system/appmanage/route",null).toString();
ETLProcessDebugLogUtil.log(engine, "调试", "调试OA返回系统信息,:"+callback);
return "1";
}
}
四、根据SQL结果对接API
4.1、执行SQL脚本
4.2高级映射
按需进行去重,可参考本文章的“2.2、进阶映射去重”
4.3变量设置
4.4调用Rest API输入控件
上图:将Api返回结果作为变量输出
五、执行shell脚本实现文件同步
首先参考文章:
本教程针对docker部署的restcloud。
方式一:把文件夹挂载到docker里面,然后直接用shell脚本
方式二:通过ssh来执行宿主机的脚本。话不多说,干!首先说明一下。这里的宿主机的IP是192.168.1.219
5.1建立docker容器与宿主机的互信
shell>docker ps
shell>docker exec -it #container bash
###容器内创建证书
shell>ssh-keygen
###更新该证书的名称
shell> mv /root/.ssh/id_rsa.pub /root/.ssh/id_rsa_docker219.pub
###scp复制证书到宿主机
shell>scp /root/.ssh/id_rsa_docker219.pub root@192.168.8.21:/root/.ssh
###切换到宿主机
[root@rancher.ssh]$ cat id_rsa_docker219.pub >> authorized_keys
[root@rancher .ssh]$ chmod 600 authorized_keys
[root@rancher.ssh]$ chmod 700 ~/.ssh
[root@rancher .ssh]$ service sshd restar
###切换回docker容器中
shell>docker exec -it #container bash
###尝试ssh是否免密登录
shell>ssh root@192.168.1.219
5.2docker容器内创建脚本
###进入容器
shell>docker exec -it #container bash
shell>mkdir -p /data/app/bash
shell>vim sync.sh
#!/bin/bash
# 尝试通过 SSH 连接到服务器
ssh root@192.168.1.219 "
echo \"aaa$(date '+%Y-%m-%d %H:%M:%S')\" >> /data/logs/test.txt
echo \"bbb$(date '+%Y-%m-%d %H:%M:%S')\" >> /data/logs/test.txt
"
###设置可执行权限
shell>chmod +x sync.sh
###打开宿主机兼容/data/logs/test.txt
[root@master ~]# tail -f /data/logs/test.txt
###在docker容器中执行./sync.sh
shell>./sync.sh