@@ -4,10 +4,13 @@ class RedisCluster
44 MAX_REDIRECTIONS = 16
55 DEFAULT_MAX_CACHED_CONNECTIONS = 2
66
7+ attr_reader :nodes
8+ 79 def initialize ( startup_nodes , max_cached_connections = nil )
810 @startup_nodes = startup_nodes
911 @max_cached_connections = max_cached_connections || DEFAULT_MAX_CACHED_CONNECTIONS
1012
13+ @nodes = { }
1114 @slots = { }
1215 @connections = { }
1316 @refresh_slots_cache = false
@@ -20,9 +23,9 @@ def method_missing(*argv)
2023 end
2124
2225 def cluster_slots
23- @startup_nodes . each do |n |
26+ @nodes . each do |id , node |
2427 begin
25- redis = Redis . new ( n [ :host ] , n [ :port ] )
28+ redis = Redis . new ( node [ :host ] , node [ :port ] )
2629 return redis . cluster ( 'slots' )
2730 rescue
2831 next
@@ -31,42 +34,44 @@ def cluster_slots
3134 raise 'Error: failed to get cluster slots'
3235 end
3336
34- def cluster_nodes
35- @startup_nodes . each do |n |
37+ def get_cluster_nodes ( nodes )
38+ nodes . each do |n |
3639 begin
3740 redis = Redis . new ( n [ :host ] , n [ :port ] )
3841 resp = redis . cluster ( 'nodes' )
3942 rescue
4043 next
4144 end
4245
43- nodes = [ ]
46+ ret = { }
4447 resp . split ( "\n " ) . each do |r |
4548 id , ip_port , flags = r . split ( ' ' )
4649 host , port = ip_port . split ( ':' )
47- nodes << {
48- id : id ,
50+ flags = flags . split ( ',' )
51+ flags . delete ( 'myself' )
52+ ret [ id ] = {
4953 host : host ,
5054 port : port . to_i ,
5155 name : "#{ host } :#{ port } " ,
5256 flags : flags
5357 }
5458 end
55- return nodes
59+ return ret
5660 end
5761 raise 'Error: failed to get cluster nodes'
5862 end
5963
6064 def initialize_slots_cache
61- @startup_nodes = cluster_nodes
65+ @nodes = if @nodes . empty?
66+ get_cluster_nodes ( @startup_nodes )
67+ else
68+ get_cluster_nodes ( @nodes . values )
69+ end
70+ 6271 cluster_slots . each do |r |
6372 ( r [ 0 ] ..r [ 1 ] ) . each do |slot |
64- host , port = r [ 2 ]
65- node = { host : host , port : port , name : "#{ host } :#{ port } " }
66- @slots [ slot ] = node
67- unless @startup_nodes . include? ( node )
68- @startup_nodes << node
69- end
73+ id = r [ 2 ] [ 2 ]
74+ @slots [ slot ] = @nodes [ id ]
7075 end
7176 end
7277
@@ -102,8 +107,10 @@ def send_cluster_command(argv)
102107 @refresh_slots_cache = true
103108 err , newslot , ip_port = e . message . split
104109 host , port = ip_port . split ( ':' )
110+ port = port . to_i
105111 newslot = newslot . to_i
106- @slots [ newslot ] = { host : host , port : port , name : ip_port }
112+ id , node = @nodes . find { |k , v | v [ :host ] == host && v [ :port ] == port . to_i }
113+ @slots [ newslot ] = node
107114 elsif e . message . start_with? ( 'ASK' )
108115 asking = true
109116 else
@@ -117,7 +124,7 @@ def send_cluster_command(argv)
117124 end
118125
119126 def get_random_connection
120- @startup_nodes . shuffle . each do |node |
127+ @nodes . values . shuffle . each do |node |
121128 conn = @connections [ node [ :name ] ]
122129 begin
123130 if conn . nil?
0 commit comments