001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.transport.vm; 018 019import java.io.IOException; 020import java.net.URI; 021import java.net.URISyntaxException; 022import java.util.HashMap; 023import java.util.Map; 024import java.util.concurrent.ConcurrentHashMap; 025import java.util.concurrent.ConcurrentMap; 026 027import org.apache.activemq.broker.BrokerFactory; 028import org.apache.activemq.broker.BrokerFactoryHandler; 029import org.apache.activemq.broker.BrokerRegistry; 030import org.apache.activemq.broker.BrokerService; 031import org.apache.activemq.broker.TransportConnector; 032import org.apache.activemq.transport.MarshallingTransportFilter; 033import org.apache.activemq.transport.Transport; 034import org.apache.activemq.transport.TransportFactory; 035import org.apache.activemq.transport.TransportServer; 036import org.apache.activemq.util.IOExceptionSupport; 037import org.apache.activemq.util.IntrospectionSupport; 038import org.apache.activemq.util.ServiceSupport; 039import org.apache.activemq.util.URISupport; 040import org.apache.activemq.util.URISupport.CompositeData; 041import org.slf4j.Logger; 042import org.slf4j.LoggerFactory; 043import org.slf4j.MDC; 044 045public class VMTransportFactory extends TransportFactory { 046 047 public static final ConcurrentMap<String, BrokerService> BROKERS = new ConcurrentHashMap<String, BrokerService>(); 048 public static final ConcurrentMap<String, TransportConnector> CONNECTORS = new ConcurrentHashMap<String, TransportConnector>(); 049 public static final ConcurrentMap<String, VMTransportServer> SERVERS = new ConcurrentHashMap<String, VMTransportServer>(); 050 private static final Logger LOG = LoggerFactory.getLogger(VMTransportFactory.class); 051 052 BrokerFactoryHandler brokerFactoryHandler; 053 054 @Override 055 public Transport doConnect(URI location) throws Exception { 056 return VMTransportServer.configure(doCompositeConnect(location)); 057 } 058 059 @Override 060 public Transport doCompositeConnect(URI location) throws Exception { 061 URI brokerURI; 062 String host; 063 Map<String, String> options; 064 boolean create = true; 065 int waitForStart = -1; 066 CompositeData data = URISupport.parseComposite(location); 067 if (data.getComponents().length == 1 && "broker".equals(data.getComponents()[0].getScheme())) { 068 brokerURI = data.getComponents()[0]; 069 CompositeData brokerData = URISupport.parseComposite(brokerURI); 070 host = brokerData.getParameters().get("brokerName"); 071 if (host == null) { 072 host = "localhost"; 073 } 074 if (brokerData.getPath() != null) { 075 host = brokerData.getPath(); 076 } 077 options = data.getParameters(); 078 location = new URI("vm://" + host); 079 } else { 080 // If using the less complex vm://localhost?broker.persistent=true 081 // form 082 try { 083 host = extractHost(location); 084 options = URISupport.parseParameters(location); 085 String config = options.remove("brokerConfig"); 086 if (config != null) { 087 brokerURI = new URI(config); 088 } else { 089 Map<String, Object> brokerOptions = IntrospectionSupport.extractProperties(options, "broker."); 090 brokerURI = new URI("broker://()/" + host + "?" 091 + URISupport.createQueryString(brokerOptions)); 092 } 093 if ("false".equals(options.remove("create"))) { 094 create = false; 095 } 096 String waitForStartString = options.remove("waitForStart"); 097 if (waitForStartString != null) { 098 waitForStart = Integer.parseInt(waitForStartString); 099 } 100 } catch (URISyntaxException e1) { 101 throw IOExceptionSupport.create(e1); 102 } 103 location = new URI("vm://" + host); 104 } 105 if (host == null) { 106 host = "localhost"; 107 } 108 VMTransportServer server = SERVERS.get(host); 109 // validate the broker is still active 110 if (!validateBroker(host) || server == null) { 111 BrokerService broker = null; 112 // Synchronize on the registry so that multiple concurrent threads 113 // doing this do not think that the broker has not been created and 114 // cause multiple brokers to be started. 115 synchronized (BrokerRegistry.getInstance().getRegistryMutext()) { 116 broker = lookupBroker(BrokerRegistry.getInstance(), host, waitForStart); 117 if (broker == null) { 118 if (!create) { 119 throw new IOException("Broker named '" + host + "' does not exist."); 120 } 121 try { 122 if (brokerFactoryHandler != null) { 123 broker = brokerFactoryHandler.createBroker(brokerURI); 124 } else { 125 broker = BrokerFactory.createBroker(brokerURI); 126 } 127 broker.start(); 128 MDC.put("activemq.broker", broker.getBrokerName()); 129 } catch (URISyntaxException e) { 130 throw IOExceptionSupport.create(e); 131 } 132 BROKERS.put(host, broker); 133 BrokerRegistry.getInstance().getRegistryMutext().notifyAll(); 134 } 135 136 server = SERVERS.get(host); 137 if (server == null) { 138 server = (VMTransportServer)bind(location, true); 139 TransportConnector connector = new TransportConnector(server); 140 connector.setBrokerService(broker); 141 connector.setUri(location); 142 connector.setTaskRunnerFactory(broker.getTaskRunnerFactory()); 143 connector.start(); 144 CONNECTORS.put(host, connector); 145 } 146 147 } 148 } 149 150 VMTransport vmtransport = server.connect(); 151 IntrospectionSupport.setProperties(vmtransport.peer, new HashMap<String,String>(options)); 152 IntrospectionSupport.setProperties(vmtransport, options); 153 Transport transport = vmtransport; 154 if (vmtransport.isMarshal()) { 155 Map<String, String> optionsCopy = new HashMap<String, String>(options); 156 transport = new MarshallingTransportFilter(transport, createWireFormat(options), 157 createWireFormat(optionsCopy)); 158 } 159 if (!options.isEmpty()) { 160 throw new IllegalArgumentException("Invalid connect parameters: " + options); 161 } 162 return transport; 163 } 164 165 private static String extractHost(URI location) { 166 String host = location.getHost(); 167 if (host == null || host.length() == 0) { 168 host = location.getAuthority(); 169 if (host == null || host.length() == 0) { 170 host = "localhost"; 171 } 172 } 173 return host; 174 } 175 176 /** 177 * Attempt to find a Broker instance. 178 * 179 * @param registry 180 * the registry in which to search for the BrokerService instance. 181 * @param brokerName 182 * the name of the Broker that should be located. 183 * @param waitForStart 184 * time in milliseconds to wait for a broker to appear and be started. 185 * 186 * @return a BrokerService instance if one is found, or null. 187 */ 188 private BrokerService lookupBroker(final BrokerRegistry registry, final String brokerName, int waitForStart) { 189 BrokerService broker = null; 190 synchronized(registry.getRegistryMutext()) { 191 broker = registry.lookup(brokerName); 192 if (broker == null || waitForStart > 0) { 193 final long expiry = System.currentTimeMillis() + waitForStart; 194 while ((broker == null || !broker.isStarted()) && expiry > System.currentTimeMillis()) { 195 long timeout = Math.max(0, expiry - System.currentTimeMillis()); 196 if (broker == null) { 197 try { 198 LOG.debug("waiting for broker named: " + brokerName + " to enter registry"); 199 registry.getRegistryMutext().wait(timeout); 200 broker = registry.lookup(brokerName); 201 } catch (InterruptedException ignored) { 202 } 203 } 204 if (broker != null && !broker.isStarted()) { 205 LOG.debug("waiting for broker named: " + brokerName + " to start"); 206 timeout = Math.max(0, expiry - System.currentTimeMillis()); 207 // Wait for however long we have left for broker to be started, if 208 // it doesn't get started we need to clear broker so it doesn't get 209 // returned. A null return should throw an exception. 210 if (!broker.waitUntilStarted(timeout)) { 211 broker = null; 212 break; 213 } 214 } 215 } 216 } 217 } 218 return broker; 219 } 220 221 @Override 222 public TransportServer doBind(URI location) throws IOException { 223 return bind(location, false); 224 } 225 226 /** 227 * @param location 228 * @return the TransportServer 229 * @throws IOException 230 */ 231 private TransportServer bind(URI location, boolean dispose) throws IOException { 232 String host = extractHost(location); 233 LOG.debug("binding to broker: " + host); 234 VMTransportServer server = new VMTransportServer(location, dispose); 235 Object currentBoundValue = SERVERS.get(host); 236 if (currentBoundValue != null) { 237 throw new IOException("VMTransportServer already bound at: " + location); 238 } 239 SERVERS.put(host, server); 240 return server; 241 } 242 243 public static void stopped(VMTransportServer server) { 244 String host = extractHost(server.getBindURI()); 245 stopped(host); 246 } 247 248 public static void stopped(String host) { 249 SERVERS.remove(host); 250 TransportConnector connector = CONNECTORS.remove(host); 251 if (connector != null) { 252 LOG.debug("Shutting down VM connectors for broker: " + host); 253 ServiceSupport.dispose(connector); 254 BrokerService broker = BROKERS.remove(host); 255 if (broker != null) { 256 ServiceSupport.dispose(broker); 257 } 258 MDC.remove("activemq.broker"); 259 } 260 } 261 262 public BrokerFactoryHandler getBrokerFactoryHandler() { 263 return brokerFactoryHandler; 264 } 265 266 public void setBrokerFactoryHandler(BrokerFactoryHandler brokerFactoryHandler) { 267 this.brokerFactoryHandler = brokerFactoryHandler; 268 } 269 270 private boolean validateBroker(String host) { 271 boolean result = true; 272 if (BROKERS.containsKey(host) || SERVERS.containsKey(host) || CONNECTORS.containsKey(host)) { 273 // check the broker is still in the BrokerRegistry 274 TransportConnector connector = CONNECTORS.get(host); 275 if (BrokerRegistry.getInstance().lookup(host) == null 276 || (connector != null && connector.getBroker().isStopped())) { 277 result = false; 278 // clean-up 279 BROKERS.remove(host); 280 SERVERS.remove(host); 281 if (connector != null) { 282 CONNECTORS.remove(host); 283 if (connector != null) { 284 ServiceSupport.dispose(connector); 285 } 286 } 287 } 288 } 289 return result; 290 } 291}