掌握了gRPC的基本原理后,我们可以借助它来实现日志的微服务功能。在构建高并发系统时,内部的服务组件通常使用gRPC来实现高效数据传输,因此我们把前面使用json完成的日志服务改成用gRPC来完成。
第一步还是要定义proto文件,修改proglog/api/v1下面的log.proto文件:
syntax = "proto3";
package log.v1;
option go_package = "api/log_v1";
service Log {
rpc Produce(ProduceRequest) returns (ProduceResponse) {}
rpc Consume(ConsumeRequest) returns (ConsumeResponse) {}
rpc ConsumeStream(ConsumeRequest) returns (stream ConsumeResponse){}
rpc ProduceStream(stream ProduceRequest) returns (stream ProduceResponse) {}
}
message Record {
bytes value = 1;
uint64 offset = 2;
}
message ProduceRequest {
Record record = 1;
}
message ProduceResponse {
uint64 offset = 1;
}
message ConsumeRequest {
uint64 offset = 1;
}
message ConsumeResponse {
Record record = 2;
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
- 17.
- 18.
- 19.
- 20.
- 21.
- 22.
- 23.
- 24.
- 25.
- 26.
- 27.
- 28.
- 29.
- 30.
- 31.
- 32.
代码的逻辑跟前面几节我们尝试使用gRPC时的proto文件定义逻辑没什么不同,Produce接口是客户端向服务端提交一条日志信息,Consume是客户端向服务端提交日志编号,然后服务端返回日志信息,ConsumeStream是客户端向服务端提交一连串的日志编号,然后服务端返回一连串的日志信息,ProduceStream是客户端向服务端提交一连串的日志信息,然后服务端返回日志添加后对应的编号。完成上面proto文件定义后,将它编译出对应的pb.go文件,这些文件会放置在api/v1/api_log_v1目录下,然后我们看看服务端的逻辑设计。
在internal/server下新建server.go文件,首先我们添加依赖模块,同时生成gRPC服务器对象,并注册我们要实现的接口:
package server
import (
"context"
api "api/v1/api/log_v1"
"google.golang.org/grpc"
)
type commitLog interface {
Append(*api.Record) (uint64, error)
Read(uint64) (*api.Record, error)
}
type Config struct { //实现依赖注入
CommitLog commitLog
}
var _ api.LogServer = (*grpcServer)(nil) //gRPC服务器对象
func NewGRPCServer(config *Config) (*grpc.Server, error) {
gsrv := grpc.NewServer()
srv, err := newgrpcServer(config)
if err != nil {
return nil, err
}
api.RegisterLogServer(gsrv, srv)
return gsrv, nil
}
type grpcServer struct {
api.UnimplementedLogServer
*Config
}
func newgrpcServer(config *Config)(srv *grpcServer, err error) {
srv = &grpcServer { //grpcServer会实现proto文件里面定义的接口
Config: config,
}
return srv, nil
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
- 17.
- 18.
- 19.
- 20.
- 21.
- 22.
- 23.
- 24.
- 25.
- 26.
- 27.
- 28.
- 29.
- 30.
- 31.
- 32.
- 33.
- 34.
- 35.
- 36.
- 37.
- 38.
- 39.
- 40.
- 41.
- 42.
在上面代码中有一点需要注意,那就是它使用了常用的设计模式叫依赖注入,我们的服务需要使用到日志模块提供的功能,但是我们这里只需要知道日志模块提供的接口,也就是Append和Read,我们不需要关心它的具体实现,这样我们就能实现逻辑上的解耦合,在启动我们的服务程序时,只需要调用者将实现了commitLog接口的实例传给我们即可,至于接口的实现细节我们不需要关心,通过依赖注入这种设计模式能够使得系统设计的复杂度降低,灵活性提升。
接下来就是对四个服务接口的实现,其逻辑跟我们前两节做的没有什么区别:
func (s *grpcServer) Produce(ctx context.Context,
req *api.ProduceRequest) (*api.ProduceResponse, error){
//收到客户端发来的日志添加请求,然后调用日志模块Append接口进行添加
offset, err := s.CommitLog.Append(req.Record)
if err != nil {
return nil, err
}
//添加完成后返回日志编号
return &api.ProduceResponse{Offset: offset}, nil
}
func (s *grpcServer) Consume(ctx context.Context,
req *api.ConsumeRequest)(*api.ConsumeResponse, error) {
//收到客户端发来的日志编号,返回日志内容
record, err := s.CommitLog.Read(req.Offset)
if err != nil {
return nil, err
}
return &api.ConsumeResponse{Record: record}, nil
}
func (s *grpcServer) ProduceStream (stream api.Log_ProduceStreamServer) error {
for {
//客户端发来一系列日志数据,服务端通过Recv()依次收取,然后将日志进行添加
req, err := stream.Recv()
if err != nil {
return err
}
res, err := s.Produce(stream.Context(), req)
if err != nil {
return err
}
if err = stream.Send(res); err != nil {
return err
}
}
}
func (s *grpcServer) ConsumeStream(req *api.ConsumeRequest, stream api.Log_ConsumeStreamServer) error {
for {
//客户端发来一系列日志编号,服务端返回一系列与编号对应的日志内容
select {
case <-stream.Context().Done():
//进入这里表明客户端终端了连接
return nil
default:
res, err := s.Consume(stream.Context(), req)
switch err.(type){
case nil:
case api.ErrorOffsetOutOfRange:
continue
default:
return err
}
//将获得的日志信息发送给客户端
if err = stream.Send(res); err != nil {
return err
}
req.Offset++
}
}
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
- 17.
- 18.
- 19.
- 20.
- 21.
- 22.
- 23.
- 24.
- 25.
- 26.
- 27.
- 28.
- 29.
- 30.
- 31.
- 32.
- 33.
- 34.
- 35.
- 36.
- 37.
- 38.
- 39.
- 40.
- 41.
- 42.
- 43.
- 44.
- 45.
- 46.
- 47.
- 48.
- 49.
- 50.
- 51.
- 52.
- 53.
- 54.
- 55.
- 56.
- 57.
- 58.
- 59.
- 60.
- 61.
- 62.
- 63.
- 64.
上面代码的实现逻辑与我们前面描述的一模一样,因此没有多少可以探究的地方,最后我们测试一下上面代码的实现,新建server_test.go,添加内容如下:
package server
import (
"context"
"io/ioutil"
"net"
"testing"
"github.com/stretchr/testify/require"
api "api/v1/api/log_v1"
"internal/log"
"google.golang.org/grpc"
)
func TestServer(t *testing.T) {
for scenario, fn := range map[string]func(
t *testing.T,
client api.LogClient,
config *Config,
) {
"produce/consume a meesage to/from the log success": testProduceConsume ,
"produce/consume stream success": testProduceConsumeStream,
"consume past log boundary fails: ": testConsumePastBoundary,
} {
t.Run(scenario, func(t *testing.T) {
//在运行测试用例前先创建服务端对象
client, config, teardown := setupTest(t, nil)
defer teardown() //关闭服务端
fn(t, client, config)
})
}
}
func setupTest(t *testing.T, fn func(*Config)) (client api.LogClient, cfg*Config, teardown func()) {
t.Helper()
//生成tcp连接,使用0意味着使用随机端口
l, err := net.Listen("tcp", ":0")
require.NoError(t, err)
clientOptions := []grpc.DialOption{grpc.WithInsecure()}
cc, err := grpc.Dial(l.Addr().String() , clientOptions...)
require.NoError(t, err)
dir, err := ioutil.TempDir("", "server-test")
require.NoError(t, err)
clog, err := log.NewLog(dir, log.Config{})
require.NoError(t, err)
cfg = &Config{
CommitLog: clog,
}
if fn != nil {
fn(cfg)
}
//创建服务端对象
server, err := NewGRPCServer(cfg)
require.NoError(t ,err)
go func() {
//启动服务端
server.Serve(l)
}()
//创建客户端对象
client = api.NewLogClient(cc)
return client, cfg, func() {
server.Stop()
cc.Close()
l.Close()
clog.Remove()
}
}
func testProduceConsume(t *testing.T, client api.LogClient, config*Config) {
ctx := context.Background()
want := &api.Record{
Value: []byte("hello world"),
}
//客户端提交一条日志,然后拿到日志编号后再用于请求日志内容,检验服务端返回的日志内容与提交的是否一致
produce, err := client.Produce(ctx, &api.ProduceRequest{
Record: want,
})
require.NoError(t, err)
consume, err := client.Consume(ctx, &api.ConsumeRequest{
Offset: produce.Offset,
})
require.NoError(t, err)
require.Equal(t, want.Value, consume.Record.Value)
require.Equal(t, want.Offset, consume.Record.Offset)
}
func testConsumePastBoundary(t *testing.T, client api.LogClient, config *Config) {
ctx := context.Background()
produce, err := client.Produce(ctx, &api.ProduceRequest{
Record: &api.Record {
Value: []byte("hello world"),
},
})
//使用不存在的日志编号进行请求,服务端应该返回相应错误
require.NoError(t, err)
consume, err := client.Consume(ctx, &api.ConsumeRequest{
Offset: produce.Offset + 1,
})
if consume != nil {
t.Fatal("consume not nil")
}
got := grpc.Code(err)
want := grpc.Code(api.ErrorOffsetOutOfRange{}.GRPCStatus().Err())
if got != want {
t.Fatalf("got err: %v, want %v", got, want)
}
}
func testProduceConsumeStream(t *testing.T, client api.LogClient, config *Config) {
ctx := context.Background()
records := []*api.Record{{
Value: []byte("first message"),
Offset: 0,
},
{
Value: []byte("second message"),
Offset: 0,
},
}
//客户端向服务端提交多个日志,获得多个日志编号,然后再提交获得的编号,从而让服务端返回一系列日志数据
//接着比对服务端返回的日志内容和服务端是否一致
{
stream, err := client.ProduceStream(ctx)
require.NoError(t, err)
for offset, record := range records {
err = stream.Send(&api.ProduceRequest{
Record: record,
})
require.NoError(t, err)
res, err := stream.Recv()
require.NoError(t, err)
if res.Offset != uint64(offset) {
t.Fatalf("got offset: %d, want: %d", res.Offset, offset,)
}
}
}
{
stream, err := client.ConsumeStream(ctx, &api.ConsumeRequest{Offset: 0},)
require.NoError(t, err)
for i, record := range records{
res, err := stream.Recv()
require.NoError(t, err)
require.Equal(t, res.Record, &api.Record{
Value: record.Value,
Offset: uint64(i),
})
}
}
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
- 17.
- 18.
- 19.
- 20.
- 21.
- 22.
- 23.
- 24.
- 25.
- 26.
- 27.
- 28.
- 29.
- 30.
- 31.
- 32.
- 33.
- 34.
- 35.
- 36.
- 37.
- 38.
- 39.
- 40.
- 41.
- 42.
- 43.
- 44.
- 45.
- 46.
- 47.
- 48.
- 49.
- 50.
- 51.
- 52.
- 53.
- 54.
- 55.
- 56.
- 57.
- 58.
- 59.
- 60.
- 61.
- 62.
- 63.
- 64.
- 65.
- 66.
- 67.
- 68.
- 69.
- 70.
- 71.
- 72.
- 73.
- 74.
- 75.
- 76.
- 77.
- 78.
- 79.
- 80.
- 81.
- 82.
- 83.
- 84.
- 85.
- 86.
- 87.
- 88.
- 89.
- 90.
- 91.
- 92.
- 93.
- 94.
- 95.
- 96.
- 97.
- 98.
- 99.
- 100.
- 101.
- 102.
- 103.
- 104.
- 105.
- 106.
- 107.
- 108.
- 109.
- 110.
- 111.
- 112.
- 113.
- 114.
- 115.
- 116.
- 117.
- 118.
- 119.
- 120.
- 121.
- 122.
- 123.
- 124.
- 125.
- 126.
- 127.
- 128.
- 129.
- 130.
- 131.
- 132.
- 133.
- 134.
- 135.
- 136.
- 137.
- 138.
- 139.
- 140.
- 141.
- 142.
- 143.
- 144.
- 145.
- 146.
- 147.
- 148.
- 149.
- 150.
- 151.
- 152.
- 153.
- 154.
- 155.
- 156.
- 157.
- 158.
- 159.
- 160.
测试代码的逻辑通过注释就能理解,在测试用例中,客户端的创建,数据的发送和接收跟我们前面描述的没什么区别,由此我们依靠gRPC框架就完成了日志服务,下一节我们看看gRPC框架提供的数据安全功能。