[jboss-cvs] JBoss Messaging SVN: r3783 - in trunk: src/main/org/jboss/jms/client/api and 6 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Feb 25 07:15:14 EST 2008
Author: timfox
Date: 2008-02-25 07:15:14 -0500 (Mon, 25 Feb 2008)
New Revision: 3783
Added:
trunk/src/main/org/jboss/jms/server/endpoint/ServerProducer.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerProducerEndpoint.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerProducerPacketHandler.java
trunk/src/main/org/jboss/messaging/core/remoting/codec/SessionCreateProducerMessageCodec.java
trunk/src/main/org/jboss/messaging/core/remoting/codec/SessionCreateProducerResponseMessageCodec.java
trunk/src/main/org/jboss/messaging/core/remoting/wireformat/ProducerSendMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionCreateProducerMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionCreateProducerResponseMessage.java
Removed:
trunk/src/main/org/jboss/messaging/core/remoting/codec/SessionSendMessageCodec.java
trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionSendMessage.java
Modified:
trunk/src/main/org/jboss/jms/client/JBossConnection.java
trunk/src/main/org/jboss/jms/client/JBossSession.java
trunk/src/main/org/jboss/jms/client/api/ClientConnection.java
trunk/src/main/org/jboss/jms/client/api/ClientProducer.java
trunk/src/main/org/jboss/jms/client/api/ClientSession.java
trunk/src/main/org/jboss/jms/client/impl/ClientConnectionImpl.java
trunk/src/main/org/jboss/jms/client/impl/ClientProducerImpl.java
trunk/src/main/org/jboss/jms/client/impl/ClientSessionImpl.java
trunk/src/main/org/jboss/jms/client/impl/ClientSessionInternal.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerSession.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionPacketHandler.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/PacketCodecFactory.java
trunk/src/main/org/jboss/messaging/core/remoting/wireformat/ConnectionCreateSessionMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/wireformat/ConnectionCreateSessionResponseMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/wireformat/ConsumerFlowTokenMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/wireformat/CreateConnectionRequest.java
trunk/src/main/org/jboss/messaging/core/remoting/wireformat/CreateConnectionResponse.java
trunk/src/main/org/jboss/messaging/core/remoting/wireformat/DeliverMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/wireformat/MessagingExceptionMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/wireformat/PacketType.java
trunk/src/main/org/jboss/messaging/core/remoting/wireformat/Ping.java
trunk/src/main/org/jboss/messaging/core/remoting/wireformat/Pong.java
trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionAcknowledgeMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionAddAddressMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionBindingQueryMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionBindingQueryResponseMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionBrowserHasNextMessageMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionBrowserHasNextMessageResponseMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionBrowserNextMessageBlockMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionBrowserNextMessageBlockResponseMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionBrowserNextMessageResponseMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionBrowserResetMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionCancelMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionCreateBrowserMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionCreateBrowserResponseMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionCreateConsumerMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionCreateConsumerResponseMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionCreateQueueMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionDeleteQueueMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionQueueQueryMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionQueueQueryResponseMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionRemoveAddressMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionXACommitMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionXAEndMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionXAForgetMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionXAGetInDoubtXidsResponseMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionXAGetTimeoutResponseMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionXAJoinMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionXAPrepareMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionXAResponseMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionXAResumeMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionXARollbackMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionXASetTimeoutMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionXASetTimeoutResponseMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionXAStartMessage.java
trunk/tests/src/org/jboss/messaging/core/remoting/wireformat/test/unit/PacketTypeTest.java
Log:
Various tweaks plus create producer endpoint in preparation for producer flow control, also added producer caching
Modified: trunk/src/main/org/jboss/jms/client/JBossConnection.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/JBossConnection.java 2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/jms/client/JBossConnection.java 2008-02-25 12:15:14 UTC (rev 3783)
@@ -115,9 +115,9 @@
public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException
{
- return createSessionInternal(transacted, acknowledgeMode, false, TYPE_GENERIC_CONNECTION);
+ return createSessionInternal(transacted, acknowledgeMode, false, TYPE_GENERIC_CONNECTION, false);
}
-
+
public String getClientID() throws JMSException
{
checkClosed();
@@ -263,7 +263,7 @@
int acknowledgeMode) throws JMSException
{
return createSessionInternal(transacted, acknowledgeMode, false,
- JBossSession.TYPE_QUEUE_SESSION);
+ JBossSession.TYPE_QUEUE_SESSION, false);
}
public ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector,
@@ -281,9 +281,9 @@
int acknowledgeMode) throws JMSException
{
return createSessionInternal(transacted, acknowledgeMode, false,
- JBossSession.TYPE_TOPIC_SESSION);
+ JBossSession.TYPE_TOPIC_SESSION, false);
}
-
+
public ConnectionConsumer createConnectionConsumer(Topic topic, String messageSelector,
ServerSessionPool sessionPool,
int maxMessages) throws JMSException
@@ -298,29 +298,63 @@
public XASession createXASession() throws JMSException
{
return createSessionInternal(true, Session.SESSION_TRANSACTED, true,
- JBossSession.TYPE_GENERIC_SESSION);
+ JBossSession.TYPE_GENERIC_SESSION, false);
}
-
+
// XAQueueConnection implementation -------------------------------------------------------------
public XAQueueSession createXAQueueSession() throws JMSException
{
return createSessionInternal(true, Session.SESSION_TRANSACTED, true,
- JBossSession.TYPE_QUEUE_SESSION);
+ JBossSession.TYPE_QUEUE_SESSION, false);
}
+
// XATopicConnection implementation -------------------------------------------------------------
public XATopicSession createXATopicSession() throws JMSException
{
return createSessionInternal(true, Session.SESSION_TRANSACTED, true,
- JBossSession.TYPE_TOPIC_SESSION);
+ JBossSession.TYPE_TOPIC_SESSION, false);
}
-
+
// Public ---------------------------------------------------------------------------------------
+ // We provide some overloaded createSession methods to allow the value of cacheProducers to be specified
+
+ public Session createSession(boolean transacted, int acknowledgeMode, boolean cacheProducers) throws JMSException
+ {
+ return createSessionInternal(transacted, acknowledgeMode, false, TYPE_GENERIC_CONNECTION, cacheProducers);
+ }
+
+ public TopicSession createTopicSession(boolean transacted,
+ int acknowledgeMode, boolean cacheProducers) throws JMSException
+ {
+ return createSessionInternal(transacted, acknowledgeMode, false,
+ JBossSession.TYPE_TOPIC_SESSION, cacheProducers);
+ }
+
+ public XASession createXASession(boolean cacheProducers) throws JMSException
+ {
+ return createSessionInternal(true, Session.SESSION_TRANSACTED, true,
+ JBossSession.TYPE_GENERIC_SESSION, cacheProducers);
+ }
+
+ public XAQueueSession createXAQueueSession(boolean cacheProducers) throws JMSException
+ {
+ return createSessionInternal(true, Session.SESSION_TRANSACTED, true,
+ JBossSession.TYPE_QUEUE_SESSION, cacheProducers);
+ }
+
+ public XATopicSession createXATopicSession(boolean cacheProducers) throws JMSException
+ {
+ return createSessionInternal(true, Session.SESSION_TRANSACTED, true,
+ JBossSession.TYPE_TOPIC_SESSION, cacheProducers);
+ }
+
+
public ClientConnection getConnection()
{
return connection;
@@ -342,7 +376,7 @@
// Protected ------------------------------------------------------------------------------------
protected JBossSession createSessionInternal(boolean transacted, int acknowledgeMode,
- boolean isXA, int type) throws JMSException
+ boolean isXA, int type, boolean cacheProducers) throws JMSException
{
if (transacted)
{
@@ -387,7 +421,8 @@
}
}
- ClientSession session = connection.createClientSession(isXA, autoCommitSends, autoCommitAcks, ackBatchSize);
+ ClientSession session =
+ connection.createClientSession(isXA, autoCommitSends, autoCommitAcks, ackBatchSize, cacheProducers);
justCreated = false;
Modified: trunk/src/main/org/jboss/jms/client/JBossSession.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/JBossSession.java 2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/jms/client/JBossSession.java 2008-02-25 12:15:14 UTC (rev 3783)
@@ -343,11 +343,13 @@
throw new InvalidDestinationException("Not a JBoss Destination:" + d);
}
+ JBossDestination jbd = (JBossDestination)d;
+
try
{
- ClientProducer producer = session.createProducer();
+ ClientProducer producer = session.createProducer(jbd == null ? null : jbd.getAddress());
- return new JBossMessageProducer(producer, (JBossDestination)d);
+ return new JBossMessageProducer(producer, jbd);
}
catch (MessagingException e)
{
Modified: trunk/src/main/org/jboss/jms/client/api/ClientConnection.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/api/ClientConnection.java 2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/jms/client/api/ClientConnection.java 2008-02-25 12:15:14 UTC (rev 3783)
@@ -16,7 +16,7 @@
public interface ClientConnection
{
ClientSession createClientSession(boolean xa, boolean autoCommitSends, boolean autoCommitAcks,
- int ackBatchSize) throws MessagingException;
+ int ackBatchSize, boolean cacheProducers) throws MessagingException;
void start() throws MessagingException;
Modified: trunk/src/main/org/jboss/jms/client/api/ClientProducer.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/api/ClientProducer.java 2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/jms/client/api/ClientProducer.java 2008-02-25 12:15:14 UTC (rev 3783)
@@ -15,6 +15,8 @@
*/
public interface ClientProducer
{
+ String getAddress();
+
void send(String address, Message message) throws MessagingException;
void registerAcknowledgementHandler(AcknowledgementHandler handler);
Modified: trunk/src/main/org/jboss/jms/client/api/ClientSession.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/api/ClientSession.java 2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/jms/client/api/ClientSession.java 2008-02-25 12:15:14 UTC (rev 3783)
@@ -38,7 +38,7 @@
ClientBrowser createBrowser(String queueName, String messageSelector) throws MessagingException;
- ClientProducer createProducer() throws MessagingException;
+ ClientProducer createProducer(String address) throws MessagingException;
XAResource getXAResource();
Modified: trunk/src/main/org/jboss/jms/client/impl/ClientConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/impl/ClientConnectionImpl.java 2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/jms/client/impl/ClientConnectionImpl.java 2008-02-25 12:15:14 UTC (rev 3783)
@@ -90,7 +90,7 @@
// ClientConnection implementation --------------------------------------------------------------
public ClientSession createClientSession(boolean xa, boolean autoCommitSends, boolean autoCommitAcks,
- int ackBatchSize) throws MessagingException
+ int ackBatchSize, boolean cacheProducers) throws MessagingException
{
checkClosed();
@@ -98,7 +98,7 @@
ConnectionCreateSessionResponseMessage response = (ConnectionCreateSessionResponseMessage)remotingConnection.send(id, request);
- ClientSession session = new ClientSessionImpl(this, response.getSessionID(), ackBatchSize);
+ ClientSession session = new ClientSessionImpl(this, response.getSessionID(), ackBatchSize, cacheProducers);
children.put(response.getSessionID(), session);
Modified: trunk/src/main/org/jboss/jms/client/impl/ClientProducerImpl.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/impl/ClientProducerImpl.java 2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/jms/client/impl/ClientProducerImpl.java 2008-02-25 12:15:14 UTC (rev 3783)
@@ -23,7 +23,9 @@
import org.jboss.jms.client.api.AcknowledgementHandler;
import org.jboss.jms.client.api.ClientProducer;
+import org.jboss.jms.client.remoting.RemotingConnection;
import org.jboss.messaging.core.Message;
+import org.jboss.messaging.core.remoting.wireformat.ProducerSendMessage;
import org.jboss.messaging.util.Logger;
import org.jboss.messaging.util.MessagingException;
@@ -47,26 +49,46 @@
private boolean trace = log.isTraceEnabled();
- private ClientSessionInternal session;
+ private final String address;
+ private final String id;
+
+ private final ClientSessionInternal session;
+
+ private final RemotingConnection remotingConnection;
+
private volatile boolean closed;
// Static ---------------------------------------------------------------------------------------
// Constructors ---------------------------------------------------------------------------------
- public ClientProducerImpl(ClientSessionInternal session)
- {
+ public ClientProducerImpl(final ClientSessionInternal session, final String id, final String address,
+ final RemotingConnection remotingConnection)
+ {
this.session = session;
+
+ this.id = id;
+
+ this.address = address;
+
+ this.remotingConnection = remotingConnection;
}
// ClientProducer implementation ----------------------------------------------------------------
- public void send(String address, Message message) throws MessagingException
+ public String getAddress()
{
+ return address;
+ }
+
+ public void send(String address, Message msg) throws MessagingException
+ {
checkClosed();
- session.send(address, message);
+ ProducerSendMessage message = new ProducerSendMessage(address, msg.copy());
+
+ remotingConnection.send(id, message, !msg.isDurable());
}
public void registerAcknowledgementHandler(AcknowledgementHandler handler)
Modified: trunk/src/main/org/jboss/jms/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/impl/ClientSessionImpl.java 2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/jms/client/impl/ClientSessionImpl.java 2008-02-25 12:15:14 UTC (rev 3783)
@@ -38,7 +38,6 @@
import org.jboss.jms.client.api.ClientConsumer;
import org.jboss.jms.client.api.ClientProducer;
import org.jboss.jms.client.remoting.RemotingConnection;
-import org.jboss.messaging.core.Message;
import org.jboss.messaging.core.remoting.PacketDispatcher;
import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
import org.jboss.messaging.core.remoting.wireformat.CloseMessage;
@@ -53,13 +52,14 @@
import org.jboss.messaging.core.remoting.wireformat.SessionCreateBrowserResponseMessage;
import org.jboss.messaging.core.remoting.wireformat.SessionCreateConsumerMessage;
import org.jboss.messaging.core.remoting.wireformat.SessionCreateConsumerResponseMessage;
+import org.jboss.messaging.core.remoting.wireformat.SessionCreateProducerMessage;
+import org.jboss.messaging.core.remoting.wireformat.SessionCreateProducerResponseMessage;
import org.jboss.messaging.core.remoting.wireformat.SessionCreateQueueMessage;
import org.jboss.messaging.core.remoting.wireformat.SessionDeleteQueueMessage;
import org.jboss.messaging.core.remoting.wireformat.SessionQueueQueryMessage;
import org.jboss.messaging.core.remoting.wireformat.SessionQueueQueryResponseMessage;
import org.jboss.messaging.core.remoting.wireformat.SessionRemoveAddressMessage;
import org.jboss.messaging.core.remoting.wireformat.SessionRollbackMessage;
-import org.jboss.messaging.core.remoting.wireformat.SessionSendMessage;
import org.jboss.messaging.core.remoting.wireformat.SessionXACommitMessage;
import org.jboss.messaging.core.remoting.wireformat.SessionXAEndMessage;
import org.jboss.messaging.core.remoting.wireformat.SessionXAForgetMessage;
@@ -99,10 +99,16 @@
// Attributes -----------------------------------------------------------------------------------
- private String id;
+ private final ClientConnectionInternal connection;
+
+ private final String id;
- private int lazyAckBatchSize;
+ private final int lazyAckBatchSize;
+ private final boolean cacheProducers;
+
+ private final ExecutorService executor;
+
private volatile boolean closed;
private boolean acked = true;
@@ -117,25 +123,23 @@
private boolean deliveryExpired;
- private ExecutorService executor;
-
- private RemotingConnection remotingConnection;
-
- private ClientConnectionInternal connection;
+ private final RemotingConnection remotingConnection;
- private Set<ClientBrowser> browsers = new HashSet<ClientBrowser>();
+ private final Set<ClientBrowser> browsers = new HashSet<ClientBrowser>();
- private Set<ClientProducer> producers = new HashSet<ClientProducer>();
+ private final Set<ClientProducer> producers = new HashSet<ClientProducer>();
- private Map<String, ClientConsumerInternal> consumers = new HashMap<String, ClientConsumerInternal>();
+ private final Map<String, ClientConsumerInternal> consumers = new HashMap<String, ClientConsumerInternal>();
+ private final Map<String, ClientProducer> producerCache;
+
//For testing only
private boolean forceNotSameRM;
// Constructors ---------------------------------------------------------------------------------
- public ClientSessionImpl(ClientConnectionInternal connection, String id,
- int lazyAckBatchSize) throws MessagingException
+ public ClientSessionImpl(final ClientConnectionInternal connection, final String id,
+ final int lazyAckBatchSize, final boolean cacheProducers) throws MessagingException
{
this.id = id;
@@ -143,9 +147,20 @@
this.remotingConnection = connection.getRemotingConnection();
+ this.cacheProducers = cacheProducers;
+
executor = Executors.newSingleThreadExecutor();
- this.lazyAckBatchSize = lazyAckBatchSize;
+ this.lazyAckBatchSize = lazyAckBatchSize;
+
+ if (cacheProducers)
+ {
+ producerCache = new HashMap<String, ClientProducer>();
+ }
+ else
+ {
+ producerCache = null;
+ }
}
// ClientSession implementation -----------------------------------------------------------------
@@ -263,11 +278,26 @@
return browser;
}
- public ClientProducer createProducer() throws MessagingException
+ public ClientProducer createProducer(String address) throws MessagingException
{
checkClosed();
+
+ ClientProducer producer = null;
+
+ if (cacheProducers)
+ {
+ producer = producerCache.remove(address);
+ }
- ClientProducer producer = new ClientProducerImpl(this);
+ if (producer == null)
+ {
+ SessionCreateProducerMessage request = new SessionCreateProducerMessage(address);
+
+ SessionCreateProducerResponseMessage response =
+ (SessionCreateProducerResponseMessage)remotingConnection.send(id, request);
+
+ producer = new ClientProducerImpl(this, response.getProducerID(), address, remotingConnection);
+ }
producers.add(producer);
@@ -353,6 +383,11 @@
{
closeChildren();
+ if (cacheProducers)
+ {
+ producerCache.clear();
+ }
+
//Make sure any remaining acks make it to the server
acknowledgeInternal(false);
@@ -418,6 +453,11 @@
public void removeProducer(ClientProducer producer)
{
producers.remove(producer);
+
+ if (cacheProducers && !producerCache.containsKey(producer.getAddress()))
+ {
+ producerCache.put(producer.getAddress(), producer);
+ }
}
public void removeBrowser(ClientBrowser browser)
@@ -425,15 +465,8 @@
browsers.remove(browser);
}
- public void send(String address, Message m) throws MessagingException
- {
- checkClosed();
+
- SessionSendMessage message = new SessionSendMessage(address, m.copy());
-
- remotingConnection.send(id, message, !m.isDurable());
- }
-
// XAResource implementation --------------------------------------------------------------------
public void commit(Xid xid, boolean onePhase) throws XAException
Modified: trunk/src/main/org/jboss/jms/client/impl/ClientSessionInternal.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/impl/ClientSessionInternal.java 2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/jms/client/impl/ClientSessionInternal.java 2008-02-25 12:15:14 UTC (rev 3783)
@@ -33,7 +33,5 @@
void removeProducer(ClientProducer producer);
- void removeBrowser(ClientBrowser browser);
-
- void send(String address, Message message) throws MessagingException;
+ void removeBrowser(ClientBrowser browser);
}
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2008-02-25 12:15:14 UTC (rev 3783)
@@ -131,8 +131,6 @@
{
if (enableFlowControl && availableTokens.get() == 0)
{
- if (trace) { log.trace(this + " is NOT accepting messages!"); }
-
return HandleStatus.BUSY;
}
Added: trunk/src/main/org/jboss/jms/server/endpoint/ServerProducer.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerProducer.java (rev 0)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerProducer.java 2008-02-25 12:15:14 UTC (rev 3783)
@@ -0,0 +1,23 @@
+package org.jboss.jms.server.endpoint;
+
+import org.jboss.messaging.core.Message;
+
+/**
+ *
+ * A ServerProducer
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public interface ServerProducer
+{
+ String getID();
+
+ void close() throws Exception;
+
+ void send(String address, Message msg) throws Exception;
+
+ void sendCredits(int credits);
+
+ int getNumCredits();
+}
Added: trunk/src/main/org/jboss/jms/server/endpoint/ServerProducerEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerProducerEndpoint.java (rev 0)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerProducerEndpoint.java 2008-02-25 12:15:14 UTC (rev 3783)
@@ -0,0 +1,90 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.jms.server.endpoint;
+
+import java.util.UUID;
+
+import org.jboss.messaging.core.Message;
+
+/**
+ *
+ * A ServerProducerEndpoint
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public class ServerProducerEndpoint implements ServerProducer
+{
+ private final String id;
+
+ private final ServerSession session;
+
+ private final String address;
+
+ // Constructors ----------------------------------------------------------------
+
+ public ServerProducerEndpoint(final ServerSession session, final String address)
+ {
+ id = UUID.randomUUID().toString();
+
+ this.session = session;
+
+ this.address = address;
+ }
+
+ // ServerProducer implementation --------------------------------------------
+
+ public String getID()
+ {
+ return id;
+ }
+
+ public void close() throws Exception
+ {
+ session.removeProducer(id);
+ }
+
+ public void send(final String address, final Message message) throws Exception
+ {
+ if (address != null)
+ {
+ //Anonymous producer - no flow control
+ session.send(address, message);
+ }
+ else
+ {
+ session.send(this.address, message);
+ }
+ }
+
+ public int getNumCredits()
+ {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ public void sendCredits(int credits)
+ {
+ // TODO Auto-generated method stub
+
+ }
+}
Added: trunk/src/main/org/jboss/jms/server/endpoint/ServerProducerPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerProducerPacketHandler.java (rev 0)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerProducerPacketHandler.java 2008-02-25 12:15:14 UTC (rev 3783)
@@ -0,0 +1,87 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.jms.server.endpoint;
+
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.CLOSE;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.CONS_FLOWTOKEN;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.PROD_SEND;
+
+import org.jboss.messaging.core.remoting.PacketSender;
+import org.jboss.messaging.core.remoting.wireformat.ConsumerFlowTokenMessage;
+import org.jboss.messaging.core.remoting.wireformat.NullPacket;
+import org.jboss.messaging.core.remoting.wireformat.Packet;
+import org.jboss.messaging.core.remoting.wireformat.PacketType;
+import org.jboss.messaging.core.remoting.wireformat.ProducerSendMessage;
+import org.jboss.messaging.util.MessagingException;
+
+
+public class ServerProducerPacketHandler extends ServerPacketHandlerSupport
+{
+ private final ServerProducer producer;
+
+ public ServerProducerPacketHandler(final ServerProducer producer)
+ {
+ this.producer = producer;
+ }
+
+ public String getID()
+ {
+ return producer.getID();
+ }
+
+ public Packet doHandle(final Packet packet, final PacketSender sender) throws Exception
+ {
+ Packet response = null;
+
+ PacketType type = packet.getType();
+
+ if (type == PROD_SEND)
+ {
+ ProducerSendMessage message = (ProducerSendMessage) packet;
+
+ producer.send(message.getAddress(), message.getMessage());
+ }
+ else if (type == CLOSE)
+ {
+ producer.close();
+ }
+ else
+ {
+ throw new MessagingException(MessagingException.UNSUPPORTED_PACKET,
+ "Unsupported packet " + type);
+ }
+
+ // reply if necessary
+ if (response == null && packet.isOneWay() == false)
+ {
+ response = new NullPacket();
+ }
+
+ return response;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "ServerConsumerEndpointPacketHandler[id=" + producer.getID() + "]";
+ }
+}
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerSession.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerSession.java 2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerSession.java 2008-02-25 12:15:14 UTC (rev 3783)
@@ -32,6 +32,7 @@
import org.jboss.messaging.core.remoting.wireformat.SessionBindingQueryResponseMessage;
import org.jboss.messaging.core.remoting.wireformat.SessionCreateBrowserResponseMessage;
import org.jboss.messaging.core.remoting.wireformat.SessionCreateConsumerResponseMessage;
+import org.jboss.messaging.core.remoting.wireformat.SessionCreateProducerResponseMessage;
import org.jboss.messaging.core.remoting.wireformat.SessionQueueQueryMessage;
import org.jboss.messaging.core.remoting.wireformat.SessionQueueQueryResponseMessage;
import org.jboss.messaging.core.remoting.wireformat.SessionXAResponseMessage;
@@ -51,6 +52,8 @@
void removeConsumer(String consumerID) throws Exception;
+ void removeProducer(String producerID) throws Exception;
+
void close() throws Exception;
void setStarted(boolean started) throws Exception;
@@ -59,7 +62,7 @@
void promptDelivery(Queue queue);
- boolean send(String address, Message msg) throws Exception;
+ void send(String address, Message msg) throws Exception;
void acknowledge(long deliveryID, boolean allUpTo) throws Exception;
@@ -103,6 +106,8 @@
SessionCreateConsumerResponseMessage createConsumer(String queueName, String filterString,
boolean noLocal, boolean autoDeleteQueue, int prefetchSize) throws Exception;
+
+ SessionCreateProducerResponseMessage createProducer(String address) throws Exception;
SessionQueueQueryResponseMessage executeQueueQuery(SessionQueueQueryMessage request) throws Exception;
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2008-02-25 12:15:14 UTC (rev 3783)
@@ -57,6 +57,7 @@
import org.jboss.messaging.core.remoting.wireformat.SessionBindingQueryResponseMessage;
import org.jboss.messaging.core.remoting.wireformat.SessionCreateBrowserResponseMessage;
import org.jboss.messaging.core.remoting.wireformat.SessionCreateConsumerResponseMessage;
+import org.jboss.messaging.core.remoting.wireformat.SessionCreateProducerResponseMessage;
import org.jboss.messaging.core.remoting.wireformat.SessionQueueQueryMessage;
import org.jboss.messaging.core.remoting.wireformat.SessionQueueQueryResponseMessage;
import org.jboss.messaging.core.remoting.wireformat.SessionXAResponseMessage;
@@ -115,6 +116,8 @@
private final Map<String, ServerConsumer> consumers = new ConcurrentHashMap<String, ServerConsumer>();
private final Map<String, ServerBrowserEndpoint> browsers = new ConcurrentHashMap<String, ServerBrowserEndpoint>();
+
+ private final Map<String, ServerProducer> producers = new ConcurrentHashMap<String, ServerProducer>();
private final LinkedList<Delivery> deliveries = new LinkedList<Delivery>();
@@ -176,26 +179,36 @@
return connection;
}
- public void removeBrowser(final String browserId) throws Exception
+ public void removeBrowser(final String browserID) throws Exception
{
- if (browsers.remove(browserId) == null)
+ if (browsers.remove(browserID) == null)
{
- throw new IllegalStateException("Cannot find browser with id " + browserId + " to remove");
+ throw new IllegalStateException("Cannot find browser with id " + browserID + " to remove");
}
- dispatcher.unregister(browserId);
+ dispatcher.unregister(browserID);
}
- public void removeConsumer(final String consumerId) throws Exception
+ public void removeConsumer(final String consumerID) throws Exception
{
- if (consumers.remove(consumerId) == null)
+ if (consumers.remove(consumerID) == null)
{
- throw new IllegalStateException("Cannot find consumer with id " + consumerId + " to remove");
+ throw new IllegalStateException("Cannot find consumer with id " + consumerID + " to remove");
}
- dispatcher.unregister(consumerId);
+ dispatcher.unregister(consumerID);
}
+ public void removeProducer(final String producerID) throws Exception
+ {
+ if (producers.remove(producerID) == null)
+ {
+ throw new IllegalStateException("Cannot find producer with id " + producerID + " to remove");
+ }
+
+ dispatcher.unregister(producerID);
+ }
+
public synchronized void handleDelivery(final MessageReference ref, final ServerConsumer consumer) throws Exception
{
Delivery delivery = new DeliveryImpl(ref, consumer.getID(), deliveryIDSequence++, sender);
@@ -233,10 +246,17 @@
browser.close();
}
- consumers.clear();
-
browsers.clear();
+
+ Map<String, ServerProducer> producersClone = new HashMap<String, ServerProducer>(producers);
+ for (ServerProducer producer: producersClone.values())
+ {
+ producer.close();
+ }
+
+ producers.clear();
+
rollback();
executor.shutdown();
@@ -258,7 +278,7 @@
});
}
- public boolean send(final String address, final Message msg) throws Exception
+ public void send(final String address, final Message msg) throws Exception
{
//check the address exists, if it doesnt add if the user has the correct privileges
if (!postOffice.containsAllowableAddress(address))
@@ -287,14 +307,8 @@
postOffice.route(address, msg);
- if (msg.getReferences().isEmpty())
+ if (!msg.getReferences().isEmpty())
{
- // Didn't route anywhere
-
- return false;
- }
- else
- {
if (autoCommitSends)
{
if (msg.getNumDurableReferences() != 0)
@@ -308,8 +322,6 @@
{
tx.addMessage(msg);
}
-
- return true;
}
}
@@ -885,10 +897,7 @@
SessionCreateConsumerResponseMessage response = new SessionCreateConsumerResponseMessage(consumer.getID(),
prefetchSize);
- synchronized (consumers)
- {
- consumers.put(consumer.getID(), consumer);
- }
+ consumers.put(consumer.getID(), consumer);
log.trace(this + " created and registered " + consumer);
@@ -953,7 +962,7 @@
public SessionCreateBrowserResponseMessage createBrowser(final String queueName, final String selector)
throws Exception
{
- if(!postOffice.containsAllowableAddress(queueName))
+ if (!postOffice.containsAllowableAddress(queueName))
{
try
{
@@ -966,6 +975,7 @@
throw new MessagingException(MessagingException.QUEUE_DOES_NOT_EXIST);
}
}
+
Binding binding = postOffice.getBinding(queueName);
if (binding == null)
@@ -976,20 +986,25 @@
ServerBrowserEndpoint browser = new ServerBrowserEndpoint(this, binding.getQueue(), selector);
- // still need to synchronized since close() can come in on a different
- // thread
- synchronized (browsers)
- {
- browsers.put(browser.getID(), browser);
- }
-
+ browsers.put(browser.getID(), browser);
+
dispatcher.register(browser.newHandler());
- log.trace(this + " created and registered " + browser);
-
return new SessionCreateBrowserResponseMessage(browser.getID());
}
+ public SessionCreateProducerResponseMessage createProducer(String address) throws Exception
+ {
+ ServerProducerEndpoint producer = new ServerProducerEndpoint(this, address);
+
+ producers.put(producer.getID(), producer);
+
+ dispatcher.register(new ServerProducerPacketHandler(producer));
+
+ return new SessionCreateProducerResponseMessage(producer.getID());
+ }
+
+
// Public ---------------------------------------------------------------------------------------------
public String toString()
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionPacketHandler.java 2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionPacketHandler.java 2008-02-25 12:15:14 UTC (rev 3783)
@@ -28,11 +28,11 @@
import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_COMMIT;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_CREATEBROWSER;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_CREATECONSUMER;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_CREATEPRODUCER;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_CREATEQUEUE;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_DELETE_QUEUE;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_QUEUEQUERY;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_ROLLBACK;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_SEND;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_XA_COMMIT;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_XA_END;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_XA_FORGET;
@@ -60,11 +60,11 @@
import org.jboss.messaging.core.remoting.wireformat.SessionCancelMessage;
import org.jboss.messaging.core.remoting.wireformat.SessionCreateBrowserMessage;
import org.jboss.messaging.core.remoting.wireformat.SessionCreateConsumerMessage;
+import org.jboss.messaging.core.remoting.wireformat.SessionCreateProducerMessage;
import org.jboss.messaging.core.remoting.wireformat.SessionCreateQueueMessage;
import org.jboss.messaging.core.remoting.wireformat.SessionDeleteQueueMessage;
import org.jboss.messaging.core.remoting.wireformat.SessionQueueQueryMessage;
import org.jboss.messaging.core.remoting.wireformat.SessionRemoveAddressMessage;
-import org.jboss.messaging.core.remoting.wireformat.SessionSendMessage;
import org.jboss.messaging.core.remoting.wireformat.SessionXACommitMessage;
import org.jboss.messaging.core.remoting.wireformat.SessionXAEndMessage;
import org.jboss.messaging.core.remoting.wireformat.SessionXAForgetMessage;
@@ -79,7 +79,6 @@
import org.jboss.messaging.core.remoting.wireformat.SessionXAStartMessage;
import org.jboss.messaging.util.MessagingException;
-
/**
*
* A ServerSessionPacketHandler
@@ -113,14 +112,8 @@
PacketType type = packet.getType();
// TODO use a switch for this
- if (type == SESS_SEND)
+ if (type == SESS_CREATECONSUMER)
{
- SessionSendMessage message = (SessionSendMessage) packet;
-
- session.send(message.getAddress(), message.getMessage());
- }
- else if (type == SESS_CREATECONSUMER)
- {
SessionCreateConsumerMessage request = (SessionCreateConsumerMessage) packet;
response = session.createConsumer(request.getQueueName(), request
@@ -159,6 +152,12 @@
response = session.createBrowser(request.getQueueName(), request
.getFilterString());
}
+ else if (type == SESS_CREATEPRODUCER)
+ {
+ SessionCreateProducerMessage request = (SessionCreateProducerMessage) packet;
+
+ response = session.createProducer(request.getAddress());
+ }
else if (type == CLOSE)
{
session.close();
Added: trunk/src/main/org/jboss/messaging/core/remoting/codec/SessionCreateProducerMessageCodec.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/codec/SessionCreateProducerMessageCodec.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/codec/SessionCreateProducerMessageCodec.java 2008-02-25 12:15:14 UTC (rev 3783)
@@ -0,0 +1,73 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.codec;
+
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_CREATEPRODUCER;
+
+import org.jboss.messaging.core.remoting.wireformat.SessionCreateProducerMessage;
+
+/**
+ *
+ * A SessionCreateProducerMessageCodec
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public class SessionCreateProducerMessageCodec extends
+ AbstractPacketCodec<SessionCreateProducerMessage>
+{
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public SessionCreateProducerMessageCodec()
+ {
+ super(SESS_CREATEPRODUCER);
+ }
+
+ // Public --------------------------------------------------------
+
+ // AbstractPacketCodec overrides ---------------------------------
+
+ @Override
+ protected void encodeBody(SessionCreateProducerMessage request, RemotingBuffer out) throws Exception
+ {
+ String address = request.getAddress();
+
+ int bodyLength = sizeof(address);
+
+ out.putInt(bodyLength);
+ out.putNullableString(address);
+ }
+
+ @Override
+ protected SessionCreateProducerMessage decodeBody(RemotingBuffer in)
+ throws Exception
+ {
+ int bodyLength = in.getInt();
+ if (in.remaining() < bodyLength)
+ {
+ return null;
+ }
+
+ String address = in.getNullableString();
+
+ return new SessionCreateProducerMessage(address);
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
Added: trunk/src/main/org/jboss/messaging/core/remoting/codec/SessionCreateProducerResponseMessageCodec.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/codec/SessionCreateProducerResponseMessageCodec.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/codec/SessionCreateProducerResponseMessageCodec.java 2008-02-25 12:15:14 UTC (rev 3783)
@@ -0,0 +1,74 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.codec;
+
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_CREATEPRODUCER_RESP;
+
+import org.jboss.messaging.core.remoting.wireformat.SessionCreateProducerResponseMessage;
+
+/**
+ *
+ * A SessionCreateProducerResponseMessageCodec
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public class SessionCreateProducerResponseMessageCodec extends
+ AbstractPacketCodec<SessionCreateProducerResponseMessage>
+{
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public SessionCreateProducerResponseMessageCodec()
+ {
+ super(SESS_CREATEPRODUCER_RESP);
+ }
+
+ // Public --------------------------------------------------------
+
+ // AbstractPacketCodec overrides ---------------------------------
+
+ @Override
+ protected void encodeBody(SessionCreateProducerResponseMessage response,
+ RemotingBuffer out) throws Exception
+ {
+ String producerID = response.getProducerID();
+
+ int bodyLength = sizeof(producerID);
+
+ out.putInt(bodyLength);
+ out.putNullableString(producerID);
+ }
+
+ @Override
+ protected SessionCreateProducerResponseMessage decodeBody(RemotingBuffer in)
+ throws Exception
+ {
+ int bodyLength = in.getInt();
+ if (in.remaining() < bodyLength)
+ {
+ return null;
+ }
+
+ String producerID = in.getNullableString();
+
+ return new SessionCreateProducerResponseMessage(producerID);
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
Deleted: trunk/src/main/org/jboss/messaging/core/remoting/codec/SessionSendMessageCodec.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/codec/SessionSendMessageCodec.java 2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/messaging/core/remoting/codec/SessionSendMessageCodec.java 2008-02-25 12:15:14 UTC (rev 3783)
@@ -1,79 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- *
- * Distributable under LGPL license.
- * See terms of license at gnu.org.
- */
-package org.jboss.messaging.core.remoting.codec;
-
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_SEND;
-
-import org.jboss.messaging.core.Message;
-import org.jboss.messaging.core.impl.MessageImpl;
-import org.jboss.messaging.core.remoting.wireformat.SessionSendMessage;
-import org.jboss.messaging.util.StreamUtils;
-
-/**
- * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
- */
-public class SessionSendMessageCodec extends AbstractPacketCodec<SessionSendMessage>
-{
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- public SessionSendMessageCodec()
- {
- super(SESS_SEND);
- }
-
- // Public --------------------------------------------------------
-
- // AbstractPacketCodec overrides ---------------------------------
-
- @Override
- protected void encodeBody(SessionSendMessage message, RemotingBuffer out) throws Exception
- {
- String address = message.getAddress();
- byte[] encodedMsg = StreamUtils.toBytes(message.getMessage());
-
- int bodyLength = sizeof(address) + INT_LENGTH + encodedMsg.length;
-
- out.putInt(bodyLength);
- out.putNullableString(address);
- out.putInt(encodedMsg.length);
- out.put(encodedMsg);
- }
-
- @Override
- protected SessionSendMessage decodeBody(RemotingBuffer in)
- throws Exception
- {
- int bodyLength = in.getInt();
- if (in.remaining() < bodyLength)
- {
- return null;
- }
-
- String address = in.getNullableString();
- int msgLength = in.getInt();
- byte[] encodedMsg = new byte[msgLength];
- in.get(encodedMsg);
- Message message = new MessageImpl();
- StreamUtils.fromBytes(message, encodedMsg);
-
- return new SessionSendMessage(address, message);
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private ----------------------------------------------------
-
- // Inner classes -------------------------------------------------
-}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/PacketCodecFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/PacketCodecFactory.java 2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/PacketCodecFactory.java 2008-02-25 12:15:14 UTC (rev 3783)
@@ -20,6 +20,7 @@
import org.jboss.messaging.core.remoting.codec.MessagingExceptionMessageCodec;
import org.jboss.messaging.core.remoting.codec.PingCodec;
import org.jboss.messaging.core.remoting.codec.PongCodec;
+import org.jboss.messaging.core.remoting.codec.ProducerSendMessageCodec;
import org.jboss.messaging.core.remoting.codec.RemotingBuffer;
import org.jboss.messaging.core.remoting.codec.SessionAcknowledgeMessageCodec;
import org.jboss.messaging.core.remoting.codec.SessionAddAddressMessageCodec;
@@ -34,12 +35,13 @@
import org.jboss.messaging.core.remoting.codec.SessionCreateBrowserResponseMessageCodec;
import org.jboss.messaging.core.remoting.codec.SessionCreateConsumerMessageCodec;
import org.jboss.messaging.core.remoting.codec.SessionCreateConsumerResponseMessageCodec;
+import org.jboss.messaging.core.remoting.codec.SessionCreateProducerMessageCodec;
+import org.jboss.messaging.core.remoting.codec.SessionCreateProducerResponseMessageCodec;
import org.jboss.messaging.core.remoting.codec.SessionCreateQueueMessageCodec;
import org.jboss.messaging.core.remoting.codec.SessionDeleteQueueMessageCodec;
import org.jboss.messaging.core.remoting.codec.SessionQueueQueryMessageCodec;
import org.jboss.messaging.core.remoting.codec.SessionQueueQueryResponseMessageCodec;
import org.jboss.messaging.core.remoting.codec.SessionRemoveAddressMessageCodec;
-import org.jboss.messaging.core.remoting.codec.SessionSendMessageCodec;
import org.jboss.messaging.core.remoting.codec.SessionXACommitMessageCodec;
import org.jboss.messaging.core.remoting.codec.SessionXAEndMessageCodec;
import org.jboss.messaging.core.remoting.codec.SessionXAForgetMessageCodec;
@@ -70,6 +72,7 @@
import org.jboss.messaging.core.remoting.wireformat.PacketType;
import org.jboss.messaging.core.remoting.wireformat.Ping;
import org.jboss.messaging.core.remoting.wireformat.Pong;
+import org.jboss.messaging.core.remoting.wireformat.ProducerSendMessage;
import org.jboss.messaging.core.remoting.wireformat.SessionAcknowledgeMessage;
import org.jboss.messaging.core.remoting.wireformat.SessionAddAddressMessage;
import org.jboss.messaging.core.remoting.wireformat.SessionBindingQueryMessage;
@@ -87,6 +90,8 @@
import org.jboss.messaging.core.remoting.wireformat.SessionCreateBrowserResponseMessage;
import org.jboss.messaging.core.remoting.wireformat.SessionCreateConsumerMessage;
import org.jboss.messaging.core.remoting.wireformat.SessionCreateConsumerResponseMessage;
+import org.jboss.messaging.core.remoting.wireformat.SessionCreateProducerMessage;
+import org.jboss.messaging.core.remoting.wireformat.SessionCreateProducerResponseMessage;
import org.jboss.messaging.core.remoting.wireformat.SessionCreateQueueMessage;
import org.jboss.messaging.core.remoting.wireformat.SessionDeleteQueueMessage;
import org.jboss.messaging.core.remoting.wireformat.SessionQueueQueryMessage;
@@ -94,7 +99,6 @@
import org.jboss.messaging.core.remoting.wireformat.SessionRecoverMessage;
import org.jboss.messaging.core.remoting.wireformat.SessionRemoveAddressMessage;
import org.jboss.messaging.core.remoting.wireformat.SessionRollbackMessage;
-import org.jboss.messaging.core.remoting.wireformat.SessionSendMessage;
import org.jboss.messaging.core.remoting.wireformat.SessionXACommitMessage;
import org.jboss.messaging.core.remoting.wireformat.SessionXAEndMessage;
import org.jboss.messaging.core.remoting.wireformat.SessionXAForgetMessage;
@@ -153,12 +157,14 @@
addCodec(ConnectionCreateSessionResponseMessage.class, ConnectionCreateSessionResponseMessageCodec.class);
- addCodec(SessionSendMessage.class, SessionSendMessageCodec.class);
-
addCodec(SessionCreateConsumerMessage.class, SessionCreateConsumerMessageCodec.class);
addCodec(SessionCreateConsumerResponseMessage.class, SessionCreateConsumerResponseMessageCodec.class);
+
+ addCodec(SessionCreateProducerMessage.class, SessionCreateProducerMessageCodec.class);
+ addCodec(SessionCreateProducerResponseMessage.class, SessionCreateProducerResponseMessageCodec.class);
+
addCodec(SessionCreateBrowserMessage.class, SessionCreateBrowserMessageCodec.class);
addCodec(SessionCreateBrowserResponseMessage.class, SessionCreateBrowserResponseMessageCodec.class);
@@ -256,6 +262,9 @@
addCodec(SessionBindingQueryResponseMessage.class, SessionBindingQueryResponseMessageCodec.class);
addCodec(SessionDeleteQueueMessage.class, SessionDeleteQueueMessageCodec.class);
+
+ addCodec(ProducerSendMessage.class, ProducerSendMessageCodec.class);
+
}
// Public --------------------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/ConnectionCreateSessionMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/ConnectionCreateSessionMessage.java 2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/ConnectionCreateSessionMessage.java 2008-02-25 12:15:14 UTC (rev 3783)
@@ -30,7 +30,7 @@
// Constructors --------------------------------------------------
- public ConnectionCreateSessionMessage(boolean xa, boolean autoCommitSends, boolean autoCommitAcks)
+ public ConnectionCreateSessionMessage(final boolean xa, final boolean autoCommitSends, final boolean autoCommitAcks)
{
super(CONN_CREATESESSION);
Modified: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/ConnectionCreateSessionResponseMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/ConnectionCreateSessionResponseMessage.java 2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/ConnectionCreateSessionResponseMessage.java 2008-02-25 12:15:14 UTC (rev 3783)
@@ -26,7 +26,7 @@
// Constructors --------------------------------------------------
- public ConnectionCreateSessionResponseMessage(String sessionID)
+ public ConnectionCreateSessionResponseMessage(final String sessionID)
{
super(PacketType.CONN_CREATESESSION_RESP);
Modified: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/ConsumerFlowTokenMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/ConsumerFlowTokenMessage.java 2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/ConsumerFlowTokenMessage.java 2008-02-25 12:15:14 UTC (rev 3783)
@@ -26,7 +26,7 @@
// Constructors --------------------------------------------------
- public ConsumerFlowTokenMessage(int tokens)
+ public ConsumerFlowTokenMessage(final int tokens)
{
super(CONS_FLOWTOKEN);
Modified: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/CreateConnectionRequest.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/CreateConnectionRequest.java 2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/CreateConnectionRequest.java 2008-02-25 12:15:14 UTC (rev 3783)
@@ -27,15 +27,15 @@
private final String clientVMID;
private final String username;
private final String password;
- private int prefetchSize;
+ private final int prefetchSize;
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
- public CreateConnectionRequest(byte version,
- String remotingSessionID, String clientVMID, String username, String password,
- int prefetchSize)
+ public CreateConnectionRequest(final byte version,
+ final String remotingSessionID, final String clientVMID, final String username, final String password,
+ final int prefetchSize)
{
super(CREATECONNECTION);
@@ -95,16 +95,9 @@
return prefetchSize;
}
- public void setPrefetchSize(int prefetchSize)
- {
- this.prefetchSize = prefetchSize;
- }
-
// Protected -----------------------------------------------------
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
-
-
}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/CreateConnectionResponse.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/CreateConnectionResponse.java 2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/CreateConnectionResponse.java 2008-02-25 12:15:14 UTC (rev 3783)
@@ -27,7 +27,7 @@
// Constructors --------------------------------------------------
- public CreateConnectionResponse(String connectionID)
+ public CreateConnectionResponse(final String connectionID)
{
super(CREATECONNECTION_RESP);
Modified: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/DeliverMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/DeliverMessage.java 2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/DeliverMessage.java 2008-02-25 12:15:14 UTC (rev 3783)
@@ -31,7 +31,7 @@
// Constructors --------------------------------------------------
- public DeliverMessage(Message message, long deliveryID)
+ public DeliverMessage(final Message message, final long deliveryID)
{
super(SESS_DELIVER);
Modified: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/MessagingExceptionMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/MessagingExceptionMessage.java 2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/MessagingExceptionMessage.java 2008-02-25 12:15:14 UTC (rev 3783)
@@ -28,7 +28,7 @@
// Constructors --------------------------------------------------
- public MessagingExceptionMessage(MessagingException exception)
+ public MessagingExceptionMessage(final MessagingException exception)
{
super(EXCEPTION);
Modified: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/PacketType.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/PacketType.java 2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/PacketType.java 2008-02-25 12:15:14 UTC (rev 3783)
@@ -36,51 +36,55 @@
// Session
SESS_CREATECONSUMER ((byte)41),
- SESS_CREATECONSUMER_RESP ((byte)42),
- SESS_CREATEBROWSER ((byte)43),
- SESS_CREATEBROWSER_RESP ((byte)44),
- SESS_SEND ((byte)45),
- SESS_DELIVER ((byte)46),
- SESS_ACKNOWLEDGE ((byte)47),
- SESS_RECOVER ((byte)48),
- SESS_COMMIT ((byte)49),
- SESS_ROLLBACK ((byte)50),
- SESS_CANCEL ((byte)51),
- SESS_QUEUEQUERY ((byte)52),
- SESS_QUEUEQUERY_RESP ((byte)53),
- SESS_CREATEQUEUE ((byte)54),
- SESS_DELETE_QUEUE ((byte)55),
- SESS_ADD_ADDRESS ((byte)56),
- SESS_REMOVE_ADDRESS ((byte)57),
- SESS_BINDINGQUERY ((byte)58),
- SESS_BINDINGQUERY_RESP ((byte)59),
- SESS_BROWSER_RESET ((byte)60),
- SESS_BROWSER_HASNEXTMESSAGE ((byte)61),
- SESS_BROWSER_HASNEXTMESSAGE_RESP ((byte)62),
- SESS_BROWSER_NEXTMESSAGEBLOCK ((byte)63),
- SESS_BROWSER_NEXTMESSAGEBLOCK_RESP ((byte)64),
- SESS_BROWSER_NEXTMESSAGE ((byte)65),
- SESS_BROWSER_NEXTMESSAGE_RESP ((byte)66),
- SESS_XA_START ((byte)67),
- SESS_XA_END ((byte)68),
- SESS_XA_COMMIT ((byte)69),
- SESS_XA_PREPARE ((byte)70),
- SESS_XA_RESP ((byte)71),
- SESS_XA_ROLLBACK ((byte)72),
- SESS_XA_JOIN ((byte)73),
- SESS_XA_SUSPEND ((byte)74),
- SESS_XA_RESUME ((byte)75),
- SESS_XA_FORGET ((byte)76),
- SESS_XA_INDOUBT_XIDS ((byte)77),
- SESS_XA_INDOUBT_XIDS_RESP ((byte)78),
- SESS_XA_SET_TIMEOUT ((byte)79),
- SESS_XA_SET_TIMEOUT_RESP ((byte)80),
- SESS_XA_GET_TIMEOUT ((byte)81),
- SESS_XA_GET_TIMEOUT_RESP ((byte)82),
+ SESS_CREATECONSUMER_RESP ((byte)42),
+ SESS_CREATEPRODUCER ((byte)43),
+ SESS_CREATEPRODUCER_RESP ((byte)44),
+ SESS_CREATEBROWSER ((byte)45),
+ SESS_CREATEBROWSER_RESP ((byte)46),
+ SESS_DELIVER ((byte)47),
+ SESS_ACKNOWLEDGE ((byte)48),
+ SESS_RECOVER ((byte)49),
+ SESS_COMMIT ((byte)50),
+ SESS_ROLLBACK ((byte)51),
+ SESS_CANCEL ((byte)52),
+ SESS_QUEUEQUERY ((byte)53),
+ SESS_QUEUEQUERY_RESP ((byte)54),
+ SESS_CREATEQUEUE ((byte)55),
+ SESS_DELETE_QUEUE ((byte)56),
+ SESS_ADD_ADDRESS ((byte)57),
+ SESS_REMOVE_ADDRESS ((byte)58),
+ SESS_BINDINGQUERY ((byte)59),
+ SESS_BINDINGQUERY_RESP ((byte)60),
+ SESS_BROWSER_RESET ((byte)61),
+ SESS_BROWSER_HASNEXTMESSAGE ((byte)62),
+ SESS_BROWSER_HASNEXTMESSAGE_RESP ((byte)63),
+ SESS_BROWSER_NEXTMESSAGEBLOCK ((byte)64),
+ SESS_BROWSER_NEXTMESSAGEBLOCK_RESP ((byte)65),
+ SESS_BROWSER_NEXTMESSAGE ((byte)66),
+ SESS_BROWSER_NEXTMESSAGE_RESP ((byte)67),
+ SESS_XA_START ((byte)68),
+ SESS_XA_END ((byte)69),
+ SESS_XA_COMMIT ((byte)70),
+ SESS_XA_PREPARE ((byte)71),
+ SESS_XA_RESP ((byte)72),
+ SESS_XA_ROLLBACK ((byte)73),
+ SESS_XA_JOIN ((byte)74),
+ SESS_XA_SUSPEND ((byte)75),
+ SESS_XA_RESUME ((byte)76),
+ SESS_XA_FORGET ((byte)77),
+ SESS_XA_INDOUBT_XIDS ((byte)78),
+ SESS_XA_INDOUBT_XIDS_RESP ((byte)79),
+ SESS_XA_SET_TIMEOUT ((byte)80),
+ SESS_XA_SET_TIMEOUT_RESP ((byte)81),
+ SESS_XA_GET_TIMEOUT ((byte)82),
+ SESS_XA_GET_TIMEOUT_RESP ((byte)83),
// Consumer
- CONS_FLOWTOKEN ((byte)90);
+ CONS_FLOWTOKEN ((byte)90),
+ //Producer
+ PROD_SEND ((byte)91);
+
private final byte type;
Modified: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/Ping.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/Ping.java 2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/Ping.java 2008-02-25 12:15:14 UTC (rev 3783)
@@ -27,7 +27,7 @@
// Constructors --------------------------------------------------
- public Ping(String sessionID)
+ public Ping(final String sessionID)
{
super(PING);
Modified: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/Pong.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/Pong.java 2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/Pong.java 2008-02-25 12:15:14 UTC (rev 3783)
@@ -29,7 +29,7 @@
// Constructors --------------------------------------------------
- public Pong(String sessionID, boolean sessionFailed)
+ public Pong(final String sessionID, final boolean sessionFailed)
{
super(PONG);
Added: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/ProducerSendMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/ProducerSendMessage.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/ProducerSendMessage.java 2008-02-25 12:15:14 UTC (rev 3783)
@@ -0,0 +1,68 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.wireformat;
+
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.PROD_SEND;
+
+import org.jboss.messaging.core.Message;
+
+/**
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ *
+ * @version <tt>$Revision$</tt>
+ */
+public class ProducerSendMessage extends AbstractPacket
+{
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private String address;
+
+ private final Message message;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public ProducerSendMessage(final String address, final Message message)
+ {
+ super(PROD_SEND);
+
+ this.address = address;
+
+ this.message = message;
+ }
+
+ // Public --------------------------------------------------------
+
+ public String getAddress()
+ {
+ return address;
+ }
+
+ public Message getMessage()
+ {
+ return message;
+ }
+
+ @Override
+ public String toString()
+ {
+ return getParentString() + ", address=" + address + ", message=" + message
+ + "]";
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionAcknowledgeMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionAcknowledgeMessage.java 2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionAcknowledgeMessage.java 2008-02-25 12:15:14 UTC (rev 3783)
@@ -18,15 +18,15 @@
// Attributes ----------------------------------------------------
- private long deliveryID;
+ private final long deliveryID;
- private boolean allUpTo;
+ private final boolean allUpTo;
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
- public SessionAcknowledgeMessage(long deliveryID, boolean allUpTo)
+ public SessionAcknowledgeMessage(final long deliveryID, final boolean allUpTo)
{
super(PacketType.SESS_ACKNOWLEDGE);
Modified: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionAddAddressMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionAddAddressMessage.java 2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionAddAddressMessage.java 2008-02-25 12:15:14 UTC (rev 3783)
@@ -22,13 +22,13 @@
// Attributes ----------------------------------------------------
- private String address;
+ private final String address;
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
- public SessionAddAddressMessage(String address)
+ public SessionAddAddressMessage(final String address)
{
super(SESS_ADD_ADDRESS);
Modified: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionBindingQueryMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionBindingQueryMessage.java 2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionBindingQueryMessage.java 2008-02-25 12:15:14 UTC (rev 3783)
@@ -16,9 +16,9 @@
*/
public class SessionBindingQueryMessage extends AbstractPacket
{
- private String address;
+ private final String address;
- public SessionBindingQueryMessage(String address)
+ public SessionBindingQueryMessage(final String address)
{
super(PacketType.SESS_BINDINGQUERY);
Modified: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionBindingQueryResponseMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionBindingQueryResponseMessage.java 2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionBindingQueryResponseMessage.java 2008-02-25 12:15:14 UTC (rev 3783)
@@ -13,11 +13,11 @@
*/
public class SessionBindingQueryResponseMessage extends AbstractPacket
{
- private boolean exists;
+ private final boolean exists;
- private List<String> queueNames;
+ private final List<String> queueNames;
- public SessionBindingQueryResponseMessage(boolean exists, List<String> queueNames)
+ public SessionBindingQueryResponseMessage(final boolean exists, final List<String> queueNames)
{
super(SESS_BINDINGQUERY_RESP);
Modified: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionBrowserHasNextMessageMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionBrowserHasNextMessageMessage.java 2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionBrowserHasNextMessageMessage.java 2008-02-25 12:15:14 UTC (rev 3783)
@@ -27,7 +27,7 @@
public SessionBrowserHasNextMessageMessage()
{
- super(SESS_BROWSER_HASNEXTMESSAGE);
+ super(SESS_BROWSER_HASNEXTMESSAGE);
}
// Public --------------------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionBrowserHasNextMessageResponseMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionBrowserHasNextMessageResponseMessage.java 2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionBrowserHasNextMessageResponseMessage.java 2008-02-25 12:15:14 UTC (rev 3783)
@@ -27,7 +27,7 @@
// Constructors --------------------------------------------------
- public SessionBrowserHasNextMessageResponseMessage(boolean hasNext)
+ public SessionBrowserHasNextMessageResponseMessage(final boolean hasNext)
{
super(SESS_BROWSER_HASNEXTMESSAGE_RESP);
Modified: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionBrowserNextMessageBlockMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionBrowserNextMessageBlockMessage.java 2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionBrowserNextMessageBlockMessage.java 2008-02-25 12:15:14 UTC (rev 3783)
@@ -27,7 +27,7 @@
// Constructors --------------------------------------------------
- public SessionBrowserNextMessageBlockMessage(long maxMessages)
+ public SessionBrowserNextMessageBlockMessage(final long maxMessages)
{
super(SESS_BROWSER_NEXTMESSAGEBLOCK);
Modified: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionBrowserNextMessageBlockResponseMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionBrowserNextMessageBlockResponseMessage.java 2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionBrowserNextMessageBlockResponseMessage.java 2008-02-25 12:15:14 UTC (rev 3783)
@@ -30,7 +30,7 @@
// Constructors --------------------------------------------------
- public SessionBrowserNextMessageBlockResponseMessage(Message[] messages)
+ public SessionBrowserNextMessageBlockResponseMessage(final Message[] messages)
{
super(SESS_BROWSER_NEXTMESSAGEBLOCK_RESP);
Modified: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionBrowserNextMessageResponseMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionBrowserNextMessageResponseMessage.java 2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionBrowserNextMessageResponseMessage.java 2008-02-25 12:15:14 UTC (rev 3783)
@@ -26,7 +26,7 @@
// Constructors --------------------------------------------------
- public SessionBrowserNextMessageResponseMessage(Message message)
+ public SessionBrowserNextMessageResponseMessage(final Message message)
{
super(PacketType.SESS_BROWSER_NEXTMESSAGE_RESP);
Modified: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionBrowserResetMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionBrowserResetMessage.java 2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionBrowserResetMessage.java 2008-02-25 12:15:14 UTC (rev 3783)
@@ -27,7 +27,7 @@
public SessionBrowserResetMessage()
{
- super(SESS_BROWSER_RESET);
+ super(SESS_BROWSER_RESET);
}
// Public --------------------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionCancelMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionCancelMessage.java 2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionCancelMessage.java 2008-02-25 12:15:14 UTC (rev 3783)
@@ -20,15 +20,15 @@
// Attributes ----------------------------------------------------
- private long deliveryID;
+ private final long deliveryID;
- private boolean expired;
+ private final boolean expired;
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
- public SessionCancelMessage(long deliveryID, boolean expired)
+ public SessionCancelMessage(final long deliveryID, final boolean expired)
{
super(SESS_CANCEL);
Modified: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionCreateBrowserMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionCreateBrowserMessage.java 2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionCreateBrowserMessage.java 2008-02-25 12:15:14 UTC (rev 3783)
@@ -22,13 +22,14 @@
// Attributes ----------------------------------------------------
private final String queueName;
+
private final String filterString;
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
- public SessionCreateBrowserMessage(String queueName, String filterString)
+ public SessionCreateBrowserMessage(final String queueName, final String filterString)
{
super(SESS_CREATEBROWSER);
Modified: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionCreateBrowserResponseMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionCreateBrowserResponseMessage.java 2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionCreateBrowserResponseMessage.java 2008-02-25 12:15:14 UTC (rev 3783)
@@ -28,7 +28,7 @@
// Constructors --------------------------------------------------
- public SessionCreateBrowserResponseMessage(String browserID)
+ public SessionCreateBrowserResponseMessage(final String browserID)
{
super(SESS_CREATEBROWSER_RESP);
Modified: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionCreateConsumerMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionCreateConsumerMessage.java 2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionCreateConsumerMessage.java 2008-02-25 12:15:14 UTC (rev 3783)
@@ -18,16 +18,20 @@
// Attributes ----------------------------------------------------
- private String queueName;
- private String filterString;
- private boolean noLocal;
- private boolean autoDeleteQueue;
+ private final String queueName;
+
+ private final String filterString;
+
+ private final boolean noLocal;
+
+ private final boolean autoDeleteQueue;
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
- public SessionCreateConsumerMessage(String queueName, String filterString, boolean noLocal, boolean autoDeleteQueue)
+ public SessionCreateConsumerMessage(final String queueName, final String filterString,
+ final boolean noLocal, final boolean autoDeleteQueue)
{
super(PacketType.SESS_CREATECONSUMER);
Modified: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionCreateConsumerResponseMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionCreateConsumerResponseMessage.java 2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionCreateConsumerResponseMessage.java 2008-02-25 12:15:14 UTC (rev 3783)
@@ -23,13 +23,14 @@
// Attributes ----------------------------------------------------
private final String consumerID;
+
private final int prefetchSize;
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
- public SessionCreateConsumerResponseMessage(String consumerID, int prefetchSize)
+ public SessionCreateConsumerResponseMessage(final String consumerID, final int prefetchSize)
{
super(SESS_CREATECONSUMER_RESP);
Added: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionCreateProducerMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionCreateProducerMessage.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionCreateProducerMessage.java 2008-02-25 12:15:14 UTC (rev 3783)
@@ -0,0 +1,58 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.wireformat;
+
+
+/**
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ * @version <tt>$Revision$</tt>
+ */
+public class SessionCreateProducerMessage extends AbstractPacket
+{
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private final String address;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public SessionCreateProducerMessage(final String address)
+ {
+ super(PacketType.SESS_CREATEPRODUCER);
+
+ this.address = address;
+ }
+
+ // Public --------------------------------------------------------
+
+ @Override
+ public String toString()
+ {
+ StringBuffer buff = new StringBuffer(getParentString());
+ buff.append(", address=" + address);
+ buff.append("]");
+ return buff.toString();
+ }
+
+ public String getAddress()
+ {
+ return address;
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
+
Added: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionCreateProducerResponseMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionCreateProducerResponseMessage.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionCreateProducerResponseMessage.java 2008-02-25 12:15:14 UTC (rev 3783)
@@ -0,0 +1,59 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.wireformat;
+
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_CREATEPRODUCER_RESP;
+
+
+/**
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ * @version <tt>$Revision$</tt>
+ */
+public class SessionCreateProducerResponseMessage extends AbstractPacket
+{
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private final String producerID;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public SessionCreateProducerResponseMessage(final String producerID)
+ {
+ super(SESS_CREATEPRODUCER_RESP);
+
+ this.producerID = producerID;
+ }
+
+ // Public --------------------------------------------------------
+
+ public String getProducerID()
+ {
+ return producerID;
+ }
+
+ @Override
+ public String toString()
+ {
+ StringBuffer buf = new StringBuffer(getParentString());
+ buf.append(", producerID=" + producerID);
+ buf.append("]");
+ return buf.toString();
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionCreateQueueMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionCreateQueueMessage.java 2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionCreateQueueMessage.java 2008-02-25 12:15:14 UTC (rev 3783)
@@ -20,17 +20,18 @@
// Attributes ----------------------------------------------------
- private String address;
- private String queueName;
- private String filterString;
- private boolean durable;
- private boolean temporary;
+ private final String address;
+ private final String queueName;
+ private final String filterString;
+ private final boolean durable;
+ private final boolean temporary;
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
- public SessionCreateQueueMessage(String address, String queueName, String filterString, boolean durable, boolean temporary)
+ public SessionCreateQueueMessage(final String address, final String queueName,
+ final String filterString, final boolean durable, final boolean temporary)
{
super(SESS_CREATEQUEUE);
Modified: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionDeleteQueueMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionDeleteQueueMessage.java 2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionDeleteQueueMessage.java 2008-02-25 12:15:14 UTC (rev 3783)
@@ -20,13 +20,13 @@
// Attributes ----------------------------------------------------
- private String queueName;
+ private final String queueName;
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
- public SessionDeleteQueueMessage(String queueName)
+ public SessionDeleteQueueMessage(final String queueName)
{
super(SESS_DELETE_QUEUE);
Modified: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionQueueQueryMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionQueueQueryMessage.java 2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionQueueQueryMessage.java 2008-02-25 12:15:14 UTC (rev 3783)
@@ -16,9 +16,9 @@
*/
public class SessionQueueQueryMessage extends AbstractPacket
{
- private String queueName;
+ private final String queueName;
- public SessionQueueQueryMessage(String queueName)
+ public SessionQueueQueryMessage(final String queueName)
{
super(PacketType.SESS_QUEUEQUERY);
Modified: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionQueueQueryResponseMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionQueueQueryResponseMessage.java 2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionQueueQueryResponseMessage.java 2008-02-25 12:15:14 UTC (rev 3783)
@@ -11,31 +11,39 @@
*/
public class SessionQueueQueryResponseMessage extends AbstractPacket
{
- private boolean exists;
+ private final boolean exists;
- private boolean durable;
+ private final boolean durable;
- private boolean temporary;
+ private final boolean temporary;
- private int maxSize;
+ private final int maxSize;
- private int consumerCount;
+ private final int consumerCount;
- private int messageCount;
+ private final int messageCount;
- private String filterString;
+ private final String filterString;
- private String address;
+ private final String address;
- //etc
+ public SessionQueueQueryResponseMessage(final boolean durable, final boolean temporary, final int maxSize,
+ final int consumerCount, final int messageCount, final String filterString, final String address)
+ {
+ this(durable, temporary, maxSize, consumerCount, messageCount, filterString, address, true);
+ }
- public SessionQueueQueryResponseMessage(boolean durable, boolean temporary, int maxSize, int consumerCount,
- int messageCount, String filterString, String address)
+ public SessionQueueQueryResponseMessage()
{
+ this(false, false, 0, 0, 0, null, null, false);
+ }
+
+ private SessionQueueQueryResponseMessage(final boolean durable, final boolean temporary, final int maxSize,
+ final int consumerCount, final int messageCount, final String filterString, final String address,
+ final boolean exists)
+ {
super(SESS_QUEUEQUERY_RESP);
-
- this.exists = true;
-
+
this.durable = durable;
this.temporary = temporary;
@@ -49,15 +57,10 @@
this.filterString = filterString;
this.address = address;
+
+ this.exists = exists;
}
-
- public SessionQueueQueryResponseMessage()
- {
- super(SESS_QUEUEQUERY_RESP);
- this.exists = false;
- }
-
public boolean isExists()
{
return exists;
Modified: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionRemoveAddressMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionRemoveAddressMessage.java 2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionRemoveAddressMessage.java 2008-02-25 12:15:14 UTC (rev 3783)
@@ -22,13 +22,13 @@
// Attributes ----------------------------------------------------
- private String address;
+ private final String address;
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
- public SessionRemoveAddressMessage(String address)
+ public SessionRemoveAddressMessage(final String address)
{
super(SESS_REMOVE_ADDRESS);
Deleted: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionSendMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionSendMessage.java 2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionSendMessage.java 2008-02-25 12:15:14 UTC (rev 3783)
@@ -1,70 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- *
- * Distributable under LGPL license.
- * See terms of license at gnu.org.
- */
-package org.jboss.messaging.core.remoting.wireformat;
-
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_SEND;
-
-import org.jboss.messaging.core.Message;
-
-/**
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
- *
- * @version <tt>$Revision$</tt>
- */
-public class SessionSendMessage extends AbstractPacket
-{
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- private String address;
-
- private final Message message;
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- public SessionSendMessage(String address, Message message)
- {
- super(SESS_SEND);
-
- assert message != null;
-
- this.address = address;
-
- this.message = message;
- }
-
- // Public --------------------------------------------------------
-
- public Message getMessage()
- {
- return message;
- }
-
- public String getAddress()
- {
- return address;
- }
-
- @Override
- public String toString()
- {
- return getParentString() + ", message=" + message + ", address=" + address
- + "]";
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionXACommitMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionXACommitMessage.java 2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionXACommitMessage.java 2008-02-25 12:15:14 UTC (rev 3783)
@@ -20,15 +20,15 @@
// Attributes ----------------------------------------------------
- private boolean onePhase;
+ private final boolean onePhase;
- private Xid xid;
+ private final Xid xid;
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
- public SessionXACommitMessage(Xid xid, boolean onePhase)
+ public SessionXACommitMessage(final Xid xid, final boolean onePhase)
{
super(PacketType.SESS_XA_COMMIT);
Modified: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionXAEndMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionXAEndMessage.java 2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionXAEndMessage.java 2008-02-25 12:15:14 UTC (rev 3783)
@@ -20,15 +20,15 @@
// Attributes ----------------------------------------------------
- private Xid xid;
+ private final Xid xid;
- private boolean failed;
+ private final boolean failed;
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
- public SessionXAEndMessage(Xid xid, boolean failed)
+ public SessionXAEndMessage(final Xid xid, final boolean failed)
{
super(PacketType.SESS_XA_END);
Modified: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionXAForgetMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionXAForgetMessage.java 2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionXAForgetMessage.java 2008-02-25 12:15:14 UTC (rev 3783)
@@ -20,13 +20,13 @@
// Attributes ----------------------------------------------------
- private Xid xid;
+ private final Xid xid;
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
- public SessionXAForgetMessage(Xid xid)
+ public SessionXAForgetMessage(final Xid xid)
{
super(PacketType.SESS_XA_FORGET);
Modified: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionXAGetInDoubtXidsResponseMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionXAGetInDoubtXidsResponseMessage.java 2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionXAGetInDoubtXidsResponseMessage.java 2008-02-25 12:15:14 UTC (rev 3783)
@@ -22,13 +22,13 @@
// Attributes ----------------------------------------------------
- private List<Xid> xids;
+ private final List<Xid> xids;
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
- public SessionXAGetInDoubtXidsResponseMessage(List<Xid> xids)
+ public SessionXAGetInDoubtXidsResponseMessage(final List<Xid> xids)
{
super(PacketType.SESS_XA_INDOUBT_XIDS_RESP);
Modified: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionXAGetTimeoutResponseMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionXAGetTimeoutResponseMessage.java 2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionXAGetTimeoutResponseMessage.java 2008-02-25 12:15:14 UTC (rev 3783)
@@ -19,13 +19,13 @@
// Attributes ----------------------------------------------------
- private int timeoutSeconds;
+ private final int timeoutSeconds;
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
- public SessionXAGetTimeoutResponseMessage(int timeoutSeconds)
+ public SessionXAGetTimeoutResponseMessage(final int timeoutSeconds)
{
super(PacketType.SESS_XA_GET_TIMEOUT_RESP);
Modified: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionXAJoinMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionXAJoinMessage.java 2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionXAJoinMessage.java 2008-02-25 12:15:14 UTC (rev 3783)
@@ -20,13 +20,13 @@
// Attributes ----------------------------------------------------
- private Xid xid;
+ private final Xid xid;
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
- public SessionXAJoinMessage(Xid xid)
+ public SessionXAJoinMessage(final Xid xid)
{
super(PacketType.SESS_XA_JOIN);
Modified: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionXAPrepareMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionXAPrepareMessage.java 2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionXAPrepareMessage.java 2008-02-25 12:15:14 UTC (rev 3783)
@@ -20,13 +20,13 @@
// Attributes ----------------------------------------------------
- private Xid xid;
+ private final Xid xid;
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
- public SessionXAPrepareMessage(Xid xid)
+ public SessionXAPrepareMessage(final Xid xid)
{
super(PacketType.SESS_XA_PREPARE);
Modified: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionXAResponseMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionXAResponseMessage.java 2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionXAResponseMessage.java 2008-02-25 12:15:14 UTC (rev 3783)
@@ -18,17 +18,17 @@
// Attributes ----------------------------------------------------
- private boolean error;
+ private final boolean error;
- private int responseCode;
+ private final int responseCode;
- private String message;
+ private final String message;
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
- public SessionXAResponseMessage(boolean isError, int responseCode, String message)
+ public SessionXAResponseMessage(final boolean isError, final int responseCode, final String message)
{
super(PacketType.SESS_XA_RESP);
Modified: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionXAResumeMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionXAResumeMessage.java 2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionXAResumeMessage.java 2008-02-25 12:15:14 UTC (rev 3783)
@@ -20,13 +20,13 @@
// Attributes ----------------------------------------------------
- private Xid xid;
+ private final Xid xid;
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
- public SessionXAResumeMessage(Xid xid)
+ public SessionXAResumeMessage(final Xid xid)
{
super(PacketType.SESS_XA_RESUME);
Modified: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionXARollbackMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionXARollbackMessage.java 2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionXARollbackMessage.java 2008-02-25 12:15:14 UTC (rev 3783)
@@ -20,13 +20,13 @@
// Attributes ----------------------------------------------------
- private Xid xid;
+ private final Xid xid;
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
- public SessionXARollbackMessage(Xid xid)
+ public SessionXARollbackMessage(final Xid xid)
{
super(PacketType.SESS_XA_ROLLBACK);
Modified: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionXASetTimeoutMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionXASetTimeoutMessage.java 2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionXASetTimeoutMessage.java 2008-02-25 12:15:14 UTC (rev 3783)
@@ -19,13 +19,13 @@
// Attributes ----------------------------------------------------
- private int timeoutSeconds;
+ private final int timeoutSeconds;
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
- public SessionXASetTimeoutMessage(int timeoutSeconds)
+ public SessionXASetTimeoutMessage(final int timeoutSeconds)
{
super(PacketType.SESS_XA_SET_TIMEOUT);
Modified: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionXASetTimeoutResponseMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionXASetTimeoutResponseMessage.java 2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionXASetTimeoutResponseMessage.java 2008-02-25 12:15:14 UTC (rev 3783)
@@ -18,13 +18,13 @@
// Attributes ----------------------------------------------------
- private boolean ok;
+ private final boolean ok;
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
- public SessionXASetTimeoutResponseMessage(boolean ok)
+ public SessionXASetTimeoutResponseMessage(final boolean ok)
{
super(PacketType.SESS_XA_SET_TIMEOUT_RESP);
Modified: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionXAStartMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionXAStartMessage.java 2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/SessionXAStartMessage.java 2008-02-25 12:15:14 UTC (rev 3783)
@@ -20,13 +20,13 @@
// Attributes ----------------------------------------------------
- private Xid xid;
+ private final Xid xid;
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
- public SessionXAStartMessage(Xid xid)
+ public SessionXAStartMessage(final Xid xid)
{
super(PacketType.SESS_XA_START);
Modified: trunk/tests/src/org/jboss/messaging/core/remoting/wireformat/test/unit/PacketTypeTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/core/remoting/wireformat/test/unit/PacketTypeTest.java 2008-02-25 03:43:39 UTC (rev 3782)
+++ trunk/tests/src/org/jboss/messaging/core/remoting/wireformat/test/unit/PacketTypeTest.java 2008-02-25 12:15:14 UTC (rev 3783)
@@ -29,6 +29,7 @@
import static org.jboss.messaging.core.remoting.wireformat.PacketType.NULL;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.PING;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.PONG;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.PROD_SEND;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_ACKNOWLEDGE;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_ADD_ADDRESS;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_BINDINGQUERY;
@@ -46,6 +47,8 @@
import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_CREATEBROWSER_RESP;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_CREATECONSUMER;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_CREATECONSUMER_RESP;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_CREATEPRODUCER;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_CREATEPRODUCER_RESP;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_CREATEQUEUE;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_DELETE_QUEUE;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_DELIVER;
@@ -54,7 +57,6 @@
import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_RECOVER;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_REMOVE_ADDRESS;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_ROLLBACK;
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_SEND;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_XA_COMMIT;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_XA_END;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.SESS_XA_FORGET;
@@ -100,6 +102,7 @@
import org.jboss.messaging.core.remoting.codec.DeliverMessageCodec;
import org.jboss.messaging.core.remoting.codec.PingCodec;
import org.jboss.messaging.core.remoting.codec.PongCodec;
+import org.jboss.messaging.core.remoting.codec.ProducerSendMessageCodec;
import org.jboss.messaging.core.remoting.codec.RemotingBuffer;
import org.jboss.messaging.core.remoting.codec.SessionAcknowledgeMessageCodec;
import org.jboss.messaging.core.remoting.codec.SessionAddAddressMessageCodec;
@@ -114,12 +117,13 @@
import org.jboss.messaging.core.remoting.codec.SessionCreateBrowserResponseMessageCodec;
import org.jboss.messaging.core.remoting.codec.SessionCreateConsumerMessageCodec;
import org.jboss.messaging.core.remoting.codec.SessionCreateConsumerResponseMessageCodec;
+import org.jboss.messaging.core.remoting.codec.SessionCreateProducerMessageCodec;
+import org.jboss.messaging.core.remoting.codec.SessionCreateProducerResponseMessageCodec;
import org.jboss.messaging.core.remoting.codec.SessionCreateQueueMessageCodec;
import org.jboss.messaging.core.remoting.codec.SessionDeleteQueueMessageCodec;
import org.jboss.messaging.core.remoting.codec.SessionQueueQueryMessageCodec;
import org.jboss.messaging.core.remoting.codec.SessionQueueQueryResponseMessageCodec;
import org.jboss.messaging.core.remoting.codec.SessionRemoveAddressMessageCodec;
-import org.jboss.messaging.core.remoting.codec.SessionSendMessageCodec;
import org.jboss.messaging.core.remoting.codec.SessionXACommitMessageCodec;
import org.jboss.messaging.core.remoting.codec.SessionXAEndMessageCodec;
import org.jboss.messaging.core.remoting.codec.SessionXAForgetMessageCodec;
@@ -151,6 +155,7 @@
import org.jboss.messaging.core.remoting.wireformat.PacketType;
import org.jboss.messaging.core.remoting.wireformat.Ping;
import org.jboss.messaging.core.remoting.wireformat.Pong;
+import org.jboss.messaging.core.remoting.wireformat.ProducerSendMessage;
import org.jboss.messaging.core.remoting.wireformat.SessionAcknowledgeMessage;
import org.jboss.messaging.core.remoting.wireformat.SessionAddAddressMessage;
import org.jboss.messaging.core.remoting.wireformat.SessionBindingQueryMessage;
@@ -168,6 +173,8 @@
import org.jboss.messaging.core.remoting.wireformat.SessionCreateBrowserResponseMessage;
import org.jboss.messaging.core.remoting.wireformat.SessionCreateConsumerMessage;
import org.jboss.messaging.core.remoting.wireformat.SessionCreateConsumerResponseMessage;
+import org.jboss.messaging.core.remoting.wireformat.SessionCreateProducerMessage;
+import org.jboss.messaging.core.remoting.wireformat.SessionCreateProducerResponseMessage;
import org.jboss.messaging.core.remoting.wireformat.SessionCreateQueueMessage;
import org.jboss.messaging.core.remoting.wireformat.SessionDeleteQueueMessage;
import org.jboss.messaging.core.remoting.wireformat.SessionQueueQueryMessage;
@@ -175,7 +182,6 @@
import org.jboss.messaging.core.remoting.wireformat.SessionRecoverMessage;
import org.jboss.messaging.core.remoting.wireformat.SessionRemoveAddressMessage;
import org.jboss.messaging.core.remoting.wireformat.SessionRollbackMessage;
-import org.jboss.messaging.core.remoting.wireformat.SessionSendMessage;
import org.jboss.messaging.core.remoting.wireformat.SessionXACommitMessage;
import org.jboss.messaging.core.remoting.wireformat.SessionXAEndMessage;
import org.jboss.messaging.core.remoting.wireformat.SessionXAForgetMessage;
@@ -543,9 +549,9 @@
public void testSendMessage() throws Exception
{
- SessionSendMessage packet = new SessionSendMessage(randomString(), new MessageImpl());
+ ProducerSendMessage packet = new ProducerSendMessage(randomString(), new MessageImpl());
- AbstractPacketCodec codec = new SessionSendMessageCodec();
+ AbstractPacketCodec codec = new ProducerSendMessageCodec();
SimpleRemotingBuffer buffer = encode(packet, codec);
checkHeader(buffer, packet);
checkBody(buffer, packet.getAddress(), StreamUtils.toBytes(packet.getMessage()));
@@ -553,9 +559,9 @@
AbstractPacket p = codec.decode(buffer);
- assertTrue(p instanceof SessionSendMessage);
- SessionSendMessage decodedPacket = (SessionSendMessage) p;
- assertEquals(SESS_SEND, decodedPacket.getType());
+ assertTrue(p instanceof ProducerSendMessage);
+ ProducerSendMessage decodedPacket = (ProducerSendMessage) p;
+ assertEquals(PROD_SEND, decodedPacket.getType());
assertEquals(packet.getAddress(), decodedPacket.getAddress());
assertEquals(packet.getMessage().getMessageID(), decodedPacket
.getMessage().getMessageID());
@@ -604,7 +610,44 @@
assertEquals(SESS_CREATECONSUMER_RESP, decodedResponse.getType());
assertEquals(response.getPrefetchSize(), decodedResponse.getPrefetchSize());
}
+
+ public void testCreateProducerRequest() throws Exception
+ {
+ String destination = "queue.testCreateProducerRequest";
+ SessionCreateProducerMessage request = new SessionCreateProducerMessage(destination);
+ AbstractPacketCodec codec = new SessionCreateProducerMessageCodec();
+ SimpleRemotingBuffer buffer = encode(request, codec);
+ checkHeader(buffer, request);
+ checkBody(buffer, request.getAddress());
+ buffer.rewind();
+
+ AbstractPacket decodedPacket = codec.decode(buffer);
+
+ assertTrue(decodedPacket instanceof SessionCreateProducerMessage);
+ SessionCreateProducerMessage decodedRequest = (SessionCreateProducerMessage) decodedPacket;
+ assertEquals(SESS_CREATEPRODUCER, decodedRequest.getType());
+ assertEquals(request.getAddress(), decodedRequest.getAddress());
+ }
+
+ public void testCreateProducerResponse() throws Exception
+ {
+ SessionCreateProducerResponseMessage response = new SessionCreateProducerResponseMessage(randomString());
+
+ AbstractPacketCodec codec = new SessionCreateProducerResponseMessageCodec();
+ SimpleRemotingBuffer buffer = encode(response, codec);
+ checkHeader(buffer, response);
+ checkBody(buffer, response.getProducerID());
+ buffer.rewind();
+
+ AbstractPacket decodedPacket = codec.decode(buffer);
+
+ assertTrue(decodedPacket instanceof SessionCreateProducerResponseMessage);
+ SessionCreateProducerResponseMessage decodedResponse = (SessionCreateProducerResponseMessage) decodedPacket;
+ assertEquals(SESS_CREATEPRODUCER_RESP, decodedResponse.getType());
+ assertEquals(response.getProducerID(), decodedResponse.getProducerID());;
+ }
+
public void testStartConnectionMessage() throws Exception
{
ConnectionStartMessage packet = new ConnectionStartMessage();
More information about the jboss-cvs-commits
mailing list