JBoss hornetq SVN: r9580 - branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/discovery.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-08-23 10:52:52 -0400 (Mon, 23 Aug 2010)
New Revision: 9580
Modified:
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/discovery/DiscoveryTest.java
Log:
fix DiscoveryTest
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/discovery/DiscoveryTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/discovery/DiscoveryTest.java 2010-08-23 12:47:18 UTC (rev 9579)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/discovery/DiscoveryTest.java 2010-08-23 14:52:52 UTC (rev 9580)
@@ -15,6 +15,10 @@
import java.net.InetAddress;
import java.net.NetworkInterface;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.List;
@@ -22,7 +26,6 @@
import junit.framework.Assert;
-import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.management.NotificationType;
@@ -98,17 +101,8 @@
Assert.assertTrue(ok);
List<DiscoveryEntry> entries = dg.getDiscoveryEntries();
+ assertEqualsDiscoveryEntries(Arrays.asList(live1), entries);
- Assert.assertNotNull(entries);
-
- Assert.assertEquals(1, entries.size());
-
- DiscoveryEntry entry = entries.get(0);
-
- Assert.assertNotNull(entry);
-
- Assert.assertEquals(live1, entry.getConnector());
-
bg.stop();
dg.stop();
@@ -193,17 +187,8 @@
Assert.assertTrue(ok);
List<DiscoveryEntry> entries = dg.getDiscoveryEntries();
+ assertEqualsDiscoveryEntries(Arrays.asList(live1), entries);
- Assert.assertNotNull(entries);
-
- Assert.assertEquals(1, entries.size());
-
- DiscoveryEntry entry = entries.get(0);
-
- Assert.assertNotNull(entry);
-
- Assert.assertEquals(live1, entry.getConnector());
-
bg.stop();
dg.stop();
@@ -249,17 +234,8 @@
Assert.assertTrue(ok);
List<DiscoveryEntry> entries = dg.getDiscoveryEntries();
+ assertEqualsDiscoveryEntries(Arrays.asList(live1), entries);
- Assert.assertNotNull(entries);
-
- Assert.assertEquals(1, entries.size());
-
- DiscoveryEntry entry = entries.get(0);
-
- Assert.assertNotNull(entry);
-
- Assert.assertEquals(live1, entry.getConnector());
-
bg.stop();
dg.stop();
@@ -275,17 +251,7 @@
Assert.assertTrue(ok);
entries = dg.getDiscoveryEntries();
-
- Assert.assertNotNull(entries);
-
- Assert.assertEquals(1, entries.size());
-
- entry = entries.get(0);
-
- Assert.assertNotNull(entry);
-
- Assert.assertEquals(live1, entry.getConnector());
-
+ assertEqualsDiscoveryEntries(Arrays.asList(live1), entries);
}
public void testIgnoreTrafficFromOwnNode() throws Exception
@@ -325,14 +291,8 @@
Assert.assertNotNull(entries);
- Assert.assertEquals(1, entries.size());
+ Assert.assertEquals(0, entries.size());
- DiscoveryEntry entry = entries.get(0);
-
- Assert.assertNotNull(entry);
-
- Assert.assertEquals(live1, entry.getConnector());
-
bg.stop();
dg.stop();
@@ -555,45 +515,18 @@
boolean ok = dg1.waitForBroadcast(1000);
Assert.assertTrue(ok);
List<DiscoveryEntry> entries = dg1.getDiscoveryEntries();
+ assertEqualsDiscoveryEntries(Arrays.asList(live1), entries);
- Assert.assertNotNull(entries);
-
- Assert.assertEquals(1, entries.size());
-
- DiscoveryEntry entry = entries.get(0);
-
- Assert.assertNotNull(entry);
-
- Assert.assertEquals(live1, entry.getConnector());
-
ok = dg2.waitForBroadcast(1000);
Assert.assertTrue(ok);
entries = dg2.getDiscoveryEntries();
+ assertEqualsDiscoveryEntries(Arrays.asList(live2), entries);
- Assert.assertNotNull(entries);
-
- Assert.assertEquals(1, entries.size());
-
- entry = entries.get(0);
-
- Assert.assertNotNull(entry);
-
- Assert.assertEquals(live2, entry.getConnector());
-
ok = dg3.waitForBroadcast(1000);
Assert.assertTrue(ok);
entries = dg3.getDiscoveryEntries();
+ assertEqualsDiscoveryEntries(Arrays.asList(live3), entries);
- Assert.assertNotNull(entries);
-
- Assert.assertEquals(1, entries.size());
-
- entry = entries.get(0);
-
- Assert.assertNotNull(entry);
-
- Assert.assertEquals(live3, entry.getConnector());
-
bg1.stop();
bg2.stop();
bg3.stop();
@@ -734,16 +667,7 @@
boolean ok = dg.waitForBroadcast(1000);
Assert.assertTrue(ok);
List<DiscoveryEntry> entries = dg.getDiscoveryEntries();
-
- Assert.assertNotNull(entries);
-
- Assert.assertEquals(1, entries.size());
-
- DiscoveryEntry entry = entries.get(0);
-
- Assert.assertNotNull(entry);
-
- Assert.assertEquals(live1, entry.getConnector());
+ assertEqualsDiscoveryEntries(Arrays.asList(live1), entries);
Assert.assertTrue(listener1.called);
Assert.assertTrue(listener2.called);
listener1.called = false;
@@ -753,14 +677,7 @@
ok = dg.waitForBroadcast(1000);
Assert.assertTrue(ok);
entries = dg.getDiscoveryEntries();
- Assert.assertNotNull(entries);
- Assert.assertEquals(2, entries.size());
- DiscoveryEntry entry1 = entries.get(0);
- Assert.assertNotNull(entry1);
- Assert.assertEquals(live1, entry1.getConnector());
- DiscoveryEntry entry2 = entries.get(1);
- Assert.assertNotNull(entry2);
- Assert.assertEquals(live2, entry2.getConnector());
+ assertEqualsDiscoveryEntries(Arrays.asList(live1, live2), entries);
Assert.assertTrue(listener1.called);
Assert.assertTrue(listener2.called);
listener1.called = false;
@@ -770,17 +687,7 @@
ok = dg.waitForBroadcast(1000);
Assert.assertTrue(ok);
entries = dg.getDiscoveryEntries();
- Assert.assertNotNull(entries);
- Assert.assertEquals(2, entries.size());
- entry1 = entries.get(0);
- Assert.assertNotNull(entry1);
- Assert.assertEquals(live1, entry1.getConnector());
- entry2 = entries.get(1);
- Assert.assertNotNull(entry2);
- Assert.assertEquals(live2, entry2.getConnector());
- DiscoveryEntry entry3 = entries.get(2);
- Assert.assertNotNull(entry3);
- Assert.assertEquals(live3, entry3.getConnector());
+ assertEqualsDiscoveryEntries(Arrays.asList(live1, live2, live3), entries);
Assert.assertTrue(listener1.called);
Assert.assertTrue(listener2.called);
listener1.called = false;
@@ -790,17 +697,7 @@
ok = dg.waitForBroadcast(1000);
Assert.assertTrue(ok);
entries = dg.getDiscoveryEntries();
- Assert.assertNotNull(entries);
- Assert.assertEquals(2, entries.size());
- entry1 = entries.get(0);
- Assert.assertNotNull(entry1);
- Assert.assertEquals(live1, entry1.getConnector());
- entry2 = entries.get(1);
- Assert.assertNotNull(entry2);
- Assert.assertEquals(live2, entry2.getConnector());
- entry3 = entries.get(2);
- Assert.assertNotNull(entry3);
- Assert.assertEquals(live3, entry3.getConnector());
+ assertEqualsDiscoveryEntries(Arrays.asList(live1, live2, live3), entries);
Assert.assertFalse(listener1.called);
Assert.assertFalse(listener2.called);
listener1.called = false;
@@ -810,17 +707,7 @@
ok = dg.waitForBroadcast(1000);
Assert.assertTrue(ok);
entries = dg.getDiscoveryEntries();
- Assert.assertNotNull(entries);
- Assert.assertEquals(2, entries.size());
- entry1 = entries.get(0);
- Assert.assertNotNull(entry1);
- Assert.assertEquals(live1, entry1.getConnector());
- entry2 = entries.get(1);
- Assert.assertNotNull(entry2);
- Assert.assertEquals(live2, entry2.getConnector());
- entry3 = entries.get(2);
- Assert.assertNotNull(entry3);
- Assert.assertEquals(live3, entry3.getConnector());
+ assertEqualsDiscoveryEntries(Arrays.asList(live1, live2, live3), entries);
Assert.assertFalse(listener1.called);
Assert.assertFalse(listener2.called);
listener1.called = false;
@@ -830,17 +717,7 @@
ok = dg.waitForBroadcast(1000);
Assert.assertTrue(ok);
entries = dg.getDiscoveryEntries();
- Assert.assertNotNull(entries);
- Assert.assertEquals(2, entries.size());
- entry1 = entries.get(0);
- Assert.assertNotNull(entry1);
- Assert.assertEquals(live1, entry1.getConnector());
- entry2 = entries.get(1);
- Assert.assertNotNull(entry2);
- Assert.assertEquals(live2, entry2.getConnector());
- entry3 = entries.get(2);
- Assert.assertNotNull(entry3);
- Assert.assertEquals(live3, entry3.getConnector());
+ assertEqualsDiscoveryEntries(Arrays.asList(live1, live2, live3), entries);
Assert.assertFalse(listener1.called);
Assert.assertFalse(listener2.called);
listener1.called = false;
@@ -854,18 +731,7 @@
// Connector2 should still be there since not timed out yet
entries = dg.getDiscoveryEntries();
- Assert.assertNotNull(entries);
- Assert.assertEquals(2, entries.size());
- entry1 = entries.get(0);
- Assert.assertNotNull(entry1);
- Assert.assertEquals(live1, entry1.getConnector());
- entry2 = entries.get(1);
- Assert.assertNotNull(entry2);
- Assert.assertEquals(live2, entry2.getConnector());
- entry3 = entries.get(2);
- Assert.assertNotNull(entry3);
- Assert.assertEquals(live3, entry3.getConnector());
-
+ assertEqualsDiscoveryEntries(Arrays.asList(live1, live2, live3), entries);
Assert.assertFalse(listener1.called);
Assert.assertFalse(listener2.called);
listener1.called = false;
@@ -881,15 +747,7 @@
ok = dg.waitForBroadcast(1000);
entries = dg.getDiscoveryEntries();
- Assert.assertNotNull(entries);
- Assert.assertEquals(2, entries.size());
- entry1 = entries.get(0);
- Assert.assertNotNull(entry1);
- Assert.assertEquals(live1, entry1.getConnector());
- entry3 = entries.get(1);
- Assert.assertNotNull(entry3);
- Assert.assertEquals(live3, entry3.getConnector());
-
+ assertEqualsDiscoveryEntries(Arrays.asList(live1, live3), entries);
Assert.assertTrue(listener1.called);
Assert.assertTrue(listener2.called);
listener1.called = false;
@@ -987,29 +845,17 @@
boolean ok = dg1.waitForBroadcast(1000);
Assert.assertTrue(ok);
List<DiscoveryEntry> entries = dg1.getDiscoveryEntries();
- Assert.assertNotNull(entries);
- Assert.assertEquals(1, entries.size());
- DiscoveryEntry entry = entries.get(0);
- Assert.assertNotNull(entry);
- Assert.assertEquals(live1, entry.getConnector());
+ assertEqualsDiscoveryEntries(Arrays.asList(live1), entries);
ok = dg2.waitForBroadcast(1000);
Assert.assertTrue(ok);
entries = dg2.getDiscoveryEntries();
- Assert.assertNotNull(entries);
- Assert.assertEquals(1, entries.size());
- entry = entries.get(0);
- Assert.assertNotNull(entry);
- Assert.assertEquals(live1, entry.getConnector());
+ assertEqualsDiscoveryEntries(Arrays.asList(live1), entries);
ok = dg3.waitForBroadcast(1000);
Assert.assertTrue(ok);
entries = dg3.getDiscoveryEntries();
- Assert.assertNotNull(entries);
- Assert.assertEquals(1, entries.size());
- entry = entries.get(0);
- Assert.assertNotNull(entry);
- Assert.assertEquals(live1, entry.getConnector());
+ assertEqualsDiscoveryEntries(Arrays.asList(live1), entries);
bg.stop();
@@ -1117,5 +963,35 @@
called = true;
}
}
+
+
+ private static void assertEqualsDiscoveryEntries(List<TransportConfiguration> expected, List<DiscoveryEntry> actual)
+ {
+ assertNotNull(actual);
+
+ List<TransportConfiguration> sortedExpected = new ArrayList<TransportConfiguration>(expected);
+ Collections.sort(sortedExpected, new Comparator<TransportConfiguration>()
+ {
+ public int compare(TransportConfiguration o1, TransportConfiguration o2)
+ {
+ return o2.toString().compareTo(o1.toString());
+ }
+ });
+ List<DiscoveryEntry> sortedActual = new ArrayList<DiscoveryEntry>(actual);
+ Collections.sort(sortedActual, new Comparator<DiscoveryEntry>()
+ {
+ public int compare(DiscoveryEntry o1, DiscoveryEntry o2)
+ {
+ return o2.getConnector().toString().compareTo(o1.getConnector().toString());
+ }
+ });
+
+ assertEquals(sortedExpected.size(), sortedActual.size());
+ for (int i = 0; i < sortedExpected.size(); i++)
+ {
+ assertEquals(sortedExpected.get(i), sortedActual.get(i).getConnector());
+ }
+ }
+
}
13 years, 9 months
JBoss hornetq SVN: r9579 - in branches/2_2_0_HA_Improvements: tests/src/org/hornetq/tests/integration/client and 1 other directories.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-08-23 08:47:18 -0400 (Mon, 23 Aug 2010)
New Revision: 9579
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/Topology.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/TopologyMember.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/SessionFactoryTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/util/UnitTestCase.java
Log:
uncommented and fixed SessionFactoryTest
* flag Topology and TopologyMember as serializable since they're members of ServerLocator
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/Topology.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/Topology.java 2010-08-23 12:46:05 UTC (rev 9578)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/Topology.java 2010-08-23 12:47:18 UTC (rev 9579)
@@ -12,6 +12,7 @@
*/
package org.hornetq.core.client.impl;
+import java.io.Serializable;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
@@ -23,8 +24,12 @@
* @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
* Created Aug 16, 2010
*/
-public class Topology
+public class Topology implements Serializable
{
+ /**
+ *
+ */
+ private static final long serialVersionUID = -9037171688692471371L;
/*
* topology describes the other cluster nodes that this server knows about:
*
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/TopologyMember.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/TopologyMember.java 2010-08-23 12:46:05 UTC (rev 9578)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/TopologyMember.java 2010-08-23 12:47:18 UTC (rev 9579)
@@ -12,6 +12,8 @@
*/
package org.hornetq.core.client.impl;
+import java.io.Serializable;
+
import org.hornetq.api.core.Pair;
import org.hornetq.api.core.TransportConfiguration;
@@ -19,8 +21,10 @@
* @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
* Created Aug 16, 2010
*/
-public class TopologyMember
+public class TopologyMember implements Serializable
{
+ private static final long serialVersionUID = 1123652191795626133L;
+
private final Pair<TransportConfiguration, TransportConfiguration> connector;
private final int distance;
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/SessionFactoryTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/SessionFactoryTest.java 2010-08-23 12:46:05 UTC (rev 9578)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/SessionFactoryTest.java 2010-08-23 12:47:18 UTC (rev 9579)
@@ -17,27 +17,23 @@
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
-import java.util.HashMap;
+import java.util.Arrays;
import java.util.List;
-import java.util.Map;
import junit.framework.Assert;
-import org.hornetq.api.core.HornetQException;
-import org.hornetq.api.core.Pair;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.api.core.client.ServerLocator;
-import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
import org.hornetq.core.config.BroadcastGroupConfiguration;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.logging.Logger;
-import org.hornetq.core.remoting.impl.invm.TransportConstants;
+import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
+import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
import org.hornetq.core.server.HornetQServer;
-import org.hornetq.core.server.HornetQServers;
import org.hornetq.tests.util.RandomUtil;
import org.hornetq.tests.util.ServiceTestBase;
@@ -60,42 +56,39 @@
private HornetQServer liveService;
- private HornetQServer backupService;
-
private TransportConfiguration liveTC;
- private TransportConfiguration backupTC;
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ startServer();
+ }
+
@Override
protected void tearDown() throws Exception
{
- if (liveService != null && liveService.isStarted())
+ if (liveService != null)
{
liveService.stop();
}
- if (backupService != null && backupService.isStarted())
- {
- backupService.stop();
- }
+
liveService = null;
- backupService = null;
liveTC = null;
- backupTC = null;
super.tearDown();
}
- /* public void testSerializable() throws Exception
+ public void testSerializable() throws Exception
{
- ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(ServiceTestBase.INVM_CONNECTOR_FACTORY));
+ ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(InVMConnectorFactory.class.getName()));
- ClientSessionFactory cf = locator.createSessionFactory();
-
ByteArrayOutputStream baos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(baos);
- oos.writeObject(cf);
+ oos.writeObject(locator);
oos.close();
@@ -105,289 +98,110 @@
ObjectInputStream ois = new ObjectInputStream(bais);
- ClientSessionFactoryImpl csi = (ClientSessionFactoryImpl)ois.readObject();
+ ServerLocator csi = (ServerLocator)ois.readObject();
Assert.assertNotNull(csi);
}
public void testCloseUnusedClientSessionFactoryWithoutGlobalPools() throws Exception
{
- ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(ServiceTestBase.INVM_CONNECTOR_FACTORY));
+ ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(liveTC);
ClientSessionFactory csf = locator.createSessionFactory();
- csf.getServerLocator().setUseGlobalPools(false);
csf.close();
}
- public void testDefaultConstructor() throws Exception
+ public void testDiscoveryConstructor() throws Exception
{
- try
- {
- startLiveAndBackup();
- ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(ServiceTestBase.INVM_CONNECTOR_FACTORY));
+ ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(groupAddress, groupPort);
- ClientSessionFactory cf = locator.createSessionFactory();
-
- assertFactoryParams(cf,
- null,
- null,
- 0,
- HornetQClient.DEFAULT_DISCOVERY_REFRESH_TIMEOUT,
- HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
- HornetQClient.DEFAULT_CONNECTION_TTL,
- HornetQClient.DEFAULT_CALL_TIMEOUT,
- HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
- HornetQClient.DEFAULT_CONSUMER_WINDOW_SIZE,
- HornetQClient.DEFAULT_CONSUMER_MAX_RATE,
- HornetQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE,
- HornetQClient.DEFAULT_PRODUCER_MAX_RATE,
- HornetQClient.DEFAULT_BLOCK_ON_ACKNOWLEDGE,
- HornetQClient.DEFAULT_BLOCK_ON_DURABLE_SEND,
- HornetQClient.DEFAULT_BLOCK_ON_NON_DURABLE_SEND,
- HornetQClient.DEFAULT_AUTO_GROUP,
- HornetQClient.DEFAULT_PRE_ACKNOWLEDGE,
- HornetQClient.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME,
- HornetQClient.DEFAULT_ACK_BATCH_SIZE,
- HornetQClient.DEFAULT_DISCOVERY_INITIAL_WAIT_TIMEOUT,
- HornetQClient.DEFAULT_USE_GLOBAL_POOLS,
- HornetQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE,
- HornetQClient.DEFAULT_THREAD_POOL_MAX_SIZE,
- HornetQClient.DEFAULT_RETRY_INTERVAL,
- HornetQClient.DEFAULT_RETRY_INTERVAL_MULTIPLIER,
- HornetQClient.DEFAULT_RECONNECT_ATTEMPTS,
- HornetQClient.DEFAULT_FAILOVER_ON_SERVER_SHUTDOWN);
- try
- {
- ClientSession session = cf.createSession(false, true, true);
- Assert.fail("Should throw exception");
- }
- catch (HornetQException e)
- {
- e.printStackTrace();
- // Ok
- }
- final List<Pair<TransportConfiguration, TransportConfiguration>> staticConnectors = new ArrayList<Pair<TransportConfiguration, TransportConfiguration>>();
- Pair<TransportConfiguration, TransportConfiguration> pair0 = new Pair<TransportConfiguration, TransportConfiguration>(liveTC,
- backupTC);
- staticConnectors.add(pair0);
- cf.getServerLocator().setStaticConnectors(staticConnectors);
- ClientSession session = cf.createSession(false, true, true);
- Assert.assertNotNull(session);
- session.close();
- testSettersThrowException(cf);
- }
- finally
- {
- stopLiveAndBackup();
- }
+ assertFactoryParams(locator,
+ null,
+ groupAddress,
+ groupPort,
+ HornetQClient.DEFAULT_DISCOVERY_REFRESH_TIMEOUT,
+ HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+ HornetQClient.DEFAULT_CONNECTION_TTL,
+ HornetQClient.DEFAULT_CALL_TIMEOUT,
+ HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+ HornetQClient.DEFAULT_CONSUMER_WINDOW_SIZE,
+ HornetQClient.DEFAULT_CONSUMER_MAX_RATE,
+ HornetQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE,
+ HornetQClient.DEFAULT_PRODUCER_MAX_RATE,
+ HornetQClient.DEFAULT_BLOCK_ON_ACKNOWLEDGE,
+ HornetQClient.DEFAULT_BLOCK_ON_DURABLE_SEND,
+ HornetQClient.DEFAULT_BLOCK_ON_NON_DURABLE_SEND,
+ HornetQClient.DEFAULT_AUTO_GROUP,
+ HornetQClient.DEFAULT_PRE_ACKNOWLEDGE,
+ HornetQClient.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME,
+ HornetQClient.DEFAULT_ACK_BATCH_SIZE,
+ HornetQClient.DEFAULT_DISCOVERY_INITIAL_WAIT_TIMEOUT,
+ HornetQClient.DEFAULT_USE_GLOBAL_POOLS,
+ HornetQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE,
+ HornetQClient.DEFAULT_THREAD_POOL_MAX_SIZE,
+ HornetQClient.DEFAULT_RETRY_INTERVAL,
+ HornetQClient.DEFAULT_RETRY_INTERVAL_MULTIPLIER,
+ HornetQClient.DEFAULT_RECONNECT_ATTEMPTS,
+ HornetQClient.DEFAULT_FAILOVER_ON_SERVER_SHUTDOWN);
+
+ ClientSessionFactory cf = locator.createSessionFactory();
+ ClientSession session = cf.createSession(false, true, true);
+ Assert.assertNotNull(session);
+ session.close();
+ testSettersThrowException(cf);
+
+ cf.close();
}
- public void testDiscoveryConstructor() throws Exception
- {
- try
- {
- startLiveAndBackup();
- ClientSessionFactory cf = HornetQClient.createClientSessionFactory(groupAddress, groupPort);
- assertFactoryParams(cf,
- null,
- groupAddress,
- groupPort,
- HornetQClient.DEFAULT_DISCOVERY_REFRESH_TIMEOUT,
- HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
- HornetQClient.DEFAULT_CONNECTION_TTL,
- HornetQClient.DEFAULT_CALL_TIMEOUT,
- HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
- HornetQClient.DEFAULT_CONSUMER_WINDOW_SIZE,
- HornetQClient.DEFAULT_CONSUMER_MAX_RATE,
- HornetQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE,
- HornetQClient.DEFAULT_PRODUCER_MAX_RATE,
- HornetQClient.DEFAULT_BLOCK_ON_ACKNOWLEDGE,
- HornetQClient.DEFAULT_BLOCK_ON_DURABLE_SEND,
- HornetQClient.DEFAULT_BLOCK_ON_NON_DURABLE_SEND,
- HornetQClient.DEFAULT_AUTO_GROUP,
- HornetQClient.DEFAULT_PRE_ACKNOWLEDGE,
- HornetQClient.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME,
- HornetQClient.DEFAULT_ACK_BATCH_SIZE,
- HornetQClient.DEFAULT_DISCOVERY_INITIAL_WAIT_TIMEOUT,
- HornetQClient.DEFAULT_USE_GLOBAL_POOLS,
- HornetQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE,
- HornetQClient.DEFAULT_THREAD_POOL_MAX_SIZE,
- HornetQClient.DEFAULT_RETRY_INTERVAL,
- HornetQClient.DEFAULT_RETRY_INTERVAL_MULTIPLIER,
- HornetQClient.DEFAULT_RECONNECT_ATTEMPTS,
- HornetQClient.DEFAULT_FAILOVER_ON_SERVER_SHUTDOWN);
- ClientSession session = cf.createSession(false, true, true);
- Assert.assertNotNull(session);
- session.close();
- testSettersThrowException(cf);
- }
- finally
- {
- stopLiveAndBackup();
- }
- }
-
public void testStaticConnectorListConstructor() throws Exception
{
- try
- {
- startLiveAndBackup();
- final List<Pair<TransportConfiguration, TransportConfiguration>> staticConnectors = new ArrayList<Pair<TransportConfiguration, TransportConfiguration>>();
- Pair<TransportConfiguration, TransportConfiguration> pair0 = new Pair<TransportConfiguration, TransportConfiguration>(liveTC,
- backupTC);
- staticConnectors.add(pair0);
+ TransportConfiguration[] tc = new TransportConfiguration[] { liveTC };
+ ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(tc);
- ClientSessionFactory cf = HornetQClient.createClientSessionFactory(staticConnectors);
- assertFactoryParams(cf,
- staticConnectors,
- null,
- 0,
- HornetQClient.DEFAULT_DISCOVERY_REFRESH_TIMEOUT,
- HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
- HornetQClient.DEFAULT_CONNECTION_TTL,
- HornetQClient.DEFAULT_CALL_TIMEOUT,
- HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
- HornetQClient.DEFAULT_CONSUMER_WINDOW_SIZE,
- HornetQClient.DEFAULT_CONSUMER_MAX_RATE,
- HornetQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE,
- HornetQClient.DEFAULT_PRODUCER_MAX_RATE,
- HornetQClient.DEFAULT_BLOCK_ON_ACKNOWLEDGE,
- HornetQClient.DEFAULT_BLOCK_ON_DURABLE_SEND,
- HornetQClient.DEFAULT_BLOCK_ON_NON_DURABLE_SEND,
- HornetQClient.DEFAULT_AUTO_GROUP,
- HornetQClient.DEFAULT_PRE_ACKNOWLEDGE,
- HornetQClient.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME,
- HornetQClient.DEFAULT_ACK_BATCH_SIZE,
- HornetQClient.DEFAULT_DISCOVERY_INITIAL_WAIT_TIMEOUT,
- HornetQClient.DEFAULT_USE_GLOBAL_POOLS,
- HornetQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE,
- HornetQClient.DEFAULT_THREAD_POOL_MAX_SIZE,
- HornetQClient.DEFAULT_RETRY_INTERVAL,
- HornetQClient.DEFAULT_RETRY_INTERVAL_MULTIPLIER,
- HornetQClient.DEFAULT_RECONNECT_ATTEMPTS,
- HornetQClient.DEFAULT_FAILOVER_ON_SERVER_SHUTDOWN);
- ClientSession session = cf.createSession(false, true, true);
- Assert.assertNotNull(session);
- session.close();
- testSettersThrowException(cf);
- }
- finally
- {
- stopLiveAndBackup();
- }
+ assertFactoryParams(locator,
+ tc,
+ null,
+ -1,
+ HornetQClient.DEFAULT_DISCOVERY_REFRESH_TIMEOUT,
+ HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+ HornetQClient.DEFAULT_CONNECTION_TTL,
+ HornetQClient.DEFAULT_CALL_TIMEOUT,
+ HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+ HornetQClient.DEFAULT_CONSUMER_WINDOW_SIZE,
+ HornetQClient.DEFAULT_CONSUMER_MAX_RATE,
+ HornetQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE,
+ HornetQClient.DEFAULT_PRODUCER_MAX_RATE,
+ HornetQClient.DEFAULT_BLOCK_ON_ACKNOWLEDGE,
+ HornetQClient.DEFAULT_BLOCK_ON_DURABLE_SEND,
+ HornetQClient.DEFAULT_BLOCK_ON_NON_DURABLE_SEND,
+ HornetQClient.DEFAULT_AUTO_GROUP,
+ HornetQClient.DEFAULT_PRE_ACKNOWLEDGE,
+ HornetQClient.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME,
+ HornetQClient.DEFAULT_ACK_BATCH_SIZE,
+ HornetQClient.DEFAULT_DISCOVERY_INITIAL_WAIT_TIMEOUT,
+ HornetQClient.DEFAULT_USE_GLOBAL_POOLS,
+ HornetQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE,
+ HornetQClient.DEFAULT_THREAD_POOL_MAX_SIZE,
+ HornetQClient.DEFAULT_RETRY_INTERVAL,
+ HornetQClient.DEFAULT_RETRY_INTERVAL_MULTIPLIER,
+ HornetQClient.DEFAULT_RECONNECT_ATTEMPTS,
+ HornetQClient.DEFAULT_FAILOVER_ON_SERVER_SHUTDOWN);
+
+ ClientSessionFactory cf = locator.createSessionFactory();
+ ClientSession session = cf.createSession(false, true, true);
+ Assert.assertNotNull(session);
+ session.close();
+ testSettersThrowException(cf);
+
+ cf.close();
}
- public void testStaticConnectorLiveAndBackupConstructor() throws Exception
+ public void testGettersAndSetters() throws Exception
{
- try
- {
- startLiveAndBackup();
- final List<Pair<TransportConfiguration, TransportConfiguration>> staticConnectors = new ArrayList<Pair<TransportConfiguration, TransportConfiguration>>();
- Pair<TransportConfiguration, TransportConfiguration> pair0 = new Pair<TransportConfiguration, TransportConfiguration>(liveTC,
- backupTC);
- staticConnectors.add(pair0);
- ClientSessionFactory cf = HornetQClient.createClientSessionFactory(liveTC, backupTC);
- assertFactoryParams(cf,
- staticConnectors,
- null,
- 0,
- HornetQClient.DEFAULT_DISCOVERY_REFRESH_TIMEOUT,
- HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
- HornetQClient.DEFAULT_CONNECTION_TTL,
- HornetQClient.DEFAULT_CALL_TIMEOUT,
- HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
- HornetQClient.DEFAULT_CONSUMER_WINDOW_SIZE,
- HornetQClient.DEFAULT_CONSUMER_MAX_RATE,
- HornetQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE,
- HornetQClient.DEFAULT_PRODUCER_MAX_RATE,
- HornetQClient.DEFAULT_BLOCK_ON_ACKNOWLEDGE,
- HornetQClient.DEFAULT_BLOCK_ON_DURABLE_SEND,
- HornetQClient.DEFAULT_BLOCK_ON_NON_DURABLE_SEND,
- HornetQClient.DEFAULT_AUTO_GROUP,
- HornetQClient.DEFAULT_PRE_ACKNOWLEDGE,
- HornetQClient.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME,
- HornetQClient.DEFAULT_ACK_BATCH_SIZE,
- HornetQClient.DEFAULT_DISCOVERY_INITIAL_WAIT_TIMEOUT,
- HornetQClient.DEFAULT_USE_GLOBAL_POOLS,
- HornetQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE,
- HornetQClient.DEFAULT_THREAD_POOL_MAX_SIZE,
- HornetQClient.DEFAULT_RETRY_INTERVAL,
- HornetQClient.DEFAULT_RETRY_INTERVAL_MULTIPLIER,
- HornetQClient.DEFAULT_RECONNECT_ATTEMPTS,
- HornetQClient.DEFAULT_FAILOVER_ON_SERVER_SHUTDOWN);
- ClientSession session = cf.createSession(false, true, true);
- Assert.assertNotNull(session);
- session.close();
- testSettersThrowException(cf);
- }
- finally
- {
- stopLiveAndBackup();
- }
- }
+ TransportConfiguration[] tc = new TransportConfiguration[] { liveTC };
+ ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(tc);
- public void testStaticConnectorLiveConstructor() throws Exception
- {
- try
- {
- startLiveAndBackup();
- final List<Pair<TransportConfiguration, TransportConfiguration>> staticConnectors = new ArrayList<Pair<TransportConfiguration, TransportConfiguration>>();
- Pair<TransportConfiguration, TransportConfiguration> pair0 = new Pair<TransportConfiguration, TransportConfiguration>(liveTC,
- null);
- staticConnectors.add(pair0);
-
- ClientSessionFactory cf = HornetQClient.createClientSessionFactory(liveTC);
- assertFactoryParams(cf,
- staticConnectors,
- null,
- 0,
- HornetQClient.DEFAULT_DISCOVERY_REFRESH_TIMEOUT,
- HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
- HornetQClient.DEFAULT_CONNECTION_TTL,
- HornetQClient.DEFAULT_CALL_TIMEOUT,
- HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
- HornetQClient.DEFAULT_CONSUMER_WINDOW_SIZE,
- HornetQClient.DEFAULT_CONSUMER_MAX_RATE,
- HornetQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE,
- HornetQClient.DEFAULT_PRODUCER_MAX_RATE,
- HornetQClient.DEFAULT_BLOCK_ON_ACKNOWLEDGE,
- HornetQClient.DEFAULT_BLOCK_ON_DURABLE_SEND,
- HornetQClient.DEFAULT_BLOCK_ON_NON_DURABLE_SEND,
- HornetQClient.DEFAULT_AUTO_GROUP,
- HornetQClient.DEFAULT_PRE_ACKNOWLEDGE,
- HornetQClient.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME,
- HornetQClient.DEFAULT_ACK_BATCH_SIZE,
- HornetQClient.DEFAULT_DISCOVERY_INITIAL_WAIT_TIMEOUT,
- HornetQClient.DEFAULT_USE_GLOBAL_POOLS,
- HornetQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE,
- HornetQClient.DEFAULT_THREAD_POOL_MAX_SIZE,
- HornetQClient.DEFAULT_RETRY_INTERVAL,
- HornetQClient.DEFAULT_RETRY_INTERVAL_MULTIPLIER,
- HornetQClient.DEFAULT_RECONNECT_ATTEMPTS,
- HornetQClient.DEFAULT_FAILOVER_ON_SERVER_SHUTDOWN);
- ClientSession session = cf.createSession(false, true, true);
- Assert.assertNotNull(session);
- session.close();
- testSettersThrowException(cf);
- }
- finally
- {
- stopLiveAndBackup();
- }
- }
-
- public void testGettersAndSetters()
- {
- ClientSessionFactory cf = HornetQClient.createClientSessionFactory();
-
- List<Pair<TransportConfiguration, TransportConfiguration>> staticConnectors = new ArrayList<Pair<TransportConfiguration, TransportConfiguration>>();
- Pair<TransportConfiguration, TransportConfiguration> pair0 = new Pair<TransportConfiguration, TransportConfiguration>(liveTC,
- backupTC);
- staticConnectors.add(pair0);
-
- String discoveryAddress = RandomUtil.randomString();
- int discoveryPort = RandomUtil.randomPositiveInt();
long discoveryRefreshTimeout = RandomUtil.randomPositiveLong();
long clientFailureCheckPeriod = RandomUtil.randomPositiveLong();
long connectionTTL = RandomUtil.randomPositiveLong();
@@ -413,73 +227,64 @@
int reconnectAttempts = RandomUtil.randomPositiveInt();
boolean failoverOnServerShutdown = RandomUtil.randomBoolean();
- cf.setStaticConnectors(staticConnectors);
- cf.setDiscoveryAddress(discoveryAddress);
- cf.setDiscoveryPort(discoveryPort);
- cf.setDiscoveryRefreshTimeout(discoveryRefreshTimeout);
- cf.setClientFailureCheckPeriod(clientFailureCheckPeriod);
- cf.setConnectionTTL(connectionTTL);
- cf.setCallTimeout(callTimeout);
- cf.setMinLargeMessageSize(minLargeMessageSize);
- cf.setConsumerWindowSize(consumerWindowSize);
- cf.setConsumerMaxRate(consumerMaxRate);
- cf.setConfirmationWindowSize(confirmationWindowSize);
- cf.setProducerMaxRate(producerMaxRate);
- cf.setBlockOnAcknowledge(blockOnAcknowledge);
- cf.setBlockOnDurableSend(blockOnDurableSend);
- cf.setBlockOnNonDurableSend(blockOnNonDurableSend);
- cf.setAutoGroup(autoGroup);
- cf.setPreAcknowledge(preAcknowledge);
- cf.setConnectionLoadBalancingPolicyClassName(loadBalancingPolicyClassName);
- cf.setAckBatchSize(ackBatchSize);
- cf.setDiscoveryInitialWaitTimeout(initialWaitTimeout);
- cf.setUseGlobalPools(useGlobalPools);
- cf.setScheduledThreadPoolMaxSize(scheduledThreadPoolMaxSize);
- cf.setThreadPoolMaxSize(threadPoolMaxSize);
- cf.setRetryInterval(retryInterval);
- cf.setRetryIntervalMultiplier(retryIntervalMultiplier);
- cf.setReconnectAttempts(reconnectAttempts);
- cf.setFailoverOnServerShutdown(failoverOnServerShutdown);
+ locator.setDiscoveryRefreshTimeout(discoveryRefreshTimeout);
+ locator.setClientFailureCheckPeriod(clientFailureCheckPeriod);
+ locator.setConnectionTTL(connectionTTL);
+ locator.setCallTimeout(callTimeout);
+ locator.setMinLargeMessageSize(minLargeMessageSize);
+ locator.setConsumerWindowSize(consumerWindowSize);
+ locator.setConsumerMaxRate(consumerMaxRate);
+ locator.setConfirmationWindowSize(confirmationWindowSize);
+ locator.setProducerMaxRate(producerMaxRate);
+ locator.setBlockOnAcknowledge(blockOnAcknowledge);
+ locator.setBlockOnDurableSend(blockOnDurableSend);
+ locator.setBlockOnNonDurableSend(blockOnNonDurableSend);
+ locator.setAutoGroup(autoGroup);
+ locator.setPreAcknowledge(preAcknowledge);
+ locator.setConnectionLoadBalancingPolicyClassName(loadBalancingPolicyClassName);
+ locator.setAckBatchSize(ackBatchSize);
+ locator.setDiscoveryInitialWaitTimeout(initialWaitTimeout);
+ locator.setUseGlobalPools(useGlobalPools);
+ locator.setScheduledThreadPoolMaxSize(scheduledThreadPoolMaxSize);
+ locator.setThreadPoolMaxSize(threadPoolMaxSize);
+ locator.setRetryInterval(retryInterval);
+ locator.setRetryIntervalMultiplier(retryIntervalMultiplier);
+ locator.setReconnectAttempts(reconnectAttempts);
+ locator.setFailoverOnServerShutdown(failoverOnServerShutdown);
- Assert.assertEquals(staticConnectors, cf.getStaticConnectors());
- Assert.assertEquals(discoveryAddress, cf.getDiscoveryAddress());
- Assert.assertEquals(discoveryPort, cf.getDiscoveryPort());
- Assert.assertEquals(discoveryRefreshTimeout, cf.getDiscoveryRefreshTimeout());
- Assert.assertEquals(clientFailureCheckPeriod, cf.getClientFailureCheckPeriod());
- Assert.assertEquals(connectionTTL, cf.getConnectionTTL());
- Assert.assertEquals(callTimeout, cf.getCallTimeout());
- Assert.assertEquals(minLargeMessageSize, cf.getMinLargeMessageSize());
- Assert.assertEquals(consumerWindowSize, cf.getConsumerWindowSize());
- Assert.assertEquals(consumerMaxRate, cf.getConsumerMaxRate());
- Assert.assertEquals(confirmationWindowSize, cf.getConfirmationWindowSize());
- Assert.assertEquals(producerMaxRate, cf.getProducerMaxRate());
- Assert.assertEquals(blockOnAcknowledge, cf.isBlockOnAcknowledge());
- Assert.assertEquals(blockOnDurableSend, cf.isBlockOnDurableSend());
- Assert.assertEquals(blockOnNonDurableSend, cf.isBlockOnNonDurableSend());
- Assert.assertEquals(autoGroup, cf.isAutoGroup());
- Assert.assertEquals(preAcknowledge, cf.isPreAcknowledge());
- Assert.assertEquals(loadBalancingPolicyClassName, cf.getConnectionLoadBalancingPolicyClassName());
- Assert.assertEquals(ackBatchSize, cf.getAckBatchSize());
- Assert.assertEquals(initialWaitTimeout, cf.getDiscoveryInitialWaitTimeout());
- Assert.assertEquals(useGlobalPools, cf.isUseGlobalPools());
- Assert.assertEquals(scheduledThreadPoolMaxSize, cf.getScheduledThreadPoolMaxSize());
- Assert.assertEquals(threadPoolMaxSize, cf.getThreadPoolMaxSize());
- Assert.assertEquals(retryInterval, cf.getRetryInterval());
- Assert.assertEquals(retryIntervalMultiplier, cf.getRetryIntervalMultiplier());
- Assert.assertEquals(reconnectAttempts, cf.getReconnectAttempts());
- Assert.assertEquals(failoverOnServerShutdown, cf.isFailoverOnServerShutdown());
+ assertEqualsTransportConfigurations(tc, locator.getStaticTransportConfigurations());
+ Assert.assertEquals(null, locator.getDiscoveryAddress());
+ Assert.assertEquals(-1, locator.getDiscoveryPort());
+ Assert.assertEquals(discoveryRefreshTimeout, locator.getDiscoveryRefreshTimeout());
+ Assert.assertEquals(clientFailureCheckPeriod, locator.getClientFailureCheckPeriod());
+ Assert.assertEquals(connectionTTL, locator.getConnectionTTL());
+ Assert.assertEquals(callTimeout, locator.getCallTimeout());
+ Assert.assertEquals(minLargeMessageSize, locator.getMinLargeMessageSize());
+ Assert.assertEquals(consumerWindowSize, locator.getConsumerWindowSize());
+ Assert.assertEquals(consumerMaxRate, locator.getConsumerMaxRate());
+ Assert.assertEquals(confirmationWindowSize, locator.getConfirmationWindowSize());
+ Assert.assertEquals(producerMaxRate, locator.getProducerMaxRate());
+ Assert.assertEquals(blockOnAcknowledge, locator.isBlockOnAcknowledge());
+ Assert.assertEquals(blockOnDurableSend, locator.isBlockOnDurableSend());
+ Assert.assertEquals(blockOnNonDurableSend, locator.isBlockOnNonDurableSend());
+ Assert.assertEquals(autoGroup, locator.isAutoGroup());
+ Assert.assertEquals(preAcknowledge, locator.isPreAcknowledge());
+ Assert.assertEquals(loadBalancingPolicyClassName, locator
+ .getConnectionLoadBalancingPolicyClassName());
+ Assert.assertEquals(ackBatchSize, locator.getAckBatchSize());
+ Assert.assertEquals(initialWaitTimeout, locator.getDiscoveryInitialWaitTimeout());
+ Assert.assertEquals(useGlobalPools, locator.isUseGlobalPools());
+ Assert.assertEquals(scheduledThreadPoolMaxSize, locator.getScheduledThreadPoolMaxSize());
+ Assert.assertEquals(threadPoolMaxSize, locator.getThreadPoolMaxSize());
+ Assert.assertEquals(retryInterval, locator.getRetryInterval());
+ Assert.assertEquals(retryIntervalMultiplier, locator.getRetryIntervalMultiplier());
+ Assert.assertEquals(reconnectAttempts, locator.getReconnectAttempts());
+ Assert.assertEquals(failoverOnServerShutdown, locator.isFailoverOnServerShutdown());
}
private void testSettersThrowException(final ClientSessionFactory cf)
{
- List<Pair<TransportConfiguration, TransportConfiguration>> staticConnectors = new ArrayList<Pair<TransportConfiguration, TransportConfiguration>>();
- Pair<TransportConfiguration, TransportConfiguration> pair0 = new Pair<TransportConfiguration, TransportConfiguration>(liveTC,
- backupTC);
- staticConnectors.add(pair0);
-
- String discoveryAddress = RandomUtil.randomString();
- int discoveryPort = RandomUtil.randomPositiveInt();
long discoveryRefreshTimeout = RandomUtil.randomPositiveLong();
long clientFailureCheckPeriod = RandomUtil.randomPositiveLong();
long connectionTTL = RandomUtil.randomPositiveLong();
@@ -507,7 +312,7 @@
try
{
- cf.setStaticConnectors(staticConnectors);
+ cf.getServerLocator().setDiscoveryRefreshTimeout(discoveryRefreshTimeout);
Assert.fail("Should throw exception");
}
catch (IllegalStateException e)
@@ -516,7 +321,7 @@
}
try
{
- cf.setDiscoveryAddress(discoveryAddress);
+ cf.getServerLocator().setClientFailureCheckPeriod(clientFailureCheckPeriod);
Assert.fail("Should throw exception");
}
catch (IllegalStateException e)
@@ -525,7 +330,7 @@
}
try
{
- cf.setDiscoveryPort(discoveryPort);
+ cf.getServerLocator().setConnectionTTL(connectionTTL);
Assert.fail("Should throw exception");
}
catch (IllegalStateException e)
@@ -534,7 +339,7 @@
}
try
{
- cf.setDiscoveryRefreshTimeout(discoveryRefreshTimeout);
+ cf.getServerLocator().setCallTimeout(callTimeout);
Assert.fail("Should throw exception");
}
catch (IllegalStateException e)
@@ -543,7 +348,7 @@
}
try
{
- cf.setClientFailureCheckPeriod(clientFailureCheckPeriod);
+ cf.getServerLocator().setMinLargeMessageSize(minLargeMessageSize);
Assert.fail("Should throw exception");
}
catch (IllegalStateException e)
@@ -552,7 +357,7 @@
}
try
{
- cf.setConnectionTTL(connectionTTL);
+ cf.getServerLocator().setConsumerWindowSize(consumerWindowSize);
Assert.fail("Should throw exception");
}
catch (IllegalStateException e)
@@ -561,7 +366,7 @@
}
try
{
- cf.setCallTimeout(callTimeout);
+ cf.getServerLocator().setConsumerMaxRate(consumerMaxRate);
Assert.fail("Should throw exception");
}
catch (IllegalStateException e)
@@ -570,7 +375,7 @@
}
try
{
- cf.setMinLargeMessageSize(minLargeMessageSize);
+ cf.getServerLocator().setConfirmationWindowSize(confirmationWindowSize);
Assert.fail("Should throw exception");
}
catch (IllegalStateException e)
@@ -579,7 +384,7 @@
}
try
{
- cf.setConsumerWindowSize(consumerWindowSize);
+ cf.getServerLocator().setProducerMaxRate(producerMaxRate);
Assert.fail("Should throw exception");
}
catch (IllegalStateException e)
@@ -588,7 +393,7 @@
}
try
{
- cf.setConsumerMaxRate(consumerMaxRate);
+ cf.getServerLocator().setBlockOnAcknowledge(blockOnAcknowledge);
Assert.fail("Should throw exception");
}
catch (IllegalStateException e)
@@ -597,7 +402,7 @@
}
try
{
- cf.setConfirmationWindowSize(confirmationWindowSize);
+ cf.getServerLocator().setBlockOnDurableSend(blockOnDurableSend);
Assert.fail("Should throw exception");
}
catch (IllegalStateException e)
@@ -606,7 +411,7 @@
}
try
{
- cf.setProducerMaxRate(producerMaxRate);
+ cf.getServerLocator().setBlockOnNonDurableSend(blockOnNonDurableSend);
Assert.fail("Should throw exception");
}
catch (IllegalStateException e)
@@ -615,7 +420,7 @@
}
try
{
- cf.setBlockOnAcknowledge(blockOnAcknowledge);
+ cf.getServerLocator().setAutoGroup(autoGroup);
Assert.fail("Should throw exception");
}
catch (IllegalStateException e)
@@ -624,7 +429,7 @@
}
try
{
- cf.setBlockOnDurableSend(blockOnDurableSend);
+ cf.getServerLocator().setPreAcknowledge(preAcknowledge);
Assert.fail("Should throw exception");
}
catch (IllegalStateException e)
@@ -633,7 +438,7 @@
}
try
{
- cf.setBlockOnNonDurableSend(blockOnNonDurableSend);
+ cf.getServerLocator().setConnectionLoadBalancingPolicyClassName(loadBalancingPolicyClassName);
Assert.fail("Should throw exception");
}
catch (IllegalStateException e)
@@ -642,7 +447,7 @@
}
try
{
- cf.setAutoGroup(autoGroup);
+ cf.getServerLocator().setAckBatchSize(ackBatchSize);
Assert.fail("Should throw exception");
}
catch (IllegalStateException e)
@@ -651,7 +456,7 @@
}
try
{
- cf.setPreAcknowledge(preAcknowledge);
+ cf.getServerLocator().setDiscoveryInitialWaitTimeout(initialWaitTimeout);
Assert.fail("Should throw exception");
}
catch (IllegalStateException e)
@@ -660,7 +465,7 @@
}
try
{
- cf.setConnectionLoadBalancingPolicyClassName(loadBalancingPolicyClassName);
+ cf.getServerLocator().setUseGlobalPools(useGlobalPools);
Assert.fail("Should throw exception");
}
catch (IllegalStateException e)
@@ -669,7 +474,7 @@
}
try
{
- cf.setAckBatchSize(ackBatchSize);
+ cf.getServerLocator().setScheduledThreadPoolMaxSize(scheduledThreadPoolMaxSize);
Assert.fail("Should throw exception");
}
catch (IllegalStateException e)
@@ -678,7 +483,7 @@
}
try
{
- cf.setDiscoveryInitialWaitTimeout(initialWaitTimeout);
+ cf.getServerLocator().setThreadPoolMaxSize(threadPoolMaxSize);
Assert.fail("Should throw exception");
}
catch (IllegalStateException e)
@@ -687,7 +492,7 @@
}
try
{
- cf.setUseGlobalPools(useGlobalPools);
+ cf.getServerLocator().setRetryInterval(retryInterval);
Assert.fail("Should throw exception");
}
catch (IllegalStateException e)
@@ -696,7 +501,7 @@
}
try
{
- cf.setScheduledThreadPoolMaxSize(scheduledThreadPoolMaxSize);
+ cf.getServerLocator().setRetryIntervalMultiplier(retryIntervalMultiplier);
Assert.fail("Should throw exception");
}
catch (IllegalStateException e)
@@ -705,7 +510,7 @@
}
try
{
- cf.setThreadPoolMaxSize(threadPoolMaxSize);
+ cf.getServerLocator().setReconnectAttempts(reconnectAttempts);
Assert.fail("Should throw exception");
}
catch (IllegalStateException e)
@@ -714,73 +519,46 @@
}
try
{
- cf.setRetryInterval(retryInterval);
+ cf.getServerLocator().setFailoverOnServerShutdown(failoverOnServerShutdown);
Assert.fail("Should throw exception");
}
catch (IllegalStateException e)
{
// OK
}
- try
- {
- cf.setRetryIntervalMultiplier(retryIntervalMultiplier);
- Assert.fail("Should throw exception");
- }
- catch (IllegalStateException e)
- {
- // OK
- }
- try
- {
- cf.setReconnectAttempts(reconnectAttempts);
- Assert.fail("Should throw exception");
- }
- catch (IllegalStateException e)
- {
- // OK
- }
- try
- {
- cf.setFailoverOnServerShutdown(failoverOnServerShutdown);
- Assert.fail("Should throw exception");
- }
- catch (IllegalStateException e)
- {
- // OK
- }
- cf.getStaticConnectors();
- cf.getDiscoveryAddress();
- cf.getDiscoveryPort();
- cf.getDiscoveryRefreshTimeout();
- cf.getClientFailureCheckPeriod();
- cf.getConnectionTTL();
- cf.getCallTimeout();
- cf.getMinLargeMessageSize();
- cf.getConsumerWindowSize();
- cf.getConsumerMaxRate();
- cf.getConfirmationWindowSize();
- cf.getProducerMaxRate();
- cf.isBlockOnAcknowledge();
- cf.isBlockOnDurableSend();
- cf.isBlockOnNonDurableSend();
- cf.isAutoGroup();
- cf.isPreAcknowledge();
- cf.getConnectionLoadBalancingPolicyClassName();
- cf.getAckBatchSize();
- cf.getDiscoveryInitialWaitTimeout();
- cf.isUseGlobalPools();
- cf.getScheduledThreadPoolMaxSize();
- cf.getThreadPoolMaxSize();
- cf.getRetryInterval();
- cf.getRetryIntervalMultiplier();
- cf.getReconnectAttempts();
- cf.isFailoverOnServerShutdown();
+ cf.getServerLocator().getStaticTransportConfigurations();
+ cf.getServerLocator().getDiscoveryAddress();
+ cf.getServerLocator().getDiscoveryPort();
+ cf.getServerLocator().getDiscoveryRefreshTimeout();
+ cf.getServerLocator().getClientFailureCheckPeriod();
+ cf.getServerLocator().getConnectionTTL();
+ cf.getServerLocator().getCallTimeout();
+ cf.getServerLocator().getMinLargeMessageSize();
+ cf.getServerLocator().getConsumerWindowSize();
+ cf.getServerLocator().getConsumerMaxRate();
+ cf.getServerLocator().getConfirmationWindowSize();
+ cf.getServerLocator().getProducerMaxRate();
+ cf.getServerLocator().isBlockOnAcknowledge();
+ cf.getServerLocator().isBlockOnDurableSend();
+ cf.getServerLocator().isBlockOnNonDurableSend();
+ cf.getServerLocator().isAutoGroup();
+ cf.getServerLocator().isPreAcknowledge();
+ cf.getServerLocator().getConnectionLoadBalancingPolicyClassName();
+ cf.getServerLocator().getAckBatchSize();
+ cf.getServerLocator().getDiscoveryInitialWaitTimeout();
+ cf.getServerLocator().isUseGlobalPools();
+ cf.getServerLocator().getScheduledThreadPoolMaxSize();
+ cf.getServerLocator().getThreadPoolMaxSize();
+ cf.getServerLocator().getRetryInterval();
+ cf.getServerLocator().getRetryIntervalMultiplier();
+ cf.getServerLocator().getReconnectAttempts();
+ cf.getServerLocator().isFailoverOnServerShutdown();
}
- private void assertFactoryParams(final ClientSessionFactory cf,
- final List<Pair<TransportConfiguration, TransportConfiguration>> staticConnectors,
+ private void assertFactoryParams(final ServerLocator locator,
+ final TransportConfiguration[] staticConnectors,
final String discoveryAddress,
final int discoveryPort,
final long discoveryRefreshTimeout,
@@ -808,93 +586,53 @@
final int reconnectAttempts,
final boolean failoverOnServerShutdown)
{
- List<Pair<TransportConfiguration, TransportConfiguration>> cfStaticConnectors = cf.getStaticConnectors();
if (staticConnectors == null)
{
- Assert.assertNull(cfStaticConnectors);
+ Assert.assertNull(locator.getStaticTransportConfigurations());
}
else
{
- Assert.assertEquals(staticConnectors.size(), cfStaticConnectors.size());
-
- for (int i = 0; i < staticConnectors.size(); i++)
- {
- Assert.assertEquals(staticConnectors.get(i), cfStaticConnectors.get(i));
- }
+ assertEqualsTransportConfigurations(staticConnectors, locator.getStaticTransportConfigurations());
}
- Assert.assertEquals(cf.getDiscoveryAddress(), discoveryAddress);
- Assert.assertEquals(cf.getDiscoveryPort(), discoveryPort);
- Assert.assertEquals(cf.getDiscoveryRefreshTimeout(), discoveryRefreshTimeout);
- Assert.assertEquals(cf.getClientFailureCheckPeriod(), clientFailureCheckPeriod);
- Assert.assertEquals(cf.getConnectionTTL(), connectionTTL);
- Assert.assertEquals(cf.getCallTimeout(), callTimeout);
- Assert.assertEquals(cf.getMinLargeMessageSize(), minLargeMessageSize);
- Assert.assertEquals(cf.getConsumerWindowSize(), consumerWindowSize);
- Assert.assertEquals(cf.getConsumerMaxRate(), consumerMaxRate);
- Assert.assertEquals(cf.getConfirmationWindowSize(), confirmationWindowSize);
- Assert.assertEquals(cf.getProducerMaxRate(), producerMaxRate);
- Assert.assertEquals(cf.isBlockOnAcknowledge(), blockOnAcknowledge);
- Assert.assertEquals(cf.isBlockOnDurableSend(), blockOnDurableSend);
- Assert.assertEquals(cf.isBlockOnNonDurableSend(), blockOnNonDurableSend);
- Assert.assertEquals(cf.isAutoGroup(), autoGroup);
- Assert.assertEquals(cf.isPreAcknowledge(), preAcknowledge);
- Assert.assertEquals(cf.getConnectionLoadBalancingPolicyClassName(), loadBalancingPolicyClassName);
- Assert.assertEquals(cf.getAckBatchSize(), ackBatchSize);
- Assert.assertEquals(cf.getDiscoveryInitialWaitTimeout(), initialWaitTimeout);
- Assert.assertEquals(cf.isUseGlobalPools(), useGlobalPools);
- Assert.assertEquals(cf.getScheduledThreadPoolMaxSize(), scheduledThreadPoolMaxSize);
- Assert.assertEquals(cf.getThreadPoolMaxSize(), threadPoolMaxSize);
- Assert.assertEquals(cf.getRetryInterval(), retryInterval);
- Assert.assertEquals(cf.getRetryIntervalMultiplier(), retryIntervalMultiplier);
- Assert.assertEquals(cf.getReconnectAttempts(), reconnectAttempts);
- Assert.assertEquals(cf.isFailoverOnServerShutdown(), failoverOnServerShutdown);
+ Assert.assertEquals(locator.getDiscoveryAddress(), discoveryAddress);
+ Assert.assertEquals(locator.getDiscoveryPort(), discoveryPort);
+ Assert.assertEquals(locator.getDiscoveryRefreshTimeout(), discoveryRefreshTimeout);
+ Assert.assertEquals(locator.getClientFailureCheckPeriod(), clientFailureCheckPeriod);
+ Assert.assertEquals(locator.getConnectionTTL(), connectionTTL);
+ Assert.assertEquals(locator.getCallTimeout(), callTimeout);
+ Assert.assertEquals(locator.getMinLargeMessageSize(), minLargeMessageSize);
+ Assert.assertEquals(locator.getConsumerWindowSize(), consumerWindowSize);
+ Assert.assertEquals(locator.getConsumerMaxRate(), consumerMaxRate);
+ Assert.assertEquals(locator.getConfirmationWindowSize(), confirmationWindowSize);
+ Assert.assertEquals(locator.getProducerMaxRate(), producerMaxRate);
+ Assert.assertEquals(locator.isBlockOnAcknowledge(), blockOnAcknowledge);
+ Assert.assertEquals(locator.isBlockOnDurableSend(), blockOnDurableSend);
+ Assert.assertEquals(locator.isBlockOnNonDurableSend(), blockOnNonDurableSend);
+ Assert.assertEquals(locator.isAutoGroup(), autoGroup);
+ Assert.assertEquals(locator.isPreAcknowledge(), preAcknowledge);
+ Assert.assertEquals(locator.getConnectionLoadBalancingPolicyClassName(),
+ loadBalancingPolicyClassName);
+ Assert.assertEquals(locator.getAckBatchSize(), ackBatchSize);
+ Assert.assertEquals(locator.getDiscoveryInitialWaitTimeout(), initialWaitTimeout);
+ Assert.assertEquals(locator.isUseGlobalPools(), useGlobalPools);
+ Assert.assertEquals(locator.getScheduledThreadPoolMaxSize(), scheduledThreadPoolMaxSize);
+ Assert.assertEquals(locator.getThreadPoolMaxSize(), threadPoolMaxSize);
+ Assert.assertEquals(locator.getRetryInterval(), retryInterval);
+ Assert.assertEquals(locator.getRetryIntervalMultiplier(), retryIntervalMultiplier);
+ Assert.assertEquals(locator.getReconnectAttempts(), reconnectAttempts);
+ Assert.assertEquals(locator.isFailoverOnServerShutdown(), failoverOnServerShutdown);
}
- private void stopLiveAndBackup() throws Exception
+ private void startServer() throws Exception
{
- if (liveService.isStarted())
- {
- SessionFactoryTest.log.info("stopping live");
- liveService.stop();
- }
- if (backupService.isStarted())
- {
- SessionFactoryTest.log.info("stopping backup");
- backupService.stop();
- }
- }
-
- private void startLiveAndBackup() throws Exception
- {
- Map<String, Object> backupParams = new HashMap<String, Object>();
- Configuration backupConf = new ConfigurationImpl();
- backupConf.setSecurityEnabled(false);
- backupConf.setClustered(true);
- backupConf.setSharedStore(true);
- backupParams.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
- backupConf.getAcceptorConfigurations()
- .add(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory", backupParams));
- backupConf.setBackup(true);
- backupService = HornetQServers.newHornetQServer(backupConf, false);
- backupService.start();
-
Configuration liveConf = new ConfigurationImpl();
liveConf.setSecurityEnabled(false);
- liveTC = new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory");
- liveConf.getAcceptorConfigurations()
- .add(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory"));
- Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
- backupTC = new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory", backupParams);
- connectors.put(backupTC.getName(), backupTC);
- connectors.put(liveTC.getName(), liveTC);
- liveConf.setConnectorConfigurations(connectors);
- liveConf.setBackupConnectorName(backupTC.getName());
+ liveTC = new TransportConfiguration(InVMConnectorFactory.class.getName());
+ liveConf.getAcceptorConfigurations().add(new TransportConfiguration(InVMAcceptorFactory.class.getName()));
+ liveConf.getConnectorConfigurations().put(liveTC.getName(), liveTC);
liveConf.setSharedStore(true);
liveConf.setClustered(true);
- List<Pair<String, String>> connectorNames = new ArrayList<Pair<String, String>>();
- connectorNames.add(new Pair<String, String>(liveTC.getName(), backupTC.getName()));
-
final long broadcastPeriod = 250;
final String bcGroupName = "bc1";
@@ -907,13 +645,13 @@
groupAddress,
groupPort,
broadcastPeriod,
- connectorNames);
+ Arrays.asList(liveTC.getName()));
List<BroadcastGroupConfiguration> bcConfigs1 = new ArrayList<BroadcastGroupConfiguration>();
bcConfigs1.add(bcConfig1);
liveConf.setBroadcastGroupConfigurations(bcConfigs1);
- liveService = HornetQServers.newHornetQServer(liveConf, false);
+ liveService = createFakeLockServer(false, liveConf);
liveService.start();
- }*/
+ }
}
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/util/UnitTestCase.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/util/UnitTestCase.java 2010-08-23 12:46:05 UTC (rev 9578)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/util/UnitTestCase.java 2010-08-23 12:47:18 UTC (rev 9579)
@@ -47,6 +47,7 @@
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientMessage;
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.core.asyncio.impl.AsynchronousFileImpl;
@@ -298,6 +299,15 @@
Assert.assertEquals("byte at index " + i, expected[i], actual[i]);
}
}
+
+ public static void assertEqualsTransportConfigurations(final TransportConfiguration[] expected, final TransportConfiguration[] actual)
+ {
+ assertEquals(expected.length, actual.length);
+ for (int i = 0; i < expected.length; i++)
+ {
+ Assert.assertEquals("TransportConfiguration at index " + i, expected[i], actual[i]);
+ }
+ }
public static void assertEqualsBuffers(final int size, final HornetQBuffer expected, final HornetQBuffer actual)
{
13 years, 9 months
JBoss hornetq SVN: r9578 - branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-08-23 08:46:05 -0400 (Mon, 23 Aug 2010)
New Revision: 9578
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
Log:
fix Bridge construction
* it is allowed to pass null as the bridge's forwarding address
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2010-08-21 20:34:23 UTC (rev 9577)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2010-08-23 12:46:05 UTC (rev 9578)
@@ -577,7 +577,7 @@
queue,
executorFactory.getExecutor(),
SimpleString.toSimpleString(config.getFilterString()),
- new SimpleString(config.getForwardingAddress()),
+ SimpleString.toSimpleString(config.getForwardingAddress()),
scheduledExecutor,
transformer,
config.isUseDuplicateDetection(),
13 years, 9 months
JBoss hornetq SVN: r9577 - in trunk/tests: jms-tests/src/org/hornetq/jms/tests/tools/ant and 1 other directory.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-08-21 16:34:23 -0400 (Sat, 21 Aug 2010)
New Revision: 9577
Removed:
trunk/tests/jms-tests/src/org/hornetq/jms/tests/tools/ant/DisplayWarningsAndErrors.java
trunk/tests/jms-tests/src/org/hornetq/jms/tests/tools/ant/FailOnSerializationDebugOutput.java
trunk/tests/jms-tests/src/org/hornetq/jms/tests/tools/ant/GenerateSmokeReport.java
trunk/tests/smoke/
Log:
Removing old artifacts
Deleted: trunk/tests/jms-tests/src/org/hornetq/jms/tests/tools/ant/DisplayWarningsAndErrors.java
===================================================================
--- trunk/tests/jms-tests/src/org/hornetq/jms/tests/tools/ant/DisplayWarningsAndErrors.java 2010-08-20 23:55:54 UTC (rev 9576)
+++ trunk/tests/jms-tests/src/org/hornetq/jms/tests/tools/ant/DisplayWarningsAndErrors.java 2010-08-21 20:34:23 UTC (rev 9577)
@@ -1,163 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.jms.tests.tools.ant;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileReader;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-/**
- * Greps fror WARN and ERROR entries in the specified file.
- *
- * @author <a href="mailto:ovidiu@feodorov.com">Ovidiu Feodorov</a>
- *
- * $Id$
- */
-public class DisplayWarningsAndErrors
-{
- // Constants -----------------------------------------------------
-
- // Static --------------------------------------------------------
-
- public static void main(final String[] args) throws Exception
- {
- new DisplayWarningsAndErrors(args).run();
- }
-
- // Attributes ----------------------------------------------------
-
- private final File file;
-
- private List ignoreList;
-
- // Constructors --------------------------------------------------
-
- private DisplayWarningsAndErrors(final String[] args) throws Exception
- {
- if (args.length == 0)
- {
- throw new Exception("Specify the file to grep!");
- }
-
- file = new File(args[0]);
-
- if (!file.canRead())
- {
- throw new Exception("The file " + file + " does not exist or cannot be read");
- }
-
- for (int i = 1; i < args.length; i++)
- {
- if (ignoreList == null)
- {
- ignoreList = new ArrayList();
- }
-
- ignoreList.add(args[i]);
- }
- }
-
- // Public --------------------------------------------------------
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- private void run() throws Exception
- {
- FileReader fr = new FileReader(file);
- BufferedReader br = new BufferedReader(fr);
- boolean error = false;
-
- try
- {
- String line;
- boolean first = true;
- outer: while ((line = br.readLine()) != null)
- {
- if (line.indexOf("ERROR") != -1 || line.indexOf("WARN") != -1)
- {
- // System.out.println(">"+line+"<");
- if (ignoreList != null)
- {
- for (Iterator i = ignoreList.iterator(); i.hasNext();)
- {
- if (line.endsWith((String)i.next()))
- {
- continue outer;
- }
- }
- }
-
- if (first)
- {
- printBanner();
- first = false;
- }
-
- if (line.indexOf("ERROR") != -1)
- {
- error = true;
- }
-
- System.out.println(line);
- }
- }
-
- }
- finally
- {
- fr.close();
- br.close();
- }
-
- if (error)
- {
- System.exit(1);
- }
- }
-
- private void printBanner()
- {
- System.out.println("XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX");
- System.out.println("XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX");
- System.out.println("XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX");
- System.out.println("XXXXXXXX XXXXXXXX");
- System.out.println("XXXXXXXX XXXXXXXX");
- System.out.println("XXXXXXXX XXXXXXXX");
- System.out.println("XXXXXXXX XXXXXXXX");
- System.out.println("XXXXXXXX XXXXXXXX");
- System.out.println("XXXXXXXX WARNING! JBoss server instance generated WARN/ERROR log entries: XXXXXXXX");
- System.out.println("XXXXXXXX XXXXXXXX");
- System.out.println("XXXXXXXX XXXXXXXX");
- System.out.println("XXXXXXXX XXXXXXXX");
- System.out.println("XXXXXXXX XXXXXXXX");
- System.out.println("XXXXXXXX XXXXXXXX");
- System.out.println("XXXXXXXX XXXXXXXX");
- System.out.println("XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX");
- System.out.println("XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX");
- System.out.println("XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX");
-
- System.out.println();
-
- }
-
- // Inner classes -------------------------------------------------
-
-}
Deleted: trunk/tests/jms-tests/src/org/hornetq/jms/tests/tools/ant/FailOnSerializationDebugOutput.java
===================================================================
--- trunk/tests/jms-tests/src/org/hornetq/jms/tests/tools/ant/FailOnSerializationDebugOutput.java 2010-08-20 23:55:54 UTC (rev 9576)
+++ trunk/tests/jms-tests/src/org/hornetq/jms/tests/tools/ant/FailOnSerializationDebugOutput.java 2010-08-21 20:34:23 UTC (rev 9577)
@@ -1,99 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.jms.tests.tools.ant;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileReader;
-
-/**
- * Throws an exception if it finds a jboss-serialization DEBUG output in the given file
- *
- * @author <a href="mailto:ovidiu@feodorov.com">Ovidiu Feodorov</a>
- *
- * $Id$
- */
-public class FailOnSerializationDebugOutput
-{
- // Constants -----------------------------------------------------
-
- // Static --------------------------------------------------------
-
- public static void main(final String[] args) throws Exception
- {
- new FailOnSerializationDebugOutput(args).run();
- }
-
- // Attributes ----------------------------------------------------
-
- private final File file;
-
- // Constructors --------------------------------------------------
-
- private FailOnSerializationDebugOutput(final String[] args) throws Exception
- {
- if (args.length == 0)
- {
- throw new Exception("Specify the file to grep!");
- }
-
- file = new File(args[0]);
-
- if (!file.canRead())
- {
- throw new Exception("The file " + file + " does not exist or cannot be read");
- }
- }
-
- // Public --------------------------------------------------------
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- private void run() throws Exception
- {
- FileReader fr = new FileReader(file);
- BufferedReader br = new BufferedReader(fr);
- boolean error = false;
-
- try
- {
- String line;
- while ((line = br.readLine()) != null)
- {
- if (line.indexOf("DEBUG") != -1 && line.indexOf("org.jboss.serial") != -1)
- {
- System.out.println("TEST FAILURE: Found serialization DEBUG output on line: " + line);
- System.exit(1);
- }
- }
- }
- finally
- {
- fr.close();
- br.close();
- }
-
- if (error)
- {
- System.exit(1);
- }
- }
-
- // Inner classes -------------------------------------------------
-
-}
Deleted: trunk/tests/jms-tests/src/org/hornetq/jms/tests/tools/ant/GenerateSmokeReport.java
===================================================================
--- trunk/tests/jms-tests/src/org/hornetq/jms/tests/tools/ant/GenerateSmokeReport.java 2010-08-20 23:55:54 UTC (rev 9576)
+++ trunk/tests/jms-tests/src/org/hornetq/jms/tests/tools/ant/GenerateSmokeReport.java 2010-08-21 20:34:23 UTC (rev 9577)
@@ -1,873 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.jms.tests.tools.ant;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileReader;
-import java.io.FileWriter;
-import java.io.PrintWriter;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.StringTokenizer;
-
-/**
- * Generates a HTML smoke test report based on raw smoke run data.
- *
- * @author <a href="mailto:ovidiu@feodorov.com">Ovidiu Feodorov</a>
- *
- * $Id$
- */
-public class GenerateSmokeReport
-{
- // Constants ------------------------------------------------------------------------------------
-
- public static final String DEFAULT_OUTPUT_BASENAME = "smoke-tes-report";
-
- private static final byte INSTALLATION_TEST = 0;
-
- private static final byte CLIENT_COMPATIBILITY_TEST = 1;
-
- private static final byte SERVER_COMPATIBILITY_TEST = 2;
-
- // Static ---------------------------------------------------------------------------------------
-
- public static void main(final String[] args) throws Exception
- {
- new GenerateSmokeReport(args).run();
- }
-
- /**
- * The method generates a new example list in which examples are ordered according to the
- * ordered name list.
- *
- * @param exampleNames - a List<String>.
- * @param orderedNameList - comma (and space) separated ordered example name list
- *
- * @return a copy of the original list.
- */
- public static List order(final List exampleNames, final String orderedNameList)
- {
- List originalList = new ArrayList(exampleNames);
- List orderedList = new ArrayList();
- for (StringTokenizer st = new StringTokenizer(orderedNameList, ", "); st.hasMoreTokens();)
- {
- String ordn = st.nextToken();
- if (originalList.contains(ordn))
- {
- originalList.remove(ordn);
- orderedList.add(ordn);
- }
- }
-
- orderedList.addAll(originalList);
- return orderedList;
- }
-
- // Attributes -----------------------------------------------------------------------------------
-
- private File inputFile;
-
- private File outputDir;
-
- private final String outputFileName;
-
- private File installerDir;
-
- private String orderedNameList;
-
- // Constructors ---------------------------------------------------------------------------------
-
- private GenerateSmokeReport(final String[] args) throws Exception
- {
- String baseName = null;
-
- for (int i = 0; i < args.length; i++)
- {
- if ("-inputfile".equals(args[i]))
- {
- if (i == args.length - 1)
- {
- throw new Exception("File name must follow -inputfile");
- }
- inputFile = new File(args[++i]);
- }
- else if ("-outputdir".equals(args[i]))
- {
- if (i == args.length - 1)
- {
- throw new Exception("Output directory name must follow -outputdir");
- }
- outputDir = new File(args[++i]);
- }
- else if ("-basename".equals(args[i]))
- {
- if (i == args.length - 1)
- {
- throw new Exception("Output file name must follow -name");
- }
- baseName = args[++i];
- }
- else if ("-installerdir".equals(args[i]))
- {
- if (i == args.length - 1)
- {
- throw new Exception("Installer directory must follow -installerdir");
- }
- installerDir = new File(args[++i]);
- }
- else if ("-order".equals(args[i]))
- {
- if (i == args.length - 1)
- {
- throw new Exception("Example name list must follow -order");
- }
- orderedNameList = args[++i];
- }
- else
- {
- throw new Exception("Unknown argument: " + args[i]);
- }
- }
-
- if (inputFile == null)
- {
- throw new Exception("No input file specified");
- }
-
- if (!inputFile.canRead())
- {
- throw new Exception("The input file " + inputFile + " does not exist or cannot be read");
- }
-
- if (outputDir == null)
- {
- // no output directory specified, using the current directory
- outputDir = new File(".");
- }
-
- if (!outputDir.canWrite())
- {
- throw new Exception("The output directory " + outputDir + " is not writable");
- }
-
- if (baseName == null)
- {
- baseName = GenerateSmokeReport.DEFAULT_OUTPUT_BASENAME;
- }
-
- outputFileName = baseName + ".java-" + System.getProperty("java.version") + ".html";
- }
-
- // Public ---------------------------------------------------------------------------------------
-
- // Package protected ----------------------------------------------------------------------------
-
- // Protected ------------------------------------------------------------------------------------
-
- // Private --------------------------------------------------------------------------------------
-
- private void run() throws Exception
- {
- ReportData d = parseInputFile(inputFile);
- generateReport(d);
- }
-
- private ReportData parseInputFile(final File f) throws Exception
- {
- BufferedReader br = new BufferedReader(new FileReader(f));
- ReportData result = new ReportData();
-
- try
- {
- String line;
- while ((line = br.readLine()) != null)
- {
- int bi = line.indexOf("TEST_TYPE=");
- if (bi == -1)
- {
- throw new Exception("TEST_TYPE= not found in \"" + line + "\"");
- }
- int ei = line.indexOf(' ', bi);
- if (ei == -1)
- {
- ei = line.length();
- }
- String testType = line.substring(bi + 10, ei);
-
- bi = line.indexOf("JBOSS_HOME=");
- if (bi == -1)
- {
- throw new Exception("JBOSS_HOME= not found in \"" + line + "\"");
- }
- ei = line.indexOf(' ', bi);
- if (ei == -1)
- {
- ei = line.length();
- }
- String jbossHome = line.substring(bi + 11, ei);
-
- bi = line.indexOf("JBOSS_CONFIGURATION=");
- if (bi == -1)
- {
- throw new Exception("JBOSS_CONFIGURATION= not found in \"" + line + "\"");
- }
- ei = line.indexOf(' ', bi);
- if (ei == -1)
- {
- ei = line.length();
- }
- String jbossConfiguration = line.substring(bi + 20, ei);
-
- bi = line.indexOf("CLIENT_VERSION=");
- if (bi == -1)
- {
- throw new Exception("CLIENT_VERSION= not found in \"" + line + "\"");
- }
- ei = line.indexOf(' ', bi);
- if (ei == -1)
- {
- ei = line.length();
- }
- String clientVersion = line.substring(bi + 15, ei);
-
- bi = line.indexOf("INSTALLATION_TYPE=");
- if (bi == -1)
- {
- throw new Exception("INSTALLATION_TYPE= not found in \"" + line + "\"");
- }
- ei = line.indexOf(' ', bi);
- if (ei == -1)
- {
- ei = line.length();
- }
- String installationType = line.substring(bi + 18, ei);
-
- bi = line.indexOf("SERVER_ARTIFACT_NAME=");
- if (bi == -1)
- {
- throw new Exception("SERVER_ARTIFACT_NAME= not found in \"" + line + "\"");
- }
- ei = line.indexOf(' ', bi);
- if (ei == -1)
- {
- ei = line.length();
- }
- String serverArtifactName = line.substring(bi + 21, ei);
-
- bi = line.indexOf("EXAMPLE_NAME=");
- if (bi == -1)
- {
- throw new Exception("EXAMPLE_NAME= not found in \"" + line + "\"");
- }
- ei = line.indexOf(' ', bi);
- if (ei == -1)
- {
- ei = line.length();
- }
- String exampleName = line.substring(bi + 13, ei);
-
- bi = line.indexOf("CLUSTERED=");
- if (bi == -1)
- {
- throw new Exception("CLUSTERED= not found in \"" + line + "\"");
- }
- ei = line.indexOf(' ', bi);
- if (ei == -1)
- {
- ei = line.length();
- }
- String clusteredValue = line.substring(bi + 10, ei);
- clusteredValue = clusteredValue.toLowerCase();
- boolean clustered;
- if ("true".equals(clusteredValue))
- {
- clustered = true;
- }
- else if ("false".equals(clusteredValue))
- {
- clustered = false;
- }
- else
- {
- throw new Exception("CLUSTERED must be either 'true' or 'false' but it's " + clusteredValue);
- }
-
- result.addTestRun(testType,
- jbossHome,
- jbossConfiguration,
- clientVersion,
- installationType,
- serverArtifactName,
- exampleName,
- clustered);
- }
- }
- finally
- {
- if (br != null)
- {
- br.close();
- }
- }
- return result;
- }
-
- private void generateReport(final ReportData data) throws Exception
- {
- PrintWriter pw = new PrintWriter(new FileWriter(new File(outputDir, outputFileName)));
-
- try
- {
- pw.println("<html>");
- pw.println("<head><title>HornetQ Smoke Test Results</title></head>");
- pw.println("<body>");
-
- pw.println("<h1>HornetQ Smoke Test Results</h1>");
-
- pw.print("Java version: ");
- pw.print(System.getProperty("java.version"));
- pw.println("<br>");
- pw.print("Run on: ");
- pw.print(new Date());
-
- List installations = new ArrayList(data.getInstallations());
- Collections.sort(installations);
- List examples = new ArrayList(data.getExamples(GenerateSmokeReport.INSTALLATION_TEST));
- if (orderedNameList != null)
- {
- examples = GenerateSmokeReport.order(examples, orderedNameList);
- }
- else
- {
- Collections.sort(examples);
- }
-
- pw.println("<h2>Installation Test Results</h2>");
-
- pw.println("<table border=\"1\" cellpadding=\"2\" cellspacing=\"2\">");
-
- // header
-
- pw.print("<tr>");
- pw.print("<td></td>");
- for (Iterator j = examples.iterator(); j.hasNext();)
- {
- pw.print("<td align=\"center\"><b>");
- pw.print((String)j.next());
- pw.print("</b></td>");
- }
- pw.println("</tr>");
-
- for (Iterator i = installations.iterator(); i.hasNext();)
- {
- JBossInstallation jbi = (JBossInstallation)i.next();
- Set thisExamples = data.getExamples(jbi);
-
- pw.println("<tr>");
- pw.print("<td>");
- pw.print(jbi.toString());
- pw.println("</td>");
-
- for (Iterator j = examples.iterator(); j.hasNext();)
- {
- String exampleName = (String)j.next();
- if (thisExamples.contains(exampleName))
- {
- pw.print("<td bgcolor=\"#00FF00\">");
- pw.print(" OK ");
- pw.println("</td>");
- }
- else
- {
- pw.print("<td bgcolor=\"#C0C0C0\">");
- pw.print(" ");
- pw.println("</td>");
- }
- }
-
- pw.println("</tr>");
- }
-
- pw.println("</table>");
-
- List serverVersions = new ArrayList(data.getServerVersions());
- Collections.sort(serverVersions);
- examples = new ArrayList(data.getExamples(GenerateSmokeReport.CLIENT_COMPATIBILITY_TEST));
- Collections.sort(examples);
-
- pw.println("<h2>Client Compatibility Test Results</h2>");
-
- pw.println("<table border=\"1\" cellpadding=\"2\" cellspacing=\"2\">");
-
- // header
-
- pw.print("<tr>");
- pw.print("<td></td>");
- for (Iterator j = examples.iterator(); j.hasNext();)
- {
- pw.print("<td align=\"center\"><b>");
- pw.print((String)j.next());
- pw.print("</b></td>");
- }
- pw.println("</tr>");
-
- for (Iterator i = serverVersions.iterator(); i.hasNext();)
- {
- String serverVersion = (String)i.next();
- Set thisExamples = data.getExamples(true, serverVersion);
-
- pw.println("<tr>");
- pw.print("<td>");
- pw.print(serverVersion);
- pw.println("</td>");
-
- for (Iterator j = examples.iterator(); j.hasNext();)
- {
- String exampleName = (String)j.next();
- if (thisExamples.contains(exampleName))
- {
- pw.print("<td bgcolor=\"#00FF00\">");
- pw.print(" OK ");
- pw.println("</td>");
- }
- else
- {
- pw.print("<td bgcolor=\"#C0C0C0\">");
- pw.print(" ");
- pw.println("</td>");
- }
- }
-
- pw.println("</tr>");
- }
-
- pw.println("</table>");
-
- List clientVersions = new ArrayList(data.getClientVersions());
- Collections.sort(clientVersions);
- examples = new ArrayList(data.getExamples(GenerateSmokeReport.SERVER_COMPATIBILITY_TEST));
- Collections.sort(examples);
-
- pw.println("<h2>Server Compatibility Test Results</h2>");
-
- pw.println("<table border=\"1\" cellpadding=\"2\" cellspacing=\"2\">");
-
- // header
-
- pw.print("<tr>");
- pw.print("<td></td>");
- for (Iterator j = examples.iterator(); j.hasNext();)
- {
- pw.print("<td align=\"center\"><b>");
- pw.print((String)j.next());
- pw.print("</b></td>");
- }
- pw.println("</tr>");
-
- for (Iterator i = clientVersions.iterator(); i.hasNext();)
- {
- String clientVersion = (String)i.next();
- Set thisExamples = data.getExamples(false, clientVersion);
-
- pw.println("<tr>");
- pw.print("<td>");
- pw.print(clientVersion);
- pw.println("</td>");
-
- for (Iterator j = examples.iterator(); j.hasNext();)
- {
- String exampleName = (String)j.next();
- if (thisExamples.contains(exampleName))
- {
- pw.print("<td bgcolor=\"#00FF00\">");
- pw.print(" OK ");
- pw.println("</td>");
- }
- else
- {
- pw.print("<td bgcolor=\"#C0C0C0\">");
- pw.print(" ");
- pw.println("</td>");
- }
- }
-
- pw.println("</tr>");
- }
-
- pw.println("</table>");
-
- pw.println("</body>");
- pw.println("</html>");
- }
- finally
- {
- if (pw != null)
- {
- pw.close();
- }
- }
- }
-
- // Inner classes -------------------------------------------------
-
- private class ReportData
- {
- // <jbossInstallation - Set<examples>>
- private final Map installationTests;
-
- // <serverVersion - Set<example>>
- private final Map clientCompatibilityTests;
-
- // <clientVersion - Set<example>>
- private final Map serverCompatibilityTests;
-
- private ReportData()
- {
- installationTests = new HashMap();
- clientCompatibilityTests = new HashMap();
- serverCompatibilityTests = new HashMap();
- }
-
- public void addTestRun(final String testType,
- final String jbossHome,
- final String jbossConfiguration,
- final String clientVersion,
- final String installationType,
- final String serverArtifactName,
- final String exampleName,
- final boolean clustered) throws Exception
- {
- if ("installation".equals(testType))
- {
- addInstallationTestRun(jbossHome, installationType, serverArtifactName, exampleName, clustered);
- }
- else if ("client.compatibility".equals(testType))
- {
- addClientCompatibilityTestRun(jbossConfiguration, exampleName);
- }
- else if ("server.compatibility".equals(testType))
- {
- addServerCompatibilityTestRun(clientVersion, exampleName);
- }
- else
- {
- throw new Exception("Unknown test type: " + testType);
- }
- }
-
- public Set getInstallations()
- {
- return installationTests.keySet();
- }
-
- public Set getServerVersions()
- {
- return clientCompatibilityTests.keySet();
- }
-
- public Set getClientVersions()
- {
- return serverCompatibilityTests.keySet();
- }
-
- public Set getExamples(final JBossInstallation jbi)
- {
- return (Set)installationTests.get(jbi);
- }
-
- public Set getExamples(final boolean clientTest, final String version)
- {
- if (clientTest)
- {
- return (Set)clientCompatibilityTests.get(version);
- }
- return (Set)serverCompatibilityTests.get(version);
- }
-
- /**
- * @return all examples for which at least a test was recorded
- */
- public Set getExamples(final byte testType)
- {
- Set examples = new HashSet();
- Collection values = testType == GenerateSmokeReport.INSTALLATION_TEST ? installationTests.values()
- : testType == GenerateSmokeReport.CLIENT_COMPATIBILITY_TEST ? clientCompatibilityTests.values()
- : serverCompatibilityTests.values();
- for (Iterator i = values.iterator(); i.hasNext();)
- {
- Set s = (Set)i.next();
- examples.addAll(s);
- }
- return examples;
- }
-
- private void addInstallationTestRun(final String jbossHome,
- final String installationType,
- final String serverArtifactName,
- final String exampleName,
- final boolean clustered) throws Exception
- {
- String jbossVersion;
- boolean installerGenerated = false;
- boolean standalone = false;
- boolean scoped = false;
-
- int idx = jbossHome.lastIndexOf("jboss-");
- if (idx == -1)
- {
- throw new Exception("Cannot determine JBoss version from " + jbossHome);
- }
- jbossVersion = jbossHome.substring(idx + 6);
-
- // determine if it's an "installer" generated installation
-
- File parent = new File(jbossHome).getParentFile();
- while (parent != null)
- {
- if (parent.equals(installerDir))
- {
- installerGenerated = true;
- break;
- }
-
- parent = parent.getParentFile();
- }
-
- // determine if is a "standalone" installation
-
- if ("standalone".equals(installationType))
- {
- standalone = true;
- }
-
- // determine if it's scoped or not
- scoped = serverArtifactName.indexOf("-scoped") != -1;
-
- JBossInstallation jbi = new JBossInstallation(jbossVersion, installerGenerated, standalone, scoped, clustered);
-
- Set examples = (Set)installationTests.get(jbi);
-
- if (examples == null)
- {
- examples = new HashSet();
- installationTests.put(jbi, examples);
- }
-
- if (examples.contains(exampleName))
- {
- throw new Exception("Duplicate installation run: " + jbi + ", " + exampleName);
- }
- examples.add(exampleName);
- }
-
- private void addClientCompatibilityTestRun(final String jbossConfiguration, final String exampleName) throws Exception
- {
- if (!jbossConfiguration.startsWith("messaging-"))
- {
- throw new Exception("Invalid JBoss configuration name for a " + "client compatibility test: " +
- jbossConfiguration);
- }
-
- String serverVersion = jbossConfiguration.substring(10);
-
- Set examples = (Set)clientCompatibilityTests.get(serverVersion);
- if (examples == null)
- {
- examples = new HashSet();
- clientCompatibilityTests.put(serverVersion, examples);
- }
-
- if (examples.contains(exampleName))
- {
- throw new Exception("Duplicate client compatibility run: " + exampleName +
- " on " +
- serverVersion +
- " server");
- }
- examples.add(exampleName);
- }
-
- private void addServerCompatibilityTestRun(final String clientVersion, final String exampleName) throws Exception
- {
- Set examples = (Set)serverCompatibilityTests.get(clientVersion);
- if (examples == null)
- {
- examples = new HashSet();
- serverCompatibilityTests.put(clientVersion, examples);
- }
-
- if (examples.contains(exampleName))
- {
- throw new Exception("Duplicate server compatibility run: " + exampleName +
- " with " +
- clientVersion +
- " client");
- }
- examples.add(exampleName);
- }
- }
-
- private class JBossInstallation implements Comparable
- {
-
- private final String version;
-
- private final boolean installerGenerated;
-
- private final boolean standalone;
-
- private final boolean scoped;
-
- private final boolean clustered;
-
- private JBossInstallation(final String version,
- final boolean installerGenerated,
- final boolean standalone,
- final boolean scoped,
- final boolean clustered)
- {
- this.version = version;
- this.installerGenerated = installerGenerated;
- this.standalone = standalone;
- this.scoped = scoped;
- this.clustered = clustered;
- }
-
- public int compareTo(final Object o)
- {
- JBossInstallation that = (JBossInstallation)o;
-
- int result = version.compareTo(that.version);
-
- if (result != 0)
- {
- return result;
- }
-
- int thisScore = (isClustered() ? 1000 : 0) + (isStandalone() ? 100 : 0) +
- (isInstallerGenerated() ? 10 : 0) +
- (isScoped() ? 1 : 0);
-
- int thatScore = (that.isClustered() ? 1000 : 0) + (that.isStandalone() ? 100 : 0) +
- (that.isInstallerGenerated() ? 10 : 0) +
- (that.isScoped() ? 1 : 0);
-
- return thisScore - thatScore;
- }
-
- public String getVersion()
- {
- return version;
- }
-
- public boolean isInstallerGenerated()
- {
- return installerGenerated;
- }
-
- public boolean isStandalone()
- {
- return standalone;
- }
-
- public boolean isScoped()
- {
- return scoped;
- }
-
- public boolean isClustered()
- {
- return clustered;
- }
-
- public boolean equals(final Object o)
- {
- if (this == o)
- {
- return true;
- }
-
- if (!(o instanceof JBossInstallation))
- {
- return false;
- }
-
- JBossInstallation that = (JBossInstallation)o;
-
- return version.equals(that.version) && installerGenerated == that.installerGenerated &&
- standalone == that.standalone &&
- scoped == that.scoped &&
- clustered == that.clustered;
- }
-
- public int hashCode()
- {
- return version.hashCode() + (installerGenerated ? 17 : 0) +
- (standalone ? 37 : 0) +
- (scoped ? 57 : 0) +
- (clustered ? 129 : 0);
- }
-
- public String toString()
- {
- StringBuffer sb = new StringBuffer();
- sb.append(version);
- sb.append(" (");
-
- if (standalone)
- {
- sb.append("standalone");
- }
- else
- {
- if (scoped)
- {
- sb.append("scoped");
- }
- else
- {
- sb.append("non-scoped");
- }
- }
-
- if (installerGenerated)
- {
- sb.append(", installer generated");
- }
-
- if (clustered)
- {
- sb.append(", CLUSTERED");
- }
- else
- {
- sb.append(", not manageConfirmations");
- }
-
- sb.append(")");
-
- return sb.toString();
- }
-
- }
-}
13 years, 9 months
JBoss hornetq SVN: r9576 - trunk/src/main/org/hornetq/core/journal/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-08-20 19:55:54 -0400 (Fri, 20 Aug 2010)
New Revision: 9576
Modified:
trunk/src/main/org/hornetq/core/journal/impl/ImportJournal.java
Log:
id on journal import should be long
Modified: trunk/src/main/org/hornetq/core/journal/impl/ImportJournal.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/ImportJournal.java 2010-08-20 22:30:33 UTC (rev 9575)
+++ trunk/src/main/org/hornetq/core/journal/impl/ImportJournal.java 2010-08-20 23:55:54 UTC (rev 9576)
@@ -279,7 +279,7 @@
protected static RecordInfo parseRecord(Properties properties) throws Exception
{
- int id = parseInt("id", properties);
+ long id = parseLong("id", properties);
byte userRecordType = parseByte("userRecordType", properties);
boolean isUpdate = parseBoolean("isUpdate", properties);
byte[] data = parseEncoding("data", properties);
13 years, 9 months
JBoss hornetq SVN: r9575 - in branches/Branch_Large_Message_Compression: src/main/org/hornetq/api/core and 19 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-08-20 18:30:33 -0400 (Fri, 20 Aug 2010)
New Revision: 9575
Added:
branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/DecompressedLargeMessageBuffer.java
branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/GZipUtil.java
branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/HornetQBufferInputStream.java
branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/client/CompressedLargeMessageTest.java
branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/unit/util/HornetQBufferInputStreamTest.java
Modified:
branches/Branch_Large_Message_Compression/src/config/common/schema/hornetq-jms.xsd
branches/Branch_Large_Message_Compression/src/main/org/hornetq/api/core/Message.java
branches/Branch_Large_Message_Compression/src/main/org/hornetq/api/core/client/ClientSessionFactory.java
branches/Branch_Large_Message_Compression/src/main/org/hornetq/api/core/client/HornetQClient.java
branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientConsumerInternal.java
branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java
branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientMessageInternal.java
branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java
branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/DelegatingSession.java
branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/FailoverManager.java
branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java
branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/LargeMessageBufferInternal.java
branches/Branch_Large_Message_Compression/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java
branches/Branch_Large_Message_Compression/src/main/org/hornetq/jms/server/JMSServerManager.java
branches/Branch_Large_Message_Compression/src/main/org/hornetq/jms/server/config/ConnectionFactoryConfiguration.java
branches/Branch_Large_Message_Compression/src/main/org/hornetq/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java
branches/Branch_Large_Message_Compression/src/main/org/hornetq/jms/server/impl/JMSServerConfigParserImpl.java
branches/Branch_Large_Message_Compression/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java
branches/Branch_Large_Message_Compression/tests/config/hornetq-jms-for-JMSServerDeployerTest.xml
branches/Branch_Large_Message_Compression/tests/jms-tests/src/org/hornetq/jms/tests/CTSMiscellaneousTest.java
branches/Branch_Large_Message_Compression/tests/jms-tests/src/org/hornetq/jms/tests/JMSTestCase.java
branches/Branch_Large_Message_Compression/tests/jms-tests/src/org/hornetq/jms/tests/tools/container/LocalTestServer.java
branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/jms/FloodServerTest.java
branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/jms/client/PreACKJMSTest.java
branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/jms/client/ReSendMessageTest.java
branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/jms/client/SessionClosedOnRemotingConnectionFailureTest.java
branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/jms/client/TextMessageTest.java
branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/jms/divert/DivertAndACKClientTest.java
branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/jms/server/config/JMSServerConfigParserTest.java
branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/unit/core/client/impl/LargeMessageBufferTest.java
branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/util/JMSTestBase.java
Log:
Initial implementation
Modified: branches/Branch_Large_Message_Compression/src/config/common/schema/hornetq-jms.xsd
===================================================================
--- branches/Branch_Large_Message_Compression/src/config/common/schema/hornetq-jms.xsd 2010-08-20 22:28:06 UTC (rev 9574)
+++ branches/Branch_Large_Message_Compression/src/config/common/schema/hornetq-jms.xsd 2010-08-20 22:30:33 UTC (rev 9575)
@@ -75,7 +75,11 @@
</xsd:element>
<xsd:element name="min-large-message-size" type="xsd:long"
maxOccurs="1" minOccurs="0">
- </xsd:element>
+ </xsd:element>
+ <xsd:element name="compress-large-messages" type="xsd:boolean"
+ maxOccurs="1" minOccurs="0">
+ </xsd:element>
+
<xsd:element name="client-id" type="xsd:string"
maxOccurs="1" minOccurs="0">
</xsd:element>
Modified: branches/Branch_Large_Message_Compression/src/main/org/hornetq/api/core/Message.java
===================================================================
--- branches/Branch_Large_Message_Compression/src/main/org/hornetq/api/core/Message.java 2010-08-20 22:28:06 UTC (rev 9574)
+++ branches/Branch_Large_Message_Compression/src/main/org/hornetq/api/core/Message.java 2010-08-20 22:30:33 UTC (rev 9575)
@@ -65,6 +65,8 @@
public static final SimpleString HDR_ORIG_MESSAGE_ID = new SimpleString("_HQ_ORIG_MESSAGE_ID");
public static final SimpleString HDR_GROUP_ID = new SimpleString("_HQ_GROUP_ID");
+
+ public static final SimpleString HDR_LARGE_COMPRESSED = new SimpleString("_HQ_LARGE_COMPRESSED");
public static final SimpleString HDR_SCHEDULED_DELIVERY_TIME = new SimpleString("_HQ_SCHED_DELIVERY");
Modified: branches/Branch_Large_Message_Compression/src/main/org/hornetq/api/core/client/ClientSessionFactory.java
===================================================================
--- branches/Branch_Large_Message_Compression/src/main/org/hornetq/api/core/client/ClientSessionFactory.java 2010-08-20 22:28:06 UTC (rev 9574)
+++ branches/Branch_Large_Message_Compression/src/main/org/hornetq/api/core/client/ClientSessionFactory.java 2010-08-20 22:30:33 UTC (rev 9575)
@@ -252,6 +252,27 @@
* @param minLargeMessageSize large message size threshold in bytes
*/
void setMinLargeMessageSize(int minLargeMessageSize);
+
+
+ /**
+ * If this attribute is set to true, the message body will be compressed when sent as large message.
+ *
+ * the compression will be done using the GZIP protocol.
+ *
+ *
+ * @param compressLargeMessage
+ */
+ void setCompressLargeMessages(boolean compressLargeMessage);
+
+ /**
+ * If this attribute is set to true, the message body will be compressed when sent as large message.
+ *
+ * the compression will be done using the GZIP protocol.
+ *
+ *
+ * @param compressLargeMessage
+ */
+ boolean isCompressLargeMessages();
/**
* Returns the window size for flow control of the consumers created through this factory.
Modified: branches/Branch_Large_Message_Compression/src/main/org/hornetq/api/core/client/HornetQClient.java
===================================================================
--- branches/Branch_Large_Message_Compression/src/main/org/hornetq/api/core/client/HornetQClient.java 2010-08-20 22:28:06 UTC (rev 9574)
+++ branches/Branch_Large_Message_Compression/src/main/org/hornetq/api/core/client/HornetQClient.java 2010-08-20 22:30:33 UTC (rev 9575)
@@ -41,6 +41,8 @@
// Any message beyond this size is considered a large message (to be sent in chunks)
public static final int DEFAULT_MIN_LARGE_MESSAGE_SIZE = 100 * 1024;
+
+ public static final boolean DEFAULT_COMPRESS_LARGE_MESSAGES = false;
public static final int DEFAULT_CONSUMER_WINDOW_SIZE = 1024 * 1024;
Modified: branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
===================================================================
--- branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2010-08-20 22:28:06 UTC (rev 9574)
+++ branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2010-08-20 22:30:33 UTC (rev 9575)
@@ -30,6 +30,7 @@
import org.hornetq.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveContinuationMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveLargeMessage;
+import org.hornetq.utils.DecompressedLargeMessageBuffer;
import org.hornetq.utils.Future;
import org.hornetq.utils.PriorityLinkedList;
import org.hornetq.utils.PriorityLinkedListImpl;
@@ -444,6 +445,11 @@
// ClientConsumerInternal implementation
// --------------------------------------------------------------
+ public ClientSessionInternal getSession()
+ {
+ return session;
+ }
+
public SessionQueueQueryResponseMessage getQueueInfo()
{
return queueInfo;
@@ -544,7 +550,14 @@
currentLargeMessageBuffer = new LargeMessageBufferImpl(this, packet.getLargeMessageSize(), 60, largeMessageCache);
- currentChunkMessage.setBuffer(currentLargeMessageBuffer);
+ if (currentChunkMessage.isCompressed())
+ {
+ currentChunkMessage.setBuffer(new DecompressedLargeMessageBuffer(currentLargeMessageBuffer, session.getThreadPool()));
+ }
+ else
+ {
+ currentChunkMessage.setBuffer(currentLargeMessageBuffer);
+ }
currentChunkMessage.setFlowControlSize(0);
Modified: branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientConsumerInternal.java
===================================================================
--- branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientConsumerInternal.java 2010-08-20 22:28:06 UTC (rev 9574)
+++ branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientConsumerInternal.java 2010-08-20 22:30:33 UTC (rev 9575)
@@ -67,4 +67,6 @@
void start();
SessionQueueQueryResponseMessage getQueueInfo();
+
+ ClientSessionInternal getSession();
}
Modified: branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java
===================================================================
--- branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java 2010-08-20 22:28:06 UTC (rev 9574)
+++ branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java 2010-08-20 22:30:33 UTC (rev 9575)
@@ -21,11 +21,13 @@
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.HornetQBuffers;
import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.Message;
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.message.BodyEncoder;
import org.hornetq.core.message.impl.MessageImpl;
-import org.hornetq.utils.DataConstants;
+import org.hornetq.utils.GZipUtil;
+import org.hornetq.utils.HornetQBufferInputStream;
/**
*
@@ -117,6 +119,11 @@
{
return largeMessage;
}
+
+ public boolean isCompressed()
+ {
+ return properties.getBooleanProperty(Message.HDR_LARGE_COMPRESSED);
+ }
/**
* @param largeMessage the largeMessage to set
@@ -142,7 +149,6 @@
"]";
}
- // FIXME - only used for large messages - move it!
/* (non-Javadoc)
* @see org.hornetq.api.core.client.ClientMessage#saveToOutputStream(java.io.OutputStream)
*/
@@ -150,7 +156,7 @@
{
if (largeMessage)
{
- ((LargeMessageBufferInternal)getWholeBuffer()).saveBuffer(out);
+ ((LargeMessageBufferInternal)getWholeBuffer()).saveBuffer(out);
}
else
{
Modified: branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientMessageInternal.java
===================================================================
--- branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientMessageInternal.java 2010-08-20 22:28:06 UTC (rev 9574)
+++ branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientMessageInternal.java 2010-08-20 22:30:33 UTC (rev 9575)
@@ -44,4 +44,6 @@
void discardLargeBody();
void setBuffer(HornetQBuffer buffer);
+
+ boolean isCompressed();
}
Modified: branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
===================================================================
--- branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java 2010-08-20 22:28:06 UTC (rev 9574)
+++ branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java 2010-08-20 22:30:33 UTC (rev 9575)
@@ -13,8 +13,16 @@
package org.hornetq.core.client.impl;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
+import java.util.concurrent.Executor;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.HornetQBuffers;
@@ -28,6 +36,8 @@
import org.hornetq.core.protocol.core.impl.wireformat.SessionSendContinuationMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionSendLargeMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionSendMessage;
+import org.hornetq.utils.GZipUtil;
+import org.hornetq.utils.HornetQBufferInputStream;
import org.hornetq.utils.TokenBucketLimiter;
import org.hornetq.utils.UUIDGenerator;
@@ -150,7 +160,7 @@
{
return;
}
-
+
doCleanup();
}
@@ -190,7 +200,7 @@
{
return credits;
}
-
+
// Protected ------------------------------------------------------------------------------------
// Package Private ------------------------------------------------------------------------------
@@ -203,7 +213,7 @@
{
session.returnCredits(address);
}
-
+
session.removeProducer(this);
closed = true;
@@ -212,12 +222,13 @@
private void doSend(final SimpleString address, final Message msg) throws HornetQException
{
MessageInternal msgI = (MessageInternal)msg;
-
+
ClientProducerCredits theCredits;
-
+
boolean isLarge;
- if (msgI.getBodyInputStream() != null || msgI.isLargeMessage() || msgI.getBodyBuffer().writerIndex() > minLargeMessageSize)
+ if (msgI.getBodyInputStream() != null || msgI.isLargeMessage() ||
+ msgI.getBodyBuffer().writerIndex() > minLargeMessageSize)
{
isLarge = true;
}
@@ -236,7 +247,7 @@
{
msg.setAddress(address);
}
-
+
// Anonymous
theCredits = session.getCredits(address, true);
}
@@ -250,7 +261,7 @@
{
msg.setAddress(this.address);
}
-
+
theCredits = credits;
}
@@ -270,8 +281,6 @@
session.workDone();
-
-
if (isLarge)
{
largeMessageSend(sendBlocking, msgI, theCredits);
@@ -322,8 +331,16 @@
* @param msgI
* @throws HornetQException
*/
- private void largeMessageSend(final boolean sendBlocking, final MessageInternal msgI, final ClientProducerCredits credits) throws HornetQException
+ private void largeMessageSend(final boolean sendBlocking,
+ final MessageInternal msgI,
+ final ClientProducerCredits credits) throws HornetQException
{
+
+ if (session.isCompressLargeMessages())
+ {
+ msgI.putBooleanProperty(Message.HDR_LARGE_COMPRESSED, true);
+ }
+
int headerSize = msgI.getHeadersAndPropertiesEncodeSize();
if (headerSize >= minLargeMessageSize)
@@ -341,7 +358,6 @@
HornetQBuffer headerBuffer = HornetQBuffers.fixedBuffer(headerSize);
msgI.encodeHeadersAndProperties(headerBuffer);
-
SessionSendLargeMessage initialChunk = new SessionSendLargeMessage(headerBuffer.toByteBuffer().array());
channel.send(initialChunk);
@@ -358,7 +374,7 @@
if (input != null)
{
- largeMessageSendStreamed(sendBlocking, input, credits);
+ largeMessageSendStreamed(sendBlocking, msgI, input, credits);
}
else
{
@@ -375,72 +391,29 @@
final MessageInternal msgI,
final ClientProducerCredits credits) throws HornetQException
{
- BodyEncoder context = msgI.getBodyEncoder();
-
- final long bodySize = context.getLargeBodySize();
-
- context.open();
- try
- {
-
- for (int pos = 0; pos < bodySize;)
- {
- final boolean lastChunk;
-
- final int chunkLength = Math.min((int)(bodySize - pos), minLargeMessageSize);
-
- final HornetQBuffer bodyBuffer = HornetQBuffers.fixedBuffer(chunkLength);
-
- context.encode(bodyBuffer, chunkLength);
-
- pos += chunkLength;
-
- lastChunk = pos >= bodySize;
-
- final SessionSendContinuationMessage chunk = new SessionSendContinuationMessage(bodyBuffer.toByteBuffer()
- .array(),
- !lastChunk,
- lastChunk && sendBlocking);
-
- if (sendBlocking && lastChunk)
- {
- // When sending it blocking, only the last chunk will be blocking.
- channel.sendBlocking(chunk);
- }
- else
- {
- channel.send(chunk);
- }
-
- try
- {
- credits.acquireCredits(chunk.getPacketSize());
- }
- catch (InterruptedException e)
- {
- }
- }
- }
- finally
- {
- context.close();
- }
+ msgI.getBodyBuffer().readerIndex(0);
+ largeMessageSendStreamed(sendBlocking, msgI, new HornetQBufferInputStream(msgI.getBodyBuffer()), credits);
}
/**
- * TODO: This method could be eliminated and
- * combined with {@link ClientProducerImpl#largeMessageSendBuffered(boolean, Message, ClientProducerCredits)}.
- * All that's needed for this is ClientMessage returning the proper BodyEncoder for streamed
* @param sendBlocking
* @param input
* @throws HornetQException
*/
private void largeMessageSendStreamed(final boolean sendBlocking,
- final InputStream input,
+ final MessageInternal msgI,
+ final InputStream inputStreamParameter,
final ClientProducerCredits credits) throws HornetQException
{
boolean lastPacket = false;
+ InputStream input = inputStreamParameter;
+
+ if (session.isCompressLargeMessages())
+ {
+ input = GZipUtil.pipeGZip(inputStreamParameter, true, session.getThreadPool());
+ }
+
while (!lastPacket)
{
byte[] buff = new byte[minLargeMessageSize];
Modified: branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2010-08-20 22:28:06 UTC (rev 9574)
+++ branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2010-08-20 22:30:33 UTC (rev 9575)
@@ -105,6 +105,8 @@
private long callTimeout;
private int minLargeMessageSize;
+
+ private boolean compressLargeMessages;
private int consumerWindowSize;
@@ -308,6 +310,8 @@
minLargeMessageSize = other.getMinLargeMessageSize();
+ compressLargeMessages = other.isCompressLargeMessages();
+
consumerWindowSize = other.getConsumerWindowSize();
consumerMaxRate = other.getConsumerMaxRate();
@@ -370,6 +374,8 @@
callTimeout = HornetQClient.DEFAULT_CALL_TIMEOUT;
minLargeMessageSize = HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
+
+ compressLargeMessages = HornetQClient.DEFAULT_COMPRESS_LARGE_MESSAGES;
consumerWindowSize = HornetQClient.DEFAULT_CONSUMER_WINDOW_SIZE;
@@ -465,16 +471,6 @@
// ClientSessionFactory implementation------------------------------------------------------------
- public synchronized boolean isCacheLargeMessagesClient()
- {
- return cacheLargeMessagesClient;
- }
-
- public synchronized void setCacheLargeMessagesClient(final boolean cached)
- {
- cacheLargeMessagesClient = cached;
- }
-
public synchronized List<Pair<TransportConfiguration, TransportConfiguration>> getStaticConnectors()
{
return staticConnectors;
@@ -531,10 +527,31 @@
this.minLargeMessageSize = minLargeMessageSize;
}
+ public synchronized boolean isCacheLargeMessagesClient()
+ {
+ checkWrite();
+ return cacheLargeMessagesClient;
+ }
+
+ public synchronized void setCacheLargeMessagesClient(final boolean cached)
+ {
+ cacheLargeMessagesClient = cached;
+ }
+
public synchronized int getConsumerWindowSize()
{
return consumerWindowSize;
}
+
+ public synchronized void setCompressLargeMessages(final boolean compress)
+ {
+ this.compressLargeMessages = compress;
+ }
+
+ public synchronized boolean isCompressLargeMessages()
+ {
+ return compressLargeMessages;
+ }
public synchronized void setConsumerWindowSize(final int consumerWindowSize)
{
@@ -1129,6 +1146,7 @@
ackBatchSize,
cacheLargeMessagesClient,
minLargeMessageSize,
+ compressLargeMessages,
blockOnAcknowledge,
autoGroup,
confirmationWindowSize,
Modified: branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2010-08-20 22:28:06 UTC (rev 9574)
+++ branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2010-08-20 22:30:33 UTC (rev 9575)
@@ -154,6 +154,8 @@
private final boolean blockOnDurableSend;
private final int minLargeMessageSize;
+
+ private final boolean compressLargeMessages;
private volatile int initialMessagePacketSize;
@@ -204,6 +206,7 @@
final boolean blockOnDurableSend,
final boolean cacheLargeMessageClient,
final int minLargeMessageSize,
+ final boolean compressLargeMessages,
final int initialMessagePacketSize,
final String groupID,
final CoreRemotingConnection remotingConnection,
@@ -256,6 +259,8 @@
this.cacheLargeMessageClient = cacheLargeMessageClient;
this.minLargeMessageSize = minLargeMessageSize;
+
+ this.compressLargeMessages = compressLargeMessages;
this.initialMessagePacketSize = initialMessagePacketSize;
@@ -267,6 +272,15 @@
// ClientSession implementation
// -----------------------------------------------------------------
+ /**
+ * This will be used for instance when compressin large messages.
+ * the compression has to be done through a PipedOutputStream, and that needs to be done on a different thread
+ */
+ public Executor getThreadPool()
+ {
+ return failoverManager.getThreadPool();
+ }
+
public void createQueue(final SimpleString address, final SimpleString queueName) throws HornetQException
{
internalCreateQueue(address, queueName, null, false, false);
@@ -664,6 +678,11 @@
{
return minLargeMessageSize;
}
+
+ public boolean isCompressLargeMessages()
+ {
+ return compressLargeMessages;
+ }
/**
* @return the cacheLargeMessageClient
Modified: branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java
===================================================================
--- branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java 2010-08-20 22:28:06 UTC (rev 9574)
+++ branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java 2010-08-20 22:30:33 UTC (rev 9575)
@@ -13,6 +13,8 @@
package org.hornetq.core.client.impl;
+import java.util.concurrent.Executor;
+
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.Message;
import org.hornetq.api.core.SimpleString;
@@ -39,6 +41,8 @@
boolean isCacheLargeMessageClient();
int getMinLargeMessageSize();
+
+ boolean isCompressLargeMessages();
void expire(long consumerID, long messageID) throws HornetQException;
@@ -85,4 +89,6 @@
void setAddress(Message message, SimpleString address);
void setPacketSize(int packetSize);
+
+ Executor getThreadPool();
}
Modified: branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/DelegatingSession.java
===================================================================
--- branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/DelegatingSession.java 2010-08-20 22:28:06 UTC (rev 9574)
+++ branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/DelegatingSession.java 2010-08-20 22:30:33 UTC (rev 9575)
@@ -14,6 +14,7 @@
package org.hornetq.core.client.impl;
import java.util.Set;
+import java.util.concurrent.Executor;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
@@ -555,4 +556,20 @@
{
session.setPacketSize(packetSize);
}
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.client.impl.ClientSessionInternal#isCompressLargeMessages()
+ */
+ public boolean isCompressLargeMessages()
+ {
+ return session.isCompressLargeMessages();
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.client.impl.ClientSessionInternal#getThreadPool()
+ */
+ public Executor getThreadPool()
+ {
+ return session.getThreadPool();
+ }
}
Modified: branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/FailoverManager.java
===================================================================
--- branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/FailoverManager.java 2010-08-20 22:28:06 UTC (rev 9574)
+++ branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/FailoverManager.java 2010-08-20 22:30:33 UTC (rev 9575)
@@ -13,6 +13,8 @@
package org.hornetq.core.client.impl;
+import java.util.concurrent.Executor;
+
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.api.core.client.SessionFailureListener;
@@ -38,6 +40,7 @@
final int ackBatchSize,
final boolean cacheLargeMessageClient,
final int minLargeMessageSize,
+ final boolean compressLargeMessages,
final boolean blockOnAcknowledge,
final boolean autoGroup,
final int confirmationWindowSize,
@@ -63,4 +66,6 @@
boolean removeFailureListener(SessionFailureListener listener);
void causeExit();
+
+ Executor getThreadPool();
}
Modified: branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java
===================================================================
--- branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java 2010-08-20 22:28:06 UTC (rev 9574)
+++ branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java 2010-08-20 22:30:33 UTC (rev 9575)
@@ -297,8 +297,13 @@
handleConnectionFailure(connectionID, me);
}
- // ConnectionManager implementation ------------------------------------------------------------------
+ // FailoverManager implementation ------------------------------------------------------------------
+ public Executor getThreadPool()
+ {
+ return threadPool;
+ }
+
public ClientSession createSession(final String username,
final String password,
final boolean xa,
@@ -308,6 +313,7 @@
final int ackBatchSize,
final boolean cacheLargeMessageClient,
final int minLargeMessageSize,
+ final boolean compressLargeMessages,
final boolean blockOnAcknowledge,
final boolean autoGroup,
final int confWindowSize,
@@ -457,6 +463,7 @@
blockOnDurableSend,
cacheLargeMessageClient,
minLargeMessageSize,
+ compressLargeMessages,
initialMessagePacketSize,
groupID,
theConnection,
Modified: branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/LargeMessageBufferInternal.java
===================================================================
--- branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/LargeMessageBufferInternal.java 2010-08-20 22:28:06 UTC (rev 9574)
+++ branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/LargeMessageBufferInternal.java 2010-08-20 22:30:33 UTC (rev 9575)
@@ -17,6 +17,7 @@
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.HornetQException;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveContinuationMessage;
/**
* A LargeMessageBufferInternal
@@ -55,6 +56,8 @@
* Saves this buffer to the specified output.
*/
void saveBuffer(final OutputStream output) throws HornetQException;
+
+ public void addPacket(final SessionReceiveContinuationMessage packet);
/**
* Waits for the completion for the specified waiting time (in milliseconds).
Modified: branches/Branch_Large_Message_Compression/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java
===================================================================
--- branches/Branch_Large_Message_Compression/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java 2010-08-20 22:28:06 UTC (rev 9574)
+++ branches/Branch_Large_Message_Compression/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java 2010-08-20 22:30:33 UTC (rev 9575)
@@ -648,6 +648,13 @@
}
}
+ /**
+ * @param compressLargeMessages
+ */
+ public void setCompressLargeMessages(boolean compressLargeMessages)
+ {
+ }
+
// Inner classes --------------------------------------------------------------------------------
}
Modified: branches/Branch_Large_Message_Compression/src/main/org/hornetq/jms/server/JMSServerManager.java
===================================================================
--- branches/Branch_Large_Message_Compression/src/main/org/hornetq/jms/server/JMSServerManager.java 2010-08-20 22:28:06 UTC (rev 9574)
+++ branches/Branch_Large_Message_Compression/src/main/org/hornetq/jms/server/JMSServerManager.java 2010-08-20 22:30:33 UTC (rev 9575)
@@ -198,6 +198,7 @@
long callTimeout,
boolean cacheLargeMessagesClient,
int minLargeMessageSize,
+ boolean compressLargeMessage,
int consumerWindowSize,
int consumerMaxRate,
int confirmationWindowSize,
@@ -235,6 +236,7 @@
long callTimeout,
boolean cacheLargeMessagesClient,
int minLargeMessageSize,
+ boolean compressLargeMessages,
int consumerWindowSize,
int consumerMaxRate,
int confirmationWindowSize,
Modified: branches/Branch_Large_Message_Compression/src/main/org/hornetq/jms/server/config/ConnectionFactoryConfiguration.java
===================================================================
--- branches/Branch_Large_Message_Compression/src/main/org/hornetq/jms/server/config/ConnectionFactoryConfiguration.java 2010-08-20 22:28:06 UTC (rev 9574)
+++ branches/Branch_Large_Message_Compression/src/main/org/hornetq/jms/server/config/ConnectionFactoryConfiguration.java 2010-08-20 22:30:33 UTC (rev 9575)
@@ -106,6 +106,10 @@
int getMinLargeMessageSize();
void setMinLargeMessageSize(int minLargeMessageSize);
+
+ boolean isCompressLargeMessages();
+
+ void setCompressLargeMessages(boolean compress);
int getConsumerWindowSize();
Modified: branches/Branch_Large_Message_Compression/src/main/org/hornetq/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java
===================================================================
--- branches/Branch_Large_Message_Compression/src/main/org/hornetq/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java 2010-08-20 22:28:06 UTC (rev 9574)
+++ branches/Branch_Large_Message_Compression/src/main/org/hornetq/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java 2010-08-20 22:30:33 UTC (rev 9575)
@@ -66,6 +66,8 @@
private boolean cacheLargeMessagesClient = HornetQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT;
private int minLargeMessageSize = HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
+
+ private boolean compressLargeMessage = HornetQClient.DEFAULT_COMPRESS_LARGE_MESSAGES;
private int consumerWindowSize = HornetQClient.DEFAULT_CONSUMER_WINDOW_SIZE;
@@ -306,6 +308,16 @@
this.minLargeMessageSize = minLargeMessageSize;
}
+ public boolean isCompressLargeMessages()
+ {
+ return compressLargeMessage;
+ }
+
+ public void setCompressLargeMessages(final boolean compress)
+ {
+ this.compressLargeMessage = compress;
+ }
+
public int getConsumerWindowSize()
{
return consumerWindowSize;
Modified: branches/Branch_Large_Message_Compression/src/main/org/hornetq/jms/server/impl/JMSServerConfigParserImpl.java
===================================================================
--- branches/Branch_Large_Message_Compression/src/main/org/hornetq/jms/server/impl/JMSServerConfigParserImpl.java 2010-08-20 22:28:06 UTC (rev 9574)
+++ branches/Branch_Large_Message_Compression/src/main/org/hornetq/jms/server/impl/JMSServerConfigParserImpl.java 2010-08-20 22:30:33 UTC (rev 9575)
@@ -254,6 +254,11 @@
"min-large-message-size",
HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
Validators.GT_ZERO);
+
+ boolean compressLargeMessages = XMLConfigurationUtil.getBoolean(e,
+ "compress-large-messages",
+ HornetQClient.DEFAULT_COMPRESS_LARGE_MESSAGES);
+
boolean blockOnAcknowledge = XMLConfigurationUtil.getBoolean(e,
"block-on-acknowledge",
HornetQClient.DEFAULT_BLOCK_ON_ACKNOWLEDGE);
@@ -387,6 +392,7 @@
cfConfig.setCallTimeout(callTimeout);
cfConfig.setCacheLargeMessagesClient(cacheLargeMessagesClient);
cfConfig.setMinLargeMessageSize(minLargeMessageSize);
+ cfConfig.setCompressLargeMessages(compressLargeMessages);
cfConfig.setConsumerWindowSize(consumerWindowSize);
cfConfig.setConsumerMaxRate(consumerMaxRate);
cfConfig.setConfirmationWindowSize(confirmationWindowSize);
Modified: branches/Branch_Large_Message_Compression/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java
===================================================================
--- branches/Branch_Large_Message_Compression/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java 2010-08-20 22:28:06 UTC (rev 9574)
+++ branches/Branch_Large_Message_Compression/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java 2010-08-20 22:30:33 UTC (rev 9575)
@@ -711,6 +711,7 @@
final long callTimeout,
final boolean cacheLargeMessagesClient,
final int minLargeMessageSize,
+ final boolean compressLargeMessage,
final int consumerWindowSize,
final int consumerMaxRate,
final int confirmationWindowSize,
@@ -747,6 +748,7 @@
configuration.setCallTimeout(callTimeout);
configuration.setCacheLargeMessagesClient(cacheLargeMessagesClient);
configuration.setMinLargeMessageSize(minLargeMessageSize);
+ configuration.setCompressLargeMessages(compressLargeMessage);
configuration.setConsumerWindowSize(consumerWindowSize);
configuration.setConsumerMaxRate(consumerMaxRate);
configuration.setConfirmationWindowSize(confirmationWindowSize);
@@ -786,6 +788,7 @@
final long callTimeout,
final boolean cacheLargeMessagesClient,
final int minLargeMessageSize,
+ final boolean compressLargeMessages,
final int consumerWindowSize,
final int consumerMaxRate,
final int confirmationWindowSize,
@@ -827,6 +830,7 @@
configuration.setCallTimeout(callTimeout);
configuration.setCacheLargeMessagesClient(cacheLargeMessagesClient);
configuration.setMinLargeMessageSize(minLargeMessageSize);
+ configuration.setCompressLargeMessages(compressLargeMessages);
configuration.setConsumerWindowSize(consumerWindowSize);
configuration.setConsumerMaxRate(consumerMaxRate);
configuration.setConfirmationWindowSize(confirmationWindowSize);
@@ -926,6 +930,7 @@
final long callTimeout,
final boolean cacheLargeMessagesClient,
final int minLargeMessageSize,
+ final boolean compressLargeMessages,
final int consumerWindowSize,
final int consumerMaxRate,
final int confirmationWindowSize,
@@ -964,6 +969,7 @@
cf.setCallTimeout(callTimeout);
cf.setCacheLargeMessagesClient(cacheLargeMessagesClient);
cf.setMinLargeMessageSize(minLargeMessageSize);
+ cf.setCompressLargeMessages(compressLargeMessages);
cf.setConsumerWindowSize(consumerWindowSize);
cf.setConsumerMaxRate(consumerMaxRate);
cf.setConfirmationWindowSize(confirmationWindowSize);
@@ -999,6 +1005,7 @@
final long callTimeout,
final boolean cacheLargeMessagesClient,
final int minLargeMessageSize,
+ final boolean compressLargeMessages,
final int consumerWindowSize,
final int consumerMaxRate,
final int confirmationWindowSize,
@@ -1034,6 +1041,7 @@
cf.setCallTimeout(callTimeout);
cf.setCacheLargeMessagesClient(cacheLargeMessagesClient);
cf.setMinLargeMessageSize(minLargeMessageSize);
+ cf.setCompressLargeMessages(compressLargeMessages);
cf.setConsumerWindowSize(consumerWindowSize);
cf.setConsumerMaxRate(consumerMaxRate);
cf.setConfirmationWindowSize(confirmationWindowSize);
@@ -1169,6 +1177,7 @@
cfConfig.getCallTimeout(),
cfConfig.isCacheLargeMessagesClient(),
cfConfig.getMinLargeMessageSize(),
+ cfConfig.isCompressLargeMessages(),
cfConfig.getConsumerWindowSize(),
cfConfig.getConsumerMaxRate(),
cfConfig.getConfirmationWindowSize(),
@@ -1203,6 +1212,7 @@
cfConfig.getCallTimeout(),
cfConfig.isCacheLargeMessagesClient(),
cfConfig.getMinLargeMessageSize(),
+ cfConfig.isCompressLargeMessages(),
cfConfig.getConsumerWindowSize(),
cfConfig.getConsumerMaxRate(),
cfConfig.getConfirmationWindowSize(),
Added: branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/DecompressedLargeMessageBuffer.java
===================================================================
--- branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/DecompressedLargeMessageBuffer.java (rev 0)
+++ branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/DecompressedLargeMessageBuffer.java 2010-08-20 22:30:33 UTC (rev 9575)
@@ -0,0 +1,1116 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.utils;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.GatheringByteChannel;
+import java.nio.channels.ScatteringByteChannel;
+import java.util.concurrent.Executor;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.HornetQBuffers;
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.client.impl.LargeMessageBufferImpl;
+import org.hornetq.core.client.impl.LargeMessageBufferInternal;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveContinuationMessage;
+import org.jboss.netty.buffer.ChannelBuffer;
+
+/**
+ * A DecompressedHornetQBuffer
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class DecompressedLargeMessageBuffer implements LargeMessageBufferInternal
+{
+
+ // Constants -----------------------------------------------------
+
+ /**
+ *
+ */
+ private static final String OPERATION_NOT_SUPPORTED = "Operation not supported";
+
+ private static final String READ_ONLY_ERROR_MESSAGE = "This is a read-only buffer, setOperations are not supported";
+
+ // Attributes ----------------------------------------------------
+
+
+ final LargeMessageBufferInternal bufferDelegate;
+
+ final Executor threadPool;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public DecompressedLargeMessageBuffer(final LargeMessageBufferInternal bufferDelegate, final Executor threadPool)
+ {
+ this.bufferDelegate = bufferDelegate;
+ this.threadPool = threadPool;
+ }
+
+
+ // Public --------------------------------------------------------
+
+ /**
+ *
+ */
+ public void discardUnusedPackets()
+ {
+ bufferDelegate.discardUnusedPackets();
+ }
+
+ /**
+ * Add a buff to the List, or save it to the OutputStream if set
+ * @param packet
+ */
+ public void addPacket(final SessionReceiveContinuationMessage packet)
+ {
+ bufferDelegate.addPacket(packet);
+ }
+
+ public synchronized void cancel()
+ {
+ bufferDelegate.cancel();
+ }
+
+ public synchronized void close()
+ {
+ bufferDelegate.cancel();
+ }
+
+ public void setOutputStream(final OutputStream output) throws HornetQException
+ {
+ try
+ {
+ PipedOutputStream pipeOut = new PipedOutputStream();
+ PipedInputStream pipeIn = new PipedInputStream();
+
+ pipeOut.connect(pipeIn);
+
+ GZipUtil.pipeGZip(pipeIn, false, threadPool);
+
+ bufferDelegate.setOutputStream(pipeOut);
+ }
+ catch (IOException e)
+ {
+ throw new HornetQException(HornetQException.LARGE_MESSAGE_ERROR_BODY, e.getMessage(), e);
+ }
+ }
+
+ public synchronized void saveBuffer(final OutputStream output) throws HornetQException
+ {
+ setOutputStream(output);
+ waitCompletion(0);
+ }
+
+ /**
+ *
+ * @param timeWait Milliseconds to Wait. 0 means forever
+ * @throws Exception
+ */
+ public synchronized boolean waitCompletion(final long timeWait) throws HornetQException
+ {
+ return bufferDelegate.waitCompletion(timeWait);
+ }
+
+ // Channel Buffer Implementation ---------------------------------
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.buffers.ChannelBuffer#array()
+ */
+ public byte[] array()
+ {
+ throw new IllegalAccessError("array not supported on LargeMessageBufferImpl");
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.buffers.ChannelBuffer#capacity()
+ */
+ public int capacity()
+ {
+ return -1;
+ }
+
+ DataInputStream dataInput = null;
+
+ private DataInputStream getStream()
+ {
+ if (dataInput == null)
+ {
+ try
+ {
+ InputStream input = new HornetQBufferInputStream(bufferDelegate);
+
+ dataInput = new DataInputStream(GZipUtil.pipeGZip(input, false, threadPool));
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException (e.getMessage(), e);
+ }
+
+ }
+ return dataInput;
+ }
+
+ private void positioningNotSupported()
+ {
+ throw new IllegalStateException("Position not supported over compressed large messages");
+ }
+
+ public byte readByte()
+ {
+ try
+ {
+ return getStream().readByte();
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException (e.getMessage(), e);
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.buffers.ChannelBuffer#getByte(int)
+ */
+ public byte getByte(final int index)
+ {
+ positioningNotSupported();
+ return 0;
+ }
+
+ private byte getByte(final long index)
+ {
+ positioningNotSupported();
+ return 0;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.buffers.ChannelBuffer#getBytes(int, org.hornetq.api.core.buffers.ChannelBuffer, int, int)
+ */
+ public void getBytes(final int index, final HornetQBuffer dst, final int dstIndex, final int length)
+ {
+ positioningNotSupported();
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.buffers.ChannelBuffer#getBytes(int, org.hornetq.api.core.buffers.ChannelBuffer, int, int)
+ */
+ public void getBytes(final long index, final HornetQBuffer dst, final int dstIndex, final int length)
+ {
+ positioningNotSupported();
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.buffers.ChannelBuffer#getBytes(int, byte[], int, int)
+ */
+ public void getBytes(final int index, final byte[] dst, final int dstIndex, final int length)
+ {
+ positioningNotSupported();
+ }
+
+ public void getBytes(final long index, final byte[] dst, final int dstIndex, final int length)
+ {
+ positioningNotSupported();
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.buffers.ChannelBuffer#getBytes(int, java.nio.ByteBuffer)
+ */
+ public void getBytes(final int index, final ByteBuffer dst)
+ {
+ positioningNotSupported();
+ }
+
+ public void getBytes(final long index, final ByteBuffer dst)
+ {
+ positioningNotSupported();
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.buffers.ChannelBuffer#getBytes(int, java.io.OutputStream, int)
+ */
+ public void getBytes(final int index, final OutputStream out, final int length) throws IOException
+ {
+ positioningNotSupported();
+ }
+
+ public void getBytes(final long index, final OutputStream out, final int length) throws IOException
+ {
+ positioningNotSupported();
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.buffers.ChannelBuffer#getBytes(int, java.nio.channels.GatheringByteChannel, int)
+ */
+ public int getBytes(final int index, final GatheringByteChannel out, final int length) throws IOException
+ {
+ positioningNotSupported();
+ return 0;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.buffers.ChannelBuffer#getInt(int)
+ */
+ public int getInt(final int index)
+ {
+ positioningNotSupported();
+ return 0;
+ }
+
+ public int getInt(final long index)
+ {
+ positioningNotSupported();
+ return 0;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.buffers.ChannelBuffer#getLong(int)
+ */
+ public long getLong(final int index)
+ {
+ positioningNotSupported();
+ return 0;
+ }
+
+ public long getLong(final long index)
+ {
+ positioningNotSupported();
+ return 0;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.buffers.ChannelBuffer#getShort(int)
+ */
+ public short getShort(final int index)
+ {
+ positioningNotSupported();
+ return 0;
+ }
+
+ public short getShort(final long index)
+ {
+ return (short)(getByte(index) << 8 | getByte(index + 1) & 0xFF);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.buffers.ChannelBuffer#getUnsignedMedium(int)
+ */
+ public int getUnsignedMedium(final int index)
+ {
+ positioningNotSupported();
+ return 0;
+ }
+
+
+
+ public int getUnsignedMedium(final long index)
+ {
+ positioningNotSupported();
+ return 0;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.buffers.ChannelBuffer#setByte(int, byte)
+ */
+ public void setByte(final int index, final byte value)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.buffers.ChannelBuffer#setBytes(int, org.hornetq.api.core.buffers.ChannelBuffer, int, int)
+ */
+ public void setBytes(final int index, final HornetQBuffer src, final int srcIndex, final int length)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.buffers.ChannelBuffer#setBytes(int, byte[], int, int)
+ */
+ public void setBytes(final int index, final byte[] src, final int srcIndex, final int length)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.buffers.ChannelBuffer#setBytes(int, java.nio.ByteBuffer)
+ */
+ public void setBytes(final int index, final ByteBuffer src)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.buffers.ChannelBuffer#setBytes(int, java.io.InputStream, int)
+ */
+ public int setBytes(final int index, final InputStream in, final int length) throws IOException
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.buffers.ChannelBuffer#setBytes(int, java.nio.channels.ScatteringByteChannel, int)
+ */
+ public int setBytes(final int index, final ScatteringByteChannel in, final int length) throws IOException
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.buffers.ChannelBuffer#setInt(int, int)
+ */
+ public void setInt(final int index, final int value)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.buffers.ChannelBuffer#setLong(int, long)
+ */
+ public void setLong(final int index, final long value)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.buffers.ChannelBuffer#setMedium(int, int)
+ */
+ public void setMedium(final int index, final int value)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.buffers.ChannelBuffer#setShort(int, short)
+ */
+ public void setShort(final int index, final short value)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.buffers.ChannelBuffer#toByteBuffer(int, int)
+ */
+ public ByteBuffer toByteBuffer(final int index, final int length)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.buffers.ChannelBuffer#toString(int, int, java.lang.String)
+ */
+ public String toString(final int index, final int length, final String charsetName)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public int readerIndex()
+ {
+ // TODO
+ return 0;
+ }
+
+ public void readerIndex(final int readerIndex)
+ {
+ // TODO
+ }
+
+ public int writerIndex()
+ {
+ // TODO
+ return 0;
+ }
+
+ public long getSize()
+ {
+ // TODO
+ return 0;
+ }
+
+ public void writerIndex(final int writerIndex)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public void setIndex(final int readerIndex, final int writerIndex)
+ {
+ positioningNotSupported();
+ }
+
+ public void clear()
+ {
+ }
+
+ public boolean readable()
+ {
+ return true;
+ }
+
+ public boolean writable()
+ {
+ return false;
+ }
+
+ public int readableBytes()
+ {
+ return 1;
+ }
+
+ public int writableBytes()
+ {
+ return 0;
+ }
+
+ public void markReaderIndex()
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public void resetReaderIndex()
+ {
+ // TODO: reset positioning if possible
+ }
+
+ public void markWriterIndex()
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public void resetWriterIndex()
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public void discardReadBytes()
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public short getUnsignedByte(final int index)
+ {
+ return (short)(getByte(index) & 0xFF);
+ }
+
+ public int getUnsignedShort(final int index)
+ {
+ return getShort(index) & 0xFFFF;
+ }
+
+ public int getMedium(final int index)
+ {
+ int value = getUnsignedMedium(index);
+ if ((value & 0x800000) != 0)
+ {
+ value |= 0xff000000;
+ }
+ return value;
+ }
+
+ public long getUnsignedInt(final int index)
+ {
+ return getInt(index) & 0xFFFFFFFFL;
+ }
+
+ public void getBytes(int index, final byte[] dst)
+ {
+ // TODO: optimize this by using System.arraycopy
+ for (int i = 0; i < dst.length; i++)
+ {
+ dst[i] = getByte(index++);
+ }
+ }
+
+ public void getBytes(long index, final byte[] dst)
+ {
+ // TODO: optimize this by using System.arraycopy
+ for (int i = 0; i < dst.length; i++)
+ {
+ dst[i] = getByte(index++);
+ }
+ }
+
+ public void getBytes(final int index, final HornetQBuffer dst)
+ {
+ getBytes(index, dst, dst.writableBytes());
+ }
+
+ public void getBytes(final int index, final HornetQBuffer dst, final int length)
+ {
+ if (length > dst.writableBytes())
+ {
+ throw new IndexOutOfBoundsException();
+ }
+ getBytes(index, dst, dst.writerIndex(), length);
+ dst.writerIndex(dst.writerIndex() + length);
+ }
+
+ public void setBytes(final int index, final byte[] src)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public void setBytes(final int index, final HornetQBuffer src)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public void setBytes(final int index, final HornetQBuffer src, final int length)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public void setZero(final int index, final int length)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public short readUnsignedByte()
+ {
+ try
+ {
+ return (short)getStream().readUnsignedByte();
+ }
+ catch (Exception e)
+ {
+ throw new IllegalStateException (e.getMessage(), e);
+ }
+ }
+
+ public short readShort()
+ {
+ try
+ {
+ return (short)getStream().readShort();
+ }
+ catch (Exception e)
+ {
+ throw new IllegalStateException (e.getMessage(), e);
+ }
+ }
+
+ public int readUnsignedShort()
+ {
+ try
+ {
+ return (int)getStream().readUnsignedShort();
+ }
+ catch (Exception e)
+ {
+ throw new IllegalStateException (e.getMessage(), e);
+ }
+ }
+
+ public int readMedium()
+ {
+ int value = readUnsignedMedium();
+ if ((value & 0x800000) != 0)
+ {
+ value |= 0xff000000;
+ }
+ return value;
+ }
+
+
+ public int readUnsignedMedium()
+ {
+ return (readByte() & 0xff) << 16 | (readByte() & 0xff) << 8 | (readByte() & 0xff) << 0;
+ }
+
+ public int readInt()
+ {
+ try
+ {
+ return getStream().readInt();
+ }
+ catch (Exception e)
+ {
+ throw new IllegalStateException(e.getMessage(), e);
+ }
+ }
+
+ public int readInt(final int pos)
+ {
+ positioningNotSupported();
+ return 0;
+ }
+
+ public long readUnsignedInt()
+ {
+ return readInt() & 0xFFFFFFFFL;
+ }
+
+ public long readLong()
+ {
+ try
+ {
+ return getStream().readLong();
+ }
+ catch (Exception e)
+ {
+ throw new IllegalStateException(e.getMessage(), e);
+ }
+ }
+
+ public void readBytes(final byte[] dst, final int dstIndex, final int length)
+ {
+ try
+ {
+ getStream().read(dst, dstIndex, length);
+ }
+ catch (Exception e)
+ {
+ throw new IllegalStateException(e.getMessage(), e);
+ }
+ }
+
+ public void readBytes(final byte[] dst)
+ {
+ readBytes(dst, 0, dst.length);
+ }
+
+ public void readBytes(final HornetQBuffer dst)
+ {
+ readBytes(dst, dst.writableBytes());
+ }
+
+ public void readBytes(final HornetQBuffer dst, final int length)
+ {
+ if (length > dst.writableBytes())
+ {
+ throw new IndexOutOfBoundsException();
+ }
+ readBytes(dst, dst.writerIndex(), length);
+ dst.writerIndex(dst.writerIndex() + length);
+ }
+
+ public void readBytes(final HornetQBuffer dst, final int dstIndex, final int length)
+ {
+ byte[] destBytes = new byte[length];
+ readBytes(destBytes);
+ dst.setBytes(dstIndex, destBytes);
+ }
+
+ public void readBytes(final ByteBuffer dst)
+ {
+ byte bytesToGet[] = new byte[dst.remaining()];
+ readBytes(bytesToGet);
+ dst.put(bytesToGet);
+ }
+
+ public int readBytes(final GatheringByteChannel out, final int length) throws IOException
+ {
+ throw new IllegalStateException("Not implemented!");
+ }
+
+ public void readBytes(final OutputStream out, final int length) throws IOException
+ {
+ throw new IllegalStateException("Not implemented!");
+ }
+
+ public void skipBytes(final int length)
+ {
+
+ try
+ {
+ for (int i = 0 ; i < length; i++)
+ {
+ getStream().read();
+ }
+ }
+ catch (Exception e)
+ {
+ throw new IllegalStateException(e.getMessage(), e);
+ }
+ }
+
+ public void writeByte(final byte value)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public void writeShort(final short value)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public void writeMedium(final int value)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public void writeInt(final int value)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public void writeLong(final long value)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public void writeBytes(final byte[] src, final int srcIndex, final int length)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public void writeBytes(final byte[] src)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public void writeBytes(final HornetQBuffer src)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public void writeBytes(final HornetQBuffer src, final int length)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public void writeBytes(final ByteBuffer src)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public int writeBytes(final InputStream in, final int length) throws IOException
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public int writeBytes(final ScatteringByteChannel in, final int length) throws IOException
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public void writeZero(final int length)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public ByteBuffer toByteBuffer()
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public ByteBuffer[] toByteBuffers()
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public ByteBuffer[] toByteBuffers(final int index, final int length)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public String toString(final String charsetName)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public Object getUnderlyingBuffer()
+ {
+ return this;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.spi.core.remoting.HornetQBuffer#readBoolean()
+ */
+ public boolean readBoolean()
+ {
+ return readByte() != 0;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.spi.core.remoting.HornetQBuffer#readChar()
+ */
+ public char readChar()
+ {
+ return (char)readShort();
+ }
+
+ public char getChar(final int index)
+ {
+ return (char)getShort(index);
+ }
+
+ public double getDouble(final int index)
+ {
+ return Double.longBitsToDouble(getLong(index));
+ }
+
+ public float getFloat(final int index)
+ {
+ return Float.intBitsToFloat(getInt(index));
+ }
+
+ public HornetQBuffer readBytes(final int length)
+ {
+ byte bytesToGet[] = new byte[length];
+ readBytes(bytesToGet);
+ return HornetQBuffers.wrappedBuffer(bytesToGet);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.spi.core.remoting.HornetQBuffer#readDouble()
+ */
+ public double readDouble()
+ {
+ return Double.longBitsToDouble(readLong());
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.spi.core.remoting.HornetQBuffer#readFloat()
+ */
+ public float readFloat()
+ {
+ return Float.intBitsToFloat(readInt());
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.spi.core.remoting.HornetQBuffer#readNullableSimpleString()
+ */
+ public SimpleString readNullableSimpleString()
+ {
+ int b = readByte();
+ if (b == DataConstants.NULL)
+ {
+ return null;
+ }
+ else
+ {
+ return readSimpleString();
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.spi.core.remoting.HornetQBuffer#readNullableString()
+ */
+ public String readNullableString()
+ {
+ int b = readByte();
+ if (b == DataConstants.NULL)
+ {
+ return null;
+ }
+ else
+ {
+ return readString();
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.spi.core.remoting.HornetQBuffer#readSimpleString()
+ */
+ public SimpleString readSimpleString()
+ {
+ int len = readInt();
+ byte[] data = new byte[len];
+ readBytes(data);
+ return new SimpleString(data);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.spi.core.remoting.HornetQBuffer#readString()
+ */
+ public String readString()
+ {
+ int len = readInt();
+
+ if (len < 9)
+ {
+ char[] chars = new char[len];
+ for (int i = 0; i < len; i++)
+ {
+ chars[i] = (char)readShort();
+ }
+ return new String(chars);
+ }
+ else if (len < 0xfff)
+ {
+ return readUTF();
+ }
+ else
+ {
+ return readSimpleString().toString();
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.spi.core.remoting.HornetQBuffer#readUTF()
+ */
+ public String readUTF()
+ {
+ return UTF8Util.readUTF(this);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.spi.core.remoting.HornetQBuffer#writeBoolean(boolean)
+ */
+ public void writeBoolean(final boolean val)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.spi.core.remoting.HornetQBuffer#writeChar(char)
+ */
+ public void writeChar(final char val)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.spi.core.remoting.HornetQBuffer#writeDouble(double)
+ */
+ public void writeDouble(final double val)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.spi.core.remoting.HornetQBuffer#writeFloat(float)
+ */
+ public void writeFloat(final float val)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.spi.core.remoting.HornetQBuffer#writeNullableSimpleString(org.hornetq.util.SimpleString)
+ */
+ public void writeNullableSimpleString(final SimpleString val)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.spi.core.remoting.HornetQBuffer#writeNullableString(java.lang.String)
+ */
+ public void writeNullableString(final String val)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.spi.core.remoting.HornetQBuffer#writeSimpleString(org.hornetq.util.SimpleString)
+ */
+ public void writeSimpleString(final SimpleString val)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.spi.core.remoting.HornetQBuffer#writeString(java.lang.String)
+ */
+ public void writeString(final String val)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.spi.core.remoting.HornetQBuffer#writeUTF(java.lang.String)
+ */
+ public void writeUTF(final String utf)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.buffers.ChannelBuffer#compareTo(org.hornetq.api.core.buffers.ChannelBuffer)
+ */
+ public int compareTo(final HornetQBuffer buffer)
+ {
+ return -1;
+ }
+
+ public HornetQBuffer copy()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public HornetQBuffer slice(final int index, final int length)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ /**
+ * @param body
+ */
+ // Inner classes -------------------------------------------------
+
+ public ChannelBuffer channelBuffer()
+ {
+ return null;
+ }
+
+ public HornetQBuffer copy(final int index, final int length)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public HornetQBuffer duplicate()
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public HornetQBuffer readSlice(final int length)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public void setChar(final int index, final char value)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public void setDouble(final int index, final double value)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public void setFloat(final int index, final float value)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public HornetQBuffer slice()
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public void writeBytes(final HornetQBuffer src, final int srcIndex, final int length)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+}
Added: branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/GZipUtil.java
===================================================================
--- branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/GZipUtil.java (rev 0)
+++ branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/GZipUtil.java 2010-08-20 22:30:33 UTC (rev 9575)
@@ -0,0 +1,167 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.utils;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
+import java.util.concurrent.Executor;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.core.logging.Logger;
+
+/**
+ * A GZipUtil
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class GZipUtil
+{
+
+ private static final Logger log = Logger.getLogger(GZipUtil.class);
+
+ /**
+ * This will start a GZipOutputStream, using another thread through a Pipe
+ * TODO: We would need an inverted GZipInputStream (that would compress on reading) to avoid creating this thread (through an executor)
+ * @param inputStreamParameter
+ * @param compress = true if compressing, false if decompressing
+ * @return
+ * @throws HornetQException
+ */
+ public static InputStream pipeGZip(final InputStream inputStreamParameter, final boolean compress, final Executor threadPool) throws HornetQException
+ {
+ final InputStream input;
+ if (compress)
+ {
+ input = inputStreamParameter;
+ }
+ else
+ {
+ try
+ {
+ input = new GZIPInputStream(new BufferedInputStream(inputStreamParameter));
+ }
+ catch (IOException e)
+ {
+ throw new HornetQException(HornetQException.LARGE_MESSAGE_ERROR_BODY, e.getMessage(), e);
+ }
+ }
+
+ final PipedOutputStream pipedOut = new PipedOutputStream();
+ final PipedInputStream pipedInput = new PipedInputStream();
+ try
+ {
+ pipedOut.connect(pipedInput);
+ }
+ catch (IOException e)
+ {
+ throw new HornetQException(HornetQException.LARGE_MESSAGE_ERROR_BODY, e.getMessage(), e);
+ }
+
+ threadPool.execute(new Runnable()
+ {
+
+ public void run()
+ {
+ byte readBytes[] = new byte[1024];
+ int size = 0;
+
+ try
+ {
+ OutputStream out;
+ if (compress)
+ {
+ BufferedOutputStream buffOut = new BufferedOutputStream(pipedOut);
+ out = new GZIPOutputStream(buffOut);
+ }
+ else
+ {
+ out = new BufferedOutputStream(pipedOut);
+ }
+ while ((size = input.read(readBytes)) > 0)
+ {
+ System.out.println("Read " + size + " bytes on compressing thread");
+ out.write(readBytes, 0, size);
+ }
+ System.out.println("Finished compressing");
+ out.close();
+ }
+ catch (Exception e)
+ {
+ log.warn(e.getMessage());
+ try
+ {
+ pipedOut.close();
+ }
+ catch (Exception ignored)
+ {
+ }
+ }
+
+ }
+ });
+
+ return pipedInput;
+ }
+
+ public static void deZip(final InputStream input, final OutputStream output, final Executor threadPool) throws HornetQException
+ {
+ threadPool.execute(new Runnable()
+ {
+
+ public void run()
+ {
+ byte readBytes[] = new byte[1024];
+ int size = 0;
+
+ OutputStream out = null;
+
+ try
+ {
+ BufferedOutputStream buffOut = new BufferedOutputStream(output);
+ out = new GZIPOutputStream(buffOut);
+ while ((size = input.read(readBytes)) > 0)
+ {
+ System.out.println("Read " + size + " bytes on compressing thread");
+ out.write(readBytes, 0, size);
+ }
+ System.out.println("Finished compressing");
+ out.close();
+ }
+ catch (Exception e)
+ {
+ log.warn(e.getMessage());
+ try
+ {
+ out.close();
+ }
+ catch (Exception ignored)
+ {
+ }
+ }
+
+ }
+ });
+ }
+
+
+}
Added: branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/HornetQBufferInputStream.java
===================================================================
--- branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/HornetQBufferInputStream.java (rev 0)
+++ branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/HornetQBufferInputStream.java 2010-08-20 22:30:33 UTC (rev 9575)
@@ -0,0 +1,182 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.utils;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.hornetq.api.core.HornetQBuffer;
+
+/**
+ * Used to send large messages
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class HornetQBufferInputStream extends InputStream
+{
+
+ /* (non-Javadoc)
+ * @see java.io.InputStream#read()
+ */
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+ private HornetQBuffer bb;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public HornetQBufferInputStream(final HornetQBuffer paramByteBuffer)
+ {
+ bb = paramByteBuffer;
+ }
+
+ @Override
+ public int read() throws IOException
+ {
+ if (bb == null)
+ {
+ throw new IOException("read on a closed InputStream");
+ }
+
+ if (remainingBytes() == 0)
+ {
+ return -1;
+ }
+ else
+ {
+ return bb.readByte();
+ }
+ }
+
+ @Override
+ public int read(final byte[] byteArray) throws IOException
+ {
+ if (bb == null)
+ {
+ throw new IOException("read on a closed InputStream");
+ }
+
+ return read(byteArray, 0, byteArray.length);
+ }
+
+ @Override
+ public int read(final byte[] byteArray, final int off, final int len) throws IOException
+ {
+ if (bb == null)
+ {
+ throw new IOException("read on a closed InputStream");
+ }
+
+ if (byteArray == null)
+ {
+ throw new NullPointerException();
+ }
+ if (off < 0 || off > byteArray.length || len < 0 || off + len > byteArray.length || off + len < 0)
+ {
+ throw new IndexOutOfBoundsException();
+ }
+ if (len == 0)
+ {
+ return 0;
+ }
+
+ int size = Math.min(remainingBytes(), len);
+
+ if (size == 0)
+ {
+ return -1;
+ }
+
+ bb.readBytes(byteArray, off, size);
+ return size;
+ }
+
+ @Override
+ public long skip(final long len) throws IOException
+ {
+ if (bb == null)
+ {
+ throw new IOException("skip on a closed InputStream");
+ }
+
+ if (len <= 0L)
+ {
+ return 0L;
+ }
+
+ int size = Math.min(remainingBytes(), (int) len);
+
+ bb.skipBytes((int)size);
+
+ return size;
+ }
+
+ @Override
+ public int available() throws IOException
+ {
+ if (bb == null)
+ {
+ throw new IOException("available on a closed InputStream");
+ }
+
+ return remainingBytes();
+ }
+
+ @Override
+ public void close() throws IOException
+ {
+ bb = null;
+ }
+
+ @Override
+ public synchronized void mark(final int paramInt)
+ {
+ }
+
+ @Override
+ public synchronized void reset() throws IOException
+ {
+ throw new IOException("mark/reset not supported");
+ }
+
+ @Override
+ public boolean markSupported()
+ {
+ return false;
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ /**
+ * @return
+ */
+ private int remainingBytes()
+ {
+ return bb.writerIndex() - bb.readerIndex();
+ }
+
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: branches/Branch_Large_Message_Compression/tests/config/hornetq-jms-for-JMSServerDeployerTest.xml
===================================================================
--- branches/Branch_Large_Message_Compression/tests/config/hornetq-jms-for-JMSServerDeployerTest.xml 2010-08-20 22:28:06 UTC (rev 9574)
+++ branches/Branch_Large_Message_Compression/tests/config/hornetq-jms-for-JMSServerDeployerTest.xml 2010-08-20 22:30:33 UTC (rev 9575)
@@ -20,6 +20,7 @@
<producer-window-size>7712652</producer-window-size>
<producer-max-rate>789</producer-max-rate>
<min-large-message-size>12</min-large-message-size>
+ <compress-large-messages>true</compress-large-messages>
<client-id>TestClientID</client-id>
<dups-ok-batch-size>3456</dups-ok-batch-size>
<transaction-batch-size>4567</transaction-batch-size>
Modified: branches/Branch_Large_Message_Compression/tests/jms-tests/src/org/hornetq/jms/tests/CTSMiscellaneousTest.java
===================================================================
--- branches/Branch_Large_Message_Compression/tests/jms-tests/src/org/hornetq/jms/tests/CTSMiscellaneousTest.java 2010-08-20 22:28:06 UTC (rev 9574)
+++ branches/Branch_Large_Message_Compression/tests/jms-tests/src/org/hornetq/jms/tests/CTSMiscellaneousTest.java 2010-08-20 22:30:33 UTC (rev 9575)
@@ -72,6 +72,7 @@
HornetQClient.DEFAULT_CALL_TIMEOUT,
HornetQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT,
HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+ HornetQClient.DEFAULT_COMPRESS_LARGE_MESSAGES,
HornetQClient.DEFAULT_CONSUMER_WINDOW_SIZE,
HornetQClient.DEFAULT_CONSUMER_MAX_RATE,
HornetQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE,
Modified: branches/Branch_Large_Message_Compression/tests/jms-tests/src/org/hornetq/jms/tests/JMSTestCase.java
===================================================================
--- branches/Branch_Large_Message_Compression/tests/jms-tests/src/org/hornetq/jms/tests/JMSTestCase.java 2010-08-20 22:28:06 UTC (rev 9574)
+++ branches/Branch_Large_Message_Compression/tests/jms-tests/src/org/hornetq/jms/tests/JMSTestCase.java 2010-08-20 22:30:33 UTC (rev 9575)
@@ -68,6 +68,7 @@
HornetQClient.DEFAULT_CALL_TIMEOUT,
HornetQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT,
HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+ HornetQClient.DEFAULT_COMPRESS_LARGE_MESSAGES,
HornetQClient.DEFAULT_CONSUMER_WINDOW_SIZE,
HornetQClient.DEFAULT_CONSUMER_MAX_RATE,
HornetQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE,
Modified: branches/Branch_Large_Message_Compression/tests/jms-tests/src/org/hornetq/jms/tests/tools/container/LocalTestServer.java
===================================================================
--- branches/Branch_Large_Message_Compression/tests/jms-tests/src/org/hornetq/jms/tests/tools/container/LocalTestServer.java 2010-08-20 22:28:06 UTC (rev 9574)
+++ branches/Branch_Large_Message_Compression/tests/jms-tests/src/org/hornetq/jms/tests/tools/container/LocalTestServer.java 2010-08-20 22:30:33 UTC (rev 9575)
@@ -298,6 +298,7 @@
HornetQClient.DEFAULT_CALL_TIMEOUT,
HornetQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT,
HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+ HornetQClient.DEFAULT_COMPRESS_LARGE_MESSAGES,
prefetchSize,
HornetQClient.DEFAULT_CONSUMER_MAX_RATE,
HornetQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE,
Added: branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/client/CompressedLargeMessageTest.java
===================================================================
--- branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/client/CompressedLargeMessageTest.java (rev 0)
+++ branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/client/CompressedLargeMessageTest.java 2010-08-20 22:30:33 UTC (rev 9575)
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.client;
+
+import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
+
+/**
+ * A CompressedLargeMessageTest
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class CompressedLargeMessageTest extends LargeMessageTest
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ protected ClientSessionFactoryImpl createFactory(final boolean isNetty)
+ {
+ ClientSessionFactoryImpl factory = super.createFactory(isNetty);
+ factory.setCompressLargeMessages(true);
+ return factory;
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
===================================================================
--- branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java 2010-08-20 22:28:06 UTC (rev 9574)
+++ branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java 2010-08-20 22:30:33 UTC (rev 9575)
@@ -151,7 +151,7 @@
public void doTestLargeBuffer(boolean transacted) throws Exception
{
final int journalsize = 100 * 1024;
- final int messageSize = 3 * journalsize;
+ final int messageSize = 3 * journalsize + 5;
// final int messageSize = 5 * 1024;
ClientSession session = null;
@@ -169,6 +169,8 @@
server.start();
ClientSessionFactory sf = createFactory(isNetty());
+
+ sf.setCompressLargeMessages(true);
session = sf.createSession(!transacted, !transacted, 0);
Modified: branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/jms/FloodServerTest.java
===================================================================
--- branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/jms/FloodServerTest.java 2010-08-20 22:28:06 UTC (rev 9574)
+++ branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/jms/FloodServerTest.java 2010-08-20 22:30:33 UTC (rev 9575)
@@ -136,6 +136,7 @@
callTimeout,
HornetQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT,
HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+ HornetQClient.DEFAULT_COMPRESS_LARGE_MESSAGES,
HornetQClient.DEFAULT_CONSUMER_WINDOW_SIZE,
HornetQClient.DEFAULT_CONSUMER_MAX_RATE,
HornetQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE,
Modified: branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/jms/client/PreACKJMSTest.java
===================================================================
--- branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/jms/client/PreACKJMSTest.java 2010-08-20 22:28:06 UTC (rev 9574)
+++ branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/jms/client/PreACKJMSTest.java 2010-08-20 22:30:33 UTC (rev 9575)
@@ -206,6 +206,7 @@
callTimeout,
HornetQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT,
HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+ HornetQClient.DEFAULT_COMPRESS_LARGE_MESSAGES,
HornetQClient.DEFAULT_CONSUMER_WINDOW_SIZE,
HornetQClient.DEFAULT_CONSUMER_MAX_RATE,
HornetQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE,
Modified: branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/jms/client/ReSendMessageTest.java
===================================================================
--- branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/jms/client/ReSendMessageTest.java 2010-08-20 22:28:06 UTC (rev 9574)
+++ branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/jms/client/ReSendMessageTest.java 2010-08-20 22:30:33 UTC (rev 9575)
@@ -305,6 +305,7 @@
callTimeout,
true,
HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+ HornetQClient.DEFAULT_COMPRESS_LARGE_MESSAGES,
HornetQClient.DEFAULT_CONSUMER_WINDOW_SIZE,
HornetQClient.DEFAULT_CONSUMER_MAX_RATE,
HornetQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE,
Modified: branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/jms/client/SessionClosedOnRemotingConnectionFailureTest.java
===================================================================
--- branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/jms/client/SessionClosedOnRemotingConnectionFailureTest.java 2010-08-20 22:28:06 UTC (rev 9574)
+++ branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/jms/client/SessionClosedOnRemotingConnectionFailureTest.java 2010-08-20 22:30:33 UTC (rev 9575)
@@ -74,6 +74,7 @@
HornetQClient.DEFAULT_CALL_TIMEOUT,
HornetQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT,
HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+ HornetQClient.DEFAULT_COMPRESS_LARGE_MESSAGES,
HornetQClient.DEFAULT_CONSUMER_WINDOW_SIZE,
HornetQClient.DEFAULT_CONSUMER_MAX_RATE,
HornetQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE,
Modified: branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/jms/client/TextMessageTest.java
===================================================================
--- branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/jms/client/TextMessageTest.java 2010-08-20 22:28:06 UTC (rev 9574)
+++ branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/jms/client/TextMessageTest.java 2010-08-20 22:30:33 UTC (rev 9575)
@@ -241,6 +241,7 @@
callTimeout,
true,
HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+ HornetQClient.DEFAULT_COMPRESS_LARGE_MESSAGES,
HornetQClient.DEFAULT_CONSUMER_WINDOW_SIZE,
HornetQClient.DEFAULT_CONSUMER_MAX_RATE,
HornetQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE,
Modified: branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/jms/divert/DivertAndACKClientTest.java
===================================================================
--- branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/jms/divert/DivertAndACKClientTest.java 2010-08-20 22:28:06 UTC (rev 9574)
+++ branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/jms/divert/DivertAndACKClientTest.java 2010-08-20 22:30:33 UTC (rev 9575)
@@ -152,6 +152,7 @@
callTimeout,
HornetQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT,
HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+ HornetQClient.DEFAULT_COMPRESS_LARGE_MESSAGES,
HornetQClient.DEFAULT_CONSUMER_WINDOW_SIZE,
HornetQClient.DEFAULT_CONSUMER_MAX_RATE,
HornetQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE,
Modified: branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/jms/server/config/JMSServerConfigParserTest.java
===================================================================
--- branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/jms/server/config/JMSServerConfigParserTest.java 2010-08-20 22:28:06 UTC (rev 9574)
+++ branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/jms/server/config/JMSServerConfigParserTest.java 2010-08-20 22:30:33 UTC (rev 9575)
@@ -74,6 +74,7 @@
assertEquals(7712652, cfConfig.getProducerWindowSize());
assertEquals(789, cfConfig.getProducerMaxRate());
assertEquals(12, cfConfig.getMinLargeMessageSize());
+ assertEquals(true, cfConfig.isCompressLargeMessages());
assertEquals("TestClientID", cfConfig.getClientID());
assertEquals(3456, cfConfig.getDupsOKBatchSize());
assertEquals(4567, cfConfig.getTransactionBatchSize());
Modified: branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/unit/core/client/impl/LargeMessageBufferTest.java
===================================================================
--- branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/unit/core/client/impl/LargeMessageBufferTest.java 2010-08-20 22:28:06 UTC (rev 9574)
+++ branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/unit/core/client/impl/LargeMessageBufferTest.java 2010-08-20 22:30:33 UTC (rev 9575)
@@ -15,6 +15,7 @@
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
@@ -35,12 +36,14 @@
import org.hornetq.api.core.client.MessageHandler;
import org.hornetq.core.client.impl.ClientConsumerInternal;
import org.hornetq.core.client.impl.ClientMessageInternal;
+import org.hornetq.core.client.impl.ClientSessionInternal;
import org.hornetq.core.client.impl.LargeMessageBufferImpl;
import org.hornetq.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveContinuationMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveLargeMessage;
import org.hornetq.tests.util.RandomUtil;
import org.hornetq.tests.util.UnitTestCase;
+import org.hornetq.utils.HornetQBufferInputStream;
/**
* A LargeMessageBufferUnitTest
@@ -56,7 +59,7 @@
// Attributes ----------------------------------------------------
- static int tmpFileCounter = 0;
+ static int tmpFileCounter = 0;
// Static --------------------------------------------------------
@@ -67,13 +70,13 @@
protected void setUp() throws Exception
{
super.setUp();
-
+
tmpFileCounter++;
File tmp = new File(getTestDir());
tmp.mkdirs();
}
-
+
protected void tearDown() throws Exception
{
super.tearDown();
@@ -166,6 +169,20 @@
}
}
+ public void testReadIntegersOverStream() throws Exception
+ {
+ LargeMessageBufferImpl buffer = createBufferWithIntegers(3, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15);
+ HornetQBufferInputStream is = new HornetQBufferInputStream(buffer);
+ DataInputStream dataInput = new DataInputStream(is);
+
+ for (int i = 1; i <= 15; i++)
+ {
+ Assert.assertEquals(i, dataInput.readInt());
+ }
+
+ assertEquals(-1, dataInput.read());
+ }
+
// testing void getBytes(int index, ChannelBuffer dst, int dstIndex, int length)
public void testReadLongs() throws Exception
{
@@ -186,6 +203,20 @@
}
}
+ public void testReadLongsOverStream() throws Exception
+ {
+ LargeMessageBufferImpl buffer = createBufferWithLongs(3, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15);
+ HornetQBufferInputStream is = new HornetQBufferInputStream(buffer);
+ DataInputStream dataInput = new DataInputStream(is);
+
+ for (int i = 1; i <= 15; i++)
+ {
+ Assert.assertEquals(i, dataInput.readLong());
+ }
+
+ assertEquals(-1, dataInput.read());
+ }
+
public void testReadData() throws Exception
{
HornetQBuffer dynamic = HornetQBuffers.dynamicBuffer(1);
@@ -315,14 +346,14 @@
Assert.assertEquals(i, bytes[i]);
}
}
-
+
public void testSplitBufferOnFile() throws Exception
{
LargeMessageBufferImpl outBuffer = new LargeMessageBufferImpl(new FakeConsumerInternal(),
- 1024 * 1024,
- 1,
- getTestFile(),
- 1024);
+ 1024 * 1024,
+ 1,
+ getTestFile(),
+ 1024);
try
{
@@ -525,6 +556,36 @@
}
+ public void testReadBytesOnStreaming() throws Exception
+ {
+ byte[] byteArray = new byte[1024];
+ for (int i = 0; i < byteArray.length; i++)
+ {
+ byteArray[i] = getSamplebyte(i);
+ }
+
+ HornetQBuffer splitbuffer = splitBuffer(3, byteArray);
+
+ HornetQBufferInputStream is = new HornetQBufferInputStream(splitbuffer);
+
+ for (int i = 0; i < 100; i++)
+ {
+ assertEquals(getSamplebyte(i), (byte)is.read());
+ }
+
+ for (int i = 100; i < byteArray.length; i += 10)
+ {
+ byte readBytes[] = new byte[10];
+
+ int size = is.read(readBytes);
+
+ for (int j = 0; j < size; j++)
+ {
+ assertEquals(getSamplebyte(i + j), readBytes[j]);
+ }
+ }
+ }
+
/**
* @return
*/
@@ -795,6 +856,15 @@
return null;
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.client.impl.ClientConsumerInternal#getSession()
+ */
+ public ClientSessionInternal getSession()
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
}
}
Added: branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/unit/util/HornetQBufferInputStreamTest.java
===================================================================
--- branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/unit/util/HornetQBufferInputStreamTest.java (rev 0)
+++ branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/unit/util/HornetQBufferInputStreamTest.java 2010-08-20 22:30:33 UTC (rev 9575)
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.unit.util;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.HornetQBuffers;
+import org.hornetq.tests.util.UnitTestCase;
+import org.hornetq.utils.HornetQBufferInputStream;
+
+/**
+ * A HornetQInputStreamTest
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class HornetQBufferInputStreamTest extends UnitTestCase
+{
+
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public void testReadBytes() throws Exception
+ {
+ byte bytes[] = new byte[10*1024];
+ for (int i = 0 ; i < bytes.length; i++)
+ {
+ bytes[i] = getSamplebyte(i);
+ }
+
+ HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(bytes);
+ HornetQBufferInputStream is = new HornetQBufferInputStream(buffer);
+
+ // First read byte per byte
+ for (int i = 0 ; i < 1024; i++)
+ {
+ assertEquals(getSamplebyte(i), is.read());
+ }
+
+ // Second, read in chunks
+ for (int i = 1; i < 10; i++)
+ {
+ bytes = new byte[1024];
+ is.read(bytes);
+ for (int j = 0 ; j < bytes.length; j++)
+ {
+ assertEquals(getSamplebyte(i * 1024 + j), bytes[j]);
+ }
+
+ }
+
+ assertEquals(-1, is.read());
+
+
+ bytes = new byte[1024];
+
+ int sizeRead = is.read(bytes);
+
+ assertEquals(-1, sizeRead);
+
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/util/JMSTestBase.java
===================================================================
--- branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/util/JMSTestBase.java 2010-08-20 22:28:06 UTC (rev 9574)
+++ branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/util/JMSTestBase.java 2010-08-20 22:30:33 UTC (rev 9575)
@@ -190,6 +190,7 @@
callTimeout,
HornetQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT,
HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+ HornetQClient.DEFAULT_COMPRESS_LARGE_MESSAGES,
HornetQClient.DEFAULT_CONSUMER_WINDOW_SIZE,
HornetQClient.DEFAULT_CONSUMER_MAX_RATE,
HornetQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE,
13 years, 9 months
JBoss hornetq SVN: r9574 - branches.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-08-20 18:28:06 -0400 (Fri, 20 Aug 2010)
New Revision: 9574
Added:
branches/Branch_Large_Message_Compression/
Log:
Large Message Compression temporary Branch
Copied: branches/Branch_Large_Message_Compression (from rev 9573, trunk)
13 years, 9 months
JBoss hornetq SVN: r9573 - branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-08-20 10:44:23 -0400 (Fri, 20 Aug 2010)
New Revision: 9573
Modified:
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java
Log:
fix testMultipleClusterConnections
* make sure that cluster connections cluster2 & cluster3's address are not matched by cluster conn cluster1
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java 2010-08-20 14:34:03 UTC (rev 9572)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java 2010-08-20 14:44:23 UTC (rev 9573)
@@ -1142,8 +1142,8 @@
public void testMultipleClusterConnections() throws Exception
{
- setupClusterConnection("cluster2", 0, 1, "queues2", false, 1, isNetty());
- setupClusterConnection("cluster3", 0, 1, "queues3", false, 1, isNetty());
+ setupClusterConnection("cluster2", 0, 1, "q2", false, 1, isNetty());
+ setupClusterConnection("cluster3", 0, 1, "q3", false, 1, isNetty());
startServers(1, 0);
@@ -1154,17 +1154,17 @@
createQueue(0, "queues.testaddress", "queue0", null, false);
createQueue(0, "queues.testaddress", "queue1", null, false);
- createQueue(0, "queues2.testaddress", "queue2", null, false);
- createQueue(0, "queues2.testaddress", "queue3", null, false);
- createQueue(0, "queues3.testaddress", "queue4", null, false);
- createQueue(0, "queues3.testaddress", "queue5", null, false);
+ createQueue(0, "q2.testaddress", "queue2", null, false);
+ createQueue(0, "q2.testaddress", "queue3", null, false);
+ createQueue(0, "q3.testaddress", "queue4", null, false);
+ createQueue(0, "q3.testaddress", "queue5", null, false);
createQueue(1, "queues.testaddress", "queue6", null, false);
createQueue(1, "queues.testaddress", "queue7", null, false);
- createQueue(1, "queues2.testaddress", "queue8", null, false);
- createQueue(1, "queues2.testaddress", "queue9", null, false);
- createQueue(1, "queues3.testaddress", "queue10", null, false);
- createQueue(1, "queues3.testaddress", "queue11", null, false);
+ createQueue(1, "q2.testaddress", "queue8", null, false);
+ createQueue(1, "q2.testaddress", "queue9", null, false);
+ createQueue(1, "q3.testaddress", "queue10", null, false);
+ createQueue(1, "q3.testaddress", "queue11", null, false);
addConsumer(0, 0, "queue0", null);
addConsumer(1, 0, "queue1", null);
@@ -1183,11 +1183,11 @@
waitForBindings(0, "queues.testaddress", 2, 2, true);
waitForBindings(0, "queues.testaddress", 2, 2, false);
- waitForBindings(0, "queues2.testaddress", 2, 2, true);
- waitForBindings(0, "queues2.testaddress", 2, 2, false);
+ waitForBindings(0, "q2.testaddress", 2, 2, true);
+ waitForBindings(0, "q2.testaddress", 2, 2, false);
- waitForBindings(0, "queues3.testaddress", 2, 2, true);
- waitForBindings(0, "queues3.testaddress", 2, 2, false);
+ waitForBindings(0, "q3.testaddress", 2, 2, true);
+ waitForBindings(0, "q3.testaddress", 2, 2, false);
send(0, "queues.testaddress", 10, false, null);
@@ -1195,13 +1195,13 @@
verifyNotReceive(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11);
- send(0, "queues2.testaddress", 10, false, null);
+ send(0, "q2.testaddress", 10, false, null);
verifyReceiveAll(10, 2, 3, 8, 9);
verifyNotReceive(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11);
- send(0, "queues3.testaddress", 10, false, null);
+ send(0, "q3.testaddress", 10, false, null);
verifyReceiveAll(10, 4, 5, 10, 11);
13 years, 9 months
JBoss hornetq SVN: r9572 - trunk/src/main/org/hornetq/core/server/impl.
by do-not-reply@jboss.org
Author: timfox
Date: 2010-08-20 10:34:03 -0400 (Fri, 20 Aug 2010)
New Revision: 9572
Modified:
trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
Log:
make sure inner delivery loop doesn't sping too long
Modified: trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2010-08-20 13:51:18 UTC (rev 9571)
+++ trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2010-08-20 14:34:03 UTC (rev 9572)
@@ -1070,6 +1070,15 @@
while (handled < numRefs)
{
+ if (handled == MAX_DELIVERIES_IN_LOOP)
+ {
+ // Schedule another one - we do this to prevent a single thread getting caught up in this loop for too long
+
+ deliverAsync();
+
+ return;
+ }
+
ConsumerHolder holder = consumerList.get(pos);
Consumer consumer = holder.consumer;
@@ -1160,14 +1169,7 @@
if (pos == size)
{
pos = 0;
- }
-
- if (handled == MAX_DELIVERIES_IN_LOOP)
- {
- // Schedule another one - we do this to prevent a single thread getting caught up in this loop for too long
-
- deliverAsync();
- }
+ }
}
}
13 years, 9 months
JBoss hornetq SVN: r9571 - in branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster: failover and 1 other directory.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-08-20 09:51:18 -0400 (Fri, 20 Aug 2010)
New Revision: 9571
Modified:
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterWithBackupTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverReplicationTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/ReplicatedDistributionTest.java
Log:
added methods to create HornetQ servers using fake FileLock
(required to have both live and backup servers running inside the same VM)
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2010-08-20 13:45:58 UTC (rev 9570)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2010-08-20 13:51:18 UTC (rev 9571)
@@ -1197,27 +1197,48 @@
{
setupServer(node, fileStorage, netty, backup, -1);
}
+
+ protected void setupServer(final int node, final boolean fileStorage, final boolean netty, final boolean backup, final boolean useFakeLock)
+ {
+ setupServer(node, fileStorage, netty, backup, -1);
+ }
protected void setupServer(final int node, final boolean fileStorage, final boolean netty, final int backupNode)
{
- setupServer(node, fileStorage, netty, false, backupNode);
+ setupServer(node, fileStorage, netty, false, backupNode, false);
}
+ protected void setupServer(final int node, final boolean fileStorage, final boolean netty, final int backupNode, final boolean useFakeLock)
+ {
+ setupServer(node, fileStorage, netty, false, backupNode, useFakeLock);
+ }
+
protected void setupServer(final int node,
final boolean fileStorage,
final boolean netty,
final boolean backup,
final int backupNode)
{
- setupServer(node, fileStorage, true, netty, backup, backupNode);
+ setupServer(node, fileStorage, netty, backup, backupNode, false);
}
+
+ protected void setupServer(final int node,
+ final boolean fileStorage,
+ final boolean netty,
+ final boolean backup,
+ final int backupNode,
+ final boolean useFakeLock)
+ {
+ setupServer(node, fileStorage, true, netty, backup, backupNode, useFakeLock);
+ }
protected void setupServer(final int node,
final boolean fileStorage,
final boolean sharedStorage,
final boolean netty,
final boolean backup,
- final int backupNode)
+ final int backupNode,
+ final boolean useFakeLock)
{
if (servers[node] != null)
{
@@ -1259,11 +1280,25 @@
if (fileStorage)
{
- server = HornetQServers.newHornetQServer(configuration);
+ if (useFakeLock)
+ {
+ server = createFakeLockServer(true, configuration);
+ }
+ else
+ {
+ server = HornetQServers.newHornetQServer(configuration);
+ }
}
else
{
- server = HornetQServers.newHornetQServer(configuration, false);
+ if (useFakeLock)
+ {
+ server = createFakeLockServer(false, configuration);
+ }
+ else
+ {
+ server = HornetQServers.newHornetQServer(configuration, false);
+ }
}
servers[node] = server;
}
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterWithBackupTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterWithBackupTest.java 2010-08-20 13:45:58 UTC (rev 9570)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterWithBackupTest.java 2010-08-20 13:51:18 UTC (rev 9571)
@@ -562,18 +562,18 @@
protected void setupServers() throws Exception
{
// The backups
- setupServer(5, isFileStorage(), isNetty(), true);
- setupServer(6, isFileStorage(), isNetty(), true);
- setupServer(7, isFileStorage(), isNetty(), true);
- setupServer(8, isFileStorage(), isNetty(), true);
- setupServer(9, isFileStorage(), isNetty(), true);
+ setupServer(5, isFileStorage(), isNetty(), true, true);
+ setupServer(6, isFileStorage(), isNetty(), true, true);
+ setupServer(7, isFileStorage(), isNetty(), true, true);
+ setupServer(8, isFileStorage(), isNetty(), true, true);
+ setupServer(9, isFileStorage(), isNetty(), true, true);
// The lives
- setupServer(0, isFileStorage(), isNetty(), 5);
- setupServer(1, isFileStorage(), isNetty(), 6);
- setupServer(2, isFileStorage(), isNetty(), 7);
- setupServer(3, isFileStorage(), isNetty(), 8);
- setupServer(4, isFileStorage(), isNetty(), 9);
+ setupServer(0, isFileStorage(), isNetty(), 5, true);
+ setupServer(1, isFileStorage(), isNetty(), 6, true);
+ setupServer(2, isFileStorage(), isNetty(), 7, true);
+ setupServer(3, isFileStorage(), isNetty(), 8, true);
+ setupServer(4, isFileStorage(), isNetty(), 9, true);
}
@Override
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverReplicationTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverReplicationTest.java 2010-08-20 13:45:58 UTC (rev 9570)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverReplicationTest.java 2010-08-20 13:51:18 UTC (rev 9571)
@@ -87,6 +87,6 @@
@Override
void setupMasterServer(final int i, final boolean fileStorage, final boolean netty)
{
- setupServer(i, fileStorage, false, netty, false, 2);
+ setupServer(i, fileStorage, false, netty, false, 2, false);
}
}
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/ReplicatedDistributionTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/ReplicatedDistributionTest.java 2010-08-20 13:45:58 UTC (rev 9570)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/ReplicatedDistributionTest.java 2010-08-20 13:51:18 UTC (rev 9571)
@@ -267,9 +267,9 @@
{
super.setUp();
- setupServer(1, true, isShared(), true, false, -1);
- setupServer(2, true, isShared(), true, true, -1);
- setupServer(3, true, isShared(), true, true, 2);
+ setupServer(1, true, isShared(), true, false, -1, false);
+ setupServer(2, true, isShared(), true, true, -1, false);
+ setupServer(3, true, isShared(), true, true, 2, false);
setupClusterConnectionWithBackups("test", "test", false, 1, true, 1, new int[] { 3 }, new int[] { 2 });
13 years, 9 months