Module openagents.OpenAgentsNode

Expand source code
import grpc
from openagents_grpc_proto import rpc_pb2_grpc
from openagents_grpc_proto import rpc_pb2
import time
import os
import traceback
import asyncio
from .JobRunner import JobRunner
from .NodeConfig import NodeConfig
from .Logger import Logger
from typing import Union
from .JobContext import JobContext
import json
class HeaderAdderInterceptor(
    grpc.aio.ClientInterceptor
):
    """
    Base GRPC client interceptor that appends a fixed set of headers to the
    metadata of every outgoing request before forwarding the call.
    """

    def __init__(self, headers):
        # headers: iterable of (key, value) pairs added to each outgoing call.
        self._headers = headers

    async def _intercept(self, continuation, client_call_details, request_or_iterator):
        """Attach the configured headers and proceed with the RPC."""
        metadata = client_call_details.metadata or grpc.aio.Metadata()
        for key, value in self._headers:
            metadata.add(key, value)
        details = client_call_details._replace(metadata=metadata)
        return await continuation(details, request_or_iterator)

class HeaderAdderInterceptor0(grpc.aio.UnaryStreamClientInterceptor, HeaderAdderInterceptor):
    """Header-adding interceptor for unary-stream RPCs."""

    async def intercept_unary_stream(self, continuation, client_call_details, request):
        call = await self._intercept(continuation, client_call_details, request)
        return call

class HeaderAdderInterceptor1(grpc.aio.StreamUnaryClientInterceptor, HeaderAdderInterceptor):
    """Header-adding interceptor for stream-unary RPCs."""

    async def intercept_stream_unary(self, continuation, client_call_details, request_iterator):
        call = await self._intercept(continuation, client_call_details, request_iterator)
        return call
    
class HeaderAdderInterceptor2(grpc.aio.StreamStreamClientInterceptor, HeaderAdderInterceptor):
    """Header-adding interceptor for stream-stream RPCs."""

    async def intercept_stream_stream(self, continuation, client_call_details, request_iterator):
        call = await self._intercept(continuation, client_call_details, request_iterator)
        return call

class HeaderAdderInterceptor3(grpc.aio.UnaryUnaryClientInterceptor, HeaderAdderInterceptor):
    """Header-adding interceptor for unary-unary RPCs."""

    async def intercept_unary_unary(self, continuation, client_call_details, request):
        call = await self._intercept(continuation, client_call_details, request)
        return call
 

