[jboss-cvs] JBoss Messaging SVN: r7653 - in branches/Branch_MultiThreaded_Replication: src/main/org/jboss/messaging/core/remoting and 7 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Sun Aug 2 05:15:24 EDT 2009
Author: timfox
Date: 2009-08-02 05:15:23 -0400 (Sun, 02 Aug 2009)
New Revision: 7653
Removed:
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/replication/RegisterQueueReplicationChannelMessage.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/replication/UnregisterQueueReplicationChannelMessage.java
Modified:
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/Channel.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/ChannelImpl.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/PacketDecoder.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/server/RemotingService.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/unit/core/server/replication/impl/QueuedWriteManagerTest.java
Log:
MT replication
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2009-07-31 19:45:44 UTC (rev 7652)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2009-08-02 09:15:23 UTC (rev 7653)
@@ -776,7 +776,7 @@
channel.waitForAllExecutions();
- channel.transferConnection(backupConnection);
+ channel.setConnection(backupConnection);
// log.info("unfreezing");
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java 2009-07-31 19:45:44 UTC (rev 7652)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java 2009-08-02 09:15:23 UTC (rev 7653)
@@ -24,7 +24,6 @@
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.EARLY_RESPONSE;
-import java.nio.channels.Channels;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -153,7 +152,7 @@
private Connector connector;
private Map<Object, Pinger> pingers = new ConcurrentHashMap<Object, Pinger>();
-
+
private volatile boolean neverFail;
// debug
@@ -211,11 +210,11 @@
{
backupConnectorFactory = null;
- backupTransportParams = null;
+ backupTransportParams = null;
}
-
- // log.info(System.identityHashCode(this) + " created cm with bcf " + this.backupConnectorFactory);
+ // log.info(System.identityHashCode(this) + " created cm with bcf " + this.backupConnectorFactory);
+
this.maxConnections = maxConnections;
this.callTimeout = callTimeout;
@@ -315,7 +314,7 @@
inCreateSession = true;
}
- Packet request = new CreateSessionMessage(name,
+ Packet request = new CreateSessionMessage(name,
clientVersion.getIncrementingVersion(),
username,
password,
@@ -343,13 +342,13 @@
{
CreateSessionResponseMessage response = (CreateSessionResponseMessage)pResponse;
- Channel sessionChannel = new ChannelImpl(response.getSessionChannelID(), producerWindowSize,
- producerWindowSize != -1, null);
-
- sessionChannel.transferConnection(connection);
-
+ Channel sessionChannel = new ChannelImpl(response.getSessionChannelID(),
+ connection,
+ producerWindowSize,
+ producerWindowSize != -1);
+
connection.putChannel(sessionChannel);
-
+
ClientSessionInternal session = new ClientSessionImpl(this,
name,
xa,
@@ -504,7 +503,7 @@
}
}
}
-
+
public void setNeverFail()
{
neverFail = true;
@@ -545,16 +544,16 @@
private boolean failoverOrReconnect(final MessagingException me, final Object connectionID)
{
- // log.info(System.identityHashCode(this) + " connection manager failover or reconnect");
+ // log.info(System.identityHashCode(this) + " connection manager failover or reconnect");
// To prevent recursion
if (inFailoverOrReconnect)
{
- // log.info("Already in it");
+ // log.info("Already in it");
return false;
}
- // log.info("Waiting on failover lock");
-
+ // log.info("Waiting on failover lock");
+
synchronized (failoverLock)
{
if (connectionID != null && !connections.containsKey(connectionID))
@@ -563,14 +562,14 @@
// over then a async connection exception or disconnect
// came in for one of the already closed connections, so we return true - we don't want to call the
// listeners again
-
- // log.info("ALready failed over that connection");
+ // log.info("ALready failed over that connection");
+
return true;
}
-
- // log.info("Got failover lock");
+ // log.info("Got failover lock");
+
// Now get locks on all channel 1s, whilst holding the failoverLock - this makes sure
// There are either no threads executing in createSession, or one is blocking on a createSession
// result.
@@ -598,16 +597,17 @@
boolean attemptFailover = (backupConnectorFactory) != null && (failoverOnServerShutdown || me.getCode() != MessagingException.DISCONNECTED);
- // log.info("Attempt failover is " + attemptFailover);
- // log.info("bcf " + backupConnectorFactory + " fss: " + failoverOnServerShutdown + " me code " + me.getCode());
-
+ // log.info("Attempt failover is " + attemptFailover);
+ // log.info("bcf " + backupConnectorFactory + " fss: " + failoverOnServerShutdown + " me code " +
+ // me.getCode());
+
boolean done = false;
if (attemptFailover || reconnectAttempts != 0)
{
- // log.info("Locking all channels");
+ // log.info("Locking all channels");
lockAllChannel1s();
- // log.info("Locked all channels");
+ // log.info("Locked all channels");
final boolean needToInterrupt;
@@ -618,8 +618,8 @@
unlockAllChannel1s();
- // log.info("Need to interrupt is " + needToInterrupt);
-
+ // log.info("Need to interrupt is " + needToInterrupt);
+
if (needToInterrupt)
{
// Forcing return all channels won't guarantee that any blocked thread will return immediately
@@ -642,12 +642,12 @@
}
}
}
-
- // log.info("waited for create session to exit");
+
+ // log.info("waited for create session to exit");
}
- //log.info("continuing");
-
+ // log.info("continuing");
+
// Now we absolutely know that no threads are executing in or blocked in createSession, and no
// more will execute it until failover is complete
@@ -686,8 +686,8 @@
transportParams = backupTransportParams;
- //log.info(System.identityHashCode(this) + " set bcf to null");
-
+ // log.info(System.identityHashCode(this) + " set bcf to null");
+
backupConnectorFactory = null;
backupTransportParams = null;
@@ -811,8 +811,8 @@
}
}
- // log.info("ok is " + ok);
-
+ // log.info("ok is " + ok);
+
if (ok)
{
// If all connections got ok, then handle failover
@@ -922,7 +922,7 @@
connector = null;
}
}
-
+
public RemotingConnection getConnection(final int initialRefCount)
{
synchronized (createSessionLock)
@@ -937,7 +937,7 @@
private RemotingConnection internalGetConnection(final int initialRefCount)
{
RemotingConnection conn;
-
+
if (connections.size() < maxConnections)
{
// Create a new one
@@ -951,8 +951,8 @@
DelegatingBufferHandler handler = new DelegatingBufferHandler();
connector = connectorFactory.createConnector(transportParams, handler, this, threadPool);
-
- //For testing only - this makes sure that invm connector failures don't happen for backup connections
+
+ // For testing only - this makes sure that invm connector failures don't happen for backup connections
if (neverFail)
{
connector.setNeverFail();
@@ -1039,7 +1039,7 @@
pingers.put(conn.getID(), pinger);
Ping ping = new Ping(clientFailureCheckPeriod, connectionTTL);
-
+
Channel pingChannel = conn.getChannel(0);
pingChannel.send(ping);
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/Channel.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/Channel.java 2009-07-31 19:45:44 UTC (rev 7652)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/Channel.java 2009-08-02 09:15:23 UTC (rev 7653)
@@ -39,7 +39,7 @@
void close();
- void transferConnection(RemotingConnection newConnection);
+ void setConnection(RemotingConnection newConnection);
void replayCommands(int lastConfirmedCommandID);
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/ChannelImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/ChannelImpl.java 2009-07-31 19:45:44 UTC (rev 7652)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/ChannelImpl.java 2009-08-02 09:15:23 UTC (rev 7653)
@@ -28,6 +28,7 @@
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
@@ -97,10 +98,35 @@
private int receivedBytes;
- private CommandConfirmationHandler commandConfirmationHandler;
+ private volatile CommandConfirmationHandler commandConfirmationHandler;
+
+ public ChannelImpl(final long id, final RemotingConnection connection)
+ {
+ this(id, -1, false, null, connection);
+ }
+
+ public ChannelImpl(final long id)
+ {
+ this(id, -1, false, null);
+ }
+
+ public ChannelImpl(final long id, final Executor executor)
+ {
+ this(id, -1, false, executor);
+ }
+
+ public ChannelImpl(final long id, final RemotingConnection connection, final int windowSize, final boolean block)
+ {
+ this(id, windowSize, block, null, connection);
+ }
public ChannelImpl(final long id, final int windowSize, final boolean block, final Executor executor)
{
+ this(id, windowSize, block, executor, null);
+ }
+
+ private ChannelImpl(final long id, final int windowSize, final boolean block, final Executor executor, final RemotingConnection connection)
+ {
this.id = id;
this.windowSize = windowSize;
@@ -127,6 +153,8 @@
sendSemaphore = null;
}
this.executor = executor;
+
+ this.connection = connection;
}
public long getID()
@@ -367,7 +395,7 @@
return;
}
- if (!connection.isDestroyed() && !connection.removeChannel(id))
+ if (connection != null && !connection.isDestroyed() && !connection.removeChannel(id))
{
throw new IllegalArgumentException("Cannot find channel with id " + id + " to close");
}
@@ -375,7 +403,7 @@
closed = true;
}
- public void transferConnection(final RemotingConnection newConnection)
+ public void setConnection(final RemotingConnection newConnection)
{
if (connection != null)
{
@@ -498,20 +526,27 @@
}
else
{
- executor.execute(new Runnable()
+ try
{
- public void run()
+ executor.execute(new Runnable()
{
- try
+ public void run()
{
- doHandlePacket(packet);
+ try
+ {
+ doHandlePacket(packet);
+ }
+ catch (Throwable t)
+ {
+ log.error("Failed to handle packet", t);
+ }
}
- catch (Throwable t)
- {
- log.error("Failed to handle packet", t);
- }
- }
- });
+ });
+ }
+ catch (RejectedExecutionException e)
+ {
+ //Ignore - this can happen when shutting down
+ }
}
}
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/PacketDecoder.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/PacketDecoder.java 2009-07-31 19:45:44 UTC (rev 7652)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/PacketDecoder.java 2009-08-02 09:15:23 UTC (rev 7653)
@@ -33,7 +33,6 @@
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.PING;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REATTACH_SESSION;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REATTACH_SESSION_RESP;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REGISTER_QUEUE_REPLICATION_CHANNEL;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATE_ACKNOWLEDGE;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATE_ADD_REMOTE_CONSUMER;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATE_ADD_REMOTE_QUEUE_BINDING;
@@ -80,7 +79,6 @@
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_SET_TIMEOUT_RESP;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_START;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_SUSPEND;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.UNREGISTER_QUEUE_REPLICATION_CHANNEL;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.remoting.Packet;
@@ -125,7 +123,6 @@
import org.jboss.messaging.core.remoting.impl.wireformat.SessionXASetTimeoutMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionXASetTimeoutResponseMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAStartMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.replication.RegisterQueueReplicationChannelMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateAcknowledgeMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateLockSequenceMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateRedistributionMessage;
@@ -135,7 +132,6 @@
import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateRemoteConsumerRemovedMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateStartupInfoMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicationResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.replication.UnregisterQueueReplicationChannelMessage;
import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
/**
@@ -442,16 +438,6 @@
packet = new PacketImpl(REPLICATE_QUEUE_DELIVERY);
break;
}
- case REGISTER_QUEUE_REPLICATION_CHANNEL:
- {
- packet = new RegisterQueueReplicationChannelMessage();
- break;
- }
- case UNREGISTER_QUEUE_REPLICATION_CHANNEL:
- {
- packet = new UnregisterQueueReplicationChannelMessage();
- break;
- }
default:
{
throw new IllegalArgumentException("Invalid type: " + packetType);
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java 2009-07-31 19:45:44 UTC (rev 7652)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java 2009-08-02 09:15:23 UTC (rev 7653)
@@ -19,7 +19,6 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.Executor;
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.logging.Logger;
@@ -31,8 +30,7 @@
import org.jboss.messaging.core.remoting.RemotingConnection;
import org.jboss.messaging.core.remoting.spi.Connection;
import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
-import org.jboss.messaging.utils.ExecutorFactory;
-import org.jboss.messaging.utils.OrderedExecutorFactory;
+import org.jboss.messaging.core.server.ChannelManager;
/**
* @author <a href="tim.fox at jboss.com">Tim Fox</a>
@@ -68,6 +66,8 @@
private volatile boolean active;
private final boolean client;
+
+ private final ChannelManager channelManager;
// Channels 0-9 are reserved for the system
// 0 is for pinging
@@ -90,7 +90,7 @@
final long blockingCallTimeout,
final List<Interceptor> interceptors)
{
- this(transportConnection, null, blockingCallTimeout, interceptors, true, true);
+ this(transportConnection, null, blockingCallTimeout, interceptors, true, true, null);
}
/*
@@ -99,10 +99,11 @@
public RemotingConnectionImpl(final Connection transportConnection,
final RemotingConnection replicatingConnection,
final List<Interceptor> interceptors,
- final boolean active)
+ final boolean active,
+ final ChannelManager channelManager)
{
- this(transportConnection, replicatingConnection, -1, interceptors, active, false);
+ this(transportConnection, replicatingConnection, -1, interceptors, active, false, channelManager);
}
private RemotingConnectionImpl(final Connection transportConnection,
@@ -110,7 +111,8 @@
final long blockingCallTimeout,
final List<Interceptor> interceptors,
final boolean active,
- final boolean client)
+ final boolean client,
+ final ChannelManager channelManager)
{
this.transportConnection = transportConnection;
@@ -125,14 +127,14 @@
this.client = client;
- Channel pingChannel = new ChannelImpl(0, -1, false, null);
- pingChannel.transferConnection(this);
+ this.channelManager = channelManager;
+
+ Channel pingChannel = new ChannelImpl(0, this);
putChannel(pingChannel);
- Channel channel1 = new ChannelImpl(1, -1, false, null);
- channel1.transferConnection(this);
-
+ Channel channel1 = new ChannelImpl(1, this);
+
putChannel(channel1);
}
@@ -336,6 +338,23 @@
if (!frozen)
{
Channel channel = channels.get(packet.getChannelID());
+
+ if (channel == null && channelManager != null)
+ {
+ channel = channelManager.getChannel(packet.getChannelID());
+
+ if (channel != null)
+ {
+ if (channel.getConnection() != null)
+ {
+ throw new IllegalStateException("Channel already has connection associated to it");
+ }
+
+ channel.setConnection(this);
+
+ channels.put(channel.getID(), channel);
+ }
+ }
//A
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java 2009-07-31 19:45:44 UTC (rev 7652)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java 2009-08-02 09:15:23 UTC (rev 7653)
@@ -162,12 +162,7 @@
public static final byte REPLICATE_QUEUE_DELIVERY = 99;
- public static final byte REGISTER_QUEUE_REPLICATION_CHANNEL = 100;
- public static final byte UNREGISTER_QUEUE_REPLICATION_CHANNEL = 101;
-
- public static final byte REGISTER_POST_OFFICE_REPLICATION_CHANNEL = 102;
-
// Static --------------------------------------------------------
public PacketImpl(final byte type)
Deleted: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/replication/RegisterQueueReplicationChannelMessage.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/replication/RegisterQueueReplicationChannelMessage.java 2009-07-31 19:45:44 UTC (rev 7652)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/replication/RegisterQueueReplicationChannelMessage.java 2009-08-02 09:15:23 UTC (rev 7653)
@@ -1,82 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors by
- * the @authors tag. See the copyright.txt in the distribution for a full listing of individual contributors. This is
- * free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of the License, or (at your option) any later version.
- * This software is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied
- * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
- * details. You should have received a copy of the GNU Lesser General Public License along with this software; if not,
- * write to the Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
- * site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.core.remoting.impl.wireformat.replication;
-
-import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
-import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
-import org.jboss.messaging.utils.DataConstants;
-
-/**
- *
- * A RegisterQueueReplicationChannelMessage
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
- *
- */
-public class RegisterQueueReplicationChannelMessage extends PacketImpl
-{
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- private long bindingID;
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- public RegisterQueueReplicationChannelMessage(final long bindingID)
- {
- super(REGISTER_QUEUE_REPLICATION_CHANNEL);
-
- this.bindingID = bindingID;
- }
-
- // Public --------------------------------------------------------
-
- public RegisterQueueReplicationChannelMessage()
- {
- super(REGISTER_QUEUE_REPLICATION_CHANNEL);
- }
-
- public int getRequiredBufferSize()
- {
- return BASIC_PACKET_SIZE + DataConstants.SIZE_LONG;
- }
-
- @Override
- public void encodeBody(final MessagingBuffer buffer)
- {
- buffer.writeLong(bindingID);
- }
-
- @Override
- public void decodeBody(final MessagingBuffer buffer)
- {
- bindingID = buffer.readLong();
- }
-
- public long getBindingID()
- {
- return bindingID;
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-}
Deleted: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/replication/UnregisterQueueReplicationChannelMessage.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/replication/UnregisterQueueReplicationChannelMessage.java 2009-07-31 19:45:44 UTC (rev 7652)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/replication/UnregisterQueueReplicationChannelMessage.java 2009-08-02 09:15:23 UTC (rev 7653)
@@ -1,82 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors by
- * the @authors tag. See the copyright.txt in the distribution for a full listing of individual contributors. This is
- * free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of the License, or (at your option) any later version.
- * This software is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied
- * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
- * details. You should have received a copy of the GNU Lesser General Public License along with this software; if not,
- * write to the Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
- * site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.core.remoting.impl.wireformat.replication;
-
-import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
-import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
-import org.jboss.messaging.utils.DataConstants;
-
-/**
- *
- * A RegisterQueueReplicationChannelMessage
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
- *
- */
-public class UnregisterQueueReplicationChannelMessage extends PacketImpl
-{
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- private long bindingID;
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- public UnregisterQueueReplicationChannelMessage(final long bindingID)
- {
- super(UNREGISTER_QUEUE_REPLICATION_CHANNEL);
-
- this.bindingID = bindingID;
- }
-
- // Public --------------------------------------------------------
-
- public UnregisterQueueReplicationChannelMessage()
- {
- super(UNREGISTER_QUEUE_REPLICATION_CHANNEL);
- }
-
- public int getRequiredBufferSize()
- {
- return BASIC_PACKET_SIZE + DataConstants.SIZE_LONG;
- }
-
- @Override
- public void encodeBody(final MessagingBuffer buffer)
- {
- buffer.writeLong(bindingID);
- }
-
- @Override
- public void decodeBody(final MessagingBuffer buffer)
- {
- bindingID = buffer.readLong();
- }
-
- public long getBindingID()
- {
- return bindingID;
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-}
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/server/RemotingService.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/server/RemotingService.java 2009-07-31 19:45:44 UTC (rev 7652)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/server/RemotingService.java 2009-08-02 09:15:23 UTC (rev 7653)
@@ -26,6 +26,7 @@
import org.jboss.messaging.core.remoting.Interceptor;
import org.jboss.messaging.core.remoting.RemotingConnection;
+import org.jboss.messaging.core.server.ChannelManager;
import org.jboss.messaging.core.server.MessagingComponent;
/**
@@ -56,4 +57,6 @@
void freeze();
RemotingConnection getServerSideReplicatingConnection();
+
+ ChannelManager getChannelManager();
}
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java 2009-07-31 19:45:44 UTC (rev 7652)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java 2009-08-02 09:15:23 UTC (rev 7653)
@@ -40,7 +40,6 @@
import org.jboss.messaging.core.remoting.Packet;
import org.jboss.messaging.core.remoting.RemotingConnection;
import org.jboss.messaging.core.remoting.impl.AbstractBufferHandler;
-import org.jboss.messaging.core.remoting.impl.ChannelImpl;
import org.jboss.messaging.core.remoting.impl.Pinger;
import org.jboss.messaging.core.remoting.impl.RemotingConnectionImpl;
import org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory;
@@ -54,7 +53,9 @@
import org.jboss.messaging.core.remoting.spi.Connection;
import org.jboss.messaging.core.remoting.spi.ConnectionLifeCycleListener;
import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+import org.jboss.messaging.core.server.ChannelManager;
import org.jboss.messaging.core.server.MessagingServer;
+import org.jboss.messaging.core.server.impl.ChannelManagerImpl;
import org.jboss.messaging.core.server.impl.MessagingServerPacketHandler;
import org.jboss.messaging.core.server.replication.Replicator;
import org.jboss.messaging.core.server.replication.impl.ReplicatorImpl;
@@ -102,6 +103,8 @@
private Map<Object, Pinger> pingers = new ConcurrentHashMap<Object, Pinger>();
private final int managementConnectorID;
+
+ private final ChannelManager channelManager = new ChannelManagerImpl();
// Static --------------------------------------------------------
@@ -275,6 +278,11 @@
{
return serverSideReplicatingConnection;
}
+
+ public ChannelManager getChannelManager()
+ {
+ return channelManager;
+ }
// ConnectionLifeCycleListener implementation -----------------------------------
@@ -313,7 +321,8 @@
RemotingConnection rc = new RemotingConnectionImpl(connection,
replicatingConnection,
interceptors,
- !config.isBackup());
+ !config.isBackup(),
+ channelManager);
final Replicator replicator;
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2009-07-31 19:45:44 UTC (rev 7652)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2009-08-02 09:15:23 UTC (rev 7653)
@@ -22,7 +22,6 @@
import java.util.Map.Entry;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -80,9 +79,7 @@
import org.jboss.messaging.core.remoting.impl.wireformat.CreateSessionResponseMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
import org.jboss.messaging.core.remoting.impl.wireformat.ReattachSessionResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.replication.RegisterQueueReplicationChannelMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateStartupInfoMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.replication.UnregisterQueueReplicationChannelMessage;
import org.jboss.messaging.core.remoting.server.RemotingService;
import org.jboss.messaging.core.remoting.server.impl.RemotingServiceImpl;
import org.jboss.messaging.core.remoting.spi.Connection;
@@ -106,7 +103,6 @@
import org.jboss.messaging.core.server.replication.impl.JBMThread;
import org.jboss.messaging.core.server.replication.impl.ReplicationAwareMutex;
import org.jboss.messaging.core.server.replication.impl.ReplicatorImpl;
-import org.jboss.messaging.core.server.replication.impl.SequencedLock;
import org.jboss.messaging.core.settings.HierarchicalRepository;
import org.jboss.messaging.core.settings.impl.AddressSettings;
import org.jboss.messaging.core.settings.impl.HierarchicalObjectRepository;
@@ -605,9 +601,7 @@
false,
configuration.isBackup() ? this.executorFactory.getExecutor() : null);
- channel.transferConnection(connection);
-
- connection.putChannel(channel);
+ remotingService.getChannelManager().putChannel(channel);
RemotingConnection replicatingConnection = connection.getReplicatingConnection();
@@ -617,10 +611,8 @@
if (replicatingConnection != null)
{
- replicatingChannel = new ChannelImpl(channelID, -1, false, null);
+ replicatingChannel = new ChannelImpl(channelID, replicatingConnection);
- replicatingChannel.transferConnection(replicatingConnection);
-
replicatingConnection.putChannel(replicatingChannel);
replicator = new ReplicatorImpl("session " + channelID, replicatingChannel);
@@ -863,16 +855,9 @@
}
postOffice.removeBinding(queueName);
+
+ remotingService.getChannelManager().removeChannel(queue.getID());
- Replicator replicator = queue.getReplicator();
-
- if (replicator != null)
- {
- Channel channel1 = replicator.getReplicatingChannel().getConnection().getChannel(1);
-
- channel1.send(new UnregisterQueueReplicationChannelMessage(queue.getID()));
- }
-
queue.close();
}
@@ -1558,20 +1543,8 @@
if (replicatingConnection != null)
{
- Channel channel1 = replicatingConnection.getChannel(1);
-
- JBMThread thread = JBMThread.currentThread();
-
- thread.setNoReplayOrRecord(1);
-
- channel1.send(new RegisterQueueReplicationChannelMessage(queueID));
-
- thread.resumeRecording();
-
- Channel replChannel = new ChannelImpl(queueID, -1, false, null);
+ Channel replChannel = new ChannelImpl(queueID, replicatingConnection);
- replChannel.transferConnection(replicatingConnection);
-
replicatingConnection.putChannel(replChannel);
replicator = new ReplicatorImpl("queue " + queueID, replChannel);
@@ -1628,6 +1601,8 @@
queues.put(queueBindingInfo.getPersistenceID(), queue);
postOffice.addBinding(binding);
+
+ createHandlerForQueue(queue);
}
Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap = new HashMap<SimpleString, List<Pair<byte[], Long>>>();
@@ -1738,9 +1713,21 @@
}
postOffice.addBinding(binding);
+
+ createHandlerForQueue(queue);
return queue;
}
+
+ private void createHandlerForQueue(final Queue queue)
+ {
+ if (configuration.isBackup())
+ {
+ Channel channel = new ChannelImpl(queue.getID(), executorFactory.getExecutor());
+
+ remotingService.getChannelManager().putChannel(channel);
+ }
+ }
private void deployDiverts() throws Exception
{
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java 2009-07-31 19:45:44 UTC (rev 7652)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java 2009-08-02 09:15:23 UTC (rev 7653)
@@ -14,11 +14,8 @@
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.CREATESESSION;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REATTACH_SESSION;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REGISTER_POST_OFFICE_REPLICATION_CHANNEL;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REGISTER_QUEUE_REPLICATION_CHANNEL;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATE_LOCK_SEQUENCES;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATE_STARTUP_INFO;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.UNREGISTER_QUEUE_REPLICATION_CHANNEL;
import java.util.List;
@@ -28,16 +25,13 @@
import org.jboss.messaging.core.remoting.ChannelHandler;
import org.jboss.messaging.core.remoting.Packet;
import org.jboss.messaging.core.remoting.RemotingConnection;
-import org.jboss.messaging.core.remoting.impl.ChannelImpl;
import org.jboss.messaging.core.remoting.impl.wireformat.CreateSessionMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
import org.jboss.messaging.core.remoting.impl.wireformat.ReattachSessionMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.replication.RegisterQueueReplicationChannelMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateLockSequenceMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateStartupInfoMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicationResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.replication.UnregisterQueueReplicationChannelMessage;
import org.jboss.messaging.core.server.MessagingServer;
import org.jboss.messaging.core.server.replication.ReplicableAction;
import org.jboss.messaging.core.server.replication.Replicator;
@@ -116,40 +110,7 @@
}
break;
- }
- case REGISTER_QUEUE_REPLICATION_CHANNEL:
- {
- RegisterQueueReplicationChannelMessage msg = (RegisterQueueReplicationChannelMessage)packet;
-
- Channel channel = new ChannelImpl(msg.getBindingID(), -1, false, server.getExecutorFactory().getExecutor());
-
- channel.transferConnection(connection);
-
- connection.putChannel(channel);
-
- if (server.registerBackupConnection(channel.getConnection()))
- {
- channel.setHandler(new QueueReplicationPacketHandler(msg.getBindingID(), server.getPostOffice(), channel));
- }
-
- break;
- }
- case UNREGISTER_QUEUE_REPLICATION_CHANNEL:
- {
- UnregisterQueueReplicationChannelMessage msg = (UnregisterQueueReplicationChannelMessage)packet;
-
- Channel channel = connection.getChannel(msg.getBindingID());
-
- channel.setHandler(null);
-
- channel.close();
-
- break;
- }
- case REGISTER_POST_OFFICE_REPLICATION_CHANNEL:
- {
- break;
- }
+ }
case CREATESESSION:
{
final CreateSessionMessage request = (CreateSessionMessage)packet;
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2009-07-31 19:45:44 UTC (rev 7652)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2009-08-02 09:15:23 UTC (rev 7653)
@@ -1569,7 +1569,7 @@
// if these reach the client who then subsequently fails over, on reconnection to backup, it will have
// received responses that the backup did not know about.
- channel.transferConnection(newConnection);
+ channel.setConnection(newConnection);
remotingConnection = newConnection;
Modified: branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/unit/core/server/replication/impl/QueuedWriteManagerTest.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/unit/core/server/replication/impl/QueuedWriteManagerTest.java 2009-07-31 19:45:44 UTC (rev 7652)
+++ branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/unit/core/server/replication/impl/QueuedWriteManagerTest.java 2009-08-02 09:15:23 UTC (rev 7653)
@@ -802,7 +802,7 @@
}
- public void transferConnection(RemotingConnection newConnection)
+ public void setConnection(RemotingConnection newConnection)
{
// TODO Auto-generated method stub
More information about the jboss-cvs-commits
mailing list