Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions crates/hotfix/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ fn default_logon_timeout() -> u64 {
10
}

fn default_logout_timeout() -> u64 {
2
}

/// The configuration of a single FIX session.
#[derive(Clone, Debug, Deserialize)]
pub struct SessionConfig {
Expand Down Expand Up @@ -87,6 +91,10 @@ pub struct SessionConfig {
#[serde(default = "default_logon_timeout")]
pub logon_timeout: u64,

/// The time we wait in seconds for Logon responses before timing out.
#[serde(default = "default_logout_timeout")]
pub logout_timeout: u64,

/// The interval we should attempt to reconnect at in seconds.
#[serde(default = "default_reconnect_interval")]
pub reconnect_interval: u64,
Expand Down
1 change: 1 addition & 0 deletions crates/hotfix/src/message/verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ mod tests {
tls_config: None,
heartbeat_interval: 0,
logon_timeout: 0,
logout_timeout: 0,
reconnect_interval: 0,
reset_on_logon: false,
schedule: None,
Expand Down
24 changes: 14 additions & 10 deletions crates/hotfix/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,11 +293,8 @@ impl<A: Application<M>, M: FixMessage, S: MessageStore> Session<A, M, S> {
SessionState::Disconnected { .. } => {
warn!("disconnect message was received, but the session is already disconnected")
}
SessionState::AwaitingLogout { .. } => {
// this is unexpected because the other side should send a logout before disconnecting,
// which would move this session out of the ShuttingDown state
// TODO: is this actually true? need to review the spec carefully
warn!("disconnect message was received, but the session is still shutting down")
SessionState::AwaitingLogout { reconnect, .. } => {
self.state = SessionState::new_disconnected(reconnect, &reason);
}
}
}
Expand Down Expand Up @@ -782,8 +779,11 @@ impl<A: Application<M>, M: FixMessage, S: MessageStore> Session<A, M, S> {
self.state.disconnect_writer().await;
}

async fn initiate_graceful_logout(&mut self, reason: &str) {
if self.state.try_transition_to_awaiting_logout() {
async fn initiate_graceful_logout(&mut self, reason: &str, reconnect: bool) {
if self.state.try_transition_to_awaiting_logout(
Duration::from_secs(self.config.logout_timeout),
reconnect,
) {
self.send_logout(reason).await;
}
}
Expand Down Expand Up @@ -826,8 +826,8 @@ impl<A: Application<M>, M: FixMessage, S: MessageStore> Session<A, M, S> {
match request {
AdminRequest::InitiateGracefulShutdown { reconnect } => {
warn!("initiating shutdown on request from admin..");
self.logout_and_terminate("shutdown requested").await;
self.state = SessionState::new_disconnected(reconnect, "shutdown requested");
self.initiate_graceful_logout("explicitly requested", reconnect)
.await;
}
AdminRequest::RequestSessionInfo(responder) => {
info!("session info requested");
Expand All @@ -853,6 +853,9 @@ impl<A: Application<M>, M: FixMessage, S: MessageStore> Session<A, M, S> {
} else if self.state.is_awaiting_logon() {
warn!("peer didn't respond to our Logon, disconnecting..");
self.state.disconnect_writer().await;
} else if self.state.is_awaiting_logout() {
warn!("peer didn't respond to our Logout, disconnecting..");
self.state.disconnect_writer().await;
} else {
let req_id = format!("TEST_{}", self.store.next_target_seq_number());
info!("sending TestRequest due to peer timer expiring");
Expand Down Expand Up @@ -892,7 +895,8 @@ impl<A: Application<M>, M: FixMessage, S: MessageStore> Session<A, M, S> {
}
} else if self.state.is_connected() {
// we are currently outside scheduled session time
self.initiate_graceful_logout("End of session time").await;
self.initiate_graceful_logout("End of session time", true)
.await;
}

// we always need to reschedule the check, otherwise we won't be able to resume an inactive session
Expand Down
27 changes: 22 additions & 5 deletions crates/hotfix/src/session/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,11 @@ pub enum SessionState {
/// We are awaiting the target to resend the gap we have.
AwaitingResend(AwaitingResendState),
/// We are in the process of gracefully logging out
AwaitingLogout { writer: WriterRef }, // we need the writer so we can disconnect it on successful logout
AwaitingLogout {
writer: WriterRef, // we need the writer so we can disconnect it on successful logout
logout_timeout: Instant,
reconnect: bool, // we carry this forward for the subsequent disconnected state
},
/// The session is active, we have connected and mutually logged on.
Active(ActiveState),
/// The TCP connection has been dropped.
Expand Down Expand Up @@ -87,7 +91,7 @@ impl SessionState {
_ => error!("invalid outgoing message for AwaitingLogon state"),
}
}
Self::AwaitingLogout { writer } => {
Self::AwaitingLogout { writer, .. } => {
// Logout messages are allowed because we first transition into AwaitingLogout
// and only then send the logout message
if message_type == b"5" {
Expand All @@ -102,7 +106,7 @@ impl SessionState {
match self {
Self::Active(ActiveState { writer, .. })
| Self::AwaitingLogon { writer, .. }
| Self::AwaitingLogout { writer }
| Self::AwaitingLogout { writer, .. }
| Self::AwaitingResend(AwaitingResendState { writer, .. }) => writer.disconnect().await,
_ => debug!("disconnecting an already disconnected session"),
}
Expand All @@ -112,13 +116,17 @@ impl SessionState {
match self {
Self::Active(ActiveState { writer, .. })
| Self::AwaitingLogon { writer, .. }
| Self::AwaitingLogout { writer }
| Self::AwaitingLogout { writer, .. }
| Self::AwaitingResend(AwaitingResendState { writer, .. }) => Some(writer),
_ => None,
}
}

pub fn try_transition_to_awaiting_logout(&mut self) -> bool {
pub fn try_transition_to_awaiting_logout(
&mut self,
logout_timeout: Duration,
reconnect: bool,
) -> bool {
if matches!(self, SessionState::AwaitingLogout { .. }) {
debug!("already in awaiting logout state");
return false;
Expand All @@ -127,6 +135,8 @@ impl SessionState {
if let Some(writer) = self.get_writer() {
*self = SessionState::AwaitingLogout {
writer: writer.clone(),
logout_timeout: Instant::now() + logout_timeout,
reconnect,
};
true
} else {
Expand Down Expand Up @@ -220,6 +230,7 @@ impl SessionState {
match self {
Self::Active(ActiveState { peer_deadline, .. }) => Some(peer_deadline),
Self::AwaitingLogon { logon_timeout, .. } => Some(logon_timeout),
Self::AwaitingLogout { logout_timeout, .. } => Some(logout_timeout),
_ => None,
}
}
Expand Down Expand Up @@ -268,6 +279,10 @@ impl SessionState {
matches!(self, SessionState::AwaitingLogon { .. })
}

pub fn is_awaiting_logout(&self) -> bool {
matches!(self, SessionState::AwaitingLogout { .. })
}

pub fn as_status(&self) -> SessionInfoStatus {
match self {
SessionState::AwaitingLogon { .. } => SessionInfoStatus::AwaitingLogon,
Expand Down Expand Up @@ -427,6 +442,8 @@ mod tests {
fn test_awaiting_resend_transition_when_awaiting_logout_is_prevented() {
let mut state = SessionState::AwaitingLogout {
writer: create_writer_ref(),
logout_timeout: Instant::now(),
reconnect: false,
};

let result = state.try_transition_to_awaiting_resend(1, 5);
Expand Down
40 changes: 40 additions & 0 deletions crates/hotfix/tests/common/cleanup.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
use crate::common::assertions::{DEFAULT_TIMEOUT, assert_msg_type};
use crate::common::fakes::{FakeCounterparty, SessionSpy};
use crate::common::test_messages::TestMessage;
use hotfix::message::logout::Logout;
use hotfix_message::fix44::MsgType;

pub struct Finally<'a> {
session: &'a SessionSpy,
counterparty: &'a mut FakeCounterparty<TestMessage>,
}

pub fn finally<'a>(
session: &'a SessionSpy,
counterparty: &'a mut FakeCounterparty<TestMessage>,
) -> Finally<'a> {
Finally {
session,
counterparty,
}
}

impl<'a> Finally<'a> {
pub async fn disconnect(self) {
// initiate disconnect from our side
self.session.session_handle().shutdown(false).await.unwrap();

// counterparty receives our logout message
self.counterparty
.assert_next_with_timeout(|msg| assert_msg_type(msg, MsgType::Logout), DEFAULT_TIMEOUT)
.await;

// counterparty responds with logout acknowledgement
self.counterparty.send_message(Logout::default()).await;

// verify disconnection occurs
self.counterparty
.assert_disconnected_with_timeout(DEFAULT_TIMEOUT)
.await;
}
}
1 change: 1 addition & 0 deletions crates/hotfix/tests/common/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
pub mod actions;
pub mod assertions;
pub mod cleanup;
pub mod fakes;
pub mod setup;
pub mod test_messages;
2 changes: 2 additions & 0 deletions crates/hotfix/tests/common/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use hotfix_message::fix44::MSG_TYPE;

pub const HEARTBEAT_INTERVAL: u64 = 30;
pub const LOGON_TIMEOUT: u64 = 10;
pub const LOGOUT_TIMEOUT: u64 = 2;

pub const COUNTERPARTY_COMP_ID: &str = "dummy-acceptor";
pub const OUR_COMP_ID: &str = "dummy-initiator";
Expand Down Expand Up @@ -58,6 +59,7 @@ pub fn create_session_config() -> SessionConfig {
tls_config: None,
heartbeat_interval: HEARTBEAT_INTERVAL,
logon_timeout: LOGON_TIMEOUT,
logout_timeout: LOGOUT_TIMEOUT,
reconnect_interval: 30,
reset_on_logon: false,
schedule: None,
Expand Down
7 changes: 3 additions & 4 deletions crates/hotfix/tests/session_test_cases/admin_request_tests.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::common::actions::when;
use crate::common::assertions::{assert_msg_type, then};
use crate::common::cleanup::finally;
use crate::common::setup::given_an_active_session;
use hotfix::session::Status;
use hotfix_message::Part;
Expand Down Expand Up @@ -40,8 +41,7 @@ async fn test_reset_sequence_numbers_once() {
.expect("reset request to succeed");

// the counterparty is disconnected
when(&session).requests_disconnect().await;
then(&mut counterparty).gets_disconnected().await;
finally(&session, &mut counterparty).disconnect().await;

// a new connection is established to the counterparty
when(&mut counterparty).gets_reconnected(true).await;
Expand Down Expand Up @@ -70,6 +70,5 @@ async fn test_reset_sequence_numbers_once() {
"target sequence number should be 2 (after receiving logon)"
);

when(&session).requests_disconnect().await;
then(&mut counterparty).gets_disconnected().await;
finally(&session, &mut counterparty).disconnect().await;
}
4 changes: 2 additions & 2 deletions crates/hotfix/tests/session_test_cases/business_tests.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::common::actions::when;
use crate::common::assertions::then;
use crate::common::cleanup::finally;
use crate::common::setup::given_an_active_session;
use crate::common::test_messages::TestMessage;
use hotfix::message::FixMessage;
Expand Down Expand Up @@ -27,6 +28,5 @@ async fn test_new_order_single() {
.receives(|msg| assert_eq!(msg.message_type(), MsgType::ExecutionReport.to_string()))
.await;

when(&session).requests_disconnect().await;
then(&mut counterparty).gets_disconnected().await;
finally(&session, &mut counterparty).disconnect().await;
}
7 changes: 3 additions & 4 deletions crates/hotfix/tests/session_test_cases/heartbeat_tests.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::common::actions::when;
use crate::common::assertions::{assert_msg_type, then};
use crate::common::cleanup::finally;
use crate::common::setup::{HEARTBEAT_INTERVAL, given_an_active_session};
use hotfix::message::test_request::TestRequest;
use hotfix_message::Part;
Expand Down Expand Up @@ -27,8 +28,7 @@ async fn test_heartbeats() {
.receives(|msg| assert_msg_type(msg, MsgType::Heartbeat))
.await;

when(&session).requests_disconnect().await;
then(&mut counterparty).gets_disconnected().await;
finally(&session, &mut counterparty).disconnect().await;
}

/// Tests the peer timeout and disconnection mechanism:
Expand Down Expand Up @@ -83,6 +83,5 @@ async fn test_heartbeat_in_response_to_test_request() {
})
.await;

when(&session).requests_disconnect().await;
then(&mut counterparty).gets_disconnected().await;
finally(&session, &mut counterparty).disconnect().await;
}
Loading