[jboss-cvs] jboss-jms/src/main/org/jboss/jms/server/endpoint ...
Timothy Fox
tim.fox at jboss.com
Mon Jul 17 13:14:46 EDT 2006
User: timfox
Date: 06/07/17 13:14:46
Modified: src/main/org/jboss/jms/server/endpoint
ConnectionEndpoint.java ConsumerEndpoint.java
ServerConnectionEndpoint.java
ServerConnectionFactoryEndpoint.java
ServerConsumerEndpoint.java
ServerSessionEndpoint.java SessionEndpoint.java
Added: src/main/org/jboss/jms/server/endpoint
ClientDelivery.java
Log:
Many changes including implementation of prefetch, SEDAisation of server, changing of recovery
Revision Changes Path
1.11 +3 -4 jboss-jms/src/main/org/jboss/jms/server/endpoint/ConnectionEndpoint.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: ConnectionEndpoint.java
===================================================================
RCS file: /cvsroot/jboss/jboss-jms/src/main/org/jboss/jms/server/endpoint/ConnectionEndpoint.java,v
retrieving revision 1.10
retrieving revision 1.11
diff -u -b -r1.10 -r1.11
--- ConnectionEndpoint.java 27 Jun 2006 19:44:39 -0000 1.10
+++ ConnectionEndpoint.java 17 Jul 2006 17:14:45 -0000 1.11
@@ -34,9 +34,9 @@
* The rest of the methods are handled in the advice stack.
*
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @version <tt>$Revision: 1.10 $</tt>
+ * @version <tt>$Revision: 1.11 $</tt>
*
- * $Id: ConnectionEndpoint.java,v 1.10 2006/06/27 19:44:39 timfox Exp $
+ * $Id: ConnectionEndpoint.java,v 1.11 2006/07/17 17:14:45 timfox Exp $
*/
public interface ConnectionEndpoint extends Closeable
{
@@ -55,6 +55,5 @@
void sendTransaction(TransactionRequest request) throws JMSException;
Xid[] getPreparedTransactions();
-
}
1.11 +9 -16 jboss-jms/src/main/org/jboss/jms/server/endpoint/ConsumerEndpoint.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: ConsumerEndpoint.java
===================================================================
RCS file: /cvsroot/jboss/jboss-jms/src/main/org/jboss/jms/server/endpoint/ConsumerEndpoint.java,v
retrieving revision 1.10
retrieving revision 1.11
diff -u -b -r1.10 -r1.11
--- ConsumerEndpoint.java 28 Mar 2006 14:26:16 -0000 1.10
+++ ConsumerEndpoint.java 17 Jul 2006 17:14:45 -0000 1.11
@@ -21,34 +21,27 @@
*/
package org.jboss.jms.server.endpoint;
-import java.util.List;
-
import javax.jms.JMSException;
import org.jboss.jms.client.Closeable;
-import org.jboss.jms.message.MessageProxy;
/**
* Represents the set of methods from the ConsumerDelegate that are handled on the server.
* The rest of the methods are handled in the advice stack.
*
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @version <tt>$Revision: 1.10 $</tt>
+ * @version <tt>$Revision: 1.11 $</tt>
*
- * $Id: ConsumerEndpoint.java,v 1.10 2006/03/28 14:26:16 timfox Exp $
+ * $Id: ConsumerEndpoint.java,v 1.11 2006/07/17 17:14:45 timfox Exp $
*/
public interface ConsumerEndpoint extends Closeable
{
- void cancelDelivery(long messageID) throws JMSException;
-
- void cancelDeliveries(List messageIDs) throws JMSException;
-
- MessageProxy getMessageNow(boolean wait) throws JMSException;
-
- void activate() throws JMSException;
-
/**
- * @return the last message ID delivered to the client consumer
+ * If the client buffer has previously become full because the server was sending at a faster rate than the
+ * client could consume, then the server will stop sending messages.
+ * When the client has emptied the buffer it then needs to inform the server that it can receive more messages
+ * by calling this method
+ * @throws JMSException
*/
- long deactivate() throws JMSException;
+ void more() throws JMSException;
}
1.45 +15 -9 jboss-jms/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: ServerConnectionEndpoint.java
===================================================================
RCS file: /cvsroot/jboss/jboss-jms/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java,v
retrieving revision 1.44
retrieving revision 1.45
diff -u -b -r1.44 -r1.45
--- ServerConnectionEndpoint.java 24 Jun 2006 09:05:37 -0000 1.44
+++ ServerConnectionEndpoint.java 17 Jul 2006 17:14:46 -0000 1.45
@@ -70,9 +70,9 @@
*
* @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @version <tt>$Revision: 1.44 $</tt>
+ * @version <tt>$Revision: 1.45 $</tt>
*
- * $Id: ServerConnectionEndpoint.java,v 1.44 2006/06/24 09:05:37 timfox Exp $
+ * $Id: ServerConnectionEndpoint.java,v 1.45 2006/07/17 17:14:46 timfox Exp $
*/
public class ServerConnectionEndpoint implements ConnectionEndpoint
{
@@ -125,10 +125,12 @@
private byte usingVersion;
+ private int prefetchSize;
+
// Constructors --------------------------------------------------
protected ServerConnectionEndpoint(ServerPeer serverPeer, String clientID,
- String username, String password)
+ String username, String password, int prefetchSize)
{
this.serverPeer = serverPeer;
@@ -141,6 +143,7 @@
this.connectionID = serverPeer.getNextObjectID();
this.clientID = clientID;
+ this.prefetchSize = prefetchSize;
sessions = new ConcurrentReaderHashMap();
temporaryDestinations = new ConcurrentReaderHashSet();
@@ -410,8 +413,6 @@
{
if (trace) { log.trace("one phase rollback request received"); }
- // We just need to cancel deliveries
-
Transaction tx = null;
try
{
@@ -598,6 +599,11 @@
return usingVersion;
}
+ public int getPrefetchSize()
+ {
+ return prefetchSize;
+ }
+
public String toString()
{
return "ConnectionEndpoint[" + connectionID + "]";
@@ -805,7 +811,7 @@
// Private -------------------------------------------------------
- private void setStarted(boolean s)
+ private void setStarted(boolean s) throws JMSException
{
synchronized(sessions)
{
1.25 +12 -8 jboss-jms/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: ServerConnectionFactoryEndpoint.java
===================================================================
RCS file: /cvsroot/jboss/jboss-jms/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java,v
retrieving revision 1.24
retrieving revision 1.25
diff -u -b -r1.24 -r1.25
--- ServerConnectionFactoryEndpoint.java 23 May 2006 18:25:08 -0000 1.24
+++ ServerConnectionFactoryEndpoint.java 17 Jul 2006 17:14:46 -0000 1.25
@@ -38,9 +38,9 @@
*
* @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @version <tt>$Revision: 1.24 $</tt>
+ * @version <tt>$Revision: 1.25 $</tt>
*
- * $Id: ServerConnectionFactoryEndpoint.java,v 1.24 2006/05/23 18:25:08 ovidiu Exp $
+ * $Id: ServerConnectionFactoryEndpoint.java,v 1.25 2006/07/17 17:14:46 timfox Exp $
*/
public class ServerConnectionFactoryEndpoint implements ConnectionFactoryEndpoint
{
@@ -52,13 +52,15 @@
// Attributes ----------------------------------------------------
- protected ServerPeer serverPeer;
+ private ServerPeer serverPeer;
- protected String clientID;
+ private String clientID;
- protected int id;
+ private int id;
- protected JNDIBindings jndiBindings;
+ private JNDIBindings jndiBindings;
+
+ private int prefetchSize;
// Constructors --------------------------------------------------
@@ -68,12 +70,14 @@
*/
public ServerConnectionFactoryEndpoint(int id, ServerPeer serverPeer,
String defaultClientID,
- JNDIBindings jndiBindings)
+ JNDIBindings jndiBindings,
+ int preFetchSize)
{
this.serverPeer = serverPeer;
this.clientID = defaultClientID;
this.id = id;
this.jndiBindings = jndiBindings;
+ this.prefetchSize = preFetchSize;
}
// ConnectionFactoryDelegate implementation ----------------------
@@ -101,7 +105,7 @@
// create the corresponding "server-side" connection endpoint and register it with the
// server peer's ClientManager
ServerConnectionEndpoint endpoint =
- new ServerConnectionEndpoint(serverPeer, clientID, username, password);
+ new ServerConnectionEndpoint(serverPeer, clientID, username, password, prefetchSize);
int connectionID = endpoint.getConnectionID();
1.44 +477 -349 jboss-jms/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: ServerConsumerEndpoint.java
===================================================================
RCS file: /cvsroot/jboss/jboss-jms/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java,v
retrieving revision 1.43
retrieving revision 1.44
diff -u -b -r1.43 -r1.44
--- ServerConsumerEndpoint.java 26 Jun 2006 18:41:21 -0000 1.43
+++ ServerConsumerEndpoint.java 17 Jul 2006 17:14:46 -0000 1.44
@@ -32,12 +32,15 @@
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
+import org.jboss.jms.client.remoting.HandleMessageResponse;
import org.jboss.jms.destination.JBossDestination;
import org.jboss.jms.message.JBossMessage;
import org.jboss.jms.message.MessageProxy;
import org.jboss.jms.selector.Selector;
-import org.jboss.jms.server.plugin.contract.ThreadPool;
+import org.jboss.jms.server.ConnectionManager;
+import org.jboss.jms.server.QueuedExecutorPool;
import org.jboss.jms.server.remoting.JMSDispatcher;
+import org.jboss.jms.server.remoting.MessagingMarshallable;
import org.jboss.jms.server.subscription.Subscription;
import org.jboss.jms.util.MessagingJMSException;
import org.jboss.logging.Logger;
@@ -53,6 +56,10 @@
import org.jboss.messaging.core.tx.Transaction;
import org.jboss.messaging.core.tx.TransactionException;
import org.jboss.messaging.core.tx.TxCallback;
+import org.jboss.messaging.util.Future;
+
+import EDU.oswego.cs.dl.util.concurrent.Executor;
+import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
/**
* Concrete implementation of ConsumerEndpoint. Lives on the boundary between Messaging Core and the
@@ -60,9 +67,9 @@
*
* @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @version <tt>$Revision: 1.43 $</tt>
+ * @version <tt>$Revision: 1.44 $</tt>
*
- * $Id: ServerConsumerEndpoint.java,v 1.43 2006/06/26 18:41:21 timfox Exp $
+ * $Id: ServerConsumerEndpoint.java,v 1.44 2006/07/17 17:14:46 timfox Exp $
*/
public class ServerConsumerEndpoint implements Receiver, Filter, ConsumerEndpoint
{
@@ -90,37 +97,41 @@
private Selector messageSelector;
- private ThreadPool threadPoolDelegate;
+ private DeliveryCallback deliveryCallback;
- private volatile boolean started;
+ private JBossDestination destination;
- private boolean disconnected = false;
+ private List toDeliver;
- // deliveries must be maintained in order they were received
- private Map deliveries;
+ //Must be volatile
+ private volatile boolean clientConsumerFull;
- private boolean closed;
+ //Must be volatile
+ private volatile boolean bufferFull;
- private boolean active;
+ //No need to be volatile - is protected by lock
+ private boolean started;
- private boolean grabbing;
+ //No need to be volatile
+ private boolean closed;
- private MessageProxy toGrab;
+ //No need to be volatile
+ private boolean disconnected;
- private DeliveryCallback deliveryCallback;
+ private Executor executor;
- private boolean selectorRejected;
+ private int prefetchSize;
- private JBossDestination destination;
+ private Object lock;
- // We record the id of the last message delivered to the client consumer
- private long lastMessageIDDelivered = -1;
+ private Map deliveries;
// Constructors --------------------------------------------------
protected ServerConsumerEndpoint(int id, Channel channel,
ServerSessionEndpoint sessionEndpoint,
- String selector, boolean noLocal, JBossDestination dest)
+ String selector, boolean noLocal, JBossDestination dest,
+ int prefetchSize)
throws InvalidSelectorException
{
if (trace) { log.trace("creating consumer endpoint " + id); }
@@ -128,11 +139,42 @@
this.id = id;
this.channel = channel;
this.sessionEndpoint = sessionEndpoint;
- this.threadPoolDelegate =
- sessionEndpoint.getConnectionEndpoint().getServerPeer().getThreadPoolDelegate();
+ this.prefetchSize = prefetchSize;
+
+ //We always created with clientConsumerFull = true
+ //This prevents the SCD sending messages to the client before the client has fully
+ //finished creating the MessageCallbackHandler
+ this.clientConsumerFull = true;
+
+ //We allocate an executor for this consumer based on the destination name
+ //so that all consumers for the same destination currently use the same executor
+ //(we can change this if need be)
+ //Note that they do not use the same executor as the channel of the destination
+ QueuedExecutorPool pool =
+ sessionEndpoint.getConnectionEndpoint().getServerPeer().getQueuedExecutorPool();
+
+ this.executor = (QueuedExecutor)pool.get("consumer" + dest.getName());
+
+ /*
+ Note that using a PooledExecutor with a linked queue is not sufficient to ensure that
+ deliveries for the same consumer happen serially, since even if they are queued serially
+ the actual deliveries can happen in parallel, resulting in a later one "overtaking" an earlier
+ non-deterministicly depending on thread scheduling.
+ Consequently we use a QueuedExecutor to ensure the deliveries happen sequentially.
+ We do not want each ServerConsumerEndpoint instance to have it's own instance - since
+ we would end up using too many threads, neither do we want to share the same instance
+ amongst all consumers - we do not want to serialize delivery to all consumers.
+ So we maintain a bag of QueuedExecutors and give them out to consumers as required.
+ Different consumers can end up using the same queuedexecutor concurrently if there are a lot
+ of active consumers.
+ */
this.noLocal = noLocal;
this.destination = dest;
+ this.toDeliver = new ArrayList();
+
+ this.lock = new Object();
+
if (selector != null)
{
if (trace) log.trace("creating selector:" + selector);
@@ -140,52 +182,75 @@
if (trace) log.trace("created selector");
}
+ //FIXME -
+ //We really need to get rid of this delivery list - it's only purpose in life is to solve
+ //the race condition where acks or cancels can come in before handle has returned - and
+ //that can be solved in a simpler way anyway.
+ //It adds extra complexity both in all the extra code necessary to maintain it, the extra memory
+ //needed to maintain it, the extra complexity in synchronization on this class to protect access to it
+ //and when we do clustering we will have to replicate it too!!
+ //Let's GET RID OF IT!!!!!!!!!!!
this.deliveries = new LinkedHashMap();
- this.started = this.sessionEndpoint.getConnectionEndpoint().isStarted();
- // adding the consumer to the channel
+
+ this.started = this.sessionEndpoint.getConnectionEndpoint().isStarted(); // adding the consumer to the channel
this.channel.add(this);
+ //prompt delivery
+ channel.deliver(false);
+
log.debug(this + " created");
}
// Receiver implementation ---------------------------------------
- // There is no need to synchronize this method. The channel synchronizes delivery to its
- // consumers
+ /*
+ * The channel ensures that handle is never called concurrently by more than one thread
+ */
public Delivery handle(DeliveryObserver observer, Routable reference, Transaction tx)
{
if (trace) { log.trace(this + " receives reference " + reference.getMessageID() + " for delivery"); }
- MessageReference ref = (MessageReference)reference;
-
- if (!isReady())
+ //This is ok to have outside lock - is volatile
+ if (bufferFull)
{
- if (trace) { log.trace(this + " rejects reference with ID " + ref.getMessageID()); }
+ //We buffer a maximum of PREFETCH_LIMIT messages at once
+
+ if (trace) { log.trace(this + " has reached prefetch size will not accept any more references"); }
+
return null;
}
- try
+ //Need to synchronized around the whole block to prevent setting started = false
+ //but handle is already running and a message is deposited during the stop procedure
+ synchronized (lock)
{
- Delivery delivery = null;
+ // If the consumer is stopped then we don't accept the message, it should go back into the
+ // channel for delivery later.
+ if (!started)
+ {
+ // this is a common programming error, make this visible in the debug logs
+ // TODO: analyse performance implications
+ log.debug(this + " NOT started yet!");
+ return null;
+ }
+
+ MessageReference ref = (MessageReference)reference;
JBossMessage message = (JBossMessage)ref.getMessage();
- // TODO - We need to put the message in a DLQ
- // For now we just ack it otherwise the message will keep being retried and we'll never get
- // anywhere
- if (ref.getDeliveryCount() > MAX_DELIVERY_ATTEMPTS)
+ boolean selectorRejected = !this.accept(message);
+
+ SimpleDelivery delivery = new SimpleDelivery(observer, ref, false, !selectorRejected);
+
+ checkDeliveryCount(delivery);
+
+ if (delivery.isDone())
{
- log.warn(message + " has exceed maximum delivery attempts and will be removed");
- delivery = new SimpleDelivery(observer, ref, true);
return delivery;
}
- selectorRejected = !this.accept(message);
-
- delivery = new SimpleDelivery(observer, ref, false, !selectorRejected);
deliveries.put(new Long(ref.getMessageID()), delivery);
-
if (selectorRejected)
{
// we "arrest" the message so we can get the next one
@@ -194,6 +259,12 @@
// http://jira.jboss.org/jira/browse/JBMESSAGING-275
if (trace) { log.trace(this + " DOES NOT accept the message because the selector rejected it"); }
+ //FIXME - This hack also breaks delivery behaviour - if there are multiple competing consumers
+ //on the same queue, each with a different selector, then if the message arrives at one receiver
+ //(e.g. this one) and doesn't match the selector, then it is arrested, which means the
+ //PointToPointRouter does not try the next receiver which does match.
+ //See
+
// ... however, keep asking for messages, the fact that this one wasn't accepted doesn't
// mean that the next one it won't.
@@ -207,45 +278,29 @@
MessageProxy mp = JBossMessage.createThinDelegate(message, ref.getDeliveryCount());
- if (!grabbing)
- {
- // We want to asynchronously deliver the message to the consumer. Deliver the message on
- // a different thread than the core thread that brought it here.
+ //Add the proxy to the list to deliver
+ toDeliver.add(mp);
+
+ bufferFull = toDeliver.size() >= prefetchSize;
+
+ if (!clientConsumerFull)
+ {
try
{
- if (trace) { log.trace("queueing message " + message + " for delivery to client"); }
- threadPoolDelegate.execute(
- new DeliveryRunnable(mp, id, sessionEndpoint.getConnectionEndpoint(), trace));
+ this.executor.execute(new Deliverer());
}
catch (InterruptedException e)
{
log.warn("Thread interrupted", e);
}
}
- else
- {
- // The message is being "grabbed" and returned for receiveNoWait semantics
- toGrab = mp;
- }
-
- lastMessageIDDelivered = mp.getMessage().getMessageID();
return delivery;
}
- finally
- {
- // reset the "active" state, but only if the current message hasn't been rejected by
- // selector, because otherwise we want to get more messages
- // TODO this is a kludge that will be cleared by http://jira.jboss.org/jira/browse/JBMESSAGING-275
- if (!selectorRejected)
- {
- active = false;
- grabbing = false;
- }
- }
}
+
// Filter implementation -----------------------------------------
public boolean accept(Routable r)
@@ -285,22 +340,21 @@
public void closing() throws JMSException
{
if (trace) { log.trace(this + " closing"); }
+
+ stop();
}
public void close() throws JMSException
{
- if (closed)
+ synchronized (lock)
{
- throw new IllegalStateException("Consumer is already closed");
- }
-
- if (trace) { log.trace(this + " close"); }
-
- closed = true;
-
- // On close we only disconnect the consumer from the Channel we don't actually remove it
+ //On close we only disconnect the consumer from the Channel we don't actually remove it
// This is because it may still contain deliveries that may well be acknowledged after
// the consumer has closed. This is perfectly valid.
+ //FIXME - The deliveries should really be stored in the session endpoint, not here
+ //that is their natural place, that would mean we wouldn't have to mess around with keeping
+ //deliveries after this is closed
+
disconnect();
JMSDispatcher.instance.unregisterTarget(new Integer(id));
@@ -322,107 +376,64 @@
throw new MessagingJMSException("Failed to disconnect", e);
}
}
- }
-
- // ConsumerEndpoint implementation -------------------------------
- public void cancelDelivery(long messageID) throws JMSException
- {
- SingleReceiverDelivery del = (SingleReceiverDelivery)deliveries.remove(new Long(messageID));
- if (del != null)
- {
- try
- {
- del.cancel();
- }
- catch (Throwable t)
- {
- throw new MessagingJMSException("Failed to cancel delivery " + del, t);
- }
- promptDelivery();
- }
- else
- {
- throw new IllegalStateException("Cannot find delivery to cancel:" + messageID);
+ closed = true;
}
}
- public void cancelDeliveries(List messageIDs) throws JMSException
- {
- //Cancel in reverse order to preserve order in queue
-
- for (int i = messageIDs.size() - 1; i >= 0; i--)
- {
- Long id = (Long)messageIDs.get(i);
-
- cancelDelivery(id.longValue());
- }
- }
+ // ConsumerEndpoint implementation -------------------------------
- /**
- * We attempt to get the message directly fron the channel first. If we find one, we return that.
- * Otherwise, if wait = true, we register as being interested in receiving a message
- * asynchronously, then return and wait for it on the client side.
+ /*
+ * This is called by the client consumer to tell the server to wake up and start sending more
+ * messages if available
*/
- public MessageProxy getMessageNow(boolean wait) throws JMSException
- {
- synchronized (channel)
+ public void more()
{
try
{
- grabbing = true;
+ /*
+ Set clientConsumerFull to false
+ NOTE! This must be done using a Runnable on the delivery executor - this is to
+ prevent the following race condition:
+ 1) Messages are delivered to the client, causing it to be full
+ 2) The messages are consumed very quickly on the client causing more to be called()
+ 3) more() hits the server BEFORE the deliverer thread has returned from delivering to the client
+ causing clientConsumerFull to be set to false and adding a deliverer to the queue.
+ 4) The deliverer thread returns and sets clientConsumerFull to true
+ 5) The next deliverer runs but doesn't do anything since clientConsumerFull = true even
+ though the client needs messages
+ */
+ this.executor.execute(new Runnable() { public void run() { clientConsumerFull = false; } });
- // This will always deliver a message (if there is one) on the same thread
- promptDelivery();
+ //Run a deliverer to deliver any existing ones
+ this.executor.execute(new Deliverer());
- if (wait && toGrab == null)
- {
- active = true;
- }
+ //TODO Why do we need to wait for it to execute??
+ //Why not just return immediately?
- return toGrab;
- }
- finally
- {
- toGrab = null;
- grabbing = false;
- }
- }
- }
+ //Now wait for it to execute
+ Future result = new Future();
- public long deactivate() throws JMSException
- {
- synchronized (channel)
- {
- active = false;
- if (trace) { log.trace(this + " deactivated"); }
+ this.executor.execute(new Waiter(result));
- return lastMessageIDDelivered;
- }
- }
+ result.getResult();
- public void activate() throws JMSException
- {
- synchronized (channel)
- {
- if (closed)
+ //Now we know the deliverer has delivered any outstanding messages to the client buffer
+ }
+ catch (InterruptedException e)
{
- //Do nothing
- return;
+ log.warn("Thread interrupted", e);
}
- active = true;
- if (trace) { log.trace(this + " just activated"); }
-
- promptDelivery();
- }
+ channel.deliver(false);
}
+
// Public --------------------------------------------------------
public String toString()
{
- return "ConsumerEndpoint[" + id + "]" + (active ? "(active)" : "");
+ return "ConsumerEndpoint[" + id + "]";
}
public JBossDestination getDestination()
@@ -435,14 +446,18 @@
return sessionEndpoint;
}
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
+ public int getId()
+ {
+ return id;
+ }
/**
* Actually remove the consumer and clear up any deliveries it may have
- * */
- protected void remove() throws JMSException
+ * This is called by the session on session.close()
+ * We can get rid of this when we store the deliveries on the session
+ *
+ **/
+ public void remove() throws JMSException
{
if (trace) log.trace("attempting to remove receiver " + this + " from destination " + channel);
@@ -480,19 +495,24 @@
//If we cancelled any deliveries we need to force a deliver on the channel
//This is because there may be other waiting competing consumers who need a chance to get
//any of the cancelled messages
- channel.deliver();
+ channel.deliver(false);
}
}
- protected void acknowledgeAll() throws JMSException
+ public void acknowledge(long messageID) throws JMSException
{
- // acknowledge all "pending" deliveries, except the ones corresponding to messages rejected
- // by selector, which are cancelled
+ // acknowledge a delivery
try
{
- for(Iterator i = deliveries.values().iterator(); i.hasNext(); )
+ SingleReceiverDelivery d;
+
+ synchronized (lock)
+ {
+ d = (SingleReceiverDelivery)deliveries.remove(new Long(messageID));
+ }
+
+ if (d != null)
{
- SingleReceiverDelivery d = (SingleReceiverDelivery)i.next();
//TODO - Selector kludge - remove this
if (d.isSelectorAccepted())
@@ -504,8 +524,10 @@
d.cancel();
}
}
-
- deliveries.clear();
+ else
+ {
+ throw new IllegalStateException("Cannot find delivery to acknowledge:" + messageID);
+ }
}
catch(Throwable t)
{
@@ -513,15 +535,17 @@
}
}
-
- protected void acknowledgeTransactionally(long messageID, Transaction tx) throws JMSException
+ public void acknowledgeTransactionally(long messageID, Transaction tx) throws JMSException
{
- if (trace) { log.trace("acknowledging " + messageID); }
+ if (trace) { log.trace("acknowledging transactionally " + messageID); }
SingleReceiverDelivery d = null;
// The actual removal of the deliveries from the delivery list is deferred until tx commit
+ synchronized (lock)
+ {
d = (SingleReceiverDelivery)deliveries.get(new Long(messageID));
+ }
if (deliveryCallback == null)
{
deliveryCallback = new DeliveryCallback();
@@ -529,7 +553,6 @@
}
deliveryCallback.addMessageID(messageID);
-
if (d != null)
{
try
@@ -548,89 +571,131 @@
}
}
- protected void removeDelivery(String messageID) throws JMSException
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ protected void promptDelivery()
{
- if (deliveries.remove(messageID) == null)
+ channel.deliver(false);
+ }
+
+ protected void cancelDelivery(Long messageID) throws JMSException
+ {
+ SingleReceiverDelivery del = (SingleReceiverDelivery)deliveries.remove(messageID);
+ if (del != null)
{
- throw new IllegalStateException("Cannot find delivery to remove:" + messageID);
+ del.getReference().decrementDeliveryCount();
+ try
+ {
+ del.cancel();
+ }
+ catch (Throwable t)
+ {
+ throw new MessagingJMSException("Failed to cancel delivery " + del, t);
+ }
+ }
+ else
+ {
+ throw new IllegalStateException("Cannot find delivery to cancel:" + id);
}
}
- protected void cancelAllDeliveries() throws JMSException
+ protected void start() throws JMSException
+ {
+ synchronized (lock)
+ {
+ //can't start or stop it if it is closed
+ if (closed)
{
- if (trace) { log.trace(this + " cancels deliveries"); }
+ return;
+ }
- // Need to cancel starting at the end of the list and working to the front in order that the
- // messages end up back in the correct order in the channel.
+ if (started)
+ {
+ return;
+ }
- List toCancel = new ArrayList();
+ started = true;
+ }
- Iterator iter = deliveries.values().iterator();
- while (iter.hasNext())
- {
- SingleReceiverDelivery d = (SingleReceiverDelivery)iter.next();
- toCancel.add(d);
+ //Prompt delivery
+ channel.deliver(false);
}
- for (int i = toCancel.size() - 1; i >= 0; i--)
+ protected void stop() throws JMSException
{
- SingleReceiverDelivery d = (SingleReceiverDelivery)toCancel.get(i);
- try
+ //We need to:
+ //Stop accepting any new messages in the SCE
+ //Flush any messages from the SCE to the buffer
+ //If the client consumer is now full, then we need to cancel the ones in the toDeliver list
+
+ //We need to lock since otherwise we could set started to false but the handle method was already executing
+ //and messages might get deposited after
+ synchronized (lock)
{
- d.cancel();
- if (trace) { log.trace(d + " canceled"); }
- }
- catch(Throwable t)
+ //can't start or stop it if it is closed
+ if (closed)
{
- log.error("Failed to cancel delivery: " + d, t);
- }
+ return;
}
- deliveries.clear();
- promptDelivery();
+ started = false;
}
- protected void setStarted(boolean started)
+ //Now we know no more messages will be accepted in the SCE
+
+ try
{
- if (trace) { log.trace(this + (started ? " started" : " stopped")); }
+ //Flush any messages waiting to be sent to the client
+ this.executor.execute(new Deliverer());
- this.started = started;
+ //Now wait for it to execute
+ Future result = new Future();
- if (started)
- {
- //need to prompt delivery
- promptDelivery();
+ this.executor.execute(new Waiter(result));
+
+ result.getResult();
+
+ //Now we know any deliverer has delivered any outstanding messages to the client buffer
}
+ catch (InterruptedException e)
+ {
+ log.warn("Thread interrupted", e);
}
- protected void promptDelivery()
+ //Now we know that there are no in flight messages on the way to the client consumer.
+
+ //But there may be messages still in the toDeliver list since the client consumer might be full
+ //So we need to cancel these
+
+ if (!toDeliver.isEmpty())
{
- if (active || grabbing)
+ synchronized (lock)
{
- // prompt delivery in a loop, since this consumer may "arrest" a message not accepted
- // by the selector, while it still wants to get the next one
- // TODO this is a kludge that will be cleared by http://jira.jboss.org/jira/browse/JBMESSAGING-275
- while(true)
+ for (int i = toDeliver.size() - 1; i >= 0; i--)
{
- if (trace) { log.trace(this + " prompts delivery"); }
-
- selectorRejected = false;
+ MessageProxy proxy = (MessageProxy)toDeliver.get(i);
- channel.deliver(this);
+ long id = proxy.getMessage().getMessageID();
- if (!selectorRejected)
- {
- break;
+ cancelDelivery(new Long(id));
}
}
+
+ toDeliver.clear();
+
+ bufferFull = false;
}
}
+ // Private -------------------------------------------------------
+
/**
* Disconnect this consumer from the Channel that feeds it. This method does not clear up
* deliveries, except the "arrested" ones
*/
- protected void disconnect()
+ private void disconnect()
{
// clean up "arrested" deliveries, no acknowledgment will ever come for them
for(Iterator i = deliveries.values().iterator(); i.hasNext(); )
@@ -658,46 +723,132 @@
}
}
+ private void checkDeliveryCount(SimpleDelivery del)
+ {
+ //TODO - We need to put the message in a DLQ
+ // For now we just ack it otherwise the message will keep being retried and we'll never get
+ // anywhere
+ if (del.getReference().getDeliveryCount() > MAX_DELIVERY_ATTEMPTS)
+ {
+ log.warn(del.getReference() + " has exceed maximum delivery attempts and will be removed");
+
+ try
+ {
+ del.acknowledge(null);
+ }
+ catch (Throwable t)
+ {
+ log.error("Failed to acknowledge delivery", t);
+ }
+ }
+
+ }
+
+ // Inner classes -------------------------------------------------
+
/*
- * Do we want to handle the message? (excluding filter check)
+ * Delivers messages to the client
+ * TODO - We can make this a bit more intelligent by letting it measure the rate
+ * the client is consuming messages and send messages at that rate.
+ * This would mean the client consumer wouldn't be full so often and more wouldn't have to be called
+ * This should give higher throughput.
*/
- protected boolean isReady()
+ private class Deliverer implements Runnable
{
- // If the client side consumer is not ready to accept a message and have it sent to it
- // or we're not grabbing a message for receiveNoWait we return null to refuse the message
- if (!active && !grabbing)
+ public void run()
{
- if (trace) { log.trace(this + " not ready"); }
- return false;
+ //Is there anything to deliver?
+ //This is ok outside lock - is volatile
+ if (clientConsumerFull)
+ {
+ //Do nothing
+ return;
}
- if (closed)
+ List list = null;
+
+ synchronized (lock)
{
- if (trace) { log.trace(this + " closed"); }
- return false;
+ if (!toDeliver.isEmpty())
+ {
+ list = new ArrayList(toDeliver);
+
+ toDeliver.clear();
+
+ bufferFull = false;
+ }
}
- // If the consumer is stopped then we don't accept the message, it should go back into the
- // channel for delivery later.
- if (!started)
+ if (list != null)
{
- // this is a common programming error, make this visible in the debug logs
- // TODO: anaylize performance implications
- log.debug(this + " NOT started yet!");
- return false;
+ ServerConnectionEndpoint connection =
+ ServerConsumerEndpoint.this.sessionEndpoint.getConnectionEndpoint();
+
+ try
+ {
+ if (trace) { log.trace("handing " + list.size() + " messages over to the remoting layer"); }
+
+ ClientDelivery del = new ClientDelivery(list, id);
+
+ //TODO How can we ensure that messages for the same consumer aren't delivered
+ //concurrently to the same consumer on different threads?
+ MessagingMarshallable mm = new MessagingMarshallable(connection.getUsingVersion(), del);
+
+ MessagingMarshallable resp = (MessagingMarshallable)connection.getCallbackClient().invoke(mm);
+
+ HandleMessageResponse result = (HandleMessageResponse)resp.getLoad();
+
+ if (trace) { log.trace("handed messages over to the remoting layer"); }
+
+ //For now we don't look at how many messages are accepted since they all will be
+ //The field is a placeholder for the future
+ if (result.clientIsFull())
+ {
+ //Stop the server sending any more messages to the client
+ //This is ok outside lock
+ clientConsumerFull = true;
}
+ }
+ catch(Throwable t)
+ {
+ log.warn("Failed to deliver the message to the client.");
- //TODO nice all the message headers and properties are in the reference we can do the
- //filter check in here too.
+ if (trace)
+ {
+ log.trace("Failed to deliver message", t);
+ }
+
+ ConnectionManager mgr = connection.getServerPeer().getConnectionManager();
- return true;
+ mgr.handleClientFailure(connection.getRemotingClientSessionId());
+ }
+ }
+ }
}
- // Private -------------------------------------------------------
+ /*
+ * The purpose of this class is to put it on the QueuedExecutor and wait for it to run
+ * We can then ensure that all the Runnables in front of it on the queue have also executed
+ * We cannot just call shutdownAfterProcessingCurrentlyQueuedTasks() since the
+ * QueueExecutor might be share by other consumers and we don't want to wait for their
+ * tasks to complete
+ */
+ private class Waiter implements Runnable
+ {
+ Future result;
- // Inner classes -------------------------------------------------
+ Waiter(Future result)
+ {
+ this.result = result;
+ }
+
+ public void run()
+ {
+ result.setResult(null);
+ }
+ }
- class DeliveryCallback implements TxCallback
+ private class DeliveryCallback implements TxCallback
{
List delList = new ArrayList();
@@ -740,30 +891,7 @@
public void afterRollback(boolean onePhase) throws TransactionException
{
- // Cancel the deliveries. Need to be cancelled in reverse order to maintain ordering
- for(Iterator i = delList.iterator(); i.hasNext(); )
- {
- Long messageID = (Long)i.next();
-
- SimpleDelivery del;
-
- if ((del = (SimpleDelivery)deliveries.remove(messageID)) == null)
- {
- throw new TransactionException("Failed to remove delivery " + messageID);
- }
-
- // cancel the delivery
- try
- {
- del.cancel();
- }
- catch (Throwable t)
- {
- throw new TransactionException("Failed to cancel delivery " + del, t);
- }
- }
-
- deliveryCallback = null;
+ //Do nothing
}
void addMessageID(long messageID)
1.41 +78 -29 jboss-jms/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: ServerSessionEndpoint.java
===================================================================
RCS file: /cvsroot/jboss/jboss-jms/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java,v
retrieving revision 1.40
retrieving revision 1.41
diff -u -b -r1.40 -r1.41
--- ServerSessionEndpoint.java 27 Jun 2006 19:44:39 -0000 1.40
+++ ServerSessionEndpoint.java 17 Jul 2006 17:14:46 -0000 1.41
@@ -24,6 +24,7 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -46,6 +47,7 @@
import org.jboss.jms.server.remoting.JMSDispatcher;
import org.jboss.jms.server.subscription.DurableSubscription;
import org.jboss.jms.server.subscription.Subscription;
+import org.jboss.jms.tx.AckInfo;
import org.jboss.jms.util.MessagingJMSException;
import org.jboss.logging.Logger;
import org.jboss.messaging.core.Channel;
@@ -61,9 +63,9 @@
*
* @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @version <tt>$Revision: 1.40 $</tt>
+ * @version <tt>$Revision: 1.41 $</tt>
*
- * $Id: ServerSessionEndpoint.java,v 1.40 2006/06/27 19:44:39 timfox Exp $
+ * $Id: ServerSessionEndpoint.java,v 1.41 2006/07/17 17:14:46 timfox Exp $
*/
public class ServerSessionEndpoint implements SessionEndpoint
{
@@ -250,14 +252,17 @@
}
}
+ int prefetchSize = connectionEndpoint.getPrefetchSize();
+
ServerConsumerEndpoint ep =
new ServerConsumerEndpoint(consumerID,
subscription == null ? (Channel)coreDestination : subscription,
- this, selector, noLocal, jmsDestination);
+ this, selector, noLocal, jmsDestination, prefetchSize);
JMSDispatcher.instance.registerTarget(new Integer(consumerID), new ConsumerAdvised(ep));
- ClientConsumerDelegate stub = new ClientConsumerDelegate(consumerID);
+
+ ClientConsumerDelegate stub = new ClientConsumerDelegate(consumerID, prefetchSize);
if (subscription != null)
{
@@ -382,33 +387,69 @@
connectionEndpoint.sendMessage(message, null);
}
- /**
- * Cancel all the deliveries in the session
- */
- public void cancelDeliveries() throws JMSException
+ public void acknowledgeBatch(List ackInfos) throws JMSException
{
- if (closed)
+ Iterator iter = ackInfos.iterator();
+
+ while (iter.hasNext())
{
- throw new IllegalStateException("Session is closed");
+ AckInfo ackInfo = (AckInfo)iter.next();
+
+ acknowledge(ackInfo);
+ }
}
- if (trace) { log.trace("Cancelling messages"); }
+ public void acknowledge(AckInfo ackInfo) throws JMSException
+ {
+ //If the message was delivered via a connection consumer then the message needs to be acked
+ //via the original consumer that was used to feed the connection consumer - which
+ //won't be one of the consumers of this session
+ //Therefore we always look in the global map of consumers held in the server peer
+ ServerConsumerEndpoint consumer = this.connectionEndpoint.getConsumerEndpoint(ackInfo.getConsumerID());
- for(Iterator i = this.consumers.values().iterator(); i.hasNext(); )
+ if (consumer == null)
{
- ServerConsumerEndpoint scd = (ServerConsumerEndpoint)i.next();
- scd.cancelAllDeliveries();
+ throw new IllegalArgumentException("Cannot find consumer id: " + ackInfo.getConsumerID());
}
+
+ consumer.acknowledge(ackInfo.getMessageID());
+
}
- public void acknowledge() throws JMSException
+ public void cancelDeliveries(List ackInfos) throws JMSException
+ {
+ //Deliveries must be cancelled in reverse order
+
+ log.info(this + " cancelling deliveries");
+
+ Set consumers = new HashSet();
+
+ for (int i = ackInfos.size() - 1; i >= 0; i--)
+ {
+ AckInfo ack = (AckInfo)ackInfos.get(i);
+
+ //We look in the global map since the message might have come from connection consumer
+ ServerConsumerEndpoint consumer = this.connectionEndpoint.getConsumerEndpoint(ack.getConsumerID());
+
+ if (consumer == null)
{
+ throw new IllegalArgumentException("Cannot find consumer id: " + ack.getConsumerID());
+ }
+
+ consumer.cancelDelivery(new Long(ack.getMessageID()));
+
+ consumers.add(consumer);
+ }
+
+ //Need to prompt delivery for all consumers
+
+ Iterator iter = consumers.iterator();
- Iterator iter = consumers.values().iterator();
while (iter.hasNext())
{
ServerConsumerEndpoint consumer = (ServerConsumerEndpoint)iter.next();
- consumer.acknowledgeAll();
+
+ consumer.promptDelivery();
}
}
@@ -576,13 +617,21 @@
/**
* Starts this session's Consumers
*/
- protected void setStarted(boolean s)
+ protected void setStarted(boolean s) throws JMSException
{
synchronized(consumers)
{
for(Iterator i = consumers.values().iterator(); i.hasNext(); )
{
- ((ServerConsumerEndpoint)i.next()).setStarted(s);
+ ServerConsumerEndpoint sce = (ServerConsumerEndpoint)i.next();
+ if (s)
+ {
+ sce.start();
+ }
+ else
+ {
+ sce.stop();
+ }
}
}
}
1.10 +30 -8 jboss-jms/src/main/org/jboss/jms/server/endpoint/SessionEndpoint.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: SessionEndpoint.java
===================================================================
RCS file: /cvsroot/jboss/jboss-jms/src/main/org/jboss/jms/server/endpoint/SessionEndpoint.java,v
retrieving revision 1.9
retrieving revision 1.10
diff -u -b -r1.9 -r1.10
--- SessionEndpoint.java 20 Apr 2006 20:42:26 -0000 1.9
+++ SessionEndpoint.java 17 Jul 2006 17:14:46 -0000 1.10
@@ -22,6 +22,8 @@
package org.jboss.jms.server.endpoint;
+import java.util.List;
+
import javax.jms.JMSException;
import org.jboss.jms.client.Closeable;
@@ -31,14 +33,15 @@
import org.jboss.jms.destination.JBossQueue;
import org.jboss.jms.destination.JBossTopic;
import org.jboss.jms.message.JBossMessage;
+import org.jboss.jms.tx.AckInfo;
/**
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
*
- * @version <tt>$Revision: 1.9 $</tt>
+ * @version <tt>$Revision: 1.10 $</tt>
*
- * $Id: SessionEndpoint.java,v 1.9 2006/04/20 20:42:26 timfox Exp $
+ * $Id: SessionEndpoint.java,v 1.10 2006/07/17 17:14:46 timfox Exp $
*/
public interface SessionEndpoint extends Closeable
{
@@ -64,11 +67,18 @@
JBossTopic createTopic(String topicName) throws JMSException;
/**
- * Acknowledges the session
+ * Acknowledge a batch of messages - used with client acknowledge or dups_ok acknowledge
+ * @param ackInfos
+ * @throws JMSException
*/
- void acknowledge() throws JMSException;
+ void acknowledgeBatch(List ackInfos) throws JMSException;
- void cancelDeliveries() throws JMSException;
+ /**
+ * Acknowledge a message - used for auto acknowledge
+ * @param ackInfo
+ * @throws JMSException
+ */
+ void acknowledge(AckInfo ackInfo) throws JMSException;
/**
* Add a temporary destination.
@@ -89,8 +99,20 @@
*/
void unsubscribe(String subscriptionName) throws JMSException;
+ /**
+ * Send a message
+ * @param message The message to send
+ * @throws JMSException
+ */
void send(JBossMessage message) throws JMSException;
-
+ /**
+ * Cancel some deliveries.
+ * This used at consumer close to cancel any undelivered messages left in the client buffer
+ * or at session recovery to cancel any messages that couldn't be redelivered locally
+ * @param ackInfos
+ * @throws Exception
+ */
+ void cancelDeliveries(List ackInfos) throws JMSException;
}
1.1 date: 2006/07/17 17:14:45; author: timfox; state: Exp;jboss-jms/src/main/org/jboss/jms/server/endpoint/ClientDelivery.java
Index: ClientDelivery.java
===================================================================
/*
* JBoss, Home of Professional Open Source
* Copyright 2005, JBoss Inc., and individual contributors as indicated
* by the @authors tag. See the copyright.txt in the distribution for a
* full listing of individual contributors.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
package org.jboss.jms.server.endpoint;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.jboss.jms.message.JBossMessage;
import org.jboss.jms.message.MessageProxy;
import org.jboss.messaging.core.message.MessageFactory;
/**
*
* A ClientDelivery
* Encapsulates a delivery of some messages to a client consumer
*
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @version <tt>$Revision: 1.1 $</tt>
*
* $Id: ClientDelivery.java,v 1.1 2006/07/17 17:14:45 timfox Exp $
*
*/
public class ClientDelivery implements Externalizable
{
// Constants -----------------------------------------------------
private static final long serialVersionUID = 8375144805659344430L;
// Static --------------------------------------------------------
// Attributes ----------------------------------------------------
private List msgs;
private int consumerID;
// Constructors --------------------------------------------------
public ClientDelivery()
{
}
public ClientDelivery(List msgs, int consumerID)
{
this.msgs = msgs;
this.consumerID = consumerID;
}
// Externalizable implementation
// ---------------------------------------------------------------
public void writeExternal(ObjectOutput out) throws IOException
{
out.writeInt(consumerID);
out.writeInt(msgs.size());
Iterator iter = msgs.iterator();
while (iter.hasNext())
{
MessageProxy mp = (MessageProxy)iter.next();
out.writeByte(mp.getMessage().getType());
out.writeInt(mp.getDeliveryCount());
mp.getMessage().writeExternal(out);
}
}
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException
{
consumerID = in.readInt();
int numMessages = in.readInt();
msgs = new ArrayList(numMessages);
for (int i = 0; i < numMessages; i++)
{
byte type = in.readByte();
int deliveryCount = in.readInt();
JBossMessage m = (JBossMessage)MessageFactory.createMessage(type);
m.readExternal(in);
MessageProxy md = JBossMessage.createThinDelegate(m, deliveryCount);
msgs.add(md);
}
}
// Public --------------------------------------------------------
public List getMessages()
{
return msgs;
}
public int getConsumerID()
{
return consumerID;
}
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
}
More information about the jboss-cvs-commits
mailing list