Skip to content

Commit

Permalink
scanner: Scan buckets asynchronously
Browse files Browse the repository at this point in the history
- Scan buckets in all erasure sets asynchrounously
- No data format is changed
- Cycle concept moved to be bucket centric, the cycle is
  incremented ecah time a bucket is successfully or unsuccesfully
  scanned
- Next bucket to scan is chosen based on the lowest possible cycle
  number and managed by the scan manager code
  • Loading branch information
Anis Elleuch committed Oct 27, 2023
1 parent 37aa593 commit 3ef62f4
Show file tree
Hide file tree
Showing 12 changed files with 404 additions and 385 deletions.
178 changes: 178 additions & 0 deletions cmd/buckets-scan-mgr.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
// Copyright (c) 2015-2023 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

package cmd

import (
"context"
"math"
"sync"
"time"
)

const (
// the interval to discover if there are new buckets created in the cluster
bucketsListInterval = time.Minute
)

type bucketScanStat struct {
ongoing bool // this bucket is currently being scanned
cycle uint32 // the last cycle of this scan
}

type bucketsScanMgr struct {
ctx context.Context

// A registered function which knows how to list buckets
bucketsLister func(context.Context, BucketOptions) ([]BucketInfo, error)

mu sync.RWMutex
bucketsCh map[int]chan string // A map of an erasure set identifier and a channel of buckets to scan
internal map[int]map[string]bucketScanStat // A map of an erasure set identifier and bucket scan stats
}

func newBucketsScanMgr(s3 ObjectLayer) *bucketsScanMgr {
mgr := &bucketsScanMgr{
ctx: GlobalContext,
bucketsLister: s3.ListBuckets,
internal: make(map[int]map[string]bucketScanStat),
bucketsCh: make(map[int]chan string),
}
return mgr
}

func (mgr *bucketsScanMgr) start() {
// A routine that discovers new buckets and initialize scan stats for each new bucket
go func() {
t := time.NewTimer(bucketsListInterval)
defer t.Stop()

for {
select {
case <-t.C:
allBuckets, err := mgr.bucketsLister(mgr.ctx, BucketOptions{})
if err == nil {
mgr.mu.Lock()
for _, bucket := range allBuckets {
for _, set := range mgr.internal {
_, ok := set[bucket.Name]
if !ok {
set[bucket.Name] = bucketScanStat{}
}
}
}
mgr.mu.Unlock()
}
t.Reset(bucketsListInterval)
case <-mgr.ctx.Done():
return
}
}
}()

// A routine that sends the next bucket to scan for each erasure set listener
go func() {
tick := 10 * time.Second

t := time.NewTimer(tick)
defer t.Stop()

for {
select {
case <-t.C:
mgr.mu.RLock()
for id, ch := range mgr.bucketsCh {
if len(ch) == 0 {
b := mgr.unsafeGetNextBucket(id)
if b != "" {
select {
case ch <- b:
default:
}
}
}
}
mgr.mu.RUnlock()

t.Reset(tick)
case <-mgr.ctx.Done():
return
}
}
}()
}

// Return a channel of buckets names to scan a given erasure set identifier
func (mgr *bucketsScanMgr) getBucketCh(id int) chan string {
mgr.mu.Lock()
defer mgr.mu.Unlock()

mgr.internal[id] = make(map[string]bucketScanStat)
mgr.bucketsCh[id] = make(chan string, 1)

return mgr.bucketsCh[id]
}

// Return the next bucket name to scan of a given erasure set identifier
func (mgr *bucketsScanMgr) unsafeGetNextBucket(id int) string {
var (
leastCycle = uint32(math.MaxUint32)
nextBucket = ""
)

for bucket, stat := range mgr.internal[id] {
if stat.ongoing {
continue
}
if stat.cycle == 0 {
return bucket
}
if stat.cycle < leastCycle {
leastCycle = stat.cycle
nextBucket = bucket
}
}

return nextBucket
}

// Mark a bucket as done in a specific erasure set - returns true if successful,
// false if the bucket is already in a scanning phase
func (mgr *bucketsScanMgr) markBucketScanStarted(id int, bucket string, cycle uint32) bool {
mgr.mu.Lock()
defer mgr.mu.Unlock()

m, _ := mgr.internal[id][bucket]
if m.ongoing {
return false
}

m.ongoing = true
m.cycle = cycle
mgr.internal[id][bucket] = m
return true
}

