Module openagents.DiskWriter

Expand source code
import asyncio
import struct

class DiskWriter:
    """
    A class to write data to a stream.

    Each write method places one or more chunks on ``writeQueue`` without
    waiting. ``None`` is queued to mark the end of the stream.
    """

    def __init__(self, writeQueue: asyncio.Queue, res):
        """
        Create a writer.

        Args:
            writeQueue (asyncio.Queue): Queue that receives the chunks to write.
            res: Awaitable awaited by ``close``; its result must expose a
                ``success`` attribute.
        """
        self.writeQueue = writeQueue
        self.res = res

    async def write(self, data: bytes) -> None:
        """
        Write data to the stream.

        Args:
            data (bytes): The data to write.
        """
        self.writeQueue.put_nowait(data)

    async def writeInt(self, data: int) -> None:
        """
        Write an integer to the stream.
        Note: The integer will be written as 4 bytes in big-endian format.

        Args:
            data (int): The integer to write.

        Raises:
            OverflowError: If the value is negative or does not fit in 4 bytes.
        """
        self.writeQueue.put_nowait(data.to_bytes(4, byteorder='big'))

    async def writeUTF8(self, data: str) -> None:
        """
        Write a UTF-8 string to the stream.

        The encoded string is preceded by a 4-byte big-endian length prefix
        holding the number of encoded bytes.

        Args:
            data (str): The string to write.
        """
        encoded = data.encode("utf-8")
        # The prefix must count encoded bytes, not characters: the two differ
        # for any non-ASCII text, and the payload that follows is the encoding.
        self.writeQueue.put_nowait(len(encoded).to_bytes(4, byteorder='big'))
        self.writeQueue.put_nowait(encoded)

    async def writeFloat(self, data: float) -> None:
        """
        Write a float to the stream.
        Note: The float will be written as 4 bytes in big-endian format.

        Args:
            data (float): The float to write.
        """
        self.writeQueue.put_nowait(bytearray(struct.pack(">f", data)))

    async def writeDouble(self, data: float) -> None:
        """
        Write a double to the stream.
        Note: The double will be written as 8 bytes in big-endian format.

        Args:
            data (float): The double to write.
        """
        self.writeQueue.put_nowait(bytearray(struct.pack(">d", data)))

    async def writeBool(self, data: bool) -> None:
        """
        Write a boolean to the stream as a single byte.

        Args:
            data (bool): The boolean to write.
        """
        self.writeQueue.put_nowait(data.to_bytes(1, byteorder='big'))

    async def writeByte(self, data: int) -> None:
        """
        Write a byte to the stream.

        Args:
            data (int): The byte to write.

        Raises:
            OverflowError: If the value is negative or does not fit in 1 byte.
        """
        self.writeQueue.put_nowait(data.to_bytes(1, byteorder='big'))

    async def writeShort(self, data: int) -> None:
        """
        Write a short to the stream.
        Note: The short will be written as 2 bytes in big-endian format.

        Args:
            data (int): The short to write.

        Raises:
            OverflowError: If the value is negative or does not fit in 2 bytes.
        """
        self.writeQueue.put_nowait(data.to_bytes(2, byteorder='big'))

    async def writeLong(self, data: int) -> None:
        """
        Write a long to the stream.
        Note: The long will be written as 8 bytes in big-endian format.

        Args:
            data (int): The long to write.

        Raises:
            OverflowError: If the value is negative or does not fit in 8 bytes.
        """
        self.writeQueue.put_nowait(data.to_bytes(8, byteorder='big'))

    async def end(self) -> None:
        """
        End the stream by queueing the ``None`` marker.
        """
        self.writeQueue.put_nowait(None)

    async def close(self) -> bool:
        """
        End and close the stream.

        Queues the ``None`` marker, then awaits ``res``.

        Returns:
            bool: True if the stream was successfully closed, False otherwise.
        """
        self.writeQueue.put_nowait(None)
        res = await self.res
        return res.success

    async def __aenter__(self):
        """Return the writer itself for use in ``async with``."""
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Close the stream when the ``async with`` block exits."""
        await self.close()

Classes

class DiskWriter (writeQueue: asyncio.queues.Queue, res)

A class to write data to a stream.

Expand source code
class DiskWriter:
    """
    A class to write data to a stream.

    Each write method places one or more chunks on ``writeQueue`` without
    waiting. ``None`` is queued to mark the end of the stream.
    """

    def __init__(self, writeQueue: asyncio.Queue, res):
        """
        Create a writer.

        Args:
            writeQueue (asyncio.Queue): Queue that receives the chunks to write.
            res: Awaitable awaited by ``close``; its result must expose a
                ``success`` attribute.
        """
        self.writeQueue = writeQueue
        self.res = res

    async def write(self, data: bytes) -> None:
        """
        Write data to the stream.

        Args:
            data (bytes): The data to write.
        """
        self.writeQueue.put_nowait(data)

    async def writeInt(self, data: int) -> None:
        """
        Write an integer to the stream.
        Note: The integer will be written as 4 bytes in big-endian format.

        Args:
            data (int): The integer to write.

        Raises:
            OverflowError: If the value is negative or does not fit in 4 bytes.
        """
        self.writeQueue.put_nowait(data.to_bytes(4, byteorder='big'))

    async def writeUTF8(self, data: str) -> None:
        """
        Write a UTF-8 string to the stream.

        The encoded string is preceded by a 4-byte big-endian length prefix
        holding the number of encoded bytes.

        Args:
            data (str): The string to write.
        """
        encoded = data.encode("utf-8")
        # The prefix must count encoded bytes, not characters: the two differ
        # for any non-ASCII text, and the payload that follows is the encoding.
        self.writeQueue.put_nowait(len(encoded).to_bytes(4, byteorder='big'))
        self.writeQueue.put_nowait(encoded)

    async def writeFloat(self, data: float) -> None:
        """
        Write a float to the stream.
        Note: The float will be written as 4 bytes in big-endian format.

        Args:
            data (float): The float to write.
        """
        self.writeQueue.put_nowait(bytearray(struct.pack(">f", data)))

    async def writeDouble(self, data: float) -> None:
        """
        Write a double to the stream.
        Note: The double will be written as 8 bytes in big-endian format.

        Args:
            data (float): The double to write.
        """
        self.writeQueue.put_nowait(bytearray(struct.pack(">d", data)))

    async def writeBool(self, data: bool) -> None:
        """
        Write a boolean to the stream as a single byte.

        Args:
            data (bool): The boolean to write.
        """
        self.writeQueue.put_nowait(data.to_bytes(1, byteorder='big'))

    async def writeByte(self, data: int) -> None:
        """
        Write a byte to the stream.

        Args:
            data (int): The byte to write.

        Raises:
            OverflowError: If the value is negative or does not fit in 1 byte.
        """
        self.writeQueue.put_nowait(data.to_bytes(1, byteorder='big'))

    async def writeShort(self, data: int) -> None:
        """
        Write a short to the stream.
        Note: The short will be written as 2 bytes in big-endian format.

        Args:
            data (int): The short to write.

        Raises:
            OverflowError: If the value is negative or does not fit in 2 bytes.
        """
        self.writeQueue.put_nowait(data.to_bytes(2, byteorder='big'))

    async def writeLong(self, data: int) -> None:
        """
        Write a long to the stream.
        Note: The long will be written as 8 bytes in big-endian format.

        Args:
            data (int): The long to write.

        Raises:
            OverflowError: If the value is negative or does not fit in 8 bytes.
        """
        self.writeQueue.put_nowait(data.to_bytes(8, byteorder='big'))

    async def end(self) -> None:
        """
        End the stream by queueing the ``None`` marker.
        """
        self.writeQueue.put_nowait(None)

    async def close(self) -> bool:
        """
        End and close the stream.

        Queues the ``None`` marker, then awaits ``res``.

        Returns:
            bool: True if the stream was successfully closed, False otherwise.
        """
        self.writeQueue.put_nowait(None)
        res = await self.res
        return res.success

    async def __aenter__(self):
        """Return the writer itself for use in ``async with``."""
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Close the stream when the ``async with`` block exits."""
        await self.close()

