class MCollective::Connector::Rabbitmq

Attributes

connection[R]

Public Class Methods

new() click to toggle source
    # File lib/mcollective/connector/rabbitmq.rb
 96 def initialize
 97   @config = Config.instance
 98   @subscriptions = []
 99   @base64 = false
100   @use_exponential_back_off = get_bool_option("rabbitmq.use_exponential_back_off", "true")
101   @initial_reconnect_delay = Float(get_option("rabbitmq.initial_reconnect_delay", 0.01))
102   @back_off_multiplier = Integer(get_option("rabbitmq.back_off_multiplier", 2))
103   @max_reconnect_delay = Float(get_option("rabbitmq.max_reconnect_delay", 30.0))
104   @reconnect_delay = @initial_reconnect_delay
105 
106   Log.info("RabbitMQ connector initialized.  Using stomp-gem #{stomp_version}")
107 end

Public Instance Methods

connect(connector = ::Stomp::Connection) click to toggle source

Connects to the RabbitMQ middleware

    # File lib/mcollective/connector/rabbitmq.rb
110 def connect(connector = ::Stomp::Connection)
111   if @connection
112     Log.debug("Already connection, not re-initializing connection")
113     return
114   end
115 
116   begin
117     @base64 = get_bool_option("rabbitmq.base64", "false")
118 
119     pools = Integer(get_option("rabbitmq.pool.size"))
120     hosts = []
121     middleware_user = ''
122     middleware_password = ''
123     prompt_for_username = get_bool_option("rabbitmq.prompt_user", "false")
124     prompt_for_password = get_bool_option("rabbitmq.prompt_password", "false")
125     if prompt_for_username
126       Log.debug("No previous user exists and rabbitmq.prompt-user is set to true")
127       print "Please enter user to connect to middleware: "
128       middleware_user = STDIN.gets.chomp
129     end
130 
131     if prompt_for_password
132       Log.debug("No previous password exists and rabbitmq.prompt-password is set to true")
133       middleware_password = MCollective::Util.get_hidden_input("Please enter password: ")
134       print "\n"
135     end
136     
137     1.upto(pools) do |poolnum|
138       host = {}
139 
140       host[:host] = get_option("rabbitmq.pool.#{poolnum}.host")
141       host[:port] = get_option("rabbitmq.pool.#{poolnum}.port", 61613).to_i
142       host[:ssl] = get_bool_option("rabbitmq.pool.#{poolnum}.ssl", "false")
143       
144       # read user from config file
145       host[:login] = get_env_or_option("STOMP_USER", "rabbitmq.pool.#{poolnum}.user", middleware_user)
146       if prompt_for_username and host[:login] != middleware_user
147         Log.info("Using #{host[:login]} from config file to connect to #{host[:host]}. "+
148                   "plugin.rabbitmq.prompt_user should be set to false to remove the prompt.")
149       end
150       
151       # read password from config file
152       host[:passcode] = get_env_or_option("STOMP_PASSWORD", "rabbitmq.pool.#{poolnum}.password", middleware_password)
153       if prompt_for_password and host[:passcode] != middleware_password
154           Log.info("Using password from config file to connect to #{host[:host]}. "+
155                   "plugin.rabbitmq.prompt_password should be set to false to remove the prompt.")
156       end
157      
158       # if ssl is enabled set :ssl to the hash of parameters
159       if host[:ssl]
160         host[:ssl] = ssl_parameters(poolnum, get_bool_option("rabbitmq.pool.#{poolnum}.ssl.fallback", "false"))
161       end
162 
163       Log.debug("Adding #{host[:host]}:#{host[:port]} to the connection pool")
164       hosts << host
165     end
166 
167     raise "No hosts found for the RabbitMQ connection pool" if hosts.size == 0
168 
169     connection = {:hosts => hosts}
170 
171     # Various STOMP gem options, defaults here matches defaults for 1.1.6 the meaning of
172     # these can be guessed, the documentation isn't clear
173     connection[:use_exponential_back_off] = @use_exponential_back_off
174     connection[:initial_reconnect_delay] = @initial_reconnect_delay
175     connection[:back_off_multiplier] = @back_off_multiplier
176     connection[:max_reconnect_delay] = @max_reconnect_delay
177     connection[:max_reconnect_attempts] = Integer(get_option("rabbitmq.max_reconnect_attempts", 0))
178     connection[:randomize] = get_bool_option("rabbitmq.randomize", "false")
179     connection[:backup] = get_bool_option("rabbitmq.backup", "false")
180 
181     connection[:timeout] = Integer(get_option("rabbitmq.timeout", -1))
182     connection[:connect_timeout] = Integer(get_option("rabbitmq.connect_timeout", 30))
183     connection[:reliable] = true
184     connection[:max_hbrlck_fails] = Integer(get_option("rabbitmq.max_hbrlck_fails", 0))
185     connection[:max_hbread_fails] = Integer(get_option("rabbitmq.max_hbread_fails", 2))
186 
187     connection[:connect_headers] = connection_headers
188 
189     connection[:logger] = EventLogger.new
190 
191     @connection = connector.new(connection)
192 
193   rescue ClientTimeoutError => e
194     raise e
195   rescue Exception => e
196     raise("Could not connect to RabbitMQ Server: #{e}")
197   end
198 end
connection_headers() click to toggle source
    # File lib/mcollective/connector/rabbitmq.rb
200 def connection_headers
201   headers = {:"accept-version" => "1.0"}
202 
203   heartbeat_interval = Integer(get_option("rabbitmq.heartbeat_interval", 0))
204   stomp_1_0_fallback = get_bool_option("rabbitmq.stomp_1_0_fallback", true)
205 
206   headers[:host] = get_option("rabbitmq.vhost", "/")
207 
208   if heartbeat_interval > 0
209     unless stomp_version_supports_heartbeat?
210       raise("Setting STOMP 1.1 properties like heartbeat intervals require at least version 1.2.10 of the STOMP gem")
211     end
212 
213     if heartbeat_interval < 30
214       Log.warn("Connection heartbeat is set to %d, forcing to minimum value of 30s")
215       heartbeat_interval = 30
216     end
217 
218     heartbeat_interval = heartbeat_interval * 1000
219     headers[:"heart-beat"] = "%d,%d" % [heartbeat_interval + 500, heartbeat_interval - 500]
220 
221     if stomp_1_0_fallback
222       headers[:"accept-version"] = "1.1,1.0"
223     else
224       headers[:"accept-version"] = "1.1"
225     end
226   else
227     if stomp_version_supports_heartbeat?
228       Log.info("Connecting without STOMP 1.1 heartbeats, consider setting plugin.rabbitmq.heartbeat_interval")
229     end
230   end
231 
232   headers
233 end
disconnect() click to toggle source

Disconnects from the RabbitMQ connection

    # File lib/mcollective/connector/rabbitmq.rb
481 def disconnect
482   Log.debug("Disconnecting from RabbitMQ")
483   @connection.disconnect
484   @connection = nil
485 end
exponential_back_off() click to toggle source

Calculate the exponential backoff needed

    # File lib/mcollective/connector/rabbitmq.rb
294 def exponential_back_off
295   if !@use_exponential_back_off
296     return nil
297   end
298 
299   backoff = @reconnect_delay
300 
301   # calculate next delay
302   @reconnect_delay = @reconnect_delay * @back_off_multiplier
303 
304   # cap at max reconnect delay
305   if @reconnect_delay > @max_reconnect_delay
306     @reconnect_delay = @max_reconnect_delay
307   end
308 
309   return backoff
310 end
get_bool_option(val, default) click to toggle source

looks up a boolean value in the config

    # File lib/mcollective/connector/rabbitmq.rb
510 def get_bool_option(val, default)
511   Util.str_to_bool(@config.pluginconf.fetch(val, default))
512 end
get_cert_file(poolnum) click to toggle source

Returns the name of the certificate file used by RabbitMQ Will first check if an environment variable MCOLLECTIVE_RABBITMQ_POOLX_SSL_CERT exists, where X is the RabbitMQ pool number. If the environment variable doesn't exist, it will try and load the value from the config.

    # File lib/mcollective/connector/rabbitmq.rb
289 def get_cert_file(poolnum)
290   ENV["MCOLLECTIVE_RABBITMQ_POOL%s_SSL_CERT" % poolnum] || get_option("rabbitmq.pool.#{poolnum}.ssl.cert", false)
291 end
get_env_or_option(env, opt, default=nil) click to toggle source

looks in the environment first then in the config file for a specific option, accepts an optional default.

raises an exception when it cant find a value anywhere

    # File lib/mcollective/connector/rabbitmq.rb
491 def get_env_or_option(env, opt, default=nil)
492   return ENV[env] if ENV.include?(env)
493   return @config.pluginconf[opt] if @config.pluginconf.include?(opt)
494   return default if default
495 
496   raise("No #{env} environment or plugin.#{opt} configuration option given")
497 end
get_key_file(poolnum) click to toggle source

Returns the name of the private key file used by RabbitMQ Will first check if an environment variable MCOLLECTIVE_RABBITMQ_POOLX_SSL_KEY exists, where X is the RabbitMQ pool number. If the environment variable doesn't exist, it will try and load the value from the config.

    # File lib/mcollective/connector/rabbitmq.rb
281 def get_key_file(poolnum)
282   ENV["MCOLLECTIVE_RABBITMQ_POOL%s_SSL_KEY" % poolnum] || get_option("rabbitmq.pool.#{poolnum}.ssl.key", false)
283 end
get_option(opt, default=nil) click to toggle source

looks for a config option, accepts an optional default

raises an exception when it cant find a value anywhere

    # File lib/mcollective/connector/rabbitmq.rb
502 def get_option(opt, default=nil)
503   return @config.pluginconf[opt] if @config.pluginconf.include?(opt)
504   return default unless default.nil?
505 
506   raise("No plugin.#{opt} configuration option given")
507 end
make_target(agent, type, collective, reply_to=nil, node=nil) click to toggle source
    # File lib/mcollective/connector/rabbitmq.rb
386 def make_target(agent, type, collective, reply_to=nil, node=nil)
387   raise("Unknown target type #{type}") unless [:directed, :broadcast, :reply, :request, :direct_request].include?(type)
388   raise("Unknown collective '#{collective}' known collectives are '#{@config.collectives.join ', '}'") unless @config.collectives.include?(collective)
389 
390   agents_multiplex = get_bool_option("rabbitmq.agents_multiplex", "false")
391   target = {:name => "", :headers => {}, :id => nil}
392 
393   if reply_to
394     reply_path = reply_to
395   elsif get_bool_option("rabbitmq.use_reply_exchange", false)
396     reply_path = "/exchange/mcollective_reply/%s_%s_%s" % [ @config.identity, $$, Client.request_sequence ]
397   else
398     reply_path = "/temp-queue/mcollective_reply_%s" % agent
399   end
400   case type
401     when :reply # receiving replies on a temp queue
402       target[:name] = reply_path
403       target[:id] = "mcollective_%s_replies" % agent
404 
405     when :broadcast, :request # publishing a request to all nodes with an agent
406       if agents_multiplex
407         target[:name] = "/exchange/%s_broadcast" % collective
408         target[:id] = "%s_broadcast" % collective
409       else
410         target[:name] = "/exchange/%s_broadcast/%s" % [collective, agent]
411         target[:id] = "%s_broadcast_%s" % [collective, agent]
412       end
413       if reply_to
414         target[:headers]["reply-to"] = reply_to
415       else
416         target[:headers]["reply-to"] = reply_path
417       end
418 
419     when :direct_request # a request to a specific node
420       raise "Directed requests need to have a node identity" unless node
421 
422       target[:name] = "/exchange/%s_directed/%s" % [ collective, node]
423       target[:headers]["reply-to"] = reply_path
424 
425     when :directed # subscribing to directed messages
426       target[:name] = "/exchange/%s_directed/%s" % [ collective, @config.identity ]
427       target[:id] = "%s_%s_directed_to_identity" % [ collective, @config.identity ]
428   end
429 
430   target
431 end
publish(msg) click to toggle source

Sends a message to the RabbitMQ connection

    # File lib/mcollective/connector/rabbitmq.rb
345 def publish(msg)
346   msg.base64_encode! if @base64
347 
348   if msg.type == :direct_request
349     msg.discovered_hosts.each do |node|
350       target = target_for(msg, node)
351 
352       Log.debug("Sending a direct message to RabbitMQ target '#{target[:name]}' with headers '#{target[:headers].inspect}'")
353 
354       @connection.publish(target[:name], msg.payload, target[:headers])
355     end
356   else
357     target = target_for(msg)
358 
359     Log.debug("Sending a broadcast message to RabbitMQ target '#{target[:name]}' with headers '#{target[:headers].inspect}'")
360 
361     @connection.publish(target[:name], msg.payload, target[:headers])
362   end
363 end
receive() click to toggle source

Receives a message from the RabbitMQ connection

    # File lib/mcollective/connector/rabbitmq.rb
313 def receive
314   Log.debug("Waiting for a message from RabbitMQ")
315 
316   # When the Stomp library > 1.2.0 is mid reconnecting due to its reliable connection
317   # handling it sets the connection to closed.  If we happen to be receiving at just
318   # that time we will get an exception warning about the closed connection so handling
319   # that here with a sleep and a retry.
320   begin
321     msg = @connection.receive
322   rescue ::Stomp::Error::NoCurrentConnection
323     sleep 1
324     retry
325   end
326 
327   # In older stomp gems an attempt to receive after failed authentication can return nil
328   if msg.nil?
329     raise MessageNotReceived.new(exponential_back_off), "No message received from RabbitMQ."
330   end
331 
332   raise "Received a processing error from RabbitMQ: '%s'" % msg.body.chomp if msg.body =~ /Processing error/
333 
334   # We expect all messages we get to be of STOMP frame type MESSAGE, raise on unexpected types
335   if msg.command != 'MESSAGE'
336     Log.debug("Unexpected '#{msg.command}' frame.  Headers: #{msg.headers.inspect} Body: #{msg.body.inspect}")
337     raise UnexpectedMessageType.new(exponential_back_off),
338       "Received frame of type '#{msg.command}' expected 'MESSAGE'"
339   end
340 
341   Message.new(msg.body, msg, :base64 => @base64, :headers => msg.headers)
342 end
ssl_parameters(poolnum, fallback) click to toggle source

Sets the SSL paramaters for a specific connection

    # File lib/mcollective/connector/rabbitmq.rb
244 def ssl_parameters(poolnum, fallback)
245   params = {
246     :cert_file => get_cert_file(poolnum),
247     :key_file  => get_key_file(poolnum),
248     :ts_files  => get_option("rabbitmq.pool.#{poolnum}.ssl.ca", false),
249     :ciphers   => get_option("rabbitmq.pool.#{poolnum}.ssl.ciphers", false),
250   }
251 
252   raise "cert, key and ca has to be supplied for verified SSL mode" unless params[:cert_file] && params[:key_file] && params[:ts_files]
253 
254   raise "Cannot find certificate file #{params[:cert_file]}" unless File.exist?(params[:cert_file])
255   raise "Cannot find key file #{params[:key_file]}" unless File.exist?(params[:key_file])
256 
257   params[:ts_files].split(",").each do |ca|
258     raise "Cannot find CA file #{ca}" unless File.exist?(ca)
259   end
260 
261   begin
262     ::Stomp::SSLParams.new(params)
263   rescue NameError
264     raise "Stomp gem >= 1.2.2 is needed"
265   end
266 
267 rescue Exception => e
268   if fallback
269     Log.warn("Failed to set full SSL verified mode, falling back to unverified: #{e.class}: #{e}")
270     return true
271   else
272     Log.error("Failed to set full SSL verified mode: #{e.class}: #{e}")
273     raise(e)
274   end
275 end
stomp_version() click to toggle source
    # File lib/mcollective/connector/rabbitmq.rb
235 def stomp_version
236   ::Stomp::Version::STRING
237 end
stomp_version_supports_heartbeat?() click to toggle source
    # File lib/mcollective/connector/rabbitmq.rb
239 def stomp_version_supports_heartbeat?
240   return Util.versioncmp(stomp_version, "1.2.10") >= 0
241 end
subscribe(agent, type, collective) click to toggle source

Subscribe to a topic or queue

    # File lib/mcollective/connector/rabbitmq.rb
434 def subscribe(agent, type, collective)
435   if type == :reply
436     # On rabbitmq if you send a message with a reply-to: header set to
437     # '/temp-queue/*' it automatically creates a private queue, munges
438     # the reply-to: header to point to this private queue, and
439     # subscribes you to it.  As such you should never attempt to
440     # SUBSCRIBE or UNSUBSCRIBE to '/temp-queue/*' directly as that'll
441     # cause great pain and suffering.
442     # https://www.rabbitmq.com/stomp.html#d.tqd
443 
444     # The exception to this is in 'use_reply_exchange' mode, when the
445     # reply-to will be set to a queue in an explicit exchange.
446     if !get_bool_option("rabbitmq.use_reply_exchange", false)
447       # We aren't in 'use_reply_exchange' mode, don't subscribe.
448       return
449     end
450   end
451 
452   source = make_target(agent, type, collective)
453 
454   unless @subscriptions.include?(source[:id])
455     Log.debug("Subscribing to #{source[:name]} with headers #{source[:headers].inspect.chomp}")
456     @connection.subscribe(source[:name], source[:headers], source[:id])
457     @subscriptions << source[:id]
458   end
459 rescue ::Stomp::Error::DuplicateSubscription
460   Log.error("Received subscription request for #{source.inspect.chomp} but already had a matching subscription, ignoring")
461 end
target_for(msg, node=nil) click to toggle source
    # File lib/mcollective/connector/rabbitmq.rb
365 def target_for(msg, node=nil)
366   if msg.type == :reply
367     target = {:name => msg.request.headers["reply-to"], :headers => {}, :id => ""}
368 
369   elsif [:request, :direct_request].include?(msg.type)
370     target = make_target(msg.agent, msg.type, msg.collective, msg.reply_to, node)
371 
372   else
373     raise "Don't now how to create a target for message type #{msg.type}"
374 
375   end
376 
377   # marks messages as valid for ttl + 10 seconds, we do this here
378   # rather than in make_target as this should only be set on publish
379   target[:headers]["expiration"] = ((msg.ttl + 10) * 1000).to_s
380 
381   target[:headers]["mc_sender"] = Config.instance.identity
382 
383   return target
384 end
unsubscribe(agent, type, collective) click to toggle source

Subscribe to a topic or queue

    # File lib/mcollective/connector/rabbitmq.rb
464 def unsubscribe(agent, type, collective)
465   if type == :reply
466     # For a more detailed discussion of this logic, please see #subscribe
467     if !get_bool_option("rabbitmq.use_reply_exchange", false)
468       # We shouldn't try to unsubscribe from a '/temp-queue/*' queue.
469       return
470     end
471   end
472 
473   source = make_target(agent, type, collective)
474 
475   Log.debug("Unsubscribing from #{source[:name]}")
476   @connection.unsubscribe(source[:name], source[:headers], source[:id])
477   @subscriptions.delete(source[:id])
478 end