Skip to content

Commit

Permalink
Merge pull request #231 from skadefro/master
Browse files Browse the repository at this point in the history
close 1.4.13
  • Loading branch information
skadefro authored Jun 5, 2022
2 parents 1294346 + 705983b commit a90f439
Show file tree
Hide file tree
Showing 18 changed files with 604 additions and 256 deletions.
3 changes: 3 additions & 0 deletions OpenFlow/src/Config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ export class Config {
Config.smtp_pass = Config.getEnv("smtp_service", "");
Config.smtp_url = Config.getEnv("smtp_url", "");
Config.debounce_lookup = Config.parseBoolean(Config.getEnv("debounce_lookup", "false"));
Config.validate_emails_disposable = Config.parseBoolean(Config.getEnv("validate_emails_disposable", "false"));



Config.tls_crt = Config.getEnv("tls_crt", "");
Expand Down Expand Up @@ -279,6 +281,7 @@ export class Config {
public static smtp_pass: string = Config.getEnv("smtp_pass", "");
public static smtp_url: string = Config.getEnv("smtp_url", "");
public static debounce_lookup: boolean = Config.parseBoolean(Config.getEnv("debounce_lookup", "false"));
public static validate_emails_disposable: boolean = Config.parseBoolean(Config.getEnv("validate_emails_disposable", "false"));

public static tls_crt: string = Config.getEnv("tls_crt", "");
public static tls_key: string = Config.getEnv("tls_key", "");
Expand Down
27 changes: 25 additions & 2 deletions OpenFlow/src/DBHelper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ export class DBHelper {
let item = await this.memoryCache.wrap("queuename_" + name, () => {
if (jwt === null || jwt == undefined || jwt == "") { jwt = Crypt.rootToken(); }
Logger.instanse.debug("DBHelper", "FindQueueByName", "Add queue to cache : " + name);
return Config.db.getbyname<User>(name, "mq", jwt, true, span);
return Config.db.GetOne<User>({ query: { name }, collectionname: "mq", jwt }, span);
});
if (NoderedUtil.IsNullUndefinded(item)) return null;
return this.DecorateWithRoles(User.assign(item), span);
Expand Down Expand Up @@ -295,7 +295,7 @@ export class DBHelper {
let item = await this.memoryCache.wrap("exchangename_" + name, () => {
if (jwt === null || jwt == undefined || jwt == "") { jwt = Crypt.rootToken(); }
Logger.instanse.debug("DBHelper", "FindExchangeByName", "Add exchange to cache : " + name);
return Config.db.getbyname<User>(name, "mq", jwt, true, span);
return Config.db.GetOne<User>({ query: { name }, collectionname: "mq", jwt }, span);
});
if (NoderedUtil.IsNullUndefinded(item)) return null;
return this.DecorateWithRoles(User.assign(item), span);
Expand Down Expand Up @@ -344,6 +344,29 @@ export class DBHelper {
Logger.otel.endSpan(span);
}
}
public async GetDisposableDomain(domain: string, parent: Span): Promise<Base> {
await this.init();
if (domain.indexOf("@") > -1) {
domain = domain.substr(domain.indexOf("@") + 1);
}
const span: Span = Logger.otel.startSubSpan("dbhelper.FindByUsername", parent);
try {
if (NoderedUtil.IsNullEmpty(domain)) return null;
let item = await this.memoryCache.wrap("disposable_" + domain, () => {
const jwt = Crypt.rootToken();
Logger.instanse.debug("DBHelper", "IsDisposableDomain", "Add to cache : " + domain);
const query = { name: domain, "_type": "disposable" };
return Config.db.GetOne<Base>({ query, collectionname: "domains", jwt }, span);
});
if (NoderedUtil.IsNullUndefinded(item)) return null;
return item;
} catch (error) {
span?.recordException(error);
throw error;
} finally {
Logger.otel.endSpan(span);
}
}
public async FindByUsernameOrFederationid(username: string, issuer: string, parent: Span): Promise<User> {
await this.init();
const span: Span = Logger.otel.startSubSpan("dbhelper.FindByUsername", parent);
Expand Down
211 changes: 191 additions & 20 deletions OpenFlow/src/DatabaseConnection.ts

Large diffs are not rendered by default.

78 changes: 46 additions & 32 deletions OpenFlow/src/LoginProvider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -620,7 +620,7 @@ export class LoginProvider {
} catch (error) {
}
const agent = req.headers['user-agent'];
const UpdateDoc: any = { "$set": { "_modified": dt }, "$push": { "opened": { dt, ip, domain, agent } } };
const UpdateDoc: any = { "$set": { "_modified": dt, "read": true }, "$push": { "opened": { dt, ip, domain, agent } }, "$inc": { "readcount": 1 } };
var res2 = await Config.db._UpdateOne({ id }, UpdateDoc, "mailhist", 1, true, Crypt.rootToken(), null);
} catch (error) {
Logger.instanse.error("LoginProvider", "/read", error);
Expand Down Expand Up @@ -690,6 +690,12 @@ export class LoginProvider {
throw new Error("Please use a valid and non temporary email address");
}
}
if (Config.validate_emails_disposable) {
var domain = await Logger.DBHelper.GetDisposableDomain(email, span);
if (domain != null) {
throw new Error("Please use a valid and non temporary email address");
}
}
}

if (email.indexOf("@") > -1) {
Expand All @@ -712,7 +718,7 @@ export class LoginProvider {
} else {
const code = NoderedUtil.GetUniqueIdentifier();
UpdateDoc.$set["_mailcode"] = code;
this.sendEmail("validate", email, 'Validate email in OpenIAP flow',
this.sendEmail("validate", tuser._id, email, 'Validate email in OpenIAP flow',
`Hi ${tuser.name}\nPlease use the below code to validate your email\n${code}`);
}
} else {
Expand Down Expand Up @@ -743,7 +749,7 @@ export class LoginProvider {
let email: string = u.username;
if (u.email.indexOf("@") > -1) email = u.email;
(u as any)._mailcode = NoderedUtil.GetUniqueIdentifier();
this.sendEmail("validate", email, 'Validate email in OpenIAP flow',
this.sendEmail("validate", u._id, email, 'Validate email in OpenIAP flow',
`Hi ${u.name}\nPlease use the below code to validate your email\n${(u as any)._mailcode}`);


Expand Down Expand Up @@ -802,7 +808,7 @@ export class LoginProvider {
const email: string = req.body.email;
let user = await Config.db.getbyusername(req.body.email, null, Crypt.rootToken(), true, span);
if (user == null) {
Logger.instanse.error("LoginProvider", "/forgotpassword", "Recevied unknown email " + email);
Logger.instanse.error("LoginProvider", "/forgotpassword", "Received unknown email " + email);
return res.end(JSON.stringify({ id }));
}
const code = NoderedUtil.GetUniqueIdentifier();
Expand All @@ -815,7 +821,8 @@ export class LoginProvider {
Logger.instanse.error("LoginProvider", "/forgotpassword", "Recevied wrong mail for id " + id);
return res.end(JSON.stringify({ id }));
}
// this.sendEmail("validate", email, 'Validate email in OpenIAP flow', 'Please use the below code to reset your password\n' + code);
this.sendEmail("pwreset", user._id, email, 'Reset password request',
`Hi ${user.name}\nYour password for ${Config.domain} can be reset by using the below validation code\n\n${code}\n\nIf you did not request a new password, please ignore this email.`);

return res.end(JSON.stringify({ id }));
}
Expand Down Expand Up @@ -860,6 +867,7 @@ export class LoginProvider {
if (!user.emailvalidated) user.validated = false;
}
await Logger.DBHelper.Save(user, Crypt.rootToken(), span);
await Logger.DBHelper.DeleteKey("user" + user._id);
await Logger.DBHelper.DeleteKey("forgotpass" + id);
return res.end(JSON.stringify({ id }));
}
Expand Down Expand Up @@ -1566,7 +1574,7 @@ export class LoginProvider {
}
} else {
var exists = _user.federationids.filter(x => x.id == username && x.issuer == issuer);
if (exists.length == 0) {
if (exists.length == 0 || _user.emailvalidated == false) {
_user.federationids = _user.federationids.filter(x => x.issuer != issuer);
_user.federationids.push(new FederationId(username, issuer));
_user.emailvalidated = true;
Expand Down Expand Up @@ -1608,7 +1616,7 @@ export class LoginProvider {
});
}

static sendEmail(type: string, to: string, subject: string, text: string): Promise<string> {
static sendEmail(type: string, userid: string, to: string, subject: string, text: string): Promise<string> {
return new Promise<string>((resolve, reject) => {
var transporter = null;
if (!NoderedUtil.IsNullEmpty(Config.smtp_url)) {
Expand All @@ -1628,31 +1636,37 @@ export class LoginProvider {
let html = text + `<img src="${imgurl}" border="0" width="1" height="1">`
let from = Config.smtp_from;

transporter.sendMail({
from,
to,
subject,
html
}, function (error, info) {
if (error) {
Logger.instanse.info("LoginProvider", "sendEmail", error);
reject(error);
} else {
Logger.instanse.info("LoginProvider", "sendEmail", "Email sent to " + to + " " + info.response);
var item: any = new Base();
item._type = type;
item.id = id;
item.from = from;
item.to = to;
item.text = text;
item.name = to + " " + subject;
item.opened = [];
item.response = info.response;
Config.db.InsertOne(item, "mailhist", 1, true, Crypt.rootToken(), null);
resolve(info.response);
}
});

if (Config.NODE_ENV != "production") {
resolve("email not sent");
} else {
transporter.sendMail({
from,
to,
subject,
html
}, function (error, info) {
if (error) {
Logger.instanse.info("LoginProvider", "sendEmail", error);
reject(error);
} else {
Logger.instanse.info("LoginProvider", "sendEmail", "Email sent to " + to + " " + info.response);
var item: any = new Base();
item.readcount = 0;
item._type = type;
item.id = id;
item.from = from;
item.to = to;
item.text = text;
item.name = to + " " + subject;
item.userid = userid;
item.opened = [];
item.response = info.response;
item.read = false;
Config.db.InsertOne(item, "mailhist", 1, true, Crypt.rootToken(), null);
resolve(info.response);
}
});
}
});
}
}
51 changes: 48 additions & 3 deletions OpenFlow/src/Messages/Message.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import { Readable, Stream } from "stream";
import { GridFSBucket, ObjectID, Cursor } from "mongodb";
import * as path from "path";
import { DatabaseConnection } from "../DatabaseConnection";
import { StripeMessage, NoderedUtil, QueuedMessage, RegisterQueueMessage, QueueMessage, CloseQueueMessage, ListCollectionsMessage, DropCollectionMessage, QueryMessage, AggregateMessage, InsertOneMessage, UpdateOneMessage, Base, UpdateManyMessage, InsertOrUpdateOneMessage, DeleteOneMessage, MapReduceMessage, SigninMessage, TokenUser, User, Rights, EnsureNoderedInstanceMessage, DeleteNoderedInstanceMessage, DeleteNoderedPodMessage, RestartNoderedInstanceMessage, GetNoderedInstanceMessage, GetNoderedInstanceLogMessage, SaveFileMessage, WellknownIds, GetFileMessage, UpdateFileMessage, NoderedUser, WatchMessage, GetDocumentVersionMessage, DeleteManyMessage, InsertManyMessage, RegisterExchangeMessage, EnsureCustomerMessage, Customer, stripe_tax_id, Role, SelectCustomerMessage, Rolemember, ResourceUsage, Resource, ResourceVariant, stripe_subscription, GetNextInvoiceMessage, stripe_invoice, stripe_price, stripe_plan, stripe_invoice_line, GetKubeNodeLabelsMessage, CreateWorkflowInstanceMessage, WorkitemFile } from "@openiap/openflow-api";
import { StripeMessage, NoderedUtil, QueuedMessage, RegisterQueueMessage, QueueMessage, CloseQueueMessage, ListCollectionsMessage, DropCollectionMessage, QueryMessage, AggregateMessage, InsertOneMessage, UpdateOneMessage, Base, UpdateManyMessage, InsertOrUpdateOneMessage, DeleteOneMessage, MapReduceMessage, SigninMessage, TokenUser, User, Rights, EnsureNoderedInstanceMessage, DeleteNoderedInstanceMessage, DeleteNoderedPodMessage, RestartNoderedInstanceMessage, GetNoderedInstanceMessage, GetNoderedInstanceLogMessage, SaveFileMessage, WellknownIds, GetFileMessage, UpdateFileMessage, NoderedUser, WatchMessage, GetDocumentVersionMessage, DeleteManyMessage, InsertManyMessage, RegisterExchangeMessage, EnsureCustomerMessage, Customer, stripe_tax_id, Role, SelectCustomerMessage, Rolemember, ResourceUsage, Resource, ResourceVariant, stripe_subscription, GetNextInvoiceMessage, stripe_invoice, stripe_price, stripe_plan, stripe_invoice_line, GetKubeNodeLabelsMessage, CreateWorkflowInstanceMessage, WorkitemFile, InsertOrUpdateManyMessage } from "@openiap/openflow-api";
import { stripe_customer, stripe_list, StripeAddPlanMessage, StripeCancelPlanMessage, stripe_subscription_item, stripe_coupon } from "@openiap/openflow-api";
import { amqpwrapper, QueueMessageOptions } from "../amqpwrapper";
import { WebSocketServerClient } from "../WebSocketServerClient";
Expand Down Expand Up @@ -98,6 +98,9 @@ export class Message {
case "insertorupdateone":
await this.InsertOrUpdateOne(span);
break;
case "insertorupdatemany":
await this.InsertOrUpdateMany(span);
break;
case "deleteone":
await this.DeleteOne(span);
break;
Expand Down Expand Up @@ -393,6 +396,18 @@ export class Message {
cli.Send(this);
}
break;
case "insertorupdatemany":
if (!this.EnsureJWT(cli)) {
Logger.instanse.debug("Message", "Process", "Discard " + command + " due to missing jwt, and respond with error, for client at " + cli.remoteip + " " + cli.clientagent + " " + cli.clientversion);
break;
}
if (Config.enable_openflow_amqp) {
cli.Send(await QueueClient.SendForProcessing(this, this.priority));
} else {
await this.InsertOrUpdateMany(span);
cli.Send(this);
}
break;
case "deleteone":
if (!this.EnsureJWT(cli)) {
Logger.instanse.debug("Message", "Process", "Discard " + command + " due to missing jwt, and respond with error, for client at " + cli.remoteip + " " + cli.clientagent + " " + cli.clientversion);
Expand Down Expand Up @@ -1458,7 +1473,7 @@ export class Message {
if (NoderedUtil.IsNullEmpty(msg.jwt)) { msg.jwt = this.jwt; }
if (NoderedUtil.IsNullEmpty(msg.w as any)) { msg.w = 0; }
if (NoderedUtil.IsNullEmpty(msg.j as any)) { msg.j = false; }
msg = await Config.db.UpdateMany(msg, span);
msg = await Config.db.UpdateDocument(msg, span);
delete msg.item;
} catch (error) {
if (NoderedUtil.IsNullUndefinded(msg)) { (msg as any) = {}; }
Expand Down Expand Up @@ -1504,6 +1519,36 @@ export class Message {
await handleError(null, error);
}
}
private async InsertOrUpdateMany(parent: Span): Promise<void> {
this.Reply();
const span: Span = Logger.otel.startSubSpan("message.InsertOrUpdateMany", parent);
let msg: InsertOrUpdateManyMessage
try {
msg = InsertOrUpdateManyMessage.assign(this.data);
if (NoderedUtil.IsNullEmpty(msg.jwt)) { msg.jwt = this.jwt; }
if (NoderedUtil.IsNullEmpty(msg.w as any)) { msg.w = 0; }
if (NoderedUtil.IsNullEmpty(msg.j as any)) { msg.j = false; }
if (NoderedUtil.IsNullEmpty(msg.jwt)) {
throw new Error("jwt is null and client is not authenticated");
}
msg.results = await Config.db.InsertOrUpdateMany(msg.items, msg.collectionname, msg.uniqeness, msg.skipresults, msg.w, msg.j, msg.jwt, span);
if (msg.skipresults) msg.results = [];
delete msg.items;
} catch (error) {
span?.recordException(error);
if (NoderedUtil.IsNullUndefinded(msg)) { (msg as any) = {}; }
if (msg !== null && msg !== undefined) msg.error = error.message ? error.message : error;
await handleError(null, error);
}
try {
this.data = JSON.stringify(msg);
} catch (error) {
span?.recordException(error);
this.data = "";
await handleError(null, error);
}
Logger.otel.endSpan(span);
}
private async DeleteOne(parent: Span): Promise<void> {
this.Reply();
let msg: DeleteOneMessage
Expand Down Expand Up @@ -4581,7 +4626,7 @@ export class Message {
} else if (["failed", "successful", "retry", "processing"].indexOf(msg.state) == -1) {
throw new Error("Illegal state " + msg.state + " on Workitem, must be failed, successful, processing or retry");
}
if (msg.errortype == "business") msg.state == "failed";
if (msg.errortype == "business") msg.state = "failed";
if (msg.state == "retry") {
if (NoderedUtil.IsNullEmpty(wi.retries)) wi.retries = 0;
if (wi.retries < wiq.maxretries || msg.ignoremaxretries) {
Expand Down
Loading

0 comments on commit a90f439

Please sign in to comment.