Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@
*.db
/data/*
flamegraph.svg
tracing*
tracing*
lcov.info
54 changes: 28 additions & 26 deletions crates/hotfix/src/message/verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ const SENDING_TIME_THRESHOLD: u64 = 120;
pub(crate) fn verify_message(
message: &Message,
config: &SessionConfig,
expected_seq_number: u64,
expected_seq_number: Option<u64>,
) -> Result<(), MessageVerificationError> {
check_begin_string(message, config.begin_string.as_str())?;
let actual_seq_number: u64 = message.header().get(fix44::MSG_SEQ_NUM).unwrap_or_default();
Expand All @@ -33,7 +33,9 @@ pub(crate) fn verify_message(
check_original_sending_time(message, actual_seq_number, sending_time)?;
}

check_sequence_number(actual_seq_number, expected_seq_number, possible_duplicate)?;
if let Some(expected_seq_number) = expected_seq_number {
check_sequence_number(actual_seq_number, expected_seq_number, possible_duplicate)?;
}

Ok(())
}
Expand Down Expand Up @@ -217,7 +219,7 @@ mod tests {
let config = build_test_config();
let msg = build_test_message("FIX.4.4", "TARGET", "SENDER", 42);

let result = verify_message(&msg, &config, 42);
let result = verify_message(&msg, &config, Some(42));

assert!(result.is_ok());
}
Expand All @@ -227,7 +229,7 @@ mod tests {
let config = build_test_config();
let msg = build_test_message("FIX.4.2", "TARGET", "SENDER", 42);

let result = verify_message(&msg, &config, 42);
let result = verify_message(&msg, &config, Some(42));

assert!(matches!(
result,
Expand All @@ -243,7 +245,7 @@ mod tests {
let config = build_test_config();
let msg = build_test_message("FIX.4.4", "WRONG_SENDER", "SENDER", 42);

let result = verify_message(&msg, &config, 42);
let result = verify_message(&msg, &config, Some(42));

assert!(matches!(
result,
Expand All @@ -269,7 +271,7 @@ mod tests {
let config = build_test_config();
let msg = build_test_message("FIX.4.4", "TARGET", "WRONG_TARGET", 42);

let result = verify_message(&msg, &config, 42);
let result = verify_message(&msg, &config, Some(42));

assert!(matches!(
result,
Expand All @@ -295,7 +297,7 @@ mod tests {
let config = build_test_config();
let msg = build_test_message("FIX.4.4", "TARGET", "SENDER", 40);

let result = verify_message(&msg, &config, 42);
let result = verify_message(&msg, &config, Some(42));

assert!(matches!(
result,
Expand All @@ -321,7 +323,7 @@ mod tests {
msg.header_mut()
.set(fix44::ORIG_SENDING_TIME, Timestamp::utc_now());

let result = verify_message(&msg, &config, 42);
let result = verify_message(&msg, &config, Some(42));

assert!(matches!(
result,
Expand All @@ -344,7 +346,7 @@ mod tests {
let config = build_test_config();
let msg = build_test_message("FIX.4.4", "TARGET", "SENDER", 50);

let result = verify_message(&msg, &config, 42);
let result = verify_message(&msg, &config, Some(42));

assert!(matches!(
result,
Expand All @@ -363,7 +365,7 @@ mod tests {
msg.header_mut().set(fix44::POSS_DUP_FLAG, true);
// Don't set OrigSendingTime

let result = verify_message(&msg, &config, 42);
let result = verify_message(&msg, &config, Some(42));

assert!(matches!(
result,
Expand All @@ -388,7 +390,7 @@ mod tests {
msg.header_mut().pop(fix44::SENDING_TIME);
msg.header_mut().set(fix44::SENDING_TIME, sending_time);

let result = verify_message(&msg, &config, 42);
let result = verify_message(&msg, &config, Some(42));

assert!(result.is_ok());
}
Expand All @@ -407,7 +409,7 @@ mod tests {
msg.header_mut().pop(fix44::SENDING_TIME);
msg.header_mut().set(fix44::SENDING_TIME, sending_time);

let result = verify_message(&msg, &config, 42);
let result = verify_message(&msg, &config, Some(42));

assert!(matches!(
result,
Expand Down Expand Up @@ -437,7 +439,7 @@ mod tests {
msg.header_mut().pop(fix44::SENDING_TIME);
msg.header_mut().set(fix44::SENDING_TIME, timestamp);

let result = verify_message(&msg, &config, 42);
let result = verify_message(&msg, &config, Some(42));

// equal timestamps should be valid (orig <= sending)
assert!(result.is_ok());
Expand All @@ -455,7 +457,7 @@ mod tests {
// remove begin string, which is automatically added by `Message::new`
msg.header_mut().pop(fix44::BEGIN_STRING);

let result = verify_message(&msg, &config, 42);
let result = verify_message(&msg, &config, Some(42));

assert!(matches!(
result,
Expand All @@ -471,7 +473,7 @@ mod tests {
msg.set(fix44::MSG_SEQ_NUM, 42u64);
msg.set(fix44::SENDING_TIME, Timestamp::utc_now());

let result = verify_message(&msg, &config, 42);
let result = verify_message(&msg, &config, Some(42));

assert!(matches!(
result,
Expand All @@ -490,7 +492,7 @@ mod tests {
msg.set(fix44::MSG_SEQ_NUM, 42u64);
msg.set(fix44::SENDING_TIME, Timestamp::utc_now());

let result = verify_message(&msg, &config, 42);
let result = verify_message(&msg, &config, Some(42));

assert!(matches!(
result,
Expand All @@ -509,7 +511,7 @@ mod tests {
msg.set(fix44::TARGET_COMP_ID, "SENDER");
msg.set(fix44::SENDING_TIME, Timestamp::utc_now());

let result = verify_message(&msg, &config, 42);
let result = verify_message(&msg, &config, Some(42));

// missing seq num defaults to 0, which will be too low
assert!(matches!(
Expand All @@ -523,7 +525,7 @@ mod tests {
let config = build_test_config();
let msg = build_test_message("FIX.4.4", "TARGET", "SENDER", 0);

let result = verify_message(&msg, &config, 1);
let result = verify_message(&msg, &config, Some(1));

assert!(matches!(
result,
Expand All @@ -536,7 +538,7 @@ mod tests {
let config = build_test_config();
let msg = build_test_message("FIX.4.4", "TARGET", "SENDER", 1);

let result = verify_message(&msg, &config, 1);
let result = verify_message(&msg, &config, Some(1));

assert!(result.is_ok());
}
Expand All @@ -547,7 +549,7 @@ mod tests {
// wrong begin string AND wrong seq num - begin string error should come first
let msg = build_test_message("FIX.4.2", "TARGET", "SENDER", 100);

let result = verify_message(&msg, &config, 42);
let result = verify_message(&msg, &config, Some(42));

assert!(matches!(
result,
Expand All @@ -561,7 +563,7 @@ mod tests {
// wrong sender and wrong target - sender error should come first
let msg = build_test_message("FIX.4.4", "WRONG_SENDER", "WRONG_TARGET", 42);

let result = verify_message(&msg, &config, 42);
let result = verify_message(&msg, &config, Some(42));

assert!(matches!(
result,
Expand All @@ -580,7 +582,7 @@ mod tests {
msg.set(fix44::TARGET_COMP_ID, "SENDER");
msg.set(fix44::MSG_SEQ_NUM, 42u64);

let result = verify_message(&msg, &config, 42);
let result = verify_message(&msg, &config, Some(42));

assert!(matches!(
result,
Expand All @@ -607,7 +609,7 @@ mod tests {
let past_timestamp: Timestamp = past_time.naive_utc().into();
msg.set(fix44::SENDING_TIME, past_timestamp);

let result = verify_message(&msg, &config, 42);
let result = verify_message(&msg, &config, Some(42));

assert!(matches!(
result,
Expand All @@ -634,7 +636,7 @@ mod tests {
let future_timestamp: Timestamp = future_time.naive_utc().into();
msg.set(fix44::SENDING_TIME, future_timestamp);

let result = verify_message(&msg, &config, 42);
let result = verify_message(&msg, &config, Some(42));

assert!(matches!(
result,
Expand All @@ -661,7 +663,7 @@ mod tests {
let boundary_timestamp: Timestamp = boundary_time.naive_utc().into();
msg.set(fix44::SENDING_TIME, boundary_timestamp);

let result = verify_message(&msg, &config, 42);
let result = verify_message(&msg, &config, Some(42));

assert!(result.is_ok());
}
Expand All @@ -682,7 +684,7 @@ mod tests {
let valid_timestamp: Timestamp = valid_time.naive_utc().into();
msg.set(fix44::SENDING_TIME, valid_timestamp);

let result = verify_message(&msg, &config, 42);
let result = verify_message(&msg, &config, Some(42));

assert!(result.is_ok());
}
Expand Down
62 changes: 51 additions & 11 deletions crates/hotfix/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,6 @@ impl<A: Application<M>, M: FixMessage, S: MessageStore> Session<A, M, S> {
let message_type = message.header().get(fix44::MSG_TYPE)?;

if let SessionState::AwaitingResend(state) = &mut self.state {
// TODO: consider what messages won't have a sequence number?
// e.g. SequenceReset?
let seq_number: u64 = message
.header()
.get(fix44::MSG_SEQ_NUM)
Expand Down Expand Up @@ -216,7 +214,7 @@ impl<A: Application<M>, M: FixMessage, S: MessageStore> Session<A, M, S> {
}

async fn process_app_message(&mut self, message: &Message) -> Result<()> {
match self.verify_message(message) {
match self.verify_message(message, true) {
Ok(_) => {
let parsed_message = M::parse(message);
if matches!(
Expand Down Expand Up @@ -268,8 +266,14 @@ impl<A: Application<M>, M: FixMessage, S: MessageStore> Session<A, M, S> {
fn verify_message(
&self,
message: &Message,
verify_target_seq_number: bool,
) -> std::result::Result<(), MessageVerificationError> {
verify_message(message, &self.config, self.store.next_target_seq_number())
let expected_seq_number = if verify_target_seq_number {
Some(self.store.next_target_seq_number())
} else {
None
};
verify_message(message, &self.config, expected_seq_number)
}

async fn on_connect(&mut self, writer: WriterRef) {
Expand Down Expand Up @@ -300,9 +304,8 @@ impl<A: Application<M>, M: FixMessage, S: MessageStore> Session<A, M, S> {
}

async fn on_logon(&mut self, message: &Message) -> Result<()> {
// TODO: this should wait to see if a resend request is sent
if let SessionState::AwaitingLogon { writer, .. } = &self.state {
match self.verify_message(message) {
match self.verify_message(message, true) {
Ok(_) => {
// happy logon flow, the session is now active
self.state =
Expand Down Expand Up @@ -433,13 +436,50 @@ impl<A: Application<M>, M: FixMessage, S: MessageStore> Session<A, M, S> {
}

async fn on_sequence_reset(&mut self, message: &Message) -> Result<()> {
let gap_fill: bool = message.get(fix44::GAP_FILL_FLAG).unwrap();
if !gap_fill {
// TODO: non gap fill is valid as well of course, but I don't yet know the use-case for it is
panic!("expected sequence reset with gap fill");
let msg_seq_num = message
.header()
.get(fix44::MSG_SEQ_NUM)
.map_err(|_| anyhow!("failed to get seq number"))?;
let is_gap_fill: bool = message.get(fix44::GAP_FILL_FLAG).unwrap_or(false);
if let Err(err) = self.verify_message(message, is_gap_fill) {
self.handle_verification_error(err).await;
return Ok(());
}

let end: u64 = match message.get(fix44::NEW_SEQ_NO) {
Ok(new_seq_no) => new_seq_no,
Err(err) => {
error!(
"received sequence reset message without new sequence number: {:?}",
err
);
let reject = Reject::new(msg_seq_num)
.session_reject_reason(SessionRejectReason::RequiredTagMissing)
.text("missing NewSeqNo tag in sequence reset message");
self.send_message(reject).await;

// note: we don't increment the target seq number here
// this is an ambiguous case in the specification, but leaving the
// sequence number as is feels the safest
return Ok(());
}
};

// sequence resets cannot move the target seq number backwards
// regardless of whether the message is a gap fill or not
if end <= self.store.next_target_seq_number() {
error!(
"received sequence reset message which would move target seq number backwards: {end}",
);
let text =
format!("attempt to lower sequence number, invalid value NewSeqNo(36)={end}");
let reject = Reject::new(msg_seq_num)
.session_reject_reason(SessionRejectReason::ValueIsIncorrect)
.text(&text);
self.send_message(reject).await;
return Ok(());
}

let end: u64 = message.get(fix44::NEW_SEQ_NO).unwrap();
self.store.set_target_seq_number(end - 1).await
}

Expand Down
10 changes: 9 additions & 1 deletion crates/hotfix/tests/common/actions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,15 @@ impl When<&mut FakeCounterparty<TestMessage>> {
}

pub async fn sends_gap_fill(&mut self, start_seq_no: u64, new_seq_no: u64) {
self.target.send_gap_fill(start_seq_no, new_seq_no).await;
self.target
.send_sequence_reset(start_seq_no, new_seq_no, true)
.await;
}

pub async fn sends_sequence_reset(&mut self, start_seq_no: u64, new_seq_no: u64) {
self.target
.send_sequence_reset(start_seq_no, new_seq_no, false)
.await;
}

pub async fn sends_logon(&mut self) {
Expand Down
9 changes: 7 additions & 2 deletions crates/hotfix/tests/common/fakes/fake_counterparty.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,14 @@ where
}
}

pub async fn send_gap_fill(&mut self, start_seq_no: u64, new_seq_no: u64) {
pub async fn send_sequence_reset(
&mut self,
start_seq_no: u64,
new_seq_no: u64,
gap_fill: bool,
) {
let sequence_reset = SequenceReset {
gap_fill: true,
gap_fill,
new_seq_no,
};
let raw_message = generate_message(
Expand Down
11 changes: 11 additions & 0 deletions crates/hotfix/tests/common/test_messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -395,3 +395,14 @@ pub fn build_invalid_resend_request(

msg.encode(&Config::default()).unwrap()
}

pub fn build_sequence_reset_without_new_seq_no(msg_seq_num: u64) -> Vec<u8> {
let mut msg = Message::new("FIX.4.4", "4"); // MsgType 4 = SequenceReset
msg.set(fix44::SENDER_COMP_ID, COUNTERPARTY_COMP_ID);
msg.set(fix44::TARGET_COMP_ID, OUR_COMP_ID);
msg.set(fix44::MSG_SEQ_NUM, msg_seq_num);
msg.set(fix44::SENDING_TIME, Timestamp::utc_now());
// Deliberately omit NEW_SEQ_NO to create an invalid SequenceReset

msg.encode(&Config::default()).unwrap()
}
Loading