Skip to content

xiaoji-duan/duan-jobflows

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

30 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

作业流调度

vert.x 3.6.0 purple

通过Active MQ消息队列,分发汇总处理流程

定义处理流程

  • 流程定义

  • 子任务定义

  • 子任务完成后处理定义

  • 循环子任务定义

  • 循环子任务完成后处理定义

流程定义

{
  "name": "冥王星 语音解析通知 处理流程",     // 流程名称
  "trigger": "mwxing_voice_analyze_jobflow",  // 流程触发ID [即Active MQ队列名称]
  "parameters": [
      // 流程输入参数名称
      "param1",
      "param2",
      "..."
  ],
  "follows": [
    {
      // 子任务节点处理定义
    }
  ]
}

子任务定义

{
...
  "follows": [
    {
    "name": "讯飞云AIUI语义分析",  // 子任务名称
    "trigger": "xfy",         // 子任务节点处理触发ID [即Active MQ队列名称]
    "parameters": [
        // 子任务节点处理输入参数名称和参数值
        "userId;$.parent.parameters.userId",
        "dataType;$.parent.parameters.contentType",
        "content;$.parent.parameters.content",
        "context;$.parent.parameters.context"
    ],
    "outputs": [
        // 节点处理输出参数名称
        "xunfeiyun",
        "userId"
      ],
    "next": [
        {
          // 子任务节点处理定义 (可选)
        }
    ]
    }
  ]
...
}
{
...
    "next": [
        {
          // 子任务节点处理定义
          "name": "数据清洗与转换 迅飞返回结果格式整理",     // 子任务名称
          "trigger": "acj",                     // 子任务节点处理触发ID [即Active MQ队列名称]
          "parameters": [
              // 子任务节点处理输入参数名称和参数值
              "datasource;$.parent.outputs.xunfeiyun"
          ],
          "outputs": [
              // 节点处理输出参数名称
              "cleaned"
          ]
        }
    ]
...
}

循环子任务定义

{
...
    "next": [
        {
          // 循环子任务节点处理定义
          "type": "loop",         // 子任务类型  loop: 循环
          "variable": "image;$.parent.outputs.partitionimages[*]",    // 循环变量
          "start": "0",           // 循环参数 开始
          "end": "size",          // 循环参数 结束
          "task": {
              // 循环子任务 处理节点定义
              "name": "OpenCV Canny",
              "trigger": "abt",
              "parameters": [
                  "image;$.parent.outputs.partitionimages[##image_value##].image",
                  "extension;$.parent.outputs.extension"
              ],
              "outputs": [
                  "edges",
                  "extension"
              ],
              "next": [
                  {
                    // 子任务节点处理定义 (可选)
                  }
              ]
          }
        }
    ]
...
}

子任务完成后处理定义

{
...
{
    "name": "...",
    "trigger": "...",
    "parameters": [
        // ...
    ],
    "outputs": [
        // ...
    ],
    "next": [
        {
          "name": "数据清洗与转换 迅飞返回结果格式整理",
          "trigger": "acj",
          "parameters": [
              "datasource;$.parent.outputs.xunfeiyun"
          ],
          "outputs": [
              "cleaned"
          ]
        },
        {
          "name": "取得文本中的中文人名",
          "trigger": "nlp",
          "parameters": [
              "function;NlpAnalysis",
              "text;$.root.parameters.content"
          ],
          "outputs": [
              "function",
              "text",
              "parsed"
          ]
        }
    ],
    "complete": {
      "next": {
        "all": {
          "name": "数据清洗与转换 整合迅飞语音和人名识别的结果",
          "trigger": "acj",
          "parameters": ["datasource;$.complete.outputs"],
          "outputs": ["cleaned"],
          "next": [
              {
                  "name": "中文拼音转换参与人姓名",
                  "trigger": "pin",
                  "parameters": ["type;IN_DATA_COVERAGE",
                  "data;$.parent.outputs.cleaned",
                  "text-paths;json-path.announceContent.content..parameters.fs[*].n"],
                  "outputs": ["pinyin"],
                  "next": [
                      {
                          // ...
                      }
                  ]
              }
          ]
        }
      }
    }
  }
...
}

循环子任务完成后处理定义

{
...
   {
     "type": "loop",
     "variable": "agenda;$.parent.outputs.cleaned.agendas[*]",
     "start": "0",
     "end": "size",
     "task": {
       "name": "数据清洗与转换 每个子任务计算重复日程到日历",
       "trigger": "acj",
       "parameters": [
           "datasource;$.parent.outputs.cleaned.agendas[##agenda_value##]"
       ],
       "outputs": [
           "cleaned"
       ]
     },
     "complete": {
       "all": {
         "name": "数据清洗与转换 所有计算完成合并结果",
         "trigger": "acj",
         "parameters": [
             "datasource;$.loop.outputs"
         ],
         "outputs": [
             "cleaned"
         ],
         "next": [
             {
                // ...
             }
          ]
       }
     }
   }
...
}

开发节点处理

About

短应用™ 作业流调度

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Languages