Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

hidb.cassandra.md

maoxiaoyue edited this page May 14, 2026 · 2 revisions

hidb.cassandra — Cassandra 5.0 ORM 與管理工具

pkg/hidb/cassandra 是 HypGo 內建的 Cassandra 5.0+ 完整驅動封裝。它建構在 github.com/gocql/gocql 之上,提供:

  • Fluent DDL builder(Keyspace / Table / Index / MV / UDT / UDF / UDA / Trigger)
  • 基於 struct tag 的 Model 對映與 AutoMigrate
  • CRUD builder(Insert / Update / Delete / Select / Batch / LWT)
  • Vector / ANN 查詢(Cassandra 5.0 內建 vector<float, N>)
  • Migration(版本化 schema 變更)
  • RBAC(role / grant / revoke)
  • Describe / Schema Introspection / Diff
  • Nodetool(CQL 虛擬表 + 可選的 exec 包裝)

目錄:連線 · Keyspace · Table · Alter Table · Index · Materialized View · UDT · UDF--uda · Trigger · Model · AutoMigrate · Insert · Update · Delete · Select · Batch--LWT · Migration · RBAC · Describe · Schema Introspection · Nodetool · Query Tracing · NoSQLBench · 預設值


連線

import "github.com/maoxiaoyue/hypgo/pkg/hidb/cassandra"
db, err := cassandra.New(cassandra.Config{
 Hosts: []string{"10.0.0.1", "10.0.0.2"},
 Keyspace: "app",
 Consistency: "quorum", // any/one/two/three/quorum/all/local_quorum/each_quorum/local_one
 Port: 9042,
 Username: "cass",
 Password: "secret",
 ConnectTimeout: 5 * time.Second,
 Timeout: 10 * time.Second,
 NumConns: 4,
 ProtoVersion: 4,
})
if err != nil { log.Fatal(err) }
defer db.Close()

其他 API:

  • cassandra.NewWithoutConnect(cfg) — 延遲連線(呼叫 db.Connect() 才連)
  • db.Session() — 取得底層 *gocql.Session(thread-safe;已關閉時回 nil)
  • db.Ping(ctx) / db.IsConnected()
  • db.Exec(ctx, stmt) — 執行單一 DDL/DML
  • db.ExecScript(ctx, script) — 執行多語句 CQL 腳本(支援 ; 分隔、保留 '...'$$...$$)

Production 硬化欄位(2026年04月29日 補強):

cfg := cassandra.Config{
 Hosts: []string{"10.0.0.1", "10.0.0.2"},
 Keyspace: "app",
 // === Consistency ===
 Consistency: "local_quorum",
 StrictConsistency: true, // 拼錯字串直接回 error(推薦);
 // false(預設)→ fallback 為 LocalOne,不再像舊版悄悄變 Quorum
 // === Retry / Reconnect(不設值即用預設)===
 NumRetries: 3, // -1 = 停用 retry
 RetryMinDelay: 100 * time.Millisecond,
 RetryMaxDelay: 10 * time.Second,
 ReconnectMax: 10, // -1 = 停用 reconnect
 ReconnectInterval: 1 * time.Second,
 // === TLS / mTLS ===
 TLS: cassandra.TLSConfig{
 Enabled: true,
 CertFile: "/etc/cass/client.crt", // mTLS 用
 KeyFile: "/etc/cass/client.key",
 CaFile: "/etc/cass/ca.crt",
 ServerName: "cassandra.prod",
 // InsecureSkipVerify: true, // 僅 dev
 // Config: customTLSConfig, // 直接注入 *tls.Config
 },
 // === 可觀測性 ===
 QueryObserver: myObserver, // gocql.QueryObserver(latency/error metrics)
}
行為 修正前 修正後
Consistency 拼錯 悄悄 fallback 成 Quorum,單節點 dev 卡死 預設 fallback 為 LocalOne;StrictConsistency=true 直接 error
Retry / Reconnect 完全沒設定 → 抖動即失敗 預設 ExponentialBackoff (3 retries) + ConstantReconnect (10 retries ×ばつ 1s)
Close() 多 goroutine 同呼會 panic(double close) sync.Mutex 守護;可重複呼叫;closed 後 Connect 直接 error
Connect() 重複呼叫 直接覆寫 session,舊的 leak idempotent — 已連線回 nil,舊 session 關閉後重連
TLS 完全無 TLSConfig 支援檔案路徑或直接注入 *tls.Config
QueryObserver 無暴露 Config.QueryObserver 直接傳入
Init(map) 只解析 hosts/auth/keyspace/port 全欄位(含 timeout / retry / TLS nested block)

