[jboss-cvs] JBoss Messaging SVN: r4478 - in trunk/src/main/org/jboss/messaging/core/server: impl and 1 other directory.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon Jun 16 07:56:32 EDT 2008


Author: timfox
Date: 2008-06-16 07:56:32 -0400 (Mon, 16 Jun 2008)
New Revision: 4478

Modified:
   trunk/src/main/org/jboss/messaging/core/server/MessagingServer.java
   trunk/src/main/org/jboss/messaging/core/server/ServerConnection.java
   trunk/src/main/org/jboss/messaging/core/server/ServerSession.java
   trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerBrowserImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerConnectionImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerProducerImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
Log:
Small refactoring on server classes before writing server tests


Modified: trunk/src/main/org/jboss/messaging/core/server/MessagingServer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/MessagingServer.java	2008-06-16 10:56:54 UTC (rev 4477)
+++ trunk/src/main/org/jboss/messaging/core/server/MessagingServer.java	2008-06-16 11:56:32 UTC (rev 4478)
@@ -35,6 +35,7 @@
 import org.jboss.messaging.core.security.SecurityStore;
 import org.jboss.messaging.core.settings.HierarchicalRepository;
 import org.jboss.messaging.core.settings.impl.QueueSettings;
+import org.jboss.messaging.core.transaction.ResourceManager;
 import org.jboss.messaging.core.version.Version;
 import org.jboss.messaging.util.OrderedExecutorFactory;
 
@@ -99,4 +100,6 @@
    DeploymentManager getDeploymentManager();
    
    OrderedExecutorFactory getOrderedExecutorFactory();
+   
+   ResourceManager getResourceManager();
 }

