[jboss-cvs] JBoss Messaging SVN: r5056 - in trunk: src/main/org/jboss/messaging/core/client and 14 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Oct 1 06:04:17 EDT 2008


Author: ataylor
Date: 2008-10-01 06:04:17 -0400 (Wed, 01 Oct 2008)
New Revision: 5056

Added:
   trunk/src/main/org/jboss/messaging/util/GroupIdGenerator.java
   trunk/src/main/org/jboss/messaging/util/SimpleStringIdGenerator.java
   trunk/tests/src/org/jboss/messaging/tests/integration/basic/AutoGroupClientTest.java
Modified:
   trunk/src/config/jbm-jndi.xml
   trunk/src/main/org/jboss/messaging/core/client/ClientSessionFactory.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateProducerMessage.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateProducerResponseMessage.java
   trunk/src/main/org/jboss/messaging/core/server/DistributionPolicy.java
   trunk/src/main/org/jboss/messaging/core/server/ServerSession.java
   trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/RoundRobinDistributionPolicy.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
   trunk/src/main/org/jboss/messaging/jms/client/JBossConnectionFactory.java
   trunk/src/main/org/jboss/messaging/jms/server/JMSServerManager.java
   trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerDeployer.java
   trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerManagerImpl.java
   trunk/src/main/org/jboss/messaging/jms/server/management/JMSServerControlMBean.java
   trunk/src/main/org/jboss/messaging/jms/server/management/impl/JMSServerControl.java
   trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/CTSMiscellaneousTest.java
   trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/JMSTestCase.java
   trunk/tests/jms-tests/src/org/jboss/test/messaging/tools/container/LocalTestServer.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java
Log:
https://jira.jboss.org/jira/browse/JBMESSAGING-1273 - implementing auto setting of message group id

Modified: trunk/src/config/jbm-jndi.xml
===================================================================
--- trunk/src/config/jbm-jndi.xml	2008-10-01 08:27:34 UTC (rev 5055)
+++ trunk/src/config/jbm-jndi.xml	2008-10-01 10:04:17 UTC (rev 5056)
@@ -65,6 +65,8 @@
       <send-np-messages-synchronously>true</send-np-messages-synchronously>
       <!--Whether we send persistent messages synchronously-->
       <send-p-messages-synchronously>true</send-p-messages-synchronously>
+      <!--If true, any connections will automatically set a unique group id (per producer) on every message sent-->
+      <auto-group-id>true</auto-group-id>
    </connection-factory>
    
    <connection-factory name="TestInVMConnectionFactory">

Modified: trunk/src/main/org/jboss/messaging/core/client/ClientSessionFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/ClientSessionFactory.java	2008-10-01 08:27:34 UTC (rev 5055)
+++ trunk/src/main/org/jboss/messaging/core/client/ClientSessionFactory.java	2008-10-01 10:04:17 UTC (rev 5056)
@@ -22,12 +22,12 @@
 
 package org.jboss.messaging.core.client;
 
-import java.util.Map;
-
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.remoting.spi.ConnectorFactory;
 
+import java.util.Map;
 
+
 /**
  * 
  * A ClientSessionFactory
@@ -72,6 +72,10 @@
    boolean isBlockOnAcknowledge();
    
    void setBlockOnAcknowledge(final boolean blocking);
+
+   boolean isAutoGroupId();
+
+   void setAutoGroupId(boolean autoGroupId);
    
    ConnectorFactory getConnectorFactory();
 

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java	2008-10-01 08:27:34 UTC (rev 5055)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java	2008-10-01 10:04:17 UTC (rev 5056)
@@ -12,12 +12,11 @@
 
 package org.jboss.messaging.core.client.impl;
 
-import java.util.concurrent.Semaphore;
-
 import org.jboss.messaging.core.client.AcknowledgementHandler;
 import org.jboss.messaging.core.client.ClientMessage;
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.message.impl.MessageImpl;
 import org.jboss.messaging.core.remoting.Channel;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionProducerCloseMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendManagementMessage;
@@ -25,6 +24,8 @@
 import org.jboss.messaging.util.SimpleString;
 import org.jboss.messaging.util.TokenBucketLimiter;
 
+import java.util.concurrent.Semaphore;
+
 /**
  * The client-side Producer connectionFactory class.
  * 
@@ -69,6 +70,8 @@
 
    private final int initialWindowSize;
 
+   private final SimpleString autoGroupId;
+
    // Static ---------------------------------------------------------------------------------------
 
    // Constructors ---------------------------------------------------------------------------------
@@ -79,6 +82,7 @@
                              final TokenBucketLimiter rateLimiter,
                              final boolean blockOnNonPersistentSend,
                              final boolean blockOnPersistentSend,
+                             final SimpleString autoGroupId,
                              final int initialCredits,
                              final Channel channel)
    {
@@ -96,6 +100,8 @@
 
       this.blockOnPersistentSend = blockOnPersistentSend;
 
+      this.autoGroupId = autoGroupId;
+
       availableCredits = new Semaphore(initialCredits);
 
       creditFlowControl = initialCredits != -1;
@@ -284,6 +290,11 @@
          rateLimiter.limit();
       }
 
+      if(autoGroupId != null)
+      {
+         msg.putStringProperty(MessageImpl.GROUP_ID, autoGroupId);
+      }
+
       boolean sendBlocking = msg.isDurable() ? blockOnPersistentSend : blockOnNonPersistentSend;
 
       SessionSendMessage message = new SessionSendMessage(id, msg, sendBlocking);

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java	2008-10-01 08:27:34 UTC (rev 5055)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java	2008-10-01 10:04:17 UTC (rev 5056)
@@ -11,20 +11,12 @@
  */
 package org.jboss.messaging.core.client.impl;
 
-import java.util.Map;
-import java.util.Set;
-
 import org.jboss.messaging.core.client.ClientSession;
 import org.jboss.messaging.core.config.TransportConfiguration;
 import org.jboss.messaging.core.config.impl.ConfigurationImpl;
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.remoting.Channel;
-import org.jboss.messaging.core.remoting.ChannelHandler;
-import org.jboss.messaging.core.remoting.ConnectionRegistry;
-import org.jboss.messaging.core.remoting.FailureListener;
-import org.jboss.messaging.core.remoting.Packet;
-import org.jboss.messaging.core.remoting.RemotingConnection;
+import org.jboss.messaging.core.remoting.*;
 import org.jboss.messaging.core.remoting.impl.ConnectionRegistryImpl;
 import org.jboss.messaging.core.remoting.impl.wireformat.CreateSessionMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.CreateSessionResponseMessage;
@@ -34,6 +26,9 @@
 import org.jboss.messaging.util.UUIDGenerator;
 import org.jboss.messaging.util.VersionLoader;
 
