teahaz.client

The module containing the main objects for the Teahaz API wrapper.

View Source
"""The module containing the main objects for the Teahaz API wrapper."""

# pylint: disable=too-many-instance-attributes, too-many-lines

from __future__ import annotations

import os
import json
import pickle
import shutil
from pathlib import Path
from enum import Enum, auto
from threading import Thread
from dataclasses import asdict
from time import sleep, time as epoch
from base64 import b64encode, b64decode
from typing import Callable, Any, Union

import requests

from .dataclasses import (
    User,
    Invite,
    Channel,
    Message,
)

from .types import EventCallback

__all__ = [
    "Event",
    "Teacup",
    "Chatroom",
]


class Event(Enum):
    """Events that `Chatroom` and `Teacup` can subscribe to"""

    ERROR = auto()
    """An error occured.

    Args:
        response: The `requests.Response` object.
        method: The HTTP method used.
        request_args: A dictionary of arguments sent with the request.
    """

    NETWORK_EXCEPTION = auto()
    """A network exception has occured.

    Args:
        exception: The Exception instance that occured.
        method: The HTTP method used.
        request_args: A dictionary of arguments sent with the request.
    """

    MSG_SENT = auto()
    """A message has been sent.

    This can be used by clients to show the outgoing message,
    before the server receives and sends it back.

    Args:
        message: The **locally instanced** `Message`. Only use this while the real
            message has not yet arrived.
    """

    MSG_NEW = auto()
    """A new message has arrived.

    Args:
        message: The new `Message` instance.
    """

    MSG_DEL = auto()
    """A message was deleted.

    Args:
        message: The deleted `Message` instance.
    """

    MSG_SYS = auto()
    """A system message has arrived.

    Args:
        message: The system message.
    """

    MSG_SYS_SILENT = auto()
    """A silent system message has arrived. These should normally not be displayed.

    Args:
        message: The silent system message.
    """

    USER_JOIN = auto()
    """A new user has joined the chatroom.

    Note:
        **Not yet implemented.**
    """

    USER_LEAVE = auto()
    """A user has left the chatroom.

    Note:
        **Not yet implemented.**
    """

    SERVER_INFO = auto()
    """Some server information has changed.

    Note:
        **Not yet implemented.**
    """


class EndpointContainer:
    """Contains the endpoints of the Teahaz API."""

    _items = {
        "base": "{url}/api/v0",
        "login": "{base}/login/{chatroom_id}",
        "chatroom": "{base}/chatroom",
        "files": "{base}/files/{chatroom_id}",
        "messages": "{base}/messages/{chatroom_id}",
        "channels": "{base}/channels/{chatroom_id}",
        "invites": "{base}/invites/{chatroom_id}",
        "users": "{base}/users/{chatroom_id}",
    }

    def __init__(self, url: str, uid: str | None = None) -> None:
        """Initializes object.

        Args:
            url: The URL to use.
            uid: The chatroom uid to use.
        """

        self.url = url
        self.uid = uid

    def list(self) -> list[str]:
        """Returns a list of all endpoints."""

        return [getattr(self, key) for key in self._items]

    def __getattr__(self, item: str) -> str:
        """Gets a formatted endpoint.

        Args:
            item: The endpoint key.
        """

        # This is a special key
        if item == "base":
            return self._items["base"].format(url=self.url)

        return self._items[item].format(
            url=self.url, base=self.base, chatroom_id=self.uid
        )


