/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.network;

import java.io.IOException;
import java.util.Map;

import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.TopicRegion;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.filter.DestinationFilter;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.util.TypeConversionSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Consolidates subscriptions.
 * <p>
 * A {@link ConduitBridge} that additionally sets up demand subscriptions for the
 * configured durable destinations, gives durable (and "forced durable") network
 * subscriptions a reproducible subscription name, and can remove a network durable
 * subscription whose destination is no longer permissible.
 */
public class DurableConduitBridge extends ConduitBridge {
    private static final Logger LOG = LoggerFactory.getLogger(DurableConduitBridge.class);

    @Override
    public String toString() {
        return "DurableConduitBridge:" + configuration.getBrokerName() + "->" + getRemoteBrokerName();
    }

    /**
     * Constructor
     *
     * @param configuration the network bridge configuration, passed to the superclass
     * @param localBroker transport to the local broker, passed to the superclass
     * @param remoteBroker transport to the remote broker, passed to the superclass
     */
    public DurableConduitBridge(NetworkBridgeConfiguration configuration, Transport localBroker,
                                Transport remoteBroker) {
        super(configuration, localBroker, remoteBroker);
    }

    /**
     * Subscriptions for these destinations are always created.
     * <p>
     * After delegating to the superclass, walks {@code durableDestinations} (skipped
     * entirely when the configuration is dynamic-only). For each destination:
     * <ul>
     * <li>if it is permissible and no demand subscription already matches it, and it is a
     * topic for which a local durable subscription with the expected network subscription
     * name exists, a statically included demand subscription is added;</li>
     * <li>otherwise, if durable-sub syncing is enabled and the destination is not
     * permissible, a matching local durable subscription is removed by sending a
     * {@link RemoveSubscriptionInfo} to the local broker.</li>
     * </ul>
     */
    @Override
    protected void setupStaticDestinations() {
        super.setupStaticDestinations();
        ActiveMQDestination[] dests = configuration.isDynamicOnly() ? null : durableDestinations;
        if (dests == null) {
            return;
        }
        for (ActiveMQDestination dest : dests) {
            if (isPermissableDestination(dest) && !doesConsumerExist(dest)) {
                addStaticDurableSubscription(dest);
                LOG.trace("Forwarding messages for durable destination: {}", dest);
            } else if (configuration.isSyncDurableSubs() && !isPermissableDestination(dest)) {
                removeStaleDurableSubscription(dest);
            }
        }
    }

    /**
     * Adds a statically included demand subscription for a durable topic destination, but
     * only when the local broker already holds a durable subscription carrying the
     * network subscription name for that destination. Non-topic destinations are ignored.
     *
     * @param dest the durable destination to subscribe to
     */
    private void addStaticDurableSubscription(ActiveMQDestination dest) {
        try {
            //Filtering by non-empty subscriptions, see AMQ-5875
            if (!dest.isTopic()) {
                return;
            }
            String candidateSubName = getSubscriberName(dest);
            if (!hasLocalDurableSubscription(candidateSubName)) {
                return;
            }
            DemandSubscription sub = createDemandSubscription(dest, candidateSubName);
            // NOTE(review): the two-argument createDemandSubscription is defined outside this
            // file; the ConsumerInfo overload in this class returns null when the consumer is
            // absorbed by an existing subscription, so a null result is guarded against here
            // rather than dereferenced.
            if (sub == null) {
                LOG.debug("No demand subscription created for durable destination: {}", dest);
                return;
            }
            sub.getLocalInfo().setSubscriptionName(candidateSubName);
            sub.setStaticallyIncluded(true);
            addSubscription(sub);
        } catch (IOException e) {
            LOG.error("Failed to add static destination {}", dest, e);
        }
    }

    /**
     * Removes the network durable subscription for a topic destination that is no longer
     * permissible, if the local broker still holds one. Non-topic destinations are ignored.
     *
     * @param dest the destination whose network durable subscription should be removed
     */
    private void removeStaleDurableSubscription(ActiveMQDestination dest) {
        if (!dest.isTopic()) {
            return;
        }
        String candidateSubName = getSubscriberName(dest);
        if (!hasLocalDurableSubscription(candidateSubName)) {
            return;
        }
        try {
            // remove the NC subscription as it is no longer for a permissable dest
            RemoveSubscriptionInfo sending = new RemoveSubscriptionInfo();
            sending.setClientId(localClientId);
            sending.setSubscriptionName(candidateSubName);
            sending.setConnectionId(this.localConnectionInfo.getConnectionId());
            localBroker.oneway(sending);
        } catch (IOException e) {
            LOG.debug("Exception removing NC durable subscription: {}", candidateSubName, e);
            serviceRemoteException(e);
        }
    }

