Regarding server streaming, we will implement a simplified recommendation service to recommend the books based on the popularity. Like client streaming, we would need a response handler - StreamObserver<RecommendBookResponse>
- to handle a sequential of responses given one request.
Since this is a recommendation function, we would need search the books from our book store given the criteria. Thus we would need add a searchBook
method to our book store.
@Override
public void searchBook(Context ctx, Integer popularity, BookStream bookStream) {
for (Map.Entry<String, Book> entry : inMemoryBookStore.entrySet()) {
if (ctx.isCancelled()) {
logger.info("context is cancelled");
return;
}
Book book = entry.getValue();
if (book.getPopularity() >= popularity) {
bookStream.Send(book.getId());
}
}
}
The searchBook
need three vairables:
- context: check if connection is still available
- popularity: used to find the recommended books
- bookStream: the wrapper of
StreamObserver<RecommendBookResponse>
to send back the responses (bookStream is an interface and we will implement a lambda function for it inBookService
).
Then we can add recommenBook
method to BookService
to handle the request and keep sending back the responses if any.
@Override
public void recommendBook(RecommendBookRequest request, StreamObserver<RecommendBookResponse> responseObserver) {
Integer popularity = request.getPopularity();
logger.info("checked all books with popularity greater than " + popularity);
bookStore.searchBook(Context.current(), popularity, new BookStream() {
@Override
public void send(String bookID) {
logger.info("found book: " + bookID);
RecommendBookResponse response = RecommendBookResponse.newBuilder().setBookId(bookID).build();
responseObserver.onNext(response);
}
});
responseObserver.onCompleted();
logger.info("finished book recommendation!");
}
As we have gone through in client streaming, we could just auto insert the override method and we would know what variables we could use: request and responseObserver. Request is easy to understand and responseObserver is just the response handler. Noticebly there is nothing to return here because response are all handled inside this method by the response handler - ``StreamObserver`.
We firstly get the popularity from our request - this could be way more complicated in real-world recommendation system - and we can feed this to bookStore
to search the qualified books. And the last variable is bookStream
we metioned earlier.
bookStream
is essentially the wrapper of StreamObserver<RecommendBookResponse>
and we could directly pass in StreamObserver<RecommendBookResponse>
if we want. In bookStream
, we implement send
method to send out the response which just call StreamObserver<RecommendBookResponse>
to pass in response.
Then we need shutdown the connection after finishing one recommendation service.
You might wonder where we implement responseObserver
and how onNext
is working. Well, we don’t need in this case since we are using blockingStub
which only requires request
as the input. However, we would need responseObserver
if we are using async stub. Thus, we need think about where we would call recommendBook
. The answer is our client and we should create responseObserver
in our client for it to handle response in async way (we will discuss the differences between blockingStub and asyncStub when we wrap up).
public void recommendBook(Integer popularity) {
logger.info("search started");
RecommendBookRequest request = RecommendBookRequest.newBuilder()
.setPopularity(rand.nextInt(100))
.build();
try {
Iterator<RecommendBookResponse> iterator = blockingStub
.withDeadlineAfter(5, SECONDS)
.recommendBook(request);
while (iterator.hasNext()) {
RecommendBookResponse response = iterator.next();
logger.info("found: " + response.getBookId());
}
} catch (Exception e) {
logger.log(SEVERE, "request failed: " + e.getMessage());
return;
}
logger.info("recommendation completed!");
}
Here we would get an iterator of RecommendBookResponse
by calling recommendBook
and we could do futher process like refine the ranking and do another filtering. Now we just log everything we received.
For server, there is nothing we need do now since server only need call the service when receiving the request. Thus we don’t need change it as long as our service signature remains the same which makes saves a lot of time for us to focus on the core logic.
Next chapter will be last one - bidirectional streaming - and we will implement a rating function for client to submit ratings for several books at the same time.
The complete project can be found here.