class OpenAgentsNode:
    """
    An OpenAgents node that can run jobs.
    The node can be configured with the following environment variables:
    - NODE_NAME: The name of the node. Defaults to "OpenAgentsNode".
    - NODE_ICON: The icon of the node. Defaults to "".
    - NODE_VERSION: The version of the node. Defaults to "0.0.1".
    - NODE_DESCRIPTION: The description of the node. Defaults to "".
    - POOL_ADDRESS: The address of the pool. Defaults to "playground.openagents.com".
    - POOL_PORT: The port of the pool. Defaults to 6021.
    - POOL_SSL: Whether to use SSL for the pool ("true"/"false"). Defaults to "true".
    - NODE_TPS: The ticks per second of the node main loop. Defaults to 10.
    - NODE_TOKEN: The token of the node. Defaults to None.
    - NWC: Nostr wallet connect URL
    """

    def __init__(self, config: NodeConfig):
        self.meta = config.getMeta()

        self.nextNodeAnnounce = 0     # ms timestamp of the next node announcement
        self.channel = None           # lazily created GRPC channel
        self.rpcClient = None         # lazily created PoolConnector stub
        self.registeredRunners = []   # [{"runner": ..., "nextAnnouncementTimestamp": ...}]
        self.poolAddress = None
        self.poolPort = None
        self.lockedJobs = []          # [jobId, lockTime] pairs; entries expire after 60s
        self.isLooping = False
        self.logger = None
        self.loopInterval = 100       # ms between runner loop() ticks
        # Map of runner -> asyncio.Task polling its pending jobs.
        # Fix: this was previously a class attribute, so the dict was shared
        # by every OpenAgentsNode instance in the same process.
        self.runnerTasks = {}

        self.NWC = os.getenv('NWC', None)
        # If the node can receive payments and no explicit prices are set,
        # derive a default price entry from the environment.
        if self.NWC and "prices" not in self.meta:
            self.meta["prices"] = [
                {
                    "amount": int(os.getenv('PRICE_SATS', os.getenv('PRICE_MSATS', "3000"))),
                    "currency": os.getenv('CURRENCY', "bitcoin"),
                    "protocol": os.getenv('PROTOCOL', "lightning"),
                }
            ]

        self.nodeName = self.meta["name"]
        self.nodeIcon = self.meta["picture"]
        self.nodeVersion = self.meta["version"]
        self.nodeDescription = self.meta["description"]

        self.logger = Logger(self.nodeName, self.nodeVersion)
        self.logger.info("Starting "+self.nodeName+" v"+self.nodeVersion)

    def getMeta(self):
        """Return the node metadata dictionary."""
        return self.meta

    def registerRunner(self, runner: JobRunner) -> None:
        """
        Register a runner to the node.
        Args:
            runner (JobRunner): The runner to register.
        """
        self.registeredRunners.append({
            "runner": runner,
            "nextAnnouncementTimestamp": 0
        })

    def getLogger(self):
        """
        Get the active logger for the node.
        """
        return self.logger

    def _getClient(self):
        """
        Get or create a GRPC client for the node.
        Reconnects (and rebuilds the stub) if the channel was shut down.
        """
        # NOTE(review): connectivity is checked through the private _channel
        # attribute; there is no public grpc.aio API for this — confirm on
        # grpcio upgrades.
        if self.channel is None or self.channel._channel.check_connectivity_state(True) == grpc.ChannelConnectivity.SHUTDOWN:
            if self.channel is not None:
                try:
                    self.getLogger().info("Closing channel")
                    # Fix: grpc.aio.Channel.close() is a coroutine; the original
                    # code discarded the coroutine object without running it.
                    # _getClient is only ever called from within the event loop.
                    asyncio.create_task(self.channel.close())
                except Exception as e:
                    self.getLogger().error("Error closing channel "+str(e))
            self.getLogger().info("Connect to "+self.poolAddress+":"+str(self.poolPort)+" with ssl "+str(self.poolSsl))

            options = [
                # Allow messages up to 20 MB in both directions
                ('grpc.max_send_message_length', 1024*1024*20),
                ('grpc.max_receive_message_length', 1024*1024*20)
            ]

            interceptors = None
            nodeToken = os.getenv('NODE_TOKEN', None)
            nwc = self.NWC
            if nodeToken or nwc:
                # Attach auth/payment headers to every outgoing RPC, for
                # every call cardinality.
                metadata = []
                if nwc:
                    metadata.append(("nwc", str(nwc)))
                if nodeToken:
                    metadata.append(("authorization", str(nodeToken)))
                interceptors = [
                    HeaderAdderInterceptor0(metadata),
                    HeaderAdderInterceptor1(metadata),
                    HeaderAdderInterceptor2(metadata),
                    HeaderAdderInterceptor3(metadata),
                ]

            if nwc:
                self.getLogger().info("This node can receive payments")
            else:
                self.getLogger().warn("This node is not enabled to receive payments. Please provide a NWC URL")

            endpoint = self.poolAddress+":"+str(self.poolPort)
            if self.poolSsl:
                self.channel = grpc.aio.secure_channel(endpoint, grpc.ssl_channel_credentials(), options, interceptors=interceptors)
            else:
                self.channel = grpc.aio.insecure_channel(endpoint, options, interceptors=interceptors)

            self.rpcClient = rpc_pb2_grpc.PoolConnectorStub(self.channel)

        return self.rpcClient

    async def _logToJob(self, message: str, jobId: str = None):
        """
        Log a message to a job.
        Args:
            message (str): The message to log.
            jobId (str): The ID of the job to log to.
        """
        try:
            await self._getClient().logForJob(rpc_pb2.RpcJobLog(jobId=jobId, log=message))
        except Exception as e:
            print("Error logging to job "+str(e))

    def _log(self, message: str, jobId: str = None):
        """
        Log a message to the network.
        Messages without a jobId are dropped: there is no job to attach them to.
        Args:
            message (str): The message to log.
            jobId (str): The ID of the job to log to.
        """
        if jobId:
            asyncio.create_task(self._logToJob(message, jobId))

    async def _acceptJob(self, jobId: str):
        """
        Accept a job.
        Args:
            jobId (str): The ID of the job to accept.
        """
        await self._getClient().acceptJob(rpc_pb2.RpcAcceptJob(jobId=jobId))

    async def _executePendingJobForRunner(self, runner: JobRunner):
        """
        Poll the pool for pending jobs for a runner, execute them, and
        reschedule this poller as a new task.
        Args:
            runner (JobRunner): The runner to execute the job.
        """
        # Stop polling if the runner was unregistered in the meantime.
        if not any(x["runner"] == runner for x in self.registeredRunners):
            del self.runnerTasks[runner]
            return

        try:
            # Lazy one-time runner initialization.
            if not runner.initialized:
                runner.initialized = True
                await runner.init(self)
            client = self._getClient()
            jobFilter = runner.getFilter()
            meta = runner.getMeta()
            prices = meta.get("prices") or None

            # Drop locks older than 60 seconds so stuck/failed jobs can be retried.
            self.lockedJobs = [x for x in self.lockedJobs if time.time()-x[1] < 60]
            jobs = list((await client.getPendingJobs(rpc_pb2.RpcGetPendingJobs(
                filterByRunOn=jobFilter.get("filterByRunOn"),
                filterByCustomer=jobFilter.get("filterByCustomer"),
                filterByDescription=jobFilter.get("filterByDescription"),
                filterById=jobFilter.get("filterById"),
                filterByKind=jobFilter.get("filterByKind"),
                filterByBids=prices,
                wait=60000,
                # exclude locked (in-flight or recently failed) jobs
                excludeId=[x[0] for x in self.lockedJobs]
            ))).jobs)

            if len(jobs) > 0:
                self.getLogger().log(str(len(jobs))+" pending jobs for "+runner.__class__.__name__)
            else:
                self.getLogger().finer("No pending jobs for "+runner.__class__.__name__)

            for job in jobs:
                wasAccepted = False
                t = time.time()
                ctx = JobContext(self, runner, job)
                try:
                    client = self._getClient()  # Refresh client connection if needed
                    if not await runner.canRun(ctx):
                        await ctx.close()
                        continue
                    self.lockedJobs.append([job.id, time.time()])
                    await self._acceptJob(job.id)
                    wasAccepted = True

                    ctx.getLogger().info("Job started on node "+self.nodeName)

                    await runner.preRun(ctx)

                    # Fix: bind the loop variables as default arguments.
                    # Without this, tasks launched with create_task late-bind
                    # `job`, `ctx`, `t` and `client` from the enclosing loop,
                    # so parallel tasks could all observe the last iteration's
                    # job instead of their own.
                    async def task(ctx=ctx, job=job, t=t, client=client, wasAccepted=wasAccepted):
                        try:
                            output = await runner.run(ctx)
                            await runner.postRun(ctx)
                            ctx.getLogger().info("Job completed in "+str(time.time()-t)+" seconds on node "+self.nodeName, job.id)
                            await client.completeJob(rpc_pb2.RpcJobOutput(jobId=job.id, output=output))
                        except Exception as e:
                            ctx.getLogger().error("Job failed in "+str(time.time()-t)+" seconds on node "+self.nodeName+" with error "+str(e), job.id)
                            if wasAccepted:
                                await client.cancelJob(rpc_pb2.RpcCancelJob(jobId=job.id, reason=str(e)))
                            traceback.print_exc()
                        await ctx.close()

                    if not runner.isRunInParallel():
                        await task()
                    else:
                        asyncio.create_task(task())
                except Exception as e:
                    ctx.getLogger().error("Job failed in "+str(time.time()-t)+" seconds on node "+self.nodeName+" with error "+str(e), job.id)
                    await ctx.close()
                    if wasAccepted:
                        await client.cancelJob(rpc_pb2.RpcCancelJob(jobId=job.id, reason=str(e)))
                    traceback.print_exc()

        except Exception as e:
            traceback.print_exc()
            self.getLogger().error("Error executing runner "+str(e))
            await asyncio.sleep(5)
        # Reschedule this poller so the runner keeps receiving jobs.
        self.runnerTasks[runner] = asyncio.create_task(self._executePendingJobForRunner(runner))

    async def _executePendingJob(self):
        """
        Ensure a job-polling task exists for every registered runner.
        """
        for reg in self.registeredRunners:
            try:
                runner = reg["runner"]
                if runner not in self.runnerTasks:
                    self.runnerTasks[runner] = asyncio.create_task(self._executePendingJobForRunner(runner))
            except Exception as e:
                self.getLogger().log("Error executing pending job "+str(e), None)

    async def reannounce(self):
        """
        Reannounce the node and all templates, then reschedule itself.
        Announcement intervals are provided by the pool (refreshInterval, in ms);
        failures retry after 5 seconds.
        """
        # Announce node
        try:
            time_ms = int(time.time()*1000)
            if time_ms >= self.nextNodeAnnounce:
                try:
                    client = self._getClient()
                    res = await client.announceNode(rpc_pb2.RpcAnnounceNodeRequest(
                        iconUrl=self.nodeIcon,
                        name=self.nodeName,
                        description=self.nodeDescription,
                    ))
                    self.nextNodeAnnounce = int(time.time()*1000) + res.refreshInterval
                    self.getLogger().log("Node announced, next announcement in "+str(res.refreshInterval)+" ms")
                except Exception as e:
                    self.getLogger().error("Error announcing node "+str(e), None)
                    self.nextNodeAnnounce = int(time.time()*1000) + 5000

            # Announce each runner's event template on its own schedule.
            for reg in self.registeredRunners:
                try:
                    if time_ms >= reg["nextAnnouncementTimestamp"]:
                        client = self._getClient()
                        res = await client.announceEventTemplate(rpc_pb2.RpcAnnounceTemplateRequest(
                            meta=json.dumps(reg["runner"].getMeta()),
                            template=reg["runner"].getTemplate(),
                            sockets=json.dumps(reg["runner"].getSockets())
                        ))
                        reg["nextAnnouncementTimestamp"] = int(time.time()*1000) + res.refreshInterval
                        self.getLogger().log("Template announced, next announcement in "+str(res.refreshInterval)+" ms")
                except Exception as e:
                    self.getLogger().error("Error announcing template "+str(e), None)
                    reg["nextAnnouncementTimestamp"] = int(time.time()*1000) + 5000
        except Exception as e:
            self.getLogger().error("Error reannouncing "+str(e), None)
        await asyncio.sleep(5)
        asyncio.create_task(self.reannounce())

    async def _loop(self):
        """
        The main loop of the node: run every runner's loop() once, wait one
        tick (loopInterval ms), then reschedule itself as a new task.
        """
        promises = [reg["runner"].loop(self) for reg in self.registeredRunners]
        await asyncio.gather(*promises)
        # NOTE(review): isLooping is cleared here but never set to True
        # anywhere in this module — presumably a leftover; confirm before
        # removing.
        self.isLooping = False
        await asyncio.sleep(self.loopInterval/1000.0)
        asyncio.create_task(self._loop())

    async def _run(self, poolAddress=None, poolPort=None, poolSsl=False):
        """
        Internal method to run the node.
        Should not be called, use start() instead.
        """
        await asyncio.sleep(5)
        self.poolAddress = poolAddress or os.getenv('POOL_ADDRESS', "playground.openagents.com")
        self.poolPort = poolPort or int(os.getenv('POOL_PORT', "6021"))
        self.poolSsl = poolSsl or os.getenv('POOL_SSL', "true") == "true"
        self.loopInterval = 1000.0/int(os.getenv('NODE_TPS', "10"))

        await self._loop()
        await self.reannounce()
        # Keep (re)spawning job pollers forever.
        while True:
            await self._executePendingJob()
            await asyncio.sleep(1)

    def start(self, poolAddress: str = None, poolPort: int = None, poolSsl: bool = False):
        """
        Start the node in an asyncio event loop.
        Args:
            poolAddress (str): The address of the pool. Defaults to
                the environment variable POOL_ADDRESS.
            poolPort (int): The port of the pool. Defaults to the
                environment variable POOL_PORT.
            poolSsl (bool): Whether to use SSL for the pool connection.
                Defaults to the environment variable POOL_SSL.
        """
        # Fix: the original start() had no way to request SSL; the new
        # parameter defaults to False, matching the previous behavior.
        asyncio.run(self._run(poolAddress, poolPort, poolSsl))

