[jboss-cvs] JBoss Messaging SVN: r5388 - in trunk: src/main/org/jboss/messaging/core/client and 20 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Nov 19 06:30:07 EST 2008


Author: ataylor
Date: 2008-11-19 06:30:06 -0500 (Wed, 19 Nov 2008)
New Revision: 5388

Added:
   trunk/tests/src/org/jboss/messaging/tests/integration/jms/consumer/
   trunk/tests/src/org/jboss/messaging/tests/integration/jms/consumer/ConsumerTest.java
Modified:
   trunk/src/config/jbm-configuration.xml
   trunk/src/config/jbm-jndi.xml
   trunk/src/main/org/jboss/messaging/core/client/ClientSessionFactory.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CreateSessionMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ReplicateCreateSessionMessage.java
   trunk/src/main/org/jboss/messaging/core/server/MessagingServer.java
   trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
   trunk/src/main/org/jboss/messaging/jms/client/JBossConnection.java
   trunk/src/main/org/jboss/messaging/jms/client/JBossConnectionFactory.java
   trunk/src/main/org/jboss/messaging/jms/client/JBossSession.java
   trunk/src/main/org/jboss/messaging/jms/server/JMSServerManager.java
   trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerDeployer.java
   trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerManagerImpl.java
   trunk/src/main/org/jboss/messaging/jms/server/management/JMSServerControlMBean.java
   trunk/src/main/org/jboss/messaging/jms/server/management/impl/JMSServerControl.java
   trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/CTSMiscellaneousTest.java
   trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/JMSTestCase.java
   trunk/tests/jms-tests/src/org/jboss/test/messaging/tools/container/LocalTestServer.java
   trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/ChunkTestBase.java
   trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/clientcrash/ClientCrashTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/ReplicateConnectionFailureTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/consumer/ConsumerTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/jms/cluster/JMSFailoverTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/jms/management/JMSQueueControlTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/jms/management/JMSUtil.java
   trunk/tests/src/org/jboss/messaging/tests/stress/paging/PageStressTest.java
Log:
added pre commit functionality

Modified: trunk/src/config/jbm-configuration.xml
===================================================================
--- trunk/src/config/jbm-configuration.xml	2008-11-19 07:23:23 UTC (rev 5387)
+++ trunk/src/config/jbm-configuration.xml	2008-11-19 11:30:06 UTC (rev 5388)
@@ -86,7 +86,7 @@
             </params>            
          </acceptor>
          <!-- Netty standard TCP acceptor -->
-         <acceptor>
+         <acceptor name="netty">
             <factory-class>org.jboss.messaging.integration.transports.netty.NettyAcceptorFactory</factory-class>
             <params>	            	            
 	            <param key="jbm.remoting.netty.host" value="localhost" type="String"/>

Modified: trunk/src/config/jbm-jndi.xml
===================================================================
--- trunk/src/config/jbm-jndi.xml	2008-11-19 07:23:23 UTC (rev 5387)
+++ trunk/src/config/jbm-jndi.xml	2008-11-19 11:30:06 UTC (rev 5388)
@@ -18,6 +18,19 @@
       <entry name="java:/XAConnectionFactory"/>
    </connection-factory>
 
+   <connection-factory name="ServerAckConnectionFactory">
+      <connector>
+         <factory-class>org.jboss.messaging.integration.transports.netty.NettyConnectorFactory</factory-class>
+      </connector>
+      <pre-commit-acks>true</pre-commit-acks>
+      <entry name="ServerAckConnectionFactory"/>
+      <entry name="/ServerAckConnectionFactory"/>
+      <entry name="/ServerAckXAConnectionFactory"/>
+      <entry name="java:/ServerAckConnectionFactory"/>
+      <entry name="java:/ServerAckXAConnectionFactory"/>
+   </connection-factory>
+
+
    <connection-factory name="ClusteredConnectionFactory">
       <connector>
          <factory-class>org.jboss.messaging.integration.transports.netty.NettyConnectorFactory</factory-class>
@@ -69,8 +82,10 @@
       <send-p-messages-synchronously>true</send-p-messages-synchronously>
       <!--If true, any connections will automatically set a unique group id (per producer) on every message sent-->
       <auto-group-id>true</auto-group-id>
+      <!--if true then the server will pre ack any message before delivery to a consumer-->
+      <pre-commit-acks>false</pre-commit-acks>
    </connection-factory>
-   
+
    <connection-factory name="TestInVMConnectionFactory">
       <connector>
          <factory-class>org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory</factory-class>
@@ -80,19 +95,19 @@
       </connector>
       <entry name="/TestInVMConnectionFactory"/>
    </connection-factory>
-   
+
    <connection-factory name="TestSSLConnectionFactory">
       <connector>
          <factory-class>org.jboss.messaging.integration.transports.netty.NettyConnectorFactory</factory-class>
          <params>
             <param key="jbm.remoting.netty.host" value="localhost" type="String"/>
-            <param key="jbm.remoting.netty.port" value="5500" type="Integer"/>	                       
+            <param key="jbm.remoting.netty.port" value="5500" type="Integer"/>
             <param key="jbm.remoting.netty.sslenabled" value="true" type="Boolean"/>
             <param key="jbm.remoting.netty.keystorepath" value="messaging.keystore" type="String"/>
             <param key="jbm.remoting.netty.keystorepassword" value="secureexample" type="String"/>
          </params>
-      </connector>      
-      <entry name="/TestSSLConnectionFactory"/>      
+      </connector>
+      <entry name="/TestSSLConnectionFactory"/>
    </connection-factory>
 
    <queue name="MyQueue">

Modified: trunk/src/main/org/jboss/messaging/core/client/ClientSessionFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/ClientSessionFactory.java	2008-11-19 07:23:23 UTC (rev 5387)
+++ trunk/src/main/org/jboss/messaging/core/client/ClientSessionFactory.java	2008-11-19 11:30:06 UTC (rev 5388)
@@ -28,11 +28,9 @@
 import org.jboss.messaging.core.remoting.spi.ConnectorFactory;
 
 /**
- * 
  * A ClientSessionFactory
- * 
+ *
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
  */
 public interface ClientSessionFactory
 {
@@ -43,8 +41,14 @@
                                boolean xa,
                                boolean autoCommitSends,
                                boolean autoCommitAcks,
+                               final boolean preCommitAcks,
                                int ackBatchSize) throws MessagingException;
 
+   ClientSession createSession(final boolean xa,
+                               final boolean autoCommitSends,
+                               final boolean autoCommitAcks,
+                               final boolean preCommitAcks) throws MessagingException;
+
    void setConsumerWindowSize(int size);
 
    int getConsumerWindowSize();

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java	2008-11-19 07:23:23 UTC (rev 5387)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java	2008-11-19 11:30:06 UTC (rev 5388)
@@ -83,6 +83,8 @@
    
    public static final int DEFAULT_ACK_BATCH_SIZE = 1024 * 1024;
 
