Module openagents.Disk

Expand source code
import asyncio
import grpc
import struct

from openagents_grpc_proto import rpc_pb2_grpc
from openagents_grpc_proto import rpc_pb2
from .DiskReader import DiskReader
from .DiskWriter import DiskWriter
from typing import List

class Disk:
    """
    A virtual p2p disk on the OpenAgents network.
    """

    def __init__(self, id: str, url: str, node):
        self.id = id
        self.url = url
        self.node = node
        self.closed = False
    
    async def list(self, prefix:str="/") -> list[str]:
        """
        List files in the disk with the given prefix.

        Args:
            prefix (str): The prefix to filter files. Defaults to "/".
        
        Returns:
            list[str]: A list of file paths.
        """
        client = self.node._getClient()
        files = await client.diskListFiles(rpc_pb2.RpcDiskListFilesRequest(diskId=self.id, path=prefix))
        return files.files
    
    async def delete(self, path:str) -> bool:
        """
        Delete a file from the disk.

        Args:
            path (str): The path of the file to delete.

        Returns:
            bool: True if the file was deleted successfully, False otherwise.
        """
        client = self.node._getClient()
        res = await client.diskDeleteFile(rpc_pb2.RpcDiskDeleteFileRequest(diskId=self.id, path=path))
        return res.success

    async def writeBytes(self, path:str, dataBytes:bytes, CHUNK_SIZE:int=1024*1024*15) -> bool:
        """
        Write bytes to a file on the disk.

        This method is deprecated. Use 
            async with disk.openwriteStream(path) as writer:
                await writer.write(data)
        instead.

        Args:
            path (str): The path of the file to write.
            dataBytes (bytes): The bytes to write.
            CHUNK_SIZE (int): The size of each chunk to write. Defaults to 1024*1024*15.

        Returns:
            bool: True if the bytes were written successfully, False otherwise.
        """
        client = self.node._getClient()
        def write_data():
            for j in range(0, len(dataBytes), CHUNK_SIZE):
                chunk = bytes(dataBytes[j:min(j+CHUNK_SIZE, len(dataBytes))])                   
                request = rpc_pb2.RpcDiskWriteFileRequest(diskId=str(self.id), path=path, data=chunk)
                yield request                              
        res=await client.diskWriteFile(write_data())
        return res.success


    async def openWriteStream(self, path:str, CHUNK_SIZE:int= 1024*1024*15) -> DiskWriter:
        """
        Open a stream for writing data to a file on the disk.

        Args:
            path (str): The path of the file to write.
            CHUNK_SIZE (int): The size of each chunk to write. Defaults to 1024*1024*15.

        Returns:
            DiskWriter: A writer for writing data to the stream.
        """
        client = self.node._getClient()
        writeQueue = asyncio.Queue()             
        async def write_data():
            while True:
                dataBytes = await writeQueue.get()
                if dataBytes is None:  # End of stream
                    break
                for j in range(0, len(dataBytes), CHUNK_SIZE):
                    chunk = bytes(dataBytes[j:min(j+CHUNK_SIZE, len(dataBytes))])                   
                    request = rpc_pb2.RpcDiskWriteFileRequest(diskId=str(self.id), path=path, data=chunk)
                    yield request
                writeQueue.task_done()
        res=client.diskWriteFile(write_data())
        return DiskWriter(writeQueue, res)

    
    async def openReadStream(self, path:str)-> DiskReader:
        """
        Open a stream for reading data from a file on the disk.

        Args:
            path (str): The path of the file to read.

        Returns:
            DiskReader: A reader for reading data from the stream.
        """

        client = self.node._getClient()
        readQueue = asyncio.Queue()
        async def read_data():
            async for chunk in client.diskReadFile(rpc_pb2.RpcDiskReadFileRequest(diskId=self.id, path=path)):
                readQueue.put_nowait(chunk.data)
        r = asyncio.create_task(read_data())
        return DiskReader(readQueue, r)

    async def readBytes(self, path:str):
        """
        Read bytes from a file on the disk.

        This method is deprecated. Use
            async with disk.openReadStream(path) as reader:
                data = await reader.read()

        Args:
            path (str): The path of the file to read.

        Returns:
            bytes: The bytes read from the file.
        """
        client = self.node._getClient()
        bytesOut = bytearray()
        async for chunk in client.diskReadFile(rpc_pb2.RpcDiskReadFileRequest(diskId=self.id, path=path)):
            bytesOut.extend(chunk.data)
        return bytesOut

    async def writeUTF8(self, path:str, data:str) -> bool:
        """
        Write a UTF-8 string to a file on the disk.

        This method is deprecated. Use
            async with disk.openWriteStream(path) as writer:
                await writer.writeUTF8(data)
        
        Args:
            path (str): The path of the file to write.
            data (str): The string to write.

        Returns:
            bool: True if the string was written successfully, False otherwise.
        """
        return await self.writeBytes(path, data.encode('utf-8'))

    async def readUTF8(self, path:str)-> str:
        """
        Read a UTF-8 string from a file on the disk.

        This method is deprecated. Use
            async with disk.openReadStream(path) as reader:
                data = await reader.readUTF8()

        Args:
            path (str): The path of the file to read.

        Returns:
            str: The string read from the file.
        """
        return (await self.readBytes(path)).decode('utf-8')


    
    async def close(self)-> None:
        """
        Close the disk.
        """
        if self.closed: return
        client = self.node._getClient()
        await client.closeDisk(rpc_pb2.RpcCloseDiskRequest(diskId=self.id))
        self.closed=True

    def getUrl(self)-> str:
        """
        Get the URL that can be used by any node to access the disk.

        Returns:
            str: The URL of the disk.
        """
        return self.url

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.close()

