|
26 | 26 |
|
27 | 27 | LINE_LIMIT_FOR_FETCHING_FILE_CONTENT = int(os.getenv('LINE_LIMIT_FOR_FETCHING_FILE_CONTENT', default=1000)) |
28 | 28 |
|
| 29 | +FILE_LIMIT_FOR_LIST_FILES = int(os.getenv('FILE_LIMIT_FOR_LIST_FILES', default=100)) |
| 30 | + |
29 | 31 | def is_subdirectory(directory, potential_subdirectory): |
30 | 32 | directory_path = Path(directory) |
31 | 33 | potential_subdirectory_path = Path(potential_subdirectory) |
@@ -69,16 +71,22 @@ def search_zipfile(database_path, term, search_dir = None): |
69 | 71 | results[filename].append(i+1) |
70 | 72 | return results |
71 | 73 |
|
72 | | -def _list_files(database_path, root_dir = None): |
| 74 | +def _list_files(database_path, root_dir = None, recursive=True): |
73 | 75 | results = [] |
74 | 76 | root_dir = strip_leading_dash(root_dir) |
75 | 77 | with zipfile.ZipFile(database_path) as z: |
76 | 78 | for entry in z.infolist(): |
77 | 79 | if entry.is_dir(): |
| 80 | + if not recursive: |
| 81 | + dirname = remove_root_dir(entry.filename) |
| 82 | + if Path(dirname).parent == Path(root_dir): |
| 83 | + results.append(dirname + '/') |
78 | 84 | continue |
79 | 85 | filename = remove_root_dir(entry.filename) |
80 | 86 | if root_dir and not is_subdirectory(root_dir, filename): |
81 | 87 | continue |
| 88 | + if not recursive and Path(filename).parent != Path(root_dir): |
| 89 | + continue |
82 | 90 | results.append(filename) |
83 | 91 | return results |
84 | 92 |
|
@@ -152,8 +160,27 @@ async def list_files( |
152 | 160 | if not source_path or not source_path.exists(): |
153 | 161 | return f"Invalid {owner} and {repo}. Check that the input is correct or try to fetch the repo from gh first." |
154 | 162 | content = _list_files(source_path, path) |
| 163 | + if len(content) > FILE_LIMIT_FOR_LIST_FILES: |
| 164 | + return f"Too many files to display in {owner}/{repo} at path {path} ({len(content)} files). Try using `list_files_non_recursive` instead." |
155 | 165 | return json.dumps(content, indent=2) |
156 | 166 |
|
| 167 | +@mcp.tool() |
| 168 | +async def list_files_non_recursive( |
| 169 | + owner: str = Field(description="The owner of the repository"), |
| 170 | + repo: str = Field(description="The name of the repository"), |
| 171 | + path: str = Field(description="The path to the directory in the repository")) -> str: |
| 172 | + """ |
| 173 | + List the files of a directory from a local GitHub repository non-recursively. |
| 174 | + Subdirectories will be listed and indicated with a trailing slash. |
| 175 | + """ |
| 176 | + source_path = Path(f"{LOCAL_GH_DIR}/{owner}/{repo}.zip") |
| 177 | + source_path = sanitize_file_path(source_path, [LOCAL_GH_DIR]) |
| 178 | + if not source_path or not source_path.exists(): |
| 179 | + return f"Invalid {owner} and {repo}. Check that the input is correct or try to fetch the repo from gh first." |
| 180 | + content = _list_files(source_path, path, recursive=False) |
| 181 | + return json.dumps(content, indent=2) |
| 182 | + |
| 183 | + |
157 | 184 | @mcp.tool() |
158 | 185 | async def search_repo( |
159 | 186 | owner: str = Field(description="The owner of the repository"), |
|
0 commit comments