[jboss-cvs] JBoss Messaging SVN: r3778 - in trunk: src/main/org/jboss/jms/server and 7 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Sun Feb 24 07:15:31 EST 2008
Author: timfox
Date: 2008-02-24 07:15:29 -0500 (Sun, 24 Feb 2008)
New Revision: 3778
Added:
trunk/src/main/org/jboss/jms/server/endpoint/ServerConnection.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionPacketHandler.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumer.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerPacketHandler.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerSession.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionPacketHandler.java
Modified:
trunk/src/main/org/jboss/jms/client/AsfMessageHolder.java
trunk/src/main/org/jboss/jms/server/ConnectionManager.java
trunk/src/main/org/jboss/jms/server/JMSServerManagerImpl.java
trunk/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java
trunk/src/main/org/jboss/jms/server/container/SecurityManager.java
trunk/src/main/org/jboss/jms/server/endpoint/MessagingServerPacketHandler.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerBrowserEndpoint.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerPacketHandlerSupport.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
trunk/src/main/org/jboss/messaging/core/MessagingServer.java
trunk/src/main/org/jboss/messaging/core/MessagingServerManagement.java
trunk/src/main/org/jboss/messaging/core/impl/server/MessagingServerImpl.java
trunk/src/main/org/jboss/messaging/core/impl/server/MessagingServerManagementImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/PacketDispatcher.java
trunk/tests/src/org/jboss/test/messaging/jms/AcknowledgementTest.java
Log:
Refactor and create interfaces for endpoints
Modified: trunk/src/main/org/jboss/jms/client/AsfMessageHolder.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/AsfMessageHolder.java 2008-02-23 13:51:34 UTC (rev 3777)
+++ trunk/src/main/org/jboss/jms/client/AsfMessageHolder.java 2008-02-24 12:15:29 UTC (rev 3778)
@@ -33,24 +33,28 @@
*/
public class AsfMessageHolder
{
- private JBossMessage msg;
+ private final JBossMessage msg;
- private String consumerID;
+ private final String consumerID;
- private String queueName;
+ private final String queueName;
- private int maxDeliveries;
+ private final int maxDeliveries;
- private ClientSession connectionConsumerSession;
+ private final ClientSession connectionConsumerSession;
- public AsfMessageHolder(JBossMessage msg, String consumerID,
- String queueName, int maxDeliveries,
- ClientSession connectionConsumerSession)
+ public AsfMessageHolder(final JBossMessage msg, final String consumerID,
+ final String queueName, final int maxDeliveries,
+ final ClientSession connectionConsumerSession)
{
this.msg = msg;
+
this.consumerID = consumerID;
+
this.queueName = queueName;
+
this.maxDeliveries = maxDeliveries;
+
this.connectionConsumerSession = connectionConsumerSession;
}
@@ -59,48 +63,23 @@
return msg;
}
- public void setMsg(JBossMessage msg)
- {
- this.msg = msg;
- }
-
public String getConsumerID()
{
return consumerID;
}
- public void setConsumerID(String consumerID)
- {
- this.consumerID = consumerID;
- }
-
public String getQueueName()
{
return queueName;
}
- public void setQueueName(String queueName)
- {
- this.queueName = queueName;
- }
-
public int getMaxDeliveries()
{
return maxDeliveries;
}
- public void setMaxDeliveries(int maxDeliveries)
- {
- this.maxDeliveries = maxDeliveries;
- }
-
public ClientSession getConnectionConsumerSession()
{
return connectionConsumerSession;
}
-
- public void setConnectionConsumerSession(ClientSession connectionConsumerSession)
- {
- this.connectionConsumerSession = connectionConsumerSession;
- }
}
Modified: trunk/src/main/org/jboss/jms/server/ConnectionManager.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/ConnectionManager.java 2008-02-23 13:51:34 UTC (rev 3777)
+++ trunk/src/main/org/jboss/jms/server/ConnectionManager.java 2008-02-24 12:15:29 UTC (rev 3778)
@@ -23,7 +23,7 @@
import java.util.List;
-import org.jboss.jms.server.endpoint.ServerConnectionEndpoint;
+import org.jboss.jms.server.endpoint.ServerConnection;
import org.jboss.messaging.core.MessagingComponent;
@@ -42,20 +42,20 @@
public interface ConnectionManager extends MessagingComponent
{
void registerConnection(String clientVMID,
- String remotingClientSessionID, ServerConnectionEndpoint endpoint);
+ String remotingClientSessionID, ServerConnection endpoint);
/**
- * @param serverConnectionEndpoint
+ * @param ServerConnection
* @return null if there is no such connection.
*/
- ServerConnectionEndpoint unregisterConnection(String remotingClientSessionID, ServerConnectionEndpoint serverConnectionEndpoint);
+ ServerConnection unregisterConnection(String remotingClientSessionID, ServerConnection ServerConnection);
/**
* Returns a list of active connection endpoints currently maintained by an instance of this
* manager. The implementation should make a copy of the list to avoid
* ConcurrentModificationException. The list could be empty, but never null.
*
- * @return List<ServerConnectionEndpoint>
+ * @return List<ServerConnection>
*/
- List<ServerConnectionEndpoint> getActiveConnections();
+ List<ServerConnection> getActiveConnections();
}
Modified: trunk/src/main/org/jboss/jms/server/JMSServerManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/JMSServerManagerImpl.java 2008-02-23 13:51:34 UTC (rev 3777)
+++ trunk/src/main/org/jboss/jms/server/JMSServerManagerImpl.java 2008-02-24 12:15:29 UTC (rev 3778)
@@ -21,38 +21,38 @@
*/
package org.jboss.jms.server;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+
+import javax.jms.Message;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import javax.naming.NameNotFoundException;
+import javax.naming.NamingException;
+
+import org.jboss.aop.microcontainer.aspects.jmx.JMX;
import org.jboss.jms.client.JBossConnectionFactory;
import org.jboss.jms.client.api.ClientConnectionFactory;
import org.jboss.jms.destination.JBossQueue;
import org.jboss.jms.destination.JBossTopic;
-import org.jboss.jms.server.endpoint.ServerConnectionEndpoint;
+import org.jboss.jms.message.JBossMessage;
+import org.jboss.jms.server.endpoint.ServerConnection;
import org.jboss.logging.Logger;
+import org.jboss.messaging.core.Filter;
import org.jboss.messaging.core.MessagingServerManagement;
import org.jboss.messaging.core.Queue;
-import org.jboss.messaging.core.Filter;
+import org.jboss.messaging.core.impl.filter.FilterImpl;
+import org.jboss.messaging.core.impl.messagecounter.MessageCounter;
import org.jboss.messaging.core.impl.server.SubscriptionInfo;
-import org.jboss.jms.server.MessageStatistics;
-import org.jboss.jms.message.JBossMessage;
-import org.jboss.messaging.core.impl.messagecounter.MessageCounter;
-import org.jboss.messaging.core.impl.filter.FilterImpl;
import org.jboss.messaging.deployers.Deployer;
import org.jboss.messaging.deployers.DeploymentManager;
import org.jboss.messaging.util.JNDIUtil;
import org.jboss.messaging.util.MessageQueueNameHelper;
-import org.jboss.aop.microcontainer.aspects.jmx.JMX;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
-import javax.naming.Context;
-import javax.naming.InitialContext;
-import javax.naming.NameNotFoundException;
-import javax.naming.NamingException;
-import javax.jms.Message;
-import java.util.HashMap;
-import java.util.List;
-import java.util.ArrayList;
-import java.util.Collection;
-
/**
* A Deployer used to create and add to JNDI queues, topics and connection factories. Typically this would only be used
* in an app server env.
@@ -502,13 +502,13 @@
public List<ClientInfo> getClients() throws Exception
{
List<ClientInfo> clientInfos = new ArrayList<ClientInfo>();
- List<ServerConnectionEndpoint> endpoints = messagingServerManagement.getActiveConnections();
- for (ServerConnectionEndpoint endpoint : endpoints)
+ List<ServerConnection> endpoints = messagingServerManagement.getActiveConnections();
+ for (ServerConnection endpoint : endpoints)
{
clientInfos.add(new ClientInfo(endpoint.getUsername(),
endpoint.getClientAddress(),
endpoint.isStarted(),
- endpoint.getCreated()));
+ endpoint.getCreatedTime()));
}
return clientInfos;
}
Modified: trunk/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java 2008-02-23 13:51:34 UTC (rev 3777)
+++ trunk/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java 2008-02-24 12:15:29 UTC (rev 3778)
@@ -32,7 +32,7 @@
import org.jboss.jms.client.api.FailureListener;
import org.jboss.jms.client.impl.JMSClientVMIdentifier;
import org.jboss.jms.server.ConnectionManager;
-import org.jboss.jms.server.endpoint.ServerConnectionEndpoint;
+import org.jboss.jms.server.endpoint.ServerConnection;
import org.jboss.messaging.util.Logger;
import org.jboss.messaging.util.MessagingException;
import org.jboss.messaging.util.RemotingException;
@@ -57,9 +57,9 @@
// Attributes -----------------------------------------------------------------------------------
- private Map<String /* remoting session ID */, List<ServerConnectionEndpoint>> endpoints;
+ private Map<String /* remoting session ID */, List<ServerConnection>> endpoints;
- private Set<ServerConnectionEndpoint> activeServerConnectionEndpoints;
+ private Set<ServerConnection> activeServerConnections;
// the clients map is for information only: to better identify the clients of
// jboss messaging using their VM ID
@@ -69,27 +69,27 @@
public SimpleConnectionManager()
{
- endpoints = new HashMap<String, List<ServerConnectionEndpoint>>();
- activeServerConnectionEndpoints = new HashSet<ServerConnectionEndpoint>();
+ endpoints = new HashMap<String, List<ServerConnection>>();
+ activeServerConnections = new HashSet<ServerConnection>();
clients = new HashMap<String, String>();
}
// ConnectionManager implementation -------------------------------------------------------------
public synchronized void registerConnection(String clientVMID, String remotingClientSessionID,
- ServerConnectionEndpoint endpoint)
+ ServerConnection endpoint)
{
- List<ServerConnectionEndpoint> connectionEndpoints = endpoints.get(remotingClientSessionID);
+ List<ServerConnection> connectionEndpoints = endpoints.get(remotingClientSessionID);
if (connectionEndpoints == null)
{
- connectionEndpoints = new ArrayList<ServerConnectionEndpoint>();
+ connectionEndpoints = new ArrayList<ServerConnection>();
endpoints.put(remotingClientSessionID, connectionEndpoints);
}
connectionEndpoints.add(endpoint);
- activeServerConnectionEndpoints.add(endpoint);
+ activeServerConnections.add(endpoint);
clients.put(remotingClientSessionID, clientVMID);
@@ -97,10 +97,10 @@
Util.guidToString(remotingClientSessionID));
}
- public synchronized ServerConnectionEndpoint unregisterConnection(String remotingClientSessionID,
- ServerConnectionEndpoint endpoint)
+ public synchronized ServerConnection unregisterConnection(String remotingClientSessionID,
+ ServerConnection endpoint)
{
- List<ServerConnectionEndpoint> connectionEndpoints = endpoints.get(remotingClientSessionID);
+ List<ServerConnection> connectionEndpoints = endpoints.get(remotingClientSessionID);
if (connectionEndpoints != null)
{
@@ -108,7 +108,7 @@
if (removed)
{
- activeServerConnectionEndpoints.remove(endpoint);
+ activeServerConnections.remove(endpoint);
}
log.debug("unregistered connection " + endpoint + " with remoting session ID " + remotingClientSessionID);
@@ -124,11 +124,11 @@
return null;
}
- public synchronized List<ServerConnectionEndpoint> getActiveConnections()
+ public synchronized List<ServerConnection> getActiveConnections()
{
// I will make a copy to avoid ConcurrentModification
- List<ServerConnectionEndpoint> list = new ArrayList<ServerConnectionEndpoint>();
- list.addAll(activeServerConnectionEndpoints);
+ List<ServerConnection> list = new ArrayList<ServerConnection>();
+ list.addAll(activeServerConnections);
return list;
}
@@ -199,15 +199,15 @@
{
assert remotingClientSessionID != null;
- List<ServerConnectionEndpoint> connectionEndpoints = endpoints.get(remotingClientSessionID);
+ List<ServerConnection> connectionEndpoints = endpoints.get(remotingClientSessionID);
// the connection endpoints are copied in a new list to avoid concurrent modification exception
- List<ServerConnectionEndpoint> copy;
+ List<ServerConnection> copy;
if (connectionEndpoints != null)
- copy = new ArrayList<ServerConnectionEndpoint>(connectionEndpoints);
+ copy = new ArrayList<ServerConnection>(connectionEndpoints);
else
- copy = new ArrayList<ServerConnectionEndpoint>();
+ copy = new ArrayList<ServerConnection>();
- for (ServerConnectionEndpoint sce : copy)
+ for (ServerConnection sce : copy)
{
try
{
@@ -244,11 +244,11 @@
{
buff.append(" No registered endpoints\n");
}
- for (Entry<String, List<ServerConnectionEndpoint>> entry : endpoints.entrySet())
+ for (Entry<String, List<ServerConnection>> entry : endpoints.entrySet())
{
- List<ServerConnectionEndpoint> connectionEndpoints = entry.getValue();
+ List<ServerConnection> connectionEndpoints = entry.getValue();
buff.append(" " + entry.getKey() + "----->\n");
- for (ServerConnectionEndpoint sce : connectionEndpoints)
+ for (ServerConnection sce : connectionEndpoints)
{
buff.append(" " + sce + " (" + System.identityHashCode(sce) + ")\n");
}
Modified: trunk/src/main/org/jboss/jms/server/container/SecurityManager.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/container/SecurityManager.java 2008-02-23 13:51:34 UTC (rev 3777)
+++ trunk/src/main/org/jboss/jms/server/container/SecurityManager.java 2008-02-24 12:15:29 UTC (rev 3778)
@@ -25,7 +25,7 @@
import java.util.Set;
import org.jboss.jms.server.SecurityStore;
-import org.jboss.jms.server.endpoint.ServerConnectionEndpoint;
+import org.jboss.jms.server.endpoint.ServerConnection;
import org.jboss.jms.server.security.CheckType;
import org.jboss.messaging.util.Logger;
import org.jboss.messaging.util.MessagingException;
@@ -134,7 +134,7 @@
return granted;
}
- public void check(String address, CheckType checkType, ServerConnectionEndpoint conn)
+ public void check(String address, CheckType checkType, ServerConnection conn)
throws MessagingException
{
if (trace) { log.trace("checking access permissions to " + address); }
@@ -145,7 +145,7 @@
return;
}
- SecurityStore sm = conn.getSecurityManager();
+ SecurityStore sm = conn.getSecurityStore();
// Authenticate. Successful authentication will place a new SubjectContext on thread local,
// which will be used in the authorization process. However, we need to make sure we clean up
Modified: trunk/src/main/org/jboss/jms/server/endpoint/MessagingServerPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/MessagingServerPacketHandler.java 2008-02-23 13:51:34 UTC (rev 3777)
+++ trunk/src/main/org/jboss/jms/server/endpoint/MessagingServerPacketHandler.java 2008-02-24 12:15:29 UTC (rev 3778)
@@ -24,8 +24,13 @@
import static org.jboss.messaging.core.remoting.wireformat.PacketType.CREATECONNECTION;
import org.jboss.jms.client.impl.ClientConnectionFactoryImpl;
+import org.jboss.jms.server.ConnectionManager;
+import org.jboss.jms.server.SecurityStore;
import org.jboss.logging.Logger;
-import org.jboss.messaging.core.MessagingServer;
+import org.jboss.messaging.core.PersistenceManager;
+import org.jboss.messaging.core.PostOffice;
+import org.jboss.messaging.core.ResourceManager;
+import org.jboss.messaging.core.remoting.PacketDispatcher;
import org.jboss.messaging.core.remoting.PacketSender;
import org.jboss.messaging.core.remoting.wireformat.CreateConnectionRequest;
import org.jboss.messaging.core.remoting.wireformat.CreateConnectionResponse;
@@ -44,11 +49,34 @@
{
private static final Logger log = Logger.getLogger(MessagingServerPacketHandler.class);
- private MessagingServer messagingServer;
+ private final PacketDispatcher dispatcher;
+
+ private final ResourceManager resourceManager;
+
+ private final PersistenceManager persistenceManager;
+
+ private final PostOffice postOffice;
+
+ private final SecurityStore securityStore;
+
+ private final ConnectionManager connectionManager;
- public MessagingServerPacketHandler(MessagingServer messagingServer)
+ public MessagingServerPacketHandler(final PacketDispatcher dispatcher, final ResourceManager resourceManager,
+ final PersistenceManager persistenceManager,
+ final PostOffice postOffice, final SecurityStore securityStore,
+ final ConnectionManager connectionManager)
{
- this.messagingServer = messagingServer;
+ this.dispatcher = dispatcher;
+
+ this.resourceManager = resourceManager;
+
+ this.persistenceManager = persistenceManager;
+
+ this.postOffice = postOffice;
+
+ this.securityStore = securityStore;
+
+ this.connectionManager = connectionManager;
}
/*
@@ -64,7 +92,7 @@
return ClientConnectionFactoryImpl.id;
}
- public Packet doHandle(Packet packet, PacketSender sender) throws Exception
+ public Packet doHandle(final Packet packet, final PacketSender sender) throws Exception
{
Packet response = null;
@@ -87,11 +115,9 @@
return response;
}
- private CreateConnectionResponse
- createConnection(String username,
- String password,
- String remotingSessionID, String clientVMID, int prefetchSize,
- String address)
+ private CreateConnectionResponse createConnection(final String username, final String password,
+ final String remotingClientSessionID, final String clientVMID, final int prefetchSize,
+ final String clientAddress)
throws Exception
{
log.trace("creating a new connection for user " + username);
@@ -101,39 +127,19 @@
// up thread local immediately after we used the information, otherwise some other people
// security may be screwed up, on account of thread local security stack being corrupted.
- messagingServer.getSecurityManager().authenticate(username, password);
+ securityStore.authenticate(username, password);
// We don't need the SubjectContext on thread local anymore, clean it up
SecurityActions.popSubjectContext();
- //Client ID is a JMS concept and does not belong on the server
-
-// String clientIDUsed = clientID;
-//
-// // see if there is a preconfigured client id for the user
-// if (username != null)
-// {
-// String preconfClientID =
-// messagingServer.getJmsUserManagerInstance().getPreConfiguredClientID(username);
-//
-// if (preconfClientID != null)
-// {
-// clientIDUsed = preconfClientID;
-// }
-// }
+ final ServerConnection connection =
+ new ServerConnectionEndpoint(username, password,
+ remotingClientSessionID, clientVMID, clientAddress,
+ prefetchSize, dispatcher, resourceManager, persistenceManager,
+ postOffice, securityStore, connectionManager);
- // create the corresponding "server-side" connection endpoint and register it with the
- // server peer's ClientManager
- final ServerConnectionEndpoint endpoint =
- new ServerConnectionEndpoint(messagingServer, username, password, prefetchSize,
- remotingSessionID, clientVMID, address);
+ dispatcher.register(new ServerConnectionPacketHandler(connection));
- String connectionID = endpoint.getConnectionID();
-
- messagingServer.getRemotingService().getDispatcher().register(endpoint.newHandler());
-
- log.trace("created and registered " + endpoint);
-
- return new CreateConnectionResponse(connectionID);
+ return new CreateConnectionResponse(connection.getID());
}
}
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerBrowserEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerBrowserEndpoint.java 2008-02-23 13:51:34 UTC (rev 3777)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerBrowserEndpoint.java 2008-02-24 12:15:29 UTC (rev 3778)
@@ -29,6 +29,7 @@
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
+import java.util.UUID;
import org.jboss.messaging.core.Filter;
import org.jboss.messaging.core.Message;
@@ -67,18 +68,19 @@
// Attributes -----------------------------------------------------------------------------------
private final String id;
- private final ServerSessionEndpoint session;
+ private final ServerSession session;
private final Queue destination;
private final Filter filter;
private Iterator iterator;
// Constructors ---------------------------------------------------------------------------------
- ServerBrowserEndpoint(ServerSessionEndpoint session, String id,
+ ServerBrowserEndpoint(ServerSession session,
Queue destination, String messageFilter) throws Exception
{
this.session = session;
- this.id = id;
+ id = UUID.randomUUID().toString();
+
this.destination = destination;
if (messageFilter != null)
@@ -93,6 +95,11 @@
// BrowserEndpoint implementation ---------------------------------------------------------------
+ public String getID()
+ {
+ return id;
+ }
+
public void reset() throws Exception
{
iterator = createIterator();
@@ -153,8 +160,6 @@
{
iterator = null;
- session.getConnectionEndpoint().getMessagingServer().getRemotingService().getDispatcher().unregister(id);
-
session.removeBrowser(id);
log.trace(this + " closed");
Added: trunk/src/main/org/jboss/jms/server/endpoint/ServerConnection.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConnection.java (rev 0)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConnection.java 2008-02-24 12:15:29 UTC (rev 3778)
@@ -0,0 +1,66 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.jms.server.endpoint;
+
+import org.jboss.jms.server.SecurityStore;
+import org.jboss.messaging.core.Queue;
+import org.jboss.messaging.core.remoting.PacketSender;
+import org.jboss.messaging.core.remoting.wireformat.ConnectionCreateSessionResponseMessage;
+
+/**
+ *
+ * A ServerConnection
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public interface ServerConnection
+{
+ String getID();
+
+ ConnectionCreateSessionResponseMessage createSession(boolean xa, boolean autoCommitSends, boolean autoCommitAcks,
+ PacketSender sender) throws Exception;
+
+ void start() throws Exception;
+
+ void stop() throws Exception;
+
+ void close() throws Exception;
+
+ SecurityStore getSecurityStore();
+
+ String getUsername();
+
+ String getPassword();
+
+ void removeSession(String sessionID) throws Exception;
+
+ void addTemporaryQueue(Queue queue);
+
+ void removeTemporaryQueue(Queue queue);
+
+ boolean isStarted();
+
+ long getCreatedTime();
+
+ String getClientAddress();
+}
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java 2008-02-23 13:51:34 UTC (rev 3777)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java 2008-02-24 12:15:29 UTC (rev 3778)
@@ -21,11 +21,6 @@
*/
package org.jboss.jms.server.endpoint;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.CLOSE;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.CONN_CREATESESSION;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.CONN_START;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.CONN_STOP;
-
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@@ -37,19 +32,16 @@
import org.jboss.jms.server.ConnectionManager;
import org.jboss.jms.server.SecurityStore;
import org.jboss.messaging.core.Binding;
-import org.jboss.messaging.core.MessagingServer;
+import org.jboss.messaging.core.PersistenceManager;
import org.jboss.messaging.core.PostOffice;
import org.jboss.messaging.core.Queue;
+import org.jboss.messaging.core.ResourceManager;
+import org.jboss.messaging.core.remoting.PacketDispatcher;
import org.jboss.messaging.core.remoting.PacketHandler;
import org.jboss.messaging.core.remoting.PacketSender;
-import org.jboss.messaging.core.remoting.wireformat.ConnectionCreateSessionMessage;
import org.jboss.messaging.core.remoting.wireformat.ConnectionCreateSessionResponseMessage;
-import org.jboss.messaging.core.remoting.wireformat.NullPacket;
-import org.jboss.messaging.core.remoting.wireformat.Packet;
-import org.jboss.messaging.core.remoting.wireformat.PacketType;
import org.jboss.messaging.util.ConcurrentHashSet;
import org.jboss.messaging.util.Logger;
-import org.jboss.messaging.util.MessagingException;
/**
* Concrete implementation of ConnectionEndpoint.
@@ -61,7 +53,7 @@
*
* $Id$
*/
-public class ServerConnectionEndpoint
+public class ServerConnectionEndpoint implements ServerConnection
{
// Constants ------------------------------------------------------------------------------------
@@ -75,87 +67,102 @@
private final String id;
- private volatile boolean started;
-
private final String username;
private final String password;
private final String remotingClientSessionID;
- private final String jmsClientVMID;
+ private final String clientAddress;
+
+ private final int prefetchSize;
- private final MessagingServer messagingServer;
-
+ private final PacketDispatcher dispatcher;
+
+ private final ResourceManager resourceManager;
+
+ private final PersistenceManager persistenceManager;
+
private final PostOffice postOffice;
- private final SecurityStore sm;
+ private final SecurityStore securityStore;
- private final ConnectionManager cm;
+ private final ConnectionManager connectionManager;
- private final ConcurrentMap<String, ServerSessionEndpoint> sessions = new ConcurrentHashMap<String, ServerSessionEndpoint>();
+ private final long createdTime;
+
+ private final ConcurrentMap<String, ServerSession> sessions = new ConcurrentHashMap<String, ServerSession>();
private final Set<Queue> temporaryQueues = new ConcurrentHashSet<Queue>();
-
- private final int prefetchSize;
-
- private String clientAddress;
- private long created;
-
+
+ private volatile boolean started;
+
+
// Constructors ---------------------------------------------------------------------------------
-
- public ServerConnectionEndpoint(MessagingServer messagingServer,
- String username, String password, int prefetchSize,
- String remotingSessionID,
- String clientVMID,
- String clientAddress) throws Exception
+
+ public ServerConnectionEndpoint(final String username, final String password,
+ final String remotingClientSessionID, final String jmsClientVMID,
+ final String clientAddress,
+ final int prefetchSize, final PacketDispatcher dispatcher,
+ final ResourceManager resourceManager,
+ final PersistenceManager persistenceManager,
+ final PostOffice postOffice, final SecurityStore securityStore,
+ final ConnectionManager connectionManager)
{
- this.messagingServer = messagingServer;
-
- sm = messagingServer.getSecurityManager();
- cm = messagingServer.getConnectionManager();
- postOffice = messagingServer.getPostOffice();
-
- started = false;
-
- this.id = UUID.randomUUID().toString();
+ id = UUID.randomUUID().toString();
- this.prefetchSize = prefetchSize;
-
- this.username = username;
+ this.username = username;
this.password = password;
+
+ this.remotingClientSessionID = remotingClientSessionID;
- this.remotingClientSessionID = remotingSessionID;
-
- this.jmsClientVMID = clientVMID;
-
this.clientAddress = clientAddress;
- created = System.currentTimeMillis();
+ this.prefetchSize = prefetchSize;
- cm.registerConnection(jmsClientVMID, remotingClientSessionID, this);
+ this.dispatcher = dispatcher;
+
+ this.resourceManager = resourceManager;
+
+ this.persistenceManager = persistenceManager;
+
+ this.postOffice = postOffice;
+
+ this.securityStore = securityStore;
+
+ this.connectionManager = connectionManager;
+
+ started = false;
+
+ createdTime = System.currentTimeMillis();
+
+ connectionManager.registerConnection(jmsClientVMID, remotingClientSessionID, this);
}
- // ConnectionDelegate implementation ------------------------------------------------------------
+ // ServerConnection implementation ------------------------------------------------------------
- public ConnectionCreateSessionResponseMessage createSession(boolean xa, boolean autoCommitSends, boolean autoCommitAcks,
- PacketSender sender)
- throws Exception
+ public String getID()
+ {
+ return id;
+ }
+
+ public ConnectionCreateSessionResponseMessage createSession(final boolean xa, final boolean autoCommitSends,
+ final boolean autoCommitAcks,
+ final PacketSender sender) throws Exception
{
- String sessionID = UUID.randomUUID().toString();
-
- ServerSessionEndpoint ep =
- new ServerSessionEndpoint(sessionID, this, autoCommitSends, autoCommitAcks, xa, sender, messagingServer.getResourceManager());
+ ServerSession session =
+ new ServerSessionEndpoint(autoCommitSends, autoCommitAcks, prefetchSize, xa, this, resourceManager,
+ sender, dispatcher, persistenceManager, postOffice);
synchronized (sessions)
{
- sessions.put(sessionID, ep);
+ sessions.put(session.getID(), session);
}
- messagingServer.getRemotingService().getDispatcher().register(ep.newHandler());
+ dispatcher.register(new ServerSessionPacketHandler(session, prefetchSize));
- return new ConnectionCreateSessionResponseMessage(sessionID);
+ return new ConnectionCreateSessionResponseMessage(session.getID());
}
public void start() throws Exception
@@ -170,9 +177,9 @@
public void close() throws Exception
{
- Map<String, ServerSessionEndpoint> sessionsClone = new HashMap<String, ServerSessionEndpoint>(sessions);
+ Map<String, ServerSession> sessionsClone = new HashMap<String, ServerSession>(sessions);
- for (ServerSessionEndpoint session: sessionsClone.values())
+ for (ServerSession session: sessionsClone.values())
{
session.close();
}
@@ -197,13 +204,16 @@
temporaryQueues.clear();
- cm.unregisterConnection(remotingClientSessionID, this);
+ connectionManager.unregisterConnection(remotingClientSessionID, this);
- messagingServer.getRemotingService().getDispatcher().unregister(id);
+ dispatcher.unregister(id);
}
-
- // Public ---------------------------------------------------------------------------------------
-
+
+ public SecurityStore getSecurityStore()
+ {
+ return securityStore;
+ }
+
public String getUsername()
{
return username;
@@ -213,152 +223,65 @@
{
return password;
}
-
- public long getCreated()
+
+ public void removeSession(final String sessionId) throws Exception
{
- return created;
+ if (sessions.remove(sessionId) == null)
+ {
+ throw new IllegalStateException("Cannot find session with id " + sessionId + " to remove");
+ }
}
- public String getClientAddress()
+ public void addTemporaryQueue(final Queue queue)
{
- return clientAddress;
+ temporaryQueues.add(queue);
}
-
- public SecurityStore getSecurityManager()
+
+ public void removeTemporaryQueue(final Queue queue)
{
- return sm;
+ temporaryQueues.remove(queue);
}
-
- public MessagingServer getMessagingServer()
+
+ public int getPrefetchSize()
{
- return messagingServer;
- }
-
- public PacketHandler newHandler()
- {
- return new ConnectionPacketHandler();
- }
-
- public String toString()
- {
- return "ConnectionEndpoint[" + id + "]";
- }
-
- // Package protected ----------------------------------------------------------------------------
-
- int getPrefetchSize()
- {
return prefetchSize;
}
-
- String getConnectionID()
- {
- return id;
- }
-
+
public boolean isStarted()
{
return started;
}
-
- void removeSession(String sessionId) throws Exception
+
+ public long getCreatedTime()
{
- if (sessions.remove(sessionId) == null)
- {
- throw new IllegalStateException("Cannot find session with id " + sessionId + " to remove");
- }
+ return createdTime;
}
- void addTemporaryQueue(Queue queue)
+ public String getClientAddress()
{
- temporaryQueues.add(queue);
+ return clientAddress;
}
-
- void removeTemporaryQueue(Queue queue)
- {
- temporaryQueues.remove(queue);
- }
- String getRemotingClientSessionID()
+ // Public ---------------------------------------------------------------------------------------
+
+ public String toString()
{
- return remotingClientSessionID;
+ return "ConnectionEndpoint[" + id + "]";
}
-
- // Protected ------------------------------------------------------------------------------------
// Private --------------------------------------------------------------------------------------
- private void setStarted(boolean started) throws Exception
+ private void setStarted(final boolean started) throws Exception
{
- Map<String, ServerSessionEndpoint> sessionsClone = null;
+ Map<String, ServerSession> sessionsClone = null;
- sessionsClone = new HashMap<String, ServerSessionEndpoint>(sessions);
+ sessionsClone = new HashMap<String, ServerSession>(sessions);
- for (ServerSessionEndpoint session: sessionsClone.values() )
+ for (ServerSession session: sessionsClone.values() )
{
session.setStarted(started);
}
this.started = started;
- }
-
- // Inner classes --------------------------------------------------------------------------------
-
- private class ConnectionPacketHandler extends ServerPacketHandlerSupport
- {
- public ConnectionPacketHandler()
- {
- }
-
- public String getID()
- {
- return ServerConnectionEndpoint.this.id;
- }
-
- public Packet doHandle(Packet packet, PacketSender sender) throws Exception
- {
- Packet response = null;
-
- PacketType type = packet.getType();
-
- if (type == CONN_CREATESESSION)
- {
- ConnectionCreateSessionMessage request = (ConnectionCreateSessionMessage) packet;
-
- response = createSession(request.isXA(), request.isAutoCommitSends(), request.isAutoCommitAcks(), sender);
- }
- else if (type == CONN_START)
- {
- start();
- }
- else if (type == CONN_STOP)
- {
- stop();
- }
- else if (type == CLOSE)
- {
- close();
- }
- else
- {
- throw new MessagingException(MessagingException.UNSUPPORTED_PACKET,
- "Unsupported packet " + type);
- }
-
- // reply if necessary
- if (response == null && packet.isOneWay() == false)
- {
- response = new NullPacket();
- }
-
- return response;
- }
-
- @Override
- public String toString()
- {
- return "ConnectionAdvisedPacketHandler[id=" + id + "]";
- }
- }
-
+ }
}
Added: trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionPacketHandler.java (rev 0)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionPacketHandler.java 2008-02-24 12:15:29 UTC (rev 3778)
@@ -0,0 +1,102 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.jms.server.endpoint;
+
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.CLOSE;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.CONN_CREATESESSION;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.CONN_START;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.CONN_STOP;
+
+import org.jboss.messaging.core.remoting.PacketSender;
+import org.jboss.messaging.core.remoting.wireformat.ConnectionCreateSessionMessage;
+import org.jboss.messaging.core.remoting.wireformat.NullPacket;
+import org.jboss.messaging.core.remoting.wireformat.Packet;
+import org.jboss.messaging.core.remoting.wireformat.PacketType;
+import org.jboss.messaging.util.MessagingException;
+
+/**
+ * Dispatches connection-level packets (CONN_CREATESESSION, CONN_START, CONN_STOP, CLOSE) to the
+ * wrapped ServerConnection; other packet types fail with MessagingException.UNSUPPORTED_PACKET.
+ * NOTE(review): toString() reports "ConnectionAdvisedPacketHandler", not this class's name - confirm.
+ *
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ */
+public class ServerConnectionPacketHandler extends ServerPacketHandlerSupport
+{
+ private final ServerConnection connection;
+
+ public ServerConnectionPacketHandler(final ServerConnection connection)
+ {
+ this.connection = connection;
+ }
+
+ public String getID()
+ {
+ return connection.getID();
+ }
+
+ public Packet doHandle(final Packet packet, final PacketSender sender) throws Exception
+ {
+ Packet response = null;
+
+ PacketType type = packet.getType();
+
+ if (type == CONN_CREATESESSION)
+ {
+ ConnectionCreateSessionMessage request = (ConnectionCreateSessionMessage) packet;
+
+ response = connection.createSession(request.isXA(), request.isAutoCommitSends(), request.isAutoCommitAcks(), sender);
+ }
+ else if (type == CONN_START)
+ {
+ connection.start();
+ }
+ else if (type == CONN_STOP)
+ {
+ connection.stop();
+ }
+ else if (type == CLOSE)
+ {
+ connection.close();
+ }
+ else
+ {
+ throw new MessagingException(MessagingException.UNSUPPORTED_PACKET,
+ "Unsupported packet " + type);
+ }
+
+ // every packet that is not one-way must be answered; use a NullPacket when nothing else was produced
+ if (response == null && packet.isOneWay() == false)
+ {
+ response = new NullPacket();
+ }
+
+ return response;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "ConnectionAdvisedPacketHandler[id=" + connection.getID() + "]";
+ }
+}
Added: trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumer.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumer.java (rev 0)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumer.java 2008-02-24 12:15:29 UTC (rev 3778)
@@ -0,0 +1,42 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.jms.server.endpoint;
+
+import org.jboss.messaging.core.Consumer;
+
+/**
+ * Server-side consumer contract used by sessions and packet handlers: a Consumer with an ID that
+ * can be closed, started or stopped via setStarted, and granted tokens via receiveTokens.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public interface ServerConsumer extends Consumer
+{
+ String getID();
+
+ void close() throws Exception;
+
+ void setStarted(boolean started) throws Exception;
+
+ void receiveTokens(int tokens) throws Exception;
+}
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2008-02-23 13:51:34 UTC (rev 3777)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2008-02-24 12:15:29 UTC (rev 3778)
@@ -21,27 +21,18 @@
*/
package org.jboss.jms.server.endpoint;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.CLOSE;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.CONS_FLOWTOKEN;
-
+import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
-import org.jboss.messaging.core.Consumer;
import org.jboss.messaging.core.Filter;
import org.jboss.messaging.core.HandleStatus;
import org.jboss.messaging.core.Message;
import org.jboss.messaging.core.MessageReference;
-import org.jboss.messaging.core.MessagingServer;
import org.jboss.messaging.core.PersistenceManager;
+import org.jboss.messaging.core.PostOffice;
import org.jboss.messaging.core.Queue;
import org.jboss.messaging.core.remoting.PacketHandler;
-import org.jboss.messaging.core.remoting.PacketSender;
-import org.jboss.messaging.core.remoting.wireformat.ConsumerFlowTokenMessage;
-import org.jboss.messaging.core.remoting.wireformat.NullPacket;
-import org.jboss.messaging.core.remoting.wireformat.Packet;
-import org.jboss.messaging.core.remoting.wireformat.PacketType;
import org.jboss.messaging.util.Logger;
-import org.jboss.messaging.util.MessagingException;
/**
* Concrete implementation of a ClientConsumer.
@@ -56,7 +47,7 @@
*
* @version <tt>$Revision$</tt> $Id$
*/
-public class ServerConsumerEndpoint implements Consumer
+public class ServerConsumerEndpoint implements ServerConsumer
{
// Constants ------------------------------------------------------------------------------------
@@ -66,65 +57,76 @@
// Attributes -----------------------------------------------------------------------------------
- private boolean trace = log.isTraceEnabled();
+ private final boolean trace = log.isTraceEnabled();
private final String id;
private final Queue messageQueue;
-
- private final ServerSessionEndpoint sessionEndpoint;
-
+
private final boolean noLocal;
private final Filter filter;
-
- private boolean started;
-
- // This lock protects starting and stopping
- private final Object startStopLock;
-
- private final AtomicInteger availableTokens = new AtomicInteger(0);
private final boolean autoDeleteQueue;
private final boolean enableFlowControl;
+ private final String connectionID;
+
+ private final ServerSession sessionEndpoint;
+
private final PersistenceManager persistenceManager;
+
+ private final PostOffice postOffice;
+
+ private final Object startStopLock = new Object();
+ private final AtomicInteger availableTokens = new AtomicInteger(0);
+
+ private boolean started;
+
// Constructors ---------------------------------------------------------------------------------
- ServerConsumerEndpoint(MessagingServer sp, String id, Queue messageQueue,
- ServerSessionEndpoint sessionEndpoint, Filter filter,
- boolean noLocal, boolean autoDeleteQueue, boolean enableFlowControl)
+ ServerConsumerEndpoint(final Queue messageQueue, final boolean noLocal, final Filter filter,
+ final boolean autoDeleteQueue, final boolean enableFlowControl,
+ final String connectionID, final ServerSession sessionEndpoint,
+ final PersistenceManager persistenceManager, final PostOffice postOffice,
+ final boolean started)
{
- this.id = id;
-
+ id = UUID.randomUUID().toString();
+
this.messageQueue = messageQueue;
-
- this.sessionEndpoint = sessionEndpoint;
-
+
this.noLocal = noLocal;
- this.startStopLock = new Object();
-
this.filter = filter;
-
- this.started = this.sessionEndpoint.getConnectionEndpoint().isStarted();
this.autoDeleteQueue = autoDeleteQueue;
this.enableFlowControl = enableFlowControl;
- this.persistenceManager = sessionEndpoint.getConnectionEndpoint().getMessagingServer().getPersistenceManager();
+ this.connectionID = connectionID;
+
+ this.sessionEndpoint = sessionEndpoint;
+
+ this.persistenceManager = persistenceManager;
- // adding the consumer to the queue
+ this.postOffice = postOffice;
+
+ this.started = started;
+
messageQueue.addConsumer(this);
messageQueue.deliver();
}
- // Receiver implementation ----------------------------------------------------------------------
+ // ServerConsumer implementation ----------------------------------------------------------------------
+ public String getID()
+ {
+ return id;
+ }
+
public HandleStatus handle(MessageReference ref) throws Exception
{
if (enableFlowControl && availableTokens.get() == 0)
@@ -161,11 +163,9 @@
{
String conId = message.getConnectionID();
- if (sessionEndpoint.getConnectionEndpoint().getConnectionID().equals(conId))
- {
- PersistenceManager pm = sessionEndpoint.getConnectionEndpoint().getMessagingServer().getPersistenceManager();
-
- ref.acknowledge(pm);
+ if (connectionID.equals(conId))
+ {
+ ref.acknowledge(persistenceManager);
return HandleStatus.HANDLED;
}
@@ -191,8 +191,6 @@
}
}
- // Closeable implementation ---------------------------------------------------------------------
-
public void close() throws Exception
{
if (trace)
@@ -204,56 +202,24 @@
messageQueue.removeConsumer(this);
- sessionEndpoint.getConnectionEndpoint().getMessagingServer().getRemotingService().getDispatcher().unregister(id);
-
if (autoDeleteQueue)
{
if (messageQueue.getConsumerCount() == 0)
- {
- MessagingServer server = sessionEndpoint.getConnectionEndpoint().getMessagingServer();
+ {
+ postOffice.removeBinding(messageQueue.getName());
- server.getPostOffice().removeBinding(messageQueue.getName());
-
if (messageQueue.isDurable())
{
- server.getPersistenceManager().deleteAllReferences(messageQueue);
+ persistenceManager.deleteAllReferences(messageQueue);
}
}
}
sessionEndpoint.removeConsumer(id);
}
-
- // ConsumerEndpoint implementation --------------------------------------------------------------
-
- public void receiveTokens(int tokens) throws Exception
- {
- availableTokens.addAndGet(tokens);
-
- promptDelivery();
- }
-
- // Public ---------------------------------------------------------------------------------------
-
- public String toString()
- {
- return "ConsumerEndpoint[" + id + "]";
- }
-
- public PacketHandler newHandler()
- {
- return new ServerConsumerEndpointPacketHandler();
- }
-
- // Package protected ----------------------------------------------------------------------------
- String getID()
+ public void setStarted(boolean started)
{
- return this.id;
- }
-
- void setStarted(boolean started)
- {
boolean useStarted;
synchronized (startStopLock)
@@ -269,61 +235,25 @@
promptDelivery();
}
}
-
- // Protected ------------------------------------------------------------------------------------
+
+ public void receiveTokens(int tokens) throws Exception
+ {
+ availableTokens.addAndGet(tokens);
- // Private --------------------------------------------------------------------------------------
+ promptDelivery();
+ }
- private void promptDelivery()
+ // Public -----------------------------------------------------------------------------
+
+ public String toString()
{
- sessionEndpoint.promptDelivery(messageQueue);
+ return "ConsumerEndpoint[" + id + "]";
}
-
- // Inner classes --------------------------------------------------------------------------------
- private class ServerConsumerEndpointPacketHandler extends ServerPacketHandlerSupport
- {
+ // Private --------------------------------------------------------------------------------------
- public String getID()
- {
- return ServerConsumerEndpoint.this.id;
- }
-
- public Packet doHandle(Packet packet, PacketSender sender) throws Exception
- {
- Packet response = null;
-
- PacketType type = packet.getType();
-
- if (type == CONS_FLOWTOKEN)
- {
- ConsumerFlowTokenMessage message = (ConsumerFlowTokenMessage) packet;
-
- receiveTokens(message.getTokens());
- }
- else if (type == CLOSE)
- {
- close();
- }
- else
- {
- throw new MessagingException(MessagingException.UNSUPPORTED_PACKET,
- "Unsupported packet " + type);
- }
-
- // reply if necessary
- if (response == null && packet.isOneWay() == false)
- {
- response = new NullPacket();
- }
-
- return response;
- }
-
- @Override
- public String toString()
- {
- return "ServerConsumerEndpointPacketHandler[id=" + id + "]";
- }
- }
+ private void promptDelivery()
+ {
+ sessionEndpoint.promptDelivery(messageQueue);
+ }
}
Added: trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerPacketHandler.java (rev 0)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerPacketHandler.java 2008-02-24 12:15:29 UTC (rev 3778)
@@ -0,0 +1,92 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.jms.server.endpoint;
+
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.CLOSE;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.CONS_FLOWTOKEN;
+
+import org.jboss.messaging.core.remoting.PacketSender;
+import org.jboss.messaging.core.remoting.wireformat.ConsumerFlowTokenMessage;
+import org.jboss.messaging.core.remoting.wireformat.NullPacket;
+import org.jboss.messaging.core.remoting.wireformat.Packet;
+import org.jboss.messaging.core.remoting.wireformat.PacketType;
+import org.jboss.messaging.util.MessagingException;
+
+/**
+ * Dispatches consumer-level packets to the wrapped ServerConsumer: CONS_FLOWTOKEN forwards the token
+ * count to receiveTokens, CLOSE closes it; other types fail with MessagingException.UNSUPPORTED_PACKET.
+ * NOTE(review): toString() reports "ServerConsumerEndpointPacketHandler", not this class's name - confirm.
+ *
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ */
+public class ServerConsumerPacketHandler extends ServerPacketHandlerSupport
+{
+ private final ServerConsumer consumer;
+
+ public ServerConsumerPacketHandler(final ServerConsumer consumer)
+ {
+ this.consumer = consumer;
+ }
+
+ public String getID()
+ {
+ return consumer.getID();
+ }
+
+ public Packet doHandle(final Packet packet, final PacketSender sender) throws Exception
+ {
+ Packet response = null;
+
+ PacketType type = packet.getType();
+
+ if (type == CONS_FLOWTOKEN)
+ {
+ ConsumerFlowTokenMessage message = (ConsumerFlowTokenMessage) packet;
+
+ consumer.receiveTokens(message.getTokens());
+ }
+ else if (type == CLOSE)
+ {
+ consumer.close();
+ }
+ else
+ {
+ throw new MessagingException(MessagingException.UNSUPPORTED_PACKET,
+ "Unsupported packet " + type);
+ }
+
+ // every packet that is not one-way must be answered; use a NullPacket when nothing else was produced
+ if (response == null && packet.isOneWay() == false)
+ {
+ response = new NullPacket();
+ }
+
+ return response;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "ServerConsumerEndpointPacketHandler[id=" + consumer.getID() + "]";
+ }
+}
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerPacketHandlerSupport.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerPacketHandlerSupport.java 2008-02-23 13:51:34 UTC (rev 3777)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerPacketHandlerSupport.java 2008-02-24 12:15:29 UTC (rev 3778)
@@ -1,3 +1,24 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
package org.jboss.jms.server.endpoint;
import org.jboss.logging.Logger;
Added: trunk/src/main/org/jboss/jms/server/endpoint/ServerSession.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerSession.java (rev 0)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerSession.java 2008-02-24 12:15:29 UTC (rev 3778)
@@ -0,0 +1,112 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.jms.server.endpoint;
+
+import java.util.List;
+
+import javax.transaction.xa.Xid;
+
+import org.jboss.messaging.core.Message;
+import org.jboss.messaging.core.MessageReference;
+import org.jboss.messaging.core.Queue;
+import org.jboss.messaging.core.remoting.wireformat.SessionBindingQueryMessage;
+import org.jboss.messaging.core.remoting.wireformat.SessionBindingQueryResponseMessage;
+import org.jboss.messaging.core.remoting.wireformat.SessionCreateBrowserResponseMessage;
+import org.jboss.messaging.core.remoting.wireformat.SessionCreateConsumerResponseMessage;
+import org.jboss.messaging.core.remoting.wireformat.SessionQueueQueryMessage;
+import org.jboss.messaging.core.remoting.wireformat.SessionQueueQueryResponseMessage;
+import org.jboss.messaging.core.remoting.wireformat.SessionXAResponseMessage;
+
+/**
+ * Server-side session contract: delivery, send/acknowledge/commit/rollback, XA operations, address
+ * and queue management, and creation of consumers and browsers. Implemented by ServerSessionEndpoint.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public interface ServerSession
+{
+ String getID();
+
+ void removeBrowser(String browserID) throws Exception;
+
+ void removeConsumer(String consumerID) throws Exception;
+
+ void close() throws Exception;
+
+ void setStarted(boolean started) throws Exception;
+
+ void handleDelivery(MessageReference reference, ServerConsumer consumer) throws Exception;
+
+ void promptDelivery(Queue queue);
+
+ boolean send(String address, Message msg) throws Exception;
+
+ void acknowledge(long deliveryID, boolean allUpTo) throws Exception;
+
+ void rollback() throws Exception;
+
+ void cancel(long deliveryID, boolean expired) throws Exception;
+
+ void commit() throws Exception;
+
+ SessionXAResponseMessage XACommit(boolean onePhase, Xid xid) throws Exception;
+
+ SessionXAResponseMessage XAEnd(Xid xid, boolean failed) throws Exception;
+
+ SessionXAResponseMessage XAForget(Xid xid);
+
+ SessionXAResponseMessage XAJoin(Xid xid) throws Exception;
+
+ SessionXAResponseMessage XAPrepare(Xid xid) throws Exception;
+
+ SessionXAResponseMessage XAResume(Xid xid) throws Exception;
+
+ SessionXAResponseMessage XARollback(Xid xid) throws Exception;
+
+ SessionXAResponseMessage XAStart(Xid xid);
+
+ SessionXAResponseMessage XASuspend() throws Exception;
+
+ List<Xid> getInDoubtXids() throws Exception;
+
+ int getXATimeout();
+
+ boolean setXATimeout(int timeoutSeconds);
+
+ void addAddress(String address) throws Exception;
+
+ void removeAddress(String address) throws Exception;
+
+ void createQueue(String address, String queueName, String filterString, boolean durable, boolean temporary) throws Exception;
+
+ void deleteQueue(String queueName) throws Exception;
+
+ SessionCreateConsumerResponseMessage createConsumer(String queueName, String filterString,
+ boolean noLocal, boolean autoDeleteQueue, int prefetchSize) throws Exception;
+
+ SessionQueueQueryResponseMessage executeQueueQuery(SessionQueueQueryMessage request) throws Exception;
+
+ SessionBindingQueryResponseMessage executeBindingQuery(SessionBindingQueryMessage request) throws Exception;
+
+ SessionCreateBrowserResponseMessage createBrowser(String queueName, String selector) throws Exception;
+}
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2008-02-23 13:51:34 UTC (rev 3777)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2008-02-24 12:15:29 UTC (rev 3778)
@@ -21,31 +21,6 @@
*/
package org.jboss.jms.server.endpoint;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.CLOSE;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_ACKNOWLEDGE;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_BINDINGQUERY;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_CANCEL;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_COMMIT;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_CREATEBROWSER;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_CREATECONSUMER;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_CREATEQUEUE;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_DELETE_QUEUE;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_QUEUEQUERY;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_ROLLBACK;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_SEND;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_XA_COMMIT;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_XA_END;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_XA_FORGET;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_XA_GET_TIMEOUT;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_XA_INDOUBT_XIDS;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_XA_JOIN;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_XA_PREPARE;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_XA_RESUME;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_XA_ROLLBACK;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_XA_SET_TIMEOUT;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_XA_START;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_XA_SUSPEND;
-
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
@@ -68,7 +43,7 @@
import org.jboss.messaging.core.Filter;
import org.jboss.messaging.core.Message;
import org.jboss.messaging.core.MessageReference;
-import org.jboss.messaging.core.MessagingServer;
+import org.jboss.messaging.core.PersistenceManager;
import org.jboss.messaging.core.PostOffice;
import org.jboss.messaging.core.Queue;
import org.jboss.messaging.core.ResourceManager;
@@ -76,39 +51,15 @@
import org.jboss.messaging.core.impl.DeliveryImpl;
import org.jboss.messaging.core.impl.TransactionImpl;
import org.jboss.messaging.core.impl.filter.FilterImpl;
-import org.jboss.messaging.core.remoting.PacketHandler;
+import org.jboss.messaging.core.remoting.PacketDispatcher;
import org.jboss.messaging.core.remoting.PacketSender;
-import org.jboss.messaging.core.remoting.wireformat.NullPacket;
-import org.jboss.messaging.core.remoting.wireformat.Packet;
-import org.jboss.messaging.core.remoting.wireformat.PacketType;
-import org.jboss.messaging.core.remoting.wireformat.SessionAcknowledgeMessage;
-import org.jboss.messaging.core.remoting.wireformat.SessionAddAddressMessage;
import org.jboss.messaging.core.remoting.wireformat.SessionBindingQueryMessage;
import org.jboss.messaging.core.remoting.wireformat.SessionBindingQueryResponseMessage;
-import org.jboss.messaging.core.remoting.wireformat.SessionCancelMessage;
-import org.jboss.messaging.core.remoting.wireformat.SessionCreateBrowserMessage;
import org.jboss.messaging.core.remoting.wireformat.SessionCreateBrowserResponseMessage;
-import org.jboss.messaging.core.remoting.wireformat.SessionCreateConsumerMessage;
import org.jboss.messaging.core.remoting.wireformat.SessionCreateConsumerResponseMessage;
-import org.jboss.messaging.core.remoting.wireformat.SessionCreateQueueMessage;
-import org.jboss.messaging.core.remoting.wireformat.SessionDeleteQueueMessage;
import org.jboss.messaging.core.remoting.wireformat.SessionQueueQueryMessage;
import org.jboss.messaging.core.remoting.wireformat.SessionQueueQueryResponseMessage;
-import org.jboss.messaging.core.remoting.wireformat.SessionRemoveAddressMessage;
-import org.jboss.messaging.core.remoting.wireformat.SessionSendMessage;
-import org.jboss.messaging.core.remoting.wireformat.SessionXACommitMessage;
-import org.jboss.messaging.core.remoting.wireformat.SessionXAEndMessage;
-import org.jboss.messaging.core.remoting.wireformat.SessionXAForgetMessage;
-import org.jboss.messaging.core.remoting.wireformat.SessionXAGetInDoubtXidsResponseMessage;
-import org.jboss.messaging.core.remoting.wireformat.SessionXAGetTimeoutResponseMessage;
-import org.jboss.messaging.core.remoting.wireformat.SessionXAJoinMessage;
-import org.jboss.messaging.core.remoting.wireformat.SessionXAPrepareMessage;
import org.jboss.messaging.core.remoting.wireformat.SessionXAResponseMessage;
-import org.jboss.messaging.core.remoting.wireformat.SessionXAResumeMessage;
-import org.jboss.messaging.core.remoting.wireformat.SessionXARollbackMessage;
-import org.jboss.messaging.core.remoting.wireformat.SessionXASetTimeoutMessage;
-import org.jboss.messaging.core.remoting.wireformat.SessionXASetTimeoutResponseMessage;
-import org.jboss.messaging.core.remoting.wireformat.SessionXAStartMessage;
import org.jboss.messaging.util.Logger;
import org.jboss.messaging.util.MessagingException;
@@ -126,13 +77,12 @@
*
* $Id$
*/
-public class ServerSessionEndpoint
+public class ServerSessionEndpoint implements ServerSession
{
// Constants
// ------------------------------------------------------------------------------------
- private static final Logger log = Logger
- .getLogger(ServerSessionEndpoint.class);
+ private static final Logger log = Logger.getLogger(ServerSessionEndpoint.class);
// Static
// ---------------------------------------------------------------------------------------
@@ -145,17 +95,27 @@
private final boolean trace = log.isTraceEnabled();
private final String id;
+
+ private final boolean autoCommitSends;
- private final ServerConnectionEndpoint connectionEndpoint;
+ private final boolean autoCommitAcks;
+
+ private final ServerConnection connection;
+
+ private final ResourceManager resourceManager;
- private final MessagingServer sp;
+ private final PacketSender sender;
+
+ private final PacketDispatcher dispatcher;
+
+ private final PersistenceManager persistenceManager;
+
+ private final PostOffice postOffice;
+
+ private final Map<String, ServerConsumer> consumers = new ConcurrentHashMap<String, ServerConsumer>();
- private final Map<String, ServerConsumerEndpoint> consumers = new ConcurrentHashMap<String, ServerConsumerEndpoint>();
-
private final Map<String, ServerBrowserEndpoint> browsers = new ConcurrentHashMap<String, ServerBrowserEndpoint>();
- private final PostOffice postOffice;
-
private final LinkedList<Delivery> deliveries = new LinkedList<Delivery>();
private long deliveryIDSequence = 0;
@@ -164,80 +124,79 @@
private Transaction tx;
- private final boolean autoCommitSends;
-
- private final boolean autoCommitAcks;
-
- private final ResourceManager resourceManager;
-
- private PacketSender sender;
-
// Constructors
// ---------------------------------------------------------------------------------
- ServerSessionEndpoint(String sessionID,
- ServerConnectionEndpoint connectionEndpoint, boolean autoCommitSends,
- boolean autoCommitAcks, boolean xa, PacketSender sender, ResourceManager resourceManager)
- throws Exception
+ ServerSessionEndpoint(final boolean autoCommitSends,
+ final boolean autoCommitAcks, final int prefetchSize,
+ final boolean xa, final ServerConnection connection,
+ final ResourceManager resourceManager, final PacketSender sender,
+ final PacketDispatcher dispatcher, final PersistenceManager persistenceManager,
+ final PostOffice postOffice) throws Exception
{
- this.id = sessionID;
+ id = UUID.randomUUID().toString();
+
+ this.autoCommitSends = autoCommitSends;
- this.connectionEndpoint = connectionEndpoint;
-
- sp = connectionEndpoint.getMessagingServer();
-
- postOffice = sp.getPostOffice();
-
+ this.autoCommitAcks = autoCommitAcks;
+
if (!xa)
{
tx = new TransactionImpl();
}
- this.autoCommitSends = autoCommitSends;
+ this.connection = connection;
- this.autoCommitAcks = autoCommitAcks;
-
- this.sender = sender;
-
this.resourceManager = resourceManager;
+
+ this.sender = sender;
+
+ this.dispatcher = dispatcher;
+ this.persistenceManager = persistenceManager;
+
+ this.postOffice = postOffice;
+
if (log.isTraceEnabled())
+ {
log.trace("created server session endpoint for " + sender.getRemoteAddress());
+ }
}
- // Public
+ // ServerSession implementation
// ---------------------------------------------------------------------------------------
-
- public ServerConnectionEndpoint getConnectionEndpoint()
+
+ public String getID()
{
- return connectionEndpoint;
+ return id;
}
-
- public String toString()
+
+ public ServerConnection getConnection()
{
- return "SessionEndpoint[" + id + "]";
+ return connection;
}
- // Package protected
- // ----------------------------------------------------------------------------
-
- void removeBrowser(String browserId) throws Exception
+ public void removeBrowser(final String browserId) throws Exception
{
if (browsers.remove(browserId) == null)
{
throw new IllegalStateException("Cannot find browser with id " + browserId + " to remove");
}
+
+ dispatcher.unregister(browserId);
}
- void removeConsumer(String consumerId) throws Exception
+ public void removeConsumer(final String consumerId) throws Exception
{
if (consumers.remove(consumerId) == null)
{
throw new IllegalStateException("Cannot find consumer with id " + consumerId + " to remove");
}
+
+ dispatcher.unregister(consumerId);
}
-
- synchronized void handleDelivery(MessageReference ref, ServerConsumerEndpoint consumer) throws Exception
+
+ public synchronized void handleDelivery(final MessageReference ref, final ServerConsumer consumer) throws Exception
{
Delivery delivery = new DeliveryImpl(ref, consumer.getID(), deliveryIDSequence++, sender);
@@ -245,34 +204,22 @@
delivery.deliver();
}
-
- void setStarted(boolean s) throws Exception
+
+ public void setStarted(final boolean s) throws Exception
{
- Map<String, ServerConsumerEndpoint> consumersClone = new HashMap<String, ServerConsumerEndpoint>(consumers);
+ Map<String, ServerConsumer> consumersClone = new HashMap<String, ServerConsumer>(consumers);
- for (ServerConsumerEndpoint consumer: consumersClone.values())
+ for (ServerConsumer consumer: consumersClone.values())
{
consumer.setStarted(s);
}
}
- void promptDelivery(final Queue queue)
- {
- // TODO - do we really need to prompt on a different thread?
- executor.execute(new Runnable()
- {
- public void run()
- {
- queue.deliver();
- }
- });
- }
-
public void close() throws Exception
{
- Map<String, ServerConsumerEndpoint> consumersClone = new HashMap<String, ServerConsumerEndpoint>(consumers);
+ Map<String, ServerConsumer> consumersClone = new HashMap<String, ServerConsumer>(consumers);
- for (ServerConsumerEndpoint consumer: consumersClone.values())
+ for (ServerConsumer consumer: consumersClone.values())
{
consumer.close();
}
@@ -296,20 +243,32 @@
deliveries.clear();
- connectionEndpoint.removeSession(id);
+ connection.removeSession(id);
- connectionEndpoint.getMessagingServer().getRemotingService()
- .getDispatcher().unregister(id);
+ dispatcher.unregister(id);
}
-
- private boolean send(String address, Message msg) throws Exception
+
+ public void promptDelivery(final Queue queue)
{
+ // TODO - do we really need to prompt on a different thread?
+ executor.execute(new Runnable()
+ {
+ public void run()
+ {
+ queue.deliver();
+ }
+ });
+ }
+
+ public boolean send(final String address, final Message msg) throws Exception
+ {
//check the address exists, if it doesnt add if the user has the correct privileges
- if(!postOffice.containsAllowableAddress(address))
+ if (!postOffice.containsAllowableAddress(address))
{
try
{
- security.check(address, CheckType.CREATE, getConnectionEndpoint());
+ security.check(address, CheckType.CREATE, connection);
+
postOffice.addAllowableAddress(address);
}
catch (MessagingException e)
@@ -318,15 +277,15 @@
}
}
//check the user has write access to this address
- security.check(address, CheckType.WRITE, getConnectionEndpoint());
+ security.check(address, CheckType.WRITE, connection);
// Assign the message an internal id - this is used to key it in the store
- msg.setMessageID(sp.getPersistenceManager().generateMessageID());
+ msg.setMessageID(persistenceManager.generateMessageID());
// This allows the no-local consumers to filter out the messages that come
// from the same
// connection.
- msg.setConnectionID(connectionEndpoint.getConnectionID());
+ msg.setConnectionID(connection.getID());
postOffice.route(address, msg);
@@ -342,7 +301,7 @@
{
if (msg.getNumDurableReferences() != 0)
{
- sp.getPersistenceManager().addMessage(msg);
+ persistenceManager.addMessage(msg);
}
msg.send();
@@ -356,8 +315,7 @@
}
}
- private synchronized void acknowledge(long deliveryID, boolean allUpTo)
- throws Exception
+ public synchronized void acknowledge(final long deliveryID, final boolean allUpTo) throws Exception
{
// Note that we do not consider it an error if the deliveries cannot be
// found to be acked.
@@ -391,7 +349,7 @@
if (autoCommitAcks)
{
- ref.acknowledge(sp.getPersistenceManager());
+ ref.acknowledge(persistenceManager);
}
else
{
@@ -429,7 +387,7 @@
if (autoCommitAcks)
{
- ref.acknowledge(sp.getPersistenceManager());
+ ref.acknowledge(persistenceManager);
}
else
{
@@ -445,7 +403,7 @@
}
}
- private void rollback() throws Exception
+ public void rollback() throws Exception
{
if (tx == null)
{
@@ -471,10 +429,10 @@
deliveryIDSequence -= tx.getAcknowledgementsCount();
}
- tx.rollback(sp.getPersistenceManager());
+ tx.rollback(persistenceManager);
}
- private void cancel(long deliveryID, boolean expired) throws Exception
+ public void cancel(final long deliveryID, final boolean expired) throws Exception
{
if (deliveryID == -1)
{
@@ -494,7 +452,7 @@
deliveries.clear();
}
- cancelTx.rollback(sp.getPersistenceManager());
+ cancelTx.rollback(persistenceManager);
}
else if (expired)
{
@@ -511,7 +469,7 @@
if (delivery.getDeliveryID() == deliveryID)
{
- delivery.getReference().expire(sp.getPersistenceManager());
+ delivery.getReference().expire(persistenceManager);
iter.remove();
@@ -525,13 +483,12 @@
}
}
- private void commit() throws Exception
+ public void commit() throws Exception
{
- tx.commit(true, sp.getPersistenceManager());
+ tx.commit(true, persistenceManager);
}
- private SessionXAResponseMessage XACommit(boolean onePhase, Xid xid)
- throws Exception
+ public SessionXAResponseMessage XACommit(final boolean onePhase, final Xid xid) throws Exception
{
if (tx != null)
{
@@ -554,7 +511,7 @@
XAException.XAER_PROTO,
"Cannot commit transaction, it is suspended " + xid); }
- theTx.commit(onePhase, sp.getPersistenceManager());
+ theTx.commit(onePhase, persistenceManager);
boolean removed = resourceManager.removeTransaction(xid);
@@ -568,7 +525,7 @@
return new SessionXAResponseMessage(false, XAResource.XA_OK, null);
}
- private SessionXAResponseMessage XAEnd(Xid xid, boolean failed) throws Exception
+ public SessionXAResponseMessage XAEnd(final Xid xid, final boolean failed) throws Exception
{
if (tx != null && tx.getXid().equals(xid))
{
@@ -610,7 +567,7 @@
return new SessionXAResponseMessage(false, XAResource.XA_OK, null);
}
- private SessionXAResponseMessage XAForget(Xid xid)
+ public SessionXAResponseMessage XAForget(final Xid xid)
{
// Do nothing since we don't support heuristic commits / rollback from the
// resource manager
@@ -618,7 +575,7 @@
return new SessionXAResponseMessage(false, XAResource.XA_OK, null);
}
- private SessionXAResponseMessage XAJoin(Xid xid) throws Exception
+ public SessionXAResponseMessage XAJoin(final Xid xid) throws Exception
{
Transaction theTx = resourceManager.getTransaction(xid);
@@ -637,7 +594,7 @@
return new SessionXAResponseMessage(false, XAResource.XA_OK, null);
}
- private SessionXAResponseMessage XAPrepare(Xid xid) throws Exception
+ public SessionXAResponseMessage XAPrepare(final Xid xid) throws Exception
{
if (tx != null)
{
@@ -677,13 +634,13 @@
}
else
{
- theTx.prepare(sp.getPersistenceManager());
+ theTx.prepare(persistenceManager);
return new SessionXAResponseMessage(false, XAResource.XA_OK, null);
}
}
- private SessionXAResponseMessage XAResume(Xid xid) throws Exception
+ public SessionXAResponseMessage XAResume(final Xid xid) throws Exception
{
if (tx != null)
{
@@ -713,7 +670,7 @@
return new SessionXAResponseMessage(false, XAResource.XA_OK, null);
}
- private SessionXAResponseMessage XARollback(Xid xid) throws Exception
+ public SessionXAResponseMessage XARollback(final Xid xid) throws Exception
{
if (tx != null)
{
@@ -736,7 +693,7 @@
XAException.XAER_PROTO,
"Cannot rollback transaction, it is suspended " + xid); }
- theTx.rollback(sp.getPersistenceManager());
+ theTx.rollback(persistenceManager);
boolean removed = resourceManager.removeTransaction(xid);
@@ -750,7 +707,7 @@
return new SessionXAResponseMessage(false, XAResource.XA_OK, null);
}
- private SessionXAResponseMessage XAStart(Xid xid)
+ public SessionXAResponseMessage XAStart(final Xid xid)
{
if (tx != null)
{
@@ -775,7 +732,7 @@
return new SessionXAResponseMessage(false, XAResource.XA_OK, null);
}
- private SessionXAResponseMessage XASuspend() throws Exception
+ public SessionXAResponseMessage XASuspend() throws Exception
{
if (tx == null)
{
@@ -800,38 +757,32 @@
return new SessionXAResponseMessage(false, XAResource.XA_OK, null);
}
- private List<Xid> getInDoubtXids() throws Exception
+ public List<Xid> getInDoubtXids() throws Exception
{
return null;
}
- private int getXATimeout()
+ public int getXATimeout()
{
return resourceManager.getTimeoutSeconds();
}
- private boolean setXATimeout(int timeoutSeconds)
+ public boolean setXATimeout(int timeoutSeconds)
{
return resourceManager.setTimeoutSeconds(timeoutSeconds);
}
- // Protected
- // ------------------------------------------------------------------------------------
-
- // Private
- // --------------------------------------------------------------------------------------
-
- private void addAddress(String address) throws Exception
+ public void addAddress(final String address) throws Exception
{
if (postOffice.containsAllowableAddress(address))
{
throw new MessagingException(MessagingException.ADDRESS_EXISTS, "Address already exists: " + address);
}
- security.check(address, CheckType.CREATE, getConnectionEndpoint());
+ security.check(address, CheckType.CREATE, connection);
postOffice.addAllowableAddress(address);
}
- private void removeAddress(String address) throws Exception
+ public void removeAddress(final String address) throws Exception
{
if (!postOffice.removeAllowableAddress(address))
{
@@ -839,16 +790,15 @@
}
}
- private void createQueue(String address, String queueName,
- String filterString, boolean durable, boolean temporary)
- throws Exception
+ public void createQueue(final String address, final String queueName,
+ final String filterString, boolean durable, final boolean temporary) throws Exception
{
//make sure the user has privileges to create this address
- if(!postOffice.containsAllowableAddress(address))
+ if (!postOffice.containsAllowableAddress(address))
{
try
{
- security.check(address, CheckType.CREATE, getConnectionEndpoint());
+ security.check(address, CheckType.CREATE, connection);
postOffice.addAllowableAddress(address);
}
catch (MessagingException e)
@@ -882,11 +832,11 @@
{
Queue queue = binding.getQueue();
- connectionEndpoint.addTemporaryQueue(queue);
+ connection.addTemporaryQueue(queue);
}
}
- private void deleteQueue(String queueName) throws Exception
+ public void deleteQueue(final String queueName) throws Exception
{
Binding binding = postOffice.removeBinding(queueName);
@@ -899,17 +849,18 @@
if (queue.isDurable())
{
- sp.getPersistenceManager().deleteAllReferences(binding.getQueue());
+ persistenceManager.deleteAllReferences(binding.getQueue());
}
if (queue.isTemporary())
{
- connectionEndpoint.removeTemporaryQueue(queue);
+ connection.removeTemporaryQueue(queue);
}
}
- private SessionCreateConsumerResponseMessage createConsumer(String queueName, String filterString,
- boolean noLocal, boolean autoDeleteQueue) throws Exception
+ public SessionCreateConsumerResponseMessage
+ createConsumer(final String queueName, final String filterString,
+ final boolean noLocal, final boolean autoDeleteQueue, final int prefetchSize) throws Exception
{
Binding binding = postOffice.getBinding(queueName);
@@ -917,11 +868,9 @@
{
throw new MessagingException(MessagingException.QUEUE_DOES_NOT_EXIST);
}
- security.check(binding.getAddress(), CheckType.READ, getConnectionEndpoint());
- int prefetchSize = connectionEndpoint.getPrefetchSize();
-
- String consumerID = UUID.randomUUID().toString();
-
+
+ security.check(binding.getAddress(), CheckType.READ, connection);
+
Filter filter = null;
if (filterString != null)
@@ -929,26 +878,26 @@
filter = new FilterImpl(filterString);
}
- ServerConsumerEndpoint ep = new ServerConsumerEndpoint(sp, consumerID,
- binding.getQueue(), this, filter, noLocal, autoDeleteQueue, prefetchSize > 0);
+ ServerConsumer consumer =
+ new ServerConsumerEndpoint(binding.getQueue(), noLocal, filter, autoDeleteQueue, prefetchSize > 0, connection.getID(),
+ this, persistenceManager, postOffice, connection.isStarted());
- connectionEndpoint.getMessagingServer().getRemotingService()
- .getDispatcher().register(ep.newHandler());
+ dispatcher.register(new ServerConsumerPacketHandler(consumer));
- SessionCreateConsumerResponseMessage response = new SessionCreateConsumerResponseMessage(consumerID,
+ SessionCreateConsumerResponseMessage response = new SessionCreateConsumerResponseMessage(consumer.getID(),
prefetchSize);
synchronized (consumers)
{
- consumers.put(consumerID, ep);
+ consumers.put(consumer.getID(), consumer);
}
- log.trace(this + " created and registered " + ep);
+ log.trace(this + " created and registered " + consumer);
return response;
}
- public SessionQueueQueryResponseMessage executeQueueQuery(SessionQueueQueryMessage request) throws Exception
+ public SessionQueueQueryResponseMessage executeQueueQuery(final SessionQueueQueryMessage request) throws Exception
{
if (request.getQueueName() == null)
{
@@ -979,7 +928,7 @@
return response;
}
- public SessionBindingQueryResponseMessage executeBindingQuery(SessionBindingQueryMessage request) throws Exception
+ public SessionBindingQueryResponseMessage executeBindingQuery(final SessionBindingQueryMessage request) throws Exception
{
if (request.getAddress() == null)
{
@@ -1003,14 +952,15 @@
return new SessionBindingQueryResponseMessage(exists, queueNames);
}
- private SessionCreateBrowserResponseMessage createBrowser(String queueName, String selector)
+ public SessionCreateBrowserResponseMessage createBrowser(final String queueName, final String selector)
throws Exception
{
if(!postOffice.containsAllowableAddress(queueName))
{
try
{
- security.check(queueName, CheckType.CREATE, this.getConnectionEndpoint());
+ security.check(queueName, CheckType.CREATE, connection);
+
postOffice.addAllowableAddress(queueName);
}
catch (MessagingException e)
@@ -1024,224 +974,30 @@
{
throw new MessagingException(MessagingException.QUEUE_DOES_NOT_EXIST);
}
- security.check(binding.getAddress(), CheckType.READ, this.getConnectionEndpoint());
- String browserID = UUID.randomUUID().toString();
+ security.check(binding.getAddress(), CheckType.READ, connection);
+
+ ServerBrowserEndpoint browser = new ServerBrowserEndpoint(this, binding.getQueue(), selector);
- ServerBrowserEndpoint ep = new ServerBrowserEndpoint(this, browserID,
- binding.getQueue(), selector);
-
// still need to synchronized since close() can come in on a different
// thread
synchronized (browsers)
{
- browsers.put(browserID, ep);
+ browsers.put(browser.getID(), browser);
}
- connectionEndpoint.getMessagingServer().getRemotingService()
- .getDispatcher().register(ep.newHandler());
+ dispatcher.register(browser.newHandler());
- log.trace(this + " created and registered " + ep);
+ log.trace(this + " created and registered " + browser);
- return new SessionCreateBrowserResponseMessage(browserID);
+ return new SessionCreateBrowserResponseMessage(browser.getID());
}
-
- public PacketHandler newHandler()
+
+ // Public ---------------------------------------------------------------------------------------------
+
+ public String toString()
{
- return new ServerSessionEndpointPacketHandler();
+ return "SessionEndpoint[" + id + "]";
}
+
- // Inner classes
- // --------------------------------------------------------------------------------
-
- private class ServerSessionEndpointPacketHandler extends ServerPacketHandlerSupport
- {
- public ServerSessionEndpointPacketHandler()
- {
- }
-
- public String getID()
- {
- return ServerSessionEndpoint.this.id;
- }
-
- public Packet doHandle(Packet packet, PacketSender sender)
- throws Exception
- {
- Packet response = null;
-
- PacketType type = packet.getType();
-
- // TODO use a switch for this
- if (type == SESS_SEND)
- {
- SessionSendMessage message = (SessionSendMessage) packet;
-
- send(message.getAddress(), message.getMessage());
- }
- else if (type == SESS_CREATECONSUMER)
- {
- SessionCreateConsumerMessage request = (SessionCreateConsumerMessage) packet;
-
- response = createConsumer(request.getQueueName(), request
- .getFilterString(), request.isNoLocal(), request.isAutoDeleteQueue());
- }
- else if (type == SESS_CREATEQUEUE)
- {
- SessionCreateQueueMessage request = (SessionCreateQueueMessage) packet;
-
- createQueue(request.getAddress(), request.getQueueName(), request
- .getFilterString(), request.isDurable(), request
- .isTemporary());
- }
- else if (type == SESS_DELETE_QUEUE)
- {
- SessionDeleteQueueMessage request = (SessionDeleteQueueMessage) packet;
-
- deleteQueue(request.getQueueName());
- }
- else if (type == SESS_QUEUEQUERY)
- {
- SessionQueueQueryMessage request = (SessionQueueQueryMessage) packet;
-
- response = executeQueueQuery(request);
- }
- else if (type == SESS_BINDINGQUERY)
- {
- SessionBindingQueryMessage request = (SessionBindingQueryMessage)packet;
-
- response = executeBindingQuery(request);
- }
- else if (type == SESS_CREATEBROWSER)
- {
- SessionCreateBrowserMessage request = (SessionCreateBrowserMessage) packet;
-
- response = createBrowser(request.getQueueName(), request
- .getFilterString());
- }
- else if (type == CLOSE)
- {
- close();
- }
- else if (type == SESS_ACKNOWLEDGE)
- {
- SessionAcknowledgeMessage message = (SessionAcknowledgeMessage) packet;
-
- acknowledge(message.getDeliveryID(), message.isAllUpTo());
- }
- else if (type == SESS_COMMIT)
- {
- commit();
- }
- else if (type == SESS_ROLLBACK)
- {
- rollback();
- }
- else if (type == SESS_CANCEL)
- {
- SessionCancelMessage message = (SessionCancelMessage) packet;
-
- cancel(message.getDeliveryID(), message.isExpired());
- }
- else if (type == SESS_XA_COMMIT)
- {
- SessionXACommitMessage message = (SessionXACommitMessage) packet;
-
- response = XACommit(message.isOnePhase(), message.getXid());
- }
- else if (type == SESS_XA_END)
- {
- SessionXAEndMessage message = (SessionXAEndMessage) packet;
-
- response = XAEnd(message.getXid(), message.isFailed());
- }
- else if (type == SESS_XA_FORGET)
- {
- SessionXAForgetMessage message = (SessionXAForgetMessage) packet;
-
- response = XAForget(message.getXid());
- }
- else if (type == SESS_XA_JOIN)
- {
- SessionXAJoinMessage message = (SessionXAJoinMessage) packet;
-
- response = XAJoin(message.getXid());
- }
- else if (type == SESS_XA_RESUME)
- {
- SessionXAResumeMessage message = (SessionXAResumeMessage) packet;
-
- response = XAResume(message.getXid());
- }
- else if (type == SESS_XA_ROLLBACK)
- {
- SessionXARollbackMessage message = (SessionXARollbackMessage) packet;
-
- response = XARollback(message.getXid());
- }
- else if (type == SESS_XA_START)
- {
- SessionXAStartMessage message = (SessionXAStartMessage) packet;
-
- response = XAStart(message.getXid());
- }
- else if (type == SESS_XA_SUSPEND)
- {
- response = XASuspend();
- }
- else if (type == SESS_XA_PREPARE)
- {
- SessionXAPrepareMessage message = (SessionXAPrepareMessage) packet;
-
- response = XAPrepare(message.getXid());
- }
- else if (type == SESS_XA_INDOUBT_XIDS)
- {
- List<Xid> xids = getInDoubtXids();
-
- response = new SessionXAGetInDoubtXidsResponseMessage(xids);
- }
- else if (type == SESS_XA_GET_TIMEOUT)
- {
- response = new SessionXAGetTimeoutResponseMessage(getXATimeout());
- }
- else if (type == SESS_XA_SET_TIMEOUT)
- {
- SessionXASetTimeoutMessage message = (SessionXASetTimeoutMessage) packet;
-
- response = new SessionXASetTimeoutResponseMessage(setXATimeout(message
- .getTimeoutSeconds()));
- }
- else if (type == PacketType.SESS_ADD_ADDRESS)
- {
- SessionAddAddressMessage message = (SessionAddAddressMessage) packet;
-
- addAddress(message.getAddress());
- }
- else if (type == PacketType.SESS_REMOVE_ADDRESS)
- {
- SessionRemoveAddressMessage message = (SessionRemoveAddressMessage) packet;
-
- removeAddress(message.getAddress());
- }
- else
- {
- throw new MessagingException(MessagingException.UNSUPPORTED_PACKET, "Unsupported packet " + type);
- }
-
- // reply if necessary
- if (response == null && packet.isOneWay() == false)
- {
- response = new NullPacket();
- }
-
- return response;
- }
-
- @Override
- public String toString()
- {
- return "ServerSessionEndpointPacketHandler[id=" + id + "]";
- }
- }
-
}
Added: trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionPacketHandler.java (rev 0)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionPacketHandler.java 2008-02-24 12:15:29 UTC (rev 3778)
@@ -0,0 +1,286 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.jms.server.endpoint;
+
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.CLOSE;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_ACKNOWLEDGE;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_BINDINGQUERY;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_CANCEL;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_COMMIT;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_CREATEBROWSER;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_CREATECONSUMER;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_CREATEQUEUE;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_DELETE_QUEUE;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_QUEUEQUERY;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_ROLLBACK;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_SEND;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_XA_COMMIT;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_XA_END;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_XA_FORGET;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_XA_GET_TIMEOUT;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_XA_INDOUBT_XIDS;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_XA_JOIN;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_XA_PREPARE;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_XA_RESUME;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_XA_ROLLBACK;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_XA_SET_TIMEOUT;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_XA_START;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_XA_SUSPEND;
+
+import java.util.List;
+
+import javax.transaction.xa.Xid;
+
+import org.jboss.messaging.core.remoting.PacketSender;
+import org.jboss.messaging.core.remoting.wireformat.NullPacket;
+import org.jboss.messaging.core.remoting.wireformat.Packet;
+import org.jboss.messaging.core.remoting.wireformat.PacketType;
+import org.jboss.messaging.core.remoting.wireformat.SessionAcknowledgeMessage;
+import org.jboss.messaging.core.remoting.wireformat.SessionAddAddressMessage;
+import org.jboss.messaging.core.remoting.wireformat.SessionBindingQueryMessage;
+import org.jboss.messaging.core.remoting.wireformat.SessionCancelMessage;
+import org.jboss.messaging.core.remoting.wireformat.SessionCreateBrowserMessage;
+import org.jboss.messaging.core.remoting.wireformat.SessionCreateConsumerMessage;
+import org.jboss.messaging.core.remoting.wireformat.SessionCreateQueueMessage;
+import org.jboss.messaging.core.remoting.wireformat.SessionDeleteQueueMessage;
+import org.jboss.messaging.core.remoting.wireformat.SessionQueueQueryMessage;
+import org.jboss.messaging.core.remoting.wireformat.SessionRemoveAddressMessage;
+import org.jboss.messaging.core.remoting.wireformat.SessionSendMessage;
+import org.jboss.messaging.core.remoting.wireformat.SessionXACommitMessage;
+import org.jboss.messaging.core.remoting.wireformat.SessionXAEndMessage;
+import org.jboss.messaging.core.remoting.wireformat.SessionXAForgetMessage;
+import org.jboss.messaging.core.remoting.wireformat.SessionXAGetInDoubtXidsResponseMessage;
+import org.jboss.messaging.core.remoting.wireformat.SessionXAGetTimeoutResponseMessage;
+import org.jboss.messaging.core.remoting.wireformat.SessionXAJoinMessage;
+import org.jboss.messaging.core.remoting.wireformat.SessionXAPrepareMessage;
+import org.jboss.messaging.core.remoting.wireformat.SessionXAResumeMessage;
+import org.jboss.messaging.core.remoting.wireformat.SessionXARollbackMessage;
+import org.jboss.messaging.core.remoting.wireformat.SessionXASetTimeoutMessage;
+import org.jboss.messaging.core.remoting.wireformat.SessionXASetTimeoutResponseMessage;
+import org.jboss.messaging.core.remoting.wireformat.SessionXAStartMessage;
+import org.jboss.messaging.util.MessagingException;
+
+
+/**
+ *
+ * A ServerSessionPacketHandler
+ *
+ * Handles the packets addressed to a single {@link ServerSession}: doHandle
+ * reads the packet type, casts the packet to the matching wire-format message
+ * class and calls the corresponding method on the session.
+ *
+ * For createConsumer, the queue and binding queries, createBrowser and the XA
+ * commit/end/forget/join/resume/rollback/start/suspend/prepare requests, the
+ * value returned by the session call is used as the response packet. For the
+ * in-doubt Xid request and the XA get/set timeout requests, the session's
+ * result is wrapped in the corresponding response message here. All other
+ * requests set no response of their own.
+ *
+ * When no response was set, doHandle returns a NullPacket if the packet is not
+ * one-way, and null if it is one-way.
+ *
+ * A packet type that matches none of the branches in doHandle is rejected with
+ * a MessagingException carrying MessagingException.UNSUPPORTED_PACKET.
+ *
+ * getID() returns the ID of the wrapped session. The prefetchSize supplied to
+ * the constructor is passed to every session.createConsumer call. The
+ * PacketSender argument of doHandle is not used by this class.
+ *
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public class ServerSessionPacketHandler extends ServerPacketHandlerSupport
+{
+ private final ServerSession session;
+
+ private final int prefetchSize;
+
+ public ServerSessionPacketHandler(final ServerSession session, final int prefetchSize)
+ {
+ this.session = session;
+
+ this.prefetchSize = prefetchSize;
+ }
+
+ public String getID()
+ {
+ return session.getID();
+ }
+
+ public Packet doHandle(final Packet packet, final PacketSender sender) throws Exception
+ {
+ Packet response = null;
+
+ PacketType type = packet.getType();
+
+ // TODO use a switch for this; for now each supported packet type is tested in turn
+ if (type == SESS_SEND)
+ {
+ SessionSendMessage message = (SessionSendMessage) packet;
+
+ session.send(message.getAddress(), message.getMessage());
+ }
+ else if (type == SESS_CREATECONSUMER)
+ {
+ SessionCreateConsumerMessage request = (SessionCreateConsumerMessage) packet;
+
+ response = session.createConsumer(request.getQueueName(), request
+ .getFilterString(), request.isNoLocal(), request.isAutoDeleteQueue(), prefetchSize);
+ }
+ else if (type == SESS_CREATEQUEUE)
+ {
+ SessionCreateQueueMessage request = (SessionCreateQueueMessage) packet;
+
+ session.createQueue(request.getAddress(), request.getQueueName(), request
+ .getFilterString(), request.isDurable(), request
+ .isTemporary());
+ }
+ else if (type == SESS_DELETE_QUEUE)
+ {
+ SessionDeleteQueueMessage request = (SessionDeleteQueueMessage) packet;
+
+ session.deleteQueue(request.getQueueName());
+ }
+ else if (type == SESS_QUEUEQUERY)
+ {
+ SessionQueueQueryMessage request = (SessionQueueQueryMessage) packet;
+
+ response = session.executeQueueQuery(request);
+ }
+ else if (type == SESS_BINDINGQUERY)
+ {
+ SessionBindingQueryMessage request = (SessionBindingQueryMessage)packet;
+
+ response = session.executeBindingQuery(request);
+ }
+ else if (type == SESS_CREATEBROWSER)
+ {
+ SessionCreateBrowserMessage request = (SessionCreateBrowserMessage) packet;
+
+ response = session.createBrowser(request.getQueueName(), request
+ .getFilterString());
+ }
+ else if (type == CLOSE)
+ {
+ session.close();
+ }
+ else if (type == SESS_ACKNOWLEDGE)
+ {
+ SessionAcknowledgeMessage message = (SessionAcknowledgeMessage) packet;
+
+ session.acknowledge(message.getDeliveryID(), message.isAllUpTo());
+ }
+ else if (type == SESS_COMMIT)
+ {
+ session.commit();
+ }
+ else if (type == SESS_ROLLBACK)
+ {
+ session.rollback();
+ }
+ else if (type == SESS_CANCEL)
+ {
+ SessionCancelMessage message = (SessionCancelMessage) packet;
+
+ session.cancel(message.getDeliveryID(), message.isExpired());
+ }
+ else if (type == SESS_XA_COMMIT)
+ {
+ SessionXACommitMessage message = (SessionXACommitMessage) packet;
+
+ response = session.XACommit(message.isOnePhase(), message.getXid());
+ }
+ else if (type == SESS_XA_END)
+ {
+ SessionXAEndMessage message = (SessionXAEndMessage) packet;
+
+ response = session.XAEnd(message.getXid(), message.isFailed());
+ }
+ else if (type == SESS_XA_FORGET)
+ {
+ SessionXAForgetMessage message = (SessionXAForgetMessage) packet;
+
+ response = session.XAForget(message.getXid());
+ }
+ else if (type == SESS_XA_JOIN)
+ {
+ SessionXAJoinMessage message = (SessionXAJoinMessage) packet;
+
+ response = session.XAJoin(message.getXid());
+ }
+ else if (type == SESS_XA_RESUME)
+ {
+ SessionXAResumeMessage message = (SessionXAResumeMessage) packet;
+
+ response = session.XAResume(message.getXid());
+ }
+ else if (type == SESS_XA_ROLLBACK)
+ {
+ SessionXARollbackMessage message = (SessionXARollbackMessage) packet;
+
+ response = session.XARollback(message.getXid());
+ }
+ else if (type == SESS_XA_START)
+ {
+ SessionXAStartMessage message = (SessionXAStartMessage) packet;
+
+ response = session.XAStart(message.getXid());
+ }
+ else if (type == SESS_XA_SUSPEND)
+ {
+ response = session.XASuspend();
+ }
+ else if (type == SESS_XA_PREPARE)
+ {
+ SessionXAPrepareMessage message = (SessionXAPrepareMessage) packet;
+
+ response = session.XAPrepare(message.getXid());
+ }
+ else if (type == SESS_XA_INDOUBT_XIDS)
+ {
+ List<Xid> xids = session.getInDoubtXids();
+
+ response = new SessionXAGetInDoubtXidsResponseMessage(xids);
+ }
+ else if (type == SESS_XA_GET_TIMEOUT)
+ {
+ response = new SessionXAGetTimeoutResponseMessage(session.getXATimeout());
+ }
+ else if (type == SESS_XA_SET_TIMEOUT)
+ {
+ SessionXASetTimeoutMessage message = (SessionXASetTimeoutMessage) packet;
+
+ response = new SessionXASetTimeoutResponseMessage(session.setXATimeout(message
+ .getTimeoutSeconds()));
+ }
+ else if (type == PacketType.SESS_ADD_ADDRESS)
+ {
+ SessionAddAddressMessage message = (SessionAddAddressMessage) packet;
+
+ session.addAddress(message.getAddress());
+ }
+ else if (type == PacketType.SESS_REMOVE_ADDRESS)
+ {
+ SessionRemoveAddressMessage message = (SessionRemoveAddressMessage) packet;
+
+ session.removeAddress(message.getAddress());
+ }
+ else
+ {
+ throw new MessagingException(MessagingException.UNSUPPORTED_PACKET, "Unsupported packet " + type);
+ }
+
+ // reply if necessary: a packet that is not one-way and has no response so far is answered with a NullPacket
+ if (response == null && packet.isOneWay() == false)
+ {
+ response = new NullPacket();
+ }
+
+ return response;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "ServerSessionPacketHandler[id=" + session.getID() + "]";
+ }
+}
Modified: trunk/src/main/org/jboss/messaging/core/MessagingServer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/MessagingServer.java 2008-02-23 13:51:34 UTC (rev 3777)
+++ trunk/src/main/org/jboss/messaging/core/MessagingServer.java 2008-02-24 12:15:29 UTC (rev 3778)
@@ -21,11 +21,9 @@
*/
package org.jboss.messaging.core;
-
import java.util.HashSet;
import org.jboss.jms.server.ConnectionManager;
-import org.jboss.jms.server.SecurityStore;
import org.jboss.jms.server.security.Role;
import org.jboss.messaging.core.remoting.RemotingService;
import org.jboss.messaging.util.HierarchicalRepository;
@@ -63,20 +61,14 @@
RemotingService getRemotingService();
- SecurityStore getSecurityManager();
-
ConnectionManager getConnectionManager();
- MemoryManager getMemoryManager();
-
PersistenceManager getPersistenceManager();
void setPersistenceManager(PersistenceManager persistenceManager);
PostOffice getPostOffice();
- ResourceManager getResourceManager();
-
HierarchicalRepository<HashSet<Role>> getSecurityRepository();
void setPostOffice(PostOffice postOffice);
Modified: trunk/src/main/org/jboss/messaging/core/MessagingServerManagement.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/MessagingServerManagement.java 2008-02-23 13:51:34 UTC (rev 3777)
+++ trunk/src/main/org/jboss/messaging/core/MessagingServerManagement.java 2008-02-24 12:15:29 UTC (rev 3778)
@@ -21,14 +21,14 @@
*/
package org.jboss.messaging.core;
-import org.jboss.messaging.core.impl.messagecounter.MessageCounter;
-import org.jboss.messaging.core.impl.filter.FilterImpl;
-import org.jboss.jms.client.api.ClientConnectionFactory;
-import org.jboss.jms.server.endpoint.ServerConnectionEndpoint;
-
-import java.util.List;
import java.util.Collection;
+import java.util.List;
+import org.jboss.jms.client.api.ClientConnectionFactory;
+import org.jboss.jms.server.endpoint.ServerConnection;
+import org.jboss.messaging.core.impl.filter.FilterImpl;
+import org.jboss.messaging.core.impl.messagecounter.MessageCounter;
+
/**
* This interface describes the management interface exposed by the server
*
@@ -37,10 +37,6 @@
*/
public interface MessagingServerManagement
{
-// String getServerVersion();
-//
-// Configuration getConfiguration();
-//
boolean isStarted();
void createQueue(String address,String name) throws Exception;
@@ -93,7 +89,7 @@
public int getConsumerCountForQueue(String queue) throws Exception;
- List<ServerConnectionEndpoint> getActiveConnections();
+ List<ServerConnection> getActiveConnections();
void moveMessages(String toQueue, String fromQueue, FilterImpl filter) throws Exception;
}
Modified: trunk/src/main/org/jboss/messaging/core/impl/server/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/server/MessagingServerImpl.java 2008-02-23 13:51:34 UTC (rev 3777)
+++ trunk/src/main/org/jboss/messaging/core/impl/server/MessagingServerImpl.java 2008-02-24 12:15:29 UTC (rev 3778)
@@ -30,7 +30,6 @@
import org.jboss.aop.microcontainer.aspects.jmx.JMX;
import org.jboss.jms.server.ConnectionManager;
-import org.jboss.jms.server.SecurityStore;
import org.jboss.jms.server.connectionmanager.SimpleConnectionManager;
import org.jboss.jms.server.endpoint.MessagingServerPacketHandler;
import org.jboss.jms.server.security.NullAuthenticationManager;
@@ -203,7 +202,13 @@
remotingService.addFailureListener(connectionManager);
memoryManager.start();
postOffice.start();
- MessagingServerPacketHandler serverPacketHandler = new MessagingServerPacketHandler(this);
+
+ MessagingServerPacketHandler serverPacketHandler =
+ new MessagingServerPacketHandler(remotingService.getDispatcher(), resourceManager,
+ persistenceManager, postOffice, securityStore,
+ connectionManager);
+
+
getRemotingService().getDispatcher().register(serverPacketHandler);
ClassLoader loader = Thread.currentThread().getContextClassLoader();
@@ -318,20 +323,20 @@
public void createQueue(String address, String name) throws Exception
{
- if (getPostOffice().getBinding(name) == null)
+ if (postOffice.getBinding(name) == null)
{
- getPostOffice().addBinding(address, name, null, true, false);
+ postOffice.addBinding(address, name, null, true, false);
}
- if (!getPostOffice().containsAllowableAddress(address))
+ if (!postOffice.containsAllowableAddress(address))
{
- getPostOffice().addAllowableAddress(address);
+ postOffice.addAllowableAddress(address);
}
}
public boolean destroyQueuesByAddress(String address) throws Exception
{
- List<Binding> bindings = getPostOffice().getBindingsForAddress(address);
+ List<Binding> bindings = postOffice.getBindingsForAddress(address);
boolean destroyed = false;
@@ -339,23 +344,23 @@
{
Queue queue = binding.getQueue();
- getPersistenceManager().deleteAllReferences(queue);
+ persistenceManager.deleteAllReferences(queue);
queue.removeAllReferences();
- getPostOffice().removeBinding(queue.getName());
+ postOffice.removeBinding(queue.getName());
destroyed = true;
}
- getPostOffice().removeAllowableAddress(address);
+ postOffice.removeAllowableAddress(address);
return destroyed;
}
public boolean destroyQueue(String name) throws Exception
{
- Binding binding = getPostOffice().getBinding(name);
+ Binding binding = postOffice.getBinding(name);
boolean destroyed = false;
@@ -363,11 +368,11 @@
{
Queue queue = binding.getQueue();
- getPersistenceManager().deleteAllReferences(queue);
+ persistenceManager.deleteAllReferences(queue);
queue.removeAllReferences();
- getPostOffice().removeBinding(queue.getName());
+ postOffice.removeBinding(queue.getName());
destroyed = true;
}
@@ -377,9 +382,9 @@
public boolean addAddress(String address)
{
- if (!getPostOffice().containsAllowableAddress(address))
+ if (!postOffice.containsAllowableAddress(address))
{
- getPostOffice().addAllowableAddress(address);
+ postOffice.addAllowableAddress(address);
return true;
}
return false;
@@ -387,9 +392,9 @@
public boolean removeAddress(String address)
{
- if (getPostOffice().containsAllowableAddress(address))
+ if (postOffice.containsAllowableAddress(address))
{
- getPostOffice().removeAllowableAddress(address);
+ postOffice.removeAllowableAddress(address);
return true;
}
return false;
@@ -451,21 +456,11 @@
}
}
- public SecurityStore getSecurityManager()
- {
- return securityStore;
- }
-
public ConnectionManager getConnectionManager()
{
return connectionManager;
}
- public MemoryManager getMemoryManager()
- {
- return memoryManager;
- }
-
public PersistenceManager getPersistenceManager()
{
return persistenceManager;
@@ -486,11 +481,6 @@
this.postOffice = postOffice;
}
- public ResourceManager getResourceManager()
- {
- return resourceManager;
- }
-
public HierarchicalRepository<HashSet<Role>> getSecurityRepository()
{
return securityRepository;
@@ -524,7 +514,7 @@
{
- List<Binding> bindings = getPostOffice().getBindingsForAddress(address);
+ List<Binding> bindings = postOffice.getBindingsForAddress(address);
boolean destroyed = false;
@@ -532,16 +522,16 @@
{
Queue queue = binding.getQueue();
- getPersistenceManager().deleteAllReferences(queue);
+ persistenceManager.deleteAllReferences(queue);
queue.removeAllReferences();
- getPostOffice().removeBinding(queue.getName());
+ postOffice.removeBinding(queue.getName());
destroyed = true;
}
- getPostOffice().removeAllowableAddress(address);
+ postOffice.removeAllowableAddress(address);
return destroyed;
}
Modified: trunk/src/main/org/jboss/messaging/core/impl/server/MessagingServerManagementImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/server/MessagingServerManagementImpl.java 2008-02-23 13:51:34 UTC (rev 3777)
+++ trunk/src/main/org/jboss/messaging/core/impl/server/MessagingServerManagementImpl.java 2008-02-24 12:15:29 UTC (rev 3778)
@@ -21,21 +21,30 @@
*/
package org.jboss.messaging.core.impl.server;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.ScheduledFuture;
import org.jboss.aop.microcontainer.aspects.jmx.JMX;
import org.jboss.jms.client.api.ClientConnectionFactory;
import org.jboss.jms.client.impl.ClientConnectionFactoryImpl;
-import org.jboss.jms.client.SelectorTranslator;
-import org.jboss.jms.server.endpoint.ServerConnectionEndpoint;
-import org.jboss.messaging.core.*;
+import org.jboss.jms.server.endpoint.ServerConnection;
+import org.jboss.messaging.core.Binding;
+import org.jboss.messaging.core.Filter;
+import org.jboss.messaging.core.Message;
+import org.jboss.messaging.core.MessageReference;
+import org.jboss.messaging.core.MessagingComponent;
+import org.jboss.messaging.core.MessagingServer;
+import org.jboss.messaging.core.MessagingServerManagement;
import org.jboss.messaging.core.Queue;
+import org.jboss.messaging.core.impl.filter.FilterImpl;
import org.jboss.messaging.core.impl.messagecounter.MessageCounter;
-import org.jboss.messaging.core.impl.filter.FilterImpl;
import org.jboss.messaging.util.MessagingException;
/**
@@ -325,7 +334,7 @@
return getQueue(queue).getConsumerCount();
}
- public List<ServerConnectionEndpoint> getActiveConnections()
+ public List<ServerConnection> getActiveConnections()
{
return messagingServer.getConnectionManager().getActiveConnections();
}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/PacketDispatcher.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/PacketDispatcher.java 2008-02-23 13:51:34 UTC (rev 3777)
+++ trunk/src/main/org/jboss/messaging/core/remoting/PacketDispatcher.java 2008-02-24 12:15:29 UTC (rev 3778)
@@ -102,7 +102,8 @@
callFilters(packet);
handler.handle(packet, sender);
- } else
+ }
+ else
{
log.error("Unhandled packet " + packet);
}
Modified: trunk/tests/src/org/jboss/test/messaging/jms/AcknowledgementTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/AcknowledgementTest.java 2008-02-23 13:51:34 UTC (rev 3777)
+++ trunk/tests/src/org/jboss/test/messaging/jms/AcknowledgementTest.java 2008-02-24 12:15:29 UTC (rev 3778)
@@ -651,8 +651,6 @@
assertRemainingMessages(NUM_MESSAGES);
- log.info("Sent messages");
-
int count = 0;
Message m = null;
@@ -662,8 +660,6 @@
m = consumer.receive(200);
- log.info("got message " + m);
-
assertRemainingMessages(NUM_MESSAGES - (i + 1));
if (m == null) break;
@@ -703,11 +699,9 @@
{
final int BATCH_SIZE = 10;
- log.info("*********** DEPLOYING CF");
ArrayList<String> bindings = new ArrayList<String>();
bindings.add("mycf");
deployConnectionFactory(null,"mycf", bindings, -1, -1, -1, -1, false, false, false, BATCH_SIZE);
- log.info("************ DONE DEPLOY");
Connection conn = null;
try
More information about the jboss-cvs-commits
mailing list