Agent
我们将爬虫整合一下,变成一个 LuoguCrawlerAgent 类,主要职责是爬取题目和题解。并顺便添加缓存功能,避免在测试阶段多次在爬取和解析上浪费时间。
创建一个 AnalysisAgent 类,用于接收题目数据,生成 prompt 喂给大模型,然后给出结构化输出。这是本项目的核心。
LuoguCrawlerAgent:爬虫策略的执行官
在上一篇文章中,我们探讨了爬取洛谷的“混合动力”策略(requests + bs4 + Crawl4ai)以及踩过的坑。今天,我们把这个策略封装成一个可重用、有状态的 LuoguCrawlerAgent 类。
这个 Agent 是整个爬虫的“大脑”和“指挥官”。它的核心职责是将之前零散的函数(如 search_for_solution_urls 和 extract_solution_content)编排成一个健壮的、自动化的流水线。
核心设计:指挥官与流水线
经过重构,LuoguCrawlerAgent 的职责非常清晰。它的公共入口只有一个:
async def run(self, problem_id: str, max_solutions: int = 3) -> Dict[str, Any]:
这个 run 方法本身不执行任何爬虫,它只做三件事:检查缓存、执行任务、保存缓存。
1. 缓存优先 (Caching)
这是 Agent 最重要的功能之一。run 方法会首先调用 _load_from_cache。
如果它在本地(例如 ./luogu_cache/P4137.json)找到了缓存文件,并且缓存的题解数量大于等于本次请求的 max_solutions,它会立刻返回数据,不执行任何爬虫。这极大地节省了时间和 LLM Token。当我们后续开发“分析 Agent”时,我们可以反复运行程序,而爬虫 Agent 会立即从本地返回数据,让我们能专注于调试 AI 的部分。
2. 爬取流水线 (Pipeline)
如果缓存未命中,run 方法会调用 _execute_crawl_pipeline 来执行实际的爬取工作。这个流水线就是基于我们昨天的策略。
- 并发启动: 使用
asyncio.gather同时启动两个任务:- 任务A (
_fetch_problem_details): 用Crawl4ai提取题目详情。 - 任务B (
_search_solution_urls): 用requests+bs4(在run_in_executor中运行)搜索题解链接列表。
- 任务A (
- 并发提取: 拿到题解 URL 列表后,再次使用
asyncio.gather并发爬取(_fetch_solution_content)max_solutions篇题解的详细内容。 - 汇总: 收集所有结果,打包成
final_output字典。
3. 保存结果 (Saving)
爬取流水线返回结果后,run 方法会调用 _save_to_cache,使用 aiofiles 异步地将 final_output 写入 JSON 文件,供下一次运行使用。
总结
通过这种方式,我们将复杂的爬取逻辑封装成了一个“黑盒”。主程序(main)不再关心爬虫的内部细节,它只需要知道:
# 只需要实例化,然后运行
agent = LuoguCrawlerAgent(crawler)
final_data = await agent.run(TEST_PROBLEM_ID, max_solutions=3)
LuoguCrawlerAgent 会自动处理缓存、并发和错误,交付一份干净的数据。
核心代码
class LuoguCrawlerAgent:
"""
负责洛谷题目和题解的爬取、提取和缓存。
"""
def __init__(self, crawler: AsyncWebCrawler, cache_dir: str = CACHE_DIR):
self.crawler = crawler
self.cache_dir = cache_dir
os.makedirs(self.cache_dir, exist_ok=True)
print(f"[CrawlerAgent] 初始化完成,缓存目录: {self.cache_dir}")
# --- 1. 公共主入口 (指挥官) ---
async def run(self, problem_id: str, max_solutions: int = 3) -> Dict[str, Any]:
"""
[主入口] 执行完整的爬取和提取流程。
"""
print(f"\n--- [CrawlerAgent] 启动 {problem_id} 任务 (max={max_solutions}) ---")
# 1. 尝试从缓存加载
cached_data = await self._load_from_cache(problem_id, max_solutions)
if cached_data:
print(f"--- [CrawlerAgent] {problem_id} 任务完成 (来自缓存) ---")
return cached_data
# 2. 缓存未命中,执行爬取流水线
print(f"[CrawlerAgent] CACHE MISS: {problem_id}。开始实时爬取...")
final_output = await self._execute_crawl_pipeline(problem_id, max_solutions)
# 3. 仅在爬取成功时保存到缓存
if "error" not in final_output.get("problem", {}):
await self._save_to_cache(problem_id, final_output)
else:
print(f"[CrawlerAgent] 题目爬取失败,不保存缓存。")
print(f"--- [CrawlerAgent] {problem_id} 任务完成 (实时爬取)。---")
return final_output
# --- 2. 缓存处理 (I/O) ---
async def _load_from_cache(self, problem_id: str, max_solutions: int) -> Optional[Dict[str, Any]]:
"""(职责1) 尝试从本地 JSON 加载并验证缓存"""
cache_file = os.path.join(self.cache_dir, f"{problem_id}.json")
try:
async with aiofiles.open(cache_file, 'r', encoding='utf-8') as f:
content = await f.read()
cached_data = json.loads(content)
# 验证缓存数据
if 'problem' not in cached_data or "error" in cached_data['problem']:
print(f"[CrawlerAgent] CACHE INVALID: {problem_id} 缓存损坏。")
return None
# 验证缓存数量
cached_solutions_count = len(cached_data.get("solutions", []))
if cached_solutions_count >= max_solutions:
print(f"[CrawlerAgent] CACHE HIT: {problem_id} (含 {cached_solutions_count} 篇题解) 满足需求。")
return cached_data
else:
print(f"[CrawlerAgent] CACHE PARTIAL: 缓存 {cached_solutions_count} 篇, 需求 {max_solutions}。")
return None
except FileNotFoundError:
return None # 缓存未命中,正常
except Exception as e:
print(f"[CrawlerAgent] CACHE ERROR: 加载 {cache_file} 失败 ({e})。")
return None # 缓存读取失败,当作未命中处理
async def _save_to_cache(self, problem_id: str, data: Dict[str, Any]):
"""(职责2) 异步保存结果到 JSON 缓存"""
cache_file = os.path.join(self.cache_dir, f"{problem_id}.json")
try:
async with aiofiles.open(cache_file, 'w', encoding='utf-8') as f:
content_to_save = json.dumps(data, indent=2, ensure_ascii=False)
await f.write(content_to_save)
print(f"[CrawlerAgent] CACHE SAVE: 成功保存 {problem_id} 到 {cache_file}")
except Exception as e:
print(f"[CrawlerAgent] CACHE SAVE FAILED: 写入 {cache_file} 失败: {e}")
# --- 3. 核心爬取流水线 (干活的) ---
async def _execute_crawl_pipeline(self, problem_id: str, max_solutions: int) -> Dict[str, Any]:
"""(职责3) 执行所有爬取、搜索和提取任务"""
# 3.1. 并行执行“爬题目”和“搜题解列表”
problem_task = self._fetch_problem_details(problem_id)
search_task = self._search_solution_urls(problem_id)
problem_result, solution_urls = await asyncio.gather(problem_task, search_task)
final_output = {
"problem": problem_result,
"solutions": []
}
# 如果题目爬取失败,则中止
if "error" in problem_result:
print(f"[CrawlerAgent] 严重错误:题目 {problem_id} 爬取失败。任务中止。")
return final_output
# 如果没有题解,也提前返回
if not solution_urls:
print(f"[CrawlerAgent] 警告:没有找到 {problem_id} 的题解链接。")
return final_output
# 3.2. 并行提取 N 篇题解
urls_to_fetch = solution_urls[:max_solutions]
print(f"[CrawlerAgent] 准备从 {len(urls_to_fetch)} 个链接中提取题解内容...")
solution_tasks = [self._fetch_solution_content(article) for article in urls_to_fetch]
solution_results = await asyncio.gather(*solution_tasks)
# 3.3. 清洗题解结果
successful_solutions = []
for i, res in enumerate(solution_results):
if "error" not in res:
successful_solutions.append(res)
else:
print(f"[CrawlerAgent] 提取 {urls_to_fetch[i]['url']} 失败: {res['error']}")
final_output["solutions"] = successful_solutions
print(f"[CrawlerAgent] 提取完成,成功 {len(successful_solutions)} / {len(urls_to_fetch)} 篇。")
return final_output
# --- 4. 子任务 ---
async def _fetch_problem_details(self, problem_id: str) -> Dict[str, Any]:
"""[子任务] 爬取并提取单个题目的详细信息。"""
url = f"https://www.luogu.com.cn/problem/{problem_id}"
print(f"[CrawlerAgent.Problem] 正在爬取题目: {url}")
config = CrawlerRunConfig(
cache_mode=CacheMode.ENABLED, # 题目页可以缓存
extraction_strategy=PROBLEM_STRATEGY,
delay_before_return_html=1
)
result = await self.crawler.arun(url, config=config)
if not (result.success and result.extracted_content):
return {"error": "crawl4ai 爬取题目失败", "details": result.error_message}
try:
data_list = json.loads(result.extracted_content)
if not data_list:
return {"error": "LLM 从题目页返回了空列表"}
problem_data = ProblemDetails.model_validate(data_list[0])
print(f"[CrawlerAgent.Problem] 题目 {problem_id} 提取成功。")
return problem_data.model_dump()
except json.JSONDecodeError:
return {"error": "LLM 返回了无效的 JSON", "raw": result.extracted_content}
except Exception as e:
return {"error": f"Pydantic 验证失败: {e}", "raw": result.extracted_content}
async def _search_solution_urls(self, problem_id: str) -> List[Dict[str, str]]:
"""[子任务] 异步执行同步的 URL 搜索。"""
loop = asyncio.get_running_loop()
# 在一个单独的线程中运行同步的 blocking IO
results = await loop.run_in_executor(
None, self._sync_search_solutions, problem_id
)
return results
def _sync_search_solutions(self, problem_id: str) -> List[Dict[str, str]]:
"""[子任务] 同步使用 requests 搜索题解 URL。"""
search_url = f"https://www.luogu.com.cn/article?keyword={problem_id}&page=1"
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
}
print(f"[CrawlerAgent.Search] 正在用 requests 搜索: {search_url} ...")
try:
response = requests.get(search_url, headers=headers, timeout=10)
soup = BeautifulSoup(response.text, 'html.parser')
article_links = soup.find_all('a', href=re.compile(r'^/article/[a-zA-Z0-9]{8}$'))
if not article_links:
print("[CrawlerAgent.Search] 失败:没有找到任何 /article/... 链接。")
return []
results = []
seen_urls = set()
for link in article_links:
href = link.get('href')
if href in seen_urls:
continue
title = link.get_text(strip=True)
if href and (problem_id in title or problem_id.lower() in title):
results.append({"title": title, "url": href})
seen_urls.add(href)
print(f"[CrawlerAgent.Search] 成功:找到 {len(results)} 篇相关题解链接。")
return results
except requests.RequestException as e:
print(f"[CrawlerAgent.Search] 异常: {e}")
return []
async def _fetch_solution_content(self, article: Dict[str, str]) -> Dict[str, Any]:
"""[子任务] 爬取并提取单篇题解的详细内容。"""
full_url = f"https://www.luogu.com.cn{article['url']}"
print(f"[CrawlerAgent.Solution] 正在提取: {full_url} ...")
config = CrawlerRunConfig(
cache_mode=CacheMode.BYPASS, # 题解页不应缓存
extraction_strategy=SOLUTION_STRATEGY,
delay_before_return_html=1
)
result = await self.crawler.arun(full_url, config=config)
if not (result.success and result.extracted_content):
return {"error": "crawl4ai 爬取题解失败"}
try:
data_list = json.loads(result.extracted_content)
if not data_list:
return {"error": "LLM 从题解页返回了空列表"}
raw_data = data_list[0]
clean_data = SolutionDetails.model_validate(raw_data)
if not clean_data.solution_text:
return {"error": "LLM 返回了空内容"}
# 附加上 title 和 url
final_data = clean_data.model_dump()
final_data['title'] = article['title']
final_data['url'] = article['url']
return final_data
except json.JSONDecodeError:
return {"error": "LLM 返回了无效的 JSON", "raw": result.extracted_content}
except Exception as e:
return {"error": f"Pydantic 验证失败: {e}", "raw": result.extracted_content}
AnalysisAgent
调用 chatglm 的过程中我遇到了较多的困难,比如一开始我希望直接上 langchain 架构,但是遇到了
with_structured_output不支持的问题。后面就尝试从 zhipu 的官方 api 开始调试,对着官方文档,费了点功夫调通了。还有个问题是我一开始调 api 非常的慢,一度让我以为是网络问题或者单纯卡住了,后面发现是默认打开了思考模式 emmmmm....
AnalysisAgent 是我们流水线的“大脑”。LuoguCrawlerAgent 负责“搜集食材”(爬取题目和题解),而 AnalysisAgent 则负责将这些原始、杂乱的“食材”加工成一道“米其林大餐”(一份高质量、综合的分析报告)。
它的核心职责是调用大模型(ZhipuAI),并健壮地处理模型的输出。
核心设计:提示、解析与双重保存
与爬虫 Agent 类似,AnalysisAgent 也被重构成了一个清晰的流水线,由 run 方法统一指挥。
1. 构建提示 (_build_prompts)
这是最关键的一步。我们动态构建两条 Prompt:
* System Prompt: 告诉 LLM 它是一个“算法竞赛金牌教练”,并把 ProblemAnalysis Pydantic 模型的 Schema(model_fields)喂给它,命令它必须按这个格式返回。
* User Prompt: 把 LuoguCrawlerAgent 爬来的 problem_data 和 solutions_data 两个大 JSON 块塞进去,作为 LLM 分析的“原材料”。
2. LLM 调用 (_stream_llm_response)
这里我们直接使用了 ZhipuAI 的官方 API。如引言所说,一个关键优化是设置 thinking={"type": "disabled"},这能显著加快响应速度。另外开启流式输出,让我们能实时看到“打字”过程。
3. “双保险”解析 (_parse_to_dict)
LLM 的输出并不总是严格的 JSON。有时它返回的是标准 JSON(双引号),有时它返回的是 Python 字典。
_parse_to_dict 方法实现了一个“双保险”:
1. 首先,用 json.loads() 尝试按标准 JSON 解析。
2. 如果失败了(JSONDecodeError),它不会立即放弃,而是会用 ast.literal_eval() 再次尝试。ast.literal_eval 可以安全地将 Python 字典格式的字符串解析成 Python 对象,完美解决了 LLM“不守规矩”的问题。
4. 双重保存 (_save_..._result)
在成功解析出 Python 字典后,Agent 会同时保存两种格式的文件:
1. _analysis.json: 保存为标准 JSON 文件,用于后续的机器读取或程序调用。
2. _analysis.md: 提取字典中的 detailed_solution 和 sample_code 等字段,放到markdown中,这个主要是给我看的。
通过这个流程,AnalysisAgent 确保了无论爬虫数据多乱、LLM 回答多“随意”,我们最终都能得到一份结构化、易于阅读的高质量分析报告。
核心代码
class AnalysisAgent:
"""
负责调用 LLM 分析问题数据,并处理和保存结果。
会同时保存 .json (机器可读) 和 .md (人类可读) 两种格式。
"""
def __init__(self):
self.llm = ZhipuAiClient(ZHIPU_API_KEY, ZHIPU_BASE_URL)
self.schema = ProblemAnalysis
self.analysis_cache_dir = ANALYSIS_CACHE_DIR
os.makedirs(self.analysis_cache_dir, exist_ok=True)
print(f"[Agent] 初始化完成 (ZhipuAI),结果保存至 {self.analysis_cache_dir}")
def _build_prompts(self, problem_data: Dict[str, Any], solutions_data: List[Dict[str, Any]]) -> Tuple[str, str]:
"""构建系统和用户提示词"""
system_prompt = get_system_prompt(self.schema.model_fields)
user_prompt = get_user_prompt(problem_data, solutions_data)
return system_prompt, user_prompt
def _stream_llm_response(self, system_prompt: str, user_prompt: str) -> str:
"""调用 LLM 并流式获取完整响应"""
print("[Agent] 正在流式接收 LLM 响应...")
response = self.llm.chat.completions.create(
model="glm-4.6",
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt}
],
thinking={"type": "disabled"},
stream=True
)
full_content = ""
for chunk in response:
if not chunk.choices: continue
delta = chunk.choices[0].delta
if hasattr(delta, 'content') and delta.content:
full_content += delta.content
print(delta.content, end="", flush=True)
if chunk.choices[0].finish_reason:
print(f"\n\n[Agent] 完成原因: {chunk.choices[0].finish_reason}")
if hasattr(chunk, 'usage') and chunk.usage:
print(f"[Agent] 令牌使用: 输入 {chunk.usage.prompt_tokens}, 输出 {chunk.usage.completion_tokens}")
print("\n[Agent] LLM 响应接收完毕。")
return full_content
def _extract_json_string(self, raw_content: str) -> str:
"""从 LLM 的原始输出中提取 { ... } 块"""
match = re.search(r'\{.*\}', raw_content, re.DOTALL)
if match:
print("[Agent] 已从 LLM 响应中提取 { } 块。")
return match.group(0)
print("[Agent] 无法提取 { } 块,将尝试解析完整响应。")
return raw_content
def _parse_to_dict(self, s: str) -> Optional[Dict[str, Any]]:
"""使用双保险 (json / ast) 解析字符串为字典"""
try:
parsed = json.loads(s)
print("[Agent] 成功解析为 JSON。")
return parsed
except json.JSONDecodeError:
print("[Agent] JSON 解析失败,尝试 ast.literal_eval (Python 字典)...")
try:
parsed = ast.literal_eval(s)
print("[Agent] 成功解析为 Python 字面量。")
return parsed
except Exception as e:
print(f"[Agent] 所有解析均失败: {e}")
return None
def _save_json_result(self, problem_id: str, data: Dict[str, Any]):
"""将解析成功的字典保存为美化的 .json 文件"""
save_path = os.path.join(self.analysis_cache_dir, f"{problem_id}_analysis.json")
try:
with open(save_path, 'w', encoding='utf-8') as f:
json.dump(data, f, indent=2, ensure_ascii=False)
print(f"[Agent] 分析结果 (JSON) 已保存到: {save_path}")
except Exception as e:
print(f"[Agent] 保存 .json 文件时出错: {e}")
def _save_markdown_result(self, problem_id: str, data: Dict[str, Any]):
"""[!] 将解析结果中的关键字段提取并保存为 .md 文件"""
save_path = os.path.join(self.analysis_cache_dir, f"{problem_id}_analysis.md")
# 1. 从字典中提取内容
solution_text = data.get('detailed_solution', 'LLM 未提供详细题解。')
sample_code = data.get('sample_code', 'LLM 未提供示例代码。')
keywords = data.get('keywords', [])
# 我们的 Pydantic 描述里写了是 C++
code_lang = "cpp"
# 2. 构建 Markdown 字符串
md_content = f"# {problem_id} 详细题解\n\n"
md_content += f"{solution_text}\n\n"
md_content += f"# 参考代码 ({code_lang})\n\n"
md_content += f"```{code_lang}\n"
md_content += f"{sample_code}\n"
md_content += "```\n\n"
md_content += "# 核心知识点\n\n"
if keywords:
for keyword in keywords:
md_content += f"* {keyword}\n"
else:
md_content += "无\n"
# 3. 保存文件
try:
with open(save_path, 'w', encoding='utf-8') as f:
f.write(md_content)
print(f"[Agent] 分析结果 (Markdown) 已保存到: {save_path}")
except Exception as e:
print(f"[Agent] 保存 .md 文件时出错: {e}")
def _save_error(self, problem_id: str, raw_content: str):
"""保存原始的、无法解析的 LLM 响应"""
error_path = os.path.join(self.analysis_cache_dir, f"{problem_id}_error.txt")
try:
with open(error_path, 'w', encoding='utf-8') as f:
f.write(raw_content)
print(f"[Agent] 原始错误响应已保存到: {error_path}")
except Exception as e:
print(f"[Agent] 保存 _error.txt 文件时出错: {e}")
def run(self, problem_id: str, problem_data: Dict[str, Any], solutions_data: List[Dict[str, Any]]) -> str:
"""
[主入口] 执行完整的分析、解析和保存流程。
返回 LLM 的原始字符串响应。
"""
print(f"\n--- [Agent] 开始分析 {problem_id} ---")
# 1. 构建提示
system_prompt, user_prompt = self._build_prompts(problem_data, solutions_data)
# 2. 获取 LLM 响应
full_content = self._stream_llm_response(system_prompt, user_prompt)
if not full_content:
print("[Agent] LLM 响应为空,任务中止。")
return full_content
# 3. 提取
extracted_str = self._extract_json_string(full_content)
# 4. 解析
parsed_data = self._parse_to_dict(extracted_str)
# 5. 保存
if parsed_data:
# [!] 同时调用两个保存方法
self._save_json_result(problem_id, parsed_data)
self._save_markdown_result(problem_id, parsed_data)
else:
self._save_error(problem_id, full_content)
return full_content
效果
值得一提的是,在调通之后,我尝试把模型的输出拿去提交,以luogu紫题P4137为例,在5次测试中,只有一次代码是可以交上去直接AC的。在P1117这道题甚至出现了编译错误(尽管后面我看了一下是个小错误)。这说明不开思考模式的能力还是比较有限的。但是从题解的质量上来说结构上是比较合理的,也比人类的题解相对来说要更加详细。
另外,我发现在爬虫阶段爬取到的前三条并非是最高赞的回答,这是一个爬虫策略的遗留问题:在solution/里面题解确实是按照点赞数排序的,但是article里并非如此。另外像P4137这种题目有多个解法的可能对LLM并不是很友好。如果能同一种解法多个题解可能效果更好。这方面还是可以继续优化的。