class MCollective::Connector::Activemq
Handles sending and receiving messages over the Stomp protocol for ActiveMQ servers specifically. It takes advantage of ActiveMQ-specific features and enhancements to the Stomp protocol. For best results in a clustered environment use at least ActiveMQ 5.5.0.
This plugin takes an entirely different approach to dealing with ActiveMQ from the more generic stomp connector.
- Agents use /topic/<collective>.<agent>.agent
- Replies use temp-topics so they are private and transient.
- Point to Point messages are supported by subscribing to /queue/<collective>.nodes with a selector "mc_identity = 'identity'"
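The destination naming in the list above can be sketched in plain Ruby. The helper names (agent_topic, directed_queue, directed_selector) and the sample collective, agent and identity values are hypothetical and exist only for illustration; the plugin builds its destinations in make_target.

```ruby
# Hypothetical helpers illustrating the documented destination names.
# They are not part of the plugin's API.

# Topic that all agents of a given type listen on.
def agent_topic(collective, agent)
  "/topic/#{collective}.#{agent}.agent"
end

# Queue shared by all nodes for point to point messages.
def directed_queue(collective)
  "/queue/#{collective}.nodes"
end

# Selector a node subscribes with so it only receives its own messages.
def directed_selector(identity)
  "mc_identity = '#{identity}'"
end

puts agent_topic("mcollective", "rpcutil")
puts directed_queue("mcollective")
puts directed_selector("node1.example.com")
```

Because every node subscribes to the same queue, it is the selector that keeps a directed message from being delivered to the wrong node.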
The use of temp-topics for the replies is a huge improvement over the old style. Previously every client received the replies for all clients that were active at that time, so each client had to decrypt and validate every message just to decide whether to ignore it. This was computationally expensive, and on large busy networks the messages were sent all over the place, across broker boundaries.
With the new approach the messages go point-to-point back only to whoever made the request. Clients receive only their own replies, over a private channel that casual observers cannot snoop on.
This plugin supports version 1.1.6 and newer of the Stomp rubygem.
    connector = activemq
    plugin.activemq.pool.size = 2
    plugin.activemq.pool.1.host = stomp1.your.net
    plugin.activemq.pool.1.port = 61613
    plugin.activemq.pool.1.user = you
    plugin.activemq.pool.1.password = secret
    plugin.activemq.pool.1.ssl = true
    plugin.activemq.pool.1.ssl.cert = /path/to/your.cert
    plugin.activemq.pool.1.ssl.key = /path/to/your.key
    plugin.activemq.pool.1.ssl.ca = /path/to/your.ca
    plugin.activemq.pool.1.ssl.fallback = true
    plugin.activemq.pool.1.ssl.ciphers = TLSv1:!MD5:!LOW:!EXPORT
    plugin.activemq.pool.2.host = stomp2.your.net
    plugin.activemq.pool.2.port = 61613
    plugin.activemq.pool.2.user = you
    plugin.activemq.pool.2.password = secret
    plugin.activemq.pool.2.ssl = false
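With this configuration you can also supply the credentials through the STOMP_USER and STOMP_PASSWORD environment variables, which are checked before the per-pool user and password settings. The port will default to 61613 if not specified.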
The ssl options are only usable with version 1.2.2 or newer of the Stomp gem. When they are set, full SSL validation is performed and you will only be able to connect to an ActiveMQ server that has a cert signed by the same CA. If you only set ssl = true and do not supply the cert, key and ca properties, or if you have an older gem, the connection will fall back to unverified mode only if ssl.fallback is true.
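The SSL rules above amount to a small decision table. The following is a minimal sketch of that table, assuming a hypothetical ssl_mode helper that is not part of the plugin; it leaves out the gem version check and the file existence checks that ssl_parameters also performs.

```ruby
# Hypothetical helper: decides which SSL mode a pool entry ends up in.
# Returns :plain, :verified or :unverified, or raises when verified mode
# cannot be set up and fallback is not allowed.
def ssl_mode(ssl:, cert: nil, key: nil, ca: nil, fallback: false)
  return :plain unless ssl                   # ssl = false, no TLS at all
  return :verified if cert && key && ca      # all three supplied
  return :unverified if fallback             # ssl.fallback = true
  raise ArgumentError, "cert, key and ca have to be supplied for verified SSL mode"
end

puts ssl_mode(ssl: true, cert: "/path/to/your.cert", key: "/path/to/your.key", ca: "/path/to/your.ca")
puts ssl_mode(ssl: true, fallback: true)
puts ssl_mode(ssl: false)
```

Leaving ssl.fallback at its default of false means a missing certificate stops the connection instead of silently downgrading it.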
In addition you can set the following options for the rubygem:
    plugin.activemq.initial_reconnect_delay = 0.01
    plugin.activemq.max_reconnect_delay = 30.0
    plugin.activemq.use_exponential_back_off = true
    plugin.activemq.back_off_multiplier = 2
    plugin.activemq.max_reconnect_attempts = 0
    plugin.activemq.randomize = false
    plugin.activemq.timeout = -1
You can set the initial connection timeout, which applies when your stomp server is simply unreachable. Once it expires the connection fails over to the next server in the pool:
plugin.activemq.connect_timeout = 30
ActiveMQ JMS message priorities can be set:
plugin.activemq.priority = 4
This plugin supports Stomp protocol 1.1 when combined with version 1.2.10 or newer of the stomp gem. To enable network heartbeats, which help keep the connection alive across NAT and aggressive session-tracking firewalls, you can set:
plugin.activemq.heartbeat_interval = 30
This will cause a heartbeat to be sent at 30 second intervals and one to be expected from the broker every 30 seconds. The shortest supported period is 30 seconds; if you set it lower it will be forced to 30 seconds.
After 2 failures to receive a heartbeat the connection will be reset via the normal failover mechanism.
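How the interval becomes a STOMP heart-beat header can be sketched as follows. The helper name heart_beat_header and the constant MIN_HEARTBEAT are hypothetical; the clamping to 30 seconds and the 500 millisecond margins follow the connection_headers listing further down this page.

```ruby
# Hypothetical helper mirroring the heart-beat calculation in
# connection_headers. Not part of the plugin's API.
MIN_HEARTBEAT = 30 # seconds, the shortest supported period

def heart_beat_header(interval_seconds)
  # Values below the minimum are forced up to 30 seconds.
  interval = [interval_seconds, MIN_HEARTBEAT].max
  ms = interval * 1000
  # First value: what the client offers to send; second: what it wants to
  # receive. Both are in milliseconds with a 500 ms margin either way.
  "%d,%d" % [ms + 500, ms - 500]
end

puts heart_beat_header(30)
puts heart_beat_header(10)
puts heart_beat_header(60)
```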
By default, if heartbeat_interval is set the plugin will request Stomp 1.1 but support fallback to 1.0. You can enable strict Stomp 1.1-only operation with:
plugin.activemq.stomp_1_0_fallback = 0
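The protocol negotiation described above can be summarised in a few lines. This is a sketch only: accept_version is a hypothetical helper, and the three return values are the accept-version headers that appear in the connection_headers listing on this page.

```ruby
# Hypothetical helper: picks the STOMP accept-version header value.
def accept_version(heartbeat_interval, stomp_1_0_fallback = true)
  # Without heartbeats the connector only asks for STOMP 1.0.
  return "1.0" unless heartbeat_interval > 0

  # With heartbeats it asks for 1.1, optionally allowing a 1.0 fallback.
  stomp_1_0_fallback ? "1.1,1.0" : "1.1"
end

puts accept_version(0)
puts accept_version(30)
puts accept_version(30, false)
```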
Attributes
Public Class Methods
    # File lib/mcollective/connector/activemq.rb
    def initialize
      @config = Config.instance
      @subscriptions = []
      @msgpriority = 0
      @base64 = false
      @use_exponential_back_off = get_bool_option("activemq.use_exponential_back_off", "true")
      @initial_reconnect_delay = Float(get_option("activemq.initial_reconnect_delay", 0.01))
      @back_off_multiplier = Integer(get_option("activemq.back_off_multiplier", 2))
      @max_reconnect_delay = Float(get_option("activemq.max_reconnect_delay", 30.0))
      @reconnect_delay = @initial_reconnect_delay

      Log.info("ActiveMQ connector initialized. Using stomp-gem #{stomp_version}")
    end
Public Instance Methods
Connects to the ActiveMQ middleware
    # File lib/mcollective/connector/activemq.rb
    def connect(connector = ::Stomp::Connection)
      if @connection
        Log.debug("Already connection, not re-initializing connection")
        return
      end

      begin
        @base64 = get_bool_option("activemq.base64", "false")
        @msgpriority = get_option("activemq.priority", 0).to_i

        pools = Integer(get_option("activemq.pool.size"))
        hosts = []
        middleware_user = ''
        middleware_password = ''
        prompt_for_username = get_bool_option("activemq.prompt_user", "false")
        prompt_for_password = get_bool_option("activemq.prompt_password", "false")

        if prompt_for_username
          Log.debug("No previous user exists and activemq.prompt-user is set to true")
          print "Please enter user to connect to middleware: "
          middleware_user = STDIN.gets.chomp
        end

        if prompt_for_password
          Log.debug("No previous password exists and activemq.prompt-password is set to true")
          middleware_password = MCollective::Util.get_hidden_input("Please enter password: ")
          print "\n"
        end

        1.upto(pools) do |poolnum|
          host = {}

          host[:host] = get_option("activemq.pool.#{poolnum}.host")
          host[:port] = get_option("activemq.pool.#{poolnum}.port", 61613).to_i
          host[:ssl] = get_bool_option("activemq.pool.#{poolnum}.ssl", "false")

          # read user from config file
          host[:login] = get_env_or_option("STOMP_USER", "activemq.pool.#{poolnum}.user", middleware_user)
          if prompt_for_username and host[:login] != middleware_user
            Log.info("Using #{host[:login]} from config file to connect to #{host[:host]}. "+
                     "plugin.activemq.prompt_user should be set to false to remove the prompt.")
          end

          # read password from config file
          host[:passcode] = get_env_or_option("STOMP_PASSWORD", "activemq.pool.#{poolnum}.password", middleware_password)
          if prompt_for_password and host[:passcode] != middleware_password
            Log.info("Using password from config file to connect to #{host[:host]}. "+
                     "plugin.activemq.prompt_password should be set to false to remove the prompt.")
          end

          # if ssl is enabled set :ssl to the hash of parameters
          if host[:ssl]
            host[:ssl] = ssl_parameters(poolnum, get_bool_option("activemq.pool.#{poolnum}.ssl.fallback", "false"))
          end

          Log.debug("Adding #{host[:host]}:#{host[:port]} to the connection pool")
          hosts << host
        end

        raise "No hosts found for the ActiveMQ connection pool" if hosts.size == 0

        connection = {:hosts => hosts}

        # Various STOMP gem options, defaults here matches defaults for 1.1.6 the meaning of
        # these can be guessed, the documentation isn't clear
        connection[:use_exponential_back_off] = @use_exponential_back_off
        connection[:initial_reconnect_delay] = @initial_reconnect_delay
        connection[:back_off_multiplier] = @back_off_multiplier
        connection[:max_reconnect_delay] = @max_reconnect_delay
        connection[:max_reconnect_attempts] = Integer(get_option("activemq.max_reconnect_attempts", 0))
        connection[:randomize] = get_bool_option("activemq.randomize", "false")
        connection[:backup] = get_bool_option("activemq.backup", "false")
        connection[:timeout] = Integer(get_option("activemq.timeout", -1))
        connection[:connect_timeout] = Integer(get_option("activemq.connect_timeout", 30))
        connection[:reliable] = true
        connection[:connect_headers] = connection_headers
        connection[:max_hbrlck_fails] = Integer(get_option("activemq.max_hbrlck_fails", 0))
        connection[:max_hbread_fails] = Integer(get_option("activemq.max_hbread_fails", 2))

        connection[:logger] = EventLogger.new

        @connection = connector.new(connection)

      rescue ClientTimeoutError => e
        raise e
      rescue Exception => e
        raise("Could not connect to ActiveMQ Server: #{e}")
      end
    end
    # File lib/mcollective/connector/activemq.rb
    def connection_headers
      headers = {:"accept-version" => "1.0"}

      heartbeat_interval = Integer(get_option("activemq.heartbeat_interval", 0))
      stomp_1_0_fallback = get_bool_option("activemq.stomp_1_0_fallback", true)

      headers[:host] = get_option("activemq.vhost", "mcollective")

      if heartbeat_interval > 0
        unless stomp_version_supports_heartbeat?
          raise("Setting STOMP 1.1 properties like heartbeat intervals require at least version 1.2.10 of the STOMP gem")
        end

        if heartbeat_interval < 30
          # interpolate the configured value into the warning
          Log.warn("Connection heartbeat is set to %d, forcing to minimum value of 30s" % heartbeat_interval)
          heartbeat_interval = 30
        end

        heartbeat_interval = heartbeat_interval * 1000
        headers[:"heart-beat"] = "%d,%d" % [heartbeat_interval + 500, heartbeat_interval - 500]

        if stomp_1_0_fallback
          headers[:"accept-version"] = "1.1,1.0"
        else
          headers[:"accept-version"] = "1.1"
        end
      else
        if stomp_version_supports_heartbeat?
          Log.info("Connecting without STOMP 1.1 heartbeats, if you are using ActiveMQ 5.8 or newer consider setting plugin.activemq.heartbeat_interval")
        end
      end

      headers
    end
Disconnects from the ActiveMQ connection
    # File lib/mcollective/connector/activemq.rb
    def disconnect
      Log.debug("Disconnecting from ActiveMQ")
      @connection.disconnect
      @connection = nil
    end
Calculate the exponential backoff needed
    # File lib/mcollective/connector/activemq.rb
    def exponential_back_off
      if !@use_exponential_back_off
        return nil
      end

      backoff = @reconnect_delay

      # calculate next delay
      @reconnect_delay = @reconnect_delay * @back_off_multiplier

      # cap at max reconnect delay
      if @reconnect_delay > @max_reconnect_delay
        @reconnect_delay = @max_reconnect_delay
      end

      return backoff
    end
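To see what sequence of delays this produces, the calculation can be pulled out into a standalone sketch. The BackOff class and its next_delay method are hypothetical names used only here; the default arguments are the documented defaults for initial_reconnect_delay, back_off_multiplier and max_reconnect_delay.

```ruby
# Hypothetical standalone version of the back-off calculation.
class BackOff
  def initialize(initial = 0.01, multiplier = 2, max = 30.0)
    @delay = initial
    @multiplier = multiplier
    @max = max
  end

  # Returns the delay to use now and prepares the next one,
  # multiplying each time and capping at the maximum.
  def next_delay
    current = @delay
    @delay = [@delay * @multiplier, @max].min
    current
  end
end

back_off = BackOff.new
delays = Array.new(14) { back_off.next_delay }
puts delays.inspect
```

With the defaults the delay starts at 0.01 seconds, doubles on every call and stays at the 30 second cap once it is reached.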
looks up a boolean value in the config
    # File lib/mcollective/connector/activemq.rb
    def get_bool_option(val, default)
      Util.str_to_bool(@config.pluginconf.fetch(val, default))
    end
Returns the name of the certificate file used by ActiveMQ. Will first check if an environment variable MCOLLECTIVE_ACTIVEMQ_POOLX_SSL_CERT exists, where X is the ActiveMQ pool number. If the environment variable doesn't exist, it will try and load the value from the config.
    # File lib/mcollective/connector/activemq.rb
    def get_cert_file(poolnum)
      ENV["MCOLLECTIVE_ACTIVEMQ_POOL%s_SSL_CERT" % poolnum] || get_option("activemq.pool.#{poolnum}.ssl.cert", false)
    end
Looks in the environment first, then in the config file, for a specific option; accepts an optional default.
Raises an exception when it can't find a value anywhere.
    # File lib/mcollective/connector/activemq.rb
    def get_env_or_option(env, opt, default=nil)
      return ENV[env] if ENV.include?(env)
      return @config.pluginconf[opt] if @config.pluginconf.include?(opt)
      return default if default

      raise("No #{env} environment or plugin.#{opt} configuration option given")
    end
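The lookup order can be tried out without a running MCollective by passing the environment and plugin configuration in as plain hashes. This is a sketch under that assumption: env_or_option is a hypothetical stand-in for the method, and the sample user names are made up.

```ruby
# Hypothetical standalone version of the lookup: environment first,
# then plugin configuration, then the default, otherwise an error.
def env_or_option(env, config, env_key, opt, default = nil)
  return env[env_key] if env.include?(env_key)
  return config[opt] if config.include?(opt)
  return default if default

  raise "No #{env_key} environment or plugin.#{opt} configuration option given"
end

config = {"activemq.pool.1.user" => "you"}

# The environment variable wins over the config file.
puts env_or_option({"STOMP_USER" => "alice"}, config, "STOMP_USER", "activemq.pool.1.user")

# Without the environment variable the config value is used.
puts env_or_option({}, config, "STOMP_USER", "activemq.pool.1.user")
```

Because the default is only returned when it is truthy, a default of nil or false still ends in the exception.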
Returns the name of the private key file used by ActiveMQ. Will first check if an environment variable MCOLLECTIVE_ACTIVEMQ_POOLX_SSL_KEY exists, where X is the ActiveMQ pool number. If the environment variable doesn't exist, it will try and load the value from the config.
    # File lib/mcollective/connector/activemq.rb
    def get_key_file(poolnum)
      ENV["MCOLLECTIVE_ACTIVEMQ_POOL%s_SSL_KEY" % poolnum] || get_option("activemq.pool.#{poolnum}.ssl.key", false)
    end
Looks for a config option; accepts an optional default.
Raises an exception when it can't find a value anywhere.
    # File lib/mcollective/connector/activemq.rb
    def get_option(opt, default=nil)
      return @config.pluginconf[opt] if @config.pluginconf.include?(opt)
      return default unless default.nil?

      raise("No plugin.#{opt} configuration option given")
    end
    # File lib/mcollective/connector/activemq.rb
    def headers_for(msg, identity=nil)
      headers = {}

      headers = {"priority" => @msgpriority} if @msgpriority > 0

      headers["timestamp"] = (Time.now.utc.to_i * 1000).to_s

      # set the expires header based on the TTL, we build a small additional
      # timeout of 10 seconds in here to allow for network latency etc
      headers["expires"] = ((Time.now.utc.to_i + msg.ttl + 10) * 1000).to_s

      if [:request, :direct_request].include?(msg.type)
        target = make_target(msg.agent, :reply, msg.collective)

        if msg.reply_to
          headers["reply-to"] = msg.reply_to
        else
          headers["reply-to"] = target[:name]
        end

        headers["mc_identity"] = identity if msg.type == :direct_request
      end

      headers["mc_sender"] = Config.instance.identity

      return headers
    end
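The timestamp and expires headers are both millisecond values derived from the current time in whole seconds. A minimal sketch of just that arithmetic, with a hypothetical expiry_headers helper and a 60 second TTL chosen for illustration:

```ruby
# Hypothetical helper reproducing the timestamp/expires arithmetic.
# now is a Time, ttl is the message TTL in seconds, grace is the extra
# allowance for network latency (10 seconds in the listing above).
def expiry_headers(now, ttl, grace = 10)
  {
    "timestamp" => (now.to_i * 1000).to_s,
    "expires"   => ((now.to_i + ttl + grace) * 1000).to_s
  }
end

headers = expiry_headers(Time.now.utc, 60)
puts headers.inspect
```

With a TTL of 60 seconds the expires value is therefore always 70,000 milliseconds after the timestamp.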
    # File lib/mcollective/connector/activemq.rb
    def make_target(agent, type, collective)
      raise("Unknown target type #{type}") unless [:directed, :broadcast, :reply, :request, :direct_request].include?(type)
      raise("Unknown collective '#{collective}' known collectives are '#{@config.collectives.join ', '}'") unless @config.collectives.include?(collective)

      agents_multiplex = get_bool_option("activemq.agents_multiplex", "false")
      target = {:name => nil, :headers => {}}

      case type
        when :reply
          target[:name] = ["/queue/" + collective, :reply, "#{Config.instance.identity}_#{$$}", Client.request_sequence].join(".")

        when :broadcast
          if agents_multiplex
            target[:name] = ["/topic/" + collective, :agents].join(".")
          else
            target[:name] = ["/topic/" + collective, agent, :agent].join(".")
          end

        when :request
          if agents_multiplex
            target[:name] = ["/topic/" + collective, :agents].join(".")
          else
            target[:name] = ["/topic/" + collective, agent, :agent].join(".")
          end

        when :direct_request
          target[:name] = ["/queue/" + collective, :nodes].join(".")

        when :directed
          target[:name] = ["/queue/" + collective, :nodes].join(".")
          target[:headers]["selector"] = "mc_identity = '#{@config.identity}'"
          target[:id] = "%s_directed_to_identity" % collective
      end

      target[:id] = target[:name] unless target[:id]

      target
    end
Sends a message to the ActiveMQ connection
    # File lib/mcollective/connector/activemq.rb
    def publish(msg)
      msg.base64_encode! if @base64

      target = target_for(msg)

      if msg.type == :direct_request
        msg.discovered_hosts.each do |node|
          target[:headers] = headers_for(msg, node)

          Log.debug("Sending a direct message to ActiveMQ target '#{target[:name]}' with headers '#{target[:headers].inspect}'")

          @connection.publish(target[:name], msg.payload, target[:headers])
        end
      else
        target[:headers].merge!(headers_for(msg))

        Log.debug("Sending a broadcast message to ActiveMQ target '#{target[:name]}' with headers '#{target[:headers].inspect}'")

        @connection.publish(target[:name], msg.payload, target[:headers])
      end
    end
Receives a message from the ActiveMQ connection
    # File lib/mcollective/connector/activemq.rb
    def receive
      Log.debug("Waiting for a message from ActiveMQ")

      # When the Stomp library > 1.2.0 is mid reconnecting due to its reliable connection
      # handling it sets the connection to closed. If we happen to be receiving at just
      # that time we will get an exception warning about the closed connection so handling
      # that here with a sleep and a retry.
      begin
        msg = @connection.receive
      rescue ::Stomp::Error::NoCurrentConnection
        sleep 1
        retry
      end

      # In older stomp gems an attempt to receive after failed authentication can return nil
      if msg.nil?
        raise MessageNotReceived.new(exponential_back_off), "No message received from ActiveMQ."
      end

      # We expect all messages we get to be of STOMP frame type MESSAGE, raise on unexpected types
      if msg.command != 'MESSAGE'
        Log.warn("Unexpected '#{msg.command}' frame. Headers: #{msg.headers.inspect} Body: #{msg.body.inspect}")
        raise UnexpectedMessageType.new(exponential_back_off),
              "Received frame of type '#{msg.command}' expected 'MESSAGE'"
      end

      Message.new(msg.body, msg, :base64 => @base64, :headers => msg.headers)
    end
Sets the SSL parameters for a specific connection
    # File lib/mcollective/connector/activemq.rb
    def ssl_parameters(poolnum, fallback)
      params = {
        :cert_file => get_cert_file(poolnum),
        :key_file => get_key_file(poolnum),
        :ts_files => get_option("activemq.pool.#{poolnum}.ssl.ca", false),
        :ciphers => get_option("activemq.pool.#{poolnum}.ssl.ciphers", false),
      }

      raise "cert, key and ca has to be supplied for verified SSL mode" unless params[:cert_file] && params[:key_file] && params[:ts_files]

      raise "Cannot find certificate file #{params[:cert_file]}" unless File.exist?(params[:cert_file])
      raise "Cannot find key file #{params[:key_file]}" unless File.exist?(params[:key_file])

      params[:ts_files].split(",").each do |ca|
        raise "Cannot find CA file #{ca}" unless File.exist?(ca)
      end

      begin
        ::Stomp::SSLParams.new(params)
      rescue NameError
        raise "Stomp gem >= 1.2.2 is needed"
      end

    rescue Exception => e
      if fallback
        Log.warn("Failed to set full SSL verified mode, falling back to unverified: #{e.class}: #{e}")
        return true
      else
        Log.error("Failed to set full SSL verified mode: #{e.class}: #{e}")
        raise(e)
      end
    end
    # File lib/mcollective/connector/activemq.rb
    def stomp_version
      ::Stomp::Version::STRING
    end
    # File lib/mcollective/connector/activemq.rb
    def stomp_version_supports_heartbeat?
      return Util.versioncmp(stomp_version, "1.2.10") >= 0
    end
Subscribe to a topic or queue
    # File lib/mcollective/connector/activemq.rb
    def subscribe(agent, type, collective)
      source = make_target(agent, type, collective)

      unless @subscriptions.include?(source[:id])
        Log.debug("Subscribing to #{source[:name]} with headers #{source[:headers].inspect.chomp}")
        @connection.subscribe(source[:name], source[:headers], source[:id])
        @subscriptions << source[:id]
      end
    rescue ::Stomp::Error::DuplicateSubscription
      Log.error("Received subscription request for #{source.inspect.chomp} but already had a matching subscription, ignoring")
    end
    # File lib/mcollective/connector/activemq.rb
    def target_for(msg)
      if msg.type == :reply
        target = {:name => msg.request.headers["reply-to"], :headers => {}}
      elsif [:request, :direct_request].include?(msg.type)
        target = make_target(msg.agent, msg.type, msg.collective)
      else
        raise "Don't now how to create a target for message type #{msg.type}"
      end

      return target
    end
Unsubscribe from a topic or queue
    # File lib/mcollective/connector/activemq.rb
    def unsubscribe(agent, type, collective)
      source = make_target(agent, type, collective)

      Log.debug("Unsubscribing from #{source[:name]}")
      @connection.unsubscribe(source[:name], source[:headers], source[:id])
      @subscriptions.delete(source[:id])
    end