Module openagents.OpenAgentsNode
Expand source code
import grpc
from openagents_grpc_proto import rpc_pb2_grpc
from openagents_grpc_proto import rpc_pb2
import time
import os
import traceback
import asyncio
from .JobRunner import JobRunner
from .NodeConfig import NodeConfig
from .Logger import Logger
from typing import Union
from .JobContext import JobContext
import json
class HeaderAdderInterceptor(grpc.aio.ClientInterceptor):
    """
    An interceptor for GRPC that adds headers to outgoing requests.

    This base class only holds the shared logic; the numbered subclasses
    combine it with one concrete ``grpc.aio`` interceptor interface each and
    delegate to ``_intercept``.
    """

    def __init__(self, headers):
        """
        Args:
            headers: Iterable of ``(key, value)`` pairs added to the metadata
                of every intercepted call.
        """
        self._headers = headers

    async def _intercept(self, continuation, client_call_details, request_or_iterator):
        """
        Add the configured headers to the call metadata and continue the call.

        Args:
            continuation: Callable that proceeds with the invocation.
            client_call_details: Details of the outgoing RPC; its ``metadata``
                may be empty/None.
            request_or_iterator: The request (or request iterator) of the RPC.

        Returns:
            Whatever ``continuation`` returns for the updated call details.
        """
        # Build a fresh Metadata object instead of calling .add() on the one
        # we were given: the caller may reuse its metadata across calls (the
        # headers would pile up), and plain tuple metadata has no .add().
        metadata = grpc.aio.Metadata()
        for key, value in (client_call_details.metadata or ()):
            metadata.add(key, value)
        for header in self._headers:
            metadata.add(header[0], header[1])
        new_client_call_details = client_call_details._replace(metadata=metadata)
        return await continuation(new_client_call_details, request_or_iterator)
class HeaderAdderInterceptor0(grpc.aio.UnaryStreamClientInterceptor, HeaderAdderInterceptor):
    """Header-adding interceptor for unary-stream invocations."""

    async def intercept_unary_stream(self, continuation, client_call_details, request):
        # Hand over to the shared header-injection logic of the base class.
        call = await self._intercept(continuation, client_call_details, request)
        return call
class HeaderAdderInterceptor1(grpc.aio.StreamUnaryClientInterceptor, HeaderAdderInterceptor):
    """Header-adding interceptor for stream-unary invocations."""

    async def intercept_stream_unary(self, continuation, client_call_details, request_iterator):
        # Hand over to the shared header-injection logic of the base class.
        call = await self._intercept(continuation, client_call_details, request_iterator)
        return call
class HeaderAdderInterceptor2(grpc.aio.StreamStreamClientInterceptor, HeaderAdderInterceptor):
    """Header-adding interceptor for stream-stream invocations."""

    async def intercept_stream_stream(self, continuation, client_call_details, request_iterator):
        # Hand over to the shared header-injection logic of the base class.
        call = await self._intercept(continuation, client_call_details, request_iterator)
        return call
class HeaderAdderInterceptor3(grpc.aio.UnaryUnaryClientInterceptor, HeaderAdderInterceptor):
    """Header-adding interceptor for unary-unary invocations."""

    async def intercept_unary_unary(self, continuation, client_call_details, request):
        # Hand over to the shared header-injection logic of the base class.
        response = await self._intercept(continuation, client_call_details, request)
        return response
