[hornetq-commits] JBoss hornetq SVN: r8366 - in branches/ClebertCallback: src/main/org/hornetq/core/asyncio/impl and 11 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Sat Nov 21 18:29:51 EST 2009
Author: clebert.suconic at jboss.com
Date: 2009-11-21 18:29:50 -0500 (Sat, 21 Nov 2009)
New Revision: 8366
Removed:
branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/journal/SyncOperation.java
branches/ClebertCallback/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationSyncContextMessage.java
Modified:
branches/ClebertCallback/src/main/org/hornetq/core/asyncio/AsynchronousFile.java
branches/ClebertCallback/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java
branches/ClebertCallback/src/main/org/hornetq/core/journal/Journal.java
branches/ClebertCallback/src/main/org/hornetq/core/journal/SequentialFile.java
branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java
branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java
branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/JournalImpl.java
branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java
branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/TimedBuffer.java
branches/ClebertCallback/src/main/org/hornetq/core/persistence/StorageManager.java
branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/ClebertCallback/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java
branches/ClebertCallback/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java
branches/ClebertCallback/src/main/org/hornetq/core/replication/ReplicationManager.java
branches/ClebertCallback/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java
branches/ClebertCallback/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
branches/ClebertCallback/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/asyncio/AIOTestBase.java
branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/asyncio/AsynchronousFileTest.java
branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/journal/impl/SequentialFileFactoryTestBase.java
branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
Log:
Removing sync operations on Replication and journal
Modified: branches/ClebertCallback/src/main/org/hornetq/core/asyncio/AsynchronousFile.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/asyncio/AsynchronousFile.java 2009-11-21 23:08:15 UTC (rev 8365)
+++ branches/ClebertCallback/src/main/org/hornetq/core/asyncio/AsynchronousFile.java 2009-11-21 23:29:50 UTC (rev 8366)
@@ -41,20 +41,13 @@
* */
long size() throws HornetQException;
- /** Some operations may need to be done only after persitency is done.
- * for instance, when a messaging system needs to guarantee ordering over non-persistent data,
- * it needs to make sure it will only deliver the message after all the data is persisted.
- * The sync won't perform any disk operation however it will wait for all the current pending operations
- * on this file to be finished */
- void syncCallback(AIOCallback aioCallback);
-
/** Any error will be reported on the callback interface */
void write(long position, long size, ByteBuffer directByteBuffer, AIOCallback aioCallback);
void read(long position, long size, ByteBuffer directByteBuffer, AIOCallback aioCallback) throws HornetQException;
void fill(long position, int blocks, long size, byte fillChar) throws HornetQException;
-
+
void setBufferCallback(BufferCallback callback);
int getBlockSize();
Modified: branches/ClebertCallback/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java 2009-11-21 23:08:15 UTC (rev 8365)
+++ branches/ClebertCallback/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java 2009-11-21 23:29:50 UTC (rev 8366)
@@ -295,9 +295,9 @@
public void run()
{
writeSemaphore.acquireUninterruptibly();
-
- final long sequence = nextWritingSequence.getAndIncrement();
+ long sequence = nextWritingSequence.getAndIncrement();
+
try
{
write(handler, sequence, position, size, directByteBuffer, aioCallback);
@@ -321,7 +321,7 @@
{
writeSemaphore.acquireUninterruptibly();
- final long sequence = nextWritingSequence.getAndIncrement();
+ long sequence = nextWritingSequence.getAndIncrement();
try
{
@@ -438,42 +438,16 @@
}
}
- // Callback methods ------------------------------------------------------------------
+ // Private ---------------------------------------------------------------------------
- public void syncCallback(final AIOCallback callback)
- {
- pendingWrites.up();
-
- writeExecutor.execute(new Runnable()
- {
- public void run()
- {
- callbackLock.lock();
-
- try
- {
- final long sequence = nextWritingSequence.getAndIncrement();
-
- // This will execute the callback immediately if nothing is pending,
- // or it will place it to the queue waiting for a response
- executeCallback(callback, sequence);
-
- }
- finally
- {
- callbackLock.unlock();
- }
- }
- });
-
- }
-
/** */
@SuppressWarnings("unused")
private void callbackDone(final AIOCallback callback, final long sequence, final ByteBuffer buffer)
{
writeSemaphore.release();
+ pendingWrites.down();
+
callbackLock.lock();
try
@@ -482,11 +456,20 @@
if (sequence == -1)
{
callback.done();
- pendingWrites.down();
}
else
{
- executeCallback(callback, sequence);
+ if (sequence == nextReadSequence)
+ {
+ nextReadSequence++;
+ callback.done();
+ flushCallbacks();
+ }
+ else
+ {
+ // System.out.println("Buffering callback");
+ pendingCallbacks.add(new CallbackHolder(sequence, callback));
+ }
}
// The buffer is not sent on callback for read operations
@@ -501,26 +484,6 @@
}
}
- /**
- * @param callback
- * @param sequence
- */
- private void executeCallback(final AIOCallback callback, final long sequence)
- {
- if (sequence == nextReadSequence)
- {
- nextReadSequence++;
- callback.done();
- pendingWrites.down();
- flushCallbacks();
- }
- else
- {
- // System.out.println("Buffering callback");
- pendingCallbacks.add(new CallbackHolder(sequence, callback));
- }
- }
-
private void flushCallbacks()
{
while (!pendingCallbacks.isEmpty() && pendingCallbacks.peek().sequence == nextReadSequence)
@@ -535,7 +498,6 @@
{
holder.callback.done();
}
- pendingWrites.down();
nextReadSequence++;
}
}
@@ -588,8 +550,6 @@
}
}
- // Private ---------------------------------------------------------------------------
-
private void pollEvents()
{
if (!opened)
Modified: branches/ClebertCallback/src/main/org/hornetq/core/journal/Journal.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/journal/Journal.java 2009-11-21 23:08:15 UTC (rev 8365)
+++ branches/ClebertCallback/src/main/org/hornetq/core/journal/Journal.java 2009-11-21 23:29:50 UTC (rev 8366)
@@ -112,7 +112,5 @@
void perfBlast(int pages) throws Exception;
- void sync(IOCompletion callback);
-
}
Modified: branches/ClebertCallback/src/main/org/hornetq/core/journal/SequentialFile.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/journal/SequentialFile.java 2009-11-21 23:08:15 UTC (rev 8365)
+++ branches/ClebertCallback/src/main/org/hornetq/core/journal/SequentialFile.java 2009-11-21 23:29:50 UTC (rev 8366)
@@ -80,9 +80,6 @@
void sync() throws Exception;
- /** This method will make sure the parameter callback will be invoked after all pending sync operations are done */
- void syncCallback(IOAsyncTask callback);
-
long size() throws Exception;
void renameTo(String newFileName) throws Exception;
Modified: branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java 2009-11-21 23:08:15 UTC (rev 8365)
+++ branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java 2009-11-21 23:29:50 UTC (rev 8366)
@@ -304,14 +304,4 @@
throw new IllegalStateException("File not opened");
}
}
-
- /* (non-Javadoc)
- * @see org.hornetq.core.journal.impl.AbstractSequentialFile#syncCallbackDirect(org.hornetq.core.journal.IOAsyncTask)
- */
- @Override
- protected void syncCallbackDirect(IOAsyncTask callback)
- {
- aioFile.syncCallback(callback);
- }
-
}
Modified: branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java 2009-11-21 23:08:15 UTC (rev 8365)
+++ branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java 2009-11-21 23:29:50 UTC (rev 8366)
@@ -16,6 +16,7 @@
import java.io.File;
import java.nio.ByteBuffer;
import java.util.List;
+import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;
import org.hornetq.core.journal.IOAsyncTask;
@@ -189,30 +190,11 @@
write(bytes, false, DummyCallback.getInstance());
}
}
-
- /**
- * invoke the callback after all pending operations are complete.
- */
- public void syncCallback(IOAsyncTask callback)
- {
- if (timedBuffer != null)
- {
- timedBuffer.syncCallback(callback);
- }
- else
- {
- syncCallbackDirect(callback);
- }
- }
-
-
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
- protected abstract void syncCallbackDirect(IOAsyncTask callback);
-
protected File getFile()
{
return file;
@@ -270,22 +252,15 @@
{
public void flushBuffer(final ByteBuffer buffer, final boolean requestedSync, final List<IOAsyncTask> callbacks)
{
- if (buffer == null)
+ buffer.flip();
+
+ if (buffer.limit() == 0)
{
- syncCallbackDirect(new DelegateCallback(callbacks));
+ factory.releaseBuffer(buffer);
}
else
{
- buffer.flip();
-
- if (buffer.limit() == 0)
- {
- factory.releaseBuffer(buffer);
- }
- else
- {
- writeDirect(buffer, requestedSync, new DelegateCallback(callbacks));
- }
+ writeDirect(buffer, requestedSync, new DelegateCallback(callbacks));
}
}
Modified: branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2009-11-21 23:08:15 UTC (rev 8365)
+++ branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2009-11-21 23:29:50 UTC (rev 8366)
@@ -864,31 +864,7 @@
callback.waitCompletion();
}
}
-
- public void sync(IOCompletion callback)
- {
- callback.lineUp();
- compactingLock.readLock().lock();
-
- try
- {
- lockAppend.lock();
- try
- {
- currentFile.getFile().syncCallback(callback);
- }
- finally
- {
- lockAppend.unlock();
- }
- }
- finally
- {
- compactingLock.readLock().unlock();
- }
- }
-
public void appendAddRecord(final long id, final byte recordType, final EncodingSupport record, final boolean sync, final IOCompletion callback) throws Exception
{
if (LOAD_TRACE)
Modified: branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java 2009-11-21 23:08:15 UTC (rev 8365)
+++ branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java 2009-11-21 23:29:50 UTC (rev 8366)
@@ -220,18 +220,6 @@
internalWrite(bytes, sync, null);
}
-
- /* (non-Javadoc)
- * @see org.hornetq.core.journal.impl.AbstractSequentialFile#syncCallbackDirect(org.hornetq.core.journal.IOAsyncTask)
- */
- @Override
- protected void syncCallbackDirect(IOAsyncTask callback)
- {
- // Nothing to be done on NIO.
- // Timed buffer took care of everything
- callback.done();
- }
-
/**
* @param bytes
* @param sync
@@ -255,5 +243,4 @@
callback.done();
}
}
-
}
Modified: branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/TimedBuffer.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/TimedBuffer.java 2009-11-21 23:08:15 UTC (rev 8365)
+++ branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/TimedBuffer.java 2009-11-21 23:29:50 UTC (rev 8366)
@@ -225,30 +225,15 @@
return true;
}
}
-
- /**
- * This method will make sure this callback will be executed after all the pending callbacks
- */
- public synchronized void syncCallback(IOAsyncTask callback)
+ public synchronized void addBytes(final byte[] bytes, final boolean sync, final IOAsyncTask callback)
{
- resumeTimerIfNeeded();
-
- callbacks.add(callback);
-
- pendingSync = true;
-
- if (flushOnSync)
+ if (buffer.writerIndex() == 0)
{
- flush();
+ // Resume latch
+ latchTimer.down();
}
- }
-
- public synchronized void addBytes(final byte[] bytes, final boolean sync, final IOAsyncTask callback)
- {
- resumeTimerIfNeeded();
-
buffer.writeBytes(bytes);
callbacks.add(callback);
@@ -274,57 +259,35 @@
}
}
- private void resumeTimerIfNeeded()
- {
- if (buffer.writerIndex() == 0 && callbacks.size() == 0)
- {
- // Resume latch
- latchTimer.down();
- }
- }
-
public synchronized void flush()
{
- if (buffer.writerIndex() > 0 || callbacks.size() > 0)
+ if (buffer.writerIndex() > 0)
{
- // Stop latch
latchTimer.up();
-
- if (buffer.writerIndex() == 0 && callbacks.size() > 0)
+
+ int pos = buffer.writerIndex();
+
+ if (logRates)
{
- // This is to perform a sync callback.
- // When we get to here, means we have sync callbacks waiting with no buffer
- // on this case we need to call sync on the file to make sure no other callbacks are pending
- bufferObserver.flushBuffer(null, pendingSync, callbacks);
-
- callbacks = new LinkedList<IOAsyncTask>();
+ bytesFlushed += pos;
}
- else
- {
- int pos = buffer.writerIndex();
-
- if (logRates)
- {
- bytesFlushed += pos;
- }
-
- ByteBuffer directBuffer = bufferObserver.newBuffer(bufferSize, pos);
-
- // Putting a byteArray on a native buffer is much faster, since it will do in a single native call.
- // Using directBuffer.put(buffer) would make several append calls for each byte
-
- directBuffer.put(buffer.array(), 0, pos);
-
- bufferObserver.flushBuffer(directBuffer, pendingSync, callbacks);
-
- callbacks = new LinkedList<IOAsyncTask>();
-
- active = false;
- pendingSync = false;
-
- buffer.clear();
- bufferLimit = 0;
- }
+
+ ByteBuffer directBuffer = bufferObserver.newBuffer(bufferSize, pos);
+
+ // Putting a byteArray on a native buffer is much faster, since it will do in a single native call.
+ // Using directBuffer.put(buffer) would make several append calls for each byte
+
+ directBuffer.put(buffer.array(), 0, pos);
+
+ bufferObserver.flushBuffer(directBuffer, pendingSync, callbacks);
+
+ callbacks = new LinkedList<IOAsyncTask>();
+
+ active = false;
+ pendingSync = false;
+
+ buffer.clear();
+ bufferLimit = 0;
}
}
Modified: branches/ClebertCallback/src/main/org/hornetq/core/persistence/StorageManager.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/persistence/StorageManager.java 2009-11-21 23:08:15 UTC (rev 8365)
+++ branches/ClebertCallback/src/main/org/hornetq/core/persistence/StorageManager.java 2009-11-21 23:29:50 UTC (rev 8366)
@@ -161,6 +161,4 @@
void deleteGrouping(GroupBinding groupBinding) throws Exception;
-
- void sync();
}
Modified: branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-11-21 23:08:15 UTC (rev 8365)
+++ branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-11-21 23:29:50 UTC (rev 8366)
@@ -515,11 +515,6 @@
messageJournal.appendDeleteRecord(recordID, syncNonTransactional, getIOContext());
}
- public void sync()
- {
- messageJournal.sync(OperationContextImpl.getInstance());
- }
-
// Transactional operations
public void storeMessageTransactional(final long txID, final ServerMessage message) throws Exception
Deleted: branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/journal/SyncOperation.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/journal/SyncOperation.java 2009-11-21 23:08:15 UTC (rev 8365)
+++ branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/journal/SyncOperation.java 2009-11-21 23:29:50 UTC (rev 8366)
@@ -1,154 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.persistence.impl.journal;
-
-import java.util.concurrent.Executor;
-
-import org.hornetq.core.journal.IOAsyncTask;
-import org.hornetq.core.persistence.OperationContext;
-
-/**
- * A SyncOperation
- *
- * @author <mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
- *
- *
- */
-public class SyncOperation implements OperationContext
-{
-
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- OperationContext ctx;
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- public SyncOperation (OperationContext ctx)
- {
- this.ctx = ctx;
- }
-
- // Public --------------------------------------------------------
-
- /**
- *
- * @see org.hornetq.core.persistence.OperationContext#complete()
- */
- public void complete()
- {
- ctx.complete();
- }
-
- /**
- *
- * @see org.hornetq.core.asyncio.AIOCallback#done()
- */
- public void done()
- {
- ctx.done();
- }
-
- /**
- * @param runnable
- * @see org.hornetq.core.persistence.OperationContext#executeOnCompletion(org.hornetq.core.journal.IOAsyncTask)
- */
- public void executeOnCompletion(IOAsyncTask runnable)
- {
- ctx.executeOnCompletion(runnable);
- }
-
- /**
- * @return
- * @see org.hornetq.core.persistence.OperationContext#hasReplication()
- */
- public boolean hasReplication()
- {
- return ctx.hasReplication();
- }
-
- /**
- * @return
- * @see org.hornetq.core.persistence.OperationContext#isSync()
- */
- public boolean isSync()
- {
- return true;
- }
-
- /**
- *
- * @see org.hornetq.core.journal.IOCompletion#lineUp()
- */
- public void lineUp()
- {
- ctx.lineUp();
- }
-
- /**
- * @param errorCode
- * @param errorMessage
- * @see org.hornetq.core.asyncio.AIOCallback#onError(int, java.lang.String)
- */
- public void onError(int errorCode, String errorMessage)
- {
- ctx.onError(errorCode, errorMessage);
- }
-
- /**
- *
- * @see org.hornetq.core.persistence.OperationContext#replicationDone()
- */
- public void replicationDone()
- {
- ctx.replicationDone();
- }
-
- /**
- *
- * @see org.hornetq.core.persistence.OperationContext#replicationLineUp()
- */
- public void replicationLineUp()
- {
- ctx.replicationLineUp();
- }
-
- /**
- * @see org.hornetq.core.persistence.OperationContext#setExecutor(java.util.concurrent.Executor)
- */
- public void setExecutor(Executor executor)
- {
- ctx.setExecutor(executor);
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.OperationContext#reattach()
- */
- public void reinstall()
- {
- OperationContextImpl.setInstance(this);
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-
-}
Modified: branches/ClebertCallback/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java 2009-11-21 23:08:15 UTC (rev 8365)
+++ branches/ClebertCallback/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java 2009-11-21 23:29:50 UTC (rev 8366)
@@ -29,7 +29,6 @@
import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_APPEND_TX;
import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_COMMIT_ROLLBACK;
import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_COMPARE_DATA;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_SYNC;
import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_DELETE;
import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_DELETE_TX;
import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_LARGE_MESSAGE_BEGIN;
@@ -94,7 +93,6 @@
import org.hornetq.core.remoting.impl.wireformat.ReplicationAddTXMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationCommitMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationCompareDataMessage;
-import org.hornetq.core.remoting.impl.wireformat.ReplicationSyncContextMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationDeleteMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationDeleteTXMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationLargeMessageBeingMessage;
@@ -416,11 +414,6 @@
packet = new ReplicationDeleteMessage();
break;
}
- case REPLICATION_SYNC:
- {
- packet = new ReplicationSyncContextMessage();
- break;
- }
case REPLICATION_DELETE_TX:
{
packet = new ReplicationDeleteTXMessage();
Modified: branches/ClebertCallback/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java 2009-11-21 23:08:15 UTC (rev 8365)
+++ branches/ClebertCallback/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java 2009-11-21 23:29:50 UTC (rev 8366)
@@ -172,8 +172,6 @@
public static final byte REPLICATION_LARGE_MESSAGE_WRITE = 91;
public static final byte REPLICATION_COMPARE_DATA = 92;
-
- public static final byte REPLICATION_SYNC = 93;
// Static --------------------------------------------------------
public PacketImpl(final byte type)
Deleted: branches/ClebertCallback/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationSyncContextMessage.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationSyncContextMessage.java 2009-11-21 23:08:15 UTC (rev 8365)
+++ branches/ClebertCallback/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationSyncContextMessage.java 2009-11-21 23:29:50 UTC (rev 8366)
@@ -1,80 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.remoting.impl.wireformat;
-
-import org.hornetq.core.remoting.spi.HornetQBuffer;
-
-/**
- * Message sent when a Replication Context is complete without any persistence replicated.
- * On that case we need to go over the cluster to make sure we get the data sent at the right order.
- *
- * @author <mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
- *
- *
- */
-public class ReplicationSyncContextMessage extends PacketImpl
-{
-
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- public ReplicationSyncContextMessage()
- {
- super(REPLICATION_SYNC);
- }
-
- // Public --------------------------------------------------------
-
- @Override
- public int getRequiredBufferSize()
- {
- return BASIC_PACKET_SIZE;
-
- }
-
- @Override
- public void encodeBody(final HornetQBuffer buffer)
- {
- }
-
- @Override
- public void decodeBody(final HornetQBuffer buffer)
- {
- }
-
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-
- // Public --------------------------------------------------------
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-
-}
Modified: branches/ClebertCallback/src/main/org/hornetq/core/replication/ReplicationManager.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/replication/ReplicationManager.java 2009-11-21 23:08:15 UTC (rev 8365)
+++ branches/ClebertCallback/src/main/org/hornetq/core/replication/ReplicationManager.java 2009-11-21 23:29:50 UTC (rev 8366)
@@ -84,6 +84,4 @@
*/
void compareJournals(JournalLoadInformation[] journalInfo) throws HornetQException;
- void sync(OperationContext ctx);
-
}
Modified: branches/ClebertCallback/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java 2009-11-21 23:08:15 UTC (rev 8365)
+++ branches/ClebertCallback/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java 2009-11-21 23:29:50 UTC (rev 8366)
@@ -25,7 +25,6 @@
import org.hornetq.core.journal.TransactionFailureCallback;
import org.hornetq.core.journal.impl.JournalImpl.ByteArrayEncoding;
import org.hornetq.core.logging.Logger;
-import org.hornetq.core.persistence.OperationContext;
import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
import org.hornetq.core.replication.ReplicationManager;
@@ -434,15 +433,6 @@
localJournal.appendUpdateRecordTransactional(txID, id, recordType, record);
}
- /* (non-Javadoc)
- * @see org.hornetq.core.journal.Journal#sync()
- */
- public void sync(IOCompletion ctx)
- {
- replicationManager.sync((OperationContext)ctx);
- localJournal.sync(ctx);
- }
-
/**
* @param committedRecords
* @param preparedTransactions
Modified: branches/ClebertCallback/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2009-11-21 23:08:15 UTC (rev 8365)
+++ branches/ClebertCallback/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2009-11-21 23:29:50 UTC (rev 8366)
@@ -17,7 +17,6 @@
import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_LARGE_MESSAGE_END;
import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_LARGE_MESSAGE_WRITE;
import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_COMPARE_DATA;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_SYNC;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -163,11 +162,6 @@
handleCompareDataMessage((ReplicationCompareDataMessage)packet);
response = new NullResponseMessage();
}
- else if (packet.getType() == REPLICATION_SYNC)
- {
- // https://jira.jboss.org/jira/browse/HORNETQ-218
- // Nothing to be done, we just needed a round trip to process events in order
- }
else
{
log.warn("Packet " + packet + " can't be processed by the ReplicationEndpoint");
Modified: branches/ClebertCallback/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2009-11-21 23:08:15 UTC (rev 8365)
+++ branches/ClebertCallback/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2009-11-21 23:29:50 UTC (rev 8366)
@@ -29,7 +29,6 @@
import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.persistence.OperationContext;
import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
-import org.hornetq.core.persistence.impl.journal.SyncOperation;
import org.hornetq.core.remoting.Channel;
import org.hornetq.core.remoting.ChannelHandler;
import org.hornetq.core.remoting.Packet;
@@ -421,31 +420,6 @@
replicatingChannel.sendBlocking(new ReplicationCompareDataMessage(journalInfo));
}
-
- public void sync(OperationContext context)
- {
- boolean executeNow = false;
- synchronized (replicationLock)
- {
- context.replicationLineUp();
- if (pendingTokens.isEmpty())
- {
- // this means the list is empty and we should process it now
- executeNow = true;
- }
- else
- {
- // adding the sync to be executed in order
- // as soon as the reponses are back from the backup
- this.pendingTokens.add(new SyncOperation(context));
- }
- }
- if (executeNow)
- {
- context.replicationDone();
- }
- }
-
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
@@ -492,7 +466,7 @@
ctx.replicationDone();
}
}
-
+
public OperationContext getContext()
{
return OperationContextImpl.getInstance();
Modified: branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/asyncio/AIOTestBase.java
===================================================================
--- branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/asyncio/AIOTestBase.java 2009-11-21 23:08:15 UTC (rev 8365)
+++ branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/asyncio/AIOTestBase.java 2009-11-21 23:29:50 UTC (rev 8366)
@@ -85,34 +85,25 @@
protected static class CountDownCallback implements AIOCallback
{
- private final CountDownLatch latchDone;
+ private final CountDownLatch latch;
- private final CountDownLatch waitCallback;
-
private final List<Integer> outputList;
private final int order;
private final AtomicInteger errors;
- public CountDownCallback(final CountDownLatch latch, final AtomicInteger errors, final List<Integer> outputList, final int order, final CountDownLatch waitCallback)
+ public CountDownCallback(final CountDownLatch latch, final AtomicInteger errors, final List<Integer> outputList, final int order)
{
- this.latchDone = latch;
+ this.latch = latch;
this.outputList = outputList;
this.order = order;
this.errors = errors;
-
- this.waitCallback = waitCallback;
}
- public CountDownCallback(final CountDownLatch latch, final AtomicInteger errors, final List<Integer> outputList, final int order)
- {
- this(latch, errors, outputList, order, null);
- }
-
volatile boolean doneCalled = false;
volatile int errorCalled = 0;
@@ -121,26 +112,15 @@
public void done()
{
- if (waitCallback != null)
- {
- try
- {
- waitCallback.await();
- }
- catch (Exception e)
- {
- e.printStackTrace(); // -> junit report
- }
- }
if (outputList != null)
{
outputList.add(order);
}
doneCalled = true;
timesDoneCalled.incrementAndGet();
- if (latchDone != null)
+ if (latch != null)
{
- latchDone.countDown();
+ latch.countDown();
}
}
@@ -155,11 +135,11 @@
{
errors.incrementAndGet();
}
- if (latchDone != null)
+ if (latch != null)
{
// even thought an error happened, we need to inform the latch,
// or the test won't finish
- latchDone.countDown();
+ latch.countDown();
}
}
Modified: branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/asyncio/AsynchronousFileTest.java
===================================================================
--- branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/asyncio/AsynchronousFileTest.java 2009-11-21 23:08:15 UTC (rev 8365)
+++ branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/asyncio/AsynchronousFileTest.java 2009-11-21 23:29:50 UTC (rev 8366)
@@ -22,7 +22,6 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import junit.framework.TestSuite;
@@ -421,60 +420,6 @@
}
}
- public void testOrderOnSynCallback() throws Exception
- {
- boolean closed = false;
- final AsynchronousFileImpl controller = new AsynchronousFileImpl(executor, pollerExecutor);
- ByteBuffer buffer = null;
- try
- {
- final int NUMBER_LINES = 100;
- final int SIZE = 512;
-
- controller.open(FILE_NAME, 100);
-
- controller.fill(0, 1, NUMBER_LINES * SIZE, (byte)'j');
-
- CountDownLatch latch = new CountDownLatch(NUMBER_LINES * 2);
-
- buffer = AsynchronousFileImpl.newBuffer(SIZE);
- buffer.rewind();
- for (int j = 0; j < SIZE; j++)
- {
- buffer.put((byte)(j % Byte.MAX_VALUE));
- }
-
- ArrayList<Integer> result = new ArrayList<Integer>();
-
- for (int i = 0; i < NUMBER_LINES * 2; i++)
- {
- CountDownCallback aio = new CountDownCallback(latch, null, result, i);
- if (i % 2 == 0)
- {
- controller.write(i * SIZE, SIZE, buffer, aio);
- }
- else
- {
- controller.syncCallback(aio);
- }
- }
-
- controller.close();
- closed = true;
-
- // We are not waiting the latch, as close should already hold for any writes
- CountDownCallback.checkResults(NUMBER_LINES * 2, result);
- }
- finally
- {
- AsynchronousFileImpl.destroyBuffer(buffer);
- if (!closed)
- {
- controller.close();
- }
- }
- }
-
public void testBufferCallbackAwaysSameBuffer() throws Exception
{
boolean closed = false;
Modified: branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/journal/impl/SequentialFileFactoryTestBase.java
===================================================================
--- branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/journal/impl/SequentialFileFactoryTestBase.java 2009-11-21 23:08:15 UTC (rev 8365)
+++ branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/journal/impl/SequentialFileFactoryTestBase.java 2009-11-21 23:29:50 UTC (rev 8366)
@@ -17,16 +17,11 @@
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
import org.hornetq.core.asyncio.impl.AsynchronousFileImpl;
-import org.hornetq.core.buffers.ChannelBuffers;
-import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.journal.SequentialFileFactory;
import org.hornetq.core.logging.Logger;
-import org.hornetq.core.remoting.spi.HornetQBuffer;
import org.hornetq.tests.util.UnitTestCase;
/**
@@ -44,9 +39,9 @@
protected void setUp() throws Exception
{
super.setUp();
-
+
factory = createFactory();
-
+
factory.start();
}
@@ -54,13 +49,13 @@
protected void tearDown() throws Exception
{
assertEquals(0, AsynchronousFileImpl.getTotalMaxIO());
-
+
factory.stop();
-
+
factory = null;
-
+
forceGC();
-
+
super.tearDown();
}
@@ -177,139 +172,7 @@
sf2.close();
}
-
- public void testOrder() throws Exception
- {
- SequentialFile sf = factory.createSequentialFile("order-test.hq", 100);
-
- sf.open();
-
- factory.activateBuffer(sf);
-
- final int records = 5000;
-
- sf.fill(0, records * 1024, (byte)0);
-
-
- final ArrayList<Integer> result = new ArrayList<Integer>();
-
- final CountDownLatch latch = new CountDownLatch(records);
-
- HornetQBuffer buffer = ChannelBuffers.wrappedBuffer(new byte[512]);
-
- for (int i = 0 ; i < records; i++)
- {
- final int toadd = i;
- IOAsyncTask callback = new IOAsyncTask()
- {
-
- public void onError(int errorCode, String errorMessage)
- {
- }
-
- public void done()
- {
- result.add(toadd);
-
- latch.countDown();
- }
-
- };
-
- if (i % 2 == 0)
- {
- sf.disableAutoFlush();
- sf.fits(512);
- sf.write(buffer, false, callback);
- sf.enableAutoFlush();
- }
- else
- {
- sf.syncCallback(callback);
- }
- }
-
- assertTrue(latch.await(5, TimeUnit.SECONDS));
-
- assertEquals(records, result.size());
-
- int i = 0;
-
- for (Integer r : result)
- {
- assertEquals(i++, r.intValue());
- }
-
-
- factory.deactivateBuffer();
-
- sf.close();
- }
-
- public void testOrder2() throws Exception
- {
- SequentialFile sf = factory.createSequentialFile("order-test.hq", 100);
-
- sf.open();
-
- factory.activateBuffer(sf);
-
- final int records = 1000;
-
- sf.fill(0, records * 1024, (byte)0);
-
-
- final ArrayList<Integer> result = new ArrayList<Integer>();
-
- final CountDownLatch latch = new CountDownLatch(records);
-
- HornetQBuffer buffer = ChannelBuffers.wrappedBuffer(new byte[512]);
-
- for (int i = 0 ; i < records; i++)
- {
- final int toadd = i;
- IOAsyncTask callback = new IOAsyncTask()
- {
-
- public void onError(int errorCode, String errorMessage)
- {
- }
-
- public void done()
- {
- result.add(toadd);
-
- latch.countDown();
- }
-
- };
-
- if (i == 10)
- {
- sf.write(buffer, false, callback);
- }
- else
- {
- sf.syncCallback(callback);
- }
- }
-
- assertTrue(latch.await(5, TimeUnit.SECONDS));
-
- assertEquals(records, result.size());
-
- int i = 0;
-
- for (Integer r : result)
- {
- assertEquals(i++, r.intValue());
- }
-
- factory.deactivateBuffer();
-
- sf.close();
- }
-
+
public void testRename() throws Exception
{
SequentialFile sf = factory.createSequentialFile("test1.hq", 1);
@@ -321,7 +184,7 @@
assertEquals(1, fileNames.size());
assertTrue(fileNames.contains("test1.hq"));
-
+
sf.renameTo("test1.cmp");
fileNames = factory.listFiles("cmp");
@@ -341,7 +204,7 @@
assertEquals(0, fileNames.size());
}
-
+
public void testWriteandRead() throws Exception
{
SequentialFile sf = factory.createSequentialFile("write.hq", 1);
@@ -359,7 +222,7 @@
String s3 = "echidna";
byte[] bytes3 = s3.getBytes("UTF-8");
ByteBuffer bb3 = factory.wrapBuffer(bytes3);
-
+
long initialPos = sf.position();
sf.writeDirect(bb1, true);
long bytesWritten = sf.position() - initialPos;
@@ -442,6 +305,7 @@
sf.writeDirect(bb2, true);
bytesWritten = sf.position() - initialPos;
+
assertEquals(bb2.limit(), bytesWritten);
initialPos = sf.position();
@@ -518,9 +382,9 @@
try
{
-
+
bb1 = factory.wrapBuffer(bytes1);
-
+
sf.writeDirect(bb1, true);
fail("Should throw exception");
Modified: branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
===================================================================
--- branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java 2009-11-21 23:08:15 UTC (rev 8365)
+++ branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java 2009-11-21 23:29:50 UTC (rev 8366)
@@ -634,17 +634,10 @@
*/
public void setTimedBuffer(TimedBuffer buffer)
{
+ // TODO Auto-generated method stub
}
- /* (non-Javadoc)
- * @see org.hornetq.core.journal.SequentialFile#syncCallback(org.hornetq.core.journal.IOAsyncTask)
- */
- public void syncCallback(IOAsyncTask callback)
- {
- callback.done();
- }
-
}
/* (non-Javadoc)
More information about the hornetq-commits
mailing list