From e53d3c383eca8955b3d762152afcf06116810b20 Mon Sep 17 00:00:00 2001 From: "Tj (bougyman) Vanderpoel" Date: Wed, 6 Aug 2025 20:50:52 -0500 Subject: [PATCH 1/9] fix!: Big move to use kwargs as the initializer for classes which include us --- Gemfile.lock | 2 +- examples/echo_endpoint.rb | 6 +++--- lib/leopard/message_wrapper.rb | 1 + lib/leopard/nats_api_server.rb | 11 +++++++---- 4 files changed, 12 insertions(+), 8 deletions(-) diff --git a/Gemfile.lock b/Gemfile.lock index 104afbd..d37b6b3 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -1,7 +1,7 @@ PATH remote: . specs: - leopard (0.1.6) + leopard (0.1.7) concurrent-ruby (~> 1.1) dry-configurable (~> 1.3) dry-monads (~> 1.9) diff --git a/examples/echo_endpoint.rb b/examples/echo_endpoint.rb index 9774389..8f7c712 100755 --- a/examples/echo_endpoint.rb +++ b/examples/echo_endpoint.rb @@ -7,11 +7,11 @@ class EchoService include Rubyists::Leopard::NatsApiServer - def initialize(a_var = 1) + def initialize(a_var: 1) logger.info "EchoService initialized with a_var: #{a_var}" end - endpoint(:echo) { |msg| Success(msg.data) } + endpoint(:echo) { |msg| require 'pry'; binding.pry; Success(msg.data) } endpoint(:echo_fail) { |msg| Failure({ failure: '*boom*', data: msg.data }) } end @@ -21,7 +21,7 @@ def initialize(a_var = 1) service_opts: { name: 'example.echo', version: '1.0.0', - instance_args: [2], + instance_args: { a_var: 2 }, }, instances: 1, ) diff --git a/lib/leopard/message_wrapper.rb b/lib/leopard/message_wrapper.rb index e1638ff..40f24b8 100644 --- a/lib/leopard/message_wrapper.rb +++ b/lib/leopard/message_wrapper.rb @@ -26,6 +26,7 @@ def initialize(nats_msg) # # @return [void] def respond(payload) + raw.header = headers raw.respond(serialize(payload)) end diff --git a/lib/leopard/nats_api_server.rb b/lib/leopard/nats_api_server.rb index 582ccdc..f0a218b 100644 --- a/lib/leopard/nats_api_server.rb +++ b/lib/leopard/nats_api_server.rb @@ -98,6 +98,8 @@ def spawn_instances(url, opts, count, workers, blocking) pool = Concurrent::FixedThreadPool.new(count) @instance_args = opts.delete(:instance_args) || nil logger.info "Building #{count} workers with options: #{opts.inspect}, instance_args: #{@instance_args}" + raise ArgumentError, 'instance_args must be a Hash' if @instance_args && !@instance_args.is_a?(Hash) + count.times do pool.post { build_worker(url, opts, workers, blocking) } end @@ -112,12 +114,13 @@ def spawn_instances(url, opts, count, workers, blocking) # @param blocking [Boolean] If true, blocks the current thread until the worker is set up. # # @return [void] - def build_worker(url, opts, workers, blocking) - worker = @instance_args ? new(*@instance_args) : new + def build_worker(nats_url, service_opts, workers, blocking) + worker = @instance_args ? new(**@instance_args) : new workers << worker - return worker.setup_worker!(nats_url: url, service_opts: opts) if blocking + args = { nats_url:, service_opts: } + return worker.setup_worker!(**args) if blocking - worker.setup_worker(nats_url: url, service_opts: opts) + worker.setup_worker(**args) end # Shuts down the NATS API server gracefully. From 84cf037eae863c6637a65fced1030d75fd7ece7d Mon Sep 17 00:00:00 2001 From: "Tj (bougyman) Vanderpoel" Date: Wed, 6 Aug 2025 20:57:39 -0500 Subject: [PATCH 2/9] fix: Remove breakpoint (Thanks, Copilot) --- examples/echo_endpoint.rb | 2 +- lib/leopard/message_wrapper.rb | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/examples/echo_endpoint.rb b/examples/echo_endpoint.rb index 8f7c712..8d605ce 100755 --- a/examples/echo_endpoint.rb +++ b/examples/echo_endpoint.rb @@ -11,7 +11,7 @@ def initialize(a_var: 1) logger.info "EchoService initialized with a_var: #{a_var}" end - endpoint(:echo) { |msg| require 'pry'; binding.pry; Success(msg.data) } + endpoint(:echo) { |msg| Success(msg.data) } endpoint(:echo_fail) { |msg| Failure({ failure: '*boom*', data: msg.data }) } end diff --git a/lib/leopard/message_wrapper.rb b/lib/leopard/message_wrapper.rb index 40f24b8..cc7a5f9 100644 --- a/lib/leopard/message_wrapper.rb +++ b/lib/leopard/message_wrapper.rb @@ -10,10 +10,11 @@ class MessageWrapper # # @!attribute [r] data # @return [Object] The parsed data from the NATS message. + attr_reader :raw, :data # - # @!attribute [r] headers + # @!attribute [w] headers # @return [Hash] The headers from the NATS message. - attr_reader :raw, :data, :headers + attr_accessor :headers # @param nats_msg [NATS::Message] The NATS message to wrap. def initialize(nats_msg) From 6dfe5d94774d99411c9bb785b0514859a8d763b8 Mon Sep 17 00:00:00 2001 From: "Tj (bougyman) Vanderpoel" Date: Wed, 6 Aug 2025 21:11:08 -0500 Subject: [PATCH 3/9] fix: Do not set header if headers are empty --- lib/leopard/message_wrapper.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/leopard/message_wrapper.rb b/lib/leopard/message_wrapper.rb index cc7a5f9..3f1279b 100644 --- a/lib/leopard/message_wrapper.rb +++ b/lib/leopard/message_wrapper.rb @@ -27,7 +27,7 @@ def initialize(nats_msg) # # @return [void] def respond(payload) - raw.header = headers + raw.header = headers unless headers.empty? raw.respond(serialize(payload)) end From 9457fe665111effb76cf99ea5141cafb79d726df Mon Sep 17 00:00:00 2001 From: "Tj (bougyman) Vanderpoel" Date: Wed, 6 Aug 2025 21:18:17 -0500 Subject: [PATCH 4/9] fix: Mocks are why we cannot have good things --- test/lib/message_wrapper.rb | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/test/lib/message_wrapper.rb b/test/lib/message_wrapper.rb index d4c85c9..7c94d88 100644 --- a/test/lib/message_wrapper.rb +++ b/test/lib/message_wrapper.rb @@ -4,7 +4,8 @@ require 'leopard/message_wrapper' class FakeMsg - attr_reader :data, :header, :responded_payload, :error_args + attr_reader :data, :responded_payload, :error_args + attr_accessor :header def initialize(data, header = {}) @data = data From 40fb7b6069b07fed98707a27ebb3afa7ce8b4c95 Mon Sep 17 00:00:00 2001 From: "Tj (bougyman) Vanderpoel" Date: Wed, 6 Aug 2025 22:58:24 -0500 Subject: [PATCH 5/9] fix: Corrects documentation for new method signature --- lib/leopard/nats_api_server.rb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/leopard/nats_api_server.rb b/lib/leopard/nats_api_server.rb index f0a218b..b165dd1 100644 --- a/lib/leopard/nats_api_server.rb +++ b/lib/leopard/nats_api_server.rb @@ -108,8 +108,8 @@ def spawn_instances(url, opts, count, workers, blocking) # Builds a worker instance and sets it up with the NATS server. # - # @param url [String] The URL of the NATS server. - # @param opts [Hash] Options for the NATS service. + # @param nats_url [String] The URL of the NATS server. + # @param service_opts [Hash] Options for the NATS service. # @param workers [Array] The array to store worker instances. # @param blocking [Boolean] If true, blocks the current thread until the worker is set up. # From f5087d7cedd3c3963e3b54288270be8f03619c1f Mon Sep 17 00:00:00 2001 From: "Tj (bougyman) Vanderpoel" Date: Wed, 6 Aug 2025 23:46:31 -0500 Subject: [PATCH 6/9] fix: Cleaner assignment with ||= --- lib/leopard/nats_api_client.rb | 339 +++++++++++++++++++++++++++++++++ 1 file changed, 339 insertions(+) create mode 100644 lib/leopard/nats_api_client.rb diff --git a/lib/leopard/nats_api_client.rb b/lib/leopard/nats_api_client.rb new file mode 100644 index 0000000..383cd7d --- /dev/null +++ b/lib/leopard/nats_api_client.rb @@ -0,0 +1,339 @@ +# frozen_string_literal: true + +require 'nats/client' +require 'dry/monads' +require 'dry/configurable' +require 'concurrent' +require_relative '../leopard' +require_relative 'message_wrapper' + +module Rubyists + module Leopard + module NatsApiServer + include Dry::Monads[:result] + extend Dry::Monads[:result] + + def self.included(base) + base.extend(ClassMethods) + base.include(InstanceMethods) + base.extend(Dry::Monads[:result]) + base.extend(Dry::Configurable) + base.setting :logger, default: Rubyists::Leopard.logger, reader: true + end + + Endpoint = Struct.new(:name, :subject, :queue, :group, :handler) + + module ClassMethods + def endpoints = @endpoints ||= [] + def groups = @groups ||= {} + def middleware = @middleware ||= [] + + # Define an endpoint for the NATS API server. + # + # @param name [String] The name of the endpoint. + # @param subject [String, nil] The NATS subject to listen on. Defaults to the endpoint name. + # @param queue [String, nil] The NATS queue group to use. Defaults to nil. + # @param group [String, nil] The group this endpoint belongs to. Defaults to nil. + # @param handler [Proc] The block that will handle incoming messages. + # + # @return [void] + def endpoint(name, subject: nil, queue: nil, group: nil, &handler) + subject ||= name + endpoints << Endpoint.new(name:, subject:, queue:, group:, handler:) + end + + # Define a group for organizing endpoints. + # + # @param name [String] The name of the group. + # @param group [String, nil] The parent group this group belongs to. Defaults to nil. + # @param queue [String, nil] The NATS queue group to use for this group. Defaults to nil. + # + # @return [void] + def group(name, group: nil, queue: nil) + groups[name] = { name:, parent: group, queue: } + end + + # Use a middleware class for processing messages. + # + # @param klass [Class] The middleware class to use. + # @param args [Array] Optional arguments to pass to the middleware class. + # @param block [Proc] Optional block to pass to the middleware class. + # + # @return [void] + def use(klass, *args, &block) + middleware << [klass, args, block] + end + + # Start the NATS API server. + # This method connects to the NATS server and spawns multiple instances of the API server. + # + # @param nats_url [String] The URL of the NATS server to connect to. + # @param service_opts [Hash] Options for the NATS service. + # @param instances [Integer] The number of instances to spawn. Defaults to 1. + # @param blocking [Boolean] If false, does not block current thread after starting the server. Defaults to true. + # + # @return [void] + def run(nats_url:, service_opts:, instances: 1, blocking: true) + logger.info 'Booting NATS API server...' + workers = Concurrent::Array.new + pool = spawn_instances(nats_url, service_opts, instances, workers, blocking) + logger.info 'Setting up signal trap...' + trap_signals(workers, pool) + return pool unless blocking + + sleep + end + + private + + # Spawns multiple instances of the NATS API server. + # + # @param url [String] The URL of the NATS server. + # @param opts [Hash] Options for the NATS service. + # @param count [Integer] The number of instances to spawn. + # @param workers [Array] The array to store worker instances. + # @param blocking [Boolean] If false, does not block current thread after starting the server. + # + # @return [Concurrent::FixedThreadPool] The thread pool managing the worker threads. + def spawn_instances(url, opts, count, workers, blocking) + pool = Concurrent::FixedThreadPool.new(count) + @instance_args = opts.delete(:instance_args) || nil + logger.info "Building #{count} workers with options: #{opts.inspect}, instance_args: #{@instance_args}" + raise ArgumentError, 'instance_args must be a Hash' if @instance_args && !@instance_args.is_a?(Hash) + + count.times do + pool.post { build_worker(url, opts, workers, blocking) } + end + pool + end + + # Builds a worker instance and sets it up with the NATS server. + # + # @param nats_url [String] The URL of the NATS server. + # @param service_opts [Hash] Options for the NATS service. + # @param workers [Array] The array to store worker instances. + # @param blocking [Boolean] If true, blocks the current thread until the worker is set up. + # + # @return [void] + def build_worker(nats_url, service_opts, workers, blocking) + worker = @instance_args ? new(**@instance_args) : new + workers << worker + args = { nats_url:, service_opts: } + return worker.setup_worker!(**args) if blocking + + worker.setup_worker(**args) + end + + # Shuts down the NATS API server gracefully. + # + # @param workers [Array] The array of worker instances to stop. + # @param pool [Concurrent::FixedThreadPool] The thread pool managing the worker threads. + # + # @return [Proc] A lambda that performs the shutdown operations. + def shutdown(workers, pool) + lambda do + logger.warn 'Draining worker subscriptions...' + workers.each(&:stop) + logger.warn 'All workers stopped, shutting down pool...' + pool.shutdown + logger.warn 'Pool is shut down, waiting for termination...' + pool.wait_for_termination + logger.warn 'Bye bye!' + wake_main_thread + end + end + + # Sets up signal traps for graceful shutdown of the NATS API server. + # + # @param workers [Array] The array of worker instances to stop on signal. + # @param pool [Concurrent::FixedThreadPool] The thread pool managing the worker threads. + # + # @return [void] + def trap_signals(workers, pool) + return if @trapped + + %w[INT TERM QUIT].each do |sig| + trap(sig) do + logger.warn "Received #{sig} signal, shutting down..." + Thread.new { shutdown(workers, pool).call } + end + end + @trapped = true + end + + # Wakes up the main thread to allow it to continue execution after the server is stopped. + # This is useful when the server is running in a blocking mode. + # If the main thread is not blocked, this method does nothing. + # + # @return [void] + def wake_main_thread + Thread.main.wakeup + rescue ThreadError + nil + end + end + + module InstanceMethods + # Returns the logger configured for the NATS API server. + def logger = self.class.logger + + # Sets up a worker thread for the NATS API server. + # This method connects to the NATS server, adds the service, groups, and endpoints, + # + # @param url [String] The URL of the NATS server. + # @param opts [Hash] Options for the NATS service. + # @param eps [Array] The list of endpoints to add. + # @param gps [Hash] The groups to add. + # + # @return [void] + def setup_worker(nats_url: 'nats://localhost:4222', service_opts: {}) + @thread = Thread.current + @client = NATS.connect nats_url + @service = @client.services.add(build_service_opts(service_opts:)) + gps = self.class.groups.dup + eps = self.class.endpoints.dup + group_map = add_groups(gps) + add_endpoints eps, group_map + end + + # Sets up a worker thread for the NATS API server and blocks the current thread. + # + # @see #setup_worker + def setup_worker!(nats_url: 'nats://localhost:4222', service_opts: {}) + setup_worker(nats_url:, service_opts:) + sleep + end + + # Stops the NATS API server worker. + def stop + @service&.stop + @client&.close + @thread&.wakeup + rescue ThreadError + nil + end + + private + + # Builds the service options for the NATS service. + # + # @param service_opts [Hash] Options for the NATS service. + # + # @return [Hash] The complete service options including name and version. + def build_service_opts(service_opts:) + { + name: self.class.name.split('::').join('.'), + version: '0.1.0', + }.merge(service_opts) + end + + # Adds groups to the NATS service. + # + # @param gps [Hash] The groups to add, where keys are group names and values are group definitions. + # + # @return [Hash] A map of group names to their created group objects. + def add_groups(gps) + created = {} + gps.each_key { |name| build_group(gps, created, name) } + created + end + + # Builds a group in the NATS service. + # + # @param defs [Hash] The group definitions, where keys are group names and values are group definitions. + # @param cache [Hash] A cache to store already created groups. + # @param name [String] The name of the group to build. + # + # @return [NATS::Group] The created group object. + def build_group(defs, cache, name) + return cache[name] if cache.key?(name) + + gdef = defs[name] + raise ArgumentError, "Group #{name} not defined" unless gdef + + parent = gdef[:parent] ? build_group(defs, cache, gdef[:parent]) : @service + cache[name] = parent.groups.add(gdef[:name], queue: gdef[:queue]) + end + + # Adds endpoints to the NATS service. + # + # @param endpoints [Array] The list of endpoints to add. + # @param group_map [Hash] A map of group names to their created group objects. + # + # @return [void] + def add_endpoints(endpoints, group_map) + endpoints.each do |ep| + grp = ep.group + parent = grp ? group_map[grp] : @service + raise ArgumentError, "Group #{grp} not defined" if grp && parent.nil? + + build_endpoint(parent, ep) + end + end + + # Builds an endpoint in the NATS service. + # + # @param parent [NATS::Group] The parent group or service to add the endpoint to. + # @param ept [Endpoint] The endpoint definition containing name, subject, queue, and handler. + # NOTE: Named ept because `endpoint` is a DSL method we expose, to avoid confusion. + # + # @return [void] + def build_endpoint(parent, ept) + parent.endpoints.add(ept.name, subject: ept.subject, queue: ept.queue) do |raw_msg| + wrapper = MessageWrapper.new(raw_msg) + dispatch_with_middleware(wrapper, ept.handler) + end + end + + # Dispatches a message through the middleware stack and handles it with the provided handler. + # + # @param wrapper [MessageWrapper] The message wrapper containing the raw message. + # @param handler [Proc] The handler to process the message. + # + # @return [void] + def dispatch_with_middleware(wrapper, handler) + app = ->(w) { handle_message(w.raw, handler) } + self.class.middleware.reverse_each do |(klass, args, blk)| + app = klass.new(app, *args, &blk) + end + app.call(wrapper) + end + + # Handles a raw NATS message using the provided handler. + # + # @param raw_msg [NATS::Message] The raw NATS message to handle. + # @param handler [Proc] The handler to process the message. + # + # @return [void] + def handle_message(raw_msg, handler) + wrapper = MessageWrapper.new(raw_msg) + result = instance_exec(wrapper, &handler) + process_result(wrapper, result) + rescue StandardError => e + logger.error 'Error processing message: ', e + wrapper.respond_with_error(e.message) + end + + # Processes the result of the handler execution. + # + # @param wrapper [MessageWrapper] The message wrapper containing the raw message. + # @param result [Dry::Monads::Result] The result of the handler execution. + # + # @return [void] + # @raise [ResultError] If the result is not a Success or Failure monad. + def process_result(wrapper, result) + case result + in Dry::Monads::Success + wrapper.respond(result.value!) + in Dry::Monads::Failure + logger.error 'Error processing message: ', result.failure + wrapper.respond_with_error(result.failure) + else + logger.error('Unexpected result: ', result:) + raise ResultError, "Unexpected Response from Handler, must respond with a Success or Failure monad: #{result}" + end + end + end + end + end +end From 7b660a48200342f657fd1dee460b992a91f2f61c Mon Sep 17 00:00:00 2001 From: "Tj (bougyman) Vanderpoel" Date: Wed, 6 Aug 2025 23:48:59 -0500 Subject: [PATCH 7/9] docs: Corrects parameter documentation --- lib/leopard/nats_api_client.rb | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/lib/leopard/nats_api_client.rb b/lib/leopard/nats_api_client.rb index 383cd7d..0b4ed72 100644 --- a/lib/leopard/nats_api_client.rb +++ b/lib/leopard/nats_api_client.rb @@ -180,10 +180,8 @@ def logger = self.class.logger # Sets up a worker thread for the NATS API server. # This method connects to the NATS server, adds the service, groups, and endpoints, # - # @param url [String] The URL of the NATS server. - # @param opts [Hash] Options for the NATS service. - # @param eps [Array] The list of endpoints to add. - # @param gps [Hash] The groups to add. + # @param nats_url [String] The URL of the NATS server. + # @param service_opts [Hash] Options for the NATS service. # # @return [void] def setup_worker(nats_url: 'nats://localhost:4222', service_opts: {}) From c8122e32d756e9c3609327b5a70b9b38ab6e022b Mon Sep 17 00:00:00 2001 From: "Tj (bougyman) Vanderpoel" Date: Thu, 7 Aug 2025 07:10:57 -0500 Subject: [PATCH 8/9] fix: Removes stray copied client --- lib/leopard/nats_api_client.rb | 337 --------------------------------- 1 file changed, 337 deletions(-) delete mode 100644 lib/leopard/nats_api_client.rb diff --git a/lib/leopard/nats_api_client.rb b/lib/leopard/nats_api_client.rb deleted file mode 100644 index 0b4ed72..0000000 --- a/lib/leopard/nats_api_client.rb +++ /dev/null @@ -1,337 +0,0 @@ -# frozen_string_literal: true - -require 'nats/client' -require 'dry/monads' -require 'dry/configurable' -require 'concurrent' -require_relative '../leopard' -require_relative 'message_wrapper' - -module Rubyists - module Leopard - module NatsApiServer - include Dry::Monads[:result] - extend Dry::Monads[:result] - - def self.included(base) - base.extend(ClassMethods) - base.include(InstanceMethods) - base.extend(Dry::Monads[:result]) - base.extend(Dry::Configurable) - base.setting :logger, default: Rubyists::Leopard.logger, reader: true - end - - Endpoint = Struct.new(:name, :subject, :queue, :group, :handler) - - module ClassMethods - def endpoints = @endpoints ||= [] - def groups = @groups ||= {} - def middleware = @middleware ||= [] - - # Define an endpoint for the NATS API server. - # - # @param name [String] The name of the endpoint. - # @param subject [String, nil] The NATS subject to listen on. Defaults to the endpoint name. - # @param queue [String, nil] The NATS queue group to use. Defaults to nil. - # @param group [String, nil] The group this endpoint belongs to. Defaults to nil. - # @param handler [Proc] The block that will handle incoming messages. - # - # @return [void] - def endpoint(name, subject: nil, queue: nil, group: nil, &handler) - subject ||= name - endpoints << Endpoint.new(name:, subject:, queue:, group:, handler:) - end - - # Define a group for organizing endpoints. - # - # @param name [String] The name of the group. - # @param group [String, nil] The parent group this group belongs to. Defaults to nil. - # @param queue [String, nil] The NATS queue group to use for this group. Defaults to nil. - # - # @return [void] - def group(name, group: nil, queue: nil) - groups[name] = { name:, parent: group, queue: } - end - - # Use a middleware class for processing messages. - # - # @param klass [Class] The middleware class to use. - # @param args [Array] Optional arguments to pass to the middleware class. - # @param block [Proc] Optional block to pass to the middleware class. - # - # @return [void] - def use(klass, *args, &block) - middleware << [klass, args, block] - end - - # Start the NATS API server. - # This method connects to the NATS server and spawns multiple instances of the API server. - # - # @param nats_url [String] The URL of the NATS server to connect to. - # @param service_opts [Hash] Options for the NATS service. - # @param instances [Integer] The number of instances to spawn. Defaults to 1. - # @param blocking [Boolean] If false, does not block current thread after starting the server. Defaults to true. - # - # @return [void] - def run(nats_url:, service_opts:, instances: 1, blocking: true) - logger.info 'Booting NATS API server...' - workers = Concurrent::Array.new - pool = spawn_instances(nats_url, service_opts, instances, workers, blocking) - logger.info 'Setting up signal trap...' - trap_signals(workers, pool) - return pool unless blocking - - sleep - end - - private - - # Spawns multiple instances of the NATS API server. - # - # @param url [String] The URL of the NATS server. - # @param opts [Hash] Options for the NATS service. - # @param count [Integer] The number of instances to spawn. - # @param workers [Array] The array to store worker instances. - # @param blocking [Boolean] If false, does not block current thread after starting the server. - # - # @return [Concurrent::FixedThreadPool] The thread pool managing the worker threads. - def spawn_instances(url, opts, count, workers, blocking) - pool = Concurrent::FixedThreadPool.new(count) - @instance_args = opts.delete(:instance_args) || nil - logger.info "Building #{count} workers with options: #{opts.inspect}, instance_args: #{@instance_args}" - raise ArgumentError, 'instance_args must be a Hash' if @instance_args && !@instance_args.is_a?(Hash) - - count.times do - pool.post { build_worker(url, opts, workers, blocking) } - end - pool - end - - # Builds a worker instance and sets it up with the NATS server. - # - # @param nats_url [String] The URL of the NATS server. - # @param service_opts [Hash] Options for the NATS service. - # @param workers [Array] The array to store worker instances. - # @param blocking [Boolean] If true, blocks the current thread until the worker is set up. - # - # @return [void] - def build_worker(nats_url, service_opts, workers, blocking) - worker = @instance_args ? new(**@instance_args) : new - workers << worker - args = { nats_url:, service_opts: } - return worker.setup_worker!(**args) if blocking - - worker.setup_worker(**args) - end - - # Shuts down the NATS API server gracefully. - # - # @param workers [Array] The array of worker instances to stop. - # @param pool [Concurrent::FixedThreadPool] The thread pool managing the worker threads. - # - # @return [Proc] A lambda that performs the shutdown operations. - def shutdown(workers, pool) - lambda do - logger.warn 'Draining worker subscriptions...' - workers.each(&:stop) - logger.warn 'All workers stopped, shutting down pool...' - pool.shutdown - logger.warn 'Pool is shut down, waiting for termination...' - pool.wait_for_termination - logger.warn 'Bye bye!' - wake_main_thread - end - end - - # Sets up signal traps for graceful shutdown of the NATS API server. - # - # @param workers [Array] The array of worker instances to stop on signal. - # @param pool [Concurrent::FixedThreadPool] The thread pool managing the worker threads. - # - # @return [void] - def trap_signals(workers, pool) - return if @trapped - - %w[INT TERM QUIT].each do |sig| - trap(sig) do - logger.warn "Received #{sig} signal, shutting down..." - Thread.new { shutdown(workers, pool).call } - end - end - @trapped = true - end - - # Wakes up the main thread to allow it to continue execution after the server is stopped. - # This is useful when the server is running in a blocking mode. - # If the main thread is not blocked, this method does nothing. - # - # @return [void] - def wake_main_thread - Thread.main.wakeup - rescue ThreadError - nil - end - end - - module InstanceMethods - # Returns the logger configured for the NATS API server. - def logger = self.class.logger - - # Sets up a worker thread for the NATS API server. - # This method connects to the NATS server, adds the service, groups, and endpoints, - # - # @param nats_url [String] The URL of the NATS server. - # @param service_opts [Hash] Options for the NATS service. - # - # @return [void] - def setup_worker(nats_url: 'nats://localhost:4222', service_opts: {}) - @thread = Thread.current - @client = NATS.connect nats_url - @service = @client.services.add(build_service_opts(service_opts:)) - gps = self.class.groups.dup - eps = self.class.endpoints.dup - group_map = add_groups(gps) - add_endpoints eps, group_map - end - - # Sets up a worker thread for the NATS API server and blocks the current thread. - # - # @see #setup_worker - def setup_worker!(nats_url: 'nats://localhost:4222', service_opts: {}) - setup_worker(nats_url:, service_opts:) - sleep - end - - # Stops the NATS API server worker. - def stop - @service&.stop - @client&.close - @thread&.wakeup - rescue ThreadError - nil - end - - private - - # Builds the service options for the NATS service. - # - # @param service_opts [Hash] Options for the NATS service. - # - # @return [Hash] The complete service options including name and version. - def build_service_opts(service_opts:) - { - name: self.class.name.split('::').join('.'), - version: '0.1.0', - }.merge(service_opts) - end - - # Adds groups to the NATS service. - # - # @param gps [Hash] The groups to add, where keys are group names and values are group definitions. - # - # @return [Hash] A map of group names to their created group objects. - def add_groups(gps) - created = {} - gps.each_key { |name| build_group(gps, created, name) } - created - end - - # Builds a group in the NATS service. - # - # @param defs [Hash] The group definitions, where keys are group names and values are group definitions. - # @param cache [Hash] A cache to store already created groups. - # @param name [String] The name of the group to build. - # - # @return [NATS::Group] The created group object. - def build_group(defs, cache, name) - return cache[name] if cache.key?(name) - - gdef = defs[name] - raise ArgumentError, "Group #{name} not defined" unless gdef - - parent = gdef[:parent] ? build_group(defs, cache, gdef[:parent]) : @service - cache[name] = parent.groups.add(gdef[:name], queue: gdef[:queue]) - end - - # Adds endpoints to the NATS service. - # - # @param endpoints [Array] The list of endpoints to add. - # @param group_map [Hash] A map of group names to their created group objects. - # - # @return [void] - def add_endpoints(endpoints, group_map) - endpoints.each do |ep| - grp = ep.group - parent = grp ? group_map[grp] : @service - raise ArgumentError, "Group #{grp} not defined" if grp && parent.nil? - - build_endpoint(parent, ep) - end - end - - # Builds an endpoint in the NATS service. - # - # @param parent [NATS::Group] The parent group or service to add the endpoint to. - # @param ept [Endpoint] The endpoint definition containing name, subject, queue, and handler. - # NOTE: Named ept because `endpoint` is a DSL method we expose, to avoid confusion. - # - # @return [void] - def build_endpoint(parent, ept) - parent.endpoints.add(ept.name, subject: ept.subject, queue: ept.queue) do |raw_msg| - wrapper = MessageWrapper.new(raw_msg) - dispatch_with_middleware(wrapper, ept.handler) - end - end - - # Dispatches a message through the middleware stack and handles it with the provided handler. - # - # @param wrapper [MessageWrapper] The message wrapper containing the raw message. - # @param handler [Proc] The handler to process the message. - # - # @return [void] - def dispatch_with_middleware(wrapper, handler) - app = ->(w) { handle_message(w.raw, handler) } - self.class.middleware.reverse_each do |(klass, args, blk)| - app = klass.new(app, *args, &blk) - end - app.call(wrapper) - end - - # Handles a raw NATS message using the provided handler. - # - # @param raw_msg [NATS::Message] The raw NATS message to handle. - # @param handler [Proc] The handler to process the message. - # - # @return [void] - def handle_message(raw_msg, handler) - wrapper = MessageWrapper.new(raw_msg) - result = instance_exec(wrapper, &handler) - process_result(wrapper, result) - rescue StandardError => e - logger.error 'Error processing message: ', e - wrapper.respond_with_error(e.message) - end - - # Processes the result of the handler execution. - # - # @param wrapper [MessageWrapper] The message wrapper containing the raw message. - # @param result [Dry::Monads::Result] The result of the handler execution. - # - # @return [void] - # @raise [ResultError] If the result is not a Success or Failure monad. - def process_result(wrapper, result) - case result - in Dry::Monads::Success - wrapper.respond(result.value!) - in Dry::Monads::Failure - logger.error 'Error processing message: ', result.failure - wrapper.respond_with_error(result.failure) - else - logger.error('Unexpected result: ', result:) - raise ResultError, "Unexpected Response from Handler, must respond with a Success or Failure monad: #{result}" - end - end - end - end - end -end From 8b5d3b6c5122ab6eb08238800ac6457c5ff6afc9 Mon Sep 17 00:00:00 2001 From: "Tj (bougyman) Vanderpoel" Date: Thu, 7 Aug 2025 07:12:43 -0500 Subject: [PATCH 9/9] chore: Updates gem dependencies --- Gemfile.lock | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/Gemfile.lock b/Gemfile.lock index d37b6b3..96ee7e6 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -28,7 +28,7 @@ GEM dry-core (~> 1.1) zeitwerk (~> 2.6) io-console (0.8.1) - json (2.13.1) + json (2.13.2) language_server-protocol (3.17.0.5) lint_roller (1.1.0) logger (1.7.0) @@ -53,10 +53,10 @@ GEM racc (1.8.1) rainbow (3.1.1) rake (13.3.0) - regexp_parser (2.10.0) + regexp_parser (2.11.0) reline (0.6.2) io-console (~> 0.5) - rubocop (1.79.0) + rubocop (1.79.2) json (~> 2.3) language_server-protocol (~> 3.17.0.2) lint_roller (~> 1.1.0) @@ -66,7 +66,6 @@ GEM regexp_parser (>= 2.9.3, < 3.0) rubocop-ast (>= 1.46.0, < 2.0) ruby-progressbar (~> 1.7) - tsort (>= 0.2.0) unicode-display_width (>= 2.4.0, < 4.0) rubocop-ast (1.46.0) parser (>= 3.3.7.2) @@ -92,7 +91,6 @@ GEM simplecov_json_formatter (~> 0.1) simplecov-html (0.13.2) simplecov_json_formatter (0.1.4) - tsort (0.2.0) unicode-display_width (3.1.4) unicode-emoji (~> 4.0, >= 4.0.4) unicode-emoji (4.0.4)