Module openagents.JobRunner
Expand source code
from .Logger import Logger
from .Disk import Disk
from openagents_grpc_proto import rpc_pb2_grpc
from openagents_grpc_proto import rpc_pb2
from .RunnerConfig import RunnerConfig
import time
import os
import json
import pickle
from typing import Union
from .JobContext import JobContext
from .Logger import Logger
import asyncio
class JobRunner:
"""
An abstract class that represents a job runner.
Implementations of this class should be able to run jobs.
The internal logic of the runner uses these additional environment variables for configuration:
- CACHE_PATH: The path to store cached data. Defaults to "./cache".
"""
def __init__(self, config: RunnerConfig):
self._node = None
self._template = config.getTemplate()
self._meta = config.getMeta()
self._sockets = config.getSockets()
self._filter = config.getFilter()
self.runInParallel=False
self.initialized=False
def getMeta(self):
return self._meta
def getFilter(self):
return self._filter
def getTemplate(self):
return self._template
def getSockets(self):
return self._sockets
def setRunInParallel(self, runInParallel:bool):
"""
Set whether the runner should run in parallel.
Args:
runInParallel (bool): True if the runner should run in parallel, False otherwise. Defaults to False.
"""
self.runInParallel = runInParallel
def isRunInParallel(self) -> bool:
"""
Check if the runner should run in parallel.
Returns:
bool: True if the runner should run in parallel, False otherwise.
"""
return self.runInParallel
async def postRun(self, ctx:JobContext) -> None:
"""
Called after the runner has finished running.
Args:
ctx (JobContext): The context of the job.
"""
pass
async def canRun(self,ctx:JobContext) -> bool:
"""
Check if the runner can run a job.
Args:
ctx (JobContext): The context of the job.
Returns:
bool: True if the runner can run the job, False otherwise.
"""
return True
async def preRun(self, ctx:JobContext)-> None:
"""
Called before the runner starts running.
Args:
ctx (JobContext): The context of the job.
"""
pass
async def run(self, ctx:JobContext) -> None:
"""
Run a job.
Args:
ctx (JobContext): The context of the job.
"""
pass
async def loop(self, node: 'OpenAgentsNode')-> None:
"""
The main loop of the runner.
Args:
node (OpenAgentsNode): The node
"""
pass
async def init(self,node: 'OpenAgentsNode')-> None:
"""
Initialize the runner.
Args:
node (OpenAgentsNode): The node
"""
pass
Classes
class JobRunner (config: RunnerConfig)
-
An abstract class that represents a job runner. Implementations of this class should be able to run jobs. The internal logic of the runner uses these additional environment variables for configuration: - CACHE_PATH: The path to store cached data. Defaults to "./cache".
Expand source code
class JobRunner: """ An abstract class that represents a job runner. Implementations of this class should be able to run jobs. The internal logic of the runner uses these additional environment variables for configuration: - CACHE_PATH: The path to store cached data. Defaults to "./cache". """ def __init__(self, config: RunnerConfig): self._node = None self._template = config.getTemplate() self._meta = config.getMeta() self._sockets = config.getSockets() self._filter = config.getFilter() self.runInParallel=False self.initialized=False def getMeta(self): return self._meta def getFilter(self): return self._filter def getTemplate(self): return self._template def getSockets(self): return self._sockets def setRunInParallel(self, runInParallel:bool): """ Set whether the runner should run in parallel. Args: runInParallel (bool): True if the runner should run in parallel, False otherwise. Defaults to False. """ self.runInParallel = runInParallel def isRunInParallel(self) -> bool: """ Check if the runner should run in parallel. Returns: bool: True if the runner should run in parallel, False otherwise. """ return self.runInParallel async def postRun(self, ctx:JobContext) -> None: """ Called after the runner has finished running. Args: ctx (JobContext): The context of the job. """ pass async def canRun(self,ctx:JobContext) -> bool: """ Check if the runner can run a job. Args: ctx (JobContext): The context of the job. Returns: bool: True if the runner can run the job, False otherwise. """ return True async def preRun(self, ctx:JobContext)-> None: """ Called before the runner starts running. Args: ctx (JobContext): The context of the job. """ pass async def run(self, ctx:JobContext) -> None: """ Run a job. Args: ctx (JobContext): The context of the job. """ pass async def loop(self, node: 'OpenAgentsNode')-> None: """ The main loop of the runner. Args: node (OpenAgentsNode): The node """ pass async def init(self,node: 'OpenAgentsNode')-> None: """ Initialize the runner. Args: node (OpenAgentsNode): The node """ pass
Methods
async def canRun(self, ctx: JobContext) ‑> bool
-
Check if the runner can run a job.
Args
ctx
:JobContext
- The context of the job.
Returns
bool
- True if the runner can run the job, False otherwise.
Expand source code
async def canRun(self,ctx:JobContext) -> bool: """ Check if the runner can run a job. Args: ctx (JobContext): The context of the job. Returns: bool: True if the runner can run the job, False otherwise. """ return True
def getFilter(self)
-
Expand source code
def getFilter(self): return self._filter
def getMeta(self)
-
Expand source code
def getMeta(self): return self._meta
def getSockets(self)
-
Expand source code
def getSockets(self): return self._sockets
def getTemplate(self)
-
Expand source code
def getTemplate(self): return self._template
async def init(self, node: OpenAgentsNode)
-
Initialize the runner.
Args
node
:OpenAgentsNode
- The node
Expand source code
async def init(self,node: 'OpenAgentsNode')-> None: """ Initialize the runner. Args: node (OpenAgentsNode): The node """ pass
def isRunInParallel(self) ‑> bool
-
Check if the runner should run in parallel.
Returns
bool
- True if the runner should run in parallel, False otherwise.
Expand source code
def isRunInParallel(self) -> bool: """ Check if the runner should run in parallel. Returns: bool: True if the runner should run in parallel, False otherwise. """ return self.runInParallel
async def loop(self, node: OpenAgentsNode)
-
The main loop of the runner.
Args
node
:OpenAgentsNode
- The node
Expand source code
async def loop(self, node: 'OpenAgentsNode')-> None: """ The main loop of the runner. Args: node (OpenAgentsNode): The node """ pass
async def postRun(self, ctx: JobContext) ‑> None
-
Called after the runner has finished running.
Args
ctx
:JobContext
- The context of the job.
Expand source code
async def postRun(self, ctx:JobContext) -> None: """ Called after the runner has finished running. Args: ctx (JobContext): The context of the job. """ pass
async def preRun(self, ctx: JobContext) ‑> None
-
Called before the runner starts running.
Args
ctx
:JobContext
- The context of the job.
Expand source code
async def preRun(self, ctx:JobContext)-> None: """ Called before the runner starts running. Args: ctx (JobContext): The context of the job. """ pass
async def run(self, ctx: JobContext) ‑> None
-
Run a job.
Args
ctx
:JobContext
- The context of the job.
Expand source code
async def run(self, ctx:JobContext) -> None: """ Run a job. Args: ctx (JobContext): The context of the job. """ pass
def setRunInParallel(self, runInParallel: bool)
-
Set whether the runner should run in parallel.
Args
runInParallel
:bool
- True if the runner should run in parallel, False otherwise. Defaults to False.
Expand source code
def setRunInParallel(self, runInParallel:bool): """ Set whether the runner should run in parallel. Args: runInParallel (bool): True if the runner should run in parallel, False otherwise. Defaults to False. """ self.runInParallel = runInParallel