[jboss-cvs] JBoss Messaging SVN: r4984 - in branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1417: src/main/org/jboss/jms/client/state and 6 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Sep 18 15:44:06 EDT 2008
Author: jbertram at redhat.com
Date: 2008-09-18 15:44:05 -0400 (Thu, 18 Sep 2008)
New Revision: 4984
Added:
branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1417/src/main/org/jboss/messaging/util/CompatibleExecutor.java
branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1417/src/main/org/jboss/messaging/util/ExecutorFactory.java
branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1417/src/main/org/jboss/messaging/util/JBMThreadFactory.java
branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1417/src/main/org/jboss/messaging/util/OrderedExecutorFactory.java
branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1417/tests/src/org/jboss/test/messaging/util/OrderedExecutorFactoryTest.java
Modified:
branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1417/src/main/org/jboss/jms/client/container/ClientConsumer.java
branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1417/src/main/org/jboss/jms/client/container/ConsumerAspect.java
branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1417/src/main/org/jboss/jms/client/container/SessionAspect.java
branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1417/src/main/org/jboss/jms/client/state/SessionState.java
branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1417/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java
branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1417/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1417/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1417/tests/build.properties
Log:
One-off patch work for [JBPAPP-1183]
Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1417/src/main/org/jboss/jms/client/container/ClientConsumer.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1417/src/main/org/jboss/jms/client/container/ClientConsumer.java 2008-09-18 15:26:31 UTC (rev 4983)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1417/src/main/org/jboss/jms/client/container/ClientConsumer.java 2008-09-18 19:44:05 UTC (rev 4984)
@@ -24,6 +24,7 @@
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
+import java.util.concurrent.Executor;
import javax.jms.IllegalStateException;
import javax.jms.JMSException;
@@ -45,8 +46,6 @@
import org.jboss.messaging.util.prioritylinkedlist.BasicPriorityLinkedList;
import org.jboss.messaging.util.prioritylinkedlist.PriorityLinkedList;
-import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
-
/**
* @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox/a>
@@ -118,9 +117,10 @@
return false;
}
}
-
+
+
//This is static so it can be called by the asf layer too
- public static void callOnMessage(SessionDelegate sess,
+ public static void callOnMessageStatic(SessionDelegate sess,
MessageListener listener,
String consumerID,
String queueName,
@@ -186,6 +186,75 @@
if (trace) { log.trace("Called postDeliver"); }
}
}
+
+ //Changed it to non-static
+ public void callOnMessage(SessionDelegate sess,
+ MessageListener listener,
+ String consumerID,
+ String queueName,
+ boolean isConnectionConsumer,
+ MessageProxy m,
+ int ackMode,
+ int maxDeliveries,
+ SessionDelegate connectionConsumerSession,
+ boolean shouldAck)
+ throws JMSException
+ {
+ if (checkExpiredOrReachedMaxdeliveries(m, connectionConsumerSession!=null?connectionConsumerSession:sess, maxDeliveries, shouldAck))
+ {
+ //Message has been cancelled
+ return;
+ }
+
+ DeliveryInfo deliveryInfo =
+ new DeliveryInfo(m, consumerID, queueName, connectionConsumerSession, shouldAck);
+
+ m.incDeliveryCount();
+
+ // If this is the callback-handler for a connection consumer we don't want to acknowledge or
+ // add anything to the tx for this session.
+ if (!isConnectionConsumer)
+ {
+ //We need to call preDeliver, deliver the message then call postDeliver - this is because
+ //it is legal to call session.recover(), or session.rollback() from within the onMessage()
+ //method in which case the last message needs to be delivered so it needs to know about it
+ sess.preDeliver(deliveryInfo);
+ }
+
+ try
+ {
+ if (trace) { log.trace("calling listener's onMessage(" + m + ")"); }
+
+ onMessageThread = Thread.currentThread();
+ listener.onMessage(m);
+
+ if (trace) { log.trace("listener's onMessage() finished"); }
+ }
+ catch (RuntimeException e)
+ {
+ long id = m.getMessage().getMessageID();
+
+ log.error("RuntimeException was thrown from onMessage, " + id + " will be redelivered", e);
+
+ // See JMS 1.1 spec 4.5.2
+
+ if (ackMode == Session.AUTO_ACKNOWLEDGE || ackMode == Session.DUPS_OK_ACKNOWLEDGE)
+ {
+ sess.recover();
+ }
+ }
+
+ // If this is the callback-handler for a connection consumer we don't want to acknowledge
+ // or add anything to the tx for this session
+ if (!isConnectionConsumer)
+ {
+ if (trace) { log.trace("Calling postDeliver"); }
+
+ sess.postDeliver();
+
+ if (trace) { log.trace("Called postDeliver"); }
+ }
+ }
// Attributes -----------------------------------------------------------------------------------
@@ -206,7 +275,7 @@
private int ackMode;
private boolean closed;
private Object mainLock;
- private QueuedExecutor sessionExecutor;
+ private Executor sessionExecutor;
private boolean listenerRunning;
private int maxDeliveries;
private String queueName;
@@ -219,7 +288,8 @@
private boolean paused;
private int consumeCount;
private boolean firstTime = true;
-
+ private volatile Thread onMessageThread;
+
public int getBufferSize()
{
return buffer.size();
@@ -230,7 +300,7 @@
public ClientConsumer(boolean isCC, int ackMode,
SessionDelegate sess, ConsumerDelegate cons, String consumerID,
String queueName,
- int bufferSize, QueuedExecutor sessionExecutor,
+ int bufferSize, Executor sessionExecutor,
int maxDeliveries, boolean shouldAck,
long redeliveryDelay)
{
@@ -693,7 +763,7 @@
{
// Wait for any onMessage() executions to complete
- if (Thread.currentThread().equals(sessionExecutor.getThread()))
+ if (Thread.currentThread() == onMessageThread)
{
// the current thread already closing this ClientConsumer (this happens when the
// session is closed from within the MessageListener.onMessage(), for example), so no need
@@ -702,29 +772,17 @@
}
Future result = new Future();
-
- try
- {
- sessionExecutor.execute(new Closer(result));
- if (trace) { log.trace(this + " blocking wait for Closer execution"); }
- result.getResult();
- if (trace) { log.trace(this + " got Closer result"); }
- }
- catch (InterruptedException e)
- {
- }
+ sessionExecutor.execute(new Closer(result));
+
+ if (trace) { log.trace(this + " blocking wait for Closer execution"); }
+ result.getResult();
+ if (trace) { log.trace(this + " got Closer result"); }
}
private void queueRunner(ListenerRunner runner)
{
- try
- {
- this.sessionExecutor.execute(runner);
- }
- catch (InterruptedException e)
- {
- }
+ this.sessionExecutor.execute(runner);
}
private void messageAdded()
Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1417/src/main/org/jboss/jms/client/container/ConsumerAspect.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1417/src/main/org/jboss/jms/client/container/ConsumerAspect.java 2008-09-18 15:26:31 UTC (rev 4983)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1417/src/main/org/jboss/jms/client/container/ConsumerAspect.java 2008-09-18 19:44:05 UTC (rev 4984)
@@ -21,6 +21,8 @@
*/
package org.jboss.jms.client.container;
+import java.util.concurrent.Executor;
+
import javax.jms.MessageListener;
import org.jboss.aop.joinpoint.Invocation;
@@ -36,6 +38,7 @@
import org.jboss.jms.exception.MessagingShutdownException;
import org.jboss.logging.Logger;
import org.jboss.messaging.util.MessageQueueNameHelper;
+import org.jboss.messaging.util.OrderedExecutorFactory;
import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
@@ -82,7 +85,7 @@
ConsumerState consumerState = (ConsumerState)((DelegateSupport)consumerDelegate).getState();
String consumerID = consumerState.getConsumerID();
int prefetchSize = consumerState.getBufferSize();
- QueuedExecutor sessionExecutor = sessionState.getExecutor();
+ Executor executor = sessionState.getExecutor();
int maxDeliveries = consumerState.getMaxDeliveries();
long redeliveryDelay = consumerState.getRedeliveryDelay();
@@ -109,7 +112,7 @@
ClientConsumer messageHandler =
new ClientConsumer(isCC, sessionState.getAcknowledgeMode(),
sessionDelegate, consumerDelegate, consumerID, queueName,
- prefetchSize, sessionExecutor, maxDeliveries, consumerState.isShouldAck(),
+ prefetchSize, executor, maxDeliveries, consumerState.isShouldAck(),
redeliveryDelay);
sessionState.addCallbackHandler(messageHandler);
Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1417/src/main/org/jboss/jms/client/container/SessionAspect.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1417/src/main/org/jboss/jms/client/container/SessionAspect.java 2008-09-18 15:26:31 UTC (rev 4983)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1417/src/main/org/jboss/jms/client/container/SessionAspect.java 2008-09-18 19:44:05 UTC (rev 4984)
@@ -827,8 +827,8 @@
AsfMessageHolder holder = (AsfMessageHolder)msgs.removeFirst();
if (trace) { log.trace("sending " + holder.msg + " to the message listener" ); }
-
- ClientConsumer.callOnMessage(del, state.getDistinguishedListener(), holder.consumerID,
+
+ ClientConsumer.callOnMessageStatic(del, state.getDistinguishedListener(), holder.consumerID,
holder.queueName, false,
holder.msg, ackMode, holder.maxDeliveries,
holder.connectionConsumerDelegate, holder.shouldAck);
Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1417/src/main/org/jboss/jms/client/state/SessionState.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1417/src/main/org/jboss/jms/client/state/SessionState.java 2008-09-18 15:26:31 UTC (rev 4983)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1417/src/main/org/jboss/jms/client/state/SessionState.java 2008-09-18 19:44:05 UTC (rev 4984)
@@ -29,6 +29,8 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
import javax.jms.MessageListener;
import javax.jms.Session;
@@ -46,7 +48,10 @@
import org.jboss.jms.tx.MessagingXAResource;
import org.jboss.jms.tx.ResourceManager;
import org.jboss.logging.Logger;
-import org.jboss.messaging.util.JBMExecutor;
+import org.jboss.messaging.util.CompatibleExecutor;
+import org.jboss.messaging.util.ExecutorFactory;
+import org.jboss.messaging.util.JBMThreadFactory;
+import org.jboss.messaging.util.OrderedExecutorFactory;
import org.jboss.messaging.util.Version;
import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
@@ -83,8 +88,10 @@
private MessagingXAResource xaResource;
private Object currentTxId;
- // Executor used for executing onMessage methods
- private JBMExecutor executor;
+ // ExecutorFactory used for executing onMessage methods
+ private static final ExecutorFactory executorFactory = new OrderedExecutorFactory(
+ Executors.newCachedThreadPool(new JBMThreadFactory("session-state")));
+ private CompatibleExecutor executor;
private boolean recoverCalled;
@@ -142,13 +149,13 @@
currentTxId = parent.getResourceManager().createLocalTx();
}
- executor = new JBMExecutor("jbm-client-session-" + sessionID);
-
clientAckList = new ArrayList();
// TODO could optimise this to use the same map of callbackmanagers (which holds refs
// to callbackhandlers) in the connection, instead of maintaining another map
callbackHandlers = new HashMap();
+
+ executor = executorFactory.getExecutor("jbm-client-session-threads");
}
// HierarchicalState implementation -------------------------------------------------------------
@@ -424,7 +431,7 @@
return xaResource;
}
- public JBMExecutor getExecutor()
+ public CompatibleExecutor getExecutor()
{
return executor;
}
Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1417/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1417/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java 2008-09-18 15:26:31 UTC (rev 4983)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1417/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java 2008-09-18 19:44:05 UTC (rev 4984)
@@ -29,6 +29,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.Executors;
import javax.naming.Context;
import javax.naming.InitialContext;
@@ -51,13 +52,14 @@
import org.jboss.messaging.core.contract.ClusterNotification;
import org.jboss.messaging.core.contract.ClusterNotificationListener;
import org.jboss.messaging.core.contract.Replicator;
+import org.jboss.messaging.util.CompatibleExecutor;
+import org.jboss.messaging.util.ExecutorFactory;
+import org.jboss.messaging.util.JBMThreadFactory;
import org.jboss.messaging.util.JNDIUtil;
-import org.jboss.messaging.util.NamedThreadQueuedExecutor;
+import org.jboss.messaging.util.OrderedExecutorFactory;
import org.jboss.messaging.util.Version;
import org.jboss.remoting.InvokerLocator;
-import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
-
/**
* @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
@@ -77,6 +79,9 @@
private static boolean trace = log.isTraceEnabled();
+ private static final ExecutorFactory executorFactory = new OrderedExecutorFactory(
+ Executors.newCachedThreadPool(new JBMThreadFactory("conn-factory-jndi-mapper")));
+
// Attributes -----------------------------------------------------------------------------------
protected Context initialContext;
@@ -90,7 +95,7 @@
private Replicator replicator;
- private QueuedExecutor notifyExecutor;
+ private CompatibleExecutor notifyExecutor;
// Constructors ---------------------------------------------------------------------------------
@@ -99,7 +104,7 @@
this.serverPeer = serverPeer;
endpoints = new HashMap();
delegates = new HashMap();
- notifyExecutor = new NamedThreadQueuedExecutor("jbm-cf-jndimapper");
+ notifyExecutor = executorFactory.getExecutor("jbm-cf-jndimapper");
}
// ConnectionFactoryManager implementation ------------------------------------------------------
@@ -424,14 +429,7 @@
//Run on a different thread to prevent distributed deadlock when multiple nodes are starting together
//and deploying connection factories concurrently
- try
- {
- notifyExecutor.execute(new NotifyRunner());
- }
- catch (InterruptedException e)
- {
- //Ignore
- }
+ notifyExecutor.execute(new NotifyRunner());
}
// Public ---------------------------------------------------------------------------------------
Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1417/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1417/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2008-09-18 15:26:31 UTC (rev 4983)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1417/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2008-09-18 19:44:05 UTC (rev 4984)
@@ -32,6 +32,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.Executors;
import javax.jms.IllegalStateException;
import javax.jms.InvalidDestinationException;
@@ -82,16 +83,18 @@
import org.jboss.messaging.core.impl.tx.TransactionException;
import org.jboss.messaging.core.impl.tx.TransactionRepository;
import org.jboss.messaging.core.impl.tx.TxCallback;
+import org.jboss.messaging.util.CompatibleExecutor;
import org.jboss.messaging.util.ExceptionUtil;
+import org.jboss.messaging.util.ExecutorFactory;
import org.jboss.messaging.util.GUIDGenerator;
+import org.jboss.messaging.util.JBMThreadFactory;
import org.jboss.messaging.util.MessageQueueNameHelper;
-import org.jboss.messaging.util.NamedThreadQueuedExecutor;
+import org.jboss.messaging.util.OrderedExecutorFactory;
import org.jboss.remoting.callback.Callback;
import org.jboss.remoting.callback.ServerInvokerCallbackHandler;
import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
-import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
/**
* The server side representation of a JMS session.
@@ -130,6 +133,9 @@
// Static ---------------------------------------------------------------------------------------
+
+ private static final ExecutorFactory executorFactory = new OrderedExecutorFactory(
+ Executors.newCachedThreadPool(new JBMThreadFactory("server-session-endpoint")));
// Attributes -----------------------------------------------------------------------------------
private boolean trace = log.isTraceEnabled();
@@ -169,7 +175,7 @@
private long deliveryIdSequence;
//Temporary until we have our own NIO transport
- QueuedExecutor executor;
+ CompatibleExecutor executor;
private LinkedQueue toDeliver = new LinkedQueue();
@@ -187,7 +193,7 @@
{
this.id = sessionID;
- this.executor = new NamedThreadQueuedExecutor("jbm-server-session-" + sessionID);
+ this.executor = executorFactory.getExecutor("jbm-server-session-" + sessionID);
this.connectionEndpoint = connectionEndpoint;
Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1417/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1417/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java 2008-09-18 15:26:31 UTC (rev 4983)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1417/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java 2008-09-18 19:44:05 UTC (rev 4984)
@@ -37,6 +37,7 @@
import java.util.Properties;
import java.util.Set;
import java.util.Vector;
+import java.util.concurrent.Executors;
import javax.management.ListenerNotFoundException;
import javax.management.MBeanNotificationInfo;
@@ -75,14 +76,16 @@
import org.jboss.messaging.core.impl.tx.TransactionRepository;
import org.jboss.messaging.core.impl.tx.TxCallback;
import org.jboss.messaging.util.ClearableSemaphore;
+import org.jboss.messaging.util.CompatibleExecutor;
import org.jboss.messaging.util.ConcurrentHashSet;
-import org.jboss.messaging.util.NamedThreadQueuedExecutor;
+import org.jboss.messaging.util.ExecutorFactory;
+import org.jboss.messaging.util.JBMThreadFactory;
+import org.jboss.messaging.util.OrderedExecutorFactory;
import org.jboss.messaging.util.StreamUtils;
import org.jgroups.Address;
import org.jgroups.View;
import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
-import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
import EDU.oswego.cs.dl.util.concurrent.ReadWriteLock;
import EDU.oswego.cs.dl.util.concurrent.ReentrantWriterPreferenceReadWriteLock;
@@ -110,6 +113,8 @@
private static final long SEMAPHORE_ACQUIRE_TIMEOUT = 10000;
+ private static final ExecutorFactory executorFactory = new OrderedExecutorFactory(
+ Executors.newCachedThreadPool(new JBMThreadFactory("msg-post-office")));
//End only used in testing
// Static ---------------------------------------------------------------------------------------
@@ -217,9 +222,9 @@
private ServerPeer serverPeer;
//Note this MUST be a queued executor to ensure replicate repsonses arrive back in order
- private QueuedExecutor replyExecutor;
+ private CompatibleExecutor replyExecutor;
- private QueuedExecutor replicateResponseExecutor;
+ private CompatibleExecutor replicateResponseExecutor;
private volatile int failoverNodeID = -1;
@@ -1632,10 +1637,9 @@
leftSet = new ConcurrentHashSet();
}
- //NOTE, MUST be a QueuedExecutor so we ensure that responses arrive back in order
- replyExecutor = new NamedThreadQueuedExecutor("jbm-reply-executor");
+ replyExecutor = executorFactory.getExecutor("jbm-reply-executor");
- replicateResponseExecutor = new NamedThreadQueuedExecutor("jbm-response-executor");
+ replicateResponseExecutor = executorFactory.getExecutor("jbm-response-executor");
}
private void deInit()
Copied: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1417/src/main/org/jboss/messaging/util/CompatibleExecutor.java (from rev 4937, branches/Branch_JBossMessaging_1_4_0_SP3_CP/src/main/org/jboss/messaging/util/CompatibleExecutor.java)
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1417/src/main/org/jboss/messaging/util/CompatibleExecutor.java (rev 0)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1417/src/main/org/jboss/messaging/util/CompatibleExecutor.java 2008-09-18 19:44:05 UTC (rev 4984)
@@ -0,0 +1,39 @@
+/*
+ * JBoss, Home of Professional Open Source Copyright 2005, JBoss Inc., and
+ * individual contributors as indicated by the @authors tag. See the
+ * copyright.txt in the distribution for a full listing of individual
+ * contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it under the
+ * terms of the GNU Lesser General Public License as published by the Free
+ * Software Foundation; either version 2.1 of the License, or (at your option)
+ * any later version.
+ *
+ * This software is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+ * details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this software; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
+ * site: http://www.fsf.org.
+ */
+package org.jboss.messaging.util;
+
+import java.util.concurrent.Executor;
+
+public interface CompatibleExecutor extends Executor
+{
+
+ void clearAllExceptCurrentTask();
+
+ void clearClassLoader();
+
+ void shutdownNow();
+
+ void shutdownAfterProcessingCurrentlyQueuedTasks();
+
+ String getName();
+
+}
Copied: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1417/src/main/org/jboss/messaging/util/ExecutorFactory.java (from rev 4937, branches/Branch_JBossMessaging_1_4_0_SP3_CP/src/main/org/jboss/messaging/util/ExecutorFactory.java)
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1417/src/main/org/jboss/messaging/util/ExecutorFactory.java (rev 0)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1417/src/main/org/jboss/messaging/util/ExecutorFactory.java 2008-09-18 19:44:05 UTC (rev 4984)
@@ -0,0 +1,38 @@
+/*
+ * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat
+ * Middleware LLC, and individual contributors by the @authors tag. See the
+ * copyright.txt in the distribution for a full listing of individual
+ * contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it under the
+ * terms of the GNU Lesser General Public License as published by the Free
+ * Software Foundation; either version 2.1 of the License, or (at your option)
+ * any later version.
+ *
+ * This software is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+ * details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this software; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
+ * site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.util;
+
+import java.util.concurrent.Executor;
+
+/*
+ *
+ * A ExecutorFactory
+ *
+ * Copied from JBM2.0
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ */
+public interface ExecutorFactory
+{
+ CompatibleExecutor getExecutor(String name);
+}
Copied: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1417/src/main/org/jboss/messaging/util/JBMThreadFactory.java (from rev 4937, branches/Branch_JBossMessaging_1_4_0_SP3_CP/src/main/org/jboss/messaging/util/JBMThreadFactory.java)
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1417/src/main/org/jboss/messaging/util/JBMThreadFactory.java (rev 0)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1417/src/main/org/jboss/messaging/util/JBMThreadFactory.java 2008-09-18 19:44:05 UTC (rev 4984)
@@ -0,0 +1,57 @@
+/*
+ * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat
+ * Middleware LLC, and individual contributors by the @authors tag. See the
+ * copyright.txt in the distribution for a full listing of individual
+ * contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it under the
+ * terms of the GNU Lesser General Public License as published by the Free
+ * Software Foundation; either version 2.1 of the License, or (at your option)
+ * any later version.
+ *
+ * This software is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+ * details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this software; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
+ * site: http://www.fsf.org.
+ */
+package org.jboss.messaging.util;
+
+import java.util.concurrent.ThreadFactory;
+
+/*
+ *
+ * A JBMThreadFactory
+ *
+ * Copied from JBM2.0
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ */
+public class JBMThreadFactory implements ThreadFactory
+{
+ private ThreadGroup group;
+
+ public JBMThreadFactory(final String groupName)
+ {
+ this.group = new ThreadGroup(groupName);
+ }
+
+ public Thread newThread(final Runnable command)
+ {
+ Thread t = new Thread(group, command);
+
+ if (command instanceof CompatibleExecutor)
+ {
+ t.setName(((CompatibleExecutor)command).getName());
+ }
+
+ // Don't want to prevent VM from exiting
+ t.setDaemon(true);
+
+ return t;
+ }
+}
Copied: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1417/src/main/org/jboss/messaging/util/OrderedExecutorFactory.java (from rev 4937, branches/Branch_JBossMessaging_1_4_0_SP3_CP/src/main/org/jboss/messaging/util/OrderedExecutorFactory.java)
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1417/src/main/org/jboss/messaging/util/OrderedExecutorFactory.java (rev 0)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1417/src/main/org/jboss/messaging/util/OrderedExecutorFactory.java 2008-09-18 19:44:05 UTC (rev 4984)
@@ -0,0 +1,189 @@
+/*
+ * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat
+ * Middleware LLC, and individual contributors by the @authors tag. See the
+ * copyright.txt in the distribution for a full listing of individual
+ * contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it under the
+ * terms of the GNU Lesser General Public License as published by the Free
+ * Software Foundation; either version 2.1 of the License, or (at your option)
+ * any later version.
+ *
+ * This software is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+ * details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this software; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
+ * site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.util;
+
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.Set;
+import java.util.concurrent.Executor;
+
+/*
+ *
+ * This factory creates a hierarchy of Executor which shares the threads of the
+ * parent Executor (typically, the root parent is a Thread pool).
+ *
+ * Copied from JBM2.0
+ *
+ * @author <a href="david.lloyd at jboss.com">David Lloyd</a>
+ *
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ *
+ * @version <tt>$Revision$</tt>
+ */
+public final class OrderedExecutorFactory implements ExecutorFactory
+{
+ private final Executor parent;
+
+ private final Set<ChildExecutor> runningChildren = Collections.synchronizedSet(new HashSet<ChildExecutor>());
+
+ public OrderedExecutorFactory(final Executor parent)
+ {
+ this.parent = parent;
+ }
+
+ public CompatibleExecutor getExecutor(String name)
+ {
+ return new ChildExecutor(name);
+ }
+
+ private final class ChildExecutor implements CompatibleExecutor, Runnable
+ {
+ private final LinkedList<Runnable> tasks = new LinkedList<Runnable>();
+
+ private boolean needToSetClassLoader = true;
+
+ private ClassLoader tcl;
+
+ private boolean shutdown = false;
+
+ private String name = "default-thread";
+
+ public ChildExecutor(String n)
+ {
+ name = n;
+ }
+
+ public void execute(Runnable command)
+ {
+ if (needToSetClassLoader)
+ {
+ tcl = AccessController.doPrivileged(new PrivilegedAction<ClassLoader>()
+ {
+ public ClassLoader run()
+ {
+ return Thread.currentThread().getContextClassLoader();
+ }
+ });
+ }
+ synchronized (tasks)
+ {
+ if (!shutdown)
+ {
+ tasks.add(command);
+ if (tasks.size() == 1 && runningChildren.add(this))
+ {
+ parent.execute(this);
+ }
+ }
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ public void run()
+ {
+ if (needToSetClassLoader)
+ {
+ needToSetClassLoader = false;
+
+ AccessController.doPrivileged(new PrivilegedAction()
+ {
+ public Object run()
+ {
+ Thread.currentThread().setContextClassLoader(tcl);
+ return null;
+ }
+ });
+ }
+
+ for (;;)
+ {
+ final Runnable task;
+ synchronized (tasks)
+ {
+ task = tasks.poll();
+ if (task == null)
+ {
+ runningChildren.remove(this);
+ tasks.notify();
+ return;
+ }
+ }
+ task.run();
+ }
+ }
+
+ public void clearAllExceptCurrentTask()
+ {
+ synchronized (tasks)
+ {
+ tasks.clear();
+ }
+ }
+
+ public void clearClassLoader()
+ {
+ execute(new Runnable()
+ {
+ @SuppressWarnings("unchecked")
+ public void run()
+ {
+ needToSetClassLoader = true;
+ }
+ });
+ }
+
+ // old behavior also terminates the thread. with a pool we shouldn't do
+ // this.
+ public void shutdownNow()
+ {
+ tasks.clear();
+ }
+
+ public void shutdownAfterProcessingCurrentlyQueuedTasks()
+ {
+ synchronized (tasks)
+ {
+ shutdown = true;
+ while (tasks.size() > 0)
+ {
+ try
+ {
+ tasks.wait();
+ }
+ catch (InterruptedException e)
+ {
+ }
+ }
+ }
+ }
+
+ public String getName()
+ {
+ return name;
+ }
+
+ }
+}
Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1417/tests/build.properties
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1417/tests/build.properties 2008-09-18 15:26:31 UTC (rev 4983)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1417/tests/build.properties 2008-09-18 19:44:05 UTC (rev 4984)
@@ -2,5 +2,6 @@
# Local overrides for the builds initiated by build.sh
#
-#test.bind.address=192.168.1.2
+test.bind.address=localhost
+jgroups.bind_addr=localhost
Copied: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1417/tests/src/org/jboss/test/messaging/util/OrderedExecutorFactoryTest.java (from rev 4937, branches/Branch_JBossMessaging_1_4_0_SP3_CP/tests/src/org/jboss/test/messaging/util/OrderedExecutorFactoryTest.java)
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1417/tests/src/org/jboss/test/messaging/util/OrderedExecutorFactoryTest.java (rev 0)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1417/tests/src/org/jboss/test/messaging/util/OrderedExecutorFactoryTest.java 2008-09-18 19:44:05 UTC (rev 4984)
@@ -0,0 +1,93 @@
+/*
+ * JBoss, Home of Professional Open Source Copyright 2005, JBoss Inc., and
+ * individual contributors as indicated by the @authors tag. See the
+ * copyright.txt in the distribution for a full listing of individual
+ * contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it under the
+ * terms of the GNU Lesser General Public License as published by the Free
+ * Software Foundation; either version 2.1 of the License, or (at your option)
+ * any later version.
+ *
+ * This software is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+ * details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this software; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
+ * site: http://www.fsf.org.
+ */
+package org.jboss.test.messaging.util;
+
+import java.util.ArrayList;
+import java.util.concurrent.Executors;
+
+import org.jboss.messaging.util.CompatibleExecutor;
+import org.jboss.messaging.util.JBMThreadFactory;
+import org.jboss.messaging.util.OrderedExecutorFactory;
+import org.jboss.test.messaging.MessagingTestCase;
+
+public class OrderedExecutorFactoryTest extends MessagingTestCase
+{
+
+ private ArrayList<CounterRunnable> exeQueue = new ArrayList<CounterRunnable>();
+
+ public OrderedExecutorFactoryTest(String name)
+ {
+ super(name);
+ }
+
+ public void testExecutionOrder()
+ {
+ OrderedExecutorFactory factory = new OrderedExecutorFactory(Executors.newCachedThreadPool(new JBMThreadFactory("test-thread-factory")));
+ CompatibleExecutor executor = factory.getExecutor("test executor");
+
+ final int numTasks = 200000;
+
+ CounterRunnable[] tasks = new CounterRunnable[numTasks];
+
+ for (int i = 0; i < numTasks; ++i)
+ {
+ tasks[i] = new CounterRunnable(this);
+ }
+ exeQueue.clear();
+ for (int i = 0; i < numTasks; ++i)
+ {
+ executor.execute(tasks[i]);
+ }
+
+ executor.shutdownAfterProcessingCurrentlyQueuedTasks();
+ assertTrue(exeQueue.size() == numTasks);
+
+ for (int i = 0; i < numTasks; ++i)
+ {
+ CounterRunnable finTask = exeQueue.get(i);
+ assertTrue(finTask == tasks[i]);
+ }
+
+ exeQueue.clear();
+ }
+
+ class CounterRunnable implements Runnable
+ {
+ private OrderedExecutorFactoryTest myUser;
+
+ public CounterRunnable(OrderedExecutorFactoryTest user)
+ {
+ myUser = user;
+ }
+
+ public void run()
+ {
+ myUser.registerOrder(this);
+ }
+
+ }
+
+ public void registerOrder(CounterRunnable task)
+ {
+ exeQueue.add(task);
+ }
+}
More information about the jboss-cvs-commits
mailing list