gRPC intro and learning

Yueying Li 7/13/2021

Outline

  1. Goal of RPC
  2. gRPC CCore and Arch Overview
  3. Life of an gRPC
  4. Examples (HelloWorld, RouteGuide)
  5. Reference and Resources

What’s the goal of RPC

Within a single program, running in a single process, recall the well-known notion of a procedure call:

  • Caller pushes args onto stack and jumps to addr of callee
  • Callee reads args from stack, executes, puts return values in register and returns to next instruction in caller

Goal: make communication appear like a local procedure call: transparency for procedure calls – way less painful than sockets…

RPC issue

  1. Heterogeneity
    1. Server must dispatch to required function (what if server is different type of machine)
    2. Client needs to rendezvous with the server
  2. Failure
    1. What if the message get dropped?
    2. What if client, server or network fails?
  3. Performance
    1. Pocedure call takes 10 cycles (3ns)
    2. RPC in DC takes 10 $\mu$s (1000x slower). In WAN, 10 $ms$
      • Due to two context switches, and call/return trap, one procedure execution

RPC Solution

Differences in data representation

  • Remote machine may run process in a certain language, with certain byte ordering, and represent FP in a different way, and with different data alignment requirements
  • Solution: IDL (Interface Description Language)
    • Mechanism to pass procedure parameters and return values in a machine-independent way
    • Procedure: Client defines API for procedure calls like names, parameters and return types, and then they run an IDL compiler which generates code to marshal

What is GRPC

gRPC is an RPC library built on C

gRPC Implementations

  1. APIs in 9 languages, natively implemented in 3 langugages, java, go and C
    w:600

gRPC C-Core: Key features

  • Payload agnostic:
    • Protobuf support (or other encoding formats like flatbuffers, json, etc.) are provided by language wrappers (not gRPC C-Core)
  • Extensible architecture:
    • The core stack is very lean
    • Most of the features are provided by pluggable Filters:
      • e.g. Auth, Tracing, Name resolution, LB, compression, message size checks, RPC deadlines, etc.

Arch overview

  • Surface layer
    • API : Expose C-Core API
    • Filters : Extensions to core functionality
      • auth_filter: authentication
      • deadline_filter: rpc deadlines
      • client_channel_filter: name resolution, load balancing
  • Note: Customers usually don’t have to call the C-Core API but wrapped language APIs
  • Transport:

    • gRPC Wire protocol
      • By default HTTP2, can also have QUIC, Cronet, Inprocess, etc.
  • IO Manager:

    • Abstraction for performing R/W (grpc_endpoint)
    • Low-level IO utility: polling engine socket creation, utils
    • Timers
    • Note: Platform specific implementation

Programming Model of C-Core

  • Channel is a commmunication pipe between client and target
  • Call is bound to a channel
    • (Multiple calls can share same channel)
  • Ops
    • Send initial Metadata
    • Receive Initial Metadata
    • Send Message
    • Receive Message
    • Send Trailing Metadata
    • Receive Trailing Metadata
  • Client and Server perform operations in batches
  • The event of the completion of the batch is notified via Completion Queue

Semantics to execute a batch of ops:

  • Call API to start a batch of ops on a call
  • Wait/Poll for the batch completion notification on CQ (associated with that call during call creation)

Types of RPC:

  • Unary: Send exactly on Msg
  • Streaming: zero or more Msgs
    • Client sends initial metadata, and sends the messages in different batches
  • Client streaming:
    • Like uploading video
  • Server streaming:
    • Like YouTube
  • Bidrectional

Life of an RPC:

Channel Creation (1)

  • Client starts by creating CQ
  • A thread waits on CQ for events as much as they can.
  • Client call the API to create the channel to a server and specify a server URI (initializes filter stack for that channel)
    • Iniialize filters for that channel
  • Client channel filter chooses a resolver based on the URI scheme (of the server URI)
  • Resolver returns list of addresses and config

Channel Creation (2)

  • Clients starts a batch of operations the Call (let’s say it’s unary)
    • All the ops sent in once
    • The client then polls the CQ for events that indicate 3 “Receive” ops (Link to Ops)
  • One of the subchannels is picked.
  • The call is now bound to that sub-channel for the lifetime of the call

LB-Policy

  • Pick-first
  • GRPC-LB: look-aside
  • Round-Robin

Overview

  1. Client started a batch of ops
  2. Server Receives the initial metadata
    1. Start a batch, and notification sent to CQ after the batch complete
    2. Thread picks up completion event (and calls RPC handler from appication)
  3. Server sends the Intial Metadata, Message, Trailing Metadata
  4. Client receives the three ops
  5. Application picks up completion event

Caveat:

  • There is no check if the server is reachable

Hello World!

Outline

  • Todo application (server, client)
  • createTodo()
  • readTodos() //sync
    • Slow: build all the TODOs and send as one request
  • readTodos() //server stream

Implementation: Protobuf

syntax = "proto3";

package todoPackage;

service Todo {
    rpc createTodo(TodoItem) returns (TodoItem);
    rpc readTodos(voidNoParam) returns (TodoItems);
    rpc readTodosStream(voidNoParam) returns (stream TodoItem);
}
message voidNoParam {}
message TodoItem {
    int32 id = 1;
    string text = 2;
}
message TodoItems {
    repeated TodoItem items = 1;
}

Implementation: example 2

  • In proto file.
    • We can have multiple methods for a service
    • Note that each method needs to have a return and input type
    • repeated means array
syntax = "proto3";
option java_package = "ex.grpc";
package helloworld;
message Reply {
    int32 result = 1;
}

message HelloMessage {
    int32 a = 1;
    int32 b = 2;
}

service TestServer {
    rpc hello_request (HelloMessage) returns (Reply) {}
}

File generation

# CPP
cd protos
protoc --cpp_out=. helloworld.proto
protoc --grpc_out=. --plugin=protoc-gen-grpc=`which grpc_cpp_plugin` helloworld.proto

# Python
cd protos
python3 -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. helloworld.proto

Hellow world sever

  • Note:
    • implement the two methods specified in the protobuf file for the service.
    • Return status
    • Input: take the const pointer to the request message, and pointer to the reply message defined in the protobuf
class GreeterServiceImpl final : public Greeter::Service {
  Status SayHello(ServerContext* context, const HelloRequest* request,
                  HelloReply* reply) override {
     // ...
  }

  Status SayHelloAgain(ServerContext* context, const HelloRequest* request,
                       HelloReply* reply) override {
    std::string prefix("Hello again ");
    reply->set_message(prefix + request->name());
    return Status::OK;
  }
};

Hello world client

  • Note:
    • Final specifier prevents derived class to overwrite the base class.
    • request corresponds to the message type and has methods like “set_{}”, “{}” (name/message)
    • In the sync GRPC version, use stub_ to invoke the methods, parsing context, request and reply
class GreeterClient {
 public:
  // ...
  std::string SayHello(const std::string& user) {
     // ...
  }

  std::string SayHelloAgain(const std::string& user) {
    // Follows the same pattern as SayHello.
    HelloRequest request;
    request.set_name(user);
    HelloReply reply;
    ClientContext context;

    // Here we can use the stub's newly available method we just added.
    Status status = stub_->SayHelloAgain(&context, request, &reply);
    if (status.ok()) {
      return reply.message();
    } else {
      std::cout << status.error_code() << ": " << status.error_message()
                << std::endl;
      return "RRPC failed";
    }
  }
  private:
  std::unique_ptr<Greeter::Stub> stub_;
};

GRPC RunServer

  • Key structure:
    • ServerBuilder
      • AddListeningPort
      • RegisterService
      • BuildAndStart
      • wait