Classes

class HeaderAdderInterceptor (headers)

An interceptor for GRPC that adds headers to outgoing requests.

Expand source code
class HeaderAdderInterceptor(
    grpc.aio.ClientInterceptor     
):
    """
    An interceptor for GRPC that adds headers to outgoing requests.
    """
    def __init__(self, headers):
        self._headers = headers

  

    async def _intercept(self, continuation, client_call_details, request_or_iterator):
        metadata = client_call_details.metadata
        if not metadata:
            metadata = grpc.aio.Metadata()
        for header in self._headers:
            metadata.add(header[0], header[1])
        new_client_call_details = client_call_details._replace(metadata=metadata)
        response = await continuation(new_client_call_details, request_or_iterator)
        return response

Ancestors

  • grpc.aio._interceptor.ClientInterceptor

Subclasses

class HeaderAdderInterceptor0 (headers)

Affords intercepting unary-stream invocations.

Expand source code
class HeaderAdderInterceptor0(grpc.aio.UnaryStreamClientInterceptor,HeaderAdderInterceptor):
    async def intercept_unary_stream(self, continuation, client_call_details, request):
        return await self._intercept(continuation, client_call_details, request)

Ancestors

  • grpc.aio._interceptor.UnaryStreamClientInterceptor
  • HeaderAdderInterceptor
  • grpc.aio._interceptor.ClientInterceptor

