JBoss hornetq SVN: r11126 - in branches/Branch_2_2_EAP_cluster_clean2: src/main/org/hornetq/core/server/cluster/impl and 4 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-08-04 20:39:07 -0400 (Thu, 04 Aug 2011)
New Revision: 11126
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/HornetQServer.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/failover/SingleLiveMultipleBackupsFailoverTest.java
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/util/ServiceTestBase.java
Log:
test fixes
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/HornetQServer.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/HornetQServer.java 2011-08-04 23:34:24 UTC (rev 11125)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/HornetQServer.java 2011-08-05 00:39:07 UTC (rev 11126)
@@ -57,6 +57,9 @@
public interface HornetQServer extends HornetQComponent
{
+ /** This method was created mainly for testing, but it may also be used in scenarios where
+ * you need to have more than one Server inside the same VM.
+ * The identity is exposed in the logs, which may help you debug issues using the log traces. */
void setIdentity(String identity);
String getIdentity();
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-08-04 23:34:24 UTC (rev 11125)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-08-05 00:39:07 UTC (rev 11126)
@@ -958,6 +958,17 @@
// for testing
public void clear()
{
+ for (Bridge bridge : bridges.values())
+ {
+ try
+ {
+ bridge.stop();
+ }
+ catch (Exception e)
+ {
+ log.warn(e.getMessage(), e);
+ }
+ }
bridges.clear();
for (ClusterConnection clusterConnection : clusterConnections.values())
{
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-08-04 23:34:24 UTC (rev 11125)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-08-05 00:39:07 UTC (rev 11126)
@@ -849,6 +849,7 @@
return str.toString();
}
+
public void setIdentity(String identity)
{
this.identity = identity;
Modified: branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-08-04 23:34:24 UTC (rev 11125)
+++ branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-08-05 00:39:07 UTC (rev 11126)
@@ -1476,22 +1476,24 @@
{
if (sharedStorage)
{
- server = createInVMFailoverServer(true, configuration, nodeManagers[node]);
+ server = createInVMFailoverServer(true, configuration, nodeManagers[node], node);
}
else
{
server = HornetQServers.newHornetQServer(configuration);
+ server.setIdentity("Server " + node);
}
}
else
{
if (sharedStorage)
{
- server = createInVMFailoverServer(false, configuration, nodeManagers[node]);
+ server = createInVMFailoverServer(false, configuration, nodeManagers[node], node);
}
else
{
server = HornetQServers.newHornetQServer(configuration, false);
+ server.setIdentity("Server " + node);
}
}
@@ -1553,22 +1555,24 @@
{
if (sharedStorage)
{
- server = createInVMFailoverServer(true, configuration, nodeManagers[liveNode]);
+ server = createInVMFailoverServer(true, configuration, nodeManagers[liveNode], liveNode);
}
else
{
server = HornetQServers.newHornetQServer(configuration);
+ server.setIdentity("Server " + liveNode);
}
}
else
{
if (sharedStorage)
{
- server = createInVMFailoverServer(true, configuration, nodeManagers[liveNode]);
+ server = createInVMFailoverServer(true, configuration, nodeManagers[liveNode], liveNode);
}
else
{
server = HornetQServers.newHornetQServer(configuration, false);
+ server.setIdentity("Server " + liveNode);
}
}
server.setIdentity(this.getClass().getSimpleName() + "/Backup(" + node + " of live " + liveNode + ")");
@@ -1632,22 +1636,24 @@
{
if (sharedStorage)
{
- server = createInVMFailoverServer(true, configuration, nodeManagers[node]);
+ server = createInVMFailoverServer(true, configuration, nodeManagers[node], node);
}
else
{
server = HornetQServers.newHornetQServer(configuration);
+ server.setIdentity("Server " + node);
}
}
else
{
if (sharedStorage)
{
- server = createInVMFailoverServer(false, configuration, nodeManagers[node]);
+ server = createInVMFailoverServer(false, configuration, nodeManagers[node], node);
}
else
{
server = HornetQServers.newHornetQServer(configuration, false);
+ server.setIdentity("Server " + node);
}
}
servers[node] = server;
@@ -1719,18 +1725,19 @@
{
if (sharedStorage)
{
- server = createInVMFailoverServer(true, configuration, nodeManagers[liveNode]);
+ server = createInVMFailoverServer(true, configuration, nodeManagers[liveNode], liveNode);
}
else
{
server = HornetQServers.newHornetQServer(configuration);
+ server.setIdentity("Server " + liveNode);
}
}
else
{
if (sharedStorage)
{
- server = createInVMFailoverServer(false, configuration, nodeManagers[liveNode]);
+ server = createInVMFailoverServer(false, configuration, nodeManagers[liveNode], liveNode);
}
else
{
Modified: branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2011-08-04 23:34:24 UTC (rev 11125)
+++ branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2011-08-05 00:39:07 UTC (rev 11126)
@@ -113,12 +113,12 @@
protected TestableServer createLiveServer()
{
- return new SameProcessHornetQServer(createInVMFailoverServer(true, liveConfig, nodeManager));
+ return new SameProcessHornetQServer(createInVMFailoverServer(true, liveConfig, nodeManager, 1));
}
protected TestableServer createBackupServer()
{
- return new SameProcessHornetQServer(createInVMFailoverServer(true, backupConfig, nodeManager));
+ return new SameProcessHornetQServer(createInVMFailoverServer(true, backupConfig, nodeManager, 2));
}
/**
Modified: branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java 2011-08-04 23:34:24 UTC (rev 11125)
+++ branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java 2011-08-05 00:39:07 UTC (rev 11126)
@@ -25,6 +25,7 @@
import org.hornetq.core.client.impl.ServerLocatorInternal;
import org.hornetq.core.config.ClusterConnectionConfiguration;
import org.hornetq.core.config.Configuration;
+import org.hornetq.core.logging.Logger;
import org.hornetq.core.server.NodeManager;
import org.hornetq.core.server.impl.InVMNodeManager;
import org.hornetq.tests.integration.cluster.util.SameProcessHornetQServer;
@@ -55,7 +56,7 @@
}
super.tearDown();
}
-
+
public void testMultipleFailovers2LiveServers() throws Exception
{
NodeManager nodeManager1 = new InVMNodeManager();
@@ -157,7 +158,7 @@
config1.setPagingDirectory(config1.getPagingDirectory() + "_" + liveNode);
config1.setLargeMessagesDirectory(config1.getLargeMessagesDirectory() + "_" + liveNode);
- servers.put(nodeid, new SameProcessHornetQServer(createInVMFailoverServer(true, config1, nodeManager)));
+ servers.put(nodeid, new SameProcessHornetQServer(createInVMFailoverServer(true, config1, nodeManager, liveNode)));
}
protected void createLiveConfig(NodeManager nodeManager, int liveNode, int ... otherLiveNodes)
@@ -187,7 +188,7 @@
config0.setPagingDirectory(config0.getPagingDirectory() + "_" + liveNode);
config0.setLargeMessagesDirectory(config0.getLargeMessagesDirectory() + "_" + liveNode);
- servers.put(liveNode, new SameProcessHornetQServer(createInVMFailoverServer(true, config0, nodeManager)));
+ servers.put(liveNode, new SameProcessHornetQServer(createInVMFailoverServer(true, config0, nodeManager, liveNode)));
}
protected boolean isNetty()
Modified: branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java 2011-08-04 23:34:24 UTC (rev 11125)
+++ branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java 2011-08-05 00:39:07 UTC (rev 11126)
@@ -243,7 +243,8 @@
PagingFailoverTest.PAGE_SIZE,
PagingFailoverTest.PAGE_MAX,
new HashMap<String, AddressSettings>(),
- nodeManager);
+ nodeManager,
+ 2);
}
@Override
Modified: branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/failover/SingleLiveMultipleBackupsFailoverTest.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/failover/SingleLiveMultipleBackupsFailoverTest.java 2011-08-04 23:34:24 UTC (rev 11125)
+++ branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/failover/SingleLiveMultipleBackupsFailoverTest.java 2011-08-05 00:39:07 UTC (rev 11126)
@@ -125,7 +125,7 @@
config1.setPagingDirectory(config1.getPagingDirectory() + "_" + liveNode);
config1.setLargeMessagesDirectory(config1.getLargeMessagesDirectory() + "_" + liveNode);
- servers.put(nodeid, new SameProcessHornetQServer(createInVMFailoverServer(true, config1, nodeManager)));
+ servers.put(nodeid, new SameProcessHornetQServer(createInVMFailoverServer(true, config1, nodeManager, nodeid)));
}
protected void createLiveConfig(int liveNode, int ... otherLiveNodes)
@@ -155,7 +155,7 @@
config0.setPagingDirectory(config0.getPagingDirectory() + "_" + liveNode);
config0.setLargeMessagesDirectory(config0.getLargeMessagesDirectory() + "_" + liveNode);
- servers.put(liveNode, new SameProcessHornetQServer(createInVMFailoverServer(true, config0, nodeManager)));
+ servers.put(liveNode, new SameProcessHornetQServer(createInVMFailoverServer(true, config0, nodeManager, liveNode)));
}
protected boolean isNetty()
Modified: branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/util/ServiceTestBase.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/util/ServiceTestBase.java 2011-08-04 23:34:24 UTC (rev 11125)
+++ branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/util/ServiceTestBase.java 2011-08-05 00:39:07 UTC (rev 11126)
@@ -284,14 +284,16 @@
protected HornetQServer createInVMFailoverServer(final boolean realFiles,
final Configuration configuration,
- NodeManager nodeManager)
+ final NodeManager nodeManager,
+ final int id)
{
return createInVMFailoverServer(realFiles,
configuration,
-1,
-1,
new HashMap<String, AddressSettings>(),
- nodeManager);
+ nodeManager,
+ id);
}
protected HornetQServer createInVMFailoverServer(final boolean realFiles,
@@ -299,7 +301,8 @@
final int pageSize,
final int maxAddressSize,
final Map<String, AddressSettings> settings,
- NodeManager nodeManager)
+ NodeManager nodeManager,
+ final int id)
{
HornetQServer server;
HornetQSecurityManager securityManager = new HornetQSecurityManagerImpl();
@@ -308,6 +311,8 @@
ManagementFactory.getPlatformMBeanServer(),
securityManager,
nodeManager);
+
+ server.setIdentity("Server " + id);
for (Map.Entry<String, AddressSettings> setting : settings.entrySet())
{
12 years, 10 months
JBoss hornetq SVN: r11125 - in branches/Branch_2_2_EAP_cluster_clean2: tests/src/org/hornetq/tests/integration/http and 2 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-08-04 19:34:24 -0400 (Thu, 04 Aug 2011)
New Revision: 11125
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/remoting/impl/netty/HttpAcceptorHandler.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/http/CoreClientOverHttpTest.java
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/jms/bridge/BridgeTestBase.java
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/util/UnitTestCase.java
Log:
fixing thread leakages on the testsuite
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/remoting/impl/netty/HttpAcceptorHandler.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/remoting/impl/netty/HttpAcceptorHandler.java 2011-08-04 20:33:29 UTC (rev 11124)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/remoting/impl/netty/HttpAcceptorHandler.java 2011-08-04 23:34:24 UTC (rev 11125)
@@ -13,7 +13,7 @@
package org.hornetq.core.remoting.impl.netty;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -45,7 +45,7 @@
private final BlockingQueue<Runnable> delayedResponses = new LinkedBlockingQueue<Runnable>();
- private final Executor executor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, delayedResponses);
+ private final ExecutorService executor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, delayedResponses);
private final HttpKeepAliveRunnable httpKeepAliveTask;
@@ -211,6 +211,19 @@
}
}
+
+
+ public void shutdown()
+ {
+ executor.shutdown();
+ try
+ {
+ executor.awaitTermination(10, TimeUnit.SECONDS);
+ }
+ catch (Exception e)
+ {
+ }
+ }
/**
* a holder class so we know what time the request first arrived
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java 2011-08-04 20:33:29 UTC (rev 11124)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java 2011-08-04 23:34:24 UTC (rev 11125)
@@ -136,6 +136,8 @@
private final int nioRemotingThreads;
private final HttpKeepAliveRunnable httpKeepAliveRunnable;
+
+ private HttpAcceptorHandler httpHandler = null;
private final ConcurrentMap<Object, NettyConnection> connections = new ConcurrentHashMap<Object, NettyConnection>();
@@ -352,7 +354,8 @@
handlers.put("http-encoder", new HttpResponseEncoder());
- handlers.put("http-handler", new HttpAcceptorHandler(httpKeepAliveRunnable, httpResponseTime));
+ httpHandler = new HttpAcceptorHandler(httpKeepAliveRunnable, httpResponseTime);
+ handlers.put("http-handler", httpHandler);
}
if (protocol == ProtocolType.CORE)
@@ -555,6 +558,11 @@
e.printStackTrace();
}
}
+
+ if (httpHandler != null)
+ {
+ httpHandler.shutdown();
+ }
paused = false;
}
Modified: branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/http/CoreClientOverHttpTest.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/http/CoreClientOverHttpTest.java 2011-08-04 20:33:29 UTC (rev 11124)
+++ branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/http/CoreClientOverHttpTest.java 2011-08-04 23:34:24 UTC (rev 11125)
@@ -19,9 +19,14 @@
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.*;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.config.Configuration;
-import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory;
import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
import org.hornetq.core.remoting.impl.netty.TransportConstants;
Modified: branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/jms/bridge/BridgeTestBase.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/jms/bridge/BridgeTestBase.java 2011-08-04 20:33:29 UTC (rev 11124)
+++ branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/jms/bridge/BridgeTestBase.java 2011-08-04 23:34:24 UTC (rev 11125)
@@ -33,6 +33,8 @@
import junit.framework.Assert;
+import com.arjuna.ats.arjuna.coordinator.TransactionReaper;
+import com.arjuna.ats.arjuna.coordinator.TxControl;
import com.arjuna.ats.internal.jta.transaction.arjunacore.TransactionManagerImple;
import org.hornetq.api.core.TransportConfiguration;
@@ -209,6 +211,11 @@
context0 = null;
context1 = null;
+
+ // Shutting down Arjuna threads
+ TxControl.disable(true);
+
+ TransactionReaper.terminate(false);
super.tearDown();
}
Modified: branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/util/UnitTestCase.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/util/UnitTestCase.java 2011-08-04 20:33:29 UTC (rev 11124)
+++ branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/util/UnitTestCase.java 2011-08-04 23:34:24 UTC (rev 11125)
@@ -927,7 +927,6 @@
this.getName() +
" on this following dump"));
fail("test left broadcastgroupimpl running, this could effect other tests");
- // System.exit(0);
}
}
}
12 years, 10 months
JBoss hornetq SVN: r11124 - branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/stomp.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-08-04 16:33:29 -0400 (Thu, 04 Aug 2011)
New Revision: 11124
Modified:
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/stomp/StompTestBase.java
Log:
tweak
Modified: branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/stomp/StompTestBase.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/stomp/StompTestBase.java 2011-08-04 20:32:15 UTC (rev 11123)
+++ branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/stomp/StompTestBase.java 2011-08-04 20:33:29 UTC (rev 11124)
@@ -87,6 +87,8 @@
protected void setUp() throws Exception
{
super.setUp();
+
+ forceGC();
server = createServer();
server.start();
12 years, 10 months
JBoss hornetq SVN: r11123 - in branches/Branch_2_2_EAP_cluster_clean2: tests/src/org/hornetq/tests/integration/cluster/distribution and 1 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-08-04 16:32:15 -0400 (Thu, 04 Aug 2011)
New Revision: 11123
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/util/UnitTestCase.java
Log:
tweak
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-08-04 15:30:21 UTC (rev 11122)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-08-04 20:32:15 UTC (rev 11123)
@@ -1404,20 +1404,6 @@
}
}
- public static void shutdown()
- {
- if (globalScheduledThreadPool != null)
- {
- globalScheduledThreadPool.shutdown();
- globalScheduledThreadPool = null;
- }
- if (globalThreadPool != null)
- {
- globalThreadPool.shutdown();
- globalThreadPool = null;
- }
- }
-
class StaticConnector implements Serializable
{
private static final long serialVersionUID = 6772279632415242634l;
Modified: branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-08-04 15:30:21 UTC (rev 11122)
+++ branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-08-04 20:32:15 UTC (rev 11123)
@@ -111,6 +111,9 @@
}
locators = new ServerLocator[ClusterTestBase.MAX_SERVERS];
+
+ // To make sure the test will start with a clean VM
+ forceGC();
}
@@ -148,8 +151,6 @@
nodeManagers = null;
super.tearDown();
-
- // ServerLocatorImpl.shutdown();
}
// Private -------------------------------------------------------------------------------------------------------
Modified: branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/util/UnitTestCase.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/util/UnitTestCase.java 2011-08-04 15:30:21 UTC (rev 11122)
+++ branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/util/UnitTestCase.java 2011-08-04 20:32:15 UTC (rev 11123)
@@ -259,6 +259,7 @@
public static void forceGC()
{
+ log.info("#test forceGC");
WeakReference<Object> dumbReference = new WeakReference<Object>(new Object());
// A loop that will wait GC, using the minimal time as possible
while (dumbReference.get() != null)
@@ -266,12 +267,13 @@
System.gc();
try
{
- Thread.sleep(500);
+ Thread.sleep(100);
}
catch (InterruptedException e)
{
}
}
+ log.info("#test forceGC Done");
}
public static void forceGC(Reference<?> ref, long timeout)
@@ -930,13 +932,49 @@
}
}
+
+
+ StringBuffer buffer = null;
+
+ boolean failed = true;
+
+ long timeout = System.currentTimeMillis() + 10000;
+ while (failed && timeout > System.currentTimeMillis())
+ {
+ buffer = new StringBuffer();
+
+ failed = checkThread(buffer);
+
+ if (failed)
+ {
+ forceGC();
+ Thread.sleep(500);
+ log.info("There are still threads running, trying again");
+ }
+ }
+
+ if (failed)
+ {
+ logAndSystemOut("Thread leaged on test " + this.getClass().getName() + "::" +
+ this.getName() + "\n" + buffer.toString());
+ fail("Thread leakage");
+ }
+
+ super.tearDown();
+ }
+
+ /**
+ * @param buffer
+ * @return
+ */
+ private boolean checkThread(StringBuffer buffer)
+ {
+ boolean failedThread = false;
+
Map<Thread, StackTraceElement[]> postThreads = Thread.getAllStackTraces();
- boolean failedThread = false;
if (postThreads.size() > previousThreads.size())
{
- StringBuffer buffer = new StringBuffer();
-
buffer.append("*********************************************************************************\n");
buffer.append("LEAKING THREADS\n");
@@ -958,13 +996,8 @@
}
buffer.append("*********************************************************************************\n");
- System.out.println(buffer.toString());
-
}
-
- //assertFalse("Thread Failed", failedThread);
-
- super.tearDown();
+ return failedThread;
}
/**
12 years, 10 months
JBoss hornetq SVN: r11122 - in branches/HORNETQ-720_Replication: hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl and 5 other directories.
by do-not-reply@jboss.org
Author: borges
Date: 2011-08-04 11:30:21 -0400 (Thu, 04 Aug 2011)
New Revision: 11122
Added:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationStartSyncMessage.java
Removed:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationFileIdMessage.java
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/ReplicatingJournal.java
Log:
HORNETQ-720 Avoid record counting in the journal during replication sync
- add new journal state "SYNC" as the Journal cannot be considered "LOADED".
- rename R*SendFileIdMessage to R*StartSyncMessage as it does more than fileID reservation
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-08-04 15:28:56 UTC (rev 11121)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-08-04 15:30:21 UTC (rev 11122)
@@ -447,7 +447,7 @@
{
jf.setCanReclaim(false);
}
- replicator.reserveFileIds(datafiles, contentType);
+ replicator.sendStartSyncMessage(datafiles, contentType);
return datafiles;
}
@@ -1674,7 +1674,6 @@
JournalLoadInformation[] info = new JournalLoadInformation[2];
info[0] = bindingsJournal.loadInternalOnly();
info[1] = messageJournal.loadInternalOnly();
-
return info;
}
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java 2011-08-04 15:28:56 UTC (rev 11121)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java 2011-08-04 15:30:21 UTC (rev 11122)
@@ -106,7 +106,7 @@
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationCompareDataMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationDeleteMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationDeleteTXMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.ReplicationFileIdMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.ReplicationStartSyncMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationJournalFileMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageBeingMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageWriteMessage;
@@ -535,7 +535,7 @@
}
case PacketImpl.REPLICATION_FILE_ID:
{
- packet = new ReplicationFileIdMessage();
+ packet = new ReplicationStartSyncMessage();
break;
}
case PacketImpl.REPLICATION_SYNC:
Deleted: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationFileIdMessage.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationFileIdMessage.java 2011-08-04 15:28:56 UTC (rev 11121)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationFileIdMessage.java 2011-08-04 15:30:21 UTC (rev 11122)
@@ -1,69 +0,0 @@
-package org.hornetq.core.protocol.core.impl.wireformat;
-
-import org.hornetq.api.core.HornetQBuffer;
-import org.hornetq.core.journal.impl.JournalFile;
-import org.hornetq.core.persistence.impl.journal.JournalStorageManager.JournalContent;
-import org.hornetq.core.protocol.core.impl.PacketImpl;
-
-/**
- * Sends all fileIDs used in the live server to the backup. This is done so that we:
- * <ol>
- * <li>reserve those IDs in the backup;
- * <li>start replicating while the journal synchronization is taking place.
- * </ol>
- */
-public class ReplicationFileIdMessage extends PacketImpl
-{
-
- private long[] ids;
- private JournalContent journalType;
-
- public ReplicationFileIdMessage()
- {
- super(REPLICATION_FILE_ID);
- }
-
- public ReplicationFileIdMessage(JournalFile[] datafiles, JournalContent contentType)
- {
- this();
- ids = new long[datafiles.length];
- for (int i = 0; i < datafiles.length; i++)
- {
- ids[i] = datafiles[i].getFileID();
- }
- journalType = contentType;
- }
-
- @Override
- public void encodeRest(final HornetQBuffer buffer)
- {
- buffer.writeByte(journalType.typeByte);
- buffer.writeInt(ids.length);
- for (long id : ids)
- {
- buffer.writeLong(id);
- }
- }
-
- @Override
- public void decodeRest(final HornetQBuffer buffer)
- {
- journalType = JournalContent.getType(buffer.readByte());
- int length = buffer.readInt();
- ids = new long[length];
- for (int i = 0; i < length; i++)
- {
- ids[i] = buffer.readLong();
- }
- }
-
- public JournalContent getJournalContentType()
- {
- return journalType;
- }
-
- public long[] getFileIds()
- {
- return ids;
- }
-}
Copied: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationStartSyncMessage.java (from rev 11121, branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationFileIdMessage.java)
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationStartSyncMessage.java (rev 0)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationStartSyncMessage.java 2011-08-04 15:30:21 UTC (rev 11122)
@@ -0,0 +1,68 @@
+package org.hornetq.core.protocol.core.impl.wireformat;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.core.journal.impl.JournalFile;
+import org.hornetq.core.persistence.impl.journal.JournalStorageManager.JournalContent;
+import org.hornetq.core.protocol.core.impl.PacketImpl;
+
+/**
+ * Sends all fileIDs used in the live server to the backup. This is done so that we:
+ * <ol>
+ * <li>reserve those IDs in the backup;
+ * <li>start replicating while the journal synchronization is taking place.
+ * </ol>
+ */
+public class ReplicationStartSyncMessage extends PacketImpl
+{
+ private long[] ids;
+ private JournalContent journalType;
+
+ public ReplicationStartSyncMessage()
+ {
+ super(REPLICATION_FILE_ID);
+ }
+
+ public ReplicationStartSyncMessage(JournalFile[] datafiles, JournalContent contentType)
+ {
+ this();
+ ids = new long[datafiles.length];
+ for (int i = 0; i < datafiles.length; i++)
+ {
+ ids[i] = datafiles[i].getFileID();
+ }
+ journalType = contentType;
+ }
+
+ @Override
+ public void encodeRest(final HornetQBuffer buffer)
+ {
+ buffer.writeByte(journalType.typeByte);
+ buffer.writeInt(ids.length);
+ for (long id : ids)
+ {
+ buffer.writeLong(id);
+ }
+ }
+
+ @Override
+ public void decodeRest(final HornetQBuffer buffer)
+ {
+ journalType = JournalContent.getType(buffer.readByte());
+ int length = buffer.readInt();
+ ids = new long[length];
+ for (int i = 0; i < length; i++)
+ {
+ ids[i] = buffer.readLong();
+ }
+ }
+
+ public JournalContent getJournalContentType()
+ {
+ return journalType;
+ }
+
+ public long[] getFileIds()
+ {
+ return ids;
+ }
+}
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java 2011-08-04 15:28:56 UTC (rev 11121)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java 2011-08-04 15:30:21 UTC (rev 11122)
@@ -99,7 +99,7 @@
* @param contentType
* @throws HornetQException
*/
- void reserveFileIds(JournalFile[] datafiles, JournalContent contentType) throws HornetQException;
+ void sendStartSyncMessage(JournalFile[] datafiles, JournalContent contentType) throws HornetQException;
/**
* Informs backup that data synchronization is done.
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-08-04 15:28:56 UTC (rev 11121)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-08-04 15:30:21 UTC (rev 11122)
@@ -28,6 +28,7 @@
import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.journal.impl.JournalFile;
import org.hornetq.core.journal.impl.JournalImpl;
+import org.hornetq.core.journal.impl.ReplicatingJournal;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.paging.Page;
import org.hornetq.core.paging.PagedMessage;
@@ -47,7 +48,6 @@
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationCompareDataMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationDeleteMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationDeleteTXMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.ReplicationFileIdMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationJournalFileMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageBeingMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageWriteMessage;
@@ -56,6 +56,7 @@
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationPageWriteMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationPrepareMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationResponseMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.ReplicationStartSyncMessage;
import org.hornetq.core.replication.ReplicationEndpoint;
import org.hornetq.core.server.LargeServerMessage;
import org.hornetq.core.server.ServerMessage;
@@ -81,16 +82,18 @@
private Journal[] journals;
private JournalLoadInformation[] journalLoadInformation;
- // Files reserved in each journal for synchronization of existing data from the 'live' server
+
+ /** Files reserved in each journal for synchronization of existing data from the 'live' server. */
private final Map<JournalContent, Map<Long, JournalFile>> filesReservedForSync =
new HashMap<JournalContent, Map<Long, JournalFile>>();
+ /** Used to hold the real Journals before the backup is synchronized. */
+ private final Map<JournalContent, Journal> journalsHolder = new HashMap<JournalContent, Journal>();
+
private JournalStorageManager storage;
private PagingManager pageManager;
-
-
private final ConcurrentMap<SimpleString, ConcurrentMap<Integer, Page>> pageIndex =
new ConcurrentHashMap<SimpleString, ConcurrentMap<Integer, Page>>();
private final ConcurrentMap<Long, LargeServerMessage> largeMessages =
@@ -191,7 +194,7 @@
}
else if (type == PacketImpl.REPLICATION_FILE_ID)
{
- handleJournalFileIdReservation((ReplicationFileIdMessage)packet);
+ handleStartReplicationSynchronization((ReplicationStartSyncMessage)packet);
}
else if (type == PacketImpl.REPLICATION_SYNC)
{
@@ -236,11 +239,12 @@
server.getManagementService().setStorageManager(storage);
- registerJournal(JournalContent.BINDINGS.typeByte, storage.getBindingsJournal());
- registerJournal(JournalContent.MESSAGES.typeByte, storage.getMessageJournal());
+ journalsHolder.put(JournalContent.BINDINGS, storage.getBindingsJournal());
+ journalsHolder.put(JournalContent.MESSAGES, storage.getMessageJournal());
for (JournalContent jc : EnumSet.allOf(JournalContent.class))
{
+ System.out.println("State? " + journalsHolder.get(jc));
filesReservedForSync.put(jc, new HashMap<Long, JournalFile>());
}
@@ -257,7 +261,7 @@
pageManager.start();
- started = true;
+ started = true;
}
@@ -389,9 +393,9 @@
{
if (msg.isUpToDate())
{
- for (Journal journal2 : journals)
+ for (JournalContent jc : EnumSet.allOf(JournalContent.class))
{
- JournalImpl journal = (JournalImpl)journal2;
+ JournalImpl journal = (JournalImpl)journalsHolder.get(jc);
journal.writeLock();
try
{
@@ -401,6 +405,7 @@
}
// files should be already in place.
filesReservedForSync.remove(msg.getJournalContent());
+ registerJournal(jc.typeByte, journalsHolder.get(jc));
// XXX HORNETQ-720 must reload journals
// XXX HORNETQ-720 must start using real journals
}
@@ -417,6 +422,7 @@
long id = msg.getFileId();
JournalFile journalFile = filesReservedForSync.get(msg.getJournalContent()).get(Long.valueOf(id));
+
byte[] data = msg.getData();
if (data == null)
{
@@ -433,18 +439,27 @@
}
}
- private void handleJournalFileIdReservation(final ReplicationFileIdMessage packet) throws Exception
+ /**
+ * Reserves files (with the given fileID) in the specified journal, and places a
+ * {@link ReplicatingJournal} in place to store messages while synchronization is going on.
+ * @param packet
+ * @throws Exception
+ */
+ private void handleStartReplicationSynchronization(final ReplicationStartSyncMessage packet) throws Exception
{
if (server.isRemoteBackupUpToDate())
{
throw new HornetQException(HornetQException.INTERNAL_ERROR, "RemoteBackup can not be up-to-date!");
}
- final Journal journalIf = journals[packet.getJournalContentType().typeByte];
+ final Journal journalIf = journalsHolder.get(packet.getJournalContentType());
JournalImpl journal = assertJournalImpl(journalIf);
- journal.createFilesForRemoteSync(packet.getFileIds(), filesReservedForSync.get(packet.getJournalContentType()));
+ Map<Long, JournalFile> mapToFill = filesReservedForSync.get(packet.getJournalContentType());
+ JournalFile current = journal.createFilesForRemoteSync(packet.getFileIds(), mapToFill);
+ registerJournal(packet.getJournalContentType().typeByte, new ReplicatingJournal(current));
}
+ // XXX HORNETQ-720 really need to do away with this once the method calls get stable.
private static JournalImpl assertJournalImpl(final Journal journalIf) throws HornetQException
{
if (!(journalIf instanceof JournalImpl))
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2011-08-04 15:28:56 UTC (rev 11121)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2011-08-04 15:30:21 UTC (rev 11122)
@@ -44,7 +44,6 @@
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationCompareDataMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationDeleteMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationDeleteTXMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.ReplicationFileIdMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationJournalFileMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageBeingMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageWriteMessage;
@@ -52,6 +51,7 @@
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationPageEventMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationPageWriteMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationPrepareMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.ReplicationStartSyncMessage;
import org.hornetq.core.replication.ReplicationManager;
import org.hornetq.utils.ExecutorFactory;
@@ -509,7 +509,7 @@
public void sendJournalFile(JournalFile jf, JournalContent content) throws Exception
{
SequentialFile file = jf.getFile().copy();
- log.info("Replication: sending " + jf + " to backup. " + file);
+ log.info("Replication: sending " + jf + " (size=" + file.size() + ") to backup. " + file);
if (!file.isOpen())
{
file.open(1, false);
@@ -531,9 +531,9 @@
}
@Override
- public void reserveFileIds(JournalFile[] datafiles, JournalContent contentType) throws HornetQException
+ public void sendStartSyncMessage(JournalFile[] datafiles, JournalContent contentType) throws HornetQException
{
- sendReplicatePacket(new ReplicationFileIdMessage(datafiles, contentType));
+ sendReplicatePacket(new ReplicationStartSyncMessage(datafiles, contentType));
}
@Override
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-08-04 15:28:56 UTC (rev 11121)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-08-04 15:30:21 UTC (rev 11122)
@@ -565,7 +565,7 @@
if (liveServerSessionFactory == null)
{
- // XXX
+ // XXX HORNETQ-720
throw new RuntimeException("Need to retry...");
}
CoreRemotingConnection liveConnection = liveServerSessionFactory.getConnection();
Modified: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java 2011-08-04 15:28:56 UTC (rev 11121)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java 2011-08-04 15:30:21 UTC (rev 11122)
@@ -77,7 +77,9 @@
private enum JournalState
{
- STOPPED, STARTED, LOADED;
+ STOPPED, STARTED,
+ /** When a replicating server is still not synchronized with its live. */
+ SYNCING, LOADED;
}
// Constants -----------------------------------------------------
@@ -100,7 +102,7 @@
// Journal
private static final void trace(final String message)
{
- JournalImpl.log.trace(message);
+ JournalImpl.log.info(message);
}
private static final void traceRecord(final String message)
@@ -304,6 +306,12 @@
this.userVersion = userVersion;
}
+ @Override
+ public String toString()
+ {
+ return super.toString() + " " + state;
+ }
+
public void runDirectJournalBlast() throws Exception
{
final int numIts = 100000000;
@@ -411,7 +419,6 @@
// since we can re-use dataFiles
Collections.sort(orderedFiles, new JournalFileComparator());
-
return orderedFiles;
}
@@ -1425,6 +1432,7 @@
}
}
+ // XXX make it protected?
public int getAlignment() throws Exception
{
return fileFactory.getAlignment();
@@ -1458,7 +1466,7 @@
}
};
- return this.load(dummyLoader);
+ return this.load(dummyLoader, true, true);
}
public JournalLoadInformation load(final List<RecordInfo> committedRecords,
@@ -1765,9 +1773,9 @@
localCompactor.replayPendingCommands();
- // Merge transactions back after compacting
- // This has to be done after the replay pending commands, as we need to delete committs that happened during
- // the compacting
+ // Merge transactions back after compacting.
+ // This has to be done after the replay pending commands, as we need to delete commits
+ // that happened during the compacting
for (JournalTransaction newTransaction : localCompactor.getNewTransactions().values())
{
@@ -1868,11 +1876,20 @@
return load(loadManager, true);
}
- public synchronized JournalLoadInformation load(final LoaderCallback loadManager, boolean fixFailingTransactions) throws Exception
+ public JournalLoadInformation load(final LoaderCallback loadManager, boolean fixFailingTransactions)
+ throws Exception
{
+ return load(loadManager, fixFailingTransactions, false);
+ }
+
+ private synchronized JournalLoadInformation load(final LoaderCallback loadManager, boolean fixFailingTransactions,
+ final boolean replicationSync) throws Exception
+ {
+
if (state != JournalState.STARTED)
{
- throw new IllegalStateException("Journal " + this + " must be in started state, was " + state);
+ throw new IllegalStateException("Journal " + this + " must be in " + JournalState.STARTED + " state, was " +
+ state);
}
checkControlFile();
@@ -2162,6 +2179,13 @@
}
}
+ if (replicationSync)
+ {
+ assert filesRepository.getDataFiles().isEmpty();
+ setJournalState(JournalState.SYNCING);
+ return new JournalLoadInformation(0, -1);
+ }
+
// Create any more files we need
filesRepository.ensureMinFiles();
@@ -3144,11 +3168,6 @@
}
}
- private HornetQBuffer newBuffer(final int size)
- {
- return HornetQBuffers.fixedBuffer(size);
- }
-
// Inner classes
// ---------------------------------------------------------------------------
@@ -3157,11 +3176,6 @@
private static NullEncoding instance = new NullEncoding();
- public static NullEncoding getInstance()
- {
- return NullEncoding.instance;
- }
-
public void decode(final HornetQBuffer buffer)
{
}
@@ -3271,9 +3285,10 @@
/**
* @param fileIds
+ * @return
* @throws Exception
*/
- public void createFilesForRemoteSync(long[] fileIds, Map<Long, JournalFile> map) throws Exception
+ public JournalFile createFilesForRemoteSync(long[] fileIds, Map<Long, JournalFile> map) throws Exception
{
writeLock();
try
@@ -3285,14 +3300,18 @@
maxID = Math.max(maxID, id);
map.put(Long.valueOf(id), filesRepository.createRemoteBackupSyncFile(id));
}
- if (maxID > 0)
- {
- filesRepository.setNextFileID(maxID);
- }
+ maxID += 1;
+ filesRepository.setNextFileID(maxID);
+ return filesRepository.createRemoteBackupSyncFile(maxID);
}
finally
{
writeUnlock();
}
}
+
+ public boolean getAutoReclaim()
+ {
+ return autoReclaim;
+ }
}
Modified: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/ReplicatingJournal.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/ReplicatingJournal.java 2011-08-04 15:28:56 UTC (rev 11121)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/ReplicatingJournal.java 2011-08-04 15:30:21 UTC (rev 11122)
@@ -1,29 +1,277 @@
package org.hornetq.core.journal.impl;
-import org.hornetq.core.journal.SequentialFileFactory;
+import java.util.List;
+import org.hornetq.core.journal.EncodingSupport;
+import org.hornetq.core.journal.IOCompletion;
+import org.hornetq.core.journal.Journal;
+import org.hornetq.core.journal.JournalLoadInformation;
+import org.hornetq.core.journal.LoaderCallback;
+import org.hornetq.core.journal.PreparedTransactionInfo;
+import org.hornetq.core.journal.RecordInfo;
+import org.hornetq.core.journal.TransactionFailureCallback;
+
/**
* Journal used at a replicating backup server during the synchronization of data with the 'live'
* server.
+ * <p>
+ * Its main purpose is to store the data like a Journal would but without verifying records.
*/
-public class ReplicatingJournal extends JournalImpl
+public class ReplicatingJournal implements Journal
{
+ private final JournalFile file;
+
/**
- * @param fileSize
- * @param minFiles
- * @param compactMinFiles
- * @param compactPercentage
- * @param fileFactory
- * @param filePrefix
- * @param fileExtension
- * @param maxAIO
+ * @param file
*/
- public ReplicatingJournal(int fileSize, int minFiles, int compactMinFiles, int compactPercentage,
- SequentialFileFactory fileFactory, String filePrefix, String fileExtension, int maxAIO)
+ public ReplicatingJournal(JournalFile file)
{
- super(fileSize, minFiles, compactMinFiles, compactPercentage, fileFactory, filePrefix, fileExtension, maxAIO);
+ this.file = file;
}
+ @Override
+ public void start() throws Exception
+ {
+ // TODO Auto-generated method stub
+ }
+
+ @Override
+ public void stop() throws Exception
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public boolean isStarted()
+ {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+ @Override
+ public void appendAddRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void appendAddRecord(long id, byte recordType, byte[] record, boolean sync, IOCompletion completionCallback)
+ throws Exception
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void appendAddRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void appendAddRecord(long id, byte recordType, EncodingSupport record, boolean sync,
+ IOCompletion completionCallback) throws Exception
+ {
+ throw new UnsupportedOperationException();
+
+ }
+
+ @Override
+ public void appendUpdateRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void
+ appendUpdateRecord(long id, byte recordType, byte[] record, boolean sync, IOCompletion completionCallback)
+ throws Exception
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void appendUpdateRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void appendUpdateRecord(long id, byte recordType, EncodingSupport record, boolean sync,
+ IOCompletion completionCallback) throws Exception
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void appendDeleteRecord(long id, boolean sync) throws Exception
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void appendDeleteRecord(long id, boolean sync, IOCompletion completionCallback) throws Exception
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void appendAddRecordTransactional(long txID, long id, byte recordType, byte[] record) throws Exception
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void appendAddRecordTransactional(long txID, long id, byte recordType, EncodingSupport record)
+ throws Exception
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void appendUpdateRecordTransactional(long txID, long id, byte recordType, byte[] record) throws Exception
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void appendUpdateRecordTransactional(long txID, long id, byte recordType, EncodingSupport record)
+ throws Exception
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void appendDeleteRecordTransactional(long txID, long id, byte[] record) throws Exception
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void appendDeleteRecordTransactional(long txID, long id, EncodingSupport record) throws Exception
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void appendDeleteRecordTransactional(long txID, long id) throws Exception
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void appendCommitRecord(long txID, boolean sync) throws Exception
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void appendCommitRecord(long txID, boolean sync, IOCompletion callback) throws Exception
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void
+ appendCommitRecord(long txID, boolean sync, IOCompletion callback, boolean lineUpContext) throws Exception
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void appendPrepareRecord(long txID, EncodingSupport transactionData, boolean sync) throws Exception
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void appendPrepareRecord(long txID, EncodingSupport transactionData, boolean sync, IOCompletion callback)
+ throws Exception
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void appendPrepareRecord(long txID, byte[] transactionData, boolean sync) throws Exception
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void
+ appendPrepareRecord(long txID, byte[] transactionData, boolean sync, IOCompletion callback)
+ throws Exception
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void appendRollbackRecord(long txID, boolean sync) throws Exception
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void appendRollbackRecord(long txID, boolean sync, IOCompletion callback) throws Exception
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public JournalLoadInformation load(LoaderCallback reloadManager) throws Exception
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public JournalLoadInformation loadInternalOnly() throws Exception
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void lineUpContex(IOCompletion callback)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public JournalLoadInformation load(List<RecordInfo> committedRecords,
+ List<PreparedTransactionInfo> preparedTransactions, TransactionFailureCallback transactionFailure)
+ throws Exception
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int getAlignment() throws Exception
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int getNumberOfRecords()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int getUserVersion()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void perfBlast(int pages)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void runDirectJournalBlast() throws Exception
+ {
+ throw new UnsupportedOperationException();
+ }
}
12 years, 10 months
JBoss hornetq SVN: r11121 - branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl.
by do-not-reply@jboss.org
Author: borges
Date: 2011-08-04 11:28:56 -0400 (Thu, 04 Aug 2011)
New Revision: 11121
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
Log:
HORNETQ-720 Only compare journals if up-to-date.
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-08-04 15:28:12 UTC (rev 11120)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-08-04 15:28:56 UTC (rev 11121)
@@ -325,6 +325,11 @@
public void compareJournalInformation(final JournalLoadInformation[] journalInformation) throws HornetQException
{
+ if (!server.isRemoteBackupUpToDate())
+ {
+ throw new HornetQException(HornetQException.ILLEGAL_STATE, "Cannot compare journals if not in sync!");
+ }
+
if (journalLoadInformation == null || journalLoadInformation.length != journalInformation.length)
{
throw new HornetQException(HornetQException.INTERNAL_ERROR,
12 years, 10 months
JBoss hornetq SVN: r11120 - in branches/HORNETQ-720_Replication: hornetq-journal/src/main/java/org/hornetq/core/journal/impl and 1 other directory.
by do-not-reply@jboss.org
Author: borges
Date: 2011-08-04 11:28:12 -0400 (Thu, 04 Aug 2011)
New Revision: 11120
Added:
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/ReplicatingJournal.java
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalFilesRepository.java
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java
Log:
HORNETQ-720 Move the sync-files control into ReplicationEndpointImpl
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-08-04 15:26:34 UTC (rev 11119)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-08-04 15:28:12 UTC (rev 11120)
@@ -14,6 +14,9 @@
package org.hornetq.core.replication.impl;
import java.nio.ByteBuffer;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -77,13 +80,17 @@
private Channel channel;
private Journal[] journals;
+ private JournalLoadInformation[] journalLoadInformation;
+ // Files reserved in each journal for synchronization of existing data from the 'live' server
+ private final Map<JournalContent, Map<Long, JournalFile>> filesReservedForSync =
+ new HashMap<JournalContent, Map<Long, JournalFile>>();
private JournalStorageManager storage;
private PagingManager pageManager;
- private JournalLoadInformation[] journalLoadInformation;
+
private final ConcurrentMap<SimpleString, ConcurrentMap<Integer, Page>> pageIndex =
new ConcurrentHashMap<SimpleString, ConcurrentMap<Integer, Page>>();
private final ConcurrentMap<Long, LargeServerMessage> largeMessages =
@@ -104,6 +111,7 @@
public void registerJournal(final byte id, final Journal journal)
{
+
if (journals == null || id >= journals.length)
{
Journal[] oldJournals = journals;
@@ -228,9 +236,14 @@
server.getManagementService().setStorageManager(storage);
+ registerJournal(JournalContent.BINDINGS.typeByte, storage.getBindingsJournal());
registerJournal(JournalContent.MESSAGES.typeByte, storage.getMessageJournal());
- registerJournal(JournalContent.BINDINGS.typeByte, storage.getBindingsJournal());
+ for (JournalContent jc : EnumSet.allOf(JournalContent.class))
+ {
+ filesReservedForSync.put(jc, new HashMap<Long, JournalFile>());
+ }
+
// We only need to load internal structures on the backup...
journalLoadInformation = storage.loadInternalOnly();
@@ -371,22 +384,34 @@
{
if (msg.isUpToDate())
{
- // XXX HORNETQ-720 must reload journals(?)
- for (Journal j : journals)
+ for (Journal journal2 : journals)
{
- JournalImpl journal = (JournalImpl)j;
- journal.finishRemoteBackupSync();
+ JournalImpl journal = (JournalImpl)journal2;
+ journal.writeLock();
+ try
+ {
+ if (journal.getDataFiles().length != 0)
+ {
+ throw new IllegalStateException("Journal should not have any data files at this point");
+ }
+ // files should be already in place.
+ filesReservedForSync.remove(msg.getJournalContent());
+ // XXX HORNETQ-720 must reload journals
+ // XXX HORNETQ-720 must start using real journals
+ }
+ finally
+ {
+ journal.writeUnlock();
+ }
+
}
server.setRemoteBackupUpToDate();
log.info("Backup server " + server + " is synchronized with live-server.");
return;
}
- Journal journalIf = getJournal(msg.getJournalContent().typeByte);
- JournalImpl journal = assertJournalImpl(journalIf);
-
long id = msg.getFileId();
- JournalFile journalFile = journal.getRemoteBackupSyncFile(id);
+ JournalFile journalFile = filesReservedForSync.get(msg.getJournalContent()).get(Long.valueOf(id));
byte[] data = msg.getData();
if (data == null)
{
@@ -401,7 +426,6 @@
}
sf.writeDirect(ByteBuffer.wrap(data), true);
}
- // journal.get
}
private void handleJournalFileIdReservation(final ReplicationFileIdMessage packet) throws Exception
@@ -413,7 +437,7 @@
final Journal journalIf = journals[packet.getJournalContentType().typeByte];
JournalImpl journal = assertJournalImpl(journalIf);
- journal.createFilesForRemoteSync(packet.getFileIds());
+ journal.createFilesForRemoteSync(packet.getFileIds(), filesReservedForSync.get(packet.getJournalContentType()));
}
private static JournalImpl assertJournalImpl(final Journal journalIf) throws HornetQException
Modified: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalFilesRepository.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalFilesRepository.java 2011-08-04 15:26:34 UTC (rev 11119)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalFilesRepository.java 2011-08-04 15:28:12 UTC (rev 11120)
@@ -14,9 +14,7 @@
package org.hornetq.core.journal.impl;
import java.util.Collection;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -65,8 +63,6 @@
private final BlockingQueue<JournalFile> openedFiles = new LinkedBlockingQueue<JournalFile>();
- private Map<Long, JournalFile> filesReservedForSync;
-
private final AtomicLong nextFileID = new AtomicLong(0);
private final int maxAIO;
@@ -447,14 +443,9 @@
return nextFile;
}
- public void createRemoteBackupSyncFile(long fileID) throws Exception
+ public JournalFile createRemoteBackupSyncFile(long fileID) throws Exception
{
- if (filesReservedForSync == null)
- {
- filesReservedForSync = new HashMap<Long, JournalFile>();
- }
- assert !filesReservedForSync.containsKey(Long.valueOf(fileID));
- filesReservedForSync.put(Long.valueOf(fileID), createFile(false, false, false, false, fileID));
+ return createFile(false, false, false, false, fileID);
}
// Package protected ---------------------------------------------
@@ -477,10 +468,8 @@
{
long fileID = fileIdPreSet != -1 ? fileIdPreSet : generateFileID();
- String fileName;
+ final String fileName = createFileName(tmpCompact, fileID);
- fileName = createFileName(tmpCompact, fileID);
-
if (JournalFilesRepository.trace)
{
JournalFilesRepository.trace("Creating file " + fileName);
@@ -583,23 +572,4 @@
return jf;
}
-
- /**
- * @param id
- * @return
- */
- public JournalFile getRemoteBackupSyncFile(long id)
- {
- return filesReservedForSync.get(Long.valueOf(id));
- }
-
- public Collection<? extends JournalFile> getSyncFiles()
- {
- return filesReservedForSync.values();
- }
-
- public void clearSyncFiles()
- {
- filesReservedForSync.clear();
- }
}
Modified: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java 2011-08-04 15:26:34 UTC (rev 11119)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java 2011-08-04 15:28:12 UTC (rev 11120)
@@ -3199,7 +3199,7 @@
}
- private static class JournalFileComparator implements Comparator<JournalFile>
+ public static class JournalFileComparator implements Comparator<JournalFile>
{
public int compare(final JournalFile f1, final JournalFile f2)
{
@@ -3273,7 +3273,7 @@
* @param fileIds
* @throws Exception
*/
- public void createFilesForRemoteSync(long[] fileIds) throws Exception
+ public void createFilesForRemoteSync(long[] fileIds, Map<Long, JournalFile> map) throws Exception
{
writeLock();
try
@@ -3283,7 +3283,7 @@
for (long id : fileIds)
{
maxID = Math.max(maxID, id);
- filesRepository.createRemoteBackupSyncFile(id);
+ map.put(Long.valueOf(id), filesRepository.createRemoteBackupSyncFile(id));
}
if (maxID > 0)
{
@@ -3295,42 +3295,4 @@
writeUnlock();
}
}
-
- /**
- * @param id
- * @return
- */
- public JournalFile getRemoteBackupSyncFile(long id)
- {
- return filesRepository.getRemoteBackupSyncFile(id);
- }
-
- /**
- *
- */
- public void finishRemoteBackupSync()
- {
- writeLock();
- try
- {
- lockAppend.lock();
- List<JournalFile> dataFilesToProcess = new ArrayList<JournalFile>();
- dataFilesToProcess.addAll(filesRepository.getDataFiles());
- filesRepository.clearDataFiles();
- dataFilesToProcess.addAll(filesRepository.getSyncFiles());
- filesRepository.clearSyncFiles();
- Collections.sort(dataFilesToProcess, new JournalFileComparator());
- for (JournalFile file : dataFilesToProcess)
- {
- filesRepository.addDataFileOnTop(file);
- }
- // XXX HORNETQ-720 still missing a "reload" call?
- }
- finally
- {
- lockAppend.unlock();
- writeUnlock();
- }
-
- }
}
Added: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/ReplicatingJournal.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/ReplicatingJournal.java (rev 0)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/ReplicatingJournal.java 2011-08-04 15:28:12 UTC (rev 11120)
@@ -0,0 +1,29 @@
+package org.hornetq.core.journal.impl;
+
+import org.hornetq.core.journal.SequentialFileFactory;
+
+/**
+ * Journal used at a replicating backup server during the synchronization of data with the 'live'
+ * server.
+ */
+public class ReplicatingJournal extends JournalImpl
+{
+
+ /**
+ * @param fileSize
+ * @param minFiles
+ * @param compactMinFiles
+ * @param compactPercentage
+ * @param fileFactory
+ * @param filePrefix
+ * @param fileExtension
+ * @param maxAIO
+ */
+ public ReplicatingJournal(int fileSize, int minFiles, int compactMinFiles, int compactPercentage,
+ SequentialFileFactory fileFactory, String filePrefix, String fileExtension, int maxAIO)
+ {
+ super(fileSize, minFiles, compactMinFiles, compactPercentage, fileFactory, filePrefix, fileExtension, maxAIO);
+ }
+
+
+}
12 years, 10 months
JBoss hornetq SVN: r11119 - branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal.
by do-not-reply@jboss.org
Author: borges
Date: 2011-08-04 11:26:34 -0400 (Thu, 04 Aug 2011)
New Revision: 11119
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
Log:
HORNETQ-720 Make sure autoReclaim is reset to its original value after we are done.
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-08-04 14:00:25 UTC (rev 11118)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-08-04 15:26:34 UTC (rev 11119)
@@ -366,38 +366,56 @@
JournalFile[] messageFiles = null;
JournalFile[] bindingsFiles = null;
- // XXX HORNETQ-720 WRITE LOCK the StorageManager.
- storageManagerLock.writeLock().lock();
+ final JournalImpl localMessageJournal = (JournalImpl)messageJournal;
+ final JournalImpl localBindingsJournal = (JournalImpl)bindingsJournal;
+
+ final boolean messageJournalAutoReclaim = localMessageJournal.getAutoReclaim();
+ final boolean bindingsJournalAutoReclaim = localBindingsJournal.getAutoReclaim();
+
try
{
- final JournalImpl localMessageJournal = (JournalImpl)messageJournal;
- final JournalImpl localBindingsJournal = (JournalImpl)bindingsJournal;
+ // XXX HORNETQ-720 WRITE LOCK the StorageManager.
+ storageManagerLock.writeLock().lock();
+ try
+ {
-
- localMessageJournal.writeLock();
- localBindingsJournal.writeLock();
+ localMessageJournal.writeLock();
+ localBindingsJournal.writeLock();
+ try
+ {
+ messageFiles = prepareJournalForCopy(localMessageJournal, JournalContent.MESSAGES);
+ bindingsFiles = prepareJournalForCopy(localBindingsJournal, JournalContent.BINDINGS);
+ }
+ finally
+ {
+ localMessageJournal.writeUnlock();
+ localBindingsJournal.writeUnlock();
+ }
+ bindingsJournal = new ReplicatedJournal(((byte)0), localBindingsJournal, replicator);
+ messageJournal = new ReplicatedJournal((byte)1, localMessageJournal, replicator);
+ }
+ finally
+ {
+ // XXX HORNETQ-720 UNLOCK StorageManager...
+ storageManagerLock.writeLock().unlock();
+ }
+ sendJournalFile(messageFiles, JournalContent.MESSAGES);
+ sendJournalFile(bindingsFiles, JournalContent.BINDINGS);
try
{
- messageFiles = prepareJournalForCopy(localMessageJournal, JournalContent.MESSAGES);
- bindingsFiles = prepareJournalForCopy(localBindingsJournal, JournalContent.BINDINGS);
+ storageManagerLock.writeLock().lock();
+ replicator.sendSynchronizationDone();
}
finally
{
- localMessageJournal.writeUnlock();
- localBindingsJournal.writeUnlock();
+ storageManagerLock.writeLock().unlock();
}
- bindingsJournal = new ReplicatedJournal(((byte)0), localBindingsJournal, replicator);
- messageJournal = new ReplicatedJournal((byte)1, localMessageJournal, replicator);
}
finally
{
- // XXX HORNETQ-720 UNLOCK StorageManager...
- storageManagerLock.writeLock().unlock();
+ localMessageJournal.setAutoReclaim(messageJournalAutoReclaim);
+ localBindingsJournal.setAutoReclaim(bindingsJournalAutoReclaim);
}
- sendJournalFile(messageFiles, JournalContent.MESSAGES);
- sendJournalFile(bindingsFiles, JournalContent.BINDINGS);
-
- replicator.sendSynchronizationDone();
}
/**
12 years, 10 months
JBoss hornetq SVN: r11118 - tags.
by do-not-reply@jboss.org
Author: ataylor
Date: 2011-08-04 10:00:25 -0400 (Thu, 04 Aug 2011)
New Revision: 11118
Added:
tags/HornetQ_2_2_7_Final_pending/
Log:
pending release for 2.2.7.Final
12 years, 10 months
JBoss hornetq SVN: r11117 - branches/Branch_2_2_AS7/docs.
by do-not-reply@jboss.org
Author: ataylor
Date: 2011-08-04 09:16:45 -0400 (Thu, 04 Aug 2011)
New Revision: 11117
Modified:
branches/Branch_2_2_AS7/docs/README.html
Log:
updated release notes
Modified: branches/Branch_2_2_AS7/docs/README.html
===================================================================
--- branches/Branch_2_2_AS7/docs/README.html 2011-08-04 12:41:38 UTC (rev 11116)
+++ branches/Branch_2_2_AS7/docs/README.html 2011-08-04 13:16:45 UTC (rev 11117)
@@ -7,16 +7,16 @@
</head>
<body>
-<h1>Release Notes - HornetQ - Version 2.2.6 Final</h1>
+<h1>Release Notes - HornetQ - Version 2.2.7 Final</h1>
<br>
-<h2>20th June 2010</h2>
+<h2>5th August 2010</h2>
-These are the release notes for HornetQ 2.2.6 Final<br><br>
+These are the release notes for HornetQ 2.2.7 Final<br><br>
For full description of the contents please see the
-<a href="https://issues.jboss.org/secure/ReleaseNote.jspa?atl_token=AQZJ-FV3A-N91S...">HornetQ project JIRA</a>.<br><br>
+<a href="https://issues.jboss.org/secure/ReleaseNote.jspa?atl_token=AQZJ-FV3A-N91S...">HornetQ project JIRA</a>.<br><br>
This release contains minor fixes required for the Application Server 7 integration.
12 years, 10 months