[jboss-cvs] JBoss Messaging SVN: r3617 - in trunk/src/main/org/jboss/jms: client/api and 3 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Jan 23 04:40:25 EST 2008


Author: timfox
Date: 2008-01-23 04:40:25 -0500 (Wed, 23 Jan 2008)
New Revision: 3617

Modified:
   trunk/src/main/org/jboss/jms/client/AsfMessageHolder.java
   trunk/src/main/org/jboss/jms/client/JBossConnectionConsumer.java
   trunk/src/main/org/jboss/jms/client/JBossSession.java
   trunk/src/main/org/jboss/jms/client/MessageHandler.java
   trunk/src/main/org/jboss/jms/client/api/ClientConsumer.java
   trunk/src/main/org/jboss/jms/client/impl/ClientConsumerImpl.java
   trunk/src/main/org/jboss/jms/client/impl/ClientSessionImpl.java
   trunk/src/main/org/jboss/jms/client/impl/DeliveryInfo.java
   trunk/src/main/org/jboss/jms/server/endpoint/ConnectionEndpoint.java
   trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
   trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
   trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
   trunk/src/main/org/jboss/jms/server/endpoint/SessionEndpoint.java
   trunk/src/main/org/jboss/jms/tx/ClientTransaction.java
   trunk/src/main/org/jboss/jms/tx/ResourceManager.java
Log:
ack / tx changes part I


Modified: trunk/src/main/org/jboss/jms/client/AsfMessageHolder.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/AsfMessageHolder.java	2008-01-23 05:28:28 UTC (rev 3616)
+++ trunk/src/main/org/jboss/jms/client/AsfMessageHolder.java	2008-01-23 09:40:25 UTC (rev 3617)
@@ -43,18 +43,15 @@
    
    private ClientSession connectionConsumerSession;
    
-   private boolean shouldAck;
-   
    public AsfMessageHolder(JBossMessage msg, String consumerID,
          String queueName, int maxDeliveries,
-         ClientSession connectionConsumerSession, boolean shouldAck)
+         ClientSession connectionConsumerSession)
    {
       this.msg = msg;
       this.consumerID = consumerID;
       this.queueName = queueName;
       this.maxDeliveries = maxDeliveries;
       this.connectionConsumerSession = connectionConsumerSession;
-      this.shouldAck = shouldAck;
    }
 
    public JBossMessage getMsg()
@@ -106,16 +103,4 @@
    {
       this.connectionConsumerSession = connectionConsumerSession;
    }
-
-   public boolean isShouldAck()
-   {
-      return shouldAck;
-   }
-
-   public void setShouldAck(boolean shouldAck)
-   {
-      this.shouldAck = shouldAck;
-   }
-
-   
 }

Modified: trunk/src/main/org/jboss/jms/client/JBossConnectionConsumer.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/JBossConnectionConsumer.java	2008-01-23 05:28:28 UTC (rev 3616)
+++ trunk/src/main/org/jboss/jms/client/JBossConnectionConsumer.java	2008-01-23 09:40:25 UTC (rev 3617)
@@ -95,8 +95,6 @@
    
    private String queueName;
    
-   private boolean shouldAck;
-   
    // Static --------------------------------------------------------
    
    // Constructors --------------------------------------------------
@@ -122,9 +120,7 @@
       this.consumerID = cons.getID();      
         
       this.maxDeliveries = cons.getMaxDeliveries();
