[jboss-cvs] JBoss Messaging SVN: r1482 - branches/Branch_HTTP_Experiment/src/main/org/jboss/jms/server/endpoint
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Oct 17 03:46:06 EDT 2006
Author: ron_sigal
Date: 2006-10-17 03:46:05 -0400 (Tue, 17 Oct 2006)
New Revision: 1482
Modified:
branches/Branch_HTTP_Experiment/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
Log:
JBMESSAGING-207: Uses standard Remoting callbacks.
Modified: branches/Branch_HTTP_Experiment/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- branches/Branch_HTTP_Experiment/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2006-10-17 07:39:06 UTC (rev 1481)
+++ branches/Branch_HTTP_Experiment/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2006-10-17 07:46:05 UTC (rev 1482)
@@ -23,10 +23,14 @@
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import javax.jms.IllegalStateException;
import javax.jms.InvalidSelectorException;
@@ -56,6 +60,9 @@
import org.jboss.messaging.core.tx.TransactionException;
import org.jboss.messaging.core.tx.TxCallback;
import org.jboss.messaging.util.Future;
+import org.jboss.remoting.callback.Callback;
+import org.jboss.remoting.callback.CallbackListener;
+import org.jboss.remoting.callback.ServerInvokerCallbackHandler;
import EDU.oswego.cs.dl.util.concurrent.Executor;
import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
@@ -70,7 +77,7 @@
*
* $Id$
*/
-public class ServerConsumerEndpoint implements Receiver, ConsumerEndpoint
+public class ServerConsumerEndpoint implements Receiver, ConsumerEndpoint, CallbackListener
{
// Constants -----------------------------------------------------
@@ -123,6 +130,8 @@
private Map deliveries;
+ private Set pendingCallbacks;
+
// Constructors --------------------------------------------------
protected ServerConsumerEndpoint(int id, PagingFilteredQueue messageQueue, String queueName,
@@ -194,6 +203,8 @@
// prompt delivery
messageQueue.deliver(false);
+ this.pendingCallbacks = new HashSet();
+
log.debug(this + " constructed");
}
@@ -634,10 +645,13 @@
// Flush any messages waiting to be sent to the client.
this.executor.execute(new Deliverer());
- // Now wait for it to execute.
- Future result = new Future();
- this.executor.execute(new Waiter(result));
- result.getResult();
+ //Now wait for it to execute
+ synchronized (pendingCallbacks)
+ {
+ System.out.println("waiting on pendingCallbacks");
+ if (!pendingCallbacks.isEmpty())
+ pendingCallbacks.wait();
+ }
// Now we know any deliverer has delivered any outstanding messages to the client buffer.
}
@@ -760,20 +774,30 @@
// concurrently to the same consumer on different threads?
MessagingMarshallable mm = new MessagingMarshallable(connection.getUsingVersion(), del);
- MessagingMarshallable resp = (MessagingMarshallable)connection.getCallbackClient().invoke(mm);
-
- if (trace) { log.trace(ServerConsumerEndpoint.this + " handed messages over to the remoting layer"); }
-
- HandleMessageResponse result = (HandleMessageResponse)resp.getLoad();
-
- // For now we don't look at how many messages are accepted since they all will be.
- // The field is a placeholder for the future.
- if (result.clientIsFull())
+ Map metadata = new HashMap();
+ metadata.put(ServerInvokerCallbackHandler.CALLBACK_POSTPROCESS_LISTENER,
+ ServerConsumerEndpoint.this);
+ Callback callback = new Callback(mm);
+ callback.setReturnPayload(metadata);
+
+ synchronized (pendingCallbacks)
{
- // Stop the server sending any more messages to the client.
- // This is ok outside lock.
- clientConsumerFull = true;
+ pendingCallbacks.add(callback);
}
+
+ connection.getCallbackHandler().handleCallback(callback);
+ if (trace) { log.trace(ServerConsumerEndpoint.this + " handed messages over to the remoting layer"); }
+//
+// HandleMessageResponse result = (HandleMessageResponse)resp.getLoad();
+//
+// // For now we don't look at how many messages are accepted since they all will be.
+// // The field is a placeholder for the future.
+// if (result.clientIsFull())
+// {
+// // Stop the server sending any more messages to the client.
+// // This is ok outside lock.
+// clientConsumerFull = true;
+// }
}
catch(Throwable t)
{
@@ -870,4 +894,20 @@
}
}
+   /**
+    * Implements org.jboss.remoting.callback.CallbackListener.
+    *
+    * Removes the given callback from pendingCallbacks and, once no callbacks remain pending,
+    * wakes up the threads waiting on the pendingCallbacks monitor.
+    *
+    * NOTE(review): presumably invoked by the Remoting layer after the callback has been sent,
+    * since this endpoint registers itself under CALLBACK_POSTPROCESS_LISTENER - confirm.
+    *
+    * @param callback the callback being reported; a warning is logged if it is not currently
+    *        held in pendingCallbacks.
+    */
+   public void callbackSent(Callback callback)
+   {
+      // Report through the logger instead of writing debug output to stdout.
+      log.trace("entering callbackSent(): " + callback);
+
+      synchronized (pendingCallbacks)
+      {
+         if (!pendingCallbacks.remove(callback))
+         {
+            log.warn("callbackSent() received unknown Callback: " + callback);
+         }
+
+         if (pendingCallbacks.isEmpty())
+         {
+            // notifyAll() rather than notify(): every thread blocked in wait() on this monitor
+            // is waiting for the same "no pending callbacks" condition, so all are released.
+            pendingCallbacks.notifyAll();
+         }
+      }
+   }
+
}
More information about the jboss-cvs-commits
mailing list