深圳幻海软件技术有限公司 欢迎您!

Go 实现分布式高可用后台:使用 gRPC 实现日志微服务

2023-02-28

掌握了gRPC的基本原理后,我们可以借助它来实现日志的微服务功能。在构建高并发系统时,内部的服务组件通常使用gRPC来实现高效数据传输,因此我们把前面使用json完成的日志服务改成用gRPC来完成。第一步还是要定义proto文件,修改proglog/api/v1下面的log.proto文件:复制sy

掌握了gRPC的基本原理后,我们可以借助它来实现日志的微服务功能。在构建高并发系统时,内部的服务组件通常使用gRPC来实现高效数据传输,因此我们把前面使用json完成的日志服务改成用gRPC来完成。

第一步还是要定义proto文件,修改proglog/api/v1下面的log.proto文件:

syntax = "proto3";
package log.v1;

option go_package = "api/log_v1";

service Log {
    rpc Produce(ProduceRequest) returns (ProduceResponse) {}
    rpc Consume(ConsumeRequest) returns (ConsumeResponse) {}
    rpc ConsumeStream(ConsumeRequest) returns (stream ConsumeResponse){}
    rpc ProduceStream(stream ProduceRequest) returns (stream ProduceResponse) {}
}

message Record {
    bytes value = 1;
    uint64 offset = 2;
}

message ProduceRequest {
    Record record = 1;
}

message ProduceResponse {
    uint64 offset = 1;
}

message ConsumeRequest {
    uint64 offset = 1;
}

