241 lines
7.3 KiB
Python
241 lines
7.3 KiB
Python
# python3 -m pip install --user virtualenv
|
|
# python3 -m venv venv
|
|
# source venv/bin/activate
|
|
# pip install langgraph langchain langchain-community langchain-ollama
|
|
# source venv/bin/activate
|
|
|
|
|
|
from html.parser import HTMLParser
import json
import operator
import os
import re
import string
from typing import TypedDict, List
from urllib import response
#from asyncio import tools

from langchain_core.messages import HumanMessage, SystemMessage
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.tools import tool
from langchain_ollama import ChatOllama
from langgraph.graph import START, StateGraph, END
from langgraph.prebuilt import ToolNode
import requests
from typing_extensions import TypedDict, Annotated
|
|
|
|
class HTMLCrawler(HTMLParser):
    """HTML parser that gathers a page's text, one fragment per line.

    Text encountered between a ``<script>``/``<style>`` start tag and the
    matching end tag is ignored. Everything that is kept is accumulated in
    ``crawled_text``.
    """

    # Tags whose contents are never collected.
    _IGNORED_TAGS = ('script', 'style')

    def __init__(self):
        super().__init__()
        self.crawled_text = ''
        self.skip_content = False  # True while inside <script> or <style>

    def handle_starttag(self, tag, attrs):
        """Stop collecting text when a script or style element opens."""
        if tag in self._IGNORED_TAGS:
            self.skip_content = True

    def handle_endtag(self, tag):
        """Resume collecting text when a script or style element closes."""
        if tag in self._IGNORED_TAGS:
            self.skip_content = False

    def handle_data(self, data):
        """Append a text fragment to ``crawled_text`` unless it is filtered out."""
        if self.skip_content:
            return

        fragment = data.strip()

        # Drop empty and single-character fragments.
        if len(fragment) < 2:
            return

        # Drop JSON-looking fragments (objects and arrays).
        if fragment.startswith(('{', '[')):
            return

        self.crawled_text = f"{self.crawled_text}{fragment}\n"
|
|
|
|
|
|
|
|
# Configuration
# Every setting can be overridden with an environment variable of the same
# name; the defaults are the values this script has always used.
LLM_BASE_URL = os.environ.get("LLM_BASE_URL", "http://192.168.50.215:11434")
MODEL_NAME = os.environ.get("MODEL_NAME", "qwen3:8b")
LINKDING_API_URL = os.environ.get("LINKDING_API_URL", "https://linkding.hal.se/api/bookmarks/")
# NOTE(review): SECURITY - a live-looking API token is committed to source.
# Rotate it, supply the new one via the LINKDING_API_TOKEN environment
# variable, and then remove this default.
LINKDING_API_TOKEN = os.environ.get(
    "LINKDING_API_TOKEN", "fa54dee2ccbcad80a0c6259bdbbed896581e1423"
)

# Chat model shared by the agent node.
llm = ChatOllama(model=MODEL_NAME, base_url=LLM_BASE_URL)
|
|
|
|
@tool
def todays_date() -> str:
    """Use this tool whenever you need to know today's date in ISO 8601 format (UTC)."""
    from datetime import datetime, timezone

    # The trailing "Z" designates UTC, so the clock must be read in UTC;
    # a naive local-time reading would be mislabeled.
    return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
|
|
|
|
@tool
def crawl_homepage(url: str) -> str:
    """Use this tool whenever you need to read the content of a home page on the internet.

    Args:
        url: home page url to crawl

    Returns:
        Formatted text from homepage, or an error message if the page
        could not be fetched or parsed.
    """
    html_parser = HTMLCrawler()

    try:
        # A timeout keeps an unresponsive host from hanging the agent.
        page = requests.get(url.strip(), timeout=30)
        # Report HTTP errors instead of returning the error page as content.
        page.raise_for_status()
    except requests.exceptions.RequestException as e:
        return f"Error occurred while fetching page: {e}"

    try:
        html_parser.feed(page.text)
        # Flush any text the parser is still buffering.
        html_parser.close()
    except Exception as e:
        # Tool boundary: hand the failure back to the model as text rather
        # than crashing the graph run.
        return f"Error occurred while parsing HTML: {e}"

    return html_parser.crawled_text
