[hornetq-commits] JBoss hornetq SVN: r9368 - in trunk: examples/common and 22 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Jun 29 04:46:12 EDT 2010


Author: ataylor
Date: 2010-06-29 04:46:10 -0400 (Tue, 29 Jun 2010)
New Revision: 9368

Added:
   trunk/examples/core/twitter-connector/
   trunk/examples/core/twitter-connector/build.bat
   trunk/examples/core/twitter-connector/build.sh
   trunk/examples/core/twitter-connector/build.xml
   trunk/examples/core/twitter-connector/readme.html
   trunk/examples/core/twitter-connector/server0/
   trunk/examples/core/twitter-connector/server0/client-jndi.properties
   trunk/examples/core/twitter-connector/server0/hornetq-beans.xml
   trunk/examples/core/twitter-connector/server0/hornetq-configuration.xml
   trunk/examples/core/twitter-connector/server0/hornetq-jms.xml
   trunk/examples/core/twitter-connector/server0/hornetq-users.xml
   trunk/examples/core/twitter-connector/src/
   trunk/examples/core/twitter-connector/src/org/
   trunk/examples/core/twitter-connector/src/org/hornetq/
   trunk/examples/core/twitter-connector/src/org/hornetq/core/
   trunk/examples/core/twitter-connector/src/org/hornetq/core/example/
   trunk/examples/core/twitter-connector/src/org/hornetq/core/example/TwitterConnectorExample.java
   trunk/licenses/LICENSE_twitter4j.txt
   trunk/src/main/org/hornetq/core/config/TwitterConnectorConfiguration.java
   trunk/src/main/org/hornetq/core/twitter/
   trunk/src/main/org/hornetq/core/twitter/TwitterConnectorService.java
   trunk/src/main/org/hornetq/core/twitter/TwitterConstants.java
   trunk/src/main/org/hornetq/core/twitter/impl/
   trunk/src/main/org/hornetq/core/twitter/impl/TwitterConnectorServiceImpl.java
   trunk/tests/src/org/hornetq/tests/integration/twitter/
   trunk/tests/src/org/hornetq/tests/integration/twitter/TwitterTest.java
Modified:
   trunk/.classpath
   trunk/build-hornetq.xml
   trunk/examples/common/build.xml
   trunk/pom.xml
   trunk/src/config/common/schema/hornetq-configuration.xsd
   trunk/src/config/jboss-as-4/build.xml
   trunk/src/config/jboss-as-5/build.xml
   trunk/src/main/org/hornetq/core/config/Configuration.java
   trunk/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java
   trunk/src/main/org/hornetq/core/deployers/impl/FileConfigurationParser.java
   trunk/src/main/org/hornetq/core/server/HornetQServer.java
   trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
Log:
HORNETQ-189 - Create twitter bridge


Modified: trunk/.classpath
===================================================================
--- trunk/.classpath	2010-06-29 08:37:55 UTC (rev 9367)
+++ trunk/.classpath	2010-06-29 08:46:10 UTC (rev 9368)
@@ -1,6 +1,7 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <classpath>
 	<classpathentry excluding="**/.svn/**/*" kind="src" path="src/main"/>
+	<classpathentry kind="src" path="examples/core/twitter-connector/src"/>
 	<classpathentry kind="src" path="src/config/common"/>
 	<classpathentry kind="src" path="build/src"/>
 	<classpathentry kind="src" path="tests/jms-tests/config"/>
@@ -124,5 +125,6 @@
 	<classpathentry kind="lib" path="thirdparty/org/jboss/security/lib/jboss-security-spi.jar"/>
 	<classpathentry kind="lib" path="thirdparty/wutka-dtdparser/lib/dtdparser121.jar"/>
 	<classpathentry kind="lib" path="thirdparty/org/jboss/ejb3/lib/jboss-ejb3-ext-api.jar"/>
+	<classpathentry kind="lib" path="thirdparty/org/twitter4j/lib/twitter4j-core.jar"/>
 	<classpathentry kind="output" path="eclipse-output"/>
 </classpath>

Modified: trunk/build-hornetq.xml
===================================================================
--- trunk/build-hornetq.xml	2010-06-29 08:37:55 UTC (rev 9367)
+++ trunk/build-hornetq.xml	2010-06-29 08:46:10 UTC (rev 9368)
@@ -78,6 +78,7 @@
    <property name="service.sources.sar.name" value="hornetq-service-sources.sar"/>
    <property name="resources.jar.name" value="hornetq-resources.jar"/>
    <property name="resources.sources.jar.name" value="hornetq-resources-sources.jar"/>
+   <property name="twitter4j.jar.name" value="twitter4j-core.jar"/>
 
    <!--source and build dirs-->
    <property name="build.dir" value="build"/>
@@ -185,6 +186,7 @@
 
    <path id="core.compilation.classpath">
       <path refid="org.jboss.netty.classpath"/>
+      <path refid="org.twitter4j.classpath"/>
    </path>
 
    <path id="jms.compilation.classpath">
@@ -231,6 +233,7 @@
       <path refid="jboss.integration.compilation.classpath"/>
       <path refid="bootstrap.compilation.classpath"/>
       <path refid="junit.junit.classpath"/>
+      <path refid="org.twitter4j.classpath"/>
       <path location="${build.jars.dir}/${ra.jar.name}"/>
       <path location="${build.jars.dir}/${jms.jar.name}"/>
       <path location="${build.jars.dir}/${jboss.integration.jar.name}"/>
@@ -288,6 +291,7 @@
  	  <!-- we must include Apache commons logging   -->
  	  <!-- as a transitive dependency from JBoss TM -->
  	  <path refid="apache.logging.classpath"/>
+      <path refid="org.twitter4j.classpath"/>
    </path>
 
    <path id="emma.unit.test.execution.classpath">
@@ -950,6 +954,7 @@
          </fileset>
       </copy>
       <copy file="${org.jboss.netty.lib}/${netty.jar.name}" tofile="${build.distro.lib.dir}/netty.jar"/>
+      <copy file="${org.twitter4j.lib}/${twitter4j.jar.name}" tofile="${build.distro.lib.dir}/${twitter4j.jar.name}"/>
       <copy todir="${build.distro.config.dir}">
          <fileset dir="${src.config.dir}">
             <include name="*.xml"/>

Modified: trunk/examples/common/build.xml
===================================================================
--- trunk/examples/common/build.xml	2010-06-29 08:37:55 UTC (rev 9367)
+++ trunk/examples/common/build.xml	2010-06-29 08:46:10 UTC (rev 9368)
@@ -101,6 +101,7 @@
       <fileset dir="${jars.dir}">
          <include name="org/jboss/naming/lib/jnpserver.jar"/>
          <include name="org/jboss/netty/lib/netty*.jar"/>
+         <include name="org/twitter4j/lib/twitter4j*.jar"/>
       </fileset>
    </path>
 

Added: trunk/examples/core/twitter-connector/build.bat
===================================================================
--- trunk/examples/core/twitter-connector/build.bat	                        (rev 0)
+++ trunk/examples/core/twitter-connector/build.bat	2010-06-29 08:46:10 UTC (rev 9368)
@@ -0,0 +1,13 @@
+ at echo off
+
+set "OVERRIDE_ANT_HOME=..\..\..\tools\ant"
+
+if exist "..\..\..\src\bin\build.bat" (
+   rem running from TRUNK
+   call ..\..\..\src\bin\build.bat %*
+) else (
+   rem running from the distro
+   call ..\..\..\bin\build.bat %*
+)
+
+set "OVERRIDE_ANT_HOME="

Added: trunk/examples/core/twitter-connector/build.sh
===================================================================
--- trunk/examples/core/twitter-connector/build.sh	                        (rev 0)
+++ trunk/examples/core/twitter-connector/build.sh	2010-06-29 08:46:10 UTC (rev 9368)
@@ -0,0 +1,15 @@
+#!/bin/sh -x
+
+OVERRIDE_ANT_HOME=../../../tools/ant
+export OVERRIDE_ANT_HOME
+
+if [ -f "../../../src/bin/build.sh" ]; then
+   # running from TRUNK
+   ../../../src/bin/build.sh "$@"
+else
+   # running from the distro
+   ../../../bin/build.sh "$@"
+fi
+
+
+

Added: trunk/examples/core/twitter-connector/build.xml
===================================================================
--- trunk/examples/core/twitter-connector/build.xml	                        (rev 0)
+++ trunk/examples/core/twitter-connector/build.xml	2010-06-29 08:46:10 UTC (rev 9368)
@@ -0,0 +1,47 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE project [
+      <!ENTITY libraries SYSTEM "../../../thirdparty/libraries.ent">
+      ]>
+<!--
+  ~ Copyright 2009 Red Hat, Inc.
+  ~ Red Hat licenses this file to you under the Apache License, version
+  ~ 2.0 (the "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+  ~ implied.  See the License for the specific language governing
+  ~ permissions and limitations under the License.
+  -->
+<project default="run" name="HornetQ Twitter Connector Service Example">
+
+   <import file="../../common/build.xml"/>
+   <property environment='env'/>
+
+   <target name="check" unless="env.TWITTER_USERNAME">
+      <echo>**************************************************************************</echo>
+      <echo>* Please set the twitter account:                                        *</echo>
+      <echo>* ./build.sh -Denv.TWITTER_USERNAME=user -Denv.TWITTER_PASSWORD=password *</echo>
+      <echo>**************************************************************************</echo>
+      <fail message="run example failed"/>
+   </target>
+      
+   <target name="run" depends="check">
+      <antcall target="runExample">
+         <param name="example.classname" value="org.hornetq.core.example.TwitterConnectorExample"/>
+	 <!-- HTTP proxy settings
+         <param name="server.args" value="-Dtwitter4j.http.proxyHost=your.proxy.server -Dtwitter4j.http.proxyPort=your.proxy.port"/>
+	 -->
+         <param name="server.args" value="-Dtwitter.username=${env.TWITTER_USERNAME} -Dtwitter.password=${env.TWITTER_PASSWORD}"/>
+      </antcall>
+   </target>
+
+   <target name="runRemote">
+      <antcall target="runExample">
+         <param name="example.classname" value="org.hornetq.core.example.TwitterConnectorExample"/>
+         <param name="hornetq.example.runServer" value="false"/>
+      </antcall>
+   </target>
+
+</project>

Added: trunk/examples/core/twitter-connector/readme.html
===================================================================
--- trunk/examples/core/twitter-connector/readme.html	                        (rev 0)
+++ trunk/examples/core/twitter-connector/readme.html	2010-06-29 08:46:10 UTC (rev 9368)
@@ -0,0 +1,136 @@
+<html>
+  <head>
+    <title>HornetQ Twitter Connector Service Example</title>
+    <link rel="stylesheet" type="text/css" href="../../common/common.css" />
+    <link rel="stylesheet" type="text/css" href="../../common/prettify.css" />
+    <script type="text/javascript" src="../../common/prettify.js"></script>
+  </head>
+  <body onload="prettyPrint()">
+     <h1>Twitter Connector Service Example</h1>
+
+     <p>This example shows you how to configure HornetQ to use the Twitter Connector Service.</p>
+     
+     <p>HornetQ supports 2 types of Twitter connector, incoming and outgoing.
+     Incoming connector consumes from twitter and forwards to a configurable address.
+     Outgoing connector consumes from a configurable address and forwards to twitter.
+     </p>
+     
+     <p>In this example, incoming connector and outgoing conenctor is related to same twitter account.
+     So if you send a message to an outgoing address, outgoing connector forwards it to twitter,
+     and then incoming connector consumes it and forwards to incoming address.</p>
+     <p>All you need to do is edit the server0/hornetq-configuration.xml to use twitter connector.</p>
+ 
+ 
+      <pre class="prettyprint">
+      <code>
+      &lt;twitter-connectors&gt;
+         &lt;!-- consumes from twitter and forwards to queue.incomingQueue --&gt;
+         &lt;incoming-twitter-connector name=&quot;my-incoming-tweets&quot;&gt;
+            &lt;queue-name&gt;queue.incomingQueue&lt;/queue-name&gt;
+            &lt;twitter-account&gt;
+               &lt;username&gt;${twitter.username}&lt;/username&gt;
+               &lt;password&gt;${twitter.password}&lt;/password&gt;
+            &lt;/twitter-account&gt;
+            &lt;interval-seconds&gt;60&lt;/interval-seconds&gt;
+         &lt;/incoming-twitter-connector&gt;
+
+         &lt;!-- consumes from queue.outgoingQueue and forwards to twitter --&gt;
+         &lt;outgoing-twitter-connector name=&quot;my-outgoing-tweets&quot;&gt;
+            &lt;queue-name&gt;queue.outgoingQueue&lt;/queue-name&gt;
+            &lt;twitter-account&gt;
+               &lt;username&gt;${twitter.username}&lt;/username&gt;
+               &lt;password&gt;${twitter.password}&lt;/password&gt;
+            &lt;/twitter-account&gt;
+         &lt;/outgoing-twitter-connector&gt;
+      &lt;/twitter-connectors&gt;
+      </code>
+      </pre>
+     
+     <h2>Example step-by-step</h2>
+     <p><i>To run the example, simply type <code>./build.sh</code> (or <code>build.bat</code> on windows) from this directory.
+     Also please remember to set the twitter user/pass in environment variable Before you run.</i></p>
+
+     on Linux:     
+     <pre class="prettyprint">
+     <code>export TWITTER_USERNAME=your_twitter_username
+export TWITTER_PASSWORD=your_twitter_password
+./build.sh</code>
+     </pre>
+
+	 on Windows:	
+     <pre class="prettyprint">
+     <code>set TWITTER_USERNAME=your_twitter_username
+set TWITTER_PASSWORD=your_twitter_password
+build.bat</code>
+     </pre>
+     </p>
+     
+     <ol>
+        <li>First we need to create a ClientSessionFactory with Netty transport configuration</li>
+        <pre class="prettyprint">
+           <code>csf = HornetQClient.createClientSessionFactory(new TransportConfiguration(NettyConnectorFactory.class.getName()));</code>
+        </pre>
+
+        <li>We create a core session with auto-commit mode</li>
+        <pre class="prettyprint">
+           <code>session = csf.createSession(true,true);</code>
+        </pre>
+
+        <li>We Create a core producer for queue.outgoingQueue</li>
+        <pre class="prettyprint">
+           <code>ClientProducer cp = session.createProducer(OUTGOING_QUEUE);</code>
+        </pre>
+
+        <li>We create a core consumer for queue.incomingQueue</li>
+        <pre class="prettyprint">
+           <code>ClientConsumer cc = session.createConsumer(INCOMING_QUEUE);</code>
+        </pre>
+
+        <li>We create a core message that we are going to send</li>
+        <pre class="prettyprint">
+           <code>ClientMessage cm = session.createMessage(org.hornetq.api.core.Message.TEXT_TYPE,true);
+String testMessage = System.currentTimeMillis() + ": twitter connector test example";
+cm.getBodyBuffer().writeString(testMessage);</code>
+        </pre>
+
+        <li>We send the message to queue.outgoingQueue</li>
+        <pre class="prettyprint">
+          <code>cp.send(cm);</code>
+       </pre>
+
+        <li>We start the session</li>
+        <pre class="prettyprint">
+           <code>session.start();</code>
+        </pre>
+
+        <li>We will receive a message from queue.incomingQueue.
+        Outgoing connector forwards a message(we sent before) to twitter immediately.
+        Since incoming connector consumes from twitter and forwards to queue.incomingQueue
+        every 60 seconds, It will be received in 60+x seconds.</li>
+        <pre class="prettyprint">
+           <code>ClientMessage received = cc.receive(70 * 1000);
+received.acknowledge();
+String receivedText = received.getBodyBuffer().readString();</code>
+        </pre>
+
+        <li>And finally, remember to close core session and ClientSessionFactory in a <code>finally</code> block.</li>
+
+        <pre class="prettyprint">
+           <code>finally
+{
+    if(session != null)
+    {
+       session.close();
+    }
+    if(csf != null)
+    {
+       csf.close();
+    }
+}</code>
+        </pre>
+
+
+
+     </ol>
+  </body>
+</html>

Added: trunk/examples/core/twitter-connector/server0/client-jndi.properties
===================================================================
--- trunk/examples/core/twitter-connector/server0/client-jndi.properties	                        (rev 0)
+++ trunk/examples/core/twitter-connector/server0/client-jndi.properties	2010-06-29 08:46:10 UTC (rev 9368)
@@ -0,0 +1,3 @@
+java.naming.factory.initial=org.jnp.interfaces.NamingContextFactory
+java.naming.provider.url=jnp://localhost:1099
+java.naming.factory.url.pkgs=org.jboss.naming:org.jnp.interfaces

Added: trunk/examples/core/twitter-connector/server0/hornetq-beans.xml
===================================================================
--- trunk/examples/core/twitter-connector/server0/hornetq-beans.xml	                        (rev 0)
+++ trunk/examples/core/twitter-connector/server0/hornetq-beans.xml	2010-06-29 08:46:10 UTC (rev 9368)
@@ -0,0 +1,59 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<deployment xmlns="urn:jboss:bean-deployer:2.0">
+
+   <bean name="Naming" class="org.jnp.server.NamingBeanImpl"/>
+
+   <!-- JNDI server. Disable this if you don't want JNDI -->
+   <bean name="JNDIServer" class="org.jnp.server.Main">
+      <property name="namingInfo">
+         <inject bean="Naming"/>
+      </property>
+      <property name="port">1099</property>
+      <property name="bindAddress">localhost</property>
+      <property name="rmiPort">1098</property>
+      <property name="rmiBindAddress">localhost</property>
+   </bean>
+   
+   <!-- MBean server -->
+   <bean name="MBeanServer" class="javax.management.MBeanServer">
+      <constructor factoryClass="java.lang.management.ManagementFactory"
+                   factoryMethod="getPlatformMBeanServer"/>
+   </bean> 
+
+   <!-- The core configuration -->
+   <bean name="Configuration" class="org.hornetq.core.config.impl.FileConfiguration"/>
+
+   <!-- The security manager -->
+   <bean name="HornetQSecurityManager" class="org.hornetq.spi.core.security.HornetQSecurityManagerImpl">
+      <start ignored="true"/>
+      <stop ignored="true"/>
+   </bean>
+
+   <!-- The core server -->
+   <bean name="HornetQServer" class="org.hornetq.core.server.impl.HornetQServerImpl">
+      <constructor>
+         <parameter>
+            <inject bean="Configuration"/>
+         </parameter>
+         <parameter>
+            <inject bean="MBeanServer"/>
+         </parameter>
+         <parameter>
+            <inject bean="HornetQSecurityManager"/>
+         </parameter>        
+      </constructor>
+      <start ignored="true"/>
+      <stop ignored="true"/>
+   </bean>
+   
+   <!-- The JMS server -->
+   <bean name="JMSServerManager" class="org.hornetq.jms.server.impl.JMSServerManagerImpl">
+      <constructor>         
+         <parameter>
+            <inject bean="HornetQServer"/>
+         </parameter>
+      </constructor>
+   </bean>
+
+</deployment>

Added: trunk/examples/core/twitter-connector/server0/hornetq-configuration.xml
===================================================================
--- trunk/examples/core/twitter-connector/server0/hornetq-configuration.xml	                        (rev 0)
+++ trunk/examples/core/twitter-connector/server0/hornetq-configuration.xml	2010-06-29 08:46:10 UTC (rev 9368)
@@ -0,0 +1,64 @@
+<configuration xmlns="urn:hornetq"
+            xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+            xsi:schemaLocation="urn:hornetq /schema/hornetq-configuration.xsd">
+
+   <!-- Connectors -->
+
+   <connectors>
+      <connector name="netty-connector">
+         <factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class>
+      </connector>
+   </connectors>
+   
+   <!-- Acceptors -->
+   <acceptors>
+      <acceptor name="netty-acceptor">
+         <factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class>        
+      </acceptor>
+   </acceptors>
+
+   <!-- Other config -->
+
+   <security-settings>
+      <!--security for example queue-->
+      <security-setting match="queue.incomingQueue">
+         <permission type="consume" roles="guest"/>
+         <permission type="send" roles="guest"/>
+      </security-setting>
+      <security-setting match="queue.outgoingQueue">
+         <permission type="consume" roles="guest"/>
+         <permission type="send" roles="guest"/>
+      </security-setting>
+   </security-settings>
+
+   <queues>
+      <queue name="queue.incomingQueue">
+         <address>queue.incomingQueue</address>
+      </queue>
+      <queue name="queue.outgoingQueue">
+         <address>queue.outgoingQueue</address>
+      </queue>
+   </queues>
+   
+   <twitter-connectors>
+      <!-- consumes from twitter and forwards to queue.incomingQueue -->
+      <incoming-twitter-connector name="my-incoming-tweets">
+         <queue-name>queue.incomingQueue</queue-name>
+         <twitter-account>
+            <username>${twitter.username}</username>
+            <password>${twitter.password}</password>
+         </twitter-account>
+         <interval-seconds>60</interval-seconds>
+      </incoming-twitter-connector>
+      
+      <!-- consumes from queue.outgoingQueue and forwards to twitter -->
+      <outgoing-twitter-connector name="my-outgoing-tweets">
+         <queue-name>queue.outgoingQueue</queue-name>
+         <twitter-account>
+            <username>${twitter.username}</username>
+            <password>${twitter.password}</password>
+         </twitter-account>
+      </outgoing-twitter-connector>
+   </twitter-connectors>
+   
+</configuration>

Added: trunk/examples/core/twitter-connector/server0/hornetq-jms.xml
===================================================================
--- trunk/examples/core/twitter-connector/server0/hornetq-jms.xml	                        (rev 0)
+++ trunk/examples/core/twitter-connector/server0/hornetq-jms.xml	2010-06-29 08:46:10 UTC (rev 9368)
@@ -0,0 +1,19 @@
+<configuration xmlns="urn:hornetq"
+            xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+            xsi:schemaLocation="urn:hornetq /schema/hornetq-jms.xsd">
+   <!--the connection factory used by the example-->
+   <connection-factory name="ConnectionFactory">
+      <connectors>
+         <connector-ref connector-name="netty-connector"/>
+      </connectors>
+      <entries>
+         <entry name="ConnectionFactory"/>
+      </entries>
+   </connection-factory>
+
+   <!--the queue used by the example-->
+   <queue name="exampleQueue">
+      <entry name="/queue/exampleQueue"/>
+   </queue>
+
+</configuration>
\ No newline at end of file

Added: trunk/examples/core/twitter-connector/server0/hornetq-users.xml
===================================================================
--- trunk/examples/core/twitter-connector/server0/hornetq-users.xml	                        (rev 0)
+++ trunk/examples/core/twitter-connector/server0/hornetq-users.xml	2010-06-29 08:46:10 UTC (rev 9368)
@@ -0,0 +1,7 @@
+<configuration xmlns="urn:hornetq" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+            xsi:schemaLocation="urn:hornetq /schema/hornetq-users.xsd">
+   <!-- the default user.  this is used where username is null-->
+   <defaultuser name="guest" password="guest">
+      <role name="guest"/>
+   </defaultuser>
+</configuration>
\ No newline at end of file

Added: trunk/examples/core/twitter-connector/src/org/hornetq/core/example/TwitterConnectorExample.java
===================================================================
--- trunk/examples/core/twitter-connector/src/org/hornetq/core/example/TwitterConnectorExample.java	                        (rev 0)
+++ trunk/examples/core/twitter-connector/src/org/hornetq/core/example/TwitterConnectorExample.java	2010-06-29 08:46:10 UTC (rev 9368)
@@ -0,0 +1,102 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.example;
+
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.common.example.HornetQExample;
+import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
+
+/**
+ * A simple example of using twitter connector service.
+ *
+ * @author <a href="tm.igarashi at gmail.com">Tomohisa Igarashi</a>
+ */
+public class TwitterConnectorExample extends HornetQExample
+{
+   private static final String INCOMING_QUEUE = "queue.incomingQueue";
+   private static final String OUTGOING_QUEUE = "queue.outgoingQueue";
+   
+   public static void main(final String[] args)
+   {
+      new TwitterConnectorExample().run(args);
+   }
+
+   @Override
+   public boolean runExample() throws Exception
+   {
+      ClientSessionFactory csf = null;
+      ClientSession session = null;
+      try
+      {
+         // Step 1. Create a ClientSessionFactory.
+         csf = HornetQClient.createClientSessionFactory (new TransportConfiguration(NettyConnectorFactory.class.getName()));
+
+         // Step 2. Create a core session.
+         session = csf.createSession(true,true);
+
+         // Step 3. Create a core producer for queue.outgoingQueue.
+         ClientProducer cp = session.createProducer(OUTGOING_QUEUE);
+
+         // Step 4. Create a core consumer for queue.incomingQueue.
+         ClientConsumer cc = session.createConsumer(INCOMING_QUEUE);
+
+         // Step 5. Create a core message.
+         ClientMessage cm = session.createMessage(org.hornetq.api.core.Message.TEXT_TYPE,true);
+         String testMessage = System.currentTimeMillis() + ": ### Hello, HornetQ fans!! We are now experiencing so fast, so reliable and so exciting messaging never seen before ;-) ###";
+         cm.getBodyBuffer().writeString(testMessage);
+
+         // Step 6. Send a message to queue.outgoingQueue.
+         cp.send(cm);
+         System.out.println("#### Sent a message to " + OUTGOING_QUEUE + ": " + testMessage);
+         
+         // Step 7. Start the session.
+         session.start();
+
+         // Step 8. Receive a message from queue.incomingQueue.
+         //         Outgoing connector forwards a message(sent at Step 6.) to twitter immediately.
+         //         Since incoming connector consumes from twitter and forwards to queue.incomingQueue
+         //         every 60 seconds, It will be received in 60+x seconds. 
+         System.out.println("#### A message will be received in 60 seconds. Please wait...");
+         ClientMessage received = cc.receive(70 * 1000);
+         received.acknowledge();
+         String receivedText = received.getBodyBuffer().readString();
+         System.out.println("#### Received a message from " + INCOMING_QUEUE + ": " + receivedText);
+
+         if(!receivedText.equals(testMessage))
+         {
+            return false;
+         }
+         
+         return true;
+      }
+      finally
+      {
+         // Step 9. Be sure to close some resources. 
+         if(session != null)
+         {
+            session.close();
+         }
+         if(csf != null)
+         {
+            csf.close();
+         }
+      }
+   }
+
+}

Added: trunk/licenses/LICENSE_twitter4j.txt
===================================================================
--- trunk/licenses/LICENSE_twitter4j.txt	                        (rev 0)
+++ trunk/licenses/LICENSE_twitter4j.txt	2010-06-29 08:46:10 UTC (rev 9368)
@@ -0,0 +1,26 @@
+Twitter4J includes software from JSON.org to parse JSON response from the Twitter API. You can see the license term at http://www.JSON.org/license.html
+
+Copyright (c) 2007-2010, Yusuke Yamamoto
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are met:
+    * Redistributions of source code must retain the above copyright
+      notice, this list of conditions and the following disclaimer.
+    * Redistributions in binary form must reproduce the above copyright
+      notice, this list of conditions and the following disclaimer in the
+      documentation and/or other materials provided with the distribution.
+    * Neither the name of the Yusuke Yamamoto nor the
+      names of its contributors may be used to endorse or promote products
+      derived from this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY Yusuke Yamamoto ``AS IS'' AND ANY
+EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+DISCLAIMED. IN NO EVENT SHALL Yusuke Yamamoto BE LIABLE FOR ANY
+DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

Modified: trunk/pom.xml
===================================================================
--- trunk/pom.xml	2010-06-29 08:37:55 UTC (rev 9367)
+++ trunk/pom.xml	2010-06-29 08:46:10 UTC (rev 9368)
@@ -235,6 +235,12 @@
          <artifactId>jboss-logging-spi</artifactId>
          <version>2.1.0.GA</version>
       </dependency>
+      <!--needed to compile twitter support-->
+      <dependency>
+         <groupId>org.twitter4j</groupId>
+           <artifactId>twitter4j-core</artifactId>
+           <version>2.1.2</version>
+       </dependency>
       <!-- needed to compile the tests-->
       <dependency>
         <groupId>junit</groupId>

Modified: trunk/src/config/common/schema/hornetq-configuration.xsd
===================================================================
--- trunk/src/config/common/schema/hornetq-configuration.xsd	2010-06-29 08:37:55 UTC (rev 9367)
+++ trunk/src/config/common/schema/hornetq-configuration.xsd	2010-06-29 08:46:10 UTC (rev 9368)
@@ -180,6 +180,8 @@
                 </xsd:element>
                 <xsd:element maxOccurs="1" minOccurs="0" name="address-settings">
                 </xsd:element>
+                <xsd:element maxOccurs="1" minOccurs="0" name="twitter-connectors">
+                </xsd:element>
 			</xsd:all>
 		</xsd:complexType>
 	</xsd:element>
@@ -505,5 +507,59 @@
 		</xsd:restriction>
 	</xsd:simpleType>
 	
-	
+	<xsd:element name="twitter-connectors">
+	    <xsd:complexType>
+	    	<xsd:sequence>
+				<xsd:element maxOccurs="unbounded" minOccurs="0" name="incoming-twitter-connector">
+					<xsd:complexType>
+						<xsd:all>
+							<xsd:element maxOccurs="1" minOccurs="1" name="queue-name" type="xsd:string"/>
+							<xsd:element maxOccurs="1" minOccurs="0" name="interval-seconds" type="xsd:int"/>
+							<xsd:element maxOccurs="1" minOccurs="0" name="use-streaming">
+								<xsd:simpleType>
+									<xsd:restriction base="xsd:string">
+										<xsd:enumeration value="yes"/>
+										<xsd:enumeration value="no"/>
+									</xsd:restriction>
+								</xsd:simpleType>
+							</xsd:element>
+							<xsd:element maxOccurs="1" minOccurs="1" name="twitter-account">
+								<xsd:complexType>
+									<xsd:all>
+										<xsd:element maxOccurs="1" minOccurs="1" name="username" type="xsd:string"/> 
+										<xsd:element maxOccurs="1" minOccurs="1" name="password" type="xsd:string"/> 
+									</xsd:all>
+								</xsd:complexType>
+							</xsd:element>
+						</xsd:all>
+						<xsd:attribute name="name" type="xsd:string" use="required"/>
+					</xsd:complexType>
+				</xsd:element>
+				<xsd:element maxOccurs="unbounded" minOccurs="0" name="outgoing-twitter-connector">
+					<xsd:complexType>
+						<xsd:all>
+							<xsd:element maxOccurs="1" minOccurs="1" name="queue-name" type="xsd:string" />
+							<xsd:element maxOccurs="1" minOccurs="0" name="use-streaming">
+								<xsd:simpleType>
+									<xsd:restriction base="xsd:string">
+										<xsd:enumeration value="yes" />
+										<xsd:enumeration value="no" />
+									</xsd:restriction>
+								</xsd:simpleType>
+							</xsd:element>
+							<xsd:element maxOccurs="1" minOccurs="1" name="twitter-account">
+								<xsd:complexType>
+									<xsd:all>
+										<xsd:element maxOccurs="1" minOccurs="1" name="username" type="xsd:string"/> 
+										<xsd:element maxOccurs="1" minOccurs="1" name="password" type="xsd:string"/> 
+									</xsd:all>
+								</xsd:complexType>
+							</xsd:element>
+						</xsd:all>
+						<xsd:attribute name="name" type="xsd:string" use="required"/>
+					</xsd:complexType>
+				</xsd:element>
+			</xsd:sequence>
+		</xsd:complexType>
+	</xsd:element>
 </xsd:schema>

Modified: trunk/src/config/jboss-as-4/build.xml
===================================================================
--- trunk/src/config/jboss-as-4/build.xml	2010-06-29 08:37:55 UTC (rev 9367)
+++ trunk/src/config/jboss-as-4/build.xml	2010-06-29 08:46:10 UTC (rev 9368)
@@ -70,6 +70,7 @@
          <fileset dir="${lib.dir}">
             <include name="hornetq-*.jar"/>
             <include name="netty*.jar"/>
+            <include name="twitter4j*.jar"/>
          </fileset>
       </copy>
 

Modified: trunk/src/config/jboss-as-5/build.xml
===================================================================
--- trunk/src/config/jboss-as-5/build.xml	2010-06-29 08:37:55 UTC (rev 9367)
+++ trunk/src/config/jboss-as-5/build.xml	2010-06-29 08:46:10 UTC (rev 9368)
@@ -69,6 +69,7 @@
          <fileset dir="${lib.dir}">
             <include name="hornetq-*.jar"/>
             <include name="netty*.jar"/>
+            <include name="twitter4j*.jar"/>
          </fileset>
       </copy>
       <property name="hornetq.sar.dir" value="${dest.dir}/deploy/hornetq.sar"/>

Modified: trunk/src/main/org/hornetq/core/config/Configuration.java
===================================================================
--- trunk/src/main/org/hornetq/core/config/Configuration.java	2010-06-29 08:37:55 UTC (rev 9367)
+++ trunk/src/main/org/hornetq/core/config/Configuration.java	2010-06-29 08:46:10 UTC (rev 9368)
@@ -822,4 +822,14 @@
     */
    Map<String, Set<Role>> getSecurityRoles();
 
+   /**
+    * 
+    * @param 
+    */
+   void setTwitterConnectorConfigurations(List<TwitterConnectorConfiguration> configs);
+   /**
+    * 
+    * @return 
+    */
+   List<TwitterConnectorConfiguration> getTwitterConnectorConfigurations();
 }

