Blocking request/response with Netty

Iain McGinniss iainmcgin at gmail.com
Thu Jul 23 04:54:01 EDT 2009


I think you can avoid the race condition by adding the listener to the  
queue before the request is dispatched - logically, the response  
cannot arrive before the request is dispatched. Having said this, my  
understanding of Netty is still somewhat limited so I don't know if  
multiple threads can be handling the pipeline simultaneously, i.e. two  
writes could be happening in parallel. In this case, or if you need to  
do some post-write processing that must happen before processing the  
response, you will need locking.

Iain

On 22 Jul 2009, at 23:38, Tomasz Blachowicz wrote:

>
> Hey Iain,
>
> Thanks a lot for your reply. Blocking while waiting for asynchronous
> response to arrive is what I precisely wanted to do. I don't want to  
> wait
> until the frame is send to the wire, this is just a half of the  
> story and
> honestly speaking not the more interesting half ;)
>
> If I'm reading your recommended approach right this is something  
> that I
> presented in my previous post in sample code. I reckon there is  
> however a
> gap in this what you are saying. Here is the thing, the setup of a  
> response
> listener and writing to channel must be synchronized. Otherwise  
> there is a
> potential situation when the listener may be notified in wrong  
> order. This
> is why I used ReentrantLock to make sure that registering the  
> listener in a
> queue and asynchronous writing to channel happens together.
>
> Anyone else, any other patterns or idioms?
>
> Cheers,
> Tom
>
>
> Iain McGinniss wrote:
>>
>> The complication in this case is that waiting on the success of the
>> ChannelFuture for write isn't enough - Tomasz also needs to wait for
>> the response corresponding to the request in the write. The situation
>> is potentially further complicated by whether the client wishes to  
>> use
>> HTTP pipelining or not. If pipelining is used, it simplifies the
>> ChannelHandlers that need to be written as they just fire stuff out
>> onto the wire without waiting for responses. If pipelining is not
>> used, you need to wait for each response to come back before you can
>> send another request.
>>
>> I would recommend first implementing the asynchronous solution -
>> dispatch an HttpRequest to Netty, and store an event listener in a
>> FIFO queue corresponding to this request. As responses come back in,
>> pop a listener off the queue and notify it that the response has
>> arrived, attaching the HttpResponse object. Wrapping that in a
>> synchronous call is then quite simple - create a new CountdownLatch
>> for each request (with value 1), send the request using the
>> asynchronous interface and give it an event listener that will call
>> countDown() on the latch. Then, call await() on the latch without a
>> timeout if you like. Edge cases exist, for instance of the connection
>> dies before the response comes back you should notify all listeners.
>>
>> Iain
>>
>> On 22 Jul 2009, at 18:19, Michael McGrady wrote:
>>
>>> Just a thought.  While Netty is asynchronous, synchronicity is  
>>> always
>>> just virtual anyway, cf. TCP/IP.  You can build a program using the
>>> ChannelFuture functionality to create your own virtual  
>>> synchronicity.
>>> 'Hope this helps.
>>>
>>> How did you implement a Netty synchronous call on the client  
>>> side?  I
>>> assume that you mean a virtual synchronous call?  Netty is  
>>> inherently
>>> asynchronous.
>>>
>>> Mike
>>>
>>>
>>> On Jul 22, 2009, at 9:57 AM, Tomasz Blachowicz wrote:
>>>
>>>>
>>>> Hi Trustin,
>>>>
>>>>
>>>> Trustin Lee wrote:
>>>>>
>>>>> In HTTP, the first response corresponds to the first request and  
>>>>> the
>>>>> second response corresponds to the second response and so on.
>>>>> Therefore, you could maintain a queue of sent requests, and poll  
>>>>> the
>>>>> request from the queue when you receive a response.  Then you can
>>>>> match
>>>>> a request and response.
>>>>>
>>>>
>>>> This is all right. However I'm not too sure how you'd implement the
>>>> matching
>>>> of requests and responses. I played a little with Netty recently  
>>>> and
>>>> managed
>>>> to implement synchronous call on client side, that works pretty
>>>> well. Here
>>>> is how I have done that. Can you have a look and comment on my
>>>> approach? I
>>>> wonder if there are any other idioms/patterns to implement
>>>> synchronous call
>>>> using Netty.
>>>>
>>>> Please, bear in mind that I still want to use single instance of
>>>> client by
>>>> many threads. Effectively single channel being used by many  
>>>> threads.
>>>> Do you
>>>> thing it is correct approach?
>>>>
>>>> Let's assume that we are having the interface of synchronous  
>>>> service
>>>> (client):
>>>>
>>>> <code>
>>>> public interface SyncService {
>>>> Response callService(Request request);
>>>> }
>>>> </code>
>>>>
>>>> It doesn't matter what is Request and Response and how these are
>>>> serialized
>>>> and transported to the server and back.
>>>>
>>>> Here is how I implemented the interface:
>>>>
>>>> <code>
>>>> @ChannelPipelineCoverage("one")
>>>> public class SyncServiceImpl extends SimpleChannleUpstreamHAndler
>>>> implements
>>>> SyncService, Closeable {
>>>>
>>>> private final Lock lock = new ReentrantLock();
>>>> private final Queue<Callback> callbacks = new
>>>> ConcurrentLinkedQueue<Callback>();
>>>>
>>>> private Channel channel;
>>>>
>>>> public Response callService(Request request) {
>>>>  Callback callback = new Callback();
>>>>  lock.lock();
>>>>  try {
>>>>    callbacks.add(callback);
>>>>    channel.write(request);	
>>>>  } finally {
>>>>    lock.unlock();
>>>>  }
>>>>  return callback.get();
>>>> }
>>>>
>>>> public void messageReceived(ChannelHandlerContext ctx, MessageEvent
>>>> e)
>>>> throws Exception {
>>>>  Response response = (Response) e.getMessage();
>>>>  callbacks.poll().handle(response);
>>>> }
>>>> 	
>>>> public void channelOpen(ChannelHandlerContext ctx,
>>>> ChannelStateEvent e)
>>>> throws Exception {
>>>>  channel = e.getChannel();
>>>>  super.channelOpen(ctx, e);
>>>> }
>>>>
>>>> public void close() throws IOException {
>>>>  channel.close().awaitUninterruptibly();
>>>> }
>>>>
>>>> static class Callback {
>>>>
>>>>  private final CountDownLatch latch = new CountDownLatch(1);
>>>>
>>>>  private Response response;
>>>>
>>>>  Response get() {
>>>>    try {
>>>>      latch.await();
>>>>    } catch (InterruptedException e) {
>>>>      throw new RuntimeException(e);
>>>>    }
>>>>    return response;
>>>>  }
>>>>
>>>>  void handle(Response response) {
>>>>    this.response = response;
>>>>    latch.countDown();
>>>>  }
>>>> }
>>>>
>>>> }
>>>> </code>
>>>>
>>>> How does it look to you? Any ideas how to improve this?
>>>>
>>>> Thanks a lot for any help and piece of discussion!
>>>>
>>>> Regards,
>>>> Tom
>>
>>
>
> -- 
> View this message in context: http://n2.nabble.com/Blocking-request-response-with-Netty-tp3254813p3306337.html
> Sent from the Netty User Group mailing list archive at Nabble.com.
> _______________________________________________
> netty-users mailing list
> netty-users at lists.jboss.org
> https://lists.jboss.org/mailman/listinfo/netty-users



More information about the netty-users mailing list