JBoss hornetq SVN: r10801 - trunk.
by do-not-reply@jboss.org
Author: borges
Date: 2011-06-14 10:05:34 -0400 (Tue, 14 Jun 2011)
New Revision: 10801
Modified:
trunk/pom.xml
Log:
Revert JUnit version and document upgrade problem
Modified: trunk/pom.xml
===================================================================
--- trunk/pom.xml 2011-06-14 13:39:07 UTC (rev 10800)
+++ trunk/pom.xml 2011-06-14 14:05:34 UTC (rev 10801)
@@ -301,7 +301,8 @@
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
- <version>4.8.2</version>
+ <!-- There are newer versions of JUnit but they break our tests -->
+ <version>3.8.2</version>
</dependency>
<!--needed to compile the jms tests -->
<dependency>
JBoss hornetq SVN: r10800 - trunk.
by do-not-reply@jboss.org
Author: borges
Date: 2011-06-14 09:39:07 -0400 (Tue, 14 Jun 2011)
New Revision: 10800
Modified:
trunk/pom.xml
Log:
FIX BUILD: Add declaration for jboss-transaction-api
Modified: trunk/pom.xml
===================================================================
--- trunk/pom.xml 2011-06-14 12:52:11 UTC (rev 10799)
+++ trunk/pom.xml 2011-06-14 13:39:07 UTC (rev 10800)
@@ -148,6 +148,11 @@
<artifactId>jboss-jms-api_1.1_spec</artifactId>
<version>1.0.0.Final</version>
</dependency>
+ <dependency>
+ <groupId>org.jboss.javaee</groupId>
+ <artifactId>jboss-transaction-api</artifactId>
+ <version>1.0.1.20070913080910</version>
+ </dependency>
<!-- needed to compile JavaEE examples-->
<dependency>
@@ -161,7 +166,7 @@
<version>1.0.0.Final</version>
</dependency>
<dependency>
- <groupId>org.jboss.javaee</groupId>
+ <groupId>org.jboss.javaee</groupId>
<artifactId>jboss-jms-api</artifactId>
<version>1.1.0.20070913080910</version>
</dependency>
@@ -247,6 +252,7 @@
<dependency>
<groupId>org.twitter4j</groupId>
<artifactId>twitter4j-core</artifactId>
+ <!-- there is a new version of this JAR but it breaks our usage of it -->
<version>2.1.2</version>
</dependency>
<!--needed to compile the spring support-->
JBoss hornetq SVN: r10799 - trunk.
by do-not-reply@jboss.org
Author: borges
Date: 2011-06-14 08:52:11 -0400 (Tue, 14 Jun 2011)
New Revision: 10799
Modified:
trunk/pom.xml
Log:
revert twitter4j-core version upgrade
Modified: trunk/pom.xml
===================================================================
--- trunk/pom.xml 2011-06-14 12:24:51 UTC (rev 10798)
+++ trunk/pom.xml 2011-06-14 12:52:11 UTC (rev 10799)
@@ -247,7 +247,7 @@
<dependency>
<groupId>org.twitter4j</groupId>
<artifactId>twitter4j-core</artifactId>
- <version>2.2.3</version>
+ <version>2.1.2</version>
</dependency>
<!--needed to compile the spring support-->
<dependency>
JBoss hornetq SVN: r10798 - trunk.
by do-not-reply@jboss.org
Author: borges
Date: 2011-06-14 08:24:51 -0400 (Tue, 14 Jun 2011)
New Revision: 10798
Modified:
trunk/pom.xml
Log:
FIX THE BUILD: Add jboss-jms-api to the dependencies, and add a version number.
Modified: trunk/pom.xml
===================================================================
--- trunk/pom.xml 2011-06-14 05:13:03 UTC (rev 10797)
+++ trunk/pom.xml 2011-06-14 12:24:51 UTC (rev 10798)
@@ -160,6 +160,11 @@
<artifactId>jboss-transaction-api_1.1_spec</artifactId>
<version>1.0.0.Final</version>
</dependency>
+ <dependency>
+ <groupId>org.jboss.javaee</groupId>
+ <artifactId>jboss-jms-api</artifactId>
+ <version>1.1.0.20070913080910</version>
+ </dependency>
<!--this specifically for the JMS Bridge-->
<dependency>
@@ -231,7 +236,7 @@
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
- <version>1.2.14</version>
+ <version>1.2.16</version>
</dependency>
<dependency>
<groupId>org.jboss.logging</groupId>
@@ -242,7 +247,7 @@
<dependency>
<groupId>org.twitter4j</groupId>
<artifactId>twitter4j-core</artifactId>
- <version>2.1.2</version>
+ <version>2.2.3</version>
</dependency>
<!--needed to compile the spring support-->
<dependency>
@@ -290,7 +295,7 @@
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
- <version>3.8.2</version>
+ <version>4.8.2</version>
</dependency>
<!--needed to compile the jms tests -->
<dependency>
JBoss hornetq SVN: r10797 - branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/cluster/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-06-14 01:13:03 -0400 (Tue, 14 Jun 2011)
New Revision: 10797
Modified:
branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
Log:
Adding TODO comment
Modified: branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-06-14 05:08:28 UTC (rev 10796)
+++ branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-06-14 05:13:03 UTC (rev 10797)
@@ -736,6 +736,11 @@
{
// We need to close the session outside of the lock,
// so any pending operation will be canceled right away
+
+ // TODO: Why closing the CSF will make a few clustering and failover tests to
+ // either deadlock or take forever on waiting
+ // locks
+ //csf.close();
csf = null;
if (session != null)
{
JBoss hornetq SVN: r10796 - in branches/Branch_2_2_EAP-cluster-cleanup: src/main/org/hornetq/core/server/cluster/impl and 5 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-06-14 01:08:28 -0400 (Tue, 14 Jun 2011)
New Revision: 10796
Modified:
branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/client/impl/DelegatingSession.java
branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/Branch_2_2_EAP-cluster-cleanup/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java
branches/Branch_2_2_EAP-cluster-cleanup/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
branches/Branch_2_2_EAP-cluster-cleanup/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
branches/Branch_2_2_EAP-cluster-cleanup/tests/src/org/hornetq/tests/util/UnitTestCase.java
Log:
Cluster cleanup
Modified: branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2011-06-13 18:30:39 UTC (rev 10795)
+++ branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2011-06-14 05:08:28 UTC (rev 10796)
@@ -406,10 +406,10 @@
public void causeExit()
{
- exitLoop = true;
synchronized (waitLock)
{
- waitLock.notify();
+ exitLoop = true;
+ waitLock.notifyAll();
}
}
@@ -420,7 +420,7 @@
return;
}
- // we need to stopthe factory from connecting if it is in the middle aof trying to failover before we get the lock
+ // we need to stop the factory from connecting if it is in the middle of trying to failover before we get the lock
causeExit();
synchronized (createSessionLock)
{
@@ -764,6 +764,8 @@
ChannelHandler handler = new ClientSessionPacketHandler(session, sessionChannel);
sessionChannel.setHandler(handler);
+
+ log.info("Creating session " + session);
return new DelegatingSession(session);
}
@@ -889,13 +891,8 @@
synchronized (waitLock)
{
- while (true)
+ while (!exitLoop)
{
- if (exitLoop)
- {
- return;
- }
-
if (log.isDebugEnabled())
{
log.debug("Trying reconnection attempt " + count);
@@ -927,12 +924,12 @@
try
{
- waitLock.wait(interval);
+ waitLock.wait(interval);
}
catch (InterruptedException ignore)
{
}
-
+
// Exponential back-off
long newInterval = (long)(interval * retryIntervalMultiplier);
@@ -1283,6 +1280,21 @@
if (type == PacketImpl.DISCONNECT)
{
+// ClientSessionFactoryImpl.this.closed = true;
+//
+// for (ClientSessionInternal session : ClientSessionFactoryImpl.this.sessions)
+// {
+// try
+// {
+// log.info("cleanup session on Factory " + session);
+// session.cleanUp(false);
+// }
+// catch (Exception e)
+// {
+// log.warn("Error cleaning up session " + session, e);
+// }
+// }
+//
final DisconnectMessage msg = (DisconnectMessage)packet;
closeExecutor.execute(new Runnable()
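Editorial sketch (not part of the commit): the causeExit() and reconnect-loop hunks above move the exit flag under waitLock, switch to notifyAll(), and fold the flag check into the loop condition. The class below is a minimal, self-contained illustration of that guarded-wait pattern. RetryLoopSketch, retryUntilExit and attempts are hypothetical names; only waitLock, exitLoop and causeExit mirror identifiers from the diff, and this reading of the intent is an assumption.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for the reconnect loop; this is not HornetQ code. */
class RetryLoopSketch {
    private final Object waitLock = new Object();
    private boolean exitLoop = false; // guarded by waitLock
    private final AtomicInteger attempts = new AtomicInteger();

    /** Sets the flag and wakes every waiter while holding the lock. */
    void causeExit() {
        synchronized (waitLock) {
            exitLoop = true;
            waitLock.notifyAll();
        }
    }

    /** Counts attempts until causeExit() is called, waiting on the lock in between. */
    void retryUntilExit(long intervalMillis) {
        synchronized (waitLock) {
            while (!exitLoop) {
                attempts.incrementAndGet(); // a real loop would try to connect here
                try {
                    // wait() releases waitLock, so causeExit() can get in
                    waitLock.wait(intervalMillis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }

    int attempts() {
        return attempts.get();
    }

    public static void main(String[] args) throws InterruptedException {
        RetryLoopSketch loop = new RetryLoopSketch();
        Thread worker = new Thread(() -> loop.retryUntilExit(60_000L));
        worker.start();
        loop.causeExit();
        worker.join(5_000);
        System.out.println("worker alive: " + worker.isAlive());
    }
}
```

Because the flag is only read and written under the same monitor, the waiting thread re-checks it on every wake-up, including spurious ones, and no separate volatile is needed in this sketch.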
Modified: branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2011-06-13 18:30:39 UTC (rev 10795)
+++ branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2011-06-14 05:08:28 UTC (rev 10796)
@@ -843,8 +843,11 @@
{
if (closed)
{
+ log.info("Session was already closed, giving up now, this=" + this);
return;
}
+
+ log.info("Calling close on session " + this);
try
{
@@ -1607,6 +1610,10 @@
return remotingConnection;
}
+ /* (non-Javadoc)
+ * @see java.lang.Object#toString()
+ */
+ @Override
public String toString()
{
StringBuffer buffer = new StringBuffer();
@@ -1614,7 +1621,8 @@
{
buffer.append(entry.getKey() + "=" + entry.getValue() + ",");
}
- return "ClientSessionImpl::(" + buffer.toString() + ")";
+
+ return "ClientSessionImpl [name=" + name + ", username=" + username + ", closed=" + closed + " metaData=(" + buffer + ")]@" + Integer.toHexString(hashCode()) ;
}
// Protected
@@ -1776,13 +1784,25 @@
private void doCleanup(boolean failingOver)
{
remotingConnection.removeFailureListener(this);
+
+ log.info("calling cleanup on " + this);
synchronized (this)
{
closed = true;
channel.close();
+
+
+ log.info("Calling unblock on " + this);
+
+ // if the server is sending a disconnect
+ // any pending blocked operation could hang without this
+ channel.returnBlocking();
}
+
+
+ log.info("Cleaned up " + this);
sessionFactory.removeSession(this, failingOver);
}
Modified: branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/client/impl/DelegatingSession.java
===================================================================
--- branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/client/impl/DelegatingSession.java 2011-06-13 18:30:39 UTC (rev 10795)
+++ branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/client/impl/DelegatingSession.java 2011-06-14 05:08:28 UTC (rev 10796)
@@ -578,4 +578,10 @@
{
return session.isCompressLargeMessages();
}
+
+ public String toString()
+ {
+ return "DelegatingSession [session=" + session + "]";
+ }
+
}
Modified: branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-06-13 18:30:39 UTC (rev 10795)
+++ branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-06-14 05:08:28 UTC (rev 10796)
@@ -242,24 +242,33 @@
public void stop() throws Exception
{
- if (started)
- {
- executor.execute(new Runnable()
- {
- public void run()
- {
- // We need to stop the csf here otherwise the stop runnable never runs since the createobjectsrunnable is
- // trying to connect to the target
- // server which isn't up in an infinite loop
- if (csf != null)
- {
- //csf.close();
- csf = null;
- }
- }
- });
- }
+ // TODO: Remove this during merge:
+ // If we close the csf at the same time as the bridge is calling close and doing proper cancellations (not just resetting as we used to do),
+ // we could get either deadlocks or very slow shutdowns in the testsuite.
+ // The solution I could find so far was to just leave the csf on
+
+ // TODO: Need to find a better way to close the CSF
+
+
+// if (started)
+// {
+// executor.execute(new Runnable()
+// {
+// public void run()
+// {
+// // We need to stop the csf here otherwise the stop runnable never runs since the createobjectsrunnable is
+// // trying to connect to the target
+// // server which isn't up in an infinite loop
+// if (csf != null)
+// {
+// csf.close();
+// csf = null;
+// }
+// }
+// });
+// }
+//
log.info("Bridge " + this.name + " being stopped");
stopping = true;
@@ -590,7 +599,18 @@
csf = createSessionFactory();
// Session is pre-acknowledge
session = (ClientSessionInternal)csf.createSession(user, password, false, true, true, true, 1);
+
+ try
+ {
+ session.addMetaData("Session-for-bridge", name.toString());
+ session.addMetaData("nodeUUID", nodeUUID.toString());
+ }
+ catch (Throwable dontCare)
+ {
+ // addMetaData here is just for debug purposes
+ }
+
if (forwardingAddress != null)
{
BindingQuery query = null;
@@ -661,6 +681,8 @@
}
catch (HornetQException e)
{
+ e.printStackTrace();
+ System.out.println("ex " + e);
if (csf != null)
{
csf.close();
@@ -707,14 +729,18 @@
private class StopRunnable implements Runnable
{
+ Exception created = new Exception ("Stop bridge called at for session = " + session);
public void run()
{
try
{
// We need to close the session outside of the lock,
// so any pending operation will be canceled right away
+ csf = null;
if (session != null)
{
+ log.info("Stopping bridge called at ", created);
+ log.info("Cleaning up session " + session);
session.close();
session.removeFailureListener(BridgeImpl.this);
}
Modified: branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
===================================================================
--- branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java 2011-06-13 18:30:39 UTC (rev 10795)
+++ branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java 2011-06-14 05:08:28 UTC (rev 10796)
@@ -149,7 +149,6 @@
{
if (flowRecord != null)
{
- // TODO: can I really remove this? nope
flowRecord.reset();
if (notifConsumer != null)
Modified: branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-06-13 18:30:39 UTC (rev 10795)
+++ branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-06-14 05:08:28 UTC (rev 10796)
@@ -63,13 +63,11 @@
import org.hornetq.core.paging.impl.PagingManagerImpl;
import org.hornetq.core.paging.impl.PagingStoreFactoryNIO;
import org.hornetq.core.persistence.GroupingInfo;
-import org.hornetq.core.persistence.OperationContext;
import org.hornetq.core.persistence.QueueBindingInfo;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.persistence.config.PersistedAddressSetting;
import org.hornetq.core.persistence.config.PersistedRoles;
import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
-import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
import org.hornetq.core.persistence.impl.nullpm.NullStorageManager;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.DuplicateIDCache;
@@ -159,6 +157,8 @@
private final MBeanServer mbeanServer;
private volatile boolean started;
+
+ private volatile boolean stopped;
private volatile SecurityStore securityStore;
@@ -353,6 +353,11 @@
nodeManager.startLiveNode();
+ if (stopped)
+ {
+ return;
+ }
+
initialisePart2();
log.info("Server is now live");
@@ -379,6 +384,8 @@
private class SharedStoreBackupActivation implements Activation
{
+
+ volatile boolean closed = false;
public void run()
{
try
@@ -397,6 +404,11 @@
configuration.setBackup(false);
+ if (stopped)
+ {
+ return;
+ }
+
initialisePart2();
clusterManager.activate();
@@ -480,6 +492,7 @@
backupActivationThread.interrupt();
+ // TODO: do we really need this?
Thread.sleep(1000);
}
@@ -536,6 +549,8 @@
public synchronized void start() throws Exception
{
+ stopped = false;
+
initialiseLogging();
checkJournalDirectory();
@@ -618,6 +633,7 @@
public void stop() throws Exception
{
+ stopped = true;
stop(configuration.isFailoverOnServerShutdown());
}
@@ -1463,7 +1479,13 @@
private void initialisePart2() throws Exception
{
// Load the journal and populate queues, transactions and caches in memory
-
+
+
+ if (stopped)
+ {
+ return;
+ }
+
pagingManager.reloadStores();
JournalLoadInformation[] journalInfo = loadJournals();
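Editorial sketch (not part of the commit): the HornetQServerImpl hunks above add a volatile stopped flag and check it after the blocking activation step and again at the top of initialisePart2(). The example below shows the same idea in isolation. ActivationSketch, liveLock, releaseLiveLock and isInitialised are hypothetical names, and the CountDownLatch merely stands in for whatever the activation thread blocks on; the real server interrupts the thread instead.

```java
import java.util.concurrent.CountDownLatch;

/** Hypothetical illustration of re-checking a stop flag after a blocking wait. */
class ActivationSketch {
    private volatile boolean stopped;      // written by stop(), read by the activation thread
    private volatile boolean initialised;
    private final CountDownLatch liveLock = new CountDownLatch(1); // one-shot stand-in

    /** Requests shutdown and unblocks a pending activate() call. */
    void stop() {
        stopped = true;
        liveLock.countDown();
    }

    /** Simulates the live lock becoming available. */
    void releaseLiveLock() {
        liveLock.countDown();
    }

    /** Blocks until released, then initialises only if stop() was not requested meanwhile. */
    void activate() throws InterruptedException {
        liveLock.await();
        if (stopped) {
            return; // the server was stopped while we were waiting
        }
        initialised = true;
    }

    boolean isInitialised() {
        return initialised;
    }

    public static void main(String[] args) throws InterruptedException {
        ActivationSketch server = new ActivationSketch();
        server.stop();
        server.activate();
        System.out.println("initialised after stop: " + server.isInitialised());
    }
}
```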
Modified: branches/Branch_2_2_EAP-cluster-cleanup/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java
===================================================================
--- branches/Branch_2_2_EAP-cluster-cleanup/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java 2011-06-13 18:30:39 UTC (rev 10795)
+++ branches/Branch_2_2_EAP-cluster-cleanup/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java 2011-06-14 05:08:28 UTC (rev 10796)
@@ -21,10 +21,15 @@
import junit.framework.Assert;
import org.hornetq.api.core.HornetQException;
-import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.*;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.config.BridgeConfiguration;
import org.hornetq.core.config.CoreQueueConfiguration;
import org.hornetq.core.config.impl.ConfigurationImpl;
Modified: branches/Branch_2_2_EAP-cluster-cleanup/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- branches/Branch_2_2_EAP-cluster-cleanup/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-06-13 18:30:39 UTC (rev 10795)
+++ branches/Branch_2_2_EAP-cluster-cleanup/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-06-14 05:08:28 UTC (rev 10796)
@@ -1812,6 +1812,7 @@
* we need to wait a lil while between server start up to allow the server to communicate in some order.
* This is to avoid split brain on startup
* */
+ // TODO: Do we really need this?
Thread.sleep(500);
}
for (int node : nodes)
Modified: branches/Branch_2_2_EAP-cluster-cleanup/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
===================================================================
--- branches/Branch_2_2_EAP-cluster-cleanup/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2011-06-13 18:30:39 UTC (rev 10795)
+++ branches/Branch_2_2_EAP-cluster-cleanup/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2011-06-14 05:08:28 UTC (rev 10796)
@@ -140,6 +140,7 @@
staticConnectors, false);
backupConfig.getClusterConfigurations().add(cccLive);
backupServer = createBackupServer();
+ backupServer.getServer().setIdentity("bkpIdentityServer");
liveConfig = super.createDefaultConfig();
liveConfig.getAcceptorConfigurations().clear();
Modified: branches/Branch_2_2_EAP-cluster-cleanup/tests/src/org/hornetq/tests/util/UnitTestCase.java
===================================================================
--- branches/Branch_2_2_EAP-cluster-cleanup/tests/src/org/hornetq/tests/util/UnitTestCase.java 2011-06-13 18:30:39 UTC (rev 10795)
+++ branches/Branch_2_2_EAP-cluster-cleanup/tests/src/org/hornetq/tests/util/UnitTestCase.java 2011-06-14 05:08:28 UTC (rev 10796)
@@ -971,7 +971,15 @@
}
// We shutdown the global pools to give a better isolation between tests
- ServerLocatorImpl.clearThreadPools();
+ try
+ {
+ ServerLocatorImpl.clearThreadPools();
+ }
+ catch (Throwable e)
+ {
+ log.info(threadDump(e.getMessage()));
+ throw new RuntimeException (e.getMessage(), e);
+ }
}
protected byte[] autoEncode(final Object... args)
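Editorial sketch (not part of the commit): the UnitTestCase hunk above logs threadDump(e.getMessage()) when clearing the thread pools fails, but the helper's body is not shown in this message. The version below is only a guess at what such a helper could look like, built on the standard Thread.getAllStackTraces() call; the class name and output layout are assumptions.

```java
import java.util.Map;

/** Hypothetical thread-dump helper; the real UnitTestCase.threadDump may differ. */
class ThreadDumpSketch {
    /** Returns the given header followed by the name, state and stack of every live thread. */
    static String threadDump(String header) {
        StringBuilder out = new StringBuilder();
        out.append("*** Thread dump: ").append(header).append('\n');
        for (Map.Entry<Thread, StackTraceElement[]> entry : Thread.getAllStackTraces().entrySet()) {
            Thread thread = entry.getKey();
            out.append('"').append(thread.getName()).append('"')
               .append(" state=").append(thread.getState()).append('\n');
            for (StackTraceElement frame : entry.getValue()) {
                out.append("    at ").append(frame).append('\n');
            }
        }
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(threadDump("example"));
    }
}
```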
JBoss hornetq SVN: r10795 - in branches/Branch_2_2_EAP-cluster-cleanup: tests/src/org/hornetq/tests/integration/cluster/restart and 1 other directory.
by do-not-reply@jboss.org
Author: ataylor
Date: 2011-06-13 14:30:39 -0400 (Mon, 13 Jun 2011)
New Revision: 10795
Modified:
branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
branches/Branch_2_2_EAP-cluster-cleanup/tests/src/org/hornetq/tests/integration/cluster/restart/ClusterRestartTest.java
Log:
cluster test fixes
Modified: branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-06-13 12:02:50 UTC (rev 10794)
+++ branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-06-13 18:30:39 UTC (rev 10795)
@@ -400,7 +400,7 @@
{
return HandleStatus.NO_MATCH;
}
-
+
synchronized (this)
{
if (!active)
@@ -518,37 +518,44 @@
active = false;
}
- try
- {
+
if (!session.getConnection().isDestroyed())
{
if (beforeReconnect)
{
- synchronized (this)
+ try {
+ synchronized (this)
+ {
+ log.debug(name + "::Connection is destroyed, active = false now");
+ }
+
+ cancelRefs();
+ }
+ catch (Exception e)
{
- log.debug(name + "::Connection is destroyed, active = false now");
+ BridgeImpl.log.error("Failed to cancel refs", e);
}
-
- cancelRefs();
}
else
{
- afterConnect();
+ try
+ {
+ afterConnect();
- log.debug(name + "::After reconnect, setting active=true now");
- active = true;
+ log.debug(name + "::After reconnect, setting active=true now");
+ active = true;
- if (queue != null)
+ if (queue != null)
+ {
+ queue.deliverAsync();
+ }
+ }
+ catch (Exception e)
{
- queue.deliverAsync();
+ BridgeImpl.log.error("Failed to call after connect", e);
}
}
}
- }
- catch (Exception e)
- {
- BridgeImpl.log.error("Failed to cancel refs", e);
- }
}
/* Hook for doing extra stuff after connection */
@@ -709,6 +716,7 @@
if (session != null)
{
session.close();
+ session.removeFailureListener(BridgeImpl.this);
}
synchronized (BridgeImpl.this)
Modified: branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
===================================================================
--- branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java 2011-06-13 12:02:50 UTC (rev 10794)
+++ branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java 2011-06-13 18:30:39 UTC (rev 10795)
@@ -149,8 +149,8 @@
{
if (flowRecord != null)
{
- // TODO: can I really remove this?
- // flowRecord.reset();
+ // TODO: can I really remove this? nope
+ flowRecord.reset();
if (notifConsumer != null)
{
Modified: branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-06-13 12:02:50 UTC (rev 10794)
+++ branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-06-13 18:30:39 UTC (rev 10795)
@@ -544,7 +544,7 @@
}
else
{
- log.info("Reattaching nodeID=" + nodeID);
+ log.info("Reattaching nodeID=" + nodeID);
if (record.isClosed())
{
record.resume();
@@ -787,7 +787,6 @@
isClosed = false;
this.bridge = createBridge(this);
bridge.start();
- bridge.activate();
}
public boolean isClosed()
Modified: branches/Branch_2_2_EAP-cluster-cleanup/tests/src/org/hornetq/tests/integration/cluster/restart/ClusterRestartTest.java
===================================================================
--- branches/Branch_2_2_EAP-cluster-cleanup/tests/src/org/hornetq/tests/integration/cluster/restart/ClusterRestartTest.java 2011-06-13 12:02:50 UTC (rev 10794)
+++ branches/Branch_2_2_EAP-cluster-cleanup/tests/src/org/hornetq/tests/integration/cluster/restart/ClusterRestartTest.java 2011-06-13 18:30:39 UTC (rev 10795)
@@ -69,10 +69,11 @@
printBindings(2);
- sendInRange(1, "queues.testaddress", 0, 10, false, null);
+ sendInRange(1, "queues.testaddress", 0, 10, true, null);
System.out.println("stopping******************************************************");
stopServers(0);
+ Thread.sleep(2000);
System.out.println("stopped******************************************************");
startServers(0);
@@ -87,7 +88,7 @@
sendInRange(1, "queues.testaddress", 10, 20, false, null);
- verifyReceiveAllInRange(10, 20, 0);
+ verifyReceiveAllInRange(0, 20, 0);
System.out.println("*****************************************************************************");
}
finally
JBoss hornetq SVN: r10794 - in branches/Branch_2_2_EAP_export_tool: src/main/org/hornetq/core/persistence/tools/xmlmodel and 3 other directories.
by do-not-reply@jboss.org
Author: jicken
Date: 2011-06-13 08:02:50 -0400 (Mon, 13 Jun 2011)
New Revision: 10794
Added:
branches/Branch_2_2_EAP_export_tool/tests/src/org/hornetq/tests/integration/persistence/ExportDataJmsTest.java
Modified:
branches/Branch_2_2_EAP_export_tool/src/main/org/hornetq/core/persistence/tools/ImportData.java
branches/Branch_2_2_EAP_export_tool/src/main/org/hornetq/core/persistence/tools/ManageDataTool.java
branches/Branch_2_2_EAP_export_tool/src/main/org/hornetq/core/persistence/tools/xmlmodel/MessageType.java
branches/Branch_2_2_EAP_export_tool/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
branches/Branch_2_2_EAP_export_tool/tests/src/org/hornetq/tests/integration/persistence/ExportDataTest.java
branches/Branch_2_2_EAP_export_tool/tests/src/org/hornetq/tests/util/ServiceTestBase.java
Log:
added secured server handling
added a JMS test to check for correct UserId
Modified: branches/Branch_2_2_EAP_export_tool/src/main/org/hornetq/core/persistence/tools/ImportData.java
===================================================================
--- branches/Branch_2_2_EAP_export_tool/src/main/org/hornetq/core/persistence/tools/ImportData.java 2011-06-13 02:59:47 UTC (rev 10793)
+++ branches/Branch_2_2_EAP_export_tool/src/main/org/hornetq/core/persistence/tools/ImportData.java 2011-06-13 12:02:50 UTC (rev 10794)
@@ -25,13 +25,21 @@
public static void main(String args[])
{
- if (args.length != 3) {
- System.out.println("Usage Export: java org.hornetq.core.persistence.tools.ImportData <import-file-name> <hq-config-file> <connector-name>");
+ if (args.length != 3 && args.length != 5) {
+ System.out.println("Usage Export: java org.hornetq.core.persistence.tools.ImportData <import-file-name> " +
+ "<hq-config-file> <connector-name> [user] [password]");
System.exit(-1);
}
try
{
- ManageDataTool.importMessages(args[0], args[1], args[2]);
+ String user = null;
+ String password = null;
+
+ if (args.length == 5) {
+ user = args[3];
+ password = args[4];
+ }
+ ManageDataTool.importMessages(args[0], args[1], args[2], user, password);
}
catch (Exception e)
{
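Editorial sketch (not part of the commit): the ImportData hunk above accepts either three arguments or five, with the last two being an optional user and password. The class below isolates that parsing rule so it can be checked on its own. ImportArgsSketch and its field names are hypothetical, and throwing IllegalArgumentException replaces the System.exit(-1) used in the diff purely to keep the sketch testable.

```java
/** Hypothetical holder for ImportData-style arguments; not the committed code. */
class ImportArgsSketch {
    final String importFile;
    final String configFile;
    final String connectorName;
    final String user;     // null when no credentials were given
    final String password; // null when no credentials were given

    private ImportArgsSketch(String importFile, String configFile, String connectorName,
                             String user, String password) {
        this.importFile = importFile;
        this.configFile = configFile;
        this.connectorName = connectorName;
        this.user = user;
        this.password = password;
    }

    /** Accepts exactly 3 arguments, or 5 when user and password are supplied together. */
    static ImportArgsSketch parse(String[] args) {
        if (args.length != 3 && args.length != 5) {
            throw new IllegalArgumentException(
                "Usage: <import-file-name> <hq-config-file> <connector-name> [user] [password]");
        }
        String user = null;
        String password = null;
        if (args.length == 5) {
            user = args[3];
            password = args[4];
        }
        return new ImportArgsSketch(args[0], args[1], args[2], user, password);
    }

    public static void main(String[] args) {
        ImportArgsSketch parsed = parse(new String[] {"export.xml", "hornetq-configuration.xml", "netty"});
        System.out.println("user given: " + (parsed.user != null));
    }
}
```

Rejecting a four-argument call keeps a user name from being passed without its password.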
Modified: branches/Branch_2_2_EAP_export_tool/src/main/org/hornetq/core/persistence/tools/ManageDataTool.java
===================================================================
--- branches/Branch_2_2_EAP_export_tool/src/main/org/hornetq/core/persistence/tools/ManageDataTool.java 2011-06-13 02:59:47 UTC (rev 10793)
+++ branches/Branch_2_2_EAP_export_tool/src/main/org/hornetq/core/persistence/tools/ManageDataTool.java 2011-06-13 12:02:50 UTC (rev 10794)
@@ -71,20 +71,19 @@
import org.hornetq.core.server.ServerMessage;
import org.hornetq.utils.Base64;
import org.hornetq.utils.ExecutorFactory;
+import org.hornetq.utils.UUID;
import org.xml.sax.InputSource;
import org.xml.sax.XMLReader;
/**
* @author <a href="mailto:torben@redhat.com">Torben Jaeger</a>
*/
-public class ManageDataTool extends JournalStorageManager
-{
+public class ManageDataTool extends JournalStorageManager {
private static final Logger log = Logger.getLogger(ManageDataTool.class);
private static long messageCounter = 0;
- private ManageDataTool(final Configuration config, final ExecutorFactory executorFactory)
- {
+ private ManageDataTool(final Configuration config, final ExecutorFactory executorFactory) {
super(config, executorFactory);
}
@@ -113,15 +112,13 @@
* @param messagesDir directory with the messages journal
* @throws Exception if something goes wrong
*/
- public static void exportMessages(final String bindingsDir, final String messagesDir, final OutputStream out) throws Exception
- {
+ public static void exportMessages(final String bindingsDir, final String messagesDir, final OutputStream out) throws Exception {
// Will use only default values. The load function should adapt to anything different
ConfigurationImpl defaultValues = new ConfigurationImpl();
defaultValues.setJournalDirectory(messagesDir);
- if (log.isInfoEnabled())
- {
+ if (log.isInfoEnabled()) {
log.info("Generating backup of original journal ...");
}
@@ -147,34 +144,28 @@
HornetQExport hqJournalExport = new HornetQExport();
- if (log.isInfoEnabled())
- {
+ if (log.isInfoEnabled()) {
log.info("Exporting bindings ...");
}
hqJournalExport.setQueues(exportBindings(bindingsJournal));
- if (log.isInfoEnabled())
- {
+ if (log.isInfoEnabled()) {
log.info("Exporting journal ...");
}
hqJournalExport.setMessages(exportMessages(messagesJournal));
- if (log.isInfoEnabled())
- {
+ if (log.isInfoEnabled()) {
log.info("Writing export file ...");
}
writeExportToFile(hqJournalExport, out);
- if (log.isInfoEnabled())
- {
+ if (log.isInfoEnabled()) {
log.info("Export done!");
}
}
- private static BindingsJournalType exportBindings(JournalImpl original) throws Exception
- {
- try
- {
+ private static BindingsJournalType exportBindings(JournalImpl original) throws Exception {
+ try {
final List<RecordInfo> records = new LinkedList<RecordInfo>();
final Set<Long> recordsToDelete = new HashSet<Long>();
@@ -182,9 +173,7 @@
return buildXmlBindings(records);
- }
- finally
- {
+ } finally {
original.stop();
}
}
@@ -196,11 +185,9 @@
* @return an XML representation of the journal
* @throws Exception if the journal is corrupt
*/
- private static MessagesExportType exportMessages(JournalImpl original) throws Exception
- {
+ private static MessagesExportType exportMessages(JournalImpl original) throws Exception {
- try
- {
+ try {
final List<RecordInfo> records = new LinkedList<RecordInfo>();
final Set<Long> recordsToDelete = new HashSet<Long>();
@@ -212,61 +199,48 @@
return journal;
- }
- finally
- {
+ } finally {
original.stop();
}
}
- private static void loadData(Journal original, final List<RecordInfo> records, final Set<Long> recordsToDelete) throws Exception
- {
+ private static void loadData(Journal original, final List<RecordInfo> records, final Set<Long> recordsToDelete) throws Exception {
original.start();
- original.load(new LoaderCallback()
- {
- private void logNotExportedRecord(long id, String reason)
- {
- if (log.isDebugEnabled())
- {
+ original.load(new LoaderCallback() {
+ private void logNotExportedRecord(long id, String reason) {
+ if (log.isDebugEnabled()) {
log.debug("Record " + id + " will not be exported due to " + reason + "!");
}
}
- private void addToNotExportedRecords(List<RecordInfo> list, String reason)
- {
- for (RecordInfo record : list)
- {
+ private void addToNotExportedRecords(List<RecordInfo> list, String reason) {
+ for (RecordInfo record : list) {
logNotExportedRecord(record.id, reason);
recordsToDelete.add(record.id);
}
}
- public void addPreparedTransaction(PreparedTransactionInfo preparedTransaction)
- {
+ public void addPreparedTransaction(PreparedTransactionInfo preparedTransaction) {
addToNotExportedRecords(preparedTransaction.records, "prepared TX");
addToNotExportedRecords(preparedTransaction.recordsToDelete, "prepared TX");
}
- public void addRecord(RecordInfo info)
- {
+ public void addRecord(RecordInfo info) {
records.add(info);
}
- public void deleteRecord(long id)
- {
+ public void deleteRecord(long id) {
logNotExportedRecord(id, "DEL");
recordsToDelete.add(id);
}
- public void updateRecord(RecordInfo info)
- {
+ public void updateRecord(RecordInfo info) {
records.add(info);
}
- public void failedTransaction(long transactionID, List<RecordInfo> records, List<RecordInfo> _recordsToDelete)
- {
+ public void failedTransaction(long transactionID, List<RecordInfo> records, List<RecordInfo> _recordsToDelete) {
addToNotExportedRecords(records, "failed TX");
addToNotExportedRecords(_recordsToDelete, "failed TX");
}
@@ -279,11 +253,10 @@
*
* @param hqJournalExport the root JAXB context
* @throws java.io.FileNotFoundException if the export file could not be created
- * @throws javax.xml.bind.JAXBException if an error occurs during marshalling
+ * @throws javax.xml.bind.JAXBException if an error occurs during marshalling
*/
private static void writeExportToFile(HornetQExport hqJournalExport, OutputStream os) throws JAXBException,
- FileNotFoundException
- {
+ FileNotFoundException {
// todo: http://jaxb.java.net/guide/Different_ways_of_marshalling.html#Marshalling...
@@ -302,23 +275,19 @@
* @param recordsToDelete records which are @see DeleteRecord
* @return @see BindingsJournalType or @see MessagesJournalType
*/
- private static MessagesExportType buildXmlMessages(List<RecordInfo> records, Set<Long> recordsToDelete)
- {
+ private static MessagesExportType buildXmlMessages(List<RecordInfo> records, Set<Long> recordsToDelete) {
MessagesExportType journalType = new MessagesExportType();
// Export Journal
- for (RecordInfo info : records)
- {
+ for (RecordInfo info : records) {
- if (recordsToDelete.contains(info.id))
- {
+ if (recordsToDelete.contains(info.id)) {
// deleted records are not exported
continue;
}
- switch (info.getUserRecordType())
- {
+ switch (info.getUserRecordType()) {
case JournalStorageManager.ADD_MESSAGE:
handleAddMessage(journalType, info);
@@ -333,12 +302,11 @@
break;
default:
- if (log.isDebugEnabled())
- {
+ if (log.isDebugEnabled()) {
log.debug(new StringBuilder().append("Record ")
- .append(info.id)
- .append(" is not exported!")
- .toString());
+ .append(info.id)
+ .append(" is not exported!")
+ .toString());
}
break;
}
@@ -347,9 +315,8 @@
return journalType;
}
- private static void handleAddMessage(MessagesExportType journalType, RecordInfo info)
- {
- final Message msg = ((MessageDescribe)JournalStorageManager.newObjectEncoding(info)).msg;
+ private static void handleAddMessage(MessagesExportType journalType, RecordInfo info) {
+ final Message msg = ((MessageDescribe) JournalStorageManager.newObjectEncoding(info)).msg;
MessageType messageType = new MessageType((ServerMessage) msg);
final HornetQBuffer bodyBuffer = msg.getBodyBuffer();
@@ -359,14 +326,12 @@
journalType.getMessage().add(messageType);
}
- private static void handleAddRef(MessagesExportType journalType, RecordInfo info)
- {
- JournalStorageManager.ReferenceDescribe ref = (JournalStorageManager.ReferenceDescribe)JournalStorageManager.newObjectEncoding(info);
+ private static void handleAddRef(MessagesExportType journalType, RecordInfo info) {
+ JournalStorageManager.ReferenceDescribe ref = (JournalStorageManager.ReferenceDescribe) JournalStorageManager.newObjectEncoding(info);
MessageType message = getMessage(journalType, info);
- if (message == null)
- {
+ if (message == null) {
throw new IllegalStateException("Journal is corrupt: AddRef without Add!");
}
@@ -377,21 +342,18 @@
}
- private static MessageType getMessage(MessagesExportType journalType, RecordInfo info)
- {
+ private static MessageType getMessage(MessagesExportType journalType, RecordInfo info) {
List<MessageType> messages = journalType.getMessage();
final int index = messages.indexOf(new MessageType(info.id));
return index == -1 ? null : messages.get(index);
}
- private static void handleAckRef(MessagesExportType journalType, RecordInfo info)
- {
- JournalStorageManager.AckDescribe ack = (JournalStorageManager.AckDescribe)JournalStorageManager.newObjectEncoding(info);
+ private static void handleAckRef(MessagesExportType journalType, RecordInfo info) {
+ JournalStorageManager.AckDescribe ack = (JournalStorageManager.AckDescribe) JournalStorageManager.newObjectEncoding(info);
MessageType message = getMessage(journalType, info);
- if (message == null)
- {
+ if (message == null) {
throw new IllegalStateException("Journal is corrupt: Ack without Add!");
}
@@ -402,29 +364,25 @@
}
- private static BindingsJournalType buildXmlBindings(List<RecordInfo> records)
- {
+ private static BindingsJournalType buildXmlBindings(List<RecordInfo> records) {
BindingsJournalType journalType = new BindingsJournalType();
// Export Journal
- for (RecordInfo info : records)
- {
+ for (RecordInfo info : records) {
- switch (info.getUserRecordType())
- {
+ switch (info.getUserRecordType()) {
case JournalStorageManager.QUEUE_BINDING_RECORD:
handleQBindingRecord(journalType, info);
break;
default:
- if (log.isDebugEnabled())
- {
+ if (log.isDebugEnabled()) {
log.debug(new StringBuilder().append("Record ")
- .append(info.id)
- .append(" is not exported!")
- .toString());
+ .append(info.id)
+ .append(" is not exported!")
+ .toString());
}
break;
@@ -435,8 +393,7 @@
return journalType;
}
- private static void handleQBindingRecord(BindingsJournalType journalType, RecordInfo info)
- {
+ private static void handleQBindingRecord(BindingsJournalType journalType, RecordInfo info) {
JournalStorageManager.PersistentQueueBindingEncoding persistentQueueBindingEncoding = JournalStorageManager.newBindingEncoding(info.id,
HornetQBuffers.wrappedBuffer(info.data));
@@ -455,43 +412,41 @@
/**
* Imports messages which were exported as a XML representation using @see #exportMessages(String,String,String).
- *
+ * <p/>
* This is done by starting an embedded HQ server which connects the queues transparently.
*
- * @param importFile full qualified name of the import file (/path/file)
+ * @param importFile full qualified name of the import file (/path/file)
* @param configurationFile the configuration URL of a running HQ server
- * @param connectorName the name of the connector used to connect to the HQ server
+ * @param connectorName the name of the connector used to connect to the HQ server
* @return the number of imported records
* @throws Exception if an error occurs while importing
*/
- public static long importMessages(final String importFile, final String configurationFile, final String connectorName) throws Exception
- {
+ public static long importMessages(final String importFile, final String configurationFile,
+ final String connectorName, final String user,
+ final String password) throws Exception {
FileConfiguration configuration = null;
- try
- {
+ try {
configuration = new FileConfiguration();
configuration.setConfigurationUrl(configurationFile);
configuration.start();
- }
- finally
- {
- if (configuration != null)
- {
+ } finally {
+ if (configuration != null) {
configuration.stop();
}
}
ServerLocator serverLocator = HornetQClient.createServerLocatorWithoutHA(configuration.getConnectorConfigurations()
- .get(connectorName));
+ .get(connectorName));
FileInputStream input = new FileInputStream(new File(importFile));
- return importMessages(input, serverLocator);
+ return importMessages(input, serverLocator, user, password);
}
public static long importMessages(final InputStream importFile,
- final ServerLocator serverLocator) throws Exception
- {
+ final ServerLocator serverLocator,
+ final String user,
+ final String password) throws Exception {
final JAXBContext context = JAXBContext.newInstance(HornetQExport.class);
final Unmarshaller unmarshaller = context.createUnmarshaller();
@@ -500,15 +455,24 @@
ClientSessionFactory sf = null;
- try
- {
+ try {
sf = serverLocator.createSessionFactory();
- final ClientSession coreSession = sf.createSession();
+ final ClientSession coreSession;
+ if (user != null && password != null) {
+ coreSession = sf.createSession(user,
+ password,
+ false,
+ true,
+ true,
+ serverLocator.isPreAcknowledge(),
+ serverLocator.getAckBatchSize());
+ } else {
+ coreSession = sf.createSession();
+ }
// message notification callback
final ClientSessionFactory finalSf = sf;
- final MessagesExportType.Listener listener = new MessagesExportType.Listener()
- {
+ final MessagesExportType.Listener listener = new MessagesExportType.Listener() {
public void handleMessage(MessageType message) throws Exception {
final List<QueueType> originalQueues = message.getAllPreviousBindings().getQueue();
final List<QueueRefType> originalBindings = message.getBindings().getQueue();
@@ -536,27 +500,29 @@
}
private long getNewQueueId(QueueType queue) throws Exception {
- final ClientSession requestorSession = finalSf.createSession(false, true, true);
+ final ClientSession requestorSession = finalSf.createSession(user,
+ password,
+ false,
+ true,
+ true,
+ serverLocator.isPreAcknowledge(),
+ serverLocator.getAckBatchSize());
requestorSession.start();
ClientRequestor requestor = new ClientRequestor(requestorSession, ConfigurationImpl.DEFAULT_MANAGEMENT_ADDRESS);
ClientMessage m = requestorSession.createMessage(false);
ManagementHelper.putAttribute(m, ResourceNames.CORE_QUEUE + queue.getName(), "ID");
- try
- {
+ try {
final ClientMessage reply = requestor.request(m);
Object result = ManagementHelper.getResult(reply);
return ((Integer) result).longValue();
- }
- catch (Exception e)
- {
+ } catch (Exception e) {
throw new IllegalStateException(e);
}
}
- private ClientMessage generateClientMessage(MessageType message) throws IOException
- {
+ private ClientMessage generateClientMessage(MessageType message) throws IOException {
ClientMessage clientMessage = coreSession.createMessage(message.getType(),
message.isDurable(),
message.getExpiration(),
@@ -571,10 +537,8 @@
// Routing
// only map Q if not already ACKed
- for (QueueRefType binding : bindings)
- {
- if (!ackedQueues.contains(new QueueRefType(binding.getId())))
- {
+ for (QueueRefType binding : bindings) {
+ if (!ackedQueues.contains(new QueueRefType(binding.getId()))) {
queues.add(queueMapping.get(binding.getId()));
}
}
@@ -582,41 +546,39 @@
// Properties
List<PropertyType> properties = message.getProperties().getProperty();
- for (PropertyType property : properties)
- {
+ for (PropertyType property : properties) {
clientMessage.putStringProperty(property.getKey(), property.getValue());
}
// Payload
- clientMessage.getBodyBuffer().writeBytes(Base64.decode(message.getPayload(), Base64.DONT_BREAK_LINES | Base64.URL_SAFE));
+ clientMessage.getBodyBuffer().writeBytes(Base64.decode(message.getPayload(),
+ Base64.DONT_BREAK_LINES | Base64.URL_SAFE));
// UserID
- // todo: need to set?
+ // todo: need to set or regenerate?
+ String userId = message.getUserId();
+ if (userId != null && !"".equals(userId)) {
+ clientMessage.setUserID(new UUID(UUID.TYPE_TIME_BASED,
+ Base64.decode(userId,
+ Base64.DONT_BREAK_LINES | Base64.URL_SAFE)));
+ }
return clientMessage;
}
- private byte[] getByteArrayOf(List<Long> queues) throws IOException
- {
+ private byte[] getByteArrayOf(List<Long> queues) throws IOException {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(bos);
- try
- {
- for (long id : queues)
- {
+ try {
+ for (long id : queues) {
dos.writeLong(id);
}
dos.flush();
- }
- finally
- {
- try
- {
+ } finally {
+ try {
dos.close();
bos.close();
- }
- catch (IOException e)
- {
+ } catch (IOException e) {
// could not close streams
}
}
@@ -629,24 +591,19 @@
messageCounter = 0;
// install the callback on all message instances
- unmarshaller.setListener(new Unmarshaller.Listener()
- {
- public void beforeUnmarshal(Object target, Object parent)
- {
- if (target instanceof MessagesExportType)
- {
- ((MessagesExportType)target).setMessageListener(listener);
+ unmarshaller.setListener(new Unmarshaller.Listener() {
+ public void beforeUnmarshal(Object target, Object parent) {
+ if (target instanceof MessagesExportType) {
+ ((MessagesExportType) target).setMessageListener(listener);
if (((MessagesExportType) target).getBindings() == null) {
- ((MessagesExportType)target).setOriginalBindings(((HornetQExport)parent).getQueues());
+ ((MessagesExportType) target).setOriginalBindings(((HornetQExport) parent).getQueues());
}
}
}
- public void afterUnmarshal(Object target, Object parent)
- {
- if (target instanceof MessagesExportType)
- {
- ((MessagesExportType)target).setMessageListener(null);
+ public void afterUnmarshal(Object target, Object parent) {
+ if (target instanceof MessagesExportType) {
+ ((MessagesExportType) target).setMessageListener(null);
messageCounter++;
}
}
@@ -658,11 +615,8 @@
reader.setContentHandler(unmarshaller.getUnmarshallerHandler());
reader.parse(new InputSource(importFile));
- }
- finally
- {
- if (sf != null)
- {
+ } finally {
+ if (sf != null) {
sf.close();
}
}
@@ -670,13 +624,11 @@
return messageCounter;
}
- private static Map<String, Long> loadBindings(final String bindingsDirectry) throws Exception
- {
+ private static Map<String, Long> loadBindings(final String bindingsDirectry) throws Exception {
final Map<String, Long> queueMapping;
JournalImpl bindingsJournal = null;
- try
- {
+ try {
SequentialFileFactory bindingsFF = new NIOSequentialFileFactory(bindingsDirectry);
bindingsJournal = new JournalImpl(FileConfiguration.DEFAULT_JOURNAL_FILE_SIZE,
@@ -695,21 +647,16 @@
queueMapping = new HashMap<String, Long>();
- for (RecordInfo bindingRecord : committedRecordsB)
- {
- if (bindingRecord.getUserRecordType() == JournalStorageManager.QUEUE_BINDING_RECORD)
- {
+ for (RecordInfo bindingRecord : committedRecordsB) {
+ if (bindingRecord.getUserRecordType() == JournalStorageManager.QUEUE_BINDING_RECORD) {
// mapping old/new queue binding
PersistentQueueBindingEncoding queueBinding = JournalStorageManager.newBindingEncoding(bindingRecord.id,
HornetQBuffers.wrappedBuffer(bindingRecord.data));
queueMapping.put(queueBinding.getAddress().toString(), queueBinding.getId());
}
}
- }
- finally
- {
- if (bindingsJournal != null)
- {
+ } finally {
+ if (bindingsJournal != null) {
bindingsJournal.stop();
}
}
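Note on getByteArrayOf in the diff above: it serializes a list of queue ids by calling DataOutputStream.writeLong for each one, which yields 8 big-endian bytes per id. The following is a minimal standalone sketch of that encoding and its inverse, using only the JDK. The class name QueueIdCodec and the decode method are hypothetical and not part of HornetQ. Unlike the original, which swallows an IOException on close, this sketch uses try-with-resources and rethrows as an unchecked exception.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper for illustration only; not part of HornetQ.
final class QueueIdCodec {

    // Writes each id as an 8-byte big-endian long, as DataOutputStream.writeLong does.
    static byte[] encode(List<Long> queueIds) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (DataOutputStream dos = new DataOutputStream(bos)) {
            for (long id : queueIds) {
                dos.writeLong(id);
            }
        } catch (IOException e) {
            // In-memory streams should not fail, but the API declares IOException.
            throw new UncheckedIOException(e);
        }
        return bos.toByteArray();
    }

    // Reads the ids back; ByteBuffer is big-endian by default, matching writeLong.
    static List<Long> decode(byte[] bytes) {
        ByteBuffer buffer = ByteBuffer.wrap(bytes);
        List<Long> ids = new ArrayList<>();
        while (buffer.remaining() >= Long.BYTES) {
            ids.add(buffer.getLong());
        }
        return ids;
    }

    public static void main(String[] args) {
        byte[] encoded = encode(Arrays.asList(1L, 42L));
        System.out.println(encoded.length + " bytes -> " + decode(encoded));
    }
}
```

Closing the DataOutputStream also flushes and closes the underlying ByteArrayOutputStream, so no separate bos.close() is needed in this variant.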
Modified: branches/Branch_2_2_EAP_export_tool/src/main/org/hornetq/core/persistence/tools/xmlmodel/MessageType.java
===================================================================
--- branches/Branch_2_2_EAP_export_tool/src/main/org/hornetq/core/persistence/tools/xmlmodel/MessageType.java 2011-06-13 02:59:47 UTC (rev 10793)
+++ branches/Branch_2_2_EAP_export_tool/src/main/org/hornetq/core/persistence/tools/xmlmodel/MessageType.java 2011-06-13 12:02:50 UTC (rev 10794)
@@ -9,6 +9,8 @@
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.server.ServerMessage;
+import org.hornetq.utils.Base64;
+import org.hornetq.utils.TypedProperties;
/**
@@ -97,14 +99,20 @@
for (SimpleString propName : msg.getPropertyNames()) {
PropertyType propertyType = new PropertyType();
propertyType.setKey(propName.toString());
- propertyType.setValue(msg.getSimpleStringProperty(propName).toString());
+ // ie. _HQ_DUPL_ID cannot be handled as a SimpleString prop because it's a byte array
+ if (isInternalProperty(propName)) {
+ Object objectProperty = msg.getObjectProperty(propName);
+ propertyType.setValue(getObjectPropertyValue(objectProperty));
+ } else {
+ propertyType.setValue(msg.getSimpleStringProperty(propName).toString());
+ }
properties.getProperty().add(propertyType);
}
setProperties(properties);
setPriority(msg.getPriority());
if (msg.getUserID() != null)
{
- setUserId(msg.getUserID().toString());
+ setUserId(Base64.encodeBytes(msg.getUserID().asBytes(),Base64.DONT_BREAK_LINES|Base64.URL_SAFE));
}
else
{
@@ -112,6 +120,22 @@
}
}
+ private String getObjectPropertyValue(Object objectProperty) {
+ if (objectProperty instanceof SimpleString) {
+ return objectProperty.toString();
+ } else {
+ try {
+ return Base64.encodeBytes((byte[]) objectProperty, Base64.DONT_BREAK_LINES | Base64.URL_SAFE);
+ } catch (Exception e) {
+ return "could not decode property value";
+ }
+ }
+ }
+
+ private boolean isInternalProperty(SimpleString propName) {
+ return propName.startsWith(new SimpleString("_HQ"));
+ }
+
/**
* Ctor for equals.
*
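Note on the user ID change above: the export now stores the raw bytes of the message's user ID as URL-safe Base64 rather than its toString() form, and the import side in ManageDataTool decodes those bytes again to rebuild the ID. The sketch below shows the same round trip with JDK classes only. It assumes java.util.Base64 and java.util.UUID as stand-ins for org.hornetq.utils.Base64 and org.hornetq.utils.UUID; the class name UserIdCodec is hypothetical, and the output is not claimed to be byte-compatible with the HornetQ encoder.

```java
import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.UUID;

// Hypothetical helper for illustration only; uses JDK types, not the HornetQ ones.
final class UserIdCodec {

    // Serializes the 128-bit id to 16 bytes and encodes them with the URL-safe alphabet.
    static String encode(UUID id) {
        ByteBuffer buf = ByteBuffer.allocate(16);
        buf.putLong(id.getMostSignificantBits());
        buf.putLong(id.getLeastSignificantBits());
        return Base64.getUrlEncoder().encodeToString(buf.array());
    }

    // Decodes the text back to 16 bytes and rebuilds the id from the two halves.
    static UUID decode(String text) {
        ByteBuffer buf = ByteBuffer.wrap(Base64.getUrlDecoder().decode(text));
        long mostSignificant = buf.getLong();
        long leastSignificant = buf.getLong();
        return new UUID(mostSignificant, leastSignificant);
    }

    public static void main(String[] args) {
        UUID original = UUID.randomUUID();
        String text = encode(original);
        System.out.println(text + " round-trips: " + original.equals(decode(text)));
    }
}
```

Storing the bytes rather than the printed form keeps the export independent of how a particular UUID class formats itself, at the cost of a value that is no longer human-readable in the XML.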
Modified: branches/Branch_2_2_EAP_export_tool/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
===================================================================
--- branches/Branch_2_2_EAP_export_tool/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2011-06-13 02:59:47 UTC (rev 10793)
+++ branches/Branch_2_2_EAP_export_tool/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2011-06-13 12:02:50 UTC (rev 10794)
@@ -76,8 +76,8 @@
public FailoverTest(final String name)
{
super(name);
+
}
-
public FailoverTest()
{
}
Added: branches/Branch_2_2_EAP_export_tool/tests/src/org/hornetq/tests/integration/persistence/ExportDataJmsTest.java
===================================================================
--- branches/Branch_2_2_EAP_export_tool/tests/src/org/hornetq/tests/integration/persistence/ExportDataJmsTest.java (rev 0)
+++ branches/Branch_2_2_EAP_export_tool/tests/src/org/hornetq/tests/integration/persistence/ExportDataJmsTest.java 2011-06-13 12:02:50 UTC (rev 10794)
@@ -0,0 +1,113 @@
+package org.hornetq.tests.integration.persistence;
+
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.api.jms.HornetQJMSClient;
+import org.hornetq.api.jms.JMSFactoryType;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.persistence.tools.ManageDataTool;
+import org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory;
+import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
+import org.hornetq.core.server.HornetQServers;
+import org.hornetq.jms.client.HornetQMessage;
+import org.hornetq.jms.server.impl.JMSServerManagerImpl;
+import org.hornetq.tests.unit.util.InVMContext;
+import org.hornetq.tests.util.JMSTestBase;
+
+import javax.jms.*;
+import javax.management.MBeanServerFactory;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.InputStream;
+
+/**
+ * Created by IntelliJ IDEA.
+ * User: torben
+ * Date: 13.06.11
+ * Time: 12:57
+ * To change this template use File | Settings | File Templates.
+ */
+public class ExportDataJmsTest extends JMSTestBase {
+
+ public static final String QUEUE = "Q1";
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ jmsServer.stop();
+
+ server = createServer(true, false,true);
+
+ jmsServer = new JMSServerManagerImpl(server);
+ context = new InVMContext();
+ jmsServer.setContext(context);
+ jmsServer.start();
+
+ jmsServer.createQueue(false, QUEUE, null, true, QUEUE);
+ cf = (ConnectionFactory) HornetQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"));
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ cf = null;
+
+ super.tearDown();
+ }
+
+ public void testExportImportJmsMessages() throws Exception {
+ Connection connection = cf.createConnection("a", "b");
+ connection.start();
+
+ Session jmsSession = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
+ Destination q = HornetQJMSClient.createQueue(QUEUE);
+
+ MessageProducer producer = jmsSession.createProducer(q);
+
+ TextMessage message = jmsSession.createTextMessage("foobar");
+ producer.send(message);
+
+ jmsSession.commit();
+ producer.close();
+ jmsSession.close();
+ connection.close();
+
+ jmsServer.stop();
+
+ ByteArrayOutputStream bout = new ByteArrayOutputStream();
+ ManageDataTool.exportMessages(getBindingsDir(), getJournalDir(), bout);
+
+ InputStream is = new ByteArrayInputStream(bout.toByteArray());
+
+ clearData();
+
+ server = createServer(true, false, true);
+ server.start();
+
+ ServerLocator locator = createNonHALocator(false);
+ ManageDataTool.importMessages(is, locator, "a", "b");
+
+ jmsServer = new JMSServerManagerImpl(server);
+ jmsServer.setContext(context);
+ jmsServer.start();
+
+ Connection newConnection = cf.createConnection("a", "b");
+ newConnection.start();
+
+ Session newJmsSession = newConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+ MessageConsumer consumer = newJmsSession.createConsumer(q);
+ Message received = consumer.receive(1000);
+ assertNotNull(received);
+ assertNotNull(received.getJMSMessageID());
+ assertEquals("foobar", ((TextMessage) received).getText());
+ received.acknowledge();
+
+ consumer.close();
+ newJmsSession.close();
+ newConnection.close();
+
+ }
+}
Property changes on: branches/Branch_2_2_EAP_export_tool/tests/src/org/hornetq/tests/integration/persistence/ExportDataJmsTest.java
___________________________________________________________________
Added: svn:mime-type
+ text/plain
Added: svn:keywords
+ Date Revision
Added: svn:eol-style
+ native
Modified: branches/Branch_2_2_EAP_export_tool/tests/src/org/hornetq/tests/integration/persistence/ExportDataTest.java
===================================================================
--- branches/Branch_2_2_EAP_export_tool/tests/src/org/hornetq/tests/integration/persistence/ExportDataTest.java 2011-06-13 02:59:47 UTC (rev 10793)
+++ branches/Branch_2_2_EAP_export_tool/tests/src/org/hornetq/tests/integration/persistence/ExportDataTest.java 2011-06-13 12:02:50 UTC (rev 10794)
@@ -25,7 +25,6 @@
import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.persistence.tools.ManageDataTool;
import org.hornetq.core.server.HornetQServer;
-import org.hornetq.jms.client.HornetQJMSConnectionFactory;
import org.hornetq.tests.util.ServiceTestBase;
/**
@@ -38,7 +37,6 @@
*/
public class ExportDataTest extends ServiceTestBase {
- protected static HornetQJMSConnectionFactory myCf;
private static final int MSG_SIZE = 1024;
public void testExportImport() throws Exception {
@@ -106,7 +104,7 @@
locator = createInVMNonHALocator();
- ManageDataTool.importMessages(is, locator);
+ ManageDataTool.importMessages(is, locator, "a", "b");
ClientSessionFactory csf = locator.createSessionFactory();
Modified: branches/Branch_2_2_EAP_export_tool/tests/src/org/hornetq/tests/util/ServiceTestBase.java
===================================================================
--- branches/Branch_2_2_EAP_export_tool/tests/src/org/hornetq/tests/util/ServiceTestBase.java 2011-06-13 02:59:47 UTC (rev 10793)
+++ branches/Branch_2_2_EAP_export_tool/tests/src/org/hornetq/tests/util/ServiceTestBase.java 2011-06-13 12:02:50 UTC (rev 10794)
@@ -15,10 +15,7 @@
import java.io.File;
import java.lang.management.ManagementFactory;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import javax.management.MBeanServer;
@@ -38,6 +35,7 @@
import org.hornetq.core.remoting.impl.invm.InVMRegistry;
import org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory;
import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
+import org.hornetq.core.security.Role;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.HornetQServers;
import org.hornetq.core.server.NodeManager;
@@ -48,6 +46,7 @@
import org.hornetq.jms.client.HornetQTextMessage;
import org.hornetq.spi.core.security.HornetQSecurityManager;
import org.hornetq.spi.core.security.HornetQSecurityManagerImpl;
+import org.hornetq.tests.integration.cluster.util.TestableServer;
/**
*
@@ -272,11 +271,34 @@
return createServer(realFiles, false);
}
- protected HornetQServer createServer(final boolean realFiles, final boolean netty)
+ protected HornetQServer createServer(final boolean realFiles, final boolean netty) {
+ return createServer(realFiles, netty, false);
+ }
+
+ protected HornetQServer createServer(final boolean realFiles, final boolean netty, final boolean secured)
{
- return createServer(realFiles, createDefaultConfig(netty), -1, -1, new HashMap<String, AddressSettings>());
+ Configuration defaultConfig = createDefaultConfig(netty);
+ defaultConfig.setSecurityEnabled(secured);
+
+ HornetQServer server = createServer(realFiles, defaultConfig, -1, -1, new HashMap<String, AddressSettings>());
+ if (secured) {
+ installSecurity(server);
+ }
+ return server;
}
+ protected HornetQSecurityManager installSecurity(HornetQServer server)
+ {
+ HornetQSecurityManager securityManager = server.getSecurityManager();
+ securityManager.addUser("a", "b");
+ Role role = new Role("arole", true, true, true, true, true, true, true);
+ Set<Role> roles = new HashSet<Role>();
+ roles.add(role);
+ server.getSecurityRepository().addMatch("#", roles);
+ securityManager.addRole("a", "arole");
+ return securityManager;
+ }
+
protected HornetQServer createServer(final boolean realFiles, final Configuration configuration)
{
return createServer(realFiles, configuration, -1, -1, new HashMap<String, AddressSettings>());
13 years, 6 months

JBoss hornetq SVN: r10793 - in branches/Branch_2_2_EAP-cluster-cleanup: src/main/org/hornetq/core/client/impl and 15 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-06-12 22:59:47 -0400 (Sun, 12 Jun 2011)
New Revision: 10793
Modified:
branches/Branch_2_2_EAP-cluster-cleanup/src/config/common/schema/hornetq-configuration.xsd
branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/cluster/impl/DiscoveryGroupImpl.java
branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/config/BridgeConfiguration.java
branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/config/ClusterConnectionConfiguration.java
branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java
branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/deployers/impl/FileConfigurationParser.java
branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java
branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java
branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/postoffice/impl/DivertBinding.java
branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/protocol/core/impl/ChannelImpl.java
branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/cluster/Bridge.java
branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/cluster/MessageFlowRecord.java
branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/impl/QueueImpl.java
branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/management/impl/ManagementServiceImpl.java
branches/Branch_2_2_EAP-cluster-cleanup/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java
branches/Branch_2_2_EAP-cluster-cleanup/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeStartTest.java
branches/Branch_2_2_EAP-cluster-cleanup/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java
branches/Branch_2_2_EAP-cluster-cleanup/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeWithDiscoveryGroupStartTest.java
branches/Branch_2_2_EAP-cluster-cleanup/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeWithPagingTest.java
branches/Branch_2_2_EAP-cluster-cleanup/tests/src/org/hornetq/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java
branches/Branch_2_2_EAP-cluster-cleanup/tests/src/org/hornetq/tests/integration/management/BridgeControlTest.java
branches/Branch_2_2_EAP-cluster-cleanup/tests/src/org/hornetq/tests/integration/management/BridgeControlUsingCoreTest.java
branches/Branch_2_2_EAP-cluster-cleanup/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
Log:
Cleanup branch
Modified: branches/Branch_2_2_EAP-cluster-cleanup/src/config/common/schema/hornetq-configuration.xsd
===================================================================
--- branches/Branch_2_2_EAP-cluster-cleanup/src/config/common/schema/hornetq-configuration.xsd 2011-06-13 02:55:40 UTC (rev 10792)
+++ branches/Branch_2_2_EAP-cluster-cleanup/src/config/common/schema/hornetq-configuration.xsd 2011-06-13 02:59:47 UTC (rev 10793)
@@ -313,12 +313,18 @@
</xsd:element>
<xsd:element maxOccurs="1" minOccurs="0" name="transformer-class-name" type="xsd:string">
</xsd:element>
+ <xsd:element maxOccurs="1" minOccurs="0" name="check-period" type="xsd:long">
+ </xsd:element>
+ <xsd:element maxOccurs="1" minOccurs="0" name="connection-ttl" type="xsd:long">
+ </xsd:element>
<xsd:element maxOccurs="1" minOccurs="0" name="retry-interval" type="xsd:long">
- </xsd:element>
+ </xsd:element>
<xsd:element maxOccurs="1" minOccurs="0" name="retry-interval-multiplier" type="xsd:double">
- </xsd:element>
- <xsd:element maxOccurs="1" minOccurs="0" name="reconnect-attempts" type="xsd:int">
- </xsd:element>
+ </xsd:element>
+ <xsd:element maxOccurs="1" minOccurs="0" name="max-retry-interval" type="xsd:long">
+ </xsd:element>
+ <xsd:element maxOccurs="1" minOccurs="0" name="reconnect-attempts" type="xsd:long">
+ </xsd:element>
<xsd:element maxOccurs="1" minOccurs="0" name="failover-on-server-shutdown" type="xsd:boolean">
</xsd:element>
<xsd:element maxOccurs="1" minOccurs="0" name="use-duplicate-detection" type="xsd:boolean">
@@ -354,8 +360,18 @@
</xsd:element>
<xsd:element maxOccurs="1" minOccurs="1" name="connector-ref" type="xsd:string">
</xsd:element>
+ <xsd:element maxOccurs="1" minOccurs="0" name="check-period" type="xsd:long">
+ </xsd:element>
+ <xsd:element maxOccurs="1" minOccurs="0" name="connection-ttl" type="xsd:long">
+ </xsd:element>
<xsd:element maxOccurs="1" minOccurs="0" name="retry-interval" type="xsd:long">
</xsd:element>
+ <xsd:element maxOccurs="1" minOccurs="0" name="retry-interval-multiplier" type="xsd:double">
+ </xsd:element>
+ <xsd:element maxOccurs="1" minOccurs="0" name="max-retry-interval" type="xsd:long">
+ </xsd:element>
+ <xsd:element maxOccurs="1" minOccurs="0" name="reconnect-attempts" type="xsd:long">
+ </xsd:element>
<xsd:element maxOccurs="1" minOccurs="0" name="use-duplicate-detection" type="xsd:boolean">
</xsd:element>
<xsd:element maxOccurs="1" minOccurs="0" name="forward-when-no-consumers" type="xsd:boolean">
Modified: branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2011-06-13 02:55:40 UTC (rev 10792)
+++ branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2011-06-13 02:59:47 UTC (rev 10793)
@@ -83,6 +83,8 @@
private static final long serialVersionUID = 2512460695662741413L;
private static final Logger log = Logger.getLogger(ClientSessionFactoryImpl.class);
+
+ private static final boolean isTrace = log.isTraceEnabled();
// Attributes
// -----------------------------------------------------------------------------------
@@ -497,6 +499,12 @@
return;
}
+
+ if (isTrace)
+ {
+ log.trace("Client Connection failed, calling failure listeners and trying to reconnect, reconnectAttempts=" + reconnectAttempts);
+ }
+
// We call before reconnection occurs to give the user a chance to do cleanup, like cancel messages
callFailureListeners(me, false, false);
@@ -910,6 +918,13 @@
return;
}
+ if (isTrace)
+ {
+ log.trace("Waiting " + interval +
+ " milliseconds before next retry. RetryInterval=" + retryInterval +
+ " and multiplier = " + retryIntervalMultiplier);
+ }
+
try
{
waitLock.wait(interval);
@@ -1086,6 +1101,13 @@
}
}
}
+ else
+ {
+ if (isTrace)
+ {
+ log.trace("No Backup configured!");
+ }
+ }
}
catch (Exception e)
{
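
Note on the retry trace message above: the new log line reports the current wait together with `retryInterval` and `retryIntervalMultiplier`. The diff does not show how the next wait is derived, so the following is only a minimal sketch. It assumes a multiply-then-cap rule using the `max-retry-interval` setting that this revision adds to the schema. `RetryBackoffSketch`, `nextInterval` and `schedule` are hypothetical names, not HornetQ API.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper, not part of HornetQ: computes successive retry waits. */
final class RetryBackoffSketch
{
   /** Assumed rule: next wait = current wait * multiplier, capped at maxRetryInterval. */
   static long nextInterval(final long current, final double multiplier, final long maxRetryInterval)
   {
      long next = (long)(current * multiplier);
      return Math.min(next, maxRetryInterval);
   }

   /** Waits (in milliseconds) used for the first 'attempts' retries, starting at retryInterval. */
   static List<Long> schedule(final long retryInterval,
                              final double multiplier,
                              final long maxRetryInterval,
                              final int attempts)
   {
      List<Long> waits = new ArrayList<Long>();
      long interval = retryInterval;
      for (int i = 0; i < attempts; i++)
      {
         waits.add(interval);
         interval = nextInterval(interval, multiplier, maxRetryInterval);
      }
      return waits;
   }
}
```

With a retry interval of 500 ms, a multiplier of 2.0 and a cap of 2000 ms, `schedule(500, 2.0, 2000, 5)` yields waits of 500, 1000, 2000, 2000 and 2000 ms.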
Modified: branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/cluster/impl/DiscoveryGroupImpl.java
===================================================================
--- branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/cluster/impl/DiscoveryGroupImpl.java 2011-06-13 02:55:40 UTC (rev 10792)
+++ branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/cluster/impl/DiscoveryGroupImpl.java 2011-06-13 02:59:47 UTC (rev 10793)
@@ -49,6 +49,8 @@
public class DiscoveryGroupImpl implements Runnable, DiscoveryGroup
{
private static final Logger log = Logger.getLogger(DiscoveryGroupImpl.class);
+
+ private static final boolean isTrace = log.isTraceEnabled();
private static final int SOCKET_TIMEOUT = 500;
@@ -375,6 +377,14 @@
if (changed)
{
+ if (isTrace)
+ {
+ log.trace("Connectors changed on Discovery:");
+ for (DiscoveryEntry connector : connectors.values())
+ {
+ log.trace(connector);
+ }
+ }
callListeners();
}
@@ -438,6 +448,10 @@
if (entry.getValue().getLastUpdate() + timeout <= now)
{
+ if (isTrace)
+ {
+ log.trace("Timed out node on discovery:" + entry.getValue());
+ }
iter.remove();
changed = true;
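
Note on the `isTrace` fields added in this revision: a `static final boolean` initialised from `log.isTraceEnabled()` is evaluated once, when the class is initialised, so a later change of log level is not reflected in the flag. The sketch below illustrates that trade-off. It uses `java.util.logging` as a stand-in for the HornetQ `Logger`, and `TraceFlagSketch` is a hypothetical class.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical illustration: cached trace flag versus a live level check. */
final class TraceFlagSketch
{
   private static final Logger LOG = Logger.getLogger("sketch.trace");

   static
   {
      // Start at INFO so that the finest level is disabled when the flag below is computed.
      LOG.setLevel(Level.INFO);
   }

   // Evaluated exactly once, during class initialisation.
   private static final boolean IS_TRACE_CACHED = LOG.isLoggable(Level.FINEST);

   /** Value captured at class initialisation. */
   static boolean cached()
   {
      return IS_TRACE_CACHED;
   }

   /** Value computed from the logger's current level. */
   static boolean live()
   {
      return LOG.isLoggable(Level.FINEST);
   }

   static void setLevel(final Level level)
   {
      LOG.setLevel(level);
   }
}
```

After `TraceFlagSketch.setLevel(Level.FINEST)`, `live()` reports the new level while `cached()` keeps the value it had at start-up. The cached form saves a check on hot paths at the cost of requiring a restart to switch tracing on.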
Modified: branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/config/BridgeConfiguration.java
===================================================================
--- branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/config/BridgeConfiguration.java 2011-06-13 02:55:40 UTC (rev 10792)
+++ branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/config/BridgeConfiguration.java 2011-06-13 02:59:47 UTC (rev 10793)
@@ -61,17 +61,24 @@
private String password;
+ private final long connectionTTL;
+
+ private final long maxRetryInterval;
+
+
public BridgeConfiguration(final String name,
final String queueName,
final String forwardingAddress,
final String filterString,
final String transformerClassName,
+ final long clientFailureCheckPeriod,
+ final long connectionTTL,
final long retryInterval,
+ final long maxRetryInterval,
final double retryIntervalMultiplier,
final int reconnectAttempts,
final boolean useDuplicateDetection,
final int confirmationWindowSize,
- final long clientFailureCheckPeriod,
final List<String> staticConnectors,
final boolean ha,
final String user,
@@ -89,8 +96,10 @@
this.confirmationWindowSize = confirmationWindowSize;
this.clientFailureCheckPeriod = clientFailureCheckPeriod;
this.staticConnectors = staticConnectors;
- this.user = user;
+ this. user = user;
this.password = password;
+ this.connectionTTL = connectionTTL;
+ this.maxRetryInterval = maxRetryInterval;
discoveryGroupName = null;
}
@@ -99,12 +108,14 @@
final String forwardingAddress,
final String filterString,
final String transformerClassName,
+ final long clientFailureCheckPeriod,
+ final long connectionTTL,
final long retryInterval,
+ final long maxRetryInterval,
final double retryIntervalMultiplier,
final int reconnectAttempts,
final boolean useDuplicateDetection,
final int confirmationWindowSize,
- final long clientFailureCheckPeriod,
final String discoveryGroupName,
final boolean ha,
final String user,
@@ -126,6 +137,8 @@
this.ha = ha;
this.user = user;
this.password = password;
+ this.connectionTTL = connectionTTL;
+ this.maxRetryInterval = maxRetryInterval;
}
public String getName()
@@ -138,6 +151,22 @@
return queueName;
}
+ /**
+ * @return the connectionTTL
+ */
+ public long getConnectionTTL()
+ {
+ return connectionTTL;
+ }
+
+ /**
+ * @return the maxRetryInterval
+ */
+ public long getMaxRetryInterval()
+ {
+ return maxRetryInterval;
+ }
+
public String getForwardingAddress()
{
return forwardingAddress;
Modified: branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/config/ClusterConnectionConfiguration.java
===================================================================
--- branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/config/ClusterConnectionConfiguration.java 2011-06-13 02:55:40 UTC (rev 10792)
+++ branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/config/ClusterConnectionConfiguration.java 2011-06-13 02:59:47 UTC (rev 10793)
@@ -16,6 +16,8 @@
import java.io.Serializable;
import java.util.List;
+import org.hornetq.core.config.impl.ConfigurationImpl;
+
/**
* A ClusterConnectionConfiguration
*
@@ -35,7 +37,17 @@
private final String connectorName;
+ private final long clientFailureCheckPeriod;
+
+ private final long connectionTTL;
+
private final long retryInterval;
+
+ private final double retryIntervalMultiplier;
+
+ private final long maxRetryInterval;
+
+ private final int reconnectAttempts;
private final boolean duplicateDetection;
@@ -50,11 +62,45 @@
private final int confirmationWindowSize;
private final boolean allowDirectConnectionsOnly;
+
+ public ClusterConnectionConfiguration(final String name,
+ final String address,
+ final String connectorName,
+ final long retryInterval,
+ final boolean duplicateDetection,
+ final boolean forwardWhenNoConsumers,
+ final int maxHops,
+ final int confirmationWindowSize,
+ final List<String> staticConnectors,
+ final boolean allowDirectConnectionsOnly)
+ {
+ this(name,
+ address,
+ connectorName,
+ ConfigurationImpl.DEFAULT_CLUSTER_FAILURE_CHECK_PERIOD,
+ ConfigurationImpl.DEFAULT_CLUSTER_CONNECTION_TTL,
+ retryInterval,
+ ConfigurationImpl.DEFAULT_CLUSTER_RETRY_INTERVAL_MULTIPLIER,
+ ConfigurationImpl.DEFAULT_CLUSTER_MAX_RETRY_INTERVAL,
+ ConfigurationImpl.DEFAULT_CLUSTER_RECONNECT_ATTEMPTS,
+ duplicateDetection,
+ forwardWhenNoConsumers,
+ maxHops,
+ confirmationWindowSize,
+ staticConnectors,
+ allowDirectConnectionsOnly);
+ }
+
public ClusterConnectionConfiguration(final String name,
final String address,
final String connectorName,
+ final long clientFailureCheckPeriod,
+ final long connectionTTL,
final long retryInterval,
+ final double retryIntervalMultiplier,
+ final long maxRetryInterval,
+ final int reconnectAttempts,
final boolean duplicateDetection,
final boolean forwardWhenNoConsumers,
final int maxHops,
@@ -65,7 +111,12 @@
this.name = name;
this.address = address;
this.connectorName = connectorName;
+ this.clientFailureCheckPeriod = clientFailureCheckPeriod;
+ this.connectionTTL = connectionTTL;
this.retryInterval = retryInterval;
+ this.retryIntervalMultiplier = retryIntervalMultiplier;
+ this.maxRetryInterval = maxRetryInterval;
+ this.reconnectAttempts = reconnectAttempts;
this.staticConnectors = staticConnectors;
this.duplicateDetection = duplicateDetection;
this.forwardWhenNoConsumers = forwardWhenNoConsumers;
@@ -75,6 +126,7 @@
this.allowDirectConnectionsOnly = allowDirectConnectionsOnly;
}
+
public ClusterConnectionConfiguration(final String name,
final String address,
final String connectorName,
@@ -85,10 +137,47 @@
final int confirmationWindowSize,
final String discoveryGroupName)
{
+ this(name,
+ address,
+ connectorName,
+ ConfigurationImpl.DEFAULT_CLUSTER_FAILURE_CHECK_PERIOD,
+ ConfigurationImpl.DEFAULT_CLUSTER_CONNECTION_TTL,
+ retryInterval,
+ ConfigurationImpl.DEFAULT_CLUSTER_RETRY_INTERVAL_MULTIPLIER,
+ ConfigurationImpl.DEFAULT_CLUSTER_MAX_RETRY_INTERVAL,
+ ConfigurationImpl.DEFAULT_CLUSTER_RECONNECT_ATTEMPTS,
+ duplicateDetection,
+ forwardWhenNoConsumers,
+ maxHops,
+ confirmationWindowSize,
+ discoveryGroupName);
+ }
+
+
+ public ClusterConnectionConfiguration(final String name,
+ final String address,
+ final String connectorName,
+ final long clientFailureCheckPeriod,
+ final long connectionTTL,
+ final long retryInterval,
+ final double retryIntervalMultiplier,
+ final long maxRetryInterval,
+ final int reconnectAttempts,
+ final boolean duplicateDetection,
+ final boolean forwardWhenNoConsumers,
+ final int maxHops,
+ final int confirmationWindowSize,
+ final String discoveryGroupName)
+ {
this.name = name;
this.address = address;
this.connectorName = connectorName;
+ this.clientFailureCheckPeriod = clientFailureCheckPeriod;
+ this.connectionTTL = connectionTTL;
this.retryInterval = retryInterval;
+ this.retryIntervalMultiplier = retryIntervalMultiplier;
+ this.maxRetryInterval = maxRetryInterval;
+ this.reconnectAttempts = reconnectAttempts;
this.duplicateDetection = duplicateDetection;
this.forwardWhenNoConsumers = forwardWhenNoConsumers;
this.discoveryGroupName = discoveryGroupName;
@@ -108,6 +197,46 @@
return address;
}
+ /**
+ * @return the clientFailureCheckPeriod
+ */
+ public long getClientFailureCheckPeriod()
+ {
+ return clientFailureCheckPeriod;
+ }
+
+ /**
+ * @return the connectionTTL
+ */
+ public long getConnectionTTL()
+ {
+ return connectionTTL;
+ }
+
+ /**
+ * @return the retryIntervalMultiplier
+ */
+ public double getRetryIntervalMultiplier()
+ {
+ return retryIntervalMultiplier;
+ }
+
+ /**
+ * @return the maxRetryInterval
+ */
+ public long getMaxRetryInterval()
+ {
+ return maxRetryInterval;
+ }
+
+ /**
+ * @return the reconnectAttempts
+ */
+ public int getReconnectAttempts()
+ {
+ return reconnectAttempts;
+ }
+
public String getConnectorName()
{
return connectorName;
Modified: branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java
===================================================================
--- branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java 2011-06-13 02:55:40 UTC (rev 10792)
+++ branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java 2011-06-13 02:59:47 UTC (rev 10793)
@@ -24,6 +24,7 @@
import org.hornetq.api.core.DiscoveryGroupConfiguration;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.core.config.BridgeConfiguration;
import org.hornetq.core.config.BroadcastGroupConfiguration;
import org.hornetq.core.config.ClusterConnectionConfiguration;
@@ -166,6 +167,16 @@
public static final long DEFAULT_CLUSTER_RETRY_INTERVAL = 500;
+ public static final int DEFAULT_CLUSTER_RECONNECT_ATTEMPTS = -1;
+
+ public static final long DEFAULT_CLUSTER_FAILURE_CHECK_PERIOD = HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD;
+
+ public static final long DEFAULT_CLUSTER_CONNECTION_TTL = HornetQClient.DEFAULT_CONNECTION_TTL;
+
+ public static final double DEFAULT_CLUSTER_RETRY_INTERVAL_MULTIPLIER = HornetQClient.DEFAULT_RETRY_INTERVAL_MULTIPLIER;
+
+ public static final long DEFAULT_CLUSTER_MAX_RETRY_INTERVAL = HornetQClient.DEFAULT_MAX_RETRY_INTERVAL;
+
public static final boolean DEFAULT_DIVERT_EXCLUSIVE = false;
public static final boolean DEFAULT_BRIDGE_DUPLICATE_DETECTION = true;
Modified: branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/deployers/impl/FileConfigurationParser.java
===================================================================
--- branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/deployers/impl/FileConfigurationParser.java 2011-06-13 02:55:40 UTC (rev 10792)
+++ branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/deployers/impl/FileConfigurationParser.java 2011-06-13 02:59:47 UTC (rev 10793)
@@ -1003,12 +1003,27 @@
"max-hops",
ConfigurationImpl.DEFAULT_CLUSTER_MAX_HOPS,
Validators.GE_ZERO);
+
+ long clientFailureCheckPeriod = XMLConfigurationUtil.getLong(e, "check-period",
+ ConfigurationImpl.DEFAULT_CLUSTER_FAILURE_CHECK_PERIOD, Validators.GT_ZERO) ;
+ long connectionTTL = XMLConfigurationUtil.getLong(e, "connection-ttl",
+ ConfigurationImpl.DEFAULT_CLUSTER_CONNECTION_TTL, Validators.GT_ZERO) ;
+
+
long retryInterval = XMLConfigurationUtil.getLong(e,
"retry-interval",
ConfigurationImpl.DEFAULT_CLUSTER_RETRY_INTERVAL,
Validators.GT_ZERO);
+
+ double retryIntervalMultiplier = XMLConfigurationUtil.getDouble(e, "retry-interval-multiplier",
+ ConfigurationImpl.DEFAULT_CLUSTER_RETRY_INTERVAL_MULTIPLIER, Validators.GT_ZERO);
+
+ long maxRetryInterval = XMLConfigurationUtil.getLong(e, "max-retry-interval", ConfigurationImpl.DEFAULT_CLUSTER_MAX_RETRY_INTERVAL, Validators.GT_ZERO);
+
+ int reconnectAttempts = XMLConfigurationUtil.getInteger(e, "reconnect-attempts", ConfigurationImpl.DEFAULT_CLUSTER_RECONNECT_ATTEMPTS, Validators.GT_ZERO);
+
int confirmationWindowSize = XMLConfigurationUtil.getInteger(e,
"confirmation-window-size",
FileConfiguration.DEFAULT_CONFIRMATION_WINDOW_SIZE,
@@ -1048,7 +1063,12 @@
config = new ClusterConnectionConfiguration(name,
address,
connectorName,
+ clientFailureCheckPeriod,
+ connectionTTL,
retryInterval,
+ retryIntervalMultiplier,
+ maxRetryInterval,
+ reconnectAttempts,
duplicateDetection,
forwardWhenNoConsumers,
maxHops,
@@ -1061,7 +1081,12 @@
config = new ClusterConnectionConfiguration(name,
address,
connectorName,
+ clientFailureCheckPeriod,
+ connectionTTL,
retryInterval,
+ retryIntervalMultiplier,
+ maxRetryInterval,
+ reconnectAttempts,
duplicateDetection,
forwardWhenNoConsumers,
maxHops,
@@ -1101,17 +1126,26 @@
null,
Validators.NO_CHECK);
+ // Default bridge conf
+ int confirmationWindowSize = XMLConfigurationUtil.getInteger(brNode,
+ "confirmation-window-size",
+ FileConfiguration.DEFAULT_CONFIRMATION_WINDOW_SIZE,
+ Validators.GT_ZERO);
+
long retryInterval = XMLConfigurationUtil.getLong(brNode,
"retry-interval",
HornetQClient.DEFAULT_RETRY_INTERVAL,
Validators.GT_ZERO);
- // Default bridge conf
- int confirmationWindowSize = XMLConfigurationUtil.getInteger(brNode,
- "confirmation-window-size",
- FileConfiguration.DEFAULT_CONFIRMATION_WINDOW_SIZE,
- Validators.GT_ZERO);
+ long clientFailureCheckPeriod = XMLConfigurationUtil.getLong(brNode, "check-period",
+ HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD, Validators.GT_ZERO) ;
+ long connectionTTL = XMLConfigurationUtil.getLong(brNode, "connection-ttl",
+ HornetQClient.DEFAULT_CONNECTION_TTL, Validators.GT_ZERO) ;
+
+ long maxRetryInterval = XMLConfigurationUtil.getLong(brNode, "max-retry-interval", HornetQClient.DEFAULT_MAX_RETRY_INTERVAL, Validators.GT_ZERO);
+
+
double retryIntervalMultiplier = XMLConfigurationUtil.getDouble(brNode,
"retry-interval-multiplier",
HornetQClient.DEFAULT_RETRY_INTERVAL_MULTIPLIER,
@@ -1173,12 +1207,14 @@
forwardingAddress,
filterString,
transformerClassName,
+ clientFailureCheckPeriod,
+ connectionTTL,
retryInterval,
+ maxRetryInterval,
retryIntervalMultiplier,
reconnectAttempts,
useDuplicateDetection,
confirmationWindowSize,
- HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
staticConnectorNames,
ha,
user,
@@ -1191,12 +1227,14 @@
forwardingAddress,
filterString,
transformerClassName,
+ clientFailureCheckPeriod,
+ connectionTTL,
retryInterval,
+ maxRetryInterval,
retryIntervalMultiplier,
reconnectAttempts,
useDuplicateDetection,
confirmationWindowSize,
- HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
discoveryGroupName,
ha,
user,
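
Note on the parser changes above: each new setting is read as an optional child element with a default and a validator. The sketch below shows that general shape with the JDK DOM API. `OptionalElementSketch` and its methods are hypothetical and do not reproduce `XMLConfigurationUtil`. In particular, this sketch validates only values that are actually present; whether the real helper also validates the default is not visible in this diff, which matters for a default such as -1 combined with a greater-than-zero check.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;

/** Hypothetical helper: reads optional numeric child elements with defaults. */
final class OptionalElementSketch
{
   /** Parses an XML string and returns its root element. */
   static Element parse(final String xml)
   {
      try
      {
         return DocumentBuilderFactory.newInstance()
                                      .newDocumentBuilder()
                                      .parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)))
                                      .getDocumentElement();
      }
      catch (Exception e)
      {
         throw new IllegalArgumentException("Invalid XML", e);
      }
   }

   /** Returns the child's value if present (it must be > 0), otherwise the default, unchecked. */
   static long getLong(final Element parent, final String name, final long def)
   {
      NodeList nodes = parent.getElementsByTagName(name);
      if (nodes.getLength() == 0)
      {
         return def;
      }
      long value = Long.parseLong(nodes.item(0).getTextContent().trim());
      if (value <= 0)
      {
         throw new IllegalArgumentException(name + " must be greater than 0 (actual value: " + value + ")");
      }
      return value;
   }
}
```

For a `<bridge>` element that contains only `<check-period>5000</check-period>`, `getLong(root, "check-period", 30000)` returns 5000 and `getLong(root, "connection-ttl", 60000)` falls back to 60000.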
Modified: branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java
===================================================================
--- branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java 2011-06-13 02:55:40 UTC (rev 10792)
+++ branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java 2011-06-13 02:59:47 UTC (rev 10793)
@@ -38,6 +38,7 @@
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.api.core.management.AddressControl;
import org.hornetq.api.core.management.BridgeControl;
import org.hornetq.api.core.management.DivertControl;
@@ -1702,12 +1703,14 @@
forwardingAddress,
filterString,
transformerClassName,
+ clientFailureCheckPeriod,
+ HornetQClient.DEFAULT_CONNECTION_TTL,
retryInterval,
+ HornetQClient.DEFAULT_MAX_RETRY_INTERVAL,
retryIntervalMultiplier,
reconnectAttempts,
useDuplicateDetection,
confirmationWindowSize,
- clientFailureCheckPeriod,
connectorNames,
ha,
user,
@@ -1721,12 +1724,14 @@
forwardingAddress,
filterString,
transformerClassName,
+ clientFailureCheckPeriod,
+ HornetQClient.DEFAULT_CONNECTION_TTL,
retryInterval,
+ HornetQClient.DEFAULT_MAX_RETRY_INTERVAL,
retryIntervalMultiplier,
reconnectAttempts,
useDuplicateDetection,
confirmationWindowSize,
- clientFailureCheckPeriod,
connectors,
ha,
user,
Modified: branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java
===================================================================
--- branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java 2011-06-13 02:55:40 UTC (rev 10792)
+++ branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java 2011-06-13 02:59:47 UTC (rev 10793)
@@ -13,6 +13,8 @@
package org.hornetq.core.postoffice.impl;
+import java.io.PrintWriter;
+import java.io.StringWriter;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.List;
@@ -50,6 +52,8 @@
{
private static final Logger log = Logger.getLogger(BindingsImpl.class);
+ private static boolean isTrace = log.isTraceEnabled();
+
private final ConcurrentMap<SimpleString, List<Binding>> routingNameBindingMap = new ConcurrentHashMap<SimpleString, List<Binding>>();
private final Map<SimpleString, Integer> routingNamePositions = new ConcurrentHashMap<SimpleString, Integer>();
@@ -61,13 +65,16 @@
private volatile boolean routeWhenNoConsumers;
private final GroupingHandler groupingHandler;
-
+
private final PagingStore pageStore;
- public BindingsImpl(final GroupingHandler groupingHandler, final PagingStore pageStore)
+ private final SimpleString name;
+
+ public BindingsImpl(final SimpleString name, final GroupingHandler groupingHandler, final PagingStore pageStore)
{
this.groupingHandler = groupingHandler;
this.pageStore = pageStore;
+ this.name = name;
}
public void setRouteWhenNoConsumers(final boolean routeWhenNoConsumers)
@@ -82,6 +89,10 @@
public void addBinding(final Binding binding)
{
+ if (isTrace)
+ {
+ log.trace("addBinding(" + binding + ") being called");
+ }
if (binding.isExclusive())
{
exclusiveBindings.add(binding);
@@ -108,6 +119,12 @@
}
bindingsMap.put(binding.getID(), binding);
+
+ if (isTrace)
+ {
+ log.trace("Adding binding " + binding + " into " + this + " bindingTable: " + debugBindings());
+ }
+
}
public void removeBinding(final Binding binding)
@@ -134,6 +151,11 @@
}
bindingsMap.remove(binding.getID());
+
+ if (isTrace)
+ {
+ log.trace("Removing binding " + binding + " from " + this + " bindingTable: " + debugBindings());
+ }
}
public boolean redistribute(final ServerMessage message, final Queue originatingQueue, final RoutingContext context) throws Exception
@@ -144,6 +166,11 @@
return false;
}
+ if (isTrace)
+ {
+ log.trace("Redistributing message " + message);
+ }
+
SimpleString routingName = originatingQueue.getName();
List<Binding> bindings = routingNameBindingMap.get(routingName);
@@ -222,12 +249,12 @@
return false;
}
}
-
+
public PagingStore getPagingStore()
{
return pageStore;
}
-
+
public void route(final ServerMessage message, final RoutingContext context) throws Exception
{
boolean routed = false;
@@ -247,8 +274,8 @@
if (!routed)
{
- //TODO this is a little inefficient since we do the lookup once to see if the property
- //is there, then do it again to remove the actual property
+ // TODO this is a little inefficient since we do the lookup once to see if the property
+ // is there, then do it again to remove the actual property
if (message.containsProperty(MessageImpl.HDR_ROUTE_TO_IDS))
{
routeFromCluster(message, context);
@@ -259,6 +286,10 @@
}
else
{
+ if (isTrace)
+ {
+ log.trace("Routing message " + message + " on binding=" + this);
+ }
for (Map.Entry<SimpleString, List<Binding>> entry : routingNameBindingMap.entrySet())
{
SimpleString routingName = entry.getKey();
@@ -283,6 +314,15 @@
}
}
+ /* (non-Javadoc)
+ * @see java.lang.Object#toString()
+ */
+ @Override
+ public String toString()
+ {
+ return "BindingsImpl [name=" + name + "]";
+ }
+
private Binding getNextBinding(final ServerMessage message,
final SimpleString routingName,
final List<Binding> bindings)
@@ -290,9 +330,9 @@
Integer ipos = routingNamePositions.get(routingName);
int pos = ipos != null ? ipos : 0;
-
+
int length = bindings.size();
-
+
int startPos = pos;
Binding theBinding = null;
@@ -470,7 +510,75 @@
}
}
}
+
+ private String debugBindings()
+ {
+ StringWriter writer = new StringWriter();
+ PrintWriter out = new PrintWriter(writer);
+
+ out.println("\n***************************************");
+
+ out.println("routingNameBindingMap:");
+ if (routingNameBindingMap.isEmpty())
+ {
+ out.println("EMPTY!");
+ }
+ for (Map.Entry<SimpleString, List<Binding>> entry : routingNameBindingMap.entrySet())
+ {
+ out.print("key=" + entry.getKey() + ", value=" + entry.getValue());
+// for (Binding bind : entry.getValue())
+// {
+// out.print(bind + ",");
+// }
+ out.println();
+ }
+
+ out.println();
+
+ out.println("RoutingNamePositions:");
+ if (routingNamePositions.isEmpty())
+ {
+ out.println("EMPTY!");
+ }
+ for (Map.Entry<SimpleString, Integer> entry : routingNamePositions.entrySet())
+ {
+ out.println("key=" + entry.getKey() + ", value=" + entry.getValue());
+ }
+
+ out.println();
+
+ out.println("BindingsMap:");
+
+ if (bindingsMap.isEmpty())
+ {
+ out.println("EMPTY!");
+ }
+ for (Map.Entry<Long, Binding> entry : bindingsMap.entrySet())
+ {
+ out.println("Key=" + entry.getKey() + ", value=" + entry.getValue());
+ }
+
+ out.println();
+
+ out.println("ExclusiveBindings:");
+ if (exclusiveBindings.isEmpty())
+ {
+ out.println("EMPTY!");
+ }
+
+ for (Binding binding: exclusiveBindings)
+ {
+ out.println(binding);
+ }
+
+ out.println("#####################################################");
+
+
+ return writer.toString();
+ }
+
+
private void routeFromCluster(final ServerMessage message, final RoutingContext context) throws Exception
{
byte[] ids = (byte[])message.removeProperty(MessageImpl.HDR_ROUTE_TO_IDS);
Modified: branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/postoffice/impl/DivertBinding.java
===================================================================
--- branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/postoffice/impl/DivertBinding.java 2011-06-13 02:55:40 UTC (rev 10792)
+++ branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/postoffice/impl/DivertBinding.java 2011-06-13 02:59:47 UTC (rev 10793)
@@ -124,15 +124,30 @@
return BindingType.DIVERT;
}
+
+
/* (non-Javadoc)
* @see java.lang.Object#toString()
*/
@Override
public String toString()
{
- return "DivertBinding [divert=" + divert + "]";
+ return "DivertBinding [id=" + id +
+ ", address=" +
+ address +
+ ", divert=" +
+ divert +
+ ", filter=" +
+ filter +
+ ", uniqueName=" +
+ uniqueName +
+ ", routingName=" +
+ routingName +
+ ", exclusive=" +
+ exclusive +
+ "]";
}
-
+
public void close() throws Exception
{
}
Modified: branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2011-06-13 02:55:40 UTC (rev 10792)
+++ branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2011-06-13 02:59:47 UTC (rev 10793)
@@ -77,6 +77,8 @@
public class PostOfficeImpl implements PostOffice, NotificationListener, BindingsFactory
{
private static final Logger log = Logger.getLogger(PostOfficeImpl.class);
+
+ private static final boolean isTrace = log.isTraceEnabled();
public static final SimpleString HDR_RESET_QUEUE_DATA = new SimpleString("_HQ_RESET_QUEUE_DATA");
@@ -209,6 +211,10 @@
public void onNotification(final Notification notification)
{
+ if (isTrace)
+ {
+ log.trace("Receiving notification : " + notification);
+ }
synchronized (notificationLock)
{
NotificationType type = notification.getType();
@@ -1306,6 +1312,6 @@
public Bindings createBindings(final SimpleString address) throws Exception
{
- return new BindingsImpl(server.getGroupingHandler(), pagingManager.getPageStore(address));
+ return new BindingsImpl(address, server.getGroupingHandler(), pagingManager.getPageStore(address));
}
}
Modified: branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/protocol/core/impl/ChannelImpl.java
===================================================================
--- branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/protocol/core/impl/ChannelImpl.java 2011-06-13 02:55:40 UTC (rev 10792)
+++ branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/protocol/core/impl/ChannelImpl.java 2011-06-13 02:59:47 UTC (rev 10793)
@@ -38,6 +38,8 @@
public class ChannelImpl implements Channel
{
private static final Logger log = Logger.getLogger(ChannelImpl.class);
+
+ private static final boolean isTrace = log.isTraceEnabled();
private volatile long id;
@@ -159,6 +161,11 @@
synchronized (sendLock)
{
packet.setChannelID(id);
+
+ if (isTrace)
+ {
+ log.trace("Sending packet nonblocking " + packet + " on channelID=" + id);
+ }
HornetQBuffer buffer = packet.encode(connection);
@@ -168,6 +175,10 @@
{
while (failingOver)
{
+ if (isTrace)
+ {
+ log.trace("Waiting for failover condition to clear on channelID=" + id);
+ }
// TODO - don't hardcode this timeout
try
{
@@ -176,6 +187,10 @@
catch (InterruptedException e)
{
}
+ if (isTrace)
+ {
+ log.trace("Failover condition cleared on channelID=" + id);
+ }
}
// Sanity check
@@ -193,7 +208,13 @@
{
lock.unlock();
}
+
+ if (isTrace)
+ {
+ log.trace("Writing buffer for channelID=" + id);
+ }
+
// The actual send must be outside the lock, or with OIO transport, the write can block if the tcp
// buffer is full, preventing any incoming buffers being handled and blocking failover
connection.getTransportConnection().write(buffer, flush, batch);
@@ -350,6 +371,10 @@
{
if (resendCache != null)
{
+ if (isTrace)
+ {
+ log.trace("Replaying commands on channelID=" + id);
+ }
clearUpTo(otherLastConfirmedCommandID);
for (final Packet packet : resendCache)
Modified: branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/cluster/Bridge.java
===================================================================
--- branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/cluster/Bridge.java 2011-06-13 02:55:40 UTC (rev 10792)
+++ branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/cluster/Bridge.java 2011-06-13 02:59:47 UTC (rev 10793)
@@ -13,6 +13,8 @@
package org.hornetq.core.server.cluster;
+import java.util.concurrent.Executor;
+
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.server.Consumer;
import org.hornetq.core.server.HornetQComponent;
@@ -33,6 +35,12 @@
public interface Bridge extends Consumer, HornetQComponent
{
SimpleString getName();
+
+ /**
+ * Returns the executor used by this bridge.
+ * @return the executor on which this bridge runs its tasks
+ */
+ Executor getExecutor();
Queue getQueue();
@@ -43,7 +51,9 @@
boolean isUseDuplicateDetection();
void activate();
-
+
+ void flushExecutor();
+
void setNotificationService(NotificationService notificationService);
RemotingConnection getForwardingConnection();
Modified: branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/cluster/MessageFlowRecord.java
===================================================================
--- branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/cluster/MessageFlowRecord.java 2011-06-13 02:55:40 UTC (rev 10792)
+++ branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/cluster/MessageFlowRecord.java 2011-06-13 02:59:47 UTC (rev 10793)
@@ -33,6 +33,10 @@
Bridge getBridge();
void close() throws Exception;
+
+ public void resume() throws Exception;
+
+ boolean isClosed();
void reset() throws Exception;
Modified: branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-06-13 02:55:40 UTC (rev 10792)
+++ branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-06-13 02:59:47 UTC (rev 10793)
@@ -195,8 +195,17 @@
while ((ref = refs.poll()) != null)
{
+ if (isTrace)
+ {
+ log.trace("Cancelling reference " + ref + " on bridge " + this);
+ }
list.addFirst(ref);
}
+
+ if (isTrace && list.isEmpty())
+ {
+ log.trace("didn't have any references to cancel on bridge " + this);
+ }
Queue queue = null;
@@ -210,18 +219,45 @@
}
}
+
+ public Executor getExecutor()
+ {
+ return executor;
+ }
+ public void flushExecutor()
+ {
+ // Wait for any create objects runnable to complete
+ Future future = new Future();
+ executor.execute(future);
+
+ boolean ok = future.await(10000);
+
+ if (!ok)
+ {
+ BridgeImpl.log.warn("Timed out waiting to stop");
+ }
+ }
+
+
public void stop() throws Exception
{
if (started)
{
- // We need to stop the csf here otherwise the stop runnable never runs since the createobjectsrunnable is
- // trying to connect to the target
- // server which isn't up in an infinite loop
- if (csf != null)
+ executor.execute(new Runnable()
{
- csf.close();
- }
+ public void run()
+ {
+ // We need to stop the csf here otherwise the stop runnable never runs since the createobjectsrunnable is
+ // trying to connect to the target
+ // server which isn't up in an infinite loop
+ if (csf != null)
+ {
+ //csf.close();
+ csf = null;
+ }
+ }
+ });
}
log.info("Bridge " + this.name + " being stopped");
@@ -229,9 +265,7 @@
stopping = true;
executor.execute(new StopRunnable());
-
- waitForRunnablesToComplete();
-
+
if (notificationService != null)
{
TypedProperties props = new TypedProperties();
@@ -424,6 +458,10 @@
public void connectionFailed(final HornetQException me, boolean failedOver)
{
log.warn(name + "::Connection failed with failedOver=" + failedOver, me);
+ if (isTrace)
+ {
+ log.trace("Calling BridgeImpl::connectionFailed(HornetQException me=" + me + ", boolean failedOver=" + failedOver + ")");
+ }
fail(false);
}
@@ -432,6 +470,8 @@
log.warn(name + "::Connection failed before reconnect ", exception);
fail(true);
}
+
+
// Package protected ---------------------------------------------
@@ -439,19 +479,29 @@
// Private -------------------------------------------------------
- private void waitForRunnablesToComplete()
+ /* (non-Javadoc)
+ * @see java.lang.Object#toString()
+ */
+ @Override
+ public String toString()
{
- // Wait for any create objects runnable to complete
- Future future = new Future();
-
- executor.execute(future);
-
- boolean ok = future.await(10000);
-
- if (!ok)
- {
- BridgeImpl.log.warn("Timed out waiting to stop");
- }
+ return this.getClass().getName() +
+ " [name=" + name +
+ ", nodeUUID=" +
+ nodeUUID +
+ ", queue=" +
+ queue +
+ ", filter=" +
+ filter +
+ ", forwardingAddress=" +
+ forwardingAddress +
+ ", useDuplicateDetection=" +
+ useDuplicateDetection +
+ ", active=" +
+ active +
+ ", stopping=" +
+ stopping +
+ "]";
}
private void fail(final boolean beforeReconnect)
@@ -654,20 +704,17 @@
{
try
{
+ // We need to close the session outside of the lock,
+ // so any pending operation will be canceled right away
+ if (session != null)
+ {
+ session.close();
+ }
+
synchronized (BridgeImpl.this)
{
- if (!started)
- {
- return;
- }
-
log.debug("Closing Session for bridge " + BridgeImpl.this.name);
- if (session != null)
- {
- session.close();
- }
-
started = false;
active = false;
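Note on the flushExecutor() change above: stop() no longer waits for queued runnables, so callers that need that guarantee (the bridge tests in this revision) call flushExecutor() afterwards. The method queues a marker task and waits for it, which only proves that earlier tasks have finished if the executor runs tasks in submission order. A minimal, self-contained sketch of that idea follows. It uses a CountDownLatch from java.util.concurrent as a stand-in for the HornetQ Future class, and the class and method names are hypothetical, not part of this commit.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class FlushExecutorSketch
{
   /**
    * Queues a marker task and waits for it to run.
    * Assumes the executor runs tasks in submission order (e.g. a single thread).
    * Returns true if the marker ran within timeoutMillis.
    */
   static boolean flush(ExecutorService orderedExecutor, long timeoutMillis) throws InterruptedException
   {
      final CountDownLatch marker = new CountDownLatch(1);
      orderedExecutor.execute(new Runnable()
      {
         public void run()
         {
            // Runs only after every previously queued task has completed
            marker.countDown();
         }
      });
      return marker.await(timeoutMillis, TimeUnit.MILLISECONDS);
   }

   public static void main(String[] args) throws Exception
   {
      ExecutorService executor = Executors.newSingleThreadExecutor();
      final AtomicInteger completed = new AtomicInteger();

      // Stand-ins for the bridge's queued runnables
      for (int i = 0; i < 3; i++)
      {
         executor.execute(new Runnable()
         {
            public void run()
            {
               completed.incrementAndGet();
            }
         });
      }

      boolean ok = flush(executor, 10000);
      System.out.println("flushed=" + ok + ", completed=" + completed.get());
      executor.shutdown();
   }
}
```

With a pooled (non-ordered) executor the marker could run before earlier tasks finish, so this sketch would not be a valid flush there.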
Modified: branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
===================================================================
--- branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java 2011-06-13 02:55:40 UTC (rev 10792)
+++ branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java 2011-06-13 02:59:47 UTC (rev 10793)
@@ -50,6 +50,8 @@
public class ClusterConnectionBridge extends BridgeImpl
{
private static final Logger log = Logger.getLogger(ClusterConnectionBridge.class);
+
+ private static final boolean isTrace = log.isTraceEnabled();
private final MessageFlowRecord flowRecord;
@@ -147,7 +149,8 @@
{
if (flowRecord != null)
{
- flowRecord.reset();
+ // TODO: can I really remove this?
+ // flowRecord.reset();
if (notifConsumer != null)
{
@@ -195,7 +198,7 @@
flowRecord.getAddress() +
"%')");
- session.createQueue(managementNotificationAddress, notifQueueName, filter, false);
+ session.createTemporaryQueue(managementNotificationAddress, notifQueueName, filter);
notifConsumer = session.createConsumer(notifQueueName);
@@ -224,6 +227,12 @@
}
@Override
+ public void stop() throws Exception
+ {
+ super.stop();
+ }
+
+ @Override
protected ClientSessionFactory createSessionFactory() throws Exception
{
//We create the session factory using the specified connector
@@ -234,6 +243,11 @@
@Override
public void connectionFailed(HornetQException me, boolean failedOver)
{
+ if (isTrace)
+ {
+ log.trace("Connection Failed on ClusterConnectionBridge, failedOver = " + failedOver + ", sessionClosed = " + session.isClosed(), new Exception ("trace"));
+ }
+
if (!failedOver && !session.isClosed())
{
try
Modified: branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-06-13 02:55:40 UTC (rev 10792)
+++ branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-06-13 02:59:47 UTC (rev 10793)
@@ -65,6 +65,8 @@
public class ClusterConnectionImpl implements ClusterConnection
{
private static final Logger log = Logger.getLogger(ClusterConnectionImpl.class);
+
+ private static final boolean isTrace = log.isTraceEnabled();
private final org.hornetq.utils.ExecutorFactory executorFactory;
@@ -78,7 +80,17 @@
private final SimpleString address;
+ private final long clientFailureCheckPeriod;
+
+ private final long connectionTTL;
+
private final long retryInterval;
+
+ private final double retryIntervalMultiplier;
+
+ private final long maxRetryInterval;
+
+ private final int reconnectAttempts;
private final boolean useDuplicateDetection;
@@ -114,7 +126,12 @@
final TransportConfiguration connector,
final SimpleString name,
final SimpleString address,
+ final long clientFailureCheckPeriod,
+ final long connectionTTL,
final long retryInterval,
+ final double retryIntervalMultiplier,
+ final long maxRetryInterval,
+ final int reconnectAttempts,
final boolean useDuplicateDetection,
final boolean routeWhenNoConsumers,
final int confirmationWindowSize,
@@ -144,7 +161,17 @@
this.address = address;
+ this.clientFailureCheckPeriod = clientFailureCheckPeriod;
+
+ this.connectionTTL = connectionTTL;
+
this.retryInterval = retryInterval;
+
+ this.retryIntervalMultiplier = retryIntervalMultiplier;
+
+ this.maxRetryInterval = maxRetryInterval;
+
+ this.reconnectAttempts = reconnectAttempts;
this.useDuplicateDetection = useDuplicateDetection;
@@ -188,7 +215,12 @@
final TransportConfiguration connector,
final SimpleString name,
final SimpleString address,
- final long retryInterval,
+ final long clientFailureCheckPeriod,
+ final long connectionTTL,
+ final long retryInterval,
+ final double retryIntervalMultiplier,
+ final long maxRetryInterval,
+ final int reconnectAttempts,
final boolean useDuplicateDetection,
final boolean routeWhenNoConsumers,
final int confirmationWindowSize,
@@ -218,7 +250,17 @@
this.address = address;
+ this.clientFailureCheckPeriod = clientFailureCheckPeriod;
+
+ this.connectionTTL = connectionTTL;
+
this.retryInterval = retryInterval;
+
+ this.retryIntervalMultiplier = retryIntervalMultiplier;
+
+ this.maxRetryInterval = maxRetryInterval;
+
+ this.reconnectAttempts = reconnectAttempts;
this.useDuplicateDetection = useDuplicateDetection;
@@ -355,13 +397,26 @@
{
serverLocator.setNodeID(nodeUUID.toString());
- serverLocator.setReconnectAttempts(-1);
+ serverLocator.setReconnectAttempts(reconnectAttempts);
serverLocator.setClusterConnection(true);
serverLocator.setClusterTransportConfiguration(connector);
serverLocator.setBackup(server.getConfiguration().isBackup());
serverLocator.setInitialConnectAttempts(-1);
- serverLocator.setConfirmationWindowSize(0);
+ if (serverLocator.getConfirmationWindowSize() < 0)
+ {
+ // We can't have confirmationSize = -1 on the cluster Bridge
+ // Otherwise we won't have confirmation working
+ serverLocator.setConfirmationWindowSize(0);
+ }
+
+ if (!useDuplicateDetection)
+ {
+ log.debug("DuplicateDetection is disabled, sending clustered messages blocked");
+ }
+ // if not using duplicate detection, we will send blocked
+ serverLocator.setBlockOnDurableSend(!useDuplicateDetection);
+ serverLocator.setBlockOnNonDurableSend(!useDuplicateDetection);
if(retryInterval > 0)
{
@@ -393,7 +448,7 @@
public synchronized void nodeDown(final String nodeID)
{
- log.debug("node " + nodeID + " being considered down on cluster connection for nodeID=" + nodeUUID);
+ log.debug("node " + nodeID + " being considered down on cluster connection for nodeID=" + nodeUUID, new Exception ("trace"));
if (nodeID.equals(nodeUUID.toString()))
{
return;
@@ -407,7 +462,11 @@
{
try
{
- record.reset();
+ if (isTrace)
+ {
+ log.trace("Closing clustering record " + record);
+ }
+ record.close();
}
catch (Exception e)
{
@@ -485,11 +544,11 @@
}
else
{
- // FIXME apple and orange comparison. I don't understand it...
- //if (!connectorPair.a.equals(record.getBridge().getForwardingConnection().getTransportConnection()))
- // {
- // // New live node - close it and recreate it - TODO - CAN THIS EVER HAPPEN?
- //}
+ log.info("Reattaching nodeID=" + nodeID);
+ if (record.isClosed())
+ {
+ record.resume();
+ }
}
}
catch (Exception e)
@@ -502,6 +561,11 @@
public void nodeAnnounced(final String nodeID,
final Pair<TransportConfiguration, TransportConfiguration> connectorPair)
{
+ if (isTrace)
+ {
+ log.trace("nodeAnnouncedUp:" + nodeID);
+ }
+
if (nodeID.equals(nodeUUID.toString()))
{
return;
@@ -510,6 +574,10 @@
// if the node is more than 1 hop away, we do not create a bridge for direct cluster connection
if (allowDirectConnectionsOnly && !allowableConnections.contains(connectorPair.a))
{
+ if (isTrace)
+ {
+ log.trace("Ignoring nodeUp message as it only allows direct connections");
+ }
return;
}
@@ -517,20 +585,32 @@
// and empty static connectors to create bridges... ugly!
if (serverLocator == null)
{
+ if (isTrace)
+ {
+ log.trace("Ignoring nodeUp as serverLocator==null");
+ }
return;
}
/*we dont create bridges to backups*/
if(connectorPair.a == null)
{
+ if (isTrace)
+ {
+ log.trace("Ignoring nodeUp as connectorPair.a==null (backup)");
+ }
return;
}
synchronized (records)
{
+ if (isTrace)
+ {
+ log.trace("Adding record for nodeID=" + nodeID);
+ }
try
{
MessageFlowRecord record = records.get(nodeID);
-
+
if (record == null)
{
// New node - create a new flow record
@@ -556,11 +636,10 @@
}
else
{
- // FIXME apple and orange comparison. I don't understand it...
- //if (!connectorPair.a.equals(record.getBridge().getForwardingConnection().getTransportConnection()))
- // {
- // // New live node - close it and recreate it - TODO - CAN THIS EVER HAPPEN?
- //}
+ if (isTrace)
+ {
+ log.trace("It already had a node created before, ignoring the nodeUp message");
+ }
}
}
catch (Exception e)
@@ -576,13 +655,32 @@
final Queue queue,
final boolean start) throws Exception
{
- MessageFlowRecordImpl record = new MessageFlowRecordImpl(queue);
+ MessageFlowRecordImpl record = new MessageFlowRecordImpl(nodeID, connector, queueName, queue);
- Bridge bridge = new ClusterConnectionBridge(serverLocator,
+ records.put(nodeID, record);
+
+ Bridge bridge = createBridge(record);
+
+ record.setBridge(bridge);
+
+ if (start)
+ {
+ bridge.start();
+ }
+ }
+
+ /**
+ * @param record
+ * @return
+ * @throws Exception
+ */
+ protected Bridge createBridge(MessageFlowRecordImpl record) throws Exception
+ {
+ ClusterConnectionBridge bridge = new ClusterConnectionBridge(serverLocator,
nodeUUID,
- nodeID,
- queueName,
- queue,
+ record.getNodeID(),
+ record.getQueueName(),
+ record.getQueue(),
executorFactory.getExecutor(),
null,
null,
@@ -596,16 +694,9 @@
managementService.getManagementAddress(),
managementService.getManagementNotificationAddress(),
record,
- connector);
+ record.getConnector());
- record.setBridge(bridge);
-
- records.put(nodeID, record);
-
- if (start)
- {
- bridge.start();
- }
+ return bridge;
}
// Inner classes -----------------------------------------------------------------------------------
@@ -614,15 +705,26 @@
{
private Bridge bridge;
+ private final String nodeID;
+ private final TransportConfiguration connector;
+ private final SimpleString queueName;
private final Queue queue;
private final Map<SimpleString, RemoteQueueBinding> bindings = new HashMap<SimpleString, RemoteQueueBinding>();
+
+ private volatile boolean isClosed = false;
private volatile boolean firstReset = false;
- public MessageFlowRecordImpl(final Queue queue)
+ public MessageFlowRecordImpl(final String nodeID,
+ final TransportConfiguration connector,
+ final SimpleString queueName,
+ final Queue queue)
{
this.queue = queue;
+ this.nodeID = nodeID;
+ this.connector = connector;
+ this.queueName = queueName;
}
public String getAddress()
@@ -630,6 +732,38 @@
return address.toString();
}
+ /**
+ * @return the nodeID
+ */
+ public String getNodeID()
+ {
+ return nodeID;
+ }
+
+ /**
+ * @return the connector
+ */
+ public TransportConfiguration getConnector()
+ {
+ return connector;
+ }
+
+ /**
+ * @return the queueName
+ */
+ public SimpleString getQueueName()
+ {
+ return queueName;
+ }
+
+ /**
+ * @return the queue
+ */
+ public Queue getQueue()
+ {
+ return queue;
+ }
+
public int getMaxHops()
{
return maxHops;
@@ -637,10 +771,29 @@
public void close() throws Exception
{
+ if (isTrace)
+ {
+ log.trace("Stopping bridge " + bridge);
+ }
+
+ isClosed = true;
+ clearBindings();
+
bridge.stop();
-
- clearBindings();
}
+
+ public void resume() throws Exception
+ {
+ isClosed = false;
+ this.bridge = createBridge(this);
+ bridge.start();
+ bridge.activate();
+ }
+
+ public boolean isClosed()
+ {
+ return isClosed;
+ }
public void reset() throws Exception
{
@@ -659,6 +812,10 @@
public synchronized void onMessage(final ClientMessage message)
{
+ if (isTrace)
+ {
+ log.trace("Receiving message " + message);
+ }
try
{
// Reset the bindings
Modified: branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-06-13 02:55:40 UTC (rev 10792)
+++ branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-06-13 02:59:47 UTC (rev 10793)
@@ -17,7 +17,12 @@
import java.lang.reflect.Array;
import java.net.InetAddress;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
@@ -32,7 +37,10 @@
import org.hornetq.core.client.impl.ServerLocatorInternal;
import org.hornetq.core.client.impl.Topology;
import org.hornetq.core.client.impl.TopologyMember;
-import org.hornetq.core.config.*;
+import org.hornetq.core.config.BridgeConfiguration;
+import org.hornetq.core.config.BroadcastGroupConfiguration;
+import org.hornetq.core.config.ClusterConnectionConfiguration;
+import org.hornetq.core.config.Configuration;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.PostOffice;
@@ -682,8 +690,16 @@
serverLocator.setReconnectAttempts(config.getReconnectAttempts());
serverLocator.setRetryInterval(config.getRetryInterval());
serverLocator.setRetryIntervalMultiplier(config.getRetryIntervalMultiplier());
+ serverLocator.setMaxRetryInterval(config.getMaxRetryInterval());
serverLocator.setClientFailureCheckPeriod(config.getClientFailureCheckPeriod());
serverLocator.setInitialConnectAttempts(config.getReconnectAttempts());
+ serverLocator.setBlockOnDurableSend(!config.isUseDuplicateDetection());
+ serverLocator.setBlockOnNonDurableSend(!config.isUseDuplicateDetection());
+ if (!config.isUseDuplicateDetection())
+ {
+ log.debug("Bridge " + config.getName() +
+ " is configured to not use duplicate detection, it will send messages synchronously");
+ }
clusterLocators.add(serverLocator);
Bridge bridge = new BridgeImpl(serverLocator,
nodeUUID,
@@ -769,7 +785,12 @@
connector,
new SimpleString(config.getName()),
new SimpleString(config.getAddress()),
+ config.getClientFailureCheckPeriod(),
+ config.getConnectionTTL(),
config.getRetryInterval(),
+ config.getRetryIntervalMultiplier(),
+ config.getMaxRetryInterval(),
+ config.getReconnectAttempts(),
config.isDuplicateDetection(),
config.isForwardWhenNoConsumers(),
config.getConfirmationWindowSize(),
@@ -794,7 +815,12 @@
connector,
new SimpleString(config.getName()),
new SimpleString(config.getAddress()),
+ config.getClientFailureCheckPeriod(),
+ config.getConnectionTTL(),
config.getRetryInterval(),
+ config.getRetryIntervalMultiplier(),
+ config.getMaxRetryInterval(),
+ config.getReconnectAttempts(),
config.isDuplicateDetection(),
config.isForwardWhenNoConsumers(),
config.getConfirmationWindowSize(),
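Note on the new retry parameters passed above (retryInterval, retryIntervalMultiplier, maxRetryInterval, reconnectAttempts): this diff only threads them through the constructors and does not show how they are applied. One common way such settings combine is a capped multiplicative backoff, sketched below. The formula and all names here are assumptions for illustration, not code taken from this revision.

```java
import java.util.Arrays;

public class RetryBackoffSketch
{
   /** Next wait: the current wait scaled by the multiplier, never above the cap. */
   static long nextInterval(long current, double multiplier, long maxRetryInterval)
   {
      long next = (long)(current * multiplier);
      return Math.min(next, maxRetryInterval);
   }

   /** Waits (in ms) before each of the first 'attempts' reconnect attempts. */
   static long[] schedule(long retryInterval, double multiplier, long maxRetryInterval, int attempts)
   {
      long[] waits = new long[attempts];
      long interval = retryInterval;
      for (int i = 0; i < attempts; i++)
      {
         waits[i] = interval;
         interval = nextInterval(interval, multiplier, maxRetryInterval);
      }
      return waits;
   }

   public static void main(String[] args)
   {
      // 1000 ms initial wait, doubling each time, capped at 5000 ms
      System.out.println(Arrays.toString(schedule(1000, 2.0, 5000, 5)));
      // prints [1000, 2000, 4000, 5000, 5000]
   }
}
```

With a multiplier of 1.0, as used in most of the tests in this revision (1d), the wait stays constant at retryInterval.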
Modified: branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/impl/QueueImpl.java 2011-06-13 02:55:40 UTC (rev 10792)
+++ branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/impl/QueueImpl.java 2011-06-13 02:59:47 UTC (rev 10793)
@@ -853,6 +853,7 @@
public synchronized void cancel(final MessageReference reference, final long timeBase) throws Exception
{
+ deliveringCount.decrementAndGet();
if (checkRedelivery(reference, timeBase))
{
if (!scheduledDeliveryHandler.checkAndSchedule(reference, false))
@@ -870,7 +871,7 @@
{
if (isTrace)
{
- log.trace("moving expired reference " + ref + " to address = " + expiryAddress + " from queue=" + this.getName(), new Exception ("trace"));
+ log.trace("moving expired reference " + ref + " to address = " + expiryAddress + " from queue=" + this.getName());
}
move(expiryAddress, ref, true, false);
}
Modified: branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2011-06-13 02:55:40 UTC (rev 10792)
+++ branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2011-06-13 02:59:47 UTC (rev 10793)
@@ -88,6 +88,8 @@
// Constants -----------------------------------------------------------------------------
private static final Logger log = Logger.getLogger(ServerSessionImpl.class);
+
+ private static final boolean isTrace = log.isTraceEnabled();
// Static -------------------------------------------------------------------------------
@@ -598,6 +600,10 @@
public void commit() throws Exception
{
+ if (isTrace)
+ {
+ log.trace("Calling commit");
+ }
try
{
tx.commit();
@@ -1075,7 +1081,7 @@
public void send(final ServerMessage message, final boolean direct) throws Exception
{
long id = storageManager.generateUniqueID();
-
+
SimpleString address = message.getAddress();
message.setMessageID(id);
@@ -1096,6 +1102,12 @@
}
}
+ if (isTrace)
+ {
+ log.trace("send(message=" + message + ", direct=" + direct + ") being called");
+ }
+
+
if (message.getAddress().equals(managementAddress))
{
// It's a management message
Modified: branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/management/impl/ManagementServiceImpl.java
===================================================================
--- branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/management/impl/ManagementServiceImpl.java 2011-06-13 02:55:40 UTC (rev 10792)
+++ branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/management/impl/ManagementServiceImpl.java 2011-06-13 02:59:47 UTC (rev 10793)
@@ -84,6 +84,8 @@
// Constants -----------------------------------------------------
private static final Logger log = Logger.getLogger(ManagementServiceImpl.class);
+
+ private static final boolean isTrace = log.isTraceEnabled();
private final MBeanServer mbeanServer;
@@ -635,6 +637,12 @@
public void sendNotification(final Notification notification) throws Exception
{
+ if (isTrace)
+ {
+ log.trace("Sending Notification = " + notification +
+ ", notificationEnabled=" + notificationsEnabled +
+ " messagingServerControl=" + messagingServerControl, new Exception ("trace"));
+ }
if (messagingServerControl != null && notificationsEnabled)
{
// This needs to be synchronized since we need to ensure notifications are processed in strict sequence
Modified: branches/Branch_2_2_EAP-cluster-cleanup/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java
===================================================================
--- branches/Branch_2_2_EAP-cluster-cleanup/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java 2011-06-13 02:55:40 UTC (rev 10792)
+++ branches/Branch_2_2_EAP-cluster-cleanup/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java 2011-06-13 02:59:47 UTC (rev 10793)
@@ -118,12 +118,14 @@
forwardAddress,
null,
null,
+ HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+ HornetQClient.DEFAULT_CONNECTION_TTL,
retryInterval,
+ HornetQClient.DEFAULT_MAX_RETRY_INTERVAL,
retryIntervalMultiplier,
reconnectAttempts,
false,
confirmationWindowSize,
- HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
staticConnectors,
false,
ConfigurationImpl.DEFAULT_CLUSTER_USER,
@@ -260,12 +262,14 @@
forwardAddress,
null,
null,
+ HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+ HornetQClient.DEFAULT_CONNECTION_TTL,
retryInterval,
+ HornetQClient.DEFAULT_MAX_RETRY_INTERVAL,
retryIntervalMultiplier,
reconnectAttempts,
false,
confirmationWindowSize,
- HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
staticConnectors,
false,
ConfigurationImpl.DEFAULT_CLUSTER_USER,
@@ -386,12 +390,14 @@
forwardAddress,
null,
null,
+ HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+ HornetQClient.DEFAULT_CONNECTION_TTL,
retryInterval,
+ HornetQClient.DEFAULT_MAX_RETRY_INTERVAL,
retryIntervalMultiplier,
reconnectAttempts,
false,
confirmationWindowSize,
- HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
staticConnectors,
false,
ConfigurationImpl.DEFAULT_CLUSTER_USER,
@@ -527,12 +533,14 @@
forwardAddress,
null,
null,
+ clientFailureCheckPeriod,
+ HornetQClient.DEFAULT_CONNECTION_TTL,
retryInterval,
+ HornetQClient.DEFAULT_MAX_RETRY_INTERVAL,
retryIntervalMultiplier,
reconnectAttempts,
false,
confirmationWindowSize,
- clientFailureCheckPeriod,
staticConnectors,
false,
ConfigurationImpl.DEFAULT_CLUSTER_USER,
@@ -661,12 +669,14 @@
forwardAddress,
null,
null,
+ HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+ HornetQClient.DEFAULT_CONNECTION_TTL,
retryInterval,
+ HornetQClient.DEFAULT_MAX_RETRY_INTERVAL,
retryIntervalMultiplier,
reconnectAttempts,
false,
confirmationWindowSize,
- HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
staticConnectors,
false,
ConfigurationImpl.DEFAULT_CLUSTER_USER,
Modified: branches/Branch_2_2_EAP-cluster-cleanup/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeStartTest.java
===================================================================
--- branches/Branch_2_2_EAP-cluster-cleanup/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeStartTest.java 2011-06-13 02:55:40 UTC (rev 10792)
+++ branches/Branch_2_2_EAP-cluster-cleanup/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeStartTest.java 2011-06-13 02:59:47 UTC (rev 10793)
@@ -104,12 +104,14 @@
forwardAddress,
null,
null,
+ HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+ HornetQClient.DEFAULT_CONNECTION_TTL,
1000,
+ HornetQClient.DEFAULT_MAX_RETRY_INTERVAL,
1d,
0,
true,
1024,
- HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
staticConnectors,
false,
ConfigurationImpl.DEFAULT_CLUSTER_USER,
@@ -176,6 +178,8 @@
Bridge bridge = server0.getClusterManager().getBridges().get(bridgeName);
bridge.stop();
+
+ bridge.flushExecutor();
for (int i = 0; i < numMessages; i++)
{
@@ -267,12 +271,14 @@
forwardAddress,
null,
null,
+ HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+ HornetQClient.DEFAULT_CONNECTION_TTL,
500,
+ HornetQClient.DEFAULT_MAX_RETRY_INTERVAL,
1d,
-1,
true,
1024,
- HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
staticConnectors,
false,
ConfigurationImpl.DEFAULT_CLUSTER_USER,
@@ -478,12 +484,14 @@
forwardAddress,
null,
null,
+ HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+ HornetQClient.DEFAULT_CONNECTION_TTL,
1000,
+ HornetQClient.DEFAULT_MAX_RETRY_INTERVAL,
1d,
0,
false,
1024,
- HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
staticConnectors,
false,
ConfigurationImpl.DEFAULT_CLUSTER_USER,
@@ -629,12 +637,14 @@
forwardAddress,
null,
null,
+ HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+ HornetQClient.DEFAULT_CONNECTION_TTL,
1000,
+ HornetQClient.DEFAULT_MAX_RETRY_INTERVAL,
1d,
1,
true,
1024,
- HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
staticConnectors,
false,
ConfigurationImpl.DEFAULT_CLUSTER_USER,
@@ -706,6 +716,8 @@
BridgeStartTest.log.info("stopping bridge manually");
bridge.stop();
+
+ bridge.flushExecutor();
for (int i = numMessages; i < numMessages * 2; i++)
{
@@ -739,6 +751,8 @@
Assert.assertNull(consumer1.receiveImmediate());
bridge.stop();
+
+ bridge.flushExecutor();
for (int i = 0; i < numMessages; i++)
{
Modified: branches/Branch_2_2_EAP-cluster-cleanup/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java
===================================================================
--- branches/Branch_2_2_EAP-cluster-cleanup/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java 2011-06-13 02:55:40 UTC (rev 10792)
+++ branches/Branch_2_2_EAP-cluster-cleanup/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java 2011-06-13 02:59:47 UTC (rev 10793)
@@ -139,14 +139,16 @@
forwardAddress,
null,
null,
+ HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+ HornetQClient.DEFAULT_CONNECTION_TTL,
1000,
+ HornetQClient.DEFAULT_MAX_RETRY_INTERVAL,
1d,
-1,
false,
// Choose confirmation size to make sure acks
// are sent
numMessages * messageSize / 2,
- HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
connectorConfig,
false,
ConfigurationImpl.DEFAULT_CLUSTER_USER,
@@ -339,12 +341,14 @@
forwardAddress,
filterString,
null,
+ HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+ HornetQClient.DEFAULT_CONNECTION_TTL,
1000,
+ HornetQClient.DEFAULT_MAX_RETRY_INTERVAL,
1d,
-1,
false,
1024,
- HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
staticConnectors,
false,
ConfigurationImpl.DEFAULT_CLUSTER_USER,
@@ -510,12 +514,14 @@
forwardAddress,
null,
null,
+ HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+ HornetQClient.DEFAULT_CONNECTION_TTL,
100,
+ HornetQClient.DEFAULT_MAX_RETRY_INTERVAL,
1d,
-1,
false,
1024,
- HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
staticConnectors,
false,
ConfigurationImpl.DEFAULT_CLUSTER_USER,
@@ -665,12 +671,14 @@
forwardAddress,
null,
null,
+ HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+ HornetQClient.DEFAULT_CONNECTION_TTL,
100,
+ HornetQClient.DEFAULT_MAX_RETRY_INTERVAL,
1d,
-1,
true,
0,
- HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
staticConnectors,
false,
ConfigurationImpl.DEFAULT_CLUSTER_USER,
@@ -853,12 +861,14 @@
forwardAddress,
null,
SimpleTransformer.class.getName(),
+ HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+ HornetQClient.DEFAULT_CONNECTION_TTL,
1000,
+ HornetQClient.DEFAULT_MAX_RETRY_INTERVAL,
1d,
-1,
false,
1024,
- HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
staticConnectors,
false,
ConfigurationImpl.DEFAULT_CLUSTER_USER,
@@ -1001,12 +1011,14 @@
forwardAddress,
null,
null,
+ HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+ HornetQClient.DEFAULT_CONNECTION_TTL,
1000,
+ HornetQClient.DEFAULT_MAX_RETRY_INTERVAL,
1d,
-1,
false,
0,
- HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
staticConnectors,
false,
ConfigurationImpl.DEFAULT_CLUSTER_USER,
@@ -1242,12 +1254,14 @@
forwardAddress,
null,
null,
+ HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+ HornetQClient.DEFAULT_CONNECTION_TTL,
1000,
+ HornetQClient.DEFAULT_MAX_RETRY_INTERVAL,
1d,
-1,
false,
1024,
- HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
staticConnectors,
false,
ConfigurationImpl.DEFAULT_CLUSTER_USER,
@@ -1385,14 +1399,16 @@
// address
null,
null,
+ HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+ HornetQClient.DEFAULT_CONNECTION_TTL,
1000,
+ HornetQClient.DEFAULT_MAX_RETRY_INTERVAL,
1d,
-1,
false,
// Choose confirmation size to make sure acks
// are sent
numMessages * messageSize / 2,
- HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
staticConnectors,
false,
ConfigurationImpl.DEFAULT_CLUSTER_USER,
Modified: branches/Branch_2_2_EAP-cluster-cleanup/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeWithDiscoveryGroupStartTest.java
===================================================================
--- branches/Branch_2_2_EAP-cluster-cleanup/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeWithDiscoveryGroupStartTest.java 2011-06-13 02:55:40 UTC (rev 10792)
+++ branches/Branch_2_2_EAP-cluster-cleanup/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeWithDiscoveryGroupStartTest.java 2011-06-13 02:59:47 UTC (rev 10793)
@@ -116,12 +116,14 @@
forwardAddress,
null,
null,
+ HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+ HornetQClient.DEFAULT_CONNECTION_TTL,
1000,
+ HornetQClient.DEFAULT_MAX_RETRY_INTERVAL,
1d,
0,
true,
1024,
- HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
staticConnectors,
false,
ConfigurationImpl.DEFAULT_CLUSTER_USER,
@@ -188,6 +190,7 @@
Bridge bridge = server0.getClusterManager().getBridges().get(bridgeName);
bridge.stop();
+ bridge.flushExecutor();
for (int i = 0; i < numMessages; i++)
{
Modified: branches/Branch_2_2_EAP-cluster-cleanup/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeWithPagingTest.java
===================================================================
--- branches/Branch_2_2_EAP-cluster-cleanup/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeWithPagingTest.java 2011-06-13 02:55:40 UTC (rev 10792)
+++ branches/Branch_2_2_EAP-cluster-cleanup/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeWithPagingTest.java 2011-06-13 02:59:47 UTC (rev 10793)
@@ -113,13 +113,15 @@
forwardAddress,
null,
null,
+ HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+ HornetQClient.DEFAULT_CONNECTION_TTL,
retryInterval,
+ HornetQClient.DEFAULT_MAX_RETRY_INTERVAL,
retryIntervalMultiplier,
reconnectAttempts,
false,
confirmationWindowSize,
- HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
- staticConnectors,
+ staticConnectors,
false,
ConfigurationImpl.DEFAULT_CLUSTER_USER,
ConfigurationImpl.DEFAULT_CLUSTER_PASSWORD);
Modified: branches/Branch_2_2_EAP-cluster-cleanup/tests/src/org/hornetq/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java
===================================================================
--- branches/Branch_2_2_EAP-cluster-cleanup/tests/src/org/hornetq/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java 2011-06-13 02:55:40 UTC (rev 10792)
+++ branches/Branch_2_2_EAP-cluster-cleanup/tests/src/org/hornetq/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java 2011-06-13 02:59:47 UTC (rev 10793)
@@ -119,7 +119,7 @@
send(0, "queues.testaddress", 10, false, null);
verifyNotReceive(0);
}
-
+
public void testStopAndStartTarget() throws Exception
{
startServers(0, 1);
@@ -150,9 +150,14 @@
stopServers(1);
- OnewayTwoNodeClusterTest.log.info("restarting server 1");
+ OnewayTwoNodeClusterTest.log.info("restarting server 1(" + servers[1].getIdentity() + ")");
startServers(1);
+
+ //Thread.sleep(1000);
+
+ log.info("Server 1 id=" + servers[1].getNodeID());
+
long end = System.currentTimeMillis();
Modified: branches/Branch_2_2_EAP-cluster-cleanup/tests/src/org/hornetq/tests/integration/management/BridgeControlTest.java
===================================================================
--- branches/Branch_2_2_EAP-cluster-cleanup/tests/src/org/hornetq/tests/integration/management/BridgeControlTest.java 2011-06-13 02:55:40 UTC (rev 10792)
+++ branches/Branch_2_2_EAP-cluster-cleanup/tests/src/org/hornetq/tests/integration/management/BridgeControlTest.java 2011-06-13 02:59:47 UTC (rev 10793)
@@ -166,12 +166,14 @@
targetQueueConfig.getAddress(),
null,
null,
+ HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+ HornetQClient.DEFAULT_CONNECTION_TTL,
RandomUtil.randomPositiveLong(),
+ HornetQClient.DEFAULT_MAX_RETRY_INTERVAL,
RandomUtil.randomDouble(),
RandomUtil.randomPositiveInt(),
RandomUtil.randomBoolean(),
RandomUtil.randomPositiveInt(),
- HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
connectors,
false,
ConfigurationImpl.DEFAULT_CLUSTER_USER,
Modified: branches/Branch_2_2_EAP-cluster-cleanup/tests/src/org/hornetq/tests/integration/management/BridgeControlUsingCoreTest.java
===================================================================
--- branches/Branch_2_2_EAP-cluster-cleanup/tests/src/org/hornetq/tests/integration/management/BridgeControlUsingCoreTest.java 2011-06-13 02:55:40 UTC (rev 10792)
+++ branches/Branch_2_2_EAP-cluster-cleanup/tests/src/org/hornetq/tests/integration/management/BridgeControlUsingCoreTest.java 2011-06-13 02:59:47 UTC (rev 10793)
@@ -144,12 +144,14 @@
targetQueueConfig.getAddress(),
null,
null,
+ HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+ HornetQClient.DEFAULT_CONNECTION_TTL,
RandomUtil.randomPositiveLong(),
+ HornetQClient.DEFAULT_MAX_RETRY_INTERVAL,
RandomUtil.randomDouble(),
RandomUtil.randomPositiveInt(),
RandomUtil.randomBoolean(),
RandomUtil.randomPositiveInt(),
- HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
connectors,
false,
ConfigurationImpl.DEFAULT_CLUSTER_USER,
Modified: branches/Branch_2_2_EAP-cluster-cleanup/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
===================================================================
--- branches/Branch_2_2_EAP-cluster-cleanup/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java 2011-06-13 02:55:40 UTC (rev 10792)
+++ branches/Branch_2_2_EAP-cluster-cleanup/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java 2011-06-13 02:59:47 UTC (rev 10793)
@@ -87,7 +87,7 @@
{
final FakeBinding fake = new FakeBinding(new SimpleString("a"));
- final BindingsImpl bind = new BindingsImpl(null, null);
+ final BindingsImpl bind = new BindingsImpl(null, null, null);
bind.addBinding(fake);
bind.addBinding(new FakeBinding(new SimpleString("a")));
bind.addBinding(new FakeBinding(new SimpleString("a")));
JBoss hornetq SVN: r10792 - branches.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-06-12 22:55:40 -0400 (Sun, 12 Jun 2011)
New Revision: 10792
Added:
branches/Branch_2_2_EAP-cluster-cleanup/
Log:
EAP cleanup branch