JBoss hornetq SVN: r10986 - branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-07-14 13:09:28 -0400 (Thu, 14 Jul 2011)
New Revision: 10986
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
Log:
Fixing test
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-07-14 16:25:28 UTC (rev 10985)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-07-14 17:09:28 UTC (rev 10986)
@@ -1478,11 +1478,6 @@
catch (HornetQException e)
{
log.debug("Exception on establish connector initial connection", e);
- if (!interrupted)
- {
- this.e = e;
- throw e;
- }
return null;
}
}
14 years, 9 months
JBoss hornetq SVN: r10985 - branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-07-14 12:25:28 -0400 (Thu, 14 Jul 2011)
New Revision: 10985
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
Log:
improving clustering
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-07-14 16:24:00 UTC (rev 10984)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-07-14 16:25:28 UTC (rev 10985)
@@ -1363,33 +1363,26 @@
try
{
- List<Future<ClientSessionFactory>> futuresList = new ArrayList<Future<ClientSessionFactory>>();
-
- for (Connector conn : connectors)
+ while (csf == null && !ServerLocatorImpl.this.closed && !ServerLocatorImpl.this.closing)
{
- // TODO: Why using submit here? if we are waiting for it anyway?
- log.info("XXX Submitting call towards " + conn);
- futuresList.add(threadPool.submit(conn));
- }
-
- for (int i = 0, futuresSize = futuresList.size(); i < futuresSize; i++)
- {
- Future<ClientSessionFactory> future = futuresList.get(i);
- try
+ for (Connector conn : connectors)
{
- csf = future.get();
- if (csf != null)
- break;
+ if (log.isDebugEnabled())
+ {
+ log.debug("Submitting connect towards " + conn);
+ }
+
+ csf = conn.tryConnect();
+
+ if (csf != null || ServerLocatorImpl.this.closed || ServerLocatorImpl.this.closing)
+ {
+ break;
+ }
}
- catch (Exception e)
- {
- log.debug("unable to connect with static connector " + connectors.get(i).initialConnector);
- }
+
+ Thread.sleep (retryInterval);
}
- if (csf == null && !closed)
- {
- throw new HornetQException(HornetQException.NOT_CONNECTED, "Failed to connect to any static connectors");
- }
+
}
catch (Exception e)
{
@@ -1455,14 +1448,12 @@
super.finalize();
}
- class Connector implements Callable<ClientSessionFactory>
+ class Connector
{
private TransportConfiguration initialConnector;
private volatile ClientSessionFactoryInternal factory;
- private boolean isConnected = false;
-
private boolean interrupted = false;
private Exception e;
@@ -1473,15 +1464,16 @@
this.factory = factory;
}
- public ClientSessionFactory call() throws HornetQException
+ public ClientSessionFactory tryConnect() throws HornetQException
{
if (log.isDebugEnabled())
{
- log.debug("Executing connection to " + factory + " through threadPool.submission()");
+ log.debug("Trying to connect to " + factory);
}
try
{
- factory.connect(initialConnectAttempts, failoverOnInitialConnection);
+ factory.connect(1, false);
+ return factory;
}
catch (HornetQException e)
{
@@ -1491,29 +1483,10 @@
this.e = e;
throw e;
}
- /*if(factory != null)
- {
- factory.close();
- factory = null;
- }*/
return null;
}
- isConnected = true;
- for (Connector connector : connectors)
- {
- if (!connector.isConnected())
- {
- connector.disconnect();
- }
- }
- return factory;
}
- public boolean isConnected()
- {
- return isConnected;
- }
-
public void disconnect()
{
interrupted = true;
14 years, 9 months
JBoss hornetq SVN: r10984 - branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-07-14 12:24:00 -0400 (Thu, 14 Jul 2011)
New Revision: 10984
Modified:
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
Log:
improving log on test
Modified: branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-07-14 05:57:39 UTC (rev 10983)
+++ branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-07-14 16:24:00 UTC (rev 10984)
@@ -1303,7 +1303,7 @@
{
if (sfs[node] != null)
{
- throw new IllegalArgumentException("Already a server at " + node);
+ throw new IllegalArgumentException("Already a factory at " + node);
}
Map<String, Object> params = generateParams(node, netty);
@@ -1332,6 +1332,7 @@
locators[node].setBlockOnDurableSend(true);
ClientSessionFactory sf = locators[node].createSessionFactory();
+ sf.createSession().close();
sfs[node] = sf;
}
@@ -1482,6 +1483,8 @@
server = HornetQServers.newHornetQServer(configuration, false);
}
}
+
+ server.setIdentity(this.getClass().getSimpleName() + "/Live(" + node + ")");
servers[node] = server;
}
@@ -1557,6 +1560,7 @@
server = HornetQServers.newHornetQServer(configuration, false);
}
}
+ server.setIdentity(this.getClass().getSimpleName() + "/Backup(" + node + " of live " + liveNode + ")");
servers[node] = server;
}
14 years, 9 months
JBoss hornetq SVN: r10983 - branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/management.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-07-14 01:57:39 -0400 (Thu, 14 Jul 2011)
New Revision: 10983
Modified:
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/management/QueueControlTest.java
Log:
Fixing test
Modified: branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/management/QueueControlTest.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/management/QueueControlTest.java 2011-07-13 15:41:38 UTC (rev 10982)
+++ branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/management/QueueControlTest.java 2011-07-14 05:57:39 UTC (rev 10983)
@@ -922,7 +922,7 @@
// the message IDs are set on the server
Map<String, Object>[] messages = queueControl.listMessages(null);
Assert.assertEquals(50, messages.length);
- assertEquals(50, ((Long)messages[0].get("count")).intValue());
+ assertEquals(50, ((Number)messages[0].get("count")).intValue());
long messageID = (Long)messages[0].get("messageID");
// delete 1st message
14 years, 9 months
JBoss hornetq SVN: r10982 - branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/replication.
by do-not-reply@jboss.org
Author: borges
Date: 2011-07-13 11:41:38 -0400 (Wed, 13 Jul 2011)
New Revision: 10982
Modified:
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/replication/ReplicationTest.java
Log:
remove obsolete tests
Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/replication/ReplicationTest.java 2011-07-13 15:36:01 UTC (rev 10981)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/replication/ReplicationTest.java 2011-07-13 15:41:38 UTC (rev 10982)
@@ -144,60 +144,12 @@
waitForComponent(liveServer.getReplicationManager());
}
- public void testInvalidJournal() throws Exception
- {
-
- setupServer(true, false);
- manager = liveServer.getReplicationManager();
- waitForComponent(manager);
-
- try
- {
- manager.compareJournals(new JournalLoadInformation[] { new JournalLoadInformation(2, 2),
- new JournalLoadInformation(2, 2) });
- Assert.fail("Exception was expected");
- }
- catch (HornetQException e)
- {
- if (e.getCode() != HornetQException.ILLEGAL_STATE)
- e.printStackTrace();
- Assert.assertEquals(HornetQException.ILLEGAL_STATE, e.getCode());
- }
-
- manager.compareJournals(new JournalLoadInformation[] { new JournalLoadInformation(), new JournalLoadInformation() });
-
- }
-
- // should throw an exception if a second server connects to the same backup
- public void testInvalidConnection() throws Exception
- {
-
- setupServer(true, false);
-
- manager = liveServer.getReplicationManager();
- waitForComponent(manager);
- try
- {
- ReplicationManagerImpl manager2 =
- new ReplicationManagerImpl(locator.createSessionFactory().getConnection(), factory);
-
- manager2.start();
- Assert.fail("Exception was expected");
- }
- catch (Exception e)
- {
- // expected
- }
-
- }
-
public void testConnectIntoNonBackup() throws Exception
{
setupServer(false, false);
try
{
-
manager = new ReplicationManagerImpl(locator.createSessionFactory().getConnection(), factory);
manager.start();
Assert.fail("Exception was expected");
@@ -438,22 +390,6 @@
Assert.assertTrue(latch.await(30, TimeUnit.SECONDS));
}
-// public void testNoServer() throws Exception
-// {
-// locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(INVM_CONNECTOR_FACTORY));
-//
-// try
-// {
-// manager = new ReplicationManagerImpl(locator.createSessionFactory().getConnection(), factory);
-// manager.start();
-// Assert.fail("Exception expected");
-// }
-// catch (HornetQException expected)
-// {
-// Assert.assertEquals(HornetQException.ILLEGAL_STATE, expected.getCode());
-// }
-// }
-
public void testNoActions() throws Exception
{
14 years, 9 months
JBoss hornetq SVN: r10981 - in branches/HORNETQ-720_Replication/tests: unit-tests/src/test/java/org/hornetq/tests/util and 1 other directory.
by do-not-reply@jboss.org
Author: borges
Date: 2011-07-13 11:36:01 -0400 (Wed, 13 Jul 2011)
New Revision: 10981
Modified:
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/util/NonSerializableFactory.java
branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/util/ServiceTestBase.java
Log:
Reduce visibility.
Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/util/NonSerializableFactory.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/util/NonSerializableFactory.java 2011-07-13 15:35:14 UTC (rev 10980)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/util/NonSerializableFactory.java 2011-07-13 15:36:01 UTC (rev 10981)
@@ -32,11 +32,12 @@
* used by the default context when running in embedded local configuration
* @author <a href="ataylor(a)redhat.com">Andy Taylor</a>
*/
-public class NonSerializableFactory implements ObjectFactory
+public final class NonSerializableFactory implements ObjectFactory
{
- public NonSerializableFactory()
+ private NonSerializableFactory()
{
+ // Utility
}
public static void unbind(final Context ctx, final String strName) throws NamingException
Modified: branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/util/ServiceTestBase.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/util/ServiceTestBase.java 2011-07-13 15:35:14 UTC (rev 10980)
+++ branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/util/ServiceTestBase.java 2011-07-13 15:36:01 UTC (rev 10981)
@@ -38,6 +38,7 @@
import org.hornetq.core.remoting.impl.invm.InVMRegistry;
import org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory;
import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
+import org.hornetq.core.server.HornetQComponent;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.HornetQServers;
import org.hornetq.core.server.NodeManager;
@@ -96,6 +97,35 @@
}
}
+ protected final static void waitForComponent(final HornetQComponent component, final long seconds) throws Exception
+ {
+ long time = System.currentTimeMillis();
+ long toWait = seconds * 1000;
+ while (!component.isStarted())
+ {
+ try
+ {
+ Thread.sleep(50);
+ }
+ catch (InterruptedException e)
+ {
+ // ignore
+ }
+ if (System.currentTimeMillis() > (time + toWait))
+ {
+ fail("component did not start within timeout of " + seconds);
+ }
+ }
+ }
+
+ protected final void stopComponent(HornetQComponent component) throws Exception
+ {
+ if (component == null)
+ return;
+ if (component.isStarted())
+ component.stop();
+ }
+
protected static Map<String, Object> generateParams(final int node, final boolean netty)
{
Map<String, Object> params = new HashMap<String, Object>();
14 years, 9 months
JBoss hornetq SVN: r10980 - in branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests: integration/replication and 2 other directories.
by do-not-reply@jboss.org
Author: borges
Date: 2011-07-13 11:35:14 -0400 (Wed, 13 Jul 2011)
New Revision: 10980
Added:
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/util/ReplicatedBackupUtils.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/util/TransportConfigurationUtils.java
Modified:
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/AsynchronousFailoverTest.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailBackAutoTest.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailBackManualTest.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverOnFlowControlTest.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/replication/ReplicationTest.java
Log:
HORNETQ-720 Fix tests, add some utility methods
Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/AsynchronousFailoverTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/AsynchronousFailoverTest.java 2011-07-13 15:34:24 UTC (rev 10979)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/AsynchronousFailoverTest.java 2011-07-13 15:35:14 UTC (rev 10980)
@@ -23,16 +23,22 @@
import org.hornetq.api.core.Message;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.*;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.api.core.client.SessionFailureListener;
import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
import org.hornetq.core.client.impl.ClientSessionInternal;
import org.hornetq.core.client.impl.DelegatingSession;
import org.hornetq.core.logging.Logger;
import org.hornetq.spi.core.protocol.RemotingConnection;
+import org.hornetq.tests.util.TransportConfigurationUtils;
/**
* A MultiThreadFailoverTest
- *
+ *
* Test Failover where failure is prompted by another thread
*
* @author Tim Fox
@@ -47,7 +53,7 @@
private volatile ClientSessionFactoryInternal sf;
- private Object lockFail = new Object();
+ private final Object lockFail = new Object();
class MyListener implements SessionFailureListener
{
@@ -170,7 +176,7 @@
locator.setBlockOnNonDurableSend(true);
locator.setBlockOnDurableSend(true);
locator.setReconnectAttempts(-1);
- sf = (ClientSessionFactoryInternal) createSessionFactoryAndWaitForTopology(locator, 2);
+ sf = createSessionFactoryAndWaitForTopology(locator, 2);
ClientSession createSession = sf.createSession(true, true);
@@ -198,7 +204,7 @@
// Simulate failure on connection
synchronized (lockFail)
{
- crash((ClientSession) createSession);
+ crash(createSession);
}
/*if (listener != null)
@@ -226,7 +232,7 @@
Assert.assertEquals(0, sf.numSessions());
locator.close();
-
+
Assert.assertEquals(0, sf.numConnections());
if (i != numIts - 1)
@@ -243,7 +249,7 @@
DelegatingSession.debug = false;
}
}
-
+
protected void addPayload(ClientMessage msg)
{
}
@@ -278,7 +284,7 @@
message.getBodyBuffer().writeString("message" + i);
message.putIntProperty("counter", i);
-
+
addPayload(message);
producer.send(message);
@@ -408,7 +414,7 @@
message.putStringProperty(Message.HDR_DUPLICATE_DETECTION_ID, new SimpleString("id:" + i +
",exec:" +
executionId));
-
+
addPayload(message);
@@ -439,13 +445,13 @@
}
while (retry);
-
-
+
+
boolean blocked = false;
retry = false;
-
- ClientConsumer consumer = null;
+
+ ClientConsumer consumer = null;
do
{
ArrayList<Integer> msgs = new ArrayList<Integer>();
@@ -473,7 +479,7 @@
}
session.commit();
-
+
if (blocked)
{
assertTrue("msgs.size is expected to be 0 or " + numMessages + " but it was " + msgs.size(), msgs.size() == 0 || msgs.size() == numMessages);
@@ -535,13 +541,13 @@
@Override
protected TransportConfiguration getAcceptorTransportConfiguration(final boolean live)
{
- return getInVMTransportAcceptorConfiguration(live);
+ return TransportConfigurationUtils.getInVMAcceptor(live);
}
@Override
protected TransportConfiguration getConnectorTransportConfiguration(final boolean live)
{
- return getInVMConnectorTransportConfiguration(live);
+ return TransportConfigurationUtils.getInVMConnector(live);
}
}
Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailBackAutoTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailBackAutoTest.java 2011-07-13 15:34:24 UTC (rev 10979)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailBackAutoTest.java 2011-07-13 15:35:14 UTC (rev 10980)
@@ -13,23 +13,29 @@
package org.hornetq.tests.integration.cluster.failover;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
import junit.framework.Assert;
+
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.*;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.SessionFailureListener;
import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
import org.hornetq.core.client.impl.ServerLocatorInternal;
import org.hornetq.core.config.ClusterConnectionConfiguration;
import org.hornetq.core.server.impl.InVMNodeManager;
import org.hornetq.jms.client.HornetQTextMessage;
-import org.hornetq.utils.ReusableLatch;
+import org.hornetq.tests.util.TransportConfigurationUtils;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
/**
* @author <a href="mailto:andy.taylor@jboss.com">Andy Taylor</a>
* Date: Dec 21, 2010
@@ -193,6 +199,7 @@
Assert.assertEquals(0, sf.numConnections());
}
+ @Override
protected void createConfigs() throws Exception
{
nodeManager = new InVMNodeManager();
@@ -234,13 +241,13 @@
@Override
protected TransportConfiguration getAcceptorTransportConfiguration(final boolean live)
{
- return getInVMTransportAcceptorConfiguration(live);
+ return TransportConfigurationUtils.getInVMAcceptor(live);
}
@Override
protected TransportConfiguration getConnectorTransportConfiguration(final boolean live)
{
- return getInVMConnectorTransportConfiguration(live);
+ return TransportConfigurationUtils.getInVMConnector(live);
}
Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailBackManualTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailBackManualTest.java 2011-07-13 15:34:24 UTC (rev 10979)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailBackManualTest.java 2011-07-13 15:35:14 UTC (rev 10980)
@@ -13,23 +13,30 @@
package org.hornetq.tests.integration.cluster.failover;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
import junit.framework.Assert;
+
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.*;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.SessionFailureListener;
import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
import org.hornetq.core.client.impl.ServerLocatorInternal;
import org.hornetq.core.config.ClusterConnectionConfiguration;
import org.hornetq.core.server.impl.InVMNodeManager;
import org.hornetq.jms.client.HornetQTextMessage;
import org.hornetq.tests.integration.cluster.util.TestableServer;
+import org.hornetq.tests.util.TransportConfigurationUtils;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
/**
* @author <a href="mailto:andy.taylor@jboss.com">Andy Taylor</a>
* Date: Dec 21, 2010
@@ -133,6 +140,7 @@
}
+ @Override
protected void createConfigs() throws Exception
{
nodeManager = new InVMNodeManager();
@@ -176,13 +184,13 @@
@Override
protected TransportConfiguration getAcceptorTransportConfiguration(final boolean live)
{
- return getInVMTransportAcceptorConfiguration(live);
+ return TransportConfigurationUtils.getInVMAcceptor(live);
}
@Override
protected TransportConfiguration getConnectorTransportConfiguration(final boolean live)
{
- return getInVMConnectorTransportConfiguration(live);
+ return TransportConfigurationUtils.getInVMConnector(live);
}
Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverOnFlowControlTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverOnFlowControlTest.java 2011-07-13 15:34:24 UTC (rev 10979)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverOnFlowControlTest.java 2011-07-13 15:35:14 UTC (rev 10980)
@@ -28,6 +28,7 @@
import org.hornetq.core.protocol.core.Packet;
import org.hornetq.core.protocol.core.impl.wireformat.SessionProducerCreditsMessage;
import org.hornetq.spi.core.protocol.RemotingConnection;
+import org.hornetq.tests.util.TransportConfigurationUtils;
/**
* A FailoverOnFlowControlTest
@@ -49,8 +50,8 @@
// Constructors --------------------------------------------------
// Public --------------------------------------------------------
-
-
+
+
public void testOverflowSend() throws Exception
{
ServerLocator locator = getServerLocator();
@@ -65,11 +66,11 @@
public boolean intercept(Packet packet, RemotingConnection connection) throws HornetQException
{
System.out.println("Intercept..." + packet.getClass().getName());
-
+
if (packet instanceof SessionProducerCreditsMessage )
{
SessionProducerCreditsMessage credit = (SessionProducerCreditsMessage)packet;
-
+
System.out.println("Credits: " + credit.getCredits());
if (count.incrementAndGet() == 2)
{
@@ -87,7 +88,7 @@
return true;
}
};
-
+
locator.addInterceptor(interceptorClient);
ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
@@ -97,27 +98,28 @@
session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
-
+
final int numMessages = 10;
for (int i = 0; i < numMessages; i++)
{
ClientMessage message = session.createMessage(true);
-
+
message.getBodyBuffer().writeBytes(new byte[5000]);
message.putIntProperty("counter", i);
producer.send(message);
}
-
+
session.close();
-
+
locator.close();
}
+ @Override
protected void createConfigs() throws Exception
{
super.createConfigs();
@@ -125,6 +127,7 @@
backupServer.getServer().getConfiguration().setJournalFileSize(1024 * 1024);
}
+ @Override
protected ServerLocatorInternal getServerLocator() throws Exception
{
ServerLocatorInternal locator = super.getServerLocator();
@@ -140,13 +143,13 @@
@Override
protected TransportConfiguration getAcceptorTransportConfiguration(final boolean live)
{
- return getInVMTransportAcceptorConfiguration(live);
+ return TransportConfigurationUtils.getInVMAcceptor(live);
}
@Override
protected TransportConfiguration getConnectorTransportConfiguration(final boolean live)
{
- return getInVMConnectorTransportConfiguration(live);
+ return TransportConfigurationUtils.getInVMConnector(live);
}
Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2011-07-13 15:34:24 UTC (rev 10979)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2011-07-13 15:35:14 UTC (rev 10980)
@@ -46,6 +46,7 @@
import org.hornetq.jms.client.HornetQTextMessage;
import org.hornetq.tests.integration.cluster.util.TestableServer;
import org.hornetq.tests.util.RandomUtil;
+import org.hornetq.tests.util.TransportConfigurationUtils;
/**
*
@@ -1875,13 +1876,13 @@
@Override
protected TransportConfiguration getAcceptorTransportConfiguration(final boolean live)
{
- return getInVMTransportAcceptorConfiguration(live);
+ return TransportConfigurationUtils.getInVMAcceptor(live);
}
@Override
protected TransportConfiguration getConnectorTransportConfiguration(final boolean live)
{
- return getInVMConnectorTransportConfiguration(live);
+ return TransportConfigurationUtils.getInVMConnector(live);
}
/**
Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2011-07-13 15:34:24 UTC (rev 10979)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2011-07-13 15:35:14 UTC (rev 10980)
@@ -35,19 +35,16 @@
import org.hornetq.api.core.client.SessionFailureListener;
import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
import org.hornetq.core.client.impl.ServerLocatorInternal;
-import org.hornetq.core.config.ClusterConnectionConfiguration;
import org.hornetq.core.config.Configuration;
-import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
import org.hornetq.core.remoting.impl.invm.InVMConnector;
-import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
import org.hornetq.core.remoting.impl.invm.InVMRegistry;
-import org.hornetq.core.remoting.impl.invm.TransportConstants;
import org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory;
import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
import org.hornetq.core.server.NodeManager;
import org.hornetq.core.server.impl.InVMNodeManager;
import org.hornetq.tests.integration.cluster.util.SameProcessHornetQServer;
import org.hornetq.tests.integration.cluster.util.TestableServer;
+import org.hornetq.tests.util.ReplicatedBackupUtils;
import org.hornetq.tests.util.ServiceTestBase;
/**
@@ -60,8 +57,8 @@
// Constants -----------------------------------------------------
protected static final SimpleString ADDRESS = new SimpleString("FailoverTestAddress");
- private static final String LIVE_NODE_NAME = "hqLIVE";
+
// Attributes ----------------------------------------------------
protected TestableServer liveServer;
@@ -121,19 +118,6 @@
return new SameProcessHornetQServer(createInVMFailoverServer(true, backupConfig, nodeManager));
}
- private ClusterConnectionConfiguration createClusterConnectionConf(String name, String... connectors)
- {
- List<String> conn = new ArrayList<String>(connectors.length);
- for (String iConn : connectors)
- {
- conn.add(iConn);
- }
- return new ClusterConnectionConfiguration("cluster1", "jms", name, -1, false, false, 1, 1, conn, false);
- }
-
- /**
- * @throws Exception
- */
protected void createConfigs() throws Exception
{
nodeManager = new InVMNodeManager();
@@ -149,8 +133,8 @@
TransportConfiguration backupConnector = getConnectorTransportConfiguration(false);
backupConfig.getConnectorConfigurations().put(liveConnector.getName(), liveConnector);
backupConfig.getConnectorConfigurations().put(backupConnector.getName(), backupConnector);
- backupConfig.getClusterConfigurations().add(createClusterConnectionConf(backupConnector.getName(),
- liveConnector.getName()));
+ ReplicatedBackupUtils.createClusterConnectionConf(backupConfig, backupConnector.getName(),
+ liveConnector.getName());
backupServer = createBackupServer();
liveConfig = super.createDefaultConfig();
@@ -159,7 +143,7 @@
liveConfig.setSecurityEnabled(false);
liveConfig.setSharedStore(true);
liveConfig.setClustered(true);
- liveConfig.getClusterConfigurations().add(createClusterConnectionConf(liveConnector.getName()));
+ ReplicatedBackupUtils.createClusterConnectionConf(liveConfig, liveConnector.getName());
liveConfig.getConnectorConfigurations().put(liveConnector.getName(), liveConnector);
liveServer = createLiveServer();
}
@@ -172,42 +156,26 @@
nodeManager = new InVMNodeManager();
backupConfig = super.createDefaultConfig();
+ liveConfig = super.createDefaultConfig();
+ TransportConfiguration backupAcceptor = getAcceptorTransportConfiguration(false);
+ ReplicatedBackupUtils.configureReplicationPair(backupConfig, backupConnector, backupAcceptor, liveConfig,
+ liveConnector);
+
backupConfig.setBindingsDirectory(backupConfig.getBindingsDirectory() + "_backup");
backupConfig.setJournalDirectory(backupConfig.getJournalDirectory() + "_backup");
backupConfig.setPagingDirectory(backupConfig.getPagingDirectory() + "_backup");
backupConfig.setLargeMessagesDirectory(backupConfig.getLargeMessagesDirectory() + "_backup");
- backupConfig.getAcceptorConfigurations().clear();
- backupConfig.getAcceptorConfigurations().add(getAcceptorTransportConfiguration(false));
-
- backupConfig.getConnectorConfigurations().put(backupConnector.getName(), backupConnector);
- backupConfig.getConnectorConfigurations().put(LIVE_NODE_NAME, liveConnector);
- backupConfig.getClusterConfigurations().add(createClusterConnectionConf(backupConnector.getName(),
- backupConnector.getName()));
-
backupConfig.setSecurityEnabled(false);
- backupConfig.setSharedStore(false);
- backupConfig.setBackup(true);
- backupConfig.setLiveConnectorName(LIVE_NODE_NAME);
- backupConfig.setClustered(true);
backupServer = createBackupServer();
backupServer.getServer().setIdentity("idBackup");
- liveConfig = super.createDefaultConfig();
+
liveConfig.getAcceptorConfigurations().clear();
liveConfig.getAcceptorConfigurations().add(getAcceptorTransportConfiguration(true));
- liveConfig.setName(LIVE_NODE_NAME);
- liveConfig.getConnectorConfigurations().put(LIVE_NODE_NAME, liveConnector);
- liveConfig.setSecurityEnabled(false);
- liveConfig.setSharedStore(false);
- liveConfig.setClustered(true);
- liveConfig.getClusterConfigurations().add(createClusterConnectionConf(LIVE_NODE_NAME, LIVE_NODE_NAME));
liveServer = createLiveServer();
liveServer.getServer().setIdentity("idLive");
-
- //liveServer.start();
- //backupServer.start();
}
@Override
@@ -261,8 +229,7 @@
return sf;
}
- protected void waitForBackup(ClientSessionFactoryInternal sf, long seconds)
- throws Exception
+ protected static void waitForBackup(ClientSessionFactoryInternal sf, long seconds) throws Exception
{
long time = System.currentTimeMillis();
long toWait = seconds * 1000;
@@ -288,38 +255,6 @@
System.out.println("sf.getBackupConnector() = " + sf.getBackupConnector());
}
- protected TransportConfiguration getInVMConnectorTransportConfiguration(final boolean live)
- {
- if (live)
- {
- return new TransportConfiguration(InVMConnectorFactory.class.getCanonicalName());
- }
- else
- {
- Map<String, Object> server1Params = new HashMap<String, Object>();
-
- server1Params.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
-
- return new TransportConfiguration(InVMConnectorFactory.class.getCanonicalName(), server1Params);
- }
- }
-
- protected TransportConfiguration getInVMTransportAcceptorConfiguration(final boolean live)
- {
- if (live)
- {
- return new TransportConfiguration(InVMAcceptorFactory.class.getCanonicalName());
- }
- else
- {
- Map<String, Object> server1Params = new HashMap<String, Object>();
-
- server1Params.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
-
- return new TransportConfiguration(InVMAcceptorFactory.class.getCanonicalName(), server1Params);
- }
- }
-
protected TransportConfiguration getNettyAcceptorTransportConfiguration(final boolean live)
{
if (live)
Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java 2011-07-13 15:34:24 UTC (rev 10979)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java 2011-07-13 15:35:14 UTC (rev 10980)
@@ -14,12 +14,9 @@
package org.hornetq.tests.integration.cluster.failover;
import java.util.HashMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
import junit.framework.Assert;
-import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientConsumer;
@@ -28,17 +25,16 @@
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
-import org.hornetq.core.client.impl.ClientSessionInternal;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.settings.impl.AddressSettings;
-import org.hornetq.spi.core.protocol.RemotingConnection;
import org.hornetq.tests.integration.cluster.util.SameProcessHornetQServer;
import org.hornetq.tests.integration.cluster.util.TestableServer;
+import org.hornetq.tests.util.TransportConfigurationUtils;
/**
* A PagingFailoverTest
- *
+ *
* TODO: validate replication failover also
*
* @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
@@ -135,10 +131,10 @@
{
crash(session);
}
-
-
+
+
session.close();
-
+
session = sf.createSession(!transacted, !transacted, 0);
session.start();
@@ -161,9 +157,9 @@
}
session.commit();
-
+
cons.close();
-
+
Thread.sleep(1000);
if (!failBeforeConsume)
@@ -173,7 +169,7 @@
}
session.close();
-
+
session = sf.createSession(true, true, 0);
cons = session.createConsumer(PagingFailoverTest.ADDRESS);
@@ -202,23 +198,6 @@
}
}
- /**
- * @param session
- * @param latch
- * @throws InterruptedException
- */
- private void failSession(final ClientSession session, final CountDownLatch latch) throws InterruptedException
- {
- RemotingConnection conn = ((ClientSessionInternal)session).getConnection();
-
- // Simulate failure on connection
- conn.fail(new HornetQException(HornetQException.NOT_CONNECTED));
-
- // Wait to be informed of failure
-
- Assert.assertTrue(latch.await(5000, TimeUnit.MILLISECONDS));
- }
-
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
@@ -226,13 +205,13 @@
@Override
protected TransportConfiguration getAcceptorTransportConfiguration(final boolean live)
{
- return getInVMTransportAcceptorConfiguration(live);
+ return TransportConfigurationUtils.getInVMAcceptor(live);
}
@Override
protected TransportConfiguration getConnectorTransportConfiguration(final boolean live)
{
- return getInVMConnectorTransportConfiguration(live);
+ return TransportConfigurationUtils.getInVMConnector(live);
}
@Override
Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/replication/ReplicationTest.java 2011-07-13 15:34:24 UTC (rev 10979)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/replication/ReplicationTest.java 2011-07-13 15:35:14 UTC (rev 10980)
@@ -14,6 +14,7 @@
package org.hornetq.tests.integration.replication;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
@@ -56,15 +57,19 @@
import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
import org.hornetq.core.protocol.core.Packet;
+import org.hornetq.core.replication.ReplicationManager;
import org.hornetq.core.replication.impl.ReplicatedJournal;
import org.hornetq.core.replication.impl.ReplicationManagerImpl;
+import org.hornetq.core.server.HornetQComponent;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.impl.HornetQServerImpl;
import org.hornetq.core.server.impl.ServerMessageImpl;
import org.hornetq.core.settings.HierarchicalRepository;
import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.spi.core.protocol.RemotingConnection;
+import org.hornetq.tests.util.ReplicatedBackupUtils;
import org.hornetq.tests.util.ServiceTestBase;
+import org.hornetq.tests.util.TransportConfigurationUtils;
import org.hornetq.utils.ExecutorFactory;
import org.hornetq.utils.HornetQThreadFactory;
import org.hornetq.utils.OrderedExecutorFactory;
@@ -76,23 +81,18 @@
public class ReplicationTest extends ServiceTestBase
{
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
private ThreadFactory tFactory;
-
private ExecutorService executor;
-
private ExecutorFactory factory;
-
private ScheduledExecutorService scheduledExecutor;
- private HornetQServerImpl server;
+ private HornetQServerImpl backupServer;
+ /** This field is not always used. */
+ private HornetQServerImpl liveServer;
private ServerLocator locator;
- private ReplicationManagerImpl manager;
+ private ReplicationManager manager;
// Static --------------------------------------------------------
@@ -100,32 +100,57 @@
// Public --------------------------------------------------------
- public void testBasicConnection() throws Exception
+ private void setupServer(boolean backup, boolean netty, String... interceptors) throws Exception
{
- boolean backup = true;
- boolean netty = false;
- setupServer(backup, netty);
- manager = new ReplicationManagerImpl(locator.createSessionFactory().getConnection(), factory);
- manager.start();
- manager.stop();
+ assert backup; // XXX
+
+ Configuration backupConfig = createDefaultConfig(netty);
+ Configuration liveConfig = createDefaultConfig(netty);
+ backupConfig.setBackup(backup);
+ if (interceptors.length > 0)
+ {
+ List<String> interceptorsList = Arrays.asList(interceptors);
+ backupConfig.setInterceptorClassNames(interceptorsList);
+ }
+
+ TransportConfiguration liveConnector = TransportConfigurationUtils.getInVMConnector(true);
+ TransportConfiguration backupConnector = TransportConfigurationUtils.getInVMConnector(false);
+ TransportConfiguration backupAcceptor = TransportConfigurationUtils.getInVMAcceptor(false);
+
+ ReplicatedBackupUtils.configureReplicationPair(backupConfig, backupConnector, backupAcceptor, liveConfig,
+ liveConnector);
+ if (backup)
+ {
+ liveServer = new HornetQServerImpl(liveConfig);
+ liveServer.start();
+ waitForComponent(liveServer);
+ }
+
+ backupServer = new HornetQServerImpl(backupConfig);
+ locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(INVM_CONNECTOR_FACTORY));
+ backupServer.start();
+ Thread.sleep(200); // XXX improve this
+ waitForComponent(backupServer);
}
- private void setupServer(boolean backup, boolean netty) throws Exception
+ private static void waitForComponent(HornetQComponent component) throws Exception
{
- Configuration config = createDefaultConfig(netty);
- config.setBackup(backup);
- server = new HornetQServerImpl(config);
- locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(INVM_CONNECTOR_FACTORY));
- server.start();
+ waitForComponent(component, 3);
}
+ public void testBasicConnection() throws Exception
+ {
+ setupServer(true, false);
+ waitForComponent(liveServer.getReplicationManager());
+ }
+
public void testInvalidJournal() throws Exception
{
setupServer(true, false);
+ manager = liveServer.getReplicationManager();
+ waitForComponent(manager);
- manager = new ReplicationManagerImpl(locator.createSessionFactory().getConnection(), factory);
- manager.start();
try
{
manager.compareJournals(new JournalLoadInformation[] { new JournalLoadInformation(2, 2),
@@ -134,7 +159,8 @@
}
catch (HornetQException e)
{
- e.printStackTrace();
+ if (e.getCode() != HornetQException.ILLEGAL_STATE)
+ e.printStackTrace();
Assert.assertEquals(HornetQException.ILLEGAL_STATE, e.getCode());
}
@@ -148,10 +174,8 @@
setupServer(true, false);
- manager = new ReplicationManagerImpl(locator.createSessionFactory().getConnection(), factory);
-
- manager.start();
-
+ manager = liveServer.getReplicationManager();
+ waitForComponent(manager);
try
{
ReplicationManagerImpl manager2 =
@@ -162,17 +186,19 @@
}
catch (Exception e)
{
+ // expected
}
+
}
public void testConnectIntoNonBackup() throws Exception
{
setupServer(false, false);
- manager = new ReplicationManagerImpl(locator.createSessionFactory().getConnection(), factory);
-
try
{
+
+ manager = new ReplicationManagerImpl(locator.createSessionFactory().getConnection(), factory);
manager.start();
Assert.fail("Exception was expected");
}
@@ -188,8 +214,8 @@
StorageManager storage = getStorage();
- manager = new ReplicationManagerImpl(locator.createSessionFactory().getConnection(), factory);
- manager.start();
+ manager = liveServer.getReplicationManager();
+ waitForComponent(manager);
Journal replicatedJournal = new ReplicatedJournal((byte)1, new FakeJournal(), manager);
@@ -226,8 +252,8 @@
blockOnReplication(storage, manager);
PagingManager pagingManager =
- createPageManager(server.getStorageManager(), server.getConfiguration(), server.getExecutorFactory(),
- server.getAddressSettingsRepository());
+ createPageManager(backupServer.getStorageManager(), backupServer.getConfiguration(), backupServer.getExecutorFactory(),
+ backupServer.getAddressSettingsRepository());
PagingStore store = pagingManager.getPageStore(dummy);
store.start();
@@ -266,25 +292,10 @@
public void testSendPacketsWithFailure() throws Exception
{
- Configuration config = createDefaultConfig(false);
+ setupServer(true, false, TestInterceptor.class.getName());
- config.setBackup(true);
-
- ArrayList<String> intercepts = new ArrayList<String>();
-
- intercepts.add(TestInterceptor.class.getName());
-
- config.setInterceptorClassNames(intercepts);
-
- server = new HornetQServerImpl(config);
-
- server.start();
-
- locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(INVM_CONNECTOR_FACTORY));
-
StorageManager storage = getStorage();
- manager = new ReplicationManagerImpl(locator.createSessionFactory().getConnection(), factory);
- manager.start();
+ manager = liveServer.getReplicationManager();
Journal replicatedJournal = new ReplicatedJournal((byte)1, new FakeJournal(), manager);
@@ -309,7 +320,7 @@
}
});
- server.stop();
+ backupServer.stop();
Assert.assertTrue(latch.await(50, TimeUnit.SECONDS));
@@ -408,7 +419,7 @@
* @param manager
* @return
*/
- private void blockOnReplication(final StorageManager storage, final ReplicationManagerImpl manager) throws Exception
+ private void blockOnReplication(final StorageManager storage, final ReplicationManager manager) throws Exception
{
final CountDownLatch latch = new CountDownLatch(1);
storage.afterCompleteOperations(new IOAsyncTask()
@@ -427,29 +438,29 @@
Assert.assertTrue(latch.await(30, TimeUnit.SECONDS));
}
- public void testNoServer() throws Exception
- {
- locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(INVM_CONNECTOR_FACTORY));
+// public void testNoServer() throws Exception
+// {
+// locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(INVM_CONNECTOR_FACTORY));
+//
+// try
+// {
+// manager = new ReplicationManagerImpl(locator.createSessionFactory().getConnection(), factory);
+// manager.start();
+// Assert.fail("Exception expected");
+// }
+// catch (HornetQException expected)
+// {
+// Assert.assertEquals(HornetQException.ILLEGAL_STATE, expected.getCode());
+// }
+// }
- try
- {
- manager = new ReplicationManagerImpl(locator.createSessionFactory().getConnection(), factory);
- manager.start();
- Assert.fail("Exception expected");
- }
- catch (HornetQException expected)
- {
- Assert.assertEquals(HornetQException.ILLEGAL_STATE, expected.getCode());
- }
- }
-
public void testNoActions() throws Exception
{
setupServer(true, false);
StorageManager storage = getStorage();
- manager = new ReplicationManagerImpl(locator.createSessionFactory().getConnection(), factory);
- manager.start();
+ manager = liveServer.getReplicationManager();
+ waitForComponent(manager);
Journal replicatedJournal = new ReplicatedJournal((byte)1, new FakeJournal(), manager);
@@ -482,9 +493,7 @@
final ArrayList<Integer> executions = new ArrayList<Integer>();
StorageManager storage = getStorage();
- manager = new ReplicationManagerImpl(locator.createSessionFactory().getConnection(), factory);
- manager.start();
-
+ manager = liveServer.getReplicationManager();
Journal replicatedJournal = new ReplicatedJournal((byte)1, new FakeJournal(), manager);
int numberOfAdds = 200;
@@ -568,19 +577,13 @@
@Override
protected void tearDown() throws Exception
{
- if (manager != null)
- {
- if (manager.isStarted())
- manager.stop();
- manager = null;
- }
- if (server != null)
- {
- if (server.isStarted())
- server.stop();
- server = null;
- }
+ stopComponent(manager);
+ manager = null;
+ stopComponent(liveServer);
+ liveServer = null;
+ stopComponent(backupServer);
+ backupServer = null;
executor.shutdown();
@@ -593,6 +596,7 @@
}
+
protected
PagingManager
createPageManager(final StorageManager storageManager,
@@ -613,7 +617,7 @@
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
- public static class TestInterceptor implements Interceptor
+ public static final class TestInterceptor implements Interceptor
{
static AtomicBoolean value = new AtomicBoolean(true);
Added: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/util/ReplicatedBackupUtils.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/util/ReplicatedBackupUtils.java (rev 0)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/util/ReplicatedBackupUtils.java 2011-07-13 15:35:14 UTC (rev 10980)
@@ -0,0 +1,71 @@
+/**
+ * Test utilities for configuring cluster connections and a live/backup replication pair.
+ */
+package org.hornetq.tests.util;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.core.config.ClusterConnectionConfiguration;
+import org.hornetq.core.config.Configuration;
+
+public final class ReplicatedBackupUtils
+{
+ private static final String LIVE_NODE_NAME = "hqLIVE";
+ private ReplicatedBackupUtils()
+ {
+ // Utility class
+ }
+
+ /**
+ * Creates a {@link ClusterConnectionConfiguration} and adds it to the {@link Configuration}.
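+ * @param configuration the configuration whose cluster configurations receive the new entry
+ * @param name value passed as the third constructor argument of the {@link ClusterConnectionConfiguration}
+ * @param connectors names copied into the list handed to the {@link ClusterConnectionConfiguration} constructor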
+ */
+ public static void createClusterConnectionConf(Configuration configuration, String name, String... connectors)
+ {
+
+ List<String> conn = new ArrayList<String>(connectors.length);
+ for (String iConn : connectors)
+ {
+ conn.add(iConn);
+ }
+ ClusterConnectionConfiguration clusterConfig =
+ new ClusterConnectionConfiguration("cluster1", "jms", name, -1, false, false, 1, 1, conn, false);
+ configuration.getClusterConfigurations().add(clusterConfig);
+ }
+
+ public static void configureReplicationPair(Configuration backupConfig,
+ TransportConfiguration backupConnector,
+ TransportConfiguration backupAcceptor,
+ Configuration liveConfig,
+ TransportConfiguration liveConnector)
+ {
+ if (backupAcceptor != null)
+ {
+ Set<TransportConfiguration> backupAcceptorSet = backupConfig.getAcceptorConfigurations();
+ backupAcceptorSet.clear();
+ backupAcceptorSet.add(backupAcceptor);
+ }
+
+ backupConfig.getConnectorConfigurations().put(backupConnector.getName(), backupConnector);
+ backupConfig.getConnectorConfigurations().put(LIVE_NODE_NAME, liveConnector);
+ ReplicatedBackupUtils.createClusterConnectionConf(backupConfig, backupConnector.getName(),
+ backupConnector.getName());
+
+ backupConfig.setSharedStore(false);
+ backupConfig.setBackup(true);
+ backupConfig.setLiveConnectorName(LIVE_NODE_NAME);
+ backupConfig.setClustered(true);
+
+ liveConfig.setName(LIVE_NODE_NAME);
+ liveConfig.getConnectorConfigurations().put(LIVE_NODE_NAME, liveConnector);
+ liveConfig.setSecurityEnabled(false);
+ liveConfig.setSharedStore(false);
+ liveConfig.setClustered(true);
+ ReplicatedBackupUtils.createClusterConnectionConf(liveConfig, LIVE_NODE_NAME, LIVE_NODE_NAME);
+ }
+}
Added: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/util/TransportConfigurationUtils.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/util/TransportConfigurationUtils.java (rev 0)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/util/TransportConfigurationUtils.java 2011-07-13 15:35:14 UTC (rev 10980)
@@ -0,0 +1,38 @@
+package org.hornetq.tests.util;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
+import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
+import org.hornetq.core.remoting.impl.invm.TransportConstants;
+
+public final class TransportConfigurationUtils
+{
+
+ public static TransportConfiguration getInVMAcceptor(final boolean live)
+ {
+ if (live)
+ {
+ return new TransportConfiguration(InVMAcceptorFactory.class.getCanonicalName());
+ }
+
+ Map<String, Object> server1Params = new HashMap<String, Object>();
+ server1Params.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
+ return new TransportConfiguration(InVMAcceptorFactory.class.getCanonicalName(), server1Params);
+ }
+
+ public static TransportConfiguration getInVMConnector(final boolean live)
+ {
+ if (live)
+ {
+ return new TransportConfiguration(InVMConnectorFactory.class.getCanonicalName());
+ }
+
+ Map<String, Object> server1Params = new HashMap<String, Object>();
+ server1Params.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
+ return new TransportConfiguration(InVMConnectorFactory.class.getCanonicalName(), server1Params);
+ }
+
+}
14 years, 9 months
JBoss hornetq SVN: r10979 - in branches/HORNETQ-720_Replication: tests/integration-tests/src/test/java/org/hornetq/tests/integration/replication and 1 other directory.
by do-not-reply@jboss.org
Author: borges
Date: 2011-07-13 11:34:24 -0400 (Wed, 13 Jul 2011)
New Revision: 10979
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/replication/ReplicationTest.java
Log:
Clean up test case, remove duplicated code.
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2011-07-13 15:33:34 UTC (rev 10978)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2011-07-13 15:34:24 UTC (rev 10979)
@@ -22,7 +22,6 @@
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.client.SessionFailureListener;
-import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
import org.hornetq.core.journal.EncodingSupport;
import org.hornetq.core.journal.JournalLoadInformation;
import org.hornetq.core.logging.Logger;
@@ -88,18 +87,6 @@
// Constructors --------------------------------------------------
- // XXX remove constructor once the other one is stable
- @Deprecated
- public ReplicationManagerImpl(final ClientSessionFactoryInternal sessionFactory,
- final ExecutorFactory executorFactory)
- {
- super();
- this.executorFactory = executorFactory;
-
- CoreRemotingConnection conn = sessionFactory.getConnection();
- replicatingChannel = conn.getChannel(CHANNEL_ID.REPLICATION.id, -1);
- }
-
/**
* @param remotingConnection
*/
Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/replication/ReplicationTest.java 2011-07-13 15:33:34 UTC (rev 10978)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/replication/ReplicationTest.java 2011-07-13 15:34:24 UTC (rev 10979)
@@ -35,7 +35,6 @@
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.api.core.client.ServerLocator;
-import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.journal.EncodingSupport;
import org.hornetq.core.journal.IOAsyncTask;
@@ -59,7 +58,6 @@
import org.hornetq.core.protocol.core.Packet;
import org.hornetq.core.replication.impl.ReplicatedJournal;
import org.hornetq.core.replication.impl.ReplicationManagerImpl;
-import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.impl.HornetQServerImpl;
import org.hornetq.core.server.impl.ServerMessageImpl;
@@ -73,10 +71,7 @@
/**
* A ReplicationTest
- *
* @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
- *
- *
*/
public class ReplicationTest extends ServiceTestBase
{
@@ -93,6 +88,12 @@
private ScheduledExecutorService scheduledExecutor;
+ private HornetQServerImpl server;
+
+ private ServerLocator locator;
+
+ private ReplicationManagerImpl manager;
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -101,241 +102,165 @@
public void testBasicConnection() throws Exception
{
+ boolean backup = true;
+ boolean netty = false;
+ setupServer(backup, netty);
+ manager = new ReplicationManagerImpl(locator.createSessionFactory().getConnection(), factory);
+ manager.start();
+ manager.stop();
+ }
- Configuration config = createDefaultConfig(false);
-
- config.setBackup(true);
-
- HornetQServer server = new HornetQServerImpl(config);
-
- ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(ServiceTestBase.INVM_CONNECTOR_FACTORY));
-
+ private void setupServer(boolean backup, boolean netty) throws Exception
+ {
+ Configuration config = createDefaultConfig(netty);
+ config.setBackup(backup);
+ server = new HornetQServerImpl(config);
+ locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(INVM_CONNECTOR_FACTORY));
server.start();
-
- try
- {
- ReplicationManagerImpl manager = new ReplicationManagerImpl((ClientSessionFactoryInternal) locator.createSessionFactory(), factory);
- manager.start();
- manager.stop();
- }
- finally
- {
- server.stop();
- }
}
public void testInvalidJournal() throws Exception
{
- Configuration config = createDefaultConfig(false);
+ setupServer(true, false);
- config.setBackup(true);
-
- HornetQServer server = new HornetQServerImpl(config);
-
- ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(ServiceTestBase.INVM_CONNECTOR_FACTORY));
-
- server.start();
-
+ manager = new ReplicationManagerImpl(locator.createSessionFactory().getConnection(), factory);
+ manager.start();
try
{
- ReplicationManagerImpl manager = new ReplicationManagerImpl((ClientSessionFactoryInternal) locator.createSessionFactory(), factory);
- manager.start();
- try
- {
- manager.compareJournals(new JournalLoadInformation[] { new JournalLoadInformation(2, 2),
- new JournalLoadInformation(2, 2) });
- Assert.fail("Exception was expected");
- }
- catch (HornetQException e)
- {
- e.printStackTrace();
- Assert.assertEquals(HornetQException.ILLEGAL_STATE, e.getCode());
- }
-
- manager.compareJournals(new JournalLoadInformation[] { new JournalLoadInformation(),
- new JournalLoadInformation() });
-
- manager.stop();
+ manager.compareJournals(new JournalLoadInformation[] { new JournalLoadInformation(2, 2),
+ new JournalLoadInformation(2, 2) });
+ Assert.fail("Exception was expected");
}
- finally
+ catch (HornetQException e)
{
- server.stop();
+ e.printStackTrace();
+ Assert.assertEquals(HornetQException.ILLEGAL_STATE, e.getCode());
}
+
+ manager.compareJournals(new JournalLoadInformation[] { new JournalLoadInformation(), new JournalLoadInformation() });
+
}
// should throw an exception if a second server connects to the same backup
public void testInvalidConnection() throws Exception
{
- Configuration config = createDefaultConfig(false);
+ setupServer(true, false);
- config.setBackup(true);
+ manager = new ReplicationManagerImpl(locator.createSessionFactory().getConnection(), factory);
- HornetQServer server = new HornetQServerImpl(config);
+ manager.start();
- ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(ServiceTestBase.INVM_CONNECTOR_FACTORY));
-
- server.start();
-
try
{
- ReplicationManagerImpl manager = new ReplicationManagerImpl((ClientSessionFactoryInternal) locator.createSessionFactory(), factory);
+ ReplicationManagerImpl manager2 =
+ new ReplicationManagerImpl(locator.createSessionFactory().getConnection(), factory);
- manager.start();
-
- try
- {
- ReplicationManagerImpl manager2 = new ReplicationManagerImpl((ClientSessionFactoryInternal) locator.createSessionFactory(), factory);
-
- manager2.start();
- Assert.fail("Exception was expected");
- }
- catch (Exception e)
- {
- }
-
- manager.stop();
-
+ manager2.start();
+ Assert.fail("Exception was expected");
}
- finally
+ catch (Exception e)
{
- server.stop();
}
}
public void testConnectIntoNonBackup() throws Exception
{
+ setupServer(false, false);
- Configuration config = createDefaultConfig(false);
+ manager = new ReplicationManagerImpl(locator.createSessionFactory().getConnection(), factory);
- config.setBackup(false);
-
- HornetQServer server = new HornetQServerImpl(config);
-
- server.start();
-
- ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(ServiceTestBase.INVM_CONNECTOR_FACTORY));
-
try
{
- ReplicationManagerImpl manager = new ReplicationManagerImpl((ClientSessionFactoryInternal) locator.createSessionFactory(), factory);
-
- try
- {
- manager.start();
- Assert.fail("Exception was expected");
- }
- catch (HornetQException expected)
- {
- }
-
- manager.stop();
+ manager.start();
+ Assert.fail("Exception was expected");
}
- finally
+ catch (HornetQException expected)
{
- server.stop();
+ // expected
}
}
public void testSendPackets() throws Exception
{
+ setupServer(true, false);
- Configuration config = createDefaultConfig(false);
+ StorageManager storage = getStorage();
- config.setBackup(true);
+ manager = new ReplicationManagerImpl(locator.createSessionFactory().getConnection(), factory);
+ manager.start();
- HornetQServer server = new HornetQServerImpl(config);
+ Journal replicatedJournal = new ReplicatedJournal((byte)1, new FakeJournal(), manager);
- server.start();
+ replicatedJournal.appendPrepareRecord(1, new FakeData(), false);
- ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(ServiceTestBase.INVM_CONNECTOR_FACTORY));
+ replicatedJournal.appendAddRecord(1, (byte)1, new FakeData(), false);
+ replicatedJournal.appendUpdateRecord(1, (byte)2, new FakeData(), false);
+ replicatedJournal.appendDeleteRecord(1, false);
+ replicatedJournal.appendAddRecordTransactional(2, 2, (byte)1, new FakeData());
+ replicatedJournal.appendUpdateRecordTransactional(2, 2, (byte)2, new FakeData());
+ replicatedJournal.appendCommitRecord(2, false);
- try
- {
- StorageManager storage = getStorage();
+ replicatedJournal.appendDeleteRecordTransactional(3, 4, new FakeData());
+ replicatedJournal.appendPrepareRecord(3, new FakeData(), false);
+ replicatedJournal.appendRollbackRecord(3, false);
- ReplicationManagerImpl manager = new ReplicationManagerImpl((ClientSessionFactoryInternal) locator.createSessionFactory(), factory);
- manager.start();
+ blockOnReplication(storage, manager);
- Journal replicatedJournal = new ReplicatedJournal((byte)1, new FakeJournal(), manager);
+ Assert.assertEquals(0, manager.getActiveTokens().size());
- replicatedJournal.appendPrepareRecord(1, new FakeData(), false);
+ ServerMessage msg = new ServerMessageImpl(1, 1024);
- replicatedJournal.appendAddRecord(1, (byte)1, new FakeData(), false);
- replicatedJournal.appendUpdateRecord(1, (byte)2, new FakeData(), false);
- replicatedJournal.appendDeleteRecord(1, false);
- replicatedJournal.appendAddRecordTransactional(2, 2, (byte)1, new FakeData());
- replicatedJournal.appendUpdateRecordTransactional(2, 2, (byte)2, new FakeData());
- replicatedJournal.appendCommitRecord(2, false);
+ SimpleString dummy = new SimpleString("dummy");
+ msg.setAddress(dummy);
- replicatedJournal.appendDeleteRecordTransactional(3, 4, new FakeData());
- replicatedJournal.appendPrepareRecord(3, new FakeData(), false);
- replicatedJournal.appendRollbackRecord(3, false);
+ replicatedJournal.appendAddRecordTransactional(23, 24, (byte)1, new FakeData());
- blockOnReplication(storage, manager);
+ PagedMessage pgmsg = new PagedMessageImpl(msg, new long[0]);
+ manager.pageWrite(pgmsg, 1);
+ manager.pageWrite(pgmsg, 2);
+ manager.pageWrite(pgmsg, 3);
+ manager.pageWrite(pgmsg, 4);
- Assert.assertEquals(0, manager.getActiveTokens().size());
+ blockOnReplication(storage, manager);
- ServerMessage msg = new ServerMessageImpl(1, 1024);
+ PagingManager pagingManager =
+ createPageManager(server.getStorageManager(), server.getConfiguration(), server.getExecutorFactory(),
+ server.getAddressSettingsRepository());
- SimpleString dummy = new SimpleString("dummy");
- msg.setAddress(dummy);
+ PagingStore store = pagingManager.getPageStore(dummy);
+ store.start();
+ Assert.assertEquals(4, store.getNumberOfPages());
+ store.stop();
- replicatedJournal.appendAddRecordTransactional(23, 24, (byte)1, new FakeData());
+ manager.pageDeleted(dummy, 1);
+ manager.pageDeleted(dummy, 2);
+ manager.pageDeleted(dummy, 3);
+ manager.pageDeleted(dummy, 4);
+ manager.pageDeleted(dummy, 5);
+ manager.pageDeleted(dummy, 6);
- PagedMessage pgmsg = new PagedMessageImpl(msg, new long[0]);
- manager.pageWrite(pgmsg, 1);
- manager.pageWrite(pgmsg, 2);
- manager.pageWrite(pgmsg, 3);
- manager.pageWrite(pgmsg, 4);
+ blockOnReplication(storage, manager);
- blockOnReplication(storage, manager);
+ ServerMessageImpl serverMsg = new ServerMessageImpl();
+ serverMsg.setMessageID(500);
+ serverMsg.setAddress(new SimpleString("tttt"));
- PagingManager pagingManager = createPageManager(server.getStorageManager(),
- server.getConfiguration(),
- server.getExecutorFactory(),
- server.getAddressSettingsRepository());
+ HornetQBuffer buffer = HornetQBuffers.dynamicBuffer(100);
+ serverMsg.encodeHeadersAndProperties(buffer);
- PagingStore store = pagingManager.getPageStore(dummy);
- store.start();
- Assert.assertEquals(4, store.getNumberOfPages());
- store.stop();
+ manager.largeMessageBegin(500);
- manager.pageDeleted(dummy, 1);
- manager.pageDeleted(dummy, 2);
- manager.pageDeleted(dummy, 3);
- manager.pageDeleted(dummy, 4);
- manager.pageDeleted(dummy, 5);
- manager.pageDeleted(dummy, 6);
+ manager.largeMessageWrite(500, new byte[1024]);
- blockOnReplication(storage, manager);
+ manager.largeMessageDelete(500);
- ServerMessageImpl serverMsg = new ServerMessageImpl();
- serverMsg.setMessageID(500);
- serverMsg.setAddress(new SimpleString("tttt"));
+ blockOnReplication(storage, manager);
- HornetQBuffer buffer = HornetQBuffers.dynamicBuffer(100);
- serverMsg.encodeHeadersAndProperties(buffer);
+ store.start();
- manager.largeMessageBegin(500);
-
- manager.largeMessageWrite(500, new byte[1024]);
-
- manager.largeMessageDelete(500);
-
- blockOnReplication(storage, manager);
-
- store.start();
-
- Assert.assertEquals(0, store.getNumberOfPages());
-
- manager.stop();
- }
- finally
- {
- server.stop();
- }
+ Assert.assertEquals(0, store.getNumberOfPages());
}
public void testSendPacketsWithFailure() throws Exception
@@ -351,49 +276,43 @@
config.setInterceptorClassNames(intercepts);
- HornetQServer server = new HornetQServerImpl(config);
+ server = new HornetQServerImpl(config);
server.start();
- ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(ServiceTestBase.INVM_CONNECTOR_FACTORY));
+ locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(INVM_CONNECTOR_FACTORY));
- try
- {
- StorageManager storage = getStorage();
- ReplicationManagerImpl manager = new ReplicationManagerImpl((ClientSessionFactoryInternal) locator.createSessionFactory(), factory);
- manager.start();
+ StorageManager storage = getStorage();
+ manager = new ReplicationManagerImpl(locator.createSessionFactory().getConnection(), factory);
+ manager.start();
- Journal replicatedJournal = new ReplicatedJournal((byte)1, new FakeJournal(), manager);
+ Journal replicatedJournal = new ReplicatedJournal((byte)1, new FakeJournal(), manager);
- TestInterceptor.value.set(false);
+ TestInterceptor.value.set(false);
- for (int i = 0; i < 500; i++)
+ for (int i = 0; i < 500; i++)
+ {
+ replicatedJournal.appendAddRecord(i, (byte)1, new FakeData(), false);
+ }
+
+ final CountDownLatch latch = new CountDownLatch(1);
+ storage.afterCompleteOperations(new IOAsyncTask()
+ {
+
+ public void onError(final int errorCode, final String errorMessage)
{
- replicatedJournal.appendAddRecord(i, (byte)1, new FakeData(), false);
}
- final CountDownLatch latch = new CountDownLatch(1);
- storage.afterCompleteOperations(new IOAsyncTask()
+ public void done()
{
+ latch.countDown();
+ }
+ });
- public void onError(final int errorCode, final String errorMessage)
- {
- }
+ server.stop();
- public void done()
- {
- latch.countDown();
- }
- });
+ Assert.assertTrue(latch.await(50, TimeUnit.SECONDS));
- server.stop();
-
- Assert.assertTrue(latch.await(50, TimeUnit.SECONDS));
- }
- finally
- {
- server.stop();
- }
}
public void testExceptionSettingActionBefore() throws Exception
@@ -510,11 +429,11 @@
public void testNoServer() throws Exception
{
- ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(ServiceTestBase.INVM_CONNECTOR_FACTORY));
+ locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(INVM_CONNECTOR_FACTORY));
try
{
- ReplicationManagerImpl manager = new ReplicationManagerImpl((ClientSessionFactoryInternal) locator.createSessionFactory(), factory);
+ manager = new ReplicationManagerImpl(locator.createSessionFactory().getConnection(), factory);
manager.start();
Assert.fail("Exception expected");
}
@@ -527,119 +446,86 @@
public void testNoActions() throws Exception
{
- Configuration config = createDefaultConfig(false);
+ setupServer(true, false);
+ StorageManager storage = getStorage();
+ manager = new ReplicationManagerImpl(locator.createSessionFactory().getConnection(), factory);
+ manager.start();
- config.setBackup(true);
+ Journal replicatedJournal = new ReplicatedJournal((byte)1, new FakeJournal(), manager);
- HornetQServer server = new HornetQServerImpl(config);
+ replicatedJournal.appendPrepareRecord(1, new FakeData(), false);
- server.start();
-
- ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(ServiceTestBase.INVM_CONNECTOR_FACTORY));
-
- try
+ final CountDownLatch latch = new CountDownLatch(1);
+ storage.afterCompleteOperations(new IOAsyncTask()
{
- StorageManager storage = getStorage();
- ReplicationManagerImpl manager = new ReplicationManagerImpl((ClientSessionFactoryInternal) locator.createSessionFactory(), factory);
- manager.start();
- Journal replicatedJournal = new ReplicatedJournal((byte)1, new FakeJournal(), manager);
+ public void onError(final int errorCode, final String errorMessage)
+ {
+ }
- replicatedJournal.appendPrepareRecord(1, new FakeData(), false);
-
- final CountDownLatch latch = new CountDownLatch(1);
- storage.afterCompleteOperations(new IOAsyncTask()
+ public void done()
{
+ latch.countDown();
+ }
+ });
- public void onError(final int errorCode, final String errorMessage)
- {
- }
+ Assert.assertTrue(latch.await(1, TimeUnit.SECONDS));
- public void done()
- {
- latch.countDown();
- }
- });
-
- Assert.assertTrue(latch.await(1, TimeUnit.SECONDS));
-
- Assert.assertEquals(0, manager.getActiveTokens().size());
- manager.stop();
- }
- finally
- {
- server.stop();
- }
+ Assert.assertEquals(0, manager.getActiveTokens().size());
}
public void testOrderOnNonPersistency() throws Exception
{
- Configuration config = createDefaultConfig(false);
+ setupServer(true, false);
- config.setBackup(true);
+ final ArrayList<Integer> executions = new ArrayList<Integer>();
- HornetQServer server = new HornetQServerImpl(config);
+ StorageManager storage = getStorage();
+ manager = new ReplicationManagerImpl(locator.createSessionFactory().getConnection(), factory);
+ manager.start();
- server.start();
+ Journal replicatedJournal = new ReplicatedJournal((byte)1, new FakeJournal(), manager);
- ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(ServiceTestBase.INVM_CONNECTOR_FACTORY));
+ int numberOfAdds = 200;
- final ArrayList<Integer> executions = new ArrayList<Integer>();
+ final CountDownLatch latch = new CountDownLatch(numberOfAdds);
- try
+ OperationContext ctx = storage.getContext();
+
+ for (int i = 0; i < numberOfAdds; i++)
{
- StorageManager storage = getStorage();
- ReplicationManagerImpl manager = new ReplicationManagerImpl((ClientSessionFactoryInternal) locator.createSessionFactory(), factory);
- manager.start();
+ final int nAdd = i;
- Journal replicatedJournal = new ReplicatedJournal((byte)1, new FakeJournal(), manager);
+ if (i % 2 == 0)
+ {
+ replicatedJournal.appendPrepareRecord(i, new FakeData(), false);
+ }
- int numberOfAdds = 200;
-
- final CountDownLatch latch = new CountDownLatch(numberOfAdds);
-
- OperationContext ctx = storage.getContext();
-
- for (int i = 0; i < numberOfAdds; i++)
+ ctx.executeOnCompletion(new IOAsyncTask()
{
- final int nAdd = i;
- if (i % 2 == 0)
+ public void onError(final int errorCode, final String errorMessage)
{
- replicatedJournal.appendPrepareRecord(i, new FakeData(), false);
}
- ctx.executeOnCompletion(new IOAsyncTask()
+ public void done()
{
+ System.out.println("Add " + nAdd);
+ executions.add(nAdd);
+ latch.countDown();
+ }
+ });
+ }
- public void onError(final int errorCode, final String errorMessage)
- {
- }
+ Assert.assertTrue(latch.await(10, TimeUnit.SECONDS));
- public void done()
- {
- System.out.println("Add " + nAdd);
- executions.add(nAdd);
- latch.countDown();
- }
- });
- }
-
- Assert.assertTrue(latch.await(10, TimeUnit.SECONDS));
-
- for (int i = 0; i < numberOfAdds; i++)
- {
- Assert.assertEquals(i, executions.get(i).intValue());
- }
-
- Assert.assertEquals(0, manager.getActiveTokens().size());
- manager.stop();
- }
- finally
+ for (int i = 0; i < numberOfAdds; i++)
{
- server.stop();
+ Assert.assertEquals(i, executions.get(i).intValue());
}
+
+ Assert.assertEquals(0, manager.getActiveTokens().size());
}
class FakeData implements EncodingSupport
@@ -654,9 +540,6 @@
buffer.writeBytes(new byte[5]);
}
- /* (non-Javadoc)
- * @see org.hornetq.core.journal.EncodingSupport#getEncodeSize()
- */
public int getEncodeSize()
{
return 5;
@@ -685,33 +568,43 @@
@Override
protected void tearDown() throws Exception
{
+ if (manager != null)
+ {
+ if (manager.isStarted())
+ manager.stop();
+ manager = null;
+ }
+ if (server != null)
+ {
+ if (server.isStarted())
+ server.stop();
+ server = null;
+ }
+
executor.shutdown();
scheduledExecutor.shutdown();
tFactory = null;
-
scheduledExecutor = null;
super.tearDown();
}
-
- protected PagingManager createPageManager(final StorageManager storageManager,
- final Configuration configuration,
- final ExecutorFactory executorFactory,
- final HierarchicalRepository<AddressSettings> addressSettingsRepository) throws Exception
+ protected
+ PagingManager
+ createPageManager(final StorageManager storageManager,
+ final Configuration configuration,
+ final ExecutorFactory executorFactory,
+ final HierarchicalRepository<AddressSettings> addressSettingsRepository) throws Exception
{
- PagingManager paging = new PagingManagerImpl(new PagingStoreFactoryNIO(configuration.getPagingDirectory(),
- 1000,
- null,
- executorFactory,
- false),
- storageManager,
- addressSettingsRepository);
+ PagingManager paging =
+ new PagingManagerImpl(new PagingStoreFactoryNIO(configuration.getPagingDirectory(), 1000, null,
+ executorFactory, false), storageManager,
+ addressSettingsRepository);
paging.start();
return paging;
@@ -734,25 +627,22 @@
static class FakeJournal implements Journal
{
- /* (non-Javadoc)
- * @see org.hornetq.core.journal.Journal#appendAddRecord(long, byte, byte[], boolean)
- */
- public void appendAddRecord(final long id, final byte recordType, final byte[] record, final boolean sync) throws Exception
+ public
+ void
+ appendAddRecord(final long id, final byte recordType, final byte[] record, final boolean sync) throws Exception
{
}
- /* (non-Javadoc)
- * @see org.hornetq.core.journal.Journal#appendAddRecord(long, byte, org.hornetq.core.journal.EncodingSupport, boolean)
- */
- public void appendAddRecord(final long id, final byte recordType, final EncodingSupport record, final boolean sync) throws Exception
+ public
+ void appendAddRecord(final long id,
+ final byte recordType,
+ final EncodingSupport record,
+ final boolean sync) throws Exception
{
}
- /* (non-Javadoc)
- * @see org.hornetq.core.journal.Journal#appendAddRecordTransactional(long, long, byte, byte[])
- */
public void appendAddRecordTransactional(final long txID,
final long id,
final byte recordType,
@@ -761,9 +651,6 @@
}
- /* (non-Javadoc)
- * @see org.hornetq.core.journal.Journal#appendAddRecordTransactional(long, long, byte, org.hornetq.core.journal.EncodingSupport)
- */
public void appendAddRecordTransactional(final long txID,
final long id,
final byte recordType,
@@ -772,81 +659,59 @@
}
- /* (non-Javadoc)
- * @see org.hornetq.core.journal.Journal#appendCommitRecord(long, boolean)
- */
public void appendCommitRecord(final long txID, final boolean sync) throws Exception
{
}
- /* (non-Javadoc)
- * @see org.hornetq.core.journal.Journal#appendDeleteRecord(long, boolean)
- */
public void appendDeleteRecord(final long id, final boolean sync) throws Exception
{
}
- /* (non-Javadoc)
- * @see org.hornetq.core.journal.Journal#appendDeleteRecordTransactional(long, long, byte[])
- */
public void appendDeleteRecordTransactional(final long txID, final long id, final byte[] record) throws Exception
{
}
- /* (non-Javadoc)
- * @see org.hornetq.core.journal.Journal#appendDeleteRecordTransactional(long, long, org.hornetq.core.journal.EncodingSupport)
- */
- public void appendDeleteRecordTransactional(final long txID, final long id, final EncodingSupport record) throws Exception
+ public
+ void
+ appendDeleteRecordTransactional(final long txID, final long id, final EncodingSupport record) throws Exception
{
}
- /* (non-Javadoc)
- * @see org.hornetq.core.journal.Journal#appendDeleteRecordTransactional(long, long)
- */
public void appendDeleteRecordTransactional(final long txID, final long id) throws Exception
{
}
- /* (non-Javadoc)
- * @see org.hornetq.core.journal.Journal#appendPrepareRecord(long, org.hornetq.core.journal.EncodingSupport, boolean)
- */
- public void appendPrepareRecord(final long txID, final EncodingSupport transactionData, final boolean sync) throws Exception
+ public
+ void
+ appendPrepareRecord(final long txID, final EncodingSupport transactionData, final boolean sync) throws Exception
{
}
- /* (non-Javadoc)
- * @see org.hornetq.core.journal.Journal#appendPrepareRecord(long, byte[], boolean)
- */
- public void appendPrepareRecord(final long txID, final byte[] transactionData, final boolean sync) throws Exception
+ public
+ void
+ appendPrepareRecord(final long txID, final byte[] transactionData, final boolean sync) throws Exception
{
}
- /* (non-Javadoc)
- * @see org.hornetq.core.journal.Journal#appendRollbackRecord(long, boolean)
- */
public void appendRollbackRecord(final long txID, final boolean sync) throws Exception
{
}
- /* (non-Javadoc)
- * @see org.hornetq.core.journal.Journal#appendUpdateRecord(long, byte, byte[], boolean)
- */
- public void appendUpdateRecord(final long id, final byte recordType, final byte[] record, final boolean sync) throws Exception
+ public
+ void
+ appendUpdateRecord(final long id, final byte recordType, final byte[] record, final boolean sync) throws Exception
{
}
- /* (non-Javadoc)
- * @see org.hornetq.core.journal.Journal#appendUpdateRecord(long, byte, org.hornetq.core.journal.EncodingSupport, boolean)
- */
public void appendUpdateRecord(final long id,
final byte recordType,
final EncodingSupport record,
@@ -855,9 +720,6 @@
}
- /* (non-Javadoc)
- * @see org.hornetq.core.journal.Journal#appendUpdateRecordTransactional(long, long, byte, byte[])
- */
public void appendUpdateRecordTransactional(final long txID,
final long id,
final byte recordType,
@@ -866,9 +728,6 @@
}
- /* (non-Javadoc)
- * @see org.hornetq.core.journal.Journal#appendUpdateRecordTransactional(long, long, byte, org.hornetq.core.journal.EncodingSupport)
- */
public void appendUpdateRecordTransactional(final long txID,
final long id,
final byte recordType,
@@ -877,27 +736,18 @@
}
- /* (non-Javadoc)
- * @see org.hornetq.core.journal.Journal#getAlignment()
- */
public int getAlignment() throws Exception
{
return 0;
}
- /* (non-Javadoc)
- * @see org.hornetq.core.journal.Journal#load(org.hornetq.core.journal.LoaderCallback)
- */
public JournalLoadInformation load(final LoaderCallback reloadManager) throws Exception
{
return new JournalLoadInformation();
}
- /* (non-Javadoc)
- * @see org.hornetq.core.journal.Journal#load(java.util.List, java.util.List, org.hornetq.core.journal.TransactionFailureCallback)
- */
public JournalLoadInformation load(final List<RecordInfo> committedRecords,
final List<PreparedTransactionInfo> preparedTransactions,
final TransactionFailureCallback transactionFailure) throws Exception
@@ -906,58 +756,37 @@
return new JournalLoadInformation();
}
- /* (non-Javadoc)
- * @see org.hornetq.core.journal.Journal#perfBlast(int)
- */
public void perfBlast(final int pages) throws Exception
{
}
- /* (non-Javadoc)
- * @see org.hornetq.core.server.HornetQComponent#isStarted()
- */
public boolean isStarted()
{
return false;
}
- /* (non-Javadoc)
- * @see org.hornetq.core.server.HornetQComponent#start()
- */
public void start() throws Exception
{
}
- /* (non-Javadoc)
- * @see org.hornetq.core.server.HornetQComponent#stop()
- */
public void stop() throws Exception
{
}
- /* (non-Javadoc)
- * @see org.hornetq.core.journal.Journal#loadInternalOnly()
- */
public JournalLoadInformation loadInternalOnly() throws Exception
{
return new JournalLoadInformation();
}
- /* (non-Javadoc)
- * @see org.hornetq.core.journal.Journal#getNumberOfRecords()
- */
public int getNumberOfRecords()
{
return 0;
}
- /* (non-Javadoc)
- * @see org.hornetq.core.journal.Journal#appendAddRecord(long, byte, byte[], boolean, org.hornetq.core.journal.IOCompletion)
- */
public void appendAddRecord(final long id,
final byte recordType,
final byte[] record,
@@ -966,9 +795,6 @@
{
}
- /* (non-Javadoc)
- * @see org.hornetq.core.journal.Journal#appendAddRecord(long, byte, org.hornetq.core.journal.EncodingSupport, boolean, org.hornetq.core.journal.IOCompletion)
- */
public void appendAddRecord(final long id,
final byte recordType,
final EncodingSupport record,
@@ -977,23 +803,16 @@
{
}
- /* (non-Javadoc)
- * @see org.hornetq.core.journal.Journal#appendCommitRecord(long, boolean, org.hornetq.core.journal.IOCompletion)
- */
public void appendCommitRecord(final long txID, final boolean sync, final IOCompletion callback) throws Exception
{
}
- /* (non-Javadoc)
- * @see org.hornetq.core.journal.Journal#appendDeleteRecord(long, boolean, org.hornetq.core.journal.IOCompletion)
- */
- public void appendDeleteRecord(final long id, final boolean sync, final IOCompletion completionCallback) throws Exception
+ public
+ void
+ appendDeleteRecord(final long id, final boolean sync, final IOCompletion completionCallback) throws Exception
{
}
- /* (non-Javadoc)
- * @see org.hornetq.core.journal.Journal#appendPrepareRecord(long, org.hornetq.core.journal.EncodingSupport, boolean, org.hornetq.core.journal.IOCompletion)
- */
public void appendPrepareRecord(final long txID,
final EncodingSupport transactionData,
final boolean sync,
@@ -1001,9 +820,6 @@
{
}
- /* (non-Javadoc)
- * @see org.hornetq.core.journal.Journal#appendPrepareRecord(long, byte[], boolean, org.hornetq.core.journal.IOCompletion)
- */
public void appendPrepareRecord(final long txID,
final byte[] transactionData,
final boolean sync,
@@ -1011,16 +827,12 @@
{
}
- /* (non-Javadoc)
- * @see org.hornetq.core.journal.Journal#appendRollbackRecord(long, boolean, org.hornetq.core.journal.IOCompletion)
- */
- public void appendRollbackRecord(final long txID, final boolean sync, final IOCompletion callback) throws Exception
+ public
+ void
+ appendRollbackRecord(final long txID, final boolean sync, final IOCompletion callback) throws Exception
{
}
- /* (non-Javadoc)
- * @see org.hornetq.core.journal.Journal#appendUpdateRecord(long, byte, byte[], boolean, org.hornetq.core.journal.IOCompletion)
- */
public void appendUpdateRecord(final long id,
final byte recordType,
final byte[] record,
@@ -1029,9 +841,6 @@
{
}
- /* (non-Javadoc)
- * @see org.hornetq.core.journal.Journal#appendUpdateRecord(long, byte, org.hornetq.core.journal.EncodingSupport, boolean, org.hornetq.core.journal.IOCompletion)
- */
public void appendUpdateRecord(final long id,
final byte recordType,
final EncodingSupport record,
@@ -1040,9 +849,6 @@
{
}
- /* (non-Javadoc)
- * @see org.hornetq.core.journal.Journal#sync(org.hornetq.core.journal.IOCompletion)
- */
public void sync(final IOCompletion callback)
{
}
@@ -1051,30 +857,23 @@
{
}
- /* (non-Javadoc)
- * @see org.hornetq.core.journal.Journal#getUserVersion()
- */
public int getUserVersion()
{
return 0;
}
- /* (non-Javadoc)
- * @see org.hornetq.core.journal.Journal#appendCommitRecord(long, boolean, org.hornetq.core.journal.IOCompletion, boolean)
- */
- public void appendCommitRecord(long txID, boolean sync, IOCompletion callback, boolean lineUpContext) throws Exception
+ public
+ void
+ appendCommitRecord(long txID, boolean sync, IOCompletion callback, boolean lineUpContext) throws Exception
{
// TODO Auto-generated method stub
-
+
}
- /* (non-Javadoc)
- * @see org.hornetq.core.journal.Journal#lineUpContex(org.hornetq.core.journal.IOCompletion)
- */
public void lineUpContex(IOCompletion callback)
{
// TODO Auto-generated method stub
-
+
}
}
14 years, 9 months
JBoss hornetq SVN: r10978 - branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server.
by do-not-reply@jboss.org
Author: borges
Date: 2011-07-13 11:33:34 -0400 (Wed, 13 Jul 2011)
New Revision: 10978
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/ServerSession.java
Log:
Remove unused imports.
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/ServerSession.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/ServerSession.java 2011-07-13 15:33:07 UTC (rev 10977)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/ServerSession.java 2011-07-13 15:33:34 UTC (rev 10978)
@@ -15,12 +15,9 @@
import java.util.List;
import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.atomic.AtomicLong;
import javax.transaction.xa.Xid;
-import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.message.impl.MessageInternal;
import org.hornetq.core.persistence.OperationContext;
14 years, 9 months
JBoss hornetq SVN: r10977 - branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl.
by do-not-reply@jboss.org
Author: borges
Date: 2011-07-13 11:33:07 -0400 (Wed, 13 Jul 2011)
New Revision: 10977
Modified:
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java
Log:
Rename lock as previous name was misleading
Modified: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java 2011-07-13 15:32:39 UTC (rev 10976)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java 2011-07-13 15:33:07 UTC (rev 10977)
@@ -204,8 +204,14 @@
// After a record is appended, the usedFile can't be changed until the positives and negatives are updated
private final ReentrantLock lockAppend = new ReentrantLock();
- /** We don't lock the journal while compacting, however we need to lock it while taking and updating snapshots */
- private final ReadWriteLock compactingLock = new ReentrantReadWriteLock();
+ /**
+ * We don't lock the journal during the whole compacting operation. During compacting we only
+ * lock it (i) when gathering the initial structure, and (ii) when replicating the structures
+ * after compacting has finished.
+ * <p>
+ * However, we do need to lock it while taking and updating snapshots.
+ */
+ private final ReadWriteLock journalLock = new ReentrantReadWriteLock();
private volatile JournalFile currentFile;
@@ -817,7 +823,7 @@
{
checkJournalIsLoaded();
- compactingLock.readLock().lock();
+ journalLock.readLock().lock();
try
{
@@ -851,7 +857,7 @@
}
finally
{
- compactingLock.readLock().unlock();
+ journalLock.readLock().unlock();
}
}
@@ -889,7 +895,7 @@
{
checkJournalIsLoaded();
- compactingLock.readLock().lock();
+ journalLock.readLock().lock();
try
{
@@ -942,7 +948,7 @@
}
finally
{
- compactingLock.readLock().unlock();
+ journalLock.readLock().unlock();
}
}
@@ -962,7 +968,7 @@
{
checkJournalIsLoaded();
- compactingLock.readLock().lock();
+ journalLock.readLock().lock();
try
{
@@ -1021,7 +1027,7 @@
}
finally
{
- compactingLock.readLock().unlock();
+ journalLock.readLock().unlock();
}
}
@@ -1038,7 +1044,7 @@
{
checkJournalIsLoaded();
- compactingLock.readLock().lock();
+ journalLock.readLock().lock();
try
{
@@ -1071,7 +1077,7 @@
}
finally
{
- compactingLock.readLock().unlock();
+ journalLock.readLock().unlock();
}
}
@@ -1104,7 +1110,7 @@
{
checkJournalIsLoaded();
- compactingLock.readLock().lock();
+ journalLock.readLock().lock();
try
{
@@ -1137,7 +1143,7 @@
}
finally
{
- compactingLock.readLock().unlock();
+ journalLock.readLock().unlock();
}
}
@@ -1150,7 +1156,7 @@
{
checkJournalIsLoaded();
- compactingLock.readLock().lock();
+ journalLock.readLock().lock();
try
{
@@ -1181,7 +1187,7 @@
}
finally
{
- compactingLock.readLock().unlock();
+ journalLock.readLock().unlock();
}
}
@@ -1239,7 +1245,7 @@
checkJournalIsLoaded();
- compactingLock.readLock().lock();
+ journalLock.readLock().lock();
JournalTransaction tx = getTransactionInfo(txID);
@@ -1273,7 +1279,7 @@
}
finally
{
- compactingLock.readLock().unlock();
+ journalLock.readLock().unlock();
}
}
@@ -1324,7 +1330,7 @@
{
checkJournalIsLoaded();
- compactingLock.readLock().lock();
+ journalLock.readLock().lock();
JournalTransaction tx = transactions.remove(txID);
@@ -1362,7 +1368,7 @@
}
finally
{
- compactingLock.readLock().unlock();
+ journalLock.readLock().unlock();
}
}
@@ -1383,7 +1389,7 @@
{
checkJournalIsLoaded();
- compactingLock.readLock().lock();
+ journalLock.readLock().lock();
JournalTransaction tx = null;
@@ -1418,7 +1424,7 @@
}
finally
{
- compactingLock.readLock().unlock();
+ journalLock.readLock().unlock();
}
}
@@ -1640,7 +1646,7 @@
// We need to guarantee that the journal is frozen for this short time
// We don't freeze the journal as we compact, only for the short time where we replace records
- compactingLock.writeLock().lock();
+ journalLock.writeLock().lock();
try
{
if (state != JournalState.LOADED)
@@ -1685,7 +1691,7 @@
}
finally
{
- compactingLock.writeLock().unlock();
+ journalLock.writeLock().unlock();
}
Collections.sort(dataFilesToProcess, new JournalFileComparator());
@@ -1721,7 +1727,7 @@
SequentialFile controlFile = createControlFile(dataFilesToProcess, compactor.getNewDataFiles(), null);
- compactingLock.writeLock().lock();
+ journalLock.writeLock().lock();
try
{
// Need to clear the compactor here, or the replay commands will send commands back (infinite loop)
@@ -1785,7 +1791,7 @@
}
finally
{
- compactingLock.writeLock().unlock();
+ journalLock.writeLock().unlock();
}
// At this point the journal is unlocked. We keep renaming files while the journal is already operational
@@ -2239,7 +2245,7 @@
}
// We can't start reclaim while compacting is working
- compactingLock.readLock().lock();
+ journalLock.readLock().lock();
try
{
reclaimer.scan(getDataFiles());
@@ -2262,7 +2268,7 @@
}
finally
{
- compactingLock.readLock().unlock();
+ journalLock.readLock().unlock();
}
return false;
@@ -2484,7 +2490,7 @@
// In some tests we need to force the journal to move to a next file
public void forceMoveNextFile() throws Exception
{
- compactingLock.readLock().lock();
+ journalLock.readLock().lock();
try
{
lockAppend.lock();
@@ -2500,7 +2506,7 @@
}
finally
{
- compactingLock.readLock().unlock();
+ journalLock.readLock().unlock();
}
}
14 years, 9 months