class OpenAgentsNode:
    """
    An OpenAgents node that can run jobs.

    The node metadata (name, picture, version, description and optionally
    prices) is taken from the NodeConfig passed to the constructor.

    Environment variables read by this class:
    - POOL_ADDRESS: The address of the pool. Defaults to "playground.openagents.com".
    - POOL_PORT: The port of the pool. Defaults to 6021.
    - POOL_SSL: Whether to use SSL for the pool ("true" enables it). Defaults to "true".
    - NODE_TPS: The ticks per second of the node main loop. Defaults to 10.
    - NODE_TOKEN: The token of the node, sent as "authorization" header. Defaults to None.
    - NWC: Nostr wallet connect URL, sent as "nwc" header. Defaults to None.
    - PRICE_SATS / PRICE_MSATS, CURRENCY, PROTOCOL: Default price entry
      (3000, "bitcoin", "lightning") used when NWC is set and the config
      declares no "prices".

    NOTE(review): NODE_NAME, NODE_ICON, NODE_VERSION and NODE_DESCRIPTION are
    not read in this class; presumably NodeConfig handles them - confirm there.
    """

    # Class-level default kept for backward compatibility; every instance gets
    # its own dict in __init__ so that nodes never share polling tasks.
    runnerTasks = {}

    def __init__(self, config: "NodeConfig"):
        """
        Create a node from its configuration.

        Args:
            config (NodeConfig): Provides the node metadata through getMeta().
                The metadata must contain "name", "picture", "version" and
                "description".
        """
        self.meta = config.getMeta()
        self.nextNodeAnnounce = 0
        self.channel = None
        self.rpcClient = None
        self.registeredRunners = []
        self.runnerTasks = {}
        self.poolAddress = None
        self.poolPort = None
        # Overwritten by _run(); initialised here so the attribute always exists.
        self.poolSsl = False
        self.lockedJobs = []
        self.isLooping = False
        self.loopInterval = 100
        self.NWC = os.getenv('NWC', None)
        if self.NWC and "prices" not in self.meta:
            # NOTE(review): PRICE_SATS and PRICE_MSATS end up in the same
            # "amount" field without any unit conversion - confirm the unit.
            self.meta["prices"] = [
                {
                    "amount": int(os.getenv('PRICE_SATS', os.getenv('PRICE_MSATS', "3000"))),
                    "currency": os.getenv('CURRENCY', "bitcoin"),
                    "protocol": os.getenv('PROTOCOL', "lightning"),
                }
            ]
        self.nodeName = self.meta["name"]
        self.nodeIcon = self.meta["picture"]
        self.nodeVersion = self.meta["version"]
        self.nodeDescription = self.meta["description"]
        self.logger = Logger(self.nodeName, self.nodeVersion)
        self.logger.info("Starting "+self.nodeName+" v"+self.nodeVersion)

    def getMeta(self):
        """
        Get the metadata dictionary of the node.
        """
        return self.meta

    def registerRunner(self, runner: "JobRunner") -> None:
        """
        Register a runner to the node.
        Args:
            runner (JobRunner): The runner to register.
        """
        self.registeredRunners.append({
            "runner": runner,
            # 0 makes the template announce on the first reannounce() pass.
            "nextAnnouncementTimestamp": 0
        })

    def getLogger(self):
        """
        Get the active logger for the node.
        """
        return self.logger

    def _getClient(self):
        """
        Get or create a GRPC client for the node.

        A new channel (and stub) is created when there is no channel yet or
        when the current one reports the SHUTDOWN state.

        Returns:
            The PoolConnectorStub bound to the active channel.
        """
        channel = self.channel
        # get_state() is the public grpc.aio API and returns a
        # grpc.ChannelConnectivity member, so the comparison is meaningful.
        # Passing True also asks an idle channel to start connecting.
        if channel is not None and channel.get_state(True) != grpc.ChannelConnectivity.SHUTDOWN:
            return self.rpcClient

        if channel is not None:
            try:
                self.getLogger().info("Closing channel")
                # close() is a coroutine on grpc.aio channels: it has to be
                # scheduled, a bare call would never run.
                asyncio.ensure_future(channel.close())
            except Exception as e:
                self.getLogger().error("Error closing channel "+str(e))

        target = self.poolAddress+":"+str(self.poolPort)
        self.getLogger().info("Connect to "+target+" with ssl "+str(self.poolSsl))
        options = [
            # 20 MB
            ('grpc.max_send_message_length', 1024*1024*20),
            ('grpc.max_receive_message_length', 1024*1024*20)
        ]

        interceptors = None
        nodeToken = os.getenv('NODE_TOKEN', None)
        nwc = self.NWC
        if nodeToken or nwc:
            metadata = []
            if nwc:
                metadata.append(("nwc", str(nwc)))
            if nodeToken:
                metadata.append(("authorization", str(nodeToken)))
            # One interceptor per call type so every RPC carries the headers.
            interceptors = [
                HeaderAdderInterceptor0(metadata),
                HeaderAdderInterceptor1(metadata),
                HeaderAdderInterceptor2(metadata),
                HeaderAdderInterceptor3(metadata),
            ]

        if nwc:
            self.getLogger().info("This node can receive payments")
        else:
            self.getLogger().warn("This node is not enabled to receive payments. Please provide a NWC URL")

        if self.poolSsl:
            self.channel = grpc.aio.secure_channel(target, grpc.ssl_channel_credentials(), options, interceptors=interceptors)
        else:
            self.channel = grpc.aio.insecure_channel(target, options, interceptors=interceptors)
        self.rpcClient = rpc_pb2_grpc.PoolConnectorStub(self.channel)
        return self.rpcClient

    async def _logToJob(self, message: str, jobId: str = None):
        """
        Log a message to a job.
        Args:
            message (str): The message to log.
            jobId (str): The ID of the job to log to.
        """
        try:
            await self._getClient().logForJob(rpc_pb2.RpcJobLog(jobId=jobId, log=message))
        except Exception as e:
            # Plain print on purpose: failures are reported on stdout only.
            print("Error logging to job "+str(e))

    def _log(self, message: str, jobId: str = None):
        """
        Log a message to the network.

        Nothing is sent when no job ID is given. Must be called while an
        event loop is running, because the send is scheduled as a task.

        Args:
            message (str): The message to log.
            jobId (str): The ID of the job to log to.
        """
        if jobId:
            asyncio.create_task(self._logToJob(message, jobId))

    async def _acceptJob(self, jobId: str):
        """
        Accept a job.
        Args:
            jobId (str): The ID of the job to accept.
        """
        await self._getClient().acceptJob(rpc_pb2.RpcAcceptJob(jobId=jobId))

    async def _runAcceptedJob(self, runner, ctx, job, client, startedAt):
        """
        Run one accepted job to completion and report the outcome to the pool.

        Everything the job needs is passed in as arguments, so a job running
        in parallel keeps its own context even after the polling loop has
        moved on to the next job.

        Args:
            runner (JobRunner): The runner executing the job.
            ctx (JobContext): The context created for this job.
            job: The job received from the pool.
            client: The pool client used to complete or cancel the job.
            startedAt (float): time.time() taken when the job was picked up.
        """
        try:
            output = await runner.run(ctx)
            await runner.postRun(ctx)
            ctx.getLogger().info("Job completed in "+str(time.time()-startedAt)+" seconds on node "+self.nodeName, job.id)
            await client.completeJob(rpc_pb2.RpcJobOutput(jobId=job.id, output=output))
        except Exception as e:
            ctx.getLogger().error("Job failed in "+str(time.time()-startedAt)+" seconds on node "+self.nodeName+" with error "+str(e), job.id)
            traceback.print_exc()
            try:
                # The job was accepted before this method was called, so the
                # pool has to be told that it will not be completed.
                await client.cancelJob(rpc_pb2.RpcCancelJob(jobId=job.id, reason=str(e)))
            except Exception as cancelError:
                ctx.getLogger().error("Error cancelling job "+str(cancelError), job.id)
        finally:
            await ctx.close()

    async def _executePendingJobForRunner(self, runner: "JobRunner"):
        """
        Execute all pending jobs for a runner.

        Fetches the pending jobs matching the runner's filter, runs the ones
        the runner accepts, then schedules itself again. The chain stops once
        the runner is no longer registered.

        Args:
            runner (JobRunner): The runner to execute the job.
        """
        if not any(reg["runner"] == runner for reg in self.registeredRunners):
            self.runnerTasks.pop(runner, None)
            return
        try:
            if not runner.initialized:
                # Flag first, so a concurrent pass does not call init() twice.
                runner.initialized = True
                await runner.init(self)
            client = self._getClient()
            jobFilter = runner.getFilter()
            meta = runner.getMeta()
            prices = (meta["prices"] or None) if "prices" in meta else None
            # Jobs stay locked (excluded from polling) for 60 seconds.
            self.lockedJobs = [x for x in self.lockedJobs if time.time()-x[1] < 60]
            filterArgs = {
                key: (jobFilter[key] if key in jobFilter else None)
                for key in (
                    "filterByRunOn",
                    "filterByCustomer",
                    "filterByDescription",
                    "filterById",
                    "filterByKind",
                )
            }
            pending = await client.getPendingJobs(rpc_pb2.RpcGetPendingJobs(
                filterByBids=prices,
                wait=60000,
                # exclude failed jobs
                excludeId=[x[0] for x in self.lockedJobs],
                **filterArgs
            ))
            jobs = list(pending.jobs)
            if len(jobs) > 0:
                self.getLogger().log(str(len(jobs))+" pending jobs for "+runner.__class__.__name__)
            else:
                self.getLogger().finer("No pending jobs for "+runner.__class__.__name__)
            for job in jobs:
                wasAccepted = False
                startedAt = time.time()
                ctx = JobContext(self, runner, job)
                try:
                    client = self._getClient()  # Refresh client connection if needed
                    if not await runner.canRun(ctx):
                        await ctx.close()
                        continue
                    self.lockedJobs.append([job.id, time.time()])
                    await self._acceptJob(job.id)
                    wasAccepted = True
                    ctx.getLogger().info("Job started on node "+self.nodeName)
                    await runner.preRun(ctx)
                    # Arguments are bound now: a parallel job must not see the
                    # ctx/job of a later loop iteration.
                    jobRun = self._runAcceptedJob(runner, ctx, job, client, startedAt)
                    if runner.isRunInParallel():
                        asyncio.create_task(jobRun)
                    else:
                        await jobRun
                except Exception as e:
                    ctx.getLogger().error("Job failed in "+str(time.time()-startedAt)+" seconds on node "+self.nodeName+" with error "+str(e), job.id)
                    await ctx.close()
                    if wasAccepted:
                        await client.cancelJob(rpc_pb2.RpcCancelJob(jobId=job.id, reason=str(e)))
                    traceback.print_exc()
        except Exception as e:
            traceback.print_exc()
            self.getLogger().error("Error executing runner "+str(e))
            # Back off before polling again after a failure.
            await asyncio.sleep(5000.0/1000.0)
        self.runnerTasks[runner] = asyncio.create_task(self._executePendingJobForRunner(runner))

    async def _executePendingJob(self):
        """
        Execute all pending jobs for all runners.

        Starts a polling task for every registered runner that has none yet.
        """
        for reg in self.registeredRunners:
            try:
                runner = reg["runner"]
                if runner not in self.runnerTasks:
                    self.runnerTasks[runner] = asyncio.create_task(self._executePendingJobForRunner(runner))
            except Exception as e:
                self.getLogger().log("Error executing pending job "+str(e), None)

    async def reannounce(self):
        """
        Reannounce the node and all templates.

        Each announcement is repeated after the refresh interval returned by
        the pool, or after 5 seconds when it failed. The method schedules
        itself again after sleeping 5 seconds.
        """
        try:
            time_ms = int(time.time()*1000)
            # Announce node
            if time_ms >= self.nextNodeAnnounce:
                try:
                    client = self._getClient()
                    res = await client.announceNode(rpc_pb2.RpcAnnounceNodeRequest(
                        iconUrl=self.nodeIcon,
                        name=self.nodeName,
                        description=self.nodeDescription,
                    ))
                    self.nextNodeAnnounce = int(time.time()*1000) + res.refreshInterval
                    self.getLogger().log("Node announced, next announcement in "+str(res.refreshInterval)+" ms")
                except Exception as e:
                    self.getLogger().error("Error announcing node "+str(e), None)
                    self.nextNodeAnnounce = int(time.time()*1000) + 5000
            # Announce the template of every registered runner
            for reg in self.registeredRunners:
                try:
                    if time_ms >= reg["nextAnnouncementTimestamp"]:
                        client = self._getClient()
                        res = await client.announceEventTemplate(rpc_pb2.RpcAnnounceTemplateRequest(
                            meta=json.dumps(reg["runner"].getMeta()),
                            template=reg["runner"].getTemplate(),
                            sockets=json.dumps(reg["runner"].getSockets())
                        ))
                        reg["nextAnnouncementTimestamp"] = int(time.time()*1000) + res.refreshInterval
                        self.getLogger().log("Template announced, next announcement in "+str(res.refreshInterval)+" ms")
                except Exception as e:
                    self.getLogger().error("Error announcing template "+str(e), None)
                    reg["nextAnnouncementTimestamp"] = int(time.time()*1000) + 5000
        except Exception as e:
            self.getLogger().error("Error reannouncing "+str(e), None)
        await asyncio.sleep(5)
        asyncio.create_task(self.reannounce())

    async def _loop(self):
        """
        The main loop of the node.

        Calls loop() on every registered runner, waits one tick
        (loopInterval milliseconds) and schedules the next pass. A failing
        runner is logged and does not stop the loop.
        """
        try:
            ticks = [reg["runner"].loop(self) for reg in self.registeredRunners]
            await asyncio.gather(*ticks)
        except Exception as e:
            # Without this guard one failing runner would end the tick loop
            # for good (and abort _run() on the very first pass).
            traceback.print_exc()
            self.getLogger().error("Error in runner loop "+str(e))
        self.isLooping = False
        await asyncio.sleep(self.loopInterval/1000.0)
        asyncio.create_task(self._loop())

    async def _run(self, poolAddress=None, poolPort=None, poolSsl=False):
        """
        Internal method to run the node.
        Should not be called, use start() instead.

        Args:
            poolAddress: Pool address; falls back to POOL_ADDRESS.
            poolPort: Pool port; falls back to POOL_PORT.
            poolSsl: True forces SSL; any falsy value defers to POOL_SSL
                (which defaults to "true"), so SSL can only be disabled
                through the environment variable.
        """
        # NOTE(review): fixed 5 second start-up delay; the reason is not
        # visible in this file.
        await asyncio.sleep(5000.0/1000.0)
        self.poolAddress = poolAddress or os.getenv('POOL_ADDRESS', "playground.openagents.com")
        self.poolPort = poolPort or int(os.getenv('POOL_PORT', "6021"))
        self.poolSsl = poolSsl or os.getenv('POOL_SSL', "true") == "true"
        # Clamp to at least one tick per second: NODE_TPS=0 would divide by zero.
        ticksPerSecond = max(1, int(os.getenv('NODE_TPS', "10")))
        self.loopInterval = 1000.0/ticksPerSecond
        await self._loop()
        await self.reannounce()
        while True:
            await self._executePendingJob()
            await asyncio.sleep(1000.0/1000.0)

    def start(self, poolAddress: str = None, poolPort: Union[int, str] = None, poolSsl: bool = False):
        """
        Start the node in an asyncio event loop.

        Blocks until the event loop ends.

        Args:
            poolAddress (str): The address of the pool. Defaults to
                the environment variable POOL_ADDRESS.
            poolPort (int): The port of the pool. Defaults to the
                environment variable POOL_PORT.
            poolSsl (bool): True forces SSL for the pool connection. When
                left False the environment variable POOL_SSL decides.
        """
        asyncio.run(self._run(poolAddress, poolPort, poolSsl))