Methods

async def close(self) ‑> bool

End and close the stream.

Returns

bool
True if the stream was successfully closed, False otherwise.
Expand source code
async def close(self) -> bool:
    """
    Signal the end of the stream and wait for the outcome.

    Returns:
        bool: The ``success`` attribute of the awaited ``self.res`` result.
    """
    # None is queued as the end-of-stream marker before waiting on the result.
    queue = self.writeQueue
    queue.put_nowait(None)
    outcome = await self.res
    return outcome.success
async def end(self) ‑> None

End the stream.

Expand source code
async def end(self) -> None:
    """
    Mark the stream as finished by queueing ``None``.
    """
    end_marker = None
    self.writeQueue.put_nowait(end_marker)
async def write(self, data: bytes) ‑> None

Write data to the stream.

Args

data : bytes
The data to write.
Expand source code
async def write(self, data: bytes) -> None:
    """
    Queue a chunk of raw data for the stream.

    Args:
        data (bytes): The chunk to queue; it is passed through unchanged.
    """
    queue = self.writeQueue
    queue.put_nowait(data)
async def writeBool(self, data: bool) ‑> None

Write a boolean to the stream.

Args

data : bool
The boolean to write.
Expand source code
async def writeBool(self, data: bool) -> None:
    """
    Queue a boolean for the stream, encoded as one byte.

    Args:
        data (bool): The boolean to write.
    """
    encoded = data.to_bytes(1, 'big')
    self.writeQueue.put_nowait(encoded)
