[jboss-cvs] JBoss Messaging SVN: r3843 - in trunk: src/main/org/jboss/messaging/core/client/impl and 2 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Mar 4 11:23:03 EST 2008


Author: timfox
Date: 2008-03-04 11:23:03 -0500 (Tue, 04 Mar 2008)
New Revision: 3843

Modified:
   trunk/src/main/org/jboss/messaging/core/client/ClientConnection.java
   trunk/src/main/org/jboss/messaging/core/client/ClientSession.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionImpl.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
   trunk/src/main/org/jboss/messaging/jms/client/JBossConnection.java
   trunk/tests/src/org/jboss/messaging/core/remoting/ssl/integration/CoreClientOverSSL.java
   trunk/tests/src/org/jboss/messaging/core/remoting/ssl/integration/CoreClientOverSSLTest.java
Log:
More tweaks


Modified: trunk/src/main/org/jboss/messaging/core/client/ClientConnection.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/ClientConnection.java	2008-03-04 15:06:06 UTC (rev 3842)
+++ trunk/src/main/org/jboss/messaging/core/client/ClientConnection.java	2008-03-04 16:23:03 UTC (rev 3843)
@@ -16,7 +16,7 @@
 public interface ClientConnection
 {    
    ClientSession createClientSession(boolean xa, boolean autoCommitSends, boolean autoCommitAcks,
-                                     int ackBatchSize, boolean cacheProducers) throws MessagingException;
+                                     int ackBatchSize, boolean blockOnAcknowledge, boolean cacheProducers) throws MessagingException;
 
    void start() throws MessagingException;
 

Modified: trunk/src/main/org/jboss/messaging/core/client/ClientSession.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/ClientSession.java	2008-03-04 15:06:06 UTC (rev 3842)
+++ trunk/src/main/org/jboss/messaging/core/client/ClientSession.java	2008-03-04 16:23:03 UTC (rev 3843)
@@ -54,5 +54,11 @@
    
    void close() throws MessagingException;
    
-   boolean isClosed();        
+   boolean isClosed();     
+   
+   boolean isAutoCommitSends();
+   
+   boolean isAutoCommitAcks();
+   
+   int getLazyAckBatchSize();
 }

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionImpl.java	2008-03-04 15:06:06 UTC (rev 3842)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionImpl.java	2008-03-04 16:23:03 UTC (rev 3843)
@@ -98,7 +98,8 @@
    // ClientConnection implementation --------------------------------------------------------------
 
    public ClientSession createClientSession(final boolean xa, final boolean autoCommitSends, final boolean autoCommitAcks,
-                                            final int ackBatchSize, final boolean cacheProducers) throws MessagingException
+                                            final int ackBatchSize, final boolean blockOnAcknowledge,
+                                            final boolean cacheProducers) throws MessagingException
    {
       checkClosed();
 
@@ -108,7 +109,7 @@
 
       ClientSession session =
       	new ClientSessionImpl(this, response.getSessionID(), ackBatchSize, cacheProducers, maxProducerRate,
-      			                producerWindowSize);
+      			                producerWindowSize, autoCommitSends, autoCommitAcks, blockOnAcknowledge);
 
       children.put(response.getSessionID(), session);
 

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2008-03-04 15:06:06 UTC (rev 3842)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2008-03-04 16:23:03 UTC (rev 3843)
@@ -140,11 +140,19 @@
    
    private long lastCommittedID = -1;
    
+   private final boolean autoCommitAcks;
+   
+   private final boolean autoCommitSends;
+   
+   private final boolean blockOnAcknowledge;
+   
    // Constructors ---------------------------------------------------------------------------------
    
    public ClientSessionImpl(final ClientConnectionInternal connection, final String id,
                             final int lazyAckBatchSize, final boolean cacheProducers,
-                            final int maxProducerRate, final int producerWindowSize) throws MessagingException
+                            final int maxProducerRate, final int producerWindowSize,
+                            final boolean autoCommitSends, final boolean autoCommitAcks,
+                            final boolean blockOnAcknowledge) throws MessagingException
    {
    	if (lazyAckBatchSize < -1 || lazyAckBatchSize == 0)
    	{
@@ -175,6 +183,12 @@
       {
       	producerCache = null;
       }
+      
+      this.autoCommitAcks = autoCommitAcks;
+      
+      this.autoCommitSends = autoCommitSends;
+      
+      this.blockOnAcknowledge = blockOnAcknowledge;
    }
    
    // ClientSession implementation -----------------------------------------------------------------
@@ -353,15 +367,24 @@
    {
       checkClosed();
             
-      //First we tell each consumer to clear it's buffers and ignore any deliveries with
+      //We tell each consumer to clear its buffers and ignore any deliveries with
       //delivery id > last delivery id, until it gets delivery id = lastID again
       
+      if (autoCommitAcks)
+      {
+      	lastCommittedID = lastID;
+      }
+      
       for (ClientConsumerInternal consumer: consumers.values())
       {
          consumer.recover(lastCommittedID + 1);
       }
       
+      //We flush any remaining acks
+      
       acknowledgeInternal(false);      
+      
+      toAckCount = 0;
 
       remotingConnection.send(id, new SessionRollbackMessage());
    }
@@ -385,30 +408,18 @@
          
          toAckCount = 0;
       }
-      else if (broken)
+      else if (broken || toAckCount == lazyAckBatchSize)
       {
          //Must always ack now
          acknowledgeInternal(false);
          
          toAckCount = 0;
-      }
-      else
-      {
-         if (toAckCount == lazyAckBatchSize)
+         
+         if (autoCommitAcks)
          {
-            acknowledgeInternal(false);
-            
-            toAckCount = 0;
-         }                       
-      }            
-      
-      //FIXME - temp hack - make server sessions alwyas transacted!!
-      
-      if (this.lazyAckBatchSize != -1)
-      {
-      	lastCommittedID = lastID;
+         	lastCommittedID = lastID;
+         }
       }