Classes
class HeaderAdderInterceptor (headers)
-
An interceptor for GRPC that adds headers to outgoing requests.
Expand source code
class HeaderAdderInterceptor(grpc.aio.ClientInterceptor):
    """
    An interceptor for GRPC that adds headers to outgoing requests.
    """

    def __init__(self, headers):
        """
        Args:
            headers: Iterable of ``(key, value)`` pairs added to the metadata
                of every intercepted call.
        """
        self._headers = headers

    async def _intercept(self, continuation, client_call_details, request_or_iterator):
        """
        Add the configured headers to the call metadata and continue the call.

        Returns:
            Whatever ``continuation`` returns for the updated call details.
        """
        # Build a fresh Metadata object instead of calling .add() on the one
        # we were given: the caller may reuse its metadata across calls, and
        # plain tuple metadata has no .add().
        metadata = grpc.aio.Metadata()
        for key, value in (client_call_details.metadata or ()):
            metadata.add(key, value)
        for header in self._headers:
            metadata.add(header[0], header[1])
        new_client_call_details = client_call_details._replace(metadata=metadata)
        return await continuation(new_client_call_details, request_or_iterator)
Ancestors
- grpc.aio._interceptor.ClientInterceptor
Subclasses
class HeaderAdderInterceptor0 (headers)
-
Affords intercepting unary-stream invocations.
Expand source code
class HeaderAdderInterceptor0(grpc.aio.UnaryStreamClientInterceptor, HeaderAdderInterceptor):
    """Header-adding interceptor for unary-stream invocations."""

    async def intercept_unary_stream(self, continuation, client_call_details, request):
        # Hand over to the shared header-injection logic of the base class.
        call = await self._intercept(continuation, client_call_details, request)
        return call