class Chatroom:
    """The object to deal with all chatroom-related API actions."""

    def __init__(
        self,
        url: str,
        uid: str | None = None,
        name: str | None = None,
        session: requests.Session | None = None,
    ) -> None:
        """Initializes chatroom.

        Args:
            url: The chatroom's server URL:PORT.
            uid: The chatroom's UUID.
            name: The display name of the chatroom.
            session: Session that should be used by this chatroom.

        Every argument except URL is optional. This object should usually
        be instanced within the module, not by outside code.
        """

        self.uid = uid
        self.url = url
        self.name = name
        self.interval = 1

        self.username: str | None = None
        self.session = session or requests.Session()
        self.active_channel: Channel | None = None
        self.channels: list[Channel] = []

        self.event_thread = Thread(target=self._loop)

        # If the chatroom doesn't exist yet its endpoints' uid
        # is only filled in the create() method
        self.endpoints = EndpointContainer(self.url, self.uid)

        self.messages: list[Message] = []

        self._listeners: dict[Event, EventCallback] = {}
        self._is_looping: bool = False
        self._is_stopped: bool = False
        self._is_server_side: bool = False
        self._last_get_time: float = epoch()

    def _request(self, method_name: str, **req_args: Any) -> Any | None:
        """Sends a request, handles events & exceptions.

        Args:
            method_name: An HTTP method name, such as GET.
            **req_args: Arguments passed to the request.

        Returns:
        - JSON of response if `status_code == 200`
        - None if exception occured but was handled

        Raises:
            ValueError: Invalid HTTP method was passed.
            RuntimeError: Response status_code was not 200, and
                no handler was available to call.
        """

        method = getattr(self.session, method_name)
        if method is None:
            raise ValueError(f'Session does not have a method for "{method_name}".')

        error_handler = self._listeners.get(Event.ERROR)
        exception_handler = self._listeners.get(Event.NETWORK_EXCEPTION)

        try:
            response = method(**req_args)
        except Exception as exception:
            if exception_handler is not None:
                exception_handler(exception, method_name, req_args)  # type: ignore

                return None

            raise exception

        if response.status_code == 200:
            return response.json()

        if error_handler is not None:
            error_handler(response, method_name, req_args)  # type: ignore

            # maybe this could return CapturedError?
            return None

        raise RuntimeError(
            f"{method_name.upper()} request with data {req_args} failed"
            f" with no error or exception handler: {response.status_code} -> {response.text}"
        )

    def _notify(self, event: Event, *data: Any) -> None:
        """Notifies listeners of an event.

        Args:
            event: The event to notify for.
            *data: Any arguments passed to the listeners.
        """

        callback = self._listeners.get(event)
        if callback is None:
            return

        callback(*data)

    def _loop(self) -> None:
        """The main event loop for a chatroom"""

        ids = [msg.uid for msg in self.messages]
        self._last_get_time = epoch()

        while not self._is_stopped:
            if self.active_channel is None:
                sleep(self.interval)
                continue

            # We need to assign to a temporary
            # variable, otherwise messages can
            # get stuck between setting & getting.
            previous = self._last_get_time
            self._last_get_time = epoch()
            messages = self.get_since(previous)

            if messages is not None:
                # there is no good way to type these
                messages.sort(key=lambda msg: msg.send_time)

                for message in messages:
                    # This is needed to avoid duplicates
                    if message.uid in ids:
                        continue

                    self.messages.append(message)
                    ids.append(message.uid)

                    if message.message_type == "delete":
                        self._notify(Event.MSG_DEL, message)

                    elif message.message_type == "system":
                        self._notify(Event.MSG_SYS, message)

                    elif message.message_type == "system-silent":
                        self._notify(Event.MSG_SYS_SILENT, message)

                    else:
                        self._notify(Event.MSG_NEW, message)

            sleep(self.interval)

    def _run(self) -> None:
        """Runs monitoring loop"""

        self._is_looping = True
        self.event_thread.start()
        self._update_thread_name()

    def _update_channels(self, channels: list[Channel] | None = None) -> None:
        """Updates channels available to the user."""

        assert self.username, "Please log in before getting channels!"

        if channels is None:
            channels = self.get_channels()
            if channels is None:
                return

        for channel in channels:
            if channel not in self.channels:
                self.channels.append(channel)

        if self.active_channel is None and len(self.channels) > 0:
            self.active_channel = self.channels[0]

    def _update_thread_name(self) -> None:
        """Sets self.event_thread.name."""

        self.event_thread.name = f'Chatroom(uid="{self.uid}")'

    def _get_messages(
        self,
        method: str,
        channel: Channel | None = None,
        count: str | None = None,
        time: str | None = None,
    ) -> list[Message] | None:
        """Gets messages by time (since) or count.

        Args:
            method: `time` or `count`.
            channel: The channel to get messages from.
            count: How many messages to get. Only used when `method=="count"`.
            time: The time to get messages since.
        """

        if channel is not None:
            self.active_channel = channel

        elif self.active_channel is None:
            raise ValueError(
                "Please use either the Chatroom.set_channel() function"
                + " or provide `channel` as a non-null value!"
            )

        else:
            channel = self.active_channel

        headers = {
            "get-method": method,
            "username": self.username,
            "channelID": channel.uid,
            "count": count,
            "time": time,
        }

        messages: list[dict[str, Any]] | None = self._request(
            "get",
            url=self.endpoints.messages,
            headers=headers,
        )

        if messages is None:
            # Getting messages failed, but error was captured
            return None

        instances = []
        for message in messages:
            if not message["type"].startswith("system"):
                message["data"] = self._decrypt(message["data"])

            instances.append(Message.from_dict(message))

        return instances

    @staticmethod
    def _encrypt(message: bytes) -> str:
        """Encrypts the given message.

        Note:
            As encryption is currently not supported by the server, all this function does
            is b64encode the given string.

        Args:
            message: Text to encrypt.

        Returns:
            The encrypted text.
        """

        return b64encode(message).decode("ascii")

    @staticmethod
    def _decrypt(message: bytes) -> str:
        """Decrypts the given message.

        Note:
            As encryption is currently not supported by the server, all this function does
            is b64decode the given string.

        Args:
            message: Text to decrypt.

        Returns:
            The decrypted text.
        """

        return b64decode(message).decode("ascii")

    def initialize_from_response(self, response: dict) -> None:
        """Initializes data of chatroom from a response dict.

        Args:
            response: A dictionary of Teahaz server response.
        """

        self.name = response["chatroom_name"]
        self.uid = response["chatroomID"]
        self.username = response["users"][0]["username"]

        assert self.uid is not None
        self.endpoints.uid = self.uid
        self._update_thread_name()

        self._update_channels(
            [Channel.from_dict(channel) for channel in response["channels"]]
        )
        self._is_server_side = True

    def subscribe(self, event: Event, callback: EventCallback) -> None:
        """Start listening for and event and run callback when it occurs.

        Sideeffect:
            This method will call `self._run()` if the event passed is not an
            Error or NetworkException.
        """

        self._listeners[event] = callback

        if not self._is_looping and not event in [Event.ERROR, Event.NETWORK_EXCEPTION]:
            self._run()

    def stop(self) -> None:
        """Stops event loop."""

        self._is_stopped = True

    def create(self, username: str, password: str) -> Chatroom | None:
        """Creates a new chatroom on the server.

        Args:
            username: The owner account's username.
            password: The owner account's password.

        Returns:
            This chatroom, logged into the given owner account.
        """

        data = {
            "chatroom-name": self.name,
            "username": username,
            "password": password,
        }

        response = self._request(
            "post",
            headers={"Content-Type": "application/json"},
            url=self.endpoints.chatroom,
            json=data,
        )

        if response is None:
            # Creation did not succeed, but error was captured
            return None

        self.initialize_from_response(response)
        return self

    def create_channel(self, name: str) -> Channel | None:
        """Creates a channel.

        Args:
            name: The display name for the new channel.

        Returns:
            The new channel object in case of success, None otherwise.
        """

        data = {
            "username": self.username,
            "channel-name": name,
            "permissions": [{"classID": "1", "r": True, "w": True, "x": False}],
        }

        response = self._request("post", url=self.endpoints.channels, json=data)

        if response is None:
            # Creation of chatroom failed, but error was captured
            return None

        channel = Channel.from_dict(response)
        self._update_channels([channel])

        return channel

    def create_invite(
        self, uses: int = 1, expiration_time: float | None = None
    ) -> Invite | None:
        """Creates an invite to this chatroom.

        Args:
            uses: How many times this invite can be used.
            expiration_time: Epoch float describing when the invite will no
                longer be valid.

        Returns:
            Invite object in case of success, None otherwise.
        """

        headers = {
            "username": self.username,
        }

        if uses is not None:
            headers["uses"] = str(uses)

        if expiration_time is not None:
            headers["expiration-time"] = str(expiration_time)

        response = self._request("get", url=self.endpoints.invites, headers=headers)

        if response is None:
            return None

        response["url"] = self.url
        response["chatroomID"] = self.uid
        return Invite.from_dict(response)

    def create_from_invite(
        self, invite: Invite, username: str, password: str
    ) -> Chatroom | None:
        """Initializes chatroom from an invite.

        Args:
            invite: The invite object to use.
            username: The username of the account registered using this invite.
            password: The password of the account registered using this invite.

        Returns:
            This chatroom instance logged into the user on success, None otherwise.
        """

        data = {
            "inviteID": invite.uid,
            "username": username,
            "password": password,
        }

        response = self._request("post", url=self.endpoints.invites, json=data)

        if response is None:
            return None

        self.initialize_from_response(response)

        return self

    def get_users(self) -> list[User] | None:
        """Gets all users in a chatroom.

        Returns:
            A list of User instances on success, None otherwise.
        """

        users = self._request(
            "get",
            url=self.endpoints.users,
            headers={"username": self.username},
        )

        if users is None:
            # Getting users failed, but error was captured
            return None

        return [User.from_dict(user) for user in users]

    def get_channels(self) -> list[Channel] | None:
        """Gets all channels the logged-in user has access to.

        Returns:
            A list of Channel instances on success, None otherwise.
        """

        channels = self._request(
            "get",
            url=self.endpoints.channels,
            headers={"username": self.username},
        )

        if channels is None:
            # Getting channels failed, but error was captured
            return None

        return [Channel.from_dict(channel) for channel in channels]

    def login(self, username: str, password: str) -> requests.Response | None:
        """Logs into the chatroom with given credentials.

        Args:
            username: Username to log into.
            password: Password to log in with.

        Returns:
            Raw response for some reason.
        """

        data = {
            "username": username,
            "password": password,
        }

        response = self._request(
            "post",
            url=self.endpoints.login,
            json=data,
        )

        if response is None:
            return None

        self.username = username
        self._update_channels()
        self._is_server_side = True

        return response

    def get_since(
        self, since: float, channel: Channel | None = None
    ) -> list[Message] | None:
        """Gets messages since provided timestamp.

        Args:
            since: UNIX epoch timestamp from which to get messages.
            channel: The optional channel to filter messages by.

        Returns:
            A list of messages on success, None otherwise.
        """

        return self._get_messages(
            "since",
            channel,
            time=str(since),
        )

    def get_count(
        self, count: int, channel: Channel | None = None
    ) -> list[Message] | None:
        """Gets a certain count of messages sent since provided timestamp.

        Args:
            count: The maximum amount of messages returned.
            channel: The channel to filter messages by.

        Returns:
            A list of messages on success, None otherwise.
        """

        return self._get_messages(
            "count",
            channel,
            str(count),
        )

    def send(
        self,
        content: Union[str, bytes],
        channel: Channel | None = None,
        reply_id: str | None = None,
    ) -> Message | None:
        """Sends a message.

        Args:
            content: Message data. Currently only `str` is supported.
            channel: The channel to send the message on. Defaults to self.active_channel.
            reply_id: The optional id of the messages this one will reply to.

        Returns:
            None. The outgoing message can be acquired by subscribing to `Event.MSG_SENT`.

        Raises:
            ValueError: No channel was passed, and self.active_channel is None.

        Sideeffect:
            This changes self.active_channel to the provided one, if it isn't None.
        """

        msg = {}
        if channel is not None:
            self.active_channel = channel

        elif self.active_channel is None:
            raise ValueError(
                "No active channel set. Please use either the Chatroom.set_channel() function"
                + " or provide `channel` as a non-null value!"
            )

        if isinstance(content, bytes):
            endpoint = self.endpoints.files
        else:
            endpoint = self.endpoints.messages

        if isinstance(content, str):
            content = content.encode("ascii")

        msg = {
            "username": self.username,
            "channelID": self.active_channel.uid,
            "replyID": reply_id,
            "data": self._encrypt(content),
        }

        sent = self._request(
            "post",
            url=endpoint,
            json=msg,
        )

        if sent is not None:
            sent["data"] = content.decode("ascii")
            message_out = Message.from_dict(sent)
            self._notify(Event.MSG_SENT, message_out)

            return message_out

        return None


