Module openagents.Logger

Expand source code
import time
import os
import concurrent
from threading import Condition
import queue
from typing import Literal
import base64
import requests
import traceback
LogLevel = Literal[
    "error",
    "warn",
    "info",
    "debug",
    "fine",
    "finer",
    "finest"
]

class OpenObserveLogger:
    """
    A logger for OpenObserve that sends logs in batches.
    """
    def __init__(self, options:dict):
        self.options = options        
        self.batchSize= self.options["batchSize"]
        self.flushInterval = self.options["flushInterval"]
        if not self.flushInterval:
            self.flushInterval = 5000
        if not self.batchSize:
            self.batchSize = 21        
        self.buffer = queue.Queue()
        self.wait = Condition()
        self.flushThread = concurrent.futures.ThreadPoolExecutor(max_workers=3)
        self.flushThread.submit(self.flushLoop)
        

    def batchReady(self):
        with self.wait:
            self.wait.notify_all()

    def log(self, level:LogLevel, message:str, timestamp:int=None):
        """
        Log a message with a specific level.

        Args:
            level (LogLevel): The level of the log.
            message (str): The message to log.
            timestamp (int): The timestamp of the log. Defaults to the current time.
        """
        log_entry = {
            'level': level,
            '_timestamp': timestamp or int(time.time()*1000),
            'log': message
        }
        meta=self.options["meta"] if "meta" in self.options else {}
        for key in meta:
            log_entry[key]=meta[key]

        self.buffer.put(log_entry)
        if self.buffer.qsize() >= self.batchSize:
            self.flushThread.submit(self.batchReady)

    def _close(self):
        while not self.buffer.empty():
            with self.wait:
                self.wait.notify_all()
            time.sleep(0.1)
        self.flushThread.shutdown()
        
    def close(self):
        """
        Immediately flush all the logs to OpenObserve and shutdown the logger.
        """
        self.flushThread.submit(self._close)        
            
        
    def _flushToOpenObserve(self, batch):
        if len(batch) == 0:
            return
        try:
            url = self.options["baseUrl"]+"/api/"+self.options["org"]+"/"+self.options["stream"]+"/_json"   
            basicAuth = self.options["auth"]
            if not isinstance(basicAuth, str):
                if "username" in basicAuth and "password" in basicAuth:
                    basicAuth = basicAuth["username"]+":"+basicAuth["password"]
                    basicAuth = base64.b64encode(basicAuth.encode()).decode()
            headers = {
                'Content-Type': 'application/json',
                "Authorization": "Basic "+basicAuth if basicAuth else None
            }
            res = requests.post(url, headers=headers, json=batch)
            if res.status_code != 200:
                print("Error flushing log "+str(res.status_code))
        except Exception as e:
            print("Error flushing log "+str(e))


    def flushLoop(self):
        while True:
            with self.wait:
                self.wait.wait(self.flushInterval/1000)
            batch = []
            while not self.buffer.empty() and len(batch) < self.batchSize:
                try:
                    batch.append(self.buffer.get(block=False))
                except queue.Empty:
                    break         
            self._flushToOpenObserve(batch)


    