Ancestors
- grpc.aio._interceptor.UnaryStreamClientInterceptor
- HeaderAdderInterceptor
- grpc.aio._interceptor.ClientInterceptor
Methods
async def intercept_unary_stream(self, continuation, client_call_details, request)
-
Intercepts a unary-stream invocation asynchronously.
The function could return the call object or an asynchronous iterator; in case of being an asynchronous iterator, this will become the source of the reads done by the caller.
Args
continuation
- A coroutine that proceeds with the invocation by
executing the next interceptor in the chain or invoking the
actual RPC on the underlying Channel. It is the interceptor's
responsibility to call it if it decides to move the RPC forward.
The interceptor can use
call = await continuation(client_call_details, request)
to continue with the RPC. continuation returns the call to the RPC.
client_call_details
- A ClientCallDetails object describing the outgoing RPC.
request
- The request value for the RPC.
Returns
The RPC Call or an asynchronous iterator.
Raises
AioRpcError
- Indicating that the RPC terminated with non-OK status.
asyncio.CancelledError
- Indicating that the RPC was canceled.
Expand source code
async def intercept_unary_stream(self, continuation, client_call_details, request):
    """Intercept a unary-stream call by delegating to the shared ``_intercept`` helper."""
    call = await self._intercept(continuation, client_call_details, request)
    return call
class HeaderAdderInterceptor1 (headers)
-
Affords intercepting stream-unary invocations.
Expand source code
class HeaderAdderInterceptor1(grpc.aio.StreamUnaryClientInterceptor, HeaderAdderInterceptor):
    """Header-adding interceptor for stream-unary invocations."""

    async def intercept_stream_unary(self, continuation, client_call_details, request_iterator):
        # Hand over to the shared header-injection logic of the base class.
        call = await self._intercept(continuation, client_call_details, request_iterator)
        return call
Ancestors
- grpc.aio._interceptor.StreamUnaryClientInterceptor
- HeaderAdderInterceptor
- grpc.aio._interceptor.ClientInterceptor
Methods
async def intercept_stream_unary(self, continuation, client_call_details, request_iterator)
-
Intercepts a stream-unary invocation asynchronously.
Within the interceptor the usage of the call methods like
write
or even awaiting the call should be done carefully, since the caller could be expecting an untouched call, for example to start writing messages to it.
Args
continuation
- A coroutine that proceeds with the invocation by
executing the next interceptor in the chain or invoking the
actual RPC on the underlying Channel. It is the interceptor's
responsibility to call it if it decides to move the RPC forward.
The interceptor can use
call = await continuation(client_call_details, request_iterator)
to continue with the RPC. continuation returns the call to the RPC.
client_call_details
- A ClientCallDetails object describing the outgoing RPC.
request_iterator
- The request iterator that will produce requests for the RPC.
Returns
The RPC Call.
Raises
AioRpcError
- Indicating that the RPC terminated with non-OK status.
asyncio.CancelledError
- Indicating that the RPC was canceled.
Expand source code
async def intercept_stream_unary(self, continuation, client_call_details, request_iterator):
    """Intercept a stream-unary call by delegating to the shared ``_intercept`` helper."""
    call = await self._intercept(continuation, client_call_details, request_iterator)
    return call