-      
-      shouldAck = cons.isShouldAck();      
-            
+         
       if (subName != null)
       {
          queueName = MessageQueueNameHelper.createSubscriptionName(conn.getClientID(), subName);
@@ -280,7 +276,7 @@
                for (int i = 0; i < mesList.size(); i++)
                {
                   JBossMessage m = (JBossMessage)mesList.get(i);
-                  session.addAsfMessage(m, consumerID, queueName, maxDeliveries, sess, shouldAck);
+                  session.addAsfMessage(m, consumerID, queueName, maxDeliveries, sess);
                   if (trace) { log.trace("added " + m + " to session"); }
                }
 

Modified: trunk/src/main/org/jboss/jms/client/JBossSession.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/JBossSession.java	2008-01-23 05:28:28 UTC (rev 3616)
+++ trunk/src/main/org/jboss/jms/client/JBossSession.java	2008-01-23 09:40:25 UTC (rev 3617)
@@ -224,7 +224,7 @@
                MessageHandler.callOnMessage(session, distinguishedListener, holder.getConsumerID(),
                                                 false,
                                                 holder.getMsg(), ackMode, holder.getMaxDeliveries(),
-                                                holder.getConnectionConsumerSession(), holder.isShouldAck());
+                                                holder.getConnectionConsumerSession());
             }
          }
       }
@@ -497,12 +497,12 @@
     * with messages to be processed by the session's run() method
     */
    void addAsfMessage(JBossMessage m, String consumerID, String queueName, int maxDeliveries,
-                      ClientSession connectionConsumerSession, boolean shouldAck) throws JMSException
+                      ClientSession connectionConsumerSession) throws JMSException
    {
       
       AsfMessageHolder holder =
          new AsfMessageHolder(m, consumerID, queueName, maxDeliveries,
-                              connectionConsumerSession, shouldAck);
+                              connectionConsumerSession);
 
       if (asfMessages == null)
       {

Modified: trunk/src/main/org/jboss/jms/client/MessageHandler.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/MessageHandler.java	2008-01-23 05:28:28 UTC (rev 3616)
+++ trunk/src/main/org/jboss/jms/client/MessageHandler.java	2008-01-23 09:40:25 UTC (rev 3617)
@@ -54,18 +54,17 @@
          JBossMessage m,
          int ackMode,
          int maxDeliveries,
-         ClientSession connectionConsumerSession,
-         boolean shouldAck)
+         ClientSession connectionConsumerSession)
    throws JMSException
    {      
-      if (checkExpiredOrReachedMaxdeliveries(m, connectionConsumerSession!=null?connectionConsumerSession:sess, maxDeliveries, shouldAck))
+      if (checkExpiredOrReachedMaxdeliveries(m, connectionConsumerSession!=null?connectionConsumerSession:sess, maxDeliveries))
       {
          // Message has been cancelled
          return;
       }
 
       DeliveryInfo deliveryInfo =
-         new DeliveryInfo(m, consumerID, connectionConsumerSession, shouldAck);
+         new DeliveryInfo(m, consumerID, connectionConsumerSession);
 
       m.incDeliveryCount();
 
@@ -114,7 +113,7 @@
    
    public static boolean checkExpiredOrReachedMaxdeliveries(JBossMessage jbm,
          ClientSession del,
-         int maxDeliveries, boolean shouldCancel)
+         int maxDeliveries)
    {
       Message msg = jbm.getCoreMessage();
 
@@ -135,21 +134,18 @@
                log.trace(msg + " has reached maximum delivery number " + maxDeliveries +", cancelling to server");
             }
          }
-
-         if (shouldCancel)
-         {           
-            final Cancel cancel = new CancelImpl(jbm.getDeliveryId(), jbm.getDeliveryCount(),
-                  expired, reachedMaxDeliveries);          
-            try
-            {
-               del.cancelDelivery(cancel);
-            }
-            catch (JMSException e)
-            {
-               log.error("Failed to cancel delivery", e);
-            }   
+      
+         final Cancel cancel = new CancelImpl(jbm.getDeliveryId(), jbm.getDeliveryCount(),
+               expired, reachedMaxDeliveries);          
+         try
+         {
+            del.cancelDelivery(cancel);
          }
-
+         catch (JMSException e)
+         {
+            log.error("Failed to cancel delivery", e);
+         }   
+         
          return true;
       }
       else

Modified: trunk/src/main/org/jboss/jms/client/api/ClientConsumer.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/api/ClientConsumer.java	2008-01-23 05:28:28 UTC (rev 3616)
+++ trunk/src/main/org/jboss/jms/client/api/ClientConsumer.java	2008-01-23 09:40:25 UTC (rev 3617)
@@ -40,8 +40,6 @@
    
    int getMaxDeliveries();
    
