当前位置 : 主页 > 网络编程 > 其它编程 >

go微服务框架gomicro架构学习(源码分析)

来源:互联网 收集:自由互联 发布时间:2023-07-02
go微服务框架go-micro架构学习(源码分析),Go语言社区,Golang程序员人脉社 产品嘴里的一个小项目,从立项到开发上线,随着时间和需求的不断激增,会越来越复杂,变成一个大项目,如果前
go微服务框架go-micro架构学习(源码分析),Go语言社区,Golang程序员人脉社

产品嘴里的一个小项目,从立项到开发上线,随着时间和需求的不断激增,会越来越复杂,变成一个大项目,如果前期项目架构没设计的不好,代码会越来越臃肿,难以维护,后期的每次产品迭代上线都会牵一发而动全身。项目微服务化,松耦合模块间的关系,是一个很好的选择,随然增加了维护成本,但是还是很值得的。 

微服务化项目除了稳定性我个人还比较关心的几个问题:

     一: 服务间数据传输的效率和安全性。

     二: 服务的动态扩充,也就是服务的注册和发现,服务集群化。

     三: 微服务功能的可订制化,因为并不是所有的功能都会很符合你的需求,难免需要根据自己的需要二次开发一些功能。

go-micro是go语言下的一个很好的rpc微服务框架,功能很完善,而且我关心的几个问题也解决的很好:

     一:服务间传输格式为protobuf,效率上没的说,非常的快,也很安全。

     二:go-micro的服务注册和发现是多种多样的。我个人比较喜欢etcdv3的服务服务发现和注册。

     三:主要的功能都有相应的接口,只要实现相应的接口,就可以根据自己的需要订制插件。

     业余时间把go-micro的源码系统地读了一遍,越读越感觉这个框架写的好,从中也学到了很多东西。就想整理一系列的帖子,把学习go-micro的心得和大家分享。

通信流程

go-micro的通信流程大至如下:

  Server监听客户端的调用,和Brocker推送过来的信息进行处理。并且Server端需要向Register注册自己的存在或消亡,这样Client才能知道自己的状态。

    Register服务的注册的发现。

    Client端从Register中得到Server的信息,然后每次调用都根据算法选择一个的Server进行通信,当然通信是要经过编码/解码,选择传输协议等一系列过程的。

    如果有需要通知所有的Server端可以使用Brocker进行信息的推送。

    Brocker 信息队列进行信息的接收和发布。

     go-micro之所以可以高度订制和他的框架结构是分不开的,go-micro由8个关键的interface组成,每一个interface都可以根据自己的需求重新实现,这8个主要的inteface也构成了go-micro的框架结构。

这些接口go-micir都有他自己默认的实现方式,还有一个go-plugins是对这些接口实现的可替换项。你也可以根据需求实现自己的插件。

    

通过 go-plugins 可以设置其他服务发现,如mdns, etcd,etcdv3,zookeeper,kubernetes.等等。

#部分代码import "github.com/micro/go-plugins/registry/etcdv3"// 我这里用的etcd 做为服务发现,如果使用consul可以去掉 etcdv3.NewRegistry()//etcd.NewRegistry()//mdns.NewMDNSService()//zookeeper.NewRegistry()//kubernetes.NewRegistry()service := micro.NewService(micro.Name("greeter"),micro.Version("latest"),micro.Metadata(map[string]string{"type": "hello world"}))service.Init()

这篇帖子主要是给大家介绍go-micro的主体结构和这些接口的功能,具体细节以后的文章我们再慢慢说。

Transort

服务之间通信的接口。也就是服务发送和接收的最终实现方式,是由这些接口定制的。

源码:

