JBoss hornetq SVN: r8017 - in trunk: src/main/org/hornetq/core/server/impl and 1 other directories.
by do-not-reply@jboss.org
Author: timfox
Date: 2009-09-30 18:49:33 -0400 (Wed, 30 Sep 2009)
New Revision: 8017
Modified:
trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
Log:
fixed build
Modified: trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-09-30 18:00:33 UTC (rev 8016)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-09-30 22:49:33 UTC (rev 8017)
@@ -535,7 +535,7 @@
public void route(final ServerMessage message, Transaction tx) throws Exception
{
SimpleString address = message.getDestination();
-
+
byte[] duplicateIDBytes = null;
Object duplicateID = message.getProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID);
@@ -622,7 +622,7 @@
{
routed = false;
}
-
+
if (!routed)
{
AddressSettings addressSettings = addressSettingsRepository.getMatch(address.toString());
Modified: trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2009-09-30 18:00:33 UTC (rev 8016)
+++ trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2009-09-30 22:49:33 UTC (rev 8017)
@@ -13,10 +13,6 @@
package org.hornetq.core.server.impl;
-import static org.hornetq.core.message.impl.MessageImpl.HDR_ACTUAL_EXPIRY_TIME;
-import static org.hornetq.core.message.impl.MessageImpl.HDR_ORIGINAL_DESTINATION;
-import static org.hornetq.core.message.impl.MessageImpl.HDR_ORIG_MESSAGE_ID;
-
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@@ -1780,8 +1776,6 @@
public synchronized void pause()
{
paused = true;
-
- log.info("Paused is now " + paused);
}
public synchronized void resume()
@@ -1793,7 +1787,6 @@
public synchronized boolean isPaused()
{
- log.info("return ispaused " + paused);
return paused;
}
}
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java 2009-09-30 18:00:33 UTC (rev 8016)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java 2009-09-30 22:49:33 UTC (rev 8017)
@@ -210,11 +210,11 @@
putLongProperty(HDR_ORIG_MESSAGE_ID, other.getMessageID());
}
+ // reset expiry
+ setExpiration(0);
+
if (expiry)
- {
- // reset expiry
- setExpiration(0);
-
+ {
long actualExpiryTime = System.currentTimeMillis();
putLongProperty(HDR_ACTUAL_EXPIRY_TIME, actualExpiryTime);
Modified: trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java 2009-09-30 18:00:33 UTC (rev 8016)
+++ trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java 2009-09-30 22:49:33 UTC (rev 8017)
@@ -549,8 +549,7 @@
consumerExpiry.close();
for (int i = 0; i < 10; i++)
- {
-
+ {
consumerExpiry = session.createConsumer(ADDRESS_DLA);
msg1 = consumerExpiry.receive(5000);
15 years, 3 months
JBoss hornetq SVN: r8016 - trunk/src/main/org/hornetq/core/config/cluster.
by do-not-reply@jboss.org
Author: timfox
Date: 2009-09-30 14:00:33 -0400 (Wed, 30 Sep 2009)
New Revision: 8016
Modified:
trunk/src/main/org/hornetq/core/config/cluster/BridgeConfiguration.java
trunk/src/main/org/hornetq/core/config/cluster/BroadcastGroupConfiguration.java
trunk/src/main/org/hornetq/core/config/cluster/DiscoveryGroupConfiguration.java
trunk/src/main/org/hornetq/core/config/cluster/DivertConfiguration.java
trunk/src/main/org/hornetq/core/config/cluster/QueueConfiguration.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-158
Modified: trunk/src/main/org/hornetq/core/config/cluster/BridgeConfiguration.java
===================================================================
--- trunk/src/main/org/hornetq/core/config/cluster/BridgeConfiguration.java 2009-09-30 17:44:47 UTC (rev 8015)
+++ trunk/src/main/org/hornetq/core/config/cluster/BridgeConfiguration.java 2009-09-30 18:00:33 UTC (rev 8016)
@@ -30,29 +30,29 @@
{
private static final long serialVersionUID = -1057244274380572226L;
- private final String name;
+ private String name;
- private final String queueName;
+ private String queueName;
- private final String forwardingAddress;
+ private String forwardingAddress;
- private final String filterString;
+ private String filterString;
- private final Pair<String, String> connectorPair;
+ private Pair<String, String> connectorPair;
- private final String discoveryGroupName;
+ private String discoveryGroupName;
- private final String transformerClassName;
+ private String transformerClassName;
- private final long retryInterval;
+ private long retryInterval;
- private final double retryIntervalMultiplier;
+ private double retryIntervalMultiplier;
- private final int reconnectAttempts;
+ private int reconnectAttempts;
- private final boolean failoverOnServerShutdown;
+ private boolean failoverOnServerShutdown;
- private final boolean useDuplicateDetection;
+ private boolean useDuplicateDetection;
public BridgeConfiguration(final String name,
final String queueName,
@@ -165,4 +165,100 @@
{
return useDuplicateDetection;
}
+
+ /**
+ * @param name the name to set
+ */
+ public void setName(String name)
+ {
+ this.name = name;
+ }
+
+ /**
+ * @param queueName the queueName to set
+ */
+ public void setQueueName(String queueName)
+ {
+ this.queueName = queueName;
+ }
+
+ /**
+ * @param forwardingAddress the forwardingAddress to set
+ */
+ public void setForwardingAddress(String forwardingAddress)
+ {
+ this.forwardingAddress = forwardingAddress;
+ }
+
+ /**
+ * @param filterString the filterString to set
+ */
+ public void setFilterString(String filterString)
+ {
+ this.filterString = filterString;
+ }
+
+ /**
+ * @param connectorPair the connectorPair to set
+ */
+ public void setConnectorPair(Pair<String, String> connectorPair)
+ {
+ this.connectorPair = connectorPair;
+ }
+
+ /**
+ * @param discoveryGroupName the discoveryGroupName to set
+ */
+ public void setDiscoveryGroupName(String discoveryGroupName)
+ {
+ this.discoveryGroupName = discoveryGroupName;
+ }
+
+ /**
+ * @param transformerClassName the transformerClassName to set
+ */
+ public void setTransformerClassName(String transformerClassName)
+ {
+ this.transformerClassName = transformerClassName;
+ }
+
+ /**
+ * @param retryInterval the retryInterval to set
+ */
+ public void setRetryInterval(long retryInterval)
+ {
+ this.retryInterval = retryInterval;
+ }
+
+ /**
+ * @param retryIntervalMultiplier the retryIntervalMultiplier to set
+ */
+ public void setRetryIntervalMultiplier(double retryIntervalMultiplier)
+ {
+ this.retryIntervalMultiplier = retryIntervalMultiplier;
+ }
+
+ /**
+ * @param reconnectAttempts the reconnectAttempts to set
+ */
+ public void setReconnectAttempts(int reconnectAttempts)
+ {
+ this.reconnectAttempts = reconnectAttempts;
+ }
+
+ /**
+ * @param failoverOnServerShutdown the failoverOnServerShutdown to set
+ */
+ public void setFailoverOnServerShutdown(boolean failoverOnServerShutdown)
+ {
+ this.failoverOnServerShutdown = failoverOnServerShutdown;
+ }
+
+ /**
+ * @param useDuplicateDetection the useDuplicateDetection to set
+ */
+ public void setUseDuplicateDetection(boolean useDuplicateDetection)
+ {
+ this.useDuplicateDetection = useDuplicateDetection;
+ }
}
Modified: trunk/src/main/org/hornetq/core/config/cluster/BroadcastGroupConfiguration.java
===================================================================
--- trunk/src/main/org/hornetq/core/config/cluster/BroadcastGroupConfiguration.java 2009-09-30 17:44:47 UTC (rev 8015)
+++ trunk/src/main/org/hornetq/core/config/cluster/BroadcastGroupConfiguration.java 2009-09-30 18:00:33 UTC (rev 8016)
@@ -27,7 +27,6 @@
*
* Created 18 Nov 2008 08:44:30
*
- *
*/
public class BroadcastGroupConfiguration implements Serializable
{
@@ -35,19 +34,19 @@
private static final Logger log = Logger.getLogger(BroadcastGroupConfiguration.class);
- private final String name;
+ private String name;
- private final String localBindAddress;
+ private String localBindAddress;
- private final int localBindPort;
+ private int localBindPort;
- private final String groupAddress;
+ private String groupAddress;
- private final int groupPort;
+ private int groupPort;
- private final long broadcastPeriod;
+ private long broadcastPeriod;
- private final List<Pair<String, String>> connectorInfos;
+ private List<Pair<String, String>> connectorInfos;
public BroadcastGroupConfiguration(final String name,
final String localBindAddress,
@@ -102,4 +101,60 @@
return connectorInfos;
}
+ /**
+ * @param name the name to set
+ */
+ public void setName(String name)
+ {
+ this.name = name;
+ }
+
+ /**
+ * @param localBindAddress the localBindAddress to set
+ */
+ public void setLocalBindAddress(String localBindAddress)
+ {
+ this.localBindAddress = localBindAddress;
+ }
+
+ /**
+ * @param localBindPort the localBindPort to set
+ */
+ public void setLocalBindPort(int localBindPort)
+ {
+ this.localBindPort = localBindPort;
+ }
+
+ /**
+ * @param groupAddress the groupAddress to set
+ */
+ public void setGroupAddress(String groupAddress)
+ {
+ this.groupAddress = groupAddress;
+ }
+
+ /**
+ * @param groupPort the groupPort to set
+ */
+ public void setGroupPort(int groupPort)
+ {
+ this.groupPort = groupPort;
+ }
+
+ /**
+ * @param broadcastPeriod the broadcastPeriod to set
+ */
+ public void setBroadcastPeriod(long broadcastPeriod)
+ {
+ this.broadcastPeriod = broadcastPeriod;
+ }
+
+ /**
+ * @param connectorInfos the connectorInfos to set
+ */
+ public void setConnectorInfos(List<Pair<String, String>> connectorInfos)
+ {
+ this.connectorInfos = connectorInfos;
+ }
+
}
Modified: trunk/src/main/org/hornetq/core/config/cluster/DiscoveryGroupConfiguration.java
===================================================================
--- trunk/src/main/org/hornetq/core/config/cluster/DiscoveryGroupConfiguration.java 2009-09-30 17:44:47 UTC (rev 8015)
+++ trunk/src/main/org/hornetq/core/config/cluster/DiscoveryGroupConfiguration.java 2009-09-30 18:00:33 UTC (rev 8016)
@@ -29,20 +29,19 @@
{
private static final long serialVersionUID = 8657206421727863400L;
- private final String name;
+ private String name;
- private final String groupAddress;
+ private String groupAddress;
- private final int groupPort;
+ private int groupPort;
- private final long refreshTimeout;
+ private long refreshTimeout;
public DiscoveryGroupConfiguration(final String name,
final String groupAddress,
final int groupPort,
final long refreshTimeout)
{
- super();
this.name = name;
this.groupAddress = groupAddress;
this.groupPort = groupPort;
@@ -68,4 +67,36 @@
{
return refreshTimeout;
}
+
+ /**
+ * @param name the name to set
+ */
+ public void setName(String name)
+ {
+ this.name = name;
+ }
+
+ /**
+ * @param groupAddress the groupAddress to set
+ */
+ public void setGroupAddress(String groupAddress)
+ {
+ this.groupAddress = groupAddress;
+ }
+
+ /**
+ * @param groupPort the groupPort to set
+ */
+ public void setGroupPort(int groupPort)
+ {
+ this.groupPort = groupPort;
+ }
+
+ /**
+ * @param refreshTimeout the refreshTimeout to set
+ */
+ public void setRefreshTimeout(long refreshTimeout)
+ {
+ this.refreshTimeout = refreshTimeout;
+ }
}
Modified: trunk/src/main/org/hornetq/core/config/cluster/DivertConfiguration.java
===================================================================
--- trunk/src/main/org/hornetq/core/config/cluster/DivertConfiguration.java 2009-09-30 17:44:47 UTC (rev 8015)
+++ trunk/src/main/org/hornetq/core/config/cluster/DivertConfiguration.java 2009-09-30 18:00:33 UTC (rev 8016)
@@ -33,21 +33,20 @@
private static final Logger log = Logger.getLogger(DivertConfiguration.class);
+ private String name;
- private final String name;
+ private String routingName;
- private final String routingName;
+ private String address;
- private final String address;
+ private String forwardingAddress;
- private final String forwardingAddress;
+ private boolean exclusive;
- private final boolean exclusive;
+ private String filterString;
- private final String filterString;
+ private String transformerClassName;
- private final String transformerClassName;
-
public DivertConfiguration(final String name,
final String routingName,
final String address,
@@ -106,4 +105,60 @@
{
return transformerClassName;
}
+
+ /**
+ * @param name the name to set
+ */
+ public void setName(String name)
+ {
+ this.name = name;
+ }
+
+ /**
+ * @param routingName the routingName to set
+ */
+ public void setRoutingName(String routingName)
+ {
+ this.routingName = routingName;
+ }
+
+ /**
+ * @param address the address to set
+ */
+ public void setAddress(String address)
+ {
+ this.address = address;
+ }
+
+ /**
+ * @param forwardingAddress the forwardingAddress to set
+ */
+ public void setForwardingAddress(String forwardingAddress)
+ {
+ this.forwardingAddress = forwardingAddress;
+ }
+
+ /**
+ * @param exclusive the exclusive to set
+ */
+ public void setExclusive(boolean exclusive)
+ {
+ this.exclusive = exclusive;
+ }
+
+ /**
+ * @param filterString the filterString to set
+ */
+ public void setFilterString(String filterString)
+ {
+ this.filterString = filterString;
+ }
+
+ /**
+ * @param transformerClassName the transformerClassName to set
+ */
+ public void setTransformerClassName(String transformerClassName)
+ {
+ this.transformerClassName = transformerClassName;
+ }
}
Modified: trunk/src/main/org/hornetq/core/config/cluster/QueueConfiguration.java
===================================================================
--- trunk/src/main/org/hornetq/core/config/cluster/QueueConfiguration.java 2009-09-30 17:44:47 UTC (rev 8015)
+++ trunk/src/main/org/hornetq/core/config/cluster/QueueConfiguration.java 2009-09-30 18:00:33 UTC (rev 8016)
@@ -29,13 +29,13 @@
{
private static final long serialVersionUID = 650404974977490254L;
- private final String address;
+ private String address;
- private final String name;
+ private String name;
- private final String filterString;
+ private String filterString;
- private final boolean durable;
+ private boolean durable;
public QueueConfiguration(final String address, final String name, final String filterString, final boolean durable)
{
@@ -64,4 +64,36 @@
{
return durable;
}
+
+ /**
+ * @param address the address to set
+ */
+ public void setAddress(String address)
+ {
+ this.address = address;
+ }
+
+ /**
+ * @param name the name to set
+ */
+ public void setName(String name)
+ {
+ this.name = name;
+ }
+
+ /**
+ * @param filterString the filterString to set
+ */
+ public void setFilterString(String filterString)
+ {
+ this.filterString = filterString;
+ }
+
+ /**
+ * @param durable the durable to set
+ */
+ public void setDurable(boolean durable)
+ {
+ this.durable = durable;
+ }
}
15 years, 3 months
JBoss hornetq SVN: r8015 - in trunk: src/main/org/hornetq/core/remoting/impl and 1 other directories.
by do-not-reply@jboss.org
Author: timfox
Date: 2009-09-30 13:44:47 -0400 (Wed, 30 Sep 2009)
New Revision: 8015
Added:
trunk/tests/src/org/hornetq/tests/integration/client/HornetQCrashTest.java
Modified:
trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-144
Modified: trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2009-09-30 16:24:22 UTC (rev 8014)
+++ trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2009-09-30 17:44:47 UTC (rev 8015)
@@ -275,7 +275,7 @@
{
printedDropMessagesWarning = true;
- log.warn("Messages are being dropped on adress " + getStoreName());
+ log.warn("Messages are being dropped on address " + getStoreName());
}
// Address is full, we just pretend we are paging, and drop the data
Modified: trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java 2009-09-30 16:24:22 UTC (rev 8014)
+++ trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java 2009-09-30 17:44:47 UTC (rev 8015)
@@ -198,7 +198,7 @@
// Must block on semaphore outside the main lock or this can prevent failover from occurring, also after the
// packet is sent to assure we get some credits back
if (sendSemaphore != null && packet.getType() != PACKETS_CONFIRMED)
- {
+ {
try
{
sendSemaphore.acquire(size);
@@ -343,6 +343,12 @@
{
return;
}
+
+ if (sendSemaphore != null)
+ {
+ //Any threads blocking on the send semaphore should be allowed to return
+ sendSemaphore.release(Integer.MAX_VALUE);
+ }
if (!connection.isDestroyed() && !connection.removeChannel(id))
{
Added: trunk/tests/src/org/hornetq/tests/integration/client/HornetQCrashTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/HornetQCrashTest.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/client/HornetQCrashTest.java 2009-09-30 17:44:47 UTC (rev 8015)
@@ -0,0 +1,120 @@
+package org.hornetq.tests.integration.client;
+
+import junit.framework.TestCase;
+
+import org.hornetq.core.client.ClientMessage;
+import org.hornetq.core.client.ClientProducer;
+import org.hornetq.core.client.ClientSession;
+import org.hornetq.core.client.ClientSessionFactory;
+import org.hornetq.core.client.SendAcknowledgementHandler;
+import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.config.TransportConfiguration;
+import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.exception.HornetQException;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.message.Message;
+import org.hornetq.core.remoting.Interceptor;
+import org.hornetq.core.remoting.Packet;
+import org.hornetq.core.remoting.RemotingConnection;
+import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
+import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
+import org.hornetq.core.remoting.impl.wireformat.PacketImpl;
+import org.hornetq.core.server.HornetQ;
+import org.hornetq.core.server.HornetQServer;
+
+/**
+ *
+ * From https://jira.jboss.org/jira/browse/HORNETQ-144
+ *
+ */
+public class HornetQCrashTest extends TestCase
+{
+ private static final Logger log = Logger.getLogger(HornetQCrashTest.class);
+
+ public HornetQServer server;
+
+ private volatile boolean ackReceived;
+
+ public void testHang() throws Exception
+ {
+ Configuration configuration = new ConfigurationImpl();
+ configuration.setPersistenceEnabled(false);
+ configuration.setSecurityEnabled(false);
+ configuration.getAcceptorConfigurations().add(new TransportConfiguration(InVMAcceptorFactory.class.getName()));
+
+ server = HornetQ.newHornetQServer(configuration);
+
+ server.start();
+
+ server.getRemotingService().addInterceptor(new AckInterceptor(server));
+
+ ClientSessionFactory clientSessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(InVMConnectorFactory.class.getName()));
+
+ // Force an ack at once - this means the send call will block
+ clientSessionFactory.setProducerWindowSize(1);
+
+ ClientSession session = clientSessionFactory.createSession();
+
+ session.setSendAcknowledgementHandler(new SendAcknowledgementHandler()
+ {
+ public void sendAcknowledged(Message message)
+ {
+ ackReceived = true;
+ }
+ });
+
+ ClientProducer producer = session.createProducer("fooQueue");
+
+ ClientMessage msg = session.createClientMessage(false);
+
+ msg.putStringProperty("someKey", "someValue");
+
+ producer.send(msg);
+
+ Thread.sleep(250);
+
+ assertFalse(ackReceived);
+ }
+
+ public static class AckInterceptor implements Interceptor
+ {
+ private HornetQServer server;
+
+ AckInterceptor(HornetQServer server)
+ {
+ this.server = server;
+ }
+
+ public boolean intercept(Packet packet, RemotingConnection connection) throws HornetQException
+ {
+ log.info("AckInterceptor.intercept " + packet);
+
+ if (packet.getType() == PacketImpl.SESS_SEND)
+ {
+ try
+ {
+ log.info("Stopping server");
+
+ // Stop the server when a message arrives, to simulate a crash
+ server.stop();
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+
+ return false;
+ }
+ return true;
+ }
+
+ }
+
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+
+ server = null;
+ }
+}
15 years, 3 months
JBoss hornetq SVN: r8014 - in trunk: tests/src/org/hornetq/tests/integration/management and 1 other directory.
by do-not-reply@jboss.org
Author: timfox
Date: 2009-09-30 12:24:22 -0400 (Wed, 30 Sep 2009)
New Revision: 8014
Modified:
trunk/src/main/org/hornetq/core/management/impl/ClusterConnectionControlImpl.java
trunk/tests/src/org/hornetq/tests/integration/management/ClusterConnectionControlTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-160
Modified: trunk/src/main/org/hornetq/core/management/impl/ClusterConnectionControlImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/impl/ClusterConnectionControlImpl.java 2009-09-30 12:23:44 UTC (rev 8013)
+++ trunk/src/main/org/hornetq/core/management/impl/ClusterConnectionControlImpl.java 2009-09-30 16:24:22 UTC (rev 8014)
@@ -13,6 +13,8 @@
package org.hornetq.core.management.impl;
+import java.util.List;
+
import javax.management.StandardMBean;
import org.hornetq.core.config.cluster.ClusterConnectionConfiguration;
@@ -79,7 +81,14 @@
public Object[] getStaticConnectorNamePairs()
{
- Object[] ret = new Object[configuration.getStaticConnectorNamePairs().size()];
+ List<Pair<String, String>> pairs = configuration.getStaticConnectorNamePairs();
+
+ if (pairs == null)
+ {
+ return null;
+ }
+
+ Object[] ret = new Object[pairs.size()];
int i = 0;
for (Pair<String, String> pair : configuration.getStaticConnectorNamePairs())
@@ -97,9 +106,16 @@
public String getStaticConnectorNamePairsAsJSON() throws Exception
{
+ List<Pair<String, String>> pairs = configuration.getStaticConnectorNamePairs();
+
+ if (pairs == null)
+ {
+ return null;
+ }
+
JSONArray array = new JSONArray();
- for (Pair<String, String> pair : configuration.getStaticConnectorNamePairs())
+ for (Pair<String, String> pair : pairs)
{
JSONObject p = new JSONObject();
p.put("a", pair.a);
Modified: trunk/tests/src/org/hornetq/tests/integration/management/ClusterConnectionControlTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/management/ClusterConnectionControlTest.java 2009-09-30 12:23:44 UTC (rev 8013)
+++ trunk/tests/src/org/hornetq/tests/integration/management/ClusterConnectionControlTest.java 2009-09-30 16:24:22 UTC (rev 8014)
@@ -58,7 +58,9 @@
private HornetQServer server_0;
- private ClusterConnectionConfiguration clusterConnectionConfig;
+ private ClusterConnectionConfiguration clusterConnectionConfig1;
+
+ private ClusterConnectionConfiguration clusterConnectionConfig2;
private HornetQServer server_1;
@@ -66,42 +68,68 @@
// Public --------------------------------------------------------
- public void testAttributes() throws Exception
+ public void testAttributes1() throws Exception
{
- checkResource(ObjectNames.getClusterConnectionObjectName(clusterConnectionConfig.getName()));
+ checkResource(ObjectNames.getClusterConnectionObjectName(clusterConnectionConfig1.getName()));
- ClusterConnectionControl clusterConnectionControl = createManagementControl(clusterConnectionConfig.getName());
+ ClusterConnectionControl clusterConnectionControl = createManagementControl(clusterConnectionConfig1.getName());
- assertEquals(clusterConnectionConfig.getName(), clusterConnectionControl.getName());
- assertEquals(clusterConnectionConfig.getAddress(), clusterConnectionControl.getAddress());
- assertEquals(clusterConnectionConfig.getDiscoveryGroupName(), clusterConnectionControl.getDiscoveryGroupName());
- assertEquals(clusterConnectionConfig.getRetryInterval(), clusterConnectionControl.getRetryInterval());
- assertEquals(clusterConnectionConfig.isDuplicateDetection(), clusterConnectionControl.isDuplicateDetection());
- assertEquals(clusterConnectionConfig.isForwardWhenNoConsumers(),
+ assertEquals(clusterConnectionConfig1.getName(), clusterConnectionControl.getName());
+ assertEquals(clusterConnectionConfig1.getAddress(), clusterConnectionControl.getAddress());
+ assertEquals(clusterConnectionConfig1.getDiscoveryGroupName(), clusterConnectionControl.getDiscoveryGroupName());
+ assertEquals(clusterConnectionConfig1.getRetryInterval(), clusterConnectionControl.getRetryInterval());
+ assertEquals(clusterConnectionConfig1.isDuplicateDetection(), clusterConnectionControl.isDuplicateDetection());
+ assertEquals(clusterConnectionConfig1.isForwardWhenNoConsumers(),
clusterConnectionControl.isForwardWhenNoConsumers());
- assertEquals(clusterConnectionConfig.getMaxHops(), clusterConnectionControl.getMaxHops());
+ assertEquals(clusterConnectionConfig1.getMaxHops(), clusterConnectionControl.getMaxHops());
Object[] connectorPairs = clusterConnectionControl.getStaticConnectorNamePairs();
assertEquals(1, connectorPairs.length);
Object[] connectorPairData = (Object[])connectorPairs[0];
- assertEquals(clusterConnectionConfig.getStaticConnectorNamePairs().get(0).a, connectorPairData[0]);
- assertEquals(clusterConnectionConfig.getStaticConnectorNamePairs().get(0).b, connectorPairData[1]);
+ assertEquals(clusterConnectionConfig1.getStaticConnectorNamePairs().get(0).a, connectorPairData[0]);
+ assertEquals(clusterConnectionConfig1.getStaticConnectorNamePairs().get(0).b, connectorPairData[1]);
String jsonString = clusterConnectionControl.getStaticConnectorNamePairsAsJSON();
assertNotNull(jsonString);
JSONArray array = new JSONArray(jsonString);
assertEquals(1, array.length());
JSONObject data = array.getJSONObject(0);
- assertEquals(clusterConnectionConfig.getStaticConnectorNamePairs().get(0).a, data.optString("a"));
- assertEquals(clusterConnectionConfig.getStaticConnectorNamePairs().get(0).b, data.optString("b", null));
+ assertEquals(clusterConnectionConfig1.getStaticConnectorNamePairs().get(0).a, data.optString("a"));
+ assertEquals(clusterConnectionConfig1.getStaticConnectorNamePairs().get(0).b, data.optString("b", null));
+ assertNull(clusterConnectionControl.getDiscoveryGroupName());
+
assertTrue(clusterConnectionControl.isStarted());
}
+
+ public void testAttributes2() throws Exception
+ {
+ checkResource(ObjectNames.getClusterConnectionObjectName(clusterConnectionConfig2.getName()));
+ ClusterConnectionControl clusterConnectionControl = createManagementControl(clusterConnectionConfig2.getName());
+
+ assertEquals(clusterConnectionConfig2.getName(), clusterConnectionControl.getName());
+ assertEquals(clusterConnectionConfig2.getAddress(), clusterConnectionControl.getAddress());
+ assertEquals(clusterConnectionConfig2.getDiscoveryGroupName(), clusterConnectionControl.getDiscoveryGroupName());
+ assertEquals(clusterConnectionConfig2.getRetryInterval(), clusterConnectionControl.getRetryInterval());
+ assertEquals(clusterConnectionConfig2.isDuplicateDetection(), clusterConnectionControl.isDuplicateDetection());
+ assertEquals(clusterConnectionConfig2.isForwardWhenNoConsumers(),
+ clusterConnectionControl.isForwardWhenNoConsumers());
+ assertEquals(clusterConnectionConfig2.getMaxHops(), clusterConnectionControl.getMaxHops());
+
+ Object[] connectorPairs = clusterConnectionControl.getStaticConnectorNamePairs();
+ assertNull(connectorPairs);
+
+ String jsonPairs = clusterConnectionControl.getStaticConnectorNamePairsAsJSON();
+ assertNull(jsonPairs);
+
+ assertEquals(clusterConnectionConfig2.getDiscoveryGroupName(), clusterConnectionControl.getDiscoveryGroupName());
+ }
+
public void testStartStop() throws Exception
{
- checkResource(ObjectNames.getClusterConnectionObjectName(clusterConnectionConfig.getName()));
- ClusterConnectionControl clusterConnectionControl = createManagementControl(clusterConnectionConfig.getName());
+ checkResource(ObjectNames.getClusterConnectionObjectName(clusterConnectionConfig1.getName()));
+ ClusterConnectionControl clusterConnectionControl = createManagementControl(clusterConnectionConfig1.getName());
// started by the server
assertTrue(clusterConnectionControl.isStarted());
@@ -138,13 +166,21 @@
List<Pair<String, String>> pairs = new ArrayList<Pair<String, String>>();
pairs.add(connectorPair);
- clusterConnectionConfig = new ClusterConnectionConfiguration(randomString(),
+ clusterConnectionConfig1 = new ClusterConnectionConfiguration(randomString(),
queueConfig.getAddress(),
randomPositiveLong(),
randomBoolean(),
randomBoolean(),
randomPositiveInt(),
pairs);
+
+ clusterConnectionConfig2 = new ClusterConnectionConfiguration(randomString(),
+ queueConfig.getAddress(),
+ randomPositiveLong(),
+ randomBoolean(),
+ randomBoolean(),
+ randomPositiveInt(),
+ randomString());
Configuration conf_1 = new ConfigurationImpl();
conf_1.setSecurityEnabled(false);
@@ -159,7 +195,8 @@
conf_0.setClustered(true);
conf_0.getAcceptorConfigurations().add(new TransportConfiguration(InVMAcceptorFactory.class.getName()));
conf_0.getConnectorConfigurations().put(connectorConfig.getName(), connectorConfig);
- conf_0.getClusterConfigurations().add(clusterConnectionConfig);
+ conf_0.getClusterConfigurations().add(clusterConnectionConfig1);
+ conf_0.getClusterConfigurations().add(clusterConnectionConfig2);
server_1 = HornetQ.newHornetQServer(conf_1, MBeanServerFactory.createMBeanServer(), false);
server_1.start();
15 years, 3 months
JBoss hornetq SVN: r8013 - in trunk/tests/src/org/hornetq/tests/integration/cluster: restart and 1 other directory.
by do-not-reply@jboss.org
Author: ataylor
Date: 2009-09-30 08:23:44 -0400 (Wed, 30 Sep 2009)
New Revision: 8013
Added:
trunk/tests/src/org/hornetq/tests/integration/cluster/restart/
trunk/tests/src/org/hornetq/tests/integration/cluster/restart/ClusterRestartTest.java
Log:
new test
Added: trunk/tests/src/org/hornetq/tests/integration/cluster/restart/ClusterRestartTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/restart/ClusterRestartTest.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/restart/ClusterRestartTest.java 2009-09-30 12:23:44 UTC (rev 8013)
@@ -0,0 +1,137 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.tests.integration.cluster.restart;
+
+import org.hornetq.utils.SimpleString;
+import org.hornetq.tests.integration.cluster.distribution.ClusterTestBase;
+import org.hornetq.core.postoffice.Binding;
+import org.hornetq.core.message.impl.MessageImpl;
+
+import java.util.Collection;
+
+/**
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ * Created Sep 29, 2009
+ */
+public class ClusterRestartTest extends ClusterTestBase
+{
+ public void testRestartWithDurableQueues() throws Exception
+ {
+ /*setupServer(0, isFileStorage(), isNetty());
+ setupServer(1, isFileStorage(), isNetty());
+ setupServer(2, isFileStorage(), isNetty());
+
+ setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 0, 1, 2);
+
+ setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 1, 0, 2);
+
+ setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 0, 1);
+
+ startServers(0, 1, 2);
+
+ try
+ {
+
+ setupSessionFactory(0, isNetty());
+ setupSessionFactory(1, isNetty());
+ setupSessionFactory(2, isNetty());
+
+
+
+ createQueue(0, "queues.testaddress", "queue0", null, true);
+ createQueue(1, "queues.testaddress", "queue0", null, true);
+ createQueue(2, "queues.testaddress", "queue0", null, true);
+
+ addConsumer(1, 1, "queue0", null);
+
+ waitForBindings(0, "queues.testaddress", 1, 0, true);
+ waitForBindings(1, "queues.testaddress", 1, 1, true);
+ waitForBindings(2, "queues.testaddress", 1, 0, true);
+
+ waitForBindings(0, "queues.testaddress", 2, 1, false);
+ waitForBindings(1, "queues.testaddress", 2, 0, false);
+ waitForBindings(2, "queues.testaddress", 2, 1, false);
+
+ printBindings();
+
+ sendInRange(1, "queues.testaddress", 0, 10, false, null);
+
+
+ sendInRange(2, "queues.testaddress", 10, 20, false, null);
+
+
+ sendInRange(0, "queues.testaddress", 20, 30, false, null);
+
+ System.out.println("stopping******************************************************");
+ stopServers(1);
+ System.out.println("stopped******************************************************");
+ startServers(1);
+
+ waitForBindings(0, "queues.testaddress", 1, 0, true);
+ waitForBindings(1, "queues.testaddress", 1, 0, true);
+ waitForBindings(2, "queues.testaddress", 1, 0, true);
+
+ addConsumer(4, 1, "queue0", null);
+ waitForBindings(0, "queues.testaddress", 2, 1, false);
+ waitForBindings(1, "queues.testaddress", 2, 0, false);
+ waitForBindings(2, "queues.testaddress", 2, 1, false);
+ printBindings();
+ sendInRange(2, "queues.testaddress", 30, 40, false, null);
+
+ sendInRange(0, "queues.testaddress", 40, 50, false, null);
+
+ verifyReceiveAllInRange(0, 50, 1);
+ System.out.println("*****************************************************************************");
+ }
+ finally
+ {
+ //closeAllConsumers();
+
+ closeAllSessionFactories();
+
+ stopServers(0, 1, 2);
+ }*/
+ }
+
+ private void printBindings()
+ throws Exception
+ {
+ Collection<Binding> bindings0 = getServer(0).getPostOffice().getBindingsForAddress(new SimpleString("queues.testaddress")).getBindings();
+ Collection<Binding> bindings1 = getServer(1).getPostOffice().getBindingsForAddress(new SimpleString("queues.testaddress")).getBindings();
+ Collection<Binding> bindings2 = getServer(2).getPostOffice().getBindingsForAddress(new SimpleString("queues.testaddress")).getBindings();
+ for (Binding binding : bindings0)
+ {
+ System.out.println(binding + " on node 0 at " + binding.getID());
+ }
+
+ for (Binding binding : bindings1)
+ {
+ System.out.println(binding + " on node 1 at " + binding.getID());
+ }
+
+ for (Binding binding : bindings2)
+ {
+ System.out.println(binding + " on node 2 at " + binding.getID());
+ }
+ }
+
+ public boolean isNetty()
+ {
+ return true;
+ }
+
+ public boolean isFileStorage()
+ {
+ return true;
+ }
+}
15 years, 3 months
JBoss hornetq SVN: r8012 - in trunk: src/main/org/hornetq/core/postoffice/impl and 5 other directories.
by do-not-reply@jboss.org
Author: timfox
Date: 2009-09-30 06:23:39 -0400 (Wed, 30 Sep 2009)
New Revision: 8012
Added:
trunk/tests/src/org/hornetq/tests/integration/client/NewDeadLetterAddressTest.java
trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
Removed:
trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingImplTest.java
Modified:
trunk/src/main/org/hornetq/core/postoffice/Bindings.java
trunk/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java
trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
trunk/src/main/org/hornetq/core/postoffice/impl/SimpleAddressManager.java
trunk/src/main/org/hornetq/core/server/ServerMessage.java
trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
trunk/src/main/org/hornetq/core/settings/impl/AddressSettings.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-28
Modified: trunk/src/main/org/hornetq/core/postoffice/Bindings.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/Bindings.java 2009-09-30 04:04:31 UTC (rev 8011)
+++ trunk/src/main/org/hornetq/core/postoffice/Bindings.java 2009-09-30 10:23:39 UTC (rev 8012)
@@ -32,7 +32,7 @@
{
Collection<Binding> getBindings();
- void route(ServerMessage message, Transaction tx) throws Exception;
+ boolean route(ServerMessage message, Transaction tx) throws Exception;
void addBinding(Binding binding);
Modified: trunk/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java 2009-09-30 04:04:31 UTC (rev 8011)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java 2009-09-30 10:23:39 UTC (rev 8012)
@@ -56,7 +56,7 @@
private final List<Binding> exclusiveBindings = new CopyOnWriteArrayList<Binding>();
private volatile boolean routeWhenNoConsumers;
-
+
public void setRouteWhenNoConsumers(final boolean routeWhenNoConsumers)
{
this.routeWhenNoConsumers = routeWhenNoConsumers;
@@ -123,7 +123,7 @@
bindingsMap.remove(binding.getID());
}
- private void routeFromCluster(final ServerMessage message, final Transaction tx) throws Exception
+ private boolean routeFromCluster(final ServerMessage message, final Transaction tx) throws Exception
{
byte[] ids = (byte[])message.removeProperty(MessageImpl.HDR_ROUTE_TO_IDS);
@@ -139,10 +139,7 @@
if (binding == null)
{
- // The binding has been closed - we need to route the message somewhere else...............
- throw new IllegalStateException("Binding not found when routing from cluster - it must have closed " + bindingID);
-
- // FIXME need to deal with this better
+ return false;
}
binding.willRoute(message);
@@ -159,6 +156,8 @@
{
bindable.route(message, tx);
}
+
+ return true;
}
public boolean redistribute(final ServerMessage message, final Queue originatingQueue, final Transaction tx) throws Exception
@@ -251,7 +250,7 @@
}
}
- public void route(final ServerMessage message, final Transaction tx) throws Exception
+ public boolean route(final ServerMessage message, final Transaction tx) throws Exception
{
boolean routed = false;
@@ -272,7 +271,7 @@
{
if (message.getProperty(MessageImpl.HDR_FROM_CLUSTER) != null)
{
- routeFromCluster(message, tx);
+ routed = routeFromCluster(message, tx);
}
else
{
@@ -405,9 +404,13 @@
for (Bindable bindable : chosen)
{
bindable.route(message, tx);
+
+ routed = true;
}
}
}
+
+ return routed;
}
private final int incrementPos(int pos, int length)
Modified: trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-09-30 04:04:31 UTC (rev 8011)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-09-30 10:23:39 UTC (rev 8012)
@@ -612,10 +612,44 @@
Bindings bindings = addressManager.getBindingsForRoutingAddress(address);
+ boolean routed;
+
if (bindings != null)
{
- bindings.route(message, tx);
+ routed = bindings.route(message, tx);
}
+ else
+ {
+ routed = false;
+ }
+
+ if (!routed)
+ {
+ AddressSettings addressSettings = addressSettingsRepository.getMatch(address.toString());
+
+ boolean sendToDLA = addressSettings.isSendToDLAOnNoRoute();
+
+ if (sendToDLA)
+ {
+ //Send to the DLA for the address
+
+ SimpleString dlaAddress = addressSettings.getDeadLetterAddress();
+
+ if (dlaAddress == null)
+ {
+ log.warn("Did not route to any bindings for address " + address + " and sendToDLAOnNoRoute is true " +
+ "but there is no DLA configured for the address, the message will be ignored.");
+ }
+ else
+ {
+ message.setOriginalHeaders(message, false);
+
+ message.setDestination(dlaAddress);
+
+ route(message, tx);
+ }
+ }
+ }
if (startedTx)
{
Modified: trunk/src/main/org/hornetq/core/postoffice/impl/SimpleAddressManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/impl/SimpleAddressManager.java 2009-09-30 04:04:31 UTC (rev 8011)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/SimpleAddressManager.java 2009-09-30 10:23:39 UTC (rev 8012)
@@ -147,6 +147,8 @@
Bindings bindings = mappings.get(address);
Bindings prevBindings = null;
+
+
if (bindings == null)
{
Modified: trunk/src/main/org/hornetq/core/server/ServerMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/ServerMessage.java 2009-09-30 04:04:31 UTC (rev 8011)
+++ trunk/src/main/org/hornetq/core/server/ServerMessage.java 2009-09-30 10:23:39 UTC (rev 8012)
@@ -50,6 +50,8 @@
int getRefCount();
+ ServerMessage makeCopyForExpiryOrDLA(long newID, boolean expiry) throws Exception;
- //TODO - we might be able to put this in a better place
+ void setOriginalHeaders(ServerMessage other, boolean expiry);
+
}
Modified: trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2009-09-30 04:04:31 UTC (rev 8011)
+++ trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2009-09-30 10:23:39 UTC (rev 8012)
@@ -1097,31 +1097,10 @@
and original message id
*/
- long newMessageId = storageManager.generateUniqueID();
+ long newID = storageManager.generateUniqueID();
- ServerMessage copy = message.copy(newMessageId);
-
- if (ref.getMessage().getProperty(HDR_ORIG_MESSAGE_ID) != null)
- {
- copy.putStringProperty(HDR_ORIGINAL_DESTINATION, (SimpleString)ref.getMessage()
- .getProperty(HDR_ORIGINAL_DESTINATION));
- copy.putLongProperty(HDR_ORIG_MESSAGE_ID, (Long)ref.getMessage().getProperty(HDR_ORIG_MESSAGE_ID));
- }
- else
- {
- SimpleString originalQueue = copy.getDestination();
- copy.putStringProperty(HDR_ORIGINAL_DESTINATION, originalQueue);
- copy.putLongProperty(HDR_ORIG_MESSAGE_ID, message.getMessageID());
- }
- // reset expiry
- copy.setExpiration(0);
- if (expiry)
- {
- long actualExpiryTime = System.currentTimeMillis();
-
- copy.putLongProperty(HDR_ACTUAL_EXPIRY_TIME, actualExpiryTime);
- }
-
+ ServerMessage copy = message.makeCopyForExpiryOrDLA(newID, expiry);
+
return copy;
}
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java 2009-09-30 04:04:31 UTC (rev 8011)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java 2009-09-30 10:23:39 UTC (rev 8012)
@@ -21,6 +21,7 @@
import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.ServerMessage;
+import org.hornetq.utils.SimpleString;
/**
*
@@ -34,17 +35,17 @@
public class ServerMessageImpl extends MessageImpl implements ServerMessage
{
private static final Logger log = Logger.getLogger(ServerMessageImpl.class);
-
+
private final AtomicInteger durableRefCount = new AtomicInteger(0);
/** Global reference counts for paging control */
private final AtomicInteger refCount = new AtomicInteger(0);
private volatile boolean stored;
-
- //We cache this
+
+ // We cache this
private volatile int memoryEstimate = -1;
-
+
/*
* Constructor for when reading from network
*/
@@ -87,34 +88,34 @@
{
messageID = id;
}
-
+
public void setType(byte type)
{
this.type = type;
}
-
+
public MessageReference createReference(final Queue queue)
{
MessageReference ref = new MessageReferenceImpl(this, queue);
return ref;
}
-
+
public boolean isStored()
{
return stored;
}
-
+
public void setStored() throws Exception
{
stored = true;
}
-
+
public int incrementRefCount()
{
return refCount.incrementAndGet();
}
-
+
public int incrementDurableRefCount()
{
return durableRefCount.incrementAndGet();
@@ -129,7 +130,7 @@
{
return refCount.decrementAndGet();
}
-
+
public int getRefCount()
{
return refCount.get();
@@ -139,7 +140,7 @@
{
return false;
}
-
+
public long getLargeBodySize()
{
return (long)getBodySize();
@@ -154,26 +155,72 @@
// different from reality
memoryEstimate = getEncodeSize() + (16 + 4) * 2 + 1;
}
-
+
return memoryEstimate;
}
public ServerMessage copy(final long newID) throws Exception
{
ServerMessage m = new ServerMessageImpl(this);
-
+
m.setMessageID(newID);
-
+
return m;
}
-
+
public ServerMessage copy() throws Exception
{
ServerMessage m = new ServerMessageImpl(this);
-
+
return m;
}
+ public ServerMessage makeCopyForExpiryOrDLA(final long newID, final boolean expiry) throws Exception
+ {
+ /*
+ We copy the message and send that to the dla/expiry queue - this is
+ because otherwise we may end up with a ref with the same message id in the
+ queue more than once which would barf - this might happen if the same message had been
+ expire from multiple subscriptions of a topic for example
+ We set headers that hold the original message destination, expiry time
+ and original message id
+ */
+
+ ServerMessage copy = copy(newID);
+
+ copy.setOriginalHeaders(this, expiry);
+
+ return copy;
+ }
+
+ public void setOriginalHeaders(final ServerMessage other, final boolean expiry)
+ {
+ if (other.getProperty(HDR_ORIG_MESSAGE_ID) != null)
+ {
+ putStringProperty(HDR_ORIGINAL_DESTINATION, (SimpleString)other.getProperty(HDR_ORIGINAL_DESTINATION));
+
+ putLongProperty(HDR_ORIG_MESSAGE_ID, (Long)other.getProperty(HDR_ORIG_MESSAGE_ID));
+ }
+ else
+ {
+ SimpleString originalQueue = other.getDestination();
+
+ putStringProperty(HDR_ORIGINAL_DESTINATION, originalQueue);
+
+ putLongProperty(HDR_ORIG_MESSAGE_ID, other.getMessageID());
+ }
+
+ if (expiry)
+ {
+ // reset expiry
+ setExpiration(0);
+
+ long actualExpiryTime = System.currentTimeMillis();
+
+ putLongProperty(HDR_ACTUAL_EXPIRY_TIME, actualExpiryTime);
+ }
+ }
+
@Override
public String toString()
{
Modified: trunk/src/main/org/hornetq/core/settings/impl/AddressSettings.java
===================================================================
--- trunk/src/main/org/hornetq/core/settings/impl/AddressSettings.java 2009-09-30 04:04:31 UTC (rev 8011)
+++ trunk/src/main/org/hornetq/core/settings/impl/AddressSettings.java 2009-09-30 10:23:39 UTC (rev 8012)
@@ -49,6 +49,8 @@
public static final boolean DEFAULT_LAST_VALUE_QUEUE = false;
public static final long DEFAULT_REDISTRIBUTION_DELAY = -1;
+
+ public static final boolean DEFAULT_SEND_TO_DLA_ON_NO_ROUTE = false;
private Integer maxSizeBytes = null;
@@ -71,6 +73,8 @@
private Boolean lastValueQueue = null;
private Long redistributionDelay = null;
+
+ private Boolean sendToDLAOnNoRoute = null;
public boolean isLastValueQueue()
{
@@ -91,12 +95,12 @@
{
return dropMessagesWhenFull != null ? dropMessagesWhenFull : DEFAULT_DROP_MESSAGES_WHEN_FULL;
}
-
+
public void setDropMessagesWhenFull(final boolean value)
{
dropMessagesWhenFull = value;
}
-
+
public void setPageSizeBytes(final int pageSize)
{
pageSizeBytes = pageSize;
@@ -172,7 +176,17 @@
{
this.expiryAddress = expiryAddress;
}
+
+ public boolean isSendToDLAOnNoRoute()
+ {
+ return sendToDLAOnNoRoute != null ? sendToDLAOnNoRoute : DEFAULT_SEND_TO_DLA_ON_NO_ROUTE;
+ }
+ public void setSendToDLAOnNoRoute(final boolean value)
+ {
+ sendToDLAOnNoRoute = value;
+ }
+
public Distributor getDistributionPolicy()
{
try
@@ -248,6 +262,10 @@
{
redistributionDelay = merged.redistributionDelay;
}
+ if (sendToDLAOnNoRoute == null)
+ {
+ sendToDLAOnNoRoute = merged.sendToDLAOnNoRoute;
+ }
}
}
Added: trunk/tests/src/org/hornetq/tests/integration/client/NewDeadLetterAddressTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/NewDeadLetterAddressTest.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/client/NewDeadLetterAddressTest.java 2009-09-30 10:23:39 UTC (rev 8012)
@@ -0,0 +1,122 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.tests.integration.client;
+
+import static org.hornetq.tests.util.RandomUtil.randomSimpleString;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
+import org.hornetq.core.client.ClientConsumer;
+import org.hornetq.core.client.ClientMessage;
+import org.hornetq.core.client.ClientProducer;
+import org.hornetq.core.client.ClientSession;
+import org.hornetq.core.client.ClientSessionFactory;
+import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
+import org.hornetq.core.config.TransportConfiguration;
+import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.exception.HornetQException;
+import org.hornetq.core.message.impl.MessageImpl;
+import org.hornetq.core.server.HornetQ;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.Queue;
+import org.hornetq.core.settings.impl.AddressSettings;
+import org.hornetq.core.transaction.impl.XidImpl;
+import org.hornetq.tests.util.UnitTestCase;
+import org.hornetq.utils.SimpleString;
+
+/**
+ *
+ * A NewDeadLetterAddressTest
+ *
+ * @author tim fox
+ *
+ *
+ */
+public class NewDeadLetterAddressTest extends UnitTestCase
+{
+ private HornetQServer server;
+
+ private ClientSession clientSession;
+
+ public void testSendToDLAWhenNoRoute() throws Exception
+ {
+ SimpleString dla = new SimpleString("DLA");
+ SimpleString address = new SimpleString("empty_address");
+ AddressSettings addressSettings = new AddressSettings();
+ addressSettings.setDeadLetterAddress(dla);
+ addressSettings.setSendToDLAOnNoRoute(true);
+ server.getAddressSettingsRepository().addMatch(address.toString(), addressSettings);
+ SimpleString dlq = new SimpleString("DLQ1");
+ clientSession.createQueue(dla, dlq, null, false);
+ ClientProducer producer = clientSession.createProducer(address);
+ producer.send(createTextMessage("heyho!", clientSession));
+ clientSession.start();
+ ClientConsumer clientConsumer = clientSession.createConsumer(dlq);
+ ClientMessage m = clientConsumer.receive(500);
+ m.acknowledge();
+ assertNotNull(m);
+ assertEquals(m.getBody().readString(), "heyho!");
+ }
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ ConfigurationImpl configuration = new ConfigurationImpl();
+ configuration.setSecurityEnabled(false);
+ TransportConfiguration transportConfig = new TransportConfiguration(INVM_ACCEPTOR_FACTORY);
+ configuration.getAcceptorConfigurations().add(transportConfig);
+ server = HornetQ.newHornetQServer(configuration, false);
+ // start the server
+ server.start();
+ // then we create a client as normal
+ ClientSessionFactory sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(INVM_CONNECTOR_FACTORY));
+ clientSession = sessionFactory.createSession(false, true, false);
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ if (clientSession != null)
+ {
+ try
+ {
+ clientSession.close();
+ }
+ catch (HornetQException e1)
+ {
+ //
+ }
+ }
+ if (server != null && server.isStarted())
+ {
+ try
+ {
+ server.stop();
+ }
+ catch (Exception e1)
+ {
+ //
+ }
+ }
+ server = null;
+ clientSession = null;
+ super.tearDown();
+ }
+
+}
Deleted: trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingImplTest.java 2009-09-30 04:04:31 UTC (rev 8011)
+++ trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingImplTest.java 2009-09-30 10:23:39 UTC (rev 8012)
@@ -1,1019 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.tests.unit.core.postoffice.impl;
-
-import java.io.InputStream;
-import java.util.Collections;
-import java.util.Map;
-import java.util.Set;
-
-import javax.transaction.xa.Xid;
-
-import org.hornetq.core.exception.HornetQException;
-import org.hornetq.core.filter.Filter;
-import org.hornetq.core.postoffice.Binding;
-import org.hornetq.core.postoffice.BindingType;
-import org.hornetq.core.postoffice.impl.BindingsImpl;
-import org.hornetq.core.remoting.spi.HornetQBuffer;
-import org.hornetq.core.server.Bindable;
-import org.hornetq.core.server.MessageReference;
-import org.hornetq.core.server.Queue;
-import org.hornetq.core.server.ServerMessage;
-import org.hornetq.core.transaction.Transaction;
-import org.hornetq.core.transaction.TransactionOperation;
-import org.hornetq.tests.util.UnitTestCase;
-import org.hornetq.utils.SimpleString;
-import org.hornetq.utils.TypedProperties;
-
-/**
- * A BindingImplTest
- *
- * @author clebert
- *
- * Created Mar 12, 2009 9:14:46 PM
- *
- *
- */
-public class BindingImplTest extends UnitTestCase
-{
-
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
-
- public void testRemoveWhileRouting() throws Exception
- {
- // It would require many iterations before getting a failure
- for (int i = 0; i < 500; i++)
- {
- internalTest(true);
- }
- }
-
- public void testRemoveWhileRedistributing() throws Exception
- {
- // It would require many iterations before getting a failure
- for (int i = 0; i < 500; i++)
- {
- internalTest(false);
- }
- }
-
- private void internalTest(final boolean route) throws Exception
- {
- final FakeBinding fake = new FakeBinding(new SimpleString("a"));
-
- final BindingsImpl bind = new BindingsImpl();
- bind.addBinding(fake);
- bind.addBinding(new FakeBinding(new SimpleString("a")));
- bind.addBinding(new FakeBinding(new SimpleString("a")));
-
- Thread t = new Thread()
- {
- @Override
- public void run()
- {
- try
- {
- bind.removeBinding(fake);
- }
- catch (Exception e)
- {
- e.printStackTrace();
- }
- }
- };
-
- Queue queue = new FakeQueue(new SimpleString("a"));
- t.start();
-
- for (int i = 0; i < 100; i++)
- {
- if (route)
- {
- bind.route(new FakeMessage(), new FakeTransaction());
- }
- else
- {
- bind.redistribute(new FakeMessage(), queue, new FakeTransaction());
- }
- }
- }
-
- class FakeTransaction implements Transaction
- {
-
- /* (non-Javadoc)
- * @see org.hornetq.core.transaction.Transaction#addOperation(org.hornetq.core.transaction.TransactionOperation)
- */
- public void addOperation(final TransactionOperation sync)
- {
-
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.transaction.Transaction#commit()
- */
- public void commit() throws Exception
- {
-
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.transaction.Transaction#commit(boolean)
- */
- public void commit(final boolean onePhase) throws Exception
- {
-
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.transaction.Transaction#getCreateTime()
- */
- public long getCreateTime()
- {
-
- return 0;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.transaction.Transaction#getID()
- */
- public long getID()
- {
-
- return 0;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.transaction.Transaction#getOperationsCount()
- */
- public int getOperationsCount()
- {
-
- return 0;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.transaction.Transaction#getProperty(int)
- */
- public Object getProperty(final int index)
- {
-
- return null;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.transaction.Transaction#getState()
- */
- public State getState()
- {
-
- return null;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.transaction.Transaction#getXid()
- */
- public Xid getXid()
- {
-
- return null;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.transaction.Transaction#markAsRollbackOnly(org.hornetq.core.exception.HornetQException)
- */
- public void markAsRollbackOnly(final HornetQException exception)
- {
-
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.transaction.Transaction#prepare()
- */
- public void prepare() throws Exception
- {
-
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.transaction.Transaction#putProperty(int, java.lang.Object)
- */
- public void putProperty(final int index, final Object property)
- {
-
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.transaction.Transaction#removeOperation(org.hornetq.core.transaction.TransactionOperation)
- */
- public void removeOperation(final TransactionOperation sync)
- {
-
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.transaction.Transaction#resume()
- */
- public void resume()
- {
-
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.transaction.Transaction#rollback()
- */
- public void rollback() throws Exception
- {
-
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.transaction.Transaction#setState(org.hornetq.core.transaction.Transaction.State)
- */
- public void setState(final State state)
- {
-
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.transaction.Transaction#suspend()
- */
- public void suspend()
- {
-
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.transaction.Transaction#getDistinctQueues()
- */
- public Set<Queue> getDistinctQueues()
- {
- return Collections.emptySet();
- }
-
- }
-
- class FakeMessage implements ServerMessage
- {
-
- public Map<String, Object> toMap()
- {
- return null;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.server.ServerMessage#copy(long)
- */
- public ServerMessage copy(final long newID) throws Exception
- {
-
- return null;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.server.ServerMessage#copy()
- */
- public ServerMessage copy() throws Exception
- {
-
- return null;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.server.ServerMessage#createReference(org.hornetq.core.server.Queue)
- */
- public MessageReference createReference(final Queue queue)
- {
-
- return null;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.server.ServerMessage#decrementDurableRefCount()
- */
- public int decrementDurableRefCount()
- {
-
- return 0;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.server.ServerMessage#decrementRefCount()
- */
- public int decrementRefCount()
- {
-
- return 0;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.server.ServerMessage#getMemoryEstimate()
- */
- public int getMemoryEstimate()
- {
-
- return 0;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.server.ServerMessage#getRefCount()
- */
- public int getRefCount()
- {
-
- return 0;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.server.ServerMessage#incrementDurableRefCount()
- */
- public int incrementDurableRefCount()
- {
-
- return 0;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.server.ServerMessage#incrementRefCount()
- */
- public int incrementRefCount()
- {
-
- return 0;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.server.ServerMessage#isLargeMessage()
- */
- public boolean isLargeMessage()
- {
-
- return false;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.server.ServerMessage#isStored()
- */
- public boolean isStored()
- {
-
- return false;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.server.ServerMessage#setMessageID(long)
- */
- public void setMessageID(final long id)
- {
-
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.server.ServerMessage#setStored()
- */
- public void setStored()
- {
-
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.message.Message#containsProperty(org.hornetq.utils.SimpleString)
- */
- public boolean containsProperty(final SimpleString key)
- {
-
- return false;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.message.Message#decode(org.hornetq.core.remoting.spi.HornetQBuffer)
- */
- public void decode(final HornetQBuffer buffer)
- {
-
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.message.Message#decodeBody(org.hornetq.core.remoting.spi.HornetQBuffer)
- */
- public void decodeBody(final HornetQBuffer buffer)
- {
-
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.message.Message#decodeProperties(org.hornetq.core.remoting.spi.HornetQBuffer)
- */
- public void decodeProperties(final HornetQBuffer buffer)
- {
-
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.message.Message#encode(org.hornetq.core.remoting.spi.HornetQBuffer)
- */
- public void encode(final HornetQBuffer buffer)
- {
-
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.message.Message#encodeBody(org.hornetq.core.remoting.spi.HornetQBuffer, long, int)
- */
- public void encodeBody(final HornetQBuffer buffer, final long start, final int size)
- {
-
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.message.Message#encodeBody(org.hornetq.core.remoting.spi.HornetQBuffer)
- */
- public void encodeBody(final HornetQBuffer buffer)
- {
-
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.message.Message#encodeProperties(org.hornetq.core.remoting.spi.HornetQBuffer)
- */
- public void encodeProperties(final HornetQBuffer buffer)
- {
-
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.message.Message#getBody()
- */
- public HornetQBuffer getBody()
- {
-
- return null;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.message.Message#getBodySize()
- */
- public int getBodySize()
- {
-
- return 0;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.message.Message#getDestination()
- */
- public SimpleString getDestination()
- {
-
- return null;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.message.Message#getEncodeSize()
- */
- public int getEncodeSize()
- {
-
- return 0;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.message.Message#getExpiration()
- */
- public long getExpiration()
- {
-
- return 0;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.message.Message#getMessageID()
- */
- public long getMessageID()
- {
-
- return 0;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.message.Message#getPriority()
- */
- public byte getPriority()
- {
-
- return 0;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.message.Message#getProperties()
- */
- public TypedProperties getProperties()
- {
-
- return null;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.message.Message#getPropertiesEncodeSize()
- */
- public int getPropertiesEncodeSize()
- {
-
- return 0;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.message.Message#getProperty(org.hornetq.utils.SimpleString)
- */
- public Object getProperty(final SimpleString key)
- {
-
- return null;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.message.Message#getPropertyNames()
- */
- public Set<SimpleString> getPropertyNames()
- {
-
- return null;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.message.Message#getTimestamp()
- */
- public long getTimestamp()
- {
-
- return 0;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.message.Message#getType()
- */
- public byte getType()
- {
-
- return 0;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.message.Message#isDurable()
- */
- public boolean isDurable()
- {
-
- return false;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.message.Message#isExpired()
- */
- public boolean isExpired()
- {
-
- return false;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.message.Message#putBooleanProperty(org.hornetq.utils.SimpleString, boolean)
- */
- public void putBooleanProperty(final SimpleString key, final boolean value)
- {
-
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.message.Message#putByteProperty(org.hornetq.utils.SimpleString, byte)
- */
- public void putByteProperty(final SimpleString key, final byte value)
- {
-
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.message.Message#putBytesProperty(org.hornetq.utils.SimpleString, byte[])
- */
- public void putBytesProperty(final SimpleString key, final byte[] value)
- {
-
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.message.Message#putDoubleProperty(org.hornetq.utils.SimpleString, double)
- */
- public void putDoubleProperty(final SimpleString key, final double value)
- {
-
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.message.Message#putFloatProperty(org.hornetq.utils.SimpleString, float)
- */
- public void putFloatProperty(final SimpleString key, final float value)
- {
-
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.message.Message#putIntProperty(org.hornetq.utils.SimpleString, int)
- */
- public void putIntProperty(final SimpleString key, final int value)
- {
-
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.message.Message#putLongProperty(org.hornetq.utils.SimpleString, long)
- */
- public void putLongProperty(final SimpleString key, final long value)
- {
-
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.message.Message#putShortProperty(org.hornetq.utils.SimpleString, short)
- */
- public void putShortProperty(final SimpleString key, final short value)
- {
-
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.message.Message#putStringProperty(org.hornetq.utils.SimpleString, org.hornetq.utils.SimpleString)
- */
- public void putStringProperty(final SimpleString key, final SimpleString value)
- {
-
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.message.Message#putTypedProperties(org.hornetq.utils.TypedProperties)
- */
- public void putTypedProperties(final TypedProperties properties)
- {
-
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.message.Message#removeProperty(org.hornetq.utils.SimpleString)
- */
- public Object removeProperty(final SimpleString key)
- {
-
- return null;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.message.Message#setBody(org.hornetq.core.remoting.spi.HornetQBuffer)
- */
- public void setBody(final HornetQBuffer body)
- {
-
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.message.Message#setDestination(org.hornetq.utils.SimpleString)
- */
- public void setDestination(final SimpleString destination)
- {
-
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.message.Message#setDurable(boolean)
- */
- public void setDurable(final boolean durable)
- {
-
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.message.Message#setExpiration(long)
- */
- public void setExpiration(final long expiration)
- {
-
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.message.Message#setPriority(byte)
- */
- public void setPriority(final byte priority)
- {
-
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.message.Message#setTimestamp(long)
- */
- public void setTimestamp(final long timestamp)
- {
-
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.message.Message#getBodyInputStream()
- */
- public InputStream getBodyInputStream()
- {
- return null;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.message.Message#setBodyInputStream(java.io.InputStream)
- */
- public void setBodyInputStream(InputStream stream)
- {
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.message.Message#getLargeBodySize()
- */
- public long getLargeBodySize()
- {
- return 0;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.message.Message#containsProperty(java.lang.String)
- */
- public boolean containsProperty(String key)
- {
-
- return false;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.message.Message#getProperty(java.lang.String)
- */
- public Object getProperty(String key)
- {
-
- return null;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.message.Message#putBooleanProperty(java.lang.String, boolean)
- */
- public void putBooleanProperty(String key, boolean value)
- {
-
-
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.message.Message#putByteProperty(java.lang.String, byte)
- */
- public void putByteProperty(String key, byte value)
- {
-
-
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.message.Message#putBytesProperty(java.lang.String, byte[])
- */
- public void putBytesProperty(String key, byte[] value)
- {
-
-
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.message.Message#putDoubleProperty(java.lang.String, double)
- */
- public void putDoubleProperty(String key, double value)
- {
-
-
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.message.Message#putFloatProperty(java.lang.String, float)
- */
- public void putFloatProperty(String key, float value)
- {
-
-
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.message.Message#putIntProperty(java.lang.String, int)
- */
- public void putIntProperty(String key, int value)
- {
-
-
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.message.Message#putLongProperty(java.lang.String, long)
- */
- public void putLongProperty(String key, long value)
- {
-
-
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.message.Message#putShortProperty(java.lang.String, short)
- */
- public void putShortProperty(String key, short value)
- {
-
-
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.message.Message#putStringProperty(java.lang.String, java.lang.String)
- */
- public void putStringProperty(String key, String value)
- {
-
-
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.message.Message#removeProperty(java.lang.String)
- */
- public Object removeProperty(String key)
- {
-
- return null;
- }
-
- }
-
- class FakeFilter implements Filter
- {
-
- /* (non-Javadoc)
- * @see org.hornetq.core.filter.Filter#getFilterString()
- */
- public SimpleString getFilterString()
- {
- return null;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.filter.Filter#match(org.hornetq.core.server.ServerMessage)
- */
- public boolean match(final ServerMessage message)
- {
- return false;
- }
-
- }
-
- class FakeBinding implements Binding
- {
-
- final SimpleString name;
-
- FakeBinding(final SimpleString name)
- {
- this.name = name;
- }
-
- public SimpleString getAddress()
- {
-
- return null;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.postoffice.Binding#getBindable()
- */
- public Bindable getBindable()
- {
-
- return null;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.postoffice.Binding#getClusterName()
- */
- public SimpleString getClusterName()
- {
-
- return null;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.postoffice.Binding#getDistance()
- */
- public int getDistance()
- {
-
- return 0;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.postoffice.Binding#getFilter()
- */
- public Filter getFilter()
- {
- return new FakeFilter();
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.postoffice.Binding#getID()
- */
- public int getID()
- {
-
- return 0;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.postoffice.Binding#getRoutingName()
- */
- public SimpleString getRoutingName()
- {
- return name;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.postoffice.Binding#getType()
- */
- public BindingType getType()
- {
-
- return null;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.postoffice.Binding#getUniqueName()
- */
- public SimpleString getUniqueName()
- {
-
- return null;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.postoffice.Binding#isExclusive()
- */
- public boolean isExclusive()
- {
-
- return false;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.postoffice.Binding#isHighAcceptPriority(org.hornetq.core.server.ServerMessage)
- */
- public boolean isHighAcceptPriority(final ServerMessage message)
- {
-
- return false;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.postoffice.Binding#setID(int)
- */
- public void setID(final int id)
- {
-
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.postoffice.Binding#willRoute(org.hornetq.core.server.ServerMessage)
- */
- public void willRoute(final ServerMessage message)
- {
-
- }
-
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-
-}
Copied: trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java (from rev 8006, trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingImplTest.java)
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java 2009-09-30 10:23:39 UTC (rev 8012)
@@ -0,0 +1,1036 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.unit.core.postoffice.impl;
+
+import java.io.InputStream;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+
+import javax.transaction.xa.Xid;
+
+import org.hornetq.core.exception.HornetQException;
+import org.hornetq.core.filter.Filter;
+import org.hornetq.core.postoffice.Binding;
+import org.hornetq.core.postoffice.BindingType;
+import org.hornetq.core.postoffice.impl.BindingsImpl;
+import org.hornetq.core.remoting.spi.HornetQBuffer;
+import org.hornetq.core.server.Bindable;
+import org.hornetq.core.server.MessageReference;
+import org.hornetq.core.server.Queue;
+import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.transaction.Transaction;
+import org.hornetq.core.transaction.TransactionOperation;
+import org.hornetq.tests.util.UnitTestCase;
+import org.hornetq.utils.SimpleString;
+import org.hornetq.utils.TypedProperties;
+
+/**
+ * A BindingImplTest
+ *
+ * @author clebert
+ *
+ * Created Mar 12, 2009 9:14:46 PM
+ *
+ *
+ */
+public class BindingsImplTest extends UnitTestCase
+{
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public void testRemoveWhileRouting() throws Exception
+ {
+ // It would require many iterations before getting a failure
+ for (int i = 0; i < 500; i++)
+ {
+ internalTest(true);
+ }
+ }
+
+ public void testRemoveWhileRedistributing() throws Exception
+ {
+ // It would require many iterations before getting a failure
+ for (int i = 0; i < 500; i++)
+ {
+ internalTest(false);
+ }
+ }
+
+ private void internalTest(final boolean route) throws Exception
+ {
+ final FakeBinding fake = new FakeBinding(new SimpleString("a"));
+
+ final BindingsImpl bind = new BindingsImpl();
+ bind.addBinding(fake);
+ bind.addBinding(new FakeBinding(new SimpleString("a")));
+ bind.addBinding(new FakeBinding(new SimpleString("a")));
+
+ Thread t = new Thread()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ bind.removeBinding(fake);
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+ };
+
+ Queue queue = new FakeQueue(new SimpleString("a"));
+ t.start();
+
+ for (int i = 0; i < 100; i++)
+ {
+ if (route)
+ {
+ bind.route(new FakeMessage(), new FakeTransaction());
+ }
+ else
+ {
+ bind.redistribute(new FakeMessage(), queue, new FakeTransaction());
+ }
+ }
+ }
+
+ class FakeTransaction implements Transaction
+ {
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.transaction.Transaction#addOperation(org.hornetq.core.transaction.TransactionOperation)
+ */
+ public void addOperation(final TransactionOperation sync)
+ {
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.transaction.Transaction#commit()
+ */
+ public void commit() throws Exception
+ {
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.transaction.Transaction#commit(boolean)
+ */
+ public void commit(final boolean onePhase) throws Exception
+ {
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.transaction.Transaction#getCreateTime()
+ */
+ public long getCreateTime()
+ {
+
+ return 0;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.transaction.Transaction#getID()
+ */
+ public long getID()
+ {
+
+ return 0;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.transaction.Transaction#getOperationsCount()
+ */
+ public int getOperationsCount()
+ {
+
+ return 0;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.transaction.Transaction#getProperty(int)
+ */
+ public Object getProperty(final int index)
+ {
+
+ return null;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.transaction.Transaction#getState()
+ */
+ public State getState()
+ {
+
+ return null;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.transaction.Transaction#getXid()
+ */
+ public Xid getXid()
+ {
+
+ return null;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.transaction.Transaction#markAsRollbackOnly(org.hornetq.core.exception.HornetQException)
+ */
+ public void markAsRollbackOnly(final HornetQException exception)
+ {
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.transaction.Transaction#prepare()
+ */
+ public void prepare() throws Exception
+ {
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.transaction.Transaction#putProperty(int, java.lang.Object)
+ */
+ public void putProperty(final int index, final Object property)
+ {
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.transaction.Transaction#removeOperation(org.hornetq.core.transaction.TransactionOperation)
+ */
+ public void removeOperation(final TransactionOperation sync)
+ {
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.transaction.Transaction#resume()
+ */
+ public void resume()
+ {
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.transaction.Transaction#rollback()
+ */
+ public void rollback() throws Exception
+ {
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.transaction.Transaction#setState(org.hornetq.core.transaction.Transaction.State)
+ */
+ public void setState(final State state)
+ {
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.transaction.Transaction#suspend()
+ */
+ public void suspend()
+ {
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.transaction.Transaction#getDistinctQueues()
+ */
+ public Set<Queue> getDistinctQueues()
+ {
+ return Collections.emptySet();
+ }
+
+ }
+
+ class FakeMessage implements ServerMessage
+ {
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.ServerMessage#makeCopyForExpiryOrDLA(long, boolean)
+ */
+ public ServerMessage makeCopyForExpiryOrDLA(long newID, boolean expiry) throws Exception
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.ServerMessage#setOriginalHeaders(org.hornetq.core.server.ServerMessage, boolean)
+ */
+ public void setOriginalHeaders(ServerMessage other, boolean expiry)
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ public Map<String, Object> toMap()
+ {
+ return null;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.ServerMessage#copy(long)
+ */
+ public ServerMessage copy(final long newID) throws Exception
+ {
+
+ return null;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.ServerMessage#copy()
+ */
+ public ServerMessage copy() throws Exception
+ {
+
+ return null;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.ServerMessage#createReference(org.hornetq.core.server.Queue)
+ */
+ public MessageReference createReference(final Queue queue)
+ {
+
+ return null;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.ServerMessage#decrementDurableRefCount()
+ */
+ public int decrementDurableRefCount()
+ {
+
+ return 0;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.ServerMessage#decrementRefCount()
+ */
+ public int decrementRefCount()
+ {
+
+ return 0;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.ServerMessage#getMemoryEstimate()
+ */
+ public int getMemoryEstimate()
+ {
+
+ return 0;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.ServerMessage#getRefCount()
+ */
+ public int getRefCount()
+ {
+
+ return 0;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.ServerMessage#incrementDurableRefCount()
+ */
+ public int incrementDurableRefCount()
+ {
+
+ return 0;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.ServerMessage#incrementRefCount()
+ */
+ public int incrementRefCount()
+ {
+
+ return 0;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.ServerMessage#isLargeMessage()
+ */
+ public boolean isLargeMessage()
+ {
+
+ return false;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.ServerMessage#isStored()
+ */
+ public boolean isStored()
+ {
+
+ return false;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.ServerMessage#setMessageID(long)
+ */
+ public void setMessageID(final long id)
+ {
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.ServerMessage#setStored()
+ */
+ public void setStored()
+ {
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.message.Message#containsProperty(org.hornetq.utils.SimpleString)
+ */
+ public boolean containsProperty(final SimpleString key)
+ {
+
+ return false;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.message.Message#decode(org.hornetq.core.remoting.spi.HornetQBuffer)
+ */
+ public void decode(final HornetQBuffer buffer)
+ {
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.message.Message#decodeBody(org.hornetq.core.remoting.spi.HornetQBuffer)
+ */
+ public void decodeBody(final HornetQBuffer buffer)
+ {
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.message.Message#decodeProperties(org.hornetq.core.remoting.spi.HornetQBuffer)
+ */
+ public void decodeProperties(final HornetQBuffer buffer)
+ {
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.message.Message#encode(org.hornetq.core.remoting.spi.HornetQBuffer)
+ */
+ public void encode(final HornetQBuffer buffer)
+ {
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.message.Message#encodeBody(org.hornetq.core.remoting.spi.HornetQBuffer, long, int)
+ */
+ public void encodeBody(final HornetQBuffer buffer, final long start, final int size)
+ {
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.message.Message#encodeBody(org.hornetq.core.remoting.spi.HornetQBuffer)
+ */
+ public void encodeBody(final HornetQBuffer buffer)
+ {
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.message.Message#encodeProperties(org.hornetq.core.remoting.spi.HornetQBuffer)
+ */
+ public void encodeProperties(final HornetQBuffer buffer)
+ {
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.message.Message#getBody()
+ */
+ public HornetQBuffer getBody()
+ {
+
+ return null;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.message.Message#getBodySize()
+ */
+ public int getBodySize()
+ {
+
+ return 0;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.message.Message#getDestination()
+ */
+ public SimpleString getDestination()
+ {
+
+ return null;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.message.Message#getEncodeSize()
+ */
+ public int getEncodeSize()
+ {
+
+ return 0;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.message.Message#getExpiration()
+ */
+ public long getExpiration()
+ {
+
+ return 0;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.message.Message#getMessageID()
+ */
+ public long getMessageID()
+ {
+
+ return 0;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.message.Message#getPriority()
+ */
+ public byte getPriority()
+ {
+
+ return 0;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.message.Message#getProperties()
+ */
+ public TypedProperties getProperties()
+ {
+
+ return null;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.message.Message#getPropertiesEncodeSize()
+ */
+ public int getPropertiesEncodeSize()
+ {
+
+ return 0;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.message.Message#getProperty(org.hornetq.utils.SimpleString)
+ */
+ public Object getProperty(final SimpleString key)
+ {
+
+ return null;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.message.Message#getPropertyNames()
+ */
+ public Set<SimpleString> getPropertyNames()
+ {
+
+ return null;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.message.Message#getTimestamp()
+ */
+ public long getTimestamp()
+ {
+
+ return 0;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.message.Message#getType()
+ */
+ public byte getType()
+ {
+
+ return 0;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.message.Message#isDurable()
+ */
+ public boolean isDurable()
+ {
+
+ return false;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.message.Message#isExpired()
+ */
+ public boolean isExpired()
+ {
+
+ return false;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.message.Message#putBooleanProperty(org.hornetq.utils.SimpleString, boolean)
+ */
+ public void putBooleanProperty(final SimpleString key, final boolean value)
+ {
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.message.Message#putByteProperty(org.hornetq.utils.SimpleString, byte)
+ */
+ public void putByteProperty(final SimpleString key, final byte value)
+ {
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.message.Message#putBytesProperty(org.hornetq.utils.SimpleString, byte[])
+ */
+ public void putBytesProperty(final SimpleString key, final byte[] value)
+ {
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.message.Message#putDoubleProperty(org.hornetq.utils.SimpleString, double)
+ */
+ public void putDoubleProperty(final SimpleString key, final double value)
+ {
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.message.Message#putFloatProperty(org.hornetq.utils.SimpleString, float)
+ */
+ public void putFloatProperty(final SimpleString key, final float value)
+ {
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.message.Message#putIntProperty(org.hornetq.utils.SimpleString, int)
+ */
+ public void putIntProperty(final SimpleString key, final int value)
+ {
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.message.Message#putLongProperty(org.hornetq.utils.SimpleString, long)
+ */
+ public void putLongProperty(final SimpleString key, final long value)
+ {
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.message.Message#putShortProperty(org.hornetq.utils.SimpleString, short)
+ */
+ public void putShortProperty(final SimpleString key, final short value)
+ {
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.message.Message#putStringProperty(org.hornetq.utils.SimpleString, org.hornetq.utils.SimpleString)
+ */
+ public void putStringProperty(final SimpleString key, final SimpleString value)
+ {
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.message.Message#putTypedProperties(org.hornetq.utils.TypedProperties)
+ */
+ public void putTypedProperties(final TypedProperties properties)
+ {
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.message.Message#removeProperty(org.hornetq.utils.SimpleString)
+ */
+ public Object removeProperty(final SimpleString key)
+ {
+
+ return null;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.message.Message#setBody(org.hornetq.core.remoting.spi.HornetQBuffer)
+ */
+ public void setBody(final HornetQBuffer body)
+ {
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.message.Message#setDestination(org.hornetq.utils.SimpleString)
+ */
+ public void setDestination(final SimpleString destination)
+ {
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.message.Message#setDurable(boolean)
+ */
+ public void setDurable(final boolean durable)
+ {
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.message.Message#setExpiration(long)
+ */
+ public void setExpiration(final long expiration)
+ {
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.message.Message#setPriority(byte)
+ */
+ public void setPriority(final byte priority)
+ {
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.message.Message#setTimestamp(long)
+ */
+ public void setTimestamp(final long timestamp)
+ {
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.message.Message#getBodyInputStream()
+ */
+ public InputStream getBodyInputStream()
+ {
+ return null;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.message.Message#setBodyInputStream(java.io.InputStream)
+ */
+ public void setBodyInputStream(InputStream stream)
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.message.Message#getLargeBodySize()
+ */
+ public long getLargeBodySize()
+ {
+ return 0;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.message.Message#containsProperty(java.lang.String)
+ */
+ public boolean containsProperty(String key)
+ {
+
+ return false;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.message.Message#getProperty(java.lang.String)
+ */
+ public Object getProperty(String key)
+ {
+
+ return null;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.message.Message#putBooleanProperty(java.lang.String, boolean)
+ */
+ public void putBooleanProperty(String key, boolean value)
+ {
+
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.message.Message#putByteProperty(java.lang.String, byte)
+ */
+ public void putByteProperty(String key, byte value)
+ {
+
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.message.Message#putBytesProperty(java.lang.String, byte[])
+ */
+ public void putBytesProperty(String key, byte[] value)
+ {
+
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.message.Message#putDoubleProperty(java.lang.String, double)
+ */
+ public void putDoubleProperty(String key, double value)
+ {
+
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.message.Message#putFloatProperty(java.lang.String, float)
+ */
+ public void putFloatProperty(String key, float value)
+ {
+
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.message.Message#putIntProperty(java.lang.String, int)
+ */
+ public void putIntProperty(String key, int value)
+ {
+
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.message.Message#putLongProperty(java.lang.String, long)
+ */
+ public void putLongProperty(String key, long value)
+ {
+
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.message.Message#putShortProperty(java.lang.String, short)
+ */
+ public void putShortProperty(String key, short value)
+ {
+
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.message.Message#putStringProperty(java.lang.String, java.lang.String)
+ */
+ public void putStringProperty(String key, String value)
+ {
+
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.message.Message#removeProperty(java.lang.String)
+ */
+ public Object removeProperty(String key)
+ {
+
+ return null;
+ }
+
+ }
+
+ class FakeFilter implements Filter
+ {
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.filter.Filter#getFilterString()
+ */
+ public SimpleString getFilterString()
+ {
+ return null;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.filter.Filter#match(org.hornetq.core.server.ServerMessage)
+ */
+ public boolean match(final ServerMessage message)
+ {
+ return false;
+ }
+
+ }
+
+ class FakeBinding implements Binding
+ {
+
+ final SimpleString name;
+
+ FakeBinding(final SimpleString name)
+ {
+ this.name = name;
+ }
+
+ public SimpleString getAddress()
+ {
+
+ return null;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.postoffice.Binding#getBindable()
+ */
+ public Bindable getBindable()
+ {
+
+ return null;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.postoffice.Binding#getClusterName()
+ */
+ public SimpleString getClusterName()
+ {
+
+ return null;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.postoffice.Binding#getDistance()
+ */
+ public int getDistance()
+ {
+
+ return 0;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.postoffice.Binding#getFilter()
+ */
+ public Filter getFilter()
+ {
+ return new FakeFilter();
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.postoffice.Binding#getID()
+ */
+ public int getID()
+ {
+
+ return 0;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.postoffice.Binding#getRoutingName()
+ */
+ public SimpleString getRoutingName()
+ {
+ return name;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.postoffice.Binding#getType()
+ */
+ public BindingType getType()
+ {
+
+ return null;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.postoffice.Binding#getUniqueName()
+ */
+ public SimpleString getUniqueName()
+ {
+
+ return null;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.postoffice.Binding#isExclusive()
+ */
+ public boolean isExclusive()
+ {
+
+ return false;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.postoffice.Binding#isHighAcceptPriority(org.hornetq.core.server.ServerMessage)
+ */
+ public boolean isHighAcceptPriority(final ServerMessage message)
+ {
+
+ return false;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.postoffice.Binding#setID(int)
+ */
+ public void setID(final int id)
+ {
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.postoffice.Binding#willRoute(org.hornetq.core.server.ServerMessage)
+ */
+ public void willRoute(final ServerMessage message)
+ {
+
+ }
+
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
15 years, 3 months
JBoss hornetq SVN: r8011 - in branches/Replication_Clebert: src/main/org/hornetq/core/remoting/impl/wireformat and 3 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-09-30 00:04:31 -0400 (Wed, 30 Sep 2009)
New Revision: 8011
Added:
branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationAddMessage.java
Modified:
branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java
branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java
branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationManager.java
branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
Log:
changes...
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java 2009-09-29 21:16:46 UTC (rev 8010)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java 2009-09-30 04:04:31 UTC (rev 8011)
@@ -13,7 +13,7 @@
package org.hornetq.core.remoting.impl;
-
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_APPEND;
import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.CREATE_REPLICATION;
import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.CREATESESSION;
import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.CREATESESSION_RESP;
@@ -74,6 +74,7 @@
import org.hornetq.core.remoting.impl.wireformat.Ping;
import org.hornetq.core.remoting.impl.wireformat.ReattachSessionMessage;
import org.hornetq.core.remoting.impl.wireformat.ReattachSessionResponseMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationAddMessage;
import org.hornetq.core.remoting.impl.wireformat.RollbackMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionAcknowledgeMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionBindingQueryMessage;
@@ -353,12 +354,16 @@
packet = new SessionSendContinuationMessage();
break;
}
-
case CREATE_REPLICATION:
{
packet = new CreateReplicationSessionMessage();
break;
}
+ case REPLICATION_APPEND:
+ {
+ packet = new ReplicationAddMessage();
+ break;
+ }
default:
{
throw new IllegalArgumentException("Invalid type: " + packetType);
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java 2009-09-29 21:16:46 UTC (rev 8010)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java 2009-09-30 04:04:31 UTC (rev 8011)
@@ -141,7 +141,7 @@
// Replication
- public static final byte REPLICATION_SEND_REPLICATION = 77;
+ public static final byte REPLICATION_APPEND = 77;
// Static --------------------------------------------------------
Added: branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationAddMessage.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationAddMessage.java (rev 0)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationAddMessage.java 2009-09-30 04:04:31 UTC (rev 8011)
@@ -0,0 +1,101 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.remoting.impl.wireformat;
+
+import org.hornetq.core.journal.EncodingSupport;
+import org.hornetq.core.remoting.spi.HornetQBuffer;
+import org.hornetq.utils.DataConstants;
+
+/**
+ * A ReplicationAddMessage
+ *
+ * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class ReplicationAddMessage extends PacketImpl
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private long id;
+
+ private byte recordType;
+
+ private EncodingSupport encodingData;
+
+ private byte[] recordData;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public ReplicationAddMessage()
+ {
+ super(REPLICATION_APPEND);
+ }
+
+ public ReplicationAddMessage(long id, byte recordType, EncodingSupport encodingData)
+ {
+ this();
+ this.id = id;
+ this.recordType = recordType;
+ this.encodingData = encodingData;
+ }
+
+ // Public --------------------------------------------------------
+
+
+
+ public int getRequiredBufferSize()
+ {
+ return BASIC_PACKET_SIZE +
+ DataConstants.SIZE_LONG +
+ DataConstants.SIZE_BYTE +
+ DataConstants.SIZE_INT +
+ (encodingData != null ? encodingData.getEncodeSize() : recordData.length);
+
+ }
+
+ @Override
+ public void encodeBody(final HornetQBuffer buffer)
+ {
+ buffer.writeLong(id);
+ buffer.writeByte(recordType);
+ buffer.writeInt(encodingData.getEncodeSize());
+ encodingData.encode(buffer);
+ }
+
+ @Override
+ public void decodeBody(final HornetQBuffer buffer)
+ {
+ id = buffer.readLong();
+ recordType = buffer.readByte();
+ int size = buffer.readInt();
+ recordData = new byte[size];
+ buffer.readBytes(recordData);
+ }
+
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationManager.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationManager.java 2009-09-29 21:16:46 UTC (rev 8010)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationManager.java 2009-09-30 04:04:31 UTC (rev 8011)
@@ -13,6 +13,7 @@
package org.hornetq.core.replication;
+import org.hornetq.core.journal.EncodingSupport;
import org.hornetq.core.server.HornetQComponent;
@@ -23,5 +24,5 @@
*/
public interface ReplicationManager extends HornetQComponent
{
- void replicate(byte[] bytes, ReplicationToken token);
+ void appendAddRecord(long id, byte recordType, EncodingSupport record);
}
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2009-09-29 21:16:46 UTC (rev 8010)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2009-09-30 04:04:31 UTC (rev 8011)
@@ -14,9 +14,11 @@
package org.hornetq.core.replication.impl;
import org.hornetq.core.client.impl.ConnectionManager;
+import org.hornetq.core.journal.EncodingSupport;
import org.hornetq.core.remoting.Channel;
import org.hornetq.core.remoting.RemotingConnection;
import org.hornetq.core.remoting.impl.wireformat.CreateReplicationSessionMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationAddMessage;
import org.hornetq.core.replication.ReplicationManager;
import org.hornetq.core.replication.ReplicationToken;
@@ -63,9 +65,11 @@
/* (non-Javadoc)
* @see org.hornetq.core.replication.ReplicationManager#replicate(byte[], org.hornetq.core.replication.ReplicationToken)
*/
- public void replicate(byte[] bytes, ReplicationToken token)
+
+
+ public void appendAddRecord(long id, byte recordType, EncodingSupport encodingData)
{
- replicatingChannel.send(new CreateReplicationSessionMessage(1, 1));
+ replicatingChannel.send(new ReplicationAddMessage(id, recordType, encodingData));
}
/* (non-Javadoc)
Modified: branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
--- branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-09-29 21:16:46 UTC (rev 8010)
+++ branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-09-30 04:04:31 UTC (rev 8011)
@@ -34,6 +34,7 @@
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.TransportConfiguration;
import org.hornetq.core.exception.HornetQException;
+import org.hornetq.core.journal.EncodingSupport;
import org.hornetq.core.management.ManagementService;
import org.hornetq.core.management.impl.HornetQServerControlImpl;
import org.hornetq.core.persistence.StorageManager;
@@ -47,6 +48,7 @@
import org.hornetq.core.remoting.impl.wireformat.ReattachSessionResponseMessage;
import org.hornetq.core.remoting.server.RemotingService;
import org.hornetq.core.remoting.server.impl.RemotingServiceImpl;
+import org.hornetq.core.remoting.spi.HornetQBuffer;
import org.hornetq.core.replication.ReplicationEndpoint;
import org.hornetq.core.replication.impl.ReplicationEndpointImpl;
import org.hornetq.core.replication.impl.ReplicationManagerImpl;
@@ -165,7 +167,8 @@
{
ReplicationManagerImpl manager = new ReplicationManagerImpl(connectionManager);
manager.start();
- manager.replicate(new byte[]{3}, null);
+ manager.appendAddRecord(1, (byte)1, new DataImplement());
+ Thread.sleep(1000);
manager.stop();
}
finally
@@ -173,7 +176,29 @@
server.stop();
}
}
+
+ class DataImplement implements EncodingSupport
+ {
+ public void decode(HornetQBuffer buffer)
+ {
+ }
+
+ public void encode(HornetQBuffer buffer)
+ {
+ buffer.writeBytes(new byte[5]);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.EncodingSupport#getEncodeSize()
+ */
+ public int getEncodeSize()
+ {
+ return 5;
+ }
+
+ }
+
// Package protected ---------------------------------------------
class LocalRemotingServiceImpl extends RemotingServiceImpl
{
15 years, 3 months
JBoss hornetq SVN: r8010 - in branches/Replication_Clebert: src/main/org/hornetq/core/server and 2 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-09-29 17:16:46 -0400 (Tue, 29 Sep 2009)
New Revision: 8010
Modified:
branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
branches/Replication_Clebert/src/main/org/hornetq/core/server/HornetQServer.java
branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
Log:
changes..
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2009-09-29 19:23:15 UTC (rev 8009)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2009-09-29 21:16:46 UTC (rev 8010)
@@ -50,7 +50,7 @@
*/
public void handlePacket(Packet packet)
{
-
+ System.out.println("packet = " + packet);
}
/* (non-Javadoc)
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2009-09-29 19:23:15 UTC (rev 8009)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2009-09-29 21:16:46 UTC (rev 8010)
@@ -34,7 +34,7 @@
// Attributes ----------------------------------------------------
- // TODO: Should this be configurable or not?
+ // TODO: where should this be configured?
private static final int WINDOW_SIZE = 100 * 1024;
private final ConnectionManager connectionManager;
@@ -65,8 +65,7 @@
*/
public void replicate(byte[] bytes, ReplicationToken token)
{
- // TODO Auto-generated method stub
-
+ replicatingChannel.send(new CreateReplicationSessionMessage(1, 1));
}
/* (non-Javadoc)
@@ -105,12 +104,18 @@
*/
public void stop() throws Exception
{
- replicatingChannel.close();
+ if (replicatingChannel != null)
+ {
+ replicatingChannel.close();
+ }
this.started = false;
+
+ if (connection != null)
+ {
+ connection.destroy();
+ }
- connection.destroy();
-
connection = null;
}
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/server/HornetQServer.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/server/HornetQServer.java 2009-09-29 19:23:15 UTC (rev 8009)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/server/HornetQServer.java 2009-09-29 21:16:46 UTC (rev 8010)
@@ -19,6 +19,7 @@
import javax.management.MBeanServer;
import org.hornetq.core.config.Configuration;
+import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.management.ManagementService;
import org.hornetq.core.management.impl.HornetQServerControlImpl;
import org.hornetq.core.persistence.StorageManager;
@@ -70,7 +71,7 @@
ReattachSessionResponseMessage reattachSession(RemotingConnection connection, String name, int lastReceivedCommandID) throws Exception;
- ReplicationEndpoint createReplicationEndpoint();
+ ReplicationEndpoint createReplicationEndpoint() throws HornetQException;
CreateSessionResponseMessage createSession(String name,
long channelID,
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-09-29 19:23:15 UTC (rev 8009)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-09-29 21:16:46 UTC (rev 8010)
@@ -593,12 +593,16 @@
return new CreateSessionResponseMessage(true, version.getIncrementingVersion());
}
- public synchronized ReplicationEndpoint createReplicationEndpoint()
+ public synchronized ReplicationEndpoint createReplicationEndpoint() throws HornetQException
{
+ if (!configuration.isBackup())
+ {
+ throw new HornetQException(HornetQException.ILLEGAL_STATE, "Connected server is not a backup server");
+ }
+
if (replicationEndpoint == null)
{
replicationEndpoint = new ReplicationEndpointImpl(this);
-
}
return replicationEndpoint;
}
Modified: branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
--- branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-09-29 19:23:15 UTC (rev 8009)
+++ branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-09-29 21:16:46 UTC (rev 8010)
@@ -33,6 +33,7 @@
import org.hornetq.core.client.impl.ConnectionManagerImpl;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.TransportConfiguration;
+import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.management.ManagementService;
import org.hornetq.core.management.impl.HornetQServerControlImpl;
import org.hornetq.core.persistence.StorageManager;
@@ -40,11 +41,9 @@
import org.hornetq.core.remoting.Channel;
import org.hornetq.core.remoting.ChannelHandler;
import org.hornetq.core.remoting.Interceptor;
-import org.hornetq.core.remoting.Packet;
import org.hornetq.core.remoting.RemotingConnection;
import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
import org.hornetq.core.remoting.impl.wireformat.CreateSessionResponseMessage;
-import org.hornetq.core.remoting.impl.wireformat.NullResponseMessage;
import org.hornetq.core.remoting.impl.wireformat.ReattachSessionResponseMessage;
import org.hornetq.core.remoting.server.RemotingService;
import org.hornetq.core.remoting.server.impl.RemotingServiceImpl;
@@ -59,6 +58,7 @@
import org.hornetq.core.server.QueueFactory;
import org.hornetq.core.server.ServerSession;
import org.hornetq.core.server.cluster.ClusterManager;
+import org.hornetq.core.server.impl.HornetQServerImpl;
import org.hornetq.core.settings.HierarchicalRepository;
import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.core.transaction.ResourceManager;
@@ -82,8 +82,6 @@
// Attributes ----------------------------------------------------
- private RemotingService remoting;
-
private ThreadFactory tFactory;
private ExecutorService executor;
@@ -100,11 +98,82 @@
public void testBasicConnection() throws Exception
{
- ReplicationManagerImpl manager = new ReplicationManagerImpl(connectionManager);
- manager.start();
- manager.stop();
+
+ Configuration config = createDefaultConfig(false);
+
+ config.setBackup(true);
+
+ HornetQServer server = new HornetQServerImpl(config);
+
+ server.start();
+
+ try
+ {
+ ReplicationManagerImpl manager = new ReplicationManagerImpl(connectionManager);
+ manager.start();
+ manager.stop();
+ }
+ finally
+ {
+ server.stop();
+ }
}
+ public void testConnectIntoNonBackup() throws Exception
+ {
+
+ Configuration config = createDefaultConfig(false);
+
+ config.setBackup(false);
+
+ HornetQServer server = new HornetQServerImpl(config);
+
+ server.start();
+
+ try
+ {
+ ReplicationManagerImpl manager = new ReplicationManagerImpl(connectionManager);
+ try
+ {
+ manager.start();
+ fail("Exception was expected");
+ }
+ catch (HornetQException expected)
+ {
+ }
+
+ manager.stop();
+ }
+ finally
+ {
+ server.stop();
+ }
+ }
+
+ public void testSendPackets() throws Exception
+ {
+
+ Configuration config = createDefaultConfig(false);
+
+ config.setBackup(true);
+
+ HornetQServer server = new HornetQServerImpl(config);
+
+ server.start();
+
+ try
+ {
+ ReplicationManagerImpl manager = new ReplicationManagerImpl(connectionManager);
+ manager.start();
+ manager.replicate(new byte[]{3}, null);
+ manager.stop();
+ }
+ finally
+ {
+ server.stop();
+ }
+ }
+
// Package protected ---------------------------------------------
class LocalRemotingServiceImpl extends RemotingServiceImpl
{
@@ -137,7 +206,7 @@
protected void setUp() throws Exception
{
- Configuration config = createDefaultConfig(false);
+ super.setUp();
tFactory = new HornetQThreadFactory("HornetQ-ReplicationTest", false);
@@ -145,10 +214,6 @@
scheduledExecutor = new ScheduledThreadPoolExecutor(10, tFactory);
- remoting = new LocalRemotingServiceImpl(config, new FakeServer(), null, null, executor, scheduledExecutor, 0);
-
- remoting.start();
-
TransportConfiguration connectorConfig = new TransportConfiguration(InVMConnectorFactory.class.getName(),
new HashMap<String, Object>(),
randomString());
@@ -175,16 +240,14 @@
protected void tearDown() throws Exception
{
- remoting.stop();
-
executor.shutdown();
scheduledExecutor.shutdown();
- remoting = null;
-
tFactory = null;
+ connectionManager = null;
+
scheduledExecutor = null;
}
15 years, 3 months
JBoss hornetq SVN: r8009 - trunk/tests/src/org/hornetq/tests/integration/jms/client.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-09-29 15:23:15 -0400 (Tue, 29 Sep 2009)
New Revision: 8009
Modified:
trunk/tests/src/org/hornetq/tests/integration/jms/client/ResendTest.java
Log:
small tweak
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/client/ResendTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/client/ResendTest.java 2009-09-29 17:27:11 UTC (rev 8008)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/client/ResendTest.java 2009-09-29 19:23:15 UTC (rev 8009)
@@ -176,6 +176,7 @@
}
else if (copiedMessage instanceof ObjectMessage)
{
+ assertNotSame(((ObjectMessage)originalMessage).getObject(), ((ObjectMessage)copiedMessage).getObject());
assertEquals(((ObjectMessage)originalMessage).getObject(), ((ObjectMessage)copiedMessage).getObject());
}
else if (copiedMessage instanceof TextMessage)
15 years, 3 months
JBoss hornetq SVN: r8008 - in trunk: src/main/org/hornetq/core/client and 15 other directories.
by do-not-reply@jboss.org
Author: timfox
Date: 2009-09-29 13:27:11 -0400 (Tue, 29 Sep 2009)
New Revision: 8008
Modified:
trunk/src/config/common/schema/hornetq-jms.xsd
trunk/src/main/org/hornetq/core/client/ClientSessionFactory.java
trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
trunk/src/main/org/hornetq/core/client/impl/ConnectionManagerImpl.java
trunk/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java
trunk/src/main/org/hornetq/jms/server/JMSServerManager.java
trunk/src/main/org/hornetq/jms/server/impl/JMSServerDeployer.java
trunk/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java
trunk/src/main/org/hornetq/jms/server/management/JMSServerControl.java
trunk/src/main/org/hornetq/jms/server/management/impl/JMSServerControlImpl.java
trunk/tests/config/hornetq-jms-for-JMSServerDeployerTest.xml
trunk/tests/jms-tests/src/org/hornetq/jms/tests/CTSMiscellaneousTest.java
trunk/tests/jms-tests/src/org/hornetq/jms/tests/JMSTestCase.java
trunk/tests/jms-tests/src/org/hornetq/jms/tests/tools/container/LocalTestServer.java
trunk/tests/joram-tests/src/org/objectweb/jtests/jms/conform/selector/SelectorTest.java
trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/ReattachTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/FloodServerTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/ManualReconnectionToSingleServerTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/client/PreACKJMSTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/client/ResendTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlUsingJMSTest.java
trunk/tests/src/org/hornetq/tests/util/JMSTestBase.java
Log:
added max retry interval https://jira.jboss.org/jira/browse/HORNETQ-147
Modified: trunk/src/config/common/schema/hornetq-jms.xsd
===================================================================
--- trunk/src/config/common/schema/hornetq-jms.xsd 2009-09-29 13:34:52 UTC (rev 8007)
+++ trunk/src/config/common/schema/hornetq-jms.xsd 2009-09-29 17:27:11 UTC (rev 8008)
@@ -98,7 +98,10 @@
</xsd:element>
<xsd:element name="retry-interval-multiplier" type="xsd:float"
maxOccurs="1" minOccurs="0">
- </xsd:element>
+ </xsd:element>
+ <xsd:element name="max-retry-interval" type="xsd:long"
+ maxOccurs="1" minOccurs="0">
+ </xsd:element>
<xsd:element name="reconnect-attempts" type="xsd:int"
maxOccurs="1" minOccurs="0">
</xsd:element>
Modified: trunk/src/main/org/hornetq/core/client/ClientSessionFactory.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/ClientSessionFactory.java 2009-09-29 13:34:52 UTC (rev 8007)
+++ trunk/src/main/org/hornetq/core/client/ClientSessionFactory.java 2009-09-29 17:27:11 UTC (rev 8008)
@@ -141,7 +141,11 @@
double getRetryIntervalMultiplier();
void setRetryIntervalMultiplier(double retryIntervalMultiplier);
+
+ long getMaxRetryInterval();
+ void setMaxRetryInterval(long maxRetryInterval);
+
int getReconnectAttempts();
void setReconnectAttempts(int reconnectAttempts);
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2009-09-29 13:34:52 UTC (rev 8007)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2009-09-29 17:27:11 UTC (rev 8008)
@@ -100,6 +100,8 @@
public static final long DEFAULT_RETRY_INTERVAL = 2000;
public static final double DEFAULT_RETRY_INTERVAL_MULTIPLIER = 1d;
+
+ public static final long DEFAULT_MAX_RETRY_INTERVAL = 2000;
public static final int DEFAULT_RECONNECT_ATTEMPTS = 0;
@@ -189,6 +191,8 @@
private long retryInterval;
private double retryIntervalMultiplier;
+
+ private long maxRetryInterval;
private int reconnectAttempts;
@@ -290,6 +294,7 @@
connectionTTL,
retryInterval,
retryIntervalMultiplier,
+ maxRetryInterval,
reconnectAttempts,
useReattach,
threadPool,
@@ -360,6 +365,8 @@
retryInterval = DEFAULT_RETRY_INTERVAL;
retryIntervalMultiplier = DEFAULT_RETRY_INTERVAL_MULTIPLIER;
+
+ maxRetryInterval = DEFAULT_MAX_RETRY_INTERVAL;
reconnectAttempts = DEFAULT_RECONNECT_ATTEMPTS;
@@ -643,7 +650,18 @@
checkWrite();
this.retryInterval = retryInterval;
}
+
+ public synchronized long getMaxRetryInterval()
+ {
+ return maxRetryInterval;
+ }
+ public synchronized void setMaxRetryInterval(long retryInterval)
+ {
+ checkWrite();
+ this.maxRetryInterval = retryInterval;
+ }
+
public synchronized double getRetryIntervalMultiplier()
{
return retryIntervalMultiplier;
@@ -938,6 +956,7 @@
connectionTTL,
retryInterval,
retryIntervalMultiplier,
+ maxRetryInterval,
reconnectAttempts,
useReattach,
threadPool,
Modified: trunk/src/main/org/hornetq/core/client/impl/ConnectionManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ConnectionManagerImpl.java 2009-09-29 13:34:52 UTC (rev 8007)
+++ trunk/src/main/org/hornetq/core/client/impl/ConnectionManagerImpl.java 2009-09-29 17:27:11 UTC (rev 8008)
@@ -127,6 +127,8 @@
private final long retryInterval;
private final double retryIntervalMultiplier; // For exponential backoff
+
+ private final long maxRetryInterval;
private final int reconnectAttempts;
@@ -175,6 +177,7 @@
final long connectionTTL,
final long retryInterval,
final double retryIntervalMultiplier,
+ final long maxRetryInterval,
final int reconnectAttempts,
final boolean useReattach,
final ExecutorService threadPool,
@@ -217,6 +220,8 @@
this.retryInterval = retryInterval;
this.retryIntervalMultiplier = retryIntervalMultiplier;
+
+ this.maxRetryInterval = maxRetryInterval;
this.reconnectAttempts = reconnectAttempts;
@@ -836,6 +841,8 @@
try
{
+ log.info("sleeping " + interval);
+
Thread.sleep(interval);
}
catch (InterruptedException ignore)
@@ -843,7 +850,14 @@
}
// Exponential back-off
- interval *= retryIntervalMultiplier;
+ long newInterval = (long)((double)interval * retryIntervalMultiplier);
+
+ if (newInterval > maxRetryInterval)
+ {
+ newInterval = maxRetryInterval;
+ }
+
+ interval = newInterval;
}
else
{
Modified: trunk/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java
===================================================================
--- trunk/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java 2009-09-29 13:34:52 UTC (rev 8007)
+++ trunk/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java 2009-09-29 17:27:11 UTC (rev 8008)
@@ -442,7 +442,17 @@
{
sessionFactory.setRetryInterval(retryInterval);
}
+
+ public synchronized long getMaxRetryInterval()
+ {
+ return sessionFactory.getMaxRetryInterval();
+ }
+ public synchronized void setMaxRetryInterval(long retryInterval)
+ {
+ sessionFactory.setMaxRetryInterval(retryInterval);
+ }
+
public synchronized double getRetryIntervalMultiplier()
{
return sessionFactory.getRetryIntervalMultiplier();
Modified: trunk/src/main/org/hornetq/jms/server/JMSServerManager.java
===================================================================
--- trunk/src/main/org/hornetq/jms/server/JMSServerManager.java 2009-09-29 13:34:52 UTC (rev 8007)
+++ trunk/src/main/org/hornetq/jms/server/JMSServerManager.java 2009-09-29 17:27:11 UTC (rev 8008)
@@ -162,6 +162,7 @@
int threadPoolMaxSize,
long retryInterval,
double retryIntervalMultiplier,
+ long maxRetryInterval,
int reconnectAttempts,
boolean failoverOnServerShutdown,
List<String> jndiBindings) throws Exception;
@@ -195,6 +196,7 @@
int threadPoolMaxSize,
long retryInterval,
double retryIntervalMultiplier,
+ long maxRetryInterval,
int reconnectAttempts,
boolean failoverOnServerShutdown,
List<String> jndiBindings) throws Exception;
Modified: trunk/src/main/org/hornetq/jms/server/impl/JMSServerDeployer.java
===================================================================
--- trunk/src/main/org/hornetq/jms/server/impl/JMSServerDeployer.java 2009-09-29 13:34:52 UTC (rev 8007)
+++ trunk/src/main/org/hornetq/jms/server/impl/JMSServerDeployer.java 2009-09-29 17:27:11 UTC (rev 8008)
@@ -144,6 +144,7 @@
boolean preAcknowledge = getBoolean(e, "pre-acknowledge", ClientSessionFactoryImpl.DEFAULT_PRE_ACKNOWLEDGE);
long retryInterval = getLong(e, "retry-interval", ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL, GT_ZERO);
double retryIntervalMultiplier = getDouble(e, "retry-interval-multiplier", ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL_MULTIPLIER, GT_ZERO);
+ long maxRetryInterval = getLong(e, "max-retry-interval", ClientSessionFactoryImpl.DEFAULT_MAX_RETRY_INTERVAL, GT_ZERO);
int reconnectAttempts = getInteger(e, "reconnect-attempts", ClientSessionFactoryImpl.DEFAULT_RECONNECT_ATTEMPTS, MINUS_ONE_OR_GE_ZERO);
boolean failoverOnServerShutdown = getBoolean(e, "failover-on-server-shutdown", ClientSessionFactoryImpl.DEFAULT_FAILOVER_ON_SERVER_SHUTDOWN);
boolean useGlobalPools = getBoolean(e, "use-global-pools", ClientSessionFactoryImpl.DEFAULT_USE_GLOBAL_POOLS);
@@ -257,6 +258,7 @@
threadPoolMaxSize,
retryInterval,
retryIntervalMultiplier,
+ maxRetryInterval,
reconnectAttempts,
failoverOnServerShutdown,
jndiBindings);
@@ -289,6 +291,7 @@
threadPoolMaxSize,
retryInterval,
retryIntervalMultiplier,
+ maxRetryInterval,
reconnectAttempts,
failoverOnServerShutdown,
jndiBindings);
Modified: trunk/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java 2009-09-29 13:34:52 UTC (rev 8007)
+++ trunk/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java 2009-09-29 17:27:11 UTC (rev 8008)
@@ -368,6 +368,7 @@
int threadPoolMaxSize,
long retryInterval,
double retryIntervalMultiplier,
+ long maxRetryInterval,
int reconnectAttempts,
boolean failoverOnServerShutdown,
List<String> jndiBindings) throws Exception
@@ -401,6 +402,7 @@
cf.setThreadPoolMaxSize(threadPoolMaxSize);
cf.setRetryInterval(retryInterval);
cf.setRetryIntervalMultiplier(retryIntervalMultiplier);
+ cf.setMaxRetryInterval(maxRetryInterval);
cf.setReconnectAttempts(reconnectAttempts);
cf.setFailoverOnServerShutdown(failoverOnServerShutdown);
}
@@ -437,6 +439,7 @@
int threadPoolMaxSize,
long retryInterval,
double retryIntervalMultiplier,
+ long maxRetryInterval,
int reconnectAttempts,
boolean failoverOnServerShutdown,
List<String> jndiBindings) throws Exception
@@ -472,6 +475,7 @@
cf.setThreadPoolMaxSize(threadPoolMaxSize);
cf.setRetryInterval(retryInterval);
cf.setRetryIntervalMultiplier(retryIntervalMultiplier);
+ cf.setMaxRetryInterval(maxRetryInterval);
cf.setReconnectAttempts(reconnectAttempts);
cf.setFailoverOnServerShutdown(failoverOnServerShutdown);
}
Modified: trunk/src/main/org/hornetq/jms/server/management/JMSServerControl.java
===================================================================
--- trunk/src/main/org/hornetq/jms/server/management/JMSServerControl.java 2009-09-29 13:34:52 UTC (rev 8007)
+++ trunk/src/main/org/hornetq/jms/server/management/JMSServerControl.java 2009-09-29 17:27:11 UTC (rev 8008)
@@ -119,6 +119,7 @@
int threadPoolMaxSize,
long retryInterval,
double retryIntervalMultiplier,
+ long maxRetryInterval,
int reconnectAttempts,
boolean failoverOnServerShutdown,
Object[] jndiBindings) throws Exception;
@@ -152,6 +153,7 @@
@Parameter(name = "threadPoolMaxSize") int threadPoolMaxSize,
@Parameter(name = "retryInterval") long retryInterval,
@Parameter(name = "retryIntervalMultiplier") double retryIntervalMultiplier,
+ @Parameter(name = "maxRetryInterval") long maxRetryInterval,
@Parameter(name = "reconnectAttempts") int reconnectAttempts,
@Parameter(name = "failoverOnServerShutdown") boolean failoverOnServerShutdown,
@Parameter(name = "jndiBindings", desc = "comma-separated list of JNDI bindings") String jndiBindings) throws Exception;
@@ -198,6 +200,7 @@
int threadPoolMaxSize,
long retryInterval,
double retryIntervalMultiplier,
+ long maxRetryInterval,
int reconnectAttempts,
boolean failoverOnServerShutdown,
Object[] jndiBindings) throws Exception;
@@ -232,6 +235,7 @@
@Parameter(name = "threadPoolMaxSize") int threadPoolMaxSize,
@Parameter(name = "retryInterval") long retryInterval,
@Parameter(name = "retryIntervalMultiplier") double retryIntervalMultiplier,
+ @Parameter(name = "maxRetryInterval") long maxRetryInterval,
@Parameter(name = "reconnectAttempts") int reconnectAttempts,
@Parameter(name = "failoverOnServerShutdown") boolean failoverOnServerShutdown,
@Parameter(name = "jndiBindings", desc = "comma-separated list of JNDI bindings") String jndiBindings) throws Exception;
Modified: trunk/src/main/org/hornetq/jms/server/management/impl/JMSServerControlImpl.java
===================================================================
--- trunk/src/main/org/hornetq/jms/server/management/impl/JMSServerControlImpl.java 2009-09-29 13:34:52 UTC (rev 8007)
+++ trunk/src/main/org/hornetq/jms/server/management/impl/JMSServerControlImpl.java 2009-09-29 17:27:11 UTC (rev 8008)
@@ -246,6 +246,7 @@
final int threadPoolMaxSize,
final long retryInterval,
final double retryIntervalMultiplier,
+ final long maxRetryInterval,
final int reconnectAttempts,
final boolean failoverOnServerShutdown,
final Object[] jndiBindings) throws Exception
@@ -283,6 +284,7 @@
threadPoolMaxSize,
retryInterval,
retryIntervalMultiplier,
+ maxRetryInterval,
reconnectAttempts,
failoverOnServerShutdown,
jndiBindingsList);
@@ -319,6 +321,7 @@
final int threadPoolMaxSize,
final long retryInterval,
final double retryIntervalMultiplier,
+ final long maxRetryInterval,
final int reconnectAttempts,
final boolean failoverOnServerShutdown,
final String jndiBindings) throws Exception
@@ -358,6 +361,7 @@
threadPoolMaxSize,
retryInterval,
retryIntervalMultiplier,
+ maxRetryInterval,
reconnectAttempts,
failoverOnServerShutdown,
bindings);
@@ -416,6 +420,7 @@
final int threadPoolMaxSize,
final long retryInterval,
final double retryIntervalMultiplier,
+ final long maxRetryInterval,
final int reconnectAttempts,
final boolean failoverOnServerShutdown,
final Object[] jndiBindings) throws Exception
@@ -451,6 +456,7 @@
threadPoolMaxSize,
retryInterval,
retryIntervalMultiplier,
+ maxRetryInterval,
reconnectAttempts,
failoverOnServerShutdown,
jndiBindingsList);
@@ -487,6 +493,7 @@
final int threadPoolMaxSize,
final long retryInterval,
final double retryIntervalMultiplier,
+ final long maxRetryInterval,
final int reconnectAttempts,
final boolean failoverOnServerShutdown,
final String jndiBindings) throws Exception
@@ -522,6 +529,7 @@
threadPoolMaxSize,
retryInterval,
retryIntervalMultiplier,
+ maxRetryInterval,
reconnectAttempts,
failoverOnServerShutdown,
bindings);
Modified: trunk/tests/config/hornetq-jms-for-JMSServerDeployerTest.xml
===================================================================
--- trunk/tests/config/hornetq-jms-for-JMSServerDeployerTest.xml 2009-09-29 13:34:52 UTC (rev 8007)
+++ trunk/tests/config/hornetq-jms-for-JMSServerDeployerTest.xml 2009-09-29 17:27:11 UTC (rev 8008)
@@ -33,6 +33,7 @@
<reconnect-attempts>34</reconnect-attempts>
<retry-interval>5</retry-interval>
<retry-interval-multiplier>6.0</retry-interval-multiplier>
+ <max-retry-interval>300</max-retry-interval>
</connection-factory>
<queue name="fullConfigurationQueue">
Modified: trunk/tests/jms-tests/src/org/hornetq/jms/tests/CTSMiscellaneousTest.java
===================================================================
--- trunk/tests/jms-tests/src/org/hornetq/jms/tests/CTSMiscellaneousTest.java 2009-09-29 13:34:52 UTC (rev 8007)
+++ trunk/tests/jms-tests/src/org/hornetq/jms/tests/CTSMiscellaneousTest.java 2009-09-29 17:27:11 UTC (rev 8008)
@@ -24,6 +24,7 @@
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONSUMER_WINDOW_SIZE;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_FAILOVER_ON_SERVER_SHUTDOWN;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_RETRY_INTERVAL;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRE_ACKNOWLEDGE;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRODUCER_MAX_RATE;
@@ -43,6 +44,7 @@
import javax.jms.MessageProducer;
import javax.jms.Session;
+import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
import org.hornetq.core.config.TransportConfiguration;
import org.hornetq.jms.client.HornetQConnectionFactory;
import org.hornetq.utils.Pair;
@@ -112,6 +114,7 @@
DEFAULT_THREAD_POOL_MAX_SIZE,
DEFAULT_RETRY_INTERVAL,
DEFAULT_RETRY_INTERVAL_MULTIPLIER,
+ DEFAULT_MAX_RETRY_INTERVAL,
DEFAULT_RECONNECT_ATTEMPTS,
DEFAULT_FAILOVER_ON_SERVER_SHUTDOWN,
jndiBindings);
Modified: trunk/tests/jms-tests/src/org/hornetq/jms/tests/JMSTestCase.java
===================================================================
--- trunk/tests/jms-tests/src/org/hornetq/jms/tests/JMSTestCase.java 2009-09-29 13:34:52 UTC (rev 8007)
+++ trunk/tests/jms-tests/src/org/hornetq/jms/tests/JMSTestCase.java 2009-09-29 17:27:11 UTC (rev 8008)
@@ -24,6 +24,7 @@
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONSUMER_WINDOW_SIZE;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_FAILOVER_ON_SERVER_SHUTDOWN;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_RETRY_INTERVAL;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRE_ACKNOWLEDGE;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRODUCER_MAX_RATE;
@@ -40,6 +41,7 @@
import javax.naming.InitialContext;
+import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
import org.hornetq.core.config.TransportConfiguration;
import org.hornetq.jms.client.HornetQConnectionFactory;
import org.hornetq.utils.Pair;
@@ -108,6 +110,7 @@
DEFAULT_THREAD_POOL_MAX_SIZE,
DEFAULT_RETRY_INTERVAL,
DEFAULT_RETRY_INTERVAL_MULTIPLIER,
+ DEFAULT_MAX_RETRY_INTERVAL,
DEFAULT_RECONNECT_ATTEMPTS,
DEFAULT_FAILOVER_ON_SERVER_SHUTDOWN,
jndiBindings);
Modified: trunk/tests/jms-tests/src/org/hornetq/jms/tests/tools/container/LocalTestServer.java
===================================================================
--- trunk/tests/jms-tests/src/org/hornetq/jms/tests/tools/container/LocalTestServer.java 2009-09-29 13:34:52 UTC (rev 8007)
+++ trunk/tests/jms-tests/src/org/hornetq/jms/tests/tools/container/LocalTestServer.java 2009-09-29 17:27:11 UTC (rev 8008)
@@ -23,6 +23,7 @@
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONSUMER_MAX_RATE;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_FAILOVER_ON_SERVER_SHUTDOWN;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_RETRY_INTERVAL;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRE_ACKNOWLEDGE;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRODUCER_MAX_RATE;
@@ -47,6 +48,7 @@
import javax.management.ObjectName;
import javax.naming.InitialContext;
+import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
import org.hornetq.core.config.TransportConfiguration;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.management.ObjectNames;
@@ -337,6 +339,7 @@
DEFAULT_THREAD_POOL_MAX_SIZE,
DEFAULT_RETRY_INTERVAL,
DEFAULT_RETRY_INTERVAL_MULTIPLIER,
+ DEFAULT_MAX_RETRY_INTERVAL,
DEFAULT_RECONNECT_ATTEMPTS,
DEFAULT_FAILOVER_ON_SERVER_SHUTDOWN,
jndiBindings);
Modified: trunk/tests/joram-tests/src/org/objectweb/jtests/jms/conform/selector/SelectorTest.java
===================================================================
--- trunk/tests/joram-tests/src/org/objectweb/jtests/jms/conform/selector/SelectorTest.java 2009-09-29 13:34:52 UTC (rev 8007)
+++ trunk/tests/joram-tests/src/org/objectweb/jtests/jms/conform/selector/SelectorTest.java 2009-09-29 17:27:11 UTC (rev 8008)
@@ -41,9 +41,9 @@
try
{
receiverConnection.stop();
- if (receiver!=null)
+ if (receiver != null)
{
- receiver.close();
+ receiver.close();
}
receiver = receiverSession.createReceiver(receiverQueue, "");
receiverConnection.start();
@@ -52,7 +52,7 @@
message.setText("testEmptyStringAsSelector");
sender.send(message);
- TextMessage msg = (TextMessage) receiver.receive(TestConfig.TIMEOUT);
+ TextMessage msg = (TextMessage)receiver.receive(TestConfig.TIMEOUT);
assertTrue("No message was received", msg != null);
assertEquals("testEmptyStringAsSelector", msg.getText());
}
@@ -75,9 +75,9 @@
try
{
receiverConnection.stop();
- if (receiver!=null)
+ if (receiver != null)
{
- receiver.close();
+ receiver.close();
}
receiver = receiverSession.createReceiver(receiverQueue, "string = 'literal''s'");
receiverConnection.start();
@@ -92,7 +92,7 @@
message.setText("testStringLiterals:2");
sender.send(message);
- TextMessage msg = (TextMessage) receiver.receive(TestConfig.TIMEOUT);
+ TextMessage msg = (TextMessage)receiver.receive(TestConfig.TIMEOUT);
assertTrue("No message was received", msg != null);
assertEquals("testStringLiterals:2", msg.getText());
}
@@ -111,9 +111,9 @@
try
{
receiverConnection.stop();
- if (receiver!=null)
+ if (receiver != null)
{
- receiver.close();
+ receiver.close();
}
receiver = receiverSession.createReceiver(receiverQueue, "JMSDeliveryMode = 'PERSISTENT'");
receiverConnection.start();
@@ -128,7 +128,7 @@
// send a message in *persistent*
sender.send(message, DeliveryMode.PERSISTENT, sender.getPriority(), sender.getTimeToLive());
- TextMessage msg = (TextMessage) receiver.receive(TestConfig.TIMEOUT);
+ TextMessage msg = (TextMessage)receiver.receive(TestConfig.TIMEOUT);
assertTrue("No message was received", msg != null);
// only the message sent in persistent mode should be received.
assertEquals(DeliveryMode.PERSISTENT, msg.getJMSDeliveryMode());
@@ -150,9 +150,9 @@
try
{
receiverConnection.stop();
- if (receiver!=null)
+ if (receiver != null)
{
- receiver.close();
+ receiver.close();
}
receiver = receiverSession.createReceiver(receiverQueue, "NumberOfOrders > 1");
receiverConnection.start();
@@ -167,7 +167,7 @@
message.setText("testIdentifierConversion:2");
sender.send(message);
- TextMessage msg = (TextMessage) receiver.receive(TestConfig.TIMEOUT);
+ TextMessage msg = (TextMessage)receiver.receive(TestConfig.TIMEOUT);
assertEquals("testIdentifierConversion:2", msg.getText());
}
catch (JMSException e)
@@ -188,12 +188,12 @@
try
{
receiverConnection.stop();
- if (receiver!=null)
+ if (receiver != null)
{
- receiver.close();
+ receiver.close();
}
receiver = receiverSession.createReceiver(receiverQueue,
- "JMSType = 'car' AND color = 'blue' AND weight > 2500");
+ "JMSType = 'car' AND color = 'blue' AND weight > 2500");
receiverConnection.start();
TextMessage dummyMessage = senderSession.createTextMessage();
@@ -210,7 +210,7 @@
message.setText("testSelectorExampleFromSpecs:2");
sender.send(message);
- TextMessage msg = (TextMessage) receiver.receive(TestConfig.TIMEOUT);
+ TextMessage msg = (TextMessage)receiver.receive(TestConfig.TIMEOUT);
assertEquals("testSelectorExampleFromSpecs:2", msg.getText());
}
catch (JMSException e)
@@ -231,9 +231,9 @@
try
{
receiverConnection.stop();
- if (receiver!=null)
+ if (receiver != null)
{
- receiver.close();
+ receiver.close();
}
receiver = receiverSession.createReceiver(receiverQueue, "weight > 2500");
receiverConnection.start();
@@ -248,7 +248,7 @@
message.setText("testGreaterThan:2");
sender.send(message);
- TextMessage msg = (TextMessage) receiver.receive(TestConfig.TIMEOUT);
+ TextMessage msg = (TextMessage)receiver.receive(TestConfig.TIMEOUT);
assertEquals("testGreaterThan:2", msg.getText());
}
catch (JMSException e)
@@ -269,9 +269,9 @@
try
{
receiverConnection.stop();
- if (receiver!=null)
+ if (receiver != null)
{
- receiver.close();
+ receiver.close();
}
receiver = receiverSession.createReceiver(receiverQueue, "weight = 2500");
receiverConnection.start();
@@ -286,7 +286,7 @@
message.setText("testEquals:2");
sender.send(message);
- TextMessage msg = (TextMessage) receiver.receive(TestConfig.TIMEOUT);
+ TextMessage msg = (TextMessage)receiver.receive(TestConfig.TIMEOUT);
assertEquals("testEquals:2", msg.getText());
}
catch (JMSException e)
@@ -307,9 +307,9 @@
try
{
receiverConnection.stop();
- if (receiver!=null)
+ if (receiver != null)
{
- receiver.close();
+ receiver.close();
}
receiver = receiverSession.createReceiver(receiverQueue, "weight <> 2500");
receiverConnection.start();
@@ -324,7 +324,7 @@
message.setText("testEquals:2");
sender.send(message);
- TextMessage msg = (TextMessage) receiver.receive(TestConfig.TIMEOUT);
+ TextMessage msg = (TextMessage)receiver.receive(TestConfig.TIMEOUT);
assertEquals("testEquals:2", msg.getText());
}
catch (JMSException e)
@@ -345,9 +345,9 @@
try
{
receiverConnection.stop();
- if (receiver!=null)
+ if (receiver != null)
{
- receiver.close();
+ receiver.close();
}
receiver = receiverSession.createReceiver(receiverQueue, "age BETWEEN 15 and 19");
receiverConnection.start();
@@ -362,7 +362,7 @@
message.setText("testBetween:2");
sender.send(message);
- TextMessage msg = (TextMessage) receiver.receive(TestConfig.TIMEOUT);
+ TextMessage msg = (TextMessage)receiver.receive(TestConfig.TIMEOUT);
assertTrue("Message not received", msg != null);
assertTrue("Message of another test: " + msg.getText(), msg.getText().startsWith("testBetween"));
assertEquals("testBetween:2", msg.getText());
@@ -386,9 +386,9 @@
try
{
receiverConnection.stop();
- if (receiver!=null)
+ if (receiver != null)
{
- receiver.close();
+ receiver.close();
}
receiver = receiverSession.createReceiver(receiverQueue, "Country IN ('UK', 'US', 'France')");
receiverConnection.start();
@@ -403,7 +403,7 @@
message.setText("testIn:2");
sender.send(message);
- TextMessage msg = (TextMessage) receiver.receive(TestConfig.TIMEOUT);
+ TextMessage msg = (TextMessage)receiver.receive(TestConfig.TIMEOUT);
assertTrue("Message not received", msg != null);
assertTrue("Message of another test: " + msg.getText(), msg.getText().startsWith("testIn"));
assertEquals("testIn:2", msg.getText());
@@ -427,9 +427,9 @@
try
{
receiverConnection.stop();
- if (receiver!=null)
+ if (receiver != null)
{
- receiver.close();
+ receiver.close();
}
receiver = receiverSession.createReceiver(receiverQueue, "underscored LIKE '\\_%' ESCAPE '\\'");
receiverConnection.start();
@@ -444,7 +444,7 @@
message.setText("testLikeEscape:2");
sender.send(message);
- TextMessage msg = (TextMessage) receiver.receive(TestConfig.TIMEOUT);
+ TextMessage msg = (TextMessage)receiver.receive(TestConfig.TIMEOUT);
assertTrue("Message not received", msg != null);
assertTrue("Message of another test: " + msg.getText(), msg.getText().startsWith("testLikeEscape"));
assertEquals("testLikeEscape:2", msg.getText());
@@ -468,9 +468,9 @@
try
{
receiverConnection.stop();
- if (receiver!=null)
+ if (receiver != null)
{
- receiver.close();
+ receiver.close();
}
receiver = receiverSession.createReceiver(receiverQueue, "word LIKE 'l_se'");
receiverConnection.start();
@@ -485,7 +485,7 @@
message.setText("testLike_2:2");
sender.send(message);
- TextMessage msg = (TextMessage) receiver.receive(TestConfig.TIMEOUT);
+ TextMessage msg = (TextMessage)receiver.receive(TestConfig.TIMEOUT);
assertTrue("Message not received", msg != null);
assertTrue("Message of another test: " + msg.getText(), msg.getText().startsWith("testLike_2"));
assertEquals("testLike_2:2", msg.getText());
@@ -509,9 +509,9 @@
try
{
receiverConnection.stop();
- if (receiver!=null)
+ if (receiver != null)
{
- receiver.close();
+ receiver.close();
}
receiver = receiverSession.createReceiver(receiverQueue, "phone LIKE '12%3'");
receiverConnection.start();
@@ -526,7 +526,7 @@
message.setText("testLike_1:2");
sender.send(message);
- TextMessage msg = (TextMessage) receiver.receive(TestConfig.TIMEOUT);
+ TextMessage msg = (TextMessage)receiver.receive(TestConfig.TIMEOUT);
assertTrue("Message not received", msg != null);
assertTrue("Message of another test: " + msg.getText(), msg.getText().startsWith("testLike_1"));
assertEquals("testLike_1:2", msg.getText());
@@ -550,9 +550,9 @@
try
{
receiverConnection.stop();
- if (receiver!=null)
+ if (receiver != null)
{
- receiver.close();
+ receiver.close();
}
receiver = receiverSession.createReceiver(receiverQueue, "prop_name IS NULL");
receiverConnection.start();
@@ -566,7 +566,7 @@
message.setText("testNull:2");
sender.send(message);
- TextMessage msg = (TextMessage) receiver.receive(TestConfig.TIMEOUT);
+ TextMessage msg = (TextMessage)receiver.receive(TestConfig.TIMEOUT);
assertTrue(msg != null);
assertEquals("testNull:2", msg.getText());
}
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/ReattachTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/ReattachTest.java 2009-09-29 13:34:52 UTC (rev 8007)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/ReattachTest.java 2009-09-29 17:27:11 UTC (rev 8008)
@@ -75,7 +75,7 @@
final int reconnectAttempts = 1;
ClientSessionFactoryInternal sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"));
-
+
sf.setRetryInterval(retryInterval);
sf.setRetryIntervalMultiplier(retryMultiplier);
sf.setReconnectAttempts(reconnectAttempts);
@@ -153,7 +153,7 @@
final int reconnectAttempts = -1;
ClientSessionFactoryInternal sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"));
-
+
sf.setRetryInterval(retryInterval);
sf.setRetryIntervalMultiplier(retryMultiplier);
sf.setReconnectAttempts(reconnectAttempts);
@@ -243,28 +243,28 @@
final long asyncFailDelay = 2000;
ClientSessionFactoryInternal sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"));
-
+
sf.setRetryInterval(retryInterval);
sf.setRetryIntervalMultiplier(retryMultiplier);
sf.setReconnectAttempts(reconnectAttempts);
sf.setUseReattach(true);
ClientSession session = sf.createSession(false, true, true);
-
+
ClientSession session2 = sf.createSession(false, true, true);
-
+
class MyFailureListener implements FailureListener
{
volatile boolean failed;
-
+
public void connectionFailed(HornetQException me)
{
failed = true;
}
}
-
+
MyFailureListener listener = new MyFailureListener();
-
+
session2.addFailureListener(listener);
session.createQueue(ADDRESS, ADDRESS, null, false);
@@ -290,17 +290,17 @@
InVMConnector.numberOfFailures = 10;
InVMConnector.failOnCreateConnection = true;
- //We need to fail on different connections.
-
- //We fail on one connection then the connection manager tries to reconnect all connections
- //Then we fail the other, and the connection manager is then called while the reconnection is occurring
- //We can't use the same connection since RemotingConnectionImpl only allows one fail to be in process
- //at same time
-
+ // We need to fail on different connections.
+
+ // We fail on one connection then the connection manager tries to reconnect all connections
+ // Then we fail the other, and the connection manager is then called while the reconnection is occurring
+ // We can't use the same connection since RemotingConnectionImpl only allows one fail to be in process
+ // at same time
+
final RemotingConnection conn = ((ClientSessionInternal)session).getConnection();
-
+
final RemotingConnection conn2 = ((ClientSessionInternal)session2).getConnection();
-
+
assertTrue(conn != conn2);
Thread t = new Thread()
@@ -314,7 +314,7 @@
catch (InterruptedException ignore)
{
}
-
+
log.info("calling fail async");
conn2.fail(new HornetQException(HornetQException.NOT_CONNECTED, "Did not receive pong from server"));
@@ -322,13 +322,13 @@
};
t.start();
-
+
conn.fail(new HornetQException(HornetQException.NOT_CONNECTED));
-
+
assertTrue(listener.failed);
-
- session.start();
+ session.start();
+
for (int i = 0; i < numMessages; i++)
{
ClientMessage message = consumer.receive(500);
@@ -347,7 +347,7 @@
assertNull(message);
session.close();
-
+
session2.close();
sf.close();
@@ -364,7 +364,7 @@
final int reconnectAttempts = 3;
ClientSessionFactoryInternal sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"));
-
+
sf.setRetryInterval(retryInterval);
sf.setRetryIntervalMultiplier(retryMultiplier);
sf.setReconnectAttempts(reconnectAttempts);
@@ -441,7 +441,7 @@
final int reconnectAttempts = 10;
ClientSessionFactoryInternal sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"));
-
+
sf.setRetryInterval(retryInterval);
sf.setRetryIntervalMultiplier(retryMultiplier);
sf.setReconnectAttempts(reconnectAttempts);
@@ -509,7 +509,7 @@
final int reconnectAttempts = -1;
ClientSessionFactoryInternal sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"));
-
+
sf.setRetryInterval(retryInterval);
sf.setRetryIntervalMultiplier(retryMultiplier);
sf.setReconnectAttempts(reconnectAttempts);
@@ -596,12 +596,12 @@
{
final long retryInterval = 500;
- final double retryMultiplier = 4d;
+ final double retryMultiplier = 2d;
final int reconnectAttempts = -1;
ClientSessionFactoryInternal sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"));
-
+
sf.setRetryInterval(retryInterval);
sf.setRetryIntervalMultiplier(retryMultiplier);
sf.setReconnectAttempts(reconnectAttempts);
@@ -630,29 +630,93 @@
ClientConsumer consumer = session.createConsumer(ADDRESS);
InVMConnector.failOnCreateConnection = true;
+ InVMConnector.numberOfFailures = 3;
RemotingConnection conn = ((ClientSessionInternal)session).getConnection();
long start = System.currentTimeMillis();
+
+ conn.fail(new HornetQException(HornetQException.NOT_CONNECTED));
- Thread t = new Thread()
+ session.start();
+
+ for (int i = 0; i < numMessages; i++)
{
- public void run()
- {
- try
- {
- Thread.sleep(retryInterval * 2);
- }
- catch (InterruptedException ignore)
- {
- }
+ ClientMessage message = consumer.receive(500);
- InVMConnector.failOnCreateConnection = false;
- }
- };
+ assertNotNull(message);
- t.start();
+ assertEquals("aardvarks", message.getBody().readString());
+ assertEquals(i, message.getProperty(new SimpleString("count")));
+
+ message.acknowledge();
+ }
+
+ ClientMessage message = consumer.receiveImmediate();
+
+ assertNull(message);
+
+ long end = System.currentTimeMillis();
+
+ double wait = retryInterval + retryMultiplier * retryInterval + retryMultiplier * retryMultiplier * retryInterval;
+
+ log.info("wait is " + wait);
+
+ assertTrue((end - start) >= wait);
+
+ session.close();
+
+ sf.close();
+ }
+
+ public void testExponentialBackoffMaxRetryInterval() throws Exception
+ {
+ final long retryInterval = 500;
+
+ final double retryMultiplier = 2d;
+
+ final int reconnectAttempts = -1;
+
+ final long maxRetryInterval = 1000;
+
+ ClientSessionFactoryInternal sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"));
+
+ sf.setRetryInterval(retryInterval);
+ sf.setRetryIntervalMultiplier(retryMultiplier);
+ sf.setReconnectAttempts(reconnectAttempts);
+ sf.setMaxRetryInterval(maxRetryInterval);
+ sf.setUseReattach(true);
+
+ ClientSession session = sf.createSession(false, true, true);
+
+ session.createQueue(ADDRESS, ADDRESS, null, false);
+
+ ClientProducer producer = session.createProducer(ADDRESS);
+
+ final int numMessages = 1000;
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session.createClientMessage(HornetQTextMessage.TYPE,
+ false,
+ 0,
+ System.currentTimeMillis(),
+ (byte)1);
+ message.putIntProperty(new SimpleString("count"), i);
+ message.getBody().writeString("aardvarks");
+ producer.send(message);
+ }
+
+ ClientConsumer consumer = session.createConsumer(ADDRESS);
+
+ InVMConnector.failOnCreateConnection = true;
+ InVMConnector.numberOfFailures = 3;
+
+ RemotingConnection conn = ((ClientSessionInternal)session).getConnection();
+
+ long start = System.currentTimeMillis();
+
conn.fail(new HornetQException(HornetQException.NOT_CONNECTED));
session.start();
@@ -676,13 +740,17 @@
long end = System.currentTimeMillis();
- assertTrue((end - start) >= retryInterval * (1 + retryMultiplier));
+ double wait = retryInterval + retryMultiplier * 2 * retryInterval + retryMultiplier;
+
+ log.info("wait is " + wait);
+ assertTrue((end - start) >= wait);
+
+ assertTrue((end - start) < wait + 500);
+
session.close();
sf.close();
-
- t.join();
}
// Package protected ---------------------------------------------
@@ -710,7 +778,7 @@
service.stop();
assertEquals(0, InVMRegistry.instance.size());
-
+
service = null;
super.tearDown();
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/FloodServerTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/FloodServerTest.java 2009-09-29 13:34:52 UTC (rev 8007)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/FloodServerTest.java 2009-09-29 17:27:11 UTC (rev 8008)
@@ -171,6 +171,7 @@
DEFAULT_THREAD_POOL_MAX_SIZE,
retryInterval,
retryIntervalMultiplier,
+ 1000,
reconnectAttempts,
failoverOnServerShutdown,
jndiBindings);
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/ManualReconnectionToSingleServerTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/ManualReconnectionToSingleServerTest.java 2009-09-29 13:34:52 UTC (rev 8007)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/ManualReconnectionToSingleServerTest.java 2009-09-29 17:27:11 UTC (rev 8008)
@@ -281,6 +281,7 @@
DEFAULT_THREAD_POOL_MAX_SIZE,
retryInterval,
retryIntervalMultiplier,
+ 1000,
reconnectAttempts,
failoverOnServerShutdown,
jndiBindings);
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/client/PreACKJMSTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/client/PreACKJMSTest.java 2009-09-29 13:34:52 UTC (rev 8007)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/client/PreACKJMSTest.java 2009-09-29 17:27:11 UTC (rev 8008)
@@ -25,6 +25,7 @@
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONSUMER_MAX_RATE;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONSUMER_WINDOW_SIZE;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_RETRY_INTERVAL;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRODUCER_MAX_RATE;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRODUCER_WINDOW_SIZE;
@@ -41,6 +42,7 @@
import javax.jms.Session;
import javax.jms.TextMessage;
+import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
import org.hornetq.core.config.TransportConfiguration;
import org.hornetq.tests.util.JMSTestBase;
import org.hornetq.utils.Pair;
@@ -244,6 +246,7 @@
DEFAULT_THREAD_POOL_MAX_SIZE,
retryInterval,
retryIntervalMultiplier,
+ DEFAULT_MAX_RETRY_INTERVAL,
reconnectAttempts,
failoverOnServerShutdown,
jndiBindings);
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/client/ResendTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/client/ResendTest.java 2009-09-29 13:34:52 UTC (rev 8007)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/client/ResendTest.java 2009-09-29 17:27:11 UTC (rev 8008)
@@ -24,6 +24,7 @@
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONSUMER_MAX_RATE;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONSUMER_WINDOW_SIZE;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_RETRY_INTERVAL;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRE_ACKNOWLEDGE;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRODUCER_MAX_RATE;
@@ -283,6 +284,7 @@
DEFAULT_THREAD_POOL_MAX_SIZE,
retryInterval,
retryIntervalMultiplier,
+ DEFAULT_MAX_RETRY_INTERVAL,
reconnectAttempts,
failoverOnServerShutdown,
jndiBindings);
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlTest.java 2009-09-29 13:34:52 UTC (rev 8007)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlTest.java 2009-09-29 17:27:11 UTC (rev 8008)
@@ -452,6 +452,7 @@
ClientSessionFactoryImpl.DEFAULT_THREAD_POOL_MAX_SIZE,
ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL,
ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL_MULTIPLIER,
+ ClientSessionFactoryImpl.DEFAULT_MAX_RETRY_INTERVAL,
ClientSessionFactoryImpl.DEFAULT_RECONNECT_ATTEMPTS,
ClientSessionFactoryImpl.DEFAULT_FAILOVER_ON_SERVER_SHUTDOWN,
bindings);
@@ -497,6 +498,7 @@
ClientSessionFactoryImpl.DEFAULT_THREAD_POOL_MAX_SIZE,
ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL,
ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL_MULTIPLIER,
+ ClientSessionFactoryImpl.DEFAULT_MAX_RETRY_INTERVAL,
ClientSessionFactoryImpl.DEFAULT_RECONNECT_ATTEMPTS,
ClientSessionFactoryImpl.DEFAULT_FAILOVER_ON_SERVER_SHUTDOWN,
jndiBindings);
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlUsingJMSTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlUsingJMSTest.java 2009-09-29 13:34:52 UTC (rev 8007)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlUsingJMSTest.java 2009-09-29 17:27:11 UTC (rev 8008)
@@ -121,6 +121,7 @@
int threadPoolMaxSize,
long retryInterval,
double retryIntervalMultiplier,
+ long maxRetryInterval,
int reconnectAttempts,
boolean failoverOnServerShutdown,
Object[] jndiBindings) throws Exception
@@ -154,6 +155,7 @@
threadPoolMaxSize,
retryInterval,
retryIntervalMultiplier,
+ maxRetryInterval,
reconnectAttempts,
failoverOnServerShutdown,
jndiBindings);
@@ -188,6 +190,7 @@
int threadPoolMaxSize,
long retryInterval,
double retryIntervalMultiplier,
+ long maxRetryInterval,
int reconnectAttempts,
boolean failoverOnServerShutdown,
String jndiBindings) throws Exception
@@ -221,6 +224,7 @@
threadPoolMaxSize,
retryInterval,
retryIntervalMultiplier,
+ maxRetryInterval,
reconnectAttempts,
failoverOnServerShutdown,
jndiBindings);
@@ -403,6 +407,7 @@
int threadPoolMaxSize,
long retryInterval,
double retryIntervalMultiplier,
+ long maxRetryInterval,
int reconnectAttempts,
boolean failoverOnServerShutdown,
Object[] jndiBindings) throws Exception
@@ -437,6 +442,7 @@
threadPoolMaxSize,
retryInterval,
retryIntervalMultiplier,
+ maxRetryInterval,
reconnectAttempts,
failoverOnServerShutdown,
jndiBindings);
@@ -472,6 +478,7 @@
int threadPoolMaxSize,
long retryInterval,
double retryIntervalMultiplier,
+ long maxRetryInterval,
int reconnectAttempts,
boolean failoverOnServerShutdown,
String jndiBindings) throws Exception
@@ -506,6 +513,7 @@
threadPoolMaxSize,
retryInterval,
retryIntervalMultiplier,
+ maxRetryInterval,
reconnectAttempts,
failoverOnServerShutdown,
jndiBindings);
Modified: trunk/tests/src/org/hornetq/tests/util/JMSTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/util/JMSTestBase.java 2009-09-29 13:34:52 UTC (rev 8007)
+++ trunk/tests/src/org/hornetq/tests/util/JMSTestBase.java 2009-09-29 17:27:11 UTC (rev 8008)
@@ -25,6 +25,7 @@
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONSUMER_MAX_RATE;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONSUMER_WINDOW_SIZE;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_RETRY_INTERVAL;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRE_ACKNOWLEDGE;
import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRODUCER_MAX_RATE;
@@ -40,6 +41,7 @@
import javax.jms.Queue;
import javax.naming.NamingException;
+import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.TransportConfiguration;
import org.hornetq.core.server.HornetQ;
@@ -223,6 +225,7 @@
DEFAULT_THREAD_POOL_MAX_SIZE,
retryInterval,
retryIntervalMultiplier,
+ DEFAULT_MAX_RETRY_INTERVAL,
reconnectAttempts,
failoverOnServerShutdown,
jndiBindings);
15 years, 3 months