303 lines
10 KiB
Python
303 lines
10 KiB
Python
# python3 -m pip install --user virtualenv
|
|
# python3 -m venv venv
|
|
# source venv/bin/activate
|
|
# pip install langgraph langchain langchain-community langchain-ollama
|
|
# source venv/bin/activate
|
|
|
|
|
|
from datetime import timedelta
|
|
from typing import TypedDict, List
|
|
from urllib import response
|
|
#from asyncio import tools
|
|
from langgraph.graph import START, StateGraph, END
|
|
from langchain_core.prompts import ChatPromptTemplate
|
|
from langchain_core.messages import HumanMessage, SystemMessage
|
|
from langchain_core.tools import tool
|
|
from langgraph.prebuilt import ToolNode
|
|
import requests, json, re, string
|
|
from html.parser import HTMLParser
|
|
from langchain_ollama import ChatOllama
|
|
import operator
|
|
from typing_extensions import TypedDict, Annotated
|
|
|
|
class HTMLCrawler(HTMLParser):
    """Collect the visible text of an HTML document.

    Feed markup through ``feed()``; every kept text fragment is appended to
    ``crawled_text`` followed by a newline. Character data found inside
    ``<script>`` and ``<style>`` elements is discarded.
    """

    # Elements whose character data is code/styling rather than page text.
    _IGNORED_TAGS = ('script', 'style')
    # Fragments starting with these characters look like embedded JSON.
    _JUNK_PREFIXES = ('{', '[')

    def __init__(self):
        super().__init__()
        self.crawled_text = ''
        self.skip_content = False  # True while inside an ignored element

    def handle_starttag(self, tag, attrs):
        """Stop collecting text when a script/style element opens."""
        if tag in self._IGNORED_TAGS:
            self.skip_content = True

    def handle_endtag(self, tag):
        """Resume collecting text when a script/style element closes."""
        if tag in self._IGNORED_TAGS:
            self.skip_content = False

    def handle_data(self, data):
        """Append one cleaned text fragment, unless it is filtered out."""
        if self.skip_content:
            return

        fragment = data.strip()
        # Fewer than two characters (which includes the empty string) is
        # treated as noise; JSON-looking fragments are dropped as well.
        if len(fragment) < 2 or fragment.startswith(self._JUNK_PREFIXES):
            return

        self.crawled_text += fragment + '\n'
|
|
|
|
|
|
|
|
# Configuration
# Each setting can be overridden with an environment variable of the same
# name; the fallbacks are the values that used to be hard-coded here.
import os

LLM_BASE_URL = os.environ.get("LLM_BASE_URL", "http://192.168.50.215:11434")
MODEL_NAME = os.environ.get("MODEL_NAME", "qwen3:8b")
#MODEL_NAME = "gpt-oss:20b"
LINKDING_API_URL = os.environ.get("LINKDING_API_URL", "https://linkding.hal.se/api/bookmarks/")
# NOTE(review): this API token is committed in the source file. It should be
# rotated and supplied only through the LINKDING_API_TOKEN environment
# variable; the literal fallback is kept solely so existing runs keep working.
LINKDING_API_TOKEN = os.environ.get("LINKDING_API_TOKEN", "fa54dee2ccbcad80a0c6259bdbbed896581e1423")
|
|
|
|
|
|
# Module-wide chat model client, built from the configuration constants above.
llm = ChatOllama(
    base_url=LLM_BASE_URL,
    model=MODEL_NAME,
)
|
|
|
|
@tool
def todays_date() -> str:
    """Use this tool whenever you need to know today's date in ISO 8601 format."""
    # The docstring above is left unchanged on purpose: LangChain's @tool uses
    # it as the tool description presented to the model.
    from datetime import datetime, timezone

    # The trailing "Z" designates UTC, so the clock has to be read in UTC;
    # formatting local time with a "Z" suffix yields a wrong instant on any
    # machine whose time zone is not UTC.
    return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
