zhub

package module
v0.0.0-...-e6e6429 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 23, 2025 License: MIT Imports: 14 Imported by: 0

README

ZHub Go Client

一个独立的 ZHub 客户端组件,支持消息发布订阅、RPC 调用、分布式锁等功能。

功能特性

  • ✅ 消息发布订阅 (Pub/Sub)
  • ✅ RPC 调用支持
  • ✅ 分布式锁机制
  • ✅ 定时器支持
  • ✅ 自动重连
  • ✅ 灵活的配置管理
  • ✅ 环境变量支持

安装

从私有仓库安装
# 设置私有模块
go env -w GOPRIVATE=gitea.1216.top/lxy/*

# 安装包
go get gitea.1216.top/lxy/zhub-go-client
配置 Gitea 私有仓库访问
# 方法1: 使用 SSH 密钥(推荐)
git config --global url."[email protected]:".insteadOf "https://gitea.1216.top/"

# 方法2: 使用用户名密码或 Token
git config --global url."https://用户名:密码@gitea.1216.top/".insteadOf "https://gitea.1216.top/"

# 方法3: 使用 .netrc 文件
# 创建 ~/.netrc 文件并添加认证信息

快速开始

1. 配置文件初始化
package main

import (
    "log"
    zhub "gitea.1216.top/lxy/zhub-go-client"
)

func main() {
    // 使用项目配置文件初始化
    err := zhub.InitWithProjectConfig("app.yml")
    if err != nil {
        log.Fatal("Failed to initialize zhub:", err)
    }

    // 发布消息
    zhub.Publish("test-topic", "Hello World!")
    
    // 订阅消息
    zhub.Subscribe("test-topic", func(message string) {
        log.Printf("Received: %s", message)
    })
}
2. 配置文件格式
# app.yml
zhub:
  appname: "my-app"
  addr: "127.0.0.1:1216"
  groupid: "my-group"
  auth: "my-token"

# 其他项目配置...
web:
  addr: "0.0.0.0:8080"
3. 多种初始化方式
// 方式1: 指定配置文件路径
zhub.InitWithProjectConfig("config/app.yml")

// 方式2: 自动发现配置文件
zhub.InitFromCurrentDir()

// 方式3: 使用环境变量
zhub.InitFromEnv() // 需要设置 ZHUB_CONFIG_PATH

// 方式4: 自定义配置选项
opts := &zhub.ConfigOptions{
    ConfigPath: "./config/my-app.yml",
    ConfigKey:  "zhub",
    EnvPrefix:  "ZHUB",
}
zhub.InitWithOptions(opts)

API 使用

消息发布订阅
// 发布消息
zhub.Publish("topic", "message")

// 广播消息
zhub.Broadcast("topic", "message")

// 延迟消息
zhub.Delay("topic", "message", 30) // 30秒后发送

// 订阅消息
zhub.Subscribe("topic", func(message string) {
    // 处理消息
})

// 取消订阅
zhub.Unsubscribe("topic")
RPC 调用
// RPC 客户端
zhub.CallRpc("rpc-service", map[string]interface{}{
    "action": "getUser",
    "id":     123,
}, func(result zhub.RpcResult) {
    if result.Retcode == 0 {
        log.Printf("Success: %v", result.Result)
    } else {
        log.Printf("Error: %s", result.Retinfo)
    }
})

// RPC 服务端
zhub.RpcSubscribe("rpc-service", func(rpc zhub.Rpc) zhub.RpcResult {
    // 处理 RPC 请求
    return rpc.Render(map[string]interface{}{
        "user": map[string]interface{}{
            "id":   123,
            "name": "John",
        },
    })
})
分布式锁
// 获取锁
lock := zhub.AcquireLock("resource-key", 30) // 30秒超时

// 执行业务逻辑
log.Println("Doing critical work...")
time.Sleep(time.Second * 2)

// 释放锁
zhub.ReleaseLock(lock)
定时器
// 设置定时器
zhub.Timer("my-timer", func() {
    log.Println("Timer triggered!")
})

环境变量支持

# 设置配置文件路径
export ZHUB_CONFIG_PATH="/path/to/config.yml"

# 或者直接设置配置值
export ZHUB_APPNAME="my-app"
export ZHUB_ADDR="127.0.0.1:1216"
export ZHUB_GROUPID="my-group"
export ZHUB_AUTH="my-token"

错误处理

// 检查初始化错误
err := zhub.InitWithProjectConfig("config.yml")
if err != nil {
    log.Printf("初始化失败: %v", err)
    return
}

// 检查操作错误
err = zhub.Publish("topic", "message")
if err != nil {
    log.Printf("发布失败: %v", err)
}

测试

# 运行测试
go test ./...

# 运行示例
cd example
go run main.go

许可证

MIT License

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Broadcast

func Broadcast(topic, message string) error

func CallRpc

func CallRpc(topic string, message interface{}, callback func(RpcResult))

func ClientRun

func ClientRun(addr string)

ClientRun client 命令行程序

func Close

func Close()

func Cmd

func Cmd(cmd ...string)

func Delay

func Delay(topic, message string, delay int) error

func InitFromCurrentDir

func InitFromCurrentDir() error

InitFromCurrentDir 从当前目录的配置文件初始化

func InitFromEnv

func InitFromEnv() error

InitFromEnv 从环境变量指定的配置文件初始化

func InitWithOptions

func InitWithOptions(opts *ConfigOptions) error

InitWithOptions 使用选项初始化默认客户端

func InitWithProjectConfig

func InitWithProjectConfig(projectConfigPath string) error

InitWithProjectConfig 使用项目配置文件初始化

func InitWithWorkingDir

func InitWithWorkingDir(configName string) error

InitWithWorkingDir 使用工作目录下的配置文件

func Publish

func Publish(topic, message string) error

全局便捷方法

func ReleaseLock

func ReleaseLock(lock Lock)

func RpcSubscribe

func RpcSubscribe(topic string, handler func(Rpc) RpcResult)

func RpcWithTimeout

func RpcWithTimeout(topic string, message interface{}, timeout time.Duration, callback func(RpcResult))

func Subscribe

func Subscribe(topic string, callback func(string))

func Timer

func Timer(topic string, callback func())

func ToJSON

func ToJSON(message interface{}) (string, error)

ToJSON 将任意类型转换为JSON字符串

func Unsubscribe

func Unsubscribe(topic string)

Types

type Client

type Client struct {
	*ZHubClient
	// contains filtered or unexported fields
}

Client 客户端包装器

func DefaultClient

func DefaultClient() *Client

DefaultClient 获取默认客户端实例

func GetClient

func GetClient() *Client

GetClient 获取客户端实例(如果未初始化则使用默认配置)

func NewClient

func NewClient(config *Config) (*Client, error)

NewClient 创建新的客户端实例

type Config

type Config struct {
	Appname string `mapstructure:"appname"`
	Addr    string `mapstructure:"addr"`
	Groupid string `mapstructure:"groupid"`
	Auth    string `mapstructure:"auth"`
}

Config zhub客户端配置

func LoadConfig

func LoadConfig(configPath string) (*Config, error)

LoadConfig 默认配置加载(向后兼容)

func LoadConfigFromProject

func LoadConfigFromProject(projectConfigPath string) (*Config, error)

LoadConfigFromProject 从项目配置文件加载(推荐方式)

func LoadConfigWithOptions

func LoadConfigWithOptions(opts *ConfigOptions) (*Config, error)

LoadConfigWithOptions 使用选项加载配置

type ConfigOptions

type ConfigOptions struct {
	ConfigPath string // 配置文件路径
	ConfigName string // 配置文件名 (默认: "app")
	ConfigType string // 配置文件类型 (默认: "yml")
	ConfigKey  string // 配置节点名 (默认: "zhub")
	EnvPrefix  string // 环境变量前缀 (可选)
}

ConfigOptions 配置加载选项

type Lock

type Lock struct {
	Key   string // lock Key
	Value string // lock Value
	// contains filtered or unexported fields
}

Lock 分布式锁

func AcquireLock

func AcquireLock(key string, duration int) Lock

type Rpc

type Rpc struct {
	Ruk   string `json:"ruk"`
	Topic string `json:"topic"`
	Value string `json:"value"`

	Ch        chan int  `json:"-"` //请求返回标记
	RpcResult RpcResult `json:"-"`
}

Rpc RPC 请求结构

func (Rpc) Err

func (r Rpc) Err(err error) RpcResult

func (Rpc) Render

func (r Rpc) Render(result any) RpcResult

type RpcResult

type RpcResult struct {
	Ruk     string `json:"ruk"`
	Retcode int    `json:"retcode"`
	Retinfo string `json:"retinfo"`
	Result  any    `json:"result"`
}

RpcResult RPC 返回结果

func (*RpcResult) Err

func (r *RpcResult) Err(err error) RpcResult

func (*RpcResult) GetResult

func (r *RpcResult) GetResult() any

func (*RpcResult) GetRetcode

func (r *RpcResult) GetRetcode() int

func (*RpcResult) GetRetinfo

func (r *RpcResult) GetRetinfo() string

func (*RpcResult) GetRuk

func (r *RpcResult) GetRuk() string

type RpcRet

type RpcRet interface {
	GetRuk() string
	GetRetcode() int
	GetRetinfo() string
	GetResult() any
}

RpcRet RPC 返回值接口

type ZHubClient

type ZHubClient struct {
	Appname string // local appname
	Addr    string // host:port
	Groupid string // client group id
	Auth    string
	// contains filtered or unexported fields
}

ZHubClient zhub客户端

func (*ZHubClient) Broadcast

func (c *ZHubClient) Broadcast(topic string, message string) error

func (*ZHubClient) Close

func (c *ZHubClient) Close()

func (*ZHubClient) Cmd

func (c *ZHubClient) Cmd(cmd ...string)

Cmd send cmd

func (*ZHubClient) Delay

func (c *ZHubClient) Delay(topic string, message string, delay int) error

func (*ZHubClient) Lock

func (c *ZHubClient) Lock(key string, duration int) Lock

Lock Key

func (*ZHubClient) Publish

func (c *ZHubClient) Publish(topic string, message string) error

Publish 发布消息

func (*ZHubClient) Rpc

func (c *ZHubClient) Rpc(topic string, message interface{}, back func(res RpcResult))

func (*ZHubClient) RpcSubscribe

func (c *ZHubClient) RpcSubscribe(topic string, fun func(Rpc) RpcResult)

RpcSubscribe rpc subscribe

func (*ZHubClient) RpcWithTimeout

func (c *ZHubClient) RpcWithTimeout(topic string, message interface{}, timeout time.Duration, back func(res RpcResult))

func (*ZHubClient) Start

func (c *ZHubClient) Start() error

Start 创建一个客户端

func (*ZHubClient) Subscribe

func (c *ZHubClient) Subscribe(topic string, fun func(v string))

Subscribe subscribe topic

func (*ZHubClient) Timer

func (c *ZHubClient) Timer(topic string, fun func())

func (*ZHubClient) Unlock

func (c *ZHubClient) Unlock(l Lock)

func (*ZHubClient) Unsubscribe

func (c *ZHubClient) Unsubscribe(topic string)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL