Files
agents/log_reader_agent.py
2026-03-21 13:01:52 +00:00

114 lines
3.4 KiB
Python

# python3 -m pip install --user virtualenv
# python3 -m venv myenv
# source myenv/bin/activate
# pip install langgraph langchain langchain-community langchain-ollama
from typing import TypedDict, List
#from asyncio import tools
from langgraph.graph import StateGraph, END
from langchain_core.prompts import ChatPromptTemplate
from langchain_text_splitters import RecursiveCharacterTextSplitter
import subprocess
import shutil
try:
from langchain_ollama import ChatOllama
except ImportError:
from fake_ChatOllama import ChatOllama # type: ignore
# ----- Shared State -----
class AgentState(TypedDict):
service: str
log: List[str] # A list of findings
response: str # The final output
def reader_node(state: AgentState):
service = state["service"]
print(f"Reader is looking up: {service} logs...")
if shutil.which('docker'):
cmd = ['docker', 'logs', service]
else:
print('docker not installed, using dummy log')
cmd = ['cat', 'docker_log_pihole.txt']
try:
result = subprocess.run(cmd, capture_output=True, shell=False, check=True).stdout.decode('utf-8')
except Exception as e:
print("Error during log retrieval:", e)
result = f"Could not read log for service: {service}:\n {e}"
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=500,
chunk_overlap=80)
try:
chunks = text_splitter.split_text(result)
results = chunks
except Exception as e:
print("Error during log splitting:", e)
results = f"Error during log splitting for service: {service}:\n {e}"
# Only return the keys you want to update
return {"log": state.get("log", []) + [results]}
def analyzer_node(state: AgentState):
print("start analyzing log...")
service = state["service"]
data = state["log"][-1] if state["log"] else ""
llm = ChatOllama(model="qwen3:8b", base_url="http://localhost:11434")
prompt = ChatPromptTemplate.from_template(
"""You are a helpful AI assistant.
Provide a concise summary of the service health from {service} docker log, including any errors, warnings, or notable events.
You must answer based *only* on the supplied log. If the log is empty or do not contain anythin indicating service-health, say 'I could not find
any information in the supplied information.'
Log:
{data}
"""
)
chain = prompt | llm
response = chain.invoke({"service": service, "data": data})
print("Analyzing complete.")
return {"response": response.content}
# ----- Build the LangGraph -----
workflow = StateGraph(AgentState)
workflow.add_node("Reader", reader_node)
workflow.add_node("Analyzer", analyzer_node)
# Flow: Start -> Reader -> Writer -> END
workflow.set_entry_point("Reader")
workflow.add_edge("Reader", "Analyzer")
workflow.add_edge("Analyzer", END)
app = workflow.compile()
if __name__ == "__main__":
import sys
import my_tools
#print("Starting the Multi-Agent System...\n")
if len(sys.argv) > 1:
service = sys.argv[1]
else:
exit("Please provide the service name as a command-line argument.")
inputs: AgentState = {
"service": service,
"log": [],
"response": "",
}
result = app.invoke(inputs)
#print(result["response"])
my_tools.tool_send_mail('johan.p1sson@gmail.com', service + ' weekly log analysis result', result["response"])