001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.transport.auto; 018 019import java.io.IOException; 020import java.net.Socket; 021import java.net.URI; 022import java.net.URISyntaxException; 023import java.util.HashMap; 024import java.util.Map; 025import java.util.Set; 026 027import javax.net.ServerSocketFactory; 028 029import org.apache.activemq.broker.BrokerService; 030import org.apache.activemq.broker.BrokerServiceAware; 031import org.apache.activemq.openwire.OpenWireFormatFactory; 032import org.apache.activemq.transport.TransportServer; 033import org.apache.activemq.transport.tcp.TcpTransport; 034import org.apache.activemq.transport.tcp.TcpTransportFactory; 035import org.apache.activemq.transport.tcp.TcpTransportServer; 036import org.apache.activemq.util.IOExceptionSupport; 037import org.apache.activemq.util.IntrospectionSupport; 038import org.apache.activemq.util.URISupport; 039import org.apache.activemq.wireformat.WireFormat; 040 041/** 042 * 043 * 044 */ 045public class AutoTcpTransportFactory extends TcpTransportFactory implements BrokerServiceAware { 046 047 protected BrokerService brokerService; 048 /* (non-Javadoc) 049 * @see org.apache.activemq.broker.BrokerServiceAware#setBrokerService(org.apache.activemq.broker.BrokerService) 050 */ 051 @Override 052 public void setBrokerService(BrokerService brokerService) { 053 this.brokerService = brokerService; 054 } 055 056 057 @Override 058 public TransportServer doBind(final URI location) throws IOException { 059 try { 060 Map<String, String> options = new HashMap<String, String>(URISupport.parseParameters(location)); 061 062 Map<String, Object> autoProperties = IntrospectionSupport.extractProperties(options, "auto."); 063 this.enabledProtocols = AutoTransportUtils.parseProtocols((String) autoProperties.get("protocols")); 064 065 ServerSocketFactory serverSocketFactory = createServerSocketFactory(); 066 AutoTcpTransportServer server = createTcpTransportServer(location, serverSocketFactory); 067 //server.setWireFormatFactory(createWireFormatFactory(options)); 068 server.setWireFormatFactory(new OpenWireFormatFactory()); 069 if (options.get("allowLinkStealing") != null){ 070 allowLinkStealingSet = true; 071 } 072 IntrospectionSupport.setProperties(server, options); 073 server.setTransportOption(IntrospectionSupport.extractProperties(options, "transport.")); 074 server.setWireFormatOptions(AutoTransportUtils.extractWireFormatOptions(options)); 075 server.bind(); 076 077 return server; 078 } catch (URISyntaxException e) { 079 throw IOExceptionSupport.create(e); 080 } 081 } 082 083 boolean allowLinkStealingSet = false; 084 private Set<String> enabledProtocols; 085 086 @Override 087 protected AutoTcpTransportServer createTcpTransportServer(final URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException { 088 AutoTcpTransportServer server = new AutoTcpTransportServer(this, location, serverSocketFactory, brokerService, enabledProtocols) { 089 090 @Override 091 protected TcpTransport createTransport(Socket socket, WireFormat format) 092 throws IOException { 093 setDefaultLinkStealing(format, this); 094 return super.createTransport(socket, format); 095 } 096 097 @Override 098 protected TcpTransport createTransport(Socket socket, WireFormat format, 099 TcpTransportFactory detectedTransportFactory) throws IOException { 100 setDefaultLinkStealing(format, this); 101 return super.createTransport(socket, format, detectedTransportFactory); 102 } 103 104 }; 105 106 return server; 107 } 108 109 private void setDefaultLinkStealing(WireFormat format, TcpTransportServer server) { 110 if (format.getClass().toString().contains("MQTT") && !allowLinkStealingSet) { 111 server.setAllowLinkStealing(true); 112 } 113 } 114}