[jboss-cvs] JBoss Messaging SVN: r5162 - in trunk: src/main/org/jboss/messaging/core/remoting and 6 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Oct 21 06:39:10 EDT 2008
Author: timfox
Date: 2008-10-21 06:39:09 -0400 (Tue, 21 Oct 2008)
New Revision: 5162
Modified:
trunk/src/main/org/jboss/messaging/core/postoffice/PostOffice.java
trunk/src/main/org/jboss/messaging/core/remoting/Channel.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/ConnectionRegistryImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
trunk/src/main/org/jboss/messaging/core/server/ServerConsumer.java
trunk/src/main/org/jboss/messaging/core/server/ServerSession.java
trunk/src/main/org/jboss/messaging/core/server/impl/DistributionPolicyImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/RoundRobinDistributionPolicy.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java
trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/SecurityTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/MultiThreadRandomFailoverTest.java
trunk/tests/src/org/jboss/messaging/tests/performance/persistence/FakePostOffice.java
Log:
More session replication, failover, fixes etc
Modified: trunk/src/main/org/jboss/messaging/core/postoffice/PostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/PostOffice.java 2008-10-21 09:53:33 UTC (rev 5161)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/PostOffice.java 2008-10-21 10:39:09 UTC (rev 5162)
@@ -69,7 +69,7 @@
List<Binding> getBindingsForAddress(SimpleString address) throws Exception;
- Binding getBinding(SimpleString queueName) throws Exception;
+ Binding getBinding(SimpleString queueName);
List<MessageReference> route(ServerMessage message) throws Exception;
Modified: trunk/src/main/org/jboss/messaging/core/remoting/Channel.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/Channel.java 2008-10-21 09:53:33 UTC (rev 5161)
+++ trunk/src/main/org/jboss/messaging/core/remoting/Channel.java 2008-10-21 10:39:09 UTC (rev 5162)
@@ -11,6 +11,8 @@
*/
package org.jboss.messaging.core.remoting;
+import java.util.Queue;
+
import org.jboss.messaging.core.exception.MessagingException;
/**
@@ -49,4 +51,24 @@
void unlock();
void interruptBlocking();
+
+ //debug only
+ Queue<Command> getSentCommands();
+
+ Queue<Command> getReceivedCommands();
+
+ // For debug only
+ static class Command
+ {
+ public final int commandID;
+
+ public final Packet packet;
+
+ public Command(final int commandID, final Packet packet)
+ {
+ this.commandID = commandID;
+
+ this.packet = packet;
+ }
+ }
}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/ConnectionRegistryImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/ConnectionRegistryImpl.java 2008-10-21 09:53:33 UTC (rev 5161)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/ConnectionRegistryImpl.java 2008-10-21 10:39:09 UTC (rev 5162)
@@ -100,9 +100,6 @@
callTimeout,
pingInterval,
pingExecutor,
- null,
- null,
- true,
null);
handler.conn = connection;
@@ -143,9 +140,6 @@
callTimeout,
pingInterval,
pingExecutor,
- null,
- null,
- true,
null);
handler.conn = connection;
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java 2008-10-21 09:53:33 UTC (rev 5161)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java 2008-10-21 10:39:09 UTC (rev 5162)
@@ -34,7 +34,6 @@
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CREATEPRODUCER;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CREATEPRODUCER_RESP;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CREATEQUEUE;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CREATEQUEUECOPY;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_DELETE_QUEUE;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_FAILOVER_COMPLETE;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_FLOWTOKEN;
@@ -69,6 +68,9 @@
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_SUSPEND;
import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
@@ -196,6 +198,8 @@
private volatile boolean active;
+ private final boolean client;
+
private final long pingPeriod;
private final ScheduledExecutorService pingExecutor;
@@ -210,27 +214,204 @@
private final ReadWriteLock readWriteLock;
private final Object transferLock = new Object();
-
- private static volatile boolean debug;
-
+
+ // debug only stuff
+
+ private boolean createdActive;
+
+ public static volatile boolean debug = false;
+
+ private static final int MAX_DEBUG_CACHE_SIZE = 1000;
+
+ private static final Map<Long, Channel> serverLiveChannels = new LinkedHashMap<Long, Channel>();
+
+ private static final Map<Long, Channel> serverBackupChannels = new LinkedHashMap<Long, Channel>();
+
+ private static final Map<Long, Channel> clientChannels = new LinkedHashMap<Long, Channel>();
+
private static void report()
{
+ log.info("=================================================================");
+ log.info("Report on channel history");
+ log.info("-------------------------");
+ log.info(" ");
+ log.info("Server--->Client traffic");
+ log.info("------------------------");
+ for (Channel serverBackupChannel : serverBackupChannels.values())
+ {
+ Channel serverLiveChannel = serverLiveChannels.get(serverBackupChannel.getID());
+
+ Channel clientChannel = clientChannels.get(serverBackupChannel.getID());
+
+ log.info("Channel id: " + serverBackupChannel.getID());
+ log.info("Backup Live Client");
+ Iterator<Channel.Command> backup = serverBackupChannel.getSentCommands().iterator();
+ Iterator<Channel.Command> live = serverLiveChannel == null ? null : serverLiveChannel.getSentCommands()
+ .iterator();
+ Iterator<Channel.Command> client = clientChannel == null ? null : clientChannel.getReceivedCommands()
+ .iterator();
+
+ while (backup.hasNext() || (live != null && live.hasNext()) || (client != null && client.hasNext()))
+ {
+ String line = "";
+ byte bkpPacketType = -1;
+ byte livePacketType = -1;
+ byte clientPacketType = -1;
+ if (backup.hasNext())
+ {
+ Channel.Command backupCommand = backup.next();
+ line += "id:" + backupCommand.commandID + " type:" + backupCommand.packet.getType() + ", ";
+ bkpPacketType = backupCommand.packet.getType();
+ }
+ else
+ {
+ line += "--------------";
+ }
+ if (live != null && live.hasNext())
+ {
+ Channel.Command liveCommand = live.next();
+ line += "id:" + liveCommand.commandID + " type:" + liveCommand.packet.getType() + ", ";
+ livePacketType = liveCommand.packet.getType();
+ }
+ else
+ {
+ line += "--------------";
+ }
+ if (client != null && client.hasNext())
+ {
+ Channel.Command clientCommand = client.next();
+ line += "id:" + clientCommand.commandID + " type:" + clientCommand.packet.getType() + ", ";
+ clientPacketType = clientCommand.packet.getType();
+ }
+ else
+ {
+ line += "--------------";
+ }
+ log.info(line);
+ if (bkpPacketType != -1 && livePacketType != -1 && clientPacketType != -1 &&
+ (bkpPacketType != livePacketType || livePacketType != clientPacketType))
+ {
+ log.info("*** ERROR Packets in wrong sequence ***");
+ }
+ }
+ }
+ log.info("Client--->Server traffic");
+ log.info("------------------------");
+ for (Channel serverBackupChannel : serverBackupChannels.values())
+ {
+ Channel serverLiveChannel = serverLiveChannels.get(serverBackupChannel.getID());
+
+ Channel clientChannel = clientChannels.get(serverBackupChannel.getID());
+
+ log.info("Channel id: " + serverBackupChannel.getID());
+ log.info("Backup Live Client");
+ Iterator<Channel.Command> backup = serverBackupChannel.getReceivedCommands().iterator();
+ Iterator<Channel.Command> live = serverLiveChannel == null ? null : serverLiveChannel.getReceivedCommands()
+ .iterator();
+ Iterator<Channel.Command> client = clientChannel == null ? null : clientChannel.getSentCommands()
+ .iterator();
+
+ while (backup.hasNext() || (live != null && live.hasNext()) || (client != null && client.hasNext()))
+ {
+ String line = "";
+ byte bkpPacketType = -1;
+ byte livePacketType = -1;
+ byte clientPacketType = -1;
+ if (backup.hasNext())
+ {
+ Channel.Command backupCommand = backup.next();
+ line += "id:" + backupCommand.commandID + " type:" + backupCommand.packet.getType() + ", ";
+ bkpPacketType = backupCommand.packet.getType();
+ }
+ else
+ {
+ line += "--------------";
+ }
+ if (live != null && live.hasNext())
+ {
+ Channel.Command liveCommand = live.next();
+ line += "id:" + liveCommand.commandID + " type:" + liveCommand.packet.getType() + ", ";
+ livePacketType = liveCommand.packet.getType();
+ }
+ else
+ {
+ line += "--------------";
+ }
+ if (client != null && client.hasNext())
+ {
+ Channel.Command clientCommand = client.next();
+ line += "id:" + clientCommand.commandID + " type:" + clientCommand.packet.getType() + ", ";
+ clientPacketType = clientCommand.packet.getType();
+ }
+ else
+ {
+ line += "--------------";
+ }
+ log.info(line);
+ if (bkpPacketType != -1 && livePacketType != -1 && clientPacketType != -1 &&
+ (bkpPacketType != livePacketType || livePacketType != clientPacketType))
+ {
+ log.info("*** ERROR Packets in wrong sequence ***");
+ }
+ }
+ }
+
+ log.info("End of report====================================================");
}
+ // end debug only stuff
+
// Constructors
// ---------------------------------------------------------------------------------
+ /*
+ * Create a client side connection
+ */
public RemotingConnectionImpl(final Connection transportConnection,
final long blockingCallTimeout,
final long pingPeriod,
final ScheduledExecutorService pingExecutor,
+ final List<Interceptor> interceptors)
+ {
+ this(transportConnection, blockingCallTimeout, pingPeriod, pingExecutor, interceptors, null, true, null, true);
+ }
+
+ /*
+ * Create a server side connection
+ */
+ public RemotingConnectionImpl(final Connection transportConnection,
+ final long blockingCallTimeout,
+ final long pingPeriod,
+ final ScheduledExecutorService pingExecutor,
final List<Interceptor> interceptors,
final RemotingConnection replicatingConnection,
final boolean active,
final ReadWriteLock readWriteLock)
{
+ this(transportConnection,
+ blockingCallTimeout,
+ pingPeriod,
+ pingExecutor,
+ interceptors,
+ replicatingConnection,
+ active,
+ readWriteLock,
+ false);
+ }
+
+ private RemotingConnectionImpl(final Connection transportConnection,
+ final long blockingCallTimeout,
+ final long pingPeriod,
+ final ScheduledExecutorService pingExecutor,
+ final List<Interceptor> interceptors,
+ final RemotingConnection replicatingConnection,
+ final boolean active,
+ final ReadWriteLock readWriteLock,
+ final boolean client)
+
+ {
this.transportConnection = transportConnection;
this.blockingCallTimeout = blockingCallTimeout;
@@ -253,6 +434,10 @@
final ChannelHandler ppHandler = new PingPongHandler();
pingChannel.setHandler(ppHandler);
+
+ this.client = client;
+
+ this.createdActive = active;
}
public void startPinger()
@@ -290,6 +475,25 @@
channel = new ChannelImpl(this, channelID, packetConfirmationBatchSize, interruptBlockOnFailure);
channels.put(channelID, channel);
+
+ if (debug)
+ {
+ if (client)
+ {
+ clientChannels.put(channelID, channel);
+ }
+ else
+ {
+ if (createdActive)
+ {
+ serverLiveChannels.put(channelID, channel);
+ }
+ else
+ {
+ serverBackupChannels.put(channelID, channel);
+ }
+ }
+ }
}
return channel;
@@ -407,8 +611,8 @@
final long channelID = packet.getChannelID();
- //FIXME - need to redo global ordering since this won't work with multiple connections
- //Instead use lastSeq technique
+ // FIXME - need to redo global ordering since this won't work with multiple connections
+ // Instead use lastSeq technique
final boolean useLock = readWriteLock != null;
@@ -856,6 +1060,14 @@
private final Queue<DelayedResult> responseActions = new ConcurrentLinkedQueue<DelayedResult>();
+ // debug stuff
+
+ private final Queue<Command> receivedCommandsCache;
+
+ private final Queue<Command> sentCommandsCache;
+
+ // end debug stuff
+
private ChannelImpl(final RemotingConnectionImpl connection,
final long id,
final int packetConfirmationBatchSize,
@@ -893,6 +1105,19 @@
}
this.interruptBlockOnFailure = interruptBlockOnFailure;
+
+ if (debug)
+ {
+ this.sentCommandsCache = new LinkedList<Command>();
+
+ this.receivedCommandsCache = new LinkedList<Command>();
+ }
+ else
+ {
+ this.sentCommandsCache = null;
+
+ this.receivedCommandsCache = null;
+ }
}
public long getID()
@@ -1201,6 +1426,16 @@
lock.unlock();
}
+ public Queue<Command> getSentCommands()
+ {
+ return this.sentCommandsCache;
+ }
+
+ public Queue<Command> getReceivedCommands()
+ {
+ return this.receivedCommandsCache;
+ }
+
// we need to do a thorough investigation of how packets confirmed get
//
// a) replicated from client through live to backup without dealing with on live
@@ -1208,6 +1443,18 @@
private void handlePacket(final Packet packet)
{
+ if (debug)
+ {
+ int commandID = packet.isRequiresConfirmations() ? lastReceivedCommandID + 1 : -1;
+
+ receivedCommandsCache.add(new Command(commandID, packet));
+
+ if (receivedCommandsCache.size() == MAX_DEBUG_CACHE_SIZE)
+ {
+ receivedCommandsCache.poll();
+ }
+ }
+
if (packet.getType() == PACKETS_CONFIRMED)
{
if (resendCache != null)
@@ -1294,13 +1541,18 @@
nextConfirmation += packetConfirmationBatchSize;
confirmed.setChannelID(id);
+
+ if (debug)
+ {
+ this.sentCommandsCache.add(new Command(-1, confirmed));
+ }
connection.doWrite(confirmed);
}
}
}
- // private volatile int lastSentID;
+ private volatile int lastSentID;
private void addToCache(final Packet packet)
{
@@ -1308,12 +1560,22 @@
{
resendCache.add(packet);
}
+
+ if (debug && packet.getType() != PacketImpl.SESS_REPLICATE_DELIVERY)
+ {
+ sentCommandsCache.add(new Command(lastSentID++, packet));
+
+ if (sentCommandsCache.size() == MAX_DEBUG_CACHE_SIZE)
+ {
+ sentCommandsCache.poll();
+ }
+ }
}
private void clearUpTo(final int lastReceivedCommandID)
{
final int numberToClear = 1 + lastReceivedCommandID - firstStoredCommandID;
-
+
if (numberToClear == -1)
{
throw new IllegalArgumentException("Invalid lastReceivedCommandID: " + lastReceivedCommandID);
@@ -1325,8 +1587,8 @@
if (packet == null)
{
- throw new IllegalStateException(System.identityHashCode(this) +
- " Can't find packet to clear: " +
+ report();
+ throw new IllegalStateException(System.identityHashCode(this) + " Can't find packet to clear: " +
" last received command id " +
lastReceivedCommandID +
" first stored command id " +
@@ -1334,7 +1596,11 @@
" cache size " +
this.resendCache.size() +
" channel id " +
- id);
+ id +
+ " client " +
+ connection.client +
+ " created active " +
+ connection.createdActive);
}
}
@@ -1347,6 +1613,10 @@
{
if (packet.getType() == PACKETS_CONFIRMED)
{
+ if (debug)
+ {
+ sentCommandsCache.add(new Command(-1, packet));
+ }
connection.doWrite(packet);
}
else if (packet.getType() == REPLICATION_RESPONSE)
@@ -1415,4 +1685,5 @@
}
}
}
+
}
Modified: trunk/src/main/org/jboss/messaging/core/server/ServerConsumer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/ServerConsumer.java 2008-10-21 09:53:33 UTC (rev 5161)
+++ trunk/src/main/org/jboss/messaging/core/server/ServerConsumer.java 2008-10-21 10:39:09 UTC (rev 5162)
@@ -25,6 +25,8 @@
import java.util.List;
+import org.jboss.messaging.core.remoting.Packet;
+
/**
*
* A ServerConsumer
@@ -36,11 +38,13 @@
{
long getID();
+ void handleClose(Packet packet);
+
void close() throws Exception;
List<MessageReference> cancelRefs() throws Exception;
- void setStarted(boolean started) throws Exception;
+ void setStarted(boolean started);
void receiveCredits(int credits) throws Exception;
@@ -51,4 +55,8 @@
void failedOver();
void deliverReplicated(final long messageID) throws Exception;
+
+ void lock();
+
+ void unlock();
}
Modified: trunk/src/main/org/jboss/messaging/core/server/ServerSession.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/ServerSession.java 2008-10-21 09:53:33 UTC (rev 5161)
+++ trunk/src/main/org/jboss/messaging/core/server/ServerSession.java 2008-10-21 10:39:09 UTC (rev 5162)
@@ -22,18 +22,35 @@
package org.jboss.messaging.core.server;
+import org.jboss.messaging.core.remoting.DelayedResult;
+import org.jboss.messaging.core.remoting.Packet;
import org.jboss.messaging.core.remoting.RemotingConnection;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionBindingQueryResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateConsumerResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateProducerResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionAcknowledgeMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionAddDestinationMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionBindingQueryMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerCloseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerFlowCreditMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateConsumerMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateProducerMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateQueueMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionDeleteQueueMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionProducerCloseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionRemoveDestinationMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionReplicateDeliveryMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionScheduledSendMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendManagementMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAResponseMessage;
-import org.jboss.messaging.util.SimpleString;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXACommitMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAEndMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAForgetMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAJoinMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAPrepareMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAResumeMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXARollbackMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXASetTimeoutMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAStartMessage;
-import javax.transaction.xa.Xid;
-import java.util.List;
-
/**
*
* A ServerSession
@@ -55,87 +72,84 @@
void removeProducer(ServerProducer producer) throws Exception;
void close() throws Exception;
-
- void setStarted(boolean started) throws Exception;
-
+
void promptDelivery(Queue queue);
void send(ServerMessage msg) throws Exception;
void sendScheduled(ServerMessage serverMessage, long scheduledDeliveryTime) throws Exception;
- void acknowledge(final long consumerID, final long messageID) throws Exception;
+ void handleAcknowledge(final SessionAcknowledgeMessage packet);
- void rollback() throws Exception;
+ void handleRollback(Packet packet);
- void commit() throws Exception;
+ void handleCommit(Packet packet);
- SessionXAResponseMessage XACommit(boolean onePhase, Xid xid) throws Exception;
+ void handleXACommit(SessionXACommitMessage packet);
- SessionXAResponseMessage XAEnd(Xid xid, boolean failed) throws Exception;
+ void handleXAEnd(SessionXAEndMessage packet);
- SessionXAResponseMessage XAForget(Xid xid);
+ void handleXAForget(SessionXAForgetMessage packet);
- SessionXAResponseMessage XAJoin(Xid xid) throws Exception;
+ void handleXAJoin(SessionXAJoinMessage packet);
- SessionXAResponseMessage XAPrepare(Xid xid) throws Exception;
+ void handleXAPrepare(SessionXAPrepareMessage packet);
- SessionXAResponseMessage XAResume(Xid xid) throws Exception;
+ void handleXAResume(SessionXAResumeMessage packet);
- SessionXAResponseMessage XARollback(Xid xid) throws Exception;
+ void handleXARollback(SessionXARollbackMessage packet);
- SessionXAResponseMessage XAStart(Xid xid);
+ void handleXAStart(SessionXAStartMessage packet);
- SessionXAResponseMessage XASuspend() throws Exception;
+ void handleXASuspend(Packet packet);
- List<Xid> getInDoubtXids() throws Exception;
+ void handleGetInDoubtXids(Packet packet);
- int getXATimeout();
+ void handleGetXATimeout(Packet packet);
- boolean setXATimeout(int timeoutSeconds);
+ void handleSetXATimeout(SessionXASetTimeoutMessage packet);
- void addDestination(SimpleString address, boolean durable, boolean temporary) throws Exception;
+ void handleAddDestination(SessionAddDestinationMessage packet);
+
+ void handleStart(Packet packet);
+
+ void handleStop(Packet packet);
- void removeDestination(SimpleString address, boolean durable) throws Exception;
+ void handleRemoveDestination(SessionRemoveDestinationMessage packet);
- void createQueue(SimpleString address,
- SimpleString queueName,
- SimpleString filterString,
- boolean durable,
- boolean temporary) throws Exception;
+ void handleCreateQueue(SessionCreateQueueMessage packet);
- void deleteQueue(SimpleString queueName) throws Exception;
+ void handleDeleteQueue(SessionDeleteQueueMessage packet);
- SessionCreateConsumerResponseMessage createConsumer(SimpleString queueName,
- SimpleString filterString,
- int windowSize,
- int maxRate,
- boolean browseOnly) throws Exception;
+ void handleCreateConsumer(SessionCreateConsumerMessage packet);
- SessionCreateProducerResponseMessage createProducer(SimpleString address,
- int windowSize,
- int maxRate,
- boolean autoGroupId) throws Exception;
+ void handleCreateProducer(SessionCreateProducerMessage packet);
- SessionQueueQueryResponseMessage executeQueueQuery(SimpleString queueName) throws Exception;
+ void handleExecuteQueueQuery(SessionQueueQueryMessage packet);
- SessionBindingQueryResponseMessage executeBindingQuery(SimpleString address) throws Exception;
+ void handleExecuteBindingQuery(SessionBindingQueryMessage packet);
- void closeConsumer(long consumerID) throws Exception;
+ void handleCloseConsumer(SessionConsumerCloseMessage packet);
- void closeProducer(long producerID) throws Exception;
+ void handleCloseProducer(SessionProducerCloseMessage packet);
- void receiveConsumerCredits(long consumerID, int credits) throws Exception;
+ void handleReceiveConsumerCredits(SessionConsumerFlowCreditMessage packet);
- void sendProducerMessage(long producerID, ServerMessage message) throws Exception;
+ void handleSendProducerMessage(SessionSendMessage packet);
- void sendScheduledProducerMessage(long producerID, ServerMessage serverMessage, long scheduledDeliveryTime) throws Exception;
+ void handleSendScheduledProducerMessage(SessionScheduledSendMessage packet);
+ void handleManagementMessage(SessionSendManagementMessage packet);
+
+ void handleFailedOver(Packet packet);
+
+ void handleClose(Packet packet);
+
+ void handleReplicatedDelivery(SessionReplicateDeliveryMessage packet);
+
int transferConnection(RemotingConnection newConnection, int lastReceivedCommandID);
- void handleManagementMessage(SessionSendManagementMessage message) throws Exception;
+ //Should this really be here??
+ void sendResponse(final DelayedResult result, final Packet response);
- void failedOver() throws Exception;
-
- void handleReplicatedDelivery(long consumerID, long messageID) throws Exception;
}
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/DistributionPolicyImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/DistributionPolicyImpl.java 2008-10-21 09:53:33 UTC (rev 5161)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/DistributionPolicyImpl.java 2008-10-21 10:39:09 UTC (rev 5162)
@@ -21,13 +21,12 @@
*/
package org.jboss.messaging.core.server.impl;
-import org.jboss.messaging.core.server.Consumer;
-import org.jboss.messaging.core.server.DistributionPolicy;
-import org.jboss.messaging.core.server.ServerMessage;
-
import java.util.ArrayList;
import java.util.List;
+import org.jboss.messaging.core.server.Consumer;
+import org.jboss.messaging.core.server.DistributionPolicy;
+
/**
* @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
*/
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2008-10-21 09:53:33 UTC (rev 5161)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2008-10-21 10:39:09 UTC (rev 5162)
@@ -483,7 +483,7 @@
// to the backup, so we need to work in both cases
if (sessions.putIfAbsent(name, session) == null)
{
- ChannelHandler handler = new ServerSessionPacketHandler(session, channel, storageManager);
+ ChannelHandler handler = new ServerSessionPacketHandler(session, channel);
channel.setHandler(handler);
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2008-10-21 09:53:33 UTC (rev 5161)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2008-10-21 10:39:09 UTC (rev 5162)
@@ -203,7 +203,7 @@
}
- public synchronized void addConsumer(final Consumer consumer)
+ public void addConsumer(final Consumer consumer)
{
distributionPolicy.addConsumer(consumer);
}
@@ -212,11 +212,6 @@
{
boolean removed = distributionPolicy.removeConsumer(consumer);
- if (removed)
- {
- distributionPolicy.removeConsumer(consumer);
- }
-
if (!distributionPolicy.hasConsumers())
{
promptDelivery = false;
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/RoundRobinDistributionPolicy.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/RoundRobinDistributionPolicy.java 2008-10-21 09:53:33 UTC (rev 5161)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/RoundRobinDistributionPolicy.java 2008-10-21 10:39:09 UTC (rev 5162)
@@ -39,19 +39,24 @@
protected int pos = 0;
-
- public synchronized void addConsumer(Consumer consumer)
+ @Override
+ public synchronized void addConsumer(final Consumer consumer)
{
pos = 0;
super.addConsumer(consumer);
}
- public synchronized boolean removeConsumer(Consumer consumer)
+ @Override
+ public synchronized boolean removeConsumer(final Consumer consumer)
{
-
pos = 0;
return super.removeConsumer(consumer);
}
+
+ public synchronized int getConsumerCount()
+ {
+ return super.getConsumerCount();
+ }
public HandleStatus distribute(final MessageReference reference)
{
@@ -90,7 +95,7 @@
}
}
- protected Consumer getNextConsumer()
+ protected synchronized Consumer getNextConsumer()
{
Consumer consumer = consumers.get(pos);
incrementPosition();
@@ -106,7 +111,7 @@
}
}
- protected HandleStatus handle(MessageReference reference, Consumer consumer)
+ protected HandleStatus handle(final MessageReference reference, final Consumer consumer)
{
HandleStatus status;
try
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2008-10-21 09:53:33 UTC (rev 5161)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2008-10-21 10:39:09 UTC (rev 5162)
@@ -22,12 +22,23 @@
package org.jboss.messaging.core.server.impl;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.filter.Filter;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.postoffice.PostOffice;
import org.jboss.messaging.core.remoting.Channel;
import org.jboss.messaging.core.remoting.DelayedResult;
+import org.jboss.messaging.core.remoting.Packet;
+import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.NullResponseMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionReceiveMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionReplicateDeliveryMessage;
import org.jboss.messaging.core.server.HandleStatus;
@@ -39,11 +50,6 @@
import org.jboss.messaging.core.settings.HierarchicalRepository;
import org.jboss.messaging.core.settings.impl.QueueSettings;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.atomic.AtomicInteger;
-
/**
* Concrete implementation of a ClientConsumer.
*
@@ -75,7 +81,7 @@
private final ServerSession session;
- private final Object startStopLock = new Object();
+ private final Lock lock = new ReentrantLock();
private final AtomicInteger availableCredits;
@@ -84,7 +90,7 @@
/**
* if we are a browse only consumer we don't need to worry about acknowledgements or being started/stopped by the session.
*/
- private boolean browseOnly;
+ private final boolean browseOnly;
private final StorageManager storageManager;
@@ -168,9 +174,12 @@
return HandleStatus.HANDLED;
}
-
- synchronized (startStopLock)
+
+ lock.lock();
+
+ try
{
+
// If the consumer is stopped then we don't accept the message, it
// should go back into the
// queue for delivery later.
@@ -217,36 +226,73 @@
return HandleStatus.HANDLED;
}
+ finally
+ {
+ lock.unlock();
+ }
}
-
- public void close() throws Exception
+
+ /**
+ * Handles a close request for this consumer: stops delivery under the consumer lock, replicates
+ * the packet through the channel, closes the consumer, and sends the outcome back via the session
+ * (a NullResponseMessage on success, a MessagingExceptionMessage on failure).
+ *
+ * @param packet the close packet to replicate
+ */
+ public void handleClose(final Packet packet)
{
- browseOnly = false;
+ DelayedResult result = null;
- setStarted(false);
-
- messageQueue.removeConsumer(this);
-
- session.removeConsumer(this);
-
- LinkedList<MessageReference> refs = cancelRefs();
-
- Iterator<MessageReference> iter = refs.iterator();
-
- while (iter.hasNext())
+ Packet response = null;
+
+ try
+ {
+ lock.lock();
+ try
+ {
+ setStarted(false);
+ }
+ finally
+ {
+ lock.unlock();
+ }
+
+ //We must stop delivery before replicating the packet, this ensures the close message gets processed
+ //and replicated on the backup in the same order as any delivery that might be occurring gets
+ //processed and replicated on the backup.
+ //Otherwise we could end up with a situation where a close comes in, then a delivery comes in,
+ //then close gets replicated to backup, then delivery gets replicated, but consumer is already
+ //closed!
+
+ result = channel.replicatePacket(packet);
+
+ doClose();
+
+ response = new NullResponseMessage();
+ }
+ catch (Exception e)
{
- MessageReference ref = iter.next();
-
- if (!ref.cancel(storageManager, postOffice, queueSettingsRepository))
+ log.error("Failed to close consumer", e);
+
+ if (e instanceof MessagingException)
{
- iter.remove();
+ response = new MessagingExceptionMessage((MessagingException)e);
}
+ else
+ {
+ response = new MessagingExceptionMessage(new MessagingException(MessagingException.INTERNAL_ERROR));
+ }
}
-
- if (!refs.isEmpty())
+
+ session.sendResponse(result, response);
+ }
+
+ public void close() throws Exception
+ {
+ lock.lock();
+ try
{
- messageQueue.addListFirst(refs);
+ setStarted(false);
}
+ finally
+ {
+ lock.unlock();
+ }
+
+ doClose();
}
public LinkedList<MessageReference> cancelRefs() throws Exception
@@ -268,11 +314,8 @@
public void setStarted(final boolean started)
{
- synchronized (startStopLock)
- {
- this.started = browseOnly || started;
- }
-
+ this.started = browseOnly || started;
+
// Outside the lock
if (started)
{
@@ -355,6 +398,16 @@
}
}
}
+
+ public void lock()
+ {
+ lock.lock();
+ }
+
+ public void unlock()
+ {
+ lock.unlock();
+ }
// Public
// -----------------------------------------------------------------------------
@@ -362,6 +415,32 @@
// Private
// --------------------------------------------------------------------------------------
+ private void doClose() throws Exception
+ {
+ messageQueue.removeConsumer(this);
+
+ session.removeConsumer(this);
+
+ LinkedList<MessageReference> refs = cancelRefs();
+
+ Iterator<MessageReference> iter = refs.iterator();
+
+ while (iter.hasNext())
+ {
+ MessageReference ref = iter.next();
+
+ if (!ref.cancel(storageManager, postOffice, queueSettingsRepository))
+ {
+ iter.remove();
+ }
+ }
+
+ if (!refs.isEmpty())
+ {
+ messageQueue.addListFirst(refs);
+ }
+ }
+
private void promptDelivery()
{
session.promptDelivery(messageQueue);
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2008-10-21 09:53:33 UTC (rev 5161)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2008-10-21 10:39:09 UTC (rev 5162)
@@ -41,15 +41,46 @@
import org.jboss.messaging.core.postoffice.FlowController;
import org.jboss.messaging.core.postoffice.PostOffice;
import org.jboss.messaging.core.remoting.Channel;
+import org.jboss.messaging.core.remoting.DelayedResult;
import org.jboss.messaging.core.remoting.FailureListener;
+import org.jboss.messaging.core.remoting.Packet;
import org.jboss.messaging.core.remoting.RemotingConnection;
import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
+import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.NullResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionAcknowledgeMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionAddDestinationMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionBindingQueryMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionBindingQueryResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerCloseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerFlowCreditMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateConsumerMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateConsumerResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateProducerMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateProducerResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateQueueMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionDeleteQueueMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionProducerCloseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionRemoveDestinationMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionReplicateDeliveryMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionScheduledSendMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendManagementMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXACommitMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAEndMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAForgetMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAGetInDoubtXidsResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAGetTimeoutResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAJoinMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAPrepareMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAResumeMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXARollbackMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXASetTimeoutMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXASetTimeoutResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAStartMessage;
import org.jboss.messaging.core.security.CheckType;
import org.jboss.messaging.core.security.SecurityStore;
import org.jboss.messaging.core.server.MessageReference;
@@ -68,7 +99,6 @@
import org.jboss.messaging.util.SimpleIDGenerator;
import org.jboss.messaging.util.SimpleString;
import org.jboss.messaging.util.SimpleStringIdGenerator;
-import org.jboss.messaging.util.UUIDGenerator;
/*
* Session implementation
@@ -235,31 +265,9 @@
}
}
- public void setStarted(final boolean s) throws Exception
- {
- Set<ServerConsumer> consumersClone = new HashSet<ServerConsumer>(consumers.values());
-
- for (ServerConsumer consumer : consumersClone)
- {
- consumer.setStarted(s);
- }
-
- started = s;
- }
-
- public void failedOver() throws Exception
- {
- Set<ServerConsumer> consumersClone = new HashSet<ServerConsumer>(consumers.values());
-
- for (ServerConsumer consumer : consumersClone)
- {
- consumer.failedOver();
- }
- }
-
public void close() throws Exception
{
- rollback(false);
+ rollback();
Set<ServerConsumer> consumersClone = new HashSet<ServerConsumer>(consumers.values());
@@ -349,723 +357,1506 @@
}
}
- public void acknowledge(final long consumerID, final long messageID) throws Exception
+ public void handleCreateConsumer(final SessionCreateConsumerMessage packet)
{
- MessageReference ref = consumers.get(consumerID).getReference(messageID);
-
- //Null implies a browser
- if (ref != null)
+ SimpleString queueName = packet.getQueueName();
+
+ SimpleString filterString = packet.getFilterString();
+
+ int windowSize = packet.getWindowSize();
+
+ int maxRate = packet.getMaxRate();
+
+ boolean browseOnly = packet.isBrowseOnly();
+
+ DelayedResult result = channel.replicatePacket(packet);
+
+ Packet response = null;
+
+ try
{
- if (autoCommitAcks)
+ Binding binding = postOffice.getBinding(queueName);
+
+ if (binding == null)
{
- doAck(ref);
+ throw new MessagingException(MessagingException.QUEUE_DOES_NOT_EXIST);
}
+
+ securityStore.check(binding.getAddress(), CheckType.READ, this);
+
+ Filter filter = null;
+
+ if (filterString != null)
+ {
+ filter = new FilterImpl(filterString);
+ }
+
+ // Flow control values if specified on queue override those passed in from
+ // client
+
+ QueueSettings qs = queueSettingsRepository.getMatch(queueName.toString());
+
+ Integer queueWindowSize = qs.getConsumerWindowSize();
+
+ windowSize = queueWindowSize != null ? queueWindowSize : windowSize;
+
+ Integer queueMaxRate = queueSettingsRepository.getMatch(queueName.toString()).getConsumerMaxRate();
+
+ maxRate = queueMaxRate != null ? queueMaxRate : maxRate;
+
+ Queue theQueue;
+ if (browseOnly)
+ {
+ // We consume a copy of the queue - TODO - this is a temporary measure
+ // and will disappear once we can provide a proper iterator on the queue
+
+ theQueue = new QueueImpl(-1, queueName, filter, false, false, false, null, postOffice);
+
+ // There's no need for any special locking since the list method is synchronized
+ List<MessageReference> refs = binding.getQueue().list(filter);
+
+ for (MessageReference ref : refs)
+ {
+ theQueue.addLast(ref);
+ }
+ }
else
{
- tx.addAcknowledgement(ref);
-
- // Del count is not actually updated in storage unless it's
- // cancelled
- ref.incrementDeliveryCount();
+ theQueue = binding.getQueue();
}
+
+ ServerConsumer consumer = new ServerConsumerImpl(idGenerator.generateID(),
+ this,
+ theQueue,
+ filter,
+ windowSize != -1,
+ maxRate,
+ started,
+ browseOnly,
+ storageManager,
+ queueSettingsRepository,
+ postOffice,
+ channel);
+
+ response = new SessionCreateConsumerResponseMessage(windowSize);
+
+ consumers.put(consumer.getID(), consumer);
}
+ catch (Exception e)
+ {
+ log.error("Failed to create consumer", e);
+
+ if (e instanceof MessagingException)
+ {
+ response = new MessagingExceptionMessage((MessagingException)e);
+ }
+ else
+ {
+ response = new MessagingExceptionMessage(new MessagingException(MessagingException.INTERNAL_ERROR));
+ }
+ }
+
+ sendResponse(result, response);
}
- public void rollback() throws Exception
+ /**
+ * Handles a create-queue request: replicates the packet, checks CREATE permission when the address
+ * is not yet a known destination, rejects an already-bound queue name, adds the binding, and for
+ * temporary queues registers a failure runner that removes the binding again.
+ * The outcome (NullResponseMessage or MessagingExceptionMessage) is sent via sendResponse.
+ *
+ * @param packet carries the address, queue name, filter string and the temporary/durable flags
+ */
+ public void handleCreateQueue(final SessionCreateQueueMessage packet)
{
- rollback(true);
+ SimpleString address = packet.getAddress();
+
+ SimpleString queueName = packet.getQueueName();
+
+ SimpleString filterString = packet.getFilterString();
+
+ boolean temporary = packet.isTemporary();
+
+ boolean durable = packet.isDurable();
+
+ DelayedResult result = channel.replicatePacket(packet);
+
+ Packet response = null;
+
+ try
+ {
+ // make sure the user has privileges to create this queue
+ if (!postOffice.containsDestination(address))
+ {
+ securityStore.check(address, CheckType.CREATE, this);
+ }
+
+ Binding binding = postOffice.getBinding(queueName);
+
+ if (binding != null)
+ {
+ throw new MessagingException(MessagingException.QUEUE_EXISTS);
+ }
+
+ Filter filter = null;
+
+ if (filterString != null)
+ {
+ filter = new FilterImpl(filterString);
+ }
+
+ binding = postOffice.addBinding(address, queueName, filter, durable, temporary);
+
+ if (temporary)
+ {
+ // Temporary queue in core simply means the queue will be deleted if
+ // the remoting connection
+ // dies. It does not mean it will get deleted automatically when the
+ // session is closed.
+ // It is up to the user to delete the queue when finished with it
+
+ final Queue queue = binding.getQueue();
+
+ failureRunners.add(new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ postOffice.removeBinding(queue.getName());
+ }
+ catch (Exception e)
+ {
+ // Pass the exception so the cause of the failed cleanup is not lost
+ log.error("Failed to remove temporary queue " + queue.getName(), e);
+ }
+ }
+ });
+ }
+
+ response = new NullResponseMessage();
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to create queue", e);
+
+ if (e instanceof MessagingException)
+ {
+ response = new MessagingExceptionMessage((MessagingException)e);
+ }
+ else
+ {
+ response = new MessagingExceptionMessage(new MessagingException(MessagingException.INTERNAL_ERROR));
+ }
+ }
+
+ sendResponse(result, response);
}
- private void doRollback(final Transaction theTx) throws Exception
+ /**
+ * Handles a delete-queue request: replicates the packet, removes the binding for the named queue,
+ * rejects the request if the queue does not exist or still has consumers, and deletes all
+ * references of a durable queue from storage.
+ * The outcome (NullResponseMessage or MessagingExceptionMessage) is sent via sendResponse.
+ *
+ * @param packet carries the name of the queue to delete
+ */
+ public void handleDeleteQueue(final SessionDeleteQueueMessage packet)
{
- boolean wasStarted = started;
+ SimpleString queueName = packet.getQueueName();
- List<MessageReference> toCancel = new ArrayList<MessageReference>();
+ DelayedResult result = channel.replicatePacket(packet);
- for (ServerConsumer consumer : consumers.values())
+ Packet response = null;
+
+ try
{
- if (wasStarted)
+ // NOTE(review): the binding is removed before the consumer-count check below, so a queue
+ // that still has consumers appears to lose its binding even though the delete is rejected - confirm
+ Binding binding = postOffice.removeBinding(queueName);
+
+ if (binding == null)
{
- consumer.setStarted(false);
+ throw new MessagingException(MessagingException.QUEUE_DOES_NOT_EXIST);
}
- toCancel.addAll(consumer.cancelRefs());
+ Queue queue = binding.getQueue();
+
+ if (queue.getConsumerCount() != 0)
+ {
+ throw new MessagingException(MessagingException.ILLEGAL_STATE, "Cannot delete queue - it has consumers");
+ }
+
+ if (queue.isDurable())
+ {
+ binding.getQueue().deleteAllReferences(storageManager);
+ }
+
+ response = new NullResponseMessage();
}
+ catch (Exception e)
+ {
+ log.error("Failed to delete queue", e);
- List<MessageReference> rolledBack = theTx.rollback(queueSettingsRepository);
+ if (e instanceof MessagingException)
+ {
+ response = new MessagingExceptionMessage((MessagingException)e);
+ }
+ else
+ {
+ response = new MessagingExceptionMessage(new MessagingException(MessagingException.INTERNAL_ERROR));
+ }
+ }
- rolledBack.addAll(toCancel);
+ sendResponse(result, response);
+ }
- if (wasStarted)
+ public void handleExecuteQueueQuery(final SessionQueueQueryMessage packet)
+ {
+ SimpleString queueName = packet.getQueueName();
+
+ DelayedResult result = channel.replicatePacket(packet);
+
+ Packet response = null;
+
+ try
{
- for (ServerConsumer consumer : consumers.values())
+ if (queueName == null)
{
- consumer.setStarted(true);
+ throw new IllegalArgumentException("Queue name is null");
}
+
+ Binding binding = postOffice.getBinding(queueName);
+
+ if (binding != null)
+ {
+ Queue queue = binding.getQueue();
+
+ Filter filter = queue.getFilter();
+
+ SimpleString filterString = filter == null ? null : filter.getFilterString();
+ // TODO: Remove MAX-SIZE-BYTES from SessionQueueQueryResponse.
+ response = new SessionQueueQueryResponseMessage(queue.isDurable(),
+ queue.getConsumerCount(),
+ queue.getMessageCount(),
+ filterString,
+ binding.getAddress());
+ }
+ else
+ {
+ response = new SessionQueueQueryResponseMessage();
+ }
}
+ catch (Exception e)
+ {
+ log.error("Failed to execute queue query", e);
- // Now cancel the refs back to the queue(s), we sort into queues and cancel back atomically to
- // preserve order
+ if (e instanceof MessagingException)
+ {
+ response = new MessagingExceptionMessage((MessagingException)e);
+ }
+ else
+ {
+ response = new MessagingExceptionMessage(new MessagingException(MessagingException.INTERNAL_ERROR));
+ }
+ }
- Map<Queue, LinkedList<MessageReference>> queueMap = new HashMap<Queue, LinkedList<MessageReference>>();
+ sendResponse(result, response);
+ }
- for (MessageReference ref : rolledBack)
+ public void handleExecuteBindingQuery(final SessionBindingQueryMessage packet)
+ {
+ SimpleString address = packet.getAddress();
+
+ DelayedResult result = channel.replicatePacket(packet);
+
+ Packet response = null;
+
+ try
{
- if (ref.cancel(storageManager, postOffice, queueSettingsRepository))
+ if (address == null)
{
- Queue queue = ref.getQueue();
+ throw new IllegalArgumentException("Address is null");
+ }
- LinkedList<MessageReference> list = queueMap.get(queue);
+ boolean exists = postOffice.containsDestination(address);
- if (list == null)
+ List<SimpleString> queueNames = new ArrayList<SimpleString>();
+
+ if (exists)
+ {
+ List<Binding> bindings = postOffice.getBindingsForAddress(address);
+
+ for (Binding binding : bindings)
{
- list = new LinkedList<MessageReference>();
-
- queueMap.put(queue, list);
+ queueNames.add(binding.getQueue().getName());
}
+ }
- list.add(ref);
+ response = new SessionBindingQueryResponseMessage(exists, queueNames);
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to execute binding query", e);
+
+ if (e instanceof MessagingException)
+ {
+ response = new MessagingExceptionMessage((MessagingException)e);
}
+ else
+ {
+ response = new MessagingExceptionMessage(new MessagingException(MessagingException.INTERNAL_ERROR));
+ }
}
- for (Map.Entry<Queue, LinkedList<MessageReference>> entry : queueMap.entrySet())
+ sendResponse(result, response);
+ }
+
+ /**
+ * Create a producer for the address specified in the packet
+ *
+ * @param packet carries the address to produce to and the producer window size to use for flow
+ * control. Specify a window size of -1 to disable flow control completely.
+ * The actual window size used may be less than the specified window size if it is overridden by
+ * any producer-window-size specified on the queue
+ */
+ public void handleCreateProducer(final SessionCreateProducerMessage packet)
+ {
+ SimpleString address = packet.getAddress();
+
+ int maxRate = packet.getMaxRate();
+
+ int windowSize = packet.getWindowSize();
+
+ boolean autoGroupID = packet.isAutoGroupId();
+
+ DelayedResult result = channel.replicatePacket(packet);
+
+ Packet response = null;
+
+ try
{
- LinkedList<MessageReference> refs = entry.getValue();
+ FlowController flowController = null;
- entry.getKey().addListFirst(refs);
+ final int maxRateToUse = maxRate;
+
+ if (address != null)
+ {
+ flowController = windowSize == -1 ? null : postOffice.getFlowController(address);
+ }
+
+ final int windowToUse = flowController == null ? -1 : windowSize;
+
+ // Server window size is 0.75 client window size for producer flow control
+ // (other way round to consumer flow control)
+
+ final int serverWindowSize = windowToUse == -1 ? -1 : (int)(windowToUse * 0.75);
+
+ ServerProducerImpl producer = new ServerProducerImpl(idGenerator.generateID(),
+ this,
+ address,
+ flowController,
+ serverWindowSize,
+ channel);
+
+ producers.put(producer.getID(), producer);
+
+ // Get some initial credits to send to the producer - we try for
+ // windowToUse
+
+ int initialCredits = flowController == null ? -1 : flowController.getInitialCredits(windowToUse, producer);
+
+ SimpleString groupId = null;
+
+ if (autoGroupID)
+ {
+ groupId = simpleStringIdGenerator.generateID();
+ }
+
+ response = new SessionCreateProducerResponseMessage(initialCredits, maxRateToUse, groupId);
}
+ catch (Exception e)
+ {
+ log.error("Failed to create producer", e);
+
+ if (e instanceof MessagingException)
+ {
+ response = new MessagingExceptionMessage((MessagingException)e);
+ }
+ else
+ {
+ response = new MessagingExceptionMessage(new MessagingException(MessagingException.INTERNAL_ERROR));
+ }
+ }
+
+ sendResponse(result, response);
}
- private void rollback(final boolean sendResponse) throws Exception
+ public void handleAcknowledge(final SessionAcknowledgeMessage packet)
{
- if (tx == null)
+ DelayedResult result = channel.replicatePacket(packet);
+
+ Packet response = null;
+
+ try
{
- // Might be null if XA
+ MessageReference ref = consumers.get(packet.getConsumerID()).getReference(packet.getMessageID());
- tx = new TransactionImpl(storageManager, postOffice);
+ // Null implies a browser
+ if (ref != null)
+ {
+ if (autoCommitAcks)
+ {
+ doAck(ref);
+ }
+ else
+ {
+ tx.addAcknowledgement(ref);
+
+ // Del count is not actually updated in storage unless it's
+ // cancelled
+ ref.incrementDeliveryCount();
+ }
+ }
+
+ if (packet.isRequiresResponse())
+ {
+ response = new NullResponseMessage();
+ }
}
+ catch (Exception e)
+ {
+ log.error("Failed to acknowledge", e);
- doRollback(tx);
+ if (packet.isRequiresResponse())
+ {
+ if (e instanceof MessagingException)
+ {
+ response = new MessagingExceptionMessage((MessagingException)e);
+ }
+ else
+ {
+ response = new MessagingExceptionMessage(new MessagingException(MessagingException.INTERNAL_ERROR));
+ }
+ }
+ }
- tx = new TransactionImpl(storageManager, postOffice);
+ sendResponse(result, response);
}
- public void commit() throws Exception
+ public void handleCommit(final Packet packet)
{
+ DelayedResult result = channel.replicatePacket(packet);
+
+ Packet response = null;
+
try
{
tx.commit();
+
+ response = new NullResponseMessage();
}
+ catch (Exception e)
+ {
+ log.error("Failed to commit", e);
+
+ if (e instanceof MessagingException)
+ {
+ response = new MessagingExceptionMessage((MessagingException)e);
+ }
+ else
+ {
+ response = new MessagingExceptionMessage(new MessagingException(MessagingException.INTERNAL_ERROR));
+ }
+ }
finally
{
tx = new TransactionImpl(storageManager, postOffice);
}
+
+ sendResponse(result, response);
}
- public SessionXAResponseMessage XACommit(final boolean onePhase, final Xid xid) throws Exception
+ public void handleRollback(final Packet packet)
{
- if (tx != null)
- {
- final String msg = "Cannot commit, session is currently doing work in transaction " + tx.getXid();
+ DelayedResult result = channel.replicatePacket(packet);
- return new SessionXAResponseMessage(true, XAException.XAER_PROTO, msg);
- }
+ Packet response = null;
- Transaction theTx = resourceManager.removeTransaction(xid);
-
- if (theTx == null)
+ try
{
- final String msg = "Cannot find xid in resource manager: " + xid;
+ rollback();
- return new SessionXAResponseMessage(true, XAException.XAER_NOTA, msg);
+ response = new NullResponseMessage();
}
-
- if (theTx.getState() == Transaction.State.SUSPENDED)
+ catch (Exception e)
{
- // Put it back
- resourceManager.putTransaction(xid, tx);
+ log.error("Failed to rollback", e);
- return new SessionXAResponseMessage(true,
- XAException.XAER_PROTO,
- "Cannot commit transaction, it is suspended " + xid);
+ if (e instanceof MessagingException)
+ {
+ response = new MessagingExceptionMessage((MessagingException)e);
+ }
+ else
+ {
+ response = new MessagingExceptionMessage(new MessagingException(MessagingException.INTERNAL_ERROR));
+ }
}
- theTx.commit();
-
- return new SessionXAResponseMessage(false, XAResource.XA_OK, null);
+ sendResponse(result, response);
}
- public SessionXAResponseMessage XAEnd(final Xid xid, final boolean failed) throws Exception
+ /**
+ * Handles an XA commit request: replicates the packet, then commits the transaction registered
+ * under the packet's Xid in the resource manager. Responds with XAER_PROTO if the session is
+ * currently working in a transaction or the target transaction is suspended, XAER_NOTA if the Xid
+ * is unknown, XA_OK on success, or a MessagingExceptionMessage if an exception is thrown.
+ *
+ * @param packet carries the Xid of the transaction to commit
+ */
+ public void handleXACommit(final SessionXACommitMessage packet)
{
- if (tx != null && tx.getXid().equals(xid))
+ DelayedResult result = channel.replicatePacket(packet);
+
+ Packet response = null;
+
+ Xid xid = packet.getXid();
+
+ try
{
- if (tx.getState() == Transaction.State.SUSPENDED)
+ if (tx != null)
{
- final String msg = "Cannot end, transaction is suspended";
+ final String msg = "Cannot commit, session is currently doing work in transaction " + tx.getXid();
- return new SessionXAResponseMessage(true, XAException.XAER_PROTO, msg);
+ response = new SessionXAResponseMessage(true, XAException.XAER_PROTO, msg);
}
+ else
+ {
+ Transaction theTx = resourceManager.removeTransaction(xid);
- tx = null;
+ if (theTx == null)
+ {
+ final String msg = "Cannot find xid in resource manager: " + xid;
+
+ response = new SessionXAResponseMessage(true, XAException.XAER_NOTA, msg);
+ }
+ else
+ {
+ if (theTx.getState() == Transaction.State.SUSPENDED)
+ {
+ // Put it back - must be theTx (the transaction just removed); tx is null in this branch
+ resourceManager.putTransaction(xid, theTx);
+
+ response = new SessionXAResponseMessage(true,
+ XAException.XAER_PROTO,
+ "Cannot commit transaction, it is suspended " + xid);
+ }
+ else
+ {
+ theTx.commit();
+
+ response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
+ }
+ }
+ }
}
- else
+ catch (Exception e)
{
- // It's also legal for the TM to call end for a Xid in the suspended
- // state
- // See JTA 1.1 spec 3.4.4 - state diagram
- // Although in practice TMs rarely do this.
- Transaction theTx = resourceManager.getTransaction(xid);
+ log.error("Failed to xa commit", e);
- if (theTx == null)
+ if (e instanceof MessagingException)
{
- final String msg = "Cannot find suspended transaction to end " + xid;
-
- return new SessionXAResponseMessage(true, XAException.XAER_NOTA, msg);
+ response = new MessagingExceptionMessage((MessagingException)e);
}
+ else
+ {
+ response = new MessagingExceptionMessage(new MessagingException(MessagingException.INTERNAL_ERROR));
+ }
+ }
- if (theTx.getState() != Transaction.State.SUSPENDED)
+ sendResponse(result, response);
+ }
+
+ public void handleXAEnd(final SessionXAEndMessage packet)
+ {
+ DelayedResult result = channel.replicatePacket(packet);
+
+ Packet response = null;
+
+ Xid xid = packet.getXid();
+
+ try
+ {
+ if (tx != null && tx.getXid().equals(xid))
{
- final String msg = "Transaction is not suspended " + xid;
+ if (tx.getState() == Transaction.State.SUSPENDED)
+ {
+ final String msg = "Cannot end, transaction is suspended";
- return new SessionXAResponseMessage(true, XAException.XAER_PROTO, msg);
+ response = new SessionXAResponseMessage(true, XAException.XAER_PROTO, msg);
+ }
+ else
+ {
+ tx = null;
+ }
}
+ else
+ {
+ // It's also legal for the TM to call end for a Xid in the suspended
+ // state
+ // See JTA 1.1 spec 3.4.4 - state diagram
+ // Although in practice TMs rarely do this.
+ Transaction theTx = resourceManager.getTransaction(xid);
- theTx.resume();
+ if (theTx == null)
+ {
+ final String msg = "Cannot find suspended transaction to end " + xid;
+
+ response = new SessionXAResponseMessage(true, XAException.XAER_NOTA, msg);
+ }
+ else
+ {
+ if (theTx.getState() != Transaction.State.SUSPENDED)
+ {
+ final String msg = "Transaction is not suspended " + xid;
+
+ response = new SessionXAResponseMessage(true, XAException.XAER_PROTO, msg);
+ }
+ else
+ {
+ theTx.resume();
+ }
+ }
+ }
+
+ if (response == null)
+ {
+ response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
+ }
}
+ catch (Exception e)
+ {
+ log.error("Failed to xa end", e);
- return new SessionXAResponseMessage(false, XAResource.XA_OK, null);
+ if (e instanceof MessagingException)
+ {
+ response = new MessagingExceptionMessage((MessagingException)e);
+ }
+ else
+ {
+ response = new MessagingExceptionMessage(new MessagingException(MessagingException.INTERNAL_ERROR));
+ }
+ }
+
+ sendResponse(result, response);
}
- public SessionXAResponseMessage XAForget(final Xid xid)
+ public void handleXAForget(final SessionXAForgetMessage packet)
{
+ DelayedResult result = channel.replicatePacket(packet);
+
// Do nothing since we don't support heuristic commits / rollback from the
// resource manager
- return new SessionXAResponseMessage(false, XAResource.XA_OK, null);
+ Packet response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
+
+ sendResponse(result, response);
}
- public SessionXAResponseMessage XAJoin(final Xid xid) throws Exception
+ public void handleXAJoin(final SessionXAJoinMessage packet)
{
- Transaction theTx = resourceManager.getTransaction(xid);
+ DelayedResult result = channel.replicatePacket(packet);
- if (theTx == null)
+ Packet response = null;
+
+ Xid xid = packet.getXid();
+
+ try
{
- final String msg = "Cannot find xid in resource manager: " + xid;
+ Transaction theTx = resourceManager.getTransaction(xid);
- return new SessionXAResponseMessage(true, XAException.XAER_NOTA, msg);
+ if (theTx == null)
+ {
+ final String msg = "Cannot find xid in resource manager: " + xid;
+
+ response = new SessionXAResponseMessage(true, XAException.XAER_NOTA, msg);
+ }
+ else
+ {
+ if (theTx.getState() == Transaction.State.SUSPENDED)
+ {
+ response = new SessionXAResponseMessage(true,
+ XAException.XAER_PROTO,
+ "Cannot join tx, it is suspended " + xid);
+ }
+ else
+ {
+ tx = theTx;
+
+ response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
+ }
+ }
}
+ catch (Exception e)
+ {
+ log.error("Failed to xa join", e);
- if (theTx.getState() == Transaction.State.SUSPENDED)
- {
- return new SessionXAResponseMessage(true, XAException.XAER_PROTO, "Cannot join tx, it is suspended " + xid);
+ if (e instanceof MessagingException)
+ {
+ response = new MessagingExceptionMessage((MessagingException)e);
+ }
+ else
+ {
+ response = new MessagingExceptionMessage(new MessagingException(MessagingException.INTERNAL_ERROR));
+ }
}
- tx = theTx;
-
- return new SessionXAResponseMessage(false, XAResource.XA_OK, null);
+ sendResponse(result, response);
}
- public SessionXAResponseMessage XAPrepare(final Xid xid) throws Exception
+ public void handleXAResume(final SessionXAResumeMessage packet)
{
- if (tx != null)
- {
- final String msg = "Cannot commit, session is currently doing work in a transaction " + tx.getXid();
+ DelayedResult result = channel.replicatePacket(packet);
- return new SessionXAResponseMessage(true, XAException.XAER_PROTO, msg);
- }
+ Packet response = null;
- Transaction theTx = resourceManager.getTransaction(xid);
+ Xid xid = packet.getXid();
- if (theTx == null)
+ try
{
- final String msg = "Cannot find xid in resource manager: " + xid;
+ if (tx != null)
+ {
+ final String msg = "Cannot resume, session is currently doing work in a transaction " + tx.getXid();
- return new SessionXAResponseMessage(true, XAException.XAER_NOTA, msg);
- }
+ response = new SessionXAResponseMessage(true, XAException.XAER_PROTO, msg);
+ }
+ else
+ {
+ Transaction theTx = resourceManager.getTransaction(xid);
- if (theTx.getState() == Transaction.State.SUSPENDED)
- {
- return new SessionXAResponseMessage(true,
- XAException.XAER_PROTO,
- "Cannot prepare transaction, it is suspended " + xid);
- }
+ if (theTx == null)
+ {
+ final String msg = "Cannot find xid in resource manager: " + xid;
- if (theTx.isEmpty())
- {
- return new SessionXAResponseMessage(false, XAResource.XA_RDONLY, null);
+ response = new SessionXAResponseMessage(true, XAException.XAER_NOTA, msg);
+ }
+ else
+ {
+ if (theTx.getState() != Transaction.State.SUSPENDED)
+ {
+ response = new SessionXAResponseMessage(true,
+ XAException.XAER_PROTO,
+ "Cannot resume transaction, it is not suspended " + xid);
+ }
+ else
+ {
+ tx = theTx;
+
+ tx.resume();
+
+ response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
+ }
+ }
+ }
}
- else
+ catch (Exception e)
{
- theTx.prepare();
+ log.error("Failed to xa resume", e);
- return new SessionXAResponseMessage(false, XAResource.XA_OK, null);
+ if (e instanceof MessagingException)
+ {
+ response = new MessagingExceptionMessage((MessagingException)e);
+ }
+ else
+ {
+ response = new MessagingExceptionMessage(new MessagingException(MessagingException.INTERNAL_ERROR));
+ }
}
+
+ sendResponse(result, response);
}
- public SessionXAResponseMessage XAResume(final Xid xid) throws Exception
+ /**
+ * Handles an XA rollback request for the Xid carried by the packet.
+ *
+ * The packet is first handed to channel.replicatePacket; the outcome is then
+ * delivered through sendResponse. Responds with XAER_PROTO if the session is
+ * currently doing work in a transaction or the target transaction is
+ * suspended, XAER_NOTA if the Xid is not known to the resource manager, and
+ * XA_OK once doRollback has completed. Any exception is logged and reported
+ * as a MessagingExceptionMessage.
+ *
+ * @param packet the rollback request; its Xid identifies the transaction
+ */
+ public void handleXARollback(final SessionXARollbackMessage packet)
{
- if (tx != null)
- {
- final String msg = "Cannot resume, session is currently doing work in a transaction " + tx.getXid();
+ DelayedResult result = channel.replicatePacket(packet);
- return new SessionXAResponseMessage(true, XAException.XAER_PROTO, msg);
- }
+ Packet response = null;
- Transaction theTx = resourceManager.getTransaction(xid);
+ Xid xid = packet.getXid();
- if (theTx == null)
+ try
{
- final String msg = "Cannot find xid in resource manager: " + xid;
+ if (tx != null)
+ {
+ final String msg = "Cannot roll back, session is currently doing work in a transaction " + tx.getXid();
- return new SessionXAResponseMessage(true, XAException.XAER_NOTA, msg);
+ response = new SessionXAResponseMessage(true, XAException.XAER_PROTO, msg);
+ }
+ else
+ {
+ Transaction theTx = resourceManager.removeTransaction(xid);
+
+ if (theTx == null)
+ {
+ final String msg = "Cannot find xid in resource manager: " + xid;
+
+ response = new SessionXAResponseMessage(true, XAException.XAER_NOTA, msg);
+ }
+ else
+ {
+ if (theTx.getState() == Transaction.State.SUSPENDED)
+ {
+ // Put back the transaction we just removed. The session field tx is
+ // always null in this branch, so theTx must be re-registered here.
+ resourceManager.putTransaction(xid, theTx);
+
+ response = new SessionXAResponseMessage(true,
+ XAException.XAER_PROTO,
+ "Cannot rollback transaction, it is suspended " + xid);
+ }
+ else
+ {
+ doRollback(theTx);
+
+ response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
+ }
+ }
+ }
}
+ catch (Exception e)
+ {
+ log.error("Failed to xa rollback", e);
- if (theTx.getState() != Transaction.State.SUSPENDED)
- {
- return new SessionXAResponseMessage(true,
- XAException.XAER_PROTO,
- "Cannot resume transaction, it is not suspended " + xid);
+ if (e instanceof MessagingException)
+ {
+ response = new MessagingExceptionMessage((MessagingException)e);
+ }
+ else
+ {
+ response = new MessagingExceptionMessage(new MessagingException(MessagingException.INTERNAL_ERROR));
+ }
}
- tx = theTx;
-
- tx.resume();
-
- return new SessionXAResponseMessage(false, XAResource.XA_OK, null);
+ sendResponse(result, response);
}
- public SessionXAResponseMessage XARollback(final Xid xid) throws Exception
+ /**
+ * Handles an XA start request for the Xid carried by the packet.
+ *
+ * The packet is first handed to channel.replicatePacket; the outcome is then
+ * delivered through sendResponse. Responds with XAER_PROTO if the session is
+ * already doing work in a transaction, XAER_DUPID if the resource manager
+ * refuses the Xid, and XA_OK once a new transaction has been registered and
+ * bound to this session. Any exception is logged and reported as a
+ * MessagingExceptionMessage.
+ *
+ * @param packet the start request; its Xid identifies the new transaction
+ */
+ public void handleXAStart(final SessionXAStartMessage packet)
{
- if (tx != null)
- {
- final String msg = "Cannot roll back, session is currently doing work in a transaction " + tx.getXid();
+ DelayedResult result = channel.replicatePacket(packet);
- return new SessionXAResponseMessage(true, XAException.XAER_PROTO, msg);
- }
+ Packet response = null;
- Transaction theTx = resourceManager.removeTransaction(xid);
+ Xid xid = packet.getXid();
- if (theTx == null)
+ try
{
- final String msg = "Cannot find xid in resource manager: " + xid;
+ if (tx != null)
+ {
+ final String msg = "Cannot start, session is already doing work in a transaction " + tx.getXid();
- return new SessionXAResponseMessage(true, XAException.XAER_NOTA, msg);
+ response = new SessionXAResponseMessage(true, XAException.XAER_PROTO, msg);
+ }
+ else
+ {
+ final Transaction newTx = new TransactionImpl(xid, storageManager, postOffice);
+
+ boolean added = resourceManager.putTransaction(xid, newTx);
+
+ if (!added)
+ {
+ // Leave the session's tx unset: binding it to a transaction the
+ // resource manager rejected would make every later start fail.
+ final String msg = "Cannot start, there is already a xid " + newTx.getXid();
+
+ response = new SessionXAResponseMessage(true, XAException.XAER_DUPID, msg);
+ }
+ else
+ {
+ // Bind the transaction to the session only once it is registered.
+ tx = newTx;
+
+ response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
+ }
+ }
}
-
- if (theTx.getState() == Transaction.State.SUSPENDED)
+ catch (Exception e)
{
- // Put it back
- resourceManager.putTransaction(xid, tx);
+ log.error("Failed to xa start", e);
- return new SessionXAResponseMessage(true,
- XAException.XAER_PROTO,
- "Cannot rollback transaction, it is suspended " + xid);
+ if (e instanceof MessagingException)
+ {
+ response = new MessagingExceptionMessage((MessagingException)e);
+ }
+ else
+ {
+ response = new MessagingExceptionMessage(new MessagingException(MessagingException.INTERNAL_ERROR));
+ }
}
- doRollback(theTx);
-
- return new SessionXAResponseMessage(false, XAResource.XA_OK, null);
+ sendResponse(result, response);
}
- public SessionXAResponseMessage XAStart(final Xid xid)
+ public void handleXASuspend(final Packet packet)
{
- if (tx != null)
+ DelayedResult result = channel.replicatePacket(packet);
+
+ Packet response = null;
+
+ try
{
- final String msg = "Cannot start, session is already doing work in a transaction " + tx.getXid();
+ if (tx == null)
+ {
+ final String msg = "Cannot suspend, session is not doing work in a transaction ";
- return new SessionXAResponseMessage(true, XAException.XAER_PROTO, msg);
- }
+ response = new SessionXAResponseMessage(true, XAException.XAER_PROTO, msg);
+ }
+ else
+ {
+ if (tx.getState() == Transaction.State.SUSPENDED)
+ {
+ final String msg = "Cannot suspend, transaction is already suspended " + tx.getXid();
- tx = new TransactionImpl(xid, storageManager, postOffice);
+ response = new SessionXAResponseMessage(true, XAException.XAER_PROTO, msg);
+ }
+ else
+ {
+ tx.suspend();
- boolean added = resourceManager.putTransaction(xid, tx);
+ tx = null;
- if (!added)
+ response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
+ }
+ }
+ }
+ catch (Exception e)
{
- final String msg = "Cannot start, there is already a xid " + tx.getXid();
+ log.error("Failed to xa suspend", e);
- return new SessionXAResponseMessage(true, XAException.XAER_DUPID, msg);
+ if (e instanceof MessagingException)
+ {
+ response = new MessagingExceptionMessage((MessagingException)e);
+ }
+ else
+ {
+ response = new MessagingExceptionMessage(new MessagingException(MessagingException.INTERNAL_ERROR));
+ }
}
- return new SessionXAResponseMessage(false, XAResource.XA_OK, null);
+ sendResponse(result, response);
}
- public SessionXAResponseMessage XASuspend() throws Exception
+ /**
+ * Handles an XA prepare request for the Xid carried by the packet.
+ *
+ * The packet is first handed to channel.replicatePacket; the outcome is then
+ * delivered through sendResponse. Responds with XAER_PROTO if the session is
+ * currently doing work in a transaction or the target transaction is
+ * suspended, XAER_NOTA if the Xid is not known to the resource manager,
+ * XA_RDONLY if the transaction is empty, and XA_OK after a successful
+ * prepare. Any exception is logged and reported as a
+ * MessagingExceptionMessage.
+ *
+ * @param packet the prepare request; its Xid identifies the transaction
+ */
+ public void handleXAPrepare(final SessionXAPrepareMessage packet)
{
- if (tx == null)
+ DelayedResult result = channel.replicatePacket(packet);
+
+ Packet response = null;
+
+ Xid xid = packet.getXid();
+
+ try
{
- final String msg = "Cannot suspend, session is not doing work in a transaction ";
+ if (tx != null)
+ {
+ final String msg = "Cannot prepare, session is currently doing work in a transaction " + tx.getXid();
- return new SessionXAResponseMessage(true, XAException.XAER_PROTO, msg);
+ response = new SessionXAResponseMessage(true, XAException.XAER_PROTO, msg);
+ }
+ else
+ {
+ Transaction theTx = resourceManager.getTransaction(xid);
+
+ if (theTx == null)
+ {
+ final String msg = "Cannot find xid in resource manager: " + xid;
+
+ response = new SessionXAResponseMessage(true, XAException.XAER_NOTA, msg);
+ }
+ else
+ {
+ if (theTx.getState() == Transaction.State.SUSPENDED)
+ {
+ response = new SessionXAResponseMessage(true,
+ XAException.XAER_PROTO,
+ "Cannot prepare transaction, it is suspended " + xid);
+ }
+ else
+ {
+ if (theTx.isEmpty())
+ {
+ response = new SessionXAResponseMessage(false, XAResource.XA_RDONLY, null);
+ }
+ else
+ {
+ theTx.prepare();
+
+ response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
+ }
+ }
+ }
+ }
}
-
- if (tx.getState() == Transaction.State.SUSPENDED)
+ catch (Exception e)
{
- final String msg = "Cannot suspend, transaction is already suspended " + tx.getXid();
+ log.error("Failed to xa prepare", e);
- return new SessionXAResponseMessage(true, XAException.XAER_PROTO, msg);
+ if (e instanceof MessagingException)
+ {
+ response = new MessagingExceptionMessage((MessagingException)e);
+ }
+ else
+ {
+ response = new MessagingExceptionMessage(new MessagingException(MessagingException.INTERNAL_ERROR));
+ }
}
- tx.suspend();
+ sendResponse(result, response);
+ }
- tx = null;
+ public void handleGetInDoubtXids(final Packet packet)
+ {
+ DelayedResult result = channel.replicatePacket(packet);
- return new SessionXAResponseMessage(false, XAResource.XA_OK, null);
+ Packet response = new SessionXAGetInDoubtXidsResponseMessage(resourceManager.getPreparedTransactions());
+
+ sendResponse(result, response);
}
- public List<Xid> getInDoubtXids() throws Exception
+ public void handleGetXATimeout(final Packet packet)
{
- return resourceManager.getPreparedTransactions();
+ DelayedResult result = channel.replicatePacket(packet);
+
+ Packet response = new SessionXAGetTimeoutResponseMessage(resourceManager.getTimeoutSeconds());
+
+ sendResponse(result, response);
}
- public int getXATimeout()
+ public void handleSetXATimeout(final SessionXASetTimeoutMessage packet)
{
- return resourceManager.getTimeoutSeconds();
- }
+ DelayedResult result = channel.replicatePacket(packet);
- public boolean setXATimeout(final int timeoutSeconds)
- {
- return resourceManager.setTimeoutSeconds(timeoutSeconds);
+ Packet response = new SessionXASetTimeoutResponseMessage(resourceManager.setTimeoutSeconds(packet.getTimeoutSeconds()));
+
+ sendResponse(result, response);
}
- public void addDestination(final SimpleString address, final boolean durable, final boolean temporary) throws Exception
+ /**
+ * Handles a request to add a destination (address).
+ *
+ * The packet is first handed to channel.replicatePacket; the outcome is then
+ * delivered through sendResponse. A security check for CheckType.CREATE is
+ * performed on the address before it is added to the post office. If the
+ * address already exists an ADDRESS_EXISTS MessagingException is reported.
+ * For temporary addresses a failure runner is registered that removes the
+ * address again. Any exception is logged and reported as a
+ * MessagingExceptionMessage; success is reported as a NullResponseMessage.
+ *
+ * @param packet the request carrying the address and its durable/temporary flags
+ */
+ public void handleAddDestination(final SessionAddDestinationMessage packet)
{
- securityStore.check(address, CheckType.CREATE, this);
+ DelayedResult result = channel.replicatePacket(packet);
- if (!postOffice.addDestination(address, durable))
- {
- throw new MessagingException(MessagingException.ADDRESS_EXISTS, "Address already exists: " + address);
- }
+ Packet response = null;
- if (temporary)
+ final SimpleString address = packet.getAddress();
+
+ final boolean durable = packet.isDurable();
+
+ final boolean temporary = packet.isTemporary();
+
+ try
{
- // Temporary address in core simply means the address will be deleted
- // if the remoting connection
- // dies. It does not mean it will get deleted automatically when the
- // session is closed.
- // It is up to the user to delete the address when finished with it
+ securityStore.check(address, CheckType.CREATE, this);
- failureRunners.add(new Runnable()
+ if (!postOffice.addDestination(address, durable))
{
- public void run()
+ throw new MessagingException(MessagingException.ADDRESS_EXISTS, "Address already exists: " + address);
+ }
+
+ if (temporary)
+ {
+ // Temporary address in core simply means the address will be deleted
+ // if the remoting connection
+ // dies. It does not mean it will get deleted automatically when the
+ // session is closed.
+ // It is up to the user to delete the address when finished with it
+
+ failureRunners.add(new Runnable()
{
- try
+ public void run()
{
- postOffice.removeDestination(address, durable);
+ try
+ {
+ postOffice.removeDestination(address, durable);
+ }
+ catch (Exception e)
+ {
+ // Keep the cause in the log so a failed cleanup can be diagnosed
+ log.error("Failed to remove temporary address " + address, e);
+ }
}
- catch (Exception e)
- {
- log.error("Failed to remove temporary address " + address);
- }
- }
- });
+ });
+ }
+
+ response = new NullResponseMessage();
}
+ catch (Exception e)
+ {
+ log.error("Failed to add destination", e);
+
+ if (e instanceof MessagingException)
+ {
+ response = new MessagingExceptionMessage((MessagingException)e);
+ }
+ else
+ {
+ response = new MessagingExceptionMessage(new MessagingException(MessagingException.INTERNAL_ERROR));
+ }
+ }
+
+ sendResponse(result, response);
}
- public void removeDestination(final SimpleString address, final boolean durable) throws Exception
+ public void handleRemoveDestination(final SessionRemoveDestinationMessage packet)
{
- securityStore.check(address, CheckType.CREATE, this);
+ DelayedResult result = channel.replicatePacket(packet);
- if (!postOffice.removeDestination(address, durable))
+ Packet response = null;
+
+ final SimpleString address = packet.getAddress();
+
+ final boolean durable = packet.isDurable();
+
+ try
{
- throw new MessagingException(MessagingException.ADDRESS_DOES_NOT_EXIST, "Address does not exist: " + address);
+ securityStore.check(address, CheckType.CREATE, this);
+
+ if (!postOffice.removeDestination(address, durable))
+ {
+ throw new MessagingException(MessagingException.ADDRESS_DOES_NOT_EXIST,
+ "Address does not exist: " + address);
+ }
+
+ response = new NullResponseMessage();
}
+ catch (Exception e)
+ {
+ log.error("Failed to remove destination", e);
+
+ if (e instanceof MessagingException)
+ {
+ response = new MessagingExceptionMessage((MessagingException)e);
+ }
+ else
+ {
+ response = new MessagingExceptionMessage(new MessagingException(MessagingException.INTERNAL_ERROR));
+ }
+ }
+
+ sendResponse(result, response);
}
- public void createQueue(final SimpleString address,
- final SimpleString queueName,
- final SimpleString filterString,
- final boolean durable,
- final boolean temporary) throws Exception
+ private void lockConsumers()
{
- // make sure the user has privileges to create this queue
- if (!postOffice.containsDestination(address))
+ for (ServerConsumer consumer : consumers.values())
{
- securityStore.check(address, CheckType.CREATE, this);
+ consumer.lock();
}
- Binding binding = postOffice.getBinding(queueName);
+ }
- if (binding != null)
+ private void unlockConsumers()
+ {
+ for (ServerConsumer consumer : consumers.values())
{
- throw new MessagingException(MessagingException.QUEUE_EXISTS);
+ consumer.unlock();
}
+ }
- Filter filter = null;
-
- if (filterString != null)
+ public void handleStart(final Packet packet)
+ {
+ boolean lock = this.channel.getReplicatingChannel() != null;
+
+ if (lock)
{
- filter = new FilterImpl(filterString);
+ lockConsumers();
}
- binding = postOffice.addBinding(address, queueName, filter, durable, temporary);
-
- if (temporary)
+ //We need to prevent any delivery and replication of delivery occurring while the start/stop
+ //is being processed.
+ //Otherwise we can end up with start/stop being processed in different order on backup to live.
+ //Which can result in, say, a delivery arriving at backup, but it's still not started!
+ try
{
- // Temporary queue in core simply means the queue will be deleted if
- // the remoting connection
- // dies. It does not mean it will get deleted automatically when the
- // session is closed.
- // It is up to the user to delete the queue when finished with it
+ channel.replicatePacket(packet);
- final Queue queue = binding.getQueue();
+ // set started will unlock
+ setStarted(true);
- failureRunners.add(new Runnable()
+ }
+ finally
+ {
+ if (lock)
{
- public void run()
- {
- try
- {
- postOffice.removeBinding(queue.getName());
- }
- catch (Exception e)
- {
- log.error("Failed to remove temporary queue " + queue.getName());
- }
- }
- });
+ unlockConsumers();
+ }
}
}
- public void deleteQueue(final SimpleString queueName) throws Exception
+ public void handleStop(final Packet packet)
{
- Binding binding = postOffice.removeBinding(queueName);
-
- if (binding == null)
+ boolean lock = this.channel.getReplicatingChannel() != null;
+
+ if (lock)
{
- throw new MessagingException(MessagingException.QUEUE_DOES_NOT_EXIST);
+ lockConsumers();
}
- Queue queue = binding.getQueue();
+ try
+ {
+ DelayedResult result = channel.replicatePacket(packet);
- if (queue.getConsumerCount() != 0)
+ setStarted(false);
+
+ sendResponse(result, new NullResponseMessage());
+ }
+ finally
{
- throw new MessagingException(MessagingException.ILLEGAL_STATE, "Cannot delete queue - it has consumers");
+ if (lock)
+ {
+ unlockConsumers();
+ }
}
+ }
- if (queue.isDurable())
+ public void handleFailedOver(final Packet packet)
+ {
+ // No need to replicate since this only occurs after failover
+ Set<ServerConsumer> consumersClone = new HashSet<ServerConsumer>(consumers.values());
+
+ for (ServerConsumer consumer : consumersClone)
{
- binding.getQueue().deleteAllReferences(storageManager);
+ consumer.failedOver();
}
}
- public SessionCreateConsumerResponseMessage createConsumer(final SimpleString queueName,
- final SimpleString filterString,
- int windowSize,
- int maxRate,
- final boolean browseOnly) throws Exception
+ public void handleClose(final Packet packet)
{
- Binding binding = postOffice.getBinding(queueName);
+ DelayedResult result = channel.replicatePacket(packet);
- if (binding == null)
+ Packet response = null;
+
+ try
{
- throw new MessagingException(MessagingException.QUEUE_DOES_NOT_EXIST);
+ close();
+
+ response = new NullResponseMessage();
}
+ catch (Exception e)
+ {
+ log.error("Failed to close", e);
- securityStore.check(binding.getAddress(), CheckType.READ, this);
+ if (e instanceof MessagingException)
+ {
+ response = new MessagingExceptionMessage((MessagingException)e);
+ }
+ else
+ {
+ response = new MessagingExceptionMessage(new MessagingException(MessagingException.INTERNAL_ERROR));
+ }
+ }
- Filter filter = null;
+ sendResponse(result, response, true);
+ }
- if (filterString != null)
+ private void setStarted(final boolean s)
+ {
+ Set<ServerConsumer> consumersClone = new HashSet<ServerConsumer>(consumers.values());
+
+ for (ServerConsumer consumer : consumersClone)
{
- filter = new FilterImpl(filterString);
+ consumer.setStarted(s);
}
- // Flow control values if specified on queue override those passed in from
- // client
+ started = s;
+ }
- QueueSettings qs = queueSettingsRepository.getMatch(queueName.toString());
+ public void handleCloseConsumer(final SessionConsumerCloseMessage packet)
+ {
+ consumers.get(packet.getConsumerID()).handleClose(packet);
+ }
- Integer queueWindowSize = qs.getConsumerWindowSize();
+ public void handleCloseProducer(final SessionProducerCloseMessage packet)
+ {
+ DelayedResult result = channel.replicatePacket(packet);
- windowSize = queueWindowSize != null ? queueWindowSize : windowSize;
+ Packet response = null;
- Integer queueMaxRate = queueSettingsRepository.getMatch(queueName.toString()).getConsumerMaxRate();
+ try
+ {
+ producers.get(packet.getProducerID()).close();
- maxRate = queueMaxRate != null ? queueMaxRate : maxRate;
-
- Queue theQueue;
- if (browseOnly)
+ response = new NullResponseMessage();
+ }
+ catch (Exception e)
{
- // We consume a copy of the queue - TODO - this is a temporary measure
- // and will disappear once we can provide a proper iterator on the queue
+ log.error("Failed to close producer", e);
- theQueue = new QueueImpl(-1,
- queueName,
- filter,
- false,
- false,
- false,
- null,
- postOffice);
-
- //There's no need for any special locking since the list method is synchronized
- List<MessageReference> refs = binding.getQueue().list(filter);
-
- for (MessageReference ref : refs)
+ if (e instanceof MessagingException)
{
- theQueue.addLast(ref);
+ response = new MessagingExceptionMessage((MessagingException)e);
}
+ else
+ {
+ response = new MessagingExceptionMessage(new MessagingException(MessagingException.INTERNAL_ERROR));
+ }
}
- else
- {
- theQueue = binding.getQueue();
- }
- ServerConsumer consumer = new ServerConsumerImpl(idGenerator.generateID(),
- this,
- theQueue,
- filter,
- windowSize != -1,
- maxRate,
- started,
- browseOnly,
- storageManager,
- queueSettingsRepository,
- postOffice,
- channel);
+ sendResponse(result, response);
+ }
- SessionCreateConsumerResponseMessage response = new SessionCreateConsumerResponseMessage(windowSize);
+ public void handleReceiveConsumerCredits(final SessionConsumerFlowCreditMessage packet)
+ {
+ channel.replicatePacket(packet);
- consumers.put(consumer.getID(), consumer);
-
- return response;
+ try
+ {
+ consumers.get(packet.getConsumerID()).receiveCredits(packet.getCredits());
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to receive credits", e);
+ }
}
- public SessionQueueQueryResponseMessage executeQueueQuery(final SimpleString queueName) throws Exception
+ public void handleSendProducerMessage(final SessionSendMessage packet)
{
- if (queueName == null)
+ ServerMessage msg = packet.getServerMessage();
+
+ if (msg.getMessageID() == 0L)
{
- throw new IllegalArgumentException("Queue name is null");
+ // must generate message id here, so we know they are in sync on live and backup
+ long id = storageManager.generateUniqueID();
+
+ msg.setMessageID(id);
}
- Binding binding = postOffice.getBinding(queueName);
+ DelayedResult result = channel.replicatePacket(packet);
- SessionQueueQueryResponseMessage response;
+ Packet response = null;
- if (binding != null)
+ try
{
- Queue queue = binding.getQueue();
+ producers.get(packet.getProducerID()).send(msg);
- Filter filter = queue.getFilter();
-
- SimpleString filterString = filter == null ? null : filter.getFilterString();
- // TODO: Remove MAX-SIZE-BYTES from SessionQueueQueryResponse.
- response = new SessionQueueQueryResponseMessage(queue.isDurable(),
- queue.getConsumerCount(),
- queue.getMessageCount(),
- filterString,
- binding.getAddress());
+ if (packet.isRequiresResponse())
+ {
+ response = new NullResponseMessage();
+ }
}
- else
+ catch (Exception e)
{
- response = new SessionQueueQueryResponseMessage();
+ log.error("Failed to send message", e);
+
+ if (packet.isRequiresResponse())
+ {
+ if (e instanceof MessagingException)
+ {
+ response = new MessagingExceptionMessage((MessagingException)e);
+ }
+ else
+ {
+ response = new MessagingExceptionMessage(new MessagingException(MessagingException.INTERNAL_ERROR));
+ }
+ }
}
- return response;
+ sendResponse(result, response);
}
- public SessionBindingQueryResponseMessage executeBindingQuery(final SimpleString address) throws Exception
+ public void handleSendScheduledProducerMessage(final SessionScheduledSendMessage packet)
{
- if (address == null)
+ ServerMessage msg = packet.getServerMessage();
+
+ if (msg.getMessageID() == 0L)
{
- throw new IllegalArgumentException("Address is null");
+ // must generate message id here, so we know they are in sync on live and backup
+ long id = storageManager.generateUniqueID();
+
+ msg.setMessageID(id);
}
- boolean exists = postOffice.containsDestination(address);
+ DelayedResult result = channel.replicatePacket(packet);
- List<SimpleString> queueNames = new ArrayList<SimpleString>();
+ Packet response = null;
- if (exists)
+ try
{
- List<Binding> bindings = postOffice.getBindingsForAddress(address);
+ producers.get(packet.getProducerID()).sendScheduled(msg, packet.getScheduledDeliveryTime());
- for (Binding binding : bindings)
+ if (packet.isRequiresResponse())
{
- queueNames.add(binding.getQueue().getName());
+ response = new NullResponseMessage();
}
}
+ catch (Exception e)
+ {
+ log.error("Failed to send scheduled message", e);
+ if (packet.isRequiresResponse())
+ {
+ if (e instanceof MessagingException)
+ {
+ response = new MessagingExceptionMessage((MessagingException)e);
+ }
+ else
+ {
+ response = new MessagingExceptionMessage(new MessagingException(MessagingException.INTERNAL_ERROR));
+ }
+ }
+ }
- return new SessionBindingQueryResponseMessage(exists, queueNames);
+ sendResponse(result, response);
}
- /**
- * Create a producer for the specified address
- *
- * @param address The address to produce too
- * @param windowSize - the producer window size to use for flow control. Specify -1 to disable flow control
- * completely The actual window size used may be less than the specified window size if it is overridden by
- * any producer-window-size specified on the queue
- */
- public SessionCreateProducerResponseMessage createProducer(final SimpleString address,
- final int windowSize,
- final int maxRate,
- final boolean autoGroupId) throws Exception
+ public void handleManagementMessage(final SessionSendManagementMessage packet)
{
- FlowController flowController = null;
+ DelayedResult result = channel.replicatePacket(packet);
- final int maxRateToUse = maxRate;
+ Packet response = null;
- if (address != null)
+ try
{
- flowController = windowSize == -1 ? null : postOffice.getFlowController(address);
- }
+ ServerMessage serverMessage = packet.getServerMessage();
- final int windowToUse = flowController == null ? -1 : windowSize;
+ if (serverMessage.containsProperty(ManagementHelper.HDR_JMX_SUBSCRIBE_TO_NOTIFICATIONS))
+ {
+ boolean subscribe = (Boolean)serverMessage.getProperty(ManagementHelper.HDR_JMX_SUBSCRIBE_TO_NOTIFICATIONS);
- // Server window size is 0.75 client window size for producer flow control
- // (other way round to consumer flow control)
+ final SimpleString replyTo = (SimpleString)serverMessage.getProperty(ManagementHelper.HDR_JMX_REPLYTO);
- final int serverWindowSize = windowToUse == -1 ? -1 : (int)(windowToUse * 0.75);
+ if (subscribe)
+ {
+ if (log.isDebugEnabled())
+ {
+ log.debug("added notification listener " + this);
+ }
- ServerProducerImpl producer = new ServerProducerImpl(idGenerator.generateID(),
- this,
- address,
- flowController,
- serverWindowSize,
- channel);
+ managementService.addNotificationListener(this, null, replyTo);
+ }
+ else
+ {
+ if (log.isDebugEnabled())
+ {
+ log.debug("removed notification listener " + this);
+ }
- producers.put(producer.getID(), producer);
+ managementService.removeNotificationListener(this);
+ }
+ }
+ else
+ {
+ managementService.handleMessage(serverMessage);
- // Get some initial credits to send to the producer - we try for
- // windowToUse
+ serverMessage.setDestination((SimpleString)serverMessage.getProperty(ManagementHelper.HDR_JMX_REPLYTO));
- int initialCredits = flowController == null ? -1 : flowController.getInitialCredits(windowToUse, producer);
+ send(serverMessage);
+ }
- SimpleString groupId = null;
- if (autoGroupId)
+ if (packet.isRequiresResponse())
+ {
+ response = new NullResponseMessage();
+ }
+ }
+ catch (Exception e)
{
- groupId = simpleStringIdGenerator.generateID();
+ log.error("Failed to send management message", e);
+ if (packet.isRequiresResponse())
+ {
+ if (e instanceof MessagingException)
+ {
+ response = new MessagingExceptionMessage((MessagingException)e);
+ }
+ else
+ {
+ response = new MessagingExceptionMessage(new MessagingException(MessagingException.INTERNAL_ERROR));
+ }
+ }
}
- return new SessionCreateProducerResponseMessage(initialCredits, maxRateToUse, groupId);
- }
- public void closeConsumer(final long consumerID) throws Exception
- {
- consumers.get(consumerID).close();
+ sendResponse(result, response);
}
- public void closeProducer(final long producerID) throws Exception
+ public void handleReplicatedDelivery(final SessionReplicateDeliveryMessage packet)
{
- producers.get(producerID).close();
- }
+ ServerConsumer consumer = consumers.get(packet.getConsumerID());
- public void receiveConsumerCredits(final long consumerID, final int credits) throws Exception
- {
- consumers.get(consumerID).receiveCredits(credits);
- }
+ if (consumer == null)
+ {
+ throw new IllegalStateException("Cannot handle replicated delivery, consumer is closed");
+ }
- public void sendProducerMessage(final long producerID, final ServerMessage message) throws Exception
- {
- producers.get(producerID).send(message);
+ try
+ {
+ consumer.deliverReplicated(packet.getMessageID());
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to handle replicated delivery", e);
+ }
}
- public void sendScheduledProducerMessage(final long producerID,
- final ServerMessage message,
- final long scheduledDeliveryTime) throws Exception
- {
- producers.get(producerID).sendScheduled(message, scheduledDeliveryTime);
- }
-
public int transferConnection(final RemotingConnection newConnection, final int lastReceivedCommandID)
{
remotingConnection.removeFailureListener(this);
@@ -1088,53 +1879,6 @@
return serverLastReceivedCommandID;
}
- public void handleManagementMessage(final SessionSendManagementMessage message) throws Exception
- {
- ServerMessage serverMessage = message.getServerMessage();
-
- if (serverMessage.containsProperty(ManagementHelper.HDR_JMX_SUBSCRIBE_TO_NOTIFICATIONS))
- {
- boolean subscribe = (Boolean)serverMessage.getProperty(ManagementHelper.HDR_JMX_SUBSCRIBE_TO_NOTIFICATIONS);
-
- final SimpleString replyTo = (SimpleString)serverMessage.getProperty(ManagementHelper.HDR_JMX_REPLYTO);
-
- if (subscribe)
- {
- if (log.isDebugEnabled())
- {
- log.debug("added notification listener " + this);
- }
-
- managementService.addNotificationListener(this, null, replyTo);
- }
- else
- {
- if (log.isDebugEnabled())
- {
- log.debug("removed notification listener " + this);
- }
-
- managementService.removeNotificationListener(this);
- }
- return;
- }
- managementService.handleMessage(message.getServerMessage());
-
- serverMessage.setDestination((SimpleString)serverMessage.getProperty(ManagementHelper.HDR_JMX_REPLYTO));
-
- send(serverMessage);
- }
-
- public void handleReplicatedDelivery(long consumerID, long messageID) throws Exception
- {
- ServerConsumer consumer = consumers.get(consumerID);
-
- if (consumer != null)
- {
- consumer.deliverReplicated(messageID);
- }
- }
-
// FailureListener implementation
// --------------------------------------------------------------------
@@ -1155,6 +1899,8 @@
}
close();
+
+ channel.close();
}
catch (Throwable t)
{
@@ -1193,6 +1939,80 @@
// Private
// ----------------------------------------------------------------------------
+ private void doRollback(final Transaction theTx) throws Exception
+ {
+ boolean wasStarted = started;
+
+ List<MessageReference> toCancel = new ArrayList<MessageReference>();
+
+ for (ServerConsumer consumer : consumers.values())
+ {
+ if (wasStarted)
+ {
+ consumer.setStarted(false);
+ }
+
+ toCancel.addAll(consumer.cancelRefs());
+ }
+
+ List<MessageReference> rolledBack = theTx.rollback(queueSettingsRepository);
+
+ rolledBack.addAll(toCancel);
+
+ if (wasStarted)
+ {
+ for (ServerConsumer consumer : consumers.values())
+ {
+ consumer.setStarted(true);
+ }
+ }
+
+ // Now cancel the refs back to the queue(s), we sort into queues and cancel back atomically to
+ // preserve order
+
+ Map<Queue, LinkedList<MessageReference>> queueMap = new HashMap<Queue, LinkedList<MessageReference>>();
+
+ for (MessageReference ref : rolledBack)
+ {
+ if (ref.cancel(storageManager, postOffice, queueSettingsRepository))
+ {
+ Queue queue = ref.getQueue();
+
+ LinkedList<MessageReference> list = queueMap.get(queue);
+
+ if (list == null)
+ {
+ list = new LinkedList<MessageReference>();
+
+ queueMap.put(queue, list);
+ }
+
+ list.add(ref);
+ }
+ }
+
+ for (Map.Entry<Queue, LinkedList<MessageReference>> entry : queueMap.entrySet())
+ {
+ LinkedList<MessageReference> refs = entry.getValue();
+
+ entry.getKey().addListFirst(refs);
+ }
+ }
+
+ private void rollback() throws Exception
+ {
+ if (tx == null)
+ {
+ // Might be null if XA
+
+ tx = new TransactionImpl(storageManager, postOffice);
+ }
+
+ doRollback(tx);
+
+ tx = new TransactionImpl(storageManager, postOffice);
+ }
+
private void doAck(final MessageReference ref) throws Exception
{
ServerMessage message = ref.getMessage();
@@ -1236,4 +2056,41 @@
throw e;
}
}
+
+ private void sendResponse(final DelayedResult result, final Packet response, final boolean closeChannel)
+ {
+ if (response != null)
+ {
+ if (result == null)
+ {
+ // Not clustered - just send now
+ channel.send(response);
+
+ if (closeChannel)
+ {
+ channel.close();
+ }
+ }
+ else
+ {
+ result.setResultRunner(new Runnable()
+ {
+ public void run()
+ {
+ channel.send(response);
+
+ if (closeChannel)
+ {
+ channel.close();
+ }
+ }
+ });
+ }
+ }
+ }
+
+ public void sendResponse(final DelayedResult result, final Packet response)
+ {
+ this.sendResponse(result, response, false);
+ }
}
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java 2008-10-21 09:53:33 UTC (rev 5161)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java 2008-10-21 10:39:09 UTC (rev 5162)
@@ -48,19 +48,10 @@
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_START;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_SUSPEND;
-import java.util.List;
-
-import javax.transaction.xa.Xid;
-
-import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.remoting.Channel;
import org.jboss.messaging.core.remoting.ChannelHandler;
-import org.jboss.messaging.core.remoting.DelayedResult;
import org.jboss.messaging.core.remoting.Packet;
-import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.NullResponseMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionAcknowledgeMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionAddDestinationMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionBindingQueryMessage;
@@ -80,16 +71,12 @@
import org.jboss.messaging.core.remoting.impl.wireformat.SessionXACommitMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAEndMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAForgetMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAGetInDoubtXidsResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAGetTimeoutResponseMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAJoinMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAPrepareMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAResumeMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionXARollbackMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionXASetTimeoutMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXASetTimeoutResponseMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAStartMessage;
-import org.jboss.messaging.core.server.ServerMessage;
import org.jboss.messaging.core.server.ServerSession;
/**
@@ -107,18 +94,13 @@
private final Channel channel;
- private final StorageManager storageManager;
-
public ServerSessionPacketHandler(final ServerSession session,
- final Channel channel,
- final StorageManager storageManager)
+ final Channel channel)
{
this.session = session;
this.channel = channel;
-
- this.storageManager = storageManager;
}
public long getID()
@@ -130,25 +112,6 @@
{
byte type = packet.getType();
- if (type == SESS_SEND || type == SESS_SCHEDULED_SEND)
- {
- SessionSendMessage send = (SessionSendMessage)packet;
-
- ServerMessage msg = send.getServerMessage();
-
- if (msg.getMessageID() == 0L)
- {
- // must generate message id here, so we know they are in sync on live and backup
- long id = storageManager.generateUniqueID();
-
- send.getServerMessage().setMessageID(id);
- }
- }
-
- Packet response = null;
-
- DelayedResult result = channel.replicatePacket(packet);
-
try
{
switch (type)
@@ -156,220 +119,184 @@
case SESS_CREATECONSUMER:
{
SessionCreateConsumerMessage request = (SessionCreateConsumerMessage)packet;
- response = session.createConsumer(request.getQueueName(),
- request.getFilterString(),
- request.getWindowSize(),
- request.getMaxRate(),
- request.isBrowseOnly());
+ session.handleCreateConsumer(request);
break;
}
case SESS_CREATEQUEUE:
{
SessionCreateQueueMessage request = (SessionCreateQueueMessage)packet;
- session.createQueue(request.getAddress(),
- request.getQueueName(),
- request.getFilterString(),
- request.isDurable(),
- request.isTemporary());
- response = new NullResponseMessage();
+ session.handleCreateQueue(request);
break;
}
case SESS_DELETE_QUEUE:
{
SessionDeleteQueueMessage request = (SessionDeleteQueueMessage)packet;
- session.deleteQueue(request.getQueueName());
- response = new NullResponseMessage();
+ session.handleDeleteQueue(request);
break;
}
case SESS_QUEUEQUERY:
{
SessionQueueQueryMessage request = (SessionQueueQueryMessage)packet;
- response = session.executeQueueQuery(request.getQueueName());
+ session.handleExecuteQueueQuery(request);
break;
}
case SESS_BINDINGQUERY:
{
SessionBindingQueryMessage request = (SessionBindingQueryMessage)packet;
- response = session.executeBindingQuery(request.getAddress());
+ session.handleExecuteBindingQuery(request);
break;
}
case SESS_CREATEPRODUCER:
{
SessionCreateProducerMessage request = (SessionCreateProducerMessage)packet;
- response = session.createProducer(request.getAddress(),
- request.getWindowSize(),
- request.getMaxRate(),
- request.isAutoGroupId());
+ session.handleCreateProducer(request);
break;
}
case SESS_ACKNOWLEDGE:
{
SessionAcknowledgeMessage message = (SessionAcknowledgeMessage)packet;
- session.acknowledge(message.getConsumerID(), message.getMessageID());
- if (message.isRequiresResponse())
- {
- response = new NullResponseMessage();
- }
+ session.handleAcknowledge(message);
break;
}
case SESS_COMMIT:
{
- session.commit();
- response = new NullResponseMessage();
+ session.handleCommit(packet);
break;
}
case SESS_ROLLBACK:
{
- session.rollback();
- response = new NullResponseMessage();
+ session.handleRollback(packet);
break;
}
case SESS_XA_COMMIT:
{
SessionXACommitMessage message = (SessionXACommitMessage)packet;
- response = session.XACommit(message.isOnePhase(), message.getXid());
+ session.handleXACommit(message);
break;
}
case SESS_XA_END:
{
SessionXAEndMessage message = (SessionXAEndMessage)packet;
- response = session.XAEnd(message.getXid(), message.isFailed());
+ session.handleXAEnd(message);
break;
}
case SESS_XA_FORGET:
{
SessionXAForgetMessage message = (SessionXAForgetMessage)packet;
- response = session.XAForget(message.getXid());
+ session.handleXAForget(message);
break;
}
case SESS_XA_JOIN:
{
SessionXAJoinMessage message = (SessionXAJoinMessage)packet;
- response = session.XAJoin(message.getXid());
+ session.handleXAJoin(message);
break;
}
case SESS_XA_RESUME:
{
SessionXAResumeMessage message = (SessionXAResumeMessage)packet;
- response = session.XAResume(message.getXid());
+ session.handleXAResume(message);
break;
}
case SESS_XA_ROLLBACK:
{
SessionXARollbackMessage message = (SessionXARollbackMessage)packet;
- response = session.XARollback(message.getXid());
+ session.handleXARollback(message);
break;
}
case SESS_XA_START:
{
SessionXAStartMessage message = (SessionXAStartMessage)packet;
- response = session.XAStart(message.getXid());
+ session.handleXAStart(message);
break;
}
case SESS_XA_SUSPEND:
{
- response = session.XASuspend();
+ session.handleXASuspend(packet);
break;
}
case SESS_XA_PREPARE:
{
SessionXAPrepareMessage message = (SessionXAPrepareMessage)packet;
- response = session.XAPrepare(message.getXid());
+ session.handleXAPrepare(message);
break;
}
case SESS_XA_INDOUBT_XIDS:
{
- List<Xid> xids = session.getInDoubtXids();
- response = new SessionXAGetInDoubtXidsResponseMessage(xids);
+ session.handleGetInDoubtXids(packet);
break;
}
case SESS_XA_GET_TIMEOUT:
{
- response = new SessionXAGetTimeoutResponseMessage(session.getXATimeout());
+ session.handleGetXATimeout(packet);
break;
}
case SESS_XA_SET_TIMEOUT:
{
SessionXASetTimeoutMessage message = (SessionXASetTimeoutMessage)packet;
- response = new SessionXASetTimeoutResponseMessage(session.setXATimeout(message.getTimeoutSeconds()));
+ session.handleSetXATimeout(message);
break;
}
case SESS_ADD_DESTINATION:
{
SessionAddDestinationMessage message = (SessionAddDestinationMessage)packet;
- session.addDestination(message.getAddress(), message.isDurable(), message.isTemporary());
- response = new NullResponseMessage();
+ session.handleAddDestination(message);
break;
}
case SESS_REMOVE_DESTINATION:
{
SessionRemoveDestinationMessage message = (SessionRemoveDestinationMessage)packet;
- session.removeDestination(message.getAddress(), message.isDurable());
- response = new NullResponseMessage();
+ session.handleRemoveDestination(message);
break;
}
case SESS_START:
{
- session.setStarted(true);
+ session.handleStart(packet);
break;
}
case SESS_FAILOVER_COMPLETE:
{
- session.failedOver();
+ session.handleFailedOver(packet);
break;
}
case SESS_STOP:
{
- session.setStarted(false);
- response = new NullResponseMessage();
+ session.handleStop(packet);
break;
}
case SESS_CLOSE:
{
- session.close();
- response = new NullResponseMessage();
+ session.handleClose(packet);
break;
}
case SESS_CONSUMER_CLOSE:
{
SessionConsumerCloseMessage message = (SessionConsumerCloseMessage)packet;
- session.closeConsumer(message.getConsumerID());
- response = new NullResponseMessage();
+ session.handleCloseConsumer(message);
break;
}
case SESS_PRODUCER_CLOSE:
{
SessionProducerCloseMessage message = (SessionProducerCloseMessage)packet;
- session.closeProducer(message.getProducerID());
- response = new NullResponseMessage();
+ session.handleCloseProducer(message);
break;
}
case SESS_FLOWTOKEN:
{
SessionConsumerFlowCreditMessage message = (SessionConsumerFlowCreditMessage)packet;
- session.receiveConsumerCredits(message.getConsumerID(), message.getCredits());
+ session.handleReceiveConsumerCredits(message);
break;
}
case SESS_SEND:
{
SessionSendMessage message = (SessionSendMessage)packet;
- session.sendProducerMessage(message.getProducerID(), message.getServerMessage());
- if (message.isRequiresResponse())
- {
- response = new NullResponseMessage();
- }
+ session.handleSendProducerMessage(message);
break;
}
case SESS_SCHEDULED_SEND:
{
SessionScheduledSendMessage message = (SessionScheduledSendMessage)packet;
- session.sendScheduledProducerMessage(message.getProducerID(),
- message.getServerMessage(),
- message.getScheduledDeliveryTime());
- if (message.isRequiresResponse())
- {
- response = new NullResponseMessage();
- }
+ session.handleSendScheduledProducerMessage(message);
break;
}
case SESS_MANAGEMENT_SEND:
@@ -381,67 +308,16 @@
case SESS_REPLICATE_DELIVERY:
{
SessionReplicateDeliveryMessage message = (SessionReplicateDeliveryMessage)packet;
- session.handleReplicatedDelivery(message.getConsumerID(), message.getMessageID());
+ session.handleReplicatedDelivery(message);
break;
}
- default:
- {
- response = new MessagingExceptionMessage(new MessagingException(MessagingException.UNSUPPORTED_PACKET,
- "Unsupported packet " + type));
- }
}
}
catch (Throwable t)
{
- MessagingException me;
-
log.error("Caught unexpected exception", t);
-
- if (t instanceof MessagingException)
- {
- me = (MessagingException)t;
- }
- else
- {
- me = new MessagingException(MessagingException.INTERNAL_ERROR);
- }
-
- response = new MessagingExceptionMessage(me);
}
- if (response != null)
- {
- final boolean closeChannel = type == SESS_CLOSE;
-
- if (result == null)
- {
- // Not clustered - just send now
- channel.send(response);
-
- if (closeChannel)
- {
- channel.close();
- }
- }
- else
- {
- final Packet theResponse = response;
-
- result.setResultRunner(new Runnable()
- {
- public void run()
- {
- channel.send(theResponse);
-
- if (closeChannel)
- {
- channel.close();
- }
- }
- });
- }
- }
-
channel.replicateComplete();
}
}
Modified: trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java
===================================================================
--- trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java 2008-10-21 09:53:33 UTC (rev 5161)
+++ trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java 2008-10-21 10:39:09 UTC (rev 5162)
@@ -3760,7 +3760,7 @@
{
conn.close();
}
- }
+ }
}
// http://jira.jboss.org/jira/browse/JBMESSAGING-1294 - commented out until 2.0 beta
@@ -4115,14 +4115,14 @@
{
TextMessage tm = (TextMessage) m;
- log.trace("Got message:" + tm.getText() + " count is " + count);
+ log.info("Got message:" + tm.getText() + " count is " + count);
messageOrder += tm.getText() + " ";
if (count == 0)
{
if (!("a".equals(tm.getText())))
{
- log.trace("Should be a but was " + tm.getText());
+ log.info("Should be a but was " + tm.getText());
failed = true;
latch.countDown();
}
@@ -4130,14 +4130,14 @@
{
sess.rollback();
messageOrder += "RB ";
- log.trace("rollback() called");
+ log.info("rollback() called");
}
else
{
- log.trace("Calling recover");
+ log.info("Calling recover");
messageOrder += "RC ";
sess.recover();
- log.trace("recover() called");
+ log.info("recover() called");
}
}
@@ -4145,7 +4145,7 @@
{
if (!("a".equals(tm.getText())))
{
- log.trace("Should be a but was " + tm.getText());
+ log.info("Should be a but was " + tm.getText());
failed = true;
latch.countDown();
}
@@ -4159,7 +4159,7 @@
{
if (!("b".equals(tm.getText())))
{
- log.trace("Should be b but was " + tm.getText());
+ log.info("Should be b but was " + tm.getText());
failed = true;
latch.countDown();
}
@@ -4168,7 +4168,7 @@
{
if (!("c".equals(tm.getText())))
{
- log.trace("Should be c but was " + tm.getText());
+ log.info("Should be c but was " + tm.getText());
failed = true;
latch.countDown();
}
@@ -4178,7 +4178,7 @@
}
else
{
- log.trace("Acknowledging session");
+ log.info("Acknowledging session");
tm.acknowledge();
}
latch.countDown();
@@ -4187,7 +4187,7 @@
}
catch (JMSException e)
{
- log.trace("Caught JMSException", e);
+ log.info("Caught JMSException", e);
failed = true;
latch.countDown();
}
Modified: trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/SecurityTest.java
===================================================================
--- trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/SecurityTest.java 2008-10-21 09:53:33 UTC (rev 5161)
+++ trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/SecurityTest.java 2008-10-21 10:39:09 UTC (rev 5162)
@@ -98,8 +98,10 @@
}
finally
{
- if (conn1 != null) conn1.close();
- if (conn2 != null) conn2.close();
+ if (conn1 != null)
+ conn1.close();
+ if (conn2 != null)
+ conn2.close();
}
}
@@ -116,7 +118,8 @@
}
finally
{
- if (conn1 != null) conn1.close();
+ if (conn1 != null)
+ conn1.close();
}
}
@@ -134,11 +137,12 @@
}
catch (JMSSecurityException e)
{
- //Expected
+ // Expected
}
finally
{
- if (conn1 != null) conn1.close();
+ if (conn1 != null)
+ conn1.close();
}
}
@@ -156,11 +160,12 @@
}
catch (JMSSecurityException e)
{
- //Expected
+ // Expected
}
finally
{
- if (conn1 != null) conn1.close();
+ if (conn1 != null)
+ conn1.close();
}
}
@@ -169,7 +174,7 @@
/**
* user/pwd with preconfigured clientID, should return preconf
*/
- public void testPreConfClientID() throws Exception
+ public void testPreConfClientID() throws Exception
{
Connection conn = null;
try
@@ -177,7 +182,7 @@
ArrayList<String> bindings = new ArrayList<String>();
bindings.add("preConfcf");
deployConnectionFactory("dilbert-id", "preConfcf", bindings);
- ConnectionFactory cf = (ConnectionFactory) getInitialContext().lookup("preConfcf");
+ ConnectionFactory cf = (ConnectionFactory)getInitialContext().lookup("preConfcf");
conn = cf.createConnection("dilbert", "dogbert");
String clientID = conn.getClientID();
assertEquals("Invalid ClientID", "dilbert-id", clientID);
@@ -205,7 +210,8 @@
}
finally
{
- if (conn != null) conn.close();
+ if (conn != null)
+ conn.close();
}
}
@@ -220,7 +226,7 @@
ArrayList<String> bindings = new ArrayList<String>();
bindings.add("preConfcf");
deployConnectionFactory("dilbert-id", "preConfcf", bindings);
- ConnectionFactory cf = (ConnectionFactory) getInitialContext().lookup("preConfcf");
+ ConnectionFactory cf = (ConnectionFactory)getInitialContext().lookup("preConfcf");
conn = cf.createConnection("dilbert", "dogbert");
conn.setClientID("myID");
fail();
@@ -252,11 +258,12 @@
}
catch (IllegalStateException e)
{
- //Expected
+ // Expected
}
finally
{
- if (conn != null) conn.close();
+ if (conn != null)
+ conn.close();
}
}
@@ -343,7 +350,8 @@
}
finally
{
- if (conn != null) conn.close();
+ if (conn != null)
+ conn.close();
}
}
@@ -358,7 +366,8 @@
}
finally
{
- if (conn != null) conn.close();
+ if (conn != null)
+ conn.close();
}
}
@@ -374,11 +383,12 @@
}
catch (JMSSecurityException e)
{
- //Expected
+ // Expected
}
finally
{
- if (conn != null) conn.close();
+ if (conn != null)
+ conn.close();
}
}
@@ -392,7 +402,8 @@
}
finally
{
- if (conn != null) conn.close();
+ if (conn != null)
+ conn.close();
}
}
@@ -406,7 +417,8 @@
}
finally
{
- if (conn != null) conn.close();
+ if (conn != null)
+ conn.close();
}
}
@@ -420,7 +432,8 @@
}
finally
{
- if (conn != null) conn.close();
+ if (conn != null)
+ conn.close();
}
}
@@ -434,7 +447,8 @@
}
finally
{
- if (conn != null) conn.close();
+ if (conn != null)
+ conn.close();
}
}
@@ -449,8 +463,8 @@
ArrayList<String> bindings = new ArrayList<String>();
bindings.add("preConfcf");
deployConnectionFactory("dilbert-id", "preConfcf", bindings);
- ConnectionFactory cf = (ConnectionFactory) getInitialContext().lookup("preConfcf");
- //setSecurityConfig(oldDefaultConfig);
+ ConnectionFactory cf = (ConnectionFactory)getInitialContext().lookup("preConfcf");
+ // setSecurityConfig(oldDefaultConfig);
conn = cf.createConnection("dilbert", "dogbert");
assertTrue(this.canCreateDurableSub(conn, topic1, "sub2"));
}
@@ -476,11 +490,11 @@
}
finally
{
- if (conn != null) conn.close();
+ if (conn != null)
+ conn.close();
}
}
-
public void testDefaultSecurityValid() throws Exception
{
Connection conn = null;
@@ -494,7 +508,8 @@
}
finally
{
- if (conn != null) conn.close();
+ if (conn != null)
+ conn.close();
}
}
@@ -511,7 +526,8 @@
}
finally
{
- if (conn != null) conn.close();
+ if (conn != null)
+ conn.close();
}
}
@@ -532,7 +548,7 @@
assertTrue(canReadDestination(conn, queue2));
assertTrue(canWriteDestination(conn, queue2));
- HashSet<Role> newSecurityConfig = new HashSet<Role>();
+ HashSet<Role> newSecurityConfig = new HashSet<Role>();
newSecurityConfig.add(new Role("someotherrole", true, true, false));
setSecurityConfig(newSecurityConfig);
@@ -584,11 +600,10 @@
assertFalse(canReadDestination(conn, queue2));
assertFalse(canWriteDestination(conn, queue2, false));
-
newSecurityConfig = new HashSet<Role>();
newSecurityConfig.add(new Role("def", true, false, false));
- configureSecurityForDestination("Queue2", true, newSecurityConfig);
+ configureSecurityForDestination("Queue2", true, newSecurityConfig);
assertTrue(canReadDestination(conn, queue2));
assertFalse(canWriteDestination(conn, queue2, false));
@@ -596,16 +611,16 @@
newSecurityConfig = new HashSet<Role>();
newSecurityConfig.add(new Role("def", true, true, false));
- configureSecurityForDestination("Queue2", true, newSecurityConfig);
+ configureSecurityForDestination("Queue2", true, newSecurityConfig);
assertTrue(canReadDestination(conn, queue2));
assertTrue(canWriteDestination(conn, queue2, false));
- //Now set to null
+ // Now set to null
ServerManagement.configureSecurityForDestination("Queue2", null);
- //Should fall back to the default config
+ // Should fall back to the default config
HashSet<Role> lockedConf = new HashSet<Role>();
lockedConf.add(new Role("alien", true, true, true));
@@ -643,18 +658,16 @@
assertTrue(canReadDestination(conn, topic2));
assertTrue(canWriteDestination(conn, topic2));
-
HashSet<Role> newSecurityConfig = new HashSet<Role>();
- newSecurityConfig.add(new Role("someotherrole", true, true, false)) ;
+ newSecurityConfig.add(new Role("someotherrole", true, true, false));
configureSecurityForDestination("Topic2", false, newSecurityConfig);
assertFalse(canReadDestination(conn, topic2));
assertFalse(canWriteDestination(conn, topic2, false));
-
newSecurityConfig = new HashSet<Role>();
- newSecurityConfig.add(new Role("def", true, false, false)) ;
+ newSecurityConfig.add(new Role("def", true, false, false));
configureSecurityForDestination("Topic2", false, newSecurityConfig);
@@ -662,20 +675,20 @@
assertFalse(canWriteDestination(conn, topic2, false));
newSecurityConfig = new HashSet<Role>();
- newSecurityConfig.add(new Role("def", true, true, false)) ;
+ newSecurityConfig.add(new Role("def", true, true, false));
- configureSecurityForDestination("Topic2", false, newSecurityConfig);
+ configureSecurityForDestination("Topic2", false, newSecurityConfig);
assertTrue(canReadDestination(conn, topic2));
assertTrue(canWriteDestination(conn, topic2, false));
- //Now set to null
+ // Now set to null
configureSecurityForDestination("Topic2", false, null);
- //Should fall back to the default config
+ // Should fall back to the default config
HashSet<Role> lockedConf = new HashSet<Role>();
- lockedConf.add(new Role("alien", true, true, true)) ;
+ lockedConf.add(new Role("alien", true, true, true));
Set<Role> orig = getSecurityConfig();
setSecurityConfig(lockedConf);
@@ -708,7 +721,7 @@
// configure the queue to allow "def" to read
HashSet<Role> config = new HashSet<Role>();
config.add(new Role("def", true, false, false));
- configureSecurityForDestination("Accounting", true, config );
+ configureSecurityForDestination("Accounting", true, config);
// configure the topic to prevent "def" from reading
HashSet<Role> config2 = new HashSet<Role>();
@@ -751,7 +764,7 @@
protected void setUp() throws Exception
{
super.setUp();
-
+
oldDefaultConfig = getSecurityConfig();
HashSet<Role> roles = new HashSet<Role>();
@@ -761,7 +774,6 @@
roles.add(new Role("john", true, false, false));
configureSecurityForDestination("Queue1", true, roles);
-
HashSet<Role> roles2 = new HashSet<Role>();
roles2.add(new Role("guest", true, true, true));
roles2.add(new Role("publisher", true, true, false));
@@ -777,15 +789,15 @@
protected void tearDown() throws Exception
{
- super.tearDown();
-
+ super.tearDown();
+
setSecurityConfig(oldDefaultConfig);
- configureSecurityForDestination("Queue1", true, null);
- configureSecurityForDestination("Queue2", true, null);
+ configureSecurityForDestination("Queue1", true, null);
+ configureSecurityForDestination("Queue2", true, null);
configureSecurityForDestination("Topic1", false, null);
configureSecurityForDestination("Topic2", false, null);
}
-
+
// Private -------------------------------------------------------
private boolean canReadDestination(Connection conn, Destination dest) throws Exception
@@ -808,6 +820,7 @@
}
}
+
private boolean canWriteDestination(Connection conn, Destination dest) throws Exception
{
boolean transacted = canWriteDestination(conn, dest, true);
@@ -901,15 +914,14 @@
private void testSecurityForTemporaryDestination(boolean isQueue) throws Exception
{
- Destination dest = isQueue ? (Destination) queue1 : topic1;
+ Destination dest = isQueue ? (Destination)queue1 : topic1;
Connection conn = cf.createConnection("guest", "guest");
try
{
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Destination temporaryDestination = isQueue
- ? (Destination) session.createTemporaryQueue()
- : session.createTemporaryTopic();
+ Destination temporaryDestination = isQueue ? (Destination)session.createTemporaryQueue()
+ : session.createTemporaryTopic();
Message message = session.createMessage();
message.setJMSReplyTo(temporaryDestination);
MessageProducer producer = session.createProducer(dest);
@@ -947,9 +959,6 @@
}
}
-
// Inner classes -------------------------------------------------
}
-
-
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/MultiThreadRandomFailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/MultiThreadRandomFailoverTest.java 2008-10-21 09:53:33 UTC (rev 5161)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/MultiThreadRandomFailoverTest.java 2008-10-21 10:39:09 UTC (rev 5162)
@@ -209,29 +209,29 @@
}
}, NUM_THREADS);
}
-
-// public void testM() throws Exception
-// {
-// runTestMultipleThreads(new RunnableT()
-// {
-// public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
-// {
-// doTestM(sf, threadNum);
-// }
-// }, NUM_THREADS);
-// }
-
-// public void testN() throws Exception
-// {
-// runTestMultipleThreads(new RunnableT()
-// {
-// public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
-// {
-// doTestN(sf, threadNum);
-// }
-// }, NUM_THREADS);
-// }
+ // public void testM() throws Exception
+ // {
+ // runTestMultipleThreads(new RunnableT()
+ // {
+ // public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
+ // {
+ // doTestM(sf, threadNum);
+ // }
+ // }, NUM_THREADS);
+ // }
+
+ public void testN() throws Exception
+ {
+ runTestMultipleThreads(new RunnableT()
+ {
+ public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
+ {
+ doTestN(sf, threadNum);
+ }
+ }, NUM_THREADS);
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
@@ -370,7 +370,7 @@
boolean ok = handler.latch.await(10000, TimeUnit.MILLISECONDS);
assertTrue(ok);
-
+
if (handler.failure != null)
{
throw new Exception("Handler failed: " + handler.failure);
@@ -457,7 +457,7 @@
boolean ok = handler.latch.await(10000, TimeUnit.MILLISECONDS);
assertTrue(ok);
-
+
if (handler.failure != null)
{
throw new Exception("Handler failed: " + handler.failure);
@@ -562,7 +562,7 @@
boolean ok = handler.latch.await(10000, TimeUnit.MILLISECONDS);
assertTrue(ok);
-
+
if (handler.failure != null)
{
throw new Exception("Handler failed: " + handler.failure);
@@ -1001,123 +1001,123 @@
s.close();
}
-
- //Browsers
- //FIXME - this test won't work until we use a proper iterator for browsing a queue.
- //Making a copy of the queue for a browser consumer doesn't work well with replication since
- //When replicating the create consumer (browser) to the backup, when executed on the backup the
- //backup may have different messages in its queue since been added on different threads.
- //So when replicating deliveries they may not be found.
- //https://jira.jboss.org/jira/browse/JBMESSAGING-1433
-// protected void doTestM(final ClientSessionFactory sf, final int threadNum) throws Exception
-// {
-// long start = System.currentTimeMillis();
-//
-// ClientSession sessSend = sf.createSession(false, true, true, false);
-//
-// ClientSession sessConsume = sf.createSession(false, true, true, false);
-//
-// sessConsume.createQueue(ADDRESS, new SimpleString(threadNum + "sub"), null, false, false);
-//
-// final int numMessages = 100;
-//
-// ClientProducer producer = sessSend.createProducer(ADDRESS);
-//
-// sendMessages(sessSend, producer, numMessages, threadNum);
-//
-// ClientConsumer browser = sessConsume.createConsumer(new SimpleString(threadNum + "sub"),
-// null, false, true);
-//
-// Map<Integer, Integer> consumerCounts = new HashMap<Integer, Integer>();
-//
-// for (int i = 0; i < numMessages; i++)
-// {
-// ClientMessage msg = browser.receive(RECEIVE_TIMEOUT);
-//
-// assertNotNull(msg);
-//
-// int tn = (Integer)msg.getProperty(new SimpleString("threadnum"));
-// int cnt = (Integer)msg.getProperty(new SimpleString("count"));
-//
-// Integer c = consumerCounts.get(tn);
-// if (c == null)
-// {
-// c = new Integer(cnt);
-// }
-//
-// if (cnt != c.intValue())
-// {
-// throw new Exception("Invalid count, expected " + c + " got " + cnt);
-// }
-//
-// c++;
-//
-// //Wrap
-// if (c == numMessages)
-// {
-// c = 0;
-// }
-//
-// consumerCounts.put(tn, c);
-//
-// msg.acknowledge();
-// }
-//
-// sessConsume.close();
-//
-// sessConsume = sf.createSession(false, true, true, false);
-//
-// browser = sessConsume.createConsumer(new SimpleString(threadNum + "sub"),
-// null, false, true);
-//
-// //Messages should still be there
-//
-// consumerCounts.clear();
-//
-// for (int i = 0; i < numMessages; i++)
-// {
-// ClientMessage msg = browser.receive(RECEIVE_TIMEOUT);
-//
-// assertNotNull(msg);
-//
-// int tn = (Integer)msg.getProperty(new SimpleString("threadnum"));
-// int cnt = (Integer)msg.getProperty(new SimpleString("count"));
-//
-// Integer c = consumerCounts.get(tn);
-// if (c == null)
-// {
-// c = new Integer(cnt);
-// }
-//
-// if (cnt != c.intValue())
-// {
-// throw new Exception("Invalid count, expected " + c + " got " + cnt);
-// }
-//
-// c++;
-//
-// //Wrap
-// if (c == numMessages)
-// {
-// c = 0;
-// }
-//
-// consumerCounts.put(tn, c);
-//
-// msg.acknowledge();
-// }
-//
-// sessConsume.close();
-//
-// sessSend.deleteQueue(new SimpleString(threadNum + "sub"));
-//
-// sessSend.close();
-//
-// long end = System.currentTimeMillis();
-//
-// log.info("duration " + (end - start));
-// }
-
+
+ // Browsers
+ // FIXME - this test won't work until we use a proper iterator for browsing a queue.
+ // Making a copy of the queue for a browser consumer doesn't work well with replication since
+ // When replicating the create consumer (browser) to the backup, when executed on the backup the
+ // backup may have different messages in its queue since been added on different threads.
+ // So when replicating deliveries they may not be found.
+ // https://jira.jboss.org/jira/browse/JBMESSAGING-1433
+ // protected void doTestM(final ClientSessionFactory sf, final int threadNum) throws Exception
+ // {
+ // long start = System.currentTimeMillis();
+ //
+ // ClientSession sessSend = sf.createSession(false, true, true, false);
+ //
+ // ClientSession sessConsume = sf.createSession(false, true, true, false);
+ //
+ // sessConsume.createQueue(ADDRESS, new SimpleString(threadNum + "sub"), null, false, false);
+ //
+ // final int numMessages = 100;
+ //
+ // ClientProducer producer = sessSend.createProducer(ADDRESS);
+ //
+ // sendMessages(sessSend, producer, numMessages, threadNum);
+ //
+ // ClientConsumer browser = sessConsume.createConsumer(new SimpleString(threadNum + "sub"),
+ // null, false, true);
+ //
+ // Map<Integer, Integer> consumerCounts = new HashMap<Integer, Integer>();
+ //
+ // for (int i = 0; i < numMessages; i++)
+ // {
+ // ClientMessage msg = browser.receive(RECEIVE_TIMEOUT);
+ //
+ // assertNotNull(msg);
+ //
+ // int tn = (Integer)msg.getProperty(new SimpleString("threadnum"));
+ // int cnt = (Integer)msg.getProperty(new SimpleString("count"));
+ //
+ // Integer c = consumerCounts.get(tn);
+ // if (c == null)
+ // {
+ // c = new Integer(cnt);
+ // }
+ //
+ // if (cnt != c.intValue())
+ // {
+ // throw new Exception("Invalid count, expected " + c + " got " + cnt);
+ // }
+ //
+ // c++;
+ //
+ // //Wrap
+ // if (c == numMessages)
+ // {
+ // c = 0;
+ // }
+ //
+ // consumerCounts.put(tn, c);
+ //
+ // msg.acknowledge();
+ // }
+ //
+ // sessConsume.close();
+ //
+ // sessConsume = sf.createSession(false, true, true, false);
+ //
+ // browser = sessConsume.createConsumer(new SimpleString(threadNum + "sub"),
+ // null, false, true);
+ //
+ // //Messages should still be there
+ //
+ // consumerCounts.clear();
+ //
+ // for (int i = 0; i < numMessages; i++)
+ // {
+ // ClientMessage msg = browser.receive(RECEIVE_TIMEOUT);
+ //
+ // assertNotNull(msg);
+ //
+ // int tn = (Integer)msg.getProperty(new SimpleString("threadnum"));
+ // int cnt = (Integer)msg.getProperty(new SimpleString("count"));
+ //
+ // Integer c = consumerCounts.get(tn);
+ // if (c == null)
+ // {
+ // c = new Integer(cnt);
+ // }
+ //
+ // if (cnt != c.intValue())
+ // {
+ // throw new Exception("Invalid count, expected " + c + " got " + cnt);
+ // }
+ //
+ // c++;
+ //
+ // //Wrap
+ // if (c == numMessages)
+ // {
+ // c = 0;
+ // }
+ //
+ // consumerCounts.put(tn, c);
+ //
+ // msg.acknowledge();
+ // }
+ //
+ // sessConsume.close();
+ //
+ // sessSend.deleteQueue(new SimpleString(threadNum + "sub"));
+ //
+ // sessSend.close();
+ //
+ // long end = System.currentTimeMillis();
+ //
+ // log.info("duration " + (end - start));
+ // }
+
protected void doTestN(final ClientSessionFactory sf, final int threadNum) throws Exception
{
ClientSession sessCreate = sf.createSession(false, true, true, false);
@@ -1125,11 +1125,11 @@
sessCreate.createQueue(ADDRESS, new SimpleString(threadNum + ADDRESS.toString()), null, false, false);
ClientSession sess = sf.createSession(false, true, true, false);
-
+
sess.stop();
sess.start();
-
+
sess.stop();
ClientConsumer consumer = sess.createConsumer(new SimpleString(threadNum + ADDRESS.toString()));
@@ -1144,7 +1144,7 @@
message.getBody().flip();
producer.send(message);
-
+
sess.start();
ClientMessage message2 = consumer.receive(RECEIVE_TIMEOUT);
@@ -1152,9 +1152,9 @@
assertNotNull(message2);
message2.acknowledge();
-
+
sess.stop();
-
+
sess.start();
sess.close();
@@ -1166,7 +1166,7 @@
protected int getNumIterations()
{
- return 1000;
+ return 10;
}
protected void setUp() throws Exception
@@ -1182,11 +1182,11 @@
{
log.info("************* Ending test " + this.getName());
- if(liveService != null && liveService.isStarted())
+ if (liveService != null && liveService.isStarted())
{
liveService.stop();
}
- if(backupService != null && backupService.isStarted())
+ if (backupService != null && backupService.isStarted())
{
backupService.stop();
}
@@ -1351,7 +1351,7 @@
private void consumeMessages(final Set<ClientConsumer> consumers, final int numMessages) throws Exception
{
- //We make sure the messages arrive in the order they were sent from a particular producer
+ // We make sure the messages arrive in the order they were sent from a particular producer
Map<ClientConsumer, Map<Integer, Integer>> counts = new HashMap<ClientConsumer, Map<Integer, Integer>>();
for (int i = 0; i < numMessages; i++)
@@ -1383,15 +1383,15 @@
{
throw new Exception("Invalid count, expected " + c + " got " + cnt);
}
-
+
c++;
-
- //Wrap
+
+ // Wrap
if (c == numMessages)
{
c = 0;
}
-
+
consumerCounts.put(tn, c);
msg.acknowledge();
@@ -1469,15 +1469,15 @@
volatile String failure;
final int tn;
-
+
final int numMessages;
-
+
volatile boolean done;
MyHandler(final int threadNum, final int numMessages)
{
this.tn = threadNum;
-
+
this.numMessages = numMessages;
}
@@ -1491,7 +1491,7 @@
{
log.error("Failed to process", me);
}
-
+
if (done)
{
return;
@@ -1505,9 +1505,9 @@
{
c = new Integer(cnt);
}
-
- //log.info("got message " + threadNum + "-" + cnt);
+ // log.info("got message " + threadNum + "-" + cnt);
+
if (cnt != c.intValue())
{
failure = "Invalid count, expected " + c + " got " + cnt;
@@ -1515,7 +1515,7 @@
latch.countDown();
}
-
+
if (tn == threadNum && c == numMessages - 1)
{
done = true;
@@ -1523,15 +1523,14 @@
}
c++;
- //Wrap around at 100
+ // Wrap around at 100
if (c == 100)
{
c = 0;
}
-
+
counts.put(threadNum, c);
-
}
}
}
Modified: trunk/tests/src/org/jboss/messaging/tests/performance/persistence/FakePostOffice.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/performance/persistence/FakePostOffice.java 2008-10-21 09:53:33 UTC (rev 5161)
+++ trunk/tests/src/org/jboss/messaging/tests/performance/persistence/FakePostOffice.java 2008-10-21 10:39:09 UTC (rev 5162)
@@ -76,7 +76,7 @@
return addresses.contains(address);
}
- public Binding getBinding(SimpleString queueName) throws Exception
+ public Binding getBinding(SimpleString queueName)
{
return bindings.get(queueName);
}
More information about the jboss-cvs-commits
mailing list