-   boolean isShouldAck();
-   
    void handleMessage(JBossMessage message) throws Exception;
    
    void addToFrontOfBuffer(JBossMessage message) throws JMSException;

Modified: trunk/src/main/org/jboss/jms/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/impl/ClientConsumerImpl.java	2008-01-23 05:28:28 UTC (rev 3616)
+++ trunk/src/main/org/jboss/jms/client/impl/ClientConsumerImpl.java	2008-01-23 09:40:25 UTC (rev 3617)
@@ -89,8 +89,7 @@
    private QueuedExecutor sessionExecutor;
    private boolean listenerRunning;
    private long lastDeliveryId = -1;
-   private boolean waitingForLastDelivery;
-   private boolean shouldAck;     
+   private boolean waitingForLastDelivery;   
    private int consumeCount;
    private MessagingRemotingConnection remotingConnection;
    
@@ -105,8 +104,7 @@
                              Destination dest,
                              String selector, boolean noLocal,
                              boolean isCC, QueuedExecutor sessionExecutor,
-                             MessagingRemotingConnection remotingConnection,
-                             boolean shouldAck)
+                             MessagingRemotingConnection remotingConnection)
    {
       this.id = id;
       this.session = session;
@@ -118,7 +116,6 @@
       this.noLocal = noLocal;
       this.isConnectionConsumer = isCC;
       this.sessionExecutor = sessionExecutor;
-      this.shouldAck = shouldAck; 
       this.remotingConnection = remotingConnection;
       
    }
@@ -329,11 +326,11 @@
                if (trace) { log.trace(this + " received " + m + " after being blocked on buffer"); }
                        
                boolean ignore =
-                  MessageHandler.checkExpiredOrReachedMaxdeliveries(m, session, maxDeliveries, shouldAck);
+                  MessageHandler.checkExpiredOrReachedMaxdeliveries(m, session, maxDeliveries);
                
                if (!isConnectionConsumer && !ignore)
                {
-                  DeliveryInfo info = new DeliveryInfo(m, id, null, shouldAck);
+                  DeliveryInfo info = new DeliveryInfo(m, id, null);
                                                     
                   session.preDeliver(info);                  
                   
@@ -405,11 +402,6 @@
       }
    }
    
-   public boolean isShouldAck()
-   {
-      return this.shouldAck;
-   }
-   
    public void handleMessage(final JBossMessage message) throws Exception
    {
       synchronized (mainLock)
@@ -499,7 +491,7 @@
          // consumer's deliveries until then), which is too late - since we need to preserve the
          // order of messages delivered in a session.
          
-         if (shouldAck && !buffer.isEmpty())
+         if (!buffer.isEmpty())
          {                        
             // Now we cancel any deliveries that might be waiting in our buffer. This is because
             // otherwise the messages wouldn't get cancelled until the corresponding session died.
@@ -836,7 +828,7 @@
             try
             {
                MessageHandler.callOnMessage(session, theListener, id,
-                             false, msg, session.getAcknowledgeMode(), maxDeliveries, null, shouldAck);
+                             false, msg, session.getAcknowledgeMode(), maxDeliveries, null);
                
                if (trace) { log.trace("Called callonMessage"); }
             }

Modified: trunk/src/main/org/jboss/jms/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/impl/ClientSessionImpl.java	2008-01-23 05:28:28 UTC (rev 3616)
+++ trunk/src/main/org/jboss/jms/client/impl/ClientSessionImpl.java	2008-01-23 09:40:25 UTC (rev 3617)
@@ -423,14 +423,12 @@
       
       CreateConsumerResponse response = (CreateConsumerResponse)remotingConnection.sendBlocking(id, request);
       
-      boolean shouldAck = !(destination.getType() == DestinationType.TOPIC && subscriptionName == null);
-
       ClientConsumer consumer =
          new ClientConsumerImpl(this, response.getConsumerID(), response.getBufferSize(),
                response.getMaxDeliveries(), response.getRedeliveryDelay(),
             destination,
             selector, noLocal,
-            isCC, executor, remotingConnection, shouldAck);
+            isCC, executor, remotingConnection);
 
       children.put(response.getConsumerID(), consumer);
 
