001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.ra; 018 019import java.util.ArrayList; 020import java.util.List; 021 022import javax.jms.Connection; 023import javax.jms.ConnectionConsumer; 024import javax.jms.ConnectionMetaData; 025import javax.jms.Destination; 026import javax.jms.ExceptionListener; 027import javax.jms.IllegalStateException; 028import javax.jms.JMSException; 029import javax.jms.Queue; 030import javax.jms.QueueConnection; 031import javax.jms.QueueSession; 032import javax.jms.ServerSessionPool; 033import javax.jms.Session; 034import javax.jms.Topic; 035import javax.jms.TopicConnection; 036import javax.jms.TopicSession; 037import javax.resource.spi.ConnectionRequestInfo; 038import org.apache.activemq.ActiveMQQueueSession; 039import org.apache.activemq.ActiveMQSession; 040import org.apache.activemq.ActiveMQTopicSession; 041 042/** 043 * Acts as a pass through proxy for a JMS Connection object. It intercepts 044 * events that are of interest of the ActiveMQManagedConnection. 045 * 046 * 047 */ 048public class ManagedConnectionProxy implements Connection, QueueConnection, TopicConnection, ExceptionListener { 049 050 private ActiveMQManagedConnection managedConnection; 051 private final List<ManagedSessionProxy> sessions = new ArrayList<ManagedSessionProxy>(); 052 private ExceptionListener exceptionListener; 053 private ActiveMQConnectionRequestInfo info; 054 055 public ManagedConnectionProxy(ActiveMQManagedConnection managedConnection, ConnectionRequestInfo info) { 056 this.managedConnection = managedConnection; 057 if (info instanceof ActiveMQConnectionRequestInfo) { 058 this.info = (ActiveMQConnectionRequestInfo) info; 059 } 060 } 061 062 /** 063 * Used to let the ActiveMQManagedConnection that this connection handel is 064 * not needed by the app. 065 * 066 * @throws JMSException 067 */ 068 public void close() throws JMSException { 069 if (managedConnection != null) { 070 managedConnection.proxyClosedEvent(this); 071 } 072 } 073 074 /** 075 * Called by the ActiveMQManagedConnection to invalidate this proxy. 076 */ 077 public void cleanup() { 078 exceptionListener = null; 079 managedConnection = null; 080 synchronized (sessions) { 081 for (ManagedSessionProxy p : sessions) { 082 try { 083 //TODO is this dangerous? should we copy the list before iterating? 084 p.cleanup(); 085 } catch (JMSException ignore) { 086 } 087 } 088 sessions.clear(); 089 } 090 } 091 092 /** 093 * @return "physical" underlying activemq connection, if proxy is associated with a managed connection 094 * @throws javax.jms.JMSException if managed connection is null 095 */ 096 private Connection getConnection() throws JMSException { 097 if (managedConnection == null) { 098 throw new IllegalStateException("The Connection is closed"); 099 } 100 return managedConnection.getPhysicalConnection(); 101 } 102 103 /** 104 * @param transacted Whether session is transacted 105 * @param acknowledgeMode session acknowledge mode 106 * @return session proxy 107 * @throws JMSException on error 108 */ 109 public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException { 110 return createSessionProxy(transacted, acknowledgeMode); 111 } 112 113 /** 114 * @param transacted Whether session is transacted 115 * @param acknowledgeMode session acknowledge mode 116 * @return session proxy 117 * @throws JMSException on error 118 */ 119 private ManagedSessionProxy createSessionProxy(boolean transacted, int acknowledgeMode) throws JMSException { 120 ActiveMQSession session; 121 if (info != null && info.isUseSessionArgs()) { 122 session = (ActiveMQSession) getConnection().createSession(transacted, transacted ? Session.SESSION_TRANSACTED : acknowledgeMode); 123 } else { 124 session = (ActiveMQSession) getConnection().createSession(false, Session.AUTO_ACKNOWLEDGE); 125 } 126 ManagedTransactionContext txContext = new ManagedTransactionContext(managedConnection.getTransactionContext()); 127 session.setTransactionContext(txContext); 128 ManagedSessionProxy p = new ManagedSessionProxy(session, this); 129 p.setUseSharedTxContext(managedConnection.isInManagedTx()); 130 synchronized (sessions) { 131 sessions.add(p); 132 } 133 return p; 134 } 135 136 protected void sessionClosed(ManagedSessionProxy session) { 137 synchronized (sessions) { 138 sessions.remove(session); 139 } 140 } 141 142 public void setUseSharedTxContext(boolean enable) throws JMSException { 143 synchronized (sessions) { 144 for (ManagedSessionProxy p : sessions) { 145 p.setUseSharedTxContext(enable); 146 } 147 } 148 } 149 150 /** 151 * @param transacted Whether session is transacted 152 * @param acknowledgeMode session acknowledge mode 153 * @return session proxy 154 * @throws JMSException on error 155 */ 156 public QueueSession createQueueSession(boolean transacted, int acknowledgeMode) throws JMSException { 157 return new ActiveMQQueueSession(createSessionProxy(transacted, acknowledgeMode)); 158 } 159 160 /** 161 * @param transacted Whether session is transacted 162 * @param acknowledgeMode session acknowledge mode 163 * @return session proxy 164 * @throws JMSException on error 165 */ 166 public TopicSession createTopicSession(boolean transacted, int acknowledgeMode) throws JMSException { 167 return new ActiveMQTopicSession(createSessionProxy(transacted, acknowledgeMode)); 168 } 169 170 /** 171 * @return client id from delegate 172 * @throws JMSException 173 */ 174 public String getClientID() throws JMSException { 175 return getConnection().getClientID(); 176 } 177 178 /** 179 * @return exception listener from delegate 180 * @throws JMSException 181 */ 182 public ExceptionListener getExceptionListener() throws JMSException { 183 return getConnection().getExceptionListener(); 184 } 185 186 /** 187 * @return connection metadata from delegate 188 * @throws JMSException 189 */ 190 public ConnectionMetaData getMetaData() throws JMSException { 191 return getConnection().getMetaData(); 192 } 193 194 /** 195 * Sets client id on delegate 196 * @param clientID new clientId 197 * @throws JMSException 198 */ 199 public void setClientID(String clientID) throws JMSException { 200 getConnection().setClientID(clientID); 201 } 202 203 /** 204 * sets exception listener on delegate 205 * @param listener new listener 206 * @throws JMSException 207 */ 208 public void setExceptionListener(ExceptionListener listener) throws JMSException { 209 getConnection(); 210 exceptionListener = listener; 211 } 212 213 /** 214 * @throws JMSException 215 */ 216 public void start() throws JMSException { 217 getConnection().start(); 218 } 219 220 /** 221 * @throws JMSException 222 */ 223 public void stop() throws JMSException { 224 getConnection().stop(); 225 } 226 227 /** 228 * @param queue 229 * @param messageSelector 230 * @param sessionPool 231 * @param maxMessages 232 * @return 233 * @throws JMSException 234 */ 235 public ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException { 236 throw new JMSException("Not Supported."); 237 } 238 239 /** 240 * @param topic 241 * @param messageSelector 242 * @param sessionPool 243 * @param maxMessages 244 * @return 245 * @throws JMSException 246 */ 247 public ConnectionConsumer createConnectionConsumer(Topic topic, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException { 248 throw new JMSException("Not Supported."); 249 } 250 251 /** 252 * @param destination 253 * @param messageSelector 254 * @param sessionPool 255 * @param maxMessages 256 * @return 257 * @throws JMSException 258 */ 259 public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException { 260 throw new JMSException("Not Supported."); 261 } 262 263 /** 264 * @param topic 265 * @param subscriptionName 266 * @param messageSelector 267 * @param sessionPool 268 * @param maxMessages 269 * @return 270 * @throws JMSException 271 */ 272 public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException { 273 throw new JMSException("Not Supported."); 274 } 275 276 /** 277 * @return Returns the managedConnection. 278 */ 279 public ActiveMQManagedConnection getManagedConnection() { 280 return managedConnection; 281 } 282 283 public void onException(JMSException e) { 284 if (exceptionListener != null && managedConnection != null) { 285 try { 286 exceptionListener.onException(e); 287 } catch (Throwable ignore) { 288 // We can never trust user code so ignore any exceptions. 289 } 290 } 291 } 292 293}