|
|
|
|
@tool
def calculate_date(dat: str, days: int) -> str:
    """Use this tool whenever you need to calculate the date in ISO 8601 format given a date and the number of days to deduce from that date."""
    # The docstring above is the tool description given to the model and is
    # therefore reproduced verbatim.
    from datetime import datetime

    # One timestamp layout is used both to parse the input and to render the
    # result.
    iso_layout = "%Y-%m-%dT%H:%M:%SZ"
    shifted = datetime.strptime(dat, iso_layout) - timedelta(days=days)
    return shifted.strftime(iso_layout)
|
|
|
|
@tool
def crawl_homepage(url:str) -> str:
    """Use this tool whenever you need to read the content of a home page on the internet.

    Args:
        url: home page url to crawl

    Returns:
        Formatted text from homepage
    """
    html_parser = HTMLCrawler()
    try:
        # Without a timeout a single unresponsive site would block the whole
        # agent run indefinitely.
        page = requests.get(url.strip(), timeout=30)
        # Report HTTP errors instead of crawling the error page as if it were
        # the bookmarked content.
        page.raise_for_status()
        html_parser.feed(page.text)
        # close() flushes text the parser is still buffering, e.g. trailing
        # text that is not followed by another tag.
        html_parser.close()
    except requests.exceptions.RequestException as e:
        return f"Error occurred while fetching page: {e}"
    except Exception as e:
        # Best effort: any other failure is reported back to the caller as
        # text rather than raised.
        return f"Error occurred while parsing HTML: {e}"

    return html_parser.crawled_text
|
|
|
|
@tool
def fetch_bookmarks(date_added: str = "2026-03-01T00:00:00Z") -> str:
    """Use this tool whenever you need to fetch bookmarks.

    Args:
        date_added: The date from which to fetch bookmarks, in ISO 8601 format (e.g., "2026-04-01T00:00:00Z").

    Returns:
        Formatted bookmark with titles, descriptions, URL, date_added, tags and ID.
    """
    headers = {
        "Authorization": "Token " + LINKDING_API_TOKEN
    }

    try:
        # `params` lets requests URL-encode the timestamp; the timeout keeps
        # an unreachable server from hanging the agent.
        resp = requests.get(
            LINKDING_API_URL,
            headers=headers,
            params={"added_since": date_added},
            timeout=30,
        )
        resp.raise_for_status()
        data = resp.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        # Same convention as the other tools: report the failure as text.
        return f"Error occurred while fetching bookmarks: {e}"

    # The bookmarks are read from the 'results' key of the response. Checking
    # that list (rather than the enclosing dict, which is truthy even when
    # nothing matched) is what makes the "No bookmarks found" answer reachable.
    bookmarks = data.get('results') if isinstance(data, dict) else None
    if not bookmarks:
        return "No bookmarks found"

    formatted_results = ["Bookmarks:"]
    for bookmark in bookmarks:
        title = bookmark.get('title', 'No title')
        bookmark_id = bookmark.get('id', 'No ID')
        url = bookmark.get('url', 'No URL')
        added = bookmark.get('date_added', 'No date added')
        description = bookmark.get('description', 'No description')
        tag_names = bookmark.get('tag_names', 'No tags')

        formatted_results.append(
            f" {title}\n {description}\n {url}\n {added}\n {tag_names}\n {bookmark_id}\n"
        )

    return "\n\n".join(formatted_results)
