#!/usr/bin/env python3
"""
Extract minidump (.dmp) attachments from Sentry envelope files.

Sentry envelope format:
- Line 1: Envelope header (JSON)
- For each item:
  - Item header (JSON with "type", optional "length", and optional metadata)
  - Item payload (raw bytes of specified length)
  - Items are separated by newlines

Usage:
    python extract_minidump.py <envelope_file> [output_file]
    python extract_minidump.py --list <envelope_file>

If output_file is not specified, the filename from the envelope will be used.
"""

import argparse
import json
import os
import sys
from pathlib import Path, PureWindowsPath
from typing import Optional

# Name used when the envelope supplies no usable filename for the minidump.
DEFAULT_MINIDUMP_NAME = 'minidump.dmp'


def parse_envelope(data: bytes) -> tuple[dict, list[tuple[dict, bytes]]]:
    """
    Parse a Sentry envelope and return the header and list of items.

    Parsing stops (with a warning on stderr) at the first item header that is
    not a JSON object; items parsed up to that point are still returned.

    Args:
        data: Raw envelope file contents

    Returns:
        Tuple of (envelope_header, list of (item_header, item_payload) tuples)

    Raises:
        ValueError: If the envelope header line is missing, is not valid
            JSON, or is not a JSON object; or if an item declares a "length"
            that is not a non-negative integer or that runs past the end of
            the data.
    """
    # Parse envelope header (first line)
    header_end = data.find(b'\n')
    if header_end == -1:
        raise ValueError("Invalid envelope: missing newline after header")

    envelope_header = json.loads(data[:header_end].decode('utf-8'))
    if not isinstance(envelope_header, dict):
        raise ValueError("Invalid envelope: header is not a JSON object")

    end = len(data)
    pos = header_end + 1
    items = []

    # Parse items
    while pos < end:
        # Skip any extra newlines between items
        while pos < end and data[pos:pos + 1] == b'\n':
            pos += 1
        if pos >= end:
            break

        newline_pos = data.find(b'\n', pos)
        if newline_pos == -1:
            # An item header must be newline-terminated; trailing bytes
            # without a newline are not a complete item.
            break

        try:
            item_header = json.loads(data[pos:newline_pos].decode('utf-8'))
        except json.JSONDecodeError as e:
            print(
                f"Warning: Failed to parse item header at position {pos}: {e}",
                file=sys.stderr,
            )
            break
        if not isinstance(item_header, dict):
            print(
                f"Warning: Item header at position {pos} is not a JSON object",
                file=sys.stderr,
            )
            break

        pos = newline_pos + 1
        payload_len = item_header.get('length')

        if payload_len is None:
            # Length omitted: read until next newline or end
            next_newline = data.find(b'\n', pos)
            payload_end = end if next_newline == -1 else next_newline
        else:
            # bool is a subclass of int, so it has to be excluded explicitly.
            # A negative length would move the cursor backwards.
            if (
                isinstance(payload_len, bool)
                or not isinstance(payload_len, int)
                or payload_len < 0
            ):
                raise ValueError(
                    f"Invalid envelope: bad item length {payload_len!r} "
                    f"at position {pos}"
                )
            payload_end = pos + payload_len
            if payload_end > end:
                # Slicing would silently hand back a truncated payload.
                raise ValueError(
                    f"Invalid envelope: item at position {pos} declares "
                    f"{payload_len} bytes but only {end - pos} remain"
                )

        items.append((item_header, data[pos:payload_end]))
        pos = payload_end

    return envelope_header, items


def _read_envelope(envelope_path: Path) -> bytes:
    """Return the raw bytes of the envelope file at envelope_path."""
    if not envelope_path.exists():
        raise FileNotFoundError(f"Envelope file not found: {envelope_path}")
    return envelope_path.read_bytes()


def _safe_output_name(filename: object) -> str:
    """Reduce an envelope-supplied filename to a bare file name."""
    if not isinstance(filename, str):
        return DEFAULT_MINIDUMP_NAME
    # PureWindowsPath splits on both '/' and '\\' and drops drive prefixes,
    # so directory components are removed whichever platform wrote the name.
    name = PureWindowsPath(filename).name
    if name in ('', '.', '..'):
        return DEFAULT_MINIDUMP_NAME
    return name


