-
Notifications
You must be signed in to change notification settings - Fork 0
hidb.cassandra.md
pkg/hidb/cassandra 是 HypGo 內建的 Cassandra 5.0+ 完整驅動封裝。它建構在 github.com/gocql/gocql 之上,提供:
- Fluent DDL builder(Keyspace / Table / Index / MV / UDT / UDF / UDA / Trigger)
- 基於 struct tag 的 Model 對映與 AutoMigrate
- CRUD builder(Insert / Update / Delete / Select / Batch / LWT)
- Vector / ANN 查詢(Cassandra 5.0 內建 vector<float, N>)
- Migration(版本化 schema 變更)
- RBAC(role / grant / revoke)
- Describe / Schema Introspection / Diff
- Nodetool(CQL 虛擬表 + 可選的 exec 包裝)
目錄:連線 · Keyspace · Table · Alter Table · Index · Materialized View · UDT · UDF--uda · Trigger · Model · AutoMigrate · Insert · Update · Delete · Select · Batch--LWT · Migration · RBAC · Describe · Schema Introspection · Nodetool · Query Tracing · NoSQLBench · 預設值
import "github.com/maoxiaoyue/hypgo/pkg/hidb/cassandra" db, err := cassandra.New(cassandra.Config{ Hosts: []string{"10.0.0.1", "10.0.0.2"}, Keyspace: "app", Consistency: "quorum", // any/one/two/three/quorum/all/local_quorum/each_quorum/local_one Port: 9042, Username: "cass", Password: "secret", ConnectTimeout: 5 * time.Second, Timeout: 10 * time.Second, NumConns: 4, ProtoVersion: 4, }) if err != nil { log.Fatal(err) } defer db.Close()
其他 API:
-
cassandra.NewWithoutConnect(cfg)— 延遲連線(呼叫db.Connect()才連) -
db.Session()— 取得底層*gocql.Session(thread-safe;已關閉時回 nil) -
db.Ping(ctx)/db.IsConnected() -
db.Exec(ctx, stmt)— 執行單一 DDL/DML -
db.ExecScript(ctx, script)— 執行多語句 CQL 腳本(支援;分隔、保留'...'與$$...$$)
Production 硬化欄位(2026年04月29日 補強):
cfg := cassandra.Config{ Hosts: []string{"10.0.0.1", "10.0.0.2"}, Keyspace: "app", // === Consistency === Consistency: "local_quorum", StrictConsistency: true, // 拼錯字串直接回 error(推薦); // false(預設)→ fallback 為 LocalOne,不再像舊版悄悄變 Quorum // === Retry / Reconnect(不設值即用預設)=== NumRetries: 3, // -1 = 停用 retry RetryMinDelay: 100 * time.Millisecond, RetryMaxDelay: 10 * time.Second, ReconnectMax: 10, // -1 = 停用 reconnect ReconnectInterval: 1 * time.Second, // === TLS / mTLS === TLS: cassandra.TLSConfig{ Enabled: true, CertFile: "/etc/cass/client.crt", // mTLS 用 KeyFile: "/etc/cass/client.key", CaFile: "/etc/cass/ca.crt", ServerName: "cassandra.prod", // InsecureSkipVerify: true, // 僅 dev // Config: customTLSConfig, // 直接注入 *tls.Config }, // === 可觀測性 === QueryObserver: myObserver, // gocql.QueryObserver(latency/error metrics) }
| 行為 | 修正前 | 修正後 |
|---|---|---|
| Consistency 拼錯 | 悄悄 fallback 成 Quorum,單節點 dev 卡死 | 預設 fallback 為 LocalOne;StrictConsistency=true 直接 error |
| Retry / Reconnect | 完全沒設定 → 抖動即失敗 | 預設 ExponentialBackoff (3 retries) + ConstantReconnect (10 retries ×ばつ 1s) |
Close() |
多 goroutine 同呼會 panic(double close) | sync.Mutex 守護;可重複呼叫;closed 後 Connect 直接 error |
Connect() 重複呼叫 |
直接覆寫 session,舊的 leak | idempotent — 已連線回 nil,舊 session 關閉後重連 |
| TLS | 完全無 |
TLSConfig 支援檔案路徑或直接注入 *tls.Config
|
| QueryObserver | 無暴露 |
Config.QueryObserver 直接傳入 |
Init(map) |
只解析 hosts/auth/keyspace/port | 全欄位(含 timeout / retry / TLS nested block) |
// SimpleStrategy(適合單資料中心) db.Keyspace("app").Simple(3).Create(ctx) // NetworkTopologyStrategy(production) db.Keyspace("app"). NetworkTopology(map[string]int{"dc1": 3, "dc2": 2}). DurableWrites(true). Create(ctx) // 取得 CQL 字串 cql := db.Keyspace("app").Simple(3).CreateCQL() // Alter db.Keyspace("app").Simple(5).Alter(ctx) // Drop db.Keyspace("app").Drop(ctx) // DROP KEYSPACE IF EXISTS app // 切換預設 keyspace db.Use(ctx, "app")
預設值行為: 未指定 replication / DURABLE_WRITES 時自動帶入(見 預設值)。使用 .NoDefaults() 關閉。
完整 Cassandra 5.0 CREATE TABLE 選項:
db.Table("events"). Column("tenant_id", cassandra.TypeUUID). Column("bucket", cassandra.TypeInt). Column("ts", cassandra.TypeTimestamp). Column("payload", cassandra.TypeText). Static("tenant_name", cassandra.TypeText). PartitionKey("tenant_id", "bucket"). ClusteringKey("ts"). ClusteringOrder("ts", cassandra.Desc). WithTTL(604800). WithComment("append-only log"). WithCompaction(cassandra.CompactionOptions{ Class: cassandra.CompactionTWCS, Extra: map[string]interface{}{ "compaction_window_size": 1, "compaction_window_unit": "DAYS", }, }). WithCompression(cassandra.CompressionOptions{ SSTableCompression: cassandra.CompressionZstd, ChunkLengthKB: 16, }). WithCaching(cassandra.CachingOptions{Keys: "ALL", RowsPerPartition: "NONE"}). WithCDC(true). Create(ctx)
| 常數 | CQL 型別 |
|---|---|
TypeText, TypeVarchar, TypeAscii
|
text / varchar / ascii |
TypeInt, TypeBigInt, TypeSmallInt, TypeTinyInt, TypeVarInt
|
int / bigint / smallint / tinyint / varint |
TypeFloat, TypeDouble, TypeDecimal
|
float / double / decimal |
TypeBoolean, TypeUUID, TimeUUID, TypeInet, TypeBlob, TypeDuration
|
|
TypeTimestamp, TypeDate, TypeTime
|
|
TypeCounter |
counter |
vector<float, N> 是 Cassandra 5.0 新增類型,gocql v1.7.0 尚未原生支援(在 wire 層以 CUSTOM type 傳輸)。直接 session.Query(..., []float32{...}) 可能會被 driver 編成 list<float> 而 server 回 type mismatch。
對策:
// 寫入:手動序列化成 4N bytes big-endian binary32 blob, _ := cassandra.MarshalVectorFloat32(emb, 384) db.Insert("embeddings"). Value("id", id). Value("vector", blob). // bind 成 []byte 即可 Exec(ctx) // 讀出:從 []byte 解 var raw []byte db.Select("vector").From("embeddings").Where("id", "=", id). One(ctx, &raw) vec, _ := cassandra.UnmarshalVectorFloat32(raw, 384)
等 gocql 升到原生支援後即可直接傳
[]float32;屆時這兩個 helper 可下線。建議在新專案先 smoke test 一下 driver 行為。
cassandra.List(cassandra.TypeInt) // list<int> cassandra.Set(cassandra.TypeText) // set<text> cassandra.Map(cassandra.TypeText, TypeInt) // map<text, int> cassandra.Frozen(cassandra.Set(TypeText)) // frozen<set<text>> cassandra.Tuple(TypeText, TypeInt) // tuple<text, int> cassandra.UDT("address") // frozen<address> cassandra.Vector(TypeFloat, 384) // vector<float, 384> cassandra.VectorFloat(384) // 簡寫
CompactionUnified (5.0 推薦) / CompactionSTCS / CompactionLCS / CompactionTWCS / CompactionIncremental
CompressionLZ4 / CompressionSnappy / CompressionDeflate / CompressionZstd / CompressionNone
db.AlterTable("users"). AddColumn("bio", cassandra.TypeText). DropColumn("legacy"). RenameColumn("fullname", "full_name"). WithOptions(cassandra.TableOptions{DefaultTTL: 3600}). Exec(ctx) // 或單純產出多條 CQL 字串 cql := db.AlterTable("users").AddColumn("bio", cassandra.TypeText).CQL() // ALTER TABLE users ADD bio text;
SAI(Storage-Attached Index,Cassandra 5.0 推薦):
db.Index("users_name_idx"). On("users", "name"). SAI(). Option("case_sensitive", false). Create(ctx)
傳統 2i:
db.Index("users_email_idx").On("users", "email").Create(ctx)
集合欄位索引:
db.Index("tags_idx").On("items", "tags"). Target(cassandra.IndexTargetKeys). // KEYS / VALUES / ENTRIES / FULL Create(ctx)
自訂索引:
db.Index("custom_idx").On("t", "col"). Custom("com.example.CustomIndex"). Create(ctx)
db.MaterializedView("users_by_email"). FromTable("users"). Select("*"). WhereNotNull("email", "id"). PartitionKey("email"). ClusteringKey("id"). Create(ctx)
db.Type("address"). Field("street", cassandra.TypeText). Field("city", cassandra.TypeText). Field("zip", cassandra.TypeText). Create(ctx) // Alter db.Type("address").AddField("country", cassandra.TypeText).Alter(ctx) db.Type("address").Rename("zip", "postal_code").Alter(ctx)
// User-defined function db.Function("state_add"). Arg("s", cassandra.TypeInt). Arg("val", cassandra.TypeInt). Returns(cassandra.TypeInt). Language("java"). Body("return s + val;"). Deterministic(true). Monotonic(true). CalledOnNullInput(false). Create(ctx) // User-defined aggregate db.Aggregate("my_sum"). Arg(cassandra.TypeInt). SFunc("state_add"). StateType(cassandra.TypeInt). InitCond("0"). Create(ctx)
// 部署 Java 實作類到每個節點的 $CASSANDRA_HOME/conf/triggers 後: db.Trigger("audit"). On("logs.events"). Using("com.example.triggers.AuditTrigger"). Create(ctx) db.Trigger("audit").On("logs.events").Drop(ctx)
用 struct tag 宣告對映:
type User struct { ID gocql.UUID `cql:"id,pk"` CreatedAt time.Time `cql:"created_at,ck,order=desc"` Email string `cql:"email,omitempty"` Tags []string `cql:"tags,type=set<text>"` Vector []float32 `cql:"vector,type=vector<float, 384>"` } func (User) TableName() string { return "users" }
cql:"name[,kind][,type=<cql>][,order=asc|desc][,position=N][,omitempty][,static][,counter]"
| 關鍵字 | 說明 |
|---|---|
pk / partition / partition_key
|
Partition key |
ck / clustering / clustering_key
|
Clustering key |
static |
靜態欄位 |
counter |
counter 欄位 |
type=... |
明確指定 CQL 型別(集合/tuple/UDT/vector 必填) |
| `order=asc | desc` |
position=N |
複合 key 的順序 |
omitempty |
Insert/Update 時零值跳過 |
info, _ := cassandra.ParseModel(&User{}) fmt.Println(info.Table, info.PartitionKey, info.Clustering, info.Columns()) tb, _ := db.TableFromModel(&User{}) tb.Create(ctx)
一次把多個 model 對應的 table 建出來:
db.AutoMigrate(ctx, &User{}, &Event{}, &Embedding{})
// 手動 stmt, args := db.Insert("users"). Value("id", id).Value("email", "a@b.c"). IfNotExists(). TTL(3600). CQL() // 執行 err := db.Insert("users"). Value("id", id).Value("email", "a@b.c"). Exec(ctx) // LWT(CAS)— IF NOT EXISTS,回傳是否真的寫入 applied, err := db.Insert("users"). Value("id", id).Value("email", "a@b.c"). IfNotExists(). ExecCAS(ctx) // 從 model struct 插入 db.Save(ctx, &User{ID: id, Email: "a@b.c"}) // 或用 model 的 options db.Save(ctx, &user, cassandra.SaveTTL(3600), cassandra.SaveIfNotExists())
// 一般更新 db.Update("users"). Set("email", "new@x.y"). Where("id", "=", id). Exec(ctx) // TTL + LWT applied, _ := db.Update("users"). Set("email", "new@x.y"). Where("id", "=", id). If("email", "=", "old@x.y"). ExecCAS(ctx) // Counter db.Update("page_views").Increment("views", 1).Where("page", "=", "/home").Exec(ctx) db.Update("page_views").Decrement("views", 1).Where("page", "=", "/home").Exec(ctx) // Collection 操作 db.Update("items").Append("tags", []string{"new"}).Where("id", "=", id).Exec(ctx) db.Update("items").Prepend("tags", []string{"first"}).Where("id", "=", id).Exec(ctx) db.Update("items").Remove("tags", []string{"old"}).Where("id", "=", id).Exec(ctx)
// 整行 db.Delete().From("users").Where("id", "=", id).Exec(ctx) // 只刪欄位 db.Delete().Columns("email").From("users").Where("id", "=", id).Exec(ctx) // 刪集合中某元素 db.Delete().ElementAt("tags", 0).From("items").Where("id", "=", id).Exec(ctx) // LWT applied, _ := db.Delete().From("users").Where("id", "=", id).IfExists().ExecCAS(ctx)
// One var u User err := db.Select("id", "email").From("users"). Where("id", "=", id). One(ctx, &u) // All var users []User db.Select("*").From("users").All(ctx, &users) // Count n, _ := db.Select().From("users").Count(ctx) // IN / LIMIT / ALLOW FILTERING db.Select("*").From("users"). WhereIn("id", ids). Limit(100). AllowFiltering(). All(ctx, &users) // ANN(vector similarity search) var results []Embedding db.Select("*").From("embeddings"). OrderByANN("vector", queryVector). Limit(10). All(ctx, &results) // Pagination (page state) iter := db.Select("*").From("users").PageSize(100).Iter(ctx)
b := db.BatchBuilder(gocql.LoggedBatch) b.Insert("users").Value("id", id1).Value("email", "a@b") b.Update("events").Set("status", "done").Where("id", "=", id2) b.Exec(ctx) // Counter batch b := db.BatchBuilder(gocql.CounterBatch) b.Update("counters").Increment("views", 1).Where("page", "=", "/") b.Exec(ctx) // Batch CAS applied, _ := b.ExecCAS(ctx)
mig := cassandra.NewMigrator(db, "app", "schema_migrations") mig.Register( cassandra.Migration{ Version: 202604170001, Name: "create_users", Up: func(ctx context.Context, db *cassandra.CassandraDB) error { /* ... */ }, Down: func(ctx context.Context, db *cassandra.CassandraDB) error { /* ... */ }, }, ) mig.Up(ctx) // 套用所有未套用的 mig.Down(ctx) // 回滾最後一筆 report, _ := mig.Status(ctx)
db.Role("alice"). Password("secret"). Login(true). Superuser(false). Create(ctx) db.Role("alice").Password("newpw").Alter(ctx) db.Role("alice").Drop(ctx)
// 權限等級 cassandra.PermAll / PermSelect / PermModify / PermCreate / PermAlter / PermDrop / PermAuthorize / PermDescribe / PermExecute / PermUnmask / PermSelectMask // 資源 cassandra.AllKeyspaces() cassandra.KeyspaceResource("app") cassandra.TableResource("app.users") cassandra.RoleResource("alice") // Grant db.Grant(ctx, cassandra.PermSelect, cassandra.TableResource("app.users"), "reader") db.Revoke(ctx, cassandra.PermModify, cassandra.KeyspaceResource("app"), "writer") // Role hierarchy db.GrantRole(ctx, "admin", "alice") // GRANT admin TO alice db.RevokeRole(ctx, "admin", "alice") // 僅取 CQL 字串 stmt, _ := cassandra.GrantCQL(cassandra.PermSelect, cassandra.TableResource("app.users"), "reader") stmt := cassandra.ListRolesCQL() stmt := cassandra.ListPermissionsCQL("alice")
// DESCRIBE 字串產生 cassandra.DescribeCQL(cassandra.DescTable, "logs.events") // → DESCRIBE TABLE logs.events cassandra.DescribeCQL(cassandra.DescFullSchema, "") // 透過 system_schema 取得結構化資料 ks, _ := db.DescribeKeyspace(ctx, "app") tables, _ := db.DescribeTables(ctx, "app") cols, _ := db.DescribeColumns(ctx, "app", "users") tbl, cols, _ := db.DescribeTable(ctx, "app", "users") // 從 system_schema 結果重建 CREATE TABLE 語句 ddl := cassandra.RenderTableDDL(*tbl, cols)
完整抓取 system_schema.*:
// 全部 keyspace(預設排除 system_*) schema, _ := db.Introspect(ctx, cassandra.IntrospectOptions{}) // 單一 keyspace ks, _ := db.IntrospectKeyspace(ctx, "app") // JSON 輸出(AI context / migration diff 用) raw, _ := schema.MarshalJSON() // Diff:算出從 old 變到 new 的變更 changes := cassandra.SchemaDiff(oldSchema, newSchema) for _, c := range changes { fmt.Printf("%s %s.%s.%s %s\n", c.Kind, c.Keyspace, c.Table, c.Column, c.Detail) }
變更類型:ChangeAddKeyspace / DropKeyspace / AlterKeyspace / AddTable / DropTable / AlterTable / AddColumn / DropColumn / AlterColumn
// nodetool info + status subset info, _ := db.LocalInfo(ctx) peers, _ := db.Peers(ctx) snap, _ := db.ClusterStatus(ctx) // nodetool tpstats pools, _ := db.ThreadPools(ctx) // nodetool compactionstats tasks, _ := db.CompactionTasks(ctx) // nodetool clientstats clients, _ := db.Clients(ctx) // nodetool getlogginglevels / get* all, _ := db.Settings(ctx) val, _ := db.Setting(ctx, "compaction_throughput_mb_per_sec") // cache info caches, _ := db.Caches(ctx) // tablestats(size 部分) totals, _ := db.TableStats(ctx, "app", "users") // 等待 compaction 跑完 elapsed, _ := db.PollCompactionsUntilIdle(ctx, time.Second) // 泛型 MapScan rows, _ := db.RawSystemViewRow(ctx, "thread_pools", nil)
nt := &cassandra.NodetoolExec{ Binary: "nodetool", // 預設 "nodetool" Host: "10.0.0.1", Port: 7199, Username: "admin", Password: "secret", } nt.Flush(ctx, "app", "events") nt.Compact(ctx, "app", "events") nt.Repair(ctx, "app", nil, cassandra.RepairOptions{Full: true, PrimaryRange: true}) nt.Cleanup(ctx, "app") nt.Drain(ctx) nt.Snapshot(ctx, "backup-2026年04月17日", "app", "events") nt.ClearSnapshot(ctx, "backup-2026年04月17日", "app") nt.Refresh(ctx, "app", "events") nt.SetCompactionThroughput(ctx, 64) nt.SetLoggingLevel(ctx, "org.apache.cassandra.db", "DEBUG") out, _ := nt.Status(ctx, "")
安全性: 使用 exec.CommandContext 傳參,無 shell 解譯,不受命令注入影響。
啟用 Cassandra 原生 tracing(寫入 system_traces.sessions / system_traces.events),可查每條查詢的 coordinator 路徑、每個階段耗時、replica 分派等。
tr, err := db.TraceProbe(ctx, 2*time.Second, "SELECT * FROM app.users WHERE id = ?", uid) if err != nil { log.Fatal(err) } fmt.Println(tr.Format()) // → Trace <uuid> // command : QUERY // coordinator : 10.0.0.1 // duration : 1.523ms (server-reported) // total elapsed: 1.41ms (max source_elapsed) // events: // [ 120μs] 10.0.0.1 Native-Transport-1 Parsing SELECT ... // [ 340μs] 10.0.0.1 ReadStage-1 Executing single-partition query // ...
q := db.Session().Query("SELECT * FROM app.users").WithContext(ctx) tracer := cassandra.TraceQuery(q) // 掛上 MemTracer _ = q.Iter().Close() // 正常執行 query if id, ok := tracer.Last(); ok { trace, _ := db.WaitForTrace(ctx, id, 2*time.Second, 50*time.Millisecond) fmt.Println(trace.Format()) }
type Trace struct { Session TraceSession // command / coordinator / duration / request / client Events []TraceEvent // 依 source_elapsed 排序 } tr, _ := db.GetTrace(ctx, sessionID) fmt.Println(tr.Duration(), tr.TotalElapsed()) for _, e := range tr.Events { fmt.Printf("%s @%dμs: %s\n", e.Source, e.SourceElapsed, e.Activity) }
tracer := cassandra.NewTraceWriter(db.Session(), os.Stdout) q := db.Session().Query("SELECT * FROM users").WithContext(ctx) q.Trace(tracer) _ = q.Iter().Close() // 每個 event 直接 print 到 stdout
| 函式 / 型別 | 用途 |
|---|---|
db.TraceProbe(ctx, wait, stmt, args...) |
最簡介面:執行 + 等 trace flush + 回傳 *Trace
|
db.GetTrace(ctx, sessionID) |
直接讀 system_traces.*
|
db.WaitForTrace(ctx, id, timeout, interval) |
輪詢等待 trace 可見(server 寫入有延遲) |
cassandra.TraceQuery(q) |
在 *gocql.Query 上掛 MemTracer
|
MemTracer.Last() / .Sessions() / .Reset()
|
取得 / 重設收集到的 session ID |
cassandra.NewTraceWriter(sess, w) |
邊跑邊 print 每個 event(cqlsh 風格) |
Trace.Format() |
人類可讀多行輸出 |
Trace.Duration() / .TotalElapsed()
|
伺服器自報 / 最大 source_elapsed |
- Cassandra 寫入
system_traces.*是非同步的,直接GetTrace可能拿到ErrNotFound;用WaitForTrace或TraceProbe。 - Tracing 預設 TTL 24 小時(
tracetype_query_ttl/tracetype_repair_ttl)。 - 開 tracing 會加 coordinator 負擔;不要全量開,做 sampling(例如 1%)或只針對慢查詢診斷。
-
TraceProbe會丟棄查詢結果,只供診斷使用;不要拿來取資料。
pkg/hidb/cassandra/nb5.go 提供 NoSQLBench v5 (nb5 二進位) 的安全 exec wrapper,用來對 Cassandra 5.0 做壓力測試 / benchmark / 容量規劃。
前置: 需另外安裝
nb5,並可在 PATH 上找到(或設定Binary指定完整路徑)。nb5是獨立的 Java fat-jar 工具,不隨 HypGo 發佈。使用exec.CommandContext以 argv 方式傳參,不經 shell,無命令注入風險。
nb := &cassandra.NB5Exec{ Binary: "nb5", // 預設 "nb5" Host: "10.0.0.1", Port: 9042, LocalDC: "dc1", // 現代 driver 必填 Driver: "cqld4", // nb5 預設 Username: "cass", Password: "secret", WorkingDir: "/var/benchmarks", // cwd(讀本地 YAML 時有用) } // 探查 out, _ := nb.Version(ctx) out, _ = nb.ListWorkloads(ctx) out, _ = nb.ListDrivers(ctx)
// schema / rampup / main / read / write out, _ := nb.RunPhase(ctx, "cql-iot", cassandra.PhaseSchema, "", 1, nil) out, _ = nb.RunPhase(ctx, "cql-iot", cassandra.PhaseRampup, "100000", 0, nil) out, _ = nb.RunPhase(ctx, "cql-iot", cassandra.PhaseMain, "1M", 32, map[string]string{ "keycount": "1000000", "cyclerate": "10000", })
out, err := nb.Scenario("cql-iot"). Phase(cassandra.PhaseMain). Cycles("1M"). Rampup("100k"). Threads(32). CycleRate("10000"). // 固定 ops/s 目標 Param("keycount", "1000000"). Errors("count"). // count | warn | stop | retry | ignore Alias("steady-state"). Extra("--report-summary-to", "stdout:60s"). Extra("--log-histograms", "metrics.hdr:.*:10s"). Run(ctx) // 除錯:先印出完整命令列 fmt.Println(nb.Scenario("cql-iot").Phase(cassandra.PhaseSchema).CommandLine())
out, _ := nb.RunActivity(ctx, "/var/bench/my-workload.yaml", map[string]string{ "tags": "block:main", "cycles": "5M", "threads": "64", })
path, _ := cassandra.WriteInlineWorkload("/tmp/kv.yaml", "app", "kv") // 1. 建 schema nb.Scenario(path).Phase(cassandra.PhaseSchema).Run(ctx) // 2. rampup 寫入 nb.Scenario(path).Phase(cassandra.PhaseRampup). Cycles("100000").Threads(16).Run(ctx) // 3. 混合讀寫 out, _ := nb.Scenario(path).Phase(cassandra.PhaseMain). Cycles("1M").Threads(32).CycleRate("20000").Run(ctx)
模板包含 scenarios、bindings、blocks: schema / rampup / main,並定義 {seq_key}、{rw_key}、{payload} 三個 binding。
summary := cassandra.ParseSummary(out) fmt.Printf("cycles=%d rate=%.0f ops/s duration=%s errors=%d\n", summary.TotalCycles, summary.OpsPerSec, summary.Duration, summary.Errors)
完整指標:
ParseSummary只擷取 stdout 結尾的 headline。詳細 latency 分位數 / timer histogram 請用 nb5 內建匯出器:
--report-summary-to stdout:60s— 每 60 秒印--log-histograms metrics.hdr:.*:10s— HDR histogram 檔--report-graphite-to graphite:2003/--report-prometheus ...
| 型別 / 方法 | 用途 |
|---|---|
NB5Exec |
組態(binary / host / localdc / driver / auth) |
.Run(ctx, args...) |
原始呼叫,回傳 stdout |
.RunActivity(ctx, activity, params) |
單 activity,params 自動排序 |
.RunPhase(ctx, workload, phase, cycles, threads, extra) |
最常用:直接指定某階段 |
.Scenario(activity) |
Fluent builder |
.Version / ListWorkloads / ListDrivers |
探查 |
Phase 常數 |
PhaseSchema、PhaseRampup、PhaseMain、PhaseRead、PhaseWrite
|
WriteInlineWorkload(path, ks, tbl) |
產生 inline workload YAML |
ParseSummary(stdout) |
擷取 cycles / rate / duration / errors |
1. Introspect 既有 schema(或設計好 model)
2. WriteInlineWorkload 或撰寫 workload YAML
3. RunPhase(..., PhaseSchema, ...) 建表
4. RunPhase(..., PhaseRampup, ...) 寫入 baseline 資料
5. 用 Scenario builder 跑 main phase(固定 cyclerate + threads)
6. ParseSummary 或直接收集 HDR histogram
7. 期間同時用 ThreadPools / CompactionTasks 觀察叢集負載
pkg/hidb/cassandra/defaults.go 定義的全域預設變數(可在 init() 覆寫):
| 變數 | 預設值 | 用途 |
|---|---|---|
DefaultKeyspaceReplication |
SimpleStrategy RF=1 |
Keyspace 未指定 replication 時 |
DefaultDurableWrites |
true |
Keyspace 未指定 DURABLE_WRITES 時 |
DefaultCompaction |
UnifiedCompactionStrategy |
Table 未指定 compaction 時(5.0 推薦) |
DefaultCompression |
LZ4 + 16 KB chunk |
Table 未指定 compression 時 |
DefaultCaching |
keys=ALL, rows=NONE |
Table 未指定 caching 時 |
DefaultGCGraceSeconds |
864000(10 天) |
|
DefaultBloomFilterFPChance |
0.01 |
|
DefaultSpeculativeRetry |
"99p" |
行為:
- 預設自動套用到
KeyspaceBuilder.CreateCQL()與TableBuilder.CreateCQL() - 呼叫者已顯式指定的值絕不會被覆寫
- 以
.NoDefaults()關閉(產生「只含顯式設定」的 CQL)
// 全域覆寫(舉例改為 LCS) cassandra.DefaultCompaction = cassandra.CompactionOptions{Class: cassandra.CompactionLCS} // 單 builder 關閉 db.Table("t").Column("id", cassandra.TypeUUID).PartitionKey("id").NoDefaults().CreateCQL()
func bootstrap(ctx context.Context) { db, _ := cassandra.New(cassandra.Config{ Hosts: []string{"127.0.0.1"}, Keyspace: "app", }) defer db.Close() // 1. 建 keyspace db.Keyspace("app").NetworkTopology(map[string]int{"dc1": 3}).Create(ctx) db.Use(ctx, "app") // 2. 建 UDT db.Type("address"). Field("city", cassandra.TypeText). Field("zip", cassandra.TypeText). Create(ctx) // 3. 用 model AutoMigrate db.AutoMigrate(ctx, &User{}, &Event{}) // 4. SAI 索引 db.Index("users_email_idx").On("users", "email").SAI().Create(ctx) // 5. 寫入 + 查詢 u := User{ID: gocql.TimeUUID(), Email: "a@b.c"} db.Save(ctx, &u) var got User db.Select("*").From("users").Where("id", "=", u.ID).One(ctx, &got) // 6. 觀察叢集狀態 pools, _ := db.ThreadPools(ctx) for _, p := range pools { fmt.Println(p.Name, p.ActiveTasks) } // 7. Snapshot 一份 schema 備 AI 用 schema, _ := db.Introspect(ctx, cassandra.IntrospectOptions{}) os.WriteFile("schema.json", mustJSON(schema), 0644) }
設計文件
套件
- config — 設定
- context — 請求上下文
- router — 路由器
- server — 伺服器
- middleware — 中介層
- websocket — WebSocket
- hidb — 資料庫 ORM
- hidb/cassandra — Cassandra
- logger — 日誌
- json — JSON 處理
- grpc — gRPC
AI 協作工具鏈
- schema — Schema-first 路由
- manifest — 專案 Manifest
- contract — Contract Testing
- errors — Typed Error Catalog
- migrate — Migration Diff
- scaffold — 智慧 Scaffold
- airules — AI Rules
CLI 命令
- hyp 總覽
- hyp new
- hyp api
- hyp run
- hyp restart
- hyp generate
- hyp migrate
- hyp context
- hyp ai-rules
- hyp chkcomment
- hyp impact
- hyp docker
- hyp health
- hyp version
- hyp difflog
Design Docs
Packages
- config — Configuration
- context — Request Context
- router — Router
- server — Server
- middleware — Middleware
- websocket — WebSocket
- hidb — Database ORM
- hidb/cassandra - Cassandra 5.0
- logger — Logger
- json — JSON
- grpc — gRPC
AI Collaboration Toolchain
- schema — Schema-first Routing
- manifest — Project Manifest
- contract — Contract Testing
- errors — Typed Error Catalog
- migrate — Migration Diff
- scaffold — Smart Scaffold
- airules — AI Rules
CLI Commands