001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq; 018 019import java.io.IOException; 020import java.net.URI; 021import java.net.URISyntaxException; 022import java.util.ArrayList; 023import java.util.HashMap; 024import java.util.Iterator; 025import java.util.List; 026import java.util.Map; 027import java.util.concurrent.ConcurrentHashMap; 028import java.util.concurrent.ConcurrentMap; 029import java.util.concurrent.CopyOnWriteArrayList; 030import java.util.concurrent.CountDownLatch; 031import java.util.concurrent.LinkedBlockingQueue; 032import java.util.concurrent.RejectedExecutionHandler; 033import java.util.concurrent.ThreadFactory; 034import java.util.concurrent.ThreadPoolExecutor; 035import java.util.concurrent.TimeUnit; 036import java.util.concurrent.atomic.AtomicBoolean; 037import java.util.concurrent.atomic.AtomicInteger; 038 039import javax.jms.Connection; 040import javax.jms.ConnectionConsumer; 041import javax.jms.ConnectionMetaData; 042import javax.jms.Destination; 043import javax.jms.ExceptionListener; 044import javax.jms.IllegalStateException; 045import javax.jms.InvalidDestinationException; 046import javax.jms.JMSException; 047import 
javax.jms.Queue; 048import javax.jms.QueueConnection; 049import javax.jms.QueueSession; 050import javax.jms.ServerSessionPool; 051import javax.jms.Session; 052import javax.jms.Topic; 053import javax.jms.TopicConnection; 054import javax.jms.TopicSession; 055import javax.jms.XAConnection; 056 057import org.apache.activemq.advisory.DestinationSource; 058import org.apache.activemq.blob.BlobTransferPolicy; 059import org.apache.activemq.broker.region.policy.RedeliveryPolicyMap; 060import org.apache.activemq.command.ActiveMQDestination; 061import org.apache.activemq.command.ActiveMQMessage; 062import org.apache.activemq.command.ActiveMQTempDestination; 063import org.apache.activemq.command.ActiveMQTempQueue; 064import org.apache.activemq.command.ActiveMQTempTopic; 065import org.apache.activemq.command.BrokerInfo; 066import org.apache.activemq.command.Command; 067import org.apache.activemq.command.CommandTypes; 068import org.apache.activemq.command.ConnectionControl; 069import org.apache.activemq.command.ConnectionError; 070import org.apache.activemq.command.ConnectionId; 071import org.apache.activemq.command.ConnectionInfo; 072import org.apache.activemq.command.ConsumerControl; 073import org.apache.activemq.command.ConsumerId; 074import org.apache.activemq.command.ConsumerInfo; 075import org.apache.activemq.command.ControlCommand; 076import org.apache.activemq.command.DestinationInfo; 077import org.apache.activemq.command.ExceptionResponse; 078import org.apache.activemq.command.Message; 079import org.apache.activemq.command.MessageDispatch; 080import org.apache.activemq.command.MessageId; 081import org.apache.activemq.command.ProducerAck; 082import org.apache.activemq.command.ProducerId; 083import org.apache.activemq.command.RemoveInfo; 084import org.apache.activemq.command.RemoveSubscriptionInfo; 085import org.apache.activemq.command.Response; 086import org.apache.activemq.command.SessionId; 087import org.apache.activemq.command.ShutdownInfo; 088import 
org.apache.activemq.command.WireFormatInfo; 089import org.apache.activemq.management.JMSConnectionStatsImpl; 090import org.apache.activemq.management.JMSStatsImpl; 091import org.apache.activemq.management.StatsCapable; 092import org.apache.activemq.management.StatsImpl; 093import org.apache.activemq.state.CommandVisitorAdapter; 094import org.apache.activemq.thread.Scheduler; 095import org.apache.activemq.thread.TaskRunnerFactory; 096import org.apache.activemq.transport.FutureResponse; 097import org.apache.activemq.transport.RequestTimedOutIOException; 098import org.apache.activemq.transport.ResponseCallback; 099import org.apache.activemq.transport.Transport; 100import org.apache.activemq.transport.TransportListener; 101import org.apache.activemq.transport.failover.FailoverTransport; 102import org.apache.activemq.util.IdGenerator; 103import org.apache.activemq.util.IntrospectionSupport; 104import org.apache.activemq.util.JMSExceptionSupport; 105import org.apache.activemq.util.LongSequenceGenerator; 106import org.apache.activemq.util.ServiceSupport; 107import org.apache.activemq.util.ThreadPoolUtils; 108import org.slf4j.Logger; 109import org.slf4j.LoggerFactory; 110 111public class ActiveMQConnection implements Connection, TopicConnection, QueueConnection, StatsCapable, Closeable, TransportListener, EnhancedConnection { 112 113 public static final String DEFAULT_USER = ActiveMQConnectionFactory.DEFAULT_USER; 114 public static final String DEFAULT_PASSWORD = ActiveMQConnectionFactory.DEFAULT_PASSWORD; 115 public static final String DEFAULT_BROKER_URL = ActiveMQConnectionFactory.DEFAULT_BROKER_URL; 116 public static int DEFAULT_THREAD_POOL_SIZE = 1000; 117 118 private static final Logger LOG = LoggerFactory.getLogger(ActiveMQConnection.class); 119 120 public final ConcurrentMap<ActiveMQTempDestination, ActiveMQTempDestination> activeTempDestinations = new ConcurrentHashMap<ActiveMQTempDestination, ActiveMQTempDestination>(); 121 122 protected boolean 
dispatchAsync=true; 123 protected boolean alwaysSessionAsync = true; 124 125 private TaskRunnerFactory sessionTaskRunner; 126 private final ThreadPoolExecutor executor; 127 128 // Connection state variables 129 private final ConnectionInfo info; 130 private ExceptionListener exceptionListener; 131 private ClientInternalExceptionListener clientInternalExceptionListener; 132 private boolean clientIDSet; 133 private boolean isConnectionInfoSentToBroker; 134 private boolean userSpecifiedClientID; 135 136 // Configuration options variables 137 private ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy(); 138 private BlobTransferPolicy blobTransferPolicy; 139 private RedeliveryPolicyMap redeliveryPolicyMap; 140 private MessageTransformer transformer; 141 142 private boolean disableTimeStampsByDefault; 143 private boolean optimizedMessageDispatch = true; 144 private boolean copyMessageOnSend = true; 145 private boolean useCompression; 146 private boolean objectMessageSerializationDefered; 147 private boolean useAsyncSend; 148 private boolean optimizeAcknowledge; 149 private long optimizeAcknowledgeTimeOut = 0; 150 private long optimizedAckScheduledAckInterval = 0; 151 private boolean nestedMapAndListEnabled = true; 152 private boolean useRetroactiveConsumer; 153 private boolean exclusiveConsumer; 154 private boolean alwaysSyncSend; 155 private int closeTimeout = 15000; 156 private boolean watchTopicAdvisories = true; 157 private long warnAboutUnstartedConnectionTimeout = 500L; 158 private int sendTimeout =0; 159 private boolean sendAcksAsync=true; 160 private boolean checkForDuplicates = true; 161 private boolean queueOnlyConnection = false; 162 private boolean consumerExpiryCheckEnabled = true; 163 164 private final Transport transport; 165 private final IdGenerator clientIdGenerator; 166 private final JMSStatsImpl factoryStats; 167 private final JMSConnectionStatsImpl stats; 168 169 private final AtomicBoolean started = new AtomicBoolean(false); 170 
private final AtomicBoolean closing = new AtomicBoolean(false); 171 private final AtomicBoolean closed = new AtomicBoolean(false); 172 private final AtomicBoolean transportFailed = new AtomicBoolean(false); 173 private final CopyOnWriteArrayList<ActiveMQSession> sessions = new CopyOnWriteArrayList<ActiveMQSession>(); 174 private final CopyOnWriteArrayList<ActiveMQConnectionConsumer> connectionConsumers = new CopyOnWriteArrayList<ActiveMQConnectionConsumer>(); 175 private final CopyOnWriteArrayList<TransportListener> transportListeners = new CopyOnWriteArrayList<TransportListener>(); 176 177 // Maps ConsumerIds to ActiveMQConsumer objects 178 private final ConcurrentMap<ConsumerId, ActiveMQDispatcher> dispatchers = new ConcurrentHashMap<ConsumerId, ActiveMQDispatcher>(); 179 private final ConcurrentMap<ProducerId, ActiveMQMessageProducer> producers = new ConcurrentHashMap<ProducerId, ActiveMQMessageProducer>(); 180 private final LongSequenceGenerator sessionIdGenerator = new LongSequenceGenerator(); 181 private final SessionId connectionSessionId; 182 private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator(); 183 private final LongSequenceGenerator tempDestinationIdGenerator = new LongSequenceGenerator(); 184 private final LongSequenceGenerator localTransactionIdGenerator = new LongSequenceGenerator(); 185 186 private AdvisoryConsumer advisoryConsumer; 187 private final CountDownLatch brokerInfoReceived = new CountDownLatch(1); 188 private BrokerInfo brokerInfo; 189 private IOException firstFailureError; 190 private int producerWindowSize = ActiveMQConnectionFactory.DEFAULT_PRODUCER_WINDOW_SIZE; 191 192 // Assume that protocol is the latest. Change to the actual protocol 193 // version when a WireFormatInfo is received. 
194 private final AtomicInteger protocolVersion = new AtomicInteger(CommandTypes.PROTOCOL_VERSION); 195 private final long timeCreated; 196 private final ConnectionAudit connectionAudit = new ConnectionAudit(); 197 private DestinationSource destinationSource; 198 private final Object ensureConnectionInfoSentMutex = new Object(); 199 private boolean useDedicatedTaskRunner; 200 protected AtomicInteger transportInterruptionProcessingComplete = new AtomicInteger(0); 201 private long consumerFailoverRedeliveryWaitPeriod; 202 private Scheduler scheduler; 203 private boolean messagePrioritySupported = false; 204 private boolean transactedIndividualAck = false; 205 private boolean nonBlockingRedelivery = false; 206 private boolean rmIdFromConnectionId = false; 207 208 private int maxThreadPoolSize = DEFAULT_THREAD_POOL_SIZE; 209 private RejectedExecutionHandler rejectedTaskHandler = null; 210 211 private List<String> trustedPackages = new ArrayList<String>(); 212 private boolean trustAllPackages = false; 213 private int connectResponseTimeout; 214 215 /** 216 * Construct an <code>ActiveMQConnection</code> 217 * 218 * @param transport 219 * @param factoryStats 220 * @throws Exception 221 */ 222 protected ActiveMQConnection(final Transport transport, IdGenerator clientIdGenerator, IdGenerator connectionIdGenerator, JMSStatsImpl factoryStats) throws Exception { 223 224 this.transport = transport; 225 this.clientIdGenerator = clientIdGenerator; 226 this.factoryStats = factoryStats; 227 228 // Configure a single threaded executor who's core thread can timeout if 229 // idle 230 executor = new ThreadPoolExecutor(1, 1, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() { 231 @Override 232 public Thread newThread(Runnable r) { 233 Thread thread = new Thread(r, "ActiveMQ Connection Executor: " + transport); 234 //Don't make these daemon threads - see https://issues.apache.org/jira/browse/AMQ-796 235 //thread.setDaemon(true); 236 return thread; 237 } 238 
}); 239 // asyncConnectionThread.allowCoreThreadTimeOut(true); 240 String uniqueId = connectionIdGenerator.generateId(); 241 this.info = new ConnectionInfo(new ConnectionId(uniqueId)); 242 this.info.setManageable(true); 243 this.info.setFaultTolerant(transport.isFaultTolerant()); 244 this.connectionSessionId = new SessionId(info.getConnectionId(), -1); 245 246 this.transport.setTransportListener(this); 247 248 this.stats = new JMSConnectionStatsImpl(sessions, this instanceof XAConnection); 249 this.factoryStats.addConnection(this); 250 this.timeCreated = System.currentTimeMillis(); 251 this.connectionAudit.setCheckForDuplicates(transport.isFaultTolerant()); 252 } 253 254 protected void setUserName(String userName) { 255 this.info.setUserName(userName); 256 } 257 258 protected void setPassword(String password) { 259 this.info.setPassword(password); 260 } 261 262 /** 263 * A static helper method to create a new connection 264 * 265 * @return an ActiveMQConnection 266 * @throws JMSException 267 */ 268 public static ActiveMQConnection makeConnection() throws JMSException { 269 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(); 270 return (ActiveMQConnection)factory.createConnection(); 271 } 272 273 /** 274 * A static helper method to create a new connection 275 * 276 * @param uri 277 * @return and ActiveMQConnection 278 * @throws JMSException 279 */ 280 public static ActiveMQConnection makeConnection(String uri) throws JMSException, URISyntaxException { 281 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(uri); 282 return (ActiveMQConnection)factory.createConnection(); 283 } 284 285 /** 286 * A static helper method to create a new connection 287 * 288 * @param user 289 * @param password 290 * @param uri 291 * @return an ActiveMQConnection 292 * @throws JMSException 293 */ 294 public static ActiveMQConnection makeConnection(String user, String password, String uri) throws JMSException, URISyntaxException { 295 ActiveMQConnectionFactory 
factory = new ActiveMQConnectionFactory(user, password, new URI(uri)); 296 return (ActiveMQConnection)factory.createConnection(); 297 } 298 299 /** 300 * @return a number unique for this connection 301 */ 302 public JMSConnectionStatsImpl getConnectionStats() { 303 return stats; 304 } 305 306 /** 307 * Creates a <CODE>Session</CODE> object. 308 * 309 * @param transacted indicates whether the session is transacted 310 * @param acknowledgeMode indicates whether the consumer or the client will 311 * acknowledge any messages it receives; ignored if the 312 * session is transacted. Legal values are 313 * <code>Session.AUTO_ACKNOWLEDGE</code>, 314 * <code>Session.CLIENT_ACKNOWLEDGE</code>, and 315 * <code>Session.DUPS_OK_ACKNOWLEDGE</code>. 316 * @return a newly created session 317 * @throws JMSException if the <CODE>Connection</CODE> object fails to 318 * create a session due to some internal error or lack of 319 * support for the specific transaction and acknowledgement 320 * mode. 321 * @see Session#AUTO_ACKNOWLEDGE 322 * @see Session#CLIENT_ACKNOWLEDGE 323 * @see Session#DUPS_OK_ACKNOWLEDGE 324 * @since 1.1 325 */ 326 @Override 327 public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException { 328 checkClosedOrFailed(); 329 ensureConnectionInfoSent(); 330 if (!transacted) { 331 if (acknowledgeMode == Session.SESSION_TRANSACTED) { 332 throw new JMSException("acknowledgeMode SESSION_TRANSACTED cannot be used for an non-transacted Session"); 333 } else if (acknowledgeMode < Session.SESSION_TRANSACTED || acknowledgeMode > ActiveMQSession.MAX_ACK_CONSTANT) { 334 throw new JMSException("invalid acknowledgeMode: " + acknowledgeMode + ". Valid values are Session.AUTO_ACKNOWLEDGE (1), " + 335 "Session.CLIENT_ACKNOWLEDGE (2), Session.DUPS_OK_ACKNOWLEDGE (3), ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE (4) or for transacted sessions Session.SESSION_TRANSACTED (0)"); 336 } 337 } 338 return new ActiveMQSession(this, getNextSessionId(), transacted ? 
Session.SESSION_TRANSACTED : acknowledgeMode, isDispatchAsync(), isAlwaysSessionAsync()); 339 } 340 341 /** 342 * @return sessionId 343 */ 344 protected SessionId getNextSessionId() { 345 return new SessionId(info.getConnectionId(), sessionIdGenerator.getNextSequenceId()); 346 } 347 348 /** 349 * Gets the client identifier for this connection. 350 * <P> 351 * This value is specific to the JMS provider. It is either preconfigured by 352 * an administrator in a <CODE> ConnectionFactory</CODE> object or assigned 353 * dynamically by the application by calling the <code>setClientID</code> 354 * method. 355 * 356 * @return the unique client identifier 357 * @throws JMSException if the JMS provider fails to return the client ID 358 * for this connection due to some internal error. 359 */ 360 @Override 361 public String getClientID() throws JMSException { 362 checkClosedOrFailed(); 363 return this.info.getClientId(); 364 } 365 366 /** 367 * Sets the client identifier for this connection. 368 * <P> 369 * The preferred way to assign a JMS client's client identifier is for it to 370 * be configured in a client-specific <CODE>ConnectionFactory</CODE> 371 * object and transparently assigned to the <CODE>Connection</CODE> object 372 * it creates. 373 * <P> 374 * Alternatively, a client can set a connection's client identifier using a 375 * provider-specific value. The facility to set a connection's client 376 * identifier explicitly is not a mechanism for overriding the identifier 377 * that has been administratively configured. It is provided for the case 378 * where no administratively specified identifier exists. If one does exist, 379 * an attempt to change it by setting it must throw an 380 * <CODE>IllegalStateException</CODE>. If a client sets the client 381 * identifier explicitly, it must do so immediately after it creates the 382 * connection and before any other action on the connection is taken. 
After 383 * this point, setting the client identifier is a programming error that 384 * should throw an <CODE>IllegalStateException</CODE>. 385 * <P> 386 * The purpose of the client identifier is to associate a connection and its 387 * objects with a state maintained on behalf of the client by a provider. 388 * The only such state identified by the JMS API is that required to support 389 * durable subscriptions. 390 * <P> 391 * If another connection with the same <code>clientID</code> is already 392 * running when this method is called, the JMS provider should detect the 393 * duplicate ID and throw an <CODE>InvalidClientIDException</CODE>. 394 * 395 * @param newClientID the unique client identifier 396 * @throws JMSException if the JMS provider fails to set the client ID for 397 * this connection due to some internal error. 398 * @throws javax.jms.InvalidClientIDException if the JMS client specifies an 399 * invalid or duplicate client ID. 400 * @throws javax.jms.IllegalStateException if the JMS client attempts to set 401 * a connection's client ID at the wrong time or when it has 402 * been administratively configured. 403 */ 404 @Override 405 public void setClientID(String newClientID) throws JMSException { 406 checkClosedOrFailed(); 407 408 if (this.clientIDSet) { 409 throw new IllegalStateException("The clientID has already been set"); 410 } 411 412 if (this.isConnectionInfoSentToBroker) { 413 throw new IllegalStateException("Setting clientID on a used Connection is not allowed"); 414 } 415 416 this.info.setClientId(newClientID); 417 this.userSpecifiedClientID = true; 418 ensureConnectionInfoSent(); 419 } 420 421 /** 422 * Sets the default client id that the connection will use if explicitly not 423 * set with the setClientId() call. 424 */ 425 public void setDefaultClientID(String clientID) throws JMSException { 426 this.info.setClientId(clientID); 427 this.userSpecifiedClientID = true; 428 } 429 430 /** 431 * Gets the metadata for this connection. 
432 * 433 * @return the connection metadata 434 * @throws JMSException if the JMS provider fails to get the connection 435 * metadata for this connection. 436 * @see javax.jms.ConnectionMetaData 437 */ 438 @Override 439 public ConnectionMetaData getMetaData() throws JMSException { 440 checkClosedOrFailed(); 441 return ActiveMQConnectionMetaData.INSTANCE; 442 } 443 444 /** 445 * Gets the <CODE>ExceptionListener</CODE> object for this connection. Not 446 * every <CODE>Connection</CODE> has an <CODE>ExceptionListener</CODE> 447 * associated with it. 448 * 449 * @return the <CODE>ExceptionListener</CODE> for this connection, or 450 * null, if no <CODE>ExceptionListener</CODE> is associated with 451 * this connection. 452 * @throws JMSException if the JMS provider fails to get the 453 * <CODE>ExceptionListener</CODE> for this connection. 454 * @see javax.jms.Connection#setExceptionListener(ExceptionListener) 455 */ 456 @Override 457 public ExceptionListener getExceptionListener() throws JMSException { 458 checkClosedOrFailed(); 459 return this.exceptionListener; 460 } 461 462 /** 463 * Sets an exception listener for this connection. 464 * <P> 465 * If a JMS provider detects a serious problem with a connection, it informs 466 * the connection's <CODE> ExceptionListener</CODE>, if one has been 467 * registered. It does this by calling the listener's <CODE>onException 468 * </CODE> 469 * method, passing it a <CODE>JMSException</CODE> object describing the 470 * problem. 471 * <P> 472 * An exception listener allows a client to be notified of a problem 473 * asynchronously. Some connections only consume messages, so they would 474 * have no other way to learn their connection has failed. 475 * <P> 476 * A connection serializes execution of its <CODE>ExceptionListener</CODE>. 477 * <P> 478 * A JMS provider should attempt to resolve connection problems itself 479 * before it notifies the client of them. 
480 * 481 * @param listener the exception listener 482 * @throws JMSException if the JMS provider fails to set the exception 483 * listener for this connection. 484 */ 485 @Override 486 public void setExceptionListener(ExceptionListener listener) throws JMSException { 487 checkClosedOrFailed(); 488 this.exceptionListener = listener; 489 } 490 491 /** 492 * Gets the <code>ClientInternalExceptionListener</code> object for this connection. 493 * Not every <CODE>ActiveMQConnectionn</CODE> has a <CODE>ClientInternalExceptionListener</CODE> 494 * associated with it. 495 * 496 * @return the listener or <code>null</code> if no listener is registered with the connection. 497 */ 498 public ClientInternalExceptionListener getClientInternalExceptionListener() { 499 return clientInternalExceptionListener; 500 } 501 502 /** 503 * Sets a client internal exception listener for this connection. 504 * The connection will notify the listener, if one has been registered, of exceptions thrown by container components 505 * (e.g. an EJB container in case of Message Driven Beans) during asynchronous processing of a message. 506 * It does this by calling the listener's <code>onException()</code> method passing it a <code>Throwable</code> 507 * describing the problem. 508 * 509 * @param listener the exception listener 510 */ 511 public void setClientInternalExceptionListener(ClientInternalExceptionListener listener) { 512 this.clientInternalExceptionListener = listener; 513 } 514 515 /** 516 * Starts (or restarts) a connection's delivery of incoming messages. A call 517 * to <CODE>start</CODE> on a connection that has already been started is 518 * ignored. 519 * 520 * @throws JMSException if the JMS provider fails to start message delivery 521 * due to some internal error. 
522 * @see javax.jms.Connection#stop() 523 */ 524 @Override 525 public void start() throws JMSException { 526 checkClosedOrFailed(); 527 ensureConnectionInfoSent(); 528 if (started.compareAndSet(false, true)) { 529 for (Iterator<ActiveMQSession> i = sessions.iterator(); i.hasNext();) { 530 ActiveMQSession session = i.next(); 531 session.start(); 532 } 533 } 534 } 535 536 /** 537 * Temporarily stops a connection's delivery of incoming messages. Delivery 538 * can be restarted using the connection's <CODE>start</CODE> method. When 539 * the connection is stopped, delivery to all the connection's message 540 * consumers is inhibited: synchronous receives block, and messages are not 541 * delivered to message listeners. 542 * <P> 543 * This call blocks until receives and/or message listeners in progress have 544 * completed. 545 * <P> 546 * Stopping a connection has no effect on its ability to send messages. A 547 * call to <CODE>stop</CODE> on a connection that has already been stopped 548 * is ignored. 549 * <P> 550 * A call to <CODE>stop</CODE> must not return until delivery of messages 551 * has paused. This means that a client can rely on the fact that none of 552 * its message listeners will be called and that all threads of control 553 * waiting for <CODE>receive</CODE> calls to return will not return with a 554 * message until the connection is restarted. The receive timers for a 555 * stopped connection continue to advance, so receives may time out while 556 * the connection is stopped. 557 * <P> 558 * If message listeners are running when <CODE>stop</CODE> is invoked, the 559 * <CODE>stop</CODE> call must wait until all of them have returned before 560 * it may return. While these message listeners are completing, they must 561 * have the full services of the connection available to them. 562 * 563 * @throws JMSException if the JMS provider fails to stop message delivery 564 * due to some internal error. 
565 * @see javax.jms.Connection#start() 566 */ 567 @Override 568 public void stop() throws JMSException { 569 doStop(true); 570 } 571 572 /** 573 * @see #stop() 574 * @param checkClosed <tt>true</tt> to check for already closed and throw {@link java.lang.IllegalStateException} if already closed, 575 * <tt>false</tt> to skip this check 576 * @throws JMSException if the JMS provider fails to stop message delivery due to some internal error. 577 */ 578 void doStop(boolean checkClosed) throws JMSException { 579 if (checkClosed) { 580 checkClosedOrFailed(); 581 } 582 if (started.compareAndSet(true, false)) { 583 synchronized(sessions) { 584 for (Iterator<ActiveMQSession> i = sessions.iterator(); i.hasNext();) { 585 ActiveMQSession s = i.next(); 586 s.stop(); 587 } 588 } 589 } 590 } 591 592 /** 593 * Closes the connection. 594 * <P> 595 * Since a provider typically allocates significant resources outside the 596 * JVM on behalf of a connection, clients should close these resources when 597 * they are not needed. Relying on garbage collection to eventually reclaim 598 * these resources may not be timely enough. 599 * <P> 600 * There is no need to close the sessions, producers, and consumers of a 601 * closed connection. 602 * <P> 603 * Closing a connection causes all temporary destinations to be deleted. 604 * <P> 605 * When this method is invoked, it should not return until message 606 * processing has been shut down in an orderly fashion. This means that all 607 * message listeners that may have been running have returned, and that all 608 * pending receives have returned. A close terminates all pending message 609 * receives on the connection's sessions' consumers. The receives may return 610 * with a message or with null, depending on whether there was a message 611 * available at the time of the close. 
If one or more of the connection's 612 * sessions' message listeners is processing a message at the time when 613 * connection <CODE>close</CODE> is invoked, all the facilities of the 614 * connection and its sessions must remain available to those listeners 615 * until they return control to the JMS provider. 616 * <P> 617 * Closing a connection causes any of its sessions' transactions in progress 618 * to be rolled back. In the case where a session's work is coordinated by 619 * an external transaction manager, a session's <CODE>commit</CODE> and 620 * <CODE> rollback</CODE> methods are not used and the result of a closed 621 * session's work is determined later by the transaction manager. Closing a 622 * connection does NOT force an acknowledgment of client-acknowledged 623 * sessions. 624 * <P> 625 * Invoking the <CODE>acknowledge</CODE> method of a received message from 626 * a closed connection's session must throw an 627 * <CODE>IllegalStateException</CODE>. Closing a closed connection must 628 * NOT throw an exception. 629 * 630 * @throws JMSException if the JMS provider fails to close the connection 631 * due to some internal error. For example, a failure to 632 * release resources or to close a socket connection can 633 * cause this exception to be thrown. 634 */ 635 @Override 636 public void close() throws JMSException { 637 try { 638 // If we were running, lets stop first. 
639 if (!closed.get() && !transportFailed.get()) { 640 // do not fail if already closed as according to JMS spec we must not 641 // throw exception if already closed 642 doStop(false); 643 } 644 645 synchronized (this) { 646 if (!closed.get()) { 647 closing.set(true); 648 649 if (destinationSource != null) { 650 destinationSource.stop(); 651 destinationSource = null; 652 } 653 if (advisoryConsumer != null) { 654 advisoryConsumer.dispose(); 655 advisoryConsumer = null; 656 } 657 658 Scheduler scheduler = this.scheduler; 659 if (scheduler != null) { 660 try { 661 scheduler.stop(); 662 } catch (Exception e) { 663 JMSException ex = JMSExceptionSupport.create(e); 664 throw ex; 665 } 666 } 667 668 long lastDeliveredSequenceId = -1; 669 for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) { 670 ActiveMQSession s = i.next(); 671 s.dispose(); 672 lastDeliveredSequenceId = Math.max(lastDeliveredSequenceId, s.getLastDeliveredSequenceId()); 673 } 674 for (Iterator<ActiveMQConnectionConsumer> i = this.connectionConsumers.iterator(); i.hasNext();) { 675 ActiveMQConnectionConsumer c = i.next(); 676 c.dispose(); 677 } 678 679 this.activeTempDestinations.clear(); 680 681 try { 682 if (isConnectionInfoSentToBroker) { 683 // If we announced ourselves to the broker.. Try to let the broker 684 // know that the connection is being shutdown. 
685 RemoveInfo removeCommand = info.createRemoveCommand(); 686 removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId); 687 try { 688 syncSendPacket(removeCommand, closeTimeout); 689 } catch (JMSException e) { 690 if (e.getCause() instanceof RequestTimedOutIOException) { 691 // expected 692 } else { 693 throw e; 694 } 695 } 696 doAsyncSendPacket(new ShutdownInfo()); 697 } 698 } finally { // release anyway even if previous communication fails 699 started.set(false); 700 701 // TODO if we move the TaskRunnerFactory to the connection 702 // factory 703 // then we may need to call 704 // factory.onConnectionClose(this); 705 if (sessionTaskRunner != null) { 706 sessionTaskRunner.shutdown(); 707 } 708 closed.set(true); 709 closing.set(false); 710 } 711 } 712 } 713 } finally { 714 try { 715 if (executor != null) { 716 ThreadPoolUtils.shutdown(executor); 717 } 718 } catch (Throwable e) { 719 LOG.warn("Error shutting down thread pool: " + executor + ". This exception will be ignored.", e); 720 } 721 722 ServiceSupport.dispose(this.transport); 723 724 factoryStats.removeConnection(this); 725 } 726 } 727 728 /** 729 * Tells the broker to terminate its VM. This can be used to cleanly 730 * terminate a broker running in a standalone java process. Server must have 731 * property enable.vm.shutdown=true defined to allow this to work. 732 */ 733 // TODO : org.apache.activemq.message.BrokerAdminCommand not yet 734 // implemented. 735 /* 736 * public void terminateBrokerVM() throws JMSException { BrokerAdminCommand 737 * command = new BrokerAdminCommand(); 738 * command.setCommand(BrokerAdminCommand.SHUTDOWN_SERVER_VM); 739 * asyncSendPacket(command); } 740 */ 741 742 /** 743 * Create a durable connection consumer for this connection (optional 744 * operation). This is an expert facility not used by regular JMS clients. 
745 * 746 * @param topic topic to access 747 * @param subscriptionName durable subscription name 748 * @param messageSelector only messages with properties matching the message 749 * selector expression are delivered. A value of null or an 750 * empty string indicates that there is no message selector 751 * for the message consumer. 752 * @param sessionPool the server session pool to associate with this durable 753 * connection consumer 754 * @param maxMessages the maximum number of messages that can be assigned to 755 * a server session at one time 756 * @return the durable connection consumer 757 * @throws JMSException if the <CODE>Connection</CODE> object fails to 758 * create a connection consumer due to some internal error 759 * or invalid arguments for <CODE>sessionPool</CODE> and 760 * <CODE>messageSelector</CODE>. 761 * @throws javax.jms.InvalidDestinationException if an invalid destination 762 * is specified. 763 * @throws javax.jms.InvalidSelectorException if the message selector is 764 * invalid. 765 * @see javax.jms.ConnectionConsumer 766 * @since 1.1 767 */ 768 @Override 769 public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages) 770 throws JMSException { 771 return this.createDurableConnectionConsumer(topic, subscriptionName, messageSelector, sessionPool, maxMessages, false); 772 } 773 774 /** 775 * Create a durable connection consumer for this connection (optional 776 * operation). This is an expert facility not used by regular JMS clients. 777 * 778 * @param topic topic to access 779 * @param subscriptionName durable subscription name 780 * @param messageSelector only messages with properties matching the message 781 * selector expression are delivered. A value of null or an 782 * empty string indicates that there is no message selector 783 * for the message consumer. 
784 * @param sessionPool the server session pool to associate with this durable 785 * connection consumer 786 * @param maxMessages the maximum number of messages that can be assigned to 787 * a server session at one time 788 * @param noLocal set true if you want to filter out messages published 789 * locally 790 * @return the durable connection consumer 791 * @throws JMSException if the <CODE>Connection</CODE> object fails to 792 * create a connection consumer due to some internal error 793 * or invalid arguments for <CODE>sessionPool</CODE> and 794 * <CODE>messageSelector</CODE>. 795 * @throws javax.jms.InvalidDestinationException if an invalid destination 796 * is specified. 797 * @throws javax.jms.InvalidSelectorException if the message selector is 798 * invalid. 799 * @see javax.jms.ConnectionConsumer 800 * @since 1.1 801 */ 802 public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages, 803 boolean noLocal) throws JMSException { 804 checkClosedOrFailed(); 805 806 if (queueOnlyConnection) { 807 throw new IllegalStateException("QueueConnection cannot be used to create Pub/Sub based resources."); 808 } 809 810 ensureConnectionInfoSent(); 811 SessionId sessionId = new SessionId(info.getConnectionId(), -1); 812 ConsumerInfo info = new ConsumerInfo(new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId())); 813 info.setDestination(ActiveMQMessageTransformation.transformDestination(topic)); 814 info.setSubscriptionName(subscriptionName); 815 info.setSelector(messageSelector); 816 info.setPrefetchSize(maxMessages); 817 info.setDispatchAsync(isDispatchAsync()); 818 819 // Allows the options on the destination to configure the consumerInfo 820 if (info.getDestination().getOptions() != null) { 821 Map<String, String> options = new HashMap<String, String>(info.getDestination().getOptions()); 822 IntrospectionSupport.setProperties(this.info, options, 
"consumer."); 823 } 824 825 return new ActiveMQConnectionConsumer(this, sessionPool, info); 826 } 827 828 // Properties 829 // ------------------------------------------------------------------------- 830 831 /** 832 * Returns true if this connection has been started 833 * 834 * @return true if this Connection is started 835 */ 836 public boolean isStarted() { 837 return started.get(); 838 } 839 840 /** 841 * Returns true if the connection is closed 842 */ 843 public boolean isClosed() { 844 return closed.get(); 845 } 846 847 /** 848 * Returns true if the connection is in the process of being closed 849 */ 850 public boolean isClosing() { 851 return closing.get(); 852 } 853 854 /** 855 * Returns true if the underlying transport has failed 856 */ 857 public boolean isTransportFailed() { 858 return transportFailed.get(); 859 } 860 861 /** 862 * @return Returns the prefetchPolicy. 863 */ 864 public ActiveMQPrefetchPolicy getPrefetchPolicy() { 865 return prefetchPolicy; 866 } 867 868 /** 869 * Sets the <a 870 * href="http://activemq.apache.org/what-is-the-prefetch-limit-for.html">prefetch 871 * policy</a> for consumers created by this connection. 872 */ 873 public void setPrefetchPolicy(ActiveMQPrefetchPolicy prefetchPolicy) { 874 this.prefetchPolicy = prefetchPolicy; 875 } 876 877 /** 878 */ 879 public Transport getTransportChannel() { 880 return transport; 881 } 882 883 /** 884 * @return Returns the clientID of the connection, forcing one to be 885 * generated if one has not yet been configured. 886 */ 887 public String getInitializedClientID() throws JMSException { 888 ensureConnectionInfoSent(); 889 return info.getClientId(); 890 } 891 892 /** 893 * @return Returns the timeStampsDisableByDefault. 894 */ 895 public boolean isDisableTimeStampsByDefault() { 896 return disableTimeStampsByDefault; 897 } 898 899 /** 900 * Sets whether or not timestamps on messages should be disabled or not. If 901 * you disable them it adds a small performance boost. 
902 */ 903 public void setDisableTimeStampsByDefault(boolean timeStampsDisableByDefault) { 904 this.disableTimeStampsByDefault = timeStampsDisableByDefault; 905 } 906 907 /** 908 * @return Returns the dispatchOptimizedMessage. 909 */ 910 public boolean isOptimizedMessageDispatch() { 911 return optimizedMessageDispatch; 912 } 913 914 /** 915 * If this flag is set then an larger prefetch limit is used - only 916 * applicable for durable topic subscribers. 917 */ 918 public void setOptimizedMessageDispatch(boolean dispatchOptimizedMessage) { 919 this.optimizedMessageDispatch = dispatchOptimizedMessage; 920 } 921 922 /** 923 * @return Returns the closeTimeout. 924 */ 925 public int getCloseTimeout() { 926 return closeTimeout; 927 } 928 929 /** 930 * Sets the timeout before a close is considered complete. Normally a 931 * close() on a connection waits for confirmation from the broker; this 932 * allows that operation to timeout to save the client hanging if there is 933 * no broker 934 */ 935 public void setCloseTimeout(int closeTimeout) { 936 this.closeTimeout = closeTimeout; 937 } 938 939 /** 940 * @return ConnectionInfo 941 */ 942 public ConnectionInfo getConnectionInfo() { 943 return this.info; 944 } 945 946 public boolean isUseRetroactiveConsumer() { 947 return useRetroactiveConsumer; 948 } 949 950 /** 951 * Sets whether or not retroactive consumers are enabled. Retroactive 952 * consumers allow non-durable topic subscribers to receive old messages 953 * that were published before the non-durable subscriber started. 
954 */ 955 public void setUseRetroactiveConsumer(boolean useRetroactiveConsumer) { 956 this.useRetroactiveConsumer = useRetroactiveConsumer; 957 } 958 959 public boolean isNestedMapAndListEnabled() { 960 return nestedMapAndListEnabled; 961 } 962 963 /** 964 * Enables/disables whether or not Message properties and MapMessage entries 965 * support <a 966 * href="http://activemq.apache.org/structured-message-properties-and-mapmessages.html">Nested 967 * Structures</a> of Map and List objects 968 */ 969 public void setNestedMapAndListEnabled(boolean structuredMapsEnabled) { 970 this.nestedMapAndListEnabled = structuredMapsEnabled; 971 } 972 973 public boolean isExclusiveConsumer() { 974 return exclusiveConsumer; 975 } 976 977 /** 978 * Enables or disables whether or not queue consumers should be exclusive or 979 * not for example to preserve ordering when not using <a 980 * href="http://activemq.apache.org/message-groups.html">Message Groups</a> 981 * 982 * @param exclusiveConsumer 983 */ 984 public void setExclusiveConsumer(boolean exclusiveConsumer) { 985 this.exclusiveConsumer = exclusiveConsumer; 986 } 987 988 /** 989 * Adds a transport listener so that a client can be notified of events in 990 * the underlying transport 991 */ 992 public void addTransportListener(TransportListener transportListener) { 993 transportListeners.add(transportListener); 994 } 995 996 public void removeTransportListener(TransportListener transportListener) { 997 transportListeners.remove(transportListener); 998 } 999 1000 public boolean isUseDedicatedTaskRunner() { 1001 return useDedicatedTaskRunner; 1002 } 1003 1004 public void setUseDedicatedTaskRunner(boolean useDedicatedTaskRunner) { 1005 this.useDedicatedTaskRunner = useDedicatedTaskRunner; 1006 } 1007 1008 public TaskRunnerFactory getSessionTaskRunner() { 1009 synchronized (this) { 1010 if (sessionTaskRunner == null) { 1011 sessionTaskRunner = new TaskRunnerFactory("ActiveMQ Session Task", ThreadPriorities.INBOUND_CLIENT_SESSION, 
false, 1000, isUseDedicatedTaskRunner(), maxThreadPoolSize); 1012 sessionTaskRunner.setRejectedTaskHandler(rejectedTaskHandler); 1013 } 1014 } 1015 return sessionTaskRunner; 1016 } 1017 1018 public void setSessionTaskRunner(TaskRunnerFactory sessionTaskRunner) { 1019 this.sessionTaskRunner = sessionTaskRunner; 1020 } 1021 1022 public MessageTransformer getTransformer() { 1023 return transformer; 1024 } 1025 1026 /** 1027 * Sets the transformer used to transform messages before they are sent on 1028 * to the JMS bus or when they are received from the bus but before they are 1029 * delivered to the JMS client 1030 */ 1031 public void setTransformer(MessageTransformer transformer) { 1032 this.transformer = transformer; 1033 } 1034 1035 /** 1036 * @return the statsEnabled 1037 */ 1038 public boolean isStatsEnabled() { 1039 return this.stats.isEnabled(); 1040 } 1041 1042 /** 1043 * @param statsEnabled the statsEnabled to set 1044 */ 1045 public void setStatsEnabled(boolean statsEnabled) { 1046 this.stats.setEnabled(statsEnabled); 1047 } 1048 1049 /** 1050 * Returns the {@link DestinationSource} object which can be used to listen to destinations 1051 * being created or destroyed or to enquire about the current destinations available on the broker 1052 * 1053 * @return a lazily created destination source 1054 * @throws JMSException 1055 */ 1056 @Override 1057 public DestinationSource getDestinationSource() throws JMSException { 1058 if (destinationSource == null) { 1059 destinationSource = new DestinationSource(this); 1060 destinationSource.start(); 1061 } 1062 return destinationSource; 1063 } 1064 1065 // Implementation methods 1066 // ------------------------------------------------------------------------- 1067 1068 /** 1069 * Used internally for adding Sessions to the Connection 1070 * 1071 * @param session 1072 * @throws JMSException 1073 * @throws JMSException 1074 */ 1075 protected void addSession(ActiveMQSession session) throws JMSException { 1076 
this.sessions.add(session); 1077 if (sessions.size() > 1 || session.isTransacted()) { 1078 optimizedMessageDispatch = false; 1079 } 1080 } 1081 1082 /** 1083 * Used interanlly for removing Sessions from a Connection 1084 * 1085 * @param session 1086 */ 1087 protected void removeSession(ActiveMQSession session) { 1088 this.sessions.remove(session); 1089 this.removeDispatcher(session); 1090 } 1091 1092 /** 1093 * Add a ConnectionConsumer 1094 * 1095 * @param connectionConsumer 1096 * @throws JMSException 1097 */ 1098 protected void addConnectionConsumer(ActiveMQConnectionConsumer connectionConsumer) throws JMSException { 1099 this.connectionConsumers.add(connectionConsumer); 1100 } 1101 1102 /** 1103 * Remove a ConnectionConsumer 1104 * 1105 * @param connectionConsumer 1106 */ 1107 protected void removeConnectionConsumer(ActiveMQConnectionConsumer connectionConsumer) { 1108 this.connectionConsumers.remove(connectionConsumer); 1109 this.removeDispatcher(connectionConsumer); 1110 } 1111 1112 /** 1113 * Creates a <CODE>TopicSession</CODE> object. 1114 * 1115 * @param transacted indicates whether the session is transacted 1116 * @param acknowledgeMode indicates whether the consumer or the client will 1117 * acknowledge any messages it receives; ignored if the 1118 * session is transacted. Legal values are 1119 * <code>Session.AUTO_ACKNOWLEDGE</code>, 1120 * <code>Session.CLIENT_ACKNOWLEDGE</code>, and 1121 * <code>Session.DUPS_OK_ACKNOWLEDGE</code>. 1122 * @return a newly created topic session 1123 * @throws JMSException if the <CODE>TopicConnection</CODE> object fails 1124 * to create a session due to some internal error or lack of 1125 * support for the specific transaction and acknowledgement 1126 * mode. 
1127 * @see Session#AUTO_ACKNOWLEDGE 1128 * @see Session#CLIENT_ACKNOWLEDGE 1129 * @see Session#DUPS_OK_ACKNOWLEDGE 1130 */ 1131 @Override 1132 public TopicSession createTopicSession(boolean transacted, int acknowledgeMode) throws JMSException { 1133 return new ActiveMQTopicSession((ActiveMQSession)createSession(transacted, acknowledgeMode)); 1134 } 1135 1136 /** 1137 * Creates a connection consumer for this connection (optional operation). 1138 * This is an expert facility not used by regular JMS clients. 1139 * 1140 * @param topic the topic to access 1141 * @param messageSelector only messages with properties matching the message 1142 * selector expression are delivered. A value of null or an 1143 * empty string indicates that there is no message selector 1144 * for the message consumer. 1145 * @param sessionPool the server session pool to associate with this 1146 * connection consumer 1147 * @param maxMessages the maximum number of messages that can be assigned to 1148 * a server session at one time 1149 * @return the connection consumer 1150 * @throws JMSException if the <CODE>TopicConnection</CODE> object fails 1151 * to create a connection consumer due to some internal 1152 * error or invalid arguments for <CODE>sessionPool</CODE> 1153 * and <CODE>messageSelector</CODE>. 1154 * @throws javax.jms.InvalidDestinationException if an invalid topic is 1155 * specified. 1156 * @throws javax.jms.InvalidSelectorException if the message selector is 1157 * invalid. 1158 * @see javax.jms.ConnectionConsumer 1159 */ 1160 @Override 1161 public ConnectionConsumer createConnectionConsumer(Topic topic, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException { 1162 return createConnectionConsumer(topic, messageSelector, sessionPool, maxMessages, false); 1163 } 1164 1165 /** 1166 * Creates a connection consumer for this connection (optional operation). 1167 * This is an expert facility not used by regular JMS clients. 
1168 * 1169 * @param queue the queue to access 1170 * @param messageSelector only messages with properties matching the message 1171 * selector expression are delivered. A value of null or an 1172 * empty string indicates that there is no message selector 1173 * for the message consumer. 1174 * @param sessionPool the server session pool to associate with this 1175 * connection consumer 1176 * @param maxMessages the maximum number of messages that can be assigned to 1177 * a server session at one time 1178 * @return the connection consumer 1179 * @throws JMSException if the <CODE>QueueConnection</CODE> object fails 1180 * to create a connection consumer due to some internal 1181 * error or invalid arguments for <CODE>sessionPool</CODE> 1182 * and <CODE>messageSelector</CODE>. 1183 * @throws javax.jms.InvalidDestinationException if an invalid queue is 1184 * specified. 1185 * @throws javax.jms.InvalidSelectorException if the message selector is 1186 * invalid. 1187 * @see javax.jms.ConnectionConsumer 1188 */ 1189 @Override 1190 public ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException { 1191 return createConnectionConsumer(queue, messageSelector, sessionPool, maxMessages, false); 1192 } 1193 1194 /** 1195 * Creates a connection consumer for this connection (optional operation). 1196 * This is an expert facility not used by regular JMS clients. 1197 * 1198 * @param destination the destination to access 1199 * @param messageSelector only messages with properties matching the message 1200 * selector expression are delivered. A value of null or an 1201 * empty string indicates that there is no message selector 1202 * for the message consumer. 
1203 * @param sessionPool the server session pool to associate with this 1204 * connection consumer 1205 * @param maxMessages the maximum number of messages that can be assigned to 1206 * a server session at one time 1207 * @return the connection consumer 1208 * @throws JMSException if the <CODE>Connection</CODE> object fails to 1209 * create a connection consumer due to some internal error 1210 * or invalid arguments for <CODE>sessionPool</CODE> and 1211 * <CODE>messageSelector</CODE>. 1212 * @throws javax.jms.InvalidDestinationException if an invalid destination 1213 * is specified. 1214 * @throws javax.jms.InvalidSelectorException if the message selector is 1215 * invalid. 1216 * @see javax.jms.ConnectionConsumer 1217 * @since 1.1 1218 */ 1219 @Override 1220 public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException { 1221 return createConnectionConsumer(destination, messageSelector, sessionPool, maxMessages, false); 1222 } 1223 1224 public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector, ServerSessionPool sessionPool, int maxMessages, boolean noLocal) 1225 throws JMSException { 1226 1227 checkClosedOrFailed(); 1228 ensureConnectionInfoSent(); 1229 1230 ConsumerId consumerId = createConsumerId(); 1231 ConsumerInfo consumerInfo = new ConsumerInfo(consumerId); 1232 consumerInfo.setDestination(ActiveMQMessageTransformation.transformDestination(destination)); 1233 consumerInfo.setSelector(messageSelector); 1234 consumerInfo.setPrefetchSize(maxMessages); 1235 consumerInfo.setNoLocal(noLocal); 1236 consumerInfo.setDispatchAsync(isDispatchAsync()); 1237 1238 // Allows the options on the destination to configure the consumerInfo 1239 if (consumerInfo.getDestination().getOptions() != null) { 1240 Map<String, String> options = new HashMap<String, String>(consumerInfo.getDestination().getOptions()); 1241 
IntrospectionSupport.setProperties(consumerInfo, options, "consumer."); 1242 } 1243 1244 return new ActiveMQConnectionConsumer(this, sessionPool, consumerInfo); 1245 } 1246 1247 /** 1248 * @return a newly created ConsumedId unique to this connection session instance. 1249 */ 1250 private ConsumerId createConsumerId() { 1251 return new ConsumerId(connectionSessionId, consumerIdGenerator.getNextSequenceId()); 1252 } 1253 1254 /** 1255 * Creates a <CODE>QueueSession</CODE> object. 1256 * 1257 * @param transacted indicates whether the session is transacted 1258 * @param acknowledgeMode indicates whether the consumer or the client will 1259 * acknowledge any messages it receives; ignored if the 1260 * session is transacted. Legal values are 1261 * <code>Session.AUTO_ACKNOWLEDGE</code>, 1262 * <code>Session.CLIENT_ACKNOWLEDGE</code>, and 1263 * <code>Session.DUPS_OK_ACKNOWLEDGE</code>. 1264 * @return a newly created queue session 1265 * @throws JMSException if the <CODE>QueueConnection</CODE> object fails 1266 * to create a session due to some internal error or lack of 1267 * support for the specific transaction and acknowledgement 1268 * mode. 1269 * @see Session#AUTO_ACKNOWLEDGE 1270 * @see Session#CLIENT_ACKNOWLEDGE 1271 * @see Session#DUPS_OK_ACKNOWLEDGE 1272 */ 1273 @Override 1274 public QueueSession createQueueSession(boolean transacted, int acknowledgeMode) throws JMSException { 1275 return new ActiveMQQueueSession((ActiveMQSession)createSession(transacted, acknowledgeMode)); 1276 } 1277 1278 /** 1279 * Ensures that the clientID was manually specified and not auto-generated. 1280 * If the clientID was not specified this method will throw an exception. 1281 * This method is used to ensure that the clientID + durableSubscriber name 1282 * are used correctly. 
1283 * 1284 * @throws JMSException 1285 */ 1286 public void checkClientIDWasManuallySpecified() throws JMSException { 1287 if (!userSpecifiedClientID) { 1288 throw new JMSException("You cannot create a durable subscriber without specifying a unique clientID on a Connection"); 1289 } 1290 } 1291 1292 /** 1293 * send a Packet through the Connection - for internal use only 1294 * 1295 * @param command 1296 * @throws JMSException 1297 */ 1298 public void asyncSendPacket(Command command) throws JMSException { 1299 if (isClosed()) { 1300 throw new ConnectionClosedException(); 1301 } else { 1302 doAsyncSendPacket(command); 1303 } 1304 } 1305 1306 private void doAsyncSendPacket(Command command) throws JMSException { 1307 try { 1308 this.transport.oneway(command); 1309 } catch (IOException e) { 1310 throw JMSExceptionSupport.create(e); 1311 } 1312 } 1313 1314 /** 1315 * Send a packet through a Connection - for internal use only 1316 * 1317 * @param command 1318 * 1319 * @throws JMSException 1320 */ 1321 public void syncSendPacket(final Command command, final AsyncCallback onComplete) throws JMSException { 1322 if(onComplete==null) { 1323 syncSendPacket(command); 1324 } else { 1325 if (isClosed()) { 1326 throw new ConnectionClosedException(); 1327 } 1328 try { 1329 this.transport.asyncRequest(command, new ResponseCallback() { 1330 @Override 1331 public void onCompletion(FutureResponse resp) { 1332 Response response; 1333 Throwable exception = null; 1334 try { 1335 response = resp.getResult(); 1336 if (response.isException()) { 1337 ExceptionResponse er = (ExceptionResponse)response; 1338 exception = er.getException(); 1339 } 1340 } catch (Exception e) { 1341 exception = e; 1342 } 1343 if(exception!=null) { 1344 if ( exception instanceof JMSException) { 1345 onComplete.onException((JMSException) exception); 1346 } else { 1347 if (isClosed()||closing.get()) { 1348 LOG.debug("Received an exception but connection is closing"); 1349 } 1350 JMSException jmsEx = null; 1351 try { 
1352 jmsEx = JMSExceptionSupport.create(exception); 1353 } catch(Throwable e) { 1354 LOG.error("Caught an exception trying to create a JMSException for " +exception,e); 1355 } 1356 // dispose of transport for security exceptions on connection initiation 1357 if (exception instanceof SecurityException && command instanceof ConnectionInfo){ 1358 forceCloseOnSecurityException(exception); 1359 } 1360 if (jmsEx !=null) { 1361 onComplete.onException(jmsEx); 1362 } 1363 } 1364 } else { 1365 onComplete.onSuccess(); 1366 } 1367 } 1368 }); 1369 } catch (IOException e) { 1370 throw JMSExceptionSupport.create(e); 1371 } 1372 } 1373 } 1374 1375 private void forceCloseOnSecurityException(Throwable exception) { 1376 LOG.trace("force close on security exception:" + this + ", transport=" + transport, exception); 1377 onException(new IOException("Force close due to SecurityException on connect", exception)); 1378 } 1379 1380 public Response syncSendPacket(Command command, int timeout) throws JMSException { 1381 if (isClosed()) { 1382 throw new ConnectionClosedException(); 1383 } else { 1384 1385 try { 1386 Response response = (Response)(timeout > 0 1387 ? 
this.transport.request(command, timeout) 1388 : this.transport.request(command)); 1389 if (response.isException()) { 1390 ExceptionResponse er = (ExceptionResponse)response; 1391 if (er.getException() instanceof JMSException) { 1392 throw (JMSException)er.getException(); 1393 } else { 1394 if (isClosed()||closing.get()) { 1395 LOG.debug("Received an exception but connection is closing"); 1396 } 1397 JMSException jmsEx = null; 1398 try { 1399 jmsEx = JMSExceptionSupport.create(er.getException()); 1400 } catch(Throwable e) { 1401 LOG.error("Caught an exception trying to create a JMSException for " +er.getException(),e); 1402 } 1403 if (er.getException() instanceof SecurityException && command instanceof ConnectionInfo){ 1404 forceCloseOnSecurityException(er.getException()); 1405 } 1406 if (jmsEx !=null) { 1407 throw jmsEx; 1408 } 1409 } 1410 } 1411 return response; 1412 } catch (IOException e) { 1413 throw JMSExceptionSupport.create(e); 1414 } 1415 } 1416 } 1417 1418 /** 1419 * Send a packet through a Connection - for internal use only 1420 * 1421 * @param command 1422 * 1423 * @return the broker Response for the given Command. 
1424 * 1425 * @throws JMSException 1426 */ 1427 public Response syncSendPacket(Command command) throws JMSException { 1428 return syncSendPacket(command, 0); 1429 } 1430 1431 /** 1432 * @return statistics for this Connection 1433 */ 1434 @Override 1435 public StatsImpl getStats() { 1436 return stats; 1437 } 1438 1439 /** 1440 * simply throws an exception if the Connection is already closed or the 1441 * Transport has failed 1442 * 1443 * @throws JMSException 1444 */ 1445 protected synchronized void checkClosedOrFailed() throws JMSException { 1446 checkClosed(); 1447 if (transportFailed.get()) { 1448 throw new ConnectionFailedException(firstFailureError); 1449 } 1450 } 1451 1452 /** 1453 * simply throws an exception if the Connection is already closed 1454 * 1455 * @throws JMSException 1456 */ 1457 protected synchronized void checkClosed() throws JMSException { 1458 if (closed.get()) { 1459 throw new ConnectionClosedException(); 1460 } 1461 } 1462 1463 /** 1464 * Send the ConnectionInfo to the Broker 1465 * 1466 * @throws JMSException 1467 */ 1468 protected void ensureConnectionInfoSent() throws JMSException { 1469 synchronized(this.ensureConnectionInfoSentMutex) { 1470 // Can we skip sending the ConnectionInfo packet?? 1471 if (isConnectionInfoSentToBroker || closed.get()) { 1472 return; 1473 } 1474 //TODO shouldn't this check be on userSpecifiedClientID rather than the value of clientID? 1475 if (info.getClientId() == null || info.getClientId().trim().length() == 0) { 1476 info.setClientId(clientIdGenerator.generateId()); 1477 } 1478 syncSendPacket(info.copy(), getConnectResponseTimeout()); 1479 1480 this.isConnectionInfoSentToBroker = true; 1481 // Add a temp destination advisory consumer so that 1482 // We know what the valid temporary destinations are on the 1483 // broker without having to do an RPC to the broker. 
1484 1485 ConsumerId consumerId = new ConsumerId(new SessionId(info.getConnectionId(), -1), consumerIdGenerator.getNextSequenceId()); 1486 if (watchTopicAdvisories) { 1487 advisoryConsumer = new AdvisoryConsumer(this, consumerId); 1488 } 1489 } 1490 } 1491 1492 public synchronized boolean isWatchTopicAdvisories() { 1493 return watchTopicAdvisories; 1494 } 1495 1496 public synchronized void setWatchTopicAdvisories(boolean watchTopicAdvisories) { 1497 this.watchTopicAdvisories = watchTopicAdvisories; 1498 } 1499 1500 /** 1501 * @return Returns the useAsyncSend. 1502 */ 1503 public boolean isUseAsyncSend() { 1504 return useAsyncSend; 1505 } 1506 1507 /** 1508 * Forces the use of <a 1509 * href="http://activemq.apache.org/async-sends.html">Async Sends</a> which 1510 * adds a massive performance boost; but means that the send() method will 1511 * return immediately whether the message has been sent or not which could 1512 * lead to message loss. 1513 */ 1514 public void setUseAsyncSend(boolean useAsyncSend) { 1515 this.useAsyncSend = useAsyncSend; 1516 } 1517 1518 /** 1519 * @return true if always sync send messages 1520 */ 1521 public boolean isAlwaysSyncSend() { 1522 return this.alwaysSyncSend; 1523 } 1524 1525 /** 1526 * Set true if always require messages to be sync sent 1527 * 1528 * @param alwaysSyncSend 1529 */ 1530 public void setAlwaysSyncSend(boolean alwaysSyncSend) { 1531 this.alwaysSyncSend = alwaysSyncSend; 1532 } 1533 1534 /** 1535 * @return the messagePrioritySupported 1536 */ 1537 public boolean isMessagePrioritySupported() { 1538 return this.messagePrioritySupported; 1539 } 1540 1541 /** 1542 * @param messagePrioritySupported the messagePrioritySupported to set 1543 */ 1544 public void setMessagePrioritySupported(boolean messagePrioritySupported) { 1545 this.messagePrioritySupported = messagePrioritySupported; 1546 } 1547 1548 /** 1549 * Cleans up this connection so that it's state is as if the connection was 1550 * just created. 
This allows the Resource Adapter to clean up a connection 1551 * so that it can be reused without having to close and recreate the 1552 * connection. 1553 */ 1554 public void cleanup() throws JMSException { 1555 doCleanup(false); 1556 } 1557 1558 public void doCleanup(boolean removeConnection) throws JMSException { 1559 if (advisoryConsumer != null && !isTransportFailed()) { 1560 advisoryConsumer.dispose(); 1561 advisoryConsumer = null; 1562 } 1563 1564 for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) { 1565 ActiveMQSession s = i.next(); 1566 s.dispose(); 1567 } 1568 for (Iterator<ActiveMQConnectionConsumer> i = this.connectionConsumers.iterator(); i.hasNext();) { 1569 ActiveMQConnectionConsumer c = i.next(); 1570 c.dispose(); 1571 } 1572 1573 if (removeConnection) { 1574 if (isConnectionInfoSentToBroker) { 1575 if (!transportFailed.get() && !closing.get()) { 1576 syncSendPacket(info.createRemoveCommand()); 1577 } 1578 isConnectionInfoSentToBroker = false; 1579 } 1580 if (userSpecifiedClientID) { 1581 info.setClientId(null); 1582 userSpecifiedClientID = false; 1583 } 1584 clientIDSet = false; 1585 } 1586 1587 started.set(false); 1588 } 1589 1590 /** 1591 * Changes the associated username/password that is associated with this 1592 * connection. If the connection has been used, you must called cleanup() 1593 * before calling this method. 1594 * 1595 * @throws IllegalStateException if the connection is in used. 1596 */ 1597 public void changeUserInfo(String userName, String password) throws JMSException { 1598 if (isConnectionInfoSentToBroker) { 1599 throw new IllegalStateException("changeUserInfo used Connection is not allowed"); 1600 } 1601 this.info.setUserName(userName); 1602 this.info.setPassword(password); 1603 } 1604 1605 /** 1606 * @return Returns the resourceManagerId. 
1607 * @throws JMSException 1608 */ 1609 public String getResourceManagerId() throws JMSException { 1610 if (isRmIdFromConnectionId()) { 1611 return info.getConnectionId().getValue(); 1612 } 1613 waitForBrokerInfo(); 1614 if (brokerInfo == null) { 1615 throw new JMSException("Connection failed before Broker info was received."); 1616 } 1617 return brokerInfo.getBrokerId().getValue(); 1618 } 1619 1620 /** 1621 * Returns the broker name if one is available or null if one is not 1622 * available yet. 1623 */ 1624 public String getBrokerName() { 1625 try { 1626 brokerInfoReceived.await(5, TimeUnit.SECONDS); 1627 if (brokerInfo == null) { 1628 return null; 1629 } 1630 return brokerInfo.getBrokerName(); 1631 } catch (InterruptedException e) { 1632 Thread.currentThread().interrupt(); 1633 return null; 1634 } 1635 } 1636 1637 /** 1638 * Returns the broker information if it is available or null if it is not 1639 * available yet. 1640 */ 1641 public BrokerInfo getBrokerInfo() { 1642 return brokerInfo; 1643 } 1644 1645 /** 1646 * @return Returns the RedeliveryPolicy. 
1647 * @throws JMSException 1648 */ 1649 public RedeliveryPolicy getRedeliveryPolicy() throws JMSException { 1650 return redeliveryPolicyMap.getDefaultEntry(); 1651 } 1652 1653 /** 1654 * Sets the redelivery policy to be used when messages are rolled back 1655 */ 1656 public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) { 1657 this.redeliveryPolicyMap.setDefaultEntry(redeliveryPolicy); 1658 } 1659 1660 public BlobTransferPolicy getBlobTransferPolicy() { 1661 if (blobTransferPolicy == null) { 1662 blobTransferPolicy = createBlobTransferPolicy(); 1663 } 1664 return blobTransferPolicy; 1665 } 1666 1667 /** 1668 * Sets the policy used to describe how out-of-band BLOBs (Binary Large 1669 * OBjects) are transferred from producers to brokers to consumers 1670 */ 1671 public void setBlobTransferPolicy(BlobTransferPolicy blobTransferPolicy) { 1672 this.blobTransferPolicy = blobTransferPolicy; 1673 } 1674 1675 /** 1676 * @return Returns the alwaysSessionAsync. 1677 */ 1678 public boolean isAlwaysSessionAsync() { 1679 return alwaysSessionAsync; 1680 } 1681 1682 /** 1683 * If this flag is not set then a separate thread is not used for dispatching messages for each Session in 1684 * the Connection. However, a separate thread is always used if there is more than one session, or the session 1685 * isn't in auto acknowledge or duplicates ok mode. By default this value is set to true and session dispatch 1686 * happens asynchronously. 1687 */ 1688 public void setAlwaysSessionAsync(boolean alwaysSessionAsync) { 1689 this.alwaysSessionAsync = alwaysSessionAsync; 1690 } 1691 1692 /** 1693 * @return Returns the optimizeAcknowledge. 1694 */ 1695 public boolean isOptimizeAcknowledge() { 1696 return optimizeAcknowledge; 1697 } 1698 1699 /** 1700 * Enables an optimised acknowledgement mode where messages are acknowledged 1701 * in batches rather than individually 1702 * 1703 * @param optimizeAcknowledge The optimizeAcknowledge to set. 
1704 */ 1705 public void setOptimizeAcknowledge(boolean optimizeAcknowledge) { 1706 this.optimizeAcknowledge = optimizeAcknowledge; 1707 } 1708 1709 /** 1710 * The max time in milliseconds between optimized ack batches 1711 * @param optimizeAcknowledgeTimeOut 1712 */ 1713 public void setOptimizeAcknowledgeTimeOut(long optimizeAcknowledgeTimeOut) { 1714 this.optimizeAcknowledgeTimeOut = optimizeAcknowledgeTimeOut; 1715 } 1716 1717 public long getOptimizeAcknowledgeTimeOut() { 1718 return optimizeAcknowledgeTimeOut; 1719 } 1720 1721 public long getWarnAboutUnstartedConnectionTimeout() { 1722 return warnAboutUnstartedConnectionTimeout; 1723 } 1724 1725 /** 1726 * Enables the timeout from a connection creation to when a warning is 1727 * generated if the connection is not properly started via {@link #start()} 1728 * and a message is received by a consumer. It is a very common gotcha to 1729 * forget to <a 1730 * href="http://activemq.apache.org/i-am-not-receiving-any-messages-what-is-wrong.html">start 1731 * the connection</a> so this option makes the default case to create a 1732 * warning if the user forgets. To disable the warning just set the value to < 1733 * 0 (say -1). 
1734 */ 1735 public void setWarnAboutUnstartedConnectionTimeout(long warnAboutUnstartedConnectionTimeout) { 1736 this.warnAboutUnstartedConnectionTimeout = warnAboutUnstartedConnectionTimeout; 1737 } 1738 1739 /** 1740 * @return the sendTimeout (in milliseconds) 1741 */ 1742 public int getSendTimeout() { 1743 return sendTimeout; 1744 } 1745 1746 /** 1747 * @param sendTimeout the sendTimeout to set (in milliseconds) 1748 */ 1749 public void setSendTimeout(int sendTimeout) { 1750 this.sendTimeout = sendTimeout; 1751 } 1752 1753 /** 1754 * @return the sendAcksAsync 1755 */ 1756 public boolean isSendAcksAsync() { 1757 return sendAcksAsync; 1758 } 1759 1760 /** 1761 * @param sendAcksAsync the sendAcksAsync to set 1762 */ 1763 public void setSendAcksAsync(boolean sendAcksAsync) { 1764 this.sendAcksAsync = sendAcksAsync; 1765 } 1766 1767 /** 1768 * Returns the time this connection was created 1769 */ 1770 public long getTimeCreated() { 1771 return timeCreated; 1772 } 1773 1774 private void waitForBrokerInfo() throws JMSException { 1775 try { 1776 brokerInfoReceived.await(); 1777 } catch (InterruptedException e) { 1778 Thread.currentThread().interrupt(); 1779 throw JMSExceptionSupport.create(e); 1780 } 1781 } 1782 1783 // Package protected so that it can be used in unit tests 1784 public Transport getTransport() { 1785 return transport; 1786 } 1787 1788 public void addProducer(ProducerId producerId, ActiveMQMessageProducer producer) { 1789 producers.put(producerId, producer); 1790 } 1791 1792 public void removeProducer(ProducerId producerId) { 1793 producers.remove(producerId); 1794 } 1795 1796 public void addDispatcher(ConsumerId consumerId, ActiveMQDispatcher dispatcher) { 1797 dispatchers.put(consumerId, dispatcher); 1798 } 1799 1800 public void removeDispatcher(ConsumerId consumerId) { 1801 dispatchers.remove(consumerId); 1802 } 1803 1804 public boolean hasDispatcher(ConsumerId consumerId) { 1805 return dispatchers.containsKey(consumerId); 1806 } 1807 1808 /** 1809 * 
@param o - the command to consume 1810 */ 1811 @Override 1812 public void onCommand(final Object o) { 1813 final Command command = (Command)o; 1814 if (!closed.get() && command != null) { 1815 try { 1816 command.visit(new CommandVisitorAdapter() { 1817 @Override 1818 public Response processMessageDispatch(MessageDispatch md) throws Exception { 1819 waitForTransportInterruptionProcessingToComplete(); 1820 ActiveMQDispatcher dispatcher = dispatchers.get(md.getConsumerId()); 1821 if (dispatcher != null) { 1822 // Copy in case a embedded broker is dispatching via 1823 // vm:// 1824 // md.getMessage() == null to signal end of queue 1825 // browse. 1826 Message msg = md.getMessage(); 1827 if (msg != null) { 1828 msg = msg.copy(); 1829 msg.setReadOnlyBody(true); 1830 msg.setReadOnlyProperties(true); 1831 msg.setRedeliveryCounter(md.getRedeliveryCounter()); 1832 msg.setConnection(ActiveMQConnection.this); 1833 msg.setMemoryUsage(null); 1834 md.setMessage(msg); 1835 } 1836 dispatcher.dispatch(md); 1837 } else { 1838 LOG.debug("{} no dispatcher for {} in {}", this, md, dispatchers); 1839 } 1840 return null; 1841 } 1842 1843 @Override 1844 public Response processProducerAck(ProducerAck pa) throws Exception { 1845 if (pa != null && pa.getProducerId() != null) { 1846 ActiveMQMessageProducer producer = producers.get(pa.getProducerId()); 1847 if (producer != null) { 1848 producer.onProducerAck(pa); 1849 } 1850 } 1851 return null; 1852 } 1853 1854 @Override 1855 public Response processBrokerInfo(BrokerInfo info) throws Exception { 1856 brokerInfo = info; 1857 brokerInfoReceived.countDown(); 1858 optimizeAcknowledge &= !brokerInfo.isFaultTolerantConfiguration(); 1859 getBlobTransferPolicy().setBrokerUploadUrl(info.getBrokerUploadUrl()); 1860 return null; 1861 } 1862 1863 @Override 1864 public Response processConnectionError(final ConnectionError error) throws Exception { 1865 executor.execute(new Runnable() { 1866 @Override 1867 public void run() { 1868 
onAsyncException(error.getException()); 1869 } 1870 }); 1871 return null; 1872 } 1873 1874 @Override 1875 public Response processControlCommand(ControlCommand command) throws Exception { 1876 onControlCommand(command); 1877 return null; 1878 } 1879 1880 @Override 1881 public Response processConnectionControl(ConnectionControl control) throws Exception { 1882 onConnectionControl((ConnectionControl)command); 1883 return null; 1884 } 1885 1886 @Override 1887 public Response processConsumerControl(ConsumerControl control) throws Exception { 1888 onConsumerControl((ConsumerControl)command); 1889 return null; 1890 } 1891 1892 @Override 1893 public Response processWireFormat(WireFormatInfo info) throws Exception { 1894 onWireFormatInfo((WireFormatInfo)command); 1895 return null; 1896 } 1897 }); 1898 } catch (Exception e) { 1899 onClientInternalException(e); 1900 } 1901 } 1902 1903 for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) { 1904 TransportListener listener = iter.next(); 1905 listener.onCommand(command); 1906 } 1907 } 1908 1909 protected void onWireFormatInfo(WireFormatInfo info) { 1910 protocolVersion.set(info.getVersion()); 1911 } 1912 1913 /** 1914 * Handles async client internal exceptions. 1915 * A client internal exception is usually one that has been thrown 1916 * by a container runtime component during asynchronous processing of a 1917 * message that does not affect the connection itself. 1918 * This method notifies the <code>ClientInternalExceptionListener</code> by invoking 1919 * its <code>onException</code> method, if one has been registered with this connection. 
1920 * 1921 * @param error the exception that the problem 1922 */ 1923 public void onClientInternalException(final Throwable error) { 1924 if ( !closed.get() && !closing.get() ) { 1925 if ( this.clientInternalExceptionListener != null ) { 1926 executor.execute(new Runnable() { 1927 @Override 1928 public void run() { 1929 ActiveMQConnection.this.clientInternalExceptionListener.onException(error); 1930 } 1931 }); 1932 } else { 1933 LOG.debug("Async client internal exception occurred with no exception listener registered: " 1934 + error, error); 1935 } 1936 } 1937 } 1938 1939 /** 1940 * Used for handling async exceptions 1941 * 1942 * @param error 1943 */ 1944 public void onAsyncException(Throwable error) { 1945 if (!closed.get() && !closing.get()) { 1946 if (this.exceptionListener != null) { 1947 1948 if (!(error instanceof JMSException)) { 1949 error = JMSExceptionSupport.create(error); 1950 } 1951 final JMSException e = (JMSException)error; 1952 1953 executor.execute(new Runnable() { 1954 @Override 1955 public void run() { 1956 ActiveMQConnection.this.exceptionListener.onException(e); 1957 } 1958 }); 1959 1960 } else { 1961 LOG.debug("Async exception with no exception listener: " + error, error); 1962 } 1963 } 1964 } 1965 1966 @Override 1967 public void onException(final IOException error) { 1968 onAsyncException(error); 1969 if (!closing.get() && !closed.get()) { 1970 executor.execute(new Runnable() { 1971 @Override 1972 public void run() { 1973 transportFailed(error); 1974 ServiceSupport.dispose(ActiveMQConnection.this.transport); 1975 brokerInfoReceived.countDown(); 1976 try { 1977 doCleanup(true); 1978 } catch (JMSException e) { 1979 LOG.warn("Exception during connection cleanup, " + e, e); 1980 } 1981 for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) { 1982 TransportListener listener = iter.next(); 1983 listener.onException(error); 1984 } 1985 } 1986 }); 1987 } 1988 } 1989 1990 @Override 1991 public void 
transportInterupted() { 1992 transportInterruptionProcessingComplete.set(1); 1993 for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) { 1994 ActiveMQSession s = i.next(); 1995 s.clearMessagesInProgress(transportInterruptionProcessingComplete); 1996 } 1997 1998 for (ActiveMQConnectionConsumer connectionConsumer : this.connectionConsumers) { 1999 connectionConsumer.clearMessagesInProgress(transportInterruptionProcessingComplete); 2000 } 2001 2002 if (transportInterruptionProcessingComplete.decrementAndGet() > 0) { 2003 if (LOG.isDebugEnabled()) { 2004 LOG.debug("transport interrupted - processing required, dispatchers: " + transportInterruptionProcessingComplete.get()); 2005 } 2006 signalInterruptionProcessingNeeded(); 2007 } 2008 2009 for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) { 2010 TransportListener listener = iter.next(); 2011 listener.transportInterupted(); 2012 } 2013 } 2014 2015 @Override 2016 public void transportResumed() { 2017 for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) { 2018 TransportListener listener = iter.next(); 2019 listener.transportResumed(); 2020 } 2021 } 2022 2023 /** 2024 * Create the DestinationInfo object for the temporary destination. 2025 * 2026 * @param topic - if its true topic, else queue. 2027 * @return DestinationInfo 2028 * @throws JMSException 2029 */ 2030 protected ActiveMQTempDestination createTempDestination(boolean topic) throws JMSException { 2031 2032 // Check if Destination info is of temporary type. 
2033 ActiveMQTempDestination dest; 2034 if (topic) { 2035 dest = new ActiveMQTempTopic(info.getConnectionId(), tempDestinationIdGenerator.getNextSequenceId()); 2036 } else { 2037 dest = new ActiveMQTempQueue(info.getConnectionId(), tempDestinationIdGenerator.getNextSequenceId()); 2038 } 2039 2040 DestinationInfo info = new DestinationInfo(); 2041 info.setConnectionId(this.info.getConnectionId()); 2042 info.setOperationType(DestinationInfo.ADD_OPERATION_TYPE); 2043 info.setDestination(dest); 2044 syncSendPacket(info); 2045 2046 dest.setConnection(this); 2047 activeTempDestinations.put(dest, dest); 2048 return dest; 2049 } 2050 2051 /** 2052 * @param destination 2053 * @throws JMSException 2054 */ 2055 public void deleteTempDestination(ActiveMQTempDestination destination) throws JMSException { 2056 2057 checkClosedOrFailed(); 2058 2059 for (ActiveMQSession session : this.sessions) { 2060 if (session.isInUse(destination)) { 2061 throw new JMSException("A consumer is consuming from the temporary destination"); 2062 } 2063 } 2064 2065 activeTempDestinations.remove(destination); 2066 2067 DestinationInfo destInfo = new DestinationInfo(); 2068 destInfo.setConnectionId(this.info.getConnectionId()); 2069 destInfo.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE); 2070 destInfo.setDestination(destination); 2071 destInfo.setTimeout(0); 2072 syncSendPacket(destInfo); 2073 } 2074 2075 public boolean isDeleted(ActiveMQDestination dest) { 2076 2077 // If we are not watching the advisories.. then 2078 // we will assume that the temp destination does exist. 
2079 if (advisoryConsumer == null) { 2080 return false; 2081 } 2082 2083 return !activeTempDestinations.containsValue(dest); 2084 } 2085 2086 public boolean isCopyMessageOnSend() { 2087 return copyMessageOnSend; 2088 } 2089 2090 public LongSequenceGenerator getLocalTransactionIdGenerator() { 2091 return localTransactionIdGenerator; 2092 } 2093 2094 public boolean isUseCompression() { 2095 return useCompression; 2096 } 2097 2098 /** 2099 * Enables the use of compression of the message bodies 2100 */ 2101 public void setUseCompression(boolean useCompression) { 2102 this.useCompression = useCompression; 2103 } 2104 2105 public void destroyDestination(ActiveMQDestination destination) throws JMSException { 2106 2107 checkClosedOrFailed(); 2108 ensureConnectionInfoSent(); 2109 2110 DestinationInfo info = new DestinationInfo(); 2111 info.setConnectionId(this.info.getConnectionId()); 2112 info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE); 2113 info.setDestination(destination); 2114 info.setTimeout(0); 2115 syncSendPacket(info); 2116 } 2117 2118 public boolean isDispatchAsync() { 2119 return dispatchAsync; 2120 } 2121 2122 /** 2123 * Enables or disables the default setting of whether or not consumers have 2124 * their messages <a 2125 * href="http://activemq.apache.org/consumer-dispatch-async.html">dispatched 2126 * synchronously or asynchronously by the broker</a>. For non-durable 2127 * topics for example we typically dispatch synchronously by default to 2128 * minimize context switches which boost performance. However sometimes its 2129 * better to go slower to ensure that a single blocked consumer socket does 2130 * not block delivery to other consumers. 2131 * 2132 * @param asyncDispatch If true then consumers created on this connection 2133 * will default to having their messages dispatched 2134 * asynchronously. The default value is true. 
2135 */ 2136 public void setDispatchAsync(boolean asyncDispatch) { 2137 this.dispatchAsync = asyncDispatch; 2138 } 2139 2140 public boolean isObjectMessageSerializationDefered() { 2141 return objectMessageSerializationDefered; 2142 } 2143 2144 /** 2145 * When an object is set on an ObjectMessage, the JMS spec requires the 2146 * object to be serialized by that set method. Enabling this flag causes the 2147 * object to not get serialized. The object may subsequently get serialized 2148 * if the message needs to be sent over a socket or stored to disk. 2149 */ 2150 public void setObjectMessageSerializationDefered(boolean objectMessageSerializationDefered) { 2151 this.objectMessageSerializationDefered = objectMessageSerializationDefered; 2152 } 2153 2154 /** 2155 * Unsubscribes a durable subscription that has been created by a client. 2156 * <P> 2157 * This method deletes the state being maintained on behalf of the 2158 * subscriber by its provider. 2159 * <P> 2160 * It is erroneous for a client to delete a durable subscription while there 2161 * is an active <CODE>MessageConsumer </CODE> or 2162 * <CODE>TopicSubscriber</CODE> for the subscription, or while a consumed 2163 * message is part of a pending transaction or has not been acknowledged in 2164 * the session. 2165 * 2166 * @param name the name used to identify this subscription 2167 * @throws JMSException if the session fails to unsubscribe to the durable 2168 * subscription due to some internal error. 2169 * @throws InvalidDestinationException if an invalid subscription name is 2170 * specified. 
2171 * @since 1.1 2172 */ 2173 public void unsubscribe(String name) throws InvalidDestinationException, JMSException { 2174 checkClosedOrFailed(); 2175 RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo(); 2176 rsi.setConnectionId(getConnectionInfo().getConnectionId()); 2177 rsi.setSubscriptionName(name); 2178 rsi.setClientId(getConnectionInfo().getClientId()); 2179 syncSendPacket(rsi); 2180 } 2181 2182 /** 2183 * Internal send method optimized: - It does not copy the message - It can 2184 * only handle ActiveMQ messages. - You can specify if the send is async or 2185 * sync - Does not allow you to send /w a transaction. 2186 */ 2187 void send(ActiveMQDestination destination, ActiveMQMessage msg, MessageId messageId, int deliveryMode, int priority, long timeToLive, boolean async) throws JMSException { 2188 checkClosedOrFailed(); 2189 2190 if (destination.isTemporary() && isDeleted(destination)) { 2191 throw new JMSException("Cannot publish to a deleted Destination: " + destination); 2192 } 2193 2194 msg.setJMSDestination(destination); 2195 msg.setJMSDeliveryMode(deliveryMode); 2196 long expiration = 0L; 2197 2198 if (!isDisableTimeStampsByDefault()) { 2199 long timeStamp = System.currentTimeMillis(); 2200 msg.setJMSTimestamp(timeStamp); 2201 if (timeToLive > 0) { 2202 expiration = timeToLive + timeStamp; 2203 } 2204 } 2205 2206 msg.setJMSExpiration(expiration); 2207 msg.setJMSPriority(priority); 2208 msg.setJMSRedelivered(false); 2209 msg.setMessageId(messageId); 2210 msg.onSend(); 2211 msg.setProducerId(msg.getMessageId().getProducerId()); 2212 2213 if (LOG.isDebugEnabled()) { 2214 LOG.debug("Sending message: " + msg); 2215 } 2216 2217 if (async) { 2218 asyncSendPacket(msg); 2219 } else { 2220 syncSendPacket(msg); 2221 } 2222 } 2223 2224 protected void onControlCommand(ControlCommand command) { 2225 String text = command.getCommand(); 2226 if (text != null) { 2227 if ("shutdown".equals(text)) { 2228 LOG.info("JVM told to shutdown"); 2229 System.exit(0); 2230 
} 2231 2232 // TODO Should we handle the "close" case? 2233 // if (false && "close".equals(text)){ 2234 // LOG.error("Broker " + getBrokerInfo() + "shutdown connection"); 2235 // try { 2236 // close(); 2237 // } catch (JMSException e) { 2238 // } 2239 // } 2240 } 2241 } 2242 2243 protected void onConnectionControl(ConnectionControl command) { 2244 if (command.isFaultTolerant()) { 2245 this.optimizeAcknowledge = false; 2246 for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) { 2247 ActiveMQSession s = i.next(); 2248 s.setOptimizeAcknowledge(false); 2249 } 2250 } 2251 } 2252 2253 protected void onConsumerControl(ConsumerControl command) { 2254 if (command.isClose()) { 2255 for (ActiveMQSession session : this.sessions) { 2256 session.close(command.getConsumerId()); 2257 } 2258 } else { 2259 for (ActiveMQSession session : this.sessions) { 2260 session.setPrefetchSize(command.getConsumerId(), command.getPrefetch()); 2261 } 2262 for (ActiveMQConnectionConsumer connectionConsumer: connectionConsumers) { 2263 ConsumerInfo consumerInfo = connectionConsumer.getConsumerInfo(); 2264 if (consumerInfo.getConsumerId().equals(command.getConsumerId())) { 2265 consumerInfo.setPrefetchSize(command.getPrefetch()); 2266 } 2267 } 2268 } 2269 } 2270 2271 protected void transportFailed(IOException error) { 2272 transportFailed.set(true); 2273 if (firstFailureError == null) { 2274 firstFailureError = error; 2275 } 2276 } 2277 2278 /** 2279 * Should a JMS message be copied to a new JMS Message object as part of the 2280 * send() method in JMS. This is enabled by default to be compliant with the 2281 * JMS specification. 
You can disable it if you do not mutate JMS messages 2282 * after they are sent for a performance boost 2283 */ 2284 public void setCopyMessageOnSend(boolean copyMessageOnSend) { 2285 this.copyMessageOnSend = copyMessageOnSend; 2286 } 2287 2288 @Override 2289 public String toString() { 2290 return "ActiveMQConnection {id=" + info.getConnectionId() + ",clientId=" + info.getClientId() + ",started=" + started.get() + "}"; 2291 } 2292 2293 protected BlobTransferPolicy createBlobTransferPolicy() { 2294 return new BlobTransferPolicy(); 2295 } 2296 2297 public int getProtocolVersion() { 2298 return protocolVersion.get(); 2299 } 2300 2301 public int getProducerWindowSize() { 2302 return producerWindowSize; 2303 } 2304 2305 public void setProducerWindowSize(int producerWindowSize) { 2306 this.producerWindowSize = producerWindowSize; 2307 } 2308 2309 public void setAuditDepth(int auditDepth) { 2310 connectionAudit.setAuditDepth(auditDepth); 2311 } 2312 2313 public void setAuditMaximumProducerNumber(int auditMaximumProducerNumber) { 2314 connectionAudit.setAuditMaximumProducerNumber(auditMaximumProducerNumber); 2315 } 2316 2317 protected void removeDispatcher(ActiveMQDispatcher dispatcher) { 2318 connectionAudit.removeDispatcher(dispatcher); 2319 } 2320 2321 protected boolean isDuplicate(ActiveMQDispatcher dispatcher, Message message) { 2322 return checkForDuplicates && connectionAudit.isDuplicate(dispatcher, message); 2323 } 2324 2325 protected void rollbackDuplicate(ActiveMQDispatcher dispatcher, Message message) { 2326 connectionAudit.rollbackDuplicate(dispatcher, message); 2327 } 2328 2329 public IOException getFirstFailureError() { 2330 return firstFailureError; 2331 } 2332 2333 protected void waitForTransportInterruptionProcessingToComplete() throws InterruptedException { 2334 if (!closed.get() && !transportFailed.get() && transportInterruptionProcessingComplete.get()>0) { 2335 LOG.warn("dispatch with outstanding dispatch interruption processing count " + 
transportInterruptionProcessingComplete.get()); 2336 signalInterruptionProcessingComplete(); 2337 } 2338 } 2339 2340 protected void transportInterruptionProcessingComplete() { 2341 if (transportInterruptionProcessingComplete.decrementAndGet() == 0) { 2342 signalInterruptionProcessingComplete(); 2343 } 2344 } 2345 2346 private void signalInterruptionProcessingComplete() { 2347 if (LOG.isDebugEnabled()) { 2348 LOG.debug("transportInterruptionProcessingComplete: " + transportInterruptionProcessingComplete.get() 2349 + " for:" + this.getConnectionInfo().getConnectionId()); 2350 } 2351 2352 FailoverTransport failoverTransport = transport.narrow(FailoverTransport.class); 2353 if (failoverTransport != null) { 2354 failoverTransport.connectionInterruptProcessingComplete(this.getConnectionInfo().getConnectionId()); 2355 if (LOG.isDebugEnabled()) { 2356 LOG.debug("notified failover transport (" + failoverTransport 2357 + ") of interruption completion for: " + this.getConnectionInfo().getConnectionId()); 2358 } 2359 } 2360 transportInterruptionProcessingComplete.set(0); 2361 } 2362 2363 private void signalInterruptionProcessingNeeded() { 2364 FailoverTransport failoverTransport = transport.narrow(FailoverTransport.class); 2365 if (failoverTransport != null) { 2366 failoverTransport.getStateTracker().transportInterrupted(this.getConnectionInfo().getConnectionId()); 2367 if (LOG.isDebugEnabled()) { 2368 LOG.debug("notified failover transport (" + failoverTransport 2369 + ") of pending interruption processing for: " + this.getConnectionInfo().getConnectionId()); 2370 } 2371 } 2372 } 2373 2374 /* 2375 * specify the amount of time in milliseconds that a consumer with a transaction pending recovery 2376 * will wait to receive re dispatched messages. 2377 * default value is 0 so there is no wait by default. 
2378 */ 2379 public void setConsumerFailoverRedeliveryWaitPeriod(long consumerFailoverRedeliveryWaitPeriod) { 2380 this.consumerFailoverRedeliveryWaitPeriod = consumerFailoverRedeliveryWaitPeriod; 2381 } 2382 2383 public long getConsumerFailoverRedeliveryWaitPeriod() { 2384 return consumerFailoverRedeliveryWaitPeriod; 2385 } 2386 2387 protected Scheduler getScheduler() throws JMSException { 2388 Scheduler result = scheduler; 2389 if (result == null) { 2390 if (isClosing() || isClosed()) { 2391 // without lock contention report the closing state 2392 throw new ConnectionClosedException(); 2393 } 2394 synchronized (this) { 2395 result = scheduler; 2396 if (result == null) { 2397 checkClosed(); 2398 try { 2399 result = new Scheduler("ActiveMQConnection["+info.getConnectionId().getValue()+"] Scheduler"); 2400 result.start(); 2401 scheduler = result; 2402 } catch(Exception e) { 2403 throw JMSExceptionSupport.create(e); 2404 } 2405 } 2406 } 2407 } 2408 return result; 2409 } 2410 2411 protected ThreadPoolExecutor getExecutor() { 2412 return this.executor; 2413 } 2414 2415 protected CopyOnWriteArrayList<ActiveMQSession> getSessions() { 2416 return sessions; 2417 } 2418 2419 /** 2420 * @return the checkForDuplicates 2421 */ 2422 public boolean isCheckForDuplicates() { 2423 return this.checkForDuplicates; 2424 } 2425 2426 /** 2427 * @param checkForDuplicates the checkForDuplicates to set 2428 */ 2429 public void setCheckForDuplicates(boolean checkForDuplicates) { 2430 this.checkForDuplicates = checkForDuplicates; 2431 } 2432 2433 public boolean isTransactedIndividualAck() { 2434 return transactedIndividualAck; 2435 } 2436 2437 public void setTransactedIndividualAck(boolean transactedIndividualAck) { 2438 this.transactedIndividualAck = transactedIndividualAck; 2439 } 2440 2441 public boolean isNonBlockingRedelivery() { 2442 return nonBlockingRedelivery; 2443 } 2444 2445 public void setNonBlockingRedelivery(boolean nonBlockingRedelivery) { 2446 this.nonBlockingRedelivery = 
nonBlockingRedelivery; 2447 } 2448 2449 public boolean isRmIdFromConnectionId() { 2450 return rmIdFromConnectionId; 2451 } 2452 2453 public void setRmIdFromConnectionId(boolean rmIdFromConnectionId) { 2454 this.rmIdFromConnectionId = rmIdFromConnectionId; 2455 } 2456 2457 /** 2458 * Removes any TempDestinations that this connection has cached, ignoring 2459 * any exceptions generated because the destination is in use as they should 2460 * not be removed. 2461 * Used from a pooled connection, b/c it will not be explicitly closed. 2462 */ 2463 public void cleanUpTempDestinations() { 2464 2465 if (this.activeTempDestinations == null || this.activeTempDestinations.isEmpty()) { 2466 return; 2467 } 2468 2469 Iterator<ConcurrentMap.Entry<ActiveMQTempDestination, ActiveMQTempDestination>> entries 2470 = this.activeTempDestinations.entrySet().iterator(); 2471 while(entries.hasNext()) { 2472 ConcurrentMap.Entry<ActiveMQTempDestination, ActiveMQTempDestination> entry = entries.next(); 2473 try { 2474 // Only delete this temp destination if it was created from this connection. The connection used 2475 // for the advisory consumer may also have a reference to this temp destination. 2476 ActiveMQTempDestination dest = entry.getValue(); 2477 String thisConnectionId = (info.getConnectionId() == null) ? "" : info.getConnectionId().toString(); 2478 if (dest.getConnectionId() != null && dest.getConnectionId().equals(thisConnectionId)) { 2479 this.deleteTempDestination(entry.getValue()); 2480 } 2481 } catch (Exception ex) { 2482 // the temp dest is in use so it can not be deleted. 2483 // it is ok to leave it to connection tear down phase 2484 } 2485 } 2486 } 2487 2488 /** 2489 * Sets the Connection wide RedeliveryPolicyMap for handling messages that are being rolled back. 
2490 * @param redeliveryPolicyMap the redeliveryPolicyMap to set 2491 */ 2492 public void setRedeliveryPolicyMap(RedeliveryPolicyMap redeliveryPolicyMap) { 2493 this.redeliveryPolicyMap = redeliveryPolicyMap; 2494 } 2495 2496 /** 2497 * Gets the Connection's configured RedeliveryPolicyMap which will be used by all the 2498 * Consumers when dealing with transaction messages that have been rolled back. 2499 * 2500 * @return the redeliveryPolicyMap 2501 */ 2502 public RedeliveryPolicyMap getRedeliveryPolicyMap() { 2503 return redeliveryPolicyMap; 2504 } 2505 2506 public int getMaxThreadPoolSize() { 2507 return maxThreadPoolSize; 2508 } 2509 2510 public void setMaxThreadPoolSize(int maxThreadPoolSize) { 2511 this.maxThreadPoolSize = maxThreadPoolSize; 2512 } 2513 2514 /** 2515 * Enable enforcement of QueueConnection semantics. 2516 * 2517 * @return this object, useful for chaining 2518 */ 2519 ActiveMQConnection enforceQueueOnlyConnection() { 2520 this.queueOnlyConnection = true; 2521 return this; 2522 } 2523 2524 public RejectedExecutionHandler getRejectedTaskHandler() { 2525 return rejectedTaskHandler; 2526 } 2527 2528 public void setRejectedTaskHandler(RejectedExecutionHandler rejectedTaskHandler) { 2529 this.rejectedTaskHandler = rejectedTaskHandler; 2530 } 2531 2532 /** 2533 * Gets the configured time interval that is used to force all MessageConsumers that have optimizedAcknowledge enabled 2534 * to send an ack for any outstanding Message Acks. By default this value is set to zero meaning that the consumers 2535 * will not do any background Message acknowledgment. 2536 * 2537 * @return the scheduledOptimizedAckInterval 2538 */ 2539 public long getOptimizedAckScheduledAckInterval() { 2540 return optimizedAckScheduledAckInterval; 2541 } 2542 2543 /** 2544 * Sets the amount of time between scheduled sends of any outstanding Message Acks for consumers that 2545 * have been configured with optimizeAcknowledge enabled. 
*
     * @param optimizedAckScheduledAckInterval the scheduledOptimizedAckInterval to set
     */
    public void setOptimizedAckScheduledAckInterval(long optimizedAckScheduledAckInterval) {
        this.optimizedAckScheduledAckInterval = optimizedAckScheduledAckInterval;
    }

    /**
     * @return true if MessageConsumer instance will check for expired messages before dispatch.
     */
    public boolean isConsumerExpiryCheckEnabled() {
        return this.consumerExpiryCheckEnabled;
    }

    /**
     * Controls whether message expiration checking is done in each MessageConsumer
     * prior to dispatching a message.  Disabling this check can lead to consumption
     * of expired messages.
     *
     * @param consumerExpiryCheckEnabled
     *        controls whether expiration checking is done prior to dispatch.
     */
    public void setConsumerExpiryCheckEnabled(boolean consumerExpiryCheckEnabled) {
        this.consumerExpiryCheckEnabled = consumerExpiryCheckEnabled;
    }

    /**
     * @return the configured list of trusted packages (the stored list itself, not a copy)
     */
    public List<String> getTrustedPackages() {
        return this.trustedPackages;
    }

    /**
     * @param trustedPackages the list of trusted packages to store (kept by reference)
     */
    public void setTrustedPackages(List<String> trustedPackages) {
        this.trustedPackages = trustedPackages;
    }

    /**
     * @return the trustAllPackages flag
     */
    public boolean isTrustAllPackages() {
        return this.trustAllPackages;
    }

    /**
     * @param trustAllPackages the trustAllPackages flag to set
     */
    public void setTrustAllPackages(boolean trustAllPackages) {
        this.trustAllPackages = trustAllPackages;
    }

    /**
     * @return the configured connect response timeout
     */
    public int getConnectResponseTimeout() {
        return this.connectResponseTimeout;
    }

    /**
     * @param connectResponseTimeout the connect response timeout to set
     */
    public void setConnectResponseTimeout(int connectResponseTimeout) {
        this.connectResponseTimeout = connectResponseTimeout;
    }
}