JBoss hornetq SVN: r11086 - branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover.
by do-not-reply@jboss.org
Author: borges
Date: 2011-08-01 09:51:31 -0400 (Mon, 01 Aug 2011)
New Revision: 11086
Modified:
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java
Log:
HORNETQ-720 Adjust unit-test
Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java 2011-08-01 13:50:45 UTC (rev 11085)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java 2011-08-01 13:51:31 UTC (rev 11086)
@@ -56,14 +56,13 @@
sendMessages(session, producer, N_MSGS);
}
backupServer.start();
- waitForBackup(sessionFactory, 5);
+ waitForBackup(sessionFactory, 10);
// XXX HORNETQ-720 must wait for backup to sync!
JournalImpl backupMsgJournal = getMessageJournalFromServer(backupServer);
- Set<Long> bckpIds = getFileIds(backupMsgJournal);
- assertFalse(bckpIds.isEmpty());
+ Set<Long> backupIds = getFileIds(backupMsgJournal);
Set<Long> liveIds = getFileIds(messageJournal);
- assertEquals("sets must match! " + liveIds, bckpIds, liveIds);
+ assertEquals("sets must match! " + liveIds, liveIds, backupIds);
}
/**
12 years, 9 months
JBoss hornetq SVN: r11085 - in branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core: replication/impl and 1 other directories.
by do-not-reply@jboss.org
Author: borges
Date: 2011-08-01 09:50:45 -0400 (Mon, 01 Aug 2011)
New Revision: 11085
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
Log:
HORNETQ-720 INCOMPLETE: Lock StorageManager before re-assigning journal fields.
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-08-01 12:50:41 UTC (rev 11084)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-08-01 13:50:45 UTC (rev 11085)
@@ -32,6 +32,7 @@
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.transaction.xa.Xid;
@@ -155,6 +156,7 @@
public static final byte PAGE_CURSOR_COUNTER_INC = 41;
private final BatchingIDGenerator idGenerator;
+ private final ReentrantReadWriteLock storageManagerLock = new ReentrantReadWriteLock(true);
private ReplicationManager replicator;
@@ -361,31 +363,37 @@
throw new HornetQException(HornetQException.INTERNAL_ERROR,
"journals here are not JournalImpl. You can't set a replicator!");
}
+ JournalFile[] messageFiles = null;
+ JournalFile[] bindingsFiles = null;
+
// XXX HORNETQ-720 WRITE LOCK the StorageManager.
+ storageManagerLock.writeLock().lock();
+ try
+ {
+ final JournalImpl localMessageJournal = (JournalImpl)messageJournal;
+ final JournalImpl localBindingsJournal = (JournalImpl)bindingsJournal;
- final JournalImpl localMessageJournal = (JournalImpl)messageJournal;
- final JournalImpl localBindingsJournal = (JournalImpl)bindingsJournal;
- JournalFile[] messageFiles;
- JournalFile[] bindingsFiles;
-
- localMessageJournal.writeLock();
- localBindingsJournal.writeLock();
- try
- {
- messageFiles = prepareJournalForCopy(localMessageJournal, JournalContent.MESSAGES);
- bindingsFiles = prepareJournalForCopy(localBindingsJournal, JournalContent.BINDINGS);
+ localMessageJournal.writeLock();
+ localBindingsJournal.writeLock();
+ try
+ {
+ messageFiles = prepareJournalForCopy(localMessageJournal, JournalContent.MESSAGES);
+ bindingsFiles = prepareJournalForCopy(localBindingsJournal, JournalContent.BINDINGS);
+ }
+ finally
+ {
+ localMessageJournal.writeUnlock();
+ localBindingsJournal.writeUnlock();
+ }
+ bindingsJournal = new ReplicatedJournal(((byte)0), localBindingsJournal, replicator);
+ messageJournal = new ReplicatedJournal((byte)1, localMessageJournal, replicator);
}
finally
{
- localMessageJournal.writeUnlock();
- localBindingsJournal.writeUnlock();
+ // XXX HORNETQ-720 UNLOCK StorageManager...
+ storageManagerLock.writeLock().unlock();
}
- bindingsJournal = new ReplicatedJournal(((byte)0), localBindingsJournal, replicator);
- messageJournal = new ReplicatedJournal((byte)1, localMessageJournal, replicator);
-
- // XXX HORNETQ-720 UNLOCK StorageManager...
-
sendJournalFile(messageFiles, JournalContent.MESSAGES);
sendJournalFile(bindingsFiles, JournalContent.BINDINGS);
@@ -571,6 +579,7 @@
throw new HornetQException(HornetQException.ILLEGAL_STATE, "MessageId was not assigned to Message");
}
+ readLock();
// Note that we don't sync, the add reference that comes immediately after will sync if appropriate
if (message.isLargeMessage())
@@ -589,29 +598,45 @@
false,
getContext(false));
}
+ readUnLock();
}
public void storeReference(final long queueID, final long messageID, final boolean last) throws Exception
{
+ readLock();
messageJournal.appendUpdateRecord(messageID,
JournalStorageManager.ADD_REF,
new RefEncoding(queueID),
last && syncNonTransactional,
getContext(last && syncNonTransactional));
+ readUnLock();
}
+ private void readLock()
+ {
+ storageManagerLock.readLock().lock();
+ }
+
+ private void readUnLock()
+ {
+ storageManagerLock.readLock().unlock();
+ }
+
public void storeAcknowledge(final long queueID, final long messageID) throws Exception
{
+ readLock();
messageJournal.appendUpdateRecord(messageID,
JournalStorageManager.ACKNOWLEDGE_REF,
new RefEncoding(queueID),
syncNonTransactional,
getContext(syncNonTransactional));
+ readUnLock();
}
public void storeCursorAcknowledge(long queueID, PagePosition position) throws Exception
{
+ readLock();
long ackID = idGenerator.generateID();
position.setRecordID(ackID);
messageJournal.appendAddRecord(ackID,
@@ -619,19 +644,23 @@
new CursorAckRecordEncoding(queueID, position),
syncNonTransactional,
getContext(syncNonTransactional));
+ readUnLock();
}
public void deleteMessage(final long messageID) throws Exception
{
+ readLock();
// Messages are deleted on postACK, one after another.
// If these deletes are synchronized, we would build up messages on the Executor
// increasing chances of losing deletes.
// The StorageManager should verify messages without references
messageJournal.appendDeleteRecord(messageID, false, getContext(false));
+ readUnLock();
}
public void updateScheduledDeliveryTime(final MessageReference ref) throws Exception
{
+ readLock();
ScheduledDeliveryEncoding encoding = new ScheduledDeliveryEncoding(ref.getScheduledDeliveryTime(), ref.getQueue()
.getID());
@@ -640,10 +669,12 @@
encoding,
syncNonTransactional,
getContext(syncNonTransactional));
+ readUnLock();
}
public void storeDuplicateID(final SimpleString address, final byte[] duplID, final long recordID) throws Exception
{
+ readLock();
DuplicateIDEncoding encoding = new DuplicateIDEncoding(address, duplID);
messageJournal.appendAddRecord(recordID,
@@ -651,17 +682,21 @@
encoding,
syncNonTransactional,
getContext(syncNonTransactional));
+ readUnLock();
}
public void deleteDuplicateID(final long recordID) throws Exception
{
+ readLock();
messageJournal.appendDeleteRecord(recordID, syncNonTransactional, getContext(syncNonTransactional));
+ readUnLock();
}
// Transactional operations
public void storeMessageTransactional(final long txID, final ServerMessage message) throws Exception
{
+ readLock();
if (message.getMessageID() <= 0)
{
throw new HornetQException(HornetQException.ILLEGAL_STATE, "MessageId was not assigned to Message");
@@ -681,51 +716,61 @@
JournalStorageManager.ADD_MESSAGE,
message);
}
-
+ readUnLock();
}
public void storePageTransaction(final long txID, final PageTransactionInfo pageTransaction) throws Exception
{
+ readLock();
pageTransaction.setRecordID(generateUniqueID());
messageJournal.appendAddRecordTransactional(txID,
pageTransaction.getRecordID(),
JournalStorageManager.PAGE_TRANSACTION,
pageTransaction);
+ readUnLock();
}
public void updatePageTransaction(final long txID, final PageTransactionInfo pageTransaction, final int depages) throws Exception
{
+ readLock();
messageJournal.appendUpdateRecordTransactional(txID,
pageTransaction.getRecordID(),
JournalStorageManager.PAGE_TRANSACTION,
new PageUpdateTXEncoding(pageTransaction.getTransactionID(),
depages));
+ readUnLock();
}
public void updatePageTransaction(final PageTransactionInfo pageTransaction, final int depages) throws Exception
{
+ readLock();
messageJournal.appendUpdateRecord(pageTransaction.getRecordID(),
JournalStorageManager.PAGE_TRANSACTION,
new PageUpdateTXEncoding(pageTransaction.getTransactionID(), depages),
syncNonTransactional,
getContext(syncNonTransactional));
+ readUnLock();
}
public void storeReferenceTransactional(final long txID, final long queueID, final long messageID) throws Exception
{
+ readLock();
messageJournal.appendUpdateRecordTransactional(txID,
messageID,
JournalStorageManager.ADD_REF,
new RefEncoding(queueID));
+ readUnLock();
}
public void storeAcknowledgeTransactional(final long txID, final long queueID, final long messageID) throws Exception
{
+ readLock();
messageJournal.appendUpdateRecordTransactional(txID,
messageID,
JournalStorageManager.ACKNOWLEDGE_REF,
new RefEncoding(queueID));
+ readUnLock();
}
/* (non-Javadoc)
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-08-01 12:50:41 UTC (rev 11084)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-08-01 13:50:45 UTC (rev 11085)
@@ -378,6 +378,7 @@
journal.finishRemoteBackupSync();
}
server.setRemoteBackupUpToDate();
+ log.info("Backup server " + server + " is synchronized with live-server.");
return;
}
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-08-01 12:50:41 UTC (rev 11084)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-08-01 13:50:45 UTC (rev 11085)
@@ -580,7 +580,7 @@
}
catch (Exception e)
{
- log.warn("unable to announce backup for replication", e);
+ log.warn("Unable to announce backup for replication.", e);
}
}
});
12 years, 9 months
JBoss hornetq SVN: r11084 - in branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core: protocol/core/impl/wireformat and 2 other directories.
by do-not-reply@jboss.org
Author: borges
Date: 2011-08-01 08:50:41 -0400 (Mon, 01 Aug 2011)
New Revision: 11084
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationJournalFileMessage.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
Log:
HORNETQ-720 Fix handling of read(buffer) returning 0.
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-08-01 12:49:46 UTC (rev 11083)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-08-01 12:50:41 UTC (rev 11084)
@@ -366,15 +366,21 @@
final JournalImpl localMessageJournal = (JournalImpl)messageJournal;
final JournalImpl localBindingsJournal = (JournalImpl)bindingsJournal;
+ JournalFile[] messageFiles;
+ JournalFile[] bindingsFiles;
+
localMessageJournal.writeLock();
localBindingsJournal.writeLock();
-
- JournalFile[] messageFiles = prepareJournalForCopy(localMessageJournal, JournalContent.MESSAGES);
- JournalFile[] bindingsFiles = prepareJournalForCopy(localBindingsJournal, JournalContent.BINDINGS);
-
- localMessageJournal.writeUnlock();
- localBindingsJournal.writeUnlock();
-
+ try
+ {
+ messageFiles = prepareJournalForCopy(localMessageJournal, JournalContent.MESSAGES);
+ bindingsFiles = prepareJournalForCopy(localBindingsJournal, JournalContent.BINDINGS);
+ }
+ finally
+ {
+ localMessageJournal.writeUnlock();
+ localBindingsJournal.writeUnlock();
+ }
bindingsJournal = new ReplicatedJournal(((byte)0), localBindingsJournal, replicator);
messageJournal = new ReplicatedJournal((byte)1, localMessageJournal, replicator);
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationJournalFileMessage.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationJournalFileMessage.java 2011-08-01 12:49:46 UTC (rev 11083)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationJournalFileMessage.java 2011-08-01 12:50:41 UTC (rev 11084)
@@ -47,7 +47,7 @@
buffer.writeByte(journalType.typeByte);
buffer.writeInt(dataSize);
// sending -1 will close the file
- if (dataSize > -1)
+ if (dataSize > 0)
{
buffer.writeBytes(data);// (data, 0, dataSize);
}
@@ -64,7 +64,7 @@
}
journalType = JournalContent.getType(buffer.readByte());
int size = buffer.readInt();
- if (size > -1)
+ if (size > 0)
{
byteArray = new byte[size];
buffer.readBytes(byteArray);
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-08-01 12:49:46 UTC (rev 11083)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-08-01 12:50:41 UTC (rev 11084)
@@ -377,7 +377,7 @@
JournalImpl journal = (JournalImpl)j;
journal.finishRemoteBackupSync();
}
- server.setRemoteBackupUpToDate(true);
+ server.setRemoteBackupUpToDate();
return;
}
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2011-08-01 12:49:46 UTC (rev 11083)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2011-08-01 12:50:41 UTC (rev 11084)
@@ -508,25 +508,25 @@
public void sendJournalFile(JournalFile jf, JournalContent content) throws Exception
{
SequentialFile file = jf.getFile().copy();
+ log.info("Replication: sending " + jf + " to backup. " + file);
if (!file.isOpen())
{
file.open(1, false);
}
final long id = jf.getFileID();
final ByteBuffer buffer = ByteBuffer.allocate(1 << 17);
-
while (true)
{
int bytesRead = file.read(buffer);
- if (bytesRead > -1)
+ if (bytesRead > 0)
buffer.limit(bytesRead);
- // sending -1 bytes will close the file at the backup
+ buffer.rewind();
+
+ // sending -1 or 0 bytes will close the file at the backup
replicatingChannel.send(new ReplicationJournalFileMessage(bytesRead, buffer, content, id));
- if (bytesRead == -1)
+ if (bytesRead == -1 || bytesRead == 0)
break;
}
- // XXX probably need to sync the JournalFile(?)
- throw new UnsupportedOperationException();
}
@Override
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-08-01 12:49:46 UTC (rev 11083)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-08-01 12:50:41 UTC (rev 11084)
@@ -2046,8 +2046,8 @@
return backupUpToDate;
}
- public void setRemoteBackupUpToDate(boolean isUpToDate)
+ public void setRemoteBackupUpToDate()
{
- backupUpToDate = isUpToDate;
+ backupUpToDate = true;
}
}
12 years, 9 months
JBoss hornetq SVN: r11083 - in branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster: util and 1 other directory.
by do-not-reply@jboss.org
Author: borges
Date: 2011-08-01 08:49:46 -0400 (Mon, 01 Aug 2011)
New Revision: 11083
Modified:
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/RemoteProcessHornetQServer.java
Log:
HORNETQ-720 "waitForBackup(..)" to wait for synchronization to finish.
Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2011-08-01 10:31:50 UTC (rev 11082)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2011-08-01 12:49:46 UTC (rev 11083)
@@ -41,6 +41,7 @@
import org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory;
import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
import org.hornetq.core.server.NodeManager;
+import org.hornetq.core.server.impl.HornetQServerImpl;
import org.hornetq.core.server.impl.InVMNodeManager;
import org.hornetq.tests.integration.cluster.util.SameProcessHornetQServer;
import org.hornetq.tests.integration.cluster.util.TestableServer;
@@ -230,12 +231,29 @@
return sf;
}
- protected static void waitForBackup(ClientSessionFactoryInternal sessionFactory, long seconds) throws Exception
+ /**
+ * Waits for the backup server to be in the "started" state and to finish synchronization
+ * with the live server.
+ * @param sessionFactory
+ * @param seconds
+ * @throws Exception
+ */
+ protected void waitForBackup(ClientSessionFactoryInternal sessionFactory, long seconds) throws Exception
{
final long toWait = seconds * 1000;
final long time = System.currentTimeMillis();
- while (sessionFactory.getBackupConnector() == null)
+ final HornetQServerImpl actualServer = (HornetQServerImpl)backupServer.getServer();
+ while (true)
{
+ if (sessionFactory.getBackupConnector() != null && actualServer.isRemoteBackupUpToDate())
+ {
+ break;
+ }
+ if (System.currentTimeMillis() > (time + toWait))
+ {
+ fail("backup server never started (" + backupServer.isStarted() + "), or never finished synchronizing (" +
+ actualServer.isRemoteBackupUpToDate() + ")");
+ }
try
{
Thread.sleep(100);
@@ -244,16 +262,10 @@
{
//ignore
}
- if (sessionFactory.getBackupConnector() != null)
- {
- break;
- }
- else if (System.currentTimeMillis() > (time + toWait))
- {
- fail("backup server never started");
- }
}
- System.out.println("sf.getBackupConnector() = " + sessionFactory.getBackupConnector());
+
+ System.out.println("Backup server state: [started=" + actualServer.isStarted() + ", upToDate=" +
+ actualServer.isRemoteBackupUpToDate() + "]");
}
protected TransportConfiguration getNettyAcceptorTransportConfiguration(final boolean live)
Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/RemoteProcessHornetQServer.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/RemoteProcessHornetQServer.java 2011-08-01 10:31:50 UTC (rev 11082)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/RemoteProcessHornetQServer.java 2011-08-01 12:49:46 UTC (rev 11083)
@@ -34,7 +34,7 @@
public class RemoteProcessHornetQServer implements TestableServer
{
- private String configurationClassName;
+ private final String configurationClassName;
private Process serverProcess;
private boolean initialised = false;
private CountDownLatch initLatch;
@@ -44,7 +44,7 @@
{
this.configurationClassName = configurationClassName;
}
-
+
public boolean isInitialised()
{
if (serverProcess == null)
@@ -53,7 +53,7 @@
}
try
{
- initLatch = new CountDownLatch(1);
+ initLatch = new CountDownLatch(1);
RemoteProcessHornetQServerSupport.isInitialised(serverProcess);
boolean ok = initLatch.await(10, TimeUnit.SECONDS);
if (ok)
@@ -139,7 +139,7 @@
RemoteProcessHornetQServerSupport.crash(serverProcess);
serverProcess = null;
}
-
+
// Wait to be informed of failure
boolean ok = latch.await(10000, TimeUnit.MILLISECONDS);
12 years, 9 months
JBoss hornetq SVN: r11082 - branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl.
by do-not-reply@jboss.org
Author: borges
Date: 2011-08-01 06:31:50 -0400 (Mon, 01 Aug 2011)
New Revision: 11082
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java
Log:
HORNETQ-720 Add decode step for REPLICATION_SYNC
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java 2011-08-01 10:31:11 UTC (rev 11081)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java 2011-08-01 10:31:50 UTC (rev 11082)
@@ -107,6 +107,7 @@
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationDeleteMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationDeleteTXMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationFileIdMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.ReplicationJournalFileMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageBeingMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageWriteMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargemessageEndMessage;
@@ -537,6 +538,11 @@
packet = new ReplicationFileIdMessage();
break;
}
+ case PacketImpl.REPLICATION_SYNC:
+ {
+ packet = new ReplicationJournalFileMessage();
+ break;
+ }
default:
{
throw new IllegalArgumentException("Invalid type: " + packetType);
12 years, 9 months
JBoss hornetq SVN: r11081 - in branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core: protocol/core/impl and 3 other directories.
by do-not-reply@jboss.org
Author: borges
Date: 2011-08-01 06:31:11 -0400 (Mon, 01 Aug 2011)
New Revision: 11081
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationJournalFileMessage.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
Log:
HORNETQ-720 Use a copy of sequential file, and a ByteBuffer to transfer
bytes
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-08-01 10:30:12 UTC (rev 11080)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-08-01 10:31:11 UTC (rev 11081)
@@ -393,7 +393,7 @@
* @throws IOException
* @throws HornetQException
*/
- private void sendJournalFile(JournalFile[] journalFiles, JournalContent type) throws IOException, HornetQException
+ private void sendJournalFile(JournalFile[] journalFiles, JournalContent type) throws Exception
{
for (JournalFile jf : journalFiles)
{
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2011-08-01 10:30:12 UTC (rev 11080)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2011-08-01 10:31:11 UTC (rev 11081)
@@ -155,7 +155,7 @@
}
catch (Exception e)
{
- // XXX This is not what we want
+ // XXX HORNETQ-720 This is not what we want
e.printStackTrace();
throw new RuntimeException(e);
}
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationJournalFileMessage.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationJournalFileMessage.java 2011-08-01 10:30:12 UTC (rev 11080)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationJournalFileMessage.java 2011-08-01 10:31:11 UTC (rev 11081)
@@ -1,5 +1,7 @@
package org.hornetq.core.protocol.core.impl.wireformat;
+import java.nio.ByteBuffer;
+
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.core.persistence.impl.journal.JournalStorageManager.JournalContent;
import org.hornetq.core.protocol.core.impl.PacketImpl;
@@ -14,24 +16,25 @@
public final class ReplicationJournalFileMessage extends PacketImpl
{
- private byte[] data;
+ private ByteBuffer data;
private int dataSize;
private JournalContent journalType;
/** This value refers to {@link org.hornetq.core.journal.impl.JournalFile#getFileID()} */
private long fileId;
private boolean backupIsUpToDate = false;
+ private byte[] byteArray;
public ReplicationJournalFileMessage()
{
super(REPLICATION_SYNC);
}
- public ReplicationJournalFileMessage(int size, byte[] data, JournalContent content, long id)
+ public ReplicationJournalFileMessage(int size, ByteBuffer buffer, JournalContent content, long id)
{
this();
this.fileId = id;
this.dataSize = size;
- this.data = data;
+ this.data = buffer;
this.journalType = content;
}
@@ -46,7 +49,7 @@
// sending -1 will close the file
if (dataSize > -1)
{
- buffer.writeBytes(data, 0, dataSize);
+ buffer.writeBytes(data);// (data, 0, dataSize);
}
}
@@ -63,8 +66,8 @@
int size = buffer.readInt();
if (size > -1)
{
- data = new byte[size];
- buffer.readBytes(data);
+ byteArray = new byte[size];
+ buffer.readBytes(byteArray);
}
}
@@ -75,7 +78,7 @@
public byte[] getData()
{
- return data;
+ return byteArray;
}
public JournalContent getJournalContent()
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java 2011-08-01 10:30:12 UTC (rev 11080)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java 2011-08-01 10:31:11 UTC (rev 11081)
@@ -13,7 +13,6 @@
package org.hornetq.core.replication;
-import java.io.IOException;
import java.util.Set;
import org.hornetq.api.core.HornetQException;
@@ -90,8 +89,9 @@
/**
* Sends the whole content of the file to be duplicated.
* @throws HornetQException
+ * @throws Exception
*/
- void sendJournalFile(JournalFile jf, JournalContent type) throws IOException, HornetQException;
+ void sendJournalFile(JournalFile jf, JournalContent type) throws Exception;
/**
* Reserve the following fileIDs in the backup server.
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2011-08-01 10:30:12 UTC (rev 11080)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2011-08-01 10:31:11 UTC (rev 11081)
@@ -13,8 +13,7 @@
package org.hornetq.core.replication.impl;
-import java.io.FileInputStream;
-import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.LinkedHashSet;
import java.util.Queue;
import java.util.Set;
@@ -26,6 +25,7 @@
import org.hornetq.api.core.client.SessionFailureListener;
import org.hornetq.core.journal.EncodingSupport;
import org.hornetq.core.journal.JournalLoadInformation;
+import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.journal.impl.JournalFile;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.paging.PagedMessage;
@@ -505,16 +505,23 @@
}
@Override
- public void sendJournalFile(JournalFile jf, JournalContent content) throws IOException, HornetQException
+ public void sendJournalFile(JournalFile jf, JournalContent content) throws Exception
{
- FileInputStream file = new FileInputStream(jf.getFile().getFileName());
+ SequentialFile file = jf.getFile().copy();
+ if (!file.isOpen())
+ {
+ file.open(1, false);
+ }
final long id = jf.getFileID();
- final byte[] data = new byte[1 << 17]; // about 130 kB
+ final ByteBuffer buffer = ByteBuffer.allocate(1 << 17);
+
while (true)
{
- int bytesRead = file.read(data);
- // sending -1 bytes will close the file.
- replicatingChannel.send(new ReplicationJournalFileMessage(bytesRead, data, content, id));
+ int bytesRead = file.read(buffer);
+ if (bytesRead > -1)
+ buffer.limit(bytesRead);
+ // sending -1 bytes will close the file at the backup
+ replicatingChannel.send(new ReplicationJournalFileMessage(bytesRead, buffer, content, id));
if (bytesRead == -1)
break;
}
12 years, 9 months
JBoss hornetq SVN: r11080 - branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl.
by do-not-reply@jboss.org
Author: borges
Date: 2011-08-01 06:30:12 -0400 (Mon, 01 Aug 2011)
New Revision: 11080
Modified:
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalFilesRepository.java
Log:
HORNETQ-720 Initialize Map
Modified: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalFilesRepository.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalFilesRepository.java 2011-08-01 09:35:28 UTC (rev 11079)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalFilesRepository.java 2011-08-01 10:30:12 UTC (rev 11080)
@@ -14,6 +14,7 @@
package org.hornetq.core.journal.impl;
import java.util.Collection;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingDeque;
@@ -448,6 +449,10 @@
public void createRemoteBackupSyncFile(long fileID) throws Exception
{
+ if (filesReservedForSync == null)
+ {
+ filesReservedForSync = new HashMap<Long, JournalFile>();
+ }
assert !filesReservedForSync.containsKey(Long.valueOf(fileID));
filesReservedForSync.put(Long.valueOf(fileID), createFile(false, false, false, false, fileID));
}
12 years, 9 months
JBoss hornetq SVN: r11079 - branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl.
by do-not-reply@jboss.org
Author: borges
Date: 2011-08-01 05:35:28 -0400 (Mon, 01 Aug 2011)
New Revision: 11079
Modified:
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalFilesRepository.java
Log:
HORNETQ-720 FIX race condition when setting next fileID
Modified: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalFilesRepository.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalFilesRepository.java 2011-08-01 04:54:03 UTC (rev 11078)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalFilesRepository.java 2011-08-01 09:35:28 UTC (rev 11079)
@@ -150,22 +150,31 @@
for (JournalFile file : files)
{
- long fileID = file.getFileID();
- if (nextFileID.get() < fileID)
- {
- nextFileID.set(fileID);
- }
+ final long fileIdFromFile = file.getFileID();
+ final long fileIdFromName = getFileNameID(file.getFile().getFileName());
- long fileNameID = getFileNameID(file.getFile().getFileName());
-
// The compactor could create a fileName but use a previously assigned ID.
// Because of that we need to take both parts into account
- if (nextFileID.get() < fileNameID)
- {
- nextFileID.set(fileNameID);
- }
+ setNextFileID(Math.max(fileIdFromName, fileIdFromFile));
}
+ }
+ /**
+ * Set the {@link #nextFileID} value to {@code targetUpdate} if the current value is less than
+ * {@code targetUpdate}.
+ * @param targetUpdate
+ */
+ public void setNextFileID(final long targetUpdate)
+ {
+ while (true)
+ {
+ final long current = nextFileID.get();
+ if (current >= targetUpdate)
+ return;
+
+ if (nextFileID.compareAndSet(current, targetUpdate))
+ return;
+ }
}
public void ensureMinFiles() throws Exception
@@ -536,11 +545,6 @@
return nextFileID.incrementAndGet();
}
- public void setNextFileID(long value)
- {
- nextFileID.set(value);
- }
-
/** Get the ID part of the name */
private long getFileNameID(final String fileName)
{
12 years, 9 months
JBoss hornetq SVN: r11078 - branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/util.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-08-01 00:54:03 -0400 (Mon, 01 Aug 2011)
New Revision: 11078
Modified:
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/util/UnitTestCase.java
Log:
tweak
Modified: branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/util/UnitTestCase.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/util/UnitTestCase.java 2011-08-01 04:30:35 UTC (rev 11077)
+++ branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/util/UnitTestCase.java 2011-08-01 04:54:03 UTC (rev 11078)
@@ -979,6 +979,7 @@
catch (Throwable e)
{
log.info(threadDump(e.getMessage()));
+ System.err.println(threadDump(e.getMessage()));
throw new RuntimeException (e.getMessage(), e);
}
}
12 years, 9 months
JBoss hornetq SVN: r11077 - branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-08-01 00:30:35 -0400 (Mon, 01 Aug 2011)
New Revision: 11077
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
Log:
Fixing tests
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-07-30 19:02:39 UTC (rev 11076)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-08-01 04:30:35 UTC (rev 11077)
@@ -20,6 +20,7 @@
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -1314,11 +1315,13 @@
private void updateArraysAndPairs()
{
+ Collection<TopologyMember> membersCopy = topology.getMembers();
+
topologyArray = (Pair<TransportConfiguration, TransportConfiguration>[])Array.newInstance(Pair.class,
- topology.members());
+ membersCopy.size());
int count = 0;
- for (TopologyMember pair : topology.getMembers())
+ for (TopologyMember pair : membersCopy)
{
topologyArray[count++] = pair.getConnector();
}
12 years, 9 months