1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
| import asyncio
import os
import zipfile
from typing import List, Optional
from mcp.server.fastmcp import FastMCP
from docx import Document
from lxml import etree
# Initialize the MCP server instance; the tool functions below register
# themselves on it through the @mcp.tool() decorator.
mcp = FastMCP("docx_agent")
@mcp.tool()
def generate_docx(output_path: str, title: str, paragraphs: List[str]) -> str:
    """
    Generate a new DOCX file containing a title and a list of paragraphs.

    Args:
        output_path: Absolute path where the generated DOCX file is saved.
        title: Title of the document (written as a level-0 heading).
        paragraphs: Text paragraphs to add to the document, in order.

    Returns:
        A status message indicating success or failure. Errors are reported
        in the returned string rather than raised.
    """
    try:
        document = Document()
        document.add_heading(title, level=0)
        for body_text in paragraphs:
            document.add_paragraph(body_text)
        document.save(output_path)
    except Exception as exc:
        return f"Error generating DOCX file: {exc}"
    return f"Successfully generated DOCX file at {output_path}"
@mcp.tool()
def read_docx(input_path: str) -> str:
    """
    Read the text content of a DOCX file.

    Args:
        input_path: Absolute path of the DOCX file to read.

    Returns:
        The text of every paragraph in the document joined with newlines,
        or an error message string if the file could not be read.
    """
    try:
        document = Document(input_path)
        return "\n".join(para.text for para in document.paragraphs)
    except Exception as exc:
        return f"Error reading DOCX file: {exc}"
# XML namespaces and OPC constants used when wiring a comments part into a DOCX package.
_W_NS = 'http://schemas.openxmlformats.org/wordprocessingml/2006/main'
_R_NS = 'http://schemas.openxmlformats.org/officeDocument/2006/relationships'
_PKG_RELS_NS = 'http://schemas.openxmlformats.org/package/2006/relationships'
_CONTENT_TYPES_NS = 'http://schemas.openxmlformats.org/package/2006/content-types'
_COMMENTS_REL_TYPE = "http://schemas.openxmlformats.org/officeDocument/2006/relationships/comments"
_COMMENTS_PART_NAME = "/word/comments.xml"
_COMMENTS_CONTENT_TYPE = "application/vnd.openxmlformats-officedocument.wordprocessingml.comments+xml"


def _w(tag: str) -> str:
    """Return *tag* qualified with the WordprocessingML namespace (Clark notation)."""
    return f"{{{_W_NS}}}{tag}"


def _find_paragraph_containing(root, target_text: str):
    """Return the first w:p element whose concatenated run text contains *target_text*, or None."""
    ns = {'w': _W_NS}
    for para in root.xpath('//w:p', namespaces=ns):
        # Text may be split across several runs, so match against the joined text.
        if target_text in "".join(para.xpath('.//w:t/text()', namespaces=ns)):
            return para
    return None


def _ensure_comments_relationship(rels_path: str) -> None:
    """Add a document -> comments.xml relationship to *rels_path* unless one already exists."""
    if not os.path.exists(rels_path):
        return
    rels_tree = etree.parse(rels_path)
    rels_root = rels_tree.getroot()
    rel_tag = f"{{{_PKG_RELS_NS}}}Relationship"
    used_ids = set()
    for rel in rels_root.iter(rel_tag):
        if rel.get("Type") == _COMMENTS_REL_TYPE:
            # Already wired up; adding a second relationship would duplicate it.
            return
        used_ids.add(rel.get("Id"))
    # Start from the same offset the id was historically derived from, but
    # skip forward past any id that is already taken so ids stay unique.
    suffix = len(rels_root) + 100
    while f"rId{suffix}" in used_ids:
        suffix += 1
    etree.SubElement(rels_root, rel_tag, dict(
        Id=f"rId{suffix}",
        Type=_COMMENTS_REL_TYPE,
        Target="comments.xml"
    ))
    rels_tree.write(rels_path, xml_declaration=True, encoding='UTF-8', standalone=True)


def _ensure_comments_content_type(content_types_path: str) -> None:
    """Register the comments part in [Content_Types].xml unless an Override for it already exists."""
    if not os.path.exists(content_types_path):
        return
    ct_tree = etree.parse(content_types_path)
    ct_root = ct_tree.getroot()
    override_tag = f"{{{_CONTENT_TYPES_NS}}}Override"
    for override in ct_root.iter(override_tag):
        if override.get("PartName") == _COMMENTS_PART_NAME:
            return
    etree.SubElement(ct_root, override_tag, dict(
        PartName=_COMMENTS_PART_NAME,
        ContentType=_COMMENTS_CONTENT_TYPE
    ))
    ct_tree.write(content_types_path, xml_declaration=True, encoding='UTF-8', standalone=True)