class HeaderAdderInterceptor2 (headers)
-
Affords intercepting stream-stream invocations.
Expand source code
class HeaderAdderInterceptor2(grpc.aio.StreamStreamClientInterceptor, HeaderAdderInterceptor):
    """Header-adding interceptor for stream-stream invocations."""

    async def intercept_stream_stream(self, continuation, client_call_details, request_iterator):
        # Hand over to the shared header-injection logic of the base class.
        call = await self._intercept(continuation, client_call_details, request_iterator)
        return call
Ancestors
- grpc.aio._interceptor.StreamStreamClientInterceptor
- HeaderAdderInterceptor
- grpc.aio._interceptor.ClientInterceptor
Methods
async def intercept_stream_stream(self, continuation, client_call_details, request_iterator)
-
Intercepts a stream-stream invocation asynchronously.
Within the interceptor the usage of the call methods like
write
or even awaiting the call should be done carefully, since the caller could be expecting an untouched call, for example to start writing messages to it.
The function could return the call object or an asynchronous iterator; in case of being an asynchronous iterator, this will become the source of the reads done by the caller.
Args
continuation
- A coroutine that proceeds with the invocation by
executing the next interceptor in the chain or invoking the
actual RPC on the underlying Channel. It is the interceptor's
responsibility to call it if it decides to move the RPC forward.
The interceptor can use
call = await continuation(client_call_details, request_iterator)
to continue with the RPC. continuation returns the call to the RPC.
client_call_details
- A ClientCallDetails object describing the outgoing RPC.
request_iterator
- The request iterator that will produce requests for the RPC.
Returns
The RPC Call or an asynchronous iterator.
Raises
AioRpcError
- Indicating that the RPC terminated with non-OK status.
asyncio.CancelledError
- Indicating that the RPC was canceled.
Expand source code
async def intercept_stream_stream(self, continuation, client_call_details, request_iterator):
    """Intercept a stream-stream call by delegating to the shared ``_intercept`` helper."""
    call = await self._intercept(continuation, client_call_details, request_iterator)
    return call
class HeaderAdderInterceptor3 (headers)
-
Affords intercepting unary-unary invocations.
Expand source code
class HeaderAdderInterceptor3(grpc.aio.UnaryUnaryClientInterceptor, HeaderAdderInterceptor):
    """Header-adding interceptor for unary-unary invocations."""

    async def intercept_unary_unary(self, continuation, client_call_details, request):
        # Hand over to the shared header-injection logic of the base class.
        response = await self._intercept(continuation, client_call_details, request)
        return response
Ancestors
- grpc.aio._interceptor.UnaryUnaryClientInterceptor
- HeaderAdderInterceptor
- grpc.aio._interceptor.ClientInterceptor
Methods
async def intercept_unary_unary(self, continuation, client_call_details, request)
-
Intercepts a unary-unary invocation asynchronously.
Args
continuation
- A coroutine that proceeds with the invocation by
executing the next interceptor in the chain or invoking the
actual RPC on the underlying Channel. It is the interceptor's
responsibility to call it if it decides to move the RPC forward.
The interceptor can use
call = await continuation(client_call_details, request)
to continue with the RPC. continuation returns the call to the RPC.
client_call_details
- A ClientCallDetails object describing the outgoing RPC.
request
- The request value for the RPC.
Returns
An object with the RPC response.
Raises
AioRpcError
- Indicating that the RPC terminated with non-OK status.
asyncio.CancelledError
- Indicating that the RPC was canceled.
Expand source code
async def intercept_unary_unary(self, continuation, client_call_details, request):
    """Intercept a unary-unary call by delegating to the shared ``_intercept`` helper."""
    response = await self._intercept(continuation, client_call_details, request)
    return response