void RunServer() {
  std::string server_address("0.0.0.0:50051");
  GreeterServiceImpl service;

  grpc::EnableDefaultHealthCheckService(true);
  grpc::reflection::InitProtoReflectionServerBuilderPlugin();
  ServerBuilder builder;
  // Listen on the given address without any authentication mechanism.
  builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
  // Register "service" as the instance through which we'll communicate with
  // clients. In this case it corresponds to an *synchronous* service.
  builder.RegisterService(&service);
  // Finally assemble the server.
  std::unique_ptr<Server> server(builder.BuildAndStart());
  std::cout << "Server listening on " << server_address << std::endl;

  // Wait for the server to shutdown. Note that some other thread must be
  // responsible for shutting down the server for this call to ever return.
  server->Wait();
}

Another example: RouteGuide

Methods:

rpc GetFeature(Point) returns (Feature) {}
rpc ListFeatures(Rectangle) returns (stream Feature) {}
rpc RecordRoute(stream Point) returns (RouteSummary) {}
rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}

Message:

message Point {
  int32 latitude = 1;
  int32 longitude = 2;
}
// A latitude-longitude rectangle, represented as two diagonally opposite
// points "lo" and "hi".
message Rectangle {
  Point lo = 1;
  Point hi = 2;
}
// A feature names something at a given point.
//
// If a feature could not be named, the name is empty.
message Feature {
  // The name of the feature.
  string name = 1;

  // The point where the feature is detected.
  Point location = 2;
}

// A RouteNote is a message sent while at a given point.
message RouteNote {
  // The location from which the message is sent.
  Point location = 1;

  // The message to be sent.
  string message = 2;
}
// A RouteSummary is received in response to a RecordRoute rpc.
//
// It contains the number of individual points received, the number of
// detected features, and the total distance covered as the cumulative sum of
// the distance between each point.
message RouteSummary {
  // The number of points received.
  int32 point_count = 1;

  // The number of known features passed while traversing the route.
  int32 feature_count = 2;

  // The distance covered in metres.
  int32 distance = 3;

  // The duration of the traversal in seconds.
  int32 elapsed_time = 4;
}

Create Client

  • To call service methods, we first need to create a stub.
    • Create a gRPC channel for the stub, specifying the server addr and port
      • additional options for the channel: grpc::CreateCustomChannel() api with any special channel arguments - grpc::ChannelArguments

API for ProtoBuf

An example of generated name method for a service

// name
inline bool has_name() const;
inline void clear_name();
inline const ::std::string& name() const;
inline void set_name(const ::std::string& value);
inline void set_name(const char* value);
inline ::std::string* mutable_name();

How to locate the generated methods

  • Search for class MidTierRequest PROTOBUF_FINAL :
    • and // implements Message
  • It is important to look at the field that you defined for a message

      public:

  // nested types ----------------------------------------------------

  // accessors -------------------------------------------------------

