Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 12 additions & 4 deletions lib/optimizely/decision_service.rb
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,9 @@ def get_variation(project_config, experiment_id, user_context, user_profile_trac

should_ignore_user_profile_service = decide_options.include? Optimizely::Decide::OptimizelyDecideOption::IGNORE_USER_PROFILE_SERVICE
# Check for saved bucketing decisions if decide_options do not include ignoreUserProfileService
unless should_ignore_user_profile_service && user_profile_tracker
# Skip UPS for CMAB experiments as they require dynamic decisions
is_cmab_experiment = experiment.key?('cmab')
unless should_ignore_user_profile_service || is_cmab_experiment || !user_profile_tracker
saved_variation_id, reasons_received = get_saved_variation_id(project_config, experiment_id, user_profile_tracker.user_profile)
decide_reasons.push(*reasons_received)
if saved_variation_id
Expand All @@ -111,6 +113,13 @@ def get_variation(project_config, experiment_id, user_context, user_profile_trac
end
end

# Add decision reason when UPS is skipped for CMAB
if is_cmab_experiment && !should_ignore_user_profile_service && user_profile_tracker
message = 'User Profile Service is not used for CMAB experiments.'
@logger.log(Logger::INFO, message)
decide_reasons.push(message)
end

# Check audience conditions
user_meets_audience_conditions, reasons_received = Audience.user_meets_audience_conditions?(project_config, experiment, user_context, @logger)
decide_reasons.push(*reasons_received)
Expand All @@ -132,8 +141,6 @@ def get_variation(project_config, experiment_id, user_context, user_profile_trac
return VariationResult.new(nil, true, decide_reasons, nil)
end

@logger.log(Logger::DEBUG, "Skipping user profile service for CMAB experiment '#{experiment_key}'. CMAB decisions are dynamic and not stored for sticky bucketing.")
should_ignore_user_profile_service = true
cmab_decision = cmab_decision_result.result
variation_id = cmab_decision&.variation_id
cmab_uuid = cmab_decision&.cmab_uuid
Expand All @@ -157,7 +164,8 @@ def get_variation(project_config, experiment_id, user_context, user_profile_trac
decide_reasons.push(message) if message

# Persist bucketing decision
user_profile_tracker.update_user_profile(experiment_id, variation_id) unless should_ignore_user_profile_service && user_profile_tracker
# Skip UPS for CMAB experiments as they require dynamic decisions
user_profile_tracker.update_user_profile(experiment_id, variation_id) unless should_ignore_user_profile_service || is_cmab_experiment || !user_profile_tracker
VariationResult.new(cmab_uuid, false, decide_reasons, variation_id)
end

Expand Down
52 changes: 13 additions & 39 deletions lib/optimizely/event/batch_event_processor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -172,35 +172,20 @@ def flush_queue!
return if @current_batch.empty?

log_event = Optimizely::EventFactory.create_log_event(@current_batch, @logger)
@logger.log(
Logger::INFO,
'Flushing Queue.'
)

retry_count = 0
max_retries = Optimizely::Helpers::Constants::EVENT_DISPATCH_CONFIG[:MAX_RETRIES]

while retry_count < max_retries
begin
@event_dispatcher.dispatch_event(log_event)
@notification_center&.send_notifications(
NotificationCenter::NOTIFICATION_TYPES[:LOG_EVENT],
log_event
)
# Success - break out of retry loop
break
rescue StandardError => e
@logger.log(Logger::ERROR, "Error dispatching event: #{log_event} #{e.message}.")
retry_count += 1

if retry_count < max_retries
delay = calculate_retry_interval(retry_count - 1)
@logger.log(Logger::DEBUG, "Retrying event dispatch (attempt #{retry_count + 1} of #{max_retries}) after #{delay}s")
sleep(delay)
end
end
begin
@logger.log(
Logger::INFO,
'Flushing Queue.'
)

@event_dispatcher.dispatch_event(log_event)
@notification_center&.send_notifications(
NotificationCenter::NOTIFICATION_TYPES[:LOG_EVENT],
log_event
)
rescue StandardError => e
@logger.log(Logger::ERROR, "Error dispatching event: #{log_event} #{e.message}.")
end

@current_batch = []
end

Expand Down Expand Up @@ -246,16 +231,5 @@ def positive_number?(value)
# false otherwise.
Helpers::Validator.finite_number?(value) && value.positive?
end

# Calculate exponential backoff interval: 200ms, 400ms, 800ms, ... capped at 1s
#
# @param retry_count - Zero-based retry count
# @return [Float] - Delay in seconds
def calculate_retry_interval(retry_count)
initial_interval = Helpers::Constants::EVENT_DISPATCH_CONFIG[:INITIAL_RETRY_INTERVAL]
max_interval = Helpers::Constants::EVENT_DISPATCH_CONFIG[:MAX_RETRY_INTERVAL]
interval = initial_interval * (2**retry_count)
[interval, max_interval].min
end
end
end
9 changes: 2 additions & 7 deletions lib/optimizely/helpers/constants.rb
Original file line number Diff line number Diff line change
Expand Up @@ -459,10 +459,7 @@ module Constants
}.freeze

EVENT_DISPATCH_CONFIG = {
REQUEST_TIMEOUT: 10,
MAX_RETRIES: 3,
INITIAL_RETRY_INTERVAL: 0.2, # 200ms in seconds
MAX_RETRY_INTERVAL: 1.0 # 1 second
REQUEST_TIMEOUT: 10
}.freeze

ODP_GRAPHQL_API_CONFIG = {
Expand Down Expand Up @@ -493,9 +490,7 @@ module Constants
DEFAULT_QUEUE_CAPACITY: 10_000,
DEFAULT_BATCH_SIZE: 10,
DEFAULT_FLUSH_INTERVAL_SECONDS: 1,
DEFAULT_RETRY_COUNT: 3,
INITIAL_RETRY_INTERVAL: 0.2, # 200ms in seconds
MAX_RETRY_INTERVAL: 1.0 # 1 second
DEFAULT_RETRY_COUNT: 3
}.freeze

HTTP_HEADERS = {
Expand Down
18 changes: 1 addition & 17 deletions lib/optimizely/odp/odp_event_manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -239,12 +239,7 @@ def flush_batch!
end
break unless should_retry

if i < @retry_count - 1
# Exponential backoff: 200ms, 400ms, 800ms, ... capped at 1s
delay = calculate_retry_interval(i)
@logger.log(Logger::DEBUG, "Error dispatching ODP events, retrying (attempt #{i + 2} of #{@retry_count}) after #{delay}s")
sleep(delay)
end
@logger.log(Logger::DEBUG, 'Error dispatching ODP events, scheduled to retry.') if i < @retry_count
i += 1
end

Expand Down Expand Up @@ -287,16 +282,5 @@ def process_config_update
@api_key = @odp_config&.api_key
@api_host = @odp_config&.api_host
end

# Calculate exponential backoff interval: 200ms, 400ms, 800ms, ... capped at 1s
#
# @param retry_count - Zero-based retry count
# @return [Float] - Delay in seconds
def calculate_retry_interval(retry_count)
initial_interval = Helpers::Constants::ODP_EVENT_MANAGER[:INITIAL_RETRY_INTERVAL]
max_interval = Helpers::Constants::ODP_EVENT_MANAGER[:MAX_RETRY_INTERVAL]
interval = initial_interval * (2**retry_count)
[interval, max_interval].min
end
end
end
55 changes: 14 additions & 41 deletions spec/decision_service_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -1167,8 +1167,8 @@
end
end

describe 'user profile service behavior' do
it 'should not save user profile for CMAB experiments' do
describe 'when user profile service is enabled' do
it 'should skip user profile service lookup and save for CMAB experiments' do
# Create a CMAB experiment configuration
cmab_experiment = {
'id' => '111150',
Expand All @@ -1189,8 +1189,9 @@
}
user_context = project_instance.create_user_context('test_user', {})

# Create a user profile tracker
user_profile_tracker = Optimizely::UserProfileTracker.new(user_context.user_id, spy_user_profile_service, spy_logger)
# Create user profile tracker
user_profile_service = Optimizely::UserProfileService.new
user_profile_tracker = Optimizely::UserProfileTracker.new('test_user', user_profile_service, spy_logger)

# Mock experiment lookup to return our CMAB experiment
allow(config).to receive(:get_experiment_from_id).with('111150').and_return(cmab_experiment)
Expand All @@ -1214,52 +1215,24 @@
.with('111150', '111151')
.and_return({'id' => '111151', 'key' => 'variation_1'})

# Spy on update_user_profile method
# Spy on user profile tracker methods
allow(user_profile_tracker).to receive(:update_user_profile).and_call_original
allow(decision_service).to receive(:get_saved_variation_id).and_call_original

# Call get_variation with the CMAB experiment and user profile tracker
variation_result = decision_service.get_variation(config, '111150', user_context, user_profile_tracker)

# Verify the variation and cmab_uuid are returned
expect(variation_result.variation_id).to eq('111151')
expect(variation_result.cmab_uuid).to eq('test-cmab-uuid-123')
expect(variation_result.error).to eq(false)

# Verify user profile was NOT updated for CMAB experiment
expect(user_profile_tracker).not_to have_received(:update_user_profile)

# Verify debug log was called to explain CMAB exclusion
expect(spy_logger).to have_received(:log).with(
Logger::DEBUG,
"Skipping user profile service for CMAB experiment 'cmab_experiment'. CMAB decisions are dynamic and not stored for sticky bucketing."
)
end

it 'should save user profile for standard (non-CMAB) experiments' do
# Use a standard (non-CMAB) experiment
config.get_experiment_from_key('test_experiment')
user_context = project_instance.create_user_context('test_user', {})

# Create a user profile tracker
user_profile_tracker = Optimizely::UserProfileTracker.new(user_context.user_id, spy_user_profile_service, spy_logger)

# Mock audience evaluation to pass
allow(Optimizely::Audience).to receive(:user_meets_audience_conditions?).and_return([true, []])

# Mock bucketer to return a variation
allow(decision_service.bucketer).to receive(:bucket)
.and_return([{'id' => '111129', 'key' => 'variation'}, []])

# Spy on update_user_profile method
allow(user_profile_tracker).to receive(:update_user_profile).and_call_original

# Call get_variation with standard experiment and user profile tracker
variation_result = decision_service.get_variation(config, '111127', user_context, user_profile_tracker)
# Verify UPS lookup was NOT called for CMAB
expect(decision_service).not_to have_received(:get_saved_variation_id)

# Verify variation was returned
expect(variation_result.variation_id).to eq('111129')
# Verify UPS save was NOT called for CMAB
expect(user_profile_tracker).not_to have_received(:update_user_profile)

# Verify user profile WAS updated for standard experiment
expect(user_profile_tracker).to have_received(:update_user_profile).with('111127', '111129')
# Verify the UPS exclusion message is in reasons
expect(variation_result.reasons).to include('User Profile Service is not used for CMAB experiments.')
end
end
end
Expand Down
93 changes: 1 addition & 92 deletions spec/event/batch_event_processor_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -293,11 +293,9 @@
@event_processor.flush
# Wait until other thread has processed the event.
sleep 0.1 until @event_processor.current_batch.empty?
sleep 0.7 # Wait for retries to complete (200ms + 400ms + processing time)

expect(@notification_center).not_to have_received(:send_notifications)
# With retries, error will be logged 3 times (once per attempt)
expect(spy_logger).to have_received(:log).exactly(3).times.with(
expect(spy_logger).to have_received(:log).once.with(
Logger::ERROR,
"Error dispatching event: #{log_event} Timeout::Error."
)
Expand Down Expand Up @@ -379,93 +377,4 @@
expect(@event_processor.event_queue.length).to eq(0)
expect(spy_logger).to have_received(:log).with(Logger::WARN, 'Executor shutdown, not accepting tasks.').once
end

context 'retry logic with exponential backoff' do
it 'should retry on dispatch errors with exponential backoff' do
@event_processor = Optimizely::BatchEventProcessor.new(
event_dispatcher: @event_dispatcher,
batch_size: 1,
flush_interval: 10_000,
logger: spy_logger
)

user_event = Optimizely::UserEventFactory.create_conversion_event(project_config, event, 'test_user', nil, nil)
log_event = Optimizely::EventFactory.create_log_event(user_event, spy_logger)

# Simulate dispatch failure twice, then success
call_count = 0
allow(@event_dispatcher).to receive(:dispatch_event) do
call_count += 1
raise StandardError, 'Network error' if call_count < 3
end

start_time = Time.now
@event_processor.process(user_event)

# Wait for processing to complete
sleep 0.1 until @event_processor.event_queue.empty?
sleep 0.7 # Wait for retries to complete (200ms + 400ms + processing time)

elapsed_time = Time.now - start_time

# Should make 3 attempts total (1 initial + 2 retries)
expect(@event_dispatcher).to have_received(:dispatch_event).with(log_event).exactly(3).times

# Should have delays: 200ms + 400ms = 600ms minimum
expect(elapsed_time).to be >= 0.6

# Should log retry attempts
expect(spy_logger).to have_received(:log).with(
Logger::DEBUG, /Retrying event dispatch/
).at_least(:twice)
end

it 'should give up after max retries' do
@event_processor = Optimizely::BatchEventProcessor.new(
event_dispatcher: @event_dispatcher,
batch_size: 1,
flush_interval: 10_000,
logger: spy_logger
)

user_event = Optimizely::UserEventFactory.create_conversion_event(project_config, event, 'test_user', nil, nil)
log_event = Optimizely::EventFactory.create_log_event(user_event, spy_logger)

# Simulate dispatch failure every time
allow(@event_dispatcher).to receive(:dispatch_event).and_raise(StandardError, 'Network error')

@event_processor.process(user_event)

# Wait for processing to complete
sleep 0.1 until @event_processor.event_queue.empty?
sleep 0.7 # Wait for all retries to complete

# Should make 3 attempts total (1 initial + 2 retries)
expect(@event_dispatcher).to have_received(:dispatch_event).with(log_event).exactly(3).times

# Should log error for each attempt
expect(spy_logger).to have_received(:log).with(
Logger::ERROR, /Error dispatching event/
).exactly(3).times
end

it 'should calculate correct exponential backoff intervals' do
processor = Optimizely::BatchEventProcessor.new

# First retry: 200ms
expect(processor.send(:calculate_retry_interval, 0)).to eq(0.2)

# Second retry: 400ms
expect(processor.send(:calculate_retry_interval, 1)).to eq(0.4)

# Third retry: 800ms
expect(processor.send(:calculate_retry_interval, 2)).to eq(0.8)

# Fourth retry: capped at 1s
expect(processor.send(:calculate_retry_interval, 3)).to eq(1.0)

# Fifth retry: still capped at 1s
expect(processor.send(:calculate_retry_interval, 4)).to eq(1.0)
end
end
end
Loading
Loading