当前位置:网站首页>Go语言实现Etcd服务发现(Etcd & Service Discovery & Go)

Go语言实现Etcd服务发现(Etcd & Service Discovery & Go)

2022-08-11 05:57:00 lizongti

package etcd

client.go

package etcd

import (
	"context"
	"time"

	"go.etcd.io/etcd/api/v3/mvccpb"
	clientv3 "go.etcd.io/etcd/client/v3"
)

type Client struct {
    
	Servers []string
	Timeout int64
	close   func() error
}

func (client *Client) Register(path string, data []byte) error {
    
	conn, err := clientv3.New(clientv3.Config{
    
		Endpoints:   client.Servers,
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
    
		return err
	}

	ctx := context.Background()

	lease, err := conn.Grant(ctx, client.Timeout)
	if err != nil {
    
		return err
	}

	_, err = conn.Put(ctx, path, string(data), clientv3.WithLease(lease.ID))
	if err != nil {
    
		return err
	}

	keepAliveCh, err := conn.KeepAlive(context.Background(), lease.ID)
	if err != nil {
    
		return err
	}

	go func() {
    
		for {
    
			<-keepAliveCh
		}
	}()

	client.close = func() error {
    
		if _, err = conn.Revoke(ctx, lease.ID); err != nil {
    
			return err
		}
		return conn.Close()
	}

	return nil
}

func (client *Client) Deregister() error {
    
	return client.close()
}

func (client *Client) WatchNode(path string,
	putHandler func(string, []byte) error,
	deleteHandler func(string) error) error {
    
	conn, err := clientv3.New(clientv3.Config{
    
		Endpoints:   client.Servers,
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
    
		return err
	}

	ctx := context.Background()
	resp, err := conn.Get(ctx, path, clientv3.WithPrefix())
	if err != nil {
    
		return err
	}

	for _, kv := range resp.Kvs {
    
		if err := putHandler(string(kv.Key), kv.Value); err != nil {
    
			return err
		}
	}

	watchChan := conn.Watch(ctx, path, clientv3.WithPrefix())
	for watchResp := range watchChan {
    
		for _, ev := range watchResp.Events {
    
			switch ev.Type {
    
			case mvccpb.PUT:
				if err := putHandler(string(ev.Kv.Key), ev.Kv.Value); err != nil {
    
					return err
				}
			case mvccpb.DELETE:
				if err := deleteHandler(string(ev.Kv.Key)); err != nil {
    
					return err
				}
			}
		}
	}

	return nil
}


node.go

package etcd

import "log"

type Node struct {
    
	Path string
	Host string
	Port int
}

func (node *Node) putCallback(path string, data []byte) error {
    
	log.Printf("[Callback] Node %s is updated", path)
	// To do
	return nil
}

func (node *Node) deleteCallback(path string) error {
    
	log.Printf("[Callback] Node %s is deleted", path)
	// To do
	return nil
}

func (node *Node) getWatchNodes() []string {
    
	nodes := []string{
    }
	// Test code begins
	nodes = append(nodes, "/services")
	// Test code ends
	return nodes
}

etcd.go

package etcd

import (
	"fmt"
	"log"
	"time"
)

func watchNode(client *Client, node *Node, path string) {
    
	go func() {
    
		for {
    
			err := client.WatchNode(path, node.putCallback, node.deleteCallback)
			if err != nil {
    
				log.Println(err)
			}
			time.Sleep(time.Second)
		}
	}()
}

func registerNode(client *Client, node *Node) {
    
	err := client.Register(node.Path, []byte(fmt.Sprintf("%s:%d", node.Host, node.Port)))
	if err != nil {
    
		log.Println(err)
	}
}

func Init(client *Client, node *Node) {
    
	registerNode(client, node)
	for _, path := range node.getWatchNodes() {
    
		watchNode(client, node, path)
	}
}


package main

main.go

package main

import (
	"log"
	"os"
	"strconv"
	"strings"
	"time"

	"./etcd"
)

func main() {
    
	if len(os.Args) < 4 {
    
		return
	}

	log.Println(os.Args)

	path := "/services/" + os.Args[1]
	host := os.Args[2]
	port, err := strconv.Atoi(os.Args[3])
	if err != nil {
    
		log.Fatalln(err)
	}

	etcdHosts := "0.0.0.0:2379"

	etcd.Init(&etcd.Client{
    Servers: strings.Split(etcdHosts, ","), Timeout: 5}, &etcd.Node{
    Path: path, Host: host, Port: port})
	time.Sleep(time.Second * 100)
}
原网站

版权声明
本文为[lizongti]所创,转载请带上原文链接,感谢
https://blog.csdn.net/lizongti/article/details/126252843