Skip to content
Snippets Groups Projects
Commit d3b24fed authored by ale's avatar ale
Browse files

Add GetAllScores RPC method

parent c3efa68d
Branches
No related tags found
No related merge requests found
......@@ -427,6 +427,61 @@ func (x *GetScoreResponse) GetScore() float32 {
return 0
}
type GetAllScoresRequest struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Horizon int64 `protobuf:"varint,1,opt,name=horizon,proto3" json:"horizon,omitempty"`
Threshold float32 `protobuf:"fixed32,2,opt,name=threshold,proto3" json:"threshold,omitempty"`
}
func (x *GetAllScoresRequest) Reset() {
*x = GetAllScoresRequest{}
if protoimpl.UnsafeEnabled {
mi := &file_iprep_proto_msgTypes[7]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *GetAllScoresRequest) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*GetAllScoresRequest) ProtoMessage() {}
func (x *GetAllScoresRequest) ProtoReflect() protoreflect.Message {
mi := &file_iprep_proto_msgTypes[7]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use GetAllScoresRequest.ProtoReflect.Descriptor instead.
func (*GetAllScoresRequest) Descriptor() ([]byte, []int) {
return file_iprep_proto_rawDescGZIP(), []int{7}
}
func (x *GetAllScoresRequest) GetHorizon() int64 {
if x != nil {
return x.Horizon
}
return 0
}
func (x *GetAllScoresRequest) GetThreshold() float32 {
if x != nil {
return x.Threshold
}
return 0
}
var File_iprep_proto protoreflect.FileDescriptor
var file_iprep_proto_rawDesc = []byte{
......@@ -467,19 +522,28 @@ var file_iprep_proto_rawDesc = []byte{
0x6e, 0x22, 0x38, 0x0a, 0x10, 0x47, 0x65, 0x74, 0x53, 0x63, 0x6f, 0x72, 0x65, 0x52, 0x65, 0x73,
0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x70, 0x18, 0x01, 0x20, 0x01, 0x28,
0x09, 0x52, 0x02, 0x69, 0x70, 0x12, 0x14, 0x0a, 0x05, 0x73, 0x63, 0x6f, 0x72, 0x65, 0x18, 0x02,
0x20, 0x01, 0x28, 0x02, 0x52, 0x05, 0x73, 0x63, 0x6f, 0x72, 0x65, 0x32, 0x80, 0x01, 0x0a, 0x05,
0x49, 0x70, 0x52, 0x65, 0x70, 0x12, 0x38, 0x0a, 0x06, 0x53, 0x75, 0x62, 0x6d, 0x69, 0x74, 0x12,
0x14, 0x2e, 0x69, 0x70, 0x72, 0x65, 0x70, 0x2e, 0x53, 0x75, 0x62, 0x6d, 0x69, 0x74, 0x52, 0x65,
0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70,
0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12,
0x3d, 0x0a, 0x08, 0x47, 0x65, 0x74, 0x53, 0x63, 0x6f, 0x72, 0x65, 0x12, 0x16, 0x2e, 0x69, 0x70,
0x72, 0x65, 0x70, 0x2e, 0x47, 0x65, 0x74, 0x53, 0x63, 0x6f, 0x72, 0x65, 0x52, 0x65, 0x71, 0x75,
0x65, 0x73, 0x74, 0x1a, 0x17, 0x2e, 0x69, 0x70, 0x72, 0x65, 0x70, 0x2e, 0x47, 0x65, 0x74, 0x53,
0x63, 0x6f, 0x72, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x42, 0x29,
0x5a, 0x27, 0x67, 0x69, 0x74, 0x2e, 0x61, 0x75, 0x74, 0x69, 0x73, 0x74, 0x69, 0x63, 0x69, 0x2e,
0x6f, 0x72, 0x67, 0x2f, 0x61, 0x69, 0x33, 0x2f, 0x74, 0x6f, 0x6f, 0x6c, 0x73, 0x2f, 0x69, 0x70,
0x72, 0x65, 0x70, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f,
0x33,
0x20, 0x01, 0x28, 0x02, 0x52, 0x05, 0x73, 0x63, 0x6f, 0x72, 0x65, 0x22, 0x4d, 0x0a, 0x13, 0x47,
0x65, 0x74, 0x41, 0x6c, 0x6c, 0x53, 0x63, 0x6f, 0x72, 0x65, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65,
0x73, 0x74, 0x12, 0x18, 0x0a, 0x07, 0x68, 0x6f, 0x72, 0x69, 0x7a, 0x6f, 0x6e, 0x18, 0x01, 0x20,
0x01, 0x28, 0x03, 0x52, 0x07, 0x68, 0x6f, 0x72, 0x69, 0x7a, 0x6f, 0x6e, 0x12, 0x1c, 0x0a, 0x09,
0x74, 0x68, 0x72, 0x65, 0x73, 0x68, 0x6f, 0x6c, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x02, 0x52,
0x09, 0x74, 0x68, 0x72, 0x65, 0x73, 0x68, 0x6f, 0x6c, 0x64, 0x32, 0xc9, 0x01, 0x0a, 0x05, 0x49,
0x70, 0x52, 0x65, 0x70, 0x12, 0x38, 0x0a, 0x06, 0x53, 0x75, 0x62, 0x6d, 0x69, 0x74, 0x12, 0x14,
0x2e, 0x69, 0x70, 0x72, 0x65, 0x70, 0x2e, 0x53, 0x75, 0x62, 0x6d, 0x69, 0x74, 0x52, 0x65, 0x71,
0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72,
0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x3d,
0x0a, 0x08, 0x47, 0x65, 0x74, 0x53, 0x63, 0x6f, 0x72, 0x65, 0x12, 0x16, 0x2e, 0x69, 0x70, 0x72,
0x65, 0x70, 0x2e, 0x47, 0x65, 0x74, 0x53, 0x63, 0x6f, 0x72, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65,
0x73, 0x74, 0x1a, 0x17, 0x2e, 0x69, 0x70, 0x72, 0x65, 0x70, 0x2e, 0x47, 0x65, 0x74, 0x53, 0x63,
0x6f, 0x72, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x47, 0x0a,
0x0c, 0x47, 0x65, 0x74, 0x41, 0x6c, 0x6c, 0x53, 0x63, 0x6f, 0x72, 0x65, 0x73, 0x12, 0x1a, 0x2e,
0x69, 0x70, 0x72, 0x65, 0x70, 0x2e, 0x47, 0x65, 0x74, 0x41, 0x6c, 0x6c, 0x53, 0x63, 0x6f, 0x72,
0x65, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x17, 0x2e, 0x69, 0x70, 0x72, 0x65,
0x70, 0x2e, 0x47, 0x65, 0x74, 0x53, 0x63, 0x6f, 0x72, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e,
0x73, 0x65, 0x22, 0x00, 0x30, 0x01, 0x42, 0x29, 0x5a, 0x27, 0x67, 0x69, 0x74, 0x2e, 0x61, 0x75,
0x74, 0x69, 0x73, 0x74, 0x69, 0x63, 0x69, 0x2e, 0x6f, 0x72, 0x67, 0x2f, 0x61, 0x69, 0x33, 0x2f,
0x74, 0x6f, 0x6f, 0x6c, 0x73, 0x2f, 0x69, 0x70, 0x72, 0x65, 0x70, 0x2f, 0x70, 0x72, 0x6f, 0x74,
0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
......@@ -494,16 +558,17 @@ func file_iprep_proto_rawDescGZIP() []byte {
return file_iprep_proto_rawDescData
}
var file_iprep_proto_msgTypes = make([]protoimpl.MessageInfo, 7)
var file_iprep_proto_msgTypes = make([]protoimpl.MessageInfo, 8)
var file_iprep_proto_goTypes = []interface{}{
(*Event)(nil), // 0: iprep.Event
(*AggregateIPEntry)(nil), // 1: iprep.AggregateIPEntry
(*AggregateTypeEntry)(nil), // 2: iprep.AggregateTypeEntry
(*Aggregate)(nil), // 3: iprep.Aggregate
(*SubmitRequest)(nil), // 4: iprep.SubmitRequest
(*GetScoreRequest)(nil), // 5: iprep.GetScoreRequest
(*GetScoreResponse)(nil), // 6: iprep.GetScoreResponse
(*empty.Empty)(nil), // 7: google.protobuf.Empty
(*Event)(nil), // 0: iprep.Event
(*AggregateIPEntry)(nil), // 1: iprep.AggregateIPEntry
(*AggregateTypeEntry)(nil), // 2: iprep.AggregateTypeEntry
(*Aggregate)(nil), // 3: iprep.Aggregate
(*SubmitRequest)(nil), // 4: iprep.SubmitRequest
(*GetScoreRequest)(nil), // 5: iprep.GetScoreRequest
(*GetScoreResponse)(nil), // 6: iprep.GetScoreResponse
(*GetAllScoresRequest)(nil), // 7: iprep.GetAllScoresRequest
(*empty.Empty)(nil), // 8: google.protobuf.Empty
}
var file_iprep_proto_depIdxs = []int32{
1, // 0: iprep.AggregateTypeEntry.by_ip:type_name -> iprep.AggregateIPEntry
......@@ -512,10 +577,12 @@ var file_iprep_proto_depIdxs = []int32{
3, // 3: iprep.SubmitRequest.aggregates:type_name -> iprep.Aggregate
4, // 4: iprep.IpRep.Submit:input_type -> iprep.SubmitRequest
5, // 5: iprep.IpRep.GetScore:input_type -> iprep.GetScoreRequest
7, // 6: iprep.IpRep.Submit:output_type -> google.protobuf.Empty
6, // 7: iprep.IpRep.GetScore:output_type -> iprep.GetScoreResponse
6, // [6:8] is the sub-list for method output_type
4, // [4:6] is the sub-list for method input_type
7, // 6: iprep.IpRep.GetAllScores:input_type -> iprep.GetAllScoresRequest
8, // 7: iprep.IpRep.Submit:output_type -> google.protobuf.Empty
6, // 8: iprep.IpRep.GetScore:output_type -> iprep.GetScoreResponse
6, // 9: iprep.IpRep.GetAllScores:output_type -> iprep.GetScoreResponse
7, // [7:10] is the sub-list for method output_type
4, // [4:7] is the sub-list for method input_type
4, // [4:4] is the sub-list for extension type_name
4, // [4:4] is the sub-list for extension extendee
0, // [0:4] is the sub-list for field type_name
......@@ -611,6 +678,18 @@ func file_iprep_proto_init() {
return nil
}
}
file_iprep_proto_msgTypes[7].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*GetAllScoresRequest); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
}
type x struct{}
out := protoimpl.TypeBuilder{
......@@ -618,7 +697,7 @@ func file_iprep_proto_init() {
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_iprep_proto_rawDesc,
NumEnums: 0,
NumMessages: 7,
NumMessages: 8,
NumExtensions: 0,
NumServices: 1,
},
......
......@@ -44,7 +44,13 @@ message GetScoreResponse {
float score = 2;
}
message GetAllScoresRequest {
int64 horizon = 1;
float threshold = 2;
}
service IpRep {
rpc Submit(SubmitRequest) returns (google.protobuf.Empty) {}
rpc GetScore(GetScoreRequest) returns (GetScoreResponse) {}
rpc GetAllScores(GetAllScoresRequest) returns (stream GetScoreResponse) {}
}
......@@ -21,6 +21,7 @@ const _ = grpc.SupportPackageIsVersion7
type IpRepClient interface {
Submit(ctx context.Context, in *SubmitRequest, opts ...grpc.CallOption) (*empty.Empty, error)
GetScore(ctx context.Context, in *GetScoreRequest, opts ...grpc.CallOption) (*GetScoreResponse, error)
GetAllScores(ctx context.Context, in *GetAllScoresRequest, opts ...grpc.CallOption) (IpRep_GetAllScoresClient, error)
}
type ipRepClient struct {
......@@ -49,12 +50,45 @@ func (c *ipRepClient) GetScore(ctx context.Context, in *GetScoreRequest, opts ..
return out, nil
}
func (c *ipRepClient) GetAllScores(ctx context.Context, in *GetAllScoresRequest, opts ...grpc.CallOption) (IpRep_GetAllScoresClient, error) {
stream, err := c.cc.NewStream(ctx, &IpRep_ServiceDesc.Streams[0], "/iprep.IpRep/GetAllScores", opts...)
if err != nil {
return nil, err
}
x := &ipRepGetAllScoresClient{stream}
if err := x.ClientStream.SendMsg(in); err != nil {
return nil, err
}
if err := x.ClientStream.CloseSend(); err != nil {
return nil, err
}
return x, nil
}
type IpRep_GetAllScoresClient interface {
Recv() (*GetScoreResponse, error)
grpc.ClientStream
}
type ipRepGetAllScoresClient struct {
grpc.ClientStream
}
func (x *ipRepGetAllScoresClient) Recv() (*GetScoreResponse, error) {
m := new(GetScoreResponse)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
// IpRepServer is the server API for IpRep service.
// All implementations must embed UnimplementedIpRepServer
// for forward compatibility
type IpRepServer interface {
Submit(context.Context, *SubmitRequest) (*empty.Empty, error)
GetScore(context.Context, *GetScoreRequest) (*GetScoreResponse, error)
GetAllScores(*GetAllScoresRequest, IpRep_GetAllScoresServer) error
mustEmbedUnimplementedIpRepServer()
}
......@@ -68,6 +102,9 @@ func (UnimplementedIpRepServer) Submit(context.Context, *SubmitRequest) (*empty.
func (UnimplementedIpRepServer) GetScore(context.Context, *GetScoreRequest) (*GetScoreResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method GetScore not implemented")
}
func (UnimplementedIpRepServer) GetAllScores(*GetAllScoresRequest, IpRep_GetAllScoresServer) error {
return status.Errorf(codes.Unimplemented, "method GetAllScores not implemented")
}
func (UnimplementedIpRepServer) mustEmbedUnimplementedIpRepServer() {}
// UnsafeIpRepServer may be embedded to opt out of forward compatibility for this service.
......@@ -117,6 +154,27 @@ func _IpRep_GetScore_Handler(srv interface{}, ctx context.Context, dec func(inte
return interceptor(ctx, in, info, handler)
}
func _IpRep_GetAllScores_Handler(srv interface{}, stream grpc.ServerStream) error {
m := new(GetAllScoresRequest)
if err := stream.RecvMsg(m); err != nil {
return err
}
return srv.(IpRepServer).GetAllScores(m, &ipRepGetAllScoresServer{stream})
}
type IpRep_GetAllScoresServer interface {
Send(*GetScoreResponse) error
grpc.ServerStream
}
type ipRepGetAllScoresServer struct {
grpc.ServerStream
}
func (x *ipRepGetAllScoresServer) Send(m *GetScoreResponse) error {
return x.ServerStream.SendMsg(m)
}
// IpRep_ServiceDesc is the grpc.ServiceDesc for IpRep service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
......@@ -133,6 +191,12 @@ var IpRep_ServiceDesc = grpc.ServiceDesc{
Handler: _IpRep_GetScore_Handler,
},
},
Streams: []grpc.StreamDesc{},
Streams: []grpc.StreamDesc{
{
StreamName: "GetAllScores",
Handler: _IpRep_GetAllScores_Handler,
ServerStreams: true,
},
},
Metadata: "iprep.proto",
}
......@@ -108,6 +108,30 @@ func (s *Server) GetScore(ctx context.Context, req *ippb.GetScoreRequest) (*ippb
}, nil
}
func (s *Server) GetAllScores(req *ippb.GetAllScoresRequest, stream ippb.IpRep_GetAllScoresServer) error {
horizon := s.horizon
if req.Horizon > 0 {
horizon = time.Second * time.Duration(req.Horizon)
}
script := s.Script()
return s.db.ScanAll(time.Now().Add(-horizon), func(ip string, m map[string]int64) error {
score, err := script.RunIP(context.Background(), ip, m, horizon.Seconds(), s.ext)
if err != nil {
log.Printf("script error on ip %s: %v", ip, err)
return nil
}
if float32(score) < req.Threshold {
return nil
}
return stream.Send(&ippb.GetScoreResponse{
Ip: ip,
Score: float32(score),
})
})
}
func (s *Server) cleaner() {
t := time.NewTicker(6 * time.Hour)
defer t.Stop()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment