当前位置 博文首页 > golang如何使用sarama访问kafka

    golang如何使用sarama访问kafka

    作者:CodingCode 时间:2021-07-02 18:31

    下面是一个访问kafka服务器的客户端代码例子,用来发送和接收消息。

    使用方式

    1、命令行参数

    $ ./kafkaclient -h
    Usage of ./kafkaclient:
     -ca string
      CA Certificate (default "ca.pem")
     -cert string
      Client Certificate (default "cert.pem")
     -command string
      consumer|producer (default "consumer")
     -host string
      Common separated kafka hosts (default "localhost:9093")
     -key string
      Client Key (default "key.pem")
     -partition int
      Kafka topic partition
     -tls
      TLS enable
     -topic string
      Kafka topic (default "test--topic")

    2、作为producer启动

    $ ./kafkaclient -command producer \
     -host kafka1:9092,kafka2:9092
    
    ## TLS-enabled
    $ ./kafkaclient -command producer \
     -tls -cert client.pem -key client.key -ca ca.pem \
     -host kafka1:9093,kafka2:9093
    

    producer发送消息给kafka:

    > aaa
    2018/12/15 07:11:21 Produced message: [aaa]
    > bbb
    2018/12/15 07:11:30 Produced message: [bbb]
    > quit

    3、作为consumer启动

    $ ./kafkaclient -command consumer \
     -host kafka1:9092,kafka2:9092
    
    ## TLS-enabled
    $ ./kafkaclient -command consumer \
     -tls -cert client.pem -key client.key -ca ca.pem \
     -host kafka1:9093,kafka2:9093
    

    consumer从kafka接收消息:

    2018/12/15 07:11:21 Consumed message: [aaa], offset: [4]
    2018/12/15 07:11:30 Consumed message: [bbb], offset: [5]

    完整源代码如下

    这个代码使用到了Shopify/sarama库,请自行下载使用。

    $ cat kafkaclient.go
    package main
    
    import (
     "flag"
     "fmt"
     "log"
     "os"
     "io/ioutil"
     "bufio"
     "strings"
    
     "crypto/tls"
     "crypto/x509"
    
     "github.com/Shopify/sarama"
    )
    
    // Command-line options; each one is bound to a flag in main.
    var (
     // command selects the mode: "consumer" or "producer" (-command).
     command  string
     // tlsEnable turns on TLS for the broker connection (-tls).
     tlsEnable bool
     // hosts is the comma-separated broker list, split in main (-host).
     hosts  string
     // topic is the Kafka topic to produce to or consume from (-topic).
     topic  string
     // partition is the topic partition handed to the loop functions (-partition).
     partition int
     // clientcert is the path of the client certificate file (-cert).
     clientcert string
     // clientkey is the path of the client private key file (-key).
     clientkey string
     // cacert is the path of the CA certificate file (-ca).
     cacert  string
    )
    
    // main parses the command-line flags, connects to the Kafka brokers
    // (optionally over TLS) and then runs either the consumer loop or the
    // interactive producer loop, depending on -command.
    func main() {
     flag.StringVar(&command, "command", "consumer", "consumer|producer")
     flag.BoolVar(&tlsEnable, "tls", false, "TLS enable")
     flag.StringVar(&hosts, "host", "localhost:9093", "Common separated kafka hosts")
     flag.StringVar(&topic, "topic", "test--topic", "Kafka topic")
     flag.IntVar(&partition, "partition", 0, "Kafka topic partition")
     flag.StringVar(&clientcert, "cert", "cert.pem", "Client Certificate")
     flag.StringVar(&clientkey, "key", "key.pem", "Client Key")
     flag.StringVar(&cacert, "ca", "ca.pem", "CA Certificate")
     flag.Parse()

     // Reject unknown commands before touching the network; previously any
     // value other than "consumer" (including typos) started a producer.
     if command != "consumer" && command != "producer" {
      log.Fatalf("unknown command %q: expected consumer or producer", command)
     }

     config := sarama.NewConfig()
     if tlsEnable {
      // Uncomment to see sarama's own connection diagnostics:
      //sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
      tlsConfig, err := genTLSConfig(clientcert, clientkey, cacert)
      if err != nil {
       log.Fatal(err)
      }

      config.Net.TLS.Enable = true
      config.Net.TLS.Config = tlsConfig
     }

     client, err := sarama.NewClient(strings.Split(hosts, ","), config)
     if err != nil {
      log.Fatalf("unable to create kafka client: %q", err)
     }
     // Closing a consumer/producer created with New...FromClient does not
     // close the underlying client, so it has to be closed separately.
     // Registered first so that it runs after the consumer/producer Close.
     defer func() {
      if err := client.Close(); err != nil {
       log.Printf("closing kafka client: %v", err)
      }
     }()

     switch command {
     case "consumer":
      consumer, err := sarama.NewConsumerFromClient(client)
      if err != nil {
       log.Fatal(err)
      }
      defer consumer.Close()
      loopConsumer(consumer, topic, partition)
     case "producer":
      producer, err := sarama.NewAsyncProducerFromClient(client)
      if err != nil {
       log.Fatal(err)
      }
      // Close flushes any buffered messages before the program exits.
      defer producer.Close()
      loopProducer(producer, topic, partition)
     }
    }
    
    // genTLSConfig builds a client-side TLS configuration for mutual TLS.
    //
    // clientcertfile and clientkeyfile are paths to the PEM-encoded client
    // certificate and private key; cacertfile is the path to a PEM file with
    // the CA certificate(s) used to verify the server.
    //
    // It returns an error if any file cannot be read or parsed, including the
    // case where cacertfile contains no usable certificate (which would
    // otherwise silently produce an empty root pool).
    func genTLSConfig(clientcertfile, clientkeyfile, cacertfile string) (*tls.Config, error) {
     // load client cert
     keyPair, err := tls.LoadX509KeyPair(clientcertfile, clientkeyfile)
     if err != nil {
      return nil, fmt.Errorf("loading client key pair: %w", err)
     }

     // load ca cert pool
     caPEM, err := ioutil.ReadFile(cacertfile)
     if err != nil {
      return nil, fmt.Errorf("reading CA certificate: %w", err)
     }
     caPool := x509.NewCertPool()
     if !caPool.AppendCertsFromPEM(caPEM) {
      return nil, fmt.Errorf("no valid PEM certificate found in %q", cacertfile)
     }

     // generate tls config
     return &tls.Config{
      RootCAs:      caPool,
      Certificates: []tls.Certificate{keyPair},
      // InsecureSkipVerify: true, // This can be used on test server if domain does not match cert
     }, nil
    }
    
    // loopProducer reads lines from standard input and sends each non-empty
    // line to topic through producer. Typing "exit" or "quit", or reaching
    // end of input, ends the loop.
    //
    // partition is currently unused: the message's Partition field is left
    // unset, so the partition is whatever the producer's configured
    // partitioner chooses.
    //
    // The caller remains responsible for closing producer; closing it also
    // terminates the error-draining goroutine started here.
    func loopProducer(producer sarama.AsyncProducer, topic string, partition int) {
     // The async producer reports delivery failures on Errors(). That channel
     // must be drained, otherwise the producer can block once it fills up and
     // failures would go unnoticed.
     go func() {
      for perr := range producer.Errors() {
       log.Printf("Failed to produce message: %v\n", perr)
      }
     }()

     scanner := bufio.NewScanner(os.Stdin)
     fmt.Print("> ")
     for scanner.Scan() {
      text := scanner.Text()
      if text == "exit" || text == "quit" {
       break
      }
      if text != "" {
       // This only queues the message; delivery happens asynchronously.
       producer.Input() <- &sarama.ProducerMessage{Topic: topic, Key: nil, Value: sarama.StringEncoder(text)}
       log.Printf("Produced message: [%s]\n", text)
      }
      fmt.Print("> ")
     }
     // Scan returns false on both EOF and read errors; report the latter.
     if err := scanner.Err(); err != nil {
      log.Printf("reading stdin: %v", err)
     }
    }
    
    // loopConsumer subscribes to the given partition of topic, starting at the
    // newest offset (only messages produced after start-up are seen), and logs
    // every message it receives.
    //
    // If the partition cannot be consumed the error is logged and the function
    // returns. Otherwise it runs until the partition consumer's message
    // channel is closed.
    func loopConsumer(consumer sarama.Consumer, topic string, partition int) {
     partitionConsumer, err := consumer.ConsumePartition(topic, int32(partition), sarama.OffsetNewest)
     if err != nil {
      log.Println(err)
      return
     }
     defer partitionConsumer.Close()

     // Ranging over the channel ends the loop cleanly when it is closed; a
     // bare receive would yield a nil message and panic on msg.Value.
     for msg := range partitionConsumer.Messages() {
      log.Printf("Consumed message: [%s], offset: [%d]\n", msg.Value, msg.Offset)
     }
    }

    编译:

    $ go build kafkaclient.go
    jsjbwy
    下一篇:没有了