001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region.policy;
018
019import java.util.Set;
020
021import org.apache.activemq.ActiveMQPrefetchPolicy;
022import org.apache.activemq.broker.Broker;
023import org.apache.activemq.broker.region.BaseDestination;
024import org.apache.activemq.broker.region.Destination;
025import org.apache.activemq.broker.region.DurableTopicSubscription;
026import org.apache.activemq.broker.region.Queue;
027import org.apache.activemq.broker.region.QueueBrowserSubscription;
028import org.apache.activemq.broker.region.QueueSubscription;
029import org.apache.activemq.broker.region.Subscription;
030import org.apache.activemq.broker.region.Topic;
031import org.apache.activemq.broker.region.TopicSubscription;
032import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
033import org.apache.activemq.broker.region.group.GroupFactoryFinder;
034import org.apache.activemq.broker.region.group.MessageGroupMapFactory;
035import org.apache.activemq.filter.DestinationMapEntry;
036import org.apache.activemq.network.NetworkBridgeFilterFactory;
037import org.apache.activemq.usage.SystemUsage;
038import org.slf4j.Logger;
039import org.slf4j.LoggerFactory;
040
041/**
042 * Represents an entry in a {@link PolicyMap} for assigning policies to a
043 * specific destination or a hierarchical wildcard area of destinations.
044 *
045 * @org.apache.xbean.XBean
046 *
047 */
048public class PolicyEntry extends DestinationMapEntry {
049
050    private static final Logger LOG = LoggerFactory.getLogger(PolicyEntry.class);
051    private DispatchPolicy dispatchPolicy;
052    private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy;
053    private boolean sendAdvisoryIfNoConsumers;
054    private DeadLetterStrategy deadLetterStrategy = Destination.DEFAULT_DEAD_LETTER_STRATEGY;
055    private PendingMessageLimitStrategy pendingMessageLimitStrategy;
056    private MessageEvictionStrategy messageEvictionStrategy;
057    private long memoryLimit;
058    private String messageGroupMapFactoryType = "cached";
059    private MessageGroupMapFactory messageGroupMapFactory;
060    private PendingQueueMessageStoragePolicy pendingQueuePolicy;
061    private PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy;
062    private PendingSubscriberMessageStoragePolicy pendingSubscriberPolicy;
063    private int maxProducersToAudit=BaseDestination.MAX_PRODUCERS_TO_AUDIT;
064    private int maxAuditDepth=BaseDestination.MAX_AUDIT_DEPTH;
065    private int maxQueueAuditDepth=BaseDestination.MAX_AUDIT_DEPTH;
066    private boolean enableAudit=true;
067    private boolean producerFlowControl = true;
068    private boolean alwaysRetroactive = false;
069    private long blockedProducerWarningInterval = Destination.DEFAULT_BLOCKED_PRODUCER_WARNING_INTERVAL;
070    private boolean optimizedDispatch=false;
071    private int maxPageSize=BaseDestination.MAX_PAGE_SIZE;
072    private int maxBrowsePageSize=BaseDestination.MAX_BROWSE_PAGE_SIZE;
073    private boolean useCache=true;
074    private long minimumMessageSize=1024;
075    private boolean useConsumerPriority=true;
076    private boolean strictOrderDispatch=false;
077    private boolean lazyDispatch=false;
078    private int timeBeforeDispatchStarts = 0;
079    private int consumersBeforeDispatchStarts = 0;
080    private boolean advisoryForSlowConsumers;
081    private boolean advisoryForFastProducers;
082    private boolean advisoryForDiscardingMessages;
083    private boolean advisoryWhenFull;
084    private boolean advisoryForDelivery;
085    private boolean advisoryForConsumed;
086    private boolean includeBodyForAdvisory;
087    private long expireMessagesPeriod = BaseDestination.EXPIRE_MESSAGE_PERIOD;
088    private int maxExpirePageSize = BaseDestination.MAX_BROWSE_PAGE_SIZE;
089    private int queuePrefetch=ActiveMQPrefetchPolicy.DEFAULT_QUEUE_PREFETCH;
090    private int queueBrowserPrefetch=ActiveMQPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH;
091    private int topicPrefetch=ActiveMQPrefetchPolicy.DEFAULT_TOPIC_PREFETCH;
092    private int durableTopicPrefetch=ActiveMQPrefetchPolicy.DEFAULT_DURABLE_TOPIC_PREFETCH;
093    private boolean usePrefetchExtension = true;
094    private int cursorMemoryHighWaterMark = 70;
095    private int storeUsageHighWaterMark = 100;
096    private SlowConsumerStrategy slowConsumerStrategy;
097    private boolean prioritizedMessages;
098    private boolean allConsumersExclusiveByDefault;
099    private boolean gcInactiveDestinations;
100    private boolean gcWithNetworkConsumers;
101    private long inactiveTimeoutBeforeGC = BaseDestination.DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC;
102    private boolean reduceMemoryFootprint;
103    private NetworkBridgeFilterFactory networkBridgeFilterFactory;
104    private boolean doOptimzeMessageStorage = true;
105    private int maxDestinations = -1;
106
107    /*
108     * percentage of in-flight messages above which optimize message store is disabled
109     */
110    private int optimizeMessageStoreInFlightLimit = 10;
111    private boolean persistJMSRedelivered = false;
112
113
114    public void configure(Broker broker,Queue queue) {
115        baseConfiguration(broker,queue);
116        if (dispatchPolicy != null) {
117            queue.setDispatchPolicy(dispatchPolicy);
118        }
119        queue.setDeadLetterStrategy(getDeadLetterStrategy());
120        queue.setMessageGroupMapFactory(getMessageGroupMapFactory());
121        if (memoryLimit > 0) {
122            queue.getMemoryUsage().setLimit(memoryLimit);
123        }
124        if (pendingQueuePolicy != null) {
125            PendingMessageCursor messages = pendingQueuePolicy.getQueuePendingMessageCursor(broker,queue);
126            queue.setMessages(messages);
127        }
128
129        queue.setUseConsumerPriority(isUseConsumerPriority());
130        queue.setStrictOrderDispatch(isStrictOrderDispatch());
131        queue.setOptimizedDispatch(isOptimizedDispatch());
132        queue.setLazyDispatch(isLazyDispatch());
133        queue.setTimeBeforeDispatchStarts(getTimeBeforeDispatchStarts());
134        queue.setConsumersBeforeDispatchStarts(getConsumersBeforeDispatchStarts());
135        queue.setAllConsumersExclusiveByDefault(isAllConsumersExclusiveByDefault());
136        queue.setPersistJMSRedelivered(isPersistJMSRedelivered());
137    }
138
139    public void update(Queue queue) {
140        update(queue, null);
141    }
142
143    /**
144     * Update a queue with this policy.  Only apply properties that
145     * match the includedProperties list.  Not all properties are eligible
146     * to be updated.
147     *
148     * If includedProperties is null then all of the properties will be set as
149     * isUpdate will return true
     * @param queue the queue to update
     * @param includedProperties names of the properties to apply, or null to apply all of them
152     */
153    public void update(Queue queue, Set<String> includedProperties) {
154        baseUpdate(queue, includedProperties);
155        if (isUpdate("memoryLimit", includedProperties) && memoryLimit > 0) {
156            queue.getMemoryUsage().setLimit(memoryLimit);
157        }
158        if (isUpdate("useConsumerPriority", includedProperties)) {
159            queue.setUseConsumerPriority(isUseConsumerPriority());
160        }
161        if (isUpdate("strictOrderDispatch", includedProperties)) {
162            queue.setStrictOrderDispatch(isStrictOrderDispatch());
163        }
164        if (isUpdate("optimizedDispatch", includedProperties)) {
165            queue.setOptimizedDispatch(isOptimizedDispatch());
166        }
167        if (isUpdate("lazyDispatch", includedProperties)) {
168            queue.setLazyDispatch(isLazyDispatch());
169        }
170        if (isUpdate("timeBeforeDispatchStarts", includedProperties)) {
171            queue.setTimeBeforeDispatchStarts(getTimeBeforeDispatchStarts());
172        }
173        if (isUpdate("consumersBeforeDispatchStarts", includedProperties)) {
174            queue.setConsumersBeforeDispatchStarts(getConsumersBeforeDispatchStarts());
175        }
176        if (isUpdate("allConsumersExclusiveByDefault", includedProperties)) {
177            queue.setAllConsumersExclusiveByDefault(isAllConsumersExclusiveByDefault());
178        }
179        if (isUpdate("persistJMSRedelivered", includedProperties)) {
180            queue.setPersistJMSRedelivered(isPersistJMSRedelivered());
181        }
182    }
183
184    public void configure(Broker broker,Topic topic) {
185        baseConfiguration(broker,topic);
186        if (dispatchPolicy != null) {
187            topic.setDispatchPolicy(dispatchPolicy);
188        }
189        topic.setDeadLetterStrategy(getDeadLetterStrategy());
190        if (subscriptionRecoveryPolicy != null) {
191            SubscriptionRecoveryPolicy srp = subscriptionRecoveryPolicy.copy();
192            srp.setBroker(broker);
193            topic.setSubscriptionRecoveryPolicy(srp);
194        }
195        if (memoryLimit > 0) {
196            topic.getMemoryUsage().setLimit(memoryLimit);
197        }
198        topic.setLazyDispatch(isLazyDispatch());
199    }
200
201    public void update(Topic topic) {
202        update(topic, null);
203    }
204
205    //If includedProperties is null then all of the properties will be set as
206    //isUpdate will return true
207    public void update(Topic topic, Set<String> includedProperties) {
208        baseUpdate(topic, includedProperties);
209        if (isUpdate("memoryLimit", includedProperties) && memoryLimit > 0) {
210            topic.getMemoryUsage().setLimit(memoryLimit);
211        }
212        if (isUpdate("lazyDispatch", includedProperties)) {
213            topic.setLazyDispatch(isLazyDispatch());
214        }
215    }
216
217    // attributes that can change on the fly
218    public void baseUpdate(BaseDestination destination) {
219        baseUpdate(destination, null);
220    }
221
222    // attributes that can change on the fly
223    //If includedProperties is null then all of the properties will be set as
224    //isUpdate will return true
225    public void baseUpdate(BaseDestination destination, Set<String> includedProperties) {
226        if (isUpdate("producerFlowControl", includedProperties)) {
227            destination.setProducerFlowControl(isProducerFlowControl());
228        }
229        if (isUpdate("alwaysRetroactive", includedProperties)) {
230            destination.setAlwaysRetroactive(isAlwaysRetroactive());
231        }
232        if (isUpdate("blockedProducerWarningInterval", includedProperties)) {
233            destination.setBlockedProducerWarningInterval(getBlockedProducerWarningInterval());
234        }
235        if (isUpdate("maxPageSize", includedProperties)) {
236            destination.setMaxPageSize(getMaxPageSize());
237        }
238        if (isUpdate("maxBrowsePageSize", includedProperties)) {
239            destination.setMaxBrowsePageSize(getMaxBrowsePageSize());
240        }
241
242        if (isUpdate("minimumMessageSize", includedProperties)) {
243            destination.setMinimumMessageSize((int) getMinimumMessageSize());
244        }
245        if (isUpdate("maxExpirePageSize", includedProperties)) {
246            destination.setMaxExpirePageSize(getMaxExpirePageSize());
247        }
248        if (isUpdate("cursorMemoryHighWaterMark", includedProperties)) {
249            destination.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark());
250        }
251        if (isUpdate("storeUsageHighWaterMark", includedProperties)) {
252            destination.setStoreUsageHighWaterMark(getStoreUsageHighWaterMark());
253        }
254        if (isUpdate("gcInactiveDestinations", includedProperties)) {
255            destination.setGcIfInactive(isGcInactiveDestinations());
256        }
257        if (isUpdate("gcWithNetworkConsumers", includedProperties)) {
258            destination.setGcWithNetworkConsumers(isGcWithNetworkConsumers());
259        }
260        if (isUpdate("inactiveTimeoutBeforeGc", includedProperties)) {
261            destination.setInactiveTimeoutBeforeGC(getInactiveTimeoutBeforeGC());
262        }
263        if (isUpdate("reduceMemoryFootprint", includedProperties)) {
264            destination.setReduceMemoryFootprint(isReduceMemoryFootprint());
265        }
266        if (isUpdate("doOptimizeMessageStore", includedProperties)) {
267            destination.setDoOptimzeMessageStorage(isDoOptimzeMessageStorage());
268        }
269        if (isUpdate("optimizeMessageStoreInFlightLimit", includedProperties)) {
270            destination.setOptimizeMessageStoreInFlightLimit(getOptimizeMessageStoreInFlightLimit());
271        }
272        if (isUpdate("advisoryForConsumed", includedProperties)) {
273            destination.setAdvisoryForConsumed(isAdvisoryForConsumed());
274        }
275        if (isUpdate("advisoryForDelivery", includedProperties)) {
276            destination.setAdvisoryForDelivery(isAdvisoryForDelivery());
277        }
278        if (isUpdate("advisoryForDiscardingMessages", includedProperties)) {
279            destination.setAdvisoryForDiscardingMessages(isAdvisoryForDiscardingMessages());
280        }
281        if (isUpdate("advisoryForSlowConsumers", includedProperties)) {
282            destination.setAdvisoryForSlowConsumers(isAdvisoryForSlowConsumers());
283        }
284        if (isUpdate("advisoryForFastProducers", includedProperties)) {
285            destination.setAdvisoryForFastProducers(isAdvisoryForFastProducers());
286        }
287        if (isUpdate("advisoryWhenFull", includedProperties)) {
288            destination.setAdvisoryWhenFull(isAdvisoryWhenFull());
289        }
290        if (isUpdate("includeBodyForAdvisory", includedProperties)) {
291            destination.setIncludeBodyForAdvisory(isIncludeBodyForAdvisory());
292        }
293        if (isUpdate("sendAdvisoryIfNoConsumers", includedProperties)) {
294            destination.setSendAdvisoryIfNoConsumers(isSendAdvisoryIfNoConsumers());
295        }
296    }
297
298    public void baseConfiguration(Broker broker, BaseDestination destination) {
299        baseUpdate(destination);
300        destination.setEnableAudit(isEnableAudit());
301        destination.setMaxAuditDepth(getMaxQueueAuditDepth());
302        destination.setMaxProducersToAudit(getMaxProducersToAudit());
303        destination.setUseCache(isUseCache());
304        destination.setExpireMessagesPeriod(getExpireMessagesPeriod());
305        SlowConsumerStrategy scs = getSlowConsumerStrategy();
306        if (scs != null) {
307            scs.setBrokerService(broker);
308            scs.addDestination(destination);
309        }
310        destination.setSlowConsumerStrategy(scs);
311        destination.setPrioritizedMessages(isPrioritizedMessages());
312    }
313
314    public void configure(Broker broker, SystemUsage memoryManager, TopicSubscription subscription) {
315        configurePrefetch(subscription);
316        subscription.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark());
317        if (pendingMessageLimitStrategy != null) {
318            int value = pendingMessageLimitStrategy.getMaximumPendingMessageLimit(subscription);
319            int consumerLimit = subscription.getInfo().getMaximumPendingMessageLimit();
320            if (consumerLimit > 0) {
321                if (value < 0 || consumerLimit < value) {
322                    value = consumerLimit;
323                }
324            }
325            if (value >= 0) {
326                LOG.debug("Setting the maximumPendingMessages size to: {} for consumer: {}", value, subscription.getInfo().getConsumerId());
327                subscription.setMaximumPendingMessages(value);
328            }
329        }
330        if (messageEvictionStrategy != null) {
331            subscription.setMessageEvictionStrategy(messageEvictionStrategy);
332        }
333        if (pendingSubscriberPolicy != null) {
334            String name = subscription.getContext().getClientId() + "_" + subscription.getConsumerInfo().getConsumerId();
335            int maxBatchSize = subscription.getConsumerInfo().getPrefetchSize();
336            subscription.setMatched(pendingSubscriberPolicy.getSubscriberPendingMessageCursor(broker,name, maxBatchSize,subscription));
337        }
338        if (enableAudit) {
339            subscription.setEnableAudit(enableAudit);
340            subscription.setMaxProducersToAudit(maxProducersToAudit);
341            subscription.setMaxAuditDepth(maxAuditDepth);
342        }
343    }
344
345    public void configure(Broker broker, SystemUsage memoryManager, DurableTopicSubscription sub) {
346        String clientId = sub.getSubscriptionKey().getClientId();
347        String subName = sub.getSubscriptionKey().getSubscriptionName();
348        sub.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark());
349        configurePrefetch(sub);
350        if (pendingDurableSubscriberPolicy != null) {
351            PendingMessageCursor cursor = pendingDurableSubscriberPolicy.getSubscriberPendingMessageCursor(broker,clientId, subName,sub.getPrefetchSize(),sub);
352            cursor.setSystemUsage(memoryManager);
353            sub.setPending(cursor);
354        }
355        int auditDepth = getMaxAuditDepth();
356        if (auditDepth == BaseDestination.MAX_AUDIT_DEPTH && this.isPrioritizedMessages()) {
357            sub.setMaxAuditDepth(auditDepth * 10);
358        } else {
359            sub.setMaxAuditDepth(auditDepth);
360        }
361        sub.setMaxProducersToAudit(getMaxProducersToAudit());
362        sub.setUsePrefetchExtension(isUsePrefetchExtension());
363    }
364
365    public void configure(Broker broker, SystemUsage memoryManager, QueueBrowserSubscription sub) {
366        configurePrefetch(sub);
367        sub.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark());
368        sub.setUsePrefetchExtension(isUsePrefetchExtension());
369
370        // TODO
371        // We currently need an infinite audit because of the way that browser dispatch
372        // is done.  We should refactor the browsers to better handle message dispatch so
373        // we can remove this and perform a more efficient dispatch.
374        sub.setMaxProducersToAudit(Integer.MAX_VALUE);
375        sub.setMaxAuditDepth(Short.MAX_VALUE);
376
377        // part solution - dispatching to browsers needs to be restricted
378        sub.setMaxMessages(getMaxBrowsePageSize());
379    }
380
381    public void configure(Broker broker, SystemUsage memoryManager, QueueSubscription sub) {
382        configurePrefetch(sub);
383        sub.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark());
384        sub.setUsePrefetchExtension(isUsePrefetchExtension());
385        sub.setMaxProducersToAudit(getMaxProducersToAudit());
386    }
387
388    public void configurePrefetch(Subscription subscription) {
389
390        final int currentPrefetch = subscription.getConsumerInfo().getPrefetchSize();
391        if (subscription instanceof QueueBrowserSubscription) {
392            if (currentPrefetch == ActiveMQPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH) {
393                ((QueueBrowserSubscription) subscription).setPrefetchSize(getQueueBrowserPrefetch());
394            }
395        } else if (subscription instanceof QueueSubscription) {
396            if (currentPrefetch == ActiveMQPrefetchPolicy.DEFAULT_QUEUE_PREFETCH) {
397                ((QueueSubscription) subscription).setPrefetchSize(getQueuePrefetch());
398            }
399        } else if (subscription instanceof DurableTopicSubscription) {
400            if (currentPrefetch == ActiveMQPrefetchPolicy.DEFAULT_DURABLE_TOPIC_PREFETCH ||
401                    subscription.getConsumerInfo().getPrefetchSize() == ActiveMQPrefetchPolicy.DEFAULT_OPTIMIZE_DURABLE_TOPIC_PREFETCH) {
402                ((DurableTopicSubscription)subscription).setPrefetchSize(getDurableTopicPrefetch());
403            }
404        } else if (subscription instanceof TopicSubscription) {
405            if (currentPrefetch == ActiveMQPrefetchPolicy.DEFAULT_TOPIC_PREFETCH) {
406                ((TopicSubscription) subscription).setPrefetchSize(getTopicPrefetch());
407            }
408        }
409        if (currentPrefetch != 0 && subscription.getPrefetchSize() == 0) {
410            // tell the sub so that it can issue a pull request
411            subscription.updateConsumerPrefetch(0);
412        }
413    }
414
415    private boolean isUpdate(String property, Set<String> includedProperties) {
416        return includedProperties == null || includedProperties.contains(property);
417    }
418    // Properties
419    // -------------------------------------------------------------------------
420    public DispatchPolicy getDispatchPolicy() {
421        return dispatchPolicy;
422    }
423
424    public void setDispatchPolicy(DispatchPolicy policy) {
425        this.dispatchPolicy = policy;
426    }
427
428    public SubscriptionRecoveryPolicy getSubscriptionRecoveryPolicy() {
429        return subscriptionRecoveryPolicy;
430    }
431
432    public void setSubscriptionRecoveryPolicy(SubscriptionRecoveryPolicy subscriptionRecoveryPolicy) {
433        this.subscriptionRecoveryPolicy = subscriptionRecoveryPolicy;
434    }
435
436    public boolean isSendAdvisoryIfNoConsumers() {
437        return sendAdvisoryIfNoConsumers;
438    }
439
440    /**
441     * Sends an advisory message if a non-persistent message is sent and there
442     * are no active consumers
443     */
444    public void setSendAdvisoryIfNoConsumers(boolean sendAdvisoryIfNoConsumers) {
445        this.sendAdvisoryIfNoConsumers = sendAdvisoryIfNoConsumers;
446    }
447
448    public DeadLetterStrategy getDeadLetterStrategy() {
449        return deadLetterStrategy;
450    }
451
452    /**
453     * Sets the policy used to determine which dead letter queue destination
454     * should be used
455     */
456    public void setDeadLetterStrategy(DeadLetterStrategy deadLetterStrategy) {
457        this.deadLetterStrategy = deadLetterStrategy;
458    }
459
460    public PendingMessageLimitStrategy getPendingMessageLimitStrategy() {
461        return pendingMessageLimitStrategy;
462    }
463
464    /**
465     * Sets the strategy to calculate the maximum number of messages that are
466     * allowed to be pending on consumers (in addition to their prefetch sizes).
467     * Once the limit is reached, non-durable topics can then start discarding
468     * old messages. This allows us to keep dispatching messages to slow
469     * consumers while not blocking fast consumers and discarding the messages
470     * oldest first.
471     */
472    public void setPendingMessageLimitStrategy(PendingMessageLimitStrategy pendingMessageLimitStrategy) {
473        this.pendingMessageLimitStrategy = pendingMessageLimitStrategy;
474    }
475
476    public MessageEvictionStrategy getMessageEvictionStrategy() {
477        return messageEvictionStrategy;
478    }
479
480    /**
481     * Sets the eviction strategy used to decide which message to evict when the
482     * slow consumer needs to discard messages
483     */
484    public void setMessageEvictionStrategy(MessageEvictionStrategy messageEvictionStrategy) {
485        this.messageEvictionStrategy = messageEvictionStrategy;
486    }
487
488    public long getMemoryLimit() {
489        return memoryLimit;
490    }
491
492    /**
493     * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
494     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
495     */
496    public void setMemoryLimit(long memoryLimit) {
497        this.memoryLimit = memoryLimit;
498    }
499
500    public MessageGroupMapFactory getMessageGroupMapFactory() {
501        if (messageGroupMapFactory == null) {
502            try {
503            messageGroupMapFactory = GroupFactoryFinder.createMessageGroupMapFactory(getMessageGroupMapFactoryType());
504            }catch(Exception e){
505                LOG.error("Failed to create message group Factory ",e);
506            }
507        }
508        return messageGroupMapFactory;
509    }
510
511    /**
     * Sets the factory used to create new instances of {@code MessageGroupMap} used
513     * to implement the <a
514     * href="http://activemq.apache.org/message-groups.html">Message Groups</a>
515     * functionality.
516     */
517    public void setMessageGroupMapFactory(MessageGroupMapFactory messageGroupMapFactory) {
518        this.messageGroupMapFactory = messageGroupMapFactory;
519    }
520
521
522    public String getMessageGroupMapFactoryType() {
523        return messageGroupMapFactoryType;
524    }
525
526    public void setMessageGroupMapFactoryType(String messageGroupMapFactoryType) {
527        this.messageGroupMapFactoryType = messageGroupMapFactoryType;
528    }
529
530
531    /**
532     * @return the pendingDurableSubscriberPolicy
533     */
534    public PendingDurableSubscriberMessageStoragePolicy getPendingDurableSubscriberPolicy() {
535        return this.pendingDurableSubscriberPolicy;
536    }
537
538    /**
539     * @param pendingDurableSubscriberPolicy the pendingDurableSubscriberPolicy
540     *                to set
541     */
542    public void setPendingDurableSubscriberPolicy(PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy) {
543        this.pendingDurableSubscriberPolicy = pendingDurableSubscriberPolicy;
544    }
545
546    /**
547     * @return the pendingQueuePolicy
548     */
549    public PendingQueueMessageStoragePolicy getPendingQueuePolicy() {
550        return this.pendingQueuePolicy;
551    }
552
553    /**
554     * @param pendingQueuePolicy the pendingQueuePolicy to set
555     */
556    public void setPendingQueuePolicy(PendingQueueMessageStoragePolicy pendingQueuePolicy) {
557        this.pendingQueuePolicy = pendingQueuePolicy;
558    }
559
560    /**
561     * @return the pendingSubscriberPolicy
562     */
563    public PendingSubscriberMessageStoragePolicy getPendingSubscriberPolicy() {
564        return this.pendingSubscriberPolicy;
565    }
566
567    /**
568     * @param pendingSubscriberPolicy the pendingSubscriberPolicy to set
569     */
570    public void setPendingSubscriberPolicy(PendingSubscriberMessageStoragePolicy pendingSubscriberPolicy) {
571        this.pendingSubscriberPolicy = pendingSubscriberPolicy;
572    }
573
574    /**
575     * @return true if producer flow control enabled
576     */
577    public boolean isProducerFlowControl() {
578        return producerFlowControl;
579    }
580
581    /**
582     * @param producerFlowControl
583     */
584    public void setProducerFlowControl(boolean producerFlowControl) {
585        this.producerFlowControl = producerFlowControl;
586    }
587
588    /**
589     * @return true if topic is always retroactive
590     */
591    public boolean isAlwaysRetroactive() {
592        return alwaysRetroactive;
593    }
594
595    /**
596     * @param alwaysRetroactive
597     */
598    public void setAlwaysRetroactive(boolean alwaysRetroactive) {
599        this.alwaysRetroactive = alwaysRetroactive;
600    }
601
602
603    /**
     * Sets the interval at which warnings about producers being blocked by
605     * resource usage will be triggered. Values of 0 or less will disable
606     * warnings
607     *
608     * @param blockedProducerWarningInterval the interval at which warning about
609     *            blocked producers will be triggered.
610     */
611    public void setBlockedProducerWarningInterval(long blockedProducerWarningInterval) {
612        this.blockedProducerWarningInterval = blockedProducerWarningInterval;
613    }
614
615    /**
616     *
617     * @return the interval at which warning about blocked producers will be
618     *         triggered.
619     */
620    public long getBlockedProducerWarningInterval() {
621        return blockedProducerWarningInterval;
622    }
623
624    /**
625     * @return the maxProducersToAudit
626     */
627    public int getMaxProducersToAudit() {
628        return maxProducersToAudit;
629    }
630
631    /**
632     * @param maxProducersToAudit the maxProducersToAudit to set
633     */
634    public void setMaxProducersToAudit(int maxProducersToAudit) {
635        this.maxProducersToAudit = maxProducersToAudit;
636    }
637
638    /**
639     * @return the maxAuditDepth
640     */
641    public int getMaxAuditDepth() {
642        return maxAuditDepth;
643    }
644
645    /**
646     * @param maxAuditDepth the maxAuditDepth to set
647     */
648    public void setMaxAuditDepth(int maxAuditDepth) {
649        this.maxAuditDepth = maxAuditDepth;
650    }
651
652    /**
653     * @return the enableAudit
654     */
655    public boolean isEnableAudit() {
656        return enableAudit;
657    }
658
659    /**
660     * @param enableAudit the enableAudit to set
661     */
662    public void setEnableAudit(boolean enableAudit) {
663        this.enableAudit = enableAudit;
664    }
665
666    public int getMaxQueueAuditDepth() {
667        return maxQueueAuditDepth;
668    }
669
670    public void setMaxQueueAuditDepth(int maxQueueAuditDepth) {
671        this.maxQueueAuditDepth = maxQueueAuditDepth;
672    }
673
674    public boolean isOptimizedDispatch() {
675        return optimizedDispatch;
676    }
677
678    public void setOptimizedDispatch(boolean optimizedDispatch) {
679        this.optimizedDispatch = optimizedDispatch;
680    }
681
682    public int getMaxPageSize() {
683        return maxPageSize;
684    }
685
686    public void setMaxPageSize(int maxPageSize) {
687        this.maxPageSize = maxPageSize;
688    }
689
690    public int getMaxBrowsePageSize() {
691        return maxBrowsePageSize;
692    }
693
694    public void setMaxBrowsePageSize(int maxPageSize) {
695        this.maxBrowsePageSize = maxPageSize;
696    }
697
698    public boolean isUseCache() {
699        return useCache;
700    }
701
702    public void setUseCache(boolean useCache) {
703        this.useCache = useCache;
704    }
705
706    public long getMinimumMessageSize() {
707        return minimumMessageSize;
708    }
709
710    public void setMinimumMessageSize(long minimumMessageSize) {
711        this.minimumMessageSize = minimumMessageSize;
712    }
713
714    public boolean isUseConsumerPriority() {
715        return useConsumerPriority;
716    }
717
718    public void setUseConsumerPriority(boolean useConsumerPriority) {
719        this.useConsumerPriority = useConsumerPriority;
720    }
721
722    public boolean isStrictOrderDispatch() {
723        return strictOrderDispatch;
724    }
725
726    public void setStrictOrderDispatch(boolean strictOrderDispatch) {
727        this.strictOrderDispatch = strictOrderDispatch;
728    }
729
730    public boolean isLazyDispatch() {
731        return lazyDispatch;
732    }
733
734    public void setLazyDispatch(boolean lazyDispatch) {
735        this.lazyDispatch = lazyDispatch;
736    }
737
738    public int getTimeBeforeDispatchStarts() {
739        return timeBeforeDispatchStarts;
740    }
741
742    public void setTimeBeforeDispatchStarts(int timeBeforeDispatchStarts) {
743        this.timeBeforeDispatchStarts = timeBeforeDispatchStarts;
744    }
745
746    public int getConsumersBeforeDispatchStarts() {
747        return consumersBeforeDispatchStarts;
748    }
749
750    public void setConsumersBeforeDispatchStarts(int consumersBeforeDispatchStarts) {
751        this.consumersBeforeDispatchStarts = consumersBeforeDispatchStarts;
752    }
753
754    /**
755     * @return the advisoryForSlowConsumers
756     */
757    public boolean isAdvisoryForSlowConsumers() {
758        return advisoryForSlowConsumers;
759    }
760
761    /**
762     * @param advisoryForSlowConsumers the advisoryForSlowConsumers to set
763     */
764    public void setAdvisoryForSlowConsumers(boolean advisoryForSlowConsumers) {
765        this.advisoryForSlowConsumers = advisoryForSlowConsumers;
766    }
767
768    /**
769     * @return the advisoryForDiscardingMessages
770     */
771    public boolean isAdvisoryForDiscardingMessages() {
772        return advisoryForDiscardingMessages;
773    }
774
775    /**
776     * @param advisoryForDiscardingMessages the advisoryForDiscardingMessages to set
777     */
778    public void setAdvisoryForDiscardingMessages(
779            boolean advisoryForDiscardingMessages) {
780        this.advisoryForDiscardingMessages = advisoryForDiscardingMessages;
781    }
782
783    /**
784     * @return the advisoryWhenFull
785     */
786    public boolean isAdvisoryWhenFull() {
787        return advisoryWhenFull;
788    }
789
790    /**
791     * @param advisoryWhenFull the advisoryWhenFull to set
792     */
793    public void setAdvisoryWhenFull(boolean advisoryWhenFull) {
794        this.advisoryWhenFull = advisoryWhenFull;
795    }
796
797    /**
798     * @return the advisoryForDelivery
799     */
800    public boolean isAdvisoryForDelivery() {
801        return advisoryForDelivery;
802    }
803
804    /**
805     * @param advisoryForDelivery the advisoryForDelivery to set
806     */
807    public void setAdvisoryForDelivery(boolean advisoryForDelivery) {
808        this.advisoryForDelivery = advisoryForDelivery;
809    }
810
811    /**
812     * @return the advisoryForConsumed
813     */
814    public boolean isAdvisoryForConsumed() {
815        return advisoryForConsumed;
816    }
817
818    /**
819     * @param advisoryForConsumed the advisoryForConsumed to set
820     */
821    public void setAdvisoryForConsumed(boolean advisoryForConsumed) {
822        this.advisoryForConsumed = advisoryForConsumed;
823    }
824
825    /**
826     * @return the advisdoryForFastProducers
827     */
828    public boolean isAdvisoryForFastProducers() {
829        return advisoryForFastProducers;
830    }
831
832    /**
833     * @param advisoryForFastProducers the advisdoryForFastProducers to set
834     */
835    public void setAdvisoryForFastProducers(boolean advisoryForFastProducers) {
836        this.advisoryForFastProducers = advisoryForFastProducers;
837    }
838
839    /**
840     * Returns true if the original message body should be included when applicable
841     * for advisory messages
842     *
843     * @return
844     */
845    public boolean isIncludeBodyForAdvisory() {
846        return includeBodyForAdvisory;
847    }
848
849    /**
850     * Sets if the original message body should be included when applicable
851     * for advisory messages
852     *
853     * @param includeBodyForAdvisory
854     */
855    public void setIncludeBodyForAdvisory(boolean includeBodyForAdvisory) {
856        this.includeBodyForAdvisory = includeBodyForAdvisory;
857    }
858
859    public void setMaxExpirePageSize(int maxExpirePageSize) {
860        this.maxExpirePageSize = maxExpirePageSize;
861    }
862
863    public int getMaxExpirePageSize() {
864        return maxExpirePageSize;
865    }
866
867    public void setExpireMessagesPeriod(long expireMessagesPeriod) {
868        this.expireMessagesPeriod = expireMessagesPeriod;
869    }
870
871    public long getExpireMessagesPeriod() {
872        return expireMessagesPeriod;
873    }
874
875    /**
876     * Get the queuePrefetch
877     * @return the queuePrefetch
878     */
879    public int getQueuePrefetch() {
880        return this.queuePrefetch;
881    }
882
883    /**
884     * Set the queuePrefetch
885     * @param queuePrefetch the queuePrefetch to set
886     */
887    public void setQueuePrefetch(int queuePrefetch) {
888        this.queuePrefetch = queuePrefetch;
889    }
890
891    /**
892     * Get the queueBrowserPrefetch
893     * @return the queueBrowserPrefetch
894     */
895    public int getQueueBrowserPrefetch() {
896        return this.queueBrowserPrefetch;
897    }
898
899    /**
900     * Set the queueBrowserPrefetch
901     * @param queueBrowserPrefetch the queueBrowserPrefetch to set
902     */
903    public void setQueueBrowserPrefetch(int queueBrowserPrefetch) {
904        this.queueBrowserPrefetch = queueBrowserPrefetch;
905    }
906
907    /**
908     * Get the topicPrefetch
909     * @return the topicPrefetch
910     */
911    public int getTopicPrefetch() {
912        return this.topicPrefetch;
913    }
914
915    /**
916     * Set the topicPrefetch
917     * @param topicPrefetch the topicPrefetch to set
918     */
919    public void setTopicPrefetch(int topicPrefetch) {
920        this.topicPrefetch = topicPrefetch;
921    }
922
923    /**
924     * Get the durableTopicPrefetch
925     * @return the durableTopicPrefetch
926     */
927    public int getDurableTopicPrefetch() {
928        return this.durableTopicPrefetch;
929    }
930
931    /**
932     * Set the durableTopicPrefetch
933     * @param durableTopicPrefetch the durableTopicPrefetch to set
934     */
935    public void setDurableTopicPrefetch(int durableTopicPrefetch) {
936        this.durableTopicPrefetch = durableTopicPrefetch;
937    }
938
939    public boolean isUsePrefetchExtension() {
940        return this.usePrefetchExtension;
941    }
942
943    public void setUsePrefetchExtension(boolean usePrefetchExtension) {
944        this.usePrefetchExtension = usePrefetchExtension;
945    }
946
947    public int getCursorMemoryHighWaterMark() {
948        return this.cursorMemoryHighWaterMark;
949    }
950
951    public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark) {
952        this.cursorMemoryHighWaterMark = cursorMemoryHighWaterMark;
953    }
954
955    public void setStoreUsageHighWaterMark(int storeUsageHighWaterMark) {
956        this.storeUsageHighWaterMark = storeUsageHighWaterMark;
957    }
958
959    public int getStoreUsageHighWaterMark() {
960        return storeUsageHighWaterMark;
961    }
962
963    public void setSlowConsumerStrategy(SlowConsumerStrategy slowConsumerStrategy) {
964        this.slowConsumerStrategy = slowConsumerStrategy;
965    }
966
967    public SlowConsumerStrategy getSlowConsumerStrategy() {
968        return this.slowConsumerStrategy;
969    }
970
971
972    public boolean isPrioritizedMessages() {
973        return this.prioritizedMessages;
974    }
975
976    public void setPrioritizedMessages(boolean prioritizedMessages) {
977        this.prioritizedMessages = prioritizedMessages;
978    }
979
980    public void setAllConsumersExclusiveByDefault(boolean allConsumersExclusiveByDefault) {
981        this.allConsumersExclusiveByDefault = allConsumersExclusiveByDefault;
982    }
983
984    public boolean isAllConsumersExclusiveByDefault() {
985        return allConsumersExclusiveByDefault;
986    }
987
988    public boolean isGcInactiveDestinations() {
989        return this.gcInactiveDestinations;
990    }
991
992    public void setGcInactiveDestinations(boolean gcInactiveDestinations) {
993        this.gcInactiveDestinations = gcInactiveDestinations;
994    }
995
996    /**
997     * @return the amount of time spent inactive before GC of the destination kicks in.
998     *
999     * @deprecated use getInactiveTimeoutBeforeGC instead.
1000     */
1001    @Deprecated
1002    public long getInactiveTimoutBeforeGC() {
1003        return getInactiveTimeoutBeforeGC();
1004    }
1005
1006    /**
1007     * Sets the amount of time a destination is inactive before it is marked for GC
1008     *
1009     * @param inactiveTimoutBeforeGC
1010     *        time in milliseconds to configure as the inactive timeout.
1011     *
1012     * @deprecated use getInactiveTimeoutBeforeGC instead.
1013     */
1014    @Deprecated
1015    public void setInactiveTimoutBeforeGC(long inactiveTimoutBeforeGC) {
1016        setInactiveTimeoutBeforeGC(inactiveTimoutBeforeGC);
1017    }
1018
1019    /**
1020     * @return the amount of time spent inactive before GC of the destination kicks in.
1021     */
1022    public long getInactiveTimeoutBeforeGC() {
1023        return this.inactiveTimeoutBeforeGC;
1024    }
1025
1026    /**
1027     * Sets the amount of time a destination is inactive before it is marked for GC
1028     *
1029     * @param inactiveTimoutBeforeGC
1030     *        time in milliseconds to configure as the inactive timeout.
1031     */
1032    public void setInactiveTimeoutBeforeGC(long inactiveTimeoutBeforeGC) {
1033        this.inactiveTimeoutBeforeGC = inactiveTimeoutBeforeGC;
1034    }
1035
1036    public void setGcWithNetworkConsumers(boolean gcWithNetworkConsumers) {
1037        this.gcWithNetworkConsumers = gcWithNetworkConsumers;
1038    }
1039
1040    public boolean isGcWithNetworkConsumers() {
1041        return gcWithNetworkConsumers;
1042    }
1043
1044    public boolean isReduceMemoryFootprint() {
1045        return reduceMemoryFootprint;
1046    }
1047
1048    public void setReduceMemoryFootprint(boolean reduceMemoryFootprint) {
1049        this.reduceMemoryFootprint = reduceMemoryFootprint;
1050    }
1051
1052    public void setNetworkBridgeFilterFactory(NetworkBridgeFilterFactory networkBridgeFilterFactory) {
1053        this.networkBridgeFilterFactory = networkBridgeFilterFactory;
1054    }
1055
1056    public NetworkBridgeFilterFactory getNetworkBridgeFilterFactory() {
1057        return networkBridgeFilterFactory;
1058    }
1059
1060    public boolean isDoOptimzeMessageStorage() {
1061        return doOptimzeMessageStorage;
1062    }
1063
1064    public void setDoOptimzeMessageStorage(boolean doOptimzeMessageStorage) {
1065        this.doOptimzeMessageStorage = doOptimzeMessageStorage;
1066    }
1067
1068    public int getOptimizeMessageStoreInFlightLimit() {
1069        return optimizeMessageStoreInFlightLimit;
1070    }
1071
1072    public void setOptimizeMessageStoreInFlightLimit(int optimizeMessageStoreInFlightLimit) {
1073        this.optimizeMessageStoreInFlightLimit = optimizeMessageStoreInFlightLimit;
1074    }
1075
1076    public void setPersistJMSRedelivered(boolean val) {
1077        this.persistJMSRedelivered = val;
1078    }
1079
1080    public boolean isPersistJMSRedelivered() {
1081        return persistJMSRedelivered;
1082    }
1083
1084    public int getMaxDestinations() {
1085        return maxDestinations;
1086    }
1087
1088    /**
1089     * Sets the maximum number of destinations that can be created
1090     *
1091     * @param maxDestinations
1092     *            maximum number of destinations
1093     */
1094    public void setMaxDestinations(int maxDestinations) {
1095        this.maxDestinations = maxDestinations;
1096    }
1097
1098    @Override
1099    public String toString() {
1100        return "PolicyEntry [" + destination + "]";
1101    }
1102}