+import java.util.Map;
+import java.util.Set;
+
 /**
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
@@ -66,6 +61,8 @@
 
    public static final boolean DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND = false;
 
+   public static final boolean DEFAULT_AUTO_GROUP_ID = false;
+
    // Attributes
    // -----------------------------------------------------------------------------------
 
@@ -104,6 +101,8 @@
 
    private final Set<ClientSessionInternal> sessions = new ConcurrentHashSet<ClientSessionInternal>();
 
+   private volatile boolean autoGroupId;
+
    // Static
    // ---------------------------------------------------------------------------------------
 
@@ -123,7 +122,8 @@
                                    final int producerMaxRate,
                                    final boolean blockOnAcknowledge,
                                    final boolean blockOnNonPersistentSend,
-                                   final boolean blockOnPersistentSend)
+                                   final boolean blockOnPersistentSend,
+                                   final boolean autoGroupId)
    {
       connectorFactory = instantiateConnectorFactory(connectorConfig.getFactoryClassName());
       transportParams = connectorConfig.getParams();
@@ -141,6 +141,7 @@
       this.blockOnAcknowledge = blockOnAcknowledge;
       this.blockOnNonPersistentSend = blockOnNonPersistentSend;
       this.blockOnPersistentSend = blockOnPersistentSend;
+      this.autoGroupId = autoGroupId;
       connectionRegistry = ConnectionRegistryImpl.instance;
    }
 
@@ -163,6 +164,7 @@
       blockOnAcknowledge = DEFAULT_BLOCK_ON_ACKNOWLEDGE;
       blockOnPersistentSend = DEFAULT_BLOCK_ON_PERSISTENT_SEND;
       blockOnNonPersistentSend = DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND;
+      autoGroupId = DEFAULT_AUTO_GROUP_ID;
       connectionRegistry = ConnectionRegistryImpl.instance;
    }
 
@@ -182,6 +184,7 @@
       blockOnAcknowledge = DEFAULT_BLOCK_ON_ACKNOWLEDGE;
       blockOnPersistentSend = DEFAULT_BLOCK_ON_PERSISTENT_SEND;
       blockOnNonPersistentSend = DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND;
+      autoGroupId = DEFAULT_AUTO_GROUP_ID;
       connectionRegistry = ConnectionRegistryImpl.instance;
    }
 
@@ -281,6 +284,16 @@
       blockOnAcknowledge = blocking;
    }
 
+   public boolean isAutoGroupId()
+   {
+      return autoGroupId;
+   }
+
+   public void setAutoGroupId(boolean autoGroupId)
+   {
+      this.autoGroupId = autoGroupId;
+   }
+
    public ConnectorFactory getConnectorFactory()
    {
       return connectorFactory;
@@ -499,6 +512,7 @@
                                                                autoCommitSends,
                                                                autoCommitAcks,
                                                                blockOnAcknowledge,
+                                                               autoGroupId,
                                                                connection,
                                                                this,
                                                                response.getServerVersion(),

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2008-10-01 08:27:34 UTC (rev 5055)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2008-10-01 10:04:17 UTC (rev 5056)
@@ -11,74 +11,23 @@
  */
 package org.jboss.messaging.core.client.impl;
 
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
-
-import javax.transaction.xa.XAException;
-import javax.transaction.xa.XAResource;
-import javax.transaction.xa.Xid;
-
-import org.jboss.messaging.core.client.ClientBrowser;
-import org.jboss.messaging.core.client.ClientConsumer;
-import org.jboss.messaging.core.client.ClientMessage;
-import org.jboss.messaging.core.client.ClientProducer;
-import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.*;
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.remoting.Channel;
-import org.jboss.messaging.core.remoting.ConnectionRegistry;
-import org.jboss.messaging.core.remoting.FailureListener;
-import org.jboss.messaging.core.remoting.Packet;
-import org.jboss.messaging.core.remoting.RemotingConnection;
-import org.jboss.messaging.core.remoting.ResponseNotifier;
+import org.jboss.messaging.core.remoting.*;
 import org.jboss.messaging.core.remoting.impl.ConnectionRegistryImpl;
-import org.jboss.messaging.core.remoting.impl.wireformat.CloseSessionMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
-import org.jboss.messaging.core.remoting.impl.wireformat.ReattachSessionMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.ReattachSessionResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionAddDestinationMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionBindingQueryMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionBindingQueryResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerFlowCreditMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateBrowserMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateConsumerMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateConsumerResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateProducerMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateProducerResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateQueueMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionDeleteQueueMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionProcessedMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionRemoveDestinationMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXACommitMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAEndMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAForgetMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAGetInDoubtXidsResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAGetTimeoutResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAJoinMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAPrepareMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAResumeMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXARollbackMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXASetTimeoutMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXASetTimeoutResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAStartMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.*;
 import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