@@ -1010,37 +1008,27 @@
 
    private boolean ackDelivery(DeliveryInfo delivery) throws JMSException
    {
-   	if (delivery.isShouldAck())
-   	{
-	      ClientSession connectionConsumerSession = delivery.getConnectionConsumerSession();
+   	ClientSession connectionConsumerSession = delivery.getConnectionConsumerSession();
 
-	      //If the delivery was obtained via a connection consumer we need to ack via that
-	      //otherwise we just use this session
+      //If the delivery was obtained via a connection consumer we need to ack via that
+      //otherwise we just use this session
 
-	      ClientSession sessionToUse = connectionConsumerSession != null ? connectionConsumerSession : this;
+      ClientSession sessionToUse = connectionConsumerSession != null ? connectionConsumerSession : this;
 
-	      return sessionToUse.acknowledgeDelivery(delivery);
-   	}
-   	else
-   	{
-   		return true;
-   	}
+      return sessionToUse.acknowledgeDelivery(delivery);
    }
 
    private void cancelDelivery(DeliveryInfo delivery) throws JMSException
    {
-   	if (delivery.isShouldAck())
-   	{
-   	   ClientSession connectionConsumerSession = delivery.getConnectionConsumerSession();
+	   ClientSession connectionConsumerSession = delivery.getConnectionConsumerSession();
 
-	      //If the delivery was obtained via a connection consumer we need to cancel via that
-	      //otherwise we just use this session
+      //If the delivery was obtained via a connection consumer we need to cancel via that
+      //otherwise we just use this session
 
-   	   ClientSession sessionToUse = connectionConsumerSession != null ? connectionConsumerSession : this;
+	   ClientSession sessionToUse = connectionConsumerSession != null ? connectionConsumerSession : this;
 
-	      sessionToUse.cancelDelivery(new CancelImpl(delivery.getDeliveryID(),
-	                                  delivery.getMessage().getDeliveryCount(), false, false));
-   	}
+      sessionToUse.cancelDelivery(new CancelImpl(delivery.getDeliveryID(),
+                                  delivery.getMessage().getDeliveryCount(), false, false));   	
    }
 
    private void internalCancelDeliveries( List deliveryInfos) throws JMSException
@@ -1051,14 +1039,11 @@
       {
          DeliveryInfo ack = (DeliveryInfo)i.next();
 
-         if (ack.isShouldAck())
-         {
-	         CancelImpl cancel = new CancelImpl(ack.getMessage().getDeliveryId(),
-	                                                  ack.getMessage().getDeliveryCount(),
-	                                                  false, false);
+         CancelImpl cancel = new CancelImpl(ack.getMessage().getDeliveryId(),
+                                                  ack.getMessage().getDeliveryCount(),
+                                                  false, false);
 
-	         cancels.add(cancel);
-         }
+         cancels.add(cancel);         
       }
 
       if (!cancels.isEmpty())
@@ -1069,22 +1054,10 @@
 
    private void acknowledgeDeliveries(ClientSession del, List deliveryInfos) throws JMSException
    {
-      List acks = new ArrayList();
-
-      for (Iterator i = deliveryInfos.iterator(); i.hasNext(); )
+      if (!deliveryInfos.isEmpty())
       {
-         DeliveryInfo ack = (DeliveryInfo)i.next();
-
-         if (ack.isShouldAck())
-         {
-	         acks.add(ack);
-         }
+         del.acknowledgeDeliveries(deliveryInfos);
       }
-
-      if (!acks.isEmpty())
-      {
-         del.acknowledgeDeliveries(acks);
-      }
    }
 
    // Inner Classes --------------------------------------------------------------------------------

