001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.network;
018
019import java.io.IOException;
020import java.util.Map;
021
022import org.apache.activemq.advisory.AdvisorySupport;
023import org.apache.activemq.broker.region.RegionBroker;
024import org.apache.activemq.broker.region.Subscription;
025import org.apache.activemq.broker.region.TopicRegion;
026import org.apache.activemq.command.ActiveMQDestination;
027import org.apache.activemq.command.ConsumerId;
028import org.apache.activemq.command.ConsumerInfo;
029import org.apache.activemq.command.RemoveSubscriptionInfo;
030import org.apache.activemq.filter.DestinationFilter;
031import org.apache.activemq.transport.Transport;
032import org.apache.activemq.util.TypeConversionSupport;
033import org.slf4j.Logger;
034import org.slf4j.LoggerFactory;
035
036/**
037 * Consolidates subscriptions
038 */
039public class DurableConduitBridge extends ConduitBridge {
040    private static final Logger LOG = LoggerFactory.getLogger(DurableConduitBridge.class);
041
042    @Override
043    public String toString() {
044        return "DurableConduitBridge:" + configuration.getBrokerName() + "->" + getRemoteBrokerName();
045    }
046    /**
047     * Constructor
048     *
049     * @param configuration
050     *
051     * @param localBroker
052     * @param remoteBroker
053     */
054    public DurableConduitBridge(NetworkBridgeConfiguration configuration, Transport localBroker,
055                                Transport remoteBroker) {
056        super(configuration, localBroker, remoteBroker);
057    }
058
059    /**
060     * Subscriptions for these destinations are always created
061     *
062     */
063    @Override
064    protected void setupStaticDestinations() {
065        super.setupStaticDestinations();
066        ActiveMQDestination[] dests = configuration.isDynamicOnly() ? null : durableDestinations;
067        if (dests != null) {
068            for (ActiveMQDestination dest : dests) {
069                if (isPermissableDestination(dest) && !doesConsumerExist(dest)) {
070                    try {
071                        //Filtering by non-empty subscriptions, see AMQ-5875
072                        if (dest.isTopic()) {
073                            RegionBroker regionBroker = (RegionBroker) brokerService.getRegionBroker();
074                            TopicRegion topicRegion = (TopicRegion) regionBroker.getTopicRegion();
075
076                            String candidateSubName = getSubscriberName(dest);
077                            for (Subscription subscription : topicRegion.getDurableSubscriptions().values()) {
078                                String subName = subscription.getConsumerInfo().getSubscriptionName();
079                                if (subName != null && subName.equals(candidateSubName)) {
080                                    DemandSubscription sub = createDemandSubscription(dest, subName);
081                                    sub.getLocalInfo().setSubscriptionName(getSubscriberName(dest));
082                                    sub.setStaticallyIncluded(true);
083                                    addSubscription(sub);
084                                    break;
085                                }
086                            }
087                        }
088                    } catch (IOException e) {
089                        LOG.error("Failed to add static destination {}", dest, e);
090                    }
091                    LOG.trace("Forwarding messages for durable destination: {}", dest);
092                } else if (configuration.isSyncDurableSubs() && !isPermissableDestination(dest)) {
093                    if (dest.isTopic()) {
094                        RegionBroker regionBroker = (RegionBroker) brokerService.getRegionBroker();
095                        TopicRegion topicRegion = (TopicRegion) regionBroker.getTopicRegion();
096
097                        String candidateSubName = getSubscriberName(dest);
098                        for (Subscription subscription : topicRegion.getDurableSubscriptions().values()) {
099                            String subName = subscription.getConsumerInfo().getSubscriptionName();
100                            if (subName != null && subName.equals(candidateSubName)) {
101                               try {
102                                    // remove the NC subscription as it is no longer for a permissable dest
103                                    RemoveSubscriptionInfo sending = new RemoveSubscriptionInfo();
104                                    sending.setClientId(localClientId);
105                                    sending.setSubscriptionName(subName);
106                                    sending.setConnectionId(this.localConnectionInfo.getConnectionId());
107                                    localBroker.oneway(sending);
108                                } catch (IOException e) {
109                                    LOG.debug("Exception removing NC durable subscription: {}", subName, e);
110                                    serviceRemoteException(e);
111                                }
112                                break;
113                            }
114                        }
115                    }
116                }
117            }
118        }
119    }
120
121    @Override
122    protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws IOException {
123        boolean isForcedDurable = isForcedDurable(info);
124
125        if (addToAlreadyInterestedConsumers(info, isForcedDurable)) {
126            return null; // don't want this subscription added
127        }
128        //add our original id to ourselves
129        info.addNetworkConsumerId(info.getConsumerId());
130        ConsumerId forcedDurableId = isForcedDurable ? info.getConsumerId() : null;
131
132        if(info.isDurable() || isForcedDurable) {
133            // set the subscriber name to something reproducible
134            info.setSubscriptionName(getSubscriberName(info.getDestination()));
135            // and override the consumerId with something unique so that it won't
136            // be removed if the durable subscriber (at the other end) goes away
137            info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(),
138                               consumerIdGenerator.getNextSequenceId()));
139        }
140        info.setSelector(null);
141        DemandSubscription demandSubscription = doCreateDemandSubscription(info);
142        if (forcedDurableId != null) {
143            demandSubscription.addForcedDurableConsumer(forcedDurableId);
144            forcedDurableRemoteId.add(forcedDurableId);
145        }
146        return demandSubscription;
147    }
148
149
150    private boolean isForcedDurable(ConsumerInfo info) {
151        if (info.isDurable()) {
152            return false;
153        }
154
155        ActiveMQDestination destination = info.getDestination();
156        if (AdvisorySupport.isAdvisoryTopic(destination) || destination.isTemporary() ||
157                destination.isQueue()) {
158            return false;
159        }
160
161        ActiveMQDestination matching = findMatchingDestination(dynamicallyIncludedDestinations, destination);
162        if (matching != null) {
163            return isDestForcedDurable(matching);
164        }
165        matching = findMatchingDestination(staticallyIncludedDestinations, destination);
166        if (matching != null) {
167            return isDestForcedDurable(matching);
168        }
169        return false;
170    }
171
172    private boolean isDestForcedDurable(ActiveMQDestination destination) {
173        final Map<String, String> options = destination.getOptions();
174
175        boolean isForceDurable = false;
176        if (options != null) {
177            isForceDurable = (boolean) TypeConversionSupport.convert(options.get("forceDurable"), boolean.class);
178        }
179
180        return isForceDurable;
181    }
182
183    protected String getSubscriberName(ActiveMQDestination dest) {
184        String subscriberName = DURABLE_SUB_PREFIX + configuration.getBrokerName() + "_" + dest.getPhysicalName();
185        return subscriberName;
186    }
187
188    protected boolean doesConsumerExist(ActiveMQDestination dest) {
189        DestinationFilter filter = DestinationFilter.parseFilter(dest);
190        for (DemandSubscription ds : subscriptionMapByLocalId.values()) {
191            if (filter.matches(ds.getLocalInfo().getDestination())) {
192                return true;
193            }
194        }
195        return false;
196    }
197}