// Package transport is an interface for synchronous communicationpackage transportimport ("time")type Message struct {Header map[string]stringBody []byte}type Socket interface {Recv(*Message) errorSend(*Message) errorClose() errorLocal() stringRemote() string}type Client interface {Socket}type Listener interface {Addr() stringClose() errorAccept(func(Socket)) error}// Transport is an interface which is used for communication between// services. It uses socket send/recv semantics and had various// implementations {HTTP, RabbitMQ, NATS, ...}type Transport interface {Init(...Option) errorOptions() OptionsDial(addr string, opts ...DialOption) (Client, error)Listen(addr string, opts ...ListenOption) (Listener, error)String() string}type Option func(*Options)type DialOption func(*DialOptions)type ListenOption func(*ListenOptions)var (DefaultTransport Transport = newHTTPTransport()DefaultDialTimeout = time.Second * 5)func NewTransport(opts ...Option) Transport {return newHTTPTransport(opts...)}

 Transport 的Listen方法是一般是Server端进行调用的,他监听一个端口,等待客户端调用。

    Transport 的Dial就是客户端进行连接服务的方法。他返回一个Client接口,这个接口返回一个Client接口,这个Client嵌入了Socket接口,这个接口的方法就是具体发送和接收通信的信息。

    http传输是go-micro默认的同步通信机制。当然还有很多其他的插件:grpc,nats,tcp,udp,rabbitmq,nats,都是目前已经实现了的方式。在go-plugins里你都可以找到。

Codec

 有了传输方式,下面要解决的就是传输编码和解码问题,go-micro有很多种编码解码方式,默认的实现方式是protobuf,当然也有其他的实现方式,json、protobuf、jsonrpc、mercury等等。

// Package codec is an interface for encoding messagespackage codecimport ("io")const (Error MessageType = iotaRequestResponsePublication)type MessageType int// Takes in a connection/buffer and returns a new Codectype NewCodec func(io.ReadWriteCloser) Codec// Codec encodes/decodes various types of messages used within go-micro.// ReadHeader and ReadBody are called in pairs to read requests/responses// from the connection. Close is called when finished with the// connection. ReadBody may be called with a nil argument to force the// body to be read and discarded.type Codec interface {ReaderWriterClose() errorString() string}type Reader interface {ReadHeader(*Message, MessageType) errorReadBody(interface{}) error}type Writer interface {Write(*Message, interface{}) error}// Marshaler is a simple encoding interface used for the broker/transport// where headers are not supported by the underlying implementation.type Marshaler interface {Marshal(interface{}) ([]byte, error)Unmarshal([]byte, interface{}) errorString() string}// Message represents detailed information about// the communication, likely followed by the body.// In the case of an error, body may be nil.type Message struct {Id stringType MessageTypeTarget stringMethod stringEndpoint stringError string// The values read from the socketHeader map[string]stringBody []byte}

 Codec接口的Write方法就是编码过程,两个Read是解码过程。

Registry

 服务的注册和发现,目前实现的consul,mdns, etcd,etcdv3,zookeeper,kubernetes.等等,

// Package registry is an interface for service discoverypackage registryimport ("errors")// The registry provides an interface for service discovery// and an abstraction over varying implementations// {consul, etcd, zookeeper, ...}type Registry interface {Init(...Option) errorOptions() OptionsRegister(*Service, ...RegisterOption) errorDeregister(*Service) errorGetService(string) ([]*Service, error)ListServices() ([]*Service, error)Watch(...WatchOption) (Watcher, error)String() string}type Option func(*Options)type RegisterOption func(*RegisterOptions)type WatchOption func(*WatchOptions)var (DefaultRegistry = NewRegistry()// Not found error when GetService is calledErrNotFound = errors.New("not found")// Watcher stopped error when watcher is stoppedErrWatcherStopped = errors.New("watcher stopped"))// Register a service node. Additionally supply options such as TTL.func Register(s *Service, opts ...RegisterOption) error {return DefaultRegistry.Register(s, opts...)}// Deregister a service nodefunc Deregister(s *Service) error {return DefaultRegistry.Deregister(s)}// Retrieve a service. A slice is returned since we separate Name/Version.func GetService(name string) ([]*Service, error) {return DefaultRegistry.GetService(name)}// List the services. Only returns service namesfunc ListServices() ([]*Service, error) {return DefaultRegistry.ListServices()}// Watch returns a watcher which allows you to track updates to the registry.func Watch(opts ...WatchOption) (Watcher, error) {return DefaultRegistry.Watch(opts...)}func String() string {return DefaultRegistry.String()}

 简单来说,就是Service 进行Register,来进行注册,Client 使用watch方法进行监控,当有服务加入或者删除时这个方法会被触发,以提醒客户端更新Service信息。