Modified: trunk/src/main/org/jboss/jms/client/impl/DeliveryInfo.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/impl/DeliveryInfo.java	2008-01-23 05:28:28 UTC (rev 3616)
+++ trunk/src/main/org/jboss/jms/client/impl/DeliveryInfo.java	2008-01-23 09:40:25 UTC (rev 3617)
@@ -45,9 +45,6 @@
 
    private JBossMessage msg;
    
-   //For messages in non durable subscriptions - there is no need to ack on the server
-   private boolean shouldAck;
-   
    //When using the evil abomination known as a ConnectionConsumer, the connection consumer
    //will get from a session that it created, then pass them onto sessions got from the pool
    //this means when the messages are acked/cancelled then this needs to be done against
@@ -62,15 +59,13 @@
    // Constructors --------------------------------------------------
    
    public DeliveryInfo(JBossMessage msg, String consumerId, 
-                       ClientSession connectionConsumerSession, boolean shouldAck)
+                       ClientSession connectionConsumerSession)
    {      
       this.msg = msg;
       
       this.consumerId = consumerId;
       
       this.connectionConsumerSession = connectionConsumerSession;
-      
-      this.shouldAck = shouldAck;
    }
 
    // Public --------------------------------------------------------
@@ -90,11 +85,6 @@
       return connectionConsumerSession;
    }
    
