[jboss-cvs] jboss-jms/src/main/org/jboss/jms/server/plugin ...

Timothy Fox tim.fox at jboss.com
Mon Jul 17 13:14:46 EDT 2006


  User: timfox  
  Date: 06/07/17 13:14:46

  Modified:    src/main/org/jboss/jms/server/plugin  JDBCChannelMapper.java
  Log:
  Many changes including implementation of prefetch, SEDAisation of server, changing of recovery
  
  Revision  Changes    Path
  1.17      +24 -3     jboss-jms/src/main/org/jboss/jms/server/plugin/JDBCChannelMapper.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: JDBCChannelMapper.java
  ===================================================================
  RCS file: /cvsroot/jboss/jboss-jms/src/main/org/jboss/jms/server/plugin/JDBCChannelMapper.java,v
  retrieving revision 1.16
  retrieving revision 1.17
  diff -u -b -r1.16 -r1.17
  --- JDBCChannelMapper.java	27 Jun 2006 19:44:39 -0000	1.16
  +++ JDBCChannelMapper.java	17 Jul 2006 17:14:46 -0000	1.17
  @@ -36,6 +36,7 @@
   import org.jboss.jms.destination.JBossQueue;
   import org.jboss.jms.destination.JBossTopic;
   import org.jboss.jms.selector.Selector;
  +import org.jboss.jms.server.QueuedExecutorPool;
   import org.jboss.jms.server.plugin.contract.ChannelMapper;
   import org.jboss.jms.server.subscription.DurableSubscription;
   import org.jboss.jms.server.subscription.Subscription;
  @@ -54,6 +55,7 @@
   import org.jboss.tm.TransactionManagerServiceMBean;
   
   import EDU.oswego.cs.dl.util.concurrent.ConcurrentReaderHashMap;
  +import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
   
   /**
    * JDBC Implementation of ChannelMapper
  @@ -143,6 +145,8 @@
      protected Properties sqlProperties;
      protected List populateTables;
   
  +   protected QueuedExecutorPool queuedExecutorPool;
  +
      // Constructors --------------------------------------------------
      
      public JDBCChannelMapper()
  @@ -183,6 +187,11 @@
         this.channelIDManager = new IdManager("CHANNEL_ID", 10, pm);
      }
      
  +   public void setQueuedExecutorPool(QueuedExecutorPool pool) throws Exception
  +   {
  +      this.queuedExecutorPool = pool;
  +   }
  +   
      // ServiceMBeanSupport overrides ---------------------------------
      
      protected void startService() throws Exception
  @@ -270,7 +279,10 @@
            // TODO I am using LocalQueues for the time being, switch to distributed Queues
            if (isQueue)
            {
  -            cd = new Queue(id, ms, pm, mm, true, fullSize, pageSize, downCacheSize);
  +            //We allocate an executor for the queue from the rotating pool
  +            QueuedExecutor executor = (QueuedExecutor)queuedExecutorPool.get(destName);
  +            
  +            cd = new Queue(id, ms, pm, mm, true, fullSize, pageSize, downCacheSize, executor);
               
               try
               {
  @@ -555,9 +567,13 @@
               throw new javax.jms.IllegalStateException("Topic " + topicName + " is not loaded");
            }
   
  +         //We allocate an executor for the subscription from the rotating pool
  +         //Currently all subscriptions for the same topic share the same executor
  +         QueuedExecutor executor = (QueuedExecutor)queuedExecutorPool.get(topicName);
  +                  
            return new Subscription(id, topic, ms, pm, mm,
                                    topic.getFullSize(), topic.getPageSize(),
  -                                 topic.getDownCacheSize(), sel, noLocal);
  +                                 topic.getDownCacheSize(), executor, sel, noLocal);
         }
         catch (Exception e)
         {
  @@ -1109,9 +1125,14 @@
            throw new javax.jms.IllegalStateException("Topic " + topicName + " is not loaded");
         }
          
  +      //We allocate an executor for the subscription from the rotating pool
  +      //Currently all subscriptions for the same topic share the same executor
  +      QueuedExecutor executor = (QueuedExecutor)queuedExecutorPool.get(topicName);
  +             
         DurableSubscription subscription =
            new DurableSubscription(id, topic, ms, pm, mm,
  -               topic.getFullSize(), topic.getPageSize(), topic.getDownCacheSize(), selector,
  +               topic.getFullSize(), topic.getPageSize(), topic.getDownCacheSize(),
  +               executor, selector,
                  noLocal, subscriptionName, clientID);
         
         subs.put(subscriptionName, subscription);
  
  
  



More information about the jboss-cvs-commits mailing list