Module openagents.DiskReader
Expand source code
import asyncio
import struct
class DiskReader:
"""
A reader for reading data from a stream.
"""
def __init__(self, chunksQueue: asyncio.Queue, req):
self.chunksQueue = chunksQueue
self.buffer = bytearray()
self.req = req
async def read(self, n = 1) -> bytes:
"""
Read n bytes from the stream.
Args:
n (int): The number of bytes to read. Defaults to 1.
Returns:
bytes: The bytes read from the stream.
"""
while len(self.buffer) < n:
v = await self.chunksQueue.get()
if v is None: break
self.buffer.extend(v)
result, self.buffer = self.buffer[:n], self.buffer[n:]
return result
async def readInt(self) -> int:
"""
Read an integer from the stream.
Note: The integer must be written in big-endian format.
Returns:
int: The integer read from the stream.
"""
return int.from_bytes(await self.read(4), byteorder='big')
async def readUTF8(self) -> str:
"""
Read a UTF-8 string from the stream.
Returns:
str: The string read from the stream.
"""
length = await self.readInt()
return (await self.read(length)).decode("utf-8")
async def readFloat(self) -> float:
"""
Read a float from the stream.
Note: The float must be written in big-endian format.
Returns:
float: The float read from the stream.
"""
return struct.unpack('>f', await self.read(4))[0]
async def readDouble(self) -> float:
"""
Read a double from the stream.
Note: The double must be written in big-endian format.
Returns:
float: The double read from the stream.
"""
return struct.unpack('>d', await self.read(8))[0]
async def readBool(self) -> bool:
"""
Read a boolean from the stream.
Returns:
bool: The boolean read from the stream.
"""
return bool(await self.read(1))
async def readByte(self) -> int:
"""
Read a byte from the stream.
Returns:
int: The byte read from the stream.
"""
return int.from_bytes(await self.read(1), byteorder='big')
async def readShort(self) -> int:
"""
Read a short from the stream.
Note: The short must be written in big-endian format.
Returns:
int: The short read from the stream.
"""
return int.from_bytes(await self.read(2), byteorder='big')
async def readLong(self) -> int:
"""
Read a long from the stream.
Note: The long must be written in big-endian format.
Returns:
int: The long read from the stream.
"""
return int.from_bytes(await self.read(8), byteorder='big')
async def close(self):
"""
Close the stream.
"""
self.chunksQueue.task_done()
return await self.req
async def __aenter__(self):
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.close()
Classes
class DiskReader (chunksQueue: asyncio.queues.Queue, req)
-
A reader for reading data from a stream.
Expand source code
class DiskReader: """ A reader for reading data from a stream. """ def __init__(self, chunksQueue: asyncio.Queue, req): self.chunksQueue = chunksQueue self.buffer = bytearray() self.req = req async def read(self, n = 1) -> bytes: """ Read n bytes from the stream. Args: n (int): The number of bytes to read. Defaults to 1. Returns: bytes: The bytes read from the stream. """ while len(self.buffer) < n: v = await self.chunksQueue.get() if v is None: break self.buffer.extend(v) result, self.buffer = self.buffer[:n], self.buffer[n:] return result async def readInt(self) -> int: """ Read an integer from the stream. Note: The integer must be written in big-endian format. Returns: int: The integer read from the stream. """ return int.from_bytes(await self.read(4), byteorder='big') async def readUTF8(self) -> str: """ Read a UTF-8 string from the stream. Returns: str: The string read from the stream. """ length = await self.readInt() return (await self.read(length)).decode("utf-8") async def readFloat(self) -> float: """ Read a float from the stream. Note: The float must be written in big-endian format. Returns: float: The float read from the stream. """ return struct.unpack('>f', await self.read(4))[0] async def readDouble(self) -> float: """ Read a double from the stream. Note: The double must be written in big-endian format. Returns: float: The double read from the stream. """ return struct.unpack('>d', await self.read(8))[0] async def readBool(self) -> bool: """ Read a boolean from the stream. Returns: bool: The boolean read from the stream. """ return bool(await self.read(1)) async def readByte(self) -> int: """ Read a byte from the stream. Returns: int: The byte read from the stream. """ return int.from_bytes(await self.read(1), byteorder='big') async def readShort(self) -> int: """ Read a short from the stream. Note: The short must be written in big-endian format. Returns: int: The short read from the stream. """ return int.from_bytes(await self.read(2), byteorder='big') async def readLong(self) -> int: """ Read a long from the stream. Note: The long must be written in big-endian format. Returns: int: The long read from the stream. """ return int.from_bytes(await self.read(8), byteorder='big') async def close(self): """ Close the stream. """ self.chunksQueue.task_done() return await self.req async def __aenter__(self): return self async def __aexit__(self, exc_type, exc_val, exc_tb): await self.close()
Methods
async def close(self)
-
Close the stream.
Expand source code
async def close(self): """ Close the stream. """ self.chunksQueue.task_done() return await self.req
async def read(self, n=1) ‑> bytes
-
Read n bytes from the stream.
Args
n
:int
- The number of bytes to read. Defaults to 1.
Returns
bytes
- The bytes read from the stream.
Expand source code
async def read(self, n = 1) -> bytes: """ Read n bytes from the stream. Args: n (int): The number of bytes to read. Defaults to 1. Returns: bytes: The bytes read from the stream. """ while len(self.buffer) < n: v = await self.chunksQueue.get() if v is None: break self.buffer.extend(v) result, self.buffer = self.buffer[:n], self.buffer[n:] return result
async def readBool(self) ‑> bool
-
Read a boolean from the stream.
Returns
bool
- The boolean read from the stream.
Expand source code
async def readBool(self) -> bool: """ Read a boolean from the stream. Returns: bool: The boolean read from the stream. """ return bool(await self.read(1))
async def readByte(self) ‑> int
-
Read a byte from the stream.
Returns
int
- The byte read from the stream.
Expand source code
async def readByte(self) -> int: """ Read a byte from the stream. Returns: int: The byte read from the stream. """ return int.from_bytes(await self.read(1), byteorder='big')
async def readDouble(self) ‑> float
-
Read a double from the stream. Note: The double must be written in big-endian format.
Returns
float
- The double read from the stream.
Expand source code
async def readDouble(self) -> float: """ Read a double from the stream. Note: The double must be written in big-endian format. Returns: float: The double read from the stream. """ return struct.unpack('>d', await self.read(8))[0]
async def readFloat(self) ‑> float
-
Read a float from the stream. Note: The float must be written in big-endian format.
Returns
float
- The float read from the stream.
Expand source code
async def readFloat(self) -> float: """ Read a float from the stream. Note: The float must be written in big-endian format. Returns: float: The float read from the stream. """ return struct.unpack('>f', await self.read(4))[0]
async def readInt(self) ‑> int
-
Read an integer from the stream. Note: The integer must be written in big-endian format.
Returns
int
- The integer read from the stream.
Expand source code
async def readInt(self) -> int: """ Read an integer from the stream. Note: The integer must be written in big-endian format. Returns: int: The integer read from the stream. """ return int.from_bytes(await self.read(4), byteorder='big')
async def readLong(self) ‑> int
-
Read a long from the stream. Note: The long must be written in big-endian format.
Returns
int
- The long read from the stream.
Expand source code
async def readLong(self) -> int: """ Read a long from the stream. Note: The long must be written in big-endian format. Returns: int: The long read from the stream. """ return int.from_bytes(await self.read(8), byteorder='big')
async def readShort(self) ‑> int
-
Read a short from the stream. Note: The short must be written in big-endian format.
Returns
int
- The short read from the stream.
Expand source code
async def readShort(self) -> int: """ Read a short from the stream. Note: The short must be written in big-endian format. Returns: int: The short read from the stream. """ return int.from_bytes(await self.read(2), byteorder='big')
async def readUTF8(self) ‑> str
-
Read a UTF-8 string from the stream.
Returns
str
- The string read from the stream.
Expand source code
async def readUTF8(self) -> str: """ Read a UTF-8 string from the stream. Returns: str: The string read from the stream. """ length = await self.readInt() return (await self.read(length)).decode("utf-8")