-   public boolean isShouldAck()
-   {
-   	return shouldAck;
-   }
-   
    public String toString()
    {
       return "Delivery[" + getDeliveryID() + ", " + msg + "]";

Modified: trunk/src/main/org/jboss/jms/server/endpoint/ConnectionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ConnectionEndpoint.java	2008-01-23 05:28:28 UTC (rev 3616)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ConnectionEndpoint.java	2008-01-23 09:40:25 UTC (rev 3617)
@@ -52,8 +52,6 @@
 
    void stop() throws JMSException;
 
-   void sendTransaction(TransactionRequest request) throws JMSException;
-
-   MessagingXid[] getPreparedTransactions() throws JMSException; ;
+   void sendTransaction(TransactionRequest request) throws JMSException;   
 }
 

Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java	2008-01-23 05:28:28 UTC (rev 3616)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java	2008-01-23 09:40:25 UTC (rev 3617)
@@ -202,14 +202,14 @@
 
    public CreateSessionResponse createSession(boolean transacted,
                                               int acknowledgmentMode,
-                                              boolean isXA)
+                                              boolean xa)
       throws JMSException
    {
       try
       {
          log.trace(this + " creating " + (transacted ? "transacted" : "non transacted") +
             " session, " + Util.acknowledgmentMode(acknowledgmentMode) + ", " +
-            (isXA ? "XA": "non XA"));
+            (xa ? "XA": "non XA"));
 
          if (closed)
          {
@@ -222,7 +222,7 @@
          // connection endpoint instance
 
          //Note we only replicate transacted and client acknowledge sessions.
-         ServerSessionEndpoint ep = new ServerSessionEndpoint(sessionID, this);
+         ServerSessionEndpoint ep = new ServerSessionEndpoint(sessionID, this, transacted, xa);
 
          synchronized (sessions)
          {

Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java	2008-01-23 05:28:28 UTC (rev 3616)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java	2008-01-23 09:40:25 UTC (rev 3617)
@@ -109,12 +109,8 @@
    // Must be volatile
    private volatile boolean clientAccepting;
 
-   private boolean retainDeliveries;
-   
    private long lastDeliveryID = -1;
    
-   private volatile boolean dead;
-
    private int prefetchSize;
    
    private volatile int sendCount;
@@ -163,19 +159,6 @@
       
       this.filter = filter;
                 
-      //FIXME - we shouldn't have checks like this on the server side
-      //It should be the jms client that decides whether to retain deliveries or not
-      if (destination.getType() == DestinationType.TOPIC && !messageQueue.isDurable())
-      {
-         // This is a consumer of a non durable topic subscription. We don't need to store
-         // deliveries since if the consumer is closed or dies the refs go too.
-         this.retainDeliveries = false;
-      }
-      else
-      {
-         this.retainDeliveries = true;
-      }
-      
       this.started = this.sessionEndpoint.getConnectionEndpoint().isStarted();
       
       // adding the consumer to the queue
@@ -415,11 +398,6 @@
    	return this.id;
    }
 
-   boolean isRetainDeliveries()
-   {
-   	return this.retainDeliveries;
-   }
-   
    void setLastDeliveryID(long id)
    {
    	this.lastDeliveryID = id;
@@ -431,16 +409,6 @@
       this.started = started;      
    }
    
-   void setDead()
-   {
-      dead = true;
-   }
-   
-   boolean isDead()
-   {
-      return dead;
-   }
-   
    Queue getDLQ()
    {
       return dlq;

Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2008-01-23 05:28:28 UTC (rev 3616)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2008-01-23 09:40:25 UTC (rev 3617)
@@ -112,7 +112,6 @@
  * Session implementation
  *
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
  * Parts derived from JBM 1.x ServerSessionEndpoint by
  *
  * @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
@@ -133,8 +132,6 @@
 
    static final String TEMP_QUEUE_MESSAGECOUNTER_PREFIX = "TempQueue.";
 
-   private static final long DELIVERY_WAIT_TIMEOUT = 5 * 1000;
-
    private static final long CLOSE_WAIT_TIMEOUT = 5 * 1000;
 
    // Static ---------------------------------------------------------------------------------------
@@ -153,18 +150,15 @@
 
    private MessagingServer sp;
 
-   private Map consumers;
-   private Map browsers;
+   private Map consumers = new HashMap();
+   private Map browsers = new HashMap();
 
    private PostOffice postOffice;
-   private int nodeId;
    private int defaultMaxDeliveryAttempts;
    private long defaultRedeliveryDelay;
    private Queue defaultDLQ;
    private Queue defaultExpiryQueue;
 
-   private Object deliveryLock = new Object();
-
    // Map <deliveryID, Delivery>
    private Map deliveries;
 
@@ -173,15 +167,18 @@
    //Temporary until we have our own NIO transport
    QueuedExecutor executor = new QueuedExecutor(new LinkedQueue());
 
-   private LinkedQueue toDeliver = new LinkedQueue();
-
-   private boolean waitingToClose = false;
-
    private Object waitLock = new Object();
+   
+   private Transaction tx;
+   
+   private boolean transacted;
+   
+   private boolean xa;
 
    // Constructors ---------------------------------------------------------------------------------
 
-   ServerSessionEndpoint(String sessionID, ServerConnectionEndpoint connectionEndpoint) throws Exception
+   ServerSessionEndpoint(String sessionID, ServerConnectionEndpoint connectionEndpoint,
+                         boolean transacted, boolean xa) throws Exception
    {
       this.id = sessionID;
 
@@ -191,12 +188,6 @@
 
       postOffice = sp.getPostOffice();
 
-      nodeId = sp.getConfiguration().getMessagingServerID();
-
-      consumers = new HashMap();
-
-		browsers = new HashMap();
-
       defaultDLQ = sp.getDefaultDLQInstance();
 
       defaultExpiryQueue = sp.getDefaultExpiryQueueInstance();
@@ -208,6 +199,15 @@
       deliveries = new ConcurrentHashMap();
 
       deliveryIdSequence = new SynchronizedLong(0);
+      
+      this.transacted = transacted;
+      
+      this.xa = xa;
+      
+      if (transacted && !xa)
+      {
+         tx = new TransactionImpl();
+      }
    }
 
    // SessionDelegate implementation ---------------------------------------------------------------
@@ -853,35 +853,14 @@
 
    	 if (trace) { log.trace("Delivery id is now " + deliveryId); }
 
-   	 //TODO can't we combine flags isRetainDeliveries and isReplicating - surely they're mutually exclusive?
-       if (consumer.isRetainDeliveries())
-       {
-      	 // Add a delivery
+   	 // Add a delivery
 
-      	 rec = new DeliveryRecord(ref, consumer, deliveryId);
+   	 rec = new DeliveryRecord(ref, consumer, deliveryId);
 
-          deliveries.put(new Long(deliveryId), rec);
+       deliveries.put(new Long(deliveryId), rec);
 
-          if (trace) { log.trace(this + " added delivery " + deliveryId + ": " + ref); }
-       }
-       else
-       {
-       	//Acknowledge it now
-       	try
-       	{
-       		//This basically just releases the memory reference
+       if (trace) { log.trace(this + " added delivery " + deliveryId + ": " + ref); }
 
-       		if (trace) { log.trace("Acknowledging delivery now"); }
-
-       		ref.acknowledge(connectionEndpoint.getMessagingServer().getPersistenceManager());
-       	}
-       	catch (Throwable t)
-       	{
-       		log.error("Failed to acknowledge delivery", t);
-       	}
-       }
-
-
        performDelivery(ref, deliveryId, consumer);
    }
 
@@ -894,12 +873,6 @@
    		return;
    	}
 
-      if (consumer.isDead())
-      {
-         //Ignore any responses that come back after consumer has died
-         return;
-      }
-
    	if (trace) { log.trace(this + " performing delivery for " + ref); }
 
       // We send the message to the client on the current thread. The message is written onto the
@@ -946,17 +919,6 @@
       {
          long id = ack.getDeliveryID();
 
-         //TODO - do this more elegantly
-         if (ack instanceof DeliveryInfo)
-         {
-         	if (!((DeliveryInfo)ack).isShouldAck())
-         	{
-         		//If we are in VM then acks for non durable subs will still exist - this
-         		//won't happen remoptely since they are not written to the wire
-         		continue;
-         	}
-         }
-
          DeliveryRecord rec = (DeliveryRecord)deliveries.get(id);
 
          DeliveryCallback cb = new DeliveryCallback(id);
@@ -1006,11 +968,7 @@
 
       try
       {
-         //Prompting delivery must be asynchronous to avoid deadlock
-         //but we cannot use one way invocations on cancelDelivery and
-         //cancelDeliveries because remoting one way invocations can
-         //overtake each other in flight - this problem will
-         //go away when we have our own transport and our dedicated connection
+         //TODO - do we really need to prompt on a different thread?
          this.executor.execute(new Runnable() { public void run() { queue.deliver();} } );
 
       }

Modified: trunk/src/main/org/jboss/jms/server/endpoint/SessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/SessionEndpoint.java	2008-01-23 05:28:28 UTC (rev 3616)
+++ trunk/src/main/org/jboss/jms/server/endpoint/SessionEndpoint.java	2008-01-23 09:40:25 UTC (rev 3617)
@@ -25,6 +25,7 @@
 import java.util.List;
 
 import javax.jms.JMSException;
+import javax.transaction.xa.Xid;
 
 import org.jboss.jms.client.Closeable;
 import org.jboss.jms.client.impl.Ack;
@@ -120,5 +121,27 @@
    int getDupsOKBatchSize();
 
    public boolean isStrictTck();
+   
+//   public void XAStart(Xid xid) throws JMSException;
+//   
+//   public void XAEnd(Xid xid) throws JMSException;
+//   
+//   public void XASuspend(Xid xid) throws JMSException;
+//   
+//   public void XAJoin(Xid xid) throws JMSException;
+//   
+//   public void XAResume(Xid xid) throws JMSException;
+//   
+//   public void XAPrepare(Xid xid) throws JMSException;
+//   
+//   public void XACommit(Xid xid, boolean onePhase) throws JMSException;
+//   
+//   public void XARollback(Xid xid) throws JMSException;
+//   
+//   public List<Xid> XARecover() throws JMSException;
+//   
+//   public void XASetTxTimeout(int seconds) throws JMSException;
+//   
+//   public int XAGetTimeout() throws JMSException;
 }
 

Modified: trunk/src/main/org/jboss/jms/tx/ClientTransaction.java
===================================================================
--- trunk/src/main/org/jboss/jms/tx/ClientTransaction.java	2008-01-23 05:28:28 UTC (rev 3616)
+++ trunk/src/main/org/jboss/jms/tx/ClientTransaction.java	2008-01-23 09:40:25 UTC (rev 3617)
@@ -67,13 +67,6 @@
 
    private boolean clientSide;
    
-   private boolean hasPersistentAcks;
-   
-   private boolean failedOver;
-   
-   private boolean removeAcks;
-
-
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
@@ -109,29 +102,9 @@
       }
       SessionTxState sessionTxState = getSessionTxState(sessionId);
 
