将 channel 用作通信机制

已完成

Go 中的 channel 是 goroutine 之间的通信机制。 请记住 Go 的并发方法是:“不是通过共享内存通信;而是通过通信共享内存。”当你需要将值从一个 goroutine 发送到另一个时,可以使用通道。 让我们看看它们的工作原理,以及如何开始使用它们来编写并发 Go 程序。

Channel 语法

由于 channel 是发送和接收数据的通信机制,因此它也有类型之分。 这意味着你只能发送 channel 支持的数据类型。 除使用关键字 chan 作为 channel 的数据类型外,还需指定将通过 channel 传递的数据类型,如 int 类型。

每次声明一个 channel 或希望在函数中指定一个 channel 作为参数时,都需要使用 chan <type>,如 chan int。 若要创建通道,需使用内置的 make() 函数:

ch := make(chan int)

一个 channel 可以执行两项操作:发送数据和接收数据。 若要指定 channel 具有的操作类型,需要使用 channel 运算符 <-。 此外,在 channel 中发送数据和接收数据属于阻止操作。 你一会儿就会明白为何如此。

如果希望通道仅发送数据,请在通道之后使用 <- 运算符。 如果希望通道接收数据,请在通道之前使用 <- 运算符,如下所示:

ch <- x // sends (or writes ) x through channel ch
x = <-ch // x receives (or reads) data sent to the channel ch
<-ch // receives data, but the result is discarded

可在 channel 中执行的另一项操作是关闭 channel。 若要关闭通道,使用内置的 close() 函数:

close(ch)

当你关闭通道时,你希望数据将不再在该通道中发送。 如果试图将数据发送到已关闭的 channel,则程序将发生严重错误。 如果试图通过已关闭的 channel 接收数据,则可以读取发送的所有数据。 随后的每次“读取”都将返回一个零值。

让我们回到之前创建的程序,然后使用通道来删除睡眠功能。 首先,让我们在 main 函数中创建一个字符串 channel,如下所示:

ch := make(chan string)

接下来,删除睡眠行 time.Sleep(3 * time.Second)

现在,我们可以使用 channel 在 goroutine 之间进行通信。 应重构代码并通过通道发送该消息,而不是在 checkAPI 函数中打印结果。 要使用该函数中的 channel,需要添加 channel 作为参数。 checkAPI 函数应如下所示:

func checkAPI(api string, ch chan string) {
    _, err := http.Get(api)
    if err != nil {
        ch <- fmt.Sprintf("ERROR: %s is down!\n", api)
        return
    }

    ch <- fmt.Sprintf("SUCCESS: %s is up and running!\n", api)
}

请注意,我们必须使用 fmt.Sprintf 函数,因为我们不想打印任何文本,只需利用通道发送格式化文本。 另请注意,我们在 channel 变量之后使用 <- 运算符来发送数据。

现在,你需要更改 main 函数以发送 channel 变量并接收要打印的数据,如下所示:

ch := make(chan string)

for _, api := range apis {
    go checkAPI(api, ch)
}

fmt.Print(<-ch)

请注意,我们在 channel 之前使用 <- 运算符来表明我们想要从 channel 读取数据。

重新运行程序时,会看到如下所示的输出:

ERROR: https://api.somewhereintheinternet.com/ is down!

Done! It took 0.007401217 seconds!

至少它不用调用睡眠函数就可以工作,对吧? 但它仍然没有达到我们的目的。 我们只看到其中一个 goroutine 的输出,而我们共创建了五个 goroutine。 在下一节中,我们来看看这个程序为什么是这样工作的。

无缓冲 channel

使用 make() 函数创建 channel 时,会创建一个无缓冲 channel,这是默认行为。 无缓冲 channel 会阻止发送操作,直到有人准备好接收数据。 正如我们之前所说,发送和接收都属于阻止操作。 此阻止操作也是上一节中的程序在收到第一条消息后立即停止的原因。