class Logger :
    """
    A logger for OpenAgents Nodes.
    The logger can be configured with the following environment variables:
    - LOG_LEVEL: The minimum level of logs to print. Defaults to "debug".
    - OPENOBSERVE_LOGLEVEL: The minimum level of logs to send to OpenObserve. Defaults to the value of LOG_LEVEL.
    - OPENOBSERVE_ENDPOINT: The endpoint of the OpenObserve server.
    - OPENOBSERVE_ORG: The organization to log to. Defaults to "default".
    - OPENOBSERVE_STREAM: The stream to log to. Defaults to "default".
    - OPENOBSERVE_BASICAUTH: The basic authentication to use for logging.
    - OPENOBSERVE_USERNAME: The username for basic authentication.
    - OPENOBSERVE_PASSWORD: The password for basic authentication.
    - OPENOBSERVE_BATCHSIZE: The batch size for logging to OpenObserve. Defaults to 21.
    - OPENOBSERVE_FLUSHINTERVAL: The flush interval for logging to OpenObserve. Defaults to 5000.

    """

    def __init__(self, name:str, version:str, jobId:str=None, runnerLogger=None, level=None, enableOobs:bool=True):
        """
        Create a new logger.
        Args:
            name (str): The name of the logger.
            version (str): The version of the logger.
            runnerLogger : The function to log to the runner.
            level (LogLevel): Optional: The minimum level of logs to print. Defaults to environment variable or "debug".
            enableOobs (bool): Optional: Whether to enable logging to OpenObserve. Defaults to True.
        """
        self.name=name or "main"
        self.runnerLogger=runnerLogger
        self.logLevel=None
        self.oobsLogger=None
        self.version=version
        self.jobId=jobId
        
        logLevelName = os.getenv('LOG_LEVEL', "finer" if not os.getenv('PRODUCTION', None) else "info")
        oobsLogLevelName= os.getenv('OPENOBSERVE_LOGLEVEL', "info")

        self.logLevel = self._levelToValue(logLevelName)
        self.oobsLogLevel = self._levelToValue(oobsLogLevelName)
        
        if level and self._levelToValue(level) > self.logLevel:
            self.logLevel = self._levelToValue(level)

        if level and self._levelToValue(level) > self.oobsLogLevel:
            self.oobsLogLevel = self._levelToValue(level)

        oobsEndPoint = os.getenv('OPENOBSERVE_ENDPOINT', None)
        if enableOobs and oobsEndPoint:
            
            self.oobsLogger = OpenObserveLogger({
                "baseUrl": oobsEndPoint,
                "org": os.getenv('OPENOBSERVE_ORG', "default"),
                "stream": os.getenv('OPENOBSERVE_STREAM', "default"),
                "auth": os.getenv('OPENOBSERVE_BASICAUTH', None) or {
                    "username": os.getenv('OPENOBSERVE_USERNAME', None),
                    "password": os.getenv('OPENOBSERVE_PASSWORD', None)
                },
                "batchSize": int(os.getenv('OPENOBSERVE_BATCHSIZE', 21)),
                "flushInterval": int(os.getenv('OPENOBSERVE_FLUSHINTERVAL', 0)),
                "meta":{
                    "appName": self.name,
                    "appVersion": self.version,
                    "jobId": self.jobId
                }                
            })

    def _levelToValue(self, level:LogLevel)->int:
        if level == "error": return 7
        if level == "warn": return 6
        if level == "info": return 5
        if level == "debug": return 4
        if level == "fine": return 3
        if level == "finer": return 2
        if level == "finest": return 1
        return 1

    def _log(self, level: LogLevel, args:tuple):
        message = " ".join([str(x) for x in args])

        levelV=self._levelToValue(level)
        minLevel = self.logLevel
        minObsLevel = self.oobsLogLevel
        minNostrLevel = self._levelToValue("info")

        if levelV >= minLevel:
            date = time.strftime("%Y-%m-%d %H:%M:%S")
            print(date+" ["+self.name+":"+self.version+"] "+(("("+self.jobId+")") if self.jobId else "")+": "+level+" : "+message)

        if self.oobsLogger and levelV >= minObsLevel:
            self.oobsLogger.log(level, message)
        
        if self.runnerLogger and levelV >= minNostrLevel:
            self.runnerLogger(message)


    def log(self, *args):
        self._log("debug", args)
    
    def info(self, *args):
        self._log("info", args)
    
    def warn(self, *args):
        self._log("warn",   args)
    
    def error(self, *args):
        self._log("error", args)
        traceback.print_exc()

    def debug(self, *args):
        self._log("debug", args)
    
    def fine(self, *args):
        self._log("fine",  args)
    
    def finer(self, *args):
        self._log("finer",  args)

    def finest(self, *args):
        self._log("finest", args)

    def close(self):
        if self.oobsLogger:
            self.oobsLogger.close()


       