Methods

async def intercept_unary_stream(self, continuation, client_call_details, request)

Intercepts a unary-stream invocation asynchronously.

The function could return the call object or an asynchronous iterator; if it is an asynchronous iterator, it becomes the source of the reads done by the caller.

Args

continuation
A coroutine that proceeds with the invocation by executing the next interceptor in the chain or invoking the actual RPC on the underlying Channel. It is the interceptor's responsibility to call it if it decides to move the RPC forward. The interceptor can use call = await continuation(client_call_details, request) to continue with the RPC. continuation returns the call to the RPC.
client_call_details
A ClientCallDetails object describing the outgoing RPC.
request
The request value for the RPC.

Returns

The RPC Call or an asynchronous iterator.

Raises

AioRpcError
Indicating that the RPC terminated with non-OK status.
asyncio.CancelledError
Indicating that the RPC was canceled.
Expand source code
async def intercept_unary_stream(self, continuation, client_call_details, request):
    return await self._intercept(continuation, client_call_details, request)
class HeaderAdderInterceptor1 (headers)

Affords intercepting stream-unary invocations.

Expand source code
class HeaderAdderInterceptor1(grpc.aio.StreamUnaryClientInterceptor,HeaderAdderInterceptor):
    async def intercept_stream_unary(self, continuation, client_call_details, request_iterator):
        return await self._intercept(continuation, client_call_details, request_iterator)

Ancestors

  • grpc.aio._interceptor.StreamUnaryClientInterceptor
  • HeaderAdderInterceptor
  • grpc.aio._interceptor.ClientInterceptor

Methods

async def intercept_stream_unary(self, continuation, client_call_details, request_iterator)

Intercepts a stream-unary invocation asynchronously.

Within the interceptor the usage of the call methods like write or even awaiting the call should be done carefully, since the caller could be expecting an untouched call, for example for start writing messages to it.

Args

continuation
A coroutine that proceeds with the invocation by executing the next interceptor in the chain or invoking the actual RPC on the underlying Channel. It is the interceptor's responsibility to call it if it decides to move the RPC forward. The interceptor can use call = await continuation(client_call_details, request_iterator) to continue with the RPC. continuation returns the call to the RPC.
client_call_details
A ClientCallDetails object describing the outgoing RPC.
request_iterator
The request iterator that will produce requests for the RPC.

Returns

The RPC Call.

Raises

AioRpcError
Indicating that the RPC terminated with non-OK status.
asyncio.CancelledError
Indicating that the RPC was canceled.
Expand source code
async def intercept_stream_unary(self, continuation, client_call_details, request_iterator):
    return await self._intercept(continuation, client_call_details, request_iterator)
class HeaderAdderInterceptor2 (headers)

Affords intercepting stream-stream invocations.

Expand source code
class HeaderAdderInterceptor2(grpc.aio.StreamStreamClientInterceptor,HeaderAdderInterceptor):
    async def intercept_stream_stream(self, continuation, client_call_details, request_iterator):
        return await self._intercept(continuation, client_call_details, request_iterator)

Ancestors

  • grpc.aio._interceptor.StreamStreamClientInterceptor
  • HeaderAdderInterceptor
  • grpc.aio._interceptor.ClientInterceptor

Methods

async def intercept_stream_stream(self, continuation, client_call_details, request_iterator)

Intercepts a stream-stream invocation asynchronously.

Within the interceptor the usage of the call methods like write or even awaiting the call should be done carefully, since the caller could be expecting an untouched call, for example for start writing messages to it.