-import org.jboss.messaging.util.ExecutorFactory;
-import org.jboss.messaging.util.IDGenerator;
-import org.jboss.messaging.util.JBMThreadFactory;
-import org.jboss.messaging.util.OrderedExecutorFactory;
-import org.jboss.messaging.util.SimpleIDGenerator;
-import org.jboss.messaging.util.SimpleString;
-import org.jboss.messaging.util.TokenBucketLimiterImpl;
+import org.jboss.messaging.util.*;
 
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+
 /*
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  * 
@@ -139,6 +88,8 @@
 
    private final boolean blockOnAcknowledge;
 
+   private final boolean autoGroupId;
+
    private final Channel channel;
 
    private final int version;
@@ -149,7 +100,7 @@
    private boolean forceNotSameRM;
 
    private final IDGenerator idGenerator = new SimpleIDGenerator(0);
-   
+
    // Constructors ----------------------------------------------------------------------------  
 
    public ClientSessionImpl(final ClientSessionFactoryInternal sessionFactory,
@@ -159,6 +110,7 @@
                             final boolean autoCommitSends,
                             final boolean autoCommitAcks,
                             final boolean blockOnAcknowledge,
+                            final boolean autoGroupId,
                             final RemotingConnection remotingConnection,
                             final ClientSessionFactory connectionFactory,
                             final int version,
@@ -193,6 +145,8 @@
 
       this.blockOnAcknowledge = blockOnAcknowledge;
 
+      this.autoGroupId = autoGroupId;
+
       this.channel = channel;
 
       this.version = version;
@@ -412,7 +366,7 @@
 
       if (producer == null)
       {
-         SessionCreateProducerMessage request = new SessionCreateProducerMessage(address, windowSize, maxRate);
+         SessionCreateProducerMessage request = new SessionCreateProducerMessage(address, windowSize, maxRate, autoGroupId);
 
          SessionCreateProducerResponseMessage response = (SessionCreateProducerResponseMessage)channel.sendBlocking(request);
 
@@ -430,6 +384,7 @@
                                                                                                    false),
                                            autoCommitSends && blockOnNonPersistentSend,
                                            autoCommitSends && blockOnPersistentSend,
+                                           response.getAutoGroupId(),
                                            response.getInitialCredits(),
                                            channel);
       }

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateProducerMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateProducerMessage.java	2008-10-01 08:27:34 UTC (rev 5055)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateProducerMessage.java	2008-10-01 10:04:17 UTC (rev 5056)
@@ -42,12 +42,14 @@
    private int windowSize;
    
    private int maxRate;
+
+   private boolean autoGroupId;
       
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
 
-   public SessionCreateProducerMessage(final SimpleString address, final int windowSize, final int maxRate)
+   public SessionCreateProducerMessage(final SimpleString address, final int windowSize, final int maxRate, final boolean autoGroupId)
    {
       super(SESS_CREATEPRODUCER);
   
@@ -56,6 +58,8 @@
       this.windowSize = windowSize;
       
       this.maxRate = maxRate;
+
+      this.autoGroupId = autoGroupId;
    }
    
    public SessionCreateProducerMessage()
@@ -72,6 +76,7 @@
       buff.append(", address=" + address);
       buff.append(", windowSize=" + windowSize);
       buff.append(", maxrate=" + maxRate);
+      buff.append(", autoGroupId=" + autoGroupId);
       buff.append("]");
       return buff.toString();
    }
@@ -90,12 +95,18 @@
    {
    	return maxRate;
    }
-   
+
+   public boolean isAutoGroupId()
+   {
+      return autoGroupId;
+   }
+
    public void encodeBody(final MessagingBuffer buffer)
    {
       buffer.putNullableSimpleString(address);
       buffer.putInt(windowSize);
       buffer.putInt(maxRate);
+      buffer.putBoolean(autoGroupId);
    }
    
    public void decodeBody(final MessagingBuffer buffer)
@@ -103,6 +114,7 @@
       address = buffer.getNullableSimpleString();      
       windowSize = buffer.getInt();      
       maxRate = buffer.getInt();
+      autoGroupId = buffer.getBoolean();
    }
    
    public boolean equals(Object other)
@@ -117,7 +129,8 @@
       return super.equals(other) &&
              this.address == null ? r.address == null : this.address.equals(r.address) &&
              this.windowSize == r.windowSize &&
-             this.maxRate == r.maxRate;                  
+             this.maxRate == r.maxRate &&
+             this.autoGroupId == autoGroupId;                  
    }
 
    // Package protected ---------------------------------------------

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateProducerResponseMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateProducerResponseMessage.java	2008-10-01 08:27:34 UTC (rev 5055)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateProducerResponseMessage.java	2008-10-01 10:04:17 UTC (rev 5056)
@@ -23,6 +23,7 @@
 package org.jboss.messaging.core.remoting.impl.wireformat;
 
 import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+import org.jboss.messaging.util.SimpleString;
 
 /**
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
@@ -39,17 +40,21 @@
    
    private int maxRate;
 
+   private SimpleString autoGroupId;
+
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
 
-   public SessionCreateProducerResponseMessage(final int initialCredits, final int maxRate)
+   public SessionCreateProducerResponseMessage(final int initialCredits, final int maxRate, final SimpleString autoGroupId)
    {
       super(SESS_CREATEPRODUCER_RESP);
  
       this.initialCredits = initialCredits;
       
       this.maxRate = maxRate;
+
+      this.autoGroupId = autoGroupId;
    }
    
    public SessionCreateProducerResponseMessage()
@@ -73,17 +78,25 @@
    {
    	return maxRate;
    }
-   
+
+
+   public SimpleString getAutoGroupId()
+   {
+      return autoGroupId;
+   }
+
    public void encodeBody(final MessagingBuffer buffer)
    {
       buffer.putInt(initialCredits);
       buffer.putInt(maxRate);
+      buffer.putNullableSimpleString(autoGroupId);
    }
    
    public void decodeBody(final MessagingBuffer buffer)
    {     
       initialCredits = buffer.getInt();
       maxRate = buffer.getInt();
+      autoGroupId = buffer.getNullableSimpleString();
    }
    
 

Modified: trunk/src/main/org/jboss/messaging/core/server/DistributionPolicy.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/DistributionPolicy.java	2008-10-01 08:27:34 UTC (rev 5055)
+++ trunk/src/main/org/jboss/messaging/core/server/DistributionPolicy.java	2008-10-01 10:04:17 UTC (rev 5056)
@@ -40,4 +40,6 @@
    int getConsumerCount();
 
    boolean hasConsumers();
+
+   int getCurrentPosition();
 }

Modified: trunk/src/main/org/jboss/messaging/core/server/ServerSession.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/ServerSession.java	2008-10-01 08:27:34 UTC (rev 5055)
+++ trunk/src/main/org/jboss/messaging/core/server/ServerSession.java	2008-10-01 10:04:17 UTC (rev 5056)
@@ -22,20 +22,14 @@
 
 package org.jboss.messaging.core.server;
 
-import java.util.List;
-
-import javax.transaction.xa.Xid;
-
 import org.jboss.messaging.core.remoting.RemotingConnection;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionBindingQueryResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateConsumerResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateProducerResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendManagementMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.*;
 import org.jboss.messaging.core.server.impl.ServerBrowserImpl;
 import org.jboss.messaging.util.SimpleString;
 
+import javax.transaction.xa.Xid;
+import java.util.List;
+
 /**
  * 
  * A ServerSession
@@ -107,7 +101,7 @@
    SessionCreateConsumerResponseMessage createConsumer(SimpleString queueName, SimpleString filterString,
    		                                              int windowSize, int maxRate) throws Exception;
    
-   SessionCreateProducerResponseMessage createProducer(SimpleString address, int windowSize, int maxRate) throws Exception;   
+   SessionCreateProducerResponseMessage createProducer(SimpleString address, int windowSize, int maxRate, boolean autoGroupId) throws Exception;
 
    SessionQueueQueryResponseMessage executeQueueQuery(SimpleString queueName) throws Exception;
 

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2008-10-01 08:27:34 UTC (rev 5055)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2008-10-01 10:04:17 UTC (rev 5056)
@@ -12,17 +12,6 @@
 
 package org.jboss.messaging.core.server.impl;
 
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
 import org.jboss.messaging.core.config.Configuration;
 import org.jboss.messaging.core.config.TransportConfiguration;
 import org.jboss.messaging.core.exception.MessagingException;
@@ -57,11 +46,13 @@
 import org.jboss.messaging.core.transaction.ResourceManager;
 import org.jboss.messaging.core.transaction.impl.ResourceManagerImpl;
 import org.jboss.messaging.core.version.Version;
-import org.jboss.messaging.util.ExecutorFactory;
-import org.jboss.messaging.util.JBMThreadFactory;
-import org.jboss.messaging.util.OrderedExecutorFactory;
-import org.jboss.messaging.util.VersionLoader;
+import org.jboss.messaging.util.*;
 
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.*;
+
 /**
  * The messaging server implementation
  * 
@@ -128,6 +119,8 @@
 
    private ManagementService managementService;
 
+   private final SimpleStringIdGenerator simpleStringIdGenerator = new GroupIdGenerator(new SimpleString("AutoGroupId-"));
+
    // Constructors
    // ---------------------------------------------------------------------------------
 
@@ -474,7 +467,7 @@
                                                               executorFactory.getExecutor(),
                                                               channel,
                                                               managementService,
-                                                              this);
+                                                              simpleStringIdGenerator);
 
       // If the session already exists that's fine - create session must be idempotent
       // This is because if server failures occurring during a create session call we need to

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2008-10-01 08:27:34 UTC (rev 5055)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2008-10-01 10:04:17 UTC (rev 5056)
@@ -737,13 +737,16 @@
          return HandleStatus.BUSY;
       }
 
+      int startPos = distributionPolicy.getCurrentPosition();
+
       boolean filterRejected = false;
 
       HandleStatus status = null;
-      int pos = 0;
-      while (pos <= distributionPolicy.getConsumerCount())
+      int pos;
+      while (true)
       {
          Consumer consumer = distributionPolicy.select(reference.getMessage(), status != null);
+         pos = distributionPolicy.getCurrentPosition();
          if(consumer == null)
          {
             if (filterRejected)
@@ -797,18 +800,25 @@
 
             filterRejected = true;
          }
-         pos++;
+         if(startPos > distributionPolicy.getConsumerCount() - 1)
+         {
+            startPos = distributionPolicy.getConsumerCount() - 1;
+         }
+         if(startPos == pos)
+         {
+            // Tried all of them
+            if (filterRejected)
+            {
+               return HandleStatus.NO_MATCH;
+            }
+            else
+            {
+               // Give up - all consumers busy
+               return HandleStatus.BUSY;
+            }   
+         }
       }
-      // Tried all of them
-      if (filterRejected)
-      {
-         return HandleStatus.NO_MATCH;
-      }
-      else
-      {
-         // Give up - all consumers busy
-         return HandleStatus.BUSY;
-      }
+
    }
 
    // Inner classes
@@ -821,7 +831,10 @@
          // Must be set to false *before* executing to avoid race
          waitingToDeliver.set(false);
 
-         deliver();
+         synchronized (QueueImpl.this)
+         {
+            deliver();
+         }
       }
    }
 

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/RoundRobinDistributionPolicy.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/RoundRobinDistributionPolicy.java	2008-10-01 08:27:34 UTC (rev 5055)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/RoundRobinDistributionPolicy.java	2008-10-01 10:04:17 UTC (rev 5056)
@@ -34,7 +34,7 @@
  */
 public class RoundRobinDistributionPolicy extends DistributionPolicyImpl
 {
-   int pos = -1;
+   int pos = 0;
 
    public Consumer select(ServerMessage message, boolean redeliver)
    {     
@@ -42,35 +42,30 @@
       {
          return null;
       }
-      if (pos == -1)
+      int startPos = pos++;
+
+      if (pos == consumers.size())
       {
-         //First time
          pos = 0;
-         return consumers.get(pos);   
       }
-      else
-      {
-         pos++;
-         
-         if (pos == consumers.size())
-         {
-            pos = 0;
-         }
-      }
-
-      return consumers.get(pos);
+      return consumers.get(startPos);
    }
 
    public synchronized void addConsumer(Consumer consumer)
    {
-      pos = -1;
+      pos = 0;
       super.addConsumer(consumer);
    }
 
    public synchronized boolean removeConsumer(Consumer consumer)
    {
 
-      pos = -1;
+      pos = 0;
       return super.removeConsumer(consumer);
    }
+
+   public int getCurrentPosition()
+   {
+      return pos;
+   }
 }

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2008-10-01 08:27:34 UTC (rev 5055)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2008-10-01 10:04:17 UTC (rev 5056)
@@ -12,21 +12,6 @@
 
 package org.jboss.messaging.core.server.impl;
 
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executor;
-
-import javax.management.Notification;
-import javax.management.NotificationListener;
-import javax.transaction.xa.XAException;
-import javax.transaction.xa.XAResource;
-import javax.transaction.xa.Xid;
-
 import org.jboss.messaging.core.client.management.impl.ManagementHelper;
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.filter.Filter;
@@ -42,22 +27,11 @@
 import org.jboss.messaging.core.remoting.FailureListener;
 import org.jboss.messaging.core.remoting.RemotingConnection;
 import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
-import org.jboss.messaging.core.remoting.impl.wireformat.NullResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionBindingQueryResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateConsumerResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateProducerResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendManagementMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.*;
 import org.jboss.messaging.core.security.CheckType;
 import org.jboss.messaging.core.security.SecurityStore;
-import org.jboss.messaging.core.server.MessageReference;
-import org.jboss.messaging.core.server.MessagingServer;
+import org.jboss.messaging.core.server.*;
 import org.jboss.messaging.core.server.Queue;
-import org.jboss.messaging.core.server.ServerConsumer;
-import org.jboss.messaging.core.server.ServerMessage;
-import org.jboss.messaging.core.server.ServerProducer;
-import org.jboss.messaging.core.server.ServerSession;
 import org.jboss.messaging.core.settings.HierarchicalRepository;
 import org.jboss.messaging.core.settings.impl.QueueSettings;
 import org.jboss.messaging.core.transaction.ResourceManager;
@@ -66,7 +40,18 @@
 import org.jboss.messaging.util.IDGenerator;
 import org.jboss.messaging.util.SimpleIDGenerator;
 import org.jboss.messaging.util.SimpleString;
+import org.jboss.messaging.util.SimpleStringIdGenerator;
 
+import javax.management.Notification;
+import javax.management.NotificationListener;
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+
 /*
  * Session implementation 
  * 
@@ -131,6 +116,8 @@
 
    private final IDGenerator idGenerator = new SimpleIDGenerator(0);
 
+   private final SimpleStringIdGenerator simpleStringIdGenerator;
+
    // Constructors ---------------------------------------------------------------------------------
 
    public ServerSessionImpl(final String name,
@@ -149,7 +136,7 @@
                             final Executor executor,
                             final Channel channel,
                             final ManagementService managementService,
-                            final MessagingServer server) throws Exception
+                            final SimpleStringIdGenerator simpleStringIdGenerator) throws Exception
    {
       this.id = id;
 
@@ -185,6 +172,8 @@
       this.channel = channel;
 
       this.managementService = managementService;
+
+      this.simpleStringIdGenerator = simpleStringIdGenerator;
    }
 
    // ServerSession implementation ----------------------------------------------------------------------------
@@ -959,7 +948,8 @@
     */
    public SessionCreateProducerResponseMessage createProducer(final SimpleString address,
                                                               final int windowSize,
-                                                              final int maxRate) throws Exception
+                                                              final int maxRate,
+                                                              final boolean autoGroupId) throws Exception
    {
       FlowController flowController = null;
 
@@ -991,7 +981,12 @@
 
       int initialCredits = flowController == null ? -1 : flowController.getInitialCredits(windowToUse, producer);
 
-      return new SessionCreateProducerResponseMessage(initialCredits, maxRateToUse);
+      SimpleString groupId = null;
+      if(autoGroupId)
+      {
+         groupId = simpleStringIdGenerator.generateID();
+      }
+      return new SessionCreateProducerResponseMessage(initialCredits, maxRateToUse, groupId);
    }
 
    public boolean browserHasNextMessage(final long browserID) throws Exception

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java	2008-10-01 08:27:34 UTC (rev 5055)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java	2008-10-01 10:04:17 UTC (rev 5056)
@@ -12,91 +12,20 @@
 
 package org.jboss.messaging.core.server.impl;
 
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_ADD_DESTINATION;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_BINDINGQUERY;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_BROWSER_CLOSE;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_BROWSER_HASNEXTMESSAGE;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_BROWSER_NEXTMESSAGE;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_BROWSER_RESET;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_COMMIT;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CONSUMER_CLOSE;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CREATEBROWSER;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CREATECONSUMER;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CREATEPRODUCER;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CREATEQUEUE;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_DELETE_QUEUE;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_FLOWTOKEN;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_MANAGEMENT_SEND;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_PROCESSED;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_PRODUCER_CLOSE;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_QUEUEQUERY;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_REMOVE_DESTINATION;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_ROLLBACK;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_SEND;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_START;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_STOP;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_COMMIT;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_END;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_FORGET;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_GET_TIMEOUT;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_INDOUBT_XIDS;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_JOIN;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_PREPARE;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_RESUME;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_ROLLBACK;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_SET_TIMEOUT;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_START;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_SUSPEND;
-
-import java.util.List;
-
-import javax.transaction.xa.Xid;
-
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.persistence.StorageManager;
 import org.jboss.messaging.core.remoting.Channel;
 import org.jboss.messaging.core.remoting.ChannelHandler;
 import org.jboss.messaging.core.remoting.Packet;
-import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.NullResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionAddDestinationMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionBindingQueryMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionBrowseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionBrowserCloseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionBrowserHasNextMessageMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionBrowserHasNextMessageResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionBrowserNextMessageMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionBrowserResetMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerCloseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerFlowCreditMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateBrowserMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateConsumerMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateProducerMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateQueueMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionDeleteQueueMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionProcessedMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionProducerCloseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionRemoveDestinationMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendManagementMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXACommitMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAEndMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAForgetMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAGetInDoubtXidsResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAGetTimeoutResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAJoinMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAPrepareMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAResumeMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXARollbackMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXASetTimeoutMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXASetTimeoutResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAStartMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.*;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.*;
 import org.jboss.messaging.core.server.ServerMessage;
 import org.jboss.messaging.core.server.ServerSession;
 
+import javax.transaction.xa.Xid;
+import java.util.List;
+
 /**
  * A ServerSessionPacketHandler
  * 
@@ -213,7 +142,7 @@
             case SESS_CREATEPRODUCER:
             {
                SessionCreateProducerMessage request = (SessionCreateProducerMessage)packet;
-               response = session.createProducer(request.getAddress(), request.getWindowSize(), request.getMaxRate());
+               response = session.createProducer(request.getAddress(), request.getWindowSize(), request.getMaxRate(), request.isAutoGroupId());
                break;
             }
             case SESS_PROCESSED:

Modified: trunk/src/main/org/jboss/messaging/jms/client/JBossConnectionFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/client/JBossConnectionFactory.java	2008-10-01 08:27:34 UTC (rev 5055)
+++ trunk/src/main/org/jboss/messaging/jms/client/JBossConnectionFactory.java	2008-10-01 10:04:17 UTC (rev 5056)
@@ -12,24 +12,6 @@
 
 package org.jboss.messaging.jms.client;
 
-import java.io.Serializable;
-
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.JMSException;
-import javax.jms.QueueConnection;
-import javax.jms.QueueConnectionFactory;
-import javax.jms.TopicConnection;
-import javax.jms.TopicConnectionFactory;
-import javax.jms.XAConnection;
-import javax.jms.XAConnectionFactory;
-import javax.jms.XAQueueConnection;
-import javax.jms.XAQueueConnectionFactory;
-import javax.jms.XATopicConnection;
-import javax.jms.XATopicConnectionFactory;
-import javax.naming.NamingException;
-import javax.naming.Reference;
-
 import org.jboss.messaging.core.client.ClientSession;
 import org.jboss.messaging.core.client.ClientSessionFactory;
 import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
@@ -39,6 +21,11 @@
 import org.jboss.messaging.jms.referenceable.ConnectionFactoryObjectFactory;
 import org.jboss.messaging.jms.referenceable.SerializableObjectRefAddr;
 
+import javax.jms.*;
+import javax.naming.NamingException;
+import javax.naming.Reference;
+import java.io.Serializable;
+
 /**
  * @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
@@ -88,6 +75,8 @@
 
    private final boolean blockOnPersistentSend;
 
+   private final boolean autoGroupId;
+
    // Constructors ---------------------------------------------------------------------------------
 
    public JBossConnectionFactory(final TransportConfiguration connectorConfig,
@@ -102,7 +91,8 @@
                                  final int producerMaxRate,
                                  final boolean blockOnAcknowledge,
                                  final boolean blockOnNonPersistentSend,
-                                 final boolean blockOnPersistentSend)
+                                 final boolean blockOnPersistentSend,
+                                 final boolean autoGroupId)
    {
       this.connectorConfig = connectorConfig;
       this.backupConnectorConfig = backupConnectorConfig;
@@ -117,6 +107,7 @@
       this.blockOnAcknowledge = blockOnAcknowledge;
       this.blockOnNonPersistentSend = blockOnNonPersistentSend;
       this.blockOnPersistentSend = blockOnPersistentSend;
+      this.autoGroupId = autoGroupId;
    }
 
    // ConnectionFactory implementation -------------------------------------------------------------
@@ -263,8 +254,13 @@
       return blockOnPersistentSend;
    }
 
-   // Package protected ----------------------------------------------------------------------------
+   public boolean isAutoGroupId()
+   {
+      return autoGroupId;
+   }
 
+// Package protected ----------------------------------------------------------------------------
+
    // Protected ------------------------------------------------------------------------------------
 
    protected JBossConnection createConnectionInternal(final String username,
@@ -285,7 +281,8 @@
                                                        producerMaxRate,
                                                        blockOnAcknowledge,
                                                        blockOnNonPersistentSend,
-                                                       blockOnPersistentSend);
+                                                       blockOnPersistentSend,
+                                                       autoGroupId);
 
       }
 

Modified: trunk/src/main/org/jboss/messaging/jms/server/JMSServerManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/JMSServerManager.java	2008-10-01 08:27:34 UTC (rev 5055)
+++ trunk/src/main/org/jboss/messaging/jms/server/JMSServerManager.java	2008-10-01 10:04:17 UTC (rev 5056)
@@ -22,12 +22,10 @@
 
 package org.jboss.messaging.jms.server;
 
-import java.util.List;
-import java.util.Map;
-
 import org.jboss.messaging.core.config.TransportConfiguration;
-import org.jboss.messaging.core.remoting.spi.ConnectorFactory;
 
+import java.util.List;
+
 /**
  * The JMS Management interface.
  * 
@@ -114,7 +112,7 @@
          int producerWindowSize, int producerMaxRate,
          boolean blockOnAcknowledge,
          boolean blockOnNonPersistentSend,
-         boolean blockOnPersistentSend, String jndiBinding)
+         boolean blockOnPersistentSend, boolean autoGroupId, String jndiBinding)
          throws Exception;
 
 
@@ -125,7 +123,7 @@
          int producerWindowSize, int producerMaxRate,
          boolean blockOnAcknowledge,
          boolean blockOnNonPersistentSend,
-         boolean blockOnPersistentSend, List<String> jndiBinding)
+         boolean blockOnPersistentSend, boolean autoGroupId,  List<String> jndiBinding)
          throws Exception;
 
    /**

Modified: trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerDeployer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerDeployer.java	2008-10-01 08:27:34 UTC (rev 5055)
+++ trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerDeployer.java	2008-10-01 10:04:17 UTC (rev 5056)
@@ -12,11 +12,6 @@
 
 package org.jboss.messaging.jms.server.impl;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
 import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
 import org.jboss.messaging.core.config.TransportConfiguration;
 import org.jboss.messaging.core.config.impl.ConfigurationImpl;
@@ -28,6 +23,11 @@
 import org.w3c.dom.Node;
 import org.w3c.dom.NodeList;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
 /**
  * @author <a href="ataylor at redhat.com">Andy Taylor</a>
  * @author <a href="tim.fox at jboss.com">Tim Fox</a>
@@ -62,6 +62,8 @@
 
    private static final String SEND_P_MESSAGES_SYNCHRONOUSLY_ELEMENT = "send-p-messages-synchronously";
 
+   private static final String AUTO_GROUP_ID__ELEMENT = "auto-group-id";
+
    private static final String CONNECTOR_ELEMENT = "connector";
 
    private static final String BACKUP_CONNECTOR_ELEMENT = "backup-connector";
@@ -136,7 +138,7 @@
          boolean blockOnAcknowledge = ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_ACKNOWLEDGE;
          boolean blockOnNonPersistentSend = ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND;
          boolean blockOnPersistentSend = ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_PERSISTENT_SEND;
-
+         boolean autoGroupId = ClientSessionFactoryImpl.DEFAULT_AUTO_GROUP_ID;
          List<String> jndiBindings = new ArrayList<String>();
          String connectorFactoryClassName = null;
          Map<String, Object> params = new HashMap<String, Object>();
@@ -189,6 +191,10 @@
             {
                blockOnPersistentSend = Boolean.parseBoolean(children.item(j).getTextContent().trim());
             }
+            else if(AUTO_GROUP_ID__ELEMENT.equalsIgnoreCase(children.item(j).getNodeName()))
+            {
+               autoGroupId = Boolean.parseBoolean(children.item(j).getTextContent().trim());
+            }
             else if (ENTRY_NODE_NAME.equalsIgnoreCase(children.item(j).getNodeName()))
             {
                String jndiName = children.item(j).getAttributes().getNamedItem("name").getNodeValue();
@@ -392,6 +398,7 @@
                                                   blockOnAcknowledge,
                                                   blockOnNonPersistentSend,
                                                   blockOnPersistentSend,
+                                                  autoGroupId,
                                                   jndiBindings);
       }
       else if (node.getNodeName().equals(QUEUE_NODE_NAME))

Modified: trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerManagerImpl.java	2008-10-01 08:27:34 UTC (rev 5055)
+++ trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerManagerImpl.java	2008-10-01 10:04:17 UTC (rev 5056)
@@ -22,15 +22,6 @@
 
 package org.jboss.messaging.jms.server.impl;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import javax.naming.Context;
-import javax.naming.InitialContext;
-import javax.naming.NamingException;
-
 import org.jboss.messaging.core.config.TransportConfiguration;
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.management.MessagingServerControlMBean;
@@ -46,6 +37,14 @@
 import org.jboss.messaging.jms.server.management.JMSManagementService;
 import org.jboss.messaging.util.JNDIUtil;
 
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
 /**
  * A Deployer used to create and add to JNDI queues, topics and connection
  * factories. Typically this would only be used in an app server env.
@@ -200,6 +199,7 @@
                                           boolean blockOnAcknowledge,
                                           boolean blockOnNonPersistentSend,
                                           boolean blockOnPersistentSend,
+                                          boolean autoGroupId,
                                           String jndiBinding) throws Exception
    {
       JBossConnectionFactory cf = connectionFactories.get(name);
@@ -217,7 +217,8 @@
                                          producerMaxRate,
                                          blockOnAcknowledge,
                                          blockOnNonPersistentSend,
-                                         blockOnPersistentSend);
+                                         blockOnPersistentSend,
+                                         autoGroupId);
          connectionFactories.put(name, cf);
       }
       if (!bindToJndi(jndiBinding, cf))
@@ -251,6 +252,7 @@
                                           boolean blockOnAcknowledge,
                                           boolean blockOnNonPersistentSend,
                                           boolean blockOnPersistentSend,
+                                          boolean autoGroupId,
                                           List<String> jndiBindings) throws Exception
    {
       JBossConnectionFactory cf = connectionFactories.get(name);
@@ -268,7 +270,8 @@
                                          producerMaxRate,
                                          blockOnAcknowledge,
                                          blockOnNonPersistentSend,
-                                         blockOnPersistentSend);
+                                         blockOnPersistentSend,
+                                         autoGroupId);
       }
       for (String jndiBinding : jndiBindings)
       {

Modified: trunk/src/main/org/jboss/messaging/jms/server/management/JMSServerControlMBean.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/management/JMSServerControlMBean.java	2008-10-01 08:27:34 UTC (rev 5055)
+++ trunk/src/main/org/jboss/messaging/jms/server/management/JMSServerControlMBean.java	2008-10-01 10:04:17 UTC (rev 5056)
@@ -22,12 +22,12 @@
 
 package org.jboss.messaging.jms.server.management;
 
-import static javax.management.MBeanOperationInfo.ACTION;
-
 import org.jboss.messaging.core.config.TransportConfiguration;
 import org.jboss.messaging.core.management.Operation;
 import org.jboss.messaging.core.management.Parameter;
 
+import static javax.management.MBeanOperationInfo.ACTION;
+
 /**
  * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
  * 
@@ -82,6 +82,7 @@
          @Parameter(name = "blockOnAcknowledge", desc = "Does acknowlegment block?") boolean blockOnAcknowledge,
          @Parameter(name = "blockOnNonPersistentSend", desc = "Does sending non persistent messages block?") boolean blockOnNonPersistentSend,
          @Parameter(name = "blockOnPersistentSend", desc = "Does sending persistent messages block") boolean blockOnPersistentSend,
+         @Parameter(name = "autoGroupId", desc = "Any Messages sent via this factory's connections will automatically set the property 'JMSXGroupId'") boolean autoGroupId,
          @Parameter(name = "jndiBinding", desc = "JNDI Binding") String jndiBinding)         
                   throws Exception;
 

Modified: trunk/src/main/org/jboss/messaging/jms/server/management/impl/JMSServerControl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/management/impl/JMSServerControl.java	2008-10-01 08:27:34 UTC (rev 5055)
+++ trunk/src/main/org/jboss/messaging/jms/server/management/impl/JMSServerControl.java	2008-10-01 10:04:17 UTC (rev 5056)
@@ -22,26 +22,16 @@
 
 package org.jboss.messaging.jms.server.management.impl;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
-
-import javax.management.ListenerNotFoundException;
-import javax.management.MBeanInfo;
-import javax.management.MBeanNotificationInfo;
-import javax.management.NotCompliantMBeanException;
-import javax.management.Notification;
-import javax.management.NotificationBroadcasterSupport;
-import javax.management.NotificationEmitter;
-import javax.management.NotificationFilter;
-import javax.management.NotificationListener;
-import javax.management.StandardMBean;
-
 import org.jboss.messaging.core.config.TransportConfiguration;
 import org.jboss.messaging.core.management.impl.MBeanInfoHelper;
 import org.jboss.messaging.jms.server.JMSServerManager;
 import org.jboss.messaging.jms.server.management.JMSServerControlMBean;
 
+import javax.management.*;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
 /**
  * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
  * 
@@ -83,7 +73,7 @@
          int producerWindowSize, int producerMaxRate,
          boolean blockOnAcknowledge,
          boolean blockOnNonPersistentSend,
-         boolean blockOnPersistentSend, String jndiBinding) throws Exception
+         boolean blockOnPersistentSend, boolean autoGroupId, String jndiBinding) throws Exception
    {
       List<String> bindings = new ArrayList<String>();
       bindings.add(jndiBinding);
@@ -93,7 +83,7 @@
                   pingPeriod, callTimeout, clientID, dupsOKBatchSize, 
                consumerWindowSize, consumerMaxRate, producerWindowSize, producerMaxRate, 
                blockOnAcknowledge, blockOnNonPersistentSend, 
-               blockOnPersistentSend, jndiBinding);
+               blockOnPersistentSend, autoGroupId, jndiBinding);
       if (created)
       {
          sendNotification(NotificationType.CONNECTION_FACTORY_CREATED, name);

Added: trunk/src/main/org/jboss/messaging/util/GroupIdGenerator.java
===================================================================
--- trunk/src/main/org/jboss/messaging/util/GroupIdGenerator.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/util/GroupIdGenerator.java	2008-10-01 10:04:17 UTC (rev 5056)
@@ -0,0 +1,46 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.util;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public class GroupIdGenerator implements SimpleStringIdGenerator
+{
+   private final AtomicInteger ai = new AtomicInteger(1);
+
+   private final SimpleString prefix;
+
+   public GroupIdGenerator(final SimpleString prefix)
+   {
+      this.prefix = prefix;
+   }
+
+   public SimpleString generateID()
+   {
+      SimpleString suffix = new SimpleString("" + ai.getAndIncrement());
+      return prefix.concat(suffix);
+   }
+
+}

Added: trunk/src/main/org/jboss/messaging/util/SimpleStringIdGenerator.java
===================================================================
--- trunk/src/main/org/jboss/messaging/util/SimpleStringIdGenerator.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/util/SimpleStringIdGenerator.java	2008-10-01 10:04:17 UTC (rev 5056)
@@ -0,0 +1,30 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.util;
+
+/**
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public interface SimpleStringIdGenerator
+{
+   SimpleString generateID();
+}

Modified: trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/CTSMiscellaneousTest.java
===================================================================
--- trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/CTSMiscellaneousTest.java	2008-10-01 08:27:34 UTC (rev 5055)
+++ trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/CTSMiscellaneousTest.java	2008-10-01 10:04:17 UTC (rev 5056)
@@ -21,27 +21,7 @@
  */
 package org.jboss.test.messaging.jms;
 
-import java.util.ArrayList;
-import java.util.List;
-
-import javax.jms.BytesMessage;
-import javax.jms.Connection;
-import javax.jms.DeliveryMode;
-import javax.jms.InvalidSelectorException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.QueueConnection;
-import javax.jms.QueueReceiver;
-import javax.jms.QueueSender;
-import javax.jms.QueueSession;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import javax.jms.TopicConnection;
-import javax.jms.TopicSession;
-
 import org.jboss.messaging.core.config.TransportConfiguration;
-import org.jboss.messaging.core.remoting.impl.netty.NettyConnectorFactory;
 import org.jboss.messaging.jms.client.JBossConnectionFactory;
 import org.jboss.test.messaging.JBMServerTestCase;
 import org.jboss.test.messaging.jms.message.SimpleJMSBytesMessage;
@@ -49,6 +29,10 @@
 import org.jboss.test.messaging.jms.message.SimpleJMSTextMessage;
 import org.jboss.test.messaging.tools.container.ServiceAttributeOverrides;
 
+import javax.jms.*;
+import java.util.ArrayList;
+import java.util.List;
+
 /**
  * Safeguards for previously detected TCK failures.
  *
@@ -88,7 +72,7 @@
          getJmsServerManager().createConnectionFactory("StrictTCKConnectionFactory",
                   new TransportConfiguration("org.jboss.messaging.core.remoting.impl.netty.NettyConnectorFactory"), null, 5000, 5000,                  
                   null,
-               1000, 1024 * 1024, -1, 1000, -1, true, true, true, "/StrictTCKConnectionFactory");
+               1000, 1024 * 1024, -1, 1000, -1, true, true, true, false, "/StrictTCKConnectionFactory");
                  
          cf = (JBossConnectionFactory) getInitialContext().lookup("/StrictTCKConnectionFactory");
 		}

Modified: trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/JMSTestCase.java
===================================================================
--- trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/JMSTestCase.java	2008-10-01 08:27:34 UTC (rev 5055)
+++ trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/JMSTestCase.java	2008-10-01 10:04:17 UTC (rev 5056)
@@ -1,12 +1,11 @@
 package org.jboss.test.messaging.jms;
 
-import javax.naming.InitialContext;
-
 import org.jboss.messaging.core.config.TransportConfiguration;
-import org.jboss.messaging.core.remoting.impl.netty.NettyConnectorFactory;
 import org.jboss.messaging.jms.client.JBossConnectionFactory;
 import org.jboss.test.messaging.JBMServerTestCase;
 
+import javax.naming.InitialContext;
+
 /**
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  * @version <tt>$Revision: $</tt>23 Jul 2007
@@ -40,7 +39,7 @@
       
       getJmsServerManager().createConnectionFactory("testsuitecf",
                new TransportConfiguration("org.jboss.messaging.core.remoting.impl.netty.NettyConnectorFactory"), null, 5000, 5000,      
-               null, 1000, 1024 * 1024, -1, 1000, -1, true, true, true, "/testsuitecf");
+               null, 1000, 1024 * 1024, -1, 1000, -1, true, true, true, false, "/testsuitecf");
       
       cf = (JBossConnectionFactory) getInitialContext().lookup("/testsuitecf");      
    }

Modified: trunk/tests/jms-tests/src/org/jboss/test/messaging/tools/container/LocalTestServer.java
===================================================================
--- trunk/tests/jms-tests/src/org/jboss/test/messaging/tools/container/LocalTestServer.java	2008-10-01 08:27:34 UTC (rev 5055)
+++ trunk/tests/jms-tests/src/org/jboss/test/messaging/tools/container/LocalTestServer.java	2008-10-01 10:04:17 UTC (rev 5056)
@@ -21,26 +21,6 @@
 */
 package org.jboss.test.messaging.tools.container;
 
-import java.io.File;
-import java.lang.management.ManagementFactory;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Properties;
-import java.util.Set;
-
-import javax.management.MBeanServerInvocationHandler;
-import javax.management.NotificationListener;
-import javax.management.ObjectName;
-import javax.naming.InitialContext;
-import javax.sql.DataSource;
-import javax.transaction.TransactionManager;
-import javax.transaction.UserTransaction;
-
 import org.jboss.kernel.spi.deployment.KernelDeployment;
 import org.jboss.messaging.core.config.TransportConfiguration;
 import org.jboss.messaging.core.logging.Logger;
@@ -61,6 +41,20 @@
 import org.jboss.test.messaging.tools.jboss.MBeanConfigurationElement;
 import org.jboss.tm.TransactionManagerLocator;
 
+import javax.management.MBeanServerInvocationHandler;
+import javax.management.NotificationListener;
+import javax.management.ObjectName;
+import javax.naming.InitialContext;
+import javax.sql.DataSource;
+import javax.transaction.TransactionManager;
+import javax.transaction.UserTransaction;
+import java.io.File;
+import java.lang.management.ManagementFactory;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.*;
+
 /**
  * @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
@@ -530,7 +524,7 @@
       getJMSServerManager().createConnectionFactory(objectName,
                new TransportConfiguration("org.jboss.messaging.core.remoting.impl.netty.NettyConnectorFactory"), null, 5000, 5000,      
                clientId, dupsOkBatchSize,
-      		prefetchSize, -1, 1000, -1, blockOnAcknowledge, true, true, jndiBindings);
+      		prefetchSize, -1, 1000, -1, blockOnAcknowledge, true, true, false, jndiBindings);
    }
 
 

Added: trunk/tests/src/org/jboss/messaging/tests/integration/basic/AutoGroupClientTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/basic/AutoGroupClientTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/basic/AutoGroupClientTest.java	2008-10-01 10:04:17 UTC (rev 5056)
@@ -0,0 +1,255 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.tests.integration.basic;
+
+import junit.framework.TestCase;
+import org.jboss.messaging.core.client.*;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.impl.ConfigurationImpl;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.core.server.impl.GroupingRoundRobinDistributionPolicy;
+import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
+import org.jboss.messaging.core.settings.impl.QueueSettings;
+import org.jboss.messaging.jms.client.JBossTextMessage;
+import org.jboss.messaging.util.SimpleString;
+
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public class AutoGroupClientTest extends TestCase
+{
+   // "testGroupQueue" keys the queue settings on the server and, wrapped as a SimpleString, names the address and queue.
+   private static final String QUEUE_NAME = "testGroupQueue";
+   private static final SimpleString QUEUE = new SimpleString(QUEUE_NAME);
+
+   /** Number of messages each producer sends. */
+   private static final int NUM_MESSAGES = 100;
+
+   /** Seconds a test waits for delivery before failing, so that a lost message cannot hang the run. */
+   private static final long DELIVERY_TIMEOUT_SECONDS = 30;
+
+   /** Server started in {@link #setUp()} and stopped in {@link #tearDown()}. */
+   private MessagingService messagingService;
+
+   /** Session opened by {@link #openSession(boolean)} and closed in {@link #tearDown()}. */
+   private ClientSession session;
+
+   /**
+    * Starts a server with security disabled and an in-VM acceptor, after registering queue
+    * settings for "testGroupQueue" that select {@link GroupingRoundRobinDistributionPolicy}.
+    */
+   @Override
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+
+      QueueSettings qs = new QueueSettings();
+      qs.setDistributionPolicyClass(GroupingRoundRobinDistributionPolicy.class.getName());
+
+      Configuration conf = new ConfigurationImpl();
+      conf.setSecurityEnabled(false);
+      conf.getAcceptorConfigurations().add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory"));
+
+      messagingService = MessagingServiceImpl.newNullStorageMessagingServer(conf);
+      messagingService.getServer().getQueueSettingsRepository().addMatch(QUEUE_NAME, qs);
+      messagingService.start();
+   }
+
+   /** Closes the session and stops the server even when the test body failed. */
+   @Override
+   protected void tearDown() throws Exception
+   {
+      try
+      {
+         if (session != null)
+         {
+            session.close();
+         }
+      }
+      finally
+      {
+         if (messagingService != null)
+         {
+            messagingService.stop();
+         }
+         super.tearDown();
+      }
+   }
+
+   /**
+    * One producer with auto group id enabled and two consumers: asserts that the first consumer
+    * receives all the messages and the second receives none.
+    */
+   public void testGroupIdAutomaticallySet() throws Exception
+   {
+      openSession(true);
+      ClientProducer producer = session.createProducer(QUEUE);
+
+      CountDownLatch latch = new CountDownLatch(NUM_MESSAGES);
+      MyMessageHandler myMessageHandler = addConsumer(latch);
+      MyMessageHandler myMessageHandler2 = addConsumer(latch);
+      session.start();
+
+      sendMessages(producer);
+      awaitDelivery(latch);
+
+      assertReceived(NUM_MESSAGES, myMessageHandler);
+      assertReceived(0, myMessageHandler2);
+   }
+
+   /**
+    * Two producers with auto group id enabled and three consumers: asserts that the first two
+    * consumers receive one producer's worth of messages each and the third receives none.
+    */
+   public void testGroupIdAutomaticallySetMultipleProducers() throws Exception
+   {
+      openSession(true);
+      ClientProducer producer = session.createProducer(QUEUE);
+      ClientProducer producer2 = session.createProducer(QUEUE);
+
+      CountDownLatch latch = new CountDownLatch(2 * NUM_MESSAGES);
+      MyMessageHandler myMessageHandler = addConsumer(latch);
+      MyMessageHandler myMessageHandler2 = addConsumer(latch);
+      MyMessageHandler myMessageHandler3 = addConsumer(latch);
+      session.start();
+
+      sendMessages(producer);
+      sendMessages(producer2);
+      awaitDelivery(latch);
+
+      assertReceived(NUM_MESSAGES, myMessageHandler);
+      assertReceived(NUM_MESSAGES, myMessageHandler2);
+      assertReceived(0, myMessageHandler3);
+   }
+
+   /**
+    * One producer with the session factory left as constructed (auto group id not requested) and
+    * two consumers: asserts that the messages are split evenly between the two consumers.
+    */
+   public void testGroupIdAutomaticallyNotSet() throws Exception
+   {
+      openSession(false);
+      ClientProducer producer = session.createProducer(QUEUE);
+
+      CountDownLatch latch = new CountDownLatch(NUM_MESSAGES);
+      MyMessageHandler myMessageHandler = addConsumer(latch);
+      MyMessageHandler myMessageHandler2 = addConsumer(latch);
+      session.start();
+
+      sendMessages(producer);
+      awaitDelivery(latch);
+
+      assertReceived(NUM_MESSAGES / 2, myMessageHandler);
+      assertReceived(NUM_MESSAGES / 2, myMessageHandler2);
+   }
+
+   /**
+    * Opens the session for the current test and creates the test queue. Auto group id is switched
+    * on at the factory first when {@code autoGroupId} is true; otherwise the factory is left untouched.
+    */
+   private void openSession(boolean autoGroupId) throws Exception
+   {
+      ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
+      if (autoGroupId)
+      {
+         sf.setAutoGroupId(true);
+      }
+      session = sf.createSession(false, true, true, false);
+      session.createQueue(QUEUE, QUEUE, null, false, false);
+   }
+
+   /** Creates a consumer on the test queue, attaches a new counting handler that shares {@code latch}, and returns that handler. */
+   private MyMessageHandler addConsumer(CountDownLatch latch) throws Exception
+   {
+      MyMessageHandler handler = new MyMessageHandler(latch);
+      ClientConsumer consumer = session.createConsumer(QUEUE);
+      consumer.setMessageHandler(handler);
+      return handler;
+   }
+
+   /** Sends {@link #NUM_MESSAGES} messages, each explicitly marked non-durable, through the given producer. */
+   private void sendMessages(ClientProducer producer) throws Exception
+   {
+      for (int i = 0; i < NUM_MESSAGES; i++)
+      {
+         ClientMessage message = session.createClientMessage(JBossTextMessage.TYPE, false, 0,
+               System.currentTimeMillis(), (byte) 1);
+         message.getBody().putString("testINVMCoreClient");
+         message.getBody().flip();
+         message.setDurable(false);
+         producer.send(message);
+      }
+   }
+
+   /** Waits for the latch to reach zero and fails the test if that does not happen within the timeout. */
+   private static void awaitDelivery(CountDownLatch latch) throws InterruptedException
+   {
+      // TimeUnit is written fully qualified because this file does not import it.
+      boolean delivered = latch.await(DELIVERY_TIMEOUT_SECONDS, java.util.concurrent.TimeUnit.SECONDS);
+      assertTrue("Timed out with " + latch.getCount() + " message(s) still undelivered", delivered);
+   }
+
+   /** Asserts that no call to processed() failed in the handler and that it was handed exactly {@code expected} messages. */
+   private static void assertReceived(int expected, MyMessageHandler handler)
+   {
+      assertNull("message.processed() failed: " + handler.failure, handler.failure);
+      assertEquals(expected, handler.messagesReceived);
+   }
+
+   /** Counts the messages it is handed, calls processed() on each one and counts down the shared latch. */
+   private static class MyMessageHandler implements MessageHandler
+   {
+      volatile int messagesReceived = 0;
+
+      /** Most recent exception thrown by processed(), or null if none; read by assertReceived. */
+      volatile MessagingException failure;
+
+      private final CountDownLatch latch;
+
+      public MyMessageHandler(CountDownLatch latch)
+      {
+         this.latch = latch;
+      }
+
+      public void onMessage(ClientMessage message)
+      {
+         messagesReceived++; // NOTE(review): not atomic; assumes a consumer's handler is never invoked concurrently - confirm
+         try
+         {
+            message.processed();
+         }
+         catch (MessagingException e)
+         {
+            failure = e; // reported by assertReceived instead of being printed and forgotten
+         }
+         finally
+         {
+            latch.countDown(); // always count the message so a failure surfaces as an assertion, not a hang
+         }
+      }
+   }
+}

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java	2008-10-01 08:27:34 UTC (rev 5055)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java	2008-10-01 10:04:17 UTC (rev 5056)
@@ -1451,18 +1451,23 @@
 
       public boolean removeConsumer(Consumer consumer)
       {
-         return false;  //To change body of implemented methods use File | Settings | File Templates.
+         return false;
       }
 
       public int getConsumerCount()
       {
-         return 0;  //To change body of implemented methods use File | Settings | File Templates.
+         return 0;
       }
 
       public boolean hasConsumers()
       {
-         return false;  //To change body of implemented methods use File | Settings | File Templates.
+         return false;
       }
+
+      public int getCurrentPosition()
+      {
+         return 0;  // always 0, like the other constant-returning methods next to it
+      }
    }
 
 }




More information about the jboss-cvs-commits mailing list