001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker.region; 018 019import java.io.IOException; 020import java.util.ArrayList; 021import java.util.LinkedList; 022import java.util.List; 023import java.util.concurrent.atomic.AtomicInteger; 024import java.util.concurrent.atomic.AtomicLong; 025 026import javax.jms.JMSException; 027 028import org.apache.activemq.ActiveMQMessageAudit; 029import org.apache.activemq.broker.Broker; 030import org.apache.activemq.broker.ConnectionContext; 031import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor; 032import org.apache.activemq.broker.region.cursors.PendingMessageCursor; 033import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor; 034import org.apache.activemq.broker.region.policy.MessageEvictionStrategy; 035import org.apache.activemq.broker.region.policy.OldestMessageEvictionStrategy; 036import org.apache.activemq.command.ConsumerControl; 037import org.apache.activemq.command.ConsumerInfo; 038import org.apache.activemq.command.Message; 039import org.apache.activemq.command.MessageAck; 040import org.apache.activemq.command.MessageDispatch; 041import org.apache.activemq.command.MessageDispatchNotification; 042import org.apache.activemq.command.MessageId; 043import org.apache.activemq.command.MessagePull; 044import org.apache.activemq.command.Response; 045import org.apache.activemq.thread.Scheduler; 046import org.apache.activemq.transaction.Synchronization; 047import org.apache.activemq.transport.TransmitCallback; 048import org.apache.activemq.usage.SystemUsage; 049import org.slf4j.Logger; 050import org.slf4j.LoggerFactory; 051 052public class TopicSubscription extends AbstractSubscription { 053 054 private static final Logger LOG = LoggerFactory.getLogger(TopicSubscription.class); 055 private static final AtomicLong CURSOR_NAME_COUNTER = new AtomicLong(0); 056 057 protected PendingMessageCursor matched; 058 protected final SystemUsage usageManager; 059 boolean singleDestination = true; 060 Destination destination; 061 private final Scheduler scheduler; 062 063 private int maximumPendingMessages = -1; 064 private MessageEvictionStrategy messageEvictionStrategy = new OldestMessageEvictionStrategy(); 065 private int discarded; 066 private final Object matchedListMutex = new Object(); 067 private int memoryUsageHighWaterMark = 95; 068 // allow duplicate suppression in a ring network of brokers 069 protected int maxProducersToAudit = 1024; 070 protected int maxAuditDepth = 1000; 071 protected boolean enableAudit = false; 072 protected ActiveMQMessageAudit audit; 073 protected boolean active = false; 074 protected boolean discarding = false; 075 076 //Used for inflight message size calculations 077 protected final Object dispatchLock = new Object(); 078 protected final List<MessageReference> dispatched = new ArrayList<MessageReference>(); 079 080 public TopicSubscription(Broker broker,ConnectionContext context, ConsumerInfo info, SystemUsage usageManager) throws Exception { 081 super(broker, context, info); 082 this.usageManager = usageManager; 083 String matchedName = "TopicSubscription:" + CURSOR_NAME_COUNTER.getAndIncrement() + "[" + info.getConsumerId().toString() + "]"; 084 if (info.getDestination().isTemporary() || broker.getTempDataStore()==null ) { 085 this.matched = new VMPendingMessageCursor(false); 086 } else { 087 this.matched = new FilePendingMessageCursor(broker,matchedName,false); 088 } 089 090 this.scheduler = broker.getScheduler(); 091 } 092 093 public void init() throws Exception { 094 this.matched.setSystemUsage(usageManager); 095 this.matched.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark()); 096 this.matched.start(); 097 if (enableAudit) { 098 audit= new ActiveMQMessageAudit(maxAuditDepth, maxProducersToAudit); 099 } 100 this.active=true; 101 } 102 103 @Override 104 public void add(MessageReference node) throws Exception { 105 if (isDuplicate(node)) { 106 return; 107 } 108 // Lets use an indirect reference so that we can associate a unique 109 // locator /w the message. 110 node = new IndirectMessageReference(node.getMessage()); 111 getSubscriptionStatistics().getEnqueues().increment(); 112 synchronized (matchedListMutex) { 113 // if this subscriber is already discarding a message, we don't want to add 114 // any more messages to it as those messages can only be advisories generated in the process, 115 // which can trigger the recursive call loop 116 if (discarding) return; 117 118 if (!isFull() && matched.isEmpty()) { 119 // if maximumPendingMessages is set we will only discard messages which 120 // have not been dispatched (i.e. we allow the prefetch buffer to be filled) 121 dispatch(node); 122 setSlowConsumer(false); 123 } else { 124 if (info.getPrefetchSize() > 1 && matched.size() > info.getPrefetchSize()) { 125 // Slow consumers should log and set their state as such. 126 if (!isSlowConsumer()) { 127 LOG.warn("{}: has twice its prefetch limit pending, without an ack; it appears to be slow", toString()); 128 setSlowConsumer(true); 129 for (Destination dest: destinations) { 130 dest.slowConsumer(getContext(), this); 131 } 132 } 133 } 134 if (maximumPendingMessages != 0) { 135 boolean warnedAboutWait = false; 136 while (active) { 137 while (matched.isFull()) { 138 if (getContext().getStopping().get()) { 139 LOG.warn("{}: stopped waiting for space in pendingMessage cursor for: {}", toString(), node.getMessageId()); 140 getSubscriptionStatistics().getEnqueues().decrement(); 141 return; 142 } 143 if (!warnedAboutWait) { 144 LOG.info("{}: Pending message cursor [{}] is full, temp usag ({}%) or memory usage ({}%) limit reached, blocking message add() pending the release of resources.", 145 new Object[]{ 146 toString(), 147 matched, 148 matched.getSystemUsage().getTempUsage().getPercentUsage(), 149 matched.getSystemUsage().getMemoryUsage().getPercentUsage() 150 }); 151 warnedAboutWait = true; 152 } 153 matchedListMutex.wait(20); 154 } 155 // Temporary storage could be full - so just try to add the message 156 // see https://issues.apache.org/activemq/browse/AMQ-2475 157 if (matched.tryAddMessageLast(node, 10)) { 158 break; 159 } 160 } 161 if (maximumPendingMessages > 0) { 162 // calculate the high water mark from which point we 163 // will eagerly evict expired messages 164 int max = messageEvictionStrategy.getEvictExpiredMessagesHighWatermark(); 165 if (maximumPendingMessages > 0 && maximumPendingMessages < max) { 166 max = maximumPendingMessages; 167 } 168 if (!matched.isEmpty() && matched.size() > max) { 169 removeExpiredMessages(); 170 } 171 // lets discard old messages as we are a slow consumer 172 while (!matched.isEmpty() && matched.size() > maximumPendingMessages) { 173 int pageInSize = matched.size() - maximumPendingMessages; 174 // only page in a 1000 at a time - else we could blow the memory 175 pageInSize = Math.max(1000, pageInSize); 176 LinkedList<MessageReference> list = null; 177 MessageReference[] oldMessages=null; 178 synchronized(matched){ 179 list = matched.pageInList(pageInSize); 180 oldMessages = messageEvictionStrategy.evictMessages(list); 181 for (MessageReference ref : list) { 182 ref.decrementReferenceCount(); 183 } 184 } 185 int messagesToEvict = 0; 186 if (oldMessages != null){ 187 messagesToEvict = oldMessages.length; 188 for (int i = 0; i < messagesToEvict; i++) { 189 MessageReference oldMessage = oldMessages[i]; 190 discard(oldMessage); 191 } 192 } 193 // lets avoid an infinite loop if we are given a bad eviction strategy 194 // for a bad strategy lets just not evict 195 if (messagesToEvict == 0) { 196 LOG.warn("No messages to evict returned for {} from eviction strategy: {} out of {} candidates", new Object[]{ 197 destination, messageEvictionStrategy, list.size() 198 }); 199 break; 200 } 201 } 202 } 203 dispatchMatched(); 204 } 205 } 206 } 207 } 208 209 private boolean isDuplicate(MessageReference node) { 210 boolean duplicate = false; 211 if (enableAudit && audit != null) { 212 duplicate = audit.isDuplicate(node); 213 if (LOG.isDebugEnabled()) { 214 if (duplicate) { 215 LOG.debug("{}, ignoring duplicate add: {}", this, node.getMessageId()); 216 } 217 } 218 } 219 return duplicate; 220 } 221 222 /** 223 * Discard any expired messages from the matched list. Called from a 224 * synchronized block. 225 * 226 * @throws IOException 227 */ 228 protected void removeExpiredMessages() throws IOException { 229 try { 230 matched.reset(); 231 while (matched.hasNext()) { 232 MessageReference node = matched.next(); 233 node.decrementReferenceCount(); 234 if (node.isExpired()) { 235 matched.remove(); 236 getSubscriptionStatistics().getDispatched().increment(); 237 node.decrementReferenceCount(); 238 if (broker.isExpired(node)) { 239 ((Destination) node.getRegionDestination()).getDestinationStatistics().getExpired().increment(); 240 broker.messageExpired(getContext(), node, this); 241 } 242 break; 243 } 244 } 245 } finally { 246 matched.release(); 247 } 248 } 249 250 @Override 251 public void processMessageDispatchNotification(MessageDispatchNotification mdn) { 252 synchronized (matchedListMutex) { 253 try { 254 matched.reset(); 255 while (matched.hasNext()) { 256 MessageReference node = matched.next(); 257 node.decrementReferenceCount(); 258 if (node.getMessageId().equals(mdn.getMessageId())) { 259 synchronized(dispatchLock) { 260 matched.remove(); 261 getSubscriptionStatistics().getDispatched().increment(); 262 dispatched.add(node); 263 getSubscriptionStatistics().getInflightMessageSize().addSize(node.getSize()); 264 node.decrementReferenceCount(); 265 } 266 break; 267 } 268 } 269 } finally { 270 matched.release(); 271 } 272 } 273 } 274 275 @Override 276 public synchronized void acknowledge(final ConnectionContext context, final MessageAck ack) throws Exception { 277 super.acknowledge(context, ack); 278 279 // Handle the standard acknowledgment case. 280 if (ack.isStandardAck() || ack.isPoisonAck() || ack.isIndividualAck()) { 281 if (context.isInTransaction()) { 282 context.getTransaction().addSynchronization(new Synchronization() { 283 @Override 284 public void afterCommit() throws Exception { 285 updateStatsOnAck(ack); 286 dispatchMatched(); 287 } 288 }); 289 } else { 290 updateStatsOnAck(ack); 291 } 292 updatePrefetch(ack); 293 dispatchMatched(); 294 return; 295 } else if (ack.isDeliveredAck()) { 296 // Message was delivered but not acknowledged: update pre-fetch counters. 297 prefetchExtension.addAndGet(ack.getMessageCount()); 298 dispatchMatched(); 299 return; 300 } else if (ack.isExpiredAck()) { 301 updateStatsOnAck(ack); 302 updatePrefetch(ack); 303 dispatchMatched(); 304 return; 305 } else if (ack.isRedeliveredAck()) { 306 // nothing to do atm 307 return; 308 } 309 throw new JMSException("Invalid acknowledgment: " + ack); 310 } 311 312 @Override 313 public Response pullMessage(ConnectionContext context, final MessagePull pull) throws Exception { 314 315 // The slave should not deliver pull messages. 316 if (getPrefetchSize() == 0) { 317 318 final long currentDispatchedCount = getSubscriptionStatistics().getDispatched().getCount(); 319 prefetchExtension.set(pull.getQuantity()); 320 dispatchMatched(); 321 322 // If there was nothing dispatched.. we may need to setup a timeout. 323 if (currentDispatchedCount == getSubscriptionStatistics().getDispatched().getCount() || pull.isAlwaysSignalDone()) { 324 325 // immediate timeout used by receiveNoWait() 326 if (pull.getTimeout() == -1) { 327 // Send a NULL message to signal nothing pending. 328 dispatch(null); 329 prefetchExtension.set(0); 330 } 331 332 if (pull.getTimeout() > 0) { 333 scheduler.executeAfterDelay(new Runnable() { 334 335 @Override 336 public void run() { 337 pullTimeout(currentDispatchedCount, pull.isAlwaysSignalDone()); 338 } 339 }, pull.getTimeout()); 340 } 341 } 342 } 343 return null; 344 } 345 346 /** 347 * Occurs when a pull times out. If nothing has been dispatched since the 348 * timeout was setup, then send the NULL message. 349 */ 350 private final void pullTimeout(long currentDispatchedCount, boolean alwaysSendDone) { 351 synchronized (matchedListMutex) { 352 if (currentDispatchedCount == getSubscriptionStatistics().getDispatched().getCount() || alwaysSendDone) { 353 try { 354 dispatch(null); 355 } catch (Exception e) { 356 context.getConnection().serviceException(e); 357 } finally { 358 prefetchExtension.set(0); 359 } 360 } 361 } 362 } 363 364 /** 365 * Update the statistics on message ack. 366 * @param ack 367 */ 368 private void updateStatsOnAck(final MessageAck ack) { 369 synchronized(dispatchLock) { 370 boolean inAckRange = false; 371 List<MessageReference> removeList = new ArrayList<MessageReference>(); 372 for (final MessageReference node : dispatched) { 373 MessageId messageId = node.getMessageId(); 374 if (ack.getFirstMessageId() == null 375 || ack.getFirstMessageId().equals(messageId)) { 376 inAckRange = true; 377 } 378 if (inAckRange) { 379 removeList.add(node); 380 if (ack.getLastMessageId().equals(messageId)) { 381 break; 382 } 383 } 384 } 385 386 for (final MessageReference node : removeList) { 387 dispatched.remove(node); 388 getSubscriptionStatistics().getInflightMessageSize().addSize(-node.getSize()); 389 getSubscriptionStatistics().getDequeues().increment(); 390 ((Destination)node.getRegionDestination()).getDestinationStatistics().getDequeues().increment(); 391 ((Destination)node.getRegionDestination()).getDestinationStatistics().getInflight().decrement(); 392 if (info.isNetworkSubscription()) { 393 ((Destination)node.getRegionDestination()).getDestinationStatistics().getForwards().add(ack.getMessageCount()); 394 } 395 if (ack.isExpiredAck()) { 396 destination.getDestinationStatistics().getExpired().add(ack.getMessageCount()); 397 } 398 } 399 } 400 } 401 402 private void updatePrefetch(MessageAck ack) { 403 while (true) { 404 int currentExtension = prefetchExtension.get(); 405 int newExtension = Math.max(0, currentExtension - ack.getMessageCount()); 406 if (prefetchExtension.compareAndSet(currentExtension, newExtension)) { 407 break; 408 } 409 } 410 } 411 412 private void decrementPrefetchExtension() { 413 while (true) { 414 int currentExtension = prefetchExtension.get(); 415 int newExtension = Math.max(0, currentExtension - 1); 416 if (prefetchExtension.compareAndSet(currentExtension, newExtension)) { 417 break; 418 } 419 } 420 } 421 422 @Override 423 public int countBeforeFull() { 424 return getPrefetchSize() == 0 ? prefetchExtension.get() : info.getPrefetchSize() + prefetchExtension.get() - getDispatchedQueueSize(); 425 } 426 427 @Override 428 public int getPendingQueueSize() { 429 return matched(); 430 } 431 432 @Override 433 public long getPendingMessageSize() { 434 synchronized (matchedListMutex) { 435 return matched.messageSize(); 436 } 437 } 438 439 @Override 440 public int getDispatchedQueueSize() { 441 return (int)(getSubscriptionStatistics().getDispatched().getCount() - 442 prefetchExtension.get() - getSubscriptionStatistics().getDequeues().getCount()); 443 } 444 445 public int getMaximumPendingMessages() { 446 return maximumPendingMessages; 447 } 448 449 @Override 450 public long getDispatchedCounter() { 451 return getSubscriptionStatistics().getDispatched().getCount(); 452 } 453 454 @Override 455 public long getEnqueueCounter() { 456 return getSubscriptionStatistics().getEnqueues().getCount(); 457 } 458 459 @Override 460 public long getDequeueCounter() { 461 return getSubscriptionStatistics().getDequeues().getCount(); 462 } 463 464 /** 465 * @return the number of messages discarded due to being a slow consumer 466 */ 467 public int discarded() { 468 synchronized (matchedListMutex) { 469 return discarded; 470 } 471 } 472 473 /** 474 * @return the number of matched messages (messages targeted for the 475 * subscription but not yet able to be dispatched due to the 476 * prefetch buffer being full). 477 */ 478 public int matched() { 479 synchronized (matchedListMutex) { 480 return matched.size(); 481 } 482 } 483 484 /** 485 * Sets the maximum number of pending messages that can be matched against 486 * this consumer before old messages are discarded. 487 */ 488 public void setMaximumPendingMessages(int maximumPendingMessages) { 489 this.maximumPendingMessages = maximumPendingMessages; 490 } 491 492 public MessageEvictionStrategy getMessageEvictionStrategy() { 493 return messageEvictionStrategy; 494 } 495 496 /** 497 * Sets the eviction strategy used to decide which message to evict when the 498 * slow consumer needs to discard messages 499 */ 500 public void setMessageEvictionStrategy(MessageEvictionStrategy messageEvictionStrategy) { 501 this.messageEvictionStrategy = messageEvictionStrategy; 502 } 503 504 public int getMaxProducersToAudit() { 505 return maxProducersToAudit; 506 } 507 508 public synchronized void setMaxProducersToAudit(int maxProducersToAudit) { 509 this.maxProducersToAudit = maxProducersToAudit; 510 if (audit != null) { 511 audit.setMaximumNumberOfProducersToTrack(maxProducersToAudit); 512 } 513 } 514 515 public int getMaxAuditDepth() { 516 return maxAuditDepth; 517 } 518 519 public synchronized void setMaxAuditDepth(int maxAuditDepth) { 520 this.maxAuditDepth = maxAuditDepth; 521 if (audit != null) { 522 audit.setAuditDepth(maxAuditDepth); 523 } 524 } 525 526 public boolean isEnableAudit() { 527 return enableAudit; 528 } 529 530 public synchronized void setEnableAudit(boolean enableAudit) { 531 this.enableAudit = enableAudit; 532 if (enableAudit && audit == null) { 533 audit = new ActiveMQMessageAudit(maxAuditDepth,maxProducersToAudit); 534 } 535 } 536 537 // Implementation methods 538 // ------------------------------------------------------------------------- 539 @Override 540 public boolean isFull() { 541 if (info.getPrefetchSize() == 0) { 542 return prefetchExtension.get() == 0; 543 } 544 return getDispatchedQueueSize() >= info.getPrefetchSize(); 545 } 546 547 @Override 548 public int getInFlightSize() { 549 return getDispatchedQueueSize(); 550 } 551 552 /** 553 * @return true when 60% or more room is left for dispatching messages 554 */ 555 @Override 556 public boolean isLowWaterMark() { 557 return getDispatchedQueueSize() <= (info.getPrefetchSize() * .4); 558 } 559 560 /** 561 * @return true when 10% or less room is left for dispatching messages 562 */ 563 @Override 564 public boolean isHighWaterMark() { 565 return getDispatchedQueueSize() >= (info.getPrefetchSize() * .9); 566 } 567 568 /** 569 * @param memoryUsageHighWaterMark the memoryUsageHighWaterMark to set 570 */ 571 public void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark) { 572 this.memoryUsageHighWaterMark = memoryUsageHighWaterMark; 573 } 574 575 /** 576 * @return the memoryUsageHighWaterMark 577 */ 578 public int getMemoryUsageHighWaterMark() { 579 return this.memoryUsageHighWaterMark; 580 } 581 582 /** 583 * @return the usageManager 584 */ 585 public SystemUsage getUsageManager() { 586 return this.usageManager; 587 } 588 589 /** 590 * @return the matched 591 */ 592 public PendingMessageCursor getMatched() { 593 return this.matched; 594 } 595 596 /** 597 * @param matched the matched to set 598 */ 599 public void setMatched(PendingMessageCursor matched) { 600 this.matched = matched; 601 } 602 603 /** 604 * inform the MessageConsumer on the client to change it's prefetch 605 * 606 * @param newPrefetch 607 */ 608 @Override 609 public void updateConsumerPrefetch(int newPrefetch) { 610 if (context != null && context.getConnection() != null && context.getConnection().isManageable()) { 611 ConsumerControl cc = new ConsumerControl(); 612 cc.setConsumerId(info.getConsumerId()); 613 cc.setPrefetch(newPrefetch); 614 context.getConnection().dispatchAsync(cc); 615 } 616 } 617 618 private void dispatchMatched() throws IOException { 619 synchronized (matchedListMutex) { 620 if (!matched.isEmpty() && !isFull()) { 621 try { 622 matched.reset(); 623 624 while (matched.hasNext() && !isFull()) { 625 MessageReference message = matched.next(); 626 message.decrementReferenceCount(); 627 matched.remove(); 628 // Message may have been sitting in the matched list a while 629 // waiting for the consumer to ak the message. 630 if (message.isExpired()) { 631 discard(message); 632 continue; // just drop it. 633 } 634 dispatch(message); 635 } 636 } finally { 637 matched.release(); 638 } 639 } 640 } 641 } 642 643 private void dispatch(final MessageReference node) throws IOException { 644 Message message = node != null ? node.getMessage() : null; 645 if (node != null) { 646 node.incrementReferenceCount(); 647 } 648 // Make sure we can dispatch a message. 649 MessageDispatch md = new MessageDispatch(); 650 md.setMessage(message); 651 md.setConsumerId(info.getConsumerId()); 652 if (node != null) { 653 md.setDestination(((Destination)node.getRegionDestination()).getActiveMQDestination()); 654 synchronized(dispatchLock) { 655 getSubscriptionStatistics().getDispatched().increment(); 656 dispatched.add(node); 657 getSubscriptionStatistics().getInflightMessageSize().addSize(node.getSize()); 658 } 659 660 // Keep track if this subscription is receiving messages from a single destination. 661 if (singleDestination) { 662 if (destination == null) { 663 destination = (Destination)node.getRegionDestination(); 664 } else { 665 if (destination != node.getRegionDestination()) { 666 singleDestination = false; 667 } 668 } 669 } 670 671 if (getPrefetchSize() == 0) { 672 decrementPrefetchExtension(); 673 } 674 675 } 676 if (info.isDispatchAsync()) { 677 if (node != null) { 678 md.setTransmitCallback(new TransmitCallback() { 679 680 @Override 681 public void onSuccess() { 682 Destination regionDestination = (Destination) node.getRegionDestination(); 683 regionDestination.getDestinationStatistics().getDispatched().increment(); 684 regionDestination.getDestinationStatistics().getInflight().increment(); 685 node.decrementReferenceCount(); 686 } 687 688 @Override 689 public void onFailure() { 690 Destination regionDestination = (Destination) node.getRegionDestination(); 691 regionDestination.getDestinationStatistics().getDispatched().increment(); 692 regionDestination.getDestinationStatistics().getInflight().increment(); 693 node.decrementReferenceCount(); 694 } 695 }); 696 } 697 context.getConnection().dispatchAsync(md); 698 } else { 699 context.getConnection().dispatchSync(md); 700 if (node != null) { 701 Destination regionDestination = (Destination) node.getRegionDestination(); 702 regionDestination.getDestinationStatistics().getDispatched().increment(); 703 regionDestination.getDestinationStatistics().getInflight().increment(); 704 node.decrementReferenceCount(); 705 } 706 } 707 } 708 709 private void discard(MessageReference message) { 710 discarding = true; 711 try { 712 message.decrementReferenceCount(); 713 matched.remove(message); 714 discarded++; 715 if (destination != null) { 716 destination.getDestinationStatistics().getDequeues().increment(); 717 } 718 LOG.debug("{}, discarding message {}", this, message); 719 Destination dest = (Destination) message.getRegionDestination(); 720 if (dest != null) { 721 dest.messageDiscarded(getContext(), this, message); 722 } 723 broker.getRoot().sendToDeadLetterQueue(getContext(), message, this, new Throwable("TopicSubDiscard. ID:" + info.getConsumerId())); 724 } finally { 725 discarding = false; 726 } 727 } 728 729 @Override 730 public String toString() { 731 return "TopicSubscription:" + " consumer=" + info.getConsumerId() + ", destinations=" + destinations.size() + ", dispatched=" + getDispatchedQueueSize() + ", delivered=" 732 + getDequeueCounter() + ", matched=" + matched() + ", discarded=" + discarded() + ", prefetchExtension=" + prefetchExtension.get(); 733 } 734 735 @Override 736 public void destroy() { 737 this.active=false; 738 synchronized (matchedListMutex) { 739 try { 740 matched.destroy(); 741 } catch (Exception e) { 742 LOG.warn("Failed to destroy cursor", e); 743 } 744 } 745 setSlowConsumer(false); 746 synchronized(dispatchLock) { 747 dispatched.clear(); 748 } 749 } 750 751 @Override 752 public int getPrefetchSize() { 753 return info.getPrefetchSize(); 754 } 755 756 @Override 757 public void setPrefetchSize(int newSize) { 758 info.setPrefetchSize(newSize); 759 try { 760 dispatchMatched(); 761 } catch(Exception e) { 762 LOG.trace("Caught exception on dispatch after prefetch size change."); 763 } 764 } 765}