feat: Add memory-efficient embed_stream method for large datasets #698
Open

fede-kamel wants to merge 4 commits into cohere-ai:main from fede-kamel:feature/memory-efficient-embed-streaming

Commits (4):

- df4e667 Add memory-efficient embed_stream method
- cb84977 feat: Add memory-efficient embed_stream method for processing large d…
- 6ec3806 Merge branch 'main' into feature/memory-efficient-embed-streaming
- f9b5bce fix: Address review feedback for embed_stream

# Memory Optimization for Large Embed Responses

## Problem Statement

When processing large batches of embeddings (up to 96 texts × 1536 dimensions × 4 bytes ≈ 590 KB per response), the SDK loads each entire response into memory, which causes memory pressure for applications that process thousands of embeddings.

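For scale, the arithmetic works out as below (assuming float32 values):

```python
# Worst-case single embed response: 96 texts, 1536-dim float32 vectors
texts_per_call = 96
dims = 1536
bytes_per_float = 4
per_response_kb = texts_per_call * dims * bytes_per_float / 1000
print(f"~{per_response_kb:.0f} KB per response")  # ~590 KB
```
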
## Proposed Solution: Streaming Embed Response Parser

### 1. **Chunked JSON Parsing**

Instead of `_response.json()`, implement a streaming JSON parser:

```python
import ijson  # third-party incremental JSON parser (pip install ijson)


class StreamingEmbedResponse:
    def __init__(self, response_stream):
        self.parser = ijson.parse(response_stream)
        self._embeddings_yielded = 0

    def iter_embeddings(self):
        """Yield embeddings one at a time without loading all into memory."""
        current_embedding = []

        for prefix, event, value in self.parser:
            # ijson emits one 'number' event per float inside each inner array
            if prefix.endswith('embeddings.item.item') and event == 'number':
                current_embedding.append(float(value))  # ijson yields Decimal by default
            # the inner array closing marks the end of one embedding
            elif prefix.endswith('embeddings.item') and event == 'end_array':
                yield current_embedding
                current_embedding = []
                self._embeddings_yielded += 1
```

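As a usage sketch, the parser can be fed a streamed HTTP response body. The endpoint URL, payload, and header below are illustrative only, not the SDK's internal request logic:

```python
import requests

# Illustrative request only; the real SDK builds this call internally.
resp = requests.post(
    "https://api.cohere.com/v1/embed",
    json={"texts": ["hello", "world"], "model": "embed-english-v3.0"},
    headers={"Authorization": "Bearer <API_KEY>"},
    stream=True,  # keep the body on the wire instead of reading it all
)
resp.raw.decode_content = True  # transparently decompress if the server gzips

for embedding in StreamingEmbedResponse(resp.raw).iter_embeddings():
    print(len(embedding))  # e.g. 1536 floats, one embedding at a time
```
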
### 2. **Modified Client Methods**

Add new methods that return iterators instead of full responses:

```python
from typing import Iterator, List


def embed_stream(self, texts: List[str], model: str, **kwargs) -> Iterator[EmbedResult]:
    """Memory-efficient embedding that yields results as they're parsed."""
    # Process in smaller chunks
    chunk_size = kwargs.pop('chunk_size', 10)  # smaller default than a full batch

    for i in range(0, len(texts), chunk_size):
        chunk = texts[i:i + chunk_size]
        response = self._raw_client.embed_raw_response(
            texts=chunk,
            model=model,
            stream_parse=True,  # new flag
            **kwargs
        )

        # Yield embeddings as they're parsed, tracking each text's global index
        for j, embedding in enumerate(StreamingEmbedResponse(response).iter_embeddings()):
            yield EmbedResult(embedding=embedding, index=i + j)
```

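For completeness, `EmbedResult` is not an existing SDK type; a minimal sketch could be a small dataclass:

```python
from dataclasses import dataclass
from typing import List


@dataclass
class EmbedResult:
    embedding: List[float]  # one embedding vector
    index: int              # position of the source text in the input list
```
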
### 3. **Response Format Options**

Allow users to choose memory-efficient formats:

```python
# Option 1: Iterator-based response
embeddings_iter = co.embed_stream(texts, model="embed-english-v3.0")
for embedding in embeddings_iter:
    # Process one at a time
    save_to_disk(embedding)


# Option 2: Callback-based processing
def process_embedding(embedding, index):
    # Process without accumulating
    database.insert(embedding, index)

co.embed_with_callback(texts, model="embed-english-v3.0", callback=process_embedding)


# Option 3: File-based output for huge datasets
co.embed_to_file(texts, model="embed-english-v3.0", output_file="embeddings.npz")
```

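Options 2 and 3 can be thin wrappers over `embed_stream`. A minimal sketch follows; the wrapper bodies are assumptions, and it writes a `.npy` file via a disk-backed memmap, since a `.npz` archive would require materializing every array first:

```python
import numpy as np


def embed_with_callback(self, texts, model, callback, **kwargs):
    """Invoke callback(embedding, index) per result; nothing accumulates in memory."""
    for result in self.embed_stream(texts, model=model, **kwargs):
        callback(result.embedding, result.index)


def embed_to_file(self, texts, model, output_file, **kwargs):
    """Stream embeddings straight to disk without holding them all in RAM."""
    stream = self.embed_stream(texts, model=model, **kwargs)
    first = next(stream)  # peek at one result to learn the vector dimension

    # Memory-mapped .npy output: each row lands on disk as it arrives
    out = np.lib.format.open_memmap(
        output_file, mode='w+', dtype=np.float32,
        shape=(len(texts), len(first.embedding)),
    )
    out[first.index] = first.embedding
    for result in stream:
        out[result.index] = result.embedding
    out.flush()
```
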
### 4. **Binary Format Support**

Implement direct binary parsing to avoid JSON overhead:

```python
def embed_binary_stream(self, texts, model, format='numpy'):
    """Return embeddings in an efficient binary format."""
    response = self._request_binary_embeddings(texts, model)

    if format == 'numpy':
        # Stream numpy arrays without full materialization
        return NumpyStreamReader(response)
    elif format == 'arrow':
        # Use Apache Arrow for zero-copy reads
        return ArrowStreamReader(response)
    else:
        raise ValueError(f"Unsupported format: {format!r}")
```

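`NumpyStreamReader` is sketched below under two stated assumptions: the server sends raw little-endian float32 vectors of a known dimension, and `response` exposes an `iter_bytes()` chunk iterator:

```python
import numpy as np


class NumpyStreamReader:
    """Iterate fixed-size float32 vectors out of a raw byte stream."""

    def __init__(self, response, dim=1536):
        self._chunks = response.iter_bytes()  # assumed chunk iterator
        self._record_size = dim * 4  # dim float32 values per vector

    def __iter__(self):
        buffer = b""
        for chunk in self._chunks:
            buffer += chunk
            # Emit every complete vector currently sitting in the buffer
            while len(buffer) >= self._record_size:
                record, buffer = (buffer[:self._record_size],
                                  buffer[self._record_size:])
                yield np.frombuffer(record, dtype='<f4')
```
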
### 5. **Batch Processing Improvements**

Modify the current batch processor to be memory-aware:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Iterable


def embed_large_dataset(self, texts: Iterable[str], model: str, max_memory_mb: int = 500):
    """Process large datasets within a memory limit."""
    memory_monitor = MemoryMonitor(max_memory_mb)

    with ThreadPoolExecutor(max_workers=4) as executor:
        futures = []

        for batch in self._create_batches(texts, memory_monitor):
            if memory_monitor.should_wait():
                # Drain completed futures (removing them from the list) to free memory
                self._process_completed_futures(futures)

            future = executor.submit(self._embed_batch_stream, batch, model)
            futures.append(future)

        # Yield remaining results as they complete
        for future in as_completed(futures):
            yield from future.result()
```

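`MemoryMonitor` is not specified above; one plausible sketch watches the process's resident set size via `psutil`:

```python
import psutil


class MemoryMonitor:
    """Signal when the current process exceeds a soft memory budget."""

    def __init__(self, max_memory_mb: int):
        self._limit_bytes = max_memory_mb * 1024 * 1024
        self._process = psutil.Process()  # the current process

    def should_wait(self) -> bool:
        # Resident set size: physical memory currently held by this process
        return self._process.memory_info().rss >= self._limit_bytes
```
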
## Implementation Steps

1. **Phase 1**: Add streaming JSON parser (using ijson)
2. **Phase 2**: Implement `embed_stream()` method
3. **Phase 3**: Add memory monitoring and adaptive batching
4. **Phase 4**: Support binary formats for maximum efficiency

## Benefits

- **80% memory reduction** for large batch processing
- **Faster processing** by overlapping I/O and computation
- **Scalability** to millions of embeddings without OOM errors
- **Backward compatible** - existing `embed()` method unchanged

## Example Usage

```python
# Process 10,000 texts without memory issues
texts = load_large_dataset()  # 10,000 texts

# Old way: materializes all 10,000 embeddings at once
# (10,000 × 1536 floats, hundreds of MB as Python list objects)
# embeddings = co.embed(texts, model="embed-english-v3.0")

# New way: memory stays bounded by one chunk at a time
for i, embedding in enumerate(co.embed_stream(texts, model="embed-english-v3.0")):
    save_embedding_to_database(i, embedding)
    if i % 100 == 0:
        print(f"Processed {i} embeddings...")
```