[jboss-cvs] JBoss Messaging SVN: r5061 - in trunk/src/main/org/jboss/messaging: core/remoting/impl and 3 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Oct 3 06:06:40 EDT 2008
Author: timfox
Date: 2008-10-03 06:06:40 -0400 (Fri, 03 Oct 2008)
New Revision: 5061
Modified:
trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionConsumerCloseMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionDeleteQueueMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionRemoveDestinationMessage.java
trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
trunk/src/main/org/jboss/messaging/util/OrderedExecutorFactory.java
Log:
More replication work, plus replace OrderedExecutorFactory with the new implementation from David Lloyd
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java 2008-10-02 13:43:48 UTC (rev 5060)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java 2008-10-03 10:06:40 UTC (rev 5061)
@@ -418,8 +418,7 @@
for (ClientSessionInternal session : sessions)
{
// Need to get it once for each session to ensure ref count in
- // holder is
- // incremented properly
+ // holder is incremented properly
RemotingConnection backupConnection = connectionRegistry.getConnection(backupConnectorFactory,
backupTransportParams,
pingPeriod,
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java 2008-10-02 13:43:48 UTC (rev 5060)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java 2008-10-03 10:06:40 UTC (rev 5061)
@@ -232,7 +232,7 @@
final RemotingConnection replicatingConnection,
final boolean client)
- {
+ {
this.transportConnection = transportConnection;
this.blockingCallTimeout = blockingCallTimeout;
@@ -281,7 +281,7 @@
pinger = null;
}
}
-
+
// RemotingConnection implementation
// ------------------------------------------------------------
@@ -869,8 +869,8 @@
if (packetConfirmationBatchSize != -1 && ((connection.client && !connection.replicating) || (!connection.client && connection.replicatingConnection == null)))
{
resendCache = new ConcurrentLinkedQueue<Packet>();
-
- nextConfirmation = packetConfirmationBatchSize - 1;
+
+ nextConfirmation = packetConfirmationBatchSize - 1;
}
else
{
@@ -898,6 +898,7 @@
public int getLastReceivedCommandID()
{
+ //log.info("getting last received command id, last received packet is " + this.lastReceivedPacket);
return lastReceivedCommandID;
}
@@ -921,9 +922,7 @@
lock.unlock();
}
- final byte packetType = packet.getType();
-
- if (connection.writePackets || packet.isWriteAlways())
+ if (connection.writePackets || packet.isWriteAlways())
{
connection.doWrite(packet);
}
@@ -933,9 +932,9 @@
private final Object waitLock = new Object();
private Thread blockThread;
-
+
private ResponseNotifier responseNotifier;
-
+
public Executor getExecutor()
{
return executor;
@@ -946,7 +945,7 @@
{
return sendBlocking(packet, null);
}
-
+
// This must never be called by more than one thread concurrently
public Packet sendBlocking(final Packet packet, final ResponseNotifier notifier) throws MessagingException
{
@@ -958,15 +957,9 @@
try
{
blockThread = Thread.currentThread();
-
+
responseNotifier = notifier;
- if (connection.destroyed)
- {
- throw new MessagingException(MessagingException.NOT_CONNECTED,
- "Cannot write to connection - it is destroyed");
- }
-
response = null;
packet.setChannelID(id);
@@ -1016,7 +1009,8 @@
if (response == null)
{
- throw new MessagingException(MessagingException.CONNECTION_TIMEDOUT, "Timed out waiting for response");
+ throw new MessagingException(MessagingException.CONNECTION_TIMEDOUT,
+ "Timed out waiting for response when sending packet " + packet.getType());
}
if (response.getType() == PacketImpl.EXCEPTION)
@@ -1051,7 +1045,7 @@
}
}
}
-
+
public void replicatePacketBlocking(final Packet packet) throws MessagingException
{
if (replicatingChannel != null)
@@ -1073,22 +1067,22 @@
}
synchronized (connection)
- {
+ {
if (!connection.destroyed && connection.channels.remove(id) == null)
{
throw new IllegalArgumentException("Cannot find channel with id " + id + " to close");
- }
+ }
}
-
+
if (!onExecutorThread)
{
waitForExecutorToComplete();
}
-
+
if (replicatingChannel != null)
{
replicatingChannel.close(false);
-
+
replicatingChannel = null;
}
@@ -1113,7 +1107,7 @@
{
return replicatingChannel;
}
-
+
private void waitForExecutorToComplete()
{
if (executor != null)
@@ -1208,7 +1202,7 @@
}
}
else if (replicatingChannel != null)
- {
+ {
replicatingChannel.send(packet);
}
else
@@ -1261,10 +1255,10 @@
else if (handler != null)
{
if (executor == null)
- {
+ {
checkConfirmation(packet);
-
- handler.handlePacket(packet);
+
+ handler.handlePacket(packet);
}
else
{
@@ -1273,9 +1267,9 @@
public void run()
{
try
- {
+ {
checkConfirmation(packet);
-
+
handler.handlePacket(packet);
}
catch (Exception e)
@@ -1293,12 +1287,16 @@
}
}
+ private volatile Packet lastReceivedPacket;
+
private void checkConfirmation(final Packet packet)
- {
+ {
if (resendCache != null && packet.isRequiresConfirmations())
{
lastReceivedCommandID++;
+ lastReceivedPacket = packet;
+
if (lastReceivedCommandID == nextConfirmation)
{
final Packet confirmed = new PacketsConfirmedMessage(lastReceivedCommandID);
@@ -1319,7 +1317,7 @@
}
private void clearUpTo(final int lastReceivedCommandID)
- {
+ {
final int numberToClear = 1 + lastReceivedCommandID - firstStoredCommandID;
if (numberToClear == -1)
@@ -1333,7 +1331,11 @@
if (packet == null)
{
- throw new IllegalStateException("Can't find packet to clear");
+ throw new IllegalStateException("Can't find packet to clear, client: " + connection.client +
+ " replicating: " + connection.replicating +
+ " last received command id " + lastReceivedCommandID +
+ " first stored command id " + firstStoredCommandID +
+ " channel id " + id);
}
}
@@ -1376,7 +1378,7 @@
// Send ping
final Packet ping = new Ping(expirePeriod);
-
+
pingChannel.send(ping);
}
}
@@ -1394,7 +1396,7 @@
if (stopPinging)
{
future.cancel(true);
- }
+ }
}
else if (type == PING)
{
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionConsumerCloseMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionConsumerCloseMessage.java 2008-10-02 13:43:48 UTC (rev 5060)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionConsumerCloseMessage.java 2008-10-03 10:06:40 UTC (rev 5061)
@@ -69,6 +69,14 @@
{
consumerID = buffer.getLong();
}
+
+ //Needs to be replicated blocking: otherwise, if we do a session.close() and then a session2.deletequeue
+ //from a different session, the session2.deletequeue can reach the backup before the close, and
+ //the delete queue can then fail with "can't delete it has consumers"
+ public boolean isReplicateBlocking()
+ {
+ return true;
+ }
@Override
public String toString()
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionDeleteQueueMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionDeleteQueueMessage.java 2008-10-02 13:43:48 UTC (rev 5060)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionDeleteQueueMessage.java 2008-10-03 10:06:40 UTC (rev 5061)
@@ -80,6 +80,13 @@
queueName = buffer.getSimpleString();
}
+ //Needs to be true so we can ensure packet has reached backup before we start sending messages to it from another
+ //session
+ public boolean isReplicateBlocking()
+ {
+ return true;
+ }
+
public boolean equals(Object other)
{
if (other instanceof SessionDeleteQueueMessage == false)
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionRemoveDestinationMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionRemoveDestinationMessage.java 2008-10-02 13:43:48 UTC (rev 5060)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionRemoveDestinationMessage.java 2008-10-03 10:06:40 UTC (rev 5061)
@@ -84,6 +84,13 @@
address = buffer.getSimpleString();
durable = buffer.getBoolean();
}
+
+ //Needs to be true so we can ensure packet has reached backup before we start sending messages to it from another
+ //session
+ public boolean isReplicateBlocking()
+ {
+ return true;
+ }
@Override
public String toString()
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2008-10-02 13:43:48 UTC (rev 5060)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2008-10-03 10:06:40 UTC (rev 5061)
@@ -393,7 +393,7 @@
{
throw new IllegalArgumentException("Cannot find session with name " + name + " to reattach");
}
-
+
// Reconnect the channel to the new connection
session.transferConnection(connection);
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java 2008-10-02 13:43:48 UTC (rev 5060)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java 2008-10-03 10:06:40 UTC (rev 5061)
@@ -181,7 +181,7 @@
{
SessionDeleteQueueMessage request = (SessionDeleteQueueMessage)packet;
session.deleteQueue(request.getQueueName());
- response = new NullResponseMessage(false);
+ response = new NullResponseMessage(true);
break;
}
case SESS_QUEUEQUERY:
@@ -312,7 +312,7 @@
{
SessionRemoveDestinationMessage message = (SessionRemoveDestinationMessage)packet;
session.removeDestination(message.getAddress(), message.isDurable());
- response = new NullResponseMessage(false);
+ response = new NullResponseMessage(true);
break;
}
case SESS_START:
@@ -336,7 +336,7 @@
{
SessionConsumerCloseMessage message = (SessionConsumerCloseMessage)packet;
session.closeConsumer(message.getConsumerID());
- response = new NullResponseMessage(false);
+ response = new NullResponseMessage(true);
break;
}
case SESS_PRODUCER_CLOSE:
Modified: trunk/src/main/org/jboss/messaging/util/OrderedExecutorFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/util/OrderedExecutorFactory.java 2008-10-02 13:43:48 UTC (rev 5060)
+++ trunk/src/main/org/jboss/messaging/util/OrderedExecutorFactory.java 2008-10-03 10:06:40 UTC (rev 5061)
@@ -18,22 +18,18 @@
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
+ */
package org.jboss.messaging.util;
-import java.util.Collections;
-import java.util.HashSet;
import java.util.LinkedList;
-import java.util.Set;
import java.util.concurrent.Executor;
/**
- * This factory creates a hierarchy of Executor which shares the threads of the
- * parent Executor (typically, the root parent is a Thread pool).
- *
+ * A factory for producing executors that run all tasks in order, which delegate to a single common executor instance.
+ *
* @author <a href="david.lloyd at jboss.com">David Lloyd</a>
- * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
*
* @version <tt>$Revision$</tt>
*
@@ -41,49 +37,97 @@
public final class OrderedExecutorFactory implements ExecutorFactory
{
private final Executor parent;
- private final Set<ChildExecutor> runningChildren = Collections.synchronizedSet(new HashSet<ChildExecutor>());
+ /**
+ * Construct a new instance delegating to the given parent executor.
+ *
+ * @param parent the parent executor
+ */
public OrderedExecutorFactory(final Executor parent)
{
this.parent = parent;
}
+ /**
+ * Get an executor that always executes tasks in order.
+ *
+ * @return an ordered executor
+ */
public Executor getExecutor()
{
- return new ChildExecutor();
+ return new OrderedExecutor(parent);
}
- private final class ChildExecutor implements Executor, Runnable
+ /**
+ * An executor that always runs all tasks in order, using a delegate executor to run the tasks.
+ * <p/>
+ * More specifically, any call B to the {@link #execute(Runnable)} method that happens-after another call A to the
+ * same method, will result in B's task running after A's.
+ */
+ private static final class OrderedExecutor implements Executor
{
+ // @protectedby tasks
private final LinkedList<Runnable> tasks = new LinkedList<Runnable>();
- public void execute(Runnable command)
+ // @protectedby tasks
+ private boolean running;
+
+ private final Executor parent;
+
+ private final Runnable runner;
+
+ /**
+ * Construct a new instance.
+ *
+ * @param parent the parent executor
+ */
+ public OrderedExecutor(final Executor parent)
{
- synchronized (tasks)
+ this.parent = parent;
+ runner = new Runnable()
{
- tasks.add(command);
- if (tasks.size() == 1 && runningChildren.add(this))
+ public void run()
{
- parent.execute(this);
+ for (;;)
+ {
+ final Runnable task;
+ synchronized (tasks)
+ {
+ task = tasks.poll();
+ if (task == null)
+ {
+ running = false;
+ return;
+ }
+ }
+ try
+ {
+ task.run();
+ }
+ catch (Throwable t)
+ {
+ // eat it!
+ }
+ }
}
- }
+ };
}
- public void run()
+ /**
+ * Run a task.
+ *
+ * @param command the task to run.
+ */
+ public void execute(Runnable command)
{
- for (;;)
+ synchronized (tasks)
{
- final Runnable task;
- synchronized (tasks)
+ tasks.add(command);
+ if (!running)
{
- task = tasks.poll();
- if (task == null)
- {
- runningChildren.remove(this);
- return;
- }
+ running = true;
+ parent.execute(runner);
}
- task.run();
}
}
}
More information about the jboss-cvs-commits
mailing list