Keyspace

// SimpleStrategy(適合單資料中心)
db.Keyspace("app").Simple(3).Create(ctx)
// NetworkTopologyStrategy(production)
db.Keyspace("app").
 NetworkTopology(map[string]int{"dc1": 3, "dc2": 2}).
 DurableWrites(true).
 Create(ctx)
// 取得 CQL 字串
cql := db.Keyspace("app").Simple(3).CreateCQL()
// Alter
db.Keyspace("app").Simple(5).Alter(ctx)
// Drop
db.Keyspace("app").Drop(ctx) // DROP KEYSPACE IF EXISTS app
// 切換預設 keyspace
db.Use(ctx, "app")

預設值行為: 未指定 replication / DURABLE_WRITES 時自動帶入(見 預設值)。使用 .NoDefaults() 關閉。


Table

完整 Cassandra 5.0 CREATE TABLE 選項:

db.Table("events").
 Column("tenant_id", cassandra.TypeUUID).
 Column("bucket", cassandra.TypeInt).
 Column("ts", cassandra.TypeTimestamp).
 Column("payload", cassandra.TypeText).
 Static("tenant_name", cassandra.TypeText).
 PartitionKey("tenant_id", "bucket").
 ClusteringKey("ts").
 ClusteringOrder("ts", cassandra.Desc).
 WithTTL(604800).
 WithComment("append-only log").
 WithCompaction(cassandra.CompactionOptions{
 Class: cassandra.CompactionTWCS,
 Extra: map[string]interface{}{
 "compaction_window_size": 1,
 "compaction_window_unit": "DAYS",
 },
 }).
 WithCompression(cassandra.CompressionOptions{
 SSTableCompression: cassandra.CompressionZstd,
 ChunkLengthKB: 16,
 }).
 WithCaching(cassandra.CachingOptions{Keys: "ALL", RowsPerPartition: "NONE"}).
 WithCDC(true).
 Create(ctx)

常用型別

常數 CQL 型別
TypeText, TypeVarchar, TypeAscii text / varchar / ascii
TypeInt, TypeBigInt, TypeSmallInt, TypeTinyInt, TypeVarInt int / bigint / smallint / tinyint / varint
TypeFloat, TypeDouble, TypeDecimal float / double / decimal
TypeBoolean, TypeUUID, TimeUUID, TypeInet, TypeBlob, TypeDuration
TypeTimestamp, TypeDate, TypeTime
TypeCounter counter

Vector 序列化注意事項(Cassandra 5.0)

vector<float, N> 是 Cassandra 5.0 新增類型,gocql v1.7.0 尚未原生支援(在 wire 層以 CUSTOM type 傳輸)。直接 session.Query(..., []float32{...}) 可能會被 driver 編成 list<float> 而 server 回 type mismatch。

對策:

// 寫入:手動序列化成 4N bytes big-endian binary32
blob, _ := cassandra.MarshalVectorFloat32(emb, 384)
db.Insert("embeddings").
 Value("id", id).
 Value("vector", blob). // bind 成 []byte 即可
 Exec(ctx)
// 讀出:從 []byte 解
var raw []byte
db.Select("vector").From("embeddings").Where("id", "=", id).
 One(ctx, &raw)
vec, _ := cassandra.UnmarshalVectorFloat32(raw, 384)

等 gocql 升到原生支援後即可直接傳 []float32;屆時這兩個 helper 可下線。建議在新專案先 smoke test 一下 driver 行為。

集合 / 泛型