Added: trunk/src/main/org/hornetq/core/config/TwitterConnectorConfiguration.java
===================================================================
--- trunk/src/main/org/hornetq/core/config/TwitterConnectorConfiguration.java	                        (rev 0)
+++ trunk/src/main/org/hornetq/core/config/TwitterConnectorConfiguration.java	2010-06-29 08:46:10 UTC (rev 9368)
@@ -0,0 +1,118 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.config;
+
+import java.io.Serializable;
+
+/**
+ * A TwitterConnectorConfiguration
+ *
+ * @author <a href="tm.igarashi at gmail.com">Tomohisa Igarashi</a>
+ *
+ *
+ */
+public class TwitterConnectorConfiguration implements Serializable
+{
+   private static final long serialVersionUID = -641207073030767325L;
+
+   private String connectorName = null;
+
+   private boolean isIncoming = false;
+
+   private String userName = null;
+
+   private String password = null;
+
+   private String queueName = null;
+
+   private int intervalSeconds = 0;
+
+   public boolean isIncoming()
+   {
+      return isIncoming;
+   }
+   
+   public String getUserName()
+   {
+      return userName;
+   }
+
+   public String getPassword()
+   {
+      return password;
+   }
+
+   public String getQueueName()
+   {
+      return queueName;
+   }
+
+   public int getIntervalSeconds()
+   {
+      return intervalSeconds;
+   }
+
+   public String getConnectorName()
+   {
+      return connectorName;
+   }
+
+   /**
+    * @param isIncoming the isIncoming to set
+    */
+   public void setIncoming(boolean isIncoming)
+   {
+      this.isIncoming = isIncoming;
+   }
+   
+   /**
+    * @param userName the userName to set
+    */
+   public void setUserName(String userName)
+   {
+      this.userName = userName;
+   }
+
+   /**
+    * @param password the password to set
+    */
+   public void setPassword(String password)
+   {
+      this.password = password;
+   }
+
+   /**
+    * @param queueName the queueName to set
+    */
+   public void setQueueName(String queueName)
+   {
+      this.queueName = queueName;
+   }
+
+   /**
+    * @param intervalSeconds the intervalSeconds to set
+    */
+   public void setIntervalSeconds(int intervalSeconds)
+   {
+      this.intervalSeconds = intervalSeconds;
+   }
+
+   /**
+    * @param connectorName the connectorName to set
+    */
+   public void setConnectorName(String connectorName)
+   {
+      this.connectorName = connectorName;
+   }
+}

