Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Client-server syncing (à la Web after Tomorrow / Datsync) (WIP) #18

Open
wants to merge 63 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
63 commits
Select commit Hold shift + click to select a range
b879ea2
Basic test established for Datomic!
alexandergunnarson Jan 7, 2017
44054e4
The tests progress... semi-unsatisfactorily
alexandergunnarson Jan 7, 2017
1ce7a63
Slightly update test
alexandergunnarson Jan 7, 2017
7875409
CLJ DataScript simple test works!!
alexandergunnarson Jan 7, 2017
bcd0ea9
Update tests
alexandergunnarson Jan 7, 2017
f40f42b
Add in @seantempesta 's changes to datomic.clj (thanks!)
alexandergunnarson Jan 16, 2017
7292a31
Fix minor compilation error
alexandergunnarson Jan 16, 2017
e2f71dc
Add tools.namespace in dev dependencies
alexandergunnarson Jan 17, 2017
17d661d
`rand` -> `gensym` for listener ids to guarantee uniqueness
alexandergunnarson Jan 17, 2017
cbb28e4
Move certain utils to, well, lib/util
alexandergunnarson Jan 17, 2017
c3de7d3
Add `notified-times` test to DS and Datomic
alexandergunnarson Jan 17, 2017
3fed4ed
Update plugin-base slightly
alexandergunnarson Jan 17, 2017
16b1fbd
Remove code duplication
alexandergunnarson Jan 17, 2017
7e32df3
Remove all `unsynchronized-mutable` and replace with atoms. Retain ea…
alexandergunnarson Jan 17, 2017
a612b5a
No superfluous debug in plugin-base
alexandergunnarson Jan 17, 2017
1927f61
Debug should default to false
alexandergunnarson Jan 17, 2017
49cce06
Datomic test works!! This is b/c Posh listeners are now run before `p…
alexandergunnarson Jan 17, 2017
ff00d4a
Add a few explanatory comments; remove certain logging
alexandergunnarson Jan 17, 2017
58380ba
`add-eager-watch`; `make-wrapper` (as is present in Reagent implement…
alexandergunnarson Jan 17, 2017
3275552
Can now eagerly watch reactive queries (really, any reactions)
alexandergunnarson Jan 17, 2017
3d5c2c6
Slight cleanup
alexandergunnarson Jan 17, 2017
10e9a6a
Refactor Datomic test to be a little cleaner
alexandergunnarson Jan 17, 2017
0e324ca
Slightly extend Datomic test
alexandergunnarson Jan 17, 2017
6b152dd
`add-eager-watch` now disposes runner when `remove-watch`
alexandergunnarson Jan 17, 2017
2a7c1ad
Test `remove-watch`
alexandergunnarson Jan 17, 2017
2cb4a53
Refactor into common DataScript and Datomic tests
alexandergunnarson Jan 17, 2017
4ef266a
Initial commit
alexandergunnarson Jan 17, 2017
7eb7c0a
A small refactor to be able to use Datomic utils from e.g. posh.sync
alexandergunnarson Jan 17, 2017
97c178e
Add `posh-one!` for customized retrieval (e.g. :datom-t); begin sync …
alexandergunnarson Jan 17, 2017
a92f7c7
Move most of clj/datomic to lib/datomic
alexandergunnarson Jan 18, 2017
e0bc3f6
Add `lib.datascript/->schema` for sanitization of Datomic schemas in DS
alexandergunnarson Jan 18, 2017
8af39fd
Unify schema representation b/w Datomic and DS; `posh!` now derefs ap…
alexandergunnarson Jan 18, 2017
119594d
Move testing comment to scratch.clj; use material from scratch.clj to…
alexandergunnarson Jan 18, 2017
98bbba0
Unify signature of stateful/add-db with that of core/add-db
alexandergunnarson Jan 18, 2017
b49eeea
Fixed a few things with sync_test... a lot left to do
alexandergunnarson Jan 18, 2017
3792f07
Successfully used `:datoms-t` with both DS and Datomic
alexandergunnarson Jan 18, 2017
a783550
Aesthetic code reordering
alexandergunnarson Jan 18, 2017
9f7ab18
Add `conn->schema` for lib.datomic
alexandergunnarson Jan 18, 2017
dab995f
Ensure debug prints happen sequentially by thread
alexandergunnarson Jan 18, 2017
af5479f
Avoid NullPointException in plugin-base
alexandergunnarson Jan 18, 2017
df1364d
(Simulated) server and client now incrementally pass each other the m…
alexandergunnarson Jan 18, 2017
871330b
Added documentation; filtered db is working!; more tests forthcoming
alexandergunnarson Jan 18, 2017
81b45ef
Filtered DB works for DataScript and Datomic; tests added for that
alexandergunnarson Jan 18, 2017
09ac940
Condense sync test
alexandergunnarson Jan 18, 2017
693437b
Ensure that (simulated) server and client receive the correct messages
alexandergunnarson Jan 18, 2017
a4180e3
Cleanup; add `sub-datoms!` convenience function; begin to add more co…
alexandergunnarson Jan 18, 2017
25e1bb4
Redid the sync test dataset; filtering test is more robust! Incomplet…
alexandergunnarson Jan 19, 2017
2d79438
`user-admin-q`, `user-public-q`
alexandergunnarson Jan 19, 2017
24274de
Remove cruft
alexandergunnarson Jan 19, 2017
dc6b035
`repo-q`; compress test data
alexandergunnarson Jan 19, 2017
d0bb801
Ensure filters are complied with; Compress test data
alexandergunnarson Jan 19, 2017
1d107a9
Initialization factored out; works with DS
alexandergunnarson Jan 19, 2017
01ad411
Add a few todos
alexandergunnarson Jan 19, 2017
bc595fc
`dissoc-in`
alexandergunnarson Jan 19, 2017
a8a6ff9
`merge-deep-with`, `merge-deep`
alexandergunnarson Jan 19, 2017
0d9d6da
`posh.sync.schema` in order, well, to sync schemas
alexandergunnarson Jan 19, 2017
dea9bfe
Distinguish `conn?`, `db?`, `pushable-conn?`
alexandergunnarson Jan 19, 2017
1293a96
Expand on test; Use mults and taps to enable multiple consumers of da…
alexandergunnarson Jan 19, 2017
7a97a49
`deduce` transducer; client push successful!
alexandergunnarson Jan 19, 2017
c7633d0
Add to sync test
alexandergunnarson Jan 19, 2017
ad7d482
Private notification test; Fixed a few reporting errors
alexandergunnarson Jan 19, 2017
56bc21f
Add important overarching todo items
alexandergunnarson Jan 19, 2017
805cfb3
Fix incorrect behavior of lib.util/debug* in CLJS
alexandergunnarson Jan 30, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 22 additions & 10 deletions dev/scratch.clj
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@

;; same db as :hux but without any :task/name datoms
(pt/add-db :tasks conn (:schema @conn) {:filter 'scratch/no-task-names-filter})

(pt/add-pull [:db :hux] '[*] 3)
(pt/add-filter-tx [:db :hux] '[[_ #{:category/name}]])
(pt/add-filter-pull
Expand Down Expand Up @@ -161,15 +161,15 @@
;;(d/pull @conn '[*] [:task/name "jim"])

(def conn3 (d/create-conn))

(d/transact!
conn3
[{:db/id -1
:name "joe"}

{:db/id -2
:name "sally"}

{:db/id -3
:name "bob"}])

Expand All @@ -185,7 +185,7 @@
:schema (:schema @conn3)
:key :hux}])


)


Expand Down Expand Up @@ -225,7 +225,7 @@
:key [:db :conn2]}
54])

;;; not working yet...
;;; not working yet...
(qa/q-analyze-with-pulls {:q d/q}
[:pulls]
'[:find (pull ?tname '[*]) ?t ?uuid ?p ?level
Expand Down Expand Up @@ -346,7 +346,7 @@
(comment

(d/transact! conn '[[:db/add 5 :yoyo/ma "bingo"] [:db/add 8 :youou "hey"]])

(def poshtree (p/new-posh dcfg [:results]))

(def hux (p/add-db poshtree :hux conn schema nil))
Expand All @@ -372,7 +372,7 @@

(p/poshdb->conn filtp)
(p/cache filtp)

(db/poshdb->db @poshtree filtp)

(def f (d/filter @conn (fn [db datom] (do (println datom)
Expand Down Expand Up @@ -400,5 +400,17 @@

)



; @alexandergunnarson Sync testing

#_(try #_(clojure.main/repl
:print clojure.pprint/pprint
:caught clojure.pprint/pprint)
(require '[clojure.tools.namespace.repl :refer [refresh]])
(let [x (refresh)] (when (instance? Throwable x) (throw x)))
(set! *warn-on-reflection* true)
(eval `(do (reset! posh.lib.util/debug? true)
(clojure.test/run-tests 'posh.lib.ratom-test)
(clojure.test/run-tests 'posh.clj.datascript-test)
(clojure.test/run-tests 'posh.clj.datomic-test)
(clojure.test/run-tests 'posh.sync-test)))
(catch Throwable t (println t)))
6 changes: 4 additions & 2 deletions project.clj
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@
:url "http://www.eclipse.org/legal/epl-v10.html"}
:dependencies [[org.clojure/clojure "1.7.0"]
[org.clojure/clojurescript "1.7.228"]
#_[datascript "0.15.0"]
#_[com.datomic/datomic-free "0.9.5407"]
[org.clojure/core.match "0.3.0-alpha4"]]
:plugins [[lein-cljsbuild "1.1.3"]]
:profiles {:dev {:dependencies [[datascript "0.15.0"]
[com.datomic/datomic-free "0.9.5344"]
[org.clojure/core.async "0.2.391"]
[org.clojure/tools.namespace "0.2.11"]]}}
:cljsbuild {
:builds [ {:id "posh"
:source-paths ["src/"]
Expand Down
6 changes: 5 additions & 1 deletion src/posh/clj/datascript.clj
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
(ns posh.clj.datascript
"The public API of Posh's DataScript implementation for Clojure."
(:require [posh.plugin-base :as base]
[posh.lib.ratom :as rx]
[posh.lib.datascript :as ldb]
[datascript.core :as d]))

(def dcfg
Expand All @@ -14,7 +16,9 @@
:listen! d/listen!
:conn? d/conn?
:ratom rx/atom
:make-reaction rx/make-reaction}]
:make-reaction rx/make-reaction
:conn->schema ldb/conn->schema
:additional-listeners ldb/add-schema-listener!}]
(assoc dcfg :pull (partial base/safe-pull dcfg))))

(base/add-plugin dcfg)
34 changes: 12 additions & 22 deletions src/posh/clj/datomic.clj
Original file line number Diff line number Diff line change
@@ -1,32 +1,22 @@
(ns posh.clj.datomic
(:require [posh.plugin-base :as base]
[posh.lib.ratom :as rx]
[datomic.api :as d]))

(defn- TODO [& [msg]] (throw (ex-info (str "TODO: " msg) nil)))

(defn- conn? [x] (instance? datomic.Connection x))

; TODO maybe we don't want blocking here?)
(defn- transact!* [& args] @(apply d/transact args))

(defn- listen!
([conn callback] (listen! conn (rand) callback))
([conn key callback]
{:pre [(conn? conn)]}
(TODO "Need to figure out how to listen to Datomic connection in the same way as DataScript")
key))
"The public API of Posh's Datomic implementation (for Clojure)."
(:require [posh.plugin-base :as base]
[posh.lib.ratom :as rx]
[datomic.api :as d]
[posh.lib.datomic :as l]))

