jumpserver中资产的ssh私钥和密码的解密
2021-9-24 00:51:35 Author: guage.cool(查看原文) 阅读量:144 收藏

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
import json
import base64
from Cryptodome.Cipher import AES
from Cryptodome.Util.Padding import pad
from Cryptodome.Random import get_random_bytes
from gmssl.sm4 import CryptSM4, SM4_ENCRYPT, SM4_DECRYPT


def signer_decode(val: str):
    """Decode the JSON payload of a dot-separated signed token.

    The signature is NOT verified; the payload segment is simply decoded.

    :param val: token of the form ``<header>.<payload>.<signature>``.
    :return: the object produced by ``json.loads`` on the decoded payload.
    :raises IndexError: if ``val`` contains no ``.`` separator.
    :raises ValueError: if the payload is not valid base64 or not valid JSON
        (``binascii.Error`` and ``json.JSONDecodeError`` are both subclasses).
    """
    payload = val.split(".")[1]
    # The token carries its segments without "=" padding; restore it so the
    # length is a multiple of four before decoding.
    payload += "=" * (-len(payload) % 4)
    # Use the URL-safe alphabet: plain b64decode silently discards "-" and
    # "_" and would return corrupted bytes. Standard "+" and "/" still decode.
    return json.loads(base64.urlsafe_b64decode(payload))


def process_key(key):
    """Return a key that is exactly 32 bytes long.

    :param key: the key as ``bytes`` or ``str``; a ``str`` is UTF-8 encoded.
    :return: the first 32 bytes when the key has 32 bytes or more, otherwise
        the key extended to 32 bytes by Cryptodome's ``pad`` (block size 32,
        library default padding style).
    """
    if not isinstance(key, bytes):
        key = bytes(key, encoding='utf-8')

    # Long keys are truncated rather than hashed.
    if len(key) >= 32:
        return key[:32]

    return pad(key, 32)


class BaseCrypto:
    """Text-level wrapper around a bytes-level cipher.

    Subclasses implement ``_encrypt`` and ``_decrypt`` on raw bytes; this
    class handles the UTF-8 and URL-safe base64 conversions around them.
    """

    def encrypt(self, text):
        """Encrypt ``text`` (str) and return URL-safe base64 text."""
        raw = self._encrypt(bytes(text, encoding='utf8'))
        return base64.urlsafe_b64encode(raw).decode('utf8')

    def _encrypt(self, data: bytes) -> bytes:
        """Encrypt raw bytes; must be provided by the subclass."""
        raise NotImplementedError

    def decrypt(self, text):
        """Decode URL-safe base64 ``text`` (str), decrypt it, return a str.

        Raises ``UnicodeDecodeError`` when the decrypted bytes are not UTF-8.
        """
        raw = base64.urlsafe_b64decode(bytes(text, encoding='utf8'))
        return self._decrypt(raw).decode('utf8')

    def _decrypt(self, data: bytes) -> bytes:
        """Decrypt raw bytes; must be provided by the subclass."""
        raise NotImplementedError


class GMSM4EcbCrypto(BaseCrypto):
    """SM4 cipher in ECB mode, built on gmssl's ``CryptSM4``."""

    def __init__(self, key):
        """Prepare one SM4 context for encryption and one for decryption.

        :param key: ``str`` or ``bytes``; normalised by ``process_key``.
        """
        # NOTE(review): process_key yields 32 bytes while SM4 keys are 128
        # bits; presumably gmssl only consumes the leading 16 bytes - confirm.
        self.key = process_key(key)

        # gmssl binds the direction when the key is set, so encryption and
        # decryption each get their own context.
        self.sm4_encryptor = CryptSM4()
        self.sm4_encryptor.set_key(self.key, SM4_ENCRYPT)

        self.sm4_decryptor = CryptSM4()
        self.sm4_decryptor.set_key(self.key, SM4_DECRYPT)

    def _encrypt(self, data: bytes) -> bytes:
        """Encrypt raw bytes with SM4-ECB."""
        return self.sm4_encryptor.crypt_ecb(data)

    def _decrypt(self, data: bytes) -> bytes:
        """Decrypt raw bytes with SM4-ECB."""
        return self.sm4_decryptor.crypt_ecb(data)


class AESCrypto:
    """AES cipher in ECB mode with NUL-byte padding.

    Except for MODE_SIV, whose key length is 32, 48 or 64 bytes, AES keys
    are 16, 24 or 32 bytes long; see the AES module documentation for
    details. CBC mode would additionally take an ``iv`` argument. This
    class uses the common ECB mode.
    """

    def __init__(self, key):
        """Store the key, cut to 32 characters and NUL-padded to 16n bytes.

        :param key: the key as ``str``.
        """
        if len(key) > 32:
            key = key[:32]
        self.key = self.to_16(key)

    @staticmethod
    def to_16(key):
        """Encode ``key`` as UTF-8 and NUL-pad it to a multiple of 16 bytes.

        :param key: text to convert (``str``).
        :return: ``bytes`` whose length is a multiple of 16; input that is
            already aligned (including the empty string) gets no padding.
        """
        key = bytes(key, encoding="utf8")
        return key + b'\0' * (-len(key) % 16)

    def aes(self):
        """Return a fresh AES-ECB cipher object for the stored key."""
        return AES.new(self.key, AES.MODE_ECB)

    def encrypt(self, text):
        """Encrypt ``text`` (str) and return single-line standard base64."""
        aes = self.aes()
        return base64.b64encode(aes.encrypt(self.to_16(text))).decode('utf8')

    def decrypt(self, text):
        """Decrypt base64 ``text`` (str) and return the plaintext str.

        Trailing NUL bytes are removed because ``encrypt`` pads with them;
        a plaintext that genuinely ended in NULs loses them as well.
        """
        aes = self.aes()
        cipher_bytes = base64.decodebytes(bytes(text, encoding='utf8'))
        return aes.decrypt(cipher_bytes).rstrip(b'\0').decode("utf8")