Selector

  以Registry为基础,Selector 是客户端级别的负载均衡,当有客户端向服务发送请求时, selector根据不同的算法从Registery中的主机列表,得到可用的Service节点,进行通信。目前实现的有循环算法和随机算法,默认的是随机算法。

// Package selector is a way to pick a list of service nodespackage selectorimport ("errors""github.com/micro/go-micro/registry")// Selector builds on the registry as a mechanism to pick nodes// and mark their status. This allows host pools and other things// to be built using various algorithms.type Selector interface {Init(opts ...Option) errorOptions() Options// Select returns a function which should return the next nodeSelect(service string, opts ...SelectOption) (Next, error)// Mark sets the success/error against a nodeMark(service string, node *registry.Node, err error)// Reset returns state back to zero for a serviceReset(service string)// Close renders the selector unusableClose() error// Name of the selectorString() string}// Next is a function that returns the next node// based on the selector's strategytype Next func() (*registry.Node, error)// Filter is used to filter a service during the selection processtype Filter func([]*registry.Service) []*registry.Service// Strategy is a selection strategy e.g random, round robintype Strategy func([]*registry.Service) Nextvar (DefaultSelector = NewSelector()ErrNotFound = errors.New("not found")ErrNOneAvailable= errors.New("none available"))

 默认的是实现是本地缓存,当前实现的有blacklist,label,named等方式。

 Broker

Broker是消息发布和订阅的接口。很简单的一个例子,因为服务的节点是不固定的,如果有需要修改所有服务行为的需求,可以使服务订阅某个主题,当有信息发布时,所有的监听服务都会收到信息,根据你的需要做相应的行为。

// Package broker is an interface used for asynchronous messagingpackage broker// Broker is an interface used for asynchronous messaging.type Broker interface {Options() OptionsAddress() stringConnect() errorDisconnect() errorInit(...Option) errorPublish(string, *Message, ...PublishOption) errorSubscribe(string, Handler, ...SubscribeOption) (Subscriber, error)String() string}// Handler is used to process messages via a subscription of a topic.// The handler is passed a publication interface which contains the// message and optional Ack method to acknowledge receipt of the message.type Handler func(Publication) errortype Message struct {Header map[string]stringBody []byte}// Publication is given to a subscription handler for processingtype Publication interface {Topic() stringMessage() *MessageAck() error}// Subscriber is a convenience return type for the Subscribe methodtype Subscriber interface {Options() SubscribeOptionsTopic() stringUnsubscribe() error}var (DefaultBroker Broker = newHttpBroker())func NewBroker(opts ...Option) Broker {return newHttpBroker(opts...)}func Init(opts ...Option) error {return DefaultBroker.Init(opts...)}func Connect() error {return DefaultBroker.Connect()}func Disconnect() error {return DefaultBroker.Disconnect()}func Publish(topic string, msg *Message, opts ...PublishOption) error {return DefaultBroker.Publish(topic, msg, opts...)}func Subscribe(topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error) {return DefaultBroker.Subscribe(topic, handler, opts...)}func String() string {return DefaultBroker.String()}

 Broker默认的实现方式是http方式,但是这种方式不要在生产环境用。go-plugins里有很多成熟的消息队列实现方式,有kafka、nsq、rabbitmq、redis,等等。

 Client

Client是请求服务的接口,他封装Transport和Codec进行rpc调用,也封装了Brocker进行信息的发布。

