DoNewsCode/core-processor

core-processor

This project provides a framework for consuming messages from Kafka.

It aims to simplify the logic of consuming and forwarding data, and to offer a configurable, efficient way of doing so.

With core and core-processor, we can do this:

package main

import (
	"context"
	"encoding/json"
	"fmt"

	"github.com/DoNewsCode/core"
	processor "github.com/DoNewsCode/core-processor"
	"github.com/DoNewsCode/core/di"
	"github.com/DoNewsCode/core/otkafka"
	"github.com/segmentio/kafka-go"
)

// Handler consumes messages from one Kafka reader.
type Handler struct {
}

// NewHandlerOut wraps the handler so it can be provided to core.
func NewHandlerOut() processor.Out {
	return processor.NewOut(
		&Handler{},
	)
}

// Data is the JSON payload carried by each message.
type Data struct {
	ID   int    `json:"id"`
	Name string `json:"name"`
}

// Info tells the processor which reader to use and how to batch.
func (h *Handler) Info() *processor.Info {
	return &processor.Info{
		Name:      "default", // the reader name is default
		BatchSize: 3,
	}
}

// Handle decodes a single Kafka message into a *Data value.
func (h *Handler) Handle(ctx context.Context, msg *kafka.Message) (interface{}, error) {
	e := &Data{}
	// e is already a pointer, so pass it directly.
	if err := json.Unmarshal(msg.Value, e); err != nil {
		return nil, err
	}
	return e, nil
}

// Batch receives the values returned by Handle.
func (h *Handler) Batch(ctx context.Context, data []interface{}) error {
	for _, e := range data {
		fmt.Println(e.(*Data))
	}
	return nil
}

func main() {
	// prepare config and dependencies
	c := core.New(
		core.WithInline("kafka.reader.default.brokers", []string{"127.0.0.1:9092"}),
		core.WithInline("kafka.reader.default.topic", "test"),
		core.WithInline("kafka.reader.default.groupID", "test"),
		core.WithInline("kafka.reader.default.startOffset", kafka.FirstOffset),
	)
	defer c.Shutdown()
	c.ProvideEssentials()
	c.Provide(otkafka.Providers())
	c.AddModuleFunc(processor.New)

	// provide your handlers
	c.Provide(di.Deps{
		NewHandlerOut,
	})

	// start server
	err := c.Serve(context.Background())
	if err != nil {
		panic(err)
	}
}

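With this in place, all that remains is to write handlers and provide their constructors to core. Each handler's processor.Info lets us flexibly adjust how the processor runs it, for example which reader it consumes from (Name) and the batch size (BatchSize).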
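
To make the relationship between Handle, Batch, and BatchSize more concrete, here is a small standalone sketch that uses only the standard library. It is not how core-processor is implemented: `decode` and `collectBatches` are hypothetical names introduced for illustration, and flushing a trailing partial batch at the end of the input is an assumption, since this README does not say when partial batches are delivered.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Data mirrors the payload type from the example above.
type Data struct {
	ID   int    `json:"id"`
	Name string `json:"name"`
}

// decode plays the role of Handle: one raw message value in, one typed record out.
func decode(value []byte) (*Data, error) {
	d := &Data{}
	if err := json.Unmarshal(value, d); err != nil {
		return nil, err
	}
	return d, nil
}

// collectBatches is a hypothetical helper, not part of core-processor.
// It decodes every value and groups the results into slices of at most
// batchSize records, roughly the shape a Batch method would receive.
func collectBatches(values [][]byte, batchSize int) ([][]*Data, error) {
	if batchSize < 1 {
		return nil, fmt.Errorf("batchSize must be at least 1, got %d", batchSize)
	}
	var batches [][]*Data
	var current []*Data
	for _, v := range values {
		d, err := decode(v)
		if err != nil {
			return nil, fmt.Errorf("decode %q: %w", v, err)
		}
		current = append(current, d)
		if len(current) == batchSize {
			batches = append(batches, current)
			current = nil
		}
	}
	// Assumption: a trailing partial batch is flushed once the input ends.
	if len(current) > 0 {
		batches = append(batches, current)
	}
	return batches, nil
}

func main() {
	values := [][]byte{
		[]byte(`{"id":1,"name":"a"}`),
		[]byte(`{"id":2,"name":"b"}`),
		[]byte(`{"id":3,"name":"c"}`),
		[]byte(`{"id":4,"name":"d"}`),
	}
	batches, err := collectBatches(values, 3)
	if err != nil {
		panic(err)
	}
	for i, b := range batches {
		fmt.Printf("batch %d holds %d record(s)\n", i, len(b))
	}
}
```

In the real framework the equivalent grouping is driven by the BatchSize field of processor.Info, so a handler only has to implement the per-message and per-batch steps.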

Have fun!