The function could return the call object or an asynchronous iterator; if it is an asynchronous iterator, it becomes the source of the reads done by the caller.

Args

continuation
A coroutine that proceeds with the invocation by executing the next interceptor in the chain or invoking the actual RPC on the underlying Channel. It is the interceptor's responsibility to call it if it decides to move the RPC forward. The interceptor can use call = await continuation(client_call_details, request_iterator) to continue with the RPC. continuation returns the call to the RPC.
client_call_details
A ClientCallDetails object describing the outgoing RPC.
request_iterator
The request iterator that will produce requests for the RPC.

Returns

The RPC Call or an asynchronous iterator.

Raises

AioRpcError
Indicating that the RPC terminated with non-OK status.
asyncio.CancelledError
Indicating that the RPC was canceled.
Expand source code
async def intercept_stream_stream(self, continuation, client_call_details, request_iterator):
    return await self._intercept(continuation, client_call_details, request_iterator)
class HeaderAdderInterceptor3 (headers)

Affords intercepting unary-unary invocations.

Expand source code
class HeaderAdderInterceptor3(grpc.aio.UnaryUnaryClientInterceptor,HeaderAdderInterceptor):
    async def intercept_unary_unary(self, continuation, client_call_details, request):
        return await self._intercept(continuation, client_call_details, request)

Ancestors

  • grpc.aio._interceptor.UnaryUnaryClientInterceptor
  • HeaderAdderInterceptor
  • grpc.aio._interceptor.ClientInterceptor

Methods

async def intercept_unary_unary(self, continuation, client_call_details, request)

Intercepts a unary-unary invocation asynchronously.

Args

continuation
A coroutine that proceeds with the invocation by executing the next interceptor in the chain or invoking the actual RPC on the underlying Channel. It is the interceptor's responsibility to call it if it decides to move the RPC forward. The interceptor can use call = await continuation(client_call_details, request) to continue with the RPC. continuation returns the call to the RPC.
client_call_details
A ClientCallDetails object describing the outgoing RPC.
request
The request value for the RPC.

Returns

An object with the RPC response.

Raises

AioRpcError
Indicating that the RPC terminated with non-OK status.
asyncio.CancelledError
Indicating that the RPC was canceled.
Expand source code
async def intercept_unary_unary(self, continuation, client_call_details, request):
    return await self._intercept(continuation, client_call_details, request)
class OpenAgentsNode (config: NodeConfig)

An OpenAgents node that can run jobs. The node can be configured with the following environment variables: - NODE_NAME: The name of the node. Defaults to "OpenAgentsNode". - NODE_ICON: The icon of the node. Defaults to "". - NODE_VERSION: The version of the node. Defaults to "0.0.1". - NODE_DESCRIPTION: The description of the node. Defaults to "". - POOL_ADDRESS: The address of the pool. Defaults to "playground.openagents.com". - POOL_PORT: The port of the pool. Defaults to 6021. - POOL_SSL: Whether to use SSL for the pool. Defaults to "true". - NODE_TPS: The ticks per second of the node main loop. Defaults to 10. - NODE_TOKEN: The token of the node. Defaults to None. - NWC: Nostr wallet connect URL

