001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.ra; 018 019import java.io.Serializable; 020import java.net.URI; 021import java.util.HashMap; 022 023import javax.jms.JMSException; 024import javax.resource.NotSupportedException; 025import javax.resource.ResourceException; 026import javax.resource.spi.ActivationSpec; 027import javax.resource.spi.BootstrapContext; 028import javax.resource.spi.ResourceAdapterInternalException; 029import javax.resource.spi.endpoint.MessageEndpointFactory; 030import javax.transaction.xa.XAException; 031import javax.transaction.xa.XAResource; 032import javax.transaction.xa.Xid; 033 034import org.apache.activemq.ActiveMQConnection; 035import org.apache.activemq.ActiveMQConnectionFactory; 036import org.apache.activemq.RedeliveryPolicy; 037import org.apache.activemq.TransactionContext; 038import org.apache.activemq.broker.BrokerFactory; 039import org.apache.activemq.broker.BrokerService; 040import org.apache.activemq.util.ServiceSupport; 041import org.slf4j.Logger; 042import org.slf4j.LoggerFactory; 043 044/** 045 * Knows how to connect to one ActiveMQ server. It can then activate endpoints 046 * and deliver messages to those end points using the connection configure in 047 * the resource adapter. <p/>Must override equals and hashCode (JCA spec 16.4) 048 * 049 * @org.apache.xbean.XBean element="resourceAdapter" rootElement="true" 050 * description="The JCA Resource Adaptor for ActiveMQ" 051 * 052 */ 053public class ActiveMQResourceAdapter extends ActiveMQConnectionSupport implements Serializable, MessageResourceAdapter { 054 private static final long serialVersionUID = 360805587169336959L; 055 private static final Logger LOG = LoggerFactory.getLogger(ActiveMQResourceAdapter.class); 056 private transient final HashMap<ActiveMQEndpointActivationKey, ActiveMQEndpointWorker> endpointWorkers = new HashMap<ActiveMQEndpointActivationKey, ActiveMQEndpointWorker>(); 057 058 private transient BootstrapContext bootstrapContext; 059 private String brokerXmlConfig; 060 private transient BrokerService broker; 061 private transient Thread brokerStartThread; 062 private ActiveMQConnectionFactory connectionFactory; 063 064 /** 065 * 066 */ 067 public ActiveMQResourceAdapter() { 068 super(); 069 } 070 071 /** 072 * @see javax.resource.spi.ResourceAdapter#start(javax.resource.spi.BootstrapContext) 073 */ 074 @Override 075 public void start(BootstrapContext bootstrapContext) throws ResourceAdapterInternalException { 076 this.bootstrapContext = bootstrapContext; 077 if (brokerXmlConfig != null && brokerXmlConfig.trim().length() > 0) { 078 brokerStartThread = new Thread("Starting ActiveMQ Broker") { 079 @Override 080 public void run () { 081 try { 082 // ensure RAR resources are available to xbean (needed for weblogic) 083 log.debug("original thread context classLoader: " + Thread.currentThread().getContextClassLoader()); 084 Thread.currentThread().setContextClassLoader(getClass().getClassLoader()); 085 log.debug("current (from getClass()) thread context classLoader: " + Thread.currentThread().getContextClassLoader()); 086 087 synchronized( ActiveMQResourceAdapter.this ) { 088 broker = BrokerFactory.createBroker(new URI(brokerXmlConfig)); 089 } 090 broker.start(); 091 // Default the ServerUrl to the local broker if not specified in the ra.xml 092 if (getServerUrl() == null) { 093 setServerUrl("vm://" + broker.getBrokerName() + "?create=false"); 094 } 095 } catch (Throwable e) { 096 log.warn("Could not start up embeded ActiveMQ Broker '"+brokerXmlConfig+"': "+e.getMessage()); 097 log.debug("Reason for: "+e.getMessage(), e); 098 } 099 } 100 }; 101 brokerStartThread.setDaemon(true); 102 brokerStartThread.start(); 103 104 // Wait up to 5 seconds for the broker to start up in the async thread.. otherwise keep going without it.. 105 try { 106 brokerStartThread.join(1000*5); 107 } catch (InterruptedException e) { 108 Thread.currentThread().interrupt(); 109 } 110 } 111 } 112 113 public ActiveMQConnection makeConnection() throws JMSException { 114 if( connectionFactory == null ) { 115 return makeConnection(getInfo()); 116 } else { 117 return makeConnection(getInfo(), connectionFactory); 118 } 119 } 120 121 /** 122 * @param activationSpec 123 */ 124 @Override 125 public ActiveMQConnection makeConnection(MessageActivationSpec activationSpec) throws JMSException { 126 ActiveMQConnectionFactory cf = getConnectionFactory(); 127 if (cf == null) { 128 cf = createConnectionFactory(getInfo(), activationSpec); 129 } 130 String userName = defaultValue(activationSpec.getUserName(), getInfo().getUserName()); 131 String password = defaultValue(activationSpec.getPassword(), getInfo().getPassword()); 132 String clientId = activationSpec.getClientId(); 133 if (clientId != null) { 134 cf.setClientID(clientId); 135 } else { 136 if (activationSpec.isDurableSubscription()) { 137 log.warn("No clientID specified for durable subscription: " + activationSpec); 138 } 139 } 140 ActiveMQConnection physicalConnection = (ActiveMQConnection) cf.createConnection(userName, password); 141 142 // have we configured a redelivery policy 143 RedeliveryPolicy redeliveryPolicy = activationSpec.redeliveryPolicy(); 144 if (redeliveryPolicy != null) { 145 physicalConnection.setRedeliveryPolicy(redeliveryPolicy); 146 } 147 return physicalConnection; 148 } 149 150 /** 151 * @see javax.resource.spi.ResourceAdapter#stop() 152 */ 153 @Override 154 public void stop() { 155 synchronized (endpointWorkers) { 156 while (endpointWorkers.size() > 0) { 157 ActiveMQEndpointActivationKey key = endpointWorkers.keySet().iterator().next(); 158 endpointDeactivation(key.getMessageEndpointFactory(), key.getActivationSpec()); 159 } 160 } 161 162 synchronized( this ) { 163 if (broker != null) { 164 if( brokerStartThread.isAlive() ) { 165 brokerStartThread.interrupt(); 166 } 167 ServiceSupport.dispose(broker); 168 broker = null; 169 } 170 } 171 172 this.bootstrapContext = null; 173 } 174 175 /** 176 * @see org.apache.activemq.ra.MessageResourceAdapter#getBootstrapContext() 177 */ 178 @Override 179 public BootstrapContext getBootstrapContext() { 180 return bootstrapContext; 181 } 182 183 /** 184 * @see javax.resource.spi.ResourceAdapter#endpointActivation(javax.resource.spi.endpoint.MessageEndpointFactory, 185 * javax.resource.spi.ActivationSpec) 186 */ 187 @Override 188 public void endpointActivation(MessageEndpointFactory endpointFactory, ActivationSpec activationSpec) throws ResourceException { 189 190 // spec section 5.3.3 191 if (!equals(activationSpec.getResourceAdapter())) { 192 throw new ResourceException("Activation spec not initialized with this ResourceAdapter instance (" + activationSpec.getResourceAdapter() + " != " + this + ")"); 193 } 194 195 if (!(activationSpec instanceof MessageActivationSpec)) { 196 throw new NotSupportedException("That type of ActivationSpec not supported: " + activationSpec.getClass()); 197 } 198 199 ActiveMQEndpointActivationKey key = new ActiveMQEndpointActivationKey(endpointFactory, (MessageActivationSpec)activationSpec); 200 // This is weird.. the same endpoint activated twice.. must be a 201 // container error. 202 if (endpointWorkers.containsKey(key)) { 203 throw new IllegalStateException("Endpoint previously activated"); 204 } 205 206 ActiveMQEndpointWorker worker = new ActiveMQEndpointWorker(this, key); 207 208 endpointWorkers.put(key, worker); 209 worker.start(); 210 } 211 212 /** 213 * @see javax.resource.spi.ResourceAdapter#endpointDeactivation(javax.resource.spi.endpoint.MessageEndpointFactory, 214 * javax.resource.spi.ActivationSpec) 215 */ 216 @Override 217 public void endpointDeactivation(MessageEndpointFactory endpointFactory, ActivationSpec activationSpec) { 218 if (activationSpec instanceof MessageActivationSpec) { 219 ActiveMQEndpointActivationKey key = new ActiveMQEndpointActivationKey(endpointFactory, (MessageActivationSpec)activationSpec); 220 ActiveMQEndpointWorker worker = null; 221 synchronized (endpointWorkers) { 222 worker = endpointWorkers.remove(key); 223 } 224 if (worker == null) { 225 // This is weird.. that endpoint was not activated.. oh well.. 226 // this method 227 // does not throw exceptions so just return. 228 return; 229 } 230 try { 231 worker.stop(); 232 } catch (InterruptedException e) { 233 // We interrupted.. we won't throw an exception but will stop 234 // waiting for the worker 235 // to stop.. we tried our best. Keep trying to interrupt the 236 // thread. 237 Thread.currentThread().interrupt(); 238 } 239 240 } 241 242 } 243 244 /** 245 * We only connect to one resource manager per ResourceAdapter instance, so 246 * any ActivationSpec will return the same XAResource. 247 * 248 * @see javax.resource.spi.ResourceAdapter#getXAResources(javax.resource.spi.ActivationSpec[]) 249 */ 250 @Override 251 public XAResource[] getXAResources(ActivationSpec[] activationSpecs) throws ResourceException { 252 try { 253 return new XAResource[]{ 254 new TransactionContext() { 255 256 @Override 257 public boolean isSameRM(XAResource xaresource) throws XAException { 258 ActiveMQConnection original = null; 259 try { 260 original = setConnection(newConnection()); 261 boolean result = super.isSameRM(xaresource); 262 LOG.trace("{}.recover({})={}", getConnection(), xaresource, result); 263 return result; 264 265 } catch (JMSException e) { 266 LOG.trace("isSameRM({}) failed", xaresource, e); 267 XAException xaException = new XAException(e.getMessage()); 268 throw xaException; 269 } finally { 270 closeConnection(original); 271 } 272 } 273 274 @Override 275 protected String getResourceManagerId() throws JMSException { 276 ActiveMQConnection original = null; 277 try { 278 original = setConnection(newConnection()); 279 return super.getResourceManagerId(); 280 } finally { 281 closeConnection(original); 282 } 283 } 284 285 @Override 286 public void commit(Xid xid, boolean onePhase) throws XAException { 287 ActiveMQConnection original = null; 288 try { 289 setConnection(newConnection()); 290 super.commit(xid, onePhase); 291 LOG.trace("{}.commit({},{})", getConnection(), xid); 292 293 } catch (JMSException e) { 294 LOG.trace("{}.commit({},{}) failed", getConnection(), xid, onePhase, e); 295 throwXAException(e); 296 } finally { 297 closeConnection(original); 298 } 299 } 300 301 @Override 302 public void rollback(Xid xid) throws XAException { 303 ActiveMQConnection original = null; 304 try { 305 original = setConnection(newConnection()); 306 super.rollback(xid); 307 LOG.trace("{}.rollback({})", getConnection(), xid); 308 309 } catch (JMSException e) { 310 LOG.trace("{}.rollback({}) failed", getConnection(), xid, e); 311 throwXAException(e); 312 } finally { 313 closeConnection(original); 314 } 315 } 316 317 @Override 318 public Xid[] recover(int flags) throws XAException { 319 Xid[] result = new Xid[]{}; 320 ActiveMQConnection original = null; 321 try { 322 original = setConnection(newConnection()); 323 result = super.recover(flags); 324 LOG.trace("{}.recover({})={}", getConnection(), flags, result); 325 326 } catch (JMSException e) { 327 LOG.trace("{}.recover({}) failed", getConnection(), flags, e); 328 throwXAException(e); 329 } finally { 330 closeConnection(original); 331 } 332 return result; 333 } 334 335 @Override 336 public void forget(Xid xid) throws XAException { 337 ActiveMQConnection original = null; 338 try { 339 original = setConnection(newConnection()); 340 super.forget(xid); 341 LOG.trace("{}.forget({})", getConnection(), xid); 342 343 } catch (JMSException e) { 344 LOG.trace("{}.forget({}) failed", getConnection(), xid, e); 345 throwXAException(e); 346 } finally { 347 closeConnection(original); 348 } 349 } 350 351 private void throwXAException(JMSException e) throws XAException { 352 XAException xaException = new XAException(e.getMessage()); 353 xaException.errorCode = XAException.XAER_RMFAIL; 354 throw xaException; 355 } 356 357 private ActiveMQConnection newConnection() throws JMSException { 358 ActiveMQConnection connection = null; 359 try { 360 connection = makeConnection(); 361 connection.start(); 362 } catch (JMSException ex) { 363 if (connection != null) { 364 try { 365 connection.close(); 366 } catch (JMSException ignore) { } 367 } 368 throw ex; 369 } 370 return connection; 371 } 372 373 private void closeConnection(ActiveMQConnection original) { 374 ActiveMQConnection connection = getConnection(); 375 if (connection != null) { 376 try { 377 connection.close(); 378 } catch (JMSException ignored) {} 379 } 380 setConnection(original); 381 } 382 }}; 383 384 } catch (Exception e) { 385 throw new ResourceException(e); 386 } 387 } 388 389 // /////////////////////////////////////////////////////////////////////// 390 // 391 // Java Bean getters and setters for this ResourceAdapter class. 392 // 393 // /////////////////////////////////////////////////////////////////////// 394 395 /** 396 * @see org.apache.activemq.ra.MessageResourceAdapter#getBrokerXmlConfig() 397 */ 398 @Override 399 public String getBrokerXmlConfig() { 400 return brokerXmlConfig; 401 } 402 403 /** 404 * Sets the <a href="http://activemq.org/Xml+Configuration">XML 405 * configuration file </a> used to configure the ActiveMQ broker via Spring 406 * if using embedded mode. 407 * 408 * @param brokerXmlConfig is the filename which is assumed to be on the 409 * classpath unless a URL is specified. So a value of 410 * <code>foo/bar.xml</code> would be assumed to be on the 411 * classpath whereas <code>file:dir/file.xml</code> would 412 * use the file system. Any valid URL string is supported. 413 */ 414 public void setBrokerXmlConfig(String brokerXmlConfig) { 415 this.brokerXmlConfig = brokerXmlConfig; 416 } 417 418 /** 419 * @see java.lang.Object#equals(java.lang.Object) 420 */ 421 @Override 422 public boolean equals(Object o) { 423 if (this == o) { 424 return true; 425 } 426 if (!(o instanceof MessageResourceAdapter)) { 427 return false; 428 } 429 430 final MessageResourceAdapter activeMQResourceAdapter = (MessageResourceAdapter)o; 431 432 if (!getInfo().equals(activeMQResourceAdapter.getInfo())) { 433 return false; 434 } 435 if (notEqual(brokerXmlConfig, activeMQResourceAdapter.getBrokerXmlConfig())) { 436 return false; 437 } 438 439 return true; 440 } 441 442 /** 443 * @see java.lang.Object#hashCode() 444 */ 445 @Override 446 public int hashCode() { 447 int result; 448 result = getInfo().hashCode(); 449 if (brokerXmlConfig != null) { 450 result ^= brokerXmlConfig.hashCode(); 451 } 452 return result; 453 } 454 455 public ActiveMQConnectionFactory getConnectionFactory() { 456 return connectionFactory; 457 } 458 459 public void setConnectionFactory(ActiveMQConnectionFactory aConnectionFactory) { 460 this.connectionFactory = aConnectionFactory; 461 } 462 463 464 }