JBoss hornetq SVN: r10367 - branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-03-25 17:13:53 -0400 (Fri, 25 Mar 2011)
New Revision: 10367
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/ServiceTestBase.java
Log:
removing invalid System.exit
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/ServiceTestBase.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/ServiceTestBase.java 2011-03-25 19:31:43 UTC (rev 10366)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/ServiceTestBase.java 2011-03-25 21:13:53 UTC (rev 10367)
@@ -94,7 +94,7 @@
checkFreePort(5447);
if (InVMRegistry.instance.size() > 0)
{
- System.exit(0);
+ fail("InVMREgistry size > 0");
}
}
13 years, 9 months
JBoss hornetq SVN: r10366 - in branches/Branch_2_2_EAP: src/main/org/hornetq/core/persistence/impl/journal and 3 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-03-25 15:31:43 -0400 (Fri, 25 Mar 2011)
New Revision: 10366
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PagedReferenceImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/MessageReference.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/LastValueQueue.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/MessageReferenceImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/RedeliveryConsumerTest.java
Log:
https://issues.jboss.org/browse/JBPAPP-6170 - Redelivery counters infinite updates
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PagedReferenceImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PagedReferenceImpl.java 2011-03-25 13:17:31 UTC (rev 10365)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PagedReferenceImpl.java 2011-03-25 19:31:43 UTC (rev 10366)
@@ -37,9 +37,11 @@
private final PagePosition position;
private WeakReference<PagedMessage> message;
-
+
private Long deliveryTime = null;
-
+
+ private int persistedCount;
+
private final PageSubscription subscription;
public ServerMessage getMessage()
@@ -50,10 +52,10 @@
public synchronized PagedMessage getPagedMessage()
{
PagedMessage returnMessage = message != null ? message.get() : null;
-
+
// We only keep a few references on the Queue from paging...
// Besides those references are SoftReferenced on page cache...
- // So, this will unlikely be null,
+ // So, this will unlikely be null,
// unless the Queue has stalled for some time after paging
if (returnMessage == null)
{
@@ -69,7 +71,9 @@
return position;
}
- public PagedReferenceImpl(final PagePosition position, final PagedMessage message, final PageSubscription subscription)
+ public PagedReferenceImpl(final PagePosition position,
+ final PagedMessage message,
+ final PageSubscription subscription)
{
this.position = position;
this.message = new WeakReference<PagedMessage>(message);
@@ -80,6 +84,16 @@
{
return true;
}
+
+ public void setPersistedCount(int count)
+ {
+ this.persistedCount = count;
+ }
+
+ public int getPersistedCount()
+ {
+ return persistedCount;
+ }
/* (non-Javadoc)
* @see org.hornetq.core.server.MessageReference#copy(org.hornetq.core.server.Queue)
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-03-25 13:17:31 UTC (rev 10365)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-03-25 19:31:43 UTC (rev 10366)
@@ -744,16 +744,22 @@
public void updateDeliveryCount(final MessageReference ref) throws Exception
{
- DeliveryCountUpdateEncoding updateInfo = new DeliveryCountUpdateEncoding(ref.getQueue().getID(),
- ref.getDeliveryCount());
+ // no need to store if it's the same value
+ // otherwise the journal will get OME in case of lots of redeliveries
+ if (ref.getDeliveryCount() != ref.getPersistedCount())
+ {
+ ref.setPersistedCount(ref.getDeliveryCount());
+ DeliveryCountUpdateEncoding updateInfo = new DeliveryCountUpdateEncoding(ref.getQueue().getID(),
+ ref.getDeliveryCount());
+
+ messageJournal.appendUpdateRecord(ref.getMessage().getMessageID(),
+ JournalStorageManager.UPDATE_DELIVERY_COUNT,
+ updateInfo,
+
+ syncNonTransactional,
+ getContext(syncNonTransactional));
+ }
- messageJournal.appendUpdateRecord(ref.getMessage().getMessageID(),
- JournalStorageManager.UPDATE_DELIVERY_COUNT,
- updateInfo,
-
- syncNonTransactional,
- getContext(syncNonTransactional));
-
}
public void storeAddressSetting(PersistedAddressSetting addressSetting) throws Exception
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/MessageReference.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/MessageReference.java 2011-03-25 13:17:31 UTC (rev 10365)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/MessageReference.java 2011-03-25 19:31:43 UTC (rev 10366)
@@ -47,6 +47,10 @@
int getDeliveryCount();
void setDeliveryCount(int deliveryCount);
+
+ void setPersistedCount(int deliveryCount);
+
+ int getPersistedCount();
void incrementDeliveryCount();
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-03-25 13:17:31 UTC (rev 10365)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-03-25 19:31:43 UTC (rev 10366)
@@ -645,18 +645,14 @@
}
- OperationContext formerCtx = null;
// We close all the exception in an attempt to let any pending IO to finish
// to avoid scenarios where the send or ACK got to disk but the response didn't get to the client
// It may still be possible to have this scenario on a real failure (without the use of XA)
// But at least we will do our best to avoid it on regular shutdowns
for (ServerSession session : sessions.values())
{
- storageManager.setContext(session.getSessionContext());
session.close(true);
}
-
- storageManager.setContext(formerCtx);
remotingService.stop();
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/LastValueQueue.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/LastValueQueue.java 2011-03-25 13:17:31 UTC (rev 10365)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/LastValueQueue.java 2011-03-25 19:31:43 UTC (rev 10366)
@@ -230,6 +230,16 @@
ref.setScheduledDeliveryTime(scheduledDeliveryTime);
}
+ public void setPersistedCount(int count)
+ {
+ ref.setPersistedCount(count);
+ }
+
+ public int getPersistedCount()
+ {
+ return ref.getPersistedCount();
+ }
+
public boolean isPaged()
{
return false;
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/MessageReferenceImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/MessageReferenceImpl.java 2011-03-25 13:17:31 UTC (rev 10365)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/MessageReferenceImpl.java 2011-03-25 19:31:43 UTC (rev 10366)
@@ -36,6 +36,8 @@
private volatile int deliveryCount;
+ private volatile int persistedCount;
+
private volatile long scheduledDeliveryTime;
private final ServerMessage message;
@@ -91,6 +93,23 @@
}
// MessageReference implementation -------------------------------
+
+ /**
+ * @return the persistedCount
+ */
+ public int getPersistedCount()
+ {
+ return persistedCount;
+ }
+
+ /**
+ * @param persistedCount the persistedCount to set
+ */
+ public void setPersistedCount(int persistedCount)
+ {
+ this.persistedCount = persistedCount;
+ }
+
public MessageReference copy(final Queue queue)
{
return new MessageReferenceImpl(this, queue);
@@ -109,6 +128,7 @@
public void setDeliveryCount(final int deliveryCount)
{
this.deliveryCount = deliveryCount;
+ this.persistedCount = deliveryCount;
}
public void incrementDeliveryCount()
@@ -145,7 +165,7 @@
{
queue.referenceHandled();
}
-
+
public boolean isPaged()
{
return false;
@@ -167,7 +187,6 @@
queue.acknowledge(tx, this);
}
-
// Public --------------------------------------------------------
@Override
@@ -175,7 +194,9 @@
{
return "Reference[" + getMessage().getMessageID() +
"]:" +
- (getMessage().isDurable() ? "RELIABLE" : "NON-RELIABLE") + ":" + getMessage() ;
+ (getMessage().isDurable() ? "RELIABLE" : "NON-RELIABLE") +
+ ":" +
+ getMessage();
}
// Package protected ---------------------------------------------
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2011-03-25 13:17:31 UTC (rev 10365)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2011-03-25 19:31:43 UTC (rev 10366)
@@ -953,24 +953,35 @@
public void close(final boolean failed)
{
- storageManager.afterCompleteOperations(new IOAsyncTask()
+ OperationContext formerCtx = storageManager.getContext();
+
+ try
{
- public void onError(int errorCode, String errorMessage)
- {
- }
+ storageManager.setContext(sessionContext);
- public void done()
+ storageManager.afterCompleteOperations(new IOAsyncTask()
{
- try
+ public void onError(int errorCode, String errorMessage)
{
- doClose(failed);
}
- catch (Exception e)
+
+ public void done()
{
- log.error("Failed to close session", e);
+ try
+ {
+ doClose(failed);
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to close session", e);
+ }
}
- }
- });
+ });
+ }
+ finally
+ {
+ storageManager.setContext(formerCtx);
+ }
}
public void closeConsumer(final long consumerID) throws Exception
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/RedeliveryConsumerTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/RedeliveryConsumerTest.java 2011-03-25 13:17:31 UTC (rev 10365)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/RedeliveryConsumerTest.java 2011-03-25 19:31:43 UTC (rev 10366)
@@ -13,13 +13,27 @@
package org.hornetq.tests.integration.client;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
import junit.framework.Assert;
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.SimpleString;
-import org.hornetq.api.core.client.*;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.config.Configuration;
+import org.hornetq.core.journal.LoaderCallback;
+import org.hornetq.core.journal.PreparedTransactionInfo;
+import org.hornetq.core.journal.RecordInfo;
+import org.hornetq.core.journal.impl.JournalImpl;
+import org.hornetq.core.journal.impl.NIOSequentialFileFactory;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.tests.util.ServiceTestBase;
@@ -223,6 +237,111 @@
session.close();
}
+
+ public void testInfiniteDedeliveryMessageOnPersistent() throws Exception
+ {
+ internaltestInfiniteDedeliveryMessageOnPersistent(false);
+ }
+
+ private void internaltestInfiniteDedeliveryMessageOnPersistent(final boolean strict) throws Exception
+ {
+ setUp(strict);
+ ClientSession session = factory.createSession(false, false, false);
+
+ RedeliveryConsumerTest.log.info("created");
+
+ ClientProducer prod = session.createProducer(ADDRESS);
+ prod.send(createTextMessage(session, "Hello"));
+ session.commit();
+ session.close();
+
+
+ int expectedCount = 1;
+ for (int i = 0 ; i < 700; i++)
+ {
+ session = factory.createSession(false, false, false);
+ session.start();
+ ClientConsumer consumer = session.createConsumer(ADDRESS);
+ ClientMessage msg = consumer.receive(5000);
+ assertNotNull(msg);
+ assertEquals(expectedCount, msg.getDeliveryCount());
+
+ if (i % 100 == 0)
+ {
+ expectedCount++;
+ msg.acknowledge();
+ session.rollback();
+ }
+ session.close();
+ }
+
+ factory.close();
+ server.stop();
+
+ setUp(false);
+
+ for (int i = 0 ; i < 700; i++)
+ {
+ session = factory.createSession(false, false, false);
+ session.start();
+ ClientConsumer consumer = session.createConsumer(ADDRESS);
+ ClientMessage msg = consumer.receive(5000);
+ assertNotNull(msg);
+ assertEquals(expectedCount, msg.getDeliveryCount());
+ session.close();
+ }
+
+ server.stop();
+
+
+ JournalImpl journal = new JournalImpl(server.getConfiguration().getJournalFileSize(),
+ 2,
+ 0,
+ 0,
+ new NIOSequentialFileFactory(server.getConfiguration().getJournalDirectory()),
+ "hornetq-data",
+ "hq",
+ 1);
+
+
+ final AtomicInteger updates = new AtomicInteger();
+
+ journal.start();
+ journal.load(new LoaderCallback()
+ {
+
+ public void failedTransaction(long transactionID, List<RecordInfo> records, List<RecordInfo> recordsToDelete)
+ {
+ }
+
+ public void updateRecord(RecordInfo info)
+ {
+ if (info.userRecordType == JournalStorageManager.UPDATE_DELIVERY_COUNT)
+ {
+ updates.incrementAndGet();
+ }
+ }
+
+ public void deleteRecord(long id)
+ {
+ }
+
+ public void addRecord(RecordInfo info)
+ {
+ }
+
+ public void addPreparedTransaction(PreparedTransactionInfo preparedTransaction)
+ {
+ }
+ });
+
+ journal.stop();
+
+
+ assertEquals(7, updates.get());
+
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
@@ -244,7 +363,14 @@
factory = locator.createSessionFactory();
ClientSession session = factory.createSession(false, false, false);
- session.createQueue(ADDRESS, ADDRESS, true);
+ try
+ {
+ session.createQueue(ADDRESS, ADDRESS, true);
+ }
+ catch (HornetQException expected)
+ {
+ // in case of restart
+ }
session.close();
}
13 years, 9 months
JBoss hornetq SVN: r10365 - in branches/Branch_2_2_EAP/src/main/org/hornetq/core: protocol/core/impl and 2 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-03-25 09:17:31 -0400 (Fri, 25 Mar 2011)
New Revision: 10365
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/ServerSession.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
Log:
https://issues.jboss.org/browse/JBPAPP-6153 - fixing tests
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java 2011-03-25 11:25:15 UTC (rev 10364)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java 2011-03-25 13:17:31 UTC (rev 10365)
@@ -114,8 +114,6 @@
private final ServerSession session;
- private final OperationContext sessionContext;
-
// Storagemanager here is used to set the Context
private final StorageManager storageManager;
@@ -126,7 +124,6 @@
private final boolean direct;
public ServerSessionPacketHandler(final ServerSession session,
- final OperationContext sessionContext,
final StorageManager storageManager,
final Channel channel)
{
@@ -134,8 +131,6 @@
this.storageManager = storageManager;
- this.sessionContext = sessionContext;
-
this.channel = channel;
this.remotingConnection = channel.getConnection();
@@ -197,7 +192,7 @@
{
byte type = packet.getType();
- storageManager.setContext(sessionContext);
+ storageManager.setContext(session.getSessionContext());
Packet response = null;
boolean flush = false;
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java 2011-03-25 11:25:15 UTC (rev 10364)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java 2011-03-25 13:17:31 UTC (rev 10365)
@@ -129,9 +129,9 @@
Version version = server.getVersion();
int[] compatibleList = version.getCompatibleVersionList();
boolean isCompatibleClient = false;
- for(int i=0; i<compatibleList.length; i++)
+ for (int i = 0; i < compatibleList.length; i++)
{
- if(compatibleList[i] == request.getVersion())
+ if (compatibleList[i] == request.getVersion())
{
isCompatibleClient = true;
break;
@@ -165,22 +165,23 @@
Channel channel = connection.getChannel(request.getSessionChannelID(), request.getWindowSize());
- ServerSession session = server.createSession(request.getName(),
+ ServerSession session = server.createSession(request.getName(),
request.getUsername(),
request.getPassword(),
- request.getMinLargeMessageSize(),
+ request.getMinLargeMessageSize(),
connection,
request.isAutoCommitSends(),
request.isAutoCommitAcks(),
request.isPreAcknowledge(),
request.isXA(),
request.getDefaultAddress(),
- new CoreSessionCallback(request.getName(), protocolManager, channel));
+ new CoreSessionCallback(request.getName(),
+ protocolManager,
+ channel));
+ session.setSessionContext(server.getStorageManager().newContext(server.getExecutorFactory().getExecutor()));
+
ServerSessionPacketHandler handler = new ServerSessionPacketHandler(session,
- server.getStorageManager()
- .newContext(server.getExecutorFactory()
- .getExecutor()),
server.getStorageManager(),
channel);
channel.setHandler(handler);
@@ -201,11 +202,11 @@
}
}
catch (Exception e)
- {
+ {
log.error("Failed to create session ", e);
-
+
HornetQPacketHandler.log.error("Failed to create session", e);
-
+
response = new HornetQExceptionMessage(new HornetQException(HornetQException.INTERNAL_ERROR));
}
@@ -225,22 +226,22 @@
private void handleReattachSession(final ReattachSessionMessage request)
{
Packet response = null;
-
+
try
{
-
+
if (!server.isStarted())
{
response = new ReattachSessionResponseMessage(-1, false);
}
-
+
ServerSessionPacketHandler sessionHandler = protocolManager.getSessionHandler(request.getName());
-
+
if (!server.checkActivate())
{
response = new ReattachSessionResponseMessage(-1, false);
}
-
+
if (sessionHandler == null)
{
response = new ReattachSessionResponseMessage(-1, false);
@@ -252,9 +253,9 @@
// Even though session exists, we can't reattach since confi window size == -1,
// i.e. we don't have a resend cache for commands, so we just close the old session
// and let the client recreate
-
+
sessionHandler.close();
-
+
response = new ReattachSessionResponseMessage(-1, false);
}
else
@@ -262,7 +263,7 @@
// Reconnect the channel to the new connection
int serverLastConfirmedCommandID = sessionHandler.transferConnection(connection,
request.getLastConfirmedCommandID());
-
+
response = new ReattachSessionResponseMessage(serverLastConfirmedCommandID, true);
}
}
@@ -270,7 +271,7 @@
catch (Exception e)
{
HornetQPacketHandler.log.error("Failed to reattach session", e);
-
+
response = new HornetQExceptionMessage(new HornetQException(HornetQException.INTERNAL_ERROR));
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/ServerSession.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/ServerSession.java 2011-03-25 11:25:15 UTC (rev 10364)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/ServerSession.java 2011-03-25 13:17:31 UTC (rev 10365)
@@ -23,6 +23,7 @@
import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.message.impl.MessageInternal;
+import org.hornetq.core.persistence.OperationContext;
import org.hornetq.utils.json.JSONArray;
/**
@@ -136,4 +137,10 @@
String getLastSentMessageID(String address);
long getCreationTime();
+
+
+ OperationContext getSessionContext();
+
+ void setSessionContext(OperationContext context);
+
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-03-25 11:25:15 UTC (rev 10364)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-03-25 13:17:31 UTC (rev 10365)
@@ -63,11 +63,13 @@
import org.hornetq.core.paging.impl.PagingManagerImpl;
import org.hornetq.core.paging.impl.PagingStoreFactoryNIO;
import org.hornetq.core.persistence.GroupingInfo;
+import org.hornetq.core.persistence.OperationContext;
import org.hornetq.core.persistence.QueueBindingInfo;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.persistence.config.PersistedAddressSetting;
import org.hornetq.core.persistence.config.PersistedRoles;
import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
+import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
import org.hornetq.core.persistence.impl.nullpm.NullStorageManager;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.DuplicateIDCache;
@@ -643,15 +645,18 @@
}
+ OperationContext formerCtx = null;
// We close all the exception in an attempt to let any pending IO to finish
// to avoid scenarios where the send or ACK got to disk but the response didn't get to the client
// It may still be possible to have this scenario on a real failure (without the use of XA)
// But at least we will do our best to avoid it on regular shutdowns
for (ServerSession session : sessions.values())
{
- log.info("closing a session" );
+ storageManager.setContext(session.getSessionContext());
session.close(true);
}
+
+ storageManager.setContext(formerCtx);
remotingService.stop();
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2011-03-25 11:25:15 UTC (rev 10364)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2011-03-25 13:17:31 UTC (rev 10365)
@@ -42,6 +42,7 @@
import org.hornetq.core.logging.Logger;
import org.hornetq.core.message.impl.MessageInternal;
import org.hornetq.core.paging.PagingStore;
+import org.hornetq.core.persistence.OperationContext;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.BindingType;
@@ -146,6 +147,8 @@
private volatile int timeoutSeconds;
private Map<String, String> metaData;
+
+ private OperationContext sessionContext;
// Session's usage should be by definition single threaded, hence it's not needed to use a concurrentHashMap here
private Map<SimpleString, Pair<UUID, AtomicLong>> targetAddressInfos = new HashMap<SimpleString, Pair<UUID, AtomicLong>>();
@@ -223,7 +226,23 @@
}
// ServerSession implementation ----------------------------------------------------------------------------
+ /**
+ * @return the sessionContext
+ */
+ public OperationContext getSessionContext()
+ {
+ return sessionContext;
+ }
+ /**
+ * @param sessionContext the sessionContext to set
+ */
+ public void setSessionContext(OperationContext sessionContext)
+ {
+ this.sessionContext = sessionContext;
+ }
+
+
public String getUsername()
{
return username;
13 years, 9 months
JBoss hornetq SVN: r10363 - in branches/Branch_2_2_EAP: hornetq-rest and 1 other directory.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-03-25 04:33:29 -0400 (Fri, 25 Mar 2011)
New Revision: 10363
Modified:
branches/Branch_2_2_EAP/build-maven.xml
branches/Branch_2_2_EAP/hornetq-rest/pom.xml
Log:
upload build
Modified: branches/Branch_2_2_EAP/build-maven.xml
===================================================================
--- branches/Branch_2_2_EAP/build-maven.xml 2011-03-25 08:23:37 UTC (rev 10362)
+++ branches/Branch_2_2_EAP/build-maven.xml 2011-03-25 08:33:29 UTC (rev 10363)
@@ -13,7 +13,7 @@
-->
<project default="upload" name="HornetQ">
- <property name="hornetq.version" value="2.2.1.GA-10354"/>
+ <property name="hornetq.version" value="2.2.1.GA-10362"/>
<property name="build.dir" value="build"/>
<property name="jars.dir" value="${build.dir}/jars"/>
Modified: branches/Branch_2_2_EAP/hornetq-rest/pom.xml
===================================================================
--- branches/Branch_2_2_EAP/hornetq-rest/pom.xml 2011-03-25 08:23:37 UTC (rev 10362)
+++ branches/Branch_2_2_EAP/hornetq-rest/pom.xml 2011-03-25 08:33:29 UTC (rev 10363)
@@ -10,7 +10,7 @@
<properties>
<resteasy.version>2.0.1.GA</resteasy.version>
- <hornetq.version>2.2.1.GA-10354</hornetq.version>
+ <hornetq.version>2.2.1.GA-10362</hornetq.version>
</properties>
<licenses>
13 years, 9 months
JBoss hornetq SVN: r10362 - in branches/Branch_2_2_EAP/src/main/org/hornetq: core/postoffice/impl and 2 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-03-25 04:23:37 -0400 (Fri, 25 Mar 2011)
New Revision: 10362
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/MessageReferenceImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/ra/inflow/HornetQMessageHandler.java
Log:
https://issues.jboss.org/browse/JBPAPP-6153 - avoiding message loss on shutdown
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java 2011-03-24 18:09:04 UTC (rev 10361)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java 2011-03-25 08:23:37 UTC (rev 10362)
@@ -134,7 +134,7 @@
@Override
public String toString()
{
- return "ClientMessage[messageID=" + messageID + ", durable=" + durable + ", address=" + getAddress() + "]";
+ return "ClientMessage[messageID=" + messageID + ", durable=" + durable + ", address=" + getAddress() + ",properties=" + properties.toString() + "]";
}
/* (non-Javadoc)
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2011-03-24 18:09:04 UTC (rev 10361)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2011-03-25 08:23:37 UTC (rev 10362)
@@ -635,7 +635,7 @@
public void resetIfNeeded() throws HornetQException
{
- if(rollbackOnly)
+ if (rollbackOnly)
{
log.warn("resetting session after failure");
rollback(false);
@@ -1204,7 +1204,7 @@
{
checkXA();
- //we should never throw rollback if we have already prepared
+ // we should never throw rollback if we have already prepared
if (rollbackOnly)
{
log.warn("committing transaction after failover occurred, any non persistent messages may be lost");
@@ -1223,8 +1223,8 @@
if (response.isError())
{
- //if we retry and its not there the assume that it was committed
- if(xaRetry && response.getResponseCode() == XAException.XAER_NOTA)
+ // if we retry and its not there the assume that it was committed
+ if (xaRetry && response.getResponseCode() == XAException.XAER_NOTA)
{
return;
}
@@ -1395,7 +1395,7 @@
}
catch (HornetQException e1)
{
- //ignore and rollback
+ // ignore and rollback
}
log.warn("failover occurred during prepare rolling back");
try
@@ -1481,8 +1481,8 @@
if (response.isError())
{
- //if we retry and its not there the assume that it was rolled back
- if(xaRetry && response.getResponseCode() == XAException.XAER_NOTA)
+ // if we retry and its not there the assume that it was rolled back
+ if (xaRetry && response.getResponseCode() == XAException.XAER_NOTA)
{
return;
}
@@ -1557,7 +1557,7 @@
}
catch (HornetQException e)
{
- //we can retry this only because we know for sure that no work would have been done
+ // we can retry this only because we know for sure that no work would have been done
if (e.getCode() == HornetQException.UNBLOCKED)
{
try
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2011-03-24 18:09:04 UTC (rev 10361)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2011-03-25 08:23:37 UTC (rev 10362)
@@ -1062,10 +1062,7 @@
{
StringBuffer warnMessage = new StringBuffer();
warnMessage.append("Duplicate message detected through the bridge - message will not be routed. Message information:\n");
- for (SimpleString key : message.getPropertyNames())
- {
- warnMessage.append(key + "=" + message.getObjectProperty(key) + "\n");
- }
+ warnMessage.append(message.toString());
PostOfficeImpl.log.warn(warnMessage.toString());
if (context.getTransaction() != null)
@@ -1105,10 +1102,7 @@
{
StringBuffer warnMessage = new StringBuffer();
warnMessage.append("Duplicate message detected - message will not be routed. Message information:\n");
- for (SimpleString key : message.getPropertyNames())
- {
- warnMessage.append(key + "=" + message.getObjectProperty(key) + "\n");
- }
+ warnMessage.append(message.toString());
PostOfficeImpl.log.warn(warnMessage.toString());
if (context.getTransaction() != null)
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-03-24 18:09:04 UTC (rev 10361)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-03-25 08:23:37 UTC (rev 10362)
@@ -642,6 +642,16 @@
}
}
+
+ // We close all the exception in an attempt to let any pending IO to finish
+ // to avoid scenarios where the send or ACK got to disk but the response didn't get to the client
+ // It may still be possible to have this scenario on a real failure (without the use of XA)
+ // But at least we will do our best to avoid it on regular shutdowns
+ for (ServerSession session : sessions.values())
+ {
+ log.info("closing a session" );
+ session.close(true);
+ }
remotingService.stop();
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/MessageReferenceImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/MessageReferenceImpl.java 2011-03-24 18:09:04 UTC (rev 10361)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/MessageReferenceImpl.java 2011-03-25 08:23:37 UTC (rev 10362)
@@ -175,7 +175,7 @@
{
return "Reference[" + getMessage().getMessageID() +
"]:" +
- (getMessage().isDurable() ? "RELIABLE" : "NON-RELIABLE");
+ (getMessage().isDurable() ? "RELIABLE" : "NON-RELIABLE") + ":" + getMessage() ;
}
// Package protected ---------------------------------------------
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java 2011-03-24 18:09:04 UTC (rev 10361)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java 2011-03-25 08:23:37 UTC (rev 10362)
@@ -273,7 +273,7 @@
@Override
public String toString()
{
- return "ServerMessage[messageID=" + messageID + ", durable=" + durable + ", address=" + getAddress() + "]";
+ return "ServerMessage[messageID=" + messageID + ", durable=" + durable + ", address=" + getAddress() + ",properties=" + properties.toString() + "]";
}
// FIXME - this is stuff that is only used in large messages
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/ra/inflow/HornetQMessageHandler.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/ra/inflow/HornetQMessageHandler.java 2011-03-24 18:09:04 UTC (rev 10361)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/ra/inflow/HornetQMessageHandler.java 2011-03-25 08:23:37 UTC (rev 10362)
@@ -19,15 +19,15 @@
import javax.resource.ResourceException;
import javax.resource.spi.endpoint.MessageEndpoint;
import javax.resource.spi.endpoint.MessageEndpointFactory;
+import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.client.ClientConsumer;
import org.hornetq.api.core.client.ClientMessage;
-import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSession.QueueQuery;
import org.hornetq.api.core.client.MessageHandler;
-import org.hornetq.api.core.client.ClientSession.QueueQuery;
import org.hornetq.core.client.impl.ClientSessionInternal;
import org.hornetq.core.logging.Logger;
import org.hornetq.jms.client.HornetQDestination;
@@ -70,6 +70,8 @@
private boolean useLocalTx;
private boolean transacted;
+
+ private boolean useXA = false;
private final int sessionNr;
@@ -181,10 +183,12 @@
if (activation.isDeliveryTransacted() && !activation.getActivationSpec().isUseLocalTx())
{
endpoint = endpointFactory.createEndpoint(session);
+ useXA = true;
}
else
{
endpoint = endpointFactory.createEndpoint(null);
+ useXA = false;
}
consumer.setMessageHandler(this);
}
@@ -246,6 +250,8 @@
public void onMessage(final ClientMessage message)
{
+ HornetQMessageHandler.log.info("onMessage(" + message + ")");
+
if (HornetQMessageHandler.trace)
{
HornetQMessageHandler.log.trace("onMessage(" + message + ")");
@@ -298,6 +304,35 @@
// we need to call before/afterDelivery as a pair
if (beforeDelivery)
{
+ if (useXA && tm != null)
+ {
+ // This is the job for the container,
+ // however if the container throws an exception because of some other errors,
+ // there are situations where the container is not setting the rollback only
+ // this is to avoid a scenario where afterDelivery would kick in
+ try
+ {
+ Transaction tx = tm.getTransaction();
+ if (tx != null)
+ {
+ tx.setRollbackOnly();
+ }
+ }
+ catch (Exception e1)
+ {
+ log.warn("unnable to clear the transaction", e1);
+ try
+ {
+ session.rollback();
+ }
+ catch (HornetQException e2)
+ {
+ log.warn("Unable to rollback", e2);
+ return;
+ }
+ }
+ }
+
try
{
endpoint.afterDelivery();
13 years, 9 months
JBoss hornetq SVN: r10361 - in branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core: server/cluster/impl and 1 other directory.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-03-24 14:09:04 -0400 (Thu, 24 Mar 2011)
New Revision: 10361
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/server/cluster/impl/RemoteQueueBindImplTest.java
Log:
https://issues.jboss.org/browse/JBPAPP-6153
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java 2011-03-24 18:00:09 UTC (rev 10360)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java 2011-03-24 18:09:04 UTC (rev 10361)
@@ -38,6 +38,24 @@
public class FakeQueue implements Queue
{
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.Queue#isInternalQueue()
+ */
+ public boolean isInternalQueue()
+ {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.Queue#setInternalQueue(boolean)
+ */
+ public void setInternalQueue(boolean internalQueue)
+ {
+ // TODO Auto-generated method stub
+
+ }
+
PageSubscription subs;
public boolean isDirectDeliver()
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/server/cluster/impl/RemoteQueueBindImplTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/server/cluster/impl/RemoteQueueBindImplTest.java 2011-03-24 18:00:09 UTC (rev 10360)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/server/cluster/impl/RemoteQueueBindImplTest.java 2011-03-24 18:09:04 UTC (rev 10361)
@@ -102,6 +102,24 @@
{
/* (non-Javadoc)
+ * @see org.hornetq.core.server.Queue#isInternalQueue()
+ */
+ public boolean isInternalQueue()
+ {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.Queue#setInternalQueue(boolean)
+ */
+ public void setInternalQueue(boolean internalQueue)
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ /* (non-Javadoc)
* @see org.hornetq.core.server.Bindable#route(org.hornetq.core.server.ServerMessage, org.hornetq.core.server.RoutingContext)
*/
public void route(ServerMessage message, RoutingContext context) throws Exception
13 years, 9 months
JBoss hornetq SVN: r10360 - in branches/Branch_2_2_EAP/src/main/org/hornetq/core/server: cluster/impl and 1 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-03-24 14:00:09 -0400 (Thu, 24 Mar 2011)
New Revision: 10360
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/Queue.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
Log:
https://issues.jboss.org/browse/JBPAPP-6153
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/Queue.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/Queue.java 2011-03-24 15:37:58 UTC (rev 10359)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/Queue.java 2011-03-24 18:00:09 UTC (rev 10360)
@@ -166,4 +166,12 @@
boolean isDirectDeliver();
SimpleString getAddress();
+
+ /**
+ * We can't send stuff to DLQ on queues used on clustered-bridge-communication
+ * @return
+ */
+ boolean isInternalQueue();
+
+ void setInternalQueue(boolean internalQueue);
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java 2011-03-24 15:37:58 UTC (rev 10359)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java 2011-03-24 18:00:09 UTC (rev 10360)
@@ -107,6 +107,9 @@
this.managementNotificationAddress = managementNotificationAddress;
this.flowRecord = flowRecord;
this.connector = connector;
+
+ // we need to disable DLQ check on the clustered bridges
+ queue.setInternalQueue(true);
}
@Override
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-03-24 15:37:58 UTC (rev 10359)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-03-24 18:00:09 UTC (rev 10360)
@@ -715,19 +715,45 @@
HornetQServerImpl.log.debug("Waiting for " + task);
}
+ if (memoryManager != null)
+ {
+ memoryManager.stop();
+ }
+
threadPool.shutdown();
+
+ scheduledPool.shutdown();
- scheduledPool = null;
+ try
+ {
+ if (!threadPool.awaitTermination(10, TimeUnit.SECONDS))
+ {
+ HornetQServerImpl.log.warn("Timed out waiting for pool to terminate");
+ }
+ }
+ catch (InterruptedException e)
+ {
+ // Ignore
+ }
+ threadPool = null;
- if (memoryManager != null)
+
+ try
{
- memoryManager.stop();
+ if (!scheduledPool.awaitTermination(10, TimeUnit.SECONDS))
+ {
+ HornetQServerImpl.log.warn("Timed out waiting for scheduled pool to terminate");
+ }
}
+ catch (InterruptedException e)
+ {
+ // Ignore
+ }
- addressSettingsRepository.clear();
+ threadPool = null;
+
+ scheduledPool = null;
- securityRepository.clear();
-
pagingManager = null;
securityStore = null;
resourceManager = null;
@@ -764,19 +790,7 @@
Logger.reset();
}
- try
- {
- if (!threadPool.awaitTermination(5000, TimeUnit.MILLISECONDS))
- {
- HornetQServerImpl.log.warn("Timed out waiting for pool to terminate");
- }
- }
- catch (InterruptedException e)
- {
- // Ignore
- }
- threadPool = null;
- }
+ }
// HornetQServer implementation
// -----------------------------------------------------------
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java 2011-03-24 15:37:58 UTC (rev 10359)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java 2011-03-24 18:00:09 UTC (rev 10360)
@@ -32,7 +32,6 @@
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.filter.Filter;
import org.hornetq.core.logging.Logger;
-import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.paging.cursor.PageSubscription;
import org.hornetq.core.paging.cursor.PagedReference;
import org.hornetq.core.persistence.StorageManager;
@@ -153,6 +152,8 @@
private volatile int consumerWithFilterCount;
private final Runnable concurrentPoller = new ConcurrentPoller();
+
+ private boolean internalQueue;
private volatile boolean checkDirect;
@@ -1308,6 +1309,22 @@
return directDeliver;
}
+ /**
+ * @return the internalQueue
+ */
+ public boolean isInternalQueue()
+ {
+ return internalQueue;
+ }
+
+ /**
+ * @param internalQueue the internalQueue to set
+ */
+ public void setInternalQueue(boolean internalQueue)
+ {
+ this.internalQueue = internalQueue;
+ }
+
// Public
// -----------------------------------------------------------------------------
@@ -1582,9 +1599,15 @@
{
ServerMessage message = reference.getMessage();
+ if (internalQueue)
+ {
+ // no DLQ check on internal queues
+ return true;
+ }
+
// TODO: DeliveryCount on paging
- if (message.isDurable() && durable && !reference.isPaged())
+ if (!internalQueue && message.isDurable() && durable && !reference.isPaged())
{
storageManager.updateDeliveryCount(reference);
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2011-03-24 15:37:58 UTC (rev 10359)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2011-03-24 18:00:09 UTC (rev 10360)
@@ -265,7 +265,7 @@
// the updateDeliveryCount would still be updated after c
if (strictUpdateDeliveryCount)
{
- if (ref.getMessage().isDurable() && ref.getQueue().isDurable())
+ if (ref.getMessage().isDurable() && ref.getQueue().isDurable() && !ref.getQueue().isInternalQueue())
{
storageManager.updateDeliveryCount(ref);
}
13 years, 9 months
JBoss hornetq SVN: r10359 - branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution.
by do-not-reply@jboss.org
Author: ataylor
Date: 2011-03-24 11:37:58 -0400 (Thu, 24 Mar 2011)
New Revision: 10359
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
Log:
added pause between starting servers
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-03-23 17:57:12 UTC (rev 10358)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-03-24 15:37:58 UTC (rev 10359)
@@ -330,6 +330,13 @@
}
System.out.println("=======================================================================");
+ for (HornetQServer hornetQServer : servers)
+ {
+ if (hornetQServer != null)
+ {
+ System.out.println(clusterDescription(hornetQServer));
+ }
+ }
throw new IllegalStateException(msg);
}
@@ -1793,7 +1800,11 @@
ClusterTestBase.log.info("started server " + servers[node]);
ClusterTestBase.log.info("started server " + node);
-
+ /*
+ * we need to wait a lil while between server start up to allow the server to communicate in some order.
+ * This is to avoid split brain on startup
+ * */
+ Thread.sleep(500);
}
for (int node : nodes)
{
13 years, 9 months
JBoss hornetq SVN: r10358 - branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-03-23 13:57:12 -0400 (Wed, 23 Mar 2011)
New Revision: 10358
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
Log:
tweak
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-03-23 14:52:22 UTC (rev 10357)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-03-23 17:57:12 UTC (rev 10358)
@@ -2954,7 +2954,7 @@
public String toString()
{
- return "ACK;" + refEncoding;
+ return "AddRef;" + refEncoding;
}
}
@@ -2970,7 +2970,7 @@
public String toString()
{
- return "AddRef;" + refEncoding;
+ return "ACK;" + refEncoding;
}
}
13 years, 9 months
JBoss hornetq SVN: r10357 - in branches/Branch_2_2_EAP: src/main/org/hornetq/jms/server/recovery and 1 other directory.
by do-not-reply@jboss.org
Author: ataylor
Date: 2011-03-23 10:52:22 -0400 (Wed, 23 Mar 2011)
New Revision: 10357
Added:
branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/XARecoveryConfig.java
Modified:
branches/Branch_2_2_EAP/examples/javaee/xarecovery/server/jbossts-properties.xml
branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/HornetQXAResourceRecovery.java
branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/HornetQXAResourceWrapper.java
Log:
https://issues.jboss.org/browse/JBPAPP-6049 - fixed recovery to handle multiple connectors and failover
Modified: branches/Branch_2_2_EAP/examples/javaee/xarecovery/server/jbossts-properties.xml
===================================================================
--- branches/Branch_2_2_EAP/examples/javaee/xarecovery/server/jbossts-properties.xml 2011-03-23 03:54:43 UTC (rev 10356)
+++ branches/Branch_2_2_EAP/examples/javaee/xarecovery/server/jbossts-properties.xml 2011-03-23 14:52:22 UTC (rev 10357)
@@ -242,11 +242,17 @@
<property name="com.arjuna.ats.jta.recovery.XAResourceRecovery.HORNETQ1"
value="org.hornetq.jms.server.recovery.HornetQXAResourceRecovery;org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"/>
+ <!--you'll need something like this if the HornetQ Server is remote-->
<!--
<property name="com.arjuna.ats.jta.recovery.XAResourceRecovery.HORNETQ2"
value="org.hornetq.jms.server.recovery.HornetQXAResourceRecovery;org.hornetq.core.remoting.impl.netty.NettyConnectorFactory,guest,guest,host=localhost,port=5445"/>-->
+ <!--you'll need something like this if the HornetQ Server is remote and has failover configured-->
+ <!--
+ <property name="com.arjuna.ats.jta.recovery.XAResourceRecovery.HORNETQ2"
+ value="org.hornetq.jms.server.recovery.HornetQXAResourceRecovery;org.hornetq.core.remoting.impl.netty.NettyConnectorFactory,guest,guest,host=localhost,port=5445;org.hornetq.core.remoting.impl.netty.NettyConnectorFactory,guest,guest,host=localhost2,port=5446"/>-->
+
<property name="com.arjuna.ats.jta.xaRecoveryNode" value="1"/>
</properties>
<properties depends="arjuna,txoj,jta" name="recoverymanager">
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/HornetQXAResourceRecovery.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/HornetQXAResourceRecovery.java 2011-03-23 03:54:43 UTC (rev 10356)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/HornetQXAResourceRecovery.java 2011-03-23 14:52:22 UTC (rev 10357)
@@ -20,14 +20,19 @@
import com.arjuna.ats.jta.recovery.XAResourceRecovery;
+import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.core.logging.Logger;
/**
*
* A XAResourceRecovery instance that can be used to recover any JMS provider.
+ *
+ * In reality only recover,rollback and commit will be called but we still need to
+ * be implement all methods just in case.
*
*
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
* @version <tt>$Revision: 1.1 $</tt>
*
* $Id$
@@ -58,14 +63,24 @@
HornetQXAResourceRecovery.log.trace(this + " intialise: " + config);
}
- ConfigParser parser = new ConfigParser(config);
- String connectorFactoryClassName = parser.getConnectorFactoryClassName();
- Map<String, Object> connectorParams = parser.getConnectorParameters();
- String username = parser.getUsername();
- String password = parser.getPassword();
+ String[] configs = config.split(";");
+ XARecoveryConfig[] xaRecoveryConfigs = new XARecoveryConfig[configs.length];
+ for (int i = 0, configsLength = configs.length; i < configsLength; i++)
+ {
+ String s = configs[i];
+ ConfigParser parser = new ConfigParser(s);
+ String connectorFactoryClassName = parser.getConnectorFactoryClassName();
+ Map<String, Object> connectorParams = parser.getConnectorParameters();
+ String username = parser.getUsername();
+ String password = parser.getPassword();
+ TransportConfiguration transportConfiguration = new TransportConfiguration(connectorFactoryClassName, connectorParams);
+ xaRecoveryConfigs[i] = new XARecoveryConfig(transportConfiguration, username, password);
+ }
- res = new HornetQXAResourceWrapper(connectorFactoryClassName, connectorParams, username, password);
+
+ res = new HornetQXAResourceWrapper(xaRecoveryConfigs);
+
if (HornetQXAResourceRecovery.log.isTraceEnabled())
{
HornetQXAResourceRecovery.log.trace(this + " initialised");
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/HornetQXAResourceWrapper.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/HornetQXAResourceWrapper.java 2011-03-23 03:54:43 UTC (rev 10356)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/HornetQXAResourceWrapper.java 2011-03-23 14:52:22 UTC (rev 10357)
@@ -40,6 +40,7 @@
* @author <a href="adrian(a)jboss.com">Adrian Brock</a>
* @author <a href="tim.fox(a)jboss.com">Tim Fox/a>
* @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
*
* @version $Revision: 45341 $
*/
@@ -51,36 +52,26 @@
/** The state lock */
private static final Object lock = new Object();
- /** The JNDI lookup for the XA connection factory */
- private final String connectorFactoryClassName;
-
- private final Map<String, Object> connectorConfig;
-
- private final String username;
-
- private final String password;
-
private ServerLocator serverLocator;
private ClientSessionFactory csf;
private XAResource delegate;
- public HornetQXAResourceWrapper(final String connectorFactoryClassName,
- final Map<String, Object> connectorConfig,
- final String username,
- final String password)
+ private XARecoveryConfig[] xaRecoveryConfigs;
+
+ private TransportConfiguration currentConnection;
+
+ public HornetQXAResourceWrapper(XARecoveryConfig... xaRecoveryConfigs)
{
- this.connectorFactoryClassName = connectorFactoryClassName;
- this.connectorConfig = connectorConfig;
- this.username = username;
- this.password = password;
+
+ this.xaRecoveryConfigs = xaRecoveryConfigs;
}
public Xid[] recover(final int flag) throws XAException
{
- HornetQXAResourceWrapper.log.debug("Recover " + connectorFactoryClassName);
- XAResource xaResource = getDelegate();
+ XAResource xaResource = getDelegate(false);
+ HornetQXAResourceWrapper.log.debug("Recover " + currentConnection);
try
{
return xaResource.recover(flag);
@@ -93,8 +84,8 @@
public void commit(final Xid xid, final boolean onePhase) throws XAException
{
- HornetQXAResourceWrapper.log.debug("Commit " + connectorFactoryClassName + " xid " + " onePhase=" + onePhase);
- XAResource xaResource = getDelegate();
+ XAResource xaResource = getDelegate(true);
+ HornetQXAResourceWrapper.log.debug("Commit " + currentConnection + " xid " + " onePhase=" + onePhase);
try
{
xaResource.commit(xid, onePhase);
@@ -107,8 +98,8 @@
public void rollback(final Xid xid) throws XAException
{
- HornetQXAResourceWrapper.log.debug("Rollback " + connectorFactoryClassName + " xid ");
- XAResource xaResource = getDelegate();
+ XAResource xaResource = getDelegate(true);
+ HornetQXAResourceWrapper.log.debug("Rollback " + currentConnection + " xid ");
try
{
xaResource.rollback(xid);
@@ -121,8 +112,8 @@
public void forget(final Xid xid) throws XAException
{
- HornetQXAResourceWrapper.log.debug("Forget " + connectorFactoryClassName + " xid ");
- XAResource xaResource = getDelegate();
+ XAResource xaResource = getDelegate(false);
+ HornetQXAResourceWrapper.log.debug("Forget " + currentConnection + " xid ");
try
{
xaResource.forget(xid);
@@ -137,10 +128,10 @@
{
if (xaRes instanceof HornetQXAResourceWrapper)
{
- xaRes = ((HornetQXAResourceWrapper)xaRes).getDelegate();
+ xaRes = ((HornetQXAResourceWrapper)xaRes).getDelegate(false);
}
- XAResource xaResource = getDelegate();
+ XAResource xaResource = getDelegate(false);
try
{
return xaResource.isSameRM(xaRes);
@@ -153,7 +144,8 @@
public int prepare(final Xid xid) throws XAException
{
- XAResource xaResource = getDelegate();
+ XAResource xaResource = getDelegate(true);
+ HornetQXAResourceWrapper.log.debug("prepare " + currentConnection + " xid ");
try
{
return xaResource.prepare(xid);
@@ -166,7 +158,8 @@
public void start(final Xid xid, final int flags) throws XAException
{
- XAResource xaResource = getDelegate();
+ XAResource xaResource = getDelegate(false);
+ HornetQXAResourceWrapper.log.debug("start " + currentConnection + " xid ");
try
{
xaResource.start(xid, flags);
@@ -179,7 +172,8 @@
public void end(final Xid xid, final int flags) throws XAException
{
- XAResource xaResource = getDelegate();
+ XAResource xaResource = getDelegate(false);
+ HornetQXAResourceWrapper.log.debug("end " + currentConnection + " xid ");
try
{
xaResource.end(xid, flags);
@@ -192,7 +186,8 @@
public int getTransactionTimeout() throws XAException
{
- XAResource xaResource = getDelegate();
+ XAResource xaResource = getDelegate(false);
+ HornetQXAResourceWrapper.log.debug("getTransactionTimeout " + currentConnection + " xid ");
try
{
return xaResource.getTransactionTimeout();
@@ -205,7 +200,8 @@
public boolean setTransactionTimeout(final int seconds) throws XAException
{
- XAResource xaResource = getDelegate();
+ XAResource xaResource = getDelegate(false);
+ HornetQXAResourceWrapper.log.debug("setTransactionTimeout " + currentConnection + " xid ");
try
{
return xaResource.setTransactionTimeout(seconds);
@@ -218,7 +214,7 @@
public void connectionFailed(final HornetQException me, boolean failedOver)
{
- HornetQXAResourceWrapper.log.warn("Notified of connection failure in recovery connectionFactory for provider " + connectorFactoryClassName,
+ HornetQXAResourceWrapper.log.warn("Notified of connection failure in xa recovery connectionFactory for provider " + currentConnection + " will attempt reconnect on next pass",
me);
close();
}
@@ -233,7 +229,7 @@
* @return the connectionFactory
* @throws XAException for any problem
*/
- public XAResource getDelegate() throws XAException
+ public XAResource getDelegate(boolean retry) throws XAException
{
XAResource result = null;
Exception error = null;
@@ -243,20 +239,36 @@
}
catch (Exception e)
{
- HornetQXAResourceWrapper.log.error("********************************Failed to connect to server", e);
error = e;
}
if (result == null)
{
- XAException xae = new XAException("Error trying to connect to provider " + connectorFactoryClassName);
- xae.errorCode = XAException.XAER_RMERR;
- if (error != null)
+ //we should always throw a retry for certain methods comit etc, if not the tx is marked as a heuristic and
+ //all chaos is let loose
+ if(retry)
{
- xae.initCause(error);
+ XAException xae = new XAException("Connection unavailable for xa recovery");
+ xae.errorCode = XAException.XA_RETRY;
+ if (error != null)
+ {
+ xae.initCause(error);
+ }
+ HornetQXAResourceWrapper.log.debug("Cannot get connectionFactory XAResource", xae);
+ throw xae;
}
- HornetQXAResourceWrapper.log.debug("Cannot get connectionFactory XAResource", xae);
- throw xae;
+ else
+ {
+ XAException xae = new XAException("Error trying to connect to any providers for xa recovery");
+ xae.errorCode = XAException.XAER_RMERR;
+ if (error != null)
+ {
+ xae.initCause(error);
+ }
+ HornetQXAResourceWrapper.log.debug("Cannot get connectionFactory XAResource", xae);
+ throw xae;
+ }
+
}
return result;
@@ -279,28 +291,42 @@
}
}
- TransportConfiguration config = new TransportConfiguration(connectorFactoryClassName, connectorConfig);
- serverLocator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration[]{config});
- serverLocator.disableFinalizeCheck();
- csf = serverLocator.createSessionFactory();
- ClientSession cs = null;
-
- if (username == null)
+ for (XARecoveryConfig xaRecoveryConfig : xaRecoveryConfigs)
{
- cs = csf.createSession(true, false, false);
- }
- else
- {
- cs = csf.createSession(username, password, true, false, false, false, 1);
- }
- cs.addFailureListener(this);
- synchronized (HornetQXAResourceWrapper.lock)
- {
- delegate = cs;
- }
- return delegate;
+ ClientSession cs = null;
+
+ try
+ {
+ serverLocator = HornetQClient.createServerLocatorWithoutHA(xaRecoveryConfig.getTransportConfiguration());
+ serverLocator.disableFinalizeCheck();
+ csf = serverLocator.createSessionFactory();
+ if (xaRecoveryConfig.getUsername() == null)
+ {
+ cs = csf.createSession(true, false, false);
+ }
+ else
+ {
+ cs = csf.createSession(xaRecoveryConfig.getUsername(), xaRecoveryConfig.getPassword(), true, false, false, false, 1);
+ }
+ }
+ catch (HornetQException e)
+ {
+ continue;
+ }
+ cs.addFailureListener(this);
+
+ synchronized (HornetQXAResourceWrapper.lock)
+ {
+ delegate = cs;
+ currentConnection = xaRecoveryConfig.getTransportConfiguration();
+ }
+
+ return delegate;
+ }
+ currentConnection = null;
+ throw new HornetQException(HornetQException.NOT_CONNECTED);
}
/**
@@ -344,10 +370,9 @@
{
if (e.errorCode == XAException.XA_RETRY)
{
- HornetQXAResourceWrapper.log.debug("Fatal error in provider " + connectorFactoryClassName, e);
close();
}
- throw new XAException(XAException.XAER_RMFAIL);
+ throw e;
}
@Override
Added: branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/XARecoveryConfig.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/XARecoveryConfig.java (rev 0)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/recovery/XARecoveryConfig.java 2011-03-23 14:52:22 UTC (rev 10357)
@@ -0,0 +1,52 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.jms.server.recovery;
+
+import org.hornetq.api.core.TransportConfiguration;
+
+/**
+ * @author <a href="mailto:andy.taylor@jboss.com">Andy Taylor</a>
+ *
+ * A wrapper around info needed for the xa recovery resource
+ * Date: 3/23/11
+ * Time: 10:15 AM
+ */
+public class XARecoveryConfig
+{
+ private final TransportConfiguration transportConfiguration;
+ private final String username;
+ private final String password;
+
+ public XARecoveryConfig(TransportConfiguration transportConfiguration, String username, String password)
+ {
+ this.transportConfiguration = transportConfiguration;
+ this.username = username;
+ this.password = password;
+ }
+
+ public TransportConfiguration getTransportConfiguration()
+ {
+ return transportConfiguration;
+ }
+
+ public String getUsername()
+ {
+ return username;
+ }
+
+ public String getPassword()
+ {
+ return password;
+ }
+}
13 years, 9 months