Author: jmesnil
Date: 2010-01-22 05:19:02 -0500 (Fri, 22 Jan 2010)
New Revision: 8835
Added:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/aardvark/
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/aardvark/impl/
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/aardvark/impl/AardvarkConnection.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/aardvark/impl/AardvarkProtocolManager.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/aardvark/impl/AardvarkProtocolManagerFactory.java
Removed:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/aardvark/impl/
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/aardvark/impl/AardvarkConnection.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/aardvark/impl/AardvarkProtocolManager.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/aardvark/impl/AardvarkProtocolManagerFactory.java
Modified:
branches/HORNETQ-129_STOMP_protocol/examples/jms/interceptor/src/org/hornetq/jms/example/SimpleInterceptor.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/api/core/Interceptor.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/client/impl/ClientConsumerInternal.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/client/impl/ClientSessionPacketHandler.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/client/impl/DelegatingSession.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/client/impl/LargeMessageBufferImpl.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/message/impl/MessageImpl.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-129: Implement STOMP v1.0
* sync with the trunk: svn merge -r 8821:8832
https://svn.jboss.org/repos/hornetq/trunk
Modified:
branches/HORNETQ-129_STOMP_protocol/examples/jms/interceptor/src/org/hornetq/jms/example/SimpleInterceptor.java
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/examples/jms/interceptor/src/org/hornetq/jms/example/SimpleInterceptor.java 2010-01-22
10:16:25 UTC (rev 8834)
+++
branches/HORNETQ-129_STOMP_protocol/examples/jms/interceptor/src/org/hornetq/jms/example/SimpleInterceptor.java 2010-01-22
10:19:02 UTC (rev 8835)
@@ -18,8 +18,8 @@
import org.hornetq.api.core.Message;
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.protocol.core.Packet;
-import org.hornetq.core.protocol.core.wireformat.SessionSendMessage;
-import org.hornetq.core.remoting.RemotingConnection;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionSendMessage;
+import org.hornetq.spi.core.protocol.RemotingConnection;
/**
* A simple Interceptor implementation
Modified:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/api/core/Interceptor.java
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/api/core/Interceptor.java 2010-01-22
10:16:25 UTC (rev 8834)
+++
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/api/core/Interceptor.java 2010-01-22
10:19:02 UTC (rev 8835)
@@ -15,7 +15,7 @@
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.core.protocol.core.Packet;
-import org.hornetq.core.remoting.RemotingConnection;
+import org.hornetq.spi.core.protocol.RemotingConnection;
/**
* This is class is a simple way to intercepting calls on HornetQ client and servers.
Modified:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2010-01-22
10:16:25 UTC (rev 8834)
+++
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2010-01-22
10:19:02 UTC (rev 8835)
@@ -26,11 +26,11 @@
import org.hornetq.core.list.impl.PriorityLinkedListImpl;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.protocol.core.Channel;
-import org.hornetq.core.protocol.core.wireformat.SessionConsumerCloseMessage;
-import org.hornetq.core.protocol.core.wireformat.SessionConsumerFlowCreditMessage;
-import org.hornetq.core.protocol.core.wireformat.SessionQueueQueryResponseMessage;
-import org.hornetq.core.protocol.core.wireformat.SessionReceiveContinuationMessage;
-import org.hornetq.core.protocol.core.wireformat.SessionReceiveLargeMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionConsumerCloseMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionConsumerFlowCreditMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveContinuationMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveLargeMessage;
import org.hornetq.utils.Future;
import org.hornetq.utils.TokenBucketLimiter;
Modified:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/client/impl/ClientConsumerInternal.java
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/client/impl/ClientConsumerInternal.java 2010-01-22
10:16:25 UTC (rev 8834)
+++
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/client/impl/ClientConsumerInternal.java 2010-01-22
10:19:02 UTC (rev 8835)
@@ -17,9 +17,9 @@
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.client.ClientConsumer;
import org.hornetq.api.core.client.ClientMessage;
-import org.hornetq.core.protocol.core.wireformat.SessionQueueQueryResponseMessage;
-import org.hornetq.core.protocol.core.wireformat.SessionReceiveContinuationMessage;
-import org.hornetq.core.protocol.core.wireformat.SessionReceiveLargeMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveContinuationMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveLargeMessage;
/**
*
Modified:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java 2010-01-22
10:16:25 UTC (rev 8834)
+++
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java 2010-01-22
10:19:02 UTC (rev 8835)
@@ -25,9 +25,9 @@
import org.hornetq.core.message.BodyEncoder;
import org.hornetq.core.message.impl.MessageInternal;
import org.hornetq.core.protocol.core.Channel;
-import org.hornetq.core.protocol.core.wireformat.SessionSendContinuationMessage;
-import org.hornetq.core.protocol.core.wireformat.SessionSendLargeMessage;
-import org.hornetq.core.protocol.core.wireformat.SessionSendMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionSendContinuationMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionSendLargeMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionSendMessage;
import org.hornetq.utils.TokenBucketLimiter;
import org.hornetq.utils.UUIDGenerator;
Modified:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2010-01-22
10:16:25 UTC (rev 8834)
+++
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2010-01-22
10:19:02 UTC (rev 8835)
@@ -37,43 +37,43 @@
import org.hornetq.core.protocol.core.CommandConfirmationHandler;
import org.hornetq.core.protocol.core.CoreRemotingConnection;
import org.hornetq.core.protocol.core.Packet;
-import org.hornetq.core.protocol.core.PacketImpl;
-import org.hornetq.core.protocol.core.wireformat.CreateQueueMessage;
-import org.hornetq.core.protocol.core.wireformat.CreateSessionMessage;
-import org.hornetq.core.protocol.core.wireformat.ReattachSessionMessage;
-import org.hornetq.core.protocol.core.wireformat.ReattachSessionResponseMessage;
-import org.hornetq.core.protocol.core.wireformat.RollbackMessage;
-import org.hornetq.core.protocol.core.wireformat.SessionAcknowledgeMessage;
-import org.hornetq.core.protocol.core.wireformat.SessionBindingQueryMessage;
-import org.hornetq.core.protocol.core.wireformat.SessionBindingQueryResponseMessage;
-import org.hornetq.core.protocol.core.wireformat.SessionCloseMessage;
-import org.hornetq.core.protocol.core.wireformat.SessionConsumerFlowCreditMessage;
-import org.hornetq.core.protocol.core.wireformat.SessionCreateConsumerMessage;
-import org.hornetq.core.protocol.core.wireformat.SessionDeleteQueueMessage;
-import org.hornetq.core.protocol.core.wireformat.SessionExpiredMessage;
-import org.hornetq.core.protocol.core.wireformat.SessionForceConsumerDelivery;
-import org.hornetq.core.protocol.core.wireformat.SessionQueueQueryMessage;
-import org.hornetq.core.protocol.core.wireformat.SessionQueueQueryResponseMessage;
-import org.hornetq.core.protocol.core.wireformat.SessionReceiveContinuationMessage;
-import org.hornetq.core.protocol.core.wireformat.SessionReceiveLargeMessage;
-import org.hornetq.core.protocol.core.wireformat.SessionReceiveMessage;
-import org.hornetq.core.protocol.core.wireformat.SessionRequestProducerCreditsMessage;
-import org.hornetq.core.protocol.core.wireformat.SessionSendMessage;
-import org.hornetq.core.protocol.core.wireformat.SessionXACommitMessage;
-import org.hornetq.core.protocol.core.wireformat.SessionXAEndMessage;
-import org.hornetq.core.protocol.core.wireformat.SessionXAForgetMessage;
-import org.hornetq.core.protocol.core.wireformat.SessionXAGetInDoubtXidsResponseMessage;
-import org.hornetq.core.protocol.core.wireformat.SessionXAGetTimeoutResponseMessage;
-import org.hornetq.core.protocol.core.wireformat.SessionXAJoinMessage;
-import org.hornetq.core.protocol.core.wireformat.SessionXAPrepareMessage;
-import org.hornetq.core.protocol.core.wireformat.SessionXAResponseMessage;
-import org.hornetq.core.protocol.core.wireformat.SessionXAResumeMessage;
-import org.hornetq.core.protocol.core.wireformat.SessionXARollbackMessage;
-import org.hornetq.core.protocol.core.wireformat.SessionXASetTimeoutMessage;
-import org.hornetq.core.protocol.core.wireformat.SessionXASetTimeoutResponseMessage;
-import org.hornetq.core.protocol.core.wireformat.SessionXAStartMessage;
+import org.hornetq.core.protocol.core.impl.PacketImpl;
+import org.hornetq.core.protocol.core.impl.wireformat.CreateQueueMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.CreateSessionMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.ReattachSessionMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.ReattachSessionResponseMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.RollbackMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionAcknowledgeMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionBindingQueryMessage;
+import
org.hornetq.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionCloseMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionConsumerFlowCreditMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionCreateConsumerMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionDeleteQueueMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionExpiredMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionForceConsumerDelivery;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionQueueQueryMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveContinuationMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveLargeMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveMessage;
+import
org.hornetq.core.protocol.core.impl.wireformat.SessionRequestProducerCreditsMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionSendMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionXACommitMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionXAEndMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionXAForgetMessage;
+import
org.hornetq.core.protocol.core.impl.wireformat.SessionXAGetInDoubtXidsResponseMessage;
+import
org.hornetq.core.protocol.core.impl.wireformat.SessionXAGetTimeoutResponseMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionXAJoinMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionXAPrepareMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionXAResponseMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionXAResumeMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionXARollbackMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionXASetTimeoutMessage;
+import
org.hornetq.core.protocol.core.impl.wireformat.SessionXASetTimeoutResponseMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionXAStartMessage;
import org.hornetq.core.remoting.FailureListener;
-import org.hornetq.core.remoting.RemotingConnection;
+import org.hornetq.spi.core.protocol.RemotingConnection;
import org.hornetq.spi.core.remoting.Connection;
import org.hornetq.utils.ConcurrentHashSet;
import org.hornetq.utils.IDGenerator;
Modified:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java 2010-01-22
10:16:25 UTC (rev 8834)
+++
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java 2010-01-22
10:19:02 UTC (rev 8835)
@@ -17,10 +17,10 @@
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.core.protocol.core.CoreRemotingConnection;
-import org.hornetq.core.protocol.core.wireformat.SessionReceiveContinuationMessage;
-import org.hornetq.core.protocol.core.wireformat.SessionReceiveLargeMessage;
-import org.hornetq.core.protocol.core.wireformat.SessionReceiveMessage;
-import org.hornetq.core.remoting.RemotingConnection;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveContinuationMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveLargeMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveMessage;
+import org.hornetq.spi.core.protocol.RemotingConnection;
/**
* A ClientSessionInternal
Modified:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/client/impl/ClientSessionPacketHandler.java
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/client/impl/ClientSessionPacketHandler.java 2010-01-22
10:16:25 UTC (rev 8834)
+++
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/client/impl/ClientSessionPacketHandler.java 2010-01-22
10:19:02 UTC (rev 8835)
@@ -13,21 +13,21 @@
package org.hornetq.core.client.impl;
-import static org.hornetq.core.protocol.core.PacketImpl.EXCEPTION;
-import static org.hornetq.core.protocol.core.PacketImpl.SESS_RECEIVE_CONTINUATION;
-import static org.hornetq.core.protocol.core.PacketImpl.SESS_RECEIVE_LARGE_MSG;
-import static org.hornetq.core.protocol.core.PacketImpl.SESS_RECEIVE_MSG;
+import static org.hornetq.core.protocol.core.impl.PacketImpl.EXCEPTION;
+import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_RECEIVE_CONTINUATION;
+import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_RECEIVE_LARGE_MSG;
+import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_RECEIVE_MSG;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.protocol.core.Channel;
import org.hornetq.core.protocol.core.ChannelHandler;
import org.hornetq.core.protocol.core.Packet;
-import org.hornetq.core.protocol.core.PacketImpl;
-import org.hornetq.core.protocol.core.wireformat.HornetQExceptionMessage;
-import org.hornetq.core.protocol.core.wireformat.SessionProducerCreditsMessage;
-import org.hornetq.core.protocol.core.wireformat.SessionReceiveContinuationMessage;
-import org.hornetq.core.protocol.core.wireformat.SessionReceiveLargeMessage;
-import org.hornetq.core.protocol.core.wireformat.SessionReceiveMessage;
+import org.hornetq.core.protocol.core.impl.PacketImpl;
+import org.hornetq.core.protocol.core.impl.wireformat.HornetQExceptionMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionProducerCreditsMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveContinuationMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveLargeMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveMessage;
/**
*
Modified:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/client/impl/DelegatingSession.java
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/client/impl/DelegatingSession.java 2010-01-22
10:16:25 UTC (rev 8834)
+++
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/client/impl/DelegatingSession.java 2010-01-22
10:19:02 UTC (rev 8835)
@@ -28,10 +28,10 @@
import org.hornetq.api.core.client.SessionFailureListener;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.protocol.core.CoreRemotingConnection;
-import org.hornetq.core.protocol.core.wireformat.SessionReceiveContinuationMessage;
-import org.hornetq.core.protocol.core.wireformat.SessionReceiveLargeMessage;
-import org.hornetq.core.protocol.core.wireformat.SessionReceiveMessage;
-import org.hornetq.core.remoting.RemotingConnection;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveContinuationMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveLargeMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveMessage;
+import org.hornetq.spi.core.protocol.RemotingConnection;
import org.hornetq.utils.ConcurrentHashSet;
/**
Modified:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java 2010-01-22
10:16:25 UTC (rev 8834)
+++
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java 2010-01-22
10:19:02 UTC (rev 8835)
@@ -39,15 +39,15 @@
import org.hornetq.core.protocol.core.ChannelHandler;
import org.hornetq.core.protocol.core.CoreRemotingConnection;
import org.hornetq.core.protocol.core.Packet;
-import org.hornetq.core.protocol.core.PacketDecoder;
-import org.hornetq.core.protocol.core.PacketImpl;
-import org.hornetq.core.protocol.core.RemotingConnectionImpl;
-import org.hornetq.core.protocol.core.wireformat.CreateSessionMessage;
-import org.hornetq.core.protocol.core.wireformat.CreateSessionResponseMessage;
-import org.hornetq.core.protocol.core.wireformat.Ping;
+import org.hornetq.core.protocol.core.impl.PacketDecoder;
+import org.hornetq.core.protocol.core.impl.PacketImpl;
+import org.hornetq.core.protocol.core.impl.RemotingConnectionImpl;
+import org.hornetq.core.protocol.core.impl.wireformat.CreateSessionMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.CreateSessionResponseMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.Ping;
import org.hornetq.core.remoting.FailureListener;
-import org.hornetq.core.remoting.ProtocolType;
import org.hornetq.core.version.Version;
+import org.hornetq.spi.core.protocol.ProtocolType;
import org.hornetq.spi.core.remoting.BufferHandler;
import org.hornetq.spi.core.remoting.Connection;
import org.hornetq.spi.core.remoting.ConnectionLifeCycleListener;
Modified:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/client/impl/LargeMessageBufferImpl.java
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/client/impl/LargeMessageBufferImpl.java 2010-01-22
10:16:25 UTC (rev 8834)
+++
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/client/impl/LargeMessageBufferImpl.java 2010-01-22
10:19:02 UTC (rev 8835)
@@ -30,7 +30,7 @@
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.logging.Logger;
-import org.hornetq.core.protocol.core.wireformat.SessionReceiveContinuationMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveContinuationMessage;
import org.hornetq.utils.DataConstants;
import org.hornetq.utils.UTF8Util;
import org.jboss.netty.buffer.ChannelBuffer;
Modified:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java 2010-01-22
10:16:25 UTC (rev 8834)
+++
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java 2010-01-22
10:19:02 UTC (rev 8835)
@@ -46,7 +46,6 @@
import org.hornetq.core.messagecounter.impl.MessageCounterManagerImpl;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.postoffice.PostOffice;
-import org.hornetq.core.remoting.RemotingConnection;
import org.hornetq.core.remoting.server.RemotingService;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.JournalType;
@@ -54,6 +53,7 @@
import org.hornetq.core.transaction.ResourceManager;
import org.hornetq.core.transaction.Transaction;
import org.hornetq.core.transaction.impl.XidImpl;
+import org.hornetq.spi.core.protocol.RemotingConnection;
import org.hornetq.utils.json.JSONArray;
import org.hornetq.utils.json.JSONObject;
Modified:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/message/impl/MessageImpl.java
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/message/impl/MessageImpl.java 2010-01-22
10:16:25 UTC (rev 8834)
+++
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/message/impl/MessageImpl.java 2010-01-22
10:19:02 UTC (rev 8835)
@@ -27,7 +27,7 @@
import org.hornetq.core.client.impl.LargeMessageBufferInternal;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.message.BodyEncoder;
-import org.hornetq.core.protocol.core.PacketImpl;
+import org.hornetq.core.protocol.core.impl.PacketImpl;
import org.hornetq.utils.DataConstants;
import org.hornetq.utils.TypedProperties;
@@ -48,10 +48,12 @@
public abstract class MessageImpl implements MessageInternal
{
// Constants -----------------------------------------------------
-
+
private static final Logger log = Logger.getLogger(MessageImpl.class);
public static final SimpleString HDR_ROUTE_TO_IDS = new
SimpleString("_HQ_ROUTE_TO");
+
+ public static final int BUFFER_HEADER_SPACE = PacketImpl.PACKET_HEADERS_SIZE;
protected long messageID;
@@ -165,7 +167,7 @@
int bodyPos = endOfBodyPosition == -1 ? buffer.writerIndex() : endOfBodyPosition;
- int bodySize = bodyPos - PacketImpl.PACKET_HEADERS_SIZE - DataConstants.SIZE_INT;
+ int bodySize = bodyPos - BUFFER_HEADER_SPACE - DataConstants.SIZE_INT;
return DataConstants.SIZE_INT + bodySize + DataConstants.SIZE_INT +
headersPropsSize;
}
@@ -212,7 +214,7 @@
{
if (buffer instanceof LargeMessageBufferInternal == false)
{
- bodyBuffer = new
ResetLimitWrappedHornetQBuffer(PacketImpl.PACKET_HEADERS_SIZE + DataConstants.SIZE_INT,
+ bodyBuffer = new ResetLimitWrappedHornetQBuffer(BUFFER_HEADER_SPACE +
DataConstants.SIZE_INT,
buffer,
this);
}
@@ -393,7 +395,7 @@
{
encodeToBuffer();
- buff.writeBytes(buffer, PacketImpl.PACKET_HEADERS_SIZE, endOfMessagePosition -
PacketImpl.PACKET_HEADERS_SIZE);
+ buff.writeBytes(buffer, BUFFER_HEADER_SPACE, endOfMessagePosition -
BUFFER_HEADER_SPACE);
}
// Decode from journal or paging
@@ -403,11 +405,11 @@
endOfBodyPosition = buff.readInt();
- endOfMessagePosition = buff.getInt(endOfBodyPosition -
PacketImpl.PACKET_HEADERS_SIZE + start);
+ endOfMessagePosition = buff.getInt(endOfBodyPosition - BUFFER_HEADER_SPACE +
start);
- int length = endOfMessagePosition - PacketImpl.PACKET_HEADERS_SIZE;
+ int length = endOfMessagePosition - BUFFER_HEADER_SPACE;
- buffer.setIndex(0, PacketImpl.PACKET_HEADERS_SIZE);
+ buffer.setIndex(0, BUFFER_HEADER_SPACE);
buffer.writeBytes(buff, start, length);
@@ -825,7 +827,7 @@
}
// write it
- buffer.setInt(PacketImpl.PACKET_HEADERS_SIZE, endOfBodyPosition);
+ buffer.setInt(BUFFER_HEADER_SPACE, endOfBodyPosition);
// Position at end of body and skip past the message end position int.
// check for enough room in the buffer even tho it is dynamic
@@ -855,7 +857,7 @@
private void decode()
{
- endOfBodyPosition = buffer.getInt(PacketImpl.PACKET_HEADERS_SIZE);
+ endOfBodyPosition = buffer.getInt(BUFFER_HEADER_SPACE);
buffer.readerIndex(endOfBodyPosition + DataConstants.SIZE_INT);
@@ -873,7 +875,7 @@
// There's a bug in netty which means a dynamic buffer won't resize until
you write a byte
buffer.writeByte((byte)0);
- int limit = PacketImpl.PACKET_HEADERS_SIZE + DataConstants.SIZE_INT;
+ int limit = BUFFER_HEADER_SPACE + DataConstants.SIZE_INT;
buffer.setIndex(limit, limit);
}
Modified:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2010-01-22
10:16:25 UTC (rev 8834)
+++
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2010-01-22
10:19:02 UTC (rev 8835)
@@ -55,7 +55,6 @@
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.PostOffice;
-import org.hornetq.core.protocol.core.wireformat.XidCodecSupport;
import org.hornetq.core.replication.ReplicationManager;
import org.hornetq.core.replication.impl.ReplicatedJournal;
import org.hornetq.core.server.JournalType;
@@ -74,6 +73,7 @@
import org.hornetq.utils.DataConstants;
import org.hornetq.utils.ExecutorFactory;
import org.hornetq.utils.UUID;
+import org.hornetq.utils.XidCodecSupport;
/**
*
Copied: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/aardvark
(from rev 8832, trunk/src/main/org/hornetq/core/protocol/aardvark)
Copied:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/aardvark/impl (from
rev 8832, trunk/src/main/org/hornetq/core/protocol/aardvark/impl)
Deleted:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/aardvark/impl/AardvarkConnection.java
===================================================================
---
trunk/src/main/org/hornetq/core/protocol/aardvark/impl/AardvarkConnection.java 2010-01-21
20:09:23 UTC (rev 8832)
+++
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/aardvark/impl/AardvarkConnection.java 2010-01-22
10:19:02 UTC (rev 8835)
@@ -1,139 +0,0 @@
-/*
- * Copyright 2010 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.protocol.aardvark.impl;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.hornetq.api.core.HornetQBuffer;
-import org.hornetq.api.core.HornetQBuffers;
-import org.hornetq.api.core.HornetQException;
-import org.hornetq.core.logging.Logger;
-import org.hornetq.core.remoting.CloseListener;
-import org.hornetq.core.remoting.FailureListener;
-import org.hornetq.spi.core.protocol.RemotingConnection;
-import org.hornetq.spi.core.remoting.Connection;
-
-/**
- * A AardvarkConnection
- *
- * @author Tim Fox
- *
- *
- */
-public class AardvarkConnection implements RemotingConnection
-{
- private static final Logger log = Logger.getLogger(AardvarkConnection.class);
-
- private final AardvarkProtocolManager manager;
-
- private final Connection transportConnection;
-
- AardvarkConnection(final Connection transportConnection, final AardvarkProtocolManager
manager)
- {
- this.transportConnection = transportConnection;
-
- this.manager = manager;
- }
-
- public void addCloseListener(CloseListener listener)
- {
- }
-
- public void addFailureListener(FailureListener listener)
- {
- }
-
- public boolean checkDataReceived()
- {
- return true;
- }
-
- public HornetQBuffer createBuffer(int size)
- {
- return HornetQBuffers.dynamicBuffer(size);
- }
-
- public void destroy()
- {
- }
-
- public void disconnect()
- {
- }
-
- public void fail(HornetQException me)
- {
- }
-
- public void flush()
- {
- }
-
- public List<FailureListener> getFailureListeners()
- {
- return Collections.EMPTY_LIST;
- }
-
- public Object getID()
- {
- return transportConnection.getID();
- }
-
- public String getRemoteAddress()
- {
- return transportConnection.getRemoteAddress();
- }
-
- public Connection getTransportConnection()
- {
- return transportConnection;
- }
-
- public boolean isClient()
- {
- return false;
- }
-
- public boolean isDestroyed()
- {
- return false;
- }
-
- public boolean removeCloseListener(CloseListener listener)
- {
- return false;
- }
-
- public boolean removeFailureListener(FailureListener listener)
- {
- return false;
- }
-
- public void setFailureListeners(List<FailureListener> listeners)
- {
- }
-
- public void bufferReceived(Object connectionID, HornetQBuffer buffer)
- {
- manager.handleBuffer(this, buffer);
- }
-
- public int isReadyToHandle(HornetQBuffer buffer)
- {
- return -1;
- }
-
-}
Copied:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/aardvark/impl/AardvarkConnection.java
(from rev 8832,
trunk/src/main/org/hornetq/core/protocol/aardvark/impl/AardvarkConnection.java)
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/aardvark/impl/AardvarkConnection.java
(rev 0)
+++
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/aardvark/impl/AardvarkConnection.java 2010-01-22
10:19:02 UTC (rev 8835)
@@ -0,0 +1,139 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.protocol.aardvark.impl;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.HornetQBuffers;
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.remoting.CloseListener;
+import org.hornetq.core.remoting.FailureListener;
+import org.hornetq.spi.core.protocol.RemotingConnection;
+import org.hornetq.spi.core.remoting.Connection;
+
+/**
+ * A AardvarkConnection
+ *
+ * @author Tim Fox
+ *
+ *
+ */
+public class AardvarkConnection implements RemotingConnection
+{
+ private static final Logger log = Logger.getLogger(AardvarkConnection.class);
+
+ private final AardvarkProtocolManager manager;
+
+ private final Connection transportConnection;
+
+ AardvarkConnection(final Connection transportConnection, final AardvarkProtocolManager
manager)
+ {
+ this.transportConnection = transportConnection;
+
+ this.manager = manager;
+ }
+
+ public void addCloseListener(CloseListener listener)
+ {
+ }
+
+ public void addFailureListener(FailureListener listener)
+ {
+ }
+
+ public boolean checkDataReceived()
+ {
+ return true;
+ }
+
+ public HornetQBuffer createBuffer(int size)
+ {
+ return HornetQBuffers.dynamicBuffer(size);
+ }
+
+ public void destroy()
+ {
+ }
+
+ public void disconnect()
+ {
+ }
+
+ public void fail(HornetQException me)
+ {
+ }
+
+ public void flush()
+ {
+ }
+
+ public List<FailureListener> getFailureListeners()
+ {
+ return Collections.EMPTY_LIST;
+ }
+
+ public Object getID()
+ {
+ return transportConnection.getID();
+ }
+
+ public String getRemoteAddress()
+ {
+ return transportConnection.getRemoteAddress();
+ }
+
+ public Connection getTransportConnection()
+ {
+ return transportConnection;
+ }
+
+ public boolean isClient()
+ {
+ return false;
+ }
+
+ public boolean isDestroyed()
+ {
+ return false;
+ }
+
+ public boolean removeCloseListener(CloseListener listener)
+ {
+ return false;
+ }
+
+ public boolean removeFailureListener(FailureListener listener)
+ {
+ return false;
+ }
+
+ public void setFailureListeners(List<FailureListener> listeners)
+ {
+ }
+
+ public void bufferReceived(Object connectionID, HornetQBuffer buffer)
+ {
+ manager.handleBuffer(this, buffer);
+ }
+
+ public int isReadyToHandle(HornetQBuffer buffer)
+ {
+ return -1;
+ }
+
+}
Deleted:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/aardvark/impl/AardvarkProtocolManager.java
===================================================================
---
trunk/src/main/org/hornetq/core/protocol/aardvark/impl/AardvarkProtocolManager.java 2010-01-21
20:09:23 UTC (rev 8832)
+++
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/aardvark/impl/AardvarkProtocolManager.java 2010-01-22
10:19:02 UTC (rev 8835)
@@ -1,171 +0,0 @@
-/*
- * Copyright 2010 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.protocol.aardvark.impl;
-
-import java.util.List;
-
-import org.hornetq.api.core.HornetQBuffer;
-import org.hornetq.api.core.Interceptor;
-import org.hornetq.api.core.SimpleString;
-import org.hornetq.core.logging.Logger;
-import org.hornetq.core.message.impl.MessageImpl;
-import org.hornetq.core.server.HornetQServer;
-import org.hornetq.core.server.ServerMessage;
-import org.hornetq.core.server.ServerSession;
-import org.hornetq.core.server.impl.ServerMessageImpl;
-import org.hornetq.spi.core.protocol.ConnectionEntry;
-import org.hornetq.spi.core.protocol.ProtocolManager;
-import org.hornetq.spi.core.protocol.RemotingConnection;
-import org.hornetq.spi.core.protocol.SessionCallback;
-import org.hornetq.spi.core.remoting.Connection;
-
-/**
- * AardvarkProtocolManager
- *
- * A stupid protocol to demonstrate how to implement a new protocol in HornetQ
- *
- * @author Tim Fox
- *
- *
- */
-public class AardvarkProtocolManager implements ProtocolManager
-{
- private static final Logger log = Logger.getLogger(AardvarkProtocolManager.class);
-
- private final HornetQServer server;
-
- public AardvarkProtocolManager(final HornetQServer server, final
List<Interceptor> interceptors)
- {
- this.server = server;
- }
-
- public ConnectionEntry createConnectionEntry(final Connection connection)
- {
- AardvarkConnection conn = new AardvarkConnection(connection, this);
-
- return new ConnectionEntry(conn, 0, 0);
- }
-
- public void handleBuffer(final RemotingConnection conn, final HornetQBuffer buffer)
- {
- try
- {
- ServerSession session = server.createSession("aardvark",
- null,
- null,
- Integer.MAX_VALUE,
- conn,
- true,
- true,
- true,
- false);
-
- session.setCallback(new
AardvarkSessionCallback(conn.getTransportConnection()));
-
- final SimpleString name = new SimpleString("hornetq.aardvark");
-
- session.createQueue(name, name, null, false, false);
-
- session.createConsumer(0, name, null, false);
-
- session.receiveConsumerCredits(0, -1); // No flow control
-
- session.start();
-
- ServerMessage message = new ServerMessageImpl(0, 1000);
-
- message.setAddress(name);
-
- message.getBodyBuffer().writeUTF("GIRAFFE\n");
-
- session.send(message);
-
- session.start();
-
- session.closeConsumer(0);
-
- session.deleteQueue(name);
-
- session.close();
- }
- catch (Exception e)
- {
- log.error("Failed to create session", e);
- }
- }
-
- private class AardvarkSessionCallback implements SessionCallback
- {
- private final Connection connection;
-
- AardvarkSessionCallback(final Connection connection)
- {
- this.connection = connection;
- }
-
- public void closed()
- {
- }
-
- public int sendLargeMessage(long consumerID, byte[] headerBuffer, long bodySize,
int deliveryCount)
- {
- return 0;
- }
-
- public int sendLargeMessageContinuation(long consumerID, byte[] body, boolean
continues, boolean requiresResponse)
- {
- return 0;
- }
-
- public int sendMessage(ServerMessage message, long consumerID, int deliveryCount)
- {
- HornetQBuffer buffer = message.getBodyBuffer();
-
- buffer.readerIndex(MessageImpl.BUFFER_HEADER_SPACE);
-
- connection.write(buffer);
-
- return -1;
- }
-
- public void sendProducerCreditsMessage(int credits, SimpleString address, int
offset)
- {
- }
-
- }
-
- public void bufferReceived(Object connectionID, HornetQBuffer buffer)
- {
- }
-
- public int isReadyToHandle(HornetQBuffer buffer)
- {
- //Look for a new-line
-
- //BTW this is very inefficient - in a real protocol you'd want to do this
better
-
- for (int i = buffer.readerIndex(); i < buffer.writerIndex(); i++)
- {
- byte b = buffer.getByte(i);
-
- if (b == (byte)'\n')
- {
- return buffer.writerIndex();
- }
- }
-
- return -1;
- }
-
-}
Copied:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/aardvark/impl/AardvarkProtocolManager.java
(from rev 8832,
trunk/src/main/org/hornetq/core/protocol/aardvark/impl/AardvarkProtocolManager.java)
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/aardvark/impl/AardvarkProtocolManager.java
(rev 0)
+++
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/aardvark/impl/AardvarkProtocolManager.java 2010-01-22
10:19:02 UTC (rev 8835)
@@ -0,0 +1,174 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.protocol.aardvark.impl;
+
+import java.util.List;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.Interceptor;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.message.impl.MessageImpl;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.ServerSession;
+import org.hornetq.core.server.impl.ServerMessageImpl;
+import org.hornetq.spi.core.protocol.ConnectionEntry;
+import org.hornetq.spi.core.protocol.ProtocolManager;
+import org.hornetq.spi.core.protocol.RemotingConnection;
+import org.hornetq.spi.core.protocol.SessionCallback;
+import org.hornetq.spi.core.remoting.Connection;
+
+/**
+ * AardvarkProtocolManager
+ *
+ * A stupid protocol to demonstrate how to implement a new protocol in HornetQ
+ *
+ * @author Tim Fox
+ *
+ *
+ */
+public class AardvarkProtocolManager implements ProtocolManager
+{
+ private static final Logger log = Logger.getLogger(AardvarkProtocolManager.class);
+
+ private final HornetQServer server;
+
+ public AardvarkProtocolManager(final HornetQServer server, final
List<Interceptor> interceptors)
+ {
+ this.server = server;
+ }
+
+ public ConnectionEntry createConnectionEntry(final Connection connection)
+ {
+ AardvarkConnection conn = new AardvarkConnection(connection, this);
+
+ return new ConnectionEntry(conn, 0, 0);
+ }
+
+ public void removeHandler(String name)
+ {
+ }
+
+ public void handleBuffer(final RemotingConnection conn, final HornetQBuffer buffer)
+ {
+ try
+ {
+ ServerSession session = server.createSession("aardvark",
+ null,
+ null,
+ Integer.MAX_VALUE,
+ conn,
+ true,
+ true,
+ true,
+ false,
+ new
AardvarkSessionCallback(conn.getTransportConnection()));
+
+ final SimpleString name = new SimpleString("hornetq.aardvark");
+
+ session.createQueue(name, name, null, false, false);
+
+ session.createConsumer(0, name, null, false);
+
+ session.receiveConsumerCredits(0, -1); // No flow control
+
+ session.start();
+
+ ServerMessage message = new ServerMessageImpl(0, 1000);
+
+ message.setAddress(name);
+
+ message.getBodyBuffer().writeUTF("GIRAFFE\n");
+
+ session.send(message);
+
+ session.start();
+
+ session.closeConsumer(0);
+
+ session.deleteQueue(name);
+
+ session.close();
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to create session", e);
+ }
+ }
+
+ private class AardvarkSessionCallback implements SessionCallback
+ {
+ private final Connection connection;
+
+ AardvarkSessionCallback(final Connection connection)
+ {
+ this.connection = connection;
+ }
+
+ public void closed()
+ {
+ }
+
+ public int sendLargeMessage(long consumerID, byte[] headerBuffer, long bodySize,
int deliveryCount)
+ {
+ return 0;
+ }
+
+ public int sendLargeMessageContinuation(long consumerID, byte[] body, boolean
continues, boolean requiresResponse)
+ {
+ return 0;
+ }
+
+ public int sendMessage(ServerMessage message, long consumerID, int deliveryCount)
+ {
+ HornetQBuffer buffer = message.getBodyBuffer();
+
+ buffer.readerIndex(MessageImpl.BUFFER_HEADER_SPACE);
+
+ connection.write(buffer);
+
+ return -1;
+ }
+
+ public void sendProducerCreditsMessage(int credits, SimpleString address, int
offset)
+ {
+ }
+
+ }
+
+ public void bufferReceived(Object connectionID, HornetQBuffer buffer)
+ {
+ }
+
+ public int isReadyToHandle(HornetQBuffer buffer)
+ {
+ //Look for a new-line
+
+ //BTW this is very inefficient - in a real protocol you'd want to do this
better
+
+ for (int i = buffer.readerIndex(); i < buffer.writerIndex(); i++)
+ {
+ byte b = buffer.getByte(i);
+
+ if (b == (byte)'\n')
+ {
+ return buffer.writerIndex();
+ }
+ }
+
+ return -1;
+ }
+
+}
Deleted:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/aardvark/impl/AardvarkProtocolManagerFactory.java
===================================================================
---
trunk/src/main/org/hornetq/core/protocol/aardvark/impl/AardvarkProtocolManagerFactory.java 2010-01-21
20:09:23 UTC (rev 8832)
+++
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/aardvark/impl/AardvarkProtocolManagerFactory.java 2010-01-22
10:19:02 UTC (rev 8835)
@@ -1,38 +0,0 @@
-/*
- * Copyright 2010 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.protocol.aardvark.impl;
-
-import java.util.List;
-
-import org.hornetq.api.core.Interceptor;
-import org.hornetq.core.server.HornetQServer;
-import org.hornetq.spi.core.protocol.ProtocolManager;
-import org.hornetq.spi.core.protocol.ProtocolManagerFactory;
-
-/**
- * A AardvarkProtocolManagerFactory
- *
- * @author Tim Fox
- *
- *
- */
-public class AardvarkProtocolManagerFactory implements ProtocolManagerFactory
-{
-
- public ProtocolManager createProtocolManager(final HornetQServer server, final
List<Interceptor> interceptors)
- {
- return new AardvarkProtocolManager(server, interceptors);
- }
-
-}
Copied:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/aardvark/impl/AardvarkProtocolManagerFactory.java
(from rev 8832,
trunk/src/main/org/hornetq/core/protocol/aardvark/impl/AardvarkProtocolManagerFactory.java)
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/aardvark/impl/AardvarkProtocolManagerFactory.java
(rev 0)
+++
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/aardvark/impl/AardvarkProtocolManagerFactory.java 2010-01-22
10:19:02 UTC (rev 8835)
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.protocol.aardvark.impl;
+
+import java.util.List;
+
+import org.hornetq.api.core.Interceptor;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.spi.core.protocol.ProtocolManager;
+import org.hornetq.spi.core.protocol.ProtocolManagerFactory;
+
+/**
+ * A AardvarkProtocolManagerFactory
+ *
+ * @author Tim Fox
+ *
+ *
+ */
+public class AardvarkProtocolManagerFactory implements ProtocolManagerFactory
+{
+
+ public ProtocolManager createProtocolManager(final HornetQServer server, final
List<Interceptor> interceptors)
+ {
+ return new AardvarkProtocolManager(server, interceptors);
+ }
+
+}