message ConsumeResponse {
    Record record = 2;
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.

代码的逻辑跟前面几节我们尝试使用gRPC时的proto文件定义逻辑没什么不同,Produce接口是客户端向服务端提交一条日志信息,Consume是客户端向服务端提交日志编号,然后服务端返回日志信息,ConsumeStream是客户端向服务端提交一连串的日志编号,然后服务端返回一连串的日志信息,ProduceStream是客户端向服务端提交一连串的日志信息,然后服务端返回日志添加后对应的编号。完成上面proto文件定义后,将它编译出对应的pb.go文件,这些文件会放置在api/v1/api_log_v1目录下,然后我们看看服务端的逻辑设计。

在internal/server下新建server.go文件,首先我们添加依赖模块,同时生成gRPC服务器对象,并注册我们要实现的接口:

package server 

import (
    "context"
    api "api/v1/api/log_v1"
    "google.golang.org/grpc"
)

type commitLog interface {
    Append(*api.Record) (uint64, error)
    Read(uint64) (*api.Record, error)
}

type Config struct { //实现依赖注入
    CommitLog commitLog 
}

var _ api.LogServer = (*grpcServer)(nil) //gRPC服务器对象

func NewGRPCServer(config *Config) (*grpc.Server, error) {
    gsrv := grpc.NewServer()
    srv, err := newgrpcServer(config) 
    if err != nil {
        return nil, err 
    }

    api.RegisterLogServer(gsrv, srv)
    return gsrv, nil 
}

type grpcServer struct {
    api.UnimplementedLogServer 
    *Config 
}

func newgrpcServer(config *Config)(srv *grpcServer, err error) {
    srv = &grpcServer { //grpcServer会实现proto文件里面定义的接口
        Config: config,
    }

    return srv, nil 
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.
  • 37.
  • 38.
  • 39.
  • 40.
  • 41.
  • 42.

在上面代码中有一点需要注意,那就是它使用了常用的设计模式叫依赖注入,我们的服务需要使用到日志模块提供的功能,但是我们这里只需要知道日志模块提供的接口,也就是Append和Read,我们不需要关心它的具体实现,这样我们就能实现逻辑上的解耦合,在启动我们的服务程序时,只需要调用者将实现了commitLog接口的实例传给我们即可,至于接口的实现细节我们不需要关心,通过依赖注入这种设计模式能够使得系统设计的复杂度降低,灵活性提升。

接下来就是对四个服务接口的实现,其逻辑跟我们前两节做的没有什么区别:

func (s *grpcServer) Produce(ctx context.Context, 
    req *api.ProduceRequest) (*api.ProduceResponse, error){
    //收到客户端发来的日志添加请求,然后调用日志模块Append接口进行添加
    offset, err := s.CommitLog.Append(req.Record)
    if err != nil {
        return nil, err 
    }

    //添加完成后返回日志编号
    return &api.ProduceResponse{Offset: offset}, nil 
}

func (s *grpcServer) Consume(ctx context.Context, 
    req *api.ConsumeRequest)(*api.ConsumeResponse, error) {
        //收到客户端发来的日志编号,返回日志内容
        record, err := s.CommitLog.Read(req.Offset)
        if err != nil {
            return nil, err 
        }

        return &api.ConsumeResponse{Record: record}, nil 
}

func (s *grpcServer) ProduceStream (stream api.Log_ProduceStreamServer) error {
    for {
        //客户端发来一系列日志数据,服务端通过Recv()依次收取,然后将日志进行添加
        req, err := stream.Recv()
        if err != nil {
            return err 
        }
        res, err := s.Produce(stream.Context(), req)
        if err != nil {
            return err 
        }
        if err = stream.Send(res); err != nil {
            return err 
        }
    }
}

func (s *grpcServer) ConsumeStream(req *api.ConsumeRequest, stream api.Log_ConsumeStreamServer) error {
    for {
        //客户端发来一系列日志编号,服务端返回一系列与编号对应的日志内容
        select {
        case <-stream.Context().Done():
            //进入这里表明客户端终端了连接
            return nil 
        default:
            res, err := s.Consume(stream.Context(), req)
            switch err.(type){
            case nil:
            case api.ErrorOffsetOutOfRange:
                continue 
            default:
                return err 
            }
            //将获得的日志信息发送给客户端
            if err = stream.Send(res); err != nil {
                return err 
            }
            req.Offset++
        }
    }
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.
  • 37.
  • 38.
  • 39.
  • 40.
  • 41.
  • 42.
  • 43.
  • 44.
  • 45.
  • 46.
  • 47.
  • 48.
  • 49.
  • 50.
  • 51.
  • 52.
  • 53.
  • 54.
  • 55.
  • 56.
  • 57.
  • 58.
  • 59.
  • 60.
  • 61.
  • 62.
  • 63.
  • 64.

上面代码的实现逻辑与我们前面描述的一模一样,因此没有多少可以探究的地方,最后我们测试一下上面代码的实现,新建server_test.go,添加内容如下:

package server 

import (
    "context"
    "io/ioutil"
    "net"
    "testing"
    "github.com/stretchr/testify/require"
    api "api/v1/api/log_v1"
    "internal/log"
    "google.golang.org/grpc"
)

func TestServer(t *testing.T) {
    for scenario, fn := range map[string]func(
        t *testing.T,
        client api.LogClient,
        config *Config,
    ) {
        "produce/consume a meesage to/from the log success": testProduceConsume ,
        "produce/consume stream success": testProduceConsumeStream,
        "consume past log boundary fails: ": testConsumePastBoundary,
    } {
        t.Run(scenario, func(t *testing.T) {
            //在运行测试用例前先创建服务端对象
            client, config, teardown := setupTest(t, nil)
            defer teardown() //关闭服务端
            fn(t, client, config)
        })
    }
}

func setupTest(t *testing.T, fn func(*Config)) (client api.LogClient, cfg*Config, teardown func()) {
    t.Helper()
    //生成tcp连接,使用0意味着使用随机端口
    l, err := net.Listen("tcp", ":0")
    require.NoError(t, err)

    clientOptions := []grpc.DialOption{grpc.WithInsecure()}
    cc, err := grpc.Dial(l.Addr().String() , clientOptions...)
    require.NoError(t, err)

    dir, err := ioutil.TempDir("", "server-test")
    require.NoError(t, err)

    clog, err := log.NewLog(dir, log.Config{})
    require.NoError(t, err)

    cfg = &Config{
        CommitLog: clog,
    }
    if fn != nil {
        fn(cfg)
    }

    //创建服务端对象
    server, err := NewGRPCServer(cfg)
    require.NoError(t ,err)
    go func() {
        //启动服务端
        server.Serve(l)
    }()
    //创建客户端对象
    client = api.NewLogClient(cc)
    return client, cfg, func() {
        server.Stop()
        cc.Close()
        l.Close()
        clog.Remove()
    }
}

func testProduceConsume(t *testing.T, client api.LogClient, config*Config) {
    ctx := context.Background()
    want := &api.Record{
        Value: []byte("hello world"),
    }
    //客户端提交一条日志,然后拿到日志编号后再用于请求日志内容,检验服务端返回的日志内容与提交的是否一致
    produce, err := client.Produce(ctx, &api.ProduceRequest{
        Record: want,
    })

    require.NoError(t, err)
    consume, err := client.Consume(ctx, &api.ConsumeRequest{
        Offset: produce.Offset,
    })

    require.NoError(t, err)
    require.Equal(t, want.Value, consume.Record.Value)
    require.Equal(t, want.Offset, consume.Record.Offset)
}

func testConsumePastBoundary(t *testing.T, client api.LogClient, config *Config) {
    ctx := context.Background()
    produce, err := client.Produce(ctx, &api.ProduceRequest{
        Record: &api.Record {
            Value: []byte("hello world"),
        },
    })

    //使用不存在的日志编号进行请求,服务端应该返回相应错误
    require.NoError(t, err)
    consume, err := client.Consume(ctx, &api.ConsumeRequest{
        Offset: produce.Offset + 1,
    })

    if consume != nil {
        t.Fatal("consume not nil")
    }

    got := grpc.Code(err)
    want := grpc.Code(api.ErrorOffsetOutOfRange{}.GRPCStatus().Err())
    if got != want {
        t.Fatalf("got err: %v, want %v", got, want)
    }
}

func testProduceConsumeStream(t *testing.T, client api.LogClient, config *Config) {
    ctx := context.Background()
    records := []*api.Record{{
        Value: []byte("first message"),
        Offset: 0,
    },
    {
        Value: []byte("second message"),
        Offset: 0,
    },
    }
    //客户端向服务端提交多个日志,获得多个日志编号,然后再提交获得的编号,从而让服务端返回一系列日志数据
    //接着比对服务端返回的日志内容和服务端是否一致
    {
        stream, err := client.ProduceStream(ctx)
        require.NoError(t, err)

        for offset, record := range records {
            err = stream.Send(&api.ProduceRequest{
                Record: record,
            })
            require.NoError(t, err)
            res, err := stream.Recv()
            require.NoError(t, err)
            if res.Offset != uint64(offset) {
                t.Fatalf("got offset: %d, want: %d", res.Offset, offset,)
            }
        }
    }

    {
        stream, err := client.ConsumeStream(ctx, &api.ConsumeRequest{Offset: 0},)
        require.NoError(t, err)
        for i, record := range records{
            res, err := stream.Recv()
            require.NoError(t, err)
            require.Equal(t, res.Record, &api.Record{
                Value: record.Value,
                Offset: uint64(i),
            })
        }
    }
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.
  • 37.
  • 38.
  • 39.
  • 40.
  • 41.
  • 42.
  • 43.
  • 44.
  • 45.
  • 46.
  • 47.
  • 48.
  • 49.
  • 50.
  • 51.
  • 52.
  • 53.
  • 54.
  • 55.
  • 56.
  • 57.
  • 58.
  • 59.
  • 60.
  • 61.
  • 62.
  • 63.
  • 64.
  • 65.
  • 66.
  • 67.
  • 68.
  • 69.
  • 70.
  • 71.
  • 72.
  • 73.
  • 74.
  • 75.
  • 76.
  • 77.
  • 78.
  • 79.
  • 80.
  • 81.
  • 82.
  • 83.
  • 84.
  • 85.
  • 86.
  • 87.
  • 88.
  • 89.
  • 90.
  • 91.
  • 92.
  • 93.
  • 94.
  • 95.
  • 96.
  • 97.
  • 98.
  • 99.
  • 100.
  • 101.
  • 102.
  • 103.
  • 104.
  • 105.
  • 106.
  • 107.
  • 108.
  • 109.
  • 110.
  • 111.
  • 112.
  • 113.
  • 114.
  • 115.
  • 116.
  • 117.
  • 118.
  • 119.
  • 120.
  • 121.
  • 122.
  • 123.
  • 124.
  • 125.
  • 126.
  • 127.
  • 128.
  • 129.
  • 130.
  • 131.
  • 132.
  • 133.
  • 134.
  • 135.
  • 136.
  • 137.
  • 138.
  • 139.
  • 140.
  • 141.
  • 142.
  • 143.
  • 144.
  • 145.
  • 146.
  • 147.
  • 148.
  • 149.
  • 150.
  • 151.
  • 152.
  • 153.
  • 154.
  • 155.
  • 156.
  • 157.
  • 158.
  • 159.
  • 160.

测试代码的逻辑通过注释就能理解,在测试用例中,客户端的创建,数据的发送和接收跟我们前面描述的没什么区别,由此我们依靠gRPC框架就完成了日志服务,下一节我们看看gRPC框架提供的数据安全功能。