Source code for hackthebox.htb

from __future__ import annotations

import atexit
import base64
import getpass
import json
import os
import time
from typing import List, Callable, Union, Optional, Tuple, cast, Any, TYPE_CHECKING

import requests

from .constants import API_BASE, USER_AGENT
from .errors import AuthenticationException, NotFoundException, \
    IncorrectOTPException, ApiError

if TYPE_CHECKING:
    from .user import User
    from .search import Search
    from .machine import Machine, MachineInstance
    from .challenge import Challenge
    from .endgame import Endgame
    from .fortress import Fortress
    from .team import Team
    from .leaderboard import Leaderboard
    from .vpn import VPNServer


def jwt_expired(token: str) -> bool:
    """ Checks if a JWT token is expired

    Args:
        token: A JWT string - 3 Base64 sequences joined with .

    Returns:
        If the token is expired

    """
    payload = base64.b64decode(token.split('.')[1] + "==").decode()
    if time.time() > json.loads(payload)['exp']:
        return True
    else:
        return False


[docs]class HTBClient: """The client via which API requests are made Examples: Connecting to the API:: from hackthebox import HTBClient client = HTBClient(email="user@example.com", password="S3cr3tP455w0rd!") Attributes: challenge_cooldown: Time when next download is allowed """ # noinspection PyUnresolvedReferences _user: Optional["User"] = None _access_token: Optional[str] _refresh_token: Optional[str] _app_token: Optional[str] _api_base: str challenge_cooldown: int = 0 def _refresh_access_token(self): """ Use a saved refresh token to gain a new access token when the current one expires """ headers = {"User-Agent": USER_AGENT} r = requests.post(self._api_base + "login/refresh", json={ "refresh_token": self._refresh_token }, headers=headers) data = r.json()['message'] if isinstance(data, str) and data.startswith("Unauthenticated"): raise AuthenticationException self._access_token = data['access_token'] self._refresh_token = data['refresh_token']
[docs] def do_request(self, endpoint, json_data=None, data=None, authorized=True, download=False, post=False) -> Union[ dict, bytes]: """ Args: endpoint: The API endpoint to request json_data: Data to be sent in JSON format data: Data to be sent in application/x-www-form-urlencoded format authorized: If the request requires an Authorization header download: If we are downloading raw data post: Force POST request Returns: The JSON response from the API or the raw data (if `download` is set) """ headers = {"User-Agent": USER_AGENT} if authorized: # Don't use authorization if the API base URL isn't the real one - # i.e. we're running a test if self._app_token is not None: headers['Authorization'] = "Bearer " + self._app_token elif self._access_token is not None and self._refresh_token is not None: if jwt_expired(self._access_token): self._refresh_access_token() headers['Authorization'] = "Bearer " + self._access_token else: raise AuthenticationException("No authentication tokens available") while True: if not json_data and not data: if post: r = requests.post(self._api_base + endpoint, headers=headers, stream=download) else: r = requests.get(self._api_base + endpoint, headers=headers, stream=download) else: r = requests.post( self._api_base + endpoint, json=json_data, data=data, headers=headers, stream=download ) if r.status_code != 429: break # Not sure on the exact ratelimit - loop until we don't get 429 else: time.sleep(1) if r.status_code == 404: raise NotFoundException if download: return r.content else: return r.json()
def __init__(self, email: Optional[str] = None, password: Optional[str] = None, otp: Optional[str | int] = None, cache: Optional[str] = None, api_base: str = API_BASE, remember: Optional[bool] = False, app_token: Optional[str] = None): """ Authenticates to the API. If `cache` is set, the client will attempt to load access tokens from the given path. If they cannot be found, or are expired, normal API authentication will take place, and the tokens will be dumped to the file for the next launch. Args: email: The authenticating user's email address password: The authenticating user's password otp: The current OTP of the user, if 2FA is enabled cache: The path to load/store access tokens from/to remember: Whether to create a long-lasting 'remember me' token app_token: Authenticate using a provided App Token """ self._api_base = api_base if cache is not None: if self.load_from_cache(cache) is False: self.do_login(email, password, otp, remember, app_token) self.dump_to_cache(cache) # Make sure we dump our current tokens out when we exit atexit.register(self.dump_to_cache, cache) else: self.do_login(email, password, otp, remember, app_token)
[docs] def load_from_cache(self, cache: str) -> bool: """ Args: cache: The cache file path Returns: Whether loading from the cache was successful """ if not os.path.exists(cache): return False with open(cache, 'r') as f: data = json.load(f) self._access_token = data.get('access_token') self._refresh_token = data.get('refresh_token') self._app_token = data.get('app_token') if self._access_token is not None: if jwt_expired(self._access_token): try: self._refresh_access_token() # Our refresh token is also invalid, we must log in again except AuthenticationException: return False return True
[docs] def dump_to_cache(self, cache): """ Dumps the current access and refresh tokens to a file Args: cache: The path to the cache file """ with open(cache, 'w') as f: json.dump({ "access_token": self._access_token, "refresh_token": self._refresh_token, "app_token": self._app_token }, f)
[docs] def do_login(self, email: Optional[str] = None, password: Optional[str] = None, otp: Optional[str | int] = None, remember: Optional[bool] = False, app_token: Optional[str] = None): """ Authenticates against the API. If credentials are not provided, they will be prompted for. """ self._app_token = app_token if app_token is not None: self._access_token = self._refresh_token = None else: if email is None: email = input("Email: ") if password is None: password = getpass.getpass() data = cast(dict, self.do_request("login", json_data={ "email": email, "password": password, "remember": remember }, authorized=False)) msg = data['message'] self._access_token = msg.get('access_token') if self._access_token is None: raise ApiError(f"Failed to get access token: {msg}") self._refresh_token = msg.get('refresh_token') if self._refresh_token is None: raise ApiError(f"Failed to get refresh token: {msg}") if data['message']['is2FAEnabled'] is True: if otp is None: otp = input("OTP: ") if type(otp) == int: # Optimistically try and create a string otp = f"{otp:06d}" resp = cast(dict, self.do_request("2fa/login", json_data={ "one_time_password": otp })) if "correct" not in resp['message']: raise IncorrectOTPException
# noinspection PyUnresolvedReferences
[docs] def search(self, search_term: str) -> "Search": """ Args: search_term: The search term to pass to the API Returns: A Search object with lists of the items retrieved from the API """ from .search import Search return Search(search_term, self)
# noinspection PyUnresolvedReferences
[docs] def get_machine(self, machine_id: int | str) -> "Machine": """ Args: machine_id: The platform ID or name of the `Machine` to fetch Returns: The requested `Machine` """ from .machine import Machine data = cast(dict, self.do_request(f"machine/profile/{machine_id}"))['info'] return Machine(data, self)
# noinspection PyUnresolvedReferences
[docs] def get_todo_machines(self, limit: int = None) -> List[int]: """ Retrieve a list of `Machine` ID's from the API based on the users todo list Args: limit: The maximum number to fetch Returns: A list of `Machine` ID's """ data = cast(dict, self.do_request("home/user/todo"))['data']['machines'][:limit] return [m['id'] for m in data]
# noinspection PyUnresolvedReferences
[docs] def get_active_machine(self, release_arena: bool=False) -> Optional["MachineInstance"]: """ Retrieve `Machine` currently assigned to user Returns: The `Machine` currently assigned (or active) to user """ from .machine import Machine, MachineInstance if release_arena: info = cast(dict, self.do_request(f"release_arena/active"))['info'] else: info = cast(dict, self.do_request(f"machine/active"))['info'] if info: box = self.get_machine(info['id']) server = box._client.get_current_vpn_server(release_arena) return MachineInstance(box.ip, server, box, box._client) return None
# noinspection PyUnresolvedReferences
[docs] def get_machines(self, limit: int = None, retired: bool = False) -> List["Machine"]: """ Retrieve a list of `Machine` from the API Args: limit: The maximum number to fetch retired: Whether to fetch from the retired list instead of the active list Returns: A list of `Machine` """ from .machine import Machine if not retired: data = cast(dict, self.do_request("machine/list"))['info'][:limit] else: data = cast(dict, self.do_request("machine/list/retired"))['info'][:limit] machines = [Machine(m, self, summary=True) for m in data] for machine in machines: machine.retired = retired return machines
# noinspection PyUnresolvedReferences
[docs] def get_challenge(self, challenge_id: int | str) -> "Challenge": """ Args: challenge_id: The platform ID or name of the `Challenge` to fetch Returns: The requested `Challenge` """ from .challenge import Challenge data = cast(dict, self.do_request(f"challenge/info/{challenge_id}"))['challenge'] return Challenge(data, self)
# noinspection PyUnresolvedReferences
[docs] def get_challenges(self, limit=None, retired=False) -> List["Challenge"]: """Requests a list of `Challenge` from the API Args: limit: The maximum number of `Challenge` to fetch retired: Whether to fetch from the retired list instead of the active list Returns: A list of `Challenge` """ from .challenge import Challenge if retired: data = cast(dict, self.do_request("challenge/list/retired")) else: data = cast(dict, self.do_request("challenge/list")) challenges = [] for challenge in data['challenges'][:limit]: challenges.append(Challenge(challenge, self, summary=True)) return challenges
# noinspection PyUnresolvedReferences
[docs] def get_endgame(self, endgame_id: int) -> "Endgame": """Requests an Endgame from the API Args: endgame_id: The ID of the Endgame to fetch Returns: An Endgame """ from .endgame import Endgame data = cast(dict, self.do_request(f"endgame/{endgame_id}"))["data"] return Endgame(data, self)
# noinspection PyUnresolvedReferences
[docs] def get_endgames(self, limit: int = None) -> List["Endgame"]: """Requests a list of Endgames from the API Args: limit: The maximum number of Endgames to fetch Returns: A list of Endgames """ from .endgame import Endgame data = cast(dict, self.do_request(f"endgames"))["data"][:limit] endgames = [] for endgame in data: endgames.append(Endgame(endgame, self, summary=True)) return endgames
# noinspection PyUnresolvedReferences
[docs] def get_fortress(self, fortress_id: int) -> "Fortress": """Requests an Fortress from the API Args: fortress_id: The ID of the Fortress to fetch Returns: A Fortresse """ from .fortress import Fortress data = cast(dict, self.do_request(f"fortress/{fortress_id}"))["data"] return Fortress(data, self)
# noinspection PyUnresolvedReferences
[docs] def get_fortresses(self, limit: int = None) -> List["Fortress"]: """Requests a list of Fortresses from the API Args: limit: The maximum number of Fortresses to fetch Returns: A list of Fortresses """ from .fortress import Fortress data = cast(dict, self.do_request(f"fortresses"))["data"] fortresses = [] # For some reason, the fortress list is in the format {"1": <fortress1>, "2": <fortress2>} # instead of [<fortress1>, <fortress2>], meaning we have to sort it ourselves for fortress_id, fortress in sorted(data.items())[:limit]: fortresses.append(Fortress(fortress, self, summary=True)) return fortresses
# noinspection PyUnresolvedReferences
[docs] def get_user(self, user_id: int) -> "User": """ Args: user_id: The platform ID of the `User` to fetch Returns: The requested `User` """ from .user import User data = cast(dict, self.do_request(f"user/profile/basic/{user_id}"))['profile'] return User(data, self)
# noinspection PyUnresolvedReferences
[docs] def get_team(self, team_id: int) -> "Team": """ Args: team_id: The platform ID of the `Team` to fetch Returns: The requested `Team` """ from .team import Team data = cast(dict, self.do_request(f"team/info/{team_id}")) return Team(data, self)
# noinspection PyUnresolvedReferences
[docs] def get_hof(self, vip: bool = False) -> "Leaderboard": """ Returns: A Leaderboard of the top 100 Users """ from .leaderboard import Leaderboard from .user import User endpoint = "rankings/users" if vip: endpoint += "?vip=1" data = cast(dict, self.do_request(endpoint))['data'] return Leaderboard(data, self, User)
# noinspection PyUnresolvedReferences
[docs] def get_hof_countries(self) -> "Leaderboard": """ Returns: A Leaderboard of the top 100 Countries """ from .leaderboard import Leaderboard, Country data = cast(dict, self.do_request("rankings/countries"))['data'] return Leaderboard(data, self, Country)
# noinspection PyUnresolvedReferences
[docs] def get_hof_teams(self) -> "Leaderboard": """ Returns: A Leaderboard of Teams """ from .leaderboard import Leaderboard from .team import Team data = cast(dict, self.do_request("rankings/teams"))['data'] return Leaderboard(data, self, Team)
# noinspection PyUnresolvedReferences
[docs] def get_hof_universities(self) -> "Leaderboard": """ Returns: A Leaderboard of Universities """ from .leaderboard import Leaderboard, University data = cast(dict, self.do_request("rankings/universities"))['data'] return Leaderboard(data, self, University)
# noinspection PyUnresolvedReferences
[docs] def get_current_vpn_server(self, release_arena=False) -> VPNServer: """ Returns: The currently assigned `VPNServer` Args: release_arena: Get the current release arena VPN server """ from .vpn import VPNServer if release_arena: connections = cast(dict, self.do_request('connections/servers?product=release_arena'))['data'] data = connections['assigned'] else: connections = cast(dict, self.do_request('connections'))['data'] data = connections['lab']['assigned_server'] return VPNServer(data, self)
# noinspection PyUnresolvedReferences
[docs] def get_all_vpn_servers(self, release_arena=False) -> "List[VPNServer]": """ Returns: A list of `VPNServer` Args: release_arena: Use the release arena VPN servers """ from .vpn import VPNServer if release_arena: data = cast(dict, self.do_request("connections/servers?product=release_arena"))["data"]["options"] else: data = cast(dict, self.do_request("connections/servers?product=labs"))["data"]["options"] servers = [] for location in data.keys(): # 'EU' for location_role in data[location].keys(): # 'EU - Free' for server in data[location][location_role]["servers"].values(): servers.append(VPNServer(server, self)) return servers
# noinspection PyUnresolvedReferences @property def user(self) -> "User": """ Returns: The `User` associated with the current `HTBClient` """ if not self._user: uid = cast(dict, self.do_request("user/info"))['info']['id'] self._user = self.get_user(uid) return self._user
class HTBObject: """ Base class of all API objects Attributes: id: The ID of the associated object """ _client: HTBClient # Attributes not fetched by a summary _detailed_attributes: Tuple[str, ...] _detailed_func: Callable[..., Any] _is_summary: bool = False id: int def __getattr__(self, item): """Retrieve attributes not given when initialised from a summary Some endpoints only provide a subset of the attributes available for a given object. If these extra attributes are requested, the object will request the full data from the API and fill out the missing items. Args: item: The name of the property to retrieve """ if item in self._detailed_attributes and self._is_summary: new_obj = self._detailed_func(self.id) for attr in self._detailed_attributes: setattr(self, attr, getattr(new_obj, attr)) self._is_summary = False return getattr(new_obj, item) else: raise AttributeError def __eq__(self, other): return self.id == other.id and type(self) == type(other)