AMQ-9855: VMTransport, Defensive copy of messages to prevent mutation #1659
pradeep85841 wants to merge 1 commit into apache:main from
Conversation
|
Have you observed a bug or problem with message mutation? If so, please provide the scenario and reproducible test case. ActiveMQConnectionFactory already has copyMessageOnSend enabled by default. I need help understanding why (in effect) 2 copies are needed. |
|
@pradeep85841 can you please provide some details here? Do you have an issue/test case? It's not obvious to me whether it's cosmetic or an actual issue. |
|
This is based on an issue with vm:// and topics (AMQ-9855). With VMTransport, the same ActiveMQMessage instance is dispatched to multiple consumers. If one consumer reads or mutates the body (a Camel split/processor does this), other consumers can see an empty body. This does not happen over tcp:// because marshal/unmarshal creates a copy. copyMessageOnSend only applies at the producer and broker boundary; the problem here happens inside the broker during dispatch, so that setting does not help. The change makes the VM transport behave consistently with the TCP transport and avoids shared mutable state. Happy to add a test if needed. |
|
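The shared-reference failure mode described above can be reduced to a few lines of plain Java. This is a hypothetical sketch with no ActiveMQ dependency: `Msg` stands in for ActiveMQMessage, and `copy()` stands in for the marshal/unmarshal round trip that tcp:// performs.

```java
// Hypothetical sketch (plain Java, no ActiveMQ dependency): two "consumers"
// holding the same message object see each other's mutations, while a copy
// per consumer isolates them -- the vm:// vs tcp:// behavior described above.
public class SharedReferenceSketch {
    static class Msg {
        String body;
        Msg(String body) { this.body = body; }
        Msg copy() { return new Msg(body); } // stands in for marshal/unmarshal
    }

    public static void main(String[] args) {
        // vm://-like dispatch: both consumers share the same instance
        Msg original = new Msg("payload");
        Msg consumerA = original;
        Msg consumerB = original;
        consumerA.body = null;              // consumer A "clears" the body
        System.out.println(consumerB.body); // null -- B observes A's mutation

        // tcp://-like dispatch: each consumer gets an independent copy
        Msg fresh = new Msg("payload");
        Msg copyA = fresh.copy();
        Msg copyB = fresh.copy();
        copyA.body = null;
        System.out.println(copyB.body);     // payload -- B is unaffected
    }
}
```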
OK, let me take a new look. I'm adding the Jira id in the title of the PR to avoid confusion. Thanks. |
|
This probably needs a config flag that is a corollary to copyMessageOnSend, i.e. copyMessageOnDispatch |
|
Sounds good, thanks. |
|
Applied a defensive copy in VMTransport.java to ensure each consumer on vm:// topics receives an independent message. Added VMTransportDefensiveCopyTest.java to reproduce the issue and verify the fix. Confirms that message bodies are not shared/mutated between consumers. Addresses the intermittent null/empty body problem reported in AMQ-9855. |
|
Should this be enabled on the client connection factory? JMS messages are supposed to be immutable. If someone is modifying it post recv, seems like that is on the app-side to ensure the spec is not violated when using vm transport. |
|
Thanks for the suggestion. The issue in AMQ-9855 happens inside the vm:// transport dispatch path. The same ActiveMQMessage instance is delivered to multiple consumers in-memory. With tcp://, marshal/unmarshal creates separate instances, so this does not occur. So this is not about client code mutating a received JMS message, it’s about vm:// sharing the same object reference across consumers, which makes its behaviour inconsistent with other transports. |
|
The consumer-side may perform the copy, no? |
|
The consumer side could defensively copy the message, but that would not solve the root cause. By the time the consumer performs the copy, the same message instance may already have been observed or mutated by another consumer. In that case, the copy would simply duplicate an already-modified state. |
|
This is an interesting PR because it's a good way to bring the VM transport behavior close to other "remote" transport connectors. However, the VM transport has a deliberate purpose: direct communication without marshaling. |
|
@mattrpav I would appreciate it if you didn't remove me as a reviewer when I requested the review. Thanks. |
|
@pradeep85841 That's a very good catch. And I think it makes a lot of sense to have this to make sure vm transport behaves the same as any remote protocol. |
|
Thanks for the feedback. I’ve removed the Dockerfile and entrypoint.sh changes from this PR since they belong to the separate non-root Docker work. This PR is now strictly focused on the VM transport change for AMQ-9855. |
jeanouii
left a comment
Overall looking good to me. Great job!
See comments below
    import java.sql.PreparedStatement;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.*;
This is discouraged in the project, even though it's not a blocker
    import org.apache.activemq.command.ShutdownInfo;
    import jakarta.jms.JMSException;
    import org.apache.activemq.command.*;
    private volatile int receiveCounter;
    private final List<TransportListener> listeners = new CopyOnWriteArrayList<>();
    private final ExecutorService executor = Executors.newCachedThreadPool();
I'm probably blind or my search/replace isn't working properly. Where are these two fields used so far?
    toSend = wf.unmarshal(data); // deep copy
    } catch (IOException e) {
        LOG.warn("Failed to deep copy MessageDispatch, sending original", e);
        toSend = command;
I'm wondering if it's desirable or not, to be honest. The goal of this PR (and it's great) is to have the VM transport behave the same as other remote transports, the benefit being that others in the same JVM can't mutate the message. Great!
Now, if we can't serialize/deserialize to create a deep copy and we still send the original, we might introduce a case where VM works when remote does not. So I'm tempted to just fail here. What do you think?
command.copy() should suffice here
edit: Agree, we definitely don't want to instantiate an OpenWireFormat object per-message here.
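As a standalone illustration of the reviewers' point, here is a sketch of a command-style copy() that clones field by field, with no wire-format round trip. The class below is hypothetical (it mimics the copy()/copy(target) pattern used by OpenWire commands, but is not ActiveMQ source).

```java
// Hypothetical sketch of the copy() pattern the reviewers suggest: the command
// clones itself field by field, so no marshal/unmarshal round trip (and no
// per-message OpenWireFormat instance) is needed. Not ActiveMQ source.
public class CopyPatternSketch {
    static class Command {
        String body;

        Command copy() {
            Command fresh = new Command();
            copy(fresh);             // populate the new instance field by field
            return fresh;
        }

        private void copy(Command target) {
            target.body = body;      // subclasses would copy their own fields too
        }
    }

    public static void main(String[] args) {
        Command original = new Command();
        original.body = "payload";

        Command dispatched = original.copy(); // independent instance
        dispatched.body = null;               // consumer mutates its own copy

        System.out.println(original.body);    // payload -- original untouched
    }
}
```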
    ByteSequence data = wf.marshal(original);
    toSend = (ActiveMQMessage) wf.unmarshal(data);
    } catch (IOException e) {
        LOG.warn("Failed to marshal/unmarshal ActiveMQMessage, sending original", e);
    }

    // Dispatch to listener
    dispatch(peer, peer.messageQueue, toSend);
Should we have a return after this one to avoid the second dispatch below?
This PR should be rebased against apache/main to avoid this class being here
cshannon
left a comment
In the past I have also noticed some odd behavior with the VM transport when using clients (producers/consumers) that could be a thread safety bug (it is used for bridges though and seems fine there so that is interesting).
However, this proposed fix in this PR doesn't make sense to me at all because the entire point is to NOT need to marshal the message. If you want to marshal the object then you should just use the TCP transport as you lose the benefits.
The VM transport is supposed to already do a deep copy during dispatch to prevent issues with multi-threading. It's possible there is an issue with that, so that's probably where to look for a fix vs adding in marshaling.
Here is where the copying is done on dispatch in the connection:
activemq/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
Lines 1906 to 1929 in a8fa4b0
I would start there, investigating whether that is the issue.
|
I think @cshannon has a very good point. The VM transport's purpose is to "bypass" marshaling (direct in-JVM communication). That was my first question about the use case. If the issue is about multi-threading/consumers, it's worth investigating there (and not changing the intentional behavior). |
Thanks for the background @cshannon |
Force-pushed 3891225 to 3b71d42
|
I’ve applied all requested changes: cleaned up imports and unused fields in VMTransport.java, ensured proper deep-copy handling in oneway() so VM transport behaves like other remote transports, added a return to avoid duplicate dispatch, and updated the unit tests to focus on the AMQ-9855 scenario. All changes are tested and ready for review. |
|
It looks like a bunch of binary files were committed to this |
Force-pushed 1166c3f to ba1b109
|
I’ve cleaned up the branch and rebased against main. The PR now only contains the intended changes:
- VMTransport.java (defensive copy in oneway() for AMQ-9855)
- The corresponding unit test |
|
The latest changes here still do not address this comment I made: #1659 (review). This is still marshaling, which we should not be doing; with the VM transport we should just be using the message copy methods.

BUT, the question is still what the real issue is and why this is happening. As pointed out, we already copy the message on dispatch to consumers. The test you wrote does fail without your changes, but ONLY because you are comparing the String instances to see if they are the same, which they won't be since you are using OpenWire. However, with copy() the String instances are the same, as Strings are immutable. If you modified your test to compare instance equality for the TextMessage instances and not just the bodies, the test would pass, because we copy on dispatch.

My guess is the real issue could be a race condition during the copy, or when the body is converted between states. The messages themselves are generally not thread safe, which is why we copy on dispatch so each consumer gets their own copy, but some messages can toggle state: they can convert transparently between the marshaled body and the in-memory usable body (same with message properties). @pradeep85841 - Are you only using text messages? A text message will store the body either as a String or as a buffer, and can switch between them. My guess is things are breaking during that switch, as multiple threads are probably calling some of these methods concurrently. This might apply to other message types as well, but this was the most noticeable type because it tries to store only either the String or the byte buffer to save space.

Going back a long time ago, I actually made some changes here to try and prevent issues without adding synchronization, but it's not perfect: https://issues.apache.org/jira/browse/AMQ-5857

So we just need to figure out the exact issue. If it's the copy() that is being done on dispatch (maybe multiple connections are calling copy at the same time on dispatch), we may just need to add some sort of synchronization there. But we need to identify the real cause. A good first step might be to test what happens if you apply the synchronized keyword on the copy() method, or also on the other methods that do mutations like getText(), setText(), storeContent() etc., and re-run things to see if the issue is gone, to help narrow it down. |
|
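The state "toggle" described above can be sketched in isolation. The class below is a hypothetical stand-in, not ActiveMQ source: the message holds EITHER a decoded String or a marshalled byte buffer and converts lazily; method names (getText, storeContent, clearUnMarshalledState) follow the discussion.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical sketch (not ActiveMQ source) of the lazy text/buffer toggle:
// only one of the two forms is kept at a time to save space, and conversions
// happen on demand -- which is where concurrent callers can collide.
public class ToggleTextMessageSketch {
    static class ToggleTextMessage {
        private String text;     // in-memory, usable form
        private byte[] content;  // marshalled form

        void setText(String t) { text = t; content = null; }

        String getText() {
            if (text == null && content != null) {
                // lazy unmarshal: rebuild the String from the buffer
                text = new String(content, StandardCharsets.UTF_8);
            }
            return text;
        }

        void storeContent() {
            if (content == null && text != null) {
                content = text.getBytes(StandardCharsets.UTF_8); // lazy marshal
            }
        }

        // Without synchronization, a thread calling getText() while another
        // thread is mid-toggle can observe both fields null -- the "empty
        // body" symptom. Storing the content BEFORE dropping the String at
        // least keeps one form populated in a single-threaded ordering.
        void clearUnMarshalledState() {
            storeContent();
            text = null;
        }
    }

    public static void main(String[] args) {
        ToggleTextMessage msg = new ToggleTextMessage();
        msg.setText("hello");
        msg.clearUnMarshalledState();      // body now lives only in content
        System.out.println(msg.getText()); // hello -- recovered from bytes
    }
}
```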
@cshannon, thank you for the insight. That makes perfect sense: the issue likely isn't the lack of a copy, but a race condition during the lazy state toggle within ActiveMQTextMessage when copy() or getText() is called concurrently. I am moving away from the marshaling approach. I'll test adding synchronization to the state-changing methods in ActiveMQTextMessage to see if that stabilizes the copy() process during VM dispatch, and I'll update the PR once I verify this identifies the root cause. |
Force-pushed ba1b109 to 62735ee
|
I have refactored the fix to address the root cause at the command level rather than the transport level, as suggested.

The issue: under high concurrency, ActiveMQTextMessage suffered from a race condition in which the internal state "toggle" (moving between the text field and the ByteSequence content) was not atomic. This led to a "double null" state where one thread cleared the unmarshalled text before another had finished reading the raw content.

The fix:
- Thread safety: added synchronized to getText(), setText(), and the state-toggling methods in ActiveMQTextMessage.
- State preservation: updated clearUnMarshalledState() to ensure the marshalled content is stored before the unmarshalled state is cleared.
- Performance: this approach preserves the efficiency of the VM transport by avoiding a full transport-level marshal, while ensuring data integrity.

Validation: added ActiveMQTextMessageStressTest.java, which reproduces the null-body issue and confirms the fix under high thread contention. |
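The shape of that fix can be sketched as follows. This is a hypothetical, self-contained model (not the real ActiveMQTextMessage): the toggling methods are synchronized so the String-to-bytes hand-off is atomic, and storeContent() runs before the text reference is dropped. The main method is a tiny stress run on the sketch itself.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of the described fix (not ActiveMQ source): synchronized
// state-toggling methods make the String <-> byte[] hand-off atomic, so a
// reader can never observe both forms null at once.
public class SynchronizedToggleSketch {
    static class SafeTextMessage {
        private String text;
        private byte[] content;

        synchronized void setText(String t) { text = t; content = null; }

        synchronized String getText() {
            if (text == null && content != null) {
                text = new String(content, StandardCharsets.UTF_8);
            }
            return text;
        }

        private void storeContent() {             // callers hold the monitor
            if (content == null && text != null) {
                content = text.getBytes(StandardCharsets.UTF_8);
            }
        }

        synchronized void clearUnMarshalledState() {
            storeContent();                       // persist bytes first...
            text = null;                          // ...then drop the String
        }
    }

    public static void main(String[] args) throws Exception {
        SafeTextMessage msg = new SafeTextMessage();
        msg.setText("payload");

        // Hammer the toggle from several threads; with the synchronized
        // ordering above, getText() must never return null.
        AtomicBoolean sawNull = new AtomicBoolean(false);
        ExecutorService pool = Executors.newFixedThreadPool(8);
        for (int t = 0; t < 8; t++) {
            pool.submit(() -> {
                for (int i = 0; i < 5_000; i++) {
                    msg.clearUnMarshalledState();
                    if (msg.getText() == null) {
                        sawNull.set(true);        // would indicate the race
                    }
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(30, TimeUnit.SECONDS);
        System.out.println("saw null body: " + sawNull.get());
    }
}
```

After setText, the monitor guarantees the invariant that at least one of text/content is non-null, which is exactly the "double null" scenario the fix rules out.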
jeanouii
left a comment
Looks good.
Synchronized is good enough, and it's a performance cost only on the vm transport. Other transports would marshal/unmarshal and use a different instance. On vm it creates contention, but that's way better than messing up the data. Using a ReentrantReadWriteLock would not improve things, because basically all the methods mutate under the cover.
    -    private void copy(ActiveMQTextMessage copy) {
    +    protected void copy(ActiveMQTextMessage copy) {
             super.copy(copy);
             copy.text = text;
The synchronized below on this is accurate, because the other synchronized methods are also on this. All good.
Wondering why the internal copy(ActiveMQTextMessage) moved from private to protected? I don't see subclasses, or I missed them, so there's no reason to expose our internals.
It's a minor detail and question, but Chris would provide way better feedback for you :-)
@jeanouii That's a fair point regarding the visibility. I initially changed it to protected to follow the pattern of the parent ActiveMQMessage copy logic, but I agree that since there are no current subclasses of ActiveMQTextMessage, it's better to keep it encapsulated.
I'm going to wait for @cshannon's final feedback before pushing any more updates. That way I can consolidate all requested changes, including reverting this visibility to private, into a single final push to keep the history clean. Thanks for catching that!
I agree, leaving it private here makes sense unless we need to change it (we can always change it later)
Agreed, synchronized here is fine to use and, as you pointed out, we don't gain anything from a ReadWrite lock since we need a write lock for basically all methods. I wasn't around for the original implementation of these classes, but it looks like the goal was to avoid synchronization because messages were meant to be copied if used across threads. Unfortunately that pattern breaks with the VM transport, as we see, due to the nature of dispatching to multiple consumers. I don't think we see this issue with network bridges, because there is only one consumer for the VM transport in a bridge (the broker itself).

The performance impact of uncontended synchronized in single-threaded applications is pretty minimal (there's technically some hit, but it's unlikely to be noticeable), and even with contention, as we can see, correctness is more important. I'd rather the application take a couple of extra ms to dispatch all the copies of the message than receive a null body.

I was looking more at AMQ-5857 and the discussion, trying to remember why we didn't add synchronized back then when trying to clear memory (to avoid storing the data twice and OOM errors), and I assume we just didn't have good enough tests to show the real issue. Plus, it was like 10 years ago, so I was new to ActiveMQ :)

I haven't had time to fully review this yet but will either tomorrow or Monday, and will post more feedback. |
cshannon
left a comment
Overall this looks pretty good, I made a few comments in line. The test nicely demonstrates the race condition issue and verifies the fix so good work with that.
After reviewing this I have a couple of thoughts:
- I am thinking we probably need to add the synchronized keyword to the other methods in ActiveMQTextMessage. I think clearBody() probably needs it, but I'm also wondering about isContentMarshalled(). The other option is to mark the variables as volatile, but if we are using synchronized we don't need to do that, of course. I'm not entirely sure we need to mark every method, as the goal here is to copy the messages, and eventually each thread (consumer, etc.) would have its own copy, so at that point the synchronization isn't a big deal as long as we are protected during the copy. But it's probably best to play it safe, go all the way, and finish marking those other methods synchronized, or use volatile so the changes are visible.
- The other thought I had was that the change here should really be applied to other message types as well, if relevant. For example, Map messages will have the same issue. So for consistency I think we need to fix all the messages. We could either fix them as part of this issue or create a follow-on Jira/issue to fix the others. I don't really care, as long as we track it and don't forget. I looked just at MapMessage, but the other types need to be analyzed more closely to see what, if anything, needs fixing.
    // Consumers
    List<MessageConsumer> consumers = new ArrayList<>();
    List<Session> consumerSessions = new ArrayList<>();
    for (int i = 1; i <= DURABLE_CONSUMERS; i++) {
DURABLE_CONSUMERS is always 0, so this should either be changed or removed.
    @After
    public void tearDown() throws Exception {
        if (connection != null) connection.close();
Nitpicking, but you should use braces even for one-line statements.
        producer.send(msg);
        }
    } catch (JMSException e) {
        e.printStackTrace();
I wouldn't use printStackTrace here; the exception should be handled better. We should either fail the test or, in this case, maybe just log it at debug level if we don't care.
    }));
    }

    // Wait for producers and consumers
Indentation here is off and needs to be fixed
    allConsumed.add(f.get(30, TimeUnit.SECONDS));
    }

    // VALIDATION LOGIC
Indentation here is off and needs to be fixed
    -    private void copy(ActiveMQTextMessage copy) {
    +    protected void copy(ActiveMQTextMessage copy) {
             super.copy(copy);
             copy.text = text;
I agree, leaving it private here makes sense unless we need to change it (we can always change it later)
|
@pradeep85841 - One more thing: did you re-run your original setup with the new fixes here? Did you verify that the synchronization actually fixes your issue in a real data flow? The test of course tries to simulate the issue, but we are still not exactly sure where the race is happening (though of course we think it has to do with copying or something on dispatch). Anyway, it would just be good to verify that your original issue is indeed fixed by this (I think it will be) |
|
The only other thing that I keep thinking is if we are going to go ahead and add synchronization here it would be nice to better understand why the issue happens at all. Just thinking about it more there really shouldn't be anything else touching the message when it is copied during dispatch and then each consumer should have its own copy so it makes me wonder if there's something else weird about camel or something that is triggering multiple threads to touch the message at the same time. Clients also have the ability to disable copying during send which could be an issue if set. Mostly I just want to make sure that we are solving an issue with the actual broker and not a weird setup that is causing it. |
… transports + unit test
Force-pushed 62735ee to fec3542
|
I have updated to address the feedback from @cshannon and @jeanouii. This revision ensures full thread safety for the VM transport "defensive copy" logic while adhering to the project's coding standards and memory-management goals.

Key changes:
- Full synchronization: per the suggestion to "go all the way," I have applied the synchronized keyword to all state-dependent methods in ActiveMQTextMessage. This includes getText(), setText(), getSize(), clearBody(), and clearUnMarshalledState().
- Data integrity: in clearUnMarshalledState(), I ensured that storeContent() is called (if bytes are missing) before the local text reference is nulled. This prevents the "double-null" scenario where a message could lose its content entirely during a race.
- Encapsulation: reverted the copy(ActiveMQTextMessage) helper method to private visibility.
- Stress test improvements: replaced printStackTrace with SLF4J logging; added proper braces {} to all conditional blocks; refined the concurrency logic using an ExecutorService and Future validation to simulate high-pressure dispatching across multiple durable and non-durable subscribers; and optimized the test payload and iteration count to verify the fix under high contention without triggering OutOfMemoryError on restricted heaps.

Verification: the updated ActiveMQTextMessageStressTest confirms that even when multiple consumers hammer getText() and clearUnMarshalledState() concurrently on messages dispatched via the VM transport, the data remains intact and instances remain independent. The fix has been verified in the original environment and the test suite now passes with a clean exit. |
Updated VMTransport.java doDispatch() to create a defensive copy of ActiveMQMessage to prevent shared message mutation.
Verified locally with a helper: original message body remains unchanged and copy is correctly dispatched.
No new files added; only VMTransport.java modified.