Expand source code
class OpenAgentsNode:
    """
    An OpenAgents node that can run jobs.
    The node can be configured with the following environment variables:
    - NODE_NAME: The name of the node. Defaults to "OpenAgentsNode".
    - NODE_ICON: The icon of the node. Defaults to "".
    - NODE_VERSION: The version of the node. Defaults to "0.0.1".
    - NODE_DESCRIPTION: The description of the node. Defaults to "".
    - POOL_ADDRESS: The address of the pool. Defaults to "
    - POOL_PORT: The port of the pool. Defaults to 5000.
    - POOL_SSL: Whether to use SSL for the pool. Defaults to False.
    - NODE_TPS: The ticks per second of the node main loop. Defaults to 10.
    - NODE_TOKEN: The token of the node. Defaults to None.
    - NWC: Nostr wallet connect URL
    """
  
    def __init__(self, config: NodeConfig):
        self.meta = config.getMeta()
            
        self.nextNodeAnnounce = 0        
        self.channel = None
        self.rpcClient = None
        self.registeredRunners=[]
        self.poolAddress = None
        self.poolPort = None
        self.lockedJobs = []
        self.isLooping = False
        self.logger = None
        self.loopInterval = 100
        
        self.NWC = os.getenv('NWC', None)
        if self.NWC and "prices" not in self.meta:
            self.meta["prices"] = [
                {
                    "amount": int(os.getenv('PRICE_SATS', os.getenv('PRICE_MSATS', "3000"))),
                    "currency": os.getenv('CURRENCY', "bitcoin"),
                    "protocol": os.getenv('PROTOCOL', "lightning"),
                }
            ]

        self.nodeName = self.meta["name"]
        self.nodeIcon =  self.meta["picture"]
        self.nodeVersion =  self.meta["version"]
        self.nodeDescription =  self.meta["description"]

        self.channel = None
        self.rpcClient = None
        self.logger = Logger(self.nodeName,self.nodeVersion)
        self.logger.info("Starting "+self.nodeName+" v"+self.nodeVersion)

    def getMeta(self):
        return self.meta

    def registerRunner(self, runner:JobRunner) -> None:
        """
        Register a runner to the node.
        Args:
            runner (JobRunner): The runner to register.
        """
        self.registeredRunners.append({
            "runner": runner,
            "nextAnnouncementTimestamp": 0    
        })

    def getLogger(self):
        """
        Get the active logger for the node.
        """
        return self.logger        

    def _getClient(self): 
        """
        Get or create a GRPC client for the node.
        """
        if self.channel is None or self.channel._channel.check_connectivity_state(True)  == grpc.ChannelConnectivity.SHUTDOWN:
            if self.channel is not None:
                try:
                    self.getLogger().info("Closing channel")
                    self.channel.close()
                except Exception as e:
                    self.getLogger().error("Error closing channel "+str(e))
            self.getLogger().info("Connect to "+self.poolAddress+":"+str(self.poolPort)+" with ssl "+str(self.poolSsl))
            
            options=[
                # 20 MB
                ('grpc.max_send_message_length', 1024*1024*20),
                ('grpc.max_receive_message_length', 1024*1024*20)
            ]

            interceptors=None
            nodeToken = os.getenv('NODE_TOKEN', None)
            nwc = self.NWC
            if nodeToken or nwc:
                metadata=[]
                if nwc:
                    metadata.append(("nwc", str(nwc)))                
                if nodeToken:
                    metadata.append(("authorization", str(nodeToken)))                    
                if interceptors is None: interceptors=[]
                interceptors.append(HeaderAdderInterceptor0(metadata))
                interceptors.append(HeaderAdderInterceptor1(metadata))
                interceptors.append(HeaderAdderInterceptor2(metadata))
                interceptors.append(HeaderAdderInterceptor3(metadata))
                
            if nwc:
                self.getLogger().info("This node can receive payments")
            else: 
                self.getLogger().warn("This node is not enabled to receive payments. Please provide a NWC URL")
                
            if self.poolSsl:
                self.channel = grpc.aio.secure_channel(self.poolAddress+":"+str(self.poolPort), grpc.ssl_channel_credentials(),options,interceptors=interceptors)
            else:
                self.channel = grpc.aio.insecure_channel(self.poolAddress+":"+str(self.poolPort),options,interceptors=interceptors)
            
            self.rpcClient = rpc_pb2_grpc.PoolConnectorStub(self.channel)
            

        return self.rpcClient

    async def _logToJob(self, message:str, jobId:str=None):
        """
        Log a message to a job.
        Args:
            message (str): The message to log.
            jobId (str): The ID of the job to log to.
        """
        try:
            await self._getClient().logForJob(rpc_pb2.RpcJobLog(jobId=jobId, log=message)) 
        except Exception as e:
            print("Error logging to job "+str(e))

    def _log(self,message:str, jobId:str=None):
        """
        Log a message to the network.
        Args:
            message (str): The message to log.
            jobId (str): The ID of the job to log to.
        """
        if jobId: 
            asyncio.create_task(self._logToJob(message, jobId))
    
    async def _acceptJob(self, jobId:str):
        """
        Accept a job.
        Args:
            jobId (str): The ID of the job to accept.
        """
        await self._getClient().acceptJob(rpc_pb2.RpcAcceptJob(jobId=jobId))

    async def _executePendingJobForRunner(self, runner:JobRunner):
        """
        Fetch and execute all pending jobs for a runner, then reschedule itself.

        Polls the pool for jobs matching the runner's filter, accepts and runs
        each one (inline or as a parallel task depending on
        runner.isRunInParallel()), and finally schedules the next poll as a new
        asyncio task so the runner keeps being served.

        Args:
            runner (JobRunner): The runner to execute the job.
        """
        # Runner was unregistered: drop its polling task and stop rescheduling.
        if not any(x["runner"] == runner for x in self.registeredRunners):
            self.runnerTasks.pop(runner, None)  # pop() avoids KeyError if already gone
            return

        try:
            # Lazy one-time initialization of the runner.
            if not runner.initialized:
                runner.initialized = True
                await runner.init(self)
            client = self._getClient()
            job_filter = runner.getFilter()  # renamed: don't shadow builtin filter()
            meta = runner.getMeta()
            # Falsy "prices" (missing, None, empty) is normalized to None.
            prices = meta.get("prices") or None

            # Drop lock entries older than 60 seconds so stuck/failed jobs can retry.
            self.lockedJobs = [x for x in self.lockedJobs if time.time() - x[1] < 60]
            res = await client.getPendingJobs(rpc_pb2.RpcGetPendingJobs(
                filterByRunOn=job_filter.get("filterByRunOn"),
                filterByCustomer=job_filter.get("filterByCustomer"),
                filterByDescription=job_filter.get("filterByDescription"),
                filterById=job_filter.get("filterById"),
                filterByKind=job_filter.get("filterByKind"),
                filterByBids=prices,
                wait=60000,
                # exclude locked (in-flight or recently failed) jobs
                excludeId=[x[0] for x in self.lockedJobs]
            ))
            jobs = list(res.jobs)

            if len(jobs) > 0:
                self.getLogger().log(str(len(jobs))+" pending jobs for "+runner.__class__.__name__)
            else:
                self.getLogger().finer("No pending jobs for "+runner.__class__.__name__)

            for job in jobs:
                wasAccepted = False
                t = time.time()
                ctx = JobContext(self, runner, job)
                try:
                    client = self._getClient()  # Refresh client connection if needed
                    if not await runner.canRun(ctx):
                        await ctx.close()
                        continue
                    # Lock the job so the next poll excludes it while it runs.
                    self.lockedJobs.append([job.id, time.time()])
                    await self._acceptJob(job.id)
                    wasAccepted = True

                    ctx.getLogger().info("Job started on node "+self.nodeName)

                    await runner.preRun(ctx)

                    # BUGFIX: bind loop variables as default arguments. A plain
                    # closure is late-binding, so a parallel task scheduled here
                    # could see job/ctx/t/client rebound to a LATER iteration's
                    # values by the time it actually runs.
                    async def task(ctx=ctx, job=job, t=t, client=client, wasAccepted=wasAccepted):
                        try:
                            output = await runner.run(ctx)
                            await runner.postRun(ctx)
                            ctx.getLogger().info("Job completed in "+str(time.time()-t)+" seconds on node "+self.nodeName, job.id)
                            await client.completeJob(rpc_pb2.RpcJobOutput(jobId=job.id, output=output))
                        except Exception as e:
                            ctx.getLogger().error("Job failed in "+str(time.time()-t)+" seconds on node "+self.nodeName+" with error "+str(e), job.id)
                            if wasAccepted:
                                await client.cancelJob(rpc_pb2.RpcCancelJob(jobId=job.id, reason=str(e)))
                            traceback.print_exc()
                        await ctx.close()

                    if not runner.isRunInParallel():
                        await task()
                    else:
                        asyncio.create_task(task())
                except Exception as e:
                    ctx.getLogger().error("Job failed in "+str(time.time()-t)+" seconds on node "+self.nodeName+" with error "+str(e), job.id)
                    await ctx.close()
                    # Only cancel jobs we actually accepted.
                    if wasAccepted:
                        await client.cancelJob(rpc_pb2.RpcCancelJob(jobId=job.id, reason=str(e)))
                    traceback.print_exc()

        except Exception as e:
            traceback.print_exc()
            self.getLogger().error("Error executing runner "+str(e))
            await asyncio.sleep(5.0)  # back off before retrying after a poll error
        # Reschedule so this runner keeps polling for jobs.
        self.runnerTasks[runner] = asyncio.create_task(self._executePendingJobForRunner(runner))

 
    # Maps each registered JobRunner to the asyncio.Task that is currently
    # polling pending jobs for it (see _executePendingJobForRunner).
    # NOTE(review): this is a class-level mutable dict, so it is shared by all
    # instances of this class — confirm a single node instance is intended,
    # otherwise move it into __init__.
    runnerTasks={}
    async def _executePendingJob(self):
        """
        Ensure a job-polling task exists for every registered runner.

        For each runner in self.registeredRunners that has no entry in
        self.runnerTasks, schedule _executePendingJobForRunner() as a new
        asyncio task. That task reschedules itself, so each runner only needs
        to be started once here.
        """
        for reg in self.registeredRunners:
            try:
                runner = reg["runner"]
                # Idiomatic membership test ("runner not in" over "not runner in").
                if runner not in self.runnerTasks:
                    self.runnerTasks[runner] = asyncio.create_task(
                        self._executePendingJobForRunner(runner)
                    )
            except Exception as e:
                self.getLogger().log("Error executing pending job "+str(e), None)


    async def reannounce(self):    
        """
        Reannounce the node and all templates.

        Announces the node itself once its announcement deadline has passed,
        then announces each registered runner's template on the same basis.
        The pool's response dictates the next announcement time; failures are
        retried after 5 seconds. The method reschedules itself as a new task
        every 5 seconds, so it only needs to be started once.
        """
        # Announce node
        try:
            time_ms=int(time.time()*1000)
            # Node announcement is due.
            if time_ms >= self.nextNodeAnnounce:
                try:
                    client = self._getClient()
                    res=await client.announceNode(rpc_pb2.RpcAnnounceNodeRequest(
                        iconUrl = self.nodeIcon,
                        name = self.nodeName,
                        description = self.nodeDescription,
                    ))
                    # The pool tells us when the next announcement is due.
                    self.nextNodeAnnounce = int(time.time()*1000) + res.refreshInterval
                    self.getLogger().log("Node announced, next announcement in "+str(res.refreshInterval)+" ms")
                except Exception as e:
                    self.getLogger().error("Error announcing node "+ str(e), None)
                    # On failure, retry the node announcement in 5 seconds.
                    self.nextNodeAnnounce = int(time.time()*1000) + 5000

            # Announce each registered runner's template when its deadline passes.
            for reg in self.registeredRunners:
                try:
                    if time_ms >=  reg["nextAnnouncementTimestamp"]:
                        client = self._getClient()
                        res = await client.announceEventTemplate(rpc_pb2.RpcAnnounceTemplateRequest(
                            meta=json.dumps(reg["runner"].getMeta()),
                            template=reg["runner"].getTemplate(),
                            sockets=json.dumps(reg["runner"].getSockets())
                        ))
                        reg["nextAnnouncementTimestamp"] = int(time.time()*1000) + res.refreshInterval
                        self.getLogger().log("Template announced, next announcement in "+str(res.refreshInterval)+" ms")
                except Exception as e:
                    self.getLogger().error("Error announcing template "+ str(e), None)
                    # On failure, retry this template's announcement in 5 seconds.
                    reg["nextAnnouncementTimestamp"] = int(time.time()*1000) + 5000
        except Exception as e:
            self.getLogger().error("Error reannouncing "+str(e), None)
        # Self-reschedule: keep announcing for the lifetime of the event loop.
        await asyncio.sleep(5)
        asyncio.create_task(self.reannounce())
  
    async def _loop(self):
        """
        Run one tick of the node: invoke loop() on every registered runner
        concurrently, wait for the configured interval, then schedule the
        next tick as a new task.
        """
        pending = (reg["runner"].loop(self) for reg in self.registeredRunners)
        await asyncio.gather(*pending)
        self.isLooping = False
        delay_seconds = self.loopInterval / 1000.0
        await asyncio.sleep(delay_seconds)
        asyncio.create_task(self._loop())
        

    async def _run(self, poolAddress=None, poolPort=None, poolSsl=False):
        """
        Internal method to run the node.
        Should not be called directly, use start() instead.

        Resolves pool connection settings (explicit arguments win over the
        POOL_ADDRESS / POOL_PORT / POOL_SSL environment variables), starts the
        runner loop and the announcement loop, then polls for pending jobs
        forever.

        Args:
            poolAddress (str): Pool address; falls back to POOL_ADDRESS.
            poolPort (int): Pool port; falls back to POOL_PORT.
            poolSsl (bool): Use SSL; falls back to POOL_SSL ("true"/"false").
        """
        # Grace period before first contact with the pool.
        await asyncio.sleep(5.0)
        self.poolAddress = poolAddress or os.getenv('POOL_ADDRESS', "playground.openagents.com")
        self.poolPort = poolPort or int(os.getenv('POOL_PORT', "6021"))
        # NOTE: poolSsl=False defers to the environment (truthiness check).
        self.poolSsl = poolSsl or os.getenv('POOL_SSL', "true") == "true"
        # Tick interval in milliseconds, derived from ticks-per-second.
        self.loopInterval = 1000.0 / int(os.getenv('NODE_TPS', "10"))

        # Both of these reschedule themselves; start each once.
        await self._loop()
        await self.reannounce()
        while True:
            await self._executePendingJob()
            await asyncio.sleep(1.0)
        
    def start(self, poolAddress:str=None, poolPort:int=None):
        """
        Start the node in an asyncio event loop.
        This call blocks until the event loop terminates.
        Args:
            poolAddress (str): The address of the pool. Defaults to
                the environment variable POOL_ADDRESS.
            poolPort (int): The port of the pool. Defaults to the
                environment variable POOL_PORT.
        """
        asyncio.run(self._run(poolAddress, poolPort))

