10
\$\begingroup\$

I wrote the following Ruby script several years ago and have been using it often ever since on a Bioinformatics computer cluster.

It pulls a list of hosts from qnodes, the node-listing command of the Torque queuing system. It then connects to every node over ssh and runs a command there. Finally it prints the output and/or errors in a defined order (hostnames sorted alphabetically).

A nice feature: results are printed immediately for the host that is next in the order.

on-all-nodes-run

I would like to use it as an example for a Ruby workshop. Could you please suggest best-practice and design pattern improvements?

#!/usr/bin/ruby
EXCLUDE = [/girkelab/, /biocluster/, /parrot/, /owl/] 
require "open3"
# Non-interactive, no password asking, and reasonable timeouts
SSH_OPTIONS = ["-o PreferredAuthentications=publickey,hostbased,gssapi,gssapi-with-mic",
 "-o ForwardX11=no",
 "-o BatchMode=yes",
 "-o SetupTimeOut=5",
 "-o ServerAliveInterval=5",
 "-o ServerAliveCountMax=2"
 ].join(" ")
SSH = "/usr/bin/ssh #{SSH_OPTIONS}"
MKDIR = "/bin/mkdir"
raise "Please give this command at least one argument" if ARGV.size < 1
COMMAND = ARGV[0..-1].join(' ')
output_o = {}
output_e = {}
IO_CONNECTIONS_TO_REMOTE_PROCESSES = {}
def on_all_nodes(&block)
  nodes = []
  Kernel.open('|qnodes | grep -v "^ " | grep -v "^$"') do |f|
    while line = f.gets
      i = line.split(' ').first
      nodes.push(i) if EXCLUDE.select{|x| i =~ x}.empty?
    end
  end
  nodes.sort.each {|n| block.call(n)}
end
# Create processes
on_all_nodes do |node|
  stdin, stdout, stderr = Open3.popen3("#{SSH} #{node} \"#{COMMAND}\"")
  IO_CONNECTIONS_TO_REMOTE_PROCESSES[node] = [stdin, stdout, stderr]
end
has_remote_errors = false
# Collect results
on_all_nodes do |node|
  stdin, stdout, stderr = IO_CONNECTIONS_TO_REMOTE_PROCESSES[node]
  stdin.close
  e_thread = Thread.new do
    while line = stderr.gets
      line.chomp!
      STDERR.puts "#{node} ERROR: #{line}"
      has_remote_errors = true
    end
  end
  o_thread = Thread.new do
    while line = stdout.gets
      line.chomp!
      puts "#{node} : #{line}"
    end
  end
  # Let the threads finish
  t1 = nil
  t2 = nil
  while [t1, t2].include? nil
    if t1.nil?
      t1 = e_thread.join(0.1) # Gives 1/10 of a second to STDERR
    end
    if t2.nil?
      t2 = o_thread.join(0.1) # Give 1/10 of a second to STDOUT
    end
  end
end
exit(1) if has_remote_errors
200_success
asked Mar 7, 2011 at 2:44
\$\endgroup\$

2 Answers

10
\$\begingroup\$

First of all, take a look at the Net::SSH library. I don't have much experience with it, so I don't know whether it supports all the options you need. But if it does, using it might be more robust than using the command line utility (you wouldn't have to worry about whether ssh is installed in the place you expect (or at all) and you wouldn't have to worry about escaping the arguments).

Assuming you can't (or won't) use Net::SSH, you should at least replace /usr/bin/ssh with just ssh, so that the script still works when ssh is installed somewhere else on the PATH.


nodes = []
Kernel.open('|qnodes | grep -v "^ " | grep -v "^$"') do |f|
  while line = f.gets
    i = line.split(' ').first
    nodes.push(i) if EXCLUDE.select{|x| i =~ x}.empty?
  end
end
  1. When you initialize an empty array and then append to it in a loop, that is often a good sign you want to use map and/or select instead.
  2. while line = f.gets is a bit of an anti-pattern in Ruby. The IO class already has methods for iterating over a file line by line, such as each_line.
  3. To find out whether none of the elements in an array meet a condition, negating any? is more idiomatic than building an array with select and checking whether it's empty.

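    nodes = Kernel.open('|qnodes | grep -v "^ " | grep -v "^$"') do |f|
      # each_line without a block returns an enumerator
      # (IO#lines was deprecated in Ruby 2.0 and removed in 3.0)
      f.each_line.map do |line|
        line.split(' ').first
      end.reject do |i|
        EXCLUDE.any? {|x| i =~ x}
      end
    end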
    

nodes.sort.each {|n| block.call(n)}

I would recommend that instead of taking a block and yielding each element, you just return nodes.sort and rename the function to all_nodes. This way you can use all_nodes.each to execute code on all nodes, but you could also use all_nodes.map or all_nodes.select when it makes sense.
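As a rough sketch of that idea, the function below takes the qnodes text as a parameter so it can be tried without a cluster. The sample output and the extra `exclude` parameter are made up for illustration, and the two grep calls are replaced by equivalent filtering in Ruby:

```ruby
# Hypothetical sample of `qnodes` output; the real output has more attributes.
SAMPLE_QNODES = <<~TEXT
  node2
       state = free
       np = 8

  owl
       state = free

  node1
       state = down
TEXT

EXCLUDE = [/girkelab/, /biocluster/, /parrot/, /owl/]

# Returns the sorted host names found in qnodes-style text.
# Host lines start in column 0; attribute lines are indented.
def all_nodes(qnodes_text, exclude = EXCLUDE)
  qnodes_text.each_line
             .reject { |line| line.start_with?(" ") || line.strip.empty? }
             .map { |line| line.split.first }
             .reject { |host| exclude.any? { |pattern| host =~ pattern } }
             .sort
end

nodes = all_nodes(SAMPLE_QNODES)
puts nodes.inspect
```

Because the result is a plain array, callers can chain `each`, `map` or `select` on it as needed.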


Open3.popen3("#{SSH} #{node} \"#{COMMAND}\"")

Note that this will break if COMMAND itself contains double quotes. Generally, trying to escape command line arguments by surrounding them with quotes is a bad idea. system and Open3.popen3 accept multiple arguments exactly to avoid this.

If you make SSH an array (with one element per argument) instead of a string, you can use the multiple-arguments version of popen3 and thus avoid the fragile approach of wrapping COMMAND in quotes to escape spaces, i.e.:

Open3.popen3(*(SSH + [node, COMMAND]))
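
To see the difference without a cluster, here is a small sketch in which a local Ruby child process stands in for ssh (the `ECHO` constant and the `tricky` string are illustrative only). With one array element per argument, no local shell parses the string, so quotes and `$` arrive untouched:

```ruby
require "open3"
require "rbconfig"

# Stand-in for ssh: a child Ruby process that prints its first argument.
ECHO = [RbConfig.ruby, "-e", "puts ARGV.first", "--"]

tricky = %q{echo "hello world" && echo $HOME}

# Each array element becomes exactly one argument; no local shell is involved.
output, status = Open3.capture2(*ECHO, tricky)

puts output
```

Keep in mind that ssh itself joins its remaining arguments with spaces and hands the result to the remote shell, so the command is still interpreted there. The array form only removes the local layer of quoting.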

IO_CONNECTIONS_TO_REMOTE_PROCESSES = {}
# ...
on_all_nodes do |node|
 stdin, stdout, stderr = Open3.popen3("#{SSH} #{node} \"#{COMMAND}\"")
 IO_CONNECTIONS_TO_REMOTE_PROCESSES[node] = [stdin, stdout, stderr]
end

If you heeded my above advice about all_nodes you can simplify this using map. I also would suggest not using a Hash here. If you use an array instead, the nodes will stay in the order in which you inserted them, which will mean that you can iterate over that array instead of invoking all_nodes again.

has_remote_errors = false
all_nodes.map do |node|
 [node, Open3.popen3(*(SSH + [node, COMMAND]))]
end.each do |node, (stdin, stdout, stderr)|
 stdin.close
 ethread = # ...
 # ...
end

This way you remove the complexity of first putting everything into a hash and then getting it out again.
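
A minimal runnable sketch of this start-everything-then-collect pattern, assuming hypothetical node names and using local Ruby child processes with artificial delays in place of ssh (`DELAYS` and `fake_ssh` are invented for the example):

```ruby
require "open3"
require "rbconfig"

# Hypothetical hosts mapped to a delay; a local Ruby child stands in for ssh.
DELAYS = { "node1" => 0.3, "node2" => 0.0, "node3" => 0.1 }

def fake_ssh(node)
  script = "sleep #{DELAYS[node]}; puts 'hello from #{node}'"
  Open3.popen3(RbConfig.ruby, "-e", script)
end

# Start every process first so they all run concurrently ...
running = DELAYS.keys.sort.map { |node| [node, fake_ssh(node)] }

# ... then collect in sorted order; each read blocks only until that node is done.
report = running.flat_map do |node, (stdin, stdout, stderr, wait_thr)|
  stdin.close
  lines = stdout.each_line.map { |line| "#{node} : #{line.chomp}" }
  [stdout, stderr].each(&:close)
  wait_thr.join
  lines
end

puts report
```

The slowest process here is the first in sort order, yet the total wait is roughly that one delay rather than the sum of all three, because the other children finish while the first is being read. The sketch ignores stderr for brevity; the real script still needs a reader for it.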


while line = stderr.gets

Again, this can be written more idiomatically as stderr.each_line do |line|. Same with stdout.


first = true

This is never used. I can only assume that it's a leftover from a previous iteration of the code and is no longer necessary, so it should be removed.


# Let the threads finish
t1 = nil
t2 = nil
while [t1, t2].include? nil
 if t1.nil?
 t1 = e_thread.join(0.1) # Gives 1/10 of a second to STDERR
 end
 if t2.nil?
 t2 = o_thread.join(0.1) # Give 1/10 of a second to STDOUT
 end
end

I don't see any benefit of doing it this way. Just do:

e_thread.join
o_thread.join

Note that joining on one thread does not mean that the other threads stop running - only the main thread does, but that's perfectly okay as you want that anyway.
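
The following sketch shows the two reader threads with plain joins. A local child process that writes to both streams stands in for the remote command, and the `out_lines` and `err_lines` arrays are only there so the result can be inspected:

```ruby
require "open3"
require "rbconfig"

# Local child process standing in for a remote command: writes to both streams.
script = '3.times { |i| puts "out #{i}"; warn "err #{i}" }'
stdin, stdout, stderr, wait_thr = Open3.popen3(RbConfig.ruby, "-e", script)
stdin.close

out_lines = []
err_lines = []

# One reader per stream, so neither pipe can fill up and stall the child.
o_thread = Thread.new { stdout.each_line { |line| out_lines << line.chomp } }
e_thread = Thread.new { stderr.each_line { |line| err_lines << line.chomp } }

# A plain join blocks only the main thread; both readers keep running.
e_thread.join
o_thread.join
wait_thr.join

puts out_lines
```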

answered Mar 7, 2011 at 21:21
\$\endgroup\$
1
  • 2
    \$\begingroup\$ e_thread.join; o_thread.join works just fine. I am able to reproduce neither deadlocks nor the out-of-order problems. This alone cuts 10 lines. \$\endgroup\$ Commented Mar 8, 2011 at 3:15
3
\$\begingroup\$

One can also use the 'parallel' gem to similar effect:

require 'open3'
require 'parallel' # third-party gem: gem install parallel

host_hash = { "hosta" => {}, "hostb" => {} }
Parallel.each(host_hash.keys, :in_threads => host_hash.keys.size) do |host|
  cmd = "ssh #{host} whoami" # assuming keys are set up.
  Open3.popen3(cmd) do |stdin, stdout, stderr, wait_thr|
    stdin.close
    host_hash[host]['stdout'] = stdout.read
    host_hash[host]['stderr'] = stderr.read
    status = wait_thr.value # Process::Status of the local ssh process
    host_hash[host]['exit_code'] = status.exitstatus
    unless status.success?
      $stderr.puts "ssh #{host} failed with #{status.exitstatus}"
    end
  end
end
# then loop through hash to see results.

The one issue I see is that you have to wait for the SSH commands to complete before you see any output. There may be a way to 'stream' the output as it happens.
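
One possible way to stream, sketched here with plain threads and a `Queue` from the standard library rather than the parallel gem: each worker pushes lines as they arrive and the main thread prints them immediately. The host names and the `CHILD` command (a local Ruby process standing in for `ssh host command`) are assumptions for the example:

```ruby
require "open3"
require "rbconfig"

# Hypothetical host names; a local Ruby child stands in for `ssh host command`.
HOSTS = %w[hosta hostb]
CHILD = [RbConfig.ruby, "-e", "$stdout.sync = true; puts 'one'; sleep 0.1; puts 'two'"]

queue = Queue.new

workers = HOSTS.map do |host|
  Thread.new do
    # popen2e merges stderr into stdout, so one reader per host is enough.
    Open3.popen2e(*CHILD) do |stdin, out, wait_thr|
      stdin.close
      out.each_line { |line| queue << "#{host} : #{line.chomp}" }
      status = wait_thr.value
      queue << "#{host} exited with #{status.exitstatus}" unless status.success?
    end
  end
end

# Close the queue once every worker is done so the printing loop can end.
Thread.new { workers.each(&:join); queue.close }

seen = []
while (line = queue.pop) # pop returns nil once the queue is closed and empty
  puts line
  seen << line
end
```

With this approach lines from different hosts interleave in arrival order, so it trades the sorted-by-host output of the original script for immediacy.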

answered Aug 15, 2015 at 7:05
\$\endgroup\$
