
介绍
grpc 流允许 protobuf 消息从客户端流式传输到服务器、从服务器流式传输到客户端,或者双向流式传输。 
这一强大的功能可用于构建实时应用程序,例如聊天应用程序、实时监控仪表板等。
在本文中,我们将探讨如何正确使用 grpc 流。
先决条件
- grpc基础知识
- go 编程语言的基础知识(示例代码是用 go 编写的,但这个概念也可以应用于其他语言)
- 代码示例可在 github 上获取
良好实践
让我们检查一下使用 grpc 流的良好实践:
使用一元请求进行一元请求
一个常见的错误是对一元请求使用流式传输。 
例如,考虑以下 grpc 服务定义:
service myservice {
  rpc getsomething (somethingrequest) returns (stream somethingresponse) {}
}
如果客户端只需要发送一个请求并接收一个响应,
您不需要使用流式传输。相反,我们可以按如下方式定义服务:
service myservice {
  rpc getsomething (somethingrequest) returns (somethingresponse) {}
}
通过对一元请求使用流式传输,我们增加了不必要的复杂性
到代码,这可能会使其更难理解和维护,而不是 
从使用流媒体中获得任何好处。
比较一元请求和流请求的 golang 代码示例:
一元请求:
type somethingunary struct {
    pb.unimplementedsomethingunaryserver
}
func (s *somethingunary) getsomething(ctx context.context, req *pb.somethingrequest) (*pb.somethingresponse, error) {
    return &pb.somethingresponse{
        message: "hello " + req.name,
    }, nil
}
func testsomethingunary(t *testing.t) {
    conn := newserver(t, func(s grpc.serviceregistrar) {
        pb.registersomethingunaryserver(s, &somethingunary{})
    })
    client := pb.newsomethingunaryclient(conn)
    response, err := client.getsomething(
        context.background(),
        &pb.somethingrequest{
            name: "test",
        },
    )
    if err != nil {
        t.fatalf("failed to get something: %v", err)
    }
    if response.message != "hello test" {
        t.errorf("unexpected response: %v", response.message)
    }
}
流式一元请求:
type somethingstream struct {
    pb.unimplementedsomethingstreamserver
}
func (s *somethingstream) getsomething(req *pb.somethingrequest, stream pb.somethingstream_getsomethingserver) error {
    if err := stream.send(&pb.somethingresponse{
        message: "hello " + req.name,
    }); err != nil {
        return err
    }
    return nil
}
func testsomethingstream(t *testing.t) {
    conn := newserver(t, func(s grpc.serviceregistrar) {
        pb.registersomethingstreamserver(s, &somethingstream{})
    })
    client := pb.newsomethingstreamclient(conn)
    stream, err := client.getsomething(
        context.background(),
        &pb.somethingrequest{
            name: "test",
        },
    )
    if err != nil {
        t.fatalf("failed to get something stream: %v", err)
    }
    response, err := stream.recv()
    if err != nil {
        t.fatalf("failed to receive response: %v", err)
    }
    if response.message != "hello test" {
        t.errorf("unexpected response: %v", response.message)
    }
}
我们可以看到,一元请求的代码更简单,更容易理解
比流请求的代码。
如果可以的话,一次发送多个文档
让我们比较一下这两个服务定义:
service bookstore {
  rpc listbooks(listbooksrequest) returns (stream book) {}
}
service bookstorebatch {
  rpc listbooks(listbooksrequest) returns (stream listbooksresponse) {}
}
message listbooksresponse {
  repeated book books = 1;
}
bookstore 一次流式传输一本书,而 bookstorebatch 同时流式传输多本书。
如果客户端需要列出所有书籍,使用bookstorebatch 效率更高
因为它减少了客户端和服务器之间的往返次数。
让我们看看 bookstore 和 bookstorebatch 的 golang 代码示例:
书店:
type bookstore struct {
    pb.unimplementedbookstoreserver
}
func (s *bookstore) listbooks(req *pb.listbooksrequest, stream pb.bookstore_listbooksserver) error {
    for _, b := range bookstoredata {
        if b.author == req.author {
            if err := stream.send(&pb.book{
                title:           b.title,
                author:          b.author,
                publicationyear: int32(b.publicationyear),
                genre:           b.genre,
            }); err != nil {
                return err
            }
        }
    }
    return nil
}
func testbookstore_listbooks(t *testing.t) {
    conn := newserver(t, func(s grpc.serviceregistrar) {
        pb.registerbookstoreserver(s, &bookstore{})
    })
    client := pb.newbookstoreclient(conn)
    stream, err := client.listbooks(
        context.background(),
        &pb.listbooksrequest{
            author: charlesdickens,
        },
    )
    if err != nil {
        t.fatalf("failed to list books: %v", err)
    }
    books := []*pb.book{}
    for {
        book, err := stream.recv()
        if err != nil {
            break
        }
        books = append(books, book)
    }
    if len(books) != charlesdickensbooks {
        t.errorf("unexpected number of books: %d", len(books))
    }
}
书店批次:
type bookstorebatch struct {
    pb.unimplementedbookstorebatchserver
}
func (s *bookstorebatch) listbooks(req *pb.listbooksrequest, stream pb.bookstorebatch_listbooksserver) error {
    const batchsize = 10
    books := make([]*pb.book, 0, batchsize)
    for _, b := range bookstoredata {
        if b.author == req.author {
            books = append(books, &pb.book{
                title:           b.title,
                author:          b.author,
                publicationyear: int32(b.publicationyear),
                genre:           b.genre,
            })
            if len(books) == batchsize {
                if err := stream.send(&pb.listbooksresponse{
                    books: books,
                }); err != nil {
                    return err
                }
                books = books[:0]
            }
        }
    }
    if len(books) > 0 {
        if err := stream.send(&pb.listbooksresponse{
            books: books,
        }); err != nil {
            return nil
        }
    }
    return nil
}
func testbookstorebatch_listbooks(t *testing.t) {
    conn := newserver(t, func(s grpc.serviceregistrar) {
        pb.registerbookstorebatchserver(s, &bookstorebatch{})
    })
    client := pb.newbookstorebatchclient(conn)
    stream, err := client.listbooks(
        context.background(),
        &pb.listbooksrequest{
            author: charlesdickens,
        },
    )
    if err != nil {
        t.fatalf("failed to list books: %v", err)
    }
    books := []*pb.book{}
    for {
        response, err := stream.recv()
        if err != nil {
            break
        }
        books = append(books, response.books...)
    }
    if len(books) != charlesdickensbooks {
        t.errorf("unexpected number of books: %d", len(books))
    }
}
从上面的代码中,需要明确哪一个更好。
让我们运行一个基准测试来看看差异:
书店基准:
func benchmarkbookstore_listbooks(b *testing.b) {
    conn := newserver(b, func(s grpc.serviceregistrar) {
        pb.registerbookstoreserver(s, &bookstore{})
    })
    client := pb.newbookstoreclient(conn)
    var benchinnerbooks []*pb.book
    b.resettimer()
    for i := 0; i < b.n; i++ {
        stream, err := client.listbooks(
            context.background(),
            &pb.listbooksrequest{
                author: charlesdickens,
            },
        )
        if err != nil {
            b.fatalf("failed to list books: %v", err)
        }
        books := []*pb.book{}
        for {
            book, err := stream.recv()
            if err != nil {
                break
            }
            books = append(books, book)
        }
        benchinnerbooks = books
    }
    benchbooks = benchinnerbooks
}
bookstorebatch 基准:
func benchmarkbookstorebatch_listbooks(b *testing.b) {
    conn := newserver(b, func(s grpc.serviceregistrar) {
        pb.registerbookstorebatchserver(s, &bookstorebatch{})
    })
    client := pb.newbookstorebatchclient(conn)
    var benchinnerbooks []*pb.book
    b.resettimer()
    for i := 0; i < b.n; i++ {
        stream, err := client.listbooks(
            context.background(),
            &pb.listbooksrequest{
                author: charlesdickens,
            },
        )
        if err != nil {
            b.fatalf("failed to list books: %v", err)
        }
        books := []*pb.book{}
        for {
            response, err := stream.recv()
            if err != nil {
                break
            }
            books = append(books, response.books...)
        }
        benchinnerbooks = books
    }
    benchbooks = benchinnerbooks
}
基准测试结果:
benchmarkbookstore_listbooks benchmarkbookstore_listbooks-12 732 1647454 ns/op 85974 b/op 1989 allocs/op benchmarkbookstorebatch_listbooks benchmarkbookstorebatch_listbooks-12 1202 937491 ns/op 61098 b/op 853 allocs/op
多么大的进步啊! bookstorebatch 比 bookstore 快 1.75 倍。
但是为什么 bookstorebatch 比 bookstore 快?
服务器每次向客户端发送消息流.send(),都需要
对消息进行编码并通过网络发送。通过发送多个文件
我们立即减少了服务器需要编码和发送的次数
消息,不仅提高了服务器的性能,还提高了
对于需要解码消息的客户端。
在上面的例子中,批量大小设置为10,但客户端可以根据网络情况和文档大小进行调整。
使用双向流来控制流量
书店示例返回所有书籍并完成流,但如果客户端
需要实时观察事件(例如传感器),使用双向
直播是正确的选择。
双向流有点棘手,因为客户端和服务器都
可以同时发送和接收消息。希望 golang 能让这一切变得简单
像这样处理并发。
如前所述,传感器是双向流的一个很好的例子。
监视功能允许客户端决定监视和请求哪些传感器
如果需要的话,当前值。
让我们看一下下面的protobuf定义:
service sensor {
  rpc watch(stream watchrequest) returns (stream watchresponse) {}
}
message watchrequest {
  oneof request {
    watchcreaterequest create_request = 1;
    watchcancelrequest cancel_request = 2;
    watchnowrequest now_request = 3;
  }
}
message watchcreaterequest {
  // sensor_id contains the sensor id to watch.
  string sensor_id = 1;
}
message watchcancelrequest {
  // sensor_id contains the sensor id to cancel.
  string sensor_id = 1;
}
message watchnowrequest {
  // sensor_id contains the sensor id to get the current value.
  string sensor_id = 1;
}
message watchresponse {
  // sensor_id contains the sensor id for the current response.
  string sensor_id = 1;
  // created is true if the watch was created successfully.
  bool created = 2;
  // canceleted is true if the watch was canceled successfully or if the creation failed.
  bool canceleted = 3;
  // error contains the error message if something went wrong.
  string error = 4;
  // timestamp contains the timestamp of the value.
  google.protobuf.timestamp timestamp = 5;
  // value contains the value of the sensor.
  int32 value = 6;
}
请求消息不仅仅是消息流,更是一条可以
包含不同类型的请求。 oneof 指令允许我们定义一个
只能包含指定类型之一的字段。
传感器的 golang 代码将被忽略,但您可以在这里找到它
serverstream 包装流和传感器数据,使其更易于使用。
type serverstream struct {
    s           *sensorservice         // service
    stream      pb.sensor_watchserver  // stream
    sendch      chan *pb.watchresponse // control channel
    sensorch    chan sensordata        // data channel
    sensorwatch map[string]int         // map of sensor id to watch id
}
如前所述,服务器可以同时发送和接收消息,一个
函数将处理传入的消息,另一个函数将处理
传出消息。
接收消息:
func (ss *serverstream) recvloop() error {
    defer ss.close()
    for {
        req, err := ss.stream.recv()
        if errors.is(err, io.eof) {
            return nil
        }
        if err != nil {
            return err
        }
        switch req := req.request.(type) {
        case *pb.watchrequest_createrequest:
            // ignore validation (check the full code)
            // create a channel to send data to the client
            id := sensor.watch(ss.sensorch)
            ss.sensorwatch[sensorid] = id
            // send created message
            ss.sendch <- &pb.watchresponse{
                sensorid: sensorid,
                created:  true,
            }
        case *pb.watchrequest_cancelrequest:
            // ignore validation (check the full code)
            // cancel the watch
            ss.s.sensors[sensorid].cancel(id)
            delete(ss.sensorwatch, sensorid)
            ss.sendch <- &pb.watchresponse{
                sensorid:   sensorid,
                canceleted: true,
            }
        case *pb.watchrequest_nowrequest:
            // ignore validation (check the full code)
            // send current value
            ss.sendch <- &pb.watchresponse{
                sensorid:  sensorid,
                timestamp: timestamppb.now(),
                value:     int32(sensor.read()),
            }
        }
    }
}
switch语句用于处理不同类型的请求并做出决定
如何处理每个请求。只保留recvloop 函数很重要
读取消息但不向客户端发送消息,因此我们有 sendloop
将从控制通道读取消息并将其发送到客户端。
发送消息:
func (ss *serverstream) sendloop() {
    for {
        select {
        case m, ok := <-ss.sendch:
            if !ok {
                return
            }
            // send message
            if err := ss.stream.send(m); err != nil {
                return
            }
        case data, ok := <-ss.sensorch:
            if !ok {
                return
            }
            // send data
            if err := ss.stream.send(&pb.watchresponse{
                sensorid:  data.id,
                timestamp: timestamppb.new(data.time),
                value:     int32(data.val),
            }); err != nil {
                return
            }
        case <-ss.stream.context().done():
            return
        }
    }
}
sendloop函数读取控制通道和数据通道并发送
发送给客户端的消息。如果流关闭,该函数将返回。
最后,传感器服务的快乐路径测试:
func TestSensor(t *testing.T) {
    conn := newServer(t, func(s grpc.ServiceRegistrar) {
        pb.RegisterSensorServer(s, &sensorService{
            sensors: newSensors(),
        })
    })
    client := pb.NewSensorClient(conn)
    stream, err := client.Watch(context.Background())
    if err != nil {
        t.Fatalf("failed to watch: %v", err)
    }
    response := make(chan *pb.WatchResponse)
    // Go routine to read from the stream
    go func() {
        defer close(response)
        for {
            resp, err := stream.Recv()
            if errors.Is(err, io.EOF) {
                return
            }
            if err != nil {
                return
            }
            response <- resp
        }
    }()
    createRequest(t, stream, "temp")
    waitUntilCreated(t, response, "temp")
    waitForSensorData(t, response, "temp")
    createRequest(t, stream, "pres")
    waitUntilCreated(t, response, "pres")
    waitForSensorData(t, response, "pres")
    waitForSensorData(t, response, "temp")
    waitForSensorData(t, response, "pres")
    // invalid sensor
    createRequest(t, stream, "invalid")
    waitUntilCanceled(t, response, "invalid")
    nowRequest(t, stream, "light")
    waitForSensorData(t, response, "light")
    // Wait for 2 seconds to make sure we don't receive any data for light
    waitForNoSensorData(t, response, "light", 2*time.Second)
    cancelRequest(t, stream, "temp")
    waitUntilCanceled(t, response, "temp")
    waitForSensorData(t, response, "pres")
    // Wait for 2 seconds to make sure we don't receive any data for temp
    waitForNoSensorData(t, response, "temp", 2*time.Second)
    err = stream.CloseSend()
    if err != nil {
        t.Fatalf("failed to close send: %v", err)
    }
}
从上面的测试中我们可以看到客户端可以创建、取消、获取当前
传感器的值。客户端还可以同时观看多个传感器。
挑战自己
- 使用 grpc 流实现聊天应用程序。
- 修改传感器服务以一次发送多个值以节省往返次数。
- 嗅探网络流量以查看一元请求和流式请求之间的区别。
结论
grpc 流是一种用于构建实时应用程序的多功能且强大的工具。 
通过遵循最佳实践,例如仅在必要时使用流式传输、有效地批处理数据以及明智地利用双向流式传输,开发人员可以最大限度地提高性能
并保持代码简单性。 
虽然 grpc 流式传输带来了复杂性,但其好处远远超过了挑战
当深思熟虑地应用时。
保持联系
如果您有任何问题或反馈,请随时在 linkedin 上与我联系。
以上就是gRPC 流:最佳实践和性能见解的详细内容,更多请关注php中文网其它相关文章!
 www.rfbc.cn
     www.rfbc.cn       
     
     
     
         
         
             
             
             
             
                 
                 
                 
                 
                 
  
  