Classes

class Logger (name: str, version: str, jobId: str = None, runnerLogger=None, level=None, enableOobs: bool = True)

A logger for OpenAgents Nodes. The logger can be configured with the following environment variables: - LOG_LEVEL: The minimum level of logs to print. Defaults to "debug". - OPENOBSERVE_LOGLEVEL: The minimum level of logs to send to OpenObserve. Defaults to the value of LOG_LEVEL. - OPENOBSERVE_ENDPOINT: The endpoint of the OpenObserve server. - OPENOBSERVE_ORG: The organization to log to. Defaults to "default". - OPENOBSERVE_STREAM: The stream to log to. Defaults to "default". - OPENOBSERVE_BASICAUTH: The basic authentication to use for logging. - OPENOBSERVE_USERNAME: The username for basic authentication. - OPENOBSERVE_PASSWORD: The password for basic authentication. - OPENOBSERVE_BATCHSIZE: The batch size for logging to OpenObserve. Defaults to 21. - OPENOBSERVE_FLUSHINTERVAL: The flush interval for logging to OpenObserve. Defaults to 5000.

Create a new logger.

Args

name : str
The name of the logger.
version : str
The version of the logger.
runnerLogger : The function to log to the runner.
level : LogLevel
Optional: The minimum level of logs to print. Defaults to environment variable or "debug".
enableOobs : bool
Optional: Whether to enable logging to OpenObserve. Defaults to True.
Expand source code
class Logger :
    """
    A logger for OpenAgents Nodes.
    The logger can be configured with the following environment variables:
    - LOG_LEVEL: The minimum level of logs to print. Defaults to "debug".
    - OPENOBSERVE_LOGLEVEL: The minimum level of logs to send to OpenObserve. Defaults to the value of LOG_LEVEL.
    - OPENOBSERVE_ENDPOINT: The endpoint of the OpenObserve server.
    - OPENOBSERVE_ORG: The organization to log to. Defaults to "default".
    - OPENOBSERVE_STREAM: The stream to log to. Defaults to "default".
    - OPENOBSERVE_BASICAUTH: The basic authentication to use for logging.
    - OPENOBSERVE_USERNAME: The username for basic authentication.
    - OPENOBSERVE_PASSWORD: The password for basic authentication.
    - OPENOBSERVE_BATCHSIZE: The batch size for logging to OpenObserve. Defaults to 21.
    - OPENOBSERVE_FLUSHINTERVAL: The flush interval for logging to OpenObserve. Defaults to 5000.

    """

    def __init__(self, name:str, version:str, jobId:str=None, runnerLogger=None, level=None, enableOobs:bool=True):
        """
        Create a new logger.
        Args:
            name (str): The name of the logger.
            version (str): The version of the logger.
            runnerLogger : The function to log to the runner.
            level (LogLevel): Optional: The minimum level of logs to print. Defaults to environment variable or "debug".
            enableOobs (bool): Optional: Whether to enable logging to OpenObserve. Defaults to True.
        """
        self.name=name or "main"
        self.runnerLogger=runnerLogger
        self.logLevel=None
        self.oobsLogger=None
        self.version=version
        self.jobId=jobId
        
        logLevelName = os.getenv('LOG_LEVEL', "finer" if not os.getenv('PRODUCTION', None) else "info")
        oobsLogLevelName= os.getenv('OPENOBSERVE_LOGLEVEL', "info")

        self.logLevel = self._levelToValue(logLevelName)
        self.oobsLogLevel = self._levelToValue(oobsLogLevelName)
        
        if level and self._levelToValue(level) > self.logLevel:
            self.logLevel = self._levelToValue(level)

        if level and self._levelToValue(level) > self.oobsLogLevel:
            self.oobsLogLevel = self._levelToValue(level)

        oobsEndPoint = os.getenv('OPENOBSERVE_ENDPOINT', None)
        if enableOobs and oobsEndPoint:
            
            self.oobsLogger = OpenObserveLogger({
                "baseUrl": oobsEndPoint,
                "org": os.getenv('OPENOBSERVE_ORG', "default"),
                "stream": os.getenv('OPENOBSERVE_STREAM', "default"),
                "auth": os.getenv('OPENOBSERVE_BASICAUTH', None) or {
                    "username": os.getenv('OPENOBSERVE_USERNAME', None),
                    "password": os.getenv('OPENOBSERVE_PASSWORD', None)
                },
                "batchSize": int(os.getenv('OPENOBSERVE_BATCHSIZE', 21)),
                "flushInterval": int(os.getenv('OPENOBSERVE_FLUSHINTERVAL', 0)),
                "meta":{
                    "appName": self.name,
                    "appVersion": self.version,
                    "jobId": self.jobId
                }                
            })

    def _levelToValue(self, level:LogLevel)->int:
        if level == "error": return 7
        if level == "warn": return 6
        if level == "info": return 5
        if level == "debug": return 4
        if level == "fine": return 3
        if level == "finer": return 2
        if level == "finest": return 1
        return 1

    def _log(self, level: LogLevel, args:tuple):
        message = " ".join([str(x) for x in args])

        levelV=self._levelToValue(level)
        minLevel = self.logLevel
        minObsLevel = self.oobsLogLevel
        minNostrLevel = self._levelToValue("info")

        if levelV >= minLevel:
            date = time.strftime("%Y-%m-%d %H:%M:%S")
            print(date+" ["+self.name+":"+self.version+"] "+(("("+self.jobId+")") if self.jobId else "")+": "+level+" : "+message)

        if self.oobsLogger and levelV >= minObsLevel:
            self.oobsLogger.log(level, message)
        
        if self.runnerLogger and levelV >= minNostrLevel:
            self.runnerLogger(message)


    def log(self, *args):
        self._log("debug", args)
    
    def info(self, *args):
        self._log("info", args)
    
    def warn(self, *args):
        self._log("warn",   args)
    
    def error(self, *args):
        self._log("error", args)
        traceback.print_exc()

    def debug(self, *args):
        self._log("debug", args)
    
    def fine(self, *args):
        self._log("fine",  args)
    
    def finer(self, *args):
        self._log("finer",  args)

    def finest(self, *args):
        self._log("finest", args)

    def close(self):
        if self.oobsLogger:
            self.oobsLogger.close()

