当前位置 博文首页 > golang 限制同一时间的并发量操作

    golang 限制同一时间的并发量操作

    作者:小辣抓 时间:2021-02-09 09:27

    go的并发量是很厉害的,goroutine创建的代价极小,其中一个重要的原因是因为go采用了分段栈技术,每一个goroutine只占极小的空间。与此同时,goroutine是语言层面的,减少了内核态到用户态的切换开销,并且goroutine摒弃了一些golang用不到的一些os thread的系统调用,创建代价小。

    我们可以一瞬间创建很多个goroutine,这是相当容易的。

    乍一看,这与题目完全不符,前面说了那么多,难道不是鼓励我们多创建goroutine吗?不不不,goroutine确实很好用,但是如果不加以限制,很有可能出现其他的不可预料的错误。

    比如在web领域中, 一个连接,在linux/unix下就相当于是打开了一个文件,占用一个文件描述符。但是系统会规定文件描述符的上限,我们可以使用ulimit -n来进行查看,如果我们遵循量大就好的话,那么一拥而上的请求连接会瞬间报错。

    2018/06/30 10:09:54 dial tcp :8080: socket: too many open files

    上面这条报错信息源于我写的一个循环请求的工具

    package main
    import (
      "sync"
      "net"
      "strconv"
      "fmt"
      "log"
    )
    const (
      MAX_CONCURRENCY = 10000 
    )
    var waitGroup sync.WaitGroup
    func main(){
      concurrency()
      waitGroup.Wait()
    }
    //进行网络io
    func request(currentCount int){
      fmt.Println("request" + strconv.Itoa(currentCount) + "\r")
      conn, err := net.Dial("tcp",":8080")
      if err != nil { log.Fatal(err) }
      defer conn.Close()
      defer waitGroup.Done()
    }
    //并发请求
    func concurrency(){
      for i := 0;i < MAX_CONCURRENCY;i++ {
        waitGroup.Add(1)
        go request(i)
      }
    }
    

    用go建立一个服务端很简单,我这里简单的贴下server的代码

    package main
    import (
      "io"
      "os"
      "fmt"
      "net"
    )
    func checkErr(err error){
      if err != nil { fmt.Fprintln(os.Stderr, err) }
    }
    func main() {
      listener, err := net.Listen("tcp",":8080")
      checkErr(err)
      for {
        conn, err := listener.Accept()
        checkErr(err)
        go func(conn net.Conn){ 
          _, err := io.WriteString(conn, "welcome!") 
          checkErr(err)
          defer conn.Close()
        }(conn)
      }
    }
    

    现在回到主题,我们可以看到一拥而上其实也有坏处,想要解决这一问题,我们可以限制同一时间的并发数量,可以利用channel来达到这一点,这有点类似于信号量(Semaphore)

    创建一个带缓存的channel,其中CHANNEL_CACHE为同一时间的最大并发量

    想简单的说一下为什么这里chan的类型要用一个空的struct,这是因为在这个场景下(限制同一时间的并发量),通过channel传输的数据的类型并不重要,我们只需要通过做一个通知效果就行了(就像你通知你朋友起床,你只用闪个电话,而不用实际的接通,省去了电话费的开销),这里的空的struct实际上是不占任何空间的,因此这里选用空的struct

    const (
      CHANNEL_CACHE = 200
    )
    var tmpChannel = make(chan struct{}, CHANNEL_CACHE)
    

    在与服务器建立连接的地方这样写(是不是很类似于信号量)

    tmpChan <- struct{}{}
    conn, err := net.Dial("tcp",":8080")
    <- tmpChan

    这样同一时间的并发量就由CHANNEL_CACHE限制下来

    经过循环开启的goroutine在请求服务器之前会向channel发送消息,如果缓存满了,那么说明已经有CHANNEL_CACHE个goroutine在进行与服务器的连接,接着就会阻塞在这里,等待其中一个goroutine处理完之后,从channel中读出一个空的struct,这时阻塞的地方向channel发送一个空struct,就可以与服务器建立连接了

    下面贴一下全部的代码

    package main
    import (
      "sync"
      "net"
      "strconv"
      "fmt"
      "log"
    )
    const (
      MAX_CONCURRENCY = 10000 
      CHANNEL_CACHE = 200
    )
    var tmpChan = make(chan struct{}, MAX_CONCURRENCY)
    var waitGroup sync.WaitGroup
    func main(){
      concurrency()
      waitGroup.Wait()
    }
    //进行网络io
    func request(currentCount int){
      fmt.Println("request" + strconv.Itoa(currentCount) + "\r")
      tmpChan <- struct{}{}
      conn, err := net.Dial("tcp",":8080")
      <- tmpChan
      if err != nil { log.Fatal(err) }
      defer conn.Close()
      defer waitGroup.Done()
    }
    //并发
    func concurrency(){
      for i := 0;i < MAX_CONCURRENCY;i++ {
        waitGroup.Add(1)
        go request(i)
    	}
    }
    

    这样就可以愉快的进行并发了!!!

    补充:Golang限制N个并发同时运行

    我就废话不多说了,大家还是直接看代码吧~

    package main 
    import (
      "fmt"
      "sync"
      "time"
    ) 
    var wg sync.WaitGroup 
    func main() {
      var wg sync.WaitGroup
     
      sem := make(chan struct{}, 2) // 最多允许2个并发同时执行
      taskNum := 10
     
      for i := 0; i < taskNum; i++ {
        wg.Add(1)
     
        go func(id int) {
          defer wg.Done()
     
          sem <- struct{}{}    // 获取信号
          defer func() { <-sem }() // 释放信号
     
          // do something for task
          time.Sleep(time.Second * 2)
          fmt.Println(id, time.Now())
        }(i)
      }
      wg.Wait()
    }
    

    以上为个人经验,希望能给大家一个参考,也希望大家多多支持站长博客。如有错误或未考虑完全的地方,望不吝赐教。

    js
    下一篇:没有了