JBoss hornetq SVN: r11146 - branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/management/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-08-08 11:45:41 -0400 (Mon, 08 Aug 2011)
New Revision: 11146
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java
Log:
change requested
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java 2011-08-08 07:47:16 UTC (rev 11145)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java 2011-08-08 15:45:41 UTC (rev 11146)
@@ -1522,7 +1522,8 @@
{
checkStarted();
- if (pageSizeBytes >= maxSizeBytes)
+ // JBPAPP-6334 requested this to be pageSizeBytes > maxSizeBytes
+ if (pageSizeBytes > maxSizeBytes)
{
throw new IllegalStateException("pageSize has to be lower than maxSizeBytes. Invalid argument (" + pageSizeBytes + " < " + maxSizeBytes + ")");
}
13 years, 5 months
JBoss hornetq SVN: r11145 - tags.
by do-not-reply@jboss.org
Author: ataylor
Date: 2011-08-08 03:47:16 -0400 (Mon, 08 Aug 2011)
New Revision: 11145
Removed:
tags/HornetQ_2_2_7_Final_pending/
Log:
unneeded branch
13 years, 5 months
JBoss hornetq SVN: r11144 - tags.
by do-not-reply@jboss.org
Author: ataylor
Date: 2011-08-08 03:46:18 -0400 (Mon, 08 Aug 2011)
New Revision: 11144
Added:
tags/HornetQ_2_2_7_Final/
Log:
2.2.7.Final
13 years, 5 months
JBoss hornetq SVN: r11143 - in branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl: wireformat and 1 other directory.
by do-not-reply@jboss.org
Author: borges
Date: 2011-08-05 11:07:05 -0400 (Fri, 05 Aug 2011)
New Revision: 11143
Removed:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/CreateReplicationSessionMessage.java
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java
Log:
HORNETQ-720 Get rid of createReplication left-overs
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java 2011-08-05 15:01:37 UTC (rev 11142)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java 2011-08-05 15:07:05 UTC (rev 11143)
@@ -17,7 +17,6 @@
import static org.hornetq.core.protocol.core.impl.PacketImpl.CREATESESSION;
import static org.hornetq.core.protocol.core.impl.PacketImpl.CREATESESSION_RESP;
import static org.hornetq.core.protocol.core.impl.PacketImpl.CREATE_QUEUE;
-import static org.hornetq.core.protocol.core.impl.PacketImpl.CREATE_REPLICATION;
import static org.hornetq.core.protocol.core.impl.PacketImpl.DELETE_QUEUE;
import static org.hornetq.core.protocol.core.impl.PacketImpl.DISCONNECT;
import static org.hornetq.core.protocol.core.impl.PacketImpl.EXCEPTION;
@@ -88,7 +87,6 @@
import org.hornetq.core.protocol.core.Packet;
import org.hornetq.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage;
import org.hornetq.core.protocol.core.impl.wireformat.CreateQueueMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.CreateReplicationSessionMessage;
import org.hornetq.core.protocol.core.impl.wireformat.CreateSessionMessage;
import org.hornetq.core.protocol.core.impl.wireformat.CreateSessionResponseMessage;
import org.hornetq.core.protocol.core.impl.wireformat.DisconnectMessage;
@@ -106,7 +104,6 @@
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationCompareDataMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationDeleteMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationDeleteTXMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.ReplicationStartSyncMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationJournalFileMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageBeingMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageWriteMessage;
@@ -115,6 +112,7 @@
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationPageWriteMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationPrepareMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationResponseMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.ReplicationStartSyncMessage;
import org.hornetq.core.protocol.core.impl.wireformat.RollbackMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionAcknowledgeMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionAddMetaDataMessage;
@@ -428,11 +426,6 @@
packet = new SessionProducerCreditsMessage();
break;
}
- case CREATE_REPLICATION:
- {
- packet = new CreateReplicationSessionMessage();
- break;
- }
case REPLICATION_APPEND:
{
packet = new ReplicationAddMessage();
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java 2011-08-05 15:01:37 UTC (rev 11142)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java 2011-08-05 15:07:05 UTC (rev 11143)
@@ -69,8 +69,6 @@
public static final byte DELETE_QUEUE = 35;
- public static final byte CREATE_REPLICATION = 36;
-
// Session
public static final byte SESS_CREATECONSUMER = 40;
Deleted: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/CreateReplicationSessionMessage.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/CreateReplicationSessionMessage.java 2011-08-05 15:01:37 UTC (rev 11142)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/CreateReplicationSessionMessage.java 2011-08-05 15:07:05 UTC (rev 11143)
@@ -1,75 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.protocol.core.impl.wireformat;
-
-import org.hornetq.api.core.HornetQBuffer;
-import org.hornetq.core.protocol.core.impl.PacketImpl;
-
-/**
- * @author <a href="mailto:tim.fox@jboss.com">Clebert Suconic</a>
- */
-public class CreateReplicationSessionMessage extends PacketImpl
-{
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- private long sessionChannelID;
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- public CreateReplicationSessionMessage(final long sessionChannelID)
- {
- super(PacketImpl.CREATE_REPLICATION);
-
- this.sessionChannelID = sessionChannelID;
- }
-
- public CreateReplicationSessionMessage()
- {
- super(PacketImpl.CREATE_REPLICATION);
- }
-
- // Public --------------------------------------------------------
-
- @Override
- public void encodeRest(final HornetQBuffer buffer)
- {
- buffer.writeLong(sessionChannelID);
- }
-
- @Override
- public void decodeRest(final HornetQBuffer buffer)
- {
- sessionChannelID = buffer.readLong();
- }
-
- /**
- * @return the sessionChannelID
- */
- public long getSessionChannelID()
- {
- return sessionChannelID;
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-}
13 years, 5 months
JBoss hornetq SVN: r11142 - in branches/HORNETQ-720_Replication: hornetq-core/src/main/java/org/hornetq/core/replication/impl and 2 other directories.
by do-not-reply@jboss.org
Author: borges
Date: 2011-08-05 11:01:37 -0400 (Fri, 05 Aug 2011)
New Revision: 11142
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/ReplicatingJournal.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java
Log:
HORNETQ-720 _Initial_ support for replication during sync.
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-08-05 15:00:17 UTC (rev 11141)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-08-05 15:01:37 UTC (rev 11142)
@@ -218,6 +218,8 @@
private final Map<SimpleString, PersistedAddressSetting> mapPersistedAddressSettings = new ConcurrentHashMap<SimpleString, PersistedAddressSetting>();
+ private final boolean hasCallbackSupport;
+
public JournalStorageManager(final Configuration config, final ExecutorFactory executorFactory)
{
this(config, executorFactory, null);
@@ -304,8 +306,8 @@
{
throw new IllegalArgumentException("Unsupported journal type " + config.getJournalType());
}
+ hasCallbackSupport = journalFF.isSupportsCallbacks();
-
idGenerator = new BatchingIDGenerator(0, JournalStorageManager.CHECKPOINT_BATCH_SIZE, bindingsJournal);
Journal localMessage = new JournalImpl(config.getJournalFileSize(),
@@ -3480,4 +3482,9 @@
journal.stop();
}
+ public boolean hasCallbackSupport()
+ {
+ return hasCallbackSupport;
+ }
+
}
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-08-05 15:00:17 UTC (rev 11141)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-08-05 15:01:37 UTC (rev 11142)
@@ -456,7 +456,8 @@
JournalImpl journal = assertJournalImpl(journalIf);
Map<Long, JournalFile> mapToFill = filesReservedForSync.get(packet.getJournalContentType());
JournalFile current = journal.createFilesForRemoteSync(packet.getFileIds(), mapToFill);
- registerJournal(packet.getJournalContentType().typeByte, new ReplicatingJournal(current));
+ registerJournal(packet.getJournalContentType().typeByte,
+ new ReplicatingJournal(current, storage.hasCallbackSupport()));
}
// XXX HORNETQ-720 really need to do away with this once the method calls get stable.
Modified: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/ReplicatingJournal.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/ReplicatingJournal.java 2011-08-05 15:00:17 UTC (rev 11141)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/ReplicatingJournal.java 2011-08-05 15:01:37 UTC (rev 11142)
@@ -1,7 +1,11 @@
package org.hornetq.core.journal.impl;
import java.util.List;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.hornetq.api.core.HornetQException;
import org.hornetq.core.journal.EncodingSupport;
import org.hornetq.core.journal.IOCompletion;
import org.hornetq.core.journal.Journal;
@@ -10,6 +14,9 @@
import org.hornetq.core.journal.PreparedTransactionInfo;
import org.hornetq.core.journal.RecordInfo;
import org.hornetq.core.journal.TransactionFailureCallback;
+import org.hornetq.core.journal.impl.dataformat.ByteArrayEncoding;
+import org.hornetq.core.journal.impl.dataformat.JournalAddRecord;
+import org.hornetq.core.journal.impl.dataformat.JournalInternalRecord;
/**
* Journal used at a replicating backup server during the synchronization of data with the 'live'
@@ -19,69 +26,120 @@
*/
public class ReplicatingJournal implements Journal
{
+ private final ReentrantLock lockAppend = new ReentrantLock();
+ private final ReadWriteLock journalLock = new ReentrantReadWriteLock();
private final JournalFile file;
+ private final boolean hasCallbackSupport;
/**
* @param file
*/
- public ReplicatingJournal(JournalFile file)
+ public ReplicatingJournal(JournalFile file, boolean hasCallbackSupport)
{
this.file = file;
+ this.hasCallbackSupport = hasCallbackSupport;
}
@Override
public void start() throws Exception
{
- // TODO Auto-generated method stub
-
+ throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
}
@Override
public void stop() throws Exception
{
- // TODO Auto-generated method stub
-
+ throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
}
@Override
public boolean isStarted()
{
- // TODO Auto-generated method stub
- return false;
+ throw new UnsupportedOperationException();
}
+ // ------------------------
@Override
public void appendAddRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception
{
- throw new UnsupportedOperationException();
+ appendAddRecord(id, recordType, new ByteArrayEncoding(record), sync);
}
@Override
public void appendAddRecord(long id, byte recordType, byte[] record, boolean sync, IOCompletion completionCallback)
- throws Exception
+ throws Exception
{
- throw new UnsupportedOperationException();
+ appendAddRecord(id, recordType, new ByteArrayEncoding(record), sync, completionCallback);
}
@Override
public void appendAddRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception
{
- throw new UnsupportedOperationException();
+ SyncIOCompletion callback = getSyncCallback(sync);
+
+ appendAddRecord(id, recordType, record, sync, callback);
+
+ if (callback != null)
+ {
+ callback.waitCompletion();
+ }
}
+ // ------------------------
+
+ private void readLockJournal()
+ {
+ journalLock.readLock().lock();
+ }
+
+ private void readUnlockJournal()
+ {
+ journalLock.readLock().unlock();
+ }
+
@Override
public void appendAddRecord(long id, byte recordType, EncodingSupport record, boolean sync,
- IOCompletion completionCallback) throws Exception
+ IOCompletion callback) throws Exception
{
- throw new UnsupportedOperationException();
+ JournalInternalRecord addRecord = new JournalAddRecord(true, id, recordType, record);
+ if (callback != null)
+ {
+ callback.storeLineUp();
+ }
+
+ lockAppend.lock();
+ try
+ {
+ JournalFile usedFile = appendRecord(addRecord, false, sync, null, callback);
+ }
+ finally
+ {
+ lockAppend.unlock();
+ }
+
}
+ /**
+ * @param addRecord
+ * @param b
+ * @param sync
+ * @param object
+ * @param callback
+ * @return
+ */
+ private JournalFile appendRecord(JournalInternalRecord addRecord, boolean b, boolean sync, Object object,
+ IOCompletion callback)
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
@Override
public void appendUpdateRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception
{
- throw new UnsupportedOperationException();
+ throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
}
@Override
@@ -89,146 +147,145 @@
appendUpdateRecord(long id, byte recordType, byte[] record, boolean sync, IOCompletion completionCallback)
throws Exception
{
- throw new UnsupportedOperationException();
+ throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
}
@Override
public void appendUpdateRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception
{
- throw new UnsupportedOperationException();
+ throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
}
@Override
public void appendUpdateRecord(long id, byte recordType, EncodingSupport record, boolean sync,
- IOCompletion completionCallback) throws Exception
+ IOCompletion completionCallback) throws Exception
{
- throw new UnsupportedOperationException();
+ throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
}
@Override
public void appendDeleteRecord(long id, boolean sync) throws Exception
{
- throw new UnsupportedOperationException();
+ throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
}
@Override
public void appendDeleteRecord(long id, boolean sync, IOCompletion completionCallback) throws Exception
{
- throw new UnsupportedOperationException();
+ throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
}
@Override
public void appendAddRecordTransactional(long txID, long id, byte recordType, byte[] record) throws Exception
{
- throw new UnsupportedOperationException();
+ throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
}
@Override
public void appendAddRecordTransactional(long txID, long id, byte recordType, EncodingSupport record)
throws Exception
{
- throw new UnsupportedOperationException();
+ throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
}
@Override
public void appendUpdateRecordTransactional(long txID, long id, byte recordType, byte[] record) throws Exception
{
- throw new UnsupportedOperationException();
+ throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
}
@Override
public void appendUpdateRecordTransactional(long txID, long id, byte recordType, EncodingSupport record)
throws Exception
{
- throw new UnsupportedOperationException();
+ throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
}
@Override
public void appendDeleteRecordTransactional(long txID, long id, byte[] record) throws Exception
{
- throw new UnsupportedOperationException();
+ throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
}
@Override
public void appendDeleteRecordTransactional(long txID, long id, EncodingSupport record) throws Exception
{
- throw new UnsupportedOperationException();
+ throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
}
@Override
public void appendDeleteRecordTransactional(long txID, long id) throws Exception
{
- throw new UnsupportedOperationException();
+ throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
}
@Override
public void appendCommitRecord(long txID, boolean sync) throws Exception
{
- throw new UnsupportedOperationException();
+ throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
}
@Override
public void appendCommitRecord(long txID, boolean sync, IOCompletion callback) throws Exception
{
- throw new UnsupportedOperationException();
+ throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
}
@Override
- public void
- appendCommitRecord(long txID, boolean sync, IOCompletion callback, boolean lineUpContext) throws Exception
+ public void appendCommitRecord(long txID, boolean sync, IOCompletion callback, boolean lineUpContext)
+ throws Exception
{
- throw new UnsupportedOperationException();
+ throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
}
@Override
public void appendPrepareRecord(long txID, EncodingSupport transactionData, boolean sync) throws Exception
{
- throw new UnsupportedOperationException();
+ throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
}
@Override
public void appendPrepareRecord(long txID, EncodingSupport transactionData, boolean sync, IOCompletion callback)
throws Exception
{
- throw new UnsupportedOperationException();
+ throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
}
@Override
public void appendPrepareRecord(long txID, byte[] transactionData, boolean sync) throws Exception
{
- throw new UnsupportedOperationException();
+ throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
}
@Override
- public void
- appendPrepareRecord(long txID, byte[] transactionData, boolean sync, IOCompletion callback)
- throws Exception
+ public void appendPrepareRecord(long txID, byte[] transactionData, boolean sync, IOCompletion callback)
+ throws Exception
{
- throw new UnsupportedOperationException();
+ throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
}
@Override
public void appendRollbackRecord(long txID, boolean sync) throws Exception
{
- throw new UnsupportedOperationException();
+ throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
}
@Override
public void appendRollbackRecord(long txID, boolean sync, IOCompletion callback) throws Exception
{
- throw new UnsupportedOperationException();
+ throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
}
@Override
public JournalLoadInformation load(LoaderCallback reloadManager) throws Exception
{
- throw new UnsupportedOperationException();
+ throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
}
@Override
public JournalLoadInformation loadInternalOnly() throws Exception
{
- throw new UnsupportedOperationException();
+ throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
}
@Override
@@ -242,13 +299,13 @@
List<PreparedTransactionInfo> preparedTransactions, TransactionFailureCallback transactionFailure)
throws Exception
{
- throw new UnsupportedOperationException();
+ throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
}
@Override
public int getAlignment() throws Exception
{
- throw new UnsupportedOperationException();
+ throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
}
@Override
@@ -274,4 +331,17 @@
{
throw new UnsupportedOperationException();
}
+
+ private SyncIOCompletion getSyncCallback(final boolean sync)
+ {
+ if (hasCallbackSupport)
+ {
+ if (sync)
+ {
+ return new SimpleWaitIOCallback();
+ }
+ return DummyCallback.getInstance();
+ }
+ return null;
+ }
}
Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java 2011-08-05 15:00:17 UTC (rev 11141)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java 2011-08-05 15:01:37 UTC (rev 11142)
@@ -72,8 +72,9 @@
// SEND more messages, now with the backup replicating
sendMessages(session, producer, N_MSGS);
+ handler.deliver = true;
+ sendMessages(session, producer, 1);
- handler.notifyAll();
waitForBackup(sessionFactory, 10, true);
Set<Long> liveIds = getFileIds(messageJournal);
@@ -199,7 +200,6 @@
public BackupSyncDelay(ReplicationChannelHandler handler)
{
this.handler = handler;
- // TODO Auto-generated constructor stub
}
@Override
@@ -228,6 +228,8 @@
{
private ChannelHandler handler;
+ private Packet onHold;
+ public volatile boolean deliver;
public void addSubHandler(ChannelHandler handler)
{
@@ -237,21 +239,17 @@
@Override
public void handlePacket(Packet packet)
{
- System.out.println(packet);
+ if (onHold != null && deliver)
+ {
+ handler.handlePacket(onHold);
+ }
if (packet.getType() == PacketImpl.REPLICATION_SYNC)
{
ReplicationJournalFileMessage syncMsg = (ReplicationJournalFileMessage)packet;
if (syncMsg.isUpToDate())
{
- // Hold the message that notifies the backup that sync is done.
- try
- {
- wait();
- }
- catch (InterruptedException e)
- {
- // no-op
- }
+ onHold = packet;
+ return;
}
}
handler.handlePacket(packet);
13 years, 5 months
JBoss hornetq SVN: r11141 - in branches/HORNETQ-720_Replication: hornetq-journal/src/main/java/org/hornetq/core/journal and 1 other directories.
by do-not-reply@jboss.org
Author: borges
Date: 2011-08-05 11:00:17 -0400 (Fri, 05 Aug 2011)
New Revision: 11141
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationJournalFileMessage.java
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/IOCompletion.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
Log:
HORNETQ-720 Control when backup reaches "up-to-date" status during tests
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationJournalFileMessage.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationJournalFileMessage.java 2011-08-05 14:59:02 UTC (rev 11140)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationJournalFileMessage.java 2011-08-05 15:00:17 UTC (rev 11141)
@@ -21,7 +21,7 @@
private JournalContent journalType;
/** This value refers to {@link org.hornetq.core.journal.impl.JournalFile#getFileID()} */
private long fileId;
- private boolean backupIsUpToDate = false;
+ private boolean backupIsUpToDate;
private byte[] byteArray;
public ReplicationJournalFileMessage()
@@ -33,6 +33,7 @@
{
this();
this.fileId = id;
+ this.backupIsUpToDate = id == -1;
this.dataSize = size;
this.data = buffer;
this.journalType = content;
Modified: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/IOCompletion.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/IOCompletion.java 2011-08-05 14:59:02 UTC (rev 11140)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/IOCompletion.java 2011-08-05 15:00:17 UTC (rev 11141)
@@ -23,4 +23,4 @@
public interface IOCompletion extends IOAsyncTask
{
void storeLineUp();
-}
+}
\ No newline at end of file
Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java 2011-08-05 14:59:02 UTC (rev 11140)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java 2011-08-05 15:00:17 UTC (rev 11141)
@@ -4,6 +4,7 @@
import java.util.Set;
import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.Interceptor;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientConsumer;
import org.hornetq.api.core.client.ClientProducer;
@@ -13,7 +14,14 @@
import org.hornetq.core.journal.impl.JournalFile;
import org.hornetq.core.journal.impl.JournalImpl;
import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
+import org.hornetq.core.protocol.core.Channel;
+import org.hornetq.core.protocol.core.ChannelHandler;
+import org.hornetq.core.protocol.core.Packet;
+import org.hornetq.core.protocol.core.impl.PacketImpl;
+import org.hornetq.core.protocol.core.impl.wireformat.ReplicationJournalFileMessage;
+import org.hornetq.core.replication.ReplicationEndpoint;
import org.hornetq.core.server.HornetQServer;
+import org.hornetq.spi.core.protocol.RemotingConnection;
import org.hornetq.tests.integration.cluster.util.TestableServer;
import org.hornetq.tests.util.TransportConfigurationUtils;
@@ -24,6 +32,7 @@
private ClientSessionFactoryInternal sessionFactory;
private ClientSession session;
private ClientProducer producer;
+ private ReplicationChannelHandler handler;
private static final int N_MSGS = 100;
@Override
@@ -49,6 +58,8 @@
public void testReserveFileIdValuesOnBackup() throws Exception
{
+ handler = new ReplicationChannelHandler();
+ liveServer.addInterceptor(new BackupSyncDelay(handler));
createProducerSendSomeMessages();
JournalImpl messageJournal = getMessageJournalFromServer(liveServer);
for (int i = 0; i < 5; i++)
@@ -57,11 +68,14 @@
sendMessages(session, producer, N_MSGS);
}
backupServer.start();
- waitForBackup(sessionFactory, 10);
+ waitForBackup(sessionFactory, 10, false);
// SEND more messages, now with the backup replicating
sendMessages(session, producer, N_MSGS);
+ handler.notifyAll();
+ waitForBackup(sessionFactory, 10, true);
+
Set<Long> liveIds = getFileIds(messageJournal);
assertFalse("should not be initialized", backupServer.getServer().isInitialised());
crash(session);
@@ -146,6 +160,10 @@
@Override
protected void tearDown() throws Exception
{
+ if (handler != null)
+ {
+ handler.notifyAll();
+ }
if (sessionFactory != null)
sessionFactory.close();
if (session != null)
@@ -173,4 +191,71 @@
return TransportConfigurationUtils.getInVMConnector(live);
}
+ private class BackupSyncDelay implements Interceptor
+ {
+
+ private final ReplicationChannelHandler handler;
+
+ public BackupSyncDelay(ReplicationChannelHandler handler)
+ {
+ this.handler = handler;
+ // TODO Auto-generated constructor stub
+ }
+
+ @Override
+ public boolean intercept(Packet packet, RemotingConnection connection) throws HornetQException
+ {
+ if (packet.getType() == PacketImpl.HA_BACKUP_REGISTRATION)
+ {
+ try
+ {
+ ReplicationEndpoint repEnd = backupServer.getServer().getReplicationEndpoint();
+ handler.addSubHandler(repEnd);
+ Channel repChannel = repEnd.getChannel();
+ repChannel.setHandler(handler);
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+ return true;
+ }
+
+ }
+
+ private static class ReplicationChannelHandler implements ChannelHandler
+ {
+
+ private ChannelHandler handler;
+
+ public void addSubHandler(ChannelHandler handler)
+ {
+ this.handler = handler;
+ }
+
+ @Override
+ public void handlePacket(Packet packet)
+ {
+ System.out.println(packet);
+ if (packet.getType() == PacketImpl.REPLICATION_SYNC)
+ {
+ ReplicationJournalFileMessage syncMsg = (ReplicationJournalFileMessage)packet;
+ if (syncMsg.isUpToDate())
+ {
+ // Hold the message that notifies the backup that sync is done.
+ try
+ {
+ wait();
+ }
+ catch (InterruptedException e)
+ {
+ // no-op
+ }
+ }
+ }
+ handler.handlePacket(packet);
+ }
+
+ }
}
Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2011-08-05 14:59:02 UTC (rev 11140)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2011-08-05 15:00:17 UTC (rev 11141)
@@ -240,12 +240,22 @@
*/
protected void waitForBackup(ClientSessionFactoryInternal sessionFactory, long seconds) throws Exception
{
+ waitForBackup(sessionFactory, seconds, true);
+ }
+
+ /**
+ * @param sessionFactory
+ * @param seconds
+ * @param waitForSync whether to wait for sync'ing data with the live to finish or not
+ */
+ protected void waitForBackup(ClientSessionFactoryInternal sessionFactory, long seconds, boolean waitForSync)
+ {
final long toWait = seconds * 1000;
final long time = System.currentTimeMillis();
final HornetQServerImpl actualServer = (HornetQServerImpl)backupServer.getServer();
while (true)
{
- if (sessionFactory.getBackupConnector() != null && actualServer.isRemoteBackupUpToDate())
+ if (sessionFactory.getBackupConnector() != null && (actualServer.isRemoteBackupUpToDate() || !waitForSync))
{
break;
}
@@ -263,9 +273,6 @@
//ignore
}
}
-
- System.out.println("Backup server state: [started=" + actualServer.isStarted() + ", upToDate=" +
- actualServer.isRemoteBackupUpToDate() + "]");
}
protected TransportConfiguration getNettyAcceptorTransportConfiguration(final boolean live)
13 years, 5 months
JBoss hornetq SVN: r11140 - in branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core: protocol/core/impl/wireformat and 1 other directories.
by do-not-reply@jboss.org
Author: borges
Date: 2011-08-05 10:59:02 -0400 (Fri, 05 Aug 2011)
New Revision: 11140
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationStartSyncMessage.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
Log:
HORNETQ-720 Rename Packet type code to something that makes more sense
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java 2011-08-05 14:58:22 UTC (rev 11139)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java 2011-08-05 14:59:02 UTC (rev 11140)
@@ -533,7 +533,7 @@
packet = new HaBackupRegistrationMessage();
break;
}
- case PacketImpl.REPLICATION_FILE_ID:
+ case PacketImpl.REPLICATION_START_SYNC:
{
packet = new ReplicationStartSyncMessage();
break;
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java 2011-08-05 14:58:22 UTC (rev 11139)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java 2011-08-05 14:59:02 UTC (rev 11140)
@@ -194,9 +194,10 @@
public static final byte SUBSCRIBE_TOPOLOGY = 112;
+ /** XXX HORNETQ-720 "HA" is not really used anywhere else. Better name? */
public static final byte HA_BACKUP_REGISTRATION = 113;
- public static final byte REPLICATION_FILE_ID = 120;
+ public static final byte REPLICATION_START_SYNC = 120;
// Static --------------------------------------------------------
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationStartSyncMessage.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationStartSyncMessage.java 2011-08-05 14:58:22 UTC (rev 11139)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationStartSyncMessage.java 2011-08-05 14:59:02 UTC (rev 11140)
@@ -19,7 +19,7 @@
public ReplicationStartSyncMessage()
{
- super(REPLICATION_FILE_ID);
+ super(REPLICATION_START_SYNC);
}
public ReplicationStartSyncMessage(JournalFile[] datafiles, JournalContent contentType)
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-08-05 14:58:22 UTC (rev 11139)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-08-05 14:59:02 UTC (rev 11140)
@@ -192,7 +192,7 @@
handleCompareDataMessage((ReplicationCompareDataMessage)packet);
response = new NullResponseMessage();
}
- else if (type == PacketImpl.REPLICATION_FILE_ID)
+ else if (type == PacketImpl.REPLICATION_START_SYNC)
{
handleStartReplicationSynchronization((ReplicationStartSyncMessage)packet);
}
13 years, 5 months
JBoss hornetq SVN: r11139 - in branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core: server and 2 other directories.
by do-not-reply@jboss.org
Author: borges
Date: 2011-08-05 10:58:22 -0400 (Fri, 05 Aug 2011)
New Revision: 11139
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/HornetQServer.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/management/ManagementService.java
Log:
HORNETQ-720 Remove unused 'create replication' code
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java 2011-08-05 13:41:29 UTC (rev 11138)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java 2011-08-05 14:58:22 UTC (rev 11139)
@@ -25,14 +25,11 @@
import org.hornetq.core.protocol.core.Packet;
import org.hornetq.core.protocol.core.ServerSessionPacketHandler;
import org.hornetq.core.protocol.core.impl.wireformat.CreateQueueMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.CreateReplicationSessionMessage;
import org.hornetq.core.protocol.core.impl.wireformat.CreateSessionMessage;
import org.hornetq.core.protocol.core.impl.wireformat.CreateSessionResponseMessage;
import org.hornetq.core.protocol.core.impl.wireformat.HornetQExceptionMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.NullResponseMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReattachSessionMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReattachSessionResponseMessage;
-import org.hornetq.core.replication.ReplicationEndpoint;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.ServerSession;
import org.hornetq.core.version.Version;
@@ -102,16 +99,6 @@
break;
}
- case PacketImpl.CREATE_REPLICATION:
- {
- // Create queue can also be fielded here in the case of a replicated store and forward queue creation
-
- CreateReplicationSessionMessage request = (CreateReplicationSessionMessage)packet;
-
- handleCreateReplication(request);
-
- break;
- }
default:
{
log.error("Invalid packet " + packet);
@@ -293,32 +280,4 @@
log.error("Failed to handle create queue", e);
}
}
-
- private void handleCreateReplication(final CreateReplicationSessionMessage request)
- {
- Packet response;
-
- try
- {
- Channel channel = connection.getChannel(request.getSessionChannelID(), -1);
-
- ReplicationEndpoint endpoint = server.connectToReplicationEndpoint(channel);
-
- channel.setHandler(endpoint);
-
- response = new NullResponseMessage();
- }
- catch (HornetQException e)
- {
- response = new HornetQExceptionMessage(e);
- }
- catch (Exception e)
- {
- log.warn(e.getMessage(), e);
- response = new HornetQExceptionMessage(new HornetQException(HornetQException.INTERNAL_ERROR));
- }
-
- channel1.send(response);
- }
-
}
\ No newline at end of file
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/HornetQServer.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/HornetQServer.java 2011-08-05 13:41:29 UTC (rev 11138)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/HornetQServer.java 2011-08-05 14:58:22 UTC (rev 11139)
@@ -27,7 +27,6 @@
import org.hornetq.core.paging.PagingManager;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.postoffice.PostOffice;
-import org.hornetq.core.protocol.core.Channel;
import org.hornetq.core.protocol.core.CoreRemotingConnection;
import org.hornetq.core.remoting.server.RemotingService;
import org.hornetq.core.replication.ReplicationEndpoint;
@@ -91,10 +90,6 @@
void unregisterActivateCallback(ActivateCallback callback);
- /** The journal at the backup server has to be equivalent as the journal used on the live node.
- * Or else the backup node is out of sync. */
- ReplicationEndpoint connectToReplicationEndpoint(Channel channel) throws Exception;
-
ServerSession createSession(String name,
String username,
String password,
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-08-05 13:41:29 UTC (rev 11138)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-08-05 14:58:22 UTC (rev 11139)
@@ -1071,7 +1071,7 @@
* XXX FIXME to be made private, and method removed from Server interface once HORNETQ-720 is
* finished.
*/
- public synchronized ReplicationEndpoint connectToReplicationEndpoint(final Channel channel) throws Exception
+ private synchronized ReplicationEndpoint connectToReplicationEndpoint(final Channel channel) throws Exception
{
if (!configuration.isBackup())
{
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/management/ManagementService.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/management/ManagementService.java 2011-08-05 13:41:29 UTC (rev 11138)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/management/ManagementService.java 2011-08-05 14:58:22 UTC (rev 11139)
@@ -18,11 +18,9 @@
import javax.management.ObjectName;
-import org.hornetq.api.core.DiscoveryGroupConfiguration;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.management.ObjectNameBuilder;
-import org.hornetq.core.cluster.DiscoveryGroup;
import org.hornetq.core.config.BridgeConfiguration;
import org.hornetq.core.config.BroadcastGroupConfiguration;
import org.hornetq.core.config.ClusterConnectionConfiguration;
13 years, 5 months
JBoss hornetq SVN: r11138 - branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-08-05 09:41:29 -0400 (Fri, 05 Aug 2011)
New Revision: 11138
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
Log:
Fixing confirmation window size
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-08-05 13:10:52 UTC (rev 11137)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-08-05 13:41:29 UTC (rev 11138)
@@ -105,6 +105,8 @@
private final boolean useDuplicateDetection;
private final boolean routeWhenNoConsumers;
+
+ private final int confirmationWindowSize;
private final Map<String, MessageFlowRecord> records = new ConcurrentHashMap<String, MessageFlowRecord>();
@@ -190,6 +192,8 @@
this.useDuplicateDetection = useDuplicateDetection;
this.routeWhenNoConsumers = routeWhenNoConsumers;
+
+ this.confirmationWindowSize = confirmationWindowSize;
this.executorFactory = executorFactory;
@@ -288,6 +292,8 @@
this.routeWhenNoConsumers = routeWhenNoConsumers;
+ this.confirmationWindowSize = confirmationWindowSize;
+
this.executorFactory = executorFactory;
this.executor = executorFactory.getExecutor();
@@ -463,12 +469,16 @@
serverLocator.setClientFailureCheckPeriod(clientFailureCheckPeriod);
serverLocator.setConnectionTTL(connectionTTL);
- if (serverLocator.getConfirmationWindowSize() < 0)
+ if (confirmationWindowSize < 0)
{
// We can't have confirmationSize = -1 on the cluster Bridge
// Otherwise we won't have confirmation working
serverLocator.setConfirmationWindowSize(0);
}
+ else
+ {
+ serverLocator.setConfirmationWindowSize(confirmationWindowSize);
+ }
if (!useDuplicateDetection)
{
13 years, 5 months
JBoss hornetq SVN: r11137 - branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-08-05 09:10:52 -0400 (Fri, 05 Aug 2011)
New Revision: 11137
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
Log:
damn
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-08-05 07:23:26 UTC (rev 11136)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-08-05 13:10:52 UTC (rev 11137)
@@ -792,18 +792,10 @@
{
log.debug("stopping bridge " + BridgeImpl.this);
-///////////////
-// if (session != null)
-// {
-// log.debug("Cleaning up session " + session);
-// session.close();
-//
-// }
-
- session.cleanUp(false);
-
if (session != null)
{
+ log.debug("Cleaning up session " + session);
+ session.close();
session.removeFailureListener(BridgeImpl.this);
}
13 years, 5 months