|
|
|
|
@tool
def fetch_bookmarks(date_added: str = "2026-03-01T00:00:00Z") -> str:
    """Use this tool whenever you need to fetch bookmarks.

    Args:
        date_added: The date from which to fetch bookmarks, in ISO 8601 format (e.g., "2026-04-01T00:00:00Z").

    Returns:
        Formatted bookmark with titles, descriptions, URL, date_added, tags and ID.
        An error message is returned if the request fails.
    """
    _headers = {"Authorization": "Token " + LINKDING_API_TOKEN}

    try:
        # Let requests build and URL-encode the query string.
        response = requests.get(
            LINKDING_API_URL,
            headers=_headers,
            params={"added_since": date_added},
            timeout=30,
        )
        response.raise_for_status()
        data = response.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        # ValueError covers a non-JSON response body.
        return f"Error occurred while fetching bookmarks: {e}"

    # NOTE(review): only the 'results' of this one response are used; if the
    # API paginates, later pages are not followed - confirm whether needed.
    bookmarks = data.get('results') if isinstance(data, dict) else None
    if not bookmarks:
        return "No bookmarks found"

    formatted_results = ["Bookmarks:"]
    for bookmark in bookmarks:
        title = bookmark.get('title', 'No title')
        bookmark_id = bookmark.get('id', 'No ID')
        url = bookmark.get('url', 'No URL')
        added = bookmark.get('date_added', 'No date added')
        description = bookmark.get('description', 'No description')
        tag_names = bookmark.get('tag_names', 'No tags')

        formatted_results.append(
            f" {title}\n {description}\n {url}\n {added}\n {tag_names}\n {bookmark_id}\n"
        )

    return "\n\n".join(formatted_results)
|
|
|
|
@tool
def add_tag_to_bookmark(bookmark_id: int, tag: str) -> str:
    """Use this tool whenever you need to add a tag to a bookmark.

    Existing tags on the bookmark are kept; the new tag is added to them.

    Args:
        bookmark_id: The ID of the bookmark to which the tag should be added.
        tag: The tag to add to the bookmark.

    Returns:
        A message indicating whether the tag was successfully added or if an error occurred.
    """
    _url = f"{LINKDING_API_URL}{bookmark_id}/"
    _headers = {"Authorization": "Token " + LINKDING_API_TOKEN}

    try:
        # Sending only [tag] in the PATCH would replace the bookmark's whole
        # tag list, so read the current tags first and merge the new one in.
        current = requests.get(_url, headers=_headers, timeout=30)
        current.raise_for_status()
        bookmark = current.json()
        existing = bookmark.get('tag_names') if isinstance(bookmark, dict) else None
        tag_names = list(existing or [])

        if tag not in tag_names:
            tag_names.append(tag)
            # json= serializes the payload and sets the Content-Type header.
            response = requests.patch(
                _url,
                headers=_headers,
                json={"tag_names": tag_names},
                timeout=30,
            )
            response.raise_for_status()  # Raise an exception for HTTP errors
    except (requests.exceptions.RequestException, ValueError) as e:
        # ValueError covers a non-JSON body when reading the current tags.
        return f"Error occurred while adding tag: {e}"

    return f"Tag '{tag}' successfully added to bookmark with ID {bookmark_id}."
|
|
|
|
|
|
|
|
# ----- Shared State -----
|
|
class AgentState(TypedDict):
    """State dictionary passed between the nodes of the agent graph."""

    # Conversation messages. The operator.add annotation is the reducer
    # metadata: updates returned by nodes are concatenated onto this list.
    messages: Annotated[list, operator.add]