cassandra.List(cassandra.TypeInt) // list<int>
cassandra.Set(cassandra.TypeText) // set<text>
cassandra.Map(cassandra.TypeText, TypeInt) // map<text, int>
cassandra.Frozen(cassandra.Set(TypeText)) // frozen<set<text>>
cassandra.Tuple(TypeText, TypeInt) // tuple<text, int>
cassandra.UDT("address") // frozen<address>
cassandra.Vector(TypeFloat, 384) // vector<float, 384>
cassandra.VectorFloat(384) // 簡寫

Compaction 常數

CompactionUnified (5.0 推薦) / CompactionSTCS / CompactionLCS / CompactionTWCS / CompactionIncremental

Compression 常數

CompressionLZ4 / CompressionSnappy / CompressionDeflate / CompressionZstd / CompressionNone


Alter Table

db.AlterTable("users").
 AddColumn("bio", cassandra.TypeText).
 DropColumn("legacy").
 RenameColumn("fullname", "full_name").
 WithOptions(cassandra.TableOptions{DefaultTTL: 3600}).
 Exec(ctx)
// 或單純產出多條 CQL 字串
cql := db.AlterTable("users").AddColumn("bio", cassandra.TypeText).CQL()
// ALTER TABLE users ADD bio text;

Index

SAI(Storage-Attached Index,Cassandra 5.0 推薦):

db.Index("users_name_idx").
 On("users", "name").
 SAI().
 Option("case_sensitive", false).
 Create(ctx)

傳統 2i:

db.Index("users_email_idx").On("users", "email").Create(ctx)

集合欄位索引:

db.Index("tags_idx").On("items", "tags").
 Target(cassandra.IndexTargetKeys). // KEYS / VALUES / ENTRIES / FULL
 Create(ctx)

自訂索引:

db.Index("custom_idx").On("t", "col").
 Custom("com.example.CustomIndex").
 Create(ctx)

Materialized View

db.MaterializedView("users_by_email").
 FromTable("users").
 Select("*").
 WhereNotNull("email", "id").
 PartitionKey("email").
 ClusteringKey("id").
 Create(ctx)

UDT

db.Type("address").
 Field("street", cassandra.TypeText).
 Field("city", cassandra.TypeText).
 Field("zip", cassandra.TypeText).
 Create(ctx)
// Alter
db.Type("address").AddField("country", cassandra.TypeText).Alter(ctx)
db.Type("address").Rename("zip", "postal_code").Alter(ctx)

UDF / UDA

// User-defined function
db.Function("state_add").
 Arg("s", cassandra.TypeInt).
 Arg("val", cassandra.TypeInt).
 Returns(cassandra.TypeInt).
 Language("java").
 Body("return s + val;").
 Deterministic(true).
 Monotonic(true).
 CalledOnNullInput(false).
 Create(ctx)
// User-defined aggregate
db.Aggregate("my_sum").
 Arg(cassandra.TypeInt).
 SFunc("state_add").
 StateType(cassandra.TypeInt).
 InitCond("0").
 Create(ctx)

Trigger

// 部署 Java 實作類到每個節點的 $CASSANDRA_HOME/conf/triggers 後:
db.Trigger("audit").
 On("logs.events").
 Using("com.example.triggers.AuditTrigger").
 Create(ctx)
db.Trigger("audit").On("logs.events").Drop(ctx)

Model

用 struct tag 宣告對映:

type User struct {
 ID gocql.UUID `cql:"id,pk"`
 CreatedAt time.Time `cql:"created_at,ck,order=desc"`
 Email string `cql:"email,omitempty"`
 Tags []string `cql:"tags,type=set<text>"`
 Vector []float32 `cql:"vector,type=vector<float, 384>"`
}
func (User) TableName() string { return "users" }

標籤語法

cql:"name[,kind][,type=<cql>][,order=asc|desc][,position=N][,omitempty][,static][,counter]"
關鍵字 說明
pk / partition / partition_key Partition key
ck / clustering / clustering_key Clustering key
static 靜態欄位
counter counter 欄位
type=... 明確指定 CQL 型別(集合/tuple/UDT/vector 必填)
`order=asc desc`
position=N 複合 key 的順序
omitempty Insert/Update 時零值跳過

API

