Module openagents.JobContext
Source code
from openagents_grpc_proto import rpc_pb2_grpc
from openagents_grpc_proto import rpc_pb2
from .Logger import Logger
from .Disk import Disk
from .RunnerConfig import RunnerConfig
import time
import os
import json
import pickle
from typing import Union
class JobContext:
    """
    A class that represents the context of a job.
    """
    def __init__(self, node: 'OpenAgentsNode', runner: 'JobRunner', job: rpc_pb2.Job__pb2):
        self.job = job
        self._node = node
        self.runner = runner
        
        self._cachePath = os.getenv('CACHE_PATH',  "cache")
        if not os.path.exists(self._cachePath):
            os.makedirs(self._cachePath)
        self.logger=Logger(
            self._node.getMeta()["name"]+"."+self.runner.getMeta()["name"],
            self._node.getMeta()["version"],
            self.job.id,
            lambda x: self._node._log(x, self.job.id),
        )
        self._disksByUrl = {}
        self._disksById = {}
        self._diskByName = {}
    def getLogger(self):
        """
        Get the logger of the job.
        """
        return self.logger
 
    def getNode(self):
        """
        Get the node running this job.
        """
        return self._node
    def getJob(self):
        """
        Get the job object.
        """
        return self.job
    
    async def cacheSet(self, key:str, value, version:int=0, expireAt:int=0, local=True, CHUNK_SIZE=1024*1024*15):
        """
        Set a value in the cache.
        Args:
            key (str): The key of the value to set.
            value (object): The value to set.
            version (int): The version of the cache (if the call to cacheGet requires a different version, the cache will be considered expired). Defaults to 0.
            expireAt (int): The timestamp at which the value expires, in milliseconds. 0 = never. Defaults to 0.
            local (bool): Whether to store the value locally or remotely. Defaults to True.
            CHUNK_SIZE (int): The size of each chunk to write in bytes, if needed. Defaults to 1024*1024*15.
        """
        try:
            dataBytes = pickle.dumps(value)
            if local:
                fullPath = os.path.join(self._cachePath, key)
                with open(fullPath, "wb") as f:
                    f.write(dataBytes)
                with open(fullPath+".meta.json", "w") as f:
                    f.write(json.dumps({"version":version, "expireAt":expireAt}))
            else:
                client = self._node._getClient()
                def write_data():
                    for j in range(0, len(dataBytes), CHUNK_SIZE):
                        chunk = dataBytes[j:j+CHUNK_SIZE]
                        request = rpc_pb2.RpcCacheSetRequest(
                            key=key, 
                            data=chunk,
                            expireAt=expireAt,
                            version=version
                        )
                        yield request                              
                res = await client.cacheSet(write_data())
                return res.success
        except Exception as e:
            self._node.getLogger().error("Error setting cache "+str(e))
            return False
        
    async def cacheGet(self, key:str, lastVersion = 0, local=True) -> any:
        """
        Get a value from the cache.
        Args:
            key (str): The key of the value to get.
            lastVersion (int): The version of the cache to check. Defaults to 0.
            local (bool): Whether to get the value locally or remotely. Defaults to True.
        Returns:
            any: The value of the cache.
        """
        try:
            if local:
                fullPath = os.path.join(self._cachePath, key)
                if not os.path.exists(fullPath) or not os.path.exists(fullPath+".meta.json"):
                    return None
                with open(fullPath+".meta.json", "r") as f:
                    meta = json.loads(f.read())
                if lastVersion > 0 and meta["version"] != lastVersion:
                    return None
                if meta["expireAt"] > 0 and time.time()*1000 > meta["expireAt"]:
                    return None
                with open(fullPath, "rb") as f:
                    return pickle.load(f)
            else:
                client = self._node._getClient()
                bytesOut = bytearray()
                stream = client.cacheGet(rpc_pb2.RpcCacheGetRequest(key=key, lastVersion = lastVersion))
                async for chunk in stream:
                    if not chunk.exists:
                        return None
                    bytesOut.extend(chunk.data)
                return pickle.loads(bytesOut)
        except Exception as e:
            self._node.getLogger().error("Error getting cache "+str(e))
            return None
    
    async def openStorage(self, url:str)->Disk:
        """
        Open a storage disk.
        Args:
            url (str): The URL of the disk.
        Returns:
            Disk: The disk object.
        """
        if url in self._disksByUrl:
            return self._disksByUrl[url]
        client = self._node._getClient()
        diskId = (await client.openDisk(rpc_pb2.RpcOpenDiskRequest(url=url))).diskId
        disk = Disk(id=diskId, url=url, node=self._node)
        self._disksByUrl[url] = disk
        self._disksById[diskId] = disk
        return disk
    async def createStorage(self, name: str = None, encryptionKey: str = None, includeEncryptionKeyInUrl: str = None) -> Disk:
        """
        Create a storage disk.
        Args:
            name (str): Optional: The name of the disk.
            encryptionKey (str): Optional: The encryption key of the disk.
            includeEncryptionKeyInUrl (str): Optional: Whether to include the encryption key in the URL.
        Returns:
            Disk: The disk object.
        """
        if name in self._diskByName:
            return self._diskByName[name]
        
        client = self._node._getClient()
        url = (await client.createDisk(rpc_pb2.RpcCreateDiskRequest(
            name=name,
            encryptionKey=encryptionKey,
            includeEncryptionKeyInUrl=includeEncryptionKeyInUrl
        ))).url
        diskId = (await client.openDisk(rpc_pb2.RpcOpenDiskRequest(url=url))).diskId
        disk = Disk(id=diskId, url=url, node=self._node)
        self._disksByUrl[url] = disk
        self._disksById[diskId] = disk
        if name:
            self._diskByName[name] = disk
        return disk
    async def close(self):
        """
        Close the job context.
        Free up resources, submit pending logs.
        """
        # Every disk is registered under several indexes; close each instance once.
        closedDisks = set()
        for disk in [*self._disksById.values(), *self._disksByUrl.values(), *self._diskByName.values()]:
            if id(disk) not in closedDisks:
                closedDisks.add(id(disk))
                await disk.close()
        self._disksById = {}
        self._disksByUrl = {}
        self._diskByName = {}
        self.logger.close()
    def getJobParamValues(self, key, default: list[str] = None) -> list[str]:
        """
        Get all values of the job parameter with the given key, or default if it is not set.
        """
        job = self.getJob()
        for p in job.param:
            if p.key == key:
                return p.value or default
        return default
    
    def getJobParamValue(self, key, default: str = None) -> str:
        """
        Get the first value of the job parameter with the given key, or default if it is not set.
        """
        job = self.getJob()
        for p in job.param:
            if p.key == key:
                if not p.value:
                    return default
                return p.value[0] or default
        return default
    def getJobInputs(self, marker: str | None = None) -> list[rpc_pb2.JobInput__pb2]:
        """
        Get all job inputs, optionally filtered by marker.
        """
        job = self.getJob()
        out = []
        for i in job.input:
            if marker is None or i.marker == marker:
                out.append(i)
        return out
    def getJobInput(self, marker: str | None = None) -> rpc_pb2.JobInput__pb2:
        """
        Get the first job input, optionally filtered by marker, or None if there is no match.
        """
        job = self.getJob()
        for i in job.input:
            if marker is None or i.marker == marker:
                return i
        return None
    def getOutputFormat(self):
        """
        Get the requested output format of the job.
        """
        job = self.getJob()
        return job.outputFormat
Classes
class JobContext (node: OpenAgentsNode, runner: JobRunner, job: rpc_pb2.Job__pb2)
A class that represents the context of a job.
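A JobContext is constructed by the node for each incoming job and handed to the runner. A minimal sketch of a handler consuming one; the run entry-point name is an assumption about JobRunner, not something this module defines:

    class EchoRunner(JobRunner):
        # `run` is an assumed hook name; check JobRunner for the real entry point.
        async def run(self, ctx: JobContext):
            inp = ctx.getJobInput()        # first input, any marker (or None)
            fmt = ctx.getOutputFormat()    # requested output format
            await ctx.close()              # release disks, flush pending logs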
Methods
async def cacheGet(self, key: str, lastVersion=0, local=True) ‑> any
Get a value from the cache.
Args
key (str): The key of the value to get.
lastVersion (int): The version of the cache to check. Defaults to 0.
local (bool): Whether to get the value locally or remotely. Defaults to True.
 
Returns
any: The value of the cache, or None if it is missing, expired, or version-mismatched.
 
async def cacheSet(self, key: str, value, version: int = 0, expireAt: int = 0, local=True, CHUNK_SIZE=15728640)
Set a value in the cache.
Args
key (str): The key of the value to set.
value (object): The value to set.
version (int): The version of the cache (if the call to cacheGet requires a different version, the cache will be considered expired). Defaults to 0.
expireAt (int): The timestamp at which the value expires, in milliseconds. 0 = never. Defaults to 0.
local (bool): Whether to store the value locally or remotely. Defaults to True.
CHUNK_SIZE (int): The size of each chunk to write in bytes, if needed. Defaults to 1024*1024*15 (15 MiB).
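A round-trip sketch of the cache API; the key and payload are illustrative:

    # Store a picklable value under version 2 (never expiring), then read it
    # back; cacheGet returns None when the stored version does not match.
    await ctx.cacheSet("embeddings", {"vectors": [0.1, 0.2]}, version=2)
    value = await ctx.cacheGet("embeddings", lastVersion=2)  # {"vectors": [0.1, 0.2]}
    stale = await ctx.cacheGet("embeddings", lastVersion=3)  # None: version mismatch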
 
async def close(self)
Close the job context. Free up resources, submit pending logs.
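A minimal sketch, assuming the surrounding framework does not close the context automatically:

    try:
        await handle(ctx)   # handle() is a hypothetical job handler
    finally:
        await ctx.close()   # always release disks and flush logs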
async def createStorage(self, name: str = None, encryptionKey: str = None, includeEncryptionKeyInUrl: str = None) ‑> Disk
Create a storage disk.
Args
name (str): Optional: The name of the disk.
encryptionKey (str): Optional: The encryption key of the disk.
includeEncryptionKeyInUrl (str): Optional: Whether to include the encryption key in the URL.
 
Returns
Disk: The disk object.
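A short sketch of name-based reuse; the disk name is illustrative:

    # A second call with the same name returns the Disk this context already
    # created rather than creating a new one.
    disk = await ctx.createStorage(name="job-output")
    same = await ctx.createStorage(name="job-output")
    assert same is disk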
 
def getJob(self)
Get the job object.
def getJobInput(self, marker: str | None = None) ‑> rpc_pb2.JobInput__pb2
Get the first job input, optionally filtered by marker, or None if there is no match.
def getJobInputs(self, marker: str | None = None) ‑> list[rpc_pb2.JobInput__pb2]
Get all job inputs, optionally filtered by marker.
def getJobParamValue(self, key, default: str = None) ‑> str
Get the first value of the job parameter with the given key, or default if it is not set.
def getJobParamValues(self, key, default: list[str] = None) ‑> list[str]
Get all values of the job parameter with the given key, or default if it is not set.
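A sketch of the getJob* accessors; the parameter key and marker are hypothetical:

    max_tokens = int(ctx.getJobParamValue("max-tokens", default="512"))
    tags = ctx.getJobParamValues("tag", default=[])
    first_doc = ctx.getJobInput(marker="document")   # first matching input or None
    all_docs = ctx.getJobInputs(marker="document")   # every matching input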
def getLogger(self)
Get the logger of the job.
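Only Logger.error appears in this module's source; treat other level methods as assumptions about the Logger API:

    log = ctx.getLogger()
    log.error("failed to fetch input")  # error() is the level used internally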
def getNode(self)
Get the node running this job.
def getOutputFormat(self)
Get the requested output format of the job.
async def openStorage(self, url: str) ‑> Disk
Open a storage disk.
Args
url (str): The URL of the disk.
 
Returns
Disk: The disk object.
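A sketch of reopening a disk by URL; the URL is a placeholder. Repeated calls with the same URL return the Disk instance cached by this context:

    disk = await ctx.openStorage("hyperdrive://example-disk")  # placeholder URL
    again = await ctx.openStorage("hyperdrive://example-disk")
    assert again is disk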
 