-      
    }
 
    public synchronized void close() throws MessagingException
@@ -453,6 +464,21 @@
       return closed;
    }
    
+   public boolean isAutoCommitSends()
+   {
+   	return autoCommitSends;
+   }
+   
+   public boolean isAutoCommitAcks()
+   {
+   	return autoCommitAcks;   	   	
+   }
+   
+   public int getLazyAckBatchSize()
+   {
+   	return lazyAckBatchSize;
+   }
+   
    // ClientSessionInternal implementation ------------------------------------------------------------
    
    public String getID()

Modified: trunk/src/main/org/jboss/messaging/jms/client/JBossConnection.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/client/JBossConnection.java	2008-03-04 15:06:06 UTC (rev 3842)
+++ trunk/src/main/org/jboss/messaging/jms/client/JBossConnection.java	2008-03-04 16:23:03 UTC (rev 3843)
@@ -385,42 +385,59 @@
       {
          int ackBatchSize;
          
-         if (transacted || acknowledgeMode == Session.CLIENT_ACKNOWLEDGE)
+         final boolean autoCommitSends;
+
+         final boolean autoCommitAcks;
+         
+         final boolean blockOnAcknowledge;
+
+      	if (acknowledgeMode == Session.SESSION_TRANSACTED)
+      	{
+      		autoCommitSends = false;
+
+            autoCommitAcks = false;
+            
+            ackBatchSize = -1; //Infinite
+            
+            blockOnAcknowledge = false;
+      	}
+      	else if (acknowledgeMode == Session.AUTO_ACKNOWLEDGE)
          {
-            ackBatchSize = -1; //Infinite
+            autoCommitSends = true;
+
+            autoCommitAcks = true;
+            
+            ackBatchSize = 1;
+            
+            blockOnAcknowledge = true;
          }
          else if (acknowledgeMode == Session.DUPS_OK_ACKNOWLEDGE)
          {
-            ackBatchSize = dupsOKBatchSize;
+         	autoCommitSends = true;
+         	
+         	autoCommitAcks = true;
+         	
+         	ackBatchSize = dupsOKBatchSize;
+         	
+         	blockOnAcknowledge = false;
          }
-         else
+         else if (acknowledgeMode == Session.CLIENT_ACKNOWLEDGE)
          {
-            //Auto ack
-            ackBatchSize = 1;
-         }
+            autoCommitSends = true;
 
-         boolean autoCommitSends = false;
-
-         boolean autoCommitAcks = false;
-
-         if (!transacted)
+            autoCommitAcks = false;
+            
+            ackBatchSize = -1; //Infinite
+            
+            blockOnAcknowledge = false;
+         }         
+         else
          {
-            if (acknowledgeMode == Session.AUTO_ACKNOWLEDGE || acknowledgeMode == Session.DUPS_OK_ACKNOWLEDGE)
-            {
-               autoCommitSends = true;
-
-               autoCommitAcks = true;
-            }
-            else if (acknowledgeMode == Session.CLIENT_ACKNOWLEDGE)
-            {
-               autoCommitSends = true;
-
-               autoCommitAcks = false;
-            }
+         	throw new IllegalArgumentException("Invalid ackmode: " + acknowledgeMode);
          }
 
          ClientSession session =
-         	connection.createClientSession(isXA, autoCommitSends, autoCommitAcks, ackBatchSize, cacheProducers);
+         	connection.createClientSession(isXA, autoCommitSends, autoCommitAcks, ackBatchSize, blockOnAcknowledge, cacheProducers);
 
          justCreated = false;
          

Modified: trunk/tests/src/org/jboss/messaging/core/remoting/ssl/integration/CoreClientOverSSL.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/core/remoting/ssl/integration/CoreClientOverSSL.java	2008-03-04 15:06:06 UTC (rev 3842)
+++ trunk/tests/src/org/jboss/messaging/core/remoting/ssl/integration/CoreClientOverSSL.java	2008-03-04 16:23:03 UTC (rev 3843)
@@ -82,8 +82,7 @@
          MessagingServer server = new MessagingServerImpl();
          ClientConnectionFactory cf = new ClientConnectionFactoryImpl(0, remotingConf, server.getVersion());
          ClientConnection conn = cf.createConnection(null, null);
-         ClientSession session = conn.createClientSession(false, true, true, 0,
-               false);
+         ClientSession session = conn.createClientSession(false, true, true, 0, false, false);
          ClientProducer producer = session.createProducer(QUEUE);
 
          MessageImpl message = new MessageImpl(JBossTextMessage.TYPE, false, 0,

Modified: trunk/tests/src/org/jboss/messaging/core/remoting/ssl/integration/CoreClientOverSSLTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/core/remoting/ssl/integration/CoreClientOverSSLTest.java	2008-03-04 15:06:06 UTC (rev 3842)
+++ trunk/tests/src/org/jboss/messaging/core/remoting/ssl/integration/CoreClientOverSSLTest.java	2008-03-04 16:23:03 UTC (rev 3843)
@@ -130,8 +130,7 @@
 
       ClientConnectionFactory cf = new ClientConnectionFactoryImpl(0, remotingConf, server.getVersion());
       connection = cf.createConnection(null, null);
-      ClientSession session = connection.createClientSession(false, true, true,
-            0, false);
+      ClientSession session = connection.createClientSession(false, true, true, 0, false, false);
       session.createQueue(QUEUE, QUEUE, null, false, false);
       consumer = session.createConsumer(QUEUE, null, false, false, true);
       connection.start();




More information about the jboss-cvs-commits mailing list