Source code for defectio.models.message

from __future__ import annotations

import asyncio
import io
from typing import Optional
from typing import TYPE_CHECKING
from typing import Union

from defectio.models.user import PartialUser

from .embed import Embed
from .abc import Messageable
from .mixins import Hashable


if TYPE_CHECKING:
    from ..state import ConnectionState
    from ..types.payloads import MessagePayload, AttachmentPayload
    from ..types.websocket import MessageUpdate
    from .channel import MessageableChannel


class Attachment:
    def __init__(self, state: ConnectionState, data: AttachmentPayload):
        self._state: ConnectionState = state
        self.id = data.get("_id")
        self.tag = data.get("tag")
        self.size = data.get("size")
        self.filename = data.get("filename")
        self.content_type = data.get("content_type")
        self.metadata_type = data.get("metadata").get("type")

    @property
    def url(self) -> str:
        base_url = self._state.api_info["features"]["autumn"]["url"]

        return f"{base_url}/{self.tag}/{self.id}"


class Reply:
    def __init__(self, message: Message, mention: Optional[bool] = True):
        self.message: Message = message
        self.mention: Optional[bool] = mention

    def __repr__(self):
        return "<Reply message={0!r} mention={1}>".format(self.message, self.mention)


class File:
    """Respresents a file about to be uploaded to revolt

    Parameters
    -----------
    file: Union[str, bytes]
        The name of the file or the content of the file in bytes, text files will be need to be encoded
    filename: Optional[str]
        The filename of the file when being uploaded, this will default to the name of the file if one exists
    spoiler: bool
        Determines if the file will be a spoiler, this prefexes the filename with `SPOILER_`
    """

    def __init__(
        self,
        file: Union[str, bytes],
        *,
        filename: Optional[str] = None,
        spoiler: bool = False,
    ):
        if isinstance(file, str):
            self.f = open(file, "rb")
        elif isinstance(file, bytes):
            self.f = io.BytesIO(file)

        if filename is None and isinstance(file, str):
            filename = self.f.name

        if spoiler or (filename and filename.startswith("SPOILER_")):
            self.spoiler = True
        else:
            self.spoiler = False

        if self.spoiler and (filename and not filename.startswith("SPOILER_")):
            filename = f"SPOILER_{filename}"

        self.filename = filename


[docs]class Message(Hashable): def __init__( self, state: ConnectionState, channel: MessageableChannel, data: MessagePayload ): self._state: ConnectionState = state self.id = data.get("_id") self.channel = channel self.content = data.get("content") self.author_id = data.get("author") self.replies = [state.get_message(r) for r in data.get("replies", [])] self.attachments = [Attachment(state, a) for a in data.get("attachments", [])] self.embeds = [Embed.from_dict(e) for e in data.get("embeds", [])] def __repr__(self) -> str: name = self.__class__.__name__ return f"<{name} id={self.id} channel={self.channel!r} author={self.author!r}" @property def server(self) -> str: return self.channel.server @property def author(self) -> PartialUser: return self._state.get_user(self.author_id) or PartialUser(self.author_id) async def reply( self, content: str = None, *, file: Optional[File] = None, files: Optional[list[File]] = None, mention: bool = False, delete_after: int = None, nonce=None, ): return await self.channel.send( content, file=file, files=files, delete_after=delete_after, nonce=nonce, replies=[Reply(self, mention)], ) async def delete(self, *, delay: Optional[float] = None) -> None: if delay is not None: async def delete(delay: float): await asyncio.sleep(delay) await self._state.http.delete_message(self.channel.id, self.id) asyncio.create_task(delete(delay)) else: await self._state.http.delete_message(self.channel.id, self.id) async def edit(self, content: str) -> Message: await self._state.http.edit_message(self.channel.id, self.id, content=content) self.content = content return self def _update(self, data: MessageUpdate) -> None: if "content" in data["data"]: self.content = data.get("data").get("content")