我们可以说 fmt.Print(<-ch) 会阻止程序,因为它从 channel 读取,并等待一些数据到达。 一旦有任何数据到达,它就会继续下一行,然后程序完成。

其他 goroutine 发生了什么? 它们仍在运行,但都没有在侦听。 而且,由于程序提前完成,一些 goroutine 无法发送数据。 为了证明这一点,让我们添加另一个 fmt.Print(<-ch),如下所示:

ch := make(chan string)

for _, api := range apis {
    go checkAPI(api, ch)
}

fmt.Print(<-ch)
fmt.Print(<-ch)

重新运行程序时,会看到如下所示的输出:

ERROR: https://api.somewhereintheinternet.com/ is down!
SUCCESS: https://api.github.com is up and running!
Done! It took 0.263611711 seconds!

请注意,现在你会看到两个 API 的输出。 如果继续添加更多 fmt.Print(<-ch) 行,你最终将会读取发送到 channel 的所有数据。 但是如果你试图读取更多数据,而没有 goroutine 再发送数据,会发生什么呢? 例如:

ch := make(chan string)

for _, api := range apis {
    go checkAPI(api, ch)
}

fmt.Print(<-ch)
fmt.Print(<-ch)
fmt.Print(<-ch)
fmt.Print(<-ch)
fmt.Print(<-ch)
fmt.Print(<-ch)

fmt.Print(<-ch)

重新运行程序时,会看到如下所示的输出:

ERROR: https://api.somewhereintheinternet.com/ is down!
SUCCESS: https://api.github.com is up and running!
SUCCESS: https://management.azure.com is up and running!
SUCCESS: https://graph.microsoft.com is up and running!
SUCCESS: https://outlook.office.com/ is up and running!
SUCCESS: https://dev.azure.com is up and running!

它在运行,但程序未完成。 最后一个打印行阻止了程序,因为它需要接收数据。 必须使用类似 Ctrl+C 的命令关闭程序。

上个示例只是证明了读取数据和接收数据都属于阻止操作。 要解决此问题,可以将代码更改为 for 循环,并只接收确定要发送的数据,如下所示:

for i := 0; i < len(apis); i++ {
    fmt.Print(<-ch)
}

以下是程序的最终版本,以防你的版本出错:

package main

import (
    "fmt"
    "net/http"
    "time"
)

func main() {
    start := time.Now()

    apis := []string{
        "https://management.azure.com",
        "https://dev.azure.com",
        "https://api.github.com",
        "https://outlook.office.com/",
        "https://api.somewhereintheinternet.com/",
        "https://graph.microsoft.com",
    }

    ch := make(chan string)

    for _, api := range apis {
        go checkAPI(api, ch)
    }

    for i := 0; i < len(apis); i++ {
        fmt.Print(<-ch)
    }

    elapsed := time.Since(start)
    fmt.Printf("Done! It took %v seconds!\n", elapsed.Seconds())
}

func checkAPI(api string, ch chan string) {
    _, err := http.Get(api)
    if err != nil {
        ch <- fmt.Sprintf("ERROR: %s is down!\n", api)
        return
    }

    ch <- fmt.Sprintf("SUCCESS: %s is up and running!\n", api)
}

重新运行程序时,会看到如下所示的输出:

ERROR: https://api.somewhereintheinternet.com/ is down!
SUCCESS: https://api.github.com is up and running!
SUCCESS: https://management.azure.com is up and running!
SUCCESS: https://dev.azure.com is up and running!
SUCCESS: https://graph.microsoft.com is up and running!
SUCCESS: https://outlook.office.com/ is up and running!
Done! It took 0.602099714 seconds!

程序正在执行应执行的操作。 你不再使用睡眠函数,而是使用通道。 另请注意,在不使用并发时,现在需要约 600 毫秒完成,而不会耗费近 2 秒。

最后,我们可以说,无缓冲 channel 在同步发送和接收操作。 即使使用并发,通信也是同步的。