diff --git a/Makefile b/Makefile index c62904a..10e9a12 100644 --- a/Makefile +++ b/Makefile @@ -22,4 +22,4 @@ build-server: protos: rm -rf borepb mkdir -p borepb - protoc --go_out=./borepb protos/*.proto + protoc --go_out=./borepb protos/*.proto -I protos diff --git a/borepb/message.pb.go b/borepb/message.pb.go new file mode 100644 index 0000000..829b14f --- /dev/null +++ b/borepb/message.pb.go @@ -0,0 +1,203 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.36.11 +// protoc v6.33.4 +// source: message.proto + +package __ + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" + unsafe "unsafe" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type Message struct { + state protoimpl.MessageState `protogen:"open.v1"` + MessageId string `protobuf:"bytes,1,opt,name=message_id,json=messageId,proto3" json:"message_id,omitempty"` + // Types that are valid to be assigned to Payload: + // + // *Message_Request + // *Message_Response + // *Message_Socket + Payload isMessage_Payload `protobuf_oneof:"payload"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *Message) Reset() { + *x = Message{} + mi := &file_message_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *Message) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Message) ProtoMessage() {} + +func (x *Message) ProtoReflect() protoreflect.Message { + mi := &file_message_proto_msgTypes[0] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Message.ProtoReflect.Descriptor instead. +func (*Message) Descriptor() ([]byte, []int) { + return file_message_proto_rawDescGZIP(), []int{0} +} + +func (x *Message) GetMessageId() string { + if x != nil { + return x.MessageId + } + return "" +} + +func (x *Message) GetPayload() isMessage_Payload { + if x != nil { + return x.Payload + } + return nil +} + +func (x *Message) GetRequest() *Request { + if x != nil { + if x, ok := x.Payload.(*Message_Request); ok { + return x.Request + } + } + return nil +} + +func (x *Message) GetResponse() *Response { + if x != nil { + if x, ok := x.Payload.(*Message_Response); ok { + return x.Response + } + } + return nil +} + +func (x *Message) GetSocket() *Socket { + if x != nil { + if x, ok := x.Payload.(*Message_Socket); ok { + return x.Socket + } + } + return nil +} + +type isMessage_Payload interface { + isMessage_Payload() +} + +type Message_Request struct { + Request *Request `protobuf:"bytes,2,opt,name=request,proto3,oneof"` +} + +type Message_Response struct { + Response *Response `protobuf:"bytes,3,opt,name=response,proto3,oneof"` +} + +type Message_Socket struct { + Socket *Socket `protobuf:"bytes,4,opt,name=socket,proto3,oneof"` +} + +func (*Message_Request) isMessage_Payload() {} + +func (*Message_Response) isMessage_Payload() {} + +func (*Message_Socket) isMessage_Payload() {} + +var File_message_proto protoreflect.FileDescriptor + +const file_message_proto_rawDesc = "" + + "\n" + + "\rmessage.proto\x12\x06borepb\x1a\rrequest.proto\x1a\x0eresponse.proto\x1a\fsocket.proto\"\xba\x01\n" + + "\aMessage\x12\x1d\n" + + "\n" + + "message_id\x18\x01 \x01(\tR\tmessageId\x12+\n" + + "\arequest\x18\x02 \x01(\v2\x0f.borepb.RequestH\x00R\arequest\x12.\n" + + "\bresponse\x18\x03 \x01(\v2\x10.borepb.ResponseH\x00R\bresponse\x12(\n" + + "\x06socket\x18\x04 \x01(\v2\x0e.borepb.SocketH\x00R\x06socketB\t\n" + + "\apayloadB\x03Z\x01.b\x06proto3" + +var ( + file_message_proto_rawDescOnce sync.Once + file_message_proto_rawDescData []byte +) + +func file_message_proto_rawDescGZIP() []byte { + file_message_proto_rawDescOnce.Do(func() { + file_message_proto_rawDescData = protoimpl.X.CompressGZIP(unsafe.Slice(unsafe.StringData(file_message_proto_rawDesc), len(file_message_proto_rawDesc))) + }) + return file_message_proto_rawDescData +} + +var file_message_proto_msgTypes = make([]protoimpl.MessageInfo, 1) +var file_message_proto_goTypes = []any{ + (*Message)(nil), // 0: borepb.Message + (*Request)(nil), // 1: borepb.Request + (*Response)(nil), // 2: borepb.Response + (*Socket)(nil), // 3: borepb.Socket +} +var file_message_proto_depIdxs = []int32{ + 1, // 0: borepb.Message.request:type_name -> borepb.Request + 2, // 1: borepb.Message.response:type_name -> borepb.Response + 3, // 2: borepb.Message.socket:type_name -> borepb.Socket + 3, // [3:3] is the sub-list for method output_type + 3, // [3:3] is the sub-list for method input_type + 3, // [3:3] is the sub-list for extension type_name + 3, // [3:3] is the sub-list for extension extendee + 0, // [0:3] is the sub-list for field type_name +} + +func init() { file_message_proto_init() } +func file_message_proto_init() { + if File_message_proto != nil { + return + } + file_request_proto_init() + file_response_proto_init() + file_socket_proto_init() + file_message_proto_msgTypes[0].OneofWrappers = []any{ + (*Message_Request)(nil), + (*Message_Response)(nil), + (*Message_Socket)(nil), + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: unsafe.Slice(unsafe.StringData(file_message_proto_rawDesc), len(file_message_proto_rawDesc)), + NumEnums: 0, + NumMessages: 1, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_message_proto_goTypes, + DependencyIndexes: file_message_proto_depIdxs, + MessageInfos: file_message_proto_msgTypes, + }.Build() + File_message_proto = out.File + file_message_proto_goTypes = nil + file_message_proto_depIdxs = nil +} diff --git a/borepb/request.pb.go b/borepb/request.pb.go index e336959..202745f 100644 --- a/borepb/request.pb.go +++ b/borepb/request.pb.go @@ -1,8 +1,8 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.36.11 -// protoc v6.33.2 -// source: protos/request.proto +// protoc v6.33.4 +// source: request.proto package __ @@ -23,20 +23,19 @@ const ( type Request struct { state protoimpl.MessageState `protogen:"open.v1"` - Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` - Method string `protobuf:"bytes,2,opt,name=method,proto3" json:"method,omitempty"` - Path string `protobuf:"bytes,3,opt,name=path,proto3" json:"path,omitempty"` - Headers map[string]string `protobuf:"bytes,4,rep,name=headers,proto3" json:"headers,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"` - Body []byte `protobuf:"bytes,5,opt,name=body,proto3" json:"body,omitempty"` - Timestamp int64 `protobuf:"varint,6,opt,name=timestamp,proto3" json:"timestamp,omitempty"` - Cookies string `protobuf:"bytes,7,opt,name=cookies,proto3" json:"cookies,omitempty"` + Method string `protobuf:"bytes,1,opt,name=method,proto3" json:"method,omitempty"` + Path string `protobuf:"bytes,2,opt,name=path,proto3" json:"path,omitempty"` + Headers map[string]string `protobuf:"bytes,3,rep,name=headers,proto3" json:"headers,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"` + Body []byte `protobuf:"bytes,4,opt,name=body,proto3" json:"body,omitempty"` + Timestamp int64 `protobuf:"varint,5,opt,name=timestamp,proto3" json:"timestamp,omitempty"` + Cookies string `protobuf:"bytes,6,opt,name=cookies,proto3" json:"cookies,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } func (x *Request) Reset() { *x = Request{} - mi := &file_protos_request_proto_msgTypes[0] + mi := &file_request_proto_msgTypes[0] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -48,7 +47,7 @@ func (x *Request) String() string { func (*Request) ProtoMessage() {} func (x *Request) ProtoReflect() protoreflect.Message { - mi := &file_protos_request_proto_msgTypes[0] + mi := &file_request_proto_msgTypes[0] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -61,14 +60,7 @@ func (x *Request) ProtoReflect() protoreflect.Message { // Deprecated: Use Request.ProtoReflect.Descriptor instead. func (*Request) Descriptor() ([]byte, []int) { - return file_protos_request_proto_rawDescGZIP(), []int{0} -} - -func (x *Request) GetId() string { - if x != nil { - return x.Id - } - return "" + return file_request_proto_rawDescGZIP(), []int{0} } func (x *Request) GetMethod() string { @@ -113,41 +105,40 @@ func (x *Request) GetCookies() string { return "" } -var File_protos_request_proto protoreflect.FileDescriptor +var File_request_proto protoreflect.FileDescriptor -const file_protos_request_proto_rawDesc = "" + +const file_request_proto_rawDesc = "" + "\n" + - "\x14protos/request.proto\x12\x06borepb\"\x85\x02\n" + - "\aRequest\x12\x0e\n" + - "\x02id\x18\x01 \x01(\tR\x02id\x12\x16\n" + - "\x06method\x18\x02 \x01(\tR\x06method\x12\x12\n" + - "\x04path\x18\x03 \x01(\tR\x04path\x126\n" + - "\aheaders\x18\x04 \x03(\v2\x1c.borepb.Request.HeadersEntryR\aheaders\x12\x12\n" + - "\x04body\x18\x05 \x01(\fR\x04body\x12\x1c\n" + - "\ttimestamp\x18\x06 \x01(\x03R\ttimestamp\x12\x18\n" + - "\acookies\x18\a \x01(\tR\acookies\x1a:\n" + + "\rrequest.proto\x12\x06borepb\"\xf5\x01\n" + + "\aRequest\x12\x16\n" + + "\x06method\x18\x01 \x01(\tR\x06method\x12\x12\n" + + "\x04path\x18\x02 \x01(\tR\x04path\x126\n" + + "\aheaders\x18\x03 \x03(\v2\x1c.borepb.Request.HeadersEntryR\aheaders\x12\x12\n" + + "\x04body\x18\x04 \x01(\fR\x04body\x12\x1c\n" + + "\ttimestamp\x18\x05 \x01(\x03R\ttimestamp\x12\x18\n" + + "\acookies\x18\x06 \x01(\tR\acookies\x1a:\n" + "\fHeadersEntry\x12\x10\n" + "\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n" + "\x05value\x18\x02 \x01(\tR\x05value:\x028\x01B\x03Z\x01.b\x06proto3" var ( - file_protos_request_proto_rawDescOnce sync.Once - file_protos_request_proto_rawDescData []byte + file_request_proto_rawDescOnce sync.Once + file_request_proto_rawDescData []byte ) -func file_protos_request_proto_rawDescGZIP() []byte { - file_protos_request_proto_rawDescOnce.Do(func() { - file_protos_request_proto_rawDescData = protoimpl.X.CompressGZIP(unsafe.Slice(unsafe.StringData(file_protos_request_proto_rawDesc), len(file_protos_request_proto_rawDesc))) +func file_request_proto_rawDescGZIP() []byte { + file_request_proto_rawDescOnce.Do(func() { + file_request_proto_rawDescData = protoimpl.X.CompressGZIP(unsafe.Slice(unsafe.StringData(file_request_proto_rawDesc), len(file_request_proto_rawDesc))) }) - return file_protos_request_proto_rawDescData + return file_request_proto_rawDescData } -var file_protos_request_proto_msgTypes = make([]protoimpl.MessageInfo, 2) -var file_protos_request_proto_goTypes = []any{ +var file_request_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_request_proto_goTypes = []any{ (*Request)(nil), // 0: borepb.Request nil, // 1: borepb.Request.HeadersEntry } -var file_protos_request_proto_depIdxs = []int32{ +var file_request_proto_depIdxs = []int32{ 1, // 0: borepb.Request.headers:type_name -> borepb.Request.HeadersEntry 1, // [1:1] is the sub-list for method output_type 1, // [1:1] is the sub-list for method input_type @@ -156,26 +147,26 @@ var file_protos_request_proto_depIdxs = []int32{ 0, // [0:1] is the sub-list for field type_name } -func init() { file_protos_request_proto_init() } -func file_protos_request_proto_init() { - if File_protos_request_proto != nil { +func init() { file_request_proto_init() } +func file_request_proto_init() { + if File_request_proto != nil { return } type x struct{} out := protoimpl.TypeBuilder{ File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), - RawDescriptor: unsafe.Slice(unsafe.StringData(file_protos_request_proto_rawDesc), len(file_protos_request_proto_rawDesc)), + RawDescriptor: unsafe.Slice(unsafe.StringData(file_request_proto_rawDesc), len(file_request_proto_rawDesc)), NumEnums: 0, NumMessages: 2, NumExtensions: 0, NumServices: 0, }, - GoTypes: file_protos_request_proto_goTypes, - DependencyIndexes: file_protos_request_proto_depIdxs, - MessageInfos: file_protos_request_proto_msgTypes, + GoTypes: file_request_proto_goTypes, + DependencyIndexes: file_request_proto_depIdxs, + MessageInfos: file_request_proto_msgTypes, }.Build() - File_protos_request_proto = out.File - file_protos_request_proto_goTypes = nil - file_protos_request_proto_depIdxs = nil + File_request_proto = out.File + file_request_proto_goTypes = nil + file_request_proto_depIdxs = nil } diff --git a/borepb/response.pb.go b/borepb/response.pb.go index deb48b7..ede51da 100644 --- a/borepb/response.pb.go +++ b/borepb/response.pb.go @@ -1,8 +1,8 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.36.11 -// protoc v6.33.2 -// source: protos/response.proto +// protoc v6.33.4 +// source: response.proto package __ @@ -23,19 +23,18 @@ const ( type Response struct { state protoimpl.MessageState `protogen:"open.v1"` - Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` - StatusCode int32 `protobuf:"varint,2,opt,name=status_code,json=statusCode,proto3" json:"status_code,omitempty"` - Headers map[string]string `protobuf:"bytes,3,rep,name=headers,proto3" json:"headers,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"` - Body []byte `protobuf:"bytes,4,opt,name=body,proto3" json:"body,omitempty"` - Timestamp int64 `protobuf:"varint,5,opt,name=timestamp,proto3" json:"timestamp,omitempty"` - Cookies string `protobuf:"bytes,6,opt,name=cookies,proto3" json:"cookies,omitempty"` + StatusCode int32 `protobuf:"varint,1,opt,name=status_code,json=statusCode,proto3" json:"status_code,omitempty"` + Headers map[string]string `protobuf:"bytes,2,rep,name=headers,proto3" json:"headers,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"` + Body []byte `protobuf:"bytes,3,opt,name=body,proto3" json:"body,omitempty"` + Timestamp int64 `protobuf:"varint,4,opt,name=timestamp,proto3" json:"timestamp,omitempty"` + Cookies string `protobuf:"bytes,5,opt,name=cookies,proto3" json:"cookies,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } func (x *Response) Reset() { *x = Response{} - mi := &file_protos_response_proto_msgTypes[0] + mi := &file_response_proto_msgTypes[0] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -47,7 +46,7 @@ func (x *Response) String() string { func (*Response) ProtoMessage() {} func (x *Response) ProtoReflect() protoreflect.Message { - mi := &file_protos_response_proto_msgTypes[0] + mi := &file_response_proto_msgTypes[0] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -60,14 +59,7 @@ func (x *Response) ProtoReflect() protoreflect.Message { // Deprecated: Use Response.ProtoReflect.Descriptor instead. func (*Response) Descriptor() ([]byte, []int) { - return file_protos_response_proto_rawDescGZIP(), []int{0} -} - -func (x *Response) GetId() string { - if x != nil { - return x.Id - } - return "" + return file_response_proto_rawDescGZIP(), []int{0} } func (x *Response) GetStatusCode() int32 { @@ -105,41 +97,40 @@ func (x *Response) GetCookies() string { return "" } -var File_protos_response_proto protoreflect.FileDescriptor +var File_response_proto protoreflect.FileDescriptor -const file_protos_response_proto_rawDesc = "" + +const file_response_proto_rawDesc = "" + "\n" + - "\x15protos/response.proto\x12\x06borepb\"\xfc\x01\n" + - "\bResponse\x12\x0e\n" + - "\x02id\x18\x01 \x01(\tR\x02id\x12\x1f\n" + - "\vstatus_code\x18\x02 \x01(\x05R\n" + + "\x0eresponse.proto\x12\x06borepb\"\xec\x01\n" + + "\bResponse\x12\x1f\n" + + "\vstatus_code\x18\x01 \x01(\x05R\n" + "statusCode\x127\n" + - "\aheaders\x18\x03 \x03(\v2\x1d.borepb.Response.HeadersEntryR\aheaders\x12\x12\n" + - "\x04body\x18\x04 \x01(\fR\x04body\x12\x1c\n" + - "\ttimestamp\x18\x05 \x01(\x03R\ttimestamp\x12\x18\n" + - "\acookies\x18\x06 \x01(\tR\acookies\x1a:\n" + + "\aheaders\x18\x02 \x03(\v2\x1d.borepb.Response.HeadersEntryR\aheaders\x12\x12\n" + + "\x04body\x18\x03 \x01(\fR\x04body\x12\x1c\n" + + "\ttimestamp\x18\x04 \x01(\x03R\ttimestamp\x12\x18\n" + + "\acookies\x18\x05 \x01(\tR\acookies\x1a:\n" + "\fHeadersEntry\x12\x10\n" + "\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n" + "\x05value\x18\x02 \x01(\tR\x05value:\x028\x01B\x03Z\x01.b\x06proto3" var ( - file_protos_response_proto_rawDescOnce sync.Once - file_protos_response_proto_rawDescData []byte + file_response_proto_rawDescOnce sync.Once + file_response_proto_rawDescData []byte ) -func file_protos_response_proto_rawDescGZIP() []byte { - file_protos_response_proto_rawDescOnce.Do(func() { - file_protos_response_proto_rawDescData = protoimpl.X.CompressGZIP(unsafe.Slice(unsafe.StringData(file_protos_response_proto_rawDesc), len(file_protos_response_proto_rawDesc))) +func file_response_proto_rawDescGZIP() []byte { + file_response_proto_rawDescOnce.Do(func() { + file_response_proto_rawDescData = protoimpl.X.CompressGZIP(unsafe.Slice(unsafe.StringData(file_response_proto_rawDesc), len(file_response_proto_rawDesc))) }) - return file_protos_response_proto_rawDescData + return file_response_proto_rawDescData } -var file_protos_response_proto_msgTypes = make([]protoimpl.MessageInfo, 2) -var file_protos_response_proto_goTypes = []any{ +var file_response_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_response_proto_goTypes = []any{ (*Response)(nil), // 0: borepb.Response nil, // 1: borepb.Response.HeadersEntry } -var file_protos_response_proto_depIdxs = []int32{ +var file_response_proto_depIdxs = []int32{ 1, // 0: borepb.Response.headers:type_name -> borepb.Response.HeadersEntry 1, // [1:1] is the sub-list for method output_type 1, // [1:1] is the sub-list for method input_type @@ -148,26 +139,26 @@ var file_protos_response_proto_depIdxs = []int32{ 0, // [0:1] is the sub-list for field type_name } -func init() { file_protos_response_proto_init() } -func file_protos_response_proto_init() { - if File_protos_response_proto != nil { +func init() { file_response_proto_init() } +func file_response_proto_init() { + if File_response_proto != nil { return } type x struct{} out := protoimpl.TypeBuilder{ File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), - RawDescriptor: unsafe.Slice(unsafe.StringData(file_protos_response_proto_rawDesc), len(file_protos_response_proto_rawDesc)), + RawDescriptor: unsafe.Slice(unsafe.StringData(file_response_proto_rawDesc), len(file_response_proto_rawDesc)), NumEnums: 0, NumMessages: 2, NumExtensions: 0, NumServices: 0, }, - GoTypes: file_protos_response_proto_goTypes, - DependencyIndexes: file_protos_response_proto_depIdxs, - MessageInfos: file_protos_response_proto_msgTypes, + GoTypes: file_response_proto_goTypes, + DependencyIndexes: file_response_proto_depIdxs, + MessageInfos: file_response_proto_msgTypes, }.Build() - File_protos_response_proto = out.File - file_protos_response_proto_goTypes = nil - file_protos_response_proto_depIdxs = nil + File_response_proto = out.File + file_response_proto_goTypes = nil + file_response_proto_depIdxs = nil } diff --git a/internal/client/client.go b/internal/client/client.go index 4780dec..249ddbc 100644 --- a/internal/client/client.go +++ b/internal/client/client.go @@ -49,7 +49,7 @@ func (bc *BoreClient) NewWSConnection() error { WriteBufferSize: 1024, } - wsConnStr := fmt.Sprintf("%s://%s/ws", WSScheme, BoreServerHost) + wsConnStr := fmt.Sprintf("%s://%s/register", WSScheme, BoreServerHost) bc.logger.Debug("attempting websocket connection", zap.String("url", wsConnStr)) conn, res, err := dialer.Dial(wsConnStr, nil) @@ -94,72 +94,77 @@ func (bc *BoreClient) HandleWSMessages() error { bc.logger.Info("starting to handle websocket messages") for { - _, message, err := bc.wsConn.ReadMessage() + _, mes, err := bc.wsConn.ReadMessage() if err != nil { bc.logger.Error("error reading websocket message", zap.Error(err)) return err } - var request borepb.Request + var message borepb.Message - err = proto.Unmarshal(message, &request) + err = proto.Unmarshal(mes, &message) if err != nil { bc.logger.Error("failed to unmarshal protobuf message", zap.Error(err)) return err } - bc.logger.Debug("received request", zap.String("reqId", request.Id), zap.String("method", request.Method), zap.String("path", request.Path)) - - cookies, _ := http.ParseCookie(request.Cookies) - - ctx := context.WithValue(context.TODO(), traffik.RequestIDKey, request.Id) - - req := bc.resty. - NewRequest(). - SetContext(ctx). - SetMethod(request.Method). - SetURL(request.Path). - SetBody(request.Body). - SetCookies(cookies). - SetHeaders(request.Headers) - - bc.Traffik.LogRequest(req) - - res, err := req.Send() - if err != nil { - bc.logger.Error("failed to send request", zap.String("reqId", request.Id), zap.Error(err)) - return err - } - - bc.logger.Debug("response received", zap.String("reqId", request.Id), zap.Int("statusCode", res.StatusCode())) - bc.Traffik.LogResponse(res) - - response := borepb.Response{ - Id: request.Id, - StatusCode: int32(res.StatusCode()), - Body: res.Bytes(), - Timestamp: res.ReceivedAt().UnixMilli(), - Headers: make(map[string]string), + switch msg := message.Payload.(type) { + case *borepb.Message_Request: + bc.logger.Debug("received request", zap.String("reqId", message.MessageId), zap.String("method", msg.Request.Method), zap.String("path", msg.Request.Path)) + cookies, _ := http.ParseCookie(msg.Request.Cookies) + ctx := context.WithValue(context.TODO(), traffik.RequestIDKey, message.MessageId) + req := bc.resty. + NewRequest(). + SetContext(ctx). + SetMethod(msg.Request.Method). + SetURL(msg.Request.Path). + SetBody(msg.Request.Body). + SetCookies(cookies). + SetHeaders(msg.Request.Headers) + bc.Traffik.LogRequest(req) + + res, err := req.Send() + if err != nil { + bc.logger.Error("failed to send request", zap.String("reqId", message.MessageId), zap.Error(err)) + return err + } + + bc.logger.Debug("response received", zap.String("reqId", message.MessageId), zap.Int("statusCode", res.StatusCode())) + bc.Traffik.LogResponse(res) + + response := borepb.Response{ + StatusCode: int32(res.StatusCode()), + Body: res.Bytes(), + Timestamp: res.ReceivedAt().UnixMilli(), + Headers: make(map[string]string), + } + + for headerName, headerValues := range res.Header() { + response.Headers[headerName] = strings.Join(headerValues, ",") + } + + responseMessage := borepb.Message{ + MessageId: message.MessageId, + Payload: &borepb.Message_Response{Response: &response}, + } + + resBytes, err := proto.Marshal(&responseMessage) + if err != nil { + bc.logger.Error("failed to marshal response", zap.String("reqId", responseMessage.MessageId), zap.Error(err)) + return err + } + + bc.wsMutex.Lock() + err = bc.wsConn.WriteMessage(websocket.BinaryMessage, resBytes) + bc.wsMutex.Unlock() + if err != nil { + bc.logger.Error("failed to write response to websocket", zap.String("reqId", responseMessage.MessageId), zap.Error(err)) + return err + } + + bc.logger.Debug("response sent", zap.String("reqId", responseMessage.MessageId)) } - for headerName, headerValues := range res.Header() { - response.Headers[headerName] = strings.Join(headerValues, ",") - } - - resBytes, err := proto.Marshal(&response) - if err != nil { - bc.logger.Error("failed to marshal response", zap.String("reqId", request.Id), zap.Error(err)) - return err - } - - bc.wsMutex.Lock() - err = bc.wsConn.WriteMessage(websocket.BinaryMessage, resBytes) - bc.wsMutex.Unlock() - if err != nil { - bc.logger.Error("failed to write response to websocket", zap.String("reqId", request.Id), zap.Error(err)) - return err - } - bc.logger.Debug("response sent", zap.String("reqId", request.Id)) } } diff --git a/internal/server/app/app.go b/internal/server/app/app.go new file mode 100644 index 0000000..01b6d48 --- /dev/null +++ b/internal/server/app/app.go @@ -0,0 +1,190 @@ +package appregistry + +import ( + borepb "bore/borepb" + "context" + "fmt" + "sync" + "time" + + "github.com/gorilla/websocket" + "go.uber.org/zap" + "google.golang.org/protobuf/proto" +) + +type AppConfig struct { + AppId string + DownstreamWSConn *websocket.Conn + Logger *zap.Logger +} + +type App struct { + AppId string + WsConn *websocket.Conn // WS conn to bore client (downstream) + WsMutex *sync.RWMutex // Mutex to synchronize writes to WS conn + fromUpstreamChan chan *borepb.Message // Channel to receive messages from upstream (nginx) + fromDownstreamChan chan *borepb.Message // Channel to receive messages from downstream (bore client) + logger *zap.Logger + appRegistry *AppRegistry + shutdownCtx context.Context + shutdown context.CancelFunc + once sync.Once +} + +func (app *App) handleMessagesFromUpstream() <-chan error { + errCh := make(chan error, 1) + + for message := range app.fromUpstreamChan { + mes, err := proto.Marshal(message) + if err != nil { + errCh <- fmt.Errorf("error marshalling message: %w", err) + return errCh + } + + switch message.Payload.(type) { + case *borepb.Message_Request: + app.WsMutex.Lock() + err = app.WsConn.WriteMessage(websocket.BinaryMessage, mes) + app.WsMutex.Unlock() + if err != nil { + errCh <- fmt.Errorf("error writing message to downstream WS connection: %w", err) + return errCh + } + } + } + + app.logger.Warn("fromUpstreamChan closed, stopping handleMessagesFromUpstream goroutine") + + return nil +} + +func (app *App) handleMessagesFromDownstream() <-chan error { + errCh := make(chan error, 1) + + for { + var message borepb.Message + + _, mes, err := app.WsConn.ReadMessage() + if err != nil { + errCh <- fmt.Errorf("error reading message from downstream WS connection: %w", err) + return errCh + } + + err = proto.Unmarshal(mes, &message) + if err != nil { + errCh <- fmt.Errorf("error unmarshalling message from downstream WS connection: %w", err) + return errCh + } + + app.fromDownstreamChan <- &message + } +} + +func (app *App) keepDownstreamAlive() <-chan error { + errCh := make(chan error, 1) + + pingInterval := time.Duration(10 * time.Second) + ticker := time.NewTicker(pingInterval) + + defer ticker.Stop() + + for range ticker.C { + app.WsMutex.Lock() + err := app.WsConn.WriteControl(websocket.PingMessage, nil, time.Now().Add(5*time.Second)) + app.WsMutex.Unlock() + + if err != nil { + errCh <- fmt.Errorf("error sending ping message: %w", err) + return errCh + } + } + + return errCh +} + +func (app *App) WriteMessageToDownStream(message *borepb.Message) { + app.fromUpstreamChan <- message +} + +func (app *App) ReadMessagesFromDownstream() <-chan *borepb.Message { + return app.fromDownstreamChan +} + +func (app *App) Done() <-chan struct{} { + return app.shutdownCtx.Done() +} + +func (app *App) Destroy() { + app.once.Do(func() { + app.WsMutex.Lock() + defer app.WsMutex.Unlock() + + app.shutdown() + + close(app.fromUpstreamChan) + close(app.fromDownstreamChan) + + app.fromUpstreamChan = nil + app.fromDownstreamChan = nil + + if app.WsConn != nil { + app.WsConn.Close() + } + + app.appRegistry.DeleteApp(app.AppId) + }) +} + +func newApp(appCfg AppConfig) (*App, error) { + appRegistry := NewAppRegistry() + shutdownCtx, shutdown := context.WithCancel(context.Background()) + + app := App{ + AppId: appCfg.AppId, + WsConn: appCfg.DownstreamWSConn, + WsMutex: &sync.RWMutex{}, + logger: appCfg.Logger, + fromUpstreamChan: make(chan *borepb.Message, 10), + fromDownstreamChan: make(chan *borepb.Message, 10), + appRegistry: appRegistry, + shutdownCtx: shutdownCtx, + shutdown: shutdown, + } + + err := appRegistry.addApp(&app) + if err != nil { + return nil, err + } + + go func() { + select { + case err := <-app.keepDownstreamAlive(): + app.logger.Error("error in keepDownstreamAlive goroutine", zap.Error(err)) + app.Destroy() + case <-app.shutdownCtx.Done(): + app.logger.Error("shutdown signal received, stopping keepDownstreamAlive goroutine") + } + }() + + go func() { + select { + case err := <-app.handleMessagesFromDownstream(): + app.logger.Error("error in handleMessagesFromDownstream goroutine", zap.Error(err)) + app.Destroy() + case <-app.shutdownCtx.Done(): + app.logger.Error("shutdown signal received, stopping handleMessagesFromDownstream goroutine") + } + }() + + go func() { + select { + case err := <-app.handleMessagesFromUpstream(): + app.logger.Error("error in handleMessagesFromUpstream goroutine", zap.Error(err)) + app.Destroy() + case <-app.shutdownCtx.Done(): + app.logger.Error("shutdown signal received, stopping handleMessagesFromUpstream goroutine") + } + }() + + return &app, nil +} diff --git a/internal/server/app/appRegistry.go b/internal/server/app/appRegistry.go new file mode 100644 index 0000000..3a3412c --- /dev/null +++ b/internal/server/app/appRegistry.go @@ -0,0 +1,68 @@ +package appregistry + +import ( + "fmt" + "sync" + + haikunator "github.com/atrox/haikunatorgo/v2" +) + +type AppRegistry struct { + haikunator *haikunator.Haikunator + apps map[string]*App + mutex *sync.RWMutex +} + +var once sync.Once +var appRegistry *AppRegistry + +func (r *AppRegistry) addApp(app *App) error { + r.mutex.Lock() + defer r.mutex.Unlock() + + if _, ok := r.apps[app.AppId]; ok { + return fmt.Errorf("app with id %s already exists", app.AppId) + } + + r.apps[app.AppId] = app + return nil +} + +func (r *AppRegistry) GetApp(appId string) (*App, bool) { + r.mutex.RLock() + defer r.mutex.RUnlock() + app, ok := r.apps[appId] + + return app, ok +} + +func (r *AppRegistry) DeleteApp(appId string) { + r.mutex.Lock() + defer r.mutex.Unlock() + delete(r.apps, appId) +} + +func (r *AppRegistry) NewAppId() string { + return r.haikunator.Haikunate() +} + +func (r *AppRegistry) NewApp(appCfg AppConfig) (*App, error) { + return newApp(appCfg) +} + +func NewAppRegistry() *AppRegistry { + once.Do(func() { + h := haikunator.New() + h.TokenLength = 5 + h.TokenChars = "abcdefghijklmnopqrstuvwxyz0123456789" + + appRegistry = &AppRegistry{ + apps: make(map[string]*App), + mutex: &sync.RWMutex{}, + haikunator: h, + } + + }) + + return appRegistry +} diff --git a/internal/server/server.go b/internal/server/server.go index 8c49971..b42ef6f 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -3,6 +3,7 @@ package server import ( borepb "bore/borepb" "bore/internal/logger" + appregistry "bore/internal/server/app" "fmt" "io" "net" @@ -12,27 +13,20 @@ import ( "sync" "time" - haikunator "github.com/atrox/haikunatorgo/v2" "github.com/go-chi/chi/v5" "github.com/google/uuid" "github.com/gorilla/websocket" "go.uber.org/zap" - "google.golang.org/protobuf/proto" ) const maxRetries int = 10 -type App struct { - wsConn *websocket.Conn - wsMutex *sync.Mutex -} - type BoreServer struct { - logger *zap.Logger - reqIdChanMap map[string]chan *borepb.Response - apps map[string]App - haikunator *haikunator.Haikunator - port int + logger *zap.Logger + port int + mu sync.Mutex + messageSubscriptions map[string]chan *borepb.Message + appRegistry *appregistry.AppRegistry } type BoreServerCfg struct { @@ -41,76 +35,64 @@ type BoreServerCfg struct { Version string } -func (bs *BoreServer) generateAppId() string { - return bs.haikunator.Haikunate() -} - -func (bs *BoreServer) handleApp(appId string) { - defer func() { - delete(bs.apps, appId) - bs.logger.Info("cleaned up resources for app", zap.String("app_id", appId)) - }() - - app, ok := bs.apps[appId] - if !ok { - bs.logger.Error("No App found!") - return - } - - if app.wsConn == nil { - bs.logger.Info("no wsConn for app", zap.String("app_id", appId)) - return - } - - go bs.ping(&app) - - for { - response := &borepb.Response{} +func (bs *BoreServer) newSubCh(messageId string) chan *borepb.Message { + ch := make(chan *borepb.Message, 1) + bs.mu.Lock() + bs.messageSubscriptions[messageId] = ch + bs.mu.Unlock() - _, res, err := app.wsConn.ReadMessage() + return ch +} - if websocket.IsUnexpectedCloseError(err) { - bs.logger.Info("ws conn closed unexpectedly", zap.Error(err)) - return - } +func (bs *BoreServer) GetSubCh(messageId string) (chan *borepb.Message, bool) { + bs.mu.Lock() + ch, ok := bs.messageSubscriptions[messageId] + bs.mu.Unlock() - if err != nil { - bs.logger.Error("failed to read response from bore client", zap.Error(err)) - return - } + return ch, ok +} - err = proto.Unmarshal(res, response) - if err != nil { - bs.logger.Error("Failed to unmarshal response", zap.Error(err)) - return - } +func (bs *BoreServer) RegisterApp(appId string, conn *websocket.Conn) (*appregistry.App, error) { + logger := bs.logger.With(zap.String("app_id", appId)) - bs.reqIdChanMap[response.Id] <- response + appCfg := appregistry.AppConfig{ + AppId: appId, + Logger: logger, + DownstreamWSConn: conn, } -} -func (bs *BoreServer) ping(app *App) { - pingInterval := time.Duration(10 * time.Second) - ticker := time.NewTicker(pingInterval) + app, err := bs.appRegistry.NewApp(appCfg) + if err != nil { + return nil, err + } - defer ticker.Stop() + go func() { + for { + select { + case msg := <-app.ReadMessagesFromDownstream(): + msgId := msg.GetMessageId() + if subChan, ok := bs.GetSubCh(msgId); ok { + subChan <- msg + close(subChan) + } else { + logger.Warn("no subscription channel found for message", zap.String("message_id", msgId)) + } + case <-app.Done(): + logger.Info("app shutdown signal received, stopping ReadMessagesFromDownstream goroutine") + return + } + } + }() - for range ticker.C { - app.wsMutex.Lock() - err := app.wsConn.WriteControl(websocket.PingMessage, nil, time.Now().Add(5*time.Second)) - app.wsMutex.Unlock() + logger.Info("registered app!", zap.String("app_id", appId)) - if err != nil { - bs.logger.Error("failed to send ping!", zap.Error(err)) - return - } - } + return app, nil } func (bs *BoreServer) StartBoreServer() error { router := chi.NewRouter() - router.Get("/ws", func(w http.ResponseWriter, r *http.Request) { + router.Get("/register", func(w http.ResponseWriter, r *http.Request) { clientIP := r.Header.Get("X-Real-IP") bs.logger.Info("new bore client connection request", zap.String("client_ip", clientIP)) @@ -119,7 +101,7 @@ func (bs *BoreServer) StartBoreServer() error { WriteBufferSize: 1024, } - appId := bs.generateAppId() + appId := bs.appRegistry.NewAppId() conn, err := upgrader.Upgrade(w, r, http.Header{ "X-Bore-App-ID": {appId}, @@ -133,42 +115,37 @@ func (bs *BoreServer) StartBoreServer() error { bs.logger.Info("connection upgraded to WS", zap.String("client_ip", clientIP)) - bs.apps[appId] = App{ - wsConn: conn, - wsMutex: &sync.Mutex{}, + if _, err = bs.RegisterApp(appId, conn); err != nil { + bs.logger.Error("failed to register app", zap.Error(err), zap.String("app_id", appId)) + http.Error(w, "Failed to register app", http.StatusInternalServerError) + return } - bs.logger.Info("registered app!", zap.String("app_id", appId)) - - go bs.handleApp(appId) }) router.HandleFunc("/*", func(w http.ResponseWriter, r *http.Request) { - requestId := uuid.New().String() + messageId := uuid.New().String() appId := strings.Split(r.Host, ".")[0] clientIP := r.Header.Get("X-Real-IP") defer func() { - delete(bs.reqIdChanMap, requestId) - bs.logger.Info("cleaned up resources for request", zap.String("req_id", requestId)) + delete(bs.messageSubscriptions, messageId) }() reqLogger := bs.logger.With( - zap.String("req_id", requestId), + zap.String("req_id", messageId), zap.String("client_ip", clientIP), zap.String("app_id", appId), ) reqLogger.Info("new incoming request", zap.String("method", r.Method), zap.String("host", r.Host), zap.String("path", r.URL.Path)) - app, ok := bs.apps[appId] + app, ok := bs.appRegistry.GetApp(appId) if !ok { reqLogger.Error("No app found!") http.Error(w, "No app found!", http.StatusBadRequest) return } - bs.reqIdChanMap[requestId] = make(chan *borepb.Response) - hopByHopHeaders := []string{ "Connection", "Keep-Alive", @@ -204,8 +181,7 @@ func (bs *BoreServer) StartBoreServer() error { reqLogger.Debug("finished parsing headers", zap.Any("headers", headersParsed)) - req := &borepb.Request{ - Id: requestId, + request := borepb.Request{ Method: r.Method, Path: r.RequestURI, Body: bodyBytes, @@ -214,21 +190,17 @@ func (bs *BoreServer) StartBoreServer() error { Timestamp: time.Now().UnixMilli(), } - reqBytes, err := proto.Marshal(req) - if err != nil { - reqLogger.Error("failed to marshal request", zap.Error(err)) - return + message := borepb.Message{ + MessageId: messageId, + Payload: &borepb.Message_Request{Request: &request}, } - app.wsMutex.Lock() - err = app.wsConn.WriteMessage(websocket.BinaryMessage, reqBytes) - app.wsMutex.Unlock() - if err != nil { - reqLogger.Error("failed to write reqBytes to ws", zap.Error(err)) - return - } + subCh := bs.newSubCh(messageId) - response := <-bs.reqIdChanMap[requestId] + app.WriteMessageToDownStream(&message) + res := <-subCh + + response := res.GetResponse() reqLogger.Info("received response", zap.Int32("status_code", response.StatusCode), zap.Any("headers", response.Headers)) for headerName, headerValues := range response.Headers { @@ -272,15 +244,11 @@ func NewBoreServer(boreCfg *BoreServerCfg) *BoreServer { panic(err) } - h := haikunator.New() - h.TokenLength = 5 - h.TokenChars = "abcdefghijklmnopqrstuvwxyz0123456789" - return &BoreServer{ - reqIdChanMap: make(map[string]chan *borepb.Response), - apps: make(map[string]App), - haikunator: h, - logger: logger, - port: boreCfg.Port, + logger: logger, + port: boreCfg.Port, + mu: sync.Mutex{}, + messageSubscriptions: make(map[string]chan *borepb.Message), + appRegistry: appregistry.NewAppRegistry(), } } diff --git a/protos/message.proto b/protos/message.proto new file mode 100644 index 0000000..5f92764 --- /dev/null +++ b/protos/message.proto @@ -0,0 +1,18 @@ +syntax = "proto3"; + +import "request.proto"; +import "response.proto"; +import "socket.proto"; + +package borepb; +option go_package = "."; + + +message Message { + string message_id = 1; + oneof payload { + Request request = 2; + Response response = 3; + Socket socket = 4; + } +} diff --git a/protos/request.proto b/protos/request.proto index e7e59a2..ea151c3 100644 --- a/protos/request.proto +++ b/protos/request.proto @@ -4,11 +4,10 @@ package borepb; option go_package = "."; message Request { - string id = 1; - string method = 2; - string path = 3; - map headers = 4; - bytes body = 5; - int64 timestamp = 6; - string cookies = 7; + string method = 1; + string path = 2; + map headers = 3; + bytes body = 4; + int64 timestamp = 5; + string cookies = 6; } diff --git a/protos/response.proto b/protos/response.proto index c8eb308..baa0779 100644 --- a/protos/response.proto +++ b/protos/response.proto @@ -4,10 +4,9 @@ package borepb; option go_package = "."; message Response { - string id = 1; - int32 status_code = 2; - map headers = 3; - bytes body = 4; - int64 timestamp = 5; - string cookies = 6; + int32 status_code = 1; + map headers = 2; + bytes body = 3; + int64 timestamp = 4; + string cookies = 5; } diff --git a/protos/socket.proto b/protos/socket.proto new file mode 100644 index 0000000..eefdb43 --- /dev/null +++ b/protos/socket.proto @@ -0,0 +1,8 @@ +syntax = "proto3"; + +package borepb; +option go_package = "."; + +message Socket { + bytes data = 1; +}