From 50e6f794ee60403cfdf16cc6b5498182cad78dfd Mon Sep 17 00:00:00 2001 From: shen2 Date: Tue, 21 Jul 2015 12:23:14 +0800 Subject: [PATCH] simplify StreamReader, add ProgressiveStreamReader --- src/Connection.php | 716 +++++++++++------------ src/Response/ProgressiveStreamReader.php | 42 ++ src/Response/StreamReader.php | 31 +- src/Type/CollectionList.php | 2 +- src/Type/CollectionMap.php | 2 +- src/Type/Tuple.php | 2 +- 6 files changed, 406 insertions(+), 389 deletions(-) create mode 100644 src/Response/ProgressiveStreamReader.php diff --git a/src/Connection.php b/src/Connection.php index b502c08..5b4fc3e 100644 --- a/src/Connection.php +++ b/src/Connection.php @@ -4,372 +4,372 @@ class Connection { - /** - * Connection options - * @var array - */ - protected $_options = [ - 'CQL_VERSION' => '3.0.0' - ]; + /** + * Connection options + * @var array + */ + protected $_options = [ + 'CQL_VERSION' => '3.0.0' + ]; - /** - * @var string - */ - protected $_keyspace; + /** + * @var string + */ + protected $_keyspace; - /** - * @var array|\Traversable - */ - protected $_nodes; + /** + * @var array|\Traversable + */ + protected $_nodes; - /** - * @var Connection\Socket|Connection\Stream - */ - protected $_node; - - /** - * @var int - */ - protected $_lastStreamId = 0; - - /** - * - * @var array - */ - protected $_statements = []; - - /** - * - * @var \SplQueue - */ - protected $_recycledStreams; + /** + * @var Connection\Socket|Connection\Stream + */ + protected $_node; + + /** + * @var int + */ + protected $_lastStreamId = 0; + + /** + * + * @var array + */ + protected $_statements = []; + + /** + * + * @var \SplQueue + */ + protected $_recycledStreams; - /** - * @var int - */ - protected $_consistency = Request\Request::CONSISTENCY_ONE; + /** + * @var int + */ + protected $_consistency = Request\Request::CONSISTENCY_ONE; - /** - * @param array|\Traversable $nodes - * @param string $keyspace - * @param array $options - */ - public function __construct($nodes, $keyspace = '', array $options = []) { - if (is_array($nodes)) - shuffle($nodes); - - $this->_nodes = $nodes; - $this->_options = array_merge($this->_options, $options); - $this->_keyspace = $keyspace; - $this->_recycledStreams = new \SplQueue(); - } - - /** - * @throws Exception - */ - protected function _connect() { - foreach($this->_nodes as $options){ - if (is_string($options)){ - if (!preg_match('/^(((tcp|udp|unix|ssl|tls):\/\/)?[\w\.\-]+)(\:(\d+))?/i', $options, $matches)) - throw new Exception('Invalid host: ' . $options); - - $options = [ 'host' => $matches[1],]; - - if (!empty($matches[5])) - $options['port'] = $matches[5]; - - // Use Connection\Stream when protocol prefix is defined. - try { - $this->_node = empty($matches[2]) ? new Connection\Socket($options) : new Connection\Stream($options); - } catch (Exception $e) { - continue; - } - } - else{ - $className = isset($options['class']) ? $options['class'] : 'Cassandra\Connection\Socket'; - try { - $this->_node = new $className($options); - } catch (Exception $e) { - continue; - } - } - return; - } - - throw new Exception("Unable to connect to all Cassandra nodes."); - } - - /** - * @return bool - */ - public function disconnect() { - if ($this->_node === null) - return true; - - return $this->_node->close(); - } - - /** - * @return bool - */ - public function isConnected() { - return $this->_node !== null; - } - - /** - * - * @param Response\Event $response - */ - public function trigger($response){ - } - - /** - * - * @param int $streamId - * @throws Response\Exception - * @return Response\Response - */ - public function getResponse($streamId = 0){ - do{ - $response = $this->_getResponse(); - } - while($response->getStream() !== $streamId); - - return $response; - } - - /** - * - * @throws Response\Exception - * @return Response\Response - */ - protected function _getResponse() { - $version = unpack('C', $this->_node->read(1))[1]; - switch($version) { - case 0x83: - $header = unpack('Cflags/nstream/Copcode/Nlength', $this->_node->read(8)); - $body = $header['length'] === 0 ? '' : $this->_node->read($header['length']); - - static $responseClassMap = [ - Frame::OPCODE_ERROR => 'Cassandra\Response\Error', - Frame::OPCODE_READY => 'Cassandra\Response\Ready', - Frame::OPCODE_AUTHENTICATE => 'Cassandra\Response\Authenticate', - Frame::OPCODE_SUPPORTED => 'Cassandra\Response\Supported', - Frame::OPCODE_RESULT => 'Cassandra\Response\Result', - Frame::OPCODE_EVENT => 'Cassandra\Response\Event', - Frame::OPCODE_AUTH_SUCCESS => 'Cassandra\Response\AuthSuccess', - ]; - - if (!isset($responseClassMap[$header['opcode']])) - throw new Response\Exception('Unknown response'); - - $responseClass = $responseClassMap[$header['opcode']]; - $response = new $responseClass($header, Response\StreamReader::createFromData($body)); - - if ($header['stream'] !== 0){ - if (isset($this->_statements[$header['stream']])){ - $this->_statements[$header['stream']]->setResponse($response); - unset($this->_statements[$header['stream']]); - $this->_recycledStreams->enqueue($header['stream']); - } - elseif ($response instanceof Response\Event){ - $this->trigger($response); - } - } - - return $response; - default: - throw new Exception('php-cassandra supports CQL binary protocol v3 only, please upgrade your Cassandra to 2.1 or later.'); - } - } - - /** - * Wait until all statements received response. - */ - public function flush(){ - while(!empty($this->_statements)){ - $this->_getResponse(); - } - } - - /** - * @return Connection\Node - */ - public function getNode() { - return $this->_node; - } + /** + * @param array|\Traversable $nodes + * @param string $keyspace + * @param array $options + */ + public function __construct($nodes, $keyspace = '', array $options = []) { + if (is_array($nodes)) + shuffle($nodes); + + $this->_nodes = $nodes; + $this->_options = array_merge($this->_options, $options); + $this->_keyspace = $keyspace; + $this->_recycledStreams = new \SplQueue(); + } + + /** + * @throws Exception + */ + protected function _connect() { + foreach($this->_nodes as $options){ + if (is_string($options)){ + if (!preg_match('/^(((tcp|udp|unix|ssl|tls):\/\/)?[\w\.\-]+)(\:(\d+))?/i', $options, $matches)) + throw new Exception('Invalid host: ' . $options); + + $options = [ 'host' => $matches[1],]; + + if (!empty($matches[5])) + $options['port'] = $matches[5]; + + // Use Connection\Stream when protocol prefix is defined. + try { + $this->_node = empty($matches[2]) ? new Connection\Socket($options) : new Connection\Stream($options); + } catch (Exception $e) { + continue; + } + } + else{ + $className = isset($options['class']) ? $options['class'] : 'Cassandra\Connection\Socket'; + try { + $this->_node = new $className($options); + } catch (Exception $e) { + continue; + } + } + return; + } + + throw new Exception("Unable to connect to all Cassandra nodes."); + } + + /** + * @return bool + */ + public function disconnect() { + if ($this->_node === null) + return true; + + return $this->_node->close(); + } + + /** + * @return bool + */ + public function isConnected() { + return $this->_node !== null; + } + + /** + * + * @param Response\Event $response + */ + public function trigger($response){ + } + + /** + * + * @param int $streamId + * @throws Response\Exception + * @return Response\Response + */ + public function getResponse($streamId = 0){ + do{ + $response = $this->_getResponse(); + } + while($response->getStream() !== $streamId); + + return $response; + } + + /** + * + * @throws Response\Exception + * @return Response\Response + */ + protected function _getResponse() { + $version = unpack('C', $this->_node->read(1))[1]; + switch($version) { + case 0x83: + $header = unpack('Cflags/nstream/Copcode/Nlength', $this->_node->read(8)); + $body = $header['length'] === 0 ? '' : $this->_node->read($header['length']); + + static $responseClassMap = [ + Frame::OPCODE_ERROR => 'Cassandra\Response\Error', + Frame::OPCODE_READY => 'Cassandra\Response\Ready', + Frame::OPCODE_AUTHENTICATE => 'Cassandra\Response\Authenticate', + Frame::OPCODE_SUPPORTED => 'Cassandra\Response\Supported', + Frame::OPCODE_RESULT => 'Cassandra\Response\Result', + Frame::OPCODE_EVENT => 'Cassandra\Response\Event', + Frame::OPCODE_AUTH_SUCCESS => 'Cassandra\Response\AuthSuccess', + ]; + + if (!isset($responseClassMap[$header['opcode']])) + throw new Response\Exception('Unknown response'); + + $responseClass = $responseClassMap[$header['opcode']]; + $response = new $responseClass($header, new Response\StreamReader($body)); + + if ($header['stream'] !== 0){ + if (isset($this->_statements[$header['stream']])){ + $this->_statements[$header['stream']]->setResponse($response); + unset($this->_statements[$header['stream']]); + $this->_recycledStreams->enqueue($header['stream']); + } + elseif ($response instanceof Response\Event){ + $this->trigger($response); + } + } + + return $response; + default: + throw new Exception('php-cassandra supports CQL binary protocol v3 only, please upgrade your Cassandra to 2.1 or later.'); + } + } + + /** + * Wait until all statements received response. + */ + public function flush(){ + while(!empty($this->_statements)){ + $this->_getResponse(); + } + } + + /** + * @return Connection\Node + */ + public function getNode() { + return $this->_node; + } - /** - * Connect to database - * @throws Exception - * @return bool - */ - public function connect() { - if ($this->_node !== null) - return true; - - $this->_connect(); - - $response = $this->syncRequest(new Request\Startup($this->_options)); - - if ($response instanceof Response\Authenticate){ - $nodeOptions = $this->_node->getOptions(); - - if (empty($nodeOptions['username']) || empty($nodeOptions['password'])) - throw new Exception('Username and password are required.'); - - $this->syncRequest(new Request\AuthResponse($nodeOptions['username'], $nodeOptions['password'])); - } - - if (!empty($this->_keyspace)) - $this->syncRequest(new Request\Query("USE {$this->_keyspace};")); - - return true; - } + /** + * Connect to database + * @throws Exception + * @return bool + */ + public function connect() { + if ($this->_node !== null) + return true; + + $this->_connect(); + + $response = $this->syncRequest(new Request\Startup($this->_options)); + + if ($response instanceof Response\Authenticate){ + $nodeOptions = $this->_node->getOptions(); + + if (empty($nodeOptions['username']) || empty($nodeOptions['password'])) + throw new Exception('Username and password are required.'); + + $this->syncRequest(new Request\AuthResponse($nodeOptions['username'], $nodeOptions['password'])); + } + + if (!empty($this->_keyspace)) + $this->syncRequest(new Request\Query("USE {$this->_keyspace};")); + + return true; + } - /** - * @param Request\Request $request - * @throws Exception - * @return Response\Response - */ - public function syncRequest(Request\Request $request) { - if ($this->_node === null) - $this->connect(); - - $this->_node->write($request->__toString()); - - $response = $this->getResponse(); - - if ($response instanceof Response\Error) - throw $response->getException(); - - return $response; - } - - /** - * - * @param Request\Request $request - * @return Statement - */ - public function asyncRequest(Request\Request $request) { - if ($this->_node === null) - $this->connect(); - - $streamId = $this->_getNewStreamId(); - $request->setStream($streamId); - - $this->_node->write($request->__toString()); - - return $this->_statements[$streamId] = new Statement($this, $streamId); - } + /** + * @param Request\Request $request + * @throws Exception + * @return Response\Response + */ + public function syncRequest(Request\Request $request) { + if ($this->_node === null) + $this->connect(); + + $this->_node->write($request->__toString()); + + $response = $this->getResponse(); + + if ($response instanceof Response\Error) + throw $response->getException(); + + return $response; + } + + /** + * + * @param Request\Request $request + * @return Statement + */ + public function asyncRequest(Request\Request $request) { + if ($this->_node === null) + $this->connect(); + + $streamId = $this->_getNewStreamId(); + $request->setStream($streamId); + + $this->_node->write($request->__toString()); + + return $this->_statements[$streamId] = new Statement($this, $streamId); + } - /** - * - * @throws Exception - * @return int - */ - protected function _getNewStreamId(){ - if ($this->_lastStreamId < 32767) - return ++$this->_lastStreamId; - - while ($this->_recycledStreams->isEmpty()){ - $this->_getResponse(); - } - - return $this->_recycledStreams->dequeue(); - } - - /***** Shorthand Methods ******/ - /** - * - * @param string $cql - * @throws Exception - * @return array - */ - public function prepare($cql) { - $response = $this->syncRequest(new Request\Prepare($cql)); - - return $response->getData(); - } - - /** - * - * @param string $queryId - * @param array $values - * @param int $consistency - * @param array $options - * @throws Exception - * @return Response\Response - */ - public function executeSync($queryId, array $values = [], $consistency = null, array $options = []){ - $request = new Request\Execute($queryId, $values, $consistency === null ? $this->_consistency : $consistency, $options); - - return $this->syncRequest($request); - } - - /** - * - * @param string $queryId - * @param array $values - * @param int $consistency - * @param array $options - * @throws Exception - * @return Statement - */ - public function executeAsync($queryId, array $values = [], $consistency = null, array $options = []){ - $request = new Request\Execute($queryId, $values, $consistency === null ? $this->_consistency : $consistency, $options); - - return $this->asyncRequest($request); - } - - /** - * - * @param string $cql - * @param array $values - * @param int $consistency - * @param array $options - * @throws Exception - * @return Response\Response - */ - public function querySync($cql, array $values = [], $consistency = null, array $options = []){ - $request = new Request\Query($cql, $values, $consistency === null ? $this->_consistency : $consistency, $options); + /** + * + * @throws Exception + * @return int + */ + protected function _getNewStreamId(){ + if ($this->_lastStreamId < 32767) + return ++$this->_lastStreamId; + + while ($this->_recycledStreams->isEmpty()){ + $this->_getResponse(); + } + + return $this->_recycledStreams->dequeue(); + } + + /***** Shorthand Methods ******/ + /** + * + * @param string $cql + * @throws Exception + * @return array + */ + public function prepare($cql) { + $response = $this->syncRequest(new Request\Prepare($cql)); + + return $response->getData(); + } + + /** + * + * @param string $queryId + * @param array $values + * @param int $consistency + * @param array $options + * @throws Exception + * @return Response\Response + */ + public function executeSync($queryId, array $values = [], $consistency = null, array $options = []){ + $request = new Request\Execute($queryId, $values, $consistency === null ? $this->_consistency : $consistency, $options); + + return $this->syncRequest($request); + } + + /** + * + * @param string $queryId + * @param array $values + * @param int $consistency + * @param array $options + * @throws Exception + * @return Statement + */ + public function executeAsync($queryId, array $values = [], $consistency = null, array $options = []){ + $request = new Request\Execute($queryId, $values, $consistency === null ? $this->_consistency : $consistency, $options); + + return $this->asyncRequest($request); + } + + /** + * + * @param string $cql + * @param array $values + * @param int $consistency + * @param array $options + * @throws Exception + * @return Response\Response + */ + public function querySync($cql, array $values = [], $consistency = null, array $options = []){ + $request = new Request\Query($cql, $values, $consistency === null ? $this->_consistency : $consistency, $options); - return $this->syncRequest($request); - } - - /** - * - * @param string $cql - * @param array $values - * @param int $consistency - * @param array $options - * @throws Exception - * @return Statement - */ - public function queryAsync($cql, array $values = [], $consistency = null, array $options = []){ - $request = new Request\Query($cql, $values, $consistency === null ? $this->_consistency : $consistency, $options); + return $this->syncRequest($request); + } + + /** + * + * @param string $cql + * @param array $values + * @param int $consistency + * @param array $options + * @throws Exception + * @return Statement + */ + public function queryAsync($cql, array $values = [], $consistency = null, array $options = []){ + $request = new Request\Query($cql, $values, $consistency === null ? $this->_consistency : $consistency, $options); - return $this->asyncRequest($request); - } - - /** - * @param string $keyspace - * @throws Exception - * @return Response\Result - */ - public function setKeyspace($keyspace) { - $this->_keyspace = $keyspace; - - if ($this->_node === null) - return; - - return $this->syncRequest(new Request\Query("USE {$this->_keyspace};")); - } - - /** - * @param int $consistency - */ - public function setConsistency($consistency){ - $this->_consistency = $consistency; - } + return $this->asyncRequest($request); + } + + /** + * @param string $keyspace + * @throws Exception + * @return Response\Result + */ + public function setKeyspace($keyspace) { + $this->_keyspace = $keyspace; + + if ($this->_node === null) + return; + + return $this->syncRequest(new Request\Query("USE {$this->_keyspace};")); + } + + /** + * @param int $consistency + */ + public function setConsistency($consistency){ + $this->_consistency = $consistency; + } } diff --git a/src/Response/ProgressiveStreamReader.php b/src/Response/ProgressiveStreamReader.php new file mode 100644 index 0000000..2155eec --- /dev/null +++ b/src/Response/ProgressiveStreamReader.php @@ -0,0 +1,42 @@ +data = $data; + $this->dataLength = strlen($data); + } + + public function setSource($source){ + $this->source = $source; + } + + /** + * Read data from stream. + * + * NOTICE When $this->offset == strlen($this->data), substr() will return false. You'd better avoid call read() when $length == 0. + * + * @param int $length $length should be > 0. + * @return string + */ + protected function read($length) { + while($this->dataLength < $this->offset + $length){ + if ($this->source === null) + throw new Exception('The response is incomplete, or types expectation mismatch.'); + + $this->data .= $received = $this->source->readOnce($this->offset + $length - $this->dataLength); + $this->dataLength += strlen($received); + } + + $output = substr($this->data, $this->offset, $length); + $this->offset += $length; + return $output; + } +} diff --git a/src/Response/StreamReader.php b/src/Response/StreamReader.php index b8b12f3..51f4dca 100644 --- a/src/Response/StreamReader.php +++ b/src/Response/StreamReader.php @@ -5,38 +5,18 @@ class StreamReader { - /** - */ - protected $source; - /** * @var string */ - protected $data = ''; + protected $data; /** * @var int */ protected $offset = 0; - protected $dataLength = 0; - - protected $totalLength = 0; - - public function __construct($totalLength){ - $this->totalLength = $totalLength; - } - - public function setSource($source){ - $this->source = $source; - } - - public static function createFromData($data){ - $length = strlen($data); - $stream = new self($length); - $stream->data = $data; - $stream->dataLength = $length; - return $stream; + public function __construct($data){ + $this->data = $data; } /** @@ -48,11 +28,6 @@ public static function createFromData($data){ * @return string */ protected function read($length) { - while($this->dataLength < $this->offset + $length){ - $this->data .= $received = $this->source->readOnce($this->totalLength - $this->dataLength); - $this->dataLength += strlen($received); - } - $output = substr($this->data, $this->offset, $length); $this->offset += $length; return $output; diff --git a/src/Type/CollectionList.php b/src/Type/CollectionList.php index 117176c..6f049bb 100644 --- a/src/Type/CollectionList.php +++ b/src/Type/CollectionList.php @@ -32,6 +32,6 @@ public static function binary($value, array $definition){ } public static function parse($binary, array $definition){ - return \Cassandra\Response\StreamReader::createFromData($binary)->readList($definition); + return (new \Cassandra\Response\StreamReader($binary))->readList($definition); } } diff --git a/src/Type/CollectionMap.php b/src/Type/CollectionMap.php index 4e713fb..dea4ac7 100644 --- a/src/Type/CollectionMap.php +++ b/src/Type/CollectionMap.php @@ -44,6 +44,6 @@ public static function binary($value, array $definition){ } public static function parse($binary, array $definition){ - return \Cassandra\Response\StreamReader::createFromData($binary)->readMap($definition); + return (new \Cassandra\Response\StreamReader($binary))->readMap($definition); } } diff --git a/src/Type/Tuple.php b/src/Type/Tuple.php index 50629d0..411fea9 100644 --- a/src/Type/Tuple.php +++ b/src/Type/Tuple.php @@ -45,6 +45,6 @@ public static function binary($value, array $definition){ * @return array */ public static function parse($binary, array $definition){ - return \Cassandra\Response\StreamReader::createFromData($binary)->readTuple($definition); + return (new \Cassandra\Response\StreamReader($binary))->readTuple($definition); } }