Сведения о буферизованных каналах

Завершено

Как вы уже знаете, каналы по умолчанию не буферизуются. Это означает, что они принимают операцию отправки только при наличии операции получения. В противном случае программа будет заблокирована, оставаясь в состоянии ожидания неограниченное время.

В некоторых случаях такого рода синхронизация между Go-подпрограммами действительно необходима. Однако в ряде случаев требуется просто реализовать параллелизм без ограничения того, как Go-подпрограммы взаимодействуют друг с другом.

Буферизованные каналы отправляют и получают данные без блокировки программы, так как они ведут себя подобно очереди. Размер этой очереди можно ограничить при создании канала следующим образом.

ch := make(chan string, 10)

Каждый раз при отправке любых данных в канал в очередь добавляется соответствующий элемент. Затем операция получения удаляет элемент из очереди. Когда канал заполнен, операция отправки просто ожидает, когда освободится место для хранения данных. И наоборот, если канал пуст и имеется операция чтения, она блокируется до тех пор, пока не появятся данные для чтения.

Ниже приведен простой пример, который поможет понять принцип работы буферизованных каналов.

package main

import (
    "fmt"
)

func send(ch chan string, message string) {
    ch <- message
}

func main() {
    size := 4
    ch := make(chan string, size)
    send(ch, "one")
    send(ch, "two")
    send(ch, "three")
    send(ch, "four")
    fmt.Println("All data sent to the channel ...")

    for i := 0; i < size; i++ {
        fmt.Println(<-ch)
    }

    fmt.Println("Done!")
}

После выполнения программы вы увидите следующий результат:

All data sent to the channel ...
one
two
three
four
Done!

Вы можете сказать, что в итоге получилось то же самое, и будете правы. Но давайте посмотрим, что произойдет, если уменьшить значение переменной size (вы можете даже попытаться его увеличить), как показано в примере ниже.

size := 2

После выполнения программы вы получите следующее сообщение об ошибке:

fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan send]:
main.send(...)
        /Users/developer/go/src/concurrency/main.go:8
main.main()
        /Users/developer/go/src/concurrency/main.go:16 +0xf3
exit status 2

Причина заключается в том, что вызовы функции send выполняются последовательно. Вы не создаете новую Go-подпрограмму. Поэтому в очередь добавить нечего.

Каналы тесно связаны с Go-подпрограммами. Без другой Go-подпрограммы, получающей данные из канала, вся программа может оказаться заблокированной на неограниченное время. Как вы уже видели, так и происходит.

Теперь давайте сделаем кое-что интересное! Мы создадим горутин для последних двух вызовов (первые два вызова правильно помещаются в буфер) и выполните цикл четыре раза. Вот этот код:

func main() {
    size := 2
    ch := make(chan string, size)
    send(ch, "one")
    send(ch, "two")
    go send(ch, "three")
    go send(ch, "four")
    fmt.Println("All data sent to the channel ...")

    for i := 0; i < 4; i++ {
        fmt.Println(<-ch)
    }

    fmt.Println("Done!")
}

При выполнении программы все работает надлежащим образом. В случае применения каналов рекомендуется всегда использовать Go-подпрограммы.

Рассмотрим ситуацию, когда вы создадите буферный канал с большими элементами, чем вам потребуется. Мы будем использовать пример, который мы использовали раньше для проверка API и создания буферизованного канала с размером 10:

package main

import (
    "fmt"
    "net/http"
    "time"
)

func main() {
    start := time.Now()

    apis := []string{
        "https://management.azure.com",
        "https://dev.azure.com",
        "https://api.github.com",
        "https://outlook.office.com/",
        "https://api.somewhereintheinternet.com/",
        "https://graph.microsoft.com",
    }

    ch := make(chan string, 10)

    for _, api := range apis {
        go checkAPI(api, ch)
    }

    for i := 0; i < len(apis); i++ {
        fmt.Print(<-ch)
    }

    elapsed := time.Since(start)
    fmt.Printf("Done! It took %v seconds!\n", elapsed.Seconds())
}

func checkAPI(api string, ch chan string) {
    _, err := http.Get(api)
    if err != nil {
        ch <- fmt.Sprintf("ERROR: %s is down!\n", api)
        return
    }

    ch <- fmt.Sprintf("SUCCESS: %s is up and running!\n", api)
}

После выполнения программы вы получите тот же результат, что и ранее. Вы можете поэкспериментировать, увеличивая или уменьшая размер канала, однако программа будет по-прежнему работать.