info, _ := cassandra.ParseModel(&User{})
fmt.Println(info.Table, info.PartitionKey, info.Clustering, info.Columns())
tb, _ := db.TableFromModel(&User{})
tb.Create(ctx)

AutoMigrate

一次把多個 model 對應的 table 建出來:

db.AutoMigrate(ctx, &User{}, &Event{}, &Embedding{})

Insert

// 手動
stmt, args := db.Insert("users").
 Value("id", id).Value("email", "a@b.c").
 IfNotExists().
 TTL(3600).
 CQL()
// 執行
err := db.Insert("users").
 Value("id", id).Value("email", "a@b.c").
 Exec(ctx)
// LWT(CAS)— IF NOT EXISTS,回傳是否真的寫入
applied, err := db.Insert("users").
 Value("id", id).Value("email", "a@b.c").
 IfNotExists().
 ExecCAS(ctx)
// 從 model struct 插入
db.Save(ctx, &User{ID: id, Email: "a@b.c"})
// 或用 model 的 options
db.Save(ctx, &user, cassandra.SaveTTL(3600), cassandra.SaveIfNotExists())

Update

// 一般更新
db.Update("users").
 Set("email", "new@x.y").
 Where("id", "=", id).
 Exec(ctx)
// TTL + LWT
applied, _ := db.Update("users").
 Set("email", "new@x.y").
 Where("id", "=", id).
 If("email", "=", "old@x.y").
 ExecCAS(ctx)
// Counter
db.Update("page_views").Increment("views", 1).Where("page", "=", "/home").Exec(ctx)
db.Update("page_views").Decrement("views", 1).Where("page", "=", "/home").Exec(ctx)
// Collection 操作
db.Update("items").Append("tags", []string{"new"}).Where("id", "=", id).Exec(ctx)
db.Update("items").Prepend("tags", []string{"first"}).Where("id", "=", id).Exec(ctx)
db.Update("items").Remove("tags", []string{"old"}).Where("id", "=", id).Exec(ctx)

Delete

// 整行
db.Delete().From("users").Where("id", "=", id).Exec(ctx)
// 只刪欄位
db.Delete().Columns("email").From("users").Where("id", "=", id).Exec(ctx)
// 刪集合中某元素
db.Delete().ElementAt("tags", 0).From("items").Where("id", "=", id).Exec(ctx)
// LWT
applied, _ := db.Delete().From("users").Where("id", "=", id).IfExists().ExecCAS(ctx)

Select

// One
var u User
err := db.Select("id", "email").From("users").
 Where("id", "=", id).
 One(ctx, &u)
// All
var users []User
db.Select("*").From("users").All(ctx, &users)
// Count
n, _ := db.Select().From("users").Count(ctx)
// IN / LIMIT / ALLOW FILTERING
db.Select("*").From("users").
 WhereIn("id", ids).
 Limit(100).
 AllowFiltering().
 All(ctx, &users)
// ANN(vector similarity search)
var results []Embedding
db.Select("*").From("embeddings").
 OrderByANN("vector", queryVector).
 Limit(10).
 All(ctx, &results)
// Pagination (page state)
iter := db.Select("*").From("users").PageSize(100).Iter(ctx)

Batch & LWT

b := db.BatchBuilder(gocql.LoggedBatch)
b.Insert("users").Value("id", id1).Value("email", "a@b")
b.Update("events").Set("status", "done").Where("id", "=", id2)
b.Exec(ctx)
// Counter batch
b := db.BatchBuilder(gocql.CounterBatch)
b.Update("counters").Increment("views", 1).Where("page", "=", "/")
b.Exec(ctx)
// Batch CAS
applied, _ := b.ExecCAS(ctx)

Migration

mig := cassandra.NewMigrator(db, "app", "schema_migrations")
mig.Register(
 cassandra.Migration{
 Version: 202604170001,
 Name: "create_users",
 Up: func(ctx context.Context, db *cassandra.CassandraDB) error { /* ... */ },
 Down: func(ctx context.Context, db *cassandra.CassandraDB) error { /* ... */ },
 },
)
mig.Up(ctx) // 套用所有未套用的
mig.Down(ctx) // 回滾最後一筆
report, _ := mig.Status(ctx)

