001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker.region; 018 019import java.io.IOException; 020import java.util.ArrayList; 021import java.util.LinkedList; 022import java.util.List; 023import java.util.Map; 024import java.util.concurrent.CancellationException; 025import java.util.concurrent.ConcurrentHashMap; 026import java.util.concurrent.ConcurrentMap; 027import java.util.concurrent.CopyOnWriteArrayList; 028import java.util.concurrent.Future; 029import java.util.concurrent.locks.ReentrantReadWriteLock; 030 031import org.apache.activemq.advisory.AdvisorySupport; 032import org.apache.activemq.broker.BrokerService; 033import org.apache.activemq.broker.ConnectionContext; 034import org.apache.activemq.broker.ProducerBrokerExchange; 035import org.apache.activemq.broker.region.policy.DispatchPolicy; 036import org.apache.activemq.broker.region.policy.LastImageSubscriptionRecoveryPolicy; 037import org.apache.activemq.broker.region.policy.RetainedMessageSubscriptionRecoveryPolicy; 038import org.apache.activemq.broker.region.policy.SimpleDispatchPolicy; 039import org.apache.activemq.broker.region.policy.SubscriptionRecoveryPolicy; 040import org.apache.activemq.broker.util.InsertionCountList; 041import org.apache.activemq.command.ActiveMQDestination; 042import org.apache.activemq.command.ConsumerInfo; 043import org.apache.activemq.command.ExceptionResponse; 044import org.apache.activemq.command.Message; 045import org.apache.activemq.command.MessageAck; 046import org.apache.activemq.command.MessageId; 047import org.apache.activemq.command.ProducerAck; 048import org.apache.activemq.command.ProducerInfo; 049import org.apache.activemq.command.Response; 050import org.apache.activemq.command.SubscriptionInfo; 051import org.apache.activemq.filter.MessageEvaluationContext; 052import org.apache.activemq.filter.NonCachedMessageEvaluationContext; 053import org.apache.activemq.store.MessageRecoveryListener; 054import org.apache.activemq.store.NoLocalSubscriptionAware; 055import org.apache.activemq.store.PersistenceAdapter; 056import org.apache.activemq.store.TopicMessageStore; 057import org.apache.activemq.thread.Task; 058import org.apache.activemq.thread.TaskRunner; 059import org.apache.activemq.thread.TaskRunnerFactory; 060import org.apache.activemq.transaction.Synchronization; 061import org.apache.activemq.util.SubscriptionKey; 062import org.slf4j.Logger; 063import org.slf4j.LoggerFactory; 064 065/** 066 * The Topic is a destination that sends a copy of a message to every active 067 * Subscription registered. 068 */ 069public class Topic extends BaseDestination implements Task { 070 protected static final Logger LOG = LoggerFactory.getLogger(Topic.class); 071 private final TopicMessageStore topicStore; 072 protected final CopyOnWriteArrayList<Subscription> consumers = new CopyOnWriteArrayList<Subscription>(); 073 private final ReentrantReadWriteLock dispatchLock = new ReentrantReadWriteLock(); 074 private DispatchPolicy dispatchPolicy = new SimpleDispatchPolicy(); 075 private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy; 076 private final ConcurrentMap<SubscriptionKey, DurableTopicSubscription> durableSubscribers = new ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription>(); 077 private final TaskRunner taskRunner; 078 private final LinkedList<Runnable> messagesWaitingForSpace = new LinkedList<Runnable>(); 079 private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() { 080 @Override 081 public void run() { 082 try { 083 Topic.this.taskRunner.wakeup(); 084 } catch (InterruptedException e) { 085 } 086 }; 087 }; 088 089 public Topic(BrokerService brokerService, ActiveMQDestination destination, TopicMessageStore store, 090 DestinationStatistics parentStats, TaskRunnerFactory taskFactory) throws Exception { 091 super(brokerService, store, destination, parentStats); 092 this.topicStore = store; 093 subscriptionRecoveryPolicy = new RetainedMessageSubscriptionRecoveryPolicy(null); 094 this.taskRunner = taskFactory.createTaskRunner(this, "Topic " + destination.getPhysicalName()); 095 } 096 097 @Override 098 public void initialize() throws Exception { 099 super.initialize(); 100 // set non default subscription recovery policy (override policyEntries) 101 if (AdvisorySupport.isMasterBrokerAdvisoryTopic(destination)) { 102 subscriptionRecoveryPolicy = new LastImageSubscriptionRecoveryPolicy(); 103 setAlwaysRetroactive(true); 104 } 105 if (store != null) { 106 // AMQ-2586: Better to leave this stat at zero than to give the user 107 // misleading metrics. 108 // int messageCount = store.getMessageCount(); 109 // destinationStatistics.getMessages().setCount(messageCount); 110 store.start(); 111 } 112 } 113 114 @Override 115 public List<Subscription> getConsumers() { 116 synchronized (consumers) { 117 return new ArrayList<Subscription>(consumers); 118 } 119 } 120 121 public boolean lock(MessageReference node, LockOwner sub) { 122 return true; 123 } 124 125 @Override 126 public void addSubscription(ConnectionContext context, final Subscription sub) throws Exception { 127 if (!sub.getConsumerInfo().isDurable()) { 128 129 // Do a retroactive recovery if needed. 130 if (sub.getConsumerInfo().isRetroactive() || isAlwaysRetroactive()) { 131 132 // synchronize with dispatch method so that no new messages are sent 133 // while we are recovering a subscription to avoid out of order messages. 134 dispatchLock.writeLock().lock(); 135 try { 136 boolean applyRecovery = false; 137 synchronized (consumers) { 138 if (!consumers.contains(sub)){ 139 sub.add(context, this); 140 consumers.add(sub); 141 applyRecovery=true; 142 super.addSubscription(context, sub); 143 } 144 } 145 if (applyRecovery){ 146 subscriptionRecoveryPolicy.recover(context, this, sub); 147 } 148 } finally { 149 dispatchLock.writeLock().unlock(); 150 } 151 152 } else { 153 synchronized (consumers) { 154 if (!consumers.contains(sub)){ 155 sub.add(context, this); 156 consumers.add(sub); 157 super.addSubscription(context, sub); 158 } 159 } 160 } 161 } else { 162 DurableTopicSubscription dsub = (DurableTopicSubscription) sub; 163 super.addSubscription(context, sub); 164 sub.add(context, this); 165 if(dsub.isActive()) { 166 synchronized (consumers) { 167 boolean hasSubscription = false; 168 169 if (consumers.size() == 0) { 170 hasSubscription = false; 171 } else { 172 for (Subscription currentSub : consumers) { 173 if (currentSub.getConsumerInfo().isDurable()) { 174 DurableTopicSubscription dcurrentSub = (DurableTopicSubscription) currentSub; 175 if (dcurrentSub.getSubscriptionKey().equals(dsub.getSubscriptionKey())) { 176 hasSubscription = true; 177 break; 178 } 179 } 180 } 181 } 182 183 if (!hasSubscription) { 184 consumers.add(sub); 185 } 186 } 187 } 188 durableSubscribers.put(dsub.getSubscriptionKey(), dsub); 189 } 190 } 191 192 @Override 193 public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId) throws Exception { 194 if (!sub.getConsumerInfo().isDurable()) { 195 super.removeSubscription(context, sub, lastDeliveredSequenceId); 196 synchronized (consumers) { 197 consumers.remove(sub); 198 } 199 } 200 sub.remove(context, this); 201 } 202 203 public void deleteSubscription(ConnectionContext context, SubscriptionKey key) throws Exception { 204 if (topicStore != null) { 205 topicStore.deleteSubscription(key.clientId, key.subscriptionName); 206 DurableTopicSubscription removed = durableSubscribers.remove(key); 207 if (removed != null) { 208 destinationStatistics.getConsumers().decrement(); 209 // deactivate and remove 210 removed.deactivate(false, 0l); 211 consumers.remove(removed); 212 } 213 } 214 } 215 216 private boolean hasDurableSubChanged(SubscriptionInfo info1, ConsumerInfo info2) throws IOException { 217 if (hasSelectorChanged(info1, info2)) { 218 return true; 219 } 220 221 return hasNoLocalChanged(info1, info2); 222 } 223 224 private boolean hasNoLocalChanged(SubscriptionInfo info1, ConsumerInfo info2) throws IOException { 225 //Not all persistence adapters store the noLocal value for a subscription 226 PersistenceAdapter adapter = broker.getBrokerService().getPersistenceAdapter(); 227 if (adapter instanceof NoLocalSubscriptionAware) { 228 if (info1.isNoLocal() ^ info2.isNoLocal()) { 229 return true; 230 } 231 } 232 233 return false; 234 } 235 236 private boolean hasSelectorChanged(SubscriptionInfo info1, ConsumerInfo info2) { 237 if (info1.getSelector() != null ^ info2.getSelector() != null) { 238 return true; 239 } 240 241 if (info1.getSelector() != null && !info1.getSelector().equals(info2.getSelector())) { 242 return true; 243 } 244 245 return false; 246 } 247 248 public void activate(ConnectionContext context, final DurableTopicSubscription subscription) throws Exception { 249 // synchronize with dispatch method so that no new messages are sent 250 // while we are recovering a subscription to avoid out of order messages. 251 dispatchLock.writeLock().lock(); 252 try { 253 254 if (topicStore == null) { 255 return; 256 } 257 258 // Recover the durable subscription. 259 String clientId = subscription.getSubscriptionKey().getClientId(); 260 String subscriptionName = subscription.getSubscriptionKey().getSubscriptionName(); 261 SubscriptionInfo info = topicStore.lookupSubscription(clientId, subscriptionName); 262 if (info != null) { 263 // Check to see if selector changed. 264 if (hasDurableSubChanged(info, subscription.getConsumerInfo())) { 265 // Need to delete the subscription 266 topicStore.deleteSubscription(clientId, subscriptionName); 267 info = null; 268 // Force a rebuild of the selector chain for the subscription otherwise 269 // the stored subscription is updated but the selector expression is not 270 // and the subscription will not behave according to the new configuration. 271 subscription.setSelector(subscription.getConsumerInfo().getSelector()); 272 synchronized (consumers) { 273 consumers.remove(subscription); 274 } 275 } else { 276 synchronized (consumers) { 277 if (!consumers.contains(subscription)) { 278 consumers.add(subscription); 279 } 280 } 281 } 282 } 283 284 // Do we need to create the subscription? 285 if (info == null) { 286 info = new SubscriptionInfo(); 287 info.setClientId(clientId); 288 info.setSelector(subscription.getConsumerInfo().getSelector()); 289 info.setSubscriptionName(subscriptionName); 290 info.setDestination(getActiveMQDestination()); 291 info.setNoLocal(subscription.getConsumerInfo().isNoLocal()); 292 // This destination is an actual destination id. 293 info.setSubscribedDestination(subscription.getConsumerInfo().getDestination()); 294 // This destination might be a pattern 295 synchronized (consumers) { 296 consumers.add(subscription); 297 topicStore.addSubscription(info, subscription.getConsumerInfo().isRetroactive()); 298 } 299 } 300 301 final MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext(); 302 msgContext.setDestination(destination); 303 if (subscription.isRecoveryRequired()) { 304 topicStore.recoverSubscription(clientId, subscriptionName, new MessageRecoveryListener() { 305 @Override 306 public boolean recoverMessage(Message message) throws Exception { 307 message.setRegionDestination(Topic.this); 308 try { 309 msgContext.setMessageReference(message); 310 if (subscription.matches(message, msgContext)) { 311 subscription.add(message); 312 } 313 } catch (IOException e) { 314 LOG.error("Failed to recover this message {}", message, e); 315 } 316 return true; 317 } 318 319 @Override 320 public boolean recoverMessageReference(MessageId messageReference) throws Exception { 321 throw new RuntimeException("Should not be called."); 322 } 323 324 @Override 325 public boolean hasSpace() { 326 return true; 327 } 328 329 @Override 330 public boolean isDuplicate(MessageId id) { 331 return false; 332 } 333 }); 334 } 335 } finally { 336 dispatchLock.writeLock().unlock(); 337 } 338 } 339 340 public void deactivate(ConnectionContext context, DurableTopicSubscription sub, List<MessageReference> dispatched) throws Exception { 341 synchronized (consumers) { 342 consumers.remove(sub); 343 } 344 sub.remove(context, this, dispatched); 345 } 346 347 public void recoverRetroactiveMessages(ConnectionContext context, Subscription subscription) throws Exception { 348 if (subscription.getConsumerInfo().isRetroactive()) { 349 subscriptionRecoveryPolicy.recover(context, this, subscription); 350 } 351 } 352 353 @Override 354 public void send(final ProducerBrokerExchange producerExchange, final Message message) throws Exception { 355 final ConnectionContext context = producerExchange.getConnectionContext(); 356 357 final ProducerInfo producerInfo = producerExchange.getProducerState().getInfo(); 358 producerExchange.incrementSend(); 359 final boolean sendProducerAck = !message.isResponseRequired() && producerInfo.getWindowSize() > 0 360 && !context.isInRecoveryMode(); 361 362 message.setRegionDestination(this); 363 364 // There is delay between the client sending it and it arriving at the 365 // destination.. it may have expired. 366 if (message.isExpired()) { 367 broker.messageExpired(context, message, null); 368 getDestinationStatistics().getExpired().increment(); 369 if (sendProducerAck) { 370 ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize()); 371 context.getConnection().dispatchAsync(ack); 372 } 373 return; 374 } 375 376 if (memoryUsage.isFull()) { 377 isFull(context, memoryUsage); 378 fastProducer(context, producerInfo); 379 380 if (isProducerFlowControl() && context.isProducerFlowControl()) { 381 382 if (warnOnProducerFlowControl) { 383 warnOnProducerFlowControl = false; 384 LOG.info("{}, Usage Manager memory limit reached {}. Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it. See http://activemq.apache.org/producer-flow-control.html for more info.", 385 getActiveMQDestination().getQualifiedName(), memoryUsage.getLimit()); 386 } 387 388 if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) { 389 throw new javax.jms.ResourceAllocationException("Usage Manager memory limit (" 390 + memoryUsage.getLimit() + ") reached. Rejecting send for producer (" + message.getProducerId() 391 + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "." 392 + " See http://activemq.apache.org/producer-flow-control.html for more info"); 393 } 394 395 // We can avoid blocking due to low usage if the producer is sending a sync message or 396 // if it is using a producer window 397 if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) { 398 synchronized (messagesWaitingForSpace) { 399 messagesWaitingForSpace.add(new Runnable() { 400 @Override 401 public void run() { 402 try { 403 404 // While waiting for space to free up... the 405 // message may have expired. 406 if (message.isExpired()) { 407 broker.messageExpired(context, message, null); 408 getDestinationStatistics().getExpired().increment(); 409 } else { 410 doMessageSend(producerExchange, message); 411 } 412 413 if (sendProducerAck) { 414 ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message 415 .getSize()); 416 context.getConnection().dispatchAsync(ack); 417 } else { 418 Response response = new Response(); 419 response.setCorrelationId(message.getCommandId()); 420 context.getConnection().dispatchAsync(response); 421 } 422 423 } catch (Exception e) { 424 if (!sendProducerAck && !context.isInRecoveryMode()) { 425 ExceptionResponse response = new ExceptionResponse(e); 426 response.setCorrelationId(message.getCommandId()); 427 context.getConnection().dispatchAsync(response); 428 } 429 } 430 } 431 }); 432 433 registerCallbackForNotFullNotification(); 434 context.setDontSendReponse(true); 435 return; 436 } 437 438 } else { 439 // Producer flow control cannot be used, so we have do the flow control 440 // at the broker by blocking this thread until there is space available. 441 442 if (memoryUsage.isFull()) { 443 if (context.isInTransaction()) { 444 445 int count = 0; 446 while (!memoryUsage.waitForSpace(1000)) { 447 if (context.getStopping().get()) { 448 throw new IOException("Connection closed, send aborted."); 449 } 450 if (count > 2 && context.isInTransaction()) { 451 count = 0; 452 int size = context.getTransaction().size(); 453 LOG.warn("Waiting for space to send transacted message - transaction elements = {} need more space to commit. Message = {}", size, message); 454 } 455 count++; 456 } 457 } else { 458 waitForSpace( 459 context, 460 producerExchange, 461 memoryUsage, 462 "Usage Manager Memory Usage limit reached. Stopping producer (" 463 + message.getProducerId() 464 + ") to prevent flooding " 465 + getActiveMQDestination().getQualifiedName() 466 + "." 467 + " See http://activemq.apache.org/producer-flow-control.html for more info"); 468 } 469 } 470 471 // The usage manager could have delayed us by the time 472 // we unblock the message could have expired.. 473 if (message.isExpired()) { 474 getDestinationStatistics().getExpired().increment(); 475 LOG.debug("Expired message: {}", message); 476 return; 477 } 478 } 479 } 480 } 481 482 doMessageSend(producerExchange, message); 483 messageDelivered(context, message); 484 if (sendProducerAck) { 485 ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize()); 486 context.getConnection().dispatchAsync(ack); 487 } 488 } 489 490 /** 491 * do send the message - this needs to be synchronized to ensure messages 492 * are stored AND dispatched in the right order 493 * 494 * @param producerExchange 495 * @param message 496 * @throws IOException 497 * @throws Exception 498 */ 499 synchronized void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message) 500 throws IOException, Exception { 501 final ConnectionContext context = producerExchange.getConnectionContext(); 502 message.getMessageId().setBrokerSequenceId(getDestinationSequenceId()); 503 Future<Object> result = null; 504 505 if (topicStore != null && message.isPersistent() && !canOptimizeOutPersistence()) { 506 if (systemUsage.getStoreUsage().isFull(getStoreUsageHighWaterMark())) { 507 final String logMessage = "Persistent store is Full, " + getStoreUsageHighWaterMark() + "% of " 508 + systemUsage.getStoreUsage().getLimit() + ". Stopping producer (" + message.getProducerId() 509 + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "." 510 + " See http://activemq.apache.org/producer-flow-control.html for more info"; 511 if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) { 512 throw new javax.jms.ResourceAllocationException(logMessage); 513 } 514 515 waitForSpace(context,producerExchange, systemUsage.getStoreUsage(), getStoreUsageHighWaterMark(), logMessage); 516 } 517 result = topicStore.asyncAddTopicMessage(context, message,isOptimizeStorage()); 518 519 if (isReduceMemoryFootprint()) { 520 message.clearMarshalledState(); 521 } 522 } 523 524 message.incrementReferenceCount(); 525 526 if (context.isInTransaction()) { 527 context.getTransaction().addSynchronization(new Synchronization() { 528 @Override 529 public void afterCommit() throws Exception { 530 // It could take while before we receive the commit 531 // operation.. by that time the message could have 532 // expired.. 533 if (message.isExpired()) { 534 if (broker.isExpired(message)) { 535 getDestinationStatistics().getExpired().increment(); 536 broker.messageExpired(context, message, null); 537 } 538 message.decrementReferenceCount(); 539 return; 540 } 541 try { 542 dispatch(context, message); 543 } finally { 544 message.decrementReferenceCount(); 545 } 546 } 547 548 @Override 549 public void afterRollback() throws Exception { 550 message.decrementReferenceCount(); 551 } 552 }); 553 554 } else { 555 try { 556 dispatch(context, message); 557 } finally { 558 message.decrementReferenceCount(); 559 } 560 } 561 562 if (result != null && !result.isCancelled()) { 563 try { 564 result.get(); 565 } catch (CancellationException e) { 566 // ignore - the task has been cancelled if the message 567 // has already been deleted 568 } 569 } 570 } 571 572 private boolean canOptimizeOutPersistence() { 573 return durableSubscribers.size() == 0; 574 } 575 576 @Override 577 public String toString() { 578 return "Topic: destination=" + destination.getPhysicalName() + ", subscriptions=" + consumers.size(); 579 } 580 581 @Override 582 public void acknowledge(ConnectionContext context, Subscription sub, final MessageAck ack, 583 final MessageReference node) throws IOException { 584 if (topicStore != null && node.isPersistent()) { 585 DurableTopicSubscription dsub = (DurableTopicSubscription) sub; 586 SubscriptionKey key = dsub.getSubscriptionKey(); 587 topicStore.acknowledge(context, key.getClientId(), key.getSubscriptionName(), node.getMessageId(), 588 convertToNonRangedAck(ack, node)); 589 } 590 messageConsumed(context, node); 591 } 592 593 @Override 594 public void gc() { 595 } 596 597 public Message loadMessage(MessageId messageId) throws IOException { 598 return topicStore != null ? topicStore.getMessage(messageId) : null; 599 } 600 601 @Override 602 public void start() throws Exception { 603 this.subscriptionRecoveryPolicy.start(); 604 if (memoryUsage != null) { 605 memoryUsage.start(); 606 } 607 608 if (getExpireMessagesPeriod() > 0 && !AdvisorySupport.isAdvisoryTopic(getActiveMQDestination())) { 609 scheduler.executePeriodically(expireMessagesTask, getExpireMessagesPeriod()); 610 } 611 } 612 613 @Override 614 public void stop() throws Exception { 615 if (taskRunner != null) { 616 taskRunner.shutdown(); 617 } 618 this.subscriptionRecoveryPolicy.stop(); 619 if (memoryUsage != null) { 620 memoryUsage.stop(); 621 } 622 if (this.topicStore != null) { 623 this.topicStore.stop(); 624 } 625 626 scheduler.cancel(expireMessagesTask); 627 } 628 629 @Override 630 public Message[] browse() { 631 final List<Message> result = new ArrayList<Message>(); 632 doBrowse(result, getMaxBrowsePageSize()); 633 return result.toArray(new Message[result.size()]); 634 } 635 636 private void doBrowse(final List<Message> browseList, final int max) { 637 try { 638 if (topicStore != null) { 639 final List<Message> toExpire = new ArrayList<Message>(); 640 topicStore.recover(new MessageRecoveryListener() { 641 @Override 642 public boolean recoverMessage(Message message) throws Exception { 643 if (message.isExpired()) { 644 toExpire.add(message); 645 } 646 browseList.add(message); 647 return true; 648 } 649 650 @Override 651 public boolean recoverMessageReference(MessageId messageReference) throws Exception { 652 return true; 653 } 654 655 @Override 656 public boolean hasSpace() { 657 return browseList.size() < max; 658 } 659 660 @Override 661 public boolean isDuplicate(MessageId id) { 662 return false; 663 } 664 }); 665 final ConnectionContext connectionContext = createConnectionContext(); 666 for (Message message : toExpire) { 667 for (DurableTopicSubscription sub : durableSubscribers.values()) { 668 if (!sub.isActive()) { 669 message.setRegionDestination(this); 670 messageExpired(connectionContext, sub, message); 671 } 672 } 673 } 674 Message[] msgs = subscriptionRecoveryPolicy.browse(getActiveMQDestination()); 675 if (msgs != null) { 676 for (int i = 0; i < msgs.length && browseList.size() < max; i++) { 677 browseList.add(msgs[i]); 678 } 679 } 680 } 681 } catch (Throwable e) { 682 LOG.warn("Failed to browse Topic: {}", getActiveMQDestination().getPhysicalName(), e); 683 } 684 } 685 686 @Override 687 public boolean iterate() { 688 synchronized (messagesWaitingForSpace) { 689 while (!memoryUsage.isFull() && !messagesWaitingForSpace.isEmpty()) { 690 Runnable op = messagesWaitingForSpace.removeFirst(); 691 op.run(); 692 } 693 694 if (!messagesWaitingForSpace.isEmpty()) { 695 registerCallbackForNotFullNotification(); 696 } 697 } 698 return false; 699 } 700 701 private void registerCallbackForNotFullNotification() { 702 // If the usage manager is not full, then the task will not 703 // get called.. 704 if (!memoryUsage.notifyCallbackWhenNotFull(sendMessagesWaitingForSpaceTask)) { 705 // so call it directly here. 706 sendMessagesWaitingForSpaceTask.run(); 707 } 708 } 709 710 // Properties 711 // ------------------------------------------------------------------------- 712 713 public DispatchPolicy getDispatchPolicy() { 714 return dispatchPolicy; 715 } 716 717 public void setDispatchPolicy(DispatchPolicy dispatchPolicy) { 718 this.dispatchPolicy = dispatchPolicy; 719 } 720 721 public SubscriptionRecoveryPolicy getSubscriptionRecoveryPolicy() { 722 return subscriptionRecoveryPolicy; 723 } 724 725 public void setSubscriptionRecoveryPolicy(SubscriptionRecoveryPolicy recoveryPolicy) { 726 if (this.subscriptionRecoveryPolicy != null && this.subscriptionRecoveryPolicy instanceof RetainedMessageSubscriptionRecoveryPolicy) { 727 // allow users to combine retained message policy with other ActiveMQ policies 728 RetainedMessageSubscriptionRecoveryPolicy policy = (RetainedMessageSubscriptionRecoveryPolicy) this.subscriptionRecoveryPolicy; 729 policy.setWrapped(recoveryPolicy); 730 } else { 731 this.subscriptionRecoveryPolicy = recoveryPolicy; 732 } 733 } 734 735 // Implementation methods 736 // ------------------------------------------------------------------------- 737 738 @Override 739 public final void wakeup() { 740 } 741 742 protected void dispatch(final ConnectionContext context, Message message) throws Exception { 743 // AMQ-2586: Better to leave this stat at zero than to give the user 744 // misleading metrics. 745 // destinationStatistics.getMessages().increment(); 746 destinationStatistics.getEnqueues().increment(); 747 destinationStatistics.getMessageSize().addSize(message.getSize()); 748 MessageEvaluationContext msgContext = null; 749 750 dispatchLock.readLock().lock(); 751 try { 752 if (!subscriptionRecoveryPolicy.add(context, message)) { 753 return; 754 } 755 synchronized (consumers) { 756 if (consumers.isEmpty()) { 757 onMessageWithNoConsumers(context, message); 758 return; 759 } 760 } 761 msgContext = context.getMessageEvaluationContext(); 762 msgContext.setDestination(destination); 763 msgContext.setMessageReference(message); 764 if (!dispatchPolicy.dispatch(message, msgContext, consumers)) { 765 onMessageWithNoConsumers(context, message); 766 } 767 768 } finally { 769 dispatchLock.readLock().unlock(); 770 if (msgContext != null) { 771 msgContext.clear(); 772 } 773 } 774 } 775 776 private final Runnable expireMessagesTask = new Runnable() { 777 @Override 778 public void run() { 779 List<Message> browsedMessages = new InsertionCountList<Message>(); 780 doBrowse(browsedMessages, getMaxExpirePageSize()); 781 } 782 }; 783 784 @Override 785 public void messageExpired(ConnectionContext context, Subscription subs, MessageReference reference) { 786 broker.messageExpired(context, reference, subs); 787 // AMQ-2586: Better to leave this stat at zero than to give the user 788 // misleading metrics. 789 // destinationStatistics.getMessages().decrement(); 790 destinationStatistics.getExpired().increment(); 791 MessageAck ack = new MessageAck(); 792 ack.setAckType(MessageAck.STANDARD_ACK_TYPE); 793 ack.setDestination(destination); 794 ack.setMessageID(reference.getMessageId()); 795 try { 796 if (subs instanceof DurableTopicSubscription) { 797 ((DurableTopicSubscription)subs).removePending(reference); 798 } 799 acknowledge(context, subs, ack, reference); 800 } catch (Exception e) { 801 LOG.error("Failed to remove expired Message from the store ", e); 802 } 803 } 804 805 @Override 806 protected Logger getLog() { 807 return LOG; 808 } 809 810 protected boolean isOptimizeStorage(){ 811 boolean result = false; 812 813 if (isDoOptimzeMessageStorage() && durableSubscribers.isEmpty()==false){ 814 result = true; 815 for (DurableTopicSubscription s : durableSubscribers.values()) { 816 if (s.isActive()== false){ 817 result = false; 818 break; 819 } 820 if (s.getPrefetchSize()==0){ 821 result = false; 822 break; 823 } 824 if (s.isSlowConsumer()){ 825 result = false; 826 break; 827 } 828 if (s.getInFlightUsage() > getOptimizeMessageStoreInFlightLimit()){ 829 result = false; 830 break; 831 } 832 } 833 } 834 return result; 835 } 836 837 /** 838 * force a reread of the store - after transaction recovery completion 839 */ 840 @Override 841 public void clearPendingMessages() { 842 dispatchLock.readLock().lock(); 843 try { 844 for (DurableTopicSubscription durableTopicSubscription : durableSubscribers.values()) { 845 clearPendingAndDispatch(durableTopicSubscription); 846 } 847 } finally { 848 dispatchLock.readLock().unlock(); 849 } 850 } 851 852 private void clearPendingAndDispatch(DurableTopicSubscription durableTopicSubscription) { 853 synchronized (durableTopicSubscription.pendingLock) { 854 durableTopicSubscription.pending.clear(); 855 try { 856 durableTopicSubscription.dispatchPending(); 857 } catch (IOException exception) { 858 LOG.warn("After clear of pending, failed to dispatch to: {}, for: {}, pending: {}", new Object[]{ 859 durableTopicSubscription, 860 destination, 861 durableTopicSubscription.pending }, exception); 862 } 863 } 864 } 865 866 public Map<SubscriptionKey, DurableTopicSubscription> getDurableTopicSubs() { 867 return durableSubscribers; 868 } 869}