class Teacup:
    """The object to manage all API related actions.

    This class itself doesn't actually do any networking, rather it creates
    objects (`Chatroom`-s) that do all the dirty work.

    Standard flow of using a Teacup:

    ```python3
    from teahaz import Teacup

    cup = Teacup()
    chatroom = cup.login("username", "password", "chatroom-uuid", "url")
    chatroom.send("hello world!")
    ```
    """

    def __init__(self) -> None:
        """Initializes Teacup."""

        self.chatrooms: list[Chatroom] = []
        self._global_listeners: dict[Event, EventCallback] = {}

    @classmethod
    def from_dump(cls, save_root: str | Path) -> Teacup:
        """Restore a dump."""

        cup = cls()
        for directory in os.listdir(save_root):
            dirpath = Path(save_root) / directory

            if not dirpath.is_dir():
                continue

            with open(dirpath / "data.json", "r", encoding="utf-8") as datafile:
                data = json.load(datafile)

            with open(dirpath / "messages.json", "r", encoding="utf-8") as messagefile:
                messages = json.load(messagefile)

            with open(dirpath / "session.pickle", "rb") as picklefile:
                session = pickle.load(picklefile)

            chat = Chatroom(data["url"], session=session)
            for channel in data["channels"]:
                channel["channelID"] = channel["uid"]

            for message in messages:
                message["messageID"] = message["uid"]
                message["time"] = message["send_time"]
                message["type"] = message["message_type"]
                message["channelID"] = message["channel_id"]

            chat.messages = [Message.from_dict(msg) for msg in messages]
            chat.initialize_from_response(data)

            cup.chatrooms.append(chat)

        return cup

    def dump_to(
        self,
        save_root: str | Path,
        remove_old: bool = True,
        max_msg_count: int | None = None,
    ) -> None:
        """Dumps all chatrooms to the given save_root.

        Each dump will have the following structure:
        ```
        save_root
        |_ <chat_id1>
        |   |_ data.json
        |   |   |_ chatroomID: <value>
        |   |   |_ url: <value>
        |   |   |_ chatroom_name: <value>
        |   |   |_ users: <value>
        |   |   |_ channels: <value>
        |   |   |_ users: <value>
        |   |
        |   |_ messages.json
        |   |   |_ <list of chatroom.messages>
        |   |
        |   |_ session.pickle
        |       |_ <picklefile of chatroom.session>
        |
        |_ <chat_id2>
            |_ ...
        ```

        Args:
            save_root: The root directory to save to.
            remove_old: If set, the previous content of the root directory
                will be wiped before dumping.
        """

        if isinstance(save_root, Path):
            root = save_root
        else:
            root = Path(save_root)

        if remove_old:
            for path in os.listdir(root):
                if not os.path.isdir(path):
                    continue

                shutil.rmtree(root / path)

        for chatroom in self.chatrooms:
            assert chatroom.uid is not None

            datapath = root / chatroom.uid / "data.json"
            messagepath = root / chatroom.uid / "messages.json"
            picklepath = root / chatroom.uid / "session.pickle"

            if not os.path.exists(root / chatroom.uid):
                os.mkdir(root / chatroom.uid)

            with open(datapath, "w", encoding="utf-8") as datafile:
                # TODO: Change this once there is proper support for users
                users = [{"username": chatroom.username}]

                json.dump(
                    {
                        "chatroomID": chatroom.uid,
                        "url": chatroom.url,
                        "chatroom_name": chatroom.name,
                        "users": users,
                        "channels": [asdict(channel) for channel in chatroom.channels],
                    },
                    datafile,
                )

            with open(messagepath, "w", encoding="utf-8") as messagefile:
                if max_msg_count is not None:
                    chatroom.messages = chatroom.messages[:max_msg_count]

                json.dump([asdict(msg) for msg in chatroom.messages], messagefile)

            with open(picklepath, "wb") as picklefile:
                pickle.dump(chatroom.session, picklefile)

    def get_threads(self) -> list[str]:
        """Gets names of all chatroom threads."""

        return [chatroom.event_thread.name for chatroom in self.chatrooms]

    def login(self, url: str, chatroom: str, username: str, password: str) -> Chatroom:
        """Creates a logged-in chatroom instance.

        Args:
            url: The server URL:PORT.
            chatroom: The UUID of the chatroom.
            username: Login username for chatroom.
            password: Login password for chatroom.

        Returns:
            A chatroom instance with given user logged in.
        """

        chat = Chatroom(url=url, uid=chatroom)

        for event, callback in self._global_listeners.items():
            chat.subscribe(event, callback)

        chat.login(username, password)
        self.chatrooms.append(chat)

        return chat

    def stop(self) -> None:
        """Stops all chatroom threads."""

        for chatroom in self.chatrooms:
            chatroom.stop()

    def get_chatroom(self, name: str) -> Chatroom | None:
        """Gets first chatroom by matching name.

        Args:
            name: The chatroom display name to search for.
        """

        for chatroom in self.chatrooms:
            if chatroom.name == name:
                return chatroom

        return None

    def create_chatroom(
        self, url: str, name: str, username: str, password: str
    ) -> Chatroom | None:
        """Creates a new chatroom.

        The given user will be the owner of the chatroom.

        Args:
            url: The server URL:PORT.
            name: The display name for the new chatroom.
            username: The login username.
            password: The login password.

        Returns:
            Either the logged-in chatroom, or None if its creation was
            unsuccessful **and** the error raised was captured.
        """

        chat = Chatroom(url=url, name=name)

        # Subscribe chatroom to all global events we are subscribed to
        for event, callback in self._global_listeners.items():
            chat.subscribe(event, callback)

        if chat.create(username, password) is None:
            # Creation failed, but error was captured
            return None

        self.chatrooms.append(chat)
        return chat

    def use_invite(
        self, invite: Invite, username: str, password: str
    ) -> Chatroom | None:
        """Creates a chatroom instance from an invite.

        Args:
            invite: Invite instance.
            username: Username for new chatroom user.
            password: Password for new chatroom user.

        Returns:
            Logged-in chatroom, or None when creation failed but error
            was captured.
        """

        chat = Chatroom(url=invite.url, uid=invite.chatroom_id)

        # Subscribe chatroom to all global events we are subscribed to
        for event, callback in self._global_listeners.items():
            chat.subscribe(event, callback)

        if chat.create_from_invite(invite, username, password) is None:
            # Creation failed, but error was captured
            return None

        self.chatrooms.append(chat)
        return chat

    def subscribe_all(self, event: Event, callback: EventCallback) -> None:
        """Subscribes callback to event in all (current & future) Chatrooms.

        Args:
            event: The event to subscribe to.
            callback: The callback that shall be called.
        """

        for chatroom in self.chatrooms:
            chatroom.subscribe(event, callback)

        self._global_listeners[event] = callback

    @staticmethod
    def threaded(
        target: Callable[..., Any], callback: Callable[..., Any] | None = None
    ) -> Callable[..., None]:
        """Returns a threaded callable for target.

        Args:
            target: The callable to thread.
            callback: The callable that will be called with return value
                of `target`.

        Returns:
            A lambda function that runs `target` in a thread, passing its
            return value to `callback`.
        """

        def _call_target(*args, **kwargs) -> None:
            """Calls the target."""

            returned = target(*args, **kwargs)
            if callback is not None:
                callback(returned)

        return lambda *args, **kwargs: Thread(
            target=_call_target, args=args, kwargs=kwargs
        ).start()
#   class Event(enum.Enum):
View Source
class Event(Enum):
    """Events that `Chatroom` and `Teacup` can subscribe to"""

    ERROR = auto()
    """An error occured.

    Args:
        response: The `requests.Response` object.
        method: The HTTP method used.
        request_args: A dictionary of arguments sent with the request.
    """

    NETWORK_EXCEPTION = auto()
    """A network exception has occured.

    Args:
        exception: The Exception instance that occured.
        method: The HTTP method used.
        request_args: A dictionary of arguments sent with the request.
    """

    MSG_SENT = auto()
    """A message has been sent.

    This can be used by clients to show the outgoing message,
    before the server receives and sends it back.

    Args:
        message: The **locally instanced** `Message`. Only use this while the real
            message has not yet arrived.
    """

    MSG_NEW = auto()
    """A new message has arrived.

    Args:
        message: The new `Message` instance.
    """

    MSG_DEL = auto()
    """A message was deleted.

    Args:
        message: The deleted `Message` instance.
    """

    MSG_SYS = auto()
    """A system message has arrived.

    Args:
        message: The system message.
    """

    MSG_SYS_SILENT = auto()
    """A silent system message has arrived. These should normally not be displayed.

    Args:
        message: The silent system message.
    """

    USER_JOIN = auto()
    """A new user has joined the chatroom.

    Note:
        **Not yet implemented.**
    """

    USER_LEAVE = auto()
    """A user has left the chatroom.

    Note:
        **Not yet implemented.**
    """

    SERVER_INFO = auto()
    """Some server information has changed.

    Note:
        **Not yet implemented.**
    """

Events that Chatroom and Teacup can subscribe to

#   ERROR = <Event.ERROR: 1>

An error occured.

Args
  • response: The requests.Response object.
  • method: The HTTP method used.
  • request_args: A dictionary of arguments sent with the request.
#   NETWORK_EXCEPTION = <Event.NETWORK_EXCEPTION: 2>

A network exception has occured.

Args
  • exception: The Exception instance that occured.
  • method: The HTTP method used.
  • request_args: A dictionary of arguments sent with the request.
#   MSG_SENT = <Event.MSG_SENT: 3>

A message has been sent.

This can be used by clients to show the outgoing message, before the server receives and sends it back.

Args
  • message: The locally instanced Message. Only use this while the real message has not yet arrived.
#   MSG_NEW = <Event.MSG_NEW: 4>

A new message has arrived.

Args
  • message: The new Message instance.
#   MSG_DEL = <Event.MSG_DEL: 5>