Modified: trunk/src/main/org/jboss/messaging/core/server/ServerConnection.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/ServerConnection.java	2008-06-16 10:56:54 UTC (rev 4477)
+++ trunk/src/main/org/jboss/messaging/core/server/ServerConnection.java	2008-06-16 11:56:32 UTC (rev 4478)
@@ -40,6 +40,8 @@
 {
 	long getID();
 	
+	MessagingServer getServer();
+	
 	ConnectionCreateSessionResponseMessage createSession(boolean xa, boolean autoCommitSends, boolean autoCommitAcks,
                                                         PacketReturner sender) throws Exception;
 	
@@ -49,8 +51,6 @@
 	
 	void close() throws Exception;
 	
-	SecurityStore getSecurityStore();
-	
 	String getUsername();
 	
 	String getPassword();

Modified: trunk/src/main/org/jboss/messaging/core/server/ServerSession.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/ServerSession.java	2008-06-16 10:56:54 UTC (rev 4477)
+++ trunk/src/main/org/jboss/messaging/core/server/ServerSession.java	2008-06-16 11:56:32 UTC (rev 4478)
@@ -47,6 +47,8 @@
 {
 	long getID();
 	
+	ServerConnection getConnection();
+	
 	void removeBrowser(ServerBrowserImpl browser) throws Exception;
 	
 	void removeConsumer(ServerConsumer consumer) throws Exception;

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2008-06-16 10:56:54 UTC (rev 4477)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2008-06-16 11:56:32 UTC (rev 4478)
@@ -21,6 +21,13 @@
   */
 package org.jboss.messaging.core.server.impl;
 
+import java.util.HashSet;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+
 import org.jboss.messaging.core.config.Configuration;
 import org.jboss.messaging.core.config.impl.ConfigurationImpl;
 import org.jboss.messaging.core.deployers.Deployer;
@@ -60,10 +67,7 @@
 import org.jboss.messaging.util.OrderedExecutorFactory;
 import org.jboss.messaging.util.VersionLoader;
 
-import java.util.HashSet;
-import java.util.concurrent.*;
 
-
 /**
  * A Messaging Server
  *
@@ -346,14 +350,9 @@
 
       securityStore.authenticate(username, password);
 
-      long id = remotingService.getDispatcher().generateID();
-
       final ServerConnection connection =
-              new ServerConnectionImpl(id, username, password,
-                      sender.getSessionID(), clientAddress,
-                      remotingService.getDispatcher(), resourceManager, storageManager,
-                      queueSettingsRepository,
-                      postOffice, securityStore, connectionManager, orderedExecutorFactory);
+              new ServerConnectionImpl(this, username, password,
+                      sender.getSessionID(), clientAddress);
 
       remotingService.getDispatcher().register(new ServerConnectionPacketHandler(connection));
 
@@ -365,6 +364,11 @@
       return orderedExecutorFactory;
    }
 
+   public ResourceManager getResourceManager()
+   {
+      return resourceManager;
+   }
+   
    // Public ---------------------------------------------------------------------------------------
 
    // Package protected ----------------------------------------------------------------------------

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerBrowserImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerBrowserImpl.java	2008-06-16 10:56:54 UTC (rev 4477)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerBrowserImpl.java	2008-06-16 11:56:32 UTC (rev 4478)
@@ -65,8 +65,6 @@
 
    // Static ---------------------------------------------------------------------------------------
 
-   private static boolean trace = log.isTraceEnabled();
-
    // Attributes -----------------------------------------------------------------------------------
 
    private final long id;
@@ -77,12 +75,13 @@
 
    // Constructors ---------------------------------------------------------------------------------
 
-   ServerBrowserImpl(final long id, final ServerSession session,
-                     final Queue destination, final String messageFilter) throws Exception
+   public ServerBrowserImpl(final ServerSession session,
+                            final Queue destination, final String messageFilter) throws Exception
    {     
       this.session = session;
-      this.id = id;
       
+      this.id = session.getConnection().getServer().getRemotingService().getDispatcher().generateID();
+      
       this.destination = destination;
 
 		if (messageFilter != null)

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerConnectionImpl.java	2008-06-16 10:56:54 UTC (rev 4477)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerConnectionImpl.java	2008-06-16 11:56:32 UTC (rev 4478)
@@ -26,22 +26,17 @@
 import java.util.Set;
 
 import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.persistence.StorageManager;
 import org.jboss.messaging.core.postoffice.Binding;
 import org.jboss.messaging.core.postoffice.PostOffice;
 import org.jboss.messaging.core.remoting.PacketDispatcher;
 import org.jboss.messaging.core.remoting.PacketReturner;
 import org.jboss.messaging.core.remoting.impl.wireformat.ConnectionCreateSessionResponseMessage;
-import org.jboss.messaging.core.security.SecurityStore;
 import org.jboss.messaging.core.server.ConnectionManager;
+import org.jboss.messaging.core.server.MessagingServer;
 import org.jboss.messaging.core.server.Queue;
 import org.jboss.messaging.core.server.ServerConnection;
 import org.jboss.messaging.core.server.ServerSession;
-import org.jboss.messaging.core.settings.HierarchicalRepository;
-import org.jboss.messaging.core.settings.impl.QueueSettings;
-import org.jboss.messaging.core.transaction.ResourceManager;
 import org.jboss.messaging.util.ConcurrentHashSet;
-import org.jboss.messaging.util.OrderedExecutorFactory;
 import org.jboss.messaging.util.SimpleString;
 
 /**
@@ -63,8 +58,6 @@
 
    // Static ---------------------------------------------------------------------------------------
 
-   private static boolean trace = log.isTraceEnabled();
-
    // Attributes -----------------------------------------------------------------------------------
 
    private final long id;
@@ -76,23 +69,7 @@
    private final long remotingClientSessionID;
    
    private final String clientAddress;
-      
-   private final PacketDispatcher dispatcher;
-   
-   private final ResourceManager resourceManager;
-   
-   private final StorageManager persistenceManager;  
-   
-   private final HierarchicalRepository<QueueSettings> queueSettingsRepository;
-      
-   private final PostOffice postOffice;
-   
-   private final SecurityStore securityStore;
-   
-   private final ConnectionManager connectionManager;
-   
-   private final OrderedExecutorFactory orderedExecutorFactory;
-
+         
    private final long createdTime;
          
    private final Set<ServerSession> sessions = new ConcurrentHashSet<ServerSession>();
@@ -100,24 +77,27 @@
    private final Set<Queue> temporaryQueues = new ConcurrentHashSet<Queue>();
    
    private final Set<SimpleString> temporaryDestinations = new ConcurrentHashSet<SimpleString>();
+   
+   private final MessagingServer server;
       
    private volatile boolean started;
+   
+   //We cache some of the service locally
+   
+   private final PostOffice postOffice;
 
+   private final ConnectionManager connectionManager;
 
+   private final PacketDispatcher dispatcher;
+
    // Constructors ---------------------------------------------------------------------------------
       
-   public ServerConnectionImpl(final long id, final String username, final String password,
+   public ServerConnectionImpl(final MessagingServer server,
+                               final String username, final String password,
    		                      final long remotingClientSessionID,
-   		                      final String clientAddress,
-   		                      final PacketDispatcher dispatcher,
-   		                      final ResourceManager resourceManager,
-   		                      final StorageManager persistenceManager,
-   		                      final HierarchicalRepository<QueueSettings> queueSettingsRepository,
-   		                      final PostOffice postOffice, final SecurityStore securityStore,
-   		                      final ConnectionManager connectionManager,
-   		                      final OrderedExecutorFactory orderedExecutorFactory)
+   		                      final String clientAddress)
    {
-   	this.id = id;
+   	this.id = server.getRemotingService().getDispatcher().generateID();
       
    	this.username = username;
       
@@ -127,27 +107,19 @@
 
       this.clientAddress = clientAddress;
 
-      this.dispatcher = dispatcher;
+      started = false;
       
-      this.resourceManager = resourceManager;
+      createdTime = System.currentTimeMillis();
+
+      server.getConnectionManager().registerConnection(remotingClientSessionID, this);
       
-      this.persistenceManager = persistenceManager;
+      this.server = server;
       
-      this.queueSettingsRepository = queueSettingsRepository;      
+      this.dispatcher = server.getRemotingService().getDispatcher();
       
-      this.postOffice = postOffice;
+      this.postOffice = server.getPostOffice();
       
-      this.securityStore = securityStore;
-      
-      this.connectionManager = connectionManager;
-      
-      this.orderedExecutorFactory = orderedExecutorFactory;
-      
-      started = false;
-      
-      createdTime = System.currentTimeMillis();
-
-      connectionManager.registerConnection(remotingClientSessionID, this);
+      this.connectionManager = server.getConnectionManager();
    }
 
    // ServerConnection implementation ------------------------------------------------------------
@@ -157,15 +129,17 @@
    	return id;
    }
    
+   public MessagingServer getServer()
+   {
+      return server;
+   }
+   
    public ConnectionCreateSessionResponseMessage createSession(final boolean xa, final boolean autoCommitSends,
    		                                                      final boolean autoCommitAcks,
                                                                final PacketReturner sender) throws Exception
-   {           
-      long id = dispatcher.generateID();
+   {            
       ServerSession session =
-         new ServerSessionImpl(id, autoCommitSends, autoCommitAcks, xa, this, resourceManager,
-         		sender, dispatcher, persistenceManager, queueSettingsRepository, postOffice, securityStore,
-         		orderedExecutorFactory.getOrderedExecutor());
+         new ServerSessionImpl(this, autoCommitSends, autoCommitAcks, xa, sender);
 
       sessions.add(session);
       
@@ -225,11 +199,6 @@
       dispatcher.unregister(id);
    }
    
-   public SecurityStore getSecurityStore()
-   {
-      return securityStore;
-   }
-   
    public String getUsername()
    {
       return username;

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2008-06-16 10:56:54 UTC (rev 4477)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2008-06-16 11:56:32 UTC (rev 4478)
@@ -29,6 +29,7 @@
 import org.jboss.messaging.core.postoffice.PostOffice;
 import org.jboss.messaging.core.server.HandleStatus;
 import org.jboss.messaging.core.server.MessageReference;
+import org.jboss.messaging.core.server.MessagingServer;
 import org.jboss.messaging.core.server.Queue;
 import org.jboss.messaging.core.server.ServerConsumer;
 import org.jboss.messaging.core.server.ServerMessage;
@@ -37,8 +38,6 @@
 import org.jboss.messaging.core.settings.impl.QueueSettings;
 import org.jboss.messaging.core.transaction.Transaction;
 import org.jboss.messaging.core.transaction.impl.TransactionImpl;
-import org.jboss.messaging.util.TokenBucketLimiter;
-import org.jboss.messaging.util.TokenBucketLimiterImpl;
 
 /**
  * Concrete implementation of a ClientConsumer. 
@@ -77,17 +76,9 @@
    
    private final boolean autoDeleteQueue;
    
-   private final TokenBucketLimiter limiter;
-   
    private final long connectionID;   
    
-   private final ServerSession sessionEndpoint;
-
-   private final StorageManager persistenceManager;
-   
-   private final HierarchicalRepository<QueueSettings> queueSettingsRepository;
-   
-   private final PostOffice postOffice;
+   private final ServerSession session;
          
    private final Object startStopLock = new Object();
 
@@ -95,18 +86,21 @@
    
    private boolean started;
    
+   //We cache some of the service locally
+   private final StorageManager storageManager;
+   
+   private final HierarchicalRepository<QueueSettings> queueSettingsRepository;
+   
+   private final PostOffice postOffice;
+   
    // Constructors ---------------------------------------------------------------------------------
  
-   ServerConsumerImpl(final long id, final long clientTargetID, final Queue messageQueue, final boolean noLocal, final Filter filter,
+   ServerConsumerImpl(final ServerSession session, final long clientTargetID,
+                      final Queue messageQueue, final boolean noLocal, final Filter filter,
    		             final boolean autoDeleteQueue, final boolean enableFlowControl, final int maxRate,
-   		             final long connectionID, final ServerSession sessionEndpoint,
-					       final StorageManager persistenceManager,
-					       final HierarchicalRepository<QueueSettings> queueSettingsRepository,
-					       final PostOffice postOffice,
+   		             final long connectionID, 
 					       final boolean started)
    {
-   	this.id = id;
-   	
    	this.clientTargetID = clientTargetID;
       
       this.messageQueue = messageQueue;
@@ -117,25 +111,12 @@
       
       this.autoDeleteQueue = autoDeleteQueue;
       
-      if (maxRate != -1)
-      {
-      	limiter = new TokenBucketLimiterImpl(maxRate, false);
-      }
-      else
-      {
-      	limiter = null;
-      }
-
       this.connectionID = connectionID;
 
-      this.sessionEndpoint = sessionEndpoint;
-
-      this.persistenceManager = persistenceManager;
+      this.session = session;
       
-      this.queueSettingsRepository = queueSettingsRepository;
-      
-      this.postOffice = postOffice;
-      
+      MessagingServer server = session.getConnection().getServer();
+
       this.started = started;
       
       if (enableFlowControl)
@@ -147,6 +128,14 @@
       	availableCredits = null;
       }
       
+      this.storageManager = server.getStorageManager();
+      
+      this.queueSettingsRepository = server.getQueueSettingsRepository();
+      
+      this.postOffice = server.getPostOffice();
+      
+      this.id = server.getRemotingService().getDispatcher().generateID();
+            
       messageQueue.addConsumer(this);
    }
 
@@ -171,7 +160,7 @@
       
       if (ref.getMessage().isExpired())
       {         
-         ref.expire(persistenceManager, postOffice, queueSettingsRepository);
+         ref.expire(storageManager, postOffice, queueSettingsRepository);
          
          return HandleStatus.HANDLED;
       }
@@ -198,7 +187,7 @@
 
             if (connectionID == conId)
             {	            	
-            	Transaction tx = new TransactionImpl(persistenceManager, postOffice);
+            	Transaction tx = new TransactionImpl(storageManager, postOffice);
             	
             	tx.addAcknowledgement(ref);
             	
@@ -215,7 +204,7 @@
                    
          try
          {            
-         	sessionEndpoint.handleDelivery(ref, this);
+         	session.handleDelivery(ref, this);
          }
          catch (Exception e)
          {
@@ -247,12 +236,12 @@
             
             if (messageQueue.isDurable())
             {
-               messageQueue.deleteAllReferences(persistenceManager);
+               messageQueue.deleteAllReferences(storageManager);
             }
          }
       }
       
-      sessionEndpoint.removeConsumer(this);           
+      session.removeConsumer(this);           
    }
    
    public void setStarted(final boolean started)
@@ -302,6 +291,6 @@
 
    private void promptDelivery()
    {
-      sessionEndpoint.promptDelivery(messageQueue);
+      session.promptDelivery(messageQueue);
    } 
 }

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerProducerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerProducerImpl.java	2008-06-16 10:56:54 UTC (rev 4477)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerProducerImpl.java	2008-06-16 11:56:32 UTC (rev 4478)
@@ -64,13 +64,13 @@
      	
 	// Constructors ----------------------------------------------------------------
 	
-	public ServerProducerImpl(final long id, final long clientTargetID, final ServerSession session,
+	public ServerProducerImpl(final ServerSession session, final long clientTargetID,
 	                          final SimpleString address, 
 			                    final PacketReturner sender,
 			                    final FlowController flowController,
 			                    final int windowSize) throws Exception
 	{
-		this.id = id;
+		this.id = session.getConnection().getServer().getRemotingService().getDispatcher().generateID();
 		
 		this.clientTargetID = clientTargetID;
       
@@ -97,7 +97,6 @@
 		session.removeProducer(this);
 	}
 	
-
 	public void send(final ServerMessage message) throws Exception
 	{		
 		if (this.address != null)

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2008-06-16 10:56:54 UTC (rev 4477)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2008-06-16 11:56:32 UTC (rev 4478)
@@ -56,6 +56,7 @@
 import org.jboss.messaging.core.security.SecurityStore;
 import org.jboss.messaging.core.server.Delivery;
 import org.jboss.messaging.core.server.MessageReference;
+import org.jboss.messaging.core.server.MessagingServer;
 import org.jboss.messaging.core.server.Queue;
 import org.jboss.messaging.core.server.ServerConnection;
 import org.jboss.messaging.core.server.ServerConsumer;
@@ -105,21 +106,9 @@
    private final boolean autoCommitAcks;
 
    private final ServerConnection connection;
-
-   private final ResourceManager resourceManager;
-
+  
    private final PacketReturner sender;
 
-   private final PacketDispatcher dispatcher;
-
-   private final StorageManager persistenceManager;
-
-   private final HierarchicalRepository<QueueSettings> queueSettingsRepository;
-
-   private final PostOffice postOffice;
-
-   private final SecurityStore securityStore;
-
    private final Set<ServerConsumer> consumers = new ConcurrentHashSet<ServerConsumer>();
 
    private final Set<ServerBrowserImpl> browsers = new ConcurrentHashSet<ServerBrowserImpl>();
@@ -133,51 +122,51 @@
    private final Executor executor;
 
    private Transaction tx;
+   
+   //We cache some of the services locally
+   
+   private final StorageManager storageManager;
 
+   private final HierarchicalRepository<QueueSettings> queueSettingsRepository;
+
+   private final ResourceManager resourceManager;
+   
+   private final PostOffice postOffice;
+
+   private final SecurityStore securityStore;
+   
+   private final PacketDispatcher dispatcher;
+   
    // Constructors
    // ---------------------------------------------------------------------------------
 
-   public ServerSessionImpl(final long id, final boolean autoCommitSends,
+   public ServerSessionImpl(final ServerConnection connection, final boolean autoCommitSends,
                             final boolean autoCommitAcks,
-                            final boolean xa, final ServerConnection connection,
-                            final ResourceManager resourceManager, final PacketReturner sender,
-                            final PacketDispatcher dispatcher, final StorageManager persistenceManager,
-                            final HierarchicalRepository<QueueSettings> queueSettingsRepository,
-                            final PostOffice postOffice, final SecurityStore securityStore,
-                            final Executor executor) throws Exception
+                            final boolean xa, 
+                            final PacketReturner sender) throws Exception
    {
-      this.id = id;
-
       this.autoCommitSends = autoCommitSends;
 
       this.autoCommitAcks = autoCommitAcks;
-
-      if (!xa)
-      {
-         tx = new TransactionImpl(persistenceManager, postOffice);
-      }
-
+      
       this.connection = connection;
-
-      this.resourceManager = resourceManager;
-
+      
       this.sender = sender;
-
-      this.dispatcher = dispatcher;
-
-      this.persistenceManager = persistenceManager;
-
-      this.queueSettingsRepository = queueSettingsRepository;
-
-      this.postOffice = postOffice;
-
-      this.securityStore = securityStore;
       
-      this.executor = executor;
+      MessagingServer server = connection.getServer();
+            
+      this.storageManager = server.getStorageManager();
+      this.postOffice = server.getPostOffice();
+      this.queueSettingsRepository = server.getQueueSettingsRepository();
+      this.resourceManager = server.getResourceManager();
+      this.securityStore = server.getSecurityStore();
+      this.dispatcher = server.getRemotingService().getDispatcher();
+      this.id = dispatcher.generateID();      
+      this.executor = server.getOrderedExecutorFactory().getOrderedExecutor();
 
-      if (log.isTraceEnabled())
+      if (!xa)
       {
-         log.trace("created server session endpoint for " + sender.getRemoteAddress());
+         tx = new TransactionImpl(storageManager, postOffice);
       }
    }
 
@@ -300,6 +289,7 @@
          if (!autoCommitSends)
          {
             MessagingException messagingException;
+            
             if (e instanceof MessagingException)
             {
                messagingException = (MessagingException) e;
@@ -308,12 +298,13 @@
             {
                messagingException = new MessagingException(MessagingException.INTERNAL_ERROR, e.getMessage());
             }
+            
             tx.markAsRollbackOnly(messagingException);
          }
          throw e;         
       }
 
-      msg.setMessageID(persistenceManager.generateMessageID());
+      msg.setMessageID(storageManager.generateMessageID());
       
       // This allows the no-local consumers to filter out the messages that come
       // from the same connection.
@@ -326,7 +317,7 @@
 
          if (msg.getDurableRefCount() != 0)
          {
-            persistenceManager.storeMessage(msg);
+            storageManager.storeMessage(msg);
          }
          
          for (MessageReference ref : refs)
@@ -433,7 +424,7 @@
       {
          // Might be null if XA
 
-         tx = new TransactionImpl(persistenceManager, postOffice);
+         tx = new TransactionImpl(storageManager, postOffice);
       }
       
       //We need to lock all the queues while we're rolling back, to prevent any deliveries occurring during this
@@ -475,7 +466,7 @@
          }
       }
       
-      tx = new TransactionImpl(persistenceManager, postOffice);
+      tx = new TransactionImpl(storageManager, postOffice);
    }
 
    public void cancel(final long deliveryID, final boolean expired) throws Exception
@@ -498,7 +489,7 @@
          
          try
          {
-            Transaction cancelTx = new TransactionImpl(persistenceManager, postOffice);
+            Transaction cancelTx = new TransactionImpl(storageManager, postOffice);
    
             for (Delivery del : deliveries)
             {
@@ -534,7 +525,7 @@
 
             if (delivery.getDeliveryID() == deliveryID)
             {
-               delivery.getReference().expire(persistenceManager, postOffice, queueSettingsRepository);
+               delivery.getReference().expire(storageManager, postOffice, queueSettingsRepository);
 
                iter.remove();
 
@@ -556,7 +547,7 @@
       }
       finally
       {
-         tx = new TransactionImpl(persistenceManager, postOffice);
+         tx = new TransactionImpl(storageManager, postOffice);
       }
 
 
@@ -566,7 +557,7 @@
    {
       if (tx != null)
       {
-         final String msg = "Cannot commit, session is currently doing work in a transaction "
+         final String msg = "Cannot commit, session is currently doing work in transaction "
                  + tx.getXid();
 
          return new SessionXAResponseMessage(true, XAException.XAER_PROTO, msg);
@@ -806,7 +797,7 @@
          return new SessionXAResponseMessage(true, XAException.XAER_PROTO, msg);
       }
 
-      tx = new TransactionImpl(xid, persistenceManager, postOffice);
+      tx = new TransactionImpl(xid, storageManager, postOffice);
 
       boolean added = resourceManager.putTransaction(xid, tx);
 
@@ -958,7 +949,7 @@
 
       if (queue.isDurable())
       {
-         binding.getQueue().deleteAllReferences(persistenceManager);
+         binding.getQueue().deleteAllReferences(storageManager);
       }
 
       if (queue.isTemporary())
@@ -997,11 +988,10 @@
 
       maxRate = queueMaxRate != null ? queueMaxRate : maxRate;
 
-      long id = dispatcher.generateID();
-
       ServerConsumer consumer =
-              new ServerConsumerImpl(id, clientTargetID, binding.getQueue(), noLocal, filter, autoDeleteQueue, windowSize != -1, maxRate, connection.getID(),
-                      this, persistenceManager, queueSettingsRepository, postOffice, connection.isStarted());
+              new ServerConsumerImpl(this, clientTargetID, binding.getQueue(), noLocal, filter,
+                                     autoDeleteQueue, windowSize != -1, maxRate, connection.getID(),
+                                     connection.isStarted());
 
       dispatcher.register(new ServerConsumerPacketHandler(consumer));
 
@@ -1080,10 +1070,8 @@
 
       securityStore.check(binding.getAddress(), CheckType.READ, connection);
 
-      long id = dispatcher.generateID();
+      ServerBrowserImpl browser = new ServerBrowserImpl(this, binding.getQueue(), filterString == null ? null : filterString.toString());
 
-      ServerBrowserImpl browser = new ServerBrowserImpl(id, this, binding.getQueue(), filterString == null ? null : filterString.toString());
-
       browsers.add(browser);
 
       dispatcher.register(browser.newHandler());
@@ -1112,8 +1100,6 @@
    		flowController = windowSize == -1 ? null : postOffice.getFlowController(address);
    	}
 
-      long id = dispatcher.generateID();
-      
       final int windowToUse = flowController == null ? -1 : windowSize;
       
       //Server window size is 0.75 client window size for producer flow control (other way round to consumer flow control)
@@ -1121,7 +1107,7 @@
       final int serverWindowSize = windowToUse == -1 ? -1 : (int)(windowToUse * 0.75);            
       
       ServerProducerImpl producer 
-         = new ServerProducerImpl(id, clientTargetID, this, address, sender, flowController, serverWindowSize);
+         = new ServerProducerImpl(this, clientTargetID, address, sender, flowController, serverWindowSize);
 
       producers.add(producer);
 
@@ -1155,11 +1141,11 @@
 
          if (count == 0)
          {
-            persistenceManager.storeDelete(message.getMessageID());
+            storageManager.storeDelete(message.getMessageID());
          }
          else
          {
-            persistenceManager.storeAcknowledge(queue.getPersistenceID(), message.getMessageID());
+            storageManager.storeAcknowledge(queue.getPersistenceID(), message.getMessageID());
          }
       }
 




More information about the jboss-cvs-commits mailing list