Modified: trunk/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java	2010-06-29 08:37:55 UTC (rev 9367)
+++ trunk/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java	2010-06-29 08:46:10 UTC (rev 9368)
@@ -30,6 +30,7 @@
 import org.hornetq.core.config.DiscoveryGroupConfiguration;
 import org.hornetq.core.config.DivertConfiguration;
 import org.hornetq.core.config.CoreQueueConfiguration;
+import org.hornetq.core.config.TwitterConnectorConfiguration;
 import org.hornetq.core.logging.impl.JULLogDelegateFactory;
 import org.hornetq.core.security.Role;
 import org.hornetq.core.server.JournalType;
@@ -322,6 +323,8 @@
 
    private Map<String, Set<Role>> securitySettings = new HashMap<String, Set<Role>>();
 
+   protected List<TwitterConnectorConfiguration> twitterConnectorConfigurations = new ArrayList<TwitterConnectorConfiguration>();
+   
    // Public -------------------------------------------------------------------------
 
    public boolean isClustered()
@@ -1316,4 +1319,14 @@
       this.securitySettings = securitySettings;
    }
 
+   public List<TwitterConnectorConfiguration> getTwitterConnectorConfigurations()
+   {
+      return this.twitterConnectorConfigurations;
+   }
+   
+   public void setTwitterConnectorConfigurations(final List<TwitterConnectorConfiguration> configs)
+   {
+      this.twitterConnectorConfigurations = configs; 
+   }
+
 }

Modified: trunk/src/main/org/hornetq/core/deployers/impl/FileConfigurationParser.java
===================================================================
--- trunk/src/main/org/hornetq/core/deployers/impl/FileConfigurationParser.java	2010-06-29 08:37:55 UTC (rev 9367)
+++ trunk/src/main/org/hornetq/core/deployers/impl/FileConfigurationParser.java	2010-06-29 08:46:10 UTC (rev 9368)
@@ -34,6 +34,7 @@
 import org.hornetq.core.config.DiscoveryGroupConfiguration;
 import org.hornetq.core.config.DivertConfiguration;
 import org.hornetq.core.config.CoreQueueConfiguration;
+import org.hornetq.core.config.TwitterConnectorConfiguration;
 import org.hornetq.core.config.impl.ConfigurationImpl;
 import org.hornetq.core.config.impl.FileConfiguration;
 import org.hornetq.core.config.impl.Validators;
@@ -546,7 +547,25 @@
       parseQueues(e, config);
 
       parseSecurity(e, config);
