001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.store.kahadb; 018 019import java.io.DataInputStream; 020import java.io.IOException; 021import java.io.InterruptedIOException; 022import java.util.ArrayList; 023import java.util.HashMap; 024import java.util.HashSet; 025import java.util.Iterator; 026import java.util.LinkedList; 027import java.util.List; 028import java.util.Map; 029import java.util.Map.Entry; 030import java.util.Set; 031import java.util.concurrent.BlockingQueue; 032import java.util.concurrent.ExecutorService; 033import java.util.concurrent.FutureTask; 034import java.util.concurrent.LinkedBlockingQueue; 035import java.util.concurrent.Semaphore; 036import java.util.concurrent.ThreadFactory; 037import java.util.concurrent.ThreadPoolExecutor; 038import java.util.concurrent.TimeUnit; 039import java.util.concurrent.TimeoutException; 040import java.util.concurrent.atomic.AtomicBoolean; 041import java.util.concurrent.atomic.AtomicInteger; 042 043import org.apache.activemq.broker.ConnectionContext; 044import org.apache.activemq.broker.region.BaseDestination; 045import org.apache.activemq.broker.scheduler.JobSchedulerStore; 046import org.apache.activemq.command.ActiveMQDestination; 047import org.apache.activemq.command.ActiveMQQueue; 048import org.apache.activemq.command.ActiveMQTempQueue; 049import org.apache.activemq.command.ActiveMQTempTopic; 050import org.apache.activemq.command.ActiveMQTopic; 051import org.apache.activemq.command.Message; 052import org.apache.activemq.command.MessageAck; 053import org.apache.activemq.command.MessageId; 054import org.apache.activemq.command.ProducerId; 055import org.apache.activemq.command.SubscriptionInfo; 056import org.apache.activemq.command.TransactionId; 057import org.apache.activemq.openwire.OpenWireFormat; 058import org.apache.activemq.protobuf.Buffer; 059import org.apache.activemq.store.AbstractMessageStore; 060import org.apache.activemq.store.IndexListener; 061import org.apache.activemq.store.ListenableFuture; 062import org.apache.activemq.store.MessageRecoveryListener; 063import org.apache.activemq.store.MessageStore; 064import org.apache.activemq.store.MessageStoreStatistics; 065import org.apache.activemq.store.MessageStoreSubscriptionStatistics; 066import org.apache.activemq.store.NoLocalSubscriptionAware; 067import org.apache.activemq.store.PersistenceAdapter; 068import org.apache.activemq.store.TopicMessageStore; 069import org.apache.activemq.store.TransactionIdTransformer; 070import org.apache.activemq.store.TransactionStore; 071import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand; 072import org.apache.activemq.store.kahadb.data.KahaDestination; 073import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType; 074import org.apache.activemq.store.kahadb.data.KahaLocation; 075import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand; 076import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand; 077import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand; 078import org.apache.activemq.store.kahadb.data.KahaUpdateMessageCommand; 079import org.apache.activemq.store.kahadb.disk.journal.Location; 080import org.apache.activemq.store.kahadb.disk.page.Transaction; 081import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl; 082import org.apache.activemq.usage.MemoryUsage; 083import org.apache.activemq.usage.SystemUsage; 084import org.apache.activemq.util.ServiceStopper; 085import org.apache.activemq.util.ThreadPoolUtils; 086import org.apache.activemq.wireformat.WireFormat; 087import org.slf4j.Logger; 088import org.slf4j.LoggerFactory; 089 090public class KahaDBStore extends MessageDatabase implements PersistenceAdapter, NoLocalSubscriptionAware { 091 static final Logger LOG = LoggerFactory.getLogger(KahaDBStore.class); 092 private static final int MAX_ASYNC_JOBS = BaseDestination.MAX_AUDIT_DEPTH; 093 094 public static final String PROPERTY_CANCELED_TASK_MOD_METRIC = "org.apache.activemq.store.kahadb.CANCELED_TASK_MOD_METRIC"; 095 public static final int cancelledTaskModMetric = Integer.parseInt(System.getProperty( 096 PROPERTY_CANCELED_TASK_MOD_METRIC, "0"), 10); 097 public static final String PROPERTY_ASYNC_EXECUTOR_MAX_THREADS = "org.apache.activemq.store.kahadb.ASYNC_EXECUTOR_MAX_THREADS"; 098 private static final int asyncExecutorMaxThreads = Integer.parseInt(System.getProperty( 099 PROPERTY_ASYNC_EXECUTOR_MAX_THREADS, "1"), 10);; 100 101 protected ExecutorService queueExecutor; 102 protected ExecutorService topicExecutor; 103 protected final List<Map<AsyncJobKey, StoreTask>> asyncQueueMaps = new LinkedList<Map<AsyncJobKey, StoreTask>>(); 104 protected final List<Map<AsyncJobKey, StoreTask>> asyncTopicMaps = new LinkedList<Map<AsyncJobKey, StoreTask>>(); 105 final WireFormat wireFormat = new OpenWireFormat(); 106 private SystemUsage usageManager; 107 private LinkedBlockingQueue<Runnable> asyncQueueJobQueue; 108 private LinkedBlockingQueue<Runnable> asyncTopicJobQueue; 109 Semaphore globalQueueSemaphore; 110 Semaphore globalTopicSemaphore; 111 private boolean concurrentStoreAndDispatchQueues = true; 112 // when true, message order may be compromised when cache is exhausted if store is out 113 // or order w.r.t cache 114 private boolean concurrentStoreAndDispatchTopics = false; 115 private final boolean concurrentStoreAndDispatchTransactions = false; 116 private int maxAsyncJobs = MAX_ASYNC_JOBS; 117 private final KahaDBTransactionStore transactionStore; 118 private TransactionIdTransformer transactionIdTransformer; 119 120 public KahaDBStore() { 121 this.transactionStore = new KahaDBTransactionStore(this); 122 this.transactionIdTransformer = new TransactionIdTransformer() { 123 @Override 124 public TransactionId transform(TransactionId txid) { 125 return txid; 126 } 127 }; 128 } 129 130 @Override 131 public String toString() { 132 return "KahaDB:[" + directory.getAbsolutePath() + "]"; 133 } 134 135 @Override 136 public void setBrokerName(String brokerName) { 137 } 138 139 @Override 140 public void setUsageManager(SystemUsage usageManager) { 141 this.usageManager = usageManager; 142 } 143 144 public SystemUsage getUsageManager() { 145 return this.usageManager; 146 } 147 148 /** 149 * @return the concurrentStoreAndDispatch 150 */ 151 public boolean isConcurrentStoreAndDispatchQueues() { 152 return this.concurrentStoreAndDispatchQueues; 153 } 154 155 /** 156 * @param concurrentStoreAndDispatch 157 * the concurrentStoreAndDispatch to set 158 */ 159 public void setConcurrentStoreAndDispatchQueues(boolean concurrentStoreAndDispatch) { 160 this.concurrentStoreAndDispatchQueues = concurrentStoreAndDispatch; 161 } 162 163 /** 164 * @return the concurrentStoreAndDispatch 165 */ 166 public boolean isConcurrentStoreAndDispatchTopics() { 167 return this.concurrentStoreAndDispatchTopics; 168 } 169 170 /** 171 * @param concurrentStoreAndDispatch 172 * the concurrentStoreAndDispatch to set 173 */ 174 public void setConcurrentStoreAndDispatchTopics(boolean concurrentStoreAndDispatch) { 175 this.concurrentStoreAndDispatchTopics = concurrentStoreAndDispatch; 176 } 177 178 public boolean isConcurrentStoreAndDispatchTransactions() { 179 return this.concurrentStoreAndDispatchTransactions; 180 } 181 182 /** 183 * @return the maxAsyncJobs 184 */ 185 public int getMaxAsyncJobs() { 186 return this.maxAsyncJobs; 187 } 188 189 /** 190 * @param maxAsyncJobs 191 * the maxAsyncJobs to set 192 */ 193 public void setMaxAsyncJobs(int maxAsyncJobs) { 194 this.maxAsyncJobs = maxAsyncJobs; 195 } 196 197 198 @Override 199 protected void configureMetadata() { 200 if (brokerService != null) { 201 metadata.openwireVersion = brokerService.getStoreOpenWireVersion(); 202 wireFormat.setVersion(metadata.openwireVersion); 203 204 if (LOG.isDebugEnabled()) { 205 LOG.debug("Store OpenWire version configured as: {}", metadata.openwireVersion); 206 } 207 208 } 209 } 210 211 @Override 212 public void doStart() throws Exception { 213 //configure the metadata before start, right now 214 //this is just the open wire version 215 configureMetadata(); 216 217 super.doStart(); 218 219 if (brokerService != null) { 220 // In case the recovered store used a different OpenWire version log a warning 221 // to assist in determining why journal reads fail. 222 if (metadata.openwireVersion != brokerService.getStoreOpenWireVersion()) { 223 LOG.warn("Existing Store uses a different OpenWire version[{}] " + 224 "than the version configured[{}] reverting to the version " + 225 "used by this store, some newer broker features may not work" + 226 "as expected.", 227 metadata.openwireVersion, brokerService.getStoreOpenWireVersion()); 228 229 // Update the broker service instance to the actual version in use. 230 wireFormat.setVersion(metadata.openwireVersion); 231 brokerService.setStoreOpenWireVersion(metadata.openwireVersion); 232 } 233 } 234 235 this.globalQueueSemaphore = new Semaphore(getMaxAsyncJobs()); 236 this.globalTopicSemaphore = new Semaphore(getMaxAsyncJobs()); 237 this.asyncQueueJobQueue = new LinkedBlockingQueue<Runnable>(getMaxAsyncJobs()); 238 this.asyncTopicJobQueue = new LinkedBlockingQueue<Runnable>(getMaxAsyncJobs()); 239 this.queueExecutor = new StoreTaskExecutor(1, asyncExecutorMaxThreads, 0L, TimeUnit.MILLISECONDS, 240 asyncQueueJobQueue, new ThreadFactory() { 241 @Override 242 public Thread newThread(Runnable runnable) { 243 Thread thread = new Thread(runnable, "ConcurrentQueueStoreAndDispatch"); 244 thread.setDaemon(true); 245 return thread; 246 } 247 }); 248 this.topicExecutor = new StoreTaskExecutor(1, asyncExecutorMaxThreads, 0L, TimeUnit.MILLISECONDS, 249 asyncTopicJobQueue, new ThreadFactory() { 250 @Override 251 public Thread newThread(Runnable runnable) { 252 Thread thread = new Thread(runnable, "ConcurrentTopicStoreAndDispatch"); 253 thread.setDaemon(true); 254 return thread; 255 } 256 }); 257 } 258 259 @Override 260 public void doStop(ServiceStopper stopper) throws Exception { 261 // drain down async jobs 262 LOG.info("Stopping async queue tasks"); 263 if (this.globalQueueSemaphore != null) { 264 this.globalQueueSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS); 265 } 266 synchronized (this.asyncQueueMaps) { 267 for (Map<AsyncJobKey, StoreTask> m : asyncQueueMaps) { 268 synchronized (m) { 269 for (StoreTask task : m.values()) { 270 task.cancel(); 271 } 272 } 273 } 274 this.asyncQueueMaps.clear(); 275 } 276 LOG.info("Stopping async topic tasks"); 277 if (this.globalTopicSemaphore != null) { 278 this.globalTopicSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS); 279 } 280 synchronized (this.asyncTopicMaps) { 281 for (Map<AsyncJobKey, StoreTask> m : asyncTopicMaps) { 282 synchronized (m) { 283 for (StoreTask task : m.values()) { 284 task.cancel(); 285 } 286 } 287 } 288 this.asyncTopicMaps.clear(); 289 } 290 if (this.globalQueueSemaphore != null) { 291 this.globalQueueSemaphore.drainPermits(); 292 } 293 if (this.globalTopicSemaphore != null) { 294 this.globalTopicSemaphore.drainPermits(); 295 } 296 if (this.queueExecutor != null) { 297 ThreadPoolUtils.shutdownNow(queueExecutor); 298 queueExecutor = null; 299 } 300 if (this.topicExecutor != null) { 301 ThreadPoolUtils.shutdownNow(topicExecutor); 302 topicExecutor = null; 303 } 304 LOG.info("Stopped KahaDB"); 305 super.doStop(stopper); 306 } 307 308 private Location findMessageLocation(final String key, final KahaDestination destination) throws IOException { 309 return pageFile.tx().execute(new Transaction.CallableClosure<Location, IOException>() { 310 @Override 311 public Location execute(Transaction tx) throws IOException { 312 StoredDestination sd = getStoredDestination(destination, tx); 313 Long sequence = sd.messageIdIndex.get(tx, key); 314 if (sequence == null) { 315 return null; 316 } 317 return sd.orderIndex.get(tx, sequence).location; 318 } 319 }); 320 } 321 322 protected StoreQueueTask removeQueueTask(KahaDBMessageStore store, MessageId id) { 323 StoreQueueTask task = null; 324 synchronized (store.asyncTaskMap) { 325 task = (StoreQueueTask) store.asyncTaskMap.remove(new AsyncJobKey(id, store.getDestination())); 326 } 327 return task; 328 } 329 330 // with asyncTaskMap locked 331 protected void addQueueTask(KahaDBMessageStore store, StoreQueueTask task) throws IOException { 332 store.asyncTaskMap.put(new AsyncJobKey(task.getMessage().getMessageId(), store.getDestination()), task); 333 this.queueExecutor.execute(task); 334 } 335 336 protected StoreTopicTask removeTopicTask(KahaDBTopicMessageStore store, MessageId id) { 337 StoreTopicTask task = null; 338 synchronized (store.asyncTaskMap) { 339 task = (StoreTopicTask) store.asyncTaskMap.remove(new AsyncJobKey(id, store.getDestination())); 340 } 341 return task; 342 } 343 344 protected void addTopicTask(KahaDBTopicMessageStore store, StoreTopicTask task) throws IOException { 345 synchronized (store.asyncTaskMap) { 346 store.asyncTaskMap.put(new AsyncJobKey(task.getMessage().getMessageId(), store.getDestination()), task); 347 } 348 this.topicExecutor.execute(task); 349 } 350 351 @Override 352 public TransactionStore createTransactionStore() throws IOException { 353 return this.transactionStore; 354 } 355 356 public boolean getForceRecoverIndex() { 357 return this.forceRecoverIndex; 358 } 359 360 public void setForceRecoverIndex(boolean forceRecoverIndex) { 361 this.forceRecoverIndex = forceRecoverIndex; 362 } 363 364 public class KahaDBMessageStore extends AbstractMessageStore { 365 protected final Map<AsyncJobKey, StoreTask> asyncTaskMap = new HashMap<AsyncJobKey, StoreTask>(); 366 protected KahaDestination dest; 367 private final int maxAsyncJobs; 368 private final Semaphore localDestinationSemaphore; 369 370 double doneTasks, canceledTasks = 0; 371 372 public KahaDBMessageStore(ActiveMQDestination destination) { 373 super(destination); 374 this.dest = convert(destination); 375 this.maxAsyncJobs = getMaxAsyncJobs(); 376 this.localDestinationSemaphore = new Semaphore(this.maxAsyncJobs); 377 } 378 379 @Override 380 public ActiveMQDestination getDestination() { 381 return destination; 382 } 383 384 @Override 385 public ListenableFuture<Object> asyncAddQueueMessage(final ConnectionContext context, final Message message) 386 throws IOException { 387 if (isConcurrentStoreAndDispatchQueues()) { 388 message.beforeMarshall(wireFormat); 389 StoreQueueTask result = new StoreQueueTask(this, context, message); 390 ListenableFuture<Object> future = result.getFuture(); 391 message.getMessageId().setFutureOrSequenceLong(future); 392 message.setRecievedByDFBridge(true); // flag message as concurrentStoreAndDispatch 393 result.aquireLocks(); 394 synchronized (asyncTaskMap) { 395 addQueueTask(this, result); 396 if (indexListener != null) { 397 indexListener.onAdd(new IndexListener.MessageContext(context, message, null)); 398 } 399 } 400 return future; 401 } else { 402 return super.asyncAddQueueMessage(context, message); 403 } 404 } 405 406 @Override 407 public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException { 408 if (isConcurrentStoreAndDispatchQueues()) { 409 AsyncJobKey key = new AsyncJobKey(ack.getLastMessageId(), getDestination()); 410 StoreQueueTask task = null; 411 synchronized (asyncTaskMap) { 412 task = (StoreQueueTask) asyncTaskMap.get(key); 413 } 414 if (task != null) { 415 if (ack.isInTransaction() || !task.cancel()) { 416 try { 417 task.future.get(); 418 } catch (InterruptedException e) { 419 throw new InterruptedIOException(e.toString()); 420 } catch (Exception ignored) { 421 LOG.debug("removeAsync: cannot cancel, waiting for add resulted in ex", ignored); 422 } 423 removeMessage(context, ack); 424 } else { 425 indexLock.writeLock().lock(); 426 try { 427 metadata.producerSequenceIdTracker.isDuplicate(ack.getLastMessageId()); 428 } finally { 429 indexLock.writeLock().unlock(); 430 } 431 synchronized (asyncTaskMap) { 432 asyncTaskMap.remove(key); 433 } 434 } 435 } else { 436 removeMessage(context, ack); 437 } 438 } else { 439 removeMessage(context, ack); 440 } 441 } 442 443 @Override 444 public void addMessage(final ConnectionContext context, final Message message) throws IOException { 445 final KahaAddMessageCommand command = new KahaAddMessageCommand(); 446 command.setDestination(dest); 447 command.setMessageId(message.getMessageId().toProducerKey()); 448 command.setTransactionInfo(TransactionIdConversion.convert(transactionIdTransformer.transform(message.getTransactionId()))); 449 command.setPriority(message.getPriority()); 450 command.setPrioritySupported(isPrioritizedMessages()); 451 org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message); 452 command.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength())); 453 store(command, isEnableJournalDiskSyncs() && message.isResponseRequired(), new IndexAware() { 454 // sync add? (for async, future present from getFutureOrSequenceLong) 455 Object possibleFuture = message.getMessageId().getFutureOrSequenceLong(); 456 457 @Override 458 public void sequenceAssignedWithIndexLocked(final long sequence) { 459 message.getMessageId().setFutureOrSequenceLong(sequence); 460 if (indexListener != null) { 461 if (possibleFuture == null) { 462 trackPendingAdd(dest, sequence); 463 indexListener.onAdd(new IndexListener.MessageContext(context, message, new Runnable() { 464 @Override 465 public void run() { 466 trackPendingAddComplete(dest, sequence); 467 } 468 })); 469 } 470 } 471 } 472 }, null); 473 } 474 475 @Override 476 public void updateMessage(Message message) throws IOException { 477 if (LOG.isTraceEnabled()) { 478 LOG.trace("updating: " + message.getMessageId() + " with deliveryCount: " + message.getRedeliveryCounter()); 479 } 480 KahaUpdateMessageCommand updateMessageCommand = new KahaUpdateMessageCommand(); 481 KahaAddMessageCommand command = new KahaAddMessageCommand(); 482 command.setDestination(dest); 483 command.setMessageId(message.getMessageId().toProducerKey()); 484 command.setPriority(message.getPriority()); 485 command.setPrioritySupported(prioritizedMessages); 486 org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message); 487 command.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength())); 488 updateMessageCommand.setMessage(command); 489 store(updateMessageCommand, isEnableJournalDiskSyncs(), null, null); 490 } 491 492 @Override 493 public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException { 494 KahaRemoveMessageCommand command = new KahaRemoveMessageCommand(); 495 command.setDestination(dest); 496 command.setMessageId(ack.getLastMessageId().toProducerKey()); 497 command.setTransactionInfo(TransactionIdConversion.convert(transactionIdTransformer.transform(ack.getTransactionId()))); 498 499 org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(ack); 500 command.setAck(new Buffer(packet.getData(), packet.getOffset(), packet.getLength())); 501 store(command, isEnableJournalDiskSyncs() && ack.isResponseRequired(), null, null); 502 } 503 504 @Override 505 public void removeAllMessages(ConnectionContext context) throws IOException { 506 KahaRemoveDestinationCommand command = new KahaRemoveDestinationCommand(); 507 command.setDestination(dest); 508 store(command, true, null, null); 509 } 510 511 @Override 512 public Message getMessage(MessageId identity) throws IOException { 513 final String key = identity.toProducerKey(); 514 515 // Hopefully one day the page file supports concurrent read 516 // operations... but for now we must 517 // externally synchronize... 518 Location location; 519 indexLock.writeLock().lock(); 520 try { 521 location = findMessageLocation(key, dest); 522 } finally { 523 indexLock.writeLock().unlock(); 524 } 525 if (location == null) { 526 return null; 527 } 528 529 return loadMessage(location); 530 } 531 532 @Override 533 public boolean isEmpty() throws IOException { 534 indexLock.writeLock().lock(); 535 try { 536 return pageFile.tx().execute(new Transaction.CallableClosure<Boolean, IOException>() { 537 @Override 538 public Boolean execute(Transaction tx) throws IOException { 539 // Iterate through all index entries to get a count of 540 // messages in the destination. 541 StoredDestination sd = getStoredDestination(dest, tx); 542 return sd.locationIndex.isEmpty(tx); 543 } 544 }); 545 } finally { 546 indexLock.writeLock().unlock(); 547 } 548 } 549 550 @Override 551 public void recover(final MessageRecoveryListener listener) throws Exception { 552 // recovery may involve expiry which will modify 553 indexLock.writeLock().lock(); 554 try { 555 pageFile.tx().execute(new Transaction.Closure<Exception>() { 556 @Override 557 public void execute(Transaction tx) throws Exception { 558 StoredDestination sd = getStoredDestination(dest, tx); 559 recoverRolledBackAcks(sd, tx, Integer.MAX_VALUE, listener); 560 sd.orderIndex.resetCursorPosition(); 561 for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); listener.hasSpace() && iterator 562 .hasNext(); ) { 563 Entry<Long, MessageKeys> entry = iterator.next(); 564 if (ackedAndPrepared.contains(entry.getValue().messageId)) { 565 continue; 566 } 567 Message msg = loadMessage(entry.getValue().location); 568 listener.recoverMessage(msg); 569 } 570 } 571 }); 572 } finally { 573 indexLock.writeLock().unlock(); 574 } 575 } 576 577 @Override 578 public void recoverNextMessages(final int maxReturned, final MessageRecoveryListener listener) throws Exception { 579 indexLock.writeLock().lock(); 580 try { 581 pageFile.tx().execute(new Transaction.Closure<Exception>() { 582 @Override 583 public void execute(Transaction tx) throws Exception { 584 StoredDestination sd = getStoredDestination(dest, tx); 585 Entry<Long, MessageKeys> entry = null; 586 int counter = recoverRolledBackAcks(sd, tx, maxReturned, listener); 587 for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator.hasNext(); ) { 588 entry = iterator.next(); 589 if (ackedAndPrepared.contains(entry.getValue().messageId)) { 590 continue; 591 } 592 Message msg = loadMessage(entry.getValue().location); 593 msg.getMessageId().setFutureOrSequenceLong(entry.getKey()); 594 listener.recoverMessage(msg); 595 counter++; 596 if (counter >= maxReturned) { 597 break; 598 } 599 } 600 sd.orderIndex.stoppedIterating(); 601 } 602 }); 603 } finally { 604 indexLock.writeLock().unlock(); 605 } 606 } 607 608 protected int recoverRolledBackAcks(StoredDestination sd, Transaction tx, int maxReturned, MessageRecoveryListener listener) throws Exception { 609 int counter = 0; 610 String id; 611 for (Iterator<String> iterator = rolledBackAcks.iterator(); iterator.hasNext(); ) { 612 id = iterator.next(); 613 iterator.remove(); 614 Long sequence = sd.messageIdIndex.get(tx, id); 615 if (sequence != null) { 616 if (sd.orderIndex.alreadyDispatched(sequence)) { 617 listener.recoverMessage(loadMessage(sd.orderIndex.get(tx, sequence).location)); 618 counter++; 619 if (counter >= maxReturned) { 620 break; 621 } 622 } else { 623 LOG.info("rolledback ack message {} with seq {} will be picked up in future batch {}", id, sequence, sd.orderIndex.cursor); 624 } 625 } else { 626 LOG.warn("Failed to locate rolled back ack message {} in {}", id, sd); 627 } 628 } 629 return counter; 630 } 631 632 633 @Override 634 public void resetBatching() { 635 if (pageFile.isLoaded()) { 636 indexLock.writeLock().lock(); 637 try { 638 pageFile.tx().execute(new Transaction.Closure<Exception>() { 639 @Override 640 public void execute(Transaction tx) throws Exception { 641 StoredDestination sd = getExistingStoredDestination(dest, tx); 642 if (sd != null) { 643 sd.orderIndex.resetCursorPosition();} 644 } 645 }); 646 } catch (Exception e) { 647 LOG.error("Failed to reset batching",e); 648 } finally { 649 indexLock.writeLock().unlock(); 650 } 651 } 652 } 653 654 @Override 655 public void setBatch(final MessageId identity) throws IOException { 656 indexLock.writeLock().lock(); 657 try { 658 pageFile.tx().execute(new Transaction.Closure<IOException>() { 659 @Override 660 public void execute(Transaction tx) throws IOException { 661 StoredDestination sd = getStoredDestination(dest, tx); 662 Long location = (Long) identity.getFutureOrSequenceLong(); 663 Long pending = sd.orderIndex.minPendingAdd(); 664 if (pending != null) { 665 location = Math.min(location, pending-1); 666 } 667 sd.orderIndex.setBatch(tx, location); 668 } 669 }); 670 } finally { 671 indexLock.writeLock().unlock(); 672 } 673 } 674 675 @Override 676 public void setMemoryUsage(MemoryUsage memoryUsage) { 677 } 678 @Override 679 public void start() throws Exception { 680 super.start(); 681 } 682 @Override 683 public void stop() throws Exception { 684 super.stop(); 685 } 686 687 protected void lockAsyncJobQueue() { 688 try { 689 if (!this.localDestinationSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS)) { 690 throw new TimeoutException(this +" timeout waiting for localDestSem:" + this.localDestinationSemaphore); 691 } 692 } catch (Exception e) { 693 LOG.error("Failed to lock async jobs for " + this.destination, e); 694 } 695 } 696 697 protected void unlockAsyncJobQueue() { 698 this.localDestinationSemaphore.release(this.maxAsyncJobs); 699 } 700 701 protected void acquireLocalAsyncLock() { 702 try { 703 this.localDestinationSemaphore.acquire(); 704 } catch (InterruptedException e) { 705 LOG.error("Failed to aquire async lock for " + this.destination, e); 706 } 707 } 708 709 protected void releaseLocalAsyncLock() { 710 this.localDestinationSemaphore.release(); 711 } 712 713 @Override 714 public String toString(){ 715 return "permits:" + this.localDestinationSemaphore.availablePermits() + ",sd=" + storedDestinations.get(key(dest)); 716 } 717 718 @Override 719 protected void recoverMessageStoreStatistics() throws IOException { 720 try { 721 MessageStoreStatistics recoveredStatistics; 722 lockAsyncJobQueue(); 723 indexLock.writeLock().lock(); 724 try { 725 recoveredStatistics = pageFile.tx().execute(new Transaction.CallableClosure<MessageStoreStatistics, IOException>() { 726 @Override 727 public MessageStoreStatistics execute(Transaction tx) throws IOException { 728 MessageStoreStatistics statistics = new MessageStoreStatistics(); 729 730 // Iterate through all index entries to get the size of each message 731 StoredDestination sd = getStoredDestination(dest, tx); 732 for (Iterator<Entry<Location, Long>> iterator = sd.locationIndex.iterator(tx); iterator.hasNext();) { 733 int locationSize = iterator.next().getKey().getSize(); 734 statistics.getMessageCount().increment(); 735 statistics.getMessageSize().addSize(locationSize > 0 ? locationSize : 0); 736 } 737 return statistics; 738 } 739 }); 740 getMessageStoreStatistics().getMessageCount().setCount(recoveredStatistics.getMessageCount().getCount()); 741 getMessageStoreStatistics().getMessageSize().setTotalSize(recoveredStatistics.getMessageSize().getTotalSize()); 742 } finally { 743 indexLock.writeLock().unlock(); 744 } 745 } finally { 746 unlockAsyncJobQueue(); 747 } 748 } 749 } 750 751 class KahaDBTopicMessageStore extends KahaDBMessageStore implements TopicMessageStore { 752 private final AtomicInteger subscriptionCount = new AtomicInteger(); 753 protected final MessageStoreSubscriptionStatistics messageStoreSubStats = 754 new MessageStoreSubscriptionStatistics(isEnableSubscriptionStatistics()); 755 756 public KahaDBTopicMessageStore(ActiveMQTopic destination) throws IOException { 757 super(destination); 758 this.subscriptionCount.set(getAllSubscriptions().length); 759 if (isConcurrentStoreAndDispatchTopics()) { 760 asyncTopicMaps.add(asyncTaskMap); 761 } 762 } 763 764 @Override 765 protected void recoverMessageStoreStatistics() throws IOException { 766 super.recoverMessageStoreStatistics(); 767 this.recoverMessageStoreSubMetrics(); 768 } 769 770 @Override 771 public ListenableFuture<Object> asyncAddTopicMessage(final ConnectionContext context, final Message message) 772 throws IOException { 773 if (isConcurrentStoreAndDispatchTopics()) { 774 message.beforeMarshall(wireFormat); 775 StoreTopicTask result = new StoreTopicTask(this, context, message, subscriptionCount.get()); 776 result.aquireLocks(); 777 addTopicTask(this, result); 778 return result.getFuture(); 779 } else { 780 return super.asyncAddTopicMessage(context, message); 781 } 782 } 783 784 @Override 785 public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, 786 MessageId messageId, MessageAck ack) throws IOException { 787 String subscriptionKey = subscriptionKey(clientId, subscriptionName).toString(); 788 if (isConcurrentStoreAndDispatchTopics()) { 789 AsyncJobKey key = new AsyncJobKey(messageId, getDestination()); 790 StoreTopicTask task = null; 791 synchronized (asyncTaskMap) { 792 task = (StoreTopicTask) asyncTaskMap.get(key); 793 } 794 if (task != null) { 795 if (task.addSubscriptionKey(subscriptionKey)) { 796 removeTopicTask(this, messageId); 797 if (task.cancel()) { 798 synchronized (asyncTaskMap) { 799 asyncTaskMap.remove(key); 800 } 801 } 802 } 803 } else { 804 doAcknowledge(context, subscriptionKey, messageId, ack); 805 } 806 } else { 807 doAcknowledge(context, subscriptionKey, messageId, ack); 808 } 809 } 810 811 protected void doAcknowledge(ConnectionContext context, String subscriptionKey, MessageId messageId, MessageAck ack) 812 throws IOException { 813 KahaRemoveMessageCommand command = new KahaRemoveMessageCommand(); 814 command.setDestination(dest); 815 command.setSubscriptionKey(subscriptionKey); 816 command.setMessageId(messageId.toProducerKey()); 817 command.setTransactionInfo(ack != null ? TransactionIdConversion.convert(transactionIdTransformer.transform(ack.getTransactionId())) : null); 818 if (ack != null && ack.isUnmatchedAck()) { 819 command.setAck(UNMATCHED); 820 } else { 821 org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(ack); 822 command.setAck(new Buffer(packet.getData(), packet.getOffset(), packet.getLength())); 823 } 824 store(command, false, null, null); 825 } 826 827 @Override 828 public void addSubscription(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException { 829 String subscriptionKey = subscriptionKey(subscriptionInfo.getClientId(), subscriptionInfo 830 .getSubscriptionName()); 831 KahaSubscriptionCommand command = new KahaSubscriptionCommand(); 832 command.setDestination(dest); 833 command.setSubscriptionKey(subscriptionKey.toString()); 834 command.setRetroactive(retroactive); 835 org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(subscriptionInfo); 836 command.setSubscriptionInfo(new Buffer(packet.getData(), packet.getOffset(), packet.getLength())); 837 store(command, isEnableJournalDiskSyncs() && true, null, null); 838 this.subscriptionCount.incrementAndGet(); 839 } 840 841 @Override 842 public void deleteSubscription(String clientId, String subscriptionName) throws IOException { 843 KahaSubscriptionCommand command = new KahaSubscriptionCommand(); 844 command.setDestination(dest); 845 command.setSubscriptionKey(subscriptionKey(clientId, subscriptionName).toString()); 846 store(command, isEnableJournalDiskSyncs() && true, null, null); 847 this.subscriptionCount.decrementAndGet(); 848 } 849 850 @Override 851 public SubscriptionInfo[] getAllSubscriptions() throws IOException { 852 853 final ArrayList<SubscriptionInfo> subscriptions = new ArrayList<SubscriptionInfo>(); 854 indexLock.writeLock().lock(); 855 try { 856 pageFile.tx().execute(new Transaction.Closure<IOException>() { 857 @Override 858 public void execute(Transaction tx) throws IOException { 859 StoredDestination sd = getStoredDestination(dest, tx); 860 for (Iterator<Entry<String, KahaSubscriptionCommand>> iterator = sd.subscriptions.iterator(tx); iterator 861 .hasNext();) { 862 Entry<String, KahaSubscriptionCommand> entry = iterator.next(); 863 SubscriptionInfo info = (SubscriptionInfo) wireFormat.unmarshal(new DataInputStream(entry 864 .getValue().getSubscriptionInfo().newInput())); 865 subscriptions.add(info); 866 867 } 868 } 869 }); 870 } finally { 871 indexLock.writeLock().unlock(); 872 } 873 874 SubscriptionInfo[] rc = new SubscriptionInfo[subscriptions.size()]; 875 subscriptions.toArray(rc); 876 return rc; 877 } 878 879 @Override 880 public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException { 881 final String subscriptionKey = subscriptionKey(clientId, subscriptionName); 882 indexLock.writeLock().lock(); 883 try { 884 return pageFile.tx().execute(new Transaction.CallableClosure<SubscriptionInfo, IOException>() { 885 @Override 886 public SubscriptionInfo execute(Transaction tx) throws IOException { 887 StoredDestination sd = getStoredDestination(dest, tx); 888 KahaSubscriptionCommand command = sd.subscriptions.get(tx, subscriptionKey); 889 if (command == null) { 890 return null; 891 } 892 return (SubscriptionInfo) wireFormat.unmarshal(new DataInputStream(command 893 .getSubscriptionInfo().newInput())); 894 } 895 }); 896 } finally { 897 indexLock.writeLock().unlock(); 898 } 899 } 900 901 @Override 902 public int getMessageCount(String clientId, String subscriptionName) throws IOException { 903 final String subscriptionKey = subscriptionKey(clientId, subscriptionName); 904 905 if (isEnableSubscriptionStatistics()) { 906 return (int)this.messageStoreSubStats.getMessageCount(subscriptionKey).getCount(); 907 } else { 908 909 indexLock.writeLock().lock(); 910 try { 911 return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>() { 912 @Override 913 public Integer execute(Transaction tx) throws IOException { 914 StoredDestination sd = getStoredDestination(dest, tx); 915 LastAck cursorPos = getLastAck(tx, sd, subscriptionKey); 916 if (cursorPos == null) { 917 // The subscription might not exist. 918 return 0; 919 } 920 921 return (int) getStoredMessageCount(tx, sd, subscriptionKey); 922 } 923 }); 924 } finally { 925 indexLock.writeLock().unlock(); 926 } 927 } 928 } 929 930 931 @Override 932 public long getMessageSize(String clientId, String subscriptionName) throws IOException { 933 final String subscriptionKey = subscriptionKey(clientId, subscriptionName); 934 if (isEnableSubscriptionStatistics()) { 935 return this.messageStoreSubStats.getMessageSize(subscriptionKey).getTotalSize(); 936 } else { 937 indexLock.writeLock().lock(); 938 try { 939 return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>() { 940 @Override 941 public Integer execute(Transaction tx) throws IOException { 942 StoredDestination sd = getStoredDestination(dest, tx); 943 LastAck cursorPos = getLastAck(tx, sd, subscriptionKey); 944 if (cursorPos == null) { 945 // The subscription might not exist. 946 return 0; 947 } 948 949 return (int) getStoredMessageSize(tx, sd, subscriptionKey); 950 } 951 }); 952 } finally { 953 indexLock.writeLock().unlock(); 954 } 955 } 956 } 957 958 959 protected void recoverMessageStoreSubMetrics() throws IOException { 960 if (isEnableSubscriptionStatistics()) { 961 962 final MessageStoreSubscriptionStatistics statistics = getMessageStoreSubStatistics(); 963 indexLock.writeLock().lock(); 964 try { 965 pageFile.tx().execute(new Transaction.Closure<IOException>() { 966 @Override 967 public void execute(Transaction tx) throws IOException { 968 StoredDestination sd = getStoredDestination(dest, tx); 969 for (Iterator<Entry<String, KahaSubscriptionCommand>> iterator = sd.subscriptions 970 .iterator(tx); iterator.hasNext();) { 971 Entry<String, KahaSubscriptionCommand> entry = iterator.next(); 972 973 String subscriptionKey = entry.getKey(); 974 LastAck cursorPos = getLastAck(tx, sd, subscriptionKey); 975 if (cursorPos != null) { 976 long size = getStoredMessageSize(tx, sd, subscriptionKey); 977 statistics.getMessageCount(subscriptionKey) 978 .setCount(getStoredMessageCount(tx, sd, subscriptionKey)); 979 statistics.getMessageSize(subscriptionKey).addSize(size > 0 ? size : 0); 980 } 981 } 982 } 983 }); 984 } finally { 985 indexLock.writeLock().unlock(); 986 } 987 } 988 } 989 990 @Override 991 public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener) 992 throws Exception { 993 final String subscriptionKey = subscriptionKey(clientId, subscriptionName); 994 @SuppressWarnings("unused") 995 final SubscriptionInfo info = lookupSubscription(clientId, subscriptionName); 996 indexLock.writeLock().lock(); 997 try { 998 pageFile.tx().execute(new Transaction.Closure<Exception>() { 999 @Override 1000 public void execute(Transaction tx) throws Exception { 1001 StoredDestination sd = getStoredDestination(dest, tx); 1002 LastAck cursorPos = getLastAck(tx, sd, subscriptionKey); 1003 sd.orderIndex.setBatch(tx, cursorPos); 1004 recoverRolledBackAcks(sd, tx, Integer.MAX_VALUE, listener); 1005 for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator 1006 .hasNext();) { 1007 Entry<Long, MessageKeys> entry = iterator.next(); 1008 if (ackedAndPrepared.contains(entry.getValue().messageId)) { 1009 continue; 1010 } 1011 listener.recoverMessage(loadMessage(entry.getValue().location)); 1012 } 1013 sd.orderIndex.resetCursorPosition(); 1014 } 1015 }); 1016 } finally { 1017 indexLock.writeLock().unlock(); 1018 } 1019 } 1020 1021 @Override 1022 public void recoverNextMessages(String clientId, String subscriptionName, final int maxReturned, 1023 final MessageRecoveryListener listener) throws Exception { 1024 final String subscriptionKey = subscriptionKey(clientId, subscriptionName); 1025 @SuppressWarnings("unused") 1026 final SubscriptionInfo info = lookupSubscription(clientId, subscriptionName); 1027 indexLock.writeLock().lock(); 1028 try { 1029 pageFile.tx().execute(new Transaction.Closure<Exception>() { 1030 @Override 1031 public void execute(Transaction tx) throws Exception { 1032 StoredDestination sd = getStoredDestination(dest, tx); 1033 sd.orderIndex.resetCursorPosition(); 1034 MessageOrderCursor moc = sd.subscriptionCursors.get(subscriptionKey); 1035 if (moc == null) { 1036 LastAck pos = getLastAck(tx, sd, subscriptionKey); 1037 if (pos == null) { 1038 // sub deleted 1039 return; 1040 } 1041 sd.orderIndex.setBatch(tx, pos); 1042 moc = sd.orderIndex.cursor; 1043 } else { 1044 sd.orderIndex.cursor.sync(moc); 1045 } 1046 1047 Entry<Long, MessageKeys> entry = null; 1048 int counter = recoverRolledBackAcks(sd, tx, maxReturned, listener); 1049 for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, moc); iterator 1050 .hasNext();) { 1051 entry = iterator.next(); 1052 if (ackedAndPrepared.contains(entry.getValue().messageId)) { 1053 continue; 1054 } 1055 if (listener.recoverMessage(loadMessage(entry.getValue().location))) { 1056 counter++; 1057 } 1058 if (counter >= maxReturned || listener.hasSpace() == false) { 1059 break; 1060 } 1061 } 1062 sd.orderIndex.stoppedIterating(); 1063 if (entry != null) { 1064 MessageOrderCursor copy = sd.orderIndex.cursor.copy(); 1065 sd.subscriptionCursors.put(subscriptionKey, copy); 1066 } 1067 } 1068 }); 1069 } finally { 1070 indexLock.writeLock().unlock(); 1071 } 1072 } 1073 1074 @Override 1075 public void resetBatching(String clientId, String subscriptionName) { 1076 try { 1077 final String subscriptionKey = subscriptionKey(clientId, subscriptionName); 1078 indexLock.writeLock().lock(); 1079 try { 1080 pageFile.tx().execute(new Transaction.Closure<IOException>() { 1081 @Override 1082 public void execute(Transaction tx) throws IOException { 1083 StoredDestination sd = getStoredDestination(dest, tx); 1084 sd.subscriptionCursors.remove(subscriptionKey); 1085 } 1086 }); 1087 }finally { 1088 indexLock.writeLock().unlock(); 1089 } 1090 } catch (IOException e) { 1091 throw new RuntimeException(e); 1092 } 1093 } 1094 1095 @Override 1096 public MessageStoreSubscriptionStatistics getMessageStoreSubStatistics() { 1097 return messageStoreSubStats; 1098 } 1099 } 1100 1101 String subscriptionKey(String clientId, String subscriptionName) { 1102 return clientId + ":" + subscriptionName; 1103 } 1104 1105 @Override 1106 public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException { 1107 String key = key(convert(destination)); 1108 MessageStore store = storeCache.get(key(convert(destination))); 1109 if (store == null) { 1110 final MessageStore queueStore = this.transactionStore.proxy(new KahaDBMessageStore(destination)); 1111 store = storeCache.putIfAbsent(key, queueStore); 1112 if (store == null) { 1113 store = queueStore; 1114 } 1115 } 1116 1117 return store; 1118 } 1119 1120 @Override 1121 public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException { 1122 String key = key(convert(destination)); 1123 MessageStore store = storeCache.get(key(convert(destination))); 1124 if (store == null) { 1125 final TopicMessageStore topicStore = this.transactionStore.proxy(new KahaDBTopicMessageStore(destination)); 1126 store = storeCache.putIfAbsent(key, topicStore); 1127 if (store == null) { 1128 store = topicStore; 1129 } 1130 } 1131 1132 return (TopicMessageStore) store; 1133 } 1134 1135 /** 1136 * Cleanup method to remove any state associated with the given destination. 1137 * This method does not stop the message store (it might not be cached). 1138 * 1139 * @param destination 1140 * Destination to forget 1141 */ 1142 @Override 1143 public void removeQueueMessageStore(ActiveMQQueue destination) { 1144 } 1145 1146 /** 1147 * Cleanup method to remove any state associated with the given destination 1148 * This method does not stop the message store (it might not be cached). 1149 * 1150 * @param destination 1151 * Destination to forget 1152 */ 1153 @Override 1154 public void removeTopicMessageStore(ActiveMQTopic destination) { 1155 } 1156 1157 @Override 1158 public void deleteAllMessages() throws IOException { 1159 deleteAllMessages = true; 1160 } 1161 1162 @Override 1163 public Set<ActiveMQDestination> getDestinations() { 1164 try { 1165 final HashSet<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>(); 1166 indexLock.writeLock().lock(); 1167 try { 1168 pageFile.tx().execute(new Transaction.Closure<IOException>() { 1169 @Override 1170 public void execute(Transaction tx) throws IOException { 1171 for (Iterator<Entry<String, StoredDestination>> iterator = metadata.destinations.iterator(tx); iterator 1172 .hasNext();) { 1173 Entry<String, StoredDestination> entry = iterator.next(); 1174 //Removing isEmpty topic check - see AMQ-5875 1175 rc.add(convert(entry.getKey())); 1176 } 1177 } 1178 }); 1179 }finally { 1180 indexLock.writeLock().unlock(); 1181 } 1182 return rc; 1183 } catch (IOException e) { 1184 throw new RuntimeException(e); 1185 } 1186 } 1187 1188 @Override 1189 public long getLastMessageBrokerSequenceId() throws IOException { 1190 return 0; 1191 } 1192 1193 @Override 1194 public long getLastProducerSequenceId(ProducerId id) { 1195 indexLock.writeLock().lock(); 1196 try { 1197 return metadata.producerSequenceIdTracker.getLastSeqId(id); 1198 } finally { 1199 indexLock.writeLock().unlock(); 1200 } 1201 } 1202 1203 @Override 1204 public long size() { 1205 try { 1206 return journalSize.get() + getPageFile().getDiskSize(); 1207 } catch (IOException e) { 1208 throw new RuntimeException(e); 1209 } 1210 } 1211 1212 @Override 1213 public void beginTransaction(ConnectionContext context) throws IOException { 1214 throw new IOException("Not yet implemented."); 1215 } 1216 @Override 1217 public void commitTransaction(ConnectionContext context) throws IOException { 1218 throw new IOException("Not yet implemented."); 1219 } 1220 @Override 1221 public void rollbackTransaction(ConnectionContext context) throws IOException { 1222 throw new IOException("Not yet implemented."); 1223 } 1224 1225 @Override 1226 public void checkpoint(boolean sync) throws IOException { 1227 super.checkpointCleanup(sync); 1228 } 1229 1230 // ///////////////////////////////////////////////////////////////// 1231 // Internal helper methods. 1232 // ///////////////////////////////////////////////////////////////// 1233 1234 /** 1235 * @param location 1236 * @return 1237 * @throws IOException 1238 */ 1239 Message loadMessage(Location location) throws IOException { 1240 try { 1241 JournalCommand<?> command = load(location); 1242 KahaAddMessageCommand addMessage = null; 1243 switch (command.type()) { 1244 case KAHA_UPDATE_MESSAGE_COMMAND: 1245 addMessage = ((KahaUpdateMessageCommand) command).getMessage(); 1246 break; 1247 default: 1248 addMessage = (KahaAddMessageCommand) command; 1249 } 1250 Message msg = (Message) wireFormat.unmarshal(new DataInputStream(addMessage.getMessage().newInput())); 1251 return msg; 1252 } catch (IOException ioe) { 1253 LOG.error("Failed to load message at: {}", location , ioe); 1254 brokerService.handleIOException(ioe); 1255 throw ioe; 1256 } 1257 } 1258 1259 // ///////////////////////////////////////////////////////////////// 1260 // Internal conversion methods. 1261 // ///////////////////////////////////////////////////////////////// 1262 1263 KahaLocation convert(Location location) { 1264 KahaLocation rc = new KahaLocation(); 1265 rc.setLogId(location.getDataFileId()); 1266 rc.setOffset(location.getOffset()); 1267 return rc; 1268 } 1269 1270 KahaDestination convert(ActiveMQDestination dest) { 1271 KahaDestination rc = new KahaDestination(); 1272 rc.setName(dest.getPhysicalName()); 1273 switch (dest.getDestinationType()) { 1274 case ActiveMQDestination.QUEUE_TYPE: 1275 rc.setType(DestinationType.QUEUE); 1276 return rc; 1277 case ActiveMQDestination.TOPIC_TYPE: 1278 rc.setType(DestinationType.TOPIC); 1279 return rc; 1280 case ActiveMQDestination.TEMP_QUEUE_TYPE: 1281 rc.setType(DestinationType.TEMP_QUEUE); 1282 return rc; 1283 case ActiveMQDestination.TEMP_TOPIC_TYPE: 1284 rc.setType(DestinationType.TEMP_TOPIC); 1285 return rc; 1286 default: 1287 return null; 1288 } 1289 } 1290 1291 ActiveMQDestination convert(String dest) { 1292 int p = dest.indexOf(":"); 1293 if (p < 0) { 1294 throw new IllegalArgumentException("Not in the valid destination format"); 1295 } 1296 int type = Integer.parseInt(dest.substring(0, p)); 1297 String name = dest.substring(p + 1); 1298 return convert(type, name); 1299 } 1300 1301 private ActiveMQDestination convert(KahaDestination commandDestination) { 1302 return convert(commandDestination.getType().getNumber(), commandDestination.getName()); 1303 } 1304 1305 private ActiveMQDestination convert(int type, String name) { 1306 switch (KahaDestination.DestinationType.valueOf(type)) { 1307 case QUEUE: 1308 return new ActiveMQQueue(name); 1309 case TOPIC: 1310 return new ActiveMQTopic(name); 1311 case TEMP_QUEUE: 1312 return new ActiveMQTempQueue(name); 1313 case TEMP_TOPIC: 1314 return new ActiveMQTempTopic(name); 1315 default: 1316 throw new IllegalArgumentException("Not in the valid destination format"); 1317 } 1318 } 1319 1320 public TransactionIdTransformer getTransactionIdTransformer() { 1321 return transactionIdTransformer; 1322 } 1323 1324 public void setTransactionIdTransformer(TransactionIdTransformer transactionIdTransformer) { 1325 this.transactionIdTransformer = transactionIdTransformer; 1326 } 1327 1328 static class AsyncJobKey { 1329 MessageId id; 1330 ActiveMQDestination destination; 1331 1332 AsyncJobKey(MessageId id, ActiveMQDestination destination) { 1333 this.id = id; 1334 this.destination = destination; 1335 } 1336 1337 @Override 1338 public boolean equals(Object obj) { 1339 if (obj == this) { 1340 return true; 1341 } 1342 return obj instanceof AsyncJobKey && id.equals(((AsyncJobKey) obj).id) 1343 && destination.equals(((AsyncJobKey) obj).destination); 1344 } 1345 1346 @Override 1347 public int hashCode() { 1348 return id.hashCode() + destination.hashCode(); 1349 } 1350 1351 @Override 1352 public String toString() { 1353 return destination.getPhysicalName() + "-" + id; 1354 } 1355 } 1356 1357 public interface StoreTask { 1358 public boolean cancel(); 1359 1360 public void aquireLocks(); 1361 1362 public void releaseLocks(); 1363 } 1364 1365 class StoreQueueTask implements Runnable, StoreTask { 1366 protected final Message message; 1367 protected final ConnectionContext context; 1368 protected final KahaDBMessageStore store; 1369 protected final InnerFutureTask future; 1370 protected final AtomicBoolean done = new AtomicBoolean(); 1371 protected final AtomicBoolean locked = new AtomicBoolean(); 1372 1373 public StoreQueueTask(KahaDBMessageStore store, ConnectionContext context, Message message) { 1374 this.store = store; 1375 this.context = context; 1376 this.message = message; 1377 this.future = new InnerFutureTask(this); 1378 } 1379 1380 public ListenableFuture<Object> getFuture() { 1381 return this.future; 1382 } 1383 1384 @Override 1385 public boolean cancel() { 1386 if (this.done.compareAndSet(false, true)) { 1387 return this.future.cancel(false); 1388 } 1389 return false; 1390 } 1391 1392 @Override 1393 public void aquireLocks() { 1394 if (this.locked.compareAndSet(false, true)) { 1395 try { 1396 globalQueueSemaphore.acquire(); 1397 store.acquireLocalAsyncLock(); 1398 message.incrementReferenceCount(); 1399 } catch (InterruptedException e) { 1400 LOG.warn("Failed to aquire lock", e); 1401 } 1402 } 1403 1404 } 1405 1406 @Override 1407 public void releaseLocks() { 1408 if (this.locked.compareAndSet(true, false)) { 1409 store.releaseLocalAsyncLock(); 1410 globalQueueSemaphore.release(); 1411 message.decrementReferenceCount(); 1412 } 1413 } 1414 1415 @Override 1416 public void run() { 1417 this.store.doneTasks++; 1418 try { 1419 if (this.done.compareAndSet(false, true)) { 1420 this.store.addMessage(context, message); 1421 removeQueueTask(this.store, this.message.getMessageId()); 1422 this.future.complete(); 1423 } else if (cancelledTaskModMetric > 0 && this.store.canceledTasks++ % cancelledTaskModMetric == 0) { 1424 System.err.println(this.store.dest.getName() + " cancelled: " 1425 + (this.store.canceledTasks / this.store.doneTasks) * 100); 1426 this.store.canceledTasks = this.store.doneTasks = 0; 1427 } 1428 } catch (Exception e) { 1429 this.future.setException(e); 1430 } 1431 } 1432 1433 protected Message getMessage() { 1434 return this.message; 1435 } 1436 1437 private class InnerFutureTask extends FutureTask<Object> implements ListenableFuture<Object> { 1438 1439 private Runnable listener; 1440 public InnerFutureTask(Runnable runnable) { 1441 super(runnable, null); 1442 1443 } 1444 1445 public void setException(final Exception e) { 1446 super.setException(e); 1447 } 1448 1449 public void complete() { 1450 super.set(null); 1451 } 1452 1453 @Override 1454 public void done() { 1455 fireListener(); 1456 } 1457 1458 @Override 1459 public void addListener(Runnable listener) { 1460 this.listener = listener; 1461 if (isDone()) { 1462 fireListener(); 1463 } 1464 } 1465 1466 private void fireListener() { 1467 if (listener != null) { 1468 try { 1469 listener.run(); 1470 } catch (Exception ignored) { 1471 LOG.warn("Unexpected exception from future {} listener callback {}", this, listener, ignored); 1472 } 1473 } 1474 } 1475 } 1476 } 1477 1478 class StoreTopicTask extends StoreQueueTask { 1479 private final int subscriptionCount; 1480 private final List<String> subscriptionKeys = new ArrayList<String>(1); 1481 private final KahaDBTopicMessageStore topicStore; 1482 public StoreTopicTask(KahaDBTopicMessageStore store, ConnectionContext context, Message message, 1483 int subscriptionCount) { 1484 super(store, context, message); 1485 this.topicStore = store; 1486 this.subscriptionCount = subscriptionCount; 1487 1488 } 1489 1490 @Override 1491 public void aquireLocks() { 1492 if (this.locked.compareAndSet(false, true)) { 1493 try { 1494 globalTopicSemaphore.acquire(); 1495 store.acquireLocalAsyncLock(); 1496 message.incrementReferenceCount(); 1497 } catch (InterruptedException e) { 1498 LOG.warn("Failed to aquire lock", e); 1499 } 1500 } 1501 } 1502 1503 @Override 1504 public void releaseLocks() { 1505 if (this.locked.compareAndSet(true, false)) { 1506 message.decrementReferenceCount(); 1507 store.releaseLocalAsyncLock(); 1508 globalTopicSemaphore.release(); 1509 } 1510 } 1511 1512 /** 1513 * add a key 1514 * 1515 * @param key 1516 * @return true if all acknowledgements received 1517 */ 1518 public boolean addSubscriptionKey(String key) { 1519 synchronized (this.subscriptionKeys) { 1520 this.subscriptionKeys.add(key); 1521 } 1522 return this.subscriptionKeys.size() >= this.subscriptionCount; 1523 } 1524 1525 @Override 1526 public void run() { 1527 this.store.doneTasks++; 1528 try { 1529 if (this.done.compareAndSet(false, true)) { 1530 this.topicStore.addMessage(context, message); 1531 // apply any acks we have 1532 synchronized (this.subscriptionKeys) { 1533 for (String key : this.subscriptionKeys) { 1534 this.topicStore.doAcknowledge(context, key, this.message.getMessageId(), null); 1535 1536 } 1537 } 1538 removeTopicTask(this.topicStore, this.message.getMessageId()); 1539 this.future.complete(); 1540 } else if (cancelledTaskModMetric > 0 && this.store.canceledTasks++ % cancelledTaskModMetric == 0) { 1541 System.err.println(this.store.dest.getName() + " cancelled: " 1542 + (this.store.canceledTasks / this.store.doneTasks) * 100); 1543 this.store.canceledTasks = this.store.doneTasks = 0; 1544 } 1545 } catch (Exception e) { 1546 this.future.setException(e); 1547 } 1548 } 1549 } 1550 1551 public class StoreTaskExecutor extends ThreadPoolExecutor { 1552 1553 public StoreTaskExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit timeUnit, BlockingQueue<Runnable> queue, ThreadFactory threadFactory) { 1554 super(corePoolSize, maximumPoolSize, keepAliveTime, timeUnit, queue, threadFactory); 1555 } 1556 1557 @Override 1558 protected void afterExecute(Runnable runnable, Throwable throwable) { 1559 super.afterExecute(runnable, throwable); 1560 1561 if (runnable instanceof StoreTask) { 1562 ((StoreTask)runnable).releaseLocks(); 1563 } 1564 } 1565 } 1566 1567 @Override 1568 public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException { 1569 return new JobSchedulerStoreImpl(); 1570 } 1571 1572 /* (non-Javadoc) 1573 * @see org.apache.activemq.store.NoLocalSubscriptionAware#isPersistNoLocal() 1574 */ 1575 @Override 1576 public boolean isPersistNoLocal() { 1577 // Prior to v11 the broker did not store the noLocal value for durable subs. 1578 return brokerService.getStoreOpenWireVersion() >= 11; 1579 } 1580}