JBoss hornetq SVN: r11280 - branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/integration/cluster/distribution.
by do-not-reply@jboss.org
Author: clebert.suconic@jboss.com
Date: 2011-09-01 18:35:21 -0400 (Thu, 01 Sep 2011)
New Revision: 11280
Modified:
branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
Log:
just format (as it was formatted on the original branch) to make it easy to merge
Modified: branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-09-01 22:27:06 UTC (rev 11279)
+++ branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-09-01 22:35:21 UTC (rev 11280)
@@ -88,12 +88,12 @@
{
return 1024;
}
-
+
protected boolean isLargeMessage()
{
return false;
}
-
+
@Override
protected void setUp() throws Exception
{
@@ -148,13 +148,11 @@
consumers = new ConsumerHolder[ClusterTestBase.MAX_CONSUMERS];
-
-
nodeManagers = null;
super.tearDown();
- // ServerLocatorImpl.shutdown();
+ // ServerLocatorImpl.shutdown();
}
// Private -------------------------------------------------------------------------------------------------------
@@ -199,7 +197,7 @@
{
return consumers[node].consumer;
}
-
+
protected void waitForMessages(final int node, final String address, final int count) throws Exception
{
HornetQServer server = servers[node];
@@ -458,7 +456,7 @@
if (holder != null)
{
holder.consumer.close();
- // holder.session.close();
+ // holder.session.close();
consumers[i] = null;
}
@@ -532,7 +530,7 @@
for (int i = msgStart; i < msgEnd; i++)
{
ClientMessage message = session.createMessage(durable);
-
+
if (isLargeMessage())
{
message.setBodyInputStream(createFakeLargeStream(getLargeMessageSize()));
@@ -545,15 +543,14 @@
message.putIntProperty(ClusterTestBase.COUNT_PROP, i);
-
producer.send(message);
-
+
if (i % 100 == 0)
{
session.commit();
}
}
-
+
session.commit();
}
finally
@@ -596,7 +593,7 @@
for (int i = msgStart; i < msgEnd; i++)
{
ClientMessage message = session.createMessage(durable);
-
+
if (isLargeMessage())
{
message.setBodyInputStream(createFakeLargeStream(getLargeMessageSize()));
@@ -781,10 +778,9 @@
for (int j = msgStart; j < msgEnd; j++)
{
-
+
ClientMessage message = holder.consumer.receive(WAIT_TIMEOUT);
-
-
+
if (message == null)
{
ClusterTestBase.log.info("*** dumping consumers:");
@@ -796,13 +792,12 @@
if (isLargeMessage())
{
- for (int posMsg = 0 ; posMsg < getLargeMessageSize(); posMsg++)
+ for (int posMsg = 0; posMsg < getLargeMessageSize(); posMsg++)
{
assertEquals(getSamplebyte(posMsg), message.getBodyBuffer().readByte());
}
}
-
if (ack)
{
message.acknowledge();
@@ -838,7 +833,7 @@
}
}
}
-
+
protected String clusterDescription(HornetQServer server)
{
String br = "-------------------------\n";
@@ -1196,7 +1191,7 @@
{
res[j++] = i;
}
-
+
if (ack)
{
// just to flush acks
@@ -1262,7 +1257,6 @@
sfs[node] = sf;
}
-
protected void setupSessionFactory(final int node, final boolean netty, int reconnectAttempts) throws Exception
{
if (sfs[node] != null)
@@ -1313,7 +1307,6 @@
serverTotc = new TransportConfiguration(ServiceTestBase.INVM_CONNECTOR_FACTORY, params);
}
-
locators[node] = HornetQClient.createServerLocatorWithHA(serverTotc);
locators[node].setRetryInterval(100);
locators[node].setRetryIntervalMultiplier(1d);
@@ -1349,75 +1342,75 @@
final boolean fileStorage,
final boolean sharedStorage,
final boolean netty)
+ {
+ if (servers[node] != null)
{
- if (servers[node] != null)
- {
- throw new IllegalArgumentException("Already a server at node " + node);
- }
+ throw new IllegalArgumentException("Already a server at node " + node);
+ }
- Configuration configuration = createBasicConfig();
+ Configuration configuration = createBasicConfig();
- configuration.setSecurityEnabled(false);
- configuration.setJournalMinFiles(2);
- configuration.setJournalMaxIO_AIO(1000);
- configuration.setJournalFileSize(100 * 1024);
- configuration.setJournalType(getDefaultJournalType());
- configuration.setSharedStore(sharedStorage);
- configuration.setThreadPoolMaxSize(10);
+ configuration.setSecurityEnabled(false);
+ configuration.setJournalMinFiles(2);
+ configuration.setJournalMaxIO_AIO(1000);
+ configuration.setJournalFileSize(100 * 1024);
+ configuration.setJournalType(getDefaultJournalType());
+ configuration.setSharedStore(sharedStorage);
+ configuration.setThreadPoolMaxSize(10);
+ if (sharedStorage)
+ {
+ // Shared storage will share the node between the backup and live node
+ configuration.setBindingsDirectory(getBindingsDir(node, false));
+ configuration.setJournalDirectory(getJournalDir(node, false));
+ configuration.setPagingDirectory(getPageDir(node, false));
+ configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, false));
+ }
+ else
+ {
+ configuration.setBindingsDirectory(getBindingsDir(node, true));
+ configuration.setJournalDirectory(getJournalDir(node, true));
+ configuration.setPagingDirectory(getPageDir(node, true));
+ configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, true));
+ }
+ configuration.setClustered(true);
+ configuration.setJournalCompactMinFiles(0);
+
+ configuration.getAcceptorConfigurations().clear();
+ configuration.getAcceptorConfigurations().add(createTransportConfiguration(netty, true, generateParams(node,
+ netty)));
+
+ HornetQServer server;
+
+ if (fileStorage)
+ {
if (sharedStorage)
{
- // Shared storage will share the node between the backup and live node
- configuration.setBindingsDirectory(getBindingsDir(node, false));
- configuration.setJournalDirectory(getJournalDir(node, false));
- configuration.setPagingDirectory(getPageDir(node, false));
- configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, false));
+ server = createInVMFailoverServer(true, configuration, nodeManagers[node]);
}
else
{
- configuration.setBindingsDirectory(getBindingsDir(node, true));
- configuration.setJournalDirectory(getJournalDir(node, true));
- configuration.setPagingDirectory(getPageDir(node, true));
- configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, true));
+ server = HornetQServers.newHornetQServer(configuration);
}
- configuration.setClustered(true);
- configuration.setJournalCompactMinFiles(0);
-
- configuration.getAcceptorConfigurations().clear();
- configuration.getAcceptorConfigurations().add(createTransportConfiguration(netty, true, generateParams(node, netty)));
-
- HornetQServer server;
-
- if (fileStorage)
+ }
+ else
+ {
+ if (sharedStorage)
{
- if (sharedStorage)
- {
- server = createInVMFailoverServer(true, configuration, nodeManagers[node]);
- }
- else
- {
- server = HornetQServers.newHornetQServer(configuration);
- }
+ server = createInVMFailoverServer(false, configuration, nodeManagers[node]);
}
else
{
- if (sharedStorage)
- {
- server = createInVMFailoverServer(false, configuration, nodeManagers[node]);
- }
- else
- {
- server = HornetQServers.newHornetQServer(configuration, false);
- }
+ server = HornetQServers.newHornetQServer(configuration, false);
}
- servers[node] = server;
}
+ servers[node] = server;
+ }
-
- protected void setupBackupServer(final int node,
- final int liveNode,
- final boolean fileStorage,
- final boolean sharedStorage,
- final boolean netty)
+ protected void setupBackupServer(final int node,
+ final int liveNode,
+ final boolean fileStorage,
+ final boolean sharedStorage,
+ final boolean netty)
{
if (servers[node] != null)
{
@@ -1454,7 +1447,7 @@
configuration.getAcceptorConfigurations().clear();
TransportConfiguration acceptorConfig = createTransportConfiguration(netty, true, generateParams(node, netty));
configuration.getAcceptorConfigurations().add(acceptorConfig);
- //add backup connector
+ // add backup connector
TransportConfiguration liveConfig = createTransportConfiguration(netty, false, generateParams(liveNode, netty));
configuration.getConnectorConfigurations().put(liveConfig.getName(), liveConfig);
TransportConfiguration backupConfig = createTransportConfiguration(netty, false, generateParams(node, netty));
@@ -1488,171 +1481,180 @@
}
protected void setupLiveServerWithDiscovery(final int node,
- final String groupAddress,
- final int port,
- final boolean fileStorage,
- final boolean netty,
- final boolean sharedStorage)
- {
- if (servers[node] != null)
- {
- throw new IllegalArgumentException("Already a server at node " + node);
- }
+ final String groupAddress,
+ final int port,
+ final boolean fileStorage,
+ final boolean netty,
+ final boolean sharedStorage)
+ {
+ if (servers[node] != null)
+ {
+ throw new IllegalArgumentException("Already a server at node " + node);
+ }
- Configuration configuration = createBasicConfig();
+ Configuration configuration = createBasicConfig();
- configuration.setSecurityEnabled(false);
- configuration.setBindingsDirectory(getBindingsDir(node, false));
- configuration.setJournalMinFiles(2);
- configuration.setJournalDirectory(getJournalDir(node, false));
- configuration.setJournalFileSize(100 * 1024);
- configuration.setJournalType(getDefaultJournalType());
- configuration.setJournalMaxIO_AIO(1000);
- configuration.setPagingDirectory(getPageDir(node, false));
- configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, false));
- configuration.setClustered(true);
- configuration.setBackup(false);
+ configuration.setSecurityEnabled(false);
+ configuration.setBindingsDirectory(getBindingsDir(node, false));
+ configuration.setJournalMinFiles(2);
+ configuration.setJournalDirectory(getJournalDir(node, false));
+ configuration.setJournalFileSize(100 * 1024);
+ configuration.setJournalType(getDefaultJournalType());
+ configuration.setJournalMaxIO_AIO(1000);
+ configuration.setPagingDirectory(getPageDir(node, false));
+ configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, false));
+ configuration.setClustered(true);
+ configuration.setBackup(false);
- configuration.getAcceptorConfigurations().clear();
+ configuration.getAcceptorConfigurations().clear();
- Map<String, Object> params = generateParams(node, netty);
+ Map<String, Object> params = generateParams(node, netty);
- configuration.getAcceptorConfigurations().add(createTransportConfiguration(netty, true, params));
+ configuration.getAcceptorConfigurations().add(createTransportConfiguration(netty, true, params));
- TransportConfiguration connector = createTransportConfiguration(netty, false, params);
- configuration.getConnectorConfigurations().put(connector.getName(), connector);
+ TransportConfiguration connector = createTransportConfiguration(netty, false, params);
+ configuration.getConnectorConfigurations().put(connector.getName(), connector);
- List<String> connectorPairs = new ArrayList<String>();
- connectorPairs.add(connector.getName());
+ List<String> connectorPairs = new ArrayList<String>();
+ connectorPairs.add(connector.getName());
- BroadcastGroupConfiguration bcConfig = new BroadcastGroupConfiguration("bg1",
- null,
- -1,
- groupAddress,
- port,
- 1000,
- connectorPairs);
+ BroadcastGroupConfiguration bcConfig = new BroadcastGroupConfiguration("bg1",
+ null,
+ -1,
+ groupAddress,
+ port,
+ 1000,
+ connectorPairs);
- configuration.getBroadcastGroupConfigurations().add(bcConfig);
+ configuration.getBroadcastGroupConfigurations().add(bcConfig);
- DiscoveryGroupConfiguration dcConfig = new DiscoveryGroupConfiguration("dg1", null, groupAddress, port, 5000, 5000);
+ DiscoveryGroupConfiguration dcConfig = new DiscoveryGroupConfiguration("dg1",
+ null,
+ groupAddress,
+ port,
+ 5000,
+ 5000);
- configuration.getDiscoveryGroupConfigurations().put(dcConfig.getName(), dcConfig);
+ configuration.getDiscoveryGroupConfigurations().put(dcConfig.getName(), dcConfig);
- HornetQServer server;
- if (fileStorage)
- {
- if (sharedStorage)
- {
- server = createInVMFailoverServer(true, configuration, nodeManagers[node]);
- }
- else
- {
- server = HornetQServers.newHornetQServer(configuration);
- }
- }
- else
- {
- if (sharedStorage)
- {
- server = createInVMFailoverServer(false, configuration, nodeManagers[node]);
- }
- else
- {
- server = HornetQServers.newHornetQServer(configuration, false);
- }
- }
- servers[node] = server;
- }
+ HornetQServer server;
+ if (fileStorage)
+ {
+ if (sharedStorage)
+ {
+ server = createInVMFailoverServer(true, configuration, nodeManagers[node]);
+ }
+ else
+ {
+ server = HornetQServers.newHornetQServer(configuration);
+ }
+ }
+ else
+ {
+ if (sharedStorage)
+ {
+ server = createInVMFailoverServer(false, configuration, nodeManagers[node]);
+ }
+ else
+ {
+ server = HornetQServers.newHornetQServer(configuration, false);
+ }
+ }
+ servers[node] = server;
+ }
protected void setupBackupServerWithDiscovery(final int node,
- final int liveNode,
- final String groupAddress,
- final int port,
- final boolean fileStorage,
- final boolean netty,
- final boolean sharedStorage)
- {
- if (servers[node] != null)
- {
- throw new IllegalArgumentException("Already a server at node " + node);
- }
+ final int liveNode,
+ final String groupAddress,
+ final int port,
+ final boolean fileStorage,
+ final boolean netty,
+ final boolean sharedStorage)
+ {
+ if (servers[node] != null)
+ {
+ throw new IllegalArgumentException("Already a server at node " + node);
+ }
- Configuration configuration = createBasicConfig();
+ Configuration configuration = createBasicConfig();
- configuration.setSecurityEnabled(false);
- configuration.setSharedStore(sharedStorage);
- if (sharedStorage)
- {
- // Shared storage will share the node between the backup and live node
- configuration.setBindingsDirectory(getBindingsDir(liveNode, false));
- configuration.setJournalDirectory(getJournalDir(liveNode, false));
- configuration.setPagingDirectory(getPageDir(liveNode, false));
- configuration.setLargeMessagesDirectory(getLargeMessagesDir(liveNode, false));
- }
- else
- {
- configuration.setBindingsDirectory(getBindingsDir(node, true));
- configuration.setJournalDirectory(getJournalDir(node, true));
- configuration.setPagingDirectory(getPageDir(node, true));
- configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, true));
- }
- configuration.setClustered(true);
- configuration.setBackup(true);
+ configuration.setSecurityEnabled(false);
+ configuration.setSharedStore(sharedStorage);
+ if (sharedStorage)
+ {
+ // Shared storage will share the node between the backup and live node
+ configuration.setBindingsDirectory(getBindingsDir(liveNode, false));
+ configuration.setJournalDirectory(getJournalDir(liveNode, false));
+ configuration.setPagingDirectory(getPageDir(liveNode, false));
+ configuration.setLargeMessagesDirectory(getLargeMessagesDir(liveNode, false));
+ }
+ else
+ {
+ configuration.setBindingsDirectory(getBindingsDir(node, true));
+ configuration.setJournalDirectory(getJournalDir(node, true));
+ configuration.setPagingDirectory(getPageDir(node, true));
+ configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, true));
+ }
+ configuration.setClustered(true);
+ configuration.setBackup(true);
- configuration.getAcceptorConfigurations().clear();
+ configuration.getAcceptorConfigurations().clear();
- Map<String, Object> params = generateParams(node, netty);
+ Map<String, Object> params = generateParams(node, netty);
- configuration.getAcceptorConfigurations().add(createTransportConfiguration(netty, true, params));
+ configuration.getAcceptorConfigurations().add(createTransportConfiguration(netty, true, params));
- TransportConfiguration connector = createTransportConfiguration(netty, false, params);
- configuration.getConnectorConfigurations().put(connector.getName(), connector);
+ TransportConfiguration connector = createTransportConfiguration(netty, false, params);
+ configuration.getConnectorConfigurations().put(connector.getName(), connector);
- List<String> connectorPairs = new ArrayList<String>();
- connectorPairs.add(connector.getName());
+ List<String> connectorPairs = new ArrayList<String>();
+ connectorPairs.add(connector.getName());
- BroadcastGroupConfiguration bcConfig = new BroadcastGroupConfiguration("bg1",
- null,
- -1,
- groupAddress,
- port,
- 1000,
- connectorPairs);
+ BroadcastGroupConfiguration bcConfig = new BroadcastGroupConfiguration("bg1",
+ null,
+ -1,
+ groupAddress,
+ port,
+ 1000,
+ connectorPairs);
- configuration.getBroadcastGroupConfigurations().add(bcConfig);
+ configuration.getBroadcastGroupConfigurations().add(bcConfig);
- DiscoveryGroupConfiguration dcConfig = new DiscoveryGroupConfiguration("dg1", null, groupAddress, port, 5000, 5000);
+ DiscoveryGroupConfiguration dcConfig = new DiscoveryGroupConfiguration("dg1",
+ null,
+ groupAddress,
+ port,
+ 5000,
+ 5000);
- configuration.getDiscoveryGroupConfigurations().put(dcConfig.getName(), dcConfig);
+ configuration.getDiscoveryGroupConfigurations().put(dcConfig.getName(), dcConfig);
- HornetQServer server;
- if (fileStorage)
- {
- if (sharedStorage)
- {
- server = createInVMFailoverServer(true, configuration, nodeManagers[liveNode]);
- }
- else
- {
- server = HornetQServers.newHornetQServer(configuration);
- }
- }
- else
- {
- if (sharedStorage)
- {
- server = createInVMFailoverServer(false, configuration, nodeManagers[liveNode]);
- }
- else
- {
- server = HornetQServers.newHornetQServer(configuration, false);
- }
- }
- servers[node] = server;
- }
+ HornetQServer server;
+ if (fileStorage)
+ {
+ if (sharedStorage)
+ {
+ server = createInVMFailoverServer(true, configuration, nodeManagers[liveNode]);
+ }
+ else
+ {
+ server = HornetQServers.newHornetQServer(configuration);
+ }
+ }
+ else
+ {
+ if (sharedStorage)
+ {
+ server = createInVMFailoverServer(false, configuration, nodeManagers[liveNode]);
+ }
+ else
+ {
+ server = HornetQServers.newHornetQServer(configuration, false);
+ }
+ }
+ servers[node] = server;
+ }
-
protected void clearServer(final int... nodes)
{
for (int i = 0; i < nodes.length; i++)
@@ -1689,12 +1691,12 @@
{
throw new IllegalStateException("No server at node " + nodeFrom);
}
-
+
TransportConfiguration connectorFrom = createTransportConfiguration(netty, false, generateParams(nodeFrom, netty));
serverFrom.getConfiguration().getConnectorConfigurations().put(name, connectorFrom);
List<String> pairs = null;
-
+
if (nodeTo != -1)
{
TransportConfiguration serverTotc = createTransportConfiguration(netty, false, generateParams(nodeTo, netty));
@@ -1711,11 +1713,11 @@
forwardWhenNoConsumers,
maxHops,
1024,
- pairs, allowDirectConnectionsOnly);
+ pairs,
+ allowDirectConnectionsOnly);
serverFrom.getConfiguration().getClusterConfigurations().add(clusterConf);
}
-
protected void setupClusterConnection(final String name,
final String address,
final boolean forwardWhenNoConsumers,
@@ -1733,7 +1735,7 @@
TransportConfiguration connectorFrom = createTransportConfiguration(netty, false, generateParams(nodeFrom, netty));
serverFrom.getConfiguration().getConnectorConfigurations().put(connectorFrom.getName(), connectorFrom);
-
+
List<String> pairs = new ArrayList<String>();
for (int element : nodesTo)
{
@@ -1750,7 +1752,8 @@
forwardWhenNoConsumers,
maxHops,
1024,
- pairs, false);
+ pairs,
+ false);
serverFrom.getConfiguration().getClusterConfigurations().add(clusterConf);
}
@@ -1789,7 +1792,8 @@
forwardWhenNoConsumers,
maxHops,
1024,
- pairs, false);
+ pairs,
+ false);
serverFrom.getConfiguration().getClusterConfigurations().add(clusterConf);
}
@@ -1811,7 +1815,7 @@
TransportConfiguration connectorConfig = createTransportConfiguration(netty, false, generateParams(node, netty));
server.getConfiguration().getConnectorConfigurations().put(name, connectorConfig);
-
+
ClusterConnectionConfiguration clusterConf = new ClusterConnectionConfiguration(name,
address,
name,
@@ -1844,23 +1848,22 @@
}
for (int node : nodes)
{
- //wait for each server to start, it may be a backup and started in a separate thread
+ // wait for each server to start, it may be a backup and started in a separate thread
waitForServer(servers[node]);
}
}
- private void waitForServer(HornetQServer server)
- throws InterruptedException
+ private void waitForServer(HornetQServer server) throws InterruptedException
{
- long timetowait =System.currentTimeMillis() + 5000;
- while(!server.isStarted())
+ long timetowait = System.currentTimeMillis() + 5000;
+ while (!server.isStarted())
{
Thread.sleep(100);
- if(server.isStarted())
+ if (server.isStarted())
{
break;
}
- else if(System.currentTimeMillis() > timetowait)
+ else if (System.currentTimeMillis() > timetowait)
{
fail("server didnt start");
}
13 years, 4 months
JBoss hornetq SVN: r11279 - in branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116: src/main/org/hornetq/core/persistence/impl/journal and 3 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic@jboss.com
Date: 2011-09-01 18:27:06 -0400 (Thu, 01 Sep 2011)
New Revision: 11279
Added:
branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/integration/cluster/distribution/LargeMessageRedistributionTest.java
Modified:
branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/src/main/org/hornetq/core/message/impl/MessageImpl.java
branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java
branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java
Log:
back porting JBPAPP-7115
Modified: branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/src/main/org/hornetq/core/message/impl/MessageImpl.java
===================================================================
--- branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/src/main/org/hornetq/core/message/impl/MessageImpl.java 2011-09-01 21:58:56 UTC (rev 11278)
+++ branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/src/main/org/hornetq/core/message/impl/MessageImpl.java 2011-09-01 22:27:06 UTC (rev 11279)
@@ -143,6 +143,14 @@
*/
protected MessageImpl(final MessageImpl other)
{
+ this(other, other.getProperties());
+ }
+
+ /*
+ * Copy constructor
+ */
+ protected MessageImpl(final MessageImpl other, TypedProperties properties)
+ {
messageID = other.getMessageID();
userID = other.getUserID();
address = other.getAddress();
@@ -151,7 +159,7 @@
expiration = other.getExpiration();
timestamp = other.getTimestamp();
priority = other.getPriority();
- properties = new TypedProperties(other.getProperties());
+ this.properties = new TypedProperties(properties);
// This MUST be synchronized using the monitor on the other message to prevent it running concurrently
// with getEncodedBuffer(), otherwise can introduce race condition when delivering concurrently to
Modified: branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java
===================================================================
--- branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java 2011-09-01 21:58:56 UTC (rev 11278)
+++ branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java 2011-09-01 22:27:06 UTC (rev 11279)
@@ -26,6 +26,7 @@
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.impl.ServerMessageImpl;
import org.hornetq.utils.DataConstants;
+import org.hornetq.utils.TypedProperties;
/**
* A LargeServerMessageImpl
@@ -70,12 +71,13 @@
/**
* Copy constructor
+ * @param properties
* @param copy
* @param fileCopy
*/
- private LargeServerMessageImpl(final LargeServerMessageImpl copy, final SequentialFile fileCopy, final long newID)
+ private LargeServerMessageImpl(final LargeServerMessageImpl copy, TypedProperties properties, final SequentialFile fileCopy, final long newID)
{
- super(copy);
+ super(copy, properties);
linkMessage = copy;
storageManager = copy.storageManager;
file = fileCopy;
@@ -281,8 +283,30 @@
this.removeProperty(Message.HDR_ORIG_MESSAGE_ID);
}
}
+
+ @Override
+ public synchronized ServerMessage copy()
+ {
+ incrementDelayDeletionCount();
+ long idToUse = messageID;
+ if (linkMessage != null)
+ {
+ idToUse = linkMessage.getMessageID();
+ }
+
+ SequentialFile newfile = storageManager.createFileForLargeMessage(idToUse, durable);
+
+ ServerMessage newMessage = new LargeServerMessageImpl(linkMessage == null ? this
+ : (LargeServerMessageImpl)linkMessage,
+ properties,
+ newfile,
+ messageID);
+ return newMessage;
+ }
+
+
@Override
public synchronized ServerMessage copy(final long newID)
{
@@ -301,6 +325,7 @@
ServerMessage newMessage = new LargeServerMessageImpl(linkMessage == null ? this
: (LargeServerMessageImpl)linkMessage,
+ properties,
newfile,
newID);
return newMessage;
@@ -317,7 +342,7 @@
file.copyTo(newFile);
- LargeServerMessageImpl newMessage = new LargeServerMessageImpl(this, newFile, newID);
+ LargeServerMessageImpl newMessage = new LargeServerMessageImpl(this, properties, newFile, newID);
newMessage.linkMessage = null;
Modified: branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-09-01 21:58:56 UTC (rev 11278)
+++ branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-09-01 22:27:06 UTC (rev 11279)
@@ -326,7 +326,7 @@
// Consumer implementation ---------------------------------------
/* Hook for processing message before forwarding */
- protected ServerMessage beforeForward(ServerMessage message)
+ protected ServerMessage beforeForward(final ServerMessage message)
{
if (useDuplicateDetection)
{
@@ -338,10 +338,20 @@
if (transformer != null)
{
- message = transformer.transform(message);
+ final ServerMessage transformedMessage = transformer.transform(message);
+ if (transformedMessage != message)
+ {
+ if (log.isDebugEnabled())
+ {
+ log.debug("The transformer " + transformer + " made a copy of the message " + message + " as transformedMessage");
+ }
+ }
+ return transformedMessage;
}
-
- return message;
+ else
+ {
+ return message;
+ }
}
/**
Modified: branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
===================================================================
--- branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java 2011-09-01 21:58:56 UTC (rev 11278)
+++ branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java 2011-09-01 22:27:06 UTC (rev 11279)
@@ -113,34 +113,50 @@
}
@Override
- protected ServerMessage beforeForward(ServerMessage message)
+ protected ServerMessage beforeForward(final ServerMessage message)
{
// We make a copy of the message, then we strip out the unwanted routing id headers and leave
// only
// the one pertinent for the address node - this is important since different queues on different
// nodes could have same queue ids
// Note we must copy since same message may get routed to other nodes which require different headers
- message = message.copy();
+ ServerMessage messageCopy = message.copy();
+
+ if (log.isTraceEnabled())
+ {
+ log.trace("Clustered bridge copied message " + message + " as " + messageCopy + " before delivery");
+ }
// TODO - we can optimise this
- Set<SimpleString> propNames = new HashSet<SimpleString>(message.getPropertyNames());
+ Set<SimpleString> propNames = new HashSet<SimpleString>(messageCopy.getPropertyNames());
byte[] queueIds = message.getBytesProperty(idsHeaderName);
+
+ if (queueIds == null)
+ {
+ // Sanity check only
+ log.warn("no queue IDs defined!, originalMessage = " + message +
+ ", copiedMessage = " +
+ messageCopy +
+ ", props=" +
+ idsHeaderName);
+ throw new IllegalStateException("no queueIDs defined");
+ }
for (SimpleString propName : propNames)
{
if (propName.startsWith(MessageImpl.HDR_ROUTE_TO_IDS))
{
- message.removeProperty(propName);
+ messageCopy.removeProperty(propName);
}
}
- message.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, queueIds);
+ messageCopy.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, queueIds);
- message = super.beforeForward(message);
-
- return message;
+ messageCopy = super.beforeForward(messageCopy);
+
+ return messageCopy;
}
private void setupNotificationConsumer() throws Exception
Modified: branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
===================================================================
--- branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java 2011-09-01 21:58:56 UTC (rev 11278)
+++ branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java 2011-09-01 22:27:06 UTC (rev 11279)
@@ -25,6 +25,7 @@
import org.hornetq.core.server.ServerMessage;
import org.hornetq.utils.DataConstants;
import org.hornetq.utils.MemorySize;
+import org.hornetq.utils.TypedProperties;
/**
*
@@ -89,6 +90,14 @@
super(other);
}
+ /*
+ * Copy constructor
+ */
+ protected ServerMessageImpl(final ServerMessageImpl other, TypedProperties properties)
+ {
+ super(other, properties);
+ }
+
public boolean isServerMessage()
{
return true;
@@ -193,6 +202,7 @@
public ServerMessage copy()
{
+ // This is a simple copy, used only to avoid changing original properties
return new ServerMessageImpl(this);
}
Modified: branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-09-01 21:58:56 UTC (rev 11278)
+++ branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-09-01 22:27:06 UTC (rev 11279)
@@ -71,6 +71,16 @@
{
private static final Logger log = Logger.getLogger(ClusterTestBase.class);
+ public ClusterTestBase()
+ {
+ super();
+ }
+
+ public ClusterTestBase(String name)
+ {
+ super(name);
+ }
+
private static final int[] PORTS = { TransportConstants.DEFAULT_PORT,
TransportConstants.DEFAULT_PORT + 1,
TransportConstants.DEFAULT_PORT + 2,
@@ -88,12 +98,12 @@
{
return 1024;
}
-
+
protected boolean isLargeMessage()
{
return false;
}
-
+
@Override
protected void setUp() throws Exception
{
@@ -148,13 +158,11 @@
consumers = new ConsumerHolder[ClusterTestBase.MAX_CONSUMERS];
-
-
nodeManagers = null;
super.tearDown();
- // ServerLocatorImpl.shutdown();
+ // ServerLocatorImpl.shutdown();
}
// Private -------------------------------------------------------------------------------------------------------
@@ -199,7 +207,7 @@
{
return consumers[node].consumer;
}
-
+
protected void waitForMessages(final int node, final String address, final int count) throws Exception
{
HornetQServer server = servers[node];
@@ -458,7 +466,7 @@
if (holder != null)
{
holder.consumer.close();
- // holder.session.close();
+ // holder.session.close();
consumers[i] = null;
}
@@ -532,7 +540,7 @@
for (int i = msgStart; i < msgEnd; i++)
{
ClientMessage message = session.createMessage(durable);
-
+
if (isLargeMessage())
{
message.setBodyInputStream(createFakeLargeStream(getLargeMessageSize()));
@@ -545,7 +553,6 @@
message.putIntProperty(ClusterTestBase.COUNT_PROP, i);
-
producer.send(message);
}
}
@@ -589,7 +596,7 @@
for (int i = msgStart; i < msgEnd; i++)
{
ClientMessage message = session.createMessage(durable);
-
+
if (isLargeMessage())
{
message.setBodyInputStream(createFakeLargeStream(getLargeMessageSize()));
@@ -774,10 +781,9 @@
for (int j = msgStart; j < msgEnd; j++)
{
-
+
ClientMessage message = holder.consumer.receive(WAIT_TIMEOUT);
-
-
+
if (message == null)
{
ClusterTestBase.log.info("*** dumping consumers:");
@@ -789,13 +795,12 @@
if (isLargeMessage())
{
- for (int posMsg = 0 ; posMsg < getLargeMessageSize(); posMsg++)
+ for (int posMsg = 0; posMsg < getLargeMessageSize(); posMsg++)
{
assertEquals(getSamplebyte(posMsg), message.getBodyBuffer().readByte());
}
}
-
if (ack)
{
message.acknowledge();
@@ -831,7 +836,7 @@
}
}
}
-
+
protected String clusterDescription(HornetQServer server)
{
String br = "-------------------------\n";
@@ -1076,6 +1081,14 @@
{
int count = (Integer)message.getObjectProperty(ClusterTestBase.COUNT_PROP);
+ if (isLargeMessage())
+ {
+ for (int posMsg = 0; posMsg < getLargeMessageSize(); posMsg++)
+ {
+ assertEquals(getSamplebyte(posMsg), message.getBodyBuffer().readByte());
+ }
+ }
+
// log.info("consumer " + consumerIDs[i] + " received message " + count);
Assert.assertFalse(counts.contains(count));
@@ -1169,6 +1182,15 @@
message = consumer.consumer.receive(500);
if (message != null)
{
+
+ if (isLargeMessage())
+ {
+ for (int posMsg = 0; posMsg < getLargeMessageSize(); posMsg++)
+ {
+ assertEquals(getSamplebyte(posMsg), message.getBodyBuffer().readByte());
+ }
+ }
+
if (ack)
{
message.acknowledge();
@@ -1189,7 +1211,7 @@
{
res[j++] = i;
}
-
+
if (ack)
{
// just to flush acks
@@ -1255,7 +1277,6 @@
sfs[node] = sf;
}
-
protected void setupSessionFactory(final int node, final boolean netty, int reconnectAttempts) throws Exception
{
if (sfs[node] != null)
@@ -1306,7 +1327,6 @@
serverTotc = new TransportConfiguration(ServiceTestBase.INVM_CONNECTOR_FACTORY, params);
}
-
locators[node] = HornetQClient.createServerLocatorWithHA(serverTotc);
locators[node].setRetryInterval(100);
locators[node].setRetryIntervalMultiplier(1d);
@@ -1342,74 +1362,74 @@
final boolean fileStorage,
final boolean sharedStorage,
final boolean netty)
+ {
+ if (servers[node] != null)
{
- if (servers[node] != null)
- {
- throw new IllegalArgumentException("Already a server at node " + node);
- }
+ throw new IllegalArgumentException("Already a server at node " + node);
+ }
- Configuration configuration = createBasicConfig();
+ Configuration configuration = createBasicConfig();
- configuration.setSecurityEnabled(false);
- configuration.setJournalMinFiles(2);
- configuration.setJournalMaxIO_AIO(1000);
- configuration.setJournalFileSize(100 * 1024);
- configuration.setJournalType(getDefaultJournalType());
- configuration.setSharedStore(sharedStorage);
+ configuration.setSecurityEnabled(false);
+ configuration.setJournalMinFiles(2);
+ configuration.setJournalMaxIO_AIO(1000);
+ configuration.setJournalFileSize(100 * 1024);
+ configuration.setJournalType(getDefaultJournalType());
+ configuration.setSharedStore(sharedStorage);
+ if (sharedStorage)
+ {
+ // Shared storage will share the node between the backup and live node
+ configuration.setBindingsDirectory(getBindingsDir(node, false));
+ configuration.setJournalDirectory(getJournalDir(node, false));
+ configuration.setPagingDirectory(getPageDir(node, false));
+ configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, false));
+ }
+ else
+ {
+ configuration.setBindingsDirectory(getBindingsDir(node, true));
+ configuration.setJournalDirectory(getJournalDir(node, true));
+ configuration.setPagingDirectory(getPageDir(node, true));
+ configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, true));
+ }
+ configuration.setClustered(true);
+ configuration.setJournalCompactMinFiles(0);
+
+ configuration.getAcceptorConfigurations().clear();
+ configuration.getAcceptorConfigurations().add(createTransportConfiguration(netty, true, generateParams(node,
+ netty)));
+
+ HornetQServer server;
+
+ if (fileStorage)
+ {
if (sharedStorage)
{
- // Shared storage will share the node between the backup and live node
- configuration.setBindingsDirectory(getBindingsDir(node, false));
- configuration.setJournalDirectory(getJournalDir(node, false));
- configuration.setPagingDirectory(getPageDir(node, false));
- configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, false));
+ server = createInVMFailoverServer(true, configuration, nodeManagers[node]);
}
else
{
- configuration.setBindingsDirectory(getBindingsDir(node, true));
- configuration.setJournalDirectory(getJournalDir(node, true));
- configuration.setPagingDirectory(getPageDir(node, true));
- configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, true));
+ server = HornetQServers.newHornetQServer(configuration);
}
- configuration.setClustered(true);
- configuration.setJournalCompactMinFiles(0);
-
- configuration.getAcceptorConfigurations().clear();
- configuration.getAcceptorConfigurations().add(createTransportConfiguration(netty, true, generateParams(node, netty)));
-
- HornetQServer server;
-
- if (fileStorage)
+ }
+ else
+ {
+ if (sharedStorage)
{
- if (sharedStorage)
- {
- server = createInVMFailoverServer(true, configuration, nodeManagers[node]);
- }
- else
- {
- server = HornetQServers.newHornetQServer(configuration);
- }
+ server = createInVMFailoverServer(false, configuration, nodeManagers[node]);
}
else
{
- if (sharedStorage)
- {
- server = createInVMFailoverServer(false, configuration, nodeManagers[node]);
- }
- else
- {
- server = HornetQServers.newHornetQServer(configuration, false);
- }
+ server = HornetQServers.newHornetQServer(configuration, false);
}
- servers[node] = server;
}
+ servers[node] = server;
+ }
-
- protected void setupBackupServer(final int node,
- final int liveNode,
- final boolean fileStorage,
- final boolean sharedStorage,
- final boolean netty)
+ protected void setupBackupServer(final int node,
+ final int liveNode,
+ final boolean fileStorage,
+ final boolean sharedStorage,
+ final boolean netty)
{
if (servers[node] != null)
{
@@ -1446,7 +1466,7 @@
configuration.getAcceptorConfigurations().clear();
TransportConfiguration acceptorConfig = createTransportConfiguration(netty, true, generateParams(node, netty));
configuration.getAcceptorConfigurations().add(acceptorConfig);
- //add backup connector
+ // add backup connector
TransportConfiguration liveConfig = createTransportConfiguration(netty, false, generateParams(liveNode, netty));
configuration.getConnectorConfigurations().put(liveConfig.getName(), liveConfig);
TransportConfiguration backupConfig = createTransportConfiguration(netty, false, generateParams(node, netty));
@@ -1480,171 +1500,180 @@
}
protected void setupLiveServerWithDiscovery(final int node,
- final String groupAddress,
- final int port,
- final boolean fileStorage,
- final boolean netty,
- final boolean sharedStorage)
- {
- if (servers[node] != null)
- {
- throw new IllegalArgumentException("Already a server at node " + node);
- }
+ final String groupAddress,
+ final int port,
+ final boolean fileStorage,
+ final boolean netty,
+ final boolean sharedStorage)
+ {
+ if (servers[node] != null)
+ {
+ throw new IllegalArgumentException("Already a server at node " + node);
+ }
- Configuration configuration = createBasicConfig();
+ Configuration configuration = createBasicConfig();
- configuration.setSecurityEnabled(false);
- configuration.setBindingsDirectory(getBindingsDir(node, false));
- configuration.setJournalMinFiles(2);
- configuration.setJournalDirectory(getJournalDir(node, false));
- configuration.setJournalFileSize(100 * 1024);
- configuration.setJournalType(getDefaultJournalType());
- configuration.setJournalMaxIO_AIO(1000);
- configuration.setPagingDirectory(getPageDir(node, false));
- configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, false));
- configuration.setClustered(true);
- configuration.setBackup(false);
+ configuration.setSecurityEnabled(false);
+ configuration.setBindingsDirectory(getBindingsDir(node, false));
+ configuration.setJournalMinFiles(2);
+ configuration.setJournalDirectory(getJournalDir(node, false));
+ configuration.setJournalFileSize(100 * 1024);
+ configuration.setJournalType(getDefaultJournalType());
+ configuration.setJournalMaxIO_AIO(1000);
+ configuration.setPagingDirectory(getPageDir(node, false));
+ configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, false));
+ configuration.setClustered(true);
+ configuration.setBackup(false);
- configuration.getAcceptorConfigurations().clear();
+ configuration.getAcceptorConfigurations().clear();
- Map<String, Object> params = generateParams(node, netty);
+ Map<String, Object> params = generateParams(node, netty);
- configuration.getAcceptorConfigurations().add(createTransportConfiguration(netty, true, params));
+ configuration.getAcceptorConfigurations().add(createTransportConfiguration(netty, true, params));
- TransportConfiguration connector = createTransportConfiguration(netty, false, params);
- configuration.getConnectorConfigurations().put(connector.getName(), connector);
+ TransportConfiguration connector = createTransportConfiguration(netty, false, params);
+ configuration.getConnectorConfigurations().put(connector.getName(), connector);
- List<String> connectorPairs = new ArrayList<String>();
- connectorPairs.add(connector.getName());
+ List<String> connectorPairs = new ArrayList<String>();
+ connectorPairs.add(connector.getName());
- BroadcastGroupConfiguration bcConfig = new BroadcastGroupConfiguration("bg1",
- null,
- -1,
- groupAddress,
- port,
- 1000,
- connectorPairs);
+ BroadcastGroupConfiguration bcConfig = new BroadcastGroupConfiguration("bg1",
+ null,
+ -1,
+ groupAddress,
+ port,
+ 1000,
+ connectorPairs);
- configuration.getBroadcastGroupConfigurations().add(bcConfig);
+ configuration.getBroadcastGroupConfigurations().add(bcConfig);
- DiscoveryGroupConfiguration dcConfig = new DiscoveryGroupConfiguration("dg1", null, groupAddress, port, 5000, 5000);
+ DiscoveryGroupConfiguration dcConfig = new DiscoveryGroupConfiguration("dg1",
+ null,
+ groupAddress,
+ port,
+ 5000,
+ 5000);
- configuration.getDiscoveryGroupConfigurations().put(dcConfig.getName(), dcConfig);
+ configuration.getDiscoveryGroupConfigurations().put(dcConfig.getName(), dcConfig);
- HornetQServer server;
- if (fileStorage)
- {
- if (sharedStorage)
- {
- server = createInVMFailoverServer(true, configuration, nodeManagers[node]);
- }
- else
- {
- server = HornetQServers.newHornetQServer(configuration);
- }
- }
- else
- {
- if (sharedStorage)
- {
- server = createInVMFailoverServer(false, configuration, nodeManagers[node]);
- }
- else
- {
- server = HornetQServers.newHornetQServer(configuration, false);
- }
- }
- servers[node] = server;
- }
+ HornetQServer server;
+ if (fileStorage)
+ {
+ if (sharedStorage)
+ {
+ server = createInVMFailoverServer(true, configuration, nodeManagers[node]);
+ }
+ else
+ {
+ server = HornetQServers.newHornetQServer(configuration);
+ }
+ }
+ else
+ {
+ if (sharedStorage)
+ {
+ server = createInVMFailoverServer(false, configuration, nodeManagers[node]);
+ }
+ else
+ {
+ server = HornetQServers.newHornetQServer(configuration, false);
+ }
+ }
+ servers[node] = server;
+ }
protected void setupBackupServerWithDiscovery(final int node,
- final int liveNode,
- final String groupAddress,
- final int port,
- final boolean fileStorage,
- final boolean netty,
- final boolean sharedStorage)
- {
- if (servers[node] != null)
- {
- throw new IllegalArgumentException("Already a server at node " + node);
- }
+ final int liveNode,
+ final String groupAddress,
+ final int port,
+ final boolean fileStorage,
+ final boolean netty,
+ final boolean sharedStorage)
+ {
+ if (servers[node] != null)
+ {
+ throw new IllegalArgumentException("Already a server at node " + node);
+ }
- Configuration configuration = createBasicConfig();
+ Configuration configuration = createBasicConfig();
- configuration.setSecurityEnabled(false);
- configuration.setSharedStore(sharedStorage);
- if (sharedStorage)
- {
- // Shared storage will share the node between the backup and live node
- configuration.setBindingsDirectory(getBindingsDir(liveNode, false));
- configuration.setJournalDirectory(getJournalDir(liveNode, false));
- configuration.setPagingDirectory(getPageDir(liveNode, false));
- configuration.setLargeMessagesDirectory(getLargeMessagesDir(liveNode, false));
- }
- else
- {
- configuration.setBindingsDirectory(getBindingsDir(node, true));
- configuration.setJournalDirectory(getJournalDir(node, true));
- configuration.setPagingDirectory(getPageDir(node, true));
- configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, true));
- }
- configuration.setClustered(true);
- configuration.setBackup(true);
+ configuration.setSecurityEnabled(false);
+ configuration.setSharedStore(sharedStorage);
+ if (sharedStorage)
+ {
+ // Shared storage will share the node between the backup and live node
+ configuration.setBindingsDirectory(getBindingsDir(liveNode, false));
+ configuration.setJournalDirectory(getJournalDir(liveNode, false));
+ configuration.setPagingDirectory(getPageDir(liveNode, false));
+ configuration.setLargeMessagesDirectory(getLargeMessagesDir(liveNode, false));
+ }
+ else
+ {
+ configuration.setBindingsDirectory(getBindingsDir(node, true));
+ configuration.setJournalDirectory(getJournalDir(node, true));
+ configuration.setPagingDirectory(getPageDir(node, true));
+ configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, true));
+ }
+ configuration.setClustered(true);
+ configuration.setBackup(true);
- configuration.getAcceptorConfigurations().clear();
+ configuration.getAcceptorConfigurations().clear();
- Map<String, Object> params = generateParams(node, netty);
+ Map<String, Object> params = generateParams(node, netty);
- configuration.getAcceptorConfigurations().add(createTransportConfiguration(netty, true, params));
+ configuration.getAcceptorConfigurations().add(createTransportConfiguration(netty, true, params));
- TransportConfiguration connector = createTransportConfiguration(netty, false, params);
- configuration.getConnectorConfigurations().put(connector.getName(), connector);
+ TransportConfiguration connector = createTransportConfiguration(netty, false, params);
+ configuration.getConnectorConfigurations().put(connector.getName(), connector);
- List<String> connectorPairs = new ArrayList<String>();
- connectorPairs.add(connector.getName());
+ List<String> connectorPairs = new ArrayList<String>();
+ connectorPairs.add(connector.getName());
- BroadcastGroupConfiguration bcConfig = new BroadcastGroupConfiguration("bg1",
- null,
- -1,
- groupAddress,
- port,
- 1000,
- connectorPairs);
+ BroadcastGroupConfiguration bcConfig = new BroadcastGroupConfiguration("bg1",
+ null,
+ -1,
+ groupAddress,
+ port,
+ 1000,
+ connectorPairs);
- configuration.getBroadcastGroupConfigurations().add(bcConfig);
+ configuration.getBroadcastGroupConfigurations().add(bcConfig);
- DiscoveryGroupConfiguration dcConfig = new DiscoveryGroupConfiguration("dg1", null, groupAddress, port, 5000, 5000);
+ DiscoveryGroupConfiguration dcConfig = new DiscoveryGroupConfiguration("dg1",
+ null,
+ groupAddress,
+ port,
+ 5000,
+ 5000);
- configuration.getDiscoveryGroupConfigurations().put(dcConfig.getName(), dcConfig);
+ configuration.getDiscoveryGroupConfigurations().put(dcConfig.getName(), dcConfig);
- HornetQServer server;
- if (fileStorage)
- {
- if (sharedStorage)
- {
- server = createInVMFailoverServer(true, configuration, nodeManagers[liveNode]);
- }
- else
- {
- server = HornetQServers.newHornetQServer(configuration);
- }
- }
- else
- {
- if (sharedStorage)
- {
- server = createInVMFailoverServer(false, configuration, nodeManagers[liveNode]);
- }
- else
- {
- server = HornetQServers.newHornetQServer(configuration, false);
- }
- }
- servers[node] = server;
- }
+ HornetQServer server;
+ if (fileStorage)
+ {
+ if (sharedStorage)
+ {
+ server = createInVMFailoverServer(true, configuration, nodeManagers[liveNode]);
+ }
+ else
+ {
+ server = HornetQServers.newHornetQServer(configuration);
+ }
+ }
+ else
+ {
+ if (sharedStorage)
+ {
+ server = createInVMFailoverServer(false, configuration, nodeManagers[liveNode]);
+ }
+ else
+ {
+ server = HornetQServers.newHornetQServer(configuration, false);
+ }
+ }
+ servers[node] = server;
+ }
-
protected void clearServer(final int... nodes)
{
for (int i = 0; i < nodes.length; i++)
@@ -1681,12 +1710,12 @@
{
throw new IllegalStateException("No server at node " + nodeFrom);
}
-
+
TransportConfiguration connectorFrom = createTransportConfiguration(netty, false, generateParams(nodeFrom, netty));
serverFrom.getConfiguration().getConnectorConfigurations().put(name, connectorFrom);
List<String> pairs = null;
-
+
if (nodeTo != -1)
{
TransportConfiguration serverTotc = createTransportConfiguration(netty, false, generateParams(nodeTo, netty));
@@ -1703,11 +1732,11 @@
forwardWhenNoConsumers,
maxHops,
1024,
- pairs, allowDirectConnectionsOnly);
+ pairs,
+ allowDirectConnectionsOnly);
serverFrom.getConfiguration().getClusterConfigurations().add(clusterConf);
}
-
protected void setupClusterConnection(final String name,
final String address,
final boolean forwardWhenNoConsumers,
@@ -1725,7 +1754,7 @@
TransportConfiguration connectorFrom = createTransportConfiguration(netty, false, generateParams(nodeFrom, netty));
serverFrom.getConfiguration().getConnectorConfigurations().put(connectorFrom.getName(), connectorFrom);
-
+
List<String> pairs = new ArrayList<String>();
for (int element : nodesTo)
{
@@ -1742,7 +1771,8 @@
forwardWhenNoConsumers,
maxHops,
1024,
- pairs, false);
+ pairs,
+ false);
serverFrom.getConfiguration().getClusterConfigurations().add(clusterConf);
}
@@ -1781,7 +1811,8 @@
forwardWhenNoConsumers,
maxHops,
1024,
- pairs, false);
+ pairs,
+ false);
serverFrom.getConfiguration().getClusterConfigurations().add(clusterConf);
}
@@ -1803,7 +1834,7 @@
TransportConfiguration connectorConfig = createTransportConfiguration(netty, false, generateParams(node, netty));
server.getConfiguration().getConnectorConfigurations().put(name, connectorConfig);
-
+
ClusterConnectionConfiguration clusterConf = new ClusterConnectionConfiguration(name,
address,
name,
@@ -1836,23 +1867,22 @@
}
for (int node : nodes)
{
- //wait for each server to start, it may be a backup and started in a separate thread
+ // wait for each server to start, it may be a backup and started in a separate thread
waitForServer(servers[node]);
}
}
- private void waitForServer(HornetQServer server)
- throws InterruptedException
+ private void waitForServer(HornetQServer server) throws InterruptedException
{
- long timetowait =System.currentTimeMillis() + 5000;
- while(!server.isStarted())
+ long timetowait = System.currentTimeMillis() + 5000;
+ while (!server.isStarted())
{
Thread.sleep(100);
- if(server.isStarted())
+ if (server.isStarted())
{
break;
}
- else if(System.currentTimeMillis() > timetowait)
+ else if (System.currentTimeMillis() > timetowait)
{
fail("server didnt start");
}
Added: branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/integration/cluster/distribution/LargeMessageRedistributionTest.java
===================================================================
--- branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/integration/cluster/distribution/LargeMessageRedistributionTest.java (rev 0)
+++ branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/integration/cluster/distribution/LargeMessageRedistributionTest.java 2011-09-01 22:27:06 UTC (rev 11279)
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.cluster.distribution;
+
+
+/**
+ * A LargeMessageRedistributionTest
+ *
+ * @author clebert
+ *
+ *
+ */
+public class LargeMessageRedistributionTest extends MessageRedistributionTest
+{
+
+ public LargeMessageRedistributionTest()
+ {
+ super();
+ }
+
+ public LargeMessageRedistributionTest(String name)
+ {
+ super(name);
+ }
+
+ protected boolean isLargeMessage()
+ {
+ return true;
+ }
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java
===================================================================
--- branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java 2011-09-01 21:58:56 UTC (rev 11278)
+++ branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java 2011-09-01 22:27:06 UTC (rev 11279)
@@ -25,9 +25,6 @@
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.message.impl.MessageImpl;
-import org.hornetq.core.server.cluster.ClusterConnection;
-import org.hornetq.core.server.cluster.MessageFlowRecord;
-import org.hornetq.core.server.cluster.impl.ClusterConnectionImpl;
import org.hornetq.core.server.impl.QueueImpl;
import org.hornetq.core.settings.impl.AddressSettings;
@@ -44,6 +41,19 @@
{
private static final Logger log = Logger.getLogger(MessageRedistributionTest.class);
+ public MessageRedistributionTest()
+ {
+ super();
+ }
+
+ /**
+ * @param name
+ */
+ public MessageRedistributionTest(String name)
+ {
+ super(name);
+ }
+
@Override
protected void setUp() throws Exception
{
@@ -118,96 +128,6 @@
MessageRedistributionTest.log.info("Test done");
}
- // https://issues.jboss.org/browse/HORNETQ-654
- public void testRedistributionWhenConsumerIsClosedAndRestart() throws Exception
- {
- setupCluster(false);
-
- MessageRedistributionTest.log.info("Doing test");
-
- startServers(0, 1, 2);
-
- setupSessionFactory(0, isNetty());
- setupSessionFactory(1, isNetty());
- setupSessionFactory(2, isNetty());
-
- createQueue(0, "queues.testaddress", "queue0", null, true);
- createQueue(1, "queues.testaddress", "queue0", null, true);
- createQueue(2, "queues.testaddress", "queue0", null, true);
-
- addConsumer(0, 0, "queue0", null);
- addConsumer(1, 1, "queue0", null);
- addConsumer(2, 2, "queue0", null);
-
- waitForBindings(0, "queues.testaddress", 1, 1, true);
- waitForBindings(1, "queues.testaddress", 1, 1, true);
- waitForBindings(2, "queues.testaddress", 1, 1, true);
-
- waitForBindings(0, "queues.testaddress", 2, 2, false);
- waitForBindings(1, "queues.testaddress", 2, 2, false);
- waitForBindings(2, "queues.testaddress", 2, 2, false);
-
- send(0, "queues.testaddress", 20, true, null);
-
- getReceivedOrder(0, true);
- int[] ids1 = getReceivedOrder(1, false);
- getReceivedOrder(2, true);
-
- for (ClusterConnection conn : servers[1].getClusterManager().getClusterConnections())
- {
- ClusterConnectionImpl impl = (ClusterConnectionImpl)conn;
- for (MessageFlowRecord record : impl.getRecords().values())
- {
- if (record.getBridge() != null)
- {
- System.out.println("stop record bridge");
- record.getBridge().stop();
- }
- }
- }
-
- removeConsumer(1);
-
- // Need to wait some time as we need to handle all redistributions before we stop the servers
- Thread.sleep(5000);
-
- for (int i = 0; i <= 2; i++)
- {
- servers[i].stop();
- servers[i] = null;
- }
-
- setupServers();
-
- setupCluster(false);
-
- startServers(0, 1, 2);
-
- for (int i = 0 ; i <= 2; i++)
- {
- consumers[i] = null;
- sfs[i] = null;
- }
-
- setupSessionFactory(0, isNetty());
- setupSessionFactory(1, isNetty());
- setupSessionFactory(2, isNetty());
-
- addConsumer(0, 0, "queue0", null);
- addConsumer(2, 2, "queue0", null);
-
- waitForBindings(0, "queues.testaddress", 1, 1, true);
- waitForBindings(2, "queues.testaddress", 1, 1, true);
-
- waitForBindings(0, "queues.testaddress", 2, 1, false);
- waitForBindings(1, "queues.testaddress", 2, 2, false);
- waitForBindings(2, "queues.testaddress", 2, 1, false);
-
- verifyReceiveRoundRobinInSomeOrderWithCounts(false, ids1, 0, 2);
-
- MessageRedistributionTest.log.info("Test done");
- }
-
public void testRedistributionWhenConsumerIsClosedNotConsumersOnAllNodes() throws Exception
{
setupCluster(false);
13 years, 4 months
JBoss hornetq SVN: r11278 - in branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116: src/main/org/hornetq/core/persistence/impl/journal and 4 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-09-01 17:58:56 -0400 (Thu, 01 Sep 2011)
New Revision: 11278
Modified:
branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java
branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java
branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/util/UnitTestCase.java
Log:
Back porting HORNETQ-753
Modified: branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
===================================================================
--- branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java 2011-09-01 21:50:16 UTC (rev 11277)
+++ branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java 2011-09-01 21:58:56 UTC (rev 11278)
@@ -409,7 +409,8 @@
lastChunk = pos >= bodySize;
- final SessionSendContinuationMessage chunk = new SessionSendContinuationMessage(bodyBuffer.toByteBuffer()
+ final SessionSendContinuationMessage chunk = new SessionSendContinuationMessage(msgI,
+ bodyBuffer.toByteBuffer()
.array(),
!lastChunk,
lastChunk && sendBlocking);
@@ -529,11 +530,11 @@
buff = buff2;
- chunk = new SessionSendContinuationMessage(buff, false, sendBlocking, messageSize.get());
+ chunk = new SessionSendContinuationMessage(msgI, buff, false, sendBlocking, messageSize.get());
}
else
{
- chunk = new SessionSendContinuationMessage(buff, true, false);
+ chunk = new SessionSendContinuationMessage(msgI, buff, true, false);
}
if (sendBlocking && lastPacket)
Modified: branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2011-09-01 21:50:16 UTC (rev 11277)
+++ branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2011-09-01 21:58:56 UTC (rev 11278)
@@ -63,6 +63,7 @@
import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveLargeMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionRequestProducerCreditsMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionSendContinuationMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionSendMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionXACommitMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionXAEndMessage;
@@ -107,8 +108,6 @@
private static final Logger log = Logger.getLogger(ClientSessionImpl.class);
- private final boolean trace = ClientSessionImpl.log.isTraceEnabled();
-
// Attributes ----------------------------------------------------------------------------
private Map<String, String> metadata = new HashMap<String, String>();
@@ -1192,6 +1191,15 @@
sendAckHandler.sendAcknowledged(ssm.getMessage());
}
+ else
+ if (packet.getType() == PacketImpl.SESS_SEND_CONTINUATION)
+ {
+ SessionSendContinuationMessage scm = (SessionSendContinuationMessage)packet;
+ if (!scm.isContinues())
+ {
+ sendAckHandler.sendAcknowledged(scm.getMessage());
+ }
+ }
}
// XAResource implementation
Modified: branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-09-01 21:50:16 UTC (rev 11277)
+++ branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-09-01 21:58:56 UTC (rev 11278)
@@ -2344,9 +2344,9 @@
}
- private static class QueueEncoding implements EncodingSupport
+ public static class QueueEncoding implements EncodingSupport
{
- long queueID;
+ public long queueID;
public QueueEncoding(final long queueID)
{
@@ -2398,7 +2398,7 @@
}
}
- private static class RefEncoding extends QueueEncoding
+ public static class RefEncoding extends QueueEncoding
{
public RefEncoding()
{
@@ -2848,7 +2848,7 @@
// Encoding functions for binding Journal
- private static Object newObjectEncoding(RecordInfo info)
+ public static Object newObjectEncoding(RecordInfo info)
{
HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(info.data);
long id = info.id;
@@ -2990,9 +2990,9 @@
}
}
- private static class ReferenceDescribe
+ public static class ReferenceDescribe
{
- RefEncoding refEncoding;
+ public RefEncoding refEncoding;
public ReferenceDescribe(RefEncoding refEncoding)
{
Modified: branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java
===================================================================
--- branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java 2011-09-01 21:50:16 UTC (rev 11277)
+++ branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java 2011-09-01 21:58:56 UTC (rev 11278)
@@ -14,6 +14,7 @@
package org.hornetq.core.protocol.core.impl.wireformat;
import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.core.message.impl.MessageInternal;
import org.hornetq.core.protocol.core.impl.PacketImpl;
/**
@@ -34,6 +35,9 @@
private boolean requiresResponse;
+ // Used on confirmation handling
+ private MessageInternal message;
+
/**
* to be sent on the last package
*/
@@ -53,7 +57,7 @@
* @param continues
* @param requiresResponse
*/
- public SessionSendContinuationMessage(final byte[] body, final boolean continues, final boolean requiresResponse)
+ public SessionSendContinuationMessage(final MessageInternal message, final byte[] body, final boolean continues, final boolean requiresResponse)
{
super(PacketImpl.SESS_SEND_CONTINUATION, body, continues);
this.requiresResponse = requiresResponse;
@@ -64,9 +68,9 @@
* @param continues
* @param requiresResponse
*/
- public SessionSendContinuationMessage(final byte[] body, final boolean continues, final boolean requiresResponse, final long messageBodySize)
+ public SessionSendContinuationMessage(final MessageInternal message, final byte[] body, final boolean continues, final boolean requiresResponse, final long messageBodySize)
{
- this(body, continues, requiresResponse);
+ this(message, body, continues, requiresResponse);
this.messageBodySize = messageBodySize;
}
@@ -84,7 +88,16 @@
{
return messageBodySize;
}
+
+ /**
+ * @return the message
+ */
+ public MessageInternal getMessage()
+ {
+ return message;
+ }
+
@Override
public void encodeRest(final HornetQBuffer buffer)
{
Modified: branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java
===================================================================
--- branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java 2011-09-01 21:50:16 UTC (rev 11277)
+++ branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java 2011-09-01 21:58:56 UTC (rev 11278)
@@ -23,6 +23,7 @@
import junit.framework.Assert;
+import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientConsumer;
@@ -195,11 +196,13 @@
{
message.setBodyInputStream(UnitTestCase.createFakeLargeStream(1024 * 1024));
}
+ else
+ {
+ message.getBodyBuffer().writeBytes(bytes);
+ }
message.putIntProperty(propKey, i);
- message.getBodyBuffer().writeBytes(bytes);
-
producer0.send(message);
}
@@ -255,6 +258,173 @@
}
+ public void testBridgeWithLargeMessage() throws Exception
+ {
+ HornetQServer server0 = null;
+ HornetQServer server1 = null;
+
+ final int PAGE_MAX = 1024 * 1024 * 1024;
+
+ final int PAGE_SIZE = 10 * 1024;
+ ServerLocator locator = null;
+ try
+ {
+
+ Map<String, Object> server0Params = new HashMap<String, Object>();
+ server0 = createClusteredServerWithParams(isNetty(), 0, true, PAGE_SIZE, PAGE_MAX, server0Params);
+
+ Map<String, Object> server1Params = new HashMap<String, Object>();
+ addTargetParameters(server1Params);
+ server1 = createClusteredServerWithParams(isNetty(), 1, true, server1Params);
+
+ final String testAddress = "testAddress";
+ final String queueName0 = "queue0";
+ final String forwardAddress = "forwardAddress";
+ final String queueName1 = "queue1";
+
+ Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
+ TransportConfiguration server0tc = new TransportConfiguration(getConnector(), server0Params);
+
+ TransportConfiguration server1tc = new TransportConfiguration(getConnector(), server1Params);
+ connectors.put(server1tc.getName(), server1tc);
+
+ server0.getConfiguration().setConnectorConfigurations(connectors);
+
+ ArrayList<String> staticConnectors = new ArrayList<String>();
+ staticConnectors.add(server1tc.getName());
+
+ BridgeConfiguration bridgeConfiguration = new BridgeConfiguration("bridge1",
+ queueName0,
+ forwardAddress,
+ null,
+ null,
+ 1000,
+ 1d,
+ -1,
+ false,
+ 0,
+ HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+ staticConnectors,
+ false,
+ ConfigurationImpl.DEFAULT_CLUSTER_USER,
+ ConfigurationImpl.DEFAULT_CLUSTER_PASSWORD);
+
+ List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
+ bridgeConfigs.add(bridgeConfiguration);
+ server0.getConfiguration().setBridgeConfigurations(bridgeConfigs);
+
+ CoreQueueConfiguration queueConfig0 = new CoreQueueConfiguration(testAddress, queueName0, null, true);
+ List<CoreQueueConfiguration> queueConfigs0 = new ArrayList<CoreQueueConfiguration>();
+ queueConfigs0.add(queueConfig0);
+ server0.getConfiguration().setQueueConfigurations(queueConfigs0);
+
+ CoreQueueConfiguration queueConfig1 = new CoreQueueConfiguration(forwardAddress, queueName1, null, true);
+ List<CoreQueueConfiguration> queueConfigs1 = new ArrayList<CoreQueueConfiguration>();
+ queueConfigs1.add(queueConfig1);
+ server1.getConfiguration().setQueueConfigurations(queueConfigs1);
+
+ server1.start();
+ server0.start();
+
+ locator = HornetQClient.createServerLocatorWithoutHA(server0tc, server1tc);
+ locator.setMinLargeMessageSize(1024);
+ ClientSessionFactory sf0 = locator.createSessionFactory(server0tc);
+
+ ClientSessionFactory sf1 = locator.createSessionFactory(server1tc);
+
+ ClientSession session0 = sf0.createSession(false, true, true);
+
+ ClientSession session1 = sf1.createSession(false, true, true);
+
+ ClientProducer producer0 = session0.createProducer(new SimpleString(testAddress));
+
+ ClientConsumer consumer1 = session1.createConsumer(queueName1);
+
+ session1.start();
+
+ final int numMessages = 50;
+
+ final SimpleString propKey = new SimpleString("testkey");
+
+ final int LARGE_MESSAGE_SIZE = 10 * 1024;
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session0.createMessage(true);
+ message.setBodyInputStream(createFakeLargeStream(LARGE_MESSAGE_SIZE));
+
+ message.putIntProperty(propKey, i);
+
+ producer0.send(message);
+ }
+
+ session0.commit();
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = consumer1.receive(5000);
+
+ Assert.assertNotNull(message);
+
+ Assert.assertEquals(i, message.getObjectProperty(propKey));
+
+ HornetQBuffer buff = message.getBodyBuffer();
+
+ for (int posMsg = 0 ; posMsg < LARGE_MESSAGE_SIZE; posMsg++)
+ {
+ assertEquals(getSamplebyte(posMsg), buff.readByte());
+ }
+
+ message.acknowledge();
+ }
+
+ session1.commit();
+
+ Assert.assertNull(consumer1.receiveImmediate());
+
+ session0.close();
+
+ session1.close();
+
+ sf0.close();
+
+ sf1.close();
+
+
+ }
+ finally
+ {
+ if (locator != null)
+ {
+ locator.close();
+ }
+ try
+ {
+ server0.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+
+ try
+ {
+ server1.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+
+ Map<Long, AtomicInteger> maps = loadQueues(server0);
+
+ for (Map.Entry<Long, AtomicInteger> value : maps.entrySet())
+ {
+ System.out.println("queue " + value.getKey() + "=" + value.getValue());
+
+ assertEquals(0, value.getValue().intValue());
+ }
+ }
+
+
/**
* @param server1Params
*/
Modified: branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-09-01 21:50:16 UTC (rev 11277)
+++ branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-09-01 21:58:56 UTC (rev 11278)
@@ -84,6 +84,16 @@
private static final long WAIT_TIMEOUT = 5000;
+ protected int getLargeMessageSize()
+ {
+ return 1024;
+ }
+
+ protected boolean isLargeMessage()
+ {
+ return false;
+ }
+
@Override
protected void setUp() throws Exception
{
@@ -522,6 +532,11 @@
for (int i = msgStart; i < msgEnd; i++)
{
ClientMessage message = session.createMessage(durable);
+
+ if (isLargeMessage())
+ {
+ message.setBodyInputStream(createFakeLargeStream(getLargeMessageSize()));
+ }
if (filterVal != null)
{
@@ -530,6 +545,7 @@
message.putIntProperty(ClusterTestBase.COUNT_PROP, i);
+
producer.send(message);
if (i % 100 == 0)
@@ -580,6 +596,11 @@
for (int i = msgStart; i < msgEnd; i++)
{
ClientMessage message = session.createMessage(durable);
+
+ if (isLargeMessage())
+ {
+ message.setBodyInputStream(createFakeLargeStream(getLargeMessageSize()));
+ }
message.putStringProperty(key, val);
message.putIntProperty(ClusterTestBase.COUNT_PROP, i);
@@ -773,6 +794,13 @@
Assert.assertNotNull("consumer " + consumerID + " did not receive message " + j, message);
}
+ if (isLargeMessage())
+ {
+ for (int posMsg = 0 ; posMsg < getLargeMessageSize(); posMsg++)
+ {
+ assertEquals(getSamplebyte(posMsg), message.getBodyBuffer().readByte());
+ }
+ }
if (ack)
Modified: branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/util/UnitTestCase.java
===================================================================
--- branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/util/UnitTestCase.java 2011-09-01 21:50:16 UTC (rev 11277)
+++ branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/util/UnitTestCase.java 2011-09-01 21:58:56 UTC (rev 11278)
@@ -34,10 +34,12 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
import javax.naming.Context;
import javax.transaction.xa.XAException;
@@ -57,9 +59,16 @@
import org.hornetq.core.client.impl.ServerLocatorImpl;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.journal.PreparedTransactionInfo;
+import org.hornetq.core.journal.RecordInfo;
+import org.hornetq.core.journal.SequentialFileFactory;
import org.hornetq.core.journal.impl.AIOSequentialFileFactory;
+import org.hornetq.core.journal.impl.JournalImpl;
+import org.hornetq.core.journal.impl.NIOSequentialFileFactory;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
+import org.hornetq.core.persistence.impl.journal.JournalStorageManager.ReferenceDescribe;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.Bindings;
import org.hornetq.core.postoffice.PostOffice;
@@ -1231,7 +1240,63 @@
}
return bindingsFound;
}
+ /**
+ * It will inspect the journal directly and determine if there are queues on this journal,
+ * @return a Map containing the reference counts per queue
+ * @param serverToInvestigate
+ * @throws Exception
+ */
+ protected Map<Long, AtomicInteger> loadQueues(HornetQServer serverToInvestigate) throws Exception
+ {
+ SequentialFileFactory messagesFF = new NIOSequentialFileFactory(serverToInvestigate.getConfiguration().getJournalDirectory());
+ JournalImpl messagesJournal = new JournalImpl(serverToInvestigate.getConfiguration().getJournalFileSize(),
+ serverToInvestigate.getConfiguration().getJournalMinFiles(),
+ 0,
+ 0,
+ messagesFF,
+ "hornetq-data",
+ "hq",
+ 1);
+ List<RecordInfo> records = new LinkedList<RecordInfo>();
+
+ List<PreparedTransactionInfo> preparedTransactions = new LinkedList<PreparedTransactionInfo>();
+
+ messagesJournal.start();
+ messagesJournal.load(records, preparedTransactions, null);
+
+ // These are more immutable integers
+ Map<Long, AtomicInteger> messageRefCounts = new HashMap<Long, AtomicInteger>();
+
+
+ for (RecordInfo info : records)
+ {
+ Object o = JournalStorageManager.newObjectEncoding(info);
+ if (info.getUserRecordType() == JournalStorageManager.ADD_REF)
+ {
+ ReferenceDescribe ref = (ReferenceDescribe)o;
+ AtomicInteger count = messageRefCounts.get(ref.refEncoding.queueID);
+ if (count == null)
+ {
+ count = new AtomicInteger(1);
+ messageRefCounts.put(ref.refEncoding.queueID, count);
+ }
+ else
+ {
+ count.incrementAndGet();
+ }
+ }
+ }
+
+
+ messagesJournal.stop();
+
+
+ return messageRefCounts;
+
+ }
+
+
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
13 years, 4 months
JBoss hornetq SVN: r11277 - in branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116: src/main/org/hornetq/core/persistence/impl/journal and 4 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-09-01 17:50:16 -0400 (Thu, 01 Sep 2011)
New Revision: 11277
Modified:
branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java
branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java
branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/util/UnitTestCase.java
Log:
back porting HORNETQ-753
Modified: branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
===================================================================
--- branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java 2011-09-01 21:48:13 UTC (rev 11276)
+++ branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java 2011-09-01 21:50:16 UTC (rev 11277)
@@ -409,7 +409,8 @@
lastChunk = pos >= bodySize;
- final SessionSendContinuationMessage chunk = new SessionSendContinuationMessage(bodyBuffer.toByteBuffer()
+ final SessionSendContinuationMessage chunk = new SessionSendContinuationMessage(msgI,
+ bodyBuffer.toByteBuffer()
.array(),
!lastChunk,
lastChunk && sendBlocking);
@@ -529,11 +530,11 @@
buff = buff2;
- chunk = new SessionSendContinuationMessage(buff, false, sendBlocking, messageSize.get());
+ chunk = new SessionSendContinuationMessage(msgI, buff, false, sendBlocking, messageSize.get());
}
else
{
- chunk = new SessionSendContinuationMessage(buff, true, false);
+ chunk = new SessionSendContinuationMessage(msgI, buff, true, false);
}
if (sendBlocking && lastPacket)
Modified: branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2011-09-01 21:48:13 UTC (rev 11276)
+++ branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2011-09-01 21:50:16 UTC (rev 11277)
@@ -63,6 +63,7 @@
import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveLargeMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionRequestProducerCreditsMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionSendContinuationMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionSendMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionXACommitMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionXAEndMessage;
@@ -107,8 +108,6 @@
private static final Logger log = Logger.getLogger(ClientSessionImpl.class);
- private final boolean trace = ClientSessionImpl.log.isTraceEnabled();
-
// Attributes ----------------------------------------------------------------------------
private Map<String, String> metadata = new HashMap<String, String>();
@@ -1192,6 +1191,15 @@
sendAckHandler.sendAcknowledged(ssm.getMessage());
}
+ else
+ if (packet.getType() == PacketImpl.SESS_SEND_CONTINUATION)
+ {
+ SessionSendContinuationMessage scm = (SessionSendContinuationMessage)packet;
+ if (!scm.isContinues())
+ {
+ sendAckHandler.sendAcknowledged(scm.getMessage());
+ }
+ }
}
// XAResource implementation
Modified: branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-09-01 21:48:13 UTC (rev 11276)
+++ branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-09-01 21:50:16 UTC (rev 11277)
@@ -2335,9 +2335,9 @@
}
- private static class QueueEncoding implements EncodingSupport
+ public static class QueueEncoding implements EncodingSupport
{
- long queueID;
+ public long queueID;
public QueueEncoding(final long queueID)
{
@@ -2389,7 +2389,7 @@
}
}
- private static class RefEncoding extends QueueEncoding
+ public static class RefEncoding extends QueueEncoding
{
public RefEncoding()
{
@@ -2839,7 +2839,7 @@
// Encoding functions for binding Journal
- private static Object newObjectEncoding(RecordInfo info)
+ public static Object newObjectEncoding(RecordInfo info)
{
HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(info.data);
long id = info.id;
@@ -2981,9 +2981,9 @@
}
}
- private static class ReferenceDescribe
+ public static class ReferenceDescribe
{
- RefEncoding refEncoding;
+ public RefEncoding refEncoding;
public ReferenceDescribe(RefEncoding refEncoding)
{
Modified: branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java
===================================================================
--- branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java 2011-09-01 21:48:13 UTC (rev 11276)
+++ branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java 2011-09-01 21:50:16 UTC (rev 11277)
@@ -14,6 +14,7 @@
package org.hornetq.core.protocol.core.impl.wireformat;
import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.core.message.impl.MessageInternal;
import org.hornetq.core.protocol.core.impl.PacketImpl;
/**
@@ -34,6 +35,9 @@
private boolean requiresResponse;
+ // Used on confirmation handling
+ private MessageInternal message;
+
/**
* to be sent on the last package
*/
@@ -53,7 +57,7 @@
* @param continues
* @param requiresResponse
*/
- public SessionSendContinuationMessage(final byte[] body, final boolean continues, final boolean requiresResponse)
+ public SessionSendContinuationMessage(final MessageInternal message, final byte[] body, final boolean continues, final boolean requiresResponse)
{
super(PacketImpl.SESS_SEND_CONTINUATION, body, continues);
this.requiresResponse = requiresResponse;
@@ -64,9 +68,9 @@
* @param continues
* @param requiresResponse
*/
- public SessionSendContinuationMessage(final byte[] body, final boolean continues, final boolean requiresResponse, final long messageBodySize)
+ public SessionSendContinuationMessage(final MessageInternal message, final byte[] body, final boolean continues, final boolean requiresResponse, final long messageBodySize)
{
- this(body, continues, requiresResponse);
+ this(message, body, continues, requiresResponse);
this.messageBodySize = messageBodySize;
}
@@ -84,7 +88,16 @@
{
return messageBodySize;
}
+
+ /**
+ * @return the message
+ */
+ public MessageInternal getMessage()
+ {
+ return message;
+ }
+
@Override
public void encodeRest(final HornetQBuffer buffer)
{
Modified: branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java
===================================================================
--- branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java 2011-09-01 21:48:13 UTC (rev 11276)
+++ branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java 2011-09-01 21:50:16 UTC (rev 11277)
@@ -17,9 +17,11 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
import junit.framework.Assert;
+import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientConsumer;
@@ -191,11 +193,13 @@
{
message.setBodyInputStream(UnitTestCase.createFakeLargeStream(1024 * 1024));
}
+ else
+ {
+ message.getBodyBuffer().writeBytes(bytes);
+ }
message.putIntProperty(propKey, i);
- message.getBodyBuffer().writeBytes(bytes);
-
producer0.send(message);
}
@@ -251,6 +255,173 @@
}
+ public void testBridgeWithLargeMessage() throws Exception
+ {
+ HornetQServer server0 = null;
+ HornetQServer server1 = null;
+
+ final int PAGE_MAX = 1024 * 1024 * 1024;
+
+ final int PAGE_SIZE = 10 * 1024;
+ ServerLocator locator = null;
+ try
+ {
+
+ Map<String, Object> server0Params = new HashMap<String, Object>();
+ server0 = createClusteredServerWithParams(isNetty(), 0, true, PAGE_SIZE, PAGE_MAX, server0Params);
+
+ Map<String, Object> server1Params = new HashMap<String, Object>();
+ addTargetParameters(server1Params);
+ server1 = createClusteredServerWithParams(isNetty(), 1, true, server1Params);
+
+ final String testAddress = "testAddress";
+ final String queueName0 = "queue0";
+ final String forwardAddress = "forwardAddress";
+ final String queueName1 = "queue1";
+
+ Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
+ TransportConfiguration server0tc = new TransportConfiguration(getConnector(), server0Params);
+
+ TransportConfiguration server1tc = new TransportConfiguration(getConnector(), server1Params);
+ connectors.put(server1tc.getName(), server1tc);
+
+ server0.getConfiguration().setConnectorConfigurations(connectors);
+
+ ArrayList<String> staticConnectors = new ArrayList<String>();
+ staticConnectors.add(server1tc.getName());
+
+ BridgeConfiguration bridgeConfiguration = new BridgeConfiguration("bridge1",
+ queueName0,
+ forwardAddress,
+ null,
+ null,
+ 1000,
+ 1d,
+ -1,
+ false,
+ 0,
+ HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+ staticConnectors,
+ false,
+ ConfigurationImpl.DEFAULT_CLUSTER_USER,
+ ConfigurationImpl.DEFAULT_CLUSTER_PASSWORD);
+
+ List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
+ bridgeConfigs.add(bridgeConfiguration);
+ server0.getConfiguration().setBridgeConfigurations(bridgeConfigs);
+
+ CoreQueueConfiguration queueConfig0 = new CoreQueueConfiguration(testAddress, queueName0, null, true);
+ List<CoreQueueConfiguration> queueConfigs0 = new ArrayList<CoreQueueConfiguration>();
+ queueConfigs0.add(queueConfig0);
+ server0.getConfiguration().setQueueConfigurations(queueConfigs0);
+
+ CoreQueueConfiguration queueConfig1 = new CoreQueueConfiguration(forwardAddress, queueName1, null, true);
+ List<CoreQueueConfiguration> queueConfigs1 = new ArrayList<CoreQueueConfiguration>();
+ queueConfigs1.add(queueConfig1);
+ server1.getConfiguration().setQueueConfigurations(queueConfigs1);
+
+ server1.start();
+ server0.start();
+
+ locator = HornetQClient.createServerLocatorWithoutHA(server0tc, server1tc);
+ locator.setMinLargeMessageSize(1024);
+ ClientSessionFactory sf0 = locator.createSessionFactory(server0tc);
+
+ ClientSessionFactory sf1 = locator.createSessionFactory(server1tc);
+
+ ClientSession session0 = sf0.createSession(false, true, true);
+
+ ClientSession session1 = sf1.createSession(false, true, true);
+
+ ClientProducer producer0 = session0.createProducer(new SimpleString(testAddress));
+
+ ClientConsumer consumer1 = session1.createConsumer(queueName1);
+
+ session1.start();
+
+ final int numMessages = 50;
+
+ final SimpleString propKey = new SimpleString("testkey");
+
+ final int LARGE_MESSAGE_SIZE = 10 * 1024;
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session0.createMessage(true);
+ message.setBodyInputStream(createFakeLargeStream(LARGE_MESSAGE_SIZE));
+
+ message.putIntProperty(propKey, i);
+
+ producer0.send(message);
+ }
+
+ session0.commit();
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = consumer1.receive(5000);
+
+ Assert.assertNotNull(message);
+
+ Assert.assertEquals(i, message.getObjectProperty(propKey));
+
+ HornetQBuffer buff = message.getBodyBuffer();
+
+ for (int posMsg = 0 ; posMsg < LARGE_MESSAGE_SIZE; posMsg++)
+ {
+ assertEquals(getSamplebyte(posMsg), buff.readByte());
+ }
+
+ message.acknowledge();
+ }
+
+ session1.commit();
+
+ Assert.assertNull(consumer1.receiveImmediate());
+
+ session0.close();
+
+ session1.close();
+
+ sf0.close();
+
+ sf1.close();
+
+
+ }
+ finally
+ {
+ if (locator != null)
+ {
+ locator.close();
+ }
+ try
+ {
+ server0.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+
+ try
+ {
+ server1.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+
+ Map<Long, AtomicInteger> maps = loadQueues(server0);
+
+ for (Map.Entry<Long, AtomicInteger> value : maps.entrySet())
+ {
+ System.out.println("queue " + value.getKey() + "=" + value.getValue());
+
+ assertEquals(0, value.getValue().intValue());
+ }
+ }
+
+
/**
* @param server1Params
*/
Modified: branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-09-01 21:48:13 UTC (rev 11276)
+++ branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-09-01 21:50:16 UTC (rev 11277)
@@ -84,6 +84,16 @@
private static final long WAIT_TIMEOUT = 5000;
+ protected int getLargeMessageSize()
+ {
+ return 1024;
+ }
+
+ protected boolean isLargeMessage()
+ {
+ return false;
+ }
+
@Override
protected void setUp() throws Exception
{
@@ -522,6 +532,11 @@
for (int i = msgStart; i < msgEnd; i++)
{
ClientMessage message = session.createMessage(durable);
+
+ if (isLargeMessage())
+ {
+ message.setBodyInputStream(createFakeLargeStream(getLargeMessageSize()));
+ }
if (filterVal != null)
{
@@ -530,6 +545,7 @@
message.putIntProperty(ClusterTestBase.COUNT_PROP, i);
+
producer.send(message);
}
}
@@ -573,6 +589,11 @@
for (int i = msgStart; i < msgEnd; i++)
{
ClientMessage message = session.createMessage(durable);
+
+ if (isLargeMessage())
+ {
+ message.setBodyInputStream(createFakeLargeStream(getLargeMessageSize()));
+ }
message.putStringProperty(key, val);
message.putIntProperty(ClusterTestBase.COUNT_PROP, i);
@@ -766,6 +787,13 @@
Assert.assertNotNull("consumer " + consumerID + " did not receive message " + j, message);
}
+ if (isLargeMessage())
+ {
+ for (int posMsg = 0 ; posMsg < getLargeMessageSize(); posMsg++)
+ {
+ assertEquals(getSamplebyte(posMsg), message.getBodyBuffer().readByte());
+ }
+ }
if (ack)
Modified: branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/util/UnitTestCase.java
===================================================================
--- branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/util/UnitTestCase.java 2011-09-01 21:48:13 UTC (rev 11276)
+++ branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/tests/src/org/hornetq/tests/util/UnitTestCase.java 2011-09-01 21:50:16 UTC (rev 11277)
@@ -34,10 +34,12 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
import javax.naming.Context;
import javax.transaction.xa.XAException;
@@ -57,9 +59,16 @@
import org.hornetq.core.client.impl.ServerLocatorImpl;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.journal.PreparedTransactionInfo;
+import org.hornetq.core.journal.RecordInfo;
+import org.hornetq.core.journal.SequentialFileFactory;
import org.hornetq.core.journal.impl.AIOSequentialFileFactory;
+import org.hornetq.core.journal.impl.JournalImpl;
+import org.hornetq.core.journal.impl.NIOSequentialFileFactory;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
+import org.hornetq.core.persistence.impl.journal.JournalStorageManager.ReferenceDescribe;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.Bindings;
import org.hornetq.core.postoffice.PostOffice;
@@ -1234,7 +1243,63 @@
}
return bindingsFound;
}
+ /**
+ * It will inspect the journal directly and determine if there are queues on this journal,
+ * @return a Map containing the reference counts per queue
+ * @param serverToInvestigate
+ * @throws Exception
+ */
+ protected Map<Long, AtomicInteger> loadQueues(HornetQServer serverToInvestigate) throws Exception
+ {
+ SequentialFileFactory messagesFF = new NIOSequentialFileFactory(serverToInvestigate.getConfiguration().getJournalDirectory());
+ JournalImpl messagesJournal = new JournalImpl(serverToInvestigate.getConfiguration().getJournalFileSize(),
+ serverToInvestigate.getConfiguration().getJournalMinFiles(),
+ 0,
+ 0,
+ messagesFF,
+ "hornetq-data",
+ "hq",
+ 1);
+ List<RecordInfo> records = new LinkedList<RecordInfo>();
+
+ List<PreparedTransactionInfo> preparedTransactions = new LinkedList<PreparedTransactionInfo>();
+
+ messagesJournal.start();
+ messagesJournal.load(records, preparedTransactions, null);
+
+ // These are more immutable integers
+ Map<Long, AtomicInteger> messageRefCounts = new HashMap<Long, AtomicInteger>();
+
+
+ for (RecordInfo info : records)
+ {
+ Object o = JournalStorageManager.newObjectEncoding(info);
+ if (info.getUserRecordType() == JournalStorageManager.ADD_REF)
+ {
+ ReferenceDescribe ref = (ReferenceDescribe)o;
+ AtomicInteger count = messageRefCounts.get(ref.refEncoding.queueID);
+ if (count == null)
+ {
+ count = new AtomicInteger(1);
+ messageRefCounts.put(ref.refEncoding.queueID, count);
+ }
+ else
+ {
+ count.incrementAndGet();
+ }
+ }
+ }
+
+
+ messagesJournal.stop();
+
+
+ return messageRefCounts;
+
+ }
+
+
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
13 years, 4 months
JBoss hornetq SVN: r11276 - branches/one-offs.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-09-01 17:48:13 -0400 (Thu, 01 Sep 2011)
New Revision: 11276
Added:
branches/one-offs/Hornetq_2_2_5_EAP_JBPAPP_7116/
Log:
creating branch for JBPAPP-7116
13 years, 4 months
JBoss hornetq SVN: r11275 - branches/one-offs.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-09-01 17:47:39 -0400 (Thu, 01 Sep 2011)
New Revision: 11275
Added:
branches/one-offs/Hornetq_2_2_4_EAP_JBPAPP_7116/
Log:
creating branch for JBPAPP-7116
13 years, 4 months
JBoss hornetq SVN: r11274 - in branches/Branch_2_2_EAP_cluster_clean3: src/main/org/hornetq/core/persistence/impl/journal and 3 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-09-01 17:34:19 -0400 (Thu, 01 Sep 2011)
New Revision: 11274
Added:
branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/distribution/LargeMessageRedistributionTest.java
Modified:
branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/message/impl/MessageImpl.java
branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java
branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java
Log:
JBPAPP-7115 - committing to a branch (It will be merged with other changes)
Modified: branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/message/impl/MessageImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/message/impl/MessageImpl.java 2011-09-01 16:15:10 UTC (rev 11273)
+++ branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/message/impl/MessageImpl.java 2011-09-01 21:34:19 UTC (rev 11274)
@@ -143,6 +143,14 @@
*/
protected MessageImpl(final MessageImpl other)
{
+ this(other, other.getProperties());
+ }
+
+ /*
+ * Copy constructor
+ */
+ protected MessageImpl(final MessageImpl other, TypedProperties properties)
+ {
messageID = other.getMessageID();
userID = other.getUserID();
address = other.getAddress();
@@ -151,7 +159,7 @@
expiration = other.getExpiration();
timestamp = other.getTimestamp();
priority = other.getPriority();
- properties = new TypedProperties(other.getProperties());
+ this.properties = new TypedProperties(properties);
// This MUST be synchronized using the monitor on the other message to prevent it running concurrently
// with getEncodedBuffer(), otherwise can introduce race condition when delivering concurrently to
Modified: branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java 2011-09-01 16:15:10 UTC (rev 11273)
+++ branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java 2011-09-01 21:34:19 UTC (rev 11274)
@@ -26,6 +26,7 @@
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.impl.ServerMessageImpl;
import org.hornetq.utils.DataConstants;
+import org.hornetq.utils.TypedProperties;
/**
* A LargeServerMessageImpl
@@ -70,12 +71,13 @@
/**
* Copy constructor
+ * @param properties
* @param copy
* @param fileCopy
*/
- private LargeServerMessageImpl(final LargeServerMessageImpl copy, final SequentialFile fileCopy, final long newID)
+ private LargeServerMessageImpl(final LargeServerMessageImpl copy, TypedProperties properties, final SequentialFile fileCopy, final long newID)
{
- super(copy);
+ super(copy, properties);
linkMessage = copy;
storageManager = copy.storageManager;
file = fileCopy;
@@ -281,8 +283,30 @@
this.removeProperty(Message.HDR_ORIG_MESSAGE_ID);
}
}
+
+ @Override
+ public synchronized ServerMessage copy()
+ {
+ incrementDelayDeletionCount();
+ long idToUse = messageID;
+ if (linkMessage != null)
+ {
+ idToUse = linkMessage.getMessageID();
+ }
+
+ SequentialFile newfile = storageManager.createFileForLargeMessage(idToUse, durable);
+
+ ServerMessage newMessage = new LargeServerMessageImpl(linkMessage == null ? this
+ : (LargeServerMessageImpl)linkMessage,
+ properties,
+ newfile,
+ messageID);
+ return newMessage;
+ }
+
+
@Override
public synchronized ServerMessage copy(final long newID)
{
@@ -301,6 +325,7 @@
ServerMessage newMessage = new LargeServerMessageImpl(linkMessage == null ? this
: (LargeServerMessageImpl)linkMessage,
+ properties,
newfile,
newID);
return newMessage;
@@ -317,7 +342,7 @@
file.copyTo(newFile);
- LargeServerMessageImpl newMessage = new LargeServerMessageImpl(this, newFile, newID);
+ LargeServerMessageImpl newMessage = new LargeServerMessageImpl(this, properties, newFile, newID);
newMessage.linkMessage = null;
@@ -341,9 +366,9 @@
@Override
public String toString()
{
- return "ServerMessage[messageID=" + messageID + ",priority=" + this.getPriority() +
+ return "LargeServerMessage[messageID=" + messageID + ",priority=" + this.getPriority() +
",expiration=[" + (this.getExpiration() != 0 ? new java.util.Date(this.getExpiration()) : "null") + "]" +
- ", durable=" + durable + ", address=" + getAddress() + ",properties=" + properties.toString() + "]";
+ ", durable=" + durable + ", address=" + getAddress() + ",properties=" + properties.toString() + "]@" + System.identityHashCode(this);
}
Modified: branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-09-01 16:15:10 UTC (rev 11273)
+++ branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-09-01 21:34:19 UTC (rev 11274)
@@ -450,7 +450,7 @@
// Consumer implementation ---------------------------------------
/* Hook for processing message before forwarding */
- protected ServerMessage beforeForward(ServerMessage message)
+ protected ServerMessage beforeForward(final ServerMessage message)
{
if (useDuplicateDetection)
{
@@ -462,10 +462,20 @@
if (transformer != null)
{
- message = transformer.transform(message);
+ final ServerMessage transformedMessage = transformer.transform(message);
+ if (transformedMessage != message)
+ {
+ if (log.isDebugEnabled())
+ {
+ log.debug("The transformer " + transformer + " made a copy of the message " + message + " as transformedMessage");
+ }
+ }
+ return transformedMessage;
}
-
- return message;
+ else
+ {
+ return message;
+ }
}
/**
@@ -530,6 +540,12 @@
// that this will throw a disconnect, we need to remove the message
// from the acks so it will get resent, duplicate detection will cope
// with any messages resent
+
+ if (log.isTraceEnabled())
+ {
+ log.trace("XXX going to send message " + message);
+ }
+
try
{
producer.send(dest, message);
Modified: branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java 2011-09-01 16:15:10 UTC (rev 11273)
+++ branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java 2011-09-01 21:34:19 UTC (rev 11274)
@@ -182,34 +182,50 @@
}
@Override
- protected ServerMessage beforeForward(ServerMessage message)
+ protected ServerMessage beforeForward(final ServerMessage message)
{
// We make a copy of the message, then we strip out the unwanted routing id headers and leave
// only
// the one pertinent for the address node - this is important since different queues on different
// nodes could have same queue ids
// Note we must copy since same message may get routed to other nodes which require different headers
- message = message.copy();
+ ServerMessage messageCopy = message.copy();
+
+ if (log.isTraceEnabled())
+ {
+ log.trace("Clustered bridge copied message " + message + " as " + messageCopy + " before delivery");
+ }
// TODO - we can optimise this
- Set<SimpleString> propNames = new HashSet<SimpleString>(message.getPropertyNames());
+ Set<SimpleString> propNames = new HashSet<SimpleString>(messageCopy.getPropertyNames());
byte[] queueIds = message.getBytesProperty(idsHeaderName);
+
+ if (queueIds == null)
+ {
+ // Sanity check only
+ log.warn("no queue IDs defined!, originalMessage = " + message +
+ ", copiedMessage = " +
+ messageCopy +
+ ", props=" +
+ idsHeaderName);
+ throw new IllegalStateException("no queueIDs defined");
+ }
for (SimpleString propName : propNames)
{
if (propName.startsWith(MessageImpl.HDR_ROUTE_TO_IDS))
{
- message.removeProperty(propName);
+ messageCopy.removeProperty(propName);
}
}
- message.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, queueIds);
+ messageCopy.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, queueIds);
- message = super.beforeForward(message);
-
- return message;
+ messageCopy = super.beforeForward(messageCopy);
+
+ return messageCopy;
}
private void setupNotificationConsumer() throws Exception
Modified: branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java 2011-09-01 16:15:10 UTC (rev 11273)
+++ branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java 2011-09-01 21:34:19 UTC (rev 11274)
@@ -25,6 +25,7 @@
import org.hornetq.core.server.ServerMessage;
import org.hornetq.utils.DataConstants;
import org.hornetq.utils.MemorySize;
+import org.hornetq.utils.TypedProperties;
/**
*
@@ -89,6 +90,14 @@
super(other);
}
+ /*
+ * Copy constructor
+ */
+ protected ServerMessageImpl(final ServerMessageImpl other, TypedProperties properties)
+ {
+ super(other, properties);
+ }
+
public boolean isServerMessage()
{
return true;
@@ -193,6 +202,7 @@
public ServerMessage copy()
{
+ // This is a simple copy, used only to avoid changing original properties
return new ServerMessageImpl(this);
}
@@ -275,7 +285,7 @@
{
return "ServerMessage[messageID=" + messageID + ",priority=" + this.getPriority() +
",expiration=" + (this.getExpiration() != 0 ? new java.util.Date(this.getExpiration()) : 0) +
- ", durable=" + durable + ", address=" + getAddress() + ",properties=" + properties.toString() + "]";
+ ", durable=" + durable + ", address=" + getAddress() + ",properties=" + properties.toString() + "]@" + System.identityHashCode(this);
}
// FIXME - this is stuff that is only used in large messages
Modified: branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2011-09-01 16:15:10 UTC (rev 11273)
+++ branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2011-09-01 21:34:19 UTC (rev 11274)
@@ -1073,6 +1073,11 @@
LargeServerMessage largeMsg = storageManager.createLargeMessage(id, message);
+ if (log.isTraceEnabled())
+ {
+ log.trace("sendLarge::" + largeMsg);
+ }
+
if (currentLargeMessage != null)
{
ServerSessionImpl.log.warn("Replacing incomplete LargeMessage with ID=" + currentLargeMessage.getMessageID());
Modified: branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-09-01 16:15:10 UTC (rev 11273)
+++ branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-09-01 21:34:19 UTC (rev 11274)
@@ -74,7 +74,17 @@
public abstract class ClusterTestBase extends ServiceTestBase
{
private final Logger log = Logger.getLogger(this.getClass());
+ public ClusterTestBase()
+ {
+ super();
+ }
+ public ClusterTestBase(String name)
+ {
+ super(name);
+ }
+
+
private static final int[] PORTS = { TransportConstants.DEFAULT_PORT,
TransportConstants.DEFAULT_PORT + 1,
TransportConstants.DEFAULT_PORT + 2,
@@ -87,7 +97,18 @@
TransportConstants.DEFAULT_PORT + 9, };
private static final long WAIT_TIMEOUT = 10000;
+
+ protected int getLargeMessageSize()
+ {
+ return 500;
+ }
+ protected boolean isLargeMessage()
+ {
+ return false;
+ }
+
+
private static final long TIMEOUT_START_SERVER = 10;
@Override
@@ -635,6 +656,11 @@
}
message.putIntProperty(ClusterTestBase.COUNT_PROP, i);
+
+ if (isLargeMessage())
+ {
+ message.setBodyInputStream(createFakeLargeStream(getLargeMessageSize()));
+ }
producer.send(message);
@@ -686,9 +712,15 @@
for (int i = msgStart; i < msgEnd; i++)
{
ClientMessage message = session.createMessage(durable);
+
+ if (isLargeMessage())
+ {
+ message.setBodyInputStream(createFakeLargeStream(getLargeMessageSize()));
+ }
message.putStringProperty(key, val);
message.putIntProperty(ClusterTestBase.COUNT_PROP, i);
+
producer.send(message);
}
}
@@ -881,6 +913,15 @@
log.info("msg on ClusterTestBase = " + message);
+
+ if (isLargeMessage())
+ {
+ for (int posMsg = 0 ; posMsg < getLargeMessageSize(); posMsg++)
+ {
+ assertEquals(getSamplebyte(posMsg), message.getBodyBuffer().readByte());
+ }
+ }
+
if (ack)
{
message.acknowledge();
@@ -1180,7 +1221,13 @@
if (message != null)
{
int count = (Integer)message.getObjectProperty(ClusterTestBase.COUNT_PROP);
+
+ for (int posMsg = 0 ; posMsg < getLargeMessageSize(); posMsg++)
+ {
+ assertEquals(getSamplebyte(posMsg), message.getBodyBuffer().readByte());
+ }
+
// log.info("consumer " + consumerIDs[i] + " received message " + count);
Assert.assertFalse(counts.contains(count));
@@ -1274,6 +1321,15 @@
message = consumer.consumer.receive(500);
if (message != null)
{
+
+ if (isLargeMessage())
+ {
+ for (int posMsg = 0 ; posMsg < getLargeMessageSize(); posMsg++)
+ {
+ assertEquals(getSamplebyte(posMsg), message.getBodyBuffer().readByte());
+ }
+ }
+
if (ack)
{
message.acknowledge();
Added: branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/distribution/LargeMessageRedistributionTest.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/distribution/LargeMessageRedistributionTest.java (rev 0)
+++ branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/distribution/LargeMessageRedistributionTest.java 2011-09-01 21:34:19 UTC (rev 11274)
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.cluster.distribution;
+
+
+/**
+ * A LargeMessageRedistributionTest
+ *
+ * @author clebert
+ *
+ *
+ */
+public class LargeMessageRedistributionTest extends MessageRedistributionTest
+{
+
+ public LargeMessageRedistributionTest()
+ {
+ super();
+ }
+
+ public LargeMessageRedistributionTest(String name)
+ {
+ super(name);
+ }
+
+ protected boolean isLargeMessage()
+ {
+ return true;
+ }
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java 2011-09-01 16:15:10 UTC (rev 11273)
+++ branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java 2011-09-01 21:34:19 UTC (rev 11274)
@@ -44,6 +44,19 @@
{
private static final Logger log = Logger.getLogger(MessageRedistributionTest.class);
+ public MessageRedistributionTest()
+ {
+ super();
+ }
+
+ /**
+ * @param name
+ */
+ public MessageRedistributionTest(String name)
+ {
+ super(name);
+ }
+
@Override
protected void setUp() throws Exception
{
@@ -113,101 +126,11 @@
removeConsumer(1);
- verifyReceiveRoundRobinInSomeOrderWithCounts(false, ids1, 0, 2);
+ verifyReceiveRoundRobinInSomeOrderWithCounts(false, ids1, 0, 2);
MessageRedistributionTest.log.info("Test done");
}
- // https://issues.jboss.org/browse/HORNETQ-654
- public void testRedistributionWhenConsumerIsClosedAndRestart() throws Exception
- {
- setupCluster(false);
-
- MessageRedistributionTest.log.info("Doing test");
-
- startServers(0, 1, 2);
-
- setupSessionFactory(0, isNetty());
- setupSessionFactory(1, isNetty());
- setupSessionFactory(2, isNetty());
-
- createQueue(0, "queues.testaddress", "queue0", null, true);
- createQueue(1, "queues.testaddress", "queue0", null, true);
- createQueue(2, "queues.testaddress", "queue0", null, true);
-
- addConsumer(0, 0, "queue0", null);
- addConsumer(1, 1, "queue0", null);
- addConsumer(2, 2, "queue0", null);
-
- waitForBindings(0, "queues.testaddress", 1, 1, true);
- waitForBindings(1, "queues.testaddress", 1, 1, true);
- waitForBindings(2, "queues.testaddress", 1, 1, true);
-
- waitForBindings(0, "queues.testaddress", 2, 2, false);
- waitForBindings(1, "queues.testaddress", 2, 2, false);
- waitForBindings(2, "queues.testaddress", 2, 2, false);
-
- send(0, "queues.testaddress", 20, true, null);
-
- getReceivedOrder(0, true);
- int[] ids1 = getReceivedOrder(1, false);
- getReceivedOrder(2, true);
-
- for (ClusterConnection conn : servers[1].getClusterManager().getClusterConnections())
- {
- ClusterConnectionImpl impl = (ClusterConnectionImpl)conn;
- for (MessageFlowRecord record : impl.getRecords().values())
- {
- if (record.getBridge() != null)
- {
- System.out.println("stop record bridge");
- record.getBridge().stop();
- }
- }
- }
-
- removeConsumer(1);
-
- // Need to wait some time as we need to handle all redistributions before we stop the servers
- Thread.sleep(1000);
-
- for (int i = 0; i <= 2; i++)
- {
- servers[i].stop();
- servers[i] = null;
- }
-
- setupServers();
-
- setupCluster(false);
-
- startServers(0, 1, 2);
-
- for (int i = 0 ; i <= 2; i++)
- {
- consumers[i] = null;
- sfs[i] = null;
- }
-
- setupSessionFactory(0, isNetty());
- setupSessionFactory(1, isNetty());
- setupSessionFactory(2, isNetty());
-
- addConsumer(0, 0, "queue0", null);
- addConsumer(2, 2, "queue0", null);
-
- waitForBindings(0, "queues.testaddress", 1, 1, true);
- waitForBindings(2, "queues.testaddress", 1, 1, true);
-
- waitForBindings(0, "queues.testaddress", 2, 1, false);
- waitForBindings(1, "queues.testaddress", 2, 2, false);
- waitForBindings(2, "queues.testaddress", 2, 1, false);
-
- verifyReceiveRoundRobinInSomeOrderWithCounts(false, ids1, 0, 2);
-
- MessageRedistributionTest.log.info("Test done");
- }
-
public void testRedistributionWhenConsumerIsClosedNotConsumersOnAllNodes() throws Exception
{
setupCluster(false);
13 years, 4 months
JBoss hornetq SVN: r11273 - in branches/HORNETQ-720_Replication: hornetq-core/src/main/java/org/hornetq/core/persistence/impl/nullpm and 8 other directories.
by do-not-reply@jboss.org
Author: borges
Date: 2011-09-01 12:15:10 -0400 (Thu, 01 Sep 2011)
New Revision: 11273
Added:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java
Removed:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationJournalFileMessage.java
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationStartSyncMessage.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/LargeServerMessage.java
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/SequentialFileFactory.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncJournalTest.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/BackupSyncDelay.java
Log:
HORNETQ-720 Synchronization of Large Messages
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-09-01 16:13:06 UTC (rev 11272)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-09-01 16:15:10 UTC (rev 11273)
@@ -27,6 +27,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
@@ -372,7 +373,7 @@
final boolean messageJournalAutoReclaim = localMessageJournal.getAutoReclaim();
final boolean bindingsJournalAutoReclaim = localBindingsJournal.getAutoReclaim();
-
+ Map<String, Long> largeMessageFilesToSync;
try
{
storageManagerLock.writeLock().lock();
@@ -386,6 +387,7 @@
{
messageFiles = prepareJournalForCopy(localMessageJournal, JournalContent.MESSAGES);
bindingsFiles = prepareJournalForCopy(localBindingsJournal, JournalContent.BINDINGS);
+ largeMessageFilesToSync = getLargeMessageInformation();
}
finally
{
@@ -399,8 +401,10 @@
{
storageManagerLock.writeLock().unlock();
}
+
sendJournalFile(messageFiles, JournalContent.MESSAGES);
sendJournalFile(bindingsFiles, JournalContent.BINDINGS);
+ sendLargeMessageFiles(largeMessageFilesToSync);
storageManagerLock.writeLock().lock();
try
@@ -420,7 +424,43 @@
}
}
+ private void sendLargeMessageFiles(Map<String, Long> largeMessageFilesToSync) throws Exception
+ {
+ for (Entry<String, Long> entry : largeMessageFilesToSync.entrySet())
+ {
+ String fileName = entry.getKey();
+ long size = entry.getValue();
+ SequentialFile seqFile = largeMessagesFactory.createSequentialFile(fileName, 1);
+ if (!seqFile.exists())
+ continue;
+ replicator.syncLargeMessageFile(seqFile, size, getLargeMessageIdFromFilename(fileName));
+ }
+ }
+
+ private long getLargeMessageIdFromFilename(String filename)
+ {
+ return Long.parseLong(filename.split("\\.")[0]);
+ }
+
/**
+ * Assumes the
+ * @return
+ * @throws Exception
+ */
+ private Map<String, Long> getLargeMessageInformation() throws Exception
+ {
+ Map<String, Long> largeMessages = new HashMap<String, Long>();
+ List<String> filenames = largeMessagesFactory.listFiles("msg");
+ for (String filename : filenames)
+ {
+ SequentialFile seqFile = largeMessagesFactory.createSequentialFile(filename, 1);
+ long size = seqFile.size();
+ largeMessages.put(filename, size);
+ }
+ return largeMessages;
+ }
+
+ /**
* Send an entire journal file to a replicating server (a backup server that is).
* @param jf
* @param replicator2
@@ -431,7 +471,7 @@
{
for (JournalFile jf : journalFiles)
{
- replicator.sendJournalFile(jf, type);
+ replicator.syncJournalFile(jf, type);
jf.setCanReclaim(true);
}
}
@@ -563,30 +603,44 @@
public void addBytesToLargeMessage(final SequentialFile file, final long messageId, final byte[] bytes) throws Exception
{
- file.position(file.size());
+ readLock();
+ try
+ {
+ file.position(file.size());
- file.writeDirect(ByteBuffer.wrap(bytes), false);
+ file.writeDirect(ByteBuffer.wrap(bytes), false);
- if (isReplicated())
+ if (isReplicated())
+ {
+ replicator.largeMessageWrite(messageId, bytes);
+ }
+ }
+ finally
{
- replicator.largeMessageWrite(messageId, bytes);
+ readUnLock();
}
}
public LargeServerMessage createLargeMessage(final long id, final MessageInternal message)
{
- if (isReplicated())
+ readLock();
+ try
{
- replicator.largeMessageBegin(id);
- }
+ if (isReplicated())
+ {
+ replicator.largeMessageBegin(id);
+ }
- LargeServerMessageImpl largeMessage = (LargeServerMessageImpl)createLargeMessage();
+ LargeServerMessageImpl largeMessage = (LargeServerMessageImpl)createLargeMessage();
+ largeMessage.copyHeadersAndProperties(message);
+ largeMessage.setMessageID(id);
- largeMessage.copyHeadersAndProperties(message);
-
- largeMessage.setMessageID(id);
-
- return largeMessage;
+ return largeMessage;
+ }
+ finally
+ {
+ readUnLock();
+ }
}
// Non transactional operations
@@ -604,6 +658,7 @@
{
// Note that we don't sync, the add reference that comes immediately after will sync if appropriate
+ // XXX HORNETQ-720
if (message.isLargeMessage())
{
messageJournal.appendAddRecord(message.getMessageID(),
@@ -2049,16 +2104,9 @@
* @param messageID
* @return
*/
- SequentialFile createFileForLargeMessage(final long messageID, final boolean durable)
+ SequentialFile createFileForLargeMessage(final long messageID, String extension)
{
- if (durable)
- {
- return largeMessagesFactory.createSequentialFile(messageID + ".msg", -1);
- }
- else
- {
- return largeMessagesFactory.createSequentialFile(messageID + ".tmp", -1);
- }
+ return largeMessagesFactory.createSequentialFile(messageID + extension, -1);
}
// Private ----------------------------------------------------------------------------------
@@ -2379,14 +2427,11 @@
*/
private void cleanupIncompleteFiles() throws Exception
{
- if (largeMessagesFactory != null)
+ List<String> tmpFiles = largeMessagesFactory.listFiles("tmp");
+ for (String tmpFile : tmpFiles)
{
- List<String> tmpFiles = largeMessagesFactory.listFiles("tmp");
- for (String tmpFile : tmpFiles)
- {
- SequentialFile file = largeMessagesFactory.createSequentialFile(tmpFile, -1);
- file.delete();
- }
+ SequentialFile file = largeMessagesFactory.createSequentialFile(tmpFile, -1);
+ file.delete();
}
}
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java 2011-09-01 16:13:06 UTC (rev 11272)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java 2011-09-01 16:15:10 UTC (rev 11273)
@@ -31,7 +31,7 @@
* A LargeServerMessageImpl
*
* @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
- *
+ *
* Created 30-Sep-08 12:02:45 PM
*
*
@@ -49,9 +49,9 @@
private final JournalStorageManager storageManager;
private LargeServerMessage linkMessage;
-
+
private boolean paged;
-
+ private boolean replicationSync;
// We should only use the NIO implementation on the Journal
private SequentialFile file;
@@ -89,7 +89,7 @@
{
paged = true;
}
-
+
/* (non-Javadoc)
* @see org.hornetq.core.server.LargeServerMessage#addBytes(byte[])
*/
@@ -231,7 +231,7 @@
public boolean isFileExists() throws Exception
{
- SequentialFile localfile = storageManager.createFileForLargeMessage(getMessageID(), durable);
+ SequentialFile localfile = storageManager.createFileForLargeMessage(getMessageID(), getExtension());
return localfile.exists();
}
@@ -243,7 +243,7 @@
{
if (memoryEstimate == -1)
{
- // The body won't be on memory (aways on-file), so we don't consider this for paging
+ // The body won't be on memory (always on-file), so we don't consider this for paging
memoryEstimate = getHeadersAndPropertiesEncodeSize() + DataConstants.SIZE_INT +
getEncodeSize() +
(16 + 4) *
@@ -268,17 +268,18 @@
}
}
}
-
+
+ @Override
public void setOriginalHeaders(final ServerMessage other, final boolean expiry)
{
super.setOriginalHeaders(other, expiry);
-
+
LargeServerMessageImpl otherLM = (LargeServerMessageImpl)other;
this.paged = otherLM.paged;
if (this.paged)
{
- this.removeProperty(Message.HDR_ORIG_MESSAGE_ID);
+ this.removeProperty(Message.HDR_ORIG_MESSAGE_ID);
}
}
@@ -289,16 +290,16 @@
if (!paged)
{
incrementDelayDeletionCount();
-
+
long idToUse = messageID;
-
+
if (linkMessage != null)
{
idToUse = linkMessage.getMessageID();
}
-
- SequentialFile newfile = storageManager.createFileForLargeMessage(idToUse, durable);
-
+
+ SequentialFile newfile = storageManager.createFileForLargeMessage(idToUse, getExtension());
+
ServerMessage newMessage = new LargeServerMessageImpl(linkMessage == null ? this
: (LargeServerMessageImpl)linkMessage,
newfile,
@@ -310,19 +311,19 @@
try
{
validateFile();
-
+
SequentialFile file = this.file;
-
- SequentialFile newFile = storageManager.createFileForLargeMessage(newID, durable);
-
+
+ SequentialFile newFile = storageManager.createFileForLargeMessage(newID, getExtension());
+
file.copyTo(newFile);
-
+
LargeServerMessageImpl newMessage = new LargeServerMessageImpl(this, newFile, newID);
-
+
newMessage.linkMessage = null;
-
+
newMessage.setPaged();
-
+
return newMessage;
}
catch (Exception e)
@@ -333,8 +334,9 @@
}
}
- public SequentialFile getFile()
+ public SequentialFile getFile() throws HornetQException
{
+ validateFile();
return file;
}
@@ -369,10 +371,10 @@
throw new RuntimeException("MessageID not set on LargeMessage");
}
- file = storageManager.createFileForLargeMessage(getMessageID(), durable);
+ file = storageManager.createFileForLargeMessage(getMessageID(), getExtension());
file.open();
-
+
bodySize = file.size();
}
}
@@ -383,6 +385,13 @@
}
}
+ private String getExtension()
+ {
+ if (replicationSync)
+ return ".sync";
+ return durable ? ".msg" : ".tmp";
+ }
+
/* (non-Javadoc)
* @see org.hornetq.core.server.LargeServerMessage#setLinkedMessage(org.hornetq.core.server.LargeServerMessage)
*/
@@ -396,7 +405,7 @@
linkMessage = message;
- file = storageManager.createFileForLargeMessage(message.getMessageID(), durable);
+ file = storageManager.createFileForLargeMessage(message.getMessageID(), getExtension());
try
{
file.open();
@@ -477,4 +486,10 @@
return bodySize;
}
}
+
+ @Override
+ public void setReplicationSync(boolean sync)
+ {
+ replicationSync = sync;
+ }
}
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java 2011-09-01 16:13:06 UTC (rev 11272)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java 2011-09-01 16:15:10 UTC (rev 11273)
@@ -14,6 +14,7 @@
package org.hornetq.core.persistence.impl.nullpm;
import org.hornetq.api.core.HornetQBuffers;
+import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.server.LargeServerMessage;
import org.hornetq.core.server.impl.ServerMessageImpl;
@@ -21,7 +22,7 @@
* A NullStorageLargeServerMessage
*
* @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
- *
+ *
* Created 30-Sep-08 1:51:42 PM
*
*
@@ -164,7 +165,23 @@
{
}
+ /*
+ * (non-Javadoc)
+ * @see org.hornetq.core.server.LargeServerMessage#setReplicationSync(boolean)
+ */
+ @Override
+ public void setReplicationSync(boolean sync)
+ {
+ // TODO Auto-generated method stub
+ }
+
+ @Override
+ public SequentialFile getFile()
+ {
+ return null;
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java 2011-09-01 16:13:06 UTC (rev 11272)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java 2011-09-01 16:15:10 UTC (rev 11273)
@@ -104,15 +104,15 @@
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationCompareDataMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationDeleteMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationDeleteTXMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.ReplicationJournalFileMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageBeingMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageEndMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageWriteMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageEndMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationPageEventMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationPageWriteMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationPrepareMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationResponseMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationStartSyncMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.ReplicationSyncFileMessage;
import org.hornetq.core.protocol.core.impl.wireformat.RollbackMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionAcknowledgeMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionAddMetaDataMessage;
@@ -526,14 +526,14 @@
packet = new HaBackupRegistrationMessage();
break;
}
- case PacketImpl.REPLICATION_START_SYNC:
+ case PacketImpl.REPLICATION_START_STOP_SYNC:
{
packet = new ReplicationStartSyncMessage();
break;
}
- case PacketImpl.REPLICATION_SYNC:
+ case PacketImpl.REPLICATION_SYNC_FILE:
{
- packet = new ReplicationJournalFileMessage();
+ packet = new ReplicationSyncFileMessage();
break;
}
default:
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java 2011-09-01 16:13:06 UTC (rev 11272)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java 2011-09-01 16:15:10 UTC (rev 11273)
@@ -178,7 +178,7 @@
public static final byte REPLICATION_COMPARE_DATA = 102;
- public static final byte REPLICATION_SYNC = 103;
+ public static final byte REPLICATION_SYNC_FILE = 103;
// HA
@@ -195,7 +195,7 @@
/** XXX HORNETQ-720 "HA" is not really used anywhere else. Better name? */
public static final byte HA_BACKUP_REGISTRATION = 113;
- public static final byte REPLICATION_START_SYNC = 120;
+ public static final byte REPLICATION_START_STOP_SYNC = 120;
// Static --------------------------------------------------------
Deleted: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationJournalFileMessage.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationJournalFileMessage.java 2011-09-01 16:13:06 UTC (rev 11272)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationJournalFileMessage.java 2011-09-01 16:15:10 UTC (rev 11273)
@@ -1,98 +0,0 @@
-package org.hornetq.core.protocol.core.impl.wireformat;
-
-import java.nio.ByteBuffer;
-
-import org.hornetq.api.core.HornetQBuffer;
-import org.hornetq.core.persistence.impl.journal.JournalStorageManager.JournalContent;
-import org.hornetq.core.protocol.core.impl.PacketImpl;
-
-/**
- * Message is used to:
- * <ol>
- * <li>copy JournalFile data over to the backup during synchronization;
- * <li>send a up-to-date signal to backup;
- * </ol>
- */
-public final class ReplicationJournalFileMessage extends PacketImpl
-{
-
- private ByteBuffer data;
- private int dataSize;
- private JournalContent journalType;
- /** This value refers to {@link org.hornetq.core.journal.impl.JournalFile#getFileID()} */
- private long fileId;
- private boolean backupIsUpToDate;
- private byte[] byteArray;
-
- public ReplicationJournalFileMessage()
- {
- super(REPLICATION_SYNC);
- }
-
- public ReplicationJournalFileMessage(int size, ByteBuffer buffer, JournalContent content, long id)
- {
- this();
- this.fileId = id;
- this.backupIsUpToDate = id == -1;
- this.dataSize = size;
- this.data = buffer;
- this.journalType = content;
- }
-
- @Override
- public void encodeRest(final HornetQBuffer buffer)
- {
- buffer.writeLong(fileId);
- if (fileId == -1)
- return;
- buffer.writeByte(journalType.typeByte);
- buffer.writeInt(dataSize);
- // sending -1 will close the file
- if (dataSize > 0)
- {
- buffer.writeBytes(data);// (data, 0, dataSize);
- }
- }
-
- @Override
- public void decodeRest(final HornetQBuffer buffer)
- {
- fileId = buffer.readLong();
- if (fileId == -1)
- {
- backupIsUpToDate = true;
- return;
- }
- journalType = JournalContent.getType(buffer.readByte());
- int size = buffer.readInt();
- if (size > 0)
- {
- byteArray = new byte[size];
- buffer.readBytes(byteArray);
- }
- }
-
- public long getFileId()
- {
- return fileId;
- }
-
- public byte[] getData()
- {
- return byteArray;
- }
-
- public JournalContent getJournalContent()
- {
- return journalType;
- }
-
- /**
- * @return {@code true} if the live has finished synchronizing its data and the backup is
- * therefore up-to-date, {@code false} otherwise.
- */
- public boolean isUpToDate()
- {
- return backupIsUpToDate;
- }
-}
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationStartSyncMessage.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationStartSyncMessage.java 2011-09-01 16:13:06 UTC (rev 11272)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationStartSyncMessage.java 2011-09-01 16:15:10 UTC (rev 11273)
@@ -6,25 +6,30 @@
import org.hornetq.core.protocol.core.impl.PacketImpl;
/**
- * Sends all fileIDs used in the live server to the backup. This is done so that we:
- * <ol>
- * <li>reserve those IDs in the backup;
- * <li>start replicating while the journal synchronization is taking place.
- * </ol>
+ * This message may signal start or end of the replication synchronization.
+ * <p>
+ * At start, it sends all fileIDs used in a given journal live server to the backup, so the backup
+ * can reserve those IDs.
*/
public class ReplicationStartSyncMessage extends PacketImpl
{
private long[] ids;
private JournalContent journalType;
+ private boolean synchronizationIsFinished;
public ReplicationStartSyncMessage()
{
- super(REPLICATION_START_SYNC);
+ super(REPLICATION_START_STOP_SYNC);
}
public ReplicationStartSyncMessage(JournalFile[] datafiles, JournalContent contentType)
{
this();
+ if (datafiles == null && contentType == null)
+ {
+ synchronizationIsFinished = true;
+ return;
+ }
ids = new long[datafiles.length];
for (int i = 0; i < datafiles.length; i++)
{
@@ -36,6 +41,9 @@
@Override
public void encodeRest(final HornetQBuffer buffer)
{
+ buffer.writeBoolean(synchronizationIsFinished);
+ if (synchronizationIsFinished)
+ return;
buffer.writeByte(journalType.typeByte);
buffer.writeInt(ids.length);
for (long id : ids)
@@ -47,6 +55,9 @@
@Override
public void decodeRest(final HornetQBuffer buffer)
{
+ synchronizationIsFinished = buffer.readBoolean();
+ if (synchronizationIsFinished)
+ return;
journalType = JournalContent.getType(buffer.readByte());
int length = buffer.readInt();
ids = new long[length];
@@ -56,6 +67,15 @@
}
}
+ /**
+ * @return {@code true} if the live has finished synchronizing its data and the backup is
+ * therefore up-to-date, {@code false} otherwise.
+ */
+ public boolean isSynchronizationFinished()
+ {
+ return synchronizationIsFinished;
+ }
+
public JournalContent getJournalContentType()
{
return journalType;
Copied: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java (from rev 11272, branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationJournalFileMessage.java)
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java (rev 0)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java 2011-09-01 16:15:10 UTC (rev 11273)
@@ -0,0 +1,108 @@
+package org.hornetq.core.protocol.core.impl.wireformat;
+
+import java.nio.ByteBuffer;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.core.persistence.impl.journal.JournalStorageManager.JournalContent;
+import org.hornetq.core.protocol.core.impl.PacketImpl;
+
+/**
+ * Message is used to:
+ * <ol>
+ * <li>copy JournalFile data over to the backup during synchronization;
+ * <li>send a up-to-date signal to backup;
+ * </ol>
+ */
+public final class ReplicationSyncFileMessage extends PacketImpl
+{
+
+ /**
+ * The JournalType or {@code null} if sync'ing large-messages.
+ */
+ private JournalContent journalType;
+ /**
+ * This value refers to {@link org.hornetq.core.journal.impl.JournalFile#getFileID()}, or the
+ * message id if we are sync'ing a large-message.
+ */
+ private long fileId;
+ private int dataSize;
+ private ByteBuffer byteBuffer;
+ private byte[] byteArray;
+
+ public ReplicationSyncFileMessage()
+ {
+ super(REPLICATION_SYNC_FILE);
+ }
+
+ public ReplicationSyncFileMessage(JournalContent content, long id, int size, ByteBuffer buffer)
+ {
+ this();
+ this.byteBuffer = buffer;
+ this.dataSize = size;
+ this.fileId = id;
+ this.journalType = content;
+ }
+
+ @Override
+ public void encodeRest(final HornetQBuffer buffer)
+ {
+ buffer.writeLong(fileId);
+ if (fileId == -1)
+ return;
+ boolean isJournal = journalType != null;
+ buffer.writeBoolean(isJournal);
+ if (isJournal)
+ buffer.writeByte(journalType.typeByte);
+ buffer.writeInt(dataSize);
+ /*
+ * sending -1 will close the file in case of a journal, but not in case of a largeMessage
+ * (which might receive appends)
+ */
+ if (dataSize > 0)
+ {
+ buffer.writeBytes(byteBuffer);
+ }
+ }
+
+ @Override
+ public void decodeRest(final HornetQBuffer buffer)
+ {
+ fileId = buffer.readLong();
+ if (buffer.readBoolean())
+ {
+ journalType = JournalContent.getType(buffer.readByte());
+ }
+ int size = buffer.readInt();
+ if (size > 0)
+ {
+ byteArray = new byte[size];
+ buffer.readBytes(byteArray);
+ }
+ }
+
+ public long getId()
+ {
+ return fileId;
+ }
+
+ public JournalContent getJournalContent()
+ {
+ return journalType;
+ }
+
+ /**
+ * @return
+ */
+ public byte[] getData()
+ {
+ return byteArray;
+ }
+
+ /**
+ * @return
+ */
+ public boolean isLargeMessage()
+ {
+ return journalType == null;
+ }
+}
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java 2011-09-01 16:13:06 UTC (rev 11272)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java 2011-09-01 16:15:10 UTC (rev 11273)
@@ -19,6 +19,7 @@
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.journal.EncodingSupport;
import org.hornetq.core.journal.JournalLoadInformation;
+import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.journal.impl.JournalFile;
import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.persistence.OperationContext;
@@ -91,7 +92,7 @@
* @throws HornetQException
* @throws Exception
*/
- void sendJournalFile(JournalFile jf, JournalContent type) throws Exception;
+ void syncJournalFile(JournalFile jf, JournalContent type) throws Exception;
/**
* Reserve the following fileIDs in the backup server.
@@ -108,4 +109,9 @@
*/
void sendSynchronizationDone();
+ /**
+ * @param seqFile
+ * @throws Exception
+ */
+ void syncLargeMessageFile(SequentialFile seqFile, long size, long id) throws Exception;
}
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-09-01 16:13:06 UTC (rev 11272)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-09-01 16:15:10 UTC (rev 11273)
@@ -17,6 +17,7 @@
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -48,15 +49,15 @@
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationCompareDataMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationDeleteMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationDeleteTXMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.ReplicationJournalFileMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageBeingMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageEndMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageWriteMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageEndMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationPageEventMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationPageWriteMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationPrepareMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationResponseMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationStartSyncMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.ReplicationSyncFileMessage;
import org.hornetq.core.replication.ReplicationEndpoint;
import org.hornetq.core.server.LargeServerMessage;
import org.hornetq.core.server.ServerMessage;
@@ -86,9 +87,13 @@
/** Files reserved in each journal for synchronization of existing data from the 'live' server. */
private final Map<JournalContent, Map<Long, JournalFile>> filesReservedForSync =
new HashMap<JournalContent, Map<Long, JournalFile>>();
+ private Map<Long, LargeServerMessage> largeMessagesOnSync = new HashMap<Long, LargeServerMessage>();
- /** Used to hold the real Journals before the backup is synchronized. */
- private final Map<JournalContent, Journal> journalsHolder = new HashMap<JournalContent, Journal>();
+ /**
+ * Used to hold the real Journals before the backup is synchronized. This field should be
+ * {@code null} on an up-to-date server.
+ */
+ private Map<JournalContent, Journal> journalsHolder = new HashMap<JournalContent, Journal>();
private StorageManager storage;
@@ -192,13 +197,13 @@
handleCompareDataMessage((ReplicationCompareDataMessage)packet);
response = new NullResponseMessage();
}
- else if (type == PacketImpl.REPLICATION_START_SYNC)
+ else if (type == PacketImpl.REPLICATION_START_STOP_SYNC)
{
handleStartReplicationSynchronization((ReplicationStartSyncMessage)packet);
}
- else if (type == PacketImpl.REPLICATION_SYNC)
+ else if (type == PacketImpl.REPLICATION_SYNC_FILE)
{
- handleReplicationSynchronization((ReplicationJournalFileMessage)packet);
+ handleReplicationSynchronization((ReplicationSyncFileMessage)packet);
}
else
{
@@ -306,7 +311,7 @@
pageManager.stop();
- started = false;
+ started = false;
}
/* (non-Javadoc)
@@ -387,57 +392,109 @@
// Private -------------------------------------------------------
- private void handleReplicationSynchronization(ReplicationJournalFileMessage msg) throws Exception
+ private void finishSynchronization() throws Exception
{
- if (msg.isUpToDate())
+ for (JournalContent jc : EnumSet.allOf(JournalContent.class))
{
- for (JournalContent jc : EnumSet.allOf(JournalContent.class))
+ JournalImpl journal = (JournalImpl)journalsHolder.remove(jc);
+ journal.writeLock();
+ try
{
- JournalImpl journal = (JournalImpl)journalsHolder.remove(jc);
- journal.writeLock();
- try
+ if (journal.getDataFiles().length != 0)
{
- if (journal.getDataFiles().length != 0)
+ throw new IllegalStateException("Journal should not have any data files at this point");
+ }
+ // files should be already in place.
+ filesReservedForSync.remove(jc);
+ getJournal(jc.typeByte).stop();
+ registerJournal(jc.typeByte, journal);
+ journal.loadInternalOnly();
+ }
+ finally
+ {
+ journal.writeUnlock();
+ }
+ }
+ synchronized (largeMessagesOnSync)
+ {
+ synchronized (largeMessages)
+ {
+ ByteBuffer buffer = ByteBuffer.allocate(10 * 1024);
+ for (Entry<Long, LargeServerMessage> entry : largeMessages.entrySet())
+ {
+ Long id = entry.getKey();
+ LargeServerMessage lm = entry.getValue();
+ if (largeMessagesOnSync.containsKey(id))
{
- throw new IllegalStateException("Journal should not have any data files at this point");
+ SequentialFile sq = lm.getFile();
+ LargeServerMessage mainLM = largeMessagesOnSync.get(id);
+ SequentialFile mainSeqFile = mainLM.getFile();
+ System.out.println(mainSeqFile);
+ for (;;)
+ {
+ buffer.rewind();
+ int size = sq.read(buffer);
+ mainSeqFile.writeInternal(buffer);
+ if (size < buffer.capacity())
+ {
+ break;
+ }
+ }
}
- // files should be already in place.
- filesReservedForSync.remove(jc);
- getJournal(jc.typeByte).stop();
- registerJournal(jc.typeByte, journal);
- journal.loadInternalOnly();
- // XXX HORNETQ-720 must reload journals
- // XXX HORNETQ-720 must start using real journals
+ else
+ {
+ // these are large-messages created after sync started
+ largeMessagesOnSync.put(id, lm);
+ }
+ }
+ largeMessages.clear();
+ largeMessages.putAll(largeMessagesOnSync);
+ }
+ }
+ largeMessagesOnSync = null;
+ journalsHolder = null;
+ server.setRemoteBackupUpToDate();
+ log.info("Backup server " + server + " is synchronized with live-server.");
+ return;
+ }
- }
- finally
+ private void handleReplicationSynchronization(ReplicationSyncFileMessage msg) throws Exception
+ {
+ Long id = Long.valueOf(msg.getId());
+ byte[] data = msg.getData();
+ SequentialFile sf;
+ if (msg.isLargeMessage())
+ {
+ synchronized (largeMessagesOnSync)
+ {
+ LargeServerMessage largeMessage = largeMessagesOnSync.get(id);
+ if (largeMessage == null)
{
- journal.writeUnlock();
+ largeMessage = storage.createLargeMessage();
+ largeMessage.setDurable(true);
+ largeMessage.setMessageID(id);
+ largeMessagesOnSync.put(id, largeMessage);
}
-
+ sf = largeMessage.getFile();
}
- server.setRemoteBackupUpToDate();
- log.info("Backup server " + server + " is synchronized with live-server.");
- return;
}
+ else
+ {
+ JournalFile journalFile = filesReservedForSync.get(msg.getJournalContent()).get(id);
+ sf = journalFile.getFile();
- long id = msg.getFileId();
- JournalFile journalFile = filesReservedForSync.get(msg.getJournalContent()).get(Long.valueOf(id));
-
- byte[] data = msg.getData();
+ }
if (data == null)
{
- journalFile.getFile().close();
+ sf.close();
+ return;
}
- else
+
+ if (!sf.isOpen())
{
- SequentialFile sf = journalFile.getFile();
- if (!sf.isOpen())
- {
- sf.open(1, false);
- }
- sf.writeDirect(ByteBuffer.wrap(data), true);
+ sf.open(1, false);
}
+ sf.writeDirect(ByteBuffer.wrap(data), true);
}
/**
@@ -452,6 +509,13 @@
{
throw new HornetQException(HornetQException.INTERNAL_ERROR, "RemoteBackup can not be up-to-date!");
}
+
+ if (packet.isSynchronizationFinished())
+ {
+ finishSynchronization();
+ return;
+ }
+
final Journal journalIf = journalsHolder.get(packet.getJournalContentType());
JournalImpl journal = assertJournalImpl(journalIf);
@@ -520,6 +584,18 @@
else
{
message = largeMessages.get(messageId);
+ if (message == null)
+ {
+ synchronized (largeMessages)
+ {
+ if (!server.isRemoteBackupUpToDate())
+ {
+ // in case we need to append data to a file while still sync'ing the backup
+ createLargeMessage(messageId, true);
+ message = largeMessages.get(messageId);
+ }
+ }
+ }
}
if (message == null)
@@ -537,13 +613,20 @@
*/
private void handleLargeMessageBegin(final ReplicationLargeMessageBeingMessage packet)
{
- LargeServerMessage largeMessage = storage.createLargeMessage();
- largeMessage.setDurable(true);
- largeMessage.setMessageID(packet.getMessageId());
- log.trace("Receiving Large Message " + largeMessage.getMessageID() + " on backup");
- largeMessages.put(largeMessage.getMessageID(), largeMessage);
+ final long id = packet.getMessageId();
+ createLargeMessage(id, false);
+ log.trace("Receiving Large Message " + id + " on backup");
}
+ private void createLargeMessage(final long id, boolean sync)
+ {
+ LargeServerMessage msg = storage.createLargeMessage();
+ msg.setDurable(true);
+ msg.setMessageID(id);
+ msg.setReplicationSync(sync);
+ largeMessages.put(id, msg);
+ }
+
/**
* @param packet
*/
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2011-09-01 16:13:06 UTC (rev 11272)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2011-09-01 16:15:10 UTC (rev 11273)
@@ -44,7 +44,6 @@
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationCompareDataMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationDeleteMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationDeleteTXMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.ReplicationJournalFileMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageBeingMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageEndMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageWriteMessage;
@@ -52,6 +51,7 @@
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationPageWriteMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationPrepareMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationStartSyncMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.ReplicationSyncFileMessage;
import org.hornetq.core.replication.ReplicationManager;
import org.hornetq.utils.ExecutorFactory;
@@ -504,26 +504,58 @@
}
@Override
- public void sendJournalFile(JournalFile jf, JournalContent content) throws Exception
+ public void syncJournalFile(JournalFile jf, JournalContent content) throws Exception
{
SequentialFile file = jf.getFile().copy();
log.info("Replication: sending " + jf + " (size=" + file.size() + ") to backup. " + file);
+ sendLargeFile(content, jf.getFileID(), file, Long.MAX_VALUE);
+ }
+
+ @Override
+ public void syncLargeMessageFile(SequentialFile file, long size, long id) throws Exception
+ {
+ sendLargeFile(null, id, file, size);
+ }
+
+ /**
+ * Sends large files in reasonably sized chunks to the backup during replication synchronization.
+ * @param content journal type or {@code null} for large-messages
+ * @param id journal file id or (large) message id
+ * @param file
+ * @param maxBytesToSend maximum number of bytes to read and send from the file
+ * @throws Exception
+ */
+ private void sendLargeFile(JournalContent content, final long id, SequentialFile file, long maxBytesToSend)
+ throws Exception
+ {
if (!file.isOpen())
{
file.open(1, false);
}
- final long id = jf.getFileID();
final ByteBuffer buffer = ByteBuffer.allocate(1 << 17);
while (true)
{
+ buffer.rewind();
int bytesRead = file.read(buffer);
+ int toSend = bytesRead;
if (bytesRead > 0)
- buffer.limit(bytesRead);
+ {
+ if (bytesRead >= maxBytesToSend)
+ {
+ toSend = (int)maxBytesToSend;
+ maxBytesToSend = 0;
+ }
+ else
+ {
+ maxBytesToSend = maxBytesToSend - bytesRead;
+ }
+ buffer.limit(toSend);
+ }
buffer.rewind();
// sending -1 or 0 bytes will close the file at the backup
- sendReplicatePacket(new ReplicationJournalFileMessage(bytesRead, buffer, content, id));
- if (bytesRead == -1 || bytesRead == 0)
+ sendReplicatePacket(new ReplicationSyncFileMessage(content, id, bytesRead, buffer));
+ if (bytesRead == -1 || bytesRead == 0 || maxBytesToSend == 0)
break;
}
}
@@ -537,7 +569,7 @@
@Override
public void sendSynchronizationDone()
{
- sendReplicatePacket(new ReplicationJournalFileMessage(-1, null, null, -1));
+ ReplicationStartSyncMessage msg = new ReplicationStartSyncMessage(null, null);
+ sendReplicatePacket(msg);
}
-
}
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/LargeServerMessage.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/LargeServerMessage.java 2011-09-01 16:13:06 UTC (rev 11272)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/LargeServerMessage.java 2011-09-01 16:15:10 UTC (rev 11273)
@@ -13,14 +13,13 @@
package org.hornetq.core.server;
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.core.journal.SequentialFile;
+
/**
* A LargeMessage
*
* @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
- *
- * Created 30-Sep-08 10:58:04 AM
- *
- *
*/
public interface LargeServerMessage extends ServerMessage
{
@@ -30,13 +29,13 @@
void setLinkedMessage(LargeServerMessage message);
boolean isFileExists() throws Exception;
-
+
/**
* We have to copy the large message content in case of DLQ and paged messages
* For that we need to pre-mark the LargeMessage with a flag when it is paged
*/
void setPaged();
-
+
/** Close the files if opened */
void releaseResources();
@@ -45,4 +44,17 @@
void incrementDelayDeletionCount();
void decrementDelayDeletionCount() throws Exception;
+
+ /**
+ * This method only has relevance in a backup server.
+ * @param sync {@code true} if this file is meant for appends of a message that needs to be
+ * sync'ed with the live.
+ */
+ void setReplicationSync(boolean sync);
+
+ /**
+ * @return
+ * @throws HornetQException
+ */
+ SequentialFile getFile() throws HornetQException;
}
Modified: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/SequentialFileFactory.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/SequentialFileFactory.java 2011-09-01 16:13:06 UTC (rev 11272)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/SequentialFileFactory.java 2011-09-01 16:15:10 UTC (rev 11273)
@@ -17,9 +17,9 @@
import java.util.List;
/**
- *
+ *
* A SequentialFileFactory
- *
+ *
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
* @author <a href="mailto:clebert.suconic@jboss.com">Clebert Suconic</a>
*
@@ -28,6 +28,12 @@
{
SequentialFile createSequentialFile(String fileName, int maxIO);
+ /**
+ * @param extension extension to filter files with. Its value should not contain '.', as the
+ * method appends one to it.
+ * @return
+ * @throws Exception
+ */
List<String> listFiles(String extension) throws Exception;
boolean isSupportsCallbacks();
@@ -59,7 +65,7 @@
void stop();
- /**
+ /**
* Create the directory if it doesn't exist yet
*/
void createDirs() throws Exception;
Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncJournalTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncJournalTest.java 2011-09-01 16:13:06 UTC (rev 11272)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncJournalTest.java 2011-09-01 16:15:10 UTC (rev 11273)
@@ -25,7 +25,7 @@
private ClientSession session;
private ClientProducer producer;
private BackupSyncDelay syncDelay;
- private static final int N_MSGS = 100;
+ private static final int N_MSGS = 10;
@Override
protected void setUp() throws Exception
@@ -115,7 +115,7 @@
assertFalse("backup is started?", backupServer.isStarted());
liveServer.removeInterceptor(syncDelay);
backupServer.start();
- waitForBackup(sessionFactory, 5);
+ waitForBackup(sessionFactory, 20);
crash(session);
waitForServerInitialization(backupServer, 5);
}
Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/BackupSyncDelay.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/BackupSyncDelay.java 2011-09-01 16:13:06 UTC (rev 11272)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/BackupSyncDelay.java 2011-09-01 16:15:10 UTC (rev 11273)
@@ -13,8 +13,8 @@
import org.hornetq.core.protocol.core.CoreRemotingConnection;
import org.hornetq.core.protocol.core.Packet;
import org.hornetq.core.protocol.core.impl.PacketImpl;
-import org.hornetq.core.protocol.core.impl.wireformat.ReplicationJournalFileMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationResponseMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.ReplicationStartSyncMessage;
import org.hornetq.core.replication.ReplicationEndpoint;
import org.hornetq.spi.core.protocol.RemotingConnection;
@@ -27,7 +27,7 @@
* <p>
* We need to hijack the replication channel handler, because we need to
* <ol>
- * <li>send an early answer to the {@link PacketImpl#REPLICATION_SYNC} packet that signals being
+ * <li>send an early answer to the {@link PacketImpl#REPLICATION_SYNC_FILE} packet that signals being
* up-to-date
* <li>not send an answer to it, when we deliver the packet later.
* </ol>
@@ -135,10 +135,10 @@
deliver();
}
- if (packet.getType() == PacketImpl.REPLICATION_SYNC && mustHold)
+ if (packet.getType() == PacketImpl.REPLICATION_START_STOP_SYNC && mustHold)
{
- ReplicationJournalFileMessage syncMsg = (ReplicationJournalFileMessage)packet;
- if (syncMsg.isUpToDate() && !deliver)
+ ReplicationStartSyncMessage syncMsg = (ReplicationStartSyncMessage)packet;
+ if (syncMsg.isSynchronizationFinished() && !deliver)
{
receivedUpToDate = true;
assert onHold == null;
13 years, 4 months
JBoss hornetq SVN: r11272 - branches/HORNETQ-720_Replication/examples.
by do-not-reply@jboss.org
Author: borges
Date: 2011-09-01 12:13:06 -0400 (Thu, 01 Sep 2011)
New Revision: 11272
Modified:
branches/HORNETQ-720_Replication/examples/pom.xml
Log:
fix maven warning
Modified: branches/HORNETQ-720_Replication/examples/pom.xml
===================================================================
--- branches/HORNETQ-720_Replication/examples/pom.xml 2011-09-01 16:12:50 UTC (rev 11271)
+++ branches/HORNETQ-720_Replication/examples/pom.xml 2011-09-01 16:13:06 UTC (rev 11272)
@@ -22,7 +22,6 @@
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
- <version>2.2</version>
<configuration>
<descriptor>src/main/assembly/dep.xml</descriptor>
</configuration>
13 years, 4 months
JBoss hornetq SVN: r11271 - branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover.
by do-not-reply@jboss.org
Author: borges
Date: 2011-09-01 12:12:50 -0400 (Thu, 01 Sep 2011)
New Revision: 11271
Added:
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncJournalTest.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncLargeMessageTest.java
Removed:
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java
Modified:
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/LargeMessageFailoverTest.java
Log:
HORNETQ-720 tests for LargeMessage sync'ing
Deleted: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java 2011-09-01 13:36:39 UTC (rev 11270)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java 2011-09-01 16:12:50 UTC (rev 11271)
@@ -1,208 +0,0 @@
-package org.hornetq.tests.integration.cluster.failover;
-
-import java.util.HashSet;
-import java.util.Set;
-
-import org.hornetq.api.core.HornetQException;
-import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.ClientConsumer;
-import org.hornetq.api.core.client.ClientProducer;
-import org.hornetq.api.core.client.ClientSession;
-import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
-import org.hornetq.core.client.impl.ServerLocatorInternal;
-import org.hornetq.core.journal.impl.JournalFile;
-import org.hornetq.core.journal.impl.JournalImpl;
-import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
-import org.hornetq.tests.integration.cluster.util.BackupSyncDelay;
-import org.hornetq.tests.integration.cluster.util.TestableServer;
-import org.hornetq.tests.util.TransportConfigurationUtils;
-
-public class BackupJournalSyncTest extends FailoverTestBase
-{
-
- private ServerLocatorInternal locator;
- private ClientSessionFactoryInternal sessionFactory;
- private ClientSession session;
- private ClientProducer producer;
- private BackupSyncDelay syncDelay;
- private static final int N_MSGS = 100;
-
- @Override
- protected void setUp() throws Exception
- {
- startBackupServer = false;
- super.setUp();
- locator = getServerLocator();
- locator.setBlockOnNonDurableSend(true);
- locator.setBlockOnDurableSend(true);
- locator.setReconnectAttempts(-1);
- sessionFactory = createSessionFactoryAndWaitForTopology(locator, 1);
- syncDelay = new BackupSyncDelay(backupServer, liveServer);
- }
-
- public void testNodeID() throws Exception
- {
- backupServer.start();
- waitForComponent(backupServer, 5);
- assertTrue("must be running", backupServer.isStarted());
- assertEquals("backup and live should have the same nodeID", liveServer.getServer().getNodeID(),
- backupServer.getServer().getNodeID());
- }
-
- public void testReserveFileIdValuesOnBackup() throws Exception
- {
- createProducerSendSomeMessages();
- JournalImpl messageJournal = getMessageJournalFromServer(liveServer);
- for (int i = 0; i < 5; i++)
- {
- messageJournal.forceMoveNextFile();
- sendMessages(session, producer, N_MSGS);
- }
-
- backupServer.start();
-
- waitForBackup(sessionFactory, 10, false);
-
- // SEND more messages, now with the backup replicating
- sendMessages(session, producer, N_MSGS);
- Set<Long> liveIds = getFileIds(messageJournal);
-
- finishSyncAndFailover();
-
- JournalImpl backupMsgJournal = getMessageJournalFromServer(backupServer);
- Set<Long> backupIds = getFileIds(backupMsgJournal);
- assertEquals("File IDs must match!", liveIds, backupIds);
- }
-
- public void testReplicationDuringSync() throws Exception
- {
- createProducerSendSomeMessages();
- backupServer.start();
- waitForBackup(sessionFactory, 10, false);
-
- sendMessages(session, producer, N_MSGS);
- session.commit();
- receiveMsgs(0, N_MSGS);
- finishSyncAndFailover();
- }
-
- private void finishSyncAndFailover() throws Exception
- {
- syncDelay.deliverUpToDateMsg();
- waitForBackup(sessionFactory, 10, true);
- assertFalse("should not be initialized", backupServer.getServer().isInitialised());
- crash(session);
- waitForServerInitialization(backupServer, 5);
- }
-
- public void testMessageSyncSimple() throws Exception
- {
- createProducerSendSomeMessages();
- startBackupCrashLive();
- receiveMsgs(0, N_MSGS);
- }
-
- public void testMessageSync() throws Exception
- {
- createProducerSendSomeMessages();
- receiveMsgs(0, N_MSGS / 2);
- startBackupCrashLive();
- receiveMsgs(N_MSGS / 2, N_MSGS);
- }
-
- private void startBackupCrashLive() throws Exception
- {
- assertFalse("backup is started?", backupServer.isStarted());
- liveServer.removeInterceptor(syncDelay);
- backupServer.start();
- waitForBackup(sessionFactory, 5);
- crash(session);
- waitForServerInitialization(backupServer, 5);
- }
-
- private void createProducerSendSomeMessages() throws HornetQException, Exception
- {
- session = sessionFactory.createSession(true, true);
- session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
- producer = session.createProducer(FailoverTestBase.ADDRESS);
- sendMessages(session, producer, N_MSGS);
- session.commit();
- }
-
- private void receiveMsgs(int start, int end) throws HornetQException
- {
- session.start();
- ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
- receiveMessagesAndAck(consumer, start, end);
- consumer.close();
- session.commit();
- }
-
- private static void waitForServerInitialization(TestableServer server, int seconds)
- {
- long time = System.currentTimeMillis();
- long toWait = seconds * 1000;
- while (!server.isInitialised())
- {
- try
- {
- Thread.sleep(50);
- }
- catch (InterruptedException e)
- {
- // ignore
- }
- if (System.currentTimeMillis() > (time + toWait))
- {
- fail("component did not start within timeout of " + seconds);
- }
- }
- }
- private Set<Long> getFileIds(JournalImpl journal)
- {
- Set<Long> results = new HashSet<Long>();
- for (JournalFile jf : journal.getDataFiles())
- {
- results.add(Long.valueOf(jf.getFileID()));
- }
- return results;
- }
-
- static JournalImpl getMessageJournalFromServer(TestableServer server)
- {
- JournalStorageManager sm = (JournalStorageManager)server.getServer().getStorageManager();
- return (JournalImpl)sm.getMessageJournal();
- }
-
- @Override
- protected void tearDown() throws Exception
- {
- if (sessionFactory != null)
- sessionFactory.close();
- if (session != null)
- session.close();
- closeServerLocator(locator);
-
- super.tearDown();
- }
-
- @Override
- protected void createConfigs() throws Exception
- {
- createReplicatedConfigs();
- }
-
- @Override
- protected TransportConfiguration getAcceptorTransportConfiguration(boolean live)
- {
- return TransportConfigurationUtils.getInVMAcceptor(live);
- }
-
- @Override
- protected TransportConfiguration getConnectorTransportConfiguration(boolean live)
- {
- return TransportConfigurationUtils.getInVMConnector(live);
- }
-
-
-}
Copied: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncJournalTest.java (from rev 11243, branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java)
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncJournalTest.java (rev 0)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncJournalTest.java 2011-09-01 16:12:50 UTC (rev 11271)
@@ -0,0 +1,209 @@
+package org.hornetq.tests.integration.cluster.failover;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
+import org.hornetq.core.client.impl.ServerLocatorInternal;
+import org.hornetq.core.journal.impl.JournalFile;
+import org.hornetq.core.journal.impl.JournalImpl;
+import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
+import org.hornetq.tests.integration.cluster.util.BackupSyncDelay;
+import org.hornetq.tests.integration.cluster.util.TestableServer;
+import org.hornetq.tests.util.TransportConfigurationUtils;
+
+public class BackupSyncJournalTest extends FailoverTestBase
+{
+
+ private ServerLocatorInternal locator;
+ private ClientSessionFactoryInternal sessionFactory;
+ private ClientSession session;
+ private ClientProducer producer;
+ private BackupSyncDelay syncDelay;
+ private static final int N_MSGS = 100;
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ startBackupServer = false;
+ super.setUp();
+ locator = getServerLocator();
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setReconnectAttempts(-1);
+ sessionFactory = createSessionFactoryAndWaitForTopology(locator, 1);
+ syncDelay = new BackupSyncDelay(backupServer, liveServer);
+ }
+
+ public void testNodeID() throws Exception
+ {
+ backupServer.start();
+ waitForComponent(backupServer, 5);
+ assertTrue("must be running", backupServer.isStarted());
+ assertEquals("backup and live should have the same nodeID", liveServer.getServer().getNodeID(),
+ backupServer.getServer().getNodeID());
+ }
+
+ public void testReserveFileIdValuesOnBackup() throws Exception
+ {
+ createProducerSendSomeMessages();
+ JournalImpl messageJournal = getMessageJournalFromServer(liveServer);
+ for (int i = 0; i < 5; i++)
+ {
+ messageJournal.forceMoveNextFile();
+ sendMessages(session, producer, N_MSGS);
+ }
+
+ backupServer.start();
+
+ waitForBackup(sessionFactory, 10, false);
+
+ // SEND more messages, now with the backup replicating
+ sendMessages(session, producer, N_MSGS);
+ Set<Long> liveIds = getFileIds(messageJournal);
+
+ finishSyncAndFailover();
+
+ JournalImpl backupMsgJournal = getMessageJournalFromServer(backupServer);
+ Set<Long> backupIds = getFileIds(backupMsgJournal);
+ assertEquals("File IDs must match!", liveIds, backupIds);
+ }
+
+ public void testReplicationDuringSync() throws Exception
+ {
+ createProducerSendSomeMessages();
+ backupServer.start();
+ waitForBackup(sessionFactory, 10, false);
+
+ sendMessages(session, producer, N_MSGS);
+ session.commit();
+ receiveMsgs(0, N_MSGS);
+ finishSyncAndFailover();
+ }
+
+ private void finishSyncAndFailover() throws Exception
+ {
+ syncDelay.deliverUpToDateMsg();
+ waitForBackup(sessionFactory, 10, true);
+ assertFalse("should not be initialized", backupServer.getServer().isInitialised());
+ crash(session);
+ waitForServerInitialization(backupServer, 5);
+ }
+
+ public void testMessageSyncSimple() throws Exception
+ {
+ createProducerSendSomeMessages();
+ startBackupCrashLive();
+ receiveMsgs(0, N_MSGS);
+ }
+
+ public void testMessageSync() throws Exception
+ {
+ createProducerSendSomeMessages();
+ receiveMsgs(0, N_MSGS / 2);
+ startBackupCrashLive();
+ receiveMsgs(N_MSGS / 2, N_MSGS);
+ }
+
+ private void startBackupCrashLive() throws Exception
+ {
+ assertFalse("backup is started?", backupServer.isStarted());
+ liveServer.removeInterceptor(syncDelay);
+ backupServer.start();
+ waitForBackup(sessionFactory, 5);
+ crash(session);
+ waitForServerInitialization(backupServer, 5);
+ }
+
+ private void createProducerSendSomeMessages() throws HornetQException, Exception
+ {
+ session = sessionFactory.createSession(true, true);
+ session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
+ producer = session.createProducer(FailoverTestBase.ADDRESS);
+ sendMessages(session, producer, N_MSGS);
+ session.commit();
+ }
+
+ private void receiveMsgs(int start, int end) throws HornetQException
+ {
+ session.start();
+ ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
+ receiveMessagesAndAck(consumer, start, end);
+ consumer.close();
+ session.commit();
+ }
+
+ private static void waitForServerInitialization(TestableServer server, int seconds)
+ {
+ long time = System.currentTimeMillis();
+ long toWait = seconds * 1000;
+ while (!server.isInitialised())
+ {
+ try
+ {
+ Thread.sleep(50);
+ }
+ catch (InterruptedException e)
+ {
+ // ignore
+ }
+ if (System.currentTimeMillis() > (time + toWait))
+ {
+ fail("component did not start within timeout of " + seconds);
+ }
+ }
+ }
+
+ private Set<Long> getFileIds(JournalImpl journal)
+ {
+ Set<Long> results = new HashSet<Long>();
+ for (JournalFile jf : journal.getDataFiles())
+ {
+ results.add(Long.valueOf(jf.getFileID()));
+ }
+ return results;
+ }
+
+ static JournalImpl getMessageJournalFromServer(TestableServer server)
+ {
+ JournalStorageManager sm = (JournalStorageManager)server.getServer().getStorageManager();
+ return (JournalImpl)sm.getMessageJournal();
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ if (sessionFactory != null)
+ sessionFactory.close();
+ if (session != null)
+ session.close();
+ closeServerLocator(locator);
+
+ super.tearDown();
+ }
+
+ @Override
+ protected void createConfigs() throws Exception
+ {
+ createReplicatedConfigs();
+ }
+
+ @Override
+ protected TransportConfiguration getAcceptorTransportConfiguration(boolean live)
+ {
+ return TransportConfigurationUtils.getInVMAcceptor(live);
+ }
+
+ @Override
+ protected TransportConfiguration getConnectorTransportConfiguration(boolean live)
+ {
+ return TransportConfigurationUtils.getInVMConnector(live);
+ }
+
+
+}
Added: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncLargeMessageTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncLargeMessageTest.java (rev 0)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncLargeMessageTest.java 2011-09-01 16:12:50 UTC (rev 11271)
@@ -0,0 +1,37 @@
+package org.hornetq.tests.integration.cluster.failover;
+
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.core.client.impl.ServerLocatorInternal;
+
+public class BackupSyncLargeMessageTest extends BackupSyncJournalTest
+{
+
+ /**
+ * @param i
+ * @param message
+ */
+ @Override
+ protected void assertMessageBody(final int i, final ClientMessage message)
+ {
+ assertLargeMessageBody(i, message);
+ }
+
+ @Override
+ protected ServerLocatorInternal getServerLocator() throws Exception
+ {
+ ServerLocator locator = super.getServerLocator();
+ locator.setMinLargeMessageSize(MIN_LARGE_MESSAGE);
+ return (ServerLocatorInternal)locator;
+ }
+
+ /**
+ * @param i
+ * @param message
+ */
+ @Override
+ protected void setBody(final int i, final ClientMessage message) throws Exception
+ {
+ setLargeMessageBody(i, message);
+ }
+}
Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2011-09-01 13:36:39 UTC (rev 11270)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2011-09-01 16:12:50 UTC (rev 11271)
@@ -24,10 +24,12 @@
import junit.framework.Assert;
+import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClientMessage;
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.api.core.client.ClusterTopologyListener;
import org.hornetq.api.core.client.HornetQClient;
@@ -47,6 +49,7 @@
import org.hornetq.tests.integration.cluster.util.TestableServer;
import org.hornetq.tests.util.ReplicatedBackupUtils;
import org.hornetq.tests.util.ServiceTestBase;
+import org.hornetq.tests.util.UnitTestCase;
/**
* A FailoverTestBase
@@ -59,6 +62,11 @@
protected static final SimpleString ADDRESS = new SimpleString("FailoverTestAddress");
+ /*
+ * Used only by tests of large messages.
+ */
+ protected static final int MIN_LARGE_MESSAGE = 1024;
+ private static final int LARGE_MESSAGE_SIZE = MIN_LARGE_MESSAGE * 3;
// Attributes ----------------------------------------------------
@@ -121,6 +129,40 @@
return new SameProcessHornetQServer(createInVMFailoverServer(true, backupConfig, nodeManager));
}
+ /**
+ * Large message version of {@link #setBody(int, ClientMessage)}.
+ * @param i
+ * @param message
+ * @param size
+ */
+ protected static void setLargeMessageBody(final int i, final ClientMessage message)
+ {
+ try
+ {
+ message.setBodyInputStream(UnitTestCase.createFakeLargeStream(LARGE_MESSAGE_SIZE));
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Large message version of {@link #assertMessageBody(int, ClientMessage)}.
+ * @param i
+ * @param message
+ */
+ protected static void assertLargeMessageBody(final int i, final ClientMessage message)
+ {
+ HornetQBuffer buffer = message.getBodyBuffer();
+
+ for (int j = 0; j < LARGE_MESSAGE_SIZE; j++)
+ {
+ Assert.assertTrue("expecting " + LARGE_MESSAGE_SIZE + " bytes, got " + j, buffer.readable());
+ Assert.assertEquals("equal at " + j, UnitTestCase.getSamplebyte(j), buffer.readByte());
+ }
+ }
+
protected void createConfigs() throws Exception
{
nodeManager = new InVMNodeManager();
Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/LargeMessageFailoverTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/LargeMessageFailoverTest.java 2011-09-01 13:36:39 UTC (rev 11270)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/LargeMessageFailoverTest.java 2011-09-01 16:12:50 UTC (rev 11271)
@@ -13,14 +13,10 @@
package org.hornetq.tests.integration.cluster.failover;
-import junit.framework.Assert;
-
-import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.client.ClientMessage;
import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.client.impl.ServerLocatorInternal;
import org.hornetq.core.logging.Logger;
-import org.hornetq.tests.util.UnitTestCase;
/**
* A LargeMessageFailoverTest
@@ -36,11 +32,6 @@
private static final Logger log = Logger.getLogger(LargeMessageFailoverTest.class);
-
- private static final int MIN_LARGE_MESSAGE = 1024;
-
- private static final int LARGE_MESSAGE_SIZE = MIN_LARGE_MESSAGE * 3;
-
// Attributes ----------------------------------------------------
// Static --------------------------------------------------------
@@ -53,17 +44,11 @@
// Protected -----------------------------------------------------
- /**
- * @param name
- */
public LargeMessageFailoverTest(final String name)
{
super(name);
}
- /**
- *
- */
public LargeMessageFailoverTest()
{
super();
@@ -90,13 +75,7 @@
@Override
protected void assertMessageBody(final int i, final ClientMessage message)
{
- HornetQBuffer buffer = message.getBodyBuffer();
-
- for (int j = 0; j < LARGE_MESSAGE_SIZE; j++)
- {
- Assert.assertTrue("expecting more bytes", buffer.readable());
- Assert.assertEquals("equal at " + j, UnitTestCase.getSamplebyte(j), buffer.readByte());
- }
+ assertLargeMessageBody(i, message);
}
@@ -104,12 +83,10 @@
protected ServerLocatorInternal getServerLocator() throws Exception
{
ServerLocator locator = super.getServerLocator();
- locator.setMinLargeMessageSize(LARGE_MESSAGE_SIZE);
- return (ServerLocatorInternal) locator;
+ locator.setMinLargeMessageSize(MIN_LARGE_MESSAGE);
+ return (ServerLocatorInternal)locator;
}
-
-
/**
* @param i
* @param message
@@ -117,11 +94,6 @@
@Override
protected void setBody(final int i, final ClientMessage message) throws Exception
{
- message.setBodyInputStream(UnitTestCase.createFakeLargeStream(LARGE_MESSAGE_SIZE));
+ setLargeMessageBody(i, message);
}
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-
}
13 years, 4 months