001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker;
018
019import java.io.EOFException;
020import java.io.IOException;
021import java.net.SocketException;
022import java.net.URI;
023import java.util.ArrayList;
024import java.util.Collection;
025import java.util.HashMap;
026import java.util.Iterator;
027import java.util.LinkedList;
028import java.util.List;
029import java.util.Map;
030import java.util.Properties;
031import java.util.concurrent.ConcurrentHashMap;
032import java.util.concurrent.CopyOnWriteArrayList;
033import java.util.concurrent.CountDownLatch;
034import java.util.concurrent.TimeUnit;
035import java.util.concurrent.atomic.AtomicBoolean;
036import java.util.concurrent.atomic.AtomicInteger;
037import java.util.concurrent.atomic.AtomicReference;
038import java.util.concurrent.locks.ReentrantReadWriteLock;
039
040import javax.transaction.xa.XAResource;
041
042import org.apache.activemq.advisory.AdvisorySupport;
043import org.apache.activemq.broker.region.ConnectionStatistics;
044import org.apache.activemq.broker.region.DurableTopicSubscription;
045import org.apache.activemq.broker.region.RegionBroker;
046import org.apache.activemq.broker.region.TopicRegion;
047import org.apache.activemq.command.ActiveMQDestination;
048import org.apache.activemq.command.BrokerInfo;
049import org.apache.activemq.command.BrokerSubscriptionInfo;
050import org.apache.activemq.command.Command;
051import org.apache.activemq.command.CommandTypes;
052import org.apache.activemq.command.ConnectionControl;
053import org.apache.activemq.command.ConnectionError;
054import org.apache.activemq.command.ConnectionId;
055import org.apache.activemq.command.ConnectionInfo;
056import org.apache.activemq.command.ConsumerControl;
057import org.apache.activemq.command.ConsumerId;
058import org.apache.activemq.command.ConsumerInfo;
059import org.apache.activemq.command.ControlCommand;
060import org.apache.activemq.command.DataArrayResponse;
061import org.apache.activemq.command.DestinationInfo;
062import org.apache.activemq.command.ExceptionResponse;
063import org.apache.activemq.command.FlushCommand;
064import org.apache.activemq.command.IntegerResponse;
065import org.apache.activemq.command.KeepAliveInfo;
066import org.apache.activemq.command.Message;
067import org.apache.activemq.command.MessageAck;
068import org.apache.activemq.command.MessageDispatch;
069import org.apache.activemq.command.MessageDispatchNotification;
070import org.apache.activemq.command.MessagePull;
071import org.apache.activemq.command.ProducerAck;
072import org.apache.activemq.command.ProducerId;
073import org.apache.activemq.command.ProducerInfo;
074import org.apache.activemq.command.RemoveInfo;
075import org.apache.activemq.command.RemoveSubscriptionInfo;
076import org.apache.activemq.command.Response;
077import org.apache.activemq.command.SessionId;
078import org.apache.activemq.command.SessionInfo;
079import org.apache.activemq.command.ShutdownInfo;
080import org.apache.activemq.command.TransactionId;
081import org.apache.activemq.command.TransactionInfo;
082import org.apache.activemq.command.WireFormatInfo;
083import org.apache.activemq.network.DemandForwardingBridge;
084import org.apache.activemq.network.MBeanNetworkListener;
085import org.apache.activemq.network.NetworkBridgeConfiguration;
086import org.apache.activemq.network.NetworkBridgeFactory;
087import org.apache.activemq.network.NetworkConnector;
088import org.apache.activemq.security.MessageAuthorizationPolicy;
089import org.apache.activemq.state.CommandVisitor;
090import org.apache.activemq.state.ConnectionState;
091import org.apache.activemq.state.ConsumerState;
092import org.apache.activemq.state.ProducerState;
093import org.apache.activemq.state.SessionState;
094import org.apache.activemq.state.TransactionState;
095import org.apache.activemq.thread.Task;
096import org.apache.activemq.thread.TaskRunner;
097import org.apache.activemq.thread.TaskRunnerFactory;
098import org.apache.activemq.transaction.Transaction;
099import org.apache.activemq.transport.DefaultTransportListener;
100import org.apache.activemq.transport.ResponseCorrelator;
101import org.apache.activemq.transport.TransmitCallback;
102import org.apache.activemq.transport.Transport;
103import org.apache.activemq.transport.TransportDisposedIOException;
104import org.apache.activemq.util.IntrospectionSupport;
105import org.apache.activemq.util.MarshallingSupport;
106import org.apache.activemq.util.StringToListOfActiveMQDestinationConverter;
107import org.apache.activemq.util.SubscriptionKey;
108import org.slf4j.Logger;
109import org.slf4j.LoggerFactory;
110import org.slf4j.MDC;
111
112public class TransportConnection implements Connection, Task, CommandVisitor {
113    private static final Logger LOG = LoggerFactory.getLogger(TransportConnection.class);
114    private static final Logger TRANSPORTLOG = LoggerFactory.getLogger(TransportConnection.class.getName() + ".Transport");
115    private static final Logger SERVICELOG = LoggerFactory.getLogger(TransportConnection.class.getName() + ".Service");
116    // Keeps track of the broker and connector that created this connection.
117    protected final Broker broker;
118    protected final BrokerService brokerService;
119    protected final TransportConnector connector;
120    // Keeps track of the state of the connections.
121    // protected final ConcurrentHashMap localConnectionStates=new
122    // ConcurrentHashMap();
123    protected final Map<ConnectionId, ConnectionState> brokerConnectionStates;
124    // The broker and wireformat info that was exchanged.
125    protected BrokerInfo brokerInfo;
126    protected final List<Command> dispatchQueue = new LinkedList<>();
127    protected TaskRunner taskRunner;
128    protected final AtomicReference<Throwable> transportException = new AtomicReference<>();
129    protected AtomicBoolean dispatchStopped = new AtomicBoolean(false);
130    private final Transport transport;
131    private MessageAuthorizationPolicy messageAuthorizationPolicy;
132    private WireFormatInfo wireFormatInfo;
133    // Used to do async dispatch.. this should perhaps be pushed down into the
134    // transport layer..
135    private boolean inServiceException;
136    private final ConnectionStatistics statistics = new ConnectionStatistics();
137    private boolean manageable;
138    private boolean slow;
139    private boolean markedCandidate;
140    private boolean blockedCandidate;
141    private boolean blocked;
142    private boolean connected;
143    private boolean active;
144    private boolean starting;
145    private boolean pendingStop;
146    private long timeStamp;
147    private final AtomicBoolean stopping = new AtomicBoolean(false);
148    private final CountDownLatch stopped = new CountDownLatch(1);
149    private final AtomicBoolean asyncException = new AtomicBoolean(false);
150    private final Map<ProducerId, ProducerBrokerExchange> producerExchanges = new HashMap<>();
151    private final Map<ConsumerId, ConsumerBrokerExchange> consumerExchanges = new HashMap<>();
152    private final CountDownLatch dispatchStoppedLatch = new CountDownLatch(1);
153    private ConnectionContext context;
154    private boolean networkConnection;
155    private boolean faultTolerantConnection;
156    private final AtomicInteger protocolVersion = new AtomicInteger(CommandTypes.PROTOCOL_VERSION);
157    private DemandForwardingBridge duplexBridge;
158    private final TaskRunnerFactory taskRunnerFactory;
159    private final TaskRunnerFactory stopTaskRunnerFactory;
160    private TransportConnectionStateRegister connectionStateRegister = new SingleTransportConnectionStateRegister();
161    private final ReentrantReadWriteLock serviceLock = new ReentrantReadWriteLock();
162    private String duplexNetworkConnectorId;
163
164    /**
165     * @param taskRunnerFactory - can be null if you want direct dispatch to the transport
166     *                          else commands are sent async.
167     * @param stopTaskRunnerFactory - can <b>not</b> be null, used for stopping this connection.
168     */
169    public TransportConnection(TransportConnector connector, final Transport transport, Broker broker,
170                               TaskRunnerFactory taskRunnerFactory, TaskRunnerFactory stopTaskRunnerFactory) {
171        this.connector = connector;
172        this.broker = broker;
173        this.brokerService = broker.getBrokerService();
174
175        RegionBroker rb = (RegionBroker) broker.getAdaptor(RegionBroker.class);
176        brokerConnectionStates = rb.getConnectionStates();
177        if (connector != null) {
178            this.statistics.setParent(connector.getStatistics());
179            this.messageAuthorizationPolicy = connector.getMessageAuthorizationPolicy();
180        }
181        this.taskRunnerFactory = taskRunnerFactory;
182        this.stopTaskRunnerFactory = stopTaskRunnerFactory;
183        this.transport = transport;
184        if( this.transport instanceof BrokerServiceAware ) {
185            ((BrokerServiceAware)this.transport).setBrokerService(brokerService);
186        }
187        this.transport.setTransportListener(new DefaultTransportListener() {
188            @Override
189            public void onCommand(Object o) {
190                serviceLock.readLock().lock();
191                try {
192                    if (!(o instanceof Command)) {
193                        throw new RuntimeException("Protocol violation - Command corrupted: " + o.toString());
194                    }
195                    Command command = (Command) o;
196                    if (!brokerService.isStopping()) {
197                        Response response = service(command);
198                        if (response != null && !brokerService.isStopping()) {
199                            dispatchSync(response);
200                        }
201                    } else {
202                        throw new BrokerStoppedException("Broker " + brokerService + " is being stopped");
203                    }
204                } finally {
205                    serviceLock.readLock().unlock();
206                }
207            }
208
209            @Override
210            public void onException(IOException exception) {
211                serviceLock.readLock().lock();
212                try {
213                    serviceTransportException(exception);
214                } finally {
215                    serviceLock.readLock().unlock();
216                }
217            }
218        });
219        connected = true;
220    }
221
222    /**
223     * Returns the number of messages to be dispatched to this connection
224     *
225     * @return size of dispatch queue
226     */
227    @Override
228    public int getDispatchQueueSize() {
229        synchronized (dispatchQueue) {
230            return dispatchQueue.size();
231        }
232    }
233
234    public void serviceTransportException(IOException e) {
235        if (!stopping.get() && !pendingStop) {
236            transportException.set(e);
237            if (TRANSPORTLOG.isDebugEnabled()) {
238                TRANSPORTLOG.debug(this + " failed: " + e, e);
239            } else if (TRANSPORTLOG.isWarnEnabled() && !expected(e)) {
240                TRANSPORTLOG.warn(this + " failed: " + e);
241            }
242            stopAsync(e);
243        }
244    }
245
246    private boolean expected(IOException e) {
247        return isStomp() && ((e instanceof SocketException && e.getMessage().indexOf("reset") != -1) || e instanceof EOFException);
248    }
249
250    private boolean isStomp() {
251        URI uri = connector.getUri();
252        return uri != null && uri.getScheme() != null && uri.getScheme().indexOf("stomp") != -1;
253    }
254
255    /**
256     * Calls the serviceException method in an async thread. Since handling a
257     * service exception closes a socket, we should not tie up broker threads
258     * since client sockets may hang or cause deadlocks.
259     */
260    @Override
261    public void serviceExceptionAsync(final IOException e) {
262        if (asyncException.compareAndSet(false, true)) {
263            new Thread("Async Exception Handler") {
264                @Override
265                public void run() {
266                    serviceException(e);
267                }
268            }.start();
269        }
270    }
271
272    /**
273     * Closes a clients connection due to a detected error. Errors are ignored
274     * if: the client is closing or broker is closing. Otherwise, the connection
275     * error transmitted to the client before stopping it's transport.
276     */
277    @Override
278    public void serviceException(Throwable e) {
279        // are we a transport exception such as not being able to dispatch
280        // synchronously to a transport
281        if (e instanceof IOException) {
282            serviceTransportException((IOException) e);
283        } else if (e.getClass() == BrokerStoppedException.class) {
284            // Handle the case where the broker is stopped
285            // But the client is still connected.
286            if (!stopping.get()) {
287                SERVICELOG.debug("Broker has been stopped.  Notifying client and closing his connection.");
288                ConnectionError ce = new ConnectionError();
289                ce.setException(e);
290                dispatchSync(ce);
291                // Record the error that caused the transport to stop
292                transportException.set(e);
293                // Wait a little bit to try to get the output buffer to flush
294                // the exception notification to the client.
295                try {
296                    Thread.sleep(500);
297                } catch (InterruptedException ie) {
298                    Thread.currentThread().interrupt();
299                }
300                // Worst case is we just kill the connection before the
301                // notification gets to him.
302                stopAsync();
303            }
304        } else if (!stopping.get() && !inServiceException) {
305            inServiceException = true;
306            try {
307                if (SERVICELOG.isDebugEnabled()) {
308                    SERVICELOG.debug("Async error occurred: " + e, e);
309                } else {
310                    SERVICELOG.warn("Async error occurred: " + e);
311                }
312                ConnectionError ce = new ConnectionError();
313                ce.setException(e);
314                if (pendingStop) {
315                    dispatchSync(ce);
316                } else {
317                    dispatchAsync(ce);
318                }
319            } finally {
320                inServiceException = false;
321            }
322        }
323    }
324
325    @Override
326    public Response service(Command command) {
327        MDC.put("activemq.connector", connector.getUri().toString());
328        Response response = null;
329        boolean responseRequired = command.isResponseRequired();
330        int commandId = command.getCommandId();
331        try {
332            if (!pendingStop) {
333                response = command.visit(this);
334            } else {
335                response = new ExceptionResponse(transportException.get());
336            }
337        } catch (Throwable e) {
338            if (SERVICELOG.isDebugEnabled() && e.getClass() != BrokerStoppedException.class) {
339                SERVICELOG.debug("Error occured while processing " + (responseRequired ? "sync" : "async")
340                        + " command: " + command + ", exception: " + e, e);
341            }
342
343            if (e instanceof SuppressReplyException || (e.getCause() instanceof SuppressReplyException)) {
344                LOG.info("Suppressing reply to: " + command + " on: " + e + ", cause: " + e.getCause());
345                responseRequired = false;
346            }
347
348            if (responseRequired) {
349                if (e instanceof SecurityException || e.getCause() instanceof SecurityException) {
350                    SERVICELOG.warn("Security Error occurred on connection to: {}, {}",
351                            transport.getRemoteAddress(), e.getMessage());
352                }
353                response = new ExceptionResponse(e);
354            } else {
355                forceRollbackOnlyOnFailedAsyncTransactionOp(e, command);
356                serviceException(e);
357            }
358        }
359        if (responseRequired) {
360            if (response == null) {
361                response = new Response();
362            }
363            response.setCorrelationId(commandId);
364        }
365        // The context may have been flagged so that the response is not
366        // sent.
367        if (context != null) {
368            if (context.isDontSendReponse()) {
369                context.setDontSendReponse(false);
370                response = null;
371            }
372            context = null;
373        }
374        MDC.remove("activemq.connector");
375        return response;
376    }
377
378    private void forceRollbackOnlyOnFailedAsyncTransactionOp(Throwable e, Command command) {
379        if (brokerService.isRollbackOnlyOnAsyncException() && !(e instanceof IOException) && isInTransaction(command)) {
380            Transaction transaction = getActiveTransaction(command);
381            if (transaction != null && !transaction.isRollbackOnly()) {
382                LOG.debug("on async exception, force rollback of transaction for: " + command, e);
383                transaction.setRollbackOnly(e);
384            }
385        }
386    }
387
388    private Transaction getActiveTransaction(Command command) {
389        Transaction transaction = null;
390        try {
391            if (command instanceof Message) {
392                Message messageSend = (Message) command;
393                ProducerId producerId = messageSend.getProducerId();
394                ProducerBrokerExchange producerExchange = getProducerBrokerExchange(producerId);
395                transaction = producerExchange.getConnectionContext().getTransactions().get(messageSend.getTransactionId());
396            } else if (command instanceof  MessageAck) {
397                MessageAck messageAck = (MessageAck) command;
398                ConsumerBrokerExchange consumerExchange = getConsumerBrokerExchange(messageAck.getConsumerId());
399                if (consumerExchange != null) {
400                    transaction = consumerExchange.getConnectionContext().getTransactions().get(messageAck.getTransactionId());
401                }
402            }
403        } catch(Exception ignored){
404            LOG.trace("failed to find active transaction for command: " + command, ignored);
405        }
406        return transaction;
407    }
408
409    private boolean isInTransaction(Command command) {
410        return command instanceof Message && ((Message)command).isInTransaction()
411                || command instanceof MessageAck && ((MessageAck)command).isInTransaction();
412    }
413
414    @Override
415    public Response processKeepAlive(KeepAliveInfo info) throws Exception {
416        return null;
417    }
418
419    @Override
420    public Response processRemoveSubscription(RemoveSubscriptionInfo info) throws Exception {
421        broker.removeSubscription(lookupConnectionState(info.getConnectionId()).getContext(), info);
422        return null;
423    }
424
425    @Override
426    public Response processWireFormat(WireFormatInfo info) throws Exception {
427        wireFormatInfo = info;
428        protocolVersion.set(info.getVersion());
429        return null;
430    }
431
432    @Override
433    public Response processShutdown(ShutdownInfo info) throws Exception {
434        stopAsync();
435        return null;
436    }
437
438    @Override
439    public Response processFlush(FlushCommand command) throws Exception {
440        return null;
441    }
442
443    @Override
444    public Response processBeginTransaction(TransactionInfo info) throws Exception {
445        TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
446        context = null;
447        if (cs != null) {
448            context = cs.getContext();
449        }
450        if (cs == null) {
451            throw new NullPointerException("Context is null");
452        }
453        // Avoid replaying dup commands
454        if (cs.getTransactionState(info.getTransactionId()) == null) {
455            cs.addTransactionState(info.getTransactionId());
456            broker.beginTransaction(context, info.getTransactionId());
457        }
458        return null;
459    }
460
461    @Override
462    public int getActiveTransactionCount() {
463        int rc = 0;
464        for (TransportConnectionState cs : connectionStateRegister.listConnectionStates()) {
465            Collection<TransactionState> transactions = cs.getTransactionStates();
466            for (TransactionState transaction : transactions) {
467                rc++;
468            }
469        }
470        return rc;
471    }
472
473    @Override
474    public Long getOldestActiveTransactionDuration() {
475        TransactionState oldestTX = null;
476        for (TransportConnectionState cs : connectionStateRegister.listConnectionStates()) {
477            Collection<TransactionState> transactions = cs.getTransactionStates();
478            for (TransactionState transaction : transactions) {
479                if( oldestTX ==null || oldestTX.getCreatedAt() < transaction.getCreatedAt() ) {
480                    oldestTX = transaction;
481                }
482            }
483        }
484        if( oldestTX == null ) {
485            return null;
486        }
487        return System.currentTimeMillis() - oldestTX.getCreatedAt();
488    }
489
490    @Override
491    public Response processEndTransaction(TransactionInfo info) throws Exception {
492        // No need to do anything. This packet is just sent by the client
493        // make sure he is synced with the server as commit command could
494        // come from a different connection.
495        return null;
496    }
497
498    @Override
499    public Response processPrepareTransaction(TransactionInfo info) throws Exception {
500        TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
501        context = null;
502        if (cs != null) {
503            context = cs.getContext();
504        }
505        if (cs == null) {
506            throw new NullPointerException("Context is null");
507        }
508        TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
509        if (transactionState == null) {
510            throw new IllegalStateException("Cannot prepare a transaction that had not been started or previously returned XA_RDONLY: "
511                    + info.getTransactionId());
512        }
513        // Avoid dups.
514        if (!transactionState.isPrepared()) {
515            transactionState.setPrepared(true);
516            int result = broker.prepareTransaction(context, info.getTransactionId());
517            transactionState.setPreparedResult(result);
518            if (result == XAResource.XA_RDONLY) {
519                // we are done, no further rollback or commit from TM
520                cs.removeTransactionState(info.getTransactionId());
521            }
522            IntegerResponse response = new IntegerResponse(result);
523            return response;
524        } else {
525            IntegerResponse response = new IntegerResponse(transactionState.getPreparedResult());
526            return response;
527        }
528    }
529
530    @Override
531    public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception {
532        TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
533        context = cs.getContext();
534        cs.removeTransactionState(info.getTransactionId());
535        broker.commitTransaction(context, info.getTransactionId(), true);
536        return null;
537    }
538
539    @Override
540    public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception {
541        TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
542        context = cs.getContext();
543        cs.removeTransactionState(info.getTransactionId());
544        broker.commitTransaction(context, info.getTransactionId(), false);
545        return null;
546    }
547
548    @Override
549    public Response processRollbackTransaction(TransactionInfo info) throws Exception {
550        TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
551        context = cs.getContext();
552        cs.removeTransactionState(info.getTransactionId());
553        broker.rollbackTransaction(context, info.getTransactionId());
554        return null;
555    }
556
557    @Override
558    public Response processForgetTransaction(TransactionInfo info) throws Exception {
559        TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
560        context = cs.getContext();
561        broker.forgetTransaction(context, info.getTransactionId());
562        return null;
563    }
564
565    @Override
566    public Response processRecoverTransactions(TransactionInfo info) throws Exception {
567        TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
568        context = cs.getContext();
569        TransactionId[] preparedTransactions = broker.getPreparedTransactions(context);
570        return new DataArrayResponse(preparedTransactions);
571    }
572
573    @Override
574    public Response processMessage(Message messageSend) throws Exception {
575        ProducerId producerId = messageSend.getProducerId();
576        ProducerBrokerExchange producerExchange = getProducerBrokerExchange(producerId);
577        if (producerExchange.canDispatch(messageSend)) {
578            broker.send(producerExchange, messageSend);
579        }
580        return null;
581    }
582
583    @Override
584    public Response processMessageAck(MessageAck ack) throws Exception {
585        ConsumerBrokerExchange consumerExchange = getConsumerBrokerExchange(ack.getConsumerId());
586        if (consumerExchange != null) {
587            broker.acknowledge(consumerExchange, ack);
588        } else if (ack.isInTransaction()) {
589            LOG.warn("no matching consumer, ignoring ack {}", consumerExchange, ack);
590        }
591        return null;
592    }
593
594    @Override
595    public Response processMessagePull(MessagePull pull) throws Exception {
596        return broker.messagePull(lookupConnectionState(pull.getConsumerId()).getContext(), pull);
597    }
598
599    @Override
600    public Response processMessageDispatchNotification(MessageDispatchNotification notification) throws Exception {
601        broker.processDispatchNotification(notification);
602        return null;
603    }
604
605    @Override
606    public Response processAddDestination(DestinationInfo info) throws Exception {
607        TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
608        broker.addDestinationInfo(cs.getContext(), info);
609        if (info.getDestination().isTemporary()) {
610            cs.addTempDestination(info);
611        }
612        return null;
613    }
614
615    @Override
616    public Response processRemoveDestination(DestinationInfo info) throws Exception {
617        TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
618        broker.removeDestinationInfo(cs.getContext(), info);
619        if (info.getDestination().isTemporary()) {
620            cs.removeTempDestination(info.getDestination());
621        }
622        return null;
623    }
624
625    @Override
626    public Response processAddProducer(ProducerInfo info) throws Exception {
627        SessionId sessionId = info.getProducerId().getParentId();
628        ConnectionId connectionId = sessionId.getParentId();
629        TransportConnectionState cs = lookupConnectionState(connectionId);
630        if (cs == null) {
631            throw new IllegalStateException("Cannot add a producer to a connection that had not been registered: "
632                    + connectionId);
633        }
634        SessionState ss = cs.getSessionState(sessionId);
635        if (ss == null) {
636            throw new IllegalStateException("Cannot add a producer to a session that had not been registered: "
637                    + sessionId);
638        }
639        // Avoid replaying dup commands
640        if (!ss.getProducerIds().contains(info.getProducerId())) {
641            ActiveMQDestination destination = info.getDestination();
642            // Do not check for null here as it would cause the count of max producers to exclude
643            // anonymous producers.  The isAdvisoryTopic method checks for null so it is safe to
644            // call it from here with a null Destination value.
645            if (!AdvisorySupport.isAdvisoryTopic(destination)) {
646                if (getProducerCount(connectionId) >= connector.getMaximumProducersAllowedPerConnection()){
647                    throw new IllegalStateException("Can't add producer on connection " + connectionId + ": at maximum limit: " + connector.getMaximumProducersAllowedPerConnection());
648                }
649            }
650            broker.addProducer(cs.getContext(), info);
651            try {
652                ss.addProducer(info);
653            } catch (IllegalStateException e) {
654                broker.removeProducer(cs.getContext(), info);
655            }
656
657        }
658        return null;
659    }
660
661    @Override
662    public Response processRemoveProducer(ProducerId id) throws Exception {
663        SessionId sessionId = id.getParentId();
664        ConnectionId connectionId = sessionId.getParentId();
665        TransportConnectionState cs = lookupConnectionState(connectionId);
666        SessionState ss = cs.getSessionState(sessionId);
667        if (ss == null) {
668            throw new IllegalStateException("Cannot remove a producer from a session that had not been registered: "
669                    + sessionId);
670        }
671        ProducerState ps = ss.removeProducer(id);
672        if (ps == null) {
673            throw new IllegalStateException("Cannot remove a producer that had not been registered: " + id);
674        }
675        removeProducerBrokerExchange(id);
676        broker.removeProducer(cs.getContext(), ps.getInfo());
677        return null;
678    }
679
680    @Override
681    public Response processAddConsumer(ConsumerInfo info) throws Exception {
682        SessionId sessionId = info.getConsumerId().getParentId();
683        ConnectionId connectionId = sessionId.getParentId();
684        TransportConnectionState cs = lookupConnectionState(connectionId);
685        if (cs == null) {
686            throw new IllegalStateException("Cannot add a consumer to a connection that had not been registered: "
687                    + connectionId);
688        }
689        SessionState ss = cs.getSessionState(sessionId);
690        if (ss == null) {
691            throw new IllegalStateException(broker.getBrokerName()
692                    + " Cannot add a consumer to a session that had not been registered: " + sessionId);
693        }
694        // Avoid replaying dup commands
695        if (!ss.getConsumerIds().contains(info.getConsumerId())) {
696            ActiveMQDestination destination = info.getDestination();
697            if (destination != null && !AdvisorySupport.isAdvisoryTopic(destination)) {
698                if (getConsumerCount(connectionId) >= connector.getMaximumConsumersAllowedPerConnection()){
699                    throw new IllegalStateException("Can't add consumer on connection " + connectionId + ": at maximum limit: " + connector.getMaximumConsumersAllowedPerConnection());
700                }
701            }
702
703            broker.addConsumer(cs.getContext(), info);
704            try {
705                ss.addConsumer(info);
706                addConsumerBrokerExchange(cs, info.getConsumerId());
707            } catch (IllegalStateException e) {
708                broker.removeConsumer(cs.getContext(), info);
709            }
710
711        }
712        return null;
713    }
714
715    @Override
716    public Response processRemoveConsumer(ConsumerId id, long lastDeliveredSequenceId) throws Exception {
717        SessionId sessionId = id.getParentId();
718        ConnectionId connectionId = sessionId.getParentId();
719        TransportConnectionState cs = lookupConnectionState(connectionId);
720        if (cs == null) {
721            throw new IllegalStateException("Cannot remove a consumer from a connection that had not been registered: "
722                    + connectionId);
723        }
724        SessionState ss = cs.getSessionState(sessionId);
725        if (ss == null) {
726            throw new IllegalStateException("Cannot remove a consumer from a session that had not been registered: "
727                    + sessionId);
728        }
729        ConsumerState consumerState = ss.removeConsumer(id);
730        if (consumerState == null) {
731            throw new IllegalStateException("Cannot remove a consumer that had not been registered: " + id);
732        }
733        ConsumerInfo info = consumerState.getInfo();
734        info.setLastDeliveredSequenceId(lastDeliveredSequenceId);
735        broker.removeConsumer(cs.getContext(), consumerState.getInfo());
736        removeConsumerBrokerExchange(id);
737        return null;
738    }
739
740    @Override
741    public Response processAddSession(SessionInfo info) throws Exception {
742        ConnectionId connectionId = info.getSessionId().getParentId();
743        TransportConnectionState cs = lookupConnectionState(connectionId);
744        // Avoid replaying dup commands
745        if (cs != null && !cs.getSessionIds().contains(info.getSessionId())) {
746            broker.addSession(cs.getContext(), info);
747            try {
748                cs.addSession(info);
749            } catch (IllegalStateException e) {
750                LOG.warn("Failed to add session: {}", info.getSessionId(), e);
751                broker.removeSession(cs.getContext(), info);
752            }
753        }
754        return null;
755    }
756
757    @Override
758    public Response processRemoveSession(SessionId id, long lastDeliveredSequenceId) throws Exception {
759        ConnectionId connectionId = id.getParentId();
760        TransportConnectionState cs = lookupConnectionState(connectionId);
761        if (cs == null) {
762            throw new IllegalStateException("Cannot remove session from connection that had not been registered: " + connectionId);
763        }
764        SessionState session = cs.getSessionState(id);
765        if (session == null) {
766            throw new IllegalStateException("Cannot remove session that had not been registered: " + id);
767        }
768        // Don't let new consumers or producers get added while we are closing
769        // this down.
770        session.shutdown();
771        // Cascade the connection stop to the consumers and producers.
772        for (ConsumerId consumerId : session.getConsumerIds()) {
773            try {
774                processRemoveConsumer(consumerId, lastDeliveredSequenceId);
775            } catch (Throwable e) {
776                LOG.warn("Failed to remove consumer: {}", consumerId, e);
777            }
778        }
779        for (ProducerId producerId : session.getProducerIds()) {
780            try {
781                processRemoveProducer(producerId);
782            } catch (Throwable e) {
783                LOG.warn("Failed to remove producer: {}", producerId, e);
784            }
785        }
786        cs.removeSession(id);
787        broker.removeSession(cs.getContext(), session.getInfo());
788        return null;
789    }
790
791    @Override
792    public Response processAddConnection(ConnectionInfo info) throws Exception {
793        // Older clients should have been defaulting this field to true.. but
794        // they were not.
795        if (wireFormatInfo != null && wireFormatInfo.getVersion() <= 2) {
796            info.setClientMaster(true);
797        }
798        TransportConnectionState state;
799        // Make sure 2 concurrent connections by the same ID only generate 1
800        // TransportConnectionState object.
801        synchronized (brokerConnectionStates) {
802            state = (TransportConnectionState) brokerConnectionStates.get(info.getConnectionId());
803            if (state == null) {
804                state = new TransportConnectionState(info, this);
805                brokerConnectionStates.put(info.getConnectionId(), state);
806            }
807            state.incrementReference();
808        }
809        // If there are 2 concurrent connections for the same connection id,
810        // then last one in wins, we need to sync here
811        // to figure out the winner.
812        synchronized (state.getConnectionMutex()) {
813            if (state.getConnection() != this) {
814                LOG.debug("Killing previous stale connection: {}", state.getConnection().getRemoteAddress());
815                state.getConnection().stop();
816                LOG.debug("Connection {} taking over previous connection: {}", getRemoteAddress(), state.getConnection().getRemoteAddress());
817                state.setConnection(this);
818                state.reset(info);
819            }
820        }
821        registerConnectionState(info.getConnectionId(), state);
822        LOG.debug("Setting up new connection id: {}, address: {}, info: {}", new Object[]{ info.getConnectionId(), getRemoteAddress(), info });
823        this.faultTolerantConnection = info.isFaultTolerant();
824        // Setup the context.
825        String clientId = info.getClientId();
826        context = new ConnectionContext();
827        context.setBroker(broker);
828        context.setClientId(clientId);
829        context.setClientMaster(info.isClientMaster());
830        context.setConnection(this);
831        context.setConnectionId(info.getConnectionId());
832        context.setConnector(connector);
833        context.setMessageAuthorizationPolicy(getMessageAuthorizationPolicy());
834        context.setNetworkConnection(networkConnection);
835        context.setFaultTolerant(faultTolerantConnection);
836        context.setTransactions(new ConcurrentHashMap<TransactionId, Transaction>());
837        context.setUserName(info.getUserName());
838        context.setWireFormatInfo(wireFormatInfo);
839        context.setReconnect(info.isFailoverReconnect());
840        this.manageable = info.isManageable();
841        context.setConnectionState(state);
842        state.setContext(context);
843        state.setConnection(this);
844        if (info.getClientIp() == null) {
845            info.setClientIp(getRemoteAddress());
846        }
847
848        try {
849            broker.addConnection(context, info);
850        } catch (Exception e) {
851            synchronized (brokerConnectionStates) {
852                brokerConnectionStates.remove(info.getConnectionId());
853            }
854            unregisterConnectionState(info.getConnectionId());
855            LOG.warn("Failed to add Connection {} due to {}", info.getConnectionId(), e);
856            if (e instanceof SecurityException) {
857                // close this down - in case the peer of this transport doesn't play nice
858                delayedStop(2000, "Failed with SecurityException: " + e.getLocalizedMessage(), e);
859            }
860            throw e;
861        }
862        if (info.isManageable()) {
863            // send ConnectionCommand
864            ConnectionControl command = this.connector.getConnectionControl();
865            command.setFaultTolerant(broker.isFaultTolerantConfiguration());
866            if (info.isFailoverReconnect()) {
867                command.setRebalanceConnection(false);
868            }
869            dispatchAsync(command);
870        }
871        return null;
872    }
873
874    @Override
875    public synchronized Response processRemoveConnection(ConnectionId id, long lastDeliveredSequenceId)
876            throws InterruptedException {
877        LOG.debug("remove connection id: {}", id);
878        TransportConnectionState cs = lookupConnectionState(id);
879        if (cs != null) {
880            // Don't allow things to be added to the connection state while we
881            // are shutting down.
882            cs.shutdown();
883            // Cascade the connection stop to the sessions.
884            for (SessionId sessionId : cs.getSessionIds()) {
885                try {
886                    processRemoveSession(sessionId, lastDeliveredSequenceId);
887                } catch (Throwable e) {
888                    SERVICELOG.warn("Failed to remove session {}", sessionId, e);
889                }
890            }
891            // Cascade the connection stop to temp destinations.
892            for (Iterator<DestinationInfo> iter = cs.getTempDestinations().iterator(); iter.hasNext(); ) {
893                DestinationInfo di = iter.next();
894                try {
895                    broker.removeDestination(cs.getContext(), di.getDestination(), 0);
896                } catch (Throwable e) {
897                    SERVICELOG.warn("Failed to remove tmp destination {}", di.getDestination(), e);
898                }
899                iter.remove();
900            }
901            try {
902                broker.removeConnection(cs.getContext(), cs.getInfo(), transportException.get());
903            } catch (Throwable e) {
904                SERVICELOG.warn("Failed to remove connection {}", cs.getInfo(), e);
905            }
906            TransportConnectionState state = unregisterConnectionState(id);
907            if (state != null) {
908                synchronized (brokerConnectionStates) {
909                    // If we are the last reference, we should remove the state
910                    // from the broker.
911                    if (state.decrementReference() == 0) {
912                        brokerConnectionStates.remove(id);
913                    }
914                }
915            }
916        }
917        return null;
918    }
919
920    @Override
921    public Response processProducerAck(ProducerAck ack) throws Exception {
922        // A broker should not get ProducerAck messages.
923        return null;
924    }
925
926    @Override
927    public Connector getConnector() {
928        return connector;
929    }
930
931    @Override
932    public void dispatchSync(Command message) {
933        try {
934            processDispatch(message);
935        } catch (IOException e) {
936            serviceExceptionAsync(e);
937        }
938    }
939
940    @Override
941    public void dispatchAsync(Command message) {
942        if (!stopping.get()) {
943            if (taskRunner == null) {
944                dispatchSync(message);
945            } else {
946                synchronized (dispatchQueue) {
947                    dispatchQueue.add(message);
948                }
949                try {
950                    taskRunner.wakeup();
951                } catch (InterruptedException e) {
952                    Thread.currentThread().interrupt();
953                }
954            }
955        } else {
956            if (message.isMessageDispatch()) {
957                MessageDispatch md = (MessageDispatch) message;
958                TransmitCallback sub = md.getTransmitCallback();
959                broker.postProcessDispatch(md);
960                if (sub != null) {
961                    sub.onFailure();
962                }
963            }
964        }
965    }
966
967    protected void processDispatch(Command command) throws IOException {
968        MessageDispatch messageDispatch = (MessageDispatch) (command.isMessageDispatch() ? command : null);
969        try {
970            if (!stopping.get()) {
971                if (messageDispatch != null) {
972                    try {
973                        broker.preProcessDispatch(messageDispatch);
974                    } catch (RuntimeException convertToIO) {
975                        throw new IOException(convertToIO);
976                    }
977                }
978                dispatch(command);
979            }
980        } catch (IOException e) {
981            if (messageDispatch != null) {
982                TransmitCallback sub = messageDispatch.getTransmitCallback();
983                broker.postProcessDispatch(messageDispatch);
984                if (sub != null) {
985                    sub.onFailure();
986                }
987                messageDispatch = null;
988                throw e;
989            }
990        } finally {
991            if (messageDispatch != null) {
992                TransmitCallback sub = messageDispatch.getTransmitCallback();
993                broker.postProcessDispatch(messageDispatch);
994                if (sub != null) {
995                    sub.onSuccess();
996                }
997            }
998        }
999    }
1000
1001    @Override
1002    public boolean iterate() {
1003        try {
1004            if (pendingStop || stopping.get()) {
1005                if (dispatchStopped.compareAndSet(false, true)) {
1006                    if (transportException.get() == null) {
1007                        try {
1008                            dispatch(new ShutdownInfo());
1009                        } catch (Throwable ignore) {
1010                        }
1011                    }
1012                    dispatchStoppedLatch.countDown();
1013                }
1014                return false;
1015            }
1016            if (!dispatchStopped.get()) {
1017                Command command = null;
1018                synchronized (dispatchQueue) {
1019                    if (dispatchQueue.isEmpty()) {
1020                        return false;
1021                    }
1022                    command = dispatchQueue.remove(0);
1023                }
1024                processDispatch(command);
1025                return true;
1026            }
1027            return false;
1028        } catch (IOException e) {
1029            if (dispatchStopped.compareAndSet(false, true)) {
1030                dispatchStoppedLatch.countDown();
1031            }
1032            serviceExceptionAsync(e);
1033            return false;
1034        }
1035    }
1036
1037    /**
1038     * Returns the statistics for this connection
1039     */
1040    @Override
1041    public ConnectionStatistics getStatistics() {
1042        return statistics;
1043    }
1044
1045    public MessageAuthorizationPolicy getMessageAuthorizationPolicy() {
1046        return messageAuthorizationPolicy;
1047    }
1048
1049    public void setMessageAuthorizationPolicy(MessageAuthorizationPolicy messageAuthorizationPolicy) {
1050        this.messageAuthorizationPolicy = messageAuthorizationPolicy;
1051    }
1052
1053    @Override
1054    public boolean isManageable() {
1055        return manageable;
1056    }
1057
1058    @Override
1059    public void start() throws Exception {
1060        try {
1061            synchronized (this) {
1062                starting = true;
1063                if (taskRunnerFactory != null) {
1064                    taskRunner = taskRunnerFactory.createTaskRunner(this, "ActiveMQ Connection Dispatcher: "
1065                            + getRemoteAddress());
1066                } else {
1067                    taskRunner = null;
1068                }
1069                transport.start();
1070                active = true;
1071                BrokerInfo info = connector.getBrokerInfo().copy();
1072                if (connector.isUpdateClusterClients()) {
1073                    info.setPeerBrokerInfos(this.broker.getPeerBrokerInfos());
1074                } else {
1075                    info.setPeerBrokerInfos(null);
1076                }
1077                dispatchAsync(info);
1078
1079                connector.onStarted(this);
1080            }
1081        } catch (Exception e) {
1082            // Force clean up on an error starting up.
1083            pendingStop = true;
1084            throw e;
1085        } finally {
1086            // stop() can be called from within the above block,
1087            // but we want to be sure start() completes before
1088            // stop() runs, so queue the stop until right now:
1089            setStarting(false);
1090            if (isPendingStop()) {
1091                LOG.debug("Calling the delayed stop() after start() {}", this);
1092                stop();
1093            }
1094        }
1095    }
1096
1097    @Override
1098    public void stop() throws Exception {
1099        // do not stop task the task runner factories (taskRunnerFactory, stopTaskRunnerFactory)
1100        // as their lifecycle is handled elsewhere
1101
1102        stopAsync();
1103        while (!stopped.await(5, TimeUnit.SECONDS)) {
1104            LOG.info("The connection to '{}' is taking a long time to shutdown.", transport.getRemoteAddress());
1105        }
1106    }
1107
1108    public void delayedStop(final int waitTime, final String reason, Throwable cause) {
1109        if (waitTime > 0) {
1110            synchronized (this) {
1111                pendingStop = true;
1112                transportException.set(cause);
1113            }
1114            try {
1115                stopTaskRunnerFactory.execute(new Runnable() {
1116                    @Override
1117                    public void run() {
1118                        try {
1119                            Thread.sleep(waitTime);
1120                            stopAsync();
1121                            LOG.info("Stopping {} because {}", transport.getRemoteAddress(), reason);
1122                        } catch (InterruptedException e) {
1123                        }
1124                    }
1125                });
1126            } catch (Throwable t) {
1127                LOG.warn("Cannot create stopAsync. This exception will be ignored.", t);
1128            }
1129        }
1130    }
1131
1132    public void stopAsync(Throwable cause) {
1133        transportException.set(cause);
1134        stopAsync();
1135    }
1136
1137    public void stopAsync() {
1138        // If we're in the middle of starting then go no further... for now.
1139        synchronized (this) {
1140            pendingStop = true;
1141            if (starting) {
1142                LOG.debug("stopAsync() called in the middle of start(). Delaying till start completes..");
1143                return;
1144            }
1145        }
1146        if (stopping.compareAndSet(false, true)) {
1147            // Let all the connection contexts know we are shutting down
1148            // so that in progress operations can notice and unblock.
1149            List<TransportConnectionState> connectionStates = listConnectionStates();
1150            for (TransportConnectionState cs : connectionStates) {
1151                ConnectionContext connectionContext = cs.getContext();
1152                if (connectionContext != null) {
1153                    connectionContext.getStopping().set(true);
1154                }
1155            }
1156            try {
1157                stopTaskRunnerFactory.execute(new Runnable() {
1158                    @Override
1159                    public void run() {
1160                        serviceLock.writeLock().lock();
1161                        try {
1162                            doStop();
1163                        } catch (Throwable e) {
1164                            LOG.debug("Error occurred while shutting down a connection {}", this, e);
1165                        } finally {
1166                            stopped.countDown();
1167                            serviceLock.writeLock().unlock();
1168                        }
1169                    }
1170                });
1171            } catch (Throwable t) {
1172                LOG.warn("Cannot create async transport stopper thread. This exception is ignored. Not waiting for stop to complete", t);
1173                stopped.countDown();
1174            }
1175        }
1176    }
1177
1178    @Override
1179    public String toString() {
1180        return "Transport Connection to: " + transport.getRemoteAddress();
1181    }
1182
1183    protected void doStop() throws Exception {
1184        LOG.debug("Stopping connection: {}", transport.getRemoteAddress());
1185        connector.onStopped(this);
1186        try {
1187            synchronized (this) {
1188                if (duplexBridge != null) {
1189                    duplexBridge.stop();
1190                }
1191            }
1192        } catch (Exception ignore) {
1193            LOG.trace("Exception caught stopping. This exception is ignored.", ignore);
1194        }
1195        try {
1196            transport.stop();
1197            LOG.debug("Stopped transport: {}", transport.getRemoteAddress());
1198        } catch (Exception e) {
1199            LOG.debug("Could not stop transport to {}. This exception is ignored.", transport.getRemoteAddress(), e);
1200        }
1201        if (taskRunner != null) {
1202            taskRunner.shutdown(1);
1203            taskRunner = null;
1204        }
1205        active = false;
1206        // Run the MessageDispatch callbacks so that message references get
1207        // cleaned up.
1208        synchronized (dispatchQueue) {
1209            for (Iterator<Command> iter = dispatchQueue.iterator(); iter.hasNext(); ) {
1210                Command command = iter.next();
1211                if (command.isMessageDispatch()) {
1212                    MessageDispatch md = (MessageDispatch) command;
1213                    TransmitCallback sub = md.getTransmitCallback();
1214                    broker.postProcessDispatch(md);
1215                    if (sub != null) {
1216                        sub.onFailure();
1217                    }
1218                }
1219            }
1220            dispatchQueue.clear();
1221        }
1222        //
1223        // Remove all logical connection associated with this connection
1224        // from the broker.
1225        if (!broker.isStopped()) {
1226            List<TransportConnectionState> connectionStates = listConnectionStates();
1227            connectionStates = listConnectionStates();
1228            for (TransportConnectionState cs : connectionStates) {
1229                cs.getContext().getStopping().set(true);
1230                try {
1231                    LOG.debug("Cleaning up connection resources: {}", getRemoteAddress());
1232                    processRemoveConnection(cs.getInfo().getConnectionId(), RemoveInfo.LAST_DELIVERED_UNKNOWN);
1233                } catch (Throwable ignore) {
1234                    LOG.debug("Exception caught removing connection {}. This exception is ignored.", cs.getInfo().getConnectionId(), ignore);
1235                }
1236            }
1237        }
1238        LOG.debug("Connection Stopped: {}", getRemoteAddress());
1239    }
1240
1241    /**
1242     * @return Returns the blockedCandidate.
1243     */
1244    public boolean isBlockedCandidate() {
1245        return blockedCandidate;
1246    }
1247
1248    /**
1249     * @param blockedCandidate The blockedCandidate to set.
1250     */
1251    public void setBlockedCandidate(boolean blockedCandidate) {
1252        this.blockedCandidate = blockedCandidate;
1253    }
1254
1255    /**
1256     * @return Returns the markedCandidate.
1257     */
1258    public boolean isMarkedCandidate() {
1259        return markedCandidate;
1260    }
1261
1262    /**
1263     * @param markedCandidate The markedCandidate to set.
1264     */
1265    public void setMarkedCandidate(boolean markedCandidate) {
1266        this.markedCandidate = markedCandidate;
1267        if (!markedCandidate) {
1268            timeStamp = 0;
1269            blockedCandidate = false;
1270        }
1271    }
1272
1273    /**
1274     * @param slow The slow to set.
1275     */
1276    public void setSlow(boolean slow) {
1277        this.slow = slow;
1278    }
1279
1280    /**
1281     * @return true if the Connection is slow
1282     */
1283    @Override
1284    public boolean isSlow() {
1285        return slow;
1286    }
1287
1288    /**
1289     * @return true if the Connection is potentially blocked
1290     */
1291    public boolean isMarkedBlockedCandidate() {
1292        return markedCandidate;
1293    }
1294
1295    /**
1296     * Mark the Connection, so we can deem if it's collectable on the next sweep
1297     */
1298    public void doMark() {
1299        if (timeStamp == 0) {
1300            timeStamp = System.currentTimeMillis();
1301        }
1302    }
1303
1304    /**
1305     * @return if after being marked, the Connection is still writing
1306     */
1307    @Override
1308    public boolean isBlocked() {
1309        return blocked;
1310    }
1311
1312    /**
1313     * @return true if the Connection is connected
1314     */
1315    @Override
1316    public boolean isConnected() {
1317        return connected;
1318    }
1319
1320    /**
1321     * @param blocked The blocked to set.
1322     */
1323    public void setBlocked(boolean blocked) {
1324        this.blocked = blocked;
1325    }
1326
1327    /**
1328     * @param connected The connected to set.
1329     */
1330    public void setConnected(boolean connected) {
1331        this.connected = connected;
1332    }
1333
1334    /**
1335     * @return true if the Connection is active
1336     */
1337    @Override
1338    public boolean isActive() {
1339        return active;
1340    }
1341
1342    /**
1343     * @param active The active to set.
1344     */
1345    public void setActive(boolean active) {
1346        this.active = active;
1347    }
1348
1349    /**
1350     * @return true if the Connection is starting
1351     */
1352    public synchronized boolean isStarting() {
1353        return starting;
1354    }
1355
1356    @Override
1357    public synchronized boolean isNetworkConnection() {
1358        return networkConnection;
1359    }
1360
1361    @Override
1362    public boolean isFaultTolerantConnection() {
1363        return this.faultTolerantConnection;
1364    }
1365
1366    protected synchronized void setStarting(boolean starting) {
1367        this.starting = starting;
1368    }
1369
1370    /**
1371     * @return true if the Connection needs to stop
1372     */
1373    public synchronized boolean isPendingStop() {
1374        return pendingStop;
1375    }
1376
1377    protected synchronized void setPendingStop(boolean pendingStop) {
1378        this.pendingStop = pendingStop;
1379    }
1380
1381    public static BrokerSubscriptionInfo getBrokerSubscriptionInfo(final BrokerService brokerService) {
1382        RegionBroker regionBroker = (RegionBroker) brokerService.getRegionBroker();
1383        TopicRegion topicRegion = (TopicRegion) regionBroker.getTopicRegion();
1384        List<ConsumerInfo> subscriptionInfos = new ArrayList<>();
1385        for (SubscriptionKey key : topicRegion.getDurableSubscriptions().keySet()) {
1386            DurableTopicSubscription sub = topicRegion.getDurableSubscriptions().get(key);
1387            if (sub != null) {
1388                ConsumerInfo ci = sub.getConsumerInfo().copy();
1389                ci.setClientId(key.getClientId());
1390                subscriptionInfos.add(ci);
1391            }
1392        }
1393        BrokerSubscriptionInfo bsi = new BrokerSubscriptionInfo(brokerService.getBrokerName());
1394        bsi.setSubscriptionInfos(subscriptionInfos.toArray(new ConsumerInfo[0]));
1395        return bsi;
1396    }
1397
1398    private NetworkBridgeConfiguration getNetworkConfiguration(final BrokerInfo info) throws IOException {
1399        Properties properties = MarshallingSupport.stringToProperties(info.getNetworkProperties());
1400        Map<String, String> props = createMap(properties);
1401        NetworkBridgeConfiguration config = new NetworkBridgeConfiguration();
1402        IntrospectionSupport.setProperties(config, props, "");
1403        return config;
1404    }
1405
1406    @Override
1407    public Response processBrokerInfo(BrokerInfo info) {
1408        if (info.isSlaveBroker()) {
1409            LOG.error(" Slave Brokers are no longer supported - slave trying to attach is: {}", info.getBrokerName());
1410        } else if (info.isNetworkConnection() && !info.isDuplexConnection()) {
1411            try {
1412                NetworkBridgeConfiguration config = getNetworkConfiguration(info);
1413                if (config.isSyncDurableSubs() && protocolVersion.get() >= CommandTypes.PROTOCOL_VERSION_DURABLE_SYNC) {
1414                    LOG.debug("SyncDurableSubs is enabled, Sending BrokerSubscriptionInfo");
1415                    dispatchSync(getBrokerSubscriptionInfo(this.broker.getBrokerService()));
1416                }
1417            } catch (Exception e) {
1418                LOG.error("Failed to respond to network bridge creation from broker {}", info.getBrokerId(), e);
1419                return null;
1420            }
1421        } else if (info.isNetworkConnection() && info.isDuplexConnection()) {
1422            // so this TransportConnection is the rear end of a network bridge
1423            // We have been requested to create a two way pipe ...
1424            try {
1425                NetworkBridgeConfiguration config = getNetworkConfiguration(info);
1426                config.setBrokerName(broker.getBrokerName());
1427
1428                if (config.isSyncDurableSubs() && protocolVersion.get() >= 12) {
1429                    LOG.debug("SyncDurableSubs is enabled, Sending BrokerSubscriptionInfo");
1430                    dispatchSync(getBrokerSubscriptionInfo(this.broker.getBrokerService()));
1431                }
1432
1433                // check for existing duplex connection hanging about
1434
1435                // We first look if existing network connection already exists for the same broker Id and network connector name
1436                // It's possible in case of brief network fault to have this transport connector side of the connection always active
1437                // and the duplex network connector side wanting to open a new one
1438                // In this case, the old connection must be broken
1439                String duplexNetworkConnectorId = config.getName() + "@" + info.getBrokerId();
1440                CopyOnWriteArrayList<TransportConnection> connections = this.connector.getConnections();
1441                synchronized (connections) {
1442                    for (Iterator<TransportConnection> iter = connections.iterator(); iter.hasNext(); ) {
1443                        TransportConnection c = iter.next();
1444                        if ((c != this) && (duplexNetworkConnectorId.equals(c.getDuplexNetworkConnectorId()))) {
1445                            LOG.warn("Stopping an existing active duplex connection [{}] for network connector ({}).", c, duplexNetworkConnectorId);
1446                            c.stopAsync();
1447                            // better to wait for a bit rather than get connection id already in use and failure to start new bridge
1448                            c.getStopped().await(1, TimeUnit.SECONDS);
1449                        }
1450                    }
1451                    setDuplexNetworkConnectorId(duplexNetworkConnectorId);
1452                }
1453                Transport localTransport = NetworkBridgeFactory.createLocalTransport(broker);
1454                Transport remoteBridgeTransport = transport;
1455                if (! (remoteBridgeTransport instanceof ResponseCorrelator)) {
1456                    // the vm transport case is already wrapped
1457                    remoteBridgeTransport = new ResponseCorrelator(remoteBridgeTransport);
1458                }
1459                String duplexName = localTransport.toString();
1460                if (duplexName.contains("#")) {
1461                    duplexName = duplexName.substring(duplexName.lastIndexOf("#"));
1462                }
1463                MBeanNetworkListener listener = new MBeanNetworkListener(brokerService, config, brokerService.createDuplexNetworkConnectorObjectName(duplexName));
1464                listener.setCreatedByDuplex(true);
1465                duplexBridge = NetworkBridgeFactory.createBridge(config, localTransport, remoteBridgeTransport, listener);
1466                duplexBridge.setBrokerService(brokerService);
1467                //Need to set durableDestinations to properly restart subs when dynamicOnly=false
1468                duplexBridge.setDurableDestinations(NetworkConnector.getDurableTopicDestinations(
1469                        broker.getDurableDestinations()));
1470
1471                // now turn duplex off this side
1472                info.setDuplexConnection(false);
1473                duplexBridge.setCreatedByDuplex(true);
1474                duplexBridge.duplexStart(this, brokerInfo, info);
1475                LOG.info("Started responder end of duplex bridge {}", duplexNetworkConnectorId);
1476                return null;
1477            } catch (TransportDisposedIOException e) {
1478                LOG.warn("Duplex bridge {} was stopped before it was correctly started.", duplexNetworkConnectorId);
1479                return null;
1480            } catch (Exception e) {
1481                LOG.error("Failed to create responder end of duplex network bridge {}", duplexNetworkConnectorId, e);
1482                return null;
1483            }
1484        }
1485        // We only expect to get one broker info command per connection
1486        if (this.brokerInfo != null) {
1487            LOG.warn("Unexpected extra broker info command received: {}", info);
1488        }
1489        this.brokerInfo = info;
1490        networkConnection = true;
1491        List<TransportConnectionState> connectionStates = listConnectionStates();
1492        for (TransportConnectionState cs : connectionStates) {
1493            cs.getContext().setNetworkConnection(true);
1494        }
1495        return null;
1496    }
1497
1498    @SuppressWarnings({"unchecked", "rawtypes"})
1499    private HashMap<String, String> createMap(Properties properties) {
1500        return new HashMap(properties);
1501    }
1502
1503    protected void dispatch(Command command) throws IOException {
1504        try {
1505            setMarkedCandidate(true);
1506            transport.oneway(command);
1507        } finally {
1508            setMarkedCandidate(false);
1509        }
1510    }
1511
1512    @Override
1513    public String getRemoteAddress() {
1514        return transport.getRemoteAddress();
1515    }
1516
1517    public Transport getTransport() {
1518        return transport;
1519    }
1520
1521    @Override
1522    public String getConnectionId() {
1523        List<TransportConnectionState> connectionStates = listConnectionStates();
1524        for (TransportConnectionState cs : connectionStates) {
1525            if (cs.getInfo().getClientId() != null) {
1526                return cs.getInfo().getClientId();
1527            }
1528            return cs.getInfo().getConnectionId().toString();
1529        }
1530        return null;
1531    }
1532
1533    @Override
1534    public void updateClient(ConnectionControl control) {
1535        if (isActive() && isBlocked() == false && isFaultTolerantConnection() && this.wireFormatInfo != null
1536                && this.wireFormatInfo.getVersion() >= 6) {
1537            dispatchAsync(control);
1538        }
1539    }
1540
1541    public ProducerBrokerExchange getProducerBrokerExchangeIfExists(ProducerInfo producerInfo){
1542        ProducerBrokerExchange result = null;
1543        if (producerInfo != null && producerInfo.getProducerId() != null){
1544            synchronized (producerExchanges){
1545                result = producerExchanges.get(producerInfo.getProducerId());
1546            }
1547        }
1548        return result;
1549    }
1550
1551    private ProducerBrokerExchange getProducerBrokerExchange(ProducerId id) throws IOException {
1552        ProducerBrokerExchange result = producerExchanges.get(id);
1553        if (result == null) {
1554            synchronized (producerExchanges) {
1555                result = new ProducerBrokerExchange();
1556                TransportConnectionState state = lookupConnectionState(id);
1557                context = state.getContext();
1558                result.setConnectionContext(context);
1559                if (context.isReconnect() || (context.isNetworkConnection() && connector.isAuditNetworkProducers())) {
1560                    result.setLastStoredSequenceId(brokerService.getPersistenceAdapter().getLastProducerSequenceId(id));
1561                }
1562                SessionState ss = state.getSessionState(id.getParentId());
1563                if (ss != null) {
1564                    result.setProducerState(ss.getProducerState(id));
1565                    ProducerState producerState = ss.getProducerState(id);
1566                    if (producerState != null && producerState.getInfo() != null) {
1567                        ProducerInfo info = producerState.getInfo();
1568                        result.setMutable(info.getDestination() == null || info.getDestination().isComposite());
1569                    }
1570                }
1571                producerExchanges.put(id, result);
1572            }
1573        } else {
1574            context = result.getConnectionContext();
1575        }
1576        return result;
1577    }
1578
1579    private void removeProducerBrokerExchange(ProducerId id) {
1580        synchronized (producerExchanges) {
1581            producerExchanges.remove(id);
1582        }
1583    }
1584
1585    private ConsumerBrokerExchange getConsumerBrokerExchange(ConsumerId id) {
1586        ConsumerBrokerExchange result = consumerExchanges.get(id);
1587        return result;
1588    }
1589
1590    private ConsumerBrokerExchange addConsumerBrokerExchange(TransportConnectionState connectionState, ConsumerId id) {
1591        ConsumerBrokerExchange result = consumerExchanges.get(id);
1592        if (result == null) {
1593            synchronized (consumerExchanges) {
1594                result = new ConsumerBrokerExchange();
1595                context = connectionState.getContext();
1596                result.setConnectionContext(context);
1597                SessionState ss = connectionState.getSessionState(id.getParentId());
1598                if (ss != null) {
1599                    ConsumerState cs = ss.getConsumerState(id);
1600                    if (cs != null) {
1601                        ConsumerInfo info = cs.getInfo();
1602                        if (info != null) {
1603                            if (info.getDestination() != null && info.getDestination().isPattern()) {
1604                                result.setWildcard(true);
1605                            }
1606                        }
1607                    }
1608                }
1609                consumerExchanges.put(id, result);
1610            }
1611        }
1612        return result;
1613    }
1614
1615    private void removeConsumerBrokerExchange(ConsumerId id) {
1616        synchronized (consumerExchanges) {
1617            consumerExchanges.remove(id);
1618        }
1619    }
1620
1621    public int getProtocolVersion() {
1622        return protocolVersion.get();
1623    }
1624
1625    @Override
1626    public Response processControlCommand(ControlCommand command) throws Exception {
1627        return null;
1628    }
1629
1630    @Override
1631    public Response processMessageDispatch(MessageDispatch dispatch) throws Exception {
1632        return null;
1633    }
1634
1635    @Override
1636    public Response processConnectionControl(ConnectionControl control) throws Exception {
1637        if (control != null) {
1638            faultTolerantConnection = control.isFaultTolerant();
1639        }
1640        return null;
1641    }
1642
1643    @Override
1644    public Response processConnectionError(ConnectionError error) throws Exception {
1645        return null;
1646    }
1647
1648    @Override
1649    public Response processConsumerControl(ConsumerControl control) throws Exception {
1650        ConsumerBrokerExchange consumerExchange = getConsumerBrokerExchange(control.getConsumerId());
1651        broker.processConsumerControl(consumerExchange, control);
1652        return null;
1653    }
1654
1655    protected synchronized TransportConnectionState registerConnectionState(ConnectionId connectionId,
1656                                                                            TransportConnectionState state) {
1657        TransportConnectionState cs = null;
1658        if (!connectionStateRegister.isEmpty() && !connectionStateRegister.doesHandleMultipleConnectionStates()) {
1659            // swap implementations
1660            TransportConnectionStateRegister newRegister = new MapTransportConnectionStateRegister();
1661            newRegister.intialize(connectionStateRegister);
1662            connectionStateRegister = newRegister;
1663        }
1664        cs = connectionStateRegister.registerConnectionState(connectionId, state);
1665        return cs;
1666    }
1667
1668    protected synchronized TransportConnectionState unregisterConnectionState(ConnectionId connectionId) {
1669        return connectionStateRegister.unregisterConnectionState(connectionId);
1670    }
1671
1672    protected synchronized List<TransportConnectionState> listConnectionStates() {
1673        return connectionStateRegister.listConnectionStates();
1674    }
1675
1676    protected synchronized TransportConnectionState lookupConnectionState(String connectionId) {
1677        return connectionStateRegister.lookupConnectionState(connectionId);
1678    }
1679
1680    protected synchronized TransportConnectionState lookupConnectionState(ConsumerId id) {
1681        return connectionStateRegister.lookupConnectionState(id);
1682    }
1683
1684    protected synchronized TransportConnectionState lookupConnectionState(ProducerId id) {
1685        return connectionStateRegister.lookupConnectionState(id);
1686    }
1687
1688    protected synchronized TransportConnectionState lookupConnectionState(SessionId id) {
1689        return connectionStateRegister.lookupConnectionState(id);
1690    }
1691
1692    // public only for testing
1693    public synchronized TransportConnectionState lookupConnectionState(ConnectionId connectionId) {
1694        return connectionStateRegister.lookupConnectionState(connectionId);
1695    }
1696
1697    protected synchronized void setDuplexNetworkConnectorId(String duplexNetworkConnectorId) {
1698        this.duplexNetworkConnectorId = duplexNetworkConnectorId;
1699    }
1700
1701    protected synchronized String getDuplexNetworkConnectorId() {
1702        return this.duplexNetworkConnectorId;
1703    }
1704
1705    public boolean isStopping() {
1706        return stopping.get();
1707    }
1708
1709    protected CountDownLatch getStopped() {
1710        return stopped;
1711    }
1712
1713    private int getProducerCount(ConnectionId connectionId) {
1714        int result = 0;
1715        TransportConnectionState cs = lookupConnectionState(connectionId);
1716        if (cs != null) {
1717            for (SessionId sessionId : cs.getSessionIds()) {
1718                SessionState sessionState = cs.getSessionState(sessionId);
1719                if (sessionState != null) {
1720                    result += sessionState.getProducerIds().size();
1721                }
1722            }
1723        }
1724        return result;
1725    }
1726
1727    private int getConsumerCount(ConnectionId connectionId) {
1728        int result = 0;
1729        TransportConnectionState cs = lookupConnectionState(connectionId);
1730        if (cs != null) {
1731            for (SessionId sessionId : cs.getSessionIds()) {
1732                SessionState sessionState = cs.getSessionState(sessionId);
1733                if (sessionState != null) {
1734                    result += sessionState.getConsumerIds().size();
1735                }
1736            }
1737        }
1738        return result;
1739    }
1740
1741    public WireFormatInfo getRemoteWireFormatInfo() {
1742        return wireFormatInfo;
1743    }
1744
1745    /* (non-Javadoc)
1746     * @see org.apache.activemq.state.CommandVisitor#processBrokerSubscriptionInfo(org.apache.activemq.command.BrokerSubscriptionInfo)
1747     */
1748    @Override
1749    public Response processBrokerSubscriptionInfo(BrokerSubscriptionInfo info) throws Exception {
1750        return null;
1751    }
1752}