001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.network; 018 019import java.io.IOException; 020import java.security.GeneralSecurityException; 021import java.security.cert.X509Certificate; 022import java.util.Arrays; 023import java.util.Collection; 024import java.util.Collections; 025import java.util.Iterator; 026import java.util.List; 027import java.util.Properties; 028import java.util.Set; 029import java.util.concurrent.ConcurrentHashMap; 030import java.util.concurrent.ConcurrentMap; 031import java.util.concurrent.CountDownLatch; 032import java.util.concurrent.ExecutionException; 033import java.util.concurrent.ExecutorService; 034import java.util.concurrent.Executors; 035import java.util.concurrent.Future; 036import java.util.concurrent.TimeUnit; 037import java.util.concurrent.TimeoutException; 038import java.util.concurrent.atomic.AtomicBoolean; 039 040import javax.management.ObjectName; 041 042import org.apache.activemq.DestinationDoesNotExistException; 043import org.apache.activemq.Service; 044import org.apache.activemq.advisory.AdvisoryBroker; 045import org.apache.activemq.advisory.AdvisorySupport; 046import org.apache.activemq.broker.BrokerService; 047import org.apache.activemq.broker.BrokerServiceAware; 048import org.apache.activemq.broker.ConnectionContext; 049import org.apache.activemq.broker.TransportConnection; 050import org.apache.activemq.broker.region.AbstractRegion; 051import org.apache.activemq.broker.region.DurableTopicSubscription; 052import org.apache.activemq.broker.region.Region; 053import org.apache.activemq.broker.region.RegionBroker; 054import org.apache.activemq.broker.region.Subscription; 055import org.apache.activemq.broker.region.policy.PolicyEntry; 056import org.apache.activemq.command.ActiveMQDestination; 057import org.apache.activemq.command.ActiveMQMessage; 058import org.apache.activemq.command.ActiveMQTempDestination; 059import org.apache.activemq.command.ActiveMQTopic; 060import org.apache.activemq.command.BrokerId; 061import org.apache.activemq.command.BrokerInfo; 062import org.apache.activemq.command.BrokerSubscriptionInfo; 063import org.apache.activemq.command.Command; 064import org.apache.activemq.command.CommandTypes; 065import org.apache.activemq.command.ConnectionError; 066import org.apache.activemq.command.ConnectionId; 067import org.apache.activemq.command.ConnectionInfo; 068import org.apache.activemq.command.ConsumerId; 069import org.apache.activemq.command.ConsumerInfo; 070import org.apache.activemq.command.DataStructure; 071import org.apache.activemq.command.DestinationInfo; 072import org.apache.activemq.command.ExceptionResponse; 073import org.apache.activemq.command.KeepAliveInfo; 074import org.apache.activemq.command.Message; 075import org.apache.activemq.command.MessageAck; 076import org.apache.activemq.command.MessageDispatch; 077import org.apache.activemq.command.MessageId; 078import org.apache.activemq.command.NetworkBridgeFilter; 079import org.apache.activemq.command.ProducerInfo; 080import org.apache.activemq.command.RemoveInfo; 081import org.apache.activemq.command.RemoveSubscriptionInfo; 082import org.apache.activemq.command.Response; 083import org.apache.activemq.command.SessionInfo; 084import org.apache.activemq.command.ShutdownInfo; 085import org.apache.activemq.command.SubscriptionInfo; 086import org.apache.activemq.command.WireFormatInfo; 087import org.apache.activemq.filter.DestinationFilter; 088import org.apache.activemq.filter.MessageEvaluationContext; 089import org.apache.activemq.security.SecurityContext; 090import org.apache.activemq.transport.DefaultTransportListener; 091import org.apache.activemq.transport.FutureResponse; 092import org.apache.activemq.transport.ResponseCallback; 093import org.apache.activemq.transport.Transport; 094import org.apache.activemq.transport.TransportDisposedIOException; 095import org.apache.activemq.transport.TransportFilter; 096import org.apache.activemq.transport.failover.FailoverTransport; 097import org.apache.activemq.transport.tcp.SslTransport; 098import org.apache.activemq.transport.tcp.TcpTransport; 099import org.apache.activemq.util.IdGenerator; 100import org.apache.activemq.util.IntrospectionSupport; 101import org.apache.activemq.util.LongSequenceGenerator; 102import org.apache.activemq.util.MarshallingSupport; 103import org.apache.activemq.util.NetworkBridgeUtils; 104import org.apache.activemq.util.ServiceStopper; 105import org.apache.activemq.util.ServiceSupport; 106import org.apache.activemq.util.StringToListOfActiveMQDestinationConverter; 107import org.slf4j.Logger; 108import org.slf4j.LoggerFactory; 109 110/** 111 * A useful base class for implementing demand forwarding bridges. 112 */ 113public abstract class DemandForwardingBridgeSupport implements NetworkBridge, BrokerServiceAware { 114 private static final Logger LOG = LoggerFactory.getLogger(DemandForwardingBridgeSupport.class); 115 protected static final String DURABLE_SUB_PREFIX = "NC-DS_"; 116 protected final Transport localBroker; 117 protected final Transport remoteBroker; 118 protected IdGenerator idGenerator = new IdGenerator(); 119 protected final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator(); 120 protected ConnectionInfo localConnectionInfo; 121 protected ConnectionInfo remoteConnectionInfo; 122 protected SessionInfo localSessionInfo; 123 protected ProducerInfo producerInfo; 124 protected String remoteBrokerName = "Unknown"; 125 protected String localClientId; 126 protected ConsumerInfo demandConsumerInfo; 127 protected int demandConsumerDispatched; 128 protected final AtomicBoolean localBridgeStarted = new AtomicBoolean(false); 129 protected final AtomicBoolean remoteBridgeStarted = new AtomicBoolean(false); 130 protected final AtomicBoolean bridgeFailed = new AtomicBoolean(); 131 protected final AtomicBoolean disposed = new AtomicBoolean(); 132 protected BrokerId localBrokerId; 133 protected ActiveMQDestination[] excludedDestinations; 134 protected ActiveMQDestination[] dynamicallyIncludedDestinations; 135 protected ActiveMQDestination[] staticallyIncludedDestinations; 136 protected ActiveMQDestination[] durableDestinations; 137 protected final ConcurrentMap<ConsumerId, DemandSubscription> subscriptionMapByLocalId = new ConcurrentHashMap<>(); 138 protected final ConcurrentMap<ConsumerId, DemandSubscription> subscriptionMapByRemoteId = new ConcurrentHashMap<>(); 139 protected final Set<ConsumerId> forcedDurableRemoteId = Collections.newSetFromMap(new ConcurrentHashMap<ConsumerId, Boolean>()); 140 protected final BrokerId localBrokerPath[] = new BrokerId[]{null}; 141 protected final CountDownLatch startedLatch = new CountDownLatch(2); 142 protected final CountDownLatch localStartedLatch = new CountDownLatch(1); 143 protected final CountDownLatch staticDestinationsLatch = new CountDownLatch(1); 144 protected final AtomicBoolean lastConnectSucceeded = new AtomicBoolean(false); 145 protected NetworkBridgeConfiguration configuration; 146 protected final NetworkBridgeFilterFactory defaultFilterFactory = new DefaultNetworkBridgeFilterFactory(); 147 148 protected final BrokerId remoteBrokerPath[] = new BrokerId[]{null}; 149 protected BrokerId remoteBrokerId; 150 151 protected final NetworkBridgeStatistics networkBridgeStatistics = new NetworkBridgeStatistics(); 152 153 private NetworkBridgeListener networkBridgeListener; 154 private boolean createdByDuplex; 155 private BrokerInfo localBrokerInfo; 156 private BrokerInfo remoteBrokerInfo; 157 158 private final FutureBrokerInfo futureRemoteBrokerInfo = new FutureBrokerInfo(remoteBrokerInfo, disposed); 159 private final FutureBrokerInfo futureLocalBrokerInfo = new FutureBrokerInfo(localBrokerInfo, disposed); 160 161 private final AtomicBoolean started = new AtomicBoolean(); 162 private TransportConnection duplexInitiatingConnection; 163 private final AtomicBoolean duplexInitiatingConnectionInfoReceived = new AtomicBoolean(); 164 protected BrokerService brokerService = null; 165 private ObjectName mbeanObjectName; 166 private final ExecutorService serialExecutor = Executors.newSingleThreadExecutor(); 167 //Use a new executor for processing BrokerSubscriptionInfo so we don't block other threads 168 private final ExecutorService syncExecutor = Executors.newSingleThreadExecutor(); 169 private Transport duplexInboundLocalBroker = null; 170 private ProducerInfo duplexInboundLocalProducerInfo; 171 172 public DemandForwardingBridgeSupport(NetworkBridgeConfiguration configuration, Transport localBroker, Transport remoteBroker) { 173 this.configuration = configuration; 174 this.localBroker = localBroker; 175 this.remoteBroker = remoteBroker; 176 } 177 178 public void duplexStart(TransportConnection connection, BrokerInfo localBrokerInfo, BrokerInfo remoteBrokerInfo) throws Exception { 179 this.localBrokerInfo = localBrokerInfo; 180 this.remoteBrokerInfo = remoteBrokerInfo; 181 this.duplexInitiatingConnection = connection; 182 start(); 183 serviceRemoteCommand(remoteBrokerInfo); 184 } 185 186 @Override 187 public void start() throws Exception { 188 if (started.compareAndSet(false, true)) { 189 190 if (brokerService == null) { 191 throw new IllegalArgumentException("BrokerService is null on " + this); 192 } 193 194 networkBridgeStatistics.setEnabled(brokerService.isEnableStatistics()); 195 196 if (isDuplex()) { 197 duplexInboundLocalBroker = NetworkBridgeFactory.createLocalAsyncTransport(brokerService.getBroker().getVmConnectorURI()); 198 duplexInboundLocalBroker.setTransportListener(new DefaultTransportListener() { 199 200 @Override 201 public void onCommand(Object o) { 202 Command command = (Command) o; 203 serviceLocalCommand(command); 204 } 205 206 @Override 207 public void onException(IOException error) { 208 serviceLocalException(error); 209 } 210 }); 211 duplexInboundLocalBroker.start(); 212 } 213 214 localBroker.setTransportListener(new DefaultTransportListener() { 215 216 @Override 217 public void onCommand(Object o) { 218 Command command = (Command) o; 219 serviceLocalCommand(command); 220 } 221 222 @Override 223 public void onException(IOException error) { 224 if (!futureLocalBrokerInfo.isDone()) { 225 LOG.info("error with pending local brokerInfo on: " + localBroker, error); 226 futureLocalBrokerInfo.cancel(true); 227 return; 228 } 229 serviceLocalException(error); 230 } 231 }); 232 233 remoteBroker.setTransportListener(new DefaultTransportListener() { 234 235 @Override 236 public void onCommand(Object o) { 237 Command command = (Command) o; 238 serviceRemoteCommand(command); 239 } 240 241 @Override 242 public void onException(IOException error) { 243 if (!futureRemoteBrokerInfo.isDone()) { 244 LOG.info("error with pending remote brokerInfo on: " + remoteBroker, error); 245 futureRemoteBrokerInfo.cancel(true); 246 return; 247 } 248 serviceRemoteException(error); 249 } 250 }); 251 252 remoteBroker.start(); 253 localBroker.start(); 254 255 if (!disposed.get()) { 256 try { 257 triggerStartAsyncNetworkBridgeCreation(); 258 } catch (IOException e) { 259 LOG.warn("Caught exception from remote start", e); 260 } 261 } else { 262 LOG.warn("Bridge was disposed before the start() method was fully executed."); 263 throw new TransportDisposedIOException(); 264 } 265 } 266 } 267 268 @Override 269 public void stop() throws Exception { 270 if (started.compareAndSet(true, false)) { 271 if (disposed.compareAndSet(false, true)) { 272 LOG.debug(" stopping {} bridge to {}", configuration.getBrokerName(), remoteBrokerName); 273 274 futureRemoteBrokerInfo.cancel(true); 275 futureLocalBrokerInfo.cancel(true); 276 277 NetworkBridgeListener l = this.networkBridgeListener; 278 if (l != null) { 279 l.onStop(this); 280 } 281 try { 282 // local start complete 283 if (startedLatch.getCount() < 2) { 284 LOG.trace("{} unregister bridge ({}) to {}", new Object[]{ 285 configuration.getBrokerName(), this, remoteBrokerName 286 }); 287 brokerService.getBroker().removeBroker(null, remoteBrokerInfo); 288 brokerService.getBroker().networkBridgeStopped(remoteBrokerInfo); 289 } 290 291 remoteBridgeStarted.set(false); 292 final CountDownLatch sendShutdown = new CountDownLatch(1); 293 294 brokerService.getTaskRunnerFactory().execute(new Runnable() { 295 @Override 296 public void run() { 297 try { 298 serialExecutor.shutdown(); 299 if (!serialExecutor.awaitTermination(5, TimeUnit.SECONDS)) { 300 List<Runnable> pendingTasks = serialExecutor.shutdownNow(); 301 LOG.info("pending tasks on stop {}", pendingTasks); 302 } 303 //Shutdown the syncExecutor, call countDown to make sure a thread can 304 //terminate if it is waiting 305 staticDestinationsLatch.countDown(); 306 syncExecutor.shutdown(); 307 if (!syncExecutor.awaitTermination(5, TimeUnit.SECONDS)) { 308 List<Runnable> pendingTasks = syncExecutor.shutdownNow(); 309 LOG.info("pending tasks on stop {}", pendingTasks); 310 } 311 localBroker.oneway(new ShutdownInfo()); 312 remoteBroker.oneway(new ShutdownInfo()); 313 } catch (Throwable e) { 314 LOG.debug("Caught exception sending shutdown", e); 315 } finally { 316 sendShutdown.countDown(); 317 } 318 319 } 320 }, "ActiveMQ ForwardingBridge StopTask"); 321 322 if (!sendShutdown.await(10, TimeUnit.SECONDS)) { 323 LOG.info("Network Could not shutdown in a timely manner"); 324 } 325 } finally { 326 ServiceStopper ss = new ServiceStopper(); 327 stopFailoverTransport(remoteBroker); 328 ss.stop(remoteBroker); 329 ss.stop(localBroker); 330 ss.stop(duplexInboundLocalBroker); 331 // Release the started Latch since another thread could be 332 // stuck waiting for it to start up. 333 startedLatch.countDown(); 334 startedLatch.countDown(); 335 localStartedLatch.countDown(); 336 staticDestinationsLatch.countDown(); 337 338 ss.throwFirstException(); 339 } 340 } 341 342 LOG.info("{} bridge to {} stopped", configuration.getBrokerName(), remoteBrokerName); 343 } 344 } 345 346 private void stopFailoverTransport(Transport transport) { 347 FailoverTransport failoverTransport = transport.narrow(FailoverTransport.class); 348 if (failoverTransport != null) { 349 // may be blocked on write, in which case stop will block 350 try { 351 failoverTransport.handleTransportFailure(new IOException("Bridge stopped")); 352 } catch (InterruptedException ignored) {} 353 } 354 } 355 356 protected void triggerStartAsyncNetworkBridgeCreation() throws IOException { 357 brokerService.getTaskRunnerFactory().execute(new Runnable() { 358 @Override 359 public void run() { 360 final String originalName = Thread.currentThread().getName(); 361 Thread.currentThread().setName("triggerStartAsyncNetworkBridgeCreation: " + 362 "remoteBroker=" + remoteBroker + ", localBroker= " + localBroker); 363 364 try { 365 // First we collect the info data from both the local and remote ends 366 collectBrokerInfos(); 367 368 // Once we have all required broker info we can attempt to start 369 // the local and then remote sides of the bridge. 370 doStartLocalAndRemoteBridges(); 371 } finally { 372 Thread.currentThread().setName(originalName); 373 } 374 } 375 }); 376 } 377 378 private void collectBrokerInfos() { 379 int timeout = 30000; 380 TcpTransport tcpTransport = remoteBroker.narrow(TcpTransport.class); 381 if (tcpTransport != null) { 382 timeout = tcpTransport.getConnectionTimeout(); 383 } 384 385 // First wait for the remote to feed us its BrokerInfo, then we can check on 386 // the LocalBrokerInfo and decide is this is a loop. 387 try { 388 remoteBrokerInfo = futureRemoteBrokerInfo.get(timeout, TimeUnit.MILLISECONDS); 389 if (remoteBrokerInfo == null) { 390 serviceLocalException(new Throwable("remoteBrokerInfo is null")); 391 return; 392 } 393 } catch (Exception e) { 394 serviceRemoteException(e); 395 return; 396 } 397 398 try { 399 localBrokerInfo = futureLocalBrokerInfo.get(timeout, TimeUnit.MILLISECONDS); 400 if (localBrokerInfo == null) { 401 serviceLocalException(new Throwable("localBrokerInfo is null")); 402 return; 403 } 404 405 // Before we try and build the bridge lets check if we are in a loop 406 // and if so just stop now before registering anything. 407 remoteBrokerId = remoteBrokerInfo.getBrokerId(); 408 if (localBrokerId.equals(remoteBrokerId)) { 409 LOG.trace("{} disconnecting remote loop back connector for: {}, with id: {}", new Object[]{ 410 configuration.getBrokerName(), remoteBrokerName, remoteBrokerId 411 }); 412 ServiceSupport.dispose(localBroker); 413 ServiceSupport.dispose(remoteBroker); 414 // the bridge is left in a bit of limbo, but it won't get retried 415 // in this state. 416 return; 417 } 418 419 // Fill in the remote broker's information now. 420 remoteBrokerPath[0] = remoteBrokerId; 421 remoteBrokerName = remoteBrokerInfo.getBrokerName(); 422 if (configuration.isUseBrokerNamesAsIdSeed()) { 423 idGenerator = new IdGenerator(brokerService.getBrokerName() + "->" + remoteBrokerName); 424 } 425 } catch (Throwable e) { 426 serviceLocalException(e); 427 } 428 } 429 430 private void doStartLocalAndRemoteBridges() { 431 432 if (disposed.get()) { 433 return; 434 } 435 436 if (isCreatedByDuplex()) { 437 // apply remote (propagated) configuration to local duplex bridge before start 438 Properties props = null; 439 try { 440 props = MarshallingSupport.stringToProperties(remoteBrokerInfo.getNetworkProperties()); 441 IntrospectionSupport.getProperties(configuration, props, null); 442 if (configuration.getExcludedDestinations() != null) { 443 excludedDestinations = configuration.getExcludedDestinations().toArray( 444 new ActiveMQDestination[configuration.getExcludedDestinations().size()]); 445 } 446 if (configuration.getStaticallyIncludedDestinations() != null) { 447 staticallyIncludedDestinations = configuration.getStaticallyIncludedDestinations().toArray( 448 new ActiveMQDestination[configuration.getStaticallyIncludedDestinations().size()]); 449 } 450 if (configuration.getDynamicallyIncludedDestinations() != null) { 451 dynamicallyIncludedDestinations = configuration.getDynamicallyIncludedDestinations().toArray( 452 new ActiveMQDestination[configuration.getDynamicallyIncludedDestinations().size()]); 453 } 454 } catch (Throwable t) { 455 LOG.error("Error mapping remote configuration: {}", props, t); 456 } 457 } 458 459 try { 460 startLocalBridge(); 461 } catch (Throwable e) { 462 serviceLocalException(e); 463 return; 464 } 465 466 try { 467 startRemoteBridge(); 468 } catch (Throwable e) { 469 serviceRemoteException(e); 470 return; 471 } 472 473 try { 474 if (safeWaitUntilStarted()) { 475 setupStaticDestinations(); 476 staticDestinationsLatch.countDown(); 477 } 478 } catch (Throwable e) { 479 serviceLocalException(e); 480 } 481 } 482 483 private void startLocalBridge() throws Throwable { 484 if (!bridgeFailed.get() && localBridgeStarted.compareAndSet(false, true)) { 485 synchronized (this) { 486 LOG.trace("{} starting local Bridge, localBroker={}", configuration.getBrokerName(), localBroker); 487 if (!disposed.get()) { 488 489 if (idGenerator == null) { 490 throw new IllegalStateException("Id Generator cannot be null"); 491 } 492 493 localConnectionInfo = new ConnectionInfo(); 494 localConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId())); 495 localClientId = configuration.getName() + "_" + remoteBrokerName + "_inbound_" + configuration.getBrokerName(); 496 localConnectionInfo.setClientId(localClientId); 497 localConnectionInfo.setUserName(configuration.getUserName()); 498 localConnectionInfo.setPassword(configuration.getPassword()); 499 Transport originalTransport = remoteBroker; 500 while (originalTransport instanceof TransportFilter) { 501 originalTransport = ((TransportFilter) originalTransport).getNext(); 502 } 503 if (originalTransport instanceof TcpTransport) { 504 X509Certificate[] peerCerts = originalTransport.getPeerCertificates(); 505 localConnectionInfo.setTransportContext(peerCerts); 506 } 507 // sync requests that may fail 508 Object resp = localBroker.request(localConnectionInfo); 509 if (resp instanceof ExceptionResponse) { 510 throw ((ExceptionResponse) resp).getException(); 511 } 512 localSessionInfo = new SessionInfo(localConnectionInfo, 1); 513 localBroker.oneway(localSessionInfo); 514 515 if (configuration.isDuplex()) { 516 // separate in-bound channel for forwards so we don't 517 // contend with out-bound dispatch on same connection 518 remoteBrokerInfo.setNetworkConnection(true); 519 duplexInboundLocalBroker.oneway(remoteBrokerInfo); 520 521 ConnectionInfo duplexLocalConnectionInfo = new ConnectionInfo(); 522 duplexLocalConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId())); 523 duplexLocalConnectionInfo.setClientId(configuration.getName() + "_" + remoteBrokerName + "_inbound_duplex_" 524 + configuration.getBrokerName()); 525 duplexLocalConnectionInfo.setUserName(configuration.getUserName()); 526 duplexLocalConnectionInfo.setPassword(configuration.getPassword()); 527 528 if (originalTransport instanceof TcpTransport) { 529 X509Certificate[] peerCerts = originalTransport.getPeerCertificates(); 530 duplexLocalConnectionInfo.setTransportContext(peerCerts); 531 } 532 // sync requests that may fail 533 resp = duplexInboundLocalBroker.request(duplexLocalConnectionInfo); 534 if (resp instanceof ExceptionResponse) { 535 throw ((ExceptionResponse) resp).getException(); 536 } 537 SessionInfo duplexInboundSession = new SessionInfo(duplexLocalConnectionInfo, 1); 538 duplexInboundLocalProducerInfo = new ProducerInfo(duplexInboundSession, 1); 539 duplexInboundLocalBroker.oneway(duplexInboundSession); 540 duplexInboundLocalBroker.oneway(duplexInboundLocalProducerInfo); 541 } 542 brokerService.getBroker().networkBridgeStarted(remoteBrokerInfo, this.createdByDuplex, remoteBroker.toString()); 543 NetworkBridgeListener l = this.networkBridgeListener; 544 if (l != null) { 545 l.onStart(this); 546 } 547 548 // Let the local broker know the remote broker's ID. 549 localBroker.oneway(remoteBrokerInfo); 550 // new peer broker (a consumer can work with remote broker also) 551 brokerService.getBroker().addBroker(null, remoteBrokerInfo); 552 553 LOG.info("Network connection between {} and {} ({}) has been established.", new Object[]{ 554 localBroker, remoteBroker, remoteBrokerName 555 }); 556 LOG.trace("{} register bridge ({}) to {}", new Object[]{ 557 configuration.getBrokerName(), this, remoteBrokerName 558 }); 559 } else { 560 LOG.warn("Bridge was disposed before the startLocalBridge() method was fully executed."); 561 } 562 startedLatch.countDown(); 563 localStartedLatch.countDown(); 564 } 565 } 566 } 567 568 protected void startRemoteBridge() throws Exception { 569 if (!bridgeFailed.get() && remoteBridgeStarted.compareAndSet(false, true)) { 570 LOG.trace("{} starting remote Bridge, remoteBroker={}", configuration.getBrokerName(), remoteBroker); 571 synchronized (this) { 572 if (!isCreatedByDuplex()) { 573 BrokerInfo brokerInfo = new BrokerInfo(); 574 brokerInfo.setBrokerName(configuration.getBrokerName()); 575 brokerInfo.setBrokerURL(configuration.getBrokerURL()); 576 brokerInfo.setNetworkConnection(true); 577 brokerInfo.setDuplexConnection(configuration.isDuplex()); 578 // set our properties 579 Properties props = new Properties(); 580 IntrospectionSupport.getProperties(configuration, props, null); 581 582 String dynamicallyIncludedDestinationsKey = "dynamicallyIncludedDestinations"; 583 String staticallyIncludedDestinationsKey = "staticallyIncludedDestinations"; 584 585 if (!configuration.getDynamicallyIncludedDestinations().isEmpty()) { 586 props.put(dynamicallyIncludedDestinationsKey, 587 StringToListOfActiveMQDestinationConverter. 588 convertFromActiveMQDestination(configuration.getDynamicallyIncludedDestinations(), true)); 589 } 590 if (!configuration.getStaticallyIncludedDestinations().isEmpty()) { 591 props.put(staticallyIncludedDestinationsKey, 592 StringToListOfActiveMQDestinationConverter. 593 convertFromActiveMQDestination(configuration.getStaticallyIncludedDestinations(), true)); 594 } 595 596 props.remove("networkTTL"); 597 String str = MarshallingSupport.propertiesToString(props); 598 brokerInfo.setNetworkProperties(str); 599 brokerInfo.setBrokerId(this.localBrokerId); 600 remoteBroker.oneway(brokerInfo); 601 if (configuration.isSyncDurableSubs() && 602 remoteBroker.getWireFormat().getVersion() >= CommandTypes.PROTOCOL_VERSION_DURABLE_SYNC) { 603 remoteBroker.oneway(NetworkBridgeUtils.getBrokerSubscriptionInfo(brokerService, 604 configuration)); 605 } 606 } 607 if (remoteConnectionInfo != null) { 608 remoteBroker.oneway(remoteConnectionInfo.createRemoveCommand()); 609 } 610 remoteConnectionInfo = new ConnectionInfo(); 611 remoteConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId())); 612 remoteConnectionInfo.setClientId(configuration.getName() + "_" + configuration.getBrokerName() + "_outbound"); 613 remoteConnectionInfo.setUserName(configuration.getUserName()); 614 remoteConnectionInfo.setPassword(configuration.getPassword()); 615 remoteBroker.oneway(remoteConnectionInfo); 616 617 SessionInfo remoteSessionInfo = new SessionInfo(remoteConnectionInfo, 1); 618 remoteBroker.oneway(remoteSessionInfo); 619 producerInfo = new ProducerInfo(remoteSessionInfo, 1); 620 producerInfo.setResponseRequired(false); 621 remoteBroker.oneway(producerInfo); 622 // Listen to consumer advisory messages on the remote broker to determine demand. 623 if (!configuration.isStaticBridge()) { 624 demandConsumerInfo = new ConsumerInfo(remoteSessionInfo, 1); 625 // always dispatch advisory message asynchronously so that 626 // we never block the producer broker if we are slow 627 demandConsumerInfo.setDispatchAsync(true); 628 String advisoryTopic = configuration.getDestinationFilter(); 629 if (configuration.isBridgeTempDestinations()) { 630 advisoryTopic += "," + AdvisorySupport.TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC; 631 } 632 demandConsumerInfo.setDestination(new ActiveMQTopic(advisoryTopic)); 633 configureConsumerPrefetch(demandConsumerInfo); 634 remoteBroker.oneway(demandConsumerInfo); 635 } 636 startedLatch.countDown(); 637 } 638 } 639 } 640 641 @Override 642 public void serviceRemoteException(Throwable error) { 643 if (!disposed.get()) { 644 if (error instanceof SecurityException || error instanceof GeneralSecurityException) { 645 LOG.error("Network connection between {} and {} shutdown due to a remote error: {}", new Object[]{ 646 localBroker, remoteBroker, error 647 }); 648 } else { 649 LOG.warn("Network connection between {} and {} shutdown due to a remote error: {}", new Object[]{ 650 localBroker, remoteBroker, error 651 }); 652 } 653 LOG.debug("The remote Exception was: {}", error, error); 654 brokerService.getTaskRunnerFactory().execute(new Runnable() { 655 @Override 656 public void run() { 657 ServiceSupport.dispose(getControllingService()); 658 } 659 }); 660 fireBridgeFailed(error); 661 } 662 } 663 664 protected void serviceRemoteCommand(Command command) { 665 if (!disposed.get()) { 666 try { 667 if (command.isMessageDispatch()) { 668 safeWaitUntilStarted(); 669 MessageDispatch md = (MessageDispatch) command; 670 serviceRemoteConsumerAdvisory(md.getMessage().getDataStructure()); 671 ackAdvisory(md.getMessage()); 672 } else if (command.isBrokerInfo()) { 673 futureRemoteBrokerInfo.set((BrokerInfo) command); 674 } else if (command instanceof BrokerSubscriptionInfo) { 675 final BrokerSubscriptionInfo brokerSubscriptionInfo = (BrokerSubscriptionInfo) command; 676 677 //Start in a new thread so we don't block the transport waiting for staticDestinations 678 syncExecutor.execute(new Runnable() { 679 680 @Override 681 public void run() { 682 try { 683 staticDestinationsLatch.await(); 684 //Make sure after the countDown of staticDestinationsLatch we aren't stopping 685 if (!disposed.get()) { 686 BrokerSubscriptionInfo subInfo = brokerSubscriptionInfo; 687 LOG.debug("Received Remote BrokerSubscriptionInfo on {} from {}", 688 brokerService.getBrokerName(), subInfo.getBrokerName()); 689 690 if (configuration.isSyncDurableSubs() && configuration.isConduitSubscriptions() 691 && !configuration.isDynamicOnly()) { 692 if (started.get()) { 693 if (subInfo.getSubscriptionInfos() != null) { 694 for (ConsumerInfo info : subInfo.getSubscriptionInfos()) { 695 //re-add any process any non-NC consumers that match the 696 //dynamicallyIncludedDestinations list 697 if((info.getSubscriptionName() == null || !info.getSubscriptionName().startsWith(DURABLE_SUB_PREFIX)) && 698 NetworkBridgeUtils.matchesDestinations(dynamicallyIncludedDestinations, info.getDestination())) { 699 serviceRemoteConsumerAdvisory(info); 700 } 701 } 702 } 703 704 //After re-added, clean up any empty durables 705 for (Iterator<DemandSubscription> i = subscriptionMapByLocalId.values().iterator(); i.hasNext(); ) { 706 DemandSubscription ds = i.next(); 707 if (NetworkBridgeUtils.matchesDestinations(dynamicallyIncludedDestinations, ds.getLocalInfo().getDestination())) { 708 cleanupDurableSub(ds, i); 709 } 710 } 711 } 712 } 713 } 714 } catch (Exception e) { 715 LOG.warn("Error processing BrokerSubscriptionInfo: {}", e.getMessage(), e); 716 LOG.debug(e.getMessage(), e); 717 } 718 } 719 }); 720 721 } else if (command.getClass() == ConnectionError.class) { 722 ConnectionError ce = (ConnectionError) command; 723 serviceRemoteException(ce.getException()); 724 } else { 725 if (isDuplex()) { 726 LOG.trace("{} duplex command type: {}", configuration.getBrokerName(), command.getDataStructureType()); 727 if (command.isMessage()) { 728 final ActiveMQMessage message = (ActiveMQMessage) command; 729 if (NetworkBridgeFilter.isAdvisoryInterpretedByNetworkBridge(message)) { 730 serviceRemoteConsumerAdvisory(message.getDataStructure()); 731 ackAdvisory(message); 732 } else { 733 if (!isPermissableDestination(message.getDestination(), true)) { 734 return; 735 } 736 // message being forwarded - we need to 737 // propagate the response to our local send 738 if (canDuplexDispatch(message)) { 739 message.setProducerId(duplexInboundLocalProducerInfo.getProducerId()); 740 if (message.isResponseRequired() || configuration.isAlwaysSyncSend()) { 741 duplexInboundLocalBroker.asyncRequest(message, new ResponseCallback() { 742 final int correlationId = message.getCommandId(); 743 744 @Override 745 public void onCompletion(FutureResponse resp) { 746 try { 747 Response reply = resp.getResult(); 748 reply.setCorrelationId(correlationId); 749 remoteBroker.oneway(reply); 750 //increment counter when messages are received in duplex mode 751 networkBridgeStatistics.getReceivedCount().increment(); 752 } catch (IOException error) { 753 LOG.error("Exception: {} on duplex forward of: {}", error, message); 754 serviceRemoteException(error); 755 } 756 } 757 }); 758 } else { 759 duplexInboundLocalBroker.oneway(message); 760 networkBridgeStatistics.getReceivedCount().increment(); 761 } 762 serviceInboundMessage(message); 763 } else { 764 if (message.isResponseRequired() || configuration.isAlwaysSyncSend()) { 765 Response reply = new Response(); 766 reply.setCorrelationId(message.getCommandId()); 767 remoteBroker.oneway(reply); 768 } 769 } 770 } 771 } else { 772 switch (command.getDataStructureType()) { 773 case ConnectionInfo.DATA_STRUCTURE_TYPE: 774 if (duplexInitiatingConnection != null && duplexInitiatingConnectionInfoReceived.compareAndSet(false, true)) { 775 // end of initiating connection setup - propogate to initial connection to get mbean by clientid 776 duplexInitiatingConnection.processAddConnection((ConnectionInfo) command); 777 } else { 778 localBroker.oneway(command); 779 } 780 break; 781 case SessionInfo.DATA_STRUCTURE_TYPE: 782 localBroker.oneway(command); 783 break; 784 case ProducerInfo.DATA_STRUCTURE_TYPE: 785 // using duplexInboundLocalProducerInfo 786 break; 787 case MessageAck.DATA_STRUCTURE_TYPE: 788 MessageAck ack = (MessageAck) command; 789 DemandSubscription localSub = subscriptionMapByRemoteId.get(ack.getConsumerId()); 790 if (localSub != null) { 791 ack.setConsumerId(localSub.getLocalInfo().getConsumerId()); 792 localBroker.oneway(ack); 793 } else { 794 LOG.warn("Matching local subscription not found for ack: {}", ack); 795 } 796 break; 797 case ConsumerInfo.DATA_STRUCTURE_TYPE: 798 localStartedLatch.await(); 799 if (started.get()) { 800 final ConsumerInfo consumerInfo = (ConsumerInfo) command; 801 if (isDuplicateSuppressionOff(consumerInfo)) { 802 addConsumerInfo(consumerInfo); 803 } else { 804 synchronized (brokerService.getVmConnectorURI()) { 805 addConsumerInfo(consumerInfo); 806 } 807 } 808 } else { 809 // received a subscription whilst stopping 810 LOG.warn("Stopping - ignoring ConsumerInfo: {}", command); 811 } 812 break; 813 case ShutdownInfo.DATA_STRUCTURE_TYPE: 814 // initiator is shutting down, controlled case 815 // abortive close dealt with by inactivity monitor 816 LOG.info("Stopping network bridge on shutdown of remote broker"); 817 serviceRemoteException(new IOException(command.toString())); 818 break; 819 default: 820 LOG.debug("Ignoring remote command: {}", command); 821 } 822 } 823 } else { 824 switch (command.getDataStructureType()) { 825 case KeepAliveInfo.DATA_STRUCTURE_TYPE: 826 case WireFormatInfo.DATA_STRUCTURE_TYPE: 827 case ShutdownInfo.DATA_STRUCTURE_TYPE: 828 break; 829 default: 830 LOG.warn("Unexpected remote command: {}", command); 831 } 832 } 833 } 834 } catch (Throwable e) { 835 LOG.debug("Exception processing remote command: {}", command, e); 836 serviceRemoteException(e); 837 } 838 } 839 } 840 841 private void ackAdvisory(Message message) throws IOException { 842 demandConsumerDispatched++; 843 if (demandConsumerDispatched > (demandConsumerInfo.getPrefetchSize() * 844 (configuration.getAdvisoryAckPercentage() / 100f))) { 845 final MessageAck ack = new MessageAck(message, MessageAck.STANDARD_ACK_TYPE, demandConsumerDispatched); 846 ack.setConsumerId(demandConsumerInfo.getConsumerId()); 847 brokerService.getTaskRunnerFactory().execute(new Runnable() { 848 @Override 849 public void run() { 850 try { 851 remoteBroker.oneway(ack); 852 } catch (IOException e) { 853 LOG.warn("Failed to send advisory ack " + ack, e); 854 } 855 } 856 }); 857 demandConsumerDispatched = 0; 858 } 859 } 860 861 private void serviceRemoteConsumerAdvisory(DataStructure data) throws IOException { 862 final int networkTTL = configuration.getConsumerTTL(); 863 if (data.getClass() == ConsumerInfo.class) { 864 // Create a new local subscription 865 ConsumerInfo info = (ConsumerInfo) data; 866 BrokerId[] path = info.getBrokerPath(); 867 868 if (info.isBrowser()) { 869 LOG.debug("{} Ignoring sub from {}, browsers explicitly suppressed", configuration.getBrokerName(), remoteBrokerName); 870 return; 871 } 872 873 if (path != null && networkTTL > -1 && path.length >= networkTTL) { 874 LOG.debug("{} Ignoring sub from {}, restricted to {} network hops only: {}", new Object[]{ 875 configuration.getBrokerName(), remoteBrokerName, networkTTL, info 876 }); 877 return; 878 } 879 880 if (contains(path, localBrokerPath[0])) { 881 // Ignore this consumer as it's a consumer we locally sent to the broker. 882 LOG.debug("{} Ignoring sub from {}, already routed through this broker once: {}", new Object[]{ 883 configuration.getBrokerName(), remoteBrokerName, info 884 }); 885 return; 886 } 887 888 if (!isPermissableDestination(info.getDestination())) { 889 // ignore if not in the permitted or in the excluded list 890 LOG.debug("{} Ignoring sub from {}, destination {} is not permitted: {}", new Object[]{ 891 configuration.getBrokerName(), remoteBrokerName, info.getDestination(), info 892 }); 893 return; 894 } 895 896 // in a cyclic network there can be multiple bridges per broker that can propagate 897 // a network subscription so there is a need to synchronize on a shared entity 898 // if duplicate suppression is required 899 if (isDuplicateSuppressionOff(info)) { 900 addConsumerInfo(info); 901 } else { 902 synchronized (brokerService.getVmConnectorURI()) { 903 addConsumerInfo(info); 904 } 905 } 906 } else if (data.getClass() == DestinationInfo.class) { 907 // It's a destination info - we want to pass up information about temporary destinations 908 final DestinationInfo destInfo = (DestinationInfo) data; 909 BrokerId[] path = destInfo.getBrokerPath(); 910 if (path != null && networkTTL > -1 && path.length >= networkTTL) { 911 LOG.debug("{} Ignoring destination {} restricted to {} network hops only", new Object[]{ 912 configuration.getBrokerName(), destInfo, networkTTL 913 }); 914 return; 915 } 916 if (contains(destInfo.getBrokerPath(), localBrokerPath[0])) { 917 LOG.debug("{} Ignoring destination {} already routed through this broker once", configuration.getBrokerName(), destInfo); 918 return; 919 } 920 destInfo.setConnectionId(localConnectionInfo.getConnectionId()); 921 if (destInfo.getDestination() instanceof ActiveMQTempDestination) { 922 // re-set connection id so comes from here 923 ActiveMQTempDestination tempDest = (ActiveMQTempDestination) destInfo.getDestination(); 924 tempDest.setConnectionId(localSessionInfo.getSessionId().getConnectionId()); 925 } 926 destInfo.setBrokerPath(appendToBrokerPath(destInfo.getBrokerPath(), getRemoteBrokerPath())); 927 LOG.trace("{} bridging {} destination on {} from {}, destination: {}", new Object[]{ 928 configuration.getBrokerName(), (destInfo.isAddOperation() ? "add" : "remove"), localBroker, remoteBrokerName, destInfo 929 }); 930 if (destInfo.isRemoveOperation()) { 931 // Serialize with removeSub operations such that all removeSub advisories 932 // are generated 933 serialExecutor.execute(new Runnable() { 934 @Override 935 public void run() { 936 try { 937 localBroker.oneway(destInfo); 938 } catch (IOException e) { 939 LOG.warn("failed to deliver remove command for destination: {}", destInfo.getDestination(), e); 940 } 941 } 942 }); 943 } else { 944 localBroker.oneway(destInfo); 945 } 946 } else if (data.getClass() == RemoveInfo.class) { 947 ConsumerId id = (ConsumerId) ((RemoveInfo) data).getObjectId(); 948 removeDemandSubscription(id); 949 950 if (forcedDurableRemoteId.remove(id)) { 951 for (Iterator<DemandSubscription> i = subscriptionMapByLocalId.values().iterator(); i.hasNext(); ) { 952 DemandSubscription ds = i.next(); 953 boolean removed = ds.removeForcedDurableConsumer(id); 954 if (removed) { 955 cleanupDurableSub(ds, i); 956 } 957 } 958 } 959 960 } else if (data.getClass() == RemoveSubscriptionInfo.class) { 961 RemoveSubscriptionInfo info = ((RemoveSubscriptionInfo) data); 962 SubscriptionInfo subscriptionInfo = new SubscriptionInfo(info.getClientId(), info.getSubscriptionName()); 963 for (Iterator<DemandSubscription> i = subscriptionMapByLocalId.values().iterator(); i.hasNext(); ) { 964 DemandSubscription ds = i.next(); 965 boolean removed = ds.getDurableRemoteSubs().remove(subscriptionInfo); 966 if (removed) { 967 cleanupDurableSub(ds, i); 968 } 969 } 970 } 971 } 972 973 private void cleanupDurableSub(final DemandSubscription ds, 974 Iterator<DemandSubscription> i) throws IOException { 975 if (ds != null && ds.getLocalDurableSubscriber() != null && ds.getDurableRemoteSubs().isEmpty() 976 && ds.getForcedDurableConsumersSize() == 0) { 977 // deactivate subscriber 978 RemoveInfo removeInfo = new RemoveInfo(ds.getLocalInfo().getConsumerId()); 979 localBroker.oneway(removeInfo); 980 981 // remove subscriber 982 RemoveSubscriptionInfo sending = new RemoveSubscriptionInfo(); 983 sending.setClientId(localClientId); 984 sending.setSubscriptionName(ds.getLocalDurableSubscriber().getSubscriptionName()); 985 sending.setConnectionId(this.localConnectionInfo.getConnectionId()); 986 localBroker.oneway(sending); 987 988 //remove subscriber from map 989 i.remove(); 990 } 991 } 992 993 @Override 994 public void serviceLocalException(Throwable error) { 995 serviceLocalException(null, error); 996 } 997 998 public void serviceLocalException(MessageDispatch messageDispatch, Throwable error) { 999 LOG.trace("serviceLocalException: disposed {} ex", disposed.get(), error); 1000 if (!disposed.get()) { 1001 if (error instanceof DestinationDoesNotExistException && ((DestinationDoesNotExistException) error).isTemporary()) { 1002 // not a reason to terminate the bridge - temps can disappear with 1003 // pending sends as the demand sub may outlive the remote dest 1004 if (messageDispatch != null) { 1005 LOG.warn("PoisonAck of {} on forwarding error: {}", messageDispatch.getMessage().getMessageId(), error); 1006 try { 1007 MessageAck poisonAck = new MessageAck(messageDispatch, MessageAck.POSION_ACK_TYPE, 1); 1008 poisonAck.setPoisonCause(error); 1009 localBroker.oneway(poisonAck); 1010 } catch (IOException ioe) { 1011 LOG.error("Failed to posion ack message following forward failure: ", ioe); 1012 } 1013 fireFailedForwardAdvisory(messageDispatch, error); 1014 } else { 1015 LOG.warn("Ignoring exception on forwarding to non existent temp dest: ", error); 1016 } 1017 return; 1018 } 1019 1020 LOG.info("Network connection between {} and {} shutdown due to a local error: {}", new Object[]{localBroker, remoteBroker, error}); 1021 LOG.debug("The local Exception was: {}", error, error); 1022 1023 brokerService.getTaskRunnerFactory().execute(new Runnable() { 1024 @Override 1025 public void run() { 1026 ServiceSupport.dispose(getControllingService()); 1027 } 1028 }); 1029 fireBridgeFailed(error); 1030 } 1031 } 1032 1033 private void fireFailedForwardAdvisory(MessageDispatch messageDispatch, Throwable error) { 1034 if (configuration.isAdvisoryForFailedForward()) { 1035 AdvisoryBroker advisoryBroker = null; 1036 try { 1037 advisoryBroker = (AdvisoryBroker) brokerService.getBroker().getAdaptor(AdvisoryBroker.class); 1038 1039 if (advisoryBroker != null) { 1040 ConnectionContext context = new ConnectionContext(); 1041 context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT); 1042 context.setBroker(brokerService.getBroker()); 1043 1044 ActiveMQMessage advisoryMessage = new ActiveMQMessage(); 1045 advisoryMessage.setStringProperty("cause", error.getLocalizedMessage()); 1046 advisoryBroker.fireAdvisory(context, AdvisorySupport.getNetworkBridgeForwardFailureAdvisoryTopic(), messageDispatch.getMessage(), null, 1047 advisoryMessage); 1048 1049 } 1050 } catch (Exception e) { 1051 LOG.warn("failed to fire forward failure advisory, cause: {}", e); 1052 LOG.debug("detail", e); 1053 } 1054 } 1055 } 1056 1057 protected Service getControllingService() { 1058 return duplexInitiatingConnection != null ? duplexInitiatingConnection : DemandForwardingBridgeSupport.this; 1059 } 1060 1061 protected void addSubscription(DemandSubscription sub) throws IOException { 1062 if (sub != null) { 1063 localBroker.oneway(sub.getLocalInfo()); 1064 } 1065 } 1066 1067 protected void removeSubscription(final DemandSubscription sub) throws IOException { 1068 if (sub != null) { 1069 LOG.trace("{} remove local subscription: {} for remote {}", new Object[]{configuration.getBrokerName(), sub.getLocalInfo().getConsumerId(), sub.getRemoteInfo().getConsumerId()}); 1070 1071 // ensure not available for conduit subs pending removal 1072 subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId()); 1073 subscriptionMapByRemoteId.remove(sub.getRemoteInfo().getConsumerId()); 1074 1075 // continue removal in separate thread to free up this thread for outstanding responses 1076 // Serialize with removeDestination operations so that removeSubs are serialized with 1077 // removeDestinations such that all removeSub advisories are generated 1078 serialExecutor.execute(new Runnable() { 1079 @Override 1080 public void run() { 1081 sub.waitForCompletion(); 1082 try { 1083 localBroker.oneway(sub.getLocalInfo().createRemoveCommand()); 1084 } catch (IOException e) { 1085 LOG.warn("failed to deliver remove command for local subscription, for remote {}", sub.getRemoteInfo().getConsumerId(), e); 1086 } 1087 } 1088 }); 1089 } 1090 } 1091 1092 protected Message configureMessage(MessageDispatch md) throws IOException { 1093 Message message = md.getMessage().copy(); 1094 // Update the packet to show where it came from. 1095 message.setBrokerPath(appendToBrokerPath(message.getBrokerPath(), localBrokerPath)); 1096 message.setProducerId(producerInfo.getProducerId()); 1097 message.setDestination(md.getDestination()); 1098 message.setMemoryUsage(null); 1099 if (message.getOriginalTransactionId() == null) { 1100 message.setOriginalTransactionId(message.getTransactionId()); 1101 } 1102 message.setTransactionId(null); 1103 if (configuration.isUseCompression()) { 1104 message.compress(); 1105 } 1106 return message; 1107 } 1108 1109 protected void serviceLocalCommand(Command command) { 1110 if (!disposed.get()) { 1111 try { 1112 if (command.isMessageDispatch()) { 1113 safeWaitUntilStarted(); 1114 networkBridgeStatistics.getEnqueues().increment(); 1115 final MessageDispatch md = (MessageDispatch) command; 1116 final DemandSubscription sub = subscriptionMapByLocalId.get(md.getConsumerId()); 1117 if (sub != null && md.getMessage() != null && sub.incrementOutstandingResponses()) { 1118 1119 if (suppressMessageDispatch(md, sub)) { 1120 LOG.debug("{} message not forwarded to {} because message came from there or fails TTL, brokerPath: {}, message: {}", new Object[]{ 1121 configuration.getBrokerName(), remoteBrokerName, Arrays.toString(md.getMessage().getBrokerPath()), md.getMessage() 1122 }); 1123 // still ack as it may be durable 1124 try { 1125 localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1)); 1126 } finally { 1127 sub.decrementOutstandingResponses(); 1128 } 1129 return; 1130 } 1131 1132 Message message = configureMessage(md); 1133 LOG.debug("bridging ({} -> {}), consumer: {}, destination: {}, brokerPath: {}, message: {}", new Object[]{ 1134 configuration.getBrokerName(), remoteBrokerName, md.getConsumerId(), message.getDestination(), Arrays.toString(message.getBrokerPath()), (LOG.isTraceEnabled() ? message : message.getMessageId()) 1135 }); 1136 if (isDuplex() && NetworkBridgeFilter.isAdvisoryInterpretedByNetworkBridge(message)) { 1137 try { 1138 // never request b/c they are eventually acked async 1139 remoteBroker.oneway(message); 1140 } finally { 1141 sub.decrementOutstandingResponses(); 1142 } 1143 return; 1144 } 1145 if (isPermissableDestination(md.getDestination())) { 1146 if (message.isPersistent() || configuration.isAlwaysSyncSend()) { 1147 1148 // The message was not sent using async send, so we should only 1149 // ack the local broker when we get confirmation that the remote 1150 // broker has received the message. 1151 remoteBroker.asyncRequest(message, new ResponseCallback() { 1152 @Override 1153 public void onCompletion(FutureResponse future) { 1154 try { 1155 Response response = future.getResult(); 1156 if (response.isException()) { 1157 ExceptionResponse er = (ExceptionResponse) response; 1158 serviceLocalException(md, er.getException()); 1159 } else { 1160 localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1)); 1161 networkBridgeStatistics.getDequeues().increment(); 1162 } 1163 } catch (IOException e) { 1164 serviceLocalException(md, e); 1165 } finally { 1166 sub.decrementOutstandingResponses(); 1167 } 1168 } 1169 }); 1170 1171 } else { 1172 // If the message was originally sent using async send, we will 1173 // preserve that QOS by bridging it using an async send (small chance 1174 // of message loss). 1175 try { 1176 remoteBroker.oneway(message); 1177 localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1)); 1178 networkBridgeStatistics.getDequeues().increment(); 1179 } finally { 1180 sub.decrementOutstandingResponses(); 1181 } 1182 } 1183 serviceOutbound(message); 1184 } 1185 } else { 1186 LOG.debug("No subscription registered with this network bridge for consumerId: {} for message: {}", md.getConsumerId(), md.getMessage()); 1187 } 1188 } else if (command.isBrokerInfo()) { 1189 futureLocalBrokerInfo.set((BrokerInfo) command); 1190 } else if (command.isShutdownInfo()) { 1191 LOG.info("{} Shutting down {}", configuration.getBrokerName(), configuration.getName()); 1192 stop(); 1193 } else if (command.getClass() == ConnectionError.class) { 1194 ConnectionError ce = (ConnectionError) command; 1195 serviceLocalException(ce.getException()); 1196 } else { 1197 switch (command.getDataStructureType()) { 1198 case WireFormatInfo.DATA_STRUCTURE_TYPE: 1199 break; 1200 case BrokerSubscriptionInfo.DATA_STRUCTURE_TYPE: 1201 break; 1202 default: 1203 LOG.warn("Unexpected local command: {}", command); 1204 } 1205 } 1206 } catch (Throwable e) { 1207 LOG.warn("Caught an exception processing local command", e); 1208 serviceLocalException(e); 1209 } 1210 } 1211 } 1212 1213 private boolean suppressMessageDispatch(MessageDispatch md, DemandSubscription sub) throws Exception { 1214 boolean suppress = false; 1215 // for durable subs, suppression via filter leaves dangling acks so we 1216 // need to check here and allow the ack irrespective 1217 if (sub.getLocalInfo().isDurable()) { 1218 MessageEvaluationContext messageEvalContext = new MessageEvaluationContext(); 1219 messageEvalContext.setMessageReference(md.getMessage()); 1220 messageEvalContext.setDestination(md.getDestination()); 1221 suppress = !sub.getNetworkBridgeFilter().matches(messageEvalContext); 1222 //AMQ-6465 - Need to decrement the reference count after checking matches() as 1223 //the call above will increment the reference count by 1 1224 messageEvalContext.getMessageReference().decrementReferenceCount(); 1225 } 1226 return suppress; 1227 } 1228 1229 public static boolean contains(BrokerId[] brokerPath, BrokerId brokerId) { 1230 if (brokerPath != null) { 1231 for (BrokerId id : brokerPath) { 1232 if (brokerId.equals(id)) { 1233 return true; 1234 } 1235 } 1236 } 1237 return false; 1238 } 1239 1240 protected BrokerId[] appendToBrokerPath(BrokerId[] brokerPath, BrokerId[] pathsToAppend) { 1241 if (brokerPath == null || brokerPath.length == 0) { 1242 return pathsToAppend; 1243 } 1244 BrokerId rc[] = new BrokerId[brokerPath.length + pathsToAppend.length]; 1245 System.arraycopy(brokerPath, 0, rc, 0, brokerPath.length); 1246 System.arraycopy(pathsToAppend, 0, rc, brokerPath.length, pathsToAppend.length); 1247 return rc; 1248 } 1249 1250 protected BrokerId[] appendToBrokerPath(BrokerId[] brokerPath, BrokerId idToAppend) { 1251 if (brokerPath == null || brokerPath.length == 0) { 1252 return new BrokerId[]{idToAppend}; 1253 } 1254 BrokerId rc[] = new BrokerId[brokerPath.length + 1]; 1255 System.arraycopy(brokerPath, 0, rc, 0, brokerPath.length); 1256 rc[brokerPath.length] = idToAppend; 1257 return rc; 1258 } 1259 1260 protected boolean isPermissableDestination(ActiveMQDestination destination) { 1261 return isPermissableDestination(destination, false); 1262 } 1263 1264 protected boolean isPermissableDestination(ActiveMQDestination destination, boolean allowTemporary) { 1265 // Are we not bridging temporary destinations? 1266 if (destination.isTemporary()) { 1267 if (allowTemporary) { 1268 return true; 1269 } else { 1270 return configuration.isBridgeTempDestinations(); 1271 } 1272 } 1273 1274 ActiveMQDestination[] dests = excludedDestinations; 1275 if (dests != null && dests.length > 0) { 1276 for (ActiveMQDestination dest : dests) { 1277 DestinationFilter exclusionFilter = DestinationFilter.parseFilter(dest); 1278 if (dest != null && exclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) { 1279 return false; 1280 } 1281 } 1282 } 1283 1284 dests = staticallyIncludedDestinations; 1285 if (dests != null && dests.length > 0) { 1286 for (ActiveMQDestination dest : dests) { 1287 DestinationFilter inclusionFilter = DestinationFilter.parseFilter(dest); 1288 if (dest != null && inclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) { 1289 return true; 1290 } 1291 } 1292 } 1293 1294 dests = dynamicallyIncludedDestinations; 1295 if (dests != null && dests.length > 0) { 1296 for (ActiveMQDestination dest : dests) { 1297 DestinationFilter inclusionFilter = DestinationFilter.parseFilter(dest); 1298 if (dest != null && inclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) { 1299 return true; 1300 } 1301 } 1302 1303 return false; 1304 } 1305 1306 return true; 1307 } 1308 1309 /** 1310 * Subscriptions for these destinations are always created 1311 */ 1312 protected void setupStaticDestinations() { 1313 ActiveMQDestination[] dests = staticallyIncludedDestinations; 1314 if (dests != null) { 1315 for (ActiveMQDestination dest : dests) { 1316 if (isPermissableDestination(dest)) { 1317 DemandSubscription sub = createDemandSubscription(dest, null); 1318 sub.setStaticallyIncluded(true); 1319 try { 1320 addSubscription(sub); 1321 } catch (IOException e) { 1322 LOG.error("Failed to add static destination {}", dest, e); 1323 } 1324 LOG.trace("{}, bridging messages for static destination: {}", configuration.getBrokerName(), dest); 1325 } else { 1326 LOG.info("{}, static destination excluded: {}", configuration.getBrokerName(), dest); 1327 } 1328 } 1329 } 1330 } 1331 1332 protected void addConsumerInfo(final ConsumerInfo consumerInfo) throws IOException { 1333 ConsumerInfo info = consumerInfo.copy(); 1334 addRemoteBrokerToBrokerPath(info); 1335 DemandSubscription sub = createDemandSubscription(info); 1336 if (sub != null) { 1337 if (duplicateSuppressionIsRequired(sub)) { 1338 undoMapRegistration(sub); 1339 } else { 1340 if (consumerInfo.isDurable()) { 1341 sub.getDurableRemoteSubs().add(new SubscriptionInfo(sub.getRemoteInfo().getClientId(), consumerInfo.getSubscriptionName())); 1342 } 1343 addSubscription(sub); 1344 LOG.debug("{} new demand subscription: {}", configuration.getBrokerName(), sub); 1345 } 1346 } 1347 } 1348 1349 private void undoMapRegistration(DemandSubscription sub) { 1350 subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId()); 1351 subscriptionMapByRemoteId.remove(sub.getRemoteInfo().getConsumerId()); 1352 } 1353 1354 /* 1355 * check our existing subs networkConsumerIds against the list of network 1356 * ids in this subscription A match means a duplicate which we suppress for 1357 * topics and maybe for queues 1358 */ 1359 private boolean duplicateSuppressionIsRequired(DemandSubscription candidate) { 1360 final ConsumerInfo consumerInfo = candidate.getRemoteInfo(); 1361 boolean suppress = false; 1362 1363 if (isDuplicateSuppressionOff(consumerInfo)) { 1364 return suppress; 1365 } 1366 1367 List<ConsumerId> candidateConsumers = consumerInfo.getNetworkConsumerIds(); 1368 Collection<Subscription> currentSubs = getRegionSubscriptions(consumerInfo.getDestination()); 1369 for (Subscription sub : currentSubs) { 1370 List<ConsumerId> networkConsumers = sub.getConsumerInfo().getNetworkConsumerIds(); 1371 if (!networkConsumers.isEmpty()) { 1372 if (matchFound(candidateConsumers, networkConsumers)) { 1373 if (isInActiveDurableSub(sub)) { 1374 suppress = false; 1375 } else { 1376 suppress = hasLowerPriority(sub, candidate.getLocalInfo()); 1377 } 1378 break; 1379 } 1380 } 1381 } 1382 return suppress; 1383 } 1384 1385 private boolean isDuplicateSuppressionOff(final ConsumerInfo consumerInfo) { 1386 return !configuration.isSuppressDuplicateQueueSubscriptions() && !configuration.isSuppressDuplicateTopicSubscriptions() 1387 || consumerInfo.getDestination().isQueue() && !configuration.isSuppressDuplicateQueueSubscriptions() 1388 || consumerInfo.getDestination().isTopic() && !configuration.isSuppressDuplicateTopicSubscriptions(); 1389 } 1390 1391 private boolean isInActiveDurableSub(Subscription sub) { 1392 return (sub.getConsumerInfo().isDurable() && sub instanceof DurableTopicSubscription && !((DurableTopicSubscription) sub).isActive()); 1393 } 1394 1395 private boolean hasLowerPriority(Subscription existingSub, ConsumerInfo candidateInfo) { 1396 boolean suppress = false; 1397 1398 if (existingSub.getConsumerInfo().getPriority() >= candidateInfo.getPriority()) { 1399 LOG.debug("{} Ignoring duplicate subscription from {}, sub: {} is duplicate by network subscription with equal or higher network priority: {}, networkConsumerIds: {}", new Object[]{ 1400 configuration.getBrokerName(), remoteBrokerName, candidateInfo, existingSub, existingSub.getConsumerInfo().getNetworkConsumerIds() 1401 }); 1402 suppress = true; 1403 } else { 1404 // remove the existing lower priority duplicate and allow this candidate 1405 try { 1406 removeDuplicateSubscription(existingSub); 1407 1408 LOG.debug("{} Replacing duplicate subscription {} with sub from {}, which has a higher priority, new sub: {}, networkConsumerIds: {}", new Object[]{ 1409 configuration.getBrokerName(), existingSub.getConsumerInfo(), remoteBrokerName, candidateInfo, candidateInfo.getNetworkConsumerIds() 1410 }); 1411 } catch (IOException e) { 1412 LOG.error("Failed to remove duplicated sub as a result of sub with higher priority, sub: {}", existingSub, e); 1413 } 1414 } 1415 return suppress; 1416 } 1417 1418 private void removeDuplicateSubscription(Subscription existingSub) throws IOException { 1419 for (NetworkConnector connector : brokerService.getNetworkConnectors()) { 1420 if (connector.removeDemandSubscription(existingSub.getConsumerInfo().getConsumerId())) { 1421 break; 1422 } 1423 } 1424 } 1425 1426 private boolean matchFound(List<ConsumerId> candidateConsumers, List<ConsumerId> networkConsumers) { 1427 boolean found = false; 1428 for (ConsumerId aliasConsumer : networkConsumers) { 1429 if (candidateConsumers.contains(aliasConsumer)) { 1430 found = true; 1431 break; 1432 } 1433 } 1434 return found; 1435 } 1436 1437 protected final Collection<Subscription> getRegionSubscriptions(ActiveMQDestination dest) { 1438 RegionBroker region_broker = (RegionBroker) brokerService.getRegionBroker(); 1439 Region region; 1440 Collection<Subscription> subs; 1441 1442 region = null; 1443 switch (dest.getDestinationType()) { 1444 case ActiveMQDestination.QUEUE_TYPE: 1445 region = region_broker.getQueueRegion(); 1446 break; 1447 case ActiveMQDestination.TOPIC_TYPE: 1448 region = region_broker.getTopicRegion(); 1449 break; 1450 case ActiveMQDestination.TEMP_QUEUE_TYPE: 1451 region = region_broker.getTempQueueRegion(); 1452 break; 1453 case ActiveMQDestination.TEMP_TOPIC_TYPE: 1454 region = region_broker.getTempTopicRegion(); 1455 break; 1456 } 1457 1458 if (region instanceof AbstractRegion) { 1459 subs = ((AbstractRegion) region).getSubscriptions().values(); 1460 } else { 1461 subs = null; 1462 } 1463 1464 return subs; 1465 } 1466 1467 protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws IOException { 1468 // add our original id to ourselves 1469 info.addNetworkConsumerId(info.getConsumerId()); 1470 return doCreateDemandSubscription(info); 1471 } 1472 1473 protected DemandSubscription doCreateDemandSubscription(ConsumerInfo info) throws IOException { 1474 DemandSubscription result = new DemandSubscription(info); 1475 result.getLocalInfo().setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), consumerIdGenerator.getNextSequenceId())); 1476 if (info.getDestination().isTemporary()) { 1477 // reset the local connection Id 1478 ActiveMQTempDestination dest = (ActiveMQTempDestination) result.getLocalInfo().getDestination(); 1479 dest.setConnectionId(localConnectionInfo.getConnectionId().toString()); 1480 } 1481 1482 if (configuration.isDecreaseNetworkConsumerPriority()) { 1483 byte priority = (byte) configuration.getConsumerPriorityBase(); 1484 if (info.getBrokerPath() != null && info.getBrokerPath().length > 1) { 1485 // The longer the path to the consumer, the less it's consumer priority. 1486 priority -= info.getBrokerPath().length + 1; 1487 } 1488 result.getLocalInfo().setPriority(priority); 1489 LOG.debug("{} using priority: {} for subscription: {}", new Object[]{configuration.getBrokerName(), priority, info}); 1490 } 1491 configureDemandSubscription(info, result); 1492 return result; 1493 } 1494 1495 final protected DemandSubscription createDemandSubscription(ActiveMQDestination destination, final String subscriptionName) { 1496 ConsumerInfo info = new ConsumerInfo(); 1497 info.setNetworkSubscription(true); 1498 info.setDestination(destination); 1499 1500 if (subscriptionName != null) { 1501 info.setSubscriptionName(subscriptionName); 1502 } 1503 1504 // Indicate that this subscription is being made on behalf of the remote broker. 1505 info.setBrokerPath(new BrokerId[]{remoteBrokerId}); 1506 1507 // the remote info held by the DemandSubscription holds the original 1508 // consumerId, the local info get's overwritten 1509 info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), consumerIdGenerator.getNextSequenceId())); 1510 DemandSubscription result = null; 1511 try { 1512 result = createDemandSubscription(info); 1513 } catch (IOException e) { 1514 LOG.error("Failed to create DemandSubscription ", e); 1515 } 1516 return result; 1517 } 1518 1519 protected void configureDemandSubscription(ConsumerInfo info, DemandSubscription sub) throws IOException { 1520 if (AdvisorySupport.isConsumerAdvisoryTopic(info.getDestination()) || 1521 AdvisorySupport.isVirtualDestinationConsumerAdvisoryTopic(info.getDestination())) { 1522 sub.getLocalInfo().setDispatchAsync(true); 1523 } else { 1524 sub.getLocalInfo().setDispatchAsync(configuration.isDispatchAsync()); 1525 } 1526 configureConsumerPrefetch(sub.getLocalInfo()); 1527 subscriptionMapByLocalId.put(sub.getLocalInfo().getConsumerId(), sub); 1528 subscriptionMapByRemoteId.put(sub.getRemoteInfo().getConsumerId(), sub); 1529 1530 sub.setNetworkBridgeFilter(createNetworkBridgeFilter(info)); 1531 if (!info.isDurable()) { 1532 // This works for now since we use a VM connection to the local broker. 1533 // may need to change if we ever subscribe to a remote broker. 1534 sub.getLocalInfo().setAdditionalPredicate(sub.getNetworkBridgeFilter()); 1535 } else { 1536 sub.setLocalDurableSubscriber(new SubscriptionInfo(info.getClientId(), info.getSubscriptionName())); 1537 } 1538 } 1539 1540 protected void removeDemandSubscription(ConsumerId id) throws IOException { 1541 DemandSubscription sub = subscriptionMapByRemoteId.remove(id); 1542 LOG.debug("{} remove request on {} from {}, consumer id: {}, matching sub: {}", new Object[]{ 1543 configuration.getBrokerName(), localBroker, remoteBrokerName, id, sub 1544 }); 1545 if (sub != null) { 1546 removeSubscription(sub); 1547 LOG.debug("{} removed sub on {} from {}: {}", new Object[]{ 1548 configuration.getBrokerName(), localBroker, remoteBrokerName, sub.getRemoteInfo() 1549 }); 1550 } 1551 } 1552 1553 protected boolean removeDemandSubscriptionByLocalId(ConsumerId consumerId) { 1554 boolean removeDone = false; 1555 DemandSubscription sub = subscriptionMapByLocalId.get(consumerId); 1556 if (sub != null) { 1557 try { 1558 removeDemandSubscription(sub.getRemoteInfo().getConsumerId()); 1559 removeDone = true; 1560 } catch (IOException e) { 1561 LOG.debug("removeDemandSubscriptionByLocalId failed for localId: {}", consumerId, e); 1562 } 1563 } 1564 return removeDone; 1565 } 1566 1567 /** 1568 * Performs a timed wait on the started latch and then checks for disposed 1569 * before performing another wait each time the the started wait times out. 1570 */ 1571 protected boolean safeWaitUntilStarted() throws InterruptedException { 1572 while (!disposed.get()) { 1573 if (startedLatch.await(1, TimeUnit.SECONDS)) { 1574 break; 1575 } 1576 } 1577 return !disposed.get(); 1578 } 1579 1580 protected NetworkBridgeFilter createNetworkBridgeFilter(ConsumerInfo info) throws IOException { 1581 NetworkBridgeFilterFactory filterFactory = defaultFilterFactory; 1582 if (brokerService != null && brokerService.getDestinationPolicy() != null) { 1583 PolicyEntry entry = brokerService.getDestinationPolicy().getEntryFor(info.getDestination()); 1584 if (entry != null && entry.getNetworkBridgeFilterFactory() != null) { 1585 filterFactory = entry.getNetworkBridgeFilterFactory(); 1586 } 1587 } 1588 return filterFactory.create(info, getRemoteBrokerPath(), configuration.getMessageTTL(), configuration.getConsumerTTL()); 1589 } 1590 1591 protected void addRemoteBrokerToBrokerPath(ConsumerInfo info) throws IOException { 1592 info.setBrokerPath(appendToBrokerPath(info.getBrokerPath(), getRemoteBrokerPath())); 1593 } 1594 1595 protected BrokerId[] getRemoteBrokerPath() { 1596 return remoteBrokerPath; 1597 } 1598 1599 @Override 1600 public void setNetworkBridgeListener(NetworkBridgeListener listener) { 1601 this.networkBridgeListener = listener; 1602 } 1603 1604 private void fireBridgeFailed(Throwable reason) { 1605 LOG.trace("fire bridge failed, listener: {}", this.networkBridgeListener, reason); 1606 NetworkBridgeListener l = this.networkBridgeListener; 1607 if (l != null && this.bridgeFailed.compareAndSet(false, true)) { 1608 l.bridgeFailed(); 1609 } 1610 } 1611 1612 /** 1613 * @return Returns the dynamicallyIncludedDestinations. 1614 */ 1615 public ActiveMQDestination[] getDynamicallyIncludedDestinations() { 1616 return dynamicallyIncludedDestinations; 1617 } 1618 1619 /** 1620 * @param dynamicallyIncludedDestinations 1621 * The dynamicallyIncludedDestinations to set. 1622 */ 1623 public void setDynamicallyIncludedDestinations(ActiveMQDestination[] dynamicallyIncludedDestinations) { 1624 this.dynamicallyIncludedDestinations = dynamicallyIncludedDestinations; 1625 } 1626 1627 /** 1628 * @return Returns the excludedDestinations. 1629 */ 1630 public ActiveMQDestination[] getExcludedDestinations() { 1631 return excludedDestinations; 1632 } 1633 1634 /** 1635 * @param excludedDestinations The excludedDestinations to set. 1636 */ 1637 public void setExcludedDestinations(ActiveMQDestination[] excludedDestinations) { 1638 this.excludedDestinations = excludedDestinations; 1639 } 1640 1641 /** 1642 * @return Returns the staticallyIncludedDestinations. 1643 */ 1644 public ActiveMQDestination[] getStaticallyIncludedDestinations() { 1645 return staticallyIncludedDestinations; 1646 } 1647 1648 /** 1649 * @param staticallyIncludedDestinations The staticallyIncludedDestinations to set. 1650 */ 1651 public void setStaticallyIncludedDestinations(ActiveMQDestination[] staticallyIncludedDestinations) { 1652 this.staticallyIncludedDestinations = staticallyIncludedDestinations; 1653 } 1654 1655 /** 1656 * @return Returns the durableDestinations. 1657 */ 1658 public ActiveMQDestination[] getDurableDestinations() { 1659 return durableDestinations; 1660 } 1661 1662 /** 1663 * @param durableDestinations The durableDestinations to set. 1664 */ 1665 public void setDurableDestinations(ActiveMQDestination[] durableDestinations) { 1666 this.durableDestinations = durableDestinations; 1667 } 1668 1669 /** 1670 * @return Returns the localBroker. 1671 */ 1672 public Transport getLocalBroker() { 1673 return localBroker; 1674 } 1675 1676 /** 1677 * @return Returns the remoteBroker. 1678 */ 1679 public Transport getRemoteBroker() { 1680 return remoteBroker; 1681 } 1682 1683 /** 1684 * @return the createdByDuplex 1685 */ 1686 public boolean isCreatedByDuplex() { 1687 return this.createdByDuplex; 1688 } 1689 1690 /** 1691 * @param createdByDuplex the createdByDuplex to set 1692 */ 1693 public void setCreatedByDuplex(boolean createdByDuplex) { 1694 this.createdByDuplex = createdByDuplex; 1695 } 1696 1697 @Override 1698 public String getRemoteAddress() { 1699 return remoteBroker.getRemoteAddress(); 1700 } 1701 1702 @Override 1703 public String getLocalAddress() { 1704 return localBroker.getRemoteAddress(); 1705 } 1706 1707 @Override 1708 public String getRemoteBrokerName() { 1709 return remoteBrokerInfo == null ? null : remoteBrokerInfo.getBrokerName(); 1710 } 1711 1712 @Override 1713 public String getRemoteBrokerId() { 1714 return (remoteBrokerInfo == null || remoteBrokerInfo.getBrokerId() == null) ? null : remoteBrokerInfo.getBrokerId().toString(); 1715 } 1716 1717 @Override 1718 public String getLocalBrokerName() { 1719 return localBrokerInfo == null ? null : localBrokerInfo.getBrokerName(); 1720 } 1721 1722 @Override 1723 public long getDequeueCounter() { 1724 return networkBridgeStatistics.getDequeues().getCount(); 1725 } 1726 1727 @Override 1728 public long getEnqueueCounter() { 1729 return networkBridgeStatistics.getEnqueues().getCount(); 1730 } 1731 1732 @Override 1733 public NetworkBridgeStatistics getNetworkBridgeStatistics() { 1734 return networkBridgeStatistics; 1735 } 1736 1737 protected boolean isDuplex() { 1738 return configuration.isDuplex() || createdByDuplex; 1739 } 1740 1741 public ConcurrentMap<ConsumerId, DemandSubscription> getLocalSubscriptionMap() { 1742 return subscriptionMapByRemoteId; 1743 } 1744 1745 @Override 1746 public void setBrokerService(BrokerService brokerService) { 1747 this.brokerService = brokerService; 1748 this.localBrokerId = brokerService.getRegionBroker().getBrokerId(); 1749 localBrokerPath[0] = localBrokerId; 1750 } 1751 1752 @Override 1753 public void setMbeanObjectName(ObjectName objectName) { 1754 this.mbeanObjectName = objectName; 1755 } 1756 1757 @Override 1758 public ObjectName getMbeanObjectName() { 1759 return mbeanObjectName; 1760 } 1761 1762 @Override 1763 public void resetStats() { 1764 networkBridgeStatistics.reset(); 1765 } 1766 1767 /* 1768 * Used to allow for async tasks to await receipt of the BrokerInfo from the local and 1769 * remote sides of the network bridge. 1770 */ 1771 private static class FutureBrokerInfo implements Future<BrokerInfo> { 1772 1773 private final CountDownLatch slot = new CountDownLatch(1); 1774 private final AtomicBoolean disposed; 1775 private volatile BrokerInfo info = null; 1776 1777 public FutureBrokerInfo(BrokerInfo info, AtomicBoolean disposed) { 1778 this.info = info; 1779 this.disposed = disposed; 1780 } 1781 1782 @Override 1783 public boolean cancel(boolean mayInterruptIfRunning) { 1784 slot.countDown(); 1785 return true; 1786 } 1787 1788 @Override 1789 public boolean isCancelled() { 1790 return slot.getCount() == 0 && info == null; 1791 } 1792 1793 @Override 1794 public boolean isDone() { 1795 return info != null; 1796 } 1797 1798 @Override 1799 public BrokerInfo get() throws InterruptedException, ExecutionException { 1800 try { 1801 if (info == null) { 1802 while (!disposed.get()) { 1803 if (slot.await(1, TimeUnit.SECONDS)) { 1804 break; 1805 } 1806 } 1807 } 1808 return info; 1809 } catch (InterruptedException e) { 1810 Thread.currentThread().interrupt(); 1811 LOG.debug("Operation interrupted: {}", e, e); 1812 throw new InterruptedException("Interrupted."); 1813 } 1814 } 1815 1816 @Override 1817 public BrokerInfo get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { 1818 try { 1819 if (info == null) { 1820 long deadline = System.currentTimeMillis() + unit.toMillis(timeout); 1821 1822 while (!disposed.get() || System.currentTimeMillis() < deadline) { 1823 if (slot.await(1, TimeUnit.MILLISECONDS)) { 1824 break; 1825 } 1826 } 1827 if (info == null) { 1828 throw new TimeoutException(); 1829 } 1830 } 1831 return info; 1832 } catch (InterruptedException e) { 1833 throw new InterruptedException("Interrupted."); 1834 } 1835 } 1836 1837 public void set(BrokerInfo info) { 1838 this.info = info; 1839 this.slot.countDown(); 1840 } 1841 } 1842 1843 protected void serviceOutbound(Message message) { 1844 NetworkBridgeListener l = this.networkBridgeListener; 1845 if (l != null) { 1846 l.onOutboundMessage(this, message); 1847 } 1848 } 1849 1850 protected void serviceInboundMessage(Message message) { 1851 NetworkBridgeListener l = this.networkBridgeListener; 1852 if (l != null) { 1853 l.onInboundMessage(this, message); 1854 } 1855 } 1856 1857 protected boolean canDuplexDispatch(Message message) { 1858 boolean result = true; 1859 if (configuration.isCheckDuplicateMessagesOnDuplex()){ 1860 final long producerSequenceId = message.getMessageId().getProducerSequenceId(); 1861 // messages are multiplexed on this producer so we need to query the persistenceAdapter 1862 long lastStoredForMessageProducer = getStoredSequenceIdForMessage(message.getMessageId()); 1863 if (producerSequenceId <= lastStoredForMessageProducer) { 1864 result = false; 1865 LOG.debug("suppressing duplicate message send [{}] from network producer with producerSequence [{}] less than last stored: {}", new Object[]{ 1866 (LOG.isTraceEnabled() ? message : message.getMessageId()), producerSequenceId, lastStoredForMessageProducer 1867 }); 1868 } 1869 } 1870 return result; 1871 } 1872 1873 protected long getStoredSequenceIdForMessage(MessageId messageId) { 1874 try { 1875 return brokerService.getPersistenceAdapter().getLastProducerSequenceId(messageId.getProducerId()); 1876 } catch (IOException ignored) { 1877 LOG.debug("Failed to determine last producer sequence id for: {}", messageId, ignored); 1878 } 1879 return -1; 1880 } 1881 1882 protected void configureConsumerPrefetch(ConsumerInfo consumerInfo) { 1883 //If a consumer on an advisory topic and advisoryPrefetchSize has been explicitly 1884 //set then use it, else default to the prefetchSize setting 1885 if (AdvisorySupport.isAdvisoryTopic(consumerInfo.getDestination()) && 1886 configuration.getAdvisoryPrefetchSize() > 0) { 1887 consumerInfo.setPrefetchSize(configuration.getAdvisoryPrefetchSize()); 1888 } else { 1889 consumerInfo.setPrefetchSize(configuration.getPrefetchSize()); 1890 } 1891 } 1892 1893}