Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit f2a68e4

Browse files
authored
Merge branch 'master' into fix_close_existing_connection
2 parents 7891aa3 + 8e8ef3e commit f2a68e4

File tree

1 file changed

+43
-33
lines changed

1 file changed

+43
-33
lines changed

‎mrblib/mrb_rediscluster.rb‎

Lines changed: 43 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,13 @@ class RedisCluster
44
MAX_REDIRECTIONS = 16
55
DEFAULT_MAX_CACHED_CONNECTIONS = 2
66

7+
attr_reader :nodes
8+
79
def initialize(startup_nodes, max_cached_connections=nil)
810
@startup_nodes = startup_nodes
911
@max_cached_connections = max_cached_connections || DEFAULT_MAX_CACHED_CONNECTIONS
1012

13+
@nodes = {}
1114
@slots = {}
1215
@connections = {}
1316
@refresh_slots_cache = false
@@ -20,9 +23,9 @@ def method_missing(*argv)
2023
end
2124

2225
def cluster_slots
23-
@startup_nodes.each do |n|
26+
@nodes.each do |id,node|
2427
begin
25-
redis = Redis.new(n[:host], n[:port])
28+
redis = Redis.new(node[:host], node[:port])
2629
return redis.cluster('slots')
2730
rescue
2831
next
@@ -31,42 +34,44 @@ def cluster_slots
3134
raise 'Error: failed to get cluster slots'
3235
end
3336

34-
def cluster_nodes
35-
@startup_nodes.each do |n|
37+
def get_cluster_nodes(nodes)
38+
nodes.each do |node|
3639
begin
37-
redis = Redis.new(n[:host], n[:port])
40+
redis = Redis.new(node[:host], node[:port])
3841
resp = redis.cluster('nodes')
3942
rescue
4043
next
4144
end
4245

43-
nodes = []
46+
ret = {}
4447
resp.split("\n").each do |r|
4548
id, ip_port, flags = r.split(' ')
4649
host, port = ip_port.split(':')
47-
nodes << {
48-
id: id,
50+
flags = flags.split(',')
51+
flags.delete('myself')
52+
ret[id] = {
4953
host: host,
5054
port: port.to_i,
5155
name: "#{host}:#{port}",
5256
flags: flags
5357
}
5458
end
55-
return nodes
59+
return ret
5660
end
5761
raise 'Error: failed to get cluster nodes'
5862
end
5963

6064
def initialize_slots_cache
61-
@startup_nodes = cluster_nodes
65+
@nodes = if @nodes.empty?
66+
get_cluster_nodes(@startup_nodes)
67+
else
68+
get_cluster_nodes(@nodes.values)
69+
end
70+
6271
cluster_slots.each do |r|
6372
(r[0]..r[1]).each do |slot|
64-
host, port = r[2]
65-
node = { host: host, port: port, name: "#{host}:#{port}" }
66-
@slots[slot] = node
67-
unless @startup_nodes.include?(node)
68-
@startup_nodes << node
69-
end
73+
node_id = r[2][2]
74+
@slots[slot] = node_id
7075
end
7176
end
7277

@@ -102,8 +107,10 @@ def send_cluster_command(argv)
102107
@refresh_slots_cache = true
103108
err, newslot, ip_port = e.message.split
104109
host, port = ip_port.split(':')
110+
port = port.to_i
105111
newslot = newslot.to_i
106-
@slots[newslot] = { host: host, port: port, name: ip_port }
112+
id, node = @nodes.find { |k, v| v[:host] == host && v[:port] == port.to_i }
113+
@slots[newslot] = id
107114
elsif e.message.start_with?('ASK')
108115
asking = true
109116
else
@@ -117,52 +124,55 @@ def send_cluster_command(argv)
117124
end
118125

119126
def get_random_connection
120-
@startup_nodes.shuffle.each do |node|
121-
conn = @connections[node[:name]]
127+
e = nil
128+
@nodes.keys.shuffle.each do |node_id|
129+
conn = @connections[node_id]
122130
begin
123131
if conn.nil?
132+
node = @nodes[node_id]
124133
conn = Redis.new(node[:host], node[:port])
125-
if conn.ping == 'PONG'
134+
if conn.ping == "PONG"
126135
close_existing_connection
127-
@connections[node[:name]] = conn
136+
@connections[node_id] = conn
128137
return conn
129138
else
130139
conn.close
131140
end
132141
else
133-
return conn if conn.ping == 'PONG'
142+
return conn if conn.ping == "PONG"
134143
end
135-
rescue
136-
next
144+
rescue=>e
145+
# Just try with the next node.
137146
end
138147
end
139-
raise 'Error: failed to get random connection'
148+
raise "Error: failed to get random connection (#{e})"
140149
end
141150

142151
def get_connection_by(slot)
143-
node = @slots[slot]
144-
return get_random_connection if node.nil?
152+
node_id = @slots[slot]
153+
return get_random_connection if node_id.nil?
145154

146-
if ! @connections[node[:name]]
155+
if ! @connections[node_id]
147156
close_existing_connection
148-
@connections[node[:name]] = Redis.new(node[:host], node[:port])
157+
node = @nodes[node_id]
158+
@connections[node_id] = Redis.new(node[:host], node[:port])
149159
end
150160

151-
@connections[node[:name]]
161+
@connections[node_id]
152162
end
153163

154164
def close_existing_connection
155165
while @connections.length > @max_cached_connections
156-
name, conn = @connections.shift
166+
id, conn = @connections.shift
157167
conn.close
158168
end
159169
end
160170

161171
def close_all_connections
162-
@connections.each do |name, conn|
163-
name, conn = @connections.shift
172+
@connections.each do |id, conn|
164173
conn.close
165174
end
175+
@connections.clear
166176
end
167177

168178
def extract_key(argv)

0 commit comments

Comments
(0)

AltStyle によって変換されたページ (->オリジナル) /