Source code for defectio.models.message

from __future__ import annotations

import asyncio
import io
from typing import Optional
from typing import TYPE_CHECKING
from typing import Union

from defectio.models.user import PartialUser

from .embed import Embed
from .abc import Messageable
from .mixins import Hashable

    from ..state import ConnectionState
    from ..types.payloads import MessagePayload, AttachmentPayload
    from ..types.websocket import MessageUpdate
    from .channel import MessageableChannel

class Attachment:
    def __init__(self, state: ConnectionState, data: AttachmentPayload):
        self._state: ConnectionState = state = data.get("_id")
        self.tag = data.get("tag")
        self.size = data.get("size")
        self.filename = data.get("filename")
        self.content_type = data.get("content_type")
        self.metadata_type = data.get("metadata").get("type")

    def url(self) -> str:
        base_url = self._state.api_info["features"]["autumn"]["url"]

        return f"{base_url}/{self.tag}/{}"

class Reply:
    def __init__(self, message: Message, mention: Optional[bool] = True):
        self.message: Message = message
        self.mention: Optional[bool] = mention

    def __repr__(self):
        return "<Reply message={0!r} mention={1}>".format(self.message, self.mention)

class File:
    """Respresents a file about to be uploaded to revolt

    file: Union[str, bytes]
        The name of the file or the content of the file in bytes, text files will be need to be encoded
    filename: Optional[str]
        The filename of the file when being uploaded, this will default to the name of the file if one exists
    spoiler: bool
        Determines if the file will be a spoiler, this prefexes the filename with `SPOILER_`

    def __init__(
        file: Union[str, bytes],
        filename: Optional[str] = None,
        spoiler: bool = False,
        if isinstance(file, str):
            self.f = open(file, "rb")
        elif isinstance(file, bytes):
            self.f = io.BytesIO(file)

        if filename is None and isinstance(file, str):
            filename =

        if spoiler or (filename and filename.startswith("SPOILER_")):
            self.spoiler = True
            self.spoiler = False

        if self.spoiler and (filename and not filename.startswith("SPOILER_")):
            filename = f"SPOILER_{filename}"

        self.filename = filename

[docs]class Message(Hashable): def __init__( self, state: ConnectionState, channel: MessageableChannel, data: MessagePayload ): self._state: ConnectionState = state = data.get("_id") = channel self.content = data.get("content") self.author_id = data.get("author") self.replies = [state.get_message(r) for r in data.get("replies", [])] self.attachments = [Attachment(state, a) for a in data.get("attachments", [])] self.embeds = [Embed.from_dict(e) for e in data.get("embeds", [])] def __repr__(self) -> str: name = self.__class__.__name__ return f"<{name} id={} channel={!r} author={!r}" @property def server(self) -> str: return @property def author(self) -> PartialUser: return self._state.get_user(self.author_id) or PartialUser(self.author_id) async def reply( self, content: str = None, *, file: Optional[File] = None, files: Optional[list[File]] = None, mention: bool = False, delete_after: int = None, nonce=None, ): return await content, file=file, files=files, delete_after=delete_after, nonce=nonce, replies=[Reply(self, mention)], ) async def delete(self, *, delay: Optional[float] = None) -> None: if delay is not None: async def delete(delay: float): await asyncio.sleep(delay) await self._state.http.delete_message(, asyncio.create_task(delete(delay)) else: await self._state.http.delete_message(, async def edit(self, content: str) -> Message: await self._state.http.edit_message(,, content=content) self.content = content return self def _update(self, data: MessageUpdate) -> None: if "content" in data["data"]: self.content = data.get("data").get("content")