+   public static final boolean DEFAULT_PRE_COMMIT_ACKS = false;
+
    // Attributes
    // -----------------------------------------------------------------------------------
 
@@ -127,6 +129,8 @@
    
    private volatile int ackBatchSize;
 
+   private volatile boolean preCommitAcks;
+
    private final Set<ClientSessionInternal> sessions = new HashSet<ClientSessionInternal>();
    
    private final Object exitLock = new Object();
@@ -161,6 +165,7 @@
                                    final boolean blockOnPersistentSend,
                                    final boolean autoGroup,
                                    final int maxConnections,
+                                   final boolean preCommitAcks,
                                    final int ackBatchSize)
    {
       connectorFactory = instantiateConnectorFactory(connectorConfig.getFactoryClassName());
@@ -204,6 +209,7 @@
       this.minLargeMessageSize = minLargeMessageSize;
       this.autoGroup = autoGroup;
       this.maxConnections = maxConnections;
+      this.preCommitAcks = preCommitAcks;
       this.ackBatchSize = ackBatchSize;
    }
 
@@ -224,6 +230,7 @@
            DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND,
            DEFAULT_AUTO_GROUP,
            DEFAULT_MAX_CONNECTIONS,
+           DEFAULT_PRE_COMMIT_ACKS,
            DEFAULT_ACK_BATCH_SIZE);
    }
 
@@ -242,19 +249,28 @@
                                       final String password,
                                       final boolean xa,
                                       final boolean autoCommitSends,
-                                      final boolean autoCommitAcks,                        
+                                      final boolean autoCommitAcks,
+                                      final boolean preCommitAcks,
                                       final int ackBatchSize) throws MessagingException
    {
-      return createSessionInternal(username, password, xa, autoCommitSends, autoCommitAcks, ackBatchSize);
+      return createSessionInternal(username, password, xa, autoCommitSends, autoCommitAcks, preCommitAcks, ackBatchSize);
    }
 
    public ClientSession createSession(final boolean xa,
                                       final boolean autoCommitSends,
                                       final boolean autoCommitAcks) throws MessagingException
    {
-      return createSessionInternal(null, null, xa, autoCommitSends, autoCommitAcks, ackBatchSize);
+      return createSessionInternal(null, null, xa, autoCommitSends, autoCommitAcks, false, ackBatchSize);
    }
 
+   public ClientSession createSession(final boolean xa,
+                                      final boolean autoCommitSends,
+                                      final boolean autoCommitAcks,
+                                      final boolean preCommitAcks) throws MessagingException
+   {
+      return createSessionInternal(null, null, xa, autoCommitSends, autoCommitAcks, preCommitAcks, ackBatchSize);
+   }
+
    public int getConsumerWindowSize()
    {
       return consumerWindowSize;
@@ -553,7 +569,8 @@
                                                final String password,
                                                final boolean xa,
                                                final boolean autoCommitSends,
-                                               final boolean autoCommitAcks,                                             
+                                               final boolean autoCommitAcks,
+                                               final boolean preCommitAcks,
                                                final int ackBatchSize) throws MessagingException
    {
       synchronized (createSessionLock)
@@ -608,6 +625,7 @@
                                                          xa,
                                                          autoCommitSends,
                                                          autoCommitAcks,
+                                                         preCommitAcks,
                                                          sendWindowSize);
       
                Packet pResponse = channel1.sendBlocking(request);
@@ -638,6 +656,7 @@
                                                                         xa,                                                                      
                                                                         autoCommitSends,
                                                                         autoCommitAcks,
+                                                                        preCommitAcks,
                                                                         blockOnAcknowledge,
                                                                         autoGroup,
                                                                         ackBatchSize,

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2008-11-19 07:23:23 UTC (rev 5387)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2008-11-19 11:30:06 UTC (rev 5388)
@@ -135,6 +135,8 @@
 
    private final boolean autoCommitAcks;
 
+   private final boolean preCommitAcks;
+
    private final boolean autoCommitSends;
 
    private final boolean blockOnAcknowledge;
@@ -161,6 +163,7 @@
                             final boolean xa,
                             final boolean autoCommitSends,
                             final boolean autoCommitAcks,
+                            final boolean preCommitAcks,
                             final boolean blockOnAcknowledge,
                             final boolean autoGroup,
                             final int ackBatchSize,
@@ -183,6 +186,8 @@
 
       this.autoCommitAcks = autoCommitAcks;
 
+      this.preCommitAcks = preCommitAcks;
+
       this.autoCommitSends = autoCommitSends;
 
       this.blockOnAcknowledge = blockOnAcknowledge;
@@ -551,6 +556,11 @@
    // This acknowledges all messages received by the consumer so far
    public void acknowledge(final long consumerID, final long messageID) throws MessagingException
    {
+      //if we've pre commited then we don't need to do anything
+      if(preCommitAcks)
+      {
+         return;
+      }
       checkClosed();
 
       SessionAcknowledgeMessage message = new SessionAcknowledgeMessage(consumerID, messageID, blockOnAcknowledge);

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CreateSessionMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CreateSessionMessage.java	2008-11-19 07:23:23 UTC (rev 5387)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CreateSessionMessage.java	2008-11-19 11:30:06 UTC (rev 5388)
@@ -53,9 +53,11 @@
    private boolean autoCommitSends;
    
    private boolean autoCommitAcks;
+
+   private boolean preCommitAcks;
    
    private int windowSize;
-   
+
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
@@ -64,7 +66,7 @@
                                final int version, final String username, final String password,
                                final int minLargeMessageSize, 
                                final boolean xa, final boolean autoCommitSends,
-                               final boolean autoCommitAcks, final int windowSize)
+                               final boolean autoCommitAcks, final boolean preCommitAcks, final int windowSize)
    {
       super(CREATESESSION);
       
@@ -87,6 +89,8 @@
       this.autoCommitAcks = autoCommitAcks;
       
       this.windowSize = windowSize;
+
+      this.preCommitAcks = preCommitAcks;
    }
    
    public CreateSessionMessage()
@@ -135,7 +139,12 @@
    {
       return this.autoCommitAcks;
    }
-   
+
+   public boolean isPreCommitAcks()
+   {
+      return preCommitAcks;
+   }
+
    public int getWindowSize()
    {
       return this.windowSize;
@@ -153,6 +162,7 @@
       buffer.putBoolean(autoCommitSends);
       buffer.putBoolean(autoCommitAcks);
       buffer.putInt(windowSize);
+      buffer.putBoolean(preCommitAcks);
    }
    
    public void decodeBody(final MessagingBuffer buffer)
@@ -167,6 +177,7 @@
       autoCommitSends = buffer.getBoolean();
       autoCommitAcks = buffer.getBoolean();
       windowSize = buffer.getInt();
+      preCommitAcks = buffer.getBoolean();
    }
    
    public boolean equals(Object other)

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ReplicateCreateSessionMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ReplicateCreateSessionMessage.java	2008-11-19 07:23:23 UTC (rev 5387)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ReplicateCreateSessionMessage.java	2008-11-19 11:30:06 UTC (rev 5388)
@@ -53,6 +53,8 @@
    private boolean autoCommitSends;
 
    private boolean autoCommitAcks;
+
+   private boolean preCommitAcks;
    
    private int windowSize;
 
@@ -69,6 +71,7 @@
                                         final boolean xa,
                                         final boolean autoCommitSends,
                                         final boolean autoCommitAcks,
+                                        final boolean preCommitAcks,
                                         final int windowSize)
    {
       super(REPLICATE_CREATESESSION);
@@ -90,7 +93,9 @@
       this.autoCommitSends = autoCommitSends;
 
       this.autoCommitAcks = autoCommitAcks;
-      
+
+      this.preCommitAcks = preCommitAcks;
+
       this.windowSize = windowSize;
    }
 
@@ -145,7 +150,12 @@
    {
       return this.autoCommitAcks;
    }
-   
+
+   public boolean isPreCommitAcks()
+   {
+      return preCommitAcks;
+   }
+
    public int getWindowSize()
    {
       return this.windowSize;
@@ -163,6 +173,7 @@
       buffer.putBoolean(autoCommitSends);
       buffer.putBoolean(autoCommitAcks);
       buffer.putInt(windowSize);
+      buffer.putBoolean(preCommitAcks);
    }
 
    public void decodeBody(final MessagingBuffer buffer)
@@ -177,6 +188,7 @@
       autoCommitSends = buffer.getBoolean();
       autoCommitAcks = buffer.getBoolean();
       windowSize = buffer.getInt();
+      preCommitAcks = buffer.getBoolean();
    }
 
    public boolean equals(Object other)

Modified: trunk/src/main/org/jboss/messaging/core/server/MessagingServer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/MessagingServer.java	2008-11-19 07:23:23 UTC (rev 5387)
+++ trunk/src/main/org/jboss/messaging/core/server/MessagingServer.java	2008-11-19 11:30:06 UTC (rev 5388)
@@ -73,6 +73,7 @@
                                               RemotingConnection remotingConnection,
                                               boolean autoCommitSends,
                                               boolean autoCommitAcks,
+                                              boolean preCommitAcks,
                                               boolean xa,
                                               int sendWindowSize) throws Exception;
 
@@ -85,6 +86,7 @@
                                                        RemotingConnection remotingConnection,
                                                        boolean autoCommitSends,
                                                        boolean autoCommitAcks,
+                                                       boolean preCommitAcks,
                                                        boolean xa,
                                                        int sendWindowSize) throws Exception;
 

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2008-11-19 07:23:23 UTC (rev 5387)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2008-11-19 11:30:06 UTC (rev 5388)
@@ -72,7 +72,7 @@
 
 /**
  * The messaging server implementation
- * 
+ *
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  * @author <a href="mailto:ataylor at redhat.com>Andy Taylor</a>
  * @version <tt>$Revision: 3543 $</tt> <p/> $Id: ServerPeer.java 3543 2008-01-07 22:31:58Z clebert.suconic at jboss.com $
@@ -264,7 +264,7 @@
                                                         "\"",
                                                e);
          }
-         
+
          Map<String, Object> backupConnectorParams = backupConnector.getParams();
 
          // TODO don't hardcode ping interval and code timeout
@@ -285,25 +285,25 @@
                                                  scheduledExecutor);
 
          clusterManager.start();
-         
+
          //Deploy the cluster artifacts
-         
+
          for (BroadcastGroupConfiguration config: configuration.getBroadcastGroupConfigurations())
          {
             clusterManager.deployBroadcastGroup(config);
          }
-         
+
          for (DiscoveryGroupConfiguration config: configuration.getDiscoveryGroupConfigurations())
          {
             clusterManager.deployDiscoveryGroup(config);
          }
-         
+
          for (MessageFlowConfiguration config: configuration.getMessageFlowConfigurations())
          {
             clusterManager.deployMessageFlow(config);
          }
       }
-            
+
       started = true;
    }
 
@@ -525,17 +525,18 @@
    }
 
    public CreateSessionResponseMessage replicateCreateSession(final String name,
-                                                                           final long channelID,
-                                                                           final String username,
-                                                                           final String password,
-                                                                           final int minLargeMessageSize,
-                                                                           final int incrementingVersion,
-                                                                           final RemotingConnection connection,
-                                                                           final boolean autoCommitSends,
-                                                                           final boolean autoCommitAcks,
-                                                                           final boolean xa,
-                                                                           final int sendWindowSize) throws Exception
-{
+                                                              final long channelID,
+                                                              final String username,
+                                                              final String password,
+                                                              final int minLargeMessageSize,
+                                                              final int incrementingVersion,
+                                                              final RemotingConnection connection,
+                                                              final boolean autoCommitSends,
+                                                              final boolean autoCommitAcks,
+                                                              final boolean preCommitAcks,
+                                                              final boolean xa,
+                                                              final int sendWindowSize) throws Exception
+   {
       return doCreateSession(name,
                              channelID,
                              username,
@@ -545,6 +546,7 @@
                              connection,
                              autoCommitSends,
                              autoCommitAcks,
+                             preCommitAcks,
                              xa,
                              sendWindowSize);
    }
@@ -558,6 +560,7 @@
                                                      final RemotingConnection connection,
                                                      final boolean autoCommitSends,
                                                      final boolean autoCommitAcks,
+                                                     final boolean preCommitAcks,
                                                      final boolean xa,
                                                      final int sendWindowSize) throws Exception
    {
@@ -572,6 +575,7 @@
                              connection,
                              autoCommitSends,
                              autoCommitAcks,
+                             preCommitAcks,
                              xa,
                              sendWindowSize);
    }
@@ -632,11 +636,12 @@
                                                         final long channelID,
                                                         final String username,
                                                         final String password,
-                                                        final int minLargeMessageSize, 
+                                                        final int minLargeMessageSize,
                                                         final int incrementingVersion,
                                                         final RemotingConnection connection,
                                                         final boolean autoCommitSends,
                                                         final boolean autoCommitAcks,
+                                                        final boolean preCommitAcks,
                                                         final boolean xa,
                                                         final int sendWindowSize) throws Exception
    {
@@ -677,6 +682,7 @@
                                                               minLargeMessageSize,
                                                               autoCommitSends,
                                                               autoCommitAcks,
+                                                              preCommitAcks,
                                                               xa,
                                                               connection,
                                                               storageManager,

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java	2008-11-19 07:23:23 UTC (rev 5387)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java	2008-11-19 11:30:06 UTC (rev 5388)
@@ -72,6 +72,7 @@
                                                                msg.isXA(),
                                                                msg.isAutoCommitSends(),
                                                                msg.isAutoCommitAcks(),
+                                                               msg.isPreCommitAcks(),
                                                                msg.getWindowSize());
          
          result = channel1.replicatePacket(replPacket);
@@ -100,6 +101,7 @@
                                                connection,
                                                request.isAutoCommitSends(),
                                                request.isAutoCommitAcks(),
+                                               request.isPreCommitAcks(),
                                                request.isXA(),
                                                request.getWindowSize());
                break;
@@ -117,6 +119,7 @@
                                                         connection,
                                                         request.isAutoCommitSends(),
                                                         request.isAutoCommitAcks(),
+                                                        request.isPreCommitAcks(),
                                                         request.isXA(),
                                                         request.getWindowSize());
                break;

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2008-11-19 07:23:23 UTC (rev 5387)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2008-11-19 11:30:06 UTC (rev 5388)
@@ -120,6 +120,8 @@
 
    private volatile boolean closed;
 
+   private final boolean preCommitAcks;
+
    // Constructors
    // ---------------------------------------------------------------------------------
 
@@ -133,7 +135,8 @@
                              final HierarchicalRepository<QueueSettings> queueSettingsRepository,
                              final PostOffice postOffice,
                              final Channel channel,
-                             final PagingManager pager)
+                             final PagingManager pager,
+                             final boolean preCommitAcks)
    {
       this.id = id;
 
@@ -157,6 +160,8 @@
       
       this.pager = pager;
 
+      this.preCommitAcks = preCommitAcks;
+
       messageQueue.addConsumer(this);
       
       this.minLargeMessageSize = session.getMinLargeMessageSize();
@@ -375,7 +380,7 @@
       while (ref.getMessage().getMessageID() != messageID);
 
    }
-      
+
    public MessageReference getExpired(final long messageID) throws Exception
    {
       if (browseOnly)
@@ -537,7 +542,7 @@
             return HandleStatus.NO_MATCH;
          }
 
-         if (!browseOnly)
+         if (!browseOnly || preCommitAcks)
          {
             deliveringRefs.add(ref);
          }
@@ -554,6 +559,11 @@
             sendRegularMessage(ref, message);
          }
 
+         if(preCommitAcks)
+         {
+            doAck(ref);
+         }
+
          return HandleStatus.HANDLED;
       }
       finally

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2008-11-19 07:23:23 UTC (rev 5387)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2008-11-19 11:30:06 UTC (rev 5388)
@@ -164,6 +164,8 @@
 
    private final boolean autoCommitAcks;
 
+   private final boolean preCommitAcks;
+
    private volatile RemotingConnection remotingConnection;
 
    private final Map<Long, ServerConsumer> consumers = new ConcurrentHashMap<Long, ServerConsumer>();
@@ -211,6 +213,7 @@
                             final int minLargeMessageSize,
                             final boolean autoCommitSends,
                             final boolean autoCommitAcks,
+                            final boolean preCommitAcks,
                             final boolean xa,
                             final RemotingConnection remotingConnection,
                             final StorageManager storageManager,
@@ -236,6 +239,8 @@
 
       this.autoCommitAcks = autoCommitAcks;
 
+      this.preCommitAcks = preCommitAcks;
+
       this.remotingConnection = remotingConnection;
 
       this.storageManager = storageManager;
@@ -392,7 +397,8 @@
                                                           queueSettingsRepository,
                                                           postOffice,
                                                           channel,
-                                                          pager);
+                                                          pager,
+                                                          preCommitAcks);
 
          consumers.put(consumer.getID(), consumer);
 

Modified: trunk/src/main/org/jboss/messaging/jms/client/JBossConnection.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/client/JBossConnection.java	2008-11-19 07:23:23 UTC (rev 5387)
+++ trunk/src/main/org/jboss/messaging/jms/client/JBossConnection.java	2008-11-19 11:30:06 UTC (rev 5388)
@@ -267,7 +267,7 @@
             ClientSession session = null;
             try
             {
-               session = sessionFactory.createSession(username, password, false, true, true, 0);
+               session = sessionFactory.createSession(username, password, false, true, true, false, 0);
 
                // Remove any temporary queues and addresses
 
@@ -455,20 +455,24 @@
 
          if (acknowledgeMode == Session.SESSION_TRANSACTED)
          {
-            session = sessionFactory.createSession(username, password, isXA, false, false, transactionBatchSize);
+            session = sessionFactory.createSession(username, password, isXA, false, false, false, transactionBatchSize);
          }
          else if (acknowledgeMode == Session.AUTO_ACKNOWLEDGE)
          {
-            session = sessionFactory.createSession(username, password, isXA, true, true, 0);
+            session = sessionFactory.createSession(username, password, isXA, true, true, false, 0);
          }
          else if (acknowledgeMode == Session.DUPS_OK_ACKNOWLEDGE)
          {
-            session = sessionFactory.createSession(username, password, isXA, true, true, dupsOKBatchSize);
+            session = sessionFactory.createSession(username, password, isXA, true, true, false, dupsOKBatchSize);
          }
          else if (acknowledgeMode == Session.CLIENT_ACKNOWLEDGE)
          {
-            session = sessionFactory.createSession(username, password, isXA, true, false, transactionBatchSize);
+            session = sessionFactory.createSession(username, password, isXA, true, false, false, transactionBatchSize);
          }
+         else if (acknowledgeMode == JBossSession.SERVER_ACKNOWLEDGE)
+         {
+            session = sessionFactory.createSession(username, password, isXA, true, false, true, transactionBatchSize);
+         }
          else
          {
             throw new IllegalArgumentException("Invalid ackmode: " + acknowledgeMode);

Modified: trunk/src/main/org/jboss/messaging/jms/client/JBossConnectionFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/client/JBossConnectionFactory.java	2008-11-19 07:23:23 UTC (rev 5387)
+++ trunk/src/main/org/jboss/messaging/jms/client/JBossConnectionFactory.java	2008-11-19 11:30:06 UTC (rev 5388)
@@ -98,6 +98,8 @@
 
    private final int maxConnections;
 
+   private final boolean preCommitAcks;
+
    // Constructors ---------------------------------------------------------------------------------
 
    public JBossConnectionFactory(final TransportConfiguration connectorConfig,
@@ -116,7 +118,8 @@
                                  final boolean blockOnNonPersistentSend,
                                  final boolean blockOnPersistentSend,
                                  final boolean autoGroup,
-                                 final int maxConnections)
+                                 final int maxConnections,
+                                 final boolean preCommitAcks)
    {
       this.connectorConfig = connectorConfig;
       this.backupConnectorConfig = backupConnectorConfig;
@@ -135,6 +138,7 @@
       this.blockOnPersistentSend = blockOnPersistentSend;
       this.autoGroup = autoGroup;
       this.maxConnections = maxConnections;
+      this.preCommitAcks = preCommitAcks;
    }
 
    // ConnectionFactory implementation -------------------------------------------------------------
@@ -311,6 +315,7 @@
                                                        blockOnPersistentSend,
                                                        autoGroup,
                                                        maxConnections,
+                                                       preCommitAcks,
                                                        DEFAULT_ACK_BATCH_SIZE);
 
       }
@@ -323,7 +328,7 @@
 
          try
          {
-            sess = sessionFactory.createSession(username, password, false, false, false, 0);
+            sess = sessionFactory.createSession(username, password, false, false, false, false, 0);
          }
          catch (MessagingException e)
          {

Modified: trunk/src/main/org/jboss/messaging/jms/client/JBossSession.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/client/JBossSession.java	2008-11-19 07:23:23 UTC (rev 5387)
+++ trunk/src/main/org/jboss/messaging/jms/client/JBossSession.java	2008-11-19 11:30:06 UTC (rev 5388)
@@ -96,6 +96,7 @@
 
    public static final int TYPE_TOPIC_SESSION = 2;
 
+   public static final int SERVER_ACKNOWLEDGE = 4;
    // Static --------------------------------------------------------
 
    private static final Logger log = Logger.getLogger(JBossSession.class);
@@ -118,6 +119,7 @@
 
    private final Set<JBossMessageConsumer> consumers = new HashSet<JBossMessageConsumer>();
 
+
    // Constructors --------------------------------------------------
 
    public JBossSession(final JBossConnection connection,

Modified: trunk/src/main/org/jboss/messaging/jms/server/JMSServerManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/JMSServerManager.java	2008-11-19 07:23:23 UTC (rev 5387)
+++ trunk/src/main/org/jboss/messaging/jms/server/JMSServerManager.java	2008-11-19 11:30:06 UTC (rev 5388)
@@ -123,6 +123,7 @@
                                    boolean blockOnPersistentSend,
                                    boolean autoGroup,
                                    int maxConnections,
+                                   boolean preCommitAcks,
                                    String jndiBinding) throws Exception;
 
    boolean createConnectionFactory(String name,
@@ -143,6 +144,7 @@
                                    boolean blockOnPersistentSend,
                                    boolean autoGroupId,
                                    int maxConnections,
+                                   boolean preCommitAcks,
                                    List<String> jndiBinding) throws Exception;
 
    /**

Modified: trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerDeployer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerDeployer.java	2008-11-19 07:23:23 UTC (rev 5387)
+++ trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerDeployer.java	2008-11-19 11:30:06 UTC (rev 5388)
@@ -67,6 +67,8 @@
    
    private static final String MAX_CONNECTIONS_ELEMENT = "max-connections";
 
+   private static final String PRE_COMMIT_ACKS_ELEMENT = "pre-commit-acks";
+
    private static final String CONNECTOR_ELEMENT = "connector";
 
    private static final String BACKUP_CONNECTOR_ELEMENT = "backup-connector";
@@ -145,6 +147,7 @@
          boolean blockOnPersistentSend = ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_PERSISTENT_SEND;
          boolean autoGroup = ClientSessionFactoryImpl.DEFAULT_AUTO_GROUP;
          int maxConnections = ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS;
+         boolean preCommitAcks = ClientSessionFactoryImpl.DEFAULT_PRE_COMMIT_ACKS;
          List<String> jndiBindings = new ArrayList<String>();
          String connectorFactoryClassName = null;
          Map<String, Object> params = new HashMap<String, Object>();
@@ -213,6 +216,10 @@
             {
                maxConnections = Integer.parseInt(children.item(j).getTextContent().trim());
             }
+            else if(PRE_COMMIT_ACKS_ELEMENT.equalsIgnoreCase(children.item(j).getNodeName()))
+            {
+               preCommitAcks = Boolean.parseBoolean(children.item(j).getTextContent().trim());;
+            }
             else if (ENTRY_NODE_NAME.equalsIgnoreCase(children.item(j).getNodeName()))
             {
                String jndiName = children.item(j).getAttributes().getNamedItem("name").getNodeValue();
@@ -420,6 +427,7 @@
                                                   blockOnPersistentSend,
                                                   autoGroup,
                                                   maxConnections,
+                                                  preCommitAcks,
                                                   jndiBindings);
       }
       else if (node.getNodeName().equals(QUEUE_NODE_NAME))

Modified: trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerManagerImpl.java	2008-11-19 07:23:23 UTC (rev 5387)
+++ trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerManagerImpl.java	2008-11-19 11:30:06 UTC (rev 5388)
@@ -55,7 +55,7 @@
 /**
  * A Deployer used to create and add to JNDI queues, topics and connection
  * factories. Typically this would only be used in an app server env.
- * 
+ *
  * @author <a href="ataylor at redhat.com">Andy Taylor</a>
  * @author <a href="jmesnil at redhat.com">Jeff Mesnil</a>
  */
@@ -86,10 +86,10 @@
 
    public static JMSServerManagerImpl newJMSServerManagerImpl(MessagingServer server) throws Exception
    {
-      MessagingServerControlMBean control = new MessagingServerControl(server.getPostOffice(), server.getStorageManager(), server.getConfiguration(), 
+      MessagingServerControlMBean control = new MessagingServerControl(server.getPostOffice(), server.getStorageManager(), server.getConfiguration(),
                                                                        server.getQueueSettingsRepository(), server.getResourceManager(), server, new MessageCounterManagerImpl(1000), new NotificationBroadcasterSupport());
       JMSManagementService jmsManagementService = new JMSManagementServiceImpl(server.getManagementService());
-      return new JMSServerManagerImpl(control, server.getPostOffice(), server.getStorageManager(), 
+      return new JMSServerManagerImpl(control, server.getPostOffice(), server.getStorageManager(),
                                       server.getQueueSettingsRepository(), jmsManagementService);
    }
 
@@ -219,6 +219,7 @@
                                           boolean blockOnPersistentSend,
                                           boolean autoGroup,
                                           int maxConnections,
+                                          boolean preCommitAcks,
                                           String jndiBinding) throws Exception
    {
       ArrayList<String> bindings = new ArrayList<String>(1);
@@ -242,6 +243,7 @@
                                      blockOnPersistentSend,
                                      autoGroup,
                                      maxConnections,
+                                     preCommitAcks,
                                      bindings);
    }
 
@@ -263,6 +265,7 @@
                                           boolean blockOnPersistentSend,
                                           boolean autoGroup,
                                           int maxConnections,
+                                          boolean preCommitAcks,
                                           List<String> jndiBindings) throws Exception
    {
       JBossConnectionFactory cf = connectionFactories.get(name);
@@ -285,7 +288,8 @@
                                          blockOnNonPersistentSend,
                                          blockOnPersistentSend,
                                          autoGroup,
-                                         maxConnections);
+                                         maxConnections,
+                                         preCommitAcks);
       }
       for (String jndiBinding : jndiBindings)
       {

Modified: trunk/src/main/org/jboss/messaging/jms/server/management/JMSServerControlMBean.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/management/JMSServerControlMBean.java	2008-11-19 07:23:23 UTC (rev 5387)
+++ trunk/src/main/org/jboss/messaging/jms/server/management/JMSServerControlMBean.java	2008-11-19 11:30:06 UTC (rev 5388)
@@ -99,6 +99,8 @@
                                 boolean autoGroup,
                                 @Parameter(name = "maxConnections", desc = "The maximum number of physical connections created per client using this connection factory. Sessions created will be assigned a connection in a round-robin fashion")
                                 int maxConnections,
+                                @Parameter(name = "preCommitAcks", desc = "If the server will acknowledge delivery of a message before it is sent")
+                                boolean preCommitAcks,
                                 @Parameter(name = "jndiBinding", desc = "JNDI Binding")
                                 String jndiBinding) throws Exception;
 

Modified: trunk/src/main/org/jboss/messaging/jms/server/management/impl/JMSServerControl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/management/impl/JMSServerControl.java	2008-11-19 07:23:23 UTC (rev 5387)
+++ trunk/src/main/org/jboss/messaging/jms/server/management/impl/JMSServerControl.java	2008-11-19 11:30:06 UTC (rev 5388)
@@ -94,6 +94,7 @@
                                        boolean blockOnPersistentSend,
                                        boolean autoGroup,
                                        int maxConnections,
+                                       boolean preCommitAcks,
                                        String jndiBinding) throws Exception
    {
       List<String> bindings = new ArrayList<String>();
@@ -117,6 +118,7 @@
                                                        blockOnPersistentSend,
                                                        autoGroup,
                                                        maxConnections,
+                                                       preCommitAcks,
                                                        jndiBinding);
       if (created)
       {

Modified: trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/CTSMiscellaneousTest.java
===================================================================
--- trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/CTSMiscellaneousTest.java	2008-11-19 07:23:23 UTC (rev 5387)
+++ trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/CTSMiscellaneousTest.java	2008-11-19 11:30:06 UTC (rev 5388)
@@ -105,6 +105,7 @@
                                                        true,
                                                        ClientSessionFactoryImpl.DEFAULT_AUTO_GROUP,
                                                        ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS,
+                                                       ClientSessionFactoryImpl.DEFAULT_PRE_COMMIT_ACKS,
                                                        "/StrictTCKConnectionFactory");
 
          cf = (JBossConnectionFactory)getInitialContext().lookup("/StrictTCKConnectionFactory");

Modified: trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/JMSTestCase.java
===================================================================
--- trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/JMSTestCase.java	2008-11-19 07:23:23 UTC (rev 5387)
+++ trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/JMSTestCase.java	2008-11-19 11:30:06 UTC (rev 5388)
@@ -55,6 +55,7 @@
                                                     true,
                                                     ClientSessionFactoryImpl.DEFAULT_AUTO_GROUP,
                                                     ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS,
+                                                    ClientSessionFactoryImpl.DEFAULT_PRE_COMMIT_ACKS,
                                                     "/testsuitecf");
       
       cf = (JBossConnectionFactory)getInitialContext().lookup("/testsuitecf");

Modified: trunk/tests/jms-tests/src/org/jboss/test/messaging/tools/container/LocalTestServer.java
===================================================================
--- trunk/tests/jms-tests/src/org/jboss/test/messaging/tools/container/LocalTestServer.java	2008-11-19 07:23:23 UTC (rev 5387)
+++ trunk/tests/jms-tests/src/org/jboss/test/messaging/tools/container/LocalTestServer.java	2008-11-19 11:30:06 UTC (rev 5388)
@@ -554,6 +554,7 @@
                                                     true,
                                                     false,
                                                     8,
+                                                    false,
                                                     jndiBindings);
    }
 

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/ChunkTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/ChunkTestBase.java	2008-11-19 07:23:23 UTC (rev 5387)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/ChunkTestBase.java	2008-11-19 11:30:06 UTC (rev 5388)
@@ -124,7 +124,7 @@
             sf.setBlockOnAcknowledge(true);
          }
 
-         ClientSession session = sf.createSession(null, null, false, true, true, 0);
+         ClientSession session = sf.createSession(null, null, false, true, true, false, 0);
 
          session.createQueue(ADDRESS, ADDRESS, null, true, false, true);
 
@@ -198,7 +198,7 @@
             sf = createInVMFactory();
          }
 
-         session = sf.createSession(null, null, false, true, true, 0);
+         session = sf.createSession(null, null, false, true, true, false, 0);
 
          ClientConsumer consumer = null;
 

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java	2008-11-19 07:23:23 UTC (rev 5387)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java	2008-11-19 11:30:06 UTC (rev 5388)
@@ -158,7 +158,7 @@
          mockFactory.setBlockOnPersistentSend(false);
          mockFactory.setBlockOnAcknowledge(false);
 
-         session = mockFactory.createSession(null, null, false, true, true, 0);
+         session = mockFactory.createSession(null, null, false, true, true, false,  0);
 
          callback.session = session;
 
@@ -296,7 +296,7 @@
 
          ClientSessionFactory sf = createInVMFactory();
 
-         ClientSession session = sf.createSession(null, null, false, true, true, 0);
+         ClientSession session = sf.createSession(null, null, false, true, true, false, 0);
 
          session.createQueue(ADDRESS, queue[0], null, true, false, true);
          session.createQueue(ADDRESS, queue[1], null, true, false, true);
@@ -387,7 +387,7 @@
 
          ClientSessionFactory sf = createInVMFactory();
 
-         ClientSession session = sf.createSession(null, null, false, true, true, 0);
+         ClientSession session = sf.createSession(null, null, false, true, true, false, 0);
 
          session.createQueue(ADDRESS, queue[0], null, true, false, true);
          session.createQueue(ADDRESS, queue[1], null, true, false, true);
@@ -417,7 +417,7 @@
 
             sf = createInVMFactory();
 
-            session = sf.createSession(null, null, false, true, true, 0);
+            session = sf.createSession(null, null, false, true, true, false, 0);
          }
 
          readMessage(session, queue[0], numberOfIntegers);
@@ -488,7 +488,7 @@
             sf.setBlockOnAcknowledge(true);
          }
 
-         ClientSession session = sf.createSession(null, null, false, true, true, 0);
+         ClientSession session = sf.createSession(null, null, false, true, true, false, 0);
 
          session.createQueue(ADDRESS, ADDRESS, null, true, false, true);
 
@@ -539,7 +539,7 @@
             sf = createInVMFactory();
          }
 
-         session = sf.createSession(null, null, false, true, true, 0);
+         session = sf.createSession(null, null, false, true, true, false, 0);
 
          ClientConsumer consumer = session.createConsumer(ADDRESS);
 
@@ -577,7 +577,7 @@
 
          session.close();
 
-         session = sf.createSession(null, null, false, true, true, 0);
+         session = sf.createSession(null, null, false, true, true, false, 0);
 
          readMessage(session, ADDRESS, numberOfIntegersBigMessage);
 

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/clientcrash/ClientCrashTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/clientcrash/ClientCrashTest.java	2008-11-19 07:23:23 UTC (rev 5387)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/clientcrash/ClientCrashTest.java	2008-11-19 11:30:06 UTC (rev 5388)
@@ -34,6 +34,7 @@
 import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS;
 import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRODUCER_MAX_RATE;
 import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_SEND_WINDOW_SIZE;
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRE_COMMIT_ACKS;
 
 import org.jboss.messaging.core.client.ClientConsumer;
 import org.jboss.messaging.core.client.ClientMessage;
@@ -145,7 +146,7 @@
    {
       super.setUp();
 
-      
+
       Configuration config = createDefaultConfig(true);
       config.setSecurityEnabled(false);
       messagingService = createService(false, config);
@@ -165,8 +166,9 @@
                                         DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND,
                                         DEFAULT_AUTO_GROUP,
                                         DEFAULT_MAX_CONNECTIONS,
+                                        DEFAULT_PRE_COMMIT_ACKS,
                                         DEFAULT_ACK_BATCH_SIZE);
-      
+
    }
 
    @Override

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/ReplicateConnectionFailureTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/ReplicateConnectionFailureTest.java	2008-11-19 07:23:23 UTC (rev 5387)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/ReplicateConnectionFailureTest.java	2008-11-19 11:30:06 UTC (rev 5388)
@@ -93,6 +93,7 @@
                                                                       ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_PERSISTENT_SEND,
                                                                       ClientSessionFactoryImpl.DEFAULT_AUTO_GROUP,
                                                                       ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS,
+                                                                      ClientSessionFactoryImpl.DEFAULT_PRE_COMMIT_ACKS,
                                                                       ClientSessionFactoryImpl.DEFAULT_ACK_BATCH_SIZE);
       
 

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/consumer/ConsumerTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/consumer/ConsumerTest.java	2008-11-19 07:23:23 UTC (rev 5387)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/consumer/ConsumerTest.java	2008-11-19 11:30:06 UTC (rev 5388)
@@ -408,6 +408,112 @@
       session.close();
    }
 
+   public void testConsumerAckImmediateAutoCommitTrue() throws Exception
+   {
+
+      ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
+
+      ClientSession session = sf.createSession(false, true, true, true);
+
+      session.createQueue(QUEUE, QUEUE, null, false, false, true);
+
+      ClientProducer producer = session.createProducer(QUEUE);
+
+      final int numMessages = 100;
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = createMessage(session, "m" + i);
+         producer.send(message);
+      }
+
+      ClientConsumer consumer = session.createConsumer(QUEUE);
+      session.start();
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message2 = consumer.receive(1000);
+
+         assertEquals("m" + i, message2.getBody().getString());
+      }
+      // assert that all the messages are there and none have been acked
+      assertEquals(messagingService.getServer().getPostOffice().getBinding(QUEUE).getQueue().getDeliveringCount(), 0);
+      assertEquals(messagingService.getServer().getPostOffice().getBinding(QUEUE).getQueue().getMessageCount(), 0);
+
+      session.close();
+   }
+
+   public void testConsumerAckImmediateAutoCommitFalse() throws Exception
+   {
+
+      ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
+
+      ClientSession session = sf.createSession(false, true, false, true);
+
+      session.createQueue(QUEUE, QUEUE, null, false, false, true);
+
+      ClientProducer producer = session.createProducer(QUEUE);
+
+      final int numMessages = 100;
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = createMessage(session, "m" + i);
+         producer.send(message);
+      }
+
+      ClientConsumer consumer = session.createConsumer(QUEUE);
+      session.start();
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message2 = consumer.receive(1000);
+
+         assertEquals("m" + i, message2.getBody().getString());
+      }
+      // assert that all the messages are there and none have been acked
+      assertEquals(messagingService.getServer().getPostOffice().getBinding(QUEUE).getQueue().getDeliveringCount(), 0);
+      assertEquals(messagingService.getServer().getPostOffice().getBinding(QUEUE).getQueue().getMessageCount(), 0);
+
+      session.close();
+   }
+
+   public void testConsumerAckImmediateAckIgnored() throws Exception
+   {
+
+      ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
+
+      ClientSession session = sf.createSession(false, true, true, true);
+
+      session.createQueue(QUEUE, QUEUE, null, false, false, true);
+
+      ClientProducer producer = session.createProducer(QUEUE);
+
+      final int numMessages = 100;
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = createMessage(session, "m" + i);
+         producer.send(message);
+      }
+
+      ClientConsumer consumer = session.createConsumer(QUEUE);
+      session.start();
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message2 = consumer.receive(1000);
+
+         assertEquals("m" + i, message2.getBody().getString());
+         if(i < 50)
+         {
+            message2.acknowledge();
+         }
+      }
+      // assert that all the messages are there and none have been acked
+      assertEquals(messagingService.getServer().getPostOffice().getBinding(QUEUE).getQueue().getDeliveringCount(), 0);
+      assertEquals(messagingService.getServer().getPostOffice().getBinding(QUEUE).getQueue().getMessageCount(), 0);
+
+      session.close();
+   }
+
    private ClientMessage createMessage(ClientSession session, String msg)
    {
       ClientMessage message = session.createClientMessage(JBossTextMessage.TYPE,

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/jms/cluster/JMSFailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/jms/cluster/JMSFailoverTest.java	2008-11-19 07:23:23 UTC (rev 5387)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/jms/cluster/JMSFailoverTest.java	2008-11-19 11:30:06 UTC (rev 5388)
@@ -106,7 +106,8 @@
                                                                ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND,
                                                                ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_PERSISTENT_SEND,
                                                                ClientSessionFactoryImpl.DEFAULT_AUTO_GROUP,
-                                                               ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS);
+                                                               ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS,
+                                                               ClientSessionFactoryImpl.DEFAULT_PRE_COMMIT_ACKS);
 
       Connection conn = jbcf.createConnection();
 
@@ -185,7 +186,8 @@
                                                                    ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND,
                                                                    true,
                                                                    ClientSessionFactoryImpl.DEFAULT_AUTO_GROUP,
-                                                                   ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS);
+                                                                   ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS,
+                                                                   ClientSessionFactoryImpl.DEFAULT_PRE_COMMIT_ACKS);
 
       JBossConnectionFactory jbcfBackup = new JBossConnectionFactory(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
                                                                                                 backupParams),
@@ -204,7 +206,8 @@
                                                                      ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND,
                                                                      ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_PERSISTENT_SEND,
                                                                      ClientSessionFactoryImpl.DEFAULT_AUTO_GROUP,
-                                                                     ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS);
+                                                                     ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS,
+                                                                     ClientSessionFactoryImpl.DEFAULT_PRE_COMMIT_ACKS);
 
       Connection connLive = jbcfLive.createConnection();
 

Added: trunk/tests/src/org/jboss/messaging/tests/integration/jms/consumer/ConsumerTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/jms/consumer/ConsumerTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/jms/consumer/ConsumerTest.java	2008-11-19 11:30:06 UTC (rev 5388)
@@ -0,0 +1,137 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.tests.integration.jms.consumer;
+
+import junit.framework.TestCase;
+import org.jboss.messaging.jms.client.JBossConnectionFactory;
+import org.jboss.messaging.jms.client.JBossSession;
+import org.jboss.messaging.jms.server.impl.JMSServerManagerImpl;
+import org.jboss.messaging.jms.JBossQueue;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.config.impl.ConfigurationImpl;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.tests.integration.jms.management.NullInitialContext;
+import org.jboss.messaging.util.SimpleString;
+
+import javax.jms.Connection;
+import javax.jms.Session;
+import javax.jms.MessageProducer;
+import javax.jms.MessageConsumer;
+import javax.jms.Message;
+
+/**
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public class ConsumerTest extends TestCase
+{
+   private MessagingService service;
+
+   private JMSServerManagerImpl serverManager;
+
+   private JBossConnectionFactory cf;
+
+   private static final String Q_NAME = "ConsumerTestQueue";
+
+   private JBossQueue jBossQueue;
+
+   @Override
+   protected void setUp() throws Exception
+   {
+      Configuration conf = new ConfigurationImpl();
+      conf.setSecurityEnabled(false);
+      conf.setJMXManagementEnabled(true);
+      conf.getAcceptorConfigurations()
+            .add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory"));
+      service = MessagingServiceImpl.newNullStorageMessagingServer(conf);
+      service.start();
+      serverManager = JMSServerManagerImpl.newJMSServerManagerImpl(service.getServer());
+      serverManager.start();
+      serverManager.setInitialContext(new NullInitialContext());
+      serverManager.createQueue(Q_NAME, Q_NAME);
+      cf = new JBossConnectionFactory(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"),
+                                                             null,
+                                                             ClientSessionFactoryImpl.DEFAULT_PING_PERIOD,
+                                                             ClientSessionFactoryImpl.DEFAULT_CALL_TIMEOUT,
+                                                             null,
+                                                             ClientSessionFactoryImpl.DEFAULT_ACK_BATCH_SIZE,
+                                                             ClientSessionFactoryImpl.DEFAULT_ACK_BATCH_SIZE,
+                                                             ClientSessionFactoryImpl.DEFAULT_CONSUMER_WINDOW_SIZE,
+                                                             ClientSessionFactoryImpl.DEFAULT_CONSUMER_MAX_RATE,
+                                                             ClientSessionFactoryImpl.DEFAULT_SEND_WINDOW_SIZE,
+                                                             ClientSessionFactoryImpl.DEFAULT_PRODUCER_MAX_RATE,
+                                                             ClientSessionFactoryImpl.DEFAULT_BIG_MESSAGE_SIZE,
+                                                             ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_ACKNOWLEDGE,
+                                                             ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND,
+                                                             true,
+                                                             ClientSessionFactoryImpl.DEFAULT_AUTO_GROUP,
+                                                             ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS,
+                                                             true);
+   }
+
+   @Override
+   protected void tearDown() throws Exception
+   {
+      cf = null;
+      if (service != null && service.isStarted())
+      {
+         try
+         {
+            service.stop();
+         }
+         catch (Exception e)
+         {
+            e.printStackTrace();
+         }
+         service = null;
+
+      }
+   }
+
+   public void testPreCommitAcks() throws Exception
+   {
+      Connection conn = cf.createConnection();
+      Session session = conn.createSession(false, JBossSession.SERVER_ACKNOWLEDGE);
+      jBossQueue = new JBossQueue(Q_NAME);
+      MessageProducer producer = session.createProducer(jBossQueue);
+      MessageConsumer consumer = session.createConsumer(jBossQueue);
+      int noOfMessages = 100;
+      for(int i = 0; i < noOfMessages; i++)
+      {
+         producer.send(session.createTextMessage("m" + i));
+      }
+
+      conn.start();
+      for(int i = 0; i < noOfMessages; i++)
+      {
+         Message m = consumer.receive(500);
+         assertNotNull(m);
+      }
+       // assert that all the messages are there and none have been acked
+      SimpleString queueName = new SimpleString(JBossQueue.JMS_QUEUE_ADDRESS_PREFIX + Q_NAME);
+      assertEquals(service.getServer().getPostOffice().getBinding(queueName).getQueue().getDeliveringCount(), 0);
+      assertEquals(service.getServer().getPostOffice().getBinding(queueName).getQueue().getMessageCount(), 0);
+      session.close();
+   }
+}

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/jms/management/JMSQueueControlTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/jms/management/JMSQueueControlTest.java	2008-11-19 07:23:23 UTC (rev 5387)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/jms/management/JMSQueueControlTest.java	2008-11-19 11:30:06 UTC (rev 5388)
@@ -172,7 +172,8 @@
                                                              ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND,
                                                              true,
                                                              ClientSessionFactoryImpl.DEFAULT_AUTO_GROUP,
-                                                             ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS);
+                                                             ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS,
+                                                             ClientSessionFactoryImpl.DEFAULT_PRE_COMMIT_ACKS);
 
       Connection conn = cf.createConnection();
 

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/jms/management/JMSUtil.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/jms/management/JMSUtil.java	2008-11-19 07:23:23 UTC (rev 5387)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/jms/management/JMSUtil.java	2008-11-19 11:30:06 UTC (rev 5388)
@@ -73,7 +73,8 @@
                                                              ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND,
                                                              true,
                                                              ClientSessionFactoryImpl.DEFAULT_AUTO_GROUP,
-                                                             ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS);
+                                                             ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS,
+                                                             ClientSessionFactoryImpl.DEFAULT_PRE_COMMIT_ACKS);
 
       Connection conn = cf.createConnection();
 
@@ -105,7 +106,8 @@
                                                              ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND,
                                                              true,
                                                              ClientSessionFactoryImpl.DEFAULT_AUTO_GROUP,
-                                                             ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS);
+                                                             ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS,
+                                                             ClientSessionFactoryImpl.DEFAULT_PRE_COMMIT_ACKS);
 
       Connection conn = cf.createConnection();
 
@@ -133,7 +135,8 @@
                                                              ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND,
                                                              true,
                                                              ClientSessionFactoryImpl.DEFAULT_AUTO_GROUP,
-                                                             ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS);
+                                                             ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS,
+                                                             ClientSessionFactoryImpl.DEFAULT_PRE_COMMIT_ACKS);
 
       Connection conn = cf.createConnection();
 

Modified: trunk/tests/src/org/jboss/messaging/tests/stress/paging/PageStressTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/stress/paging/PageStressTest.java	2008-11-19 07:23:23 UTC (rev 5387)
+++ trunk/tests/src/org/jboss/messaging/tests/stress/paging/PageStressTest.java	2008-11-19 11:30:06 UTC (rev 5388)
@@ -90,7 +90,7 @@
 
          final int NUMBER_OF_MESSAGES = 60000;
          
-         session = factory.createSession(null, null, false, false, true, 1024 * NUMBER_OF_MESSAGES);
+         session = factory.createSession(null, null, false, false, true, false, 1024 * NUMBER_OF_MESSAGES);
 
          SimpleString address = new SimpleString("page-adr");
 




More information about the jboss-cvs-commits mailing list