RBAC

Role

db.Role("alice").
 Password("secret").
 Login(true).
 Superuser(false).
 Create(ctx)
db.Role("alice").Password("newpw").Alter(ctx)
db.Role("alice").Drop(ctx)

Grant / Revoke

// 權限等級
cassandra.PermAll / PermSelect / PermModify / PermCreate / PermAlter / PermDrop / PermAuthorize / PermDescribe / PermExecute / PermUnmask / PermSelectMask
// 資源
cassandra.AllKeyspaces()
cassandra.KeyspaceResource("app")
cassandra.TableResource("app.users")
cassandra.RoleResource("alice")
// Grant
db.Grant(ctx, cassandra.PermSelect, cassandra.TableResource("app.users"), "reader")
db.Revoke(ctx, cassandra.PermModify, cassandra.KeyspaceResource("app"), "writer")
// Role hierarchy
db.GrantRole(ctx, "admin", "alice") // GRANT admin TO alice
db.RevokeRole(ctx, "admin", "alice")
// 僅取 CQL 字串
stmt, _ := cassandra.GrantCQL(cassandra.PermSelect, cassandra.TableResource("app.users"), "reader")
stmt := cassandra.ListRolesCQL()
stmt := cassandra.ListPermissionsCQL("alice")

Describe

// DESCRIBE 字串產生
cassandra.DescribeCQL(cassandra.DescTable, "logs.events")
// → DESCRIBE TABLE logs.events
cassandra.DescribeCQL(cassandra.DescFullSchema, "")
// 透過 system_schema 取得結構化資料
ks, _ := db.DescribeKeyspace(ctx, "app")
tables, _ := db.DescribeTables(ctx, "app")
cols, _ := db.DescribeColumns(ctx, "app", "users")
tbl, cols, _ := db.DescribeTable(ctx, "app", "users")
// 從 system_schema 結果重建 CREATE TABLE 語句
ddl := cassandra.RenderTableDDL(*tbl, cols)

Schema Introspection

完整抓取 system_schema.*:

// 全部 keyspace(預設排除 system_*)
schema, _ := db.Introspect(ctx, cassandra.IntrospectOptions{})
// 單一 keyspace
ks, _ := db.IntrospectKeyspace(ctx, "app")
// JSON 輸出(AI context / migration diff 用)
raw, _ := schema.MarshalJSON()
// Diff:算出從 old 變到 new 的變更
changes := cassandra.SchemaDiff(oldSchema, newSchema)
for _, c := range changes {
 fmt.Printf("%s %s.%s.%s %s\n", c.Kind, c.Keyspace, c.Table, c.Column, c.Detail)
}

變更類型:ChangeAddKeyspace / DropKeyspace / AlterKeyspace / AddTable / DropTable / AlterTable / AddColumn / DropColumn / AlterColumn


Nodetool

方案 B:純 CQL(system_views 虛擬表)

// nodetool info + status subset
info, _ := db.LocalInfo(ctx)
peers, _ := db.Peers(ctx)
snap, _ := db.ClusterStatus(ctx)
// nodetool tpstats
pools, _ := db.ThreadPools(ctx)
// nodetool compactionstats
tasks, _ := db.CompactionTasks(ctx)
// nodetool clientstats
clients, _ := db.Clients(ctx)
// nodetool getlogginglevels / get*
all, _ := db.Settings(ctx)
val, _ := db.Setting(ctx, "compaction_throughput_mb_per_sec")
// cache info
caches, _ := db.Caches(ctx)
// tablestats(size 部分)
totals, _ := db.TableStats(ctx, "app", "users")
// 等待 compaction 跑完
elapsed, _ := db.PollCompactionsUntilIdle(ctx, time.Second)
// 泛型 MapScan
rows, _ := db.RawSystemViewRow(ctx, "thread_pools", nil)

方案 A:包裝 nodetool 二進位(動作類)