Classes

class Disk (id: str, url: str, node)

A virtual p2p disk on the OpenAgents network.

Expand source code
class Disk:
    """
    A virtual p2p disk on the OpenAgents network.
    """

    def __init__(self, id: str, url: str, node):
        self.id = id
        self.url = url
        self.node = node
        self.closed = False
    
    async def list(self, prefix:str="/") -> list[str]:
        """
        List files in the disk with the given prefix.

        Args:
            prefix (str): The prefix to filter files. Defaults to "/".
        
        Returns:
            list[str]: A list of file paths.
        """
        client = self.node._getClient()
        files = await client.diskListFiles(rpc_pb2.RpcDiskListFilesRequest(diskId=self.id, path=prefix))
        return files.files
    
    async def delete(self, path:str) -> bool:
        """
        Delete a file from the disk.

        Args:
            path (str): The path of the file to delete.

        Returns:
            bool: True if the file was deleted successfully, False otherwise.
        """
        client = self.node._getClient()
        res = await client.diskDeleteFile(rpc_pb2.RpcDiskDeleteFileRequest(diskId=self.id, path=path))
        return res.success

    async def writeBytes(self, path:str, dataBytes:bytes, CHUNK_SIZE:int=1024*1024*15) -> bool:
        """
        Write bytes to a file on the disk.

        This method is deprecated. Use 
            async with disk.openwriteStream(path) as writer:
                await writer.write(data)
        instead.

        Args:
            path (str): The path of the file to write.
            dataBytes (bytes): The bytes to write.
            CHUNK_SIZE (int): The size of each chunk to write. Defaults to 1024*1024*15.

        Returns:
            bool: True if the bytes were written successfully, False otherwise.
        """
        client = self.node._getClient()
        def write_data():
            for j in range(0, len(dataBytes), CHUNK_SIZE):
                chunk = bytes(dataBytes[j:min(j+CHUNK_SIZE, len(dataBytes))])                   
                request = rpc_pb2.RpcDiskWriteFileRequest(diskId=str(self.id), path=path, data=chunk)
                yield request                              
        res=await client.diskWriteFile(write_data())
        return res.success


    async def openWriteStream(self, path:str, CHUNK_SIZE:int= 1024*1024*15) -> DiskWriter:
        """
        Open a stream for writing data to a file on the disk.

        Args:
            path (str): The path of the file to write.
            CHUNK_SIZE (int): The size of each chunk to write. Defaults to 1024*1024*15.

        Returns:
            DiskWriter: A writer for writing data to the stream.
        """
        client = self.node._getClient()
        writeQueue = asyncio.Queue()             
        async def write_data():
            while True:
                dataBytes = await writeQueue.get()
                if dataBytes is None:  # End of stream
                    break
                for j in range(0, len(dataBytes), CHUNK_SIZE):
                    chunk = bytes(dataBytes[j:min(j+CHUNK_SIZE, len(dataBytes))])                   
                    request = rpc_pb2.RpcDiskWriteFileRequest(diskId=str(self.id), path=path, data=chunk)
                    yield request
                writeQueue.task_done()
        res=client.diskWriteFile(write_data())
        return DiskWriter(writeQueue, res)

    
    async def openReadStream(self, path:str)-> DiskReader:
        """
        Open a stream for reading data from a file on the disk.

        Args:
            path (str): The path of the file to read.

        Returns:
            DiskReader: A reader for reading data from the stream.
        """

        client = self.node._getClient()
        readQueue = asyncio.Queue()
        async def read_data():
            async for chunk in client.diskReadFile(rpc_pb2.RpcDiskReadFileRequest(diskId=self.id, path=path)):
                readQueue.put_nowait(chunk.data)
        r = asyncio.create_task(read_data())
        return DiskReader(readQueue, r)

    async def readBytes(self, path:str):
        """
        Read bytes from a file on the disk.

        This method is deprecated. Use
            async with disk.openReadStream(path) as reader:
                data = await reader.read()

        Args:
            path (str): The path of the file to read.

        Returns:
            bytes: The bytes read from the file.
        """
        client = self.node._getClient()
        bytesOut = bytearray()
        async for chunk in client.diskReadFile(rpc_pb2.RpcDiskReadFileRequest(diskId=self.id, path=path)):
            bytesOut.extend(chunk.data)
        return bytesOut

    async def writeUTF8(self, path:str, data:str) -> bool:
        """
        Write a UTF-8 string to a file on the disk.

        This method is deprecated. Use
            async with disk.openWriteStream(path) as writer:
                await writer.writeUTF8(data)
        
        Args:
            path (str): The path of the file to write.
            data (str): The string to write.

        Returns:
            bool: True if the string was written successfully, False otherwise.
        """
        return await self.writeBytes(path, data.encode('utf-8'))

    async def readUTF8(self, path:str)-> str:
        """
        Read a UTF-8 string from a file on the disk.

        This method is deprecated. Use
            async with disk.openReadStream(path) as reader:
                data = await reader.readUTF8()

        Args:
            path (str): The path of the file to read.

        Returns:
            str: The string read from the file.
        """
        return (await self.readBytes(path)).decode('utf-8')


    
    async def close(self)-> None:
        """
        Close the disk.
        """
        if self.closed: return
        client = self.node._getClient()
        await client.closeDisk(rpc_pb2.RpcCloseDiskRequest(diskId=self.id))
        self.closed=True

    def getUrl(self)-> str:
        """
        Get the URL that can be used by any node to access the disk.

        Returns:
            str: The URL of the disk.
        """
        return self.url

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.close()

