From 9fcf75dcf8b178e53bb317a2ffb2cf8c1b609b44 Mon Sep 17 00:00:00 2001 From: vladd-bit Date: Tue, 3 Feb 2026 00:03:53 +0000 Subject: [PATCH 01/11] NiFi scripts: lazy cerner blob decompression possible fix. --- nifi/user_scripts/utils/codecs/cerner_blob.py | 39 ++++++++++++------- 1 file changed, 26 insertions(+), 13 deletions(-) diff --git a/nifi/user_scripts/utils/codecs/cerner_blob.py b/nifi/user_scripts/utils/codecs/cerner_blob.py index cfd299bf..6be3a3e0 100644 --- a/nifi/user_scripts/utils/codecs/cerner_blob.py +++ b/nifi/user_scripts/utils/codecs/cerner_blob.py @@ -9,7 +9,7 @@ class DecompressLzwCernerBlob: def __init__(self) -> None: self.MAX_CODES: int = 8192 self.tmp_decompression_buffer: list[int] = [0] * self.MAX_CODES - self.lzw_lookup_table: list[LzwItem] = [LzwItem()] * self.MAX_CODES + self.lzw_lookup_table: list[LzwItem] = [LzwItem() for _ in range(self.MAX_CODES)] self.tmp_buffer_index: int = 0 self.current_byte_buffer_index: int = 0 @@ -31,10 +31,11 @@ def save_to_lookup_table(self, compressed_code: int): for i in reversed(list(range(self.tmp_buffer_index + 1))): self.output_stream.append(self.tmp_decompression_buffer[i]) - def decompress(self, input_stream: bytearray = bytearray()): + def decompress(self, input_stream: bytearray): byte_buffer_index: int = 0 - + self.output_stream = bytearray() + # used for bit shifts shift: int = 1 current_shift: int = 1 @@ -45,8 +46,19 @@ def decompress(self, input_stream: bytearray = bytearray()): skip_flag: bool = False + def read_current_byte() -> int: + if byte_buffer_index >= len(input_stream): + raise ValueError("Truncated input_stream while decoding LZW") + return input_stream[byte_buffer_index] + + if not input_stream: + return self.output_stream + if byte_buffer_index >= len(input_stream): + raise ValueError("Truncated input_stream while decoding LZW") + first_code = input_stream[byte_buffer_index] + while True: if current_shift >= 9: @@ -56,8 +68,7 @@ def decompress(self, input_stream: bytearray = bytearray()): byte_buffer_index += 1 middle_code = input_stream[byte_buffer_index] - first_code = (first_code << current_shift + - 8) | (middle_code << current_shift) + first_code = (first_code << (current_shift + 8)) | (middle_code << current_shift) byte_buffer_index += 1 middle_code = input_stream[byte_buffer_index] @@ -76,19 +87,19 @@ def decompress(self, input_stream: bytearray = bytearray()): middle_code = input_stream[byte_buffer_index] if not skip_flag: - lookup_index = (first_code << current_shift) | ( - middle_code >> 8 - current_shift) + lookup_index = (first_code << current_shift) | (middle_code >> (8 - current_shift)) if lookup_index == 256: shift = 1 current_shift += 1 - first_code = input_stream[byte_buffer_index] + first_code = read_current_byte() self.tmp_decompression_buffer = [0] * self.MAX_CODES self.tmp_buffer_index = 0 - self.lzw_lookup_table = [LzwItem()] * self.MAX_CODES + self.lzw_lookup_table = [LzwItem() for _ in range(self.MAX_CODES)] self.code_count = 257 + previous_code = 0 continue elif lookup_index == 257: # EOF marker @@ -107,10 +118,12 @@ def decompress(self, input_stream: bytearray = bytearray()): self.tmp_decompression_buffer[self.tmp_buffer_index]) self.code_count += 1 else: - self.lzw_lookup_table[self.code_count] = LzwItem( - previous_code, - self.tmp_decompression_buffer[self.tmp_buffer_index]) - self.code_count += 1 + if self.code_count < self.MAX_CODES: + self.lzw_lookup_table[self.code_count] = LzwItem( + previous_code, + self.tmp_decompression_buffer[self.tmp_buffer_index] + ) + 
self.code_count += 1 self.save_to_lookup_table(lookup_index) # end of skipit From 15f80029b0f30e2e6088b039a37b4fbf8ab36d0c Mon Sep 17 00:00:00 2001 From: vladd-bit Date: Tue, 3 Feb 2026 11:52:00 +0000 Subject: [PATCH 02/11] NiFi processors: added more sanity checks to cerner processor. --- .../record_decompress_cerner_blob.py | 44 ++++++++++++++----- nifi/user_scripts/utils/codecs/cerner_blob.py | 1 - typings/nifiapi/flowfiletransform.pyi | 3 +- 3 files changed, 36 insertions(+), 12 deletions(-) diff --git a/nifi/user_python_extensions/record_decompress_cerner_blob.py b/nifi/user_python_extensions/record_decompress_cerner_blob.py index b650beea..eef08f9d 100644 --- a/nifi/user_python_extensions/record_decompress_cerner_blob.py +++ b/nifi/user_python_extensions/record_decompress_cerner_blob.py @@ -33,6 +33,8 @@ class Java: class ProcessorDetails: version = '0.0.1' + description = "Decompresses Cerner LZW compressed blobs from a JSON input stream" + tags = ["cerner", "oracle", "blob"] def __init__(self, jvm: JVMView): super().__init__(jvm) @@ -118,7 +120,7 @@ def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTr # read avro record input_raw_bytes: bytes | bytearray = flowFile.getContentsAsBytes() - records = [] + records: list | dict = [] try: records = json.loads(input_raw_bytes.decode()) @@ -140,11 +142,16 @@ def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTr if not records: raise ValueError("No records found in JSON input") - concatenated_blob_sequence_order = {} - output_merged_record = {} + # sanity check: blobs are from the same document_id + doc_ids: set = {str(r.get(self.document_id_field_name, "")) for r in records} + if len(doc_ids) > 1: + raise ValueError(f"Multiple document IDs in one FlowFile: {list(doc_ids)[:5]}") - have_any_sequence = any(self.blob_sequence_order_field_name in record for record in records) - have_any_no_sequence = any(self.blob_sequence_order_field_name not in record for record in records) + concatenated_blob_sequence_order: dict = {} + output_merged_record: dict = {} + + have_any_sequence: bool = any(self.blob_sequence_order_field_name in record for record in records) + have_any_no_sequence: bool = any(self.blob_sequence_order_field_name not in record for record in records) if have_any_sequence and have_any_no_sequence: raise ValueError( @@ -174,16 +181,27 @@ def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTr full_compressed_blob = bytearray() - for k in sorted(concatenated_blob_sequence_order.keys()): + # double check to make sure there is no gap in the blob sequence, i.e missing blob. 
+ order_of_blobs_keys = sorted(concatenated_blob_sequence_order.keys()) + for i in range(1, len(order_of_blobs_keys)): + if order_of_blobs_keys[i] != order_of_blobs_keys[i-1] + 1: + raise ValueError( + f"Sequence gap: missing {order_of_blobs_keys[i-1] + 1} " + f"(have {order_of_blobs_keys[i-1]} then {order_of_blobs_keys[i]})" + ) + + for k in order_of_blobs_keys: v = concatenated_blob_sequence_order[k] if self.binary_field_source_encoding == "base64": if not isinstance(v, str): - raise ValueError(f"Expected base64 string in {self.binary_field_name} for part {k}, got {type(v)}") + raise ValueError( + f"Expected base64 string in {self.binary_field_name} for part {k}, got {type(v)}" + ) try: temporary_blob = base64.b64decode(v, validate=True) except Exception as e: - raise ValueError(f"Error decoding base64 blob part {k}: {e}") + raise ValueError(f"Error decoding base64 blob part {k}: {e}") from e else: # raw bytes path if isinstance(v, (bytes, bytearray)): @@ -196,7 +214,7 @@ def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTr try: decompress_blob = DecompressLzwCernerBlob() decompress_blob.decompress(full_compressed_blob) - output_merged_record[self.binary_field_name] = decompress_blob.output_stream + output_merged_record[self.binary_field_name] = bytes(decompress_blob.output_stream) except Exception as exception: self.logger.error(f"Error decompressing cerner blob: {str(exception)} \n") raise exception @@ -209,11 +227,17 @@ def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTr attributes: dict = {k: str(v) for k, v in flowFile.getAttributes().items()} attributes["document_id_field_name"] = str(self.document_id_field_name) + attributes["document_id"] = str(output_merged_record.get(self.document_id_field_name, "")) attributes["binary_field"] = str(self.binary_field_name) attributes["output_text_field_name"] = str(self.output_text_field_name) attributes["mime.type"] = "application/json" + attributes["blob_parts"] = str(len(order_of_blobs_keys)) + attributes["blob_seq_min"] = str(order_of_blobs_keys[0]) if order_of_blobs_keys else "" + attributes["blob_seq_max"] = str(order_of_blobs_keys[-1]) if order_of_blobs_keys else "" + attributes["compressed_len"] = str(len(full_compressed_blob)) + attributes["compressed_head_hex"] = bytes(full_compressed_blob[:16]).hex() - return FlowFileTransformResult(relationship="success", + return FlowFileTransformResult(relationship=self.REL_SUCCESS, attributes=attributes, contents=json.dumps(output_contents).encode("utf-8")) except Exception as exception: diff --git a/nifi/user_scripts/utils/codecs/cerner_blob.py b/nifi/user_scripts/utils/codecs/cerner_blob.py index 6be3a3e0..dbfc4332 100644 --- a/nifi/user_scripts/utils/codecs/cerner_blob.py +++ b/nifi/user_scripts/utils/codecs/cerner_blob.py @@ -58,7 +58,6 @@ def read_current_byte() -> int: first_code = input_stream[byte_buffer_index] - while True: if current_shift >= 9: diff --git a/typings/nifiapi/flowfiletransform.pyi b/typings/nifiapi/flowfiletransform.pyi index 65306797..c1950c51 100644 --- a/typings/nifiapi/flowfiletransform.pyi +++ b/typings/nifiapi/flowfiletransform.pyi @@ -1,5 +1,6 @@ from typing import Any, Protocol +from nifiapi.relationship import Relationship from py4j.java_gateway import JavaObject from .properties import ProcessContext @@ -13,7 +14,7 @@ class FlowFileTransform(Protocol): class FlowFileTransformResult: def __init__( self, - relationship: str, + relationship: str | Relationship, attributes: dict[str, str], contents: bytes | 
None = None, ) -> None: ... From 68acac894ba5a54715e50adc73c8d2b7fb7ade8c Mon Sep 17 00:00:00 2001 From: vladd-bit Date: Tue, 3 Feb 2026 12:59:48 +0000 Subject: [PATCH 03/11] NiFi scripts: more Cerner blob processing attempted fixes. --- nifi/user_scripts/utils/codecs/cerner_blob.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/nifi/user_scripts/utils/codecs/cerner_blob.py b/nifi/user_scripts/utils/codecs/cerner_blob.py index dbfc4332..3bf53d61 100644 --- a/nifi/user_scripts/utils/codecs/cerner_blob.py +++ b/nifi/user_scripts/utils/codecs/cerner_blob.py @@ -8,6 +8,8 @@ def __init__(self, _prefix: int = 0, _suffix: int = 0) -> None: class DecompressLzwCernerBlob: def __init__(self) -> None: self.MAX_CODES: int = 8192 + # Safety guard to avoid runaway prefix chains on corrupt inputs + self.MAX_PREFIX_EXPANSION: int = self.MAX_CODES * 4 self.tmp_decompression_buffer: list[int] = [0] * self.MAX_CODES self.lzw_lookup_table: list[LzwItem] = [LzwItem() for _ in range(self.MAX_CODES)] self.tmp_buffer_index: int = 0 @@ -17,15 +19,29 @@ def __init__(self) -> None: self.code_count: int = 257 self.output_stream = bytearray() + def _ensure_tmp_capacity(self, required_index: int) -> None: + if required_index < len(self.tmp_decompression_buffer): + return + new_len = max(required_index + 1, len(self.tmp_decompression_buffer) * 2) + self.tmp_decompression_buffer.extend([0] * (new_len - len(self.tmp_decompression_buffer))) + def save_to_lookup_table(self, compressed_code: int): self.tmp_buffer_index = -1 + steps = 0 while compressed_code >= 258: + if steps > self.MAX_PREFIX_EXPANSION: + raise ValueError( + "Invalid LZW stream: prefix chain too long (possible corrupt input)." + ) self.tmp_buffer_index += 1 + self._ensure_tmp_capacity(self.tmp_buffer_index) self.tmp_decompression_buffer[self.tmp_buffer_index] = \ self.lzw_lookup_table[compressed_code].suffix compressed_code = self.lzw_lookup_table[compressed_code].prefix + steps += 1 self.tmp_buffer_index += 1 + self._ensure_tmp_capacity(self.tmp_buffer_index) self.tmp_decompression_buffer[self.tmp_buffer_index] = compressed_code for i in reversed(list(range(self.tmp_buffer_index + 1))): From 42ea18e1d0811b0ee4487352373d712edea061f9 Mon Sep 17 00:00:00 2001 From: vladd-bit Date: Wed, 4 Feb 2026 10:33:17 +0000 Subject: [PATCH 04/11] NiFi scripts: Cerner blob decompression updates. 
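A minimal usage sketch of the decoder as it stands after this change (the
import path assumes nifi/user_scripts is on the script path, and decode_blob
is a hypothetical helper written only for illustration):

    from utils.codecs.cerner_blob import DecompressLzwCernerBlob

    def decode_blob(compressed: bytes) -> bytes:
        decoder = DecompressLzwCernerBlob()
        # decompress() raises ValueError on empty or truncated input and
        # leaves the decompressed bytes in output_stream
        decoder.decompress(bytearray(compressed))
        return bytes(decoder.output_stream)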
--- nifi/user_scripts/utils/codecs/cerner_blob.py | 80 ++++++++----------- 1 file changed, 34 insertions(+), 46 deletions(-) diff --git a/nifi/user_scripts/utils/codecs/cerner_blob.py b/nifi/user_scripts/utils/codecs/cerner_blob.py index 3bf53d61..b62a2363 100644 --- a/nifi/user_scripts/utils/codecs/cerner_blob.py +++ b/nifi/user_scripts/utils/codecs/cerner_blob.py @@ -8,8 +8,6 @@ def __init__(self, _prefix: int = 0, _suffix: int = 0) -> None: class DecompressLzwCernerBlob: def __init__(self) -> None: self.MAX_CODES: int = 8192 - # Safety guard to avoid runaway prefix chains on corrupt inputs - self.MAX_PREFIX_EXPANSION: int = self.MAX_CODES * 4 self.tmp_decompression_buffer: list[int] = [0] * self.MAX_CODES self.lzw_lookup_table: list[LzwItem] = [LzwItem() for _ in range(self.MAX_CODES)] self.tmp_buffer_index: int = 0 @@ -19,39 +17,29 @@ def __init__(self) -> None: self.code_count: int = 257 self.output_stream = bytearray() - def _ensure_tmp_capacity(self, required_index: int) -> None: - if required_index < len(self.tmp_decompression_buffer): - return - new_len = max(required_index + 1, len(self.tmp_decompression_buffer) * 2) - self.tmp_decompression_buffer.extend([0] * (new_len - len(self.tmp_decompression_buffer))) - def save_to_lookup_table(self, compressed_code: int): self.tmp_buffer_index = -1 - steps = 0 while compressed_code >= 258: - if steps > self.MAX_PREFIX_EXPANSION: - raise ValueError( - "Invalid LZW stream: prefix chain too long (possible corrupt input)." - ) self.tmp_buffer_index += 1 - self._ensure_tmp_capacity(self.tmp_buffer_index) - self.tmp_decompression_buffer[self.tmp_buffer_index] = \ - self.lzw_lookup_table[compressed_code].suffix + self.tmp_decompression_buffer[self.tmp_buffer_index] = self.lzw_lookup_table[compressed_code].suffix compressed_code = self.lzw_lookup_table[compressed_code].prefix - steps += 1 self.tmp_buffer_index += 1 - self._ensure_tmp_capacity(self.tmp_buffer_index) self.tmp_decompression_buffer[self.tmp_buffer_index] = compressed_code - for i in reversed(list(range(self.tmp_buffer_index + 1))): - self.output_stream.append(self.tmp_decompression_buffer[i]) + for i in reversed(range(self.tmp_buffer_index + 1)): + v = self.tmp_decompression_buffer[i] + if not (0 <= v <= 255): + raise ValueError(f"Invalid output byte {v} (expected 0..255)") + self.output_stream.append(v) def decompress(self, input_stream: bytearray): + if not input_stream: + raise ValueError("Empty input_stream") byte_buffer_index: int = 0 self.output_stream = bytearray() - + # used for bit shifts shift: int = 1 current_shift: int = 1 @@ -62,43 +50,43 @@ def decompress(self, input_stream: bytearray): skip_flag: bool = False - def read_current_byte() -> int: - if byte_buffer_index >= len(input_stream): - raise ValueError("Truncated input_stream while decoding LZW") - return input_stream[byte_buffer_index] - - if not input_stream: - return self.output_stream - if byte_buffer_index >= len(input_stream): - raise ValueError("Truncated input_stream while decoding LZW") - first_code = input_stream[byte_buffer_index] while True: if current_shift >= 9: - current_shift -= 8 - if first_code != 0: byte_buffer_index += 1 + if byte_buffer_index >= len(input_stream): + raise ValueError("Truncated input_stream") + middle_code = input_stream[byte_buffer_index] first_code = (first_code << (current_shift + 8)) | (middle_code << current_shift) byte_buffer_index += 1 + + if byte_buffer_index >= len(input_stream): + raise ValueError("Truncated input_stream") + middle_code = input_stream[byte_buffer_index] 
tmp_code = middle_code >> (8 - current_shift) lookup_index = first_code | tmp_code - skip_flag = True else: byte_buffer_index += 1 + if byte_buffer_index >= len(input_stream): + raise ValueError("Truncated input_stream") first_code = input_stream[byte_buffer_index] byte_buffer_index += 1 + if byte_buffer_index >= len(input_stream): + raise ValueError("Truncated input_stream") middle_code = input_stream[byte_buffer_index] else: byte_buffer_index += 1 + if byte_buffer_index >= len(input_stream): + raise ValueError("Truncated input_stream") middle_code = input_stream[byte_buffer_index] if not skip_flag: @@ -106,15 +94,16 @@ def read_current_byte() -> int: if lookup_index == 256: shift = 1 - current_shift += 1 - first_code = read_current_byte() - + current_shift = 1 + previous_code = 0 + skip_flag = False + self.tmp_decompression_buffer = [0] * self.MAX_CODES self.tmp_buffer_index = 0 - self.lzw_lookup_table = [LzwItem() for _ in range(self.MAX_CODES)] self.code_count = 257 - previous_code = 0 + + first_code = input_stream[byte_buffer_index] continue elif lookup_index == 257: # EOF marker @@ -125,19 +114,17 @@ def read_current_byte() -> int: # skipit part if previous_code == 0: self.tmp_decompression_buffer[0] = lookup_index + if lookup_index < self.code_count: self.save_to_lookup_table(lookup_index) if self.code_count < self.MAX_CODES: - self.lzw_lookup_table[self.code_count] = LzwItem( - previous_code, - self.tmp_decompression_buffer[self.tmp_buffer_index]) + self.lzw_lookup_table[self.code_count] = \ + LzwItem(previous_code, self.tmp_decompression_buffer[self.tmp_buffer_index]) self.code_count += 1 else: if self.code_count < self.MAX_CODES: - self.lzw_lookup_table[self.code_count] = LzwItem( - previous_code, - self.tmp_decompression_buffer[self.tmp_buffer_index] - ) + self.lzw_lookup_table[self.code_count] = \ + LzwItem(previous_code, self.tmp_decompression_buffer[self.tmp_buffer_index]) self.code_count += 1 self.save_to_lookup_table(lookup_index) # end of skipit @@ -148,4 +135,5 @@ def read_current_byte() -> int: if self.code_count in [511, 1023, 2047, 4095]: shift += 1 current_shift += 1 + previous_code = lookup_index From b4ef135583d30cef5a9f2f0fa083b91761872126 Mon Sep 17 00:00:00 2001 From: vladd-bit Date: Wed, 4 Feb 2026 12:53:27 +0000 Subject: [PATCH 05/11] NiFi: cerner processor added extra debug attrs. --- .../record_decompress_cerner_blob.py | 26 ++++++++++--------- 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/nifi/user_python_extensions/record_decompress_cerner_blob.py b/nifi/user_python_extensions/record_decompress_cerner_blob.py index eef08f9d..7bec6ebd 100644 --- a/nifi/user_python_extensions/record_decompress_cerner_blob.py +++ b/nifi/user_python_extensions/record_decompress_cerner_blob.py @@ -211,6 +211,20 @@ def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTr full_compressed_blob.extend(temporary_blob) + # build / add new attributes to dict before doing anything else to have some trace. 
+ + attributes: dict = {k: str(v) for k, v in flowFile.getAttributes().items()} + attributes["document_id_field_name"] = str(self.document_id_field_name) + attributes["document_id"] = str(output_merged_record.get(self.document_id_field_name, "")) + attributes["binary_field"] = str(self.binary_field_name) + attributes["output_text_field_name"] = str(self.output_text_field_name) + attributes["mime.type"] = "application/json" + attributes["blob_parts"] = str(len(order_of_blobs_keys)) + attributes["blob_seq_min"] = str(order_of_blobs_keys[0]) if order_of_blobs_keys else "" + attributes["blob_seq_max"] = str(order_of_blobs_keys[-1]) if order_of_blobs_keys else "" + attributes["compressed_len"] = str(len(full_compressed_blob)) + attributes["compressed_head_hex"] = bytes(full_compressed_blob[:16]).hex() + try: decompress_blob = DecompressLzwCernerBlob() decompress_blob.decompress(full_compressed_blob) @@ -225,18 +239,6 @@ def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTr output_contents.append(output_merged_record) - attributes: dict = {k: str(v) for k, v in flowFile.getAttributes().items()} - attributes["document_id_field_name"] = str(self.document_id_field_name) - attributes["document_id"] = str(output_merged_record.get(self.document_id_field_name, "")) - attributes["binary_field"] = str(self.binary_field_name) - attributes["output_text_field_name"] = str(self.output_text_field_name) - attributes["mime.type"] = "application/json" - attributes["blob_parts"] = str(len(order_of_blobs_keys)) - attributes["blob_seq_min"] = str(order_of_blobs_keys[0]) if order_of_blobs_keys else "" - attributes["blob_seq_max"] = str(order_of_blobs_keys[-1]) if order_of_blobs_keys else "" - attributes["compressed_len"] = str(len(full_compressed_blob)) - attributes["compressed_head_hex"] = bytes(full_compressed_blob[:16]).hex() - return FlowFileTransformResult(relationship=self.REL_SUCCESS, attributes=attributes, contents=json.dumps(output_contents).encode("utf-8")) From 4742785a8945e3a3b382634a43667f5af0eda988 Mon Sep 17 00:00:00 2001 From: vladd-bit Date: Wed, 4 Feb 2026 15:25:38 +0000 Subject: [PATCH 06/11] NiFI: cerner blob processor update. 
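The attribute precedence implemented by the extended build_failure_result
helper below, pulled out as a standalone sketch (merge_failure_attributes is
a hypothetical name used only for illustration):

    def merge_failure_attributes(pre_built: dict | None,
                                 flowfile_attributes: dict,
                                 exception: Exception,
                                 include_flowfile_attributes: bool = False) -> dict[str, str]:
        # pre-built trace attributes first, then optionally the FlowFile's
        # own attributes, and the exception summary always wins
        merged: dict[str, str] = {}
        if pre_built:
            merged.update({k: str(v) for k, v in pre_built.items()})
        if include_flowfile_attributes:
            merged.update({k: str(v) for k, v in flowfile_attributes.items()})
        name, message = type(exception).__name__, str(exception)
        merged["exception"] = f"{name}: {message}" if message else name
        return merged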
--- .../record_decompress_cerner_blob.py | 103 ++++++++++++++---- .../utils/nifi/base_nifi_processor.py | 29 +++-- 2 files changed, 102 insertions(+), 30 deletions(-) diff --git a/nifi/user_python_extensions/record_decompress_cerner_blob.py b/nifi/user_python_extensions/record_decompress_cerner_blob.py index 7bec6ebd..307e4189 100644 --- a/nifi/user_python_extensions/record_decompress_cerner_blob.py +++ b/nifi/user_python_extensions/record_decompress_cerner_blob.py @@ -112,6 +112,7 @@ def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTr """ output_contents: list = [] + attributes: dict = {k: str(v) for k, v in flowFile.getAttributes().items()} try: self.process_context = context @@ -134,18 +135,33 @@ def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTr records = json.loads(input_raw_bytes.decode("windows-1252")) except json.JSONDecodeError as e: self.logger.error(f"Error decoding JSON: {str(e)} \n with windows-1252") - raise + return self.build_failure_result( + flowFile, + ValueError(f"Error decoding JSON: {e}"), + attributes=attributes, + contents=input_raw_bytes, + ) if not isinstance(records, list): records = [records] if not records: - raise ValueError("No records found in JSON input") + return self.build_failure_result( + flowFile, + ValueError("No records found in JSON input"), + attributes=attributes, + contents=input_raw_bytes, + ) # sanity check: blobs are from the same document_id doc_ids: set = {str(r.get(self.document_id_field_name, "")) for r in records} if len(doc_ids) > 1: - raise ValueError(f"Multiple document IDs in one FlowFile: {list(doc_ids)[:5]}") + return self.build_failure_result( + flowFile, + ValueError(f"Multiple document IDs in one FlowFile: {list(doc_ids)}"), + attributes=attributes, + contents=input_raw_bytes, + ) concatenated_blob_sequence_order: dict = {} output_merged_record: dict = {} @@ -154,19 +170,35 @@ def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTr have_any_no_sequence: bool = any(self.blob_sequence_order_field_name not in record for record in records) if have_any_sequence and have_any_no_sequence: - raise ValueError( - f"Mixed records: some have '{self.blob_sequence_order_field_name}', some don't. " - "Cannot safely reconstruct blob stream." + return self.build_failure_result( + flowFile, + ValueError( + f"Mixed records: some have '{self.blob_sequence_order_field_name}', some don't. " + "Cannot safely reconstruct blob stream." 
+ ), + attributes=attributes, + contents=input_raw_bytes, ) for record in records: if self.binary_field_name not in record or record[self.binary_field_name] in (None, ""): - raise ValueError(f"Missing '{self.binary_field_name}' in a record") + return self.build_failure_result( + flowFile, + ValueError(f"Missing '{self.binary_field_name}' in a record"), + attributes=attributes, + contents=input_raw_bytes, + ) if have_any_sequence: seq = int(record[self.blob_sequence_order_field_name]) if seq in concatenated_blob_sequence_order: - raise ValueError(f"Duplicate {self.blob_sequence_order_field_name}: {seq}") + return self.build_failure_result( + flowFile, + ValueError(f"Invalid {self.blob_sequence_order_field_name}: {e}"), + attributes=attributes, + contents=input_raw_bytes, + ) + concatenated_blob_sequence_order[seq] = record[self.binary_field_name] else: # no sequence anywhere: preserve record order (0..n-1) @@ -185,35 +217,54 @@ def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTr order_of_blobs_keys = sorted(concatenated_blob_sequence_order.keys()) for i in range(1, len(order_of_blobs_keys)): if order_of_blobs_keys[i] != order_of_blobs_keys[i-1] + 1: - raise ValueError( - f"Sequence gap: missing {order_of_blobs_keys[i-1] + 1} " - f"(have {order_of_blobs_keys[i-1]} then {order_of_blobs_keys[i]})" + return self.build_failure_result( + flowFile, + ValueError(f"Missing '{self.binary_field_name}' in a record"), + attributes=attributes, + contents=input_raw_bytes, ) for k in order_of_blobs_keys: v = concatenated_blob_sequence_order[k] + temporary_blob = bytearray() + if self.binary_field_source_encoding == "base64": if not isinstance(v, str): - raise ValueError( - f"Expected base64 string in {self.binary_field_name} for part {k}, got {type(v)}" + return self.build_failure_result( + flowFile, + ValueError( + f"Expected base64 string in {self.binary_field_name} for part {k}, got {type(v)}" + ), + attributes=attributes, + contents=input_raw_bytes, ) try: temporary_blob = base64.b64decode(v, validate=True) except Exception as e: - raise ValueError(f"Error decoding base64 blob part {k}: {e}") from e + return self.build_failure_result( + flowFile, + ValueError(f"Error decoding base64 blob part {k}: {e}"), + attributes=attributes, + contents=input_raw_bytes, + ) else: # raw bytes path if isinstance(v, (bytes, bytearray)): temporary_blob = v else: - raise ValueError(f"Expected bytes in {self.binary_field_name} for part {k}, got {type(v)}") - + return self.build_failure_result( + flowFile, + ValueError( + f"Expected bytes in {self.binary_field_name} for part {k}, got {type(v)}" + ), + attributes=attributes, + contents=input_raw_bytes, + ) + full_compressed_blob.extend(temporary_blob) # build / add new attributes to dict before doing anything else to have some trace. 
- - attributes: dict = {k: str(v) for k, v in flowFile.getAttributes().items()} attributes["document_id_field_name"] = str(self.document_id_field_name) attributes["document_id"] = str(output_merged_record.get(self.document_id_field_name, "")) attributes["binary_field"] = str(self.binary_field_name) @@ -231,7 +282,13 @@ def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTr output_merged_record[self.binary_field_name] = bytes(decompress_blob.output_stream) except Exception as exception: self.logger.error(f"Error decompressing cerner blob: {str(exception)} \n") - raise exception + return self.build_failure_result( + flowFile, + exception, + attributes=attributes, + contents=locals().get("input_raw_bytes", flowFile.getContentsAsBytes()), + include_flowfile_attributes=False, + ) if self.output_mode == "base64": output_merged_record[self.binary_field_name] = \ @@ -244,4 +301,10 @@ def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTr contents=json.dumps(output_contents).encode("utf-8")) except Exception as exception: self.logger.error("Exception during flowfile processing: " + traceback.format_exc()) - raise exception + return self.build_failure_result( + flowFile, + exception, + attributes=attributes, + contents=locals().get("input_raw_bytes", flowFile.getContentsAsBytes()), + include_flowfile_attributes=False, + ) diff --git a/nifi/user_scripts/utils/nifi/base_nifi_processor.py b/nifi/user_scripts/utils/nifi/base_nifi_processor.py index 8e28ef3b..0f787485 100644 --- a/nifi/user_scripts/utils/nifi/base_nifi_processor.py +++ b/nifi/user_scripts/utils/nifi/base_nifi_processor.py @@ -164,7 +164,9 @@ def build_failure_result( flowFile: JavaObject, exception: Exception, *, + attributes: dict | None = None, include_flowfile_attributes: bool = False, + contents: bytes | bytearray | None = None, ) -> FlowFileTransformResult: """ Build a failure FlowFileTransformResult with exception metadata. @@ -172,7 +174,9 @@ def build_failure_result( Args: flowFile: The FlowFile being processed. exception: The exception raised during processing. - include_flowfile_attributes: If true, include all FlowFile attributes. + attributes: Optional pre-built attributes dict to use/extend. + include_flowfile_attributes: If true, merge in all FlowFile attributes. + contents: Optional override for contents; defaults to the incoming FlowFile contents. Returns: A FlowFileTransformResult targeting the failure relationship. 
@@ -180,19 +184,24 @@ def build_failure_result( exception_name = type(exception).__name__ exception_message = str(exception) - exception_value = ( - f"{exception_name}: {exception_message}" if exception_message else exception_name - ) + exception_value = f"{exception_name}: {exception_message}" if exception_message else exception_name + + merged_attributes: dict[str, str] = {} + if attributes: + merged_attributes.update({k: str(v) for k, v in attributes.items()}) - attributes = {} if include_flowfile_attributes: - attributes = {k: str(v) for k, v in flowFile.getAttributes().items()} - attributes["exception"] = exception_value + merged_attributes.update({k: str(v) for k, v in flowFile.getAttributes().items()}) + + merged_attributes["exception"] = exception_value + + if contents is None: + contents = flowFile.getContentsAsBytes() return FlowFileTransformResult( - relationship="failure", - attributes=attributes, - contents=flowFile.getContentsAsBytes(), + relationship=self.REL_FAILURE, + attributes=merged_attributes, + contents=contents, ) def onScheduled(self, context: ProcessContext) -> None: From 0433a1535c6a9a292d2c3ca3e083f846a23c8d7b Mon Sep 17 00:00:00 2001 From: vladd-bit Date: Wed, 4 Feb 2026 15:31:13 +0000 Subject: [PATCH 07/11] NiFI: base nifi proc update. --- nifi/user_scripts/utils/nifi/base_nifi_processor.py | 1 - 1 file changed, 1 deletion(-) diff --git a/nifi/user_scripts/utils/nifi/base_nifi_processor.py b/nifi/user_scripts/utils/nifi/base_nifi_processor.py index 0f787485..70e6b0bd 100644 --- a/nifi/user_scripts/utils/nifi/base_nifi_processor.py +++ b/nifi/user_scripts/utils/nifi/base_nifi_processor.py @@ -163,7 +163,6 @@ def build_failure_result( self, flowFile: JavaObject, exception: Exception, - *, attributes: dict | None = None, include_flowfile_attributes: bool = False, contents: bytes | bytearray | None = None, From b87164ee583b8232588e18de1e2af3afb4a7cd1c Mon Sep 17 00:00:00 2001 From: vladd-bit Date: Wed, 4 Feb 2026 15:40:27 +0000 Subject: [PATCH 08/11] Nifi: cerner processor corrections. 
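For reference, the sequence-gap check whose error message is corrected below
is equivalent to this standalone sketch (check_contiguous is a hypothetical
helper; the processor does the same check inline and routes the error to the
failure relationship instead of raising):

    def check_contiguous(parts: dict[int, object]) -> None:
        keys = sorted(parts)
        for prev, cur in zip(keys, keys[1:]):
            if cur != prev + 1:
                raise ValueError(
                    f"Sequence gap: missing {prev + 1} (have {prev} then {cur})"
                )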
--- .../record_decompress_cerner_blob.py | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/nifi/user_python_extensions/record_decompress_cerner_blob.py b/nifi/user_python_extensions/record_decompress_cerner_blob.py index 307e4189..9dc5b172 100644 --- a/nifi/user_python_extensions/record_decompress_cerner_blob.py +++ b/nifi/user_python_extensions/record_decompress_cerner_blob.py @@ -134,10 +134,9 @@ def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTr try: records = json.loads(input_raw_bytes.decode("windows-1252")) except json.JSONDecodeError as e: - self.logger.error(f"Error decoding JSON: {str(e)} \n with windows-1252") return self.build_failure_result( flowFile, - ValueError(f"Error decoding JSON: {e}"), + ValueError(f"Error decoding JSON: {str(e)} \n with windows-1252"), attributes=attributes, contents=input_raw_bytes, ) @@ -194,7 +193,7 @@ def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTr if seq in concatenated_blob_sequence_order: return self.build_failure_result( flowFile, - ValueError(f"Invalid {self.blob_sequence_order_field_name}: {e}"), + ValueError(f"Invalid {self.blob_sequence_order_field_name}: {seq}"), attributes=attributes, contents=input_raw_bytes, ) @@ -219,15 +218,18 @@ def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTr if order_of_blobs_keys[i] != order_of_blobs_keys[i-1] + 1: return self.build_failure_result( flowFile, - ValueError(f"Missing '{self.binary_field_name}' in a record"), + ValueError( + f"Sequence gap: missing {order_of_blobs_keys[i-1] + 1} " + f"(have {order_of_blobs_keys[i-1]} then {order_of_blobs_keys[i]})" + ), attributes=attributes, contents=input_raw_bytes, ) - + for k in order_of_blobs_keys: v = concatenated_blob_sequence_order[k] - temporary_blob = bytearray() + temporary_blob: bytes = b"" if self.binary_field_source_encoding == "base64": if not isinstance(v, str): @@ -286,7 +288,7 @@ def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTr flowFile, exception, attributes=attributes, - contents=locals().get("input_raw_bytes", flowFile.getContentsAsBytes()), + contents=input_raw_bytes, include_flowfile_attributes=False, ) From 7a0acda344bed733ec8c2a5b1487cd4176989ca9 Mon Sep 17 00:00:00 2001 From: vladd-bit Date: Wed, 4 Feb 2026 15:51:37 +0000 Subject: [PATCH 09/11] NiFi: base nifi proc updates. logging fixes for cerner decompression. 
--- .../record_decompress_cerner_blob.py | 7 +++---- nifi/user_scripts/utils/nifi/base_nifi_processor.py | 1 + 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/nifi/user_python_extensions/record_decompress_cerner_blob.py b/nifi/user_python_extensions/record_decompress_cerner_blob.py index 9dc5b172..95ce5865 100644 --- a/nifi/user_python_extensions/record_decompress_cerner_blob.py +++ b/nifi/user_python_extensions/record_decompress_cerner_blob.py @@ -193,7 +193,7 @@ def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTr if seq in concatenated_blob_sequence_order: return self.build_failure_result( flowFile, - ValueError(f"Invalid {self.blob_sequence_order_field_name}: {seq}"), + ValueError(f"Duplicate {self.blob_sequence_order_field_name}: {seq}"), attributes=attributes, contents=input_raw_bytes, ) @@ -283,13 +283,12 @@ def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTr decompress_blob.decompress(full_compressed_blob) output_merged_record[self.binary_field_name] = bytes(decompress_blob.output_stream) except Exception as exception: - self.logger.error(f"Error decompressing cerner blob: {str(exception)} \n") return self.build_failure_result( flowFile, exception, attributes=attributes, contents=input_raw_bytes, - include_flowfile_attributes=False, + include_flowfile_attributes=False ) if self.output_mode == "base64": @@ -308,5 +307,5 @@ def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTr exception, attributes=attributes, contents=locals().get("input_raw_bytes", flowFile.getContentsAsBytes()), - include_flowfile_attributes=False, + include_flowfile_attributes=False ) diff --git a/nifi/user_scripts/utils/nifi/base_nifi_processor.py b/nifi/user_scripts/utils/nifi/base_nifi_processor.py index 70e6b0bd..0f787485 100644 --- a/nifi/user_scripts/utils/nifi/base_nifi_processor.py +++ b/nifi/user_scripts/utils/nifi/base_nifi_processor.py @@ -163,6 +163,7 @@ def build_failure_result( self, flowFile: JavaObject, exception: Exception, + *, attributes: dict | None = None, include_flowfile_attributes: bool = False, contents: bytes | bytearray | None = None, From ade36b7cd9afdc7ae2d5df346f2d32f33be3114f Mon Sep 17 00:00:00 2001 From: vladd-bit Date: Wed, 4 Feb 2026 22:18:35 +0000 Subject: [PATCH 10/11] NiFI: cerner blob decompress revert last commir. 
--- nifi/user_scripts/utils/codecs/cerner_blob.py | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/nifi/user_scripts/utils/codecs/cerner_blob.py b/nifi/user_scripts/utils/codecs/cerner_blob.py index b62a2363..9596016c 100644 --- a/nifi/user_scripts/utils/codecs/cerner_blob.py +++ b/nifi/user_scripts/utils/codecs/cerner_blob.py @@ -1,4 +1,3 @@ - class LzwItem: def __init__(self, _prefix: int = 0, _suffix: int = 0) -> None: self.prefix = _prefix @@ -57,7 +56,7 @@ def decompress(self, input_stream: bytearray): current_shift -= 8 if first_code != 0: byte_buffer_index += 1 - if byte_buffer_index >= len(input_stream): + if byte_buffer_index >= len(input_stream): raise ValueError("Truncated input_stream") middle_code = input_stream[byte_buffer_index] @@ -66,7 +65,7 @@ def decompress(self, input_stream: bytearray): byte_buffer_index += 1 - if byte_buffer_index >= len(input_stream): + if byte_buffer_index >= len(input_stream): raise ValueError("Truncated input_stream") middle_code = input_stream[byte_buffer_index] @@ -76,16 +75,16 @@ def decompress(self, input_stream: bytearray): skip_flag = True else: byte_buffer_index += 1 - if byte_buffer_index >= len(input_stream): + if byte_buffer_index >= len(input_stream): raise ValueError("Truncated input_stream") first_code = input_stream[byte_buffer_index] byte_buffer_index += 1 - if byte_buffer_index >= len(input_stream): + if byte_buffer_index >= len(input_stream): raise ValueError("Truncated input_stream") middle_code = input_stream[byte_buffer_index] else: byte_buffer_index += 1 - if byte_buffer_index >= len(input_stream): + if byte_buffer_index >= len(input_stream): raise ValueError("Truncated input_stream") middle_code = input_stream[byte_buffer_index] @@ -97,7 +96,7 @@ def decompress(self, input_stream: bytearray): current_shift = 1 previous_code = 0 skip_flag = False - + self.tmp_decompression_buffer = [0] * self.MAX_CODES self.tmp_buffer_index = 0 self.lzw_lookup_table = [LzwItem() for _ in range(self.MAX_CODES)] From 0525cec05ac7be6155bc2d6eb18e9521c914ffa4 Mon Sep 17 00:00:00 2001 From: vladd-bit Date: Wed, 4 Feb 2026 23:22:36 +0000 Subject: [PATCH 11/11] NiFi scripts: error handling fix. 
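A standalone toy showing what the relaxed build_failure_result signature in
base_nifi_processor.py accepts after this change (build_failure_result_demo
is illustrative only, not the real method):

    def build_failure_result_demo(flowFile, exception, attributes=None,
                                  include_flowfile_attributes=False,
                                  contents=None, *args, **kwargs):
        return exception, attributes, contents

    # both call shapes that appear in record_decompress_cerner_blob.py bind:
    build_failure_result_demo("ff", ValueError("x"), attributes={}, contents=b"")
    build_failure_result_demo("ff", exception=ValueError("x"), attributes={},
                              include_flowfile_attributes=False, contents=b"")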
--- .../user_python_extensions/record_decompress_cerner_blob.py | 6 +++--- nifi/user_scripts/utils/nifi/base_nifi_processor.py | 5 +++-- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/nifi/user_python_extensions/record_decompress_cerner_blob.py b/nifi/user_python_extensions/record_decompress_cerner_blob.py index 95ce5865..718a8a14 100644 --- a/nifi/user_python_extensions/record_decompress_cerner_blob.py +++ b/nifi/user_python_extensions/record_decompress_cerner_blob.py @@ -285,10 +285,10 @@ def transform(self, context: ProcessContext, flowFile: JavaObject) -> FlowFileTr except Exception as exception: return self.build_failure_result( flowFile, - exception, + exception=exception, attributes=attributes, - contents=input_raw_bytes, - include_flowfile_attributes=False + include_flowfile_attributes=False, + contents=input_raw_bytes ) if self.output_mode == "base64": diff --git a/nifi/user_scripts/utils/nifi/base_nifi_processor.py b/nifi/user_scripts/utils/nifi/base_nifi_processor.py index 0f787485..f29478cf 100644 --- a/nifi/user_scripts/utils/nifi/base_nifi_processor.py +++ b/nifi/user_scripts/utils/nifi/base_nifi_processor.py @@ -163,10 +163,11 @@ def build_failure_result( self, flowFile: JavaObject, exception: Exception, - *, attributes: dict | None = None, include_flowfile_attributes: bool = False, contents: bytes | bytearray | None = None, + *args, + **kwargs, ) -> FlowFileTransformResult: """ Build a failure FlowFileTransformResult with exception metadata. @@ -201,7 +202,7 @@ def build_failure_result( return FlowFileTransformResult( relationship=self.REL_FAILURE, attributes=merged_attributes, - contents=contents, + contents=contents ) def onScheduled(self, context: ProcessContext) -> None:
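End to end, the series leaves the processor parsing the JSON record list,
ordering the blob parts by sequence number, checking for gaps, base64-decoding
and concatenating the parts, LZW-decompressing the result, and routing any
failure to the failure relationship with trace attributes. A condensed,
standalone sketch of that happy path with the NiFi plumbing stripped out
(the field names and helper name are placeholders, the real field names come
from processor properties, and the import path assumes nifi/user_scripts is
importable):

    import base64
    import json

    from utils.codecs.cerner_blob import DecompressLzwCernerBlob

    def reassemble_and_decompress(raw: bytes,
                                  binary_field: str = "blob_contents",
                                  sequence_field: str = "blob_seq_num") -> bytes:
        records = json.loads(raw.decode())
        if not isinstance(records, list):
            records = [records]

        # assumes every part carries a sequence number; the processor also
        # handles the unsequenced case by preserving record order
        parts = {int(r[sequence_field]): r[binary_field] for r in records}
        keys = sorted(parts)
        for prev, cur in zip(keys, keys[1:]):
            if cur != prev + 1:
                raise ValueError(f"Sequence gap: missing {prev + 1}")

        blob = bytearray()
        for k in keys:
            blob.extend(base64.b64decode(parts[k], validate=True))

        decoder = DecompressLzwCernerBlob()
        decoder.decompress(blob)
        return bytes(decoder.output_stream)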