Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

EVENT-49: replace argo-dataflow to numaflow #49

Draft
wants to merge 14 commits into
base: master
Choose a base branch
from
7 changes: 5 additions & 2 deletions bin/install.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ const init = require("../lib/init").init;

const yargs = require("yargs");

const { cyan, dim, bright } = require("ansicolor");
const { cyan, dim, bright, red } = require("ansicolor");
const asTable = require("as-table").configure({
title: (x) => bright(x),
delimiter: dim(cyan(" | ")),
Expand Down Expand Up @@ -52,7 +52,7 @@ yargs
default: Intl.DateTimeFormat().resolvedOptions().timeZone,
}),
handler: (argv) => {
var options = {
const options = {
force: argv["f"],
cronString: argv["cs"],
timeZone: argv["tz"],
Expand Down Expand Up @@ -86,6 +86,9 @@ yargs
})
.then((info) => {
console.log(info);
})
.catch((err) => {
console.log(red(`${err}`));
});
},
})
Expand Down
12 changes: 11 additions & 1 deletion lib/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ const constants = {
ARGO_CRON_WORKFLOW_KIND: "CronWorkflow",
ARGO_DATAFLOW_KIND: "Pipeline",
CONFIGMAP_KIND: "ConfigMap",
SECERT_KIND: "Secret",
SECRET_KIND: "Secret",

// workflow
ARGO_K8S_API_GROUP: "argoproj.io",
Expand All @@ -33,9 +33,19 @@ const constants = {
ARGO_CRON_WORKFLOW_PLURAL: "cronworkflows",

// dataflow
/**
* @deprecated please remove this in next stable release
*/
ARGO_DATAFLOW_K8S_API_GROUP: "dataflow.argoproj.io",
/**
* @deprecated please remove this in next stable release
*/
ARGO_PIPELINES_PLURAL: "pipelines",

// numaflow
NUMAFLOW_K8S_API_GROUP: "numaflow.numaproj.io",
NUMAFLOW_K8S_API_VERSION: "v1alpha1",

// atlan-defaults configmap details
ATLAN_DEFAULTS_CONFIGMAP_NAME: "atlan-defaults",
ATLAN_DEFAULTS_CONFIGMAP_NAMESPACE: "default",
Expand Down
2 changes: 1 addition & 1 deletion lib/init.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ const system = require("system-commands");
const utils = require("./utils");

/**
* Init an Aro package inside the folder
* Init an Argo package inside the folder
* Steps:
* 1. Check if folder is empty
* 2. Package.json should not be present (unless force is set to true)
Expand Down
1 change: 0 additions & 1 deletion lib/install.js
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ const packageNameFromPath = function (path) {
* @param {string} dirPath
*/
const install = function (packageName, registry, namespace, save, cluster, options, dirPath = process.cwd()) {
// dirPath = "/Users/amit/Documents/marketplace-packages/atlan-atlas";
let npmSaveParam = "--no-save";
if (save) {
npmSaveParam = "--save";
Expand Down
6 changes: 3 additions & 3 deletions lib/k8s.js
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ class K8sInstaller {
*/
installSecrets() {
const dirPath = `${this.packagePath}/secrets/`;
return this.installYamlInPath(dirPath, false, constants.SECERT_KIND, "", K8sInstaller.upsertSecret);
return this.installYamlInPath(dirPath, false, constants.SECRET_KIND, "", K8sInstaller.upsertSecret);
}

/**
Expand All @@ -134,7 +134,7 @@ class K8sInstaller {
dirPath,
false,
constants.ARGO_DATAFLOW_KIND,
constants.ARGO_DATAFLOW_K8S_API_GROUP,
constants.NUMAFLOW_K8S_API_GROUP,
K8sInstaller.upsertTemplate
);
}
Expand All @@ -145,7 +145,7 @@ class K8sInstaller {
*/
installTemplates(cluster) {
const dirPath = `${this.packagePath}/templates/`;
var kind = constants.ARGO_WORKFLOW_TEMPLATES_KIND;
let kind = constants.ARGO_WORKFLOW_TEMPLATES_KIND;
if (cluster) {
kind = constants.ARGO_CLUSTER_WORKFLOW_TEMPLATES_KIND;
}
Expand Down
2 changes: 1 addition & 1 deletion lib/models/info.js
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ class PackageInfo {
* @returns {string}
*/
getPackageLabel() {
return `${constants.ARGOPM_LIBRARY_NAME_LABEL}=${encode(this.name)}`;
return `${constants.ARGOPM_LIBRARY_NAME_LABEL}=${specialEncode(this.name)}`;
Copy link
Contributor

@louisnow louisnow Nov 14, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why make this change? Is it safe/backwards compatible, etc?

Copy link
Author

@si3nloong si3nloong Nov 15, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do aware this might be a bug in the current argopm (0.8.5). You can refer to the following:

  1. When I run argopm list -n default

Screenshot 2022-11-11 at 3 08 51 PM

  1. When I run argopm info @atlan/mssql -n default, it throw error instead. As you can see the second screenshot, the URL passed over is /k8s/clusters/c-kzq84/apis/dataflow.argoproj.io/v1alpha1/namespaces/default/pipelines?pretty=&allowWatchBookmarks=&continue=&fieldSelector=&labelSelector=package.argoproj.io/name=-atlan-mssql

Screenshot 2022-11-11 at 3 13 02 PM

Screenshot 2022-11-11 at 3 14 27 PM

Screenshot 2022-11-14 at 1 34 51 PM

}
}

Expand Down
30 changes: 16 additions & 14 deletions lib/models/package.js
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,8 @@ class Package {
})
.then((_) => {
console.log(`Deleting templates for package ${this.metadata.name}`);
var kind = constants.ARGO_WORKFLOW_TEMPLATES_KIND;
var plural = `${kind.toLowerCase()}s`;
let kind = constants.ARGO_WORKFLOW_TEMPLATES_KIND;
let plural = `${kind.toLowerCase()}s`;

if (cluster) {
kind = constants.ARGO_CLUSTER_WORKFLOW_TEMPLATES_KIND;
Expand Down Expand Up @@ -179,8 +179,8 @@ class Package {
* @returns {Promise<[Package]>}
*/
dependencies(cluster) {
var kind = constants.ARGO_WORKFLOW_TEMPLATES_KIND;
var plural = `${kind.toLowerCase()}s`;
let kind = constants.ARGO_WORKFLOW_TEMPLATES_KIND;
let plural = `${kind.toLowerCase()}s`;

if (cluster) {
kind = constants.ARGO_CLUSTER_WORKFLOW_TEMPLATES_KIND;
Expand Down Expand Up @@ -290,8 +290,8 @@ class Package {
let plural = `${constants.ARGO_DATAFLOW_KIND.toLowerCase()}s`;
return customK8sApi
.listNamespacedCustomObject(
constants.ARGO_DATAFLOW_K8S_API_GROUP,
constants.ARGO_K8S_API_VERSION,
constants.NUMAFLOW_K8S_API_GROUP,
constants.NUMAFLOW_K8S_API_VERSION,
namespace,
plural,
null,
Expand All @@ -315,8 +315,8 @@ class Package {
return Promise.each(pipelines, function (pipeline) {
const metadata = pipeline.metadata;
return customK8sApi.deleteNamespacedCustomObject(
constants.ARGO_DATAFLOW_K8S_API_GROUP,
constants.ARGO_K8S_API_VERSION,
constants.NUMAFLOW_K8S_API_GROUP,
constants.NUMAFLOW_K8S_API_VERSION,
metadata.namespace,
plural,
metadata.name
Expand Down Expand Up @@ -456,14 +456,15 @@ Package.getInstallerLabel = function () {
};

/**
* Get install package
* @param {String} namespace
* @param {String} packageName
* @param {Boolean} cluster
* Get install package info
*
* @param {string} namespace
* @param {string} packageName
* @param {boolean} cluster
* @returns {Promise<Package>}
*/
Package.info = function (namespace, packageName, cluster) {
var plural = `${constants.ARGO_WORKFLOW_TEMPLATES_KIND.toLowerCase()}s`;
let plural = `${constants.ARGO_WORKFLOW_TEMPLATES_KIND.toLowerCase()}s`;
if (cluster) {
plural = `${constants.ARGO_CLUSTER_WORKFLOW_TEMPLATES_KIND.toLowerCase()}s`;
return customK8sApi
Expand Down Expand Up @@ -508,12 +509,13 @@ Package.info = function (namespace, packageName, cluster) {

/**
* Get all installed packages in the namespace
*
* @param {String} namespace
* @param {Boolean} cluster
* @returns {Promise<[Package]>}
*/
Package.list = function (namespace, cluster) {
var plural = `${constants.ARGO_WORKFLOW_TEMPLATES_KIND.toLowerCase()}s`;
let plural = `${constants.ARGO_WORKFLOW_TEMPLATES_KIND.toLowerCase()}s`;

if (cluster) {
plural = `${constants.ARGO_CLUSTER_WORKFLOW_TEMPLATES_KIND.toLowerCase()}s`;
Expand Down
2 changes: 1 addition & 1 deletion lib/static/package/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ This package was bootstrapped using argopm
- `argopm run`
- `argopm info`

For more details on these commands run `argom --help`
For more details on these commands run `argopm --help`

## Pre-requisites

Expand Down
62 changes: 59 additions & 3 deletions lib/static/package/pipelines/README.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
## Pipelines

The yaml files in this directory are installed as dataflow pipelines on the Argo instance in the cluster.
> The yaml files in this directory are installed as pipelines on the [numaflow](https://numaflow.numaproj.io/) instance in the cluster.

### Sample Pipeline
### Sample `argo-dataflow` Pipeline

```yaml
apiVersion: dataflow.argoproj.io/v1alpha1
Expand Down Expand Up @@ -30,4 +30,60 @@ spec:

### Useful Links

- https://github.com/argoproj-labs/argo-dataflow
- https://github.com/argoproj-labs/argo-dataflow

### Sample `numaflow` Pipeline

```yaml
apiVersion: numaflow.numaproj.io/v1alpha1
kind: Pipeline
metadata:
annotations:
# Modify your pipeline name here
numaflow.numaproj.io/pipeline-name: 101-hello
# Modify your vertex name here
numaflow.numaproj.io/vertex-name: in
# Modify your pipeline description here
numaflow.numaproj.io/description: |-
This is the hello world of pipelines.

It uses a cron schedule as a source and then just cat the message to a log
numaflow.numaproj.io/owner: altanhq
numaflow.numaproj.io/test: 'true'
# Modify your pipeline name here
name: 101-hello
namespace: numaflow-system
spec:
# Data processing tasks
vertices:
# Sources / Inputs
- name: in
source:
generator:
rpu: 5
duration: 1s
# User-defined functions
- name: cat
udf:
builtin:
name: cat
containerTemplate:
env:
# This flag will enable debug for `numaflow`, please remove this if it's production
- name: NUMAFLOW_DEBUG
value: "true"
# Sinks
- name: out
sink:
log: {}
# The relationship between the vertices
edges:
- from: in
to: cat
- from: cat
to: out
```

### Useful Links

- https://numaflow.numaproj.io/pipeline
2 changes: 1 addition & 1 deletion lib/utils.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
const Promise = require("bluebird");
const fs = require("fs").promises;
const path = require("path");
var rimraf = Promise.promisify(require("rimraf"));
let rimraf = Promise.promisify(require("rimraf"));

/**
* Recursively walk through the folder and return all file paths
Expand Down
4 changes: 4 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@
"eslint": "eslint --fix .",
"lint-staged": "lint-staged"
},
"engines" : {
"npm" : ">=8.0.0",
"node" : ">=16.0.0"
},
"author": "Atlan",
"license": "MIT",
"dependencies": {
Expand Down