[jboss-cvs] JBoss Messaging SVN: r5066 - in trunk: src/main/org/jboss/messaging/core/remoting and 5 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Oct 3 14:27:15 EDT 2008
Author: timfox
Date: 2008-10-03 14:27:14 -0400 (Fri, 03 Oct 2008)
New Revision: 5066
Added:
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/DuplicablePacket.java
Modified:
trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionPacketHandler.java
trunk/src/main/org/jboss/messaging/core/remoting/ChannelHandler.java
trunk/src/main/org/jboss/messaging/core/remoting/Packet.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CreateSessionResponseMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/MessagingExceptionMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/NullResponseMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionAddDestinationMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionBindingQueryMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionBindingQueryResponseMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionBrowserCloseMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionBrowserHasNextMessageMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionBrowserHasNextMessageResponseMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionBrowserNextMessageMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionBrowserResetMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCloseMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionConsumerCloseMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateBrowserMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateConsumerMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateConsumerResponseMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateProducerMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateProducerResponseMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateQueueMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionDeleteQueueMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionProducerCloseMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionProducerFlowCreditMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionQueueQueryMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionQueueQueryResponseMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionRemoveDestinationMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendManagementMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXACommitMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXAEndMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXAForgetMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXAGetInDoubtXidsResponseMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXAGetTimeoutResponseMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXAJoinMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXAPrepareMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXAResponseMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXAResumeMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXARollbackMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXASetTimeoutMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXAStartMessage.java
trunk/src/main/org/jboss/messaging/core/server/ServerSession.java
trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/RandomFailoverTest.java
Log:
More session replication and failover
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionPacketHandler.java 2008-10-03 17:58:39 UTC (rev 5065)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionPacketHandler.java 2008-10-03 18:27:14 UTC (rev 5066)
@@ -51,7 +51,7 @@
{
this.clientSession = clientSesssion;
}
-
+
public void handlePacket(final Packet packet)
{
byte type = packet.getType();
@@ -97,4 +97,8 @@
log.error("Failed to handle packet", e);
}
}
+
+ public void rehandlePacket(final Packet packet)
+ {
+ }
}
\ No newline at end of file
Modified: trunk/src/main/org/jboss/messaging/core/remoting/ChannelHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/ChannelHandler.java 2008-10-03 17:58:39 UTC (rev 5065)
+++ trunk/src/main/org/jboss/messaging/core/remoting/ChannelHandler.java 2008-10-03 18:27:14 UTC (rev 5066)
@@ -30,4 +30,6 @@
public interface ChannelHandler
{
void handlePacket(Packet packet);
+
+ //void rehandlePacket(Packet packet);
}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/Packet.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/Packet.java 2008-10-03 17:58:39 UTC (rev 5065)
+++ trunk/src/main/org/jboss/messaging/core/remoting/Packet.java 2008-10-03 18:27:14 UTC (rev 5066)
@@ -50,4 +50,10 @@
boolean isReplicateBlocking();
boolean isWriteAlways();
+
+ boolean isReHandleResponseOnFailure();
+
+ boolean isDuplicate();
+
+ void setDuplicate(boolean duplicate);
}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java 2008-10-03 17:58:39 UTC (rev 5065)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java 2008-10-03 18:27:14 UTC (rev 5066)
@@ -95,6 +95,7 @@
import org.jboss.messaging.core.remoting.ResponseNotifier;
import org.jboss.messaging.core.remoting.impl.wireformat.CreateSessionMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.CreateSessionResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.DuplicablePacket;
import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.NullResponseMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
@@ -778,7 +779,7 @@
}
case NULL_RESPONSE:
{
- packet = new NullResponseMessage(false);
+ packet = new NullResponseMessage();
break;
}
case SESS_MANAGEMENT_SEND:
@@ -837,6 +838,10 @@
private Thread blockThread;
private ResponseNotifier responseNotifier;
+
+ private volatile boolean justFailedOver;
+
+ private volatile Packet lastPacketReceived;
private ChannelImpl(final RemotingConnectionImpl connection,
final long id,
@@ -895,8 +900,7 @@
}
public int getLastReceivedCommandID()
- {
- //log.info("getting last received command id, last received packet is " + this.lastReceivedPacket);
+ {
return lastReceivedCommandID;
}
@@ -913,13 +917,15 @@
try
{
addToCache(packet);
+
+ lastResponseSent = packet;
+
+ connection.doWrite(packet);
}
finally
{
lock.unlock();
- }
-
- connection.doWrite(packet);
+ }
}
}
}
@@ -938,83 +944,83 @@
// This must never called by more than one thread concurrently
public Packet sendBlocking(final Packet packet, final ResponseNotifier notifier) throws MessagingException
{
- // For now we only allow one blocking request-response at a time per
- // channel
- // We can relax this but it will involve some kind of correlation id
- synchronized (waitLock)
+ packet.setChannelID(id);
+
+ lock.lock();
+ try
{
- try
+ addToCache(packet);
+
+ // For now we only allow one blocking request-response at a time per
+ // channel
+ // We can relax this but it will involve some kind of correlation id
+ synchronized (waitLock)
{
- blockThread = Thread.currentThread();
-
- responseNotifier = notifier;
-
- response = null;
-
- packet.setChannelID(id);
-
- lock.lock();
try
{
- addToCache(packet);
- }
- finally
- {
- lock.unlock();
- }
-
- connection.doWrite(packet);
-
- long toWait = connection.blockingCallTimeout;
-
- long start = System.currentTimeMillis();
-
- while (response == null && toWait > 0)
- {
- try
+ blockThread = Thread.currentThread();
+
+ responseNotifier = notifier;
+
+ response = null;
+
+ connection.doWrite(packet);
+
+ long toWait = connection.blockingCallTimeout;
+
+ long start = System.currentTimeMillis();
+
+ while (response == null && toWait > 0)
{
- waitLock.wait(toWait);
- }
- catch (final InterruptedException e)
- {
- if (interruptBlockOnFailure)
+ try
{
- if (connection.destroyed)
+ waitLock.wait(toWait);
+ }
+ catch (final InterruptedException e)
+ {
+ if (interruptBlockOnFailure)
{
- throw new MessagingException(MessagingException.NOT_CONNECTED, "Connection failed");
+ if (connection.destroyed)
+ {
+ throw new MessagingException(MessagingException.NOT_CONNECTED, "Connection failed");
+ }
}
}
+
+ final long now = System.currentTimeMillis();
+
+ toWait -= now - start;
+
+ start = now;
}
-
- final long now = System.currentTimeMillis();
-
- toWait -= now - start;
-
- start = now;
+
+ if (response == null)
+ {
+ throw new MessagingException(MessagingException.CONNECTION_TIMEDOUT,
+ "Timed out waiting for response when sending packet " + packet.getType());
+ }
+
+ if (response.getType() == PacketImpl.EXCEPTION)
+ {
+ final MessagingExceptionMessage mem = (MessagingExceptionMessage)response;
+
+ throw mem.getException();
+ }
+ else
+ {
+ return response;
+ }
}
-
- if (response == null)
+ finally
{
- throw new MessagingException(MessagingException.CONNECTION_TIMEDOUT,
- "Timed out waiting for response when sending packet " + packet.getType());
+ blockThread = null;
}
-
- if (response.getType() == PacketImpl.EXCEPTION)
- {
- final MessagingExceptionMessage mem = (MessagingExceptionMessage)response;
-
- throw mem.getException();
- }
- else
- {
- return response;
- }
}
- finally
- {
- blockThread = null;
- }
}
+ finally
+ {
+ lock.unlock();
+ }
}
public void replicatePacket(final Packet packet) throws MessagingException
@@ -1111,9 +1117,11 @@
}
}
}
-
+
+ private volatile Packet lastResponseSent;
+
public void transferConnection(final RemotingConnection newConnection)
- {
+ {
// Needs to synchronize on the connection to make sure no packets from
// the old connection
// get processed after transfer has occurred
@@ -1132,6 +1140,21 @@
connection = rnewConnection;
replicatingChannel = null;
+
+ justFailedOver = true;
+
+ //Send back any blocking responses that may be required
+
+ if (lastPacketReceived != null && lastPacketReceived.isReHandleResponseOnFailure())
+ {
+ if (lastResponseSent != null && lastResponseSent instanceof DuplicablePacket)
+ {
+ lastResponseSent.setDuplicate(true);
+
+ send(lastResponseSent);
+ }
+ }
+
}
}
@@ -1157,6 +1180,8 @@
private void handlePacket(final Packet packet)
{
+ lastResponseSent = null;
+
if (packet.getType() == PACKETS_CONFIRMED)
{
if (resendCache != null)
@@ -1220,20 +1245,31 @@
}
}
+ lastPacketReceived = packet;
+
if (packet.isResponse())
- {
- synchronized (waitLock)
+ {
+ if (packet.isDuplicate() && !justFailedOver)
{
- response = packet;
-
- checkConfirmation(packet);
-
- if (responseNotifier != null)
+ //Ignore it - duplicate packets can only come just after failover
+ }
+ else
+ {
+ synchronized (waitLock)
{
- responseNotifier.onResponseReceived();
+ response = packet;
+
+ checkConfirmation(packet);
+
+ if (responseNotifier != null)
+ {
+ responseNotifier.onResponseReceived();
+ }
+
+ waitLock.notify();
}
-
- waitLock.notify();
+
+ justFailedOver = false;
}
}
else if (handler != null)
@@ -1393,7 +1429,5 @@
throw new IllegalArgumentException("Invalid packet: " + packet);
}
}
-
}
-
}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CreateSessionResponseMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CreateSessionResponseMessage.java 2008-10-03 17:58:39 UTC (rev 5065)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CreateSessionResponseMessage.java 2008-10-03 18:27:14 UTC (rev 5066)
@@ -64,7 +64,7 @@
{
return true;
}
-
+
public int getServerVersion()
{
return serverVersion;
Added: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/DuplicablePacket.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/DuplicablePacket.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/DuplicablePacket.java 2008-10-03 18:27:14 UTC (rev 5066)
@@ -0,0 +1,66 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.core.remoting.impl.wireformat;
+
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+
+/**
+ * A DuplicablePacket
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ * Created 3 Oct 2008 15:00:31
+ *
+ *
+ */
+public class DuplicablePacket extends PacketImpl
+{
+ private boolean duplicate;
+
+ public DuplicablePacket(final byte type)
+ {
+ super(type);
+ }
+
+ public boolean isDuplicate()
+ {
+ return duplicate;
+ }
+
+ public void setDuplicate(final boolean duplicate)
+ {
+ this.duplicate = duplicate;
+ }
+
+ public void encodeBody(final MessagingBuffer buffer)
+ {
+ buffer.putBoolean(duplicate);
+ }
+
+ public void decodeBody(final MessagingBuffer buffer)
+ {
+ duplicate = buffer.getBoolean();
+ }
+
+}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/MessagingExceptionMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/MessagingExceptionMessage.java 2008-10-03 17:58:39 UTC (rev 5065)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/MessagingExceptionMessage.java 2008-10-03 18:27:14 UTC (rev 5066)
@@ -18,7 +18,7 @@
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
+ */
package org.jboss.messaging.core.remoting.impl.wireformat;
@@ -32,7 +32,7 @@
* @version <tt>$Revision$</tt>
*
*/
-public class MessagingExceptionMessage extends PacketImpl
+public class MessagingExceptionMessage extends DuplicablePacket
{
// Constants -----------------------------------------------------
@@ -50,7 +50,7 @@
this.exception = exception;
}
-
+
public MessagingExceptionMessage()
{
super(EXCEPTION);
@@ -62,43 +62,45 @@
{
return true;
}
-
+
public MessagingException getException()
{
return exception;
}
-
+
public void encodeBody(final MessagingBuffer buffer)
{
+ super.encodeBody(buffer);
buffer.putInt(exception.getCode());
buffer.putNullableString(exception.getMessage());
}
-
+
public void decodeBody(final MessagingBuffer buffer)
{
+ super.decodeBody(buffer);
int code = buffer.getInt();
String msg = buffer.getNullableString();
exception = new MessagingException(code, msg);
}
-
+
@Override
public String toString()
{
return getParentString() + ", exception= " + exception + "]";
}
-
+
public boolean equals(Object other)
{
if (other instanceof MessagingExceptionMessage == false)
{
return false;
}
-
+
MessagingExceptionMessage r = (MessagingExceptionMessage)other;
-
+
return super.equals(other) && this.exception.equals(r.exception);
}
-
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/NullResponseMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/NullResponseMessage.java 2008-10-03 17:58:39 UTC (rev 5065)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/NullResponseMessage.java 2008-10-03 18:27:14 UTC (rev 5066)
@@ -12,22 +12,32 @@
package org.jboss.messaging.core.remoting.impl.wireformat;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+
/**
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @version <tt>$Revision$</tt>
*/
-public class NullResponseMessage extends PacketImpl
+public class NullResponseMessage extends DuplicablePacket
{
// Constants -----------------------------------------------------
// Attributes ----------------------------------------------------
+ //This does not need to be written over the wire
private final boolean writeAlways;
-
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
+ public NullResponseMessage()
+ {
+ super(NULL_RESPONSE);
+
+ this.writeAlways = false;
+ }
+
public NullResponseMessage(final boolean writeAlways)
{
super(NULL_RESPONSE);
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java 2008-10-03 17:58:39 UTC (rev 5065)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java 2008-10-03 18:27:14 UTC (rev 5066)
@@ -232,6 +232,20 @@
{
return false;
}
+
+ public boolean isReHandleResponseOnFailure()
+ {
+ return false;
+ }
+
+ public boolean isDuplicate()
+ {
+ return false;
+ }
+
+ public void setDuplicate(final boolean duplicate)
+ {
+ }
@Override
public String toString()
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionAddDestinationMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionAddDestinationMessage.java 2008-10-03 17:58:39 UTC (rev 5065)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionAddDestinationMessage.java 2008-10-03 18:27:14 UTC (rev 5066)
@@ -103,6 +103,11 @@
return true;
}
+ public boolean isReHandleResponseOnFailure()
+ {
+ return true;
+ }
+
@Override
public String toString()
{
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionBindingQueryMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionBindingQueryMessage.java 2008-10-03 17:58:39 UTC (rev 5065)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionBindingQueryMessage.java 2008-10-03 18:27:14 UTC (rev 5066)
@@ -64,6 +64,11 @@
address = buffer.getSimpleString();
}
+ public boolean isReHandleResponseOnFailure()
+ {
+ return true;
+ }
+
public boolean equals(Object other)
{
if (other instanceof SessionBindingQueryMessage == false)
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionBindingQueryResponseMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionBindingQueryResponseMessage.java 2008-10-03 17:58:39 UTC (rev 5065)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionBindingQueryResponseMessage.java 2008-10-03 18:27:14 UTC (rev 5066)
@@ -35,7 +35,7 @@
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
*
*/
-public class SessionBindingQueryResponseMessage extends PacketImpl
+public class SessionBindingQueryResponseMessage extends DuplicablePacket
{
private boolean exists;
@@ -72,6 +72,7 @@
public void encodeBody(final MessagingBuffer buffer)
{
+ super.encodeBody(buffer);
buffer.putBoolean(exists);
buffer.putInt(queueNames.size());
for (SimpleString queueName: queueNames)
@@ -82,6 +83,7 @@
public void decodeBody(final MessagingBuffer buffer)
{
+ super.decodeBody(buffer);
exists = buffer.getBoolean();
int numQueues = buffer.getInt();
queueNames = new ArrayList<SimpleString>(numQueues);
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionBrowserCloseMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionBrowserCloseMessage.java 2008-10-03 17:58:39 UTC (rev 5065)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionBrowserCloseMessage.java 2008-10-03 18:27:14 UTC (rev 5066)
@@ -69,6 +69,11 @@
{
browserID = buffer.getLong();
}
+
+ public boolean isReHandleResponseOnFailure()
+ {
+ return true;
+ }
@Override
public String toString()
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionBrowserHasNextMessageMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionBrowserHasNextMessageMessage.java 2008-10-03 17:58:39 UTC (rev 5065)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionBrowserHasNextMessageMessage.java 2008-10-03 18:27:14 UTC (rev 5066)
@@ -70,6 +70,11 @@
{
browserID = buffer.getLong();
}
+
+ public boolean isReHandleResponseOnFailure()
+ {
+ return true;
+ }
@Override
public String toString()
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionBrowserHasNextMessageResponseMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionBrowserHasNextMessageResponseMessage.java 2008-10-03 17:58:39 UTC (rev 5065)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionBrowserHasNextMessageResponseMessage.java 2008-10-03 18:27:14 UTC (rev 5066)
@@ -31,7 +31,7 @@
* @version <tt>$Revision$</tt>
*
*/
-public class SessionBrowserHasNextMessageResponseMessage extends PacketImpl
+public class SessionBrowserHasNextMessageResponseMessage extends DuplicablePacket
{
// Constants -----------------------------------------------------
@@ -69,11 +69,13 @@
public void encodeBody(final MessagingBuffer buffer)
{
+ super.encodeBody(buffer);
buffer.putBoolean(hasNext);
}
public void decodeBody(final MessagingBuffer buffer)
{
+ super.decodeBody(buffer);
hasNext = buffer.getBoolean();
}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionBrowserNextMessageMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionBrowserNextMessageMessage.java 2008-10-03 17:58:39 UTC (rev 5065)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionBrowserNextMessageMessage.java 2008-10-03 18:27:14 UTC (rev 5066)
@@ -70,6 +70,11 @@
{
browserID = buffer.getLong();
}
+
+ public boolean isReHandleResponseOnFailure()
+ {
+ return true;
+ }
@Override
public String toString()
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionBrowserResetMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionBrowserResetMessage.java 2008-10-03 17:58:39 UTC (rev 5065)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionBrowserResetMessage.java 2008-10-03 18:27:14 UTC (rev 5066)
@@ -71,6 +71,11 @@
{
browserID = buffer.getLong();
}
+
+ public boolean isReHandleResponseOnFailure()
+ {
+ return true;
+ }
@Override
public String toString()
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCloseMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCloseMessage.java 2008-10-03 17:58:39 UTC (rev 5065)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCloseMessage.java 2008-10-03 18:27:14 UTC (rev 5066)
@@ -34,6 +34,11 @@
// Public --------------------------------------------------------
+ public boolean isReHandleResponseOnFailure()
+ {
+ return true;
+ }
+
@Override
public boolean equals(final Object other)
{
@@ -47,6 +52,8 @@
return super.equals(other);
}
+
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionConsumerCloseMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionConsumerCloseMessage.java 2008-10-03 17:58:39 UTC (rev 5065)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionConsumerCloseMessage.java 2008-10-03 18:27:14 UTC (rev 5066)
@@ -77,6 +77,11 @@
{
return true;
}
+
+ public boolean isReHandleResponseOnFailure()
+ {
+ return true;
+ }
@Override
public String toString()
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateBrowserMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateBrowserMessage.java 2008-10-03 17:58:39 UTC (rev 5065)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateBrowserMessage.java 2008-10-03 18:27:14 UTC (rev 5066)
@@ -58,9 +58,14 @@
{
super(SESS_CREATEBROWSER);
}
-
+
// Public --------------------------------------------------------
+ public boolean isReHandleResponseOnFailure()
+ {
+ return true;
+ }
+
public SimpleString getQueueName()
{
return queueName;
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateConsumerMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateConsumerMessage.java 2008-10-03 17:58:39 UTC (rev 5065)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateConsumerMessage.java 2008-10-03 18:27:14 UTC (rev 5066)
@@ -63,10 +63,15 @@
public SessionCreateConsumerMessage()
{
super(SESS_CREATECONSUMER);
- }
+ }
// Public --------------------------------------------------------
+ public boolean isReHandleResponseOnFailure()
+ {
+ return true;
+ }
+
@Override
public String toString()
{
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateConsumerResponseMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateConsumerResponseMessage.java 2008-10-03 17:58:39 UTC (rev 5065)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateConsumerResponseMessage.java 2008-10-03 18:27:14 UTC (rev 5066)
@@ -30,7 +30,7 @@
*
* @version <tt>$Revision$</tt>
*/
-public class SessionCreateConsumerResponseMessage extends PacketImpl
+public class SessionCreateConsumerResponseMessage extends DuplicablePacket
{
// Constants -----------------------------------------------------
@@ -68,11 +68,13 @@
public void encodeBody(final MessagingBuffer buffer)
{
+ super.encodeBody(buffer);
buffer.putInt(windowSize);
}
public void decodeBody(final MessagingBuffer buffer)
{
+ super.decodeBody(buffer);
windowSize = buffer.getInt();
}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateProducerMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateProducerMessage.java 2008-10-03 17:58:39 UTC (rev 5065)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateProducerMessage.java 2008-10-03 18:27:14 UTC (rev 5066)
@@ -69,6 +69,11 @@
// Public --------------------------------------------------------
+ public boolean isReHandleResponseOnFailure()
+ {
+ return true;
+ }
+
@Override
public String toString()
{
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateProducerResponseMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateProducerResponseMessage.java 2008-10-03 17:58:39 UTC (rev 5065)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateProducerResponseMessage.java 2008-10-03 18:27:14 UTC (rev 5066)
@@ -30,7 +30,7 @@
*
* @version <tt>$Revision$</tt>
*/
-public class SessionCreateProducerResponseMessage extends PacketImpl
+public class SessionCreateProducerResponseMessage extends DuplicablePacket
{
// Constants -----------------------------------------------------
@@ -79,7 +79,6 @@
return maxRate;
}
-
public SimpleString getAutoGroupId()
{
return autoGroupId;
@@ -87,6 +86,7 @@
public void encodeBody(final MessagingBuffer buffer)
{
+ super.encodeBody(buffer);
buffer.putInt(initialCredits);
buffer.putInt(maxRate);
buffer.putNullableSimpleString(autoGroupId);
@@ -94,6 +94,7 @@
public void decodeBody(final MessagingBuffer buffer)
{
+ super.decodeBody(buffer);
initialCredits = buffer.getInt();
maxRate = buffer.getInt();
autoGroupId = buffer.getNullableSimpleString();
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateQueueMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateQueueMessage.java 2008-10-03 17:58:39 UTC (rev 5065)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateQueueMessage.java 2008-10-03 18:27:14 UTC (rev 5066)
@@ -128,6 +128,11 @@
return true;
}
+ public boolean isReHandleResponseOnFailure()
+ {
+ return true;
+ }
+
public boolean equals(Object other)
{
if (other instanceof SessionCreateQueueMessage == false)
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionDeleteQueueMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionDeleteQueueMessage.java 2008-10-03 17:58:39 UTC (rev 5065)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionDeleteQueueMessage.java 2008-10-03 18:27:14 UTC (rev 5066)
@@ -87,6 +87,11 @@
return true;
}
+ public boolean isReHandleResponseOnFailure()
+ {
+ return true;
+ }
+
public boolean equals(Object other)
{
if (other instanceof SessionDeleteQueueMessage == false)
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionProducerCloseMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionProducerCloseMessage.java 2008-10-03 17:58:39 UTC (rev 5065)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionProducerCloseMessage.java 2008-10-03 18:27:14 UTC (rev 5066)
@@ -55,6 +55,11 @@
// Public --------------------------------------------------------
+ public boolean isReHandleResponseOnFailure()
+ {
+ return true;
+ }
+
public long getProducerID()
{
return producerID;
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionProducerFlowCreditMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionProducerFlowCreditMessage.java 2008-10-03 17:58:39 UTC (rev 5065)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionProducerFlowCreditMessage.java 2008-10-03 18:27:14 UTC (rev 5066)
@@ -31,7 +31,7 @@
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
*
*/
-public class SessionProducerFlowCreditMessage extends PacketImpl
+public class SessionProducerFlowCreditMessage extends DuplicablePacket
{
// Constants -----------------------------------------------------
@@ -73,12 +73,14 @@
public void encodeBody(final MessagingBuffer buffer)
{
+ super.encodeBody(buffer);
buffer.putLong(producerID);
buffer.putInt(credits);
}
public void decodeBody(final MessagingBuffer buffer)
{
+ super.decodeBody(buffer);
producerID = buffer.getLong();
credits = buffer.getInt();
}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionQueueQueryMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionQueueQueryMessage.java 2008-10-03 17:58:39 UTC (rev 5065)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionQueueQueryMessage.java 2008-10-03 18:27:14 UTC (rev 5066)
@@ -54,6 +54,11 @@
return queueName;
}
+ public boolean isReHandleResponseOnFailure()
+ {
+ return true;
+ }
+
public void encodeBody(final MessagingBuffer buffer)
{
buffer.putSimpleString(queueName);
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionQueueQueryResponseMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionQueueQueryResponseMessage.java 2008-10-03 17:58:39 UTC (rev 5065)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionQueueQueryResponseMessage.java 2008-10-03 18:27:14 UTC (rev 5066)
@@ -18,7 +18,7 @@
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
+ */
package org.jboss.messaging.core.remoting.impl.wireformat;
@@ -32,88 +32,94 @@
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
*
*/
-public class SessionQueueQueryResponseMessage extends PacketImpl
+public class SessionQueueQueryResponseMessage extends DuplicablePacket
{
private boolean exists;
-
+
private boolean durable;
-
+
private int consumerCount;
-
+
private int messageCount;
-
+
private SimpleString filterString;
-
+
private SimpleString address;
-
- public SessionQueueQueryResponseMessage(final boolean durable,
- final int consumerCount, final int messageCount, final SimpleString filterString,
- final SimpleString address)
+
+ public SessionQueueQueryResponseMessage(final boolean durable,
+ final int consumerCount,
+ final int messageCount,
+ final SimpleString filterString,
+ final SimpleString address)
{
- this(durable, consumerCount, messageCount, filterString, address, true);
+ this(durable, consumerCount, messageCount, filterString, address, true);
}
-
+
public SessionQueueQueryResponseMessage()
{
this(false, 0, 0, null, null, false);
}
-
- private SessionQueueQueryResponseMessage(final boolean durable,
- final int consumerCount, final int messageCount, final SimpleString filterString, final SimpleString address,
- final boolean exists)
+
+ private SessionQueueQueryResponseMessage(final boolean durable,
+ final int consumerCount,
+ final int messageCount,
+ final SimpleString filterString,
+ final SimpleString address,
+ final boolean exists)
{
super(SESS_QUEUEQUERY_RESP);
-
+
this.durable = durable;
-
+
this.consumerCount = consumerCount;
-
+
this.messageCount = messageCount;
-
+
this.filterString = filterString;
-
+
this.address = address;
-
- this.exists = exists;
+
+ this.exists = exists;
}
-
+
public boolean isResponse()
{
return true;
}
-
+
public boolean isExists()
{
return exists;
}
-
+
public boolean isDurable()
{
return durable;
}
-
+
public int getConsumerCount()
{
return consumerCount;
}
-
+
public int getMessageCount()
{
return messageCount;
}
-
+
public SimpleString getFilterString()
{
return filterString;
}
-
+
public SimpleString getAddress()
{
return address;
}
-
+
public void encodeBody(final MessagingBuffer buffer)
{
+ super.encodeBody(buffer);
buffer.putBoolean(exists);
buffer.putBoolean(durable);
buffer.putInt(consumerCount);
@@ -121,32 +127,34 @@
buffer.putNullableSimpleString(filterString);
buffer.putNullableSimpleString(address);
}
-
+
public void decodeBody(final MessagingBuffer buffer)
{
+ super.decodeBody(buffer);
exists = buffer.getBoolean();
durable = buffer.getBoolean();
consumerCount = buffer.getInt();
messageCount = buffer.getInt();
- filterString = buffer.getNullableSimpleString();
+ filterString = buffer.getNullableSimpleString();
address = buffer.getNullableSimpleString();
}
-
+
public boolean equals(Object other)
{
if (other instanceof SessionQueueQueryResponseMessage == false)
{
return false;
}
-
+
SessionQueueQueryResponseMessage r = (SessionQueueQueryResponseMessage)other;
-
+
return super.equals(other) && this.exists == r.exists &&
- this.durable == r.durable &&
- this.consumerCount == r.consumerCount &&
- this.messageCount == r.messageCount &&
- this.filterString == null ? r.filterString == null : this.filterString.equals(r.filterString) &&
- this.address == null ? r.address == null : this.address.equals(r.address);
+ this.durable == r.durable &&
+ this.consumerCount == r.consumerCount &&
+ this.messageCount == r.messageCount &&
+ this.filterString == null ? r.filterString == null
+ : this.filterString.equals(r.filterString) && this.address == null ? r.address == null
+ : this.address.equals(r.address);
}
-
+
}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionRemoveDestinationMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionRemoveDestinationMessage.java 2008-10-03 17:58:39 UTC (rev 5065)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionRemoveDestinationMessage.java 2008-10-03 18:27:14 UTC (rev 5066)
@@ -91,6 +91,11 @@
{
return true;
}
+
+ public boolean isReHandleResponseOnFailure()
+ {
+ return true;
+ }
@Override
public String toString()
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendManagementMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendManagementMessage.java 2008-10-03 17:58:39 UTC (rev 5065)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendManagementMessage.java 2008-10-03 18:27:14 UTC (rev 5066)
@@ -71,6 +71,11 @@
// Public --------------------------------------------------------
+ public boolean isReHandleResponseOnFailure()
+ {
+ return true;
+ }
+
public long getProducerID()
{
return producerID;
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendMessage.java 2008-10-03 17:58:39 UTC (rev 5065)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendMessage.java 2008-10-03 18:27:14 UTC (rev 5066)
@@ -72,6 +72,11 @@
// Public --------------------------------------------------------
+ public boolean isReHandleResponseOnFailure()
+ {
+ return true;
+ }
+
public long getProducerID()
{
return producerID;
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXACommitMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXACommitMessage.java 2008-10-03 17:58:39 UTC (rev 5065)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXACommitMessage.java 2008-10-03 18:27:14 UTC (rev 5066)
@@ -60,6 +60,11 @@
}
// Public --------------------------------------------------------
+
+ public boolean isReHandleResponseOnFailure()
+ {
+ return true;
+ }
public Xid getXid()
{
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXAEndMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXAEndMessage.java 2008-10-03 17:58:39 UTC (rev 5065)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXAEndMessage.java 2008-10-03 18:27:14 UTC (rev 5066)
@@ -62,6 +62,11 @@
// Public --------------------------------------------------------
+ public boolean isReHandleResponseOnFailure()
+ {
+ return true;
+ }
+
public boolean isFailed()
{
return failed;
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXAForgetMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXAForgetMessage.java 2008-10-03 17:58:39 UTC (rev 5065)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXAForgetMessage.java 2008-10-03 18:27:14 UTC (rev 5066)
@@ -58,6 +58,11 @@
// Public --------------------------------------------------------
+ public boolean isReHandleResponseOnFailure()
+ {
+ return true;
+ }
+
public Xid getXid()
{
return xid;
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXAGetInDoubtXidsResponseMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXAGetInDoubtXidsResponseMessage.java 2008-10-03 17:58:39 UTC (rev 5065)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXAGetInDoubtXidsResponseMessage.java 2008-10-03 18:27:14 UTC (rev 5066)
@@ -61,6 +61,11 @@
// Public --------------------------------------------------------
+ public boolean isReHandleResponseOnFailure()
+ {
+ return true;
+ }
+
public boolean isResponse()
{
return true;
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXAGetTimeoutResponseMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXAGetTimeoutResponseMessage.java 2008-10-03 17:58:39 UTC (rev 5065)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXAGetTimeoutResponseMessage.java 2008-10-03 18:27:14 UTC (rev 5066)
@@ -55,9 +55,13 @@
super(SESS_XA_GET_TIMEOUT_RESP);
}
-
// Public --------------------------------------------------------
+ public boolean isReHandleResponseOnFailure()
+ {
+ return true;
+ }
+
public boolean isResponse()
{
return true;
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXAJoinMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXAJoinMessage.java 2008-10-03 17:58:39 UTC (rev 5065)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXAJoinMessage.java 2008-10-03 18:27:14 UTC (rev 5066)
@@ -58,6 +58,11 @@
// Public --------------------------------------------------------
+ public boolean isReHandleResponseOnFailure()
+ {
+ return true;
+ }
+
public Xid getXid()
{
return xid;
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXAPrepareMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXAPrepareMessage.java 2008-10-03 17:58:39 UTC (rev 5065)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXAPrepareMessage.java 2008-10-03 18:27:14 UTC (rev 5066)
@@ -58,6 +58,11 @@
// Public --------------------------------------------------------
+ public boolean isReHandleResponseOnFailure()
+ {
+ return true;
+ }
+
public Xid getXid()
{
return xid;
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXAResponseMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXAResponseMessage.java 2008-10-03 17:58:39 UTC (rev 5065)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXAResponseMessage.java 2008-10-03 18:27:14 UTC (rev 5066)
@@ -63,7 +63,7 @@
}
// Public --------------------------------------------------------
-
+
public boolean isResponse()
{
return true;
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXAResumeMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXAResumeMessage.java 2008-10-03 17:58:39 UTC (rev 5065)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXAResumeMessage.java 2008-10-03 18:27:14 UTC (rev 5066)
@@ -58,6 +58,11 @@
// Public --------------------------------------------------------
+ public boolean isReHandleResponseOnFailure()
+ {
+ return true;
+ }
+
public Xid getXid()
{
return xid;
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXARollbackMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXARollbackMessage.java 2008-10-03 17:58:39 UTC (rev 5065)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXARollbackMessage.java 2008-10-03 18:27:14 UTC (rev 5066)
@@ -58,6 +58,11 @@
// Public --------------------------------------------------------
+ public boolean isReHandleResponseOnFailure()
+ {
+ return true;
+ }
+
public Xid getXid()
{
return xid;
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXASetTimeoutMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXASetTimeoutMessage.java 2008-10-03 17:58:39 UTC (rev 5065)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXASetTimeoutMessage.java 2008-10-03 18:27:14 UTC (rev 5066)
@@ -55,9 +55,13 @@
super(SESS_XA_SET_TIMEOUT);
}
-
// Public --------------------------------------------------------
+ public boolean isReHandleResponseOnFailure()
+ {
+ return true;
+ }
+
public int getTimeoutSeconds()
{
return this.timeoutSeconds;
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXAStartMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXAStartMessage.java 2008-10-03 17:58:39 UTC (rev 5065)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionXAStartMessage.java 2008-10-03 18:27:14 UTC (rev 5066)
@@ -58,6 +58,11 @@
// Public --------------------------------------------------------
+ public boolean isReHandleResponseOnFailure()
+ {
+ return true;
+ }
+
public Xid getXid()
{
return xid;
Modified: trunk/src/main/org/jboss/messaging/core/server/ServerSession.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/ServerSession.java 2008-10-03 17:58:39 UTC (rev 5065)
+++ trunk/src/main/org/jboss/messaging/core/server/ServerSession.java 2008-10-03 18:27:14 UTC (rev 5066)
@@ -18,7 +18,7 @@
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
+ */
package org.jboss.messaging.core.server;
@@ -39,28 +39,28 @@
*/
public interface ServerSession
{
- long getID();
-
- String getUsername();
-
- String getPassword();
-
- void removeBrowser(ServerBrowserImpl browser) throws Exception;
-
- void removeConsumer(ServerConsumer consumer) throws Exception;
-
- void removeProducer(ServerProducer producer) throws Exception;
-
- void close() throws Exception;
-
- void setStarted(boolean started) throws Exception;
-
- void promptDelivery(Queue queue);
-
- void send(ServerMessage msg) throws Exception;
-
- void processed(final long consumerID, final long messageID) throws Exception;
-
+ long getID();
+
+ String getUsername();
+
+ String getPassword();
+
+ void removeBrowser(ServerBrowserImpl browser) throws Exception;
+
+ void removeConsumer(ServerConsumer consumer) throws Exception;
+
+ void removeProducer(ServerProducer producer) throws Exception;
+
+ void close() throws Exception;
+
+ void setStarted(boolean started) throws Exception;
+
+ void promptDelivery(Queue queue);
+
+ void send(ServerMessage msg) throws Exception;
+
+ void processed(final long consumerID, final long messageID) throws Exception;
+
void rollback() throws Exception;
void commit() throws Exception;
@@ -82,7 +82,7 @@
SessionXAResponseMessage XAStart(Xid xid);
SessionXAResponseMessage XASuspend() throws Exception;
-
+
List<Xid> getInDoubtXids() throws Exception;
int getXATimeout();
@@ -93,41 +93,59 @@
void removeDestination(SimpleString address, boolean durable) throws Exception;
- void createQueue(SimpleString address, SimpleString queueName, SimpleString filterString,
- boolean durable, boolean temporary) throws Exception;
+ void createQueue(SimpleString address,
+ SimpleString queueName,
+ SimpleString filterString,
+ boolean durable,
+ boolean temporary) throws Exception;
void deleteQueue(SimpleString queueName) throws Exception;
- SessionCreateConsumerResponseMessage createConsumer(SimpleString queueName, SimpleString filterString,
- int windowSize, int maxRate) throws Exception;
-
- SessionCreateProducerResponseMessage createProducer(SimpleString address, int windowSize, int maxRate, boolean autoGroupId) throws Exception;
+ SessionCreateConsumerResponseMessage createConsumer(SimpleString queueName,
+ SimpleString filterString,
+ int windowSize,
+ int maxRate) throws Exception;
+// SessionCreateConsumerResponseMessage recreateConsumer(SimpleString queueName,
+// SimpleString filterString,
+// int windowSize,
+// int maxRate) throws Exception;
+
+ SessionCreateProducerResponseMessage createProducer(SimpleString address,
+ int windowSize,
+ int maxRate,
+ boolean autoGroupId) throws Exception;
+
+// SessionCreateProducerResponseMessage recreateProducer(SimpleString address,
+// int windowSize,
+// int maxRate,
+// boolean autoGroupId) throws Exception;
+
SessionQueueQueryResponseMessage executeQueueQuery(SimpleString queueName) throws Exception;
SessionBindingQueryResponseMessage executeBindingQuery(SimpleString address) throws Exception;
void createBrowser(SimpleString queueName, SimpleString filterString) throws Exception;
-
+
void closeConsumer(long consumerID) throws Exception;
-
+
void closeProducer(long producerID) throws Exception;
-
+
void closeBrowser(long browserID) throws Exception;
-
+
void receiveConsumerCredits(long consumerID, int credits) throws Exception;
-
+
void sendProducerMessage(long producerID, ServerMessage message) throws Exception;
-
+
boolean browserHasNextMessage(long browserID) throws Exception;
-
+
ServerMessage browserNextMessage(long browserID) throws Exception;
-
+
void browserReset(long browserID) throws Exception;
-
+
int transferConnection(RemotingConnection newConnection);
-
+
void handleManagementMessage(SessionSendManagementMessage message) throws Exception;
-
+
void failedOver() throws Exception;
}
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java 2008-10-03 17:58:39 UTC (rev 5065)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java 2008-10-03 18:27:14 UTC (rev 5066)
@@ -118,4 +118,8 @@
channel1.send(response);
}
+//
+// public void rehandlePacket(final Packet packet)
+// {
+// }
}
\ No newline at end of file
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2008-10-03 17:58:39 UTC (rev 5065)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2008-10-03 18:27:14 UTC (rev 5066)
@@ -463,7 +463,7 @@
return new SessionXAResponseMessage(false, XAResource.XA_OK, null);
}
-
+
public SessionXAResponseMessage XAEnd(final Xid xid, final boolean failed) throws Exception
{
if (tx != null && tx.getXid().equals(xid))
@@ -504,7 +504,7 @@
return new SessionXAResponseMessage(false, XAResource.XA_OK, null);
}
-
+
public SessionXAResponseMessage XAForget(final Xid xid)
{
// Do nothing since we don't support heuristic commits / rollback from the
@@ -907,6 +907,34 @@
return response;
}
+
+// public SessionCreateConsumerResponseMessage recreateConsumer(final SimpleString queueName,
+// final SimpleString filterString,
+// int windowSize,
+// int maxRate) throws Exception
+// {
+// Binding binding = postOffice.getBinding(queueName);
+//
+// if (binding == null)
+// {
+// throw new MessagingException(MessagingException.QUEUE_DOES_NOT_EXIST);
+// }
+//
+// securityStore.check(binding.getAddress(), CheckType.READ, this);
+//
+// // Flow control values if specified on queue override those passed in from
+// // client
+//
+// QueueSettings qs = queueSettingsRepository.getMatch(queueName.toString());
+//
+// Integer queueWindowSize = qs.getConsumerWindowSize();
+//
+// windowSize = queueWindowSize != null ? queueWindowSize : windowSize;
+//
+// SessionCreateConsumerResponseMessage response = new SessionCreateConsumerResponseMessage(windowSize);
+//
+// return response;
+// }
public SessionQueueQueryResponseMessage executeQueueQuery(final SimpleString queueName) throws Exception
{
@@ -1034,6 +1062,36 @@
}
return new SessionCreateProducerResponseMessage(initialCredits, maxRateToUse, groupId);
}
+
+// public SessionCreateProducerResponseMessage recreateProducer(final SimpleString address,
+// final int windowSize,
+// final int maxRate,
+// final boolean autoGroupId) throws Exception
+// {
+// FlowController flowController = null;
+//
+// final int maxRateToUse = maxRate;
+//
+// if (address != null)
+// {
+// flowController = windowSize == -1 ? null : postOffice.getFlowController(address);
+// }
+//
+// final int windowToUse = flowController == null ? -1 : windowSize;
+//
+// // Get some initial credits to send to the producer - we try for
+// // windowToUse
+//
+// int initialCredits = flowController == null ? -1 : windowToUse;
+//
+// SimpleString groupId = null;
+// if(autoGroupId)
+// {
+// groupId = simpleStringIdGenerator.generateID();
+// }
+//
+// return new SessionCreateProducerResponseMessage(initialCredits, maxRateToUse, groupId);
+// }
public boolean browserHasNextMessage(final long browserID) throws Exception
{
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java 2008-10-03 17:58:39 UTC (rev 5065)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java 2008-10-03 18:27:14 UTC (rev 5066)
@@ -51,6 +51,7 @@
import java.util.List;
+import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import org.jboss.messaging.core.exception.MessagingException;
@@ -89,6 +90,7 @@
import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAGetTimeoutResponseMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAJoinMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAPrepareMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAResponseMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAResumeMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionXARollbackMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionXASetTimeoutMessage;
@@ -425,4 +427,246 @@
channel.send(response);
}
}
+
+// public void rehandlePacket(final Packet packet)
+// {
+// Packet response = null;
+// try
+// {
+// byte type = packet.getType();
+//
+// switch (type)
+// {
+// case SESS_CREATECONSUMER:
+// {
+// SessionCreateConsumerMessage request = (SessionCreateConsumerMessage)packet;
+// response = session.recreateConsumer(request.getQueueName(),
+// request.getFilterString(),
+// request.getWindowSize(),
+// request.getMaxRate());
+// break;
+// }
+// case SESS_CREATEQUEUE:
+// {
+// response = new NullResponseMessage(true, true);
+// break;
+// }
+// case SESS_DELETE_QUEUE:
+// {
+// response = new NullResponseMessage(true, true);
+// break;
+// }
+// case SESS_QUEUEQUERY:
+// {
+// SessionQueueQueryMessage request = (SessionQueueQueryMessage)packet;
+// response = session.executeQueueQuery(request.getQueueName());
+// break;
+// }
+// case SESS_BINDINGQUERY:
+// {
+// SessionBindingQueryMessage request = (SessionBindingQueryMessage)packet;
+// response = session.executeBindingQuery(request.getAddress());
+// break;
+// }
+// case SESS_CREATEBROWSER:
+// {
+// response = new NullResponseMessage(false, true);
+// break;
+// }
+// case SESS_CREATEPRODUCER:
+// {
+// SessionCreateProducerMessage request = (SessionCreateProducerMessage)packet;
+// response = session.recreateProducer(request.getAddress(), request.getWindowSize(), request.getMaxRate(), request.isAutoGroupId());
+// break;
+// }
+// case SESS_COMMIT:
+// {
+// response = new NullResponseMessage(false, true);
+// break;
+// }
+// case SESS_ROLLBACK:
+// {
+// response = new NullResponseMessage(false, true);
+// break;
+// }
+// case SESS_XA_COMMIT:
+// {
+// SessionXACommitMessage message = (SessionXACommitMessage)packet;
+// response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
+// break;
+// }
+// case SESS_XA_END:
+// {
+// SessionXAEndMessage message = (SessionXAEndMessage)packet;
+// response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
+// break;
+// }
+// case SESS_XA_FORGET:
+// {
+// SessionXAForgetMessage message = (SessionXAForgetMessage)packet;
+// response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
+// break;
+// }
+// case SESS_XA_JOIN:
+// {
+// SessionXAJoinMessage message = (SessionXAJoinMessage)packet;
+// response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
+// break;
+// }
+// case SESS_XA_RESUME:
+// {
+// SessionXAResumeMessage message = (SessionXAResumeMessage)packet;
+// response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
+// break;
+// }
+// case SESS_XA_ROLLBACK:
+// {
+// SessionXARollbackMessage message = (SessionXARollbackMessage)packet;
+// response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
+// break;
+// }
+// case SESS_XA_START:
+// {
+// SessionXAStartMessage message = (SessionXAStartMessage)packet;
+// response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
+// break;
+// }
+// case SESS_XA_SUSPEND:
+// {
+// response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
+// break;
+// }
+// case SESS_XA_PREPARE:
+// {
+// SessionXAPrepareMessage message = (SessionXAPrepareMessage)packet;
+// //FIXME - what if the xa operation didn't succeed last time round?
+// //Need to return an xa error code then.
+// //So we need to store the state of the last operation
+//
+// ok ok it's simpler than this!!!
+//
+// no need for having reversions of everything.
+//
+// just store the last response in the server session packet hjandler and send back that
+//
+// response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
+// break;
+// }
+// case SESS_XA_INDOUBT_XIDS:
+// {
+// List<Xid> xids = session.getInDoubtXids();
+// response = new SessionXAGetInDoubtXidsResponseMessage(xids);
+// break;
+// }
+// case SESS_XA_GET_TIMEOUT:
+// {
+// response = new SessionXAGetTimeoutResponseMessage(session.getXATimeout());
+// break;
+// }
+// case SESS_XA_SET_TIMEOUT:
+// {
+// SessionXASetTimeoutMessage message = (SessionXASetTimeoutMessage)packet;
+// response = new SessionXASetTimeoutResponseMessage(session.setXATimeout(message.getTimeoutSeconds()));
+// break;
+// }
+// case SESS_ADD_DESTINATION:
+// {
+// response = new NullResponseMessage(true, true);
+// break;
+// }
+// case SESS_REMOVE_DESTINATION:
+// {
+// response = new NullResponseMessage(true, true);
+// break;
+// }
+// case SESS_STOP:
+// {
+// response = new NullResponseMessage(false, true);
+// break;
+// }
+// case SESS_CLOSE:
+// {
+// response = new NullResponseMessage(false, true);
+// break;
+// }
+// case SESS_CONSUMER_CLOSE:
+// {
+// response = new NullResponseMessage(true, true);
+// break;
+// }
+// case SESS_PRODUCER_CLOSE:
+// {
+// response = new NullResponseMessage(false, true);
+// break;
+// }
+// case SESS_BROWSER_CLOSE:
+// {
+// response = new NullResponseMessage(false, true);
+// break;
+// }
+// case SESS_SEND:
+// {
+// SessionSendMessage message = (SessionSendMessage)packet;
+// if (message.isRequiresResponse())
+// {
+// response = new NullResponseMessage(false, true);
+// }
+// break;
+// }
+// case SESS_BROWSER_HASNEXTMESSAGE:
+// {
+// //FIXME - this won't work - but we're getting rid of these browser commands anyway so live
+// //with it for now
+// SessionBrowserHasNextMessageMessage message = (SessionBrowserHasNextMessageMessage)packet;
+// response = new SessionBrowserHasNextMessageResponseMessage(session.browserHasNextMessage(message.getBrowserID()));
+// break;
+// }
+// case SESS_BROWSER_NEXTMESSAGE:
+// {
+// //FIXME - this won't work - but we're getting rid of these browser commands anyway so live
+// //with it for now
+// SessionBrowserNextMessageMessage message = (SessionBrowserNextMessageMessage)packet;
+// ServerMessage smsg = session.browserNextMessage(message.getBrowserID());
+// response = new SessionBrowseMessage(smsg);
+// break;
+// }
+// case SESS_BROWSER_RESET:
+// {
+// //FIXME - this won't work - but we're getting rid of these browser commands anyway so live
+// //with it for now
+// SessionBrowserResetMessage message = (SessionBrowserResetMessage)packet;
+// session.browserReset(message.getBrowserID());
+// response = new NullResponseMessage(false, true);
+// break;
+// }
+// default:
+// {
+// response = new MessagingExceptionMessage(new MessagingException(MessagingException.UNSUPPORTED_PACKET,
+// "Unsupported packet on resend " + type));
+// }
+// }
+// }
+// catch (Throwable t)
+// {
+// MessagingException me;
+//
+// log.error("Caught unexpected exception", t);
+//
+// if (t instanceof MessagingException)
+// {
+// me = (MessagingException)t;
+// }
+// else
+// {
+// me = new MessagingException(MessagingException.INTERNAL_ERROR);
+// }
+//
+// response = new MessagingExceptionMessage(me);
+// }
+//
+// if (response != null)
+// {
+// channel.send(response);
+// }
+// }
}
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/RandomFailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/RandomFailoverTest.java 2008-10-03 17:58:39 UTC (rev 5065)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/RandomFailoverTest.java 2008-10-03 18:27:14 UTC (rev 5066)
@@ -121,8 +121,7 @@
ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"),
new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
- backupParams));
-
+ backupParams));
do
{
testA(sf);
@@ -133,6 +132,30 @@
}
}
+ public void testAA(final ClientSessionFactory sf) throws Exception
+ {
+ ClientSession s = sf.createSession(false, false, false, false);
+
+ s.createQueue(ADDRESS, ADDRESS, null, false, false);
+
+ failer.session = s;
+
+ final int numConsumers = 100;
+
+ for (int i = 0; i < numConsumers; i++)
+ {
+ ClientConsumer consumer = s.createConsumer(ADDRESS);
+
+ consumer.close();
+ }
+
+ s.deleteQueue(ADDRESS);
+
+ s.close();
+
+ log.info("done");
+ }
+
public void testA(final ClientSessionFactory sf) throws Exception
{
long start = System.currentTimeMillis();
More information about the jboss-cvs-commits
mailing list