JBoss hornetq SVN: r9944 - branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/failover.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-11-29 18:20:16 -0500 (Mon, 29 Nov 2010)
New Revision: 9944
Modified:
branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java
Log:
Changing test to fail instead of hang case messages are not received
Modified: branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java
===================================================================
--- branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java 2010-11-29 23:13:07 UTC (rev 9943)
+++ branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java 2010-11-29 23:20:16 UTC (rev 9944)
@@ -145,7 +145,9 @@
for (int i = 0; i < numMessages; i++)
{
- ClientMessage message2 = consumer.receive();
+ ClientMessage message2 = consumer.receive(10000);
+
+ assertNotNull(message2);
Assert.assertEquals("aardvarks", message2.getBodyBuffer().readString());
13 years, 5 months
JBoss hornetq SVN: r9943 - in branches/2_2_0_HA_Improvements_preMerge/examples/jms/multiple-failover: server0 and 1 other directory.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-11-29 18:13:07 -0500 (Mon, 29 Nov 2010)
New Revision: 9943
Modified:
branches/2_2_0_HA_Improvements_preMerge/examples/jms/multiple-failover/
branches/2_2_0_HA_Improvements_preMerge/examples/jms/multiple-failover/server0/
Log:
svn:ignores
Property changes on: branches/2_2_0_HA_Improvements_preMerge/examples/jms/multiple-failover
___________________________________________________________________
Name: svn:ignore
+ build
Property changes on: branches/2_2_0_HA_Improvements_preMerge/examples/jms/multiple-failover/server0
___________________________________________________________________
Name: svn:ignore
+ KILL_ME
data
13 years, 5 months
JBoss hornetq SVN: r9942 - branches/2_2_0_HA_Improvements_preMerge/tests/config.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-11-29 18:00:33 -0500 (Mon, 29 Nov 2010)
New Revision: 9942
Modified:
branches/2_2_0_HA_Improvements_preMerge/tests/config/hornetq-jms-for-JMSServerDeployerTest.xml
Log:
fixing test
Modified: branches/2_2_0_HA_Improvements_preMerge/tests/config/hornetq-jms-for-JMSServerDeployerTest.xml
===================================================================
--- branches/2_2_0_HA_Improvements_preMerge/tests/config/hornetq-jms-for-JMSServerDeployerTest.xml 2010-11-29 22:34:59 UTC (rev 9941)
+++ branches/2_2_0_HA_Improvements_preMerge/tests/config/hornetq-jms-for-JMSServerDeployerTest.xml 2010-11-29 23:00:33 UTC (rev 9942)
@@ -29,7 +29,6 @@
<auto-group>false</auto-group>
<pre-acknowledge>true</pre-acknowledge>
<connection-ttl>2345</connection-ttl>
- <discovery-initial-wait-timeout>678</discovery-initial-wait-timeout>
<failover-on-initial-connection>true</failover-on-initial-connection>
<failover-on-server-shutdown>false</failover-on-server-shutdown>
<connection-load-balancing-policy-class-name>FooClass</connection-load-balancing-policy-class-name>
13 years, 5 months
JBoss hornetq SVN: r9941 - branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/persistence/impl/journal.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-11-29 17:34:59 -0500 (Mon, 29 Nov 2010)
New Revision: 9941
Modified:
branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
Log:
tweak
Modified: branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2010-11-29 19:20:34 UTC (rev 9940)
+++ branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2010-11-29 22:34:59 UTC (rev 9941)
@@ -1109,7 +1109,12 @@
}
// To recover positions on Iterators
- pagingManager.processReload();
+ if (pagingManager != null)
+ {
+ // it could be null on certain tests that are not dealing with paging
+ // This could also be the case in certain embedded conditions
+ pagingManager.processReload();
+ }
if (perfBlastPages != -1)
{
13 years, 5 months
JBoss hornetq SVN: r9940 - branches/2_2_0_HA_Improvements_preMerge/examples/jms/multiple-failover/src/org/hornetq/jms/example.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-11-29 14:20:34 -0500 (Mon, 29 Nov 2010)
New Revision: 9940
Modified:
branches/2_2_0_HA_Improvements_preMerge/examples/jms/multiple-failover/src/org/hornetq/jms/example/MultipleFailoverExample.java
Log:
organizing imports
Modified: branches/2_2_0_HA_Improvements_preMerge/examples/jms/multiple-failover/src/org/hornetq/jms/example/MultipleFailoverExample.java
===================================================================
--- branches/2_2_0_HA_Improvements_preMerge/examples/jms/multiple-failover/src/org/hornetq/jms/example/MultipleFailoverExample.java 2010-11-29 19:07:41 UTC (rev 9939)
+++ branches/2_2_0_HA_Improvements_preMerge/examples/jms/multiple-failover/src/org/hornetq/jms/example/MultipleFailoverExample.java 2010-11-29 19:20:34 UTC (rev 9940)
@@ -13,12 +13,18 @@
package org.hornetq.jms.example;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.naming.InitialContext;
+
import org.hornetq.common.example.HornetQExample;
-import org.hornetq.jms.client.HornetQConnection;
-import javax.jms.*;
-import javax.naming.InitialContext;
-
/**
* @author <a href="mailto:andy.taylor@jboss.com">Andy Taylor</a>
* Date: Nov 4, 2010
13 years, 5 months
JBoss hornetq SVN: r9939 - branches/2_2_0_HA_Improvements_preMerge/examples/common/src/org/hornetq/common/example.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-11-29 14:07:41 -0500 (Mon, 29 Nov 2010)
New Revision: 9939
Modified:
branches/2_2_0_HA_Improvements_preMerge/examples/common/src/org/hornetq/common/example/HornetQExample.java
Log:
Adding sleep after kill
Modified: branches/2_2_0_HA_Improvements_preMerge/examples/common/src/org/hornetq/common/example/HornetQExample.java
===================================================================
--- branches/2_2_0_HA_Improvements_preMerge/examples/common/src/org/hornetq/common/example/HornetQExample.java 2010-11-29 17:04:17 UTC (rev 9938)
+++ branches/2_2_0_HA_Improvements_preMerge/examples/common/src/org/hornetq/common/example/HornetQExample.java 2010-11-29 19:07:41 UTC (rev 9939)
@@ -106,6 +106,9 @@
File file = new File("server" + id + "/KILL_ME");
file.createNewFile();
+
+ // Sleep longer than the KillChecker check period
+ Thread.sleep(1000);
}
protected void stopServer(final int id) throws Exception
13 years, 5 months
JBoss hornetq SVN: r9938 - in branches/2_2_0_HA_Improvements_preMerge: src/main/org/hornetq/core/client/impl and 9 other directories.
by do-not-reply@jboss.org
Author: ataylor
Date: 2010-11-29 12:04:17 -0500 (Mon, 29 Nov 2010)
New Revision: 9938
Modified:
branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/api/core/client/ClusterTopologyListener.java
branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/client/impl/Topology.java
branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/protocol/core/impl/wireformat/ClusterTopologyChangeMessage.java
branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/protocol/core/impl/wireformat/NodeAnnounceMessage.java
branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/server/cluster/ClusterManager.java
branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java
branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/topology/TopologyClusterTestBase.java
branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/util/RemoteProcessHornetQServer.java
branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/jms/server/management/JMSUtil.java
Log:
removed checking for source node when announcing backup as doesnt make sense + test fixes
Modified: branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/api/core/client/ClusterTopologyListener.java
===================================================================
--- branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/api/core/client/ClusterTopologyListener.java 2010-11-29 14:15:10 UTC (rev 9937)
+++ branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/api/core/client/ClusterTopologyListener.java 2010-11-29 17:04:17 UTC (rev 9938)
@@ -26,7 +26,7 @@
*/
public interface ClusterTopologyListener
{
- void nodeUP(String nodeID, String sourceNodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last, int distance);
+ void nodeUP(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last, int distance);
void nodeDown(String nodeID);
}
Modified: branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2010-11-29 14:15:10 UTC (rev 9937)
+++ branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2010-11-29 17:04:17 UTC (rev 9938)
@@ -1109,7 +1109,6 @@
{
TransportConfiguration config = serverLocator.getClusterTransportConfiguration();
channel0.send(new NodeAnnounceMessage(serverLocator.getNodeID(),
- serverLocator.getNodeID(),
serverLocator.isBackup(),
config));
}
@@ -1244,7 +1243,6 @@
else
{
serverLocator.notifyNodeUp(topMessage.getNodeID(),
- topMessage.getSourceNodeID(),
topMessage.getPair(),
topMessage.isLast(),
topMessage.getDistance());
Modified: branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2010-11-29 14:15:10 UTC (rev 9937)
+++ branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2010-11-29 17:04:17 UTC (rev 9938)
@@ -1128,7 +1128,6 @@
}
public synchronized void notifyNodeUp(final String nodeID,
- final String sourceNodeID,
final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
final boolean last,
final int distance)
@@ -1162,7 +1161,7 @@
for (ClusterTopologyListener listener : topologyListeners)
{
- listener.nodeUP(nodeID, sourceNodeID, connectorPair, last, distance);
+ listener.nodeUP(nodeID, connectorPair, last, distance);
}
// Notify if waiting on getting topology
Modified: branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
===================================================================
--- branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java 2010-11-29 14:15:10 UTC (rev 9937)
+++ branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java 2010-11-29 17:04:17 UTC (rev 9938)
@@ -38,7 +38,7 @@
ClientSessionFactory connect() throws Exception;
- void notifyNodeUp(String nodeID, String sourceNodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last, int distance);
+ void notifyNodeUp(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last, int distance);
void notifyNodeDown(String nodeID);
Modified: branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/client/impl/Topology.java
===================================================================
--- branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/client/impl/Topology.java 2010-11-29 14:15:10 UTC (rev 9937)
+++ branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/client/impl/Topology.java 2010-11-29 17:04:17 UTC (rev 9938)
@@ -39,10 +39,16 @@
*/
private Map<String, TopologyMember> topology = new HashMap<String, TopologyMember>();
+ private boolean debug;
+
public synchronized boolean addMember(String nodeId, TopologyMember member)
{
boolean replaced = false;
TopologyMember currentMember = topology.get(nodeId);
+ if (debug)
+ {
+ //System.out.println("member.getConnector() = " + member.getConnector());
+ }
if(currentMember == null)
{
topology.put(nodeId, member);
@@ -60,6 +66,15 @@
currentMember.getConnector().b = member.getConnector().b;
replaced = true;
}
+
+ if(member.getConnector().a == null)
+ {
+ member.getConnector().a = currentMember.getConnector().a;
+ }
+ if(member.getConnector().b == null)
+ {
+ member.getConnector().b = currentMember.getConnector().b;
+ }
}
return replaced;
}
@@ -70,12 +85,12 @@
return (member != null);
}
- public synchronized void fireListeners(ClusterTopologyListener listener, String sourceNodeId)
+ public synchronized void fireListeners(ClusterTopologyListener listener)
{
int count = 0;
for (Map.Entry<String, TopologyMember> entry : topology.entrySet())
{
- listener.nodeUP(entry.getKey(), sourceNodeId, entry.getValue().getConnector(), ++count == topology.size(), entry.getValue().getDistance());
+ listener.nodeUP(entry.getKey(), entry.getValue().getConnector(), ++count == topology.size(), entry.getValue().getDistance());
}
}
@@ -149,4 +164,9 @@
}
return null;
}
+
+ public void setDebug(boolean b)
+ {
+ debug = b;
+ }
}
Modified: branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
===================================================================
--- branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2010-11-29 14:15:10 UTC (rev 9937)
+++ branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2010-11-29 17:04:17 UTC (rev 9938)
@@ -111,16 +111,9 @@
final ClusterTopologyListener listener = new ClusterTopologyListener()
{
- public void nodeUP(String nodeID, String sourceNodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last, int distance)
+ public void nodeUP(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last, int distance)
{
- if(System.getProperty("foo") != null)
- {
- if(connectorPair.toString().contains("b=org-hornetq-core-remoting-impl-invm-InVMConnectorFactory?server-id=1"))
- {
- System.out.println("");
- }
- }
- channel0.send(new ClusterTopologyChangeMessage(nodeID, sourceNodeID, connectorPair, last, distance + 1));
+ channel0.send(new ClusterTopologyChangeMessage(nodeID, connectorPair, last, distance + 1));
}
public void nodeDown(String nodeID)
@@ -154,7 +147,7 @@
{
pair = new Pair<TransportConfiguration, TransportConfiguration>(msg.getConnector(), null);
}
- server.getClusterManager().notifyNodeUp(msg.getNodeID(), msg.getSourceNodeID(), pair, false, 1);
+ server.getClusterManager().notifyNodeUp(msg.getNodeID(), pair, false, 1);
}
}
});
Modified: branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/protocol/core/impl/wireformat/ClusterTopologyChangeMessage.java
===================================================================
--- branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/protocol/core/impl/wireformat/ClusterTopologyChangeMessage.java 2010-11-29 14:15:10 UTC (rev 9937)
+++ branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/protocol/core/impl/wireformat/ClusterTopologyChangeMessage.java 2010-11-29 17:04:17 UTC (rev 9938)
@@ -34,8 +34,6 @@
private boolean exit;
private String nodeID;
-
- private String sourceNodeID;
private Pair<TransportConfiguration, TransportConfiguration> pair;
@@ -47,13 +45,11 @@
// Constructors --------------------------------------------------
- public ClusterTopologyChangeMessage(final String nodeID, String sourceNodeID, final Pair<TransportConfiguration, TransportConfiguration> pair, final boolean last, int distance)
+ public ClusterTopologyChangeMessage(final String nodeID, final Pair<TransportConfiguration, TransportConfiguration> pair, final boolean last, int distance)
{
super(PacketImpl.CLUSTER_TOPOLOGY);
this.nodeID = nodeID;
-
- this.sourceNodeID = sourceNodeID;
this.pair = pair;
@@ -84,11 +80,6 @@
{
return nodeID;
}
-
- public String getSourceNodeID()
- {
- return sourceNodeID;
- }
public Pair<TransportConfiguration, TransportConfiguration> getPair()
{
@@ -122,7 +113,6 @@
buffer.writeString(nodeID);
if (!exit)
{
- buffer.writeString(sourceNodeID);
if (pair.a != null)
{
buffer.writeBoolean(true);
@@ -153,7 +143,6 @@
nodeID = buffer.readString();
if (!exit)
{
- sourceNodeID = buffer.readString();
boolean hasLive = buffer.readBoolean();
TransportConfiguration a;
if(hasLive)
Modified: branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/protocol/core/impl/wireformat/NodeAnnounceMessage.java
===================================================================
--- branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/protocol/core/impl/wireformat/NodeAnnounceMessage.java 2010-11-29 14:15:10 UTC (rev 9937)
+++ branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/protocol/core/impl/wireformat/NodeAnnounceMessage.java 2010-11-29 17:04:17 UTC (rev 9938)
@@ -36,19 +36,15 @@
private TransportConfiguration connector;
- private String sourceNodeID;
-
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
- public NodeAnnounceMessage(final String nodeID, final String sourceNodeID, final boolean backup, final TransportConfiguration tc)
+ public NodeAnnounceMessage(final String nodeID, final boolean backup, final TransportConfiguration tc)
{
super(PacketImpl.NODE_ANNOUNCE);
this.nodeID = nodeID;
-
- this.sourceNodeID = sourceNodeID;
this.backup = backup;
@@ -76,11 +72,6 @@
return nodeID;
}
- public String getSourceNodeID()
- {
- return sourceNodeID;
- }
-
public boolean isBackup()
{
return backup;
@@ -96,7 +87,6 @@
public void encodeRest(final HornetQBuffer buffer)
{
buffer.writeString(nodeID);
- buffer.writeString(sourceNodeID);
buffer.writeBoolean(backup);
connector.encode(buffer);
}
@@ -105,7 +95,6 @@
public void decodeRest(final HornetQBuffer buffer)
{
this.nodeID = buffer.readString();
- this.sourceNodeID = buffer.readString();
this.backup = buffer.readBoolean();
connector = new TransportConfiguration();
connector.decode(buffer);
Modified: branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/server/cluster/ClusterManager.java
===================================================================
--- branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/server/cluster/ClusterManager.java 2010-11-29 14:15:10 UTC (rev 9937)
+++ branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/server/cluster/ClusterManager.java 2010-11-29 17:04:17 UTC (rev 9938)
@@ -50,7 +50,7 @@
void notifyNodeDown(String nodeID);
- void notifyNodeUp(String nodeID, String sourceNodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean backup, int distance);
+ void notifyNodeUp(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean backup, int distance);
Topology getTopology();
Modified: branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2010-11-29 14:15:10 UTC (rev 9937)
+++ branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2010-11-29 17:04:17 UTC (rev 9938)
@@ -343,23 +343,23 @@
}
public synchronized void nodeUP(final String nodeID,
- final String sourceNodeID,
final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
final boolean last,
final int distance)
{
// discard notifications about ourselves unless its from our backup
+
if (nodeID.equals(nodeUUID.toString()))
{
- if(sourceNodeID.equals(nodeUUID.toString()) && connectorPair.b != null)
+ if(connectorPair.b != null)
{
- server.getClusterManager().notifyNodeUp(nodeID, sourceNodeID, connectorPair, last, distance);
+ server.getClusterManager().notifyNodeUp(nodeID, connectorPair, last, distance);
}
return;
}
// we propagate the node notifications to all cluster topology listeners
- server.getClusterManager().notifyNodeUp(nodeID, sourceNodeID, connectorPair, last, distance);
+ server.getClusterManager().notifyNodeUp(nodeID, connectorPair, last, distance);
// if the node is more than 1 hop away, we do not create a bridge for direct cluster connection
if (allowsDirectConnectionsOnly && distance > 1 && !allowableConnections.contains(connectorPair.a))
Modified: branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2010-11-29 14:15:10 UTC (rev 9937)
+++ branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2010-11-29 17:04:17 UTC (rev 9938)
@@ -251,26 +251,27 @@
}
public void notifyNodeUp(String nodeID,
- String sourceNodeID,
Pair<TransportConfiguration, TransportConfiguration> connectorPair,
boolean last,
int distance)
{
boolean updated = topology.addMember(nodeID, new TopologyMember(connectorPair, distance));
+
if(!updated)
{
return;
}
+
for (ClusterTopologyListener listener : clientListeners)
{
- listener.nodeUP(nodeID, sourceNodeID, connectorPair, last, distance);
+ listener.nodeUP(nodeID, connectorPair, last, distance);
}
if (distance < topology.nodes())
{
for (ClusterTopologyListener listener : clusterConnectionListeners)
{
- listener.nodeUP(nodeID, sourceNodeID, connectorPair, last, distance);
+ listener.nodeUP(nodeID, connectorPair, last, distance);
}
}
}
@@ -313,7 +314,7 @@
}
// We now need to send the current topology to the client
- topology.fireListeners(listener, nodeUUID.toString());
+ topology.fireListeners(listener);
}
public synchronized void removeClusterTopologyListener(final ClusterTopologyListener listener,
@@ -405,12 +406,12 @@
for (ClusterTopologyListener listener : clientListeners)
{
- listener.nodeUP(nodeID, nodeID, member.getConnector(), false, member.getDistance());
+ listener.nodeUP(nodeID, member.getConnector(), false, member.getDistance());
}
for (ClusterTopologyListener listener : clusterConnectionListeners)
{
- listener.nodeUP(nodeID, nodeID, member.getConnector(), false, member.getDistance());
+ listener.nodeUP(nodeID, member.getConnector(), false, member.getDistance());
}
}
}
@@ -478,12 +479,12 @@
for (ClusterTopologyListener listener : clientListeners)
{
- listener.nodeUP(nodeID, nodeID, member.getConnector(), false, member.getDistance());
+ listener.nodeUP(nodeID, member.getConnector(), false, member.getDistance());
}
for (ClusterTopologyListener listener : clusterConnectionListeners)
{
- listener.nodeUP(nodeID, nodeID, member.getConnector(), false, member.getDistance());
+ listener.nodeUP(nodeID, member.getConnector(), false, member.getDistance());
}
}
@@ -835,7 +836,7 @@
try
{
ClientSessionFactory backupSessionFactory = backupServerLocator.connect();
- backupSessionFactory.getConnection().getChannel(0, -1).send(new NodeAnnounceMessage(nodeUUID.toString(), nodeUUID.toString(), true, connector));
+ backupSessionFactory.getConnection().getChannel(0, -1).send(new NodeAnnounceMessage(nodeUUID.toString(), true, connector));
}
catch (Exception e)
{
Modified: branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-11-29 14:15:10 UTC (rev 9937)
+++ branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-11-29 17:04:17 UTC (rev 9938)
@@ -200,6 +200,7 @@
private final Set<ActivateCallback> activateCallbacks = new HashSet<ActivateCallback>();
private volatile GroupingHandler groupingHandler;
+
private NodeManager nodeManager;
// Constructors
Modified: branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
===================================================================
--- branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2010-11-29 14:15:10 UTC (rev 9937)
+++ branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2010-11-29 17:04:17 UTC (rev 9938)
@@ -398,7 +398,7 @@
this.latch = latch;
}
- public void nodeUP(String nodeID, String sourceNodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last, int distance)
+ public void nodeUP(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last, int distance)
{
if(connectorPair.a != null && !liveNode.contains(connectorPair.a.getName()))
{
Modified: branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java
===================================================================
--- branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java 2010-11-29 14:15:10 UTC (rev 9937)
+++ branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java 2010-11-29 17:04:17 UTC (rev 9938)
@@ -211,7 +211,7 @@
this.latch = latch;
}
- public void nodeUP(String nodeID, String sourceNodeID,
+ public void nodeUP(String nodeID,
Pair<TransportConfiguration, TransportConfiguration> connectorPair,
boolean last,
int distance)
Modified: branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/topology/TopologyClusterTestBase.java
===================================================================
--- branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/topology/TopologyClusterTestBase.java 2010-11-29 14:15:10 UTC (rev 9937)
+++ branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/topology/TopologyClusterTestBase.java 2010-11-29 17:04:17 UTC (rev 9938)
@@ -201,7 +201,7 @@
locator.addClusterTopologyListener(new ClusterTopologyListener()
{
- public void nodeUP(String nodeID, String sourceNodeID,
+ public void nodeUP(String nodeID,
Pair<TransportConfiguration, TransportConfiguration> connectorPair,
boolean last,
int distance)
@@ -262,7 +262,7 @@
locator.addClusterTopologyListener(new ClusterTopologyListener()
{
- public void nodeUP(String nodeID, String sourceNodeID,
+ public void nodeUP(String nodeID,
Pair<TransportConfiguration, TransportConfiguration> connectorPair,
boolean last,
int distance)
@@ -335,7 +335,7 @@
locator.addClusterTopologyListener(new ClusterTopologyListener()
{
- public void nodeUP(String nodeID, String sourceNodeID,
+ public void nodeUP(String nodeID,
Pair<TransportConfiguration, TransportConfiguration> connectorPair,
boolean last,
int distance)
@@ -417,7 +417,7 @@
locator.addClusterTopologyListener(new ClusterTopologyListener()
{
- public void nodeUP(String nodeID, String sourceNodeID,
+ public void nodeUP(String nodeID,
Pair<TransportConfiguration, TransportConfiguration> connectorPair,
boolean last,
int distance)
Modified: branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/util/RemoteProcessHornetQServer.java
===================================================================
--- branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/util/RemoteProcessHornetQServer.java 2010-11-29 14:15:10 UTC (rev 9937)
+++ branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/util/RemoteProcessHornetQServer.java 2010-11-29 17:04:17 UTC (rev 9938)
@@ -145,6 +145,8 @@
Assert.assertTrue(ok);
}
+
+
// Constants -----------------------------------------------------
// Attributes ----------------------------------------------------
Modified: branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/jms/server/management/JMSUtil.java
===================================================================
--- branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/jms/server/management/JMSUtil.java 2010-11-29 14:15:10 UTC (rev 9937)
+++ branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/jms/server/management/JMSUtil.java 2010-11-29 17:04:17 UTC (rev 9938)
@@ -303,7 +303,6 @@
}
public void nodeUP(String nodeID,
- String sourceNodeID,
Pair<TransportConfiguration, TransportConfiguration> connectorPair,
boolean last,
int distance)
13 years, 5 months
JBoss hornetq SVN: r9937 - branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils.
by do-not-reply@jboss.org
Author: gaohoward
Date: 2010-11-29 09:15:10 -0500 (Mon, 29 Nov 2010)
New Revision: 9937
Added:
branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/DeflaterReader.java
Log:
added a zip class impl that directly use Deflater
Added: branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/DeflaterReader.java
===================================================================
--- branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/DeflaterReader.java (rev 0)
+++ branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/DeflaterReader.java 2010-11-29 14:15:10 UTC (rev 9937)
@@ -0,0 +1,158 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.utils;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.zip.Deflater;
+
+/**
+ * A DeflaterReader
+ * The reader takes an inputstream and compress it.
+ *
+ *
+ */
+public class DeflaterReader
+{
+ private Deflater deflater = new Deflater();
+ private boolean isFinished = false;
+ private boolean compressDone = false;
+
+ private InputStream input;
+
+ public DeflaterReader(InputStream inData)
+ {
+ input = inData;
+ }
+
+ public int read(byte[] buffer) throws IOException
+ {
+ return read(buffer, 0, buffer.length);
+ }
+
+ /**
+ * Try to fill the buffer with compressed bytes. Except the last effective read,
+ * this method always returns with a full buffer of compressed data.
+ *
+ * @param buffer the buffer to fill compressed bytes
+ * @return the number of bytes really filled, -1 indicates end.
+ * @throws IOException
+ */
+ public int read(byte[] buffer, int offset, int len) throws IOException
+ {
+ if (compressDone)
+ {
+ return -1;
+ }
+
+ //buffer for reading input stream
+ byte[] readBuffer = new byte[2 * len];
+
+ int n = 0;
+ int read = 0;
+
+ while (len > 0)
+ {
+ n = deflater.deflate(buffer, offset, len);
+ if (n == 0)
+ {
+ if (isFinished)
+ {
+ deflater.end();
+ compressDone = true;
+ break;
+ }
+ else if (deflater.needsInput())
+ {
+ System.err.println("need input so read input");
+ // read some data from inputstream
+ int m = input.read(readBuffer);
+ System.err.println("original data read: " + m);
+ if (m == -1)
+ {
+ System.err.println("no more original data, finish deflater, now offset " + offset + " len " + len);
+
+ deflater.finish();
+ isFinished = true;
+ }
+ else
+ {
+ deflater.setInput(readBuffer, 0, m);
+ }
+ }
+ else
+ {
+ deflater.finish();
+ isFinished = true;
+ }
+ }
+ else
+ {
+ read += n;
+ offset += n;
+ len -= n;
+ }
+
+ }
+ return read;
+ }
+
+ public static void main(String[] args) throws IOException
+ {
+ String inputString = "blahblahblah??blahblahblahblahblah??blablahblah??blablahblah??bla";
+ byte[] input = inputString.getBytes("UTF-8");
+
+ ByteArrayInputStream inputStream = new ByteArrayInputStream(input);
+
+ DeflaterReader reader = new DeflaterReader(inputStream);
+
+ byte[] buffer = new byte[7];
+
+ int n = reader.read(buffer);
+
+ System.err.println("first read: " + n);
+
+ while (n != -1)
+ {
+ System.err.println("==>read n " + n + " values: " + getBytesString(buffer));
+ n = reader.read(buffer);
+ }
+
+ System.err.println("compressed.");
+
+ System.err.println("now verify");
+
+ byte[] output = new byte[30];
+ Deflater compresser = new Deflater();
+ compresser.setInput(input);
+ compresser.finish();
+ int compressedDataLength = compresser.deflate(output);
+ System.err.println("compress len: " + compressedDataLength);
+ System.err.println("commpress data: " + getBytesString(output));
+
+ }
+
+ static String getBytesString(byte[] array)
+ {
+ StringBuffer bf = new StringBuffer();
+ for (byte b : array)
+ {
+ int val = b & 0xFF;
+ bf.append(val + " ");
+ }
+ return bf.toString();
+ }
+
+}
13 years, 5 months
JBoss hornetq SVN: r9936 - in branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster: util and 1 other directory.
by do-not-reply@jboss.org
Author: ataylor
Date: 2010-11-28 04:44:08 -0500 (Sun, 28 Nov 2010)
New Revision: 9936
Modified:
branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/util/RemoteProcessHornetQServer.java
branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/util/SameProcessHornetQServer.java
branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/util/TestableServer.java
Log:
test fix
Modified: branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
===================================================================
--- branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2010-11-27 12:09:17 UTC (rev 9935)
+++ branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2010-11-28 09:44:08 UTC (rev 9936)
@@ -2023,7 +2023,7 @@
try
{
- //liveServer.getRemotingService().addInterceptor(interceptor);
+ liveServer.addInterceptor(interceptor);
session.commit();
}
@@ -2033,7 +2033,7 @@
{
// Ok - now we retry the commit after removing the interceptor
- //liveServer.getRemotingService().removeInterceptor(interceptor);
+ liveServer.removeInterceptor(interceptor);
try
{
Modified: branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/util/RemoteProcessHornetQServer.java
===================================================================
--- branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/util/RemoteProcessHornetQServer.java 2010-11-27 12:09:17 UTC (rev 9935)
+++ branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/util/RemoteProcessHornetQServer.java 2010-11-28 09:44:08 UTC (rev 9936)
@@ -19,6 +19,7 @@
import junit.framework.Assert;
import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.Interceptor;
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.api.core.client.SessionFailureListener;
@@ -80,6 +81,16 @@
return started;
}
+ public void addInterceptor(Interceptor interceptor)
+ {
+ throw new UnsupportedOperationException("can't do this with a remote server");
+ }
+
+ public void removeInterceptor(Interceptor interceptor)
+ {
+ throw new UnsupportedOperationException("can't do this with a remote server");
+ }
+
public void setInitialised(boolean initialised)
{
this.initialised = initialised;
Modified: branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/util/SameProcessHornetQServer.java
===================================================================
--- branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/util/SameProcessHornetQServer.java 2010-11-27 12:09:17 UTC (rev 9935)
+++ branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/util/SameProcessHornetQServer.java 2010-11-28 09:44:08 UTC (rev 9936)
@@ -19,6 +19,7 @@
import junit.framework.Assert;
import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.Interceptor;
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.api.core.client.SessionFailureListener;
import org.hornetq.core.server.HornetQServer;
@@ -56,6 +57,16 @@
return server.isStarted();
}
+ public void addInterceptor(Interceptor interceptor)
+ {
+ server.getRemotingService().addInterceptor(interceptor);
+ }
+
+ public void removeInterceptor(Interceptor interceptor)
+ {
+ server.getRemotingService().removeInterceptor(interceptor);
+ }
+
public void start() throws Exception
{
server.start();
Modified: branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/util/TestableServer.java
===================================================================
--- branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/util/TestableServer.java 2010-11-27 12:09:17 UTC (rev 9935)
+++ branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/util/TestableServer.java 2010-11-28 09:44:08 UTC (rev 9936)
@@ -13,6 +13,7 @@
package org.hornetq.tests.integration.cluster.util;
+import org.hornetq.api.core.Interceptor;
import org.hornetq.api.core.client.ClientSession;
/**
@@ -35,4 +36,8 @@
void destroy();
boolean isStarted();
+
+ void addInterceptor(Interceptor interceptor);
+
+ void removeInterceptor(Interceptor interceptor);
}
13 years, 5 months
JBoss hornetq SVN: r9935 - branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/failover.
by do-not-reply@jboss.org
Author: ataylor
Date: 2010-11-27 07:09:17 -0500 (Sat, 27 Nov 2010)
New Revision: 9935
Modified:
branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
Log:
fixed test
Modified: branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
===================================================================
--- branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2010-11-26 14:58:09 UTC (rev 9934)
+++ branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2010-11-27 12:09:17 UTC (rev 9935)
@@ -105,6 +105,7 @@
{
locator.setBlockOnNonDurableSend(true);
locator.setBlockOnDurableSend(true);
+ locator.setAckBatchSize(0);
locator.setReconnectAttempts(-1);
ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal)locator.createSessionFactory();
@@ -157,7 +158,7 @@
crash(session);
int retry = 0;
- while (received.size() != numMessages)
+ while (received.size() >= numMessages)
{
Thread.sleep(1000);
retry++;
@@ -166,8 +167,10 @@
break;
}
}
+ System.out.println("received.size() = " + received.size());
+ session.close();
- session.close();
+ sf.close();
Assert.assertTrue(retry <= 5);
13 years, 5 months