Unary call is very simple to understand, it is essentially a normal REST API call. Each request will get a single response from the server.
In this project, we implement unary call to create books in our store.
First, we need create book_message.proto
which is a book pojo carrying the related info of book. Simply speaking, each message is pojo we are gonna use in our service.
syntax = "proto3";
package book;
option java_multiple_files = true;
option java_package = "today.ihelio.grpc";
import "sample_message.proto";
import "image_message.proto";
message Book {
enum Genre {
UNKNOWN = 0;
FICTION = 1;
MYSTERY = 2;
THRILLER = 3;
HORROR = 4;
HISTORICAL = 5;
ROMANCE = 6;
SCI_FICTION = 7;
}
string id = 1;
string name = 2;
string author = 3;
uint32 publish_year = 4;
double price = 5;
string publication = 6;
uint32 rating = 7;
uint32 rating_count = 8;
double avg_rating = 9;
repeated Sample sample = 10;
repeated Image image = 11;
repeated Genre genre = 12;
optional uint32 popularity = 13;
}
To implement a message, we need speficy:
- type - like string, uint32, double, enum and even other kind of message
- variable name
- filed number - unique number used to identify the fields i the message binary format (see google developer guide for more details)
- field rule - required, optional and repeated (see google developer guide for more details)
Next, we need build our service proto which will generate the necessary files for us including stub class.
syntax = "proto3";
package book;
option java_multiple_files = true;
option java_package = "today.ihelio.grpc";
import "book_message.proto";
message CreateBookRequest {
Book book = 1;
}
message CreateBookResponse {
string id = 1;
}
service BookService {
rpc CreateBook(CreateBookRequest) returns (CreateBookResponse) {};
}
In above proto, we define a service to create a book in our data store and normally we need an request message, an response message and a service containing the method of CreateBook
. Since it is a unary call, then we just return a CreateBookResponse
and passing a CreateBookRequest
.
After we build the proto files, we should be to find the generated java files in build/generated/source/proto/main
.
Then we should go ahead creating corresponding methods in our project:
- BookServer - handle the request and return the response
- BookClient - send the request and handle the response
- BookService - implement the actual logic in our server
- InMemoryBookStore - save the records of books
For the simplicity, I created an InMemoryBookStore
to store all book records we created in the client but using a database like MySQL in practise is better.
As I described above, the core logic we should focus on is BookService
since BookServer
and BookClient
simply inherited from the boilerplate generated by proto files created previsouly.
So we should start with creating a method CreateBook
in the BookService
.
package today.ihelio.learngrpc;
import com.google.protobuf.ByteString;
import io.grpc.Context;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import today.ihelio.grpc.*;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.UUID;
import java.util.logging.Logger;
import static java.util.logging.Level.SEVERE;
/**
* @author helio
* @date 2022/9/3
* @package today.ihelio.learngrpc
*/public class BookService extends BookServiceGrpc.BookServiceImplBase {
private static final Logger logger = Logger.getLogger(BookService.class.getName());
private BookStore bookStore;
public BookService(BookStore bookStore) {
this.bookStore = bookStore;
}
@Override
public void createBook(CreateBookRequest request, StreamObserver<CreateBookResponse> responseObserver) {
Book book = request.getBook();
String id = book.getId();
logger.info("Received request to create a new book with ID: " + id);
UUID uuid;
if (id.isEmpty()) {
uuid = UUID.randomUUID();
} else {
try {
uuid = UUID.fromString(id);
} catch (IllegalArgumentException e) {
responseObserver.onError(
Status.INVALID_ARGUMENT
.withDescription(e.getMessage())
.asRuntimeException()
); return;
} }
if (Context.current().isCancelled()) {
logger.info("request is cancelled");
responseObserver.onError(
Status.CANCELLED
.withDescription("request is cancelled")
.asRuntimeException());
} Book copy = book.toBuilder().setId(uuid.toString()).build();
try {
bookStore.createBook(copy);
} catch (AlreadyExistsException e) {
responseObserver.onError(
Status.ALREADY_EXISTS
.withDescription(e.getMessage())
.asRuntimeException()
); }
CreateBookResponse response = CreateBookResponse.newBuilder().setId(copy.getId()).build();
responseObserver.onNext(response);
responseObserver.onCompleted();
logger.info(String.format("book %s created", copy.getName()));
}}
Then we know we also need a BookStore
to save the book into our data store:
package today.ihelio.learngrpc;
import io.grpc.Context;
import io.grpc.Status;
import io.grpc.xds.shaded.io.envoyproxy.envoy.config.accesslog.v3.GrpcStatusFilter;
import org.checkerframework.checker.units.qual.C;
import today.ihelio.grpc.Book;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import static io.grpc.Status.NOT_FOUND;
/**
* @author helio
* @date 2022/9/3
* @package today.ihelio.learngrpc
*/public class InMemoryBookStore implements BookStore{
private final Logger logger = Logger.getLogger(InMemoryBookStore.class.getName());
ConcurrentMap<String, Book> inMemoryBookStore;
public InMemoryBookStore() {
this.inMemoryBookStore = new ConcurrentHashMap<>();
}
@Override
public void createBook(Book book) {
if (inMemoryBookStore.containsKey(book.getId())) {
throw new AlreadyExistsException("book already exists");
} else {
inMemoryBookStore.put(book.getId(), book);
} }}
Lastly, we need a Client
and a Server
.
Client will need a channel and a stub:
- channel - used to connect with server given the address and port
- stub - gRPC is still client/server mode so we have server and client in gRPC as well. However, client is called as stub in gRPC.
package today.ihelio.learngrpc;
import com.google.common.util.concurrent.RateLimiter;
import com.google.protobuf.ByteString;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import today.ihelio.grpc.*;
import today.ihelio.sample.Generator;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Iterator;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import static java.util.concurrent.TimeUnit.SECONDS;
import static java.util.logging.Level.SEVERE;
/**
* @author helio
* @date 2022/9/3
* @package today.ihelio.learngrpc
*/public class BookClient {
private static final Logger logger = Logger.getLogger(BookClient.class.getName());
private final ManagedChannel channel;
private final BookServiceGrpc.BookServiceBlockingStub blockingStub;
private static final Random rand = new Random();
public BookClient(String host, int port) {
channel = ManagedChannelBuilder.forAddress(host, port).usePlaintext().build();
blockingStub = BookServiceGrpc.newBlockingStub(channel);
}
public void shutdown() throws InterruptedException {
channel.shutdown().awaitTermination(5, SECONDS);
}
public void createBook(Book book) throws InterruptedException {
CreateBookRequest request = CreateBookRequest.newBuilder().setBook(book).build();
CreateBookResponse response = CreateBookResponse.getDefaultInstance();
try {
response = blockingStub.withDeadlineAfter(5, SECONDS).createBook(request);
} catch (StatusRuntimeException e) {
if (e.getStatus().getCode() == Status.Code.ALREADY_EXISTS) {
logger.info("Book already exists");
return;
}
} catch (Exception e) {
logger.log(SEVERE, "request failed " + e.getMessage());
}
logger.info("New book created: \n" + response.getId());
}
As for server, we need create a gRPC server object which will listen on given address and port. Here we only need pass in BookService
as the core component to implement the business logic and there is no need to create another CreateBook
. That tremendously simply our work on server side. But why?
The answer is simple - It is gRPC. When we created these Java files from protos, the service we created already generate the createBook
method in BookServiceGrpc.java
where we can find all services we want to implement. And we already have a BookService
which implements createBook
, thus we just need simply pass the service object to the server which is exactly the following line:
this.server = serverBuilder.addService(bookService)
.addService(ProtoReflectionService.newInstance())
.build();
Thus, the server will wait at the port and it will automatically call createBook
from BookService
once it captures the createBookRequest
.
Conclusively, it is like we can call createBook
- which implemented in BookService
and invoked in server side - directly from client side. This is essentially why gRPC supports differenet language even client in Python and server in Java.
package today.ihelio.learngrpc;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.protobuf.services.ProtoReflectionService;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
/**
* @author helio
* @date 2022/9/3
* @package today.ihelio.learngrpc
*/public class BookServer {
private final Logger logger = Logger.getLogger(BookServer.class.getName());
private BookService bookService;
private final int port;
private final Server server;
public BookServer(int port, BookStore bookStore, ImageStore imageStore) {
this(ServerBuilder.forPort(port), port, bookStore, imageStore);
}
public BookServer(ServerBuilder serverBuilder, int port, BookStore bookStore, ImageStore imageStore) {
this.port = port;
this.bookService = new BookService(bookStore, imageStore);
this.server = serverBuilder.addService(bookService)
.addService(ProtoReflectionService.newInstance())
.build();
}
public void start() throws IOException {
server.start();
logger.info("Book server started on port " + port);
Runtime.getRuntime().addShutdownHook(new Thread(){
@Override
public void run() {
System.err.println("shutdown gRPC server because JVM shuts down");
try {
BookServer.this.stop();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.err.println("server shutdown");
}
});
}
public void stop() throws InterruptedException{
if (server != null) {
server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
}
}
private void blockUntilShutdown() throws InterruptedException {
if (server != null) {
server.awaitTermination();
}
}
public static void main(String[] args) throws IOException, InterruptedException{
InMemoryBookStore inMemoryBookStore = new InMemoryBookStore();
BookServer server = new BookServer(9080, inMemoryBookStore);
server.start();
server.blockUntilShutdown();
}
}
This is how we implement unary call and we will get into client streaming in next chapter.
The complete project can be found at here.