Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,27 @@ Here are some examples with inline comments that walk you through how to use the

Tests are also a good place to learn how the library works internally: [spec](spec)

### Keep-alive connections

By default, the client opens a fresh HTTP connection (and TLS handshake) for every request. For high-traffic applications this can dominate request latency. Setting `keep_alive_connections: true` enables persistent connections via the `:net_http_persistent` Faraday adapter:

```ruby
Typesense::Client.new(
api_key: ENV['TYPESENSE_API_KEY'],
nodes: [{ host: 'localhost', port: 8108, protocol: 'https' }],
connection_timeout_seconds: 3,
num_retries: 1,
keep_alive_connections: true
)
```

Notes:

- Connections are cached per `(thread, node)`. `Net::HTTP` is not thread-safe, so each thread maintains its own keep-alive socket to each Typesense node, and the existing node round-robin still works.
- A cached connection is dropped automatically when a network error occurs, so retries open a fresh socket. We recommend setting `num_retries` to at least `1` so the gem can recover from a server- or load-balancer-side idle timeout transparently.
- Idle sockets are closed after 30 seconds; tune your load balancer's idle timeout to match or exceed this.
- The option defaults to `false`, so upgrading the gem does not change behaviour until you opt in.

## Compatibility

| Typesense Server | typesense-ruby |
Expand Down
66 changes: 61 additions & 5 deletions lib/typesense/api_call.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
module Typesense
class ApiCall
API_KEY_HEADER_NAME = 'X-TYPESENSE-API-KEY'
KEEP_ALIVE_IDLE_TIMEOUT_SECONDS = 30

attr_reader :logger

Expand All @@ -19,9 +20,16 @@ def initialize(configuration)
@healthcheck_interval_seconds = @configuration.healthcheck_interval_seconds
@num_retries_per_request = @configuration.num_retries
@retry_interval_seconds = @configuration.retry_interval_seconds
@keep_alive_connections = @configuration.keep_alive_connections

@logger = @configuration.logger

# Per-instance key for the thread-local connection cache so multiple
# Typesense::Client instances in the same process do not share sockets.
@thread_connections_key = :"_typesense_api_call_connections_#{object_id}"

require 'faraday/net_http_persistent' if @keep_alive_connections