// Package client is an interface for an RPC clientpackage clientimport ("context""time""github.com/micro/go-micro/codec")// Client is the interface used to make requests to services.// It supports Request/Response via Transport and Publishing via the Broker.// It also supports bidiectional streaming of requests.type Client interface {Init(...Option) errorOptions() OptionsNewMessage(topic string, msg interface{}, opts ...MessageOption) MessageNewRequest(service, endpoint string, req interface{}, reqOpts ...RequestOption) RequestCall(ctx context.Context, req Request, rsp interface{}, opts ...CallOption) errorStream(ctx context.Context, req Request, opts ...CallOption) (Stream, error)Publish(ctx context.Context, msg Message, opts ...PublishOption) errorString() string}// Router manages request routingtype Router interface {SendRequest(context.Context, Request) (Response, error)}// Message is the interface for publishing asynchronouslytype Message interface {Topic() stringPayload() interface{}ContentType() string}// Request is the interface for a synchronous request used by Call or Streamtype Request interface {// The service to callService() string// The action to takeMethod() string// The endpoint to invokeEndpoint() string// The content typeContentType() string// The unencoded request bodyBody() interface{}// Write to the encoded request writer. This is nil before a call is madeCodec() codec.Writer// indicates whether the request will be a streaming one rather than unaryStream() bool}// Response is the response received from a servicetype Response interface {// Read the responseCodec() codec.Reader// read the headerHeader() map[string]string// Read the undecoded responseRead() ([]byte, error)}// Stream is the inteface for a bidirectional synchronous streamtype Stream interface {// Context for the streamContext() context.Context// The request madeRequest() Request// The response readResponse() Response// Send will encode and send a requestSend(interface{}) error// Recv will decode and read a responseRecv(interface{}) error// Error returns the stream errorError() error// Close closes the streamClose() error}// Option used by the Clienttype Option func(*Options)// CallOption used by Call or Streamtype CallOption func(*CallOptions)// PublishOption used by Publishtype PublishOption func(*PublishOptions)// MessageOption used by NewMessagetype MessageOption func(*MessageOptions)// RequestOption used by NewRequesttype RequestOption func(*RequestOptions)var (// DefaultClient is a default client to use out of the boxDefaultClient Client = newRpcClient()// DefaultBackoff is the default backoff function for retriesDefaultBackoff = exponentialBackoff// DefaultRetry is the default check-for-retry function for retriesDefaultRetry = RetryOnError// DefaultRetries is the default number of times a request is triedDefaultRetries = 1// DefaultRequestTimeout is the default request timeoutDefaultRequestTimeout = time.Second * 5// DefaultPoolSize sets the connection pool sizeDefaultPoolSize = 100// DefaultPoolTTL sets the connection pool ttlDefaultPoolTTL = time.Minute)// Makes a synchronous call to a service using the default clientfunc Call(ctx context.Context, request Request, response interface{}, opts ...CallOption) error {return DefaultClient.Call(ctx, request, response, opts...)}// Publishes a publication using the default client. Using the underlying broker// set within the options.func Publish(ctx context.Context, msg Message, opts ...PublishOption) error {return DefaultClient.Publish(ctx, msg, opts...)}// Creates a new message using the default clientfunc NewMessage(topic string, payload interface{}, opts ...MessageOption) Message {return DefaultClient.NewMessage(topic, payload, opts...)}// Creates a new client with the options passed infunc NewClient(opt ...Option) Client {return newRpcClient(opt...)}// Creates a new request using the default client. Content Type will// be set to the default within options and use the appropriate codecfunc NewRequest(service, endpoint string, request interface{}, reqOpts ...RequestOption) Request {return DefaultClient.NewRequest(service, endpoint, request, reqOpts...)}// Creates a streaming connection with a service and returns responses on the// channel passed in. It's up to the user to close the streamer.func NewStream(ctx context.Context, request Request, opts ...CallOption) (Stream, error) {return DefaultClient.Stream(ctx, request, opts...)}func String() string {return DefaultClient.String()}

 当然他也支持双工通信 Stream 这些具体的实现方式和使用方式,以后会详细解说。

     默认的是rpc实现方式,他还有grpc和http方式,在go-plugins里可以找到

Server

Server看名字大家也知道是做什么的了。监听等待rpc请求。监听broker的订阅信息,等待信息队列的推送等。