-      sessionTxState.addAck(info);
-      
-      if (info.getMessage().getCoreMessage().isDurable())
-      {
-         hasPersistentAcks = true;
-      }
-      
-      if (!info.isShouldAck())
-      {
-      	removeAcks = true;
-      }
+      sessionTxState.addAck(info);           
    }
    
-   public boolean hasPersistentAcks()
-   {
-      return hasPersistentAcks;
-   }
-   
-   public boolean isFailedOver()
-   {
-      return failedOver;
-   }
-   
    public void clearMessages()
    {
       if (!clientSide)
@@ -178,47 +151,8 @@
       }
    }
 
-   /*
-   * Substitute newSessionID for oldSessionID
-   */
-   public void handleFailover(int newServerID, String oldSessionID, String newSessionID)
-   {
-      if (!clientSide)
-      {
-         throw new IllegalStateException("Cannot call this method on the server side");
-      }
-      
-      // Note we have to do this in one go since there may be overlap between old and new session
-      // IDs and we don't want to overwrite keys in the map.
+   
 
-      Map<String, SessionTxState> tmpMap = null;
-
-      if (sessionStatesMap != null)
-      {
-         for (SessionTxState state: sessionStatesMap.values())
-         {
-            boolean handled = state.handleFailover(newServerID, oldSessionID, newSessionID);
-
-            if (handled)
-            {
-	            if (tmpMap == null)
-	            {
-	               tmpMap = new LinkedHashMap<String, SessionTxState>();
-	            }
-	            tmpMap.put(newSessionID, state);
-            }
-         }
-      }
-
-      if (tmpMap != null)
-      {
-         // swap
-         sessionStatesMap = tmpMap;
-      }
-      
-      failedOver = true;
-   }
-
    /**
     * May return an empty list, but never null.
     */
