Module openagents.JobContext
Source code
from openagents_grpc_proto import rpc_pb2_grpc
from openagents_grpc_proto import rpc_pb2
from .Logger import Logger
from .Disk import Disk
from .RunnerConfig import RunnerConfig
import time
import os
import json
import pickle
from typing import Union
class JobContext:
"""
A class that represents the context of a job.
"""
def __init__(self, node: 'OpenAgentsNode',runner:'JobRunner', job: rpc_pb2.Job__pb2):
self.job=job
self._node=node
self.runner=runner
self._cachePath = os.getenv('CACHE_PATH', "cache")
if not os.path.exists(self._cachePath):
os.makedirs(self._cachePath)
self.logger=Logger(
self._node.getMeta()["name"]+"."+self.runner.getMeta()["name"],
self._node.getMeta()["version"],
self.job.id,
lambda x: self._node._log(x, self.job.id),
)
self._disksByUrl = {}
self._disksById = {}
self._diskByName = {}
def getLogger(self):
"""
Get the logger of the job.
"""
return self.logger
def getNode(self):
"""
Get the node running this job
"""
return self._node
def getJob(self):
"""
Get the job object.
"""
return self.job
async def cacheSet(self, key:str, value, version:int=0, expireAt:int=0, local=True, CHUNK_SIZE=1024*1024*15):
"""
Set a value in the cache.
Args:
key (str): The key of the value to set.
value (object): The value to set.
            version (int): The version of the cache entry (if a later call to cacheGet requests a different version, the entry is considered expired). Defaults to 0.
            expireAt (int): The timestamp in milliseconds at which the value expires. 0 = never. Defaults to 0.
local (bool): Whether to store the value locally or remotely. Defaults to True.
CHUNK_SIZE (int): The size of each chunk to write in bytes, if needed. Defaults to 1024*1024*15.
"""
try:
dataBytes = pickle.dumps(value)
if local:
fullPath = os.path.join(self._cachePath, key)
with open(fullPath, "wb") as f:
f.write(dataBytes)
with open(fullPath+".meta.json", "w") as f:
f.write(json.dumps({"version":version, "expireAt":expireAt}))
else:
client = self._node._getClient()
def write_data():
for j in range(0, len(dataBytes), CHUNK_SIZE):
chunk = bytes(dataBytes[j:min(j+CHUNK_SIZE, len(dataBytes))])
request = rpc_pb2.RpcCacheSetRequest(
key=key,
data=chunk,
expireAt=expireAt,
version=version
)
yield request
res=await client.cacheSet(write_data())
return res.success
except Exception as e:
self._node.getLogger().error("Error setting cache "+str(e))
return False
async def cacheGet(self, key:str, lastVersion = 0, local=True) -> any:
"""
Get a value from the cache.
Args:
            key (str): The key of the value to get.
lastVersion (int): The version of the cache to check. Defaults to 0.
local (bool): Whether to get the value locally or remotely. Defaults to True.
Returns:
any: The value of the cache.
"""
try:
if local:
fullPath = os.path.join(self._cachePath, key)
if not os.path.exists(fullPath) or not os.path.exists(fullPath+".meta.json"):
return None
with open(fullPath+".meta.json", "r") as f:
meta = json.loads(f.read())
if lastVersion > 0 and meta["version"] != lastVersion:
return None
if meta["expireAt"] > 0 and time.time()*1000 > meta["expireAt"]:
return None
with open(fullPath, "rb") as f:
return pickle.load(f)
else:
client = self._node._getClient()
bytesOut = bytearray()
stream = client.cacheGet(rpc_pb2.RpcCacheGetRequest(key=key, lastVersion = lastVersion))
async for chunk in stream:
if not chunk.exists:
return None
bytesOut.extend(chunk.data)
return pickle.loads(bytesOut)
except Exception as e:
self._node.getLogger().error("Error getting cache "+str(e))
return None
async def openStorage(self, url:str)->Disk:
"""
Open a storage disk.
Args:
url (str): The URL of the disk.
Returns:
Disk: The disk object.
"""
if url in self._disksByUrl:
return self._disksByUrl[url]
client = self._node._getClient()
diskId =(await client.openDisk(rpc_pb2.RpcOpenDiskRequest(url=url))).diskId
disk = Disk(id=diskId, url=url, node=self._node)
self._disksByUrl[url] = disk
self._disksById[diskId] = disk
return disk
async def createStorage(self,name:str=None,encryptionKey:str=None,includeEncryptionKeyInUrl:str=None) -> Disk:
"""
Create a storage disk.
Args:
name (str): Optional: The name of the disk.
encryptionKey (str): Optional: The encryption key of the disk.
includeEncryptionKeyInUrl (str): Optional: Whether to include the encryption key in the URL.
Returns:
Disk: The disk object.
"""
if name in self._diskByName:
return self._diskByName[name]
client = self._node._getClient()
url = (await client.createDisk(rpc_pb2.RpcCreateDiskRequest(
name=name,
encryptionKey=encryptionKey,
includeEncryptionKeyInUrl=includeEncryptionKeyInUrl
))).url
diskId =( await client.openDisk(rpc_pb2.RpcOpenDiskRequest(url=url))).diskId
disk = Disk(id=diskId, url=url, node=self._node)
self._disksByUrl[url] = disk
self._disksById[diskId] = disk
if name:self._diskByName[name] = disk
return disk
async def close(self):
"""
Close the job context.
Free up resources, submit pending logs.
"""
for disk in self._disksById.values():
await disk.close()
for disk in self._disksByUrl.values():
await disk.close()
for disk in self._diskByName.values():
await disk.close()
self._disksById = {}
self._disksByUrl = {}
self._diskByName = {}
self.logger.close()
    def getJobParamValues(self, key, default: list[str] = None) -> list[str]:
        """
        Get all values of the job parameter key, or default if the parameter is not set.
        """
        job = self.getJob()
        for p in job.param:
            if p.key == key:
                return p.value or default
        return default

    def getJobParamValue(self, key, default: str = None) -> str:
        """
        Get the first value of the job parameter key, or default if the parameter is not set.
        """
        job = self.getJob()
        for p in job.param:
            if p.key == key:
                return p.value[0] or default
        return default

    def getJobInputs(self, marker: str | None = None) -> list[rpc_pb2.JobInput__pb2]:
        """
        Get all job inputs, optionally filtered by marker.
        """
        job = self.getJob()
        out = []
        for i in job.input:
            if marker is None or i.marker == marker:
                out.append(i)
        return out

    def getJobInput(self, marker: str | None = None) -> rpc_pb2.JobInput__pb2:
        """
        Get the first job input (optionally filtered by marker), or None if there is no match.
        """
        job = self.getJob()
        for i in job.input:
            if marker is None or i.marker == marker:
                return i
        return None

    def getOutputFormat(self):
        """
        Get the expected output format of the job.
        """
        job = self.getJob()
        return job.outputFormat
Classes
class JobContext (node: OpenAgentsNode, runner: JobRunner, job: rpc_pb2.Job__pb2)

A class that represents the context of a job.
Methods
async def cacheGet(self, key: str, lastVersion=0, local=True) ‑> any
Get a value from the cache.
Args
    key : str
        The key of the value to get.
    lastVersion : int
        The version of the cache entry to check. Defaults to 0.
    local : bool
        Whether to get the value locally or remotely. Defaults to True.
Returns
    any
        The value of the cache, or None if it is missing, expired, or of a different version.
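For illustration, a read-through pattern combining cacheGet and cacheSet (a sketch: the "embeddings" key, the version number, and the compute_embeddings() helper are hypothetical):

# Read-through caching sketch; compute_embeddings() is a hypothetical helper.
value = await ctx.cacheGet("embeddings", lastVersion=3)
if value is None:                     # missing, expired, or wrong version
    value = compute_embeddings()
    await ctx.cacheSet("embeddings", value, version=3)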
async def cacheSet(self, key: str, value, version: int = 0, expireAt: int = 0, local=True, CHUNK_SIZE=15728640)
Set a value in the cache.
Args
    key : str
        The key of the value to set.
    value : object
        The value to set.
    version : int
        The version of the cache entry (if a later call to cacheGet requests a different version, the entry is considered expired). Defaults to 0.
    expireAt : int
        The timestamp in milliseconds at which the value expires. 0 = never. Defaults to 0.
    local : bool
        Whether to store the value locally or remotely. Defaults to True.
    CHUNK_SIZE : int
        The size of each chunk to write in bytes, if needed. Defaults to 1024*1024*15.
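A sketch of storing a value on the node's remote cache with a one-hour expiry (the key and payload are illustrative):

import time

# Store remotely (streamed in CHUNK_SIZE pieces); key and payload are illustrative.
ok = await ctx.cacheSet(
    "result",
    {"answer": 42},                                   # any picklable object
    version=1,
    expireAt=int(time.time() * 1000) + 3600 * 1000,   # expiry is in milliseconds
    local=False,
)
if not ok:
    ctx.getLogger().error("cacheSet failed")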
async def close(self)
Close the job context. Free up resources, submit pending logs.
async def createStorage(self, name: str = None, encryptionKey: str = None, includeEncryptionKeyInUrl: str = None) ‑> Disk
Create a storage disk.
Args
    name : str
        Optional: The name of the disk.
    encryptionKey : str
        Optional: The encryption key of the disk.
    includeEncryptionKeyInUrl : str
        Optional: Whether to include the encryption key in the URL.
Returns
    Disk
        The disk object.
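A sketch of creating (or reusing) a named disk; the name and key are placeholders, and the disk's contents are managed through the Disk API (see the Disk class):

# Create or reuse a named disk; name and encryption key are placeholders.
disk = await ctx.createStorage(name="scratch", encryptionKey="my-secret")
# ... read/write through the Disk object (see the Disk class for its methods) ...
await ctx.close()   # close() releases every disk opened through this context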
def getJob(self)
Get the job object.
def getJobInput(self, marker: str | None = None) ‑> rpc_pb2.JobInput__pb2

Get the first job input (optionally filtered by marker), or None if there is no match.
def getJobInputs(self, marker: str | None = None) ‑> list[rpc_pb2.JobInput__pb2]

Get all job inputs, optionally filtered by marker.
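For illustration, filtering inputs by marker (the "document" marker is a placeholder; only the marker field is assumed, since it is the field this module itself reads):

# Collect inputs tagged "document" (placeholder marker).
docs = ctx.getJobInputs(marker="document")
first = ctx.getJobInput(marker="document")   # first match, or None
print(len(docs), first is not None)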
def getJobParamValue(self, key, default: str = None) ‑> str

Get the first value of the job parameter key, or default if the parameter is not set.
def getJobParamValues(self, key, default: list[str] = None) ‑> list[str]

Get all values of the job parameter key, or default if the parameter is not set.
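A sketch of reading job parameters; the keys are placeholders, and the int() cast reflects that parameter values arrive as strings:

# Parameter values are strings; keys below are placeholders.
max_tokens = int(ctx.getJobParamValue("max-tokens", "512"))
tags = ctx.getJobParamValues("tags", [])   # every value registered for "tags"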
def getLogger(self)
Get the logger of the job.
def getNode(self)

Get the node running this job.
def getOutputFormat(self)

Get the expected output format of the job.
async def openStorage(self, url: str) ‑> Disk
Open a storage disk.
Args
    url : str
        The URL of the disk.
Returns
    Disk
        The disk object.
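A sketch of opening an existing disk by URL (the URL is a placeholder). Note that openStorage caches disks, so repeated calls with the same URL return the same Disk:

# Open an existing disk; the URL is a placeholder.
disk = await ctx.openStorage("disk://example")
same = await ctx.openStorage("disk://example")
assert disk is same   # cached per-URL within this JobContext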