class OpenAgentsNode (config: NodeConfig)
-
An OpenAgents node that can run jobs. The node can be configured with the following environment variables:
- NODE_NAME: The name of the node. Defaults to "OpenAgentsNode".
- NODE_ICON: The icon of the node. Defaults to "".
- NODE_VERSION: The version of the node. Defaults to "0.0.1".
- NODE_DESCRIPTION: The description of the node. Defaults to "".
- POOL_ADDRESS: The address of the pool. Defaults to "playground.openagents.com".
- POOL_PORT: The port of the pool. Defaults to 6021.
- POOL_SSL: Whether to use SSL for the pool. Defaults to "true".
- NODE_TPS: The ticks per second of the node main loop. Defaults to 10.
- NODE_TOKEN: The token of the node. Defaults to None.
- NWC: Nostr wallet connect URL
Expand source code
class OpenAgentsNode:
    """
    An OpenAgents node that can run jobs.

    The node metadata (name, picture, version, description and optionally
    prices) is taken from the NodeConfig passed to the constructor.

    Environment variables read by this class:
    - POOL_ADDRESS: The address of the pool. Defaults to "playground.openagents.com".
    - POOL_PORT: The port of the pool. Defaults to 6021.
    - POOL_SSL: Whether to use SSL for the pool ("true" enables it). Defaults to "true".
    - NODE_TPS: The ticks per second of the node main loop. Defaults to 10.
    - NODE_TOKEN: The token of the node, sent as "authorization" header. Defaults to None.
    - NWC: Nostr wallet connect URL, sent as "nwc" header. Defaults to None.
    - PRICE_SATS / PRICE_MSATS, CURRENCY, PROTOCOL: Default price entry
      (3000, "bitcoin", "lightning") used when NWC is set and the config
      declares no "prices".

    NOTE(review): NODE_NAME, NODE_ICON, NODE_VERSION and NODE_DESCRIPTION are
    not read in this class; presumably NodeConfig handles them - confirm there.
    """

    # Class-level default kept for backward compatibility; every instance gets
    # its own dict in __init__ so that nodes never share polling tasks.
    runnerTasks = {}

    def __init__(self, config: "NodeConfig"):
        """
        Create a node from its configuration.

        Args:
            config (NodeConfig): Provides the node metadata through getMeta().
                The metadata must contain "name", "picture", "version" and
                "description".
        """
        self.meta = config.getMeta()
        self.nextNodeAnnounce = 0
        self.channel = None
        self.rpcClient = None
        self.registeredRunners = []
        self.runnerTasks = {}
        self.poolAddress = None
        self.poolPort = None
        # Overwritten by _run(); initialised here so the attribute always exists.
        self.poolSsl = False
        self.lockedJobs = []
        self.isLooping = False
        self.loopInterval = 100
        self.NWC = os.getenv('NWC', None)
        if self.NWC and "prices" not in self.meta:
            # NOTE(review): PRICE_SATS and PRICE_MSATS end up in the same
            # "amount" field without any unit conversion - confirm the unit.
            self.meta["prices"] = [
                {
                    "amount": int(os.getenv('PRICE_SATS', os.getenv('PRICE_MSATS', "3000"))),
                    "currency": os.getenv('CURRENCY', "bitcoin"),
                    "protocol": os.getenv('PROTOCOL', "lightning"),
                }
            ]
        self.nodeName = self.meta["name"]
        self.nodeIcon = self.meta["picture"]
        self.nodeVersion = self.meta["version"]
        self.nodeDescription = self.meta["description"]
        self.logger = Logger(self.nodeName, self.nodeVersion)
        self.logger.info("Starting "+self.nodeName+" v"+self.nodeVersion)

    def getMeta(self):
        """
        Get the metadata dictionary of the node.
        """
        return self.meta

    def registerRunner(self, runner: "JobRunner") -> None:
        """
        Register a runner to the node.
        Args:
            runner (JobRunner): The runner to register.
        """
        self.registeredRunners.append({
            "runner": runner,
            # 0 makes the template announce on the first reannounce() pass.
            "nextAnnouncementTimestamp": 0
        })

    def getLogger(self):
        """
        Get the active logger for the node.
        """
        return self.logger

    def _getClient(self):
        """
        Get or create a GRPC client for the node.

        A new channel (and stub) is created when there is no channel yet or
        when the current one reports the SHUTDOWN state.

        Returns:
            The PoolConnectorStub bound to the active channel.
        """
        channel = self.channel
        # get_state() is the public grpc.aio API and returns a
        # grpc.ChannelConnectivity member, so the comparison is meaningful.
        # Passing True also asks an idle channel to start connecting.
        if channel is not None and channel.get_state(True) != grpc.ChannelConnectivity.SHUTDOWN:
            return self.rpcClient

        if channel is not None:
            try:
                self.getLogger().info("Closing channel")
                # close() is a coroutine on grpc.aio channels: it has to be
                # scheduled, a bare call would never run.
                asyncio.ensure_future(channel.close())
            except Exception as e:
                self.getLogger().error("Error closing channel "+str(e))

        target = self.poolAddress+":"+str(self.poolPort)
        self.getLogger().info("Connect to "+target+" with ssl "+str(self.poolSsl))
        options = [
            # 20 MB
            ('grpc.max_send_message_length', 1024*1024*20),
            ('grpc.max_receive_message_length', 1024*1024*20)
        ]

        interceptors = None
        nodeToken = os.getenv('NODE_TOKEN', None)
        nwc = self.NWC
        if nodeToken or nwc:
            metadata = []
            if nwc:
                metadata.append(("nwc", str(nwc)))
            if nodeToken:
                metadata.append(("authorization", str(nodeToken)))
            # One interceptor per call type so every RPC carries the headers.
            interceptors = [
                HeaderAdderInterceptor0(metadata),
                HeaderAdderInterceptor1(metadata),
                HeaderAdderInterceptor2(metadata),
                HeaderAdderInterceptor3(metadata),
            ]

        if nwc:
            self.getLogger().info("This node can receive payments")
        else:
            self.getLogger().warn("This node is not enabled to receive payments. Please provide a NWC URL")

        if self.poolSsl:
            self.channel = grpc.aio.secure_channel(target, grpc.ssl_channel_credentials(), options, interceptors=interceptors)
        else:
            self.channel = grpc.aio.insecure_channel(target, options, interceptors=interceptors)
        self.rpcClient = rpc_pb2_grpc.PoolConnectorStub(self.channel)
        return self.rpcClient

    async def _logToJob(self, message: str, jobId: str = None):
        """
        Log a message to a job.
        Args:
            message (str): The message to log.
            jobId (str): The ID of the job to log to.
        """
        try:
            await self._getClient().logForJob(rpc_pb2.RpcJobLog(jobId=jobId, log=message))
        except Exception as e:
            # Plain print on purpose: failures are reported on stdout only.
            print("Error logging to job "+str(e))

    def _log(self, message: str, jobId: str = None):
        """
        Log a message to the network.

        Nothing is sent when no job ID is given. Must be called while an
        event loop is running, because the send is scheduled as a task.

        Args:
            message (str): The message to log.
            jobId (str): The ID of the job to log to.
        """
        if jobId:
            asyncio.create_task(self._logToJob(message, jobId))

    async def _acceptJob(self, jobId: str):
        """
        Accept a job.
        Args:
            jobId (str): The ID of the job to accept.
        """
        await self._getClient().acceptJob(rpc_pb2.RpcAcceptJob(jobId=jobId))

    async def _runAcceptedJob(self, runner, ctx, job, client, startedAt):
        """
        Run one accepted job to completion and report the outcome to the pool.

        Everything the job needs is passed in as arguments, so a job running
        in parallel keeps its own context even after the polling loop has
        moved on to the next job.

        Args:
            runner (JobRunner): The runner executing the job.
            ctx (JobContext): The context created for this job.
            job: The job received from the pool.
            client: The pool client used to complete or cancel the job.
            startedAt (float): time.time() taken when the job was picked up.
        """
        try:
            output = await runner.run(ctx)
            await runner.postRun(ctx)
            ctx.getLogger().info("Job completed in "+str(time.time()-startedAt)+" seconds on node "+self.nodeName, job.id)
            await client.completeJob(rpc_pb2.RpcJobOutput(jobId=job.id, output=output))
        except Exception as e:
            ctx.getLogger().error("Job failed in "+str(time.time()-startedAt)+" seconds on node "+self.nodeName+" with error "+str(e), job.id)
            traceback.print_exc()
            try:
                # The job was accepted before this method was called, so the
                # pool has to be told that it will not be completed.
                await client.cancelJob(rpc_pb2.RpcCancelJob(jobId=job.id, reason=str(e)))
            except Exception as cancelError:
                ctx.getLogger().error("Error cancelling job "+str(cancelError), job.id)
        finally:
            await ctx.close()

    async def _executePendingJobForRunner(self, runner: "JobRunner"):
        """
        Execute all pending jobs for a runner.

        Fetches the pending jobs matching the runner's filter, runs the ones
        the runner accepts, then schedules itself again. The chain stops once
        the runner is no longer registered.

        Args:
            runner (JobRunner): The runner to execute the job.
        """
        if not any(reg["runner"] == runner for reg in self.registeredRunners):
            self.runnerTasks.pop(runner, None)
            return
        try:
            if not runner.initialized:
                # Flag first, so a concurrent pass does not call init() twice.
                runner.initialized = True
                await runner.init(self)
            client = self._getClient()
            jobFilter = runner.getFilter()
            meta = runner.getMeta()
            prices = (meta["prices"] or None) if "prices" in meta else None
            # Jobs stay locked (excluded from polling) for 60 seconds.
            self.lockedJobs = [x for x in self.lockedJobs if time.time()-x[1] < 60]
            filterArgs = {
                key: (jobFilter[key] if key in jobFilter else None)
                for key in (
                    "filterByRunOn",
                    "filterByCustomer",
                    "filterByDescription",
                    "filterById",
                    "filterByKind",
                )
            }
            pending = await client.getPendingJobs(rpc_pb2.RpcGetPendingJobs(
                filterByBids=prices,
                wait=60000,
                # exclude failed jobs
                excludeId=[x[0] for x in self.lockedJobs],
                **filterArgs
            ))
            jobs = list(pending.jobs)
            if len(jobs) > 0:
                self.getLogger().log(str(len(jobs))+" pending jobs for "+runner.__class__.__name__)
            else:
                self.getLogger().finer("No pending jobs for "+runner.__class__.__name__)
            for job in jobs:
                wasAccepted = False
                startedAt = time.time()
                ctx = JobContext(self, runner, job)
                try:
                    client = self._getClient()  # Refresh client connection if needed
                    if not await runner.canRun(ctx):
                        await ctx.close()
                        continue
                    self.lockedJobs.append([job.id, time.time()])
                    await self._acceptJob(job.id)
                    wasAccepted = True
                    ctx.getLogger().info("Job started on node "+self.nodeName)
                    await runner.preRun(ctx)
                    # Arguments are bound now: a parallel job must not see the
                    # ctx/job of a later loop iteration.
                    jobRun = self._runAcceptedJob(runner, ctx, job, client, startedAt)
                    if runner.isRunInParallel():
                        asyncio.create_task(jobRun)
                    else:
                        await jobRun
                except Exception as e:
                    ctx.getLogger().error("Job failed in "+str(time.time()-startedAt)+" seconds on node "+self.nodeName+" with error "+str(e), job.id)
                    await ctx.close()
                    if wasAccepted:
                        await client.cancelJob(rpc_pb2.RpcCancelJob(jobId=job.id, reason=str(e)))
                    traceback.print_exc()
        except Exception as e:
            traceback.print_exc()
            self.getLogger().error("Error executing runner "+str(e))
            # Back off before polling again after a failure.
            await asyncio.sleep(5000.0/1000.0)
        self.runnerTasks[runner] = asyncio.create_task(self._executePendingJobForRunner(runner))

    async def _executePendingJob(self):
        """
        Execute all pending jobs for all runners.

        Starts a polling task for every registered runner that has none yet.
        """
        for reg in self.registeredRunners:
            try:
                runner = reg["runner"]
                if runner not in self.runnerTasks:
                    self.runnerTasks[runner] = asyncio.create_task(self._executePendingJobForRunner(runner))
            except Exception as e:
                self.getLogger().log("Error executing pending job "+str(e), None)

    async def reannounce(self):
        """
        Reannounce the node and all templates.

        Each announcement is repeated after the refresh interval returned by
        the pool, or after 5 seconds when it failed. The method schedules
        itself again after sleeping 5 seconds.
        """
        try:
            time_ms = int(time.time()*1000)
            # Announce node
            if time_ms >= self.nextNodeAnnounce:
                try:
                    client = self._getClient()
                    res = await client.announceNode(rpc_pb2.RpcAnnounceNodeRequest(
                        iconUrl=self.nodeIcon,
                        name=self.nodeName,
                        description=self.nodeDescription,
                    ))
                    self.nextNodeAnnounce = int(time.time()*1000) + res.refreshInterval
                    self.getLogger().log("Node announced, next announcement in "+str(res.refreshInterval)+" ms")
                except Exception as e:
                    self.getLogger().error("Error announcing node "+str(e), None)
                    self.nextNodeAnnounce = int(time.time()*1000) + 5000
            # Announce the template of every registered runner
            for reg in self.registeredRunners:
                try:
                    if time_ms >= reg["nextAnnouncementTimestamp"]:
                        client = self._getClient()
                        res = await client.announceEventTemplate(rpc_pb2.RpcAnnounceTemplateRequest(
                            meta=json.dumps(reg["runner"].getMeta()),
                            template=reg["runner"].getTemplate(),
                            sockets=json.dumps(reg["runner"].getSockets())
                        ))
                        reg["nextAnnouncementTimestamp"] = int(time.time()*1000) + res.refreshInterval
                        self.getLogger().log("Template announced, next announcement in "+str(res.refreshInterval)+" ms")
                except Exception as e:
                    self.getLogger().error("Error announcing template "+str(e), None)
                    reg["nextAnnouncementTimestamp"] = int(time.time()*1000) + 5000
        except Exception as e:
            self.getLogger().error("Error reannouncing "+str(e), None)
        await asyncio.sleep(5)
        asyncio.create_task(self.reannounce())

    async def _loop(self):
        """
        The main loop of the node.

        Calls loop() on every registered runner, waits one tick
        (loopInterval milliseconds) and schedules the next pass. A failing
        runner is logged and does not stop the loop.
        """
        try:
            ticks = [reg["runner"].loop(self) for reg in self.registeredRunners]
            await asyncio.gather(*ticks)
        except Exception as e:
            # Without this guard one failing runner would end the tick loop
            # for good (and abort _run() on the very first pass).
            traceback.print_exc()
            self.getLogger().error("Error in runner loop "+str(e))
        self.isLooping = False
        await asyncio.sleep(self.loopInterval/1000.0)
        asyncio.create_task(self._loop())

    async def _run(self, poolAddress=None, poolPort=None, poolSsl=False):
        """
        Internal method to run the node.
        Should not be called, use start() instead.

        Args:
            poolAddress: Pool address; falls back to POOL_ADDRESS.
            poolPort: Pool port; falls back to POOL_PORT.
            poolSsl: True forces SSL; any falsy value defers to POOL_SSL
                (which defaults to "true"), so SSL can only be disabled
                through the environment variable.
        """
        # NOTE(review): fixed 5 second start-up delay; the reason is not
        # visible in this file.
        await asyncio.sleep(5000.0/1000.0)
        self.poolAddress = poolAddress or os.getenv('POOL_ADDRESS', "playground.openagents.com")
        self.poolPort = poolPort or int(os.getenv('POOL_PORT', "6021"))
        self.poolSsl = poolSsl or os.getenv('POOL_SSL', "true") == "true"
        # Clamp to at least one tick per second: NODE_TPS=0 would divide by zero.
        ticksPerSecond = max(1, int(os.getenv('NODE_TPS', "10")))
        self.loopInterval = 1000.0/ticksPerSecond
        await self._loop()
        await self.reannounce()
        while True:
            await self._executePendingJob()
            await asyncio.sleep(1000.0/1000.0)

    def start(self, poolAddress: str = None, poolPort: Union[int, str] = None, poolSsl: bool = False):
        """
        Start the node in an asyncio event loop.

        Blocks until the event loop ends.

        Args:
            poolAddress (str): The address of the pool. Defaults to
                the environment variable POOL_ADDRESS.
            poolPort (int): The port of the pool. Defaults to the
                environment variable POOL_PORT.
            poolSsl (bool): True forces SSL for the pool connection. When
                left False the environment variable POOL_SSL decides.
        """
        asyncio.run(self._run(poolAddress, poolPort, poolSsl))