@@ -293,12 +227,8 @@
             {
                DeliveryInfo ack = (DeliveryInfo)iter2.next();
                
-               //We don't want to send acks for things like non durable subs which will have been already acked
-               if (ack.isShouldAck())
-               {
-               	//We only need the delivery id written
-               	out.writeLong(ack.getMessage().getDeliveryId());
-               }
+            	//We only need the delivery id written
+            	out.writeLong(ack.getMessage().getDeliveryId());               
             }
             
             //Marker for end of acks

Modified: trunk/src/main/org/jboss/jms/tx/ResourceManager.java
===================================================================
--- trunk/src/main/org/jboss/jms/tx/ResourceManager.java	2008-01-23 05:28:28 UTC (rev 3616)
+++ trunk/src/main/org/jboss/jms/tx/ResourceManager.java	2008-01-23 09:40:25 UTC (rev 3617)
@@ -136,20 +136,8 @@
       tx.addMessage(sessionId, m);
    }
    
+
    /*
-    * Failover session from old session ID -> new session ID
-    */
-   public void handleFailover(int newServerID, String oldSessionID, String newSessionID)
-   {	
-      for (Iterator i = transactions.values().iterator(); i.hasNext(); )
-      {
-         ClientTransaction tx = (ClientTransaction)i.next();
-         
-         tx.handleFailover(newServerID, oldSessionID, newSessionID);
-      }                
-   }   
-   
-   /*
     * Get all the deliveries corresponding to the session ID
     */
    public List getDeliveriesForSession(String sessionID)
@@ -167,8 +155,7 @@
       
       return ackInfos;
    }
-   
-   
+      
    /**
     * Add an acknowledgement to the transaction
     * 




More information about the jboss-cvs-commits mailing list