nt := &cassandra.NodetoolExec{
 Binary: "nodetool", // 預設 "nodetool"
 Host: "10.0.0.1",
 Port: 7199,
 Username: "admin",
 Password: "secret",
}
nt.Flush(ctx, "app", "events")
nt.Compact(ctx, "app", "events")
nt.Repair(ctx, "app", nil, cassandra.RepairOptions{Full: true, PrimaryRange: true})
nt.Cleanup(ctx, "app")
nt.Drain(ctx)
nt.Snapshot(ctx, "backup-2026年04月17日", "app", "events")
nt.ClearSnapshot(ctx, "backup-2026年04月17日", "app")
nt.Refresh(ctx, "app", "events")
nt.SetCompactionThroughput(ctx, 64)
nt.SetLoggingLevel(ctx, "org.apache.cassandra.db", "DEBUG")
out, _ := nt.Status(ctx, "")

安全性: 使用 exec.CommandContext 傳參,無 shell 解譯,不受命令注入影響。


Query Tracing

啟用 Cassandra 原生 tracing(寫入 system_traces.sessions / system_traces.events),可查每條查詢的 coordinator 路徑、每個階段耗時、replica 分派等。

一次性探針(最常用)

tr, err := db.TraceProbe(ctx, 2*time.Second,
 "SELECT * FROM app.users WHERE id = ?", uid)
if err != nil { log.Fatal(err) }
fmt.Println(tr.Format())
// → Trace <uuid>
// command : QUERY
// coordinator : 10.0.0.1
// duration : 1.523ms (server-reported)
// total elapsed: 1.41ms (max source_elapsed)
// events:
// [ 120μs] 10.0.0.1 Native-Transport-1 Parsing SELECT ...
// [ 340μs] 10.0.0.1 ReadStage-1 Executing single-partition query
// ...

針對既有 gocql.Query 開啟 tracing

q := db.Session().Query("SELECT * FROM app.users").WithContext(ctx)
tracer := cassandra.TraceQuery(q) // 掛上 MemTracer
_ = q.Iter().Close() // 正常執行 query
if id, ok := tracer.Last(); ok {
 trace, _ := db.WaitForTrace(ctx, id, 2*time.Second, 50*time.Millisecond)
 fmt.Println(trace.Format())
}

取得結構化資料

type Trace struct {
 Session TraceSession // command / coordinator / duration / request / client
 Events []TraceEvent // 依 source_elapsed 排序
}
tr, _ := db.GetTrace(ctx, sessionID)
fmt.Println(tr.Duration(), tr.TotalElapsed())
for _, e := range tr.Events {
 fmt.Printf("%s @%dμs: %s\n", e.Source, e.SourceElapsed, e.Activity)
}

cqlsh 風格即時輸出

tracer := cassandra.NewTraceWriter(db.Session(), os.Stdout)
q := db.Session().Query("SELECT * FROM users").WithContext(ctx)
q.Trace(tracer)
_ = q.Iter().Close()
// 每個 event 直接 print 到 stdout

API

函式 / 型別 用途
db.TraceProbe(ctx, wait, stmt, args...) 最簡介面:執行 + 等 trace flush + 回傳 *Trace
db.GetTrace(ctx, sessionID) 直接讀 system_traces.*
db.WaitForTrace(ctx, id, timeout, interval) 輪詢等待 trace 可見(server 寫入有延遲)
cassandra.TraceQuery(q) *gocql.Query 上掛 MemTracer
MemTracer.Last() / .Sessions() / .Reset() 取得 / 重設收集到的 session ID
cassandra.NewTraceWriter(sess, w) 邊跑邊 print 每個 event(cqlsh 風格)
Trace.Format() 人類可讀多行輸出
Trace.Duration() / .TotalElapsed() 伺服器自報 / 最大 source_elapsed

注意事項

  • Cassandra 寫入 system_traces.*非同步的,直接 GetTrace 可能拿到 ErrNotFound;用 WaitForTraceTraceProbe
  • Tracing 預設 TTL 24 小時(tracetype_query_ttl / tracetype_repair_ttl)。
  • 開 tracing 會加 coordinator 負擔;不要全量開,做 sampling(例如 1%)或只針對慢查詢診斷。
  • TraceProbe 會丟棄查詢結果,只供診斷使用;不要拿來取資料。

