Author: timfox
Date: 2009-11-10 15:03:11 -0500 (Tue, 10 Nov 2009)
New Revision: 8256
Modified:
trunk/examples/common/src/org/hornetq/common/example/HornetQExample.java
trunk/examples/jms/bridge/server0/hornetq-configuration.xml
trunk/examples/jms/bridge/server1/hornetq-configuration.xml
trunk/examples/jms/bridge/src/org/hornetq/jms/example/BridgeExample.java
trunk/examples/jms/bridge/src/org/hornetq/jms/example/HatColourChangeTransformer.java
trunk/examples/jms/large-message/build.xml
trunk/examples/jms/large-message/src/org/hornetq/jms/example/LargeMessageExample.java
trunk/src/config/common/schema/hornetq-configuration.xsd
trunk/src/main/org/hornetq/core/config/cluster/BridgeConfiguration.java
trunk/src/main/org/hornetq/core/config/cluster/ClusterConnectionConfiguration.java
trunk/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java
trunk/src/main/org/hornetq/core/config/impl/FileConfiguration.java
trunk/src/main/org/hornetq/core/persistence/impl/journal/FileLargeServerMessage.java
trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
trunk/src/main/org/hornetq/core/server/ServerMessage.java
trunk/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
trunk/src/main/org/hornetq/utils/XMLUtil.java
trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java
trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeStartTest.java
trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java
trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeWithDiscoveryGroupStartTest.java
trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
trunk/tests/src/org/hornetq/tests/integration/management/BridgeControlTest.java
trunk/tests/src/org/hornetq/tests/integration/management/BridgeControlUsingCoreTest.java
trunk/tests/src/org/hornetq/tests/integration/management/ClusterConnectionControl2Test.java
trunk/tests/src/org/hornetq/tests/integration/management/ClusterConnectionControlTest.java
trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
Log:
mainly
https://jira.jboss.org/jira/browse/HORNETQ-182
Modified: trunk/examples/common/src/org/hornetq/common/example/HornetQExample.java
===================================================================
--- trunk/examples/common/src/org/hornetq/common/example/HornetQExample.java 2009-11-10
17:25:45 UTC (rev 8255)
+++ trunk/examples/common/src/org/hornetq/common/example/HornetQExample.java 2009-11-10
20:03:11 UTC (rev 8256)
@@ -178,7 +178,10 @@
{
for (Process server : servers)
{
- stopServer(server);
+ if (server != null)
+ {
+ stopServer(server);
+ }
}
}
Modified: trunk/examples/jms/bridge/server0/hornetq-configuration.xml
===================================================================
--- trunk/examples/jms/bridge/server0/hornetq-configuration.xml 2009-11-10 17:25:45 UTC
(rev 8255)
+++ trunk/examples/jms/bridge/server0/hornetq-configuration.xml 2009-11-10 20:03:11 UTC
(rev 8256)
@@ -42,10 +42,23 @@
<filter string="name='aardvark'"/>
<transformer-class-name>org.hornetq.jms.example.HatColourChangeTransformer</transformer-class-name>
<reconnect-attempts>-1</reconnect-attempts>
+
+ <confirmation-window-size>500000</confirmation-window-size>
+
<connector-ref connector-name="remote-connector"/>
</bridge>
</bridges>
+
+
+ <address-settings>
+ <address-setting match="jms.queue.sausage-factory">
+ <max-size-bytes>10000000</max-size-bytes>
+ <page-size-bytes>1000000</page-size-bytes>
+ <address-full-policy>PAGE</address-full-policy>
+ </address-setting>
+ </address-settings>
+
<!-- Other config -->
<security-settings>
Modified: trunk/examples/jms/bridge/server1/hornetq-configuration.xml
===================================================================
--- trunk/examples/jms/bridge/server1/hornetq-configuration.xml 2009-11-10 17:25:45 UTC
(rev 8255)
+++ trunk/examples/jms/bridge/server1/hornetq-configuration.xml 2009-11-10 20:03:11 UTC
(rev 8256)
@@ -23,6 +23,15 @@
<!-- Other config -->
+ <address-settings>
+
+ <address-setting match="jms.queue.sausage-factory">
+ <max-size-bytes>10000</max-size-bytes>
+ <page-size-bytes>1000</page-size-bytes>
+ <address-full-policy>PAGE</address-full-policy>
+ </address-setting>
+ </address-settings>
+
<security-settings>
<!--security for example queue-->
<security-setting match="jms.queue.#">
Modified: trunk/examples/jms/bridge/src/org/hornetq/jms/example/BridgeExample.java
===================================================================
--- trunk/examples/jms/bridge/src/org/hornetq/jms/example/BridgeExample.java 2009-11-10
17:25:45 UTC (rev 8255)
+++ trunk/examples/jms/bridge/src/org/hornetq/jms/example/BridgeExample.java 2009-11-10
20:03:11 UTC (rev 8256)
@@ -12,6 +12,7 @@
*/
package org.hornetq.jms.example;
+import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Message;
@@ -98,42 +99,61 @@
// Step 12. We create a JMS MessageProducer object on server 0
MessageProducer producer = session0.createProducer(sausageFactory);
- // Step 13. We create and send a message representing an aardvark with a green
hat to the sausage-factory
- // on node 0
- Message message = session0.createMessage();
+ final int numMessages = 100000;
+
+ byte[] bytes = new byte[1000];
- message.setStringProperty("name", "aardvark");
+ for (int i = 0; i < numMessages; i++)
+ {
- message.setStringProperty("hat", "green");
+ // Step 13. We create and send a message representing an aardvark with a
green hat to the sausage-factory
+ // on node 0
+ BytesMessage message = session0.createBytesMessage();
- producer.send(message);
+ message.setStringProperty("name", "aardvark");
- System.out.println("Sent " +
message.getStringProperty("name") +
- " message with " +
- message.getStringProperty("hat") +
- " hat to sausage-factory on node 0");
+ message.setStringProperty("hat", "green");
+
+ message.writeBytes(bytes);
+ producer.send(message);
+
+ if (i % 1000 == 0)
+ {
+ System.out.println("Sent " + i);
+ }
+ }
+
// Step 14 - we successfully receive the aardvark message from the
mincing-machine one node 1. The aardvark's
// hat is now blue since it has been transformed!
- Message receivedMessage = consumer.receive(5000);
+ for (int i = 0; i < numMessages; i++)
+ {
+ Message receivedMessage = consumer.receive(5000);
+
+ if (receivedMessage == null)
+ {
+ throw new IllegalStateException("Did not receive message");
+ }
- System.out.println("Received " +
receivedMessage.getStringProperty("name") +
- " message with " +
- receivedMessage.getStringProperty("hat") +
- " hat from mincing-machine on node 1");
+ if (i % 1000 == 0)
+ {
+ System.out.println("Received " + i);
+ }
- // Step 13. We create and send another message, this time representing a
sasquatch with a mauve hat to the
- // sausage-factory on node 0. This won't be bridged to the mincing-machine
since we only want aardvarks, not sasquatches
+ // Step 13. We create and send another message, this time representing a
sasquatch with a mauve hat to the
+ // sausage-factory on node 0. This won't be bridged to the
mincing-machine since we only want aardvarks, not
+ // sasquatches
+ }
+
+ Message message = session0.createMessage();
- message = session0.createMessage();
-
message.setStringProperty("name", "sasquatch");
- message.setStringProperty("hat", "mauve");
+ message.setStringProperty("hat", "mauve");
producer.send(message);
-
+
System.out.println("Sent " +
message.getStringProperty("name") +
" message with " +
message.getStringProperty("hat") +
@@ -141,7 +161,7 @@
// Step 14. We don't receive the message since it has not been bridged.
- receivedMessage = (TextMessage)consumer.receive(1000);
+ Message receivedMessage = (TextMessage)consumer.receive(1000);
if (receivedMessage == null)
{
Modified:
trunk/examples/jms/bridge/src/org/hornetq/jms/example/HatColourChangeTransformer.java
===================================================================
---
trunk/examples/jms/bridge/src/org/hornetq/jms/example/HatColourChangeTransformer.java 2009-11-10
17:25:45 UTC (rev 8255)
+++
trunk/examples/jms/bridge/src/org/hornetq/jms/example/HatColourChangeTransformer.java 2009-11-10
20:03:11 UTC (rev 8256)
@@ -33,7 +33,7 @@
SimpleString oldProp = message.getSimpleStringProperty(propName);
- System.out.println("Old hat colour is " + oldProp);
+ //System.out.println("Old hat colour is " + oldProp);
//Change the colour
message.putStringProperty(propName, new SimpleString("blue"));
Modified: trunk/examples/jms/large-message/build.xml
===================================================================
--- trunk/examples/jms/large-message/build.xml 2009-11-10 17:25:45 UTC (rev 8255)
+++ trunk/examples/jms/large-message/build.xml 2009-11-10 20:03:11 UTC (rev 8256)
@@ -23,7 +23,7 @@
<antcall target="runExample">
<param name="example.classname"
value="org.hornetq.jms.example.LargeMessageExample"/>
- <!-- We limit the client to running in only 50MB of RAM -->
+ <!-- We limit the client to running in only 50MB of RAM -->
<param name="java-min-memory" value="50M"/>
<param name="java-max-memory" value="50M"/>
</antcall>
@@ -36,7 +36,7 @@
</antcall>
</target>
- <target name="delete-large-messages">
+ <target name="delete-large-messages" depends="clean">
<delete file="huge_message_to_send.dat"/>
<delete file="huge_message_received.dat"/>
</target>
Modified:
trunk/examples/jms/large-message/src/org/hornetq/jms/example/LargeMessageExample.java
===================================================================
---
trunk/examples/jms/large-message/src/org/hornetq/jms/example/LargeMessageExample.java 2009-11-10
17:25:45 UTC (rev 8255)
+++
trunk/examples/jms/large-message/src/org/hornetq/jms/example/LargeMessageExample.java 2009-11-10
20:03:11 UTC (rev 8256)
@@ -45,11 +45,13 @@
new LargeMessageExample().run(args);
}
- // The message we will send is size 256MB, even though we are only running in 50MB of
RAM on both client and server.
- // HornetQ will support much larger message sizes, but we use 512MB so the example
runs in reasonable time.
- // private final long FILE_SIZE = 256L * 1024 * 1024;
+ // The message we will send is size 10GiB, even though we are only running in 50MB of
RAM on both client and server.
+ // This may take some considerable time to create, send and consume - if it takes too
long or you don't have
+ // enough disk space just reduce the file size here
- private final long FILE_SIZE = 2L * 1024 * 1024 * 1024; // 2 GiB message
+ // private final long FILE_SIZE = 256L * 1024 * 1024;
+
+ private final long FILE_SIZE = 10L * 1024 * 1024 * 1024; // 10 GiB message
public boolean runExample() throws Exception
{
Modified: trunk/src/config/common/schema/hornetq-configuration.xsd
===================================================================
--- trunk/src/config/common/schema/hornetq-configuration.xsd 2009-11-10 17:25:45 UTC (rev
8255)
+++ trunk/src/config/common/schema/hornetq-configuration.xsd 2009-11-10 20:03:11 UTC (rev
8256)
@@ -316,6 +316,8 @@
</xsd:element>
<xsd:element maxOccurs="1" minOccurs="0"
name="use-duplicate-detection" type="xsd:boolean">
</xsd:element>
+ <xsd:element maxOccurs="1" minOccurs="0"
name="confirmation-window-size" type="xsd:int">
+ </xsd:element>
<xsd:choice>
<xsd:element maxOccurs="1" minOccurs="1"
name="connector-ref" type="connector-refType">
</xsd:element>
@@ -342,7 +344,8 @@
</xsd:element>
<xsd:element maxOccurs="1" minOccurs="0"
name="max-hops" type="xsd:int">
</xsd:element>
-
+ <xsd:element maxOccurs="1" minOccurs="0"
name="confirmation-window-size" type="xsd:int">
+ </xsd:element>
<xsd:choice>
<xsd:element maxOccurs="unbounded" minOccurs="1"
name="connector-ref" type="connector-refType">
</xsd:element>
Modified: trunk/src/main/org/hornetq/core/config/cluster/BridgeConfiguration.java
===================================================================
--- trunk/src/main/org/hornetq/core/config/cluster/BridgeConfiguration.java 2009-11-10
17:25:45 UTC (rev 8255)
+++ trunk/src/main/org/hornetq/core/config/cluster/BridgeConfiguration.java 2009-11-10
20:03:11 UTC (rev 8256)
@@ -53,6 +53,8 @@
private boolean failoverOnServerShutdown;
private boolean useDuplicateDetection;
+
+ private int confirmationWindowSize;
public BridgeConfiguration(final String name,
final String queueName,
@@ -64,6 +66,7 @@
final int reconnectAttempts,
final boolean failoverOnServerShutdown,
final boolean useDuplicateDetection,
+ final int confirmationWindowSize,
final Pair<String, String> connectorPair)
{
this.name = name;
@@ -76,6 +79,7 @@
this.reconnectAttempts = reconnectAttempts;
this.failoverOnServerShutdown = failoverOnServerShutdown;
this.useDuplicateDetection = useDuplicateDetection;
+ this.confirmationWindowSize = confirmationWindowSize;
this.connectorPair = connectorPair;
this.discoveryGroupName = null;
}
@@ -90,6 +94,7 @@
final int reconnectAttempts,
final boolean failoverOnServerShutdown,
final boolean useDuplicateDetection,
+ final int confirmationWindowSize,
final String discoveryGroupName)
{
this.name = name;
@@ -102,6 +107,7 @@
this.reconnectAttempts = reconnectAttempts;
this.failoverOnServerShutdown = failoverOnServerShutdown;
this.useDuplicateDetection = useDuplicateDetection;
+ this.confirmationWindowSize = confirmationWindowSize;
this.connectorPair = null;
this.discoveryGroupName = discoveryGroupName;
}
@@ -165,6 +171,11 @@
{
return useDuplicateDetection;
}
+
+ public int getConfirmationWindowSize()
+ {
+ return confirmationWindowSize;
+ }
/**
* @param name the name to set
@@ -261,4 +272,12 @@
{
this.useDuplicateDetection = useDuplicateDetection;
}
+
+ /**
+ * @param confirmationWindowSize the confirmationWindowSize to set
+ */
+ public void setConfirmationWindowSize(int confirmationWindowSize)
+ {
+ this.confirmationWindowSize = confirmationWindowSize;
+ }
}
Modified:
trunk/src/main/org/hornetq/core/config/cluster/ClusterConnectionConfiguration.java
===================================================================
---
trunk/src/main/org/hornetq/core/config/cluster/ClusterConnectionConfiguration.java 2009-11-10
17:25:45 UTC (rev 8255)
+++
trunk/src/main/org/hornetq/core/config/cluster/ClusterConnectionConfiguration.java 2009-11-10
20:03:11 UTC (rev 8256)
@@ -47,12 +47,15 @@
private final int maxHops;
+ private final int confirmationWindowSize;
+
public ClusterConnectionConfiguration(final String name,
final String address,
final long retryInterval,
final boolean duplicateDetection,
final boolean forwardWhenNoConsumers,
final int maxHops,
+ final int confirmationWindowSize,
final List<Pair<String, String>>
staticConnectorNamePairs)
{
this.name = name;
@@ -63,6 +66,7 @@
this.forwardWhenNoConsumers = forwardWhenNoConsumers;
this.discoveryGroupName = null;
this.maxHops = maxHops;
+ this.confirmationWindowSize = confirmationWindowSize;
}
public ClusterConnectionConfiguration(final String name,
@@ -71,6 +75,7 @@
final boolean duplicateDetection,
final boolean forwardWhenNoConsumers,
final int maxHops,
+ final int confirmationWindowSize,
final String discoveryGroupName)
{
this.name = name;
@@ -81,6 +86,7 @@
this.discoveryGroupName = discoveryGroupName;
this.staticConnectorNamePairs = null;
this.maxHops = maxHops;
+ this.confirmationWindowSize = confirmationWindowSize;
}
public String getName()
@@ -108,6 +114,11 @@
return maxHops;
}
+ public int getConfirmationWindowSize()
+ {
+ return confirmationWindowSize;
+ }
+
public List<Pair<String, String>> getStaticConnectorNamePairs()
{
return staticConnectorNamePairs;
Modified: trunk/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java 2009-11-10 17:25:45
UTC (rev 8255)
+++ trunk/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java 2009-11-10 20:03:11
UTC (rev 8256)
@@ -154,7 +154,7 @@
public static final int DEFAULT_CLUSTER_MAX_HOPS = 1;
- public static final int DEFAULT_CLUSTER_RETRY_INTERVAL = 500;
+ public static final long DEFAULT_CLUSTER_RETRY_INTERVAL = 500;
public static final boolean DEFAULT_DIVERT_EXCLUSIVE = false;
@@ -166,7 +166,7 @@
public static final int DEFAULT_MEMORY_WARNING_THRESHOLD = 25;
- public static final long DEFAULT_MEMORY_MEASURE_INTERVAL = 3000; // in milliseconds
+ public static final long DEFAULT_MEMORY_MEASURE_INTERVAL = -1; // in milliseconds
public static final int DEFAULT_BACKUP_WINDOW_SIZE = 1024 * 1024;
Modified: trunk/src/main/org/hornetq/core/config/impl/FileConfiguration.java
===================================================================
--- trunk/src/main/org/hornetq/core/config/impl/FileConfiguration.java 2009-11-10 17:25:45
UTC (rev 8255)
+++ trunk/src/main/org/hornetq/core/config/impl/FileConfiguration.java 2009-11-10 20:03:11
UTC (rev 8256)
@@ -36,7 +36,6 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
import org.hornetq.core.config.TransportConfiguration;
@@ -74,25 +73,28 @@
private static final String DEFAULT_CONFIGURATION_URL =
"hornetq-configuration.xml";
private static final String CONFIGURATION_SCHEMA_URL =
"schema/hornetq-configuration.xsd";
+
+ //For a bridge confirmations must be activated or send acknowledgements won't
return
+
+ public static final int DEFAULT_CONFIRMATION_WINDOW_SIZE = 1024 * 1024;
// Static --------------------------------------------------------------------------
-
+
// Attributes ----------------------------------------------------------------------
private String configurationUrl = DEFAULT_CONFIGURATION_URL;
-
private boolean started;
-
+
// Public -------------------------------------------------------------------------
public synchronized void start() throws Exception
- {
+ {
if (started)
{
return;
}
-
+
URL url = getClass().getClassLoader().getResource(configurationUrl);
log.debug("Loading server configuration from " + url);
@@ -105,20 +107,22 @@
clustered = getBoolean(e, "clustered", clustered);
backup = getBoolean(e, "backup", backup);
-
+
sharedStore = getBoolean(e, "shared-store", sharedStore);
-
- //Defaults to true when using FileConfiguration
+
+ // Defaults to true when using FileConfiguration
fileDeploymentEnabled = getBoolean(e, "file-deployment-enabled", true);
-
+
persistenceEnabled = getBoolean(e, "persistence-enabled",
persistenceEnabled);
- persistDeliveryCountBeforeDelivery = getBoolean(e,
"persist-delivery-count-before-delivery", persistDeliveryCountBeforeDelivery);
-
+ persistDeliveryCountBeforeDelivery = getBoolean(e,
+
"persist-delivery-count-before-delivery",
+
persistDeliveryCountBeforeDelivery);
+
// NOTE! All the defaults come from the super class
scheduledThreadPoolMaxSize = getInteger(e,
"scheduled-thread-pool-max-size", scheduledThreadPoolMaxSize, GT_ZERO);
-
+
threadPoolMaxSize = getInteger(e, "thread-pool-max-size",
threadPoolMaxSize, MINUS_ONE_OR_GT_ZERO);
securityEnabled = getBoolean(e, "security-enabled", securityEnabled);
@@ -130,35 +134,53 @@
securityInvalidationInterval = getLong(e,
"security-invalidation-interval", securityInvalidationInterval, GT_ZERO);
connectionTTLOverride = getLong(e, "connection-ttl-override",
connectionTTLOverride, MINUS_ONE_OR_GT_ZERO);
-
- asyncConnectionExecutionEnabled = getBoolean(e,
"async-connection-execution-enabled", asyncConnectionExecutionEnabled);
+ asyncConnectionExecutionEnabled = getBoolean(e,
+
"async-connection-execution-enabled",
+ asyncConnectionExecutionEnabled);
+
transactionTimeout = getLong(e, "transaction-timeout",
transactionTimeout, GT_ZERO);
- transactionTimeoutScanPeriod = getLong(e,
"transaction-timeout-scan-period", transactionTimeoutScanPeriod, GT_ZERO);
+ transactionTimeoutScanPeriod = getLong(e,
+
"transaction-timeout-scan-period",
+ transactionTimeoutScanPeriod,
+ GT_ZERO);
messageExpiryScanPeriod = getLong(e, "message-expiry-scan-period",
messageExpiryScanPeriod, GT_ZERO);
- messageExpiryThreadPriority = getInteger(e,
"message-expiry-thread-priority", messageExpiryThreadPriority,
THREAD_PRIORITY_RANGE);
+ messageExpiryThreadPriority = getInteger(e,
+
"message-expiry-thread-priority",
+ messageExpiryThreadPriority,
+ THREAD_PRIORITY_RANGE);
idCacheSize = getInteger(e, "id-cache-size", idCacheSize, GT_ZERO);
persistIDCache = getBoolean(e, "persist-id-cache", persistIDCache);
- managementAddress = new SimpleString(getString(e, "management-address",
managementAddress.toString(), NOT_NULL_OR_EMPTY));
+ managementAddress = new SimpleString(getString(e,
+ "management-address",
+ managementAddress.toString(),
+ NOT_NULL_OR_EMPTY));
managementNotificationAddress = new SimpleString(getString(e,
"management-notification-address",
-
managementNotificationAddress.toString(), NOT_NULL_OR_EMPTY));
+
managementNotificationAddress.toString(),
+ NOT_NULL_OR_EMPTY));
- managementClusterPassword = getString(e, "management-cluster-password",
managementClusterPassword, NOT_NULL_OR_EMPTY);
+ managementClusterPassword = getString(e,
+ "management-cluster-password",
+ managementClusterPassword,
+ NOT_NULL_OR_EMPTY);
managementClusterUser = getString(e, "management-cluster-user",
managementClusterUser, NOT_NULL_OR_EMPTY);
managementRequestTimeout = getLong(e, "management-request-timeout",
managementRequestTimeout, GT_ZERO);
-
- logDelegateFactoryClassName = getString(e,
"log-delegate-factory-class-name", logDelegateFactoryClassName,
NOT_NULL_OR_EMPTY);
+ logDelegateFactoryClassName = getString(e,
+
"log-delegate-factory-class-name",
+ logDelegateFactoryClassName,
+ NOT_NULL_OR_EMPTY);
+
NodeList interceptorNodes =
e.getElementsByTagName("remoting-interceptors");
ArrayList<String> interceptorList = new ArrayList<String>();
@@ -259,7 +281,7 @@
for (int i = 0; i < gaNodes.getLength(); i++)
{
- Element gaNode = (Element) gaNodes.item(i);
+ Element gaNode = (Element)gaNodes.item(i);
parseGroupingHandlerConfiguration(gaNode);
}
@@ -312,23 +334,23 @@
journalSyncNonTransactional = getBoolean(e,
"journal-sync-non-transactional", journalSyncNonTransactional);
journalFileSize = getInteger(e, "journal-file-size", journalFileSize,
GT_ZERO);
-
+
journalAIOFlushSync = getBoolean(e, "journal-aio-flush-on-sync",
DEFAULT_JOURNAL_AIO_FLUSH_SYNC);
-
+
journalAIOBufferTimeout = getInteger(e, "journal-aio-buffer-timeout",
DEFAULT_JOURNAL_AIO_BUFFER_TIMEOUT, GT_ZERO);
-
+
journalAIOBufferSize = getInteger(e, "journal-aio-buffer-size",
DEFAULT_JOURNAL_AIO_BUFFER_SIZE, GT_ZERO);
journalMinFiles = getInteger(e, "journal-min-files", journalMinFiles,
GT_ZERO);
-
+
journalCompactMinFiles = getInteger(e, "journal-compact-min-files",
journalCompactMinFiles, GE_ZERO);
journalCompactPercentage = getInteger(e, "journal-compact-percentage",
journalCompactPercentage, PERCENTAGE);
journalMaxAIO = getInteger(e, "journal-max-aio", journalMaxAIO,
GT_ZERO);
-
+
logJournalWriteRate = getBoolean(e, "log-journal-write-rate",
DEFAULT_JOURNAL_LOG_WRITE_RATE);
-
+
journalPerfBlastPages = getInteger(e, "perf-blast-pages",
DEFAULT_JOURNAL_PERF_BLAST_PAGES, MINUS_ONE_OR_GT_ZERO);
runSyncSpeedTest = getBoolean(e, "run-sync-speed-test",
runSyncSpeedTest);
@@ -339,23 +361,28 @@
messageCounterSamplePeriod = getLong(e, "message-counter-sample-period",
messageCounterSamplePeriod, GT_ZERO);
- messageCounterMaxDayHistory = getInteger(e,
"message-counter-max-day-history", messageCounterMaxDayHistory, GT_ZERO);
-
- serverDumpInterval = getLong(e, "server-dump-interval",
serverDumpInterval, MINUS_ONE_OR_GT_ZERO); // in milliseconds
+ messageCounterMaxDayHistory = getInteger(e,
+
"message-counter-max-day-history",
+ messageCounterMaxDayHistory,
+ GT_ZERO);
+ serverDumpInterval = getLong(e, "server-dump-interval",
serverDumpInterval, MINUS_ONE_OR_GT_ZERO); // in
+
// milliseconds
+
memoryWarningThreshold = getInteger(e, "memory-warning-threshold",
memoryWarningThreshold, PERCENTAGE);
-
- memoryMeasureInterval = getLong(e, "memory-measure-interval",
memoryMeasureInterval, MINUS_ONE_OR_GT_ZERO); // in milliseconds
-
+
+ memoryMeasureInterval = getLong(e, "memory-measure-interval",
memoryMeasureInterval, MINUS_ONE_OR_GT_ZERO); // in
+
// milliseconds
+
backupWindowSize = getInteger(e, "backup-window-size",
DEFAULT_BACKUP_WINDOW_SIZE, MINUS_ONE_OR_GT_ZERO);
-
+
started = true;
}
-
+
public synchronized void stop() throws Exception
{
super.stop();
-
+
started = false;
}
@@ -386,7 +413,7 @@
for (int i = 0; i < paramsNodes.getLength(); i++)
{
Node paramNode = paramsNodes.item(i);
- NamedNodeMap attributes =paramNode.getAttributes();
+ NamedNodeMap attributes = paramNode.getAttributes();
Node nkey = attributes.getNamedItem("key");
@@ -405,7 +432,7 @@
String name = e.getAttribute("name");
String localAddress = getString(e, "local-bind-address", null,
NO_CHECK);
-
+
int localBindPort = getInteger(e, "local-bind-port", -1,
MINUS_ONE_OR_GT_ZERO);
String groupAddress = getString(e, "group-address", null,
NOT_NULL_OR_EMPTY);
@@ -487,12 +514,16 @@
boolean duplicateDetection = getBoolean(e, "use-duplicate-detection",
DEFAULT_CLUSTER_DUPLICATE_DETECTION);
- boolean forwardWhenNoConsumers = getBoolean(e,
"forward-when-no-consumers", DEFAULT_CLUSTER_FORWARD_WHEN_NO_CONSUMERS);
+ boolean forwardWhenNoConsumers = getBoolean(e,
+ "forward-when-no-consumers",
+
DEFAULT_CLUSTER_FORWARD_WHEN_NO_CONSUMERS);
int maxHops = getInteger(e, "max-hops", DEFAULT_CLUSTER_MAX_HOPS,
GE_ZERO);
- long retryInterval = getLong(e, "retry-interval",
(long)DEFAULT_CLUSTER_RETRY_INTERVAL, GT_ZERO);
+ long retryInterval = getLong(e, "retry-interval",
DEFAULT_CLUSTER_RETRY_INTERVAL, GT_ZERO);
+ int confirmationWindowSize = getInteger(e, "confirmation-window-size",
DEFAULT_CONFIRMATION_WINDOW_SIZE, GT_ZERO);
+
String discoveryGroupName = null;
List<Pair<String, String>> connectorPairs = new
ArrayList<Pair<String, String>>();
@@ -531,21 +562,23 @@
if (discoveryGroupName == null)
{
config = new ClusterConnectionConfiguration(name,
- address,
- retryInterval,
+ address,
+ retryInterval,
duplicateDetection,
forwardWhenNoConsumers,
maxHops,
+ confirmationWindowSize,
connectorPairs);
}
else
{
config = new ClusterConnectionConfiguration(name,
address,
- retryInterval,
+ retryInterval,
duplicateDetection,
forwardWhenNoConsumers,
maxHops,
+ confirmationWindowSize,
discoveryGroupName);
}
@@ -556,15 +589,15 @@
{
String name = node.getAttribute("name");
String type = getString(node, "type", null, NOT_NULL_OR_EMPTY);
- String address = getString(node, "address",null, NOT_NULL_OR_EMPTY);
+ String address = getString(node, "address", null, NOT_NULL_OR_EMPTY);
Integer timeout = getInteger(node, "timeout",
GroupingHandlerConfiguration.DEFAULT_TIMEOUT, GT_ZERO);
groupingHandlerConfiguration = new GroupingHandlerConfiguration(new
SimpleString(name),
-
type.equals(GroupingHandlerConfiguration.TYPE.LOCAL.getType())?
GroupingHandlerConfiguration.TYPE.LOCAL: GroupingHandlerConfiguration.TYPE.REMOTE,
- new SimpleString(address),
- timeout);
+
type.equals(GroupingHandlerConfiguration.TYPE.LOCAL.getType()) ?
GroupingHandlerConfiguration.TYPE.LOCAL
+
: GroupingHandlerConfiguration.TYPE.REMOTE,
+ new
SimpleString(address),
+ timeout);
}
-
private void parseBridgeConfiguration(final Element brNode)
{
String name = brNode.getAttribute("name");
@@ -576,17 +609,28 @@
String transformerClassName = getString(brNode, "transformer-class-name",
null, NO_CHECK);
long retryInterval = getLong(brNode, "retry-interval",
DEFAULT_RETRY_INTERVAL, GT_ZERO);
-
- double retryIntervalMultiplier = getDouble(brNode,
"retry-interval-multiplier", DEFAULT_RETRY_INTERVAL_MULTIPLIER, GT_ZERO);
- int reconnectAttempts = getInteger(brNode, "reconnect-attempts",
DEFAULT_BRIDGE_RECONNECT_ATTEMPTS, MINUS_ONE_OR_GE_ZERO);
+ //Default bridge conf
+ int confirmationWindowSize = getInteger(brNode,
"confirmation-window-size", DEFAULT_CONFIRMATION_WINDOW_SIZE, GT_ZERO);
+
+ double retryIntervalMultiplier = getDouble(brNode,
+ "retry-interval-multiplier",
+ DEFAULT_RETRY_INTERVAL_MULTIPLIER,
+ GT_ZERO);
- boolean failoverOnServerShutdown = getBoolean(brNode,
"failover-on-server-shutdown",
ClientSessionFactoryImpl.DEFAULT_FAILOVER_ON_SERVER_SHUTDOWN);
+ int reconnectAttempts = getInteger(brNode,
+ "reconnect-attempts",
+ DEFAULT_BRIDGE_RECONNECT_ATTEMPTS,
+ MINUS_ONE_OR_GE_ZERO);
+ boolean failoverOnServerShutdown = getBoolean(brNode,
+
"failover-on-server-shutdown",
+
ClientSessionFactoryImpl.DEFAULT_FAILOVER_ON_SERVER_SHUTDOWN);
+
boolean useDuplicateDetection = getBoolean(brNode,
"use-duplicate-detection", DEFAULT_BRIDGE_DUPLICATE_DETECTION);
String filterString = null;
-
+
Pair<String, String> connectorPair = null;
String discoveryGroupName = null;
@@ -636,6 +680,7 @@
reconnectAttempts,
failoverOnServerShutdown,
useDuplicateDetection,
+ confirmationWindowSize,
connectorPair);
}
else
@@ -650,6 +695,7 @@
reconnectAttempts,
failoverOnServerShutdown,
useDuplicateDetection,
+ confirmationWindowSize,
discoveryGroupName);
}
Modified:
trunk/src/main/org/hornetq/core/persistence/impl/journal/FileLargeServerMessage.java
===================================================================
---
trunk/src/main/org/hornetq/core/persistence/impl/journal/FileLargeServerMessage.java 2009-11-10
17:25:45 UTC (rev 8255)
+++
trunk/src/main/org/hornetq/core/persistence/impl/journal/FileLargeServerMessage.java 2009-11-10
20:03:11 UTC (rev 8256)
@@ -15,19 +15,17 @@
import static org.hornetq.utils.DataConstants.SIZE_INT;
-import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicInteger;
import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.logging.Logger;
-import org.hornetq.core.paging.PagingStore;
+import org.hornetq.core.message.BodyEncoder;
import org.hornetq.core.remoting.spi.HornetQBuffer;
import org.hornetq.core.server.LargeServerMessage;
import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.ServerMessage;
-import org.hornetq.core.message.BodyEncoder;
import org.hornetq.core.server.impl.ServerMessageImpl;
/**
@@ -222,9 +220,9 @@
}
@Override
- public synchronized int decrementRefCount(PagingStore pagingStore, MessageReference
reference) throws Exception
+ public synchronized int decrementRefCount(MessageReference reference) throws
Exception
{
- int currentRefCount = super.decrementRefCount(pagingStore, reference);
+ int currentRefCount = super.decrementRefCount(reference);
// We use <= as this could be used by load.
// because of a failure, no references were loaded, so we have 0... and we still
need to delete the associated
Modified: trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-11-10
17:25:45 UTC (rev 8255)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-11-10
20:03:11 UTC (rev 8256)
@@ -245,7 +245,7 @@
{
throw new IllegalArgumentException("ID is null");
}
-
+
long id = props.getLongProperty(ManagementHelper.HDR_BINDING_ID);
SimpleString filterString =
props.getSimpleStringProperty(ManagementHelper.HDR_FILTERSTRING);
@@ -254,7 +254,7 @@
{
throw new IllegalArgumentException("Distance is null");
}
-
+
int distance = props.getIntProperty(ManagementHelper.HDR_DISTANCE);
QueueInfo info = new QueueInfo(routingName, clusterName, address,
filterString, id, distance);
@@ -271,7 +271,7 @@
{
throw new IllegalStateException("No cluster name");
}
-
+
SimpleString clusterName =
props.getSimpleStringProperty(ManagementHelper.HDR_CLUSTER_NAME);
QueueInfo info = queueInfos.remove(clusterName);
@@ -291,7 +291,7 @@
{
throw new IllegalStateException("No cluster name");
}
-
+
SimpleString clusterName =
props.getSimpleStringProperty(ManagementHelper.HDR_CLUSTER_NAME);
SimpleString filterString =
props.getSimpleStringProperty(ManagementHelper.HDR_FILTERSTRING);
@@ -606,7 +606,7 @@
cache.addToCache(duplicateIDBytes, context.getTransaction());
}
-
+
setPagingStore(message);
if (context.getTransaction() == null)
@@ -679,7 +679,7 @@
public MessageReference reroute(final ServerMessage message, final Queue queue, final
Transaction tx) throws Exception
{
setPagingStore(message);
-
+
MessageReference reference = message.createReference(queue);
if (message.containsProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME))
@@ -690,10 +690,8 @@
message.incrementDurableRefCount();
- PagingStore store = pagingManager.getPageStore(message.getDestination());
+ message.incrementRefCount(reference);
- message.incrementRefCount(store, reference);
-
if (tx == null)
{
queue.addLast(reference);
@@ -843,7 +841,7 @@
{
PagingStore store = pagingManager.getPageStore(message.getDestination());
- message.setPagingStore(store);
+ message.setPagingStore(store);
}
private void routeDirect(final ServerMessage message, final Queue queue, final boolean
applyFilters) throws Exception
@@ -864,8 +862,6 @@
Transaction tx = context.getTransaction();
- PagingStore store = pagingManager.getPageStore(message.getDestination());
-
for (Queue queue : context.getQueues())
{
MessageReference reference = message.createReference(queue);
@@ -918,7 +914,7 @@
}
}
- message.incrementRefCount(store, reference);
+ message.incrementRefCount(reference);
}
if (tx != null)
@@ -1255,9 +1251,7 @@
message.decrementDurableRefCount();
}
- PagingStore store = pagingManager.getPageStore(message.getDestination());
-
- message.decrementRefCount(store, ref);
+ message.decrementRefCount(ref);
}
}
}
Modified: trunk/src/main/org/hornetq/core/server/ServerMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/ServerMessage.java 2009-11-10 17:25:45 UTC (rev
8255)
+++ trunk/src/main/org/hornetq/core/server/ServerMessage.java 2009-11-10 20:03:11 UTC (rev
8256)
@@ -31,18 +31,16 @@
MessageReference createReference(Queue queue);
- int incrementRefCount(PagingStore pagingStore, MessageReference reference)
- throws Exception;
+ int incrementRefCount(MessageReference reference) throws Exception;
- int decrementRefCount(PagingStore pagingStore, MessageReference reference)
- throws Exception;
-
+ int decrementRefCount(MessageReference reference) throws Exception;
+
int incrementDurableRefCount();
int decrementDurableRefCount();
ServerMessage copy(long newID) throws Exception;
-
+
ServerMessage copy() throws Exception;
int getMemoryEstimate();
@@ -50,16 +48,16 @@
int getRefCount();
ServerMessage makeCopyForExpiryOrDLA(long newID, boolean expiry) throws Exception;
-
- void setOriginalHeaders(ServerMessage other, boolean expiry);
-
+
+ void setOriginalHeaders(ServerMessage other, boolean expiry);
+
void setPagingStore(PagingStore store);
-
+
PagingStore getPagingStore();
-
+
boolean page(boolean duplicateDetection) throws Exception;
-
+
boolean page(long transactionID, boolean duplicateDetection) throws Exception;
-
+
boolean storeIsPaging();
}
Modified: trunk/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2009-11-10
17:25:45 UTC (rev 8255)
+++ trunk/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2009-11-10
20:03:11 UTC (rev 8256)
@@ -117,6 +117,8 @@
private final boolean failoverOnServerShutdown;
+ private final int confirmationWindowSize;
+
private final SimpleString idsHeaderName;
private MessageFlowRecord flowRecord;
@@ -133,6 +135,8 @@
private NotificationService notificationService;
+ private ClientConsumer notifConsumer;
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -160,6 +164,7 @@
final int reconnectAttempts,
final boolean failoverOnServerShutdown,
final boolean useDuplicateDetection,
+ final int confirmationWindowSize,
final SimpleString managementAddress,
final SimpleString managementNotificationAddress,
final String clusterUser,
@@ -184,6 +189,13 @@
this.useDuplicateDetection = useDuplicateDetection;
+ if (!(confirmationWindowSize > 0))
+ {
+ throw new IllegalStateException("confirmation-window-size must be > 0
for a bridge");
+ }
+
+ this.confirmationWindowSize = confirmationWindowSize;
+
this.discoveryAddress = discoveryAddress;
this.discoveryPort = discoveryPort;
@@ -253,9 +265,11 @@
}
Queue queue = null;
+
for (MessageReference ref2 : list)
{
queue = ref2.getQueue();
+
queue.cancel(ref2);
}
@@ -381,7 +395,7 @@
{
return HandleStatus.NO_MATCH;
}
-
+
synchronized (this)
{
if (!active)
@@ -521,15 +535,15 @@
{
active = false;
}
-
+
cancelRefs();
}
else
{
setupNotificationConsumer();
-
+
active = true;
-
+
if (queue != null)
{
queue.deliverAsync(executor);
@@ -543,8 +557,6 @@
}
}
- private ClientConsumer notifConsumer;
-
// TODO - we should move this code to the ClusterConnectorImpl - and just execute it
when the bridge
// connection is opened and closed - we can use
// a callback to tell us that
@@ -552,7 +564,6 @@
{
if (flowRecord != null)
{
-
if (notifConsumer != null)
{
try
@@ -600,7 +611,7 @@
"%')");
session.createQueue(managementNotificationAddress, notifQueueName, filter,
false);
-
+
notifConsumer = session.createConsumer(notifQueueName);
notifConsumer.setMessageHandler(flowRecord);
@@ -645,6 +656,10 @@
csf.setReconnectAttempts(reconnectAttempts);
csf.setBlockOnPersistentSend(false);
+ // Must have confirmations enabled so we get send acks
+
+ csf.setConfirmationWindowSize(confirmationWindowSize);
+
// Session is pre-acknowledge
session = (ClientSessionInternal)csf.createSession(clusterUser, clusterPassword,
false, true, true, true, 1);
@@ -732,41 +747,4 @@
}
}
}
-
- // private class FailRunnable implements Runnable
- // {
- // public void run()
- // {
- // synchronized (BridgeImpl.this)
- // {
- // if (!started)
- // {
- // return;
- // }
- //
- // active = false;
- // }
- //
- // try
- // {
- // queue.removeConsumer(BridgeImpl.this);
- //
- // session.cleanUp();
- //
- // cancelRefs();
- //
- // csf.close();
- // }
- // catch (Exception e)
- // {
- // log.error("Failed to stop", e);
- // }
- //
- // if (!createObjects())
- // {
- // started = false;
- // }
- // }
- // }
-
}
Modified: trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
---
trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2009-11-10
17:25:45 UTC (rev 8255)
+++
trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2009-11-10
20:03:11 UTC (rev 8256)
@@ -81,6 +81,8 @@
private final long retryInterval;
private final boolean useDuplicateDetection;
+
+ private final int confirmationWindowSize;
private final boolean routeWhenNoConsumers;
@@ -108,6 +110,7 @@
final long retryInterval,
final boolean useDuplicateDetection,
final boolean routeWhenNoConsumers,
+ final int confirmationWindowSize,
final org.hornetq.utils.ExecutorFactory executorFactory,
final HornetQServer server,
final PostOffice postOffice,
@@ -127,6 +130,8 @@
this.useDuplicateDetection = useDuplicateDetection;
this.routeWhenNoConsumers = routeWhenNoConsumers;
+
+ this.confirmationWindowSize = confirmationWindowSize;
this.executorFactory = executorFactory;
@@ -167,6 +172,7 @@
final long retryInterval,
final boolean useDuplicateDetection,
final boolean routeWhenNoConsumers,
+ final int confirmationWindowSize,
final ExecutorFactory executorFactory,
final HornetQServer server,
final PostOffice postOffice,
@@ -198,6 +204,8 @@
this.useDuplicateDetection = useDuplicateDetection;
this.routeWhenNoConsumers = routeWhenNoConsumers;
+
+ this.confirmationWindowSize = confirmationWindowSize;
this.maxHops = maxHops;
@@ -221,12 +229,14 @@
}
started = true;
-
+
if (managementService != null)
{
TypedProperties props = new TypedProperties();
props.putSimpleStringProperty(new SimpleString("name"), name);
- Notification notification = new Notification(nodeUUID.toString(),
NotificationType.CLUSTER_CONNECTION_STARTED, props);
+ Notification notification = new Notification(nodeUUID.toString(),
+
NotificationType.CLUSTER_CONNECTION_STARTED,
+ props);
managementService.sendNotification(notification);
}
}
@@ -258,10 +268,12 @@
{
TypedProperties props = new TypedProperties();
props.putSimpleStringProperty(new SimpleString("name"), name);
- Notification notification = new Notification(nodeUUID.toString(),
NotificationType.CLUSTER_CONNECTION_STOPPED, props);
+ Notification notification = new Notification(nodeUUID.toString(),
+
NotificationType.CLUSTER_CONNECTION_STOPPED,
+ props);
managementService.sendNotification(notification);
}
-
+
started = false;
}
@@ -274,7 +286,7 @@
{
return name;
}
-
+
public String getNodeID()
{
return nodeUUID.toString();
@@ -283,7 +295,7 @@
public synchronized Map<String, String> getNodes()
{
Map<String, String> nodes = new HashMap<String, String>();
- for (Entry<String, MessageFlowRecord> record : records.entrySet( ))
+ for (Entry<String, MessageFlowRecord> record : records.entrySet())
{
if (record.getValue().getBridge().getForwardingConnection() != null)
{
@@ -292,7 +304,7 @@
}
return nodes;
}
-
+
public synchronized void activate()
{
if (!started)
@@ -428,6 +440,7 @@
-1,
true,
useDuplicateDetection,
+ confirmationWindowSize,
managementService.getManagementAddress(),
managementService.getManagementNotificationAddress(),
managementService.getClusterUser(),
@@ -484,7 +497,7 @@
{
this.bridge = bridge;
}
-
+
public Bridge getBridge()
{
return bridge;
@@ -570,7 +583,7 @@
{
throw new IllegalStateException("proposal type is null");
}
-
+
SimpleString type =
message.getSimpleStringProperty(ManagementHelper.HDR_PROPOSAL_GROUP_ID);
SimpleString val =
message.getSimpleStringProperty(ManagementHelper.HDR_PROPOSAL_VALUE);
@@ -579,7 +592,7 @@
Response response = server.getGroupingHandler().receive(new Proposal(type, val),
hops + 1);
- if(response != null)
+ if (response != null)
{
server.getGroupingHandler().send(response, 0);
}
@@ -634,7 +647,7 @@
{
throw new IllegalStateException("routingName is null");
}
-
+
if (!message.containsProperty(ManagementHelper.HDR_BINDING_ID))
{
throw new IllegalStateException("queueID is null");
@@ -697,7 +710,7 @@
{
throw new IllegalStateException("clusterName is null");
}
-
+
SimpleString clusterName =
message.getSimpleStringProperty(ManagementHelper.HDR_CLUSTER_NAME);
removeBinding(clusterName);
@@ -756,7 +769,7 @@
{
throw new IllegalStateException("distance is null");
}
-
+
if (!message.containsProperty(ManagementHelper.HDR_CLUSTER_NAME))
{
throw new IllegalStateException("clusterName is null");
@@ -782,7 +795,7 @@
// Need to propagate the consumer close
Notification notification = new Notification(null, CONSUMER_CLOSED,
message.getProperties());
- managementService.sendNotification(notification);
+ managementService.sendNotification(notification);
}
}
@@ -830,7 +843,7 @@
theBindings.setRouteWhenNoConsumers(routeWhenNoConsumers);
}
- //for testing only
+ // for testing only
public Map<String, MessageFlowRecord> getRecords()
{
return records;
Modified: trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2009-11-10
17:25:45 UTC (rev 8255)
+++ trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2009-11-10
20:03:11 UTC (rev 8256)
@@ -37,7 +37,6 @@
import org.hornetq.core.management.ManagementService;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.PostOffice;
-import org.hornetq.core.remoting.Channel;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.cluster.Bridge;
@@ -71,7 +70,7 @@
private final Map<String, ClusterConnection> clusters = new HashMap<String,
ClusterConnection>();
private final org.hornetq.utils.ExecutorFactory executorFactory;
-
+
private final HornetQServer server;
private final PostOffice postOffice;
@@ -83,9 +82,9 @@
private final Configuration configuration;
private final UUID nodeUUID;
-
+
private volatile boolean started;
-
+
private boolean backup;
public ClusterManagerImpl(final org.hornetq.utils.ExecutorFactory executorFactory,
@@ -94,16 +93,16 @@
final ScheduledExecutorService scheduledExecutor,
final ManagementService managementService,
final Configuration configuration,
- final UUID nodeUUID,
+ final UUID nodeUUID,
final boolean backup)
{
if (nodeUUID == null)
{
throw new IllegalArgumentException("Node uuid is null");
}
-
+
this.executorFactory = executorFactory;
-
+
this.server = server;
this.postOffice = postOffice;
@@ -115,7 +114,7 @@
this.configuration = configuration;
this.nodeUUID = nodeUUID;
-
+
this.backup = backup;
}
@@ -203,34 +202,34 @@
{
return new HashSet<ClusterConnection>(clusters.values());
}
-
+
public Set<BroadcastGroup> getBroadcastGroups()
{
return new HashSet<BroadcastGroup>(broadcastGroups.values());
}
-
+
public ClusterConnection getClusterConnection(final SimpleString name)
{
- return clusters.get(name.toString());
+ return clusters.get(name.toString());
}
-
+
public synchronized void activate()
- {
- for (BroadcastGroup bg: broadcastGroups.values())
+ {
+ for (BroadcastGroup bg : broadcastGroups.values())
{
bg.activate();
}
-
- for (Bridge bridge: bridges.values())
+
+ for (Bridge bridge : bridges.values())
{
bridge.activate();
}
-
- for (ClusterConnection cc: clusters.values())
+
+ for (ClusterConnection cc : clusters.values())
{
cc.activate();
}
-
+
backup = false;
}
@@ -385,14 +384,16 @@
if (config.getDiscoveryGroupName() != null)
{
- DiscoveryGroupConfiguration discoveryGroupConfiguration =
configuration.getDiscoveryGroupConfigurations().get(config.getDiscoveryGroupName());
+ DiscoveryGroupConfiguration discoveryGroupConfiguration =
configuration.getDiscoveryGroupConfigurations()
+
.get(config.getDiscoveryGroupName());
if (discoveryGroupConfiguration == null)
{
- log.warn("No discovery group configured with name '" +
config.getDiscoveryGroupName() + "'. The bridge will not be deployed.");
+ log.warn("No discovery group configured with name '" +
config.getDiscoveryGroupName() +
+ "'. The bridge will not be deployed.");
return;
}
-
+
bridge = new BridgeImpl(nodeUUID,
new SimpleString(config.getName()),
queue,
@@ -409,11 +410,12 @@
config.getReconnectAttempts(),
config.isFailoverOnServerShutdown(),
config.isUseDuplicateDetection(),
+ config.getConfirmationWindowSize(),
managementService.getManagementAddress(),
managementService.getManagementNotificationAddress(),
managementService.getClusterUser(),
managementService.getClusterPassword(),
- null,
+ null,
!backup,
server.getStorageManager());
}
@@ -461,11 +463,12 @@
config.getReconnectAttempts(),
config.isFailoverOnServerShutdown(),
config.isUseDuplicateDetection(),
+ config.getConfirmationWindowSize(),
managementService.getManagementAddress(),
managementService.getManagementNotificationAddress(),
managementService.getClusterUser(),
managementService.getClusterPassword(),
- null,
+ null,
!backup,
server.getStorageManager());
}
@@ -534,17 +537,18 @@
clusterConnection = new ClusterConnectionImpl(new
SimpleString(config.getName()),
new
SimpleString(config.getAddress()),
- config.getRetryInterval(),
+ config.getRetryInterval(),
config.isDuplicateDetection(),
config.isForwardWhenNoConsumers(),
+
config.getConfirmationWindowSize(),
executorFactory,
- server,
+ server,
postOffice,
managementService,
- scheduledExecutor,
+ scheduledExecutor,
connectors,
config.getMaxHops(),
- nodeUUID,
+ nodeUUID,
backup);
}
else
@@ -559,17 +563,18 @@
clusterConnection = new ClusterConnectionImpl(new
SimpleString(config.getName()),
new
SimpleString(config.getAddress()),
- config.getRetryInterval(),
+ config.getRetryInterval(),
config.isDuplicateDetection(),
config.isForwardWhenNoConsumers(),
+
config.getConfirmationWindowSize(),
executorFactory,
- server,
+ server,
postOffice,
managementService,
- scheduledExecutor,
+ scheduledExecutor,
dg,
config.getMaxHops(),
- nodeUUID,
+ nodeUUID,
backup);
}
Modified: trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2009-11-10 17:25:45 UTC
(rev 8255)
+++ trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2009-11-10 20:03:11 UTC
(rev 8256)
@@ -36,8 +36,6 @@
import org.hornetq.core.list.impl.PriorityLinkedListImpl;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.message.impl.MessageImpl;
-import org.hornetq.core.paging.PagingManager;
-import org.hornetq.core.paging.PagingStore;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.postoffice.Bindings;
import org.hornetq.core.postoffice.PostOffice;
@@ -51,7 +49,6 @@
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.cluster.impl.Redistributor;
import org.hornetq.core.settings.HierarchicalRepository;
-import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.core.transaction.Transaction;
import org.hornetq.core.transaction.TransactionOperation;
@@ -111,12 +108,8 @@
private final Runnable deliverRunner = new DeliverRunner();
- private final PagingManager pagingManager;
-
private final Semaphore lock = new Semaphore(1);
- private volatile PagingStore pagingStore;
-
private final StorageManager storageManager;
private final HierarchicalRepository<AddressSettings>
addressSettingsRepository;
@@ -172,15 +165,6 @@
this.scheduledExecutor = scheduledExecutor;
- if (postOffice == null)
- {
- pagingManager = null;
- }
- else
- {
- pagingManager = postOffice.getPagingManager();
- }
-
direct = true;
scheduledDeliveryHandler = new ScheduledDeliveryHandlerImpl(scheduledExecutor);
@@ -1091,12 +1075,14 @@
if (reference == null)
{
nullReferences.add(consumer);
+
if (nullReferences.size() + busyConsumers.size() == totalConsumers)
{
- startDepaging();
// We delivered all the messages - go into direct delivery
direct = true;
+
promptDelivery = false;
+
return;
}
@@ -1133,8 +1119,6 @@
}
}
- initPagingStore(reference.getMessage().getDestination());
-
final SimpleString groupID =
reference.getMessage().getSimpleStringProperty(MessageImpl.HDR_GROUP_ID);
if (groupID != null)
@@ -1407,19 +1391,7 @@
queue.deliveringCount.decrementAndGet();
- PagingStore store;
- if (pagingManager != null)
- {
- // TODO: We could optimize this by storing the paging-store for the address on
the Queue. We would need to know
- // the Address for the Queue
- store = pagingManager.getPageStore(ref.getMessage().getDestination());
- }
- else
- {
- store = null;
- }
-
- message.decrementRefCount(store, ref);
+ message.decrementRefCount(ref);
}
void postRollback(final LinkedList<MessageReference> refs) throws Exception
@@ -1437,42 +1409,6 @@
}
}
- private synchronized void initPagingStore(final SimpleString destination)
- {
- // PagingManager would be null only on testcases
- if (pagingStore == null && pagingManager != null)
- {
- // TODO: It would be better if we could initialize the pagingStore during the
construction
- try
- {
- pagingStore = pagingManager.getPageStore(destination);
- }
- catch (Exception e)
- {
- // This shouldn't happen, and if it happens, this shouldn't abort the
route
- }
- }
- }
-
- private synchronized void startDepaging()
- {
- if (pagingStore != null)
- {
- // If the queue is empty, we need to check if there are pending messages, and
throw a warning
- if (pagingStore.isPaging() && pagingStore.getAddressFullMessagePolicy()
== AddressFullMessagePolicy.PAGE)
- {
- // This is just a *request* to depage. Depage will only happens if there is
space on the Address
- // and GlobalSize
- pagingStore.startDepaging();
-
- log.warn("The Queue " + name +
- " is empty, however there are pending messages on Paging for
the address " +
- pagingStore.getStoreName() +
- " waiting message ACK before they could be routed");
- }
- }
- }
-
// Inner classes
// --------------------------------------------------------------------------
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java 2009-11-10 17:25:45
UTC (rev 8255)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java 2009-11-10 20:03:11
UTC (rev 8256)
@@ -102,7 +102,7 @@
return ref;
}
- public int incrementRefCount(final PagingStore pagingStore, final MessageReference
reference) throws Exception
+ public int incrementRefCount(final MessageReference reference) throws Exception
{
int count = refCount.incrementAndGet();
@@ -119,7 +119,7 @@
return count;
}
- public int decrementRefCount(final PagingStore pagingStore, final MessageReference
reference) throws Exception
+ public int decrementRefCount(final MessageReference reference) throws Exception
{
int count = refCount.decrementAndGet();
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-11-10 17:25:45
UTC (rev 8255)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-11-10 20:03:11
UTC (rev 8256)
@@ -387,7 +387,7 @@
if (binding == null || binding.getType() != BindingType.LOCAL_QUEUE)
{
- throw new HornetQException(HornetQException.QUEUE_DOES_NOT_EXIST,
"Binding " + name + " does not exist");
+ throw new HornetQException(HornetQException.QUEUE_DOES_NOT_EXIST, "Queue
" + name + " does not exist");
}
securityStore.check(binding.getAddress(), CheckType.CONSUME, this);
Modified: trunk/src/main/org/hornetq/utils/XMLUtil.java
===================================================================
--- trunk/src/main/org/hornetq/utils/XMLUtil.java 2009-11-10 17:25:45 UTC (rev 8255)
+++ trunk/src/main/org/hornetq/utils/XMLUtil.java 2009-11-10 20:03:11 UTC (rev 8256)
@@ -493,6 +493,8 @@
}
catch (SAXException e)
{
+ log.error("Invalid configuration", e);
+
throw new IllegalStateException("Invalid configuration", e);
}
}
Modified:
trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java 2009-11-10
17:25:45 UTC (rev 8255)
+++
trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java 2009-11-10
20:03:11 UTC (rev 8256)
@@ -93,6 +93,7 @@
final long retryInterval = 50;
final double retryIntervalMultiplier = 1d;
final int reconnectAttempts = 1;
+ final int confirmationWindowSize = 1024;
Pair<String, String> connectorPair = new Pair<String,
String>(server1tc.getName(), server2tc.getName());
@@ -106,6 +107,7 @@
reconnectAttempts,
true,
false,
+
confirmationWindowSize,
connectorPair);
List<BridgeConfiguration> bridgeConfigs = new
ArrayList<BridgeConfiguration>();
@@ -219,6 +221,7 @@
final long retryInterval = 50;
final double retryIntervalMultiplier = 1d;
final int reconnectAttempts = 3;
+ final int confirmationWindowSize = 1024;
Pair<String, String> connectorPair = new Pair<String,
String>(server1tc.getName(), server2tc.getName());
@@ -232,6 +235,7 @@
reconnectAttempts,
true,
false,
+
confirmationWindowSize,
connectorPair);
List<BridgeConfiguration> bridgeConfigs = new
ArrayList<BridgeConfiguration>();
@@ -338,6 +342,7 @@
final long retryInterval = 50;
final double retryIntervalMultiplier = 1d;
final int reconnectAttempts = 3;
+ final int confirmationWindowSize = 1024;
Pair<String, String> connectorPair = new Pair<String,
String>(server1tc.getName(), null);
@@ -351,6 +356,7 @@
reconnectAttempts,
true,
false,
+
confirmationWindowSize,
connectorPair);
List<BridgeConfiguration> bridgeConfigs = new
ArrayList<BridgeConfiguration>();
@@ -452,6 +458,7 @@
final long retryInterval = 50;
final double retryIntervalMultiplier = 1d;
final int reconnectAttempts = -1;
+ final int confirmationWindowSize = 1024;
Pair<String, String> connectorPair = new Pair<String,
String>(server1tc.getName(), null);
@@ -465,6 +472,7 @@
reconnectAttempts,
true,
false,
+
confirmationWindowSize,
connectorPair);
List<BridgeConfiguration> bridgeConfigs = new
ArrayList<BridgeConfiguration>();
@@ -513,7 +521,7 @@
prod0.send(message);
}
-
+
log.info("sent messages");
for (int i = 0; i < numMessages; i++)
@@ -523,7 +531,7 @@
assertEquals(i, r1.getObjectProperty(propKey));
log.info("got message " + r1.getObjectProperty(propKey));
}
-
+
log.info("got messages");
session0.close();
@@ -566,6 +574,7 @@
final long retryInterval = 50;
final double retryIntervalMultiplier = 1d;
final int reconnectAttempts = 3;
+ final int confirmationWindowSize = 1024;
Pair<String, String> connectorPair = new Pair<String,
String>(server1tc.getName(), null);
@@ -579,6 +588,7 @@
reconnectAttempts,
true,
false,
+
confirmationWindowSize,
connectorPair);
List<BridgeConfiguration> bridgeConfigs = new
ArrayList<BridgeConfiguration>();
Modified:
trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeStartTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeStartTest.java 2009-11-10
17:25:45 UTC (rev 8255)
+++
trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeStartTest.java 2009-11-10
20:03:11 UTC (rev 8256)
@@ -86,6 +86,7 @@
0,
true,
true,
+ 1024,
connectorPair);
List<BridgeConfiguration> bridgeConfigs = new
ArrayList<BridgeConfiguration>();
@@ -229,6 +230,7 @@
-1,
true,
true,
+ 1024,
connectorPair);
List<BridgeConfiguration> bridgeConfigs = new
ArrayList<BridgeConfiguration>();
@@ -419,6 +421,7 @@
0,
false,
false,
+ 1024,
connectorPair);
List<BridgeConfiguration> bridgeConfigs = new
ArrayList<BridgeConfiguration>();
@@ -547,6 +550,7 @@
1,
false,
true,
+ 1024,
connectorPair);
List<BridgeConfiguration> bridgeConfigs = new
ArrayList<BridgeConfiguration>();
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java 2009-11-10
17:25:45 UTC (rev 8255)
+++
trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java 2009-11-10
20:03:11 UTC (rev 8256)
@@ -76,7 +76,6 @@
try
{
-
Map<String, Object> server0Params = new HashMap<String, Object>();
server0 = createClusteredServerWithParams(0, useFiles, server0Params);
@@ -101,6 +100,10 @@
Pair<String, String> connectorPair = new Pair<String,
String>(server1tc.getName(), null);
+ final int messageSize = 1024;
+
+ final int numMessages = 10;
+
BridgeConfiguration bridgeConfiguration = new
BridgeConfiguration("bridge1",
queueName0,
forwardAddress,
@@ -111,6 +114,9 @@
-1,
true,
false,
+ // Choose
confirmation size to make sure acks
+ // are sent
+ numMessages *
messageSize / 2,
connectorPair);
List<BridgeConfiguration> bridgeConfigs = new
ArrayList<BridgeConfiguration>();
@@ -144,7 +150,7 @@
session1.start();
- final int numMessages = 10;
+ final byte[] bytes = new byte[messageSize];
final SimpleString propKey = new SimpleString("testkey");
@@ -159,6 +165,8 @@
message.putIntProperty(propKey, i);
+ message.getBody().writeBytes(bytes);
+
producer0.send(message);
}
@@ -286,6 +294,7 @@
-1,
true,
false,
+ 1024,
connectorPair);
List<BridgeConfiguration> bridgeConfigs = new
ArrayList<BridgeConfiguration>();
@@ -332,7 +341,7 @@
message.putIntProperty(propKey, i);
message.putStringProperty(selectorKey, new
SimpleString("monkey"));
-
+
if (largeMessage)
{
message.setBodyInputStream(createFakeLargeStream(1024 * 1024));
@@ -368,7 +377,7 @@
assertEquals((Integer)i, (Integer)message.getObjectProperty(propKey));
message.acknowledge();
-
+
if (largeMessage)
{
readMessages(message);
@@ -407,7 +416,7 @@
}
}
-
+
public void testWithTransformer() throws Exception
{
internaltestWithTransformer(false);
@@ -453,6 +462,7 @@
-1,
true,
false,
+ 1024,
connectorPair);
List<BridgeConfiguration> bridgeConfigs = new
ArrayList<BridgeConfiguration>();
@@ -497,7 +507,7 @@
message.putStringProperty(propKey, new SimpleString("bing"));
message.getBody().writeString("doo be doo be doo be doo");
-
+
producer0.send(message);
}
@@ -516,8 +526,7 @@
assertEquals("dee be dee be dee be dee", sval);
message.acknowledge();
-
-
+
}
assertNull(consumer1.receiveImmediate());
@@ -534,10 +543,8 @@
server1.stop();
}
-
- //
https://jira.jboss.org/jira/browse/HORNETQ-182
- public void disabled_testBridgeWithPaging() throws Exception
+ public void testBridgeWithPaging() throws Exception
{
HornetQServer server0 = null;
HornetQServer server1 = null;
@@ -583,6 +590,7 @@
-1,
true,
false,
+ 1024,
connectorPair);
List<BridgeConfiguration> bridgeConfigs = new
ArrayList<BridgeConfiguration>();
@@ -623,7 +631,7 @@
for (int i = 0; i < numMessages; i++)
{
ClientMessage message = session0.createClientMessage(false);
-
+
message.setBody(ChannelBuffers.wrappedBuffer(new byte[1024]));
message.putIntProperty(propKey, i);
@@ -674,13 +682,12 @@
}
-
protected void setUp() throws Exception
{
super.setUp();
clearData();
}
-
+
protected void tearDown() throws Exception
{
clearData();
Modified:
trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeWithDiscoveryGroupStartTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeWithDiscoveryGroupStartTest.java 2009-11-10
17:25:45 UTC (rev 8255)
+++
trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeWithDiscoveryGroupStartTest.java 2009-11-10
20:03:11 UTC (rev 8256)
@@ -99,6 +99,7 @@
0,
true,
true,
+ 1024,
"dg1");
List<BridgeConfiguration> bridgeConfigs = new
ArrayList<BridgeConfiguration>();
Modified:
trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2009-11-10
17:25:45 UTC (rev 8255)
+++
trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2009-11-10
20:03:11 UTC (rev 8256)
@@ -1407,6 +1407,7 @@
true,
forwardWhenNoConsumers,
maxHops,
+
1024,
pairs);
serverFrom.getConfiguration().getClusterConfigurations().add(clusterConf);
}
@@ -1458,6 +1459,7 @@
true,
forwardWhenNoConsumers,
maxHops,
+
1024,
pairs);
serverFrom.getConfiguration().getClusterConfigurations().add(clusterConf);
@@ -1526,6 +1528,7 @@
true,
forwardWhenNoConsumers,
maxHops,
+
1024,
pairs);
serverFrom.getConfiguration().getClusterConfigurations().add(clusterConf);
@@ -1552,6 +1555,7 @@
true,
forwardWhenNoConsumers,
maxHops,
+
1024,
discoveryGroupName);
List<ClusterConnectionConfiguration> clusterConfs =
server.getConfiguration().getClusterConfigurations();
Modified: trunk/tests/src/org/hornetq/tests/integration/management/BridgeControlTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/integration/management/BridgeControlTest.java 2009-11-10
17:25:45 UTC (rev 8255)
+++
trunk/tests/src/org/hornetq/tests/integration/management/BridgeControlTest.java 2009-11-10
20:03:11 UTC (rev 8256)
@@ -162,6 +162,7 @@
randomPositiveInt(),
randomBoolean(),
randomBoolean(),
+ randomPositiveInt(),
connectorPair);
Configuration conf_1 = new ConfigurationImpl();
Modified:
trunk/tests/src/org/hornetq/tests/integration/management/BridgeControlUsingCoreTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/integration/management/BridgeControlUsingCoreTest.java 2009-11-10
17:25:45 UTC (rev 8255)
+++
trunk/tests/src/org/hornetq/tests/integration/management/BridgeControlUsingCoreTest.java 2009-11-10
20:03:11 UTC (rev 8256)
@@ -61,7 +61,7 @@
private BridgeConfiguration bridgeConfig;
private HornetQServer server_1;
-
+
private ClientSession session;
// Constructors --------------------------------------------------
@@ -79,10 +79,14 @@
assertEquals(bridgeConfig.getForwardingAddress(),
(String)proxy.retrieveAttributeValue("forwardingAddress"));
assertEquals(bridgeConfig.getFilterString(),
(String)proxy.retrieveAttributeValue("filterString"));
assertEquals(bridgeConfig.getRetryInterval(),
((Long)proxy.retrieveAttributeValue("retryInterval")).longValue());
- assertEquals(bridgeConfig.getRetryIntervalMultiplier(),
(Double)proxy.retrieveAttributeValue("retryIntervalMultiplier"));
- assertEquals(bridgeConfig.getReconnectAttempts(),
((Integer)proxy.retrieveAttributeValue("reconnectAttempts")).intValue());
- assertEquals(bridgeConfig.isFailoverOnServerShutdown(),
((Boolean)proxy.retrieveAttributeValue("failoverOnServerShutdown")).booleanValue());
- assertEquals(bridgeConfig.isUseDuplicateDetection(),
((Boolean)proxy.retrieveAttributeValue("useDuplicateDetection")).booleanValue());
+ assertEquals(bridgeConfig.getRetryIntervalMultiplier(),
+
(Double)proxy.retrieveAttributeValue("retryIntervalMultiplier"));
+ assertEquals(bridgeConfig.getReconnectAttempts(),
+
((Integer)proxy.retrieveAttributeValue("reconnectAttempts")).intValue());
+ assertEquals(bridgeConfig.isFailoverOnServerShutdown(),
+
((Boolean)proxy.retrieveAttributeValue("failoverOnServerShutdown")).booleanValue());
+ assertEquals(bridgeConfig.isUseDuplicateDetection(),
+
((Boolean)proxy.retrieveAttributeValue("useDuplicateDetection")).booleanValue());
Object[] data = (Object[])proxy.retrieveAttributeValue("connectorPair");
assertEquals(bridgeConfig.getConnectorPair().a, data[0]);
@@ -99,10 +103,10 @@
// started by the server
assertTrue((Boolean)proxy.retrieveAttributeValue("Started"));
- proxy.invokeOperation("stop");
+ proxy.invokeOperation("stop");
assertFalse((Boolean)proxy.retrieveAttributeValue("Started"));
- proxy.invokeOperation("start");
+ proxy.invokeOperation("start");
assertTrue((Boolean)proxy.retrieveAttributeValue("Started"));
}
@@ -138,6 +142,7 @@
randomPositiveInt(),
randomBoolean(),
randomBoolean(),
+ randomPositiveInt(),
connectorPair);
Configuration conf_1 = new ConfigurationImpl();
@@ -175,20 +180,18 @@
server_1.stop();
session = null;
-
+
server_0 = null;
-
+
server_1 = null;
-
-
+
super.tearDown();
}
-
+
protected CoreMessagingProxy createProxy(final String name) throws Exception
{
- CoreMessagingProxy proxy = new CoreMessagingProxy(session,
- ResourceNames.CORE_BRIDGE +
name);
-
+ CoreMessagingProxy proxy = new CoreMessagingProxy(session,
ResourceNames.CORE_BRIDGE + name);
+
return proxy;
}
Modified:
trunk/tests/src/org/hornetq/tests/integration/management/ClusterConnectionControl2Test.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/integration/management/ClusterConnectionControl2Test.java 2009-11-10
17:25:45 UTC (rev 8255)
+++
trunk/tests/src/org/hornetq/tests/integration/management/ClusterConnectionControl2Test.java 2009-11-10
20:03:11 UTC (rev 8256)
@@ -136,6 +136,7 @@
false,
false,
1,
+ 1024,
discoveryName);
List<Pair<String, String>> connectorInfos = new
ArrayList<Pair<String, String>>();
connectorInfos.add(new Pair<String, String>("netty", null));
Modified:
trunk/tests/src/org/hornetq/tests/integration/management/ClusterConnectionControlTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/integration/management/ClusterConnectionControlTest.java 2009-11-10
17:25:45 UTC (rev 8255)
+++
trunk/tests/src/org/hornetq/tests/integration/management/ClusterConnectionControlTest.java 2009-11-10
20:03:11 UTC (rev 8256)
@@ -204,6 +204,7 @@
randomBoolean(),
randomBoolean(),
randomPositiveInt(),
+ randomPositiveInt(),
pairs);
clusterConnectionConfig2 = new ClusterConnectionConfiguration(randomString(),
@@ -212,6 +213,7 @@
randomBoolean(),
randomBoolean(),
randomPositiveInt(),
+ randomPositiveInt(),
randomString());
Configuration conf_1 = new ConfigurationImpl();
Modified:
trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java 2009-11-10
17:25:45 UTC (rev 8255)
+++
trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java 2009-11-10
20:03:11 UTC (rev 8256)
@@ -1125,6 +1125,18 @@
return null;
}
+ public int decrementRefCount(MessageReference reference) throws Exception
+ {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ public int incrementRefCount(MessageReference reference) throws Exception
+ {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
}
class FakeFilter implements Filter