此项目作者为@ywolf
项目地址为 https://github.com/opensec-cn/kunpeng
是我非常喜欢的一个项目和大佬.
项目readme
先看这个原项目README是对作者们辛勤劳动的认可.
根据官方文档的介绍,这个项目是以动态链接库的形式提供给Go以及其他语言进行调用的,因此会存在调用方和被调用方两个角色,请大家仔细区分。
目录
代码赏析
其他语言调用
调用方调用plugin包加载so文件,并获取其中的Greeter对象:
plug, _ := plugin.Open("./kunpeng_go.so")
g, _ := plug.Lookup("Greeter")
由于Plugin.Lookup()获得的是一个interface{}类型对象,需要将其进行一次类型断言才能访问到Greeter对象内的公有属性。好在Go具有非侵入式接口的语言特性,使调用方在本地定义一个属于Greeter对象公有方法子集的接口就能断言成功:
type Greeter interface {
Check(string) ([]map[string]string)
GetPlugins() []map[string]string
...
}
// omit
kunpeng, ok := g.(Greeter)
// kunpeng.GetPlugins()
调用方定义一个Task结构体,实例化后转成JSON字符串传给Greeter.Check(),即可等待检测结果:
type Meta struct {
System string `json:"system"`
PathList []string `json:"pathlist"`
FileList []string `json:"filelist"`
PassList []string `json:"passlist"`
}
type Task struct {
Type string `json:"type"`
Netloc string `json:"netloc"`
Target string `json:"target"`
Meta Meta `json:"meta"`
}
// omit
task, _ := json.Marshal(Task{"service", "0.0.0.0:0000", "mysql", Meta{}})
result := kunpeng.Check(string(task))
被调用方的Check()在解析JSON字符串拿到plugin.Task对象 (结构体与调用方定义的Task一致) 后,直接交给plugin.Scan()去执行实际检测逻辑,拿到的结果又转成JSON字符串返回:
func Check(task *C.char) *C.char {
var m plugin.Task
if err := json.Unmarshal([]byte(C.GoString(task)), &m); err != nil {
return C.CString("[]")
}
result := plugin.Scan(m)
if len(result) == 0{
return C.CString("[]")
}
b, err := json.Marshal(result)
if err != nil {
return C.CString("[]")
}
return C.CString(string(b))
}
从上面的代码中可以看到,为了支持跨语言调用,KunPeng使用更底层兼容性更高的CGo来处理几个入口函数中的原始数据类型。
plugin.Scan()分别遍历GoPlugins和JSONPlugins (两种类型插件的具体区别见下面插件开发和插件加载章节) ,根据Task的Target字段选择PoC子集进行检测并返回结果集合:
func Scan(task Task) (result []map[string]interface{}) {
for n, pluginList := range GoPlugins {
if strings.Contains(strings.ToLower(task.Target), strings.ToLower(n)) || task.Target == "all" {
for _, plugin := range pluginList {
plugin.Init()
if len(task.Meta.PassList) == 0 {
task.Meta.PassList = Config.PassList
}
if !plugin.Check(task.Netloc, task.Meta) {
continue
}
for _, res := range plugin.GetResult() {
result = append(result, util.Struct2Map(res))
}
}
}
}
if task.Type == "service" {
return result
}
for target, pluginList := range JSONPlugins {
if strings.Contains(strings.ToLower(task.Target), strings.ToLower(target)) || task.Target == "all" {
for _, plugin := range pluginList {
if yes, res := jsonCheck(task.Netloc, plugin); yes {
result = append(result, util.Struct2Map(res))
}
}
}
}
return result
}
KunPeng定义了一个用于描述插件信息的公有结构体Plugin:
type References struct {
URL string `json:"url"`
CVE string `json:"cve"`
}
type Plugin struct {
Name string `json:"name"`
Remarks string `json:"remarks"`
Level int `json:"level"`
Type string `json:"type"`
Author string `json:"author"`
References References `json:"references"`
Request string
Response string
}
以及公有接口GoPlugin:
type GoPlugin interface {
Init() Plugin
Check(string, TaskMeta) bool
GetResult() []Plugin
}
由于KunPeng未定义插件的相关基类及缺省字段和方法,所以我们需要在plugin/go/目录下创建一个新的.go文件,在其中自定义一个包含info和result字段的结构体来表示新的插件:
type pluginXXX struct {
info plugin.Plugin
result []plugin.Plugin
}
随后,为该结构体实现GoPlugin接口中所有的方法:
func (p *pluginXXX) Init() plugin.Plugin {
p.info = plugin.Plugin{}
}
func (p *pluginXXX) Check(netloc string, meta TaskMeta) bool {
// 自定义检测过程逻辑,成功返回true,失败返回false
return false
}
func (p *pluginXXX) GetResult() []plugin.Plugin {
return p.result
}
并在文件的init()方法中调用plugin.Regist()注册该插件即可:
func init() {
plugin.Regist("xxx", new(plugin))
}
KunPeng同样为我们准备好了用于描述JSON插件信息的公有结构体JSONPlugin,并对它实现了统一的检测方法jsonCheck():
type JSONPlugin struct {
Target string `json:"target"`
Meta Plugin `json:"meta"`
Request struct {
Path string `json:"path"`
PostData string `json:"postdata"`
} `json:"request"`
Verify struct {
Type string `json:"type"`
Match string `json:"match"`
} `json:"verify"`
}
func jsonCheck(URL string, p JSONPlugin) (bool, Plugin) {
// 常规的HTTP发包和结果比较,略
return false, result
}
我们可以在plugin/json/目录下创建一个新的.json文件,写入我们需要的信息 (具体内容参考官方文档) 即可:
{
"target": "xxx",
"meta": {
"name": "xxx",
"remarks": "xxx",
"level": 0,
"type": "RCE",
"author": "gyyyy",
"references": {
"url": "https://github.com/gyyyy/",
"cve": ""
}
},
"request":{
"path": "/index.html",
"postData": ""
},
"verify":{
"type": "string",
"match": "gyyyy"
}
}
前面说了,Go类型插件需要在init()时进行注册,Regist()会将插件对象以target为键放入GoPlugins集合,并初始化插件信息:
func Regist(target string, plugin GoPlugin) {
GoPlugins[target] = append(GoPlugins[target], plugin)
var pluginInfo = plugin.Init()
}
由于入口文件中匿名导入了plugin/go包,所以在程序启动时,所有编写好的Go类型插件就都会init()到GoPlugins中完成加载。
相比之下,JSON类型插件的加载过程就要繁琐一些。
入口文件匿名导入plugin/json包后,会调用plugin/json/init.go文件中的init()方法进行加载:
func init() {
loadJSONPlugin(false, "/plugin/json/")
go loadExtraJSONPlugin()
}
loadJSONPlugin()遍历目录中的所有.json文件,交给readPlugin()进行处理。readPlugin()读取文件后将JSON字符串解析为JSONPlugin对象返回,所有非重复插件对象将以target为键全部放入JSONPlugins集合中。
而新开的Goroutine调用loadExtraJSONPlugin(),每隔20秒对配置的extra_plugin_path目录执行loadJSONPlugin()操作进行加载。 为了节省篇幅这里就不细述了。
这里有个很典型的例子就是go语言做动态链接库so dll或者其他形式被调用,比如我是一个python开发者,不会使用go语言进行开发编程,那就直接下载对应的so或者dll ,python调用即可.
官方原版call-so.py
# coding:utf-8
import sys
import time
import json
from ctypes import *
def _args_encode(args_string):
'''Encode by utf-8 in PY3.'''
if sys.version_info >= (3, 0):
args_string = args_string.encode('utf-8')
return args_string
# 加载动态连接库
kunpeng = cdll.LoadLibrary('./kunpeng_c.so')
# 定义出入参变量类型
kunpeng.GetPlugins.restype = c_char_p
kunpeng.Check.argtypes = [c_char_p]
kunpeng.Check.restype = c_char_p
kunpeng.SetConfig.argtypes = [c_char_p]
kunpeng.GetVersion.restype = c_char_p
print(kunpeng.GetVersion())
# 获取插件信息
out = kunpeng.GetPlugins()
# 修改配置
config = {
'timeout': 10,
# 'aider': 'http://xxxx:8080',
# 'http_proxy': 'http://xxxxx:1080',
# 'pass_list':['xtest']
# 'extra_plugin_path': '/home/test/plugin/',
}
conf_args = json.dumps(config)
kunpeng.SetConfig(_args_encode(conf_args))
# 开启日志打印
kunpeng.ShowLog()
# 扫描目标
task = {
'type': 'web',
'netloc': 'http://www.google.cn',
'target': 'web',
'meta':{
'system': '',
'pathlist':[],
'filelist':[],
'passlist':[]
}
}
task2 = {
'type': 'service',
'netloc': '192.168.0.105:3306',
'target': 'mysql',
'meta':{
'system': '',
'pathlist':[],
'filelist':[],
'passlist':[]
}
}
task = _args_encode(json.dumps(task))
task2 = _args_encode(json.dumps(task2))
out = kunpeng.Check(task)
print(json.loads(out))
out = kunpeng.Check(task2)
print(json.loads(out))
可以看到在扫描目标添加task这里,有点麻烦,所以我改进一下,变成多线程和url.txt的模式.
改版后调用文件
# coding:utf-8
# 3.7 test pass by finger
import sys
import time
import json
from ctypes import *
import threading
def _args_encode(args_string):
'''Encode by utf-8 in PY3.'''
if sys.version_info >= (3, 0):
args_string = args_string.encode('utf-8')
return args_string
# 加载动态连接库
kunpeng = cdll.LoadLibrary('./kunpeng_c.so')
# 定义出入参变量类型
kunpeng.GetPlugins.restype = c_char_p
kunpeng.Check.argtypes = [c_char_p]
kunpeng.Check.restype = c_char_p
kunpeng.SetConfig.argtypes = [c_char_p]
kunpeng.GetVersion.restype = c_char_p
print(kunpeng.GetVersion())
# 获取插件信息
out = kunpeng.GetPlugins()
# print(out)
# 修改配置
config = {
'timeout': 10,
# 'aider': 'http://xxxx:8080', # 漏洞辅助验证接口,部分漏洞无法通过回显判断是否存在漏洞,可通过辅助验证接口进行判断。python -c'import socket,base64;exec(base64.b64decode("aGlzdG9yeSA9IFtdCndlYiA9IHNvY2tldC5zb2NrZXQoc29ja2V0LkFGX0lORVQsc29ja2V0LlNPQ0tfU1RSRUFNKQp3ZWIuYmluZCgoJzAuMC4wLjAnLDgwODgpKQp3ZWIubGlzdGVuKDEwKQp3aGlsZSBUcnVlOgogICAgdHJ5OgogICAgICAgIGNvbm4sYWRkciA9IHdlYi5hY2NlcHQoKQogICAgICAgIGRhdGEgPSBjb25uLnJlY3YoNDA5NikKICAgICAgICByZXFfbGluZSA9IGRhdGEuc3BsaXQoIlxyXG4iKVswXQogICAgICAgIGFjdGlvbiA9IHJlcV9saW5lLnNwbGl0KClbMV0uc3BsaXQoJy8nKVsxXQogICAgICAgIHJhbmtfc3RyID0gcmVxX2xpbmUuc3BsaXQoKVsxXS5zcGxpdCgnLycpWzJdCiAgICAgICAgaHRtbCA9ICJORVcwMCIKICAgICAgICBpZiBhY3Rpb24gPT0gImFkZCI6CiAgICAgICAgICAgIGhpc3RvcnkuYXBwZW5kKHJhbmtfc3RyKQogICAgICAgICAgICBwcmludCAiYWRkIityYW5rX3N0cgogICAgICAgIGVsaWYgYWN0aW9uID09ICJjaGVjayI6CiAgICAgICAgICAgIHByaW50ICJjaGVjayIrcmFua19zdHIKICAgICAgICAgICAgaWYgcmFua19zdHIgaW4gaGlzdG9yeToKICAgICAgICAgICAgICAgIGh0bWw9IlZVTDAwIgogICAgICAgICAgICAgICAgaGlzdG9yeS5yZW1vdmUocmFua19zdHIpCiAgICAgICAgcmF3ID0gIkhUVFAvMS4wIDIwMCBPS1xyXG5Db250ZW50LVR5cGU6IGFwcGxpY2F0aW9uL2pzb247IGNoYXJzZXQ9dXRmLThcclxuQ29udGVudC1MZW5ndGg6ICVkXHJcbkNvbm5lY3Rpb246IGNsb3NlXHJcblxyXG4lcyIgJShsZW4oaHRtbCksaHRtbCkKICAgICAgICBjb25uLnNlbmQocmF3KQogICAgICAgIGNvbm4uY2xvc2UoKQogICAgZXhjZXB0OnBhc3M="))' 在辅助验证机器上运行以上代码,填入http://IP:8088,不开启则留空。
# 'http_proxy': 'http://xxxxx:1080',
# 'pass_list':['xtest'] // 默认密码字典,不定义则使用硬编码在代码里的小字典
# 'extra_plugin_path': '/home/test/plugin/', //除已编译好的插件(Go、JSON)外,可指定额外插件目录(仅支持JSON插件),指定后程序会周期读取加载插件
}
conf_args = json.dumps(config)
kunpeng.SetConfig(_args_encode(conf_args))
# 开启日志打印
# kunpeng.ShowLog()
def callback():
a = task
# print(a)
out = kunpeng.Check(task)
print(a, json.loads(out))
# print(a)
# 扫描目标
# with open("ips.txt") as fp:
with open("url.txt") as fp:
for item in fp.readlines():
# print(item)
task = {
'type': 'web', # 目标类型web或者service
# 'type': 'service',
'netloc': item.strip(),
'target': 'all'
# "meta":{
# "system": "windows", //操作系统,部分漏洞检测方法不同系统存在差异,提供给插件进行判断
# "pathlist":[], //目录路径URL列表,部分插件需要此类信息,例如列目录漏洞插件
# "filelist":[], //文件路径URL列表,部分插件需要此类信息,例如struts2漏洞相关插件
# "passlist":[] //自定义密码字典
# } // 非必填
}
# print(task)
task = _args_encode(json.dumps(task))
t1 = threading.Thread(target=callback)
t1.start()
明显看出import threading
和后面的with open
.
(未完待续)