async def writeByte(self, data: int) ‑> None

Write a byte to the stream.

Args

data : int
The byte to write.
Expand source code
async def writeByte(self, data: int) -> None:
    """
    Queue a single byte for the stream.

    Args:
        data (int): The byte value to write.
    """
    encoded = data.to_bytes(1, 'big')
    self.writeQueue.put_nowait(encoded)
async def writeDouble(self, data: float) ‑> None

Write a double to the stream. Note: The double will be written in big-endian format.

Args

data : float
The double to write.
Expand source code
async def writeDouble(self, data: float) -> None:
    """
    Queue a double for the stream.
    Note: The value is packed as 8 bytes in big-endian format.

    Args:
        data (float): The double to write.
    """
    packed = struct.pack(">d", data)
    # The chunk is queued as a bytearray, not as immutable bytes.
    chunk = bytearray(packed)
    self.writeQueue.put_nowait(chunk)
async def writeFloat(self, data: float) ‑> None

Write a float to the stream. Note: The float will be written in big-endian format.

Args

data : float
The float to write.
Expand source code
async def writeFloat(self, data: float) -> None:
    """
    Queue a float for the stream.
    Note: The value is packed as 4 bytes in big-endian format.

    Args:
        data (float): The float to write.
    """
    packed = struct.pack(">f", data)
    # The chunk is queued as a bytearray, not as immutable bytes.
    chunk = bytearray(packed)
    self.writeQueue.put_nowait(chunk)
async def writeInt(self, data: int) ‑> None

Write an integer to the stream. Note: The integer will be written in big-endian format.

Args

data : int
The integer to write.
Expand source code
async def writeInt(self, data: int) -> None:
    """
    Queue an integer for the stream.
    Note: The value is encoded as 4 bytes in big-endian format.

    Args:
        data (int): The integer to write.
    """
    encoded = data.to_bytes(4, 'big')
    self.writeQueue.put_nowait(encoded)
async def writeLong(self, data: int) ‑> None

Write a long to the stream. Note: The long will be written in big-endian format.

Args

data : int
The long to write.
Expand source code
async def writeLong(self, data: int) -> None:
    """
    Queue a long for the stream.
    Note: The value is encoded as 8 bytes in big-endian format.

    Args:
        data (int): The long to write.
    """
    encoded = data.to_bytes(8, 'big')
    self.writeQueue.put_nowait(encoded)
async def writeShort(self, data: int) ‑> None

Write a short to the stream. Note: The short will be written in big-endian format.

Args

data : int
The short to write.
Expand source code
async def writeShort(self, data: int) -> None:
    """
    Queue a short for the stream.
    Note: The value is encoded as 2 bytes in big-endian format.

    Args:
        data (int): The short to write.
    """
    encoded = data.to_bytes(2, 'big')
    self.writeQueue.put_nowait(encoded)
async def writeUTF8(self, data: str) ‑> None

Write a UTF-8 string to the stream.

Args

data : str
The string to write.
Expand source code
async def writeUTF8(self, data: str) -> None:
    """
    Write a UTF-8 string to the stream.

    The encoded string is preceded by a 4-byte big-endian length prefix
    holding the number of encoded bytes.

    Args:
        data (str): The string to write.
    """
    encoded = data.encode("utf-8")
    # The prefix must count encoded bytes, not characters: the two differ
    # for any non-ASCII text, and the payload that follows is the encoding.
    self.writeQueue.put_nowait(len(encoded).to_bytes(4, byteorder='big'))
    self.writeQueue.put_nowait(encoded)