[jboss-cvs] JBoss Messaging SVN: r4478 - in trunk/src/main/org/jboss/messaging/core/server: impl and 1 other directory.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Jun 16 07:56:32 EDT 2008
Author: timfox
Date: 2008-06-16 07:56:32 -0400 (Mon, 16 Jun 2008)
New Revision: 4478
Modified:
trunk/src/main/org/jboss/messaging/core/server/MessagingServer.java
trunk/src/main/org/jboss/messaging/core/server/ServerConnection.java
trunk/src/main/org/jboss/messaging/core/server/ServerSession.java
trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerBrowserImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerConnectionImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerProducerImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
Log:
Small refactoring on server classes before writing server tests
Modified: trunk/src/main/org/jboss/messaging/core/server/MessagingServer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/MessagingServer.java 2008-06-16 10:56:54 UTC (rev 4477)
+++ trunk/src/main/org/jboss/messaging/core/server/MessagingServer.java 2008-06-16 11:56:32 UTC (rev 4478)
@@ -35,6 +35,7 @@
import org.jboss.messaging.core.security.SecurityStore;
import org.jboss.messaging.core.settings.HierarchicalRepository;
import org.jboss.messaging.core.settings.impl.QueueSettings;
+import org.jboss.messaging.core.transaction.ResourceManager;
import org.jboss.messaging.core.version.Version;
import org.jboss.messaging.util.OrderedExecutorFactory;
@@ -99,4 +100,6 @@
DeploymentManager getDeploymentManager();
OrderedExecutorFactory getOrderedExecutorFactory();
+
+ ResourceManager getResourceManager();
}
Modified: trunk/src/main/org/jboss/messaging/core/server/ServerConnection.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/ServerConnection.java 2008-06-16 10:56:54 UTC (rev 4477)
+++ trunk/src/main/org/jboss/messaging/core/server/ServerConnection.java 2008-06-16 11:56:32 UTC (rev 4478)
@@ -40,6 +40,8 @@
{
long getID();
+ MessagingServer getServer();
+
ConnectionCreateSessionResponseMessage createSession(boolean xa, boolean autoCommitSends, boolean autoCommitAcks,
PacketReturner sender) throws Exception;
@@ -49,8 +51,6 @@
void close() throws Exception;
- SecurityStore getSecurityStore();
-
String getUsername();
String getPassword();
Modified: trunk/src/main/org/jboss/messaging/core/server/ServerSession.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/ServerSession.java 2008-06-16 10:56:54 UTC (rev 4477)
+++ trunk/src/main/org/jboss/messaging/core/server/ServerSession.java 2008-06-16 11:56:32 UTC (rev 4478)
@@ -47,6 +47,8 @@
{
long getID();
+ ServerConnection getConnection();
+
void removeBrowser(ServerBrowserImpl browser) throws Exception;
void removeConsumer(ServerConsumer consumer) throws Exception;
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2008-06-16 10:56:54 UTC (rev 4477)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2008-06-16 11:56:32 UTC (rev 4478)
@@ -21,6 +21,13 @@
*/
package org.jboss.messaging.core.server.impl;
+import java.util.HashSet;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+
import org.jboss.messaging.core.config.Configuration;
import org.jboss.messaging.core.config.impl.ConfigurationImpl;
import org.jboss.messaging.core.deployers.Deployer;
@@ -60,10 +67,7 @@
import org.jboss.messaging.util.OrderedExecutorFactory;
import org.jboss.messaging.util.VersionLoader;
-import java.util.HashSet;
-import java.util.concurrent.*;
-
/**
* A Messaging Server
*
@@ -346,14 +350,9 @@
securityStore.authenticate(username, password);
- long id = remotingService.getDispatcher().generateID();
-
final ServerConnection connection =
- new ServerConnectionImpl(id, username, password,
- sender.getSessionID(), clientAddress,
- remotingService.getDispatcher(), resourceManager, storageManager,
- queueSettingsRepository,
- postOffice, securityStore, connectionManager, orderedExecutorFactory);
+ new ServerConnectionImpl(this, username, password,
+ sender.getSessionID(), clientAddress);
remotingService.getDispatcher().register(new ServerConnectionPacketHandler(connection));
@@ -365,6 +364,11 @@
return orderedExecutorFactory;
}
+ public ResourceManager getResourceManager()
+ {
+ return resourceManager;
+ }
+
// Public ---------------------------------------------------------------------------------------
// Package protected ----------------------------------------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerBrowserImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerBrowserImpl.java 2008-06-16 10:56:54 UTC (rev 4477)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerBrowserImpl.java 2008-06-16 11:56:32 UTC (rev 4478)
@@ -65,8 +65,6 @@
// Static ---------------------------------------------------------------------------------------
- private static boolean trace = log.isTraceEnabled();
-
// Attributes -----------------------------------------------------------------------------------
private final long id;
@@ -77,12 +75,13 @@
// Constructors ---------------------------------------------------------------------------------
- ServerBrowserImpl(final long id, final ServerSession session,
- final Queue destination, final String messageFilter) throws Exception
+ public ServerBrowserImpl(final ServerSession session,
+ final Queue destination, final String messageFilter) throws Exception
{
this.session = session;
- this.id = id;
+ this.id = session.getConnection().getServer().getRemotingService().getDispatcher().generateID();
+
this.destination = destination;
if (messageFilter != null)
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerConnectionImpl.java 2008-06-16 10:56:54 UTC (rev 4477)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerConnectionImpl.java 2008-06-16 11:56:32 UTC (rev 4478)
@@ -26,22 +26,17 @@
import java.util.Set;
import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.postoffice.Binding;
import org.jboss.messaging.core.postoffice.PostOffice;
import org.jboss.messaging.core.remoting.PacketDispatcher;
import org.jboss.messaging.core.remoting.PacketReturner;
import org.jboss.messaging.core.remoting.impl.wireformat.ConnectionCreateSessionResponseMessage;
-import org.jboss.messaging.core.security.SecurityStore;
import org.jboss.messaging.core.server.ConnectionManager;
+import org.jboss.messaging.core.server.MessagingServer;
import org.jboss.messaging.core.server.Queue;
import org.jboss.messaging.core.server.ServerConnection;
import org.jboss.messaging.core.server.ServerSession;
-import org.jboss.messaging.core.settings.HierarchicalRepository;
-import org.jboss.messaging.core.settings.impl.QueueSettings;
-import org.jboss.messaging.core.transaction.ResourceManager;
import org.jboss.messaging.util.ConcurrentHashSet;
-import org.jboss.messaging.util.OrderedExecutorFactory;
import org.jboss.messaging.util.SimpleString;
/**
@@ -63,8 +58,6 @@
// Static ---------------------------------------------------------------------------------------
- private static boolean trace = log.isTraceEnabled();
-
// Attributes -----------------------------------------------------------------------------------
private final long id;
@@ -76,23 +69,7 @@
private final long remotingClientSessionID;
private final String clientAddress;
-
- private final PacketDispatcher dispatcher;
-
- private final ResourceManager resourceManager;
-
- private final StorageManager persistenceManager;
-
- private final HierarchicalRepository<QueueSettings> queueSettingsRepository;
-
- private final PostOffice postOffice;
-
- private final SecurityStore securityStore;
-
- private final ConnectionManager connectionManager;
-
- private final OrderedExecutorFactory orderedExecutorFactory;
-
+
private final long createdTime;
private final Set<ServerSession> sessions = new ConcurrentHashSet<ServerSession>();
@@ -100,24 +77,27 @@
private final Set<Queue> temporaryQueues = new ConcurrentHashSet<Queue>();
private final Set<SimpleString> temporaryDestinations = new ConcurrentHashSet<SimpleString>();
+
+ private final MessagingServer server;
private volatile boolean started;
+
+ //We cache some of the service locally
+
+ private final PostOffice postOffice;
+ private final ConnectionManager connectionManager;
+ private final PacketDispatcher dispatcher;
+
// Constructors ---------------------------------------------------------------------------------
- public ServerConnectionImpl(final long id, final String username, final String password,
+ public ServerConnectionImpl(final MessagingServer server,
+ final String username, final String password,
final long remotingClientSessionID,
- final String clientAddress,
- final PacketDispatcher dispatcher,
- final ResourceManager resourceManager,
- final StorageManager persistenceManager,
- final HierarchicalRepository<QueueSettings> queueSettingsRepository,
- final PostOffice postOffice, final SecurityStore securityStore,
- final ConnectionManager connectionManager,
- final OrderedExecutorFactory orderedExecutorFactory)
+ final String clientAddress)
{
- this.id = id;
+ this.id = server.getRemotingService().getDispatcher().generateID();
this.username = username;
@@ -127,27 +107,19 @@
this.clientAddress = clientAddress;
- this.dispatcher = dispatcher;
+ started = false;
- this.resourceManager = resourceManager;
+ createdTime = System.currentTimeMillis();
+
+ server.getConnectionManager().registerConnection(remotingClientSessionID, this);
- this.persistenceManager = persistenceManager;
+ this.server = server;
- this.queueSettingsRepository = queueSettingsRepository;
+ this.dispatcher = server.getRemotingService().getDispatcher();
- this.postOffice = postOffice;
+ this.postOffice = server.getPostOffice();
- this.securityStore = securityStore;
-
- this.connectionManager = connectionManager;
-
- this.orderedExecutorFactory = orderedExecutorFactory;
-
- started = false;
-
- createdTime = System.currentTimeMillis();
-
- connectionManager.registerConnection(remotingClientSessionID, this);
+ this.connectionManager = server.getConnectionManager();
}
// ServerConnection implementation ------------------------------------------------------------
@@ -157,15 +129,17 @@
return id;
}
+ public MessagingServer getServer()
+ {
+ return server;
+ }
+
public ConnectionCreateSessionResponseMessage createSession(final boolean xa, final boolean autoCommitSends,
final boolean autoCommitAcks,
final PacketReturner sender) throws Exception
- {
- long id = dispatcher.generateID();
+ {
ServerSession session =
- new ServerSessionImpl(id, autoCommitSends, autoCommitAcks, xa, this, resourceManager,
- sender, dispatcher, persistenceManager, queueSettingsRepository, postOffice, securityStore,
- orderedExecutorFactory.getOrderedExecutor());
+ new ServerSessionImpl(this, autoCommitSends, autoCommitAcks, xa, sender);
sessions.add(session);
@@ -225,11 +199,6 @@
dispatcher.unregister(id);
}
- public SecurityStore getSecurityStore()
- {
- return securityStore;
- }
-
public String getUsername()
{
return username;
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2008-06-16 10:56:54 UTC (rev 4477)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2008-06-16 11:56:32 UTC (rev 4478)
@@ -29,6 +29,7 @@
import org.jboss.messaging.core.postoffice.PostOffice;
import org.jboss.messaging.core.server.HandleStatus;
import org.jboss.messaging.core.server.MessageReference;
+import org.jboss.messaging.core.server.MessagingServer;
import org.jboss.messaging.core.server.Queue;
import org.jboss.messaging.core.server.ServerConsumer;
import org.jboss.messaging.core.server.ServerMessage;
@@ -37,8 +38,6 @@
import org.jboss.messaging.core.settings.impl.QueueSettings;
import org.jboss.messaging.core.transaction.Transaction;
import org.jboss.messaging.core.transaction.impl.TransactionImpl;
-import org.jboss.messaging.util.TokenBucketLimiter;
-import org.jboss.messaging.util.TokenBucketLimiterImpl;
/**
* Concrete implementation of a ClientConsumer.
@@ -77,17 +76,9 @@
private final boolean autoDeleteQueue;
- private final TokenBucketLimiter limiter;
-
private final long connectionID;
- private final ServerSession sessionEndpoint;
-
- private final StorageManager persistenceManager;
-
- private final HierarchicalRepository<QueueSettings> queueSettingsRepository;
-
- private final PostOffice postOffice;
+ private final ServerSession session;
private final Object startStopLock = new Object();
@@ -95,18 +86,21 @@
private boolean started;
+ //We cache some of the service locally
+ private final StorageManager storageManager;
+
+ private final HierarchicalRepository<QueueSettings> queueSettingsRepository;
+
+ private final PostOffice postOffice;
+
// Constructors ---------------------------------------------------------------------------------
- ServerConsumerImpl(final long id, final long clientTargetID, final Queue messageQueue, final boolean noLocal, final Filter filter,
+ ServerConsumerImpl(final ServerSession session, final long clientTargetID,
+ final Queue messageQueue, final boolean noLocal, final Filter filter,
final boolean autoDeleteQueue, final boolean enableFlowControl, final int maxRate,
- final long connectionID, final ServerSession sessionEndpoint,
- final StorageManager persistenceManager,
- final HierarchicalRepository<QueueSettings> queueSettingsRepository,
- final PostOffice postOffice,
+ final long connectionID,
final boolean started)
{
- this.id = id;
-
this.clientTargetID = clientTargetID;
this.messageQueue = messageQueue;
@@ -117,25 +111,12 @@
this.autoDeleteQueue = autoDeleteQueue;
- if (maxRate != -1)
- {
- limiter = new TokenBucketLimiterImpl(maxRate, false);
- }
- else
- {
- limiter = null;
- }
-
this.connectionID = connectionID;
- this.sessionEndpoint = sessionEndpoint;
-
- this.persistenceManager = persistenceManager;
+ this.session = session;
- this.queueSettingsRepository = queueSettingsRepository;
-
- this.postOffice = postOffice;
-
+ MessagingServer server = session.getConnection().getServer();
+
this.started = started;
if (enableFlowControl)
@@ -147,6 +128,14 @@
availableCredits = null;
}
+ this.storageManager = server.getStorageManager();
+
+ this.queueSettingsRepository = server.getQueueSettingsRepository();
+
+ this.postOffice = server.getPostOffice();
+
+ this.id = server.getRemotingService().getDispatcher().generateID();
+
messageQueue.addConsumer(this);
}
@@ -171,7 +160,7 @@
if (ref.getMessage().isExpired())
{
- ref.expire(persistenceManager, postOffice, queueSettingsRepository);
+ ref.expire(storageManager, postOffice, queueSettingsRepository);
return HandleStatus.HANDLED;
}
@@ -198,7 +187,7 @@
if (connectionID == conId)
{
- Transaction tx = new TransactionImpl(persistenceManager, postOffice);
+ Transaction tx = new TransactionImpl(storageManager, postOffice);
tx.addAcknowledgement(ref);
@@ -215,7 +204,7 @@
try
{
- sessionEndpoint.handleDelivery(ref, this);
+ session.handleDelivery(ref, this);
}
catch (Exception e)
{
@@ -247,12 +236,12 @@
if (messageQueue.isDurable())
{
- messageQueue.deleteAllReferences(persistenceManager);
+ messageQueue.deleteAllReferences(storageManager);
}
}
}
- sessionEndpoint.removeConsumer(this);
+ session.removeConsumer(this);
}
public void setStarted(final boolean started)
@@ -302,6 +291,6 @@
private void promptDelivery()
{
- sessionEndpoint.promptDelivery(messageQueue);
+ session.promptDelivery(messageQueue);
}
}
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerProducerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerProducerImpl.java 2008-06-16 10:56:54 UTC (rev 4477)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerProducerImpl.java 2008-06-16 11:56:32 UTC (rev 4478)
@@ -64,13 +64,13 @@
// Constructors ----------------------------------------------------------------
- public ServerProducerImpl(final long id, final long clientTargetID, final ServerSession session,
+ public ServerProducerImpl(final ServerSession session, final long clientTargetID,
final SimpleString address,
final PacketReturner sender,
final FlowController flowController,
final int windowSize) throws Exception
{
- this.id = id;
+ this.id = session.getConnection().getServer().getRemotingService().getDispatcher().generateID();
this.clientTargetID = clientTargetID;
@@ -97,7 +97,6 @@
session.removeProducer(this);
}
-
public void send(final ServerMessage message) throws Exception
{
if (this.address != null)
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2008-06-16 10:56:54 UTC (rev 4477)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2008-06-16 11:56:32 UTC (rev 4478)
@@ -56,6 +56,7 @@
import org.jboss.messaging.core.security.SecurityStore;
import org.jboss.messaging.core.server.Delivery;
import org.jboss.messaging.core.server.MessageReference;
+import org.jboss.messaging.core.server.MessagingServer;
import org.jboss.messaging.core.server.Queue;
import org.jboss.messaging.core.server.ServerConnection;
import org.jboss.messaging.core.server.ServerConsumer;
@@ -105,21 +106,9 @@
private final boolean autoCommitAcks;
private final ServerConnection connection;
-
- private final ResourceManager resourceManager;
-
+
private final PacketReturner sender;
- private final PacketDispatcher dispatcher;
-
- private final StorageManager persistenceManager;
-
- private final HierarchicalRepository<QueueSettings> queueSettingsRepository;
-
- private final PostOffice postOffice;
-
- private final SecurityStore securityStore;
-
private final Set<ServerConsumer> consumers = new ConcurrentHashSet<ServerConsumer>();
private final Set<ServerBrowserImpl> browsers = new ConcurrentHashSet<ServerBrowserImpl>();
@@ -133,51 +122,51 @@
private final Executor executor;
private Transaction tx;
+
+ //We cache some of the services locally
+
+ private final StorageManager storageManager;
+ private final HierarchicalRepository<QueueSettings> queueSettingsRepository;
+
+ private final ResourceManager resourceManager;
+
+ private final PostOffice postOffice;
+
+ private final SecurityStore securityStore;
+
+ private final PacketDispatcher dispatcher;
+
// Constructors
// ---------------------------------------------------------------------------------
- public ServerSessionImpl(final long id, final boolean autoCommitSends,
+ public ServerSessionImpl(final ServerConnection connection, final boolean autoCommitSends,
final boolean autoCommitAcks,
- final boolean xa, final ServerConnection connection,
- final ResourceManager resourceManager, final PacketReturner sender,
- final PacketDispatcher dispatcher, final StorageManager persistenceManager,
- final HierarchicalRepository<QueueSettings> queueSettingsRepository,
- final PostOffice postOffice, final SecurityStore securityStore,
- final Executor executor) throws Exception
+ final boolean xa,
+ final PacketReturner sender) throws Exception
{
- this.id = id;
-
this.autoCommitSends = autoCommitSends;
this.autoCommitAcks = autoCommitAcks;
-
- if (!xa)
- {
- tx = new TransactionImpl(persistenceManager, postOffice);
- }
-
+
this.connection = connection;
-
- this.resourceManager = resourceManager;
-
+
this.sender = sender;
-
- this.dispatcher = dispatcher;
-
- this.persistenceManager = persistenceManager;
-
- this.queueSettingsRepository = queueSettingsRepository;
-
- this.postOffice = postOffice;
-
- this.securityStore = securityStore;
- this.executor = executor;
+ MessagingServer server = connection.getServer();
+
+ this.storageManager = server.getStorageManager();
+ this.postOffice = server.getPostOffice();
+ this.queueSettingsRepository = server.getQueueSettingsRepository();
+ this.resourceManager = server.getResourceManager();
+ this.securityStore = server.getSecurityStore();
+ this.dispatcher = server.getRemotingService().getDispatcher();
+ this.id = dispatcher.generateID();
+ this.executor = server.getOrderedExecutorFactory().getOrderedExecutor();
- if (log.isTraceEnabled())
+ if (!xa)
{
- log.trace("created server session endpoint for " + sender.getRemoteAddress());
+ tx = new TransactionImpl(storageManager, postOffice);
}
}
@@ -300,6 +289,7 @@
if (!autoCommitSends)
{
MessagingException messagingException;
+
if (e instanceof MessagingException)
{
messagingException = (MessagingException) e;
@@ -308,12 +298,13 @@
{
messagingException = new MessagingException(MessagingException.INTERNAL_ERROR, e.getMessage());
}
+
tx.markAsRollbackOnly(messagingException);
}
throw e;
}
- msg.setMessageID(persistenceManager.generateMessageID());
+ msg.setMessageID(storageManager.generateMessageID());
// This allows the no-local consumers to filter out the messages that come
// from the same connection.
@@ -326,7 +317,7 @@
if (msg.getDurableRefCount() != 0)
{
- persistenceManager.storeMessage(msg);
+ storageManager.storeMessage(msg);
}
for (MessageReference ref : refs)
@@ -433,7 +424,7 @@
{
// Might be null if XA
- tx = new TransactionImpl(persistenceManager, postOffice);
+ tx = new TransactionImpl(storageManager, postOffice);
}
//We need to lock all the queues while we're rolling back, to prevent any deliveries occurring during this
@@ -475,7 +466,7 @@
}
}
- tx = new TransactionImpl(persistenceManager, postOffice);
+ tx = new TransactionImpl(storageManager, postOffice);
}
public void cancel(final long deliveryID, final boolean expired) throws Exception
@@ -498,7 +489,7 @@
try
{
- Transaction cancelTx = new TransactionImpl(persistenceManager, postOffice);
+ Transaction cancelTx = new TransactionImpl(storageManager, postOffice);
for (Delivery del : deliveries)
{
@@ -534,7 +525,7 @@
if (delivery.getDeliveryID() == deliveryID)
{
- delivery.getReference().expire(persistenceManager, postOffice, queueSettingsRepository);
+ delivery.getReference().expire(storageManager, postOffice, queueSettingsRepository);
iter.remove();
@@ -556,7 +547,7 @@
}
finally
{
- tx = new TransactionImpl(persistenceManager, postOffice);
+ tx = new TransactionImpl(storageManager, postOffice);
}
@@ -566,7 +557,7 @@
{
if (tx != null)
{
- final String msg = "Cannot commit, session is currently doing work in a transaction "
+ final String msg = "Cannot commit, session is currently doing work in transaction "
+ tx.getXid();
return new SessionXAResponseMessage(true, XAException.XAER_PROTO, msg);
@@ -806,7 +797,7 @@
return new SessionXAResponseMessage(true, XAException.XAER_PROTO, msg);
}
- tx = new TransactionImpl(xid, persistenceManager, postOffice);
+ tx = new TransactionImpl(xid, storageManager, postOffice);
boolean added = resourceManager.putTransaction(xid, tx);
@@ -958,7 +949,7 @@
if (queue.isDurable())
{
- binding.getQueue().deleteAllReferences(persistenceManager);
+ binding.getQueue().deleteAllReferences(storageManager);
}
if (queue.isTemporary())
@@ -997,11 +988,10 @@
maxRate = queueMaxRate != null ? queueMaxRate : maxRate;
- long id = dispatcher.generateID();
-
ServerConsumer consumer =
- new ServerConsumerImpl(id, clientTargetID, binding.getQueue(), noLocal, filter, autoDeleteQueue, windowSize != -1, maxRate, connection.getID(),
- this, persistenceManager, queueSettingsRepository, postOffice, connection.isStarted());
+ new ServerConsumerImpl(this, clientTargetID, binding.getQueue(), noLocal, filter,
+ autoDeleteQueue, windowSize != -1, maxRate, connection.getID(),
+ connection.isStarted());
dispatcher.register(new ServerConsumerPacketHandler(consumer));
@@ -1080,10 +1070,8 @@
securityStore.check(binding.getAddress(), CheckType.READ, connection);
- long id = dispatcher.generateID();
+ ServerBrowserImpl browser = new ServerBrowserImpl(this, binding.getQueue(), filterString == null ? null : filterString.toString());
- ServerBrowserImpl browser = new ServerBrowserImpl(id, this, binding.getQueue(), filterString == null ? null : filterString.toString());
-
browsers.add(browser);
dispatcher.register(browser.newHandler());
@@ -1112,8 +1100,6 @@
flowController = windowSize == -1 ? null : postOffice.getFlowController(address);
}
- long id = dispatcher.generateID();
-
final int windowToUse = flowController == null ? -1 : windowSize;
//Server window size is 0.75 client window size for producer flow control (other way round to consumer flow control)
@@ -1121,7 +1107,7 @@
final int serverWindowSize = windowToUse == -1 ? -1 : (int)(windowToUse * 0.75);
ServerProducerImpl producer
- = new ServerProducerImpl(id, clientTargetID, this, address, sender, flowController, serverWindowSize);
+ = new ServerProducerImpl(this, clientTargetID, address, sender, flowController, serverWindowSize);
producers.add(producer);
@@ -1155,11 +1141,11 @@
if (count == 0)
{
- persistenceManager.storeDelete(message.getMessageID());
+ storageManager.storeDelete(message.getMessageID());
}
else
{
- persistenceManager.storeAcknowledge(queue.getPersistenceID(), message.getMessageID());
+ storageManager.storeAcknowledge(queue.getPersistenceID(), message.getMessageID());
}
}
More information about the jboss-cvs-commits
mailing list