Class variables

var runnerTasks

Methods

def getLogger(self)

Get the active logger for the node.

Expand source code
def getLogger(self):
    """
    Get the active logger for the node.
    """
    return self.logger        
def getMeta(self)
Expand source code
def getMeta(self):
    return self.meta
async def reannounce(self)

Reannounce the node and all templates.

Expand source code
async def reannounce(self):    
    """
    Reannounce the node and all templates.
    """
    # Announce node
    try:
        time_ms=int(time.time()*1000)
        if time_ms >= self.nextNodeAnnounce:
            try:
                client = self._getClient()
                res=await client.announceNode(rpc_pb2.RpcAnnounceNodeRequest(
                    iconUrl = self.nodeIcon,
                    name = self.nodeName,
                    description = self.nodeDescription,
                ))
                self.nextNodeAnnounce = int(time.time()*1000) + res.refreshInterval
                self.getLogger().log("Node announced, next announcement in "+str(res.refreshInterval)+" ms")
            except Exception as e:
                self.getLogger().error("Error announcing node "+ str(e), None)
                self.nextNodeAnnounce = int(time.time()*1000) + 5000

        for reg in self.registeredRunners:
            try:
                if time_ms >=  reg["nextAnnouncementTimestamp"]:
                    client = self._getClient()
                    res = await client.announceEventTemplate(rpc_pb2.RpcAnnounceTemplateRequest(
                        meta=json.dumps(reg["runner"].getMeta()),
                        template=reg["runner"].getTemplate(),
                        sockets=json.dumps(reg["runner"].getSockets())
                    ))
                    reg["nextAnnouncementTimestamp"] = int(time.time()*1000) + res.refreshInterval
                    self.getLogger().log("Template announced, next announcement in "+str(res.refreshInterval)+" ms")
            except Exception as e:
                self.getLogger().error("Error announcing template "+ str(e), None)
                reg["nextAnnouncementTimestamp"] = int(time.time()*1000) + 5000
    except Exception as e:
        self.getLogger().error("Error reannouncing "+str(e), None)
    await asyncio.sleep(5)
    asyncio.create_task(self.reannounce())
def registerRunner(self, runner: JobRunner) ‑> None

Register a runner to the node.

Args

runner : JobRunner
The runner to register.
Expand source code
def registerRunner(self, runner:JobRunner) -> None:
    """
    Register a runner to the node.
    Args:
        runner (JobRunner): The runner to register.
    """
    self.registeredRunners.append({
        "runner": runner,
        "nextAnnouncementTimestamp": 0    
    })
def start(self, poolAddress: str = None, poolPort: str = None)

Start the node in an asyncio event loop.

Args

poolAddress : str
The address of the pool. Defaults to the environment variable POOL_ADDRESS.
poolPort : int
The port of the pool. Defaults to the environment variable POOL_PORT.
Expand source code
def start(self, poolAddress:str=None, poolPort:str=None):
    """
    Start the node in an asyncio event loop.
    Args:
        poolAddress (str): The address of the pool. Defaults to
            the environment variable POOL_ADDRESS.
        poolPort (int): The port of the pool. Defaults to the
            environment variable POOL_PORT.
    """
    asyncio.run(self._run(poolAddress, poolPort))