001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.plugin; 018 019import java.io.File; 020import java.net.URI; 021import java.util.Set; 022 023import javax.jms.JMSException; 024import javax.management.ObjectName; 025 026import org.apache.activemq.advisory.AdvisorySupport; 027import org.apache.activemq.broker.Broker; 028import org.apache.activemq.broker.BrokerFilter; 029import org.apache.activemq.broker.BrokerService; 030import org.apache.activemq.broker.ConnectionContext; 031import org.apache.activemq.broker.ProducerBrokerExchange; 032import org.apache.activemq.broker.jmx.BrokerViewMBean; 033import org.apache.activemq.broker.jmx.SubscriptionViewMBean; 034import org.apache.activemq.broker.region.Destination; 035import org.apache.activemq.broker.region.DestinationStatistics; 036import org.apache.activemq.broker.region.RegionBroker; 037import org.apache.activemq.command.ActiveMQDestination; 038import org.apache.activemq.command.ActiveMQMapMessage; 039import org.apache.activemq.command.Message; 040import org.apache.activemq.command.MessageId; 041import org.apache.activemq.command.ProducerId; 042import org.apache.activemq.command.ProducerInfo; 043import org.apache.activemq.state.ProducerState; 044import org.apache.activemq.usage.SystemUsage; 045import org.apache.activemq.util.IdGenerator; 046import org.apache.activemq.util.LongSequenceGenerator; 047import org.slf4j.Logger; 048import org.slf4j.LoggerFactory; 049/** 050 * A StatisticsBroker You can retrieve a Map Message for a Destination - or 051 * Broker containing statistics as key-value pairs The message must contain a 052 * replyTo Destination - else its ignored 053 * 054 */ 055public class StatisticsBroker extends BrokerFilter { 056 private static Logger LOG = LoggerFactory.getLogger(StatisticsBroker.class); 057 static final String STATS_DESTINATION_PREFIX = "ActiveMQ.Statistics.Destination"; 058 static final String STATS_BROKER_PREFIX = "ActiveMQ.Statistics.Broker"; 059 static final String STATS_BROKER_RESET_HEADER = "ActiveMQ.Statistics.Broker.Reset"; 060 static final String STATS_SUBSCRIPTION_PREFIX = "ActiveMQ.Statistics.Subscription"; 061 static final String STATS_DENOTE_END_LIST = STATS_DESTINATION_PREFIX + ".List.End.With.Null"; 062 private static final IdGenerator ID_GENERATOR = new IdGenerator(); 063 private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator(); 064 protected final ProducerId advisoryProducerId = new ProducerId(); 065 protected BrokerViewMBean brokerView; 066 067 /** 068 * 069 * Constructor 070 * 071 * @param next 072 */ 073 public StatisticsBroker(Broker next) { 074 super(next); 075 this.advisoryProducerId.setConnectionId(ID_GENERATOR.generateId()); 076 } 077 078 /** 079 * Sets the persistence mode 080 * 081 * @see org.apache.activemq.broker.BrokerFilter#send(org.apache.activemq.broker.ProducerBrokerExchange, 082 * org.apache.activemq.command.Message) 083 */ 084 @Override 085 public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception { 086 ActiveMQDestination msgDest = messageSend.getDestination(); 087 ActiveMQDestination replyTo = messageSend.getReplyTo(); 088 if (replyTo != null) { 089 String physicalName = msgDest.getPhysicalName(); 090 boolean destStats = physicalName.regionMatches(true, 0, STATS_DESTINATION_PREFIX, 0, 091 STATS_DESTINATION_PREFIX.length()); 092 boolean brokerStats = physicalName.regionMatches(true, 0, STATS_BROKER_PREFIX, 0, STATS_BROKER_PREFIX 093 .length()); 094 boolean subStats = physicalName.regionMatches(true, 0, STATS_SUBSCRIPTION_PREFIX, 0, STATS_SUBSCRIPTION_PREFIX 095 .length()); 096 BrokerService brokerService = getBrokerService(); 097 RegionBroker regionBroker = (RegionBroker) brokerService.getRegionBroker(); 098 if (destStats) { 099 String destinationName = physicalName.substring(STATS_DESTINATION_PREFIX.length(), physicalName.length()); 100 if (destinationName.startsWith(".")) { 101 destinationName = destinationName.substring(1); 102 } 103 String destinationQuery = destinationName.replace(STATS_DENOTE_END_LIST,""); 104 boolean endListMessage = !destinationName.equals(destinationQuery); 105 ActiveMQDestination queryDestination = ActiveMQDestination.createDestination(destinationQuery,msgDest.getDestinationType()); 106 Set<Destination> destinations = getDestinations(queryDestination); 107 108 for (Destination dest : destinations) { 109 DestinationStatistics stats = dest.getDestinationStatistics(); 110 if (stats != null) { 111 ActiveMQMapMessage statsMessage = new ActiveMQMapMessage(); 112 statsMessage.setString("brokerName", regionBroker.getBrokerName()); 113 statsMessage.setString("brokerId", regionBroker.getBrokerId().toString()); 114 statsMessage.setString("destinationName", dest.getActiveMQDestination().toString()); 115 statsMessage.setLong("size", stats.getMessages().getCount()); 116 statsMessage.setLong("enqueueCount", stats.getEnqueues().getCount()); 117 statsMessage.setLong("dequeueCount", stats.getDequeues().getCount()); 118 statsMessage.setLong("dispatchCount", stats.getDispatched().getCount()); 119 statsMessage.setLong("expiredCount", stats.getExpired().getCount()); 120 statsMessage.setLong("inflightCount", stats.getInflight().getCount()); 121 statsMessage.setLong("messagesCached", stats.getMessagesCached().getCount()); 122 // we are okay with the size without decimals so cast to long 123 statsMessage.setLong("averageMessageSize", (long) stats.getMessageSize().getAveragePerSecond()); 124 statsMessage.setInt("memoryPercentUsage", dest.getMemoryUsage().getPercentUsage()); 125 statsMessage.setLong("memoryUsage", dest.getMemoryUsage().getUsage()); 126 statsMessage.setLong("memoryLimit", dest.getMemoryUsage().getLimit()); 127 statsMessage.setDouble("averageEnqueueTime", stats.getProcessTime().getAverageTime()); 128 statsMessage.setDouble("maxEnqueueTime", stats.getProcessTime().getMaxTime()); 129 statsMessage.setDouble("minEnqueueTime", stats.getProcessTime().getMinTime()); 130 statsMessage.setLong("consumerCount", stats.getConsumers().getCount()); 131 statsMessage.setLong("producerCount", stats.getProducers().getCount()); 132 statsMessage.setJMSCorrelationID(messageSend.getCorrelationId()); 133 sendStats(producerExchange.getConnectionContext(), statsMessage, replyTo); 134 } 135 } 136 if(endListMessage){ 137 ActiveMQMapMessage statsMessage = new ActiveMQMapMessage(); 138 statsMessage.setJMSCorrelationID(messageSend.getCorrelationId()); 139 sendStats(producerExchange.getConnectionContext(),statsMessage,replyTo); 140 } 141 142 } else if (subStats) { 143 sendSubStats(producerExchange.getConnectionContext(), getBrokerView().getQueueSubscribers(), replyTo); 144 sendSubStats(producerExchange.getConnectionContext(), getBrokerView().getTopicSubscribers(), replyTo); 145 } else if (brokerStats) { 146 147 if (messageSend.getProperties().containsKey(STATS_BROKER_RESET_HEADER)) { 148 getBrokerView().resetStatistics(); 149 } 150 151 ActiveMQMapMessage statsMessage = new ActiveMQMapMessage(); 152 SystemUsage systemUsage = brokerService.getSystemUsage(); 153 DestinationStatistics stats = regionBroker.getDestinationStatistics(); 154 statsMessage.setString("brokerName", regionBroker.getBrokerName()); 155 statsMessage.setString("brokerId", regionBroker.getBrokerId().toString()); 156 statsMessage.setLong("size", stats.getMessages().getCount()); 157 statsMessage.setLong("enqueueCount", stats.getEnqueues().getCount()); 158 statsMessage.setLong("dequeueCount", stats.getDequeues().getCount()); 159 statsMessage.setLong("dispatchCount", stats.getDispatched().getCount()); 160 statsMessage.setLong("expiredCount", stats.getExpired().getCount()); 161 statsMessage.setLong("inflightCount", stats.getInflight().getCount()); 162 // we are okay with the size without decimals so cast to long 163 statsMessage.setLong("averageMessageSize",(long) stats.getMessageSize().getAverageSize()); 164 statsMessage.setLong("messagesCached", stats.getMessagesCached().getCount()); 165 statsMessage.setInt("memoryPercentUsage", systemUsage.getMemoryUsage().getPercentUsage()); 166 statsMessage.setLong("memoryUsage", systemUsage.getMemoryUsage().getUsage()); 167 statsMessage.setLong("memoryLimit", systemUsage.getMemoryUsage().getLimit()); 168 statsMessage.setInt("storePercentUsage", systemUsage.getStoreUsage().getPercentUsage()); 169 statsMessage.setLong("storeUsage", systemUsage.getStoreUsage().getUsage()); 170 statsMessage.setLong("storeLimit", systemUsage.getStoreUsage().getLimit()); 171 statsMessage.setInt("tempPercentUsage", systemUsage.getTempUsage().getPercentUsage()); 172 statsMessage.setLong("tempUsage", systemUsage.getTempUsage().getUsage()); 173 statsMessage.setLong("tempLimit", systemUsage.getTempUsage().getLimit()); 174 statsMessage.setDouble("averageEnqueueTime", stats.getProcessTime().getAverageTime()); 175 statsMessage.setDouble("maxEnqueueTime", stats.getProcessTime().getMaxTime()); 176 statsMessage.setDouble("minEnqueueTime", stats.getProcessTime().getMinTime()); 177 statsMessage.setLong("consumerCount", stats.getConsumers().getCount()); 178 statsMessage.setLong("producerCount", stats.getProducers().getCount()); 179 String answer = brokerService.getTransportConnectorURIsAsMap().get("tcp"); 180 answer = answer != null ? answer : ""; 181 statsMessage.setString("openwire", answer); 182 answer = brokerService.getTransportConnectorURIsAsMap().get("stomp"); 183 answer = answer != null ? answer : ""; 184 statsMessage.setString("stomp", answer); 185 answer = brokerService.getTransportConnectorURIsAsMap().get("ssl"); 186 answer = answer != null ? answer : ""; 187 statsMessage.setString("ssl", answer); 188 answer = brokerService.getTransportConnectorURIsAsMap().get("stomp+ssl"); 189 answer = answer != null ? answer : ""; 190 statsMessage.setString("stomp+ssl", answer); 191 URI uri = brokerService.getVmConnectorURI(); 192 answer = uri != null ? uri.toString() : ""; 193 statsMessage.setString("vm", answer); 194 File file = brokerService.getDataDirectoryFile(); 195 answer = file != null ? file.getCanonicalPath() : ""; 196 statsMessage.setString("dataDirectory", answer); 197 statsMessage.setJMSCorrelationID(messageSend.getCorrelationId()); 198 sendStats(producerExchange.getConnectionContext(), statsMessage, replyTo); 199 } else { 200 super.send(producerExchange, messageSend); 201 } 202 } else { 203 super.send(producerExchange, messageSend); 204 } 205 } 206 207 BrokerViewMBean getBrokerView() throws Exception { 208 if (this.brokerView == null) { 209 ObjectName brokerName = getBrokerService().getBrokerObjectName(); 210 this.brokerView = (BrokerViewMBean) getBrokerService().getManagementContext().newProxyInstance(brokerName, 211 BrokerViewMBean.class, true); 212 } 213 return this.brokerView; 214 } 215 216 @Override 217 public void start() throws Exception { 218 super.start(); 219 LOG.info("Starting StatisticsBroker"); 220 } 221 222 @Override 223 public void stop() throws Exception { 224 super.stop(); 225 } 226 227 protected void sendSubStats(ConnectionContext context, ObjectName[] subscribers, ActiveMQDestination replyTo) throws Exception { 228 for (int i = 0; i < subscribers.length; i++) { 229 ObjectName name = subscribers[i]; 230 SubscriptionViewMBean subscriber = (SubscriptionViewMBean)getBrokerService().getManagementContext().newProxyInstance(name, SubscriptionViewMBean.class, true); 231 ActiveMQMapMessage statsMessage = prepareSubscriptionMessage(subscriber); 232 sendStats(context, statsMessage, replyTo); 233 } 234 } 235 236 protected ActiveMQMapMessage prepareSubscriptionMessage(SubscriptionViewMBean subscriber) throws JMSException { 237 Broker regionBroker = getBrokerService().getRegionBroker(); 238 ActiveMQMapMessage statsMessage = new ActiveMQMapMessage(); 239 statsMessage.setString("brokerName", regionBroker.getBrokerName()); 240 statsMessage.setString("brokerId", regionBroker.getBrokerId().toString()); 241 statsMessage.setString("destinationName", subscriber.getDestinationName()); 242 statsMessage.setString("clientId", subscriber.getClientId()); 243 statsMessage.setString("connectionId", subscriber.getConnectionId()); 244 statsMessage.setLong("sessionId", subscriber.getSessionId()); 245 statsMessage.setString("selector", subscriber.getSelector()); 246 statsMessage.setLong("enqueueCounter", subscriber.getEnqueueCounter()); 247 statsMessage.setLong("dequeueCounter", subscriber.getDequeueCounter()); 248 statsMessage.setLong("dispatchedCounter", subscriber.getDispatchedCounter()); 249 statsMessage.setLong("dispatchedQueueSize", subscriber.getDispatchedQueueSize()); 250 statsMessage.setInt("prefetchSize", subscriber.getPrefetchSize()); 251 statsMessage.setInt("maximumPendingMessageLimit", subscriber.getMaximumPendingMessageLimit()); 252 statsMessage.setBoolean("exclusive", subscriber.isExclusive()); 253 statsMessage.setBoolean("retroactive", subscriber.isRetroactive()); 254 statsMessage.setBoolean("slowConsumer", subscriber.isSlowConsumer()); 255 return statsMessage; 256 } 257 258 protected void sendStats(ConnectionContext context, ActiveMQMapMessage msg, ActiveMQDestination replyTo) 259 throws Exception { 260 msg.setPersistent(false); 261 msg.setTimestamp(System.currentTimeMillis()); 262 msg.setPriority((byte) javax.jms.Message.DEFAULT_PRIORITY); 263 msg.setType(AdvisorySupport.ADIVSORY_MESSAGE_TYPE); 264 msg.setMessageId(new MessageId(this.advisoryProducerId, this.messageIdGenerator.getNextSequenceId())); 265 msg.setDestination(replyTo); 266 msg.setResponseRequired(false); 267 msg.setProducerId(this.advisoryProducerId); 268 boolean originalFlowControl = context.isProducerFlowControl(); 269 final ProducerBrokerExchange producerExchange = new ProducerBrokerExchange(); 270 producerExchange.setConnectionContext(context); 271 producerExchange.setMutable(true); 272 producerExchange.setProducerState(new ProducerState(new ProducerInfo())); 273 try { 274 context.setProducerFlowControl(false); 275 this.next.send(producerExchange, msg); 276 } finally { 277 context.setProducerFlowControl(originalFlowControl); 278 } 279 } 280 281}