// Package server is an interface for a micro serverpackage serverimport ("context""os""os/signal""syscall""github.com/google/uuid""github.com/micro/go-log""github.com/micro/go-micro/codec""github.com/micro/go-micro/registry")// Server is a simple micro server abstractiontype Server interface {Options() OptionsInit(...Option) errorHandle(Handler) errorNewHandler(interface{}, ...HandlerOption) HandlerNewSubscriber(string, interface{}, ...SubscriberOption) SubscriberSubscribe(Subscriber) errorStart() errorStop() errorString() string}// Router handle serving messagestype Router interface {// ServeRequest processes a request to completionServeRequest(context.Context, Request, Response) error}// Message is an async message interfacetype Message interface {Topic() stringPayload() interface{}ContentType() string}// Request is a synchronous request interfacetype Request interface {// Service name requestedService() string// The action requestedMethod() string// Endpoint name requestedEndpoint() string// Content type providedContentType() string// Header of the requestHeader() map[string]string// Body is the initial decoded valueBody() interface{}// Read the undecoded request bodyRead() ([]byte, error)// The encoded message streamCodec() codec.Reader// Indicates whether its a streamStream() bool}// Response is the response writer for unencoded messagestype Response interface {// Encoded writerCodec() codec.Writer// Write the headerWriteHeader(map[string]string)// write a response directly to the clientWrite([]byte) error}// Stream represents a stream established with a client.// A stream can be bidirectional which is indicated by the request.// The last error will be left in Error().// EOF indicates end of the stream.type Stream interface {Context() context.ContextRequest() RequestSend(interface{}) errorRecv(interface{}) errorError() errorClose() error}// Handler interface represents a request handler. It's generated// by passing any type of public concrete object with endpoints into server.NewHandler.// Most will pass in a struct.//// Example://// type Greeter struct {}//// func (g *Greeter) Hello(context, request, response) error {// return nil// }//type Handler interface {Name() stringHandler() interface{}Endpoints() []*registry.EndpointOptions() HandlerOptions}// Subscriber interface represents a subscription to a given topic using// a specific subscriber function or object with endpoints.type Subscriber interface {Topic() stringSubscriber() interface{}Endpoints() []*registry.EndpointOptions() SubscriberOptions}type Option func(*Options)var (DefaultAddress = ":0"DefaultName = "server"DefaultVersion = "latest"DefaultId = uuid.New().String()DefaultServer Server = newRpcServer()DefaultRouter = newRpcRouter())// DefaultOptions returns config options for the default servicefunc DefaultOptions() Options {return DefaultServer.Options()}// Init initialises the default server with options passed infunc Init(opt ...Option) {if DefaultServer == nil {DefaultServer = newRpcServer(opt...)}DefaultServer.Init(opt...)}// NewServer returns a new server with options passed infunc NewServer(opt ...Option) Server {return newRpcServer(opt...)}// NewSubscriber creates a new subscriber interface with the given topic// and handler using the default serverfunc NewSubscriber(topic string, h interface{}, opts ...SubscriberOption) Subscriber {return DefaultServer.NewSubscriber(topic, h, opts...)}// NewHandler creates a new handler interface using the default server// Handlers are required to be a public object with public// endpoints. Call to a service endpoint such as Foo.Bar expects// the type:////type Foo struct {}//func (f *Foo) Bar(ctx, req, rsp) error {//return nil//}//func NewHandler(h interface{}, opts ...HandlerOption) Handler {return DefaultServer.NewHandler(h, opts...)}// Handle registers a handler interface with the default server to// handle inbound requestsfunc Handle(h Handler) error {return DefaultServer.Handle(h)}// Subscribe registers a subscriber interface with the default server// which subscribes to specified topic with the brokerfunc Subscribe(s Subscriber) error {return DefaultServer.Subscribe(s)}// Run starts the default server and waits for a kill// signal before exiting. Also registers/deregisters the serverfunc Run() error {if err := Start(); err != nil {return err}ch := make(chan os.Signal, 1)signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL)log.Logf("Received signal %s", <-ch)return Stop()}// Start starts the default serverfunc Start() error {config := DefaultServer.Options()log.Logf("Starting server %s id %s", config.Name, config.Id)return DefaultServer.Start()}// Stop stops the default serverfunc Stop() error {log.Logf("Stopping server")return DefaultServer.Stop()}// String returns name of Server implementationfunc String() string {return DefaultServer.String()}

参考链接:https://yq.aliyun.com/articles/633797

➢了解更多Go语言知识:https://study.163.com/course/introduction/1210620804.htm

上一篇:杭电acm2037(今年暑假不ac)
下一篇:没有了
网友评论