Class variables
var runnerTasks
Methods
def getLogger(self)
-
Get the active logger for the node.
Expand source code
def getLogger(self):
    """
    Get the active logger for the node.
    """
    return self.logger
def getMeta(self)
-
Expand source code
def getMeta(self):
    """
    Get the metadata dictionary of the node.
    """
    meta = self.meta
    return meta
async def reannounce(self)
-
Reannounce the node and all templates.
Expand source code
async def reannounce(self):
    """
    Reannounce the node and all templates.

    Each announcement is repeated after the refresh interval returned by the
    pool, or after 5 seconds when it failed. The method schedules itself
    again after sleeping 5 seconds.
    """
    try:
        time_ms = int(time.time()*1000)
        # Announce node
        if time_ms >= self.nextNodeAnnounce:
            try:
                client = self._getClient()
                nodeRequest = rpc_pb2.RpcAnnounceNodeRequest(
                    iconUrl=self.nodeIcon,
                    name=self.nodeName,
                    description=self.nodeDescription,
                )
                res = await client.announceNode(nodeRequest)
                self.nextNodeAnnounce = int(time.time()*1000) + res.refreshInterval
                self.getLogger().log("Node announced, next announcement in "+str(res.refreshInterval)+" ms")
            except Exception as e:
                self.getLogger().error("Error announcing node "+ str(e), None)
                self.nextNodeAnnounce = int(time.time()*1000) + 5000
        # Announce the template of every registered runner
        for reg in self.registeredRunners:
            try:
                if time_ms < reg["nextAnnouncementTimestamp"]:
                    continue
                client = self._getClient()
                templateRequest = rpc_pb2.RpcAnnounceTemplateRequest(
                    meta=json.dumps(reg["runner"].getMeta()),
                    template=reg["runner"].getTemplate(),
                    sockets=json.dumps(reg["runner"].getSockets())
                )
                res = await client.announceEventTemplate(templateRequest)
                reg["nextAnnouncementTimestamp"] = int(time.time()*1000) + res.refreshInterval
                self.getLogger().log("Template announced, next announcement in "+str(res.refreshInterval)+" ms")
            except Exception as e:
                self.getLogger().error("Error announcing template "+ str(e), None)
                reg["nextAnnouncementTimestamp"] = int(time.time()*1000) + 5000
    except Exception as e:
        self.getLogger().error("Error reannouncing "+str(e), None)
    await asyncio.sleep(5)
    asyncio.create_task(self.reannounce())
