Author: clebert.suconic(a)jboss.com
Date: 2009-11-17 21:52:31 -0500 (Tue, 17 Nov 2009)
New Revision: 8303
Added:
branches/ClebertTemporary/src/main/org/hornetq/core/completion/OperationContext.java
branches/ClebertTemporary/src/main/org/hornetq/core/completion/OperationExceptionCallback.java
branches/ClebertTemporary/src/main/org/hornetq/core/completion/impl/OperationContextImpl.java
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/SyncIOCompletion.java
Removed:
branches/ClebertTemporary/src/main/org/hornetq/core/completion/CompletionContext.java
branches/ClebertTemporary/src/main/org/hornetq/core/completion/impl/CompletionContextImpl.java
Modified:
branches/ClebertTemporary/.classpath
branches/ClebertTemporary/src/main/org/hornetq/core/journal/IOCompletion.java
branches/ClebertTemporary/src/main/org/hornetq/core/journal/Journal.java
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/DummyCallback.java
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/JournalImpl.java
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/SimpleWaitIOCallback.java
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/TransactionCallback.java
branches/ClebertTemporary/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
branches/ClebertTemporary/src/main/org/hornetq/core/persistence/StorageManager.java
branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
branches/ClebertTemporary/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
branches/ClebertTemporary/src/main/org/hornetq/core/replication/ReplicationManager.java
branches/ClebertTemporary/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
branches/ClebertTemporary/src/main/org/hornetq/core/server/cluster/impl/Redistributor.java
branches/ClebertTemporary/src/main/org/hornetq/core/server/group/impl/LocalGroupingHandler.java
branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
branches/ClebertTemporary/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java
branches/ClebertTemporary/tests/src/org/hornetq/tests/integration/replication/ReplicationOrderTest.java
branches/ClebertTemporary/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
Log:
Refactoring II - won't commit at this point (some stuff pending and I wanted to save
this checkpoint)
Modified: branches/ClebertTemporary/.classpath
===================================================================
--- branches/ClebertTemporary/.classpath 2009-11-17 21:40:56 UTC (rev 8302)
+++ branches/ClebertTemporary/.classpath 2009-11-18 02:52:31 UTC (rev 8303)
@@ -7,7 +7,7 @@
<classpathentry kind="src" path="tests/config"/>
<classpathentry excluding="**/.svn/**/*" kind="src"
path="tests/src">
<attributes>
- <attribute
name="org.eclipse.jdt.launching.CLASSPATH_ATTR_LIBRARY_PATH_ENTRY"
value="trunk/native/bin"/>
+ <attribute
name="org.eclipse.jdt.launching.CLASSPATH_ATTR_LIBRARY_PATH_ENTRY"
value="trunk-tmp/native/bin"/>
</attributes>
</classpathentry>
<classpathentry kind="src" path="tests/jms-tests/src"/>
Deleted:
branches/ClebertTemporary/src/main/org/hornetq/core/completion/CompletionContext.java
===================================================================
---
branches/ClebertTemporary/src/main/org/hornetq/core/completion/CompletionContext.java 2009-11-17
21:40:56 UTC (rev 8302)
+++
branches/ClebertTemporary/src/main/org/hornetq/core/completion/CompletionContext.java 2009-11-18
02:52:31 UTC (rev 8303)
@@ -1,51 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.completion;
-
-
-/**
- * This represents a set of operations done as part of replication.
- * When the entire set is done a group of Runnables can be executed.
- *
- * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
- *
- *
- */
-public interface CompletionContext
-{
- /** To be called by the replication manager, when new replication is added to the
queue */
- void linedUp();
-
- boolean hasData();
-
- /** To be called by the replication manager, when data is confirmed on the channel */
- void replicated();
-
- void afterCompletion(Runnable runnable);
-
- /** To be called when there are no more operations pending */
- void complete();
-
- /** Flush all pending callbacks on the Context */
- void flush();
-
- /** Replication may need some extra controls to guarantee ordering
- * when nothing is persisted through the contexts
- * @return The context is empty
- */
- boolean isEmpty();
-
- void setEmpty(boolean empty);
-
-}
Copied:
branches/ClebertTemporary/src/main/org/hornetq/core/completion/OperationContext.java (from
rev 8302,
branches/ClebertTemporary/src/main/org/hornetq/core/completion/CompletionContext.java)
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/completion/OperationContext.java
(rev 0)
+++
branches/ClebertTemporary/src/main/org/hornetq/core/completion/OperationContext.java 2009-11-18
02:52:31 UTC (rev 8303)
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.completion;
+
+import org.hornetq.core.journal.IOCompletion;
+
+
+/**
+ * This represents a set of operations done as part of replication.
+ * When the entire set is done a group of Runnables can be executed.
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public interface OperationContext extends IOCompletion
+{
+ /** To be called by the replication manager, when new replication is added to the
queue */
+ void linedUp();
+
+ boolean hasData();
+
+ void executeOnCompletion(IOCompletion runnable);
+
+ /** To be called when there are no more operations pending */
+ void complete();
+
+ /** Flush all pending callbacks on the Context */
+ void flush();
+
+ /** Replication may need some extra controls to guarantee ordering
+ * when nothing is persisted through the contexts
+ * @return The context is empty
+ */
+ boolean isEmpty();
+
+ void setEmpty(boolean empty);
+
+}
Added:
branches/ClebertTemporary/src/main/org/hornetq/core/completion/OperationExceptionCallback.java
===================================================================
---
branches/ClebertTemporary/src/main/org/hornetq/core/completion/OperationExceptionCallback.java
(rev 0)
+++
branches/ClebertTemporary/src/main/org/hornetq/core/completion/OperationExceptionCallback.java 2009-11-18
02:52:31 UTC (rev 8303)
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.completion;
+
+/**
+ * Callback that receives an error code and message through {@link #onError(int, String)}.
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public interface OperationExceptionCallback
+{
+ void onError(int errorCode, String errorMessage);
+}
Deleted:
branches/ClebertTemporary/src/main/org/hornetq/core/completion/impl/CompletionContextImpl.java
===================================================================
---
branches/ClebertTemporary/src/main/org/hornetq/core/completion/impl/CompletionContextImpl.java 2009-11-17
21:40:56 UTC (rev 8302)
+++
branches/ClebertTemporary/src/main/org/hornetq/core/completion/impl/CompletionContextImpl.java 2009-11-18
02:52:31 UTC (rev 8303)
@@ -1,139 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.completion.impl;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.hornetq.core.completion.CompletionContext;
-
-/**
- * A ReplicationToken
- *
- * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
- *
- * TODO: Maybe I should move this to persistence.journal. I need to check a few
dependencies first.
- *
- */
-public class CompletionContextImpl implements CompletionContext
-{
- private static final ThreadLocal<CompletionContext> tlReplicationContext = new
ThreadLocal<CompletionContext>();
-
- public static CompletionContext getContext()
- {
- CompletionContext token = tlReplicationContext.get();
- if (token == null)
- {
- token = new CompletionContextImpl();
- tlReplicationContext.set(token);
- }
- return token;
- }
-
- private List<Runnable> tasks;
-
- private int linedup = 0;
-
- private int replicated = 0;
-
- private boolean empty = false;
-
- private volatile boolean complete = false;
-
- /**
- * @param executor
- */
- public CompletionContextImpl()
- {
- super();
- }
-
- /** To be called by the replication manager, when new replication is added to the
queue */
- public void linedUp()
- {
- linedup++;
- }
-
- public boolean hasData()
- {
- return linedup > 0;
- }
-
- /** You may have several actions to be done after a replication operation is
completed. */
- public void afterCompletion(Runnable runnable)
- {
- if (complete)
- {
- // Sanity check, this shouldn't happen
- throw new IllegalStateException("The Replication Context is complete, and
no more tasks are accepted");
- }
-
- if (tasks == null)
- {
- // No need to use Concurrent, we only add from a single thread.
- // We don't add any more Runnables after it is complete
- tasks = new ArrayList<Runnable>();
- }
-
- tasks.add(runnable);
- }
-
- /** To be called by the replication manager, when data is confirmed on the channel */
- public synchronized void replicated()
- {
- if (++replicated == linedup && complete)
- {
- flush();
- }
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.replication.ReplicationToken#complete()
- */
- public synchronized void complete()
- {
- tlReplicationContext.set(null);
- complete = true;
- if (replicated == linedup && complete)
- {
- flush();
- }
- }
-
- public synchronized void flush()
- {
- if (tasks != null)
- {
- for (Runnable run : tasks)
- {
- run.run();
- }
- tasks.clear();
- }
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.replication.ReplicationContext#isRoundtrip()
- */
- public boolean isEmpty()
- {
- return empty;
- }
-
- public void setEmpty(final boolean sync)
- {
- this.empty = sync;
- }
-
-}
Copied:
branches/ClebertTemporary/src/main/org/hornetq/core/completion/impl/OperationContextImpl.java
(from rev 8302,
branches/ClebertTemporary/src/main/org/hornetq/core/completion/impl/CompletionContextImpl.java)
===================================================================
---
branches/ClebertTemporary/src/main/org/hornetq/core/completion/impl/OperationContextImpl.java
(rev 0)
+++
branches/ClebertTemporary/src/main/org/hornetq/core/completion/impl/OperationContextImpl.java 2009-11-18
02:52:31 UTC (rev 8303)
@@ -0,0 +1,155 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.completion.impl;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import org.hornetq.core.completion.OperationContext;
+import org.hornetq.core.journal.IOCompletion;
+
+/**
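+ * An {@link OperationContext} implementation; the current context is kept per thread, see {@link #getContext()}.
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>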
+ *
+ * TODO: Maybe I should move this to persistence.journal. I need to check a few
dependencies first.
+ *
+ */
+public class OperationContextImpl implements OperationContext
+{
+ private static final ThreadLocal<OperationContext> tlContext = new
ThreadLocal<OperationContext>();
+
+ public static OperationContext getContext()
+ {
+ OperationContext token = tlContext.get();
+ if (token == null)
+ {
+ token = new OperationContextImpl();
+ tlContext.set(token);
+ }
+ return token;
+ }
+
+ private List<IOCompletion> tasks;
+
+ private int linedup = 0;
+
+ private int replicated = 0;
+
+ private boolean empty = false;
+
+ private volatile boolean complete = false;
+
+ /**
+ * Creates a context with no pending operations and no registered completions.
+ */
+ public OperationContextImpl()
+ {
+ super();
+ }
+
+ /** To be called by the replication manager, when new replication is added to the
queue */
+ public void linedUp()
+ {
+ linedup++;
+ }
+
+ public boolean hasData()
+ {
+ return linedup > 0;
+ }
+
+ /** You may have several actions to be done after a replication operation is
completed. */
+ public void executeOnCompletion(IOCompletion completion)
+ {
+ if (complete)
+ {
+ // Sanity check, this shouldn't happen
+ throw new IllegalStateException("The Replication Context is complete, and
no more tasks are accepted");
+ }
+
+ if (tasks == null)
+ {
+ // No need for a concurrent collection: completions are only added from a single thread.
+ // No more completions are added once the context is complete
+ tasks = new LinkedList<IOCompletion>();
+ }
+
+ tasks.add(completion);
+ }
+
+ /** To be called by the replication manager, when data is confirmed on the channel */
+ public synchronized void done()
+ {
+ if (++replicated == linedup && complete)
+ {
+ flush();
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.completion.OperationContext#complete()
+ */
+ public synchronized void complete()
+ {
+ tlContext.set(null);
+ complete = true;
+ if (replicated == linedup && complete)
+ {
+ flush();
+ }
+ }
+
+ public synchronized void flush()
+ {
+ if (tasks != null)
+ {
+ for (IOCompletion run : tasks)
+ {
+ run.done();
+ }
+ tasks.clear();
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.completion.OperationContext#isEmpty()
+ */
+ public boolean isEmpty()
+ {
+ return empty;
+ }
+
+ public void setEmpty(final boolean sync)
+ {
+ this.empty = sync;
+ }
+
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.asyncio.AIOCallback#onError(int, java.lang.String)
+ */
+ public void onError(int errorCode, String errorMessage)
+ {
+ if (tasks != null)
+ {
+ for (IOCompletion run : tasks)
+ {
+ run.onError(errorCode, errorMessage);
+ }
+ }
+ }
+
+}
Modified: branches/ClebertTemporary/src/main/org/hornetq/core/journal/IOCompletion.java
===================================================================
---
branches/ClebertTemporary/src/main/org/hornetq/core/journal/IOCompletion.java 2009-11-17
21:40:56 UTC (rev 8302)
+++
branches/ClebertTemporary/src/main/org/hornetq/core/journal/IOCompletion.java 2009-11-18
02:52:31 UTC (rev 8303)
@@ -25,5 +25,4 @@
*/
public interface IOCompletion extends AIOCallback
{
- void waitCompletion() throws Exception;
}
Modified: branches/ClebertTemporary/src/main/org/hornetq/core/journal/Journal.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/journal/Journal.java 2009-11-17
21:40:56 UTC (rev 8302)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/journal/Journal.java 2009-11-18
02:52:31 UTC (rev 8303)
@@ -19,8 +19,10 @@
/**
*
- * A Journal
+ * Most methods on the journal provide a blocking version, where you select the sync mode,
and a non-blocking version, where you pass a completion callback as a parameter.
*
+ * Note that the callback methods also accept the sync mode;
that only makes sense for the NIO operations.
+ *
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
* @author <a href="mailto:clebert.suconic@jboss.com">Clebert
Suconic</a>
*
@@ -31,14 +33,24 @@
void appendAddRecord(long id, byte recordType, byte[] record, boolean sync) throws
Exception;
+ void appendAddRecord(long id, byte recordType, byte[] record, boolean sync,
IOCompletion completionCallback) throws Exception;
+
void appendAddRecord(long id, byte recordType, EncodingSupport record, boolean sync)
throws Exception;
+ void appendAddRecord(long id, byte recordType, EncodingSupport record, boolean sync,
IOCompletion completionCallback) throws Exception;
+
void appendUpdateRecord(long id, byte recordType, byte[] record, boolean sync) throws
Exception;
+ void appendUpdateRecord(long id, byte recordType, byte[] record, boolean sync,
IOCompletion completionCallback) throws Exception;
+
void appendUpdateRecord(long id, byte recordType, EncodingSupport record, boolean
sync) throws Exception;
+ void appendUpdateRecord(long id, byte recordType, EncodingSupport record, boolean
sync, IOCompletion completionCallback) throws Exception;
+
void appendDeleteRecord(long id, boolean sync) throws Exception;
+ void appendDeleteRecord(long id, boolean sync, IOCompletion completionCallback) throws
Exception;
+
// Transactional operations
void appendAddRecordTransactional(long txID, long id, byte recordType, byte[] record)
throws Exception;
@@ -57,6 +69,8 @@
void appendCommitRecord(long txID, boolean sync) throws Exception;
+ void appendCommitRecord(long txID, boolean sync, IOCompletion callback) throws
Exception;
+
/**
*
* <p>If the system crashed after a prepare was called, it should store
information that is required to bring the transaction
@@ -70,10 +84,16 @@
*/
void appendPrepareRecord(long txID, EncodingSupport transactionData, boolean sync)
throws Exception;
+ void appendPrepareRecord(long txID, EncodingSupport transactionData, boolean sync,
IOCompletion callback) throws Exception;
+
void appendPrepareRecord(long txID, byte[] transactionData, boolean sync) throws
Exception;
+ void appendPrepareRecord(long txID, byte[] transactionData, boolean sync, IOCompletion
callback) throws Exception;
+
void appendRollbackRecord(long txID, boolean sync) throws Exception;
+ void appendRollbackRecord(long txID, boolean sync, IOCompletion callback) throws
Exception;
+
// Load
JournalLoadInformation load(LoaderCallback reloadManager) throws Exception;
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java
===================================================================
---
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java 2009-11-17
21:40:56 UTC (rev 8302)
+++
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java 2009-11-18
02:52:31 UTC (rev 8303)
@@ -229,7 +229,7 @@
public int read(final ByteBuffer bytes) throws Exception
{
- IOCompletion waitCompletion = SimpleWaitIOCallback.getInstance();
+ SimpleWaitIOCallback waitCompletion = new SimpleWaitIOCallback();
int bytesRead = read(bytes, waitCompletion);
@@ -281,7 +281,7 @@
{
if (sync)
{
- IOCompletion completion = SimpleWaitIOCallback.getInstance();
+ SimpleWaitIOCallback completion = new SimpleWaitIOCallback();
writeDirect(bytes, true, completion);
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java
===================================================================
---
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java 2009-11-17
21:40:56 UTC (rev 8302)
+++
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java 2009-11-18
02:52:31 UTC (rev 8303)
@@ -178,7 +178,7 @@
{
if (sync)
{
- IOCompletion completion = SimpleWaitIOCallback.getInstance();
+ SimpleWaitIOCallback completion = new SimpleWaitIOCallback();
write(bytes, true, completion);
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/DummyCallback.java
===================================================================
---
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/DummyCallback.java 2009-11-17
21:40:56 UTC (rev 8302)
+++
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/DummyCallback.java 2009-11-18
02:52:31 UTC (rev 8303)
@@ -14,7 +14,6 @@
package org.hornetq.core.journal.impl;
-import org.hornetq.core.journal.IOCompletion;
import org.hornetq.core.logging.Logger;
/**
@@ -24,13 +23,13 @@
*
*
*/
-public class DummyCallback implements IOCompletion
+class DummyCallback extends SyncIOCompletion
{
private static DummyCallback instance = new DummyCallback();
private static final Logger log = Logger.getLogger(SimpleWaitIOCallback.class);
- public static IOCompletion getInstance()
+ public static DummyCallback getInstance()
{
return instance;
}
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
---
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2009-11-17
21:40:56 UTC (rev 8302)
+++
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2009-11-18
02:52:31 UTC (rev 8303)
@@ -845,15 +845,31 @@
appendAddRecord(id, recordType, new ByteArrayEncoding(record), sync);
}
+ public void appendAddRecord(final long id, final byte recordType, final byte[] record,
final boolean sync, final IOCompletion callback) throws Exception
+ {
+ appendAddRecord(id, recordType, new ByteArrayEncoding(record), sync, callback);
+ }
+
public void appendAddRecord(final long id, final byte recordType, final
EncodingSupport record, final boolean sync) throws Exception
{
+ SyncIOCompletion callback = getSyncCallback(sync);
+
+ appendAddRecord(id, recordType, record, sync, callback);
+
+ // We only wait on explicit callbacks
+ if (callback != null)
+ {
+ callback.waitCompletion();
+ }
+ }
+
+ public void appendAddRecord(final long id, final byte recordType, final
EncodingSupport record, final boolean sync, final IOCompletion callback) throws Exception
+ {
if (state != STATE_LOADED)
{
throw new IllegalStateException("Journal must be loaded first");
}
- IOCompletion callback = null;
-
compactingLock.readLock().lock();
try
@@ -864,8 +880,6 @@
writeAddRecord(-1, id, recordType, record, size, bb); // fileID will be filled
later
- callback = getSyncCallback(sync);
-
lockAppend.lock();
try
{
@@ -882,11 +896,6 @@
{
compactingLock.readLock().unlock();
}
-
- if (callback != null)
- {
- callback.waitCompletion();
- }
}
public void appendUpdateRecord(final long id, final byte recordType, final byte[]
record, final boolean sync) throws Exception
@@ -894,15 +903,31 @@
appendUpdateRecord(id, recordType, new ByteArrayEncoding(record), sync);
}
+ public void appendUpdateRecord(final long id, final byte recordType, final byte[]
record, final boolean sync, final IOCompletion callback) throws Exception
+ {
+ appendUpdateRecord(id, recordType, new ByteArrayEncoding(record), sync, callback);
+ }
+
public void appendUpdateRecord(final long id, final byte recordType, final
EncodingSupport record, final boolean sync) throws Exception
{
+ SyncIOCompletion callback = getSyncCallback(sync);
+
+ appendUpdateRecord(id, recordType, record, sync, callback);
+
+ // We only wait on explicit callbacks
+ if (callback != null)
+ {
+ callback.waitCompletion();
+ }
+ }
+
+ public void appendUpdateRecord(final long id, final byte recordType, final
EncodingSupport record, final boolean sync, final IOCompletion callback) throws Exception
+ {
if (state != STATE_LOADED)
{
throw new IllegalStateException("Journal must be loaded first");
}
- IOCompletion callback = null;
-
compactingLock.readLock().lock();
try
@@ -924,8 +949,6 @@
writeUpdateRecord(-1, id, recordType, record, size, bb);
- callback = getSyncCallback(sync);
-
lockAppend.lock();
try
{
@@ -951,14 +974,23 @@
{
compactingLock.readLock().unlock();
}
+ }
+
+ public void appendDeleteRecord(final long id, final boolean sync) throws Exception
+ {
+ SyncIOCompletion callback = getSyncCallback(sync);
+
+ appendDeleteRecord(id, sync, callback);
+
+ // We only wait on explicit callbacks
if (callback != null)
{
callback.waitCompletion();
}
}
-
- public void appendDeleteRecord(final long id, final boolean sync) throws Exception
+
+ public void appendDeleteRecord(final long id, final boolean sync, final IOCompletion
callback) throws Exception
{
if (state != STATE_LOADED)
{
@@ -967,8 +999,6 @@
compactingLock.readLock().lock();
- IOCompletion callback = null;
-
try
{
@@ -988,8 +1018,6 @@
writeDeleteRecord(-1, id, size, bb);
- callback = getSyncCallback(sync);
-
lockAppend.lock();
try
{
@@ -1016,11 +1044,6 @@
{
compactingLock.readLock().unlock();
}
-
- if (callback != null)
- {
- callback.waitCompletion();
- }
}
public void appendAddRecordTransactional(final long txID, final long id, final byte
recordType, final byte[] record) throws Exception
@@ -1165,6 +1188,12 @@
{
appendDeleteRecordTransactional(txID, id, NullEncoding.instance);
}
+
+
+ public void appendPrepareRecord(long txID, byte[] transactionData, boolean sync,
IOCompletion completion) throws Exception
+ {
+ appendPrepareRecord(txID, new ByteArrayEncoding(transactionData), sync,
completion);
+ }
/* (non-Javadoc)
* @see org.hornetq.core.journal.Journal#appendPrepareRecord(long, byte[], boolean)
@@ -1174,6 +1203,18 @@
appendPrepareRecord(txID, new ByteArrayEncoding(transactionData), sync);
}
+ public void appendPrepareRecord(final long txID, final EncodingSupport
transactionData, final boolean sync) throws Exception
+ {
+ SyncIOCompletion syncCompletion = getSyncCallback(sync);
+
+ appendPrepareRecord(txID, transactionData, sync, syncCompletion);
+
+ if (syncCompletion != null)
+ {
+ syncCompletion.waitCompletion();
+ }
+ }
+
/**
*
* <p>If the system crashed after a prepare was called, it should store
information that is required to bring the transaction
@@ -1187,7 +1228,7 @@
* @param transactionData - extra user data for the prepare
* @throws Exception
*/
- public void appendPrepareRecord(final long txID, final EncodingSupport
transactionData, final boolean sync) throws Exception
+ public void appendPrepareRecord(final long txID, final EncodingSupport
transactionData, final boolean sync, IOCompletion completion) throws Exception
{
if (state != STATE_LOADED)
{
@@ -1198,11 +1239,6 @@
JournalTransaction tx = getTransactionInfo(txID);
- if (sync)
- {
- tx.syncPreviousFiles(fileFactory.isSupportsCallbacks(), currentFile);
- }
-
try
{
@@ -1214,7 +1250,7 @@
lockAppend.lock();
try
{
- JournalFile usedFile = appendRecord(bb, true, sync, tx, null);
+ JournalFile usedFile = appendRecord(bb, true, sync, tx, completion);
tx.prepare(usedFile);
}
@@ -1228,11 +1264,23 @@
{
compactingLock.readLock().unlock();
}
-
- // We should wait this outside of the lock, to increase throughput
- tx.waitCompletion();
}
+
+
+
+ public void appendCommitRecord(final long txID, final boolean sync) throws Exception
+ {
+ SyncIOCompletion syncCompletion = getSyncCallback(sync);
+
+ appendCommitRecord(txID, sync, syncCompletion);
+
+ if (syncCompletion != null)
+ {
+ syncCompletion.waitCompletion();
+ }
+ }
+
/**
* <p>A transaction record (Commit or Prepare), will hold the number of elements
the transaction has on each file.</p>
* <p>For example, a transaction was spread along 3 journal files with 10
pendingTransactions on each file.
@@ -1250,7 +1298,9 @@
*
* @see JournalImpl#writeTransaction(byte, long,
org.hornetq.core.journal.impl.JournalImpl.JournalTransaction, EncodingSupport)
*/
- public void appendCommitRecord(final long txID, final boolean sync) throws Exception
+
+
+ public void appendCommitRecord(final long txID, final boolean sync, final IOCompletion
callback) throws Exception
{
if (state != STATE_LOADED)
{
@@ -1282,7 +1332,7 @@
lockAppend.lock();
try
{
- JournalFile usedFile = appendRecord(bb, true, sync, tx, null);
+ JournalFile usedFile = appendRecord(bb, true, sync, tx, callback);
tx.commit(usedFile);
}
@@ -1296,15 +1346,23 @@
{
compactingLock.readLock().unlock();
}
+ }
- if (sync)
+
+ public void appendRollbackRecord(final long txID, final boolean sync) throws
Exception
+ {
+ SyncIOCompletion syncCompletion = getSyncCallback(sync);
+
+ appendRollbackRecord(txID, sync, syncCompletion);
+
+ if (syncCompletion != null)
{
- // We should wait this outside of the lock, to increase throuput
- tx.waitCompletion();
+ syncCompletion.waitCompletion();
}
+
}
-
- public void appendRollbackRecord(final long txID, final boolean sync) throws
Exception
+
+ public void appendRollbackRecord(final long txID, final boolean sync, final
IOCompletion completion) throws Exception
{
if (state != STATE_LOADED)
{
@@ -1331,7 +1389,7 @@
lockAppend.lock();
try
{
- JournalFile usedFile = appendRecord(bb, false, sync, tx, null);
+ JournalFile usedFile = appendRecord(bb, false, sync, tx, completion);
tx.rollback(usedFile);
}
@@ -1345,14 +1403,6 @@
{
compactingLock.readLock().unlock();
}
-
- // We should wait this outside of the lock, to increase throuput
-
- if (sync)
- {
- tx.waitCompletion();
- }
-
}
public int getAlignment() throws Exception
@@ -2833,7 +2883,7 @@
final boolean completeTransaction,
final boolean sync,
final JournalTransaction tx,
- IOCompletion callback) throws Exception
+ final IOCompletion parameterCallback) throws
Exception
{
try
{
@@ -2841,6 +2891,8 @@
{
throw new IllegalStateException("The journal is not loaded " +
state);
}
+
+ final IOCompletion callback;
int size = bb.capacity();
@@ -2874,25 +2926,26 @@
if (tx != null)
{
- if (callback != null)
- {
- // sanity check, it should not happen.
- throw new IllegalArgumentException("Invalid callback parameter. Use
of tx is mutually exclusive with the callback");
- }
-
// The callback of a transaction has to be taken inside the lock,
// when we guarantee the currentFile will not be changed,
// since we individualize the callback per file
if (fileFactory.isSupportsCallbacks())
{
- callback = tx.getCallback(currentFile);
+ // Set the delegated callback as a parameter
+ TransactionCallback txcallback = tx.getCallback(currentFile);
+ txcallback.setDelegateCompletion(parameterCallback);
+ callback = txcallback;
}
+ else
+ {
+ callback = null;
+ }
if (sync)
{
- // 99 % of the times this will be already synced, as previous files should
be closed already.
- // This is to have 100% guarantee the transaction will be persisted and no
loss of information would
- // happen
+ // In an edge case the transaction could still have pending data in previous files.
+ // This should not cause any blocking issues; it is here to guarantee that
+ // all of the transaction's data is on the disk
tx.syncPreviousFiles(fileFactory.isSupportsCallbacks(), currentFile);
}
@@ -2903,6 +2956,10 @@
tx.fillNumberOfRecords(currentFile, bb);
}
}
+ else
+ {
+ callback = parameterCallback;
+ }
// Adding fileID
bb.writerIndex(DataConstants.SIZE_BYTE);
@@ -3233,13 +3290,13 @@
return tx;
}
- private IOCompletion getSyncCallback(final boolean sync)
+ private SyncIOCompletion getSyncCallback(final boolean sync)
{
if (fileFactory.isSupportsCallbacks())
{
if (sync)
{
- return SimpleWaitIOCallback.getInstance();
+ return new SimpleWaitIOCallback();
}
else
{
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/SimpleWaitIOCallback.java
===================================================================
---
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/SimpleWaitIOCallback.java 2009-11-17
21:40:56 UTC (rev 8302)
+++
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/SimpleWaitIOCallback.java 2009-11-18
02:52:31 UTC (rev 8303)
@@ -14,9 +14,9 @@
package org.hornetq.core.journal.impl;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import org.hornetq.core.exception.HornetQException;
-import org.hornetq.core.journal.IOCompletion;
import org.hornetq.core.logging.Logger;
/**
@@ -26,7 +26,7 @@
*
*
*/
-public class SimpleWaitIOCallback implements IOCompletion
+public class SimpleWaitIOCallback extends SyncIOCompletion
{
private static final Logger log = Logger.getLogger(SimpleWaitIOCallback.class);
@@ -37,12 +37,6 @@
private volatile int errorCode = 0;
- public static IOCompletion getInstance()
- {
- return new SimpleWaitIOCallback();
- }
-
-
public void done()
{
latch.countDown();
@@ -68,4 +62,9 @@
}
return;
}
+
+ public boolean waitCompletion(final long timeout) throws Exception
+ {
+ return latch.await(timeout, TimeUnit.MILLISECONDS);
+ }
}
Added:
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/SyncIOCompletion.java
===================================================================
---
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/SyncIOCompletion.java
(rev 0)
+++
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/SyncIOCompletion.java 2009-11-18
02:52:31 UTC (rev 8303)
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.journal.impl;
+
+import org.hornetq.core.journal.IOCompletion;
+
+/**
+ * Internal class used to manage explicit syncs on the Journal through callbacks.
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public abstract class SyncIOCompletion implements IOCompletion
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public abstract void waitCompletion() throws Exception;
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/TransactionCallback.java
===================================================================
---
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/TransactionCallback.java 2009-11-17
21:40:56 UTC (rev 8302)
+++
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/TransactionCallback.java 2009-11-18
02:52:31 UTC (rev 8303)
@@ -31,15 +31,27 @@
private volatile String errorMessage = null;
private volatile int errorCode = 0;
+
+ private volatile int up = 0;
+
+ private volatile int done = 0;
+
+ private volatile IOCompletion delegateCompletion;
public void countUp()
{
+ up++;
countLatch.up();
}
public void done()
{
countLatch.down();
+ if (++done == up && delegateCompletion != null)
+ {
+ delegateCompletion.done();
+ delegateCompletion = null;
+ }
}
public void waitCompletion() throws InterruptedException
@@ -59,9 +71,30 @@
this.errorCode = errorCode;
countLatch.down();
+
+ if (delegateCompletion != null)
+ {
+ delegateCompletion.onError(errorCode, errorMessage);
+ }
}
/**
+ * @return the delegateCompletion
+ */
+ public IOCompletion getDelegateCompletion()
+ {
+ return delegateCompletion;
+ }
+
+ /**
+ * @param delegateCompletion the delegateCompletion to set
+ */
+ public void setDelegateCompletion(IOCompletion delegateCompletion)
+ {
+ this.delegateCompletion = delegateCompletion;
+ }
+
+ /**
* @return the errorMessage
*/
public String getErrorMessage()
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
---
branches/ClebertTemporary/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2009-11-17
21:40:56 UTC (rev 8302)
+++
branches/ClebertTemporary/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2009-11-18
02:52:31 UTC (rev 8303)
@@ -997,8 +997,7 @@
depageTransaction.commit();
- // StorageManager does the check: if (replicated) -> do the proper cleanup
already
- storageManager.completeReplication();
+ storageManager.completeOperations();
if (isTrace)
{
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/persistence/StorageManager.java
===================================================================
---
branches/ClebertTemporary/src/main/org/hornetq/core/persistence/StorageManager.java 2009-11-17
21:40:56 UTC (rev 8302)
+++
branches/ClebertTemporary/src/main/org/hornetq/core/persistence/StorageManager.java 2009-11-18
02:52:31 UTC (rev 8303)
@@ -18,6 +18,7 @@
import javax.transaction.xa.Xid;
+import org.hornetq.core.journal.IOCompletion;
import org.hornetq.core.journal.JournalLoadInformation;
import org.hornetq.core.paging.PageTransactionInfo;
import org.hornetq.core.paging.PagedMessage;
@@ -56,13 +57,14 @@
boolean isReplicated();
- void afterCompletion(Runnable run);
+ void afterCompleteOperations(IOCompletion run);
/** Block until the replication is done.
* @throws Exception */
- void waitOnReplication(long timeout) throws Exception;
+ void waitOnOperations(long timeout) throws Exception;
- void completeReplication();
+ /** Closes the OperationContext. */
+ void completeOperations();
UUID getPersistentID();
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
---
branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-11-17
21:40:56 UTC (rev 8302)
+++
branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-11-18
02:52:31 UTC (rev 8303)
@@ -33,11 +33,12 @@
import javax.transaction.xa.Xid;
import org.hornetq.core.buffers.ChannelBuffers;
-import org.hornetq.core.completion.impl.CompletionContextImpl;
+import org.hornetq.core.completion.impl.OperationContextImpl;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.filter.Filter;
import org.hornetq.core.journal.EncodingSupport;
+import org.hornetq.core.journal.IOCompletion;
import org.hornetq.core.journal.Journal;
import org.hornetq.core.journal.JournalLoadInformation;
import org.hornetq.core.journal.PreparedTransactionInfo;
@@ -48,6 +49,7 @@
import org.hornetq.core.journal.impl.AIOSequentialFileFactory;
import org.hornetq.core.journal.impl.JournalImpl;
import org.hornetq.core.journal.impl.NIOSequentialFileFactory;
+import org.hornetq.core.journal.impl.SimpleWaitIOCallback;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.message.impl.MessageImpl;
import org.hornetq.core.paging.PageTransactionInfo;
@@ -292,7 +294,7 @@
/* (non-Javadoc)
* @see org.hornetq.core.persistence.StorageManager#completeReplication()
*/
- public void completeReplication()
+ public void completeOperations()
{
if (replicator != null)
{
@@ -308,19 +310,12 @@
/* (non-Javadoc)
* @see org.hornetq.core.persistence.StorageManager#blockOnReplication()
*/
- public void waitOnReplication(final long timeout) throws Exception
+ public void waitOnOperations(final long timeout) throws Exception
{
- final CountDownLatch latch = new CountDownLatch(1);
- afterCompletion(new Runnable()
+ SimpleWaitIOCallback waitCallback = new SimpleWaitIOCallback();
+ afterCompleteOperations(waitCallback);
+ if (!waitCallback.waitCompletion(timeout))
{
- public void run()
- {
- latch.countDown();
- }
- });
- completeReplication();
- if (!latch.await(timeout, TimeUnit.MILLISECONDS))
- {
throw new IllegalStateException("no response received from
replication");
}
}
@@ -364,9 +359,9 @@
// TODO: shouldn't those page methods be on the PageManager? ^^^^
- public void afterCompletion(Runnable run)
+ public void afterCompleteOperations(IOCompletion run)
{
- CompletionContextImpl.getContext().afterCompletion(run);
+ OperationContextImpl.getContext().executeOnCompletion(run);
}
public UUID getPersistentID()
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
---
branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2009-11-17
21:40:56 UTC (rev 8302)
+++
branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2009-11-18
02:52:31 UTC (rev 8303)
@@ -20,6 +20,7 @@
import javax.transaction.xa.Xid;
import org.hornetq.core.buffers.ChannelBuffers;
+import org.hornetq.core.journal.IOCompletion;
import org.hornetq.core.journal.JournalLoadInformation;
import org.hornetq.core.paging.PageTransactionInfo;
import org.hornetq.core.paging.PagedMessage;
@@ -282,14 +283,6 @@
}
/* (non-Javadoc)
- * @see
org.hornetq.core.persistence.StorageManager#afterReplicated(java.lang.Runnable)
- */
- public void afterCompletion(Runnable run)
- {
- run.run();
- }
-
- /* (non-Javadoc)
* @see org.hornetq.core.persistence.StorageManager#isReplicated()
*/
public boolean isReplicated()
@@ -300,7 +293,7 @@
/* (non-Javadoc)
* @see org.hornetq.core.persistence.StorageManager#completeReplication()
*/
- public void completeReplication()
+ public void completeOperations()
{
}
@@ -336,7 +329,7 @@
/* (non-Javadoc)
* @see org.hornetq.core.persistence.StorageManager#blockOnReplication(long)
*/
- public void waitOnReplication(long timeout) throws Exception
+ public void waitOnOperations(long timeout) throws Exception
{
}
@@ -348,4 +341,12 @@
throw new IllegalStateException("Null Persistence should never be used as
replicated");
}
+ /* (non-Javadoc)
+ * @see
org.hornetq.core.persistence.StorageManager#afterCompleteOperations(org.hornetq.core.journal.IOCompletion)
+ */
+ public void afterCompleteOperations(IOCompletion run)
+ {
+ run.done();
+ }
+
}
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
---
branches/ClebertTemporary/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-11-17
21:40:56 UTC (rev 8302)
+++
branches/ClebertTemporary/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-11-18
02:52:31 UTC (rev 8303)
@@ -924,20 +924,13 @@
}
else
{
- if (storageManager.isReplicated())
+ storageManager.afterCompleteOperations(new Runnable()
{
- storageManager.afterCompletion(new Runnable()
+ public void run()
{
- public void run()
- {
- addReferences(refs);
- }
- });
- }
- else
- {
- addReferences(refs);
- }
+ addReferences(refs);
+ }
+ });
}
}
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/replication/ReplicationManager.java
===================================================================
---
branches/ClebertTemporary/src/main/org/hornetq/core/replication/ReplicationManager.java 2009-11-17
21:40:56 UTC (rev 8302)
+++
branches/ClebertTemporary/src/main/org/hornetq/core/replication/ReplicationManager.java 2009-11-18
02:52:31 UTC (rev 8303)
@@ -15,7 +15,7 @@
import java.util.Set;
-import org.hornetq.core.completion.CompletionContext;
+import org.hornetq.core.completion.OperationContext;
import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.journal.EncodingSupport;
import org.hornetq.core.journal.JournalLoadInformation;
@@ -53,7 +53,7 @@
void closeContext();
/** A list of tokens that are still waiting for replications to be completed */
- Set<CompletionContext> getActiveTokens();
+ Set<OperationContext> getActiveTokens();
/**
* @param storeName
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
---
branches/ClebertTemporary/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2009-11-17
21:40:56 UTC (rev 8302)
+++
branches/ClebertTemporary/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2009-11-18
02:52:31 UTC (rev 8303)
@@ -22,8 +22,8 @@
import org.hornetq.core.client.SessionFailureListener;
import org.hornetq.core.client.impl.FailoverManager;
-import org.hornetq.core.completion.CompletionContext;
-import org.hornetq.core.completion.impl.CompletionContextImpl;
+import org.hornetq.core.completion.OperationContext;
+import org.hornetq.core.completion.impl.OperationContextImpl;
import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.journal.EncodingSupport;
import org.hornetq.core.journal.JournalLoadInformation;
@@ -82,7 +82,7 @@
private final Object replicationLock = new Object();
- private final Queue<CompletionContext> pendingTokens = new
ConcurrentLinkedQueue<CompletionContext>();
+ private final Queue<OperationContext> pendingTokens = new
ConcurrentLinkedQueue<OperationContext>();
// Static --------------------------------------------------------
@@ -359,17 +359,17 @@
{
enabled = false;
- LinkedHashSet<CompletionContext> activeContexts = new
LinkedHashSet<CompletionContext>();
+ LinkedHashSet<OperationContext> activeContexts = new
LinkedHashSet<OperationContext>();
// The same context will be replicated on the pending tokens...
// as the multiple operations will be replicated on the same context
while (!pendingTokens.isEmpty())
{
- CompletionContext ctx = pendingTokens.poll();
+ OperationContext ctx = pendingTokens.poll();
activeContexts.add(ctx);
}
- for (CompletionContext ctx : activeContexts)
+ for (OperationContext ctx : activeContexts)
{
ctx.complete();
ctx.flush();
@@ -397,7 +397,7 @@
*/
public void closeContext()
{
- final CompletionContext token = getContext();
+ final OperationContext token = getContext();
if (token != null)
{
@@ -414,15 +414,15 @@
/* method for testcases only
* @see org.hornetq.core.replication.ReplicationManager#getPendingTokens()
*/
- public Set<CompletionContext> getActiveTokens()
+ public Set<OperationContext> getActiveTokens()
{
- LinkedHashSet<CompletionContext> activeContexts = new
LinkedHashSet<CompletionContext>();
+ LinkedHashSet<OperationContext> activeContexts = new
LinkedHashSet<OperationContext>();
// The same context will be replicated on the pending tokens...
// as the multiple operations will be replicated on the same context
- for (CompletionContext ctx : pendingTokens)
+ for (OperationContext ctx : pendingTokens)
{
activeContexts.add(ctx);
}
@@ -435,7 +435,7 @@
{
boolean runItNow = false;
- CompletionContext repliToken = getContext();
+ OperationContext repliToken = getContext();
repliToken.linedUp();
synchronized (replicationLock)
@@ -458,7 +458,7 @@
if (runItNow)
{
- repliToken.replicated();
+ repliToken.done();
}
}
@@ -472,11 +472,11 @@
private void replicated()
{
- List<CompletionContext> tokensToExecute = getTokens();
+ List<OperationContext> tokensToExecute = getTokens();
- for (CompletionContext ctx : tokensToExecute)
+ for (OperationContext ctx : tokensToExecute)
{
- ctx.replicated();
+ ctx.done();
}
}
@@ -486,7 +486,7 @@
// Private -------------------------------------------------------
- private void sync(CompletionContext context)
+ private void sync(OperationContext context)
{
boolean executeNow = false;
synchronized (replicationLock)
@@ -507,14 +507,14 @@
}
if (executeNow)
{
- context.replicated();
+ context.done();
}
}
- public CompletionContext getContext()
+ public OperationContext getContext()
{
- return CompletionContextImpl.getContext();
+ return OperationContextImpl.getContext();
}
/**
@@ -523,11 +523,11 @@
* At last, if the list is empty, it will verify if there are any future tokens that
are sync tokens, to avoid a case where no more replication is done due to inactivity.
* @return
*/
- private List<CompletionContext> getTokens()
+ private List<OperationContext> getTokens()
{
- List<CompletionContext> retList = new LinkedList<CompletionContext>();
+ List<OperationContext> retList = new LinkedList<OperationContext>();
- CompletionContext tokenPolled = null;
+ OperationContext tokenPolled = null;
// First will get all the non replicated tokens up to the first one that is not
replicated
do
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/server/cluster/impl/Redistributor.java
===================================================================
---
branches/ClebertTemporary/src/main/org/hornetq/core/server/cluster/impl/Redistributor.java 2009-11-17
21:40:56 UTC (rev 8302)
+++
branches/ClebertTemporary/src/main/org/hornetq/core/server/cluster/impl/Redistributor.java 2009-11-18
02:52:31 UTC (rev 8303)
@@ -144,21 +144,14 @@
tx.commit();
- if (storageManager.isReplicated())
+ storageManager.afterCompleteOperations(new Runnable()
{
- storageManager.afterCompletion(new Runnable()
+ public void run()
{
- public void run()
- {
- execPrompter();
- }
- });
- storageManager.completeReplication();
- }
- else
- {
- execPrompter();
- }
+ execPrompter();
+ }
+ });
+ storageManager.completeOperations();
}
private void execPrompter()
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/server/group/impl/LocalGroupingHandler.java
===================================================================
---
branches/ClebertTemporary/src/main/org/hornetq/core/server/group/impl/LocalGroupingHandler.java 2009-11-17
21:40:56 UTC (rev 8302)
+++
branches/ClebertTemporary/src/main/org/hornetq/core/server/group/impl/LocalGroupingHandler.java 2009-11-18
02:52:31 UTC (rev 8303)
@@ -87,10 +87,7 @@
}
newList.add(groupBinding);
storageManager.addGrouping(groupBinding);
- if (storageManager.isReplicated())
- {
- storageManager.waitOnReplication(timeout);
- }
+ storageManager.waitOnOperations(timeout);
return new Response(groupBinding.getGroupId(), groupBinding.getClusterName());
}
else
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
---
branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-11-17
21:40:56 UTC (rev 8302)
+++
branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-11-18
02:52:31 UTC (rev 8303)
@@ -1718,23 +1718,16 @@
final boolean flush,
final boolean closeChannel)
{
- if (storageManager.isReplicated())
+ storageManager.afterCompleteOperations(new Runnable()
{
- storageManager.afterCompletion(new Runnable()
+ public void run()
{
- public void run()
- {
- doSendResponse(confirmPacket, response, flush, closeChannel);
- }
+ doSendResponse(confirmPacket, response, flush, closeChannel);
+ }
- });
-
- storageManager.completeReplication();
- }
- else
- {
- doSendResponse(confirmPacket, response, flush, closeChannel);
- }
+ });
+
+ storageManager.completeOperations();
}
/**
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java
===================================================================
---
branches/ClebertTemporary/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java 2009-11-17
21:40:56 UTC (rev 8302)
+++
branches/ClebertTemporary/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java 2009-11-18
02:52:31 UTC (rev 8303)
@@ -98,7 +98,7 @@
{
this.containsPersistent = true;
}
-
+
public long getID()
{
return id;
@@ -209,53 +209,35 @@
}
}
- Runnable execAfterCommit = null;
-
- if (operations != null)
- {
- execAfterCommit = new Runnable()
- {
- public void run()
- {
- for (TransactionOperation operation : operations)
- {
- try
- {
- operation.afterCommit(TransactionImpl.this);
- }
- catch (Exception e)
- {
- //
https://jira.jboss.org/jira/browse/HORNETQ-188
- // After commit shouldn't throw an exception
- log.warn(e.getMessage(), e);
- }
- }
- }
- };
- }
-
if (containsPersistent || (xid != null && state == State.PREPARED))
{
storageManager.commit(id);
state = State.COMMITTED;
+ }
- if (execAfterCommit != null)
+ // We use the callback even when nothing is persisted.
+ // With non-persistent data and replication, the replication manager still has
+ // to execute this runnable in the correct order
+ storageManager.afterCompleteOperations(new Runnable()
+ {
+ public void run()
{
- if (storageManager.isReplicated())
+ for (TransactionOperation operation : operations)
{
- storageManager.afterCompletion(execAfterCommit);
+ try
+ {
+ operation.afterCommit(TransactionImpl.this);
+ }
+ catch (Exception e)
+ {
+ //
https://jira.jboss.org/jira/browse/HORNETQ-188
+ // After commit shouldn't throw an exception
+ log.warn(e.getMessage(), e);
+ }
}
- else
- {
- execAfterCommit.run();
- }
}
- }
- else if (execAfterCommit != null)
- {
- execAfterCommit.run();
- }
+ });
}
}
Modified:
branches/ClebertTemporary/tests/src/org/hornetq/tests/integration/replication/ReplicationOrderTest.java
===================================================================
---
branches/ClebertTemporary/tests/src/org/hornetq/tests/integration/replication/ReplicationOrderTest.java 2009-11-17
21:40:56 UTC (rev 8302)
+++
branches/ClebertTemporary/tests/src/org/hornetq/tests/integration/replication/ReplicationOrderTest.java 2009-11-18
02:52:31 UTC (rev 8303)
@@ -92,7 +92,7 @@
}
session.createQueue(address, queue, true);
ClientProducer producer = session.createProducer(address);
- boolean durable = true;
+ boolean durable = false;
for (int i = 0; i < NUM; i++)
{
ClientMessage msg = session.createClientMessage(durable);
Modified:
branches/ClebertTemporary/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
---
branches/ClebertTemporary/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-11-17
21:40:56 UTC (rev 8302)
+++
branches/ClebertTemporary/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-11-18
02:52:31 UTC (rev 8303)
@@ -32,7 +32,7 @@
import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
import org.hornetq.core.client.impl.FailoverManager;
import org.hornetq.core.client.impl.FailoverManagerImpl;
-import org.hornetq.core.completion.impl.CompletionContextImpl;
+import org.hornetq.core.completion.impl.OperationContextImpl;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.TransportConfiguration;
import org.hornetq.core.config.impl.ConfigurationImpl;
@@ -375,7 +375,7 @@
}
final CountDownLatch latch = new CountDownLatch(1);
- CompletionContextImpl.getContext().afterCompletion(new Runnable()
+ OperationContextImpl.getContext().executeOnCompletion(new Runnable()
{
public void run()
{
@@ -402,7 +402,7 @@
private void blockOnReplication(ReplicationManagerImpl manager) throws Exception
{
final CountDownLatch latch = new CountDownLatch(1);
- CompletionContextImpl.getContext().afterCompletion(new Runnable()
+ OperationContextImpl.getContext().executeOnCompletion(new Runnable()
{
public void run()
@@ -458,7 +458,7 @@
replicatedJournal.appendPrepareRecord(1, new FakeData(), false);
final CountDownLatch latch = new CountDownLatch(1);
- CompletionContextImpl.getContext().afterCompletion(new Runnable()
+ OperationContextImpl.getContext().executeOnCompletion(new Runnable()
{
public void run()
@@ -468,22 +468,10 @@
});
- assertEquals(1, manager.getActiveTokens().size());
-
manager.closeContext();
assertTrue(latch.await(1, TimeUnit.SECONDS));
- for (int i = 0; i < 100; i++)
- {
- // This is asynchronous. Have to wait completion
- if (manager.getActiveTokens().size() == 0)
- {
- break;
- }
- Thread.sleep(1);
- }
-
assertEquals(0, manager.getActiveTokens().size());
manager.stop();
}
@@ -530,7 +518,7 @@
}
- CompletionContextImpl.getContext().afterCompletion(new Runnable()
+ OperationContextImpl.getContext().executeOnCompletion(new Runnable()
{
public void run()
@@ -552,17 +540,6 @@
assertEquals(i, executions.get(i).intValue());
}
- for (int i = 0; i < 100; i++)
- {
- // This is asynchronous. Have to wait completion
- if (manager.getActiveTokens().size() == 0)
- {
- break;
- }
- Thread.sleep(1);
- }
-
-
assertEquals(0, manager.getActiveTokens().size());
manager.stop();
}
Modified:
branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
---
branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2009-11-17
21:40:56 UTC (rev 8302)
+++
branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2009-11-18
02:52:31 UTC (rev 8303)
@@ -1155,7 +1155,7 @@
/* (non-Javadoc)
* @see
org.hornetq.core.persistence.StorageManager#afterReplicated(java.lang.Runnable)
*/
- public void afterCompletion(Runnable run)
+ public void afterCompleteOperations(Runnable run)
{
}
@@ -1163,7 +1163,7 @@
/* (non-Javadoc)
* @see org.hornetq.core.persistence.StorageManager#completeReplication()
*/
- public void completeReplication()
+ public void completeOperations()
{
}
@@ -1221,7 +1221,7 @@
/* (non-Javadoc)
* @see org.hornetq.core.persistence.StorageManager#blockOnReplication(long)
*/
- public void waitOnReplication(long timeout) throws Exception
+ public void waitOnOperations(long timeout) throws Exception
{
}