A message was deleted.

Args
  • message: The deleted Message instance.
#   MSG_SYS = <Event.MSG_SYS: 6>

A system message has arrived.

Args
  • message: The system message.
#   MSG_SYS_SILENT = <Event.MSG_SYS_SILENT: 7>

A silent system message has arrived. These should normally not be displayed.

Args
  • message: The silent system message.
#   USER_JOIN = <Event.USER_JOIN: 8>

A new user has joined the chatroom.

Note

Not yet implemented.

#   USER_LEAVE = <Event.USER_LEAVE: 9>

A user has left the chatroom.

Note

Not yet implemented.

#   SERVER_INFO = <Event.SERVER_INFO: 10>

Some server information has changed.

Note

Not yet implemented.

Inherited Members
enum.Enum
name
value
#   class Teacup:
View Source
class Teacup:
    """The object to manage all API related actions.

    This class itself doesn't actually do any networking, rather it creates
    objects (`Chatroom`-s) that do all the dirty work.

    Standard flow of using a Teacup:

    ```python3
    from teahaz import Teacup

    cup = Teacup()
    chatroom = cup.login("username", "password", "chatroom-uuid", "url")
    chatroom.send("hello world!")
    ```
    """

    def __init__(self) -> None:
        """Initializes Teacup."""

        self.chatrooms: list[Chatroom] = []
        self._global_listeners: dict[Event, EventCallback] = {}

    @classmethod
    def from_dump(cls, save_root: str | Path) -> Teacup:
        """Restore a dump."""

        cup = cls()
        for directory in os.listdir(save_root):
            dirpath = Path(save_root) / directory

            if not dirpath.is_dir():
                continue

            with open(dirpath / "data.json", "r", encoding="utf-8") as datafile:
                data = json.load(datafile)

            with open(dirpath / "messages.json", "r", encoding="utf-8") as messagefile:
                messages = json.load(messagefile)

            with open(dirpath / "session.pickle", "rb") as picklefile:
                session = pickle.load(picklefile)

            chat = Chatroom(data["url"], session=session)
            for channel in data["channels"]:
                channel["channelID"] = channel["uid"]

            for message in messages:
                message["messageID"] = message["uid"]
                message["time"] = message["send_time"]
                message["type"] = message["message_type"]
                message["channelID"] = message["channel_id"]

            chat.messages = [Message.from_dict(msg) for msg in messages]
            chat.initialize_from_response(data)

            cup.chatrooms.append(chat)

        return cup

    def dump_to(
        self,
        save_root: str | Path,
        remove_old: bool = True,
        max_msg_count: int | None = None,
    ) -> None:
        """Dumps all chatrooms to the given save_root.

        Each dump will have the following structure:
        ```
        save_root
        |_ <chat_id1>
        |   |_ data.json
        |   |   |_ chatroomID: <value>
        |   |   |_ url: <value>
        |   |   |_ chatroom_name: <value>
        |   |   |_ users: <value>
        |   |   |_ channels: <value>
        |   |   |_ users: <value>
        |   |
        |   |_ messages.json
        |   |   |_ <list of chatroom.messages>
        |   |
        |   |_ session.pickle
        |       |_ <picklefile of chatroom.session>
        |
        |_ <chat_id2>
            |_ ...
        ```

        Args:
            save_root: The root directory to save to.
            remove_old: If set, the previous content of the root directory
                will be wiped before dumping.
        """

        if isinstance(save_root, Path):
            root = save_root
        else:
            root = Path(save_root)

        if remove_old:
            for path in os.listdir(root):
                if not os.path.isdir(path):
                    continue

                shutil.rmtree(root / path)

        for chatroom in self.chatrooms:
            assert chatroom.uid is not None

            datapath = root / chatroom.uid / "data.json"
            messagepath = root / chatroom.uid / "messages.json"
            picklepath = root / chatroom.uid / "session.pickle"

            if not os.path.exists(root / chatroom.uid):
                os.mkdir(root / chatroom.uid)

            with open(datapath, "w", encoding="utf-8") as datafile:
                # TODO: Change this once there is proper support for users
                users = [{"username": chatroom.username}]

                json.dump(
                    {
                        "chatroomID": chatroom.uid,
                        "url": chatroom.url,
                        "chatroom_name": chatroom.name,
                        "users": users,
                        "channels": [asdict(channel) for channel in chatroom.channels],
                    },
                    datafile,
                )

            with open(messagepath, "w", encoding="utf-8") as messagefile:
                if max_msg_count is not None:
                    chatroom.messages = chatroom.messages[:max_msg_count]

                json.dump([asdict(msg) for msg in chatroom.messages], messagefile)

            with open(picklepath, "wb") as picklefile:
                pickle.dump(chatroom.session, picklefile)

    def get_threads(self) -> list[str]:
        """Gets names of all chatroom threads."""

        return [chatroom.event_thread.name for chatroom in self.chatrooms]

    def login(self, url: str, chatroom: str, username: str, password: str) -> Chatroom:
        """Creates a logged-in chatroom instance.

        Args:
            url: The server URL:PORT.
            chatroom: The UUID of the chatroom.
            username: Login username for chatroom.
            password: Login password for chatroom.

        Returns:
            A chatroom instance with given user logged in.
        """

        chat = Chatroom(url=url, uid=chatroom)

        for event, callback in self._global_listeners.items():
            chat.subscribe(event, callback)

        chat.login(username, password)
        self.chatrooms.append(chat)

        return chat

    def stop(self) -> None:
        """Stops all chatroom threads."""

        for chatroom in self.chatrooms:
            chatroom.stop()

    def get_chatroom(self, name: str) -> Chatroom | None:
        """Gets first chatroom by matching name.

        Args:
            name: The chatroom display name to search for.
        """

        for chatroom in self.chatrooms:
            if chatroom.name == name:
                return chatroom

        return None

    def create_chatroom(
        self, url: str, name: str, username: str, password: str
    ) -> Chatroom | None:
        """Creates a new chatroom.

        The given user will be the owner of the chatroom.

        Args:
            url: The server URL:PORT.
            name: The display name for the new chatroom.
            username: The login username.
            password: The login password.

        Returns:
            Either the logged-in chatroom, or None if its creation was
            unsuccessful **and** the error raised was captured.
        """

        chat = Chatroom(url=url, name=name)

        # Subscribe chatroom to all global events we are subscribed to
        for event, callback in self._global_listeners.items():
            chat.subscribe(event, callback)

        if chat.create(username, password) is None:
            # Creation failed, but error was captured
            return None

        self.chatrooms.append(chat)
        return chat

    def use_invite(
        self, invite: Invite, username: str, password: str
    ) -> Chatroom | None:
        """Creates a chatroom instance from an invite.

        Args:
            invite: Invite instance.
            username: Username for new chatroom user.
            password: Password for new chatroom user.

        Returns:
            Logged-in chatroom, or None when creation failed but error
            was captured.
        """

        chat = Chatroom(url=invite.url, uid=invite.chatroom_id)

        # Subscribe chatroom to all global events we are subscribed to
        for event, callback in self._global_listeners.items():
            chat.subscribe(event, callback)

        if chat.create_from_invite(invite, username, password) is None:
            # Creation failed, but error was captured
            return None

        self.chatrooms.append(chat)
        return chat

    def subscribe_all(self, event: Event, callback: EventCallback) -> None:
        """Subscribes callback to event in all (current & future) Chatrooms.

        Args:
            event: The event to subscribe to.
            callback: The callback that shall be called.
        """

        for chatroom in self.chatrooms:
            chatroom.subscribe(event, callback)

        self._global_listeners[event] = callback

    @staticmethod
    def threaded(
        target: Callable[..., Any], callback: Callable[..., Any] | None = None
    ) -> Callable[..., None]:
        """Returns a threaded callable for target.

        Args:
            target: The callable to thread.
            callback: The callable that will be called with return value
                of `target`.

        Returns:
            A lambda function that runs `target` in a thread, passing its
            return value to `callback`.
        """

        def _call_target(*args, **kwargs) -> None:
            """Calls the target."""

            returned = target(*args, **kwargs)
            if callback is not None:
                callback(returned)

        return lambda *args, **kwargs: Thread(
            target=_call_target, args=args, kwargs=kwargs
        ).start()

The object to manage all API related actions.

This class itself doesn't actually do any networking, rather it creates objects (Chatroom-s) that do all the dirty work.

Standard flow of using a Teacup:

from teahaz import Teacup

cup = Teacup()
chatroom = cup.login("username", "password", "chatroom-uuid", "url")
chatroom.send("hello world!")
#   Teacup()
View Source
    def __init__(self) -> None:
        """Initializes Teacup."""

        self.chatrooms: list[Chatroom] = []
        self._global_listeners: dict[Event, EventCallback] = {}

