[jboss-cvs] JBoss Messaging SVN: r4965 - in trunk: src/main/org/jboss/messaging/core/client and 4 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Sep 17 07:18:49 EDT 2008
Author: timfox
Date: 2008-09-17 07:18:49 -0400 (Wed, 17 Sep 2008)
New Revision: 4965
Added:
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/SimpleAutomaticFailoverTest.java
Removed:
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/ReplicationTest.java
Modified:
trunk/src/config/jbm-jndi.xml
trunk/src/main/org/jboss/messaging/core/client/ClientSessionFactory.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerDeployer.java
trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerManagerImpl.java
Log:
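Failover
[Summary derived from the diff below: adds an optional <backup-connector> to connection-factory configuration and drops the <supports-failover>/<supports-load-balancing> elements; exposes backup connector factory and backup transport params on ClientSessionFactory; replaces the sessionsCreated flag in ClientSessionFactoryImpl with a check on the live sessions set; makes ClientSessionImpl.close() swallow errors from SESS_CLOSE and always run cleanup, and makes it ignore messages/credits for unknown consumer/producer IDs instead of throwing; moves the destroyed-connection check from doWrite() to sendBlocking(), which now throws NOT_CONNECTED; reformats JMSServerDeployer and JMSServerManagerImpl; replaces ReplicationTest with SimpleAutomaticFailoverTest.]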
Modified: trunk/src/config/jbm-jndi.xml
===================================================================
--- trunk/src/config/jbm-jndi.xml 2008-09-17 08:45:41 UTC (rev 4964)
+++ trunk/src/config/jbm-jndi.xml 2008-09-17 11:18:49 UTC (rev 4965)
@@ -22,12 +22,16 @@
<connector>
<factory-class>org.jboss.messaging.core.remoting.impl.netty.NettyConnectorFactory</factory-class>
</connector>
+ <backup-connector>
+ <factory-class>org.jboss.messaging.core.remoting.impl.netty.NettyConnectorFactory</factory-class>
+ <params>
+ <param key="jbm.remoting.netty.host" value="somehost" type="String"/>
+ </params>
+ </backup-connector>
<entry name="/ClusteredConnectionFactory"/>
<entry name="/ClusteredXAConnectionFactory"/>
<entry name="java:/ClusteredConnectionFactory"/>
<entry name="java:/ClusteredXAConnectionFactory"/>
- <supports-failover>true</supports-failover>
- <supports-load-balancing>true</supports-load-balancing>
</connection-factory>
<connection-factory name="MyExampleConnectionFactory">
Modified: trunk/src/main/org/jboss/messaging/core/client/ClientSessionFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/ClientSessionFactory.java 2008-09-17 08:45:41 UTC (rev 4964)
+++ trunk/src/main/org/jboss/messaging/core/client/ClientSessionFactory.java 2008-09-17 11:18:49 UTC (rev 4965)
@@ -80,7 +80,15 @@
Map<String, Object> getTransportParams();
void setTransportParams(final Map<String, Object> transportParams);
+
+ ConnectorFactory getBackupConnectorFactory();
+ void setBackupConnectorFactory(final ConnectorFactory connectorFactory);
+
+ Map<String, Object> getBackupTransportParams();
+
+ void setBackupTransportParams(final Map<String, Object> transportParams);
+
long getPingPeriod();
void setPingPeriod(final long pingPeriod);
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java 2008-09-17 08:45:41 UTC (rev 4964)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java 2008-09-17 11:18:49 UTC (rev 4965)
@@ -120,8 +120,6 @@
private final Set<ClientSessionInternal> sessions = new ConcurrentHashSet<ClientSessionInternal>();
- private volatile boolean sessionsCreated;
-
// Static
// ---------------------------------------------------------------------------------------
@@ -309,7 +307,7 @@
public void setConnectorFactory(final ConnectorFactory connectorFactory)
{
- if (sessionsCreated)
+ if (!sessions.isEmpty())
{
throw new IllegalStateException("Cannot set connector factory after connections have been created");
}
@@ -324,7 +322,7 @@
public void setTransportParams(final Map<String, Object> transportParams)
{
- if (sessionsCreated)
+ if (!sessions.isEmpty())
{
throw new IllegalStateException("Cannot set transport params after connections have been created");
}
@@ -339,7 +337,7 @@
public void setBackupConnectorFactory(final ConnectorFactory connectorFactory)
{
- if (sessionsCreated)
+ if (!sessions.isEmpty())
{
throw new IllegalStateException("Cannot set backup connector factory after connections have been created");
}
@@ -354,7 +352,7 @@
public void setBackupTransportParams(final Map<String, Object> transportParams)
{
- if (sessionsCreated)
+ if (!sessions.isEmpty())
{
throw new IllegalStateException("Cannot set backup transport params after connections have been created");
}
@@ -528,8 +526,6 @@
sessionChannel.setHandler(handler);
- sessionsCreated = true;
-
return session;
}
catch (Throwable t)
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2008-09-17 08:45:41 UTC (rev 4964)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2008-09-17 11:18:49 UTC (rev 4965)
@@ -565,10 +565,12 @@
channel.sendBlocking(new PacketImpl(SESS_CLOSE));
}
- finally
+ catch (Throwable ignore)
{
- doCleanup();
+ //Session close should always return without exception
}
+
+ doCleanup();
}
public ClientMessage createClientMessage(byte type, boolean durable, long expiration, long timestamp, byte priority)
@@ -750,24 +752,20 @@
{
ClientConsumerInternal consumer = consumers.get(consumerID);
- if (consumer == null)
+ if (consumer != null)
{
- throw new IllegalArgumentException("Cannot find consumer with id " + consumerID);
- }
-
- consumer.handleMessage(message);
+ consumer.handleMessage(message);
+ }
}
public void receiveProducerCredits(final long producerID, final int credits) throws Exception
{
ClientProducerInternal producer = producers.get(producerID);
- if (producer == null)
+ if (producer != null)
{
- throw new IllegalArgumentException("Cannot find producer with id " + producerID);
- }
-
- producer.receiveCredits(credits);
+ producer.receiveCredits(credits);
+ }
}
public void handleFailover(final RemotingConnection backupConnection)
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java 2008-09-17 08:45:41 UTC (rev 4964)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java 2008-09-17 11:18:49 UTC (rev 4965)
@@ -488,11 +488,6 @@
private void doWrite(final Packet packet)
{
- if (destroyed)
- {
- return;
- }
-
final MessagingBuffer buffer = transportConnection.createBuffer(PacketImpl.INITIAL_BUFFER_SIZE);
packet.encode(buffer);
@@ -944,6 +939,11 @@
// This must never called by more than one thread concurrently
public Packet sendBlocking(final Packet packet) throws MessagingException
{
+ if (connection.destroyed)
+ {
+ throw new MessagingException(MessagingException.NOT_CONNECTED, "Cannot write to connection - it is destroyed");
+ }
+
lock.readLock().lock();
try
Modified: trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerDeployer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerDeployer.java 2008-09-17 08:45:41 UTC (rev 4964)
+++ trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerDeployer.java 2008-09-17 11:18:49 UTC (rev 4965)
@@ -1,24 +1,24 @@
/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
+ * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat
+ * Middleware LLC, and individual contributors by the @authors tag. See the
+ * copyright.txt in the distribution for a full listing of individual
+ * contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it under the
+ * terms of the GNU Lesser General Public License as published by the Free
+ * Software Foundation; either version 2.1 of the License, or (at your option)
+ * any later version.
+ *
+ * This software is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+ * details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this software; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
+ * site: http://www.fsf.org.
+ */
package org.jboss.messaging.jms.server.impl;
@@ -41,34 +41,52 @@
* @author <a href="ataylor at redhat.com">Andy Taylor</a>
* @author <a href="tim.fox at jboss.com">Tim Fox</a>
*/
-public class JMSServerDeployer extends XmlDeployer
+public class JMSServerDeployer extends XmlDeployer
{
Logger log = Logger.getLogger(JMSServerDeployer.class);
-
+
public static final int DEFAULT_DUPS_OK_BATCH_SIZE = 1000;
private JMSServerManager jmsServerManager;
private static final String CLIENTID_ELEMENT = "client-id";
+
private static final String PING_PERIOD_ELEMENT = "ping-period";
+
private static final String CALL_TIMEOUT_ELEMENT = "call-timeout";
+
private static final String DUPS_OK_BATCH_SIZE_ELEMENT = "dups-ok-batch-size";
+
private static final String CONSUMER_WINDOW_SIZE_ELEMENT = "consumer-window-size";
+
private static final String CONSUMER_MAX_RATE_ELEMENT = "consumer-max-rate";
+
private static final String PRODUCER_WINDOW_SIZE_ELEMENT = "producer-window-size";
+
private static final String PRODUCER_MAX_RATE_ELEMENT = "producer-max-rate";
+
private static final String BLOCK_ON_ACKNOWLEDGE_ELEMENT = "block-on-acknowledge";
+
private static final String SEND_NP_MESSAGES_SYNCHRONOUSLY_ELEMENT = "send-np-messages-synchronously";
+
private static final String SEND_P_MESSAGES_SYNCHRONOUSLY_ELEMENT = "send-p-messages-synchronously";
+
private static final String CONNECTOR_ELEMENT = "connector";
+
private static final String BACKUP_CONNECTOR_ELEMENT = "backup-connector";
+
private static final String FACTORY_CLASS_ELEMENT = "factory-class";
+
private static final String PARAMS_ELEMENT = "params";
+
private static final String PARAM_ELEMENT = "param";
-
+
private static final String ENTRY_NODE_NAME = "entry";
+
private static final String CONNECTION_FACTORY_NODE_NAME = "connection-factory";
+
private static final String QUEUE_NODE_NAME = "queue";
+
private static final String TOPIC_NODE_NAME = "topic";
public JMSServerDeployer(DeploymentManager deploymentManager)
@@ -88,7 +106,7 @@
*/
public String[] getElementTagName()
{
- return new String[]{QUEUE_NODE_NAME, TOPIC_NODE_NAME, CONNECTION_FACTORY_NODE_NAME};
+ return new String[] { QUEUE_NODE_NAME, TOPIC_NODE_NAME, CONNECTION_FACTORY_NODE_NAME };
}
/**
@@ -113,7 +131,7 @@
if (node.getNodeName().equals(CONNECTION_FACTORY_NODE_NAME))
{
NodeList children = node.getChildNodes();
-
+
long pingPeriod = ClientSessionFactoryImpl.DEFAULT_PING_PERIOD;
long callTimeout = ClientSessionFactoryImpl.DEFAULT_CALL_TIMEOUT;
String clientID = null;
@@ -125,13 +143,13 @@
boolean blockOnAcknowledge = ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_ACKNOWLEDGE;
boolean blockOnNonPersistentSend = ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND;
boolean blockOnPersistentSend = ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_PERSISTENT_SEND;
-
+
List<String> jndiBindings = new ArrayList<String>();
- String connectorFactoryClassName = null;
+ String connectorFactoryClassName = null;
Map<String, Object> params = new HashMap<String, Object>();
- String backupConnectorFactoryClassName = null;
+ String backupConnectorFactoryClassName = null;
Map<String, Object> backupParams = new HashMap<String, Object>();
-
+
for (int j = 0; j < children.getLength(); j++)
{
if (PING_PERIOD_ELEMENT.equalsIgnoreCase(children.item(j).getNodeName()))
@@ -186,50 +204,51 @@
else if (CONNECTOR_ELEMENT.equalsIgnoreCase(children.item(j).getNodeName()))
{
NodeList children2 = children.item(j).getChildNodes();
-
+
for (int l = 0; l < children2.getLength(); l++)
- {
+ {
String nodeName = children2.item(l).getNodeName();
-
+
if (FACTORY_CLASS_ELEMENT.equalsIgnoreCase(nodeName))
- {
+ {
connectorFactoryClassName = children2.item(l).getTextContent();
}
else if (PARAMS_ELEMENT.equalsIgnoreCase(nodeName))
- {
+ {
NodeList nlParams = children2.item(l).getChildNodes();
-
+
for (int m = 0; m < nlParams.getLength(); m++)
{
if (PARAM_ELEMENT.equalsIgnoreCase(nlParams.item(m).getNodeName()))
{
Node paramNode = nlParams.item(m);
-
+
NamedNodeMap attributes = paramNode.getAttributes();
-
+
Node nkey = attributes.getNamedItem("key");
-
+
String key = nkey.getTextContent();
-
+
Node nValue = attributes.getNamedItem("value");
-
+
String value = nValue.getTextContent();
-
+
Node nType = attributes.getNamedItem("type");
-
+
String type = nType.getTextContent();
-
+
if (type.equalsIgnoreCase("Integer"))
{
try
{
Integer iVal = Integer.parseInt(value);
-
+
params.put(key, iVal);
}
catch (NumberFormatException e2)
{
- throw new IllegalArgumentException("Remoting acceptor parameter " + value + " is not a valid Integer");
+ throw new IllegalArgumentException("Remoting acceptor parameter " + value +
+ " is not a valid Integer");
}
}
else if (type.equalsIgnoreCase("Long"))
@@ -237,23 +256,24 @@
try
{
Long lVal = Long.parseLong(value);
-
+
params.put(key, lVal);
}
catch (NumberFormatException e2)
{
- throw new IllegalArgumentException("Remoting acceptor parameter " + value + " is not a valid Long");
+ throw new IllegalArgumentException("Remoting acceptor parameter " + value +
+ " is not a valid Long");
}
}
else if (type.equalsIgnoreCase("String"))
{
- params.put(key, value);
+ params.put(key, value);
}
else if (type.equalsIgnoreCase("Boolean"))
{
Boolean lVal = Boolean.parseBoolean(value);
-
- params.put(key, lVal);
+
+ params.put(key, lVal);
}
else
{
@@ -261,56 +281,57 @@
}
}
}
- }
+ }
}
}
else if (BACKUP_CONNECTOR_ELEMENT.equalsIgnoreCase(children.item(j).getNodeName()))
{
NodeList children2 = children.item(j).getChildNodes();
-
+
for (int l = 0; l < children2.getLength(); l++)
- {
+ {
String nodeName = children2.item(l).getNodeName();
-
+
if (FACTORY_CLASS_ELEMENT.equalsIgnoreCase(nodeName))
- {
+ {
backupConnectorFactoryClassName = children2.item(l).getTextContent();
}
else if (PARAMS_ELEMENT.equalsIgnoreCase(nodeName))
- {
+ {
NodeList nlParams = children2.item(l).getChildNodes();
-
+
for (int m = 0; m < nlParams.getLength(); m++)
{
if (PARAM_ELEMENT.equalsIgnoreCase(nlParams.item(m).getNodeName()))
{
Node paramNode = nlParams.item(m);
-
+
NamedNodeMap attributes = paramNode.getAttributes();
-
+
Node nkey = attributes.getNamedItem("key");
-
+
String key = nkey.getTextContent();
-
+
Node nValue = attributes.getNamedItem("value");
-
+
String value = nValue.getTextContent();
-
+
Node nType = attributes.getNamedItem("type");
-
+
String type = nType.getTextContent();
-
+
if (type.equalsIgnoreCase("Integer"))
{
try
{
Integer iVal = Integer.parseInt(value);
-
+
backupParams.put(key, iVal);
}
catch (NumberFormatException e2)
{
- throw new IllegalArgumentException("Remoting acceptor parameter " + value + " is not a valid Integer");
+ throw new IllegalArgumentException("Remoting acceptor parameter " + value +
+ " is not a valid Integer");
}
}
else if (type.equalsIgnoreCase("Long"))
@@ -318,23 +339,24 @@
try
{
Long lVal = Long.parseLong(value);
-
+
backupParams.put(key, lVal);
}
catch (NumberFormatException e2)
{
- throw new IllegalArgumentException("Remoting acceptor parameter " + value + " is not a valid Long");
+ throw new IllegalArgumentException("Remoting acceptor parameter " + value +
+ " is not a valid Long");
}
}
else if (type.equalsIgnoreCase("String"))
{
- backupParams.put(key, value);
+ backupParams.put(key, value);
}
else if (type.equalsIgnoreCase("Boolean"))
{
Boolean lVal = Boolean.parseBoolean(value);
-
- backupParams.put(key, lVal);
+
+ backupParams.put(key, lVal);
}
else
{
@@ -342,33 +364,42 @@
}
}
}
- }
+ }
}
}
}
-
+
if (connectorFactoryClassName == null)
{
throw new IllegalArgumentException("connector-factory-class-name must be specified in configuration");
}
-
- TransportConfiguration connectorConfig =
- new TransportConfiguration(connectorFactoryClassName, params);
-
+
+ TransportConfiguration connectorConfig = new TransportConfiguration(connectorFactoryClassName, params);
+
TransportConfiguration backupConnectorConfig = null;
-
+
if (backupConnectorFactoryClassName != null)
{
backupConnectorConfig = new TransportConfiguration(backupConnectorFactoryClassName, backupParams);
}
-
+
String name = node.getAttributes().getNamedItem(getKeyAttribute()).getNodeValue();
-
- jmsServerManager.createConnectionFactory(name, connectorConfig, backupConnectorConfig,
- pingPeriod, callTimeout, clientID, dupsOKBatchSize,
- consumerWindowSize, consumerMaxRate, producerWindowSize, producerMaxRate,
- blockOnAcknowledge, blockOnNonPersistentSend,
- blockOnPersistentSend, jndiBindings);
+
+ jmsServerManager.createConnectionFactory(name,
+ connectorConfig,
+ backupConnectorConfig,
+ pingPeriod,
+ callTimeout,
+ clientID,
+ dupsOKBatchSize,
+ consumerWindowSize,
+ consumerMaxRate,
+ producerWindowSize,
+ producerMaxRate,
+ blockOnAcknowledge,
+ blockOnNonPersistentSend,
+ blockOnPersistentSend,
+ jndiBindings);
}
else if (node.getNodeName().equals(QUEUE_NODE_NAME))
{
@@ -419,14 +450,14 @@
else if (node.getNodeName().equals(QUEUE_NODE_NAME))
{
String queueName = node.getAttributes().getNamedItem(getKeyAttribute()).getNodeValue();
- //TODO: https://jira.jboss.org/jira/browse/JBMESSAGING-1413
- //jmsServerManager.destroyQueue(queueName);
+ // TODO: https://jira.jboss.org/jira/browse/JBMESSAGING-1413
+ // jmsServerManager.destroyQueue(queueName);
}
else if (node.getNodeName().equals(TOPIC_NODE_NAME))
{
String topicName = node.getAttributes().getNamedItem(getKeyAttribute()).getNodeValue();
- //TODO: https://jira.jboss.org/jira/browse/JBMESSAGING-1413
- //jmsServerManager.destroyTopic(topicName);
+ // TODO: https://jira.jboss.org/jira/browse/JBMESSAGING-1413
+ // jmsServerManager.destroyTopic(topicName);
}
}
Modified: trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerManagerImpl.java 2008-09-17 08:45:41 UTC (rev 4964)
+++ trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerManagerImpl.java 2008-09-17 11:18:49 UTC (rev 4965)
@@ -1,23 +1,23 @@
/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat
+ * Middleware LLC, and individual contributors by the @authors tag. See the
+ * copyright.txt in the distribution for a full listing of individual
+ * contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it under the
+ * terms of the GNU Lesser General Public License as published by the Free
+ * Software Foundation; either version 2.1 of the License, or (at your option)
+ * any later version.
+ *
+ * This software is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+ * details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this software; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
+ * site: http://www.fsf.org.
*/
package org.jboss.messaging.jms.server.impl;
@@ -37,7 +37,6 @@
import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.postoffice.Binding;
import org.jboss.messaging.core.postoffice.PostOffice;
-import org.jboss.messaging.core.remoting.spi.ConnectorFactory;
import org.jboss.messaging.core.settings.HierarchicalRepository;
import org.jboss.messaging.core.settings.impl.QueueSettings;
import org.jboss.messaging.jms.JBossQueue;
@@ -56,8 +55,7 @@
*/
public class JMSServerManagerImpl implements JMSServerManager
{
- private static final Logger log = Logger
- .getLogger(JMSServerManagerImpl.class);
+ private static final Logger log = Logger.getLogger(JMSServerManagerImpl.class);
/**
* the initial context to bind to
@@ -81,9 +79,10 @@
private final JMSManagementService managementService;
public JMSServerManagerImpl(final MessagingServerControlMBean server,
- final PostOffice postOffice, final StorageManager storageManager,
- final HierarchicalRepository<QueueSettings> queueSettingsRepository,
- final JMSManagementService managementService)
+ final PostOffice postOffice,
+ final StorageManager storageManager,
+ final HierarchicalRepository<QueueSettings> queueSettingsRepository,
+ final JMSManagementService managementService)
{
this.messagingServer = server;
this.postOffice = postOffice;
@@ -97,7 +96,8 @@
try
{
initialContext = new InitialContext();
- } catch (NamingException e)
+ }
+ catch (NamingException e)
{
log.error("Unable to create Initial Context", e);
}
@@ -116,8 +116,7 @@
return messagingServer.getVersion();
}
- public boolean createQueue(final String queueName, final String jndiBinding)
- throws Exception
+ public boolean createQueue(final String queueName, final String jndiBinding) throws Exception
{
JBossQueue jBossQueue = new JBossQueue(queueName);
postOffice.addDestination(jBossQueue.getSimpleAddress(), true);
@@ -128,13 +127,16 @@
addToDestinationBindings(queueName, jndiBinding);
}
Binding binding = postOffice.getBinding(jBossQueue.getSimpleAddress());
- managementService.registerQueue(jBossQueue, binding.getQueue(),
- jndiBinding, postOffice, storageManager, queueSettingsRepository);
+ managementService.registerQueue(jBossQueue,
+ binding.getQueue(),
+ jndiBinding,
+ postOffice,
+ storageManager,
+ queueSettingsRepository);
return added;
}
- public boolean createTopic(final String topicName, final String jndiBinding)
- throws Exception
+ public boolean createTopic(final String topicName, final String jndiBinding) throws Exception
{
JBossTopic jBossTopic = new JBossTopic(topicName);
postOffice.addDestination(jBossTopic.getSimpleAddress(), true);
@@ -143,8 +145,7 @@
{
addToDestinationBindings(topicName, jndiBinding);
}
- managementService.registerTopic(jBossTopic, jndiBinding, postOffice,
- storageManager);
+ managementService.registerTopic(jBossTopic, jndiBinding, postOffice, storageManager);
return added;
}
@@ -161,10 +162,8 @@
}
destinations.remove(name);
managementService.unregisterQueue(name);
- postOffice.removeDestination(JBossQueue.createAddressFromName(name),
- false);
- messagingServer.destroyQueue(JBossQueue.createAddressFromName(name)
- .toString());
+ postOffice.removeDestination(JBossQueue.createAddressFromName(name), false);
+ messagingServer.destroyQueue(JBossQueue.createAddressFromName(name).toString());
return true;
}
@@ -182,8 +181,7 @@
}
destinations.remove(name);
managementService.unregisterTopic(name);
- postOffice.removeDestination(JBossTopic.createAddressFromName(name),
- false);
+ postOffice.removeDestination(JBossTopic.createAddressFromName(name), false);
return true;
}
@@ -191,23 +189,33 @@
public boolean createConnectionFactory(String name,
TransportConfiguration connectorConfig,
TransportConfiguration backupConnectorConfig,
- long pingPeriod, long callTimeout, String clientID,
- int dupsOKBatchSize, int consumerWindowSize, int consumerMaxRate,
- int producerWindowSize, int producerMaxRate,
+ long pingPeriod,
+ long callTimeout,
+ String clientID,
+ int dupsOKBatchSize,
+ int consumerWindowSize,
+ int consumerMaxRate,
+ int producerWindowSize,
+ int producerMaxRate,
boolean blockOnAcknowledge,
boolean blockOnNonPersistentSend,
- boolean blockOnPersistentSend, String jndiBinding)
- throws Exception
+ boolean blockOnPersistentSend,
+ String jndiBinding) throws Exception
{
JBossConnectionFactory cf = connectionFactories.get(name);
if (cf == null)
{
cf = new JBossConnectionFactory(connectorConfig,
backupConnectorConfig,
- pingPeriod, callTimeout,
- clientID, dupsOKBatchSize,
- consumerWindowSize, consumerMaxRate, producerWindowSize,
- producerMaxRate, blockOnAcknowledge,
+ pingPeriod,
+ callTimeout,
+ clientID,
+ dupsOKBatchSize,
+ consumerWindowSize,
+ consumerMaxRate,
+ producerWindowSize,
+ producerMaxRate,
+ blockOnAcknowledge,
blockOnNonPersistentSend,
blockOnPersistentSend);
connectionFactories.put(name, cf);
@@ -232,25 +240,35 @@
public boolean createConnectionFactory(String name,
TransportConfiguration connectorConfig,
TransportConfiguration backupConnectorConfig,
- long pingPeriod, long callTimeout, String clientID,
- int dupsOKBatchSize, int consumerWindowSize, int consumerMaxRate,
- int producerWindowSize, int producerMaxRate,
+ long pingPeriod,
+ long callTimeout,
+ String clientID,
+ int dupsOKBatchSize,
+ int consumerWindowSize,
+ int consumerMaxRate,
+ int producerWindowSize,
+ int producerMaxRate,
boolean blockOnAcknowledge,
boolean blockOnNonPersistentSend,
- boolean blockOnPersistentSend, List<String> jndiBindings)
- throws Exception
+ boolean blockOnPersistentSend,
+ List<String> jndiBindings) throws Exception
{
JBossConnectionFactory cf = connectionFactories.get(name);
if (cf == null)
{
cf = new JBossConnectionFactory(connectorConfig,
- backupConnectorConfig,
- pingPeriod, callTimeout,
- clientID, dupsOKBatchSize,
- consumerWindowSize, consumerMaxRate, producerWindowSize,
- producerMaxRate, blockOnAcknowledge,
- blockOnNonPersistentSend,
- blockOnPersistentSend);
+ backupConnectorConfig,
+ pingPeriod,
+ callTimeout,
+ clientID,
+ dupsOKBatchSize,
+ consumerWindowSize,
+ consumerMaxRate,
+ producerWindowSize,
+ producerMaxRate,
+ blockOnAcknowledge,
+ blockOnNonPersistentSend,
+ blockOnPersistentSend);
}
for (String jndiBinding : jndiBindings)
{
@@ -295,8 +313,7 @@
// Private -------------------------------------------------------
- private boolean bindToJndi(final String jndiName, final Object objectToBind)
- throws NamingException
+ private boolean bindToJndi(final String jndiName, final Object objectToBind) throws NamingException
{
String parentContext;
String jndiNameInContext;
@@ -328,8 +345,7 @@
return true;
}
- private void addToDestinationBindings(final String destination,
- final String jndiBinding)
+ private void addToDestinationBindings(final String destination, final String jndiBinding)
{
if (destinations.get(destination) == null)
{
Deleted: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/ReplicationTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/ReplicationTest.java 2008-09-17 08:45:41 UTC (rev 4964)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/ReplicationTest.java 2008-09-17 11:18:49 UTC (rev 4965)
@@ -1,408 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.tests.integration.cluster;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import junit.framework.TestCase;
-
-import org.jboss.messaging.core.client.ClientConsumer;
-import org.jboss.messaging.core.client.ClientMessage;
-import org.jboss.messaging.core.client.ClientProducer;
-import org.jboss.messaging.core.client.ClientSession;
-import org.jboss.messaging.core.client.ClientSessionFactory;
-import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
-import org.jboss.messaging.core.client.impl.ClientSessionImpl;
-import org.jboss.messaging.core.config.Configuration;
-import org.jboss.messaging.core.config.TransportConfiguration;
-import org.jboss.messaging.core.config.impl.ConfigurationImpl;
-import org.jboss.messaging.core.exception.MessagingException;
-import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.remoting.RemotingConnection;
-import org.jboss.messaging.core.remoting.impl.ConnectionRegistryImpl;
-import org.jboss.messaging.core.remoting.impl.invm.InVMRegistry;
-import org.jboss.messaging.core.remoting.impl.invm.TransportConstants;
-import org.jboss.messaging.core.server.MessagingService;
-import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
-import org.jboss.messaging.jms.client.JBossTextMessage;
-import org.jboss.messaging.util.SimpleString;
-
-public class ReplicationTest extends TestCase
-{
- private static final Logger log = Logger.getLogger(ReplicationTest.class);
-
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- private static final SimpleString ADDRESS = new SimpleString("FailoverTestAddress");
-
- private MessagingService liveService;
-
- private MessagingService backupService;
-
- private Map<String, Object> backupParams = new HashMap<String, Object>();
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
-
- public void testReplication() throws Exception
- {
- ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
-
- ClientSession session = sf.createSession(false, true, true, -1, false);
-
- session.createQueue(ADDRESS, ADDRESS, null, false, false);
-
- ClientProducer producer = session.createProducer(ADDRESS);
-
- final int numMessages = 1000;
-
- log.info("starting");
-
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = session.createClientMessage(JBossTextMessage.TYPE, false, 0,
- System.currentTimeMillis(), (byte) 1);
- message.putIntProperty(new SimpleString("count"), i);
- message.getBody().putString("aardvarks");
- message.getBody().flip();
- producer.send(message);
- }
-
- log.info("Sent messages");
-
- ClientConsumer consumer = session.createConsumer(ADDRESS);
-
- session.start();
-
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message2 = consumer.receive();
-
- // log.info("Got message " + message2);
-
- assertEquals("aardvarks", message2.getBody().getString());
- assertEquals(i, message2.getProperty(new SimpleString("count")));
-
- session.acknowledge();
- }
-
- log.info("done");
-
- ClientMessage message3 = consumer.receive(500);
-
- assertNull(message3);
-
- log.info("Got all messages");
-
- session.close();
- }
-
-
- public void testFailoverSameConnectionFactory() throws Exception
- {
- ClientSessionFactory sf =
- new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"),
- new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory", backupParams));
-
- ClientSession session = sf.createSession(false, true, true, -1, false);
-
- session.createQueue(ADDRESS, ADDRESS, null, false, false);
-
- ClientProducer producer = session.createProducer(ADDRESS);
-
- final int numMessages = 1000;
-
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = session.createClientMessage(JBossTextMessage.TYPE, false, 0,
- System.currentTimeMillis(), (byte) 1);
- message.putIntProperty(new SimpleString("count"), i);
- message.getBody().putString("aardvarks");
- message.getBody().flip();
- producer.send(message);
- }
-
- RemotingConnection conn = ((ClientSessionImpl)session).getConnection();
-
- //Simulate failure on connection
- conn.fail(new MessagingException(MessagingException.NOT_CONNECTED));
-
- ClientConsumer consumer = session.createConsumer(ADDRESS);
-
- session.start();
-
- for (int i = 0; i < numMessages / 2; i++)
- {
- ClientMessage message2 = consumer.receive();
-
- assertEquals("aardvarks", message2.getBody().getString());
-
- assertEquals(i, message2.getProperty(new SimpleString("count")));
-
- session.acknowledge();
-
- //log.info("got message " + message2.getProperty(new SimpleString("blah")));
- }
-
- session.close();
-
- session = sf.createSession(false, true, true, -1, false);
-
- consumer = session.createConsumer(ADDRESS);
-
- session.start();
-
- for (int i = numMessages / 2 ; i < numMessages; i++)
- {
- ClientMessage message2 = consumer.receive();
-
- assertEquals("aardvarks", message2.getBody().getString());
-
- assertEquals(i, message2.getProperty(new SimpleString("count")));
-
- session.acknowledge();
-
- //log.info("got message " + message2.getProperty(new SimpleString("blah")));
- }
-
- ClientMessage message3 = consumer.receive(500);
-
- session.close();
-
- assertNull(message3);
- }
-
- public void testFailoverChangeConnectionFactory() throws Exception
- {
- ClientSessionFactory sf =
- new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"),
- new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory", backupParams));
-
- ClientSession session = sf.createSession(false, true, true, -1, false);
-
- session.createQueue(ADDRESS, ADDRESS, null, false, false);
-
- ClientProducer producer = session.createProducer(ADDRESS);
-
- final int numMessages = 1000;
-
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = session.createClientMessage(JBossTextMessage.TYPE, false, 0,
- System.currentTimeMillis(), (byte) 1);
- message.putIntProperty(new SimpleString("count"), i);
- message.getBody().putString("aardvarks");
- message.getBody().flip();
- producer.send(message);
- // log.info("sent " + i);
- }
-
- RemotingConnection conn = ((ClientSessionImpl)session).getConnection();
-
- //Simulate failure on connection
- conn.fail(new MessagingException(MessagingException.NOT_CONNECTED));
-
- ClientConsumer consumer = session.createConsumer(ADDRESS);
-
- session.start();
-
- for (int i = 0; i < numMessages / 2; i++)
- {
- ClientMessage message2 = consumer.receive();
-
- assertEquals("aardvarks", message2.getBody().getString());
-
- assertEquals(i, message2.getProperty(new SimpleString("count")));
-
- session.acknowledge();
-
- //log.info("got message " + message2.getProperty(new SimpleString("blah")));
- }
-
- session.close();
-
- sf =
- new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory", backupParams));
-
- log.info("** creating new one");
-
- session = sf.createSession(false, true, true, -1, false);
-
- consumer = session.createConsumer(ADDRESS);
-
- session.start();
-
- for (int i = numMessages / 2 ; i < numMessages; i++)
- {
- ClientMessage message2 = consumer.receive();
-
- assertEquals("aardvarks", message2.getBody().getString());
-
- assertEquals(i, message2.getProperty(new SimpleString("count")));
-
- session.acknowledge();
-
- //log.info("got message " + message2.getProperty(new SimpleString("blah")));
- }
-
- ClientMessage message3 = consumer.receive(500);
-
- assertNull(message3);
-
- session.close();
- }
-
- public void testFailoverMultipleSessions() throws Exception
- {
- ClientSessionFactory sf =
- new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"),
- new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory", backupParams));
-
- final int numSessions = 10;
-
- List<ClientSession> sessions = new ArrayList<ClientSession>();
-
- List<ClientConsumer> consumers = new ArrayList<ClientConsumer>();
-
- for (int i = 0; i < numSessions; i++)
- {
- ClientSession sess = sf.createSession(false, true, true, -1, false);
-
- SimpleString queueName = new SimpleString("subscription" + i);
-
- sess.createQueue(ADDRESS, queueName, null, false, false);
-
- ClientConsumer consumer = sess.createConsumer(queueName);
-
- sess.start();
-
- sessions.add(sess);
-
- consumers.add(consumer);
- }
-
- log.info("Created consumers");
-
- ClientSession session = sf.createSession(false, true, true, -1, false);
-
- ClientProducer producer = session.createProducer(ADDRESS);
-
- final int numMessages = 100;
-
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = session.createClientMessage(JBossTextMessage.TYPE, false, 0,
- System.currentTimeMillis(), (byte) 1);
- message.putIntProperty(new SimpleString("count"), i);
- message.getBody().putString("aardvarks");
- message.getBody().flip();
- producer.send(message);
- // log.info("sent " + i);
- }
-
- RemotingConnection conn = ((ClientSessionImpl)session).getConnection();
-
- //Simulate failure on connection
- conn.fail(new MessagingException(MessagingException.NOT_CONNECTED));
-
- for (int i = 0; i < numSessions; i++)
- {
- ClientConsumer cons = consumers.get(i);
-
- ClientSession sess = sessions.get(i);
-
- for (int j = 0; j < numMessages; j++)
- {
- ClientMessage message2 = cons.receive();
-
- assertEquals("aardvarks", message2.getBody().getString());
-
- //log.info("got message " + i + ":" + message2.getProperty(new SimpleString("count")));
-
- assertEquals(j, message2.getProperty(new SimpleString("count")));
-
- sess.acknowledge();
- }
- }
-
- session.close();
-
- for (int i = 0; i < numSessions; i++)
- {
- ClientSession sess = sessions.get(i);
-
- sess.close();
- }
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- protected void setUp() throws Exception
- {
- Configuration backupConf = new ConfigurationImpl();
- backupConf.setSecurityEnabled(false);
- backupConf.setPacketConfirmationBatchSize(1);
- backupParams.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
- backupConf.getAcceptorConfigurations().add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory", backupParams));
- backupConf.setBackup(true);
- backupService = MessagingServiceImpl.newNullStorageMessagingServer(backupConf);
- backupService.start();
-
- Configuration liveConf = new ConfigurationImpl();
- liveConf.setSecurityEnabled(false);
- liveConf.setPacketConfirmationBatchSize(1);
- liveConf.getAcceptorConfigurations().add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory"));
- liveConf.setBackupConnectorConfiguration(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory", backupParams));
- liveService = MessagingServiceImpl.newNullStorageMessagingServer(liveConf);
- liveService.start();
- }
-
- protected void tearDown() throws Exception
- {
- assertEquals(0, ConnectionRegistryImpl.instance.size());
-
- assertEquals(0, backupService.getServer().getRemotingService().getConnections().size());
-
- backupService.stop();
-
- assertEquals(0, liveService.getServer().getRemotingService().getConnections().size());
-
- liveService.stop();
-
- assertEquals(0, InVMRegistry.instance.size());
- }
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-}
Added: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/SimpleAutomaticFailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/SimpleAutomaticFailoverTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/SimpleAutomaticFailoverTest.java 2008-09-17 11:18:49 UTC (rev 4965)
@@ -0,0 +1,714 @@
+/*
+ * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat
+ * Middleware LLC, and individual contributors by the @authors tag. See the
+ * copyright.txt in the distribution for a full listing of individual
+ * contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it under the
+ * terms of the GNU Lesser General Public License as published by the Free
+ * Software Foundation; either version 2.1 of the License, or (at your option)
+ * any later version.
+ *
+ * This software is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+ * details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this software; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
+ * site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.integration.cluster;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import junit.framework.TestCase;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.client.impl.ClientSessionImpl;
+import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.impl.ConfigurationImpl;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.FailureListener;
+import org.jboss.messaging.core.remoting.RemotingConnection;
+import org.jboss.messaging.core.remoting.impl.ConnectionRegistryImpl;
+import org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory;
+import org.jboss.messaging.core.remoting.impl.invm.InVMRegistry;
+import org.jboss.messaging.core.remoting.impl.invm.TransportConstants;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
+import org.jboss.messaging.jms.client.JBossTextMessage;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ *
+ * A SimpleAutomaticFailoverTest
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public class SimpleAutomaticFailoverTest extends TestCase
+{
+ private static final Logger log = Logger.getLogger(SimpleAutomaticFailoverTest.class);
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private static final SimpleString ADDRESS = new SimpleString("FailoverTestAddress");
+
+ private MessagingService liveService;
+
+ private MessagingService backupService;
+
+ private Map<String, Object> backupParams = new HashMap<String, Object>();
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ /**
+ * Baseline send/receive check in which no connection failure is simulated. The session factory is
+ * built from a single in-VM connector configuration (no second, backup configuration is passed).
+ * 1000 messages are sent to ADDRESS, consumed on the same session in "count" order, and a final
+ * timed receive must return null.
+ */
+ public void testReplication() throws Exception
+ {
+ ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
+
+ ClientSession session = sf.createSession(false, true, true, -1, false);
+
+ // ADDRESS is used both as the address and as the queue name
+ session.createQueue(ADDRESS, ADDRESS, null, false, false);
+
+ ClientProducer producer = session.createProducer(ADDRESS);
+
+ final int numMessages = 1000;
+
+ // Each message carries its index in the "count" property so the receive loop can verify ordering
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session.createClientMessage(JBossTextMessage.TYPE,
+ false,
+ 0,
+ System.currentTimeMillis(),
+ (byte)1);
+ message.putIntProperty(new SimpleString("count"), i);
+ message.getBody().putString("aardvarks");
+ message.getBody().flip();
+ producer.send(message);
+ }
+
+ ClientConsumer consumer = session.createConsumer(ADDRESS);
+
+ session.start();
+
+ // Messages must come back with the expected body and in the order they were sent
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message2 = consumer.receive();
+
+ // log.info("Got message " + message2);
+
+ assertEquals("aardvarks", message2.getBody().getString());
+ assertEquals(i, message2.getProperty(new SimpleString("count")));
+
+ session.acknowledge();
+ }
+
+ // After all numMessages have been received and acknowledged, nothing more may arrive
+ ClientMessage message3 = consumer.receive(250);
+
+ assertNull(message3);
+
+ session.close();
+ }
+
+ /**
+ * Sends 1000 messages, simulates a failure of the session's connection, then consumes the first half
+ * of the messages on the same session and the second half on a new session created from the same
+ * factory. The factory is built from two connector configurations, the second one using backupParams.
+ * Finally checks that no message is left.
+ * NOTE(review): the test relies on the existing session remaining usable after conn.fail(...),
+ * presumably by failing over to the backup server; the failover mechanism is not visible here.
+ */
+ public void testFailoverSameConnectionFactory() throws Exception
+ {
+ ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"),
+ new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+ backupParams));
+
+ ClientSession session = sf.createSession(false, true, true, -1, false);
+
+ session.createQueue(ADDRESS, ADDRESS, null, false, false);
+
+ ClientProducer producer = session.createProducer(ADDRESS);
+
+ final int numMessages = 1000;
+
+ // Each message carries its index in the "count" property so ordering can be verified later
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session.createClientMessage(JBossTextMessage.TYPE,
+ false,
+ 0,
+ System.currentTimeMillis(),
+ (byte)1);
+ message.putIntProperty(new SimpleString("count"), i);
+ message.getBody().putString("aardvarks");
+ message.getBody().flip();
+ producer.send(message);
+ }
+
+ RemotingConnection conn = ((ClientSessionImpl)session).getConnection();
+
+ // Simulate failure on connection
+ conn.fail(new MessagingException(MessagingException.NOT_CONNECTED));
+
+ // The consumer is created on the original session object, after the simulated failure
+ ClientConsumer consumer = session.createConsumer(ADDRESS);
+
+ session.start();
+
+ // First half: messages 0 .. numMessages / 2 - 1, in order
+ for (int i = 0; i < numMessages / 2; i++)
+ {
+ ClientMessage message2 = consumer.receive();
+
+ assertEquals("aardvarks", message2.getBody().getString());
+
+ assertEquals(i, message2.getProperty(new SimpleString("count")));
+
+ session.acknowledge();
+ }
+
+ session.close();
+
+ // Second half is read through a fresh session obtained from the same factory
+ session = sf.createSession(false, true, true, -1, false);
+
+ consumer = session.createConsumer(ADDRESS);
+
+ session.start();
+
+ for (int i = numMessages / 2; i < numMessages; i++)
+ {
+ ClientMessage message2 = consumer.receive();
+
+ assertEquals("aardvarks", message2.getBody().getString());
+
+ assertEquals(i, message2.getProperty(new SimpleString("count")));
+
+ session.acknowledge();
+ }
+
+ ClientMessage message3 = consumer.receive(250);
+
+ // The session is closed before asserting, so it is closed even when the assertion fails
+ session.close();
+
+ assertNull(message3);
+ }
+
+ /**
+ * Same scenario as consuming half of the messages after a simulated connection failure, except that
+ * the second half is consumed through a session from a new factory whose only connector configuration
+ * uses backupParams (the parameters given to the backup server's acceptor in setUp).
+ * Finally checks that no message is left.
+ */
+ public void testFailoverChangeConnectionFactory() throws Exception
+ {
+ ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"),
+ new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+ backupParams));
+
+ ClientSession session = sf.createSession(false, true, true, -1, false);
+
+ session.createQueue(ADDRESS, ADDRESS, null, false, false);
+
+ ClientProducer producer = session.createProducer(ADDRESS);
+
+ final int numMessages = 1000;
+
+ // Each message carries its index in the "count" property so ordering can be verified later
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session.createClientMessage(JBossTextMessage.TYPE,
+ false,
+ 0,
+ System.currentTimeMillis(),
+ (byte)1);
+ message.putIntProperty(new SimpleString("count"), i);
+ message.getBody().putString("aardvarks");
+ message.getBody().flip();
+ producer.send(message);
+ }
+
+ RemotingConnection conn = ((ClientSessionImpl)session).getConnection();
+
+ // Simulate failure on connection
+ conn.fail(new MessagingException(MessagingException.NOT_CONNECTED));
+
+ // The consumer is created on the original session object, after the simulated failure
+ ClientConsumer consumer = session.createConsumer(ADDRESS);
+
+ session.start();
+
+ // First half: messages 0 .. numMessages / 2 - 1, in order
+ for (int i = 0; i < numMessages / 2; i++)
+ {
+ ClientMessage message2 = consumer.receive();
+
+ assertEquals("aardvarks", message2.getBody().getString());
+
+ assertEquals(i, message2.getProperty(new SimpleString("count")));
+
+ session.acknowledge();
+ }
+
+ session.close();
+
+ // New factory configured with a single connector that uses backupParams
+ sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+ backupParams));
+
+ session = sf.createSession(false, true, true, -1, false);
+
+ consumer = session.createConsumer(ADDRESS);
+
+ session.start();
+
+ // Second half must still be available, in order, through the new factory
+ for (int i = numMessages / 2; i < numMessages; i++)
+ {
+ ClientMessage message2 = consumer.receive();
+
+ assertEquals("aardvarks", message2.getBody().getString());
+
+ assertEquals(i, message2.getProperty(new SimpleString("count")));
+
+ session.acknowledge();
+ }
+
+ ClientMessage message3 = consumer.receive(250);
+
+ assertNull(message3);
+
+ session.close();
+ }
+
+   /**
+    * Checks that messages consumed and acknowledged after a simulated connection failure are really
+    * gone. 1000 messages are sent, the session's connection is failed, all messages are consumed on the
+    * same session, and a new session from a factory whose only connector uses backupParams must then
+    * receive nothing.
+    * The verification session is started before the final receive, as every other receiving session
+    * in this class is, so that the null result reflects an empty queue and not an idle consumer.
+    */
+   public void testNoMessagesLeftAfterFailoverNewSession() throws Exception
+   {
+      ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"),
+                                                             new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+                                                                                        backupParams));
+
+      ClientSession session = sf.createSession(false, true, true, -1, false);
+
+      session.createQueue(ADDRESS, ADDRESS, null, false, false);
+
+      ClientProducer producer = session.createProducer(ADDRESS);
+
+      final int numMessages = 1000;
+
+      // Each message carries its index in the "count" property so ordering can be verified later
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = session.createClientMessage(JBossTextMessage.TYPE,
+                                                             false,
+                                                             0,
+                                                             System.currentTimeMillis(),
+                                                             (byte)1);
+         message.putIntProperty(new SimpleString("count"), i);
+         message.getBody().putString("aardvarks");
+         message.getBody().flip();
+         producer.send(message);
+      }
+
+      RemotingConnection conn = ((ClientSessionImpl)session).getConnection();
+
+      // Simulate failure on connection
+      conn.fail(new MessagingException(MessagingException.NOT_CONNECTED));
+
+      ClientConsumer consumer = session.createConsumer(ADDRESS);
+
+      session.start();
+
+      // Consume and acknowledge every message on the original session
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message2 = consumer.receive();
+
+         assertEquals("aardvarks", message2.getBody().getString());
+
+         assertEquals(i, message2.getProperty(new SimpleString("count")));
+
+         session.acknowledge();
+      }
+
+      session.close();
+
+      // New factory configured with a single connector that uses backupParams
+      sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+                                                                   backupParams));
+
+      session = sf.createSession(false, true, true, -1, false);
+
+      consumer = session.createConsumer(ADDRESS);
+
+      // Start the session before receiving; otherwise the assertNull below could pass without
+      // proving that the queue is empty
+      session.start();
+
+      ClientMessage message3 = consumer.receive(250);
+
+      assertNull(message3);
+
+      session.close();
+   }
+
+   /**
+    * Checks that a FailureListener registered on the session's connection is notified when that
+    * connection is failed, and that the session can still be used afterwards. 1000 messages are sent,
+    * a listener is added, the connection is failed, and the listener must fire within 1000 ms. All
+    * messages are then consumed on the same session, and a new session from a factory whose only
+    * connector uses backupParams must receive nothing.
+    * The verification session is started before the final receive, as every other receiving session
+    * in this class is, so that the null result reflects an empty queue and not an idle consumer.
+    */
+   public void testFailureListenerCalledOnFailure() throws Exception
+   {
+      ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"),
+                                                             new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+                                                                                        backupParams));
+
+      ClientSession session = sf.createSession(false, true, true, -1, false);
+
+      session.createQueue(ADDRESS, ADDRESS, null, false, false);
+
+      ClientProducer producer = session.createProducer(ADDRESS);
+
+      final int numMessages = 1000;
+
+      // Each message carries its index in the "count" property so ordering can be verified later
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = session.createClientMessage(JBossTextMessage.TYPE,
+                                                             false,
+                                                             0,
+                                                             System.currentTimeMillis(),
+                                                             (byte)1);
+         message.putIntProperty(new SimpleString("count"), i);
+         message.getBody().putString("aardvarks");
+         message.getBody().flip();
+         producer.send(message);
+      }
+
+      RemotingConnection conn = ((ClientSessionImpl)session).getConnection();
+
+      final CountDownLatch latch = new CountDownLatch(1);
+
+      // Releases the latch as soon as the connection reports a failure
+      class MyListener implements FailureListener
+      {
+         public void connectionFailed(MessagingException me)
+         {
+            latch.countDown();
+         }
+      }
+
+      conn.addFailureListener(new MyListener());
+
+      // Simulate failure on connection
+      conn.fail(new MessagingException(MessagingException.NOT_CONNECTED));
+
+      boolean ok = latch.await(1000, TimeUnit.MILLISECONDS);
+
+      assertTrue(ok);
+
+      ClientConsumer consumer = session.createConsumer(ADDRESS);
+
+      session.start();
+
+      // Consume and acknowledge every message on the original session
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message2 = consumer.receive();
+
+         assertEquals("aardvarks", message2.getBody().getString());
+
+         assertEquals(i, message2.getProperty(new SimpleString("count")));
+
+         session.acknowledge();
+      }
+
+      session.close();
+
+      // New factory configured with a single connector that uses backupParams
+      sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+                                                                   backupParams));
+
+      session = sf.createSession(false, true, true, -1, false);
+
+      consumer = session.createConsumer(ADDRESS);
+
+      // Start the session before receiving; otherwise the assertNull below could pass without
+      // proving that the queue is empty
+      session.start();
+
+      ClientMessage message3 = consumer.receive(250);
+
+      assertNull(message3);
+
+      session.close();
+   }
+
+ /**
+ * Creates 10 sessions from one factory, each with its own queue ("subscription" + i) bound to ADDRESS
+ * and a started consumer. A further session sends 100 messages to ADDRESS, and that session's
+ * connection is then failed. Every consumer must still receive all 100 messages in "count" order and
+ * nothing afterwards.
+ * NOTE(review): whether the sessions share the failed RemotingConnection is not visible in this test.
+ */
+ public void testFailoverMultipleSessions() throws Exception
+ {
+ ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"),
+ new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+ backupParams));
+
+ final int numSessions = 10;
+
+ List<ClientSession> sessions = new ArrayList<ClientSession>();
+
+ List<ClientConsumer> consumers = new ArrayList<ClientConsumer>();
+
+ // One session, one queue and one consumer per index; sessions.get(i) owns consumers.get(i)
+ for (int i = 0; i < numSessions; i++)
+ {
+ ClientSession sess = sf.createSession(false, true, true, -1, false);
+
+ SimpleString queueName = new SimpleString("subscription" + i);
+
+ sess.createQueue(ADDRESS, queueName, null, false, false);
+
+ ClientConsumer consumer = sess.createConsumer(queueName);
+
+ sess.start();
+
+ sessions.add(sess);
+
+ consumers.add(consumer);
+ }
+
+ // Separate session used only for producing
+ ClientSession session = sf.createSession(false, true, true, -1, false);
+
+ ClientProducer producer = session.createProducer(ADDRESS);
+
+ final int numMessages = 100;
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session.createClientMessage(JBossTextMessage.TYPE,
+ false,
+ 0,
+ System.currentTimeMillis(),
+ (byte)1);
+ message.putIntProperty(new SimpleString("count"), i);
+ message.getBody().putString("aardvarks");
+ message.getBody().flip();
+ producer.send(message);
+ }
+
+ RemotingConnection conn = ((ClientSessionImpl)session).getConnection();
+
+ // Simulate failure on connection
+ conn.fail(new MessagingException(MessagingException.NOT_CONNECTED));
+
+ // Each consumer must see the complete, ordered set of messages and then nothing more
+ for (int i = 0; i < numSessions; i++)
+ {
+ ClientConsumer cons = consumers.get(i);
+
+ ClientSession sess = sessions.get(i);
+
+ for (int j = 0; j < numMessages; j++)
+ {
+ ClientMessage message2 = cons.receive();
+
+ assertEquals("aardvarks", message2.getBody().getString());
+
+ assertEquals(j, message2.getProperty(new SimpleString("count")));
+
+ sess.acknowledge();
+ }
+
+ ClientMessage message3 = cons.receive(250);
+
+ assertNull(message3);
+ }
+
+ session.close();
+
+ for (int i = 0; i < numSessions; i++)
+ {
+ ClientSession sess = sessions.get(i);
+
+ sess.close();
+ }
+ }
+
+   /**
+    * Checks when the connector settings of a ClientSessionFactory may be changed. Before any session is
+    * created, and again after the only session has been closed, the four setters (connector factory,
+    * transport params, backup connector factory, backup transport params) must succeed. While a
+    * session is open, each of them must throw IllegalStateException.
+    */
+   public void testCantSetConnectorsAfterCreateSession() throws Exception
+   {
+      ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"),
+                                                             new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+                                                                                        backupParams));
+
+      // No session exists yet: all setters are expected to succeed
+      sf.setConnectorFactory(new InVMConnectorFactory());
+      sf.setTransportParams(new HashMap<String, Object>());
+      sf.setBackupConnectorFactory(new InVMConnectorFactory());
+      sf.setBackupTransportParams(new HashMap<String, Object>());
+
+      ClientSession sess = null;
+
+      try
+      {
+         sess = sf.createSession(false, true, true, -1, false);
+
+         // With an open session every setter must be rejected
+         try
+         {
+            sf.setConnectorFactory(new InVMConnectorFactory());
+
+            fail("Should throw exception");
+         }
+         catch (IllegalStateException e)
+         {
+            // Ok
+         }
+
+         try
+         {
+            sf.setTransportParams(new HashMap<String, Object>());
+
+            fail("Should throw exception");
+         }
+         catch (IllegalStateException e)
+         {
+            // Ok
+         }
+
+         try
+         {
+            sf.setBackupConnectorFactory(new InVMConnectorFactory());
+
+            fail("Should throw exception");
+         }
+         catch (IllegalStateException e)
+         {
+            // Ok
+         }
+
+         try
+         {
+            sf.setBackupTransportParams(new HashMap<String, Object>());
+
+            fail("Should throw exception");
+         }
+         catch (IllegalStateException e)
+         {
+            // Ok
+         }
+      }
+      finally
+      {
+         // createSession itself may have thrown, leaving sess null; closing unconditionally would
+         // replace the real failure with a NullPointerException
+         if (sess != null)
+         {
+            sess.close();
+         }
+      }
+
+      // The session is closed again: all setters are expected to succeed once more
+      sf.setConnectorFactory(new InVMConnectorFactory());
+      sf.setTransportParams(new HashMap<String, Object>());
+      sf.setBackupConnectorFactory(new InVMConnectorFactory());
+      sf.setBackupTransportParams(new HashMap<String, Object>());
+   }
+
+ /**
+ * Checks what happens when the connection fails a second time. 1000 messages are sent, the session's
+ * connection is failed, and half of the messages are consumed on the same session. The session must
+ * then report a different connection object (conn2). After conn2 is failed as well, a listener
+ * registered on it must fire within 1000 ms, and a subsequent createQueue on the session must throw a
+ * MessagingException with code NOT_CONNECTED.
+ */
+ public void testFailureAfterFailover() throws Exception
+ {
+ ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"),
+ new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+ backupParams));
+
+ ClientSession session = sf.createSession(false, true, true, -1, false);
+
+ session.createQueue(ADDRESS, ADDRESS, null, false, false);
+
+ ClientProducer producer = session.createProducer(ADDRESS);
+
+ final int numMessages = 1000;
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session.createClientMessage(JBossTextMessage.TYPE,
+ false,
+ 0,
+ System.currentTimeMillis(),
+ (byte)1);
+ message.putIntProperty(new SimpleString("count"), i);
+ message.getBody().putString("aardvarks");
+ message.getBody().flip();
+ producer.send(message);
+ }
+
+ RemotingConnection conn = ((ClientSessionImpl)session).getConnection();
+
+ // Simulate failure on connection
+ conn.fail(new MessagingException(MessagingException.NOT_CONNECTED));
+
+ ClientConsumer consumer = session.createConsumer(ADDRESS);
+
+ session.start();
+
+ // Consume half of them
+
+ for (int i = 0; i < numMessages / 2; i++)
+ {
+ ClientMessage message2 = consumer.receive();
+
+ assertEquals("aardvarks", message2.getBody().getString());
+
+ assertEquals(i, message2.getProperty(new SimpleString("count")));
+
+ session.acknowledge();
+ }
+
+ // Connection the session is using now, fetched again after the first simulated failure
+ RemotingConnection conn2 = ((ClientSessionImpl)session).getConnection();
+
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ // Releases the latch as soon as the connection reports a failure
+ class MyListener implements FailureListener
+ {
+ public void connectionFailed(MessagingException me)
+ {
+ latch.countDown();
+ }
+ }
+
+ conn2.addFailureListener(new MyListener());
+
+ // The session must no longer be on the connection that was failed first
+ assertFalse(conn == conn2);
+
+ // Second simulated failure, this time on the connection currently in use
+ conn2.fail(new MessagingException(MessagingException.NOT_CONNECTED));
+
+ boolean ok = latch.await(1000, TimeUnit.MILLISECONDS);
+
+ assertTrue(ok);
+
+ // After the second failure, operations on the session are expected to fail with NOT_CONNECTED
+ try
+ {
+ session.createQueue(new SimpleString("blah"), new SimpleString("blah"), null, false, false);
+
+ fail("Should throw exception");
+ }
+ catch (MessagingException me)
+ {
+ assertEquals(MessagingException.NOT_CONNECTED, me.getCode());
+ }
+
+ session.close();
+ }
+
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ protected void setUp() throws Exception
+ {
+ Configuration backupConf = new ConfigurationImpl();
+ backupConf.setSecurityEnabled(false);
+ backupConf.setPacketConfirmationBatchSize(10);
+ backupParams.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
+ backupConf.getAcceptorConfigurations()
+ .add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory",
+ backupParams));
+ backupConf.setBackup(true);
+ backupService = MessagingServiceImpl.newNullStorageMessagingServer(backupConf);
+ backupService.start();
+
+ Configuration liveConf = new ConfigurationImpl();
+ liveConf.setSecurityEnabled(false);
+ liveConf.setPacketConfirmationBatchSize(10);
+ liveConf.getAcceptorConfigurations()
+ .add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory"));
+ liveConf.setBackupConnectorConfiguration(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+ backupParams));
+ liveService = MessagingServiceImpl.newNullStorageMessagingServer(liveConf);
+ liveService.start();
+ }
+
+ /**
+  * Checks that the test left no connections behind and stops both services.
+  *
+  * The leak checks run in the same order as before (client connection registry,
+  * backup server connections, stop backup, live server connections, stop live,
+  * in-VM registry). The stop calls now sit in {@code finally} blocks, so a failed
+  * check no longer leaves a server running and leaking into later tests.
+  */
+ protected void tearDown() throws Exception
+ {
+    try
+    {
+       try
+       {
+          assertEquals(0, ConnectionRegistryImpl.instance.size());
+
+          assertEquals(0, backupService.getServer().getRemotingService().getConnections().size());
+       }
+       finally
+       {
+          // Always stop the backup, even if a leak check above failed.
+          backupService.stop();
+       }
+
+       assertEquals(0, liveService.getServer().getRemotingService().getConnections().size());
+    }
+    finally
+    {
+       // Always stop the live server, whatever happened above.
+       liveService.stop();
+    }
+
+    // Only meaningful once both services have been stopped.
+    assertEquals(0, InVMRegistry.instance.size());
+ }
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
More information about the jboss-cvs-commits
mailing list