def extract_minidump(envelope_path: str, output_path: Optional[str] = None) -> str:
    """
    Extract the minidump attachment from a Sentry envelope file.

    If the envelope holds several minidump attachments, the last one is
    written.

    Args:
        envelope_path: Path to the envelope file
        output_path: Optional output path for the minidump. If not specified,
            the file is written next to the envelope using the base name of
            the filename from the envelope metadata.

    Returns:
        Path to the extracted minidump file

    Raises:
        FileNotFoundError: If envelope file doesn't exist
        ValueError: If the envelope is malformed or no minidump is found
    """
    envelope_path = Path(envelope_path)

    # Read envelope file
    data = _read_envelope(envelope_path)
    print(f"Read {len(data)} bytes from {envelope_path}")

    # Parse envelope
    envelope_header, items = parse_envelope(data)

    print(f"Envelope event_id: {envelope_header.get('event_id', 'N/A')}")
    print(f"Found {len(items)} item(s) in envelope")

    # Find minidump attachment
    minidump_item = None
    minidump_header = None

    for item_header, item_payload in items:
        item_type = item_header.get('type', '')
        attachment_type = item_header.get('attachment_type', '')

        print(f"  - Item type: {item_type}, attachment_type: {attachment_type}, "
              f"length: {len(item_payload)} bytes")

        if item_type == 'attachment' and attachment_type == 'event.minidump':
            minidump_item = item_payload
            minidump_header = item_header
            print("    -> Found minidump!")

    if minidump_item is None:
        raise ValueError("No minidump attachment found in envelope")

    # Determine output path
    if output_path is None:
        # The filename comes from the envelope, i.e. from untrusted input:
        # keep only its base name so the file cannot land outside the
        # envelope's directory.
        filename = _safe_output_name(
            minidump_header.get('filename', DEFAULT_MINIDUMP_NAME)
        )
        output_path = envelope_path.parent / filename
    else:
        output_path = Path(output_path)

    # Verify minidump magic bytes (optional sanity check)
    if minidump_item[:4] == b'MDMP':
        print("Minidump magic verified: MDMP")
    else:
        print(
            f"Warning: Unexpected magic bytes: {minidump_item[:4]}",
            file=sys.stderr,
        )

    # Write minidump
    with open(output_path, 'wb') as f:
        f.write(minidump_item)

    print(f"\nExtracted minidump to: {output_path}")
    print(f"Size: {len(minidump_item)} bytes")

    return str(output_path)


def list_envelope_contents(envelope_path: str) -> None:
    """
    List the contents of a Sentry envelope file without extracting.

    Args:
        envelope_path: Path to the envelope file

    Raises:
        FileNotFoundError: If envelope file doesn't exist
        ValueError: If the envelope is malformed
    """
    envelope_path = Path(envelope_path)
    data = _read_envelope(envelope_path)

    envelope_header, items = parse_envelope(data)

    print(f"Envelope: {envelope_path}")
    print(f"  Size: {len(data)} bytes")
    print(f"  Event ID: {envelope_header.get('event_id', 'N/A')}")
    print(f"  DSN: {envelope_header.get('dsn', 'N/A')}")
    print()
    print(f"Items ({len(items)}):")

    for i, (item_header, item_payload) in enumerate(items):
        print(f"  [{i}] Type: {item_header.get('type', 'unknown')}")
        print(f"      Length: {len(item_payload)} bytes")

        if item_header.get('attachment_type'):
            print(f"      Attachment Type: {item_header['attachment_type']}")
        if item_header.get('filename'):
            print(f"      Filename: {item_header['filename']}")
        if item_header.get('content_type'):
            print(f"      Content-Type: {item_header['content_type']}")

        # Show preview for text items
        if item_header.get('type') in ('event', 'session', 'transaction'):
            # The 200-byte cut can land inside a multi-byte UTF-8 sequence;
            # errors='replace' keeps the preview instead of dropping it.
            preview = item_payload[:200].decode('utf-8', errors='replace')
            if len(item_payload) > 200:
                preview += '...'
            print(f"      Preview: {preview}")

        print()


def main() -> None:
    """Parse command-line arguments and list or extract the envelope."""
    parser = argparse.ArgumentParser(
        description='Extract minidump attachments from Sentry envelope files'
    )
    parser.add_argument(
        'envelope',
        help='Path to the Sentry envelope file'
    )
    parser.add_argument(
        'output',
        nargs='?',
        help='Output path for the minidump (default: use filename from envelope)'
    )
    parser.add_argument(
        '-l', '--list',
        action='store_true',
        help='List envelope contents without extracting'
    )

    args = parser.parse_args()

    try:
        if args.list:
            list_envelope_contents(args.envelope)
        else:
            extract_minidump(args.envelope, args.output)
    except (OSError, ValueError) as e:
        # OSError covers FileNotFoundError plus permission/directory errors.
        print(f"Error: {e}", file=sys.stderr)
        sys.exit(1)


