-
Notifications
You must be signed in to change notification settings - Fork 2.9k
NIFI-15307 ConsumeKinesis. Wait for long initialization in onTrigger #10664
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
NIFI-15307 ConsumeKinesis. Wait for long initialization in onTrigger #10664
Conversation
|
|
||
| @Override | ||
| public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { | ||
| if (!initialized.get()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Short-circuiting by checking a bool variable, so we don't have to inspect the content of the future after initialization completed.
exceptionfactory
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for adjusting the initialization approach @awelless. Although the initial long duration is a general concern, this adjusted approach is a helpful improvement. I recommend some minor implementation adjustments, and raised a concern on test timing.
|
|
||
| @Test | ||
| // It takes around 30 seconds for a scheduler to fail in this test. | ||
| @Timeout(value = 3, unit = TimeUnit.MINUTES, threadMode = SEPARATE_THREAD) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is too long for a unit test. One option is moving this to an integration test. Another option is to remove the test method.
| private volatile @Nullable ReaderRecordProcessor readerRecordProcessor; | ||
|
|
||
| private volatile Future<InitializationResult> initializationResultFuture; | ||
| private volatile AtomicBoolean initialized; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Recommend setting a final value to avoid null checking.
| private volatile AtomicBoolean initialized; | |
| private final AtomicBoolean initialized = new AtomicBoolean(); |
Summary
NIFI-15307
Waiting for initial initialization to complete in
OnScheduledis interrupted by exceeding the timeout. This doesn't cancel resource creation on the AWS side, so after a few restarts the processor is working as expected.To provide a better user experience, the processor waits only for 30 seconds for the initialization in
OnScheduled. This should be enough when the needed AWS resources are in place.If 30 seconds were not enough, the processor will wait in the
onTriggermethod.The change was also tested manually.
Tracking
Please complete the following tracking steps prior to pull request creation.
Issue Tracking
Pull Request Tracking
NIFI-00000NIFI-00000Pull Request Formatting
mainbranchVerification
Please indicate the verification steps performed prior to pull request creation.
Build
./mvnw clean install -P contrib-checkLicensing
LICENSEandNOTICEfilesDocumentation