Skip to content

Commit

Permalink
Merge pull request #67 from named-data/varun/face
Browse files Browse the repository at this point in the history
Rework face API and fix transports
  • Loading branch information
pulsejet authored Dec 16, 2024
2 parents edafd12 + 7a26e1c commit cfcc88b
Show file tree
Hide file tree
Showing 22 changed files with 700 additions and 802 deletions.
75 changes: 75 additions & 0 deletions fw/executor/profiler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package executor

import (
"os"
"runtime"
"runtime/pprof"

"github.com/named-data/YaNFD/core"
)

type Profiler struct {
config *YaNFDConfig
cpuFile *os.File
memFile *os.File
block *pprof.Profile
}

func NewProfiler(config *YaNFDConfig) *Profiler {
return &Profiler{config: config}
}

func (p *Profiler) Start() (err error) {
if p.config.CpuProfile != "" {
p.cpuFile, err = os.Create(p.config.CpuProfile)
if err != nil {
core.LogFatal("Main", "Unable to open output file for CPU profile: ", err)
}

core.LogInfo("Main", "Profiling CPU - outputting to ", p.config.CpuProfile)
pprof.StartCPUProfile(p.cpuFile)
}

if p.config.MemProfile != "" {
memProfileFile, err := os.Create(p.config.MemProfile)
if err != nil {
core.LogFatal("Main", "Unable to open output file for memory profile: ", err)
}

core.LogInfo("Main", "Profiling memory - outputting to ", p.config.MemProfile)
runtime.GC()
if err := pprof.WriteHeapProfile(memProfileFile); err != nil {
core.LogFatal("Main", "Unable to write memory profile: ", err)
}
}

if p.config.BlockProfile != "" {
core.LogInfo("Main", "Profiling blocking operations - outputting to ", p.config.BlockProfile)
runtime.SetBlockProfileRate(1)
p.block = pprof.Lookup("block")
}

return
}