Methods

async def close(self) ‑> None

Close the disk.

Expand source code
async def close(self)-> None:
    """
    Close the disk.
    """
    if self.closed: return
    client = self.node._getClient()
    await client.closeDisk(rpc_pb2.RpcCloseDiskRequest(diskId=self.id))
    self.closed=True
async def delete(self, path: str) ‑> bool

Delete a file from the disk.

Args

path : str
The path of the file to delete.

Returns

bool
True if the file was deleted successfully, False otherwise.
Expand source code
async def delete(self, path:str) -> bool:
    """
    Delete a file from the disk.

    Args:
        path (str): The path of the file to delete.

    Returns:
        bool: True if the file was deleted successfully, False otherwise.
    """
    client = self.node._getClient()
    res = await client.diskDeleteFile(rpc_pb2.RpcDiskDeleteFileRequest(diskId=self.id, path=path))
    return res.success
def getUrl(self) ‑> str

Get the URL that can be used by any node to access the disk.

Returns

str
The URL of the disk.
Expand source code
def getUrl(self)-> str:
    """
    Get the URL that can be used by any node to access the disk.

    Returns:
        str: The URL of the disk.
    """
    return self.url
async def list(self, prefix: str = '/') ‑> list[str]

List files in the disk with the given prefix.

Args

prefix : str
The prefix to filter files. Defaults to "/".

Returns

list[str]
A list of file paths.
Expand source code
async def list(self, prefix:str="/") -> list[str]:
    """
    List files in the disk with the given prefix.

    Args:
        prefix (str): The prefix to filter files. Defaults to "/".
    
    Returns:
        list[str]: A list of file paths.
    """
    client = self.node._getClient()
    files = await client.diskListFiles(rpc_pb2.RpcDiskListFilesRequest(diskId=self.id, path=prefix))
    return files.files
async def openReadStream(self, path: str) ‑> DiskReader

Open a stream for reading data from a file on the disk.

Args

path : str
The path of the file to read.

Returns

DiskReader
A reader for reading data from the stream.
Expand source code
async def openReadStream(self, path:str)-> DiskReader:
    """
    Open a stream for reading data from a file on the disk.

    Args:
        path (str): The path of the file to read.

    Returns:
        DiskReader: A reader for reading data from the stream.
    """

    client = self.node._getClient()
    readQueue = asyncio.Queue()
    async def read_data():
        async for chunk in client.diskReadFile(rpc_pb2.RpcDiskReadFileRequest(diskId=self.id, path=path)):
            readQueue.put_nowait(chunk.data)
    r = asyncio.create_task(read_data())
    return DiskReader(readQueue, r)
async def openWriteStream(self, path: str, CHUNK_SIZE: int = 15728640) ‑> DiskWriter

Open a stream for writing data to a file on the disk.

Args

path : str
The path of the file to write.
CHUNK_SIZE : int
The size of each chunk to write. Defaults to 1024102415.

Returns