+      
+      NodeList incomingTwitterConnectors = e.getElementsByTagName("incoming-twitter-connector");
+      
+      for (int i = 0; i < incomingTwitterConnectors.getLength(); i++)
+      {
+         Element twitterConnectorNode = (Element)incomingTwitterConnectors.item(i);
 
+         parseTwitterConnector(twitterConnectorNode, config, true);
+      }
+
+      NodeList outgoingTwitterConnectors = e.getElementsByTagName("outgoing-twitter-connector");
+      
+      for (int i = 0; i < outgoingTwitterConnectors.getLength(); i++)
+      {
+         Element twitterConnectorNode = (Element)outgoingTwitterConnectors.item(i);
+
+         parseTwitterConnector(twitterConnectorNode, config, false);
+      }
+
    }
 
    /**
@@ -1223,7 +1242,47 @@
 
       mainConfig.getDivertConfigurations().add(config);
    }
+   
+   private void parseTwitterConnector(final Element connector,
+                                      final Configuration mainConfig,
+                                      final boolean isIncoming )
+   {
+      TwitterConnectorConfiguration conf = new TwitterConnectorConfiguration();
+      conf.setIncoming(isIncoming);
+      
+      String connectorName = connector.getAttribute("name");
+      conf.setConnectorName(connectorName);
+      
+      String queueName = XMLConfigurationUtil.getString(connector, "queue-name", null, Validators.NOT_NULL_OR_EMPTY);
+      conf.setQueueName(queueName);
+      
+      if(isIncoming)
+      {
+         int intervalMinutes = XMLConfigurationUtil.getInteger(connector, "interval-minutes", 10, Validators.NO_CHECK);
+         conf.setIntervalSeconds(intervalMinutes);
+      }
+      
+      NodeList accountInfo = connector.getElementsByTagName("twitter-account").item(0).getChildNodes();
+      String username = null;
+      String password = null;
+      for(int i=0; i<accountInfo.getLength(); i++)
+      {
+         Node val = accountInfo.item(i);
+         if(val.getNodeName().equals("username"))
+         {
+            username = val.getTextContent();
+         }
+         else if(val.getNodeName().equals("password"))
+         {
+            password = val.getTextContent();
+         }
+      }
+      conf.setUserName(username);
+      conf.setPassword(password);
 
+      mainConfig.getTwitterConnectorConfigurations().add(conf);
+   }
+
    // Inner classes -------------------------------------------------
 
 }

Modified: trunk/src/main/org/hornetq/core/server/HornetQServer.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/HornetQServer.java	2010-06-29 08:37:55 UTC (rev 9367)
+++ trunk/src/main/org/hornetq/core/server/HornetQServer.java	2010-06-29 08:46:10 UTC (rev 9368)
@@ -35,6 +35,7 @@
 import org.hornetq.core.settings.HierarchicalRepository;
 import org.hornetq.core.settings.impl.AddressSettings;
 import org.hornetq.core.transaction.ResourceManager;
+import org.hornetq.core.twitter.TwitterConnectorService;
 import org.hornetq.core.version.Version;
 import org.hornetq.spi.core.protocol.RemotingConnection;
 import org.hornetq.spi.core.protocol.SessionCallback;
@@ -144,7 +145,9 @@
    ReplicationManager getReplicationManager();
 
    boolean checkActivate() throws Exception;
-
+   
+   TwitterConnectorService getTwitterConnectorService();        
+   
    void deployDivert(DivertConfiguration config) throws Exception;
 
    void destroyDivert(SimpleString name) throws Exception;

Modified: trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2010-06-29 08:37:55 UTC (rev 9367)
+++ trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2010-06-29 08:46:10 UTC (rev 9368)
@@ -108,6 +108,8 @@
 import org.hornetq.core.settings.impl.HierarchicalObjectRepository;
 import org.hornetq.core.transaction.ResourceManager;
 import org.hornetq.core.transaction.impl.ResourceManagerImpl;
+import org.hornetq.core.twitter.TwitterConnectorService;
+import org.hornetq.core.twitter.impl.TwitterConnectorServiceImpl;
 import org.hornetq.core.version.Version;
 import org.hornetq.spi.core.logging.LogDelegateFactory;
 import org.hornetq.spi.core.protocol.RemotingConnection;
@@ -184,6 +186,8 @@
    private volatile RemotingService remotingService;
 
    private volatile ManagementService managementService;
+   
+   private volatile TwitterConnectorService twitterService;
 
    private MemoryManager memoryManager;
 
@@ -314,6 +318,13 @@
       // so it can be initialised by the live node
       remotingService.start();
 
+      // start twitter connector service
+      twitterService = new TwitterConnectorServiceImpl(configuration,
+                                                       scheduledPool,
+                                                       storageManager,
+                                                       postOffice);
+      twitterService.start();
+      
       started = true;
 
       HornetQServerImpl.log.info("HornetQ Server version " + getVersion().getFullVersion() + " started");
@@ -341,6 +352,8 @@
             return;
          }
 
+         twitterService.stop();
+         
          if (clusterManager != null)
          {
             clusterManager.stop();
@@ -766,6 +779,12 @@
       return replicationManager;
    }
 
+   public TwitterConnectorService getTwitterConnectorService()
+   {
+      return twitterService;
+   }
+
+
    // Public
    // ---------------------------------------------------------------------------------------
 

Added: trunk/src/main/org/hornetq/core/twitter/TwitterConnectorService.java
===================================================================
--- trunk/src/main/org/hornetq/core/twitter/TwitterConnectorService.java	                        (rev 0)
+++ trunk/src/main/org/hornetq/core/twitter/TwitterConnectorService.java	2010-06-29 08:46:10 UTC (rev 9368)
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.twitter;
+
+import org.hornetq.core.server.HornetQComponent;
+/**
+ * A TwitterConnectorService
+ *
+ * @author <a href="tm.igarashi at gmail.com">Tomohisa Igarashi</a>
+ *
+ *
+ */
+public interface TwitterConnectorService extends HornetQComponent
+{
+   public int getIncomingConnectorCount();
+   
+   public int getOutgoingConnectorCount();
+   
+   public boolean isStarted();
+}

Added: trunk/src/main/org/hornetq/core/twitter/TwitterConstants.java
===================================================================
--- trunk/src/main/org/hornetq/core/twitter/TwitterConstants.java	                        (rev 0)
+++ trunk/src/main/org/hornetq/core/twitter/TwitterConstants.java	2010-06-29 08:46:10 UTC (rev 9368)
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.twitter;
+
+/**
+ * A TwitterConstants
+ *
+ * @author <a href="tm.igarashi at gmail.com">Tomohisa Igarashi</a>
+ */
+public class TwitterConstants
+{
+   public static final String KEY_ID = "id";
+   public static final String KEY_SOURCE = "source";
+   public static final String KEY_CREATED_AT = "createdAt";
+   public static final String KEY_IS_TRUNCATED = "isTruncated";
+   public static final String KEY_IN_REPLY_TO_STATUS_ID = "inReplyToStatusId";
+   public static final String KEY_IN_REPLY_TO_USER_ID = "inReplyToUserId";
+   public static final String KEY_IN_REPLY_TO_SCREEN_NAME = "inReplyToScreenName";
+   public static final String KEY_IS_FAVORITED = "isFavorited";
+   public static final String KEY_IS_RETWEET = "isRetweet";
+   public static final String KEY_CONTRIBUTORS = "contributors";
+   public static final String KEY_GEO_LOCATION_LATITUDE = "geoLocation.latitude";
+   public static final String KEY_GEO_LOCATION_LONGITUDE = "geoLocation.longitude";
+   public static final String KEY_PLACE_ID = "place.id";
+   public static final String KEY_DISPLAY_COODINATES = "displayCoodinates";
+   
+   public static final int DEFAULT_POLLING_INTERVAL_SECS = 10;
+   public static final int DEFAULT_PAGE_SIZE = 100;
+   public static final int FIRST_ATTEMPT_PAGE_SIZE = 1;
+   public static final int START_SINCE_ID = 1;
+   public static final int INITIAL_MESSAGE_BUFFER_SIZE = 50;
+}

