[jboss-cvs] JBoss Messaging SVN: r3682 - in trunk: src/main/org/jboss/jms/client/api and 12 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Feb 8 09:04:33 EST 2008
Author: timfox
Date: 2008-02-08 09:04:33 -0500 (Fri, 08 Feb 2008)
New Revision: 3682
Added:
trunk/src/main/org/jboss/messaging/core/remoting/codec/ConsumerFlowTokenMessageCodec.java
trunk/src/main/org/jboss/messaging/core/remoting/wireformat/ConsumerFlowTokenMessage.java
Removed:
trunk/src/main/org/jboss/messaging/core/remoting/codec/ConsumerChangeRateMessageCodec.java
trunk/src/main/org/jboss/messaging/core/remoting/wireformat/ConsumerChangeRateMessage.java
trunk/src/main/org/jboss/messaging/util/ClearableQueuedExecutor.java
trunk/src/main/org/jboss/messaging/util/ConcurrentReaderHashSet.java
Modified:
trunk/src/main/org/jboss/jms/client/JBossConnectionConsumer.java
trunk/src/main/org/jboss/jms/client/JBossSession.java
trunk/src/main/org/jboss/jms/client/api/ClientConsumer.java
trunk/src/main/org/jboss/jms/client/api/ClientSession.java
trunk/src/main/org/jboss/jms/client/impl/ClientConnectionFactoryImpl.java
trunk/src/main/org/jboss/jms/client/impl/ClientConsumerImpl.java
trunk/src/main/org/jboss/jms/client/impl/ClientConsumerInternal.java
trunk/src/main/org/jboss/jms/client/impl/ClientSessionImpl.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerBrowserEndpoint.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
trunk/src/main/org/jboss/messaging/core/MessagingServer.java
trunk/src/main/org/jboss/messaging/core/impl/MessageImpl.java
trunk/src/main/org/jboss/messaging/core/impl/server/MessagingServerImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/codec/SessionCreateConsumerResponseMessageCodec.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/PacketCodecFactory.java
trunk/src/main/org/jboss/messaging/core/remoting/wireformat/PacketType.java
trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionCreateConsumerResponseMessage.java
trunk/src/main/org/jboss/messaging/util/AbstractHashSet.java
trunk/tests/src/org/jboss/messaging/core/remoting/impl/integration/DummyInterceptor.java
trunk/tests/src/org/jboss/messaging/core/remoting/impl/integration/DummyInterceptorB.java
trunk/tests/src/org/jboss/messaging/core/remoting/wireformat/test/unit/PacketTypeTest.java
trunk/tests/src/org/jboss/test/messaging/jms/TemporaryDestinationTest.java
Log:
Proper token-based flow control and some tweaks
Modified: trunk/src/main/org/jboss/jms/client/JBossConnectionConsumer.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/JBossConnectionConsumer.java 2008-02-08 13:36:06 UTC (rev 3681)
+++ trunk/src/main/org/jboss/jms/client/JBossConnectionConsumer.java 2008-02-08 14:04:33 UTC (rev 3682)
@@ -21,6 +21,8 @@
*/
package org.jboss.jms.client;
+import java.util.concurrent.atomic.AtomicInteger;
+
import javax.jms.ConnectionConsumer;
import javax.jms.JMSException;
import javax.jms.ServerSessionPool;
@@ -28,8 +30,6 @@
import org.jboss.jms.destination.JBossDestination;
import org.jboss.messaging.util.Logger;
-import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt;
-
/**
* This class implements javax.jms.ConnectionConsumer
*
@@ -80,7 +80,7 @@
private int id;
/** The thread id generator */
- private static SynchronizedInt threadId = new SynchronizedInt(0);
+ private static AtomicInteger threadId = new AtomicInteger(0);
private int maxDeliveries;
Modified: trunk/src/main/org/jboss/jms/client/JBossSession.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/JBossSession.java 2008-02-08 13:36:06 UTC (rev 3681)
+++ trunk/src/main/org/jboss/jms/client/JBossSession.java 2008-02-08 14:04:33 UTC (rev 3682)
@@ -492,7 +492,7 @@
throw new InvalidDestinationException("Queue " + dest.getName() + " does not exist");
}
- consumer = session.createConsumer(dest.getAddress(), coreFilterString, noLocal, false);
+ consumer = session.createConsumer(dest.getAddress(), coreFilterString, noLocal, false, false);
}
else
{
@@ -513,7 +513,7 @@
session.createQueue(dest.getAddress(), queueName, coreFilterString, false, false);
- consumer = session.createConsumer(queueName, null, noLocal, true);
+ consumer = session.createConsumer(queueName, null, noLocal, true, false);
}
else
{
@@ -575,7 +575,7 @@
}
}
- consumer = session.createConsumer(queueName, null, noLocal, false);
+ consumer = session.createConsumer(queueName, null, noLocal, false, false);
}
}
Modified: trunk/src/main/org/jboss/jms/client/api/ClientConsumer.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/api/ClientConsumer.java 2008-02-08 13:36:06 UTC (rev 3681)
+++ trunk/src/main/org/jboss/jms/client/api/ClientConsumer.java 2008-02-08 14:04:33 UTC (rev 3682)
@@ -24,8 +24,6 @@
void setMessageHandler(MessageHandler handler) throws MessagingException;
- String getQueueName();
-
void close() throws MessagingException;
boolean isClosed();
Modified: trunk/src/main/org/jboss/jms/client/api/ClientSession.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/api/ClientSession.java 2008-02-08 13:36:06 UTC (rev 3681)
+++ trunk/src/main/org/jboss/jms/client/api/ClientSession.java 2008-02-08 14:04:33 UTC (rev 3682)
@@ -33,8 +33,8 @@
SessionBindingQueryResponseMessage bindingQuery(String address) throws MessagingException;
- ClientConsumer createConsumer(String queueName, String filterString,
- boolean noLocal, boolean autoDeleteQueue) throws MessagingException;
+ ClientConsumer createConsumer(String queueName, String filterString, boolean noLocal,
+ boolean autoDeleteQueue, boolean direct) throws MessagingException;
ClientBrowser createBrowser(String queueName, String messageSelector) throws MessagingException;
Modified: trunk/src/main/org/jboss/jms/client/impl/ClientConnectionFactoryImpl.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/impl/ClientConnectionFactoryImpl.java 2008-02-08 13:36:06 UTC (rev 3681)
+++ trunk/src/main/org/jboss/jms/client/impl/ClientConnectionFactoryImpl.java 2008-02-08 14:04:33 UTC (rev 3682)
@@ -25,7 +25,6 @@
import org.jboss.jms.client.api.ClientConnection;
import org.jboss.jms.client.api.ClientConnectionFactory;
-import org.jboss.jms.client.plugin.LoadBalancingFactory;
import org.jboss.jms.client.remoting.MessagingRemotingConnection;
import org.jboss.messaging.core.remoting.RemotingConfiguration;
import org.jboss.messaging.core.remoting.wireformat.CreateConnectionRequest;
Modified: trunk/src/main/org/jboss/jms/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/impl/ClientConsumerImpl.java 2008-02-08 13:36:06 UTC (rev 3681)
+++ trunk/src/main/org/jboss/jms/client/impl/ClientConsumerImpl.java 2008-02-08 14:04:33 UTC (rev 3682)
@@ -21,6 +21,8 @@
*/
package org.jboss.jms.client.impl;
+import java.util.concurrent.ExecutorService;
+
import org.jboss.jms.client.api.MessageHandler;
import org.jboss.jms.client.remoting.MessagingRemotingConnection;
import org.jboss.messaging.core.Message;
@@ -28,14 +30,12 @@
import org.jboss.messaging.core.impl.PriorityLinkedListImpl;
import org.jboss.messaging.core.remoting.PacketDispatcher;
import org.jboss.messaging.core.remoting.wireformat.CloseMessage;
-import org.jboss.messaging.core.remoting.wireformat.ConsumerChangeRateMessage;
+import org.jboss.messaging.core.remoting.wireformat.ConsumerFlowTokenMessage;
import org.jboss.messaging.core.remoting.wireformat.DeliverMessage;
import org.jboss.messaging.util.Future;
import org.jboss.messaging.util.Logger;
import org.jboss.messaging.util.MessagingException;
-import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
-
/**
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
@@ -59,23 +59,35 @@
// -----------------------------------------------------------------------------------
private String id;
+
private ClientSessionInternal session;
- private int bufferSize;
- private PriorityLinkedList<DeliverMessage> buffer = new PriorityLinkedListImpl<DeliverMessage>(
- 10);
+
+ private PriorityLinkedList<DeliverMessage> buffer = new PriorityLinkedListImpl<DeliverMessage>(10);
+
private volatile Thread receiverThread;
+
private MessageHandler handler;
+
private volatile boolean closed;
+
private Object mainLock = new Object();
- private QueuedExecutor sessionExecutor;
+
+ private ExecutorService sessionExecutor;
+
private boolean listenerRunning;
- private int consumeCount;
+
private MessagingRemotingConnection remotingConnection;
- private String queueName;
+
private long ignoreDeliveryMark = -1;
-
- // FIXME - revisit closed and closing flags
-
+
+ private boolean direct;
+
+ private Thread onMessageThread;
+
+ private int tokensToSend;
+
+ private int tokenBatchSize;
+
// Static
// ---------------------------------------------------------------------------------------
@@ -83,15 +95,16 @@
// ---------------------------------------------------------------------------------
public ClientConsumerImpl(ClientSessionInternal session, String id,
- int bufferSize, QueuedExecutor sessionExecutor,
- MessagingRemotingConnection remotingConnection, String queueName)
+ ExecutorService sessionExecutor,
+ MessagingRemotingConnection remotingConnection,
+ boolean direct, int tokenBatchSize)
{
this.id = id;
this.session = session;
- this.bufferSize = bufferSize;
this.sessionExecutor = sessionExecutor;
this.remotingConnection = remotingConnection;
- this.queueName = queueName;
+ this.direct = direct;
+ this.tokenBatchSize = tokenBatchSize;
}
// ClientConsumer implementation
@@ -101,16 +114,11 @@
{
checkClosed();
- DeliverMessage m = null;
-
synchronized (mainLock)
{
- if (closed) { return null; }
-
if (handler != null)
{
- throw new MessagingException(
- MessagingException.ILLEGAL_STATE, "Cannot call receive(...) - a MessageHandler is set");
+ throw new MessagingException(MessagingException.ILLEGAL_STATE, "Cannot call receive(...) - a MessageHandler is set");
}
receiverThread = Thread.currentThread();
@@ -148,13 +156,13 @@
if (!closed && !buffer.isEmpty())
{
- m = buffer.removeFirst();
+ DeliverMessage m = buffer.removeFirst();
boolean expired = m.getMessage().isExpired();
session.delivered(m.getDeliveryID(), expired);
- checkSendChangeRate();
+ flowControl();
if (expired)
{
@@ -198,47 +206,40 @@
public void setMessageHandler(MessageHandler handler) throws MessagingException
{
checkClosed();
-
- synchronized (mainLock)
+
+ if (receiverThread != null)
{
- if (receiverThread != null) { throw new MessagingException(
- MessagingException.ILLEGAL_STATE,
- "Cannot set MessageHandler - consumer is in receive(...)"); }
+ throw new MessagingException(MessagingException.ILLEGAL_STATE,"Cannot set MessageHandler - consumer is in receive(...)");
+ }
+ synchronized (mainLock)
+ {
this.handler = handler;
if (handler != null && !buffer.isEmpty())
{
listenerRunning = true;
- this.queueRunner(new ListenerRunner());
+ queueRunner();
}
}
}
- public String getQueueName()
+ public void close() throws MessagingException
{
- return queueName;
- }
+ if (closed)
+ {
+ return;
+ }
- public synchronized void close() throws MessagingException
- {
- if (closed) { return; }
-
try
{
- // Important! We set the handler to null so the next ListenerRunner
- // won't run
- if (handler != null)
- {
- setMessageHandler(null);
- }
+ // We set the handler to null so the next ListenerRunner won't run
+ handler = null;
// Now we wait for any current handler runners to run.
waitForOnMessageToComplete();
- // TODO sort out these close and closing flags
-
synchronized (mainLock)
{
closed = true;
@@ -248,25 +249,16 @@
// Wake up any receive() thread that might be waiting
mainLock.notify();
}
-
- this.handler = null;
}
remotingConnection.send(id, new CloseMessage());
PacketDispatcher.client.unregister(id);
-
- if (trace)
- {
- log.trace(this + " closed");
- }
-
}
finally
{
session.removeConsumer(this);
}
-
}
public boolean isClosed()
@@ -282,13 +274,13 @@
return id;
}
- public void changeRate(float newRate) throws MessagingException
- {
- checkClosed();
+// public void changeRate(float newRate) throws MessagingException
+// {
+// checkClosed();
+//
+// remotingConnection.send(id, new ConsumerFlowTokenMessage(newRate), true);
+// }
- remotingConnection.send(id, new ConsumerChangeRateMessage(newRate), true);
- }
-
public void handleMessage(final DeliverMessage message) throws Exception
{
synchronized (mainLock)
@@ -324,13 +316,34 @@
buffer.addLast(message, coreMessage.getPriority());
- if (trace)
+ if (receiverThread != null)
{
- log.trace(this + " added message(s) to the buffer are now "
- + buffer.size() + " messages");
+ mainLock.notify();
}
+ else if (handler != null)
+ {
+ if (direct)
+ {
+ //Dispatch it directly on remoting thread
+
+ boolean expired = message.getMessage().isExpired();
- messageAdded();
+ session.delivered(message.getDeliveryID(), expired);
+
+ flowControl();
+
+ if (!expired)
+ {
+ handler.onMessage(message.getMessage());
+ }
+ }
+ else if (!listenerRunning)
+ {
+ listenerRunning = true;
+
+ queueRunner();
+ }
+ }
}
}
@@ -356,118 +369,107 @@
// Private
// --------------------------------------------------------------------------------------
- private void checkSendChangeRate() throws MessagingException
+ private void flowControl() throws MessagingException
{
- consumeCount++;
-
- if (consumeCount == bufferSize)
+ if (tokenBatchSize > 0)
{
- consumeCount = 0;
-
- changeRate(1.0f);
+ tokensToSend++;
+
+ if (tokensToSend == tokenBatchSize)
+ {
+ tokensToSend = 0;
+
+ remotingConnection.send(id, new ConsumerFlowTokenMessage(tokenBatchSize), true);
+ }
}
}
-
+
private void waitForOnMessageToComplete()
{
// Wait for any onMessage() executions to complete
- if (Thread.currentThread().equals(sessionExecutor.getThread()))
+ if (Thread.currentThread() == onMessageThread)
{
- // the current thread already closing this ClientConsumer (this happens
- // when the
- // session is closed from within the MessageListener.onMessage(), for
- // example), so no need
- // to register another Closer (see
- // http://jira.jboss.org/jira/browse/JBMESSAGING-542)
+ // If called from inside onMessage then return immediately - otherwise would block forever
return;
}
Future result = new Future();
- try
- {
- sessionExecutor.execute(new Closer(result));
+ sessionExecutor.execute(new Closer(result));
- result.getResult();
- }
- catch (InterruptedException e)
- {
- }
+ result.getResult();
}
- private void queueRunner(ListenerRunner runner)
+ private void queueRunner()
{
- try
- {
- this.sessionExecutor.execute(runner);
- }
- catch (InterruptedException e)
- {
- }
+ sessionExecutor.execute(new ListenerRunner());
}
- private void messageAdded()
+ private void checkClosed() throws MessagingException
{
- boolean notified = false;
-
- if (trace)
+ if (closed)
{
- log.trace("Receiver thread:" + receiverThread + " handler:" + handler
- + " listenerRunning:" + listenerRunning + " sessionExecutor:"
- + sessionExecutor);
+ throw new MessagingException(MessagingException.OBJECT_CLOSED, "Consumer is closed");
}
-
- // If we have a thread waiting on receive() we notify it
- if (receiverThread != null)
+ }
+
+ private void onMessageLoop()
+ {
+ try
{
- if (trace)
- {
- log.trace(this + " notifying receiver/waiter thread");
- }
+ onMessageThread = Thread.currentThread();
- mainLock.notifyAll();
+ DeliverMessage msg = null;
- notified = true;
- }
- else if (handler != null)
- {
- // We have a message handler
- if (!listenerRunning)
+ MessageHandler theListener = null;
+
+ synchronized (mainLock)
{
- listenerRunning = true;
-
- if (trace)
+ if (handler == null || buffer.isEmpty())
{
- log.trace(this + " scheduled a new ListenerRunner");
+ listenerRunning = false;
+
+ return;
}
- this.queueRunner(new ListenerRunner());
+ theListener = handler;
+
+ msg = buffer.removeFirst();
}
- // TODO - Execute onMessage on same thread for even better throughput
- }
-
- // Make sure we notify any thread waiting for last delivery
- if (!notified)
- {
- if (trace)
+ if (msg != null)
{
- log.trace("Notifying");
+ boolean expired = msg.getMessage().isExpired();
+
+ session.delivered(msg.getDeliveryID(), expired);
+
+ flowControl();
+
+ if (!expired)
+ {
+ theListener.onMessage(msg.getMessage());
+ }
}
- mainLock.notifyAll();
+ synchronized (mainLock)
+ {
+ if (!buffer.isEmpty())
+ {
+ queueRunner();
+ }
+ else
+ {
+ listenerRunning = false;
+ }
+ }
}
+ catch (MessagingException e)
+ {
+ log.error("Failure in ListenerRunner", e);
+ }
}
-
-
-
- private void checkClosed() throws MessagingException
- {
- if (closed) { throw new MessagingException(
- MessagingException.OBJECT_CLOSED, "Consumer is closed"); }
- }
-
+
// Inner classes
// --------------------------------------------------------------------------------
@@ -489,107 +491,12 @@
result.setResult(null);
}
}
-
- /*
- * This class handles the execution of onMessage methods
- */
+
private class ListenerRunner implements Runnable
{
public void run()
{
- try
- {
- DeliverMessage msg = null;
-
- MessageHandler theListener = null;
-
- synchronized (mainLock)
- {
- if (handler == null || buffer.isEmpty())
- {
- listenerRunning = false;
-
- if (trace)
- {
- log.trace("no handler or buffer is empty, returning");
- }
-
- return;
- }
-
- theListener = handler;
-
- // remove a message from the buffer
-
- msg = buffer.removeFirst();
-
- checkSendChangeRate();
- }
-
- /*
- * Bug here is as follows: The next runner gets scheduled BEFORE the
- * on message is executed so if the onmessage fails on acking it
- * will be put on hold and failover will kick in, this will clear
- * the executor so the next queud one disappears at everything
- * grinds to a halt
- *
- * Solution - don't use a session executor - have a session thread
- * instead much nicer
- */
-
- if (msg != null)
- {
- boolean expired = msg.getMessage().isExpired();
-
- session.delivered(msg.getDeliveryID(), expired);
-
- if (!expired)
- {
- theListener.onMessage(msg.getMessage());
- }
- }
-
- synchronized (mainLock)
- {
- if (!buffer.isEmpty())
- {
- // Queue up the next runner to run
-
- if (trace)
- {
- log
- .trace("More messages in buffer so queueing next onMessage to run");
- }
-
- queueRunner(this);
-
- if (trace)
- {
- log.trace("Queued next onMessage to run");
- }
- }
- else
- {
- if (trace)
- {
- log
- .trace("no more messages in buffer, marking handler as not running");
- }
-
- listenerRunning = false;
- }
- }
-
- if (trace)
- {
- log.trace("Exiting run()");
- }
- }
- catch (MessagingException e)
- {
- log.error("Failure in ListenerRunner", e);
- }
+ onMessageLoop();
}
}
-
}
Modified: trunk/src/main/org/jboss/jms/client/impl/ClientConsumerInternal.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/impl/ClientConsumerInternal.java 2008-02-08 13:36:06 UTC (rev 3681)
+++ trunk/src/main/org/jboss/jms/client/impl/ClientConsumerInternal.java 2008-02-08 14:04:33 UTC (rev 3682)
@@ -21,8 +21,6 @@
{
String getID();
- void changeRate(float newRate) throws MessagingException;
-
void handleMessage(DeliverMessage message) throws Exception;
void recover(long lastDeliveryID) throws MessagingException;
Modified: trunk/src/main/org/jboss/jms/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/impl/ClientSessionImpl.java 2008-02-08 13:36:06 UTC (rev 3681)
+++ trunk/src/main/org/jboss/jms/client/impl/ClientSessionImpl.java 2008-02-08 14:04:33 UTC (rev 3682)
@@ -26,6 +26,8 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
@@ -39,9 +41,14 @@
import org.jboss.messaging.core.Message;
import org.jboss.messaging.core.remoting.PacketDispatcher;
import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
+import org.jboss.messaging.core.remoting.wireformat.CloseMessage;
+import org.jboss.messaging.core.remoting.wireformat.ConsumerFlowTokenMessage;
+import org.jboss.messaging.core.remoting.wireformat.SessionAcknowledgeMessage;
+import org.jboss.messaging.core.remoting.wireformat.SessionAddAddressMessage;
import org.jboss.messaging.core.remoting.wireformat.SessionBindingQueryMessage;
import org.jboss.messaging.core.remoting.wireformat.SessionBindingQueryResponseMessage;
-import org.jboss.messaging.core.remoting.wireformat.CloseMessage;
+import org.jboss.messaging.core.remoting.wireformat.SessionCancelMessage;
+import org.jboss.messaging.core.remoting.wireformat.SessionCommitMessage;
import org.jboss.messaging.core.remoting.wireformat.SessionCreateBrowserMessage;
import org.jboss.messaging.core.remoting.wireformat.SessionCreateBrowserResponseMessage;
import org.jboss.messaging.core.remoting.wireformat.SessionCreateConsumerMessage;
@@ -50,10 +57,6 @@
import org.jboss.messaging.core.remoting.wireformat.SessionDeleteQueueMessage;
import org.jboss.messaging.core.remoting.wireformat.SessionQueueQueryMessage;
import org.jboss.messaging.core.remoting.wireformat.SessionQueueQueryResponseMessage;
-import org.jboss.messaging.core.remoting.wireformat.SessionAcknowledgeMessage;
-import org.jboss.messaging.core.remoting.wireformat.SessionAddAddressMessage;
-import org.jboss.messaging.core.remoting.wireformat.SessionCancelMessage;
-import org.jboss.messaging.core.remoting.wireformat.SessionCommitMessage;
import org.jboss.messaging.core.remoting.wireformat.SessionRemoveAddressMessage;
import org.jboss.messaging.core.remoting.wireformat.SessionRollbackMessage;
import org.jboss.messaging.core.remoting.wireformat.SessionSendMessage;
@@ -73,12 +76,9 @@
import org.jboss.messaging.core.remoting.wireformat.SessionXASetTimeoutResponseMessage;
import org.jboss.messaging.core.remoting.wireformat.SessionXAStartMessage;
import org.jboss.messaging.core.remoting.wireformat.SessionXASuspendMessage;
-import org.jboss.messaging.util.ClearableQueuedExecutor;
import org.jboss.messaging.util.Logger;
import org.jboss.messaging.util.MessagingException;
-import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
-
/**
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
@@ -117,8 +117,7 @@
private boolean deliveryExpired;
- // Executor used for executing onMessage methods
- private ClearableQueuedExecutor executor;
+ private ExecutorService executor;
private MessagingRemotingConnection remotingConnection;
@@ -144,7 +143,7 @@
this.remotingConnection = connection.getRemotingConnection();
- executor = new ClearableQueuedExecutor(new LinkedQueue());
+ executor = Executors.newSingleThreadExecutor();
this.lazyAckBatchSize = lazyAckBatchSize;
}
@@ -209,7 +208,7 @@
}
public ClientConsumer createConsumer(String queueName, String filterString, boolean noLocal,
- boolean autoDeleteQueue) throws MessagingException
+ boolean autoDeleteQueue, boolean direct) throws MessagingException
{
checkClosed();
@@ -218,17 +217,31 @@
SessionCreateConsumerResponseMessage response = (SessionCreateConsumerResponseMessage)remotingConnection.send(id, request);
+ int prefetchSize = response.getPrefetchSize();
+
ClientConsumerInternal consumer =
- new ClientConsumerImpl(this, response.getConsumerID(), response.getBufferSize(),
- executor, remotingConnection, queueName);
+ new ClientConsumerImpl(this, response.getConsumerID(),
+ executor, remotingConnection, direct, response.getPrefetchSize());
consumers.put(response.getConsumerID(), consumer);
PacketDispatcher.client.register(new ClientConsumerPacketHandler(consumer, response.getConsumerID()));
- //Now we have finished creating the client consumer, we can tell the SCD
- //we are ready
- consumer.changeRate(1);
+ if (prefetchSize > 0) // 0 ==> flow control is disabled
+ {
+ //Now give the server consumer some initial tokens (1.5 * prefetchSize)
+
+ int initialTokens = prefetchSize + prefetchSize >>> 1;
+
+ remotingConnection.send(response.getConsumerID(), new ConsumerFlowTokenMessage(initialTokens), true);
+ }
+ else
+ {
+ //FIXME
+ //FIXME - for now we need to send a flow control token to ensure the return packet sender gets set
+ //FIXME
+ remotingConnection.send(response.getConsumerID(), new ConsumerFlowTokenMessage(1), true);
+ }
return consumer;
}
@@ -676,6 +689,7 @@
}
SessionAcknowledgeMessage message = new SessionAcknowledgeMessage(lastID, !broken);
+
remotingConnection.send(id, message, !block);
acked = true;
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerBrowserEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerBrowserEndpoint.java 2008-02-08 13:36:06 UTC (rev 3681)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerBrowserEndpoint.java 2008-02-08 14:04:33 UTC (rev 3682)
@@ -70,7 +70,6 @@
// Attributes -----------------------------------------------------------------------------------
private String id;
- private boolean closed;
private ServerSessionEndpoint session;
private Queue destination;
private Filter filter;
@@ -102,40 +101,23 @@
public void reset() throws Exception
{
- if (closed)
- {
- throw new IllegalStateException("Browser is closed");
- }
-
- log.trace(this + " is being resetted");
-
iterator = createIterator();
}
public boolean hasNextMessage() throws Exception
{
- if (closed)
- {
- throw new IllegalStateException("Browser is closed");
- }
-
if (iterator == null)
{
iterator = createIterator();
}
boolean has = iterator.hasNext();
- if (trace) { log.trace(this + (has ? " has": " DOESN'T have") + " a next message"); }
+
return has;
}
public Message nextMessage() throws Exception
{
- if (closed)
- {
- throw new IllegalStateException("Browser is closed");
- }
-
if (iterator == null)
{
iterator = createIterator();
@@ -143,20 +125,11 @@
Message r = (Message)iterator.next();
- if (trace) { log.trace(this + " returning " + r); }
-
return r;
}
public Message[] nextMessageBlock(int maxMessages) throws Exception
{
- if (trace) { log.trace(this + " returning next message block of " + maxMessages); }
-
- if (closed)
- {
- throw new IllegalStateException("Browser is closed");
- }
-
if (maxMessages < 2)
{
throw new IllegalArgumentException("maxMessages must be >=2 otherwise use nextMessage");
@@ -184,8 +157,12 @@
public void close() throws Exception
{
- localClose();
+ iterator = null;
+
+ session.getConnectionEndpoint().getMessagingServer().getRemotingService().getDispatcher().unregister(id);
+
session.removeBrowser(id);
+
log.trace(this + " closed");
}
@@ -198,20 +175,6 @@
// Package protected ----------------------------------------------------------------------------
- void localClose() throws Exception
- {
- if (closed)
- {
- throw new IllegalStateException("Browser is already closed");
- }
-
- iterator = null;
-
- session.getConnectionEndpoint().getMessagingServer().getRemotingService().getDispatcher().unregister(id);
-
- closed = true;
- }
-
// Protected ------------------------------------------------------------------------------------
// Private --------------------------------------------------------------------------------------
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java 2008-02-08 13:36:06 UTC (rev 3681)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java 2008-02-08 14:04:33 UTC (rev 3682)
@@ -22,9 +22,9 @@
package org.jboss.jms.server.endpoint;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.CLOSE;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.CONN_CREATESESSION;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.CONN_START;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.CONN_STOP;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.CONN_CREATESESSION;
import java.util.HashMap;
import java.util.HashSet;
@@ -75,8 +75,6 @@
private String id;
- private volatile boolean closed;
-
private volatile boolean started;
private String username;
@@ -147,8 +145,6 @@
sessions.put(sessionID, ep);
}
- messagingServer.addSession(sessionID, ep);
-
messagingServer.getRemotingService().getDispatcher().register(ep.newHandler());
return new ConnectionCreateSessionResponseMessage(sessionID);
@@ -156,38 +152,21 @@
public void start() throws Exception
{
- if (closed)
- {
- throw new IllegalStateException("Connection is closed");
- }
-
setStarted(true);
}
public synchronized void stop() throws Exception
{
- if (closed)
- {
- throw new IllegalStateException("Connection is closed");
- }
-
setStarted(false);
}
public void close() throws Exception
{
- if (closed)
- {
- log.warn("Connection is already closed");
- return;
- }
-
- //We clone to avoid deadlock http://jira.jboss.org/jira/browse/JBMESSAGING-836
Map<String, ServerSessionEndpoint> sessionsClone = new HashMap<String, ServerSessionEndpoint>(sessions);
- for(ServerSessionEndpoint session: sessionsClone.values())
+ for (ServerSessionEndpoint session: sessionsClone.values())
{
- session.localClose();
+ session.close();
}
sessions.clear();
@@ -213,8 +192,6 @@
cm.unregisterConnection(jmsClientVMID, remotingClientSessionID);
messagingServer.getRemotingService().getDispatcher().unregister(id);
-
- closed = true;
}
// Public ---------------------------------------------------------------------------------------
@@ -295,7 +272,6 @@
private void setStarted(boolean started) throws Exception
{
- //We clone to avoid deadlock http://jira.jboss.org/jira/browse/JBMESSAGING-836
Map<String, ServerSessionEndpoint> sessionsClone = null;
sessionsClone = new HashMap<String, ServerSessionEndpoint>(sessions);
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2008-02-08 13:36:06 UTC (rev 3681)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2008-02-08 14:04:33 UTC (rev 3682)
@@ -21,9 +21,11 @@
*/
package org.jboss.jms.server.endpoint;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.CONS_CHANGERATE;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.CLOSE;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.CONS_FLOWTOKEN;
+import java.util.concurrent.atomic.AtomicInteger;
+
import org.jboss.messaging.core.Consumer;
import org.jboss.messaging.core.Filter;
import org.jboss.messaging.core.HandleStatus;
@@ -34,7 +36,7 @@
import org.jboss.messaging.core.Queue;
import org.jboss.messaging.core.remoting.PacketHandler;
import org.jboss.messaging.core.remoting.PacketSender;
-import org.jboss.messaging.core.remoting.wireformat.ConsumerChangeRateMessage;
+import org.jboss.messaging.core.remoting.wireformat.ConsumerFlowTokenMessage;
import org.jboss.messaging.core.remoting.wireformat.NullPacket;
import org.jboss.messaging.core.remoting.wireformat.Packet;
import org.jboss.messaging.core.remoting.wireformat.PacketType;
@@ -66,44 +68,33 @@
private boolean trace = log.isTraceEnabled();
- private String id;
+ private final String id;
- private Queue messageQueue;
+ private final Queue messageQueue;
- private ServerSessionEndpoint sessionEndpoint;
+ private final ServerSessionEndpoint sessionEndpoint;
- private boolean noLocal;
+ private final boolean noLocal;
- private Filter filter;
+ private final Filter filter;
private boolean started;
// This lock protects starting and stopping
- private Object startStopLock;
+ private final Object startStopLock;
- // Must be volatile
- private volatile boolean clientAccepting;
-
- private int prefetchSize;
+ private final AtomicInteger availableTokens = new AtomicInteger(0);
- private volatile int sendCount;
+ private final boolean autoDeleteQueue;
- private boolean firstTime = true;
-
- private boolean autoDeleteQueue;
+ private final boolean enableFlowControl;
// Constructors ---------------------------------------------------------------------------------
ServerConsumerEndpoint(MessagingServer sp, String id, Queue messageQueue,
ServerSessionEndpoint sessionEndpoint, Filter filter,
- boolean noLocal,
- int prefetchSize, boolean autoDeleteQueue)
+ boolean noLocal, boolean autoDeleteQueue, boolean enableFlowControl)
{
- if (trace)
- {
- log.trace("constructing consumer endpoint " + id);
- }
-
this.id = id;
this.messageQueue = messageQueue;
@@ -112,39 +103,30 @@
this.noLocal = noLocal;
- // Always start as false - wait for consumer to initiate.
- this.clientAccepting = false;
-
this.startStopLock = new Object();
- this.prefetchSize = prefetchSize;
-
this.filter = filter;
this.started = this.sessionEndpoint.getConnectionEndpoint().isStarted();
this.autoDeleteQueue = autoDeleteQueue;
+ log.info("Enable flow control is " + enableFlowControl);
+
+ this.enableFlowControl = enableFlowControl;
+
// adding the consumer to the queue
messageQueue.addConsumer(this);
messageQueue.deliver();
-
- log.trace(this + " constructed");
}
// Receiver implementation ----------------------------------------------------------------------
public HandleStatus handle(MessageReference ref) throws Exception
{
- if (trace)
+ if (enableFlowControl && availableTokens.get() == 0)
{
- log.trace(this + " receives " + ref + " for delivery");
- }
-
- // This is ok to have outside lock - is volatile
- if (!clientAccepting)
- {
if (trace) { log.trace(this + " is NOT accepting messages!"); }
return HandleStatus.BUSY;
@@ -163,16 +145,12 @@
// queue for delivery later.
if (!started)
{
- if (trace) { log.trace(this + " NOT started"); }
-
return HandleStatus.BUSY;
}
- if (trace) { log.trace(this + " has startStopLock lock, preparing the message for delivery"); }
-
Message message = ref.getMessage();
- if (!accept(message))
+ if (filter != null && !filter.match(message))
{
return HandleStatus.NO_MATCH;
}
@@ -181,12 +159,8 @@
{
String conId = message.getConnectionID();
- if (trace) { log.trace("message connection id: " + conId + " current connection connection id: " + sessionEndpoint.getConnectionEndpoint().getConnectionID()); }
-
if (sessionEndpoint.getConnectionEndpoint().getConnectionID().equals(conId))
{
- if (trace) { log.trace("Message from local connection so rejecting"); }
-
PersistenceManager pm = sessionEndpoint.getConnectionEndpoint().getMessagingServer().getPersistenceManager();
ref.acknowledge(pm);
@@ -194,23 +168,11 @@
return HandleStatus.HANDLED;
}
}
-
- sendCount++;
-
- int num = prefetchSize;
-
- if (firstTime)
+
+ if (enableFlowControl)
{
- //We make sure we have a little extra buffer on the client side
- num = num + num / 3 ;
+ availableTokens.decrementAndGet();
}
-
- if (sendCount == num)
- {
- clientAccepting = false;
-
- firstTime = false;
- }
try
{
@@ -220,31 +182,13 @@
{
log.error("Failed to handle delivery", e);
- this.started = false; // DO NOT return null or the message might get delivered more than once
+ started = false; // DO NOT return null or the message might get delivered more than once
}
return HandleStatus.HANDLED;
}
}
- // Filter implementation ------------------------------------------------------------------------
-
- public boolean accept(Message msg)
- {
- if (filter != null)
- {
- boolean accept = filter.match(msg);
-
- if (trace) { log.trace("message filter " + (accept ? "accepts " : "DOES NOT accept ") + "the message"); }
-
- return accept;
- }
- else
- {
- return true;
- }
- }
-
// Closeable implementation ---------------------------------------------------------------------
public void close() throws Exception
@@ -254,38 +198,37 @@
log.trace(this + " close");
}
- stop();
+ setStarted(false);
- localClose();
-
- sessionEndpoint.removeConsumer(id);
-
+ messageQueue.removeConsumer(this);
+
+ sessionEndpoint.getConnectionEndpoint().getMessagingServer().getRemotingService().getDispatcher().unregister(id);
+
+ if (autoDeleteQueue)
+ {
+ if (messageQueue.getConsumerCount() == 0)
+ {
+ MessagingServer server = sessionEndpoint.getConnectionEndpoint().getMessagingServer();
+
+ server.getPostOffice().removeBinding(messageQueue.getName());
+
+ if (messageQueue.isDurable())
+ {
+ server.getPersistenceManager().deleteAllReferences(messageQueue);
+ }
+ }
+ }
+
+ sessionEndpoint.removeConsumer(id);
}
// ConsumerEndpoint implementation --------------------------------------------------------------
- public void changeRate(float newRate) throws Exception
+ public void receiveTokens(int tokens) throws Exception
{
- if (trace)
- {
- log.trace(this + " changing rate to " + newRate);
- }
+ availableTokens.addAndGet(tokens);
- if (newRate > 0)
- {
- sendCount = 0;
-
- clientAccepting = true;
- }
- else
- {
- clientAccepting = false;
- }
-
- if (clientAccepting)
- {
- promptDelivery();
- }
+ promptDelivery();
}
// Public ---------------------------------------------------------------------------------------
@@ -309,62 +252,45 @@
void setStarted(boolean started)
{
- //No need to lock since caller already has the lock
- this.started = started;
- }
-
- void localClose() throws Exception
- {
- if (trace) { log.trace(this + " grabbed the main lock in close() " + this); }
-
- messageQueue.removeConsumer(this);
+ boolean useStarted;
- sessionEndpoint.getConnectionEndpoint().getMessagingServer().getRemotingService().getDispatcher().unregister(id);
-
- if (autoDeleteQueue)
- {
- if (messageQueue.getConsumerCount() == 0)
- {
- MessagingServer server = sessionEndpoint.getConnectionEndpoint().getMessagingServer();
-
- server.getPostOffice().removeBinding(messageQueue.getName());
-
- if (messageQueue.isDurable())
- {
- server.getPersistenceManager().deleteAllReferences(messageQueue);
- }
- }
- }
- }
-
- void start()
- {
synchronized (startStopLock)
{
- if (started)
- {
- return;
- }
-
- started = true;
+ this.started = started;
+
+ useStarted = started;
}
-
- // Prompt delivery
- promptDelivery();
- }
-
- void stop() throws Exception
- {
- synchronized (startStopLock)
+
+ //Outside the lock
+ if (useStarted)
{
- if (!started)
- {
- return;
- }
-
- started = false;
+ promptDelivery();
}
}
+
+// void localClose() throws Exception
+// {
+// if (trace) { log.trace(this + " grabbed the main lock in close() " + this); }
+//
+// messageQueue.removeConsumer(this);
+//
+// sessionEndpoint.getConnectionEndpoint().getMessagingServer().getRemotingService().getDispatcher().unregister(id);
+//
+// if (autoDeleteQueue)
+// {
+// if (messageQueue.getConsumerCount() == 0)
+// {
+// MessagingServer server = sessionEndpoint.getConnectionEndpoint().getMessagingServer();
+//
+// server.getPostOffice().removeBinding(messageQueue.getName());
+//
+// if (messageQueue.isDurable())
+// {
+// server.getPersistenceManager().deleteAllReferences(messageQueue);
+// }
+// }
+// }
+// }
// Protected ------------------------------------------------------------------------------------
@@ -400,13 +326,13 @@
PacketType type = packet.getType();
- if (type == CONS_CHANGERATE)
+ if (type == CONS_FLOWTOKEN)
{
setReplier(sender);
- ConsumerChangeRateMessage message = (ConsumerChangeRateMessage) packet;
+ ConsumerFlowTokenMessage message = (ConsumerFlowTokenMessage) packet;
- changeRate(message.getRate());
+ receiveTokens(message.getTokens());
}
else if (type == CLOSE)
{
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2008-02-08 13:36:06 UTC (rev 3681)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2008-02-08 14:04:33 UTC (rev 3682)
@@ -21,30 +21,30 @@
*/
package org.jboss.jms.server.endpoint;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.CLOSE;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_ACKNOWLEDGE;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_BINDINGQUERY;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_CANCEL;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.CLOSE;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_COMMIT;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_CREATEBROWSER;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_CREATECONSUMER;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_CREATEQUEUE;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_DELETE_QUEUE;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_QUEUEQUERY;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_ROLLBACK;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_SEND;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_XA_COMMIT;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_XA_END;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_XA_FORGET;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_XA_GET_TIMEOUT;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_XA_INDOUBT_XIDS;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_XA_JOIN;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_XA_PREPARE;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_XA_RESUME;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_XA_ROLLBACK;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_XA_SET_TIMEOUT;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_XA_START;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_XA_SUSPEND;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_BINDINGQUERY;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_CREATEBROWSER;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_CREATECONSUMER;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_CREATEQUEUE;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_DELETE_QUEUE;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_QUEUEQUERY;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_XA_INDOUBT_XIDS;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_XA_PREPARE;
import java.util.ArrayList;
import java.util.HashMap;
@@ -53,6 +53,9 @@
import java.util.List;
import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
@@ -75,22 +78,22 @@
import org.jboss.messaging.core.impl.filter.FilterImpl;
import org.jboss.messaging.core.remoting.PacketHandler;
import org.jboss.messaging.core.remoting.PacketSender;
+import org.jboss.messaging.core.remoting.wireformat.NullPacket;
+import org.jboss.messaging.core.remoting.wireformat.Packet;
+import org.jboss.messaging.core.remoting.wireformat.PacketType;
+import org.jboss.messaging.core.remoting.wireformat.SessionAcknowledgeMessage;
+import org.jboss.messaging.core.remoting.wireformat.SessionAddAddressMessage;
import org.jboss.messaging.core.remoting.wireformat.SessionBindingQueryMessage;
import org.jboss.messaging.core.remoting.wireformat.SessionBindingQueryResponseMessage;
+import org.jboss.messaging.core.remoting.wireformat.SessionCancelMessage;
import org.jboss.messaging.core.remoting.wireformat.SessionCreateBrowserMessage;
import org.jboss.messaging.core.remoting.wireformat.SessionCreateBrowserResponseMessage;
import org.jboss.messaging.core.remoting.wireformat.SessionCreateConsumerMessage;
import org.jboss.messaging.core.remoting.wireformat.SessionCreateConsumerResponseMessage;
import org.jboss.messaging.core.remoting.wireformat.SessionCreateQueueMessage;
import org.jboss.messaging.core.remoting.wireformat.SessionDeleteQueueMessage;
-import org.jboss.messaging.core.remoting.wireformat.NullPacket;
-import org.jboss.messaging.core.remoting.wireformat.Packet;
-import org.jboss.messaging.core.remoting.wireformat.PacketType;
import org.jboss.messaging.core.remoting.wireformat.SessionQueueQueryMessage;
import org.jboss.messaging.core.remoting.wireformat.SessionQueueQueryResponseMessage;
-import org.jboss.messaging.core.remoting.wireformat.SessionAcknowledgeMessage;
-import org.jboss.messaging.core.remoting.wireformat.SessionAddAddressMessage;
-import org.jboss.messaging.core.remoting.wireformat.SessionCancelMessage;
import org.jboss.messaging.core.remoting.wireformat.SessionRemoveAddressMessage;
import org.jboss.messaging.core.remoting.wireformat.SessionSendMessage;
import org.jboss.messaging.core.remoting.wireformat.SessionXACommitMessage;
@@ -109,9 +112,6 @@
import org.jboss.messaging.util.Logger;
import org.jboss.messaging.util.MessagingException;
-import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
-import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
-
/**
* Session implementation
*
@@ -131,8 +131,7 @@
// Constants
// ------------------------------------------------------------------------------------
- private static final Logger log = Logger
- .getLogger(ServerSessionEndpoint.class);
+ private static final Logger log = Logger.getLogger(ServerSessionEndpoint.class);
// Static
// ---------------------------------------------------------------------------------------
@@ -146,15 +145,13 @@
private String id;
- private volatile boolean closed;
-
private ServerConnectionEndpoint connectionEndpoint;
private MessagingServer sp;
- private Map<String, ServerConsumerEndpoint> consumers = new HashMap<String, ServerConsumerEndpoint>();
+ private Map<String, ServerConsumerEndpoint> consumers = new ConcurrentHashMap<String, ServerConsumerEndpoint>();
- private Map<String, ServerBrowserEndpoint> browsers = new HashMap<String, ServerBrowserEndpoint>();
+ private Map<String, ServerBrowserEndpoint> browsers = new ConcurrentHashMap<String, ServerBrowserEndpoint>();
private PostOffice postOffice;
@@ -162,8 +159,7 @@
private long deliveryIDSequence = 0;
- // Temporary until we have our own NIO transport
- QueuedExecutor executor = new QueuedExecutor(new LinkedQueue());
+ ExecutorService executor = Executors.newSingleThreadExecutor();
private Transaction tx;
@@ -173,8 +169,6 @@
private ResourceManager resourceManager;
- private boolean strict;
-
// Constructors
// ---------------------------------------------------------------------------------
@@ -201,8 +195,6 @@
this.autoCommitAcks = autoCommitAcks;
this.resourceManager = resourceManager;
-
- strict = sp.getConfiguration().isStrictTck();
}
// Public
@@ -225,27 +217,15 @@
{
Queue expiryQueue = ref.getQueue().getExpiryQueue();
- if (trace)
- {
- log.trace(this + " detected expired message " + ref);
- }
-
if (expiryQueue != null)
{
- if (trace)
- {
- log.trace(this + " sending expired message to expiry queue "
- + expiryQueue);
- }
-
Message copy = makeCopyForDLQOrExpiry(true, ref);
moveInTransaction(copy, ref, expiryQueue, true);
}
else
{
- log.warn("No expiry queue has been configured so removing expired "
- + ref);
+ log.warn("No expiry queue has been configured so removing expired " + ref);
// TODO - tidy up these references - ugly
ref.acknowledge(this.getConnectionEndpoint().getMessagingServer()
@@ -257,82 +237,54 @@
void removeBrowser(String browserId) throws Exception
{
- synchronized (browsers)
+ if (browsers.remove(browserId) == null)
{
- if (browsers.remove(browserId) == null) { throw new IllegalStateException(
- "Cannot find browser with id " + browserId + " to remove"); }
- }
+ throw new IllegalStateException("Cannot find browser with id " + browserId + " to remove");
+ }
}
void removeConsumer(String consumerId) throws Exception
{
- synchronized (consumers)
+ if (consumers.remove(consumerId) == null)
{
- if (consumers.remove(consumerId) == null) { throw new IllegalStateException(
- "Cannot find consumer with id " + consumerId + " to remove"); }
- }
+ throw new IllegalStateException("Cannot find consumer with id " + consumerId + " to remove");
+ }
}
- void localClose() throws Exception
- {
- if (closed) { throw new IllegalStateException("Session is already closed"); }
+// void localClose() throws Exception
+// {
+// Map<String, ServerConsumerEndpoint> consumersClone = new HashMap<String, ServerConsumerEndpoint>(consumers);
+//
+// for (ServerConsumerEndpoint consumer: consumersClone.values())
+// {
+// consumer.close();
+// }
+//
+// consumers.clear();
+//
+// Map<String, ServerBrowserEndpoint> browsersClone = new HashMap<String, ServerBrowserEndpoint>(browsers);
+//
+// for (ServerBrowserEndpoint browser: browsersClone.values())
+// {
+// browser.close();
+// }
+//
+// consumers.clear();
+//
+// browsers.clear();
+//
+// rollback();
+//
+// executor.shutdown();
+//
+// deliveries.clear();
+//
+// sp.removeSession(id);
+//
+// closed = true;
+// }
- if (trace) log.trace(this + " close()");
-
- // We clone to avoid deadlock
- // http://jira.jboss.org/jira/browse/JBMESSAGING-836
- Map consumersClone;
- synchronized (consumers)
- {
- consumersClone = new HashMap(consumers);
- }
-
- for (Iterator i = consumersClone.values().iterator(); i.hasNext();)
- {
- ((ServerConsumerEndpoint) i.next()).localClose();
- }
-
- consumers.clear();
-
- // We clone to avoid deadlock
- // http://jira.jboss.org/jira/browse/JBMESSAGING-836
- Map browsersClone;
- synchronized (browsers)
- {
- browsersClone = new HashMap(browsers);
- }
-
- for (Iterator i = browsersClone.values().iterator(); i.hasNext();)
- {
- ((ServerBrowserEndpoint) i.next()).localClose();
- }
-
- browsers.clear();
-
- rollback();
-
- // Close down the executor
-
- // Note we need to wait for ALL tasks to complete NOT just one otherwise
- // we can end up with the following situation
- // prompter is queued and starts to execute
- // prompter almost finishes executing then a message is cancelled due to
- // this session closing
- // this causes another prompter to be queued
- // shutdownAfterProcessingCurrentTask is then called
- // this means the second prompter never runs and the cancelled message
- // doesn't get redelivered
- executor.shutdownAfterProcessingCurrentlyQueuedTasks();
-
- deliveries.clear();
-
- sp.removeSession(id);
-
- closed = true;
- }
-
- synchronized void handleDelivery(MessageReference ref,
- ServerConsumerEndpoint consumer, PacketSender sender) throws Exception
+ synchronized void handleDelivery(MessageReference ref, ServerConsumerEndpoint consumer, PacketSender sender) throws Exception
{
// FIXME - we shouldn't have to pass in the packet Sender - this should be
// creatable
@@ -345,62 +297,56 @@
delivery.deliver();
}
- /**
- * Starts this session's Consumers
- */
void setStarted(boolean s) throws Exception
{
- // We clone to prevent deadlock
- // http://jira.jboss.org/jira/browse/JBMESSAGING-836
- Map consumersClone;
- synchronized (consumers)
+ Map<String, ServerConsumerEndpoint> consumersClone = new HashMap<String, ServerConsumerEndpoint>(consumers);
+
+ for (ServerConsumerEndpoint consumer: consumersClone.values())
{
- consumersClone = new HashMap(consumers);
+ consumer.setStarted(s);
}
+ }
- for (Iterator i = consumersClone.values().iterator(); i.hasNext();)
+ void promptDelivery(final Queue queue)
+ {
+ // TODO - do we really need to prompt on a different thread?
+ executor.execute(new Runnable()
{
- ServerConsumerEndpoint sce = (ServerConsumerEndpoint) i.next();
- if (s)
+ public void run()
{
- sce.start();
+ queue.deliver();
}
- else
- {
- sce.stop();
- }
- }
+ });
}
- void promptDelivery(final Queue queue)
+ public void close() throws Exception
{
- if (trace)
+ Map<String, ServerConsumerEndpoint> consumersClone = new HashMap<String, ServerConsumerEndpoint>(consumers);
+
+ for (ServerConsumerEndpoint consumer: consumersClone.values())
{
- log.trace("Prompting delivery on " + queue);
+ consumer.close();
}
- try
- {
- // TODO - do we really need to prompt on a different thread?
- this.executor.execute(new Runnable()
- {
- public void run()
- {
- queue.deliver();
- }
- });
+ consumers.clear();
- }
- catch (Throwable t)
+ Map<String, ServerBrowserEndpoint> browsersClone = new HashMap<String, ServerBrowserEndpoint>(browsers);
+
+ for (ServerBrowserEndpoint browser: browsersClone.values())
{
- log.error("Failed to prompt delivery", t);
+ browser.close();
}
- }
- private void close() throws Exception
- {
- localClose();
+ consumers.clear();
+ browsers.clear();
+
+ rollback();
+
+ executor.shutdown();
+
+ deliveries.clear();
+
connectionEndpoint.removeSession(id);
connectionEndpoint.getMessagingServer().getRemotingService()
@@ -1119,7 +1065,7 @@
}
ServerConsumerEndpoint ep = new ServerConsumerEndpoint(sp, consumerID,
- binding.getQueue(), this, filter, noLocal, prefetchSize, autoDeleteQueue);
+ binding.getQueue(), this, filter, noLocal, autoDeleteQueue, prefetchSize > 0);
connectionEndpoint.getMessagingServer().getRemotingService()
.getDispatcher().register(ep.newHandler());
@@ -1195,8 +1141,6 @@
private SessionCreateBrowserResponseMessage createBrowser(String queueName, String selector)
throws Exception
{
- if (closed) { throw new IllegalStateException("Session is closed"); }
-
Binding binding = postOffice.getBinding(queueName);
if (binding == null) { throw new MessagingException(
Modified: trunk/src/main/org/jboss/messaging/core/MessagingServer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/MessagingServer.java 2008-02-08 13:36:06 UTC (rev 3681)
+++ trunk/src/main/org/jboss/messaging/core/MessagingServer.java 2008-02-08 14:04:33 UTC (rev 3682)
@@ -65,15 +65,7 @@
void setRemotingService(RemotingService remotingService);
RemotingService getRemotingService();
-
- ServerSessionEndpoint getSession(String sessionID);
-
- Collection getSessions();
-
- void addSession(String id, ServerSessionEndpoint session);
-
- void removeSession(String id);
-
+
SecurityStore getSecurityManager();
ConnectionManager getConnectionManager();
Modified: trunk/src/main/org/jboss/messaging/core/impl/MessageImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/MessageImpl.java 2008-02-08 13:36:06 UTC (rev 3681)
+++ trunk/src/main/org/jboss/messaging/core/impl/MessageImpl.java 2008-02-08 14:04:33 UTC (rev 3682)
@@ -305,13 +305,7 @@
return false;
}
- long overtime = System.currentTimeMillis() - expiration;
-
- if (overtime >= 0)
- {
- return true;
- }
- return false;
+ return System.currentTimeMillis() - expiration >= 0;
}
public MessageReference createReference(Queue queue)
Modified: trunk/src/main/org/jboss/messaging/core/impl/server/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/server/MessagingServerImpl.java 2008-02-08 13:36:06 UTC (rev 3681)
+++ trunk/src/main/org/jboss/messaging/core/impl/server/MessagingServerImpl.java 2008-02-08 14:04:33 UTC (rev 3682)
@@ -23,11 +23,8 @@
import java.beans.PropertyChangeEvent;
import java.beans.PropertyChangeListener;
-import java.util.Collection;
import java.util.HashSet;
import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
import org.jboss.aop.microcontainer.aspects.jmx.JMX;
import org.jboss.jms.destination.JBossDestination;
@@ -38,7 +35,6 @@
import org.jboss.jms.server.SecurityStore;
import org.jboss.jms.server.connectionmanager.SimpleConnectionManager;
import org.jboss.jms.server.endpoint.MessagingServerPacketHandler;
-import org.jboss.jms.server.endpoint.ServerSessionEndpoint;
import org.jboss.jms.server.plugin.NullUserManager;
import org.jboss.jms.server.plugin.contract.JMSUserManager;
import org.jboss.jms.server.security.NullAuthenticationManager;
@@ -98,12 +94,8 @@
private Version version;
- private boolean started;
+ private volatile boolean started;
- //private boolean supportsFailover = true;
-
- private Map<String, ServerSessionEndpoint> sessions;
-
// wired components
private SecurityMetadataStore securityStore;
@@ -140,8 +132,6 @@
version = Version.instance();
- sessions = new ConcurrentHashMap<String, ServerSessionEndpoint>();
-
started = false;
}
@@ -311,29 +301,7 @@
return remotingService;
}
- public ServerSessionEndpoint getSession(String sessionID)
- {
- return (ServerSessionEndpoint) sessions.get(sessionID);
- }
-
- public Collection<ServerSessionEndpoint> getSessions()
- {
- return sessions.values();
- }
-
- public void addSession(String id, ServerSessionEndpoint session)
- {
- sessions.put(id, session);
- }
-
- public void removeSession(String id)
- {
- if (sessions.remove(id) == null)
- {
- throw new IllegalStateException("Cannot find session with id " + id + " to remove");
- }
- }
-
+
public void enableMessageCounters()
{
messageCounterManager.start();
Deleted: trunk/src/main/org/jboss/messaging/core/remoting/codec/ConsumerChangeRateMessageCodec.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/codec/ConsumerChangeRateMessageCodec.java 2008-02-08 13:36:06 UTC (rev 3681)
+++ trunk/src/main/org/jboss/messaging/core/remoting/codec/ConsumerChangeRateMessageCodec.java 2008-02-08 14:04:33 UTC (rev 3682)
@@ -1,65 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- *
- * Distributable under LGPL license.
- * See terms of license at gnu.org.
- */
-package org.jboss.messaging.core.remoting.codec;
-
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.CONS_CHANGERATE;
-
-import org.jboss.messaging.core.remoting.wireformat.ConsumerChangeRateMessage;
-
-/**
- * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
- */
-public class ConsumerChangeRateMessageCodec extends
- AbstractPacketCodec<ConsumerChangeRateMessage>
-{
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- public ConsumerChangeRateMessageCodec()
- {
- super(CONS_CHANGERATE);
- }
-
- // Public --------------------------------------------------------
-
- // AbstractPacketCodec overrides ---------------------------------
-
- @Override
- protected void encodeBody(ConsumerChangeRateMessage message, RemotingBuffer out) throws Exception
- {
- out.putInt(FLOAT_LENGTH);
- out.putFloat(message.getRate());
- }
-
- @Override
- protected ConsumerChangeRateMessage decodeBody(RemotingBuffer in)
- throws Exception
- {
- int bodyLength = in.getInt();
- if (in.remaining() < bodyLength)
- {
- return null;
- }
-
- float rate = in.getFloat();
-
- return new ConsumerChangeRateMessage(rate);
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private ----------------------------------------------------
-
- // Inner classes -------------------------------------------------
-}
Copied: trunk/src/main/org/jboss/messaging/core/remoting/codec/ConsumerFlowTokenMessageCodec.java (from rev 3674, trunk/src/main/org/jboss/messaging/core/remoting/codec/ConsumerChangeRateMessageCodec.java)
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/codec/ConsumerFlowTokenMessageCodec.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/codec/ConsumerFlowTokenMessageCodec.java 2008-02-08 14:04:33 UTC (rev 3682)
@@ -0,0 +1,63 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.codec;
+
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.CONS_FLOWTOKEN;
+
+import org.jboss.messaging.core.remoting.wireformat.ConsumerFlowTokenMessage;
+
+/**
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ */
+public class ConsumerFlowTokenMessageCodec extends AbstractPacketCodec<ConsumerFlowTokenMessage>
+{
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public ConsumerFlowTokenMessageCodec()
+ {
+ super(CONS_FLOWTOKEN);
+ }
+
+ // Public --------------------------------------------------------
+
+ // AbstractPacketCodec overrides ---------------------------------
+
+ @Override
+ protected void encodeBody(ConsumerFlowTokenMessage message, RemotingBuffer out) throws Exception
+ {
+ out.putInt(INT_LENGTH); // body length prefix: the body is a single int
+ out.putInt(message.getTokens()); // write as int (not float) so decodeBody's getInt() reads the same value back
+ }
+
+ @Override
+ protected ConsumerFlowTokenMessage decodeBody(RemotingBuffer in)
+ throws Exception
+ {
+ int bodyLength = in.getInt(); // leading int is the body length (encodeBody writes INT_LENGTH here)
+ if (in.remaining() < bodyLength)
+ {
+ return null; // fewer bytes remain than the declared body length; presumably signals an incomplete packet -- TODO confirm against AbstractPacketCodec
+ }
+
+ return new ConsumerFlowTokenMessage(in.getInt()); // the body is the token count, read as an int
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private ----------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/codec/SessionCreateConsumerResponseMessageCodec.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/codec/SessionCreateConsumerResponseMessageCodec.java 2008-02-08 13:36:06 UTC (rev 3681)
+++ trunk/src/main/org/jboss/messaging/core/remoting/codec/SessionCreateConsumerResponseMessageCodec.java 2008-02-08 14:04:33 UTC (rev 3682)
@@ -38,13 +38,13 @@
RemotingBuffer out) throws Exception
{
String consumerID = response.getConsumerID();
- int bufferSize = response.getBufferSize();
+ int prefetchSize = response.getPrefetchSize();
int bodyLength = sizeof(consumerID) + INT_LENGTH;
out.putInt(bodyLength);
out.putNullableString(consumerID);
- out.putInt(bufferSize);
+ out.putInt(prefetchSize);
}
@Override
@@ -58,9 +58,9 @@
}
String consumerID = in.getNullableString();
- int bufferSize = in.getInt();
+ int prefetchSize = in.getInt();
- return new SessionCreateConsumerResponseMessage(consumerID, bufferSize);
+ return new SessionCreateConsumerResponseMessage(consumerID, prefetchSize);
}
// Package protected ---------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/PacketCodecFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/PacketCodecFactory.java 2008-02-08 13:36:06 UTC (rev 3681)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/PacketCodecFactory.java 2008-02-08 14:04:33 UTC (rev 3682)
@@ -15,7 +15,7 @@
import org.jboss.messaging.core.remoting.codec.BytesPacketCodec;
import org.jboss.messaging.core.remoting.codec.ConnectionCreateSessionMessageCodec;
import org.jboss.messaging.core.remoting.codec.ConnectionCreateSessionResponseMessageCodec;
-import org.jboss.messaging.core.remoting.codec.ConsumerChangeRateMessageCodec;
+import org.jboss.messaging.core.remoting.codec.ConsumerFlowTokenMessageCodec;
import org.jboss.messaging.core.remoting.codec.CreateConnectionMessageCodec;
import org.jboss.messaging.core.remoting.codec.CreateConnectionResponseMessageCodec;
import org.jboss.messaging.core.remoting.codec.DeliverMessageCodec;
@@ -62,7 +62,7 @@
import org.jboss.messaging.core.remoting.wireformat.ConnectionCreateSessionResponseMessage;
import org.jboss.messaging.core.remoting.wireformat.ConnectionStartMessage;
import org.jboss.messaging.core.remoting.wireformat.ConnectionStopMessage;
-import org.jboss.messaging.core.remoting.wireformat.ConsumerChangeRateMessage;
+import org.jboss.messaging.core.remoting.wireformat.ConsumerFlowTokenMessage;
import org.jboss.messaging.core.remoting.wireformat.CreateConnectionRequest;
import org.jboss.messaging.core.remoting.wireformat.CreateConnectionResponse;
import org.jboss.messaging.core.remoting.wireformat.DeliverMessage;
@@ -172,7 +172,7 @@
addCodecForEmptyPacket(PacketType.CONN_STOP,
ConnectionStopMessage.class);
- addCodec(ConsumerChangeRateMessage.class, ConsumerChangeRateMessageCodec.class);
+ addCodec(ConsumerFlowTokenMessage.class, ConsumerFlowTokenMessageCodec.class);
addCodec(DeliverMessage.class, DeliverMessageCodec.class);
Deleted: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/ConsumerChangeRateMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/ConsumerChangeRateMessage.java 2008-02-08 13:36:06 UTC (rev 3681)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/ConsumerChangeRateMessage.java 2008-02-08 14:04:33 UTC (rev 3682)
@@ -1,55 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- *
- * Distributable under LGPL license.
- * See terms of license at gnu.org.
- */
-package org.jboss.messaging.core.remoting.wireformat;
-
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.CONS_CHANGERATE;
-
-/**
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
- *
- * @version <tt>$Revision$</tt>
- */
-public class ConsumerChangeRateMessage extends AbstractPacket
-{
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- private final float rate;
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- public ConsumerChangeRateMessage(float rate)
- {
- super(CONS_CHANGERATE);
-
- this.rate = rate;
- }
-
- // Public --------------------------------------------------------
-
- public float getRate()
- {
- return rate;
- }
-
- @Override
- public String toString()
- {
- return getParentString() + ", rate=" + rate + "]";
- }
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-}
Copied: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/ConsumerFlowTokenMessage.java (from rev 3674, trunk/src/main/org/jboss/messaging/core/remoting/wireformat/ConsumerChangeRateMessage.java)
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/ConsumerFlowTokenMessage.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/ConsumerFlowTokenMessage.java 2008-02-08 14:04:33 UTC (rev 3682)
@@ -0,0 +1,55 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.wireformat;
+
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.CONS_FLOWTOKEN;
+
+/**
+ * Packet of type {@code CONS_FLOWTOKEN} that carries a single {@code int} token count.
+ *
+ * The count is passed to the constructor, stored in a final field, returned unchanged by
+ * {@link #getTokens()}, and rendered by {@link #toString()} as the parent packet
+ * description followed by {@code ", tokens=<count>]"}.
+ *
+ * NOTE(review): the name suggests these are consumer flow-control tokens; who sends this
+ * packet and how the receiver interprets the count is not visible in this class - confirm
+ * against the consumer endpoint code. No range check is applied to the count here.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
+ *
+ * @version <tt>$Revision$</tt>
+ */
+public class ConsumerFlowTokenMessage extends AbstractPacket
+{
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private final int tokens;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public ConsumerFlowTokenMessage(int tokens)
+ {
+ super(CONS_FLOWTOKEN);
+
+ this.tokens = tokens;
+ }
+
+ // Public --------------------------------------------------------
+
+ public int getTokens()
+ {
+ return tokens;
+ }
+
+ @Override
+ public String toString()
+ {
+ return getParentString() + ", tokens=" + tokens + "]";
+ }
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/PacketType.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/PacketType.java 2008-02-08 13:36:06 UTC (rev 3681)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/PacketType.java 2008-02-08 14:04:33 UTC (rev 3682)
@@ -80,7 +80,7 @@
SESS_XA_GET_TIMEOUT_RESP ((byte)82),
// Consumer
- CONS_CHANGERATE ((byte)90);
+ CONS_FLOWTOKEN ((byte)90);
private final byte type;
Modified: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionCreateConsumerResponseMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionCreateConsumerResponseMessage.java 2008-02-08 13:36:06 UTC (rev 3681)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionCreateConsumerResponseMessage.java 2008-02-08 14:04:33 UTC (rev 3682)
@@ -23,20 +23,20 @@
// Attributes ----------------------------------------------------
private final String consumerID;
- private final int bufferSize;
+ private final int prefetchSize;
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
- public SessionCreateConsumerResponseMessage(String consumerID, int bufferSize)
+ public SessionCreateConsumerResponseMessage(String consumerID, int prefetchSize)
{
super(SESS_CREATECONSUMER_RESP);
Assert.assertValidID(consumerID);
this.consumerID = consumerID;
- this.bufferSize = bufferSize;
+ this.prefetchSize = prefetchSize;
}
// Public --------------------------------------------------------
@@ -46,9 +46,9 @@
return consumerID;
}
- public int getBufferSize()
+ public int getPrefetchSize()
{
- return bufferSize;
+ return prefetchSize;
}
@Override
@@ -56,7 +56,7 @@
{
StringBuffer buf = new StringBuffer(getParentString());
buf.append(", consumerID=" + consumerID);
- buf.append(", bufferSize=" + bufferSize);
+ buf.append(", prefetchSize=" + prefetchSize);
buf.append("]");
return buf.toString();
}
Modified: trunk/src/main/org/jboss/messaging/util/AbstractHashSet.java
===================================================================
--- trunk/src/main/org/jboss/messaging/util/AbstractHashSet.java 2008-02-08 13:36:06 UTC (rev 3681)
+++ trunk/src/main/org/jboss/messaging/util/AbstractHashSet.java 2008-02-08 14:04:33 UTC (rev 3682)
@@ -33,7 +33,7 @@
*/
public abstract class AbstractHashSet<Key> extends AbstractSet<Key>
{
- private Map theMap;
+ private Map<Key, Object> theMap;
private static Object dummy = new Object();
@@ -42,7 +42,7 @@
theMap = buildInternalHashMap();
}
- protected abstract Map buildInternalHashMap();
+ protected abstract Map<Key, Object> buildInternalHashMap();
public int size()
{
Deleted: trunk/src/main/org/jboss/messaging/util/ClearableQueuedExecutor.java
===================================================================
--- trunk/src/main/org/jboss/messaging/util/ClearableQueuedExecutor.java 2008-02-08 13:36:06 UTC (rev 3681)
+++ trunk/src/main/org/jboss/messaging/util/ClearableQueuedExecutor.java 2008-02-08 14:04:33 UTC (rev 3682)
@@ -1,73 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.messaging.util;
-
-import EDU.oswego.cs.dl.util.concurrent.Channel;
-import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
-
-/**
- * A ClearableQueuedExecutor
- *
- * This class extends the QueuedExector with a method to clear all but the currently
- * executing task without shutting it down.
- *
- * We need this functionality when failing over a session.
- *
- * In that case we need to clear all tasks apart from the currently executing one.
- *
- * We can't just shutdownAfterProcessingCurrentTask then use another instance
- * after failover since when failover resumes the current task and the next delivery
- * will be executed on different threads and smack into each other.
- *
- * http://jira.jboss.org/jira/browse/JBMESSAGING-904
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @version <tt>$Revision: 1.1 $</tt>
- *
- * $Id$
- *
- */
-public class ClearableQueuedExecutor extends QueuedExecutor
-{
- public ClearableQueuedExecutor()
- {
- }
-
- public ClearableQueuedExecutor(Channel channel)
- {
- super(channel);
- }
-
- public void clearAllExceptCurrentTask()
- {
- try
- {
- while (queue_.poll(0) != null);
- }
- catch (InterruptedException ex)
- {
- Thread.currentThread().interrupt();
- }
- }
-
-}
-
Deleted: trunk/src/main/org/jboss/messaging/util/ConcurrentReaderHashSet.java
===================================================================
--- trunk/src/main/org/jboss/messaging/util/ConcurrentReaderHashSet.java 2008-02-08 13:36:06 UTC (rev 3681)
+++ trunk/src/main/org/jboss/messaging/util/ConcurrentReaderHashSet.java 2008-02-08 14:04:33 UTC (rev 3682)
@@ -1,52 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.messaging.util;
-
-import java.util.Map;
-
-import EDU.oswego.cs.dl.util.concurrent.ConcurrentReaderHashMap;
-
-/**
- *
- * A ConcurrentReaderHashSet.
- *
- * Offers same concurrency as ConcurrentHashMap but for a Set
- *
- * @author <a href="tim.fox at jboss.com">Tim Fox</a>
- * @author <a href="clebert.suconic at jboss.com">Clebert Suconic</a>
- * @version <tt>$Revision$</tt>
- *
- * $Id$
- */
-public class ConcurrentReaderHashSet<Key> extends AbstractHashSet<Key>
-{
- public ConcurrentReaderHashSet()
- {
- super();
- }
-
- protected Map buildInternalHashMap()
- {
- return new ConcurrentReaderHashMap();
- }
-
-}
Modified: trunk/tests/src/org/jboss/messaging/core/remoting/impl/integration/DummyInterceptor.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/core/remoting/impl/integration/DummyInterceptor.java 2008-02-08 13:36:06 UTC (rev 3681)
+++ trunk/tests/src/org/jboss/messaging/core/remoting/impl/integration/DummyInterceptor.java 2008-02-08 14:04:33 UTC (rev 3682)
@@ -7,21 +7,21 @@
package org.jboss.messaging.core.remoting.impl.integration;
+import java.util.concurrent.atomic.AtomicInteger;
+
import org.jboss.messaging.core.remoting.Interceptor;
import org.jboss.messaging.core.remoting.wireformat.DeliverMessage;
import org.jboss.messaging.core.remoting.wireformat.Packet;
import org.jboss.messaging.util.Logger;
import org.jboss.messaging.util.MessagingException;
-import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt;
-
public class DummyInterceptor implements Interceptor
{
protected Logger log = Logger.getLogger(DummyInterceptor.class);
boolean sendException = false;
boolean changeMessage = false;
- SynchronizedInt syncCounter = new SynchronizedInt(0);
+ AtomicInteger syncCounter = new AtomicInteger(0);
public int getCounter()
{
@@ -36,7 +36,7 @@
public void intercept(Packet packet) throws MessagingException
{
log.info("DummyFilter packet = " + packet.getClass().getName());
- syncCounter.add(1);
+ syncCounter.addAndGet(1);
if (sendException)
{
throw new MessagingException(MessagingException.INTERNAL_ERROR);
Modified: trunk/tests/src/org/jboss/messaging/core/remoting/impl/integration/DummyInterceptorB.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/core/remoting/impl/integration/DummyInterceptorB.java 2008-02-08 13:36:06 UTC (rev 3681)
+++ trunk/tests/src/org/jboss/messaging/core/remoting/impl/integration/DummyInterceptorB.java 2008-02-08 14:04:33 UTC (rev 3682)
@@ -7,19 +7,19 @@
package org.jboss.messaging.core.remoting.impl.integration;
+import java.util.concurrent.atomic.AtomicInteger;
+
import org.jboss.messaging.core.remoting.Interceptor;
import org.jboss.messaging.core.remoting.wireformat.Packet;
import org.jboss.messaging.util.Logger;
import org.jboss.messaging.util.MessagingException;
-import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt;
-
public class DummyInterceptorB implements Interceptor
{
protected Logger log = Logger.getLogger(DummyInterceptorB.class);
- static SynchronizedInt syncCounter = new SynchronizedInt(0);
+ static AtomicInteger syncCounter = new AtomicInteger(0);
public static int getCounter()
{
@@ -33,7 +33,7 @@
public void intercept(Packet packet) throws MessagingException
{
- syncCounter.add(1);
+ syncCounter.addAndGet(1);
log.info("DummyFilter packet = " + packet);
}
Modified: trunk/tests/src/org/jboss/messaging/core/remoting/wireformat/test/unit/PacketTypeTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/core/remoting/wireformat/test/unit/PacketTypeTest.java 2008-02-08 13:36:06 UTC (rev 3681)
+++ trunk/tests/src/org/jboss/messaging/core/remoting/wireformat/test/unit/PacketTypeTest.java 2008-02-08 14:04:33 UTC (rev 3682)
@@ -23,7 +23,7 @@
import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_ADD_ADDRESS;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_BROWSER_RESET;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_CANCEL;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.CONS_CHANGERATE;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.CONS_FLOWTOKEN;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.CLOSE;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_COMMIT;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_DELIVER;
@@ -102,7 +102,7 @@
import org.jboss.messaging.core.remoting.codec.BytesPacketCodec;
import org.jboss.messaging.core.remoting.codec.CreateConnectionMessageCodec;
import org.jboss.messaging.core.remoting.codec.CreateConnectionResponseMessageCodec;
-import org.jboss.messaging.core.remoting.codec.ConsumerChangeRateMessageCodec;
+import org.jboss.messaging.core.remoting.codec.ConsumerFlowTokenMessageCodec;
import org.jboss.messaging.core.remoting.codec.SessionCreateBrowserMessageCodec;
import org.jboss.messaging.core.remoting.codec.SessionCreateBrowserResponseMessageCodec;
import org.jboss.messaging.core.remoting.codec.SessionCreateConsumerMessageCodec;
@@ -149,7 +149,7 @@
import org.jboss.messaging.core.remoting.wireformat.SessionBrowserResetMessage;
import org.jboss.messaging.core.remoting.wireformat.BytesPacket;
import org.jboss.messaging.core.remoting.wireformat.CloseMessage;
-import org.jboss.messaging.core.remoting.wireformat.ConsumerChangeRateMessage;
+import org.jboss.messaging.core.remoting.wireformat.ConsumerFlowTokenMessage;
import org.jboss.messaging.core.remoting.wireformat.SessionCreateBrowserMessage;
import org.jboss.messaging.core.remoting.wireformat.SessionCreateBrowserResponseMessage;
import org.jboss.messaging.core.remoting.wireformat.CreateConnectionRequest;
@@ -610,7 +610,7 @@
AbstractPacketCodec codec = new SessionCreateConsumerResponseMessageCodec();
SimpleRemotingBuffer buffer = encode(response, codec);
checkHeader(buffer, response);
- checkBody(buffer, response.getConsumerID(), response.getBufferSize());
+ checkBody(buffer, response.getConsumerID(), response.getPrefetchSize());
buffer.rewind();
AbstractPacket decodedPacket = codec.decode(buffer);
@@ -618,7 +618,7 @@
assertTrue(decodedPacket instanceof SessionCreateConsumerResponseMessage);
SessionCreateConsumerResponseMessage decodedResponse = (SessionCreateConsumerResponseMessage) decodedPacket;
assertEquals(SESS_CREATECONSUMER_RESP, decodedResponse.getType());
- assertEquals(response.getBufferSize(), decodedResponse.getBufferSize());
+ assertEquals(response.getPrefetchSize(), decodedResponse.getPrefetchSize());
}
public void testStartConnectionMessage() throws Exception
@@ -657,19 +657,19 @@
public void testChangeRateMessage() throws Exception
{
- ConsumerChangeRateMessage message = new ConsumerChangeRateMessage(0.63f);
- AbstractPacketCodec codec = new ConsumerChangeRateMessageCodec();
+ ConsumerFlowTokenMessage message = new ConsumerFlowTokenMessage(10);
+ AbstractPacketCodec codec = new ConsumerFlowTokenMessageCodec();
SimpleRemotingBuffer buffer = encode(message, codec);
checkHeader(buffer, message);
- checkBody(buffer, message.getRate());
+ checkBody(buffer, message.getTokens());
buffer.rewind();
AbstractPacket decodedPacket = codec.decode(buffer);
- assertTrue(decodedPacket instanceof ConsumerChangeRateMessage);
- ConsumerChangeRateMessage decodedMessage = (ConsumerChangeRateMessage) decodedPacket;
- assertEquals(CONS_CHANGERATE, decodedMessage.getType());
- assertEquals(message.getRate(), decodedMessage.getRate());
+ assertTrue(decodedPacket instanceof ConsumerFlowTokenMessage);
+ ConsumerFlowTokenMessage decodedMessage = (ConsumerFlowTokenMessage) decodedPacket;
+ assertEquals(CONS_FLOWTOKEN, decodedMessage.getType());
+ assertEquals(message.getTokens(), decodedMessage.getTokens());
}
public void testDeliverMessage() throws Exception
Modified: trunk/tests/src/org/jboss/test/messaging/jms/TemporaryDestinationTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/TemporaryDestinationTest.java 2008-02-08 13:36:06 UTC (rev 3681)
+++ trunk/tests/src/org/jboss/test/messaging/jms/TemporaryDestinationTest.java 2008-02-08 14:04:33 UTC (rev 3682)
@@ -68,10 +68,8 @@
Session consumerSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- log.info("** creating temp topic");
TemporaryTopic tempTopic = producerSession.createTemporaryTopic();
- log.info("** created temp topic");
-
+
MessageProducer producer = producerSession.createProducer(tempTopic);
MessageConsumer consumer = consumerSession.createConsumer(tempTopic);
More information about the jboss-cvs-commits
mailing list