[jboss-cvs] JBoss Messaging SVN: r7381 - in trunk/src: main/org/jboss/messaging/core/remoting and 1 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Jun 17 12:44:51 EDT 2009
Author: timfox
Date: 2009-06-17 12:44:51 -0400 (Wed, 17 Jun 2009)
New Revision: 7381
Added:
trunk/src/main/org/jboss/messaging/core/remoting/impl/ChannelImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/PacketDecoder.java
Modified:
trunk/src/config/common/version.properties
trunk/src/main/org/jboss/messaging/core/remoting/Channel.java
trunk/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
Log:
factored channel out of remoting connection
Modified: trunk/src/config/common/version.properties
===================================================================
--- trunk/src/config/common/version.properties 2009-06-17 13:43:23 UTC (rev 7380)
+++ trunk/src/config/common/version.properties 2009-06-17 16:44:51 UTC (rev 7381)
@@ -1,7 +1,7 @@
-messaging.version.versionName=larvae
+messaging.version.versionName=maggot
messaging.version.majorVersion=2
messaging.version.minorVersion=0
messaging.version.microVersion=0
-messaging.version.incrementingVersion=103
-messaging.version.versionSuffix=BETA2
-messaging.version.versionTag=beta2
+messaging.version.incrementingVersion=104
+messaging.version.versionSuffix=BETA3
+messaging.version.versionTag=beta3
Modified: trunk/src/main/org/jboss/messaging/core/remoting/Channel.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/Channel.java 2009-06-17 13:43:23 UTC (rev 7380)
+++ trunk/src/main/org/jboss/messaging/core/remoting/Channel.java 2009-06-17 16:44:51 UTC (rev 7381)
@@ -60,5 +60,9 @@
void setCommandConfirmationHandler(CommandConfirmationHandler handler);
- void flushConfirmations();
+ void flushConfirmations();
+
+ void handlePacket(Packet packet);
+
+ void waitForAllReplicationResponse();
}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java 2009-06-17 13:43:23 UTC (rev 7380)
+++ trunk/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java 2009-06-17 16:44:51 UTC (rev 7381)
@@ -32,6 +32,10 @@
String getRemoteAddress();
Channel getChannel(long channelID, int windowSize, boolean block);
+
+ void putChannel(long channelID, Channel channel);
+
+ boolean removeChannel(long channelID);
long generateChannelID();
@@ -62,4 +66,14 @@
void freeze();
Connection getTransportConnection();
+
+ boolean isActive();
+
+ boolean isClient();
+
+ boolean isDestroyed();
+
+ long getBlockingCallTimeout();
+
+ Object getTransferLock();
}
Added: trunk/src/main/org/jboss/messaging/core/remoting/impl/ChannelImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/ChannelImpl.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/ChannelImpl.java 2009-06-17 16:44:51 UTC (rev 7381)
@@ -0,0 +1,766 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.remoting.impl;
+
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.EARLY_RESPONSE;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.PACKETS_CONFIRMED;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATION_RESPONSE;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.Channel;
+import org.jboss.messaging.core.remoting.ChannelHandler;
+import org.jboss.messaging.core.remoting.CommandConfirmationHandler;
+import org.jboss.messaging.core.remoting.Interceptor;
+import org.jboss.messaging.core.remoting.Packet;
+import org.jboss.messaging.core.remoting.RemotingConnection;
+import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
+import org.jboss.messaging.core.remoting.impl.wireformat.PacketsConfirmedMessage;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+
+/**
+ * A ChannelImpl
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ *
+ */
+public class ChannelImpl implements Channel
+{
+ private static final Logger log = Logger.getLogger(ChannelImpl.class);
+
+ // Channel id stamped on outgoing packets; reassigned by transferConnection()
+ private volatile long id;
+
+ // Target for non-response packets in handlePacket(); handlePacket() null-checks it on that path
+ private ChannelHandler handler;
+
+ // Response handed to the thread waiting in sendBlocking(); reset to null before each blocking write
+ private Packet response;
+
+ // Packets sent with isRequiresConfirmations() == true, held until clearUpTo() polls them off.
+ // Null when the channel was created with windowSize == -1.
+ private final java.util.Queue<Packet> resendCache;
+
+ // Advanced by clearUpTo() by the number of packets it removed from resendCache
+ // (presumably the command id of the packet at the head of resendCache - confirm)
+ private volatile int firstStoredCommandID;
+
+ // Incremented by confirm() for every received packet that requires confirmation; -1 means none yet
+ private volatile int lastReceivedCommandID = -1;
+
+ // Connection used for all writes; swapped by transferConnection()
+ private volatile RemotingConnection connection;
+
+ // Set by close(); checked by sendBlocking()
+ private volatile boolean closed;
+
+ // Held while writing in send()/sendBlocking(), while waiting on the two conditions below,
+ // and while toggling failingOver
+ private final Lock lock = new ReentrantLock();
+
+ // Signalled by handlePacket() and returnBlocking() to wake the thread waiting in sendBlocking()
+ private final Condition sendCondition = lock.newCondition();
+
+ // Signalled by unlock() to wake senders that are waiting for failingOver to clear
+ private final Condition failoverCondition = lock.newCondition();
+
+ // Monitor serialising send(Packet, boolean)
+ private final Object sendLock = new Object();
+
+ // Monitor serialising sendBlocking(Packet)
+ private final Object sendBlockingLock = new Object();
+
+ // Monitor used around responseActionCount and playedResponsesOnFailure, and around the queue/write
+ // step in replicatePacket(); also the object waited on by waitForAllReplicationResponse()
+ private final Object replicationLock = new Object();
+
+ // True between lock() and unlock(); send() and sendBlocking() wait while it is set
+ private boolean failingOver;
+
+ // Actions queued by replicatePacket(), run one per REPLICATION_RESPONSE or all at once by
+ // doExecuteOutstandingDelayedResults()
+ private final Queue<Runnable> responseActions = new ConcurrentLinkedQueue<Runnable>();
+
+ // Constructor argument; -1 disables both the resend cache and the send semaphore
+ private final int windowSize;
+
+ // 75% of windowSize; confirm() sends a PacketsConfirmedMessage once receivedBytes reaches it
+ private final int confWindowSize;
+
+ // Created with windowSize permits when windowSize != -1 and block == true, otherwise null.
+ // Permits are acquired per encoded packet size and released by clearUpTo().
+ private final Semaphore sendSemaphore;
+
+ // Sum of getPacketSize() of packets seen by confirm() since the last confirmation was written
+ private int receivedBytes;
+
+ // Optional callback invoked by clearUpTo() for every packet removed from resendCache
+ private CommandConfirmationHandler commandConfirmationHandler;
+
+ // Number of queued response actions that have not completed yet
+ private int responseActionCount;
+
+ // Set by doExecuteOutstandingDelayedResults(); afterwards replicatePacket() runs actions immediately
+ private boolean playedResponsesOnFailure;
+
+ public ChannelImpl(final RemotingConnection connection, final long id, final int windowSize, final boolean block)
+ {
+ this.connection = connection;
+
+ this.id = id;
+
+ this.windowSize = windowSize;
+
+ this.confWindowSize = (int)(0.75 * windowSize);
+
+ if (this.windowSize != -1)
+ {
+ resendCache = new ConcurrentLinkedQueue<Packet>();
+
+ if (block)
+ {
+ sendSemaphore = new Semaphore(windowSize, true);
+ }
+ else
+ {
+ sendSemaphore = null;
+ }
+ }
+ else
+ {
+ resendCache = null;
+
+ sendSemaphore = null;
+ }
+ }
+
+ /**
+  * @return the id currently assigned to this channel
+  */
+ public long getID()
+ {
+    return this.id;
+ }
+
+ /**
+  * @return the value of the received-command counter maintained by confirm(); -1 if it has not
+  *         been incremented yet
+  */
+ public int getLastReceivedCommandID()
+ {
+    return this.lastReceivedCommandID;
+ }
+
+ /**
+  * @return the lock this channel holds while writing packets and waiting for responses
+  */
+ public Lock getLock()
+ {
+    return this.lock;
+ }
+
+ /**
+  * Publishes a synthetic EARLY_RESPONSE packet as the current response and signals the
+  * send condition, so a thread waiting in sendBlocking() stops waiting.
+  */
+ public void returnBlocking()
+ {
+    final Packet earlyResponse = new PacketImpl(EARLY_RESPONSE);
+
+    lock.lock();
+
+    try
+    {
+       response = earlyResponse;
+
+       sendCondition.signal();
+    }
+    finally
+    {
+       lock.unlock();
+    }
+ }
+
+ /**
+  * Sends the packet and asks the transport to flush the write.
+  *
+  * @param packet the packet to send
+  */
+ public void sendAndFlush(final Packet packet)
+ {
+    final boolean flush = true;
+
+    send(packet, flush);
+ }
+
+ /**
+  * Sends the packet without requesting a flush of the transport.
+  *
+  * @param packet the packet to send
+  */
+ public void send(final Packet packet)
+ {
+    final boolean flush = false;
+
+    send(packet, flush);
+ }
+
+ /**
+  * Encodes the packet and writes it to the transport connection.
+  * <p>
+  * Calls are serialised on {@code sendLock}. When a send semaphore is configured, the encoded
+  * size is acquired from it first (except for PACKETS_CONFIRMED packets). The method then waits
+  * while a failover is in progress, adds the packet to the resend cache if it requires
+  * confirmation, and writes it if the connection is active or the packet is marked write-always.
+  *
+  * @param packet the packet to send; its channel id is overwritten with this channel's id
+  * @param flush  passed through to the transport write
+  * @throws IllegalStateException if the thread is interrupted while waiting for send permits;
+  *                               the interrupt status is restored before throwing
+  */
+ public void send(final Packet packet, final boolean flush)
+ {
+    synchronized (sendLock)
+    {
+       packet.setChannelID(id);
+
+       final MessagingBuffer buffer = connection.getTransportConnection()
+                                                .createBuffer(packet.getRequiredBufferSize());
+
+       int size = packet.encode(buffer);
+
+       // Must block on semaphore outside the main lock or this can prevent failover from occurring
+       if (sendSemaphore != null && packet.getType() != PACKETS_CONFIRMED)
+       {
+          try
+          {
+             sendSemaphore.acquire(size);
+          }
+          catch (InterruptedException e)
+          {
+             // Restore the interrupt status so code further up the stack can still observe it,
+             // and keep the original exception as the cause
+             Thread.currentThread().interrupt();
+
+             throw new IllegalStateException("Semaphore interrupted", e);
+          }
+       }
+
+       lock.lock();
+
+       try
+       {
+          while (failingOver)
+          {
+             // TODO - don't hardcode this timeout
+             try
+             {
+                failoverCondition.await(10000, TimeUnit.MILLISECONDS);
+             }
+             catch (InterruptedException e)
+             {
+                // Ignored on purpose: the loop re-checks failingOver and waits again
+             }
+          }
+
+          if (resendCache != null && packet.isRequiresConfirmations())
+          {
+             resendCache.add(packet);
+          }
+
+          if (connection.isActive() || packet.isWriteAlways())
+          {
+             connection.getTransportConnection().write(buffer, flush);
+          }
+       }
+       finally
+       {
+          lock.unlock();
+       }
+    }
+ }
+
+ /**
+  * Sends the packet and waits for a response packet to arrive on this channel.
+  * <p>
+  * Calls are serialised on {@code sendBlockingLock}. The wait is bounded by the connection's
+  * blocking call timeout and ends early if the channel is closed.
+  *
+  * @param packet the packet to send; its channel id is overwritten with this channel's id
+  * @return the response packet
+  * @throws MessagingException    NOT_CONNECTED if the channel is already closed;
+  *                               CONNECTION_TIMEDOUT if no response was available when the wait
+  *                               ended; or the exception carried by an EXCEPTION response
+  * @throws IllegalStateException if the connection's blocking call timeout is -1, or if the thread
+  *                               is interrupted while waiting for send permits (the interrupt
+  *                               status is restored before throwing)
+  */
+ public Packet sendBlocking(final Packet packet) throws MessagingException
+ {
+    if (closed)
+    {
+       throw new MessagingException(MessagingException.NOT_CONNECTED, "Connection is destroyed");
+    }
+
+    if (connection.getBlockingCallTimeout() == -1)
+    {
+       throw new IllegalStateException("Cannot do a blocking call timeout on a server side connection");
+    }
+
+    // Synchronized since can't be called concurrently by more than one thread and this can occur
+    // E.g. blocking acknowledge() from inside a message handler at some time as other operation on main thread
+    synchronized (sendBlockingLock)
+    {
+       packet.setChannelID(id);
+
+       final MessagingBuffer buffer = connection.getTransportConnection()
+                                                .createBuffer(packet.getRequiredBufferSize());
+
+       int size = packet.encode(buffer);
+
+       // Must block on semaphore outside the main lock or this can prevent failover from occurring
+       if (sendSemaphore != null)
+       {
+          try
+          {
+             sendSemaphore.acquire(size);
+          }
+          catch (InterruptedException e)
+          {
+             // Restore the interrupt status so code further up the stack can still observe it,
+             // and keep the original exception as the cause
+             Thread.currentThread().interrupt();
+
+             throw new IllegalStateException("Semaphore interrupted", e);
+          }
+       }
+
+       lock.lock();
+
+       try
+       {
+          while (failingOver)
+          {
+             // TODO - don't hardcode this timeout
+             try
+             {
+                failoverCondition.await(10000, TimeUnit.MILLISECONDS);
+             }
+             catch (InterruptedException e)
+             {
+                // Ignored on purpose: the loop re-checks failingOver and waits again
+             }
+          }
+
+          // Discard any response left over from an earlier call before writing
+          response = null;
+
+          if (resendCache != null && packet.isRequiresConfirmations())
+          {
+             resendCache.add(packet);
+          }
+
+          connection.getTransportConnection().write(buffer);
+
+          long toWait = connection.getBlockingCallTimeout();
+
+          long start = System.currentTimeMillis();
+
+          while (response == null && toWait > 0)
+          {
+             try
+             {
+                sendCondition.await(toWait, TimeUnit.MILLISECONDS);
+             }
+             catch (InterruptedException e)
+             {
+                // Ignored on purpose: the remaining time is recomputed below and the wait resumes
+             }
+
+             if (closed)
+             {
+                break;
+             }
+
+             final long now = System.currentTimeMillis();
+
+             // Deduct the time already spent so wake-ups without a response do not extend the wait
+             toWait -= now - start;
+
+             start = now;
+          }
+
+          if (response == null)
+          {
+             throw new MessagingException(MessagingException.CONNECTION_TIMEDOUT,
+                                          "Timed out waiting for response when sending packet " + packet.getType());
+          }
+
+          if (response.getType() == PacketImpl.EXCEPTION)
+          {
+             final MessagingExceptionMessage mem = (MessagingExceptionMessage)response;
+
+             throw mem.getException();
+          }
+          else
+          {
+             return response;
+          }
+       }
+       finally
+       {
+          lock.unlock();
+       }
+    }
+ }
+
+ // Must be synchronized since can be called by incoming session commands but also by deliveries
+ // Also needs to be synchronized with respect to replicatingChannelDead
+ public void replicatePacket(final Packet packet, final long replicatedChannelID, final Runnable action)
+ {
+ packet.setChannelID(replicatedChannelID);
+
+ boolean runItNow = false;
+
+ synchronized (replicationLock)
+ {
+ if (playedResponsesOnFailure && action != null)
+ {
+ // Already replicating channel failed, so just play the action now
+
+ runItNow = true;
+ }
+ else
+ {
+ if (action != null)
+ {
+ responseActions.add(action);
+
+ responseActionCount++;
+ }
+
+ final MessagingBuffer buffer = connection.getTransportConnection()
+ .createBuffer(packet.getRequiredBufferSize());
+
+ packet.encode(buffer);
+
+ connection.getTransportConnection().write(buffer);
+ }
+ }
+
+ // Execute outside lock
+
+ if (runItNow)
+ {
+ action.run();
+ }
+ }
+
+ /**
+  * Registers the callback that clearUpTo() invokes for every packet it removes from the
+  * resend cache.
+  *
+  * @param handler the callback to install
+  */
+ public void setCommandConfirmationHandler(final CommandConfirmationHandler handler)
+ {
+    commandConfirmationHandler = handler;
+ }
+
+ /**
+  * Starts a new thread that runs all response actions still queued on this channel.
+  * A separate thread is used to avoid deadlock.
+  */
+ public void executeOutstandingDelayedResults()
+ {
+    final Runnable drainTask = new Runnable()
+    {
+       public void run()
+       {
+          doExecuteOutstandingDelayedResults();
+       }
+    };
+
+    new Thread(drainTask).start();
+ }
+
+ private void doExecuteOutstandingDelayedResults()
+ {
+ List<Runnable> toRun = new ArrayList<Runnable>();
+
+ synchronized (replicationLock)
+ {
+ // Execute all the response actions now
+
+ while (true)
+ {
+ Runnable action = responseActions.poll();
+
+ if (action != null)
+ {
+ toRun.add(action);
+ }
+ else
+ {
+ break;
+ }
+ }
+
+ responseActionCount = 0;
+
+ playedResponsesOnFailure = true;
+
+ for (Runnable action : toRun)
+ {
+ action.run();
+ }
+ }
+
+ }
+
+ /**
+  * Installs the handler that handlePacket() forwards packets to.
+  *
+  * @param handler the handler to install
+  */
+ public void setHandler(final ChannelHandler handler)
+ {
+    ChannelImpl.this.handler = handler;
+ }
+
+ /**
+  * @return the handler currently installed on this channel
+  */
+ public ChannelHandler getHandler()
+ {
+    return this.handler;
+ }
+
+ /**
+  * Closes the channel. Does nothing if it is already closed.
+  * <p>
+  * Unless the connection is destroyed, the channel is removed from the connection first.
+  *
+  * @throws IllegalArgumentException if the connection is not destroyed and it does not know a
+  *                                  channel with this id
+  */
+ public void close()
+ {
+    if (!closed)
+    {
+       final boolean connectionAlive = !connection.isDestroyed();
+
+       if (connectionAlive && !connection.removeChannel(id))
+       {
+          throw new IllegalArgumentException("Cannot find channel with id " + id + " to close");
+       }
+
+       closed = true;
+    }
+ }
+
+ /**
+  * Moves this channel from its current connection to {@code newConnection} under a new id.
+  * <p>
+  * The channel is removed from the old connection, optionally waits for outstanding replication
+  * responses, and is then registered on the new connection.
+  *
+  * @param newConnection      the connection to attach to; any RemotingConnection implementation
+  *                           is accepted
+  * @param newChannelID       the id this channel has on the new connection
+  * @param replicatingChannel channel whose replication responses must be awaited first, or null
+  */
+ public void transferConnection(final RemotingConnection newConnection,
+                                final long newChannelID,
+                                final Channel replicatingChannel)
+ {
+    // Needs to synchronize on the connection to make sure no packets from
+    // the old connection get processed after transfer has occurred
+    synchronized (connection.getTransferLock())
+    {
+       connection.removeChannel(id);
+
+       if (replicatingChannel != null)
+       {
+          // If we're reconnecting to a live node which is replicated then there will be a replicating channel
+          // too. We need to then make sure that all replication responses come back since packets aren't
+          // considered confirmed until response comes back and is processed. Otherwise responses to previous
+          // message sends could come back after reconnection resulting in clients resending same message
+          // since it wasn't confirmed yet.
+          replicatingChannel.waitForAllReplicationResponse();
+       }
+
+       // And switch it. putChannel() is part of the RemotingConnection interface, so no downcast
+       // to the concrete implementation is required.
+       newConnection.putChannel(newChannelID, this);
+
+       connection = newConnection;
+
+       this.id = newChannelID;
+    }
+ }
+
+ /**
+  * Drops the cached packets the other side has already received and re-writes the rest under
+  * the new channel id.
+  * <p>
+  * Does nothing when this channel has no resend cache (created with a window size of -1).
+  *
+  * @param otherLastReceivedCommandID id of the last command the other side reports as received
+  * @param newChannelID               channel id to stamp on every replayed packet
+  */
+ public void replayCommands(final int otherLastReceivedCommandID, final long newChannelID)
+ {
+    if (resendCache == null)
+    {
+       // Nothing was ever cached, so there is nothing to clear or replay
+       return;
+    }
+
+    clearUpTo(otherLastReceivedCommandID);
+
+    for (final Packet packet : resendCache)
+    {
+       packet.setChannelID(newChannelID);
+
+       doWrite(packet);
+    }
+ }
+
+ /**
+  * Marks the channel as failing over. While the flag is set, send() and sendBlocking() wait
+  * before writing.
+  */
+ public void lock()
+ {
+    lock.lock();
+
+    try
+    {
+       failingOver = true;
+    }
+    finally
+    {
+       lock.unlock();
+    }
+ }
+
+ /**
+  * Clears the failing-over flag and wakes every sender waiting for it.
+  */
+ public void unlock()
+ {
+    lock.lock();
+
+    try
+    {
+       failingOver = false;
+
+       failoverCondition.signalAll();
+    }
+    finally
+    {
+       lock.unlock();
+    }
+ }
+
+ /**
+  * @return the connection this channel currently writes to
+  */
+ public RemotingConnection getConnection()
+ {
+    return this.connection;
+ }
+
+ /**
+  * Writes a PacketsConfirmedMessage for the last received command id if any bytes have been
+  * received since the previous confirmation and the connection is active. The received-bytes
+  * counter is reset when the confirmation is written.
+  */
+ public void flushConfirmations()
+ {
+    if (receivedBytes == 0 || !connection.isActive())
+    {
+       // Nothing unconfirmed, or the connection is not active
+       return;
+    }
+
+    receivedBytes = 0;
+
+    final Packet confirmation = new PacketsConfirmedMessage(lastReceivedCommandID);
+
+    confirmation.setChannelID(id);
+
+    doWrite(confirmation);
+ }
+
+ /**
+  * Records receipt of a packet that requires confirmation.
+  * <p>
+  * Increments the last received command id and adds the packet size to the received-bytes
+  * counter. Once that counter reaches the confirmation window it is reset and, if the
+  * connection is active, a PacketsConfirmedMessage is written. Packets that do not require
+  * confirmation, and channels without a resend cache, are ignored.
+  *
+  * @param packet the packet that was received
+  */
+ public void confirm(final Packet packet)
+ {
+    if (resendCache == null || !packet.isRequiresConfirmations())
+    {
+       return;
+    }
+
+    lastReceivedCommandID++;
+
+    receivedBytes += packet.getPacketSize();
+
+    if (receivedBytes < confWindowSize)
+    {
+       // Window not filled yet, keep accumulating
+       return;
+    }
+
+    receivedBytes = 0;
+
+    if (!connection.isActive())
+    {
+       return;
+    }
+
+    final Packet confirmation = new PacketsConfirmedMessage(lastReceivedCommandID);
+
+    confirmation.setChannelID(id);
+
+    doWrite(confirmation);
+ }
+
+ /**
+  * Dispatches a packet received on this channel.
+  * <ul>
+  * <li>PACKETS_CONFIRMED: clears the resend cache up to the confirmed command id (when a cache
+  * exists) and, when the connection is not a client connection, forwards the packet to the handler.</li>
+  * <li>REPLICATION_RESPONSE: runs the next queued response action.</li>
+  * <li>Response packets: confirmed, then published to the thread waiting in sendBlocking().</li>
+  * <li>Anything else: forwarded to the handler if one is set.</li>
+  * </ul>
+  * For the last two cases replicateComplete() is called afterwards.
+  *
+  * @param packet the received packet
+  */
+ public void handlePacket(final Packet packet)
+ {
+    if (packet.getType() == PACKETS_CONFIRMED)
+    {
+       if (resendCache != null)
+       {
+          final PacketsConfirmedMessage msg = (PacketsConfirmedMessage)packet;
+
+          clearUpTo(msg.getCommandID());
+       }
+
+       if (!connection.isClient())
+       {
+          handler.handlePacket(packet);
+       }
+
+       return;
+    }
+
+    if (packet.getType() == REPLICATION_RESPONSE)
+    {
+       replicateResponseReceived();
+
+       return;
+    }
+
+    if (packet.isResponse())
+    {
+       confirm(packet);
+
+       lock.lock();
+
+       try
+       {
+          // Publish the response while holding the lock, like every other access to this field,
+          // so the waiting thread sees the assignment and the signal together
+          response = packet;
+
+          sendCondition.signal();
+       }
+       finally
+       {
+          lock.unlock();
+       }
+    }
+    else if (handler != null)
+    {
+       handler.handlePacket(packet);
+    }
+
+    replicateComplete();
+ }
+
+ /**
+  * Blocks until every outstanding replication response action has completed, or until a
+  * 10 second budget has been used up, whichever comes first. A warning is logged if actions
+  * are still outstanding when the wait ends.
+  */
+ public void waitForAllReplicationResponse()
+ {
+    synchronized (replicationLock)
+    {
+       long toWait = 10000; // TODO don't hardcode timeout
+
+       long start = System.currentTimeMillis();
+
+       while (responseActionCount > 0 && toWait > 0)
+       {
+          try
+          {
+             // Bounded wait: without the timeout argument a missing notification would block
+             // this thread forever and the budget above would never be enforced.
+             // toWait is > 0 here, so this can never become the "wait indefinitely" wait(0).
+             replicationLock.wait(toWait);
+          }
+          catch (InterruptedException e)
+          {
+             // Ignored on purpose: the remaining time is recomputed below and the wait resumes
+          }
+
+          long now = System.currentTimeMillis();
+
+          toWait -= now - start;
+
+          start = now;
+       }
+
+       // Only report a timeout when responses really are still missing
+       if (responseActionCount > 0)
+       {
+          log.warn("Timed out waiting for replication responses to return");
+       }
+    }
+ }
+
+ private void replicateComplete()
+ {
+ if (!connection.isActive() && id != 0)
+ {
+ // We're on backup and not ping channel so send back a replication response
+
+ Packet packet = new PacketImpl(REPLICATION_RESPONSE);
+
+ packet.setChannelID(2);
+
+ doWrite(packet);
+ }
+ }
+
+ // This will never get called concurrently by more than one thread
+
+ // TODO it's not ideal synchronizing this since it forms a contention point with replication
+ // but we need to do this to protect it w.r.t. the check on replicatingChannel
+ private void replicateResponseReceived()
+ {
+ Runnable result = null;
+
+ synchronized (replicationLock)
+ {
+ if (playedResponsesOnFailure)
+ {
+ return;
+ }
+
+ result = responseActions.poll();
+
+ if (result == null)
+ {
+ throw new IllegalStateException("Cannot find response action");
+ }
+ }
+
+ // Must execute outside of lock
+ if (result != null)
+ {
+ result.run();
+
+ // TODO - we can optimise this not to lock every time - only if waiting for all replications to return
+ synchronized (replicationLock)
+ {
+ responseActionCount--;
+
+ if (responseActionCount == 0)
+ {
+ replicationLock.notify();
+ }
+ }
+ }
+ }
+
+ private void doWrite(final Packet packet)
+ {
+ final MessagingBuffer buffer = connection.getTransportConnection().createBuffer(packet.getRequiredBufferSize());
+
+ packet.encode(buffer);
+
+ connection.getTransportConnection().write(buffer);
+ }
+
+ /**
+  * Removes packets from the head of the resend cache up to and including the given command id.
+  * <p>
+  * Each removed packet is reported to the command confirmation handler, if one is set. The
+  * sizes of the removed packets (PACKETS_CONFIRMED packets excluded) are summed and released
+  * back to the send semaphore when there is one. Callers must only invoke this when the resend
+  * cache exists.
+  *
+  * @param lastReceivedCommandID id of the last command the other side has received
+  * @throws IllegalArgumentException if the id lies exactly two below the first stored command id
+  * @throws IllegalStateException    if the cache holds fewer packets than need clearing
+  */
+ private void clearUpTo(final int lastReceivedCommandID)
+ {
+    final int numberToClear = 1 + lastReceivedCommandID - firstStoredCommandID;
+
+    // NOTE(review): only the value -1 is rejected here; other negative values fall through
+    if (numberToClear == -1)
+    {
+       throw new IllegalArgumentException("Invalid lastReceivedCommandID: " + lastReceivedCommandID);
+    }
+
+    int bytesToRelease = 0;
+
+    for (int remaining = numberToClear; remaining > 0; remaining--)
+    {
+       final Packet confirmedPacket = resendCache.poll();
+
+       if (confirmedPacket == null)
+       {
+          throw new IllegalStateException(System.identityHashCode(this) + " Can't find packet to clear: " +
+                                          " last received command id " +
+                                          lastReceivedCommandID +
+                                          " first stored command id " +
+                                          firstStoredCommandID);
+       }
+
+       // Confirmation packets never took permits from the semaphore in send()
+       if (confirmedPacket.getType() != PACKETS_CONFIRMED)
+       {
+          bytesToRelease += confirmedPacket.getPacketSize();
+       }
+
+       if (commandConfirmationHandler != null)
+       {
+          commandConfirmationHandler.commandConfirmed(confirmedPacket);
+       }
+    }
+
+    firstStoredCommandID += numberToClear;
+
+    if (sendSemaphore != null)
+    {
+       sendSemaphore.release(bytesToRelease);
+    }
+ }
+}
Added: trunk/src/main/org/jboss/messaging/core/remoting/impl/PacketDecoder.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/PacketDecoder.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/PacketDecoder.java 2009-06-17 16:44:51 UTC (rev 7381)
@@ -0,0 +1,449 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.remoting.impl;
+
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.CREATESESSION;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.CREATESESSION_RESP;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.CREATE_QUEUE;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.DELETE_QUEUE;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.DISCONNECT;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.EXCEPTION;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.NULL_RESPONSE;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.PACKETS_CONFIRMED;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.PING;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REATTACH_SESSION;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REATTACH_SESSION_RESP;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATE_ACKNOWLEDGE;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATE_ADD_REMOTE_CONSUMER;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATE_ADD_REMOTE_QUEUE_BINDING;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATE_CREATESESSION;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATE_REDISTRIBUTION;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATE_REMOVE_REMOTE_CONSUMER;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATE_REMOVE_REMOTE_QUEUE_BINDING;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATE_STARTUP_INFO;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATION_RESPONSE;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_ACKNOWLEDGE;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_BINDINGQUERY;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_BINDINGQUERY_RESP;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CLOSE;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_COMMIT;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CONSUMER_CLOSE;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CREATECONSUMER;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_EXPIRED;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_FAILOVER_COMPLETE;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_FLOWTOKEN;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_QUEUEQUERY;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_QUEUEQUERY_RESP;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_RECEIVE_CONTINUATION;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_RECEIVE_MSG;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_REPLICATE_DELIVERY;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_ROLLBACK;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_SEND;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_SEND_CONTINUATION;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_SEND_LARGE;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_START;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_STOP;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_COMMIT;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_END;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_FORGET;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_GET_TIMEOUT;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_GET_TIMEOUT_RESP;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_INDOUBT_XIDS;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_INDOUBT_XIDS_RESP;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_JOIN;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_PREPARE;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_RESP;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_RESUME;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_ROLLBACK;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_SET_TIMEOUT;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_SET_TIMEOUT_RESP;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_START;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_SUSPEND;
+
+import org.jboss.messaging.core.remoting.Packet;
+import org.jboss.messaging.core.remoting.impl.wireformat.CreateQueueMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.CreateSessionMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.CreateSessionResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.NullResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
+import org.jboss.messaging.core.remoting.impl.wireformat.PacketsConfirmedMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.Ping;
+import org.jboss.messaging.core.remoting.impl.wireformat.ReattachSessionMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.ReattachSessionResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.ReplicateCreateSessionMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.RollbackMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionAcknowledgeMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionBindingQueryMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionBindingQueryResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionCloseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerCloseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerFlowCreditMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateConsumerMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionDeleteQueueMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionExpiredMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionFailoverCompleteMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionReceiveContinuationMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionReceiveMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendContinuationMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendLargeMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXACommitMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAEndMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAForgetMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAGetInDoubtXidsResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAGetTimeoutResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAJoinMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAPrepareMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAResumeMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXARollbackMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXASetTimeoutMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXASetTimeoutResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAStartMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateAcknowledgeMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateRedistributionMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateRemoteBindingAddedMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateRemoteBindingRemovedMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateRemoteConsumerAddedMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateRemoteConsumerRemovedMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateStartupInfoMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.replication.SessionReplicateDeliveryMessage;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+
+/**
+ * Turns the bytes of an incoming buffer into a {@link Packet}.
+ * <p>
+ * The first byte read from the buffer is the packet type; it selects which packet class
+ * is instantiated. The new instance then decodes itself from the same buffer.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ */
+public class PacketDecoder
+{
+   /**
+    * Reads one packet from the buffer.
+    *
+    * @param in the buffer to read from; one byte (the packet type) is read from it here,
+    *           then the buffer is handed to the new packet's own decode method
+    * @return the decoded packet
+    * @throws IllegalArgumentException if the type byte does not match any known packet type
+    */
+   public Packet decode(final MessagingBuffer in)
+   {
+      final byte packetType = in.readByte();
+
+      final Packet packet = newPacket(packetType);
+
+      packet.decode(in);
+
+      return packet;
+   }
+
+   /**
+    * Creates an empty (not yet decoded) packet instance for the given type byte.
+    *
+    * @param packetType the type byte read from the wire
+    * @return a new packet instance of the class registered for that type
+    * @throws IllegalArgumentException if the type is not one of the handled constants
+    */
+   private static Packet newPacket(final byte packetType)
+   {
+      switch (packetType)
+      {
+         case PING:
+            return new Ping();
+         case DISCONNECT:
+            return new PacketImpl(DISCONNECT);
+         case EXCEPTION:
+            return new MessagingExceptionMessage();
+         case PACKETS_CONFIRMED:
+            return new PacketsConfirmedMessage();
+         case REPLICATION_RESPONSE:
+            return new PacketImpl(REPLICATION_RESPONSE);
+         case CREATESESSION:
+            return new CreateSessionMessage();
+         case REPLICATE_CREATESESSION:
+            return new ReplicateCreateSessionMessage();
+         case CREATESESSION_RESP:
+            return new CreateSessionResponseMessage();
+         case REATTACH_SESSION:
+            return new ReattachSessionMessage();
+         case REATTACH_SESSION_RESP:
+            return new ReattachSessionResponseMessage();
+         case SESS_FAILOVER_COMPLETE:
+            return new SessionFailoverCompleteMessage();
+         case SESS_CLOSE:
+            return new SessionCloseMessage();
+         case SESS_CREATECONSUMER:
+            return new SessionCreateConsumerMessage();
+         case SESS_ACKNOWLEDGE:
+            return new SessionAcknowledgeMessage();
+         case SESS_EXPIRED:
+            return new SessionExpiredMessage();
+         case SESS_COMMIT:
+            return new PacketImpl(SESS_COMMIT);
+         case SESS_ROLLBACK:
+            return new RollbackMessage();
+         case SESS_QUEUEQUERY:
+            return new SessionQueueQueryMessage();
+         case SESS_QUEUEQUERY_RESP:
+            return new SessionQueueQueryResponseMessage();
+         case CREATE_QUEUE:
+            return new CreateQueueMessage();
+         case DELETE_QUEUE:
+            return new SessionDeleteQueueMessage();
+         case SESS_BINDINGQUERY:
+            return new SessionBindingQueryMessage();
+         case SESS_BINDINGQUERY_RESP:
+            return new SessionBindingQueryResponseMessage();
+         case SESS_XA_START:
+            return new SessionXAStartMessage();
+         case SESS_XA_END:
+            return new SessionXAEndMessage();
+         case SESS_XA_COMMIT:
+            return new SessionXACommitMessage();
+         case SESS_XA_PREPARE:
+            return new SessionXAPrepareMessage();
+         case SESS_XA_RESP:
+            return new SessionXAResponseMessage();
+         case SESS_XA_ROLLBACK:
+            return new SessionXARollbackMessage();
+         case SESS_XA_JOIN:
+            return new SessionXAJoinMessage();
+         case SESS_XA_SUSPEND:
+            return new PacketImpl(SESS_XA_SUSPEND);
+         case SESS_XA_RESUME:
+            return new SessionXAResumeMessage();
+         case SESS_XA_FORGET:
+            return new SessionXAForgetMessage();
+         case SESS_XA_INDOUBT_XIDS:
+            return new PacketImpl(SESS_XA_INDOUBT_XIDS);
+         case SESS_XA_INDOUBT_XIDS_RESP:
+            return new SessionXAGetInDoubtXidsResponseMessage();
+         case SESS_XA_SET_TIMEOUT:
+            return new SessionXASetTimeoutMessage();
+         case SESS_XA_SET_TIMEOUT_RESP:
+            return new SessionXASetTimeoutResponseMessage();
+         case SESS_XA_GET_TIMEOUT:
+            return new PacketImpl(SESS_XA_GET_TIMEOUT);
+         case SESS_XA_GET_TIMEOUT_RESP:
+            return new SessionXAGetTimeoutResponseMessage();
+         case SESS_START:
+            return new PacketImpl(SESS_START);
+         case SESS_STOP:
+            return new PacketImpl(SESS_STOP);
+         case SESS_FLOWTOKEN:
+            return new SessionConsumerFlowCreditMessage();
+         case SESS_SEND:
+            return new SessionSendMessage();
+         case SESS_SEND_LARGE:
+            return new SessionSendLargeMessage();
+         case SESS_RECEIVE_MSG:
+            return new SessionReceiveMessage();
+         case SESS_CONSUMER_CLOSE:
+            return new SessionConsumerCloseMessage();
+         case NULL_RESPONSE:
+            return new NullResponseMessage();
+         case SESS_RECEIVE_CONTINUATION:
+            return new SessionReceiveContinuationMessage();
+         case SESS_SEND_CONTINUATION:
+            return new SessionSendContinuationMessage();
+         case REPLICATE_ADD_REMOTE_QUEUE_BINDING:
+            return new ReplicateRemoteBindingAddedMessage();
+         case REPLICATE_REMOVE_REMOTE_QUEUE_BINDING:
+            return new ReplicateRemoteBindingRemovedMessage();
+         case REPLICATE_ADD_REMOTE_CONSUMER:
+            return new ReplicateRemoteConsumerAddedMessage();
+         case REPLICATE_REMOVE_REMOTE_CONSUMER:
+            return new ReplicateRemoteConsumerRemovedMessage();
+         case SESS_REPLICATE_DELIVERY:
+            return new SessionReplicateDeliveryMessage();
+         case REPLICATE_STARTUP_INFO:
+            return new ReplicateStartupInfoMessage();
+         case REPLICATE_ACKNOWLEDGE:
+            return new ReplicateAcknowledgeMessage();
+         case REPLICATE_REDISTRIBUTION:
+            return new ReplicateRedistributionMessage();
+         default:
+            throw new IllegalArgumentException("Invalid type: " + packetType);
+      }
+   }
+
+}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java 2009-06-17 13:43:23 UTC (rev 7380)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java 2009-06-17 16:44:51 UTC (rev 7381)
@@ -12,140 +12,21 @@
package org.jboss.messaging.core.remoting.impl;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.CREATESESSION;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.CREATESESSION_RESP;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.CREATE_QUEUE;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.DELETE_QUEUE;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.DISCONNECT;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.EARLY_RESPONSE;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.EXCEPTION;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.NULL_RESPONSE;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.PACKETS_CONFIRMED;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.PING;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REATTACH_SESSION;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REATTACH_SESSION_RESP;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATE_ACKNOWLEDGE;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATE_ADD_REMOTE_CONSUMER;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATE_ADD_REMOTE_QUEUE_BINDING;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATE_CREATESESSION;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATE_REDISTRIBUTION;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATE_REMOVE_REMOTE_CONSUMER;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATE_REMOVE_REMOTE_QUEUE_BINDING;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATE_STARTUP_INFO;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATION_RESPONSE;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_ACKNOWLEDGE;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_BINDINGQUERY;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_BINDINGQUERY_RESP;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CLOSE;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_COMMIT;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CONSUMER_CLOSE;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CREATECONSUMER;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_EXPIRED;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_FAILOVER_COMPLETE;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_FLOWTOKEN;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_QUEUEQUERY;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_QUEUEQUERY_RESP;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_RECEIVE_CONTINUATION;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_RECEIVE_MSG;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_REPLICATE_DELIVERY;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_ROLLBACK;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_SEND;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_SEND_CONTINUATION;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_SEND_LARGE;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_START;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_STOP;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_COMMIT;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_END;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_FORGET;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_GET_TIMEOUT;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_GET_TIMEOUT_RESP;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_INDOUBT_XIDS;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_INDOUBT_XIDS_RESP;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_JOIN;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_PREPARE;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_RESP;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_RESUME;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_ROLLBACK;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_SET_TIMEOUT;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_SET_TIMEOUT_RESP;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_START;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_SUSPEND;
-
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.remoting.Channel;
-import org.jboss.messaging.core.remoting.ChannelHandler;
import org.jboss.messaging.core.remoting.CloseListener;
-import org.jboss.messaging.core.remoting.CommandConfirmationHandler;
import org.jboss.messaging.core.remoting.FailureListener;
import org.jboss.messaging.core.remoting.Interceptor;
import org.jboss.messaging.core.remoting.Packet;
import org.jboss.messaging.core.remoting.RemotingConnection;
-import org.jboss.messaging.core.remoting.impl.wireformat.CreateQueueMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.CreateSessionMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.CreateSessionResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.NullResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
-import org.jboss.messaging.core.remoting.impl.wireformat.PacketsConfirmedMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.Ping;
-import org.jboss.messaging.core.remoting.impl.wireformat.ReattachSessionMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.ReattachSessionResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.ReplicateCreateSessionMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.RollbackMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionAcknowledgeMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionBindingQueryMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionBindingQueryResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCloseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerCloseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerFlowCreditMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateConsumerMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionDeleteQueueMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionExpiredMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionFailoverCompleteMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionReceiveContinuationMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionReceiveMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendContinuationMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendLargeMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXACommitMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAEndMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAForgetMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAGetInDoubtXidsResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAGetTimeoutResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAJoinMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAPrepareMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAResumeMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXARollbackMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXASetTimeoutMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXASetTimeoutResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAStartMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateAcknowledgeMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateRedistributionMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateRemoteBindingAddedMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateRemoteBindingRemovedMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateRemoteConsumerAddedMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateRemoteConsumerRemovedMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateStartupInfoMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.replication.SessionReplicateDeliveryMessage;
import org.jboss.messaging.core.remoting.spi.Connection;
import org.jboss.messaging.core.remoting.spi.ConnectionLifeCycleListener;
import org.jboss.messaging.core.remoting.spi.Connector;
@@ -170,7 +51,7 @@
public static RemotingConnection createConnection(final ConnectorFactory connectorFactory,
final Map<String, Object> params,
- final long callTimeout,
+ final long callTimeout,
final Executor threadPool,
final ConnectionLifeCycleListener listener)
{
@@ -192,31 +73,19 @@
return null;
}
- RemotingConnection connection = new RemotingConnectionImpl(tc,
- callTimeout,
- null);
+ RemotingConnection connection = new RemotingConnectionImpl(tc, callTimeout, null);
handler.conn = connection;
return connection;
}
- private static class DelegatingBufferHandler extends AbstractBufferHandler
- {
- RemotingConnection conn;
-
- public void bufferReceived(final Object connectionID, final MessagingBuffer buffer)
- {
- conn.bufferReceived(connectionID, buffer);
- }
- }
-
// Attributes
// -----------------------------------------------------------------------------------
private final Connection transportConnection;
- private final Map<Long, ChannelImpl> channels = new ConcurrentHashMap<Long, ChannelImpl>();
+ private final Map<Long, Channel> channels = new ConcurrentHashMap<Long, Channel>();
private final List<FailureListener> failureListeners = new CopyOnWriteArrayList<FailureListener>();
@@ -226,8 +95,6 @@
private final List<Interceptor> interceptors;
- private ScheduledFuture<?> future;
-
private volatile boolean destroyed;
private volatile boolean active;
@@ -247,11 +114,9 @@
private boolean frozen;
private final Object failLock = new Object();
-
- // debug only stuff
-
- private boolean createdActive;
-
+
+ private final PacketDecoder decoder = new PacketDecoder();
+
// Constructors
// ---------------------------------------------------------------------------------
@@ -259,14 +124,10 @@
* Create a client side connection
*/
public RemotingConnectionImpl(final Connection transportConnection,
- final long blockingCallTimeout,
+ final long blockingCallTimeout,
final List<Interceptor> interceptors)
{
- this(transportConnection,
- blockingCallTimeout,
- interceptors,
- true,
- true);
+ this(transportConnection, blockingCallTimeout, interceptors, true, true);
}
/*
@@ -281,7 +142,7 @@
}
private RemotingConnectionImpl(final Connection transportConnection,
- final long blockingCallTimeout,
+ final long blockingCallTimeout,
final List<Interceptor> interceptors,
final boolean active,
final boolean client)
@@ -296,8 +157,6 @@
this.active = active;
this.client = client;
-
- this.createdActive = active;
}
// RemotingConnection implementation
@@ -332,7 +191,7 @@
public synchronized Channel getChannel(final long channelID, final int windowSize, final boolean block)
{
- ChannelImpl channel = channels.get(channelID);
+ Channel channel = channels.get(channelID);
if (channel == null)
{
@@ -344,6 +203,16 @@
return channel;
}
+   /**
+    * Unregisters the channel stored under the given id.
+    *
+    * @param channelID id of the channel to remove from the channels map
+    * @return true if a channel was mapped to that id and has been removed, false otherwise
+    */
+   public synchronized boolean removeChannel(final long channelID)
+   {
+      final Channel removed = channels.remove(channelID);
+
+      return removed != null;
+   }
+
+ /**
+ * Stores the given channel in the channels map under the given id,
+ * replacing any channel previously mapped to that id.
+ *
+ * @param channelID id to register the channel under
+ * @param channel the channel to register
+ */
+ public synchronized void putChannel(final long channelID, final Channel channel)
+ {
+ channels.put(channelID, channel);
+ }
+
public void addFailureListener(final FailureListener listener)
{
if (listener == null)
@@ -363,17 +232,17 @@
return failureListeners.remove(listener);
}
-
+
public void addCloseListener(CloseListener listener)
{
if (listener == null)
{
throw new IllegalStateException("CloseListener cannot be null");
}
-
+
closeListeners.add(listener);
}
-
+
public boolean removeCloseListener(final CloseListener listener)
{
if (listener == null)
@@ -413,17 +282,18 @@
internalClose();
- for (ChannelImpl channel : channels.values())
+ for (Channel channel : channels.values())
{
- channel.lock.lock();
- try
- {
- channel.sendCondition.signalAll();
- }
- finally
- {
- channel.lock.unlock();
- }
+ // channel.lock.lock();
+ // try
+ // {
+ // channel.sendCondition.signalAll();
+ // }
+ // finally
+ // {
+ // channel.lock.unlock();
+ // }
+ channel.returnBlocking();
}
}
@@ -464,19 +334,66 @@
return idGenerator.getCurrentID();
}
+ public Object getTransferLock()
+ {
+ return transferLock;
+ }
+
+ public boolean isActive()
+ {
+ return active;
+ }
+
+ public boolean isClient()
+ {
+ return client;
+ }
+
+ public boolean isDestroyed()
+ {
+ return destroyed;
+ }
+
+ public long getBlockingCallTimeout()
+ {
+ return blockingCallTimeout;
+ }
+
// Buffer Handler implementation
// ----------------------------------------------------
public void bufferReceived(final Object connectionID, final MessagingBuffer buffer)
{
- final Packet packet = decode(buffer);
-
+ final Packet packet = decoder.decode(buffer);
+
synchronized (transferLock)
{
if (!frozen)
{
- final ChannelImpl channel = channels.get(packet.getChannelID());
+ if (interceptors != null)
+ {
+ for (final Interceptor interceptor : interceptors)
+ {
+ try
+ {
+ boolean callNext = interceptor.intercept(packet, this);
+ if (!callNext)
+ {
+ // abort
+
+ return;
+ }
+ }
+ catch (final Throwable e)
+ {
+ log.warn("Failure in calling interceptor: " + interceptor, e);
+ }
+ }
+ }
+
+ final Channel channel = channels.get(packet.getChannelID());
+
if (channel != null)
{
channel.handlePacket(packet);
@@ -556,11 +473,6 @@
private void internalClose()
{
- if (future != null)
- {
- future.cancel(false);
- }
-
// We close the underlying transport connection
transportConnection.close();
@@ -569,1054 +481,15 @@
channel.close();
}
}
-
- private Packet decode(final MessagingBuffer in)
+
+ private static class DelegatingBufferHandler extends AbstractBufferHandler
{
- final byte packetType = in.readByte();
+ RemotingConnection conn;
- Packet packet;
-
- switch (packetType)
+ public void bufferReceived(final Object connectionID, final MessagingBuffer buffer)
{
- case PING:
- {
- packet = new Ping();
- break;
- }
- case DISCONNECT:
- {
- packet = new PacketImpl(DISCONNECT);
- break;
- }
- case EXCEPTION:
- {
- packet = new MessagingExceptionMessage();
- break;
- }
- case PACKETS_CONFIRMED:
- {
- packet = new PacketsConfirmedMessage();
- break;
- }
- case REPLICATION_RESPONSE:
- {
- packet = new PacketImpl(REPLICATION_RESPONSE);
- break;
- }
- case CREATESESSION:
- {
- packet = new CreateSessionMessage();
- break;
- }
- case REPLICATE_CREATESESSION:
- {
- packet = new ReplicateCreateSessionMessage();
- break;
- }
- case CREATESESSION_RESP:
- {
- packet = new CreateSessionResponseMessage();
- break;
- }
- case REATTACH_SESSION:
- {
- packet = new ReattachSessionMessage();
- break;
- }
- case REATTACH_SESSION_RESP:
- {
- packet = new ReattachSessionResponseMessage();
- break;
- }
- case SESS_FAILOVER_COMPLETE:
- {
- packet = new SessionFailoverCompleteMessage();
- break;
- }
- case SESS_CLOSE:
- {
- packet = new SessionCloseMessage();
- break;
- }
- case SESS_CREATECONSUMER:
- {
- packet = new SessionCreateConsumerMessage();
- break;
- }
- case SESS_ACKNOWLEDGE:
- {
- packet = new SessionAcknowledgeMessage();
- break;
- }
- case SESS_EXPIRED:
- {
- packet = new SessionExpiredMessage();
- break;
- }
- case SESS_COMMIT:
- {
- packet = new PacketImpl(PacketImpl.SESS_COMMIT);
- break;
- }
- case SESS_ROLLBACK:
- {
- packet = new RollbackMessage();
- break;
- }
- case SESS_QUEUEQUERY:
- {
- packet = new SessionQueueQueryMessage();
- break;
- }
- case SESS_QUEUEQUERY_RESP:
- {
- packet = new SessionQueueQueryResponseMessage();
- break;
- }
- case CREATE_QUEUE:
- {
- packet = new CreateQueueMessage();
- break;
- }
- case DELETE_QUEUE:
- {
- packet = new SessionDeleteQueueMessage();
- break;
- }
- case SESS_BINDINGQUERY:
- {
- packet = new SessionBindingQueryMessage();
- break;
- }
- case SESS_BINDINGQUERY_RESP:
- {
- packet = new SessionBindingQueryResponseMessage();
- break;
- }
- case SESS_XA_START:
- {
- packet = new SessionXAStartMessage();
- break;
- }
- case SESS_XA_END:
- {
- packet = new SessionXAEndMessage();
- break;
- }
- case SESS_XA_COMMIT:
- {
- packet = new SessionXACommitMessage();
- break;
- }
- case SESS_XA_PREPARE:
- {
- packet = new SessionXAPrepareMessage();
- break;
- }
- case SESS_XA_RESP:
- {
- packet = new SessionXAResponseMessage();
- break;
- }
- case SESS_XA_ROLLBACK:
- {
- packet = new SessionXARollbackMessage();
- break;
- }
- case SESS_XA_JOIN:
- {
- packet = new SessionXAJoinMessage();
- break;
- }
- case SESS_XA_SUSPEND:
- {
- packet = new PacketImpl(PacketImpl.SESS_XA_SUSPEND);
- break;
- }
- case SESS_XA_RESUME:
- {
- packet = new SessionXAResumeMessage();
- break;
- }
- case SESS_XA_FORGET:
- {
- packet = new SessionXAForgetMessage();
- break;
- }
- case SESS_XA_INDOUBT_XIDS:
- {
- packet = new PacketImpl(PacketImpl.SESS_XA_INDOUBT_XIDS);
- break;
- }
- case SESS_XA_INDOUBT_XIDS_RESP:
- {
- packet = new SessionXAGetInDoubtXidsResponseMessage();
- break;
- }
- case SESS_XA_SET_TIMEOUT:
- {
- packet = new SessionXASetTimeoutMessage();
- break;
- }
- case SESS_XA_SET_TIMEOUT_RESP:
- {
- packet = new SessionXASetTimeoutResponseMessage();
- break;
- }
- case SESS_XA_GET_TIMEOUT:
- {
- packet = new PacketImpl(PacketImpl.SESS_XA_GET_TIMEOUT);
- break;
- }
- case SESS_XA_GET_TIMEOUT_RESP:
- {
- packet = new SessionXAGetTimeoutResponseMessage();
- break;
- }
- case SESS_START:
- {
- packet = new PacketImpl(PacketImpl.SESS_START);
- break;
- }
- case SESS_STOP:
- {
- packet = new PacketImpl(PacketImpl.SESS_STOP);
- break;
- }
- case SESS_FLOWTOKEN:
- {
- packet = new SessionConsumerFlowCreditMessage();
- break;
- }
- case SESS_SEND:
- {
- packet = new SessionSendMessage();
- break;
- }
- case SESS_SEND_LARGE:
- {
- packet = new SessionSendLargeMessage();
- break;
- }
- case SESS_RECEIVE_MSG:
- {
- packet = new SessionReceiveMessage();
- break;
- }
- case SESS_CONSUMER_CLOSE:
- {
- packet = new SessionConsumerCloseMessage();
- break;
- }
- case NULL_RESPONSE:
- {
- packet = new NullResponseMessage();
- break;
- }
- case SESS_RECEIVE_CONTINUATION:
- {
- packet = new SessionReceiveContinuationMessage();
- break;
- }
- case SESS_SEND_CONTINUATION:
- {
- packet = new SessionSendContinuationMessage();
- break;
- }
- case REPLICATE_ADD_REMOTE_QUEUE_BINDING:
- {
- packet = new ReplicateRemoteBindingAddedMessage();
- break;
- }
- case REPLICATE_REMOVE_REMOTE_QUEUE_BINDING:
- {
- packet = new ReplicateRemoteBindingRemovedMessage();
- break;
- }
- case REPLICATE_ADD_REMOTE_CONSUMER:
- {
- packet = new ReplicateRemoteConsumerAddedMessage();
- break;
- }
- case REPLICATE_REMOVE_REMOTE_CONSUMER:
- {
- packet = new ReplicateRemoteConsumerRemovedMessage();
- break;
- }
- case SESS_REPLICATE_DELIVERY:
- {
- packet = new SessionReplicateDeliveryMessage();
- break;
- }
- case REPLICATE_STARTUP_INFO:
- {
- packet = new ReplicateStartupInfoMessage();
- break;
- }
- case REPLICATE_ACKNOWLEDGE:
- {
- packet = new ReplicateAcknowledgeMessage();
- break;
- }
- case REPLICATE_REDISTRIBUTION:
- {
- packet = new ReplicateRedistributionMessage();
- break;
- }
- default:
- {
- throw new IllegalArgumentException("Invalid type: " + packetType);
- }
+ conn.bufferReceived(connectionID, buffer);
}
-
- packet.decode(in);
-
- return packet;
}
- // Inner classes
- // --------------------------------------------------------------------------------
-
- // Needs to be static so we can re-assign it to another remotingconnection
- private static class ChannelImpl implements Channel
- {
- private volatile long id;
-
- private ChannelHandler handler;
-
- private Packet response;
-
- private final java.util.Queue<Packet> resendCache;
-
- private volatile int firstStoredCommandID;
-
- private volatile int lastReceivedCommandID = -1;
-
- private volatile RemotingConnectionImpl connection;
-
- private volatile boolean closed;
-
- private final Lock lock = new ReentrantLock();
-
- private final Condition sendCondition = lock.newCondition();
-
- private final Condition failoverCondition = lock.newCondition();
-
- private final Object sendLock = new Object();
-
- private final Object sendBlockingLock = new Object();
-
- private final Object replicationLock = new Object();
-
- private boolean failingOver;
-
- private final Queue<Runnable> responseActions = new ConcurrentLinkedQueue<Runnable>();
-
- private final int windowSize;
-
- private final int confWindowSize;
-
- private final Semaphore sendSemaphore;
-
- private int receivedBytes;
-
- private CommandConfirmationHandler commandConfirmationHandler;
-
- private int responseActionCount;
-
- private boolean playedResponsesOnFailure;
-
- public void setCommandConfirmationHandler(final CommandConfirmationHandler handler)
- {
- this.commandConfirmationHandler = handler;
- }
-
- private ChannelImpl(final RemotingConnectionImpl connection,
- final long id,
- final int windowSize,
- final boolean block)
- {
- this.connection = connection;
-
- this.id = id;
-
- this.windowSize = windowSize;
-
- this.confWindowSize = (int)(0.75 * windowSize);
-
- if (this.windowSize != -1)
- {
- resendCache = new ConcurrentLinkedQueue<Packet>();
-
- if (block)
- {
- sendSemaphore = new Semaphore(windowSize, true);
- }
- else
- {
- sendSemaphore = null;
- }
- }
- else
- {
- resendCache = null;
-
- sendSemaphore = null;
- }
- }
-
- public long getID()
- {
- return id;
- }
-
- public int getLastReceivedCommandID()
- {
- return lastReceivedCommandID;
- }
-
- public Lock getLock()
- {
- return lock;
- }
-
- public void returnBlocking()
- {
- lock.lock();
-
- try
- {
- response = new PacketImpl(EARLY_RESPONSE);
-
- sendCondition.signal();
- }
- finally
- {
- lock.unlock();
- }
- }
-
- public void sendAndFlush(final Packet packet)
- {
- send(packet, true);
- }
-
- public void send(final Packet packet)
- {
- send(packet, false);
- }
-
- // This must never called by more than one thread concurrently
- public void send(final Packet packet, final boolean flush)
- {
- synchronized (sendLock)
- {
- packet.setChannelID(id);
-
- final MessagingBuffer buffer = connection.transportConnection.createBuffer(packet.getRequiredBufferSize());
-
- int size = packet.encode(buffer);
-
- // Must block on semaphore outside the main lock or this can prevent failover from occurring
- if (sendSemaphore != null && packet.getType() != PACKETS_CONFIRMED)
- {
- try
- {
- sendSemaphore.acquire(size);
- }
- catch (InterruptedException e)
- {
- throw new IllegalStateException("Semaphore interrupted");
- }
- }
-
- lock.lock();
-
- try
- {
- while (failingOver)
- {
- // TODO - don't hardcode this timeout
- try
- {
- failoverCondition.await(10000, TimeUnit.MILLISECONDS);
- }
- catch (InterruptedException e)
- {
- }
- }
-
- if (resendCache != null && packet.isRequiresConfirmations())
- {
- resendCache.add(packet);
- }
-
- if (connection.active || packet.isWriteAlways())
- {
- connection.transportConnection.write(buffer, flush);
- }
- }
- finally
- {
- lock.unlock();
- }
- }
- }
-
- public Packet sendBlocking(final Packet packet) throws MessagingException
- {
- if (closed)
- {
- throw new MessagingException(MessagingException.NOT_CONNECTED, "Connection is destroyed");
- }
-
- if (connection.blockingCallTimeout == -1)
- {
- throw new IllegalStateException("Cannot do a blocking call timeout on a server side connection");
- }
-
- // Synchronized since can't be called concurrently by more than one thread and this can occur
- // E.g. blocking acknowledge() from inside a message handler at some time as other operation on main thread
- synchronized (sendBlockingLock)
- {
- packet.setChannelID(id);
-
- final MessagingBuffer buffer = connection.transportConnection.createBuffer(packet.getRequiredBufferSize());
-
- int size = packet.encode(buffer);
-
- // Must block on semaphore outside the main lock or this can prevent failover from occurring
- if (sendSemaphore != null)
- {
- try
- {
- sendSemaphore.acquire(size);
- }
- catch (InterruptedException e)
- {
- throw new IllegalStateException("Semaphore interrupted");
- }
- }
-
- lock.lock();
-
- try
- {
- while (failingOver)
- {
- // TODO - don't hardcode this timeout
- try
- {
- failoverCondition.await(10000, TimeUnit.MILLISECONDS);
- }
- catch (InterruptedException e)
- {
- }
- }
-
- response = null;
-
- if (resendCache != null && packet.isRequiresConfirmations())
- {
- resendCache.add(packet);
- }
-
- connection.transportConnection.write(buffer);
-
- long toWait = connection.blockingCallTimeout;
-
- long start = System.currentTimeMillis();
-
- while (response == null && toWait > 0)
- {
- try
- {
- sendCondition.await(toWait, TimeUnit.MILLISECONDS);
- }
- catch (InterruptedException e)
- {
- }
-
- if (closed)
- {
- break;
- }
-
- final long now = System.currentTimeMillis();
-
- toWait -= now - start;
-
- start = now;
- }
-
- if (response == null)
- {
- throw new MessagingException(MessagingException.CONNECTION_TIMEDOUT,
- "Timed out waiting for response when sending packet " + packet.getType());
- }
-
- if (response.getType() == PacketImpl.EXCEPTION)
- {
- final MessagingExceptionMessage mem = (MessagingExceptionMessage)response;
-
- throw mem.getException();
- }
- else
- {
- return response;
- }
- }
- finally
- {
- lock.unlock();
- }
- }
- }
-
- // Must be synchronized since can be called by incoming session commands but also by deliveries
- // Also needs to be synchronized with respect to replicatingChannelDead
- public void replicatePacket(final Packet packet, final long replicatedChannelID, final Runnable action)
- {
- packet.setChannelID(replicatedChannelID);
-
- boolean runItNow = false;
-
- synchronized (replicationLock)
- {
- if (playedResponsesOnFailure && action != null)
- {
- // Already replicating channel failed, so just play the action now
-
- runItNow = true;
- }
- else
- {
- if (action != null)
- {
- responseActions.add(action);
-
- responseActionCount++;
- }
-
- final MessagingBuffer buffer = connection.transportConnection.createBuffer(packet.getRequiredBufferSize());
-
- packet.encode(buffer);
-
- connection.transportConnection.write(buffer);
- }
- }
-
- // Execute outside lock
-
- if (runItNow)
- {
- action.run();
- }
- }
-
- public void executeOutstandingDelayedResults()
- {
- // Execute on different thread to avoid deadlock
-
- new Thread()
- {
- public void run()
- {
- doExecuteOutstandingDelayedResults();
- }
- }.start();
- }
-
- private void doExecuteOutstandingDelayedResults()
- {
- List<Runnable> toRun = new ArrayList<Runnable>();
-
- synchronized (replicationLock)
- {
- // Execute all the response actions now
-
- while (true)
- {
- Runnable action = responseActions.poll();
-
- if (action != null)
- {
- toRun.add(action);
- }
- else
- {
- break;
- }
- }
-
- responseActionCount = 0;
-
- playedResponsesOnFailure = true;
-
- for (Runnable action : toRun)
- {
- action.run();
- }
- }
-
- }
-
- public void setHandler(final ChannelHandler handler)
- {
- this.handler = handler;
- }
-
- public ChannelHandler getHandler()
- {
- return handler;
- }
-
- public void close()
- {
- if (closed)
- {
- return;
- }
-
- if (!connection.destroyed && connection.channels.remove(id) == null)
- {
- throw new IllegalArgumentException("Cannot find channel with id " + id + " to close");
- }
-
- closed = true;
- }
-
- public void transferConnection(final RemotingConnection newConnection,
- final long newChannelID,
- final Channel replicatingChannel)
- {
- // Needs to synchronize on the connection to make sure no packets from
- // the old connection get processed after transfer has occurred
- synchronized (connection.transferLock)
- {
- connection.channels.remove(id);
-
- if (replicatingChannel != null)
- {
- // If we're reconnecting to a live node which is replicated then there will be a replicating channel
- // too. We need to then make sure that all replication responses come back since packets aren't
- // considered confirmed until response comes back and is processed. Otherwise responses to previous
- // message sends could come back after reconnection resulting in clients resending same message
- // since it wasn't confirmed yet.
- ((ChannelImpl)replicatingChannel).waitForAllReplicationResponse();
- }
-
- // And switch it
-
- final RemotingConnectionImpl rnewConnection = (RemotingConnectionImpl)newConnection;
-
- if (rnewConnection.channels.containsKey(newChannelID))
- {
- throw new IllegalStateException("connection already has channel with id " + newChannelID);
- }
-
- rnewConnection.channels.put(newChannelID, this);
-
- connection = rnewConnection;
-
- this.id = newChannelID;
- }
- }
-
- public void replayCommands(final int otherLastReceivedCommandID, final long newChannelID)
- {
- clearUpTo(otherLastReceivedCommandID);
-
- for (final Packet packet : resendCache)
- {
- packet.setChannelID(newChannelID);
-
- doWrite(packet);
- }
- }
-
- public void lock()
- {
- lock.lock();
-
- failingOver = true;
-
- lock.unlock();
- }
-
- public void unlock()
- {
- lock.lock();
-
- failingOver = false;
-
- failoverCondition.signalAll();
-
- lock.unlock();
- }
-
- public RemotingConnection getConnection()
- {
- return connection;
- }
-
- public void flushConfirmations()
- {
- if (receivedBytes != 0 && connection.active)
- {
- receivedBytes = 0;
-
- final Packet confirmed = new PacketsConfirmedMessage(lastReceivedCommandID);
-
- confirmed.setChannelID(id);
-
- doWrite(confirmed);
- }
- }
-
- public void confirm(final Packet packet)
- {
- if (resendCache != null && packet.isRequiresConfirmations())
- {
- lastReceivedCommandID++;
-
- receivedBytes += packet.getPacketSize();
-
- if (receivedBytes >= confWindowSize)
- {
- receivedBytes = 0;
-
- if (connection.active)
- {
- final Packet confirmed = new PacketsConfirmedMessage(lastReceivedCommandID);
-
- confirmed.setChannelID(id);
-
- doWrite(confirmed);
- }
- }
- }
- }
-
- private void replicateComplete()
- {
- if (!connection.active && id != 0)
- {
- // We're on backup and not ping channel so send back a replication response
-
- Packet packet = new PacketImpl(REPLICATION_RESPONSE);
-
- packet.setChannelID(2);
-
- doWrite(packet);
- }
- }
-
- // This will never get called concurrently by more than one thread
-
- // TODO it's not ideal synchronizing this since it forms a contention point with replication
- // but we need to do this to protect it w.r.t. the check on replicatingChannel
- private void replicateResponseReceived()
- {
- Runnable result = null;
-
- synchronized (replicationLock)
- {
- if (playedResponsesOnFailure)
- {
- return;
- }
-
- result = responseActions.poll();
-
- if (result == null)
- {
- throw new IllegalStateException("Cannot find response action");
- }
- }
-
- // Must execute outside of lock
- if (result != null)
- {
- result.run();
-
- // TODO - we can optimise this not to lock every time - only if waiting for all replications to return
- synchronized (replicationLock)
- {
- responseActionCount--;
-
- if (responseActionCount == 0)
- {
- replicationLock.notify();
- }
- }
- }
- }
-
- private void waitForAllReplicationResponse()
- {
- synchronized (replicationLock)
- {
- long toWait = 10000; // TODO don't hardcode timeout
-
- long start = System.currentTimeMillis();
-
- while (responseActionCount > 0 && toWait > 0)
- {
- try
- {
- replicationLock.wait();
- }
- catch (InterruptedException e)
- {
- }
-
- long now = System.currentTimeMillis();
-
- toWait -= now - start;
-
- start = now;
- }
-
- if (toWait <= 0)
- {
- log.warn("Timed out waiting for replication responses to return");
- }
- }
- }
-
- private void handlePacket(final Packet packet)
- {
- if (packet.getType() == PACKETS_CONFIRMED)
- {
- if (resendCache != null)
- {
- final PacketsConfirmedMessage msg = (PacketsConfirmedMessage)packet;
-
- clearUpTo(msg.getCommandID());
- }
-
- if (!connection.client)
- {
- handler.handlePacket(packet);
- }
-
- return;
- }
- else if (packet.getType() == REPLICATION_RESPONSE)
- {
- replicateResponseReceived();
-
- return;
- }
- else
- {
- if (connection.interceptors != null)
- {
- for (final Interceptor interceptor : connection.interceptors)
- {
- try
- {
- boolean callNext = interceptor.intercept(packet, connection);
-
- if (!callNext)
- {
- // abort
-
- return;
- }
- }
- catch (final Throwable e)
- {
- log.warn("Failure in calling interceptor: " + interceptor, e);
- }
- }
- }
-
- if (packet.isResponse())
- {
- response = packet;
-
- confirm(packet);
-
- lock.lock();
-
- try
- {
- sendCondition.signal();
- }
- finally
- {
- lock.unlock();
- }
- }
- else if (handler != null)
- {
- handler.handlePacket(packet);
- }
- }
-
- replicateComplete();
- }
-
- private void doWrite(final Packet packet)
- {
- final MessagingBuffer buffer = connection.transportConnection.createBuffer(packet.getRequiredBufferSize());
-
- packet.encode(buffer);
-
- connection.transportConnection.write(buffer);
- }
-
- private void clearUpTo(final int lastReceivedCommandID)
- {
- final int numberToClear = 1 + lastReceivedCommandID - firstStoredCommandID;
-
- if (numberToClear == -1)
- {
- throw new IllegalArgumentException("Invalid lastReceivedCommandID: " + lastReceivedCommandID);
- }
-
- int sizeToFree = 0;
-
- for (int i = 0; i < numberToClear; i++)
- {
- final Packet packet = resendCache.poll();
-
- if (packet == null)
- {
- throw new IllegalStateException(System.identityHashCode(this) + " Can't find packet to clear: " +
- " last received command id " +
- lastReceivedCommandID +
- " first stored command id " +
- firstStoredCommandID +
- " cache size " +
- this.resendCache.size() +
- " channel id " +
- id +
- " client " +
- connection.client +
- " created active " +
- connection.createdActive);
- }
-
- if (packet.getType() != PACKETS_CONFIRMED)
- {
- sizeToFree += packet.getPacketSize();
- }
-
- if (commandConfirmationHandler != null)
- {
- commandConfirmationHandler.commandConfirmed(packet);
- }
- }
-
- firstStoredCommandID += numberToClear;
-
- if (sendSemaphore != null)
- {
- sendSemaphore.release(sizeToFree);
- }
- }
- }
}
More information about the jboss-cvs-commits
mailing list