[Jboss-cvs] JBoss Messaging SVN: r1277 - in branches/Branch_1_0/src/main/org/jboss/jms/client: container remoting
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Sep 11 20:32:37 EDT 2006
Author: ovidiu.feodorov at jboss.com
Date: 2006-09-11 20:32:35 -0400 (Mon, 11 Sep 2006)
New Revision: 1277
Modified:
branches/Branch_1_0/src/main/org/jboss/jms/client/container/AsfAspect.java
branches/Branch_1_0/src/main/org/jboss/jms/client/container/SessionAspect.java
branches/Branch_1_0/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
Log:
Cleaned up some method signatures and added a tentative fix (currently commented out) for http://jira.jboss.org/jira/browse/JBMESSAGING-542
Modified: branches/Branch_1_0/src/main/org/jboss/jms/client/container/AsfAspect.java
===================================================================
--- branches/Branch_1_0/src/main/org/jboss/jms/client/container/AsfAspect.java 2006-09-12 00:30:28 UTC (rev 1276)
+++ branches/Branch_1_0/src/main/org/jboss/jms/client/container/AsfAspect.java 2006-09-12 00:32:35 UTC (rev 1277)
@@ -162,8 +162,7 @@
if (trace) { log.trace("sending " + holder.msg + " to the message listener" ); }
- MessageCallbackHandler.callOnMessage(holder.consumerDelegate, del,
- sessionListener, holder.consumerID, false,
+ MessageCallbackHandler.callOnMessage(del, sessionListener, holder.consumerID, false,
holder.msg, ackMode);
}
Modified: branches/Branch_1_0/src/main/org/jboss/jms/client/container/SessionAspect.java
===================================================================
--- branches/Branch_1_0/src/main/org/jboss/jms/client/container/SessionAspect.java 2006-09-12 00:30:28 UTC (rev 1276)
+++ branches/Branch_1_0/src/main/org/jboss/jms/client/container/SessionAspect.java 2006-09-12 00:32:35 UTC (rev 1277)
@@ -137,7 +137,7 @@
{
SessionDelegate del = (SessionDelegate)mi.getTargetObject();
- //We acknowledge immediately
+ // We acknowledge immediately
if (!state.isRecoverCalled())
{
Modified: branches/Branch_1_0/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
===================================================================
--- branches/Branch_1_0/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java 2006-09-12 00:30:28 UTC (rev 1276)
+++ branches/Branch_1_0/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java 2006-09-12 00:32:35 UTC (rev 1277)
@@ -64,11 +64,10 @@
trace = log.isTraceEnabled();
}
- //Hardcoded for now
+ // Hardcoded for now
private static final int MAX_REDELIVERIES = 10;
- public static void callOnMessage(ConsumerDelegate cons,
- SessionDelegate sess,
+ public static void callOnMessage(SessionDelegate sess,
MessageListener listener,
int consumerID,
boolean isConnectionConsumer,
@@ -76,15 +75,19 @@
int ackMode)
throws JMSException
{
- preDeliver(sess, consumerID, m, isConnectionConsumer);
+ preDeliver(sess, consumerID, m, isConnectionConsumer);
int tries = 0;
while (true)
{
try
- {
- listener.onMessage(m);
+ {
+ if (trace) { log.trace("calling listener's onMessage(" + m + ")"); }
+
+ listener.onMessage(m);
+
+ if (trace) { log.trace("listener's onMessage() finished"); }
break;
}
@@ -103,9 +106,9 @@
{
m.setJMSRedelivered(true);
- //TODO delivery count although optional should be global
- //so we need to send it back to the server
- //but this has performance hit so perhaps we just don't support it?
+ // TODO: delivery count, although optional, should be global, so we need to send it
+ // back to the server; but this has a performance hit, so perhaps we just don't
+ // support it?
m.incDeliveryCount();
tries++;
@@ -130,7 +133,7 @@
}
}
- postDeliver(sess, consumerID, m, isConnectionConsumer);
+ postDeliver(sess, isConnectionConsumer);
}
protected static void preDeliver(SessionDelegate sess,
@@ -147,10 +150,7 @@
}
}
- protected static void postDeliver(SessionDelegate sess,
- int consumerID,
- MessageProxy m,
- boolean isConnectionConsumer)
+ protected static void postDeliver(SessionDelegate sess, boolean isConnectionConsumer)
throws JMSException
{
// If this is the callback-handler for a connection consumer we don't want to acknowledge or
@@ -203,15 +203,15 @@
}
this.bufferSize = bufferSize;
-
+
buffer = new LinkedList();
-
+
isConnectionConsumer = isCC;
this.ackMode = ackMode;
-
+
this.sessionDelegate = sess;
-
+
this.consumerDelegate = cons;
this.consumerID = consumerID;
@@ -302,13 +302,13 @@
if (receiverThread != null)
{
- //Wake up any receive() thread that might be waiting
+ // Wake up any receive() thread that might be waiting
mainLock.notify();
}
this.listener = null;
}
-
+
waitForOnMessageToComplete();
// Now we cancel anything left in the buffer. The reason we do this now is that otherwise the
@@ -342,14 +342,22 @@
private void waitForOnMessageToComplete()
{
- // Wait for any on message executions to complete
-
+ // Wait for any onMessage() executions to complete
+
+// if (Thread.currentThread().equals(sessionExecutor.getThread()))
+// {
+// // the current thread is already closing this MessageCallbackHandler, so no need to register
+// // another Closer (see http://jira.jboss.org/jira/browse/JBMESSAGING-542)
+// return;
+// }
+
Future result = new Future();
try
{
- this.sessionExecutor.execute(new Closer(result));
-
+ sessionExecutor.execute(new Closer(result));
+
+ if (trace) { log.trace("blocking wait for Closer execution"); }
result.getResult();
}
catch (InterruptedException e)
@@ -439,7 +447,7 @@
// message is acknowledged so it gets removed from the queue/subscription.
preDeliver(sessionDelegate, consumerID, m, isConnectionConsumer);
- postDeliver(sessionDelegate, consumerID, m, isConnectionConsumer);
+ postDeliver(sessionDelegate, isConnectionConsumer);
if (!m.getMessage().isExpired())
{
@@ -590,9 +598,9 @@
{
MessageProxy msg = (MessageProxy)iter.next();
- //if this is the handler for a connection consumer we don't want to set the session delegate
- //since this is only used for client acknowledgement which is illegal for a session
- //used for an MDB
+ // If this is the handler for a connection consumer we don't want to set the session
+ // delegate since this is only used for client acknowledgement which is illegal for a
+ // session used for an MDB
msg.setSessionDelegate(sessionDelegate, isConnectionConsumer);
msg.setReceived();
@@ -628,7 +636,7 @@
{
listenerRunning = true;
- if (trace) { log.trace(this + ": new ListenerRunner scheduled"); }
+ if (trace) { log.trace(this + " scheduled a new ListenerRunner"); }
this.queueRunner(new ListenerRunner());
}
@@ -654,6 +662,7 @@
public void run()
{
result.setResult(null);
+ if (trace) { log.trace("Closer finished run"); }
}
}
@@ -673,15 +682,16 @@
if (listener == null)
{
listenerRunning = false;
-
+ if (trace) { log.trace("no listener, returning"); }
return;
}
- //remove a message from the buffer
+ // remove a message from the buffer
if (buffer.isEmpty())
{
- listenerRunning = false;
+ listenerRunning = false;
+ if (trace) { log.trace("no messages in buffer, marking listener as not running"); }
}
else
{
@@ -697,6 +707,7 @@
if (!again)
{
listenerRunning = false;
+ if (trace) { log.trace("no more messages in buffer, marking listener as not running"); }
}
}
}
@@ -705,7 +716,7 @@
{
try
{
- callOnMessage(consumerDelegate, sessionDelegate, listener, consumerID, false, mp, ackMode);
+ callOnMessage(sessionDelegate, listener, consumerID, false, mp, ackMode);
}
catch (JMSException e)
{
@@ -715,14 +726,14 @@
if (again)
{
- //Queue it up again
+ // Queue it up again
queueRunner(this);
}
else
{
if (!serverSending)
{
- //Ask server for more messages
+ // Ask server for more messages
try
{
consumerDelegate.more();
More information about the jboss-cvs-commits
mailing list