Added: trunk/src/main/org/hornetq/core/twitter/impl/TwitterConnectorServiceImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/twitter/impl/TwitterConnectorServiceImpl.java	                        (rev 0)
+++ trunk/src/main/org/hornetq/core/twitter/impl/TwitterConnectorServiceImpl.java	2010-06-29 08:46:10 UTC (rev 9368)
@@ -0,0 +1,524 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.twitter.impl;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import twitter4j.*;
+
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.core.postoffice.Binding;
+import org.hornetq.core.postoffice.PostOffice;
+import org.hornetq.core.server.Queue;
+import org.hornetq.core.server.HandleStatus;
+import org.hornetq.core.server.MessageReference;
+import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.Consumer;
+import org.hornetq.core.server.impl.ServerMessageImpl;
+import org.hornetq.core.twitter.TwitterConstants;
+import org.hornetq.core.twitter.TwitterConnectorService;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.config.TwitterConnectorConfiguration;
+import org.hornetq.core.filter.Filter;
+
+/**
+ * A TwitterConnectorServiceImpl
+ *
+ * @author <a href="tm.igarashi at gmail.com">Tomohisa Igarashi</a>
+ * 
+ */
+public class TwitterConnectorServiceImpl implements TwitterConnectorService
+{
+   private static final Logger log = Logger.getLogger(TwitterConnectorServiceImpl.class);
+
+   private volatile boolean isStarted = false;
+
+   private final Configuration config;
+
+   private final ScheduledExecutorService scheduledPool;
+   
+   private final StorageManager storageManager;
+   
+   private final PostOffice postOffice;
+   
+   private final List<IncomingTweetsHandler> incomingHandlers = new ArrayList<IncomingTweetsHandler>();
+
+   private final HashMap<IncomingTweetsHandler, ScheduledFuture<?>> futureList = new HashMap<IncomingTweetsHandler,ScheduledFuture<?>>();
+
+   private final List<OutgoingTweetsHandler> outgoingHandlers = new ArrayList<OutgoingTweetsHandler>();
+
+   public TwitterConnectorServiceImpl(final Configuration config,
+                                      final ScheduledExecutorService pool,
+                                      final StorageManager storageManager,
+                                      final PostOffice postOffice)
+   {
+      this.config = config;
+      this.scheduledPool = pool;
+      this.storageManager = storageManager;
+      this.postOffice = postOffice;
+   }
+
+   public boolean isStarted()
+   {
+      return this.isStarted;
+   }
+
+   public synchronized void start() throws Exception
+   {
+      if (this.isStarted)
+      {
+         return;
+      }
+
+      for (TwitterConnectorConfiguration twitterConf : this.config.getTwitterConnectorConfigurations())
+      {
+         String connectorName = twitterConf.getConnectorName();
+
+         if (twitterConf.isIncoming())
+         {
+            IncomingTweetsHandler incoming;
+            try
+            {
+                incoming = new IncomingTweetsHandler(connectorName,
+                                                    twitterConf.getUserName(),
+                                                    twitterConf.getPassword(),
+                                                    twitterConf.getQueueName(),
+                                                    twitterConf.getIntervalSeconds(),
+                                                    storageManager,
+                                                    postOffice);
+                incoming.initialize();
+                ScheduledFuture<?> sf = this.scheduledPool.scheduleWithFixedDelay(incoming,
+                                                                                  incoming.getIntervalSeconds(),
+                                                                                  incoming.getIntervalSeconds(),
+                                                                                  TimeUnit.SECONDS);
+                this.futureList.put(incoming, sf);
+                this.incomingHandlers.add(incoming);
+            }
+            catch(Exception e)
+            {
+               log.warn(connectorName + ": failed to initialize", e);
+               continue;
+            }
+         }
+         else
+         {
+            OutgoingTweetsHandler outgoing;
+            try
+            {
+               outgoing = new OutgoingTweetsHandler(connectorName,
+                                                   twitterConf.getUserName(),
+                                                   twitterConf.getPassword(),
+                                                   twitterConf.getQueueName(),
+                                                   postOffice);
+               outgoing.start();
+               this.outgoingHandlers.add(outgoing);
+            }
+            catch(Exception e)
+            {
+               log.warn(connectorName + ": failed to initialize", e);
+               continue;
+            }
+         }
+         
+         log.debug("Initialize twitter connector: [" + "connector-name=" +
+                   connectorName +
+                   ", username=" +
+                   twitterConf.getUserName() +
+                   ", queue-name=" +
+                   twitterConf.getQueueName() +
+                   ", interval-seconds=" +
+                   twitterConf.getIntervalSeconds() +
+                   "]");
+      }
+
+      this.isStarted = true;
+      log.debug(this.getClass().getSimpleName() + " started");
+   }
+
+   public synchronized void stop() throws Exception
+   {
+      if (!this.isStarted)
+      {
+         return;
+      }
+
+      for (IncomingTweetsHandler in : this.incomingHandlers)
+      {
+         if (this.futureList.get(in).cancel(true))
+         {
+            this.futureList.remove(in);
+            log.debug(in.getConnectorName() + ": stopped");
+         }
+         else
+         {
+            log.warn(in.getConnectorName() + ": stop failed");
+         }
+      }
+      this.incomingHandlers.clear();
+
+      for (OutgoingTweetsHandler out : this.outgoingHandlers)
+      {
+         try
+         {
+            out.shutdown();
+         }
+         catch(Exception e)
+         {
+            log.warn(e);
+         }
+      }
+      this.outgoingHandlers.clear();
+
+      this.isStarted = false;
+      log.debug(this.getClass().getSimpleName() + " stopped");
+   }
+
+   public int getIncomingConnectorCount()
+   {
+      return incomingHandlers.size();
+   }
+
+   public int getOutgoingConnectorCount()
+   {
+      return outgoingHandlers.size();
+   }
+
+   /**
+    * IncomingTweetsHandler consumes from twitter and forwards to the
+    * configured HornetQ address.
+    */
+   private class IncomingTweetsHandler extends Thread
+   {
+      private final String connectorName;
+
+      private final String userName;
+      
+      private final String password;
+      
+      private final String queueName;
+      
+      private final int intervalSeconds;
+
+      private final StorageManager storageManager;
+
+      private final PostOffice postOffice;
+
+      private final Paging paging = new Paging();
+
+      private Twitter twitter;
+
+      public IncomingTweetsHandler(final String connectorName,
+                                   final String userName,
+                                   final String password,
+                                   final String queueName,
+                                   final int intervalSeconds,
+                                   final StorageManager storageManager,
+                                   final PostOffice postOffice) throws Exception
+      {
+         this.connectorName = connectorName;
+         this.userName = userName;
+         this.password = password;
+         this.queueName = queueName;
+         if (intervalSeconds > 0)
+         {
+            this.intervalSeconds = intervalSeconds;
+         }
+         else
+         {
+            this.intervalSeconds = TwitterConstants.DEFAULT_POLLING_INTERVAL_SECS;
+         }
+         this.storageManager = storageManager;
+         this.postOffice = postOffice;
+      }
+
+      public void initialize() throws Exception
+      {
+         Binding b = postOffice.getBinding(new SimpleString(queueName));
+         if(b == null)
+         {
+            throw new Exception(connectorName + ": queue " + queueName + " not found");
+         }
+
+         TwitterFactory tf = new TwitterFactory();
+         this.twitter = tf.getInstance(userName, password);
+         this.twitter.verifyCredentials();
+
+         // getting latest ID
+         this.paging.setCount(TwitterConstants.FIRST_ATTEMPT_PAGE_SIZE);
+         ResponseList<Status> res = this.twitter.getHomeTimeline(paging);
+         this.paging.setSinceId(res.get(0).getId());
+         log.debug(connectorName + " initialise(): got latest ID: "+this.paging.getSinceId());
+
+         // TODO make page size configurable
+         this.paging.setCount(TwitterConstants.DEFAULT_PAGE_SIZE);
+      }
+      
+      /**
+       *  TODO streaming API support
+       *  TODO rate limit support
+       */
+      public void run()
+      {
+         // Avoid cancelling the task with RuntimeException
+         try
+         {
+            poll();
+         }
+         catch(Throwable t)
+         {
+            log.warn(connectorName, t);
+         }
+      }
+
+      public int getIntervalSeconds()
+      {
+         return this.intervalSeconds;
+      }
+
+      public String getConnectorName()
+      {
+         return this.connectorName;
+      }
+
+      private void poll() throws Exception
+      {
+         // get new tweets         
+         ResponseList<Status> res = this.twitter.getHomeTimeline(paging);
+
+         if(res == null || res.size() == 0)
+         {
+            return;
+         }
+      
+         for (int i = res.size() - 1; i >= 0; i--)
+         {
+            Status status = res.get(i);
+
+            ServerMessage msg = new ServerMessageImpl(this.storageManager.generateUniqueID(),
+                                                      TwitterConstants.INITIAL_MESSAGE_BUFFER_SIZE);
+            msg.setAddress(new SimpleString(this.queueName));
+            msg.setDurable(true);
+            msg.encodeMessageIDToBuffer();
+
+            putTweetIntoMessage(status, msg);
+
+            this.postOffice.route(msg,false);
+            log.debug(connectorName + ": routed: " + status.toString());
+         }
+
+         this.paging.setSinceId(res.get(0).getId());
+         log.debug(connectorName + ": update latest ID: " + this.paging.getSinceId());
+      }
+      
+      private void putTweetIntoMessage(final Status status, final ServerMessage msg)
+      {
+         msg.getBodyBuffer().writeString(status.getText());
+         msg.putLongProperty(TwitterConstants.KEY_ID, status.getId());
+         msg.putStringProperty(TwitterConstants.KEY_SOURCE, status.getSource());
+
+         msg.putLongProperty(TwitterConstants.KEY_CREATED_AT, status.getCreatedAt().getTime());
+         msg.putBooleanProperty(TwitterConstants.KEY_IS_TRUNCATED, status.isTruncated());
+         msg.putLongProperty(TwitterConstants.KEY_IN_REPLY_TO_STATUS_ID, status.getInReplyToStatusId());
+         msg.putIntProperty(TwitterConstants.KEY_IN_REPLY_TO_USER_ID, status.getInReplyToUserId());
+         msg.putBooleanProperty(TwitterConstants.KEY_IS_FAVORITED, status.isFavorited());
+         msg.putBooleanProperty(TwitterConstants.KEY_IS_RETWEET, status.isRetweet());
+         msg.putObjectProperty(TwitterConstants.KEY_CONTRIBUTORS, status.getContributors());
+         GeoLocation gl;
+         if ((gl = status.getGeoLocation()) != null)
+         {
+            msg.putDoubleProperty(TwitterConstants.KEY_GEO_LOCATION_LATITUDE, gl.getLatitude());
+            msg.putDoubleProperty(TwitterConstants.KEY_GEO_LOCATION_LONGITUDE, gl.getLongitude());
+         }
+         Place place;
+         if ((place = status.getPlace()) != null)
+         {
+            msg.putStringProperty(TwitterConstants.KEY_PLACE_ID, place.getId());
+         }
+      }
+   }
+
+   /**
+    * OutgoingTweetsHandler consumes from configured HornetQ address
+    * and forwards to the twitter.
+    */
+   private class OutgoingTweetsHandler implements Consumer
+   {
+      private final String connectorName;
+      
+      private final String userName;
+      
+      private final String password;
+      
+      private final String queueName;
+      
+      private final PostOffice postOffice;
+      
+      private Twitter twitter = null;
+      
+      private Queue queue = null;
+
+      private Filter filter = null;
+      
+      private boolean enabled = false;
+      
+      public OutgoingTweetsHandler(final String connectorName,
+                                   final String userName,
+                                   final String password,
+                                   final String queueName,
+                                   final PostOffice postOffice) throws Exception
+      {
+         this.connectorName = connectorName;
+         this.userName = userName;
+         this.password = password;
+         this.queueName = queueName;
+         this.postOffice = postOffice;
+      }
+
+      /**
+       * TODO streaming API support 
+       * TODO rate limit support
+       */
+      public synchronized void start() throws Exception
+      {
+         if(this.enabled)
+         {
+            return;
+         }
+         
+         if(this.connectorName == null || this.connectorName.trim().equals(""))
+         {
+            throw new Exception("invalid connector name: " + this.connectorName);
+         }
+         
+         if(this.queueName == null || this.queueName.trim().equals(""))
+         {
+            throw new Exception("invalid queue name: " + queueName);
+         }
+         
+         SimpleString name = new SimpleString(this.queueName);
+         Binding b = this.postOffice.getBinding(name);
+         if(b == null)
+         {
+            throw new Exception(connectorName + ": queue " + queueName + " not found");
+         }
+         this.queue = (Queue)b.getBindable();
+
+         TwitterFactory tf = new TwitterFactory();
+         this.twitter = tf.getInstance(userName, password);
+         this.twitter.verifyCredentials();
+         // TODO make filter-string configurable
+         // this.filter = FilterImpl.createFilter(filterString);
+         this.filter = null;
+
+         this.queue.addConsumer(this);
+
+         this.queue.deliverAsync();
+         this.enabled = true;
+         log.debug(connectorName + ": started");
+      }
+
+      public synchronized void shutdown() throws Exception
+      {
+         if(!this.enabled)
+         {
+            return;
+         }
+         
+         log.debug(connectorName + ": receive shutdown request");
+
+         this.queue.removeConsumer(this);
+
+         this.enabled = false;
+         log.debug(connectorName + ": shutdown");
+      }
+
+      public Filter getFilter()
+      {
+         return filter;
+      }
+
+      public HandleStatus handle(final MessageReference ref) throws Exception
+      {
+         if (filter != null && !filter.match(ref.getMessage()))
+         {
+            return HandleStatus.NO_MATCH;
+         }
+
+         synchronized (this)
+         {
+            ref.handled();
+
+            ServerMessage message = ref.getMessage();
+
+            StatusUpdate status = new StatusUpdate(message.getBodyBuffer().readString());
+
+            // set optional property
+            
+            if(message.containsProperty(TwitterConstants.KEY_IN_REPLY_TO_STATUS_ID))
+            {
+               status.setInReplyToStatusId(message.getLongProperty(TwitterConstants.KEY_IN_REPLY_TO_STATUS_ID));
+            }
+            
+            if(message.containsProperty(TwitterConstants.KEY_GEO_LOCATION_LATITUDE))
+            {
+               double geolat = message.getDoubleProperty(TwitterConstants.KEY_GEO_LOCATION_LATITUDE);
+               double geolong = message.getDoubleProperty(TwitterConstants.KEY_GEO_LOCATION_LONGITUDE);
+               status.setLocation(new GeoLocation(geolat, geolong));
+            }
+
+            if(message.containsProperty(TwitterConstants.KEY_PLACE_ID))
+            {
+               status.setPlaceId(message.getStringProperty(TwitterConstants.KEY_PLACE_ID));
+            }
+
+            if(message.containsProperty(TwitterConstants.KEY_DISPLAY_COODINATES))
+            {
+               status.setDisplayCoordinates(message.getBooleanProperty(TwitterConstants.KEY_DISPLAY_COODINATES));
+            }
+
+            // send to Twitter
+            try
+            {
+               this.twitter.updateStatus(status);
+            }
+            catch (TwitterException e)
+            {
+               if(e.getStatusCode() == 403 )
+               {
+                  // duplicated message
+                  log.warn(connectorName + ": HTTP status code = 403: Ignore duplicated message");
+                  queue.acknowledge(ref);
+                  
+                  return HandleStatus.HANDLED;
+               }
+               else
+               {
+                  throw e;
+               }
+            }
+
+            queue.acknowledge(ref);
+            log.debug(connectorName + ": forwarded to twitter: " + message.getMessageID());
+            return HandleStatus.HANDLED;
+         }
+      }
+   }
+}

