-
-
Notifications
You must be signed in to change notification settings - Fork 112
Getting Started
First, install the Cap'n Proto tools.
Then, run the following commands:
go install capnproto.org/go/capnp/v3/capnpc-go@latest # install go compiler plugin
GO111MODULE=off go get -u capnproto.org/go/capnp/v3/ # install go-capnproto to $GOPATH
Cap'n Proto works by generating code: your schema can be converted to code for any language that Cap'n Proto supports.
Consider the following schema, stored in foo/books.capnp
:
using Go = import "/go.capnp";
@0x85d3acc39d94e0f8;
$Go.package("books");
$Go.import("foo/books");
struct Book {
title @0 :Text;
# Title of the book.
pageCount @1 :Int32;
# Number of pages in the book.
}
capnpc-go requires two annotations for all files: package
and import
. package
is needed to know what package to place at the head of the generated file and what identifier to use when referring to the type from another package. import
should be the fully qualified import path, and is used to generate the import statement from other packages and to detect when two types are in the same package. Compilation will fail unless these annotations are present.
To compile this schema into Go code, run the following command:
capnp compile -I$GOPATH/src/capnproto.org/go/capnp/std -ogo foo/books.capnp
This generates the file foo/books.capnp.go
. If you encounter the error:
go: no such plugin (executable should be 'capnpc-go')
go: plugin failed: exit code 1
then ensure your $PATH
is set to include $GOPATH/bin
(e.g. export PATH=$PATH:$GOPATH/bin
or similar).
The data structures contained in foo/books.capnp.go
are special Go structs that can be imported into your programs. Moreover, these can be written to byte-streams.
package main
import (
"os"
"foo/books"
"capnproto.org/go/capnp/v3"
)
func main() {
// Make a brand new empty message. A Message allocates Cap'n Proto structs.
msg, seg, err := capnp.NewMessage(capnp.SingleSegment(nil))
if err != nil {
panic(err)
}
// Create a new Book struct. Every message must have a root struct.
book, err := books.NewRootBook(seg)
if err != nil {
panic(err)
}
book.SetTitle("War and Peace")
book.SetPageCount(1440)
// Write the message to stdout.
err = capnp.NewEncoder(os.Stdout).Encode(msg)
if err != nil {
panic(err)
}
}
These datatypes can also be read from byte-streams, as follows:
package main
import (
"fmt"
"os"
"foo/books"
"capnproto.org/go/capnp/v3"
)
func main() {
// Read the message from stdin.
msg, err := capnp.NewDecoder(os.Stdin).Decode()
if err != nil {
panic(err)
}
// Extract the root struct from the message.
book, err := books.ReadRootBook(msg)
if err != nil {
panic(err)
}
// Access fields from the struct.
title, err := book.Title()
if err != nil {
panic(err)
}
pageCount := book.PageCount()
fmt.Printf("%q has %d pages\n", title, pageCount)
}
In addition, each type has a .Message()
method that returns a capnp.Message
, which can be directly marshaled into []byte
s using Message.Marshal
, and unmarshaled with a corresponding call to Message.Unmarshal
.
Lastly, packed encodings are supported via the following methods:
Serializing data structures is useful, but the real power of Cap'n Proto is the RPC Protocol. RPC (remote procedure call) is a form of network communication that allows you to make function calls between processes, often on different machines. Just by providing a byte stream -- like a pipe or a network connection -- you can make method calls on objects in other processes.
The first step is to define an interface inside your schema:
using Go = import "/go.capnp";
@0xf454c62f08bc504b;
$Go.package("logger");
$Go.import("logger");
struct Level {
enum LogLevel {
debug @0;
info @1;
error @2;
}
level @0 :LogLevel;
}
interface Logger {
debug @0 (msg :Text) -> ();
info @1 (msg :Text) -> ();
error @2 (msg :Text) -> ();
getLogLevel @3 () -> (level :Level);
}
When defining an interface in Cap'n Proto, it's helpful to think of the generated RPC code as a "server". While it's not a server in the standard sense of a gRPC or HTTP server, there are several architectural patterns which you can use that will ease the overall adoption of Cap'n Proto if you think of the generated RPC implementation as a server.
The recommended pattern while learning Cap'n Proto is the adapter pattern, as it's likely going to be the easiest to integrate into your existing code base. Using the example interface schema, you would also want a native Go interface and equivalent type:
type LogLevel uint16
type ILogger interface {
Debug(msg string)
Info(msg string)
Error(msg string)
}
It's helpful to think of the generated RPC "server" interface as being the adapter to the actual implementation of the internal ILogger
. Ultimately, you can implement the generated Logger_Server
as an implementation of ILogger
, but it's not recommended. This inherently ties your code to go-capnproto2, and makes it hard to decouple or safely maintain. Generally, you'll want a native Go interface that's identical to your Cap'n Proto interface definition and use the generated RPC "server" as the adapter between your native Go interface and whatever is making the RPC call.
Now you need an actual implementation. This example uses the stdlib log
package for simplicity as a way to represent your existing code base.
var _ ILogger = (*internalLogger)(nil)
type internalLogger struct {
logger *log.Logger
}
func (il *internalLogger) Debug(msg string) {
il.logger.Print(msg)
}
func (il *internalLogger) Info(msg string) {
il.logger.Print(msg)
}
func (il *internalLogger) Error(msg string) {
il.logger.Fatal(msg)
}
func (il *internalLogger) GetLogLevel() LogLevel {
return LogLevel(0)
}
Using the adapter pattern, there are two adaptations that you want to make, the first being for the enum:
var logLevelAdapter = map[LogLevel]pkg.Level_LogLevel{
0: pkg.Level_LogLevel_debug,
1: pkg.Level_LogLevel_info,
2: pkg.Level_LogLevel_error,
}
var logLevelReverseAdapter = map[pkg.Level_LogLevel]LogLevel{
pkg.Level_LogLevel_debug: LogLevel(0),
pkg.Level_LogLevel_info: LogLevel(1),
pkg.Level_LogLevel_error: LogLevel(2),
}
And the second being for the "server" implementation:
var _ pkg.Logger_Server = (*loggerAdapterServer)(nil)
type loggerAdapterServer struct {
*internalLogger
}
func (las *loggerAdapterServer) Debug(_ context.Context, call pkg.Logger_debug) error {
msgPayload, err := call.Args().Msg()
if err != nil {
return err
}
las.internalLogger.Debug(msgPayload)
return nil
}
func (las *loggerAdapterServer) Info(_ context.Context, call pkg.Logger_info) error {
msgPayload, err := call.Args().Msg()
if err != nil {
return err
}
las.internalLogger.Info(msgPayload)
return nil
}
func (las *loggerAdapterServer) Error(_ context.Context, call pkg.Logger_error) error {
msgPayload, err := call.Args().Msg()
if err != nil {
return err
}
las.internalLogger.Error(msgPayload)
return nil
}
func (las *loggerAdapterServer) GetLogLevel(_ context.Context, call pkg.Logger_getLogLevel) error {
results, err := call.AllocResults()
if err != nil {
return err
}
level := logLevelAdapter[las.internalLogger.GetLogLevel()]
levelMsg, err := results.NewLevel()
if err != nil {
return err
}
levelMsg.SetLevel(level)
return nil
}
You will also need a "serving" method, which is what accepts the RPC calls over the io.ReadWriteCloser
implementation.
func serveLogger(ctx context.Context, rwc io.ReadWriteCloser) error {
// Create a new locally implemented logger.
main := pkg.Logger_ServerToClient(&loggerAdapterServer{
internalLogger: &internalLogger{logger: log.New(os.Stdout, "", log.LstdFlags)},
})
// Listen for calls, using the logger as the bootstrap interface.
conn := rpc.NewConn(rpc.NewStreamTransport(rwc), &rpc.Options{
BootstrapClient: main.Client,
})
defer conn.Close()
// Wait for connection to abort.
select {
case <-conn.Done():
return nil
case <-ctx.Done():
return conn.Close()
}
}
Once all this is put together, then you can consume the remote interface in a fairly straightforward manner!
func main() {
ctx := context.Background()
// minic the network with a net pipe
// what you call it doesn't matter, they're just named this
// way so it's easier to understand
serverSideConn, clientSideConn := net.Pipe()
// "start the server"
go serveLogger(ctx, serverSideConn)
// now we build a client with the defaults
clientConn := rpc.NewConn(rpc.NewStreamTransport(clientSideConn), nil)
defer clientConn.Close()
// now we generate the "client"
// it's just called "logger" to make it a bit easier to understand
logger := pkg.Logger{Client: clientConn.Bootstrap(ctx)}
// send an info message!
// we have to set the params in the delegate because of how capnp
// might wait to send the message, so this allows it to allocate memory
// more effectively.
_, freeInfoMsg := logger.Info(ctx, func(params pkg.Logger_info_Params) error {
err := params.SetMsg("info message!")
if err != nil {
return err
}
return nil
})
defer freeInfoMsg()
logLevelResponse, freelogLevelResp := logger.GetLogLevel(ctx, func(params pkg.Logger_getLogLevel_Params) error {
return nil
})
defer freelogLevelResp()
logLevelResults, err := logLevelResponse.Struct()
if err != nil {
panic(err)
}
level, err := logLevelResults.Level()
if err != nil {
panic(err)
}
fmt.Printf("log level is %d", logLevelReverseAdapter[level.Level()])
}
You can generate the logger schema with mkdir pkg && capnp compile -I $(go env GOPATH)/src/capnproto.org/go/capnp/std -ogo:pkg *.capnp
- then run go get -v ./...
to get all the necessary packages. The code below is all of the same code that's referenced above, just compiled into a single file for easy review.
package main
import (
"context"
"fmt"
"io"
"log"
"net"
"os"
"capnproto.org/go/capnp/v3/rpc"
"capnproto.org/go/capnp/v3/server"
"logger/pkg"
)
func main() {
ctx := context.Background()
// minic the network with a net pipe
// what you call it doesn't matter, they're just named this
// way so it's easier to understand
serverSideConn, clientSideConn := net.Pipe()
// "start the server"
go serveLogger(ctx, serverSideConn)
// now we build a client with the defaults
clientConn := rpc.NewConn(rpc.NewStreamTransport(clientSideConn), nil)
defer clientConn.Close()
// now we generate the "client"
// it's just called "logger" to make it a bit easier to understand
logger := pkg.Logger{Client: clientConn.Bootstrap(ctx)}
// send an info message!
// we have to set the params in the delegate because of how capnp
// might wait to send the message, so this allows it to allocate memory
// more effectively.
_, freeInfoMsg := logger.Info(ctx, func(params pkg.Logger_info_Params) error {
err := params.SetMsg("info message!")
if err != nil {
return err
}
return nil
})
defer freeInfoMsg()
logLevelResponse, freelogLevelResp := logger.GetLogLevel(ctx, func(params pkg.Logger_getLogLevel_Params) error {
return nil
})
defer freelogLevelResp()
logLevelResults, err := logLevelResponse.Struct()
if err != nil {
panic(err)
}
level, err := logLevelResults.Level()
if err != nil {
panic(err)
}
fmt.Printf("log level is %d", logLevelReverseAdapter[level.Level()])
}
var logLevelReverseAdapter = map[pkg.Level_LogLevel]LogLevel{
pkg.Level_LogLevel_debug: LogLevel(0),
pkg.Level_LogLevel_info: LogLevel(1),
pkg.Level_LogLevel_error: LogLevel(2),
}
// ILogger is the internal golang implementation
type ILogger interface {
Debug(msg string)
Info(msg string)
Error(msg string)
GetLogLevel() LogLevel
}
type LogLevel uint16
var _ ILogger = (*internalLogger)(nil)
type internalLogger struct {
logger *log.Logger
}
func (il *internalLogger) Debug(msg string) {
il.logger.Print(msg)
}
func (il *internalLogger) Info(msg string) {
il.logger.Print(msg)
}
func (il *internalLogger) Error(msg string) {
il.logger.Fatal(msg)
}
func (il *internalLogger) GetLogLevel() LogLevel {
return LogLevel(0)
}
var logLevelAdapter = map[LogLevel]pkg.Level_LogLevel{
0: pkg.Level_LogLevel_debug,
1: pkg.Level_LogLevel_info,
2: pkg.Level_LogLevel_error,
}
var _ pkg.Logger_Server = (*loggerAdapterServer)(nil)
type loggerAdapterServer struct {
*internalLogger
}
func (las *loggerAdapterServer) Debug(_ context.Context, call pkg.Logger_debug) error {
msgPayload, err := call.Args().Msg()
if err != nil {
return err
}
las.internalLogger.Debug(msgPayload)
return nil
}
func (las *loggerAdapterServer) Info(_ context.Context, call pkg.Logger_info) error {
msgPayload, err := call.Args().Msg()
if err != nil {
return err
}
las.internalLogger.Info(msgPayload)
return nil
}
func (las *loggerAdapterServer) Error(_ context.Context, call pkg.Logger_error) error {
msgPayload, err := call.Args().Msg()
if err != nil {
return err
}
las.internalLogger.Error(msgPayload)
return nil
}
func (las *loggerAdapterServer) GetLogLevel(_ context.Context, call pkg.Logger_getLogLevel) error {
results, err := call.AllocResults()
if err != nil {
return err
}
level := logLevelAdapter[las.internalLogger.GetLogLevel()]
levelMsg, err := results.NewLevel()
if err != nil {
return err
}
levelMsg.SetLevel(level)
return nil
}
func serveLogger(ctx context.Context, rwc io.ReadWriteCloser) error {
// Create a new locally implemented logger.
main := pkg.Logger_ServerToClient(&loggerAdapterServer{
internalLogger: &internalLogger{logger: log.New(os.Stdout, "", log.LstdFlags)},
})
// Listen for calls, using the logger as the bootstrap interface.
conn := rpc.NewConn(rpc.NewStreamTransport(rwc), &rpc.Options{
BootstrapClient: main.Client,
})
defer conn.Close()
// Wait for connection to abort.
select {
case <-conn.Done():
return nil
case <-ctx.Done():
return conn.Close()
}
}
For the sake of simplicity, this example uses an in-memory pipe, but you can use TCP connections, Unix pipes, or any other type that implements io.ReadWriteCloser
.
The return type for a client call is a promise, not an immediate value.
It isn't until the Struct()
method is called on a method that the client
function blocks on the remote side.
This relies on a feature of the Cap'n Proto RPC protocol called promise pipelining.
The upshot is that this function only requires one network round trip to receive its results, even though there were multiple chained calls.
This is one of Cap'n Proto's key advantages.
For more details on writing schemas, see the Cap'n Proto language reference. The capnp package docs detail the encoding and client API, whereas the rpc package docs detail how to initiate an RPC connection.