09 July 2018

使用goroutine 运行程序

并发与并行

并行是让不同的代码片段同时在不同的物理处理器上执行。并行的关键是同时做很多事情,而并发是指同时管理很多事情,这些事情可能只做了一半就被暂停去做别的事情了。
package main

import (
    "fmt"
    "runtime"
    "sync"
)
func main() {
    // 分配1个逻辑处理器给调度器使用
    runtime.GOMAXPROCS(1)
    // 申明一个wg用来等待程序完成
    var wg sync.WaitGroup
    // 计数加 2,表示要等待两个goroutine
    wg.Add(2)
    fmt.Println("START\n")
    //声明一个匿名函数,并创建一个goroutine
    go func() {
        for count := 0; count < 3; count++ {
            for cha := 'a'; cha < 'a'+26; cha++ {
                fmt.Printf("%c ", cha)
            }
        }
        defer wg.Done()
    }()

    //声明一个匿名函数,并创建一个goroutine
    go func() {

        for count := 0; count < 3; count++ {
            for char := 'A'; char < 'A'+26; char++ {
                fmt.Printf("%c ", char)
            }
        }
        //在函数退出时调用Done 来通知main函数工作已经完成
        defer wg.Done()
    }()
    fmt.Printf("gorunning finished \n")
    wg.Wait()  //等待goroutine结束
    fmt.Printf("end\n")
}

检测并修正竞争状态

锁住共享资源

Go语言提供了传统的同步goroutine的机制,就是对共享资源加锁。如果需要顺序访问一个整型变量或者一段代码,atomic 和sync 包里的函数提供了解决方案。
//原子函数,实现共享
package main

import (
    "fmt"
    "runtime"
    "sync"
    "sync/atomic"
)

var (
    counter int64
    wg      sync.WaitGroup
)

func incCounter(u int64) {
    defer wg.Done()
    for count := 0; count < 2; count++ {
        atomic.AddInt64(&counter, u)
        runtime.Gosched()  //从线程退出到队列中
    }
}

func main() {
    wg.Add(2)
    go incCounter(1)
    go incCounter(2)
    wg.Wait()
    fmt.Println("counter:", counter)
}
//互斥锁
package main

import (
    "fmt"
    "runtime"
    "sync"
)

var (
    counter int
    wg sync.WaitGroup
    //mutex用来定义一段代码临界区
    mutex sync.Mutex
)

func incCounter(count int) {
    defer wg.Done()
    for count := 0; count <2; count++ {
        //同一时刻,只允许一个goroutine进入临界区
        mutex.lock()
        {
            count := counter
            runtime.Gosched()
            count++
            counter = count
        }
        //释放锁
        mutex.unlock()
    }
}

func main() {
    wg.add(2)
    //创建2个goroutine
    go incCounter(1)
    go incCounter(2)
    wg.Wait()
    fmt.Printf("Final Counter: %d\\n", counter)
}

利用通道共享数据

原子函数和互斥锁都能工作,但是依靠它们都不会让编写并发程序变得更简单。在go语言中,通道的概念,也能够在goroutine中实现数据同步。并且简单高效。

在声明通道时,需要指定将要被共享的数据类型。可以通过通道共享内置类型、命名类型、结构类型和引用类型的值或者指针。

通道的创建

使用 make 创建通道

// 无缓冲的整型通道
unbuffered := make(chan int)

// 有缓冲的字符串通道
buffered := make(chan string, 10)

// 通过通道发送一个字符串
buffered <- "Gopher"

//从通道中接收数据并初始化value
value := <- buffered

无缓冲的通道

是指在接收前没有能力保存任何值的通道,这种类型的通道要求发送goroutine 和接收goroutine 同时准备好,才能完成发送和接收操作。
//无缓冲通道实例
package main
import(
    "time"
    "sync"
    "fmt"
    "math/rand"
)
//监听程序是否结束
var wg sync.WaitGroup

func init () {
    rand.Seed(time.Now().UnixNano())
}

//player 模拟一个选手在打网球
func player(name string, c chan int) {
    // 在函数退出时调用Done 来通知main 函数工作已经完成
    defer wg.Down()
    for {
        ball , ok := <- c
        if !ok {
            //如果通过关闭,就表示赢了
            fmt.Printf("player %s won\n" , name)
            return
        }
        //选取随机数,用来判断是否丢球
        n := rand.Intn(100)
        if n%13 == 0 {
            fmt.Printf("player %s missed\n" , name)
            //关闭通道
            close(c)
            return   
        }
        //显示击球数,并将击球数加1
        fmt.Printf("player %s hit %d \n" , name, ball)
        ball++

        //将击球数存入通道
        c <- ball
    }
}

func main() {
    //创建一个无缓冲通道
    chan1 := make(chan int)
    //计数器加2,表示要等待2个goroutine
    wg.Add(2)
    //启动2个goroutine
    go player("刘德华", chan1)
    go player("张学友", chan1)
    //发球,将1存入通道
    chan1 <- 1
    wg.Wait()
}

有缓冲的通道

是一种在被接收前能存储一个或者多个值的通道。只有在通道中没有要接收的值时,接收动作才会阻塞。只有在通道没有可用缓冲区容纳被发送的值时,发送动作才会阻塞。
package main 

import (
    "fmt"
    "time"
    "sync"
    "math/rand"
)

//定义常量
const (
    goroutineNums = 4  //goroutine数量
    taskLoad = 10   //通道大小
)

//监听goroutine是的结束
var wg sync.WaitGroup

//初始化函数
func init() {
    //初始化随机种子
    rand.Seed(time.Now().UnixNano())
}

func main() {
    //创建一个有缓冲通道
    chan2 := make(chan string, taskLoad)
    wg.Add(goroutineNums)
    //启动多个goroutine
    for gr := 1;gr <= goroutineNums; gr++ {
        go worker(gr, chan2)
    }

    //增加一组要完成的工作
    for task :=1; task <= taskLoad; task++ {
        tasks <- fmt.Printf("task : %d", task)
    }

    //关闭通道
    close(tasks)
    //等待所有的任务完成
    wg.Wait()
}

//goroutine函数
func worker(worker int, tasks chan string) {
    defer wg.Done()
    for {
        //等待分配任务
        task, ok := <- tasks
        if !ok {
            fmt.Printf("worker:%d shutting Down \n", worker)
            return 
        }
        //开始工作
        fmt.Printf("worker : %d started %s \n", worker, task)

        //随机等待一定时间模拟工作
        sleep := rand.int64n(100)
        time.Sleep(time.Duration(sleep) * time.Millisecond)

        //已经完成工作
        fmt.Printf("worker : %d compeleted %s \n", worker, task)        
    }
}