Skip to content

Commit

Permalink
Merge pull request #54 from basenana/fix/webdavconflict
Browse files Browse the repository at this point in the history
update: invalid cache after append segment
  • Loading branch information
hyponet authored Aug 13, 2023
2 parents 1af588c + e5afc30 commit a5c263e
Show file tree
Hide file tree
Showing 12 changed files with 40 additions and 27 deletions.
2 changes: 2 additions & 0 deletions pkg/bio/bio.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ type Writer interface {
Close()
}

type InvalidCacheHook func(eid int64)

var (
fileChunkReaders = make(map[int64]Reader)
fileChunkWriters = make(map[int64]Writer)
Expand Down
18 changes: 10 additions & 8 deletions pkg/bio/chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,14 +355,15 @@ func (c *segReader) mergePage(ctx context.Context, pageIndex int64, page *pageNo

type chunkWriter struct {
*chunkReader
unready int32
writers map[int64]*segWriter
ref int32
commitLimit int32
writerMux sync.Mutex
unready int32
writers map[int64]*segWriter
ref int32
commitLimit int32
invalidCache InvalidCacheHook
writerMux sync.Mutex
}

func NewChunkWriter(reader Reader) Writer {
func NewChunkWriter(reader Reader, hook InvalidCacheHook) Writer {
r, ok := reader.(*chunkReader)
if !ok {
return nil
Expand All @@ -373,7 +374,7 @@ func NewChunkWriter(reader Reader) Writer {
defer fileChunkMux.Unlock()
w, ok := fileChunkWriters[r.entry.ID]
if !ok {
cw := &chunkWriter{chunkReader: r, writers: map[int64]*segWriter{}, ref: 1}
cw := &chunkWriter{chunkReader: r, writers: map[int64]*segWriter{}, ref: 1, invalidCache: hook}
fileChunkWriters[r.entry.ID] = cw
return cw
}
Expand Down Expand Up @@ -775,6 +776,7 @@ func (w *segWriter) commitSegment(ctx context.Context) {
chunkDiscardSegmentCounter.Inc()
continue
}
w.invalidCache(w.entry.ID)
w.invalidate(w.chunkID)
w.entry = &newObj.Metadata
w.uncommitted = w.uncommitted[1:]
Expand Down Expand Up @@ -989,7 +991,7 @@ func CompactChunksData(ctx context.Context, entry *types.Metadata, chunkStore me
// rebuild new sequential segment
if reader == nil {
reader = NewChunkReader(entry, chunkStore, dataStore)
writer = NewChunkWriter(reader)
writer = NewChunkWriter(reader, noneInvalidCache)
buf = make([]byte, fileChunkSize)
}
readN, err = reader.ReadAt(ctx, buf, chunkStart)
Expand Down
6 changes: 3 additions & 3 deletions pkg/bio/chunk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ var _ = Describe("TestChunkIO", func() {
It("init should be succeed", func() {
reader = NewChunkReader(&fakeObj.Metadata, chunkStore, dataStore)
Expect(reader).ShouldNot(BeNil())
writer = NewChunkWriter(reader)
writer = NewChunkWriter(reader, noneInvalidCache)
Expect(writer).ShouldNot(BeNil())
})

Expand Down Expand Up @@ -93,7 +93,7 @@ var _ = Describe("TestChunkRewrite", func() {
It("init should be succeed", func() {
reader = NewChunkReader(&fakeObj.Metadata, chunkStore, dataStore)
Expect(reader).ShouldNot(BeNil())
writer = NewChunkWriter(reader)
writer = NewChunkWriter(reader, noneInvalidCache)
Expect(writer).ShouldNot(BeNil())
})

Expand Down Expand Up @@ -215,7 +215,7 @@ var _ = Describe("TestChunkCompact", func() {
It("init should be succeed", func() {
reader = NewChunkReader(&fakeObj.Metadata, chunkStore, dataStore)
Expect(reader).ShouldNot(BeNil())
writer = NewChunkWriter(reader)
writer = NewChunkWriter(reader, noneInvalidCache)
Expect(writer).ShouldNot(BeNil())
})

Expand Down
2 changes: 2 additions & 0 deletions pkg/bio/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,5 @@ func buildCompactEvent(entry *types.Metadata) *types.Event {
Data: types.EventData{Metadata: entry},
}
}

func noneInvalidCache(eid int64) {}
11 changes: 9 additions & 2 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ func (c *controller) CreateEntry(ctx context.Context, parentId int64, attr types
return nil, types.ErrNameTooLong
}

c.logger.Debugw("create entry", "parent", parentId, "entryName", attr.Name)
entry, err := c.entry.CreateEntry(ctx, parentId, dentry.EntryAttr{
Name: attr.Name,
Kind: attr.Kind,
Expand All @@ -134,20 +135,21 @@ func (c *controller) CreateEntry(ctx context.Context, parentId int64, attr types
return entry, nil
}

func (c *controller) UpdateEntry(ctx context.Context, entryID int64, patch *types.Metadata) error {
func (c *controller) UpdateEntry(ctx context.Context, entryID int64, newContent *types.Metadata) error {
defer trace.StartRegion(ctx, "controller.UpdateEntry").End()
en, err := c.GetEntry(ctx, entryID)
if err != nil {
return err
}

c.logger.Debugw("update entry", "entry", entryID)
parent, err := c.entry.OpenGroup(ctx, en.ParentID)
if err != nil {
c.logger.Errorw("open group error", "parent", en.ParentID, "entry", entryID, "err", err)
return err
}

if err = parent.PatchEntry(ctx, entryID, patch); err != nil {
if err = parent.UpdateEntry(ctx, entryID, newContent); err != nil {
c.logger.Errorw("save entry error", "entry", entryID, "err", err)
return err
}
Expand All @@ -173,6 +175,7 @@ func (c *controller) DestroyEntry(ctx context.Context, parentId, entryId int64,
return types.ErrNoAccess
}

c.logger.Debugw("destroy entry", "parent", parentId, "entry", entryId)
err = c.entry.RemoveEntry(ctx, parentId, entryId)
if err != nil {
c.logger.Errorw("delete entry failed", "entry", entryId, "err", err.Error())
Expand Down Expand Up @@ -206,6 +209,7 @@ func (c *controller) MirrorEntry(ctx context.Context, srcId, dstParentId int64,
c.logger.Errorw("mirror entry failed", "src", srcId, "err", err.Error())
return nil, err
}
c.logger.Debugw("mirror entry", "src", srcId, "dstParent", dstParentId, "entry", entry.ID)

events.Publish(events.EntryActionTopic(events.TopicEntryActionFmt, events.ActionTypeMirror),
dentry.BuildEntryEvent(events.ActionTypeMirror, entry))
Expand Down Expand Up @@ -274,6 +278,7 @@ func (c *controller) ChangeEntryParent(ctx context.Context, targetId, oldParentI
existObjId = &eid
}

c.logger.Debugw("change entry parent", "target", targetId, "existObj", existObjId, "oldParent", oldParentId, "newParent", newParentId, "newName", newName)
err = c.entry.ChangeEntryParent(ctx, targetId, existObjId, oldParentId, newParentId, newName, dentry.ChangeParentAttr{
Replace: opt.Replace,
Exchange: opt.Exchange,
Expand All @@ -293,11 +298,13 @@ func (c *controller) GetEntryExtendField(ctx context.Context, id int64, fKey str

func (c *controller) SetEntryExtendField(ctx context.Context, id int64, fKey, fVal string) error {
defer trace.StartRegion(ctx, "controller.SetEntryExtendField").End()
c.logger.Debugw("set entry extend filed", "entry", id, "key", fKey)
return c.entry.SetEntryExtendField(ctx, id, fKey, fVal)
}

func (c *controller) RemoveEntryExtendField(ctx context.Context, id int64, fKey string) error {
defer trace.StartRegion(ctx, "controller.RemoveEntryExtendField").End()
c.logger.Debugw("remove entry extend filed", "entry", id, "key", fKey)
return c.entry.RemoveEntryExtendField(ctx, id, fKey)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/dentry/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func openFile(en *types.Metadata, attr Attr, store metastore.ObjectStore, cacheS
}
f.reader = bio.NewChunkReader(en, store.(metastore.ChunkStore), fileStorage)
if attr.Write {
f.writer = bio.NewChunkWriter(f.reader)
f.writer = bio.NewChunkWriter(f.reader, cacheStore.delEntryCache)
}
increaseOpenedFile(en.ID)
return f, nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/dentry/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func newMockFileEntry(name string) *types.Metadata {
}
en.Size = fileChunkSize * 4
en.Storage = storage.MemoryStorage
err = grp.PatchEntry(context.TODO(), en.ID, en)
err = grp.UpdateEntry(context.TODO(), en.ID, en)
if err != nil {
panic(fmt.Errorf("update file entry failed: %s", err))
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/dentry/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (
type Group interface {
FindEntry(ctx context.Context, name string) (*types.Metadata, error)
CreateEntry(ctx context.Context, attr EntryAttr) (*types.Metadata, error)
PatchEntry(ctx context.Context, entryID int64, patch *types.Metadata) error
UpdateEntry(ctx context.Context, entryID int64, patch *types.Metadata) error
RemoveEntry(ctx context.Context, entryID int64) error
ListChildren(ctx context.Context) ([]*types.Metadata, error)
}
Expand All @@ -50,7 +50,7 @@ func (e emptyGroup) CreateEntry(ctx context.Context, attr EntryAttr) (*types.Met
return nil, types.ErrNoAccess
}

func (e emptyGroup) PatchEntry(ctx context.Context, entryID int64, patch *types.Metadata) error {
func (e emptyGroup) UpdateEntry(ctx context.Context, entryID int64, patch *types.Metadata) error {
return types.ErrNoAccess
}

Expand Down Expand Up @@ -143,7 +143,7 @@ func (g *stdGroup) CreateEntry(ctx context.Context, attr EntryAttr) (*types.Meta
return &obj.Metadata, nil
}

func (g *stdGroup) PatchEntry(ctx context.Context, entryId int64, patch *types.Metadata) error {
func (g *stdGroup) UpdateEntry(ctx context.Context, entryId int64, patch *types.Metadata) error {
defer trace.StartRegion(ctx, "dentry.stdGroup.UpdateEntry").End()
patch.ID = entryId
if err := g.cacheStore.updateEntries(ctx, patch); err != nil {
Expand Down Expand Up @@ -243,7 +243,7 @@ func (e *extGroup) CreateEntry(ctx context.Context, attr EntryAttr) (*types.Meta
return e.syncEntry(ctx, mirrorEn, en)
}

func (e *extGroup) PatchEntry(ctx context.Context, entryId int64, patch *types.Metadata) error {
func (e *extGroup) UpdateEntry(ctx context.Context, entryId int64, patch *types.Metadata) error {
group, err := e.stdGroup.cacheStore.getEntry(ctx, e.stdGroup.entryID)
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion pkg/dentry/group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ var _ = Describe("TestManageGroupEntry", func() {
Expect(err).Should(BeNil())

file2.Name = "test_group_manage_file3"
err = grp.PatchEntry(context.TODO(), file2.ID, file2)
err = grp.UpdateEntry(context.TODO(), file2.ID, file2)
Expect(err).Should(BeNil())

_, err = grp.FindEntry(context.TODO(), "test_group_manage_file2")
Expand Down
4 changes: 2 additions & 2 deletions pkg/dentry/instrumental.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,10 +143,10 @@ func (i instrumentalGroup) CreateEntry(ctx context.Context, attr EntryAttr) (*ty
return en, logOperationError(groupOperationErrorCounter, operation, err)
}

func (i instrumentalGroup) PatchEntry(ctx context.Context, entryId int64, patch *types.Metadata) error {
func (i instrumentalGroup) UpdateEntry(ctx context.Context, entryId int64, patch *types.Metadata) error {
const operation = "patch_entry"
defer logOperationLatency(groupOperationLatency, operation, time.Now())
err := i.grp.PatchEntry(ctx, entryId, patch)
err := i.grp.UpdateEntry(ctx, entryId, patch)
return logOperationError(groupOperationErrorCounter, operation, err)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/dentry/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,7 @@ func (m *manager) ChangeEntryParent(ctx context.Context, targetEntryId int64, ov
defer entryLifecycleLock.Unlock()
err = m.metastore.ChangeParent(ctx, &types.Object{Metadata: *oldParent}, &types.Object{Metadata: *newParent}, &types.Object{Metadata: *target}, types.ChangeParentOption{Name: newName})
if err != nil {
m.logger.Errorw("change object parent failed", "entry", target, "newParent", newParentId, "newName", newName, "err", err)
m.logger.Errorw("change object parent failed", "entry", target.ID, "newParent", newParentId, "newName", newName, "err", err)
return err
}
m.cache.delEntryCache(oldParent.ID)
Expand Down
8 changes: 4 additions & 4 deletions pkg/metastore/db/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,13 +83,13 @@ func (l *Logger) Error(ctx context.Context, s string, i ...interface{}) {
}

func (l *Logger) Trace(ctx context.Context, begin time.Time, fc func() (sql string, rowsAffected int64), err error) {
sqlContent, rows := fc()
l.Debugw("trace sql", "sql", sqlContent, "rows", rows, "err", err)
switch {
case err != nil && err != gorm.ErrRecordNotFound && err != context.Canceled:
sqlContent, rows := fc()
l.Warnw("trace error", "sql", sqlContent, "rows", rows, "err", err)
case time.Since(begin) > time.Second || logger.Debug:
sqlContent, rows := fc()
l.Infow("trace sql", "sql", sqlContent, "rows", rows, "err", err, "slow", time.Since(begin) > time.Second)
case time.Since(begin) > time.Second:
l.Infow("slow sql", "sql", sqlContent, "rows", rows, "err", err)
}
}

Expand Down

0 comments on commit a5c263e

Please sign in to comment.