Methods

def close(self)
Expand source code
def close(self):
    if self.oobsLogger:
        self.oobsLogger.close()
def debug(self, *args)
Expand source code
def debug(self, *args):
    self._log("debug", args)
def error(self, *args)
Expand source code
def error(self, *args):
    self._log("error", args)
    traceback.print_exc()
def fine(self, *args)
Expand source code
def fine(self, *args):
    self._log("fine",  args)
def finer(self, *args)
Expand source code
def finer(self, *args):
    self._log("finer",  args)
def finest(self, *args)
Expand source code
def finest(self, *args):
    self._log("finest", args)
def info(self, *args)
Expand source code
def info(self, *args):
    self._log("info", args)
def log(self, *args)
Expand source code
def log(self, *args):
    self._log("debug", args)
def warn(self, *args)
Expand source code
def warn(self, *args):
    self._log("warn",   args)
class OpenObserveLogger (options: dict)

A logger for OpenObserve that sends logs in batches.

Expand source code
class OpenObserveLogger:
    """
    A logger for OpenObserve that sends logs in batches.
    """
    def __init__(self, options:dict):
        self.options = options        
        self.batchSize= self.options["batchSize"]
        self.flushInterval = self.options["flushInterval"]
        if not self.flushInterval:
            self.flushInterval = 5000
        if not self.batchSize:
            self.batchSize = 21        
        self.buffer = queue.Queue()
        self.wait = Condition()
        self.flushThread = concurrent.futures.ThreadPoolExecutor(max_workers=3)
        self.flushThread.submit(self.flushLoop)
        

    def batchReady(self):
        with self.wait:
            self.wait.notify_all()

    def log(self, level:LogLevel, message:str, timestamp:int=None):
        """
        Log a message with a specific level.

        Args:
            level (LogLevel): The level of the log.
            message (str): The message to log.
            timestamp (int): The timestamp of the log. Defaults to the current time.
        """
        log_entry = {
            'level': level,
            '_timestamp': timestamp or int(time.time()*1000),
            'log': message
        }
        meta=self.options["meta"] if "meta" in self.options else {}
        for key in meta:
            log_entry[key]=meta[key]

        self.buffer.put(log_entry)
        if self.buffer.qsize() >= self.batchSize:
            self.flushThread.submit(self.batchReady)

    def _close(self):
        while not self.buffer.empty():
            with self.wait:
                self.wait.notify_all()
            time.sleep(0.1)
        self.flushThread.shutdown()
        
    def close(self):
        """
        Immediately flush all the logs to OpenObserve and shutdown the logger.
        """
        self.flushThread.submit(self._close)        
            
        
    def _flushToOpenObserve(self, batch):
        if len(batch) == 0:
            return
        try:
            url = self.options["baseUrl"]+"/api/"+self.options["org"]+"/"+self.options["stream"]+"/_json"   
            basicAuth = self.options["auth"]
            if not isinstance(basicAuth, str):
                if "username" in basicAuth and "password" in basicAuth:
                    basicAuth = basicAuth["username"]+":"+basicAuth["password"]
                    basicAuth = base64.b64encode(basicAuth.encode()).decode()
            headers = {
                'Content-Type': 'application/json',
                "Authorization": "Basic "+basicAuth if basicAuth else None
            }
            res = requests.post(url, headers=headers, json=batch)
            if res.status_code != 200:
                print("Error flushing log "+str(res.status_code))
        except Exception as e:
            print("Error flushing log "+str(e))


    def flushLoop(self):
        while True:
            with self.wait:
                self.wait.wait(self.flushInterval/1000)
            batch = []
            while not self.buffer.empty() and len(batch) < self.batchSize:
                try:
                    batch.append(self.buffer.get(block=False))
                except queue.Empty:
                    break         
            self._flushToOpenObserve(batch)