def registerRunner(self, runner: JobRunner) ‑> None
-
Register a runner to the node.
Args
runner
:JobRunner
- The runner to register.
Expand source code
def registerRunner(self, runner: "JobRunner") -> None:
    """
    Register a runner to the node.
    Args:
        runner (JobRunner): The runner to register.
    """
    self.registeredRunners.append({
        "runner": runner,
        # 0 makes the template announce on the first reannounce() pass.
        "nextAnnouncementTimestamp": 0
    })
def start(self, poolAddress: str = None, poolPort: str = None)
-
Start the node in an asyncio event loop.
Args
poolAddress
:str
- The address of the pool. Defaults to the environment variable POOL_ADDRESS.
poolPort
:int
- The port of the pool. Defaults to the environment variable POOL_PORT.
Expand source code
def start(self, poolAddress: str = None, poolPort: Union[int, str] = None, poolSsl: bool = False):
    """
    Start the node in an asyncio event loop.

    Blocks until the event loop ends.

    Args:
        poolAddress (str): The address of the pool. Defaults to
            the environment variable POOL_ADDRESS.
        poolPort (int): The port of the pool. Defaults to the
            environment variable POOL_PORT.
        poolSsl (bool): True forces SSL for the pool connection. When left
            False the environment variable POOL_SSL decides.
    """
    asyncio.run(self._run(poolAddress, poolPort, poolSsl))