[hornetq-commits] JBoss hornetq SVN: r8018 - in branches/Replication_Clebert: src/main/org/hornetq/core/persistence/impl/journal and 6 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Wed Sep 30 22:50:16 EDT 2009
Author: clebert.suconic at jboss.com
Date: 2009-09-30 22:50:16 -0400 (Wed, 30 Sep 2009)
New Revision: 8018
Modified:
branches/Replication_Clebert/src/main/org/hornetq/core/journal/Journal.java
branches/Replication_Clebert/src/main/org/hornetq/core/journal/TransactionFailureCallback.java
branches/Replication_Clebert/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java
branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationAddMessage.java
branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationEndpoint.java
branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationManager.java
branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationTokenImpl.java
branches/Replication_Clebert/src/main/org/hornetq/core/server/HornetQServer.java
branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/HornetQPacketHandler.java
branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
Log:
changes...
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/journal/Journal.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/journal/Journal.java 2009-09-30 22:49:33 UTC (rev 8017)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/journal/Journal.java 2009-10-01 02:50:16 UTC (rev 8018)
@@ -75,9 +75,6 @@
// Load
- /** This method could be promoted to {@link Journal} interface when we decide to use the loadManager
- * instead of load(List,List)
- */
long load(LoaderCallback reloadManager) throws Exception;
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/journal/TransactionFailureCallback.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/journal/TransactionFailureCallback.java 2009-09-30 22:49:33 UTC (rev 8017)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/journal/TransactionFailureCallback.java 2009-10-01 02:50:16 UTC (rev 8018)
@@ -16,7 +16,7 @@
import java.util.List;
/**
- * A TransactionFailureCallback
+ * A callback that receives information about broken transactions so that any extra cleanup they require (for example, of large messages) can be performed.
*
* @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
*
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-09-30 22:49:33 UTC (rev 8017)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-10-01 02:50:16 UTC (rev 8018)
@@ -163,13 +163,7 @@
journalDir = config.getJournalDirectory();
- if (journalDir == null)
- {
- throw new NullPointerException("journal-dir is null");
- }
- createJournalDir = config.isCreateJournalDir();
-
SequentialFileFactory bindingsFF = new NIOSequentialFileFactory(bindingsDir);
bindingsJournal = new JournalImpl(1024 * 1024,
@@ -181,6 +175,13 @@
"bindings",
1);
+ if (journalDir == null)
+ {
+ throw new NullPointerException("journal-dir is null");
+ }
+
+ createJournalDir = config.isCreateJournalDir();
+
syncNonTransactional = config.isJournalSyncNonTransactional();
syncTransactional = config.isJournalSyncTransactional();
@@ -738,7 +739,7 @@
messageJournal.perfBlast(perfBlastPages);
}
}
-
+
/**
* @param messages
* @param buff
@@ -1141,7 +1142,7 @@
// This should be accessed from this package only
void deleteFile(final SequentialFile file)
{
- executor.execute(new Runnable()
+ Runnable deleteAction = new Runnable()
{
public void run()
{
@@ -1155,7 +1156,16 @@
}
}
- });
+ };
+
+ if (executor == null)
+ {
+ deleteAction.run();
+ }
+ else
+ {
+ executor.execute(deleteAction);
+ }
}
/**
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java 2009-09-30 22:49:33 UTC (rev 8017)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java 2009-10-01 02:50:16 UTC (rev 8018)
@@ -141,7 +141,7 @@
// Replication
- public static final byte REPLICATION_APPEND = 77;
+ public static final byte REPLICATION_APPEND = 80;
// Static --------------------------------------------------------
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationAddMessage.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationAddMessage.java 2009-09-30 22:49:33 UTC (rev 8017)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationAddMessage.java 2009-10-01 02:50:16 UTC (rev 8018)
@@ -33,6 +33,9 @@
private long id;
+ /** 0 - Bindings, 1 - MessagesJournal */
+ private byte journalID;
+
private byte recordType;
private EncodingSupport encodingData;
@@ -42,27 +45,26 @@
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
-
+
public ReplicationAddMessage()
{
super(REPLICATION_APPEND);
}
- public ReplicationAddMessage(long id, byte recordType, EncodingSupport encodingData)
+ public ReplicationAddMessage(byte journalID, long id, byte recordType, EncodingSupport encodingData)
{
this();
+ this.journalID = journalID;
this.id = id;
this.recordType = recordType;
this.encodingData = encodingData;
}
// Public --------------------------------------------------------
-
-
-
+
public int getRequiredBufferSize()
{
- return BASIC_PACKET_SIZE +
+ return BASIC_PACKET_SIZE + DataConstants.SIZE_BYTE +
DataConstants.SIZE_LONG +
DataConstants.SIZE_BYTE +
DataConstants.SIZE_INT +
@@ -73,6 +75,7 @@
@Override
public void encodeBody(final HornetQBuffer buffer)
{
+ buffer.writeByte(journalID);
buffer.writeLong(id);
buffer.writeByte(recordType);
buffer.writeInt(encodingData.getEncodeSize());
@@ -82,6 +85,7 @@
@Override
public void decodeBody(final HornetQBuffer buffer)
{
+ journalID = buffer.readByte();
id = buffer.readLong();
recordType = buffer.readByte();
int size = buffer.readInt();
@@ -89,7 +93,38 @@
buffer.readBytes(recordData);
}
+ /**
+ * @return the id
+ */
+ public long getId()
+ {
+ return id;
+ }
+ /**
+ * @return the journalID
+ */
+ public byte getJournalID()
+ {
+ return journalID;
+ }
+
+ /**
+ * @return the recordType
+ */
+ public byte getRecordType()
+ {
+ return recordType;
+ }
+
+ /**
+ * @return the recordData
+ */
+ public byte[] getRecordData()
+ {
+ return recordData;
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationEndpoint.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationEndpoint.java 2009-09-30 22:49:33 UTC (rev 8017)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationEndpoint.java 2009-10-01 02:50:16 UTC (rev 8018)
@@ -13,6 +13,7 @@
package org.hornetq.core.replication;
+import org.hornetq.core.remoting.Channel;
import org.hornetq.core.remoting.ChannelHandler;
import org.hornetq.core.server.HornetQComponent;
@@ -26,4 +27,8 @@
public interface ReplicationEndpoint extends ChannelHandler, HornetQComponent
{
+ void setChannel(Channel channel);
+
+ Channel getChannel();
+
}
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationManager.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationManager.java 2009-09-30 22:49:33 UTC (rev 8017)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationManager.java 2009-10-01 02:50:16 UTC (rev 8018)
@@ -24,5 +24,5 @@
*/
public interface ReplicationManager extends HornetQComponent
{
- void appendAddRecord(long id, byte recordType, EncodingSupport record);
+ void appendAddRecord(byte journalID, long id, byte recordType, EncodingSupport record);
}
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2009-09-30 22:49:33 UTC (rev 8017)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2009-10-01 02:50:16 UTC (rev 8018)
@@ -13,11 +13,16 @@
package org.hornetq.core.replication.impl;
-import org.hornetq.core.remoting.ChannelHandler;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.journal.Journal;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
+import org.hornetq.core.remoting.Channel;
import org.hornetq.core.remoting.Packet;
-import org.hornetq.core.remoting.impl.wireformat.ReplicationPacket;
+import org.hornetq.core.remoting.impl.wireformat.NullResponseMessage;
+import org.hornetq.core.remoting.impl.wireformat.PacketImpl;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationAddMessage;
import org.hornetq.core.replication.ReplicationEndpoint;
-import org.hornetq.core.replication.ReplicationManager;
import org.hornetq.core.server.HornetQServer;
/**
@@ -32,10 +37,20 @@
// Constants -----------------------------------------------------
+ private static final Logger log = Logger.getLogger(ReplicationEndpointImpl.class);
+
// Attributes ----------------------------------------------------
private final HornetQServer server;
+ private Channel channel;
+
+ private Journal bindingsJournal;
+
+ private Journal messagingJournal;
+
+ private JournalStorageManager storage;
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -45,12 +60,26 @@
}
// Public --------------------------------------------------------
- /* (non-Javadoc)
+ /*
+ * (non-Javadoc)
* @see org.hornetq.core.remoting.ChannelHandler#handlePacket(org.hornetq.core.remoting.Packet)
*/
public void handlePacket(Packet packet)
{
- System.out.println("packet = " + packet);
+ try
+ {
+ if (packet.getType() == PacketImpl.REPLICATION_APPEND)
+ {
+ System.out.println("Replicated");
+ handleAppendAddRecord(packet);
+ }
+ }
+ catch (Exception e)
+ {
+ // TODO: What should we do when the I/O fails on the backup side? Should we shut down the backup?
+ log.warn(e.getMessage(), e);
+ }
+ channel.send(new NullResponseMessage());
}
/* (non-Javadoc)
@@ -66,6 +95,17 @@
*/
public void start() throws Exception
{
+ Configuration config = server.getConfiguration();
+
+ // TODO: this needs an executor
+ JournalStorageManager storage = new JournalStorageManager(config, null);
+ storage.start();
+
+ this.bindingsJournal = storage.getBindingsJournal();
+ this.messagingJournal = storage.getBindingsJournal();
+
+ // We only need to load internal structures on the backup...
+ storage.loadInternalOnly();
}
/* (non-Javadoc)
@@ -73,14 +113,52 @@
*/
public void stop() throws Exception
{
+ storage.stop();
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.replication.ReplicationEndpoint#getChannel()
+ */
+ public Channel getChannel()
+ {
+ return channel;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.replication.ReplicationEndpoint#setChannel(org.hornetq.core.remoting.Channel)
+ */
+ public void setChannel(Channel channel)
+ {
+ this.channel = channel;
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
// Private -------------------------------------------------------
+ /**
+ * @param packet
+ * @throws Exception
+ */
+ private void handleAppendAddRecord(Packet packet) throws Exception
+ {
+ ReplicationAddMessage addMessage = (ReplicationAddMessage)packet;
+ Journal journalToUse;
+
+ if (addMessage.getJournalID() == (byte)0)
+ {
+ journalToUse = bindingsJournal;
+ }
+ else
+ {
+ journalToUse = messagingJournal;
+ }
+
+ journalToUse.appendAddRecord(addMessage.getId(), addMessage.getRecordType(), addMessage.getRecordData(), false);
+ }
+
// Inner classes -------------------------------------------------
}
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2009-09-30 22:49:33 UTC (rev 8017)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2009-10-01 02:50:16 UTC (rev 8018)
@@ -13,11 +13,19 @@
package org.hornetq.core.replication.impl;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executor;
+
import org.hornetq.core.client.impl.ConnectionManager;
import org.hornetq.core.journal.EncodingSupport;
+import org.hornetq.core.logging.Logger;
import org.hornetq.core.remoting.Channel;
+import org.hornetq.core.remoting.ChannelHandler;
+import org.hornetq.core.remoting.Packet;
import org.hornetq.core.remoting.RemotingConnection;
import org.hornetq.core.remoting.impl.wireformat.CreateReplicationSessionMessage;
+import org.hornetq.core.remoting.impl.wireformat.PacketImpl;
import org.hornetq.core.remoting.impl.wireformat.ReplicationAddMessage;
import org.hornetq.core.replication.ReplicationManager;
import org.hornetq.core.replication.ReplicationToken;
@@ -33,12 +41,15 @@
{
// Constants -----------------------------------------------------
+ private static final Logger log = Logger.getLogger(ReplicationManagerImpl.class);
// Attributes ----------------------------------------------------
// TODO: where should this be configured?
private static final int WINDOW_SIZE = 100 * 1024;
+ private final ResponseHandler responseHandler = new ResponseHandler();
+
private final ConnectionManager connectionManager;
private RemotingConnection connection;
@@ -47,6 +58,16 @@
private boolean started;
+ private boolean playedResponsesOnFailure;
+
+ private final Object replicationLock = new Object();
+
+ private final Executor executor;
+
+ private final ThreadLocal<ReplicationToken> repliToken = new ThreadLocal<ReplicationToken>();
+
+ private final Queue<ReplicationToken> pendingTokens = new ConcurrentLinkedQueue<ReplicationToken>();
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -54,10 +75,11 @@
/**
* @param replicationConnectionManager
*/
- public ReplicationManagerImpl(ConnectionManager connectionManager)
+ public ReplicationManagerImpl(final ConnectionManager connectionManager, final Executor executor)
{
super();
this.connectionManager = connectionManager;
+ this.executor = executor;
}
// Public --------------------------------------------------------
@@ -65,11 +87,10 @@
/* (non-Javadoc)
* @see org.hornetq.core.replication.ReplicationManager#replicate(byte[], org.hornetq.core.replication.ReplicationToken)
*/
-
-
- public void appendAddRecord(long id, byte recordType, EncodingSupport encodingData)
+
+ public void appendAddRecord(final byte journalID, final long id, final byte recordType, final EncodingSupport encodingData)
{
- replicatingChannel.send(new ReplicationAddMessage(id, recordType, encodingData));
+ sendReplicatePacket(new ReplicationAddMessage(journalID, id, recordType, encodingData));
}
/* (non-Javadoc)
@@ -85,22 +106,22 @@
*/
public synchronized void start() throws Exception
{
- this.started = true;
-
connection = connectionManager.getConnection(1);
long channelID = connection.generateChannelID();
Channel mainChannel = connection.getChannel(1, -1, false);
- Channel tempChannel = connection.getChannel(channelID, WINDOW_SIZE, false);
+ this.replicatingChannel = connection.getChannel(channelID, WINDOW_SIZE, false);
+ this.replicatingChannel.setHandler(this.responseHandler);
+
CreateReplicationSessionMessage replicationStartPackage = new CreateReplicationSessionMessage(channelID,
WINDOW_SIZE);
mainChannel.sendBlocking(replicationStartPackage);
- this.replicatingChannel = tempChannel;
+ this.started = true;
}
/* (non-Javadoc)
@@ -114,7 +135,7 @@
}
this.started = false;
-
+
if (connection != null)
{
connection.destroy();
@@ -123,6 +144,63 @@
connection = null;
}
+ public ReplicationToken getReplicationToken()
+ {
+ ReplicationToken token = repliToken.get();
+ if (token == null)
+ {
+ token = new ReplicationTokenImpl(executor);
+ repliToken.set(token);
+ }
+ return token;
+ }
+
+ private void sendReplicatePacket(final Packet packet)
+ {
+ boolean runItNow = false;
+
+ ReplicationToken repliToken = getReplicationToken();
+ repliToken.linedUp();
+
+ synchronized (replicationLock)
+ {
+ if (playedResponsesOnFailure)
+ {
+ // The replicating channel has already failed, so just play the action now
+
+ runItNow = true;
+ }
+ else
+ {
+ pendingTokens.add(repliToken);
+
+ // TODO: Should I use connect.write directly here?
+ replicatingChannel.send(packet);
+ }
+ }
+
+ // Execute outside lock
+
+ if (runItNow)
+ {
+ repliToken.replicated();
+ }
+ }
+
+ private void replicated()
+ {
+ ReplicationToken tokenPolled = pendingTokens.poll();
+ if (tokenPolled == null)
+ {
+ // We should debug the logs if this happens
+ log.warn("Missing replication token on the stack. There is a bug on the ReplicatoinManager since this was not supposed to happen");
+ }
+ else
+ {
+ tokenPolled.replicated();
+ }
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
@@ -131,4 +209,20 @@
// Inner classes -------------------------------------------------
+ protected class ResponseHandler implements ChannelHandler
+ {
+ /* (non-Javadoc)
+ * @see org.hornetq.core.remoting.ChannelHandler#handlePacket(org.hornetq.core.remoting.Packet)
+ */
+ public void handlePacket(Packet packet)
+ {
+ System.out.println("HandlePacket on client");
+ if (packet.getType() == PacketImpl.NULL_RESPONSE)
+ {
+ replicated();
+ }
+ }
+
+ }
+
}
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationTokenImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationTokenImpl.java 2009-09-30 22:49:33 UTC (rev 8017)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationTokenImpl.java 2009-10-01 02:50:16 UTC (rev 8018)
@@ -46,6 +46,7 @@
public synchronized void linedUp()
{
pendings++;
+ System.out.println("pendings (lined up) = " + pendings);
}
/** To be called by the replication manager, when data is confirmed on the channel */
@@ -62,11 +63,13 @@
tasks.clear();
}
}
+ System.out.println("pendings (replicated) = " + pendings);
}
/** You may have several actions to be done after a replication operation is completed. */
public synchronized void addFutureCompletion(Runnable runnable)
{
+ System.out.println("pendings = " + pendings);
if (pendings == 0)
{
executor.execute(runnable);
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/server/HornetQServer.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/server/HornetQServer.java 2009-09-30 22:49:33 UTC (rev 8017)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/server/HornetQServer.java 2009-10-01 02:50:16 UTC (rev 8018)
@@ -24,6 +24,7 @@
import org.hornetq.core.management.impl.HornetQServerControlImpl;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.postoffice.PostOffice;
+import org.hornetq.core.remoting.Channel;
import org.hornetq.core.remoting.RemotingConnection;
import org.hornetq.core.remoting.impl.wireformat.CreateSessionResponseMessage;
import org.hornetq.core.remoting.impl.wireformat.ReattachSessionResponseMessage;
@@ -71,7 +72,7 @@
ReattachSessionResponseMessage reattachSession(RemotingConnection connection, String name, int lastReceivedCommandID) throws Exception;
- ReplicationEndpoint createReplicationEndpoint() throws HornetQException;
+ ReplicationEndpoint createReplicationEndpoint(Channel channel) throws Exception;
CreateSessionResponseMessage createSession(String name,
long channelID,
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/HornetQPacketHandler.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/HornetQPacketHandler.java 2009-09-30 22:49:33 UTC (rev 8017)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/HornetQPacketHandler.java 2009-10-01 02:50:16 UTC (rev 8018)
@@ -196,7 +196,7 @@
try
{
Channel channel = connection.getChannel(request.getSessionChannelID(), request.getWindowSize(), false);
- ReplicationEndpoint endpoint = server.createReplicationEndpoint();
+ ReplicationEndpoint endpoint = server.createReplicationEndpoint(channel);
channel.setHandler(endpoint);
response = new NullResponseMessage();
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-09-30 22:49:33 UTC (rev 8017)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-10-01 02:50:16 UTC (rev 8018)
@@ -200,7 +200,7 @@
private ReplicationManager replicationManager;
- private ReplicationEndpoint replicationEndpoint = new ReplicationEndpointImpl(this);
+ private ReplicationEndpoint replicationEndpoint;
private final Set<ActivateCallback> activateCallbacks = new HashSet<ActivateCallback>();
@@ -593,7 +593,7 @@
return new CreateSessionResponseMessage(true, version.getIncrementingVersion());
}
- public synchronized ReplicationEndpoint createReplicationEndpoint() throws HornetQException
+ public synchronized ReplicationEndpoint createReplicationEndpoint(final Channel channel) throws Exception
{
if (!configuration.isBackup())
{
@@ -603,7 +603,11 @@
if (replicationEndpoint == null)
{
replicationEndpoint = new ReplicationEndpointImpl(this);
+ replicationEndpoint.start();
}
+
+ replicationEndpoint.setChannel(channel);
+
return replicationEndpoint;
}
@@ -712,7 +716,7 @@
scheduledPool,
null);
- this.replicationManager = new ReplicationManagerImpl(replicatingConnectionManager);
+ this.replicationManager = new ReplicationManagerImpl(replicatingConnectionManager, this.executorFactory.getExecutor());
replicationManager.start();
}
}
@@ -1113,6 +1117,8 @@
}
}, 0, dumpInfoInterval, TimeUnit.MILLISECONDS);
}
+
+ startReplication();
initialised = true;
}
Modified: branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
--- branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-09-30 22:49:33 UTC (rev 8017)
+++ branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-10-01 02:50:16 UTC (rev 8018)
@@ -18,16 +18,15 @@
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
-import java.util.Set;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
-import javax.management.MBeanServer;
-
import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
import org.hornetq.core.client.impl.ConnectionManager;
import org.hornetq.core.client.impl.ConnectionManagerImpl;
@@ -36,39 +35,19 @@
import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.journal.EncodingSupport;
import org.hornetq.core.management.ManagementService;
-import org.hornetq.core.management.impl.HornetQServerControlImpl;
-import org.hornetq.core.persistence.StorageManager;
-import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.remoting.Channel;
import org.hornetq.core.remoting.ChannelHandler;
import org.hornetq.core.remoting.Interceptor;
import org.hornetq.core.remoting.RemotingConnection;
import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
-import org.hornetq.core.remoting.impl.wireformat.CreateSessionResponseMessage;
-import org.hornetq.core.remoting.impl.wireformat.ReattachSessionResponseMessage;
-import org.hornetq.core.remoting.server.RemotingService;
import org.hornetq.core.remoting.server.impl.RemotingServiceImpl;
import org.hornetq.core.remoting.spi.HornetQBuffer;
-import org.hornetq.core.replication.ReplicationEndpoint;
-import org.hornetq.core.replication.impl.ReplicationEndpointImpl;
import org.hornetq.core.replication.impl.ReplicationManagerImpl;
-import org.hornetq.core.security.HornetQSecurityManager;
-import org.hornetq.core.security.Role;
-import org.hornetq.core.server.ActivateCallback;
import org.hornetq.core.server.HornetQServer;
-import org.hornetq.core.server.Queue;
-import org.hornetq.core.server.QueueFactory;
-import org.hornetq.core.server.ServerSession;
-import org.hornetq.core.server.cluster.ClusterManager;
import org.hornetq.core.server.impl.HornetQServerImpl;
-import org.hornetq.core.settings.HierarchicalRepository;
-import org.hornetq.core.settings.impl.AddressSettings;
-import org.hornetq.core.transaction.ResourceManager;
-import org.hornetq.core.version.Version;
import org.hornetq.tests.util.ServiceTestBase;
import org.hornetq.utils.ExecutorFactory;
import org.hornetq.utils.HornetQThreadFactory;
-import org.hornetq.utils.SimpleString;
/**
* A ReplicationTest
@@ -111,7 +90,7 @@
try
{
- ReplicationManagerImpl manager = new ReplicationManagerImpl(connectionManager);
+ ReplicationManagerImpl manager = new ReplicationManagerImpl(connectionManager, executor);
manager.start();
manager.stop();
}
@@ -134,7 +113,7 @@
try
{
- ReplicationManagerImpl manager = new ReplicationManagerImpl(connectionManager);
+ ReplicationManagerImpl manager = new ReplicationManagerImpl(connectionManager, executor);
try
{
manager.start();
@@ -165,10 +144,20 @@
try
{
- ReplicationManagerImpl manager = new ReplicationManagerImpl(connectionManager);
+ ReplicationManagerImpl manager = new ReplicationManagerImpl(connectionManager, executor);
manager.start();
- manager.appendAddRecord(1, (byte)1, new DataImplement());
- Thread.sleep(1000);
+ manager.appendAddRecord((byte)0, 1, (byte)1, new DataImplement());
+ final CountDownLatch latch = new CountDownLatch(1);
+ manager.getReplicationToken().addFutureCompletion(new Runnable()
+ {
+
+ public void run()
+ {
+ latch.countDown();
+ }
+
+ });
+ assertTrue(latch.await(1, TimeUnit.SECONDS));
manager.stop();
}
finally
@@ -176,7 +165,7 @@
server.stop();
}
}
-
+
class DataImplement implements EncodingSupport
{
@@ -196,7 +185,7 @@
{
return 5;
}
-
+
}
// Package protected ---------------------------------------------
@@ -279,313 +268,4 @@
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
-
- static class FakeServer implements HornetQServer
- {
-
- public Queue createQueue(SimpleString address,
- SimpleString queueName,
- SimpleString filter,
- boolean durable,
- boolean temporary) throws Exception
- {
- return null;
- }
-
- public CreateSessionResponseMessage createSession(String name,
- long channelID,
- String username,
- String password,
- int minLargeMessageSize,
- int incrementingVersion,
- RemotingConnection remotingConnection,
- boolean autoCommitSends,
- boolean autoCommitAcks,
- boolean preAcknowledge,
- boolean xa,
- int producerWindowSize) throws Exception
- {
-
- return null;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.server.HornetQServer#deployQueue(org.hornetq.utils.SimpleString, org.hornetq.utils.SimpleString, org.hornetq.utils.SimpleString, boolean, boolean)
- */
- public Queue deployQueue(SimpleString address,
- SimpleString queueName,
- SimpleString filterString,
- boolean durable,
- boolean temporary) throws Exception
- {
-
- return null;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.server.HornetQServer#destroyQueue(org.hornetq.utils.SimpleString, org.hornetq.core.server.ServerSession)
- */
- public void destroyQueue(SimpleString queueName, ServerSession session) throws Exception
- {
-
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.server.HornetQServer#getAddressSettingsRepository()
- */
- public HierarchicalRepository<AddressSettings> getAddressSettingsRepository()
- {
-
- return null;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.server.HornetQServer#getClusterManager()
- */
- public ClusterManager getClusterManager()
- {
-
- return null;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.server.HornetQServer#getConfiguration()
- */
- public Configuration getConfiguration()
- {
-
- return null;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.server.HornetQServer#getConnectionCount()
- */
- public int getConnectionCount()
- {
-
- return 0;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.server.HornetQServer#getExecutorFactory()
- */
- public ExecutorFactory getExecutorFactory()
- {
-
- return null;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.server.HornetQServer#getHornetQServerControl()
- */
- public HornetQServerControlImpl getHornetQServerControl()
- {
-
- return null;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.server.HornetQServer#getMBeanServer()
- */
- public MBeanServer getMBeanServer()
- {
-
- return null;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.server.HornetQServer#getManagementService()
- */
- public ManagementService getManagementService()
- {
-
- return null;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.server.HornetQServer#getNodeID()
- */
- public SimpleString getNodeID()
- {
-
- return null;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.server.HornetQServer#getPostOffice()
- */
- public PostOffice getPostOffice()
- {
-
- return null;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.server.HornetQServer#getQueueFactory()
- */
- public QueueFactory getQueueFactory()
- {
-
- return null;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.server.HornetQServer#getRemotingService()
- */
- public RemotingService getRemotingService()
- {
-
- return null;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.server.HornetQServer#getResourceManager()
- */
- public ResourceManager getResourceManager()
- {
-
- return null;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.server.HornetQServer#getSecurityManager()
- */
- public HornetQSecurityManager getSecurityManager()
- {
-
- return null;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.server.HornetQServer#getSecurityRepository()
- */
- public HierarchicalRepository<Set<Role>> getSecurityRepository()
- {
-
- return null;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.server.HornetQServer#getSession(java.lang.String)
- */
- public ServerSession getSession(String name)
- {
-
- return null;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.server.HornetQServer#getSessions()
- */
- public Set<ServerSession> getSessions()
- {
-
- return null;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.server.HornetQServer#getSessions(java.lang.String)
- */
- public List<ServerSession> getSessions(String connectionID)
- {
-
- return null;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.server.HornetQServer#getStorageManager()
- */
- public StorageManager getStorageManager()
- {
-
- return null;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.server.HornetQServer#getVersion()
- */
- public Version getVersion()
- {
-
- return null;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.server.HornetQServer#isInitialised()
- */
- public boolean isInitialised()
- {
-
- return false;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.server.HornetQServer#isStarted()
- */
- public boolean isStarted()
- {
-
- return false;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.server.HornetQServer#reattachSession(org.hornetq.core.remoting.RemotingConnection, java.lang.String, int)
- */
- public ReattachSessionResponseMessage reattachSession(RemotingConnection connection,
- String name,
- int lastReceivedCommandID) throws Exception
- {
-
- return null;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.server.HornetQServer#registerActivateCallback(org.hornetq.core.server.ActivateCallback)
- */
- public void registerActivateCallback(ActivateCallback callback)
- {
-
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.server.HornetQServer#removeSession(java.lang.String)
- */
- public void removeSession(String name) throws Exception
- {
-
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.server.HornetQServer#unregisterActivateCallback(org.hornetq.core.server.ActivateCallback)
- */
- public void unregisterActivateCallback(ActivateCallback callback)
- {
-
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.server.HornetQComponent#start()
- */
- public void start() throws Exception
- {
-
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.server.HornetQComponent#stop()
- */
- public void stop() throws Exception
- {
-
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.server.HornetQServer#createReplicationEndpoint()
- */
- public ReplicationEndpoint createReplicationEndpoint()
- {
- return new ReplicationEndpointImpl(this);
- }
-
- }
}
More information about the hornetq-commits mailing list