kafka.go
package middleware
import (
"context"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"os"
"strconv"
"strings"
"github.com/Shopify/sarama"
"github.com/garyburd/redigo/redis"
"github.com/golang/protobuf/proto"
"github.com/jinzhu/copier"
)
var Producer sarama.SyncProducer
// OnErrorOccurred 表示发生一个消费错误
type OnErrorOccurred func(err *ConsumeError)
// OnMsgReceived 表示收到一个消息
type OnMsgReceived func(msg Msg)
var consumer *Consumer //消费者
func KafkaInit() {
KafkaProductConnect()
KafkaConsumerConnect()
}
func KafkaConsumerConnect() {
sasl_info := SASLInfo{
UserName: "",
Password: "",
}
go func() {
var err error
consumer, err = NewConsumer(
conf.Kafka_brokerlist,
[]string{conf.Kafka_topic},
conf.Kafka_groupid,
handleMsg,
WithConsumerSASL(sasl_info),
WithConsumerErrorHandler(handleErr),
WithConsumerAOR(Earliest),
WithConsumerManualCommit(false))
if err != nil {
fmt.Println("consumer init error", err.Error())
}
defer func() {
if err := consumer.Close(); err != nil {
Log.Loger.Println("close consumer error", err)
}
}()
fmt.Println("kafka consumer start")
consumer.StartBlock(context.Background())
}()
}
func KafkaProductConnect() {
config := sarama.NewConfig()
// 等待服务器所有副本都保存成功后的响应
config.Producer.RequiredAcks = sarama.WaitForLocal
// 随机的分区类型:返回一个分区器,该分区器每次选择一个随机分区
config.Producer.Partitioner = sarama.NewRandomPartitioner
// 是否等待成功和失败后的响应
config.Producer.Return.Successes = true
//设置大小
config.Producer.MaxMessageBytes = 5000000
//设置重新转发次数
config.Producer.Retry.Max = 3
//kafka登录
config.Net.SASL.User = conf.Kafka_username
config.Net.SASL.Password = conf.Kafka_password
config.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512
config.Net.SASL.Enable = true
//scram认证登录
config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
return &XDGSCRAMClient{
HashGeneratorFcn: SHA512,
}
}
config.ClientID = conf.Kafka_clientid
// 使用给定代理地址和配置创建一个同步生产者
broker_list := strings.Split(conf.Kafka_brokerlist, ",")
// 使用给定代理地址和配置创建一个同步生产者
var err error
Producer, err = sarama.NewSyncProducer(broker_list, config)
if err != nil {
fmt.Println("kafka connect err:", err)
return
}
fmt.Println("kafka connetct success ip", broker_list)
}
func ProduceMsg(topic string, partition int32, key string, data []byte) {
if Producer == nil {
console.Clog.Errorln("kafka not connect ")
return
}
//构建发送的消息,
msg := &sarama.ProducerMessage{
Topic: topic, //包含了消息的主题
Partition: partition, //
Key: sarama.StringEncoder(key), //
}
msg.Value = sarama.ByteEncoder(data)
//SendMessage:该方法是生产者生产给定的消息
//生产成功的时候返回该消息的分区和所在的偏移量
//生产失败的时候返回error
partition, offset, err := Producer.SendMessage(msg)
if err != nil {
fmt.Printf("Send message Fail:%s topic: %s", err, topic)
}
fmt.Println("Partition=", partition, " offset=", offset)
}
func ProduceMsgToDefaultAddr(key string, data []byte) {
ProduceMsg(conf.Kafka_topic, conf.Kafka_partition, key, data)
}
// NewConsumer 创建一个消费者
func NewConsumer(brokers string, topics []string, groupName string, dataMsgReceived OnMsgReceived, options ...ConsumerOption) (*Consumer, error) {
if strings.TrimSpace(brokers) == "" {
return nil, errors.New("brokers为空")
}
addr := strings.Split(brokers, ",")
if len(addr) == 0 {
return nil, errors.New("brokers format error,like 127.0.0.1:9092,127.0.0.1:9093")
}
for _, s := range addr {
if strings.TrimSpace(s) == "" {
return nil, errors.New("brokers has null address")
}
}
if len(topics) == 0 {
return nil, errors.New("topics is null")
}
for _, topic := range topics {
if strings.TrimSpace(topic) == "" {
return nil, errors.New("topic has null word")
}
}
if strings.TrimSpace(groupName) == "" {
return nil, errors.New("groupName is null")
}
if dataMsgReceived == nil {
return nil, errors.New("dataMsgReceived is null")
}
cfg := &consumerConfig{
manualCommit: false, //默认自动确认消息,此时语义是至多一次
aor: Newest, //默认消费最新消息
}
for _, v := range options {
v(cfg)
}
config := sarama.NewConfig()
//是否自动提交已消费的偏移量
config.Consumer.Offsets.AutoCommit.Enable = !cfg.manualCommit
//若当前组无消费偏移量记录,则从什么偏移量消费数据
initial := sarama.OffsetNewest
if cfg.aor == Earliest {
initial = sarama.OffsetOldest
}
config.Consumer.Offsets.Initial = initial
if cfg.saslInfo != nil {
config.Net.SASL.Enable = true
config.Net.SASL.User = cfg.saslInfo.UserName
config.Net.SASL.Password = cfg.saslInfo.Password
config.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512
config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
return &XDGSCRAMClient{
HashGeneratorFcn: SHA512,
}
}
}
returnErrors := cfg.errorHandler != nil
config.Consumer.Return.Errors = returnErrors
group, err := sarama.NewConsumerGroup(addr, groupName, config)
if err != nil {
return nil, err
}
ret := &Consumer{
brokers: addr,
groupName: groupName,
topics: topics,
aor: cfg.aor,
manualCommit: cfg.manualCommit,
group: group,
msgReceived: dataMsgReceived,
}
if returnErrors {
go func() {
for e := range group.Errors() {
//消费过程中若通信中断,此处将有error,若通信恢复则消费自动恢复
cfg.errorHandler(&ConsumeError{
Err: e,
})
}
}()
}
return ret, nil
}
// Setup在新的会话开始前执行 早于ConsumeClaim
func (defaultConsumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error {
return nil
}
// Cleanup在会话结束时执行
func (defaultConsumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error {
return nil
}
func (c *Consumer) Confirm(msg Msg) error {
if err := c.checkManualCommit(); err != nil {
return err
}
if err := c.checkStarted(); err != nil {
return err
}
c.session.MarkOffset(msg.Topic(), msg.Partition(), msg.Offset()+1, "")
c.session.Commit()
return nil
}
func (c *Consumer) ConfirmBatch(msgs []Msg) error {
if len(msgs) == 0 {
return errors.New("it has no data to verify")
}
if err := c.checkManualCommit(); err != nil {
return err
}
if err := c.checkStarted(); err != nil {
return err
}
if err := c.checkSession(); err != nil {
return err
}
//对于Kafka而言,相同topic下的相同partition下的多个不同的offset,以最后一个确认的为准
//比如offset的顺序是 1 2 3 4 5 6 7 8 4 ,那么提交的offset不是8而是4
//所以需要按照topic和partition进行分组,以组中最大的offset+1作为提交依据
//因为kafka是按照提交的offset作为下次的消费的依据,提交10则下次就从10开始消费,所以+1以便从11开始消费
groups := make(map[string]Msg)
for _, v := range msgs {
if v == nil {
continue
}
key := fmt.Sprintf("%v-%v", v.Topic(), v.Partition())
m, ok := groups[key]
if !ok {
groups[key] = v
continue
}
if v.Offset() > m.Offset() {
groups[key] = v
}
}
for _, v := range groups {
c.session.MarkOffset(v.Topic(), v.Partition(), v.Offset()+1, "")
}
c.session.Commit()
return nil
}
func (c *Consumer) StartBlock(ctx context.Context) {
for {
handler := defaultConsumerGroupHandler{c: c}
errChan := make(chan error, 1)
go func() {
//与broker断开时这里会返回错误 需通过死循环进行重试
err := c.group.Consume(context.Background(), c.topics, handler)
errChan <- err
}()
select {
case <-ctx.Done():
break
case <-errChan:
break
}
}
}
func (c *Consumer) Close() (err error) {
if c.group != nil {
_ = c.group.Close()
}
return nil
}
func (c *Consumer) setStarted(started bool) {
c.isStartedLock.Lock()
c.isStarted = started
c.isStartedLock.Unlock()
}
func (c *Consumer) checkStarted() error {
c.isStartedLock.RLock()
defer c.isStartedLock.RUnlock()
if !c.isStarted {
return errors.New("未开始消费数据,请先执行StartBlock")
}
return nil
}
func (c *Consumer) setSession(sess sarama.ConsumerGroupSession) {
c.sessionLock.Lock()
c.session = sess
c.sessionLock.Unlock()
}
func (c *Consumer) checkSession() error {
c.sessionLock.RLock()
defer c.sessionLock.RUnlock()
if c.session == nil {
return errors.New("Session is null")
}
return nil
}
func (c *Consumer) checkManualCommit() error {
if !c.manualCommit {
return errors.New("manualCommit is true")
}
return nil
}
// ConsumerOption 消费者配置选项
type ConsumerOption func(*consumerConfig)
// WithConsumerSASL 指定消费者的SASL验证信息
func WithConsumerSASL(sasl SASLInfo) ConsumerOption {
return func(cfg *consumerConfig) {
cfg.saslInfo = &sasl
}
}
// WithConsumerAOR 指定消费者的自动偏移量重置方式 默认为Newest
func WithConsumerAOR(aorType AORMode) ConsumerOption {
return func(cfg *consumerConfig) {
cfg.aor = aorType
}
}
// WithConsumerManualCommit 指定消费者是否手动确认消息 默认为自动确认
func WithConsumerManualCommit(manual bool) ConsumerOption {
return func(cfg *consumerConfig) {
cfg.manualCommit = manual
}
}
// WithConsumerErrorHandler 指定消费者错误处理器
func WithConsumerErrorHandler(h OnErrorOccurred) ConsumerOption {
return func(cfg *consumerConfig) {
cfg.errorHandler = h
}
}
func (h defaultConsumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
//当与broker断开连接,重连并且消费到新消息时会重新进入ConsumeClaim
//断开过程中收到的消息无法进行确认(会失败,但API不会返回错误) 只能待恢复后用新会话进行提交
h.c.setSession(sess)
h.c.setStarted(true)
for msg := range claim.Messages() {
//将收到的消息保证后处理
dm := &MsgData{
session: sess,
msg: msg,
}
if !h.c.manualCommit {
//自动提交模式下每收到一条消息就进行标记,保证自动Commit时可以确认收到的消息
sess.MarkMessage(msg, "")
}
h.c.msgReceived(dm)
}
h.c.setStarted(false)
return nil
}
// 处理msg error
func handleErr(err *ConsumeError) {
Log.Loger.Println("kafka handle error:", err)
}
// 处理msg
func handleMsg(msg Msg) {
Log.Loger.Println("kafka data topic = ", msg.Topic(), " partition = ", msg.Partition(), " offset = ", msg.Offset(), " key = ", msg.Key(), " data = ", string(msg.Data()))
err := consumer.Confirm(msg)
if err != nil {
Log.Loger.Println("kafka confirm err", err)
return
}
seq, _ := strconv.Atoi(strings.Split(msg.Key(), "")[1])
Log.Loger.Println("Recv MQ ID:", seq)
recvChan := conf.RecvChan.ReadMap(seq)
//if _,ok := <-recvChan; ok {
if recvChan != nil {
recvChan <- msg.Data()
}
fmt.Println("Recv MQ ID DONE:", seq)
}
kafka_data.go
package middleware
import (
"sync"
"github.com/Shopify/sarama"
)
// ConsumeError 消费错误
type ConsumeError struct {
Err error
}
// 进行实际的消费处理
type defaultConsumerGroupHandler struct {
c *Consumer
}
// Msg 表示一个Kafka的数据消息
type Msg interface {
//当前数据所属Topic
Topic() string
//当前数据所属Topic下的分区
Partition() int32
//当前数据的分区偏移量
Offset() int64
//实际数据内容
Data() []byte
Key() string
}
// AORMode(auto.offset.reset) Kafka的自动偏移量重置方式
// 同消费者组的消费者无初始偏移或当前偏移在服务器上不存在时消费者的消费行为
type AORMode int16
const (
//从消费开始后产生的新消息开始消费
Newest AORMode = 0
//从最早的消息开始消费
Earliest AORMode = 1
)
// Consumer 消息消费者
type Consumer struct {
//kafka的broker列表
brokers []string
//待消费的Topic列表
topics []string
//消费者组
groupName string
//消费者是否手动提交确认
manualCommit bool
//消费者auto.offset.reset模式
aor AORMode
//当消费者接收到消息时的处理函数
msgReceived OnMsgReceived
//标记当前消费者是否开始消费
isStarted bool
isStartedLock sync.RWMutex
//当前消费者组会话
session sarama.ConsumerGroupSession
sessionLock sync.RWMutex
//实际进行消息消费组对象
group sarama.ConsumerGroup
}
// SASLInfo 简单验证与安全层信息 用于生产者与消费者同Broker之间的安全验证
type SASLInfo struct {
//用户名
UserName string
//密码(明文)
Password string
}
// consumerConfig 消费者配置
type consumerConfig struct {
saslInfo *SASLInfo
aor AORMode
manualCommit bool
errorHandler OnErrorOccurred
}
// MsgData 数据
type MsgData struct {
//实际的消息数据
msg *sarama.ConsumerMessage
//消费者组会话
session sarama.ConsumerGroupSession
}
func (d *MsgData) Data() []byte {
return d.msg.Value
}
func (d *MsgData) Topic() string {
return d.msg.Topic
}
func (d *MsgData) Partition() int32 {
return d.msg.Partition
}
func (d *MsgData) Offset() int64 {
return d.msg.Offset
}
func (d *MsgData) Key() string {
return string(d.msg.Key)
}