Added: trunk/tests/src/org/hornetq/tests/integration/twitter/TwitterTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/twitter/TwitterTest.java	                        (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/twitter/TwitterTest.java	2010-06-29 08:46:10 UTC (rev 9368)
@@ -0,0 +1,498 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.twitter;
+
+import java.util.HashMap;
+
+import junit.framework.Assert;
+
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.config.CoreQueueConfiguration;
+import org.hornetq.core.config.TwitterConnectorConfiguration;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.twitter.TwitterConnectorService;
+import org.hornetq.core.twitter.TwitterConstants;
+import org.hornetq.tests.util.ServiceTestBase;
+import org.hornetq.tests.util.UnitTestCase;
+import twitter4j.*;
+
+/**
+ * A TwitterTest
+ *
+ * @author tm.igarashi at gmail.com
+ *
+ *
+ */
+public class TwitterTest extends ServiceTestBase
+{
+   private static final Logger log = Logger.getLogger(TwitterTest.class);
+   private static final String KEY_CONNECTOR_NAME = "connector.name";
+   private static final String KEY_USERNAME = "username";
+   private static final String KEY_PASSWORD = "password";
+   private static final String KEY_QUEUE_NAME = "queue.name";
+   
+   private static final String TWITTER_USERNAME = System.getProperty("twitter.username");
+   private static final String TWITTER_PASSWORD = System.getProperty("twitter.password");
+   
+   @Override
+   protected void setUp() throws Exception
+   {
+      if(TWITTER_USERNAME == null || TWITTER_PASSWORD == null)
+      {
+         throw new Exception("* * *  Please set twitter.username and twitter.password in system property  * * *");
+      }
+      super.setUp();
+   }
+
+   // incoming
+   
+   public void testSimpleIncoming() throws Exception
+   {
+      internalTestIncoming(true,false);
+   }
+
+   public void testIncomingNoQueue() throws Exception
+   {
+      internalTestIncoming(false,false);
+   }
+
+   public void testIncomingWithRestart() throws Exception
+   {
+      internalTestIncoming(true,true);
+   }
+   
+   public void testIncomingWithEmptyConnectorName() throws Exception
+   {
+      HashMap<String,String> params = new HashMap<String,String>();
+      params.put(KEY_CONNECTOR_NAME, "");
+      internalTestIncomingFailedToInitialize(params);
+   }
+
+   public void testIncomingWithEmptyQueueName() throws Exception
+   {
+      HashMap<String,String> params = new HashMap<String,String>();
+      params.put(KEY_QUEUE_NAME, "");
+      internalTestIncomingFailedToInitialize(params);
+   }
+
+   public void testIncomingWithInvalidCredentials() throws Exception
+   {
+      HashMap<String,String> params = new HashMap<String,String>();
+      params.put(KEY_USERNAME, "invalidUsername");
+      params.put(KEY_PASSWORD, "invalidPassword");
+      internalTestIncomingFailedToInitialize(params);
+   }
+
+   //outgoing
+   
+   public void testSimpleOutgoing() throws Exception
+   {
+      internalTestOutgoing(true,false);
+   }
+
+   public void testOutgoingNoQueue() throws Exception
+   {
+      internalTestOutgoing(false,false);
+   }
+   public void testOutgoingWithRestart() throws Exception
+   {
+      internalTestOutgoing(true,true);
+   }
+   
+   public void testOutgoingWithEmptyConnectorName() throws Exception
+   {
+      HashMap<String,String> params = new HashMap<String,String>();
+      params.put(KEY_CONNECTOR_NAME, "");
+      internalTestOutgoingFailedToInitialize(params);
+   }
+
+   public void testOutgoingWithEmptyQueueName() throws Exception
+   {
+      HashMap<String,String> params = new HashMap<String,String>();
+      params.put(KEY_QUEUE_NAME, "");
+      internalTestOutgoingFailedToInitialize(params);
+   }
+
+   public void testOutgoingWithInvalidCredentials() throws Exception
+   {
+      HashMap<String,String> params = new HashMap<String,String>();
+      params.put(KEY_USERNAME, "invalidUsername");
+      params.put(KEY_PASSWORD, "invalidPassword");
+      internalTestOutgoingFailedToInitialize(params);
+   }
+   
+   /**
+    *  This will fail until TFJ-347 is fixed.
+    * http://twitter4j.org/jira/browse/TFJ-347
+    * 
+    * @throws Exception
+    */
+   public void _testOutgoingWithInReplyTo() throws Exception
+   {
+      internalTestOutgoingWithInReplyTo();
+   }
+   
+   protected void internalTestIncoming(boolean createQueue, boolean restart) throws Exception
+   {
+      HornetQServer server0 = null;
+      ClientSession session = null;
+      String queue = "TwitterTestQueue";
+      int interval = 5;
+      Twitter twitter = new TwitterFactory().getInstance(TWITTER_USERNAME,TWITTER_PASSWORD);
+      String testMessage = "TwitterTest/incoming: " + System.currentTimeMillis();
+      log.debug("test incoming: " + testMessage);
+      
+      try
+      {
+         Configuration configuration = createDefaultConfig(false);
+         TwitterConnectorConfiguration inconf = new TwitterConnectorConfiguration();
+         inconf.setConnectorName("test-incoming-connector");
+         inconf.setIncoming(true);
+         inconf.setIntervalSeconds(interval);
+         inconf.setQueueName(queue);
+         inconf.setUserName(TWITTER_USERNAME);
+         inconf.setPassword(TWITTER_PASSWORD);
+         configuration.getTwitterConnectorConfigurations().add(inconf);
+         if(createQueue)
+         {
+            CoreQueueConfiguration qc = new CoreQueueConfiguration(queue, queue, null, true);
+            configuration.getQueueConfigurations().add(qc);
+         }
+
+         server0 = createServer(false,configuration);
+         server0.start();
+         
+         TwitterConnectorService service = server0.getTwitterConnectorService();
+         if(restart)
+         {
+            service.stop();
+            service.start();
+         }
+
+         Assert.assertEquals(0, service.getOutgoingConnectorCount());
+         if(createQueue)
+         {
+            Assert.assertEquals(1, service.getIncomingConnectorCount());
+         }
+         else
+         {
+            Assert.assertEquals(0, service.getIncomingConnectorCount());
+            return;
+         }
+
+         twitter.updateStatus(testMessage);
+
+         TransportConfiguration tpconf = new TransportConfiguration(UnitTestCase.INVM_CONNECTOR_FACTORY);
+         ClientSessionFactory sf = HornetQClient.createClientSessionFactory(tpconf);
+         session = sf.createSession(false, true, true);
+         ClientConsumer consumer = session.createConsumer(queue);
+         session.start();
+         ClientMessage msg = consumer.receive(60*1000);
+         
+         Assert.assertNotNull(msg);
+         Assert.assertEquals(testMessage, msg.getBodyBuffer().readString());
+         
+         msg.acknowledge();
+      }
+      finally
+      {
+         try
+         {
+            session.close();
+         }
+         catch(Throwable t)
+         {
+         }
+         try
+         {
+            server0.stop();
+         }
+         catch(Throwable ignored)
+         {
+         }
+      }
+   }
+
+   protected void internalTestIncomingFailedToInitialize(HashMap<String,String> params) throws Exception
+   {
+      HornetQServer server0 = null;
+      String connectorName = "test-incoming-connector"; 
+      String queue = "TwitterTestQueue";
+      String userName = "invalidUsername";
+      String password = "invalidPassword";
+      int interval = 5;
+      
+      if(params.containsKey(KEY_CONNECTOR_NAME))
+      {
+         connectorName = params.get(KEY_CONNECTOR_NAME);
+      }
+      if(params.containsKey(KEY_USERNAME))
+      {
+         userName = params.get(KEY_USERNAME);
+      }
+      if(params.containsKey(KEY_PASSWORD))
+      {
+         password = params.get(KEY_PASSWORD);
+      }
+      if(params.containsKey(KEY_QUEUE_NAME))
+      {
+         queue = params.get(KEY_QUEUE_NAME);
+      }
+      
+      try
+      {
+         Configuration configuration = createDefaultConfig(false);
+         TwitterConnectorConfiguration inconf = new TwitterConnectorConfiguration();
+         inconf.setConnectorName(connectorName);
+         inconf.setIncoming(true);
+         inconf.setIntervalSeconds(interval);
+         inconf.setQueueName(queue);
+         inconf.setUserName(userName);
+         inconf.setPassword(password);
+         configuration.getTwitterConnectorConfigurations().add(inconf);
+         CoreQueueConfiguration qc = new CoreQueueConfiguration(queue, queue, null, true);
+         configuration.getQueueConfigurations().add(qc);
+
+         server0 = createServer(false,configuration);
+         server0.start();
+         
+         TwitterConnectorService twitterService = server0.getTwitterConnectorService();
+         Assert.assertEquals(0, twitterService.getIncomingConnectorCount());
+         Assert.assertEquals(0, twitterService.getOutgoingConnectorCount());
+      }
+      finally
+      {
+         try
+         {
+            server0.stop();
+         }
+         catch(Throwable ignored)
+         {
+         }
+      }
+   }
+
+   protected void internalTestOutgoing(boolean createQueue, boolean restart) throws Exception
+   {
+      HornetQServer server0 = null;
+      ClientSession session = null;
+      String queue = "TwitterTestQueue";
+      Twitter twitter = new TwitterFactory().getInstance(TWITTER_USERNAME,TWITTER_PASSWORD);
+      String testMessage = "TwitterTest/outgoing: " + System.currentTimeMillis();
+      log.debug("test outgoing: " + testMessage);
+      
+      try
+      {
+         Configuration configuration = createDefaultConfig(false);
+         TwitterConnectorConfiguration outconf = new TwitterConnectorConfiguration();
+         outconf.setConnectorName("test-outgoing-connector");
+         outconf.setIncoming(false);
+         outconf.setQueueName(queue);
+         outconf.setUserName(TWITTER_USERNAME);
+         outconf.setPassword(TWITTER_PASSWORD);
+         configuration.getTwitterConnectorConfigurations().add(outconf);
+         if(createQueue)
+         {
+            CoreQueueConfiguration qc = new CoreQueueConfiguration(queue, queue, null, false);
+            configuration.getQueueConfigurations().add(qc);
+         }
+         
+         server0 = createServer(false,configuration);
+         server0.start();
+         
+         TwitterConnectorService service = server0.getTwitterConnectorService();
+         if(restart)
+         {
+            service.stop();
+            service.start();
+         }
+
+         Assert.assertEquals(0, service.getIncomingConnectorCount());
+         if(createQueue)
+         {
+            Assert.assertEquals(1, service.getOutgoingConnectorCount());
+         }
+         else
+         {
+            Assert.assertEquals(0, service.getOutgoingConnectorCount());
+            return;
+         }
+
+         TransportConfiguration tpconf = new TransportConfiguration(UnitTestCase.INVM_CONNECTOR_FACTORY);
+         ClientSessionFactory sf = HornetQClient.createClientSessionFactory(tpconf);
+         session = sf.createSession(false, true, true);
+         ClientProducer producer = session.createProducer(queue);
+         ClientMessage msg = session.createMessage(false);
+         msg.getBodyBuffer().writeString(testMessage);
+         session.start();
+         producer.send(msg);
+
+         Thread.sleep(3000);
+         
+         Paging page = new Paging();
+         page.setCount(1);
+         ResponseList<Status> res = twitter.getHomeTimeline(page);
+         
+         Assert.assertEquals(testMessage, res.get(0).getText());
+      }
+      finally
+      {
+         try
+         {
+            session.close();
+         }
+         catch(Throwable t)
+         {
+         }
+         try
+         {
+            server0.stop();
+         }
+         catch(Throwable ignored)
+         {
+         }
+      }
+   }
+
+   protected void internalTestOutgoingFailedToInitialize(HashMap<String,String> params) throws Exception
+   {
+      HornetQServer server0 = null;
+      String connectorName = "test-outgoing-connector"; 
+      String queue = "TwitterTestQueue";
+      String userName = TWITTER_USERNAME;
+      String password = TWITTER_PASSWORD;
+      
+      if(params.containsKey(KEY_CONNECTOR_NAME))
+      {
+         connectorName = params.get(KEY_CONNECTOR_NAME);
+      }
+      if(params.containsKey(KEY_USERNAME))
+      {
+         userName = params.get(KEY_USERNAME);
+      }
+      if(params.containsKey(KEY_PASSWORD))
+      {
+         password = params.get(KEY_PASSWORD);
+      }
+      if(params.containsKey(KEY_QUEUE_NAME))
+      {
+         queue = params.get(KEY_QUEUE_NAME);
+      }
+      
+      try
+      {
+         Configuration configuration = createDefaultConfig(false);
+         TwitterConnectorConfiguration outconf = new TwitterConnectorConfiguration();
+         outconf.setConnectorName(connectorName);
+         outconf.setIncoming(false);
+         outconf.setQueueName(queue);
+         outconf.setUserName(userName);
+         outconf.setPassword(password);
+         configuration.getTwitterConnectorConfigurations().add(outconf);
+         CoreQueueConfiguration qc = new CoreQueueConfiguration(queue, queue, null, false);
+         configuration.getQueueConfigurations().add(qc);
+         
+         server0 = createServer(false,configuration);
+         server0.start();
+         
+         TwitterConnectorService service = server0.getTwitterConnectorService();
+         Assert.assertEquals(0, service.getIncomingConnectorCount());
+         Assert.assertEquals(0, service.getOutgoingConnectorCount());
+      }
+      finally
+      {
+         try
+         {
+            server0.stop();
+         }
+         catch(Throwable ignored)
+         {
+         }
+      }
+   }
+
+   protected void internalTestOutgoingWithInReplyTo() throws Exception
+   {
+      HornetQServer server0 = null;
+      ClientSession session = null;
+      String queue = "TwitterTestQueue";
+      Twitter twitter = new TwitterFactory().getInstance(TWITTER_USERNAME,TWITTER_PASSWORD);
+      String testMessage = "TwitterTest/outgoing with in_reply_to: " + System.currentTimeMillis();
+      String replyMessage = "@" + TWITTER_USERNAME + " TwitterTest/outgoing reply: " + System.currentTimeMillis();
+      try
+      {
+         Configuration configuration = createDefaultConfig(false);
+         TwitterConnectorConfiguration outconf = new TwitterConnectorConfiguration();
+         outconf.setConnectorName("test-outgoing-with-in-reply-to");
+         outconf.setIncoming(false);
+         outconf.setQueueName(queue);
+         outconf.setUserName(TWITTER_USERNAME);
+         outconf.setPassword(TWITTER_PASSWORD);
+         configuration.getTwitterConnectorConfigurations().add(outconf);
+         CoreQueueConfiguration qc = new CoreQueueConfiguration(queue, queue, null, false);
+         configuration.getQueueConfigurations().add(qc);
+
+         Status s = twitter.updateStatus(testMessage);
+
+         server0 = createServer(false,configuration);
+         server0.start();
+         
+         TransportConfiguration tpconf = new TransportConfiguration(UnitTestCase.INVM_CONNECTOR_FACTORY);
+         ClientSessionFactory sf = HornetQClient.createClientSessionFactory(tpconf);
+         session = sf.createSession(false, true, true);
+         ClientProducer producer = session.createProducer(queue);
+         ClientMessage msg = session.createMessage(false);
+         msg.getBodyBuffer().writeString(replyMessage);
+         msg.putLongProperty(TwitterConstants.KEY_IN_REPLY_TO_STATUS_ID, s.getId());
+         session.start();
+         producer.send(msg);
+
+         Thread.sleep(3000);
+         
+         Paging page = new Paging();
+         page.setCount(2);
+         ResponseList<Status> res = twitter.getHomeTimeline(page);
+         
+         Assert.assertEquals(testMessage, res.get(1).getText());
+         Assert.assertEquals(-1, res.get(1).getInReplyToStatusId());
+         Assert.assertEquals(replyMessage, res.get(0).getText());
+         Assert.assertEquals(s.getId(), res.get(0).getInReplyToStatusId());
+      }
+      finally
+      {
+         try
+         {
+            session.close();
+         }
+         catch(Throwable t)
+         {
+         }
+         try
+         {
+            server0.stop();
+         }
+         catch(Throwable ignored)
+         {
+         }
+      }
+   }
+}



More information about the hornetq-commits mailing list