|
|
|
|
@tool
def add_tag_to_bookmark(bookmark_id: int, tag: str) -> str:
    """Use this tool whenever you need to add a tag to a bookmark.

    Args:
        bookmark_id: The ID of the bookmark to which the tag should be added.
        tag: The tag to add to the bookmark.

    Returns:
        A message indicating whether the tag was successfully added or if an error occurred.
    """
    _url = f"{LINKDING_API_URL}{bookmark_id}/"
    _headers = {
        "Authorization": "Token " + LINKDING_API_TOKEN
    }

    try:
        # PATCHing `tag_names` overwrites the bookmark's tag list, so sending
        # only the new tag would drop every tag already set (including one
        # added by a previous call). Read the current tags first and send the
        # merged list.
        current = requests.get(_url, headers=_headers, timeout=30)
        current.raise_for_status()
        existing_tags = current.json().get("tag_names") or []

        if tag not in existing_tags:
            # `json=` serialises the payload and sets the Content-Type header.
            response = requests.patch(
                _url,
                headers=_headers,
                json={"tag_names": [*existing_tags, tag]},
                timeout=30,
            )
            response.raise_for_status()  # Raise an exception for HTTP errors

        return f"Tag '{tag}' successfully added to bookmark with ID {bookmark_id}."
    except (requests.exceptions.RequestException, ValueError) as e:
        return f"Error occurred while adding tag: {e}"
|
|
|
|
@tool
def write_to_file(filename: str, content: str) -> str:
    """Use this tool whenever you need to write content to a file.

    Args:
        filename: The name of the file to which the content should be written.
        content: The content to write to the file.

    Returns:
        A message indicating whether the content was successfully written or if an error occurred.
    """
    import os

    # open() does not expand "~"; the agent is instructed to write to
    # ~/bookmark_summaries.md, which would otherwise fail with
    # FileNotFoundError.
    path = os.path.expanduser(filename)

    # The file is opened in append mode, so terminate each entry with a
    # newline to keep successive writes from running together on one line.
    if content and not content.endswith("\n"):
        content += "\n"

    try:
        # Explicit UTF-8 so crawled text with non-ASCII characters is written
        # the same way regardless of the platform's default encoding.
        with open(path, 'a', encoding='utf-8') as f:
            f.write(content)
        return f"Content successfully written to {filename}."
    except (OSError, ValueError) as e:
        return f"Error occurred while writing to file: {e}"
|
|
|
|
# ----- Shared State -----
|
|
class AgentState(TypedDict):
    """State dictionary passed between the graph nodes."""

    # Conversation history. The operator.add metadata marks list
    # concatenation as the way a node's returned messages are combined with
    # the existing ones.
    messages: Annotated[list, operator.add]
|
|
|
|
# ----- Agent Nodes -----
|
|
def agent_node(state: AgentState):
    """Run one model turn: send the system prompt plus the accumulated
    messages to the tool-enabled LLM and return its reply for appending to
    the state.
    """
    toolbox = [add_tag_to_bookmark, fetch_bookmarks, crawl_homepage, todays_date, calculate_date, write_to_file]
    tool_aware_llm = llm.bind_tools(toolbox)

    # NOTE(review): these instructions ask for the date 28 days ago, while the
    # human prompt sent at module level asks for the last 14 days -- confirm
    # which window is intended.
    instructions = """
You are a bookmark processing agent. You have these tools:
1. **todays_date**: Get today's date
2. **calculate_date**: Calculate a past date
3. **fetch_bookmarks**: Get bookmarks added since a date
4. **crawl_homepage**: Read website content
5. **write_to_file**: Write content to ~/bookmark_summaries.md
6. **add_tag_to_bookmark**: Add tags to bookmarks

YOUR TASK - FOLLOW THIS EXACTLY:

PHASE 1: Get bookmarks
- Call todays_date to get current date
- Call calculate_date to get the date 28 days ago
- Call fetch_bookmarks with that date to get all bookmarks
If No bookmarks found, stop here. Otherwise, move to PHASE 2.

PHASE 2: Process EACH bookmark (do NOT skip any):
For each bookmark from fetch_bookmarks:
Step A: Call crawl_homepage with the bookmark URL
Step B: IMMEDIATELY call write_to_file to write: [URL] | [DESCRIPTION] | [CRAWLED CONTENT SUMMARY OF MAX 100 WORDS]
Step C: IMMEDIATELY call add_tag_to_bookmark with the bookmark ID and 1-2 relevant tags
Step D: ONLY THEN move to the next bookmark

CRITICAL RULES:
- NEVER respond with text - ONLY call tools
- Process ALL bookmarks before finishing
- For each bookmark, MUST call: crawl_homepage, write_to_file, add_tag_to_bookmark (IN THAT ORDER)
- Do not stop until all bookmarks have all three tools called
"""

    # The system prompt always goes first, followed by the history so far.
    conversation = [SystemMessage(instructions)] + state['messages']
    reply = tool_aware_llm.invoke(conversation)

    # Log what the model decided to do on this turn.
    requested_calls = getattr(reply, 'tool_calls', None)
    if not requested_calls:
        print("[AGENT] Responding...")
    else:
        for call in requested_calls:
            print(f"[AGENT] called Tool {call.get('name', '?')} with args {call.get('args', '?')}")

    return {'messages': [reply]}
