[jboss-cvs] jboss-jms/src/main/org/jboss/jms/server/endpoint ...
Timothy Fox
tim.fox at jboss.com
Thu Jul 27 15:01:54 EDT 2006
User: timfox
Date: 06/07/27 15:01:54
Modified: src/main/org/jboss/jms/server/endpoint
BrowserEndpoint.java ConnectionEndpoint.java
ConnectionFactoryEndpoint.java
ServerConnectionEndpoint.java
ServerConnectionFactoryEndpoint.java
ServerConsumerEndpoint.java
ServerSessionEndpoint.java
Log:
Mainly http://jira.jboss.com/jira/browse/JBMESSAGING-434 plus a few other bits and pieces
Revision Changes Path
1.6 +3 -3 jboss-jms/src/main/org/jboss/jms/server/endpoint/BrowserEndpoint.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: BrowserEndpoint.java
===================================================================
RCS file: /cvsroot/jboss/jboss-jms/src/main/org/jboss/jms/server/endpoint/BrowserEndpoint.java,v
retrieving revision 1.5
retrieving revision 1.6
diff -u -b -r1.5 -r1.6
--- BrowserEndpoint.java 29 Dec 2005 14:09:56 -0000 1.5
+++ BrowserEndpoint.java 27 Jul 2006 19:01:54 -0000 1.6
@@ -31,9 +31,9 @@
* of the methods are handled in the advice stack.
*
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @version <tt>$Revision: 1.5 $</tt>
+ * @version <tt>$Revision: 1.6 $</tt>
*
- * $Id: BrowserEndpoint.java,v 1.5 2005/12/29 14:09:56 timfox Exp $
+ * $Id: BrowserEndpoint.java,v 1.6 2006/07/27 19:01:54 timfox Exp $
*/
public interface BrowserEndpoint extends Closeable
{
1.12 +3 -3 jboss-jms/src/main/org/jboss/jms/server/endpoint/ConnectionEndpoint.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: ConnectionEndpoint.java
===================================================================
RCS file: /cvsroot/jboss/jboss-jms/src/main/org/jboss/jms/server/endpoint/ConnectionEndpoint.java,v
retrieving revision 1.11
retrieving revision 1.12
diff -u -b -r1.11 -r1.12
--- ConnectionEndpoint.java 17 Jul 2006 17:14:45 -0000 1.11
+++ ConnectionEndpoint.java 27 Jul 2006 19:01:54 -0000 1.12
@@ -34,9 +34,9 @@
* The rest of the methods are handled in the advice stack.
*
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @version <tt>$Revision: 1.11 $</tt>
+ * @version <tt>$Revision: 1.12 $</tt>
*
- * $Id: ConnectionEndpoint.java,v 1.11 2006/07/17 17:14:45 timfox Exp $
+ * $Id: ConnectionEndpoint.java,v 1.12 2006/07/27 19:01:54 timfox Exp $
*/
public interface ConnectionEndpoint extends Closeable
{
@@ -54,6 +54,6 @@
void sendTransaction(TransactionRequest request) throws JMSException;
- Xid[] getPreparedTransactions();
+ Xid[] getPreparedTransactions() throws JMSException;
}
1.12 +3 -3 jboss-jms/src/main/org/jboss/jms/server/endpoint/ConnectionFactoryEndpoint.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: ConnectionFactoryEndpoint.java
===================================================================
RCS file: /cvsroot/jboss/jboss-jms/src/main/org/jboss/jms/server/endpoint/ConnectionFactoryEndpoint.java,v
retrieving revision 1.11
retrieving revision 1.12
diff -u -b -r1.11 -r1.12
--- ConnectionFactoryEndpoint.java 23 May 2006 18:25:08 -0000 1.11
+++ ConnectionFactoryEndpoint.java 27 Jul 2006 19:01:54 -0000 1.12
@@ -32,16 +32,16 @@
*
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
- * @version <tt>$Revision: 1.11 $</tt>
+ * @version <tt>$Revision: 1.12 $</tt>
*
- * $Id: ConnectionFactoryEndpoint.java,v 1.11 2006/05/23 18:25:08 ovidiu Exp $
+ * $Id: ConnectionFactoryEndpoint.java,v 1.12 2006/07/27 19:01:54 timfox Exp $
*/
public interface ConnectionFactoryEndpoint
{
ConnectionDelegate createConnectionDelegate(String username, String password)
throws JMSException;
- byte[] getClientAOPConfig();
+ byte[] getClientAOPConfig() throws JMSException;
IdBlock getIdBlock(int size) throws JMSException;
}
1.48 +51 -127 jboss-jms/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: ServerConnectionEndpoint.java
===================================================================
RCS file: /cvsroot/jboss/jboss-jms/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java,v
retrieving revision 1.47
retrieving revision 1.48
diff -u -b -r1.47 -r1.48
--- ServerConnectionEndpoint.java 20 Jul 2006 14:04:02 -0000 1.47
+++ ServerConnectionEndpoint.java 27 Jul 2006 19:01:54 -0000 1.48
@@ -47,6 +47,7 @@
import org.jboss.jms.tx.AckInfo;
import org.jboss.jms.tx.TransactionRequest;
import org.jboss.jms.tx.TxState;
+import org.jboss.jms.util.ExceptionUtil;
import org.jboss.jms.util.MessagingJMSException;
import org.jboss.jms.util.MessagingTransactionRolledBackException;
import org.jboss.jms.util.ToString;
@@ -62,17 +63,15 @@
import org.jboss.util.id.GUID;
import EDU.oswego.cs.dl.util.concurrent.ConcurrentReaderHashMap;
-import EDU.oswego.cs.dl.util.concurrent.ReadWriteLock;
-import EDU.oswego.cs.dl.util.concurrent.WriterPreferenceReadWriteLock;
/**
* Concrete implementation of ConnectionEndpoint.
*
* @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @version <tt>$Revision: 1.47 $</tt>
+ * @version <tt>$Revision: 1.48 $</tt>
*
- * $Id: ServerConnectionEndpoint.java,v 1.47 2006/07/20 14:04:02 timfox Exp $
+ * $Id: ServerConnectionEndpoint.java,v 1.48 2006/07/27 19:01:54 timfox Exp $
*/
public class ServerConnectionEndpoint implements ConnectionEndpoint
{
@@ -107,8 +106,6 @@
private String password;
- private ReadWriteLock closeLock;
-
// the server itself
private ServerPeer serverPeer;
@@ -150,8 +147,6 @@
this.username = username;
this.password = password;
-
- closeLock = new WriterPreferenceReadWriteLock();
}
// ConnectionDelegate implementation -----------------------------
@@ -163,14 +158,6 @@
{
try
{
- closeLock.readLock().acquire();
- }
- catch (InterruptedException e)
- {
- //Ignore
- }
- try
- {
log.debug("creating session " + (transacted ? "transacted" :"non transacted")+ ", " + ToString.acknowledgmentMode(acknowledgmentMode) + ", " + (isXA ? "XA": "non XA"));
if (closed)
@@ -194,9 +181,9 @@
return d;
}
- finally
+ catch (Throwable t)
{
- closeLock.readLock().release();
+ throw ExceptionUtil.handleJMSInvocation(t, this + " createSessionDelegate");
}
}
@@ -204,36 +191,20 @@
{
try
{
- closeLock.readLock().acquire();
- }
- catch (InterruptedException e)
- {
- //Ignore
- }
- try
- {
if (closed)
{
throw new IllegalStateException("Connection is closed");
}
return clientID;
}
- finally
+ catch (Throwable t)
{
- closeLock.readLock().release();
+ throw ExceptionUtil.handleJMSInvocation(t, this + " getClientID");
}
}
- public void setClientID(String clientID) throws IllegalStateException
- {
- try
- {
- closeLock.readLock().acquire();
- }
- catch (InterruptedException e)
+ public void setClientID(String clientID) throws JMSException
{
- //Ignore
- }
try
{
if (closed)
@@ -247,9 +218,9 @@
}
this.clientID = clientID;
}
- finally
+ catch (Throwable t)
{
- closeLock.readLock().release();
+ throw ExceptionUtil.handleJMSInvocation(t, this + " setClientID");
}
}
@@ -257,14 +228,6 @@
{
try
{
- closeLock.readLock().acquire();
- }
- catch (InterruptedException e)
- {
- //Ignore
- }
- try
- {
if (closed)
{
throw new IllegalStateException("Connection is closed");
@@ -272,9 +235,9 @@
setStarted(true);
log.debug(this + " started");
}
- finally
+ catch (Throwable t)
{
- closeLock.readLock().release();
+ throw ExceptionUtil.handleJMSInvocation(t, this + " start");
}
}
@@ -282,14 +245,6 @@
{
try
{
- closeLock.readLock().acquire();
- }
- catch (InterruptedException e)
- {
- //Ignore
- }
- try
- {
if (closed)
{
throw new IllegalStateException("Connection is closed");
@@ -297,9 +252,9 @@
setStarted(false);
log.debug("Connection " + connectionID + " stopped");
}
- finally
+ catch (Throwable t)
{
- closeLock.readLock().release();
+ throw ExceptionUtil.handleJMSInvocation(t, this + " stop");
}
}
@@ -307,14 +262,6 @@
{
try
{
- closeLock.writeLock().acquire();
- }
- catch (InterruptedException e)
- {
- // Ignore
- }
- try
- {
if (trace) { log.trace("close()"); }
if (closed)
@@ -342,7 +289,7 @@
for(Iterator i = temporaryDestinations.iterator(); i.hasNext(); )
{
JBossDestination dest = (JBossDestination)i.next();
- channelMapper.undeployCoreDestination(dest.isQueue(), dest.getName());
+ channelMapper.undeployTemporaryCoreDestination(dest.isQueue(), dest.getName());
}
temporaryDestinations.clear();
@@ -352,11 +299,10 @@
JMSDispatcher.instance.unregisterTarget(new Integer(connectionID));
closed = true;
}
- finally
+ catch (Throwable t)
{
- closeLock.writeLock().release();
+ throw ExceptionUtil.handleJMSInvocation(t, this + " close");
}
-
}
public void closing() throws JMSException
@@ -368,14 +314,6 @@
{
try
{
- closeLock.readLock().acquire();
- }
- catch (InterruptedException e)
- {
- //Ignore
- }
- try
- {
if (closed)
{
throw new IllegalStateException("Connection is closed");
@@ -479,9 +417,9 @@
if (trace) { log.trace("request processed ok"); }
}
- finally
+ catch (Throwable t)
{
- closeLock.readLock().release();
+ throw ExceptionUtil.handleJMSInvocation(t, this + " sendTransaction");
}
}
@@ -490,12 +428,19 @@
* This would be used by the transaction manager in recovery or by a tool to apply
* heuristic decisions to commit or rollback particular transactions
*/
- public Xid[] getPreparedTransactions()
+ public Xid[] getPreparedTransactions() throws JMSException
+ {
+ try
{
List xids = tr.getPreparedTransactions();
return (Xid[])xids.toArray(new Xid[xids.size()]);
}
+ catch (Throwable t)
+ {
+ throw ExceptionUtil.handleJMSInvocation(t, this + " getPreparedTransactions");
+ }
+ }
// Public --------------------------------------------------------
@@ -579,23 +524,8 @@
protected boolean isStarted()
{
- try
- {
- closeLock.readLock().acquire();
- }
- catch (InterruptedException e)
- {
- //Ignore
- }
- try
- {
return started;
}
- finally
- {
- closeLock.readLock().release();
- }
- }
/**
* Generates a sessionID that is unique per this ConnectionDelegate instance
@@ -665,7 +595,7 @@
return jmsClientVMId;
}
- protected void sendMessage(JBossMessage jbm, Transaction tx) throws JMSException
+ protected void sendMessage(JBossMessage jbm, Transaction tx) throws Exception
{
// The JMSDestination header must already have been set for each message
JBossDestination jbDest = (JBossDestination)jbm.getJMSDestination();
@@ -701,14 +631,8 @@
boolean internalTx = false;
if (m.isReliable() && tx == null && !coreDestination.isQueue())
{
- try
- {
tx = tr.createTransaction();
- }
- catch (Exception e)
- {
- throw new MessagingJMSException("Failed to create internal transaction", e);
- }
+
internalTx = true;
}
@@ -764,7 +688,7 @@
// Private -------------------------------------------------------
- private void setStarted(boolean s) throws JMSException
+ private void setStarted(boolean s) throws Throwable
{
synchronized(sessions)
{
@@ -777,7 +701,7 @@
}
}
- private void processTransaction(TxState txState, Transaction tx) throws JMSException
+ private void processTransaction(TxState txState, Transaction tx) throws Throwable
{
if (trace) { log.trace("processing transaction, there are " + txState.getMessages().size() + " messages and " + txState.getAcks().size() + " acks "); }
1.26 +46 -40 jboss-jms/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: ServerConnectionFactoryEndpoint.java
===================================================================
RCS file: /cvsroot/jboss/jboss-jms/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java,v
retrieving revision 1.25
retrieving revision 1.26
diff -u -b -r1.25 -r1.26
--- ServerConnectionFactoryEndpoint.java 17 Jul 2006 17:14:46 -0000 1.25
+++ ServerConnectionFactoryEndpoint.java 27 Jul 2006 19:01:54 -0000 1.26
@@ -29,7 +29,7 @@
import org.jboss.jms.server.connectionfactory.JNDIBindings;
import org.jboss.jms.server.endpoint.advised.ConnectionAdvised;
import org.jboss.jms.server.remoting.JMSDispatcher;
-import org.jboss.jms.util.MessagingJMSException;
+import org.jboss.jms.util.ExceptionUtil;
import org.jboss.logging.Logger;
import org.jboss.messaging.core.plugin.IdBlock;
@@ -38,9 +38,9 @@
*
* @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @version <tt>$Revision: 1.25 $</tt>
+ * @version <tt>$Revision: 1.26 $</tt>
*
- * $Id: ServerConnectionFactoryEndpoint.java,v 1.25 2006/07/17 17:14:46 timfox Exp $
+ * $Id: ServerConnectionFactoryEndpoint.java,v 1.26 2006/07/27 19:01:54 timfox Exp $
*/
public class ServerConnectionFactoryEndpoint implements ConnectionFactoryEndpoint
{
@@ -85,6 +85,8 @@
public ConnectionDelegate createConnectionDelegate(String username, String password)
throws JMSException
{
+ try
+ {
log.debug("creating a new connection for user " + username);
// authenticate the user
@@ -114,23 +116,27 @@
log.debug("created and registered " + endpoint);
- ClientConnectionDelegate delegate;
- try
- {
- delegate = new ClientConnectionDelegate(connectionID);
+ ClientConnectionDelegate delegate = new ClientConnectionDelegate(connectionID);
+
+ return delegate;
}
- catch (Exception e)
+ catch (Throwable t)
{
- throw new MessagingJMSException("Failed to create connection stub", e);
+ throw ExceptionUtil.handleJMSInvocation(t, this + " createConnectionDelegate");
}
-
- return delegate;
}
- public byte[] getClientAOPConfig()
+ public byte[] getClientAOPConfig() throws JMSException
+ {
+ try
{
return serverPeer.getClientAOPConfig();
}
+ catch (Throwable t)
+ {
+ throw ExceptionUtil.handleJMSInvocation(t, this + " getClientAOPConfig");
+ }
+ }
public IdBlock getIdBlock(int size) throws JMSException
{
@@ -138,9 +144,9 @@
{
return serverPeer.getMessageIdManager().getIdBlock(size);
}
- catch (Exception e)
+ catch (Throwable t)
{
- throw new MessagingJMSException("Failed to get id block", e);
+ throw ExceptionUtil.handleJMSInvocation(t, this + " getIdBlock");
}
}
1.46 +120 -137 jboss-jms/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: ServerConsumerEndpoint.java
===================================================================
RCS file: /cvsroot/jboss/jboss-jms/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java,v
retrieving revision 1.45
retrieving revision 1.46
diff -u -b -r1.45 -r1.46
--- ServerConsumerEndpoint.java 20 Jul 2006 14:04:02 -0000 1.45
+++ ServerConsumerEndpoint.java 27 Jul 2006 19:01:54 -0000 1.46
@@ -42,7 +42,7 @@
import org.jboss.jms.server.remoting.JMSDispatcher;
import org.jboss.jms.server.remoting.MessagingMarshallable;
import org.jboss.jms.server.subscription.Subscription;
-import org.jboss.jms.util.MessagingJMSException;
+import org.jboss.jms.util.ExceptionUtil;
import org.jboss.logging.Logger;
import org.jboss.messaging.core.Channel;
import org.jboss.messaging.core.Delivery;
@@ -67,9 +67,9 @@
*
* @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @version <tt>$Revision: 1.45 $</tt>
+ * @version <tt>$Revision: 1.46 $</tt>
*
- * $Id: ServerConsumerEndpoint.java,v 1.45 2006/07/20 14:04:02 timfox Exp $
+ * $Id: ServerConsumerEndpoint.java,v 1.46 2006/07/27 19:01:54 timfox Exp $
*/
public class ServerConsumerEndpoint implements Receiver, Filter, ConsumerEndpoint
{
@@ -337,13 +337,22 @@
public void closing() throws JMSException
{
+ try
+ {
if (trace) { log.trace(this + " closing"); }
stop();
}
+ catch (Throwable t)
+ {
+ throw ExceptionUtil.handleJMSInvocation(t, this + " closing");
+ }
+ }
public void close() throws JMSException
{
+ try
+ {
synchronized (lock)
{
//On close we only disconnect the consumer from the Channel we don't actually remove it
@@ -361,23 +370,21 @@
if (channel instanceof Subscription)
{
Subscription sub = (Subscription)channel;
- try
- {
if (!sub.isRecoverable())
{
//We don't disconnect durable subs
sub.disconnect();
}
}
- catch (Exception e)
- {
- throw new MessagingJMSException("Failed to disconnect", e);
- }
- }
closed = true;
}
}
+ catch (Throwable t)
+ {
+ throw ExceptionUtil.handleJMSInvocation(t, this + " close");
+ }
+ }
// ConsumerEndpoint implementation -------------------------------
@@ -385,7 +392,7 @@
* This is called by the client consumer to tell the server to wake up and start sending more
* messages if available
*/
- public void more()
+ public void more() throws JMSException
{
try
{
@@ -417,13 +424,17 @@
result.getResult();
//Now we know the deliverer has delivered any outstanding messages to the client buffer
+
+ channel.deliver(false);
}
catch (InterruptedException e)
{
log.warn("Thread interrupted", e);
}
-
- channel.deliver(false);
+ catch (Throwable t)
+ {
+ throw ExceptionUtil.handleJMSInvocation(t, this + " more");
+ }
}
@@ -449,59 +460,44 @@
return id;
}
- /**
- * Actually remove the consumer and clear up any deliveries it may have
- * This is called by the session on session.close()
- * We can get rid of this when we store the deliveries on the session
- *
- **/
- public void remove() throws JMSException
- {
- if (trace) log.trace("attempting to remove receiver " + this + " from destination " + channel);
+ // Package protected ---------------------------------------------
- boolean wereDeliveries = false;
- for(Iterator i = deliveries.values().iterator(); i.hasNext(); )
+ // Protected -----------------------------------------------------
+
+ protected void acknowledgeTransactionally(long messageID, Transaction tx) throws Throwable
{
- SingleReceiverDelivery d = (SingleReceiverDelivery)i.next();
- try
+ if (trace) { log.trace("acknowledging transactionally " + messageID); }
+
+ SingleReceiverDelivery d = null;
+
+ // The actual removal of the deliveries from the delivery list is deferred until tx commit
+ synchronized (lock)
{
- d.cancel();
- wereDeliveries = true;
+ d = (SingleReceiverDelivery)deliveries.get(new Long(messageID));
}
- catch(Throwable t)
+
+ DeliveryCallback deliveryCallback = (DeliveryCallback)tx.getKeyedCallback(this);
+
+ if (deliveryCallback == null)
{
- throw new MessagingJMSException("Failed to cancel delivery", t);
- }
+ deliveryCallback = new DeliveryCallback();
+ tx.addKeyedCallback(deliveryCallback, this);
}
- deliveries.clear();
+ deliveryCallback.addMessageID(messageID);
- if (!disconnected)
- {
- if (!closed)
+ if (d != null)
{
- close();
- }
+ d.acknowledge(tx);
}
-
- sessionEndpoint.getConnectionEndpoint().
- getServerPeer().removeConsumerEndpoint(new Integer(id));
-
- sessionEndpoint.removeConsumerEndpoint(id);
-
- if (wereDeliveries)
+ else
{
- //If we cancelled any deliveries we need to force a deliver on the channel
- //This is because there may be other waiting competing consumers who need a chance to get
- //any of the cancelled messages
- channel.deliver(false);
+ throw new IllegalStateException("Failed to acknowledge delivery " + d);
}
}
- public void acknowledge(long messageID) throws JMSException
+ protected void acknowledge(long messageID) throws Throwable
{
// acknowledge a delivery
- try
- {
SingleReceiverDelivery d;
synchronized (lock)
@@ -527,82 +523,69 @@
throw new IllegalStateException("Cannot find delivery to acknowledge:" + messageID);
}
}
- catch(Throwable t)
- {
- throw new MessagingJMSException("Failed to acknowledge deliveries", t);
- }
- }
- public void acknowledgeTransactionally(long messageID, Transaction tx) throws JMSException
+ /**
+ * Actually remove the consumer and clear up any deliveries it may have
+ * This is called by the session on session.close()
+ * We can get rid of this when we store the deliveries on the session
+ *
+ **/
+ protected void remove() throws Throwable
{
- if (trace) { log.trace("acknowledging transactionally " + messageID); }
-
- SingleReceiverDelivery d = null;
+ if (trace) log.trace("attempting to remove receiver " + this + " from destination " + channel);
- // The actual removal of the deliveries from the delivery list is deferred until tx commit
- synchronized (lock)
+ boolean wereDeliveries = false;
+ for(Iterator i = deliveries.values().iterator(); i.hasNext(); )
{
- d = (SingleReceiverDelivery)deliveries.get(new Long(messageID));
- }
-
- DeliveryCallback deliveryCallback = (DeliveryCallback)tx.getKeyedCallback(this);
+ SingleReceiverDelivery d = (SingleReceiverDelivery)i.next();
- if (deliveryCallback == null)
- {
- deliveryCallback = new DeliveryCallback();
- tx.addKeyedCallback(deliveryCallback, this);
+ d.cancel();
+ wereDeliveries = true;
}
- deliveryCallback.addMessageID(messageID);
+ deliveries.clear();
- if (d != null)
- {
- try
+ if (!disconnected)
{
- d.acknowledge(tx);
- }
- catch(Throwable t)
+ if (!closed)
{
- throw new MessagingJMSException("Message " + messageID +
- "cannot be acknowledged to the source", t);
+ close();
}
}
- else
+
+ sessionEndpoint.getConnectionEndpoint().
+ getServerPeer().removeConsumerEndpoint(new Integer(id));
+
+ sessionEndpoint.removeConsumerEndpoint(id);
+
+ if (wereDeliveries)
{
- throw new IllegalStateException("Failed to acknowledge delivery " + d);
+ //If we cancelled any deliveries we need to force a deliver on the channel
+ //This is because there may be other waiting competing consumers who need a chance to get
+ //any of the cancelled messages
+ channel.deliver(false);
}
}
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
protected void promptDelivery()
{
channel.deliver(false);
}
- protected void cancelDelivery(Long messageID) throws JMSException
+ protected void cancelDelivery(Long messageID) throws Throwable
{
SingleReceiverDelivery del = (SingleReceiverDelivery)deliveries.remove(messageID);
if (del != null)
{
del.getReference().decrementDeliveryCount();
- try
- {
del.cancel();
}
- catch (Throwable t)
- {
- throw new MessagingJMSException("Failed to cancel delivery " + del, t);
- }
- }
else
{
throw new IllegalStateException("Cannot find delivery to cancel:" + id);
}
}
- protected void start() throws JMSException
+ protected void start()
{
synchronized (lock)
{
@@ -624,7 +607,7 @@
channel.deliver(false);
}
- protected void stop() throws JMSException
+ protected void stop() throws Throwable
{
//We need to:
//Stop accepting any new messages in the SCE
1.43 +396 -319 jboss-jms/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: ServerSessionEndpoint.java
===================================================================
RCS file: /cvsroot/jboss/jboss-jms/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java,v
retrieving revision 1.42
retrieving revision 1.43
diff -u -b -r1.42 -r1.43
--- ServerSessionEndpoint.java 17 Jul 2006 19:57:02 -0000 1.42
+++ ServerSessionEndpoint.java 27 Jul 2006 19:01:54 -0000 1.43
@@ -48,7 +48,7 @@
import org.jboss.jms.server.subscription.DurableSubscription;
import org.jboss.jms.server.subscription.Subscription;
import org.jboss.jms.tx.AckInfo;
-import org.jboss.jms.util.MessagingJMSException;
+import org.jboss.jms.util.ExceptionUtil;
import org.jboss.logging.Logger;
import org.jboss.messaging.core.Channel;
import org.jboss.messaging.core.local.CoreDestination;
@@ -63,9 +63,9 @@
*
* @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @version <tt>$Revision: 1.42 $</tt>
+ * @version <tt>$Revision: 1.43 $</tt>
*
- * $Id: ServerSessionEndpoint.java,v 1.42 2006/07/17 19:57:02 timfox Exp $
+ * $Id: ServerSessionEndpoint.java,v 1.43 2006/07/27 19:01:54 timfox Exp $
*/
public class ServerSessionEndpoint implements SessionEndpoint
{
@@ -120,6 +120,8 @@
String subscriptionName,
boolean isCC) throws JMSException
{
+ try
+ {
if (closed)
{
throw new IllegalStateException("Session is closed");
@@ -228,15 +230,8 @@
subscriptionName + " to unsubscribe");
}
- try
- {
//Remove data for the durable sub
((DurableSubscription)subscription).unsubscribe();
- }
- catch (Exception e)
- {
- throw new MessagingJMSException("Failed to unsubscribe", e);
- }
// create a fresh new subscription
subscription = cm.createDurableSubscription(jmsDestination.getName(),
@@ -276,10 +271,17 @@
return stub;
}
+ catch (Throwable t)
+ {
+ throw ExceptionUtil.handleJMSInvocation(t, this + " createConsumerDelegate");
+ }
+ }
public BrowserDelegate createBrowserDelegate(JBossDestination jmsDestination, String messageSelector)
throws JMSException
{
+ try
+ {
if (closed)
{
throw new IllegalStateException("Session is closed");
@@ -317,9 +319,16 @@
return stub;
}
+ catch (Throwable t)
+ {
+ throw ExceptionUtil.handleJMSInvocation(t, this + " createBrowserDelegate");
+ }
+ }
public JBossQueue createQueue(String name) throws JMSException
{
+ try
+ {
if (closed)
{
throw new IllegalStateException("Session is closed");
@@ -334,9 +343,16 @@
return new JBossQueue(name);
}
+ catch (Throwable t)
+ {
+ throw ExceptionUtil.handleJMSInvocation(t, this + " createQueue");
+ }
+ }
public JBossTopic createTopic(String name) throws JMSException
{
+ try
+ {
if (closed)
{
throw new IllegalStateException("Session is closed");
@@ -351,9 +367,16 @@
return new JBossTopic(name);
}
+ catch (Throwable t)
+ {
+ throw ExceptionUtil.handleJMSInvocation(t, this + " createTopic");
+ }
+ }
public void close() throws JMSException
{
+ try
+ {
if (closed)
{
throw new IllegalStateException("Session is already closed");
@@ -375,6 +398,11 @@
closed = true;
}
+ catch (Throwable t)
+ {
+ throw ExceptionUtil.handleJMSInvocation(t, this + " close");
+ }
+ }
public void closing() throws JMSException
{
@@ -384,40 +412,52 @@
public void send(JBossMessage message) throws JMSException
{
+ try
+ {
+ log.info("Received message:" + message);
connectionEndpoint.sendMessage(message, null);
}
+ catch (Throwable t)
+ {
+ throw ExceptionUtil.handleJMSInvocation(t, this + " send");
+ }
+ }
public void acknowledgeBatch(List ackInfos) throws JMSException
{
+ try
+ {
Iterator iter = ackInfos.iterator();
while (iter.hasNext())
{
AckInfo ackInfo = (AckInfo)iter.next();
- acknowledge(ackInfo);
+ acknowledgeInternal(ackInfo);
+ }
+ }
+ catch (Throwable t)
+ {
+ throw ExceptionUtil.handleJMSInvocation(t, this + " acknowledgeBatch");
}
}
public void acknowledge(AckInfo ackInfo) throws JMSException
{
- //If the message was delivered via a connection consumer then the message needs to be acked
- //via the original consumer that was used to feed the connection consumer - which
- //won't be one of the consumers of this session
- //Therefore we always look in the global map of consumers held in the server peer
- ServerConsumerEndpoint consumer = this.connectionEndpoint.getConsumerEndpoint(ackInfo.getConsumerID());
-
- if (consumer == null)
+ try
{
- throw new IllegalArgumentException("Cannot find consumer id: " + ackInfo.getConsumerID());
+ acknowledgeInternal(ackInfo);
+ }
+ catch (Throwable t)
+ {
+ throw ExceptionUtil.handleJMSInvocation(t, this + " acknowledge");
}
-
- consumer.acknowledge(ackInfo.getMessageID());
-
}
public void cancelDeliveries(List ackInfos) throws JMSException
{
+ try
+ {
//Deliveries must be cancelled in reverse order
Set consumers = new HashSet();
@@ -450,9 +490,16 @@
consumer.promptDelivery();
}
}
+ catch (Throwable t)
+ {
+ throw ExceptionUtil.handleJMSInvocation(t, this + " cancelDeliveries");
+ }
+ }
public void addTemporaryDestination(JBossDestination dest) throws JMSException
{
+ try
+ {
if (closed)
{
throw new IllegalStateException("Session is closed");
@@ -464,11 +511,20 @@
connectionEndpoint.addTemporaryDestination(dest);
//FIXME - Params should not be hardcoded
- cm.deployCoreDestination(dest.isQueue(), dest.getName(), ms, pm, mm, 50000, 1000, 1000);
+ long id = this.getConnectionEndpoint().getServerPeer().getNextObjectID();
+
+ cm.deployTemporaryCoreDestination(dest.isQueue(), dest.getName(), id, ms, pm, mm, 50000, 1000, 1000);
+ }
+ catch (Throwable t)
+ {
+ throw ExceptionUtil.handleJMSInvocation(t, this + " addTemporaryDestination");
+ }
}
public void deleteTemporaryDestination(JBossDestination dest) throws JMSException
{
+ try
+ {
if (closed)
{
throw new IllegalStateException("Session is closed");
@@ -507,12 +563,19 @@
}
}
- cm.undeployCoreDestination(dest.isQueue(), dest.getName());
+ cm.undeployTemporaryCoreDestination(dest.isQueue(), dest.getName());
connectionEndpoint.removeTemporaryDestination(dest);
}
+ catch (Throwable t)
+ {
+ throw ExceptionUtil.handleJMSInvocation(t, this + " deleteTemporaryDestination");
+ }
+ }
public void unsubscribe(String subscriptionName) throws JMSException
{
+ try
+ {
if (closed)
{
throw new IllegalStateException("Session is closed");
@@ -546,13 +609,11 @@
throw new JMSException("Failed to remove durable subscription");
}
- try
- {
subscription.unsubscribe();
}
- catch (Exception e)
+ catch (Throwable t)
{
- throw new MessagingJMSException("Failed to unsubscribe", e);
+ throw ExceptionUtil.handleJMSInvocation(t, this + " unsubscribe");
}
}
@@ -580,6 +641,22 @@
// Protected -----------------------------------------------------
+ protected void acknowledgeInternal(AckInfo ackInfo) throws Throwable
+ {
+ //If the message was delivered via a connection consumer then the message needs to be acked
+ //via the original consumer that was used to feed the connection consumer - which
+ //won't be one of the consumers of this session
+ //Therefore we always look in the global map of consumers held in the server peer
+ ServerConsumerEndpoint consumer = this.connectionEndpoint.getConsumerEndpoint(ackInfo.getConsumerID());
+
+ if (consumer == null)
+ {
+ throw new IllegalArgumentException("Cannot find consumer id: " + ackInfo.getConsumerID());
+ }
+
+ consumer.acknowledge(ackInfo.getMessageID());
+ }
+
protected ServerConsumerEndpoint putConsumerEndpoint(int consumerID, ServerConsumerEndpoint d)
{
if (trace) { log.trace(this + " caching consumer " + consumerID); }
@@ -615,7 +692,7 @@
/**
* Starts this session's Consumers
*/
- protected void setStarted(boolean s) throws JMSException
+ protected void setStarted(boolean s) throws Throwable
{
synchronized(consumers)
{
More information about the jboss-cvs-commits mailing list