Summary of Mid-tier request’s fields

  // .mid_tier_service.UtilRequest util_request = 1;
  bool has_util_request() const;
  private:
  bool _internal_has_util_request() const;
  public:
  void clear_util_request();
  const ::mid_tier_service::UtilRequest& util_request() const;
  ::mid_tier_service::UtilRequest* release_util_request();
  ::mid_tier_service::UtilRequest* mutable_util_request();
  void set_allocated_util_request(::mid_tier_service::UtilRequest* util_request);
  private:
  const ::mid_tier_service::UtilRequest& _internal_util_request() const;
  ::mid_tier_service::UtilRequest* _internal_mutable_util_request();
  public:
  void unsafe_arena_set_allocated_util_request(
      ::mid_tier_service::UtilRequest* util_request);
  ::mid_tier_service::UtilRequest* unsafe_arena_release_util_request();
  // uint64 resp_time = 3;
  void clear_resp_time();
  ::PROTOBUF_NAMESPACE_ID::uint64 resp_time() const;
  void set_resp_time(::PROTOBUF_NAMESPACE_ID::uint64 value);
  private:
  ::PROTOBUF_NAMESPACE_ID::uint64 _internal_resp_time() const;
  void _internal_set_resp_time(::PROTOBUF_NAMESPACE_ID::uint64 value);
  public:

  // bool last_request = 2;
  void clear_last_request();
  bool last_request() const;
  void set_last_request(bool value);
  private:
  bool _internal_last_request() const;
  void _internal_set_last_request(bool value);
  public:
  // bool kill = 4;
  void clear_kill();
  bool kill() const;
  void set_kill(bool value);
  private:
  bool _internal_kill() const;
  void _internal_set_kill(bool value);
  public:

  // uint32 load = 6;
  void clear_load();
  ::PROTOBUF_NAMESPACE_ID::uint32 load() const;
  void set_load(::PROTOBUF_NAMESPACE_ID::uint32 value);
  private:
  ::PROTOBUF_NAMESPACE_ID::uint32 _internal_load() const;
  void _internal_set_load(::PROTOBUF_NAMESPACE_ID::uint32 value);
  public:

  // uint64 request_id = 5;
  void clear_request_id();
  ::PROTOBUF_NAMESPACE_ID::uint64 request_id() const;
  void set_request_id(::PROTOBUF_NAMESPACE_ID::uint64 value);
  private:
  ::PROTOBUF_NAMESPACE_ID::uint64 _internal_request_id() const;
  void _internal_set_request_id(::PROTOBUF_NAMESPACE_ID::uint64 value);

Service method implementation for server

  • Note:
    • Can either be sync or async server depending on how it does inheretence from RouteGuide::Service/AsyncService

Example implementation of a method containing messages on client

  void ListFeatures() {
    routeguide::Rectangle rect;
    Feature feature;
    ClientContext context;

    rect.mutable_lo()->set_latitude(400000000);
    rect.mutable_lo()->set_longitude(-750000000);
    rect.mutable_hi()->set_latitude(420000000);
    rect.mutable_hi()->set_longitude(-730000000);
    std::cout << "Looking for features between 40, -75 and 42, -73"
              << std::endl;

    std::unique_ptr<ClientReader<Feature> > reader(
        stub_->ListFeatures(&context, rect));
    while (reader->Read(&feature)) {
      std::cout << "Found feature called "
                << feature.name() << " at "
                << feature.location().latitude()/kCoordFactor_ << ", "
                << feature.location().longitude()/kCoordFactor_ << std::endl;
    }
    Status status = reader->Finish();
    if (status.ok()) {
      std::cout << "ListFeatures rpc succeeded." << std::endl;
    } else {
      std::cout << "ListFeatures rpc failed." << std::endl;
    }
  }

Enhancement:

  1. Previous

Resource and Reference

  1. Intro https://github.com/sbueringer/kubecon-slides/tree/master/slides/2018-kubecon-eu

  2. CMakeList and building https://github.com/grpc/grpc/blob/master/BUILDING.md https://github.com/grpc/grpc/blob/v1.38.0/examples/cpp/route_guide/CMakeLists.txt

  3. Protobuf https://developers.google.com/protocol-buffers/docs/proto3

  4. GRPC Basic Tutorial: https://grpc.io/docs/languages/cpp/basics/

  5. GRPC Async https://grpc.io/docs/languages/cpp/async/

  6. GRPC C++ Core API https://grpc.github.io/grpc/cpp/

GRPC complletion queue thread model in tensorflow

  • Adds an experimental function to C++ Completion Queues DoThenAsyncNext. It allows users to pass a lambda to be executed before the AsyncNext. If the lambda triggers a completion event, it is guaranteed to be delivered on the AsyncNext call.

https://github.com/grpc/grpc/commit/1bf7207852b4138c8a30e5a2f8f2c4bfffbba262