initialize_metadata_for_nodes
@current_node_index = -1
end
Expand Down Expand Up @@ -69,14 +77,11 @@ def perform_request(method, endpoint, query_parameters: nil, body_parameters: ni
@logger.debug "Attempting #{method.to_s.upcase} request Try ##{num_tries} to Node #{node[:index]}"

begin
conn = Faraday.new(uri_for(endpoint, node)) do |f|
f.options.timeout = @connection_timeout_seconds
f.options.open_timeout = @connection_timeout_seconds
end
conn, request_path = connection_and_path_for(endpoint, node)

headers = default_headers.merge(additional_headers)

response = conn.send(method) do |req|
response = conn.send(method, request_path) do |req|
req.headers = headers
req.params = query_parameters unless query_parameters.nil?
unless body_parameters.nil?
Expand Down Expand Up @@ -108,6 +113,9 @@ def perform_request(method, endpoint, query_parameters: nil, body_parameters: ni
# Rescue network layer exceptions and HTTP 5xx errors, so the loop can continue.
# Using loops for retries instead of rescue...retry to maintain consistency with client libraries in
# other languages that might not support the same construct.
# Drop the cached keep-alive connection (if any): the underlying socket is likely
# half-closed and reusing it would just fail again on retry.
discard_connection(node) if @keep_alive_connections
set_node_healthcheck(node, is_healthy: false)
last_exception = e
@logger.warn "Request #{method}:#{uri_for(endpoint, node)} to Node #{node[:index]} failed due to \"#{e.class}: #{e.message}\""
Expand All @@ -125,6 +133,54 @@ def uri_for(endpoint, node)
"#{node[:protocol]}://#{node[:host]}:#{node[:port]}#{endpoint}"
end

# Chooses the connection and the request path for one request attempt.
#
# @param endpoint [String] path that is requested on the node
# @param node [Hash] node descriptor
# @return [Array] a two-element [connection, request_path] pair:
#   - keep-alive disabled: a connection built for the full per-request URL and a
#     +nil+ path, i.e. the behaviour from before keep-alive existed, so existing
#     callers and stubs see no change;
#   - keep-alive enabled: the connection cached for this node, plus +endpoint+,
#     which the caller supplies at request time.
def connection_and_path_for(endpoint, node)
  return [build_one_shot_connection(endpoint, node), nil] unless @keep_alive_connections

  [connection_for(node), endpoint]
end

# Builds a new, uncached Faraday connection whose URL is the full request URL
# (node base + endpoint). The configured connection timeout is applied to both
# the overall request timeout and the open (connect) timeout.
#
# @param endpoint [String] path that is appended to the node's base URL
# @param node [Hash] node descriptor, passed through to +uri_for+
# @return [Object] whatever +Faraday.new+ returns
def build_one_shot_connection(endpoint, node)
  timeout = @connection_timeout_seconds

  Faraday.new(uri_for(endpoint, node)) do |builder|
    builder.options.timeout = timeout
    builder.options.open_timeout = timeout
  end
end

# Returns the keep-alive connection cached for +node+ in the current thread's
# cache, building and storing one on first use.
#
# Net::HTTP is not thread-safe, which is why the cache lives in per-thread
# storage (see +thread_connections+) instead of being shared across threads.
#
# @param node [Hash] node descriptor
# @return [Object] the cached (or newly built) connection for this node
def connection_for(node)
  cache = thread_connections
  key = connection_key(node)
  cache[key] ||= build_keep_alive_connection(node)
end

# Evicts the connection cached for +node+ (if any) from the current cache and
# closes it when the cached object supports +close+.
#
# @param node [Hash] node descriptor
# @return [Object, nil] the result of +close+, or nil when nothing was cached
#   or the cached object cannot be closed
def discard_connection(node)
  stale = thread_connections.delete(connection_key(node))
  # A missing entry yields nil here, and nil does not respond to :close.
  return unless stale.respond_to?(:close)

  stale.close
end

# Returns the connection cache (a Hash) stored on the current thread under this
# instance's +@thread_connections_key+, creating an empty one on first access.
#
# NOTE: Ruby's +Thread#[]+ storage is fiber-local, so code running in a
# different Fiber of the same thread gets its own, separate cache.
#
# @return [Hash] mutable cache of connections
def thread_connections
  current = Thread.current
  current[@thread_connections_key] ||= {}
end

# Builds the "protocol://host:port" string for +node+. It serves both as the
# cache key for the node's connection and as the base URL of that connection.
#
# @param node [Hash] node descriptor with :protocol, :host and :port entries
# @return [String] e.g. "https://localhost:8108"
def connection_key(node)
  protocol, host, port = node.values_at(:protocol, :host, :port)
  "#{protocol}://#{host}:#{port}"
end

# Builds a Faraday connection rooted at the node's base URL (no endpoint path)
# that uses the :net_http_persistent adapter, so the connection object can be
# kept and reused across requests.
#
# The configured connection timeout is applied to both the request timeout and
# the open timeout. The adapter is given pool_size: 1, and idle sockets are
# given an idle timeout of KEEP_ALIVE_IDLE_TIMEOUT_SECONDS.
#
# @param node [Hash] node descriptor
# @return [Object] whatever +Faraday.new+ returns
def build_keep_alive_connection(node)
  base_url = connection_key(node)
  timeout = @connection_timeout_seconds

  Faraday.new(url: base_url) do |builder|
    builder.options.timeout = timeout
    builder.options.open_timeout = timeout
    # do...end (not braces) so the block binds to `adapter`, which hands it the
    # underlying persistent HTTP object for configuration.
    builder.adapter :net_http_persistent, pool_size: 1 do |persistent_http|
      persistent_http.idle_timeout = KEEP_ALIVE_IDLE_TIMEOUT_SECONDS
    end
  end
end

## Attempts to find the next healthy node, looping through the list of nodes once.
# But if no healthy nodes are found, it will just return the next node, even if it's unhealthy
# so we can try the request for good measure, in case that node has become healthy since
Expand Down
3 changes: 2 additions & 1 deletion lib/typesense/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

module Typesense
class Configuration
attr_accessor :nodes, :nearest_node, :connection_timeout_seconds, :healthcheck_interval_seconds, :num_retries, :retry_interval_seconds, :api_key, :logger, :log_level
attr_accessor :nodes, :nearest_node, :connection_timeout_seconds, :healthcheck_interval_seconds, :num_retries, :retry_interval_seconds, :api_key, :logger, :log_level, :keep_alive_connections

def initialize(options = {})
@nodes = options[:nodes] || []
Expand All @@ -14,6 +14,7 @@ def initialize(options = {})
@num_retries = options[:num_retries] || (@nodes.length + (@nearest_node.nil? ? 0 : 1)) || 3
@retry_interval_seconds = options[:retry_interval_seconds] || 0.1
@api_key = options[:api_key]
@keep_alive_connections = options.fetch(:keep_alive_connections, false)

@logger = options[:logger] || Logger.new($stdout)
@log_level = options[:log_level] || Logger::WARN
Expand Down
96 changes: 96 additions & 0 deletions spec/typesense/api_call_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -258,4 +258,100 @@
it_behaves_like 'General error handling', :delete
it_behaves_like 'Node selection', :delete
end

# Covers the connection cache used when keep_alive_connections is enabled:
# reuse on the same thread/node, isolation per node, per thread and per ApiCall
# instance, eviction after network errors, and timeout propagation.
describe 'keep-alive connection caching' do
subject(:api_call) { described_class.new(keep_alive_typesense.configuration) }

# A client that reuses the node list of the `typesense` client defined outside
# this block, but with keep-alive switched on.
let(:keep_alive_typesense) do
Typesense::Client.new(
api_key: 'abcd',
nodes: typesense.configuration.nodes,
connection_timeout_seconds: 10,
retry_interval_seconds: 0.01,
log_level: Logger::ERROR,
keep_alive_connections: true
)
end

let(:node) { keep_alive_typesense.configuration.nodes[0] }

# Default stub: every node answers '/' with a 200 JSON body.
before do
keep_alive_typesense.configuration.nodes.each do |n|
stub_request(:any, api_call.send(:uri_for, '/', n))
.to_return(status: 200, body: JSON.dump('ok' => true), headers: { 'Content-Type' => 'application/json' })
end
end

it 'reuses the same Faraday connection across calls to the same node on the same thread' do
first = api_call.send(:connection_for, node)
second = api_call.send(:connection_for, node)

# Identity (not equality) check: the very same object must come back.
expect(second).to be(first)
end

it 'caches connections separately per node' do
first_node_conn = api_call.send(:connection_for, keep_alive_typesense.configuration.nodes[0])
second_node_conn = api_call.send(:connection_for, keep_alive_typesense.configuration.nodes[1])

expect(second_node_conn).not_to be(first_node_conn)
end

it 'isolates the cache per thread' do
main_thread_conn = api_call.send(:connection_for, node)

# Thread#value joins the thread and returns the block's result.
other_thread_conn = Thread.new { api_call.send(:connection_for, node) }.value

expect(other_thread_conn).not_to be(main_thread_conn)
end

it 'isolates the cache per ApiCall instance' do
# Same configuration, different ApiCall object.
other_api_call = described_class.new(keep_alive_typesense.configuration)

expect(other_api_call.send(:connection_for, node))
.not_to be(api_call.send(:connection_for, node))
end

it 'evicts the cached connection when a network error occurs so retries open a fresh socket' do
timeout_node = keep_alive_typesense.configuration.nodes[0]
# Re-stub every node so that requests to '/' time out.
keep_alive_typesense.configuration.nodes.each do |n|
stub_request(:any, api_call.send(:uri_for, '/', n)).to_timeout
end

# Warm the cache so there is an entry to evict.
pre_call_conn = api_call.send(:connection_for, timeout_node)

begin
api_call.get('/')
rescue StandardError
# expected: all nodes time out
end

# Inspect the raw per-thread cache directly; going through connection_for
# would rebuild the entry and hide the eviction.
cache = Thread.current[api_call.instance_variable_get(:@thread_connections_key)] || {}
expect(cache[api_call.send(:connection_key, timeout_node)]).to be_nil

post_retry_conn = api_call.send(:connection_for, timeout_node)
expect(post_retry_conn).not_to be(pre_call_conn)
end

it 'uses the configured timeouts on the cached connection' do
conn = api_call.send(:connection_for, node)

expect(conn.options.timeout).to eq(keep_alive_typesense.configuration.connection_timeout_seconds)
expect(conn.options.open_timeout).to eq(keep_alive_typesense.configuration.connection_timeout_seconds)
end
end

# Covers the default (opt-out) path. `typesense` and `api_call` are defined
# outside this block; here `typesense` is expected to be a client created
# without the keep_alive_connections option.
describe 'keep-alive disabled (default)' do
it 'is off by default on the configuration' do
expect(typesense.configuration.keep_alive_connections).to be(false)
end

it 'builds a fresh Faraday connection per request' do
stub_request(:any, api_call.send(:uri_for, '/', typesense.configuration.nodes[0]))
.to_return(status: 200, body: JSON.dump('ok' => true), headers: { 'Content-Type' => 'application/json' })

api_call.get('/')

# With keep-alive off, the per-thread cache slot must never be created.
expect(Thread.current[api_call.instance_variable_get(:@thread_connections_key)]).to be_nil
end
end
end
2 changes: 2 additions & 0 deletions typesense.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ Gem::Specification.new do |spec|

spec.add_dependency 'base64', '~> 0.2.0'
spec.add_dependency 'faraday', '~> 2.8'
spec.add_dependency 'faraday-net_http_persistent', '~> 2.0'
spec.add_dependency 'json', '~> 2.9'
spec.add_dependency 'net-http-persistent', '~> 4.0'
spec.metadata['rubygems_mfa_required'] = 'true'
end
Loading