NoSQLBench (nb5)

pkg/hidb/cassandra/nb5.go 提供 NoSQLBench v5 (nb5 二進位) 的安全 exec wrapper,用來對 Cassandra 5.0 做壓力測試 / benchmark / 容量規劃。

前置: 需另外安裝 nb5,並可在 PATH 上找到(或設定 Binary 指定完整路徑)。nb5 是獨立的 Java fat-jar 工具,不隨 HypGo 發佈。使用 exec.CommandContext 以 argv 方式傳參,不經 shell,無命令注入風險。

基礎設定

nb := &cassandra.NB5Exec{
 Binary: "nb5", // 預設 "nb5"
 Host: "10.0.0.1",
 Port: 9042,
 LocalDC: "dc1", // 現代 driver 必填
 Driver: "cqld4", // nb5 預設
 Username: "cass",
 Password: "secret",
 WorkingDir: "/var/benchmarks", // cwd(讀本地 YAML 時有用)
}
// 探查
out, _ := nb.Version(ctx)
out, _ = nb.ListWorkloads(ctx)
out, _ = nb.ListDrivers(ctx)

最簡:跑 bundled workload 的某個階段

// schema / rampup / main / read / write
out, _ := nb.RunPhase(ctx, "cql-iot", cassandra.PhaseSchema, "", 1, nil)
out, _ = nb.RunPhase(ctx, "cql-iot", cassandra.PhaseRampup, "100000", 0, nil)
out, _ = nb.RunPhase(ctx, "cql-iot", cassandra.PhaseMain, "1M", 32, map[string]string{
 "keycount": "1000000",
 "cyclerate": "10000",
})

Fluent scenario builder(推薦)

out, err := nb.Scenario("cql-iot").
 Phase(cassandra.PhaseMain).
 Cycles("1M").
 Rampup("100k").
 Threads(32).
 CycleRate("10000"). // 固定 ops/s 目標
 Param("keycount", "1000000").
 Errors("count"). // count | warn | stop | retry | ignore
 Alias("steady-state").
 Extra("--report-summary-to", "stdout:60s").
 Extra("--log-histograms", "metrics.hdr:.*:10s").
 Run(ctx)
// 除錯:先印出完整命令列
fmt.Println(nb.Scenario("cql-iot").Phase(cassandra.PhaseSchema).CommandLine())

自己的 workload YAML

out, _ := nb.RunActivity(ctx, "/var/bench/my-workload.yaml", map[string]string{
 "tags": "block:main",
 "cycles": "5M",
 "threads": "64",
})

即生成 inline workload(快速打煙霧測試)

path, _ := cassandra.WriteInlineWorkload("/tmp/kv.yaml", "app", "kv")
// 1. 建 schema
nb.Scenario(path).Phase(cassandra.PhaseSchema).Run(ctx)
// 2. rampup 寫入
nb.Scenario(path).Phase(cassandra.PhaseRampup).
 Cycles("100000").Threads(16).Run(ctx)
// 3. 混合讀寫
out, _ := nb.Scenario(path).Phase(cassandra.PhaseMain).
 Cycles("1M").Threads(32).CycleRate("20000").Run(ctx)

模板包含 scenariosbindingsblocks: schema / rampup / main,並定義 {seq_key}{rw_key}{payload} 三個 binding。

解析結果摘要

summary := cassandra.ParseSummary(out)
fmt.Printf("cycles=%d rate=%.0f ops/s duration=%s errors=%d\n",
 summary.TotalCycles, summary.OpsPerSec, summary.Duration, summary.Errors)

完整指標: ParseSummary 只擷取 stdout 結尾的 headline。詳細 latency 分位數 / timer histogram 請用 nb5 內建匯出器:

  • --report-summary-to stdout:60s — 每 60 秒印
  • --log-histograms metrics.hdr:.*:10s — HDR histogram 檔
  • --report-graphite-to graphite:2003 / --report-prometheus ...

API 一覽

