[jboss-cvs] JBoss Messaging SVN: r7660 - in branches/Branch_MultiThreaded_Replication: src/main/org/jboss/messaging/core/remoting/impl and 4 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Aug 3 15:11:49 EDT 2009
Author: timfox
Date: 2009-08-03 15:11:48 -0400 (Mon, 03 Aug 2009)
New Revision: 7660
Added:
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/ChannelManager.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ChannelManagerImpl.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/PagingStorePacketHandler.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/QueuePacketHandler.java
Modified:
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/paging/impl/PagingStoreFactoryNIO.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/PacketDecoder.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/MessagingServer.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PagingFailoverTest.java
Log:
MT replication
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/paging/impl/PagingStoreFactoryNIO.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/paging/impl/PagingStoreFactoryNIO.java 2009-08-03 18:28:37 UTC (rev 7659)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/paging/impl/PagingStoreFactoryNIO.java 2009-08-03 19:11:48 UTC (rev 7660)
@@ -42,6 +42,8 @@
import org.jboss.messaging.core.paging.PagingStoreFactory;
import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.postoffice.PostOffice;
+import org.jboss.messaging.core.server.ChannelManager;
+import org.jboss.messaging.core.server.MessagingServer;
import org.jboss.messaging.core.settings.HierarchicalRepository;
import org.jboss.messaging.core.settings.impl.AddressSettings;
import org.jboss.messaging.utils.ExecutorFactory;
@@ -70,6 +72,10 @@
private final Executor globalDepagerExecutor;
+ private final ChannelManager channelManager;
+
+ private final MessagingServer server;
+
private PagingManager pagingManager;
private StorageManager storageManager;
@@ -80,12 +86,19 @@
// Constructors --------------------------------------------------
- public PagingStoreFactoryNIO(final String directory, final ExecutorFactory executorFactory)
+ public PagingStoreFactoryNIO(final String directory,
+ final ExecutorFactory executorFactory,
+ final ChannelManager channelManager,
+ final MessagingServer server)
{
this.directory = directory;
this.executorFactory = executorFactory;
+ this.channelManager = channelManager;
+
+ this.server = server;
+
globalDepagerExecutor = executorFactory.getExecutor();
}
@@ -102,7 +115,6 @@
public synchronized PagingStore newStore(final SimpleString destinationName, final AddressSettings settings) throws Exception
{
-
return new PagingStoreImpl(pagingManager,
storageManager,
postOffice,
@@ -110,7 +122,9 @@
this,
destinationName,
settings,
- executorFactory.getExecutor());
+ executorFactory,
+ channelManager,
+ server);
}
/**
@@ -176,7 +190,7 @@
{
final String guid = file.getName();
-
+
final File addressFile = new File(file, ADDRESS_FILE);
if (!addressFile.exists())
@@ -188,7 +202,7 @@
BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(addressFile)));
String destination;
-
+
try
{
destination = reader.readLine();
@@ -211,7 +225,9 @@
this,
destinationName,
settings,
- executorFactory.getExecutor());
+ executorFactory,
+ channelManager,
+ server);
storesReturn.add(store);
}
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java 2009-08-03 18:28:37 UTC (rev 7659)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java 2009-08-03 19:11:48 UTC (rev 7660)
@@ -30,9 +30,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.jboss.messaging.core.journal.SequentialFile;
@@ -47,13 +45,24 @@
import org.jboss.messaging.core.paging.PagingStoreFactory;
import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.postoffice.PostOffice;
+import org.jboss.messaging.core.remoting.Channel;
+import org.jboss.messaging.core.remoting.Packet;
+import org.jboss.messaging.core.remoting.RemotingConnection;
+import org.jboss.messaging.core.remoting.impl.ChannelImpl;
+import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
+import org.jboss.messaging.core.server.ChannelManager;
+import org.jboss.messaging.core.server.MessagingServer;
import org.jboss.messaging.core.server.ServerMessage;
+import org.jboss.messaging.core.server.impl.PagingStorePacketHandler;
+import org.jboss.messaging.core.server.replication.ReplicableCall;
+import org.jboss.messaging.core.server.replication.Replicator;
import org.jboss.messaging.core.server.replication.impl.ReplicationAwareMutex;
+import org.jboss.messaging.core.server.replication.impl.ReplicatorImpl;
import org.jboss.messaging.core.settings.impl.AddressSettings;
import org.jboss.messaging.core.transaction.Transaction;
import org.jboss.messaging.core.transaction.TransactionPropertyIndexes;
import org.jboss.messaging.core.transaction.impl.TransactionImpl;
-import org.jboss.messaging.utils.Future;
+import org.jboss.messaging.utils.ExecutorFactory;
import org.jboss.messaging.utils.SimpleString;
/**
@@ -61,6 +70,7 @@
* @see PagingStore
*
* @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
*
*/
public class PagingStoreImpl implements TestSupportPageStore
@@ -110,15 +120,15 @@
private volatile Page currentPage;
- private final ReentrantLock writeLock = new ReentrantLock();
+ private final ReplicationAwareMutex writeLock;
/**
* We need to perform checks on currentPage with minimal locking
* */
private final ReadWriteLock currentPageLock = new ReentrantReadWriteLock();
-
- //private final Lock lock = ReplicationAwareMutex.createLock().writeLock();
+ // private final Lock lock = ReplicationAwareMutex.createLock().writeLock();
+
private volatile boolean running = false;
// Static --------------------------------------------------------
@@ -142,7 +152,9 @@
final PagingStoreFactory storeFactory,
final SimpleString storeName,
final AddressSettings addressSettings,
- final Executor executor)
+ final ExecutorFactory executorFactory,
+ final ChannelManager channelManager,
+ final MessagingServer server) throws Exception
{
if (pagingManager == null)
{
@@ -161,15 +173,43 @@
dropMessagesWhenFull = addressSettings.isDropMessagesWhenFull();
- this.executor = executor;
+ this.executor = executorFactory.getExecutor();
this.pagingManager = pagingManager;
this.fileFactory = fileFactory;
this.storeFactory = storeFactory;
+
+ long id = storageManager.generateUniqueID();
+
+ if (server.getConfiguration().isBackup())
+ {
+ Channel channel = new ChannelImpl(id, executorFactory.getExecutor());
+
+ channel.setHandler(new PagingStorePacketHandler(this, channel));
+
+ channelManager.putChannel(channel);
+ }
+
+ RemotingConnection connection = server.getPooledReplicatingConnection();
+
+ if (connection != null)
+ {
+ Channel replicatingChannel = new ChannelImpl(id, connection);
+
+ replicator = new ReplicatorImpl("paging-store-" + storeName, replicatingChannel);
+ }
+ else
+ {
+ replicator = null;
+ }
+
+ this.writeLock = new ReplicationAwareMutex("paging-store-" + storeName, 0, false);
}
+ private final Replicator replicator;
+
// Public --------------------------------------------------------
// PagingStore implementation ------------------------------------
@@ -346,7 +386,7 @@
currentPageLock.readLock().unlock();
}
- writeLock.lock();
+ writeLock.lock(1);
try
{
@@ -367,7 +407,7 @@
ByteBuffer buff = ByteBuffer.wrap(bytes);
buff.putLong(msg.getMessageID());
-
+
msg.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, bytes);
}
@@ -501,7 +541,7 @@
if (running)
{
running = false;
-
+
if (currentPage != null)
{
currentPage.close();
@@ -512,7 +552,7 @@
public void start() throws Exception
{
- writeLock.lock();
+ writeLock.lock(2);
try
{
@@ -604,7 +644,7 @@
// if the first check failed, we do it again under a global currentPageLock
// (writeLock) this time
- writeLock.lock();
+ writeLock.lock(3);
try
{
@@ -632,6 +672,23 @@
*/
public boolean readPage() throws Exception
{
+ if (replicator == null)
+ {
+ return doReadPage();
+ }
+ else
+ {
+ ReplicableCall<Boolean> action = new DepageAction();
+
+ replicator.execute(action, null);
+
+ return action.getResult();
+ }
+
+ }
+
+ private boolean doReadPage() throws Exception
+ {
Page page = depage();
if (page == null)
@@ -654,7 +711,35 @@
}
}
+
+   /**
+    * Replicable call that performs one depage on behalf of readPage().
+    *
+    * run() delegates to doReadPage() and remembers its boolean outcome so that the caller
+    * can fetch it through getResult() once the replicator has executed the call.
+    */
+   private class DepageAction implements ReplicableCall<Boolean>
+   {
+      // Outcome of the last successful doReadPage() call; initially false, and left untouched
+      // when doReadPage() throws (the failure is only logged).
+      private boolean pageRead;
+
+      /**
+       * @return a new packet of type REPLICATE_DEPAGE describing this call
+       */
+      public Packet getPacket()
+      {
+         final Packet depagePacket = new PacketImpl(PacketImpl.REPLICATE_DEPAGE);
+
+         return depagePacket;
+      }
+
+      /**
+       * Reads one page and records the result; any exception is logged rather than propagated.
+       */
+      public void run()
+      {
+         try
+         {
+            pageRead = doReadPage();
+         }
+         catch (Exception e)
+         {
+            log.error("Failed to read page", e);
+         }
+      }
+
+      /**
+       * @return the value recorded by run()
+       */
+      public Boolean getResult()
+      {
+         return Boolean.valueOf(pageRead);
+      }
+   }
+
+
// TestSupportPageStore ------------------------------------------
public void forceAnotherPage() throws Exception
@@ -672,7 +757,7 @@
* */
public Page depage() throws Exception
{
- writeLock.lock();
+ writeLock.lock(4);
currentPageLock.writeLock().lock(); // Make sure no checks are done on currentPage while we are depaging
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/PacketDecoder.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/PacketDecoder.java 2009-08-03 18:28:37 UTC (rev 7659)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/PacketDecoder.java 2009-08-03 19:11:48 UTC (rev 7660)
@@ -37,6 +37,7 @@
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATE_ACKNOWLEDGE;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATE_ADD_REMOTE_CONSUMER;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATE_ADD_REMOTE_QUEUE_BINDING;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATE_DEPAGE;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATE_LOCK_SEQUENCES;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATE_QUEUE_DELIVERY;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATE_REDISTRIBUTION;
@@ -439,6 +440,11 @@
packet = new PacketImpl(REPLICATE_QUEUE_DELIVERY);
break;
}
+ case REPLICATE_DEPAGE:
+ {
+ packet = new PacketImpl(REPLICATE_DEPAGE);
+ break;
+ }
case BACKUP_CONNECTION:
{
packet = new PacketImpl(BACKUP_CONNECTION);
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java 2009-08-03 18:28:37 UTC (rev 7659)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java 2009-08-03 19:11:48 UTC (rev 7660)
@@ -162,9 +162,11 @@
public static final byte REPLICATE_QUEUE_DELIVERY = 99;
- public static final byte BACKUP_CONNECTION = 100;
+ public static final byte REPLICATE_DEPAGE = 100;
+ public static final byte BACKUP_CONNECTION = 101;
+
// Static --------------------------------------------------------
public PacketImpl(final byte type)
Added: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/ChannelManager.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/ChannelManager.java (rev 0)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/ChannelManager.java 2009-08-03 19:11:48 UTC (rev 7660)
@@ -0,0 +1,42 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.core.server;
+
+import org.jboss.messaging.core.remoting.Channel;
+
+/**
+ * A ChannelManager: a registry of {@link Channel}s that can be looked up by id.
+ *
+ * The per-method notes below describe ChannelManagerImpl, the implementation added in the
+ * same revision. NOTE(review): confirm them before relying on them for other implementations.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ *
+ */
+public interface ChannelManager
+{
+ /**
+ * Looks up a channel by id.
+ *
+ * @param id the id the channel was registered under
+ * @return the registered channel; ChannelManagerImpl returns null when there is none
+ */
+ Channel getChannel(long id);
+
+ /**
+ * Registers a channel under the id reported by channel.getID().
+ *
+ * @param channel the channel to register
+ * @throws IllegalStateException in ChannelManagerImpl, when a channel with the same id is already registered
+ */
+ void putChannel(Channel channel);
+
+ /**
+ * Unregisters the channel with the given id.
+ *
+ * @param id the id of the channel to remove
+ * @return the channel that was removed; ChannelManagerImpl returns null when there was none
+ */
+ Channel removeChannel(long id);
+}
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/MessagingServer.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/MessagingServer.java 2009-08-03 18:28:37 UTC (rev 7659)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/MessagingServer.java 2009-08-03 19:11:48 UTC (rev 7660)
@@ -116,6 +116,8 @@
RemotingConnection getNonPooledReplicatingConnection() throws Exception;
void returnNonPooledReplicatingConnection(RemotingConnection connection);
+
+ RemotingConnection getPooledReplicatingConnection() throws Exception;
void initialiseBackup(UUID nodeID, long currentMessageID) throws Exception;
Added: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ChannelManagerImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ChannelManagerImpl.java (rev 0)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ChannelManagerImpl.java 2009-08-03 19:11:48 UTC (rev 7660)
@@ -0,0 +1,61 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.core.server.impl;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.jboss.messaging.core.remoting.Channel;
+import org.jboss.messaging.core.server.ChannelManager;
+
+/**
+ * A ChannelManagerImpl: a {@link ChannelManager} backed by a {@link ConcurrentMap} keyed by channel id.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ *
+ */
+public class ChannelManagerImpl implements ChannelManager
+{
+   private final ConcurrentMap<Long, Channel> channels = new ConcurrentHashMap<Long, Channel>();
+
+   /**
+    * Looks up a channel by id.
+    *
+    * @param id the id the channel was registered under
+    * @return the registered channel, or null if there is none
+    */
+   public Channel getChannel(final long id)
+   {
+      return channels.get(id);
+   }
+
+   /**
+    * Registers a channel under the id reported by channel.getID().
+    *
+    * @param channel the channel to register
+    * @throws IllegalStateException if a channel with the same id is already registered;
+    *         the existing registration is left in place
+    */
+   public void putChannel(final Channel channel)
+   {
+      // putIfAbsent rather than put: put would already have replaced the existing channel
+      // by the time the duplicate is detected, so the failed call would still mutate the map.
+      if (channels.putIfAbsent(channel.getID(), channel) != null)
+      {
+         throw new IllegalStateException("Already a channel with id " + channel.getID());
+      }
+   }
+
+   /**
+    * Unregisters the channel with the given id.
+    *
+    * @param id the id of the channel to remove
+    * @return the channel that was removed, or null if there was none
+    */
+   public Channel removeChannel(final long id)
+   {
+      return channels.remove(id);
+   }
+
+}
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2009-08-03 18:28:37 UTC (rev 7659)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2009-08-03 19:11:48 UTC (rev 7660)
@@ -1045,7 +1045,10 @@
protected PagingManager createPagingManager()
{
- return new PagingManagerImpl(new PagingStoreFactoryNIO(configuration.getPagingDirectory(), executorFactory),
+ return new PagingManagerImpl(new PagingStoreFactoryNIO(configuration.getPagingDirectory(),
+ executorFactory,
+ remotingService.getChannelManager(),
+ this),
storageManager,
addressSettingsRepository,
configuration.getPagingMaxGlobalSizeBytes(),
Added: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/PagingStorePacketHandler.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/PagingStorePacketHandler.java (rev 0)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/PagingStorePacketHandler.java 2009-08-03 19:11:48 UTC (rev 7660)
@@ -0,0 +1,116 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.server.impl;
+
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATE_DEPAGE;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATE_LOCK_SEQUENCES;
+
+import java.util.List;
+
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.paging.PagingStore;
+import org.jboss.messaging.core.remoting.Channel;
+import org.jboss.messaging.core.remoting.ChannelHandler;
+import org.jboss.messaging.core.remoting.Packet;
+import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateLockSequenceMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicationResponseMessage;
+import org.jboss.messaging.core.server.replication.impl.JBMThread;
+import org.jboss.messaging.utils.Triple;
+
+/**
+ * A PagingStorePacketHandler: channel handler that replays replicated depage requests
+ * against a {@link PagingStore}.
+ *
+ * Two packet types are accepted: REPLICATE_LOCK_SEQUENCES, whose sequences are remembered,
+ * and REPLICATE_DEPAGE, which calls {@link PagingStore#readPage()} with those sequences set
+ * on the current {@link JBMThread} and then sends a {@link ReplicationResponseMessage}.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ *
+ */
+public class PagingStorePacketHandler implements ChannelHandler
+{
+   private static final Logger log = Logger.getLogger(PagingStorePacketHandler.class);
+
+   private final PagingStore pagingStore;
+
+   // Sequences from the most recent REPLICATE_LOCK_SEQUENCES packet; null until one arrives.
+   private volatile List<Triple<Long, Long, Integer>> sequences;
+
+   private final Channel channel;
+
+   /**
+    * @param pagingStore the store whose readPage() is invoked for each REPLICATE_DEPAGE packet
+    * @param channel the channel on which the response is sent
+    */
+   public PagingStorePacketHandler(final PagingStore pagingStore, final Channel channel)
+   {
+      this.pagingStore = pagingStore;
+
+      this.channel = channel;
+   }
+
+   /**
+    * Handles one replication packet.
+    *
+    * @param packet a REPLICATE_LOCK_SEQUENCES or REPLICATE_DEPAGE packet
+    * @throws IllegalArgumentException if the packet is of any other type
+    */
+   public void handlePacket(final Packet packet)
+   {
+      switch (packet.getType())
+      {
+         case REPLICATE_LOCK_SEQUENCES:
+         {
+            ReplicateLockSequenceMessage msg = (ReplicateLockSequenceMessage)packet;
+
+            sequences = msg.getSequences();
+
+            // log.info("got sequences on queue " + msg.getID());
+            // dumpSequences(sequences);
+
+            break;
+         }
+         case REPLICATE_DEPAGE:
+         {
+            JBMThread thread = JBMThread.currentThread();
+
+            thread.setReplay(sequences);
+
+            try
+            {
+               readPageLoggingFailure();
+
+               channel.send(new ReplicationResponseMessage());
+            }
+            finally
+            {
+               // Always undo setReplay(), even if sending the response throws, so the
+               // thread is not left in replay mode for whatever it runs next.
+               // NOTE(review): the meaning of 12 is defined by JBMThread, not visible here.
+               thread.setNoReplayOrRecord(12);
+            }
+
+            break;
+         }
+         default:
+         {
+            throw new IllegalArgumentException("Invalid packet " + packet);
+         }
+      }
+   }
+
+   // Reads one page; a failure is logged and not propagated so the response is still sent.
+   private void readPageLoggingFailure()
+   {
+      try
+      {
+         pagingStore.readPage();
+      }
+      catch (Exception e)
+      {
+         log.error("Failed to read page", e);
+      }
+   }
+
+   // Debugging aid, only referenced from the commented-out call in handlePacket.
+   private void dumpSequences(List<Triple<Long, Long, Integer>> sequences)
+   {
+      log.info(Thread.currentThread() + " Got on paging store replication ph Sequences size is " + sequences.size());
+
+      for (Triple<Long, Long, Integer> sequence: sequences)
+      {
+         log.info(sequence.a + ": " + sequence.b);
+      }
+   }
+}
Added: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/QueuePacketHandler.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/QueuePacketHandler.java (rev 0)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/QueuePacketHandler.java 2009-08-03 19:11:48 UTC (rev 7660)
@@ -0,0 +1,111 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.server.impl;
+
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATE_LOCK_SEQUENCES;
+
+import java.util.List;
+
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.Channel;
+import org.jboss.messaging.core.remoting.ChannelHandler;
+import org.jboss.messaging.core.remoting.Packet;
+import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
+import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateLockSequenceMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicationResponseMessage;
+import org.jboss.messaging.core.server.Queue;
+import org.jboss.messaging.core.server.replication.impl.JBMThread;
+import org.jboss.messaging.utils.Triple;
+
+/**
+ * A QueuePacketHandler: channel handler that replays replicated queue deliveries
+ * against a {@link Queue}.
+ *
+ * Two packet types are accepted: REPLICATE_LOCK_SEQUENCES, whose sequences are remembered,
+ * and REPLICATE_QUEUE_DELIVERY, which calls {@link Queue#deliverOne()} with those sequences
+ * set on the current {@link JBMThread} and then sends a {@link ReplicationResponseMessage}.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ *
+ */
+public class QueuePacketHandler implements ChannelHandler
+{
+   private static final Logger log = Logger.getLogger(QueuePacketHandler.class);
+
+   private final Queue queue;
+
+   // Sequences from the most recent REPLICATE_LOCK_SEQUENCES packet; null until one arrives.
+   private volatile List<Triple<Long, Long, Integer>> sequences;
+
+   private final Channel channel;
+
+   /**
+    * @param queue the queue whose deliverOne() is invoked for each REPLICATE_QUEUE_DELIVERY packet
+    * @param channel the channel on which the response is sent
+    */
+   public QueuePacketHandler(final Queue queue, final Channel channel)
+   {
+      this.queue = queue;
+
+      this.channel = channel;
+   }
+
+   /**
+    * Handles one replication packet.
+    *
+    * @param packet a REPLICATE_LOCK_SEQUENCES or REPLICATE_QUEUE_DELIVERY packet
+    * @throws IllegalArgumentException if the packet is of any other type
+    */
+   public void handlePacket(final Packet packet)
+   {
+      switch (packet.getType())
+      {
+         case REPLICATE_LOCK_SEQUENCES:
+         {
+            ReplicateLockSequenceMessage msg = (ReplicateLockSequenceMessage)packet;
+
+            sequences = msg.getSequences();
+
+            // log.info("got sequences on queue " + msg.getID());
+            // dumpSequences(sequences);
+
+            break;
+         }
+         case PacketImpl.REPLICATE_QUEUE_DELIVERY:
+         {
+            // log.info("Got replicated queue delivery");
+
+            JBMThread thread = JBMThread.currentThread();
+
+            thread.setReplay(sequences);
+
+            try
+            {
+               queue.deliverOne();
+
+               channel.send(new ReplicationResponseMessage());
+            }
+            finally
+            {
+               // Always undo setReplay(), even if the delivery or the send throws, so the
+               // thread is not left in replay mode for whatever it runs next.
+               // NOTE(review): the meaning of 12 is defined by JBMThread, not visible here.
+               thread.setNoReplayOrRecord(12);
+            }
+
+            break;
+         }
+         default:
+         {
+            throw new IllegalArgumentException("Invalid packet " + packet);
+         }
+      }
+   }
+
+   // Debugging aid, only referenced from the commented-out call in handlePacket.
+   private void dumpSequences(List<Triple<Long, Long, Integer>> sequences)
+   {
+      log.info(Thread.currentThread() + " Got on queue replication ph Sequences size is " + sequences.size());
+
+      for (Triple<Long, Long, Integer> sequence : sequences)
+      {
+         log.info(sequence.a + ": " + sequence.b);
+      }
+   }
+}
Modified: branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PagingFailoverTest.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PagingFailoverTest.java 2009-08-03 18:28:37 UTC (rev 7659)
+++ branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PagingFailoverTest.java 2009-08-03 19:11:48 UTC (rev 7660)
@@ -233,7 +233,7 @@
protected int getNumberOfMessages()
{
- return 500;
+ return 1000;
}
protected int getNumberOfThreads()
More information about the jboss-cvs-commits
mailing list