@mcp.tool()
def comment_docx(input_path: str, output_path: str, target_text: str, author: str, comment_text: str) -> str:
    """
    Add a comment to a DOCX file.

    The package is unzipped into a temporary directory, the first paragraph
    whose text contains ``target_text`` is wrapped in a comment range, the
    comment is appended to ``word/comments.xml`` (created if missing), and
    the package is zipped again to ``output_path``.

    Args:
        input_path: Absolute path of the existing DOCX file.
        output_path: Absolute path where the modified DOCX file is saved.
        target_text: Exact text fragment to search for in the document. The
            whole paragraph containing it becomes the commented range.
        author: Name of the comment author.
        comment_text: Text content of the comment.

    Returns:
        A status message indicating success or failure. Errors (including a
        traceback for unexpected exceptions) are reported in the returned
        string rather than raised.
    """
    try:
        import tempfile
        from datetime import datetime, timezone

        namespaces = {'w': _W_NS, 'r': _R_NS}
        with tempfile.TemporaryDirectory() as tmpdir:
            with zipfile.ZipFile(input_path, 'r') as z:
                z.extractall(tmpdir)

            word_dir = os.path.join(tmpdir, 'word')
            doc_xml_path = os.path.join(word_dir, 'document.xml')
            if not os.path.exists(doc_xml_path):
                return "Error: Invalid docx file (word/document.xml not found)."

            tree = etree.parse(doc_xml_path)
            found_p = _find_paragraph_containing(tree.getroot(), target_text)
            if found_p is None:
                return f"Error: target_text '{target_text}' not found in the document."

            # Load the existing comments part, or start a new one. The new
            # comment id is one past the highest id already present.
            comments_xml_path = os.path.join(word_dir, 'comments.xml')
            if os.path.exists(comments_xml_path):
                comments_tree = etree.parse(comments_xml_path)
                comments_root = comments_tree.getroot()
                existing_ids = [
                    int(c.get(_w('id'), "-1"))
                    for c in comments_root.xpath('//w:comment', namespaces=namespaces)
                ]
                next_id = str(max(existing_ids, default=-1) + 1)
            else:
                comments_root = etree.Element(_w('comments'), nsmap=namespaces)
                comments_tree = etree.ElementTree(comments_root)
                next_id = "0"

            # Both helpers are no-ops when the package is already wired up,
            # so they are safe to call whether or not comments.xml existed.
            _ensure_comments_relationship(os.path.join(word_dir, '_rels', 'document.xml.rels'))
            _ensure_comments_content_type(os.path.join(tmpdir, '[Content_Types].xml'))

            # Append the comment itself: <w:comment><w:p><w:r><w:t>text</w:t></w:r></w:p></w:comment>
            date_str = datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')
            comment_el = etree.SubElement(comments_root, _w('comment'), {
                _w('id'): next_id,
                _w('author'): author,
                _w('date'): date_str,
            })
            comment_p = etree.SubElement(comment_el, _w('p'))
            comment_r = etree.SubElement(comment_p, _w('r'))
            comment_t = etree.SubElement(comment_r, _w('t'))
            comment_t.text = comment_text
            comments_tree.write(comments_xml_path, xml_declaration=True, encoding='UTF-8', standalone=True)

            # Mark the whole paragraph as the commented range. w:pPr, when
            # present, must remain the first child of w:p, so the range start
            # goes right after it instead of at index 0.
            start_el = etree.Element(_w('commentRangeStart'), {_w('id'): next_id})
            p_pr = found_p.find(_w('pPr'))
            insert_at = found_p.index(p_pr) + 1 if p_pr is not None else 0
            found_p.insert(insert_at, start_el)

            end_el = etree.Element(_w('commentRangeEnd'), {_w('id'): next_id})
            found_p.append(end_el)

            # Run carrying the reference that links the range to the comment.
            ref_r = etree.Element(_w('r'))
            ref_rPr = etree.SubElement(ref_r, _w('rPr'))
            etree.SubElement(ref_rPr, _w('rStyle'), {_w('val'): "CommentReference"})
            etree.SubElement(ref_r, _w('commentReference'), {_w('id'): next_id})
            found_p.append(ref_r)

            tree.write(doc_xml_path, xml_declaration=True, encoding='UTF-8', standalone=True)

            # Re-pack the modified tree into a new DOCX archive.
            with zipfile.ZipFile(output_path, 'w', zipfile.ZIP_DEFLATED) as z:
                for root_dir, _dirs, files in os.walk(tmpdir):
                    for file in files:
                        file_path = os.path.join(root_dir, file)
                        arcname = os.path.relpath(file_path, tmpdir)
                        z.write(file_path, arcname)

        return f"Successfully commented on document and saved to {output_path}"
    except Exception as e:
        import traceback
        return f"Error commenting on DOCX file: {e}\n{traceback.format_exc()}"
if __name__ == "__main__":
    # Run the MCP server only when this file is executed directly,
    # not when it is imported as a module.
    mcp.run()
|