func (p *Profiler) Stop() {
if p.block != nil {
blockProfileFile, err := os.Create(p.config.BlockProfile)
if err != nil {
core.LogFatal("Main", "Unable to open output file for block profile: ", err)
}
if err := p.block.WriteTo(blockProfileFile, 0); err != nil {
core.LogFatal("Main", "Unable to write block profile: ", err)
}
blockProfileFile.Close()
}

if p.memFile != nil {
p.memFile.Close()
}

if p.cpuFile != nil {
pprof.StopCPUProfile()
p.cpuFile.Close()
}
}
105 changes: 25 additions & 80 deletions fw/executor/yanfd.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ package executor
import (
"net"
"os"
"runtime"
"runtime/pprof"
"time"

"github.com/named-data/YaNFD/core"
Expand All @@ -38,15 +36,13 @@ type YaNFDConfig struct {
// YaNFD is the wrapper class for the NDN Forwarding Daemon.
// Note: only one instance of this class should be created.
type YaNFD struct {
config *YaNFDConfig

cpuProfileFile *os.File
memProfileFile *os.File
blockProfiler *pprof.Profile
config *YaNFDConfig
profiler *Profiler

unixListener *face.UnixStreamListener
wsListener *face.WebSocketListener
tcpListeners []*face.TCPListener
udpListener *face.UDPListener
}

// NewYaNFD creates a YaNFD. Don't call this function twice.
Expand All @@ -68,33 +64,9 @@ func NewYaNFD(config *YaNFDConfig) *YaNFD {
table.Configure()
mgmt.Configure()

// Initialize profiling
var cpuProfileFile *os.File
var memProfileFile *os.File
var blockProfiler *pprof.Profile
var err error
if config.CpuProfile != "" {
cpuProfileFile, err = os.Create(config.CpuProfile)
if err != nil {
core.LogFatal("Main", "Unable to open output file for CPU profile: ", err)
}

core.LogInfo("Main", "Profiling CPU - outputting to ", config.CpuProfile)
pprof.StartCPUProfile(cpuProfileFile)
}

if config.BlockProfile != "" {
core.LogInfo("Main", "Profiling blocking operations - outputting to ", config.BlockProfile)
runtime.SetBlockProfileRate(1)
blockProfiler = pprof.Lookup("block")
// Output at end of runtime
}

return &YaNFD{
config: config,
cpuProfileFile: cpuProfileFile,
memProfileFile: memProfileFile,
blockProfiler: blockProfiler,
config: config,
profiler: NewProfiler(config),
}
}

Expand All @@ -103,18 +75,18 @@ func NewYaNFD(config *YaNFDConfig) *YaNFD {
func (y *YaNFD) Start() {
core.LogInfo("Main", "Starting YaNFD")

// Start profiler
y.profiler.Start()

// Initialize FIB table
fibTableAlgorithm := core.GetConfigStringDefault("tables.fib.algorithm", "nametree")
table.CreateFIBTable(fibTableAlgorithm)

// Create null face
nullFace := face.MakeNullLinkService(face.MakeNullTransport())
face.FaceTable.Add(nullFace)
go nullFace.Run(nil)
face.MakeNullLinkService(face.MakeNullTransport()).Run(nil)

// Start management thread
management := mgmt.MakeMgmtThread()
go management.Run()
go mgmt.MakeMgmtThread().Run()

// Create forwarding threads
if fw.NumFwThreads < 1 || fw.NumFwThreads > fw.MaxFwThreads {
Expand Down Expand Up @@ -170,10 +142,13 @@ func (y *YaNFD) Start() {
core.LogError("Main", "Unable to create MulticastUDPTransport for ", path, " on ", iface.Name, ": ", err)
continue
}
multicastUDPFace := face.MakeNDNLPLinkService(multicastUDPTransport, face.MakeNDNLPLinkServiceOptions())
face.FaceTable.Add(multicastUDPFace)

face.MakeNDNLPLinkService(
multicastUDPTransport,
face.MakeNDNLPLinkServiceOptions(),
).Run(nil)

faceCnt += 1
go multicastUDPFace.Run(nil)
core.LogInfo("Main", "Created multicast UDP face for ", path, " on ", iface.Name)
}

Expand All @@ -184,6 +159,7 @@ func (y *YaNFD) Start() {
}
faceCnt += 1
go udpListener.Run()
y.udpListener = udpListener
core.LogInfo("Main", "Created UDP listener for ", path, " on ", iface.Name)

if tcpEnabled {
Expand All @@ -194,8 +170,8 @@ func (y *YaNFD) Start() {
}
faceCnt += 1
go tcpListener.Run()
core.LogInfo("Main", "Created TCP listener for ", path, " on ", iface.Name)
y.tcpListeners = append(y.tcpListeners, tcpListener)
core.LogInfo("Main", "Created TCP listener for ", path, " on ", iface.Name)
}
}
}
Expand Down Expand Up @@ -240,28 +216,22 @@ func (y *YaNFD) Stop() {
core.LogInfo("Main", "Forwarder shutting down ...")
core.ShouldQuit = true

if y.config.MemProfile != "" {
memProfileFile, err := os.Create(y.config.MemProfile)
if err != nil {
core.LogFatal("Main", "Unable to open output file for memory profile: ", err)
}

core.LogInfo("Main", "Profiling memory - outputting to ", y.config.MemProfile)
runtime.GC()
if err := pprof.WriteHeapProfile(memProfileFile); err != nil {
core.LogFatal("Main", "Unable to write memory profile: ", err)
}
}
// Stop profiler
y.profiler.Stop()

// Wait for unix socket listener to quit
if y.unixListener != nil {
y.unixListener.Close()
<-y.unixListener.HasQuit
}
if y.wsListener != nil {
y.wsListener.Close()
}

// Wait for UDP listener to quit
if y.udpListener != nil {
y.udpListener.Close()
}

// Wait for TCP listeners to quit
for _, tcpListener := range y.tcpListeners {
tcpListener.Close()
Expand All @@ -272,12 +242,6 @@ func (y *YaNFD) Stop() {
face.Close()
}

// Wait for all faces to quit
for _, face := range face.FaceTable.GetAll() {
core.LogTrace("Main", "Waiting for face ", face, " to quit")
<-face.GetHasQuit()
}

// Tell all forwarding threads to quit
for _, fw := range fw.Threads {
fw.TellToQuit()
Expand All @@ -287,23 +251,4 @@ func (y *YaNFD) Stop() {
for _, fw := range fw.Threads {
<-fw.HasQuit
}

// Shutdown Profilers
if y.config.BlockProfile != "" {
blockProfileFile, err := os.Create(y.config.BlockProfile)
if err != nil {
core.LogFatal("Main", "Unable to open output file for block profile: ", err)
}
if err := y.blockProfiler.WriteTo(blockProfileFile, 0); err != nil {
core.LogFatal("Main", "Unable to write block profile: ", err)
}
blockProfileFile.Close()
}
if y.config.MemProfile != "" {
y.memProfileFile.Close()
}
if y.config.CpuProfile != "" {
pprof.StopCPUProfile()
y.cpuProfileFile.Close()
}
}
Loading

0 comments on commit cfcc88b

Please sign in to comment.