JBoss hornetq SVN: r9954 - branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/topology.
by do-not-reply@jboss.org
Author: ataylor
Date: 2010-11-30 12:42:12 -0500 (Tue, 30 Nov 2010)
New Revision: 9954
Modified:
branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/topology/TopologyClusterTestBase.java
Log:
added latch for nodes being removed
Modified: branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/topology/TopologyClusterTestBase.java
===================================================================
--- branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/topology/TopologyClusterTestBase.java 2010-11-30 16:27:01 UTC (rev 9953)
+++ branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/topology/TopologyClusterTestBase.java 2010-11-30 17:42:12 UTC (rev 9954)
@@ -414,6 +414,7 @@
final List<String> nodes = new ArrayList<String>();
final CountDownLatch upLatch = new CountDownLatch(5);
+ final CountDownLatch downLatch = new CountDownLatch(5);
locator.addClusterTopologyListener(new ClusterTopologyListener()
{
@@ -434,6 +435,7 @@
if (nodes.contains(nodeID))
{
nodes.remove(nodeID);
+ downLatch.countDown();
}
}
});
@@ -454,6 +456,8 @@
waitForClusterConnections(4, 4);
stopServers(0, 4, 2, 3, 1);
+
+ assertTrue("Was not notified that all servers are Down", upLatch.await(10, SECONDS));
checkContains(new int[] { }, nodeIDs, nodes);
for (int i = 0; i < sfs.length; i++)
14 years
JBoss hornetq SVN: r9953 - in branches/2_2_0_HA_Improvements_preMerge: src/main/org/hornetq/core/protocol/core/impl/wireformat and 6 other directories.
by do-not-reply@jboss.org
Author: ataylor
Date: 2010-11-30 11:27:01 -0500 (Tue, 30 Nov 2010)
New Revision: 9953
Modified:
branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/protocol/core/impl/wireformat/NodeAnnounceMessage.java
branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/server/cluster/ClusterConnection.java
branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverReplicationTest.java
branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverSharedServerTest.java
branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java
branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java
branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/failover/SingleLiveMultipleBackupsFailoverTest.java
branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/util/ServiceTestBase.java
Log:
activate cluster connection properly and test fixes
Modified: branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2010-11-30 15:39:22 UTC (rev 9952)
+++ branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2010-11-30 16:27:01 UTC (rev 9953)
@@ -244,7 +244,7 @@
public void setBackupConnector(TransportConfiguration live, TransportConfiguration backUp)
{
- if(live.equals(connectorConfig))
+ if(live.equals(connectorConfig) && backUp != null)
{
backupConfig = backUp;
}
@@ -1209,10 +1209,7 @@
if (type == PacketImpl.DISCONNECT)
{
final DisconnectMessage msg = (DisconnectMessage)packet;
- if (msg.getNodeID() != null)
- {
- System.out.println("received disconnect from node " + msg.getNodeID());
- }
+
closeExecutor.execute(new Runnable()
{
// Must be executed on new thread since cannot block the netty thread for a long time and fail can
Modified: branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/protocol/core/impl/wireformat/NodeAnnounceMessage.java
===================================================================
--- branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/protocol/core/impl/wireformat/NodeAnnounceMessage.java 2010-11-30 15:39:22 UTC (rev 9952)
+++ branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/protocol/core/impl/wireformat/NodeAnnounceMessage.java 2010-11-30 16:27:01 UTC (rev 9953)
@@ -49,16 +49,8 @@
this.backup = backup;
this.connector = tc;
- if(System.getProperty("foo") != null)
- {
- if(tc.toString().contains("org-hornetq-core-remoting-impl-invm-InVMConnectorFactory?server-id=1"))
- {
- System.out.println("");
- }
- }
-
}
-
+
public NodeAnnounceMessage()
{
super(PacketImpl.NODE_ANNOUNCE);
Modified: branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/server/cluster/ClusterConnection.java
===================================================================
--- branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/server/cluster/ClusterConnection.java 2010-11-30 15:39:22 UTC (rev 9952)
+++ branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/server/cluster/ClusterConnection.java 2010-11-30 16:27:01 UTC (rev 9953)
@@ -48,7 +48,7 @@
SimpleString queueName,
int distance) throws Exception;
- void activate();
+ void activate() throws Exception;
TransportConfiguration getConnector();
Modified: branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2010-11-30 15:39:22 UTC (rev 9952)
+++ branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2010-11-30 16:27:01 UTC (rev 9953)
@@ -195,41 +195,14 @@
return;
}
- if (serverLocator != null)
+ started = true;
+
+ if(!backup)
{
- serverLocator.addClusterTopologyListener(this);
- serverLocator.start();
- // FIXME Ugly ugly code to connect to other nodes and form the cluster... :(
- server.getExecutorFactory().getExecutor().execute(new Runnable()
- {
- public void run()
- {
- try
- {
- serverLocator.connect();
- }
- catch (Exception e)
- {
- if(started)
- {
- log.warn("did not connect the cluster connection to other nodes", e);
- }
- }
- }
- });
+ activate();
}
-
- started = true;
- if (managementService != null)
- {
- TypedProperties props = new TypedProperties();
- props.putSimpleStringProperty(new SimpleString("name"), name);
- Notification notification = new Notification(nodeUUID.toString(),
- NotificationType.CLUSTER_CONNECTION_STARTED,
- props);
- managementService.sendNotification(notification);
- }
+
}
public void stop() throws Exception
@@ -299,7 +272,7 @@
return nodes;
}
- public synchronized void activate()
+ public synchronized void activate() throws Exception
{
if (!started)
{
@@ -307,6 +280,41 @@
}
backup = false;
+
+
+ if (serverLocator != null)
+ {
+ serverLocator.addClusterTopologyListener(this);
+ serverLocator.start();
+ // FIXME Ugly ugly code to connect to other nodes and form the cluster... :(
+ server.getExecutorFactory().getExecutor().execute(new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ serverLocator.connect();
+ }
+ catch (Exception e)
+ {
+ if(started)
+ {
+ log.warn("did not connect the cluster connection to other nodes", e);
+ }
+ }
+ }
+ });
+ }
+
+ if (managementService != null)
+ {
+ TypedProperties props = new TypedProperties();
+ props.putSimpleStringProperty(new SimpleString("name"), name);
+ Notification notification = new Notification(nodeUUID.toString(),
+ NotificationType.CLUSTER_CONNECTION_STARTED,
+ props);
+ managementService.sendNotification(notification);
+ }
}
public TransportConfiguration getConnector()
Modified: branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2010-11-30 15:39:22 UTC (rev 9952)
+++ branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2010-11-30 16:27:01 UTC (rev 9953)
@@ -100,7 +100,7 @@
private Topology topology = new Topology();
- private ServerLocatorInternal backupServerLocator;
+ private volatile ServerLocatorInternal backupServerLocator;
private final List<ServerLocatorInternal> clusterLocators = new ArrayList<ServerLocatorInternal>();
@@ -384,7 +384,7 @@
{
try
{
- clusterConnection.start();
+ clusterConnection.activate();
}
catch (Exception e)
{
@@ -790,12 +790,10 @@
clusterConnections.put(config.getName(), clusterConnection);
- if (!backup)
+ clusterConnection.start();
+
+ if(backup)
{
- clusterConnection.start();
- }
- else
- {
announceBackup(config, connector);
}
}
@@ -836,7 +834,11 @@
try
{
ClientSessionFactory backupSessionFactory = backupServerLocator.connect();
- backupSessionFactory.getConnection().getChannel(0, -1).send(new NodeAnnounceMessage(nodeUUID.toString(), true, connector));
+ if (backupSessionFactory != null)
+ {
+ backupSessionFactory.getConnection().getChannel(0, -1).send(new NodeAnnounceMessage(nodeUUID.toString(), true, connector));
+ log.info("backup announced");
+ }
}
catch (Exception e)
{
Modified: branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-11-30 15:39:22 UTC (rev 9952)
+++ branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-11-30 16:27:01 UTC (rev 9953)
@@ -605,17 +605,18 @@
}
connectorsService.stop();
+ //we stop the groupinghandler before we stop te cluster manager so binding mappings aren't removed in case of failover
+ if (groupingHandler != null)
+ {
+ managementService.removeNotificationListener(groupingHandler);
+ groupingHandler = null;
+ }
if (clusterManager != null)
{
clusterManager.stop();
}
- if (groupingHandler != null)
- {
- managementService.removeNotificationListener(groupingHandler);
- groupingHandler = null;
- }
}
// we stop the remoting service outside a lock
Modified: branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2010-11-30 15:39:22 UTC (rev 9952)
+++ branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2010-11-30 16:27:01 UTC (rev 9953)
@@ -785,7 +785,7 @@
{
for (int i = 0; i < consumers.length; i++)
{
- if (consumers[i] != null)
+ if (consumers[i] != null && !consumers[i].consumer.isClosed())
{
ClusterTestBase.log.info("Dumping consumer " + i);
@@ -1163,6 +1163,11 @@
protected void setupSessionFactory(final int node, final boolean netty) throws Exception
{
+ setupSessionFactory(node, netty, false);
+ }
+
+ protected void setupSessionFactory(final int node, final boolean netty, boolean ha) throws Exception
+ {
if (sfs[node] != null)
{
throw new IllegalArgumentException("Already a server at " + node);
@@ -1181,7 +1186,14 @@
serverTotc = new TransportConfiguration(ServiceTestBase.INVM_CONNECTOR_FACTORY, params);
}
- locators[node] = HornetQClient.createServerLocatorWithoutHA(serverTotc);
+ if (ha)
+ {
+ locators[node] = HornetQClient.createServerLocatorWithHA(serverTotc);
+ }
+ else
+ {
+ locators[node] = HornetQClient.createServerLocatorWithoutHA(serverTotc);
+ }
locators[node].setBlockOnNonDurableSend(true);
locators[node].setBlockOnDurableSend(true);
@@ -1190,6 +1202,7 @@
sfs[node] = sf;
}
+
protected void setupSessionFactory(final int node, final boolean netty, int reconnectAttempts) throws Exception
{
if (sfs[node] != null)
Modified: branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverReplicationTest.java
===================================================================
--- branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverReplicationTest.java 2010-11-30 15:39:22 UTC (rev 9952)
+++ branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverReplicationTest.java 2010-11-30 16:27:01 UTC (rev 9953)
@@ -29,64 +29,9 @@
*/
public class GroupingFailoverReplicationTest extends GroupingFailoverTestBase
{
- private static final Logger log = Logger.getLogger(GroupingFailoverReplicationTest.class);
-
@Override
- protected void setupReplicatedServer(final int node,
- final boolean fileStorage,
- final boolean netty,
- final int backupNode)
+ boolean isSharedServer()
{
- if (servers[node] != null)
- {
- throw new IllegalArgumentException("Already a server at node " + node);
- }
-
- Configuration configuration = new ConfigurationImpl();
-
- configuration.setSecurityEnabled(false);
- configuration.setBindingsDirectory(getBindingsDir(node, false));
- configuration.setJournalMinFiles(2);
- configuration.setJournalMaxIO_AIO(1000);
- configuration.setJournalDirectory(getJournalDir(node, false));
- configuration.setJournalFileSize(100 * 1024);
- configuration.setJournalType(getDefaultJournalType());
- configuration.setPagingDirectory(getPageDir(node, false));
- configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, false));
- configuration.setClustered(true);
- configuration.setJournalCompactMinFiles(0);
- configuration.setBackup(true);
- configuration.setSharedStore(false);
-
- configuration.getAcceptorConfigurations().clear();
-
- Map<String, Object> params = generateParams(node, netty);
-
- TransportConfiguration invmtc = new TransportConfiguration(ServiceTestBase.INVM_ACCEPTOR_FACTORY, params);
- configuration.getAcceptorConfigurations().add(invmtc);
-
- if (netty)
- {
- TransportConfiguration nettytc = new TransportConfiguration(ServiceTestBase.NETTY_ACCEPTOR_FACTORY, params);
- configuration.getAcceptorConfigurations().add(nettytc);
- }
-
- HornetQServer server;
-
- if (fileStorage)
- {
- server = HornetQServers.newHornetQServer(configuration);
- }
- else
- {
- server = HornetQServers.newHornetQServer(configuration, false);
- }
- servers[node] = server;
+ return false;
}
-
- @Override
- void setupMasterServer(final int i, final boolean fileStorage, final boolean netty)
- {
- setupLiveServer(i, fileStorage, false, netty);
- }
}
Modified: branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverSharedServerTest.java
===================================================================
--- branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverSharedServerTest.java 2010-11-30 15:39:22 UTC (rev 9952)
+++ branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverSharedServerTest.java 2010-11-30 16:27:01 UTC (rev 9953)
@@ -28,67 +28,10 @@
*/
public class GroupingFailoverSharedServerTest extends GroupingFailoverTestBase
{
- @Override
- protected void setupReplicatedServer(final int node,
- final boolean fileStorage,
- final boolean netty,
- final int backupNode)
- {
- if (servers[node] != null)
- {
- throw new IllegalArgumentException("Already a server at node " + node);
- }
- Configuration configuration = new ConfigurationImpl();
-
- configuration.setSecurityEnabled(false);
- configuration.setBindingsDirectory(getBindingsDir(backupNode, false));
- configuration.setJournalMinFiles(2);
- configuration.setJournalMaxIO_AIO(1000);
- configuration.setJournalDirectory(getJournalDir(backupNode, false));
- configuration.setJournalFileSize(100 * 1024);
- configuration.setJournalType(getDefaultJournalType());
- configuration.setPagingDirectory(getPageDir(backupNode, false));
- configuration.setLargeMessagesDirectory(getLargeMessagesDir(backupNode, false));
- configuration.setClustered(true);
- configuration.setJournalCompactMinFiles(0);
- configuration.setBackup(true);
- configuration.setSharedStore(true);
-
- configuration.getAcceptorConfigurations().clear();
-
- Map<String, Object> params = generateParams(node, netty);
-
- TransportConfiguration invmtc = new TransportConfiguration(ServiceTestBase.INVM_ACCEPTOR_FACTORY, params);
- configuration.getAcceptorConfigurations().add(invmtc);
-
- if (netty)
- {
- TransportConfiguration nettytc = new TransportConfiguration(ServiceTestBase.NETTY_ACCEPTOR_FACTORY, params);
- configuration.getAcceptorConfigurations().add(nettytc);
- }
-
- HornetQServer server;
-
- if (fileStorage)
- {
- if(nodeManagers[backupNode] == null)
- {
- nodeManagers[backupNode] = new InVMNodeManager();
- }
- server = createInVMFailoverServer(true, configuration, nodeManagers[backupNode]);
- }
- else
- {
- server = HornetQServers.newHornetQServer(configuration, false);
- }
- servers[node] = server;
- }
-
@Override
- public void setupMasterServer(final int i, final boolean fileStorage, final boolean netty)
+ boolean isSharedServer()
{
- setupServer(i, fileStorage, netty);
+ return true;
}
-
}
Modified: branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java
===================================================================
--- branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java 2010-11-30 15:39:22 UTC (rev 9952)
+++ branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java 2010-11-30 16:27:01 UTC (rev 9953)
@@ -21,7 +21,9 @@
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.Message;
import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.client.impl.Topology;
import org.hornetq.core.remoting.FailureListener;
+import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.cluster.MessageFlowRecord;
import org.hornetq.core.server.cluster.impl.ClusterConnectionImpl;
import org.hornetq.core.server.group.impl.GroupingHandlerConfiguration;
@@ -37,17 +39,17 @@
public void testGroupingLocalHandlerFails() throws Exception
{
- setupReplicatedServer(2, isFileStorage(), isNetty(), 0);
+ setupBackupServer(2, 0, isFileStorage(), isSharedServer(), isNetty());
- setupMasterServer(0, isFileStorage(), isNetty());
+ setupLiveServer(0, isFileStorage(), isSharedServer(), isNetty());
- setupServer(1, isFileStorage(), isNetty());
+ setupLiveServer(1, isFileStorage(), isSharedServer(), isNetty());
setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 0, 1);
- setupClusterConnectionWithBackups("cluster1", "queues", false, 1, isNetty(), 1, new int[] { 0 });
+ setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 1, 0);
- setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 1);
+ setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 2, 1);
setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
@@ -74,6 +76,11 @@
waitForBindings(0, "queues.testaddress", 1, 1, false);
waitForBindings(1, "queues.testaddress", 1, 1, false);
+ waitForBindings(0, "queues.testaddress", 1, 1, true);
+ waitForBindings(1, "queues.testaddress", 1, 1, true);
+
+ waitForServerTopology(servers[1], 3, 5);
+
sendWithProperty(0, "queues.testaddress", 10, false, Message.HDR_GROUP_ID, new SimpleString("id1"));
verifyReceiveAll(10, 0);
@@ -90,12 +97,6 @@
waitForBindings(2, "queues.testaddress", 1, 1, true);
- waitForBindings(2, "queues.testaddress", 1, 1, false);
-
- waitForBindings(1, "queues.testaddress", 1, 1, true);
-
- waitForBindings(1, "queues.testaddress", 1, 1, false);
-
sendWithProperty(2, "queues.testaddress", 10, false, Message.HDR_GROUP_ID, new SimpleString("id1"));
verifyReceiveAll(10, 2);
@@ -116,18 +117,19 @@
public void testGroupingLocalHandlerFailsMultipleGroups() throws Exception
{
- setupReplicatedServer(2, isFileStorage(), isNetty(), 0);
+ setupBackupServer(2, 0, isFileStorage(), isSharedServer(), isNetty());
- setupMasterServer(0, isFileStorage(), isNetty());
+ setupLiveServer(0, isFileStorage(), isSharedServer(), isNetty());
- setupServer(1, isFileStorage(), isNetty());
+ setupLiveServer(1, isFileStorage(), isSharedServer(), isNetty());
setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 0, 1);
- setupClusterConnectionWithBackups("cluster1", "queues", false, 1, isNetty(), 1, new int[] { 0 });
+ setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 1, 0);
- setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 1);
+ setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 2, 1);
+
setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
@@ -138,7 +140,9 @@
try
{
startServers(2, 0, 1);
+
setupSessionFactory(0, isNetty());
+
setupSessionFactory(1, isNetty());
createQueue(0, "queues.testaddress", "queue0", null, true);
@@ -155,6 +159,12 @@
waitForBindings(0, "queues.testaddress", 1, 1, false);
waitForBindings(1, "queues.testaddress", 1, 1, false);
+ waitForBindings(0, "queues.testaddress", 1, 1, true);
+ waitForBindings(1, "queues.testaddress", 1, 1, true);
+
+ waitForServerTopology(servers[1], 3, 5);
+
+
sendWithProperty(0, "queues.testaddress", 10, false, Message.HDR_GROUP_ID, new SimpleString("id1"));
sendWithProperty(0, "queues.testaddress", 10, false, Message.HDR_GROUP_ID, new SimpleString("id2"));
sendWithProperty(0, "queues.testaddress", 10, false, Message.HDR_GROUP_ID, new SimpleString("id3"));
@@ -166,22 +176,8 @@
closeSessionFactory(0);
- final CountDownLatch latch = new CountDownLatch(1);
+ servers[0].kill();
- class MyListener implements FailureListener
- {
- public void connectionFailed(final HornetQException me, boolean failedOver)
- {
- latch.countDown();
- }
- }
-
- Map<String, MessageFlowRecord> records = ((ClusterConnectionImpl)getServer(1).getClusterManager()
- .getClusterConnection(new SimpleString("cluster1"))).getRecords();
- RemotingConnection rc = records.get("0").getBridge().getForwardingConnection();
- rc.addFailureListener(new MyListener());
- fail(rc, latch);
-
waitForServerRestart(2);
setupSessionFactory(2, isNetty());
@@ -203,7 +199,7 @@
sendWithProperty(2, "queues.testaddress", 10, false, Message.HDR_GROUP_ID, new SimpleString("id5"));
sendWithProperty(2, "queues.testaddress", 10, false, Message.HDR_GROUP_ID, new SimpleString("id6"));
- verifyReceiveAllWithGroupIDRoundRobin(0, 30, 1, 2);
+ verifyReceiveAllWithGroupIDRoundRobin(2, 30, 1, 2);
System.out.println("*****************************************************************************");
}
@@ -219,14 +215,27 @@
}
}
- abstract void setupMasterServer(int i, boolean fileStorage, boolean netty);
+ private void waitForServerTopology(HornetQServer server, int nodes, int seconds)
+ throws InterruptedException
+ {
+ Topology topology = server.getClusterManager().getTopology();
+ long timeToWait = System.currentTimeMillis() + (seconds * 1000);
+ while(topology.nodes()!= nodes)
+ {
+ Thread.sleep(100);
+ if(System.currentTimeMillis() > timeToWait)
+ {
+ fail("timed out waiting for server topology");
+ }
+ }
+ }
public boolean isNetty()
{
return true;
}
- abstract void setupReplicatedServer(int node, boolean fileStorage, boolean netty, int backupNode);
+ abstract boolean isSharedServer();
private void fail(final RemotingConnection conn, final CountDownLatch latch) throws InterruptedException
{
Modified: branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java
===================================================================
--- branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java 2010-11-30 15:39:22 UTC (rev 9952)
+++ branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java 2010-11-30 16:27:01 UTC (rev 9953)
@@ -97,7 +97,8 @@
System.setProperty("foo", "bar");
servers.get(3).crash(session2);
int liveAfter3 = waitForNewLive(10000, true, servers, 4, 5);
-
+ locator.close();
+ locator2.close();
if (liveAfter0 == 2)
{
servers.get(1).stop();
Modified: branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/failover/SingleLiveMultipleBackupsFailoverTest.java
===================================================================
--- branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/failover/SingleLiveMultipleBackupsFailoverTest.java 2010-11-30 15:39:22 UTC (rev 9952)
+++ branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/failover/SingleLiveMultipleBackupsFailoverTest.java 2010-11-30 16:27:01 UTC (rev 9953)
@@ -94,6 +94,8 @@
session = sendAndConsume(sf, false);
session.close();
servers.get(backupNode).stop();
+
+ locator.close();
}
protected void createBackupConfig(int liveNode, int nodeid, int... nodes)
Modified: branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/util/ServiceTestBase.java
===================================================================
--- branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/util/ServiceTestBase.java 2010-11-30 15:39:22 UTC (rev 9952)
+++ branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/util/ServiceTestBase.java 2010-11-30 16:27:01 UTC (rev 9953)
@@ -98,6 +98,20 @@
{
System.exit(0);
}
+ Map<Thread, StackTraceElement[]> threadMap = Thread.getAllStackTraces();
+ for (Thread thread : threadMap.keySet())
+ {
+ StackTraceElement[] stack = threadMap.get(thread);
+ for (StackTraceElement stackTraceElement : stack)
+ {
+ if(stackTraceElement.getMethodName().contains("getConnectionWithRetry"))
+ {
+ System.out.println(this.getName() + " has left threads running");
+ fail("test left serverlocator running, this could effect other tests");
+ //System.exit(0);
+ }
+ }
+ }
}
protected static Map<String, Object> generateParams(final int node, final boolean netty)
14 years
JBoss hornetq SVN: r9952 - in branches/Branch_Large_Message_Compression/src/main/org/hornetq: utils and 1 other directory.
by do-not-reply@jboss.org
Author: gaohoward
Date: 2010-11-30 10:39:22 -0500 (Tue, 30 Nov 2010)
New Revision: 9952
Modified:
branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/LargeMessageBufferImpl.java
branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/DecompressedLargeMessageBuffer.java
branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/DeflaterReader.java
branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/InflaterWriter.java
Log:
replace old impl
Modified: branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
===================================================================
--- branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java 2010-11-30 14:30:46 UTC (rev 9951)
+++ branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java 2010-11-30 15:39:22 UTC (rev 9952)
@@ -13,16 +13,8 @@
package org.hornetq.core.client.impl;
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.PipedInputStream;
-import java.io.PipedOutputStream;
-import java.util.concurrent.Executor;
-import java.util.zip.GZIPInputStream;
-import java.util.zip.GZIPOutputStream;
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.HornetQBuffers;
@@ -30,13 +22,12 @@
import org.hornetq.api.core.Message;
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.logging.Logger;
-import org.hornetq.core.message.BodyEncoder;
import org.hornetq.core.message.impl.MessageInternal;
import org.hornetq.core.protocol.core.Channel;
import org.hornetq.core.protocol.core.impl.wireformat.SessionSendContinuationMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionSendLargeMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionSendMessage;
-import org.hornetq.utils.GZipUtil;
+import org.hornetq.utils.DeflaterReader;
import org.hornetq.utils.HornetQBufferInputStream;
import org.hornetq.utils.TokenBucketLimiter;
import org.hornetq.utils.UUIDGenerator;
@@ -411,8 +402,7 @@
if (session.isCompressLargeMessages())
{
- //input = GZipUtil.pipeGZip(inputStreamParameter, true, session.getThreadPool());
- input = GZipUtil.createZipInputStream(inputStreamParameter);
+ input = new DeflaterReader(inputStreamParameter);
}
while (!lastPacket)
Modified: branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/LargeMessageBufferImpl.java
===================================================================
--- branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/LargeMessageBufferImpl.java 2010-11-30 14:30:46 UTC (rev 9951)
+++ branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/LargeMessageBufferImpl.java 2010-11-30 15:39:22 UTC (rev 9952)
@@ -1257,6 +1257,7 @@
{
try
{
+ System.err.println("___writing packet: " + packet + " output " + output);
output.write(packet.getBody());
if (!packet.isContinues())
{
Modified: branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/DecompressedLargeMessageBuffer.java
===================================================================
--- branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/DecompressedLargeMessageBuffer.java 2010-11-30 14:30:46 UTC (rev 9951)
+++ branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/DecompressedLargeMessageBuffer.java 2010-11-30 15:39:22 UTC (rev 9952)
@@ -98,7 +98,7 @@
public void setOutputStream(final OutputStream output) throws HornetQException
{
- bufferDelegate.setOutputStream(GZipUtil.createZipOutputStream(output));
+ bufferDelegate.setOutputStream(new InflaterWriter(output));
}
public synchronized void saveBuffer(final OutputStream output) throws HornetQException
@@ -145,7 +145,7 @@
{
InputStream input = new HornetQBufferInputStream(bufferDelegate);
- dataInput = new DataInputStream(GZipUtil.createUnZipInputStream(input));
+ dataInput = new DataInputStream(new InflaterReader(input));
}
catch (Exception e)
{
Modified: branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/DeflaterReader.java
===================================================================
--- branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/DeflaterReader.java 2010-11-30 14:30:46 UTC (rev 9951)
+++ branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/DeflaterReader.java 2010-11-30 15:39:22 UTC (rev 9952)
@@ -16,6 +16,7 @@
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.util.ArrayList;
import java.util.zip.Deflater;
/**
@@ -26,7 +27,7 @@
* @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
*
*/
-public class DeflaterReader
+public class DeflaterReader extends InputStream
{
private Deflater deflater = new Deflater();
private boolean isFinished = false;
@@ -38,12 +39,22 @@
{
input = inData;
}
-
- public int read(byte[] buffer) throws IOException
+
+ public int read() throws IOException
{
- return read(buffer, 0, buffer.length);
+ byte[] buffer = new byte[1];
+ int n = read(buffer, 0, 1);
+ if (n == 1)
+ {
+ return buffer[0];
+ }
+ if (n == -1)
+ {
+ return -1;
+ }
+ throw new IOException("Error reading data, invalid n: " + n);
}
-
+
/**
* Try to fill the buffer with compressed bytes. Except the last effective read,
* this method always returns with a full buffer of compressed data.
@@ -52,6 +63,7 @@
* @return the number of bytes really filled, -1 indicates end.
* @throws IOException
*/
+ @Override
public int read(byte[] buffer, int offset, int len) throws IOException
{
if (compressDone)
@@ -74,6 +86,7 @@
{
deflater.end();
compressDone = true;
+ read = -1;
break;
}
else if (deflater.needsInput())
@@ -120,6 +133,25 @@
DeflaterReader reader = new DeflaterReader(inputStream);
+
+ ArrayList<Integer> zipHolder = new ArrayList<Integer>();
+ int b = reader.read();
+
+ while (b != -1)
+ {
+ zipHolder.add(b);
+ b = reader.read();
+ }
+
+ byte[] allCompressed = new byte[zipHolder.size()];
+ for (int i = 0; i < allCompressed.length; i++)
+ {
+ allCompressed[i] = (byte) zipHolder.get(i).intValue();
+ }
+
+ System.err.println("compressed: " + getBytesString(allCompressed));
+
+/*
byte[] buffer = new byte[7];
int n = reader.read(buffer);
@@ -131,7 +163,7 @@
System.err.println("==>read n " + n + " values: " + getBytesString(buffer));
n = reader.read(buffer);
}
-
+*/
System.err.println("compressed.");
System.err.println("now verify");
Modified: branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/InflaterWriter.java
===================================================================
--- branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/InflaterWriter.java 2010-11-30 14:30:46 UTC (rev 9951)
+++ branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/InflaterWriter.java 2010-11-30 15:39:22 UTC (rev 9952)
@@ -47,6 +47,11 @@
{
this.output = output;
}
+
+ private void log(String str)
+ {
+ System.err.println(this + " " + str);
+ }
/*
* Write a compressed byte.
@@ -54,6 +59,7 @@
@Override
public void write(int b) throws IOException
{
+ log("call write b: " + b);
writeBuffer[writePointer] = (byte)(b & 0xFF);
writePointer++;
@@ -62,6 +68,7 @@
writePointer = 0;
try
{
+ log("call doWrite");
doWrite();
}
catch (DataFormatException e)
@@ -74,6 +81,7 @@
@Override
public void close() throws IOException
{
+ log("call close");
if (writePointer > 0)
{
inflater.setInput(writeBuffer, 0, writePointer);
14 years
JBoss hornetq SVN: r9951 - branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils.
by do-not-reply@jboss.org
Author: gaohoward
Date: 2010-11-30 09:30:46 -0500 (Tue, 30 Nov 2010)
New Revision: 9951
Added:
branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/InflaterWriter.java
Modified:
branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/DeflaterReader.java
branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/InflaterReader.java
Log:
added InflaterWriter
Modified: branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/DeflaterReader.java
===================================================================
--- branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/DeflaterReader.java 2010-11-30 11:51:12 UTC (rev 9950)
+++ branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/DeflaterReader.java 2010-11-30 14:30:46 UTC (rev 9951)
@@ -22,6 +22,8 @@
* A DeflaterReader
* The reader takes an inputstream and compress it.
* Not for concurrent use.
+
+ * @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
*
*/
public class DeflaterReader
Modified: branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/InflaterReader.java
===================================================================
--- branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/InflaterReader.java 2010-11-30 11:51:12 UTC (rev 9950)
+++ branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/InflaterReader.java 2010-11-30 14:30:46 UTC (rev 9951)
@@ -26,6 +26,7 @@
* It takes an compressed input stream and decompressed it as it is being read.
* Not for concurrent use.
*
+ * @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
*/
public class InflaterReader extends InputStream
{
Added: branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/InflaterWriter.java
===================================================================
--- branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/InflaterWriter.java (rev 0)
+++ branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/InflaterWriter.java 2010-11-30 14:30:46 UTC (rev 9951)
@@ -0,0 +1,147 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.utils;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.zip.DataFormatException;
+import java.util.zip.Deflater;
+import java.util.zip.Inflater;
+
+/**
+ * A InflaterWriter
+ *
+ * This class takes an OutputStream. Compressed bytes
+ * can directly be written into this class. The class will
+ * decompress the bytes and write them to the output stream.
+ *
+ * Not for concurrent use.
+ *
+ * @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
+ *
+ */
+public class InflaterWriter extends OutputStream
+{
+ private Inflater inflater = new Inflater();
+ private OutputStream output;
+
+ private byte[] writeBuffer = new byte[1024];
+ private int writePointer = 0;
+
+ private byte[] outputBuffer = new byte[writeBuffer.length*2];
+
+ public InflaterWriter(OutputStream output)
+ {
+ this.output = output;
+ }
+
+ /*
+ * Write a compressed byte.
+ */
+ @Override
+ public void write(int b) throws IOException
+ {
+ writeBuffer[writePointer] = (byte)(b & 0xFF);
+ writePointer++;
+
+ if (writePointer == writeBuffer.length)
+ {
+ writePointer = 0;
+ try
+ {
+ doWrite();
+ }
+ catch (DataFormatException e)
+ {
+ throw new IOException("Error decompressing data", e);
+ }
+ }
+ }
+
+ @Override
+ public void close() throws IOException
+ {
+ if (writePointer > 0)
+ {
+ inflater.setInput(writeBuffer, 0, writePointer);
+ try
+ {
+ int n = inflater.inflate(outputBuffer);
+ while (n > 0)
+ {
+ output.write(outputBuffer, 0, n);
+ n = inflater.inflate(outputBuffer);
+ }
+ output.close();
+ }
+ catch (DataFormatException e)
+ {
+ throw new IOException(e);
+ }
+ }
+ }
+
+ private void doWrite() throws DataFormatException, IOException
+ {
+ inflater.setInput(writeBuffer);
+ int n = inflater.inflate(outputBuffer);
+
+ while (n > 0)
+ {
+ output.write(outputBuffer, 0, n);
+ n = inflater.inflate(outputBuffer);
+ }
+ }
+
+ public static void main(String[] args) throws IOException
+ {
+ String inputString = "blahblahblah??blahblahblahblahblah??blablahblah??blablahblah??bla";
+ byte[] input = inputString.getBytes("UTF-8");
+ byte[] output = new byte[30];
+ Deflater compresser = new Deflater();
+ compresser.setInput(input);
+ compresser.finish();
+ int compressedDataLength = compresser.deflate(output);
+ System.err.println("compress len: " + compressedDataLength);
+
+ byte[] zipBytes = new byte[compressedDataLength];
+
+ System.arraycopy(output, 0, zipBytes, 0, compressedDataLength);
+ ByteArrayInputStream byteInput = new ByteArrayInputStream(zipBytes);
+
+ ByteArrayOutputStream byteOutput = new ByteArrayOutputStream();
+ InflaterWriter writer = new InflaterWriter(byteOutput);
+
+ byte[] zipBuffer = new byte[12];
+
+ int n = byteInput.read(zipBuffer);
+ while (n > 0)
+ {
+ System.out.println("Writing: " + n);
+ writer.write(zipBuffer, 0, n);
+ n = byteInput.read(zipBuffer);
+ }
+
+ writer.close();
+
+ byte[] outcome = byteOutput.toByteArray();
+ String outStr = new String(outcome);
+
+ System.out.println("Outcome: " + outStr);
+
+ }
+
+}
14 years
JBoss hornetq SVN: r9950 - branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils.
by do-not-reply@jboss.org
Author: gaohoward
Date: 2010-11-30 06:51:12 -0500 (Tue, 30 Nov 2010)
New Revision: 9950
Added:
branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/InflaterReader.java
Modified:
branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/DeflaterReader.java
Log:
an input stream used to read decompressed data
Modified: branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/DeflaterReader.java
===================================================================
--- branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/DeflaterReader.java 2010-11-30 03:30:08 UTC (rev 9949)
+++ branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/DeflaterReader.java 2010-11-30 11:51:12 UTC (rev 9950)
@@ -21,8 +21,8 @@
/**
* A DeflaterReader
* The reader takes an inputstream and compress it.
+ * Not for concurrent use.
*
- *
*/
public class DeflaterReader
{
Added: branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/InflaterReader.java
===================================================================
--- branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/InflaterReader.java (rev 0)
+++ branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/InflaterReader.java 2010-11-30 11:51:12 UTC (rev 9950)
@@ -0,0 +1,195 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.utils;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.zip.DataFormatException;
+import java.util.zip.Deflater;
+import java.util.zip.Inflater;
+
+/**
+ * An InflaterReader
+ * It takes an compressed input stream and decompressed it as it is being read.
+ * Not for concurrent use.
+ *
+ */
+public class InflaterReader extends InputStream
+{
+ private Inflater inflater = new Inflater();
+
+ private InputStream input;
+
+ private byte[] readBuffer;
+ private int pointer;
+ private int length;
+
+ public InflaterReader(InputStream input)
+ {
+ this(input, 1024);
+ }
+
+ public InflaterReader(InputStream input, int bufferSize)
+ {
+ this.input = input;
+ this.readBuffer = new byte[bufferSize];
+ this.pointer = -1;
+ }
+
+ public static void log(String str)
+ {
+ System.out.println(str);
+ }
+
+ public int read() throws IOException
+ {
+ log("in read");
+
+ if (pointer == -1)
+ {
+ log("pointer is -1");
+
+ try
+ {
+ log("need to decompress more bytes");
+ length = doRead(readBuffer, 0, readBuffer.length);
+ log("bytes decompressed:" + length);
+ if (length == 0)
+ {
+ log("zero byte got, ending");
+ return -1;
+ }
+ log("reset pointer to zero");
+ pointer = 0;
+ }
+ catch (DataFormatException e)
+ {
+ throw new IOException(e);
+ }
+ }
+
+ log("reading byte at " + pointer);
+ int value = readBuffer[pointer] & 0xFF;
+ pointer++;
+ if (pointer == length)
+ {
+ log("buffer all read, set pointer to -1");
+ pointer = -1;
+ }
+
+ log("byte got: " + value);
+ return value;
+ }
+
+ /*
+ * feed inflater more bytes in order to get some
+ * decompressed output.
+ * returns number of bytes actually got
+ */
+ private int doRead(byte[] buf, int offset, int len) throws DataFormatException, IOException
+ {
+ int read = 0;
+ int n = 0;
+ byte[] inputBuffer = new byte[len];
+
+ while (len > 0)
+ {
+ n = inflater.inflate(buf, offset, len);
+ if (n == 0)
+ {
+ if (inflater.finished())
+ {
+ break;
+ }
+ else if (inflater.needsInput())
+ {
+ //feeding
+ int m = input.read(inputBuffer);
+
+ if (m == -1)
+ {
+ //it shouldn't be here, throw exception
+ throw new DataFormatException("Input is over while inflater still expecting data");
+ }
+ else
+ {
+ //feed the data in
+ inflater.setInput(inputBuffer);
+ n = inflater.inflate(buf, offset, len);
+ if (n > 0)
+ {
+ read += n;
+ offset += n;
+ len -= n;
+ }
+ }
+ }
+ else
+ {
+ //it shouldn't be here, throw
+ throw new DataFormatException("Inflater is neither finished nor needing input.");
+ }
+ }
+ else
+ {
+ read += n;
+ offset += n;
+ len -= n;
+ }
+ }
+ return read;
+ }
+
+ public static void main(String[] args) throws IOException
+ {
+ String inputString = "blahblahblah??blahblahblahblahblah??blablahblah??blablahblah??bla";
+ byte[] input = inputString.getBytes("UTF-8");
+ byte[] output = new byte[30];
+ Deflater compresser = new Deflater();
+ compresser.setInput(input);
+ compresser.finish();
+ int compressedDataLength = compresser.deflate(output);
+ System.err.println("compress len: " + compressedDataLength);
+
+ byte[] zipBytes = new byte[compressedDataLength];
+
+ System.arraycopy(output, 0, zipBytes, 0, compressedDataLength);
+ ByteArrayInputStream byteInput = new ByteArrayInputStream(zipBytes);
+
+ InflaterReader inflater = new InflaterReader(byteInput);
+ ArrayList<Integer> holder = new ArrayList<Integer>();
+ int read = inflater.read();
+
+ while (read != -1)
+ {
+ holder.add(read);
+ read = inflater.read();
+ }
+
+ byte[] result = new byte[holder.size()];
+
+ System.out.println("total bytes: " + holder.size());
+ for (int i = 0; i < result.length; i++)
+ {
+ result[i] = holder.get(i).byteValue();
+ }
+
+ String txt = new String(result);
+ System.out.println("the result: " + txt);
+
+ }
+
+}
14 years
JBoss hornetq SVN: r9949 - branches/2_2_0_HA_Improvements_preMerge.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-11-29 22:30:08 -0500 (Mon, 29 Nov 2010)
New Revision: 9949
Modified:
branches/2_2_0_HA_Improvements_preMerge/build-hornetq.xml
Log:
removing jms-tests temporarily (this is the dev-branch only) we won't remove them on trunk
Modified: branches/2_2_0_HA_Improvements_preMerge/build-hornetq.xml
===================================================================
--- branches/2_2_0_HA_Improvements_preMerge/build-hornetq.xml 2010-11-30 03:23:48 UTC (rev 9948)
+++ branches/2_2_0_HA_Improvements_preMerge/build-hornetq.xml 2010-11-30 03:30:08 UTC (rev 9949)
@@ -1924,7 +1924,8 @@
<target name="all-tests" depends="unit-tests, integration-tests, concurrent-tests, stress-tests, jms-tests, joram-tests, rest-tests"/>
- <target name="hudson-tests" depends="unit-tests, integration-tests, concurrent-tests, timing-tests, jms-tests, joram-tests"/>
+ <!-- jms-tests are removed temporarily until merge on trunk is done... target name="hudson-tests" depends="unit-tests, integration-tests, concurrent-tests, timing-tests, jms-tests, joram-tests"/ -->
+ <target name="hudson-tests" depends="unit-tests, integration-tests, concurrent-tests, timing-tests"/>
<target name="compile-reports">
<mkdir dir="${test.stylesheets.dir}"/>
14 years
JBoss hornetq SVN: r9948 - in branches/2_2_0_HA_Improvements_preMerge/tests: src/org/hornetq/tests/integration/jms/server and 1 other directory.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-11-29 22:23:48 -0500 (Mon, 29 Nov 2010)
New Revision: 9948
Modified:
branches/2_2_0_HA_Improvements_preMerge/tests/config/hornetq-jms-for-JMSServerDeployerTest2.xml
branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/jms/server/JMSServerDeployerTest.java
Log:
fixing tests
Modified: branches/2_2_0_HA_Improvements_preMerge/tests/config/hornetq-jms-for-JMSServerDeployerTest2.xml
===================================================================
--- branches/2_2_0_HA_Improvements_preMerge/tests/config/hornetq-jms-for-JMSServerDeployerTest2.xml 2010-11-30 03:18:12 UTC (rev 9947)
+++ branches/2_2_0_HA_Improvements_preMerge/tests/config/hornetq-jms-for-JMSServerDeployerTest2.xml 2010-11-30 03:23:48 UTC (rev 9948)
@@ -30,7 +30,6 @@
<auto-group>false</auto-group>
<pre-acknowledge>true</pre-acknowledge>
<connection-ttl>2345</connection-ttl>
- <discovery-initial-wait-timeout>5464</discovery-initial-wait-timeout>
<failover-on-initial-connection>true</failover-on-initial-connection>
<failover-on-server-shutdown>false</failover-on-server-shutdown>
<connection-load-balancing-policy-class-name>FooClass</connection-load-balancing-policy-class-name>
Modified: branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/jms/server/JMSServerDeployerTest.java
===================================================================
--- branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/jms/server/JMSServerDeployerTest.java 2010-11-30 03:18:12 UTC (rev 9947)
+++ branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/jms/server/JMSServerDeployerTest.java 2010-11-30 03:23:48 UTC (rev 9948)
@@ -314,7 +314,6 @@
assertEquals("172.16.8.10", cf.getLocalBindAddress());
assertEquals(12345, cf.getDiscoveryPort());
assertEquals(5432, cf.getDiscoveryRefreshTimeout());
- assertEquals(5464, cf.getDiscoveryInitialWaitTimeout());
}
for (String binding : queueBindings)
14 years
JBoss hornetq SVN: r9947 - branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/jms/server/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-11-29 22:18:12 -0500 (Mon, 29 Nov 2010)
New Revision: 9947
Modified:
branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java
Log:
fixing jms tests
Modified: branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java 2010-11-30 03:08:02 UTC (rev 9946)
+++ branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java 2010-11-30 03:18:12 UTC (rev 9947)
@@ -757,6 +757,7 @@
configuration.setClientID(clientID);
configuration.setClientFailureCheckPeriod(clientFailureCheckPeriod);
configuration.setConnectionTTL(connectionTTL);
+ configuration.setFactoryType(cfType);
configuration.setCallTimeout(callTimeout);
configuration.setCacheLargeMessagesClient(cacheLargeMessagesClient);
configuration.setMinLargeMessageSize(minLargeMessageSize);
@@ -782,7 +783,7 @@
configuration.setReconnectAttempts(reconnectAttempts);
configuration.setFailoverOnInitialConnection(failoverOnInitialConnection);
configuration.setGroupID(groupId);
- createConnectionFactory(true, configuration, jndiBindings);
+ createConnectionFactory(true, configuration, jndiBindings);
}
}
14 years
JBoss hornetq SVN: r9946 - in branches/2_2_0_HA_Improvements_preMerge/tests/jms-tests/src/org/hornetq/jms/tests: tools/container and 1 other directory.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-11-29 22:08:02 -0500 (Mon, 29 Nov 2010)
New Revision: 9946
Modified:
branches/2_2_0_HA_Improvements_preMerge/tests/jms-tests/src/org/hornetq/jms/tests/CTSMiscellaneousTest.java
branches/2_2_0_HA_Improvements_preMerge/tests/jms-tests/src/org/hornetq/jms/tests/tools/container/LocalTestServer.java
Log:
fixing tests
Modified: branches/2_2_0_HA_Improvements_preMerge/tests/jms-tests/src/org/hornetq/jms/tests/CTSMiscellaneousTest.java
===================================================================
--- branches/2_2_0_HA_Improvements_preMerge/tests/jms-tests/src/org/hornetq/jms/tests/CTSMiscellaneousTest.java 2010-11-30 03:03:12 UTC (rev 9945)
+++ branches/2_2_0_HA_Improvements_preMerge/tests/jms-tests/src/org/hornetq/jms/tests/CTSMiscellaneousTest.java 2010-11-30 03:08:02 UTC (rev 9946)
@@ -26,7 +26,6 @@
import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
import org.hornetq.jms.client.HornetQConnectionFactory;
import org.hornetq.jms.server.impl.JMSFactoryType;
-import org.hornetq.tests.util.RandomUtil;
/**
* Safeguards for previously detected TCK failures.
Modified: branches/2_2_0_HA_Improvements_preMerge/tests/jms-tests/src/org/hornetq/jms/tests/tools/container/LocalTestServer.java
===================================================================
--- branches/2_2_0_HA_Improvements_preMerge/tests/jms-tests/src/org/hornetq/jms/tests/tools/container/LocalTestServer.java 2010-11-30 03:03:12 UTC (rev 9945)
+++ branches/2_2_0_HA_Improvements_preMerge/tests/jms-tests/src/org/hornetq/jms/tests/tools/container/LocalTestServer.java 2010-11-30 03:08:02 UTC (rev 9946)
@@ -21,6 +21,7 @@
import java.util.List;
import java.util.Properties;
import java.util.Set;
+import java.util.UUID;
import javax.management.MBeanServerInvocationHandler;
import javax.management.ObjectName;
@@ -39,7 +40,6 @@
import org.hornetq.integration.bootstrap.HornetQBootstrapServer;
import org.hornetq.jms.server.JMSServerManager;
import org.hornetq.jms.server.impl.JMSFactoryType;
-import org.hornetq.tests.util.RandomUtil;
import org.jboss.kernel.plugins.config.property.PropertyKernelConfig;
/**
@@ -338,7 +338,7 @@
ArrayList<String> connectors = new ArrayList<String>();
for (TransportConfiguration tnsp : connectorConfigs)
{
- String name = RandomUtil.randomString();
+ String name = UUID.randomUUID().toString();
getHornetQServer().getConfiguration().getConnectorConfigurations().put(name, tnsp);
14 years
JBoss hornetq SVN: r9945 - branches/2_2_0_HA_Improvements_preMerge/tests/jms-tests/src/org/hornetq/jms/tests.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-11-29 22:03:12 -0500 (Mon, 29 Nov 2010)
New Revision: 9945
Modified:
branches/2_2_0_HA_Improvements_preMerge/tests/jms-tests/src/org/hornetq/jms/tests/HornetQServerTestCase.java
Log:
fixing tests
Modified: branches/2_2_0_HA_Improvements_preMerge/tests/jms-tests/src/org/hornetq/jms/tests/HornetQServerTestCase.java
===================================================================
--- branches/2_2_0_HA_Improvements_preMerge/tests/jms-tests/src/org/hornetq/jms/tests/HornetQServerTestCase.java 2010-11-29 23:20:16 UTC (rev 9944)
+++ branches/2_2_0_HA_Improvements_preMerge/tests/jms-tests/src/org/hornetq/jms/tests/HornetQServerTestCase.java 2010-11-30 03:03:12 UTC (rev 9945)
@@ -19,6 +19,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.UUID;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
@@ -43,7 +44,6 @@
import org.hornetq.jms.tests.tools.ServerManagement;
import org.hornetq.jms.tests.tools.container.Server;
import org.hornetq.jms.tests.util.ProxyAssertSupport;
-import org.hornetq.tests.util.RandomUtil;
/**
* @author <a href="mailto:adrian@jboss.org">Adrian Brock</a>
@@ -75,7 +75,7 @@
ArrayList<String> connectors = new ArrayList<String>();
for (TransportConfiguration tnsp : connectorConfigs)
{
- String name = RandomUtil.randomString();
+ String name = UUID.randomUUID().toString();
getJmsServer().getConfiguration().getConnectorConfigurations().put(name, tnsp);
14 years