[jboss-cvs] JBoss Messaging SVN: r1895 - in trunk: src/main/org/jboss/jms/client src/main/org/jboss/jms/client/container src/main/org/jboss/jms/client/delegate src/main/org/jboss/jms/client/state src/main/org/jboss/jms/server/endpoint src/main/org/jboss/jms/server/endpoint/advised tests/src/org/jboss/test/messaging/jms/clustering
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Jan 5 00:56:17 EST 2007
Author: ovidiu.feodorov at jboss.com
Date: 2007-01-05 00:56:09 -0500 (Fri, 05 Jan 2007)
New Revision: 1895
Modified:
trunk/src/main/org/jboss/jms/client/JBossConnectionConsumer.java
trunk/src/main/org/jboss/jms/client/JBossSession.java
trunk/src/main/org/jboss/jms/client/container/StateCreationAspect.java
trunk/src/main/org/jboss/jms/client/delegate/ClientBrowserDelegate.java
trunk/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
trunk/src/main/org/jboss/jms/client/state/BrowserState.java
trunk/src/main/org/jboss/jms/client/state/SessionState.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerBrowserEndpoint.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
trunk/src/main/org/jboss/jms/server/endpoint/SessionEndpoint.java
trunk/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java
trunk/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java
Log:
http://jira.jboss.org/jira/browse/JBMESSAGING-710
Modified: trunk/src/main/org/jboss/jms/client/JBossConnectionConsumer.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/JBossConnectionConsumer.java 2007-01-05 01:40:23 UTC (rev 1894)
+++ trunk/src/main/org/jboss/jms/client/JBossConnectionConsumer.java 2007-01-05 05:56:09 UTC (rev 1895)
@@ -25,7 +25,6 @@
import java.util.List;
import javax.jms.ConnectionConsumer;
-import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
@@ -78,9 +77,6 @@
private int consumerID;
- /** The destination this consumer will receive messages from */
- private Destination destination;
-
/** The ServerSessionPool that is implemented by the AS */
private ServerSessionPool serverSessionPool;
@@ -108,25 +104,13 @@
// Constructors --------------------------------------------------
- /**
- * JBossConnectionConsumer constructor
- *
- * @param conn the connection
- * @param dest destination
- * @param messageSelector the message selector
- * @param sessPool the server session pool
- * @param maxMessages the maxmimum messages
- * @exception JMSException for any error
- */
- public JBossConnectionConsumer(ConnectionDelegate conn, JBossDestination dest,
+ public JBossConnectionConsumer(ConnectionDelegate conn, JBossDestination dest,
String subName, String messageSelector,
ServerSessionPool sessPool, int maxMessages) throws JMSException
{
- trace = log.isTraceEnabled();
-
- this.destination = dest;
this.serverSessionPool = sessPool;
this.maxMessages = maxMessages;
+
if (this.maxMessages < 1)
{
this.maxMessages = 1;
@@ -138,12 +122,11 @@
{
tccc.set(getClass().getClassLoader());
- // Create a consumer.
- // The MessageCallbackhandler knows we are a connection consumer so will not
- // call pre or postDeliver so messages won't be acked, or stored in session/tx
+ // Create a consumer. The MessageCallbackhandler knows we are a connection consumer so will
+ // not call pre or postDeliver so messages won't be acked, or stored in session/tx.
sess = conn.createSessionDelegate(false, Session.CLIENT_ACKNOWLEDGE, false);
- cons = sess.createConsumerDelegate(dest, messageSelector, false, subName, true, -1);
+ cons = sess.createConsumerDelegate(dest, messageSelector, false, subName, true, null);
}
finally
{
@@ -159,7 +142,7 @@
this.maxDeliveries = state.getMaxDeliveries();
id = threadId.increment();
- internalThread = new Thread(this, "Connection Consumer for dest " + destination + " id=" + id);
+ internalThread = new Thread(this, "Connection Consumer for dest " + dest + " id=" + id);
internalThread.start();
if (trace) { log.trace(this + " created"); }
@@ -213,7 +196,7 @@
if (closed)
{
if (trace) { log.trace("Connection consumer is closed, breaking"); }
- break outer;
+ break;
}
if (mesList.isEmpty())
@@ -243,7 +226,7 @@
if (m == null)
{
if (trace) { log.trace("receiveNoWait did not retrieve any message"); }
- break inner;
+ break;
}
if (trace) { log.trace("receiveNoWait got message " + m + " adding to queue"); }
Modified: trunk/src/main/org/jboss/jms/client/JBossSession.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/JBossSession.java 2007-01-05 01:40:23 UTC (rev 1894)
+++ trunk/src/main/org/jboss/jms/client/JBossSession.java 2007-01-05 05:56:09 UTC (rev 1895)
@@ -251,10 +251,10 @@
{
tccc.set(getClass().getClassLoader());
- ConsumerDelegate consumerDelegate = delegate.
- createConsumerDelegate((JBossDestination)d, messageSelector, noLocal, null, false, -1);
+ ConsumerDelegate cd = delegate.
+ createConsumerDelegate((JBossDestination)d, messageSelector, noLocal, null, false, null);
- return new JBossMessageConsumer(consumerDelegate);
+ return new JBossMessageConsumer(cd);
}
finally
{
@@ -304,10 +304,10 @@
{
tccc.set(getClass().getClassLoader());
- ConsumerDelegate consumerDelegate =
- delegate.createConsumerDelegate((JBossTopic)topic, null, false, name, false, -1);
+ ConsumerDelegate cd =
+ delegate.createConsumerDelegate((JBossTopic)topic, null, false, name, false, null);
- return new JBossMessageConsumer(consumerDelegate);
+ return new JBossMessageConsumer(cd);
}
finally
{
@@ -338,9 +338,11 @@
{
messageSelector = null;
}
- ConsumerDelegate consumerDelegate =
- delegate.createConsumerDelegate((JBossTopic)topic, messageSelector, noLocal, name, false, -1);
- return new JBossMessageConsumer(consumerDelegate);
+
+ ConsumerDelegate cd = delegate.
+ createConsumerDelegate((JBossTopic)topic, messageSelector, noLocal, name, false, null);
+
+ return new JBossMessageConsumer(cd);
}
public QueueBrowser createBrowser(Queue queue) throws JMSException
@@ -374,7 +376,9 @@
{
tccc.set(getClass().getClassLoader());
- BrowserDelegate del = this.delegate.createBrowserDelegate((JBossQueue)queue, messageSelector);
+ BrowserDelegate del =
+ delegate.createBrowserDelegate((JBossQueue)queue, messageSelector, null);
+
return new JBossQueueBrowser(queue, messageSelector, del);
}
finally
Modified: trunk/src/main/org/jboss/jms/client/container/StateCreationAspect.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/StateCreationAspect.java 2007-01-05 01:40:23 UTC (rev 1894)
+++ trunk/src/main/org/jboss/jms/client/container/StateCreationAspect.java 2007-01-05 05:56:09 UTC (rev 1895)
@@ -30,6 +30,7 @@
import org.jboss.jms.client.delegate.ClientProducerDelegate;
import org.jboss.jms.client.delegate.ClientSessionDelegate;
import org.jboss.jms.client.delegate.DelegateSupport;
+import org.jboss.jms.client.delegate.ClientBrowserDelegate;
import org.jboss.jms.client.remoting.JMSRemotingConnection;
import org.jboss.jms.client.remoting.ConsolidatedRemotingConnectionListener;
import org.jboss.jms.client.state.BrowserState;
@@ -38,7 +39,6 @@
import org.jboss.jms.client.state.HierarchicalState;
import org.jboss.jms.client.state.ProducerState;
import org.jboss.jms.client.state.SessionState;
-import org.jboss.jms.delegate.BrowserDelegate;
import org.jboss.jms.delegate.ConnectionFactoryDelegate;
import org.jboss.jms.delegate.ProducerDelegate;
import org.jboss.jms.destination.JBossDestination;
@@ -162,17 +162,14 @@
boolean connectionConsumer = ((Boolean)mi.getArguments()[4]).booleanValue();
int consumerID = consumerDelegate.getID();
-
int bufferSize = consumerDelegate.getBufferSize();
-
int maxDeliveries = consumerDelegate.getMaxDeliveries();
+ long channelID = consumerDelegate.getChannelID();
- long channelId = consumerDelegate.getChannelID();
-
ConsumerState consumerState =
new ConsumerState(sessionState, consumerDelegate, dest, selector, noLocal,
subscriptionName, consumerID, connectionConsumer, bufferSize,
- maxDeliveries, channelId);
+ maxDeliveries, channelID);
delegate.setState(consumerState);
return consumerDelegate;
@@ -190,7 +187,6 @@
MethodInvocation mi = (MethodInvocation)invocation;
Destination dest = ((Destination)mi.getArguments()[0]);
-
ProducerState producerState = new ProducerState(sessionState, producerDelegate, dest);
delegate.setState(producerState);
@@ -207,7 +203,7 @@
{
MethodInvocation mi = (MethodInvocation)invocation;
- BrowserDelegate browserDelegate = (BrowserDelegate)invocation.invokeNext();
+ ClientBrowserDelegate browserDelegate = (ClientBrowserDelegate)invocation.invokeNext();
DelegateSupport delegate = (DelegateSupport)browserDelegate;
delegate.init();
@@ -217,8 +213,11 @@
JBossDestination destination = (JBossDestination)mi.getArguments()[0];
String selector = (String)mi.getArguments()[1];
- BrowserState state = new BrowserState(sessionState, browserDelegate, destination, selector);
+ long channelID = browserDelegate.getChannelID();
+ BrowserState state =
+ new BrowserState(sessionState, browserDelegate, destination, selector, channelID);
+
delegate.setState(state);
return browserDelegate;
}
Modified: trunk/src/main/org/jboss/jms/client/delegate/ClientBrowserDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClientBrowserDelegate.java 2007-01-05 01:40:23 UTC (rev 1894)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClientBrowserDelegate.java 2007-01-05 05:56:09 UTC (rev 1895)
@@ -40,27 +40,48 @@
*/
public class ClientBrowserDelegate extends DelegateSupport implements BrowserDelegate
{
- // Constants -----------------------------------------------------
+ // Constants ------------------------------------------------------------------------------------
private static final long serialVersionUID = 8293543769773757409L;
- // Attributes ----------------------------------------------------
+ // Attributes -----------------------------------------------------------------------------------
- // Static --------------------------------------------------------
+ private long channelID;
- // Constructors --------------------------------------------------
+ // Static ---------------------------------------------------------------------------------------
- public ClientBrowserDelegate(int objectID)
+ // Constructors ---------------------------------------------------------------------------------
+
+ public ClientBrowserDelegate(int objectID, long channelID)
{
super(objectID);
+ this.channelID = channelID;
}
public ClientBrowserDelegate()
{
}
- // BrowserDelegate implementation --------------------------------
+ // DelegateSupport overrides --------------------------------------------------------------------
+ public void synchronizeWith(DelegateSupport nd) throws Exception
+ {
+ super.synchronizeWith(nd);
+
+ ClientBrowserDelegate newDelegate = (ClientBrowserDelegate)nd;
+
+ // synchronize server endpoint state
+
+ // synchronize (recursively) the client-side state
+
+ state.synchronizeWith(newDelegate.getState());
+
+ // synchronize the delegates
+
+ }
+
+ // BrowserDelegate implementation ---------------------------------------------------------------
+
/**
* This invocation should either be handled by the client-side interceptor chain or by the
* server-side endpoint.
@@ -70,26 +91,46 @@
throw new IllegalStateException("This invocation should not be handled here!");
}
+ /**
+ * This invocation should either be handled by the client-side interceptor chain or by the
+ * server-side endpoint.
+ */
public void closing() throws JMSException
{
throw new IllegalStateException("This invocation should not be handled here!");
}
+ /**
+ * This invocation should either be handled by the client-side interceptor chain or by the
+ * server-side endpoint.
+ */
public boolean isClosed()
{
throw new IllegalStateException("This invocation should not be handled here!");
}
+ /**
+ * This invocation should either be handled by the client-side interceptor chain or by the
+ * server-side endpoint.
+ */
public boolean hasNextMessage() throws JMSException
{
throw new IllegalStateException("This invocation should not be handled here!");
}
+ /**
+ * This invocation should either be handled by the client-side interceptor chain or by the
+ * server-side endpoint.
+ */
public Message nextMessage() throws JMSException
{
throw new IllegalStateException("This invocation should not be handled here!");
}
+ /**
+ * This invocation should either be handled by the client-side interceptor chain or by the
+ * server-side endpoint.
+ */
public Message[] nextMessageBlock(int maxMessages) throws JMSException
{
throw new IllegalStateException("This invocation should not be handled here!");
@@ -131,19 +172,24 @@
throw new IllegalStateException("This invocation should not be handled here!");
}
- // Public --------------------------------------------------------
+ // Public ---------------------------------------------------------------------------------------
public String getStackName()
{
return "BrowserStack";
}
+ public long getChannelID()
+ {
+ return channelID;
+ }
+
public String toString()
{
return "BrowserDelegate[" + id + "]";
}
- // Protected -----------------------------------------------------
+ // Protected ------------------------------------------------------------------------------------
protected Client getClient()
{
@@ -152,10 +198,10 @@
getInvokingClient();
}
- // Package Private -----------------------------------------------
+ // Package Private ------------------------------------------------------------------------------
- // Private -------------------------------------------------------
+ // Private --------------------------------------------------------------------------------------
- // Inner Classes -------------------------------------------------
+ // Inner Classes --------------------------------------------------------------------------------
}
Modified: trunk/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java 2007-01-05 01:40:23 UTC (rev 1894)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java 2007-01-05 05:56:09 UTC (rev 1895)
@@ -184,8 +184,8 @@
* This invocation should either be handled by the client-side interceptor chain or by the
* server-side endpoint.
*/
- public BrowserDelegate createBrowserDelegate(JBossDestination queue, String messageSelector)
- throws JMSException
+ public BrowserDelegate createBrowserDelegate(JBossDestination queue, String messageSelector,
+ Long failoverChannelID) throws JMSException
{
throw new IllegalStateException("This invocation should not be handled here!");
}
@@ -207,7 +207,7 @@
public ConsumerDelegate createConsumerDelegate(JBossDestination destination, String selector,
boolean noLocal, String subscriptionName,
boolean connectionConsumer,
- long failoverChannelId) throws JMSException
+ Long failoverChannelID) throws JMSException
{
throw new IllegalStateException("This invocation should not be handled here!");
}
Modified: trunk/src/main/org/jboss/jms/client/state/BrowserState.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/state/BrowserState.java 2007-01-05 01:40:23 UTC (rev 1894)
+++ trunk/src/main/org/jboss/jms/client/state/BrowserState.java 2007-01-05 05:56:09 UTC (rev 1895)
@@ -27,36 +27,47 @@
import org.jboss.jms.server.Version;
/**
- * State corresponding to a browser
- * This state is acessible inside aspects/interceptors
+ * State corresponding to a browser. This state is acessible inside aspects/interceptors.
*
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
+ * @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
+ *
* @version <tt>$Revision$</tt>
*
* $Id$
*/
public class BrowserState extends HierarchicalStateSupport
{
+ // Constants ------------------------------------------------------------------------------------
+ // Static ---------------------------------------------------------------------------------------
+
+ // Attributes -----------------------------------------------------------------------------------
+
private SessionState parent;
private BrowserDelegate delegate;
- // Data used to recreate the Browser in case of failover
+ // data used to recreate the Browser in case of failover
private JBossDestination jmsDestination;
private String messageSelector;
- public BrowserState(SessionState parent, BrowserDelegate delegate, JBossDestination jmsDestination, String selector)
+ // Needed for failover
+ private long channelID;
+
+ // Constructors ---------------------------------------------------------------------------------
+
+ public BrowserState(SessionState parent, BrowserDelegate delegate,
+ JBossDestination jmsDestination, String selector, long channelID)
{
super(parent, (DelegateSupport)delegate);
- this.jmsDestination=jmsDestination;
- this.messageSelector=selector;
+ this.jmsDestination = jmsDestination;
+ this.messageSelector = selector;
+ this.channelID = channelID;
}
- public void synchronizeWith(HierarchicalState newState) throws Exception {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
+ // HierarchicalState implementation -------------------------------------------------------------
+
public DelegateSupport getDelegate()
{
return (DelegateSupport)delegate;
@@ -66,11 +77,31 @@
this.delegate=(BrowserDelegate)delegate;
}
+ public void setParent(HierarchicalState parent)
+ {
+ this.parent = (SessionState)parent;
+ }
+
+ public HierarchicalState getParent()
+ {
+ return parent;
+ }
+
public Version getVersionToUse()
{
return parent.getVersionToUse();
}
+ // HierarchicalStateSupport overrides -----------------------------------------------------------
+
+ public void synchronizeWith(HierarchicalState ns) throws Exception
+ {
+ BrowserState newState = (BrowserState)ns;
+ channelID = newState.channelID;
+ }
+
+ // Public ---------------------------------------------------------------------------------------
+
public org.jboss.jms.destination.JBossDestination getJmsDestination()
{
return jmsDestination;
@@ -80,24 +111,19 @@
{
return messageSelector;
}
-
- public void setParent(HierarchicalState parent)
+
+ public long getChannelID()
{
- this.parent=(SessionState)parent;
+ return channelID;
}
-
- public HierarchicalState getParent()
- {
- return parent;
- }
-
- // When failing over a browser, we keep the old browser's state but there are certain fields
- // we need to update
- public void copyState(BrowserState newState)
- {
- //Actually only one field
- // I removed this due to http://jira.jboss.com/jira/browse/JBMESSAGING-686
- //this.delegate = newState.delegate;
- }
+
+ // Package protected ----------------------------------------------------------------------------
+
+ // Protected ------------------------------------------------------------------------------------
+
+ // Private --------------------------------------------------------------------------------------
+
+ // Inner classes --------------------------------------------------------------------------------
+
}
Modified: trunk/src/main/org/jboss/jms/client/state/SessionState.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/state/SessionState.java 2007-01-05 01:40:23 UTC (rev 1894)
+++ trunk/src/main/org/jboss/jms/client/state/SessionState.java 2007-01-05 05:56:09 UTC (rev 1895)
@@ -191,7 +191,7 @@
consState.isNoLocal(),
consState.getSubscriptionName(),
consState.isConnectionConsumer(),
- consState.getChannelID());
+ new Long(consState.getChannelID()));
log.debug(this + " created new consumer " + newConsDelegate);
consDelegate.synchronizeWith(newConsDelegate);
@@ -212,7 +212,19 @@
}
else if (child instanceof BrowserState)
{
- handleFailoverOnBrowser((BrowserState)child, newDelegate);
+ BrowserState browserState = (BrowserState)child;
+ ClientBrowserDelegate browserDelegate =
+ (ClientBrowserDelegate)browserState.getDelegate();
+
+ // create a new browser over the new session for each browser on the old session
+ ClientBrowserDelegate newBrowserDelegate = (ClientBrowserDelegate)newDelegate.
+ createBrowserDelegate(browserState.getJmsDestination(),
+ browserState.getMessageSelector(),
+ new Long(browserState.getChannelID()));
+ log.debug(this + " created new browser " + newBrowserDelegate);
+
+ browserDelegate.synchronizeWith(newBrowserDelegate);
+ log.debug(this + " synchronized failover browser " + browserDelegate);
}
}
@@ -405,31 +417,7 @@
// Private --------------------------------------------------------------------------------------
- /**
- * TODO see http://jira.jboss.org/jira/browse/JBMESSAGING-710
- */
- private void handleFailoverOnBrowser(BrowserState failedBrowserState,
- ClientSessionDelegate failedSessionDelegate)
- throws Exception
- {
- ClientBrowserDelegate newBrowserDelegate = (ClientBrowserDelegate)failedSessionDelegate.
- createBrowserDelegate(failedBrowserState.getJmsDestination(),
- failedBrowserState.getMessageSelector());
-
- ClientBrowserDelegate failedBrowserDelegate =
- (ClientBrowserDelegate)failedBrowserState.getDelegate();
-
- failedBrowserDelegate.synchronizeWith(newBrowserDelegate);
- failedBrowserState.copyState((BrowserState)newBrowserDelegate.getState());
-
- log.debug("handling fail over on browserDelegate " + failedBrowserDelegate + " destination=" + failedBrowserState.getJmsDestination() + " selector=" + failedBrowserState.getMessageSelector());
-
- }
-
-
// Inner classes --------------------------------------------------------------------------------
-
-
}
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerBrowserEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerBrowserEndpoint.java 2007-01-05 01:40:23 UTC (rev 1894)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerBrowserEndpoint.java 2007-01-05 05:56:09 UTC (rev 1895)
@@ -46,32 +46,28 @@
*/
public class ServerBrowserEndpoint implements BrowserEndpoint
{
- // Constants -----------------------------------------------------
+ // Constants ------------------------------------------------------------------------------------
private static final Logger log = Logger.getLogger(ServerBrowserEndpoint.class);
- // Static --------------------------------------------------------
+ // Static ---------------------------------------------------------------------------------------
- // Attributes ----------------------------------------------------
-
- private boolean trace = log.isTraceEnabled();
+ private static boolean trace = log.isTraceEnabled();
+ // Attributes -----------------------------------------------------------------------------------
+
+ private int id;
+ private boolean closed;
private Iterator iterator;
-
private ServerSessionEndpoint session;
-
- private int id;
-
- private boolean closed;
- // Constructors --------------------------------------------------
+ // Constructors ---------------------------------------------------------------------------------
ServerBrowserEndpoint(ServerSessionEndpoint session, int id,
Channel destination, String messageSelector)
throws JMSException
{
this.session = session;
-
this.id = id;
Filter filter = null;
@@ -84,7 +80,7 @@
iterator = destination.browse(filter).iterator();
}
- // BrowserEndpoint implementation --------------------------------
+ // BrowserEndpoint implementation ---------------------------------------------------------------
public boolean hasNextMessage() throws JMSException
{
@@ -94,7 +90,10 @@
{
throw new IllegalStateException("Browser is closed");
}
- return iterator.hasNext();
+
+ boolean has = iterator.hasNext();
+ if (trace) { log.trace(this + (has ? " has": " DOESN'T have") + " a next message"); }
+ return has;
}
catch (Throwable t)
{
@@ -110,9 +109,10 @@
{
throw new IllegalStateException("Browser is closed");
}
+
Routable r = (Routable)iterator.next();
- if (trace) { log.trace("returning the message corresponding to " + r); }
+ if (trace) { log.trace(this + " returning " + r); }
return (Message)r.getMessage();
}
@@ -121,12 +121,12 @@
throw ExceptionUtil.handleJMSInvocation(t, this + " nextMessage");
}
}
-
-
- //Is this the most efficient way to pass it back?
- //why not just pass back the arraylist??
+
public Message[] nextMessageBlock(int maxMessages) throws JMSException
{
+
+ if (trace) { log.trace(this + " returning next message block of " + maxMessages); }
+
try
{
if (closed)
@@ -164,8 +164,8 @@
try
{
localClose();
-
session.removeBrowser(id);
+ log.debug(this + " closed");
}
catch (Throwable t)
{
@@ -183,14 +183,14 @@
throw new IllegalStateException("isClosed should never be handled on the server side");
}
- // Public --------------------------------------------------------
+ // Public ---------------------------------------------------------------------------------------
public String toString()
{
return "BrowserEndpoint[" + id + "]";
}
- // Package protected ---------------------------------------------
+ // Package protected ----------------------------------------------------------------------------
void localClose() throws JMSException
{
@@ -206,10 +206,10 @@
closed = true;
}
- // Protected -----------------------------------------------------
+ // Protected ------------------------------------------------------------------------------------
- // Private -------------------------------------------------------
+ // Private --------------------------------------------------------------------------------------
- // Inner classes -------------------------------------------------
+ // Inner classes --------------------------------------------------------------------------------
}
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2007-01-05 01:40:23 UTC (rev 1894)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2007-01-05 05:56:09 UTC (rev 1895)
@@ -38,8 +38,8 @@
import javax.jms.InvalidDestinationException;
import javax.jms.JMSException;
+import org.jboss.jms.client.delegate.ClientConsumerDelegate;
import org.jboss.jms.client.delegate.ClientBrowserDelegate;
-import org.jboss.jms.client.delegate.ClientConsumerDelegate;
import org.jboss.jms.delegate.BrowserDelegate;
import org.jboss.jms.delegate.ConsumerDelegate;
import org.jboss.jms.destination.JBossDestination;
@@ -54,8 +54,8 @@
import org.jboss.jms.server.destination.ManagedDestination;
import org.jboss.jms.server.destination.ManagedQueue;
import org.jboss.jms.server.destination.ManagedTopic;
+import org.jboss.jms.server.endpoint.advised.ConsumerAdvised;
import org.jboss.jms.server.endpoint.advised.BrowserAdvised;
-import org.jboss.jms.server.endpoint.advised.ConsumerAdvised;
import org.jboss.jms.server.remoting.JMSDispatcher;
import org.jboss.jms.util.ExceptionUtil;
import org.jboss.jms.util.MessageQueueNameHelper;
@@ -72,7 +72,6 @@
import org.jboss.messaging.core.plugin.contract.PostOffice;
import org.jboss.messaging.core.plugin.postoffice.Binding;
import org.jboss.messaging.core.plugin.postoffice.cluster.LocalClusteredQueue;
-import org.jboss.messaging.core.plugin.postoffice.cluster.RemoteQueueStub;
import org.jboss.messaging.core.tx.Transaction;
import org.jboss.messaging.core.tx.TransactionException;
import org.jboss.messaging.core.tx.TransactionRepository;
@@ -186,19 +185,19 @@
boolean noLocal,
String subscriptionName,
boolean isCC,
- long failoverChannelID) throws JMSException
+ Long failoverChannelID) throws JMSException
{
try
{
- if (failoverChannelID == -1)
+ if (failoverChannelID == null)
{
- // Standard createConsumerDelegate
+ // regular consumer
return createConsumerDelegateInternal(jmsDestination, selectorString,
noLocal, subscriptionName);
}
else
{
- // Failover of consumer
+ // failover consumer
return createFailoverConsumerDelegateInternal(jmsDestination, selectorString,
noLocal, subscriptionName,
failoverChannelID);
@@ -210,51 +209,23 @@
}
}
- public BrowserDelegate createBrowserDelegate(JBossDestination jmsDestination, String messageSelector)
- throws JMSException
+ public BrowserDelegate createBrowserDelegate(JBossDestination jmsDestination,
+ String messageSelector,
+ Long failoverChannelID) throws JMSException
{
try
{
- if (closed)
- {
- throw new IllegalStateException("Session is closed");
- }
-
- if (jmsDestination == null)
- {
- throw new InvalidDestinationException("null destination");
- }
-
- if (jmsDestination.isTopic())
+ if (failoverChannelID == null)
{
- throw new IllegalStateException("Cannot browse a topic");
+ // regular browser
+ return createBrowserDelegateInternal(jmsDestination, messageSelector);
}
-
- if (dm.getDestination(jmsDestination.getName(), jmsDestination.isQueue()) == null)
+ else
{
- throw new InvalidDestinationException("No such destination: " + jmsDestination);
+ // failover browser
+ return createFailoverBrowserDelegateInternal(jmsDestination, messageSelector,
+ failoverChannelID);
}
-
- Binding binding = postOffice.getBindingForQueueName(jmsDestination.getName()); // todo
-
- int browserID = connectionEndpoint.getServerPeer().getNextObjectID();
-
- ServerBrowserEndpoint ep =
- new ServerBrowserEndpoint(this, browserID, (PagingFilteredQueue)binding.getQueue(), messageSelector);
-
- //Still need to synchronized since close() can come in on a different thread
- synchronized (browsers)
- {
- browsers.put(new Integer(browserID), ep);
- }
-
- JMSDispatcher.instance.registerTarget(new Integer(browserID), new BrowserAdvised(ep));
-
- ClientBrowserDelegate stub = new ClientBrowserDelegate(browserID);
-
- log.debug(this + " created and registered " + ep);
-
- return stub;
}
catch (Throwable t)
{
@@ -1054,11 +1025,11 @@
String selectorString,
boolean noLocal,
String subscriptionName,
- long oldChannelID)
+ Long ocid)
throws Exception
{
log.debug(this + " creating FAILOVER consumer for failed channel " +
- oldChannelID + " for " + jmsDestination +
+ ocid + " for " + jmsDestination +
(selectorString == null ? "" : ", selector '" + selectorString + "'") +
(subscriptionName == null ? "" : ", subscription '" + subscriptionName + "'") +
(noLocal ? ", noLocal" : ""));
@@ -1069,6 +1040,7 @@
throw new IllegalStateException("Cannot failover on a non-clustered post office!");
}
+ long oldChannelID = ocid.longValue();
Binding binding = ((ClusteredPostOffice)postOffice).getBindingforChannelId(oldChannelID);
if (binding == null)
@@ -1076,22 +1048,11 @@
throw new IllegalStateException("Can't find failed over channel " + oldChannelID);
}
- if (trace)
- {
- long newChannelID;
+ Queue newQueue = binding.getQueue();
+ long newChannelID = newQueue.getChannelID();
- if (binding.getQueue() instanceof RemoteQueueStub)
- {
- newChannelID = ((RemoteQueueStub)binding.getQueue()).getChannelID();
- }
- else
- {
- newChannelID = ((PagingFilteredQueue)binding.getQueue()).getChannelID();
- }
+ if (trace) { log.trace(this + " failing over from channel " + oldChannelID + " to channel " + newChannelID); }
- log.trace(this + " failing over from channel " + oldChannelID + " to channel " + newChannelID);
- }
-
int consumerID = connectionEndpoint.getServerPeer().getNextObjectID();
int prefetchSize = connectionEndpoint.getPrefetchSize();
@@ -1110,15 +1071,13 @@
dest.getExpiryQueue() == null ? defaultExpiryQueue : dest.getExpiryQueue();
ServerConsumerEndpoint ep =
- new ServerConsumerEndpoint(consumerID, binding.getQueue(), binding.getQueue().getName(),
- this, selectorString, noLocal, jmsDestination, dlqToUse,
- expiryQueueToUse);
+ new ServerConsumerEndpoint(consumerID, newQueue, newQueue.getName(), this, selectorString,
+ noLocal, jmsDestination, dlqToUse, expiryQueueToUse);
JMSDispatcher.instance.registerTarget(new Integer(consumerID), new ConsumerAdvised(ep));
ClientConsumerDelegate stub =
- new ClientConsumerDelegate(consumerID, binding.getQueue().getChannelID(),
- prefetchSize, maxDeliveryAttempts);
+ new ClientConsumerDelegate(consumerID, newChannelID, prefetchSize, maxDeliveryAttempts);
synchronized (consumers)
{
@@ -1296,7 +1255,9 @@
// Changing a durable subscriber is equivalent to unsubscribing (deleting) the old
// one and creating a new one.
- String filterString = binding.getQueue().getFilter() != null ? binding.getQueue().getFilter().getFilterString() : null;
+ String filterString =
+ binding.getQueue().getFilter() != null ?
+ binding.getQueue().getFilter().getFilterString() : null;
boolean selectorChanged =
(selectorString == null && filterString != null) ||
@@ -1405,7 +1366,95 @@
return stub;
}
-
+
+ private BrowserDelegate createFailoverBrowserDelegateInternal(JBossDestination jmsDestination,
+ String selector,
+ Long ocid) throws Throwable
+ {
+ log.debug(this + " creating FAILOVER browser for failed channel " + ocid + " for " +
+ jmsDestination + (selector == null ? "" : ", selector '" + selector + "'"));
+
+ if (postOffice.isLocal())
+ {
+ throw new IllegalStateException("Cannot failover on a non-clustered post office!");
+ }
+
+ long oldChannelID = ocid.longValue();
+
+ Binding binding = ((ClusteredPostOffice)postOffice).getBindingforChannelId(oldChannelID);
+
+ if (binding == null)
+ {
+ throw new IllegalStateException("Can't find failed over channel " + oldChannelID);
+ }
+
+ Channel newChannel = binding.getQueue();
+
+ if (trace) { log.trace(this + " failing over from channel " + oldChannelID + " to channel " + newChannel.getChannelID()); }
+
+ int browserID = connectionEndpoint.getServerPeer().getNextObjectID();
+
+ ServerBrowserEndpoint ep = new ServerBrowserEndpoint(this, browserID, newChannel, selector);
+ JMSDispatcher.instance.registerTarget(new Integer(browserID), new BrowserAdvised(ep));
+
+ // still need to synchronized since close() can come in on a different thread
+ synchronized (browsers)
+ {
+ browsers.put(new Integer(browserID), ep);
+ }
+ return new ClientBrowserDelegate(browserID, newChannel.getChannelID());
+ }
+
+ private BrowserDelegate createBrowserDelegateInternal(JBossDestination jmsDestination,
+ String selector) throws Throwable
+ {
+ if (closed)
+ {
+ throw new IllegalStateException("Session is closed");
+ }
+
+ if (jmsDestination == null)
+ {
+ throw new InvalidDestinationException("null destination");
+ }
+
+ if (jmsDestination.isTopic())
+ {
+ throw new IllegalStateException("Cannot browse a topic");
+ }
+
+ if (dm.getDestination(jmsDestination.getName(), jmsDestination.isQueue()) == null)
+ {
+ throw new InvalidDestinationException("No such destination: " + jmsDestination);
+ }
+
+ log.debug(this + " creating browser for " + jmsDestination +
+ (selector == null ? "" : ", selector '" + selector + "'"));
+
+ Binding binding = postOffice.getBindingForQueueName(jmsDestination.getName()); // TODO
+
+ int browserID = connectionEndpoint.getServerPeer().getNextObjectID();
+
+ ServerBrowserEndpoint ep =
+ new ServerBrowserEndpoint(this, browserID,
+ (PagingFilteredQueue)binding.getQueue(), selector);
+
+ // still need to synchronized since close() can come in on a different thread
+ synchronized (browsers)
+ {
+ browsers.put(new Integer(browserID), ep);
+ }
+
+ JMSDispatcher.instance.registerTarget(new Integer(browserID), new BrowserAdvised(ep));
+
+ ClientBrowserDelegate stub =
+ new ClientBrowserDelegate(browserID, binding.getQueue().getChannelID());
+
+ log.debug(this + " created and registered " + ep);
+
+ return stub;
+ }
+
private void promptDelivery(Set channels)
{
//Now prompt delivery on the channels
Modified: trunk/src/main/org/jboss/jms/server/endpoint/SessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/SessionEndpoint.java 2007-01-05 01:40:23 UTC (rev 1894)
+++ trunk/src/main/org/jboss/jms/server/endpoint/SessionEndpoint.java 2007-01-05 05:56:09 UTC (rev 1895)
@@ -45,15 +45,23 @@
*/
public interface SessionEndpoint extends Closeable
{
+ /**
+ * @param failoverChannelID - the ID of the channel for which there is a failover process in
+ * progress. Null means regular (non-failover) consumer delegate creation.
+ */
ConsumerDelegate createConsumerDelegate(JBossDestination destination, String selector,
boolean noLocal, String subscriptionName,
boolean connectionConsumer,
- long failoverChannelID) throws JMSException;
-
- BrowserDelegate createBrowserDelegate(JBossDestination queue, String messageSelector)
- throws JMSException;
+ Long failoverChannelID) throws JMSException;
/**
+ * @param failoverChannelID - the ID of the channel for which there is a failover process in
+ * progress. Null means regular (non-failover) browser delegate creation.
+ */
+ BrowserDelegate createBrowserDelegate(JBossDestination queue, String messageSelector,
+ Long failoverChannelID) throws JMSException;
+
+ /**
* Creates a queue identity given a Queue name. Does NOT create the physical queue. The physical
* creation of queues is an administrative task and is not to be initiated by the JMS API, with
* the exception of temporary queues.
@@ -118,10 +126,7 @@
void send(JBossMessage message) throws JMSException;
/**
- * Send delivery info to the server so the delivery lists can be repopulated
- * used at failover
- * @param ackInfos
- * @throws JMSException
+ * Send delivery info to the server so the delivery lists can be repopulated used at failover
*/
void recoverDeliveries(List createInfos) throws JMSException;
}
Modified: trunk/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java 2007-01-05 01:40:23 UTC (rev 1894)
+++ trunk/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java 2007-01-05 05:56:09 UTC (rev 1895)
@@ -82,15 +82,17 @@
public ConsumerDelegate createConsumerDelegate(JBossDestination destination, String selector,
boolean noLocal, String subscriptionName,
- boolean connectionConsumer, long failoverChannelId) throws JMSException
+ boolean connectionConsumer,
+ Long failoverChannelID) throws JMSException
{
- return endpoint.createConsumerDelegate(destination, selector, noLocal, subscriptionName, connectionConsumer, failoverChannelId);
+ return endpoint.createConsumerDelegate(destination, selector, noLocal, subscriptionName,
+ connectionConsumer, failoverChannelID);
}
- public BrowserDelegate createBrowserDelegate(JBossDestination queue, String messageSelector)
- throws JMSException
+ public BrowserDelegate createBrowserDelegate(JBossDestination queue, String messageSelector,
+ Long failoverChannelID) throws JMSException
{
- return endpoint.createBrowserDelegate(queue, messageSelector);
+ return endpoint.createBrowserDelegate(queue, messageSelector, failoverChannelID);
}
public JBossQueue createQueue(String queueName) throws JMSException
@@ -147,7 +149,6 @@
{
return endpoint.isClosed();
}
-
// AdvisedSupport overrides --------------------------------------
@@ -161,7 +162,6 @@
return "SessionAdvised->" + endpoint;
}
-
// Public --------------------------------------------------------
// Protected -----------------------------------------------------
Modified: trunk/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java 2007-01-05 01:40:23 UTC (rev 1894)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java 2007-01-05 05:56:09 UTC (rev 1895)
@@ -30,6 +30,10 @@
import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
+import java.util.Enumeration;
+import java.util.Set;
+import java.util.HashSet;
+
/**
* @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
* @version <tt>$Revision$</tt>
@@ -398,7 +402,181 @@
}
}
+ public void testBrowserFailoverSendMessagesPreFailure() throws Exception
+ {
+ Connection conn = null;
+
+ try
+ {
+ // skip connection to node 0
+ conn = cf.createConnection();
+ conn.close();
+
+ // create a connection to node 1
+ conn = cf.createConnection();
+
+ assertEquals(1, ((JBossConnection)conn).getServerID());
+
+ Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ QueueBrowser browser = session.createBrowser(queue[1]);
+
+ Enumeration en = browser.getEnumeration();
+ assertFalse(en.hasMoreElements());
+
+ // send one persistent and one non-persistent message
+
+ MessageProducer prod = session.createProducer(queue[1]);
+ prod.setDeliveryMode(DeliveryMode.PERSISTENT);
+ prod.send(session.createTextMessage("click"));
+ prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+ prod.send(session.createTextMessage("clack"));
+
+ // register a failover listener
+ SimpleFailoverListener failoverListener = new SimpleFailoverListener();
+ ((JBossConnection)conn).registerFailoverListener(failoverListener);
+
+ log.debug("killing node 1 ....");
+
+ ServerManagement.kill(1);
+
+ log.info("########");
+ log.info("######## KILLED NODE 1");
+ log.info("########");
+
+ // wait for the client-side failover to complete
+
+ while(true)
+ {
+ FailoverEvent event = failoverListener.getEvent(120000);
+ if (event != null && FailoverEvent.FAILOVER_COMPLETED == event.getType())
+ {
+ break;
+ }
+ if (event == null)
+ {
+ fail("Did not get expected FAILOVER_COMPLETED event");
+ }
+ }
+
+ // failover complete
+ log.info("failover completed");
+
+ assertEquals(0, ((JBossConnection)conn).getServerID());
+
+ en = browser.getEnumeration();
+
+ // we expect to only be able to browse the persistent message
+ assertTrue(en.hasMoreElements());
+ TextMessage tm = (TextMessage)en.nextElement();
+ assertEquals("click", tm.getText());
+
+ assertFalse(en.hasMoreElements());
+ }
+ finally
+ {
+ if (conn != null)
+ {
+ conn.close();
+ }
+ }
+ }
+
/**
+ * TODO - Must double check if this is desired browser behavior - currently, once
+ * getEnumeration() was called once, all subsequent getEnumeration() calls return
+ * the same depleted iterator.
+ */
+ public void testBrowserFailoverSendMessagesPostFailure() throws Exception
+ {
+ Connection conn = null;
+
+ try
+ {
+ // skip connection to node 0
+ conn = cf.createConnection();
+ conn.close();
+
+ // create a connection to node 1
+ conn = cf.createConnection();
+
+ assertEquals(1, ((JBossConnection)conn).getServerID());
+
+ Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ QueueBrowser browser = session.createBrowser(queue[1]);
+
+ Enumeration en = browser.getEnumeration();
+ assertFalse(en.hasMoreElements());
+
+ // register a failover listener
+ SimpleFailoverListener failoverListener = new SimpleFailoverListener();
+ ((JBossConnection)conn).registerFailoverListener(failoverListener);
+
+ log.debug("killing node 1 ....");
+
+ ServerManagement.kill(1);
+
+ log.info("########");
+ log.info("######## KILLED NODE 1");
+ log.info("########");
+
+ // wait for the client-side failover to complete
+
+ while(true)
+ {
+ FailoverEvent event = failoverListener.getEvent(120000);
+ if (event != null && FailoverEvent.FAILOVER_COMPLETED == event.getType())
+ {
+ break;
+ }
+ if (event == null)
+ {
+ fail("Did not get expected FAILOVER_COMPLETED event");
+ }
+ }
+
+ // failover complete
+ log.info("failover completed");
+
+ assertEquals(0, ((JBossConnection)conn).getServerID());
+
+ // send one persistent and one non-persistent message
+
+ MessageProducer prod = session.createProducer(queue[1]);
+ prod.setDeliveryMode(DeliveryMode.PERSISTENT);
+ prod.send(session.createTextMessage("click"));
+ prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+ prod.send(session.createTextMessage("clack"));
+
+ en = browser.getEnumeration();
+
+ // we expect to be able to browse persistent and non-persistent messages
+ Set texts = new HashSet();
+
+ assertTrue(en.hasMoreElements());
+ TextMessage tm = (TextMessage)en.nextElement();
+ texts.add(tm.getText());
+
+ assertTrue(en.hasMoreElements());
+ tm = (TextMessage)en.nextElement();
+ texts.add(tm.getText());
+
+ assertFalse(en.hasMoreElements());
+
+ assertTrue(texts.contains("click"));
+ assertTrue(texts.contains("clack"));
+ }
+ finally
+ {
+ if (conn != null)
+ {
+ conn.close();
+ }
+ }
+ }
+
+ /**
* Sending one persistent message.
*/
public void testSessionWithOneTransactedPersistentMessageFailover() throws Exception
@@ -1070,126 +1248,126 @@
}
}
- public void testFailoverMessageOnServer2() throws Exception
- {
- Connection conn = null;
+// public void testFailoverMessageOnServer2() throws Exception
+// {
+// Connection conn = null;
+//
+// try
+// {
+// conn = cf.createConnection();
+// conn.close();
+//
+// conn = cf.createConnection();
+// conn.start();
+//
+// assertEquals(1, ((JBossConnection)conn).getServerID());
+//
+// SimpleFailoverListener listener = new SimpleFailoverListener();
+// ((JBossConnection)conn).registerFailoverListener(listener);
+//
+// Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+// MessageProducer prod = session.createProducer(queue[1]);
+// prod.setDeliveryMode(DeliveryMode.PERSISTENT);
+//
+// // send a message
+//
+// prod.send(session.createTextMessage("blip"));
+//
+// // kill node 1
+//
+// log.debug("killing node 1");
+//
+// ServerManagement.kill(1);
+//
+// log.info("########");
+// log.info("######## KILLED NODE 1");
+// log.info("########");
+//
+// // wait until the failure (not the completion of client-side failover) is detected
+//
+// assertEquals(FailoverEvent.FAILURE_DETECTED, listener.getEvent(60000).getType());
+//
+// // create a consumer the very next moment the failure is detected. This way, we also
+// // test the client-side failover valve
+//
+// MessageConsumer cons = session.createConsumer(queue[0]);
+//
+// // we must receive the message
+//
+// TextMessage tm = (TextMessage)cons.receive(60000);
+// assertEquals("blip", tm.getText());
+// }
+// finally
+// {
+// if (conn != null)
+// {
+// conn.close();
+// }
+// }
+// }
+//
+// public void testSimpleFailover() throws Exception
+// {
+// Connection conn = null;
+//
+// try
+// {
+// conn = cf.createConnection();
+// conn.close();
+//
+// conn = cf.createConnection();
+// conn.start();
+//
+// // create a producer/consumer on node 1
+//
+// // make sure we're connecting to node 1
+//
+// int nodeID = ((ConnectionState)((DelegateSupport)((JBossConnection)conn).
+// getDelegate()).getState()).getServerID();
+//
+// assertEquals(1, nodeID);
+//
+// Session s1 = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+// MessageConsumer c1 = s1.createConsumer(queue[1]);
+// MessageProducer p1 = s1.createProducer(queue[1]);
+// p1.setDeliveryMode(DeliveryMode.PERSISTENT);
+//
+// // send a message
+//
+// p1.send(s1.createTextMessage("blip"));
+//
+// // kill node 1
+//
+//
+// ServerManagement.killAndWait(1);
+// log.info("########");
+// log.info("######## KILLED NODE 1");
+// log.info("########");
+//
+// try
+// {
+// ic[1].lookup("queue"); // looking up anything
+// fail("The server still alive, kill didn't work yet");
+// }
+// catch (Exception e)
+// {
+// }
+//
+// // we must receive the message
+//
+// TextMessage tm = (TextMessage)c1.receive(1000);
+// assertEquals("blip", tm.getText());
+//
+// }
+// finally
+// {
+// if (conn != null)
+// {
+// conn.close();
+// }
+// }
+// }
- try
- {
- conn = cf.createConnection();
- conn.close();
-
- conn = cf.createConnection();
- conn.start();
-
- assertEquals(1, ((JBossConnection)conn).getServerID());
-
- SimpleFailoverListener listener = new SimpleFailoverListener();
- ((JBossConnection)conn).registerFailoverListener(listener);
-
- Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer prod = session.createProducer(queue[1]);
- prod.setDeliveryMode(DeliveryMode.PERSISTENT);
-
- // send a message
-
- prod.send(session.createTextMessage("blip"));
-
- // kill node 1
-
- log.debug("killing node 1");
-
- ServerManagement.kill(1);
-
- log.info("########");
- log.info("######## KILLED NODE 1");
- log.info("########");
-
- // wait until the failure (not the completion of client-side failover) is detected
-
- assertEquals(FailoverEvent.FAILURE_DETECTED, listener.getEvent(60000).getType());
-
- // create a consumer the very next moment the failure is detected. This way, we also
- // test the client-side failover valve
-
- MessageConsumer cons = session.createConsumer(queue[0]);
-
- // we must receive the message
-
- TextMessage tm = (TextMessage)cons.receive(60000);
- assertEquals("blip", tm.getText());
- }
- finally
- {
- if (conn != null)
- {
- conn.close();
- }
- }
- }
-
- public void testSimpleFailover() throws Exception
- {
- Connection conn = null;
-
- try
- {
- conn = cf.createConnection();
- conn.close();
-
- conn = cf.createConnection();
- conn.start();
-
- // create a producer/consumer on node 1
-
- // make sure we're connecting to node 1
-
- int nodeID = ((ConnectionState)((DelegateSupport)((JBossConnection)conn).
- getDelegate()).getState()).getServerID();
-
- assertEquals(1, nodeID);
-
- Session s1 = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer c1 = s1.createConsumer(queue[1]);
- MessageProducer p1 = s1.createProducer(queue[1]);
- p1.setDeliveryMode(DeliveryMode.PERSISTENT);
-
- // send a message
-
- p1.send(s1.createTextMessage("blip"));
-
- // kill node 1
-
-
- ServerManagement.killAndWait(1);
- log.info("########");
- log.info("######## KILLED NODE 1");
- log.info("########");
-
- try
- {
- ic[1].lookup("queue"); // looking up anything
- fail("The server still alive, kill didn't work yet");
- }
- catch (Exception e)
- {
- }
-
- // we must receive the message
-
- TextMessage tm = (TextMessage)c1.receive(1000);
- assertEquals("blip", tm.getText());
-
- }
- finally
- {
- if (conn != null)
- {
- conn.close();
- }
- }
- }
-
// Package protected ----------------------------------------------------------------------------
// Protected ------------------------------------------------------------------------------------
More information about the jboss-cvs-commits
mailing list