Initializes Teacup.

#  
@classmethod
def from_dump(cls, save_root: str | pathlib.Path) -> teahaz.client.Teacup:
View Source
    @classmethod
    def from_dump(cls, save_root: str | Path) -> Teacup:
        """Restore a dump."""

        cup = cls()
        for directory in os.listdir(save_root):
            dirpath = Path(save_root) / directory

            if not dirpath.is_dir():
                continue

            with open(dirpath / "data.json", "r", encoding="utf-8") as datafile:
                data = json.load(datafile)

            with open(dirpath / "messages.json", "r", encoding="utf-8") as messagefile:
                messages = json.load(messagefile)

            with open(dirpath / "session.pickle", "rb") as picklefile:
                session = pickle.load(picklefile)

            chat = Chatroom(data["url"], session=session)
            for channel in data["channels"]:
                channel["channelID"] = channel["uid"]

            for message in messages:
                message["messageID"] = message["uid"]
                message["time"] = message["send_time"]
                message["type"] = message["message_type"]
                message["channelID"] = message["channel_id"]

            chat.messages = [Message.from_dict(msg) for msg in messages]
            chat.initialize_from_response(data)

            cup.chatrooms.append(chat)

        return cup

Restore a dump.

#   def dump_to( self, save_root: str | pathlib.Path, remove_old: bool = True, max_msg_count: int | None = None ) -> None:
View Source
    def dump_to(
        self,
        save_root: str | Path,
        remove_old: bool = True,
        max_msg_count: int | None = None,
    ) -> None:
        """Dumps all chatrooms to the given save_root.

        Each dump will have the following structure:
        ```
        save_root
        |_ <chat_id1>
        |   |_ data.json
        |   |   |_ chatroomID: <value>
        |   |   |_ url: <value>
        |   |   |_ chatroom_name: <value>
        |   |   |_ users: <value>
        |   |   |_ channels: <value>
        |   |   |_ users: <value>
        |   |
        |   |_ messages.json
        |   |   |_ <list of chatroom.messages>
        |   |
        |   |_ session.pickle
        |       |_ <picklefile of chatroom.session>
        |
        |_ <chat_id2>
            |_ ...
        ```

        Args:
            save_root: The root directory to save to.
            remove_old: If set, the previous content of the root directory
                will be wiped before dumping.
        """

        if isinstance(save_root, Path):
            root = save_root
        else:
            root = Path(save_root)

        if remove_old:
            for path in os.listdir(root):
                if not os.path.isdir(path):
                    continue

                shutil.rmtree(root / path)

        for chatroom in self.chatrooms:
            assert chatroom.uid is not None

            datapath = root / chatroom.uid / "data.json"
            messagepath = root / chatroom.uid / "messages.json"
            picklepath = root / chatroom.uid / "session.pickle"

            if not os.path.exists(root / chatroom.uid):
                os.mkdir(root / chatroom.uid)

            with open(datapath, "w", encoding="utf-8") as datafile:
                # TODO: Change this once there is proper support for users
                users = [{"username": chatroom.username}]

                json.dump(
                    {
                        "chatroomID": chatroom.uid,
                        "url": chatroom.url,
                        "chatroom_name": chatroom.name,
                        "users": users,
                        "channels": [asdict(channel) for channel in chatroom.channels],
                    },
                    datafile,
                )

            with open(messagepath, "w", encoding="utf-8") as messagefile:
                if max_msg_count is not None:
                    chatroom.messages = chatroom.messages[:max_msg_count]

                json.dump([asdict(msg) for msg in chatroom.messages], messagefile)

            with open(picklepath, "wb") as picklefile:
                pickle.dump(chatroom.session, picklefile)

Dumps all chatrooms to the given save_root.

Each dump will have the following structure:

|_ <chat_id1>
|   |_ data.json
|   |   |_ chatroomID: <value>
|   |   |_ url: <value>
|   |   |_ chatroom_name: <value>
|   |   |_ users: <value>
|   |   |_ channels: <value>
|   |   |_ users: <value>
|   |
|   |_ messages.json
|   |   |_ <list of chatroom.messages>
|   |
|   |_ session.pickle
|       |_ <picklefile of chatroom.session>
|
|_ <chat_id2>
    |_ ...
Args
  • save_root: The root directory to save to.
  • remove_old: If set, the previous content of the root directory will be wiped before dumping.
#   def get_threads(self) -> list[str]:
View Source
    def get_threads(self) -> list[str]:
        """Gets names of all chatroom threads."""

        return [chatroom.event_thread.name for chatroom in self.chatrooms]

Gets names of all chatroom threads.

#   def login( self, url: str, chatroom: str, username: str, password: str ) -> teahaz.client.Chatroom:
View Source
    def login(self, url: str, chatroom: str, username: str, password: str) -> Chatroom:
        """Creates a logged-in chatroom instance.

        Args:
            url: The server URL:PORT.
            chatroom: The UUID of the chatroom.
            username: Login username for chatroom.
            password: Login password for chatroom.

        Returns:
            A chatroom instance with given user logged in.
        """

        chat = Chatroom(url=url, uid=chatroom)

        for event, callback in self._global_listeners.items():
            chat.subscribe(event, callback)

        chat.login(username, password)
        self.chatrooms.append(chat)

        return chat

Creates a logged-in chatroom instance.

Args
  • url: The server URL:PORT.
  • chatroom: The UUID of the chatroom.
  • username: Login username for chatroom.
  • password: Login password for chatroom.
Returns

A chatroom instance with given user logged in.

#   def stop(self) -> None:
View Source
    def stop(self) -> None:
        """Stops all chatroom threads."""

        for chatroom in self.chatrooms:
            chatroom.stop()

Stops all chatroom threads.

#   def get_chatroom(self, name: str) -> teahaz.client.Chatroom | None:
View Source
    def get_chatroom(self, name: str) -> Chatroom | None:
        """Gets first chatroom by matching name.

        Args:
            name: The chatroom display name to search for.
        """

        for chatroom in self.chatrooms:
            if chatroom.name == name:
                return chatroom

        return None

Gets first chatroom by matching name.

Args
  • name: The chatroom display name to search for.
#   def create_chatroom( self, url: str, name: str, username: str, password: str ) -> teahaz.client.Chatroom | None:
View Source
    def create_chatroom(
        self, url: str, name: str, username: str, password: str
    ) -> Chatroom | None:
        """Creates a new chatroom.

        The given user will be the owner of the chatroom.

        Args:
            url: The server URL:PORT.
            name: The display name for the new chatroom.
            username: The login username.
            password: The login password.

        Returns:
            Either the logged-in chatroom, or None if its creation was
            unsuccessful **and** the error raised was captured.
        """

        chat = Chatroom(url=url, name=name)

        # Subscribe chatroom to all global events we are subscribed to
        for event, callback in self._global_listeners.items():
            chat.subscribe(event, callback)

        if chat.create(username, password) is None:
            # Creation failed, but error was captured
            return None

        self.chatrooms.append(chat)
        return chat

Creates a new chatroom.

The given user will be the owner of the chatroom.

Args
  • url: The server URL:PORT.
  • name: The display name for the new chatroom.
  • username: The login username.
  • password: The login password.
Returns

Either the logged-in chatroom, or None if its creation was unsuccessful and the error raised was captured.

#   def use_invite( self, invite: teahaz.dataclasses.Invite, username: str, password: str ) -> teahaz.client.Chatroom | None:
View Source
    def use_invite(
        self, invite: Invite, username: str, password: str
    ) -> Chatroom | None:
        """Creates a chatroom instance from an invite.

        Args:
            invite: Invite instance.
            username: Username for new chatroom user.
            password: Password for new chatroom user.

        Returns:
            Logged-in chatroom, or None when creation failed but error
            was captured.
        """

        chat = Chatroom(url=invite.url, uid=invite.chatroom_id)

        # Subscribe chatroom to all global events we are subscribed to
        for event, callback in self._global_listeners.items():
            chat.subscribe(event, callback)

        if chat.create_from_invite(invite, username, password) is None:
            # Creation failed, but error was captured
            return None

        self.chatrooms.append(chat)
        return chat

Creates a chatroom instance from an invite.

Args
  • invite: Invite instance.
  • username: Username for new chatroom user.
  • password: Password for new chatroom user.
Returns

Logged-in chatroom, or None when creation failed but error was captured.

#   def subscribe_all( self, event: teahaz.client.Event, callback: Union[Callable[[teahaz.dataclasses.Message], Any], Callable[[requests.models.Response, str, Dict[str, Any]], Any]] ) -> None:
View Source
    def subscribe_all(self, event: Event, callback: EventCallback) -> None:
        """Subscribes callback to event in all (current & future) Chatrooms.

        Args:
            event: The event to subscribe to.
            callback: The callback that shall be called.
        """

        for chatroom in self.chatrooms:
            chatroom.subscribe(event, callback)

        self._global_listeners[event] = callback

