JBoss hornetq SVN: r9453 - trunk/src/config/common.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-07-22 00:54:11 -0400 (Thu, 22 Jul 2010)
New Revision: 9453
Modified:
trunk/src/config/common/hornetq-version.properties
Log:
Increased release
Modified: trunk/src/config/common/hornetq-version.properties
===================================================================
--- trunk/src/config/common/hornetq-version.properties 2010-07-22 04:14:06 UTC (rev 9452)
+++ trunk/src/config/common/hornetq-version.properties 2010-07-22 04:54:11 UTC (rev 9453)
@@ -1,8 +1,8 @@
-hornetq.version.versionName=Stripey
+hornetq.version.versionName=Colmeia
hornetq.version.majorVersion=2
hornetq.version.minorVersion=1
-hornetq.version.microVersion=1
-hornetq.version.incrementingVersion=119
+hornetq.version.microVersion=2
+hornetq.version.incrementingVersion=120
hornetq.version.versionSuffix=Final
hornetq.version.versionTag=Final
hornetq.netty.version=(a)NETTY.VERSION@
13 years, 11 months
JBoss hornetq SVN: r9452 - trunk/src/main/org/hornetq/core/journal/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-07-22 00:14:06 -0400 (Thu, 22 Jul 2010)
New Revision: 9452
Modified:
trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
Log:
minor tweak
Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2010-07-22 04:03:53 UTC (rev 9451)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2010-07-22 04:14:06 UTC (rev 9452)
@@ -2176,7 +2176,7 @@
{
cleanUp(cleanupFile);
}
- catch (Exception e)
+ catch (Throwable e)
{
JournalImpl.log.warn(e.getMessage(), e);
}
@@ -2338,7 +2338,7 @@
{
JournalImpl.this.compact();
}
- catch (Exception e)
+ catch (Throwable e)
{
JournalImpl.log.error(e.getMessage(), e);
}
13 years, 11 months
JBoss hornetq SVN: r9451 - trunk/src/main/org/hornetq/core/journal/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-07-22 00:03:53 -0400 (Thu, 22 Jul 2010)
New Revision: 9451
Modified:
trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
Log:
https://jira.jboss.org/browse/HORNETQ-440 - fixes
Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2010-07-21 23:13:23 UTC (rev 9450)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2010-07-22 04:03:53 UTC (rev 9451)
@@ -436,6 +436,16 @@
// This is what supports us from not re-filling the whole file
int readFileId = wholeFileBuffer.getInt();
+
+ // This record is from a previous file-usage. The file was
+ // reused and we need to ignore this record
+ if (readFileId != file.getRecordID())
+ {
+ wholeFileBuffer.position(pos + 1);
+ continue;
+ }
+
+
long transactionID = 0;
if (JournalImpl.isTransaction(recordType))
@@ -602,17 +612,6 @@
continue;
}
- // This record is from a previous file-usage. The file was
- // reused and we need to ignore this record
- if (readFileId != file.getRecordID())
- {
- // If a file has damaged pendingTransactions, we make it a dataFile, and the
- // next reclaiming will fix it
- reader.markAsDataFile(file);
-
- continue;
- }
-
wholeFileBuffer.position(oldPos);
// At this point everything is checked. So we relax and just load
13 years, 11 months
JBoss hornetq SVN: r9450 - trunk/src/main/org/hornetq/core/journal/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-07-21 19:13:23 -0400 (Wed, 21 Jul 2010)
New Revision: 9450
Modified:
trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
Log:
https://jira.jboss.org/browse/HORNETQ-440 - fix on isValidSize during read
Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2010-07-21 20:39:57 UTC (rev 9449)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2010-07-21 23:13:23 UTC (rev 9450)
@@ -490,21 +490,27 @@
variableSize = wholeFileBuffer.getInt();
+ if (recordType != JournalImpl.DELETE_RECORD_TX)
+ {
+ if (JournalImpl.isInvalidSize(journalFileSize, wholeFileBuffer.position(), 1))
+ {
+ wholeFileBuffer.position(pos +1);
+ continue;
+ }
+
+ userRecordType = wholeFileBuffer.get();
+ }
+
if (JournalImpl.isInvalidSize(journalFileSize, wholeFileBuffer.position(), variableSize))
{
wholeFileBuffer.position(pos + 1);
continue;
}
- if (recordType != JournalImpl.DELETE_RECORD_TX)
- {
- userRecordType = wholeFileBuffer.get();
- }
-
record = new byte[variableSize];
- wholeFileBuffer.get(record);
- }
+ wholeFileBuffer.get(record);
+ }
// Case this is a transaction, this will contain the number of pendingTransactions on a transaction, at the
// currentFile
@@ -3416,6 +3422,7 @@
else
{
final int position = bufferPos + size;
+
return position > fileSize || position < 0;
}
13 years, 11 months
JBoss hornetq SVN: r9449 - trunk/native/bin.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-07-21 16:39:57 -0400 (Wed, 21 Jul 2010)
New Revision: 9449
Modified:
trunk/native/bin/libHornetQAIO64.so
Log:
64 bits compilation
Modified: trunk/native/bin/libHornetQAIO64.so
===================================================================
(Binary files differ)
13 years, 11 months
JBoss hornetq SVN: r9448 - branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/xa.
by do-not-reply@jboss.org
Author: ataylor
Date: 2010-07-21 09:48:50 -0400 (Wed, 21 Jul 2010)
New Revision: 9448
Modified:
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/xa/BasicXaRecoveryTest.java
Log:
fix xa test
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/xa/BasicXaRecoveryTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/xa/BasicXaRecoveryTest.java 2010-07-21 13:39:48 UTC (rev 9447)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/xa/BasicXaRecoveryTest.java 2010-07-21 13:48:50 UTC (rev 9448)
@@ -59,6 +59,8 @@
private final SimpleString atestq = new SimpleString("atestq");
+ private ServerLocator locator;
+
@Override
protected void setUp() throws Exception
{
@@ -1261,8 +1263,8 @@
private void createClients(final boolean createQueue, final boolean commitACKs) throws Exception
{
- ServerLocator locator = createInVMNonHALocator();
- ClientSessionFactory sf = locator.createSessionFactory();
+ locator = createInVMNonHALocator();
+ sessionFactory = locator.createSessionFactory();
clientSession = sessionFactory.createSession(true, false, commitACKs);
if (createQueue)
{
13 years, 11 months
JBoss hornetq SVN: r9447 - branches/2_2_0_HA_Improvements/examples/jms/static-clustered-queue/server1.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-07-21 09:39:48 -0400 (Wed, 21 Jul 2010)
New Revision: 9447
Modified:
branches/2_2_0_HA_Improvements/examples/jms/static-clustered-queue/server1/hornetq-configuration.xml
Log:
HA refactoring
* remove bogus local configuration modification
Modified: branches/2_2_0_HA_Improvements/examples/jms/static-clustered-queue/server1/hornetq-configuration.xml
===================================================================
--- branches/2_2_0_HA_Improvements/examples/jms/static-clustered-queue/server1/hornetq-configuration.xml 2010-07-21 11:40:32 UTC (rev 9446)
+++ branches/2_2_0_HA_Improvements/examples/jms/static-clustered-queue/server1/hornetq-configuration.xml 2010-07-21 13:39:48 UTC (rev 9447)
@@ -3,9 +3,6 @@
xsi:schemaLocation="urn:hornetq /schema/hornetq-configuration.xsd">
<clustered>true</clustered>
- <bindings-directory>/Users/jmesnil/Desktop/server1/bindings</bindings-directory>
- <journal-directory>/Users/jmesnil/Desktop/server1/journal</journal-directory>
-
<!-- Connectors -->
<connectors>
<connector name="netty-connector">
13 years, 11 months
JBoss hornetq SVN: r9446 - branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/clientcrash.
by do-not-reply@jboss.org
Author: ataylor
Date: 2010-07-21 07:40:32 -0400 (Wed, 21 Jul 2010)
New Revision: 9446
Modified:
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/clientcrash/ClientCrashTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/clientcrash/CrashClient.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/clientcrash/CrashClient2.java
Log:
test fixes
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/clientcrash/ClientCrashTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/clientcrash/ClientCrashTest.java 2010-07-21 10:32:52 UTC (rev 9445)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/clientcrash/ClientCrashTest.java 2010-07-21 11:40:32 UTC (rev 9446)
@@ -63,7 +63,7 @@
public void testCrashClient() throws Exception
{
- assertActiveConnections(0);
+ assertActiveConnections(1);
// spawn a JVM that creates a Core client, which sends a message
Process p = SpawnedVMSupport.spawnVM(CrashClient.class.getName());
@@ -76,7 +76,7 @@
session.start();
// receive a message from the queue
- Message messageFromClient = consumer.receive(5000);
+ Message messageFromClient = consumer.receive(500000);
Assert.assertNotNull("no message received", messageFromClient);
Assert.assertEquals(ClientCrashTest.MESSAGE_TEXT_FROM_CLIENT, messageFromClient.getBodyBuffer().readString());
@@ -109,14 +109,14 @@
Thread.sleep(2 * ClientCrashTest.CONNECTION_TTL);
// the crash must have been detected and the resources cleaned up
- assertActiveConnections(0);
+ assertActiveConnections(1);
// FIXME https://jira.jboss.org/jira/browse/JBMESSAGING-1421
assertActiveSession(0);
}
public void testCrashClient2() throws Exception
{
- assertActiveConnections(0);
+ assertActiveConnections(1);
ClientSession session = sf.createSession(false, true, true);
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/clientcrash/CrashClient.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/clientcrash/CrashClient.java 2010-07-21 10:32:52 UTC (rev 9445)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/clientcrash/CrashClient.java 2010-07-21 11:40:32 UTC (rev 9446)
@@ -45,10 +45,10 @@
CrashClient.log.debug("args = " + Arrays.asList(args));
ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(NettyConnectorFactory.class.getName()));
+ locator.setClientFailureCheckPeriod(ClientCrashTest.PING_PERIOD);
+ locator.setConnectionTTL(ClientCrashTest.CONNECTION_TTL);
ClientSessionFactory sf = locator.createSessionFactory();
- sf.getServerLocator().setClientFailureCheckPeriod(ClientCrashTest.PING_PERIOD);
- sf.getServerLocator().setConnectionTTL(ClientCrashTest.CONNECTION_TTL);
ClientSession session = sf.createSession(false, true, true);
ClientProducer producer = session.createProducer(ClientCrashTest.QUEUE);
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/clientcrash/CrashClient2.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/clientcrash/CrashClient2.java 2010-07-21 10:32:52 UTC (rev 9445)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/clientcrash/CrashClient2.java 2010-07-21 11:40:32 UTC (rev 9446)
@@ -44,10 +44,10 @@
log.debug("args = " + Arrays.asList(args));
ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(NettyConnectorFactory.class.getName()));
+ locator.setClientFailureCheckPeriod(ClientCrashTest.PING_PERIOD);
+ locator.setConnectionTTL(ClientCrashTest.CONNECTION_TTL);
ClientSessionFactory sf = locator.createSessionFactory();
- sf.getServerLocator().setClientFailureCheckPeriod(ClientCrashTest.PING_PERIOD);
- sf.getServerLocator().setConnectionTTL(ClientCrashTest.CONNECTION_TTL);
ClientSession session = sf.createSession(true, true, 1000000);
ClientProducer producer = session.createProducer(ClientCrashTest.QUEUE);
13 years, 11 months
JBoss hornetq SVN: r9445 - in branches/2_2_0_HA_Improvements/examples/jms/clustered-topic: server1 and 1 other directory.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-07-21 06:32:52 -0400 (Wed, 21 Jul 2010)
New Revision: 9445
Modified:
branches/2_2_0_HA_Improvements/examples/jms/clustered-topic/server0/hornetq-configuration.xml
branches/2_2_0_HA_Improvements/examples/jms/clustered-topic/server1/hornetq-configuration.xml
Log:
HA refactoring
* fix clustered topic example's configuration
Modified: branches/2_2_0_HA_Improvements/examples/jms/clustered-topic/server0/hornetq-configuration.xml
===================================================================
--- branches/2_2_0_HA_Improvements/examples/jms/clustered-topic/server0/hornetq-configuration.xml 2010-07-21 10:17:33 UTC (rev 9444)
+++ branches/2_2_0_HA_Improvements/examples/jms/clustered-topic/server0/hornetq-configuration.xml 2010-07-21 10:32:52 UTC (rev 9445)
@@ -27,7 +27,7 @@
<group-address>231.7.7.7</group-address>
<group-port>9876</group-port>
<broadcast-period>100</broadcast-period>
- <connector-ref connector-name="netty-connector"/>
+ <connector-ref>netty-connector</connector-ref>
</broadcast-group>
</broadcast-groups>
@@ -42,6 +42,7 @@
<cluster-connections>
<cluster-connection name="my-cluster">
<address>jms</address>
+ <connector-ref>netty-connector</connector-ref>
<retry-interval>500</retry-interval>
<use-duplicate-detection>true</use-duplicate-detection>
<forward-when-no-consumers>true</forward-when-no-consumers>
Modified: branches/2_2_0_HA_Improvements/examples/jms/clustered-topic/server1/hornetq-configuration.xml
===================================================================
--- branches/2_2_0_HA_Improvements/examples/jms/clustered-topic/server1/hornetq-configuration.xml 2010-07-21 10:17:33 UTC (rev 9444)
+++ branches/2_2_0_HA_Improvements/examples/jms/clustered-topic/server1/hornetq-configuration.xml 2010-07-21 10:32:52 UTC (rev 9445)
@@ -25,7 +25,7 @@
<group-address>231.7.7.7</group-address>
<group-port>9876</group-port>
<broadcast-period>100</broadcast-period>
- <connector-ref connector-name="netty-connector"/>
+ <connector-ref>netty-connector</connector-ref>
</broadcast-group>
</broadcast-groups>
@@ -40,6 +40,7 @@
<cluster-connections>
<cluster-connection name="my-cluster">
<address>jms</address>
+ <connector-ref>netty-connector</connector-ref>
<retry-interval>500</retry-interval>
<use-duplicate-detection>true</use-duplicate-detection>
<forward-when-no-consumers>true</forward-when-no-consumers>
13 years, 11 months
JBoss hornetq SVN: r9444 - in branches/2_2_0_HA_Improvements/src/main/org/hornetq: core/client/impl and 5 other directories.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-07-21 06:17:33 -0400 (Wed, 21 Jul 2010)
New Revision: 9444
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/client/ServerLocator.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/cluster/DiscoveryEntry.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/cluster/impl/DiscoveryGroupImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterManager.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
Log:
HA refactoring
* fix cluster code using discovery group (example/jms/clustered-queue passes again)
* ServerLocator and ClusterManager no longer implement ClusterTopologyListener as they /notify/ the listeners
* DiscoveryEntry keeps track of the nodeID to be able to notify listeners of the node which has joined the cluster
* in ServerLocatorImpl.connectorsChanged(), notify the listeners that a node is UP
/!\ we must also keep track of previous entries to notify when a node is DOWN
* in HornetQServerImpl.initialisePart2(), starts the remoting service *before* the cluster manager. Otherwise the
bridges created when the cluster is formed will not be able to connect to the server and the cluster will not be formed
/!\ this is a modification with lots of implications... not sure about this one
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/client/ServerLocator.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/client/ServerLocator.java 2010-07-21 09:43:15 UTC (rev 9443)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/client/ServerLocator.java 2010-07-21 10:17:33 UTC (rev 9444)
@@ -15,6 +15,7 @@
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.Interceptor;
+import org.hornetq.api.core.Pair;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.loadbalance.ConnectionLoadBalancingPolicy;
@@ -660,5 +661,9 @@
void unregisterTopologyListener(ClusterTopologyListener listener);
+ void notifyNodeUP(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last);
+
+ void notifyNodeDown(String nodeID);
+
boolean isHA();
}
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2010-07-21 09:43:15 UTC (rev 9443)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2010-07-21 10:17:33 UTC (rev 9444)
@@ -1156,11 +1156,11 @@
if (topMessage.isExit())
{
- serverLocator.nodeDown(topMessage.getNodeID());
+ serverLocator.notifyNodeDown(topMessage.getNodeID());
}
else
{
- serverLocator.nodeUP(topMessage.getNodeID(), topMessage.getPair(), topMessage.isLast());
+ serverLocator.notifyNodeUP(topMessage.getNodeID(), topMessage.getPair(), topMessage.isLast());
}
}
}
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2010-07-21 09:43:15 UTC (rev 9443)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2010-07-21 10:17:33 UTC (rev 9444)
@@ -297,6 +297,16 @@
discoveryGroup.start();
}
+
+ if (initialConnectors != null)
+ {
+ System.out.println(">>>>>>>> Static initial connectors = " + Arrays.asList(initialConnectors));
+ for (int i = 0; i < initialConnectors.length; i++)
+ {
+ // FIXME and now what do I do?
+ TransportConfiguration connector = initialConnectors[i];
+ }
+ }
readOnly = true;
}
@@ -1031,7 +1041,7 @@
closed = true;
}
- public synchronized void nodeDown(final String nodeID)
+ public synchronized void notifyNodeDown(final String nodeID)
{
if (!ha)
{
@@ -1059,7 +1069,7 @@
}
}
- public synchronized void nodeUP(final String nodeID,
+ public synchronized void notifyNodeUP(final String nodeID,
final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
final boolean last)
{
@@ -1114,9 +1124,11 @@
for (DiscoveryEntry entry : newConnectors)
{
this.initialConnectors[count++] = entry.getConnector();
+
+ notifyNodeUP(entry.getNodeID(), new Pair<TransportConfiguration, TransportConfiguration>(entry.getConnector(), null), true);
}
- System.out.println(">>>>>>>> Initial connectors = " + Arrays.asList(initialConnectors));
+ System.out.println(">>>>>>>> Discovered initial connectors= " + Arrays.asList(initialConnectors));
}
public synchronized void factoryClosed(final ClientSessionFactory factory)
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java 2010-07-21 09:43:15 UTC (rev 9443)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java 2010-07-21 10:17:33 UTC (rev 9444)
@@ -15,7 +15,6 @@
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientSessionFactory;
-import org.hornetq.api.core.client.ClusterTopologyListener;
import org.hornetq.api.core.client.ServerLocator;
/**
@@ -25,7 +24,7 @@
*
*
*/
-public interface ServerLocatorInternal extends ServerLocator, ClusterTopologyListener
+public interface ServerLocatorInternal extends ServerLocator
{
void start() throws Exception;
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/cluster/DiscoveryEntry.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/cluster/DiscoveryEntry.java 2010-07-21 09:43:15 UTC (rev 9443)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/cluster/DiscoveryEntry.java 2010-07-21 10:17:33 UTC (rev 9444)
@@ -24,17 +24,23 @@
*/
public class DiscoveryEntry
{
+ private final String nodeID;
private final TransportConfiguration connector;
-
private final long lastUpdate;
- public DiscoveryEntry(final TransportConfiguration connector, final long lastUpdate)
+
+ public DiscoveryEntry(final String nodeID, final TransportConfiguration connector, final long lastUpdate)
{
+ this.nodeID = nodeID;
this.connector = connector;
-
this.lastUpdate = lastUpdate;
}
+ public String getNodeID()
+ {
+ return nodeID;
+ }
+
public TransportConfiguration getConnector()
{
return connector;
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/cluster/impl/DiscoveryGroupImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/cluster/impl/DiscoveryGroupImpl.java 2010-07-21 09:43:15 UTC (rev 9443)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/cluster/impl/DiscoveryGroupImpl.java 2010-07-21 10:17:33 UTC (rev 9444)
@@ -330,7 +330,7 @@
connector.decode(buffer);
- DiscoveryEntry entry = new DiscoveryEntry(connector, System.currentTimeMillis());
+ DiscoveryEntry entry = new DiscoveryEntry(originatingNodeID, connector, System.currentTimeMillis());
DiscoveryEntry oldVal = connectors.put(originatingNodeID, entry);
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterManager.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterManager.java 2010-07-21 09:43:15 UTC (rev 9443)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterManager.java 2010-07-21 10:17:33 UTC (rev 9444)
@@ -16,7 +16,9 @@
import java.util.Map;
import java.util.Set;
+import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClusterTopologyListener;
import org.hornetq.core.server.HornetQComponent;
@@ -28,7 +30,7 @@
* Created 18 Nov 2008 09:23:26
*
*/
-public interface ClusterManager extends HornetQComponent, ClusterTopologyListener
+public interface ClusterManager extends HornetQComponent
{
Map<String, Bridge> getBridges();
@@ -38,6 +40,10 @@
Set<BroadcastGroup> getBroadcastGroups();
+ void notifyNodeUP(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last);
+
+ void notifyNodeDown(String nodeID);
+
void registerTopologyListener(ClusterTopologyListener listener, boolean clusterConnection);
void unregisterTopologyListener(ClusterTopologyListener listener, boolean clusterConnection);
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2010-07-21 09:43:15 UTC (rev 9443)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2010-07-21 10:17:33 UTC (rev 9444)
@@ -490,7 +490,6 @@
try
{
csf = createSessionFactory();
-
// Session is pre-acknowledge
session = (ClientSessionInternal)csf.createSession(user, password, false, true, true, true, 1);
@@ -501,9 +500,7 @@
}
producer = session.createProducer();
-
session.addFailureListener(BridgeImpl.this);
-
session.setSendAcknowledgementHandler(BridgeImpl.this);
afterConnect();
@@ -511,7 +508,6 @@
active = true;
queue.addConsumer(BridgeImpl.this);
-
queue.deliverAsync();
BridgeImpl.log.info("Bridge " + name + " is connected to its destination");
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2010-07-21 09:43:15 UTC (rev 9443)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2010-07-21 10:17:33 UTC (rev 9444)
@@ -265,8 +265,6 @@
public synchronized void nodeDown(final String nodeID)
{
- server.getClusterManager().nodeDown(nodeID);
-
//Remove the flow record for that node
MessageFlowRecord record = records.remove(nodeID);
@@ -290,8 +288,6 @@
{
try
{
- server.getClusterManager().nodeUP(nodeID, connectorPair, false);
-
MessageFlowRecord record = records.get(nodeID);
if (record == null)
@@ -307,25 +303,23 @@
if (queueBinding != null)
{
queue = (Queue)queueBinding.getBindable();
-
- createNewRecord(nodeID, connectorPair.a, queueName, queue, true);
}
else
{
// Add binding in storage so the queue will get reloaded on startup and we can find it - it's never
// actually routed to at that address though
-
queue = server.createQueue(queueName, queueName, null, true, false);
-
- createNewRecord(nodeID, connectorPair.a, queueName, queue, true);
}
+
+ createNewRecord(nodeID, connectorPair.a, queueName, queue, true);
}
else
{
- if (!connectorPair.a.equals(record.getBridge().getForwardingConnection().getTransportConnection()))
- {
- // New live node - close it and recreate it - TODO - CAN THIS EVER HAPPEN?
- }
+ // FIXME apple and orange comparison. I don't understand it...
+ //if (!connectorPair.a.equals(record.getBridge().getForwardingConnection().getTransportConnection()))
+ // {
+ // // New live node - close it and recreate it - TODO - CAN THIS EVER HAPPEN?
+ //}
}
}
catch (Exception e)
@@ -340,6 +334,7 @@
final Queue queue,
final boolean start) throws Exception
{
+ System.out.println("ClusterConnectionImpl.createNewRecord() " + connector);
MessageFlowRecordImpl record = new MessageFlowRecordImpl(queue);
Bridge bridge = new ClusterConnectionBridge(serverLocator,
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2010-07-21 09:43:15 UTC (rev 9443)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2010-07-21 10:17:33 UTC (rev 9444)
@@ -325,7 +325,7 @@
}
- public synchronized void nodeDown(final String nodeID)
+ public synchronized void notifyNodeDown(final String nodeID)
{
topology.remove(nodeID);
@@ -335,7 +335,7 @@
}
}
- public synchronized void nodeUP(final String nodeID,
+ public synchronized void notifyNodeUP(final String nodeID,
final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
final boolean last)
{
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-07-21 09:43:15 UTC (rev 9443)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-07-21 10:17:33 UTC (rev 9444)
@@ -1579,14 +1579,19 @@
deploymentManager.start();
}
+ // We do this at the end - we don't want things like MDBs or other connections connecting to a backup server until
+ // it is activated
+
+ // FIXME -- I inverted the order to start the remoting service before the cluster manager.
+ // when the cluster manager is started, it will form a cluster -> other nodes will then create bridges
+ // to connect to this server. If the remoting service is not started before, the connection will fail
+ // and the cluster will not be formed...
+ remotingService.start();
+
clusterManager.start();
initialised = true;
- // We do this at the end - we don't want things like MDBs or other connections connecting to a backup server until
- // it is activated
-
- remotingService.start();
}
/**
13 years, 11 months