Skip to content

Commit fe855df

Browse files
committed
shard index: add bucket shard summary xaction
* scan source bucket's TAR objects and aggregate indexed/unindexed shard counts and sizes * expose running snapshots through the same xaction `Results()` model as the bucket-summary xaction * classify a TAR as a shard only when its existing shard index loads successfully Signed-off-by: Tony Chen <a122774007@gmail.com>
1 parent 6ef50e5 commit fe855df

9 files changed

Lines changed: 451 additions & 8 deletions

File tree

‎ais/proxy.go‎

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -642,6 +642,8 @@ func (p *proxy) httpbckget(w http.ResponseWriter, r *http.Request, dpq *dpq) {
642642
}
643643

644644
switch {
645+
// TODO: add apc.ActSummaryShard handler here: proxy starts/queries the
646+
// shard-summary action via 2PC and aggregates target Result snapshots.
645647
case msg.Action == apc.ActSummaryBck:
646648
p.bgetSumm(w, r, qbck, msg, dpq)
647649
case msg.Action == apc.ActShowNBI:

‎ais/tgtbck.go‎

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,8 @@ func (t *target) httpbckget(w http.ResponseWriter, r *http.Request, dpq *dpq) {
8484
// list objects
8585
t.bgetObjects(w, r, qbck, msg, dpq, phase)
8686

87+
// TODO: add apc.ActSummaryShard handler here: target begins/queries the
88+
// shard-summary xaction and returns Result snapshots to the proxy.
8789
case apc.ActSummaryBck:
8890
var bucket, phase string // txn
8991
if len(apiItems) == 0 {

‎api/apc/actmsg.go‎

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,9 @@ const (
4040
ActMakeNCopies = "make-n-copies"
4141
ActPutCopies = "put-copies"
4242
ActRechunk = "rechunk"
43-
ActIndexShard = "index-shard"
43+
44+
ActIndexShard = "index-shard"
45+
ActSummaryShard = "summary-shard"
4446

4547
ActRebalance = "rebalance"
4648
ActMoveBck = "move-bck"

‎api/apc/shard_idx.go‎

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,26 @@
44
*/
55
package apc
66

7-
// IndexShardMsg is the control message for ActIndexShard xaction.
8-
type IndexShardMsg struct {
9-
Prefix string `json:"prefix,omitempty"` // only index shards whose name begins with Prefix
10-
NumWorkers int `json:"num-workers,omitempty"` // number of concurrent workers; (-1) none; (0) auto-computed (media type + load)
11-
SkipVerify bool `json:"skip-verify,omitempty"` // if shard already has an index, trust it without loading+verifying staleness (fast re-run)
12-
}
7+
// IndexShardMsg is the control message for the ActIndexShard xaction.
type IndexShardMsg struct {
	// Prefix restricts indexing to shards whose name begins with it.
	Prefix string `json:"prefix,omitempty"`
	// NumWorkers is the number of concurrent workers:
	// (-1) none; (0) auto-computed (media type + load).
	NumWorkers int `json:"num-workers,omitempty"`
	// SkipVerify: if a shard already has an index, trust it without
	// loading+verifying staleness (fast re-run).
	SkipVerify bool `json:"skip-verify,omitempty"`
}

// ShardSummMsg is the control message for ActSummaryShard.
type ShardSummMsg struct {
	// UUID is server-assigned on the first response; the client echoes it back.
	UUID string `json:"uuid,omitempty"`
	// Prefix restricts the summary to TAR objects whose name begins with it.
	Prefix string `json:"prefix,omitempty"`
}

// ShardSummResult is the per-bucket local TAR/index coverage summary.
// Every counter carries the `,string` JSON option, i.e. it is encoded
// as a quoted decimal string.
type ShardSummResult struct {
	TarObjs        uint64 `json:"tar-objs,string"`        // local TAR objects found
	TarSize        uint64 `json:"tar-size,string"`        // total size of local TAR objects
	Shards         uint64 `json:"shards,string"`          // local TAR objects with a valid shard index
	ShardSize      uint64 `json:"shard-size,string"`      // total size of valid indexed shards
	ArchivedObjs   uint64 `json:"archived-objs,string"`   // total archived objects across valid indexed shards
	StaleIndexes   uint64 `json:"stale-indexes,string"`   // # TAR objects whose shard index is stale
	InvalidIndexes uint64 `json:"invalid-indexes,string"` // # TAR objects whose shard index failed to load
}

‎xact/api_table.go‎

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,14 +117,21 @@ var Table = map[string]Descriptor{
117117
Metasync: false,
118118
// ICMode: ICNone - synchronous; proxy aggregates per-target results and returns to client
119119
},
120+
apc.ActSummaryShard: {
121+
DisplayName: "shard-summary",
122+
Scope: ScopeB,
123+
Access: apc.AceObjLIST,
124+
Startable: false,
125+
Metasync: false,
126+
},
120127

121128
// single target (node)
122129
apc.ActResilver: {Scope: ScopeT, Startable: true, Resilver: true}, // ICMode: ICNone - ScopeT, single-target, no aggregation
123130
apc.ActRechunk: {Scope: ScopeB, Startable: true, RefreshCap: true, ConflictRebRes: true, AbortByReb: true, ICMode: ICUponTerm},
124131

125132
// IndexShard is a best-effort build: stale entries are detected via LOM checksum
126133
// and fall back to tar.Next() scan. A partial index remains useful, and resumed
127-
// builds atomically skip already-indexed LOMs (lom.md.flags&Indexed + index file).
134+
// builds atomically skip already-indexed LOMs (lom.md.flags&Indexed + index object).
128135
apc.ActIndexShard: {Scope: ScopeB, Startable: true, RefreshCap: false, ConflictRebRes: true, AbortByReb: false, ICMode: ICUponTerm},
129136

130137
// on-demand EC and n-way replication

‎xact/xreg/bucket.go‎

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,10 @@ func RenewBckShardIndex(bck *meta.Bck, uuid string, msg *apc.IndexShardMsg) Rene
8989
return RenewBucketXact(apc.ActIndexShard, bck, Args{Custom: msg, UUID: uuid})
9090
}
9191

92+
func RenewBckShardSumm(bck *meta.Bck, msg *apc.ShardSummMsg) RenewRes {
93+
return RenewBucketXact(apc.ActSummaryShard, bck, Args{Custom: msg, UUID: msg.UUID})
94+
}
95+
9296
// RenewPutMirror renews the apc.ActPutCopies bucket xaction for the bucket
// that owns lom; the LOM itself travels as Args.Custom.
func RenewPutMirror(lom *core.LOM) RenewRes {
	bck := lom.Bck()
	args := Args{Custom: lom}
	return RenewBucketXact(apc.ActPutCopies, bck, args)
}

‎xact/xs/init.go‎

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ func Tinit(coi COI) {
5959
xreg.RegBckXact(&blobFactory{})
6060

6161
xreg.RegBckXact(&rechunkFactory{kind: apc.ActRechunk})
62+
xreg.RegBckXact(&shardSummFactory{})
6263
xreg.RegBckXact(&shardIndexFactory{kind: apc.ActIndexShard})
6364

6465
// assign COI singleton

‎xact/xs/shard_summ.go‎

Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,172 @@
1+
// Package xs is a collection of eXtended actions (xactions), including multi-object
2+
// operations, list-objects, (cluster) rebalance and (target) resilver, ETL, and more.
3+
/*
4+
* Copyright (c) 2026, NVIDIA CORPORATION. All rights reserved.
5+
*/
6+
package xs
7+
8+
import (
9+
"errors"
10+
"strconv"
11+
"sync"
12+
13+
"github.com/NVIDIA/aistore/api/apc"
14+
"github.com/NVIDIA/aistore/cmn"
15+
"github.com/NVIDIA/aistore/cmn/archive"
16+
"github.com/NVIDIA/aistore/cmn/atomic"
17+
"github.com/NVIDIA/aistore/cmn/cos"
18+
"github.com/NVIDIA/aistore/core"
19+
"github.com/NVIDIA/aistore/core/meta"
20+
"github.com/NVIDIA/aistore/xact"
21+
"github.com/NVIDIA/aistore/xact/xreg"
22+
)
23+
24+
type (
25+
shardSummFactory struct {
26+
xctn *XactShardSumm
27+
msg *apc.ShardSummMsg
28+
xreg.RenewBase
29+
}
30+
XactShardSumm struct {
31+
p *shardSummFactory
32+
// live counters; snapshot into apc.ShardSummResult in Result()
33+
nTarObjs atomic.Uint64
34+
nTarSize atomic.Uint64
35+
nShards atomic.Uint64
36+
nShardSize atomic.Uint64
37+
nArchivedObjs atomic.Uint64
38+
nStale atomic.Uint64
39+
nInvalid atomic.Uint64
40+
// TODO: include orphaned index count by sampling sysbucket indexes.
41+
xact.BckJogRunner
42+
}
43+
)
44+
45+
var (
46+
_ core.Xact = (*XactShardSumm)(nil)
47+
_ xreg.Renewable = (*shardSummFactory)(nil)
48+
)
49+
50+
func (*shardSummFactory) New(args xreg.Args, bck *meta.Bck) xreg.Renewable {
51+
return &shardSummFactory{
52+
RenewBase: xreg.RenewBase{Args: args, Bck: bck},
53+
msg: args.Custom.(*apc.ShardSummMsg),
54+
}
55+
}
56+
57+
func (p *shardSummFactory) Start() error {
58+
r := &XactShardSumm{p: p}
59+
err := r.BckJogRunner.Init(p.UUID(), p.Kind(), p.Bck, xact.BckJogRunnerOpts{
60+
CbObj: r.visit,
61+
Prefix: p.msg.Prefix,
62+
NumWorkers: xact.NwpNone,
63+
}, cmn.GCO.Get())
64+
if err != nil {
65+
return err
66+
}
67+
p.xctn = r
68+
xact.GoRunW(r)
69+
return nil
70+
}
71+
72+
// Kind reports the xaction kind handled by this factory: apc.ActSummaryShard.
func (*shardSummFactory) Kind() string {
	return apc.ActSummaryShard
}
73+
// Get returns the xaction stored by Start (p.xctn).
func (p *shardSummFactory) Get() core.Xact {
	return p.xctn
}
74+
75+
func (*shardSummFactory) WhenPrevIsRunning(xreg.Renewable) (xreg.WPR, error) {
76+
return xreg.WprKeepAndStartNew, nil
77+
}
78+
79+
func (r *XactShardSumm) Run(wg *sync.WaitGroup) {
80+
wg.Done()
81+
r.BckJogRunner.Run()
82+
if err := r.BckJogRunner.Wait(); err != nil {
83+
r.AddErr(err)
84+
}
85+
r.Finish()
86+
}
87+
88+
func (r *XactShardSumm) visit(lom *core.LOM, _ []byte) error {
89+
mime, err := archive.Mime("", lom.ObjName)
90+
if err != nil || mime != archive.ExtTar {
91+
return nil
92+
}
93+
94+
lom.Lock(false)
95+
defer lom.Unlock(false)
96+
if err := lom.Load(false /*cache it*/, true /*locked*/); err != nil {
97+
if cos.IsNotExist(err) {
98+
return nil
99+
}
100+
return err
101+
}
102+
if lom.IsCopy() {
103+
return nil
104+
}
105+
106+
size := lom.Lsize()
107+
r.nTarObjs.Inc()
108+
r.nTarSize.Add(uint64(size))
109+
r.ObjsAdd(1, size)
110+
if !lom.HasShardIdx() {
111+
return nil
112+
}
113+
114+
idx, err := core.LoadShardIndex(lom)
115+
if err != nil {
116+
if errors.Is(err, archive.ErrShardIdxStale) {
117+
r.nStale.Inc()
118+
} else {
119+
r.nInvalid.Inc()
120+
}
121+
return nil
122+
}
123+
if idx == nil {
124+
return nil
125+
}
126+
r.nShards.Inc()
127+
r.nShardSize.Add(uint64(size))
128+
r.nArchivedObjs.Add(uint64(len(idx.Entries)))
129+
return nil
130+
}
131+
132+
func (r *XactShardSumm) Result() (*apc.ShardSummResult, error) {
133+
return &apc.ShardSummResult{
134+
TarObjs: r.nTarObjs.Load(),
135+
TarSize: r.nTarSize.Load(),
136+
Shards: r.nShards.Load(),
137+
ShardSize: r.nShardSize.Load(),
138+
ArchivedObjs: r.nArchivedObjs.Load(),
139+
StaleIndexes: r.nStale.Load(),
140+
InvalidIndexes: r.nInvalid.Load(),
141+
}, r.Err()
142+
}
143+
144+
// Snap returns a new snapshot of this xaction built by Base.NewSnap.
func (r *XactShardSumm) Snap() *core.Snap {
	return r.Base.NewSnap(r)
}
145+
146+
func (r *XactShardSumm) CtlMsg() string {
147+
if r.p == nil || r.p.msg == nil {
148+
return ""
149+
}
150+
var sb cos.SB
151+
sb.Init(160)
152+
if r.p.msg.Prefix != "" {
153+
idxAppend(&sb, "prefix", r.p.msg.Prefix)
154+
}
155+
tarObjs := r.nTarObjs.Load()
156+
shards := r.nShards.Load()
157+
idxAppend(&sb, "tar-objs", strconv.FormatUint(tarObjs, 10))
158+
idxAppend(&sb, "tar-size", strconv.FormatUint(r.nTarSize.Load(), 10))
159+
idxAppend(&sb, "shards", strconv.FormatUint(shards, 10))
160+
idxAppend(&sb, "shard-size", strconv.FormatUint(r.nShardSize.Load(), 10))
161+
idxAppend(&sb, "archived-objs", strconv.FormatUint(r.nArchivedObjs.Load(), 10))
162+
if n := r.nStale.Load(); n > 0 {
163+
idxAppend(&sb, "stale", strconv.FormatUint(n, 10))
164+
}
165+
if n := r.nInvalid.Load(); n > 0 {
166+
idxAppend(&sb, "invalid-index", strconv.FormatUint(n, 10))
167+
}
168+
if n := r.ErrCnt(); n > 0 {
169+
idxAppend(&sb, "errs", strconv.Itoa(n))
170+
}
171+
return sb.String()
172+
}

0 commit comments

Comments
 (0)