if __name__ == '__main__':
    main()
#!/usr/bin/env python3
"""
Tests for the extract_minidump.py script.

Uses the minidump.envelope fixture which contains the minidump.dmp from
tests/fixtures to verify that extraction produces identical output.
"""

import hashlib
import json
import shutil
import sys
import tempfile
import unittest
from pathlib import Path

# Add the scripts directory to the path so we can import extract_minidump
TESTS_DIR = Path(__file__).parent
REPO_ROOT = TESTS_DIR.parent
SCRIPTS_DIR = REPO_ROOT / "scripts"
sys.path.insert(0, str(SCRIPTS_DIR))

from extract_minidump import parse_envelope, extract_minidump


class TestExtractMinidump(unittest.TestCase):
    """Test cases for minidump extraction from Sentry envelopes."""

    @classmethod
    def setUpClass(cls):
        """Set up test fixtures paths."""
        cls.fixtures_dir = TESTS_DIR / "fixtures"
        cls.envelope_path = cls.fixtures_dir / "minidump.envelope"
        cls.original_minidump_path = cls.fixtures_dir / "minidump.dmp"

        # Verify fixtures exist
        if not cls.envelope_path.exists():
            raise FileNotFoundError(
                f"Envelope fixture not found: {cls.envelope_path}\n"
                "Run scripts/create_envelope_fixture.py to create it."
            )
        if not cls.original_minidump_path.exists():
            raise FileNotFoundError(
                f"Original minidump not found: {cls.original_minidump_path}"
            )

    def _parse_fixture(self):
        """Parse the envelope fixture and return (envelope_header, items)."""
        return parse_envelope(self.envelope_path.read_bytes())

    @staticmethod
    def _find_minidump(items):
        """Return the first (header, payload) pair tagged as a minidump, or None."""
        for item_header, item_payload in items:
            if item_header.get("attachment_type") == "event.minidump":
                return item_header, item_payload
        return None

    def test_parse_envelope_structure(self):
        """Test that envelope parsing returns correct structure."""
        envelope_header, items = self._parse_fixture()

        # Check envelope header
        self.assertIn("dsn", envelope_header)
        self.assertIn("event_id", envelope_header)

        # Check we have at least 2 items (event + attachment)
        self.assertGreaterEqual(len(items), 2)

        # Check item types
        item_types = [item_header.get("type") for item_header, _ in items]
        self.assertIn("event", item_types)
        self.assertIn("attachment", item_types)

    def test_parse_envelope_minidump_header(self):
        """Test that the minidump attachment header is correct."""
        _, items = self._parse_fixture()

        minidump_item = self._find_minidump(items)
        self.assertIsNotNone(minidump_item, "No minidump attachment found")

        header, payload = minidump_item
        self.assertEqual(header["type"], "attachment")
        self.assertEqual(header["attachment_type"], "event.minidump")
        self.assertIn("filename", header)
        self.assertEqual(header["length"], len(payload))

    def test_minidump_magic_bytes(self):
        """Test that extracted minidump has correct magic bytes."""
        _, items = self._parse_fixture()

        minidump_item = self._find_minidump(items)
        self.assertIsNotNone(minidump_item)

        _, minidump_payload = minidump_item
        # MDMP is the minidump magic signature
        self.assertEqual(
            minidump_payload[:4], b"MDMP", "Minidump should start with MDMP magic bytes"
        )

    def test_extract_minidump_matches_original(self):
        """Test that extracted minidump is identical to original."""
        with tempfile.TemporaryDirectory() as tmpdir:
            output_path = Path(tmpdir) / "extracted.dmp"

            # Extract the minidump
            result_path = extract_minidump(str(self.envelope_path), str(output_path))

            self.assertEqual(result_path, str(output_path))
            self.assertTrue(output_path.exists())

            # Compare with original
            original_data = self.original_minidump_path.read_bytes()
            extracted_data = output_path.read_bytes()

            # Compare sizes
            self.assertEqual(
                len(extracted_data),
                len(original_data),
                f"Size mismatch: extracted={len(extracted_data)}, "
                f"original={len(original_data)}",
            )

            # Compare content
            self.assertEqual(
                extracted_data,
                original_data,
                "Extracted minidump content differs from original",
            )

    def test_extract_minidump_hash_comparison(self):
        """Test extraction using hash comparison for additional verification."""
        with tempfile.TemporaryDirectory() as tmpdir:
            output_path = Path(tmpdir) / "extracted.dmp"

            extract_minidump(str(self.envelope_path), str(output_path))

            # Calculate hashes. SHA-256 rather than MD5: MD5 can be refused
            # by hashlib on FIPS-restricted Python builds.
            original_hash = hashlib.sha256(
                self.original_minidump_path.read_bytes()
            ).hexdigest()
            extracted_hash = hashlib.sha256(output_path.read_bytes()).hexdigest()

            self.assertEqual(
                extracted_hash,
                original_hash,
                f"SHA-256 hash mismatch: extracted={extracted_hash}, "
                f"original={original_hash}",
            )

    def test_extract_minidump_default_filename(self):
        """Test that extraction uses filename from envelope when not specified."""
        with tempfile.TemporaryDirectory() as tmpdir:
            # Copy envelope to temp dir so output goes there
            temp_envelope = Path(tmpdir) / "test.envelope"
            shutil.copy(self.envelope_path, temp_envelope)

            # Extract without specifying output path
            result_path = extract_minidump(str(temp_envelope))

            # Should use filename from envelope header (minidump.dmp)
            self.assertTrue(Path(result_path).exists())
            self.assertEqual(Path(result_path).name, "minidump.dmp")

    def test_extract_minidump_nonexistent_file(self):
        """Test that extraction fails gracefully for nonexistent file."""
        with self.assertRaises(FileNotFoundError):
            extract_minidump("/nonexistent/path/to/envelope.envelope")

    def test_envelope_without_minidump(self):
        """Test that extraction fails gracefully when no minidump present."""
        with tempfile.TemporaryDirectory() as tmpdir:
            # Create envelope without minidump
            envelope_path = Path(tmpdir) / "no_minidump.envelope"

            envelope_header = {"dsn": "https://test@sentry.invalid/42"}
            event_payload = {"event_id": "test", "level": "info"}
            event_bytes = json.dumps(event_payload).encode("utf-8")
            event_header = {"type": "event", "length": len(event_bytes)}

            envelope_path.write_bytes(
                b"\n".join(
                    [
                        json.dumps(envelope_header).encode("utf-8"),
                        json.dumps(event_header).encode("utf-8"),
                        event_bytes,
                    ]
                )
            )

            with self.assertRaises(ValueError) as ctx:
                extract_minidump(str(envelope_path))

            self.assertIn("No minidump", str(ctx.exception))


