Author: clebert.suconic(a)jboss.com
Date: 2011-01-29 22:33:33 -0500 (Sat, 29 Jan 2011)
New Revision: 10159
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/api/core/client/HornetQClient.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/ChannelImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/SessionSendAcknowledgementHandlerTest.java
Log:
https://issues.jboss.org/browse/JBPAPP-5828 / HORNETQ-636 - ClusterBridges not Acking
properly
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/api/core/client/HornetQClient.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/api/core/client/HornetQClient.java 2011-01-29
05:31:35 UTC (rev 10158)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/api/core/client/HornetQClient.java 2011-01-30
03:33:33 UTC (rev 10159)
@@ -13,14 +13,10 @@
package org.hornetq.api.core.client;
import org.hornetq.api.core.DiscoveryGroupConfiguration;
-import org.hornetq.api.core.Pair;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.loadbalance.RoundRobinConnectionLoadBalancingPolicy;
-import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
import org.hornetq.core.client.impl.ServerLocatorImpl;
-import java.util.List;
-
/**
* Utility class for creating HornetQ {@link ClientSessionFactory} objects.
*
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/ChannelImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/ChannelImpl.java 2011-01-29
05:31:35 UTC (rev 10158)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/ChannelImpl.java 2011-01-30
03:33:33 UTC (rev 10159)
@@ -74,7 +74,7 @@
private CommandConfirmationHandler commandConfirmationHandler;
private volatile boolean transferring;
-
+
public ChannelImpl(final CoreRemotingConnection connection, final long id, final int
confWindowSize)
{
this.connection = connection;
@@ -153,7 +153,7 @@
}
// This must never called by more than one thread concurrently
-
+
public void send(final Packet packet, final boolean flush, final boolean batch)
{
synchronized (sendLock)
@@ -161,7 +161,7 @@
packet.setChannelID(id);
HornetQBuffer buffer = packet.encode(connection);
-
+
lock.lock();
try
@@ -193,14 +193,13 @@
{
lock.unlock();
}
-
- //The actual send must be outside the lock, or with OIO transport, the write can
block if the tcp
- //buffer is full, preventing any incoming buffers being handled and blocking
failover
+
+ // The actual send must be outside the lock, or with OIO transport, the write
can block if the tcp
+ // buffer is full, preventing any incoming buffers being handled and blocking
failover
connection.getTransportConnection().write(buffer, flush, batch);
}
}
-
public Packet sendBlocking(final Packet packet) throws HornetQException
{
if (closed)
@@ -212,7 +211,7 @@
{
throw new IllegalStateException("Cannot do a blocking call timeout on a
server side connection");
}
-
+
// Synchronized since can't be called concurrently by more than one thread and
this can occur
// E.g. blocking acknowledge() from inside a message handler at some time as other
operation on main thread
synchronized (sendBlockingLock)
@@ -300,6 +299,10 @@
public void setCommandConfirmationHandler(final CommandConfirmationHandler handler)
{
+ if (confWindowSize < 0)
+ {
+ throw new IllegalStateException("You can't set confirmationHandler on a
connection with confirmation-window-size < 0. Look at the documentation for more
information.");
+ }
commandConfirmationHandler = handler;
}
@@ -501,6 +504,7 @@
lastReceivedCommandID +
" first stored command id " +
firstStoredCommandID);
+ firstStoredCommandID = lastReceivedCommandID + 1;
return;
}
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-01-29
05:31:35 UTC (rev 10158)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-01-30
03:33:33 UTC (rev 10159)
@@ -359,6 +359,7 @@
serverLocator.setClusterTransportConfiguration(connector);
serverLocator.setBackup(server.getConfiguration().isBackup());
serverLocator.setInitialConnectAttempts(-1);
+ serverLocator.setConfirmationWindowSize(0);
if(retryInterval > 0)
{
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/SessionSendAcknowledgementHandlerTest.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/SessionSendAcknowledgementHandlerTest.java 2011-01-29
05:31:35 UTC (rev 10158)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/SessionSendAcknowledgementHandlerTest.java 2011-01-30
03:33:33 UTC (rev 10159)
@@ -66,13 +66,59 @@
super.tearDown();
}
+ public void testSetInvalidSendACK() throws Exception
+ {
+ ServerLocator locator = createInVMNonHALocator();
+
+ locator.setConfirmationWindowSize(-1);
+
+ ClientSessionFactory csf = locator.createSessionFactory();
+ ClientSession session = csf.createSession(null, null, false, true, true, false,
1);
+
+ try
+ {
+
+ boolean failed = false;
+ try
+ {
+ session.setSendAcknowledgementHandler(new SendAcknowledgementHandler()
+ {
+ public void sendAcknowledged(Message message)
+ {
+ }
+ });
+ }
+ catch (Throwable expected)
+ {
+ failed = true;
+ }
+
+ assertTrue("Expected a failure on setting ACK Handler", failed);
+
+ session.createQueue(address, queueName, false);
+ }
+ finally
+ {
+ session.close();
+ }
+ }
+
+ public void testSendAcknowledgementsNoWindowSize() throws Exception
+ {
+ testSendAcknowledgements(0);
+ }
+
public void testSendAcknowledgements() throws Exception
{
+ testSendAcknowledgements(1024);
+ }
+
+ public void testSendAcknowledgements(int windowSize) throws Exception
+ {
ServerLocator locator = createInVMNonHALocator();
+ locator.setConfirmationWindowSize(windowSize);
- locator.setConfirmationWindowSize(1024);
-
ClientSessionFactory csf = locator.createSessionFactory();
ClientSession session = csf.createSession(null, null, false, true, true, false,
1);