[jboss-cvs] JBoss Messaging SVN: r5388 - in trunk: src/main/org/jboss/messaging/core/client and 20 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Nov 19 06:30:07 EST 2008
Author: ataylor
Date: 2008-11-19 06:30:06 -0500 (Wed, 19 Nov 2008)
New Revision: 5388
Added:
trunk/tests/src/org/jboss/messaging/tests/integration/jms/consumer/
trunk/tests/src/org/jboss/messaging/tests/integration/jms/consumer/ConsumerTest.java
Modified:
trunk/src/config/jbm-configuration.xml
trunk/src/config/jbm-jndi.xml
trunk/src/main/org/jboss/messaging/core/client/ClientSessionFactory.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CreateSessionMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ReplicateCreateSessionMessage.java
trunk/src/main/org/jboss/messaging/core/server/MessagingServer.java
trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
trunk/src/main/org/jboss/messaging/jms/client/JBossConnection.java
trunk/src/main/org/jboss/messaging/jms/client/JBossConnectionFactory.java
trunk/src/main/org/jboss/messaging/jms/client/JBossSession.java
trunk/src/main/org/jboss/messaging/jms/server/JMSServerManager.java
trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerDeployer.java
trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerManagerImpl.java
trunk/src/main/org/jboss/messaging/jms/server/management/JMSServerControlMBean.java
trunk/src/main/org/jboss/messaging/jms/server/management/impl/JMSServerControl.java
trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/CTSMiscellaneousTest.java
trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/JMSTestCase.java
trunk/tests/jms-tests/src/org/jboss/test/messaging/tools/container/LocalTestServer.java
trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/ChunkTestBase.java
trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/clientcrash/ClientCrashTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/ReplicateConnectionFailureTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/consumer/ConsumerTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/jms/cluster/JMSFailoverTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/jms/management/JMSQueueControlTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/jms/management/JMSUtil.java
trunk/tests/src/org/jboss/messaging/tests/stress/paging/PageStressTest.java
Log:
added pre commit functionality
Modified: trunk/src/config/jbm-configuration.xml
===================================================================
--- trunk/src/config/jbm-configuration.xml 2008-11-19 07:23:23 UTC (rev 5387)
+++ trunk/src/config/jbm-configuration.xml 2008-11-19 11:30:06 UTC (rev 5388)
@@ -86,7 +86,7 @@
</params>
</acceptor>
<!-- Netty standard TCP acceptor -->
- <acceptor>
+ <acceptor name="netty">
<factory-class>org.jboss.messaging.integration.transports.netty.NettyAcceptorFactory</factory-class>
<params>
<param key="jbm.remoting.netty.host" value="localhost" type="String"/>
Modified: trunk/src/config/jbm-jndi.xml
===================================================================
--- trunk/src/config/jbm-jndi.xml 2008-11-19 07:23:23 UTC (rev 5387)
+++ trunk/src/config/jbm-jndi.xml 2008-11-19 11:30:06 UTC (rev 5388)
@@ -18,6 +18,19 @@
<entry name="java:/XAConnectionFactory"/>
</connection-factory>
+ <connection-factory name="ServerAckConnectionFactory">
+ <connector>
+ <factory-class>org.jboss.messaging.integration.transports.netty.NettyConnectorFactory</factory-class>
+ </connector>
+ <pre-commit-acks>true</pre-commit-acks>
+ <entry name="ServerAckConnectionFactory"/>
+ <entry name="/ServerAckConnectionFactory"/>
+ <entry name="/ServerAckXAConnectionFactory"/>
+ <entry name="java:/ServerAckConnectionFactory"/>
+ <entry name="java:/ServerAckXAConnectionFactory"/>
+ </connection-factory>
+
+
<connection-factory name="ClusteredConnectionFactory">
<connector>
<factory-class>org.jboss.messaging.integration.transports.netty.NettyConnectorFactory</factory-class>
@@ -69,8 +82,10 @@
<send-p-messages-synchronously>true</send-p-messages-synchronously>
<!--If true, any connections will automatically set a unique group id (per producer) on every message sent-->
<auto-group-id>true</auto-group-id>
+ <!--if true then the server will pre ack any message before delivery to a consumer-->
+ <pre-commit-acks>false</pre-commit-acks>
</connection-factory>
-
+
<connection-factory name="TestInVMConnectionFactory">
<connector>
<factory-class>org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory</factory-class>
@@ -80,19 +95,19 @@
</connector>
<entry name="/TestInVMConnectionFactory"/>
</connection-factory>
-
+
<connection-factory name="TestSSLConnectionFactory">
<connector>
<factory-class>org.jboss.messaging.integration.transports.netty.NettyConnectorFactory</factory-class>
<params>
<param key="jbm.remoting.netty.host" value="localhost" type="String"/>
- <param key="jbm.remoting.netty.port" value="5500" type="Integer"/>
+ <param key="jbm.remoting.netty.port" value="5500" type="Integer"/>
<param key="jbm.remoting.netty.sslenabled" value="true" type="Boolean"/>
<param key="jbm.remoting.netty.keystorepath" value="messaging.keystore" type="String"/>
<param key="jbm.remoting.netty.keystorepassword" value="secureexample" type="String"/>
</params>
- </connector>
- <entry name="/TestSSLConnectionFactory"/>
+ </connector>
+ <entry name="/TestSSLConnectionFactory"/>
</connection-factory>
<queue name="MyQueue">
Modified: trunk/src/main/org/jboss/messaging/core/client/ClientSessionFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/ClientSessionFactory.java 2008-11-19 07:23:23 UTC (rev 5387)
+++ trunk/src/main/org/jboss/messaging/core/client/ClientSessionFactory.java 2008-11-19 11:30:06 UTC (rev 5388)
@@ -28,11 +28,9 @@
import org.jboss.messaging.core.remoting.spi.ConnectorFactory;
/**
- *
* A ClientSessionFactory
- *
+ *
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
*/
public interface ClientSessionFactory
{
@@ -43,8 +41,14 @@
boolean xa,
boolean autoCommitSends,
boolean autoCommitAcks,
+ final boolean preCommitAcks,
int ackBatchSize) throws MessagingException;
+ ClientSession createSession(final boolean xa,
+ final boolean autoCommitSends,
+ final boolean autoCommitAcks,
+ final boolean preCommitAcks) throws MessagingException;
+
void setConsumerWindowSize(int size);
int getConsumerWindowSize();
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java 2008-11-19 07:23:23 UTC (rev 5387)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java 2008-11-19 11:30:06 UTC (rev 5388)
@@ -83,6 +83,8 @@
public static final int DEFAULT_ACK_BATCH_SIZE = 1024 * 1024;
+ public static final boolean DEFAULT_PRE_COMMIT_ACKS = false;
+
// Attributes
// -----------------------------------------------------------------------------------
@@ -127,6 +129,8 @@
private volatile int ackBatchSize;
+ private volatile boolean preCommitAcks;
+
private final Set<ClientSessionInternal> sessions = new HashSet<ClientSessionInternal>();
private final Object exitLock = new Object();
@@ -161,6 +165,7 @@
final boolean blockOnPersistentSend,
final boolean autoGroup,
final int maxConnections,
+ final boolean preCommitAcks,
final int ackBatchSize)
{
connectorFactory = instantiateConnectorFactory(connectorConfig.getFactoryClassName());
@@ -204,6 +209,7 @@
this.minLargeMessageSize = minLargeMessageSize;
this.autoGroup = autoGroup;
this.maxConnections = maxConnections;
+ this.preCommitAcks = preCommitAcks;
this.ackBatchSize = ackBatchSize;
}
@@ -224,6 +230,7 @@
DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND,
DEFAULT_AUTO_GROUP,
DEFAULT_MAX_CONNECTIONS,
+ DEFAULT_PRE_COMMIT_ACKS,
DEFAULT_ACK_BATCH_SIZE);
}
@@ -242,19 +249,28 @@
final String password,
final boolean xa,
final boolean autoCommitSends,
- final boolean autoCommitAcks,
+ final boolean autoCommitAcks,
+ final boolean preCommitAcks,
final int ackBatchSize) throws MessagingException
{
- return createSessionInternal(username, password, xa, autoCommitSends, autoCommitAcks, ackBatchSize);
+ return createSessionInternal(username, password, xa, autoCommitSends, autoCommitAcks, preCommitAcks, ackBatchSize);
}
public ClientSession createSession(final boolean xa,
final boolean autoCommitSends,
final boolean autoCommitAcks) throws MessagingException
{
- return createSessionInternal(null, null, xa, autoCommitSends, autoCommitAcks, ackBatchSize);
+ return createSessionInternal(null, null, xa, autoCommitSends, autoCommitAcks, false, ackBatchSize);
}
+ public ClientSession createSession(final boolean xa,
+ final boolean autoCommitSends,
+ final boolean autoCommitAcks,
+ final boolean preCommitAcks) throws MessagingException
+ {
+ return createSessionInternal(null, null, xa, autoCommitSends, autoCommitAcks, preCommitAcks, ackBatchSize);
+ }
+
public int getConsumerWindowSize()
{
return consumerWindowSize;
@@ -553,7 +569,8 @@
final String password,
final boolean xa,
final boolean autoCommitSends,
- final boolean autoCommitAcks,
+ final boolean autoCommitAcks,
+ final boolean preCommitAcks,
final int ackBatchSize) throws MessagingException
{
synchronized (createSessionLock)
@@ -608,6 +625,7 @@
xa,
autoCommitSends,
autoCommitAcks,
+ preCommitAcks,
sendWindowSize);
Packet pResponse = channel1.sendBlocking(request);
@@ -638,6 +656,7 @@
xa,
autoCommitSends,
autoCommitAcks,
+ preCommitAcks,
blockOnAcknowledge,
autoGroup,
ackBatchSize,
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2008-11-19 07:23:23 UTC (rev 5387)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2008-11-19 11:30:06 UTC (rev 5388)
@@ -135,6 +135,8 @@
private final boolean autoCommitAcks;
+ private final boolean preCommitAcks;
+
private final boolean autoCommitSends;
private final boolean blockOnAcknowledge;
@@ -161,6 +163,7 @@
final boolean xa,
final boolean autoCommitSends,
final boolean autoCommitAcks,
+ final boolean preCommitAcks,
final boolean blockOnAcknowledge,
final boolean autoGroup,
final int ackBatchSize,
@@ -183,6 +186,8 @@
this.autoCommitAcks = autoCommitAcks;
+ this.preCommitAcks = preCommitAcks;
+
this.autoCommitSends = autoCommitSends;
this.blockOnAcknowledge = blockOnAcknowledge;
@@ -551,6 +556,11 @@
// This acknowledges all messages received by the consumer so far
public void acknowledge(final long consumerID, final long messageID) throws MessagingException
{
+ //if we've pre commited then we don't need to do anything
+ if(preCommitAcks)
+ {
+ return;
+ }
checkClosed();
SessionAcknowledgeMessage message = new SessionAcknowledgeMessage(consumerID, messageID, blockOnAcknowledge);
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CreateSessionMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CreateSessionMessage.java 2008-11-19 07:23:23 UTC (rev 5387)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CreateSessionMessage.java 2008-11-19 11:30:06 UTC (rev 5388)
@@ -53,9 +53,11 @@
private boolean autoCommitSends;
private boolean autoCommitAcks;
+
+ private boolean preCommitAcks;
private int windowSize;
-
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -64,7 +66,7 @@
final int version, final String username, final String password,
final int minLargeMessageSize,
final boolean xa, final boolean autoCommitSends,
- final boolean autoCommitAcks, final int windowSize)
+ final boolean autoCommitAcks, final boolean preCommitAcks, final int windowSize)
{
super(CREATESESSION);
@@ -87,6 +89,8 @@
this.autoCommitAcks = autoCommitAcks;
this.windowSize = windowSize;
+
+ this.preCommitAcks = preCommitAcks;
}
public CreateSessionMessage()
@@ -135,7 +139,12 @@
{
return this.autoCommitAcks;
}
-
+
+ public boolean isPreCommitAcks()
+ {
+ return preCommitAcks;
+ }
+
public int getWindowSize()
{
return this.windowSize;
@@ -153,6 +162,7 @@
buffer.putBoolean(autoCommitSends);
buffer.putBoolean(autoCommitAcks);
buffer.putInt(windowSize);
+ buffer.putBoolean(preCommitAcks);
}
public void decodeBody(final MessagingBuffer buffer)
@@ -167,6 +177,7 @@
autoCommitSends = buffer.getBoolean();
autoCommitAcks = buffer.getBoolean();
windowSize = buffer.getInt();
+ preCommitAcks = buffer.getBoolean();
}
public boolean equals(Object other)
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ReplicateCreateSessionMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ReplicateCreateSessionMessage.java 2008-11-19 07:23:23 UTC (rev 5387)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ReplicateCreateSessionMessage.java 2008-11-19 11:30:06 UTC (rev 5388)
@@ -53,6 +53,8 @@
private boolean autoCommitSends;
private boolean autoCommitAcks;
+
+ private boolean preCommitAcks;
private int windowSize;
@@ -69,6 +71,7 @@
final boolean xa,
final boolean autoCommitSends,
final boolean autoCommitAcks,
+ final boolean preCommitAcks,
final int windowSize)
{
super(REPLICATE_CREATESESSION);
@@ -90,7 +93,9 @@
this.autoCommitSends = autoCommitSends;
this.autoCommitAcks = autoCommitAcks;
-
+
+ this.preCommitAcks = preCommitAcks;
+
this.windowSize = windowSize;
}
@@ -145,7 +150,12 @@
{
return this.autoCommitAcks;
}
-
+
+ public boolean isPreCommitAcks()
+ {
+ return preCommitAcks;
+ }
+
public int getWindowSize()
{
return this.windowSize;
@@ -163,6 +173,7 @@
buffer.putBoolean(autoCommitSends);
buffer.putBoolean(autoCommitAcks);
buffer.putInt(windowSize);
+ buffer.putBoolean(preCommitAcks);
}
public void decodeBody(final MessagingBuffer buffer)
@@ -177,6 +188,7 @@
autoCommitSends = buffer.getBoolean();
autoCommitAcks = buffer.getBoolean();
windowSize = buffer.getInt();
+ preCommitAcks = buffer.getBoolean();
}
public boolean equals(Object other)
Modified: trunk/src/main/org/jboss/messaging/core/server/MessagingServer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/MessagingServer.java 2008-11-19 07:23:23 UTC (rev 5387)
+++ trunk/src/main/org/jboss/messaging/core/server/MessagingServer.java 2008-11-19 11:30:06 UTC (rev 5388)
@@ -73,6 +73,7 @@
RemotingConnection remotingConnection,
boolean autoCommitSends,
boolean autoCommitAcks,
+ boolean preCommitAcks,
boolean xa,
int sendWindowSize) throws Exception;
@@ -85,6 +86,7 @@
RemotingConnection remotingConnection,
boolean autoCommitSends,
boolean autoCommitAcks,
+ boolean preCommitAcks,
boolean xa,
int sendWindowSize) throws Exception;
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2008-11-19 07:23:23 UTC (rev 5387)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2008-11-19 11:30:06 UTC (rev 5388)
@@ -72,7 +72,7 @@
/**
* The messaging server implementation
- *
+ *
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @author <a href="mailto:ataylor at redhat.com>Andy Taylor</a>
* @version <tt>$Revision: 3543 $</tt> <p/> $Id: ServerPeer.java 3543 2008-01-07 22:31:58Z clebert.suconic at jboss.com $
@@ -264,7 +264,7 @@
"\"",
e);
}
-
+
Map<String, Object> backupConnectorParams = backupConnector.getParams();
// TODO don't hardcode ping interval and code timeout
@@ -285,25 +285,25 @@
scheduledExecutor);
clusterManager.start();
-
+
//Deploy the cluster artifacts
-
+
for (BroadcastGroupConfiguration config: configuration.getBroadcastGroupConfigurations())
{
clusterManager.deployBroadcastGroup(config);
}
-
+
for (DiscoveryGroupConfiguration config: configuration.getDiscoveryGroupConfigurations())
{
clusterManager.deployDiscoveryGroup(config);
}
-
+
for (MessageFlowConfiguration config: configuration.getMessageFlowConfigurations())
{
clusterManager.deployMessageFlow(config);
}
}
-
+
started = true;
}
@@ -525,17 +525,18 @@
}
public CreateSessionResponseMessage replicateCreateSession(final String name,
- final long channelID,
- final String username,
- final String password,
- final int minLargeMessageSize,
- final int incrementingVersion,
- final RemotingConnection connection,
- final boolean autoCommitSends,
- final boolean autoCommitAcks,
- final boolean xa,
- final int sendWindowSize) throws Exception
-{
+ final long channelID,
+ final String username,
+ final String password,
+ final int minLargeMessageSize,
+ final int incrementingVersion,
+ final RemotingConnection connection,
+ final boolean autoCommitSends,
+ final boolean autoCommitAcks,
+ final boolean preCommitAcks,
+ final boolean xa,
+ final int sendWindowSize) throws Exception
+ {
return doCreateSession(name,
channelID,
username,
@@ -545,6 +546,7 @@
connection,
autoCommitSends,
autoCommitAcks,
+ preCommitAcks,
xa,
sendWindowSize);
}
@@ -558,6 +560,7 @@
final RemotingConnection connection,
final boolean autoCommitSends,
final boolean autoCommitAcks,
+ final boolean preCommitAcks,
final boolean xa,
final int sendWindowSize) throws Exception
{
@@ -572,6 +575,7 @@
connection,
autoCommitSends,
autoCommitAcks,
+ preCommitAcks,
xa,
sendWindowSize);
}
@@ -632,11 +636,12 @@
final long channelID,
final String username,
final String password,
- final int minLargeMessageSize,
+ final int minLargeMessageSize,
final int incrementingVersion,
final RemotingConnection connection,
final boolean autoCommitSends,
final boolean autoCommitAcks,
+ final boolean preCommitAcks,
final boolean xa,
final int sendWindowSize) throws Exception
{
@@ -677,6 +682,7 @@
minLargeMessageSize,
autoCommitSends,
autoCommitAcks,
+ preCommitAcks,
xa,
connection,
storageManager,
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java 2008-11-19 07:23:23 UTC (rev 5387)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java 2008-11-19 11:30:06 UTC (rev 5388)
@@ -72,6 +72,7 @@
msg.isXA(),
msg.isAutoCommitSends(),
msg.isAutoCommitAcks(),
+ msg.isPreCommitAcks(),
msg.getWindowSize());
result = channel1.replicatePacket(replPacket);
@@ -100,6 +101,7 @@
connection,
request.isAutoCommitSends(),
request.isAutoCommitAcks(),
+ request.isPreCommitAcks(),
request.isXA(),
request.getWindowSize());
break;
@@ -117,6 +119,7 @@
connection,
request.isAutoCommitSends(),
request.isAutoCommitAcks(),
+ request.isPreCommitAcks(),
request.isXA(),
request.getWindowSize());
break;
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2008-11-19 07:23:23 UTC (rev 5387)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2008-11-19 11:30:06 UTC (rev 5388)
@@ -120,6 +120,8 @@
private volatile boolean closed;
+ private final boolean preCommitAcks;
+
// Constructors
// ---------------------------------------------------------------------------------
@@ -133,7 +135,8 @@
final HierarchicalRepository<QueueSettings> queueSettingsRepository,
final PostOffice postOffice,
final Channel channel,
- final PagingManager pager)
+ final PagingManager pager,
+ final boolean preCommitAcks)
{
this.id = id;
@@ -157,6 +160,8 @@
this.pager = pager;
+ this.preCommitAcks = preCommitAcks;
+
messageQueue.addConsumer(this);
this.minLargeMessageSize = session.getMinLargeMessageSize();
@@ -375,7 +380,7 @@
while (ref.getMessage().getMessageID() != messageID);
}
-
+
public MessageReference getExpired(final long messageID) throws Exception
{
if (browseOnly)
@@ -537,7 +542,7 @@
return HandleStatus.NO_MATCH;
}
- if (!browseOnly)
+ if (!browseOnly || preCommitAcks)
{
deliveringRefs.add(ref);
}
@@ -554,6 +559,11 @@
sendRegularMessage(ref, message);
}
+ if(preCommitAcks)
+ {
+ doAck(ref);
+ }
+
return HandleStatus.HANDLED;
}
finally
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2008-11-19 07:23:23 UTC (rev 5387)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2008-11-19 11:30:06 UTC (rev 5388)
@@ -164,6 +164,8 @@
private final boolean autoCommitAcks;
+ private final boolean preCommitAcks;
+
private volatile RemotingConnection remotingConnection;
private final Map<Long, ServerConsumer> consumers = new ConcurrentHashMap<Long, ServerConsumer>();
@@ -211,6 +213,7 @@
final int minLargeMessageSize,
final boolean autoCommitSends,
final boolean autoCommitAcks,
+ final boolean preCommitAcks,
final boolean xa,
final RemotingConnection remotingConnection,
final StorageManager storageManager,
@@ -236,6 +239,8 @@
this.autoCommitAcks = autoCommitAcks;
+ this.preCommitAcks = preCommitAcks;
+
this.remotingConnection = remotingConnection;
this.storageManager = storageManager;
@@ -392,7 +397,8 @@
queueSettingsRepository,
postOffice,
channel,
- pager);
+ pager,
+ preCommitAcks);
consumers.put(consumer.getID(), consumer);
Modified: trunk/src/main/org/jboss/messaging/jms/client/JBossConnection.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/client/JBossConnection.java 2008-11-19 07:23:23 UTC (rev 5387)
+++ trunk/src/main/org/jboss/messaging/jms/client/JBossConnection.java 2008-11-19 11:30:06 UTC (rev 5388)
@@ -267,7 +267,7 @@
ClientSession session = null;
try
{
- session = sessionFactory.createSession(username, password, false, true, true, 0);
+ session = sessionFactory.createSession(username, password, false, true, true, false, 0);
// Remove any temporary queues and addresses
@@ -455,20 +455,24 @@
if (acknowledgeMode == Session.SESSION_TRANSACTED)
{
- session = sessionFactory.createSession(username, password, isXA, false, false, transactionBatchSize);
+ session = sessionFactory.createSession(username, password, isXA, false, false, false, transactionBatchSize);
}
else if (acknowledgeMode == Session.AUTO_ACKNOWLEDGE)
{
- session = sessionFactory.createSession(username, password, isXA, true, true, 0);
+ session = sessionFactory.createSession(username, password, isXA, true, true, false, 0);
}
else if (acknowledgeMode == Session.DUPS_OK_ACKNOWLEDGE)
{
- session = sessionFactory.createSession(username, password, isXA, true, true, dupsOKBatchSize);
+ session = sessionFactory.createSession(username, password, isXA, true, true, false, dupsOKBatchSize);
}
else if (acknowledgeMode == Session.CLIENT_ACKNOWLEDGE)
{
- session = sessionFactory.createSession(username, password, isXA, true, false, transactionBatchSize);
+ session = sessionFactory.createSession(username, password, isXA, true, false, false, transactionBatchSize);
}
+ else if (acknowledgeMode == JBossSession.SERVER_ACKNOWLEDGE)
+ {
+ session = sessionFactory.createSession(username, password, isXA, true, false, true, transactionBatchSize);
+ }
else
{
throw new IllegalArgumentException("Invalid ackmode: " + acknowledgeMode);
Modified: trunk/src/main/org/jboss/messaging/jms/client/JBossConnectionFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/client/JBossConnectionFactory.java 2008-11-19 07:23:23 UTC (rev 5387)
+++ trunk/src/main/org/jboss/messaging/jms/client/JBossConnectionFactory.java 2008-11-19 11:30:06 UTC (rev 5388)
@@ -98,6 +98,8 @@
private final int maxConnections;
+ private final boolean preCommitAcks;
+
// Constructors ---------------------------------------------------------------------------------
public JBossConnectionFactory(final TransportConfiguration connectorConfig,
@@ -116,7 +118,8 @@
final boolean blockOnNonPersistentSend,
final boolean blockOnPersistentSend,
final boolean autoGroup,
- final int maxConnections)
+ final int maxConnections,
+ final boolean preCommitAcks)
{
this.connectorConfig = connectorConfig;
this.backupConnectorConfig = backupConnectorConfig;
@@ -135,6 +138,7 @@
this.blockOnPersistentSend = blockOnPersistentSend;
this.autoGroup = autoGroup;
this.maxConnections = maxConnections;
+ this.preCommitAcks = preCommitAcks;
}
// ConnectionFactory implementation -------------------------------------------------------------
@@ -311,6 +315,7 @@
blockOnPersistentSend,
autoGroup,
maxConnections,
+ preCommitAcks,
DEFAULT_ACK_BATCH_SIZE);
}
@@ -323,7 +328,7 @@
try
{
- sess = sessionFactory.createSession(username, password, false, false, false, 0);
+ sess = sessionFactory.createSession(username, password, false, false, false, false, 0);
}
catch (MessagingException e)
{
Modified: trunk/src/main/org/jboss/messaging/jms/client/JBossSession.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/client/JBossSession.java 2008-11-19 07:23:23 UTC (rev 5387)
+++ trunk/src/main/org/jboss/messaging/jms/client/JBossSession.java 2008-11-19 11:30:06 UTC (rev 5388)
@@ -96,6 +96,7 @@
public static final int TYPE_TOPIC_SESSION = 2;
+ public static final int SERVER_ACKNOWLEDGE = 4;
// Static --------------------------------------------------------
private static final Logger log = Logger.getLogger(JBossSession.class);
@@ -118,6 +119,7 @@
private final Set<JBossMessageConsumer> consumers = new HashSet<JBossMessageConsumer>();
+
// Constructors --------------------------------------------------
public JBossSession(final JBossConnection connection,
Modified: trunk/src/main/org/jboss/messaging/jms/server/JMSServerManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/JMSServerManager.java 2008-11-19 07:23:23 UTC (rev 5387)
+++ trunk/src/main/org/jboss/messaging/jms/server/JMSServerManager.java 2008-11-19 11:30:06 UTC (rev 5388)
@@ -123,6 +123,7 @@
boolean blockOnPersistentSend,
boolean autoGroup,
int maxConnections,
+ boolean preCommitAcks,
String jndiBinding) throws Exception;
boolean createConnectionFactory(String name,
@@ -143,6 +144,7 @@
boolean blockOnPersistentSend,
boolean autoGroupId,
int maxConnections,
+ boolean preCommitAcks,
List<String> jndiBinding) throws Exception;
/**
Modified: trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerDeployer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerDeployer.java 2008-11-19 07:23:23 UTC (rev 5387)
+++ trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerDeployer.java 2008-11-19 11:30:06 UTC (rev 5388)
@@ -67,6 +67,8 @@
private static final String MAX_CONNECTIONS_ELEMENT = "max-connections";
+ private static final String PRE_COMMIT_ACKS_ELEMENT = "pre-commit-acks";
+
private static final String CONNECTOR_ELEMENT = "connector";
private static final String BACKUP_CONNECTOR_ELEMENT = "backup-connector";
@@ -145,6 +147,7 @@
boolean blockOnPersistentSend = ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_PERSISTENT_SEND;
boolean autoGroup = ClientSessionFactoryImpl.DEFAULT_AUTO_GROUP;
int maxConnections = ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS;
+ boolean preCommitAcks = ClientSessionFactoryImpl.DEFAULT_PRE_COMMIT_ACKS;
List<String> jndiBindings = new ArrayList<String>();
String connectorFactoryClassName = null;
Map<String, Object> params = new HashMap<String, Object>();
@@ -213,6 +216,10 @@
{
maxConnections = Integer.parseInt(children.item(j).getTextContent().trim());
}
+ else if(PRE_COMMIT_ACKS_ELEMENT.equalsIgnoreCase(children.item(j).getNodeName()))
+ {
+ preCommitAcks = Boolean.parseBoolean(children.item(j).getTextContent().trim());;
+ }
else if (ENTRY_NODE_NAME.equalsIgnoreCase(children.item(j).getNodeName()))
{
String jndiName = children.item(j).getAttributes().getNamedItem("name").getNodeValue();
@@ -420,6 +427,7 @@
blockOnPersistentSend,
autoGroup,
maxConnections,
+ preCommitAcks,
jndiBindings);
}
else if (node.getNodeName().equals(QUEUE_NODE_NAME))
Modified: trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerManagerImpl.java 2008-11-19 07:23:23 UTC (rev 5387)
+++ trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerManagerImpl.java 2008-11-19 11:30:06 UTC (rev 5388)
@@ -55,7 +55,7 @@
/**
* A Deployer used to create and add to JNDI queues, topics and connection
* factories. Typically this would only be used in an app server env.
- *
+ *
* @author <a href="ataylor at redhat.com">Andy Taylor</a>
* @author <a href="jmesnil at redhat.com">Jeff Mesnil</a>
*/
@@ -86,10 +86,10 @@
public static JMSServerManagerImpl newJMSServerManagerImpl(MessagingServer server) throws Exception
{
- MessagingServerControlMBean control = new MessagingServerControl(server.getPostOffice(), server.getStorageManager(), server.getConfiguration(),
+ MessagingServerControlMBean control = new MessagingServerControl(server.getPostOffice(), server.getStorageManager(), server.getConfiguration(),
server.getQueueSettingsRepository(), server.getResourceManager(), server, new MessageCounterManagerImpl(1000), new NotificationBroadcasterSupport());
JMSManagementService jmsManagementService = new JMSManagementServiceImpl(server.getManagementService());
- return new JMSServerManagerImpl(control, server.getPostOffice(), server.getStorageManager(),
+ return new JMSServerManagerImpl(control, server.getPostOffice(), server.getStorageManager(),
server.getQueueSettingsRepository(), jmsManagementService);
}
@@ -219,6 +219,7 @@
boolean blockOnPersistentSend,
boolean autoGroup,
int maxConnections,
+ boolean preCommitAcks,
String jndiBinding) throws Exception
{
ArrayList<String> bindings = new ArrayList<String>(1);
@@ -242,6 +243,7 @@
blockOnPersistentSend,
autoGroup,
maxConnections,
+ preCommitAcks,
bindings);
}
@@ -263,6 +265,7 @@
boolean blockOnPersistentSend,
boolean autoGroup,
int maxConnections,
+ boolean preCommitAcks,
List<String> jndiBindings) throws Exception
{
JBossConnectionFactory cf = connectionFactories.get(name);
@@ -285,7 +288,8 @@
blockOnNonPersistentSend,
blockOnPersistentSend,
autoGroup,
- maxConnections);
+ maxConnections,
+ preCommitAcks);
}
for (String jndiBinding : jndiBindings)
{
Modified: trunk/src/main/org/jboss/messaging/jms/server/management/JMSServerControlMBean.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/management/JMSServerControlMBean.java 2008-11-19 07:23:23 UTC (rev 5387)
+++ trunk/src/main/org/jboss/messaging/jms/server/management/JMSServerControlMBean.java 2008-11-19 11:30:06 UTC (rev 5388)
@@ -99,6 +99,8 @@
boolean autoGroup,
@Parameter(name = "maxConnections", desc = "The maximum number of physical connections created per client using this connection factory. Sessions created will be assigned a connection in a round-robin fashion")
int maxConnections,
+ @Parameter(name = "preCommitAcks", desc = "If the server will acknowledge delivery of a message before it is sent")
+ boolean preCommitAcks,
@Parameter(name = "jndiBinding", desc = "JNDI Binding")
String jndiBinding) throws Exception;
Modified: trunk/src/main/org/jboss/messaging/jms/server/management/impl/JMSServerControl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/management/impl/JMSServerControl.java 2008-11-19 07:23:23 UTC (rev 5387)
+++ trunk/src/main/org/jboss/messaging/jms/server/management/impl/JMSServerControl.java 2008-11-19 11:30:06 UTC (rev 5388)
@@ -94,6 +94,7 @@
boolean blockOnPersistentSend,
boolean autoGroup,
int maxConnections,
+ boolean preCommitAcks,
String jndiBinding) throws Exception
{
List<String> bindings = new ArrayList<String>();
@@ -117,6 +118,7 @@
blockOnPersistentSend,
autoGroup,
maxConnections,
+ preCommitAcks,
jndiBinding);
if (created)
{
Modified: trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/CTSMiscellaneousTest.java
===================================================================
--- trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/CTSMiscellaneousTest.java 2008-11-19 07:23:23 UTC (rev 5387)
+++ trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/CTSMiscellaneousTest.java 2008-11-19 11:30:06 UTC (rev 5388)
@@ -105,6 +105,7 @@
true,
ClientSessionFactoryImpl.DEFAULT_AUTO_GROUP,
ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS,
+ ClientSessionFactoryImpl.DEFAULT_PRE_COMMIT_ACKS,
"/StrictTCKConnectionFactory");
cf = (JBossConnectionFactory)getInitialContext().lookup("/StrictTCKConnectionFactory");
Modified: trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/JMSTestCase.java
===================================================================
--- trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/JMSTestCase.java 2008-11-19 07:23:23 UTC (rev 5387)
+++ trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/JMSTestCase.java 2008-11-19 11:30:06 UTC (rev 5388)
@@ -55,6 +55,7 @@
true,
ClientSessionFactoryImpl.DEFAULT_AUTO_GROUP,
ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS,
+ ClientSessionFactoryImpl.DEFAULT_PRE_COMMIT_ACKS,
"/testsuitecf");
cf = (JBossConnectionFactory)getInitialContext().lookup("/testsuitecf");
Modified: trunk/tests/jms-tests/src/org/jboss/test/messaging/tools/container/LocalTestServer.java
===================================================================
--- trunk/tests/jms-tests/src/org/jboss/test/messaging/tools/container/LocalTestServer.java 2008-11-19 07:23:23 UTC (rev 5387)
+++ trunk/tests/jms-tests/src/org/jboss/test/messaging/tools/container/LocalTestServer.java 2008-11-19 11:30:06 UTC (rev 5388)
@@ -554,6 +554,7 @@
true,
false,
8,
+ false,
jndiBindings);
}
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/ChunkTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/ChunkTestBase.java 2008-11-19 07:23:23 UTC (rev 5387)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/ChunkTestBase.java 2008-11-19 11:30:06 UTC (rev 5388)
@@ -124,7 +124,7 @@
sf.setBlockOnAcknowledge(true);
}
- ClientSession session = sf.createSession(null, null, false, true, true, 0);
+ ClientSession session = sf.createSession(null, null, false, true, true, false, 0);
session.createQueue(ADDRESS, ADDRESS, null, true, false, true);
@@ -198,7 +198,7 @@
sf = createInVMFactory();
}
- session = sf.createSession(null, null, false, true, true, 0);
+ session = sf.createSession(null, null, false, true, true, false, 0);
ClientConsumer consumer = null;
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java 2008-11-19 07:23:23 UTC (rev 5387)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java 2008-11-19 11:30:06 UTC (rev 5388)
@@ -158,7 +158,7 @@
mockFactory.setBlockOnPersistentSend(false);
mockFactory.setBlockOnAcknowledge(false);
- session = mockFactory.createSession(null, null, false, true, true, 0);
+ session = mockFactory.createSession(null, null, false, true, true, false, 0);
callback.session = session;
@@ -296,7 +296,7 @@
ClientSessionFactory sf = createInVMFactory();
- ClientSession session = sf.createSession(null, null, false, true, true, 0);
+ ClientSession session = sf.createSession(null, null, false, true, true, false, 0);
session.createQueue(ADDRESS, queue[0], null, true, false, true);
session.createQueue(ADDRESS, queue[1], null, true, false, true);
@@ -387,7 +387,7 @@
ClientSessionFactory sf = createInVMFactory();
- ClientSession session = sf.createSession(null, null, false, true, true, 0);
+ ClientSession session = sf.createSession(null, null, false, true, true, false, 0);
session.createQueue(ADDRESS, queue[0], null, true, false, true);
session.createQueue(ADDRESS, queue[1], null, true, false, true);
@@ -417,7 +417,7 @@
sf = createInVMFactory();
- session = sf.createSession(null, null, false, true, true, 0);
+ session = sf.createSession(null, null, false, true, true, false, 0);
}
readMessage(session, queue[0], numberOfIntegers);
@@ -488,7 +488,7 @@
sf.setBlockOnAcknowledge(true);
}
- ClientSession session = sf.createSession(null, null, false, true, true, 0);
+ ClientSession session = sf.createSession(null, null, false, true, true, false, 0);
session.createQueue(ADDRESS, ADDRESS, null, true, false, true);
@@ -539,7 +539,7 @@
sf = createInVMFactory();
}
- session = sf.createSession(null, null, false, true, true, 0);
+ session = sf.createSession(null, null, false, true, true, false, 0);
ClientConsumer consumer = session.createConsumer(ADDRESS);
@@ -577,7 +577,7 @@
session.close();
- session = sf.createSession(null, null, false, true, true, 0);
+ session = sf.createSession(null, null, false, true, true, false, 0);
readMessage(session, ADDRESS, numberOfIntegersBigMessage);
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/clientcrash/ClientCrashTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/clientcrash/ClientCrashTest.java 2008-11-19 07:23:23 UTC (rev 5387)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/clientcrash/ClientCrashTest.java 2008-11-19 11:30:06 UTC (rev 5388)
@@ -34,6 +34,7 @@
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRODUCER_MAX_RATE;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_SEND_WINDOW_SIZE;
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRE_COMMIT_ACKS;
import org.jboss.messaging.core.client.ClientConsumer;
import org.jboss.messaging.core.client.ClientMessage;
@@ -145,7 +146,7 @@
{
super.setUp();
-
+
Configuration config = createDefaultConfig(true);
config.setSecurityEnabled(false);
messagingService = createService(false, config);
@@ -165,8 +166,9 @@
DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND,
DEFAULT_AUTO_GROUP,
DEFAULT_MAX_CONNECTIONS,
+ DEFAULT_PRE_COMMIT_ACKS,
DEFAULT_ACK_BATCH_SIZE);
-
+
}
@Override
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/ReplicateConnectionFailureTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/ReplicateConnectionFailureTest.java 2008-11-19 07:23:23 UTC (rev 5387)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/ReplicateConnectionFailureTest.java 2008-11-19 11:30:06 UTC (rev 5388)
@@ -93,6 +93,7 @@
ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_PERSISTENT_SEND,
ClientSessionFactoryImpl.DEFAULT_AUTO_GROUP,
ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS,
+ ClientSessionFactoryImpl.DEFAULT_PRE_COMMIT_ACKS,
ClientSessionFactoryImpl.DEFAULT_ACK_BATCH_SIZE);
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/consumer/ConsumerTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/consumer/ConsumerTest.java 2008-11-19 07:23:23 UTC (rev 5387)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/consumer/ConsumerTest.java 2008-11-19 11:30:06 UTC (rev 5388)
@@ -408,6 +408,112 @@
session.close();
}
+ public void testConsumerAckImmediateAutoCommitTrue() throws Exception
+ {
+
+ ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
+
+ ClientSession session = sf.createSession(false, true, true, true);
+
+ session.createQueue(QUEUE, QUEUE, null, false, false, true);
+
+ ClientProducer producer = session.createProducer(QUEUE);
+
+ final int numMessages = 100;
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = createMessage(session, "m" + i);
+ producer.send(message);
+ }
+
+ ClientConsumer consumer = session.createConsumer(QUEUE);
+ session.start();
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message2 = consumer.receive(1000);
+
+ assertEquals("m" + i, message2.getBody().getString());
+ }
+ // assert that all the messages are there and none have been acked
+ assertEquals(messagingService.getServer().getPostOffice().getBinding(QUEUE).getQueue().getDeliveringCount(), 0);
+ assertEquals(messagingService.getServer().getPostOffice().getBinding(QUEUE).getQueue().getMessageCount(), 0);
+
+ session.close();
+ }
+
+ public void testConsumerAckImmediateAutoCommitFalse() throws Exception
+ {
+
+ ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
+
+ ClientSession session = sf.createSession(false, true, false, true);
+
+ session.createQueue(QUEUE, QUEUE, null, false, false, true);
+
+ ClientProducer producer = session.createProducer(QUEUE);
+
+ final int numMessages = 100;
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = createMessage(session, "m" + i);
+ producer.send(message);
+ }
+
+ ClientConsumer consumer = session.createConsumer(QUEUE);
+ session.start();
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message2 = consumer.receive(1000);
+
+ assertEquals("m" + i, message2.getBody().getString());
+ }
+ // assert that all the messages are there and none have been acked
+ assertEquals(messagingService.getServer().getPostOffice().getBinding(QUEUE).getQueue().getDeliveringCount(), 0);
+ assertEquals(messagingService.getServer().getPostOffice().getBinding(QUEUE).getQueue().getMessageCount(), 0);
+
+ session.close();
+ }
+
+ public void testConsumerAckImmediateAckIgnored() throws Exception
+ {
+
+ ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
+
+ ClientSession session = sf.createSession(false, true, true, true);
+
+ session.createQueue(QUEUE, QUEUE, null, false, false, true);
+
+ ClientProducer producer = session.createProducer(QUEUE);
+
+ final int numMessages = 100;
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = createMessage(session, "m" + i);
+ producer.send(message);
+ }
+
+ ClientConsumer consumer = session.createConsumer(QUEUE);
+ session.start();
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message2 = consumer.receive(1000);
+
+ assertEquals("m" + i, message2.getBody().getString());
+ if(i < 50)
+ {
+ message2.acknowledge();
+ }
+ }
+ // assert that all the messages are there and none have been acked
+ assertEquals(messagingService.getServer().getPostOffice().getBinding(QUEUE).getQueue().getDeliveringCount(), 0);
+ assertEquals(messagingService.getServer().getPostOffice().getBinding(QUEUE).getQueue().getMessageCount(), 0);
+
+ session.close();
+ }
+
private ClientMessage createMessage(ClientSession session, String msg)
{
ClientMessage message = session.createClientMessage(JBossTextMessage.TYPE,
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/jms/cluster/JMSFailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/jms/cluster/JMSFailoverTest.java 2008-11-19 07:23:23 UTC (rev 5387)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/jms/cluster/JMSFailoverTest.java 2008-11-19 11:30:06 UTC (rev 5388)
@@ -106,7 +106,8 @@
ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND,
ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_PERSISTENT_SEND,
ClientSessionFactoryImpl.DEFAULT_AUTO_GROUP,
- ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS);
+ ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS,
+ ClientSessionFactoryImpl.DEFAULT_PRE_COMMIT_ACKS);
Connection conn = jbcf.createConnection();
@@ -185,7 +186,8 @@
ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND,
true,
ClientSessionFactoryImpl.DEFAULT_AUTO_GROUP,
- ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS);
+ ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS,
+ ClientSessionFactoryImpl.DEFAULT_PRE_COMMIT_ACKS);
JBossConnectionFactory jbcfBackup = new JBossConnectionFactory(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
backupParams),
@@ -204,7 +206,8 @@
ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND,
ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_PERSISTENT_SEND,
ClientSessionFactoryImpl.DEFAULT_AUTO_GROUP,
- ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS);
+ ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS,
+ ClientSessionFactoryImpl.DEFAULT_PRE_COMMIT_ACKS);
Connection connLive = jbcfLive.createConnection();
Added: trunk/tests/src/org/jboss/messaging/tests/integration/jms/consumer/ConsumerTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/jms/consumer/ConsumerTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/jms/consumer/ConsumerTest.java 2008-11-19 11:30:06 UTC (rev 5388)
@@ -0,0 +1,137 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.tests.integration.jms.consumer;
+
+import junit.framework.TestCase;
+import org.jboss.messaging.jms.client.JBossConnectionFactory;
+import org.jboss.messaging.jms.client.JBossSession;
+import org.jboss.messaging.jms.server.impl.JMSServerManagerImpl;
+import org.jboss.messaging.jms.JBossQueue;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.config.impl.ConfigurationImpl;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.tests.integration.jms.management.NullInitialContext;
+import org.jboss.messaging.util.SimpleString;
+
+import javax.jms.Connection;
+import javax.jms.Session;
+import javax.jms.MessageProducer;
+import javax.jms.MessageConsumer;
+import javax.jms.Message;
+
+/**
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public class ConsumerTest extends TestCase
+{
+ private MessagingService service;
+
+ private JMSServerManagerImpl serverManager;
+
+ private JBossConnectionFactory cf;
+
+ private static final String Q_NAME = "ConsumerTestQueue";
+
+ private JBossQueue jBossQueue;
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ Configuration conf = new ConfigurationImpl();
+ conf.setSecurityEnabled(false);
+ conf.setJMXManagementEnabled(true);
+ conf.getAcceptorConfigurations()
+ .add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory"));
+ service = MessagingServiceImpl.newNullStorageMessagingServer(conf);
+ service.start();
+ serverManager = JMSServerManagerImpl.newJMSServerManagerImpl(service.getServer());
+ serverManager.start();
+ serverManager.setInitialContext(new NullInitialContext());
+ serverManager.createQueue(Q_NAME, Q_NAME);
+ cf = new JBossConnectionFactory(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"),
+ null,
+ ClientSessionFactoryImpl.DEFAULT_PING_PERIOD,
+ ClientSessionFactoryImpl.DEFAULT_CALL_TIMEOUT,
+ null,
+ ClientSessionFactoryImpl.DEFAULT_ACK_BATCH_SIZE,
+ ClientSessionFactoryImpl.DEFAULT_ACK_BATCH_SIZE,
+ ClientSessionFactoryImpl.DEFAULT_CONSUMER_WINDOW_SIZE,
+ ClientSessionFactoryImpl.DEFAULT_CONSUMER_MAX_RATE,
+ ClientSessionFactoryImpl.DEFAULT_SEND_WINDOW_SIZE,
+ ClientSessionFactoryImpl.DEFAULT_PRODUCER_MAX_RATE,
+ ClientSessionFactoryImpl.DEFAULT_BIG_MESSAGE_SIZE,
+ ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_ACKNOWLEDGE,
+ ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND,
+ true,
+ ClientSessionFactoryImpl.DEFAULT_AUTO_GROUP,
+ ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS,
+ true);
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ cf = null;
+ if (service != null && service.isStarted())
+ {
+ try
+ {
+ service.stop();
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ service = null;
+
+ }
+ }
+
+ public void testPreCommitAcks() throws Exception
+ {
+ Connection conn = cf.createConnection();
+ Session session = conn.createSession(false, JBossSession.SERVER_ACKNOWLEDGE);
+ jBossQueue = new JBossQueue(Q_NAME);
+ MessageProducer producer = session.createProducer(jBossQueue);
+ MessageConsumer consumer = session.createConsumer(jBossQueue);
+ int noOfMessages = 100;
+ for(int i = 0; i < noOfMessages; i++)
+ {
+ producer.send(session.createTextMessage("m" + i));
+ }
+
+ conn.start();
+ for(int i = 0; i < noOfMessages; i++)
+ {
+ Message m = consumer.receive(500);
+ assertNotNull(m);
+ }
+ // assert that all the messages are there and none have been acked
+ SimpleString queueName = new SimpleString(JBossQueue.JMS_QUEUE_ADDRESS_PREFIX + Q_NAME);
+ assertEquals(service.getServer().getPostOffice().getBinding(queueName).getQueue().getDeliveringCount(), 0);
+ assertEquals(service.getServer().getPostOffice().getBinding(queueName).getQueue().getMessageCount(), 0);
+ session.close();
+ }
+}
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/jms/management/JMSQueueControlTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/jms/management/JMSQueueControlTest.java 2008-11-19 07:23:23 UTC (rev 5387)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/jms/management/JMSQueueControlTest.java 2008-11-19 11:30:06 UTC (rev 5388)
@@ -172,7 +172,8 @@
ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND,
true,
ClientSessionFactoryImpl.DEFAULT_AUTO_GROUP,
- ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS);
+ ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS,
+ ClientSessionFactoryImpl.DEFAULT_PRE_COMMIT_ACKS);
Connection conn = cf.createConnection();
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/jms/management/JMSUtil.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/jms/management/JMSUtil.java 2008-11-19 07:23:23 UTC (rev 5387)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/jms/management/JMSUtil.java 2008-11-19 11:30:06 UTC (rev 5388)
@@ -73,7 +73,8 @@
ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND,
true,
ClientSessionFactoryImpl.DEFAULT_AUTO_GROUP,
- ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS);
+ ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS,
+ ClientSessionFactoryImpl.DEFAULT_PRE_COMMIT_ACKS);
Connection conn = cf.createConnection();
@@ -105,7 +106,8 @@
ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND,
true,
ClientSessionFactoryImpl.DEFAULT_AUTO_GROUP,
- ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS);
+ ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS,
+ ClientSessionFactoryImpl.DEFAULT_PRE_COMMIT_ACKS);
Connection conn = cf.createConnection();
@@ -133,7 +135,8 @@
ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND,
true,
ClientSessionFactoryImpl.DEFAULT_AUTO_GROUP,
- ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS);
+ ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS,
+ ClientSessionFactoryImpl.DEFAULT_PRE_COMMIT_ACKS);
Connection conn = cf.createConnection();
Modified: trunk/tests/src/org/jboss/messaging/tests/stress/paging/PageStressTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/stress/paging/PageStressTest.java 2008-11-19 07:23:23 UTC (rev 5387)
+++ trunk/tests/src/org/jboss/messaging/tests/stress/paging/PageStressTest.java 2008-11-19 11:30:06 UTC (rev 5388)
@@ -90,7 +90,7 @@
final int NUMBER_OF_MESSAGES = 60000;
- session = factory.createSession(null, null, false, false, true, 1024 * NUMBER_OF_MESSAGES);
+ session = factory.createSession(null, null, false, false, true, false, 1024 * NUMBER_OF_MESSAGES);
SimpleString address = new SimpleString("page-adr");
More information about the jboss-cvs-commits
mailing list