[jboss-cvs] JBoss Messaging SVN: r3843 - in trunk: src/main/org/jboss/messaging/core/client/impl and 2 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Mar 4 11:23:03 EST 2008
Author: timfox
Date: 2008-03-04 11:23:03 -0500 (Tue, 04 Mar 2008)
New Revision: 3843
Modified:
trunk/src/main/org/jboss/messaging/core/client/ClientConnection.java
trunk/src/main/org/jboss/messaging/core/client/ClientSession.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionImpl.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
trunk/src/main/org/jboss/messaging/jms/client/JBossConnection.java
trunk/tests/src/org/jboss/messaging/core/remoting/ssl/integration/CoreClientOverSSL.java
trunk/tests/src/org/jboss/messaging/core/remoting/ssl/integration/CoreClientOverSSLTest.java
Log:
More tweaks
Modified: trunk/src/main/org/jboss/messaging/core/client/ClientConnection.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/ClientConnection.java 2008-03-04 15:06:06 UTC (rev 3842)
+++ trunk/src/main/org/jboss/messaging/core/client/ClientConnection.java 2008-03-04 16:23:03 UTC (rev 3843)
@@ -16,7 +16,7 @@
public interface ClientConnection
{
ClientSession createClientSession(boolean xa, boolean autoCommitSends, boolean autoCommitAcks,
- int ackBatchSize, boolean cacheProducers) throws MessagingException;
+ int ackBatchSize, boolean blockOnAcknowledge, boolean cacheProducers) throws MessagingException;
void start() throws MessagingException;
Modified: trunk/src/main/org/jboss/messaging/core/client/ClientSession.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/ClientSession.java 2008-03-04 15:06:06 UTC (rev 3842)
+++ trunk/src/main/org/jboss/messaging/core/client/ClientSession.java 2008-03-04 16:23:03 UTC (rev 3843)
@@ -54,5 +54,11 @@
void close() throws MessagingException;
- boolean isClosed();
+ boolean isClosed();
+
+ boolean isAutoCommitSends();
+
+ boolean isAutoCommitAcks();
+
+ int getLazyAckBatchSize();
}
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionImpl.java 2008-03-04 15:06:06 UTC (rev 3842)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionImpl.java 2008-03-04 16:23:03 UTC (rev 3843)
@@ -98,7 +98,8 @@
// ClientConnection implementation --------------------------------------------------------------
public ClientSession createClientSession(final boolean xa, final boolean autoCommitSends, final boolean autoCommitAcks,
- final int ackBatchSize, final boolean cacheProducers) throws MessagingException
+ final int ackBatchSize, final boolean blockOnAcknowledge,
+ final boolean cacheProducers) throws MessagingException
{
checkClosed();
@@ -108,7 +109,7 @@
ClientSession session =
new ClientSessionImpl(this, response.getSessionID(), ackBatchSize, cacheProducers, maxProducerRate,
- producerWindowSize);
+ producerWindowSize, autoCommitSends, autoCommitAcks, blockOnAcknowledge);
children.put(response.getSessionID(), session);
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2008-03-04 15:06:06 UTC (rev 3842)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2008-03-04 16:23:03 UTC (rev 3843)
@@ -140,11 +140,19 @@
private long lastCommittedID = -1;
+ private final boolean autoCommitAcks;
+
+ private final boolean autoCommitSends;
+
+ private final boolean blockOnAcknowledge;
+
// Constructors ---------------------------------------------------------------------------------
public ClientSessionImpl(final ClientConnectionInternal connection, final String id,
final int lazyAckBatchSize, final boolean cacheProducers,
- final int maxProducerRate, final int producerWindowSize) throws MessagingException
+ final int maxProducerRate, final int producerWindowSize,
+ final boolean autoCommitSends, final boolean autoCommitAcks,
+ final boolean blockOnAcknowledge) throws MessagingException
{
if (lazyAckBatchSize < -1 || lazyAckBatchSize == 0)
{
@@ -175,6 +183,12 @@
{
producerCache = null;
}
+
+ this.autoCommitAcks = autoCommitAcks;
+
+ this.autoCommitSends = autoCommitSends;
+
+ this.blockOnAcknowledge = blockOnAcknowledge;
}
// ClientSession implementation -----------------------------------------------------------------
@@ -353,15 +367,24 @@
{
checkClosed();
- //First we tell each consumer to clear it's buffers and ignore any deliveries with
+ //We tell each consumer to clear it's buffers and ignore any deliveries with
//delivery id > last delivery id, until it gets delivery id = lastID again
+ if (autoCommitAcks)
+ {
+ lastCommittedID = lastID;
+ }
+
for (ClientConsumerInternal consumer: consumers.values())
{
consumer.recover(lastCommittedID + 1);
}
+ //We flush any remaining acks
+
acknowledgeInternal(false);
+
+ toAckCount = 0;
remotingConnection.send(id, new SessionRollbackMessage());
}
@@ -385,30 +408,18 @@
toAckCount = 0;
}
- else if (broken)
+ else if (broken || toAckCount == lazyAckBatchSize)
{
//Must always ack now
acknowledgeInternal(false);
toAckCount = 0;
- }
- else
- {
- if (toAckCount == lazyAckBatchSize)
+
+ if (autoCommitAcks)
{
- acknowledgeInternal(false);
-
- toAckCount = 0;
- }
- }
-
- //FIXME - temp hack - make server sessions alwyas transacted!!
-
- if (this.lazyAckBatchSize != -1)
- {
- lastCommittedID = lastID;
+ lastCommittedID = lastID;
+ }
}
-
}
public synchronized void close() throws MessagingException
@@ -453,6 +464,21 @@
return closed;
}
+ public boolean isAutoCommitSends()
+ {
+ return autoCommitSends;
+ }
+
+ public boolean isAutoCommitAcks()
+ {
+ return autoCommitAcks;
+ }
+
+ public int getLazyAckBatchSize()
+ {
+ return lazyAckBatchSize;
+ }
+
// ClientSessionInternal implementation ------------------------------------------------------------
public String getID()
Modified: trunk/src/main/org/jboss/messaging/jms/client/JBossConnection.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/client/JBossConnection.java 2008-03-04 15:06:06 UTC (rev 3842)
+++ trunk/src/main/org/jboss/messaging/jms/client/JBossConnection.java 2008-03-04 16:23:03 UTC (rev 3843)
@@ -385,42 +385,59 @@
{
int ackBatchSize;
- if (transacted || acknowledgeMode == Session.CLIENT_ACKNOWLEDGE)
+ final boolean autoCommitSends;
+
+ final boolean autoCommitAcks;
+
+ final boolean blockOnAcknowledge;
+
+ if (acknowledgeMode == Session.SESSION_TRANSACTED)
+ {
+ autoCommitSends = false;
+
+ autoCommitAcks = false;
+
+ ackBatchSize = -1; //Infinite
+
+ blockOnAcknowledge = false;
+ }
+ else if (acknowledgeMode == Session.AUTO_ACKNOWLEDGE)
{
- ackBatchSize = -1; //Infinite
+ autoCommitSends = true;
+
+ autoCommitAcks = true;
+
+ ackBatchSize = 1;
+
+ blockOnAcknowledge = true;
}
else if (acknowledgeMode == Session.DUPS_OK_ACKNOWLEDGE)
{
- ackBatchSize = dupsOKBatchSize;
+ autoCommitSends = true;
+
+ autoCommitAcks = true;
+
+ ackBatchSize = dupsOKBatchSize;
+
+ blockOnAcknowledge = false;
}
- else
+ else if (acknowledgeMode == Session.CLIENT_ACKNOWLEDGE)
{
- //Auto ack
- ackBatchSize = 1;
- }
+ autoCommitSends = true;
- boolean autoCommitSends = false;
-
- boolean autoCommitAcks = false;
-
- if (!transacted)
+ autoCommitAcks = false;
+
+ ackBatchSize = -1; //Infinite
+
+ blockOnAcknowledge = false;
+ }
+ else
{
- if (acknowledgeMode == Session.AUTO_ACKNOWLEDGE || acknowledgeMode == Session.DUPS_OK_ACKNOWLEDGE)
- {
- autoCommitSends = true;
-
- autoCommitAcks = true;
- }
- else if (acknowledgeMode == Session.CLIENT_ACKNOWLEDGE)
- {
- autoCommitSends = true;
-
- autoCommitAcks = false;
- }
+ throw new IllegalArgumentException("Invalid ackmode: " + acknowledgeMode);
}
ClientSession session =
- connection.createClientSession(isXA, autoCommitSends, autoCommitAcks, ackBatchSize, cacheProducers);
+ connection.createClientSession(isXA, autoCommitSends, autoCommitAcks, ackBatchSize, blockOnAcknowledge, cacheProducers);
justCreated = false;
Modified: trunk/tests/src/org/jboss/messaging/core/remoting/ssl/integration/CoreClientOverSSL.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/core/remoting/ssl/integration/CoreClientOverSSL.java 2008-03-04 15:06:06 UTC (rev 3842)
+++ trunk/tests/src/org/jboss/messaging/core/remoting/ssl/integration/CoreClientOverSSL.java 2008-03-04 16:23:03 UTC (rev 3843)
@@ -82,8 +82,7 @@
MessagingServer server = new MessagingServerImpl();
ClientConnectionFactory cf = new ClientConnectionFactoryImpl(0, remotingConf, server.getVersion());
ClientConnection conn = cf.createConnection(null, null);
- ClientSession session = conn.createClientSession(false, true, true, 0,
- false);
+ ClientSession session = conn.createClientSession(false, true, true, 0, false, false);
ClientProducer producer = session.createProducer(QUEUE);
MessageImpl message = new MessageImpl(JBossTextMessage.TYPE, false, 0,
Modified: trunk/tests/src/org/jboss/messaging/core/remoting/ssl/integration/CoreClientOverSSLTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/core/remoting/ssl/integration/CoreClientOverSSLTest.java 2008-03-04 15:06:06 UTC (rev 3842)
+++ trunk/tests/src/org/jboss/messaging/core/remoting/ssl/integration/CoreClientOverSSLTest.java 2008-03-04 16:23:03 UTC (rev 3843)
@@ -130,8 +130,7 @@
ClientConnectionFactory cf = new ClientConnectionFactoryImpl(0, remotingConf, server.getVersion());
connection = cf.createConnection(null, null);
- ClientSession session = connection.createClientSession(false, true, true,
- 0, false);
+ ClientSession session = connection.createClientSession(false, true, true, 0, false, false);
session.createQueue(QUEUE, QUEUE, null, false, false);
consumer = session.createConsumer(QUEUE, null, false, false, true);
connection.start();
More information about the jboss-cvs-commits
mailing list