JBoss hornetq SVN: r11076 - in branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq: core/client/impl and 1 other directory.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-07-30 15:02:39 -0400 (Sat, 30 Jul 2011)
New Revision: 11076
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/api/core/client/ServerLocator.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
Log:
tweaks for tests
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/api/core/client/ServerLocator.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/api/core/client/ServerLocator.java 2011-07-30 18:57:05 UTC (rev 11075)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/api/core/client/ServerLocator.java 2011-07-30 19:02:39 UTC (rev 11076)
@@ -26,6 +26,12 @@
*/
public interface ServerLocator
{
+
+ /**
+ * Returns true if close was already called.
+ * @return {@code true} if {@code close()} has already been called on this locator
+ */
+ boolean isClosed();
/**
* This method will disable any checks when a GarbageCollection happens
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2011-07-30 18:57:05 UTC (rev 11075)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2011-07-30 19:02:39 UTC (rev 11076)
@@ -512,7 +512,7 @@
public boolean isClosed()
{
- return closed;
+ return closed || serverLocator.isClosed();
}
public ServerLocator getServerLocator()
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-07-30 18:57:05 UTC (rev 11075)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-07-30 19:02:39 UTC (rev 11076)
@@ -546,6 +546,11 @@
addFactory(sf);
return sf;
}
+
+ public boolean isClosed()
+ {
+ return closed || closing;
+ }
public ClientSessionFactory createSessionFactory(final TransportConfiguration transportConfiguration) throws Exception
{
13 years, 4 months
JBoss hornetq SVN: r11075 - branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/remoting/server/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-07-30 14:57:05 -0400 (Sat, 30 Jul 2011)
New Revision: 11075
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
Log:
tweak
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2011-07-30 04:30:31 UTC (rev 11074)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2011-07-30 18:57:05 UTC (rev 11075)
@@ -509,7 +509,7 @@
this.pauseInterval = pauseInterval;
}
- public synchronized void close()
+ public void close()
{
closed = true;
13 years, 4 months
JBoss hornetq SVN: r11074 - branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-07-30 00:30:31 -0400 (Sat, 30 Jul 2011)
New Revision: 11074
Modified:
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java
Log:
disabling test
Modified: branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java 2011-07-30 03:56:15 UTC (rev 11073)
+++ branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java 2011-07-30 04:30:31 UTC (rev 11074)
@@ -361,7 +361,7 @@
}
- public void testSimpleRoundRobbinNoFailure() throws Exception
+ public void _testSimpleRoundRobbinNoFailure() throws Exception
{
//TODO make this test to crash a node
setupServer(0, true, isNetty());
13 years, 4 months
JBoss hornetq SVN: r11073 - branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-07-29 23:56:15 -0400 (Fri, 29 Jul 2011)
New Revision: 11073
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
Log:
fixing tests
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-07-29 22:39:12 UTC (rev 11072)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-07-30 03:56:15 UTC (rev 11073)
@@ -664,7 +664,7 @@
protected Bridge createClusteredBridge(MessageFlowRecordImpl record) throws Exception
{
- final ServerLocatorInternal targetLocator = (ServerLocatorInternal)HornetQClient.createServerLocatorWithoutHA(record.getConnector());
+ final ServerLocatorInternal targetLocator = new ServerLocatorImpl(this.clusterManagerTopology, false, record.getConnector());
targetLocator.setReconnectAttempts(0);
13 years, 4 months
JBoss hornetq SVN: r11072 - in branches/Branch_2_2_EAP_cluster_clean2: tests/src/org/hornetq/tests/integration/client and 1 other directory.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-07-29 18:39:12 -0400 (Fri, 29 Jul 2011)
New Revision: 11072
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/impl/QueueImpl.java
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/client/PagingTest.java
Log:
Improvements on getQueueCount
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/impl/QueueImpl.java 2011-07-29 18:35:41 UTC (rev 11071)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/impl/QueueImpl.java 2011-07-29 22:39:12 UTC (rev 11072)
@@ -402,7 +402,7 @@
directDeliver = false;
- executor.execute(concurrentPoller);
+ getExecutor().execute(concurrentPoller);
}
public void forceDelivery()
@@ -921,9 +921,31 @@
public long getMessagesAdded()
{
- blockOnExecutorFuture();
+ final CountDownLatch latch = new CountDownLatch(1);
+ final AtomicLong count = new AtomicLong(0);
- return getInstantMessagesAdded();
+ getExecutor().execute(new Runnable()
+ {
+ public void run()
+ {
+ count.set(getInstantMessagesAdded());
+ latch.countDown();
+ }
+ });
+
+ try
+ {
+ if (!latch.await(10, TimeUnit.SECONDS))
+ {
+ throw new IllegalStateException("Timed out on waiting for MessagesAdded");
+ }
+ }
+ catch (Exception e)
+ {
+ log.warn(e.getMessage(), e);
+ }
+
+ return count.get();
}
public long getInstantMessagesAdded()
Modified: branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2011-07-29 18:35:41 UTC (rev 11071)
+++ branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2011-07-29 22:39:12 UTC (rev 11072)
@@ -24,8 +24,10 @@
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import javax.ejb.FinderException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
@@ -1274,6 +1276,40 @@
{
bb.put(getSamplebyte(j));
}
+
+ final AtomicBoolean running = new AtomicBoolean(true);
+
+ class TCount extends Thread
+ {
+ Queue queue;
+
+ TCount(Queue queue)
+ {
+ this.queue = queue;
+ }
+ public void run()
+ {
+ try
+ {
+ while (running.get())
+ {
+ // log.info("Message count = " + queue.getMessageCount() + " on queue " + queue.getName());
+ queue.getMessagesAdded();
+ queue.getMessageCount();
+ //log.info("Message added = " + queue.getMessagesAdded() + " on queue " + queue.getName());
+ Thread.sleep(10);
+ }
+ }
+ catch (InterruptedException e)
+ {
+ log.info("Thread interrupted");
+ }
+ }
+ };
+
+ TCount tcount1 = null;
+ TCount tcount2 = null;
+
try
{
@@ -1300,7 +1336,8 @@
session.createQueue(PagingTest.ADDRESS.toString(), PagingTest.ADDRESS + "-2", null, true);
}
-
+
+
ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
ClientMessage message = null;
@@ -1309,6 +1346,7 @@
{
if (i % 500 == 0)
{
+ log.info("Sent " + i + " messages");
session.commit();
}
message = session.createMessage(true);
@@ -1338,7 +1376,24 @@
PagingTest.PAGE_MAX,
new HashMap<String, AddressSettings>());
server.start();
+
+ Queue queue1 = server.locateQueue(PagingTest.ADDRESS.concat("-1"));
+
+ Queue queue2 = server.locateQueue(PagingTest.ADDRESS.concat("-2"));
+
+ assertNotNull(queue1);
+
+ assertNotNull(queue2);
+
+ assertNotSame(queue1, queue2);
+ tcount1 = new TCount(queue1);
+
+ tcount2 = new TCount(queue2);
+
+ tcount1.start();
+ tcount2.start();
+
ServerLocator locator = createInVMNonHALocator();
final ClientSessionFactory sf2 = locator.createSessionFactory();
@@ -1375,8 +1430,14 @@
Assert.assertNotNull(message2);
- if (i % 1000 == 0)
+ if (i % 100 == 0)
+ {
+ if (i % 5000 == 0)
+ {
+ log.info(addressToSubscribe + " consumed " + i + " messages");
+ }
session.commit();
+ }
try
{
@@ -1437,6 +1498,20 @@
}
finally
{
+ running.set(false);
+
+ if (tcount1 != null)
+ {
+ tcount1.interrupt();
+ tcount1.join();
+ }
+
+ if (tcount2 != null)
+ {
+ tcount2.interrupt();
+ tcount2.join();
+ }
+
try
{
server.stop();
13 years, 4 months
JBoss hornetq SVN: r11071 - branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl.
by do-not-reply@jboss.org
Author: borges
Date: 2011-07-29 14:35:41 -0400 (Fri, 29 Jul 2011)
New Revision: 11071
Modified:
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalFilesRepository.java
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java
Log:
HORNETQ-720 Set value for the next file ID.
Modified: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalFilesRepository.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalFilesRepository.java 2011-07-29 17:27:23 UTC (rev 11070)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalFilesRepository.java 2011-07-29 18:35:41 UTC (rev 11071)
@@ -536,6 +536,11 @@
return nextFileID.incrementAndGet();
}
+ public void setNextFileID(long value)
+ {
+ nextFileID.set(value);
+ }
+
/** Get the ID part of the name */
private long getFileNameID(final String fileName)
{
Modified: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java 2011-07-29 17:27:23 UTC (rev 11070)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java 2011-07-29 18:35:41 UTC (rev 11071)
@@ -3282,6 +3282,10 @@
maxID = Math.max(maxID, id);
filesRepository.createRemoteBackupSyncFile(id);
}
+ if (maxID > 0)
+ {
+ filesRepository.setNextFileID(maxID);
+ }
}
finally
{
13 years, 4 months
JBoss hornetq SVN: r11070 - in branches/HORNETQ-720_Replication: hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat and 5 other directories.
by do-not-reply@jboss.org
Author: borges
Date: 2011-07-29 13:27:23 -0400 (Fri, 29 Jul 2011)
New Revision: 11070
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationJournalFileMessage.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalFilesRepository.java
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java
Log:
HORNETQ-720 Add data file sync code and backupServer may only start if journals are sync'ed.
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-07-29 17:25:36 UTC (rev 11069)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-07-29 17:27:23 UTC (rev 11070)
@@ -343,10 +343,7 @@
}
/**
- * XXX FIXME Method ignores the synchronization of LargeMessages and Paging.
- * <p>
- * XXX A second version improvement would be to allow new operations to be sent to the backup,
- * while we synchronize the existing logs.
+ * XXX FIXME HORNETQ-720 Method ignores the synchronization of LargeMessages and Paging.
* @param replicationManager
* @throws HornetQException
*/
@@ -354,7 +351,7 @@
{
if (!started)
{
- throw new IllegalStateException("must be started...");
+ throw new IllegalStateException("JournalStorageManager must be started...");
}
assert replicationManager != null;
replicator = replicationManager;
@@ -364,7 +361,7 @@
throw new HornetQException(HornetQException.INTERNAL_ERROR,
"journals here are not JournalImpl. You can't set a replicator!");
}
- // XXX NEED to take a global lock on the StorageManager.
+ // XXX HORNETQ-720 WRITE LOCK the StorageManager.
final JournalImpl localMessageJournal = (JournalImpl)messageJournal;
final JournalImpl localBindingsJournal = (JournalImpl)bindingsJournal;
@@ -378,14 +375,15 @@
localMessageJournal.writeUnlock();
localBindingsJournal.writeUnlock();
-
bindingsJournal = new ReplicatedJournal(((byte)0), localBindingsJournal, replicator);
messageJournal = new ReplicatedJournal((byte)1, localMessageJournal, replicator);
+ // XXX HORNETQ-720 UNLOCK StorageManager...
+
sendJournalFile(messageFiles, JournalContent.MESSAGES);
sendJournalFile(bindingsFiles, JournalContent.BINDINGS);
- // SEND "SYNC_DONE" msg to backup telling it can become operational.
+ replicator.sendSynchronizationDone();
}
/**
@@ -408,8 +406,8 @@
{
journal.setAutoReclaim(false);
/*
- * XXX need to check whether it is safe to proceed if compacting is running (specially at the
- * end of it)
+ * XXX HORNETQ-720 need to check whether it is safe to proceed if compacting is running
+ * (especially at the end of it)
*/
journal.forceMoveNextFile();
JournalFile[] datafiles = journal.getDataFiles();
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationJournalFileMessage.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationJournalFileMessage.java 2011-07-29 17:25:36 UTC (rev 11069)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationJournalFileMessage.java 2011-07-29 17:27:23 UTC (rev 11070)
@@ -5,7 +5,11 @@
import org.hornetq.core.protocol.core.impl.PacketImpl;
/**
- * Used to copy JournalFile data over to the backup during synchronization.
+ * Message is used to:
+ * <ol>
+ * <li>copy JournalFile data over to the backup during synchronization;
+ * <li>send an up-to-date signal to the backup;
+ * </ol>
*/
public final class ReplicationJournalFileMessage extends PacketImpl
{
@@ -13,7 +17,9 @@
private byte[] data;
private int dataSize;
private JournalContent journalType;
+ /** This value refers to {@link org.hornetq.core.journal.impl.JournalFile#getFileID()} */
private long fileId;
+ private boolean backupIsUpToDate = false;
public ReplicationJournalFileMessage()
{
@@ -33,18 +39,56 @@
public void encodeRest(final HornetQBuffer buffer)
{
buffer.writeLong(fileId);
+ if (fileId == -1)
+ return;
buffer.writeByte(journalType.typeByte);
buffer.writeInt(dataSize);
- buffer.writeBytes(data, 0, dataSize);
+ // sending -1 will close the file
+ if (dataSize > -1)
+ {
+ buffer.writeBytes(data, 0, dataSize);
+ }
}
@Override
public void decodeRest(final HornetQBuffer buffer)
{
fileId = buffer.readLong();
+ if (fileId == -1)
+ {
+ backupIsUpToDate = true;
+ return;
+ }
journalType = JournalContent.getType(buffer.readByte());
int size = buffer.readInt();
- data = new byte[size];
- buffer.readBytes(data);
+ if (size > -1)
+ {
+ data = new byte[size];
+ buffer.readBytes(data);
+ }
}
+
+ public long getFileId()
+ {
+ return fileId;
+ }
+
+ public byte[] getData()
+ {
+ return data;
+ }
+
+ public JournalContent getJournalContent()
+ {
+ return journalType;
+ }
+
+ /**
+ * @return {@code true} if the live has finished synchronizing its data and the backup is
+ * therefore up-to-date, {@code false} otherwise.
+ */
+ public boolean isUpToDate()
+ {
+ return backupIsUpToDate;
+ }
}
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java 2011-07-29 17:25:36 UTC (rev 11069)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java 2011-07-29 17:27:23 UTC (rev 11070)
@@ -101,4 +101,11 @@
*/
void reserveFileIds(JournalFile[] datafiles, JournalContent contentType) throws HornetQException;
+ /**
+ * Informs backup that data synchronization is done.
+ * <p>
+ * So if 'live' fails, the (up-to-date) backup now may take over its duties.
+ */
+ void sendSynchronizationDone();
+
}
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-07-29 17:25:36 UTC (rev 11069)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-07-29 17:27:23 UTC (rev 11070)
@@ -13,6 +13,7 @@
package org.hornetq.core.replication.impl;
+import java.nio.ByteBuffer;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -21,6 +22,8 @@
import org.hornetq.core.config.Configuration;
import org.hornetq.core.journal.Journal;
import org.hornetq.core.journal.JournalLoadInformation;
+import org.hornetq.core.journal.SequentialFile;
+import org.hornetq.core.journal.impl.JournalFile;
import org.hornetq.core.journal.impl.JournalImpl;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.paging.Page;
@@ -42,6 +45,7 @@
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationDeleteMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationDeleteTXMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationFileIdMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.ReplicationJournalFileMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageBeingMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageWriteMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargemessageEndMessage;
@@ -50,15 +54,12 @@
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationPrepareMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationResponseMessage;
import org.hornetq.core.replication.ReplicationEndpoint;
-import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.LargeServerMessage;
import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.impl.HornetQServerImpl;
/**
- *
* @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
- *
- *
*/
public class ReplicationEndpointImpl implements ReplicationEndpoint
{
@@ -71,7 +72,7 @@
private static final boolean trace = log.isTraceEnabled();
- private final HornetQServer server;
+ private final HornetQServerImpl server;
private Channel channel;
@@ -94,7 +95,7 @@
private boolean started;
// Constructors --------------------------------------------------
- public ReplicationEndpointImpl(final HornetQServer server)
+ public ReplicationEndpointImpl(final HornetQServerImpl server)
{
this.server = server;
}
@@ -184,6 +185,10 @@
{
handleJournalFileIdReservation((ReplicationFileIdMessage)packet);
}
+ else if (type == PacketImpl.REPLICATION_SYNC)
+ {
+ handleReplicationSynchronization((ReplicationJournalFileMessage)packet);
+ }
else
{
log.warn("Packet " + packet + " can't be processed by the ReplicationEndpoint");
@@ -362,21 +367,63 @@
// Private -------------------------------------------------------
- private void handleJournalFileIdReservation(final ReplicationFileIdMessage packet) throws HornetQException
+ private void handleReplicationSynchronization(ReplicationJournalFileMessage msg) throws Exception
{
- final Journal journalIf = journals[packet.getJournalContentType().typeByte];
- if (journalIf.isStarted())
+ if (msg.isUpToDate())
{
- throw new HornetQException(HornetQException.INTERNAL_ERROR, "Journal can not be started!");
+ // XXX HORNETQ-720 must reload journals(?)
+ for (Journal j : journals)
+ {
+ JournalImpl journal = (JournalImpl)j;
+ journal.finishRemoteBackupSync();
+ }
+ server.setRemoteBackupUpToDate(true);
+ return;
}
+ Journal journalIf = getJournal(msg.getJournalContent().typeByte);
+ JournalImpl journal = assertJournalImpl(journalIf);
+
+ long id = msg.getFileId();
+ JournalFile journalFile = journal.getRemoteBackupSyncFile(id);
+ byte[] data = msg.getData();
+ if (data == null)
+ {
+ journalFile.getFile().close();
+ }
+ else
+ {
+ SequentialFile sf = journalFile.getFile();
+ if (!sf.isOpen())
+ {
+ sf.open(1, false);
+ }
+ sf.writeDirect(ByteBuffer.wrap(data), true);
+ }
+ // journal.get
+ }
+
+ private void handleJournalFileIdReservation(final ReplicationFileIdMessage packet) throws Exception
+ {
+ if (server.isRemoteBackupUpToDate())
+ {
+ throw new HornetQException(HornetQException.INTERNAL_ERROR, "RemoteBackup can not be up-to-date!");
+ }
+
+ final Journal journalIf = journals[packet.getJournalContentType().typeByte];
+
+ JournalImpl journal = assertJournalImpl(journalIf);
+ journal.createFilesForRemoteSync(packet.getFileIds());
+ }
+
+ private static JournalImpl assertJournalImpl(final Journal journalIf) throws HornetQException
+ {
if (!(journalIf instanceof JournalImpl))
{
throw new HornetQException(HornetQException.INTERNAL_ERROR,
"Journals of backup server are expected to be JournalImpl");
}
- JournalImpl journal = (JournalImpl)journalIf;
-
+ return (JournalImpl)journalIf;
}
private void handleLargeMessageEnd(final ReplicationLargemessageEndMessage packet)
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2011-07-29 17:25:36 UTC (rev 11069)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2011-07-29 17:27:23 UTC (rev 11070)
@@ -513,9 +513,10 @@
while (true)
{
int bytesRead = file.read(data);
+ // sending -1 bytes will close the file.
+ replicatingChannel.send(new ReplicationJournalFileMessage(bytesRead, data, content, id));
if (bytesRead == -1)
break;
- replicatingChannel.sendBlocking(new ReplicationJournalFileMessage(bytesRead, data, content, id));
}
// XXX probably need to sync the JournalFile(?)
throw new UnsupportedOperationException();
@@ -524,7 +525,13 @@
@Override
public void reserveFileIds(JournalFile[] datafiles, JournalContent contentType) throws HornetQException
{
- replicatingChannel.sendBlocking(new ReplicationFileIdMessage(datafiles, contentType));
+ replicatingChannel.send(new ReplicationFileIdMessage(datafiles, contentType));
}
+ @Override
+ public void sendSynchronizationDone()
+ {
+ replicatingChannel.send(new ReplicationJournalFileMessage(-1, null, null, -1));
+ }
+
}
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-07-29 17:25:36 UTC (rev 11069)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-07-29 17:27:23 UTC (rev 11070)
@@ -219,6 +219,12 @@
private boolean initialised;
+ /**
+ * Only applicable to 'remote backup servers'. If this flag is false the backup may not become
+ * 'live'.
+ */
+ private volatile boolean backupUpToDate = true;
+
// private FailoverManager replicationFailoverManager;
private ReplicationManager replicationManager;
@@ -587,8 +593,20 @@
// Server node (i.e. Life node) is not running, now the backup takes over.
//we must remember to close stuff we don't need any more
nodeManager.awaitLiveNode();
+
serverLocator.close();
replicationEndpoint.stop();
+
+ if (!isRemoteBackupUpToDate())
+ {
+ /*
+ * XXX HORNETQ-720 Live is down, and this server was not in sync. Perhaps we should
+ * first try to wait a little longer to see if the 'live' comes back?
+ */
+ throw new RuntimeException("Backup Server was not yet in sync with live");
+ }
+
+
configuration.setBackup(false);
initialisePart2();
@@ -603,7 +621,7 @@
}
public void close(final boolean permanently) throws Exception
- {
+ {
if(serverLocator != null)
{
serverLocator.close();
@@ -696,6 +714,7 @@
else
{
assert replicationEndpoint == null;
+ backupUpToDate = false;
replicationEndpoint = new ReplicationEndpointImpl(this);
activation = new SharedNothingBackupActivation();
}
@@ -2014,4 +2033,21 @@
journalStorageManager.setReplicator(replicationManager);
}
+
+ /**
+ * Whether a remote backup server was in sync with its live server. If it was not in sync, it may
+ * not take over the live's functions.
+ * <p>
+ * A local backup server or a live server should always return {@code true}
+ * @return
+ */
+ public boolean isRemoteBackupUpToDate()
+ {
+ return backupUpToDate;
+ }
+
+ public void setRemoteBackupUpToDate(boolean isUpToDate)
+ {
+ backupUpToDate = isUpToDate;
+ }
}
Modified: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalFilesRepository.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalFilesRepository.java 2011-07-29 17:25:36 UTC (rev 11069)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalFilesRepository.java 2011-07-29 17:27:23 UTC (rev 11070)
@@ -15,6 +15,7 @@
import java.util.Collection;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -63,6 +64,8 @@
private final BlockingQueue<JournalFile> openedFiles = new LinkedBlockingQueue<JournalFile>();
+ private Map<Long, JournalFile> filesReservedForSync;
+
private final AtomicLong nextFileID = new AtomicLong(0);
private final int maxAIO;
@@ -175,7 +178,7 @@
for (int i = 0; i < filesToCreate; i++)
{
// Keeping all files opened can be very costly (mainly on AIO)
- freeFiles.add(createFile(false, false, true, false));
+ freeFiles.add(createFile(false, false, true, false, -1));
}
}
@@ -416,7 +419,7 @@
if (nextFile == null)
{
- nextFile = createFile(keepOpened, multiAIO, initFile, tmpCompactExtension);
+ nextFile = createFile(keepOpened, multiAIO, initFile, tmpCompactExtension, -1);
}
else
{
@@ -434,6 +437,12 @@
return nextFile;
}
+ public void createRemoteBackupSyncFile(long fileID) throws Exception
+ {
+ assert !filesReservedForSync.containsKey(Long.valueOf(fileID));
+ filesReservedForSync.put(Long.valueOf(fileID), createFile(false, false, false, false, fileID));
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
@@ -449,9 +458,10 @@
private JournalFile createFile(final boolean keepOpened,
final boolean multiAIO,
final boolean init,
- final boolean tmpCompact) throws Exception
+ final boolean tmpCompact,
+ final long fileIdPreSet) throws Exception
{
- long fileID = generateFileID();
+ long fileID = fileIdPreSet != -1 ? fileIdPreSet : generateFileID();
String fileName;
@@ -560,6 +570,22 @@
return jf;
}
- // Inner classes -------------------------------------------------
+ /**
+ * @param id
+ * @return
+ */
+ public JournalFile getRemoteBackupSyncFile(long id)
+ {
+ return filesReservedForSync.get(Long.valueOf(id));
+ }
+ public Collection<? extends JournalFile> getSyncFiles()
+ {
+ return filesReservedForSync.values();
+ }
+
+ public void clearSyncFiles()
+ {
+ filesReservedForSync.clear();
+ }
}
Modified: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java 2011-07-29 17:25:36 UTC (rev 11069)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java 2011-07-29 17:27:23 UTC (rev 11070)
@@ -3266,4 +3266,64 @@
{
journalLock.writeLock().unlock();
}
+
+ /**
+ * @param fileIds
+ * @throws Exception
+ */
+ public void createFilesForRemoteSync(long[] fileIds) throws Exception
+ {
+ writeLock();
+ try
+ {
+ long maxID = -1;
+ for (long id : fileIds)
+ {
+ maxID = Math.max(maxID, id);
+ filesRepository.createRemoteBackupSyncFile(id);
+ }
+ }
+ finally
+ {
+ writeUnlock();
+ }
+ }
+
+ /**
+ * @param id
+ * @return
+ */
+ public JournalFile getRemoteBackupSyncFile(long id)
+ {
+ return filesRepository.getRemoteBackupSyncFile(id);
+ }
+
+ /**
+ *
+ */
+ public void finishRemoteBackupSync()
+ {
+ writeLock();
+ try
+ {
+ lockAppend.lock();
+ List<JournalFile> dataFilesToProcess = new ArrayList<JournalFile>();
+ dataFilesToProcess.addAll(filesRepository.getDataFiles());
+ filesRepository.clearDataFiles();
+ dataFilesToProcess.addAll(filesRepository.getSyncFiles());
+ filesRepository.clearSyncFiles();
+ Collections.sort(dataFilesToProcess, new JournalFileComparator());
+ for (JournalFile file : dataFilesToProcess)
+ {
+ filesRepository.addDataFileOnTop(file);
+ }
+ // XXX HORNETQ-720 still missing a "reload" call
+ }
+ finally
+ {
+ lockAppend.unlock();
+ writeUnlock();
+ }
+
+ }
}
Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java 2011-07-29 17:25:36 UTC (rev 11069)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java 2011-07-29 17:27:23 UTC (rev 11070)
@@ -1,5 +1,8 @@
package org.hornetq.tests.integration.cluster.failover;
+import java.util.HashSet;
+import java.util.Set;
+
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientConsumer;
@@ -7,6 +10,10 @@
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
import org.hornetq.core.client.impl.ServerLocatorInternal;
+import org.hornetq.core.journal.impl.JournalFile;
+import org.hornetq.core.journal.impl.JournalImpl;
+import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
+import org.hornetq.tests.integration.cluster.util.TestableServer;
import org.hornetq.tests.util.TransportConfigurationUtils;
public class BackupJournalSyncTest extends FailoverTestBase
@@ -15,6 +22,7 @@
private ServerLocatorInternal locator;
private ClientSessionFactoryInternal sessionFactory;
private ClientSession session;
+ private ClientProducer producer;
private static final int N_MSGS = 100;
@Override
@@ -38,15 +46,50 @@
backupServer.getServer().getNodeID());
}
+ public void testReserveFileIdValuesOnBackup() throws Exception
+ {
+ createProducerSendSomeMessages();
+ JournalImpl messageJournal = getMessageJournalFromServer(liveServer);
+ for (int i = 0; i < 5; i++)
+ {
+ messageJournal.forceMoveNextFile();
+ sendMessages(session, producer, N_MSGS);
+ }
+ backupServer.start();
+ waitForBackup(sessionFactory, 5);
+ // XXX HORNETQ-720 must wait for backup to sync!
+
+ JournalImpl backupMsgJournal = getMessageJournalFromServer(backupServer);
+ Set<Long> bckpIds = getFileIds(backupMsgJournal);
+ assertFalse(bckpIds.isEmpty());
+ Set<Long> liveIds = getFileIds(messageJournal);
+ assertEquals("sets must match! " + liveIds, bckpIds, liveIds);
+ }
+
+ /**
+ * @param journal
+ * @return
+ */
+ private Set<Long> getFileIds(JournalImpl journal)
+ {
+ Set<Long> results = new HashSet<Long>();
+ for (JournalFile jf : journal.getDataFiles())
+ {
+ results.add(Long.valueOf(jf.getFileID()));
+ }
+ return results;
+ }
+
+ static JournalImpl getMessageJournalFromServer(TestableServer server)
+ {
+ JournalStorageManager sm = (JournalStorageManager)server.getServer().getStorageManager();
+ return (JournalImpl)sm.getMessageJournal();
+ }
+
public void testMessageSync() throws Exception
{
- session = sessionFactory.createSession(true, true);
- session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
- ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
+ createProducerSendSomeMessages();
- sendMessages(session, producer, N_MSGS);
- session.start();
-
receiveMsgs(0, N_MSGS / 2);
assertFalse("backup is not started!", backupServer.isStarted());
@@ -56,11 +99,20 @@
waitForBackup(sessionFactory, 5);
crash(session);
-
// consume N/2 from 'new' live (the old backup)
receiveMsgs(N_MSGS / 2, N_MSGS);
}
+ private void createProducerSendSomeMessages() throws HornetQException, Exception
+ {
+ session = sessionFactory.createSession(true, true);
+ session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
+ producer = session.createProducer(FailoverTestBase.ADDRESS);
+
+ sendMessages(session, producer, N_MSGS);
+ session.start();
+ }
+
private void receiveMsgs(int start, int end) throws HornetQException
{
ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
13 years, 4 months
JBoss hornetq SVN: r11069 - in branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl: wireformat and 1 other directory.
by do-not-reply@jboss.org
Author: borges
Date: 2011-07-29 13:25:36 -0400 (Fri, 29 Jul 2011)
New Revision: 11069
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/SessionReceiveMessage.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/SessionSendMessage.java
Log:
Make 'type' private.
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java 2011-07-29 17:24:59 UTC (rev 11068)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java 2011-07-29 17:25:36 UTC (rev 11069)
@@ -38,7 +38,7 @@
protected long channelID;
- protected final byte type;
+ private final byte type;
protected int size = -1;
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/SessionReceiveMessage.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/SessionReceiveMessage.java 2011-07-29 17:24:59 UTC (rev 11068)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/SessionReceiveMessage.java 2011-07-29 17:25:36 UTC (rev 11069)
@@ -87,7 +87,7 @@
int len = size - DataConstants.SIZE_INT;
buffer.setInt(0, len);
- buffer.setByte(DataConstants.SIZE_INT, type);
+ buffer.setByte(DataConstants.SIZE_INT, getType());
buffer.setLong(DataConstants.SIZE_INT + DataConstants.SIZE_BYTE, channelID);
// Position reader for reading by Netty
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/SessionSendMessage.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/SessionSendMessage.java 2011-07-29 17:24:59 UTC (rev 11068)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/SessionSendMessage.java 2011-07-29 17:25:36 UTC (rev 11069)
@@ -82,7 +82,7 @@
int len = size - DataConstants.SIZE_INT;
buffer.setInt(0, len);
- buffer.setByte(DataConstants.SIZE_INT, type);
+ buffer.setByte(DataConstants.SIZE_INT, getType());
buffer.setLong(DataConstants.SIZE_INT + DataConstants.SIZE_BYTE, channelID);
// Position reader for reading by Netty
13 years, 4 months
JBoss hornetq SVN: r11068 - in branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core: protocol/core/impl and 3 other directories.
by do-not-reply@jboss.org
Author: borges
Date: 2011-07-29 13:24:59 -0400 (Fri, 29 Jul 2011)
New Revision: 11068
Added:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationFileIdMessage.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationJournalFileMessage.java
Removed:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationJournalFile.java
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
Log:
HORNETQ-720 Send fileID values used by live to backup
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-07-29 04:25:11 UTC (rev 11067)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-07-29 17:24:59 UTC (rev 11068)
@@ -160,7 +160,22 @@
public enum JournalContent
{
- MESSAGES, BINDINGS;
+ BINDINGS((byte)0), MESSAGES((byte)1);
+
+ public final byte typeByte;
+
+ JournalContent(byte b){
+ typeByte = b;
+ }
+
+ public static JournalContent getType(byte type)
+ {
+ if (MESSAGES.typeByte == type)
+ return MESSAGES;
+ if (BINDINGS.typeByte == type)
+ return BINDINGS;
+ throw new RuntimeException("invalid byte");
+ }
}
private Journal messageJournal;
@@ -337,6 +352,10 @@
*/
public void setReplicator(ReplicationManager replicationManager) throws Exception
{
+ if (!started)
+ {
+ throw new IllegalStateException("must be started...");
+ }
assert replicationManager != null;
replicator = replicationManager;
@@ -345,24 +364,28 @@
throw new HornetQException(HornetQException.INTERNAL_ERROR,
"journals here are not JournalImpl. You can't set a replicator!");
}
- JournalImpl localMessageJournal = (JournalImpl)messageJournal;
- JournalImpl localBindingsJournal = (JournalImpl)bindingsJournal;
- if (false)
- {
+ // XXX NEED to take a global lock on the StorageManager.
+
+ final JournalImpl localMessageJournal = (JournalImpl)messageJournal;
+ final JournalImpl localBindingsJournal = (JournalImpl)bindingsJournal;
+
localMessageJournal.writeLock();
localBindingsJournal.writeLock();
- JournalFile[] messageFiles = prepateJournalForCopy(localMessageJournal);
- JournalFile[] bindingsFiles = prepateJournalForCopy(localBindingsJournal);
+ JournalFile[] messageFiles = prepareJournalForCopy(localMessageJournal, JournalContent.MESSAGES);
+ JournalFile[] bindingsFiles = prepareJournalForCopy(localBindingsJournal, JournalContent.BINDINGS);
+
localMessageJournal.writeUnlock();
localBindingsJournal.writeUnlock();
+
+ bindingsJournal = new ReplicatedJournal(((byte)0), localBindingsJournal, replicator);
+ messageJournal = new ReplicatedJournal((byte)1, localMessageJournal, replicator);
+
sendJournalFile(messageFiles, JournalContent.MESSAGES);
sendJournalFile(bindingsFiles, JournalContent.BINDINGS);
- }
- // XXX NEED to take a global lock on the StorageManager.
- bindingsJournal = new ReplicatedJournal(((byte)0), localBindingsJournal, replicator);
- messageJournal = new ReplicatedJournal((byte)1, localMessageJournal, replicator);
+
+ // SEND "SYNC_DONE" msg to backup telling it can become operational.
}
/**
@@ -381,12 +404,12 @@
}
}
- private JournalFile[] prepateJournalForCopy(JournalImpl journal) throws Exception
+ private JournalFile[] prepareJournalForCopy(JournalImpl journal, JournalContent contentType) throws Exception
{
journal.setAutoReclaim(false);
/*
- * need to check whether it is safe to proceed if compacting is running (specially at the end
- * of it)
+ * XXX need to check whether it is safe to proceed if compacting is running (especially at the
+ * end of it)
*/
journal.forceMoveNextFile();
JournalFile[] datafiles = journal.getDataFiles();
@@ -394,6 +417,7 @@
{
jf.setCanReclaim(false);
}
+ replicator.reserveFileIds(datafiles, contentType);
return datafiles;
}
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java 2011-07-29 04:25:11 UTC (rev 11067)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java 2011-07-29 17:24:59 UTC (rev 11068)
@@ -106,6 +106,7 @@
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationCompareDataMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationDeleteMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationDeleteTXMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.ReplicationFileIdMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageBeingMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageWriteMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargemessageEndMessage;
@@ -531,6 +532,11 @@
packet = new HaBackupRegistrationMessage();
break;
}
+ case PacketImpl.REPLICATION_FILE_ID:
+ {
+ packet = new ReplicationFileIdMessage();
+ break;
+ }
default:
{
throw new IllegalArgumentException("Invalid type: " + packetType);
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java 2011-07-29 04:25:11 UTC (rev 11067)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java 2011-07-29 17:24:59 UTC (rev 11068)
@@ -196,6 +196,8 @@
public static final byte HA_BACKUP_REGISTRATION = 113;
+ public static final byte REPLICATION_FILE_ID = 120;
+
// Static --------------------------------------------------------
public PacketImpl(final byte type)
Added: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationFileIdMessage.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationFileIdMessage.java (rev 0)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationFileIdMessage.java 2011-07-29 17:24:59 UTC (rev 11068)
@@ -0,0 +1,65 @@
+package org.hornetq.core.protocol.core.impl.wireformat;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.core.journal.impl.JournalFile;
+import org.hornetq.core.persistence.impl.journal.JournalStorageManager.JournalContent;
+import org.hornetq.core.protocol.core.impl.PacketImpl;
+
+/**
+ * Send all fileIDs used in the live server to the backup.
+ */
+public class ReplicationFileIdMessage extends PacketImpl
+{
+
+ private long[] ids;
+ private JournalContent journalType;
+
+ public ReplicationFileIdMessage()
+ {
+ super(REPLICATION_FILE_ID);
+ }
+
+ public ReplicationFileIdMessage(JournalFile[] datafiles, JournalContent contentType)
+ {
+ this();
+ ids = new long[datafiles.length];
+ for (int i = 0; i < datafiles.length; i++)
+ {
+ ids[i] = datafiles[i].getFileID();
+ }
+ journalType = contentType;
+ }
+
+ @Override
+ public void encodeRest(final HornetQBuffer buffer)
+ {
+ buffer.writeByte(journalType.typeByte);
+ buffer.writeInt(ids.length);
+ for (long id : ids)
+ {
+ buffer.writeLong(id);
+ }
+ }
+
+ @Override
+ public void decodeRest(final HornetQBuffer buffer)
+ {
+ journalType = JournalContent.getType(buffer.readByte());
+ int length = buffer.readInt();
+ ids = new long[length];
+ for (int i = 0; i < length; i++)
+ {
+ ids[i] = buffer.readLong();
+ }
+ }
+
+ public JournalContent getJournalContentType()
+ {
+ return journalType;
+ }
+
+ public long[] getFileIds()
+ {
+ return ids;
+ }
+}
Deleted: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationJournalFile.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationJournalFile.java 2011-07-29 04:25:11 UTC (rev 11067)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationJournalFile.java 2011-07-29 17:24:59 UTC (rev 11068)
@@ -1,29 +0,0 @@
-package org.hornetq.core.protocol.core.impl.wireformat;
-
-import org.hornetq.core.persistence.impl.journal.JournalStorageManager.JournalContent;
-import org.hornetq.core.protocol.core.impl.PacketImpl;
-
-/**
- * Used to copy JournalFile data over to the backup during synchronization.
- */
-public final class ReplicationJournalFile extends PacketImpl
-{
-
- private byte[] data;
- private int dataSize;
- private JournalContent journalType;
-
- public ReplicationJournalFile()
- {
- super(REPLICATION_SYNC);
- }
-
- public ReplicationJournalFile(int size, byte[] data, JournalContent content)
- {
- this();
- this.dataSize = size;
- this.data = data;
- this.journalType = content;
- }
-
-}
Added: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationJournalFileMessage.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationJournalFileMessage.java (rev 0)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationJournalFileMessage.java 2011-07-29 17:24:59 UTC (rev 11068)
@@ -0,0 +1,50 @@
+package org.hornetq.core.protocol.core.impl.wireformat;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.core.persistence.impl.journal.JournalStorageManager.JournalContent;
+import org.hornetq.core.protocol.core.impl.PacketImpl;
+
+/**
+ * Used to copy JournalFile data over to the backup during synchronization.
+ */
+public final class ReplicationJournalFileMessage extends PacketImpl
+{
+
+ private byte[] data;
+ private int dataSize;
+ private JournalContent journalType;
+ private long fileId;
+
+ public ReplicationJournalFileMessage()
+ {
+ super(REPLICATION_SYNC);
+ }
+
+ public ReplicationJournalFileMessage(int size, byte[] data, JournalContent content, long id)
+ {
+ this();
+ this.fileId = id;
+ this.dataSize = size;
+ this.data = data;
+ this.journalType = content;
+ }
+
+ @Override
+ public void encodeRest(final HornetQBuffer buffer)
+ {
+ buffer.writeLong(fileId);
+ buffer.writeByte(journalType.typeByte);
+ buffer.writeInt(dataSize);
+ buffer.writeBytes(data, 0, dataSize);
+ }
+
+ @Override
+ public void decodeRest(final HornetQBuffer buffer)
+ {
+ fileId = buffer.readLong();
+ journalType = JournalContent.getType(buffer.readByte());
+ int size = buffer.readInt();
+ data = new byte[size];
+ buffer.readBytes(data);
+ }
+}
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java 2011-07-29 04:25:11 UTC (rev 11067)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java 2011-07-29 17:24:59 UTC (rev 11068)
@@ -93,4 +93,12 @@
*/
void sendJournalFile(JournalFile jf, JournalContent type) throws IOException, HornetQException;
+ /**
+ * Reserve the following fileIDs in the backup server.
+ * @param datafiles
+ * @param contentType
+ * @throws HornetQException
+ */
+ void reserveFileIds(JournalFile[] datafiles, JournalContent contentType) throws HornetQException;
+
}
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-07-29 04:25:11 UTC (rev 11067)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-07-29 17:24:59 UTC (rev 11068)
@@ -21,6 +21,7 @@
import org.hornetq.core.config.Configuration;
import org.hornetq.core.journal.Journal;
import org.hornetq.core.journal.JournalLoadInformation;
+import org.hornetq.core.journal.impl.JournalImpl;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.paging.Page;
import org.hornetq.core.paging.PagedMessage;
@@ -28,6 +29,7 @@
import org.hornetq.core.paging.impl.PagingManagerImpl;
import org.hornetq.core.paging.impl.PagingStoreFactoryNIO;
import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
+import org.hornetq.core.persistence.impl.journal.JournalStorageManager.JournalContent;
import org.hornetq.core.protocol.core.Channel;
import org.hornetq.core.protocol.core.Packet;
import org.hornetq.core.protocol.core.impl.PacketImpl;
@@ -39,6 +41,7 @@
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationCompareDataMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationDeleteMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationDeleteTXMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.ReplicationFileIdMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageBeingMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageWriteMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargemessageEndMessage;
@@ -80,10 +83,11 @@
private JournalLoadInformation[] journalLoadInformation;
- private final ConcurrentMap<SimpleString, ConcurrentMap<Integer, Page>> pageIndex = new ConcurrentHashMap<SimpleString, ConcurrentMap<Integer, Page>>();
+ private final ConcurrentMap<SimpleString, ConcurrentMap<Integer, Page>> pageIndex =
+ new ConcurrentHashMap<SimpleString, ConcurrentMap<Integer, Page>>();
+ private final ConcurrentMap<Long, LargeServerMessage> largeMessages =
+ new ConcurrentHashMap<Long, LargeServerMessage>();
- private final ConcurrentMap<Long, LargeServerMessage> largeMessages = new ConcurrentHashMap<Long, LargeServerMessage>();
-
// Used on tests, to simulate failures on delete pages
private boolean deletePages = true;
@@ -176,6 +180,10 @@
handleCompareDataMessage((ReplicationCompareDataMessage)packet);
response = new NullResponseMessage();
}
+ else if (type == PacketImpl.REPLICATION_FILE_ID)
+ {
+ handleJournalFileIdReservation((ReplicationFileIdMessage)packet);
+ }
else
{
log.warn("Packet " + packet + " can't be processed by the ReplicationEndpoint");
@@ -215,8 +223,8 @@
server.getManagementService().setStorageManager(storage);
- registerJournal((byte)1, storage.getMessageJournal());
- registerJournal((byte)0, storage.getBindingsJournal());
+ registerJournal(JournalContent.MESSAGES.typeByte, storage.getMessageJournal());
+ registerJournal(JournalContent.BINDINGS.typeByte, storage.getBindingsJournal());
// We only need to load internal structures on the backup...
journalLoadInformation = storage.loadInternalOnly();
@@ -353,9 +361,24 @@
// Protected -----------------------------------------------------
// Private -------------------------------------------------------
- /**
- * @param packet
- */
+
+ private void handleJournalFileIdReservation(final ReplicationFileIdMessage packet) throws HornetQException
+ {
+ final Journal journalIf = journals[packet.getJournalContentType().typeByte];
+ if (journalIf.isStarted())
+ {
+ throw new HornetQException(HornetQException.INTERNAL_ERROR, "Journal can not be started!");
+ }
+
+ if (!(journalIf instanceof JournalImpl))
+ {
+ throw new HornetQException(HornetQException.INTERNAL_ERROR,
+ "Journals of backup server are expected to be JournalImpl");
+ }
+ JournalImpl journal = (JournalImpl)journalIf;
+
+ }
+
private void handleLargeMessageEnd(final ReplicationLargemessageEndMessage packet)
{
LargeServerMessage message = lookupLargeMessage(packet.getMessageId(), true);
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2011-07-29 04:25:11 UTC (rev 11067)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2011-07-29 17:24:59 UTC (rev 11068)
@@ -44,7 +44,8 @@
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationCompareDataMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationDeleteMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationDeleteTXMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.ReplicationJournalFile;
+import org.hornetq.core.protocol.core.impl.wireformat.ReplicationFileIdMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.ReplicationJournalFileMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageBeingMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageWriteMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargemessageEndMessage;
@@ -507,15 +508,23 @@
public void sendJournalFile(JournalFile jf, JournalContent content) throws IOException, HornetQException
{
FileInputStream file = new FileInputStream(jf.getFile().getFileName());
- byte[] data = new byte[1 << 17]; // about 130 kB
+ final long id = jf.getFileID();
+ final byte[] data = new byte[1 << 17]; // about 130 kB
while (true)
{
int bytesRead = file.read(data);
if (bytesRead == -1)
break;
- replicatingChannel.sendBlocking(new ReplicationJournalFile(bytesRead, data, content));
+ replicatingChannel.sendBlocking(new ReplicationJournalFileMessage(bytesRead, data, content, id));
}
+ // XXX probably need to sync the JournalFile(?)
throw new UnsupportedOperationException();
}
+ @Override
+ public void reserveFileIds(JournalFile[] datafiles, JournalContent contentType) throws HornetQException
+ {
+ replicatingChannel.sendBlocking(new ReplicationFileIdMessage(datafiles, contentType));
+ }
+
}
13 years, 4 months
JBoss hornetq SVN: r11067 - branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-07-29 00:25:11 -0400 (Fri, 29 Jul 2011)
New Revision: 11067
Modified:
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/TwoWayTwoNodeClusterTest.java
Log:
tweak
Modified: branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/TwoWayTwoNodeClusterTest.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/TwoWayTwoNodeClusterTest.java 2011-07-29 04:22:24 UTC (rev 11066)
+++ branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/TwoWayTwoNodeClusterTest.java 2011-07-29 04:25:11 UTC (rev 11067)
@@ -135,20 +135,7 @@
log.info("ZZZ Server 0 " + servers[0].describe());
- // try
- // {
- // stopServers(1);
- // waitForTopology(servers[0], 1);
- // startServers(1);
- // waitForTopology(servers[0], 2);
- // stopServers(0,1);
- // }
- // catch (Throwable e)
- // {
- // e.printStackTrace(System.out);
- // throw e;
- // }
- for (int i = 0; i < 100; i++)
+ for (int i = 0; i < 5; i++)
{
log.info("#stop #test #" + i);
Thread.sleep(500);
@@ -164,17 +151,6 @@
}
- public void testLoop() throws Exception
- {
- for (int i = 0; i < 100; i++)
- {
- log.info("#test " + i);
- testStopStart();
- tearDown();
- setUp();
- }
- }
-
public void testStopStart() throws Exception
{
startServers(0, 1);
13 years, 5 months