引子
如今很多云原生系统、分布式系统,例如 Kubernetes,都是用 Go 语言写的,这是因为 Go 语言天然支持异步编程,而且静态语言能保证应用系统的稳定性。笔者的开源项目 Crawlab 作为爬虫管理平台,也应用到了分布式系统。本篇文章将介绍如何用 Go 语言编写一个简单的分布式系统。
思路
在开始写代码之前,我们先思考一下需要实现些什么。
- 主节点(Master Node):中控系统,相当于军队中的指挥官,派发任务命令
- 工作节点(Worker Node):执行者,相当于军队中的士兵,执行任务
除了上面的概念以外,我们需要实现一些简单功能。
- 上报运行状态(Report Status):工作节点向主节点上报当前状态
- 分派任务(Assign Task):通过 API 向主节点发起请求,主节点再向工作节点分派任务
- 运行脚本(Execute Script):工作节点执行任务中的脚本
整个流程示意图如下。
实战
节点通信
节点之间的通信在分布式系统中非常重要,毕竟每个节点或机器如果孤立运行,就失去了分布式系统的意义。因此,节点通信在分布式系统中是核心模块。
gRPC 协议
首先,我们来想一下,如何让节点之间进行相互通信。最常用的通信方式就是 API,不过这个通信方式有个缺点,就是需要将各个节点的 IP 地址及端口显示暴露给其他节点,这在公网中是不太安全的。因此,我们选择了 gRPC,一种流行的远程过程调用(Remote Procedure Call,RPC)框架。这里我们不过多的解释 RPC 或 gRPC 的原理,简而言之,就是能让调用者在远程机器上执行命令的协议方式。
为了使用 gRPC 框架,我们先创建 go.mod
并输入以下内容,并执行 go mod download
。注意:对于国内的朋友,或许需要添加代理才能正常下载,可以先执行 export GOPROXY=goproxy.cn,direct
后再执行下载命令。
module go-distributed-system go 1.17 require ( github.com/golang/protobuf v1.5.0 google.golang.org/grpc v1.27.0 google.golang.org/protobuf v1.27.1 )
然后,我们创建 Protocol Buffers 文件 node.proto
(表示节点对应的 gRPC 协议文件),并输入以下内容。
syntax = "proto3"; package core; option go_package = ".;core"; message Request { string action = 1; } message Response { string data = 1; } service NodeService { rpc ReportStatus(Request) returns (Response){}; // Simple RPC rpc AssignTask(Request) returns (stream Response){}; // Server-Side RPC }
在这里我们创建了两个 RPC 服务,分别是负责上报状态的 Simple RPC ReportStatus
以及 Server-Side RPC AssignTask
。Simple RPC 和 Server-Side RPC 的区别如下图所示,主要区别在于 Server-Side RPC 可以从通过流(Stream)向客户端(Client)主动发送数据,而 Simple RPC 只能从客户端向服务端(Server)发请求。
创建好 .proto
文件后,我们需要将这个 gRPC 协议文件转化为 .go
代码文件,从而能被 Go 程序引用。在命令行窗口中执行如下命令。注意:编译工具 protoc
不是自带的,需要单独下载,具体可以参考文档 https://grpc.io/docs/protoc-installation/。
mkdir core protoc --go_out=./core \ --go-grpc_out=./core \ node.proto
执行完后,可以在 core
目录下看到两个 Go 代码文件, node.pb.go
和 node_grpc.pb.go
,这相当于 Go 程序中对应的 gRPC 库。
gRPC 服务端
现在开始编写服务端逻辑。
咱们先创建一个新文件 core/node_service_server.go
,输入以下内容。主要逻辑就是实现了之前创建好的 gRPC 协议中的两个调用方法。其中,暴露了 CmdChannel
这个通道(Channel)来获取需要发送到工作节点的命令。
package core import ( "context" ) type NodeServiceGrpcServer struct { UnimplementedNodeServiceServer // channel to receive command CmdChannel chan string } func (n NodeServiceGrpcServer) ReportStatus(ctx context.Context, request *Request) (*Response, error) { return &Response{Data: "ok"}, nil } func (n NodeServiceGrpcServer) AssignTask(request *Request, server NodeService_AssignTaskServer) error { for { select { case cmd := <-n.CmdChannel: // receive command and send to worker node (client) if err := server.Send(&Response{Data: cmd}); err != nil { return err } } } } var server *NodeServiceGrpcServer // GetNodeServiceGrpcServer singleton service func GetNodeServiceGrpcServer() *NodeServiceGrpcServer { if server == nil { server = &NodeServiceGrpcServer{ CmdChannel: make(chan string), } } return server }
gRPC 客户端
gRPC 客户端不需要具体实现,我们通常只需要调用 gRPC 客户端的方法,程序会自动发起向服务端的请求以及获取后续的响应。
主节点
编写好了节点通信的基础部分,现在我们需要实现主节点了,这是整个中心化分布式系统的核心。
咱们创建一个新的文件 node.go
,输入以下内容。
package core import ( "github.com/gin-gonic/gin" "google.golang.org/grpc" "net" "net/http" ) // MasterNode is the node instance type MasterNode struct { api *gin.Engine // api server ln net.Listener // listener svr *grpc.Server // grpc server nodeSvr *NodeServiceGrpcServer // node service } func (n *MasterNode) Init() (err error) { // TODO: implement me panic("implement me") } func (n *MasterNode) Start() { // TODO: implement me panic("implement me") } var node *MasterNode // GetMasterNode returns the node instance func GetMasterNode() *MasterNode { if node == nil { // node node = &MasterNode{} // initialize node if err := node.Init(); err != nil { panic(err) } } return node }
其中,我们创建了两个占位方法 Init
和 Start
,我们分别实现。
在初始化方法 Init
中,我们需要做几件事情:
- 注册 gRPC 服务
- 注册 API 服务
现在,在 Init
方法中加入如下代码。
func (n *MasterNode) Init() (err error) { // grpc server listener with port as 50051 n.ln, err = net.Listen("tcp", ":50051") if err != nil { return err } // grpc server n.svr = grpc.NewServer() // node service n.nodeSvr = GetNodeServiceGrpcServer() // register node service to grpc server RegisterNodeServiceServer(node.svr, n.nodeSvr) // api n.api = gin.Default() n.api.POST("/tasks", func(c *gin.Context) { // parse payload var payload struct { Cmd string `json:"cmd"` } if err := c.ShouldBindJSON(&payload); err != nil { c.AbortWithStatus(http.StatusBadRequest) return } // send command to node service n.nodeSvr.CmdChannel <- payload.Cmd c.AbortWithStatus(http.StatusOK) }) return nil }
可以看到,我们新建了一个 gRPC Server,并将之前的 NodeServiceGrpcServer
注册了进去。另外,我们还用 gin
框架创建了一个简单的 API 服务,可以 POST 请求到 /tasks
向 NodeServiceGrpcServer
中的命令通道 CmdChannel
传送命令。这样就将各个部件串接起来了!
启动方法 Start
很简单,就是启动 gRPC Server 以及 API Server。
func (n *MasterNode) Start() { // start grpc server go n.svr.Serve(n.ln) // start api server _ = n.api.Run(":9092") // wait for exit n.svr.Stop() }
下一步,我们就要实现实际做任务的工作节点了。
工作节点
现在,我们创建一个新文件 core/worker_node.go
,输入以下内容。
package core import ( "context" "google.golang.org/grpc" "os/exec" ) type WorkerNode struct { conn *grpc.ClientConn // grpc client connection c NodeServiceClient // grpc client } func (n *WorkerNode) Init() (err error) { // connect to master node n.conn, err = grpc.Dial("localhost:50051", grpc.WithInsecure()) if err != nil { return err } // grpc client n.c = NewNodeServiceClient(n.conn) return nil } func (n *WorkerNode) Start() { // log fmt.Println("worker node started") // report status _, _ = n.c.ReportStatus(context.Background(), &Request{}) // assign task stream, _ := n.c.AssignTask(context.Background(), &Request{}) for { // receive command from master node res, err := stream.Recv() if err != nil { return } // log command fmt.Println("received command: ", res.Data) // execute command parts := strings.Split(res.Data, " ") if err := exec.Command(parts[0], parts[1:]...).Run(); err != nil { fmt.Println(err) } } } var workerNode *WorkerNode func GetWorkerNode() *WorkerNode { if workerNode == nil { // node workerNode = &WorkerNode{} // initialize node if err := workerNode.Init(); err != nil { panic(err) } } return workerNode }
其中,我们在初始化方法 Init
中创建了gRPC 客户端,并连接了主节点的 gRPC 服务端。
在启动方法 Start
中做了几件事情:
- 调用上报状态(Report Status)的 Simple RPC 方法
- 调用分配任务(Assign Task)的 Server-Side RPC 方法,获取到了流(Stream)
- 通过循环不断接受流传输过来的来自服务端(也就是主节点)的信息,并执行命令
这样,整个包含主节点、工作节点的分布式系统核心逻辑就写好了!
将它们放在一起
最后,我们需要将这些核心逻辑用命令行工具封装一下,以便启用。
创建主程序文件 main.go
,并输入以下内容。
package main import ( "go-distributed-system/core" "os" ) func main() { nodeType := os.Args[0] switch nodeType { case "master": core.GetMasterNode().Start() case "worker": core.GetWorkerNode().Start() default: panic("invalid node type") } }
这样,整个简单的分布式系统就创建好了!
代码效果
下面我们来运行一下代码。
打开两个命令行窗口,其中一个输入 go run main.go master
启动主节点,另一个输入 go run main.go worker
启动工作节点。
如果主节点启动成功,将会看到如下日志信息。
[GIN-debug] [WARNING] Creating an Engine instance with the Logger and Recovery middleware already attached. [GIN-debug] [WARNING] Running in "debug" mode. Switch to "release" mode in production. - using env: export GIN_MODE=release - using code: gin.SetMode(gin.ReleaseMode) [GIN-debug] POST /tasks --> go-distributed-system/core.(*MasterNode).Init.func1 (3 handlers) [GIN-debug] [WARNING] You trusted all proxies, this is NOT safe. We recommend you to set a value. Please check https://pkg.go.dev/github.com/gin-gonic/gin#readme-don-t-trust-all-proxies for details. [GIN-debug] Listening and serving HTTP on :9092
如果工作节点启动成功,将会看到如下日志信息。
worker node started
主节点、工作节点都启动成功后,我们在另外一个命令行窗口中输入如下命令来发起 API 请求。
curl -X POST \
-H "Content-Type: application/json" \
-d '{"cmd": "touch /tmp/hello-distributed-system"}' \
http://localhost:9092/tasks
在工作节点窗口应该可以看到日志 received command: touch /tmp/hello-distributed-system
。
然后查看文件是否顺利生成,执行 ls -l /tmp/hello-distributed-system
。
-rw-r--r-- 1 marvzhang wheel 0B Oct 26 12:22 /tmp/hello-distributed-system
文件成功生成,表示已经通过工作节点执行成功了!大功告成!
总结
本篇文章通过 RPC 框架 gRPC 以及 Go 语言自带的 Channel,将节点串接起来,开发出了一个简单的分布式系统。所用到的核心库和技术:
整个代码示例仓库在 GitHub 上: https://github.com/tikazyq/codao-code/tree/main/2022-10/go-distributed-system
到此这篇关于Go语言实战之实现一个简单分布式系统的文章就介绍到这了,更多相关Go语言分布式系统内容请搜索好代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持好代码网!