Сравнение небуферизованных и буферизованных каналов

На этом этапе может возникнуть вопрос о том, когда следует использовать тот или иной тип канала. Все зависит от того, как будет осуществляться обмен данными между Go-подпрограммами. Каналы без буферизации обмениваются данными синхронно. Они гарантируют, что каждый раз при отправке данных программа блокируется до тех пор, пока кто-то не начнет считывать данные из канала.

И наоборот, буферизованные каналы разделяют операции отправки и получения. Они не блокируют программу, но с ними необходимо проявлять осторожность, так как они могут привести к взаимоблокировке (как было показано ранее). При использовании небуферизованных каналов можно контролировать число Go-подпрограмм, которые могут выполняться одновременно. Например, можно указать количество вызовов, выполняемых каждую секунду для вызова API. В противном случае будет происходить блокировка.

Направления каналов

Каналы в Go имеют еще одну интересную функцию. При использовании каналов в качестве параметров для функции можно указать, предназначен ли канал для отправки или получения данных. По мере увеличения программы функций может оказаться слишком много, так что рекомендуется документировать назначение всех каналов, чтобы правильно их использовать. Кроме того, если канал используется при написании библиотеки, можно предоставить к нему доступ только для чтения для обеспечения согласованности данных.

Направление канала определяется так же, как и его назначение (для чтения или получения данных). Только делать это нужно при объявлении канала в параметре функции. Синтаксис для определения типа канала в качестве параметра в функции:

chan<- int // it's a channel to only send data
<-chan int // it's a channel to only receive data

При отправке данных через канал, предназначенный только для приема, в процессе компиляции программы происходит ошибка.

Рассмотрим следующую программу в качестве примера двух функций. Одна используется для считывания данных, а другая — для их отправки.

package main

import "fmt"

func send(ch chan<- string, message string) {
    fmt.Printf("Sending: %#v\n", message)
    ch <- message
}

func read(ch <-chan string) {
    fmt.Printf("Receiving: %#v\n", <-ch)
}

func main() {
    ch := make(chan string, 1)
    send(ch, "Hello World!")
    read(ch)
}

После выполнения программы вы увидите следующий результат:

Sending: "Hello World!"
Receiving: "Hello World!"

Программа уточняет назначение каждого канала в каждой функции. При попытке использовать канал для отправки данных в канале, предназначенном для их получения, произойдет ошибка компиляции. Например, попробуйте что-то вроде этого:

func read(ch <-chan string) {
    fmt.Printf("Receiving: %#v\n", <-ch)
    ch <- "Bye!"
}

После выполнения программы вы увидите следующее сообщение об ошибке:

# command-line-arguments
./main.go:12:5: invalid operation: ch <- "Bye!" (send to receive-only type <-chan string)

Ошибка компиляции не так страшна, как неправильное использование канала.

Мультиплексирование

Наконец, давайте посмотрим, как взаимодействовать с несколькими каналами одновременно с помощью select ключевое слово. Иногда при работе с несколькими каналами необходимо дождаться определенного события. Например, можно добавить логику для отмены операции при обнаружении аномалий в данных, обрабатываемых программой.

Оператор select работает так же, как оператор switch, но для каналов. Она блокирует выполнение программы до тех пор, пока не получит событие для обработки. Если он получает несколько событий, то выбирает одно из них случайным образом.

Важным аспектом оператора select является завершение его выполнения после обработки события. Если вы хотите дождаться других событий, возможно, потребуется использовать цикл.

Давайте воспользуемся следующей программой, чтобы увидеть select в действии.

package main

import (
    "fmt"
    "time"
)

func process(ch chan string) {
    time.Sleep(3 * time.Second)
    ch <- "Done processing!"
}

func replicate(ch chan string) {
    time.Sleep(1 * time.Second)
    ch <- "Done replicating!"
}

func main() {
    ch1 := make(chan string)
    ch2 := make(chan string)
    go process(ch1)
    go replicate(ch2)

    for i := 0; i < 2; i++ {
        select {
        case process := <-ch1:
            fmt.Println(process)
        case replicate := <-ch2:
            fmt.Println(replicate)
        }
    }
}

После выполнения программы вы увидите следующий результат:

Done replicating!
Done processing!

Обратите внимание, что replicate функция завершилась первым, поэтому вы увидите его выходные данные в терминале. Функция main имеет цикл, так как оператор select завершается сразу же после получения события, но все еще ожидаем завершения выполнения функции process.