[jboss-cvs] JBoss Messaging SVN: r5060 - in trunk: src/main/org/jboss/messaging/core/persistence/impl/nullpm and 13 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Oct 2 09:43:49 EDT 2008
Author: timfox
Date: 2008-10-02 09:43:48 -0400 (Thu, 02 Oct 2008)
New Revision: 5060
Added:
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCloseMessage.java
Removed:
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CloseSessionMessage.java
Modified:
trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerInternal.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
trunk/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java
trunk/src/main/org/jboss/messaging/core/postoffice/PostOffice.java
trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/Channel.java
trunk/src/main/org/jboss/messaging/core/remoting/Packet.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CreateSessionMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CreateSessionResponseMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/NullResponseMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketsConfirmedMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/Ping.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/Pong.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ReattachSessionMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ReattachSessionResponseMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionAddDestinationMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateQueueMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionReceiveMessage.java
trunk/src/main/org/jboss/messaging/core/server/MessagingServer.java
trunk/src/main/org/jboss/messaging/core/server/Queue.java
trunk/src/main/org/jboss/messaging/core/server/ServerConsumer.java
trunk/src/main/org/jboss/messaging/core/server/ServerSession.java
trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java
trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
trunk/src/main/org/jboss/messaging/jms/client/JMSMessageListenerWrapper.java
trunk/src/main/org/jboss/messaging/util/JBMThreadFactory.java
trunk/src/main/org/jboss/messaging/util/OrderedExecutorFactory.java
trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/SimpleAutomaticFailoverTest.java
trunk/tests/src/org/jboss/messaging/tests/performance/persistence/FakePostOffice.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/postoffice/impl/PostOfficeImplTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/postoffice/impl/PostOfficeImplWildcardManagerTest.java
Log:
More failover and session replication
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java 2008-10-02 08:02:47 UTC (rev 5059)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java 2008-10-02 13:43:48 UTC (rev 5060)
@@ -13,6 +13,7 @@
package org.jboss.messaging.core.client.impl;
import java.util.LinkedList;
+import java.util.Queue;
import java.util.concurrent.Executor;
import org.jboss.messaging.core.client.ClientMessage;
@@ -55,11 +56,9 @@
private final Executor sessionExecutor;
private final int clientWindowSize;
+
+ private final Queue<ClientMessage> buffer = new LinkedList<ClientMessage>();
- // private final PriorityLinkedList<ClientMessage> buffer = new PriorityLinkedListImpl<ClientMessage>(10);
-
- private final LinkedList<ClientMessage> buffer = new LinkedList<ClientMessage>();
-
private final boolean direct;
private final Runner runner = new Runner();
@@ -75,6 +74,10 @@
private volatile int creditsToSend;
private volatile boolean cleared;
+
+ private volatile long lastMessageIDProcessed = -1;
+
+ private volatile long ignoreMessageID = -1;
// Constructors
// ---------------------------------------------------------------------------------
@@ -148,7 +151,9 @@
if (!closed && !buffer.isEmpty())
{
- ClientMessage m = buffer.removeFirst();
+ ClientMessage m = buffer.poll();
+
+ lastMessageIDProcessed = m.getMessageID();
boolean expired = m.isExpired();
@@ -257,7 +262,7 @@
{
return id;
}
-
+
public void handleMessage(final ClientMessage message) throws Exception
{
if (closed)
@@ -273,6 +278,12 @@
return;
}
+ if (message.getMessageID() == ignoreMessageID)
+ {
+ //Ignore this - this is one resent after failover since was processing in onMessage
+ return;
+ }
+
message.onReceipt(session, id);
if (handler != null)
@@ -300,7 +311,7 @@
synchronized (this)
{
- buffer.addLast(message);
+ buffer.add(message);
}
queueExecutor();
@@ -311,7 +322,7 @@
// Add it to the buffer
synchronized (this)
{
- buffer.addLast(message);
+ buffer.add(message);
notify();
}
@@ -344,18 +355,27 @@
{
return creditsToSend;
}
+
+ public void failover()
+ {
+ // We ignore any message that might be resent on redelivery after failover due to it being
+ // in onMessage and processed not having been called yet
+ ignoreMessageID = lastMessageIDProcessed;
+
+ buffer.clear();
+ }
// Public
// ---------------------------------------------------------------------------------------
// Package protected
- // ----------------------------------------------------------------------------
+ // ---------------------------------------------------------------------------------------
// Protected
- // ------------------------------------------------------------------------------------
+ // ---------------------------------------------------------------------------------------
// Private
- // --------------------------------------------------------------------------------------
+ // ---------------------------------------------------------------------------------------
private void queueExecutor()
{
@@ -409,7 +429,7 @@
throw new MessagingException(MessagingException.OBJECT_CLOSED, "Consumer is closed");
}
}
-
+
private void callOnMessage()
{
try
@@ -427,7 +447,7 @@
synchronized (this)
{
- message = buffer.removeFirst();
+ message = buffer.poll();
}
if (message != null)
@@ -439,6 +459,8 @@
if (!expired)
{
onMessageThread = Thread.currentThread();
+
+ lastMessageIDProcessed = message.getMessageID();
handler.onMessage(message);
}
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerInternal.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerInternal.java 2008-10-02 08:02:47 UTC (rev 5059)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerInternal.java 2008-10-02 13:43:48 UTC (rev 5060)
@@ -49,4 +49,6 @@
int getCreditsToSend();
void cleanUp() throws Exception;
+
+ void failover();
}
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2008-10-02 08:02:47 UTC (rev 5059)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2008-10-02 13:43:48 UTC (rev 5060)
@@ -1,33 +1,94 @@
/*
- * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors by
- * the @authors tag. See the copyright.txt in the distribution for a full listing of individual contributors. This is
- * free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of the License, or (at your option) any later version.
- * This software is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied
- * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
- * details. You should have received a copy of the GNU Lesser General Public License along with this software; if not,
- * write to the Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
- * site: http://www.fsf.org.
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
package org.jboss.messaging.core.client.impl;
-import org.jboss.messaging.core.client.*;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
+import org.jboss.messaging.core.client.ClientBrowser;
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSessionFactory;
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.remoting.*;
+import org.jboss.messaging.core.remoting.Channel;
+import org.jboss.messaging.core.remoting.ConnectionRegistry;
+import org.jboss.messaging.core.remoting.FailureListener;
+import org.jboss.messaging.core.remoting.Packet;
+import org.jboss.messaging.core.remoting.RemotingConnection;
+import org.jboss.messaging.core.remoting.ResponseNotifier;
import org.jboss.messaging.core.remoting.impl.ConnectionRegistryImpl;
-import org.jboss.messaging.core.remoting.impl.wireformat.*;
+import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
+import org.jboss.messaging.core.remoting.impl.wireformat.ReattachSessionMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.ReattachSessionResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionAddDestinationMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionBindingQueryMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionBindingQueryResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionCloseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerFlowCreditMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateBrowserMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateConsumerMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateConsumerResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateProducerMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateProducerResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateQueueMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionDeleteQueueMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionProcessedMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionRemoveDestinationMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXACommitMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAEndMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAForgetMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAGetInDoubtXidsResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAGetTimeoutResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAJoinMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAPrepareMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAResumeMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXARollbackMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXASetTimeoutMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXASetTimeoutResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAStartMessage;
import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
-import org.jboss.messaging.util.*;
+import org.jboss.messaging.util.ExecutorFactory;
+import org.jboss.messaging.util.IDGenerator;
+import org.jboss.messaging.util.JBMThreadFactory;
+import org.jboss.messaging.util.OrderedExecutorFactory;
+import org.jboss.messaging.util.SimpleIDGenerator;
+import org.jboss.messaging.util.SimpleString;
+import org.jboss.messaging.util.TokenBucketLimiterImpl;
-import javax.transaction.xa.XAException;
-import javax.transaction.xa.XAResource;
-import javax.transaction.xa.Xid;
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
-
/*
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
*
@@ -101,11 +162,11 @@
private final IDGenerator idGenerator = new SimpleIDGenerator(0);
- // Constructors ----------------------------------------------------------------------------
+ // Constructors ----------------------------------------------------------------------------
public ClientSessionImpl(final ClientSessionFactoryInternal sessionFactory,
final String name,
- final boolean xa,
+ final boolean xa,
final boolean cacheProducers,
final boolean autoCommitSends,
final boolean autoCommitAcks,
@@ -410,27 +471,26 @@
{
checkClosed();
- //We need to make sure we don't get any inflight messages
+ // We need to make sure we don't get any inflight messages
for (ClientConsumerInternal consumer : consumers.values())
{
- consumer.clear();
+ consumer.clear();
}
-
- channel.sendBlocking(new PacketImpl(PacketImpl.SESS_ROLLBACK),
- new ResponseNotifier()
+
+ channel.sendBlocking(new PacketImpl(PacketImpl.SESS_ROLLBACK), new ResponseNotifier()
{
public void onResponseReceived()
{
- //This needs to be called on before the blocking thread is awoken
- //hence the ResponseNotifier
+ // This needs to be called on before the blocking thread is awoken
+ // hence the ResponseNotifier
for (ClientConsumerInternal consumer : consumers.values())
{
consumer.resume();
}
}
- });
+ });
}
-
+
public synchronized void close() throws MessagingException
{
if (closed)
@@ -444,9 +504,7 @@
{
closeChildren();
- Channel channel1 = remotingConnection.getChannel(1, false, -1, true);
-
- channel1.sendBlocking(new CloseSessionMessage(name));
+ channel.sendBlocking(new SessionCloseMessage());
}
catch (Throwable ignore)
{
@@ -547,7 +605,7 @@
{
return name;
}
-
+
public void processed(final long consumerID, final long messageID) throws MessagingException
{
checkClosed();
@@ -561,7 +619,7 @@
else
{
channel.send(message);
- }
+ }
}
public void addConsumer(final ClientConsumerInternal consumer)
@@ -661,14 +719,18 @@
public void handleFailover(final RemotingConnection backupConnection)
{
// We lock the channel to prevent any packets to be added to the resend
- // cache
- // during the failover process
+ // cache during the failover process
channel.lock();
try
- {
+ {
channel.transferConnection(backupConnection);
-
+
+ for (ClientConsumerInternal consumer: consumers.values())
+ {
+ consumer.failover();
+ }
+
backupConnection.syncIDGeneratorSequence(remotingConnection.getIDGeneratorSequence());
remotingConnection = backupConnection;
@@ -874,29 +936,33 @@
{
checkXA();
- //We need to make sure we don't get any inflight messages
+ // We need to make sure we don't get any inflight messages
for (ClientConsumerInternal consumer : consumers.values())
{
- consumer.clear();
+ consumer.clear();
}
-
+
SessionXARollbackMessage packet = new SessionXARollbackMessage(xid);
try
{
SessionXAResponseMessage response = (SessionXAResponseMessage)channel.sendBlocking(packet,
- new ResponseNotifier()
- {
- public void onResponseReceived()
- {
- //This needs to be called on before the blocking thread is awoken
- //hence the ResponseNotifier
- for (ClientConsumerInternal consumer : consumers.values())
- {
- consumer.resume();
- }
- }
- });
+ new ResponseNotifier()
+ {
+ public void onResponseReceived()
+ {
+ // This needs to be
+ // called on before
+ // the blocking
+ // thread is awoken
+ // hence the
+ // ResponseNotifier
+ for (ClientConsumerInternal consumer : consumers.values())
+ {
+ consumer.resume();
+ }
+ }
+ });
if (response.isError())
{
@@ -969,7 +1035,7 @@
}
// Public
- //----------------------------------------------------------------------------
+ // ----------------------------------------------------------------------------
public void setForceNotSameRM(final boolean force)
{
@@ -987,13 +1053,13 @@
}
// Protected
- //----------------------------------------------------------------------------
+ // ----------------------------------------------------------------------------
// Package Private
- //----------------------------------------------------------------------------
+ // ----------------------------------------------------------------------------
// Private
- //----------------------------------------------------------------------------
+ // ----------------------------------------------------------------------------
private void checkXA() throws XAException
{
@@ -1067,7 +1133,7 @@
producerCache.clear();
}
- channel.close();
+ channel.close(false);
connectionRegistry.returnConnection(remotingConnection.getID());
Modified: trunk/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java 2008-10-02 08:02:47 UTC (rev 5059)
+++ trunk/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java 2008-10-02 13:43:48 UTC (rev 5060)
@@ -22,6 +22,12 @@
package org.jboss.messaging.core.persistence.impl.nullpm;
+import java.util.List;
+import java.util.Map;
+
+import javax.transaction.xa.Xid;
+
+import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.paging.LastPageRecord;
import org.jboss.messaging.core.paging.PageTransactionInfo;
import org.jboss.messaging.core.persistence.StorageManager;
@@ -32,13 +38,10 @@
import org.jboss.messaging.core.server.QueueFactory;
import org.jboss.messaging.core.server.ServerMessage;
import org.jboss.messaging.core.transaction.ResourceManager;
+import org.jboss.messaging.util.IDGenerator;
import org.jboss.messaging.util.SimpleString;
+import org.jboss.messaging.util.TimeAndCounterIDGenerator;
-import javax.transaction.xa.Xid;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
-
/**
*
* A NullStorageManager
@@ -49,10 +52,13 @@
*/
public class NullStorageManager implements StorageManager
{
- private final AtomicLong messageIDSequence = new AtomicLong(0);
-
- private final AtomicLong transactionIDSequence = new AtomicLong(0);
-
+ private static final Logger log = Logger.getLogger(NullStorageManager.class);
+
+
+ //FIXME - these need to use id generators from 1.4 null storage manager since is not unique across
+ //cluster
+ private final IDGenerator idGenerator = new TimeAndCounterIDGenerator();
+
private volatile boolean started;
public void addBinding(Binding binding) throws Exception
@@ -136,22 +142,9 @@
public long generateUniqueID()
{
//FIXME - this needs to use Howard's ID generator from JBM 1.4
- return messageIDSequence.getAndIncrement();
+ return idGenerator.generateID();
}
- public synchronized void setMaxID(final long id)
- {
- if (1 + id > messageIDSequence.get())
- {
- messageIDSequence.set(id + 1);
- }
- }
-
- public long generateTransactionID()
- {
- return transactionIDSequence.getAndIncrement();
- }
-
public synchronized void start() throws Exception
{
if (started)
Modified: trunk/src/main/org/jboss/messaging/core/postoffice/PostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/PostOffice.java 2008-10-02 08:02:47 UTC (rev 5059)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/PostOffice.java 2008-10-02 13:43:48 UTC (rev 5060)
@@ -83,7 +83,7 @@
Set<SimpleString> listAllDestinations();
- void setBackup(boolean backup);
+ void activate();
PagingManager getPagingManager();
}
Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java 2008-10-02 08:02:47 UTC (rev 5059)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java 2008-10-02 13:43:48 UTC (rev 5060)
@@ -87,7 +87,8 @@
final ManagementService managementService,
final boolean checkAllowable,
final ResourceManager resourceManager,
- final boolean enableWildCardRouting)
+ final boolean enableWildCardRouting,
+ final boolean backup)
{
this.storageManager = storageManager;
@@ -109,6 +110,8 @@
{
addressManager = new SimpleAddressManager();
}
+
+ this.backup = backup;
}
// MessagingComponent implementation ---------------------------------------
@@ -195,7 +198,8 @@
public Binding addBinding(final SimpleString address,
final SimpleString queueName,
final Filter filter,
- final boolean durable, boolean temporary) throws Exception
+ final boolean durable,
+ boolean temporary) throws Exception
{
Binding binding = createBinding(address, queueName, filter, durable, temporary);
@@ -289,29 +293,6 @@
}
- // public void routeFromCluster(final String address, final Message message)
- // throws Exception
- // {
- // List<Binding> bindings = mappings.get(address);
- //
- // for (Binding binding: bindings)
- // {
- // Queue queue = binding.getQueue();
- //
- // if (binding.getNodeID() == nodeID)
- // {
- // if (queue.getFilter() == null || queue.getFilter().match(message))
- // {
- // MessageReference ref = message.createReference(queue);
- //
- // //We never route durably from other nodes - so no need to persist
- //
- // queue.addLast(ref);
- // }
- // }
- // }
- // }
-
public PagingManager getPagingManager()
{
return pagingManager;
@@ -327,19 +308,16 @@
return flowControllers.get(address);
}
- public void setBackup(final boolean backup)
+ public void activate()
{
- if (this.backup != backup)
- {
- this.backup = backup;
+ this.backup = false;
+
+ Map<SimpleString, Binding> nameMap = addressManager.getBindings();
- Map<SimpleString, Binding> nameMap = addressManager.getBindings();
-
- for (Binding binding : nameMap.values())
- {
- binding.getQueue().setBackup(backup);
- }
- }
+ for (Binding binding : nameMap.values())
+ {
+ binding.getQueue().activate();
+ }
}
// Private -----------------------------------------------------------------
@@ -347,11 +325,15 @@
private Binding createBinding(final SimpleString address,
final SimpleString name,
final Filter filter,
- final boolean durable, final boolean temporary) throws Exception
+ final boolean durable,
+ final boolean temporary) throws Exception
{
Queue queue = queueFactory.createQueue(-1, name, filter, durable, false);
- queue.setBackup(backup);
+ if (backup)
+ {
+ queue.setBackup();
+ }
Binding binding = new BindingImpl(address, queue);
Modified: trunk/src/main/org/jboss/messaging/core/remoting/Channel.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/Channel.java 2008-10-02 08:02:47 UTC (rev 5059)
+++ trunk/src/main/org/jboss/messaging/core/remoting/Channel.java 2008-10-02 13:43:48 UTC (rev 5060)
@@ -11,6 +11,8 @@
*/
package org.jboss.messaging.core.remoting;
+import java.util.concurrent.Executor;
+
import org.jboss.messaging.core.exception.MessagingException;
/**
@@ -28,11 +30,11 @@
Packet sendBlocking(Packet packet, ResponseNotifier notifier) throws MessagingException;
- void replicatePacket(Packet packet);
-
+ void replicatePacket(Packet packet) throws MessagingException;
+
void setHandler(ChannelHandler handler);
- void close();
+ void close(boolean onExecutorThread);
void fail();
@@ -47,4 +49,6 @@
void lock();
void unlock();
+
+ Executor getExecutor();
}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/Packet.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/Packet.java 2008-10-02 08:02:47 UTC (rev 5059)
+++ trunk/src/main/org/jboss/messaging/core/remoting/Packet.java 2008-10-02 13:43:48 UTC (rev 5060)
@@ -44,4 +44,10 @@
void encode(MessagingBuffer buffer);
void decode(MessagingBuffer buffer);
+
+ boolean isRequiresConfirmations();
+
+ boolean isReplicateBlocking();
+
+ boolean isWriteAlways();
}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java 2008-10-02 08:02:47 UTC (rev 5059)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java 2008-10-02 13:43:48 UTC (rev 5060)
@@ -12,11 +12,11 @@
package org.jboss.messaging.core.remoting.impl;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.CLOSE_SESSION;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.CREATESESSION;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.CREATESESSION_RESP;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.EXCEPTION;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.NULL_RESPONSE;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.PACKETS_CONFIRMED;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.PING;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.PONG;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REATTACH_SESSION;
@@ -29,6 +29,7 @@
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_BROWSER_HASNEXTMESSAGE_RESP;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_BROWSER_NEXTMESSAGE;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_BROWSER_RESET;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CLOSE;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_COMMIT;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CONSUMER_CLOSE;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CREATEBROWSER;
@@ -40,14 +41,12 @@
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_DELETE_QUEUE;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_FLOWTOKEN;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_MANAGEMENT_SEND;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_PACKETS_CONFIRMED;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_PROCESSED;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_PRODUCER_CLOSE;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_QUEUEQUERY;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_QUEUEQUERY_RESP;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_RECEIVETOKENS;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_RECEIVE_MSG;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_RECOVER;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_REMOVE_DESTINATION;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_ROLLBACK;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_SEND;
@@ -94,7 +93,6 @@
import org.jboss.messaging.core.remoting.Packet;
import org.jboss.messaging.core.remoting.RemotingConnection;
import org.jboss.messaging.core.remoting.ResponseNotifier;
-import org.jboss.messaging.core.remoting.impl.wireformat.CloseSessionMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.CreateSessionMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.CreateSessionResponseMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
@@ -114,6 +112,7 @@
import org.jboss.messaging.core.remoting.impl.wireformat.SessionBrowserHasNextMessageResponseMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionBrowserNextMessageMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionBrowserResetMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionCloseMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerCloseMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerFlowCreditMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateBrowserMessage;
@@ -282,7 +281,7 @@
pinger = null;
}
}
-
+
// RemotingConnection implementation
// ------------------------------------------------------------
@@ -480,7 +479,7 @@
future.cancel(false);
}
- pingChannel.close();
+ pingChannel.close(false);
destroyed = true;
@@ -494,7 +493,7 @@
for (Channel channel : channels.values())
{
- channel.close();
+ channel.close(false);
}
}
@@ -530,6 +529,11 @@
packet = new MessagingExceptionMessage();
break;
}
+ case PACKETS_CONFIRMED:
+ {
+ packet = new PacketsConfirmedMessage();
+ break;
+ }
case CREATESESSION:
{
packet = new CreateSessionMessage();
@@ -550,9 +554,9 @@
packet = new ReattachSessionResponseMessage();
break;
}
- case CLOSE_SESSION:
+ case SESS_CLOSE:
{
- packet = new CloseSessionMessage();
+ packet = new SessionCloseMessage();
break;
}
case SESS_CREATECONSUMER:
@@ -585,11 +589,6 @@
packet = new SessionProcessedMessage();
break;
}
- case SESS_RECOVER:
- {
- packet = new PacketImpl(PacketImpl.SESS_RECOVER);
- break;
- }
case SESS_COMMIT:
{
packet = new PacketImpl(PacketImpl.SESS_COMMIT);
@@ -775,11 +774,6 @@
packet = new SessionReceiveMessage();
break;
}
- case SESS_PACKETS_CONFIRMED:
- {
- packet = new PacketsConfirmedMessage();
- break;
- }
case SESS_CONSUMER_CLOSE:
{
packet = new SessionConsumerCloseMessage();
@@ -797,7 +791,7 @@
}
case NULL_RESPONSE:
{
- packet = new NullResponseMessage();
+ packet = new NullResponseMessage(false);
break;
}
case SESS_MANAGEMENT_SEND:
@@ -929,8 +923,7 @@
final byte packetType = packet.getType();
- if (connection.writePackets || packetType == SESS_PACKETS_CONFIRMED ||
- packetType == PONG)
+ if (connection.writePackets || packet.isWriteAlways())
{
connection.doWrite(packet);
}
@@ -942,6 +935,11 @@
private Thread blockThread;
private ResponseNotifier responseNotifier;
+
+ public Executor getExecutor()
+ {
+ return executor;
+ }
// This must never called by more than one thread concurrently
public Packet sendBlocking(final Packet packet) throws MessagingException
@@ -1039,36 +1037,59 @@
}
}
- public void replicatePacket(final Packet packet)
+ public void replicatePacket(final Packet packet) throws MessagingException
{
if (replicatingChannel != null)
{
- replicatingChannel.send(packet);
+ if (packet.isReplicateBlocking())
+ {
+ replicatingChannel.sendBlocking(packet);
+ }
+ else
+ {
+ replicatingChannel.send(packet);
+ }
}
}
+
+ public void replicatePacketBlocking(final Packet packet) throws MessagingException
+ {
+ if (replicatingChannel != null)
+ {
+ replicatingChannel.sendBlocking(packet);
+ }
+ }
public void setHandler(final ChannelHandler handler)
{
this.handler = handler;
}
- public void close()
+ public void close(boolean onExecutorThread)
{
if (closed)
{
return;
}
- if (!connection.destroyed && connection.channels.remove(id) == null)
+ synchronized (connection)
+ {
+ if (!connection.destroyed && connection.channels.remove(id) == null)
+ {
+ throw new IllegalArgumentException("Cannot find channel with id " + id + " to close");
+ }
+ }
+
+ if (!onExecutorThread)
{
- throw new IllegalArgumentException("Cannot find channel with id " + id + " to close");
+ waitForExecutorToComplete();
}
- waitForExecutorToComplete();
-
if (replicatingChannel != null)
{
- replicatingChannel.close();
+ replicatingChannel.close(false);
+
+ replicatingChannel = null;
}
closed = true;
@@ -1158,7 +1179,7 @@
private void handlePacket(final Packet packet)
{
- if (packet.getType() == PacketImpl.SESS_PACKETS_CONFIRMED)
+ if (packet.getType() == PACKETS_CONFIRMED)
{
if (resendCache != null)
{
@@ -1187,7 +1208,7 @@
}
}
else if (replicatingChannel != null)
- {
+ {
replicatingChannel.send(packet);
}
else
@@ -1229,13 +1250,6 @@
checkConfirmation(packet);
- // Shouldn't get responses back on replicating connections - since should never be written
-
- if (connection.replicating)
- {
- throw new IllegalStateException("Got response back on replicating connection " + packet.getType());
- }
-
if (responseNotifier != null)
{
responseNotifier.onResponseReceived();
@@ -1247,10 +1261,10 @@
else if (handler != null)
{
if (executor == null)
- {
+ {
checkConfirmation(packet);
-
- handler.handlePacket(packet);
+
+ handler.handlePacket(packet);
}
else
{
@@ -1259,9 +1273,9 @@
public void run()
{
try
- {
+ {
checkConfirmation(packet);
-
+
handler.handlePacket(packet);
}
catch (Exception e)
@@ -1281,7 +1295,7 @@
private void checkConfirmation(final Packet packet)
{
- if (resendCache != null)
+ if (resendCache != null && packet.isRequiresConfirmations())
{
lastReceivedCommandID++;
@@ -1330,7 +1344,7 @@
{
public void handlePacket(final Packet packet)
{
- if (packet.getType() == SESS_PACKETS_CONFIRMED)
+ if (packet.getType() == PACKETS_CONFIRMED)
{
// Send it straight back to the client
connection.doWrite(packet);
Deleted: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CloseSessionMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CloseSessionMessage.java 2008-10-02 08:02:47 UTC (rev 5059)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CloseSessionMessage.java 2008-10-02 13:43:48 UTC (rev 5060)
@@ -1,83 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors by
- * the @authors tag. See the copyright.txt in the distribution for a full listing of individual contributors. This is
- * free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of the License, or (at your option) any later version.
- * This software is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied
- * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
- * details. You should have received a copy of the GNU Lesser General Public License along with this software; if not,
- * write to the Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
- * site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.core.remoting.impl.wireformat;
-
-import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
-
-/**
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @version <tt>$Revision$</tt>
- */
-public class CloseSessionMessage extends PacketImpl
-{
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- private String name;
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- public CloseSessionMessage(final String name)
- {
- super(CLOSE_SESSION);
-
- this.name = name;
- }
-
- public CloseSessionMessage()
- {
- super(CLOSE_SESSION);
- }
-
- // Public --------------------------------------------------------
-
- public String getName()
- {
- return name;
- }
-
- @Override
- public void encodeBody(final MessagingBuffer buffer)
- {
- buffer.putString(name);
- }
-
- @Override
- public void decodeBody(final MessagingBuffer buffer)
- {
- name = buffer.getString();
- }
-
- @Override
- public boolean equals(final Object other)
- {
- if (other instanceof CloseSessionMessage == false)
- {
- return false;
- }
-
- CloseSessionMessage r = (CloseSessionMessage)other;
-
- return super.equals(other) && name == r.name;
- }
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CreateSessionMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CreateSessionMessage.java 2008-10-02 08:02:47 UTC (rev 5059)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CreateSessionMessage.java 2008-10-02 13:43:48 UTC (rev 5060)
@@ -172,6 +172,11 @@
return matches;
}
+
+ public final boolean isRequiresConfirmations()
+ {
+ return false;
+ }
// Package protected ---------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CreateSessionResponseMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CreateSessionResponseMessage.java 2008-10-02 08:02:47 UTC (rev 5059)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CreateSessionResponseMessage.java 2008-10-02 13:43:48 UTC (rev 5060)
@@ -101,6 +101,11 @@
return matches;
}
+
+ public final boolean isRequiresConfirmations()
+ {
+ return false;
+ }
// Package protected ---------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/NullResponseMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/NullResponseMessage.java 2008-10-02 08:02:47 UTC (rev 5059)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/NullResponseMessage.java 2008-10-02 13:43:48 UTC (rev 5060)
@@ -22,13 +22,17 @@
// Attributes ----------------------------------------------------
+ private final boolean writeAlways;
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
- public NullResponseMessage()
+ public NullResponseMessage(final boolean writeAlways)
{
super(NULL_RESPONSE);
+
+ this.writeAlways = writeAlways;
}
// Public --------------------------------------------------------
@@ -39,6 +43,11 @@
return true;
}
+ public boolean isWriteAlways()
+ {
+ return writeAlways;
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java 2008-10-02 08:02:47 UTC (rev 5059)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java 2008-10-02 13:43:48 UTC (rev 5060)
@@ -45,18 +45,19 @@
public static final byte EXCEPTION = 20;
public static final byte NULL_RESPONSE = 21;
+
+ public static final byte PACKETS_CONFIRMED = 22;
+
// Server
public static final byte CREATESESSION = 30;
public static final byte CREATESESSION_RESP = 31;
+
+ public static final byte REATTACH_SESSION = 32;
- public static final byte CLOSE_SESSION = 32;
+ public static final byte REATTACH_SESSION_RESP = 33;
- public static final byte REATTACH_SESSION = 33;
-
- public static final byte REATTACH_SESSION_RESP = 34;
-
// Session
public static final byte SESS_CREATECONSUMER = 40;
@@ -72,93 +73,89 @@
public static final byte SESS_PROCESSED = 46;
- public static final byte SESS_RECOVER = 47;
+ public static final byte SESS_COMMIT = 47;
- public static final byte SESS_COMMIT = 48;
-
- public static final byte SESS_ROLLBACK = 49;
+ public static final byte SESS_ROLLBACK = 48;
- public static final byte SESS_QUEUEQUERY = 51;
+ public static final byte SESS_QUEUEQUERY = 49;
- public static final byte SESS_QUEUEQUERY_RESP = 52;
+ public static final byte SESS_QUEUEQUERY_RESP = 50;
- public static final byte SESS_CREATEQUEUE = 53;
+ public static final byte SESS_CREATEQUEUE = 51;
- public static final byte SESS_DELETE_QUEUE = 54;
+ public static final byte SESS_DELETE_QUEUE = 52;
- public static final byte SESS_ADD_DESTINATION = 55;
+ public static final byte SESS_ADD_DESTINATION = 53;
- public static final byte SESS_REMOVE_DESTINATION = 56;
+ public static final byte SESS_REMOVE_DESTINATION = 54;
- public static final byte SESS_BINDINGQUERY = 57;
+ public static final byte SESS_BINDINGQUERY = 55;
- public static final byte SESS_BINDINGQUERY_RESP = 58;
+ public static final byte SESS_BINDINGQUERY_RESP = 56;
- public static final byte SESS_BROWSER_MESSAGE = 59;
+ public static final byte SESS_BROWSER_MESSAGE = 57;
- public static final byte SESS_BROWSER_RESET = 60;
+ public static final byte SESS_BROWSER_RESET = 58;
- public static final byte SESS_BROWSER_HASNEXTMESSAGE = 61;
+ public static final byte SESS_BROWSER_HASNEXTMESSAGE = 59;
- public static final byte SESS_BROWSER_HASNEXTMESSAGE_RESP = 62;
+ public static final byte SESS_BROWSER_HASNEXTMESSAGE_RESP = 60;
- public static final byte SESS_BROWSER_NEXTMESSAGE = 63;
+ public static final byte SESS_BROWSER_NEXTMESSAGE = 61;
- public static final byte SESS_XA_START = 64;
+ public static final byte SESS_XA_START = 62;
- public static final byte SESS_XA_END = 65;
+ public static final byte SESS_XA_END = 63;
- public static final byte SESS_XA_COMMIT = 66;
+ public static final byte SESS_XA_COMMIT = 64;
- public static final byte SESS_XA_PREPARE = 67;
+ public static final byte SESS_XA_PREPARE = 65;
- public static final byte SESS_XA_RESP = 68;
+ public static final byte SESS_XA_RESP = 66;
- public static final byte SESS_XA_ROLLBACK = 69;
+ public static final byte SESS_XA_ROLLBACK = 67;
- public static final byte SESS_XA_JOIN = 70;
+ public static final byte SESS_XA_JOIN = 68;
- public static final byte SESS_XA_SUSPEND = 71;
+ public static final byte SESS_XA_SUSPEND = 69;
- public static final byte SESS_XA_RESUME = 72;
+ public static final byte SESS_XA_RESUME = 70;
- public static final byte SESS_XA_FORGET = 73;
+ public static final byte SESS_XA_FORGET = 71;
- public static final byte SESS_XA_INDOUBT_XIDS = 74;
+ public static final byte SESS_XA_INDOUBT_XIDS = 72;
- public static final byte SESS_XA_INDOUBT_XIDS_RESP = 75;
+ public static final byte SESS_XA_INDOUBT_XIDS_RESP = 73;
- public static final byte SESS_XA_SET_TIMEOUT = 76;
+ public static final byte SESS_XA_SET_TIMEOUT = 74;
- public static final byte SESS_XA_SET_TIMEOUT_RESP = 77;
+ public static final byte SESS_XA_SET_TIMEOUT_RESP = 75;
- public static final byte SESS_XA_GET_TIMEOUT = 78;
+ public static final byte SESS_XA_GET_TIMEOUT = 76;
- public static final byte SESS_XA_GET_TIMEOUT_RESP = 79;
+ public static final byte SESS_XA_GET_TIMEOUT_RESP = 77;
- public static final byte SESS_START = 80;
+ public static final byte SESS_START = 78;
- public static final byte SESS_STOP = 81;
+ public static final byte SESS_STOP = 79;
+
+ public static final byte SESS_CLOSE = 80;
- public static final byte SESS_FLOWTOKEN = 82;
+ public static final byte SESS_FLOWTOKEN = 81;
- public static final byte SESS_SEND = 83;
+ public static final byte SESS_SEND = 82;
- public static final byte SESS_RECEIVETOKENS = 84;
+ public static final byte SESS_RECEIVETOKENS = 83;
- public static final byte SESS_CONSUMER_CLOSE = 85;
+ public static final byte SESS_CONSUMER_CLOSE = 84;
- public static final byte SESS_PRODUCER_CLOSE = 86;
+ public static final byte SESS_PRODUCER_CLOSE = 85;
- public static final byte SESS_BROWSER_CLOSE = 87;
+ public static final byte SESS_BROWSER_CLOSE = 86;
- public static final byte SESS_RECEIVE_MSG = 88;
+ public static final byte SESS_RECEIVE_MSG = 87;
- public static final byte SESS_PACKETS_CONFIRMED = 89;
-
- public static final byte SESS_REPLICATE_DELIVERY = 90;
-
- public static final byte SESS_MANAGEMENT_SEND = 94;
+ public static final byte SESS_MANAGEMENT_SEND = 88;
// Static --------------------------------------------------------
@@ -220,6 +217,21 @@
public void decodeBody(final MessagingBuffer buffer)
{
}
+
+ public boolean isRequiresConfirmations()
+ {
+ return true;
+ }
+
+ public boolean isReplicateBlocking()
+ {
+ return false;
+ }
+
+ public boolean isWriteAlways()
+ {
+ return false;
+ }
@Override
public String toString()
@@ -239,7 +251,7 @@
return r.type == type && r.channelID == channelID;
}
-
+
// Package protected ---------------------------------------------
protected String getParentString()
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketsConfirmedMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketsConfirmedMessage.java 2008-10-02 08:02:47 UTC (rev 5059)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketsConfirmedMessage.java 2008-10-02 13:43:48 UTC (rev 5060)
@@ -45,14 +45,14 @@
public PacketsConfirmedMessage(final int commandID)
{
- super(SESS_PACKETS_CONFIRMED);
+ super(PACKETS_CONFIRMED);
this.commandID = commandID;
}
public PacketsConfirmedMessage()
{
- super(SESS_PACKETS_CONFIRMED);
+ super(PACKETS_CONFIRMED);
}
// Public --------------------------------------------------------
@@ -89,6 +89,17 @@
return super.equals(other) && this.commandID == r.commandID;
}
+
+ public final boolean isRequiresConfirmations()
+ {
+ return false;
+ }
+
+ public boolean isWriteAlways()
+ {
+ return true;
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/Ping.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/Ping.java 2008-10-02 08:02:47 UTC (rev 5059)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/Ping.java 2008-10-02 13:43:48 UTC (rev 5060)
@@ -92,6 +92,11 @@
return super.equals(other) && this.expirePeriod == r.expirePeriod;
}
+
+ public final boolean isRequiresConfirmations()
+ {
+ return false;
+ }
// Package protected ---------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/Pong.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/Pong.java 2008-10-02 08:02:47 UTC (rev 5059)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/Pong.java 2008-10-02 13:43:48 UTC (rev 5060)
@@ -77,6 +77,11 @@
{
newPeriod = buffer.getLong();
}
+
+ public boolean isWriteAlways()
+ {
+ return true;
+ }
@Override
public String toString()
@@ -98,6 +103,11 @@
return super.equals(other) && this.newPeriod == r.newPeriod;
}
+
+ public final boolean isRequiresConfirmations()
+ {
+ return false;
+ }
// Package protected ---------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ReattachSessionMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ReattachSessionMessage.java 2008-10-02 08:02:47 UTC (rev 5059)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ReattachSessionMessage.java 2008-10-02 13:43:48 UTC (rev 5060)
@@ -95,6 +95,11 @@
return super.equals(other) && this.name.equals(r.name) &&
this.lastReceivedCommandID == r.lastReceivedCommandID;
}
+
+ public final boolean isRequiresConfirmations()
+ {
+ return false;
+ }
// Package protected ---------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ReattachSessionResponseMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ReattachSessionResponseMessage.java 2008-10-02 08:02:47 UTC (rev 5059)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ReattachSessionResponseMessage.java 2008-10-02 13:43:48 UTC (rev 5060)
@@ -88,6 +88,11 @@
return super.equals(other) && this.lastReceivedCommandID == r.lastReceivedCommandID;
}
+
+ public final boolean isRequiresConfirmations()
+ {
+ return false;
+ }
// Package protected ---------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionAddDestinationMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionAddDestinationMessage.java 2008-10-02 08:02:47 UTC (rev 5059)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionAddDestinationMessage.java 2008-10-02 13:43:48 UTC (rev 5060)
@@ -96,6 +96,13 @@
temporary = buffer.getBoolean();
}
+ //Needs to be true so we can ensure packet has reached backup before we start sending messages to it from another
+ //session
+ public boolean isReplicateBlocking()
+ {
+ return true;
+ }
+
@Override
public String toString()
{
Copied: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCloseMessage.java (from rev 5046, trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CloseSessionMessage.java)
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCloseMessage.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCloseMessage.java 2008-10-02 13:43:48 UTC (rev 5060)
@@ -0,0 +1,57 @@
+/*
+ * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors by
+ * the @authors tag. See the copyright.txt in the distribution for a full listing of individual contributors. This is
+ * free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of the License, or (at your option) any later version.
+ * This software is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied
+ * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+ * details. You should have received a copy of the GNU Lesser General Public License along with this software; if not,
+ * write to the Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
+ * site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.remoting.impl.wireformat;
+
+
+/**
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision$</tt>
+ */
+public class SessionCloseMessage extends PacketImpl
+{
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public SessionCloseMessage()
+ {
+ super(SESS_CLOSE);
+ }
+
+ // Public --------------------------------------------------------
+
+ @Override
+ public boolean equals(final Object other)
+ {
+ if (other instanceof SessionCloseMessage == false)
+ {
+ return false;
+ }
+
+ SessionCloseMessage r = (SessionCloseMessage)other;
+
+ return super.equals(other);
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateQueueMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateQueueMessage.java 2008-10-02 08:02:47 UTC (rev 5059)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionCreateQueueMessage.java 2008-10-02 13:43:48 UTC (rev 5060)
@@ -121,6 +121,13 @@
temporary = buffer.getBoolean();
}
+ //Needs to be true so we can ensure packet has reached backup before we start sending messages to it from another
+ //session
+ public boolean isReplicateBlocking()
+ {
+ return true;
+ }
+
public boolean equals(Object other)
{
if (other instanceof SessionCreateQueueMessage == false)
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionReceiveMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionReceiveMessage.java 2008-10-02 08:02:47 UTC (rev 5059)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionReceiveMessage.java 2008-10-02 13:43:48 UTC (rev 5060)
@@ -116,6 +116,11 @@
clientMessage.getBody().flip();
}
+
+ public final boolean isRequiresConfirmations()
+ {
+ return false;
+ }
// Package protected ---------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/server/MessagingServer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/MessagingServer.java 2008-10-02 08:02:47 UTC (rev 5059)
+++ trunk/src/main/org/jboss/messaging/core/server/MessagingServer.java 2008-10-02 13:43:48 UTC (rev 5060)
@@ -73,7 +73,7 @@
boolean autoCommitAcks,
boolean xa) throws Exception;
- void closeSession(String name) throws Exception;
+ void removeSession(String name) throws Exception;
boolean isStarted();
Modified: trunk/src/main/org/jboss/messaging/core/server/Queue.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/Queue.java 2008-10-02 08:02:47 UTC (rev 5059)
+++ trunk/src/main/org/jboss/messaging/core/server/Queue.java 2008-10-02 13:43:48 UTC (rev 5060)
@@ -24,6 +24,7 @@
import java.util.LinkedList;
import java.util.List;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import org.jboss.messaging.core.filter.Filter;
@@ -110,6 +111,8 @@
MessageReference removeReferenceWithID(long id);
+ MessageReference waitForReferenceWithID(long id, CountDownLatch latch);
+
MessageReference getReference(long id);
void deleteAllReferences(StorageManager storageManager) throws Exception;
@@ -135,10 +138,13 @@
boolean moveMessage(long messageID, Binding toBinding,
StorageManager storageManager, PostOffice postOffice) throws Exception;
- void setBackup(boolean backup);
+ void setBackup();
+ void activate();
+
boolean isBackup();
MessageReference removeFirst();
-
+
+ boolean consumerFailedOver();
}
Modified: trunk/src/main/org/jboss/messaging/core/server/ServerConsumer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/ServerConsumer.java 2008-10-02 08:02:47 UTC (rev 5059)
+++ trunk/src/main/org/jboss/messaging/core/server/ServerConsumer.java 2008-10-02 13:43:48 UTC (rev 5060)
@@ -44,5 +44,7 @@
Queue getQueue();
- MessageReference getReference(long messageID) throws Exception;
+ MessageReference waitForReference(long messageID) throws Exception;
+
+ void failedOver();
}
Modified: trunk/src/main/org/jboss/messaging/core/server/ServerSession.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/ServerSession.java 2008-10-02 08:02:47 UTC (rev 5059)
+++ trunk/src/main/org/jboss/messaging/core/server/ServerSession.java 2008-10-02 13:43:48 UTC (rev 5060)
@@ -130,4 +130,6 @@
int replayCommands(int lastReceivedCommandID);
void handleManagementMessage(SessionSendManagementMessage message) throws Exception;
+
+ void failedOver() throws Exception;
}
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2008-10-02 08:02:47 UTC (rev 5059)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2008-10-02 13:43:48 UTC (rev 5060)
@@ -210,7 +210,8 @@
managementService,
configuration.isRequireDestinations(),
resourceManager,
- configuration.isWildcardRoutingEnabled());
+ configuration.isWildcardRoutingEnabled(),
+ configuration.isBackup());
securityRepository = new HierarchicalObjectRepository<Set<Role>>();
securityRepository.setDefault(new HashSet<Role>());
@@ -224,7 +225,6 @@
this);
postOffice.start();
- postOffice.setBackup(configuration.isBackup());
TransportConfiguration backupConnector = configuration.getBackupConnectorConfiguration();
@@ -396,7 +396,7 @@
// Reconnect the channel to the new connection
session.transferConnection(connection);
-
+
// This is necessary for invm since the replicating connection will be the
// same connection
// as the original replicating connection since the key is the same in the
@@ -406,7 +406,7 @@
int serverLastReceivedCommandID = session.replayCommands(lastReceivedCommandID);
- postOffice.setBackup(false);
+ postOffice.activate();
configuration.setBackup(false);
@@ -414,8 +414,7 @@
connection.setReplicating(false);
- // Re-prompt delivery
- session.setStarted(true);
+ session.failedOver();
return new ReattachSessionResponseMessage(serverLastReceivedCommandID);
}
@@ -467,6 +466,7 @@
executorFactory.getExecutor(),
channel,
managementService,
+ this,
simpleStringIdGenerator);
// If the session already exists that's fine - create session must be idempotent
@@ -486,15 +486,9 @@
configuration.getPacketConfirmationBatchSize());
}
- // Must also be idempotent
- public void closeSession(final String name) throws Exception
- {
- ServerSession session = sessions.remove(name);
-
- if (session != null)
- {
- session.close();
- }
+ public void removeSession(final String name) throws Exception
+ {
+ sessions.remove(name);
}
public RemotingConnection getReplicatingConnection()
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java 2008-10-02 08:02:47 UTC (rev 5059)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java 2008-10-02 13:43:48 UTC (rev 5060)
@@ -12,7 +12,6 @@
package org.jboss.messaging.core.server.impl;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.CLOSE_SESSION;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.CREATESESSION;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REATTACH_SESSION;
@@ -22,10 +21,8 @@
import org.jboss.messaging.core.remoting.ChannelHandler;
import org.jboss.messaging.core.remoting.Packet;
import org.jboss.messaging.core.remoting.RemotingConnection;
-import org.jboss.messaging.core.remoting.impl.wireformat.CloseSessionMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.CreateSessionMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.NullResponseMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.ReattachSessionMessage;
import org.jboss.messaging.core.server.MessagingServer;
@@ -58,12 +55,7 @@
}
public void handlePacket(final Packet packet)
- {
- if (channel1.getReplicatingChannel() != null)
- {
- channel1.replicatePacket(packet);
- }
-
+ {
Packet response = null;
byte type = packet.getType();
@@ -72,6 +64,8 @@
// reliability replay functionality
try
{
+ channel1.replicatePacket(packet);
+
switch (type)
{
case CREATESESSION:
@@ -97,16 +91,6 @@
break;
}
- case CLOSE_SESSION:
- {
- CloseSessionMessage request = (CloseSessionMessage)packet;
-
- server.closeSession(request.getName());
-
- response = new NullResponseMessage();
-
- break;
- }
default:
{
response = new MessagingExceptionMessage(new MessagingException(MessagingException.UNSUPPORTED_PACKET,
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2008-10-02 08:02:47 UTC (rev 5059)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2008-10-02 13:43:48 UTC (rev 5060)
@@ -12,6 +12,23 @@
package org.jboss.messaging.core.server.impl;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
import org.jboss.messaging.core.filter.Filter;
import org.jboss.messaging.core.list.PriorityLinkedList;
import org.jboss.messaging.core.list.impl.PriorityLinkedListImpl;
@@ -20,22 +37,18 @@
import org.jboss.messaging.core.postoffice.Binding;
import org.jboss.messaging.core.postoffice.FlowController;
import org.jboss.messaging.core.postoffice.PostOffice;
-import org.jboss.messaging.core.server.*;
+import org.jboss.messaging.core.server.Consumer;
+import org.jboss.messaging.core.server.DistributionPolicy;
+import org.jboss.messaging.core.server.HandleStatus;
+import org.jboss.messaging.core.server.MessageReference;
import org.jboss.messaging.core.server.Queue;
+import org.jboss.messaging.core.server.ServerMessage;
import org.jboss.messaging.core.settings.HierarchicalRepository;
import org.jboss.messaging.core.settings.impl.QueueSettings;
import org.jboss.messaging.core.transaction.Transaction;
import org.jboss.messaging.core.transaction.impl.TransactionImpl;
import org.jboss.messaging.util.SimpleString;
-import java.util.*;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
/**
* Implementation of a Queue TODO use Java 5 concurrent queue
*
@@ -78,6 +91,8 @@
private boolean promptDelivery;
+ private int pos;
+
private AtomicInteger sizeBytes = new AtomicInteger(0);
private AtomicInteger messagesAdded = new AtomicInteger(0);
@@ -92,6 +107,10 @@
private volatile boolean backup;
+ private int consumersToFailover = -1;
+
+ private Map<Long, CountDownLatch> waitingIDMap = new ConcurrentHashMap<Long, CountDownLatch>();
+
public QueueImpl(final long persistenceID,
final SimpleString name,
final Filter filter,
@@ -145,25 +164,33 @@
public HandleStatus addLast(final MessageReference ref)
{
- return add(ref, false);
+ HandleStatus status = add(ref, false);
+
+ checkWaiting(ref.getMessage().getMessageID());
+
+ return status;
}
public HandleStatus addFirst(final MessageReference ref)
{
return add(ref, true);
}
-
- public void addListFirst(final LinkedList<MessageReference> list)
+
+ public synchronized void addListFirst(final LinkedList<MessageReference> list)
{
ListIterator<MessageReference> iter = list.listIterator(list.size());
while (iter.hasPrevious())
{
MessageReference ref = iter.previous();
+
+ ServerMessage msg = ref.getMessage();
- messageReferences.addFirst(ref, ref.getMessage().getPriority());
+ messageReferences.addFirst(ref, msg.getPriority());
+
+ checkWaiting(msg.getMessageID());
}
-
+
deliver();
}
@@ -262,7 +289,8 @@
public synchronized boolean removeConsumer(final Consumer consumer) throws Exception
{
boolean removed = distributionPolicy.removeConsumer(consumer);
- if(removed)
+
+ if (removed)
{
distributionPolicy.removeConsumer(consumer);
}
@@ -580,24 +608,86 @@
return backup;
}
- public void setBackup(final boolean backup)
+ public synchronized void setBackup()
{
- this.backup = backup;
+ this.backup = true;
this.direct = false;
+ }
- if (!backup)
+ public MessageReference removeFirst()
+ {
+ return messageReferences.removeFirst();
+ }
+
+ public synchronized void activate()
+ {
+ consumersToFailover = distributionPolicy.getConsumerCount();
+
+ if (consumersToFailover == 0)
{
+ backup = false;
+ }
+ }
+
+ public synchronized boolean consumerFailedOver()
+ {
+ consumersToFailover--;
+
+ if (consumersToFailover == 0)
+ {
+ // All consumers for the queue have failed over, can re-activate it now
+
+ backup = false;
+
for (ScheduledDeliveryRunnable runnable : scheduledRunnables)
{
scheduleDelivery(runnable, runnable.getReference().getScheduledDeliveryTime());
}
+
+ return true;
}
+ else
+ {
+ return false;
+ }
}
- public MessageReference removeFirst()
+ public MessageReference waitForReferenceWithID(final long id, final CountDownLatch latch)
{
- return messageReferences.removeFirst();
+ MessageReference ref;
+
+ synchronized (this)
+ {
+ ref = removeReferenceWithID(id);
+
+ if (ref == null)
+ {
+ waitingIDMap.put(id, latch);
+ }
+ }
+
+ if (ref == null)
+ {
+ boolean ok = false;
+
+ try
+ {
+ ok = latch.await(10000, TimeUnit.MILLISECONDS);
+ }
+ catch (InterruptedException e)
+ {
+ }
+
+ if (!ok)
+ {
+ throw new IllegalStateException("Timed out or interrupted waiting for ref to arrive on queue " + id);
+ }
+
+ ref = this.removeReferenceWithID(id);
+ }
+
+ return ref;
}
// Public
@@ -732,7 +822,7 @@
private HandleStatus deliver(final MessageReference reference)
{
- if (!distributionPolicy.hasConsumers())
+ if (distributionPolicy.getConsumerCount() == 0)
{
return HandleStatus.BUSY;
}
@@ -747,7 +837,7 @@
{
Consumer consumer = distributionPolicy.select(reference.getMessage(), status != null);
pos = distributionPolicy.getCurrentPosition();
- if(consumer == null)
+ if (consumer == null)
{
if (filterRejected)
{
@@ -800,11 +890,11 @@
filterRejected = true;
}
- if(startPos > distributionPolicy.getConsumerCount() - 1)
+ if (startPos > distributionPolicy.getConsumerCount() - 1)
{
startPos = distributionPolicy.getConsumerCount() - 1;
}
- if(startPos == pos)
+ if (startPos == pos)
{
// Tried all of them
if (filterRejected)
@@ -815,10 +905,19 @@
{
// Give up - all consumers busy
return HandleStatus.BUSY;
- }
+ }
}
}
+ }
+
+ private void checkWaiting(final long messageID)
+ {
+ CountDownLatch latch = waitingIDMap.remove(messageID);
+ if (latch != null)
+ {
+ latch.countDown();
+ }
}
// Inner classes
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2008-10-02 08:02:47 UTC (rev 5059)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2008-10-02 13:43:48 UTC (rev 5060)
@@ -23,6 +23,7 @@
package org.jboss.messaging.core.server.impl;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import org.jboss.messaging.core.filter.Filter;
@@ -84,11 +85,13 @@
private final HierarchicalRepository<QueueSettings> queueSettingsRepository;
private final PostOffice postOffice;
-
+
private final java.util.Queue<MessageReference> deliveringRefs = new ConcurrentLinkedQueue<MessageReference>();
-
- private final Channel channel;
+ private final Channel channel;
+
+ private volatile CountDownLatch waitingLatch;
+
// Constructors
// ---------------------------------------------------------------------------------
@@ -128,9 +131,9 @@
this.queueSettingsRepository = queueSettingsRepository;
this.postOffice = postOffice;
-
+
this.channel = channel;
-
+
messageQueue.addConsumer(this);
}
@@ -142,10 +145,10 @@
return id;
}
- public HandleStatus handle(MessageReference ref) throws Exception
+ public HandleStatus handle(final MessageReference ref) throws Exception
{
if (availableCredits != null && availableCredits.get() <= 0)
- {
+ {
return HandleStatus.BUSY;
}
@@ -177,35 +180,39 @@
{
availableCredits.addAndGet(-message.getEncodeSize());
}
+
+ deliveringRefs.add(ref);
- deliveringRefs.add(ref);
-
- SessionReceiveMessage packet =
- new SessionReceiveMessage(id, ref.getMessage(), ref.getDeliveryCount() + 1);
-
+ SessionReceiveMessage packet = new SessionReceiveMessage(id, ref.getMessage(), ref.getDeliveryCount() + 1);
+
channel.send(packet);
-
+
return HandleStatus.HANDLED;
}
}
-
+
public void close() throws Exception
- {
+ {
setStarted(false);
+
+ if (waitingLatch != null)
+ {
+ waitingLatch.countDown();
+ }
messageQueue.removeConsumer(this);
session.removeConsumer(this);
-
+
cancelRefs();
}
-
+
public void cancelRefs() throws Exception
{
if (!deliveringRefs.isEmpty())
{
Transaction tx = new TransactionImpl(storageManager, postOffice);
-
+
for (MessageReference ref : deliveringRefs)
{
tx.addAcknowledgement(ref);
@@ -214,22 +221,18 @@
deliveringRefs.clear();
tx.rollback(queueSettingsRepository);
- }
+ }
}
public void setStarted(final boolean started)
{
- boolean useStarted;
-
synchronized (startStopLock)
{
this.started = started;
-
- useStarted = started;
}
// Outside the lock
- if (useStarted)
+ if (started)
{
promptDelivery();
}
@@ -240,11 +243,11 @@
if (availableCredits != null)
{
int previous = availableCredits.getAndAdd(credits);
-
- if (previous <= 0 && (previous + credits) > 0)
+
+ if (previous <= 0 && previous + credits > 0)
{
promptDelivery();
- }
+ }
}
}
@@ -252,62 +255,45 @@
{
return messageQueue;
}
-
- private MessageReference deliverMessage(final long messageID) throws Exception
+
+ public MessageReference waitForReference(final long messageID) throws Exception
{
- // Deliver a specific message from the queue - this is used when
- // replicating delivery state
- // We can't just deliver the next message since there may be multiple
- // sessions on the same queue
- // delivering concurrently
- // and we could end up with different delivery state on backup compare to
- // live
- // So we need the message id so we can be sure the backup session has the
- // same delivery state
- MessageReference ref = messageQueue.removeReferenceWithID(messageID);
-
- if (ref == null)
- {
- throw new IllegalStateException("Cannot find reference " + messageID);
- }
-
- HandleStatus handled = handle(ref);
-
- if (handled != HandleStatus.HANDLED)
- {
- throw new IllegalStateException("Failed to handle replicated reference " + messageID);
- }
-
- return ref;
- }
-
- public MessageReference getReference(final long messageID) throws Exception
- {
-// MessageReference ref;
-// do
-// {
-// ref = deliveringRefs.poll();
-// }
-// while (ref.getMessage().getMessageID() != messageID);
-
if (messageQueue.isBackup())
{
- return deliverMessage(messageID);
+ waitingLatch = new CountDownLatch(1);
+
+ MessageReference ref = messageQueue.waitForReferenceWithID(messageID, waitingLatch);
+
+ waitingLatch = null;
+
+ return ref;
}
else
{
-
MessageReference ref = deliveringRefs.poll();
-
+
if (ref.getMessage().getMessageID() != messageID)
{
throw new IllegalStateException("Invalid order");
}
-
+
return ref;
}
}
+ public void failedOver()
+ {
+ synchronized (startStopLock)
+ {
+ started = true;
+ }
+
+ if (messageQueue.consumerFailedOver())
+ {
+ promptDelivery();
+ }
+ }
+
// Public
// -----------------------------------------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2008-10-02 08:02:47 UTC (rev 5059)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2008-10-02 13:43:48 UTC (rev 5060)
@@ -116,6 +116,12 @@
private final IDGenerator idGenerator = new SimpleIDGenerator(0);
+ private volatile boolean closed;
+
+ private final String name;
+
+ private final MessagingServer server;
+
private final SimpleStringIdGenerator simpleStringIdGenerator;
// Constructors ---------------------------------------------------------------------------------
@@ -136,6 +142,7 @@
final Executor executor,
final Channel channel,
final ManagementService managementService,
+ final MessagingServer server,
final SimpleStringIdGenerator simpleStringIdGenerator) throws Exception
{
this.id = id;
@@ -173,6 +180,10 @@
this.managementService = managementService;
+ this.name = name;
+
+ this.server = server;
+
this.simpleStringIdGenerator = simpleStringIdGenerator;
}
@@ -229,10 +240,24 @@
started = s;
}
+ public void failedOver() throws Exception
+ {
+ Set<ServerConsumer> consumersClone = new HashSet<ServerConsumer>(consumers.values());
+
+ for (ServerConsumer consumer : consumersClone)
+ {
+ consumer.failedOver();
+ }
+
+ started = true;
+ }
+
public void close() throws Exception
{
- channel.close();
+ closed = true;
+ channel.close(true);
+
Set<ServerConsumer> consumersClone = new HashSet<ServerConsumer>(consumers.values());
for (ServerConsumer consumer : consumersClone)
@@ -260,7 +285,9 @@
producers.clear();
- rollback();
+ rollback(false);
+
+ server.removeSession(name);
}
public void promptDelivery(final Queue queue)
@@ -305,13 +332,12 @@
{
tx.addMessage(msg);
}
-
}
public void processed(final long consumerID, final long messageID) throws Exception
{
- MessageReference ref = consumers.get(consumerID).getReference(messageID);
-
+ MessageReference ref = consumers.get(consumerID).waitForReference(messageID);
+
// Ref = null would imply consumer is already closed so we could ignore it
if (ref != null)
{
@@ -328,10 +354,27 @@
ref.incrementDeliveryCount();
}
}
+ else
+ {
+ if (!closed)
+ {
+ throw new IllegalStateException(System.identityHashCode(this) + " Could not find ref with id " + messageID);
+ }
+ else
+ {
+ // If closed then might not find ref since processed might come in before send and send
+ // didn't come in since closed
+ }
+ }
}
-
+
public void rollback() throws Exception
{
+ rollback(true);
+ }
+
+ private void rollback(final boolean sendResponse) throws Exception
+ {
if (tx == null)
{
// Might be null if XA
@@ -339,31 +382,34 @@
tx = new TransactionImpl(storageManager, postOffice);
}
- //Need to write the response now - before redeliveries occur
- channel.send(new NullResponseMessage());
-
+ if (sendResponse)
+ {
+ // Need to write the response now - before redeliveries occur
+ channel.send(new NullResponseMessage(false));
+ }
+
boolean wasStarted = started;
-
+
for (ServerConsumer consumer : consumers.values())
- {
+ {
if (wasStarted)
{
consumer.setStarted(false);
}
-
+
consumer.cancelRefs();
}
-
+
tx.rollback(queueSettingsRepository);
-
+
if (wasStarted)
- {
+ {
for (ServerConsumer consumer : consumers.values())
- {
+ {
consumer.setStarted(true);
}
}
-
+
tx = new TransactionImpl(storageManager, postOffice);
}
@@ -594,23 +640,23 @@
}
boolean wasStarted = started;
-
+
for (ServerConsumer consumer : consumers.values())
- {
+ {
if (wasStarted)
{
consumer.setStarted(false);
}
-
+
consumer.cancelRefs();
}
-
+
theTx.rollback(queueSettingsRepository);
-
+
if (wasStarted)
{
for (ServerConsumer consumer : consumers.values())
- {
+ {
consumer.setStarted(true);
}
}
@@ -1053,20 +1099,20 @@
public void handleManagementMessage(final SessionSendManagementMessage message) throws Exception
{
ServerMessage serverMessage = message.getServerMessage();
-
+
if (serverMessage.containsProperty(ManagementHelper.HDR_JMX_SUBSCRIBE_TO_NOTIFICATIONS))
{
boolean subscribe = (Boolean)serverMessage.getProperty(ManagementHelper.HDR_JMX_SUBSCRIBE_TO_NOTIFICATIONS);
-
+
final SimpleString replyTo = (SimpleString)serverMessage.getProperty(ManagementHelper.HDR_JMX_REPLYTO);
-
+
if (subscribe)
{
if (log.isDebugEnabled())
{
log.debug("added notification listener " + this);
}
-
+
managementService.addNotificationListener(this, null, replyTo);
}
else
@@ -1075,15 +1121,15 @@
{
log.debug("removed notification listener " + this);
}
-
+
managementService.removeNotificationListener(this);
}
return;
}
managementService.handleMessage(message.getServerMessage());
-
+
serverMessage.setDestination((SimpleString)serverMessage.getProperty(ManagementHelper.HDR_JMX_REPLYTO));
-
+
send(serverMessage);
}
@@ -1106,7 +1152,22 @@
}
}
- close();
+ // We execute this on the session's serial executor, then we can avoid complex synchronization
+ // and ensure no operations are fielded on the session after it is closed
+ channel.getExecutor().execute(new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ close();
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to close session", e);
+ }
+ }
+ });
}
catch (Throwable t)
{
@@ -1141,7 +1202,7 @@
{
return tx;
}
-
+
// Private
// ----------------------------------------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java 2008-10-02 08:02:47 UTC (rev 5059)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java 2008-10-02 13:43:48 UTC (rev 5060)
@@ -12,20 +12,91 @@
package org.jboss.messaging.core.server.impl;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_ADD_DESTINATION;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_BINDINGQUERY;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_BROWSER_CLOSE;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_BROWSER_HASNEXTMESSAGE;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_BROWSER_NEXTMESSAGE;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_BROWSER_RESET;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CLOSE;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_COMMIT;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CONSUMER_CLOSE;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CREATEBROWSER;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CREATECONSUMER;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CREATEPRODUCER;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CREATEQUEUE;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_DELETE_QUEUE;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_FLOWTOKEN;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_MANAGEMENT_SEND;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_PROCESSED;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_PRODUCER_CLOSE;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_QUEUEQUERY;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_REMOVE_DESTINATION;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_ROLLBACK;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_SEND;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_START;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_STOP;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_COMMIT;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_END;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_FORGET;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_GET_TIMEOUT;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_INDOUBT_XIDS;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_JOIN;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_PREPARE;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_RESUME;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_ROLLBACK;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_SET_TIMEOUT;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_START;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_SUSPEND;
+
+import java.util.List;
+
+import javax.transaction.xa.Xid;
+
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.remoting.Channel;
import org.jboss.messaging.core.remoting.ChannelHandler;
import org.jboss.messaging.core.remoting.Packet;
-import org.jboss.messaging.core.remoting.impl.wireformat.*;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.*;
+import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.NullResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionAddDestinationMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionBindingQueryMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionBrowseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionBrowserCloseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionBrowserHasNextMessageMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionBrowserHasNextMessageResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionBrowserNextMessageMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionBrowserResetMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerCloseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerFlowCreditMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateBrowserMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateConsumerMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateProducerMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateQueueMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionDeleteQueueMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionProcessedMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionProducerCloseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionRemoveDestinationMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendManagementMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXACommitMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAEndMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAForgetMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAGetInDoubtXidsResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAGetTimeoutResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAJoinMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAPrepareMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAResumeMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXARollbackMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXASetTimeoutMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXASetTimeoutResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAStartMessage;
import org.jboss.messaging.core.server.ServerMessage;
import org.jboss.messaging.core.server.ServerSession;
-import javax.transaction.xa.Xid;
-import java.util.List;
-
/**
* A ServerSessionPacketHandler
*
@@ -73,24 +144,17 @@
{
// must generate message id here, so we know they are in sync
long id = storageManager.generateUniqueID();
-
+
send.getServerMessage().setMessageID(id);
}
-// else
-// {
-// log.info("Got replicated send");
-// }
}
-
- if (channel.getReplicatingChannel() != null)
- {
- channel.replicatePacket(packet);
- }
-
+
Packet response = null;
try
{
+ channel.replicatePacket(packet);
+
switch (type)
{
case SESS_CREATECONSUMER:
@@ -110,14 +174,14 @@
request.getFilterString(),
request.isDurable(),
request.isTemporary());
- response = new NullResponseMessage();
+ response = new NullResponseMessage(true);
break;
}
case SESS_DELETE_QUEUE:
{
SessionDeleteQueueMessage request = (SessionDeleteQueueMessage)packet;
session.deleteQueue(request.getQueueName());
- response = new NullResponseMessage();
+ response = new NullResponseMessage(false);
break;
}
case SESS_QUEUEQUERY:
@@ -136,7 +200,7 @@
{
SessionCreateBrowserMessage request = (SessionCreateBrowserMessage)packet;
session.createBrowser(request.getQueueName(), request.getFilterString());
- response = new NullResponseMessage();
+ response = new NullResponseMessage(false);
break;
}
case SESS_CREATEPRODUCER:
@@ -151,14 +215,14 @@
session.processed(message.getConsumerID(), message.getMessageID());
if (message.isRequiresResponse())
{
- response = new NullResponseMessage();
+ response = new NullResponseMessage(false);
}
break;
}
case SESS_COMMIT:
{
session.commit();
- response = new NullResponseMessage();
+ response = new NullResponseMessage(false);
break;
}
case SESS_ROLLBACK:
@@ -241,14 +305,14 @@
{
SessionAddDestinationMessage message = (SessionAddDestinationMessage)packet;
session.addDestination(message.getAddress(), message.isDurable(), message.isTemporary());
- response = new NullResponseMessage();
+ response = new NullResponseMessage(true);
break;
}
case SESS_REMOVE_DESTINATION:
{
SessionRemoveDestinationMessage message = (SessionRemoveDestinationMessage)packet;
session.removeDestination(message.getAddress(), message.isDurable());
- response = new NullResponseMessage();
+ response = new NullResponseMessage(false);
break;
}
case SESS_START:
@@ -259,28 +323,34 @@
case SESS_STOP:
{
session.setStarted(false);
- response = new NullResponseMessage();
+ response = new NullResponseMessage(false);
break;
}
+ case SESS_CLOSE:
+ {
+ session.close();
+ response = new NullResponseMessage(false);
+ break;
+ }
case SESS_CONSUMER_CLOSE:
{
SessionConsumerCloseMessage message = (SessionConsumerCloseMessage)packet;
session.closeConsumer(message.getConsumerID());
- response = new NullResponseMessage();
+ response = new NullResponseMessage(false);
break;
}
case SESS_PRODUCER_CLOSE:
{
SessionProducerCloseMessage message = (SessionProducerCloseMessage)packet;
session.closeProducer(message.getProducerID());
- response = new NullResponseMessage();
+ response = new NullResponseMessage(false);
break;
}
case SESS_BROWSER_CLOSE:
{
SessionBrowserCloseMessage message = (SessionBrowserCloseMessage)packet;
session.closeBrowser(message.getBrowserID());
- response = new NullResponseMessage();
+ response = new NullResponseMessage(false);
break;
}
case SESS_FLOWTOKEN:
@@ -295,7 +365,7 @@
session.sendProducerMessage(message.getProducerID(), message.getServerMessage());
if (message.isRequiresResponse())
{
- response = new NullResponseMessage();
+ response = new NullResponseMessage(false);
}
break;
}
@@ -316,7 +386,7 @@
{
SessionBrowserResetMessage message = (SessionBrowserResetMessage)packet;
session.browserReset(message.getBrowserID());
- response = new NullResponseMessage();
+ response = new NullResponseMessage(false);
break;
}
case SESS_MANAGEMENT_SEND:
Modified: trunk/src/main/org/jboss/messaging/jms/client/JMSMessageListenerWrapper.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/client/JMSMessageListenerWrapper.java 2008-10-02 08:02:47 UTC (rev 5059)
+++ trunk/src/main/org/jboss/messaging/jms/client/JMSMessageListenerWrapper.java 2008-10-02 13:43:48 UTC (rev 5060)
@@ -75,7 +75,7 @@
return;
}
- if (this.transactedOrClientAck)
+ if (transactedOrClientAck)
{
try
{
Modified: trunk/src/main/org/jboss/messaging/util/JBMThreadFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/util/JBMThreadFactory.java 2008-10-02 08:02:47 UTC (rev 5059)
+++ trunk/src/main/org/jboss/messaging/util/JBMThreadFactory.java 2008-10-02 13:43:48 UTC (rev 5060)
@@ -39,7 +39,7 @@
public JBMThreadFactory(final String groupName)
{
- group = new ThreadGroup(groupName);
+ group = new ThreadGroup(groupName + "-" + System.identityHashCode(this));
}
public Thread newThread(final Runnable command)
Modified: trunk/src/main/org/jboss/messaging/util/OrderedExecutorFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/util/OrderedExecutorFactory.java 2008-10-02 08:02:47 UTC (rev 5059)
+++ trunk/src/main/org/jboss/messaging/util/OrderedExecutorFactory.java 2008-10-02 13:43:48 UTC (rev 5060)
@@ -18,109 +18,72 @@
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
+ */
package org.jboss.messaging.util;
+import java.util.Collections;
+import java.util.HashSet;
import java.util.LinkedList;
+import java.util.Set;
import java.util.concurrent.Executor;
/**
- * A OrderedExecutorFactory2
- *
- * @author <a href="mailto:david.lloyd at jboss.com">David LLoyd</a>
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * This factory creates a hierarchy of Executor which shares the threads of the
+ * parent Executor (typically, the root parent is a Thread pool).
*
+ * @author <a href="david.lloyd at jboss.com">David Lloyd</a>
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ *
+ * @version <tt>$Revision$</tt>
+ *
*/
public final class OrderedExecutorFactory implements ExecutorFactory
{
private final Executor parent;
+ private final Set<ChildExecutor> runningChildren = Collections.synchronizedSet(new HashSet<ChildExecutor>());
- /**
- * Construct a new instance delegating to the given parent executor.
- *
- * @param parent the parent executor
- */
public OrderedExecutorFactory(final Executor parent)
{
this.parent = parent;
}
- /**
- * Get an executor that always executes tasks in order.
- *
- * @return an ordered executor
- */
public Executor getExecutor()
{
- return new OrderedExecutor(parent);
+ return new ChildExecutor();
}
- private static final class OrderedExecutor implements Executor
+ private final class ChildExecutor implements Executor, Runnable
{
- // @protectedby tasks
private final LinkedList<Runnable> tasks = new LinkedList<Runnable>();
- // @protectedby tasks
- private boolean running;
-
- private final Executor parent;
-
- private final Runnable runner;
-
- /**
- * Construct a new instance.
- *
- * @param parent the parent executor
- */
- public OrderedExecutor(final Executor parent)
+ public void execute(Runnable command)
{
- this.parent = parent;
-
- runner = new Runnable()
+ synchronized (tasks)
{
- public void run()
+ tasks.add(command);
+ if (tasks.size() == 1 && runningChildren.add(this))
{
- for (;;)
- {
- final Runnable task;
- synchronized (tasks)
- {
- task = tasks.poll();
- if (task == null)
- {
- running = false;
- return;
- }
- }
- try
- {
- task.run();
- }
- catch (Throwable t)
- {
- // eat it!
- }
- }
+ parent.execute(this);
}
- };
+ }
}
- /**
- * Run a task.
- *
- * @param command the task to run.
- */
- public void execute(Runnable command)
+ public void run()
{
- synchronized (tasks)
+ for (;;)
{
- tasks.add(command);
- if (!running)
+ final Runnable task;
+ synchronized (tasks)
{
- running = true;
- parent.execute(runner);
+ task = tasks.poll();
+ if (task == null)
+ {
+ runningChildren.remove(this);
+ return;
+ }
}
+ task.run();
}
}
}
Modified: trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java
===================================================================
--- trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java 2008-10-02 08:02:47 UTC (rev 5059)
+++ trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java 2008-10-02 13:43:48 UTC (rev 5060)
@@ -3763,7 +3763,7 @@
}
}
-// http://jira.jboss.org/jira/browse/JBMESSAGING-1294 - commented out until 2.0 beta
+ // http://jira.jboss.org/jira/browse/JBMESSAGING-1294 - commented out until 2.0 beta
// public void testExceptionMessageListener1() throws Exception
// {
// Connection conn = null;
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/SimpleAutomaticFailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/SimpleAutomaticFailoverTest.java 2008-10-02 08:02:47 UTC (rev 5059)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/SimpleAutomaticFailoverTest.java 2008-10-02 13:43:48 UTC (rev 5060)
@@ -128,9 +128,9 @@
message2.processed();
}
- ClientMessage message3 = consumer.receive(250);
+ //ClientMessage message3 = consumer.receive(250);
- assertNull(message3);
+ //assertNull(message3);
session.close();
}
@@ -548,15 +548,17 @@
{
ClientConsumer cons = consumers.get(i);
- ClientSession sess = sessions.get(i);
-
for (int j = 0; j < numMessages; j++)
{
ClientMessage message2 = cons.receive();
assertEquals("aardvarks", message2.getBody().getString());
+
+ // log.info("actually got message " + message2.getMessageID());
assertEquals(j, message2.getProperty(new SimpleString("count")));
+
+
message2.processed();
}
Modified: trunk/tests/src/org/jboss/messaging/tests/performance/persistence/FakePostOffice.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/performance/persistence/FakePostOffice.java 2008-10-02 08:02:47 UTC (rev 5059)
+++ trunk/tests/src/org/jboss/messaging/tests/performance/persistence/FakePostOffice.java 2008-10-02 13:43:48 UTC (rev 5060)
@@ -133,7 +133,7 @@
return null;
}
- public void setBackup(boolean backup)
+ public void activate()
{
}
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/postoffice/impl/PostOfficeImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/postoffice/impl/PostOfficeImplTest.java 2008-10-02 08:02:47 UTC (rev 5059)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/postoffice/impl/PostOfficeImplTest.java 2008-10-02 13:43:48 UTC (rev 5060)
@@ -60,7 +60,7 @@
{
private final QueueFactory queueFactory = new FakeQueueFactory();
- protected boolean wildCardRoutingEnabled = false;
+ protected boolean wildCardRoutingEnabled;
public void testPostOfficeStart() throws Exception
{
@@ -69,7 +69,7 @@
ManagementService ms = EasyMock.createNiceMock(ManagementService.class);
PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
- PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled);
+ PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled, false);
qf.setPostOffice(postOffice);
@@ -90,7 +90,7 @@
ManagementService ms = EasyMock.createNiceMock(ManagementService.class);
PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
- PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled);
+ PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled, false);
qf.setPostOffice(postOffice);
@@ -125,7 +125,7 @@
ManagementService ms = EasyMock.createNiceMock(ManagementService.class);
PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
- PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled);
+ PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled, false);
qf.setPostOffice(postOffice);
@@ -174,7 +174,7 @@
ManagementService ms = EasyMock.createNiceMock(ManagementService.class);
PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
- PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled);
+ PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled, false);
qf.setPostOffice(postOffice);
@@ -222,7 +222,7 @@
PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
- PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled);
+ PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled, false);
qf.setPostOffice(postOffice);
@@ -273,7 +273,7 @@
PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
- PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled);
+ PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled, false);
qf.setPostOffice(postOffice);
@@ -321,7 +321,7 @@
PagingStore store = EasyMock.createNiceMock(PagingStore.class);
EasyMock.expect(pgm.getPageStore(address1)).andReturn(store);
- PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled);
+ PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled, false);
qf.setPostOffice(postOffice);
@@ -355,7 +355,7 @@
PagingStore pgstore = EasyMock.createNiceMock(PagingStore.class);
- PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled);
+ PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled, false);
qf.setPostOffice(postOffice);
@@ -418,7 +418,7 @@
PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
PagingStore pgstore = EasyMock.createNiceMock(PagingStore.class);
- PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled);
+ PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled, false);
qf.setPostOffice(postOffice);
@@ -479,7 +479,7 @@
PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
PagingStore pgstore = EasyMock.createNiceMock(PagingStore.class);
- PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled);
+ PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled, false);
qf.setPostOffice(postOffice);
@@ -541,7 +541,7 @@
PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
PagingStore pgstore = EasyMock.createNiceMock(PagingStore.class);
- PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled);
+ PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled, false);
qf.setPostOffice(postOffice);
@@ -600,7 +600,7 @@
ManagementService ms = EasyMock.createNiceMock(ManagementService.class);
PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
- PostOffice po = new PostOfficeImpl(pm, pgm, qf, ms, false, null, wildCardRoutingEnabled);
+ PostOffice po = new PostOfficeImpl(pm, pgm, qf, ms, false, null, wildCardRoutingEnabled, false);
final long id = 324;
final SimpleString name = new SimpleString("wibb22");
@@ -642,7 +642,7 @@
ManagementService ms = EasyMock.createNiceMock(ManagementService.class);
PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
- PostOffice po = new PostOfficeImpl(pm, pgm, qf, ms, false, null, wildCardRoutingEnabled);
+ PostOffice po = new PostOfficeImpl(pm, pgm, qf, ms, false, null, wildCardRoutingEnabled, false);
final long id = 324;
final SimpleString name = new SimpleString("wibb22");
@@ -693,7 +693,7 @@
ManagementService ms = EasyMock.createNiceMock(ManagementService.class);
PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
- PostOffice po = new PostOfficeImpl(pm, pgm, qf, ms, false, null, wildCardRoutingEnabled);
+ PostOffice po = new PostOfficeImpl(pm, pgm, qf, ms, false, null, wildCardRoutingEnabled, false);
final SimpleString condition1 = new SimpleString("queue.wibble");
@@ -781,7 +781,7 @@
ManagementService ms = EasyMock.createNiceMock(ManagementService.class);
PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
- PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled);
+ PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled, false);
qf.setPostOffice(postOffice);
@@ -809,7 +809,7 @@
ManagementService ms = EasyMock.createNiceMock(ManagementService.class);
PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
- PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled);
+ PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled, false);
qf.setPostOffice(postOffice);
@@ -843,7 +843,7 @@
ManagementService ms = EasyMock.createNiceMock(ManagementService.class);
PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
- PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled);
+ PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled, false);
qf.setPostOffice(postOffice);
@@ -874,7 +874,7 @@
ManagementService ms = EasyMock.createNiceMock(ManagementService.class);
PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
- PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled);
+ PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled, false);
qf.setPostOffice(postOffice);
@@ -919,7 +919,7 @@
ManagementService ms = EasyMock.createNiceMock(ManagementService.class);
PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
- PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled);
+ PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled, false);
qf.setPostOffice(postOffice);
@@ -929,7 +929,6 @@
(ResourceManager)EasyMock.anyObject());
EasyMock.expect(qf.createQueue(-1, queueName, filter, true, false)).andReturn(queue);
EasyMock.expect(queue.getName()).andStubReturn(queueName);
- queue.setBackup(false);
queue.setFlowController((FlowController)EasyMock.anyObject());
pm.addBinding((Binding)EasyMock.anyObject());
EasyMock.replay(pm, qf, queue);
@@ -954,7 +953,7 @@
ManagementService ms = EasyMock.createNiceMock(ManagementService.class);
PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
- PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled);
+ PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled, false);
qf.setPostOffice(postOffice);
@@ -966,13 +965,10 @@
EasyMock.expect(qf.createQueue(-1, queueName2, filter, true, false)).andReturn(queue2);
EasyMock.expect(qf.createQueue(-1, queueName3, filter, true, false)).andReturn(queue3);
EasyMock.expect(queue.getName()).andStubReturn(queueName);
- queue.setBackup(false);
queue.setFlowController((FlowController)EasyMock.anyObject());
EasyMock.expect(queue2.getName()).andStubReturn(queueName2);
- queue2.setBackup(false);
queue2.setFlowController((FlowController)EasyMock.anyObject());
EasyMock.expect(queue3.getName()).andStubReturn(queueName3);
- queue3.setBackup(false);
queue3.setFlowController((FlowController)EasyMock.anyObject());
pm.addBinding((Binding)EasyMock.anyObject());
pm.addBinding((Binding)EasyMock.anyObject());
@@ -999,7 +995,7 @@
ManagementService ms = EasyMock.createNiceMock(ManagementService.class);
PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
- PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled);
+ PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled, false);
qf.setPostOffice(postOffice);
@@ -1009,7 +1005,6 @@
(ResourceManager)EasyMock.anyObject());
EasyMock.expect(qf.createQueue(-1, queueName, filter, false, false)).andReturn(queue);
EasyMock.expect(queue.getName()).andStubReturn(queueName);
- queue.setBackup(false);
queue.setFlowController((FlowController)EasyMock.anyObject());
EasyMock.replay(pm, qf, queue);
postOffice.start();
@@ -1033,7 +1028,7 @@
ManagementService ms = EasyMock.createNiceMock(ManagementService.class);
PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
- PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled);
+ PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled, false);
qf.setPostOffice(postOffice);
@@ -1045,13 +1040,10 @@
EasyMock.expect(qf.createQueue(-1, queueName2, filter, false, false)).andReturn(queue2);
EasyMock.expect(qf.createQueue(-1, queueName3, filter, false, false)).andReturn(queue3);
EasyMock.expect(queue.getName()).andStubReturn(queueName);
- queue.setBackup(false);
queue.setFlowController((FlowController)EasyMock.anyObject());
EasyMock.expect(queue2.getName()).andStubReturn(queueName2);
- queue2.setBackup(false);
queue2.setFlowController((FlowController)EasyMock.anyObject());
EasyMock.expect(queue3.getName()).andStubReturn(queueName3);
- queue3.setBackup(false);
queue3.setFlowController((FlowController)EasyMock.anyObject());
EasyMock.replay(pm, qf, queue, queue2, queue3);
postOffice.start();
@@ -1075,7 +1067,7 @@
ManagementService ms = EasyMock.createNiceMock(ManagementService.class);
PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
- PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled);
+ PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled, false);
qf.setPostOffice(postOffice);
@@ -1086,10 +1078,8 @@
EasyMock.expect(qf.createQueue(-1, queueName, filter, true, false)).andReturn(queue);
EasyMock.expect(qf.createQueue(-1, queueName, filter, true, false)).andReturn(queue);
EasyMock.expect(queue.getName()).andStubReturn(queueName);
- queue.setBackup(false);
queue.setFlowController((FlowController)EasyMock.anyObject());
pm.addBinding((Binding)EasyMock.anyObject());
- queue.setBackup(false);
EasyMock.replay(pm, qf, queue);
postOffice.start();
@@ -1117,7 +1107,7 @@
ManagementService ms = EasyMock.createNiceMock(ManagementService.class);
PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
- PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled);
+ PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled, false);
qf.setPostOffice(postOffice);
@@ -1127,7 +1117,6 @@
(ResourceManager)EasyMock.anyObject());
EasyMock.expect(qf.createQueue(-1, queueName, filter, true, false)).andReturn(queue);
EasyMock.expect(queue.getName()).andStubReturn(queueName);
- queue.setBackup(false);
queue.setFlowController((FlowController)EasyMock.anyObject());
pm.addBinding((Binding)EasyMock.anyObject());
EasyMock.expect(queue.isDurable()).andStubReturn(true);
@@ -1156,7 +1145,7 @@
ManagementService ms = EasyMock.createNiceMock(ManagementService.class);
PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
- PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled);
+ PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled, false);
qf.setPostOffice(postOffice);
@@ -1168,13 +1157,10 @@
EasyMock.expect(qf.createQueue(-1, queueName2, filter, true, false)).andReturn(queue2);
EasyMock.expect(qf.createQueue(-1, queueName3, filter, true, false)).andReturn(queue3);
EasyMock.expect(queue.getName()).andStubReturn(queueName);
- queue.setBackup(false);
queue.setFlowController((FlowController)EasyMock.anyObject());
EasyMock.expect(queue2.getName()).andStubReturn(queueName2);
- queue2.setBackup(false);
queue2.setFlowController((FlowController)EasyMock.anyObject());
EasyMock.expect(queue3.getName()).andStubReturn(queueName3);
- queue3.setBackup(false);
queue3.setFlowController((FlowController)EasyMock.anyObject());
pm.addBinding((Binding)EasyMock.anyObject());
pm.addBinding((Binding)EasyMock.anyObject());
@@ -1209,7 +1195,7 @@
ManagementService ms = EasyMock.createNiceMock(ManagementService.class);
PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
- PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled);
+ PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled, false);
qf.setPostOffice(postOffice);
@@ -1219,7 +1205,6 @@
(ResourceManager)EasyMock.anyObject());
EasyMock.expect(qf.createQueue(-1, queueName, filter, false, false)).andReturn(queue);
EasyMock.expect(queue.getName()).andStubReturn(queueName);
- queue.setBackup(false);
queue.setFlowController((FlowController)EasyMock.anyObject());
EasyMock.expect(queue.isDurable()).andStubReturn(false);
queue.setFlowController(null);
@@ -1246,7 +1231,7 @@
ManagementService ms = EasyMock.createNiceMock(ManagementService.class);
PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
- PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled);
+ PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled, false);
qf.setPostOffice(postOffice);
@@ -1258,13 +1243,10 @@
EasyMock.expect(qf.createQueue(-1, queueName2, filter, false, false)).andReturn(queue2);
EasyMock.expect(qf.createQueue(-1, queueName3, filter, false, false)).andReturn(queue3);
EasyMock.expect(queue.getName()).andStubReturn(queueName);
- queue.setBackup(false);
queue.setFlowController((FlowController)EasyMock.anyObject());
EasyMock.expect(queue2.getName()).andStubReturn(queueName2);
- queue2.setBackup(false);
queue2.setFlowController((FlowController)EasyMock.anyObject());
EasyMock.expect(queue3.getName()).andStubReturn(queueName3);
- queue3.setBackup(false);
queue3.setFlowController((FlowController)EasyMock.anyObject());
EasyMock.expect(queue.isDurable()).andStubReturn(false);
@@ -1294,7 +1276,7 @@
ManagementService ms = EasyMock.createNiceMock(ManagementService.class);
PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
- PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled);
+ PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled, false);
qf.setPostOffice(postOffice);
@@ -1328,7 +1310,7 @@
ManagementService ms = EasyMock.createNiceMock(ManagementService.class);
PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
- PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled);
+ PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled, false);
qf.setPostOffice(postOffice);
@@ -1361,7 +1343,7 @@
ManagementService ms = EasyMock.createNiceMock(ManagementService.class);
PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
- PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, false, null, wildCardRoutingEnabled);
+ PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, false, null, wildCardRoutingEnabled, false);
qf.setPostOffice(postOffice);
@@ -1390,7 +1372,7 @@
ManagementService ms = EasyMock.createNiceMock(ManagementService.class);
PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
- PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, false, null, wildCardRoutingEnabled);
+ PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, false, null, wildCardRoutingEnabled, false);
qf.setPostOffice(postOffice);
@@ -1403,7 +1385,6 @@
EasyMock.expect(qf.createQueue(-1, queueName, null, false, false)).andReturn(queue);
EasyMock.expect(queue.getName()).andStubReturn(queueName);
EasyMock.expect(queue.getFilter()).andStubReturn(null);
- queue.setBackup(false);
queue.setFlowController((FlowController)EasyMock.anyObject());
EasyMock.expect(message.createReference(queue)).andReturn(messageReference);
@@ -1439,7 +1420,7 @@
EasyMock.expect(pgm.addSize(EasyMock.isA(ServerMessage.class))).andReturn(-1l);
- PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, false, null, wildCardRoutingEnabled);
+ PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, false, null, wildCardRoutingEnabled, false);
qf.setPostOffice(postOffice);
@@ -1453,7 +1434,6 @@
EasyMock.expect(qf.createQueue(-1, queueName, null, false, false)).andReturn(queue);
EasyMock.expect(queue.getName()).andStubReturn(queueName);
EasyMock.expect(queue.getFilter()).andStubReturn(null);
- queue.setBackup(false);
queue.setFlowController((FlowController)EasyMock.anyObject());
EasyMock.replay(pm, pgm, qf, message, queue);
@@ -1480,7 +1460,7 @@
ManagementService ms = EasyMock.createNiceMock(ManagementService.class);
PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
- PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, false, null, wildCardRoutingEnabled);
+ PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, false, null, wildCardRoutingEnabled, false);
qf.setPostOffice(postOffice);
@@ -1494,7 +1474,6 @@
EasyMock.expect(queue.getName()).andStubReturn(queueName);
EasyMock.expect(queue.getFilter()).andStubReturn(filter);
EasyMock.expect(filter.match(message)).andReturn(true);
- queue.setBackup(false);
queue.setFlowController((FlowController)EasyMock.anyObject());
EasyMock.expect(message.createReference(queue)).andReturn(messageReference);
EasyMock.replay(pm, qf, message, queue, messageReference, filter);
@@ -1519,7 +1498,7 @@
ManagementService ms = EasyMock.createNiceMock(ManagementService.class);
PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
- PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, false, null, wildCardRoutingEnabled);
+ PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, false, null, wildCardRoutingEnabled, false);
qf.setPostOffice(postOffice);
@@ -1533,7 +1512,6 @@
EasyMock.expect(queue.getName()).andStubReturn(queueName);
EasyMock.expect(queue.getFilter()).andStubReturn(filter);
EasyMock.expect(filter.match(message)).andReturn(false);
- queue.setBackup(false);
queue.setFlowController((FlowController)EasyMock.anyObject());
EasyMock.replay(pm, qf, message, queue, messageReference, filter);
postOffice.start();
@@ -1561,7 +1539,7 @@
ManagementService ms = EasyMock.createNiceMock(ManagementService.class);
PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
- PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, false, null, wildCardRoutingEnabled);
+ PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, false, null, wildCardRoutingEnabled, false);
qf.setPostOffice(postOffice);
@@ -1580,9 +1558,6 @@
EasyMock.expect(queue2.getFilter()).andStubReturn(null);
EasyMock.expect(queue3.getName()).andStubReturn(queueName3);
EasyMock.expect(queue3.getFilter()).andStubReturn(null);
- queue.setBackup(false);
- queue2.setBackup(false);
- queue3.setBackup(false);
queue.setFlowController((FlowController)EasyMock.anyObject());
queue2.setFlowController((FlowController)EasyMock.anyObject());
queue3.setFlowController((FlowController)EasyMock.anyObject());
@@ -1622,7 +1597,7 @@
ManagementService ms = EasyMock.createNiceMock(ManagementService.class);
PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
- PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, false, null, wildCardRoutingEnabled);
+ PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, false, null, wildCardRoutingEnabled, false);
qf.setPostOffice(postOffice);
@@ -1643,9 +1618,6 @@
EasyMock.expect(queue3.getFilter()).andStubReturn(filter2);
EasyMock.expect(filter.match(message)).andReturn(false);
EasyMock.expect(filter2.match(message)).andReturn(true);
- queue.setBackup(false);
- queue2.setBackup(false);
- queue3.setBackup(false);
queue.setFlowController((FlowController)EasyMock.anyObject());
queue2.setFlowController((FlowController)EasyMock.anyObject());
queue3.setFlowController((FlowController)EasyMock.anyObject());
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/postoffice/impl/PostOfficeImplWildcardManagerTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/postoffice/impl/PostOfficeImplWildcardManagerTest.java 2008-10-02 08:02:47 UTC (rev 5059)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/postoffice/impl/PostOfficeImplWildcardManagerTest.java 2008-10-02 13:43:48 UTC (rev 5060)
@@ -67,7 +67,7 @@
ManagementService ms = EasyMock.createNiceMock(ManagementService.class);
PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
- PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled);
+ PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled, false);
qf.setPostOffice(postOffice);
@@ -81,7 +81,6 @@
EasyMock.expect(message3.getDestination()).andStubReturn(address3);
EasyMock.expect(qf.createQueue(-1, queueName, null, false, false)).andReturn(queue);
EasyMock.expect(queue.getName()).andStubReturn(queueName);
- queue.setBackup(false);
queue.setFlowController(null);
EasyMock.expect(queue.getFilter()).andStubReturn(null);
EasyMock.expect(pgm.addSize(message)).andStubReturn(1000l);
@@ -130,7 +129,7 @@
ManagementService ms = EasyMock.createNiceMock(ManagementService.class);
PagingManager pgm = EasyMock.createNiceMock(PagingManager.class);
- PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled);
+ PostOffice postOffice = new PostOfficeImpl(pm, pgm, qf, ms, true, null, wildCardRoutingEnabled, false);
qf.setPostOffice(postOffice);
@@ -148,8 +147,6 @@
EasyMock.expect(qf.createQueue(-1, queueName2, null, false, false)).andReturn(queue2);
EasyMock.expect(queue.getName()).andStubReturn(queueName);
EasyMock.expect(queue2.getName()).andStubReturn(queueName2);
- queue.setBackup(false);
- queue2.setBackup(false);
queue.setFlowController(null);
queue2.setFlowController(null);
EasyMock.expect(queue.getFilter()).andStubReturn(null);
More information about the jboss-cvs-commits
mailing list