Skip to content

rabbit-php/rabbit-data-pipeline

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

STS流处理

  • 基于插件的流程处理组件

组件配置

return [
        'scheduler' => [
        '{}' => Scheduler::class,
        '()' => [
            [
                '{}' => YamlParser::class,
                '()' => [
                    App::getAlias('@yaml')
                ]
            ]
        ],
        'senders' => arrdef([
            'worker' => definition(WorkerSender::class),
            'process' => definition(ProcessSender::class)
        ]),
        'cron' => definition('cron')
    ]
];

任务配置

lock: task lock time,default null
cron: crontab or int,default null.int is later sec, -1 is only run once

通用配置

type: Plugin Class String
start: true|false
output: Plugin Name
errHandler: callable
  • type插件完整类名
  • start是否开始插件,默认false
  • output输出到下一个插件,支持Http输出到远程插件,worker多进程任务,格式
    1. PluginName String|(http|worker):address:PluginName String,默认不等待
    2. ['PluginName String|(http|worker):address:PluginName String':true|false|int]
    • PS:true启用协程等待,不设超时;int启用协程等待,值为超时时间;false不等待
  • errHandler错误处理函数

内置插件

Sources


Common


TransForms


Sinks

插件开发

  • 继承AbstractPlugin
  • 实现run函数
  • 通过init$this->config中获取配置参数初始化配置属性
  • 插件间传值通过Message类,开启协程传递需要clone Message,输出值赋值到克隆的Message->data属性
  • PS:配置属性禁止在运行过程中修改,插件为单例模式,协程环境下会造成属性污染。可使用局部变量替代,例如$name=$this->name,使用$name