[jboss-cvs] JBoss Messaging SVN: r2431 - in trunk: src/main/org/jboss/jms/client/remoting and 3 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Sun Feb 25 17:38:43 EST 2007
Author: timfox
Date: 2007-02-25 17:38:43 -0500 (Sun, 25 Feb 2007)
New Revision: 2431
Modified:
trunk/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java
trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
trunk/src/main/org/jboss/jms/wireformat/ConsumerChangeRateRequest.java
trunk/tests/src/org/jboss/test/messaging/jms/FailoverTest.java
Log:
Made change rate synchronous to avoid race
Modified: trunk/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java 2007-02-25 22:22:39 UTC (rev 2430)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java 2007-02-25 22:38:43 UTC (rev 2431)
@@ -136,7 +136,7 @@
{
RequestSupport req = new ConsumerChangeRateRequest(id, version, newRate);
- doInvokeOneway(client, req);
+ doInvoke(client, req);
}
/**
Modified: trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java 2007-02-25 22:22:39 UTC (rev 2430)
+++ trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java 2007-02-25 22:38:43 UTC (rev 2431)
@@ -240,6 +240,10 @@
public void handleMessage(final Object message) throws Exception
{
+ //TODO - we temporarily need to execute on a different thread to
+ //avoid a deadlock situation in failover,
+ //where a message is sent, then the valve is locked, and the message send causes a message
+ //delivery back to the same client, which tries to ack but can't get through the valve
this.sessionExecutor.execute(
new Runnable() { public void run()
{
@@ -541,6 +545,10 @@
// TODO If we don't zap this buffer, we may be able to salvage some non-persistent messages
buffer.clear();
+
+ //need to reset toggle state
+ serverSending = true;
+
}
public long getLastDeliveryId()
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2007-02-25 22:22:39 UTC (rev 2430)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2007-02-25 22:38:43 UTC (rev 2431)
@@ -475,20 +475,15 @@
// No need to synchronize - clientAccepting is volatile.
- // Important note - this invocations can arrive in a different order to
- // which they were
- // sent - this is inherent in one way invocations where a client side
- // pool is used.
- // Therefore we just toggle the clientAccepting flag - if we actually
- // looked at the newRate
- // value we might end up turning off the consumer when it should be on
- // (since a off-on, arrives as on-off)
- // Toggling is safe, but when we start to look at the actual rate value
- // we will
- // have to be a bit cleverer
+ if (newRate > 0)
+ {
+ clientAccepting = true;
+ }
+ else
+ {
+ clientAccepting = false;
+ }
- clientAccepting = !clientAccepting;
-
if (clientAccepting)
{
promptDelivery();
Modified: trunk/src/main/org/jboss/jms/wireformat/ConsumerChangeRateRequest.java
===================================================================
--- trunk/src/main/org/jboss/jms/wireformat/ConsumerChangeRateRequest.java 2007-02-25 22:22:39 UTC (rev 2430)
+++ trunk/src/main/org/jboss/jms/wireformat/ConsumerChangeRateRequest.java 2007-02-25 22:38:43 UTC (rev 2431)
@@ -88,17 +88,20 @@
os.flush();
}
- public Object getPayload()
- {
- OnewayInvocation oi = new OnewayInvocation(this);
+ //Until we have an NBIO transport, this needs to be synchronous; otherwise we can get out of sync
+ //due to later invocations overtaking earlier invocations
+
+// public Object getPayload()
+// {
+// OnewayInvocation oi = new OnewayInvocation(this);
+//
+// InvocationRequest request =
+// new InvocationRequest(null, ServerPeer.REMOTING_JMS_SUBSYSTEM,
+// oi, ONE_WAY_METADATA, null, null);
+//
+// return request;
+// }
- InvocationRequest request =
- new InvocationRequest(null, ServerPeer.REMOTING_JMS_SUBSYSTEM,
- oi, ONE_WAY_METADATA, null, null);
-
- return request;
- }
-
}
Modified: trunk/tests/src/org/jboss/test/messaging/jms/FailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/FailoverTest.java 2007-02-25 22:22:39 UTC (rev 2430)
+++ trunk/tests/src/org/jboss/test/messaging/jms/FailoverTest.java 2007-02-25 22:38:43 UTC (rev 2431)
@@ -64,92 +64,92 @@
}
-// public void testSendReceive() throws Exception
-// {
-// Hashtable properties = new Hashtable();
-//
-// properties.put("java.naming.factory.initial", "org.jnp.interfaces.NamingContextFactory");
-//
-// properties.put("java.naming.provider.url", "jnp://192.168.1.11:1199");
-//
-// properties.put("java.naming.factory.url", "org.jnp.interfaces");
-//
-// log.info("Creaing ic");
-//
-// InitialContext ic = new InitialContext(properties);
-//
-// log.info("************ REMOTE");
-//
-// Connection conn = null;
-//
-// try
-// {
-// log.info("Created ic");
-//
-// Queue queue = (Queue)ic.lookup("/queue/testDistributedQueue");
-//
-// log.info("Looked up queue");
-//
-// ConnectionFactory cf = (ConnectionFactory)ic.lookup("/ConnectionFactory");
-//
-// log.info("Looked up cf");
-//
-// conn = cf.createConnection();
-//
-// Session sessSend = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-//
-// Session sessCons = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-//
-// MessageConsumer cons = sessCons.createConsumer(queue);
-//
-// MessageListener list = new MyListener();
-//
-// cons.setMessageListener(list);
-//
-// conn.start();
-//
-// MessageProducer prod = sessSend.createProducer(queue);
-//
-// prod.setDeliveryMode(DeliveryMode.PERSISTENT);
-//
-// int count = 0;
-//
-// while (true)
+ public void testSendReceive() throws Exception
+ {
+ Hashtable properties = new Hashtable();
+
+ properties.put("java.naming.factory.initial", "org.jnp.interfaces.NamingContextFactory");
+
+ properties.put("java.naming.provider.url", "jnp://192.168.1.11:1199");
+
+ properties.put("java.naming.factory.url", "org.jnp.interfaces");
+
+ log.info("Creaing ic");
+
+ InitialContext ic = new InitialContext(properties);
+
+ log.info("************ REMOTE");
+
+ Connection conn = null;
+
+ try
+ {
+ log.info("Created ic");
+
+ Queue queue = (Queue)ic.lookup("/queue/testDistributedQueue");
+
+ log.info("Looked up queue");
+
+ ConnectionFactory cf = (ConnectionFactory)ic.lookup("/ConnectionFactory");
+
+ log.info("Looked up cf");
+
+ conn = cf.createConnection();
+
+ Session sessSend = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ Session sessCons = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageConsumer cons = sessCons.createConsumer(queue);
+
+ MessageListener list = new MyListener();
+
+ cons.setMessageListener(list);
+
+ conn.start();
+
+ MessageProducer prod = sessSend.createProducer(queue);
+
+ prod.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+ int count = 0;
+
+ while (true)
+ {
+ TextMessage tm = sessSend.createTextMessage("message " + count);
+
+ prod.send(tm);
+
+ log.info("sent " + count);
+
+ count++;
+
+ //Thread.sleep(250);
+ }
+
+
+ }
+ catch (Exception e)
+ {
+ log.error("Failed", e);
+ throw e;
+ }
+ finally
+ {
+// if (conn != null)
// {
-// TextMessage tm = sessSend.createTextMessage("message " + count);
-//
-// prod.send(tm);
-//
-// log.info("sent " + count);
-//
-// count++;
-//
-// //Thread.sleep(250);
+// log.info("closing connetion");
+// try
+// {
+// conn.close();
+// }
+// catch (Exception ignore)
+// {
+// }
+// log.info("closed connection");
// }
-//
-//
-// }
-// catch (Exception e)
-// {
-// log.error("Failed", e);
-// throw e;
-// }
-// finally
-// {
-//// if (conn != null)
-//// {
-//// log.info("closing connetion");
-//// try
-//// {
-//// conn.close();
-//// }
-//// catch (Exception ignore)
-//// {
-//// }
-//// log.info("closed connection");
-//// }
-// }
-// }
+ }
+ }
class MyListener implements MessageListener
{
More information about the jboss-cvs-commits
mailing list