Appearance
Конкурентность и параллелизм
Процессы и потоки в ОС
Физическое ядро ОС - выполняет операции. Несколько ядер могут выполнять инструкции параллельно
Логические ядра ОС - абстракция, которая позволяет разделить ресурсы физического ядра. Псевдопараллельно выполняет инструкции на физическом ядре
Процесс ОС - экземпляр программы, которая выполняется
- Содержит изолированные ресурсы
- Управляется ОС
- Переключение контекста процесса осуществляется ОС. Это дорогостоящая операция
- Есть идентификатор процесс
Потоки ОС - единица выполнения внутри процесса. Более легкая сущность. Размер стека фиксирован - 1-10МБ, определяется во время компиляции
- У потоков общие ресурсы
- Управляются планировщиком ОС
- Можно получить количество потоков в ОС - runtime.GOMAXPROCS(0)
- Переключение контекста также дорогостоящая операция, потому что они большие по размеру
- Для управления доступом к ресурсам используются мьютексы, семафоры, атомарные операции
Goroutine
Также как и потоки позволяют выполнять операции параллельно, но:
- Управляются планировщиком Go
- Динамический размер стека, увеличится если будет необходимо
- Гораздо легче по размеру чем потоки (от 2Кб)
- Синхронизация может быть реализована не управлением доступом к памяти, а передачей данных между горутинами с помощью каналов.
- Так как они легче, контекст переключить можно быстрее
Горутины с помощью планировщика выполняются поверх потоков ОС. 1000 горутин могут выполняться на 10 потоках ОС.
Количество горутин можно получить с помощью runtime.NumGoroutine()
GC
Планировщик во время работы GC останавливает выполнение всех горутин. Stop The World
Планировщик Go
Планировщик распределяет выполнение горутин на потоки ОС
Особенности работы:
- Кооперативная многозадачность. Горутина может сама уступить ресурсы, например во время блокировки
- Поддерживает блокировки. При блокировках переключает контекст и передает ресурсы следующим в очереди горутинам
Все это работает эффективно из-за маленького размера горутин:
- быстрое создание горутин
- быстрое переключение контекста
- возможность использования каналов вместо мьютексов
Самое главное в параллелизме это чтобы время на переключение контекста оправдывало себя. Если у нас сложная вычислительная операция, которую нельзя выполнить асинхронно, то переключение контекста в данном случае будет неэффективно.
Как передать ресурсы другим горутинам:
go
runtime.Gosched()
Стандартные пакеты для реализации параллельных и конкурентных вычислений
sync/atomic
Позволяет выполнять атомарные операции над простыми переменными
выполняется за один шаг на процессоре, поэтому:
- нет состояния гонки
- нет блокировки
- работает быстрее и оптимальнее чем sync.Mutex
Пример, в котором итоговое значение при запусках могло отличатся на 5 единиц:
go
package main
import (
"fmt"
"sync"
"sync/atomic"
)
func main() {
atomicCounter := atomic.Int64{}
counter := 0
wg := sync.WaitGroup{}
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
atomicCounter.Add(1)
counter += 1
}()
}
wg.Wait()
fmt.Println("Итоговое значение counter:", counter)
fmt.Println("Итоговое значение atomicCounter:", atomicCounter.Load())
}
sync.Mutex/sync.RWMutex
Одновременное чтение и запись может привести к некорректному(частичному) чтению данных. Часть будет обновлена, а часть нет. Особенно актуально для сложных структур, в которых чтение и запись сложные операции с несколькими полями.
sync.Mutex/sync.RWMutex используются для синхронизации. sync.RWMutex позволяет управлять отдельно блокировкой на чтение и на запись. Операции чтения могут выполняться параллельно. Хорошо оптимизирует, когда мало операций на запись.
sync.WaitGroup
wg.Add(), wg.Done(), wg.Wait()
sync.Once, sync.Pool, sync.Cond, sync.Map
sync.Pool - позволяет закинуть в пул объект, чтобы он не был очищен GC, а потом его заново переиспользовать
sync.Map - оптимизирована для частого чтения и редких операций записи. Если записи и удаления частые, то обычная map и sync.Mutex может быть эффективнее
Каналы
Каналы позволяют обмениваться данными между горутинами.
Реализация:
- Кольцевая очередь FIFO. Буферизованные - размер очереди > 0. Небуферизованные - размер очереди 0.
- В небуферизованных каналах данные передаются из стека напрямую в другую горутину.
- При чтении и записи из очереди для блокировки используются mutex и sync/atomic.
- Для очередей на запись и на чтение используются связанные списки.
Пример чтения из одного из доступных каналов
go
select {
case <- ch1:
handler1()
case <- ch2:
handler2()
default:
defaultHandler()
}
nil каналы бывают полезны для отключения некоторых потоков данных. В этом случае нужно использовать select как в примере выше. Один из каналов будет nil, а чтение будет происходить из другого.
- Если канал закрыт, при записи будет паника
- Читать из закрытого канала можно, отдаст 0 значения
- Чтение и запись в nil канал вызовет deadlock
Примеры
- Какое количество горутин будет оптимальным для сложения двух массивов чисел размером 100_000_000?
Зависит от компьютера, на котором запущена программа. При увеличении числа горутин начинает тратится время на создание горутин и переключение контекста.
Увеличение значения GOMAXPROCS не оказало значительного влияния на время выполнения.
Пример кода:
go
package main
import (
"fmt"
"math/rand"
"runtime"
"sync"
"time"
)
const len = 100_000_000
const computeCount = 100
const numberOfGoroutines = 10
func main() {
runtime.GOMAXPROCS(10)
arr1, arr2 := initArr()
startsAt := time.Now()
for i := 0; i < computeCount; i++ {
compute(arr1, arr2, numberOfGoroutines)
}
fmt.Println("Число элементов в массиве:", len)
fmt.Println("Число горутин:", numberOfGoroutines)
fmt.Println("Число запусков сложения массивов:", computeCount)
fmt.Println("Время выполнения сложения массивов:", time.Since(startsAt))
fmt.Println("GOMAXPROCS:", runtime.GOMAXPROCS(0))
}
func initArr() (*[len]int, *[len]int) {
arr1 := [len]int{}
arr2 := [len]int{}
for i := 0; i < len; i++ {
arr1[i] = rand.Int()
arr2[i] = rand.Int()
}
return &arr1, &arr2
}
func compute(arr1 *[len]int, arr2 *[len]int, numberOfGoroutines int) *[len]int {
res := [len]int{}
batchSize := len / numberOfGoroutines
wg := sync.WaitGroup{}
for i := 0; i < numberOfGoroutines; i++ {
wg.Add(1)
go func(batchIndex int) {
defer wg.Done()
for j := batchSize * batchIndex; j < batchSize*(batchIndex+1); j++ {
res[j] = arr1[j] + arr2[j]
}
}(i)
}
wg.Wait()
return &res
}