Subscribes callback to event in all (current & future) Chatrooms.

Args
  • event: The event to subscribe to.
  • callback: The callback that shall be called.
#  
@staticmethod
def threaded( target: Callable[..., Any], callback: Optional[Callable[..., Any]] = None ) -> Callable[..., NoneType]:
View Source
    @staticmethod
    def threaded(
        target: Callable[..., Any], callback: Callable[..., Any] | None = None
    ) -> Callable[..., None]:
        """Returns a threaded callable for target.

        Args:
            target: The callable to thread.
            callback: The callable that will be called with return value
                of `target`.

        Returns:
            A lambda function that runs `target` in a thread, passing its
            return value to `callback`.
        """

        def _call_target(*args, **kwargs) -> None:
            """Calls the target."""

            returned = target(*args, **kwargs)
            if callback is not None:
                callback(returned)

        return lambda *args, **kwargs: Thread(
            target=_call_target, args=args, kwargs=kwargs
        ).start()

Returns a threaded callable for target.

Args
  • target: The callable to thread.
  • callback: The callable that will be called with return value of target.
Returns

A lambda function that runs target in a thread, passing its return value to callback.

#   class Chatroom:
View Source
class Chatroom:
    """The object to deal with all chatroom-related API actions."""

    def __init__(
        self,
        url: str,
        uid: str | None = None,
        name: str | None = None,
        session: requests.Session | None = None,
    ) -> None:
        """Initializes chatroom.

        Args:
            url: The chatroom's server URL:PORT.
            uid: The chatroom's UUID.
            name: The display name of the chatroom.
            session: Session that should be used by this chatroom.

        Every argument except URL is optional. This object should usually
        be instanced within the module, not by outside code.
        """

        self.uid = uid
        self.url = url
        self.name = name
        self.interval = 1

        self.username: str | None = None
        self.session = session or requests.Session()
        self.active_channel: Channel | None = None
        self.channels: list[Channel] = []

        self.event_thread = Thread(target=self._loop)

        # If the chatroom doesn't exist yet its endpoints' uid
        # is only filled in the create() method
        self.endpoints = EndpointContainer(self.url, self.uid)

        self.messages: list[Message] = []

        self._listeners: dict[Event, EventCallback] = {}
        self._is_looping: bool = False
        self._is_stopped: bool = False
        self._is_server_side: bool = False
        self._last_get_time: float = epoch()

    def _request(self, method_name: str, **req_args: Any) -> Any | None:
        """Sends a request, handles events & exceptions.

        Args:
            method_name: An HTTP method name, such as GET.
            **req_args: Arguments passed to the request.

        Returns:
        - JSON of response if `status_code == 200`
        - None if exception occured but was handled

        Raises:
            ValueError: Invalid HTTP method was passed.
            RuntimeError: Response status_code was not 200, and
                no handler was available to call.
        """

        method = getattr(self.session, method_name)
        if method is None:
            raise ValueError(f'Session does not have a method for "{method_name}".')

        error_handler = self._listeners.get(Event.ERROR)
        exception_handler = self._listeners.get(Event.NETWORK_EXCEPTION)

        try:
            response = method(**req_args)
        except Exception as exception:
            if exception_handler is not None:
                exception_handler(exception, method_name, req_args)  # type: ignore

                return None

            raise exception

        if response.status_code == 200:
            return response.json()

        if error_handler is not None:
            error_handler(response, method_name, req_args)  # type: ignore

            # maybe this could return CapturedError?
            return None

        raise RuntimeError(
            f"{method_name.upper()} request with data {req_args} failed"
            f" with no error or exception handler: {response.status_code} -> {response.text}"
        )

    def _notify(self, event: Event, *data: Any) -> None:
        """Notifies listeners of an event.

        Args:
            event: The event to notify for.
            *data: Any arguments passed to the listeners.
        """

        callback = self._listeners.get(event)
        if callback is None:
            return

        callback(*data)

    def _loop(self) -> None:
        """The main event loop for a chatroom"""

        ids = [msg.uid for msg in self.messages]
        self._last_get_time = epoch()

        while not self._is_stopped:
            if self.active_channel is None:
                sleep(self.interval)
                continue

            # We need to assign to a temporary
            # variable, otherwise messages can
            # get stuck between setting & getting.
            previous = self._last_get_time
            self._last_get_time = epoch()
            messages = self.get_since(previous)

            if messages is not None:
                # there is no good way to type these
                messages.sort(key=lambda msg: msg.send_time)

                for message in messages:
                    # This is needed to avoid duplicates
                    if message.uid in ids:
                        continue

                    self.messages.append(message)
                    ids.append(message.uid)

                    if message.message_type == "delete":
                        self._notify(Event.MSG_DEL, message)

                    elif message.message_type == "system":
                        self._notify(Event.MSG_SYS, message)

                    elif message.message_type == "system-silent":
                        self._notify(Event.MSG_SYS_SILENT, message)

                    else:
                        self._notify(Event.MSG_NEW, message)

            sleep(self.interval)

    def _run(self) -> None:
        """Runs monitoring loop"""

        self._is_looping = True
        self.event_thread.start()
        self._update_thread_name()

    def _update_channels(self, channels: list[Channel] | None = None) -> None:
        """Updates channels available to the user."""

        assert self.username, "Please log in before getting channels!"

        if channels is None:
            channels = self.get_channels()
            if channels is None:
                return

        for channel in channels:
            if channel not in self.channels:
                self.channels.append(channel)

        if self.active_channel is None and len(self.channels) > 0:
            self.active_channel = self.channels[0]

    def _update_thread_name(self) -> None:
        """Sets self.event_thread.name."""

        self.event_thread.name = f'Chatroom(uid="{self.uid}")'

    def _get_messages(
        self,
        method: str,
        channel: Channel | None = None,
        count: str | None = None,
        time: str | None = None,
    ) -> list[Message] | None:
        """Gets messages by time (since) or count.

        Args:
            method: `time` or `count`.
            channel: The channel to get messages from.
            count: How many messages to get. Only used when `method=="count"`.
            time: The time to get messages since.
        """

        if channel is not None:
            self.active_channel = channel

        elif self.active_channel is None:
            raise ValueError(
                "Please use either the Chatroom.set_channel() function"
                + " or provide `channel` as a non-null value!"
            )

        else:
            channel = self.active_channel

        headers = {
            "get-method": method,
            "username": self.username,
            "channelID": channel.uid,
            "count": count,
            "time": time,
        }

        messages: list[dict[str, Any]] | None = self._request(
            "get",
            url=self.endpoints.messages,
            headers=headers,
        )

        if messages is None:
            # Getting messages failed, but error was captured
            return None

        instances = []
        for message in messages:
            if not message["type"].startswith("system"):
                message["data"] = self._decrypt(message["data"])

            instances.append(Message.from_dict(message))

        return instances

    @staticmethod
    def _encrypt(message: bytes) -> str:
        """Encrypts the given message.

        Note:
            As encryption is currently not supported by the server, all this function does
            is b64encode the given string.

        Args:
            message: Text to encrypt.

        Returns:
            The encrypted text.
        """

        return b64encode(message).decode("ascii")

    @staticmethod
    def _decrypt(message: bytes) -> str:
        """Decrypts the given message.

        Note:
            As encryption is currently not supported by the server, all this function does
            is b64decode the given string.

        Args:
            message: Text to decrypt.

        Returns:
            The decrypted text.
        """

        return b64decode(message).decode("ascii")

    def initialize_from_response(self, response: dict) -> None:
        """Initializes data of chatroom from a response dict.

        Args:
            response: A dictionary of Teahaz server response.
        """

        self.name = response["chatroom_name"]
        self.uid = response["chatroomID"]
        self.username = response["users"][0]["username"]

        assert self.uid is not None
        self.endpoints.uid = self.uid
        self._update_thread_name()

        self._update_channels(
            [Channel.from_dict(channel) for channel in response["channels"]]
        )
        self._is_server_side = True

    def subscribe(self, event: Event, callback: EventCallback) -> None:
        """Start listening for and event and run callback when it occurs.

        Sideeffect:
            This method will call `self._run()` if the event passed is not an
            Error or NetworkException.
        """

        self._listeners[event] = callback

        if not self._is_looping and not event in [Event.ERROR, Event.NETWORK_EXCEPTION]:
            self._run()

    def stop(self) -> None:
        """Stops event loop."""

        self._is_stopped = True

    def create(self, username: str, password: str) -> Chatroom | None:
        """Creates a new chatroom on the server.

        Args:
            username: The owner account's username.
            password: The owner account's password.

        Returns:
            This chatroom, logged into the given owner account.
        """

        data = {
            "chatroom-name": self.name,
            "username": username,
            "password": password,
        }

        response = self._request(
            "post",
            headers={"Content-Type": "application/json"},
            url=self.endpoints.chatroom,
            json=data,
        )

        if response is None:
            # Creation did not succeed, but error was captured
            return None

        self.initialize_from_response(response)
        return self

    def create_channel(self, name: str) -> Channel | None:
        """Creates a channel.

        Args:
            name: The display name for the new channel.

        Returns:
            The new channel object in case of success, None otherwise.
        """

        data = {
            "username": self.username,
            "channel-name": name,
            "permissions": [{"classID": "1", "r": True, "w": True, "x": False}],
        }

        response = self._request("post", url=self.endpoints.channels, json=data)

        if response is None:
            # Creation of chatroom failed, but error was captured
            return None

        channel = Channel.from_dict(response)
        self._update_channels([channel])

        return channel

    def create_invite(
        self, uses: int = 1, expiration_time: float | None = None
    ) -> Invite | None:
        """Creates an invite to this chatroom.

        Args:
            uses: How many times this invite can be used.
            expiration_time: Epoch float describing when the invite will no
                longer be valid.

        Returns:
            Invite object in case of success, None otherwise.
        """

        headers = {
            "username": self.username,
        }

        if uses is not None:
            headers["uses"] = str(uses)

        if expiration_time is not None:
            headers["expiration-time"] = str(expiration_time)

        response = self._request("get", url=self.endpoints.invites, headers=headers)

        if response is None:
            return None

        response["url"] = self.url
        response["chatroomID"] = self.uid
        return Invite.from_dict(response)

    def create_from_invite(
        self, invite: Invite, username: str, password: str
    ) -> Chatroom | None:
        """Initializes chatroom from an invite.

        Args:
            invite: The invite object to use.
            username: The username of the account registered using this invite.
            password: The password of the account registered using this invite.

        Returns:
            This chatroom instance logged into the user on success, None otherwise.
        """

        data = {
            "inviteID": invite.uid,
            "username": username,
            "password": password,
        }

        response = self._request("post", url=self.endpoints.invites, json=data)

        if response is None:
            return None

        self.initialize_from_response(response)

        return self

    def get_users(self) -> list[User] | None:
        """Gets all users in a chatroom.

        Returns:
            A list of User instances on success, None otherwise.
        """

        users = self._request(
            "get",
            url=self.endpoints.users,
            headers={"username": self.username},
        )

        if users is None:
            # Getting users failed, but error was captured
            return None

        return [User.from_dict(user) for user in users]

    def get_channels(self) -> list[Channel] | None:
        """Gets all channels the logged-in user has access to.

        Returns:
            A list of Channel instances on success, None otherwise.
        """

        channels = self._request(
            "get",
            url=self.endpoints.channels,
            headers={"username": self.username},
        )

        if channels is None:
            # Getting channels failed, but error was captured
            return None

        return [Channel.from_dict(channel) for channel in channels]

    def login(self, username: str, password: str) -> requests.Response | None:
        """Logs into the chatroom with given credentials.

        Args:
            username: Username to log into.
            password: Password to log in with.

        Returns:
            Raw response for some reason.
        """

        data = {
            "username": username,
            "password": password,
        }

        response = self._request(
            "post",
            url=self.endpoints.login,
            json=data,
        )

        if response is None:
            return None

        self.username = username
        self._update_channels()
        self._is_server_side = True

        return response

    def get_since(
        self, since: float, channel: Channel | None = None
    ) -> list[Message] | None:
        """Gets messages since provided timestamp.

        Args:
            since: UNIX epoch timestamp from which to get messages.
            channel: The optional channel to filter messages by.

        Returns:
            A list of messages on success, None otherwise.
        """

        return self._get_messages(
            "since",
            channel,
            time=str(since),
        )

    def get_count(
        self, count: int, channel: Channel | None = None
    ) -> list[Message] | None:
        """Gets a certain count of messages sent since provided timestamp.

        Args:
            count: The maximum amount of messages returned.
            channel: The channel to filter messages by.

        Returns:
            A list of messages on success, None otherwise.
        """

        return self._get_messages(
            "count",
            channel,
            str(count),
        )

    def send(
        self,
        content: Union[str, bytes],
        channel: Channel | None = None,
        reply_id: str | None = None,
    ) -> Message | None:
        """Sends a message.

        Args:
            content: Message data. Currently only `str` is supported.
            channel: The channel to send the message on. Defaults to self.active_channel.
            reply_id: The optional id of the messages this one will reply to.

        Returns:
            None. The outgoing message can be acquired by subscribing to `Event.MSG_SENT`.

        Raises:
            ValueError: No channel was passed, and self.active_channel is None.

        Sideeffect:
            This changes self.active_channel to the provided one, if it isn't None.
        """

        msg = {}
        if channel is not None:
            self.active_channel = channel

        elif self.active_channel is None:
            raise ValueError(
                "No active channel set. Please use either the Chatroom.set_channel() function"
                + " or provide `channel` as a non-null value!"
            )

        if isinstance(content, bytes):
            endpoint = self.endpoints.files
        else:
            endpoint = self.endpoints.messages

        if isinstance(content, str):
            content = content.encode("ascii")

        msg = {
            "username": self.username,
            "channelID": self.active_channel.uid,
            "replyID": reply_id,
            "data": self._encrypt(content),
        }

        sent = self._request(
            "post",
            url=endpoint,
            json=msg,
        )

        if sent is not None:
            sent["data"] = content.decode("ascii")
            message_out = Message.from_dict(sent)
            self._notify(Event.MSG_SENT, message_out)

            return message_out

        return None

