001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region;
018
019import java.io.IOException;
020import java.util.ArrayList;
021import java.util.LinkedList;
022import java.util.List;
023import java.util.concurrent.atomic.AtomicInteger;
024import java.util.concurrent.atomic.AtomicLong;
025
026import javax.jms.JMSException;
027
028import org.apache.activemq.ActiveMQMessageAudit;
029import org.apache.activemq.broker.Broker;
030import org.apache.activemq.broker.ConnectionContext;
031import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor;
032import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
033import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
034import org.apache.activemq.broker.region.policy.MessageEvictionStrategy;
035import org.apache.activemq.broker.region.policy.OldestMessageEvictionStrategy;
036import org.apache.activemq.command.ConsumerControl;
037import org.apache.activemq.command.ConsumerInfo;
038import org.apache.activemq.command.Message;
039import org.apache.activemq.command.MessageAck;
040import org.apache.activemq.command.MessageDispatch;
041import org.apache.activemq.command.MessageDispatchNotification;
042import org.apache.activemq.command.MessageId;
043import org.apache.activemq.command.MessagePull;
044import org.apache.activemq.command.Response;
045import org.apache.activemq.thread.Scheduler;
046import org.apache.activemq.transaction.Synchronization;
047import org.apache.activemq.transport.TransmitCallback;
048import org.apache.activemq.usage.SystemUsage;
049import org.slf4j.Logger;
050import org.slf4j.LoggerFactory;
051
052public class TopicSubscription extends AbstractSubscription {
053
054    private static final Logger LOG = LoggerFactory.getLogger(TopicSubscription.class);
055    private static final AtomicLong CURSOR_NAME_COUNTER = new AtomicLong(0);
056
057    protected PendingMessageCursor matched;
058    protected final SystemUsage usageManager;
059    boolean singleDestination = true;
060    Destination destination;
061    private final Scheduler scheduler;
062
063    private int maximumPendingMessages = -1;
064    private MessageEvictionStrategy messageEvictionStrategy = new OldestMessageEvictionStrategy();
065    private int discarded;
066    private final Object matchedListMutex = new Object();
067    private int memoryUsageHighWaterMark = 95;
068    // allow duplicate suppression in a ring network of brokers
069    protected int maxProducersToAudit = 1024;
070    protected int maxAuditDepth = 1000;
071    protected boolean enableAudit = false;
072    protected ActiveMQMessageAudit audit;
073    protected boolean active = false;
074    protected boolean discarding = false;
075
076    //Used for inflight message size calculations
077    protected final Object dispatchLock = new Object();
078    protected final List<MessageReference> dispatched = new ArrayList<MessageReference>();
079
080    public TopicSubscription(Broker broker,ConnectionContext context, ConsumerInfo info, SystemUsage usageManager) throws Exception {
081        super(broker, context, info);
082        this.usageManager = usageManager;
083        String matchedName = "TopicSubscription:" + CURSOR_NAME_COUNTER.getAndIncrement() + "[" + info.getConsumerId().toString() + "]";
084        if (info.getDestination().isTemporary() || broker.getTempDataStore()==null ) {
085            this.matched = new VMPendingMessageCursor(false);
086        } else {
087            this.matched = new FilePendingMessageCursor(broker,matchedName,false);
088        }
089
090        this.scheduler = broker.getScheduler();
091    }
092
093    public void init() throws Exception {
094        this.matched.setSystemUsage(usageManager);
095        this.matched.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
096        this.matched.start();
097        if (enableAudit) {
098            audit= new ActiveMQMessageAudit(maxAuditDepth, maxProducersToAudit);
099        }
100        this.active=true;
101    }
102
103    @Override
104    public void add(MessageReference node) throws Exception {
105        if (isDuplicate(node)) {
106            return;
107        }
108        // Lets use an indirect reference so that we can associate a unique
109        // locator /w the message.
110        node = new IndirectMessageReference(node.getMessage());
111        getSubscriptionStatistics().getEnqueues().increment();
112        synchronized (matchedListMutex) {
113            // if this subscriber is already discarding a message, we don't want to add
114            // any more messages to it as those messages can only be advisories generated in the process,
115            // which can trigger the recursive call loop
116            if (discarding) return;
117
118            if (!isFull() && matched.isEmpty()) {
119                // if maximumPendingMessages is set we will only discard messages which
120                // have not been dispatched (i.e. we allow the prefetch buffer to be filled)
121                dispatch(node);
122                setSlowConsumer(false);
123            } else {
124                if (info.getPrefetchSize() > 1 && matched.size() > info.getPrefetchSize()) {
125                    // Slow consumers should log and set their state as such.
126                    if (!isSlowConsumer()) {
127                        LOG.warn("{}: has twice its prefetch limit pending, without an ack; it appears to be slow", toString());
128                        setSlowConsumer(true);
129                        for (Destination dest: destinations) {
130                            dest.slowConsumer(getContext(), this);
131                        }
132                    }
133                }
134                if (maximumPendingMessages != 0) {
135                    boolean warnedAboutWait = false;
136                    while (active) {
137                        while (matched.isFull()) {
138                            if (getContext().getStopping().get()) {
139                                LOG.warn("{}: stopped waiting for space in pendingMessage cursor for: {}", toString(), node.getMessageId());
140                                getSubscriptionStatistics().getEnqueues().decrement();
141                                return;
142                            }
143                            if (!warnedAboutWait) {
144                                LOG.info("{}: Pending message cursor [{}] is full, temp usag ({}%) or memory usage ({}%) limit reached, blocking message add() pending the release of resources.",
145                                        new Object[]{
146                                                toString(),
147                                                matched,
148                                                matched.getSystemUsage().getTempUsage().getPercentUsage(),
149                                                matched.getSystemUsage().getMemoryUsage().getPercentUsage()
150                                        });
151                                warnedAboutWait = true;
152                            }
153                            matchedListMutex.wait(20);
154                        }
155                        // Temporary storage could be full - so just try to add the message
156                        // see https://issues.apache.org/activemq/browse/AMQ-2475
157                        if (matched.tryAddMessageLast(node, 10)) {
158                            break;
159                        }
160                    }
161                    if (maximumPendingMessages > 0) {
162                        // calculate the high water mark from which point we
163                        // will eagerly evict expired messages
164                        int max = messageEvictionStrategy.getEvictExpiredMessagesHighWatermark();
165                        if (maximumPendingMessages > 0 && maximumPendingMessages < max) {
166                            max = maximumPendingMessages;
167                        }
168                        if (!matched.isEmpty() && matched.size() > max) {
169                            removeExpiredMessages();
170                        }
171                        // lets discard old messages as we are a slow consumer
172                        while (!matched.isEmpty() && matched.size() > maximumPendingMessages) {
173                            int pageInSize = matched.size() - maximumPendingMessages;
174                            // only page in a 1000 at a time - else we could blow the memory
175                            pageInSize = Math.max(1000, pageInSize);
176                            LinkedList<MessageReference> list = null;
177                            MessageReference[] oldMessages=null;
178                            synchronized(matched){
179                                list = matched.pageInList(pageInSize);
180                                oldMessages = messageEvictionStrategy.evictMessages(list);
181                                for (MessageReference ref : list) {
182                                    ref.decrementReferenceCount();
183                                }
184                            }
185                            int messagesToEvict = 0;
186                            if (oldMessages != null){
187                                messagesToEvict = oldMessages.length;
188                                for (int i = 0; i < messagesToEvict; i++) {
189                                    MessageReference oldMessage = oldMessages[i];
190                                    discard(oldMessage);
191                                }
192                            }
193                            // lets avoid an infinite loop if we are given a bad eviction strategy
194                            // for a bad strategy lets just not evict
195                            if (messagesToEvict == 0) {
196                                LOG.warn("No messages to evict returned for {} from eviction strategy: {} out of {} candidates", new Object[]{
197                                        destination, messageEvictionStrategy, list.size()
198                                });
199                                break;
200                            }
201                        }
202                    }
203                    dispatchMatched();
204                }
205            }
206        }
207    }
208
209    private boolean isDuplicate(MessageReference node) {
210        boolean duplicate = false;
211        if (enableAudit && audit != null) {
212            duplicate = audit.isDuplicate(node);
213            if (LOG.isDebugEnabled()) {
214                if (duplicate) {
215                    LOG.debug("{}, ignoring duplicate add: {}", this, node.getMessageId());
216                }
217            }
218        }
219        return duplicate;
220    }
221
222    /**
223     * Discard any expired messages from the matched list. Called from a
224     * synchronized block.
225     *
226     * @throws IOException
227     */
228    protected void removeExpiredMessages() throws IOException {
229        try {
230            matched.reset();
231            while (matched.hasNext()) {
232                MessageReference node = matched.next();
233                node.decrementReferenceCount();
234                if (node.isExpired()) {
235                    matched.remove();
236                    getSubscriptionStatistics().getDispatched().increment();
237                    node.decrementReferenceCount();
238                    if (broker.isExpired(node)) {
239                        ((Destination) node.getRegionDestination()).getDestinationStatistics().getExpired().increment();
240                        broker.messageExpired(getContext(), node, this);
241                    }
242                    break;
243                }
244            }
245        } finally {
246            matched.release();
247        }
248    }
249
250    @Override
251    public void processMessageDispatchNotification(MessageDispatchNotification mdn) {
252        synchronized (matchedListMutex) {
253            try {
254                matched.reset();
255                while (matched.hasNext()) {
256                    MessageReference node = matched.next();
257                    node.decrementReferenceCount();
258                    if (node.getMessageId().equals(mdn.getMessageId())) {
259                        synchronized(dispatchLock) {
260                            matched.remove();
261                            getSubscriptionStatistics().getDispatched().increment();
262                            dispatched.add(node);
263                            getSubscriptionStatistics().getInflightMessageSize().addSize(node.getSize());
264                            node.decrementReferenceCount();
265                        }
266                        break;
267                    }
268                }
269            } finally {
270                matched.release();
271            }
272        }
273    }
274
275    @Override
276    public synchronized void acknowledge(final ConnectionContext context, final MessageAck ack) throws Exception {
277        super.acknowledge(context, ack);
278
279        // Handle the standard acknowledgment case.
280        if (ack.isStandardAck() || ack.isPoisonAck() || ack.isIndividualAck()) {
281            if (context.isInTransaction()) {
282                context.getTransaction().addSynchronization(new Synchronization() {
283                    @Override
284                    public void afterCommit() throws Exception {
285                        updateStatsOnAck(ack);
286                        dispatchMatched();
287                    }
288                });
289            } else {
290                updateStatsOnAck(ack);
291            }
292            updatePrefetch(ack);
293            dispatchMatched();
294            return;
295        } else if (ack.isDeliveredAck()) {
296            // Message was delivered but not acknowledged: update pre-fetch counters.
297            prefetchExtension.addAndGet(ack.getMessageCount());
298            dispatchMatched();
299            return;
300        } else if (ack.isExpiredAck()) {
301            updateStatsOnAck(ack);
302            updatePrefetch(ack);
303            dispatchMatched();
304            return;
305        } else if (ack.isRedeliveredAck()) {
306            // nothing to do atm
307            return;
308        }
309        throw new JMSException("Invalid acknowledgment: " + ack);
310    }
311
312    @Override
313    public Response pullMessage(ConnectionContext context, final MessagePull pull) throws Exception {
314
315        // The slave should not deliver pull messages.
316        if (getPrefetchSize() == 0) {
317
318            final long currentDispatchedCount = getSubscriptionStatistics().getDispatched().getCount();
319            prefetchExtension.set(pull.getQuantity());
320            dispatchMatched();
321
322            // If there was nothing dispatched.. we may need to setup a timeout.
323            if (currentDispatchedCount == getSubscriptionStatistics().getDispatched().getCount() || pull.isAlwaysSignalDone()) {
324
325                // immediate timeout used by receiveNoWait()
326                if (pull.getTimeout() == -1) {
327                    // Send a NULL message to signal nothing pending.
328                    dispatch(null);
329                    prefetchExtension.set(0);
330                }
331
332                if (pull.getTimeout() > 0) {
333                    scheduler.executeAfterDelay(new Runnable() {
334
335                        @Override
336                        public void run() {
337                            pullTimeout(currentDispatchedCount, pull.isAlwaysSignalDone());
338                        }
339                    }, pull.getTimeout());
340                }
341            }
342        }
343        return null;
344    }
345
346    /**
347     * Occurs when a pull times out. If nothing has been dispatched since the
348     * timeout was setup, then send the NULL message.
349     */
350    private final void pullTimeout(long currentDispatchedCount, boolean alwaysSendDone) {
351        synchronized (matchedListMutex) {
352            if (currentDispatchedCount == getSubscriptionStatistics().getDispatched().getCount() || alwaysSendDone) {
353                try {
354                    dispatch(null);
355                } catch (Exception e) {
356                    context.getConnection().serviceException(e);
357                } finally {
358                    prefetchExtension.set(0);
359                }
360            }
361        }
362    }
363
364    /**
365     * Update the statistics on message ack.
366     * @param ack
367     */
368    private void updateStatsOnAck(final MessageAck ack) {
369        synchronized(dispatchLock) {
370            boolean inAckRange = false;
371            List<MessageReference> removeList = new ArrayList<MessageReference>();
372            for (final MessageReference node : dispatched) {
373                MessageId messageId = node.getMessageId();
374                if (ack.getFirstMessageId() == null
375                        || ack.getFirstMessageId().equals(messageId)) {
376                    inAckRange = true;
377                }
378                if (inAckRange) {
379                    removeList.add(node);
380                    if (ack.getLastMessageId().equals(messageId)) {
381                        break;
382                    }
383                }
384            }
385
386            for (final MessageReference node : removeList) {
387                dispatched.remove(node);
388                getSubscriptionStatistics().getInflightMessageSize().addSize(-node.getSize());
389                getSubscriptionStatistics().getDequeues().increment();
390                ((Destination)node.getRegionDestination()).getDestinationStatistics().getDequeues().increment();
391                ((Destination)node.getRegionDestination()).getDestinationStatistics().getInflight().decrement();
392                if (info.isNetworkSubscription()) {
393                    ((Destination)node.getRegionDestination()).getDestinationStatistics().getForwards().add(ack.getMessageCount());
394                }
395                if (ack.isExpiredAck()) {
396                    destination.getDestinationStatistics().getExpired().add(ack.getMessageCount());
397                }
398            }
399        }
400    }
401
402    private void updatePrefetch(MessageAck ack) {
403        while (true) {
404            int currentExtension = prefetchExtension.get();
405            int newExtension = Math.max(0, currentExtension - ack.getMessageCount());
406            if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
407                break;
408            }
409        }
410    }
411
412    private void decrementPrefetchExtension() {
413        while (true) {
414            int currentExtension = prefetchExtension.get();
415            int newExtension = Math.max(0, currentExtension - 1);
416            if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
417                break;
418            }
419        }
420    }
421
422    @Override
423    public int countBeforeFull() {
424        return getPrefetchSize() == 0 ? prefetchExtension.get() : info.getPrefetchSize() + prefetchExtension.get() - getDispatchedQueueSize();
425    }
426
427    @Override
428    public int getPendingQueueSize() {
429        return matched();
430    }
431
432    @Override
433    public long getPendingMessageSize() {
434        synchronized (matchedListMutex) {
435            return matched.messageSize();
436        }
437    }
438
439    @Override
440    public int getDispatchedQueueSize() {
441        return (int)(getSubscriptionStatistics().getDispatched().getCount() -
442                prefetchExtension.get() - getSubscriptionStatistics().getDequeues().getCount());
443    }
444
445    public int getMaximumPendingMessages() {
446        return maximumPendingMessages;
447    }
448
449    @Override
450    public long getDispatchedCounter() {
451        return getSubscriptionStatistics().getDispatched().getCount();
452    }
453
454    @Override
455    public long getEnqueueCounter() {
456        return getSubscriptionStatistics().getEnqueues().getCount();
457    }
458
459    @Override
460    public long getDequeueCounter() {
461        return getSubscriptionStatistics().getDequeues().getCount();
462    }
463
464    /**
465     * @return the number of messages discarded due to being a slow consumer
466     */
467    public int discarded() {
468        synchronized (matchedListMutex) {
469            return discarded;
470        }
471    }
472
473    /**
474     * @return the number of matched messages (messages targeted for the
475     *         subscription but not yet able to be dispatched due to the
476     *         prefetch buffer being full).
477     */
478    public int matched() {
479        synchronized (matchedListMutex) {
480            return matched.size();
481        }
482    }
483
484    /**
485     * Sets the maximum number of pending messages that can be matched against
486     * this consumer before old messages are discarded.
487     */
488    public void setMaximumPendingMessages(int maximumPendingMessages) {
489        this.maximumPendingMessages = maximumPendingMessages;
490    }
491
492    public MessageEvictionStrategy getMessageEvictionStrategy() {
493        return messageEvictionStrategy;
494    }
495
496    /**
497     * Sets the eviction strategy used to decide which message to evict when the
498     * slow consumer needs to discard messages
499     */
500    public void setMessageEvictionStrategy(MessageEvictionStrategy messageEvictionStrategy) {
501        this.messageEvictionStrategy = messageEvictionStrategy;
502    }
503
504    public int getMaxProducersToAudit() {
505        return maxProducersToAudit;
506    }
507
508    public synchronized void setMaxProducersToAudit(int maxProducersToAudit) {
509        this.maxProducersToAudit = maxProducersToAudit;
510        if (audit != null) {
511            audit.setMaximumNumberOfProducersToTrack(maxProducersToAudit);
512        }
513    }
514
515    public int getMaxAuditDepth() {
516        return maxAuditDepth;
517    }
518
519    public synchronized void setMaxAuditDepth(int maxAuditDepth) {
520        this.maxAuditDepth = maxAuditDepth;
521        if (audit != null) {
522            audit.setAuditDepth(maxAuditDepth);
523        }
524    }
525
526    public boolean isEnableAudit() {
527        return enableAudit;
528    }
529
530    public synchronized void setEnableAudit(boolean enableAudit) {
531        this.enableAudit = enableAudit;
532        if (enableAudit && audit == null) {
533            audit = new ActiveMQMessageAudit(maxAuditDepth,maxProducersToAudit);
534        }
535    }
536
537    // Implementation methods
538    // -------------------------------------------------------------------------
539    @Override
540    public boolean isFull() {
541        if (info.getPrefetchSize() == 0) {
542            return prefetchExtension.get() == 0;
543        }
544        return getDispatchedQueueSize() >= info.getPrefetchSize();
545    }
546
547    @Override
548    public int getInFlightSize() {
549        return getDispatchedQueueSize();
550    }
551
552    /**
553     * @return true when 60% or more room is left for dispatching messages
554     */
555    @Override
556    public boolean isLowWaterMark() {
557        return getDispatchedQueueSize() <= (info.getPrefetchSize() * .4);
558    }
559
560    /**
561     * @return true when 10% or less room is left for dispatching messages
562     */
563    @Override
564    public boolean isHighWaterMark() {
565        return getDispatchedQueueSize() >= (info.getPrefetchSize() * .9);
566    }
567
568    /**
569     * @param memoryUsageHighWaterMark the memoryUsageHighWaterMark to set
570     */
571    public void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark) {
572        this.memoryUsageHighWaterMark = memoryUsageHighWaterMark;
573    }
574
575    /**
576     * @return the memoryUsageHighWaterMark
577     */
578    public int getMemoryUsageHighWaterMark() {
579        return this.memoryUsageHighWaterMark;
580    }
581
582    /**
583     * @return the usageManager
584     */
585    public SystemUsage getUsageManager() {
586        return this.usageManager;
587    }
588
589    /**
590     * @return the matched
591     */
592    public PendingMessageCursor getMatched() {
593        return this.matched;
594    }
595
596    /**
597     * @param matched the matched to set
598     */
599    public void setMatched(PendingMessageCursor matched) {
600        this.matched = matched;
601    }
602
603    /**
604     * inform the MessageConsumer on the client to change it's prefetch
605     *
606     * @param newPrefetch
607     */
608    @Override
609    public void updateConsumerPrefetch(int newPrefetch) {
610        if (context != null && context.getConnection() != null && context.getConnection().isManageable()) {
611            ConsumerControl cc = new ConsumerControl();
612            cc.setConsumerId(info.getConsumerId());
613            cc.setPrefetch(newPrefetch);
614            context.getConnection().dispatchAsync(cc);
615        }
616    }
617
618    private void dispatchMatched() throws IOException {
619        synchronized (matchedListMutex) {
620            if (!matched.isEmpty() && !isFull()) {
621                try {
622                    matched.reset();
623
624                    while (matched.hasNext() && !isFull()) {
625                        MessageReference message = matched.next();
626                        message.decrementReferenceCount();
627                        matched.remove();
628                        // Message may have been sitting in the matched list a while
629                        // waiting for the consumer to ak the message.
630                        if (message.isExpired()) {
631                            discard(message);
632                            continue; // just drop it.
633                        }
634                        dispatch(message);
635                    }
636                } finally {
637                    matched.release();
638                }
639            }
640        }
641    }
642
643    private void dispatch(final MessageReference node) throws IOException {
644        Message message = node != null ? node.getMessage() : null;
645        if (node != null) {
646            node.incrementReferenceCount();
647        }
648        // Make sure we can dispatch a message.
649        MessageDispatch md = new MessageDispatch();
650        md.setMessage(message);
651        md.setConsumerId(info.getConsumerId());
652        if (node != null) {
653            md.setDestination(((Destination)node.getRegionDestination()).getActiveMQDestination());
654            synchronized(dispatchLock) {
655                getSubscriptionStatistics().getDispatched().increment();
656                dispatched.add(node);
657                getSubscriptionStatistics().getInflightMessageSize().addSize(node.getSize());
658            }
659
660            // Keep track if this subscription is receiving messages from a single destination.
661            if (singleDestination) {
662                if (destination == null) {
663                    destination = (Destination)node.getRegionDestination();
664                } else {
665                    if (destination != node.getRegionDestination()) {
666                        singleDestination = false;
667                    }
668                }
669            }
670
671            if (getPrefetchSize() == 0) {
672                decrementPrefetchExtension();
673            }
674
675        }
676        if (info.isDispatchAsync()) {
677            if (node != null) {
678                md.setTransmitCallback(new TransmitCallback() {
679
680                    @Override
681                    public void onSuccess() {
682                        Destination regionDestination = (Destination) node.getRegionDestination();
683                        regionDestination.getDestinationStatistics().getDispatched().increment();
684                        regionDestination.getDestinationStatistics().getInflight().increment();
685                        node.decrementReferenceCount();
686                    }
687
688                    @Override
689                    public void onFailure() {
690                        Destination regionDestination = (Destination) node.getRegionDestination();
691                        regionDestination.getDestinationStatistics().getDispatched().increment();
692                        regionDestination.getDestinationStatistics().getInflight().increment();
693                        node.decrementReferenceCount();
694                    }
695                });
696            }
697            context.getConnection().dispatchAsync(md);
698        } else {
699            context.getConnection().dispatchSync(md);
700            if (node != null) {
701                Destination regionDestination = (Destination) node.getRegionDestination();
702                regionDestination.getDestinationStatistics().getDispatched().increment();
703                regionDestination.getDestinationStatistics().getInflight().increment();
704                node.decrementReferenceCount();
705            }
706        }
707    }
708
709    private void discard(MessageReference message) {
710        discarding = true;
711        try {
712            message.decrementReferenceCount();
713            matched.remove(message);
714            discarded++;
715            if (destination != null) {
716                destination.getDestinationStatistics().getDequeues().increment();
717            }
718            LOG.debug("{}, discarding message {}", this, message);
719            Destination dest = (Destination) message.getRegionDestination();
720            if (dest != null) {
721                dest.messageDiscarded(getContext(), this, message);
722            }
723            broker.getRoot().sendToDeadLetterQueue(getContext(), message, this, new Throwable("TopicSubDiscard. ID:" + info.getConsumerId()));
724        } finally {
725            discarding = false;
726        }
727    }
728
729    @Override
730    public String toString() {
731        return "TopicSubscription:" + " consumer=" + info.getConsumerId() + ", destinations=" + destinations.size() + ", dispatched=" + getDispatchedQueueSize() + ", delivered="
732                + getDequeueCounter() + ", matched=" + matched() + ", discarded=" + discarded() + ", prefetchExtension=" + prefetchExtension.get();
733    }
734
735    @Override
736    public void destroy() {
737        this.active=false;
738        synchronized (matchedListMutex) {
739            try {
740                matched.destroy();
741            } catch (Exception e) {
742                LOG.warn("Failed to destroy cursor", e);
743            }
744        }
745        setSlowConsumer(false);
746        synchronized(dispatchLock) {
747            dispatched.clear();
748        }
749    }
750
751    @Override
752    public int getPrefetchSize() {
753        return info.getPrefetchSize();
754    }
755
756    @Override
757    public void setPrefetchSize(int newSize) {
758        info.setPrefetchSize(newSize);
759        try {
760            dispatchMatched();
761        } catch(Exception e) {
762            LOG.trace("Caught exception on dispatch after prefetch size change.");
763        }
764    }
765}