|
|
|
|
# ----- Agent Nodes -----
|
|
def agent_node(state: AgentState):
    """This is the main agent node that processes messages and decides when to call tools.

    The model is given the system prompt followed by the conversation held in
    ``state['messages']``, so tool results produced by the "tools" node are
    visible on the next turn. When the state holds no messages yet, the
    conversation is seeded with the default question about recent bookmarks.

    Args:
        state: Current graph state; only ``messages`` is read.

    Returns:
        A state update whose ``messages`` list holds the model response,
        preceded by the seeded question on the first turn.
    """
    llm_with_tools = llm.bind_tools([add_tag_to_bookmark, fetch_bookmarks, crawl_homepage, todays_date])

    # The prompt names only tools that are actually bound above.
    system_prompt = SystemMessage("""
    You are an assistant that manages the user's Linkding bookmarks.
    INSTRUCTIONS:
    1. Use the **fetch_bookmarks** tool to read bookmarks; call **todays_date** first when the request depends on the current date
    2. Use the **crawl_homepage** tool when you need the content of a bookmarked page
    3. Use the **add_tag_to_bookmark** tool only when a tag should be added to a bookmark

    Base your answer on the tool results.
    """)

    history = list(state.get('messages') or [])

    # Messages this node contributes to the state on this turn.
    new_messages = []
    if not history:
        # First turn: seed the conversation and persist the question in the
        # state so later turns still carry it.
        new_messages.append(HumanMessage("What are the latest bookmarks added to my Linkding?"))

    response = llm_with_tools.invoke([system_prompt, *history, *new_messages])

    if hasattr(response, 'tool_calls') and response.tool_calls:
        for tc in response.tool_calls:
            print(f"[AGENT] called Tool {tc.get('name', '?')} with args {tc.get('args', '?')}")
    else:
        print("[AGENT] Responding...")

    new_messages.append(response)
    return {'messages': new_messages}
|
|
|
|
|
|
def should_continue(state: AgentState):
|
|
last = state['messages'][-1]
|
|
|
|
if hasattr(last, 'tool_calls') and last.tool_calls:
|
|
return "tools"
|
|
else:
|
|
return END
|
|
|
|
# =============================================================================
|
|
# Graph
|
|
# =============================================================================
|
|
def create_agent():
    """Assemble and compile the agent graph.

    The graph starts at the "agent" node. After each agent step,
    should_continue routes either to the "tools" node or to END, and the
    "tools" node always hands control back to "agent".
    """
    tool_node = ToolNode([add_tag_to_bookmark, fetch_bookmarks, crawl_homepage, todays_date])

    workflow = StateGraph(AgentState)
    workflow.add_node("agent", agent_node)
    workflow.add_node("tools", tool_node)
    workflow.set_entry_point("agent")
    workflow.add_conditional_edges("agent", should_continue, ["tools", END])
    workflow.add_edge("tools", "agent")

    return workflow.compile()
|
|
|
|
# Compiled graph, built at import time so the name `agent` stays available
# to anything that imports this module.
agent = create_agent()


if __name__ == "__main__":
    # Everything below performs network calls (and bookmark writes), so it
    # runs only when the file is executed as a script, not on import.

    # Start from an explicitly empty message history.
    result = agent.invoke({'messages': []})
    print(result['messages'])
    print(result['messages'][-1].content)

    # Manual smoke tests of the individual tools.
    result = fetch_bookmarks.invoke({})
    print(result)

    result = crawl_homepage.invoke({'url': "http://example.com"})
    print(result)

    result = todays_date.invoke({})
    print(result)

    # NOTE: these two calls modify bookmarks 123 and 4 on the live server.
    result = add_tag_to_bookmark.invoke({'bookmark_id': 123, 'tag': 'newtag'})
    print(result)

    result = add_tag_to_bookmark.invoke({'bookmark_id': 4, 'tag': 'another_tag'})
    print(result)