DiskWriter
A writer for writing data to the stream.
Expand source code
async def openWriteStream(self, path:str, CHUNK_SIZE:int= 1024*1024*15) -> DiskWriter:
    """
    Open a stream for writing data to a file on the disk.

    Args:
        path (str): The path of the file to write.
        CHUNK_SIZE (int): The size of each chunk to write. Defaults to 1024*1024*15.

    Returns:
        DiskWriter: A writer for writing data to the stream.
    """
    client = self.node._getClient()
    writeQueue = asyncio.Queue()             
    async def write_data():
        while True:
            dataBytes = await writeQueue.get()
            if dataBytes is None:  # End of stream
                break
            for j in range(0, len(dataBytes), CHUNK_SIZE):
                chunk = bytes(dataBytes[j:min(j+CHUNK_SIZE, len(dataBytes))])                   
                request = rpc_pb2.RpcDiskWriteFileRequest(diskId=str(self.id), path=path, data=chunk)
                yield request
            writeQueue.task_done()
    res=client.diskWriteFile(write_data())
    return DiskWriter(writeQueue, res)
async def readBytes(self, path: str)

Read bytes from a file on the disk.

This method is deprecated. Use async with disk.openReadStream(path) as reader: data = await reader.read()

Args

path : str
The path of the file to read.

Returns

bytes
The bytes read from the file.
Expand source code
async def readBytes(self, path:str):
    """
    Read bytes from a file on the disk.

    This method is deprecated. Use
        async with disk.openReadStream(path) as reader:
            data = await reader.read()

    Args:
        path (str): The path of the file to read.

    Returns:
        bytes: The bytes read from the file.
    """
    client = self.node._getClient()
    bytesOut = bytearray()
    async for chunk in client.diskReadFile(rpc_pb2.RpcDiskReadFileRequest(diskId=self.id, path=path)):
        bytesOut.extend(chunk.data)
    return bytesOut
async def readUTF8(self, path: str) ‑> str

Read a UTF-8 string from a file on the disk.

This method is deprecated. Use async with disk.openReadStream(path) as reader: data = await reader.readUTF8()

Args

path : str
The path of the file to read.

Returns

str
The string read from the file.
Expand source code
async def readUTF8(self, path:str)-> str:
    """
    Read a UTF-8 string from a file on the disk.

    This method is deprecated. Use
        async with disk.openReadStream(path) as reader:
            data = await reader.readUTF8()

    Args:
        path (str): The path of the file to read.

    Returns:
        str: The string read from the file.
    """
    return (await self.readBytes(path)).decode('utf-8')
async def writeBytes(self, path: str, dataBytes: bytes, CHUNK_SIZE: int = 15728640) ‑> bool

Write bytes to a file on the disk.

This method is deprecated. Use async with disk.openwriteStream(path) as writer: await writer.write(data) instead.

Args

path : str
The path of the file to write.
dataBytes : bytes
The bytes to write.
CHUNK_SIZE : int
The size of each chunk to write. Defaults to 1024102415.

Returns

bool
True if the bytes were written successfully, False otherwise.
Expand source code
async def writeBytes(self, path:str, dataBytes:bytes, CHUNK_SIZE:int=1024*1024*15) -> bool:
    """
    Write bytes to a file on the disk.

    This method is deprecated. Use 
        async with disk.openwriteStream(path) as writer:
            await writer.write(data)
    instead.

    Args:
        path (str): The path of the file to write.
        dataBytes (bytes): The bytes to write.
        CHUNK_SIZE (int): The size of each chunk to write. Defaults to 1024*1024*15.

    Returns:
        bool: True if the bytes were written successfully, False otherwise.
    """
    client = self.node._getClient()
    def write_data():
        for j in range(0, len(dataBytes), CHUNK_SIZE):
            chunk = bytes(dataBytes[j:min(j+CHUNK_SIZE, len(dataBytes))])                   
            request = rpc_pb2.RpcDiskWriteFileRequest(diskId=str(self.id), path=path, data=chunk)
            yield request                              
    res=await client.diskWriteFile(write_data())
    return res.success
async def writeUTF8(self, path: str, data: str) ‑> bool

Write a UTF-8 string to a file on the disk.

This method is deprecated. Use async with disk.openWriteStream(path) as writer: await writer.writeUTF8(data)

Args

path : str
The path of the file to write.
data : str
The string to write.

Returns

bool
True if the string was written successfully, False otherwise.
Expand source code
async def writeUTF8(self, path:str, data:str) -> bool:
    """
    Write a UTF-8 string to a file on the disk.

    This method is deprecated. Use
        async with disk.openWriteStream(path) as writer:
            await writer.writeUTF8(data)
    
    Args:
        path (str): The path of the file to write.
        data (str): The string to write.

    Returns:
        bool: True if the string was written successfully, False otherwise.
    """
    return await self.writeBytes(path, data.encode('utf-8'))