Skip to content

Commit

Permalink
Fix listener
Browse files Browse the repository at this point in the history
  • Loading branch information
char-1ee committed Feb 27, 2024
1 parent e8092e4 commit 1324e37
Show file tree
Hide file tree
Showing 6 changed files with 82 additions and 84 deletions.
13 changes: 10 additions & 3 deletions ctriface/iface.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ package ctriface
import (
"context"
"encoding/json"
"net"
"os"
"os/exec"
"path/filepath"
Expand Down Expand Up @@ -504,6 +505,8 @@ func (o *Orchestrator) LoadSnapshot(ctx context.Context, originVmID string, vmID
BackendType: fileBackend,
BackendPath: snap.GetMemFilePath(),
}

var uffdConn *net.UnixConn

if o.GetUPFEnabled() {
logger.Debug("TEST: UPF is enabled")
Expand All @@ -517,6 +520,10 @@ func (o *Orchestrator) LoadSnapshot(ctx context.Context, originVmID string, vmID
if err := o.memoryManager.FetchState(originVmID); err != nil {
return nil, nil, err
}

if uffdConn, err = o.memoryManager.ListenUffd(originVmID, o.uffdSockAddr); err != nil {
return nil, nil, err
}
}

tStart = time.Now()
Expand Down Expand Up @@ -556,7 +563,6 @@ func (o *Orchestrator) LoadSnapshot(ctx context.Context, originVmID string, vmID
}()

logger.Debug("TEST: CreatVM request sent")
<-loadDone

if o.GetUPFEnabled() {

Expand All @@ -571,18 +577,19 @@ func (o *Orchestrator) LoadSnapshot(ctx context.Context, originVmID string, vmID
VMMStatePath: o.getSnapshotFile(vmID),
WorkingSetPath: o.getWorkingSetFile(vmID),
InstanceSockAddr: o.uffdSockAddr,
UffdConn: uffdConn,
}
if err := o.memoryManager.RegisterVMFromSnap(originVmID, stateCfg); err != nil {
logger.Error(err, "failed to register new VM with memory manager")
}

logger.Debug("TEST: activate VM in mm")
if activateErr = o.memoryManager.Activate(vmID); activateErr != nil {
if activateErr = o.memoryManager.Activate(vmID, uffdConn); activateErr != nil {
logger.Warn("Failed to activate VM in the memory manager", activateErr)
}
}

// <-loadDone
<-loadDone

loadSnapshotMetric.MetricMap[metrics.LoadVMM] = metrics.ToUS(time.Since(tStart))

Expand Down
3 changes: 0 additions & 3 deletions ctriface/iface_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ import (
log "github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"
"github.com/vhive-serverless/vhive/snapshotting"

"github.com/vhive-serverless/vhive/lg"
)

// TODO: Make it impossible to use lazy mode without UPF
Expand Down Expand Up @@ -105,7 +103,6 @@ func TestStartSnapStop(t *testing.T) {
require.NoError(t, err, "Failed to load snapshot of VM")

log.Debug("TEST: LoadSnapshot completed")
lg.UniLogger.Println("This is a test")
_, err = orch.ResumeVM(ctx, vmID)
require.NoError(t, err, "Failed to resume VM")

Expand Down
7 changes: 6 additions & 1 deletion ctriface/orch.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/vhive-serverless/vhive/devmapper"

log "github.com/sirupsen/logrus"
lg "github.com/vhive-serverless/vhive/lg"

"github.com/containerd/containerd"

Expand Down Expand Up @@ -129,12 +130,16 @@ func NewOrchestrator(snapshotter, hostIface string, opts ...OrchestratorOption)
}
defer file.Close()

lg.UniLogger.Println("TEST: created the uffd sock addr")

managerCfg := manager.MemoryManagerCfg{
MetricsModeOn: o.isMetricsMode,
UffdSockAddr: o.uffdSockAddr,
}
o.memoryManager = manager.NewMemoryManager(managerCfg)
go o.memoryManager.ListenUffdSocket(o.uffdSockAddr)

// lg.UniLogger.Println("TEST: created a new memory manager. Start listen uffd socket")
// go o.memoryManager.ListenUffdSocket(o.uffdSockAddr)
}

log.Info("Creating containerd client")
Expand Down
2 changes: 1 addition & 1 deletion lg/uni_logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
var UniLogger *log.Logger

func init() {
file, err := os.OpenFile("output.log", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
file, err := os.OpenFile("uni_output.log", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
if err != nil {
log.Fatalln("Failed to open log file:", err)
}
Expand Down
80 changes: 26 additions & 54 deletions memory/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func (m *MemoryManager) DeregisterVM(vmID string) error {
}

// Activate Creates an epoller to serve page faults for the VM
func (m *MemoryManager) Activate(vmID string) error {
func (m *MemoryManager) Activate(vmID string, conn *net.UnixConn) error {
logger := log.WithFields(log.Fields{"vmID": vmID})

logger.Debug("Activating instance in the memory manager")
Expand All @@ -164,14 +164,6 @@ func (m *MemoryManager) Activate(vmID string) error {

logger.Debug("TEST: Activate: fetch snapstate by vmID for UFFD")

// originID, ok := m.origins[vmID]

// if !ok {
// logger.Debug("TEST: not loaded from snapshot")
// }

// state, ok = m.instances[originID]

state, ok = m.instances[vmID]

if !ok {
Expand All @@ -187,29 +179,21 @@ func (m *MemoryManager) Activate(vmID string) error {
return errors.New("VM already active")
}

select {
case <-m.startEpollingCh:
if err := state.mapGuestMemory(); err != nil {
logger.Error("Failed to map guest memory")
return err
}

if err := state.getUFFD(); err != nil {
logger.Error("Failed to get uffd")
return err
}
if err := state.mapGuestMemory(); err != nil {
logger.Error("Failed to map guest memory")
return err
}

state.setupStateOnActivate()
if err := state.getUFFD(conn); err != nil {
logger.Error("Failed to get uffd")
return err
}

go state.pollUserPageFaults(readyCh)
state.setupStateOnActivate()

<-readyCh
go state.pollUserPageFaults(readyCh)

case <-time.After(100 * time.Second):
return errors.New("Uffd connection to firecracker timeout")
default:
return errors.New("Failed to start epoller")
}
<-readyCh

return nil
}
Expand Down Expand Up @@ -409,37 +393,25 @@ func (m *MemoryManager) GetUPFLatencyStats(vmID string) ([]*metrics.Metric, erro
return state.latencyMetrics, nil
}

func (m *MemoryManager) ListenUffdSocket(uffdSockAddr string) error {
log.Debug("Start listening to uffd socket")
func (m *MemoryManager) ListenUffd(vmID string, uffdSockAddr string) (*net.UnixConn, error) {
logger := log.WithFields(log.Fields{"vmID": vmID})

m.startEpollingOnce.Do(func() {
m.startEpollingCh = make(chan struct{})
})
logger.Debug("listening to uffd")

ln, err := net.Listen("unix", uffdSockAddr)
if err != nil {
log.Errorf("Failed to listen on uffd socket: %v", err)
return errors.New("Failed to listen on uffd socket")
}
defer ln.Close()
m.Lock()

for {
conn, err := ln.Accept()
if err != nil {
log.Printf("Failed to accept connection on uffd socket: %v", err)
continue
}
go func(conn net.Conn) {
defer conn.Close()
if err := ln.Close(); err != nil {
log.Printf("Failed to close uffd socket listener: %v", err)
}
close(m.startEpollingCh)
}(conn)
break
state, ok := m.instances[vmID]
if !ok {
m.Unlock()
logger.Error("VM not registered with the memory manager")
return nil, errors.New("VM not registered with the memory manager")
}

return nil
m.Unlock()

conn, _ := state.ListenUffdSocket(uffdSockAddr)

return conn, nil
}

// Deprecated
Expand Down
61 changes: 39 additions & 22 deletions memory/manager/snapshot_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ package manager
import "C"

Check failure on line 28 in memory/manager/snapshot_state.go

View workflow job for this annotation

GitHub Actions / Build and check code quality (1.19)

could not import C (cgo preprocessing failed) (typecheck)

Check failure on line 28 in memory/manager/snapshot_state.go

View workflow job for this annotation

GitHub Actions / Build and check code quality (1.18)

could not import C (cgo preprocessing failed) (typecheck)

import (
"context"
"encoding/binary"
"errors"
"fmt"
Expand Down Expand Up @@ -64,6 +63,7 @@ type SnapshotStateCfg struct {

VMMStatePath, GuestMemPath, WorkingSetPath string

UffdConn *net.UnixConn
InstanceSockAddr string
BaseDir string // base directory for the instance
MetricsPath string // path to csv file where the metrics should be stored
Expand Down Expand Up @@ -134,37 +134,54 @@ func (s *SnapshotState) setupStateOnActivate() {
}
}

func (s *SnapshotState) getUFFD() error {
var d net.Dialer
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
func (s *SnapshotState) ListenUffdSocket(uffdSockAddr string) (*net.UnixConn, error) {
log.Debug("Start listening to uffd socket")
ln, err := net.Listen("unix", uffdSockAddr)
if err != nil {
log.Errorf("Failed to listen on uffd socket: %v", err)
return nil, errors.New("Failed to listen on uffd socket")
}
// defer ln.Close()

var uffdConn *net.UnixConn

for {
c, err := d.DialContext(ctx, "unix", s.InstanceSockAddr)
lg.UniLogger.Println("Listening ...")
conn, err := ln.Accept()
if err != nil {
if ctx.Err() != nil {
log.Error("Failed to dial within the context timeout")
return err
}
time.Sleep(1 * time.Millisecond)
continue
log.Error("Failed to accept connection")
return nil, err
}
log.Debugf("TEST: Dial uffd socket done: %s", s.InstanceSockAddr)

defer c.Close()
sendfdConn, ok := conn.(*net.UnixConn)
if !ok {
log.Error("Failed to assert net.Conn to *net.UnixConn")
return nil, fmt.Errorf("failed to assert net.Conn to *net.UnixConn")
}
s.SnapshotStateCfg.UffdConn = sendfdConn
uffdConn = sendfdConn
break
// TODO: maybe need a synchronziation
}

sendfdConn := c.(*net.UnixConn)
return uffdConn, nil
}

fs, err := fd.Get(sendfdConn, 1, []string{"a file"})
if err != nil {
log.Error("Failed to receive the uffd")
return err
}

s.userFaultFD = fs[0]
func (s *SnapshotState) getUFFD(conn *net.UnixConn) error {
// ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
// defer cancel()

return nil
// sendfdConn := s.SnapshotStateCfg.UffdConn
sendfdConn := conn
fs, err := fd.Get(sendfdConn, 1, []string{"a file"})
if err != nil {
log.Error("Failed to receive the uffd")
return err
}

s.userFaultFD = fs[0]
return nil
}

func (s *SnapshotState) processMetrics() {
Expand Down

0 comments on commit 1324e37

Please sign in to comment.