Пакет sync является краеугольным камнем безопасного параллельного программирования в Go. Он предоставляет проверенные временем примитивы синхронизации для координации доступа к разделяемым ресурсам в многопоточных приложениях. Несмотря на то, что Go продвигает философию “не передавайте данные через разделяемую память, а разделяйте память через передачу данных”, пакет sync остается незаменимым для решения конкретных архитектурных задач.
В современной разработке высоконагруженных приложений понимание sync пакета критически важно. Он позволяет создавать thread-safe структуры данных, управлять жизненным циклом ресурсов и обеспечивать корректную работу сложных параллельных алгоритмов.
🎯 Философия синхронизации в Go
Каналы vs sync: когда что использовать
Go предлагает два подхода к параллельному программированию:
- Каналы - для передачи данных и координации между горутинами
- sync пакет - для защиты разделяемого состояния и примитивов синхронизации
Используйте каналы, когда:
- Передаете данные между горутинами
- Координируете выполнение операций
- Реализуете паттерны producer-consumer
- Строите конвейеры обработки данных
Используйте sync, когда:
- Защищаете разделяемое состояние (счетчики, кэши, конфигурации)
- Нужна одноразовая инициализация ресурсов
- Ожидаете завершения группы горутин
- Реализуете thread-safe структуры данных
Производительность и memory model
Sync примитивы в Go построены на основе низкоуровневых атомарных операций и memory barriers, что обеспечивает:
- Высокую производительность за счет минимального overhead
- Корректное поведение в многопроцессорных системах
- Гарантии видимости изменений между горутинами
- Предотвращение race conditions на уровне процессора
💬 Статистика производительности
В высоконагруженных системах правильное использование sync.RWMutex может дать прирост производительности до 300% для операций чтения по сравнению с обычным Mutex, когда соотношение чтения к записи составляет 10:1.
🔒 Mutex - основа безопасности данных
Mutex (mutual exclusion) — это основной примитив синхронизации для обеспечения эксклюзивного доступа к ресурсам. В многопоточной среде без синхронизации concurrent доступ к данным приводит к race conditions — ситуациям, когда результат операции зависит от случайного timing выполнения горутин.
Анатомия race condition
Race condition возникает, когда несколько горутин одновременно читают и изменяют одни и те же данные. Классический пример — инкремент счетчика:
- Горутина A читает значение (например, 5)
- Горутина B читает то же значение (5)
- Обе горутины увеличивают на 1
- Горутина A записывает 6
- Горутина B записывает 6 Результат: вместо ожидаемого 7 получаем 6
Mutex как решение
Mutex решает эту проблему, создавая критическую секцию — участок кода, который может выполнять только одна горутина в момент времени. Go предоставляет простой и эффективный API для работы с мьютексами.
Демонстрация проблемы и решения
// Небезопасный счетчик - race condition гарантирована
type UnsafeCounter struct {
value int
}
func (c *UnsafeCounter) Increment() {
c.value++ // НЕ атомарная операция!
}
// Безопасный счетчик с Mutex
type SafeCounter struct {
mu sync.Mutex
value int
}
func (c *SafeCounter) Increment() {
c.mu.Lock() // Критическая секция начинается
defer c.mu.Unlock() // Гарантированное освобождение
c.value++ // Безопасная операция
}
func (c *SafeCounter) Value() int {
c.mu.Lock()
defer c.mu.Unlock()
return c.value // Консистентное чтение
}
Результаты тестирования производительности
При запуске 1000 горутин, каждая из которых инкрементирует счетчик 1000 раз:
UnsafeCounter:
- Ожидаемый результат: 1,000,000
- Реальный результат: ~850,000-950,000 (случайный)
- Время: ~50ms
- Статус: ❌ Race condition!
SafeCounter (Mutex):
- Ожидаемый результат: 1,000,000
- Реальный результат: 1,000,000 (всегда)
- Время: ~200ms
- Статус: ✅ Корректен, но медленнее
Цена безопасности
Mutex добавляет overhead:
- Блокировка/разблокировка: ~10-20ns на операцию
- Contention: при высокой конкуренции время растет экспоненциально
- Context switching: переключение между горутинами
Но это цена за корректность данных.
📖 RWMutex - читатели-писатели
RWMutex (Read-Write Mutex) решает проблему производительности, когда у вас много операций чтения и мало записи.
Принцип работы RWMutex
type DataStore struct {
mu sync.RWMutex
data map[string]string
}
// Чтение - может выполняться параллельно
func (ds *DataStore) Get(key string) (string, bool) {
ds.mu.RLock() // Блокировка только для чтения
defer ds.mu.RUnlock()
value, exists := ds.data[key]
return value, exists
}
// Запись - эксклюзивная блокировка
func (ds *DataStore) Set(key, value string) {
ds.mu.Lock() // Полная блокировка
defer ds.mu.Unlock()
ds.data[key] = value
}
Когда использовать RWMutex
Используйте RWMutex когда:
- Операций чтения значительно больше, чем записи (соотношение 5:1 и больше)
- Операции чтения занимают заметное время
- У вас много concurrent readers
Используйте обычный Mutex когда:
- Операции быстрые (<1μs)
- Соотношение чтения к записи близко к 1:1
- Простота важнее производительности
Производительность RWMutex
Benchmark результаты (1000 горутин):
При соотношении read:write = 10:1:
- Mutex: 500ms
- RWMutex: 150ms (прирост 300%)
При соотношении read:write = 1:1:
- Mutex: 200ms
- RWMutex: 250ms (overhead 25%)
Внутреннее устройство
RWMutex использует:
- Семафор для читателей: счетчик активных readers
- Эксклюзивный lock для писателей: блокирует все операции
- Writer starvation prevention: предотвращает голодание писателей
👥 WaitGroup - координация горутин
WaitGroup решает задачу ожидания завершения группы горутин. Это важно для graceful shutdown и обеспечения завершения всех операций.
Основные методы WaitGroup
var wg sync.WaitGroup
// Добавляем счетчик горутин
wg.Add(3) // Ожидаем 3 горутины
// В каждой горутине сигнализируем о завершении
go func() {
defer wg.Done() // Уменьшаем счетчик на 1
// Выполняем работу...
}()
// Ждем завершения всех горутин
wg.Wait() // Блокируется пока счетчик не станет 0
Паттерн Worker Pool
func workerPool() {
const numWorkers = 5
tasks := make(chan Task, 100)
var wg sync.WaitGroup
// Запускаем workers
for i := 0; i < numWorkers; i++ {
wg.Add(1)
go func(workerID int) {
defer wg.Done()
for task := range tasks {
processTask(task)
}
}(i)
}
// Отправляем задачи
for _, task := range getAllTasks() {
tasks <- task
}
close(tasks) // Важно: закрываем канал
// Ждем завершения всех workers
wg.Wait()
}
Распространенные ошибки
1. Вызов Add() в горутине:
// ❌ Неправильно - race condition
go func() {
wg.Add(1) // Может выполниться после wg.Wait()
defer wg.Done()
// работа...
}()
// ✅ Правильно
wg.Add(1)
go func() {
defer wg.Done()
// работа...
}()
2. Забыли Done():
// ❌ Deadlock - wg.Wait() будет ждать вечно
wg.Add(1)
go func() {
// Забыли wg.Done()!
doWork()
}()
wg.Wait()
3. Передача WaitGroup по значению:
// ❌ Неправильно
func badWorker(wg sync.WaitGroup) {
defer wg.Done() // Не влияет на оригинальный WaitGroup!
}
// ✅ Правильно
func goodWorker(wg *sync.WaitGroup) {
defer wg.Done()
}
Практические применения
Batch обработка:
- Обработка файлов параллельно
- Загрузка данных из multiple sources
- Валидация большого массива данных
Graceful shutdown:
- Ожидание завершения всех HTTP requests
- Закрытие database connections
- Сохранение state перед выходом
🔄 Once - одноразовая инициализация
package main
import (
"fmt"
"sync"
"time"
)
// Глобальная переменная для инициализации
var (
config *AppConfig
configOnce sync.Once
)
type AppConfig struct {
DatabaseURL string
APIKey string
Debug bool
LoadedAt time.Time
}
// Инициализация конфигурации (вызовется только один раз)
func initConfig() {
fmt.Println("🔧 Загружаем конфигурацию...")
// Симулируем долгую инициализацию
time.Sleep(2 * time.Second)
config = &AppConfig{
DatabaseURL: "postgresql://localhost:5432/mydb",
APIKey: "secret-api-key",
Debug: true,
LoadedAt: time.Now(),
}
fmt.Println("✅ Конфигурация загружена!")
}
// GetConfig возвращает конфигурацию (ленивая инициализация)
func GetConfig() *AppConfig {
configOnce.Do(initConfig) // initConfig вызовется только один раз
return config
}
// Сервис, который использует конфигурацию
func databaseService(id int, wg *sync.WaitGroup) {
defer wg.Done()
fmt.Printf("🗄️ Database Service %d: получаем конфигурацию...\n", id)
cfg := GetConfig()
fmt.Printf("🗄️ Database Service %d: подключение к %s\n", id, cfg.DatabaseURL)
fmt.Printf("🗄️ Database Service %d: конфигурация загружена в %s\n", id, cfg.LoadedAt.Format("15:04:05"))
}
func apiService(id int, wg *sync.WaitGroup) {
defer wg.Done()
fmt.Printf("🌐 API Service %d: получаем конфигурацию...\n", id)
cfg := GetConfig()
fmt.Printf("🌐 API Service %d: используем API ключ %s\n", id, cfg.APIKey)
fmt.Printf("🌐 API Service %d: режим отладки: %t\n", id, cfg.Debug)
}
// Singleton паттерн с Once
type Logger struct {
level string
file string
}
var (
logger *Logger
loggerOnce sync.Once
)
func GetLogger() *Logger {
loggerOnce.Do(func() {
fmt.Println("📝 Инициализируем логгер...")
time.Sleep(1 * time.Second) // Симулируем инициализацию
logger = &Logger{
level: "INFO",
file: "/var/log/app.log",
}
fmt.Println("📝 Логгер инициализирован!")
})
return logger
}
func (l *Logger) Log(message string) {
fmt.Printf("[%s] %s\n", l.level, message)
}
func serviceWithLogging(name string, wg *sync.WaitGroup) {
defer wg.Done()
log := GetLogger()
log.Log(fmt.Sprintf("Сервис %s запущен", name))
time.Sleep(500 * time.Millisecond)
log.Log(fmt.Sprintf("Сервис %s выполняет работу", name))
time.Sleep(500 * time.Millisecond)
log.Log(fmt.Sprintf("Сервис %s завершен", name))
}
func main() {
fmt.Println("🔄 Примеры использования sync.Once")
fmt.Println()
// Тест 1: Ленивая инициализация конфигурации
fmt.Println("=== Тест 1: Инициализация конфигурации ===")
var wg1 sync.WaitGroup
// Запускаем несколько сервисов одновременно
for i := 1; i <= 3; i++ {
wg1.Add(1)
go databaseService(i, &wg1)
}
for i := 1; i <= 2; i++ {
wg1.Add(1)
go apiService(i, &wg1)
}
wg1.Wait()
fmt.Println()
// Тест 2: Singleton паттерн с логгером
fmt.Println("=== Тест 2: Singleton логгер ===")
var wg2 sync.WaitGroup
services := []string{"UserService", "OrderService", "PaymentService"}
for _, serviceName := range services {
wg2.Add(1)
go serviceWithLogging(serviceName, &wg2)
}
wg2.Wait()
fmt.Println("\n✨ Все тесты завершены!")
// Проверяем, что объекты создались только один раз
fmt.Println("\n🔍 Проверка результатов:")
fmt.Printf("Config pointer: %p\n", config)
fmt.Printf("Logger pointer: %p\n", logger)
// Еще один вызов - объекты не пересоздаются
cfg2 := GetConfig()
log2 := GetLogger()
fmt.Printf("Config pointer (2nd call): %p\n", cfg2)
fmt.Printf("Logger pointer (2nd call): %p\n", log2)
if config == cfg2 && logger == log2 {
fmt.Println("✅ Объекты не пересоздавались - sync.Once работает корректно!")
}
}
🎯 Практический пример: Thread-safe кэш
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
// Элемент кэша
type CacheItem struct {
Value interface{}
ExpiresAt time.Time
}
func (ci *CacheItem) IsExpired() bool {
return time.Now().After(ci.ExpiresAt)
}
// Thread-safe кэш с TTL
type Cache struct {
mu sync.RWMutex
items map[string]*CacheItem
// Статистика
hits int64
misses int64
// Для автоочистки
stopCleanup chan bool
cleanupOnce sync.Once
}
func NewCache() *Cache {
cache := &Cache{
items: make(map[string]*CacheItem),
stopCleanup: make(chan bool),
}
// Запускаем автоочистку
cache.startCleanup()
return cache
}
// Запуск автоочистки (только один раз)
func (c *Cache) startCleanup() {
c.cleanupOnce.Do(func() {
go c.cleanupLoop()
})
}
// Цикл автоочистки истекших элементов
func (c *Cache) cleanupLoop() {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
c.removeExpired()
case <-c.stopCleanup:
return
}
}
}
// Удаление истекших элементов
func (c *Cache) removeExpired() {
c.mu.Lock()
defer c.mu.Unlock()
expired := make([]string, 0)
for key, item := range c.items {
if item.IsExpired() {
expired = append(expired, key)
}
}
for _, key := range expired {
delete(c.items, key)
}
if len(expired) > 0 {
fmt.Printf("🗑️ Удалено %d истекших элементов\n", len(expired))
}
}
// Установка значения
func (c *Cache) Set(key string, value interface{}, ttl time.Duration) {
c.mu.Lock()
defer c.mu.Unlock()
c.items[key] = &CacheItem{
Value: value,
ExpiresAt: time.Now().Add(ttl),
}
fmt.Printf("💾 Добавлено в кэш: %s\n", key)
}
// Получение значения
func (c *Cache) Get(key string) (interface{}, bool) {
c.mu.RLock()
defer c.mu.RUnlock()
item, exists := c.items[key]
if !exists || item.IsExpired() {
c.misses++
return nil, false
}
c.hits++
return item.Value, true
}
// Удаление элемента
func (c *Cache) Delete(key string) {
c.mu.Lock()
defer c.mu.Unlock()
delete(c.items, key)
fmt.Printf("🗑️ Удалено из кэша: %s\n", key)
}
// Получение размера кэша
func (c *Cache) Size() int {
c.mu.RLock()
defer c.mu.RUnlock()
return len(c.items)
}
// Статистика кэша
func (c *Cache) Stats() (hits, misses int64, hitRatio float64) {
c.mu.RLock()
defer c.mu.RUnlock()
hits = c.hits
misses = c.misses
total := hits + misses
if total > 0 {
hitRatio = float64(hits) / float64(total) * 100
}
return
}
// Закрытие кэша
func (c *Cache) Close() {
close(c.stopCleanup)
}
// Симулируем работу с базой данных
func fetchFromDatabase(key string) (string, time.Duration) {
// Симулируем задержку БД
delay := time.Duration(rand.Intn(1000)) * time.Millisecond
time.Sleep(delay)
return fmt.Sprintf("data_for_%s", key), delay
}
// Сервис, который использует кэш
func dataService(cache *Cache, serviceID int, wg *sync.WaitGroup) {
defer wg.Done()
keys := []string{"user:123", "user:456", "user:789", "product:1", "product:2"}
for i := 0; i < 10; i++ {
key := keys[rand.Intn(len(keys))]
// Сначала проверяем кэш
if value, found := cache.Get(key); found {
fmt.Printf("🟢 Service %d: CACHE HIT для %s = %v\n", serviceID, key, value)
} else {
fmt.Printf("🔴 Service %d: CACHE MISS для %s\n", serviceID, key)
// Получаем из "базы данных"
data, dbDelay := fetchFromDatabase(key)
// Сохраняем в кэш на 10 секунд
cache.Set(key, data, 10*time.Second)
fmt.Printf("🟡 Service %d: загружено из БД за %v: %s = %s\n",
serviceID, dbDelay, key, data)
}
time.Sleep(500 * time.Millisecond)
}
}
func main() {
fmt.Println("🎯 Thread-safe кэш с TTL")
fmt.Println()
cache := NewCache()
defer cache.Close()
var wg sync.WaitGroup
// Запускаем несколько сервисов, которые конкурентно используют кэш
for i := 1; i <= 3; i++ {
wg.Add(1)
go dataService(cache, i, &wg)
}
// Мониторим статистику в отдельной горутине
wg.Add(1)
go func() {
defer wg.Done()
ticker := time.NewTicker(3 * time.Second)
defer ticker.Stop()
for i := 0; i < 5; i++ {
<-ticker.C
hits, misses, hitRatio := cache.Stats()
size := cache.Size()
fmt.Printf("\n📊 Статистика кэша:\n")
fmt.Printf(" Размер: %d элементов\n", size)
fmt.Printf(" Попадания: %d\n", hits)
fmt.Printf(" Промахи: %d\n", misses)
fmt.Printf(" Коэффициент попаданий: %.2f%%\n\n", hitRatio)
}
}()
wg.Wait()
// Финальная статистика
hits, misses, hitRatio := cache.Stats()
fmt.Printf("🏁 Финальная статистика:\n")
fmt.Printf(" Попадания: %d\n", hits)
fmt.Printf(" Промахи: %d\n", misses)
fmt.Printf(" Коэффициент попаданий: %.2f%%\n", hitRatio)
}
🛡️ Лучшие практики sync
1. Всегда используй defer с Unlock
func (c *Counter) Increment() {
c.mu.Lock()
defer c.mu.Unlock() // ✅ Гарантированно разблокируется
c.value++
}
2. Используй RWMutex для данных с частым чтением
type Cache struct {
mu sync.RWMutex // ✅ Позволяет множественное чтение
data map[string]interface{}
}
3. Избегай блокировок внутри блокировок
// ❌ Опасно - может вызвать deadlock
func (c *Cache) BadMethod() {
c.mu.Lock()
defer c.mu.Unlock()
c.AnotherMethod() // Если AnotherMethod тоже захватывает блокировку
}
4. Всегда закрывай ресурсы
func (c *Cache) Close() {
close(c.stopCleanup) // ✅ Завершаем горутину очистки
}
🧠 Проверь себя
- В чем разница между Mutex и RWMutex?
- Зачем использовать WaitGroup?
- Когда применять sync.Once?
- Как избежать deadlock при работе с мьютексами?
- Почему важно использовать defer с Unlock()?
📌 Главное из главы
- sync.Mutex для взаимного исключения при записи
- sync.RWMutex для множественных читателей и эксклюзивной записи
- sync.WaitGroup для ожидания завершения группы горутин
- sync.Once для одноразовой инициализации
- defer unlock всегда используй для гарантированного освобождения
- RWMutex лучше Mutex когда чтение происходит чаще записи
- Предпочитай каналы, но sync незаменим для защиты состояния