[jboss-cvs] JBoss Messaging SVN: r3957 - in branches/Branch_Stable: src/main/org/jboss/jms/client/state and 7 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Mar 27 06:29:13 EDT 2008
Author: timfox
Date: 2008-03-27 06:29:12 -0400 (Thu, 27 Mar 2008)
New Revision: 3957
Added:
branches/Branch_Stable/src/main/org/jboss/messaging/util/NamedThreadQueuedExecutor.java
Modified:
branches/Branch_Stable/src/main/org/jboss/jms/client/container/SessionAspect.java
branches/Branch_Stable/src/main/org/jboss/jms/client/state/SessionState.java
branches/Branch_Stable/src/main/org/jboss/jms/server/ServerPeer.java
branches/Branch_Stable/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java
branches/Branch_Stable/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/clusterconnection/ClusterConnectionManager.java
branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java
branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
branches/Branch_Stable/src/main/org/jboss/messaging/util/JBMExecutor.java
branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/PagingTest.java
Log:
http://jira.jboss.org/jira/browse/JBMESSAGING-1266
Modified: branches/Branch_Stable/src/main/org/jboss/jms/client/container/SessionAspect.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/jms/client/container/SessionAspect.java 2008-03-27 09:23:46 UTC (rev 3956)
+++ branches/Branch_Stable/src/main/org/jboss/jms/client/container/SessionAspect.java 2008-03-27 10:29:12 UTC (rev 3957)
@@ -201,25 +201,32 @@
public Object handleClose(Invocation invocation) throws Throwable
{
- Object res = invocation.invokeNext();
-
- SessionState state = getState(invocation);
+ try
+ {
+ Object res = invocation.invokeNext();
+
+ return res;
+ }
+ finally
+ {
+ SessionState state = getState(invocation);
+
+ ConnectionState connState = (ConnectionState)state.getParent();
+
+ Object xid = state.getCurrentTxId();
+
+ if (xid != null)
+ {
+ //Remove transaction from the resource manager
+ connState.getResourceManager().removeTx(xid);
+ }
+
+ // We must explicitly shutdown the executor
+
+ state.getExecutor().shutdownNow();
+ }
- ConnectionState connState = (ConnectionState)state.getParent();
-
- Object xid = state.getCurrentTxId();
-
- if (xid != null)
- {
- //Remove transaction from the resource manager
- connState.getResourceManager().removeTx(xid);
- }
-
- // We must explicitly shutdown the executor
-
- state.getExecutor().shutdownNow();
-
- return res;
+
}
public Object handlePreDeliver(Invocation invocation) throws Throwable
Modified: branches/Branch_Stable/src/main/org/jboss/jms/client/state/SessionState.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/jms/client/state/SessionState.java 2008-03-27 09:23:46 UTC (rev 3956)
+++ branches/Branch_Stable/src/main/org/jboss/jms/client/state/SessionState.java 2008-03-27 10:29:12 UTC (rev 3957)
@@ -29,11 +29,10 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+
import javax.jms.MessageListener;
import javax.jms.Session;
-import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
-import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
import org.jboss.jms.client.container.ClientConsumer;
import org.jboss.jms.client.delegate.ClientBrowserDelegate;
import org.jboss.jms.client.delegate.ClientConsumerDelegate;
@@ -50,6 +49,8 @@
import org.jboss.messaging.util.JBMExecutor;
import org.jboss.messaging.util.Version;
+import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
+
/**
* State corresponding to a session. This state is acessible inside aspects/interceptors.
*
@@ -141,7 +142,7 @@
currentTxId = parent.getResourceManager().createLocalTx();
}
- executor = new JBMExecutor(new LinkedQueue());
+ executor = new JBMExecutor("jbm-client-session-" + sessionID);
clientAckList = new ArrayList();
Modified: branches/Branch_Stable/src/main/org/jboss/jms/server/ServerPeer.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/jms/server/ServerPeer.java 2008-03-27 09:23:46 UTC (rev 3956)
+++ branches/Branch_Stable/src/main/org/jboss/jms/server/ServerPeer.java 2008-03-27 10:29:12 UTC (rev 3957)
@@ -289,7 +289,7 @@
}
if (clusterPullConnectionFactoryName != null)
{
- clusterConnectionManager = new ClusterConnectionManager(useXAForMessagePull, serverPeerID,
+ clusterConnectionManager = new ClusterConnectionManager(serverPeerID,
clusterPullConnectionFactoryName, defaultPreserveOrdering,
SecurityStore.SUCKER_USER, suckerPassword);
clusterNotifier.registerListener(clusterConnectionManager);
Modified: branches/Branch_Stable/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java 2008-03-27 09:23:46 UTC (rev 3956)
+++ branches/Branch_Stable/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java 2008-03-27 10:29:12 UTC (rev 3957)
@@ -52,11 +52,10 @@
import org.jboss.messaging.core.contract.ClusterNotificationListener;
import org.jboss.messaging.core.contract.Replicator;
import org.jboss.messaging.util.JNDIUtil;
+import org.jboss.messaging.util.NamedThreadQueuedExecutor;
import org.jboss.messaging.util.Version;
import org.jboss.remoting.InvokerLocator;
-import EDU.oswego.cs.dl.util.concurrent.Executor;
-import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
/**
@@ -100,7 +99,7 @@
this.serverPeer = serverPeer;
endpoints = new HashMap();
delegates = new HashMap();
- notifyExecutor = new QueuedExecutor(new LinkedQueue());
+ notifyExecutor = new NamedThreadQueuedExecutor("jbm-cf-jndimapper");
}
// ConnectionFactoryManager implementation ------------------------------------------------------
Modified: branches/Branch_Stable/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2008-03-27 09:23:46 UTC (rev 3956)
+++ branches/Branch_Stable/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2008-03-27 10:29:12 UTC (rev 3957)
@@ -46,7 +46,6 @@
import org.jboss.jms.delegate.ConsumerDelegate;
import org.jboss.jms.delegate.DeliveryInfo;
import org.jboss.jms.delegate.DeliveryRecovery;
-import org.jboss.jms.delegate.SessionEndpoint;
import org.jboss.jms.destination.JBossDestination;
import org.jboss.jms.destination.JBossQueue;
import org.jboss.jms.destination.JBossTopic;
@@ -86,6 +85,7 @@
import org.jboss.messaging.util.ExceptionUtil;
import org.jboss.messaging.util.GUIDGenerator;
import org.jboss.messaging.util.MessageQueueNameHelper;
+import org.jboss.messaging.util.NamedThreadQueuedExecutor;
import org.jboss.remoting.callback.Callback;
import org.jboss.remoting.callback.ServerInvokerCallbackHandler;
@@ -169,7 +169,7 @@
private long deliveryIdSequence;
//Temporary until we have our own NIO transport
- QueuedExecutor executor = new QueuedExecutor(new LinkedQueue());
+ QueuedExecutor executor;
private LinkedQueue toDeliver = new LinkedQueue();
@@ -186,6 +186,8 @@
boolean replicating) throws Exception
{
this.id = sessionID;
+
+ this.executor = new NamedThreadQueuedExecutor("jbm-server-session-" + sessionID);
this.connectionEndpoint = connectionEndpoint;
Modified: branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/clusterconnection/ClusterConnectionManager.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/clusterconnection/ClusterConnectionManager.java 2008-03-27 09:23:46 UTC (rev 3956)
+++ branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/clusterconnection/ClusterConnectionManager.java 2008-03-27 10:29:12 UTC (rev 3957)
@@ -30,6 +30,7 @@
import java.util.Map;
import javax.jms.JMSException;
+import javax.jms.Session;
import org.jboss.jms.client.JBossConnection;
import org.jboss.jms.client.JBossConnectionFactory;
@@ -64,9 +65,7 @@
private boolean trace = log.isTraceEnabled();
private Map connections;
-
- private boolean xa;
-
+
private boolean started;
private int nodeID;
@@ -83,15 +82,13 @@
private String suckerPassword;
- public ClusterConnectionManager(boolean xa, int nodeID,
+ public ClusterConnectionManager(int nodeID,
String connectionFactoryUniqueName, boolean preserveOrdering,
String suckerUser,
String suckerPassword)
{
connections = new HashMap();
- this.xa = xa;
-
this.nodeID = nodeID;
this.connectionFactoryUniqueName = connectionFactoryUniqueName;
@@ -167,20 +164,6 @@
}
}
- public void setIsXA(boolean xa) throws Exception
- {
- boolean needToClose = this.xa != xa;
- if (needToClose)
- {
- closeAllSuckers();
- }
- this.xa = xa;
- if (needToClose)
- {
- createAllSuckers();
- }
- }
-
public void closeAllSuckers()
{
Iterator iter = connections.values().iterator();
@@ -394,7 +377,7 @@
public String toString()
{
return "ClusterConnectionManager:" + System.identityHashCode(this) +
- " xa: " + xa + " nodeID: " + nodeID + " connectionFactoryName: " + connectionFactoryUniqueName;
+ " nodeID: " + nodeID + " connectionFactoryName: " + connectionFactoryUniqueName;
}
private void ensureAllConnectionsCreated() throws Exception
@@ -487,8 +470,8 @@
throw new IllegalArgumentException("Cannot find source channel id");
}
- MessageSucker sucker = new MessageSucker(localQueue, info.connection, localInfo.connection,
- xa, preserveOrdering, sourceChannelID);
+ MessageSucker sucker = new MessageSucker(localQueue, info.session, localInfo.session,
+ preserveOrdering, sourceChannelID);
info.addSucker(sucker);
@@ -630,6 +613,8 @@
private JBossConnection connection;
+ private Session session;
+
private Map suckers;
private boolean started;
@@ -658,7 +643,9 @@
if (connection == null)
{
- connection = (JBossConnection)connectionFactory.createConnection(suckerUser, suckerPassword);
+ connection = (JBossConnection)connectionFactory.createConnection(suckerUser, suckerPassword);
+
+ session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
}
connection.start();
Modified: branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java 2008-03-27 09:23:46 UTC (rev 3956)
+++ branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java 2008-03-27 10:29:12 UTC (rev 3957)
@@ -29,20 +29,17 @@
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
-import org.jboss.jms.client.JBossConnection;
import org.jboss.jms.client.JBossSession;
import org.jboss.jms.client.container.ClientConsumer;
import org.jboss.jms.client.delegate.ClientConsumerDelegate;
import org.jboss.jms.client.state.ConsumerState;
import org.jboss.jms.delegate.ProducerDelegate;
import org.jboss.jms.delegate.SessionDelegate;
-import org.jboss.jms.destination.JBossDestination;
import org.jboss.jms.destination.JBossQueue;
import org.jboss.jms.message.JBossMessage;
import org.jboss.jms.message.MessageProxy;
import org.jboss.logging.Logger;
import org.jboss.messaging.core.contract.Queue;
-import org.jboss.tm.TransactionManagerLocator;
/**
*
@@ -57,23 +54,17 @@
private static final Logger log = Logger.getLogger(MessageSucker.class);
private boolean trace = log.isTraceEnabled();
-
- private JBossConnection sourceConnection;
-
- private JBossConnection localConnection;
-
+
private Queue localQueue;
- private SessionDelegate sourceSession;
+ private Session sourceSession;
- private SessionDelegate localSession;
+ private Session localSession;
private ProducerDelegate producer;
private volatile boolean started;
- private boolean xa;
-
private TransactionManager tm;
private boolean consuming;
@@ -86,37 +77,30 @@
private long sourceChannelID;
+ private JBossQueue jbq;
+
public String toString()
{
return "MessageSucker:" + System.identityHashCode(this) + " queue:" + localQueue.getName();
}
-
- MessageSucker(Queue localQueue, JBossConnection sourceConnection, JBossConnection localConnection,
- boolean xa, boolean preserveOrdering, long sourceChannelID)
- {
- if (trace) { log.trace("Creating message sucker, localQueue:" + localQueue + " xa:" + xa + " preserveOrdering:" + preserveOrdering); }
-
- this.localQueue = localQueue;
-
- this.sourceConnection = sourceConnection;
-
- this.localConnection = localConnection;
-
- //this.xa = xa;
-
- //XA is currently disabled for message sucking - this is because JBM 1.4.0 uses shared database so XA is
- //unnecesary - we can move the ref from one channel to another with a database update
- this.xa = false;
-
- this.preserveOrdering = preserveOrdering;
-
- this.sourceChannelID = sourceChannelID;
-
- if (xa)
- {
- tm = TransactionManagerLocator.getInstance().locate();
- }
- }
+
+ MessageSucker(Queue localQueue, Session sourceSession, Session localSession,
+ boolean preserveOrdering, long sourceChannelID)
+ {
+ if (trace) { log.trace("Creating message sucker, localQueue:" + localQueue + " preserveOrdering:" + preserveOrdering); }
+
+ this.jbq = new JBossQueue(localQueue.getName(), true);
+
+ this.localQueue = localQueue;
+
+ this.sourceSession = sourceSession;
+
+ this.localSession = localSession;
+
+ this.preserveOrdering = preserveOrdering;
+
+ this.sourceChannelID = sourceChannelID;
+ }
synchronized void start() throws Exception
{
@@ -126,44 +110,21 @@
}
if (trace) { log.trace(this + " starting"); }
+
+ SessionDelegate localdel = ((JBossSession)localSession).getDelegate();
- if (!xa)
- {
- //If not XA then we use a client ack session for consuming - this allows us to get the message, send it to the destination
- //then ack the message.
- //This means that if a failure occurs between sending and acking the message won't be lost but may get delivered
- //twice - i.e we have dups_ok behaviour
-
- JBossSession sess = (JBossSession)sourceConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ producer = localdel.createProducerDelegate(jbq);
- sourceSession = (SessionDelegate)sess.getDelegate();
-
- sess = (JBossSession)localConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- localSession = (SessionDelegate)sess.getDelegate();
- }
- else
- {
- JBossSession sess = (JBossSession)sourceConnection.createXASession();
-
- sourceSession = (SessionDelegate)sess.getDelegate();
-
- sess = (JBossSession)localConnection.createXASession();
-
- localSession = (SessionDelegate)sess.getDelegate();
- }
-
- JBossDestination dest = new JBossQueue(localQueue.getName(), true);
-
- producer = localSession.createProducerDelegate(dest);
-
//We create the consumer with autoFlowControl = false
//In this mode, the consumer does not handle it's own flow control, but it must be handled
//manually using changeRate() methods
//The local queue itself will manually send these messages depending on its state -
//So effectively the message buffering is handled by the local queue, not the ClientConsumer
- consumer = (ClientConsumerDelegate)sourceSession.createConsumerDelegate(dest, null, false, null, false, false);
+ SessionDelegate sourcedel = ((JBossSession)sourceSession).getDelegate();
+
+ consumer = (ClientConsumerDelegate)sourcedel.createConsumerDelegate(jbq, null, false, null, false, false);
+
clientConsumer = ((ConsumerState)consumer.getState()).getClientConsumer();
consumer.setMessageListener(this);
@@ -192,22 +153,40 @@
try
{
- sourceSession.close();
+ consumer.closing(-1);
}
catch (Throwable t)
{
+ // Ignore
+ }
+ try
+ {
+ consumer.close();
+ }
+ catch (Throwable t)
+ {
//Ignore
}
try
{
- localSession.close();
+ producer.close();
}
catch (Throwable t)
{
//Ignore
}
+
+ sourceSession = null;
+ localSession = null;
+
+ consumer = null;
+
+ clientConsumer = null;
+
+ producer = null;
+
started = false;
}
@@ -248,78 +227,9 @@
public void onMessage(Message msg)
{
Transaction tx = null;
-
-
+
try
{
- /*
- Commented out until JBM 2.0
-
- boolean startTx = xa && msg.getJMSDeliveryMode() == DeliveryMode.PERSISTENT;
-
- if (startTx)
- {
- //Start a JTA transaction
-
- if (trace) { log.trace("Starting JTA transactions"); }
-
- tm.begin();
-
- tx = tm.getTransaction();
-
- tx.enlistResource(sourceSession.getXAResource());
-
- tx.enlistResource(localSession.getXAResource());
-
- if (trace) { log.trace("Started JTA transaction"); }
- }
-
- org.jboss.messaging.core.contract.Message coreMessage = ((MessageProxy)msg).getMessage();
-
- if (preserveOrdering)
- {
- //Add a header saying we have sucked the message
- coreMessage.putHeader(org.jboss.messaging.core.contract.Message.CLUSTER_SUCKED, "x");
- }
-
- //Add a header with the node id of the node we sucked from - this is used on the sending end to do
- //the move optimisation
- coreMessage.putHeader(org.jboss.messaging.core.contract.Message.SOURCE_CHANNEL_ID, sourceChannelID);
-
- long timeToLive = msg.getJMSExpiration();
- if (timeToLive != 0)
- {
- timeToLive -= System.currentTimeMillis();
- if (timeToLive <= 0)
- {
- timeToLive = 1; //Should have already expired - set to 1 so it expires when it is consumed or delivered
- }
- }
-
- producer.send(null, msg, msg.getJMSDeliveryMode(), msg.getJMSPriority(), timeToLive, true);
-
- if (trace) { log.trace(this + " forwarded message to queue"); }
-
- if (startTx)
- {
- if (trace) { log.trace("Committing JTA transaction"); }
-
- tx.delistResource(sourceSession.getXAResource(), XAResource.TMSUCCESS);
-
- tx.delistResource(localSession.getXAResource(), XAResource.TMSUCCESS);
-
- tm.commit();
-
- if (trace) { log.trace("Committed JTA transaction"); }
- }
- else
- {
- msg.acknowledge();
-
- if (trace) { log.trace("Acknowledged message"); }
- }
- */
-
if (trace) { log.trace(this + " sucked message " + msg + " JMSDestination - " + msg.getJMSDestination()); }
Destination originalDestination = msg.getJMSDestination();
@@ -353,10 +263,13 @@
coreMessage.getHeaders().put(JBossMessage.JBOSS_MESSAGING_ORIG_DESTINATION_SUCKER, originalDestination);
- //Then we send - this causes the ref to be moved (SQL UPDATE) in the database
- producer.send(null, msg, msg.getJMSDeliveryMode(), msg.getJMSPriority(), timeToLive, true);
-
- if (trace) { log.trace(this + " forwarded message to queue"); }
+ synchronized (localSession)
+ {
+ //Then we send - this causes the ref to be moved (SQL UPDATE) in the database
+ producer.send(null, msg, msg.getJMSDeliveryMode(), msg.getJMSPriority(), timeToLive, true);
+
+ if (trace) { log.trace(this + " forwarded message to queue"); }
+ }
}
catch (Exception e)
{
Modified: branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java 2008-03-27 09:23:46 UTC (rev 3956)
+++ branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java 2008-03-27 10:29:12 UTC (rev 3957)
@@ -81,7 +81,6 @@
import org.jgroups.View;
import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
-import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
import EDU.oswego.cs.dl.util.concurrent.ReadWriteLock;
import EDU.oswego.cs.dl.util.concurrent.ReentrantWriterPreferenceReadWriteLock;
@@ -1623,8 +1622,7 @@
failoverMap = new ConcurrentHashMap();
leftSet = new ConcurrentHashSet();
- }
-
+ }
}
private void deInit()
Modified: branches/Branch_Stable/src/main/org/jboss/messaging/util/JBMExecutor.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/messaging/util/JBMExecutor.java 2008-03-27 09:23:46 UTC (rev 3956)
+++ branches/Branch_Stable/src/main/org/jboss/messaging/util/JBMExecutor.java 2008-03-27 10:29:12 UTC (rev 3957)
@@ -25,10 +25,7 @@
import java.security.AccessController;
import java.security.PrivilegedAction;
-import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
-import EDU.oswego.cs.dl.util.concurrent.Channel;
-
/** Any Executor being used on client side has to clean its contextClassLoader, as that could cause leaks.
* This class encapsulates the necessary cleanup to avoid that leak.
*
@@ -49,27 +46,23 @@
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
**/
-public class JBMExecutor extends QueuedExecutor
+public class JBMExecutor extends NamedThreadQueuedExecutor
{
private boolean needToSetClassLoader = true;
- public JBMExecutor(Channel channel)
+ public JBMExecutor(String name)
{
- super(channel);
+ super(name);
}
- public JBMExecutor()
+ class TCLRunnable implements Runnable
{
- }
- class TCLExecutor implements Runnable
- {
-
- private Runnable realExecutor;
+ private Runnable realRunnable;
private ClassLoader tcl;
- public TCLExecutor(Runnable realExecutor)
+ public TCLRunnable(Runnable realRunnable)
{
if (needToSetClassLoader)
{
@@ -84,7 +77,7 @@
);
}
- this.realExecutor = realExecutor;
+ this.realRunnable = realRunnable;
}
@SuppressWarnings("unchecked")
@@ -105,14 +98,14 @@
}
);
}
- realExecutor.run();
+ realRunnable.run();
}
}
public void execute(Runnable runnable) throws InterruptedException
{
- super.execute(new TCLExecutor(runnable));
+ super.execute(new TCLRunnable(runnable));
}
public void clearClassLoader() throws InterruptedException
Copied: branches/Branch_Stable/src/main/org/jboss/messaging/util/NamedThreadQueuedExecutor.java (from rev 3955, branches/Branch_JBossMessaging_1_4_0_SP3_CP/src/main/org/jboss/messaging/util/NamedThreadQueuedExecutor.java)
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/messaging/util/NamedThreadQueuedExecutor.java (rev 0)
+++ branches/Branch_Stable/src/main/org/jboss/messaging/util/NamedThreadQueuedExecutor.java 2008-03-27 10:29:12 UTC (rev 3957)
@@ -0,0 +1,66 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.util;
+
+import org.jboss.logging.Logger;
+
+import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
+import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
+import EDU.oswego.cs.dl.util.concurrent.ThreadFactory;
+
+/**
+ *
+ * A NamedThreadQueuedExecutor
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public class NamedThreadQueuedExecutor extends QueuedExecutor
+{
+ private static final Logger log = Logger.getLogger(NamedThreadQueuedExecutor.class);
+
+ private final String name;
+
+ private static final ThreadGroup jbmGroup = new ThreadGroup("JBM-threads");
+
+ public NamedThreadQueuedExecutor(String name)
+ {
+ super(new LinkedQueue());
+
+ this.name = name;
+
+ setThreadFactory(new Factory());
+
+ clearThread();
+
+ restart();
+ }
+
+ private class Factory implements ThreadFactory
+ {
+ public Thread newThread(Runnable command)
+ {
+ return new Thread(jbmGroup, command, name);
+ }
+
+ }
+}
Modified: branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/PagingTest.java
===================================================================
--- branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/PagingTest.java 2008-03-27 09:23:46 UTC (rev 3956)
+++ branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/PagingTest.java 2008-03-27 10:29:12 UTC (rev 3957)
@@ -78,11 +78,11 @@
MessageConsumer cons = session.createConsumer(queue);
- final int numMessages = 100000;
+ final int numMessages = 10000;
long start = System.currentTimeMillis();
- for (int i = 0; i < 100000; i++)
+ for (int i = 0; i < numMessages; i++)
{
Message m = session.createMessage();
@@ -102,7 +102,7 @@
start = System.currentTimeMillis();
- for (int i = 0; i < 100000; i++)
+ for (int i = 0; i < numMessages; i++)
{
Message m = cons.receive(2000);
More information about the jboss-cvs-commits mailing list