class AESCryptoGCM:
    """AES cipher in GCM mode."""

    def __init__(self, key):
        """Store the key normalised by ``process_key``."""
        self.key = process_key(key)

    def encrypt(self, text):
        """Encrypt ``text`` and prefix the result with header, nonce and tag.

        Header, nonce and tag (3 * 16 bytes, i.e. 3 * 24 characters once
        base64-encoded) are placed in front of the base64 ciphertext
        because decryption needs all three.
        """
        header = get_random_bytes(16)
        cipher = AES.new(self.key, AES.MODE_GCM)
        # The header is authenticated as associated data, not encrypted.
        cipher.update(header)
        ciphertext, tag = cipher.encrypt_and_digest(
            bytes(text, encoding='utf-8'))

        # Each part is base64-encoded on its own so decrypt() can slice the
        # result at fixed offsets.
        return ''.join(
            base64.b64encode(part).decode('utf-8')
            for part in (header, cipher.nonce, tag, ciphertext)
        )

    def decrypt(self, text):
        """Extract header, nonce and tag from ``text``, then decrypt it.

        Raises ``ValueError`` when tag verification fails.
        """
        # Three 24-character base64 fields precede the ciphertext.
        metadata = text[:72]
        header = base64.b64decode(metadata[:24])
        nonce = base64.b64decode(metadata[24:48])
        tag = base64.b64decode(metadata[48:])
        ciphertext = base64.b64decode(text[72:])

        cipher = AES.new(self.key, AES.MODE_GCM, nonce=nonce)

        cipher.update(header)
        plain_text_bytes = cipher.decrypt_and_verify(ciphertext, tag)
        return plain_text_bytes.decode('utf-8')


def get_aes_crypto(key, mode='GCM'):
    """Build the AES helper for the requested mode.

    :param key: key material handed to the helper's constructor.
    :param mode: ``'ECB'`` for ``AESCrypto`` or ``'GCM'`` (default) for
        ``AESCryptoGCM``.
    :return: the constructed helper object.
    :raises ValueError: if ``mode`` is neither ``'ECB'`` nor ``'GCM'``.
    """
    if mode == 'ECB':
        return AESCrypto(key)
    if mode == 'GCM':
        return AESCryptoGCM(key)
    # Fail loudly instead of falling through to an unbound local variable.
    raise ValueError(
        "unsupported AES mode {!r}; expected 'ECB' or 'GCM'".format(mode))


def get_gm_sm4_ecb_crypto(key):
    """Return a ``GMSM4EcbCrypto`` helper built from ``key``."""
    return GMSM4EcbCrypto(key)


class Crypto:
    """Encrypt with one helper, decrypt by trying several in order."""

    def __init__(self, cryptoes):
        """
        :param cryptoes: sequence of objects exposing ``encrypt(text)`` and
            ``decrypt(text)``; the first one is used for encryption.
        """
        self.cryptoes = cryptoes

    @property
    def encryptor(self):
        """The helper used for encryption: the first configured one."""
        return self.cryptoes[0]

    def encrypt(self, text):
        """Encrypt ``text`` with the first configured helper."""
        return self.encryptor.encrypt(text)

    def decrypt(self, text):
        """Return the first non-empty plaintext any helper produces.

        Helpers are tried in order. One that raises ``TypeError``,
        ``ValueError``, ``UnicodeDecodeError`` or ``IndexError``, or that
        returns an empty result, is skipped.

        :return: the decrypted text, or ``None`` when no helper succeeds.
        """
        for decryptor in self.cryptoes:
            try:
                origin_text = decryptor.decrypt(text)
            except (TypeError, ValueError, UnicodeDecodeError, IndexError):
                # Wrong algorithm or key for this ciphertext; try the next.
                continue
            if origin_text:
                return origin_text
        return None


if __name__ == "__main__":
    # Demo run: decode or decrypt each sample value and print the result.

    # Key material shared by every helper below.
    # NOTE(review): presumably the JumpServer SECRET_KEY - confirm.
    key = 'jM4YjFmYTE5OWRjODcyYTdkZWE2NjMxOGNhN2EyYWFmOTWE2N'

    data = [
        'eyJhbGciOiJIUzI1NiJ9.InBhc3N3b3JkIg.ME3PUOsChmTHlWimu8W1K6pTUgm9fKPnuzyQKglA7ww',
        'Dz6x4Uf+fUT7S/djyxv82w=='
    ]
    # Helpers are tried in this order when decrypting.
    crypto = Crypto([
        get_aes_crypto(key, mode='ECB'),
        get_aes_crypto(key, mode='GCM'),
        get_gm_sm4_ecb_crypto(key),
    ])
    for d in data:
        # Exactly two dots marks a three-part signed token; anything else
        # is treated as ciphertext.
        if d.count(".") == 2:
            print('by signer:', signer_decode(d))
        else:
            print('by crypto:', crypto.decrypt(d))

文章来源: https://guage.cool/jumpserver/
如有侵权请联系:admin#unsafe.sh