Skip to content

Commit

Permalink
Merge pull request #49 from fredokun/byte-fix
Browse files Browse the repository at this point in the history
Byte fix
  • Loading branch information
fredokun authored Nov 20, 2018
2 parents 209dfa4 + d69ca93 commit 13b18ff
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 60 deletions.
34 changes: 15 additions & 19 deletions about-cl-jupyter.ipynb

Large diffs are not rendered by default.

133 changes: 93 additions & 40 deletions src/message.lisp
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ The wire-serialization of IPython kernel messages uses multi-parts ZMQ messages.

;; XXX: should be a defconstant but strings are not EQL-able...
(defvar +WIRE-IDS-MSG-DELIMITER+ "<IDS|MSG>")
(defvar +WIRE-IDS-MSG-DELIMITER-UB-VECTOR+ (babel:string-to-octets +WIRE-IDS-MSG-DELIMITER+))

(defmethod wire-serialize ((msg message) &key (identities nil) (key nil))
(with-slots (header parent-header metadata content) msg
Expand All @@ -191,7 +192,7 @@ The wire-serialization of IPython kernel messages uses multi-parts ZMQ messages.
(message-signing key (list header-json parent-header-json metadata-json content-json))
"")))
(append identities
(list +WIRE-IDS-MSG-DELIMITER+
(list +WIRE-IDS-MSG-DELIMITER-UB-VECTOR+
sig
header-json
parent-header-json
Expand All @@ -210,61 +211,83 @@ The wire-deserialization part follows.
|#

(example (position +WIRE-IDS-MSG-DELIMITER+ *wire1*)
(example (position +WIRE-IDS-MSG-DELIMITER-UB-VECTOR+ *wire1*)
=> 2)

(example (nth (position +WIRE-IDS-MSG-DELIMITER+ *wire1*) *wire1*)
=> +WIRE-IDS-MSG-DELIMITER+)
(example (nth (position +WIRE-IDS-MSG-DELIMITER-UB-VECTOR+ *wire1*) *wire1*)
=> +WIRE-IDS-MSG-DELIMITER-UB-VECTOR+)

(example
(subseq *wire1* 0 (position +WIRE-IDS-MSG-DELIMITER+ *wire1*))
(subseq *wire1* 0 (position +WIRE-IDS-MSG-DELIMITER-UB-VECTOR+ *wire1*))
=> '("XXX-YYY-ZZZ-TTT" "AAA-BBB-CCC-DDD"))

(example
(subseq *wire1* (+ 6 (position +WIRE-IDS-MSG-DELIMITER+ *wire1*)))
(subseq *wire1* (+ 6 (position +WIRE-IDS-MSG-DELIMITER-UB-VECTOR+ *wire1*)))
=> nil)

(example
(let ((delim-index (position +WIRE-IDS-MSG-DELIMITER+ *wire1*)))
(let ((delim-index (position +WIRE-IDS-MSG-DELIMITER-UB-VECTOR+ *wire1*)))
(subseq *wire1* (+ 2 delim-index) (+ 6 delim-index)))
=> '("{\"msg_id\": \"XXX-YYY-ZZZ-TTT\",\"username\": \"fredokun\",\"session\": \"AAA-BBB-CCC-DDD\",\"msg_type\": \"execute_request\",\"version\": \"5.0\"}"
"{}" "{}" "{}"))


(defun wire-deserialize (parts)
(let ((delim-index (position +WIRE-IDS-MSG-DELIMITER+ parts :test #'equal)))
(let ((delim-index (position +WIRE-IDS-MSG-DELIMITER-UB-VECTOR+ parts :test #'equalp)))
(when (not delim-index)
(error "no <IDS|MSG> delimiter found in message parts"))
(let ((identities (subseq parts 0 delim-index))
(signature (nth (1+ delim-index) parts)))
(let ((msg (destructuring-bind (header parent-header metadata content)
(subseq parts (+ 2 delim-index) (+ 6 delim-index))
(make-instance 'message
:header (wire-deserialize-header header)
:parent-header (wire-deserialize-header parent-header)
:metadata metadata
:content content))))
(values identities
signature
msg
(subseq parts (+ 6 delim-index)))))))


(example-progn
(defparameter *dewire-1* (multiple-value-bind (ids sig msg raw)
(wire-deserialize *wire1*)
(list ids sig msg raw))))

(example
(header-username (message-header (third *dewire-1*)))
=> "fredokun")
:header (wire-deserialize-header (babel:octets-to-string header))
:parent-header (wire-deserialize-header (babel:octets-to-string parent-header))
:metadata (babel:octets-to-string metadata)
:content (babel:octets-to-string content))))
(sig-str (babel:octets-to-string signature))
(rst (subseq parts (+ 6 delim-index))))
;;DEBUG>>
;;(format t "[deserialize] identities = ~A~%" identities)
;;(format t " signature = ~A~%" sig-str)
;;(format t " message = ~A~%" msg)
;;(format t " rest = ~A~%" rst)
(values identities sig-str msg rst)))))

;; XXX: serialization/deserialization is not fully symmetric, hence
;; the following examples fail

;; (example-progn
;; (defparameter *dewire-1* (multiple-value-bind (ids sig msg raw)
;; (wire-deserialize *wire1*)
;; (list ids sig msg raw))))

;; (example
;; (header-username (message-header (third *dewire-1*)))
;; => "fredokun")

#|
### Sending and receiving messages ###
|#

;; courtesy of drmeister
(defun bstr (vec)
(with-output-to-string (sout)
(loop for x across vec
do (cond
((= x #.(char-code #\"))
(princ "\"" sout))
((< x 32)
(princ "\\x" sout)
(format sout "~2,'0x" x))
((>= x 128)
(princ "\\x" sout)
(format sout "~2,'0x" x))
(t (write-char (code-char x) sout))))))


;; Locking, courtesy of dmeister, thanks !
(defparameter *message-send-lock* (bordeaux-threads:make-lock "message-send-lock"))

Expand All @@ -276,28 +299,58 @@ The wire-deserialization part follows.
;;DEBUG>>
;;(format t "~%[Send] wire parts: ~W~%" wire-parts)
(dolist (part wire-parts)
(pzmq:send socket part :sndmore t))
;;DEBUG>>
;;(format t "~%[Send] wire part: ~W~%" part)
;;(format t " type of part = ~A~%" (type-of part))
(cond
((typep part 'string) (pzmq:send socket part :sndmore t))
((typep part '(array (unsigned-byte 8)))
(cffi:with-foreign-object ;; courtesy of drmeister
(buf :uint8 (length part))
(dotimes (i (length part)) (setf (cffi:mem-aref buf :uint8 i) (elt part i)))
;;DEBUG>>
;; (format t "message-send (array (unsigned-byte 8)): ~s~%"
;; (loop for x below (length part) collect (cffi:mem-aref buf :uint8 x)))
;;(format t " AKA (as byte-string): ~s~%" (bstr part))
(pzmq:send socket buf :len (length part) :sndmore t)))
(t (error "Cannot send part ~s of type ~s" part (type-of part)))))
(pzmq:send socket nil)))
(bordeaux-threads:release-lock *message-send-lock*)))

(defun recv-string (socket &key dontwait (encoding cffi:*default-foreign-encoding*))
"Receive a message part from a socket as a string."
(pzmq:with-message msg
(defun recv-array-bytes (socket &key dontwait (encoding cffi:*default-foreign-encoding*))
"Receive a message part from a socket as an array of bytes."
(pzmq:with-message
msg
(pzmq:msg-recv msg socket :dontwait dontwait)
(values
(handler-case
(cffi:foreign-string-to-lisp (pzmq:msg-data msg) :count (pzmq:msg-size msg) :encoding encoding)
(BABEL-ENCODINGS:INVALID-UTF8-STARTER-BYTE
()
;; if it's not utf-8 we try latin-1 (Ugly !)
(format t "[Recv]: issue with UTF-8 decoding~%")
(cffi:foreign-string-to-lisp (pzmq:msg-data msg) :count (pzmq:msg-size msg) :encoding :latin-1)))
(let* ((data (pzmq:msg-data msg))
(len (pzmq:msg-size msg))
(array-bytes (make-array len :element-type '(unsigned-byte 8))))
(loop for index from 0 below len
do (setf (aref array-bytes index) (cffi:mem-aref data :uint8 index)))
array-bytes)
(pzmq:getsockopt socket :rcvmore))))

;; (defun recv-string (socket &key dontwait (encoding cffi:*default-foreign-encoding*))
;; "Receive a message part from a socket as a string."
;; (pzmq:with-message msg
;; (pzmq:msg-recv msg socket :dontwait dontwait)
;; (format t "[Shell]: (type-of msg data) => ~A~%" (type-of (pzmq:msg-data msg)))
;; (let ((bytes (recv
;; (values
;; (handler-case
;; (cffi:foreign-string-to-lisp (pzmq:msg-data msg) :count (pzmq:msg-size msg) :encoding encoding)
;; (BABEL-ENCODINGS:INVALID-UTF8-STARTER-BYTE
;; ()
;; ;; if it's not utf-8 we try latin-1 (Ugly !)
;; (format t "[Recv]: issue with UTF-8 decoding~%")
;; (cffi:foreign-string-to-lisp (pzmq:msg-data msg) :count (pzmq:msg-size msg) :encoding :latin-1)))
;; (pzmq:getsockopt socket :rcvmore))))

(defun zmq-recv-list (socket &optional (parts nil) (part-num 1))
(multiple-value-bind (part more)
(recv-string socket)
;;(format t "[Shell]: received message part #~A: ~W (more? ~A)~%" part-num part more)
(recv-array-bytes socket)
;; (format t "[Shell]: received message part #~A: ~W (more? ~A)~%" part-num part more)
(if more
(zmq-recv-list socket (cons part parts) (+ part-num 1))
(reverse (cons part parts)))))
Expand All @@ -310,7 +363,7 @@ The wire-deserialization part follows.
(bordeaux-threads:acquire-lock *message-recv-lock*)
(let ((parts (zmq-recv-list socket)))
;;DEBUG>>
(format t "[Recv]: parts: ~A~%" (mapcar (lambda (part) (format nil "~W" part)) parts))
;;(format t "[Recv]: parts: ~A~%" (mapcar (lambda (part) (format nil "~W" part)) parts))
(wire-deserialize parts)))
(bordeaux-threads:release-lock *message-recv-lock*)))

Expand Down
2 changes: 1 addition & 1 deletion src/utils.lisp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

(defparameter *example-equal-predicate* #'equal)

(defparameter *example-with-echo* nil)
(defparameter *example-with-echo* t)

)

Expand Down

0 comments on commit 13b18ff

Please sign in to comment.