JBoss hornetq SVN: r8281 - trunk/tests/src/org/hornetq/tests/integration/client.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2009-11-13 13:57:52 -0500 (Fri, 13 Nov 2009)
New Revision: 8281
Removed:
trunk/tests/src/org/hornetq/tests/integration/client/DropForceConsumerDeliveryInterceptor.java
trunk/tests/src/org/hornetq/tests/integration/client/ReceiveImmediate2Test.java
Log:
removed test added by accident in the previous commit
Deleted: trunk/tests/src/org/hornetq/tests/integration/client/DropForceConsumerDeliveryInterceptor.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/DropForceConsumerDeliveryInterceptor.java 2009-11-13 18:53:47 UTC (rev 8280)
+++ trunk/tests/src/org/hornetq/tests/integration/client/DropForceConsumerDeliveryInterceptor.java 2009-11-13 18:57:52 UTC (rev 8281)
@@ -1,21 +0,0 @@
-package org.hornetq.tests.integration.client;
-
-import org.hornetq.core.exception.HornetQException;
-import org.hornetq.core.remoting.Interceptor;
-import org.hornetq.core.remoting.Packet;
-import org.hornetq.core.remoting.RemotingConnection;
-import org.hornetq.core.remoting.impl.wireformat.PacketImpl;
-
-public class DropForceConsumerDeliveryInterceptor implements Interceptor
-{
- public boolean intercept(Packet packet, RemotingConnection connection) throws HornetQException
- {
- if (packet.getType() == PacketImpl.SESS_FORCE_CONSUMER_DELIVERY)
- {
- return false;
- } else
- {
- return true;
- }
- }
-}
\ No newline at end of file
Deleted: trunk/tests/src/org/hornetq/tests/integration/client/ReceiveImmediate2Test.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/ReceiveImmediate2Test.java 2009-11-13 18:53:47 UTC (rev 8280)
+++ trunk/tests/src/org/hornetq/tests/integration/client/ReceiveImmediate2Test.java 2009-11-13 18:57:52 UTC (rev 8281)
@@ -1,118 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-package org.hornetq.tests.integration.client;
-
-import org.hornetq.core.client.ClientConsumer;
-import org.hornetq.core.client.ClientMessage;
-import org.hornetq.core.client.ClientProducer;
-import org.hornetq.core.client.ClientSession;
-import org.hornetq.core.client.ClientSessionFactory;
-import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
-import org.hornetq.core.client.impl.ClientSessionInternal;
-import org.hornetq.core.config.Configuration;
-import org.hornetq.core.config.TransportConfiguration;
-import org.hornetq.core.exception.HornetQException;
-import org.hornetq.core.logging.Logger;
-import org.hornetq.core.server.HornetQ;
-import org.hornetq.core.server.HornetQServer;
-import org.hornetq.core.server.Queue;
-import org.hornetq.tests.util.ServiceTestBase;
-import org.hornetq.utils.SimpleString;
-
-/**
- * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
- * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
- */
-public class ReceiveImmediate2Test extends ServiceTestBase
-{
- private static final Logger log = Logger.getLogger(ReceiveImmediate2Test.class);
-
- private HornetQServer server;
-
- private final SimpleString QUEUE = new SimpleString("ReceiveImmediateTest.queue");
-
- private final SimpleString ADDRESS = new SimpleString("ReceiveImmediateTest.address");
-
- @Override
- protected void setUp() throws Exception
- {
- super.setUp();
-
- Configuration config = createDefaultConfig(false);
- config.getInterceptorClassNames().add(DropForceConsumerDeliveryInterceptor.class.getName());
- server = HornetQ.newHornetQServer(config, false);
-
- server.start();
- }
-
- @Override
- protected void tearDown() throws Exception
- {
- server.stop();
-
- server = null;
-
- super.tearDown();
- }
-
- private ClientSessionFactory sf;
-
- public void testConsumerReceiveImmediateDoesNotHang() throws Exception
- {
- sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"));
- sf.setBlockOnNonPersistentSend(true);
- sf.setBlockOnAcknowledge(true);
- sf.setAckBatchSize(0);
- sf.setReconnectAttempts(1);
- sf.setFailoverOnServerShutdown(true);
-
- final ClientSession session = sf.createSession(false, true, false);
-
- session.createQueue(ADDRESS, QUEUE, null, false);
-
- final ClientConsumer consumer = session.createConsumer(QUEUE, null, false);
- session.start();
-
- Runnable r = new Runnable()
- {
-
- public void run()
- {
- try
- {
- Thread.sleep(2000);
- //((ClientSessionInternal)session).getConnection().fail(new HornetQException(HornetQException.NOT_CONNECTED));
- //session.close();
- consumer.close();
- }
- catch (Exception e)
- {
- e.printStackTrace();
- }
- }
- };
-
- long start = System.currentTimeMillis();
-
- new Thread(r).start();
-
- ClientMessage message = consumer.receiveImmediate();
- assertNull(message);
-
- long end = System.currentTimeMillis();
- assertTrue(end - start >= 2000);
- session.close();
-
- sf.close();
- }
-}
15 years, 1 month
JBoss hornetq SVN: r8280 - in trunk: src/main/org/hornetq/core/persistence/impl/journal and 8 other directories.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2009-11-13 13:53:47 -0500 (Fri, 13 Nov 2009)
New Revision: 8280
Added:
trunk/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationSyncContextMessage.java
trunk/tests/src/org/hornetq/tests/integration/client/DropForceConsumerDeliveryInterceptor.java
trunk/tests/src/org/hornetq/tests/integration/client/ReceiveImmediate2Test.java
Modified:
trunk/src/main/org/hornetq/core/persistence/StorageManager.java
trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
trunk/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java
trunk/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java
trunk/src/main/org/hornetq/core/replication/ReplicationManager.java
trunk/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
trunk/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
Log:
HORNETQ-218: Incorrect order when persistent and non-persistent messages are sent over replication
* in PostOffice.processRoute(), when the message is non-persistent and the storage manager is replicated, synchronize the storage manager to ensure ordered delivery between persistent and non-persistent messages
Modified: trunk/src/main/org/hornetq/core/persistence/StorageManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/StorageManager.java 2009-11-13 10:35:30 UTC (rev 8279)
+++ trunk/src/main/org/hornetq/core/persistence/StorageManager.java 2009-11-13 18:53:47 UTC (rev 8280)
@@ -147,4 +147,6 @@
void deleteGrouping(GroupBinding groupBinding) throws Exception;
+
+ void sync();
}
Modified: trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-11-13 10:35:30 UTC (rev 8279)
+++ trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-11-13 18:53:47 UTC (rev 8280)
@@ -498,6 +498,14 @@
messageJournal.appendDeleteRecord(recordID, syncNonTransactional);
}
+ public void sync()
+ {
+ if (replicator != null)
+ {
+ replicator.sync();
+ }
+ }
+
// Transactional operations
public void storeMessageTransactional(final long txID, final ServerMessage message) throws Exception
@@ -1901,5 +1909,7 @@
}
}
+
+
}
Modified: trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2009-11-13 10:35:30 UTC (rev 8279)
+++ trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2009-11-13 18:53:47 UTC (rev 8280)
@@ -66,6 +66,11 @@
{
this.id = id;
}
+
+ public void sync()
+ {
+ // NO OP
+ }
public void addQueueBinding(final Binding queueBinding) throws Exception
{
Modified: trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-11-13 10:35:30 UTC (rev 8279)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-11-13 18:53:47 UTC (rev 8280)
@@ -780,6 +780,7 @@
// First send a reset message
ServerMessage message = new ServerMessageImpl(storageManager.generateUniqueID());
+// message.setDurable(true);
message.setBody(ChannelBuffers.EMPTY_BUFFER);
message.setDestination(queueName);
message.putBooleanProperty(HDR_RESET_QUEUE_DATA, true);
@@ -913,6 +914,13 @@
}
}
}
+ else
+ {
+ if (storageManager.isReplicated())
+ {
+ storageManager.sync();
+ }
+ }
message.incrementRefCount(reference);
}
@@ -969,6 +977,7 @@
message.setBody(ChannelBuffers.EMPTY_BUFFER);
message.setDestination(queueName);
+// message.setDurable(true);
String uid = UUIDGenerator.getInstance().generateStringUUID();
Modified: trunk/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java 2009-11-13 10:35:30 UTC (rev 8279)
+++ trunk/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java 2009-11-13 18:53:47 UTC (rev 8280)
@@ -29,6 +29,7 @@
import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_APPEND_TX;
import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_COMMIT_ROLLBACK;
import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_COMPARE_DATA;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_SYNC;
import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_DELETE;
import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_DELETE_TX;
import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_LARGE_MESSAGE_BEGIN;
@@ -93,6 +94,7 @@
import org.hornetq.core.remoting.impl.wireformat.ReplicationAddTXMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationCommitMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationCompareDataMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationSyncContextMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationDeleteMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationDeleteTXMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationLargeMessageBeingMessage;
@@ -414,6 +416,11 @@
packet = new ReplicationDeleteMessage();
break;
}
+ case REPLICATION_SYNC:
+ {
+ packet = new ReplicationSyncContextMessage();
+ break;
+ }
case REPLICATION_DELETE_TX:
{
packet = new ReplicationDeleteTXMessage();
@@ -468,7 +475,7 @@
{
packet = new SessionForceConsumerDelivery();
break;
- }
+ }
default:
{
throw new IllegalArgumentException("Invalid type: " + packetType);
Modified: trunk/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java 2009-11-13 10:35:30 UTC (rev 8279)
+++ trunk/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java 2009-11-13 18:53:47 UTC (rev 8280)
@@ -172,6 +172,8 @@
public static final byte REPLICATION_LARGE_MESSAGE_WRITE = 91;
public static final byte REPLICATION_COMPARE_DATA = 92;
+
+ public static final byte REPLICATION_SYNC = 93;
// Static --------------------------------------------------------
public PacketImpl(final byte type)
Added: trunk/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationSyncContextMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationSyncContextMessage.java (rev 0)
+++ trunk/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationSyncContextMessage.java 2009-11-13 18:53:47 UTC (rev 8280)
@@ -0,0 +1,80 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.remoting.impl.wireformat;
+
+import org.hornetq.core.remoting.spi.HornetQBuffer;
+
+/**
+ * Message sent when a Replication Context is complete without any persistence replicated.
+ * On that case we need to go over the cluster to make sure we get the data sent at the right order.
+ *
+ * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class ReplicationSyncContextMessage extends PacketImpl
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public ReplicationSyncContextMessage()
+ {
+ super(REPLICATION_SYNC);
+ }
+
+ // Public --------------------------------------------------------
+
+ @Override
+ public int getRequiredBufferSize()
+ {
+ return BASIC_PACKET_SIZE;
+
+ }
+
+ @Override
+ public void encodeBody(final HornetQBuffer buffer)
+ {
+ }
+
+ @Override
+ public void decodeBody(final HornetQBuffer buffer)
+ {
+ }
+
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: trunk/src/main/org/hornetq/core/replication/ReplicationManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/replication/ReplicationManager.java 2009-11-13 10:35:30 UTC (rev 8279)
+++ trunk/src/main/org/hornetq/core/replication/ReplicationManager.java 2009-11-13 18:53:47 UTC (rev 8280)
@@ -88,4 +88,6 @@
*/
void compareJournals(JournalLoadInformation[] journalInfo) throws HornetQException;
+ void sync();
+
}
Modified: trunk/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2009-11-13 10:35:30 UTC (rev 8279)
+++ trunk/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2009-11-13 18:53:47 UTC (rev 8280)
@@ -17,6 +17,7 @@
import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_LARGE_MESSAGE_END;
import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_LARGE_MESSAGE_WRITE;
import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_COMPARE_DATA;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_SYNC;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -162,6 +163,11 @@
handleCompareDataMessage((ReplicationCompareDataMessage)packet);
response = new NullResponseMessage();
}
+ else if (packet.getType() == REPLICATION_SYNC)
+ {
+ // https://jira.jboss.org/jira/browse/HORNETQ-218
+ // Nothing to be done, we just needed a round trip to process events in order
+ }
else
{
log.warn("Packet " + packet + " can't be processed by the ReplicationEndpoint");
Modified: trunk/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2009-11-13 10:35:30 UTC (rev 8279)
+++ trunk/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2009-11-13 18:53:47 UTC (rev 8280)
@@ -34,6 +34,7 @@
import org.hornetq.core.remoting.impl.wireformat.ReplicationAddTXMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationCommitMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationCompareDataMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationSyncContextMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationDeleteMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationDeleteTXMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationLargeMessageBeingMessage;
@@ -277,6 +278,14 @@
sendReplicatePacket(new ReplicationLargemessageEndMessage(messageId));
}
}
+
+ public void sync()
+ {
+ if (enabled)
+ {
+ sendReplicatePacket(new ReplicationSyncContextMessage());
+ }
+ }
/* (non-Javadoc)
* @see org.hornetq.core.replication.ReplicationManager#largeMessageWrite(long, byte[])
Added: trunk/tests/src/org/hornetq/tests/integration/client/DropForceConsumerDeliveryInterceptor.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/DropForceConsumerDeliveryInterceptor.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/client/DropForceConsumerDeliveryInterceptor.java 2009-11-13 18:53:47 UTC (rev 8280)
@@ -0,0 +1,21 @@
+package org.hornetq.tests.integration.client;
+
+import org.hornetq.core.exception.HornetQException;
+import org.hornetq.core.remoting.Interceptor;
+import org.hornetq.core.remoting.Packet;
+import org.hornetq.core.remoting.RemotingConnection;
+import org.hornetq.core.remoting.impl.wireformat.PacketImpl;
+
+public class DropForceConsumerDeliveryInterceptor implements Interceptor
+{
+ public boolean intercept(Packet packet, RemotingConnection connection) throws HornetQException
+ {
+ if (packet.getType() == PacketImpl.SESS_FORCE_CONSUMER_DELIVERY)
+ {
+ return false;
+ } else
+ {
+ return true;
+ }
+ }
+}
\ No newline at end of file
Added: trunk/tests/src/org/hornetq/tests/integration/client/ReceiveImmediate2Test.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/ReceiveImmediate2Test.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/client/ReceiveImmediate2Test.java 2009-11-13 18:53:47 UTC (rev 8280)
@@ -0,0 +1,118 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.tests.integration.client;
+
+import org.hornetq.core.client.ClientConsumer;
+import org.hornetq.core.client.ClientMessage;
+import org.hornetq.core.client.ClientProducer;
+import org.hornetq.core.client.ClientSession;
+import org.hornetq.core.client.ClientSessionFactory;
+import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
+import org.hornetq.core.client.impl.ClientSessionInternal;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.config.TransportConfiguration;
+import org.hornetq.core.exception.HornetQException;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.server.HornetQ;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.Queue;
+import org.hornetq.tests.util.ServiceTestBase;
+import org.hornetq.utils.SimpleString;
+
+/**
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ */
+public class ReceiveImmediate2Test extends ServiceTestBase
+{
+ private static final Logger log = Logger.getLogger(ReceiveImmediate2Test.class);
+
+ private HornetQServer server;
+
+ private final SimpleString QUEUE = new SimpleString("ReceiveImmediateTest.queue");
+
+ private final SimpleString ADDRESS = new SimpleString("ReceiveImmediateTest.address");
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ Configuration config = createDefaultConfig(false);
+ config.getInterceptorClassNames().add(DropForceConsumerDeliveryInterceptor.class.getName());
+ server = HornetQ.newHornetQServer(config, false);
+
+ server.start();
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ server.stop();
+
+ server = null;
+
+ super.tearDown();
+ }
+
+ private ClientSessionFactory sf;
+
+ public void testConsumerReceiveImmediateDoesNotHang() throws Exception
+ {
+ sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"));
+ sf.setBlockOnNonPersistentSend(true);
+ sf.setBlockOnAcknowledge(true);
+ sf.setAckBatchSize(0);
+ sf.setReconnectAttempts(1);
+ sf.setFailoverOnServerShutdown(true);
+
+ final ClientSession session = sf.createSession(false, true, false);
+
+ session.createQueue(ADDRESS, QUEUE, null, false);
+
+ final ClientConsumer consumer = session.createConsumer(QUEUE, null, false);
+ session.start();
+
+ Runnable r = new Runnable()
+ {
+
+ public void run()
+ {
+ try
+ {
+ Thread.sleep(2000);
+ //((ClientSessionInternal)session).getConnection().fail(new HornetQException(HornetQException.NOT_CONNECTED));
+ //session.close();
+ consumer.close();
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+ };
+
+ long start = System.currentTimeMillis();
+
+ new Thread(r).start();
+
+ ClientMessage message = consumer.receiveImmediate();
+ assertNull(message);
+
+ long end = System.currentTimeMillis();
+ assertTrue(end - start >= 2000);
+ session.close();
+
+ sf.close();
+ }
+}
Modified: trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2009-11-13 10:35:30 UTC (rev 8279)
+++ trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2009-11-13 18:53:47 UTC (rev 8280)
@@ -978,6 +978,10 @@
{
// To change body of implemented methods use File | Settings | File Templates.
}
+
+ public void sync()
+ {
+ }
/* (non-Javadoc)
* @see org.hornetq.core.persistence.StorageManager#loadMessageJournal(org.hornetq.core.paging.PagingManager, java.util.Map, org.hornetq.core.transaction.ResourceManager, java.util.Map)
15 years, 1 month
JBoss hornetq SVN: r8279 - trunk/tests/src/org/hornetq/tests/integration/jms/connection.
by do-not-reply@jboss.org
Author: ataylor
Date: 2009-11-13 05:35:30 -0500 (Fri, 13 Nov 2009)
New Revision: 8279
Modified:
trunk/tests/src/org/hornetq/tests/integration/jms/connection/ExceptionListenerTest.java
Log:
fixed timing issue on test
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/connection/ExceptionListenerTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/connection/ExceptionListenerTest.java 2009-11-13 02:05:29 UTC (rev 8278)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/connection/ExceptionListenerTest.java 2009-11-13 10:35:30 UTC (rev 8279)
@@ -31,6 +31,9 @@
import org.hornetq.tests.integration.jms.server.management.NullInitialContext;
import org.hornetq.tests.util.UnitTestCase;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
/**
*
* A ExceptionListenerTest
@@ -98,24 +101,34 @@
private class MyExceptionListener implements ExceptionListener
{
volatile int numCalls;
-
+
+ private CountDownLatch latch;
+
+ public MyExceptionListener(CountDownLatch latch)
+ {
+ this.latch = latch;
+ }
+
public synchronized void onException(JMSException arg0)
{
numCalls++;
+ latch.countDown();
}
}
public void testListenerCalledForOneConnection() throws Exception
{
Connection conn = cf.createConnection();
+ CountDownLatch latch = new CountDownLatch(1);
+ MyExceptionListener listener = new MyExceptionListener(latch);
- MyExceptionListener listener = new MyExceptionListener();
-
conn.setExceptionListener(listener);
ClientSessionInternal coreSession = (ClientSessionInternal)((HornetQConnection)conn).getInitialSession();
coreSession.getConnection().fail(new HornetQException(HornetQException.INTERNAL_ERROR, "blah"));
+
+ latch.await(5, TimeUnit.SECONDS);
assertEquals(1, listener.numCalls);
@@ -125,9 +138,10 @@
public void testListenerCalledForOneConnectionAndSessions() throws Exception
{
Connection conn = cf.createConnection();
+
+ CountDownLatch latch = new CountDownLatch(1);
+ MyExceptionListener listener = new MyExceptionListener(latch);
- MyExceptionListener listener = new MyExceptionListener();
-
conn.setExceptionListener(listener);
Session sess1 = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -151,7 +165,8 @@
coreSession2.getConnection().fail(new HornetQException(HornetQException.INTERNAL_ERROR, "blah"));
coreSession3.getConnection().fail(new HornetQException(HornetQException.INTERNAL_ERROR, "blah"));
-
+
+ latch.await(5, TimeUnit.SECONDS);
//Listener should only be called once even if all sessions connections die
assertEquals(1, listener.numCalls);
15 years, 1 month
JBoss hornetq SVN: r8278 - in trunk: src/main/org/hornetq/core/persistence/impl/journal and 1 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-11-12 21:05:29 -0500 (Thu, 12 Nov 2009)
New Revision: 8278
Modified:
trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java
trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java
trunk/src/main/org/hornetq/core/journal/impl/TimedBufferObserver.java
trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/TimedBufferTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-214 - performance tweaks
Modified: trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java 2009-11-12 21:08:49 UTC (rev 8277)
+++ trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java 2009-11-13 02:05:29 UTC (rev 8278)
@@ -55,9 +55,6 @@
* This is the class returned to the factory when the file is being activated. */
protected final TimedBufferObserver timedBufferObserver = new LocalBufferObserver();
-
-
-
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -66,7 +63,7 @@
* @param file
* @param directory
*/
- public AbstractSequentialFile(String directory, File file, SequentialFileFactory factory)
+ public AbstractSequentialFile(final String directory, final File file, final SequentialFileFactory factory)
{
super();
this.file = file;
@@ -86,7 +83,6 @@
return file.getName();
}
-
public final void delete() throws Exception
{
if (isOpen())
@@ -107,12 +103,10 @@
return position.get();
}
-
public final void renameTo(final String newFileName) throws Exception
{
close();
File newFile = new File(directory + "/" + newFileName);
-
if (!file.equals(newFile))
{
@@ -120,13 +114,12 @@
file = newFile;
}
}
-
- public final boolean fits(int size)
+ public final boolean fits(final int size)
{
if (timedBuffer == null)
{
- return this.position.get() + size <= fileSize;
+ return position.get() + size <= fileSize;
}
else
{
@@ -150,22 +143,22 @@
}
}
- public void setTimedBuffer(TimedBuffer buffer)
+ public void setTimedBuffer(final TimedBuffer buffer)
{
if (timedBuffer != null)
{
timedBuffer.setObserver(null);
}
- this.timedBuffer = buffer;
+ timedBuffer = buffer;
if (buffer != null)
{
- buffer.setObserver(this.timedBufferObserver);
+ buffer.setObserver(timedBufferObserver);
}
}
-
+
public void write(final HornetQBuffer bytes, final boolean sync, final IOCompletion callback) throws Exception
{
if (timedBuffer != null)
@@ -196,8 +189,6 @@
write(bytes, false, DummyCallback.getInstance());
}
}
-
-
// Package protected ---------------------------------------------
@@ -208,7 +199,6 @@
return file;
}
-
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
@@ -217,7 +207,7 @@
{
final List<IOCompletion> delegates;
- DelegateCallback(List<IOCompletion> delegates)
+ DelegateCallback(final List<IOCompletion> delegates)
{
this.delegates = delegates;
}
@@ -237,7 +227,7 @@
}
}
- public void onError(int errorCode, String errorMessage)
+ public void onError(final int errorCode, final String errorMessage)
{
for (IOCompletion callback : delegates)
{
@@ -259,7 +249,7 @@
protected class LocalBufferObserver implements TimedBufferObserver
{
- public void flushBuffer(ByteBuffer buffer, List<IOCompletion> callbacks)
+ public void flushBuffer(final ByteBuffer buffer, final boolean requestedSync, final List<IOCompletion> callbacks)
{
buffer.flip();
@@ -269,7 +259,7 @@
}
else
{
- writeDirect(buffer, true, new DelegateCallback(callbacks));
+ writeDirect(buffer, requestedSync, new DelegateCallback(callbacks));
}
}
@@ -295,6 +285,7 @@
}
}
+ @Override
public String toString()
{
return "TimedBufferObserver on file (" + getFile().getName() + ")";
Modified: trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java 2009-11-12 21:08:49 UTC (rev 8277)
+++ trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java 2009-11-13 02:05:29 UTC (rev 8278)
@@ -241,18 +241,15 @@
if (sync)
{
+ if (!pendingSync)
+ {
+ pendingSync = true;
+ }
+
if (flushOnSync)
{
flush();
}
- else
- {
- // We should flush on the next timeout, no matter what other activity happens on the buffer
- if (!pendingSync)
- {
- pendingSync = true;
- }
- }
}
if (buffer.writerIndex() == bufferLimit)
@@ -281,7 +278,7 @@
directBuffer.put(buffer.array(), 0, pos);
- bufferObserver.flushBuffer(directBuffer, callbacks);
+ bufferObserver.flushBuffer(directBuffer, pendingSync, callbacks);
callbacks = new ArrayList<IOCompletion>();
Modified: trunk/src/main/org/hornetq/core/journal/impl/TimedBufferObserver.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/TimedBufferObserver.java 2009-11-12 21:08:49 UTC (rev 8277)
+++ trunk/src/main/org/hornetq/core/journal/impl/TimedBufferObserver.java 2009-11-13 02:05:29 UTC (rev 8278)
@@ -39,7 +39,7 @@
// Public --------------------------------------------------------
- public void flushBuffer(ByteBuffer buffer, List<IOCompletion> callbacks);
+ public void flushBuffer(ByteBuffer buffer, boolean syncRequested, List<IOCompletion> callbacks);
/** Return the number of remaining bytes that still fit on the observer (file) */
Modified: trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-11-12 21:08:49 UTC (rev 8277)
+++ trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-11-13 02:05:29 UTC (rev 8278)
@@ -226,7 +226,7 @@
log.info("AIO journal selected");
if (!AIOSequentialFileFactory.isSupported())
{
- log.warn("AIO wasn't located on this platform, it will fall back to using pure Java NIO. " + "If your platform is Linux, install LibAIO to enable the AIO journal");
+ log.warn("AIO wasn't located on this platform, it will fall back to using pure Java NIO. If your platform is Linux, install LibAIO to enable the AIO journal");
journalFF = new NIOSequentialFileFactory(journalDir,
true,
config.getJournalBufferSize(),
Modified: trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/TimedBufferTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/TimedBufferTest.java 2009-11-12 21:08:49 UTC (rev 8277)
+++ trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/TimedBufferTest.java 2009-11-13 02:05:29 UTC (rev 8278)
@@ -64,7 +64,7 @@
final AtomicInteger flushTimes = new AtomicInteger(0);
class TestObserver implements TimedBufferObserver
{
- public void flushBuffer(final ByteBuffer buffer, final List<IOCompletion> callbacks)
+ public void flushBuffer(final ByteBuffer buffer, final boolean sync, final List<IOCompletion> callbacks)
{
buffers.add(buffer);
flushTimes.incrementAndGet();
15 years, 1 month
JBoss hornetq SVN: r8277 - in trunk: examples/core/perf/server0 and 10 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-11-12 16:08:49 -0500 (Thu, 12 Nov 2009)
New Revision: 8277
Modified:
trunk/docs/user-manual/en/configuration-index.xml
trunk/docs/user-manual/en/perf-tuning.xml
trunk/docs/user-manual/en/persistence.xml
trunk/examples/core/perf/server0/hornetq-configuration.xml
trunk/src/config/common/schema/hornetq-configuration.xsd
trunk/src/main/org/hornetq/core/config/Configuration.java
trunk/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java
trunk/src/main/org/hornetq/core/config/impl/FileConfiguration.java
trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java
trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFileFactory.java
trunk/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java
trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
trunk/tests/config/ConfigurationTest-full-config.xml
trunk/tests/src/org/hornetq/tests/integration/journal/AIOJournalCompactTest.java
trunk/tests/src/org/hornetq/tests/integration/journal/AIOJournalImplTest.java
trunk/tests/src/org/hornetq/tests/integration/journal/ValidateTransactionHealthTest.java
trunk/tests/src/org/hornetq/tests/integration/management/HornetQServerControlTest.java
trunk/tests/src/org/hornetq/tests/unit/core/config/impl/ConfigurationImplTest.java
trunk/tests/src/org/hornetq/tests/unit/core/config/impl/DefaultsFileConfigurationTest.java
trunk/tests/src/org/hornetq/tests/unit/core/config/impl/FileConfigurationTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-214 - Documentation and configuration changes
Modified: trunk/docs/user-manual/en/configuration-index.xml
===================================================================
--- trunk/docs/user-manual/en/configuration-index.xml 2009-11-12 16:13:22 UTC (rev 8276)
+++ trunk/docs/user-manual/en/configuration-index.xml 2009-11-12 21:08:49 UTC (rev 8277)
@@ -115,25 +115,25 @@
</row>
<row>
<entry><link
- linkend="configuring.message.journal.journal-aio-buffer-size"
- >journal-aio-buffer-size</link></entry>
+ linkend="configuring.message.journal.journal-buffer-size"
+ >journal-buffer-size</link></entry>
<entry>Long</entry>
- <entry>The size of the internal buffer on AIO.</entry>
+ <entry>The size of the internal buffer on the journal.</entry>
<entry>128 KiB</entry>
</row>
<row>
<entry><link
- linkend="configuring.message.journal.journal-aio-buffer-timeout"
- >journal-aio-buffer-timeout</link></entry>
+ linkend="configuring.message.journal.journal-buffer-timeout"
+ >journal-buffer-timeout</link></entry>
<entry>Long</entry>
<entry>The timeout (in nanoseconds) used to flush internal
- buffers.</entry>
+ buffers on the journal.</entry>
<entry>20000</entry>
</row>
<row>
<entry><link
- linkend="configuring.message.journal.journal-aio-flush-on-sync"
- >journal-aio-flush-on-sync</link></entry>
+ linkend="configuring.message.journal.journal-flush-on-sync"
+ >journal-flush-on-sync</link></entry>
<entry>Boolean</entry>
<entry>If true, transactions will ignore timeouts and be persisted
immediately</entry>
Modified: trunk/docs/user-manual/en/perf-tuning.xml
===================================================================
--- trunk/docs/user-manual/en/perf-tuning.xml 2009-11-12 16:13:22 UTC (rev 8276)
+++ trunk/docs/user-manual/en/perf-tuning.xml 2009-11-12 21:08:49 UTC (rev 8277)
@@ -49,13 +49,13 @@
will scale better than Java NIO.</para>
</listitem>
<listitem>
- <para><literal>journal-aio-flush-on-sync</literal>. If you don't have many producers
- in your system you may consider setting journal-aio-flush-on-sync to true.
+ <para><literal>journal-flush-on-sync</literal>. If you don't have many producers
+ in your system you may consider setting journal-flush-on-sync to true.
HornetQ by default is optimized by the case where you have many producers. We
try to combine multiple writes in a single OS operation. However if that's not
your case setting this option to true will give you a performance boost.</para>
<para>On the other hand when you have multiple producers, keeping <literal
- >journal-aio-flush-on-sync</literal> set to false. This will make your
+ >journal-flush-on-sync</literal> set to false. This will make your
system flush multiple syncs in a single OS call making your system scale much
better.</para>
</listitem>
Modified: trunk/docs/user-manual/en/persistence.xml
===================================================================
--- trunk/docs/user-manual/en/persistence.xml 2009-11-12 16:13:22 UTC (rev 8276)
+++ trunk/docs/user-manual/en/persistence.xml 2009-11-12 21:08:49 UTC (rev 8277)
@@ -195,8 +195,8 @@
at the OS level (/proc/sys/fs/aio-max-nr) usually at 65536.</para>
<para>The default value for this is <literal>500</literal>. </para>
</listitem>
- <listitem id="configuring.message.journal.journal-aio-buffer-timeout">
- <para><literal>journal-aio-buffer-timeout</literal></para>
+ <listitem id="configuring.message.journal.journal-buffer-timeout">
+ <para><literal>journal-buffer-timeout</literal></para>
<para>Flush period on the internal AIO timed buffer, configured in nano seconds. For
performance reasons we buffer data before submitting it to the kernel in a
single batch. This parameter determines the maximum amount of time to wait
@@ -205,8 +205,8 @@
<para>The default value for this paramater is <literal>20000</literal> nano seconds
(i.e. 20 microseconds). </para>
</listitem>
- <listitem id="configuring.message.journal.journal-aio-flush-on-sync">
- <para><literal>journal-aio-flush-on-sync</literal></para>
+ <listitem id="configuring.message.journal.journal-flush-on-sync">
+ <para><literal>journal-flush-on-sync</literal></para>
<para>If this is set to true, the internal buffers are flushed right away when a
sync request is performed. Sync requests are performed on transactions if
<literal>journal-sync-transactional</literal> is true, or on sending regular
@@ -220,8 +220,8 @@
flush-on-sync.</para>
<para>The default value for this parameter is <literal>false</literal>. </para>
</listitem>
- <listitem id="configuring.message.journal.journal-aio-buffer-size">
- <para><literal>journal-aio-buffer-size</literal></para>
+ <listitem id="configuring.message.journal.journal-buffer-size">
+ <para><literal>journal-buffer-size</literal></para>
<para>The size of the timed buffer on AIO. The default value is <literal
>128KiB</literal>.</para>
</listitem>
Modified: trunk/examples/core/perf/server0/hornetq-configuration.xml
===================================================================
--- trunk/examples/core/perf/server0/hornetq-configuration.xml 2009-11-12 16:13:22 UTC (rev 8276)
+++ trunk/examples/core/perf/server0/hornetq-configuration.xml 2009-11-12 21:08:49 UTC (rev 8277)
@@ -21,7 +21,7 @@
<journal-sync-transactional>false</journal-sync-transactional>
<journal-type>ASYNCIO</journal-type>
<journal-min-files>20</journal-min-files>
- <journal-aio-buffer-timeout>20000</journal-aio-buffer-timeout>
+ <journal-buffer-timeout>20000</journal-buffer-timeout>
<log-journal-write-rate>false</log-journal-write-rate>
<run-sync-speed-test>false</run-sync-speed-test>
Modified: trunk/src/config/common/schema/hornetq-configuration.xsd
===================================================================
--- trunk/src/config/common/schema/hornetq-configuration.xsd 2009-11-12 16:13:22 UTC (rev 8276)
+++ trunk/src/config/common/schema/hornetq-configuration.xsd 2009-11-12 21:08:49 UTC (rev 8277)
@@ -148,11 +148,11 @@
</xsd:element>
<xsd:element maxOccurs="1" minOccurs="0" name="journal-type" type="journalType">
</xsd:element>
- <xsd:element maxOccurs="1" minOccurs="0" name="journal-aio-buffer-timeout" type="xsd:long">
+ <xsd:element maxOccurs="1" minOccurs="0" name="journal-buffer-timeout" type="xsd:long">
</xsd:element>
- <xsd:element maxOccurs="1" minOccurs="0" name="journal-aio-buffer-size" type="xsd:long">
+ <xsd:element maxOccurs="1" minOccurs="0" name="journal-buffer-size" type="xsd:long">
</xsd:element>
- <xsd:element maxOccurs="1" minOccurs="0" name="journal-aio-flush-on-sync" type="xsd:boolean">
+ <xsd:element maxOccurs="1" minOccurs="0" name="journal-flush-on-sync" type="xsd:boolean">
</xsd:element>
<xsd:element maxOccurs="1" minOccurs="0" name="journal-sync-transactional" type="xsd:boolean">
</xsd:element>
Modified: trunk/src/main/org/hornetq/core/config/Configuration.java
===================================================================
--- trunk/src/main/org/hornetq/core/config/Configuration.java 2009-11-12 16:13:22 UTC (rev 8276)
+++ trunk/src/main/org/hornetq/core/config/Configuration.java 2009-11-12 21:08:49 UTC (rev 8277)
@@ -228,17 +228,17 @@
void setJournalMaxAIO(int maxAIO);
- void setAIOBufferSize(int size);
+ void setJournalBufferSize(int size);
- int getAIOBufferSize();
+ int getJournalBufferSize();
- void setAIOBufferTimeout(int timeout);
+ void setJournalBufferTimeout(int timeout);
- int getAIOBufferTimeout();
+ int getJournalBufferTimeout();
- void setAIOFlushOnSync(boolean flush);
+ void setJournalFlushOnSync(boolean flush);
- boolean isAIOFlushOnSync();
+ boolean isJournalFlushOnSync();
boolean isCreateBindingsDir();
Modified: trunk/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java 2009-11-12 16:13:22 UTC (rev 8276)
+++ trunk/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java 2009-11-12 21:08:49 UTC (rev 8277)
@@ -102,11 +102,11 @@
public static final int DEFAULT_JOURNAL_MAX_AIO = 500;
- public static final boolean DEFAULT_JOURNAL_AIO_FLUSH_SYNC = false;
+ public static final boolean DEFAULT_JOURNAL_FLUSH_SYNC = false;
- public static final int DEFAULT_JOURNAL_AIO_BUFFER_TIMEOUT = 20000;
+ public static final int DEFAULT_JOURNAL_BUFFER_TIMEOUT = 20000;
- public static final int DEFAULT_JOURNAL_AIO_BUFFER_SIZE = 128 * 1024;
+ public static final int DEFAULT_JOURNAL_BUFFER_SIZE = 128 * 1024;
public static final boolean DEFAULT_JOURNAL_LOG_WRITE_RATE = false;
@@ -268,11 +268,11 @@
protected int journalMaxAIO = DEFAULT_JOURNAL_MAX_AIO;
- protected boolean journalAIOFlushSync = DEFAULT_JOURNAL_AIO_FLUSH_SYNC;
+ protected boolean journalFlushSync = DEFAULT_JOURNAL_FLUSH_SYNC;
- protected int journalAIOBufferTimeout = DEFAULT_JOURNAL_AIO_BUFFER_TIMEOUT;
+ protected int journalBufferTimeout = DEFAULT_JOURNAL_BUFFER_TIMEOUT;
- protected int journalAIOBufferSize = DEFAULT_JOURNAL_AIO_BUFFER_SIZE;
+ protected int journalBufferSize = DEFAULT_JOURNAL_BUFFER_SIZE;
protected boolean logJournalWriteRate = DEFAULT_JOURNAL_LOG_WRITE_RATE;
@@ -815,34 +815,34 @@
jmxDomain = domain;
}
- public void setAIOBufferTimeout(int timeout)
+ public void setJournalBufferTimeout(int timeout)
{
- this.journalAIOBufferTimeout = timeout;
+ this.journalBufferTimeout = timeout;
}
- public int getAIOBufferTimeout()
+ public int getJournalBufferTimeout()
{
- return journalAIOBufferTimeout;
+ return journalBufferTimeout;
}
- public void setAIOFlushOnSync(boolean flush)
+ public void setJournalFlushOnSync(boolean flush)
{
- journalAIOFlushSync = flush;
+ journalFlushSync = flush;
}
- public boolean isAIOFlushOnSync()
+ public boolean isJournalFlushOnSync()
{
- return journalAIOFlushSync;
+ return journalFlushSync;
}
- public int getAIOBufferSize()
+ public int getJournalBufferSize()
{
- return journalAIOBufferSize;
+ return journalBufferSize;
}
- public void setAIOBufferSize(int size)
+ public void setJournalBufferSize(int size)
{
- this.journalAIOBufferSize = size;
+ this.journalBufferSize = size;
}
public String getLargeMessagesDirectory()
@@ -983,11 +983,11 @@
return false;
if (jmxManagementEnabled != other.jmxManagementEnabled)
return false;
- if (journalAIOBufferSize != other.journalAIOBufferSize)
+ if (journalBufferSize != other.journalBufferSize)
return false;
- if (journalAIOBufferTimeout != other.journalAIOBufferTimeout)
+ if (journalBufferTimeout != other.journalBufferTimeout)
return false;
- if (journalAIOFlushSync != other.journalAIOFlushSync)
+ if (journalFlushSync != other.journalFlushSync)
return false;
if (journalCompactMinFiles != other.journalCompactMinFiles)
return false;
Modified: trunk/src/main/org/hornetq/core/config/impl/FileConfiguration.java
===================================================================
--- trunk/src/main/org/hornetq/core/config/impl/FileConfiguration.java 2009-11-12 16:13:22 UTC (rev 8276)
+++ trunk/src/main/org/hornetq/core/config/impl/FileConfiguration.java 2009-11-12 21:08:49 UTC (rev 8277)
@@ -335,11 +335,11 @@
journalFileSize = getInteger(e, "journal-file-size", journalFileSize, GT_ZERO);
- journalAIOFlushSync = getBoolean(e, "journal-aio-flush-on-sync", DEFAULT_JOURNAL_AIO_FLUSH_SYNC);
+ journalFlushSync = getBoolean(e, "journal-flush-on-sync", DEFAULT_JOURNAL_FLUSH_SYNC);
- journalAIOBufferTimeout = getInteger(e, "journal-aio-buffer-timeout", DEFAULT_JOURNAL_AIO_BUFFER_TIMEOUT, GT_ZERO);
+ journalBufferTimeout = getInteger(e, "journal-buffer-timeout", DEFAULT_JOURNAL_BUFFER_TIMEOUT, GT_ZERO);
- journalAIOBufferSize = getInteger(e, "journal-aio-buffer-size", DEFAULT_JOURNAL_AIO_BUFFER_SIZE, GT_ZERO);
+ journalBufferSize = getInteger(e, "journal-buffer-size", DEFAULT_JOURNAL_BUFFER_SIZE, GT_ZERO);
journalMinFiles = getInteger(e, "journal-min-files", journalMinFiles, GT_ZERO);
Modified: trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java 2009-11-12 16:13:22 UTC (rev 8276)
+++ trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java 2009-11-12 21:08:49 UTC (rev 8277)
@@ -63,9 +63,9 @@
public AIOSequentialFileFactory(final String journalDir)
{
this(journalDir,
- ConfigurationImpl.DEFAULT_JOURNAL_AIO_BUFFER_SIZE,
- ConfigurationImpl.DEFAULT_JOURNAL_AIO_BUFFER_TIMEOUT,
- ConfigurationImpl.DEFAULT_JOURNAL_AIO_FLUSH_SYNC,
+ ConfigurationImpl.DEFAULT_JOURNAL_BUFFER_SIZE,
+ ConfigurationImpl.DEFAULT_JOURNAL_BUFFER_TIMEOUT,
+ ConfigurationImpl.DEFAULT_JOURNAL_FLUSH_SYNC,
false);
}
Modified: trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFileFactory.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFileFactory.java 2009-11-12 16:13:22 UTC (rev 8276)
+++ trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFileFactory.java 2009-11-12 21:08:49 UTC (rev 8277)
@@ -37,9 +37,9 @@
{
this(journalDir,
false,
- ConfigurationImpl.DEFAULT_JOURNAL_AIO_BUFFER_SIZE,
- ConfigurationImpl.DEFAULT_JOURNAL_AIO_BUFFER_TIMEOUT,
- ConfigurationImpl.DEFAULT_JOURNAL_AIO_FLUSH_SYNC,
+ ConfigurationImpl.DEFAULT_JOURNAL_BUFFER_SIZE,
+ ConfigurationImpl.DEFAULT_JOURNAL_BUFFER_TIMEOUT,
+ ConfigurationImpl.DEFAULT_JOURNAL_FLUSH_SYNC,
false);
}
@@ -47,9 +47,9 @@
{
this(journalDir,
buffered,
- ConfigurationImpl.DEFAULT_JOURNAL_AIO_BUFFER_SIZE,
- ConfigurationImpl.DEFAULT_JOURNAL_AIO_BUFFER_TIMEOUT,
- ConfigurationImpl.DEFAULT_JOURNAL_AIO_FLUSH_SYNC,
+ ConfigurationImpl.DEFAULT_JOURNAL_BUFFER_SIZE,
+ ConfigurationImpl.DEFAULT_JOURNAL_BUFFER_TIMEOUT,
+ ConfigurationImpl.DEFAULT_JOURNAL_FLUSH_SYNC,
false);
}
Modified: trunk/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java 2009-11-12 16:13:22 UTC (rev 8276)
+++ trunk/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java 2009-11-12 21:08:49 UTC (rev 8277)
@@ -151,12 +151,12 @@
public int getAIOBufferSize()
{
- return configuration.getAIOBufferSize();
+ return configuration.getJournalBufferSize();
}
public int getAIOBufferTimeout()
{
- return configuration.getAIOBufferTimeout();
+ return configuration.getJournalBufferTimeout();
}
public String getJournalDirectory()
Modified: trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-11-12 16:13:22 UTC (rev 8276)
+++ trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-11-12 21:08:49 UTC (rev 8277)
@@ -229,17 +229,17 @@
log.warn("AIO wasn't located on this platform, it will fall back to using pure Java NIO. " + "If your platform is Linux, install LibAIO to enable the AIO journal");
journalFF = new NIOSequentialFileFactory(journalDir,
true,
- config.getAIOBufferSize(),
- config.getAIOBufferTimeout(),
- config.isAIOFlushOnSync(),
+ config.getJournalBufferSize(),
+ config.getJournalBufferTimeout(),
+ config.isJournalFlushOnSync(),
config.isLogJournalWriteRate());
}
else
{
journalFF = new AIOSequentialFileFactory(journalDir,
- config.getAIOBufferSize(),
- config.getAIOBufferTimeout(),
- config.isAIOFlushOnSync(),
+ config.getJournalBufferSize(),
+ config.getJournalBufferTimeout(),
+ config.isJournalFlushOnSync(),
config.isLogJournalWriteRate());
log.info("AIO loaded successfully");
}
Modified: trunk/tests/config/ConfigurationTest-full-config.xml
===================================================================
--- trunk/tests/config/ConfigurationTest-full-config.xml 2009-11-12 16:13:22 UTC (rev 8276)
+++ trunk/tests/config/ConfigurationTest-full-config.xml 2009-11-12 21:08:49 UTC (rev 8277)
@@ -37,11 +37,11 @@
<journal-directory>somedir2</journal-directory>
<create-journal-dir>false</create-journal-dir>
<journal-type>NIO</journal-type>
- <journal-aio-flush-on-sync>true</journal-aio-flush-on-sync>
<journal-compact-min-files>123</journal-compact-min-files>
<journal-compact-percentage>33</journal-compact-percentage>
- <journal-aio-buffer-timeout>1000</journal-aio-buffer-timeout>
- <journal-aio-buffer-size>10000</journal-aio-buffer-size>
+ <journal-flush-on-sync>true</journal-flush-on-sync>
+ <journal-buffer-timeout>1000</journal-buffer-timeout>
+ <journal-buffer-size>10000</journal-buffer-size>
<journal-sync-transactional>false</journal-sync-transactional>
<journal-sync-non-transactional>true</journal-sync-non-transactional>
<journal-file-size>12345678</journal-file-size>
Modified: trunk/tests/src/org/hornetq/tests/integration/journal/AIOJournalCompactTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/journal/AIOJournalCompactTest.java 2009-11-12 16:13:22 UTC (rev 8276)
+++ trunk/tests/src/org/hornetq/tests/integration/journal/AIOJournalCompactTest.java 2009-11-12 21:08:49 UTC (rev 8277)
@@ -63,7 +63,7 @@
file.mkdir();
return new AIOSequentialFileFactory(getTestDir(),
- ConfigurationImpl.DEFAULT_JOURNAL_AIO_BUFFER_SIZE,
+ ConfigurationImpl.DEFAULT_JOURNAL_BUFFER_SIZE,
1000000,
true,
false
Modified: trunk/tests/src/org/hornetq/tests/integration/journal/AIOJournalImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/journal/AIOJournalImplTest.java 2009-11-12 16:13:22 UTC (rev 8276)
+++ trunk/tests/src/org/hornetq/tests/integration/journal/AIOJournalImplTest.java 2009-11-12 21:08:49 UTC (rev 8277)
@@ -70,7 +70,7 @@
file.mkdir();
return new AIOSequentialFileFactory(getTestDir(),
- ConfigurationImpl.DEFAULT_JOURNAL_AIO_BUFFER_SIZE,
+ ConfigurationImpl.DEFAULT_JOURNAL_BUFFER_SIZE,
1000000,
true,
false
Modified: trunk/tests/src/org/hornetq/tests/integration/journal/ValidateTransactionHealthTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/journal/ValidateTransactionHealthTest.java 2009-11-12 16:13:22 UTC (rev 8276)
+++ trunk/tests/src/org/hornetq/tests/integration/journal/ValidateTransactionHealthTest.java 2009-11-12 21:08:49 UTC (rev 8277)
@@ -388,9 +388,9 @@
if (factoryType.equals("aio"))
{
return new AIOSequentialFileFactory(directory,
- ConfigurationImpl.DEFAULT_JOURNAL_AIO_BUFFER_SIZE,
- ConfigurationImpl.DEFAULT_JOURNAL_AIO_BUFFER_TIMEOUT,
- ConfigurationImpl.DEFAULT_JOURNAL_AIO_FLUSH_SYNC,
+ ConfigurationImpl.DEFAULT_JOURNAL_BUFFER_SIZE,
+ ConfigurationImpl.DEFAULT_JOURNAL_BUFFER_TIMEOUT,
+ ConfigurationImpl.DEFAULT_JOURNAL_FLUSH_SYNC,
false);
}
else
Modified: trunk/tests/src/org/hornetq/tests/integration/management/HornetQServerControlTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/management/HornetQServerControlTest.java 2009-11-12 16:13:22 UTC (rev 8276)
+++ trunk/tests/src/org/hornetq/tests/integration/management/HornetQServerControlTest.java 2009-11-12 21:08:49 UTC (rev 8277)
@@ -107,8 +107,8 @@
assertEquals(conf.getJournalFileSize(), serverControl.getJournalFileSize());
assertEquals(conf.getJournalMinFiles(), serverControl.getJournalMinFiles());
assertEquals(conf.getJournalMaxAIO(), serverControl.getJournalMaxAIO());
- assertEquals(conf.getAIOBufferSize(), serverControl.getAIOBufferSize());
- assertEquals(conf.getAIOBufferTimeout(), serverControl.getAIOBufferTimeout());
+ assertEquals(conf.getJournalBufferSize(), serverControl.getAIOBufferSize());
+ assertEquals(conf.getJournalBufferTimeout(), serverControl.getAIOBufferTimeout());
assertEquals(conf.isCreateBindingsDir(), serverControl.isCreateBindingsDir());
assertEquals(conf.isCreateJournalDir(), serverControl.isCreateJournalDir());
assertEquals(conf.getPagingDirectory(), serverControl.getPagingDirectory());
Modified: trunk/tests/src/org/hornetq/tests/unit/core/config/impl/ConfigurationImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/config/impl/ConfigurationImplTest.java 2009-11-12 16:13:22 UTC (rev 8276)
+++ trunk/tests/src/org/hornetq/tests/unit/core/config/impl/ConfigurationImplTest.java 2009-11-12 21:08:49 UTC (rev 8277)
@@ -82,8 +82,8 @@
assertEquals(ConfigurationImpl.DEFAULT_PAGING_DIR, conf.getPagingDirectory());
assertEquals(ConfigurationImpl.DEFAULT_LARGE_MESSAGES_DIR, conf.getLargeMessagesDirectory());
assertEquals(ConfigurationImpl.DEFAULT_JOURNAL_COMPACT_PERCENTAGE, conf.getJournalCompactPercentage());
- assertEquals(ConfigurationImpl.DEFAULT_JOURNAL_AIO_FLUSH_SYNC, conf.isAIOFlushOnSync());
- assertEquals(ConfigurationImpl.DEFAULT_JOURNAL_AIO_BUFFER_TIMEOUT, conf.getAIOBufferTimeout());
+ assertEquals(ConfigurationImpl.DEFAULT_JOURNAL_FLUSH_SYNC, conf.isJournalFlushOnSync());
+ assertEquals(ConfigurationImpl.DEFAULT_JOURNAL_BUFFER_TIMEOUT, conf.getJournalBufferTimeout());
assertEquals(ConfigurationImpl.DEFAULT_JOURNAL_LOG_WRITE_RATE, conf.isLogJournalWriteRate());
assertEquals(ConfigurationImpl.DEFAULT_JOURNAL_PERF_BLAST_PAGES, conf.getJournalPerfBlastPages());
assertEquals(ConfigurationImpl.DEFAULT_MESSAGE_COUNTER_ENABLED, conf.isMessageCounterEnabled());
@@ -244,16 +244,16 @@
assertEquals(i, conf.getJournalCompactPercentage());
i = randomInt();
- conf.setAIOBufferSize(i);
- assertEquals(i, conf.getAIOBufferSize());
+ conf.setJournalBufferSize(i);
+ assertEquals(i, conf.getJournalBufferSize());
i = randomInt();
- conf.setAIOBufferTimeout(i);
- assertEquals(i, conf.getAIOBufferTimeout());
+ conf.setJournalBufferTimeout(i);
+ assertEquals(i, conf.getJournalBufferTimeout());
b = randomBoolean();
- conf.setAIOFlushOnSync(b);
- assertEquals(b, conf.isAIOFlushOnSync());
+ conf.setJournalFlushOnSync(b);
+ assertEquals(b, conf.isJournalFlushOnSync());
b = randomBoolean();
conf.setLogJournalWriteRate(b);
@@ -461,16 +461,16 @@
assertEquals(i, conf.getJournalCompactPercentage());
i = randomInt();
- conf.setAIOBufferSize(i);
- assertEquals(i, conf.getAIOBufferSize());
+ conf.setJournalBufferSize(i);
+ assertEquals(i, conf.getJournalBufferSize());
i = randomInt();
- conf.setAIOBufferTimeout(i);
- assertEquals(i, conf.getAIOBufferTimeout());
+ conf.setJournalBufferTimeout(i);
+ assertEquals(i, conf.getJournalBufferTimeout());
b = randomBoolean();
- conf.setAIOFlushOnSync(b);
- assertEquals(b, conf.isAIOFlushOnSync());
+ conf.setJournalFlushOnSync(b);
+ assertEquals(b, conf.isJournalFlushOnSync());
b = randomBoolean();
conf.setLogJournalWriteRate(b);
Modified: trunk/tests/src/org/hornetq/tests/unit/core/config/impl/DefaultsFileConfigurationTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/config/impl/DefaultsFileConfigurationTest.java 2009-11-12 16:13:22 UTC (rev 8276)
+++ trunk/tests/src/org/hornetq/tests/unit/core/config/impl/DefaultsFileConfigurationTest.java 2009-11-12 21:08:49 UTC (rev 8277)
@@ -107,9 +107,9 @@
assertEquals(ConfigurationImpl.DEFAULT_JOURNAL_MAX_AIO, conf.getJournalMaxAIO());
- assertEquals(ConfigurationImpl.DEFAULT_JOURNAL_AIO_BUFFER_TIMEOUT, conf.getAIOBufferTimeout());
+ assertEquals(ConfigurationImpl.DEFAULT_JOURNAL_BUFFER_TIMEOUT, conf.getJournalBufferTimeout());
- assertEquals(ConfigurationImpl.DEFAULT_JOURNAL_AIO_BUFFER_SIZE, conf.getAIOBufferSize());
+ assertEquals(ConfigurationImpl.DEFAULT_JOURNAL_BUFFER_SIZE, conf.getJournalBufferSize());
assertEquals(ConfigurationImpl.DEFAULT_CREATE_BINDINGS_DIR, conf.isCreateBindingsDir());
Modified: trunk/tests/src/org/hornetq/tests/unit/core/config/impl/FileConfigurationTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/config/impl/FileConfigurationTest.java 2009-11-12 16:13:22 UTC (rev 8276)
+++ trunk/tests/src/org/hornetq/tests/unit/core/config/impl/FileConfigurationTest.java 2009-11-12 21:08:49 UTC (rev 8277)
@@ -70,9 +70,9 @@
assertEquals("somedir2", conf.getJournalDirectory());
assertEquals(false, conf.isCreateJournalDir());
assertEquals(JournalType.NIO, conf.getJournalType());
- assertEquals(10000, conf.getAIOBufferSize());
- assertEquals(true, conf.isAIOFlushOnSync());
- assertEquals(1000, conf.getAIOBufferTimeout());
+ assertEquals(10000, conf.getJournalBufferSize());
+ assertEquals(true, conf.isJournalFlushOnSync());
+ assertEquals(1000, conf.getJournalBufferTimeout());
assertEquals(false, conf.isJournalSyncTransactional());
assertEquals(true, conf.isJournalSyncNonTransactional());
assertEquals(12345678, conf.getJournalFileSize());
15 years, 1 month
JBoss hornetq SVN: r8276 - in trunk: src/main/org/hornetq/core/journal and 3 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-11-12 11:13:22 -0500 (Thu, 12 Nov 2009)
New Revision: 8276
Added:
trunk/src/main/org/hornetq/core/journal/IOCompletion.java
Removed:
trunk/src/main/org/hornetq/core/journal/IOCallback.java
Modified:
trunk/src/main/org/hornetq/core/asyncio/AsynchronousFile.java
trunk/src/main/org/hornetq/core/journal/SequentialFile.java
trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java
trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java
trunk/src/main/org/hornetq/core/journal/impl/DummyCallback.java
trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java
trunk/src/main/org/hornetq/core/journal/impl/SimpleWaitIOCallback.java
trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java
trunk/src/main/org/hornetq/core/journal/impl/TimedBufferObserver.java
trunk/src/main/org/hornetq/core/journal/impl/TransactionCallback.java
trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/TimedBufferTest.java
trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
Log:
Simply renaming journal.IOCallback to IOCompletion to better distinguish it from AIOCallback
Modified: trunk/src/main/org/hornetq/core/asyncio/AsynchronousFile.java
===================================================================
--- trunk/src/main/org/hornetq/core/asyncio/AsynchronousFile.java 2009-11-12 15:57:34 UTC (rev 8275)
+++ trunk/src/main/org/hornetq/core/asyncio/AsynchronousFile.java 2009-11-12 16:13:22 UTC (rev 8276)
@@ -30,7 +30,7 @@
*
* Note: If you are using a native Linux implementation, maxIO can't be higher than what's defined on /proc/sys/fs/aio-max-nr, or you would get an error
* @param fileName
- * @param maxIO The number of max concurrent asynchrnous IO operations. It has to be balanced between the size of your writes and the capacity of your disk.
+ * @param maxIO The number of max concurrent asynchronous IO operations. It has to be balanced between the size of your writes and the capacity of your disk.
* @throws HornetQException
*/
void open(String fileName, int maxIO) throws HornetQException;
Deleted: trunk/src/main/org/hornetq/core/journal/IOCallback.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/IOCallback.java 2009-11-12 15:57:34 UTC (rev 8275)
+++ trunk/src/main/org/hornetq/core/journal/IOCallback.java 2009-11-12 16:13:22 UTC (rev 8276)
@@ -1,29 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.journal;
-
-import org.hornetq.core.asyncio.AIOCallback;
-
-/**
- *
- * This class is just a direct extension of AIOCallback.
- * Just to avoid the direct dependency of org.hornetq.core.asynciio.AIOCallback from the journal.
- *
- * @author <a href="mailto:clebert.suconic@jboss.com">Clebert Suconic</a>
- *
- */
-public interface IOCallback extends AIOCallback
-{
- void waitCompletion() throws Exception;
-}
Copied: trunk/src/main/org/hornetq/core/journal/IOCompletion.java (from rev 8274, trunk/src/main/org/hornetq/core/journal/IOCallback.java)
===================================================================
--- trunk/src/main/org/hornetq/core/journal/IOCompletion.java (rev 0)
+++ trunk/src/main/org/hornetq/core/journal/IOCompletion.java 2009-11-12 16:13:22 UTC (rev 8276)
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.journal;
+
+import org.hornetq.core.asyncio.AIOCallback;
+
+/**
+ *
+ * This class is just a direct extension of AIOCallback.
+ * Just to avoid the direct dependency of org.hornetq.core.asynciio.AIOCallback from the journal.
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.com">Clebert Suconic</a>
+ *
+ */
+public interface IOCompletion extends AIOCallback
+{
+ void waitCompletion() throws Exception;
+}
Modified: trunk/src/main/org/hornetq/core/journal/SequentialFile.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/SequentialFile.java 2009-11-12 15:57:34 UTC (rev 8275)
+++ trunk/src/main/org/hornetq/core/journal/SequentialFile.java 2009-11-12 16:13:22 UTC (rev 8276)
@@ -56,17 +56,17 @@
void delete() throws Exception;
- void write(HornetQBuffer bytes, boolean sync, IOCallback callback) throws Exception;
+ void write(HornetQBuffer bytes, boolean sync, IOCompletion callback) throws Exception;
void write(HornetQBuffer bytes, boolean sync) throws Exception;
/** Write directly to the file without using any buffer */
- void writeDirect(ByteBuffer bytes, boolean sync, IOCallback callback);
+ void writeDirect(ByteBuffer bytes, boolean sync, IOCompletion callback);
/** Write directly to the file without using any buffer */
void writeDirect(ByteBuffer bytes, boolean sync) throws Exception;
- int read(ByteBuffer bytes, IOCallback callback) throws Exception;
+ int read(ByteBuffer bytes, IOCompletion callback) throws Exception;
int read(ByteBuffer bytes) throws Exception;
Modified: trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java 2009-11-12 15:57:34 UTC (rev 8275)
+++ trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java 2009-11-12 16:13:22 UTC (rev 8276)
@@ -25,7 +25,7 @@
import org.hornetq.core.asyncio.AsynchronousFile;
import org.hornetq.core.asyncio.BufferCallback;
import org.hornetq.core.asyncio.impl.AsynchronousFileImpl;
-import org.hornetq.core.journal.IOCallback;
+import org.hornetq.core.journal.IOCompletion;
import org.hornetq.core.journal.SequentialFileFactory;
import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.logging.Logger;
@@ -214,7 +214,7 @@
aioFile.setBufferCallback(callback);
}
- public int read(final ByteBuffer bytes, final IOCallback callback) throws Exception
+ public int read(final ByteBuffer bytes, final IOCompletion callback) throws Exception
{
int bytesToRead = bytes.limit();
@@ -229,7 +229,7 @@
public int read(final ByteBuffer bytes) throws Exception
{
- IOCallback waitCompletion = SimpleWaitIOCallback.getInstance();
+ IOCompletion waitCompletion = SimpleWaitIOCallback.getInstance();
int bytesRead = read(bytes, waitCompletion);
@@ -281,7 +281,7 @@
{
if (sync)
{
- IOCallback completion = SimpleWaitIOCallback.getInstance();
+ IOCompletion completion = SimpleWaitIOCallback.getInstance();
writeDirect(bytes, true, completion);
@@ -298,7 +298,7 @@
*
* @param sync Not used on AIO
* */
- public void writeDirect(final ByteBuffer bytes, final boolean sync, IOCallback callback)
+ public void writeDirect(final ByteBuffer bytes, final boolean sync, IOCompletion callback)
{
final int bytesToWrite = factory.calculateBlockSize(bytes.limit());
Modified: trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java 2009-11-12 15:57:34 UTC (rev 8275)
+++ trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java 2009-11-12 16:13:22 UTC (rev 8276)
@@ -18,7 +18,7 @@
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
-import org.hornetq.core.journal.IOCallback;
+import org.hornetq.core.journal.IOCompletion;
import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.journal.SequentialFileFactory;
import org.hornetq.core.logging.Logger;
@@ -166,7 +166,7 @@
}
- public void write(final HornetQBuffer bytes, final boolean sync, final IOCallback callback) throws Exception
+ public void write(final HornetQBuffer bytes, final boolean sync, final IOCompletion callback) throws Exception
{
if (timedBuffer != null)
{
@@ -185,7 +185,7 @@
{
if (sync)
{
- IOCallback completion = SimpleWaitIOCallback.getInstance();
+ IOCompletion completion = SimpleWaitIOCallback.getInstance();
write(bytes, true, completion);
@@ -213,18 +213,18 @@
// Inner classes -------------------------------------------------
- protected static class DelegateCallback implements IOCallback
+ protected static class DelegateCallback implements IOCompletion
{
- final List<IOCallback> delegates;
+ final List<IOCompletion> delegates;
- DelegateCallback(List<IOCallback> delegates)
+ DelegateCallback(List<IOCompletion> delegates)
{
this.delegates = delegates;
}
public void done()
{
- for (IOCallback callback : delegates)
+ for (IOCompletion callback : delegates)
{
try
{
@@ -239,7 +239,7 @@
public void onError(int errorCode, String errorMessage)
{
- for (IOCallback callback : delegates)
+ for (IOCompletion callback : delegates)
{
try
{
@@ -259,7 +259,7 @@
protected class LocalBufferObserver implements TimedBufferObserver
{
- public void flushBuffer(ByteBuffer buffer, List<IOCallback> callbacks)
+ public void flushBuffer(ByteBuffer buffer, List<IOCompletion> callbacks)
{
buffer.flip();
Modified: trunk/src/main/org/hornetq/core/journal/impl/DummyCallback.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/DummyCallback.java 2009-11-12 15:57:34 UTC (rev 8275)
+++ trunk/src/main/org/hornetq/core/journal/impl/DummyCallback.java 2009-11-12 16:13:22 UTC (rev 8276)
@@ -14,7 +14,7 @@
package org.hornetq.core.journal.impl;
-import org.hornetq.core.journal.IOCallback;
+import org.hornetq.core.journal.IOCompletion;
import org.hornetq.core.logging.Logger;
/**
@@ -24,13 +24,13 @@
*
*
*/
-public class DummyCallback implements IOCallback
+public class DummyCallback implements IOCompletion
{
private static DummyCallback instance = new DummyCallback();
private static final Logger log = Logger.getLogger(SimpleWaitIOCallback.class);
- public static IOCallback getInstance()
+ public static IOCompletion getInstance()
{
return instance;
}
Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2009-11-12 15:57:34 UTC (rev 8275)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2009-11-12 16:13:22 UTC (rev 8276)
@@ -44,7 +44,7 @@
import org.hornetq.core.buffers.ChannelBuffer;
import org.hornetq.core.buffers.ChannelBuffers;
import org.hornetq.core.journal.EncodingSupport;
-import org.hornetq.core.journal.IOCallback;
+import org.hornetq.core.journal.IOCompletion;
import org.hornetq.core.journal.JournalLoadInformation;
import org.hornetq.core.journal.LoaderCallback;
import org.hornetq.core.journal.PreparedTransactionInfo;
@@ -852,7 +852,7 @@
throw new IllegalStateException("Journal must be loaded first");
}
- IOCallback callback = null;
+ IOCompletion callback = null;
compactingLock.readLock().lock();
@@ -901,7 +901,7 @@
throw new IllegalStateException("Journal must be loaded first");
}
- IOCallback callback = null;
+ IOCompletion callback = null;
compactingLock.readLock().lock();
@@ -967,7 +967,7 @@
compactingLock.readLock().lock();
- IOCallback callback = null;
+ IOCompletion callback = null;
try
{
@@ -2833,7 +2833,7 @@
final boolean completeTransaction,
final boolean sync,
final JournalTransaction tx,
- IOCallback callback) throws Exception
+ IOCompletion callback) throws Exception
{
try
{
@@ -3233,7 +3233,7 @@
return tx;
}
- private IOCallback getSyncCallback(final boolean sync)
+ private IOCompletion getSyncCallback(final boolean sync)
{
if (fileFactory.isSupportsCallbacks())
{
Modified: trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java 2009-11-12 15:57:34 UTC (rev 8275)
+++ trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java 2009-11-12 16:13:22 UTC (rev 8276)
@@ -19,7 +19,7 @@
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
-import org.hornetq.core.journal.IOCallback;
+import org.hornetq.core.journal.IOCompletion;
import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.journal.SequentialFileFactory;
import org.hornetq.core.logging.Logger;
@@ -133,7 +133,7 @@
return read(bytes, null);
}
- public int read(final ByteBuffer bytes, final IOCallback callback) throws Exception
+ public int read(final ByteBuffer bytes, final IOCompletion callback) throws Exception
{
try
{
@@ -197,7 +197,7 @@
return new NIOSequentialFile(factory, getFile());
}
- public void writeDirect(final ByteBuffer bytes, final boolean sync, final IOCallback callback)
+ public void writeDirect(final ByteBuffer bytes, final boolean sync, final IOCompletion callback)
{
if (callback == null)
{
@@ -226,7 +226,7 @@
* @throws IOException
* @throws Exception
*/
- private void internalWrite(final ByteBuffer bytes, final boolean sync, final IOCallback callback) throws Exception
+ private void internalWrite(final ByteBuffer bytes, final boolean sync, final IOCompletion callback) throws Exception
{
position.addAndGet(bytes.limit());
Modified: trunk/src/main/org/hornetq/core/journal/impl/SimpleWaitIOCallback.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/SimpleWaitIOCallback.java 2009-11-12 15:57:34 UTC (rev 8275)
+++ trunk/src/main/org/hornetq/core/journal/impl/SimpleWaitIOCallback.java 2009-11-12 16:13:22 UTC (rev 8276)
@@ -16,7 +16,7 @@
import java.util.concurrent.CountDownLatch;
import org.hornetq.core.exception.HornetQException;
-import org.hornetq.core.journal.IOCallback;
+import org.hornetq.core.journal.IOCompletion;
import org.hornetq.core.logging.Logger;
/**
@@ -26,7 +26,7 @@
*
*
*/
-public class SimpleWaitIOCallback implements IOCallback
+public class SimpleWaitIOCallback implements IOCompletion
{
private static final Logger log = Logger.getLogger(SimpleWaitIOCallback.class);
@@ -37,7 +37,7 @@
private volatile int errorCode = 0;
- public static IOCallback getInstance()
+ public static IOCompletion getInstance()
{
return new SimpleWaitIOCallback();
}
Modified: trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java 2009-11-12 15:57:34 UTC (rev 8275)
+++ trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java 2009-11-12 16:13:22 UTC (rev 8276)
@@ -22,7 +22,7 @@
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.hornetq.core.buffers.ChannelBuffers;
-import org.hornetq.core.journal.IOCallback;
+import org.hornetq.core.journal.IOCompletion;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.remoting.spi.HornetQBuffer;
import org.hornetq.utils.VariableLatch;
@@ -56,7 +56,7 @@
private int bufferLimit = 0;
- private List<IOCallback> callbacks;
+ private List<IOCompletion> callbacks;
private final Lock lock = new ReentrantReadWriteLock().writeLock();
@@ -106,7 +106,7 @@
buffer.clear();
bufferLimit = 0;
- callbacks = new ArrayList<IOCallback>();
+ callbacks = new ArrayList<IOCompletion>();
this.flushOnSync = flushOnSync;
latchTimer.up();
this.timeout = timeout;
@@ -225,7 +225,7 @@
}
}
- public synchronized void addBytes(final byte[] bytes, final boolean sync, final IOCallback callback)
+ public synchronized void addBytes(final byte[] bytes, final boolean sync, final IOCompletion callback)
{
if (buffer.writerIndex() == 0)
{
@@ -283,7 +283,7 @@
bufferObserver.flushBuffer(directBuffer, callbacks);
- callbacks = new ArrayList<IOCallback>();
+ callbacks = new ArrayList<IOCompletion>();
active = false;
pendingSync = false;
Modified: trunk/src/main/org/hornetq/core/journal/impl/TimedBufferObserver.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/TimedBufferObserver.java 2009-11-12 15:57:34 UTC (rev 8275)
+++ trunk/src/main/org/hornetq/core/journal/impl/TimedBufferObserver.java 2009-11-12 16:13:22 UTC (rev 8276)
@@ -17,7 +17,7 @@
import java.nio.ByteBuffer;
import java.util.List;
-import org.hornetq.core.journal.IOCallback;
+import org.hornetq.core.journal.IOCompletion;
/**
* A TimedBufferObserver
@@ -39,7 +39,7 @@
// Public --------------------------------------------------------
- public void flushBuffer(ByteBuffer buffer, List<IOCallback> callbacks);
+ public void flushBuffer(ByteBuffer buffer, List<IOCompletion> callbacks);
/** Return the number of remaining bytes that still fit on the observer (file) */
Modified: trunk/src/main/org/hornetq/core/journal/impl/TransactionCallback.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/TransactionCallback.java 2009-11-12 15:57:34 UTC (rev 8275)
+++ trunk/src/main/org/hornetq/core/journal/impl/TransactionCallback.java 2009-11-12 16:13:22 UTC (rev 8276)
@@ -14,7 +14,7 @@
package org.hornetq.core.journal.impl;
-import org.hornetq.core.journal.IOCallback;
+import org.hornetq.core.journal.IOCompletion;
import org.hornetq.utils.VariableLatch;
/**
@@ -24,7 +24,7 @@
*
*
*/
-public class TransactionCallback implements IOCallback
+public class TransactionCallback implements IOCompletion
{
private final VariableLatch countLatch = new VariableLatch();
Modified: trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/TimedBufferTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/TimedBufferTest.java 2009-11-12 15:57:34 UTC (rev 8275)
+++ trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/TimedBufferTest.java 2009-11-12 16:13:22 UTC (rev 8276)
@@ -18,7 +18,7 @@
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
-import org.hornetq.core.journal.IOCallback;
+import org.hornetq.core.journal.IOCompletion;
import org.hornetq.core.journal.impl.TimedBuffer;
import org.hornetq.core.journal.impl.TimedBufferObserver;
import org.hornetq.tests.util.UnitTestCase;
@@ -42,7 +42,7 @@
// Public --------------------------------------------------------
- IOCallback dummyCallback = new IOCallback()
+ IOCompletion dummyCallback = new IOCompletion()
{
public void done()
@@ -64,7 +64,7 @@
final AtomicInteger flushTimes = new AtomicInteger(0);
class TestObserver implements TimedBufferObserver
{
- public void flushBuffer(final ByteBuffer buffer, final List<IOCallback> callbacks)
+ public void flushBuffer(final ByteBuffer buffer, final List<IOCompletion> callbacks)
{
buffers.add(buffer);
flushTimes.incrementAndGet();
Modified: trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java 2009-11-12 15:57:34 UTC (rev 8275)
+++ trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java 2009-11-12 16:13:22 UTC (rev 8276)
@@ -20,7 +20,7 @@
import java.util.concurrent.ConcurrentHashMap;
import org.hornetq.core.asyncio.BufferCallback;
-import org.hornetq.core.journal.IOCallback;
+import org.hornetq.core.journal.IOCompletion;
import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.journal.SequentialFileFactory;
import org.hornetq.core.journal.impl.TimedBuffer;
@@ -241,11 +241,11 @@
final ByteBuffer bytes;
- final IOCallback callback;
+ final IOCompletion callback;
volatile boolean sendError;
- CallbackRunnable(final FakeSequentialFile file, final ByteBuffer bytes, final IOCallback callback)
+ CallbackRunnable(final FakeSequentialFile file, final ByteBuffer bytes, final IOCompletion callback)
{
this.file = file;
this.bytes = bytes;
@@ -399,7 +399,7 @@
return read(bytes, null);
}
- public int read(final ByteBuffer bytes, final IOCallback callback) throws Exception
+ public int read(final ByteBuffer bytes, final IOCompletion callback) throws Exception
{
if (!open)
{
@@ -439,7 +439,7 @@
return data.position();
}
- public synchronized void writeDirect(final ByteBuffer bytes, final boolean sync, final IOCallback callback)
+ public synchronized void writeDirect(final ByteBuffer bytes, final boolean sync, final IOCompletion callback)
{
if (!open)
{
@@ -605,7 +605,7 @@
/* (non-Javadoc)
* @see org.hornetq.core.journal.SequentialFile#write(org.hornetq.core.remoting.spi.HornetQBuffer, boolean, org.hornetq.core.journal.IOCallback)
*/
- public void write(HornetQBuffer bytes, boolean sync, IOCallback callback) throws Exception
+ public void write(HornetQBuffer bytes, boolean sync, IOCompletion callback) throws Exception
{
writeDirect(ByteBuffer.wrap(bytes.array()), sync, callback);
15 years, 1 month
JBoss hornetq SVN: r8275 - trunk/tests/src/org/hornetq/tests/integration/jms/connection.
by do-not-reply@jboss.org
Author: ataylor
Date: 2009-11-12 10:57:34 -0500 (Thu, 12 Nov 2009)
New Revision: 8275
Modified:
trunk/tests/src/org/hornetq/tests/integration/jms/connection/CloseConnectionOnGCTest.java
Log:
added latch instead to make sure server connection was closed
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/connection/CloseConnectionOnGCTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/connection/CloseConnectionOnGCTest.java 2009-11-12 15:46:28 UTC (rev 8274)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/connection/CloseConnectionOnGCTest.java 2009-11-12 15:57:34 UTC (rev 8275)
@@ -13,12 +13,17 @@
package org.hornetq.tests.integration.jms.connection;
import java.lang.ref.WeakReference;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.Iterator;
import javax.jms.Connection;
import javax.jms.Session;
import org.hornetq.core.config.TransportConfiguration;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.remoting.CloseListener;
+import org.hornetq.core.remoting.RemotingConnection;
import org.hornetq.jms.client.HornetQConnectionFactory;
import org.hornetq.tests.util.JMSTestBase;
@@ -53,7 +58,8 @@
super.tearDown();
}
-
+
+
public void testCloseOneConnectionOnGC() throws Exception
{
//Debug - don't remove this until intermittent failure with this test is fixed
@@ -67,11 +73,22 @@
WeakReference<Connection> wr = new WeakReference<Connection>(conn);
assertEquals(1, server.getRemotingService().getConnections().size());
-
+ final CountDownLatch latch = new CountDownLatch(1);
+ Iterator<RemotingConnection> connectionIterator = server.getRemotingService().getConnections().iterator();
+ connectionIterator.next().addCloseListener(new CloseListener()
+ {
+ public void connectionClosed()
+ {
+ latch.countDown();
+ }
+ });
+
conn = null;
+
checkWeakReferences(wr);
-
+
+ latch.await(5000, TimeUnit.MILLISECONDS);
assertEquals(0, server.getRemotingService().getConnections().size());
}
@@ -86,13 +103,30 @@
WeakReference<Connection> wr3 = new WeakReference<Connection>(conn3);
assertEquals(3, server.getRemotingService().getConnections().size());
-
+
+ final CountDownLatch latch = new CountDownLatch(3);
+ Iterator<RemotingConnection> connectionIterator = server.getRemotingService().getConnections().iterator();
+ while (connectionIterator.hasNext())
+ {
+ RemotingConnection remotingConnection = connectionIterator.next();
+ remotingConnection.addCloseListener(new CloseListener()
+ {
+ public void connectionClosed()
+ {
+ latch.countDown();
+ }
+ });
+ }
+
conn1 = null;
conn2 = null;
conn3 = null;
+
checkWeakReferences(wr1, wr2, wr3);
-
+
+ latch.await(5000, TimeUnit.MILLISECONDS);
+
assertEquals(0, server.getRemotingService().getConnections().size());
}
@@ -113,14 +147,28 @@
Session sess5 = conn3.createSession(false, Session.AUTO_ACKNOWLEDGE);
Session sess6 = conn3.createSession(false, Session.AUTO_ACKNOWLEDGE);
Session sess7 = conn3.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
+ final CountDownLatch latch = new CountDownLatch(3);
+ Iterator<RemotingConnection> connectionIterator = server.getRemotingService().getConnections().iterator();
+ while (connectionIterator.hasNext())
+ {
+ RemotingConnection remotingConnection = connectionIterator.next();
+ remotingConnection.addCloseListener(new CloseListener()
+ {
+ public void connectionClosed()
+ {
+ latch.countDown();
+ }
+ });
+ }
sess1 = sess2 = sess3 = sess4 = sess5 = sess6 = sess7 = null;
conn1 = null;
conn2 = null;
conn3 = null;
-
+
checkWeakReferences(wr1, wr2, wr3);
+
+ latch.await(5000, TimeUnit.MILLISECONDS);
assertEquals(0, server.getRemotingService().getConnections().size());
}
15 years, 1 month
JBoss hornetq SVN: r8274 - in trunk/src/main/org/hornetq/core/journal: impl and 1 other directory.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-11-12 10:46:28 -0500 (Thu, 12 Nov 2009)
New Revision: 8274
Modified:
trunk/src/main/org/hornetq/core/journal/IOCallback.java
trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java
Log:
Removing wrong AIOCallback usage from the journal package
Modified: trunk/src/main/org/hornetq/core/journal/IOCallback.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/IOCallback.java 2009-11-12 14:38:03 UTC (rev 8273)
+++ trunk/src/main/org/hornetq/core/journal/IOCallback.java 2009-11-12 15:46:28 UTC (rev 8274)
@@ -17,7 +17,7 @@
/**
*
- * This class is just a direct extention of AIOCallback.
+ * This class is just a direct extension of AIOCallback.
* Just to avoid the direct dependency of org.hornetq.core.asynciio.AIOCallback from the journal.
*
* @author <a href="mailto:clebert.suconic@jboss.com">Clebert Suconic</a>
Modified: trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java 2009-11-12 14:38:03 UTC (rev 8273)
+++ trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java 2009-11-12 15:46:28 UTC (rev 8274)
@@ -18,7 +18,6 @@
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
-import org.hornetq.core.asyncio.AIOCallback;
import org.hornetq.core.journal.IOCallback;
import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.journal.SequentialFileFactory;
@@ -240,7 +239,7 @@
public void onError(int errorCode, String errorMessage)
{
- for (AIOCallback callback : delegates)
+ for (IOCallback callback : delegates)
{
try
{
15 years, 1 month
JBoss hornetq SVN: r8273 - trunk/tests/src/org/hornetq/tests/integration/client.
by do-not-reply@jboss.org
Author: ataylor
Date: 2009-11-12 09:38:03 -0500 (Thu, 12 Nov 2009)
New Revision: 8273
Modified:
trunk/tests/src/org/hornetq/tests/integration/client/TemporaryQueueTest.java
Log:
added latch instead of sleep to fix test
Modified: trunk/tests/src/org/hornetq/tests/integration/client/TemporaryQueueTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/TemporaryQueueTest.java 2009-11-12 14:36:45 UTC (rev 8272)
+++ trunk/tests/src/org/hornetq/tests/integration/client/TemporaryQueueTest.java 2009-11-12 14:38:03 UTC (rev 8273)
@@ -184,6 +184,7 @@
.iterator()
.next();
final CountDownLatch latch = new CountDownLatch(1);
+ final CountDownLatch latch2 = new CountDownLatch(1);
remotingConnection.addCloseListener(new CloseListener()
{
public void connectionClosed()
@@ -191,13 +192,21 @@
latch.countDown();
}
});
-
+
+ server.getRemotingService().getConnections().iterator().next().addCloseListener(new CloseListener()
+ {
+ public void connectionClosed()
+ {
+ latch2.countDown();
+ }
+ });
+
((ClientSessionInternal)session).getConnection().fail(new HornetQException(HornetQException.INTERNAL_ERROR, "simulate a client failure"));
// let some time for the server to clean the connections
- latch.await(2 * CONNECTION_TTL + 1, TimeUnit.MILLISECONDS);
- Thread.sleep(5000);
+ latch.await(2 * CONNECTION_TTL + 1000, TimeUnit.MILLISECONDS);
+ latch2.await(4 * CONNECTION_TTL + 1000, TimeUnit.MILLISECONDS);
assertEquals(0, server.getConnectionCount());
session.close();
15 years, 1 month
JBoss hornetq SVN: r8272 - trunk/tests/src/org/hornetq/tests/unit/core/server/impl.
by do-not-reply@jboss.org
Author: ataylor
Date: 2009-11-12 09:36:45 -0500 (Thu, 12 Nov 2009)
New Revision: 8272
Modified:
trunk/tests/src/org/hornetq/tests/unit/core/server/impl/QueueImplTest.java
Log:
added test for iterator when first message handled is busy and that the order is still correct
Modified: trunk/tests/src/org/hornetq/tests/unit/core/server/impl/QueueImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/server/impl/QueueImplTest.java 2009-11-12 13:31:32 UTC (rev 8271)
+++ trunk/tests/src/org/hornetq/tests/unit/core/server/impl/QueueImplTest.java 2009-11-12 14:36:45 UTC (rev 8272)
@@ -906,6 +906,47 @@
assertRefListsIdenticalRefs(refs, consumer.getReferences());
}
+ public void testBusyConsumerWithFilterFirstCallBusy() throws Exception
+ {
+ Queue queue = new QueueImpl(1, address1, queue1, null, false, true, scheduledExecutor, null, null, null);
+
+ FakeConsumer consumer = new FakeConsumer(FilterImpl.createFilter("color = 'green'"));
+
+ consumer.setStatusImmediate(HandleStatus.BUSY);
+
+ queue.addConsumer(consumer);
+
+ final int numMessages = 10;
+
+ List<MessageReference> refs = new ArrayList<MessageReference>();
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ MessageReference ref = generateReference(queue, i);
+ ref.getMessage().putStringProperty("color", "green");
+ refs.add(ref);
+
+ queue.addLast(ref);
+ }
+
+ assertEquals(10, queue.getMessageCount());
+ assertEquals(0, queue.getScheduledCount());
+ assertEquals(0, queue.getDeliveringCount());
+
+ queue.deliverNow();
+
+ consumer.setStatusImmediate(null);
+
+ queue.deliverNow();
+
+ List<MessageReference> receeivedRefs = consumer.getReferences();
+ int currId = 0;
+ for (MessageReference receeivedRef : receeivedRefs)
+ {
+ assertEquals("messages received out of order", receeivedRef.getMessage().getMessageID() , currId++);
+ }
+ }
+
public void testBusyConsumerWithFilterThenAddMoreMessages() throws Exception
{
Queue queue = new QueueImpl(1, address1, queue1, null, false, true, scheduledExecutor, null, null, null);
@@ -971,6 +1012,13 @@
assertEquals(30, queue.getMessageCount());
assertEquals(0, queue.getScheduledCount());
assertEquals(10, queue.getDeliveringCount());
+
+ List<MessageReference> receeivedRefs = consumer.getReferences();
+ int currId = 10;
+ for (MessageReference receeivedRef : receeivedRefs)
+ {
+ assertEquals("messages received out of order", receeivedRef.getMessage().getMessageID() , currId++);
+ }
}
public void testConsumerWithFilterThenAddMoreMessages() throws Exception
15 years, 1 month