The object to deal with all chatroom-related API actions.

#   Chatroom( url: str, uid: str | None = None, name: str | None = None, session: requests.sessions.Session | None = None )
View Source
    def __init__(
        self,
        url: str,
        uid: str | None = None,
        name: str | None = None,
        session: requests.Session | None = None,
    ) -> None:
        """Initializes chatroom.

        Args:
            url: The chatroom's server URL:PORT.
            uid: The chatroom's UUID.
            name: The display name of the chatroom.
            session: Session that should be used by this chatroom.

        Every argument except URL is optional. This object should usually
        be instanced within the module, not by outside code.
        """

        self.uid = uid
        self.url = url
        self.name = name
        self.interval = 1

        self.username: str | None = None
        self.session = session or requests.Session()
        self.active_channel: Channel | None = None
        self.channels: list[Channel] = []

        self.event_thread = Thread(target=self._loop)

        # If the chatroom doesn't exist yet its endpoints' uid
        # is only filled in the create() method
        self.endpoints = EndpointContainer(self.url, self.uid)

        self.messages: list[Message] = []

        self._listeners: dict[Event, EventCallback] = {}
        self._is_looping: bool = False
        self._is_stopped: bool = False
        self._is_server_side: bool = False
        self._last_get_time: float = epoch()

Initializes chatroom.

Args
  • url: The chatroom's server URL:PORT.
  • uid: The chatroom's UUID.
  • name: The display name of the chatroom.
  • session: Session that should be used by this chatroom.

Every argument except URL is optional. This object should usually be instanced within the module, not by outside code.

#   def initialize_from_response(self, response: dict) -> None:
View Source
    def initialize_from_response(self, response: dict) -> None:
        """Initializes data of chatroom from a response dict.

        Args:
            response: A dictionary of Teahaz server response.
        """

        self.name = response["chatroom_name"]
        self.uid = response["chatroomID"]
        self.username = response["users"][0]["username"]

        assert self.uid is not None
        self.endpoints.uid = self.uid
        self._update_thread_name()

        self._update_channels(
            [Channel.from_dict(channel) for channel in response["channels"]]
        )
        self._is_server_side = True

Initializes data of chatroom from a response dict.

Args
  • response: A dictionary of Teahaz server response.
#   def subscribe( self, event: teahaz.client.Event, callback: Union[Callable[[teahaz.dataclasses.Message], Any], Callable[[requests.models.Response, str, Dict[str, Any]], Any]] ) -> None:
View Source
    def subscribe(self, event: Event, callback: EventCallback) -> None:
        """Start listening for and event and run callback when it occurs.

        Sideeffect:
            This method will call `self._run()` if the event passed is not an
            Error or NetworkException.
        """

        self._listeners[event] = callback

        if not self._is_looping and not event in [Event.ERROR, Event.NETWORK_EXCEPTION]:
            self._run()

Start listening for and event and run callback when it occurs.

Sideeffect

This method will call self._run() if the event passed is not an Error or NetworkException.

#   def stop(self) -> None:
View Source
    def stop(self) -> None:
        """Stops event loop."""

        self._is_stopped = True

Stops event loop.

