001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.network; 018 019import java.io.IOException; 020import java.security.GeneralSecurityException; 021import java.security.cert.X509Certificate; 022import java.util.Arrays; 023import java.util.Collection; 024import java.util.Collections; 025import java.util.Iterator; 026import java.util.List; 027import java.util.Properties; 028import java.util.Set; 029import java.util.concurrent.ConcurrentHashMap; 030import java.util.concurrent.ConcurrentMap; 031import java.util.concurrent.CountDownLatch; 032import java.util.concurrent.ExecutionException; 033import java.util.concurrent.ExecutorService; 034import java.util.concurrent.Executors; 035import java.util.concurrent.Future; 036import java.util.concurrent.TimeUnit; 037import java.util.concurrent.TimeoutException; 038import java.util.concurrent.atomic.AtomicBoolean; 039 040import javax.management.ObjectName; 041 042import org.apache.activemq.DestinationDoesNotExistException; 043import org.apache.activemq.Service; 044import org.apache.activemq.advisory.AdvisoryBroker; 045import org.apache.activemq.advisory.AdvisorySupport; 046import org.apache.activemq.broker.BrokerService; 047import org.apache.activemq.broker.BrokerServiceAware; 048import org.apache.activemq.broker.ConnectionContext; 049import org.apache.activemq.broker.TransportConnection; 050import org.apache.activemq.broker.region.AbstractRegion; 051import org.apache.activemq.broker.region.DurableTopicSubscription; 052import org.apache.activemq.broker.region.Region; 053import org.apache.activemq.broker.region.RegionBroker; 054import org.apache.activemq.broker.region.Subscription; 055import org.apache.activemq.broker.region.policy.PolicyEntry; 056import org.apache.activemq.command.ActiveMQDestination; 057import org.apache.activemq.command.ActiveMQMessage; 058import org.apache.activemq.command.ActiveMQTempDestination; 059import org.apache.activemq.command.ActiveMQTopic; 060import org.apache.activemq.command.BrokerId; 061import org.apache.activemq.command.BrokerInfo; 062import org.apache.activemq.command.BrokerSubscriptionInfo; 063import org.apache.activemq.command.Command; 064import org.apache.activemq.command.CommandTypes; 065import org.apache.activemq.command.ConnectionError; 066import org.apache.activemq.command.ConnectionId; 067import org.apache.activemq.command.ConnectionInfo; 068import org.apache.activemq.command.ConsumerId; 069import org.apache.activemq.command.ConsumerInfo; 070import org.apache.activemq.command.DataStructure; 071import org.apache.activemq.command.DestinationInfo; 072import org.apache.activemq.command.ExceptionResponse; 073import org.apache.activemq.command.KeepAliveInfo; 074import org.apache.activemq.command.Message; 075import org.apache.activemq.command.MessageAck; 076import org.apache.activemq.command.MessageDispatch; 077import org.apache.activemq.command.MessageId; 078import org.apache.activemq.command.NetworkBridgeFilter; 079import org.apache.activemq.command.ProducerInfo; 080import org.apache.activemq.command.RemoveInfo; 081import org.apache.activemq.command.RemoveSubscriptionInfo; 082import org.apache.activemq.command.Response; 083import org.apache.activemq.command.SessionInfo; 084import org.apache.activemq.command.ShutdownInfo; 085import org.apache.activemq.command.SubscriptionInfo; 086import org.apache.activemq.command.WireFormatInfo; 087import org.apache.activemq.filter.DestinationFilter; 088import org.apache.activemq.filter.MessageEvaluationContext; 089import org.apache.activemq.security.SecurityContext; 090import org.apache.activemq.transport.DefaultTransportListener; 091import org.apache.activemq.transport.FutureResponse; 092import org.apache.activemq.transport.ResponseCallback; 093import org.apache.activemq.transport.Transport; 094import org.apache.activemq.transport.TransportDisposedIOException; 095import org.apache.activemq.transport.TransportFilter; 096import org.apache.activemq.transport.tcp.SslTransport; 097import org.apache.activemq.transport.tcp.TcpTransport; 098import org.apache.activemq.util.IdGenerator; 099import org.apache.activemq.util.IntrospectionSupport; 100import org.apache.activemq.util.LongSequenceGenerator; 101import org.apache.activemq.util.MarshallingSupport; 102import org.apache.activemq.util.ServiceStopper; 103import org.apache.activemq.util.ServiceSupport; 104import org.apache.activemq.util.StringToListOfActiveMQDestinationConverter; 105import org.slf4j.Logger; 106import org.slf4j.LoggerFactory; 107 108/** 109 * A useful base class for implementing demand forwarding bridges. 110 */ 111public abstract class DemandForwardingBridgeSupport implements NetworkBridge, BrokerServiceAware { 112 private static final Logger LOG = LoggerFactory.getLogger(DemandForwardingBridgeSupport.class); 113 protected static final String DURABLE_SUB_PREFIX = "NC-DS_"; 114 protected final Transport localBroker; 115 protected final Transport remoteBroker; 116 protected IdGenerator idGenerator = new IdGenerator(); 117 protected final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator(); 118 protected ConnectionInfo localConnectionInfo; 119 protected ConnectionInfo remoteConnectionInfo; 120 protected SessionInfo localSessionInfo; 121 protected ProducerInfo producerInfo; 122 protected String remoteBrokerName = "Unknown"; 123 protected String localClientId; 124 protected ConsumerInfo demandConsumerInfo; 125 protected int demandConsumerDispatched; 126 protected final AtomicBoolean localBridgeStarted = new AtomicBoolean(false); 127 protected final AtomicBoolean remoteBridgeStarted = new AtomicBoolean(false); 128 protected final AtomicBoolean bridgeFailed = new AtomicBoolean(); 129 protected final AtomicBoolean disposed = new AtomicBoolean(); 130 protected BrokerId localBrokerId; 131 protected ActiveMQDestination[] excludedDestinations; 132 protected ActiveMQDestination[] dynamicallyIncludedDestinations; 133 protected ActiveMQDestination[] staticallyIncludedDestinations; 134 protected ActiveMQDestination[] durableDestinations; 135 protected final ConcurrentMap<ConsumerId, DemandSubscription> subscriptionMapByLocalId = new ConcurrentHashMap<>(); 136 protected final ConcurrentMap<ConsumerId, DemandSubscription> subscriptionMapByRemoteId = new ConcurrentHashMap<>(); 137 protected final Set<ConsumerId> forcedDurableRemoteId = Collections.newSetFromMap(new ConcurrentHashMap<ConsumerId, Boolean>()); 138 protected final BrokerId localBrokerPath[] = new BrokerId[]{null}; 139 protected final CountDownLatch startedLatch = new CountDownLatch(2); 140 protected final CountDownLatch localStartedLatch = new CountDownLatch(1); 141 protected final CountDownLatch staticDestinationsLatch = new CountDownLatch(1); 142 protected final AtomicBoolean lastConnectSucceeded = new AtomicBoolean(false); 143 protected NetworkBridgeConfiguration configuration; 144 protected final NetworkBridgeFilterFactory defaultFilterFactory = new DefaultNetworkBridgeFilterFactory(); 145 146 protected final BrokerId remoteBrokerPath[] = new BrokerId[]{null}; 147 protected BrokerId remoteBrokerId; 148 149 protected final NetworkBridgeStatistics networkBridgeStatistics = new NetworkBridgeStatistics(); 150 151 private NetworkBridgeListener networkBridgeListener; 152 private boolean createdByDuplex; 153 private BrokerInfo localBrokerInfo; 154 private BrokerInfo remoteBrokerInfo; 155 156 private final FutureBrokerInfo futureRemoteBrokerInfo = new FutureBrokerInfo(remoteBrokerInfo, disposed); 157 private final FutureBrokerInfo futureLocalBrokerInfo = new FutureBrokerInfo(localBrokerInfo, disposed); 158 159 private final AtomicBoolean started = new AtomicBoolean(); 160 private TransportConnection duplexInitiatingConnection; 161 private final AtomicBoolean duplexInitiatingConnectionInfoReceived = new AtomicBoolean(); 162 protected BrokerService brokerService = null; 163 private ObjectName mbeanObjectName; 164 private final ExecutorService serialExecutor = Executors.newSingleThreadExecutor(); 165 private Transport duplexInboundLocalBroker = null; 166 private ProducerInfo duplexInboundLocalProducerInfo; 167 168 public DemandForwardingBridgeSupport(NetworkBridgeConfiguration configuration, Transport localBroker, Transport remoteBroker) { 169 this.configuration = configuration; 170 this.localBroker = localBroker; 171 this.remoteBroker = remoteBroker; 172 } 173 174 public void duplexStart(TransportConnection connection, BrokerInfo localBrokerInfo, BrokerInfo remoteBrokerInfo) throws Exception { 175 this.localBrokerInfo = localBrokerInfo; 176 this.remoteBrokerInfo = remoteBrokerInfo; 177 this.duplexInitiatingConnection = connection; 178 start(); 179 serviceRemoteCommand(remoteBrokerInfo); 180 } 181 182 @Override 183 public void start() throws Exception { 184 if (started.compareAndSet(false, true)) { 185 186 if (brokerService == null) { 187 throw new IllegalArgumentException("BrokerService is null on " + this); 188 } 189 190 networkBridgeStatistics.setEnabled(brokerService.isEnableStatistics()); 191 192 if (isDuplex()) { 193 duplexInboundLocalBroker = NetworkBridgeFactory.createLocalTransport(brokerService.getBroker()); 194 duplexInboundLocalBroker.setTransportListener(new DefaultTransportListener() { 195 196 @Override 197 public void onCommand(Object o) { 198 Command command = (Command) o; 199 serviceLocalCommand(command); 200 } 201 202 @Override 203 public void onException(IOException error) { 204 serviceLocalException(error); 205 } 206 }); 207 duplexInboundLocalBroker.start(); 208 } 209 210 localBroker.setTransportListener(new DefaultTransportListener() { 211 212 @Override 213 public void onCommand(Object o) { 214 Command command = (Command) o; 215 serviceLocalCommand(command); 216 } 217 218 @Override 219 public void onException(IOException error) { 220 if (!futureLocalBrokerInfo.isDone()) { 221 futureLocalBrokerInfo.cancel(true); 222 return; 223 } 224 serviceLocalException(error); 225 } 226 }); 227 228 remoteBroker.setTransportListener(new DefaultTransportListener() { 229 230 @Override 231 public void onCommand(Object o) { 232 Command command = (Command) o; 233 serviceRemoteCommand(command); 234 } 235 236 @Override 237 public void onException(IOException error) { 238 if (!futureRemoteBrokerInfo.isDone()) { 239 futureRemoteBrokerInfo.cancel(true); 240 return; 241 } 242 serviceRemoteException(error); 243 } 244 }); 245 246 remoteBroker.start(); 247 localBroker.start(); 248 249 if (!disposed.get()) { 250 try { 251 triggerStartAsyncNetworkBridgeCreation(); 252 } catch (IOException e) { 253 LOG.warn("Caught exception from remote start", e); 254 } 255 } else { 256 LOG.warn("Bridge was disposed before the start() method was fully executed."); 257 throw new TransportDisposedIOException(); 258 } 259 } 260 } 261 262 @Override 263 public void stop() throws Exception { 264 if (started.compareAndSet(true, false)) { 265 if (disposed.compareAndSet(false, true)) { 266 LOG.debug(" stopping {} bridge to {}", configuration.getBrokerName(), remoteBrokerName); 267 268 futureRemoteBrokerInfo.cancel(true); 269 futureLocalBrokerInfo.cancel(true); 270 271 NetworkBridgeListener l = this.networkBridgeListener; 272 if (l != null) { 273 l.onStop(this); 274 } 275 try { 276 // local start complete 277 if (startedLatch.getCount() < 2) { 278 LOG.trace("{} unregister bridge ({}) to {}", new Object[]{ 279 configuration.getBrokerName(), this, remoteBrokerName 280 }); 281 brokerService.getBroker().removeBroker(null, remoteBrokerInfo); 282 brokerService.getBroker().networkBridgeStopped(remoteBrokerInfo); 283 } 284 285 remoteBridgeStarted.set(false); 286 final CountDownLatch sendShutdown = new CountDownLatch(1); 287 288 brokerService.getTaskRunnerFactory().execute(new Runnable() { 289 @Override 290 public void run() { 291 try { 292 serialExecutor.shutdown(); 293 if (!serialExecutor.awaitTermination(5, TimeUnit.SECONDS)) { 294 List<Runnable> pendingTasks = serialExecutor.shutdownNow(); 295 LOG.info("pending tasks on stop {}", pendingTasks); 296 } 297 localBroker.oneway(new ShutdownInfo()); 298 remoteBroker.oneway(new ShutdownInfo()); 299 } catch (Throwable e) { 300 LOG.debug("Caught exception sending shutdown", e); 301 } finally { 302 sendShutdown.countDown(); 303 } 304 305 } 306 }, "ActiveMQ ForwardingBridge StopTask"); 307 308 if (!sendShutdown.await(10, TimeUnit.SECONDS)) { 309 LOG.info("Network Could not shutdown in a timely manner"); 310 } 311 } finally { 312 ServiceStopper ss = new ServiceStopper(); 313 ss.stop(remoteBroker); 314 ss.stop(localBroker); 315 ss.stop(duplexInboundLocalBroker); 316 // Release the started Latch since another thread could be 317 // stuck waiting for it to start up. 318 startedLatch.countDown(); 319 startedLatch.countDown(); 320 localStartedLatch.countDown(); 321 staticDestinationsLatch.countDown(); 322 323 ss.throwFirstException(); 324 } 325 } 326 327 LOG.info("{} bridge to {} stopped", configuration.getBrokerName(), remoteBrokerName); 328 } 329 } 330 331 protected void triggerStartAsyncNetworkBridgeCreation() throws IOException { 332 brokerService.getTaskRunnerFactory().execute(new Runnable() { 333 @Override 334 public void run() { 335 final String originalName = Thread.currentThread().getName(); 336 Thread.currentThread().setName("triggerStartAsyncNetworkBridgeCreation: " + 337 "remoteBroker=" + remoteBroker + ", localBroker= " + localBroker); 338 339 try { 340 // First we collect the info data from both the local and remote ends 341 collectBrokerInfos(); 342 343 // Once we have all required broker info we can attempt to start 344 // the local and then remote sides of the bridge. 345 doStartLocalAndRemoteBridges(); 346 } finally { 347 Thread.currentThread().setName(originalName); 348 } 349 } 350 }); 351 } 352 353 private void collectBrokerInfos() { 354 int timeout = 30000; 355 TcpTransport tcpTransport = remoteBroker.narrow(TcpTransport.class); 356 if (tcpTransport != null) { 357 timeout = tcpTransport.getConnectionTimeout(); 358 } 359 360 // First wait for the remote to feed us its BrokerInfo, then we can check on 361 // the LocalBrokerInfo and decide is this is a loop. 362 try { 363 remoteBrokerInfo = futureRemoteBrokerInfo.get(timeout, TimeUnit.MILLISECONDS); 364 if (remoteBrokerInfo == null) { 365 serviceLocalException(new Throwable("remoteBrokerInfo is null")); 366 return; 367 } 368 } catch (Exception e) { 369 serviceRemoteException(e); 370 return; 371 } 372 373 try { 374 localBrokerInfo = futureLocalBrokerInfo.get(timeout, TimeUnit.MILLISECONDS); 375 if (localBrokerInfo == null) { 376 serviceLocalException(new Throwable("localBrokerInfo is null")); 377 return; 378 } 379 380 // Before we try and build the bridge lets check if we are in a loop 381 // and if so just stop now before registering anything. 382 remoteBrokerId = remoteBrokerInfo.getBrokerId(); 383 if (localBrokerId.equals(remoteBrokerId)) { 384 LOG.trace("{} disconnecting remote loop back connector for: {}, with id: {}", new Object[]{ 385 configuration.getBrokerName(), remoteBrokerName, remoteBrokerId 386 }); 387 ServiceSupport.dispose(localBroker); 388 ServiceSupport.dispose(remoteBroker); 389 // the bridge is left in a bit of limbo, but it won't get retried 390 // in this state. 391 return; 392 } 393 394 // Fill in the remote broker's information now. 395 remoteBrokerPath[0] = remoteBrokerId; 396 remoteBrokerName = remoteBrokerInfo.getBrokerName(); 397 if (configuration.isUseBrokerNamesAsIdSeed()) { 398 idGenerator = new IdGenerator(brokerService.getBrokerName() + "->" + remoteBrokerName); 399 } 400 } catch (Throwable e) { 401 serviceLocalException(e); 402 } 403 } 404 405 private void doStartLocalAndRemoteBridges() { 406 407 if (disposed.get()) { 408 return; 409 } 410 411 if (isCreatedByDuplex()) { 412 // apply remote (propagated) configuration to local duplex bridge before start 413 Properties props = null; 414 try { 415 props = MarshallingSupport.stringToProperties(remoteBrokerInfo.getNetworkProperties()); 416 IntrospectionSupport.getProperties(configuration, props, null); 417 if (configuration.getExcludedDestinations() != null) { 418 excludedDestinations = configuration.getExcludedDestinations().toArray( 419 new ActiveMQDestination[configuration.getExcludedDestinations().size()]); 420 } 421 if (configuration.getStaticallyIncludedDestinations() != null) { 422 staticallyIncludedDestinations = configuration.getStaticallyIncludedDestinations().toArray( 423 new ActiveMQDestination[configuration.getStaticallyIncludedDestinations().size()]); 424 } 425 if (configuration.getDynamicallyIncludedDestinations() != null) { 426 dynamicallyIncludedDestinations = configuration.getDynamicallyIncludedDestinations().toArray( 427 new ActiveMQDestination[configuration.getDynamicallyIncludedDestinations().size()]); 428 } 429 } catch (Throwable t) { 430 LOG.error("Error mapping remote configuration: {}", props, t); 431 } 432 } 433 434 try { 435 startLocalBridge(); 436 } catch (Throwable e) { 437 serviceLocalException(e); 438 return; 439 } 440 441 try { 442 startRemoteBridge(); 443 } catch (Throwable e) { 444 serviceRemoteException(e); 445 return; 446 } 447 448 try { 449 if (safeWaitUntilStarted()) { 450 setupStaticDestinations(); 451 staticDestinationsLatch.countDown(); 452 } 453 } catch (Throwable e) { 454 serviceLocalException(e); 455 } 456 } 457 458 private void startLocalBridge() throws Throwable { 459 if (!bridgeFailed.get() && localBridgeStarted.compareAndSet(false, true)) { 460 synchronized (this) { 461 LOG.trace("{} starting local Bridge, localBroker={}", configuration.getBrokerName(), localBroker); 462 if (!disposed.get()) { 463 464 if (idGenerator == null) { 465 throw new IllegalStateException("Id Generator cannot be null"); 466 } 467 468 localConnectionInfo = new ConnectionInfo(); 469 localConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId())); 470 localClientId = configuration.getName() + "_" + remoteBrokerName + "_inbound_" + configuration.getBrokerName(); 471 localConnectionInfo.setClientId(localClientId); 472 localConnectionInfo.setUserName(configuration.getUserName()); 473 localConnectionInfo.setPassword(configuration.getPassword()); 474 Transport originalTransport = remoteBroker; 475 while (originalTransport instanceof TransportFilter) { 476 originalTransport = ((TransportFilter) originalTransport).getNext(); 477 } 478 if (originalTransport instanceof SslTransport) { 479 X509Certificate[] peerCerts = ((SslTransport) originalTransport).getPeerCertificates(); 480 localConnectionInfo.setTransportContext(peerCerts); 481 } 482 // sync requests that may fail 483 Object resp = localBroker.request(localConnectionInfo); 484 if (resp instanceof ExceptionResponse) { 485 throw ((ExceptionResponse) resp).getException(); 486 } 487 localSessionInfo = new SessionInfo(localConnectionInfo, 1); 488 localBroker.oneway(localSessionInfo); 489 490 if (configuration.isDuplex()) { 491 // separate in-bound channel for forwards so we don't 492 // contend with out-bound dispatch on same connection 493 remoteBrokerInfo.setNetworkConnection(true); 494 duplexInboundLocalBroker.oneway(remoteBrokerInfo); 495 496 ConnectionInfo duplexLocalConnectionInfo = new ConnectionInfo(); 497 duplexLocalConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId())); 498 duplexLocalConnectionInfo.setClientId(configuration.getName() + "_" + remoteBrokerName + "_inbound_duplex_" 499 + configuration.getBrokerName()); 500 duplexLocalConnectionInfo.setUserName(configuration.getUserName()); 501 duplexLocalConnectionInfo.setPassword(configuration.getPassword()); 502 503 if (originalTransport instanceof SslTransport) { 504 X509Certificate[] peerCerts = ((SslTransport) originalTransport).getPeerCertificates(); 505 duplexLocalConnectionInfo.setTransportContext(peerCerts); 506 } 507 // sync requests that may fail 508 resp = duplexInboundLocalBroker.request(duplexLocalConnectionInfo); 509 if (resp instanceof ExceptionResponse) { 510 throw ((ExceptionResponse) resp).getException(); 511 } 512 SessionInfo duplexInboundSession = new SessionInfo(duplexLocalConnectionInfo, 1); 513 duplexInboundLocalProducerInfo = new ProducerInfo(duplexInboundSession, 1); 514 duplexInboundLocalBroker.oneway(duplexInboundSession); 515 duplexInboundLocalBroker.oneway(duplexInboundLocalProducerInfo); 516 } 517 brokerService.getBroker().networkBridgeStarted(remoteBrokerInfo, this.createdByDuplex, remoteBroker.toString()); 518 NetworkBridgeListener l = this.networkBridgeListener; 519 if (l != null) { 520 l.onStart(this); 521 } 522 523 // Let the local broker know the remote broker's ID. 524 localBroker.oneway(remoteBrokerInfo); 525 // new peer broker (a consumer can work with remote broker also) 526 brokerService.getBroker().addBroker(null, remoteBrokerInfo); 527 528 LOG.info("Network connection between {} and {} ({}) has been established.", new Object[]{ 529 localBroker, remoteBroker, remoteBrokerName 530 }); 531 LOG.trace("{} register bridge ({}) to {}", new Object[]{ 532 configuration.getBrokerName(), this, remoteBrokerName 533 }); 534 } else { 535 LOG.warn("Bridge was disposed before the startLocalBridge() method was fully executed."); 536 } 537 startedLatch.countDown(); 538 localStartedLatch.countDown(); 539 } 540 } 541 } 542 543 protected void startRemoteBridge() throws Exception { 544 if (!bridgeFailed.get() && remoteBridgeStarted.compareAndSet(false, true)) { 545 LOG.trace("{} starting remote Bridge, remoteBroker={}", configuration.getBrokerName(), remoteBroker); 546 synchronized (this) { 547 if (!isCreatedByDuplex()) { 548 BrokerInfo brokerInfo = new BrokerInfo(); 549 brokerInfo.setBrokerName(configuration.getBrokerName()); 550 brokerInfo.setBrokerURL(configuration.getBrokerURL()); 551 brokerInfo.setNetworkConnection(true); 552 brokerInfo.setDuplexConnection(configuration.isDuplex()); 553 // set our properties 554 Properties props = new Properties(); 555 IntrospectionSupport.getProperties(configuration, props, null); 556 557 String dynamicallyIncludedDestinationsKey = "dynamicallyIncludedDestinations"; 558 String staticallyIncludedDestinationsKey = "staticallyIncludedDestinations"; 559 560 if (!configuration.getDynamicallyIncludedDestinations().isEmpty()) { 561 props.put(dynamicallyIncludedDestinationsKey, 562 StringToListOfActiveMQDestinationConverter. 563 convertFromActiveMQDestination(configuration.getDynamicallyIncludedDestinations(), true)); 564 } 565 if (!configuration.getStaticallyIncludedDestinations().isEmpty()) { 566 props.put(staticallyIncludedDestinationsKey, 567 StringToListOfActiveMQDestinationConverter. 568 convertFromActiveMQDestination(configuration.getStaticallyIncludedDestinations(), true)); 569 } 570 571 props.remove("networkTTL"); 572 String str = MarshallingSupport.propertiesToString(props); 573 brokerInfo.setNetworkProperties(str); 574 brokerInfo.setBrokerId(this.localBrokerId); 575 remoteBroker.oneway(brokerInfo); 576 if (configuration.isSyncDurableSubs() && 577 remoteBroker.getWireFormat().getVersion() >= CommandTypes.PROTOCOL_VERSION_DURABLE_SYNC) { 578 remoteBroker.oneway(TransportConnection.getBrokerSubscriptionInfo(brokerService)); 579 } 580 } 581 if (remoteConnectionInfo != null) { 582 remoteBroker.oneway(remoteConnectionInfo.createRemoveCommand()); 583 } 584 remoteConnectionInfo = new ConnectionInfo(); 585 remoteConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId())); 586 remoteConnectionInfo.setClientId(configuration.getName() + "_" + configuration.getBrokerName() + "_outbound"); 587 remoteConnectionInfo.setUserName(configuration.getUserName()); 588 remoteConnectionInfo.setPassword(configuration.getPassword()); 589 remoteBroker.oneway(remoteConnectionInfo); 590 591 SessionInfo remoteSessionInfo = new SessionInfo(remoteConnectionInfo, 1); 592 remoteBroker.oneway(remoteSessionInfo); 593 producerInfo = new ProducerInfo(remoteSessionInfo, 1); 594 producerInfo.setResponseRequired(false); 595 remoteBroker.oneway(producerInfo); 596 // Listen to consumer advisory messages on the remote broker to determine demand. 597 if (!configuration.isStaticBridge()) { 598 demandConsumerInfo = new ConsumerInfo(remoteSessionInfo, 1); 599 // always dispatch advisory message asynchronously so that 600 // we never block the producer broker if we are slow 601 demandConsumerInfo.setDispatchAsync(true); 602 String advisoryTopic = configuration.getDestinationFilter(); 603 if (configuration.isBridgeTempDestinations()) { 604 advisoryTopic += "," + AdvisorySupport.TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC; 605 } 606 demandConsumerInfo.setDestination(new ActiveMQTopic(advisoryTopic)); 607 configureConsumerPrefetch(demandConsumerInfo); 608 remoteBroker.oneway(demandConsumerInfo); 609 } 610 startedLatch.countDown(); 611 } 612 } 613 } 614 615 @Override 616 public void serviceRemoteException(Throwable error) { 617 if (!disposed.get()) { 618 if (error instanceof SecurityException || error instanceof GeneralSecurityException) { 619 LOG.error("Network connection between {} and {} shutdown due to a remote error: {}", new Object[]{ 620 localBroker, remoteBroker, error 621 }); 622 } else { 623 LOG.warn("Network connection between {} and {} shutdown due to a remote error: {}", new Object[]{ 624 localBroker, remoteBroker, error 625 }); 626 } 627 LOG.debug("The remote Exception was: {}", error, error); 628 brokerService.getTaskRunnerFactory().execute(new Runnable() { 629 @Override 630 public void run() { 631 ServiceSupport.dispose(getControllingService()); 632 } 633 }); 634 fireBridgeFailed(error); 635 } 636 } 637 638 protected void serviceRemoteCommand(Command command) { 639 if (!disposed.get()) { 640 try { 641 if (command.isMessageDispatch()) { 642 safeWaitUntilStarted(); 643 MessageDispatch md = (MessageDispatch) command; 644 serviceRemoteConsumerAdvisory(md.getMessage().getDataStructure()); 645 ackAdvisory(md.getMessage()); 646 } else if (command.isBrokerInfo()) { 647 futureRemoteBrokerInfo.set((BrokerInfo) command); 648 } else if (command instanceof BrokerSubscriptionInfo) { 649 staticDestinationsLatch.await(); 650 BrokerSubscriptionInfo subInfo = (BrokerSubscriptionInfo) command; 651 LOG.debug("Received Remote BrokerSubscriptionInfo on {} from {}", 652 this.brokerService.getBrokerName(), subInfo.getBrokerName()); 653 654 if (configuration.isSyncDurableSubs() && configuration.isConduitSubscriptions() 655 && !configuration.isDynamicOnly()) { 656 if (started.get()) { 657 if (subInfo.getSubscriptionInfos() != null) { 658 for (ConsumerInfo info : subInfo.getSubscriptionInfos()) { 659 if(!info.getSubscriptionName().startsWith(DURABLE_SUB_PREFIX) && 660 matchesDynamicallyIncludedDestinations(info.getDestination())) { 661 serviceRemoteConsumerAdvisory(info); 662 } 663 } 664 } 665 666 //After re-added, clean up any empty durables 667 for (Iterator<DemandSubscription> i = subscriptionMapByLocalId.values().iterator(); i.hasNext(); ) { 668 DemandSubscription ds = i.next(); 669 if (matchesDynamicallyIncludedDestinations(ds.getLocalInfo().getDestination())) { 670 cleanupDurableSub(ds, i); 671 } 672 } 673 } 674 } 675 } else if (command.getClass() == ConnectionError.class) { 676 ConnectionError ce = (ConnectionError) command; 677 serviceRemoteException(ce.getException()); 678 } else { 679 if (isDuplex()) { 680 LOG.trace("{} duplex command type: {}", configuration.getBrokerName(), command.getDataStructureType()); 681 if (command.isMessage()) { 682 final ActiveMQMessage message = (ActiveMQMessage) command; 683 if (NetworkBridgeFilter.isAdvisoryInterpretedByNetworkBridge(message)) { 684 serviceRemoteConsumerAdvisory(message.getDataStructure()); 685 ackAdvisory(message); 686 } else { 687 if (!isPermissableDestination(message.getDestination(), true)) { 688 return; 689 } 690 // message being forwarded - we need to 691 // propagate the response to our local send 692 if (canDuplexDispatch(message)) { 693 message.setProducerId(duplexInboundLocalProducerInfo.getProducerId()); 694 if (message.isResponseRequired() || configuration.isAlwaysSyncSend()) { 695 duplexInboundLocalBroker.asyncRequest(message, new ResponseCallback() { 696 final int correlationId = message.getCommandId(); 697 698 @Override 699 public void onCompletion(FutureResponse resp) { 700 try { 701 Response reply = resp.getResult(); 702 reply.setCorrelationId(correlationId); 703 remoteBroker.oneway(reply); 704 //increment counter when messages are received in duplex mode 705 networkBridgeStatistics.getReceivedCount().increment(); 706 } catch (IOException error) { 707 LOG.error("Exception: {} on duplex forward of: {}", error, message); 708 serviceRemoteException(error); 709 } 710 } 711 }); 712 } else { 713 duplexInboundLocalBroker.oneway(message); 714 networkBridgeStatistics.getReceivedCount().increment(); 715 } 716 serviceInboundMessage(message); 717 } else { 718 if (message.isResponseRequired() || configuration.isAlwaysSyncSend()) { 719 Response reply = new Response(); 720 reply.setCorrelationId(message.getCommandId()); 721 remoteBroker.oneway(reply); 722 } 723 } 724 } 725 } else { 726 switch (command.getDataStructureType()) { 727 case ConnectionInfo.DATA_STRUCTURE_TYPE: 728 if (duplexInitiatingConnection != null && duplexInitiatingConnectionInfoReceived.compareAndSet(false, true)) { 729 // end of initiating connection setup - propogate to initial connection to get mbean by clientid 730 duplexInitiatingConnection.processAddConnection((ConnectionInfo) command); 731 } else { 732 localBroker.oneway(command); 733 } 734 break; 735 case SessionInfo.DATA_STRUCTURE_TYPE: 736 localBroker.oneway(command); 737 break; 738 case ProducerInfo.DATA_STRUCTURE_TYPE: 739 // using duplexInboundLocalProducerInfo 740 break; 741 case MessageAck.DATA_STRUCTURE_TYPE: 742 MessageAck ack = (MessageAck) command; 743 DemandSubscription localSub = subscriptionMapByRemoteId.get(ack.getConsumerId()); 744 if (localSub != null) { 745 ack.setConsumerId(localSub.getLocalInfo().getConsumerId()); 746 localBroker.oneway(ack); 747 } else { 748 LOG.warn("Matching local subscription not found for ack: {}", ack); 749 } 750 break; 751 case ConsumerInfo.DATA_STRUCTURE_TYPE: 752 localStartedLatch.await(); 753 if (started.get()) { 754 addConsumerInfo((ConsumerInfo) command); 755 } else { 756 // received a subscription whilst stopping 757 LOG.warn("Stopping - ignoring ConsumerInfo: {}", command); 758 } 759 break; 760 case ShutdownInfo.DATA_STRUCTURE_TYPE: 761 // initiator is shutting down, controlled case 762 // abortive close dealt with by inactivity monitor 763 LOG.info("Stopping network bridge on shutdown of remote broker"); 764 serviceRemoteException(new IOException(command.toString())); 765 break; 766 default: 767 LOG.debug("Ignoring remote command: {}", command); 768 } 769 } 770 } else { 771 switch (command.getDataStructureType()) { 772 case KeepAliveInfo.DATA_STRUCTURE_TYPE: 773 case WireFormatInfo.DATA_STRUCTURE_TYPE: 774 case ShutdownInfo.DATA_STRUCTURE_TYPE: 775 break; 776 default: 777 LOG.warn("Unexpected remote command: {}", command); 778 } 779 } 780 } 781 } catch (Throwable e) { 782 LOG.debug("Exception processing remote command: {}", command, e); 783 serviceRemoteException(e); 784 } 785 } 786 } 787 788 private void ackAdvisory(Message message) throws IOException { 789 demandConsumerDispatched++; 790 if (demandConsumerDispatched > (demandConsumerInfo.getPrefetchSize() * 791 (configuration.getAdvisoryAckPercentage() / 100f))) { 792 MessageAck ack = new MessageAck(message, MessageAck.STANDARD_ACK_TYPE, demandConsumerDispatched); 793 ack.setConsumerId(demandConsumerInfo.getConsumerId()); 794 remoteBroker.oneway(ack); 795 demandConsumerDispatched = 0; 796 } 797 } 798 799 private void serviceRemoteConsumerAdvisory(DataStructure data) throws IOException { 800 final int networkTTL = configuration.getConsumerTTL(); 801 if (data.getClass() == ConsumerInfo.class) { 802 // Create a new local subscription 803 ConsumerInfo info = (ConsumerInfo) data; 804 BrokerId[] path = info.getBrokerPath(); 805 806 if (info.isBrowser()) { 807 LOG.debug("{} Ignoring sub from {}, browsers explicitly suppressed", configuration.getBrokerName(), remoteBrokerName); 808 return; 809 } 810 811 if (path != null && networkTTL > -1 && path.length >= networkTTL) { 812 LOG.debug("{} Ignoring sub from {}, restricted to {} network hops only: {}", new Object[]{ 813 configuration.getBrokerName(), remoteBrokerName, networkTTL, info 814 }); 815 return; 816 } 817 818 if (contains(path, localBrokerPath[0])) { 819 // Ignore this consumer as it's a consumer we locally sent to the broker. 820 LOG.debug("{} Ignoring sub from {}, already routed through this broker once: {}", new Object[]{ 821 configuration.getBrokerName(), remoteBrokerName, info 822 }); 823 return; 824 } 825 826 if (!isPermissableDestination(info.getDestination())) { 827 // ignore if not in the permitted or in the excluded list 828 LOG.debug("{} Ignoring sub from {}, destination {} is not permitted: {}", new Object[]{ 829 configuration.getBrokerName(), remoteBrokerName, info.getDestination(), info 830 }); 831 return; 832 } 833 834 // in a cyclic network there can be multiple bridges per broker that can propagate 835 // a network subscription so there is a need to synchronize on a shared entity 836 synchronized (brokerService.getVmConnectorURI()) { 837 addConsumerInfo(info); 838 } 839 } else if (data.getClass() == DestinationInfo.class) { 840 // It's a destination info - we want to pass up information about temporary destinations 841 final DestinationInfo destInfo = (DestinationInfo) data; 842 BrokerId[] path = destInfo.getBrokerPath(); 843 if (path != null && networkTTL > -1 && path.length >= networkTTL) { 844 LOG.debug("{} Ignoring destination {} restricted to {} network hops only", new Object[]{ 845 configuration.getBrokerName(), destInfo, networkTTL 846 }); 847 return; 848 } 849 if (contains(destInfo.getBrokerPath(), localBrokerPath[0])) { 850 LOG.debug("{} Ignoring destination {} already routed through this broker once", configuration.getBrokerName(), destInfo); 851 return; 852 } 853 destInfo.setConnectionId(localConnectionInfo.getConnectionId()); 854 if (destInfo.getDestination() instanceof ActiveMQTempDestination) { 855 // re-set connection id so comes from here 856 ActiveMQTempDestination tempDest = (ActiveMQTempDestination) destInfo.getDestination(); 857 tempDest.setConnectionId(localSessionInfo.getSessionId().getConnectionId()); 858 } 859 destInfo.setBrokerPath(appendToBrokerPath(destInfo.getBrokerPath(), getRemoteBrokerPath())); 860 LOG.trace("{} bridging {} destination on {} from {}, destination: {}", new Object[]{ 861 configuration.getBrokerName(), (destInfo.isAddOperation() ? "add" : "remove"), localBroker, remoteBrokerName, destInfo 862 }); 863 if (destInfo.isRemoveOperation()) { 864 // Serialize with removeSub operations such that all removeSub advisories 865 // are generated 866 serialExecutor.execute(new Runnable() { 867 @Override 868 public void run() { 869 try { 870 localBroker.oneway(destInfo); 871 } catch (IOException e) { 872 LOG.warn("failed to deliver remove command for destination: {}", destInfo.getDestination(), e); 873 } 874 } 875 }); 876 } else { 877 localBroker.oneway(destInfo); 878 } 879 } else if (data.getClass() == RemoveInfo.class) { 880 ConsumerId id = (ConsumerId) ((RemoveInfo) data).getObjectId(); 881 removeDemandSubscription(id); 882 883 if (forcedDurableRemoteId.remove(id)) { 884 for (Iterator<DemandSubscription> i = subscriptionMapByLocalId.values().iterator(); i.hasNext(); ) { 885 DemandSubscription ds = i.next(); 886 boolean removed = ds.removeForcedDurableConsumer(id); 887 if (removed) { 888 cleanupDurableSub(ds, i); 889 } 890 } 891 } 892 893 } else if (data.getClass() == RemoveSubscriptionInfo.class) { 894 RemoveSubscriptionInfo info = ((RemoveSubscriptionInfo) data); 895 SubscriptionInfo subscriptionInfo = new SubscriptionInfo(info.getClientId(), info.getSubscriptionName()); 896 for (Iterator<DemandSubscription> i = subscriptionMapByLocalId.values().iterator(); i.hasNext(); ) { 897 DemandSubscription ds = i.next(); 898 boolean removed = ds.getDurableRemoteSubs().remove(subscriptionInfo); 899 if (removed) { 900 cleanupDurableSub(ds, i); 901 } 902 } 903 } 904 } 905 906 private void cleanupDurableSub(final DemandSubscription ds, 907 Iterator<DemandSubscription> i) throws IOException { 908 if (ds != null && ds.getLocalDurableSubscriber() != null && ds.getDurableRemoteSubs().isEmpty() 909 && ds.getForcedDurableConsumersSize() == 0) { 910 911 // deactivate subscriber 912 RemoveInfo removeInfo = new RemoveInfo(ds.getLocalInfo().getConsumerId()); 913 localBroker.oneway(removeInfo); 914 915 // remove subscriber 916 RemoveSubscriptionInfo sending = new RemoveSubscriptionInfo(); 917 sending.setClientId(localClientId); 918 sending.setSubscriptionName(ds.getLocalDurableSubscriber().getSubscriptionName()); 919 sending.setConnectionId(this.localConnectionInfo.getConnectionId()); 920 localBroker.oneway(sending); 921 922 //remove subscriber from map 923 i.remove(); 924 } 925 } 926 927 @Override 928 public void serviceLocalException(Throwable error) { 929 serviceLocalException(null, error); 930 } 931 932 public void serviceLocalException(MessageDispatch messageDispatch, Throwable error) { 933 LOG.trace("serviceLocalException: disposed {} ex", disposed.get(), error); 934 if (!disposed.get()) { 935 if (error instanceof DestinationDoesNotExistException && ((DestinationDoesNotExistException) error).isTemporary()) { 936 // not a reason to terminate the bridge - temps can disappear with 937 // pending sends as the demand sub may outlive the remote dest 938 if (messageDispatch != null) { 939 LOG.warn("PoisonAck of {} on forwarding error: {}", messageDispatch.getMessage().getMessageId(), error); 940 try { 941 MessageAck poisonAck = new MessageAck(messageDispatch, MessageAck.POSION_ACK_TYPE, 1); 942 poisonAck.setPoisonCause(error); 943 localBroker.oneway(poisonAck); 944 } catch (IOException ioe) { 945 LOG.error("Failed to posion ack message following forward failure: ", ioe); 946 } 947 fireFailedForwardAdvisory(messageDispatch, error); 948 } else { 949 LOG.warn("Ignoring exception on forwarding to non existent temp dest: ", error); 950 } 951 return; 952 } 953 954 LOG.info("Network connection between {} and {} shutdown due to a local error: {}", new Object[]{localBroker, remoteBroker, error}); 955 LOG.debug("The local Exception was: {}", error, error); 956 957 brokerService.getTaskRunnerFactory().execute(new Runnable() { 958 @Override 959 public void run() { 960 ServiceSupport.dispose(getControllingService()); 961 } 962 }); 963 fireBridgeFailed(error); 964 } 965 } 966 967 private void fireFailedForwardAdvisory(MessageDispatch messageDispatch, Throwable error) { 968 if (configuration.isAdvisoryForFailedForward()) { 969 AdvisoryBroker advisoryBroker = null; 970 try { 971 advisoryBroker = (AdvisoryBroker) brokerService.getBroker().getAdaptor(AdvisoryBroker.class); 972 973 if (advisoryBroker != null) { 974 ConnectionContext context = new ConnectionContext(); 975 context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT); 976 context.setBroker(brokerService.getBroker()); 977 978 ActiveMQMessage advisoryMessage = new ActiveMQMessage(); 979 advisoryMessage.setStringProperty("cause", error.getLocalizedMessage()); 980 advisoryBroker.fireAdvisory(context, AdvisorySupport.getNetworkBridgeForwardFailureAdvisoryTopic(), messageDispatch.getMessage(), null, 981 advisoryMessage); 982 983 } 984 } catch (Exception e) { 985 LOG.warn("failed to fire forward failure advisory, cause: {}", e); 986 LOG.debug("detail", e); 987 } 988 } 989 } 990 991 protected Service getControllingService() { 992 return duplexInitiatingConnection != null ? duplexInitiatingConnection : DemandForwardingBridgeSupport.this; 993 } 994 995 protected void addSubscription(DemandSubscription sub) throws IOException { 996 if (sub != null) { 997 if (isDuplex()) { 998 // async vm transport, need to wait for completion 999 localBroker.request(sub.getLocalInfo()); 1000 } else { 1001 localBroker.oneway(sub.getLocalInfo()); 1002 } 1003 } 1004 } 1005 1006 protected void removeSubscription(final DemandSubscription sub) throws IOException { 1007 if (sub != null) { 1008 LOG.trace("{} remove local subscription: {} for remote {}", new Object[]{configuration.getBrokerName(), sub.getLocalInfo().getConsumerId(), sub.getRemoteInfo().getConsumerId()}); 1009 1010 // ensure not available for conduit subs pending removal 1011 subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId()); 1012 subscriptionMapByRemoteId.remove(sub.getRemoteInfo().getConsumerId()); 1013 1014 // continue removal in separate thread to free up this thread for outstanding responses 1015 // Serialize with removeDestination operations so that removeSubs are serialized with 1016 // removeDestinations such that all removeSub advisories are generated 1017 serialExecutor.execute(new Runnable() { 1018 @Override 1019 public void run() { 1020 sub.waitForCompletion(); 1021 try { 1022 localBroker.oneway(sub.getLocalInfo().createRemoveCommand()); 1023 } catch (IOException e) { 1024 LOG.warn("failed to deliver remove command for local subscription, for remote {}", sub.getRemoteInfo().getConsumerId(), e); 1025 } 1026 } 1027 }); 1028 } 1029 } 1030 1031 protected Message configureMessage(MessageDispatch md) throws IOException { 1032 Message message = md.getMessage().copy(); 1033 // Update the packet to show where it came from. 1034 message.setBrokerPath(appendToBrokerPath(message.getBrokerPath(), localBrokerPath)); 1035 message.setProducerId(producerInfo.getProducerId()); 1036 message.setDestination(md.getDestination()); 1037 message.setMemoryUsage(null); 1038 if (message.getOriginalTransactionId() == null) { 1039 message.setOriginalTransactionId(message.getTransactionId()); 1040 } 1041 message.setTransactionId(null); 1042 if (configuration.isUseCompression()) { 1043 message.compress(); 1044 } 1045 return message; 1046 } 1047 1048 protected void serviceLocalCommand(Command command) { 1049 if (!disposed.get()) { 1050 try { 1051 if (command.isMessageDispatch()) { 1052 safeWaitUntilStarted(); 1053 networkBridgeStatistics.getEnqueues().increment(); 1054 final MessageDispatch md = (MessageDispatch) command; 1055 final DemandSubscription sub = subscriptionMapByLocalId.get(md.getConsumerId()); 1056 if (sub != null && md.getMessage() != null && sub.incrementOutstandingResponses()) { 1057 1058 if (suppressMessageDispatch(md, sub)) { 1059 LOG.debug("{} message not forwarded to {} because message came from there or fails TTL, brokerPath: {}, message: {}", new Object[]{ 1060 configuration.getBrokerName(), remoteBrokerName, Arrays.toString(md.getMessage().getBrokerPath()), md.getMessage() 1061 }); 1062 // still ack as it may be durable 1063 try { 1064 localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1)); 1065 } finally { 1066 sub.decrementOutstandingResponses(); 1067 } 1068 return; 1069 } 1070 1071 Message message = configureMessage(md); 1072 LOG.debug("bridging ({} -> {}), consumer: {}, destination: {}, brokerPath: {}, message: {}", new Object[]{ 1073 configuration.getBrokerName(), remoteBrokerName, md.getConsumerId(), message.getDestination(), Arrays.toString(message.getBrokerPath()), (LOG.isTraceEnabled() ? message : message.getMessageId()) 1074 }); 1075 if (isDuplex() && NetworkBridgeFilter.isAdvisoryInterpretedByNetworkBridge(message)) { 1076 try { 1077 // never request b/c they are eventually acked async 1078 remoteBroker.oneway(message); 1079 } finally { 1080 sub.decrementOutstandingResponses(); 1081 } 1082 return; 1083 } 1084 if (isPermissableDestination(md.getDestination())) { 1085 if (message.isPersistent() || configuration.isAlwaysSyncSend()) { 1086 1087 // The message was not sent using async send, so we should only 1088 // ack the local broker when we get confirmation that the remote 1089 // broker has received the message. 1090 remoteBroker.asyncRequest(message, new ResponseCallback() { 1091 @Override 1092 public void onCompletion(FutureResponse future) { 1093 try { 1094 Response response = future.getResult(); 1095 if (response.isException()) { 1096 ExceptionResponse er = (ExceptionResponse) response; 1097 serviceLocalException(md, er.getException()); 1098 } else { 1099 localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1)); 1100 networkBridgeStatistics.getDequeues().increment(); 1101 } 1102 } catch (IOException e) { 1103 serviceLocalException(md, e); 1104 } finally { 1105 sub.decrementOutstandingResponses(); 1106 } 1107 } 1108 }); 1109 1110 } else { 1111 // If the message was originally sent using async send, we will 1112 // preserve that QOS by bridging it using an async send (small chance 1113 // of message loss). 1114 try { 1115 remoteBroker.oneway(message); 1116 localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1)); 1117 networkBridgeStatistics.getDequeues().increment(); 1118 } finally { 1119 sub.decrementOutstandingResponses(); 1120 } 1121 } 1122 serviceOutbound(message); 1123 } 1124 } else { 1125 LOG.debug("No subscription registered with this network bridge for consumerId: {} for message: {}", md.getConsumerId(), md.getMessage()); 1126 } 1127 } else if (command.isBrokerInfo()) { 1128 futureLocalBrokerInfo.set((BrokerInfo) command); 1129 } else if (command.isShutdownInfo()) { 1130 LOG.info("{} Shutting down {}", configuration.getBrokerName(), configuration.getName()); 1131 stop(); 1132 } else if (command.getClass() == ConnectionError.class) { 1133 ConnectionError ce = (ConnectionError) command; 1134 serviceLocalException(ce.getException()); 1135 } else { 1136 switch (command.getDataStructureType()) { 1137 case WireFormatInfo.DATA_STRUCTURE_TYPE: 1138 break; 1139 case BrokerSubscriptionInfo.DATA_STRUCTURE_TYPE: 1140 break; 1141 default: 1142 LOG.warn("Unexpected local command: {}", command); 1143 } 1144 } 1145 } catch (Throwable e) { 1146 LOG.warn("Caught an exception processing local command", e); 1147 serviceLocalException(e); 1148 } 1149 } 1150 } 1151 1152 private boolean suppressMessageDispatch(MessageDispatch md, DemandSubscription sub) throws Exception { 1153 boolean suppress = false; 1154 // for durable subs, suppression via filter leaves dangling acks so we 1155 // need to check here and allow the ack irrespective 1156 if (sub.getLocalInfo().isDurable()) { 1157 MessageEvaluationContext messageEvalContext = new MessageEvaluationContext(); 1158 messageEvalContext.setMessageReference(md.getMessage()); 1159 messageEvalContext.setDestination(md.getDestination()); 1160 suppress = !sub.getNetworkBridgeFilter().matches(messageEvalContext); 1161 } 1162 return suppress; 1163 } 1164 1165 public static boolean contains(BrokerId[] brokerPath, BrokerId brokerId) { 1166 if (brokerPath != null) { 1167 for (BrokerId id : brokerPath) { 1168 if (brokerId.equals(id)) { 1169 return true; 1170 } 1171 } 1172 } 1173 return false; 1174 } 1175 1176 protected BrokerId[] appendToBrokerPath(BrokerId[] brokerPath, BrokerId[] pathsToAppend) { 1177 if (brokerPath == null || brokerPath.length == 0) { 1178 return pathsToAppend; 1179 } 1180 BrokerId rc[] = new BrokerId[brokerPath.length + pathsToAppend.length]; 1181 System.arraycopy(brokerPath, 0, rc, 0, brokerPath.length); 1182 System.arraycopy(pathsToAppend, 0, rc, brokerPath.length, pathsToAppend.length); 1183 return rc; 1184 } 1185 1186 protected BrokerId[] appendToBrokerPath(BrokerId[] brokerPath, BrokerId idToAppend) { 1187 if (brokerPath == null || brokerPath.length == 0) { 1188 return new BrokerId[]{idToAppend}; 1189 } 1190 BrokerId rc[] = new BrokerId[brokerPath.length + 1]; 1191 System.arraycopy(brokerPath, 0, rc, 0, brokerPath.length); 1192 rc[brokerPath.length] = idToAppend; 1193 return rc; 1194 } 1195 1196 protected boolean isPermissableDestination(ActiveMQDestination destination) { 1197 return isPermissableDestination(destination, false); 1198 } 1199 1200 protected boolean isPermissableDestination(ActiveMQDestination destination, boolean allowTemporary) { 1201 // Are we not bridging temporary destinations? 1202 if (destination.isTemporary()) { 1203 if (allowTemporary) { 1204 return true; 1205 } else { 1206 return configuration.isBridgeTempDestinations(); 1207 } 1208 } 1209 1210 ActiveMQDestination[] dests = excludedDestinations; 1211 if (dests != null && dests.length > 0) { 1212 for (ActiveMQDestination dest : dests) { 1213 DestinationFilter exclusionFilter = DestinationFilter.parseFilter(dest); 1214 if (dest != null && exclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) { 1215 return false; 1216 } 1217 } 1218 } 1219 1220 dests = staticallyIncludedDestinations; 1221 if (dests != null && dests.length > 0) { 1222 for (ActiveMQDestination dest : dests) { 1223 DestinationFilter inclusionFilter = DestinationFilter.parseFilter(dest); 1224 if (dest != null && inclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) { 1225 return true; 1226 } 1227 } 1228 } 1229 1230 dests = dynamicallyIncludedDestinations; 1231 if (dests != null && dests.length > 0) { 1232 for (ActiveMQDestination dest : dests) { 1233 DestinationFilter inclusionFilter = DestinationFilter.parseFilter(dest); 1234 if (dest != null && inclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) { 1235 return true; 1236 } 1237 } 1238 1239 return false; 1240 } 1241 1242 return true; 1243 } 1244 1245 private boolean matchesDynamicallyIncludedDestinations(ActiveMQDestination destination) { 1246 ActiveMQDestination[] dests = dynamicallyIncludedDestinations; 1247 if (dests != null && dests.length > 0) { 1248 for (ActiveMQDestination dest : dests) { 1249 DestinationFilter inclusionFilter = DestinationFilter.parseFilter(dest); 1250 if (dest != null && inclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) { 1251 return true; 1252 } 1253 } 1254 } 1255 1256 return false; 1257 } 1258 1259 protected ActiveMQDestination findMatchingDestination(ActiveMQDestination[] dests, ActiveMQDestination destination) { 1260 if (dests != null && dests.length > 0) { 1261 for (ActiveMQDestination dest : dests) { 1262 DestinationFilter inclusionFilter = DestinationFilter.parseFilter(dest); 1263 if (dest != null && inclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) { 1264 return dest; 1265 } 1266 } 1267 } 1268 1269 return null; 1270 } 1271 1272 /** 1273 * Subscriptions for these destinations are always created 1274 */ 1275 protected void setupStaticDestinations() { 1276 ActiveMQDestination[] dests = staticallyIncludedDestinations; 1277 if (dests != null) { 1278 for (ActiveMQDestination dest : dests) { 1279 if (isPermissableDestination(dest)) { 1280 DemandSubscription sub = createDemandSubscription(dest, null); 1281 sub.setStaticallyIncluded(true); 1282 try { 1283 addSubscription(sub); 1284 } catch (IOException e) { 1285 LOG.error("Failed to add static destination {}", dest, e); 1286 } 1287 LOG.trace("{}, bridging messages for static destination: {}", configuration.getBrokerName(), dest); 1288 } else { 1289 LOG.info("{}, static destination excluded: {}", configuration.getBrokerName(), dest); 1290 } 1291 } 1292 } 1293 } 1294 1295 protected void addConsumerInfo(final ConsumerInfo consumerInfo) throws IOException { 1296 ConsumerInfo info = consumerInfo.copy(); 1297 addRemoteBrokerToBrokerPath(info); 1298 DemandSubscription sub = createDemandSubscription(info); 1299 if (sub != null) { 1300 if (duplicateSuppressionIsRequired(sub)) { 1301 undoMapRegistration(sub); 1302 } else { 1303 if (consumerInfo.isDurable()) { 1304 sub.getDurableRemoteSubs().add(new SubscriptionInfo(sub.getRemoteInfo().getClientId(), consumerInfo.getSubscriptionName())); 1305 } 1306 addSubscription(sub); 1307 LOG.debug("{} new demand subscription: {}", configuration.getBrokerName(), sub); 1308 } 1309 } 1310 } 1311 1312 private void undoMapRegistration(DemandSubscription sub) { 1313 subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId()); 1314 subscriptionMapByRemoteId.remove(sub.getRemoteInfo().getConsumerId()); 1315 } 1316 1317 /* 1318 * check our existing subs networkConsumerIds against the list of network 1319 * ids in this subscription A match means a duplicate which we suppress for 1320 * topics and maybe for queues 1321 */ 1322 private boolean duplicateSuppressionIsRequired(DemandSubscription candidate) { 1323 final ConsumerInfo consumerInfo = candidate.getRemoteInfo(); 1324 boolean suppress = false; 1325 1326 if (consumerInfo.getDestination().isQueue() && !configuration.isSuppressDuplicateQueueSubscriptions() || consumerInfo.getDestination().isTopic() 1327 && !configuration.isSuppressDuplicateTopicSubscriptions()) { 1328 return suppress; 1329 } 1330 1331 List<ConsumerId> candidateConsumers = consumerInfo.getNetworkConsumerIds(); 1332 Collection<Subscription> currentSubs = getRegionSubscriptions(consumerInfo.getDestination()); 1333 for (Subscription sub : currentSubs) { 1334 List<ConsumerId> networkConsumers = sub.getConsumerInfo().getNetworkConsumerIds(); 1335 if (!networkConsumers.isEmpty()) { 1336 if (matchFound(candidateConsumers, networkConsumers)) { 1337 if (isInActiveDurableSub(sub)) { 1338 suppress = false; 1339 } else { 1340 suppress = hasLowerPriority(sub, candidate.getLocalInfo()); 1341 } 1342 break; 1343 } 1344 } 1345 } 1346 return suppress; 1347 } 1348 1349 private boolean isInActiveDurableSub(Subscription sub) { 1350 return (sub.getConsumerInfo().isDurable() && sub instanceof DurableTopicSubscription && !((DurableTopicSubscription) sub).isActive()); 1351 } 1352 1353 private boolean hasLowerPriority(Subscription existingSub, ConsumerInfo candidateInfo) { 1354 boolean suppress = false; 1355 1356 if (existingSub.getConsumerInfo().getPriority() >= candidateInfo.getPriority()) { 1357 LOG.debug("{} Ignoring duplicate subscription from {}, sub: {} is duplicate by network subscription with equal or higher network priority: {}, networkConsumerIds: {}", new Object[]{ 1358 configuration.getBrokerName(), remoteBrokerName, candidateInfo, existingSub, existingSub.getConsumerInfo().getNetworkConsumerIds() 1359 }); 1360 suppress = true; 1361 } else { 1362 // remove the existing lower priority duplicate and allow this candidate 1363 try { 1364 removeDuplicateSubscription(existingSub); 1365 1366 LOG.debug("{} Replacing duplicate subscription {} with sub from {}, which has a higher priority, new sub: {}, networkConsumerIds: {}", new Object[]{ 1367 configuration.getBrokerName(), existingSub.getConsumerInfo(), remoteBrokerName, candidateInfo, candidateInfo.getNetworkConsumerIds() 1368 }); 1369 } catch (IOException e) { 1370 LOG.error("Failed to remove duplicated sub as a result of sub with higher priority, sub: {}", existingSub, e); 1371 } 1372 } 1373 return suppress; 1374 } 1375 1376 private void removeDuplicateSubscription(Subscription existingSub) throws IOException { 1377 for (NetworkConnector connector : brokerService.getNetworkConnectors()) { 1378 if (connector.removeDemandSubscription(existingSub.getConsumerInfo().getConsumerId())) { 1379 break; 1380 } 1381 } 1382 } 1383 1384 private boolean matchFound(List<ConsumerId> candidateConsumers, List<ConsumerId> networkConsumers) { 1385 boolean found = false; 1386 for (ConsumerId aliasConsumer : networkConsumers) { 1387 if (candidateConsumers.contains(aliasConsumer)) { 1388 found = true; 1389 break; 1390 } 1391 } 1392 return found; 1393 } 1394 1395 protected final Collection<Subscription> getRegionSubscriptions(ActiveMQDestination dest) { 1396 RegionBroker region_broker = (RegionBroker) brokerService.getRegionBroker(); 1397 Region region; 1398 Collection<Subscription> subs; 1399 1400 region = null; 1401 switch (dest.getDestinationType()) { 1402 case ActiveMQDestination.QUEUE_TYPE: 1403 region = region_broker.getQueueRegion(); 1404 break; 1405 case ActiveMQDestination.TOPIC_TYPE: 1406 region = region_broker.getTopicRegion(); 1407 break; 1408 case ActiveMQDestination.TEMP_QUEUE_TYPE: 1409 region = region_broker.getTempQueueRegion(); 1410 break; 1411 case ActiveMQDestination.TEMP_TOPIC_TYPE: 1412 region = region_broker.getTempTopicRegion(); 1413 break; 1414 } 1415 1416 if (region instanceof AbstractRegion) { 1417 subs = ((AbstractRegion) region).getSubscriptions().values(); 1418 } else { 1419 subs = null; 1420 } 1421 1422 return subs; 1423 } 1424 1425 protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws IOException { 1426 // add our original id to ourselves 1427 info.addNetworkConsumerId(info.getConsumerId()); 1428 return doCreateDemandSubscription(info); 1429 } 1430 1431 protected DemandSubscription doCreateDemandSubscription(ConsumerInfo info) throws IOException { 1432 DemandSubscription result = new DemandSubscription(info); 1433 result.getLocalInfo().setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), consumerIdGenerator.getNextSequenceId())); 1434 if (info.getDestination().isTemporary()) { 1435 // reset the local connection Id 1436 ActiveMQTempDestination dest = (ActiveMQTempDestination) result.getLocalInfo().getDestination(); 1437 dest.setConnectionId(localConnectionInfo.getConnectionId().toString()); 1438 } 1439 1440 if (configuration.isDecreaseNetworkConsumerPriority()) { 1441 byte priority = (byte) configuration.getConsumerPriorityBase(); 1442 if (info.getBrokerPath() != null && info.getBrokerPath().length > 1) { 1443 // The longer the path to the consumer, the less it's consumer priority. 1444 priority -= info.getBrokerPath().length + 1; 1445 } 1446 result.getLocalInfo().setPriority(priority); 1447 LOG.debug("{} using priority: {} for subscription: {}", new Object[]{configuration.getBrokerName(), priority, info}); 1448 } 1449 configureDemandSubscription(info, result); 1450 return result; 1451 } 1452 1453 final protected DemandSubscription createDemandSubscription(ActiveMQDestination destination, final String subscriptionName) { 1454 ConsumerInfo info = new ConsumerInfo(); 1455 info.setNetworkSubscription(true); 1456 info.setDestination(destination); 1457 1458 if (subscriptionName != null) { 1459 info.setSubscriptionName(subscriptionName); 1460 } 1461 1462 // Indicate that this subscription is being made on behalf of the remote broker. 1463 info.setBrokerPath(new BrokerId[]{remoteBrokerId}); 1464 1465 // the remote info held by the DemandSubscription holds the original 1466 // consumerId, the local info get's overwritten 1467 info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), consumerIdGenerator.getNextSequenceId())); 1468 DemandSubscription result = null; 1469 try { 1470 result = createDemandSubscription(info); 1471 } catch (IOException e) { 1472 LOG.error("Failed to create DemandSubscription ", e); 1473 } 1474 return result; 1475 } 1476 1477 protected void configureDemandSubscription(ConsumerInfo info, DemandSubscription sub) throws IOException { 1478 if (AdvisorySupport.isConsumerAdvisoryTopic(info.getDestination()) || 1479 AdvisorySupport.isVirtualDestinationConsumerAdvisoryTopic(info.getDestination())) { 1480 sub.getLocalInfo().setDispatchAsync(true); 1481 } else { 1482 sub.getLocalInfo().setDispatchAsync(configuration.isDispatchAsync()); 1483 } 1484 configureConsumerPrefetch(sub.getLocalInfo()); 1485 subscriptionMapByLocalId.put(sub.getLocalInfo().getConsumerId(), sub); 1486 subscriptionMapByRemoteId.put(sub.getRemoteInfo().getConsumerId(), sub); 1487 1488 sub.setNetworkBridgeFilter(createNetworkBridgeFilter(info)); 1489 if (!info.isDurable()) { 1490 // This works for now since we use a VM connection to the local broker. 1491 // may need to change if we ever subscribe to a remote broker. 1492 sub.getLocalInfo().setAdditionalPredicate(sub.getNetworkBridgeFilter()); 1493 } else { 1494 sub.setLocalDurableSubscriber(new SubscriptionInfo(info.getClientId(), info.getSubscriptionName())); 1495 } 1496 } 1497 1498 protected void removeDemandSubscription(ConsumerId id) throws IOException { 1499 DemandSubscription sub = subscriptionMapByRemoteId.remove(id); 1500 LOG.debug("{} remove request on {} from {}, consumer id: {}, matching sub: {}", new Object[]{ 1501 configuration.getBrokerName(), localBroker, remoteBrokerName, id, sub 1502 }); 1503 if (sub != null) { 1504 removeSubscription(sub); 1505 LOG.debug("{} removed sub on {} from {}: {}", new Object[]{ 1506 configuration.getBrokerName(), localBroker, remoteBrokerName, sub.getRemoteInfo() 1507 }); 1508 } 1509 } 1510 1511 protected boolean removeDemandSubscriptionByLocalId(ConsumerId consumerId) { 1512 boolean removeDone = false; 1513 DemandSubscription sub = subscriptionMapByLocalId.get(consumerId); 1514 if (sub != null) { 1515 try { 1516 removeDemandSubscription(sub.getRemoteInfo().getConsumerId()); 1517 removeDone = true; 1518 } catch (IOException e) { 1519 LOG.debug("removeDemandSubscriptionByLocalId failed for localId: {}", consumerId, e); 1520 } 1521 } 1522 return removeDone; 1523 } 1524 1525 /** 1526 * Performs a timed wait on the started latch and then checks for disposed 1527 * before performing another wait each time the the started wait times out. 1528 */ 1529 protected boolean safeWaitUntilStarted() throws InterruptedException { 1530 while (!disposed.get()) { 1531 if (startedLatch.await(1, TimeUnit.SECONDS)) { 1532 break; 1533 } 1534 } 1535 return !disposed.get(); 1536 } 1537 1538 protected NetworkBridgeFilter createNetworkBridgeFilter(ConsumerInfo info) throws IOException { 1539 NetworkBridgeFilterFactory filterFactory = defaultFilterFactory; 1540 if (brokerService != null && brokerService.getDestinationPolicy() != null) { 1541 PolicyEntry entry = brokerService.getDestinationPolicy().getEntryFor(info.getDestination()); 1542 if (entry != null && entry.getNetworkBridgeFilterFactory() != null) { 1543 filterFactory = entry.getNetworkBridgeFilterFactory(); 1544 } 1545 } 1546 return filterFactory.create(info, getRemoteBrokerPath(), configuration.getMessageTTL(), configuration.getConsumerTTL()); 1547 } 1548 1549 protected void addRemoteBrokerToBrokerPath(ConsumerInfo info) throws IOException { 1550 info.setBrokerPath(appendToBrokerPath(info.getBrokerPath(), getRemoteBrokerPath())); 1551 } 1552 1553 protected BrokerId[] getRemoteBrokerPath() { 1554 return remoteBrokerPath; 1555 } 1556 1557 @Override 1558 public void setNetworkBridgeListener(NetworkBridgeListener listener) { 1559 this.networkBridgeListener = listener; 1560 } 1561 1562 private void fireBridgeFailed(Throwable reason) { 1563 LOG.trace("fire bridge failed, listener: {}", this.networkBridgeListener, reason); 1564 NetworkBridgeListener l = this.networkBridgeListener; 1565 if (l != null && this.bridgeFailed.compareAndSet(false, true)) { 1566 l.bridgeFailed(); 1567 } 1568 } 1569 1570 /** 1571 * @return Returns the dynamicallyIncludedDestinations. 1572 */ 1573 public ActiveMQDestination[] getDynamicallyIncludedDestinations() { 1574 return dynamicallyIncludedDestinations; 1575 } 1576 1577 /** 1578 * @param dynamicallyIncludedDestinations 1579 * The dynamicallyIncludedDestinations to set. 1580 */ 1581 public void setDynamicallyIncludedDestinations(ActiveMQDestination[] dynamicallyIncludedDestinations) { 1582 this.dynamicallyIncludedDestinations = dynamicallyIncludedDestinations; 1583 } 1584 1585 /** 1586 * @return Returns the excludedDestinations. 1587 */ 1588 public ActiveMQDestination[] getExcludedDestinations() { 1589 return excludedDestinations; 1590 } 1591 1592 /** 1593 * @param excludedDestinations The excludedDestinations to set. 1594 */ 1595 public void setExcludedDestinations(ActiveMQDestination[] excludedDestinations) { 1596 this.excludedDestinations = excludedDestinations; 1597 } 1598 1599 /** 1600 * @return Returns the staticallyIncludedDestinations. 1601 */ 1602 public ActiveMQDestination[] getStaticallyIncludedDestinations() { 1603 return staticallyIncludedDestinations; 1604 } 1605 1606 /** 1607 * @param staticallyIncludedDestinations The staticallyIncludedDestinations to set. 1608 */ 1609 public void setStaticallyIncludedDestinations(ActiveMQDestination[] staticallyIncludedDestinations) { 1610 this.staticallyIncludedDestinations = staticallyIncludedDestinations; 1611 } 1612 1613 /** 1614 * @return Returns the durableDestinations. 1615 */ 1616 public ActiveMQDestination[] getDurableDestinations() { 1617 return durableDestinations; 1618 } 1619 1620 /** 1621 * @param durableDestinations The durableDestinations to set. 1622 */ 1623 public void setDurableDestinations(ActiveMQDestination[] durableDestinations) { 1624 this.durableDestinations = durableDestinations; 1625 } 1626 1627 /** 1628 * @return Returns the localBroker. 1629 */ 1630 public Transport getLocalBroker() { 1631 return localBroker; 1632 } 1633 1634 /** 1635 * @return Returns the remoteBroker. 1636 */ 1637 public Transport getRemoteBroker() { 1638 return remoteBroker; 1639 } 1640 1641 /** 1642 * @return the createdByDuplex 1643 */ 1644 public boolean isCreatedByDuplex() { 1645 return this.createdByDuplex; 1646 } 1647 1648 /** 1649 * @param createdByDuplex the createdByDuplex to set 1650 */ 1651 public void setCreatedByDuplex(boolean createdByDuplex) { 1652 this.createdByDuplex = createdByDuplex; 1653 } 1654 1655 @Override 1656 public String getRemoteAddress() { 1657 return remoteBroker.getRemoteAddress(); 1658 } 1659 1660 @Override 1661 public String getLocalAddress() { 1662 return localBroker.getRemoteAddress(); 1663 } 1664 1665 @Override 1666 public String getRemoteBrokerName() { 1667 return remoteBrokerInfo == null ? null : remoteBrokerInfo.getBrokerName(); 1668 } 1669 1670 @Override 1671 public String getRemoteBrokerId() { 1672 return (remoteBrokerInfo == null || remoteBrokerInfo.getBrokerId() == null) ? null : remoteBrokerInfo.getBrokerId().toString(); 1673 } 1674 1675 @Override 1676 public String getLocalBrokerName() { 1677 return localBrokerInfo == null ? null : localBrokerInfo.getBrokerName(); 1678 } 1679 1680 @Override 1681 public long getDequeueCounter() { 1682 return networkBridgeStatistics.getDequeues().getCount(); 1683 } 1684 1685 @Override 1686 public long getEnqueueCounter() { 1687 return networkBridgeStatistics.getEnqueues().getCount(); 1688 } 1689 1690 @Override 1691 public NetworkBridgeStatistics getNetworkBridgeStatistics() { 1692 return networkBridgeStatistics; 1693 } 1694 1695 protected boolean isDuplex() { 1696 return configuration.isDuplex() || createdByDuplex; 1697 } 1698 1699 public ConcurrentMap<ConsumerId, DemandSubscription> getLocalSubscriptionMap() { 1700 return subscriptionMapByRemoteId; 1701 } 1702 1703 @Override 1704 public void setBrokerService(BrokerService brokerService) { 1705 this.brokerService = brokerService; 1706 this.localBrokerId = brokerService.getRegionBroker().getBrokerId(); 1707 localBrokerPath[0] = localBrokerId; 1708 } 1709 1710 @Override 1711 public void setMbeanObjectName(ObjectName objectName) { 1712 this.mbeanObjectName = objectName; 1713 } 1714 1715 @Override 1716 public ObjectName getMbeanObjectName() { 1717 return mbeanObjectName; 1718 } 1719 1720 @Override 1721 public void resetStats() { 1722 networkBridgeStatistics.reset(); 1723 } 1724 1725 /* 1726 * Used to allow for async tasks to await receipt of the BrokerInfo from the local and 1727 * remote sides of the network bridge. 1728 */ 1729 private static class FutureBrokerInfo implements Future<BrokerInfo> { 1730 1731 private final CountDownLatch slot = new CountDownLatch(1); 1732 private final AtomicBoolean disposed; 1733 private volatile BrokerInfo info = null; 1734 1735 public FutureBrokerInfo(BrokerInfo info, AtomicBoolean disposed) { 1736 this.info = info; 1737 this.disposed = disposed; 1738 } 1739 1740 @Override 1741 public boolean cancel(boolean mayInterruptIfRunning) { 1742 slot.countDown(); 1743 return true; 1744 } 1745 1746 @Override 1747 public boolean isCancelled() { 1748 return slot.getCount() == 0 && info == null; 1749 } 1750 1751 @Override 1752 public boolean isDone() { 1753 return info != null; 1754 } 1755 1756 @Override 1757 public BrokerInfo get() throws InterruptedException, ExecutionException { 1758 try { 1759 if (info == null) { 1760 while (!disposed.get()) { 1761 if (slot.await(1, TimeUnit.SECONDS)) { 1762 break; 1763 } 1764 } 1765 } 1766 return info; 1767 } catch (InterruptedException e) { 1768 Thread.currentThread().interrupt(); 1769 LOG.debug("Operation interrupted: {}", e, e); 1770 throw new InterruptedException("Interrupted."); 1771 } 1772 } 1773 1774 @Override 1775 public BrokerInfo get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { 1776 try { 1777 if (info == null) { 1778 long deadline = System.currentTimeMillis() + unit.toMillis(timeout); 1779 1780 while (!disposed.get() || System.currentTimeMillis() < deadline) { 1781 if (slot.await(1, TimeUnit.MILLISECONDS)) { 1782 break; 1783 } 1784 } 1785 if (info == null) { 1786 throw new TimeoutException(); 1787 } 1788 } 1789 return info; 1790 } catch (InterruptedException e) { 1791 throw new InterruptedException("Interrupted."); 1792 } 1793 } 1794 1795 public void set(BrokerInfo info) { 1796 this.info = info; 1797 this.slot.countDown(); 1798 } 1799 } 1800 1801 protected void serviceOutbound(Message message) { 1802 NetworkBridgeListener l = this.networkBridgeListener; 1803 if (l != null) { 1804 l.onOutboundMessage(this, message); 1805 } 1806 } 1807 1808 protected void serviceInboundMessage(Message message) { 1809 NetworkBridgeListener l = this.networkBridgeListener; 1810 if (l != null) { 1811 l.onInboundMessage(this, message); 1812 } 1813 } 1814 1815 protected boolean canDuplexDispatch(Message message) { 1816 boolean result = true; 1817 if (configuration.isCheckDuplicateMessagesOnDuplex()){ 1818 final long producerSequenceId = message.getMessageId().getProducerSequenceId(); 1819 // messages are multiplexed on this producer so we need to query the persistenceAdapter 1820 long lastStoredForMessageProducer = getStoredSequenceIdForMessage(message.getMessageId()); 1821 if (producerSequenceId <= lastStoredForMessageProducer) { 1822 result = false; 1823 LOG.debug("suppressing duplicate message send [{}] from network producer with producerSequence [{}] less than last stored: {}", new Object[]{ 1824 (LOG.isTraceEnabled() ? message : message.getMessageId()), producerSequenceId, lastStoredForMessageProducer 1825 }); 1826 } 1827 } 1828 return result; 1829 } 1830 1831 protected long getStoredSequenceIdForMessage(MessageId messageId) { 1832 try { 1833 return brokerService.getPersistenceAdapter().getLastProducerSequenceId(messageId.getProducerId()); 1834 } catch (IOException ignored) { 1835 LOG.debug("Failed to determine last producer sequence id for: {}", messageId, ignored); 1836 } 1837 return -1; 1838 } 1839 1840 protected void configureConsumerPrefetch(ConsumerInfo consumerInfo) { 1841 //If a consumer on an advisory topic and advisoryPrefetchSize has been explicitly 1842 //set then use it, else default to the prefetchSize setting 1843 if (AdvisorySupport.isAdvisoryTopic(consumerInfo.getDestination()) && 1844 configuration.getAdvisoryPrefetchSize() > 0) { 1845 consumerInfo.setPrefetchSize(configuration.getAdvisoryPrefetchSize()); 1846 } else { 1847 consumerInfo.setPrefetchSize(configuration.getPrefetchSize()); 1848 } 1849 } 1850 1851}