[jboss-cvs] jboss-jms/src/main/org/jboss/jms/server/plugin ...
Timothy Fox
tim.fox at jboss.com
Mon Jul 17 13:14:46 EDT 2006
User: timfox
Date: 06/07/17 13:14:46
Modified: src/main/org/jboss/jms/server/plugin JDBCChannelMapper.java
Log:
Many changes, including implementation of prefetch, SEDAisation of the server, and changes to recovery
Revision Changes Path
1.17 +24 -3 jboss-jms/src/main/org/jboss/jms/server/plugin/JDBCChannelMapper.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: JDBCChannelMapper.java
===================================================================
RCS file: /cvsroot/jboss/jboss-jms/src/main/org/jboss/jms/server/plugin/JDBCChannelMapper.java,v
retrieving revision 1.16
retrieving revision 1.17
diff -u -b -r1.16 -r1.17
--- JDBCChannelMapper.java 27 Jun 2006 19:44:39 -0000 1.16
+++ JDBCChannelMapper.java 17 Jul 2006 17:14:46 -0000 1.17
@@ -36,6 +36,7 @@
import org.jboss.jms.destination.JBossQueue;
import org.jboss.jms.destination.JBossTopic;
import org.jboss.jms.selector.Selector;
+import org.jboss.jms.server.QueuedExecutorPool;
import org.jboss.jms.server.plugin.contract.ChannelMapper;
import org.jboss.jms.server.subscription.DurableSubscription;
import org.jboss.jms.server.subscription.Subscription;
@@ -54,6 +55,7 @@
import org.jboss.tm.TransactionManagerServiceMBean;
import EDU.oswego.cs.dl.util.concurrent.ConcurrentReaderHashMap;
+import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
/**
* JDBC Implementation of ChannelMapper
@@ -143,6 +145,8 @@
protected Properties sqlProperties;
protected List populateTables;
+ protected QueuedExecutorPool queuedExecutorPool;
+
// Constructors --------------------------------------------------
public JDBCChannelMapper()
@@ -183,6 +187,11 @@
this.channelIDManager = new IdManager("CHANNEL_ID", 10, pm);
}
+ public void setQueuedExecutorPool(QueuedExecutorPool pool) throws Exception
+ {
+ this.queuedExecutorPool = pool;
+ }
+
// ServiceMBeanSupport overrides ---------------------------------
protected void startService() throws Exception
@@ -270,7 +279,10 @@
// TODO I am using LocalQueues for the time being, switch to distributed Queues
if (isQueue)
{
- cd = new Queue(id, ms, pm, mm, true, fullSize, pageSize, downCacheSize);
+ //We allocate an executor for the queue from the rotating pool
+ QueuedExecutor executor = (QueuedExecutor)queuedExecutorPool.get(destName);
+
+ cd = new Queue(id, ms, pm, mm, true, fullSize, pageSize, downCacheSize, executor);
try
{
@@ -555,9 +567,13 @@
throw new javax.jms.IllegalStateException("Topic " + topicName + " is not loaded");
}
+ //We allocate an executor for the subscription from the rotating pool
+ //Currently all subscriptions for the same topic share the same executor
+ QueuedExecutor executor = (QueuedExecutor)queuedExecutorPool.get(topicName);
+
return new Subscription(id, topic, ms, pm, mm,
topic.getFullSize(), topic.getPageSize(),
- topic.getDownCacheSize(), sel, noLocal);
+ topic.getDownCacheSize(), executor, sel, noLocal);
}
catch (Exception e)
{
@@ -1109,9 +1125,14 @@
throw new javax.jms.IllegalStateException("Topic " + topicName + " is not loaded");
}
+ //We allocate an executor for the subscription from the rotating pool
+ //Currently all subscriptions for the same topic share the same executor
+ QueuedExecutor executor = (QueuedExecutor)queuedExecutorPool.get(topicName);
+
DurableSubscription subscription =
new DurableSubscription(id, topic, ms, pm, mm,
- topic.getFullSize(), topic.getPageSize(), topic.getDownCacheSize(), selector,
+ topic.getFullSize(), topic.getPageSize(), topic.getDownCacheSize(),
+ executor, selector,
noLocal, subscriptionName, clientID);
subs.put(subscriptionName, subscription);
More information about the jboss-cvs-commits mailing list