    /**
     * Checks whether the local broker's topic region holds a durable subscription whose
     * subscription name equals the given name.
     *
     * @param subscriptionName the subscription name to look for
     * @return {@code true} if at least one durable subscription has that name
     */
    private boolean hasLocalDurableSubscription(String subscriptionName) {
        RegionBroker regionBroker = (RegionBroker) brokerService.getRegionBroker();
        TopicRegion topicRegion = (TopicRegion) regionBroker.getTopicRegion();
        for (Subscription subscription : topicRegion.getDurableSubscriptions().values()) {
            String subName = subscription.getConsumerInfo().getSubscriptionName();
            if (subName != null && subName.equals(subscriptionName)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Creates the demand subscription for a remote consumer.
     * <p>
     * Durable consumers, and non-durable consumers on destinations flagged as forced
     * durable, get a reproducible subscription name and a freshly generated consumer id.
     * The selector is always cleared before the subscription is created.
     *
     * @param info the remote consumer's info; it is modified in place
     * @return the new demand subscription, or {@code null} when
     *         {@code addToAlreadyInterestedConsumers} reports that the consumer was
     *         attached to an existing subscription
     * @throws IOException if creating the subscription fails
     */
    @Override
    protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws IOException {
        boolean isForcedDurable = isForcedDurable(info);

        if (addToAlreadyInterestedConsumers(info, isForcedDurable)) {
            return null; // don't want this subscription added
        }
        //add our original id to ourselves
        info.addNetworkConsumerId(info.getConsumerId());
        // capture the original id before it is overridden below
        ConsumerId forcedDurableId = isForcedDurable ? info.getConsumerId() : null;

        if (info.isDurable() || isForcedDurable) {
            // set the subscriber name to something reproducible
            info.setSubscriptionName(getSubscriberName(info.getDestination()));
            // and override the consumerId with something unique so that it won't
            // be removed if the durable subscriber (at the other end) goes away
            info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(),
                    consumerIdGenerator.getNextSequenceId()));
        }
        info.setSelector(null);
        DemandSubscription demandSubscription = doCreateDemandSubscription(info);
        if (forcedDurableId != null) {
            demandSubscription.addForcedDurableConsumer(forcedDurableId);
            forcedDurableRemoteId.add(forcedDurableId);
        }
        return demandSubscription;
    }

    /**
     * Decides whether a non-durable consumer must be bridged as a durable subscription.
     * <p>
     * Never true for consumers that are already durable, nor for advisory, temporary or
     * queue destinations. Otherwise the first matching entry in the dynamically included
     * destinations, then in the statically included destinations, decides via its
     * {@code forceDurable} option.
     *
     * @param info the consumer to inspect
     * @return {@code true} if the consumer should be treated as forced durable
     */
    private boolean isForcedDurable(ConsumerInfo info) {
        if (info.isDurable()) {
            return false;
        }

        ActiveMQDestination destination = info.getDestination();
        if (AdvisorySupport.isAdvisoryTopic(destination) || destination.isTemporary() ||
                destination.isQueue()) {
            return false;
        }

        ActiveMQDestination matching = findMatchingDestination(dynamicallyIncludedDestinations, destination);
        if (matching != null) {
            return isDestForcedDurable(matching);
        }
        matching = findMatchingDestination(staticallyIncludedDestinations, destination);
        if (matching != null) {
            return isDestForcedDurable(matching);
        }
        return false;
    }

    /**
     * Reads the {@code forceDurable} option of a destination.
     *
     * @param destination the configured destination whose options are inspected
     * @return {@code true} only if the option is present and converts to {@code true};
     *         {@code false} when the destination has no options or the conversion yields
     *         anything else
     */
    private boolean isDestForcedDurable(ActiveMQDestination destination) {
        final Map<String, String> options = destination.getOptions();
        if (options == null) {
            return false;
        }
        // Boolean.TRUE.equals avoids an unboxing NullPointerException should the
        // conversion return null.
        return Boolean.TRUE.equals(
                TypeConversionSupport.convert(options.get("forceDurable"), boolean.class));
    }

    /**
     * Builds the subscription name used for network durable subscriptions on a destination:
     * {@code DURABLE_SUB_PREFIX}, the configured broker name, an underscore and the
     * destination's physical name.
     *
     * @param dest the destination the subscription is for
     * @return the subscription name
     */
    protected String getSubscriberName(ActiveMQDestination dest) {
        return DURABLE_SUB_PREFIX + configuration.getBrokerName() + "_" + dest.getPhysicalName();
    }

    /**
     * Checks whether any existing demand subscription's local destination is matched by a
     * filter parsed from the given destination.
     *
     * @param dest the destination to build the filter from
     * @return {@code true} if at least one demand subscription matches
     */
    protected boolean doesConsumerExist(ActiveMQDestination dest) {
        DestinationFilter filter = DestinationFilter.parseFilter(dest);
        for (DemandSubscription ds : subscriptionMapByLocalId.values()) {
            if (filter.matches(ds.getLocalInfo().getDestination())) {
                return true;
            }
        }
        return false;
    }
}