JBoss hornetq SVN: r12095 - branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2012-02-08 11:28:33 -0500 (Wed, 08 Feb 2012)
New Revision: 12095
Modified:
branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/impl/QueueImpl.java
Log:
avoiding NPE
Modified: branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/impl/QueueImpl.java 2012-02-08 13:31:32 UTC (rev 12094)
+++ branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/impl/QueueImpl.java 2012-02-08 16:28:33 UTC (rev 12095)
@@ -1246,7 +1246,7 @@
}
// If empty we need to schedule depaging to make sure we would depage expired messages as well
- if ((!hasElements || expired && pageIterator != null) && pageIterator.hasNext())
+ if ((!hasElements || expired) && pageIterator != null && pageIterator.hasNext())
{
scheduleDepage();
}
12 years, 11 months
JBoss hornetq SVN: r12094 - trunk/hornetq-core/src/test/java/org/hornetq/tests/util.
by do-not-reply@jboss.org
Author: borges
Date: 2012-02-08 08:31:32 -0500 (Wed, 08 Feb 2012)
New Revision: 12094
Modified:
trunk/hornetq-core/src/test/java/org/hornetq/tests/util/ServiceTestBase.java
Log:
HORNETQ-820 Avoid blocking send on clientSession.close() during tearDown(): use session.cleanup()
Modified: trunk/hornetq-core/src/test/java/org/hornetq/tests/util/ServiceTestBase.java
===================================================================
--- trunk/hornetq-core/src/test/java/org/hornetq/tests/util/ServiceTestBase.java 2012-02-08 13:31:17 UTC (rev 12093)
+++ trunk/hornetq-core/src/test/java/org/hornetq/tests/util/ServiceTestBase.java 2012-02-08 13:31:32 UTC (rev 12094)
@@ -34,6 +34,7 @@
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.core.client.impl.ClientSessionInternal;
import org.hornetq.core.client.impl.Topology;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.logging.Logger;
@@ -123,10 +124,16 @@
{
for (ClientSession cs : clientSessions)
{
+ if (cs == null)
+ continue;
try
{
- if (cs != null)
+ if (cs instanceof ClientSessionInternal)
{
+ ((ClientSessionInternal)cs).cleanUp(false);
+ }
+ else
+ {
cs.close();
}
}
12 years, 11 months
JBoss hornetq SVN: r12093 - trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl.
by do-not-reply@jboss.org
Author: borges
Date: 2012-02-08 08:31:17 -0500 (Wed, 08 Feb 2012)
New Revision: 12093
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
Log:
HORNETQ-820 replicationEndpoint should be stopped before the StorageManager
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2012-02-07 03:21:15 UTC (rev 12092)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2012-02-08 13:31:17 UTC (rev 12093)
@@ -571,6 +571,12 @@
pagingManager.stop();
}
+ if (replicationEndpoint != null)
+ {
+ replicationEndpoint.stop();
+ replicationEndpoint = null;
+ }
+
if (!criticalIOError && storageManager != null)
{
storageManager.stop();
@@ -582,12 +588,6 @@
replicationManager = null;
}
- if (replicationEndpoint != null)
- {
- replicationEndpoint.stop();
- replicationEndpoint = null;
- }
-
if (securityManager != null)
{
securityManager.stop();
12 years, 11 months
JBoss hornetq SVN: r12092 - trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2012-02-06 22:21:15 -0500 (Mon, 06 Feb 2012)
New Revision: 12092
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/QueueImpl.java
Log:
HORNETQ-843 - fixing paging
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/QueueImpl.java 2012-02-07 03:20:35 UTC (rev 12091)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/QueueImpl.java 2012-02-07 03:21:15 UTC (rev 12092)
@@ -1223,8 +1223,10 @@
try
{
boolean expired = false;
+ boolean hasElements = false;
while (iter.hasNext())
{
+ hasElements = true;
MessageReference ref = iter.next();
try
{
@@ -1243,7 +1245,8 @@
}
}
- if (expired && pageIterator != null && pageIterator.hasNext())
+ // If empty we need to schedule depaging to make sure we would depage expired messages as well
+ if ((!hasElements || expired && pageIterator != null) && pageIterator.hasNext())
{
scheduleDepage();
}
12 years, 11 months
JBoss hornetq SVN: r12091 - trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2012-02-06 22:20:35 -0500 (Mon, 06 Feb 2012)
New Revision: 12091
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java
Log:
HORNETQ-843 - fix on paging
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java 2012-02-07 03:16:10 UTC (rev 12090)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java 2012-02-07 03:20:35 UTC (rev 12091)
@@ -27,6 +27,7 @@
import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.Queue;
import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.tests.integration.cluster.util.SameProcessHornetQServer;
import org.hornetq.tests.integration.cluster.util.TestableServer;
@@ -191,7 +192,74 @@
Assert.assertEquals(i, result);
}
}
+
+ public void testExpireMessage() throws Exception
+ {
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setReconnectAttempts(-1);
+ ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
+ ClientSession session = sf.createSession(true, true, 0);
+
+ try
+ {
+
+ session.createQueue(PagingFailoverTest.ADDRESS, PagingFailoverTest.ADDRESS, true);
+
+ ClientProducer prod = session.createProducer(PagingFailoverTest.ADDRESS);
+
+ final int TOTAL_MESSAGES = 1000;
+
+ for (int i = 0; i < TOTAL_MESSAGES; i++)
+ {
+ ClientMessage msg = session.createMessage(true);
+ msg.putIntProperty(new SimpleString("key"), i);
+ msg.setExpiration(System.currentTimeMillis() + 1000);
+ prod.send(msg);
+ }
+
+ crash(session);
+
+ session.close();
+
+ Queue queue = backupServer.getServer().locateQueue(ADDRESS);
+
+ long timeout = System.currentTimeMillis() + 60000;
+ System.out.println("Starting now");
+ while (timeout > System.currentTimeMillis() && queue.getPageSubscription().isPaging())
+ {
+ Thread.sleep(100);
+ // Simulating what would happen on expire
+ queue.expireReferences();
+ }
+
+ try
+ {
+ assertFalse(queue.getPageSubscription().isPaging());
+ }
+ catch (Throwable e)
+ {
+ e.printStackTrace();
+ System.exit(-1);
+ }
+
+ }
+ finally
+ {
+ try
+ {
+ session.close();
+ }
+ catch (Exception ignored)
+ {
+ }
+
+ locator.close();
+ }
+ }
+
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
12 years, 11 months
JBoss hornetq SVN: r12090 - branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/cluster/failover.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2012-02-06 22:16:10 -0500 (Mon, 06 Feb 2012)
New Revision: 12090
Modified:
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java
Log:
fixing tests
Modified: branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
===================================================================
--- branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2012-02-07 03:16:04 UTC (rev 12089)
+++ branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2012-02-07 03:16:10 UTC (rev 12090)
@@ -138,7 +138,6 @@
backupConfig.setSharedStore(true);
backupConfig.setBackup(true);
backupConfig.setClustered(true);
- backupConfig.setMessageExpiryScanPeriod(100);
TransportConfiguration liveConnector = getConnectorTransportConfiguration(true);
TransportConfiguration backupConnector = getConnectorTransportConfiguration(false);
backupConfig.getConnectorConfigurations().put(liveConnector.getName(), liveConnector);
@@ -157,7 +156,6 @@
liveConfig.setSecurityEnabled(false);
liveConfig.setSharedStore(true);
liveConfig.setClustered(true);
- liveConfig.setMessageExpiryScanPeriod(100);
List<String> pairs = null;
ClusterConnectionConfiguration ccc0 = new ClusterConnectionConfiguration("cluster1", "jms", liveConnector.getName(), -1, false, false, 1, 1,
pairs, false);
Modified: branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java
===================================================================
--- branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java 2012-02-07 03:16:04 UTC (rev 12089)
+++ branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java 2012-02-07 03:16:10 UTC (rev 12090)
@@ -240,6 +240,8 @@
while (timeout > System.currentTimeMillis() && queue.getPageSubscription().isPaging())
{
Thread.sleep(100);
+ // Force what would happen on an expire
+ queue.expireReferences();
}
try
12 years, 11 months
JBoss hornetq SVN: r12089 - branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2012-02-06 22:16:04 -0500 (Mon, 06 Feb 2012)
New Revision: 12089
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java
Log:
fixing tests
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2012-02-07 02:46:48 UTC (rev 12088)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2012-02-07 03:16:04 UTC (rev 12089)
@@ -138,7 +138,6 @@
backupConfig.setSharedStore(true);
backupConfig.setBackup(true);
backupConfig.setClustered(true);
- backupConfig.setMessageExpiryScanPeriod(100);
TransportConfiguration liveConnector = getConnectorTransportConfiguration(true);
TransportConfiguration backupConnector = getConnectorTransportConfiguration(false);
backupConfig.getConnectorConfigurations().put(liveConnector.getName(), liveConnector);
@@ -157,7 +156,6 @@
liveConfig.setSecurityEnabled(false);
liveConfig.setSharedStore(true);
liveConfig.setClustered(true);
- liveConfig.setMessageExpiryScanPeriod(100);
List<String> pairs = null;
ClusterConnectionConfiguration ccc0 = new ClusterConnectionConfiguration("cluster1", "jms", liveConnector.getName(), -1, false, false, 1, 1,
pairs, false);
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java 2012-02-07 02:46:48 UTC (rev 12088)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java 2012-02-07 03:16:04 UTC (rev 12089)
@@ -240,6 +240,8 @@
while (timeout > System.currentTimeMillis() && queue.getPageSubscription().isPaging())
{
Thread.sleep(100);
+ // Force what would happen on an expire
+ queue.expireReferences();
}
try
12 years, 11 months
JBoss hornetq SVN: r12088 - in branches/Branch_2_2_EAP: tests/src/org/hornetq/tests/integration/cluster/failover and 1 other directory.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2012-02-06 21:46:48 -0500 (Mon, 06 Feb 2012)
New Revision: 12088
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java
Log:
HORNETQ-843/JBPAPP-8067 - fixing expiry on Paging
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java 2012-02-07 02:46:06 UTC (rev 12087)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java 2012-02-07 02:46:48 UTC (rev 12088)
@@ -1223,8 +1223,10 @@
try
{
boolean expired = false;
+ boolean hasElements = false;
while (iter.hasNext())
{
+ hasElements = true;
MessageReference ref = iter.next();
try
{
@@ -1243,7 +1245,8 @@
}
}
- if (expired && pageIterator != null && pageIterator.hasNext())
+ // If empty we need to schedule depaging to make sure we would depage expired messages as well
+ if ((!hasElements || expired && pageIterator != null) && pageIterator.hasNext())
{
scheduleDepage();
}
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2012-02-07 02:46:06 UTC (rev 12087)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2012-02-07 02:46:48 UTC (rev 12088)
@@ -138,6 +138,7 @@
backupConfig.setSharedStore(true);
backupConfig.setBackup(true);
backupConfig.setClustered(true);
+ backupConfig.setMessageExpiryScanPeriod(100);
TransportConfiguration liveConnector = getConnectorTransportConfiguration(true);
TransportConfiguration backupConnector = getConnectorTransportConfiguration(false);
backupConfig.getConnectorConfigurations().put(liveConnector.getName(), liveConnector);
@@ -156,6 +157,7 @@
liveConfig.setSecurityEnabled(false);
liveConfig.setSharedStore(true);
liveConfig.setClustered(true);
+ liveConfig.setMessageExpiryScanPeriod(100);
List<String> pairs = null;
ClusterConnectionConfiguration ccc0 = new ClusterConnectionConfiguration("cluster1", "jms", liveConnector.getName(), -1, false, false, 1, 1,
pairs, false);
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java 2012-02-07 02:46:06 UTC (rev 12087)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java 2012-02-07 02:46:48 UTC (rev 12088)
@@ -31,6 +31,7 @@
import org.hornetq.core.client.impl.ClientSessionInternal;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.Queue;
import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.spi.core.protocol.RemotingConnection;
import org.hornetq.tests.integration.cluster.util.SameProcessHornetQServer;
@@ -202,6 +203,68 @@
}
}
+ public void testExpireMessage() throws Exception
+ {
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setReconnectAttempts(-1);
+
+ ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
+ ClientSession session = sf.createSession(true, true, 0);
+
+ try
+ {
+
+ session.createQueue(PagingFailoverTest.ADDRESS, PagingFailoverTest.ADDRESS, true);
+
+ ClientProducer prod = session.createProducer(PagingFailoverTest.ADDRESS);
+
+ final int TOTAL_MESSAGES = 1000;
+
+ for (int i = 0; i < TOTAL_MESSAGES; i++)
+ {
+ ClientMessage msg = session.createMessage(true);
+ msg.putIntProperty(new SimpleString("key"), i);
+ msg.setExpiration(System.currentTimeMillis() + 1000);
+ prod.send(msg);
+ }
+
+ crash(session);
+
+ session.close();
+
+ Queue queue = backupServer.getServer().locateQueue(ADDRESS);
+
+ long timeout = System.currentTimeMillis() + 60000;
+ System.out.println("Starting now");
+ while (timeout > System.currentTimeMillis() && queue.getPageSubscription().isPaging())
+ {
+ Thread.sleep(100);
+ }
+
+ try
+ {
+ assertFalse(queue.getPageSubscription().isPaging());
+ }
+ catch (Throwable e)
+ {
+ e.printStackTrace();
+ System.exit(-1);
+ }
+
+ }
+ finally
+ {
+ try
+ {
+ session.close();
+ }
+ catch (Exception ignored)
+ {
+ }
+ }
+ }
+
/**
* @param session
* @param latch
12 years, 11 months
JBoss hornetq SVN: r12087 - in branches/Branch_2_2_AS7: tests/src/org/hornetq/tests/integration/cluster/failover and 1 other directory.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2012-02-06 21:46:06 -0500 (Mon, 06 Feb 2012)
New Revision: 12087
Modified:
branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/impl/QueueImpl.java
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java
Log:
HORNETQ-843 - fixing expiry on Paging
Modified: branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/impl/QueueImpl.java 2012-02-06 14:29:24 UTC (rev 12086)
+++ branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/impl/QueueImpl.java 2012-02-07 02:46:06 UTC (rev 12087)
@@ -1223,8 +1223,10 @@
try
{
boolean expired = false;
+ boolean hasElements = false;
while (iter.hasNext())
{
+ hasElements = true;
MessageReference ref = iter.next();
try
{
@@ -1243,7 +1245,8 @@
}
}
- if (expired && pageIterator != null && pageIterator.hasNext())
+ // If empty we need to schedule depaging to make sure we would depage expired messages as well
+ if ((!hasElements || expired && pageIterator != null) && pageIterator.hasNext())
{
scheduleDepage();
}
Modified: branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
===================================================================
--- branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2012-02-06 14:29:24 UTC (rev 12086)
+++ branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2012-02-07 02:46:06 UTC (rev 12087)
@@ -138,6 +138,7 @@
backupConfig.setSharedStore(true);
backupConfig.setBackup(true);
backupConfig.setClustered(true);
+ backupConfig.setMessageExpiryScanPeriod(100);
TransportConfiguration liveConnector = getConnectorTransportConfiguration(true);
TransportConfiguration backupConnector = getConnectorTransportConfiguration(false);
backupConfig.getConnectorConfigurations().put(liveConnector.getName(), liveConnector);
@@ -156,6 +157,7 @@
liveConfig.setSecurityEnabled(false);
liveConfig.setSharedStore(true);
liveConfig.setClustered(true);
+ liveConfig.setMessageExpiryScanPeriod(100);
List<String> pairs = null;
ClusterConnectionConfiguration ccc0 = new ClusterConnectionConfiguration("cluster1", "jms", liveConnector.getName(), -1, false, false, 1, 1,
pairs, false);
Modified: branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java
===================================================================
--- branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java 2012-02-06 14:29:24 UTC (rev 12086)
+++ branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java 2012-02-07 02:46:06 UTC (rev 12087)
@@ -31,6 +31,7 @@
import org.hornetq.core.client.impl.ClientSessionInternal;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.Queue;
import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.spi.core.protocol.RemotingConnection;
import org.hornetq.tests.integration.cluster.util.SameProcessHornetQServer;
@@ -202,6 +203,68 @@
}
}
+ public void testExpireMessage() throws Exception
+ {
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setReconnectAttempts(-1);
+
+ ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
+ ClientSession session = sf.createSession(true, true, 0);
+
+ try
+ {
+
+ session.createQueue(PagingFailoverTest.ADDRESS, PagingFailoverTest.ADDRESS, true);
+
+ ClientProducer prod = session.createProducer(PagingFailoverTest.ADDRESS);
+
+ final int TOTAL_MESSAGES = 1000;
+
+ for (int i = 0; i < TOTAL_MESSAGES; i++)
+ {
+ ClientMessage msg = session.createMessage(true);
+ msg.putIntProperty(new SimpleString("key"), i);
+ msg.setExpiration(System.currentTimeMillis() + 1000);
+ prod.send(msg);
+ }
+
+ crash(session);
+
+ session.close();
+
+ Queue queue = backupServer.getServer().locateQueue(ADDRESS);
+
+ long timeout = System.currentTimeMillis() + 60000;
+ System.out.println("Starting now");
+ while (timeout > System.currentTimeMillis() && queue.getPageSubscription().isPaging())
+ {
+ Thread.sleep(100);
+ }
+
+ try
+ {
+ assertFalse(queue.getPageSubscription().isPaging());
+ }
+ catch (Throwable e)
+ {
+ e.printStackTrace();
+ System.exit(-1);
+ }
+
+ }
+ finally
+ {
+ try
+ {
+ session.close();
+ }
+ catch (Exception ignored)
+ {
+ }
+ }
+ }
+
/**
* @param session
* @param latch
12 years, 11 months
JBoss hornetq SVN: r12086 - in trunk: hornetq-core/src/main/java/org/hornetq/core/replication/impl and 1 other directories.
by do-not-reply@jboss.org
Author: borges
Date: 2012-02-06 09:29:24 -0500 (Mon, 06 Feb 2012)
New Revision: 12086
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/LargeServerMessageInSync.java
trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncJournalTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncLargeMessageTest.java
Log:
HORNETQ-720 Fix sync of largeMessages that are being uploaded when sync starts.
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/LargeServerMessageInSync.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/LargeServerMessageInSync.java 2012-02-06 11:46:58 UTC (rev 12085)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/LargeServerMessageInSync.java 2012-02-06 14:29:24 UTC (rev 12086)
@@ -36,12 +36,15 @@
SequentialFile mainSeqFile = mainLM.getFile();
if (appendFile != null)
{
+ appendFile.close();
+ appendFile.open();
for (;;)
{
buffer.rewind();
- int size = appendFile.read(buffer);
+ int bytesRead = appendFile.read(buffer);
+ if (bytesRead > 0)
mainSeqFile.writeInternal(buffer);
- if (size < buffer.capacity())
+ if (bytesRead < buffer.capacity())
{
break;
}
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2012-02-06 11:46:58 UTC (rev 12085)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2012-02-06 14:29:24 UTC (rev 12086)
@@ -548,7 +548,7 @@
while (true)
{
buffer.clear();
- int bytesRead = channel.read(buffer);
+ final int bytesRead = channel.read(buffer);
int toSend = bytesRead;
if (bytesRead > 0)
{
@@ -566,7 +566,7 @@
buffer.rewind();
// sending -1 or 0 bytes will close the file at the backup
- sendReplicatePacket(new ReplicationSyncFileMessage(content, pageStore, id, bytesRead, buffer));
+ sendReplicatePacket(new ReplicationSyncFileMessage(content, pageStore, id, toSend, buffer));
if (bytesRead == -1 || bytesRead == 0 || maxBytesToSend == 0)
break;
}
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncJournalTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncJournalTest.java 2012-02-06 11:46:58 UTC (rev 12085)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncJournalTest.java 2012-02-06 14:29:24 UTC (rev 12086)
@@ -103,7 +103,7 @@
{
session.start();
ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
- ClientMessage msg = consumer.receive(200);
+ ClientMessage msg = consumer.receiveImmediate();
assertNull("there should be no more messages to receive! " + msg, msg);
consumer.close();
session.commit();
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncLargeMessageTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncLargeMessageTest.java 2012-02-06 11:46:58 UTC (rev 12085)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncLargeMessageTest.java 2012-02-06 14:29:24 UTC (rev 12086)
@@ -7,7 +7,11 @@
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
+import junit.framework.Assert;
+
+import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.client.ClientConsumer;
import org.hornetq.api.core.client.ClientMessage;
import org.hornetq.api.core.client.ClientProducer;
import org.hornetq.api.core.client.ClientSession;
@@ -81,7 +85,8 @@
session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
final ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
final ClientMessage message = session.createMessage(true);
- message.setBodyInputStream(UnitTestCase.createFakeLargeStream(1000 * MIN_LARGE_MESSAGE));
+ final int largeMessageSize = 1000 * MIN_LARGE_MESSAGE;
+ message.setBodyInputStream(UnitTestCase.createFakeLargeStream(largeMessageSize));
final AtomicBoolean caughtException = new AtomicBoolean(false);
final CountDownLatch latch = new CountDownLatch(1);
@@ -115,6 +120,20 @@
latch2.await();
crash(session);
assertFalse("no exceptions while sending message", caughtException.get());
+
+ session.start();
+ ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
+ ClientMessage msg = consumer.receive(2000);
+ HornetQBuffer buffer = msg.getBodyBuffer();
+
+ for (int j = 0; j < largeMessageSize; j++)
+ {
+ Assert.assertTrue("large msg , expecting " + largeMessageSize + " bytes, got " + j, buffer.readable());
+ Assert.assertEquals("equal at " + j, UnitTestCase.getSamplebyte(j), buffer.readByte());
+ }
+ assertNull("there should be no more messages!", consumer.receiveImmediate());
+ consumer.close();
+ session.commit();
}
private Set<Long> getAllMessageFileIds(File dir)
12 years, 11 months