From 80906027868a646243207600172abf5eab672410 Mon Sep 17 00:00:00 2001 From: Erim Icel Date: Mon, 27 Apr 2026 18:01:24 +0100 Subject: [PATCH 1/3] fix(api_call): protect node rotation and health state with a mutex `@current_node_index` and per-node `is_healthy`/`last_access_timestamp` were unsynchronised. With multiple threads sharing one client (Puma, Sidekiq), two threads in `next_node` could compute the same incremented index from the same starting value (skipping a node), and a healthcheck write could be observed mid-update with a stale timestamp. Wraps the round-robin loop in `next_node` and the writes in `set_node_healthcheck` in a single per-instance Mutex. The mutex is acquired briefly (no I/O inside it), so contention is negligible. Adds regression specs covering concurrent multi-node rotation and concurrent single-node health updates. --- lib/typesense/api_call.rb | 24 ++++++++------ spec/typesense/api_call_spec.rb | 57 +++++++++++++++++++++++++++++++++ 2 files changed, 72 insertions(+), 9 deletions(-) diff --git a/lib/typesense/api_call.rb b/lib/typesense/api_call.rb index 003b6a8..8714db0 100644 --- a/lib/typesense/api_call.rb +++ b/lib/typesense/api_call.rb @@ -22,6 +22,7 @@ def initialize(configuration) @logger = @configuration.logger + @nodes_mutex = Mutex.new initialize_metadata_for_nodes @current_node_index = -1 end @@ -141,14 +142,17 @@ def next_node # Fallback to nodes as usual @logger.debug "Nodes health: #{@nodes.each_with_index.map { |node, i| "Node #{i} is #{node[:is_healthy] == true ? 
'Healthy' : 'Unhealthy'}" }.join(' || ')}" - candidate_node = nil - (0..@nodes.length).each do |_i| - @current_node_index = (@current_node_index + 1) % @nodes.length - candidate_node = @nodes[@current_node_index] - if candidate_node[:is_healthy] == true || node_due_for_healthcheck?(candidate_node) - @logger.debug "Updated current node to Node #{candidate_node[:index]}" - return candidate_node + candidate_node = @nodes_mutex.synchronize do + node = nil + (0..@nodes.length).each do |_i| + @current_node_index = (@current_node_index + 1) % @nodes.length + node = @nodes[@current_node_index] + if node[:is_healthy] == true || node_due_for_healthcheck?(node) + @logger.debug "Updated current node to Node #{node[:index]}" + return node + end end + node end # None of the nodes are marked healthy, but some of them could have become healthy since last health check. @@ -175,8 +179,10 @@ def initialize_metadata_for_nodes end def set_node_healthcheck(node, is_healthy:) - node[:is_healthy] = is_healthy - node[:last_access_timestamp] = Time.now.to_i + @nodes_mutex.synchronize do + node[:is_healthy] = is_healthy + node[:last_access_timestamp] = Time.now.to_i + end end def custom_exception_klass_for(response) diff --git a/spec/typesense/api_call_spec.rb b/spec/typesense/api_call_spec.rb index 5e35313..720a873 100644 --- a/spec/typesense/api_call_spec.rb +++ b/spec/typesense/api_call_spec.rb @@ -258,4 +258,61 @@ it_behaves_like 'General error handling', :delete it_behaves_like 'Node selection', :delete end + + describe 'concurrent node rotation' do + it 'distributes selection evenly across nodes when called from many threads' do + thread_count = 16 + iterations_per_thread = 90 + num_nodes = typesense.configuration.nodes.length + + counts = Array.new(num_nodes, 0) + counts_mutex = Mutex.new + + threads = Array.new(thread_count) do + Thread.new do + local_counts = Array.new(num_nodes, 0) + iterations_per_thread.times do + node = api_call.send(:next_node) + local_counts[node[:index]] += 1 
+ end + counts_mutex.synchronize do + local_counts.each_with_index { |c, i| counts[i] += c } + end + end + end + threads.each(&:join) + + expected_per_node = (thread_count * iterations_per_thread) / num_nodes + expect(counts).to all(eq(expected_per_node)) + end + + context 'with a single node' do + let(:typesense) do + Typesense::Client.new( + api_key: 'abcd', + nodes: [{ host: 'node0', port: 8108, protocol: 'http' }], + connection_timeout_seconds: 10, + retry_interval_seconds: 0.01, + log_level: Logger::ERROR + ) + end + + it 'returns the single node and keeps health state consistent under concurrent writes' do + node = typesense.configuration.nodes[0] + + threads = Array.new(8) do |i| + Thread.new do + 50.times do + api_call.send(:set_node_healthcheck, node, is_healthy: i.even?) + api_call.send(:next_node) + end + end + end + threads.each(&:join) + + expect(node[:is_healthy]).to be(true).or be(false) + expect(node[:last_access_timestamp]).to be_a(Integer) + end + end + end end From 1b3d4ffc2d18035ca07855f07ee5bdc7577f84a0 Mon Sep 17 00:00:00 2001 From: Erim Icel Date: Wed, 29 Apr 2026 10:14:07 +0100 Subject: [PATCH 2/3] Hold nodes mutex across all of next_node and add concurrent call tests for nodes --- lib/typesense/api_call.rb | 43 ++++++++++---------- spec/typesense/api_call_spec.rb | 71 +++++++++++++++++++++++++++++++++ 2 files changed, 92 insertions(+), 22 deletions(-) diff --git a/lib/typesense/api_call.rb b/lib/typesense/api_call.rb index 8714db0..1389e1a 100644 --- a/lib/typesense/api_call.rb +++ b/lib/typesense/api_call.rb @@ -130,35 +130,34 @@ def uri_for(endpoint, node) # But if no healthy nodes are found, it will just return the next node, even if it's unhealthy # so we can try the request for good measure, in case that node has become healthy since def next_node - # Check if nearest_node is set and is healthy, if so return it - unless @nearest_node.nil? - @logger.debug "Nodes health: Node #{@nearest_node[:index]} is #{@nearest_node[:is_healthy] == true ? 
'Healthy' : 'Unhealthy'}" - if @nearest_node[:is_healthy] == true || node_due_for_healthcheck?(@nearest_node) - @logger.debug "Updated current node to Node #{@nearest_node[:index]}" - return @nearest_node + @nodes_mutex.synchronize do + # Check if nearest_node is set and is healthy, if so return it + unless @nearest_node.nil? + @logger.debug "Nodes health: Node #{@nearest_node[:index]} is #{@nearest_node[:is_healthy] == true ? 'Healthy' : 'Unhealthy'}" + if @nearest_node[:is_healthy] == true || node_due_for_healthcheck?(@nearest_node) + @logger.debug "Updated current node to Node #{@nearest_node[:index]}" + return @nearest_node + end + @logger.debug 'Falling back to individual nodes' end - @logger.debug 'Falling back to individual nodes' - end - # Fallback to nodes as usual - @logger.debug "Nodes health: #{@nodes.each_with_index.map { |node, i| "Node #{i} is #{node[:is_healthy] == true ? 'Healthy' : 'Unhealthy'}" }.join(' || ')}" - candidate_node = @nodes_mutex.synchronize do - node = nil + # Fallback to nodes as usual + @logger.debug "Nodes health: #{@nodes.each_with_index.map { |node, i| "Node #{i} is #{node[:is_healthy] == true ? 'Healthy' : 'Unhealthy'}" }.join(' || ')}" + candidate_node = nil (0..@nodes.length).each do |_i| @current_node_index = (@current_node_index + 1) % @nodes.length - node = @nodes[@current_node_index] - if node[:is_healthy] == true || node_due_for_healthcheck?(node) - @logger.debug "Updated current node to Node #{node[:index]}" - return node + candidate_node = @nodes[@current_node_index] + if candidate_node[:is_healthy] == true || node_due_for_healthcheck?(candidate_node) + @logger.debug "Updated current node to Node #{candidate_node[:index]}" + return candidate_node end end - node - end - # None of the nodes are marked healthy, but some of them could have become healthy since last health check. - # So we will just return the next node. - @logger.debug "No healthy nodes were found. 
Returning the next node, Node #{candidate_node[:index]}" - candidate_node + # None of the nodes are marked healthy, but some of them could have become healthy since last health check. + # So we will just return the next node. + @logger.debug "No healthy nodes were found. Returning the next node, Node #{candidate_node[:index]}" + candidate_node + end end def node_due_for_healthcheck?(node) diff --git a/spec/typesense/api_call_spec.rb b/spec/typesense/api_call_spec.rb index 720a873..d3848cc 100644 --- a/spec/typesense/api_call_spec.rb +++ b/spec/typesense/api_call_spec.rb @@ -286,6 +286,37 @@ expect(counts).to all(eq(expected_per_node)) end + it 'never returns a node held unhealthy while next_node is called concurrently' do + unhealthy_node = api_call.instance_variable_get(:@nodes)[1] + api_call.send(:set_node_healthcheck, unhealthy_node, is_healthy: false) + + threads = Array.new(8) do + Thread.new do + Array.new(200) { api_call.send(:next_node)[:index] } + end + end + + results = threads.flat_map(&:value) + expect(results).not_to include(1) + expect(results).to include(0).and include(2) + end + + it 'still returns a node when every node is unhealthy under concurrent calls' do + nodes = api_call.instance_variable_get(:@nodes) + nodes.each { |node| api_call.send(:set_node_healthcheck, node, is_healthy: false) } + + threads = Array.new(8) do + Thread.new do + Array.new(50) { api_call.send(:next_node) } + end + end + + results = threads.flat_map(&:value) + expect(results.length).to eq(8 * 50) + expect(results).to all(be_a(Hash)) + expect(results.map { |n| n[:index] }).to all(be_between(0, nodes.length - 1).inclusive) + end + context 'with a single node' do let(:typesense) do Typesense::Client.new( @@ -314,5 +345,45 @@ expect(node[:last_access_timestamp]).to be_a(Integer) end end + + context 'with nearest_node configured' do + let(:typesense) do + Typesense::Client.new( + api_key: 'abcd', + nearest_node: { host: 'nearestNode', port: 6108, protocol: 'http' }, + nodes: [ 
+ { host: 'node0', port: 8108, protocol: 'http' }, + { host: 'node1', port: 8108, protocol: 'http' }, + { host: 'node2', port: 8108, protocol: 'http' } + ], + connection_timeout_seconds: 10, + retry_interval_seconds: 0.01, + log_level: Logger::ERROR + ) + end + + it 'serializes reads and writes of nearest_node health state under concurrent access' do + nearest_node = api_call.instance_variable_get(:@nearest_node) + + writer_threads = Array.new(4) do |i| + Thread.new do + 100.times { api_call.send(:set_node_healthcheck, nearest_node, is_healthy: i.even?) } + end + end + + reader_threads = Array.new(8) do + Thread.new do + Array.new(100) { api_call.send(:next_node) } + end + end + + writer_threads.each(&:join) + reader_results = reader_threads.flat_map(&:value) + + expect(reader_results).to all(be_a(Hash)) + expect([true, false]).to include(nearest_node[:is_healthy]) + expect(nearest_node[:last_access_timestamp]).to be_a(Integer) + end + end end end From 65a826c962f096c0bd5fa0ba74b022915a4ed893 Mon Sep 17 00:00:00 2001 From: Erim Icel Date: Wed, 29 Apr 2026 10:32:37 +0100 Subject: [PATCH 3/3] test(api_call): assert nearest_node health with be(true).or be(false) --- spec/typesense/api_call_spec.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spec/typesense/api_call_spec.rb b/spec/typesense/api_call_spec.rb index d3848cc..224ad35 100644 --- a/spec/typesense/api_call_spec.rb +++ b/spec/typesense/api_call_spec.rb @@ -381,7 +381,7 @@ reader_results = reader_threads.flat_map(&:value) expect(reader_results).to all(be_a(Hash)) - expect([true, false]).to include(nearest_node[:is_healthy]) + expect(nearest_node[:is_healthy]).to be(true).or be(false) expect(nearest_node[:last_access_timestamp]).to be_a(Integer) end end