From 1324e37ed1646142b3b10b85cddb4d4c61fa7b4d Mon Sep 17 00:00:00 2001 From: char-1ee Date: Tue, 27 Feb 2024 04:53:34 -0700 Subject: [PATCH] Fix listener --- ctriface/iface.go | 13 ++++-- ctriface/iface_test.go | 3 -- ctriface/orch.go | 7 ++- lg/uni_logger.go | 2 +- memory/manager/manager.go | 80 +++++++++++--------------------- memory/manager/snapshot_state.go | 61 +++++++++++++++--------- 6 files changed, 82 insertions(+), 84 deletions(-) diff --git a/ctriface/iface.go b/ctriface/iface.go index 298546742..a185281e1 100644 --- a/ctriface/iface.go +++ b/ctriface/iface.go @@ -25,6 +25,7 @@ package ctriface import ( "context" "encoding/json" + "net" "os" "os/exec" "path/filepath" @@ -504,6 +505,8 @@ func (o *Orchestrator) LoadSnapshot(ctx context.Context, originVmID string, vmID BackendType: fileBackend, BackendPath: snap.GetMemFilePath(), } + + var uffdConn *net.UnixConn if o.GetUPFEnabled() { logger.Debug("TEST: UPF is enabled") @@ -517,6 +520,10 @@ func (o *Orchestrator) LoadSnapshot(ctx context.Context, originVmID string, vmID if err := o.memoryManager.FetchState(originVmID); err != nil { return nil, nil, err } + + if uffdConn, err = o.memoryManager.ListenUffd(originVmID, o.uffdSockAddr); err != nil { + return nil, nil, err + } } tStart = time.Now() @@ -556,7 +563,6 @@ func (o *Orchestrator) LoadSnapshot(ctx context.Context, originVmID string, vmID }() logger.Debug("TEST: CreatVM request sent") - <-loadDone if o.GetUPFEnabled() { @@ -571,18 +577,19 @@ func (o *Orchestrator) LoadSnapshot(ctx context.Context, originVmID string, vmID VMMStatePath: o.getSnapshotFile(vmID), WorkingSetPath: o.getWorkingSetFile(vmID), InstanceSockAddr: o.uffdSockAddr, + UffdConn: uffdConn, } if err := o.memoryManager.RegisterVMFromSnap(originVmID, stateCfg); err != nil { logger.Error(err, "failed to register new VM with memory manager") } logger.Debug("TEST: activate VM in mm") - if activateErr = o.memoryManager.Activate(vmID); activateErr != nil { + if activateErr = o.memoryManager.Activate(vmID, uffdConn); activateErr != nil { logger.Warn("Failed to activate VM in the memory manager", activateErr) } } - // <-loadDone + <-loadDone loadSnapshotMetric.MetricMap[metrics.LoadVMM] = metrics.ToUS(time.Since(tStart)) diff --git a/ctriface/iface_test.go b/ctriface/iface_test.go index d2b7c7e8b..3ab93a94c 100644 --- a/ctriface/iface_test.go +++ b/ctriface/iface_test.go @@ -33,8 +33,6 @@ import ( log "github.com/sirupsen/logrus" "github.com/stretchr/testify/require" "github.com/vhive-serverless/vhive/snapshotting" - - "github.com/vhive-serverless/vhive/lg" ) // TODO: Make it impossible to use lazy mode without UPF @@ -105,7 +103,6 @@ func TestStartSnapStop(t *testing.T) { require.NoError(t, err, "Failed to load snapshot of VM") log.Debug("TEST: LoadSnapshot completed") - lg.UniLogger.Println("This is a test") _, err = orch.ResumeVM(ctx, vmID) require.NoError(t, err, "Failed to resume VM") diff --git a/ctriface/orch.go b/ctriface/orch.go index 33394c4c2..a8042fd9f 100644 --- a/ctriface/orch.go +++ b/ctriface/orch.go @@ -34,6 +34,7 @@ import ( "github.com/vhive-serverless/vhive/devmapper" log "github.com/sirupsen/logrus" + lg "github.com/vhive-serverless/vhive/lg" "github.com/containerd/containerd" @@ -129,12 +130,16 @@ func NewOrchestrator(snapshotter, hostIface string, opts ...OrchestratorOption) } defer file.Close() + lg.UniLogger.Println("TEST: created the uffd sock addr") + managerCfg := manager.MemoryManagerCfg{ MetricsModeOn: o.isMetricsMode, UffdSockAddr: o.uffdSockAddr, } o.memoryManager = manager.NewMemoryManager(managerCfg) - go o.memoryManager.ListenUffdSocket(o.uffdSockAddr) + + // lg.UniLogger.Println("TEST: created a new memory manager. Start listen uffd socket") + // go o.memoryManager.ListenUffdSocket(o.uffdSockAddr) } log.Info("Creating containerd client") diff --git a/lg/uni_logger.go b/lg/uni_logger.go index b8decb6a5..f626cf650 100644 --- a/lg/uni_logger.go +++ b/lg/uni_logger.go @@ -9,7 +9,7 @@ import ( var UniLogger *log.Logger func init() { - file, err := os.OpenFile("output.log", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666) + file, err := os.OpenFile("uni_output.log", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666) if err != nil { log.Fatalln("Failed to open log file:", err) } diff --git a/memory/manager/manager.go b/memory/manager/manager.go index b694abe8c..8714025af 100644 --- a/memory/manager/manager.go +++ b/memory/manager/manager.go @@ -149,7 +149,7 @@ func (m *MemoryManager) DeregisterVM(vmID string) error { } // Activate Creates an epoller to serve page faults for the VM -func (m *MemoryManager) Activate(vmID string) error { +func (m *MemoryManager) Activate(vmID string, conn *net.UnixConn) error { logger := log.WithFields(log.Fields{"vmID": vmID}) logger.Debug("Activating instance in the memory manager") @@ -164,14 +164,6 @@ func (m *MemoryManager) Activate(vmID string) error { logger.Debug("TEST: Activate: fetch snapstate by vmID for UFFD") - // originID, ok := m.origins[vmID] - - // if !ok { - // logger.Debug("TEST: not loaded from snapshot") - // } - - // state, ok = m.instances[originID] - state, ok = m.instances[vmID] if !ok { @@ -187,29 +179,21 @@ func (m *MemoryManager) Activate(vmID string) error { return errors.New("VM already active") } - select { - case <-m.startEpollingCh: - if err := state.mapGuestMemory(); err != nil { - logger.Error("Failed to map guest memory") - return err - } - - if err := state.getUFFD(); err != nil { - logger.Error("Failed to get uffd") - return err - } + if err := state.mapGuestMemory(); err != nil { + logger.Error("Failed to map guest memory") + return err + } - state.setupStateOnActivate() + if err := state.getUFFD(conn); err != nil { + logger.Error("Failed to get uffd") + return err + } - go state.pollUserPageFaults(readyCh) + state.setupStateOnActivate() - <-readyCh + go state.pollUserPageFaults(readyCh) - case <-time.After(100 * time.Second): - return errors.New("Uffd connection to firecracker timeout") - default: - return errors.New("Failed to start epoller") - } + <-readyCh return nil } @@ -409,37 +393,25 @@ func (m *MemoryManager) GetUPFLatencyStats(vmID string) ([]*metrics.Metric, erro return state.latencyMetrics, nil } -func (m *MemoryManager) ListenUffdSocket(uffdSockAddr string) error { - log.Debug("Start listening to uffd socket") +func (m *MemoryManager) ListenUffd(vmID string, uffdSockAddr string) (*net.UnixConn, error) { + logger := log.WithFields(log.Fields{"vmID": vmID}) - m.startEpollingOnce.Do(func() { - m.startEpollingCh = make(chan struct{}) - }) + logger.Debug("listening to uffd") - ln, err := net.Listen("unix", uffdSockAddr) - if err != nil { - log.Errorf("Failed to listen on uffd socket: %v", err) - return errors.New("Failed to listen on uffd socket") - } - defer ln.Close() + m.Lock() - for { - conn, err := ln.Accept() - if err != nil { - log.Printf("Failed to accept connection on uffd socket: %v", err) - continue - } - go func(conn net.Conn) { - defer conn.Close() - if err := ln.Close(); err != nil { - log.Printf("Failed to close uffd socket listener: %v", err) - } - close(m.startEpollingCh) - }(conn) - break + state, ok := m.instances[vmID] + if !ok { + m.Unlock() + logger.Error("VM not registered with the memory manager") + return nil, errors.New("VM not registered with the memory manager") } - return nil + m.Unlock() + + conn, _ := state.ListenUffdSocket(uffdSockAddr) + + return conn, nil } // Deprecated diff --git a/memory/manager/snapshot_state.go b/memory/manager/snapshot_state.go index 34e041918..89c9adce0 100644 --- a/memory/manager/snapshot_state.go +++ b/memory/manager/snapshot_state.go @@ -28,7 +28,6 @@ package manager import "C" import ( - "context" "encoding/binary" "errors" "fmt" @@ -64,6 +63,7 @@ type SnapshotStateCfg struct { VMMStatePath, GuestMemPath, WorkingSetPath string + UffdConn *net.UnixConn InstanceSockAddr string BaseDir string // base directory for the instance MetricsPath string // path to csv file where the metrics should be stored @@ -134,37 +134,54 @@ func (s *SnapshotState) setupStateOnActivate() { } } -func (s *SnapshotState) getUFFD() error { - var d net.Dialer - ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) - defer cancel() +func (s *SnapshotState) ListenUffdSocket(uffdSockAddr string) (*net.UnixConn, error) { + log.Debug("Start listening to uffd socket") + ln, err := net.Listen("unix", uffdSockAddr) + if err != nil { + log.Errorf("Failed to listen on uffd socket: %v", err) + return nil, errors.New("Failed to listen on uffd socket") + } + // defer ln.Close() + + var uffdConn *net.UnixConn for { - c, err := d.DialContext(ctx, "unix", s.InstanceSockAddr) + lg.UniLogger.Println("Listening ...") + conn, err := ln.Accept() if err != nil { - if ctx.Err() != nil { - log.Error("Failed to dial within the context timeout") - return err - } - time.Sleep(1 * time.Millisecond) - continue + log.Error("Failed to accept connection") + return nil, err } - log.Debugf("TEST: Dial uffd socket done: %s", s.InstanceSockAddr) - defer c.Close() + sendfdConn, ok := conn.(*net.UnixConn) + if !ok { + log.Error("Failed to assert net.Conn to *net.UnixConn") + return nil, fmt.Errorf("failed to assert net.Conn to *net.UnixConn") + } + s.SnapshotStateCfg.UffdConn = sendfdConn + uffdConn = sendfdConn + break + // TODO: maybe need a synchronziation + } - sendfdConn := c.(*net.UnixConn) + return uffdConn, nil +} - fs, err := fd.Get(sendfdConn, 1, []string{"a file"}) - if err != nil { - log.Error("Failed to receive the uffd") - return err - } - s.userFaultFD = fs[0] +func (s *SnapshotState) getUFFD(conn *net.UnixConn) error { + // ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + // defer cancel() - return nil + // sendfdConn := s.SnapshotStateCfg.UffdConn + sendfdConn := conn + fs, err := fd.Get(sendfdConn, 1, []string{"a file"}) + if err != nil { + log.Error("Failed to receive the uffd") + return err } + + s.userFaultFD = fs[0] + return nil } func (s *SnapshotState) processMetrics() {