Skip to content
/ asyncp Public

The simple framework to build async task execution programs

License

Notifications You must be signed in to change notification settings

demdxx/asyncp

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

64 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Asyncp processing framework

Build Status Coverage Status Go Report Card GoDoc

License Apache 2.0

The simple framework to build async task execution programs.

Pipelines

Sequentially apply a list of tasks. Pipelines allow to split one big processing part for small simple steps to simplify the complex logic.

The pipeline is atomic, all tasks in the pipeline will successfully execute the whole task or all not.

Example program

import (
  ...
  "github.com/demdxx/asyncp/v2"
  "github.com/demdxx/asyncp/v2/streams"
  ...
)

func main() {
  taskQueueSub := nats.NewSubscriber(...)
  taskQueuePub := nats.NewPublisher(...)

  // Create new async multiplexer
  mx := asyncp.NewTaskMux(
    // Define default strem message queue
    asyncp.WithStreamResponseFactory(taskQueuePub),
    asyncp.WithPanicHandler(...),
    asyncp.WithErrorHandler(...),
    asyncp.WithCluster(...),
  )
  defer func() { _ = mx.Close() }()

  // Create new task handler to download articles by RSS
  mx.Handle("rss", downloadRSSList).
    Then(downloadRSSItem).
    Then(updateRSSArticles)

  // Create new task handler to process video files
  mx.Handle("video", loadVideoForProcessing).
    Then(makeVideoThumbs).
    Then(convertVideoFormat)

  // Send report to user (event contains login and email target)
  mx.Handle("email", pipeline.New(
    `userinfo`, assembleBasicInfo,
    `changes`, assembleAllChangesForUser,
    `template`, assembleEmailHTMLTemplate,
    pipeline.New(
      sendNotification,
      sendSendLogs,
    ),
  )).Then(sendEmailTask)

  // Retranslate all message to the queue if can`t process
  mx.Failver(asyncp.Retranslator(taskQueuePub))

  // Alternative:
  // taskQueueSub.Subscribe(context.Background(), mx)
  // taskQueueSub.Listen(context.Background())
  err = streams.ListenAndServe(context.Background(), mx,
    taskQueueSub, "nats://host:2222/group?topics=topicName")
}

Convert task to async executor.

atask := asyncp.WrapAsyncTask(task,
  WithWorkerCount(10),
  WithWorkerPoolSize(20),
  WithRecoverHandler(func(rec any) {
    // ...
  }))

// Or

asyncp.FuncTask(assembleBasicInfo).Async()

Cluster mode

The framework supports cluster task processing. Some of the servers can execute some specific tasks like video processing or windows specific stuff.

To extend cluster base functionality need to create in other applications (with the same sync options) linked task baseHandlerTaskName>myNewClusterTask.

func main() {
  ...
  // After RSS parsing we need to prcess video files from links if it's present
  mx.Handle("rss>videoExtraction", loadVideoForProcessing).
    Then(makeVideoThumbs).
    Then(convertVideoFormat)
  ...
}

Apmonitor tool

Displays state of the cluster and every task common state.

apmonitor tool

About

The simple framework to build async task execution programs

Topics

Resources

License

Stars

Watchers

Forks

Packages

No packages published