Skip to content

Конкурентность и параллелизм

Процессы и потоки в ОС

Физическое ядро ОС - выполняет операции. Несколько ядер могут выполнять инструкции параллельно

Логические ядра ОС - абстракция, которая позволяет разделить ресурсы физического ядра. Псевдопараллельно выполняет инструкции на физическом ядре

Процесс ОС - экземпляр программы, которая выполняется

  • Содержит изолированные ресурсы
  • Управляется ОС
  • Переключение контекста процесса осуществляется ОС. Это дорогостоящая операция
  • Есть идентификатор процесс

Потоки ОС - единица выполнения внутри процесса. Более легкая сущность. Размер стека фиксирован - 1-10МБ, определяется во время компиляции

  • У потоков общие ресурсы
  • Управляются планировщиком ОС
  • Можно получить количество потоков в ОС - runtime.GOMAXPROCS(0)
  • Переключение контекста также дорогостоящая операция, потому что они большие по размеру
  • Для управления доступом к ресурсам используются мьютексы, семафоры, атомарные операции

Goroutine

Также как и потоки позволяют выполнять операции параллельно, но:

  • Управляются планировщиком Go
  • Динамический размер стека, увеличится если будет необходимо
  • Гораздо легче по размеру чем потоки (от 2Кб)
  • Синхронизация может быть реализована не управлением доступом к памяти, а передачей данных между горутинами с помощью каналов.
  • Так как они легче, контекст переключить можно быстрее

Горутины с помощью планировщика выполняются поверх потоков ОС. 1000 горутин могут выполняться на 10 потоках ОС.

Количество горутин можно получить с помощью runtime.NumGoroutine()

GC

Планировщик во время работы GC останавливает выполнение всех горутин. Stop The World

Планировщик Go

Планировщик распределяет выполнение горутин на потоки ОС

Особенности работы:

  • Кооперативная многозадачность. Горутина может сама уступить ресурсы, например во время блокировки
  • Поддерживает блокировки. При блокировках переключает контекст и передает ресурсы следующим в очереди горутинам

Все это работает эффективно из-за маленького размера горутин:

  • быстрое создание горутин
  • быстрое переключение контекста
  • возможность использования каналов вместо мьютексов

Самое главное в параллелизме это чтобы время на переключение контекста оправдывало себя. Если у нас сложная вычислительная операция, которую нельзя выполнить асинхронно, то переключение контекста в данном случае будет неэффективно.

Как передать ресурсы другим горутинам:

go
runtime.Gosched()

Стандартные пакеты для реализации параллельных и конкурентных вычислений

sync/atomic

Позволяет выполнять атомарные операции над простыми переменными

выполняется за один шаг на процессоре, поэтому:

  • нет состояния гонки
  • нет блокировки
  • работает быстрее и оптимальнее чем sync.Mutex

Пример, в котором итоговое значение при запусках могло отличатся на 5 единиц:

go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

func main() {
	atomicCounter := atomic.Int64{}
	counter := 0
	wg := sync.WaitGroup{}

	for i := 0; i < 1000; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			atomicCounter.Add(1)
			counter += 1
		}()
	}

	wg.Wait()
	fmt.Println("Итоговое значение counter:", counter)
	fmt.Println("Итоговое значение atomicCounter:", atomicCounter.Load())
}

sync.Mutex/sync.RWMutex

Одновременное чтение и запись может привести к некорректному(частичному) чтению данных. Часть будет обновлена, а часть нет. Особенно актуально для сложных структур, в которых чтение и запись сложные операции с несколькими полями.

sync.Mutex/sync.RWMutex используются для синхронизации. sync.RWMutex позволяет управлять отдельно блокировкой на чтение и на запись. Операции чтения могут выполняться параллельно. Хорошо оптимизирует, когда мало операций на запись.

sync.WaitGroup

wg.Add(), wg.Done(), wg.Wait()

sync.Once, sync.Pool, sync.Cond, sync.Map

sync.Pool - позволяет закинуть в пул объект, чтобы он не был очищен GC, а потом его заново переиспользовать

sync.Map - оптимизирована для частого чтения и редких операций записи. Если записи и удаления частые, то обычная map и sync.Mutex может быть эффективнее

Каналы

Каналы позволяют обмениваться данными между горутинами.

Реализация:

  • Кольцевая очередь FIFO. Буферизованные - размер очереди > 0. Небуферизованные - размер очереди 0.
  • В небуферизованных каналах данные передаются из стека напрямую в другую горутину.
  • При чтении и записи из очереди для блокировки используются mutex и sync/atomic.
  • Для очередей на запись и на чтение используются связанные списки.

Пример чтения из одного из доступных каналов

go
select {
	case <- ch1:
		handler1()
	case <- ch2:
		handler2()
	default:
		defaultHandler()
}

nil каналы бывают полезны для отключения некоторых потоков данных. В этом случае нужно использовать select как в примере выше. Один из каналов будет nil, а чтение будет происходить из другого.

  • Если канал закрыт, при записи будет паника
  • Читать из закрытого канала можно, отдаст 0 значения
  • Чтение и запись в nil канал вызовет deadlock

Примеры

  1. Какое количество горутин будет оптимальным для сложения двух массивов чисел размером 100_000_000?

Зависит от компьютера, на котором запущена программа. При увеличении числа горутин начинает тратится время на создание горутин и переключение контекста.

Увеличение значения GOMAXPROCS не оказало значительного влияния на время выполнения.

Пример кода:

go
package main

import (
	"fmt"
	"math/rand"
	"runtime"
	"sync"
	"time"
)

const len = 100_000_000

const computeCount = 100
const numberOfGoroutines = 10

func main() {
	runtime.GOMAXPROCS(10)

	arr1, arr2 := initArr()

	startsAt := time.Now()

	for i := 0; i < computeCount; i++ {
		compute(arr1, arr2, numberOfGoroutines)
	}

	fmt.Println("Число элементов в массиве:", len)
	fmt.Println("Число горутин:", numberOfGoroutines)
	fmt.Println("Число запусков сложения массивов:", computeCount)
	fmt.Println("Время выполнения сложения массивов:", time.Since(startsAt))
	fmt.Println("GOMAXPROCS:", runtime.GOMAXPROCS(0))
}

func initArr() (*[len]int, *[len]int) {
	arr1 := [len]int{}
	arr2 := [len]int{}

	for i := 0; i < len; i++ {
		arr1[i] = rand.Int()
		arr2[i] = rand.Int()
	}

	return &arr1, &arr2
}

func compute(arr1 *[len]int, arr2 *[len]int, numberOfGoroutines int) *[len]int {
	res := [len]int{}
	batchSize := len / numberOfGoroutines

	wg := sync.WaitGroup{}

	for i := 0; i < numberOfGoroutines; i++ {
		wg.Add(1)

		go func(batchIndex int) {
			defer wg.Done()

			for j := batchSize * batchIndex; j < batchSize*(batchIndex+1); j++ {
				res[j] = arr1[j] + arr2[j]
			}
		}(i)
	}

	wg.Wait()

	return &res
}