[jboss-cvs] jboss-jms/src/main/org/jboss/jms/server/endpoint ...

Timothy Fox tim.fox at jboss.com
Thu Jul 27 15:01:54 EDT 2006


  User: timfox  
  Date: 06/07/27 15:01:54

  Modified:    src/main/org/jboss/jms/server/endpoint       
                        BrowserEndpoint.java ConnectionEndpoint.java
                        ConnectionFactoryEndpoint.java
                        ServerConnectionEndpoint.java
                        ServerConnectionFactoryEndpoint.java
                        ServerConsumerEndpoint.java
                        ServerSessionEndpoint.java
  Log:
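  Mainly http://jira.jboss.com/jira/browse/JBMESSAGING-434, plus a few other bits and pieces.
  In the diffs below, the endpoint methods now catch Throwable and rethrow via ExceptionUtil.handleJMSInvocation, and the closeLock read/write locking is removed from ServerConnectionEndpoint.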
  
  Revision  Changes    Path
  1.6       +3 -3      jboss-jms/src/main/org/jboss/jms/server/endpoint/BrowserEndpoint.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: BrowserEndpoint.java
  ===================================================================
  RCS file: /cvsroot/jboss/jboss-jms/src/main/org/jboss/jms/server/endpoint/BrowserEndpoint.java,v
  retrieving revision 1.5
  retrieving revision 1.6
  diff -u -b -r1.5 -r1.6
  --- BrowserEndpoint.java	29 Dec 2005 14:09:56 -0000	1.5
  +++ BrowserEndpoint.java	27 Jul 2006 19:01:54 -0000	1.6
  @@ -31,9 +31,9 @@
    * of the methods are handled in the advice stack.
    * 
    * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  - * @version <tt>$Revision: 1.5 $</tt>
  + * @version <tt>$Revision: 1.6 $</tt>
    *
  - * $Id: BrowserEndpoint.java,v 1.5 2005/12/29 14:09:56 timfox Exp $
  + * $Id: BrowserEndpoint.java,v 1.6 2006/07/27 19:01:54 timfox Exp $
    */
   public interface BrowserEndpoint extends Closeable
   {   
  
  
  
  1.12      +3 -3      jboss-jms/src/main/org/jboss/jms/server/endpoint/ConnectionEndpoint.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: ConnectionEndpoint.java
  ===================================================================
  RCS file: /cvsroot/jboss/jboss-jms/src/main/org/jboss/jms/server/endpoint/ConnectionEndpoint.java,v
  retrieving revision 1.11
  retrieving revision 1.12
  diff -u -b -r1.11 -r1.12
  --- ConnectionEndpoint.java	17 Jul 2006 17:14:45 -0000	1.11
  +++ ConnectionEndpoint.java	27 Jul 2006 19:01:54 -0000	1.12
  @@ -34,9 +34,9 @@
    * The rest of the methods are handled in the advice stack.
    * 
    * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  - * @version <tt>$Revision: 1.11 $</tt>
  + * @version <tt>$Revision: 1.12 $</tt>
    *
  - * $Id: ConnectionEndpoint.java,v 1.11 2006/07/17 17:14:45 timfox Exp $
  + * $Id: ConnectionEndpoint.java,v 1.12 2006/07/27 19:01:54 timfox Exp $
    */
   public interface ConnectionEndpoint extends Closeable
   {
  @@ -54,6 +54,6 @@
   
      void sendTransaction(TransactionRequest request) throws JMSException;
   
  -   Xid[] getPreparedTransactions();   
  +   Xid[] getPreparedTransactions() throws JMSException; 
   }
   
  
  
  
  1.12      +3 -3      jboss-jms/src/main/org/jboss/jms/server/endpoint/ConnectionFactoryEndpoint.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: ConnectionFactoryEndpoint.java
  ===================================================================
  RCS file: /cvsroot/jboss/jboss-jms/src/main/org/jboss/jms/server/endpoint/ConnectionFactoryEndpoint.java,v
  retrieving revision 1.11
  retrieving revision 1.12
  diff -u -b -r1.11 -r1.12
  --- ConnectionFactoryEndpoint.java	23 May 2006 18:25:08 -0000	1.11
  +++ ConnectionFactoryEndpoint.java	27 Jul 2006 19:01:54 -0000	1.12
  @@ -32,16 +32,16 @@
    * 
    * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
    * @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
  - * @version <tt>$Revision: 1.11 $</tt>
  + * @version <tt>$Revision: 1.12 $</tt>
    *
  - * $Id: ConnectionFactoryEndpoint.java,v 1.11 2006/05/23 18:25:08 ovidiu Exp $
  + * $Id: ConnectionFactoryEndpoint.java,v 1.12 2006/07/27 19:01:54 timfox Exp $
    */
   public interface ConnectionFactoryEndpoint
   {
      ConnectionDelegate createConnectionDelegate(String username, String password)
         throws JMSException;
      
  -   byte[] getClientAOPConfig();
  +   byte[] getClientAOPConfig() throws JMSException;
   
      IdBlock getIdBlock(int size) throws JMSException;
   }
  
  
  
  1.48      +51 -127   jboss-jms/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: ServerConnectionEndpoint.java
  ===================================================================
  RCS file: /cvsroot/jboss/jboss-jms/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java,v
  retrieving revision 1.47
  retrieving revision 1.48
  diff -u -b -r1.47 -r1.48
  --- ServerConnectionEndpoint.java	20 Jul 2006 14:04:02 -0000	1.47
  +++ ServerConnectionEndpoint.java	27 Jul 2006 19:01:54 -0000	1.48
  @@ -47,6 +47,7 @@
   import org.jboss.jms.tx.AckInfo;
   import org.jboss.jms.tx.TransactionRequest;
   import org.jboss.jms.tx.TxState;
  +import org.jboss.jms.util.ExceptionUtil;
   import org.jboss.jms.util.MessagingJMSException;
   import org.jboss.jms.util.MessagingTransactionRolledBackException;
   import org.jboss.jms.util.ToString;
  @@ -62,17 +63,15 @@
   import org.jboss.util.id.GUID;
   
   import EDU.oswego.cs.dl.util.concurrent.ConcurrentReaderHashMap;
  -import EDU.oswego.cs.dl.util.concurrent.ReadWriteLock;
  -import EDU.oswego.cs.dl.util.concurrent.WriterPreferenceReadWriteLock;
   
   /**
    * Concrete implementation of ConnectionEndpoint.
    * 
    * @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
    * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  - * @version <tt>$Revision: 1.47 $</tt>
  + * @version <tt>$Revision: 1.48 $</tt>
    *
  - * $Id: ServerConnectionEndpoint.java,v 1.47 2006/07/20 14:04:02 timfox Exp $
  + * $Id: ServerConnectionEndpoint.java,v 1.48 2006/07/27 19:01:54 timfox Exp $
    */
   public class ServerConnectionEndpoint implements ConnectionEndpoint
   {
  @@ -107,8 +106,6 @@
      
      private String password;
   
  -   private ReadWriteLock closeLock;
  -
      // the server itself
      private ServerPeer serverPeer;
   
  @@ -150,8 +147,6 @@
         
         this.username = username;
         this.password = password;
  -      
  -      closeLock = new WriterPreferenceReadWriteLock();
      }
      
      // ConnectionDelegate implementation -----------------------------
  @@ -163,14 +158,6 @@
      {
         try
         {
  -         closeLock.readLock().acquire();
  -      }
  -      catch (InterruptedException e)
  -      {
  -         //Ignore
  -      }
  -      try
  -      {
            log.debug("creating session " + (transacted ? "transacted" :"non transacted")+ ", " + ToString.acknowledgmentMode(acknowledgmentMode) + ", " + (isXA ? "XA": "non XA"));
   
            if (closed)
  @@ -194,9 +181,9 @@
   
            return d;
         }
  -      finally
  +      catch (Throwable t)
         {
  -         closeLock.readLock().release();
  +         throw ExceptionUtil.handleJMSInvocation(t, this + " createSessionDelegate");         
         }
      }
            
  @@ -204,36 +191,20 @@
      {
         try
         {
  -         closeLock.readLock().acquire();
  -      }
  -      catch (InterruptedException e)
  -      {
  -         //Ignore
  -      }
  -      try
  -      {
            if (closed)
            {
               throw new IllegalStateException("Connection is closed");
            }
            return clientID;
         }
  -      finally
  +      catch (Throwable t)
         {
  -         closeLock.readLock().release();
  +         throw ExceptionUtil.handleJMSInvocation(t, this + " getClientID");
         }
      }
      
  -   public void setClientID(String clientID) throws IllegalStateException
  -   {
  -      try
  -      {
  -         closeLock.readLock().acquire();
  -      }
  -      catch (InterruptedException e)
  +   public void setClientID(String clientID) throws JMSException
         {
  -         //Ignore
  -      }
         try
         {
            if (closed)
  @@ -247,9 +218,9 @@
            }
            this.clientID = clientID;
         }
  -      finally
  +      catch (Throwable t)
         {
  -         closeLock.readLock().release();
  +         throw ExceptionUtil.handleJMSInvocation(t, this + " setClientID");
         }
      }
         
  @@ -257,14 +228,6 @@
      {
         try
         {
  -         closeLock.readLock().acquire();
  -      }
  -      catch (InterruptedException e)
  -      {
  -         //Ignore
  -      }
  -      try
  -      {
            if (closed)
            {
               throw new IllegalStateException("Connection is closed");
  @@ -272,9 +235,9 @@
            setStarted(true);
            log.debug(this + " started");
         }
  -      finally
  +      catch (Throwable t)
         {
  -         closeLock.readLock().release();
  +         throw ExceptionUtil.handleJMSInvocation(t, this + " start");
         }
      }   
      
  @@ -282,14 +245,6 @@
      {
         try
         {
  -         closeLock.readLock().acquire();
  -      }
  -      catch (InterruptedException e)
  -      {
  -         //Ignore
  -      }
  -      try
  -      {
            if (closed)
            {
               throw new IllegalStateException("Connection is closed");
  @@ -297,9 +252,9 @@
            setStarted(false);
            log.debug("Connection " + connectionID + " stopped");
         }
  -      finally
  +      catch (Throwable t)
         {
  -         closeLock.readLock().release();
  +         throw ExceptionUtil.handleJMSInvocation(t, this + " stop");
         }
      }
      
  @@ -307,14 +262,6 @@
      {      
         try
         {
  -         closeLock.writeLock().acquire();
  -      }
  -      catch (InterruptedException e)
  -      {
  -         // Ignore
  -      }
  -      try
  -      {
            if (trace) { log.trace("close()"); }
            
            if (closed)
  @@ -342,7 +289,7 @@
            for(Iterator i = temporaryDestinations.iterator(); i.hasNext(); )
            {
               JBossDestination dest = (JBossDestination)i.next();
  -            channelMapper.undeployCoreDestination(dest.isQueue(), dest.getName());
  +            channelMapper.undeployTemporaryCoreDestination(dest.isQueue(), dest.getName());
            }
            
            temporaryDestinations.clear();
  @@ -352,11 +299,10 @@
            JMSDispatcher.instance.unregisterTarget(new Integer(connectionID));
            closed = true;
         }
  -      finally
  +      catch (Throwable t)
         {
  -         closeLock.writeLock().release();
  +         throw ExceptionUtil.handleJMSInvocation(t, this + " close");
         }
  -
      }
      
      public void closing() throws JMSException
  @@ -368,14 +314,6 @@
      {
         try
         {
  -         closeLock.readLock().acquire();
  -      }
  -      catch (InterruptedException e)
  -      {
  -         //Ignore
  -      }
  -      try
  -      {
            if (closed)
            {
               throw new IllegalStateException("Connection is closed");
  @@ -479,9 +417,9 @@
                    
            if (trace) { log.trace("request processed ok"); }
         }
  -      finally
  +      catch (Throwable t)
         {
  -         closeLock.readLock().release();
  +         throw ExceptionUtil.handleJMSInvocation(t, this + " sendTransaction");
         }
      }
      
  @@ -490,12 +428,19 @@
       * This would be used by the transaction manager in recovery or by a tool to apply
       * heuristic decisions to commit or rollback particular transactions
       */
  -   public Xid[] getPreparedTransactions()
  +   public Xid[] getPreparedTransactions() throws JMSException
  +   {
  +      try
      {
         List xids = tr.getPreparedTransactions();
         
         return (Xid[])xids.toArray(new Xid[xids.size()]);
      }
  +      catch (Throwable t)
  +      {
  +         throw ExceptionUtil.handleJMSInvocation(t, this + " getPreparedTransactions");
  +      }
  +   }
     
      // Public --------------------------------------------------------
      
  @@ -579,23 +524,8 @@
      
      protected boolean isStarted()
      {
  -      try
  -      {
  -         closeLock.readLock().acquire();
  -      }
  -      catch (InterruptedException e)
  -      {
  -         //Ignore
  -      }
  -      try
  -      {
            return started;
         }
  -      finally
  -      {
  -         closeLock.readLock().release();
  -      }
  -   }
      
      /**
       * Generates a sessionID that is unique per this ConnectionDelegate instance
  @@ -665,7 +595,7 @@
         return jmsClientVMId;
      }
   
  -   protected void sendMessage(JBossMessage jbm, Transaction tx) throws JMSException
  +   protected void sendMessage(JBossMessage jbm, Transaction tx) throws Exception
      {
         // The JMSDestination header must already have been set for each message
         JBossDestination jbDest = (JBossDestination)jbm.getJMSDestination();
  @@ -701,14 +631,8 @@
         boolean internalTx = false;
         if (m.isReliable() && tx == null && !coreDestination.isQueue())
         {
  -         try
  -         {
               tx = tr.createTransaction();
  -         }
  -         catch (Exception e)
  -         {
  -            throw new MessagingJMSException("Failed to create internal transaction", e);
  -         }
  +         
            internalTx = true;
         }
         
  @@ -764,7 +688,7 @@
   
      // Private -------------------------------------------------------
      
  -   private void setStarted(boolean s) throws JMSException
  +   private void setStarted(boolean s) throws Throwable
      {
         synchronized(sessions)
         {
  @@ -777,7 +701,7 @@
         }
      }   
      
  -   private void processTransaction(TxState txState, Transaction tx) throws JMSException
  +   private void processTransaction(TxState txState, Transaction tx) throws Throwable
      {
         if (trace) { log.trace("processing transaction, there are " + txState.getMessages().size() + " messages and " + txState.getAcks().size() + " acks "); }
         
  
  
  
  1.26      +46 -40    jboss-jms/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: ServerConnectionFactoryEndpoint.java
  ===================================================================
  RCS file: /cvsroot/jboss/jboss-jms/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java,v
  retrieving revision 1.25
  retrieving revision 1.26
  diff -u -b -r1.25 -r1.26
  --- ServerConnectionFactoryEndpoint.java	17 Jul 2006 17:14:46 -0000	1.25
  +++ ServerConnectionFactoryEndpoint.java	27 Jul 2006 19:01:54 -0000	1.26
  @@ -29,7 +29,7 @@
   import org.jboss.jms.server.connectionfactory.JNDIBindings;
   import org.jboss.jms.server.endpoint.advised.ConnectionAdvised;
   import org.jboss.jms.server.remoting.JMSDispatcher;
  -import org.jboss.jms.util.MessagingJMSException;
  +import org.jboss.jms.util.ExceptionUtil;
   import org.jboss.logging.Logger;
   import org.jboss.messaging.core.plugin.IdBlock;
   
  @@ -38,9 +38,9 @@
    *
    * @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
    * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  - * @version <tt>$Revision: 1.25 $</tt>
  + * @version <tt>$Revision: 1.26 $</tt>
    *
  - * $Id: ServerConnectionFactoryEndpoint.java,v 1.25 2006/07/17 17:14:46 timfox Exp $
  + * $Id: ServerConnectionFactoryEndpoint.java,v 1.26 2006/07/27 19:01:54 timfox Exp $
    */
   public class ServerConnectionFactoryEndpoint implements ConnectionFactoryEndpoint
   {
  @@ -85,6 +85,8 @@
      public ConnectionDelegate createConnectionDelegate(String username, String password)
         throws JMSException
      {
  +      try
  +      {
         log.debug("creating a new connection for user " + username);
         
         // authenticate the user
  @@ -114,23 +116,27 @@
         
         log.debug("created and registered " + endpoint);
   
  -      ClientConnectionDelegate delegate;
  -      try
  -      {
  -         delegate = new ClientConnectionDelegate(connectionID);
  +         ClientConnectionDelegate delegate = new ClientConnectionDelegate(connectionID);     
  +         
  +         return delegate;
         }
  -      catch (Exception e)
  +      catch (Throwable t)
         {
  -         throw new MessagingJMSException("Failed to create connection stub", e);
  +         throw ExceptionUtil.handleJMSInvocation(t, this + " createConnectionDelegate");
         }  
  -      
  -      return delegate;
      }
      
  -   public byte[] getClientAOPConfig()
  +   public byte[] getClientAOPConfig() throws JMSException
  +   {
  +      try
      {
         return serverPeer.getClientAOPConfig();
      }
  +      catch (Throwable t)
  +      {
  +         throw ExceptionUtil.handleJMSInvocation(t, this + " getClientAOPConfig");
  +      }
  +   }
   
      public IdBlock getIdBlock(int size) throws JMSException
      {
  @@ -138,9 +144,9 @@
         {
            return serverPeer.getMessageIdManager().getIdBlock(size);
         }
  -      catch (Exception e)
  +      catch (Throwable t)
         {
  -         throw new MessagingJMSException("Failed to get id block", e);
  +         throw ExceptionUtil.handleJMSInvocation(t, this + " getIdBlock");
         }
      }
   
  
  
  
  1.46      +120 -137  jboss-jms/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: ServerConsumerEndpoint.java
  ===================================================================
  RCS file: /cvsroot/jboss/jboss-jms/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java,v
  retrieving revision 1.45
  retrieving revision 1.46
  diff -u -b -r1.45 -r1.46
  --- ServerConsumerEndpoint.java	20 Jul 2006 14:04:02 -0000	1.45
  +++ ServerConsumerEndpoint.java	27 Jul 2006 19:01:54 -0000	1.46
  @@ -42,7 +42,7 @@
   import org.jboss.jms.server.remoting.JMSDispatcher;
   import org.jboss.jms.server.remoting.MessagingMarshallable;
   import org.jboss.jms.server.subscription.Subscription;
  -import org.jboss.jms.util.MessagingJMSException;
  +import org.jboss.jms.util.ExceptionUtil;
   import org.jboss.logging.Logger;
   import org.jboss.messaging.core.Channel;
   import org.jboss.messaging.core.Delivery;
  @@ -67,9 +67,9 @@
    *
    * @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
    * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  - * @version <tt>$Revision: 1.45 $</tt>
  + * @version <tt>$Revision: 1.46 $</tt>
    *
  - * $Id: ServerConsumerEndpoint.java,v 1.45 2006/07/20 14:04:02 timfox Exp $
  + * $Id: ServerConsumerEndpoint.java,v 1.46 2006/07/27 19:01:54 timfox Exp $
    */
   public class ServerConsumerEndpoint implements Receiver, Filter, ConsumerEndpoint
   {
  @@ -337,13 +337,22 @@
   
      public void closing() throws JMSException
      {
  +      try
  +      {
         if (trace) { log.trace(this + " closing"); }
         
         stop();         
      }
  +      catch (Throwable t)
  +      {
  +         throw ExceptionUtil.handleJMSInvocation(t, this + " closing");
  +      }     
  +   }
      
      public void close() throws JMSException
      {            
  +      try
  +      {
         synchronized (lock)
         { 
            //On close we only disconnect the consumer from the Channel we don't actually remove it
  @@ -361,23 +370,21 @@
            if (channel instanceof Subscription)
            {
               Subscription sub = (Subscription)channel;
  -            try
  -            {
                  if (!sub.isRecoverable())
                  {
                     //We don't disconnect durable subs
                     sub.disconnect();
                  }
               }
  -            catch (Exception e)
  -            {
  -               throw new MessagingJMSException("Failed to disconnect", e);
  -            }
  -         } 
            
            closed = true;
         }
      }
  +      catch (Throwable t)
  +      {
  +         throw ExceptionUtil.handleJMSInvocation(t, this + " close");
  +      }
  +   }
                  
      // ConsumerEndpoint implementation -------------------------------
      
  @@ -385,7 +392,7 @@
       * This is called by the client consumer to tell the server to wake up and start sending more
       * messages if available
       */
  -   public void more()
  +   public void more() throws JMSException
      {           
         try
         {
  @@ -417,13 +424,17 @@
            result.getResult();
                     
            //Now we know the deliverer has delivered any outstanding messages to the client buffer
  +         
  +         channel.deliver(false);
         }
         catch (InterruptedException e)
         {
            log.warn("Thread interrupted", e);
         }
  -      
  -      channel.deliver(false);
  +      catch (Throwable t)
  +      {
  +         throw ExceptionUtil.handleJMSInvocation(t, this + " more");
  +      }
      }
      
      
  @@ -449,59 +460,44 @@
         return id;
      }
       
  -   /**
  -    * Actually remove the consumer and clear up any deliveries it may have
  -    * This is called by the session on session.close()
  -    * We can get rid of this when we store the deliveries on the session
  -    *
  -    **/
  -   public void remove() throws JMSException
  -   {         
  -      if (trace) log.trace("attempting to remove receiver " + this + " from destination " + channel);
  +   // Package protected ---------------------------------------------
         
  -      boolean wereDeliveries = false;
  -      for(Iterator i = deliveries.values().iterator(); i.hasNext(); )
  +   // Protected -----------------------------------------------------   
  +   
  +   protected void acknowledgeTransactionally(long messageID, Transaction tx) throws Throwable
         {
  -         SingleReceiverDelivery d = (SingleReceiverDelivery)i.next();
  -         try
  +      if (trace) { log.trace("acknowledging transactionally " + messageID); }
  +      
  +      SingleReceiverDelivery d = null;
  +                 
  +      // The actual removal of the deliveries from the delivery list is deferred until tx commit
  +      synchronized (lock)
            {
  -            d.cancel();
  -            wereDeliveries = true;
  +         d = (SingleReceiverDelivery)deliveries.get(new Long(messageID));
            }
  -         catch(Throwable t)
  +      
  +      DeliveryCallback deliveryCallback = (DeliveryCallback)tx.getKeyedCallback(this);
  +            
  +      if (deliveryCallback == null)
            {
  -            throw new MessagingJMSException("Failed to cancel delivery", t);
  -         }
  +         deliveryCallback = new DeliveryCallback();
  +         tx.addKeyedCallback(deliveryCallback, this);
         }
  -      deliveries.clear();           
  +      deliveryCallback.addMessageID(messageID);
         
  -      if (!disconnected)
  -      {
  -         if (!closed)
  +      if (d != null)
            {
  -            close();
  -         }
  +         d.acknowledge(tx);
         }
  -      
  -      sessionEndpoint.getConnectionEndpoint().
  -         getServerPeer().removeConsumerEndpoint(new Integer(id));                  
  -            
  -      sessionEndpoint.removeConsumerEndpoint(id);
  -      
  -      if (wereDeliveries)
  +      else
         {
  -         //If we cancelled any deliveries we need to force a deliver on the channel
  -         //This is because there may be other waiting competing consumers who need a chance to get
  -         //any of the cancelled messages
  -         channel.deliver(false);
  +         throw new IllegalStateException("Failed to acknowledge delivery " + d);
         }
      }  
      
  -   public void acknowledge(long messageID) throws JMSException
  +   protected void acknowledge(long messageID) throws Throwable
      {  
         // acknowledge a delivery
  -      try
  -      {     
            SingleReceiverDelivery d;
              
            synchronized (lock)
  @@ -527,82 +523,69 @@
               throw new IllegalStateException("Cannot find delivery to acknowledge:" + messageID);
            }
         }
  -      catch(Throwable t)
  -      {
  -         throw new MessagingJMSException("Failed to acknowledge deliveries", t);
  -      }      
  -   }
      
  -   public void acknowledgeTransactionally(long messageID, Transaction tx) throws JMSException
  +   /**
  +    * Actually remove the consumer and clear up any deliveries it may have
  +    * This is called by the session on session.close()
  +    * We can get rid of this when we store the deliveries on the session
  +    *
  +    **/
  +   protected void remove() throws Throwable
      {
  -      if (trace) { log.trace("acknowledging transactionally " + messageID); }
  -      
  -      SingleReceiverDelivery d = null;
  +      if (trace) log.trace("attempting to remove receiver " + this + " from destination " + channel);
                    
  -      // The actual removal of the deliveries from the delivery list is deferred until tx commit
  -      synchronized (lock)
  +      boolean wereDeliveries = false;
  +      for(Iterator i = deliveries.values().iterator(); i.hasNext(); )
         {
  -         d = (SingleReceiverDelivery)deliveries.get(new Long(messageID));
  -      }
  -      
  -      DeliveryCallback deliveryCallback = (DeliveryCallback)tx.getKeyedCallback(this);
  +         SingleReceiverDelivery d = (SingleReceiverDelivery)i.next();
               
  -      if (deliveryCallback == null)
  -      {
  -         deliveryCallback = new DeliveryCallback();
  -         tx.addKeyedCallback(deliveryCallback, this);
  +         d.cancel();
  +         wereDeliveries = true;
         }
  -      deliveryCallback.addMessageID(messageID);
  +      deliveries.clear();           
            
  -      if (d != null)
  -      {
  -         try
  +      if (!disconnected)
            {
  -            d.acknowledge(tx);
  -         }
  -         catch(Throwable t)
  +         if (!closed)
            {
  -            throw new MessagingJMSException("Message " + messageID +
  -                                            "cannot be acknowledged to the source", t);
  +            close();
            } 
         }
  -      else
  +      
  +      sessionEndpoint.getConnectionEndpoint().
  +         getServerPeer().removeConsumerEndpoint(new Integer(id));                  
  +            
  +      sessionEndpoint.removeConsumerEndpoint(id);
  +      
  +      if (wereDeliveries)
         {
  -         throw new IllegalStateException("Failed to acknowledge delivery " + d);
  +         //If we cancelled any deliveries we need to force a deliver on the channel
  +         //This is because there may be other waiting competing consumers who need a chance to get
  +         //any of the cancelled messages
  +         channel.deliver(false);
         }             
      }      
      
  -   // Package protected ---------------------------------------------
  -   
  -   // Protected -----------------------------------------------------   
  -   
      protected void promptDelivery()
      {
         channel.deliver(false);
      }
      
  -   protected void cancelDelivery(Long messageID) throws JMSException
  +   protected void cancelDelivery(Long messageID) throws Throwable
      {
         SingleReceiverDelivery del = (SingleReceiverDelivery)deliveries.remove(messageID);
         if (del != null)
         {  
             del.getReference().decrementDeliveryCount();    
  -          try
  -          {
                del.cancel();
             }
  -          catch (Throwable t)
  -          {
  -             throw new MessagingJMSException("Failed to cancel delivery " + del, t);
  -          }
  -      }
         else
         {
             throw new IllegalStateException("Cannot find delivery to cancel:" + id);
         }
      }
                  
  -   protected void start() throws JMSException
  +   protected void start()
      {             
         synchronized (lock)
         {
  @@ -624,7 +607,7 @@
         channel.deliver(false);
      }
      
  -   protected void stop() throws JMSException
  +   protected void stop() throws Throwable
      {     
         //We need to:
         //Stop accepting any new messages in the SCE
  
  
  
  1.43      +396 -319  jboss-jms/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: ServerSessionEndpoint.java
  ===================================================================
  RCS file: /cvsroot/jboss/jboss-jms/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java,v
  retrieving revision 1.42
  retrieving revision 1.43
  diff -u -b -r1.42 -r1.43
  --- ServerSessionEndpoint.java	17 Jul 2006 19:57:02 -0000	1.42
  +++ ServerSessionEndpoint.java	27 Jul 2006 19:01:54 -0000	1.43
  @@ -48,7 +48,7 @@
   import org.jboss.jms.server.subscription.DurableSubscription;
   import org.jboss.jms.server.subscription.Subscription;
   import org.jboss.jms.tx.AckInfo;
  -import org.jboss.jms.util.MessagingJMSException;
  +import org.jboss.jms.util.ExceptionUtil;
   import org.jboss.logging.Logger;
   import org.jboss.messaging.core.Channel;
   import org.jboss.messaging.core.local.CoreDestination;
  @@ -63,9 +63,9 @@
    * 
    * @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
    * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  - * @version <tt>$Revision: 1.42 $</tt>
  + * @version <tt>$Revision: 1.43 $</tt>
    *
  - * $Id: ServerSessionEndpoint.java,v 1.42 2006/07/17 19:57:02 timfox Exp $
  + * $Id: ServerSessionEndpoint.java,v 1.43 2006/07/27 19:01:54 timfox Exp $
    */
   public class ServerSessionEndpoint implements SessionEndpoint
   {
  @@ -120,6 +120,8 @@
                                                     String subscriptionName,
                                                     boolean isCC) throws JMSException
      {
  +      try
  +      {
         if (closed)
         {
            throw new IllegalStateException("Session is closed");
  @@ -228,15 +230,8 @@
                                                              subscriptionName + " to unsubscribe");
                     }
   
  -                  try
  -                  {
                        //Remove data for the durable sub
                        ((DurableSubscription)subscription).unsubscribe();
  -                  }
  -                  catch (Exception e)
  -                  {
  -                     throw new MessagingJMSException("Failed to unsubscribe", e);
  -                  }
   
                     // create a fresh new subscription
                     subscription = cm.createDurableSubscription(jmsDestination.getName(),
  @@ -276,10 +271,17 @@
   
         return stub;
      }
  +      catch (Throwable t)
  +      {
  +         throw ExceptionUtil.handleJMSInvocation(t, this + " createConsumerDelegate");
  +      }
  +   }
   	
   	public BrowserDelegate createBrowserDelegate(JBossDestination jmsDestination, String messageSelector)
   	   throws JMSException
   	{
  +      try
  +      {
   	   if (closed)
   	   {
   	      throw new IllegalStateException("Session is closed");
  @@ -317,9 +319,16 @@
   
   	   return stub;
   	}
  +      catch (Throwable t)
  +      {
  +         throw ExceptionUtil.handleJMSInvocation(t, this + " createBrowserDelegate");
  +      }
  +	}
   
      public JBossQueue createQueue(String name) throws JMSException
      {
  +      try
  +      {
         if (closed)
         {
            throw new IllegalStateException("Session is closed");
  @@ -334,9 +343,16 @@
   
         return new JBossQueue(name);
      }
  +      catch (Throwable t)
  +      {
  +         throw ExceptionUtil.handleJMSInvocation(t, this + " createQueue");
  +      }
  +   }
   
      public JBossTopic createTopic(String name) throws JMSException
      {
  +      try
  +      {
         if (closed)
         {
            throw new IllegalStateException("Session is closed");
  @@ -351,9 +367,16 @@
   
         return new JBossTopic(name);
      }
  +      catch (Throwable t)
  +      {
  +         throw ExceptionUtil.handleJMSInvocation(t, this + " createTopic");
  +      }
  +   }
   
      public void close() throws JMSException
      {
  +      try
  +      {
         if (closed)
         {
            throw new IllegalStateException("Session is already closed");
  @@ -375,6 +398,11 @@
         
         closed = true;
      }
  +      catch (Throwable t)
  +      {
  +         throw ExceptionUtil.handleJMSInvocation(t, this + " close");
  +      }
  +   }
      
      public void closing() throws JMSException
      {
  @@ -384,40 +412,52 @@
      
      public void send(JBossMessage message) throws JMSException
      {
  +      try
  +      {
  +         log.info("Received message:" + message);
         connectionEndpoint.sendMessage(message, null);
      }
  +      catch (Throwable t)
  +      {
  +         throw ExceptionUtil.handleJMSInvocation(t, this + " send");
  +      }
  +   }
      
      public void acknowledgeBatch(List ackInfos) throws JMSException
      {
  +      try
  +      {
         Iterator iter = ackInfos.iterator();
         
         while (iter.hasNext())
         {
            AckInfo ackInfo = (AckInfo)iter.next();
            
  -         acknowledge(ackInfo);
  +            acknowledgeInternal(ackInfo);
  +         }
  +      }
  +      catch (Throwable t)
  +      {
  +         throw ExceptionUtil.handleJMSInvocation(t, this + " acknowledgeBatch");
         }
      }
      
      public void acknowledge(AckInfo ackInfo) throws JMSException
      {
  -      //If the message was delivered via a connection consumer then the message needs to be acked
  -      //via the original consumer that was used to feed the connection consumer - which
  -      //won't be one of the consumers of this session
  -      //Therefore we always look in the global map of consumers held in the server peer
  -      ServerConsumerEndpoint consumer = this.connectionEndpoint.getConsumerEndpoint(ackInfo.getConsumerID());
  -
  -      if (consumer == null)
  +      try
         {
  -         throw new IllegalArgumentException("Cannot find consumer id: " + ackInfo.getConsumerID());
  +         acknowledgeInternal(ackInfo);      
  +      }
  +      catch (Throwable t)
  +      {
  +         throw ExceptionUtil.handleJMSInvocation(t, this + " acknowledge");
         }
  -      
  -      consumer.acknowledge(ackInfo.getMessageID());
  -      
      }      
      
      public void cancelDeliveries(List ackInfos) throws JMSException
      {
  +      try
  +      {
         //Deliveries must be cancelled in reverse order
          
         Set consumers = new HashSet();
  @@ -450,9 +490,16 @@
            consumer.promptDelivery();
         }
      }
  +      catch (Throwable t)
  +      {
  +         throw ExceptionUtil.handleJMSInvocation(t, this + " cancelDeliveries");
  +      }
  +   }
   
      public void addTemporaryDestination(JBossDestination dest) throws JMSException
      {
  +      try
  +      {
         if (closed)
         {
            throw new IllegalStateException("Session is closed");
  @@ -464,11 +511,20 @@
         connectionEndpoint.addTemporaryDestination(dest);
         
         //FIXME - Params should not be hardcoded
  -      cm.deployCoreDestination(dest.isQueue(), dest.getName(), ms, pm, mm, 50000, 1000, 1000);
  +         long id = this.getConnectionEndpoint().getServerPeer().getNextObjectID();
  +         
  +         cm.deployTemporaryCoreDestination(dest.isQueue(), dest.getName(), id, ms, pm, mm, 50000, 1000, 1000);
  +      }
  +      catch (Throwable t)
  +      {
  +         throw ExceptionUtil.handleJMSInvocation(t, this + " addTemporaryDestination");
  +      }
      }
      
      public void deleteTemporaryDestination(JBossDestination dest) throws JMSException
      {
  +      try
  +      {
         if (closed)
         {
            throw new IllegalStateException("Session is closed");
  @@ -507,12 +563,19 @@
            }
         }
         
  -      cm.undeployCoreDestination(dest.isQueue(), dest.getName());
  +         cm.undeployTemporaryCoreDestination(dest.isQueue(), dest.getName());
         connectionEndpoint.removeTemporaryDestination(dest);
      }
  +      catch (Throwable t)
  +      {
  +         throw ExceptionUtil.handleJMSInvocation(t, this + " deleteTemporaryDestination");
  +      }
  +   }
      
      public void unsubscribe(String subscriptionName) throws JMSException
      {
  +      try
  +      {
         if (closed)
         {
            throw new IllegalStateException("Session is closed");
  @@ -546,13 +609,11 @@
            throw new JMSException("Failed to remove durable subscription");
         }
         
  -      try
  -      {
            subscription.unsubscribe();
         }
  -      catch (Exception e)
  +      catch (Throwable t)
         {
  -         throw new MessagingJMSException("Failed to unsubscribe", e);
  +         throw ExceptionUtil.handleJMSInvocation(t, this + " unsubscribe");
         }
      }
       
  @@ -580,6 +641,22 @@
      
      // Protected -----------------------------------------------------
      
  +   protected void acknowledgeInternal(AckInfo ackInfo) throws Throwable
  +   {
  +      //If the message was delivered via a connection consumer then the message needs to be acked
  +      //via the original consumer that was used to feed the connection consumer - which
  +      //won't be one of the consumers of this session
  +      //Therefore we always look in the global map of consumers held in the server peer
  +      ServerConsumerEndpoint consumer = this.connectionEndpoint.getConsumerEndpoint(ackInfo.getConsumerID());
  +
  +      if (consumer == null)
  +      {
  +         throw new IllegalArgumentException("Cannot find consumer id: " + ackInfo.getConsumerID());
  +      }
  +      
  +      consumer.acknowledge(ackInfo.getMessageID());
  +   }
  +   
      protected ServerConsumerEndpoint putConsumerEndpoint(int consumerID, ServerConsumerEndpoint d)
      {
         if (trace) { log.trace(this + " caching consumer " + consumerID); }
  @@ -615,7 +692,7 @@
      /**
       * Starts this session's Consumers
       */
  -   protected void setStarted(boolean s) throws JMSException
  +   protected void setStarted(boolean s) throws Throwable
      {
         synchronized(consumers)
         {
  
  
  



More information about the jboss-cvs-commits mailing list