001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker.jmx; 018 019import java.io.IOException; 020import java.net.URISyntaxException; 021import java.util.ArrayList; 022import java.util.Collections; 023import java.util.HashMap; 024import java.util.Iterator; 025import java.util.List; 026import java.util.Map; 027import java.util.Map.Entry; 028 029import javax.jms.Connection; 030import javax.jms.InvalidSelectorException; 031import javax.jms.MessageProducer; 032import javax.jms.Session; 033import javax.management.MalformedObjectNameException; 034import javax.management.ObjectName; 035import javax.management.openmbean.CompositeData; 036import javax.management.openmbean.CompositeDataSupport; 037import javax.management.openmbean.CompositeType; 038import javax.management.openmbean.OpenDataException; 039import javax.management.openmbean.TabularData; 040import javax.management.openmbean.TabularDataSupport; 041import javax.management.openmbean.TabularType; 042 043import org.apache.activemq.ActiveMQConnectionFactory; 044import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory; 045import org.apache.activemq.broker.region.Destination; 046import org.apache.activemq.broker.region.Subscription; 047import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy; 048import org.apache.activemq.broker.region.policy.SlowConsumerStrategy; 049import org.apache.activemq.command.ActiveMQDestination; 050import org.apache.activemq.command.ActiveMQMessage; 051import org.apache.activemq.command.ActiveMQTextMessage; 052import org.apache.activemq.command.Message; 053import org.apache.activemq.filter.BooleanExpression; 054import org.apache.activemq.filter.MessageEvaluationContext; 055import org.apache.activemq.selector.SelectorParser; 056import org.apache.activemq.store.MessageStore; 057import org.apache.activemq.util.URISupport; 058import org.slf4j.Logger; 059import org.slf4j.LoggerFactory; 060 061public class DestinationView implements DestinationViewMBean { 062 private static final Logger LOG = LoggerFactory.getLogger(DestinationViewMBean.class); 063 protected final Destination destination; 064 protected final ManagedRegionBroker broker; 065 066 public DestinationView(ManagedRegionBroker broker, Destination destination) { 067 this.broker = broker; 068 this.destination = destination; 069 } 070 071 public void gc() { 072 destination.gc(); 073 } 074 075 @Override 076 public String getName() { 077 return destination.getName(); 078 } 079 080 @Override 081 public void resetStatistics() { 082 destination.getDestinationStatistics().reset(); 083 } 084 085 @Override 086 public long getEnqueueCount() { 087 return destination.getDestinationStatistics().getEnqueues().getCount(); 088 } 089 090 @Override 091 public long getDequeueCount() { 092 return destination.getDestinationStatistics().getDequeues().getCount(); 093 } 094 095 @Override 096 public long getForwardCount() { 097 return destination.getDestinationStatistics().getForwards().getCount(); 098 } 099 100 @Override 101 public long getDispatchCount() { 102 return destination.getDestinationStatistics().getDispatched().getCount(); 103 } 104 105 @Override 106 public long getInFlightCount() { 107 return destination.getDestinationStatistics().getInflight().getCount(); 108 } 109 110 @Override 111 public long getExpiredCount() { 112 return destination.getDestinationStatistics().getExpired().getCount(); 113 } 114 115 @Override 116 public long getConsumerCount() { 117 return destination.getDestinationStatistics().getConsumers().getCount(); 118 } 119 120 @Override 121 public long getQueueSize() { 122 return destination.getDestinationStatistics().getMessages().getCount(); 123 } 124 125 @Override 126 public long getStoreMessageSize() { 127 MessageStore messageStore = destination.getMessageStore(); 128 return messageStore != null ? messageStore.getMessageStoreStatistics().getMessageSize().getTotalSize() : 0; 129 } 130 131 public long getMessagesCached() { 132 return destination.getDestinationStatistics().getMessagesCached().getCount(); 133 } 134 135 @Override 136 public int getMemoryPercentUsage() { 137 return destination.getMemoryUsage().getPercentUsage(); 138 } 139 140 @Override 141 public long getMemoryUsageByteCount() { 142 return destination.getMemoryUsage().getUsage(); 143 } 144 145 @Override 146 public long getMemoryLimit() { 147 return destination.getMemoryUsage().getLimit(); 148 } 149 150 @Override 151 public void setMemoryLimit(long limit) { 152 destination.getMemoryUsage().setLimit(limit); 153 } 154 155 @Override 156 public double getAverageEnqueueTime() { 157 return destination.getDestinationStatistics().getProcessTime().getAverageTime(); 158 } 159 160 @Override 161 public long getMaxEnqueueTime() { 162 return destination.getDestinationStatistics().getProcessTime().getMaxTime(); 163 } 164 165 @Override 166 public long getMinEnqueueTime() { 167 return destination.getDestinationStatistics().getProcessTime().getMinTime(); 168 } 169 170 /** 171 * @return the average size of a message (bytes) 172 */ 173 @Override 174 public long getAverageMessageSize() { 175 // we are okay with the size without decimals so cast to long 176 return (long) destination.getDestinationStatistics().getMessageSize().getAverageSize(); 177 } 178 179 /** 180 * @return the max size of a message (bytes) 181 */ 182 @Override 183 public long getMaxMessageSize() { 184 return destination.getDestinationStatistics().getMessageSize().getMaxSize(); 185 } 186 187 /** 188 * @return the min size of a message (bytes) 189 */ 190 @Override 191 public long getMinMessageSize() { 192 return destination.getDestinationStatistics().getMessageSize().getMinSize(); 193 } 194 195 196 @Override 197 public boolean isPrioritizedMessages() { 198 return destination.isPrioritizedMessages(); 199 } 200 201 @Override 202 public CompositeData[] browse() throws OpenDataException { 203 try { 204 return browse(null); 205 } catch (InvalidSelectorException e) { 206 // should not happen. 207 throw new RuntimeException(e); 208 } 209 } 210 211 @Override 212 public CompositeData[] browse(String selector) throws OpenDataException, InvalidSelectorException { 213 Message[] messages = destination.browse(); 214 ArrayList<CompositeData> c = new ArrayList<CompositeData>(); 215 216 MessageEvaluationContext ctx = new MessageEvaluationContext(); 217 ctx.setDestination(destination.getActiveMQDestination()); 218 BooleanExpression selectorExpression = selector == null ? null : SelectorParser.parse(selector); 219 220 for (int i = 0; i < messages.length; i++) { 221 try { 222 223 if (selectorExpression == null) { 224 c.add(OpenTypeSupport.convert(messages[i])); 225 } else { 226 ctx.setMessageReference(messages[i]); 227 if (selectorExpression.matches(ctx)) { 228 c.add(OpenTypeSupport.convert(messages[i])); 229 } 230 } 231 232 } catch (Throwable e) { 233 LOG.warn("exception browsing destination", e); 234 } 235 } 236 237 CompositeData rc[] = new CompositeData[c.size()]; 238 c.toArray(rc); 239 return rc; 240 } 241 242 /** 243 * Browses the current destination returning a list of messages 244 */ 245 @Override 246 public List<Object> browseMessages() throws InvalidSelectorException { 247 return browseMessages(null); 248 } 249 250 /** 251 * Browses the current destination with the given selector returning a list 252 * of messages 253 */ 254 @Override 255 public List<Object> browseMessages(String selector) throws InvalidSelectorException { 256 Message[] messages = destination.browse(); 257 ArrayList<Object> answer = new ArrayList<Object>(); 258 259 MessageEvaluationContext ctx = new MessageEvaluationContext(); 260 ctx.setDestination(destination.getActiveMQDestination()); 261 BooleanExpression selectorExpression = selector == null ? null : SelectorParser.parse(selector); 262 263 for (int i = 0; i < messages.length; i++) { 264 try { 265 Message message = messages[i]; 266 message.setReadOnlyBody(true); 267 if (selectorExpression == null) { 268 answer.add(message); 269 } else { 270 ctx.setMessageReference(message); 271 if (selectorExpression.matches(ctx)) { 272 answer.add(message); 273 } 274 } 275 276 } catch (Throwable e) { 277 LOG.warn("exception browsing destination", e); 278 } 279 } 280 return answer; 281 } 282 283 @Override 284 public TabularData browseAsTable() throws OpenDataException { 285 try { 286 return browseAsTable(null); 287 } catch (InvalidSelectorException e) { 288 throw new RuntimeException(e); 289 } 290 } 291 292 @Override 293 public TabularData browseAsTable(String selector) throws OpenDataException, InvalidSelectorException { 294 OpenTypeFactory factory = OpenTypeSupport.getFactory(ActiveMQMessage.class); 295 Message[] messages = destination.browse(); 296 CompositeType ct = factory.getCompositeType(); 297 TabularType tt = new TabularType("MessageList", "MessageList", ct, new String[] { "JMSMessageID" }); 298 TabularDataSupport rc = new TabularDataSupport(tt); 299 300 MessageEvaluationContext ctx = new MessageEvaluationContext(); 301 ctx.setDestination(destination.getActiveMQDestination()); 302 BooleanExpression selectorExpression = selector == null ? null : SelectorParser.parse(selector); 303 304 for (int i = 0; i < messages.length; i++) { 305 try { 306 if (selectorExpression == null) { 307 rc.put(new CompositeDataSupport(ct, factory.getFields(messages[i]))); 308 } else { 309 ctx.setMessageReference(messages[i]); 310 if (selectorExpression.matches(ctx)) { 311 rc.put(new CompositeDataSupport(ct, factory.getFields(messages[i]))); 312 } 313 } 314 } catch (Throwable e) { 315 LOG.warn("exception browsing destination", e); 316 } 317 } 318 319 return rc; 320 } 321 322 @Override 323 public String sendTextMessageWithProperties(String properties) throws Exception { 324 String[] kvs = properties.split(","); 325 Map<String, String> props = new HashMap<String, String>(); 326 for (String kv : kvs) { 327 String[] it = kv.split("="); 328 if (it.length == 2) { 329 props.put(it[0],it[1]); 330 } 331 } 332 return sendTextMessage(props, props.remove("body"), props.remove("username"), props.remove("password")); 333 } 334 335 @Override 336 public String sendTextMessage(String body) throws Exception { 337 return sendTextMessage(Collections.EMPTY_MAP, body); 338 } 339 340 @Override 341 public String sendTextMessage(Map headers, String body) throws Exception { 342 return sendTextMessage(headers, body, null, null); 343 } 344 345 @Override 346 public String sendTextMessage(String body, String user, @Sensitive String password) throws Exception { 347 return sendTextMessage(Collections.EMPTY_MAP, body, user, password); 348 } 349 350 @Override 351 public String sendTextMessage(Map<String, String> headers, String body, String userName, @Sensitive String password) throws Exception { 352 353 String brokerUrl = "vm://" + broker.getBrokerName(); 354 ActiveMQDestination dest = destination.getActiveMQDestination(); 355 356 ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(brokerUrl); 357 Connection connection = null; 358 try { 359 connection = cf.createConnection(userName, password); 360 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 361 MessageProducer producer = session.createProducer(dest); 362 ActiveMQTextMessage msg = (ActiveMQTextMessage) session.createTextMessage(body); 363 364 for (Iterator<Entry<String, String>> iter = headers.entrySet().iterator(); iter.hasNext();) { 365 Entry<String, String> entry = iter.next(); 366 msg.setObjectProperty(entry.getKey(), entry.getValue()); 367 } 368 369 producer.setDeliveryMode(msg.getJMSDeliveryMode()); 370 producer.setPriority(msg.getPriority()); 371 long ttl = 0; 372 if (msg.getExpiration() != 0) { 373 ttl = msg.getExpiration() - System.currentTimeMillis(); 374 } else { 375 String timeToLive = headers.get("timeToLive"); 376 if (timeToLive != null) { 377 ttl = Integer.valueOf(timeToLive); 378 } 379 } 380 producer.setTimeToLive(ttl > 0 ? ttl : 0); 381 producer.send(msg); 382 383 return msg.getJMSMessageID(); 384 385 } finally { 386 if (connection != null) { 387 connection.close(); 388 } 389 } 390 } 391 392 @Override 393 public int getMaxAuditDepth() { 394 return destination.getMaxAuditDepth(); 395 } 396 397 @Override 398 public int getMaxProducersToAudit() { 399 return destination.getMaxProducersToAudit(); 400 } 401 402 public boolean isEnableAudit() { 403 return destination.isEnableAudit(); 404 } 405 406 public void setEnableAudit(boolean enableAudit) { 407 destination.setEnableAudit(enableAudit); 408 } 409 410 @Override 411 public void setMaxAuditDepth(int maxAuditDepth) { 412 destination.setMaxAuditDepth(maxAuditDepth); 413 } 414 415 @Override 416 public void setMaxProducersToAudit(int maxProducersToAudit) { 417 destination.setMaxProducersToAudit(maxProducersToAudit); 418 } 419 420 @Override 421 public float getMemoryUsagePortion() { 422 return destination.getMemoryUsage().getUsagePortion(); 423 } 424 425 @Override 426 public long getProducerCount() { 427 return destination.getDestinationStatistics().getProducers().getCount(); 428 } 429 430 @Override 431 public boolean isProducerFlowControl() { 432 return destination.isProducerFlowControl(); 433 } 434 435 @Override 436 public void setMemoryUsagePortion(float value) { 437 destination.getMemoryUsage().setUsagePortion(value); 438 } 439 440 @Override 441 public void setProducerFlowControl(boolean producerFlowControl) { 442 destination.setProducerFlowControl(producerFlowControl); 443 } 444 445 @Override 446 public boolean isAlwaysRetroactive() { 447 return destination.isAlwaysRetroactive(); 448 } 449 450 @Override 451 public void setAlwaysRetroactive(boolean alwaysRetroactive) { 452 destination.setAlwaysRetroactive(alwaysRetroactive); 453 } 454 455 /** 456 * Set's the interval at which warnings about producers being blocked by 457 * resource usage will be triggered. Values of 0 or less will disable 458 * warnings 459 * 460 * @param blockedProducerWarningInterval the interval at which warning about 461 * blocked producers will be triggered. 462 */ 463 @Override 464 public void setBlockedProducerWarningInterval(long blockedProducerWarningInterval) { 465 destination.setBlockedProducerWarningInterval(blockedProducerWarningInterval); 466 } 467 468 /** 469 * 470 * @return the interval at which warning about blocked producers will be 471 * triggered. 472 */ 473 @Override 474 public long getBlockedProducerWarningInterval() { 475 return destination.getBlockedProducerWarningInterval(); 476 } 477 478 @Override 479 public int getMaxPageSize() { 480 return destination.getMaxPageSize(); 481 } 482 483 @Override 484 public void setMaxPageSize(int pageSize) { 485 destination.setMaxPageSize(pageSize); 486 } 487 488 @Override 489 public boolean isUseCache() { 490 return destination.isUseCache(); 491 } 492 493 @Override 494 public void setUseCache(boolean value) { 495 destination.setUseCache(value); 496 } 497 498 @Override 499 public ObjectName[] getSubscriptions() throws IOException, MalformedObjectNameException { 500 List<Subscription> subscriptions = destination.getConsumers(); 501 ObjectName[] answer = new ObjectName[subscriptions.size()]; 502 ObjectName brokerObjectName = broker.getBrokerService().getBrokerObjectName(); 503 int index = 0; 504 for (Subscription subscription : subscriptions) { 505 String connectionClientId = subscription.getContext().getClientId(); 506 answer[index++] = BrokerMBeanSupport.createSubscriptionName(brokerObjectName, connectionClientId, subscription.getConsumerInfo()); 507 } 508 return answer; 509 } 510 511 @Override 512 public ObjectName getSlowConsumerStrategy() throws IOException, MalformedObjectNameException { 513 ObjectName result = null; 514 SlowConsumerStrategy strategy = destination.getSlowConsumerStrategy(); 515 if (strategy != null && strategy instanceof AbortSlowConsumerStrategy) { 516 result = broker.registerSlowConsumerStrategy((AbortSlowConsumerStrategy)strategy); 517 } 518 return result; 519 } 520 521 @Override 522 public String getOptions() { 523 Map<String, String> options = destination.getActiveMQDestination().getOptions(); 524 String optionsString = ""; 525 try { 526 if (options != null) { 527 optionsString = URISupport.createQueryString(options); 528 } 529 } catch (URISyntaxException ignored) {} 530 return optionsString; 531 } 532 533 @Override 534 public boolean isDLQ() { 535 return destination.getActiveMQDestination().isDLQ(); 536 } 537 538 @Override 539 public long getBlockedSends() { 540 return destination.getDestinationStatistics().getBlockedSends().getCount(); 541 } 542 543 @Override 544 public double getAverageBlockedTime() { 545 return destination.getDestinationStatistics().getBlockedTime().getAverageTime(); 546 } 547 548 @Override 549 public long getTotalBlockedTime() { 550 return destination.getDestinationStatistics().getBlockedTime().getTotalTime(); 551 } 552 553}