// Mark a bucket as done in a specific erasure set
func (mgr *bucketsScanMgr) markBucketScanDone(id int, bucket string) {
mgr.mu.Lock()
defer mgr.mu.Unlock()

m, _ := mgr.internal[id][bucket]
m.ongoing = false
mgr.internal[id][bucket] = m
}
68 changes: 50 additions & 18 deletions cmd/data-scanner-metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,10 @@ type scannerMetrics struct {
currentPaths sync.Map

cycleInfoMu sync.Mutex
cycleInfo *currentScannerCycle
cycleInfo map[string]currentScannerCycle
}

var globalScannerMetrics scannerMetrics
var globalScannerMetrics = scannerMetrics{cycleInfo: make(map[string]currentScannerCycle)}

const (
// START Realtime metrics, that only to records
Expand Down Expand Up @@ -247,37 +247,69 @@ func (p *scannerMetrics) lastMinuteActions(a lifecycle.Action) AccElem {
return val
}

// setCycle updates the current cycle metrics.
func (p *scannerMetrics) setCycle(c *currentScannerCycle) {
if c != nil {
c2 := c.clone()
c = &c2
}
// addCycleEvent updates the current cycle metrics.
func (p *scannerMetrics) bucketScanStarted(bucket string, cycle uint32) {
p.cycleInfoMu.Lock()
p.cycleInfo = c
p.cycleInfoMu.Unlock()
defer p.cycleInfoMu.Unlock()

info := p.cycleInfo[bucket]
info.current = uint64(cycle)
info.started = time.Now()
info.ongoing = true

p.cycleInfo[bucket] = info
}

func (p *scannerMetrics) bucketScanFinished(bucket string) {
p.cycleInfoMu.Lock()
defer p.cycleInfoMu.Unlock()

info := p.cycleInfo[bucket]
info.ongoing = false
info.cycleCompleted = append(info.cycleCompleted, time.Now())
if len(info.cycleCompleted) > 10 {
info.cycleCompleted = info.cycleCompleted[len(info.cycleCompleted)-10:]
}
p.cycleInfo[bucket] = info
}

type currentScannerCycle struct {
current uint64
ongoing bool
started time.Time
cycleCompleted []time.Time
}

// clone returns a clone.
func (z currentScannerCycle) clone() currentScannerCycle {
z.cycleCompleted = append(make([]time.Time, 0, len(z.cycleCompleted)), z.cycleCompleted...)
return z
}

// getCycle returns the current cycle metrics.
// If not nil, the returned value can safely be modified.
func (p *scannerMetrics) getCycle() *currentScannerCycle {
func (p *scannerMetrics) getCycles() map[string]currentScannerCycle {
p.cycleInfoMu.Lock()
defer p.cycleInfoMu.Unlock()
if p.cycleInfo == nil {
return nil
}
c := p.cycleInfo.clone()
return &c
ret := make(map[string]currentScannerCycle, len(p.cycleInfo))
for bucket := range p.cycleInfo {
ret[bucket] = p.cycleInfo[bucket].clone()
}
return ret
}

func (p *scannerMetrics) report() madmin.ScannerMetrics {
var m madmin.ScannerMetrics
cycle := p.getCycle()
if cycle != nil {
m.CurrentCycle = cycle.current
m.CyclesCompletedAt = cycle.cycleCompleted
m.CurrentStarted = cycle.started

for _, cycle := range p.getCycles() {
if cycle.ongoing {
m.BucketsScanInfo.OngoingBuckets++
}
}

m.CollectedAt = time.Now()
m.ActivePaths = p.getCurrentPaths()
m.LifeTimeOps = make(map[string]uint64, scannerMetricLast)
Expand Down
64 changes: 5 additions & 59 deletions cmd/data-scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package cmd

import (
"context"
"encoding/binary"
"encoding/json"
"errors"
"fmt"
Expand Down Expand Up @@ -113,13 +112,13 @@ type backgroundHealInfo struct {
CurrentScanMode madmin.HealScanMode `json:"currentScanMode"`
}

func readBackgroundHealInfo(ctx context.Context, objAPI ObjectLayer) backgroundHealInfo {
func readBackgroundHealInfo(ctx context.Context, objAPI erasureObjects, bucketName string) backgroundHealInfo {
if globalIsErasureSD {
return backgroundHealInfo{}
}

// Get last healing information
buf, err := readConfig(ctx, objAPI, backgroundHealInfoPath)
buf, err := readConfig(ctx, objAPI, bucketMetaPrefix+SlashSeparator+bucketName+SlashSeparator+backgroundHealInfoName)
if err != nil {
if !errors.Is(err, errConfigNotFound) {
logger.LogIf(ctx, err)
Expand All @@ -133,7 +132,7 @@ func readBackgroundHealInfo(ctx context.Context, objAPI ObjectLayer) backgroundH
return info
}

func saveBackgroundHealInfo(ctx context.Context, objAPI ObjectLayer, info backgroundHealInfo) {
func saveBackgroundHealInfo(ctx context.Context, objAPI erasureObjects, bucketName string, info backgroundHealInfo) {
if globalIsErasureSD {
return
}
Expand All @@ -144,7 +143,7 @@ func saveBackgroundHealInfo(ctx context.Context, objAPI ObjectLayer, info backgr
return
}
// Get last healing information
err = saveConfig(ctx, objAPI, backgroundHealInfoPath, b)
err = saveConfig(ctx, objAPI, bucketMetaPrefix+SlashSeparator+bucketName+SlashSeparator+backgroundHealInfoName, b)
if err != nil {
logger.LogIf(ctx, err)
}
Expand All @@ -157,22 +156,8 @@ func runDataScanner(ctx context.Context, objAPI ObjectLayer) {
ctx, cancel := globalLeaderLock.GetLock(ctx)
defer cancel()

// Load current bloom cycle
var cycleInfo currentScannerCycle

buf, _ := readConfig(ctx, objAPI, dataUsageBloomNamePath)
if len(buf) == 8 {
cycleInfo.next = binary.LittleEndian.Uint64(buf)
} else if len(buf) > 8 {
cycleInfo.next = binary.LittleEndian.Uint64(buf[:8])
buf = buf[8:]
_, err := cycleInfo.UnmarshalMsg(buf)
logger.LogIf(ctx, err)
}

scannerTimer := time.NewTimer(scannerCycle.Load())
defer scannerTimer.Stop()
defer globalScannerMetrics.setCycle(nil)

for {
select {
Expand All @@ -183,49 +168,10 @@ func runDataScanner(ctx context.Context, objAPI ObjectLayer) {
// If scanner takes longer we start at once.
scannerTimer.Reset(scannerCycle.Load())

stopFn := globalScannerMetrics.log(scannerMetricScanCycle)
cycleInfo.current = cycleInfo.next
cycleInfo.started = time.Now()
globalScannerMetrics.setCycle(&cycleInfo)

bgHealInfo := readBackgroundHealInfo(ctx, objAPI)
scanMode := getCycleScanMode(cycleInfo.current, bgHealInfo.BitrotStartCycle, bgHealInfo.BitrotStartTime)
if bgHealInfo.CurrentScanMode != scanMode {
newHealInfo := bgHealInfo
newHealInfo.CurrentScanMode = scanMode
if scanMode == madmin.HealDeepScan {
newHealInfo.BitrotStartTime = time.Now().UTC()
newHealInfo.BitrotStartCycle = cycleInfo.current
}
saveBackgroundHealInfo(ctx, objAPI, newHealInfo)
}

// Wait before starting next cycle and wait on startup.
results := make(chan DataUsageInfo, 1)
go storeDataUsageInBackend(ctx, objAPI, results)
err := objAPI.NSScanner(ctx, results, uint32(cycleInfo.current), scanMode)
logger.LogIf(ctx, err)
res := map[string]string{"cycle": strconv.FormatUint(cycleInfo.current, 10)}
if err != nil {
res["error"] = err.Error()
}
stopFn(res)
if err == nil {
// Store new cycle...
cycleInfo.next++
cycleInfo.current = 0
cycleInfo.cycleCompleted = append(cycleInfo.cycleCompleted, time.Now())
if len(cycleInfo.cycleCompleted) > dataUsageUpdateDirCycles {
cycleInfo.cycleCompleted = cycleInfo.cycleCompleted[len(cycleInfo.cycleCompleted)-dataUsageUpdateDirCycles:]
}
globalScannerMetrics.setCycle(&cycleInfo)
tmp := make([]byte, 8, 8+cycleInfo.Msgsize())
// Cycle for backward compat.
binary.LittleEndian.PutUint64(tmp, cycleInfo.next)
tmp, _ = cycleInfo.MarshalMsg(tmp)
err = saveConfig(ctx, objAPI, dataUsageBloomNamePath, tmp)
logger.LogIf(ctx, err)
}
logger.LogIf(ctx, objAPI.NSScanner(ctx, results))
}
}
}
Expand Down

0 comments on commit 3ef62f4

Please sign in to comment.