当前位置:网站首页>Go语言实现Etcd服务发现(Etcd & Service Discovery & Go)
Go语言实现Etcd服务发现(Etcd & Service Discovery & Go)
2022-08-11 05:57:00 【lizongti】
Go语言实现Etcd服务发现
package etcd
client.go
package etcd
import (
"context"
"time"
"go.etcd.io/etcd/api/v3/mvccpb"
clientv3 "go.etcd.io/etcd/client/v3"
)
type Client struct {
Servers []string
Timeout int64
close func() error
}
func (client *Client) Register(path string, data []byte) error {
conn, err := clientv3.New(clientv3.Config{
Endpoints: client.Servers,
DialTimeout: 5 * time.Second,
})
if err != nil {
return err
}
ctx := context.Background()
lease, err := conn.Grant(ctx, client.Timeout)
if err != nil {
return err
}
_, err = conn.Put(ctx, path, string(data), clientv3.WithLease(lease.ID))
if err != nil {
return err
}
keepAliveCh, err := conn.KeepAlive(context.Background(), lease.ID)
if err != nil {
return err
}
go func() {
for {
<-keepAliveCh
}
}()
client.close = func() error {
if _, err = conn.Revoke(ctx, lease.ID); err != nil {
return err
}
return conn.Close()
}
return nil
}
func (client *Client) Deregister() error {
return client.close()
}
func (client *Client) WatchNode(path string,
putHandler func(string, []byte) error,
deleteHandler func(string) error) error {
conn, err := clientv3.New(clientv3.Config{
Endpoints: client.Servers,
DialTimeout: 5 * time.Second,
})
if err != nil {
return err
}
ctx := context.Background()
resp, err := conn.Get(ctx, path, clientv3.WithPrefix())
if err != nil {
return err
}
for _, kv := range resp.Kvs {
if err := putHandler(string(kv.Key), kv.Value); err != nil {
return err
}
}
watchChan := conn.Watch(ctx, path, clientv3.WithPrefix())
for watchResp := range watchChan {
for _, ev := range watchResp.Events {
switch ev.Type {
case mvccpb.PUT:
if err := putHandler(string(ev.Kv.Key), ev.Kv.Value); err != nil {
return err
}
case mvccpb.DELETE:
if err := deleteHandler(string(ev.Kv.Key)); err != nil {
return err
}
}
}
}
return nil
}
node.go
package etcd
import "log"
type Node struct {
Path string
Host string
Port int
}
func (node *Node) putCallback(path string, data []byte) error {
log.Printf("[Callback] Node %s is updated", path)
// To do
return nil
}
func (node *Node) deleteCallback(path string) error {
log.Printf("[Callback] Node %s is deleted", path)
// To do
return nil
}
func (node *Node) getWatchNodes() []string {
nodes := []string{
}
// Test code begins
nodes = append(nodes, "/services")
// Test code ends
return nodes
}
etcd.go
package etcd
import (
"fmt"
"log"
"time"
)
func watchNode(client *Client, node *Node, path string) {
go func() {
for {
err := client.WatchNode(path, node.putCallback, node.deleteCallback)
if err != nil {
log.Println(err)
}
time.Sleep(time.Second)
}
}()
}
func registerNode(client *Client, node *Node) {
err := client.Register(node.Path, []byte(fmt.Sprintf("%s:%d", node.Host, node.Port)))
if err != nil {
log.Println(err)
}
}
func Init(client *Client, node *Node) {
registerNode(client, node)
for _, path := range node.getWatchNodes() {
watchNode(client, node, path)
}
}
package main
main.go
package main
import (
"log"
"os"
"strconv"
"strings"
"time"
"./etcd"
)
func main() {
if len(os.Args) < 4 {
return
}
log.Println(os.Args)
path := "/services/" + os.Args[1]
host := os.Args[2]
port, err := strconv.Atoi(os.Args[3])
if err != nil {
log.Fatalln(err)
}
etcdHosts := "0.0.0.0:2379"
etcd.Init(&etcd.Client{
Servers: strings.Split(etcdHosts, ","), Timeout: 5}, &etcd.Node{
Path: path, Host: host, Port: port})
time.Sleep(time.Second * 100)
}
边栏推荐
- 那些事情是用Unity开发项目应该一开始规划好的?如何避免后期酿成巨坑?
- jar服务导致cpu飙升问题-带解决方法
- 详解BLEU的原理和计算
- HCIP OSPF dynamic routing protocol
- LabelEncoder和LabelBinarizer的区别
- Daily sql - judgment + aggregation
- Concurrent programming in eight-part essay
- 技能在赛题解析:交换机防环路设置
- 抖音分享口令url API工具
- Amazon Get AMAZON Product Details API Return Value Description
猜你喜欢

获取拼多多商品信息操作详情

下一代 无线局域网--强健性

Daily sql: request for friend application pass rate

A used in the study of EEG ultra scanning analysis process

radix-4 FFT principle and C language code implementation

Discourse 的关闭主题(Close Topic )和重新开放主题

什么是Inductive learning和Transductive learning

HCIP OSPF/MGRE Comprehensive Experiment

拼多多API接口(附上我的可用API)

每日sql-求2016年成功的投资总和
随机推荐
Douyin API interface
Redis源码-String:Redis String命令、Redis String存储原理、Redis字符串三种编码类型、Redis String SDS源码解析、Redis String应用场景
实现通用的、高性能排序和快排优化
《Show, Attend and Tell: Neural Image Caption Generation with Visual Attention》论文阅读(详细)
图的拉普拉斯矩阵
My meeting of the OA project (meeting seating & review)
【深度学习】什么是互信息最大化?
《Show and Tell: A Neural Image Caption Generator》论文解读
淘宝商品详情API接口
daily sql - user retention rate for two days
Redis测试
JVM学习——3——数据一致性
Taobao API common interface and acquisition method
OA项目之项目简介&会议发布
Unity3D 学习路线?
那些事情是用Unity开发项目应该一开始规划好的?如何避免后期酿成巨坑?
docker安装mysql5.7(仅供测试使用)
每日sql-员工奖金过滤和回答率排序第一
下一代 无线局域网--强健性
[损失函数]——均方差