We will speed it up a bit in this chapter after implementing unary call and one direction streaming. As always, we need implement our core logic which includes our book store which handles rating update and computing and book service which handls the request and return response.

@Override  
public Book rateBook(String bookID, Integer rating) {  
    Book book = inMemoryBookStore.getOrDefault(bookID, null);  
    if (book == null) {  
        throw NOT_FOUND.withDescription("book not found")  
                .asRuntimeException();  
    }  
    inMemoryBookStore.computeIfPresent(bookID, (k, v) -> {  
        Integer oldRating = v.getRating();  
        Integer oldCount = v.getRatingCount();  
        return v.toBuilder()  
                .setRating(rating + oldRating)  
                .setRatingCount(oldCount + 1)  
                .setAvgRating((rating + oldRating)/(float) (oldCount + 1))  
                .build();  
    });  
    return inMemoryBookStore.get(bookID);  
}

we simply update the rating in our book store.

We also need add rateBook method to BookService.

@Override  
public StreamObserver<RateBookRequest> rateBook(StreamObserver<RateBookResponse> responseObserver) {  
    return new StreamObserver<RateBookRequest>() {  
        @Override  
        public void onNext(RateBookRequest request) {  
            String bookID = request.getBookId();  
            Integer rating = request.getRating();  
            Book book = bookStore.rateBook(bookID, rating);  
            RateBookResponse response = RateBookResponse.newBuilder()  
                    .setBookId(book.getId())  
                    .setRatingCount(book.getRatingCount())  
                    .setAvgRating(book.getAvgRating())  
                    .build();  
            responseObserver.onNext(response);  
        }  
  
        @Override  
        public void onError(Throwable t) {  
            logger.log(SEVERE, "rating failed " + t.getMessage());  
        }  
  
        @Override  
        public void onCompleted() {  
            logger.info("rating finished");  
        }  
    };  
}

As client streaming, we need return a StreamObserver<RateBookRequest> and fill the three Override methods.

The last we need update is client. Since it is a bidirectional streaming, we need pass in StreamObserver<RateBookResponse> to asyncStub.rateBook and get a StreamObserver<RateBookRequest> back.

public void rateBook(String[] bookIDs, Integer[] ratings) throws InterruptedException {  
    CountDownLatch finishLatch = new CountDownLatch(1);  
    logger.info("rating started");  
    StreamObserver<RateBookRequest> requestObserver = asyncStub  
            .rateBook(new StreamObserver<RateBookResponse>() {  
                @Override  
                public void onNext(RateBookResponse response) {  
                    logger.info(String.format("laptop rated: id = %s, count = %s, avg = %s",  
                            response.getBookId(),  
                            response.getRatingCount(),  
                            response.getAvgRating()));  
                }  
  
                @Override  
                public void onError(Throwable t) {  
                    logger.log(SEVERE, "rating failed: " + t.getMessage());  
                    finishLatch.countDown();  
                }  
  
                @Override  
                public void onCompleted() {  
                    logger.info("rate laptop completed");  
                    finishLatch.countDown();  
                }  
            });  
  
    int n = bookIDs.length;  
    try {  
        for (int i = 0; i < n; i++) {  
            RateBookRequest request = RateBookRequest.newBuilder()  
                    .setBookId(bookIDs[i])  
                    .setRating(ratings[i])  
                    .build();  
            requestObserver.onNext(request);  
            logger.info("sent rate-book request: id = " + request.getBookId() + ", score = " + request.getRating());  
        }  
    } catch (Exception e) {  
        logger.log(Level.SEVERE, "unexpected error: " + e.getMessage());  
        requestObserver.onError(e);  
        return;  
    }  
  
    requestObserver.onCompleted();  
    if (!finishLatch.await(1, TimeUnit.MINUTES)) {  
        logger.warning("request cannot finish within 1 minute");  
    }  
  
}

For responseObserver, we just log the response data. The returned requestObserer is the one we defined in BookService. Then the requestObserver will handle the requests sequentially until none. In the end, we should call onCompleted which will just simply log the message.

Now we have implemented four kinds of gRPC method. I will wrap up everything we learned in this tutorial next chapter.

The complete project can be found here.