001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.command; 018 019import java.beans.Transient; 020import java.io.DataInputStream; 021import java.io.DataOutputStream; 022import java.io.IOException; 023import java.io.OutputStream; 024import java.util.Collections; 025import java.util.HashMap; 026import java.util.Map; 027import java.util.concurrent.atomic.AtomicBoolean; 028import java.util.zip.DeflaterOutputStream; 029 030import javax.jms.JMSException; 031 032import org.apache.activemq.ActiveMQConnection; 033import org.apache.activemq.advisory.AdvisorySupport; 034import org.apache.activemq.broker.region.MessageReference; 035import org.apache.activemq.usage.MemoryUsage; 036import org.apache.activemq.util.ByteArrayInputStream; 037import org.apache.activemq.util.ByteArrayOutputStream; 038import org.apache.activemq.util.ByteSequence; 039import org.apache.activemq.util.MarshallingSupport; 040import org.apache.activemq.wireformat.WireFormat; 041import org.fusesource.hawtbuf.UTF8Buffer; 042 043/** 044 * Represents an ActiveMQ message 045 * 046 * @openwire:marshaller 047 * 048 */ 049public abstract class Message extends BaseCommand implements MarshallAware, MessageReference { 050 public static final String ORIGINAL_EXPIRATION = "originalExpiration"; 051 052 /** 053 * The default minimum amount of memory a message is assumed to use 054 */ 055 public static final int DEFAULT_MINIMUM_MESSAGE_SIZE = 1024; 056 057 protected MessageId messageId; 058 protected ActiveMQDestination originalDestination; 059 protected TransactionId originalTransactionId; 060 061 protected ProducerId producerId; 062 protected ActiveMQDestination destination; 063 protected TransactionId transactionId; 064 065 protected long expiration; 066 protected long timestamp; 067 protected long arrival; 068 protected long brokerInTime; 069 protected long brokerOutTime; 070 protected String correlationId; 071 protected ActiveMQDestination replyTo; 072 protected boolean persistent; 073 protected String type; 074 protected byte priority; 075 protected String groupID; 076 protected int groupSequence; 077 protected ConsumerId targetConsumerId; 078 protected boolean compressed; 079 protected String userID; 080 081 protected ByteSequence content; 082 protected ByteSequence marshalledProperties; 083 protected DataStructure dataStructure; 084 protected int redeliveryCounter; 085 086 protected int size; 087 protected Map<String, Object> properties; 088 protected boolean readOnlyProperties; 089 protected boolean readOnlyBody; 090 protected transient boolean recievedByDFBridge; 091 protected boolean droppable; 092 protected boolean jmsXGroupFirstForConsumer; 093 094 private transient short referenceCount; 095 private transient ActiveMQConnection connection; 096 transient MessageDestination regionDestination; 097 transient MemoryUsage memoryUsage; 098 transient AtomicBoolean processAsExpired = new AtomicBoolean(false); 099 100 private BrokerId[] brokerPath; 101 private BrokerId[] cluster; 102 103 public static interface MessageDestination { 104 int getMinimumMessageSize(); 105 MemoryUsage getMemoryUsage(); 106 } 107 108 public abstract Message copy(); 109 public abstract void clearBody() throws JMSException; 110 public abstract void storeContent(); 111 public abstract void storeContentAndClear(); 112 113 // useful to reduce the memory footprint of a persisted message 114 public void clearMarshalledState() throws JMSException { 115 properties = null; 116 } 117 118 protected void copy(Message copy) { 119 super.copy(copy); 120 copy.producerId = producerId; 121 copy.transactionId = transactionId; 122 copy.destination = destination; 123 copy.messageId = messageId != null ? messageId.copy() : null; 124 copy.originalDestination = originalDestination; 125 copy.originalTransactionId = originalTransactionId; 126 copy.expiration = expiration; 127 copy.timestamp = timestamp; 128 copy.correlationId = correlationId; 129 copy.replyTo = replyTo; 130 copy.persistent = persistent; 131 copy.redeliveryCounter = redeliveryCounter; 132 copy.type = type; 133 copy.priority = priority; 134 copy.size = size; 135 copy.groupID = groupID; 136 copy.userID = userID; 137 copy.groupSequence = groupSequence; 138 139 if (properties != null) { 140 copy.properties = new HashMap<String, Object>(properties); 141 142 // The new message hasn't expired, so remove this feild. 143 copy.properties.remove(ORIGINAL_EXPIRATION); 144 } else { 145 copy.properties = properties; 146 } 147 148 copy.content = copyByteSequence(content); 149 copy.marshalledProperties = copyByteSequence(marshalledProperties); 150 copy.dataStructure = dataStructure; 151 copy.readOnlyProperties = readOnlyProperties; 152 copy.readOnlyBody = readOnlyBody; 153 copy.compressed = compressed; 154 copy.recievedByDFBridge = recievedByDFBridge; 155 156 copy.arrival = arrival; 157 copy.connection = connection; 158 copy.regionDestination = regionDestination; 159 copy.brokerInTime = brokerInTime; 160 copy.brokerOutTime = brokerOutTime; 161 copy.memoryUsage=this.memoryUsage; 162 copy.brokerPath = brokerPath; 163 copy.jmsXGroupFirstForConsumer = jmsXGroupFirstForConsumer; 164 165 // lets not copy the following fields 166 // copy.targetConsumerId = targetConsumerId; 167 // copy.referenceCount = referenceCount; 168 } 169 170 private ByteSequence copyByteSequence(ByteSequence content) { 171 if (content != null) { 172 return new ByteSequence(content.getData(), content.getOffset(), content.getLength()); 173 } 174 return null; 175 } 176 177 public Object getProperty(String name) throws IOException { 178 if (properties == null) { 179 if (marshalledProperties == null) { 180 return null; 181 } 182 properties = unmarsallProperties(marshalledProperties); 183 } 184 Object result = properties.get(name); 185 if (result instanceof UTF8Buffer) { 186 result = result.toString(); 187 } 188 189 return result; 190 } 191 192 @SuppressWarnings("unchecked") 193 public Map<String, Object> getProperties() throws IOException { 194 if (properties == null) { 195 if (marshalledProperties == null) { 196 return Collections.EMPTY_MAP; 197 } 198 properties = unmarsallProperties(marshalledProperties); 199 } 200 return Collections.unmodifiableMap(properties); 201 } 202 203 public void clearProperties() { 204 marshalledProperties = null; 205 properties = null; 206 } 207 208 public void setProperty(String name, Object value) throws IOException { 209 lazyCreateProperties(); 210 properties.put(name, value); 211 } 212 213 public void removeProperty(String name) throws IOException { 214 lazyCreateProperties(); 215 properties.remove(name); 216 } 217 218 protected void lazyCreateProperties() throws IOException { 219 if (properties == null) { 220 if (marshalledProperties == null) { 221 properties = new HashMap<String, Object>(); 222 } else { 223 properties = unmarsallProperties(marshalledProperties); 224 marshalledProperties = null; 225 } 226 } else { 227 marshalledProperties = null; 228 } 229 } 230 231 private Map<String, Object> unmarsallProperties(ByteSequence marshalledProperties) throws IOException { 232 return MarshallingSupport.unmarshalPrimitiveMap(new DataInputStream(new ByteArrayInputStream(marshalledProperties))); 233 } 234 235 @Override 236 public void beforeMarshall(WireFormat wireFormat) throws IOException { 237 // Need to marshal the properties. 238 if (marshalledProperties == null && properties != null) { 239 ByteArrayOutputStream baos = new ByteArrayOutputStream(); 240 DataOutputStream os = new DataOutputStream(baos); 241 MarshallingSupport.marshalPrimitiveMap(properties, os); 242 os.close(); 243 marshalledProperties = baos.toByteSequence(); 244 } 245 } 246 247 @Override 248 public void afterMarshall(WireFormat wireFormat) throws IOException { 249 } 250 251 @Override 252 public void beforeUnmarshall(WireFormat wireFormat) throws IOException { 253 } 254 255 @Override 256 public void afterUnmarshall(WireFormat wireFormat) throws IOException { 257 } 258 259 // ///////////////////////////////////////////////////////////////// 260 // 261 // Simple Field accessors 262 // 263 // ///////////////////////////////////////////////////////////////// 264 265 /** 266 * @openwire:property version=1 cache=true 267 */ 268 public ProducerId getProducerId() { 269 return producerId; 270 } 271 272 public void setProducerId(ProducerId producerId) { 273 this.producerId = producerId; 274 } 275 276 /** 277 * @openwire:property version=1 cache=true 278 */ 279 public ActiveMQDestination getDestination() { 280 return destination; 281 } 282 283 public void setDestination(ActiveMQDestination destination) { 284 this.destination = destination; 285 } 286 287 /** 288 * @openwire:property version=1 cache=true 289 */ 290 public TransactionId getTransactionId() { 291 return transactionId; 292 } 293 294 public void setTransactionId(TransactionId transactionId) { 295 this.transactionId = transactionId; 296 } 297 298 public boolean isInTransaction() { 299 return transactionId != null; 300 } 301 302 /** 303 * @openwire:property version=1 cache=true 304 */ 305 public ActiveMQDestination getOriginalDestination() { 306 return originalDestination; 307 } 308 309 public void setOriginalDestination(ActiveMQDestination destination) { 310 this.originalDestination = destination; 311 } 312 313 /** 314 * @openwire:property version=1 315 */ 316 @Override 317 public MessageId getMessageId() { 318 return messageId; 319 } 320 321 public void setMessageId(MessageId messageId) { 322 this.messageId = messageId; 323 } 324 325 /** 326 * @openwire:property version=1 cache=true 327 */ 328 public TransactionId getOriginalTransactionId() { 329 return originalTransactionId; 330 } 331 332 public void setOriginalTransactionId(TransactionId transactionId) { 333 this.originalTransactionId = transactionId; 334 } 335 336 /** 337 * @openwire:property version=1 338 */ 339 @Override 340 public String getGroupID() { 341 return groupID; 342 } 343 344 public void setGroupID(String groupID) { 345 this.groupID = groupID; 346 } 347 348 /** 349 * @openwire:property version=1 350 */ 351 @Override 352 public int getGroupSequence() { 353 return groupSequence; 354 } 355 356 public void setGroupSequence(int groupSequence) { 357 this.groupSequence = groupSequence; 358 } 359 360 /** 361 * @openwire:property version=1 362 */ 363 public String getCorrelationId() { 364 return correlationId; 365 } 366 367 public void setCorrelationId(String correlationId) { 368 this.correlationId = correlationId; 369 } 370 371 /** 372 * @openwire:property version=1 373 */ 374 @Override 375 public boolean isPersistent() { 376 return persistent; 377 } 378 379 public void setPersistent(boolean deliveryMode) { 380 this.persistent = deliveryMode; 381 } 382 383 /** 384 * @openwire:property version=1 385 */ 386 @Override 387 public long getExpiration() { 388 return expiration; 389 } 390 391 public void setExpiration(long expiration) { 392 this.expiration = expiration; 393 } 394 395 /** 396 * @openwire:property version=1 397 */ 398 public byte getPriority() { 399 return priority; 400 } 401 402 public void setPriority(byte priority) { 403 if (priority < 0) { 404 this.priority = 0; 405 } else if (priority > 9) { 406 this.priority = 9; 407 } else { 408 this.priority = priority; 409 } 410 } 411 412 /** 413 * @openwire:property version=1 414 */ 415 public ActiveMQDestination getReplyTo() { 416 return replyTo; 417 } 418 419 public void setReplyTo(ActiveMQDestination replyTo) { 420 this.replyTo = replyTo; 421 } 422 423 /** 424 * @openwire:property version=1 425 */ 426 public long getTimestamp() { 427 return timestamp; 428 } 429 430 public void setTimestamp(long timestamp) { 431 this.timestamp = timestamp; 432 } 433 434 /** 435 * @openwire:property version=1 436 */ 437 public String getType() { 438 return type; 439 } 440 441 public void setType(String type) { 442 this.type = type; 443 } 444 445 /** 446 * @openwire:property version=1 447 */ 448 public ByteSequence getContent() { 449 return content; 450 } 451 452 public void setContent(ByteSequence content) { 453 this.content = content; 454 } 455 456 /** 457 * @openwire:property version=1 458 */ 459 public ByteSequence getMarshalledProperties() { 460 return marshalledProperties; 461 } 462 463 public void setMarshalledProperties(ByteSequence marshalledProperties) { 464 this.marshalledProperties = marshalledProperties; 465 } 466 467 /** 468 * @openwire:property version=1 469 */ 470 public DataStructure getDataStructure() { 471 return dataStructure; 472 } 473 474 public void setDataStructure(DataStructure data) { 475 this.dataStructure = data; 476 } 477 478 /** 479 * Can be used to route the message to a specific consumer. Should be null 480 * to allow the broker use normal JMS routing semantics. If the target 481 * consumer id is an active consumer on the broker, the message is dropped. 482 * Used by the AdvisoryBroker to replay advisory messages to a specific 483 * consumer. 484 * 485 * @openwire:property version=1 cache=true 486 */ 487 @Override 488 public ConsumerId getTargetConsumerId() { 489 return targetConsumerId; 490 } 491 492 public void setTargetConsumerId(ConsumerId targetConsumerId) { 493 this.targetConsumerId = targetConsumerId; 494 } 495 496 @Override 497 public boolean isExpired() { 498 long expireTime = getExpiration(); 499 return expireTime > 0 && System.currentTimeMillis() > expireTime; 500 } 501 502 @Override 503 public boolean isAdvisory() { 504 return type != null && type.equals(AdvisorySupport.ADIVSORY_MESSAGE_TYPE); 505 } 506 507 /** 508 * @openwire:property version=1 509 */ 510 public boolean isCompressed() { 511 return compressed; 512 } 513 514 public void setCompressed(boolean compressed) { 515 this.compressed = compressed; 516 } 517 518 public boolean isRedelivered() { 519 return redeliveryCounter > 0; 520 } 521 522 public void setRedelivered(boolean redelivered) { 523 if (redelivered) { 524 if (!isRedelivered()) { 525 setRedeliveryCounter(1); 526 } 527 } else { 528 if (isRedelivered()) { 529 setRedeliveryCounter(0); 530 } 531 } 532 } 533 534 @Override 535 public void incrementRedeliveryCounter() { 536 redeliveryCounter++; 537 } 538 539 /** 540 * @openwire:property version=1 541 */ 542 @Override 543 public int getRedeliveryCounter() { 544 return redeliveryCounter; 545 } 546 547 public void setRedeliveryCounter(int deliveryCounter) { 548 this.redeliveryCounter = deliveryCounter; 549 } 550 551 /** 552 * The route of brokers the command has moved through. 553 * 554 * @openwire:property version=1 cache=true 555 */ 556 public BrokerId[] getBrokerPath() { 557 return brokerPath; 558 } 559 560 public void setBrokerPath(BrokerId[] brokerPath) { 561 this.brokerPath = brokerPath; 562 } 563 564 public boolean isReadOnlyProperties() { 565 return readOnlyProperties; 566 } 567 568 public void setReadOnlyProperties(boolean readOnlyProperties) { 569 this.readOnlyProperties = readOnlyProperties; 570 } 571 572 public boolean isReadOnlyBody() { 573 return readOnlyBody; 574 } 575 576 public void setReadOnlyBody(boolean readOnlyBody) { 577 this.readOnlyBody = readOnlyBody; 578 } 579 580 public ActiveMQConnection getConnection() { 581 return this.connection; 582 } 583 584 public void setConnection(ActiveMQConnection connection) { 585 this.connection = connection; 586 } 587 588 /** 589 * Used to schedule the arrival time of a message to a broker. The broker 590 * will not dispatch a message to a consumer until it's arrival time has 591 * elapsed. 592 * 593 * @openwire:property version=1 594 */ 595 public long getArrival() { 596 return arrival; 597 } 598 599 public void setArrival(long arrival) { 600 this.arrival = arrival; 601 } 602 603 /** 604 * Only set by the broker and defines the userID of the producer connection 605 * who sent this message. This is an optional field, it needs to be enabled 606 * on the broker to have this field populated. 607 * 608 * @openwire:property version=1 609 */ 610 public String getUserID() { 611 return userID; 612 } 613 614 public void setUserID(String jmsxUserID) { 615 this.userID = jmsxUserID; 616 } 617 618 @Override 619 public int getReferenceCount() { 620 return referenceCount; 621 } 622 623 @Override 624 public Message getMessageHardRef() { 625 return this; 626 } 627 628 @Override 629 public Message getMessage() { 630 return this; 631 } 632 633 public void setRegionDestination(MessageDestination destination) { 634 this.regionDestination = destination; 635 if(this.memoryUsage==null) { 636 this.memoryUsage=destination.getMemoryUsage(); 637 } 638 } 639 640 @Override 641 @Transient 642 public MessageDestination getRegionDestination() { 643 return regionDestination; 644 } 645 646 public MemoryUsage getMemoryUsage() { 647 return this.memoryUsage; 648 } 649 650 public void setMemoryUsage(MemoryUsage usage) { 651 this.memoryUsage=usage; 652 } 653 654 @Override 655 public boolean isMarshallAware() { 656 return true; 657 } 658 659 @Override 660 public int incrementReferenceCount() { 661 int rc; 662 int size; 663 synchronized (this) { 664 rc = ++referenceCount; 665 size = getSize(); 666 } 667 668 if (rc == 1 && getMemoryUsage() != null) { 669 getMemoryUsage().increaseUsage(size); 670 //System.err.println("INCREASE USAGE " + System.identityHashCode(getMemoryUsage()) + " PERCENT = " + getMemoryUsage().getPercentUsage()); 671 672 } 673 674 //System.out.println(" + "+getMemoryUsage().getName()+" :::: "+getMessageId()+"rc="+rc); 675 return rc; 676 } 677 678 @Override 679 public int decrementReferenceCount() { 680 int rc; 681 int size; 682 synchronized (this) { 683 rc = --referenceCount; 684 size = getSize(); 685 } 686 687 if (rc == 0 && getMemoryUsage() != null) { 688 getMemoryUsage().decreaseUsage(size); 689 //Thread.dumpStack(); 690 //System.err.println("DECREADED USAGE " + System.identityHashCode(getMemoryUsage()) + " PERCENT = " + getMemoryUsage().getPercentUsage()); 691 } 692 693 //System.out.println(" - "+getMemoryUsage().getName()+" :::: "+getMessageId()+"rc="+rc); 694 695 return rc; 696 } 697 698 @Override 699 public int getSize() { 700 int minimumMessageSize = getMinimumMessageSize(); 701 if (size < minimumMessageSize || size == 0) { 702 size = minimumMessageSize; 703 if (marshalledProperties != null) { 704 size += marshalledProperties.getLength(); 705 } 706 if (content != null) { 707 size += content.getLength(); 708 } 709 } 710 return size; 711 } 712 713 protected int getMinimumMessageSize() { 714 int result = DEFAULT_MINIMUM_MESSAGE_SIZE; 715 //let destination override 716 MessageDestination dest = regionDestination; 717 if (dest != null) { 718 result=dest.getMinimumMessageSize(); 719 } 720 return result; 721 } 722 723 /** 724 * @openwire:property version=1 725 * @return Returns the recievedByDFBridge. 726 */ 727 public boolean isRecievedByDFBridge() { 728 return recievedByDFBridge; 729 } 730 731 /** 732 * @param recievedByDFBridge The recievedByDFBridge to set. 733 */ 734 public void setRecievedByDFBridge(boolean recievedByDFBridge) { 735 this.recievedByDFBridge = recievedByDFBridge; 736 } 737 738 public void onMessageRolledBack() { 739 incrementRedeliveryCounter(); 740 } 741 742 /** 743 * @openwire:property version=2 cache=true 744 */ 745 public boolean isDroppable() { 746 return droppable; 747 } 748 749 public void setDroppable(boolean droppable) { 750 this.droppable = droppable; 751 } 752 753 /** 754 * If a message is stored in multiple nodes on a cluster, all the cluster 755 * members will be listed here. Otherwise, it will be null. 756 * 757 * @openwire:property version=3 cache=true 758 */ 759 public BrokerId[] getCluster() { 760 return cluster; 761 } 762 763 public void setCluster(BrokerId[] cluster) { 764 this.cluster = cluster; 765 } 766 767 @Override 768 public boolean isMessage() { 769 return true; 770 } 771 772 /** 773 * @openwire:property version=3 774 */ 775 public long getBrokerInTime() { 776 return this.brokerInTime; 777 } 778 779 public void setBrokerInTime(long brokerInTime) { 780 this.brokerInTime = brokerInTime; 781 } 782 783 /** 784 * @openwire:property version=3 785 */ 786 public long getBrokerOutTime() { 787 return this.brokerOutTime; 788 } 789 790 public void setBrokerOutTime(long brokerOutTime) { 791 this.brokerOutTime = brokerOutTime; 792 } 793 794 @Override 795 public boolean isDropped() { 796 return false; 797 } 798 799 /** 800 * @openwire:property version=10 801 */ 802 public boolean isJMSXGroupFirstForConsumer() { 803 return jmsXGroupFirstForConsumer; 804 } 805 806 public void setJMSXGroupFirstForConsumer(boolean val) { 807 jmsXGroupFirstForConsumer = val; 808 } 809 810 public void compress() throws IOException { 811 if (!isCompressed()) { 812 storeContent(); 813 if (!isCompressed() && getContent() != null) { 814 doCompress(); 815 } 816 } 817 } 818 819 protected void doCompress() throws IOException { 820 compressed = true; 821 ByteSequence bytes = getContent(); 822 ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(); 823 OutputStream os = new DeflaterOutputStream(bytesOut); 824 os.write(bytes.data, bytes.offset, bytes.length); 825 os.close(); 826 setContent(bytesOut.toByteSequence()); 827 } 828 829 @Override 830 public String toString() { 831 return toString(null); 832 } 833 834 @Override 835 public String toString(Map<String, Object>overrideFields) { 836 try { 837 getProperties(); 838 } catch (IOException e) { 839 } 840 return super.toString(overrideFields); 841 } 842 843 @Override 844 public boolean canProcessAsExpired() { 845 return processAsExpired.compareAndSet(false, true); 846 } 847}