|
|
|
|
|
|
def should_continue(state: AgentState):
    """Pick the next graph step from the most recent message.

    Returns "tools" when that message carries tool calls, otherwise END.
    """
    pending_calls = getattr(state['messages'][-1], 'tool_calls', None)
    return "tools" if pending_calls else END
|
|
|
|
# =============================================================================
|
|
# Graph
|
|
# =============================================================================
|
|
def create_agent():
    """Assemble and compile the agent graph.

    The graph has two nodes: "agent" (the model turn) and "tools" (tool
    execution). It starts at "agent"; should_continue routes from there to
    either "tools" or END, and "tools" always hands control back to "agent".
    """
    workflow = StateGraph(AgentState)

    tool_executor = ToolNode([add_tag_to_bookmark, fetch_bookmarks, crawl_homepage, todays_date, calculate_date, write_to_file])
    workflow.add_node("agent", agent_node)
    workflow.add_node("tools", tool_executor)

    workflow.set_entry_point("agent")
    workflow.add_conditional_edges("agent", should_continue, ["tools", END])
    workflow.add_edge("tools", "agent")

    return workflow.compile()
|
|
|
|
agent = create_agent()

# NOTE(review): this request says 14 days while the system prompt in
# agent_node instructs the model to go back 28 days -- confirm which window
# is intended.
human_prompt = HumanMessage("Process all bookmarks from the last 14 days: fetch them, summarize their content, write summaries to a file, and add relevant tags.")

# Every agent -> tools -> agent round trip consumes graph steps, and each
# bookmark needs several tool calls. LangGraph's default recursion limit (25)
# would abort the run with GraphRecursionError after only a few bookmarks, so
# raise it explicitly.
AGENT_RECURSION_LIMIT = 500

result = agent.invoke(
    {'messages': [human_prompt]},
    config={"recursion_limit": AGENT_RECURSION_LIMIT},
)
|
|
|
|
|
|
#print(result['messages'])
|
|
#print(result['messages'][-1].content)
|
|
|
|
|
|
"""
|
|
result = fetch_bookmarks.invoke({})
|
|
print(result)
|
|
|
|
|
|
result = fetch_bookmarks.invoke({'date_added': "2026-04-09T20:26:31Z"})
|
|
print(result)
|
|
|
|
|
|
result = crawl_homepage.invoke({'url': "http://example.com"})
|
|
print(result)
|
|
|
|
result = todays_date.invoke({})
|
|
print(result)
|
|
|
|
#result = add_tag_to_bookmark.invoke({'bookmark_id': 123, 'tag': 'newtag'})
|
|
#print(result)
|
|
|
|
#result = add_tag_to_bookmark.invoke({'bookmark_id': 4, 'tag': 'another_tag'})
|
|
#print(result)
|
|
|
|
result = calculate_date.invoke({'dat': "2026-03-01T00:00:00Z", 'days': 3})
|
|
print(result)
|
|
|
|
result = write_to_file.invoke({'filename': 'test.txt', 'content': 'This is a test.'})
|
|
print(result)
|
|
""" |