型別 / 方法 用途
NB5Exec 組態(binary / host / localdc / driver / auth)
.Run(ctx, args...) 原始呼叫,回傳 stdout
.RunActivity(ctx, activity, params) 單 activity,params 自動排序
.RunPhase(ctx, workload, phase, cycles, threads, extra) 最常用:直接指定某階段
.Scenario(activity) Fluent builder
.Version / ListWorkloads / ListDrivers 探查
Phase 常數 PhaseSchemaPhaseRampupPhaseMainPhaseReadPhaseWrite
WriteInlineWorkload(path, ks, tbl) 產生 inline workload YAML
ParseSummary(stdout) 擷取 cycles / rate / duration / errors

建議工作流

1. Introspect 既有 schema(或設計好 model)
2. WriteInlineWorkload 或撰寫 workload YAML
3. RunPhase(..., PhaseSchema, ...) 建表
4. RunPhase(..., PhaseRampup, ...) 寫入 baseline 資料
5. 用 Scenario builder 跑 main phase(固定 cyclerate + threads)
6. ParseSummary 或直接收集 HDR histogram
7. 期間同時用 ThreadPools / CompactionTasks 觀察叢集負載

預設值

pkg/hidb/cassandra/defaults.go 定義的全域預設變數(可在 init() 覆寫):

變數 預設值 用途
DefaultKeyspaceReplication SimpleStrategy RF=1 Keyspace 未指定 replication 時
DefaultDurableWrites true Keyspace 未指定 DURABLE_WRITES 時
DefaultCompaction UnifiedCompactionStrategy Table 未指定 compaction 時(5.0 推薦)
DefaultCompression LZ4 + 16 KB chunk Table 未指定 compression 時
DefaultCaching keys=ALL, rows=NONE Table 未指定 caching 時
DefaultGCGraceSeconds 864000(10 天)
DefaultBloomFilterFPChance 0.01
DefaultSpeculativeRetry "99p"

行為:

  • 預設自動套用KeyspaceBuilder.CreateCQL()TableBuilder.CreateCQL()
  • 呼叫者已顯式指定的值絕不會被覆寫
  • .NoDefaults() 關閉(產生「只含顯式設定」的 CQL)
// 全域覆寫(舉例改為 LCS)
cassandra.DefaultCompaction = cassandra.CompactionOptions{Class: cassandra.CompactionLCS}
// 單 builder 關閉
db.Table("t").Column("id", cassandra.TypeUUID).PartitionKey("id").NoDefaults().CreateCQL()

完整範例

func bootstrap(ctx context.Context) {
 db, _ := cassandra.New(cassandra.Config{
 Hosts: []string{"127.0.0.1"}, Keyspace: "app",
 })
 defer db.Close()
 // 1. 建 keyspace
 db.Keyspace("app").NetworkTopology(map[string]int{"dc1": 3}).Create(ctx)
 db.Use(ctx, "app")
 // 2. 建 UDT
 db.Type("address").
 Field("city", cassandra.TypeText).
 Field("zip", cassandra.TypeText).
 Create(ctx)
 // 3. 用 model AutoMigrate
 db.AutoMigrate(ctx, &User{}, &Event{})
 // 4. SAI 索引
 db.Index("users_email_idx").On("users", "email").SAI().Create(ctx)
 // 5. 寫入 + 查詢
 u := User{ID: gocql.TimeUUID(), Email: "a@b.c"}
 db.Save(ctx, &u)
 var got User
 db.Select("*").From("users").Where("id", "=", u.ID).One(ctx, &got)
 // 6. 觀察叢集狀態
 pools, _ := db.ThreadPools(ctx)
 for _, p := range pools { fmt.Println(p.Name, p.ActiveTasks) }
 // 7. Snapshot 一份 schema 備 AI 用
 schema, _ := db.Introspect(ctx, cassandra.IntrospectOptions{})
 os.WriteFile("schema.json", mustJSON(schema), 0644)
}

HypGo

繁體中文 | English


中文文件

設計文件

套件

AI 協作工具鏈

CLI 命令


English Docs

Design Docs

Packages

AI Collaboration Toolchain

CLI Commands

Clone this wiki locally

AltStyle によって変換されたページ (->オリジナル) /