diff --git a/examples/typescript/pipe-sync-meetings-to-notion/README.md b/examples/typescript/pipe-sync-meetings-to-notion/README.md
new file mode 100644
index 000000000..cdf42555d
--- /dev/null
+++ b/examples/typescript/pipe-sync-meetings-to-notion/README.md
@@ -0,0 +1,87 @@
+
+
+in screenpipe we have a plugin system called "pipe store" or "pipes"
+
+think of it like this:
+
+screenpipe data -> your pipe like "AI annotate" or "send to salesforce"
+
+a more dev-friendly explanation:
+
+screenpipe | AI tag | notion update
+
+or
+
+screenpipe | AI tag | slack send report
+
+or
+
+screenpipe | fill salesforce
+
+or
+
+screenpipe | logs daily
+
+basically it would read, process, annotate, analyse, summarize, send, your data customisable to your desire, effortlessly
+
+### pipe-sync-meetings-to-notion
+
+this is an experimental, but official pipe, that will sync your meetings to notion
+
+this is how you run it through the app:
+
+
+## Setup
+
+1. create a notion integration:
+ - go to https://www.notion.so/my-integrations
+ - click "new integration"
+ - give it a name (e.g., "screenpipe meeting sync")
+ - select the workspace where you want to sync your meetings
+ - click "submit" to create the integration
+
+2. get your notion api key:
+ - in the integration page, find the "internal integration token"
+ - copy this token, you'll need it later
+
+3. create a database in notion:
+ - create a new page in notion
+ - add a database to this page
+ - add columns: title, date, transcription + optionally notion ai columns if you want
+ - share this page with your integration (click three dots, connections, your integration)
+
+4. get your notion database id:
+ - open your database in notion
+ - look at the url, it should look like: https://www.notion.so/yourworkspace/83c75a51b3bd4a)
+ - the part after the last slash and before the ? is your database id
+
+now, your meeting transcriptions will automatically sync to your notion database!
+
+
+if you're in dev mode you can run the cli like this:
+
+```bash
+export SCREENPIPE_NOTION_API_KEY=secret_abcd
+export SCREENPIPE_NOTION_DATABASE_ID=1234567890
+screenpipe --pipe ./examples/typescript/pipe-sync-meetings-to-notion/main.js
+```
+
+please look at the code, it's 99% normal JS but there are limitations currently:
+- you cannot use dependencies (yet)
+- untested with typescript (but will make pipes TS first soon)
+
+i recommend you copy paste the current main.js file into AI and ask some changes for whatever you want to do, make sure to run an infinite loop also
+
+get featured in the pipe store:
+
+
+
+just ask @louis030195
+
+### what's next for pipes
+
+- use dependencies (like vercel/ai so cool)
+- TS
+- access to screenpipe desktop api (e.g. trigger notifications, customise what cursor-like @ are in the chat, etc.)
+- easier to publish your pipes (like obsidian store)
+- everything frictionless, effortless, and maximize the value you get out of screenpipe
diff --git a/examples/typescript/pipe-sync-meetings-to-notion/main.js b/examples/typescript/pipe-sync-meetings-to-notion/main.js
new file mode 100644
index 000000000..d7e126899
--- /dev/null
+++ b/examples/typescript/pipe-sync-meetings-to-notion/main.js
@@ -0,0 +1,94 @@
+const INTERVAL = 30 * 1000; // 30 seconds in milliseconds
+const NOTION_API_URL = 'https://api.notion.com/v1/pages';
+const NOTION_DATABASE_ID = process.env.SCREENPIPE_NOTION_DATABASE_ID;
+const NOTION_API_KEY = process.env.SCREENPIPE_NOTION_API_KEY;
+
+
+
+async function queryScreenpipe(startTime, endTime) {
+ try {
+ const queryParams = `start_time=${startTime}&end_time=${endTime}&limit=50&content_type=audio`;
+ const response = await fetch(`http://localhost:3030/search?${queryParams}`);
+ if (!response.ok) {
+ throw new Error(`HTTP error! status: ${response.status}`);
+ }
+ return await response.json();
+ } catch (error) {
+ console.error("Error querying screenpipe:", error.toString());
+ return [];
+ }
+}
+
+async function syncAudioToNotion(audioData) {
+ try {
+ const title = `Audio - ${audioData.content.timestamp}`;
+ const date = audioData.content.timestamp;
+ const transcription = audioData.content.transcription;
+
+ // Split transcription into chunks of 2000 characters
+ const chunks = splitTranscription(transcription);
+
+ for (let i = 0; i < chunks.length; i++) {
+ const response = await fetch(NOTION_API_URL, {
+ method: 'POST',
+ headers: {
+ 'Authorization': `Bearer ${NOTION_API_KEY}`,
+ 'Notion-Version': '2022-06-28',
+ 'Content-Type': 'application/json',
+ },
+ body: JSON.stringify({
+ parent: { database_id: NOTION_DATABASE_ID },
+ properties: {
+ Title: { title: [{ text: { content: `${title} (Part ${i + 1}/${chunks.length})` } }] },
+ Date: { date: { start: date } },
+ Transcription: { rich_text: [{ text: { content: chunks[i] } }] },
+ },
+ }),
+ });
+
+ if (!response.ok) {
+ const errorBody = await response.text();
+ throw new Error(`HTTP error! status: ${response.status}, body: ${errorBody}`);
+ }
+ }
+
+ console.log("Audio synced to Notion successfully");
+ } catch (error) {
+ console.error("Error syncing audio to Notion:", error);
+ }
+}
+
+function splitTranscription(transcription, chunkSize = 2000) {
+ const chunks = [];
+ for (let i = 0; i < transcription.length; i += chunkSize) {
+ chunks.push(transcription.slice(i, i + chunkSize));
+ }
+ return chunks;
+}
+
+async function streamAudioToNotion() {
+ console.log("Starting Audio Stream to Notion");
+
+ while (true) {
+ try {
+ const now = new Date();
+ const thirtySecondsAgo = new Date(now.getTime() - INTERVAL);
+
+ const audioData = await queryScreenpipe(thirtySecondsAgo.toISOString(), now.toISOString());
+
+ console.log("Audio data:", audioData);
+ for (const audio of audioData.data) {
+ await syncAudioToNotion(audio);
+ }
+ } catch (error) {
+ console.error("Error syncing audio to Notion:", {
+ message: error.message,
+ stack: error.stack,
+ audioData: JSON.stringify(audioData)
+ });
+ }
+ await new Promise((resolve) => setTimeout(resolve, INTERVAL));
+ }
+}
+
+streamAudioToNotion();
\ No newline at end of file
diff --git a/screenpipe-core/src/deno/runtime.js b/screenpipe-core/src/deno/runtime.js
index 7c47a00f7..518cdea2c 100644
--- a/screenpipe-core/src/deno/runtime.js
+++ b/screenpipe-core/src/deno/runtime.js
@@ -32,10 +32,37 @@ const pipe = {
const response = await ops.op_fetch_post(url, body);
return JSON.parse(response);
},
+ fetch: async (url, options) => {
+ try {
+ const responseString = await ops.op_fetch(url, options);
+ const response = JSON.parse(responseString);
+ return {
+ ok: response.status >= 200 && response.status < 300,
+ status: response.status,
+ statusText: response.statusText,
+ headers: response.headers, // Use the headers directly without wrapping in Headers object
+ text: async () => response.text,
+ json: async () => {
+ try {
+ return JSON.parse(response.text);
+ } catch (error) {
+ console.error("Error parsing JSON:", error);
+ return response.text;
+ }
+ },
+ };
+ } catch (error) {
+ console.error("Fetch error:", error);
+ throw error;
+ }
+ },
};
globalThis.setTimeout = (callback, delay) => {
ops.op_set_timeout(delay).then(callback);
};
globalThis.console = console;
-globalThis.pipe = pipe;
\ No newline at end of file
+globalThis.pipe = pipe;
+globalThis.fetch = pipe.fetch;
+
+
diff --git a/screenpipe-core/src/pipes.rs b/screenpipe-core/src/pipes.rs
index 62e099e58..a1d77c252 100644
--- a/screenpipe-core/src/pipes.rs
+++ b/screenpipe-core/src/pipes.rs
@@ -12,9 +12,77 @@ mod pipes {
use reqwest::header::HeaderMap;
use reqwest::header::HeaderValue;
use reqwest::header::CONTENT_TYPE;
+ use std::collections::HashMap;
use std::env;
use std::rc::Rc;
+ use reqwest::Client;
+ use serde_json::Value;
+
+ #[op2]
+ #[string]
+ fn op_get_env(#[string] key: String) -> Option {
+ env::var(&key).ok()
+ }
+
+ #[op2(async)]
+ #[string]
+ async fn op_fetch(
+ #[string] url: String,
+ #[serde] options: Option,
+ ) -> anyhow::Result {
+ let client = Client::new();
+ let mut request = client.get(&url);
+
+ if let Some(opts) = options {
+ if let Some(method) = opts.get("method").and_then(|m| m.as_str()) {
+ request = match method.to_uppercase().as_str() {
+ "GET" => client.get(&url),
+ "POST" => client.post(&url),
+ "PUT" => client.put(&url),
+ "DELETE" => client.delete(&url),
+ // Add other methods as needed
+ _ => return Err(anyhow::anyhow!("Unsupported HTTP method")),
+ };
+ }
+
+ if let Some(headers) = opts.get("headers").and_then(|h| h.as_object()) {
+ for (key, value) in headers {
+ if let Some(value_str) = value.as_str() {
+ request = request.header(key, value_str);
+ }
+ }
+ }
+
+ if let Some(body) = opts.get("body").and_then(|b| b.as_str()) {
+ request = request.body(body.to_string());
+ }
+ }
+
+ let response = match request.send().await {
+ Ok(resp) => resp,
+ Err(e) => return Err(anyhow::anyhow!(e)),
+ };
+
+ let status = response.status();
+ let headers = response.headers().clone();
+ let text = match response.text().await {
+ Ok(t) => t,
+ Err(e) => return Err(anyhow::anyhow!(e)),
+ };
+
+ let result = serde_json::json!({
+ "status": status.as_u16(),
+ "statusText": status.to_string(),
+ "headers": headers.iter()
+ .map(|(k, v)| (k.to_string(), v.to_str().unwrap_or("").to_string()))
+ .collect::>(),
+ "text": text,
+ });
+
+ Ok(result.to_string())
+ }
+
#[op2(async)]
#[string]
async fn op_read_file(#[string] path: String) -> anyhow::Result {
@@ -177,6 +245,8 @@ mod pipes {
op_fetch_get,
op_fetch_post,
op_set_timeout,
+ op_fetch,
+ op_get_env,
]
}
@@ -189,6 +259,18 @@ mod pipes {
..Default::default()
});
+ // add all env var starting with SCREENPIPE_ to the global scope in process.env
+
+ // first init the process.env object
+ js_runtime.execute_script("main", "globalThis.process = { env: {} }")?;
+
+ for (key, value) in env::vars() {
+ if key.starts_with("SCREENPIPE_") {
+ js_runtime
+ .execute_script("main", format!("process.env['{}'] = '{}'", key, value))?;
+ }
+ }
+
let mod_id = js_runtime.load_main_es_module(&main_module).await?;
let result = js_runtime.mod_evaluate(mod_id);
js_runtime.run_event_loop(Default::default()).await?;