(def dcfg
(let [dcfg {:db d/db
(let [dcfg {:db l/db*
:pull* d/pull
:q d/q
:q l/q*
:filter d/filter
:with d/with
:entid d/entid
:transact! transact!*
:listen! listen!
:conn? conn?
:transact! l/transact!*
:listen! l/listen!
:conn? l/poshable-conn?
:conn->schema l/conn->schema
:->poshable-conn l/->poshable-conn
:ratom rx/atom
:make-reaction rx/make-reaction}]
(assoc dcfg :pull (partial base/safe-pull dcfg))))
Expand Down
1 change: 0 additions & 1 deletion src/posh/core.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
:filters {}})
;; {db-id {:filter pred :as-of t :with tx-data :since t}}


(defn add-db
([posh-tree db-id conn schema] (add-db posh-tree db-id conn schema nil))
([{:keys [dcfg conns schemas dbs cache graph] :as posh-tree}
Expand Down
22 changes: 22 additions & 0 deletions src/posh/lib/datascript.cljc
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
(ns posh.lib.datascript
"General DataScript utils."
(:require [datascript.core :as d]))

(def default-partition :db.part/default)

(defn tempid [] (d/tempid default-partition))

(defn conn->schema [conn] (:schema @conn))

(defn add-schema-listener! [conn posh-atom db-id]
(add-watch conn :posh-schema-listener
(fn [_ _ old-state new-state]
(when (not= (:schema old-state) (:schema new-state))
(swap! posh-atom assoc-in [:schema db-id] (:schema new-state))))))

(defn ->schema [schema]
(->> schema
(map (fn [[k v]] [k (if (-> v :db/valueType (not= :db.type/ref))
(dissoc v :db/valueType)
v)]))
(into {})))
192 changes: 192 additions & 0 deletions src/posh/lib/datomic.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
(ns posh.lib.datomic
"General Datomic utils."
(:require [datomic.api :as d]
[clojure.core.async :as async
:refer [thread offer! <!! promise-chan]]
[posh.plugin-base :as base]
[posh.lib.util :as u
:refer [debug]]))

; TODO import stuartsierra.component ?
; TODO move
(defprotocol Lifecycle
(start [this])
(stop [this]))

; TODO fix
#_(defn datom->seq [^datomic.Datom d] [(.e d) (.a d) (.v d) (.tx d) (.added d)])

(defn datom->seq [db-after ^datomic.Datom d]
[(.e d) (d/ident db-after (.a d)) (.v d) (.tx d) (.added d)])

#_(extend-type datomic.Datom ; TODO fix
clojure.core.protocols/CollReduce
(coll-reduce [datom f ] (reduce f (f) (datom->seq datom)))
(coll-reduce [datom f v] (reduce f v (datom->seq datom))))

(defn normalized-tx-report [{:keys [db-after] :as tx-report}]
(update tx-report :tx-data
(fn [datoms] (mapv #(datom->seq db-after %) datoms))))

(defn run-listeners! [pconn tx-report']
(try (doseq [[_ callback] @(:listeners pconn)] (callback tx-report'))
(catch Throwable e (debug "WARNING:" e))))

(defrecord PoshableConnection [datomic-conn listeners deduplicate-tx-idents interrupted?]
Lifecycle
(start [this]
(assert (instance? datomic.Connection datomic-conn))
(assert (instance? clojure.lang.IAtom listeners))
(assert (instance? clojure.lang.IAtom interrupted?))
; See `transact!*` as to why the below schema+entity is required.
; TODO use `posh.lib.datomic/transact-schemas!`?
@(d/transact datomic-conn
[{:db/id (d/tempid :db.part/db)
:db.install/_attribute :db.part/db
:db/ident :posh.clj.datomic.tx-notifier/value
:db/cardinality :db.cardinality/one
:db/valueType :db.type/uuid}
{:db/id (d/tempid :db.part/db)
:db/ident ::tx-notifier}])
(thread
(loop []
(when-not @interrupted?
; `poll` because if `take`, still won't be nil or stop waiting when conn is released
; the poll time is how long it will take to shut down when that time comes
(when-let [{:keys [db-after] :as tx-report}
(.poll ^java.util.concurrent.BlockingQueue
(d/tx-report-queue datomic-conn)
1
java.util.concurrent.TimeUnit/SECONDS)]
(try (let [{:keys [tx-data] :as tx-report'} (normalized-tx-report tx-report)
last-tx-item (last tx-data)
tx-ident (when (and last-tx-item
(= (d/ident db-after (get last-tx-item 0)) ::tx-notifier)
(= (d/ident db-after (get last-tx-item 1)) :posh.clj.datomic.tx-notifier/value))
(get last-tx-item 2))]
(try (debug "tx-report received in PoshableConnection")
(when-not (get @deduplicate-tx-idents tx-ident)
(run-listeners! this tx-report'))
(finally
(swap! deduplicate-tx-idents #(disj % tx-ident)))))
(catch Throwable e (debug "WARNING:" e)))
(recur)))))
this)
(stop [this]
(reset! interrupted? true)
(swap! deduplicate-tx-idents empty)
this))

(defn db? [x] (instance? datomic.Database x))
(defn conn? [x] (instance? datomic.Connection x))
(defn poshable-conn? [x] (instance? PoshableConnection x))

(defn ->conn [x] (if (poshable-conn? x) (:datomic-conn x) x))

(defn ->poshable-conn [datomic-conn]
{:pre [(conn? datomic-conn)]}
(let [listeners (atom nil)]
(with-meta (start (PoshableConnection. datomic-conn listeners (atom #{}) (atom false)))
{:listeners listeners})))

(def system-ns #{"db" "db.type" "db.install" "db.part" "db.sys" "db.alter"
"db.lang" "fressian" "db.unique" "db.excise" "db.bootstrap"
"db.cardinality" "db.fn"})

(defn conn->schema [conn]
{:pre [(conn? conn)]}
(let [db (d/db conn)
es (d/q '[:find [?e ...]
:in $ ?system-ns
:where
[?e :db/ident ?ident]
[(namespace ?ident) ?ns]
[((comp not contains?) ?system-ns ?ns)]]
db system-ns)]
(->> es
(map (fn [e] (let [m (d/touch (d/entity db e))]
[(:db/ident m) m])))
(into {}))))

(defn listen!
([conn callback] (listen! conn (gensym) callback))
([conn key callback]
{:pre [(poshable-conn? conn)]}
(swap! (:listeners (meta conn)) assoc key callback)
key))

(def default-partition :db.part/default)

(defn tempid [] (d/tempid default-partition))

(defn install-partition [part]
(let [id (d/tempid :db.part/db)]
[{:db/id id
:db/ident part}
[:db/add :db.part/db
:db.install/partition id]]))

(defn transact!*
"The main point of the additions onto Datomic's base `transact` fn is to wait for related
listeners to be run before returning."
[conn tx]
{:pre [(poshable-conn? conn)]}
(let [; In order to ensure listeners are run only once (i.e. deduplicate them),
; we have to transmit to the report queue in a race-condition-free way
; some sort of unique ID we know ahead of time. I'd like to just use the
; txn ID, but this is not given ahead of time. Thus we must pass a squuid
; to the transaction.
; This is cleaner than e.g. using channels because they introduce race
; conditions in this situation.
tx-ident (d/squuid)
_ (swap! (:deduplicate-tx-idents conn) conj tx-ident)
tx-report @(d/transact (->conn conn)
(conj (vec tx) [:db/add ::tx-notifier :posh.clj.datomic.tx-notifier/value tx-ident]))
tx-report' (normalized-tx-report tx-report)
_ (run-listeners! conn tx-report')]
tx-report'))

(defn transact-schemas!
"This is used because, perhaps very strangely, schema changes to Datomic happen
asynchronously."
{:todo #{"Make more robust"}}
[conn schemas]
(let [txn-report @(d/transact conn
(->> schemas
(map (fn [[ident v]] (assoc v :db/ident ident
:db/id (d/tempid :db.part/db)
:db.install/_attribute :db.part/db)))))
txn-id (-> txn-report :tx-data ^datomic.Datom first (.tx))
_ #_(deref (d/sync (->conn conn) (java.util.Date. (System/currentTimeMillis))) 500 nil)
(deref (d/sync-schema (->conn conn) (inc txn-id)) 500 nil)] ; frustratingly, doesn't even work with un-`inc`ed txn-id
txn-report))

(defn db* [x]
(cond (db? x)
x
(poshable-conn? x)
(-> x :datomic-conn d/db)
(conn? x)
(d/db x)
:else x #_(throw (ex-info "Object cannot be converted into DB" {:obj x}))))

(defn q* [q x & args]
(apply d/q q (db* x) args))

(defn with-conn [uri f]
(try (d/create-database uri)
(let [conn (d/connect uri)]
(try (f conn)
(finally (d/release conn))))
(finally (d/delete-database uri))))

(defn with-posh-conn [dcfg retrieve uri schemas f]
(with-conn uri
(fn [conn*]
(let [_ @(d/transact conn* (install-partition default-partition))
_ (transact-schemas! conn* schemas)
poshed (base/posh-one! dcfg conn* retrieve) ; This performs a `with-meta` so the result is needed
conn (-> poshed deref :conns :conn0) ; Has the necessary meta ; TODO simplify this
_ (assert (poshable-conn? conn))]
(try (f poshed conn)
(finally (stop conn))))))) ; TODO `unposh!`
2 changes: 1 addition & 1 deletion src/posh/lib/db.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
db (if with (:db-after ((:with dcfg) db with)) db) ;; with tx-data
db (if filter
((:filter dcfg) db (if (symbol? filter)
#?(:clj (resolve filter) :cljs nil)
#?(:clj (resolve filter) :cljs nil) ; TODO why use resolve?
filter))
db) ;; filter pred-sym
]
Expand Down
Loading