Skip to content

Commit efbacca

Browse files
committed
core: revise chunk manifest loading (completed vs partial)
* add LoadCompleted() and LoadPartial() methods - remove fallback logic from file I/O - callers now specify intent explicitly * add cos.ValidateManifestID() for consistent upload ID validation * amend packed size estimation * update tests to use revised APIs * with refactoring and cleanup ---- * part thirteen, prev. commit: 9bad95b Signed-off-by: Alex Aizman <alex.aizman@gmail.com>
1 parent 9bad95b commit efbacca

7 files changed

Lines changed: 227 additions & 215 deletions

File tree

‎ais/s3/mpt.go‎

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,12 @@ func ListUploads(all []*core.Ufest, bckName, idMarker string, maxUploads int) *L
1919

2020
// filter by bucket
2121
for _, manifest := range all {
22-
if bckName == "" || manifest.Lom.Bck().Name == bckName {
22+
lom := manifest.Lom()
23+
if bckName == "" || lom.Bck().Name == bckName {
2324
results = append(results, UploadInfoResult{
24-
Key: manifest.Lom.ObjName,
25+
Key: lom.ObjName,
2526
UploadID: manifest.ID,
26-
Initiated: manifest.Created,
27+
Initiated: manifest.Created(),
2728
})
2829
}
2930
}

‎ais/tgtmpt.go‎

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -50,9 +50,9 @@ func (ups *ups) init(id string, lom *core.LOM, rmd map[string]string) error {
5050
}
5151

5252
func (ups *ups) _add(id string, manifest *core.Ufest, rmd map[string]string) {
53-
debug.Assert(manifest.Lom != nil)
53+
debug.Assert(manifest.Lom() != nil)
5454
_, ok := ups.m[id]
55-
debug.Assert(!ok, "duplicated upload ID: ", id, " ", manifest.Lom.Cname())
55+
debug.Assert(!ok, "duplicated upload ID: ", id, " ", manifest.Lom().Cname())
5656
ups.m[id] = up{manifest, rmd}
5757
}
5858

@@ -72,13 +72,13 @@ func (ups *ups) getWithMeta(id string) (manifest *core.Ufest, rmd map[string]str
7272
}
7373

7474
// NOTE: must be called with ups and lom both unlocked
75-
func (ups *ups) fromFS(id string, lom *core.LOM, add bool) (manifest *core.Ufest, err error) {
75+
func (ups *ups) loadPartial(id string, lom *core.LOM, add bool) (manifest *core.Ufest, err error) {
7676
debug.Assert(lom.IsLocked() == apc.LockNone, "expecting not locked: ", lom.Cname())
7777

7878
lom.Lock(false)
7979
defer lom.Unlock(false)
8080
manifest = core.NewUfest(id, lom, true /*must-exist*/)
81-
if err = manifest.Load(lom); err == nil && add {
81+
if err = manifest.LoadPartial(lom); err == nil && add {
8282
ups.Lock()
8383
ups._add(id, manifest, nil /*remote metadata <= LOM custom*/)
8484
ups.Unlock()
@@ -92,7 +92,7 @@ func (ups *ups) abort(id string, lom *core.LOM) (ecode int, err error) {
9292
up, ok := ups.m[id]
9393
if !ok {
9494
ups.Unlock()
95-
manifest, err = ups.fromFS(id, lom, false /*add*/)
95+
manifest, err = ups.loadPartial(id, lom, false /*add*/)
9696
if err != nil {
9797
return 0, err
9898
}

‎ais/tgts3mpt.go‎

Lines changed: 8 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -4,42 +4,17 @@
44
*/
55
package ais
66

7-
// NOTE -- TODO: S3 Multipart Upload Implementation
8-
//
97
// 1. in-memory state
108
// - active uploads kept purely in-memory (ups map)
119
// - no support for target restart recovery during active uploads
12-
// - if target restarts during upload, client must restart the entire upload
13-
// - this simplifies design and avoids complex state recovery mechanisms
14-
// - rationale: most multipart uploads complete within hours; target restarts
15-
// are rare operational events. the complexity of persistent state recovery
16-
// outweighs the benefit for the typical use case.
17-
//
18-
// 2. cleanup
19-
// - no automatic cleanup of abandoned uploads
20-
// - uploads remain in memory until explicitly completed or aborted
21-
//
22-
// 3. parts ordering
23-
// - completeMpt enforces strictly ascending, contiguous parts (1..n)
24-
// - this is stricter than aws s3, which allows gaps and sparse part sets
25-
// - rationale: simplifies chunk manifest structure and append logic;
26-
// most s3 clients upload parts sequentially anyway
10+
// (if target restarts during upload, client must restart the entire upload)
2711
//
28-
// 4. concurrency
29-
// - global rwmutex for upload cache operations
30-
// - individual manifest mutexes for chunk operations
31-
// - tradeoff: simple consistency vs potential lock contention at scale
12+
// 2. t.completeMpt() locks/unlocks two times - consider CoW
3213
//
33-
// 5. error handling
34-
// - fail fast on inconsistencies (size mismatches, missing parts)
35-
// - atomic operations where possible (chunk replacement, manifest storage)
36-
// - orphaned chunks cleaned up only on explicit abort, not on errors
14+
// 3. TODO cleanup; orphan chunks, abandoned (partial) manifests
3715
//
38-
// 6. persistence
39-
// - chunk manifests persisted to xattr only after successful completion
40-
// - used later for individual chunk access (getMptPart)
41-
// - not used for active upload state recovery
42-
// - failed/aborted uploads clean up chunks but don't persist manifests
16+
// 4. parts ordering
17+
// - Add(chunk) keeps chunks ("parts") sorted
4318

4419
import (
4520
"errors"
@@ -246,7 +221,7 @@ func (t *target) putMptPart(w http.ResponseWriter, r *http.Request, items []stri
246221
debug.Assert(len(chunk.MD5) == 16, len(chunk.MD5))
247222
}
248223
if checkPartSHA {
249-
chunk.Cksum = &cksumSHA.Cksum
224+
chunk.SetCksum(&cksumSHA.Cksum)
250225
}
251226

252227
// - see NOTE above in re "active uploads in memory"
@@ -498,7 +473,7 @@ func (t *target) listMptParts(w http.ResponseWriter, r *http.Request, bck *meta.
498473
manifest := t.ups.get(uploadID)
499474
if manifest == nil {
500475
var err error
501-
if manifest, err = t.ups.fromFS(uploadID, lom, true /*add*/); err != nil {
476+
if manifest, err = t.ups.loadPartial(uploadID, lom, true /*add*/); err != nil {
502477
s3.WriteMptErr(w, r, s3.NewErrNoSuchUpload(uploadID, err), http.StatusNotFound, lom, uploadID)
503478
return
504479
}
@@ -563,7 +538,7 @@ func (t *target) getMptPart(w http.ResponseWriter, r *http.Request, bck *meta.Bc
563538

564539
// load chunk manifest and find out the part num's offset & size
565540
manifest := core.NewUfest("", lom, true /*must-exist*/)
566-
err = manifest.Load(lom)
541+
err = manifest.LoadCompleted(lom)
567542
if err != nil {
568543
s3.WriteErr(w, r, err, http.StatusNotFound)
569544
return

‎cmn/cos/uuid.go‎

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,31 @@ func GenTestingDaemonID(suffix string) string {
124124
return CryptoRandS(l) + suffix
125125
}
126126

127+
//
128+
// chunk manifest ID
129+
//
130+
131+
func ValidateManifestID(id string) error {
132+
const (
133+
lmin = 8
134+
lmax = 128
135+
)
136+
l := len(id)
137+
if l < lmin {
138+
return fmt.Errorf("chunk manifest ID %q is too short (expecting >= %d)", id, lmin)
139+
}
140+
if l > lmax {
141+
return fmt.Errorf("chunk manifest ID %q is too long (expecting <= %d)", id, lmax)
142+
}
143+
for i := range l {
144+
c := id[i]
145+
if c < 32 || c > 126 || c == '/' || c == '\\' {
146+
return fmt.Errorf("chunk manifest ID %q contains invalid character at position %d (expecting ASCII)", id, i)
147+
}
148+
}
149+
return nil
150+
}
151+
127152
//
128153
// utility functions
129154
//

0 commit comments

Comments
 (0)