From 75e537d5547deb7e714dcfc04242d024d3949429 Mon Sep 17 00:00:00 2001 From: anjor Date: Fri, 8 Nov 2024 22:02:33 +0000 Subject: [PATCH 01/15] add stream transactions protobuf --- .../old-faithful-grpc/old-faithful.pb.go | 323 ++++++++++++++---- .../old-faithful-grpc/old-faithful_grpc.pb.go | 53 ++- old-faithful-proto/proto/old-faithful.proto | 17 +- 3 files changed, 313 insertions(+), 80 deletions(-) diff --git a/old-faithful-proto/old-faithful-grpc/old-faithful.pb.go b/old-faithful-proto/old-faithful-grpc/old-faithful.pb.go index cd4f53a4..ba476802 100644 --- a/old-faithful-proto/old-faithful-grpc/old-faithful.pb.go +++ b/old-faithful-proto/old-faithful-grpc/old-faithful.pb.go @@ -869,7 +869,7 @@ type StreamBlocksRequest struct { unknownFields protoimpl.UnknownFields StartSlot uint64 `protobuf:"varint,1,opt,name=start_slot,json=startSlot,proto3" json:"start_slot,omitempty"` - EndSlot *uint64 `protobuf:"varint,2,opt,name=end_slot,json=endSlot,proto3,oneof" json:"end_slot,omitempty"` // If not specified, continues indefinitely + EndSlot *uint64 `protobuf:"varint,2,opt,name=end_slot,json=endSlot,proto3,oneof" json:"end_slot,omitempty"` Filter *StreamBlocksFilter `protobuf:"bytes,3,opt,name=filter,proto3,oneof" json:"filter,omitempty"` } @@ -969,6 +969,144 @@ func (x *StreamBlocksFilter) GetAccountInclude() []string { return nil } +type StreamTransactionsRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + StartSlot uint64 `protobuf:"varint,1,opt,name=start_slot,json=startSlot,proto3" json:"start_slot,omitempty"` + EndSlot *uint64 `protobuf:"varint,2,opt,name=end_slot,json=endSlot,proto3,oneof" json:"end_slot,omitempty"` + Filter *StreamTransactionsFilter `protobuf:"bytes,3,opt,name=filter,proto3,oneof" json:"filter,omitempty"` +} + +func (x *StreamTransactionsRequest) Reset() { + *x = StreamTransactionsRequest{} + mi := &file_old_faithful_proto_msgTypes[14] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *StreamTransactionsRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*StreamTransactionsRequest) ProtoMessage() {} + +func (x *StreamTransactionsRequest) ProtoReflect() protoreflect.Message { + mi := &file_old_faithful_proto_msgTypes[14] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use StreamTransactionsRequest.ProtoReflect.Descriptor instead. +func (*StreamTransactionsRequest) Descriptor() ([]byte, []int) { + return file_old_faithful_proto_rawDescGZIP(), []int{14} +} + +func (x *StreamTransactionsRequest) GetStartSlot() uint64 { + if x != nil { + return x.StartSlot + } + return 0 +} + +func (x *StreamTransactionsRequest) GetEndSlot() uint64 { + if x != nil && x.EndSlot != nil { + return *x.EndSlot + } + return 0 +} + +func (x *StreamTransactionsRequest) GetFilter() *StreamTransactionsFilter { + if x != nil { + return x.Filter + } + return nil +} + +type StreamTransactionsFilter struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Vote *bool `protobuf:"varint,1,opt,name=vote,proto3,oneof" json:"vote,omitempty"` + Failed *bool `protobuf:"varint,2,opt,name=failed,proto3,oneof" json:"failed,omitempty"` + AccountInclude []string `protobuf:"bytes,3,rep,name=account_include,json=accountInclude,proto3" json:"account_include,omitempty"` + AccountExclude []string `protobuf:"bytes,4,rep,name=account_exclude,json=accountExclude,proto3" json:"account_exclude,omitempty"` + AccountRequired []string `protobuf:"bytes,5,rep,name=account_required,json=accountRequired,proto3" json:"account_required,omitempty"` +} + +func (x *StreamTransactionsFilter) Reset() { + *x = StreamTransactionsFilter{} + mi := &file_old_faithful_proto_msgTypes[15] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *StreamTransactionsFilter) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*StreamTransactionsFilter) ProtoMessage() {} + +func (x *StreamTransactionsFilter) ProtoReflect() protoreflect.Message { + mi := &file_old_faithful_proto_msgTypes[15] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use StreamTransactionsFilter.ProtoReflect.Descriptor instead. +func (*StreamTransactionsFilter) Descriptor() ([]byte, []int) { + return file_old_faithful_proto_rawDescGZIP(), []int{15} +} + +func (x *StreamTransactionsFilter) GetVote() bool { + if x != nil && x.Vote != nil { + return *x.Vote + } + return false +} + +func (x *StreamTransactionsFilter) GetFailed() bool { + if x != nil && x.Failed != nil { + return *x.Failed + } + return false +} + +func (x *StreamTransactionsFilter) GetAccountInclude() []string { + if x != nil { + return x.AccountInclude + } + return nil +} + +func (x *StreamTransactionsFilter) GetAccountExclude() []string { + if x != nil { + return x.AccountExclude + } + return nil +} + +func (x *StreamTransactionsFilter) GetAccountRequired() []string { + if x != nil { + return x.AccountRequired + } + return nil +} + var File_old_faithful_proto protoreflect.FileDescriptor var file_old_faithful_proto_rawDesc = []byte{ @@ -1089,45 +1227,77 @@ var file_old_faithful_proto_rawDesc = []byte{ 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x73, 0x46, 0x69, 0x6c, 0x74, 0x65, 0x72, 0x12, 0x27, 0x0a, 0x0f, 0x61, 0x63, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x5f, 0x69, 0x6e, 0x63, 0x6c, 0x75, 0x64, 0x65, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, 0x0e, 0x61, 0x63, 0x63, - 0x6f, 0x75, 0x6e, 0x74, 0x49, 0x6e, 0x63, 0x6c, 0x75, 0x64, 0x65, 0x2a, 0x33, 0x0a, 0x14, 0x47, - 0x65, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x45, 0x72, 0x72, 0x6f, 0x72, 0x43, - 0x6f, 0x64, 0x65, 0x12, 0x0c, 0x0a, 0x08, 0x49, 0x4e, 0x54, 0x45, 0x52, 0x4e, 0x41, 0x4c, 0x10, - 0x00, 0x12, 0x0d, 0x0a, 0x09, 0x4e, 0x4f, 0x54, 0x5f, 0x46, 0x4f, 0x55, 0x4e, 0x44, 0x10, 0x01, - 0x32, 0xcb, 0x03, 0x0a, 0x0b, 0x4f, 0x6c, 0x64, 0x46, 0x61, 0x69, 0x74, 0x68, 0x66, 0x75, 0x6c, - 0x12, 0x47, 0x0a, 0x0a, 0x47, 0x65, 0x74, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x1b, - 0x2e, 0x4f, 0x6c, 0x64, 0x46, 0x61, 0x69, 0x74, 0x68, 0x66, 0x75, 0x6c, 0x2e, 0x56, 0x65, 0x72, - 0x73, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1c, 0x2e, 0x4f, 0x6c, + 0x6f, 0x75, 0x6e, 0x74, 0x49, 0x6e, 0x63, 0x6c, 0x75, 0x64, 0x65, 0x22, 0xb6, 0x01, 0x0a, 0x19, + 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, + 0x6e, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1d, 0x0a, 0x0a, 0x73, 0x74, 0x61, + 0x72, 0x74, 0x5f, 0x73, 0x6c, 0x6f, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x09, 0x73, + 0x74, 0x61, 0x72, 0x74, 0x53, 0x6c, 0x6f, 0x74, 0x12, 0x1e, 0x0a, 0x08, 0x65, 0x6e, 0x64, 0x5f, + 0x73, 0x6c, 0x6f, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x04, 0x48, 0x00, 0x52, 0x07, 0x65, 0x6e, + 0x64, 0x53, 0x6c, 0x6f, 0x74, 0x88, 0x01, 0x01, 0x12, 0x42, 0x0a, 0x06, 0x66, 0x69, 0x6c, 0x74, + 0x65, 0x72, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x25, 0x2e, 0x4f, 0x6c, 0x64, 0x46, 0x61, + 0x69, 0x74, 0x68, 0x66, 0x75, 0x6c, 0x2e, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x54, 0x72, 0x61, + 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x46, 0x69, 0x6c, 0x74, 0x65, 0x72, 0x48, + 0x01, 0x52, 0x06, 0x66, 0x69, 0x6c, 0x74, 0x65, 0x72, 0x88, 0x01, 0x01, 0x42, 0x0b, 0x0a, 0x09, + 0x5f, 0x65, 0x6e, 0x64, 0x5f, 0x73, 0x6c, 0x6f, 0x74, 0x42, 0x09, 0x0a, 0x07, 0x5f, 0x66, 0x69, + 0x6c, 0x74, 0x65, 0x72, 0x22, 0xe1, 0x01, 0x0a, 0x18, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x54, + 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x46, 0x69, 0x6c, 0x74, 0x65, + 0x72, 0x12, 0x17, 0x0a, 0x04, 0x76, 0x6f, 0x74, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x48, + 0x00, 0x52, 0x04, 0x76, 0x6f, 0x74, 0x65, 0x88, 0x01, 0x01, 0x12, 0x1b, 0x0a, 0x06, 0x66, 0x61, + 0x69, 0x6c, 0x65, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x08, 0x48, 0x01, 0x52, 0x06, 0x66, 0x61, + 0x69, 0x6c, 0x65, 0x64, 0x88, 0x01, 0x01, 0x12, 0x27, 0x0a, 0x0f, 0x61, 0x63, 0x63, 0x6f, 0x75, + 0x6e, 0x74, 0x5f, 0x69, 0x6e, 0x63, 0x6c, 0x75, 0x64, 0x65, 0x18, 0x03, 0x20, 0x03, 0x28, 0x09, + 0x52, 0x0e, 0x61, 0x63, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x49, 0x6e, 0x63, 0x6c, 0x75, 0x64, 0x65, + 0x12, 0x27, 0x0a, 0x0f, 0x61, 0x63, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x5f, 0x65, 0x78, 0x63, 0x6c, + 0x75, 0x64, 0x65, 0x18, 0x04, 0x20, 0x03, 0x28, 0x09, 0x52, 0x0e, 0x61, 0x63, 0x63, 0x6f, 0x75, + 0x6e, 0x74, 0x45, 0x78, 0x63, 0x6c, 0x75, 0x64, 0x65, 0x12, 0x29, 0x0a, 0x10, 0x61, 0x63, 0x63, + 0x6f, 0x75, 0x6e, 0x74, 0x5f, 0x72, 0x65, 0x71, 0x75, 0x69, 0x72, 0x65, 0x64, 0x18, 0x05, 0x20, + 0x03, 0x28, 0x09, 0x52, 0x0f, 0x61, 0x63, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x52, 0x65, 0x71, 0x75, + 0x69, 0x72, 0x65, 0x64, 0x42, 0x07, 0x0a, 0x05, 0x5f, 0x76, 0x6f, 0x74, 0x65, 0x42, 0x09, 0x0a, + 0x07, 0x5f, 0x66, 0x61, 0x69, 0x6c, 0x65, 0x64, 0x2a, 0x33, 0x0a, 0x14, 0x47, 0x65, 0x74, 0x52, + 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x45, 0x72, 0x72, 0x6f, 0x72, 0x43, 0x6f, 0x64, 0x65, + 0x12, 0x0c, 0x0a, 0x08, 0x49, 0x4e, 0x54, 0x45, 0x52, 0x4e, 0x41, 0x4c, 0x10, 0x00, 0x12, 0x0d, + 0x0a, 0x09, 0x4e, 0x4f, 0x54, 0x5f, 0x46, 0x4f, 0x55, 0x4e, 0x44, 0x10, 0x01, 0x32, 0xad, 0x04, + 0x0a, 0x0b, 0x4f, 0x6c, 0x64, 0x46, 0x61, 0x69, 0x74, 0x68, 0x66, 0x75, 0x6c, 0x12, 0x47, 0x0a, + 0x0a, 0x47, 0x65, 0x74, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x1b, 0x2e, 0x4f, 0x6c, 0x64, 0x46, 0x61, 0x69, 0x74, 0x68, 0x66, 0x75, 0x6c, 0x2e, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, - 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x41, 0x0a, 0x08, 0x47, 0x65, 0x74, - 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x12, 0x19, 0x2e, 0x4f, 0x6c, 0x64, 0x46, 0x61, 0x69, 0x74, 0x68, - 0x66, 0x75, 0x6c, 0x2e, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, - 0x1a, 0x1a, 0x2e, 0x4f, 0x6c, 0x64, 0x46, 0x61, 0x69, 0x74, 0x68, 0x66, 0x75, 0x6c, 0x2e, 0x42, - 0x6c, 0x6f, 0x63, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x4d, 0x0a, 0x0c, - 0x47, 0x65, 0x74, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x54, 0x69, 0x6d, 0x65, 0x12, 0x1d, 0x2e, 0x4f, - 0x6c, 0x64, 0x46, 0x61, 0x69, 0x74, 0x68, 0x66, 0x75, 0x6c, 0x2e, 0x42, 0x6c, 0x6f, 0x63, 0x6b, - 0x54, 0x69, 0x6d, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1e, 0x2e, 0x4f, 0x6c, - 0x64, 0x46, 0x61, 0x69, 0x74, 0x68, 0x66, 0x75, 0x6c, 0x2e, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x54, - 0x69, 0x6d, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x53, 0x0a, 0x0e, 0x47, - 0x65, 0x74, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x1f, 0x2e, - 0x4f, 0x6c, 0x64, 0x46, 0x61, 0x69, 0x74, 0x68, 0x66, 0x75, 0x6c, 0x2e, 0x54, 0x72, 0x61, 0x6e, - 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x20, - 0x2e, 0x4f, 0x6c, 0x64, 0x46, 0x61, 0x69, 0x74, 0x68, 0x66, 0x75, 0x6c, 0x2e, 0x54, 0x72, 0x61, - 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, - 0x12, 0x3c, 0x0a, 0x03, 0x47, 0x65, 0x74, 0x12, 0x17, 0x2e, 0x4f, 0x6c, 0x64, 0x46, 0x61, 0x69, - 0x74, 0x68, 0x66, 0x75, 0x6c, 0x2e, 0x47, 0x65, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, - 0x1a, 0x18, 0x2e, 0x4f, 0x6c, 0x64, 0x46, 0x61, 0x69, 0x74, 0x68, 0x66, 0x75, 0x6c, 0x2e, 0x47, - 0x65, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x28, 0x01, 0x30, 0x01, 0x12, 0x4e, - 0x0a, 0x0c, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x73, 0x12, 0x20, - 0x2e, 0x4f, 0x6c, 0x64, 0x46, 0x61, 0x69, 0x74, 0x68, 0x66, 0x75, 0x6c, 0x2e, 0x53, 0x74, 0x72, - 0x65, 0x61, 0x6d, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, - 0x1a, 0x1a, 0x2e, 0x4f, 0x6c, 0x64, 0x46, 0x61, 0x69, 0x74, 0x68, 0x66, 0x75, 0x6c, 0x2e, 0x42, - 0x6c, 0x6f, 0x63, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x30, 0x01, 0x42, 0x4e, - 0x5a, 0x4c, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x72, 0x70, 0x63, - 0x70, 0x6f, 0x6f, 0x6c, 0x2f, 0x79, 0x65, 0x6c, 0x6c, 0x6f, 0x77, 0x73, 0x74, 0x6f, 0x6e, 0x65, - 0x2d, 0x66, 0x61, 0x69, 0x74, 0x68, 0x66, 0x75, 0x6c, 0x2f, 0x6f, 0x6c, 0x64, 0x2d, 0x66, 0x61, - 0x69, 0x74, 0x68, 0x66, 0x75, 0x6c, 0x2d, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x3b, 0x6f, 0x6c, 0x64, - 0x5f, 0x66, 0x61, 0x69, 0x74, 0x68, 0x66, 0x75, 0x6c, 0x5f, 0x67, 0x72, 0x70, 0x63, 0x62, 0x06, - 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1c, 0x2e, 0x4f, 0x6c, 0x64, 0x46, 0x61, + 0x69, 0x74, 0x68, 0x66, 0x75, 0x6c, 0x2e, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x52, 0x65, + 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x41, 0x0a, 0x08, 0x47, 0x65, 0x74, 0x42, 0x6c, 0x6f, + 0x63, 0x6b, 0x12, 0x19, 0x2e, 0x4f, 0x6c, 0x64, 0x46, 0x61, 0x69, 0x74, 0x68, 0x66, 0x75, 0x6c, + 0x2e, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1a, 0x2e, + 0x4f, 0x6c, 0x64, 0x46, 0x61, 0x69, 0x74, 0x68, 0x66, 0x75, 0x6c, 0x2e, 0x42, 0x6c, 0x6f, 0x63, + 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x4d, 0x0a, 0x0c, 0x47, 0x65, 0x74, + 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x54, 0x69, 0x6d, 0x65, 0x12, 0x1d, 0x2e, 0x4f, 0x6c, 0x64, 0x46, + 0x61, 0x69, 0x74, 0x68, 0x66, 0x75, 0x6c, 0x2e, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x54, 0x69, 0x6d, + 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1e, 0x2e, 0x4f, 0x6c, 0x64, 0x46, 0x61, + 0x69, 0x74, 0x68, 0x66, 0x75, 0x6c, 0x2e, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x54, 0x69, 0x6d, 0x65, + 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x53, 0x0a, 0x0e, 0x47, 0x65, 0x74, 0x54, + 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x1f, 0x2e, 0x4f, 0x6c, 0x64, + 0x46, 0x61, 0x69, 0x74, 0x68, 0x66, 0x75, 0x6c, 0x2e, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, + 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x20, 0x2e, 0x4f, 0x6c, + 0x64, 0x46, 0x61, 0x69, 0x74, 0x68, 0x66, 0x75, 0x6c, 0x2e, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x61, + 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x3c, 0x0a, + 0x03, 0x47, 0x65, 0x74, 0x12, 0x17, 0x2e, 0x4f, 0x6c, 0x64, 0x46, 0x61, 0x69, 0x74, 0x68, 0x66, + 0x75, 0x6c, 0x2e, 0x47, 0x65, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x18, 0x2e, + 0x4f, 0x6c, 0x64, 0x46, 0x61, 0x69, 0x74, 0x68, 0x66, 0x75, 0x6c, 0x2e, 0x47, 0x65, 0x74, 0x52, + 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x28, 0x01, 0x30, 0x01, 0x12, 0x4e, 0x0a, 0x0c, 0x53, + 0x74, 0x72, 0x65, 0x61, 0x6d, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x73, 0x12, 0x20, 0x2e, 0x4f, 0x6c, + 0x64, 0x46, 0x61, 0x69, 0x74, 0x68, 0x66, 0x75, 0x6c, 0x2e, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, + 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1a, 0x2e, + 0x4f, 0x6c, 0x64, 0x46, 0x61, 0x69, 0x74, 0x68, 0x66, 0x75, 0x6c, 0x2e, 0x42, 0x6c, 0x6f, 0x63, + 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x30, 0x01, 0x12, 0x60, 0x0a, 0x12, 0x53, + 0x74, 0x72, 0x65, 0x61, 0x6d, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, + 0x73, 0x12, 0x26, 0x2e, 0x4f, 0x6c, 0x64, 0x46, 0x61, 0x69, 0x74, 0x68, 0x66, 0x75, 0x6c, 0x2e, + 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, + 0x6e, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x20, 0x2e, 0x4f, 0x6c, 0x64, 0x46, + 0x61, 0x69, 0x74, 0x68, 0x66, 0x75, 0x6c, 0x2e, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, + 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x30, 0x01, 0x42, 0x4e, 0x5a, + 0x4c, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x72, 0x70, 0x63, 0x70, + 0x6f, 0x6f, 0x6c, 0x2f, 0x79, 0x65, 0x6c, 0x6c, 0x6f, 0x77, 0x73, 0x74, 0x6f, 0x6e, 0x65, 0x2d, + 0x66, 0x61, 0x69, 0x74, 0x68, 0x66, 0x75, 0x6c, 0x2f, 0x6f, 0x6c, 0x64, 0x2d, 0x66, 0x61, 0x69, + 0x74, 0x68, 0x66, 0x75, 0x6c, 0x2d, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x3b, 0x6f, 0x6c, 0x64, 0x5f, + 0x66, 0x61, 0x69, 0x74, 0x68, 0x66, 0x75, 0x6c, 0x5f, 0x67, 0x72, 0x70, 0x63, 0x62, 0x06, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -1143,23 +1313,25 @@ func file_old_faithful_proto_rawDescGZIP() []byte { } var file_old_faithful_proto_enumTypes = make([]protoimpl.EnumInfo, 1) -var file_old_faithful_proto_msgTypes = make([]protoimpl.MessageInfo, 14) +var file_old_faithful_proto_msgTypes = make([]protoimpl.MessageInfo, 16) var file_old_faithful_proto_goTypes = []any{ - (GetResponseErrorCode)(0), // 0: OldFaithful.GetResponseErrorCode - (*VersionRequest)(nil), // 1: OldFaithful.VersionRequest - (*VersionResponse)(nil), // 2: OldFaithful.VersionResponse - (*BlockRequest)(nil), // 3: OldFaithful.BlockRequest - (*BlockResponse)(nil), // 4: OldFaithful.BlockResponse - (*BlockTimeRequest)(nil), // 5: OldFaithful.BlockTimeRequest - (*BlockTimeResponse)(nil), // 6: OldFaithful.BlockTimeResponse - (*TransactionRequest)(nil), // 7: OldFaithful.TransactionRequest - (*TransactionResponse)(nil), // 8: OldFaithful.TransactionResponse - (*Transaction)(nil), // 9: OldFaithful.Transaction - (*GetRequest)(nil), // 10: OldFaithful.GetRequest - (*GetResponse)(nil), // 11: OldFaithful.GetResponse - (*GetResponseError)(nil), // 12: OldFaithful.GetResponseError - (*StreamBlocksRequest)(nil), // 13: OldFaithful.StreamBlocksRequest - (*StreamBlocksFilter)(nil), // 14: OldFaithful.StreamBlocksFilter + (GetResponseErrorCode)(0), // 0: OldFaithful.GetResponseErrorCode + (*VersionRequest)(nil), // 1: OldFaithful.VersionRequest + (*VersionResponse)(nil), // 2: OldFaithful.VersionResponse + (*BlockRequest)(nil), // 3: OldFaithful.BlockRequest + (*BlockResponse)(nil), // 4: OldFaithful.BlockResponse + (*BlockTimeRequest)(nil), // 5: OldFaithful.BlockTimeRequest + (*BlockTimeResponse)(nil), // 6: OldFaithful.BlockTimeResponse + (*TransactionRequest)(nil), // 7: OldFaithful.TransactionRequest + (*TransactionResponse)(nil), // 8: OldFaithful.TransactionResponse + (*Transaction)(nil), // 9: OldFaithful.Transaction + (*GetRequest)(nil), // 10: OldFaithful.GetRequest + (*GetResponse)(nil), // 11: OldFaithful.GetResponse + (*GetResponseError)(nil), // 12: OldFaithful.GetResponseError + (*StreamBlocksRequest)(nil), // 13: OldFaithful.StreamBlocksRequest + (*StreamBlocksFilter)(nil), // 14: OldFaithful.StreamBlocksFilter + (*StreamTransactionsRequest)(nil), // 15: OldFaithful.StreamTransactionsRequest + (*StreamTransactionsFilter)(nil), // 16: OldFaithful.StreamTransactionsFilter } var file_old_faithful_proto_depIdxs = []int32{ 9, // 0: OldFaithful.BlockResponse.transactions:type_name -> OldFaithful.Transaction @@ -1175,23 +1347,26 @@ var file_old_faithful_proto_depIdxs = []int32{ 8, // 10: OldFaithful.GetResponse.transaction:type_name -> OldFaithful.TransactionResponse 0, // 11: OldFaithful.GetResponseError.code:type_name -> OldFaithful.GetResponseErrorCode 14, // 12: OldFaithful.StreamBlocksRequest.filter:type_name -> OldFaithful.StreamBlocksFilter - 1, // 13: OldFaithful.OldFaithful.GetVersion:input_type -> OldFaithful.VersionRequest - 3, // 14: OldFaithful.OldFaithful.GetBlock:input_type -> OldFaithful.BlockRequest - 5, // 15: OldFaithful.OldFaithful.GetBlockTime:input_type -> OldFaithful.BlockTimeRequest - 7, // 16: OldFaithful.OldFaithful.GetTransaction:input_type -> OldFaithful.TransactionRequest - 10, // 17: OldFaithful.OldFaithful.Get:input_type -> OldFaithful.GetRequest - 13, // 18: OldFaithful.OldFaithful.StreamBlocks:input_type -> OldFaithful.StreamBlocksRequest - 2, // 19: OldFaithful.OldFaithful.GetVersion:output_type -> OldFaithful.VersionResponse - 4, // 20: OldFaithful.OldFaithful.GetBlock:output_type -> OldFaithful.BlockResponse - 6, // 21: OldFaithful.OldFaithful.GetBlockTime:output_type -> OldFaithful.BlockTimeResponse - 8, // 22: OldFaithful.OldFaithful.GetTransaction:output_type -> OldFaithful.TransactionResponse - 11, // 23: OldFaithful.OldFaithful.Get:output_type -> OldFaithful.GetResponse - 4, // 24: OldFaithful.OldFaithful.StreamBlocks:output_type -> OldFaithful.BlockResponse - 19, // [19:25] is the sub-list for method output_type - 13, // [13:19] is the sub-list for method input_type - 13, // [13:13] is the sub-list for extension type_name - 13, // [13:13] is the sub-list for extension extendee - 0, // [0:13] is the sub-list for field type_name + 16, // 13: OldFaithful.StreamTransactionsRequest.filter:type_name -> OldFaithful.StreamTransactionsFilter + 1, // 14: OldFaithful.OldFaithful.GetVersion:input_type -> OldFaithful.VersionRequest + 3, // 15: OldFaithful.OldFaithful.GetBlock:input_type -> OldFaithful.BlockRequest + 5, // 16: OldFaithful.OldFaithful.GetBlockTime:input_type -> OldFaithful.BlockTimeRequest + 7, // 17: OldFaithful.OldFaithful.GetTransaction:input_type -> OldFaithful.TransactionRequest + 10, // 18: OldFaithful.OldFaithful.Get:input_type -> OldFaithful.GetRequest + 13, // 19: OldFaithful.OldFaithful.StreamBlocks:input_type -> OldFaithful.StreamBlocksRequest + 15, // 20: OldFaithful.OldFaithful.StreamTransactions:input_type -> OldFaithful.StreamTransactionsRequest + 2, // 21: OldFaithful.OldFaithful.GetVersion:output_type -> OldFaithful.VersionResponse + 4, // 22: OldFaithful.OldFaithful.GetBlock:output_type -> OldFaithful.BlockResponse + 6, // 23: OldFaithful.OldFaithful.GetBlockTime:output_type -> OldFaithful.BlockTimeResponse + 8, // 24: OldFaithful.OldFaithful.GetTransaction:output_type -> OldFaithful.TransactionResponse + 11, // 25: OldFaithful.OldFaithful.Get:output_type -> OldFaithful.GetResponse + 4, // 26: OldFaithful.OldFaithful.StreamBlocks:output_type -> OldFaithful.BlockResponse + 8, // 27: OldFaithful.OldFaithful.StreamTransactions:output_type -> OldFaithful.TransactionResponse + 21, // [21:28] is the sub-list for method output_type + 14, // [14:21] is the sub-list for method input_type + 14, // [14:14] is the sub-list for extension type_name + 14, // [14:14] is the sub-list for extension extendee + 0, // [0:14] is the sub-list for field type_name } func init() { file_old_faithful_proto_init() } @@ -1216,13 +1391,15 @@ func file_old_faithful_proto_init() { (*GetResponse_Transaction)(nil), } file_old_faithful_proto_msgTypes[12].OneofWrappers = []any{} + file_old_faithful_proto_msgTypes[14].OneofWrappers = []any{} + file_old_faithful_proto_msgTypes[15].OneofWrappers = []any{} type x struct{} out := protoimpl.TypeBuilder{ File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_old_faithful_proto_rawDesc, NumEnums: 1, - NumMessages: 14, + NumMessages: 16, NumExtensions: 0, NumServices: 1, }, diff --git a/old-faithful-proto/old-faithful-grpc/old-faithful_grpc.pb.go b/old-faithful-proto/old-faithful-grpc/old-faithful_grpc.pb.go index 309884be..c13ab44c 100644 --- a/old-faithful-proto/old-faithful-grpc/old-faithful_grpc.pb.go +++ b/old-faithful-proto/old-faithful-grpc/old-faithful_grpc.pb.go @@ -19,12 +19,13 @@ import ( const _ = grpc.SupportPackageIsVersion9 const ( - OldFaithful_GetVersion_FullMethodName = "/OldFaithful.OldFaithful/GetVersion" - OldFaithful_GetBlock_FullMethodName = "/OldFaithful.OldFaithful/GetBlock" - OldFaithful_GetBlockTime_FullMethodName = "/OldFaithful.OldFaithful/GetBlockTime" - OldFaithful_GetTransaction_FullMethodName = "/OldFaithful.OldFaithful/GetTransaction" - OldFaithful_Get_FullMethodName = "/OldFaithful.OldFaithful/Get" - OldFaithful_StreamBlocks_FullMethodName = "/OldFaithful.OldFaithful/StreamBlocks" + OldFaithful_GetVersion_FullMethodName = "/OldFaithful.OldFaithful/GetVersion" + OldFaithful_GetBlock_FullMethodName = "/OldFaithful.OldFaithful/GetBlock" + OldFaithful_GetBlockTime_FullMethodName = "/OldFaithful.OldFaithful/GetBlockTime" + OldFaithful_GetTransaction_FullMethodName = "/OldFaithful.OldFaithful/GetTransaction" + OldFaithful_Get_FullMethodName = "/OldFaithful.OldFaithful/Get" + OldFaithful_StreamBlocks_FullMethodName = "/OldFaithful.OldFaithful/StreamBlocks" + OldFaithful_StreamTransactions_FullMethodName = "/OldFaithful.OldFaithful/StreamTransactions" ) // OldFaithfulClient is the client API for OldFaithful service. @@ -37,6 +38,7 @@ type OldFaithfulClient interface { GetTransaction(ctx context.Context, in *TransactionRequest, opts ...grpc.CallOption) (*TransactionResponse, error) Get(ctx context.Context, opts ...grpc.CallOption) (grpc.BidiStreamingClient[GetRequest, GetResponse], error) StreamBlocks(ctx context.Context, in *StreamBlocksRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[BlockResponse], error) + StreamTransactions(ctx context.Context, in *StreamTransactionsRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[TransactionResponse], error) } type oldFaithfulClient struct { @@ -119,6 +121,25 @@ func (c *oldFaithfulClient) StreamBlocks(ctx context.Context, in *StreamBlocksRe // This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. type OldFaithful_StreamBlocksClient = grpc.ServerStreamingClient[BlockResponse] +func (c *oldFaithfulClient) StreamTransactions(ctx context.Context, in *StreamTransactionsRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[TransactionResponse], error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + stream, err := c.cc.NewStream(ctx, &OldFaithful_ServiceDesc.Streams[2], OldFaithful_StreamTransactions_FullMethodName, cOpts...) + if err != nil { + return nil, err + } + x := &grpc.GenericClientStream[StreamTransactionsRequest, TransactionResponse]{ClientStream: stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. +type OldFaithful_StreamTransactionsClient = grpc.ServerStreamingClient[TransactionResponse] + // OldFaithfulServer is the server API for OldFaithful service. // All implementations must embed UnimplementedOldFaithfulServer // for forward compatibility. @@ -129,6 +150,7 @@ type OldFaithfulServer interface { GetTransaction(context.Context, *TransactionRequest) (*TransactionResponse, error) Get(grpc.BidiStreamingServer[GetRequest, GetResponse]) error StreamBlocks(*StreamBlocksRequest, grpc.ServerStreamingServer[BlockResponse]) error + StreamTransactions(*StreamTransactionsRequest, grpc.ServerStreamingServer[TransactionResponse]) error mustEmbedUnimplementedOldFaithfulServer() } @@ -157,6 +179,9 @@ func (UnimplementedOldFaithfulServer) Get(grpc.BidiStreamingServer[GetRequest, G func (UnimplementedOldFaithfulServer) StreamBlocks(*StreamBlocksRequest, grpc.ServerStreamingServer[BlockResponse]) error { return status.Errorf(codes.Unimplemented, "method StreamBlocks not implemented") } +func (UnimplementedOldFaithfulServer) StreamTransactions(*StreamTransactionsRequest, grpc.ServerStreamingServer[TransactionResponse]) error { + return status.Errorf(codes.Unimplemented, "method StreamTransactions not implemented") +} func (UnimplementedOldFaithfulServer) mustEmbedUnimplementedOldFaithfulServer() {} func (UnimplementedOldFaithfulServer) testEmbeddedByValue() {} @@ -268,6 +293,17 @@ func _OldFaithful_StreamBlocks_Handler(srv interface{}, stream grpc.ServerStream // This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. type OldFaithful_StreamBlocksServer = grpc.ServerStreamingServer[BlockResponse] +func _OldFaithful_StreamTransactions_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(StreamTransactionsRequest) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(OldFaithfulServer).StreamTransactions(m, &grpc.GenericServerStream[StreamTransactionsRequest, TransactionResponse]{ServerStream: stream}) +} + +// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. +type OldFaithful_StreamTransactionsServer = grpc.ServerStreamingServer[TransactionResponse] + // OldFaithful_ServiceDesc is the grpc.ServiceDesc for OldFaithful service. // It's only intended for direct use with grpc.RegisterService, // and not to be introspected or modified (even as a copy) @@ -304,6 +340,11 @@ var OldFaithful_ServiceDesc = grpc.ServiceDesc{ Handler: _OldFaithful_StreamBlocks_Handler, ServerStreams: true, }, + { + StreamName: "StreamTransactions", + Handler: _OldFaithful_StreamTransactions_Handler, + ServerStreams: true, + }, }, Metadata: "old-faithful.proto", } diff --git a/old-faithful-proto/proto/old-faithful.proto b/old-faithful-proto/proto/old-faithful.proto index de328a7e..ae1bd3fb 100644 --- a/old-faithful-proto/proto/old-faithful.proto +++ b/old-faithful-proto/proto/old-faithful.proto @@ -13,6 +13,7 @@ service OldFaithful { rpc Get(stream GetRequest) returns (stream GetResponse); rpc StreamBlocks(StreamBlocksRequest) returns (stream BlockResponse); + rpc StreamTransactions(StreamTransactionsRequest) returns (stream TransactionResponse); } message VersionRequest {} @@ -96,10 +97,24 @@ message GetResponseError { message StreamBlocksRequest { uint64 start_slot = 1; - optional uint64 end_slot = 2; // If not specified, continues indefinitely + optional uint64 end_slot = 2; optional StreamBlocksFilter filter = 3; } message StreamBlocksFilter { repeated string account_include = 1; // Filter blocks/txns mentioning these accounts } + +message StreamTransactionsRequest { + uint64 start_slot = 1; + optional uint64 end_slot = 2; + optional StreamTransactionsFilter filter = 3; +} + +message StreamTransactionsFilter { + optional bool vote = 1; + optional bool failed = 2; + repeated string account_include = 3; + repeated string account_exclude = 4; + repeated string account_required = 5; +} From 61b58be81e73e174b0d61f73e41e116ce51e7457 Mon Sep 17 00:00:00 2001 From: anjor Date: Thu, 14 Nov 2024 10:41:02 -0700 Subject: [PATCH 02/15] impl --- grpc-server.go | 55 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 55 insertions(+) diff --git a/grpc-server.go b/grpc-server.go index d51832f1..d3c5e3d2 100644 --- a/grpc-server.go +++ b/grpc-server.go @@ -696,5 +696,60 @@ func blockContainsAccounts(block *old_faithful_grpc.BlockResponse, accounts []st } return false +} + +func (multi *MultiEpoch) StreamTransactions(params *old_faithful_grpc.StreamTransactionsRequest, ser old_faithful_grpc.OldFaithful_StreamTransactionsServer) error { + ctx := ser.Context() + + startSlot := params.StartSlot + endSlot := startSlot + maxSlotsToStream + + if params.EndSlot != nil { + endSlot = *params.EndSlot + } + + filterFunc := func(tx *old_faithful_grpc.Transaction) bool { + if params.Filter == nil || len(params.Filter.AccountInclude) == 0 { + return true + } + return txContainsAccounts(tx, params.Filter.AccountInclude) + } + + for slot := startSlot; slot <= endSlot; slot++ { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + block, err := multi.GetBlock(ctx, &old_faithful_grpc.BlockRequest{Slot: slot}) + if err != nil { + if status.Code(err) == codes.NotFound { + continue // is this the right thing to do? + } + return err + } + + for _, tx := range block.Transactions { + if filterFunc(tx) { + if err := ser.Send(constructTransactionResponse(tx)); err != nil { + return err + } + } + } + } + + return nil +} + +func txContainsAccounts(tx *old_faithful_grpc.Transaction, accounts []string) bool { + return true +} + +func constructTransactionResponse(tx *old_faithful_grpc.Transaction) *old_faithful_grpc.TransactionResponse { + // to do + return &old_faithful_grpc.TransactionResponse{ + Transaction: tx, + } } From d05760770173c25b33aefd6821d29a7ea452000b Mon Sep 17 00:00:00 2001 From: anjor Date: Mon, 18 Nov 2024 15:33:49 +0000 Subject: [PATCH 03/15] no filter case --- grpc-server.go | 36 +++++++++++++----------------------- 1 file changed, 13 insertions(+), 23 deletions(-) diff --git a/grpc-server.go b/grpc-server.go index d3c5e3d2..e8a514b5 100644 --- a/grpc-server.go +++ b/grpc-server.go @@ -708,14 +708,6 @@ func (multi *MultiEpoch) StreamTransactions(params *old_faithful_grpc.StreamTran endSlot = *params.EndSlot } - filterFunc := func(tx *old_faithful_grpc.Transaction) bool { - if params.Filter == nil || len(params.Filter.AccountInclude) == 0 { - return true - } - - return txContainsAccounts(tx, params.Filter.AccountInclude) - } - for slot := startSlot; slot <= endSlot; slot++ { select { case <-ctx.Done(): @@ -723,17 +715,17 @@ func (multi *MultiEpoch) StreamTransactions(params *old_faithful_grpc.StreamTran default: } - block, err := multi.GetBlock(ctx, &old_faithful_grpc.BlockRequest{Slot: slot}) - if err != nil { - if status.Code(err) == codes.NotFound { - continue // is this the right thing to do? + if params.Filter == nil || len(params.Filter.AccountInclude) == 0 { + block, err := multi.GetBlock(ctx, &old_faithful_grpc.BlockRequest{Slot: slot}) + if err != nil { + if status.Code(err) == codes.NotFound { + continue // is this the right thing to do? + } + return err } - return err - } - for _, tx := range block.Transactions { - if filterFunc(tx) { - if err := ser.Send(constructTransactionResponse(tx)); err != nil { + for _, tx := range block.Transactions { + if err := ser.Send(constructTransactionResponse(tx, block)); err != nil { return err } } @@ -743,13 +735,11 @@ func (multi *MultiEpoch) StreamTransactions(params *old_faithful_grpc.StreamTran return nil } -func txContainsAccounts(tx *old_faithful_grpc.Transaction, accounts []string) bool { - return true -} - -func constructTransactionResponse(tx *old_faithful_grpc.Transaction) *old_faithful_grpc.TransactionResponse { - // to do +func constructTransactionResponse(tx *old_faithful_grpc.Transaction, block *old_faithful_grpc.BlockResponse) *old_faithful_grpc.TransactionResponse { return &old_faithful_grpc.TransactionResponse{ Transaction: tx, + BlockTime: block.BlockTime, + Slot: block.Slot, + // What to do for index? } } From 03bcf0f4accebd2e0698aa45e5bfcdc408635681 Mon Sep 17 00:00:00 2001 From: anjor Date: Thu, 21 Nov 2024 16:01:22 +0000 Subject: [PATCH 04/15] working --- grpc-server.go | 29 +++++++++++++++++++---------- 1 file changed, 19 insertions(+), 10 deletions(-) diff --git a/grpc-server.go b/grpc-server.go index e8a514b5..9a474875 100644 --- a/grpc-server.go +++ b/grpc-server.go @@ -707,6 +707,7 @@ func (multi *MultiEpoch) StreamTransactions(params *old_faithful_grpc.StreamTran if params.EndSlot != nil { endSlot = *params.EndSlot } + gsfaReader, _ := multi.getGsfaReadersInEpochDescendingOrderForSlotRange(ctx, startSlot, endSlot) for slot := startSlot; slot <= endSlot; slot++ { select { @@ -716,19 +717,14 @@ func (multi *MultiEpoch) StreamTransactions(params *old_faithful_grpc.StreamTran } if params.Filter == nil || len(params.Filter.AccountInclude) == 0 { - block, err := multi.GetBlock(ctx, &old_faithful_grpc.BlockRequest{Slot: slot}) - if err != nil { - if status.Code(err) == codes.NotFound { - continue // is this the right thing to do? - } + if err := multi.streamAllTxns(ctx, ser, slot); err != nil { return err } - - for _, tx := range block.Transactions { - if err := ser.Send(constructTransactionResponse(tx, block)); err != nil { - return err - } + } else if params.Filter != nil && len(params.Filter.AccountInclude) > 0 { + for _, account := range params.Filter.AccountInclude { + gsfaReader.Get() } + } } @@ -743,3 +739,16 @@ func constructTransactionResponse(tx *old_faithful_grpc.Transaction, block *old_ // What to do for index? } } + +func (multi *MultiEpoch) streamAllTxns(ctx context.Context, ser old_faithful_grpc.OldFaithful_StreamTransactionsServer, slot uint64) error { + block, err := multi.GetBlock(ctx, &old_faithful_grpc.BlockRequest{Slot: slot}) + if err != nil && status.Code(err) != codes.NotFound { + return err + } + + for _, tx := range block.Transactions { + if err := ser.Send(constructTransactionResponse(tx, block)); err != nil { + return err + } + } +} From 997cf991a41b5612bb19b0fb50c68fd49f554fad Mon Sep 17 00:00:00 2001 From: anjor Date: Sat, 30 Nov 2024 20:42:54 +0000 Subject: [PATCH 05/15] account include impl --- grpc-server.go | 66 ++++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 61 insertions(+), 5 deletions(-) diff --git a/grpc-server.go b/grpc-server.go index 9a474875..5a5e9a73 100644 --- a/grpc-server.go +++ b/grpc-server.go @@ -18,6 +18,8 @@ import ( "github.com/ipld/go-car/util" cidlink "github.com/ipld/go-ipld-prime/linking/cid" "github.com/rpcpool/yellowstone-faithful/compactindexsized" + "github.com/rpcpool/yellowstone-faithful/gsfa/linkedlog" + "github.com/rpcpool/yellowstone-faithful/indexes" "github.com/rpcpool/yellowstone-faithful/ipld/ipldbindcode" "github.com/rpcpool/yellowstone-faithful/iplddecoders" old_faithful_grpc "github.com/rpcpool/yellowstone-faithful/old-faithful-proto/old-faithful-grpc" @@ -722,9 +724,63 @@ func (multi *MultiEpoch) StreamTransactions(params *old_faithful_grpc.StreamTran } } else if params.Filter != nil && len(params.Filter.AccountInclude) > 0 { for _, account := range params.Filter.AccountInclude { - gsfaReader.Get() - } + pKey := solana.MustPublicKeyFromBase58(account) + epochToTxns, err := gsfaReader.Get( + ctx, + pKey, + 1000, + func(epochNum uint64, oas linkedlog.OffsetAndSizeAndBlocktime) (*ipldbindcode.Transaction, error) { + epoch, err := multi.GetEpoch(epochNum) + if err != nil { + return nil, fmt.Errorf("failed to get epoch %d: %w", epochNum, err) + } + raw, err := epoch.GetNodeByOffsetAndSize(ctx, nil, &indexes.OffsetAndSize{ + Offset: oas.Offset, + Size: oas.Size, + }) + if err != nil { + return nil, fmt.Errorf("failed to get signature: %w", err) + } + decoded, err := iplddecoders.DecodeTransaction(raw) + if err != nil { + return nil, fmt.Errorf("error while decoding transaction from nodex at offset %d: %w", oas.Offset, err) + } + return decoded, nil + }, + ) + + if err != nil { + return err + } + + for epochNumber, txns := range epochToTxns { + epochHandler, err := multi.GetEpoch(epochNumber) + if err != nil { + return status.Errorf(codes.NotFound, "Epoch %d is not available", epochNumber) + } + for _, txn := range txns { + txResp := new(old_faithful_grpc.TransactionResponse) + txResp.Transaction = new(old_faithful_grpc.Transaction) + { + pos, ok := txn.GetPositionIndex() + if ok { + txResp.Index = ptrToUint64(uint64(pos)) + txResp.Transaction.Index = ptrToUint64(uint64(pos)) + } + txResp.Transaction.Transaction, txResp.Transaction.Meta, err = getTransactionAndMetaFromNode(txn, epochHandler.GetDataFrameByCid) + if err != nil { + return status.Errorf(codes.Internal, "Failed to get transaction: %v", err) + } + txResp.Slot = uint64(txn.Slot) + // What to do for blocktime? + } + if err := ser.Send(txResp); err != nil { + return err + } + } + } + } } } @@ -734,9 +790,9 @@ func (multi *MultiEpoch) StreamTransactions(params *old_faithful_grpc.StreamTran func constructTransactionResponse(tx *old_faithful_grpc.Transaction, block *old_faithful_grpc.BlockResponse) *old_faithful_grpc.TransactionResponse { return &old_faithful_grpc.TransactionResponse{ Transaction: tx, - BlockTime: block.BlockTime, - Slot: block.Slot, - // What to do for index? + BlockTime: block.GetBlockTime(), + Slot: block.GetSlot(), + Index: tx.Index, } } From 3e5da4503a2af885b862eda764adde59a86e3ab7 Mon Sep 17 00:00:00 2001 From: anjor Date: Sat, 30 Nov 2024 20:46:54 +0000 Subject: [PATCH 06/15] missing return --- grpc-server.go | 1 + 1 file changed, 1 insertion(+) diff --git a/grpc-server.go b/grpc-server.go index 5a5e9a73..ecd778c3 100644 --- a/grpc-server.go +++ b/grpc-server.go @@ -807,4 +807,5 @@ func (multi *MultiEpoch) streamAllTxns(ctx context.Context, ser old_faithful_grp return err } } + return nil } From f76cb838fad95da37cf1f281aec30c6f71acedf9 Mon Sep 17 00:00:00 2001 From: anjor Date: Mon, 2 Dec 2024 10:44:41 +0000 Subject: [PATCH 07/15] filters --- grpc-server.go | 146 +++++++++++++++++++++++++++---------------------- 1 file changed, 81 insertions(+), 65 deletions(-) diff --git a/grpc-server.go b/grpc-server.go index ecd778c3..65ca1c73 100644 --- a/grpc-server.go +++ b/grpc-server.go @@ -18,6 +18,7 @@ import ( "github.com/ipld/go-car/util" cidlink "github.com/ipld/go-ipld-prime/linking/cid" "github.com/rpcpool/yellowstone-faithful/compactindexsized" + "github.com/rpcpool/yellowstone-faithful/gsfa" "github.com/rpcpool/yellowstone-faithful/gsfa/linkedlog" "github.com/rpcpool/yellowstone-faithful/indexes" "github.com/rpcpool/yellowstone-faithful/ipld/ipldbindcode" @@ -718,75 +719,15 @@ func (multi *MultiEpoch) StreamTransactions(params *old_faithful_grpc.StreamTran default: } - if params.Filter == nil || len(params.Filter.AccountInclude) == 0 { - if err := multi.streamAllTxns(ctx, ser, slot); err != nil { - return err - } - } else if params.Filter != nil && len(params.Filter.AccountInclude) > 0 { - for _, account := range params.Filter.AccountInclude { - pKey := solana.MustPublicKeyFromBase58(account) - epochToTxns, err := gsfaReader.Get( - ctx, - pKey, - 1000, - func(epochNum uint64, oas linkedlog.OffsetAndSizeAndBlocktime) (*ipldbindcode.Transaction, error) { - epoch, err := multi.GetEpoch(epochNum) - if err != nil { - return nil, fmt.Errorf("failed to get epoch %d: %w", epochNum, err) - } - raw, err := epoch.GetNodeByOffsetAndSize(ctx, nil, &indexes.OffsetAndSize{ - Offset: oas.Offset, - Size: oas.Size, - }) - if err != nil { - return nil, fmt.Errorf("failed to get signature: %w", err) - } - decoded, err := iplddecoders.DecodeTransaction(raw) - if err != nil { - return nil, fmt.Errorf("error while decoding transaction from nodex at offset %d: %w", oas.Offset, err) - } - return decoded, nil - }, - ) - - if err != nil { - return err - } - - for epochNumber, txns := range epochToTxns { - epochHandler, err := multi.GetEpoch(epochNumber) - if err != nil { - return status.Errorf(codes.NotFound, "Epoch %d is not available", epochNumber) - } - for _, txn := range txns { - txResp := new(old_faithful_grpc.TransactionResponse) - txResp.Transaction = new(old_faithful_grpc.Transaction) - { - pos, ok := txn.GetPositionIndex() - if ok { - txResp.Index = ptrToUint64(uint64(pos)) - txResp.Transaction.Index = ptrToUint64(uint64(pos)) - } - txResp.Transaction.Transaction, txResp.Transaction.Meta, err = getTransactionAndMetaFromNode(txn, epochHandler.GetDataFrameByCid) - if err != nil { - return status.Errorf(codes.Internal, "Failed to get transaction: %v", err) - } - txResp.Slot = uint64(txn.Slot) - // What to do for blocktime? - } - - if err := ser.Send(txResp); err != nil { - return err - } - } - } - } - } + if err := multi.processSlotTransactions(ctx, ser, slot, params.filter, gsfareader); err != nil { + return err + } } - return nil } + + func constructTransactionResponse(tx *old_faithful_grpc.Transaction, block *old_faithful_grpc.BlockResponse) *old_faithful_grpc.TransactionResponse { return &old_faithful_grpc.TransactionResponse{ Transaction: tx, @@ -809,3 +750,78 @@ func (multi *MultiEpoch) streamAllTxns(ctx context.Context, ser old_faithful_grp } return nil } + + +func (multi *MultiEpoch) processSlotTransactions(ctx context.Context, ser old_faithful_grpc.OldFaithful_StreamTransactionsServer, slot uint64, filter *old_faithful_grpc.StreamTransactionsFilter, gsfaReader *gsfa.GsfaReaderMultiepoch) error { + if filter == nil { + return multi.streamAllTxns(ctx, ser, slot) + } + + if len(filter.AccountInclude) == 0 { + if err := multi.streamAllTxns(ctx, ser, slot); err != nil { + return err + } + } else if params.Filter != nil && len(params.Filter.AccountInclude) > 0 { + for _, account := range params.Filter.AccountInclude { + pKey := solana.MustPublicKeyFromBase58(account) + epochToTxns, err := gsfaReader.Get( + ctx, + pKey, + 1000, + func(epochNum uint64, oas linkedlog.OffsetAndSizeAndBlocktime) (*ipldbindcode.Transaction, error) { + epoch, err := multi.GetEpoch(epochNum) + if err != nil { + return nil, fmt.Errorf("failed to get epoch %d: %w", epochNum, err) + } + raw, err := epoch.GetNodeByOffsetAndSize(ctx, nil, &indexes.OffsetAndSize{ + Offset: oas.Offset, + Size: oas.Size, + }) + if err != nil { + return nil, fmt.Errorf("failed to get signature: %w", err) + } + decoded, err := iplddecoders.DecodeTransaction(raw) + if err != nil { + return nil, fmt.Errorf("error while decoding transaction from nodex at offset %d: %w", oas.Offset, err) + } + return decoded, nil + }, + ) + + if err != nil { + return err + } + + for epochNumber, txns := range epochToTxns { + epochHandler, err := multi.GetEpoch(epochNumber) + if err != nil { + return status.Errorf(codes.NotFound, "Epoch %d is not available", epochNumber) + } + for _, txn := range txns { + txResp := new(old_faithful_grpc.TransactionResponse) + txResp.Transaction = new(old_faithful_grpc.Transaction) + { + pos, ok := txn.GetPositionIndex() + if ok { + txResp.Index = ptrToUint64(uint64(pos)) + txResp.Transaction.Index = ptrToUint64(uint64(pos)) + } + txResp.Transaction.Transaction, txResp.Transaction.Meta, err = getTransactionAndMetaFromNode(txn, epochHandler.GetDataFrameByCid) + if err != nil { + return status.Errorf(codes.Internal, "Failed to get transaction: %v", err) + } + txResp.Slot = uint64(txn.Slot) + // What to do for blocktime? + } + + if err := ser.Send(txResp); err != nil { + return err + } + } + } + } + } +} + +return nil +} From cde0c65de3fa569d673985f4dd4b3b80c1621646 Mon Sep 17 00:00:00 2001 From: anjor Date: Mon, 2 Dec 2024 10:46:03 +0000 Subject: [PATCH 08/15] filters --- grpc-server.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/grpc-server.go b/grpc-server.go index 65ca1c73..1b44b44f 100644 --- a/grpc-server.go +++ b/grpc-server.go @@ -758,10 +758,10 @@ func (multi *MultiEpoch) processSlotTransactions(ctx context.Context, ser old_fa } if len(filter.AccountInclude) == 0 { - if err := multi.streamAllTxns(ctx, ser, slot); err != nil { - return err - } - } else if params.Filter != nil && len(params.Filter.AccountInclude) > 0 { + // Get all transactions + // Apply filters + // Send + } else { for _, account := range params.Filter.AccountInclude { pKey := solana.MustPublicKeyFromBase58(account) epochToTxns, err := gsfaReader.Get( @@ -814,6 +814,8 @@ func (multi *MultiEpoch) processSlotTransactions(ctx context.Context, ser old_fa // What to do for blocktime? } + // apply more filters + if err := ser.Send(txResp); err != nil { return err } From a74fc180ffbda4820cc9b769c705e0f94ec94793 Mon Sep 17 00:00:00 2001 From: anjor Date: Mon, 2 Dec 2024 14:03:13 +0000 Subject: [PATCH 09/15] return --- grpc-server.go | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/grpc-server.go b/grpc-server.go index 1b44b44f..b810acd8 100644 --- a/grpc-server.go +++ b/grpc-server.go @@ -720,14 +720,12 @@ func (multi *MultiEpoch) StreamTransactions(params *old_faithful_grpc.StreamTran } if err := multi.processSlotTransactions(ctx, ser, slot, params.filter, gsfareader); err != nil { - return err - } + return err + } } return nil } - - func constructTransactionResponse(tx *old_faithful_grpc.Transaction, block *old_faithful_grpc.BlockResponse) *old_faithful_grpc.TransactionResponse { return &old_faithful_grpc.TransactionResponse{ Transaction: tx, @@ -751,17 +749,16 @@ func (multi *MultiEpoch) streamAllTxns(ctx context.Context, ser old_faithful_grp return nil } - func (multi *MultiEpoch) processSlotTransactions(ctx context.Context, ser old_faithful_grpc.OldFaithful_StreamTransactionsServer, slot uint64, filter *old_faithful_grpc.StreamTransactionsFilter, gsfaReader *gsfa.GsfaReaderMultiepoch) error { if filter == nil { return multi.streamAllTxns(ctx, ser, slot) } - if len(filter.AccountInclude) == 0 { + if len(filter.AccountInclude) == 0 { // Get all transactions // Apply filters // Send - } else { + } else { for _, account := range params.Filter.AccountInclude { pKey := solana.MustPublicKeyFromBase58(account) epochToTxns, err := gsfaReader.Get( @@ -823,7 +820,5 @@ func (multi *MultiEpoch) processSlotTransactions(ctx context.Context, ser old_fa } } } -} - -return nil + return nil } From 85b6d2c63eea7ff9442cd34ebb02dfd2b50e0da2 Mon Sep 17 00:00:00 2001 From: anjor Date: Tue, 3 Dec 2024 20:56:43 +0000 Subject: [PATCH 10/15] latest --- grpc-server.go | 43 +++++++++++++++---------------------------- 1 file changed, 15 insertions(+), 28 deletions(-) diff --git a/grpc-server.go b/grpc-server.go index b810acd8..0a3c0e2c 100644 --- a/grpc-server.go +++ b/grpc-server.go @@ -719,47 +719,34 @@ func (multi *MultiEpoch) StreamTransactions(params *old_faithful_grpc.StreamTran default: } - if err := multi.processSlotTransactions(ctx, ser, slot, params.filter, gsfareader); err != nil { + if err := multi.processSlotTransactions(ctx, ser, slot, params.Filter, gsfaReader); err != nil { return err } } return nil } -func constructTransactionResponse(tx *old_faithful_grpc.Transaction, block *old_faithful_grpc.BlockResponse) *old_faithful_grpc.TransactionResponse { - return &old_faithful_grpc.TransactionResponse{ - Transaction: tx, - BlockTime: block.GetBlockTime(), - Slot: block.GetSlot(), - Index: tx.Index, - } -} +func (multi *MultiEpoch) processSlotTransactions( + ctx context.Context, + ser old_faithful_grpc.OldFaithful_StreamTransactionsServer, + slot uint64, filter *old_faithful_grpc.StreamTransactionsFilter, + gsfaReader *gsfa.GsfaReaderMultiepoch, +) error { -func (multi *MultiEpoch) streamAllTxns(ctx context.Context, ser old_faithful_grpc.OldFaithful_StreamTransactionsServer, slot uint64) error { - block, err := multi.GetBlock(ctx, &old_faithful_grpc.BlockRequest{Slot: slot}) - if err != nil && status.Code(err) != codes.NotFound { - return err - } + filterFunc := func(txn *ipldbindcode.Transaction) bool { + // fill this out - for _, tx := range block.Transactions { - if err := ser.Send(constructTransactionResponse(tx, block)); err != nil { - return err - } + return true } - return nil -} -func (multi *MultiEpoch) processSlotTransactions(ctx context.Context, ser old_faithful_grpc.OldFaithful_StreamTransactionsServer, slot uint64, filter *old_faithful_grpc.StreamTransactionsFilter, gsfaReader *gsfa.GsfaReaderMultiepoch) error { - if filter == nil { - return multi.streamAllTxns(ctx, ser, slot) - } + if filter == nil || len(filter.AccountInclude) == 0 { - if len(filter.AccountInclude) == 0 { - // Get all transactions + // get block -> not sure which one to use + // block -> transactions // Apply filters // Send } else { - for _, account := range params.Filter.AccountInclude { + for _, account := range filter.AccountInclude { pKey := solana.MustPublicKeyFromBase58(account) epochToTxns, err := gsfaReader.Get( ctx, @@ -811,7 +798,7 @@ func (multi *MultiEpoch) processSlotTransactions(ctx context.Context, ser old_fa // What to do for blocktime? } - // apply more filters + // not sure how to apply more filters if err := ser.Send(txResp); err != nil { return err From 091d80e4c6ec3cb92e301b6bff45d71ddfa7bc5c Mon Sep 17 00:00:00 2001 From: anjor Date: Mon, 9 Dec 2024 13:48:47 +0000 Subject: [PATCH 11/15] add filters --- grpc-server.go | 82 ++++++++++++++++++++++++++++++++++++++------------ 1 file changed, 62 insertions(+), 20 deletions(-) diff --git a/grpc-server.go b/grpc-server.go index 0a3c0e2c..b55f114c 100644 --- a/grpc-server.go +++ b/grpc-server.go @@ -733,8 +733,45 @@ func (multi *MultiEpoch) processSlotTransactions( gsfaReader *gsfa.GsfaReaderMultiepoch, ) error { - filterFunc := func(txn *ipldbindcode.Transaction) bool { - // fill this out + filterOutTxn := func(tx solana.Transaction, meta any) bool { + if filter == nil { + return true + } + + // add Vote + + if !(*filter.Failed) { // If failed is false, we should filter out failed transactions + err := getErr(meta) + if err != nil { + return false + } + } + + // AccountInclude is handled in the main function + + for _, acc := range filter.AccountExclude { + pkey := solana.MustPublicKeyFromBase58(acc) + ok, err := tx.HasAccount(pkey) + if err != nil { + klog.Errorf("Failed to check if transaction %v has account %s", tx, acc) + return false + } + if ok { // If any excluded account is present, filter out the transaction + return false + } + } + + for _, acc := range filter.AccountRequired { + pkey := solana.MustPublicKeyFromBase58(acc) + ok, err := tx.HasAccount(pkey) + if err != nil { + klog.Errorf("Failed to check if transaction %v has account %s", tx, acc) + return false + } + if !ok { // If any required account is missing, filter out the transaction + return false + } + } return true } @@ -771,7 +808,6 @@ func (multi *MultiEpoch) processSlotTransactions( return decoded, nil }, ) - if err != nil { return err } @@ -782,26 +818,32 @@ func (multi *MultiEpoch) processSlotTransactions( return status.Errorf(codes.NotFound, "Epoch %d is not available", epochNumber) } for _, txn := range txns { - txResp := new(old_faithful_grpc.TransactionResponse) - txResp.Transaction = new(old_faithful_grpc.Transaction) - { - pos, ok := txn.GetPositionIndex() - if ok { - txResp.Index = ptrToUint64(uint64(pos)) - txResp.Transaction.Index = ptrToUint64(uint64(pos)) - } - txResp.Transaction.Transaction, txResp.Transaction.Meta, err = getTransactionAndMetaFromNode(txn, epochHandler.GetDataFrameByCid) - if err != nil { - return status.Errorf(codes.Internal, "Failed to get transaction: %v", err) - } - txResp.Slot = uint64(txn.Slot) - // What to do for blocktime? + tx, meta, err := parseTransactionAndMetaFromNode(txn, epochHandler.GetDataFrameByCid) + if err != nil { + return status.Errorf(codes.Internal, "Failed to parse transaction from node: %v", err) } - // not sure how to apply more filters + if !filterOutTxn(tx, meta) { + + txResp := new(old_faithful_grpc.TransactionResponse) + txResp.Transaction = new(old_faithful_grpc.Transaction) + { + pos, ok := txn.GetPositionIndex() + if ok { + txResp.Index = ptrToUint64(uint64(pos)) + txResp.Transaction.Index = ptrToUint64(uint64(pos)) + } + txResp.Transaction.Transaction, txResp.Transaction.Meta, err = getTransactionAndMetaFromNode(txn, epochHandler.GetDataFrameByCid) + if err != nil { + return status.Errorf(codes.Internal, "Failed to get transaction: %v", err) + } + txResp.Slot = uint64(txn.Slot) + // What to do for blocktime? + } - if err := ser.Send(txResp); err != nil { - return err + if err := ser.Send(txResp); err != nil { + return err + } } } } From 5d7c7961358437378f4ff2f86b87d5e48eefe2f7 Mon Sep 17 00:00:00 2001 From: anjor Date: Tue, 10 Dec 2024 11:34:38 +0000 Subject: [PATCH 12/15] all txns --- grpc-server.go | 44 ++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 40 insertions(+), 4 deletions(-) diff --git a/grpc-server.go b/grpc-server.go index b55f114c..720838ae 100644 --- a/grpc-server.go +++ b/grpc-server.go @@ -13,6 +13,7 @@ import ( "sync" "time" + bin "github.com/gagliardetto/binary" "github.com/gagliardetto/solana-go" "github.com/ipfs/go-cid" "github.com/ipld/go-car/util" @@ -24,6 +25,7 @@ import ( "github.com/rpcpool/yellowstone-faithful/ipld/ipldbindcode" "github.com/rpcpool/yellowstone-faithful/iplddecoders" old_faithful_grpc "github.com/rpcpool/yellowstone-faithful/old-faithful-proto/old-faithful-grpc" + solanatxmetaparsers "github.com/rpcpool/yellowstone-faithful/solana-tx-meta-parsers" "github.com/rpcpool/yellowstone-faithful/tooling" "golang.org/x/sync/errgroup" "google.golang.org/grpc" @@ -778,10 +780,44 @@ func (multi *MultiEpoch) processSlotTransactions( if filter == nil || len(filter.AccountInclude) == 0 { - // get block -> not sure which one to use - // block -> transactions - // Apply filters - // Send + block, err := multi.GetBlock(ctx, &old_faithful_grpc.BlockRequest{Slot: slot}) + if err != nil { + if status.Code(err) == codes.NotFound { + return nil + } + return err + } + + for _, tx := range block.Transactions { + decoder := bin.NewBinDecoder(tx.Transaction) + txn, err := solana.TransactionFromDecoder(decoder) + if err != nil { + return status.Errorf(codes.Internal, "Failed to decode transaction: %v", err) + } + + meta, err := solanatxmetaparsers.ParseAnyTransactionStatusMeta(tx.Meta) + if err != nil { + return status.Errorf(codes.Internal, "Failed to parse transaction meta: %v", err) + } + + if !filterOutTxn(*txn, meta) { + + txResp := new(old_faithful_grpc.TransactionResponse) + txResp.Transaction = new(old_faithful_grpc.Transaction) + + { + txResp.Transaction.Transaction = tx.Transaction + txResp.Transaction.Meta = tx.Meta + + // how to get index + // how to get blocktime + } + + if err := ser.Send(txResp); err != nil { + return err + } + } + } } else { for _, account := range filter.AccountInclude { pKey := solana.MustPublicKeyFromBase58(account) From c56cd24fc042b0ef3d2296598a9b99422090b7fc Mon Sep 17 00:00:00 2001 From: anjor Date: Wed, 11 Dec 2024 09:41:59 +0000 Subject: [PATCH 13/15] check vote --- grpc-server.go | 4 +++- vote.go | 33 +++++++++++++++++++++++++++++++++ 2 files changed, 36 insertions(+), 1 deletion(-) create mode 100644 vote.go diff --git a/grpc-server.go b/grpc-server.go index 720838ae..f75b1200 100644 --- a/grpc-server.go +++ b/grpc-server.go @@ -740,7 +740,9 @@ func (multi *MultiEpoch) processSlotTransactions( return true } - // add Vote + if !(*filter.Vote) && IsSimpleVoteTransaction(&tx) { // If vote is false, we should filter out vote transactions + return false + } if !(*filter.Failed) { // If failed is false, we should filter out failed transactions err := getErr(meta) diff --git a/vote.go b/vote.go new file mode 100644 index 00000000..27692347 --- /dev/null +++ b/vote.go @@ -0,0 +1,33 @@ +package main + +import ( + "github.com/gagliardetto/solana-go" +) + +// IsSimpleVoteTransaction checks if a transaction is a simple vote transaction. +// A simple vote transaction meets these conditions: +// 1. has 1 or 2 signatures +// 2. is legacy message (this is implicit in solana-go as it mainly handles legacy messages) +// 3. has only one instruction +// 4. which must be Vote instruction +func IsSimpleVoteTransaction(tx *solana.Transaction) bool { + // Check signature count (condition 1) + if len(tx.Signatures) == 0 || len(tx.Signatures) > 2 { + return false + } + + // Check instruction count (condition 3) + instructions := tx.Message.Instructions + if len(instructions) != 1 { + return false + } + + // Get the program ID for the instruction + programID := tx.Message.AccountKeys[instructions[0].ProgramIDIndex] + + // Check if it's a Vote instruction (condition 4) + // Note: This is the Vote Program ID on Solana mainnet + voteProgram := solana.VoteProgramID // This is a built-in constant in solana-go + + return programID.Equals(voteProgram) +} From be21252d722aa18fdc88ea5ed760a7906e58a308 Mon Sep 17 00:00:00 2001 From: anjor Date: Wed, 11 Dec 2024 11:17:49 +0000 Subject: [PATCH 14/15] index --- grpc-server.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/grpc-server.go b/grpc-server.go index f75b1200..a98c9669 100644 --- a/grpc-server.go +++ b/grpc-server.go @@ -810,8 +810,7 @@ func (multi *MultiEpoch) processSlotTransactions( { txResp.Transaction.Transaction = tx.Transaction txResp.Transaction.Meta = tx.Meta - - // how to get index + txResp.Transaction.Index = tx.Index // how to get blocktime } From 889b733814029104c4d3b9257a0897dcb85e2995 Mon Sep 17 00:00:00 2001 From: anjor Date: Wed, 11 Dec 2024 14:52:13 +0000 Subject: [PATCH 15/15] slot checking --- grpc-server.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/grpc-server.go b/grpc-server.go index a98c9669..a04c2007 100644 --- a/grpc-server.go +++ b/grpc-server.go @@ -811,7 +811,8 @@ func (multi *MultiEpoch) processSlotTransactions( txResp.Transaction.Transaction = tx.Transaction txResp.Transaction.Meta = tx.Meta txResp.Transaction.Index = tx.Index - // how to get blocktime + + // To do: add blocketime after index work is done } if err := ser.Send(txResp); err != nil { @@ -855,6 +856,9 @@ func (multi *MultiEpoch) processSlotTransactions( return status.Errorf(codes.NotFound, "Epoch %d is not available", epochNumber) } for _, txn := range txns { + if slot != uint64(txn.Slot) { // If the transaction is not in the requested slot, skip + continue + } tx, meta, err := parseTransactionAndMetaFromNode(txn, epochHandler.GetDataFrameByCid) if err != nil { return status.Errorf(codes.Internal, "Failed to parse transaction from node: %v", err) @@ -875,7 +879,8 @@ func (multi *MultiEpoch) processSlotTransactions( return status.Errorf(codes.Internal, "Failed to get transaction: %v", err) } txResp.Slot = uint64(txn.Slot) - // What to do for blocktime? + + // To do: add blocketime after index work is done } if err := ser.Send(txResp); err != nil {