class TestParseEnvelope(unittest.TestCase):
    """Test cases for envelope parsing edge cases."""

    def test_parse_empty_envelope(self):
        """Test parsing empty data."""
        # Empty input has no header line; the parser reports that as a
        # ValueError. Asserting the specific type keeps unrelated bugs
        # (e.g. a NameError) from passing this test.
        with self.assertRaises(ValueError):
            parse_envelope(b"")

    def test_parse_header_only(self):
        """Test parsing envelope with only header."""
        data = json.dumps({"dsn": "test"}).encode("utf-8") + b"\n"
        header, items = parse_envelope(data)
        self.assertEqual(header["dsn"], "test")
        self.assertEqual(len(items), 0)

    def test_parse_multiple_items(self):
        """Test parsing envelope with multiple items."""
        envelope_header = {"dsn": "test"}
        item1_payload = b"payload1"
        item1_header = {"type": "event", "length": len(item1_payload)}
        item2_payload = b"payload2"
        item2_header = {"type": "attachment", "length": len(item2_payload)}

        data = b"\n".join(
            [
                json.dumps(envelope_header).encode("utf-8"),
                json.dumps(item1_header).encode("utf-8"),
                item1_payload,
                json.dumps(item2_header).encode("utf-8"),
                item2_payload,
            ]
        )

        _, items = parse_envelope(data)

        self.assertEqual(len(items), 2)
        self.assertEqual(items[0][1], item1_payload)
        self.assertEqual(items[1][1], item2_payload)


if __name__ == "__main__":
    # Run tests with verbosity
    unittest.main(verbosity=2)