#   def create(self, username: str, password: str) -> teahaz.client.Chatroom | None:
View Source
    def create(self, username: str, password: str) -> Chatroom | None:
        """Creates a new chatroom on the server.

        Args:
            username: The owner account's username.
            password: The owner account's password.

        Returns:
            This chatroom, logged into the given owner account.
        """

        data = {
            "chatroom-name": self.name,
            "username": username,
            "password": password,
        }

        response = self._request(
            "post",
            headers={"Content-Type": "application/json"},
            url=self.endpoints.chatroom,
            json=data,
        )

        if response is None:
            # Creation did not succeed, but error was captured
            return None

        self.initialize_from_response(response)
        return self

Creates a new chatroom on the server.

Args
  • username: The owner account's username.
  • password: The owner account's password.
Returns

This chatroom, logged into the given owner account.

#   def create_channel(self, name: str) -> teahaz.dataclasses.Channel | None:
View Source
    def create_channel(self, name: str) -> Channel | None:
        """Creates a channel.

        Args:
            name: The display name for the new channel.

        Returns:
            The new channel object in case of success, None otherwise.
        """

        data = {
            "username": self.username,
            "channel-name": name,
            "permissions": [{"classID": "1", "r": True, "w": True, "x": False}],
        }

        response = self._request("post", url=self.endpoints.channels, json=data)

        if response is None:
            # Creation of chatroom failed, but error was captured
            return None

        channel = Channel.from_dict(response)
        self._update_channels([channel])

        return channel

Creates a channel.

Args
  • name: The display name for the new channel.
Returns

The new channel object in case of success, None otherwise.

#   def create_invite( self, uses: int = 1, expiration_time: float | None = None ) -> teahaz.dataclasses.Invite | None:
View Source
    def create_invite(
        self, uses: int = 1, expiration_time: float | None = None
    ) -> Invite | None:
        """Creates an invite to this chatroom.

        Args:
            uses: How many times this invite can be used.
            expiration_time: Epoch float describing when the invite will no
                longer be valid.

        Returns:
            Invite object in case of success, None otherwise.
        """

        headers = {
            "username": self.username,
        }

        if uses is not None:
            headers["uses"] = str(uses)

        if expiration_time is not None:
            headers["expiration-time"] = str(expiration_time)

        response = self._request("get", url=self.endpoints.invites, headers=headers)

        if response is None:
            return None

        response["url"] = self.url
        response["chatroomID"] = self.uid
        return Invite.from_dict(response)

Creates an invite to this chatroom.

Args
  • uses: How many times this invite can be used.
  • expiration_time: Epoch float describing when the invite will no longer be valid.
Returns

Invite object in case of success, None otherwise.

#   def create_from_invite( self, invite: teahaz.dataclasses.Invite, username: str, password: str ) -> teahaz.client.Chatroom | None:
View Source
    def create_from_invite(
        self, invite: Invite, username: str, password: str
    ) -> Chatroom | None:
        """Initializes chatroom from an invite.

        Args:
            invite: The invite object to use.
            username: The username of the account registered using this invite.
            password: The password of the account registered using this invite.

        Returns:
            This chatroom instance logged into the user on success, None otherwise.
        """

        data = {
            "inviteID": invite.uid,
            "username": username,
            "password": password,
        }

        response = self._request("post", url=self.endpoints.invites, json=data)

        if response is None:
            return None

        self.initialize_from_response(response)

        return self

Initializes chatroom from an invite.

Args
  • invite: The invite object to use.
  • username: The username of the account registered using this invite.
  • password: The password of the account registered using this invite.
Returns

This chatroom instance logged into the user on success, None otherwise.

#   def get_users(self) -> list[teahaz.dataclasses.User] | None:
View Source
    def get_users(self) -> list[User] | None:
        """Gets all users in a chatroom.

        Returns:
            A list of User instances on success, None otherwise.
        """

        users = self._request(
            "get",
            url=self.endpoints.users,
            headers={"username": self.username},
        )

        if users is None:
            # Getting users failed, but error was captured
            return None

        return [User.from_dict(user) for user in users]

Gets all users in a chatroom.

Returns

A list of User instances on success, None otherwise.

#   def get_channels(self) -> list[teahaz.dataclasses.Channel] | None:
View Source
    def get_channels(self) -> list[Channel] | None:
        """Gets all channels the logged-in user has access to.

        Returns:
            A list of Channel instances on success, None otherwise.
        """

        channels = self._request(
            "get",
            url=self.endpoints.channels,
            headers={"username": self.username},
        )

        if channels is None:
            # Getting channels failed, but error was captured
            return None

        return [Channel.from_dict(channel) for channel in channels]

Gets all channels the logged-in user has access to.

Returns

A list of Channel instances on success, None otherwise.

#   def login( self, username: str, password: str ) -> requests.models.Response | None:
View Source
    def login(self, username: str, password: str) -> requests.Response | None:
        """Logs into the chatroom with given credentials.

        Args:
            username: Username to log into.
            password: Password to log in with.

        Returns:
            Raw response for some reason.
        """

        data = {
            "username": username,
            "password": password,
        }

        response = self._request(
            "post",
            url=self.endpoints.login,
            json=data,
        )

        if response is None:
            return None

        self.username = username
        self._update_channels()
        self._is_server_side = True

        return response

Logs into the chatroom with given credentials.

Args
  • username: Username to log into.
  • password: Password to log in with.
Returns

Raw response for some reason.

#   def get_since( self, since: float, channel: teahaz.dataclasses.Channel | None = None ) -> list[teahaz.dataclasses.Message] | None:
View Source
    def get_since(
        self, since: float, channel: Channel | None = None
    ) -> list[Message] | None:
        """Gets messages since provided timestamp.

        Args:
            since: UNIX epoch timestamp from which to get messages.
            channel: The optional channel to filter messages by.

        Returns:
            A list of messages on success, None otherwise.
        """

        return self._get_messages(
            "since",
            channel,
            time=str(since),
        )

Gets messages since provided timestamp.

Args
  • since: UNIX epoch timestamp from which to get messages.
  • channel: The optional channel to filter messages by.
Returns

A list of messages on success, None otherwise.

#   def get_count( self, count: int, channel: teahaz.dataclasses.Channel | None = None ) -> list[teahaz.dataclasses.Message] | None:
View Source
    def get_count(
        self, count: int, channel: Channel | None = None
    ) -> list[Message] | None:
        """Gets a certain count of messages sent since provided timestamp.

        Args:
            count: The maximum amount of messages returned.
            channel: The channel to filter messages by.

        Returns:
            A list of messages on success, None otherwise.
        """

        return self._get_messages(
            "count",
            channel,
            str(count),
        )

Gets a certain count of messages sent since provided timestamp.

Args
  • count: The maximum amount of messages returned.
  • channel: The channel to filter messages by.
Returns

A list of messages on success, None otherwise.

#   def send( self, content: Union[str, bytes], channel: teahaz.dataclasses.Channel | None = None, reply_id: str | None = None ) -> teahaz.dataclasses.Message | None:
View Source
    def send(
        self,
        content: Union[str, bytes],
        channel: Channel | None = None,
        reply_id: str | None = None,
    ) -> Message | None:
        """Sends a message.

        Args:
            content: Message data. Currently only `str` is supported.
            channel: The channel to send the message on. Defaults to self.active_channel.
            reply_id: The optional id of the messages this one will reply to.

        Returns:
            None. The outgoing message can be acquired by subscribing to `Event.MSG_SENT`.

        Raises:
            ValueError: No channel was passed, and self.active_channel is None.

        Sideeffect:
            This changes self.active_channel to the provided one, if it isn't None.
        """

        msg = {}
        if channel is not None:
            self.active_channel = channel

        elif self.active_channel is None:
            raise ValueError(
                "No active channel set. Please use either the Chatroom.set_channel() function"
                + " or provide `channel` as a non-null value!"
            )

        if isinstance(content, bytes):
            endpoint = self.endpoints.files
        else:
            endpoint = self.endpoints.messages

        if isinstance(content, str):
            content = content.encode("ascii")

        msg = {
            "username": self.username,
            "channelID": self.active_channel.uid,
            "replyID": reply_id,
            "data": self._encrypt(content),
        }

        sent = self._request(
            "post",
            url=endpoint,
            json=msg,
        )

        if sent is not None:
            sent["data"] = content.decode("ascii")
            message_out = Message.from_dict(sent)
            self._notify(Event.MSG_SENT, message_out)

            return message_out

        return None

Sends a message.

Args
  • content: Message data. Currently only str is supported.
  • channel: The channel to send the message on. Defaults to self.active_channel.
  • reply_id: The optional id of the messages this one will reply to.
Returns

None. The outgoing message can be acquired by subscribing to Event.MSG_SENT.

Raises
  • ValueError: No channel was passed, and self.active_channel is None.
Sideeffect

This changes self.active_channel to the provided one, if it isn't None.