Methods

def batchReady(self)
Expand source code
def batchReady(self):
    with self.wait:
        self.wait.notify_all()
def close(self)

Immediately flush all the logs to OpenObserve and shutdown the logger.

Expand source code
def close(self):
    """
    Immediately flush all the logs to OpenObserve and shutdown the logger.
    """
    self.flushThread.submit(self._close)        
def flushLoop(self)
Expand source code
def flushLoop(self):
    while True:
        with self.wait:
            self.wait.wait(self.flushInterval/1000)
        batch = []
        while not self.buffer.empty() and len(batch) < self.batchSize:
            try:
                batch.append(self.buffer.get(block=False))
            except queue.Empty:
                break         
        self._flushToOpenObserve(batch)
def log(self, level: Literal['error', 'warn', 'info', 'debug', 'fine', 'finer', 'finest'], message: str, timestamp: int = None)

Log a message with a specific level.

Args

level : LogLevel
The level of the log.
message : str
The message to log.
timestamp : int
The timestamp of the log. Defaults to the current time.
Expand source code
def log(self, level:LogLevel, message:str, timestamp:int=None):
    """
    Log a message with a specific level.

    Args:
        level (LogLevel): The level of the log.
        message (str): The message to log.
        timestamp (int): The timestamp of the log. Defaults to the current time.
    """
    log_entry = {
        'level': level,
        '_timestamp': timestamp or int(time.time()*1000),
        'log': message
    }
    meta=self.options["meta"] if "meta" in self.options else {}
    for key in meta:
        log_entry[key]=meta[key]

    self.buffer.put(log_entry)
    if self.buffer.qsize() >= self.batchSize:
        self.flushThread.submit(self.batchReady)