Skip to content

Conversation

@awelless
Copy link
Contributor

@awelless awelless commented Dec 18, 2025

Summary

NIFI-15307

Waiting for initial initialization to complete in OnScheduled is interrupted by exceeding the timeout. This doesn't cancel resource creation on the AWS side, so after a few restarts the processor is working as expected.

To provide a better user experience, the processor waits only for 30 seconds for the initialization in OnScheduled. This should be enough when the needed AWS resources are in place.
If 30 seconds were not enough, the processor will wait in the onTrigger method.

The change was also tested manually.

Tracking

Please complete the following tracking steps prior to pull request creation.

Issue Tracking

Pull Request Tracking

  • Pull Request title starts with Apache NiFi Jira issue number, such as NIFI-00000
  • Pull Request commit message starts with Apache NiFi Jira issue number, as such NIFI-00000

Pull Request Formatting

  • Pull Request based on current revision of the main branch
  • Pull Request refers to a feature branch with one commit containing changes

Verification

Please indicate the verification steps performed prior to pull request creation.

Build

  • Build completed using ./mvnw clean install -P contrib-check
    • JDK 21
    • JDK 25

Licensing

  • New dependencies are compatible with the Apache License 2.0 according to the License Policy
  • New dependencies are documented in applicable LICENSE and NOTICE files

Documentation

  • Documentation formatting appears as expected in rendered files


@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
if (!initialized.get()) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Short-circuiting by checking a bool variable, so we don't have to inspect the content of the future after initialization completed.

Copy link
Contributor

@exceptionfactory exceptionfactory left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for adjusting the initialization approach @awelless. Although the initial long duration is a general concern, this adjusted approach is a helpful improvement. I recommend some minor implementation adjustments, and raised a concern on test timing.


@Test
// It takes around 30 seconds for a scheduler to fail in this test.
@Timeout(value = 3, unit = TimeUnit.MINUTES, threadMode = SEPARATE_THREAD)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is too long for a unit test. One option is moving this to an integration test. Another option is to remove the test method.

private volatile @Nullable ReaderRecordProcessor readerRecordProcessor;

private volatile Future<InitializationResult> initializationResultFuture;
private volatile AtomicBoolean initialized;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Recommend setting a final value to avoid null checking.

Suggested change
private volatile AtomicBoolean initialized;
private final AtomicBoolean initialized = new AtomicBoolean();

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants