[jboss-cvs] JBoss Messaging SVN: r5056 - in trunk: src/main/org/jboss/messaging/core/client and 14 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Oct 1 06:04:17 EDT 2008
Author: ataylor
Date: 2008-10-01 06:04:17 -0400 (Wed, 01 Oct 2008)
New Revision: 5056
Added:
trunk/src/main/org/jboss/messaging/util/GroupIdGenerator.java
trunk/src/main/org/jboss/messaging/util/SimpleStringIdGenerator.java
trunk/tests/src/org/jboss/messaging/tests/integration/basic/AutoGroupClientTest.java
Modified:
trunk/src/config/jbm-jndi.xml
trunk/src/main/org/jboss/messaging/core/client/ClientSessionFactory.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateProducerMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateProducerResponseMessage.java
trunk/src/main/org/jboss/messaging/core/server/DistributionPolicy.java
trunk/src/main/org/jboss/messaging/core/server/ServerSession.java
trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/RoundRobinDistributionPolicy.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
trunk/src/main/org/jboss/messaging/jms/client/JBossConnectionFactory.java
trunk/src/main/org/jboss/messaging/jms/server/JMSServerManager.java
trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerDeployer.java
trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerManagerImpl.java
trunk/src/main/org/jboss/messaging/jms/server/management/JMSServerControlMBean.java
trunk/src/main/org/jboss/messaging/jms/server/management/impl/JMSServerControl.java
trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/CTSMiscellaneousTest.java
trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/JMSTestCase.java
trunk/tests/jms-tests/src/org/jboss/test/messaging/tools/container/LocalTestServer.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java
Log:
https://jira.jboss.org/jira/browse/JBMESSAGING-1273 - implementing aut setting of message group id
Modified: trunk/src/config/jbm-jndi.xml
===================================================================
--- trunk/src/config/jbm-jndi.xml 2008-10-01 08:27:34 UTC (rev 5055)
+++ trunk/src/config/jbm-jndi.xml 2008-10-01 10:04:17 UTC (rev 5056)
@@ -65,6 +65,8 @@
<send-np-messages-synchronously>true</send-np-messages-synchronously>
<!--Whether we send persistent messages synchronously-->
<send-p-messages-synchronously>true</send-p-messages-synchronously>
+ <!--If true, any connections will automatically set a unique group id (per producer) on every message sent-->
+ <auto-group-id>true</auto-group-id>
</connection-factory>
<connection-factory name="TestInVMConnectionFactory">
Modified: trunk/src/main/org/jboss/messaging/core/client/ClientSessionFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/ClientSessionFactory.java 2008-10-01 08:27:34 UTC (rev 5055)
+++ trunk/src/main/org/jboss/messaging/core/client/ClientSessionFactory.java 2008-10-01 10:04:17 UTC (rev 5056)
@@ -22,12 +22,12 @@
package org.jboss.messaging.core.client;
-import java.util.Map;
-
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.remoting.spi.ConnectorFactory;
+import java.util.Map;
+
/**
*
* A ClientSessionFactory
@@ -72,6 +72,10 @@
boolean isBlockOnAcknowledge();
void setBlockOnAcknowledge(final boolean blocking);
+
+ boolean isAutoGroupId();
+
+ void setAutoGroupId(boolean autoGroupId);
ConnectorFactory getConnectorFactory();
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java 2008-10-01 08:27:34 UTC (rev 5055)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java 2008-10-01 10:04:17 UTC (rev 5056)
@@ -12,12 +12,11 @@
package org.jboss.messaging.core.client.impl;
-import java.util.concurrent.Semaphore;
-
import org.jboss.messaging.core.client.AcknowledgementHandler;
import org.jboss.messaging.core.client.ClientMessage;
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.message.impl.MessageImpl;
import org.jboss.messaging.core.remoting.Channel;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionProducerCloseMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendManagementMessage;
@@ -25,6 +24,8 @@
import org.jboss.messaging.util.SimpleString;
import org.jboss.messaging.util.TokenBucketLimiter;
+import java.util.concurrent.Semaphore;
+
/**
* The client-side Producer connectionFactory class.
*
@@ -69,6 +70,8 @@
private final int initialWindowSize;
+ private final SimpleString autoGroupId;
+
// Static ---------------------------------------------------------------------------------------
// Constructors ---------------------------------------------------------------------------------
@@ -79,6 +82,7 @@
final TokenBucketLimiter rateLimiter,
final boolean blockOnNonPersistentSend,
final boolean blockOnPersistentSend,
+ final SimpleString autoGroupId,
final int initialCredits,
final Channel channel)
{
@@ -96,6 +100,8 @@
this.blockOnPersistentSend = blockOnPersistentSend;
+ this.autoGroupId = autoGroupId;
+
availableCredits = new Semaphore(initialCredits);
creditFlowControl = initialCredits != -1;
@@ -284,6 +290,11 @@
rateLimiter.limit();
}
+ if(autoGroupId != null)
+ {
+ msg.putStringProperty(MessageImpl.GROUP_ID, autoGroupId);
+ }
+
boolean sendBlocking = msg.isDurable() ? blockOnPersistentSend : blockOnNonPersistentSend;
SessionSendMessage message = new SessionSendMessage(id, msg, sendBlocking);
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java 2008-10-01 08:27:34 UTC (rev 5055)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java 2008-10-01 10:04:17 UTC (rev 5056)
@@ -11,20 +11,12 @@
*/
package org.jboss.messaging.core.client.impl;
-import java.util.Map;
-import java.util.Set;
-
import org.jboss.messaging.core.client.ClientSession;
import org.jboss.messaging.core.config.TransportConfiguration;
import org.jboss.messaging.core.config.impl.ConfigurationImpl;
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.remoting.Channel;
-import org.jboss.messaging.core.remoting.ChannelHandler;
-import org.jboss.messaging.core.remoting.ConnectionRegistry;
-import org.jboss.messaging.core.remoting.FailureListener;
-import org.jboss.messaging.core.remoting.Packet;
-import org.jboss.messaging.core.remoting.RemotingConnection;
+import org.jboss.messaging.core.remoting.*;
import org.jboss.messaging.core.remoting.impl.ConnectionRegistryImpl;
import org.jboss.messaging.core.remoting.impl.wireformat.CreateSessionMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.CreateSessionResponseMessage;
@@ -34,6 +26,9 @@
import org.jboss.messaging.util.UUIDGenerator;
import org.jboss.messaging.util.VersionLoader;
+import java.util.Map;
+import java.util.Set;
+
/**
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
@@ -66,6 +61,8 @@
public static final boolean DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND = false;
+ public static final boolean DEFAULT_AUTO_GROUP_ID = false;
+
// Attributes
// -----------------------------------------------------------------------------------
@@ -104,6 +101,8 @@
private final Set<ClientSessionInternal> sessions = new ConcurrentHashSet<ClientSessionInternal>();
+ private volatile boolean autoGroupId;
+
// Static
// ---------------------------------------------------------------------------------------
@@ -123,7 +122,8 @@
final int producerMaxRate,
final boolean blockOnAcknowledge,
final boolean blockOnNonPersistentSend,
- final boolean blockOnPersistentSend)
+ final boolean blockOnPersistentSend,
+ final boolean autoGroupId)
{
connectorFactory = instantiateConnectorFactory(connectorConfig.getFactoryClassName());
transportParams = connectorConfig.getParams();
@@ -141,6 +141,7 @@
this.blockOnAcknowledge = blockOnAcknowledge;
this.blockOnNonPersistentSend = blockOnNonPersistentSend;
this.blockOnPersistentSend = blockOnPersistentSend;
+ this.autoGroupId = autoGroupId;
connectionRegistry = ConnectionRegistryImpl.instance;
}
@@ -163,6 +164,7 @@
blockOnAcknowledge = DEFAULT_BLOCK_ON_ACKNOWLEDGE;
blockOnPersistentSend = DEFAULT_BLOCK_ON_PERSISTENT_SEND;
blockOnNonPersistentSend = DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND;
+ autoGroupId = DEFAULT_AUTO_GROUP_ID;
connectionRegistry = ConnectionRegistryImpl.instance;
}
@@ -182,6 +184,7 @@
blockOnAcknowledge = DEFAULT_BLOCK_ON_ACKNOWLEDGE;
blockOnPersistentSend = DEFAULT_BLOCK_ON_PERSISTENT_SEND;
blockOnNonPersistentSend = DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND;
+ autoGroupId = DEFAULT_AUTO_GROUP_ID;
connectionRegistry = ConnectionRegistryImpl.instance;
}
@@ -281,6 +284,16 @@
blockOnAcknowledge = blocking;
}
+ public boolean isAutoGroupId()
+ {
+ return autoGroupId;
+ }
+
+ public void setAutoGroupId(boolean autoGroupId)
+ {
+ this.autoGroupId = autoGroupId;
+ }
+
public ConnectorFactory getConnectorFactory()
{
return connectorFactory;
@@ -499,6 +512,7 @@
autoCommitSends,
autoCommitAcks,
blockOnAcknowledge,
+ autoGroupId,
connection,
this,
response.getServerVersion(),
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2008-10-01 08:27:34 UTC (rev 5055)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2008-10-01 10:04:17 UTC (rev 5056)
@@ -11,74 +11,23 @@
*/
package org.jboss.messaging.core.client.impl;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
-
-import javax.transaction.xa.XAException;
-import javax.transaction.xa.XAResource;
-import javax.transaction.xa.Xid;
-
-import org.jboss.messaging.core.client.ClientBrowser;
-import org.jboss.messaging.core.client.ClientConsumer;
-import org.jboss.messaging.core.client.ClientMessage;
-import org.jboss.messaging.core.client.ClientProducer;
-import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.*;
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.remoting.Channel;
-import org.jboss.messaging.core.remoting.ConnectionRegistry;
-import org.jboss.messaging.core.remoting.FailureListener;
-import org.jboss.messaging.core.remoting.Packet;
-import org.jboss.messaging.core.remoting.RemotingConnection;
-import org.jboss.messaging.core.remoting.ResponseNotifier;
+import org.jboss.messaging.core.remoting.*;
import org.jboss.messaging.core.remoting.impl.ConnectionRegistryImpl;
-import org.jboss.messaging.core.remoting.impl.wireformat.CloseSessionMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
-import org.jboss.messaging.core.remoting.impl.wireformat.ReattachSessionMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.ReattachSessionResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionAddDestinationMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionBindingQueryMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionBindingQueryResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerFlowCreditMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateBrowserMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateConsumerMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateConsumerResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateProducerMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateProducerResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateQueueMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionDeleteQueueMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionProcessedMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionRemoveDestinationMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXACommitMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAEndMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAForgetMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAGetInDoubtXidsResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAGetTimeoutResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAJoinMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAPrepareMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAResumeMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXARollbackMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXASetTimeoutMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXASetTimeoutResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAStartMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.*;
import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
-import org.jboss.messaging.util.ExecutorFactory;
-import org.jboss.messaging.util.IDGenerator;
-import org.jboss.messaging.util.JBMThreadFactory;
-import org.jboss.messaging.util.OrderedExecutorFactory;
-import org.jboss.messaging.util.SimpleIDGenerator;
-import org.jboss.messaging.util.SimpleString;
-import org.jboss.messaging.util.TokenBucketLimiterImpl;
+import org.jboss.messaging.util.*;
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+
/*
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
*
@@ -139,6 +88,8 @@
private final boolean blockOnAcknowledge;
+ private final boolean autoGroupId;
+
private final Channel channel;
private final int version;
@@ -149,7 +100,7 @@
private boolean forceNotSameRM;
private final IDGenerator idGenerator = new SimpleIDGenerator(0);
-
+
// Constructors ----------------------------------------------------------------------------
public ClientSessionImpl(final ClientSessionFactoryInternal sessionFactory,
@@ -159,6 +110,7 @@
final boolean autoCommitSends,
final boolean autoCommitAcks,
final boolean blockOnAcknowledge,
+ final boolean autoGroupId,
final RemotingConnection remotingConnection,
final ClientSessionFactory connectionFactory,
final int version,
@@ -193,6 +145,8 @@
this.blockOnAcknowledge = blockOnAcknowledge;
+ this.autoGroupId = autoGroupId;
+
this.channel = channel;
this.version = version;
@@ -412,7 +366,7 @@
if (producer == null)
{
- SessionCreateProducerMessage request = new SessionCreateProducerMessage(address, windowSize, maxRate);
+ SessionCreateProducerMessage request = new SessionCreateProducerMessage(address, windowSize, maxRate, autoGroupId);
SessionCreateProducerResponseMessage response = (SessionCreateProducerResponseMessage)channel.sendBlocking(request);
@@ -430,6 +384,7 @@
false),
autoCommitSends && blockOnNonPersistentSend,
autoCommitSends && blockOnPersistentSend,
+ response.getAutoGroupId(),
response.getInitialCredits(),
channel);
}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateProducerMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateProducerMessage.java 2008-10-01 08:27:34 UTC (rev 5055)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateProducerMessage.java 2008-10-01 10:04:17 UTC (rev 5056)
@@ -42,12 +42,14 @@
private int windowSize;
private int maxRate;
+
+ private boolean autoGroupId;
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
- public SessionCreateProducerMessage(final SimpleString address, final int windowSize, final int maxRate)
+ public SessionCreateProducerMessage(final SimpleString address, final int windowSize, final int maxRate, final boolean autoGroupId)
{
super(SESS_CREATEPRODUCER);
@@ -56,6 +58,8 @@
this.windowSize = windowSize;
this.maxRate = maxRate;
+
+ this.autoGroupId = autoGroupId;
}
public SessionCreateProducerMessage()
@@ -72,6 +76,7 @@
buff.append(", address=" + address);
buff.append(", windowSize=" + windowSize);
buff.append(", maxrate=" + maxRate);
+ buff.append(", autoGroupId=" + autoGroupId);
buff.append("]");
return buff.toString();
}
@@ -90,12 +95,18 @@
{
return maxRate;
}
-
+
+ public boolean isAutoGroupId()
+ {
+ return autoGroupId;
+ }
+
public void encodeBody(final MessagingBuffer buffer)
{
buffer.putNullableSimpleString(address);
buffer.putInt(windowSize);
buffer.putInt(maxRate);
+ buffer.putBoolean(autoGroupId);
}
public void decodeBody(final MessagingBuffer buffer)
@@ -103,6 +114,7 @@
address = buffer.getNullableSimpleString();
windowSize = buffer.getInt();
maxRate = buffer.getInt();
+ autoGroupId = buffer.getBoolean();
}
public boolean equals(Object other)
@@ -117,7 +129,8 @@
return super.equals(other) &&
this.address == null ? r.address == null : this.address.equals(r.address) &&
this.windowSize == r.windowSize &&
- this.maxRate == r.maxRate;
+ this.maxRate == r.maxRate &&
+ this.autoGroupId == autoGroupId;
}
// Package protected ---------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateProducerResponseMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateProducerResponseMessage.java 2008-10-01 08:27:34 UTC (rev 5055)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateProducerResponseMessage.java 2008-10-01 10:04:17 UTC (rev 5056)
@@ -23,6 +23,7 @@
package org.jboss.messaging.core.remoting.impl.wireformat;
import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+import org.jboss.messaging.util.SimpleString;
/**
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
@@ -39,17 +40,21 @@
private int maxRate;
+ private SimpleString autoGroupId;
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
- public SessionCreateProducerResponseMessage(final int initialCredits, final int maxRate)
+ public SessionCreateProducerResponseMessage(final int initialCredits, final int maxRate, final SimpleString autoGroupId)
{
super(SESS_CREATEPRODUCER_RESP);
this.initialCredits = initialCredits;
this.maxRate = maxRate;
+
+ this.autoGroupId = autoGroupId;
}
public SessionCreateProducerResponseMessage()
@@ -73,17 +78,25 @@
{
return maxRate;
}
-
+
+
+ public SimpleString getAutoGroupId()
+ {
+ return autoGroupId;
+ }
+
public void encodeBody(final MessagingBuffer buffer)
{
buffer.putInt(initialCredits);
buffer.putInt(maxRate);
+ buffer.putNullableSimpleString(autoGroupId);
}
public void decodeBody(final MessagingBuffer buffer)
{
initialCredits = buffer.getInt();
maxRate = buffer.getInt();
+ autoGroupId = buffer.getNullableSimpleString();
}
Modified: trunk/src/main/org/jboss/messaging/core/server/DistributionPolicy.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/DistributionPolicy.java 2008-10-01 08:27:34 UTC (rev 5055)
+++ trunk/src/main/org/jboss/messaging/core/server/DistributionPolicy.java 2008-10-01 10:04:17 UTC (rev 5056)
@@ -40,4 +40,6 @@
int getConsumerCount();
boolean hasConsumers();
+
+ int getCurrentPosition();
}
Modified: trunk/src/main/org/jboss/messaging/core/server/ServerSession.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/ServerSession.java 2008-10-01 08:27:34 UTC (rev 5055)
+++ trunk/src/main/org/jboss/messaging/core/server/ServerSession.java 2008-10-01 10:04:17 UTC (rev 5056)
@@ -22,20 +22,14 @@
package org.jboss.messaging.core.server;
-import java.util.List;
-
-import javax.transaction.xa.Xid;
-
import org.jboss.messaging.core.remoting.RemotingConnection;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionBindingQueryResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateConsumerResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateProducerResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendManagementMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.*;
import org.jboss.messaging.core.server.impl.ServerBrowserImpl;
import org.jboss.messaging.util.SimpleString;
+import javax.transaction.xa.Xid;
+import java.util.List;
+
/**
*
* A ServerSession
@@ -107,7 +101,7 @@
SessionCreateConsumerResponseMessage createConsumer(SimpleString queueName, SimpleString filterString,
int windowSize, int maxRate) throws Exception;
- SessionCreateProducerResponseMessage createProducer(SimpleString address, int windowSize, int maxRate) throws Exception;
+ SessionCreateProducerResponseMessage createProducer(SimpleString address, int windowSize, int maxRate, boolean autoGroupId) throws Exception;
SessionQueueQueryResponseMessage executeQueueQuery(SimpleString queueName) throws Exception;
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2008-10-01 08:27:34 UTC (rev 5055)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2008-10-01 10:04:17 UTC (rev 5056)
@@ -12,17 +12,6 @@
package org.jboss.messaging.core.server.impl;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
import org.jboss.messaging.core.config.Configuration;
import org.jboss.messaging.core.config.TransportConfiguration;
import org.jboss.messaging.core.exception.MessagingException;
@@ -57,11 +46,13 @@
import org.jboss.messaging.core.transaction.ResourceManager;
import org.jboss.messaging.core.transaction.impl.ResourceManagerImpl;
import org.jboss.messaging.core.version.Version;
-import org.jboss.messaging.util.ExecutorFactory;
-import org.jboss.messaging.util.JBMThreadFactory;
-import org.jboss.messaging.util.OrderedExecutorFactory;
-import org.jboss.messaging.util.VersionLoader;
+import org.jboss.messaging.util.*;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.*;
+
/**
* The messaging server implementation
*
@@ -128,6 +119,8 @@
private ManagementService managementService;
+ private final SimpleStringIdGenerator simpleStringIdGenerator = new GroupIdGenerator(new SimpleString("AutoGroupId-"));
+
// Constructors
// ---------------------------------------------------------------------------------
@@ -474,7 +467,7 @@
executorFactory.getExecutor(),
channel,
managementService,
- this);
+ simpleStringIdGenerator);
// If the session already exists that's fine - create session must be idempotent
// This is because if server failures occurring during a create session call we need to
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2008-10-01 08:27:34 UTC (rev 5055)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2008-10-01 10:04:17 UTC (rev 5056)
@@ -737,13 +737,16 @@
return HandleStatus.BUSY;
}
+ int startPos = distributionPolicy.getCurrentPosition();
+
boolean filterRejected = false;
HandleStatus status = null;
- int pos = 0;
- while (pos <= distributionPolicy.getConsumerCount())
+ int pos;
+ while (true)
{
Consumer consumer = distributionPolicy.select(reference.getMessage(), status != null);
+ pos = distributionPolicy.getCurrentPosition();
if(consumer == null)
{
if (filterRejected)
@@ -797,18 +800,25 @@
filterRejected = true;
}
- pos++;
+ if(startPos > distributionPolicy.getConsumerCount() - 1)
+ {
+ startPos = distributionPolicy.getConsumerCount() - 1;
+ }
+ if(startPos == pos)
+ {
+ // Tried all of them
+ if (filterRejected)
+ {
+ return HandleStatus.NO_MATCH;
+ }
+ else
+ {
+ // Give up - all consumers busy
+ return HandleStatus.BUSY;
+ }
+ }
}
- // Tried all of them
- if (filterRejected)
- {
- return HandleStatus.NO_MATCH;
- }
- else
- {
- // Give up - all consumers busy
- return HandleStatus.BUSY;
- }
+
}
// Inner classes
@@ -821,7 +831,10 @@
// Must be set to false *before* executing to avoid race
waitingToDeliver.set(false);
- deliver();
+ synchronized (QueueImpl.this)
+ {
+ deliver();
+ }
}
}
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/RoundRobinDistributionPolicy.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/RoundRobinDistributionPolicy.java 2008-10-01 08:27:34 UTC (rev 5055)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/RoundRobinDistributionPolicy.java 2008-10-01 10:04:17 UTC (rev 5056)
@@ -34,7 +34,7 @@
*/
public class RoundRobinDistributionPolicy extends DistributionPolicyImpl
{
- int pos = -1;
+ int pos = 0;
public Consumer select(ServerMessage message, boolean redeliver)
{
@@ -42,35 +42,30 @@
{
return null;
}
- if (pos == -1)
+ int startPos = pos++;
+
+ if (pos == consumers.size())
{
- //First time
pos = 0;
- return consumers.get(pos);
}
- else
- {
- pos++;
-
- if (pos == consumers.size())
- {
- pos = 0;
- }
- }
-
- return consumers.get(pos);
+ return consumers.get(startPos);
}
public synchronized void addConsumer(Consumer consumer)
{
- pos = -1;
+ pos = 0;
super.addConsumer(consumer);
}
public synchronized boolean removeConsumer(Consumer consumer)
{
- pos = -1;
+ pos = 0;
return super.removeConsumer(consumer);
}
+
+ public int getCurrentPosition()
+ {
+ return pos;
+ }
}
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2008-10-01 08:27:34 UTC (rev 5055)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2008-10-01 10:04:17 UTC (rev 5056)
@@ -12,21 +12,6 @@
package org.jboss.messaging.core.server.impl;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executor;
-
-import javax.management.Notification;
-import javax.management.NotificationListener;
-import javax.transaction.xa.XAException;
-import javax.transaction.xa.XAResource;
-import javax.transaction.xa.Xid;
-
import org.jboss.messaging.core.client.management.impl.ManagementHelper;
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.filter.Filter;
@@ -42,22 +27,11 @@
import org.jboss.messaging.core.remoting.FailureListener;
import org.jboss.messaging.core.remoting.RemotingConnection;
import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
-import org.jboss.messaging.core.remoting.impl.wireformat.NullResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionBindingQueryResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateConsumerResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateProducerResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendManagementMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.*;
import org.jboss.messaging.core.security.CheckType;
import org.jboss.messaging.core.security.SecurityStore;
-import org.jboss.messaging.core.server.MessageReference;
-import org.jboss.messaging.core.server.MessagingServer;
+import org.jboss.messaging.core.server.*;
import org.jboss.messaging.core.server.Queue;
-import org.jboss.messaging.core.server.ServerConsumer;
-import org.jboss.messaging.core.server.ServerMessage;
-import org.jboss.messaging.core.server.ServerProducer;
-import org.jboss.messaging.core.server.ServerSession;
import org.jboss.messaging.core.settings.HierarchicalRepository;
import org.jboss.messaging.core.settings.impl.QueueSettings;
import org.jboss.messaging.core.transaction.ResourceManager;
@@ -66,7 +40,18 @@
import org.jboss.messaging.util.IDGenerator;
import org.jboss.messaging.util.SimpleIDGenerator;
import org.jboss.messaging.util.SimpleString;
+import org.jboss.messaging.util.SimpleStringIdGenerator;
+import javax.management.Notification;
+import javax.management.NotificationListener;
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+
/*
* Session implementation
*
@@ -131,6 +116,8 @@
private final IDGenerator idGenerator = new SimpleIDGenerator(0);
+ private final SimpleStringIdGenerator simpleStringIdGenerator;
+
// Constructors ---------------------------------------------------------------------------------
public ServerSessionImpl(final String name,
@@ -149,7 +136,7 @@
final Executor executor,
final Channel channel,
final ManagementService managementService,
- final MessagingServer server) throws Exception
+ final SimpleStringIdGenerator simpleStringIdGenerator) throws Exception
{
this.id = id;
@@ -185,6 +172,8 @@
this.channel = channel;
this.managementService = managementService;
+
+ this.simpleStringIdGenerator = simpleStringIdGenerator;
}
// ServerSession implementation ----------------------------------------------------------------------------
@@ -959,7 +948,8 @@
*/
public SessionCreateProducerResponseMessage createProducer(final SimpleString address,
final int windowSize,
- final int maxRate) throws Exception
+ final int maxRate,
+ final boolean autoGroupId) throws Exception
{
FlowController flowController = null;
@@ -991,7 +981,12 @@
int initialCredits = flowController == null ? -1 : flowController.getInitialCredits(windowToUse, producer);
- return new SessionCreateProducerResponseMessage(initialCredits, maxRateToUse);
+ SimpleString groupId = null;
+ if(autoGroupId)
+ {
+ groupId = simpleStringIdGenerator.generateID();
+ }
+ return new SessionCreateProducerResponseMessage(initialCredits, maxRateToUse, groupId);
}
public boolean browserHasNextMessage(final long browserID) throws Exception
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java 2008-10-01 08:27:34 UTC (rev 5055)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java 2008-10-01 10:04:17 UTC (rev 5056)
@@ -12,91 +12,20 @@
package org.jboss.messaging.core.server.impl;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_ADD_DESTINATION;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_BINDINGQUERY;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_BROWSER_CLOSE;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_BROWSER_HASNEXTMESSAGE;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_BROWSER_NEXTMESSAGE;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_BROWSER_RESET;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_COMMIT;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CONSUMER_CLOSE;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CREATEBROWSER;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CREATECONSUMER;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CREATEPRODUCER;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CREATEQUEUE;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_DELETE_QUEUE;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_FLOWTOKEN;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_MANAGEMENT_SEND;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_PROCESSED;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_PRODUCER_CLOSE;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_QUEUEQUERY;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_REMOVE_DESTINATION;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_ROLLBACK;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_SEND;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_START;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_STOP;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_COMMIT;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_END;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_FORGET;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_GET_TIMEOUT;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_INDOUBT_XIDS;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_JOIN;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_PREPARE;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_RESUME;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_ROLLBACK;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_SET_TIMEOUT;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_START;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_SUSPEND;
-
-import java.util.List;
-
-import javax.transaction.xa.Xid;
-
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.remoting.Channel;
import org.jboss.messaging.core.remoting.ChannelHandler;
import org.jboss.messaging.core.remoting.Packet;
-import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.NullResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionAddDestinationMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionBindingQueryMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionBrowseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionBrowserCloseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionBrowserHasNextMessageMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionBrowserHasNextMessageResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionBrowserNextMessageMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionBrowserResetMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerCloseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerFlowCreditMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateBrowserMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateConsumerMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateProducerMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateQueueMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionDeleteQueueMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionProcessedMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionProducerCloseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionRemoveDestinationMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendManagementMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXACommitMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAEndMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAForgetMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAGetInDoubtXidsResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAGetTimeoutResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAJoinMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAPrepareMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAResumeMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXARollbackMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXASetTimeoutMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXASetTimeoutResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAStartMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.*;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.*;
import org.jboss.messaging.core.server.ServerMessage;
import org.jboss.messaging.core.server.ServerSession;
+import javax.transaction.xa.Xid;
+import java.util.List;
+
/**
* A ServerSessionPacketHandler
*
@@ -213,7 +142,7 @@
case SESS_CREATEPRODUCER:
{
SessionCreateProducerMessage request = (SessionCreateProducerMessage)packet;
- response = session.createProducer(request.getAddress(), request.getWindowSize(), request.getMaxRate());
+ response = session.createProducer(request.getAddress(), request.getWindowSize(), request.getMaxRate(), request.isAutoGroupId());
break;
}
case SESS_PROCESSED:
Modified: trunk/src/main/org/jboss/messaging/jms/client/JBossConnectionFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/client/JBossConnectionFactory.java 2008-10-01 08:27:34 UTC (rev 5055)
+++ trunk/src/main/org/jboss/messaging/jms/client/JBossConnectionFactory.java 2008-10-01 10:04:17 UTC (rev 5056)
@@ -12,24 +12,6 @@
package org.jboss.messaging.jms.client;
-import java.io.Serializable;
-
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.JMSException;
-import javax.jms.QueueConnection;
-import javax.jms.QueueConnectionFactory;
-import javax.jms.TopicConnection;
-import javax.jms.TopicConnectionFactory;
-import javax.jms.XAConnection;
-import javax.jms.XAConnectionFactory;
-import javax.jms.XAQueueConnection;
-import javax.jms.XAQueueConnectionFactory;
-import javax.jms.XATopicConnection;
-import javax.jms.XATopicConnectionFactory;
-import javax.naming.NamingException;
-import javax.naming.Reference;
-
import org.jboss.messaging.core.client.ClientSession;
import org.jboss.messaging.core.client.ClientSessionFactory;
import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
@@ -39,6 +21,11 @@
import org.jboss.messaging.jms.referenceable.ConnectionFactoryObjectFactory;
import org.jboss.messaging.jms.referenceable.SerializableObjectRefAddr;
+import javax.jms.*;
+import javax.naming.NamingException;
+import javax.naming.Reference;
+import java.io.Serializable;
+
/**
* @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
@@ -88,6 +75,8 @@
private final boolean blockOnPersistentSend;
+ private final boolean autoGroupId;
+
// Constructors ---------------------------------------------------------------------------------
public JBossConnectionFactory(final TransportConfiguration connectorConfig,
@@ -102,7 +91,8 @@
final int producerMaxRate,
final boolean blockOnAcknowledge,
final boolean blockOnNonPersistentSend,
- final boolean blockOnPersistentSend)
+ final boolean blockOnPersistentSend,
+ final boolean autoGroupId)
{
this.connectorConfig = connectorConfig;
this.backupConnectorConfig = backupConnectorConfig;
@@ -117,6 +107,7 @@
this.blockOnAcknowledge = blockOnAcknowledge;
this.blockOnNonPersistentSend = blockOnNonPersistentSend;
this.blockOnPersistentSend = blockOnPersistentSend;
+ this.autoGroupId = autoGroupId;
}
// ConnectionFactory implementation -------------------------------------------------------------
@@ -263,8 +254,13 @@
return blockOnPersistentSend;
}
- // Package protected ----------------------------------------------------------------------------
+ public boolean isAutoGroupId()
+ {
+ return autoGroupId;
+ }
+// Package protected ----------------------------------------------------------------------------
+
// Protected ------------------------------------------------------------------------------------
protected JBossConnection createConnectionInternal(final String username,
@@ -285,7 +281,8 @@
producerMaxRate,
blockOnAcknowledge,
blockOnNonPersistentSend,
- blockOnPersistentSend);
+ blockOnPersistentSend,
+ autoGroupId);
}
Modified: trunk/src/main/org/jboss/messaging/jms/server/JMSServerManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/JMSServerManager.java 2008-10-01 08:27:34 UTC (rev 5055)
+++ trunk/src/main/org/jboss/messaging/jms/server/JMSServerManager.java 2008-10-01 10:04:17 UTC (rev 5056)
@@ -22,12 +22,10 @@
package org.jboss.messaging.jms.server;
-import java.util.List;
-import java.util.Map;
-
import org.jboss.messaging.core.config.TransportConfiguration;
-import org.jboss.messaging.core.remoting.spi.ConnectorFactory;
+import java.util.List;
+
/**
* The JMS Management interface.
*
@@ -114,7 +112,7 @@
int producerWindowSize, int producerMaxRate,
boolean blockOnAcknowledge,
boolean blockOnNonPersistentSend,
- boolean blockOnPersistentSend, String jndiBinding)
+ boolean blockOnPersistentSend, boolean autoGroupId, String jndiBinding)
throws Exception;
@@ -125,7 +123,7 @@
int producerWindowSize, int producerMaxRate,
boolean blockOnAcknowledge,
boolean blockOnNonPersistentSend,
- boolean blockOnPersistentSend, List<String> jndiBinding)
+ boolean blockOnPersistentSend, boolean autoGroupId, List<String> jndiBinding)
throws Exception;
/**
Modified: trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerDeployer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerDeployer.java 2008-10-01 08:27:34 UTC (rev 5055)
+++ trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerDeployer.java 2008-10-01 10:04:17 UTC (rev 5056)
@@ -12,11 +12,6 @@
package org.jboss.messaging.jms.server.impl;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
import org.jboss.messaging.core.config.TransportConfiguration;
import org.jboss.messaging.core.config.impl.ConfigurationImpl;
@@ -28,6 +23,11 @@
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
/**
* @author <a href="ataylor at redhat.com">Andy Taylor</a>
* @author <a href="tim.fox at jboss.com">Tim Fox</a>
@@ -62,6 +62,8 @@
private static final String SEND_P_MESSAGES_SYNCHRONOUSLY_ELEMENT = "send-p-messages-synchronously";
+ private static final String AUTO_GROUP_ID__ELEMENT = "auto-group-id";
+
private static final String CONNECTOR_ELEMENT = "connector";
private static final String BACKUP_CONNECTOR_ELEMENT = "backup-connector";
@@ -136,7 +138,7 @@
boolean blockOnAcknowledge = ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_ACKNOWLEDGE;
boolean blockOnNonPersistentSend = ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND;
boolean blockOnPersistentSend = ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_PERSISTENT_SEND;
-
+ boolean autoGroupId = ClientSessionFactoryImpl.DEFAULT_AUTO_GROUP_ID;
List<String> jndiBindings = new ArrayList<String>();
String connectorFactoryClassName = null;
Map<String, Object> params = new HashMap<String, Object>();
@@ -189,6 +191,10 @@
{
blockOnPersistentSend = Boolean.parseBoolean(children.item(j).getTextContent().trim());
}
+ else if(AUTO_GROUP_ID__ELEMENT.equalsIgnoreCase(children.item(j).getNodeName()))
+ {
+ autoGroupId = Boolean.parseBoolean(children.item(j).getTextContent().trim());
+ }
else if (ENTRY_NODE_NAME.equalsIgnoreCase(children.item(j).getNodeName()))
{
String jndiName = children.item(j).getAttributes().getNamedItem("name").getNodeValue();
@@ -392,6 +398,7 @@
blockOnAcknowledge,
blockOnNonPersistentSend,
blockOnPersistentSend,
+ autoGroupId,
jndiBindings);
}
else if (node.getNodeName().equals(QUEUE_NODE_NAME))
Modified: trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerManagerImpl.java 2008-10-01 08:27:34 UTC (rev 5055)
+++ trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerManagerImpl.java 2008-10-01 10:04:17 UTC (rev 5056)
@@ -22,15 +22,6 @@
package org.jboss.messaging.jms.server.impl;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import javax.naming.Context;
-import javax.naming.InitialContext;
-import javax.naming.NamingException;
-
import org.jboss.messaging.core.config.TransportConfiguration;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.management.MessagingServerControlMBean;
@@ -46,6 +37,14 @@
import org.jboss.messaging.jms.server.management.JMSManagementService;
import org.jboss.messaging.util.JNDIUtil;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
/**
* A Deployer used to create and add to JNDI queues, topics and connection
* factories. Typically this would only be used in an app server env.
@@ -200,6 +199,7 @@
boolean blockOnAcknowledge,
boolean blockOnNonPersistentSend,
boolean blockOnPersistentSend,
+ boolean autoGroupId,
String jndiBinding) throws Exception
{
JBossConnectionFactory cf = connectionFactories.get(name);
@@ -217,7 +217,8 @@
producerMaxRate,
blockOnAcknowledge,
blockOnNonPersistentSend,
- blockOnPersistentSend);
+ blockOnPersistentSend,
+ autoGroupId);
connectionFactories.put(name, cf);
}
if (!bindToJndi(jndiBinding, cf))
@@ -251,6 +252,7 @@
boolean blockOnAcknowledge,
boolean blockOnNonPersistentSend,
boolean blockOnPersistentSend,
+ boolean autoGroupId,
List<String> jndiBindings) throws Exception
{
JBossConnectionFactory cf = connectionFactories.get(name);
@@ -268,7 +270,8 @@
producerMaxRate,
blockOnAcknowledge,
blockOnNonPersistentSend,
- blockOnPersistentSend);
+ blockOnPersistentSend,
+ autoGroupId);
}
for (String jndiBinding : jndiBindings)
{
Modified: trunk/src/main/org/jboss/messaging/jms/server/management/JMSServerControlMBean.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/management/JMSServerControlMBean.java 2008-10-01 08:27:34 UTC (rev 5055)
+++ trunk/src/main/org/jboss/messaging/jms/server/management/JMSServerControlMBean.java 2008-10-01 10:04:17 UTC (rev 5056)
@@ -22,12 +22,12 @@
package org.jboss.messaging.jms.server.management;
-import static javax.management.MBeanOperationInfo.ACTION;
-
import org.jboss.messaging.core.config.TransportConfiguration;
import org.jboss.messaging.core.management.Operation;
import org.jboss.messaging.core.management.Parameter;
+import static javax.management.MBeanOperationInfo.ACTION;
+
/**
* @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
*
@@ -82,6 +82,7 @@
@Parameter(name = "blockOnAcknowledge", desc = "Does acknowlegment block?") boolean blockOnAcknowledge,
@Parameter(name = "blockOnNonPersistentSend", desc = "Does sending non persistent messages block?") boolean blockOnNonPersistentSend,
@Parameter(name = "blockOnPersistentSend", desc = "Does sending persistent messages block") boolean blockOnPersistentSend,
+ @Parameter(name = "autoGroupId", desc = "Any Messages sent via this factories connections will automatically set th eproperty 'JMSXGroupId'") boolean autoGroupId,
@Parameter(name = "jndiBinding", desc = "JNDI Binding") String jndiBinding)
throws Exception;
Modified: trunk/src/main/org/jboss/messaging/jms/server/management/impl/JMSServerControl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/management/impl/JMSServerControl.java 2008-10-01 08:27:34 UTC (rev 5055)
+++ trunk/src/main/org/jboss/messaging/jms/server/management/impl/JMSServerControl.java 2008-10-01 10:04:17 UTC (rev 5056)
@@ -22,26 +22,16 @@
package org.jboss.messaging.jms.server.management.impl;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
-
-import javax.management.ListenerNotFoundException;
-import javax.management.MBeanInfo;
-import javax.management.MBeanNotificationInfo;
-import javax.management.NotCompliantMBeanException;
-import javax.management.Notification;
-import javax.management.NotificationBroadcasterSupport;
-import javax.management.NotificationEmitter;
-import javax.management.NotificationFilter;
-import javax.management.NotificationListener;
-import javax.management.StandardMBean;
-
import org.jboss.messaging.core.config.TransportConfiguration;
import org.jboss.messaging.core.management.impl.MBeanInfoHelper;
import org.jboss.messaging.jms.server.JMSServerManager;
import org.jboss.messaging.jms.server.management.JMSServerControlMBean;
+import javax.management.*;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
/**
* @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
*
@@ -83,7 +73,7 @@
int producerWindowSize, int producerMaxRate,
boolean blockOnAcknowledge,
boolean blockOnNonPersistentSend,
- boolean blockOnPersistentSend, String jndiBinding) throws Exception
+ boolean blockOnPersistentSend, boolean autoGroupId, String jndiBinding) throws Exception
{
List<String> bindings = new ArrayList<String>();
bindings.add(jndiBinding);
@@ -93,7 +83,7 @@
pingPeriod, callTimeout, clientID, dupsOKBatchSize,
consumerWindowSize, consumerMaxRate, producerWindowSize, producerMaxRate,
blockOnAcknowledge, blockOnNonPersistentSend,
- blockOnPersistentSend, jndiBinding);
+ blockOnPersistentSend, autoGroupId, jndiBinding);
if (created)
{
sendNotification(NotificationType.CONNECTION_FACTORY_CREATED, name);
Added: trunk/src/main/org/jboss/messaging/util/GroupIdGenerator.java
===================================================================
--- trunk/src/main/org/jboss/messaging/util/GroupIdGenerator.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/util/GroupIdGenerator.java 2008-10-01 10:04:17 UTC (rev 5056)
@@ -0,0 +1,46 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.util;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public class GroupIdGenerator implements SimpleStringIdGenerator
+{
+ private final AtomicInteger ai = new AtomicInteger(1);
+
+ private final SimpleString prefix;
+
+ public GroupIdGenerator(final SimpleString prefix)
+ {
+ this.prefix = prefix;
+ }
+
+ public SimpleString generateID()
+ {
+ SimpleString suffix = new SimpleString("" + ai.getAndIncrement());
+ return prefix.concat(suffix);
+ }
+
+}
Added: trunk/src/main/org/jboss/messaging/util/SimpleStringIdGenerator.java
===================================================================
--- trunk/src/main/org/jboss/messaging/util/SimpleStringIdGenerator.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/util/SimpleStringIdGenerator.java 2008-10-01 10:04:17 UTC (rev 5056)
@@ -0,0 +1,30 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.util;
+
+/**
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public interface SimpleStringIdGenerator
+{
+ SimpleString generateID();
+}
Modified: trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/CTSMiscellaneousTest.java
===================================================================
--- trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/CTSMiscellaneousTest.java 2008-10-01 08:27:34 UTC (rev 5055)
+++ trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/CTSMiscellaneousTest.java 2008-10-01 10:04:17 UTC (rev 5056)
@@ -21,27 +21,7 @@
*/
package org.jboss.test.messaging.jms;
-import java.util.ArrayList;
-import java.util.List;
-
-import javax.jms.BytesMessage;
-import javax.jms.Connection;
-import javax.jms.DeliveryMode;
-import javax.jms.InvalidSelectorException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.QueueConnection;
-import javax.jms.QueueReceiver;
-import javax.jms.QueueSender;
-import javax.jms.QueueSession;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import javax.jms.TopicConnection;
-import javax.jms.TopicSession;
-
import org.jboss.messaging.core.config.TransportConfiguration;
-import org.jboss.messaging.core.remoting.impl.netty.NettyConnectorFactory;
import org.jboss.messaging.jms.client.JBossConnectionFactory;
import org.jboss.test.messaging.JBMServerTestCase;
import org.jboss.test.messaging.jms.message.SimpleJMSBytesMessage;
@@ -49,6 +29,10 @@
import org.jboss.test.messaging.jms.message.SimpleJMSTextMessage;
import org.jboss.test.messaging.tools.container.ServiceAttributeOverrides;
+import javax.jms.*;
+import java.util.ArrayList;
+import java.util.List;
+
/**
* Safeguards for previously detected TCK failures.
*
@@ -88,7 +72,7 @@
getJmsServerManager().createConnectionFactory("StrictTCKConnectionFactory",
new TransportConfiguration("org.jboss.messaging.core.remoting.impl.netty.NettyConnectorFactory"), null, 5000, 5000,
null,
- 1000, 1024 * 1024, -1, 1000, -1, true, true, true, "/StrictTCKConnectionFactory");
+ 1000, 1024 * 1024, -1, 1000, -1, true, true, true, false, "/StrictTCKConnectionFactory");
cf = (JBossConnectionFactory) getInitialContext().lookup("/StrictTCKConnectionFactory");
}
Modified: trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/JMSTestCase.java
===================================================================
--- trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/JMSTestCase.java 2008-10-01 08:27:34 UTC (rev 5055)
+++ trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/JMSTestCase.java 2008-10-01 10:04:17 UTC (rev 5056)
@@ -1,12 +1,11 @@
package org.jboss.test.messaging.jms;
-import javax.naming.InitialContext;
-
import org.jboss.messaging.core.config.TransportConfiguration;
-import org.jboss.messaging.core.remoting.impl.netty.NettyConnectorFactory;
import org.jboss.messaging.jms.client.JBossConnectionFactory;
import org.jboss.test.messaging.JBMServerTestCase;
+import javax.naming.InitialContext;
+
/**
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @version <tt>$Revision: $</tt>23 Jul 2007
@@ -40,7 +39,7 @@
getJmsServerManager().createConnectionFactory("testsuitecf",
new TransportConfiguration("org.jboss.messaging.core.remoting.impl.netty.NettyConnectorFactory"), null, 5000, 5000,
- null, 1000, 1024 * 1024, -1, 1000, -1, true, true, true, "/testsuitecf");
+ null, 1000, 1024 * 1024, -1, 1000, -1, true, true, true, false, "/testsuitecf");
cf = (JBossConnectionFactory) getInitialContext().lookup("/testsuitecf");
}
Modified: trunk/tests/jms-tests/src/org/jboss/test/messaging/tools/container/LocalTestServer.java
===================================================================
--- trunk/tests/jms-tests/src/org/jboss/test/messaging/tools/container/LocalTestServer.java 2008-10-01 08:27:34 UTC (rev 5055)
+++ trunk/tests/jms-tests/src/org/jboss/test/messaging/tools/container/LocalTestServer.java 2008-10-01 10:04:17 UTC (rev 5056)
@@ -21,26 +21,6 @@
*/
package org.jboss.test.messaging.tools.container;
-import java.io.File;
-import java.lang.management.ManagementFactory;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Properties;
-import java.util.Set;
-
-import javax.management.MBeanServerInvocationHandler;
-import javax.management.NotificationListener;
-import javax.management.ObjectName;
-import javax.naming.InitialContext;
-import javax.sql.DataSource;
-import javax.transaction.TransactionManager;
-import javax.transaction.UserTransaction;
-
import org.jboss.kernel.spi.deployment.KernelDeployment;
import org.jboss.messaging.core.config.TransportConfiguration;
import org.jboss.messaging.core.logging.Logger;
@@ -61,6 +41,20 @@
import org.jboss.test.messaging.tools.jboss.MBeanConfigurationElement;
import org.jboss.tm.TransactionManagerLocator;
+import javax.management.MBeanServerInvocationHandler;
+import javax.management.NotificationListener;
+import javax.management.ObjectName;
+import javax.naming.InitialContext;
+import javax.sql.DataSource;
+import javax.transaction.TransactionManager;
+import javax.transaction.UserTransaction;
+import java.io.File;
+import java.lang.management.ManagementFactory;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.*;
+
/**
* @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
@@ -530,7 +524,7 @@
getJMSServerManager().createConnectionFactory(objectName,
new TransportConfiguration("org.jboss.messaging.core.remoting.impl.netty.NettyConnectorFactory"), null, 5000, 5000,
clientId, dupsOkBatchSize,
- prefetchSize, -1, 1000, -1, blockOnAcknowledge, true, true, jndiBindings);
+ prefetchSize, -1, 1000, -1, blockOnAcknowledge, true, true, false, jndiBindings);
}
Added: trunk/tests/src/org/jboss/messaging/tests/integration/basic/AutoGroupClientTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/basic/AutoGroupClientTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/basic/AutoGroupClientTest.java 2008-10-01 10:04:17 UTC (rev 5056)
@@ -0,0 +1,255 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.tests.integration.basic;
+
+import junit.framework.TestCase;
+import org.jboss.messaging.core.client.*;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.impl.ConfigurationImpl;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.core.server.impl.GroupingRoundRobinDistributionPolicy;
+import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
+import org.jboss.messaging.core.settings.impl.QueueSettings;
+import org.jboss.messaging.jms.client.JBossTextMessage;
+import org.jboss.messaging.util.SimpleString;
+
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public class AutoGroupClientTest extends TestCase
+{
+ public void testGroupIdAutomaticallySet() throws Exception
+ {
+ final SimpleString QUEUE = new SimpleString("testGroupQueue");
+ QueueSettings qs = new QueueSettings();
+ qs.setDistributionPolicyClass(GroupingRoundRobinDistributionPolicy.class.getName());
+
+ Configuration conf = new ConfigurationImpl();
+
+ conf.setSecurityEnabled(false);
+
+ conf.getAcceptorConfigurations().add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory"));
+
+ MessagingService messagingService = MessagingServiceImpl.newNullStorageMessagingServer(conf);
+
+ messagingService.getServer().getQueueSettingsRepository().addMatch("testGroupQueue", qs);
+ messagingService.start();
+
+ ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
+ sf.setAutoGroupId(true);
+ ClientSession session = sf.createSession(false, true, true, false);
+
+ session.createQueue(QUEUE, QUEUE, null, false, false);
+
+ ClientProducer producer = session.createProducer(QUEUE);
+
+ final CountDownLatch latch = new CountDownLatch(100);
+
+ MyMessageHandler myMessageHandler = new MyMessageHandler(latch);
+ MyMessageHandler myMessageHandler2 = new MyMessageHandler(latch);
+
+ ClientConsumer consumer = session.createConsumer(QUEUE);
+ consumer.setMessageHandler(myMessageHandler);
+ ClientConsumer consumer2 = session.createConsumer(QUEUE);
+ consumer2.setMessageHandler(myMessageHandler2);
+
+ session.start();
+
+ final int numMessages = 100;
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session.createClientMessage(JBossTextMessage.TYPE, false, 0,
+ System.currentTimeMillis(), (byte) 1);
+ message.getBody().putString("testINVMCoreClient");
+ message.getBody().flip();
+ message.setDurable(false);
+ producer.send(message);
+ }
+ latch.await();
+
+ session.close();
+
+ messagingService.stop();
+
+ assertEquals(myMessageHandler.messagesReceived, 100);
+ assertEquals(myMessageHandler2.messagesReceived, 0);
+ }
+
+ public void testGroupIdAutomaticallySetMultipleProducers() throws Exception
+ {
+ final SimpleString QUEUE = new SimpleString("testGroupQueue");
+ QueueSettings qs = new QueueSettings();
+ qs.setDistributionPolicyClass(GroupingRoundRobinDistributionPolicy.class.getName());
+
+ Configuration conf = new ConfigurationImpl();
+
+ conf.setSecurityEnabled(false);
+
+ conf.getAcceptorConfigurations().add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory"));
+
+ MessagingService messagingService = MessagingServiceImpl.newNullStorageMessagingServer(conf);
+
+ messagingService.getServer().getQueueSettingsRepository().addMatch("testGroupQueue", qs);
+ messagingService.start();
+
+ ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
+ sf.setAutoGroupId(true);
+ ClientSession session = sf.createSession(false, true, true, false);
+
+ session.createQueue(QUEUE, QUEUE, null, false, false);
+
+ ClientProducer producer = session.createProducer(QUEUE);
+ ClientProducer producer2 = session.createProducer(QUEUE);
+
+ final CountDownLatch latch = new CountDownLatch(200);
+
+ MyMessageHandler myMessageHandler = new MyMessageHandler(latch);
+ MyMessageHandler myMessageHandler2 = new MyMessageHandler(latch);
+ MyMessageHandler myMessageHandler3 = new MyMessageHandler(latch);
+
+ ClientConsumer consumer = session.createConsumer(QUEUE);
+ consumer.setMessageHandler(myMessageHandler);
+ ClientConsumer consumer2 = session.createConsumer(QUEUE);
+ consumer2.setMessageHandler(myMessageHandler2);
+ ClientConsumer consumer3 = session.createConsumer(QUEUE);
+ consumer3.setMessageHandler(myMessageHandler3);
+
+ session.start();
+
+ final int numMessages = 100;
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session.createClientMessage(JBossTextMessage.TYPE, false, 0,
+ System.currentTimeMillis(), (byte) 1);
+ message.getBody().putString("testINVMCoreClient");
+ message.getBody().flip();
+ producer.send(message);
+ }
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session.createClientMessage(JBossTextMessage.TYPE, false, 0,
+ System.currentTimeMillis(), (byte) 1);
+ message.getBody().putString("testINVMCoreClient");
+ message.getBody().flip();
+ producer2.send(message);
+ }
+ latch.await();
+
+ session.close();
+
+ messagingService.stop();
+
+ assertEquals(myMessageHandler.messagesReceived, 100);
+ assertEquals(myMessageHandler2.messagesReceived, 100);
+ assertEquals(myMessageHandler3.messagesReceived, 0);
+ }
+
+ public void testGroupIdAutomaticallyNotSet() throws Exception
+ {
+ final SimpleString QUEUE = new SimpleString("testGroupQueue");
+ QueueSettings qs = new QueueSettings();
+ qs.setDistributionPolicyClass(GroupingRoundRobinDistributionPolicy.class.getName());
+ Configuration conf = new ConfigurationImpl();
+
+ conf.setSecurityEnabled(false);
+
+ conf.getAcceptorConfigurations().add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory"));
+
+ MessagingService messagingService = MessagingServiceImpl.newNullStorageMessagingServer(conf);
+ messagingService.getServer().getQueueSettingsRepository().addMatch("testGroupQueue", qs);
+ messagingService.start();
+
+ ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
+
+ ClientSession session = sf.createSession(false, true, true, false);
+
+ session.createQueue(QUEUE, QUEUE, null, false, false);
+
+ ClientProducer producer = session.createProducer(QUEUE);
+
+ final CountDownLatch latch = new CountDownLatch(100);
+
+ MyMessageHandler myMessageHandler = new MyMessageHandler(latch);
+ MyMessageHandler myMessageHandler2 = new MyMessageHandler(latch);
+
+ ClientConsumer consumer = session.createConsumer(QUEUE);
+ consumer.setMessageHandler(myMessageHandler);
+ ClientConsumer consumer2 = session.createConsumer(QUEUE);
+ consumer2.setMessageHandler(myMessageHandler2);
+
+ session.start();
+
+ final int numMessages = 100;
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session.createClientMessage(JBossTextMessage.TYPE, false, 0,
+ System.currentTimeMillis(), (byte) 1);
+ message.getBody().putString("testINVMCoreClient");
+ message.getBody().flip();
+ message.setDurable(false);
+ producer.send(message);
+ }
+ latch.await();
+
+ session.close();
+
+ messagingService.stop();
+
+ assertEquals(myMessageHandler.messagesReceived, 50);
+ assertEquals(myMessageHandler2.messagesReceived, 50);
+ }
+
+
+ private static class MyMessageHandler implements MessageHandler
+ {
+ volatile int messagesReceived = 0;
+
+ private final CountDownLatch latch;
+
+ public MyMessageHandler(CountDownLatch latch)
+ {
+ this.latch = latch;
+ }
+
+ public void onMessage(ClientMessage message)
+ {
+ messagesReceived++;
+ try
+ {
+ message.processed();
+ }
+ catch (MessagingException e)
+ {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+ latch.countDown();
+ }
+ }
+}
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java 2008-10-01 08:27:34 UTC (rev 5055)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java 2008-10-01 10:04:17 UTC (rev 5056)
@@ -1451,18 +1451,23 @@
public boolean removeConsumer(Consumer consumer)
{
- return false; //To change body of implemented methods use File | Settings | File Templates.
+ return false;
}
public int getConsumerCount()
{
- return 0; //To change body of implemented methods use File | Settings | File Templates.
+ return 0;
}
public boolean hasConsumers()
{
- return false; //To change body of implemented methods use File | Settings | File Templates.
+ return false;
}
+
+ public int getCurrentPosition()
+ {
+ return 0;
+ }
}
}
More information about the jboss-cvs-commits
mailing list