Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Bug]: Stream client gets stuck when sending subscribe/unsubscribe messages #491

Open
2 tasks done
bulletbs opened this issue Jul 26, 2024 · 4 comments
Open
2 tasks done

Comments

@bulletbs
Copy link

bulletbs commented Jul 26, 2024

Is there an existing issue for this?

  • I have searched the existing issues

Current Behavior

I was trying to manage my symbols list in CryptoDataStream client.
But it works fine only after the first subscription message.
After the second message it freezes without any errors or warnings and no quotes comes after that from the server.

Same time I did check the console WS client websocat and subscribe/unsubscribe command works perfectly fine there.

Expected Behavior

It would be good to be able to manage the quotes symbols list or any other data types. To subscribe and unsubscribe without any issues.

SDK Version I encountered this issue in

alpaca-py==0.28.0

Steps To Reproduce

1. Run this script with your API_KEY and API_SECRET:
# ------------
import asyncio
import logging
from alpaca.data.live import CryptoDataStream

logging.basicConfig(filename='wsprices.log', level=logging.DEBUG)

async def handler(data):
    print(data)


client = CryptoDataStream(<API_KEY>, <API_SECRET>)


async def manage_socket():
    try:
        await asyncio.sleep(5)
        print('subscribe ETH/USD')
        client.subscribe_quotes(handler, 'ETH/USD')
        await asyncio.sleep(5)
        print("subscribe LINK/USD")
        client.subscribe_quotes(handler, 'LINK/USD')
        await asyncio.sleep(5)
        print("subscribe TON/USD")
        client.subscribe_quotes(handler, 'TON/USD')
        await asyncio.sleep(5)
        print('end')
    except Exception as e:
        print(f"Error in management: {e}")


async def start_server():
    await asyncio.gather(
        asyncio.create_task(client._run_forever())
        asyncio.create_task(manage_socket()),
    )

if __name__ == '__main__':
    asyncio.run(start_server())



2. You will see only one subscription message and this is the end.

3. If you subscribe only 1 time - it will work fine until you send next message

async def manage_socket():
        await asyncio.sleep(5)
        print('subscribe ETH/USD')
        client.subscribe_quotes(handler, 'ETH/USD')
        print('end')

Filled out the Steps to Reproduce section?

  • I have entered valid steps to reproduce my issue or have attached a minimally reproducible case in code that shows my issue happening; and understand that without this my issue will be flagged as invalid and closed after 30 days.

Anything else?

Here is my log from Logging, after client is stuck:
INFO:alpaca.data.live.websocket:started data stream
INFO:alpaca.data.live.websocket:starting data websocket connection
INFO:alpaca.data.live.websocket:connecting to wss://stream.data.alpaca.markets/v1beta3/crypto/us
DEBUG:websockets.client:= connection is CONNECTING
DEBUG:websockets.client:> GET /v1beta3/crypto/us HTTP/1.1
DEBUG:websockets.client:> Host: stream.data.alpaca.markets
DEBUG:websockets.client:> Upgrade: websocket
DEBUG:websockets.client:> Connection: Upgrade
DEBUG:websockets.client:> Sec-WebSocket-Key: VmQtSjKWRGZM63YE+e2C5A==
DEBUG:websockets.client:> Sec-WebSocket-Version: 13
DEBUG:websockets.client:> Sec-WebSocket-Extensions: permessage-deflate; client_max_window_bits
DEBUG:websockets.client:> Content-Type: application/msgpack
DEBUG:websockets.client:> User-Agent: APCA-PY/0.28.0
DEBUG:websockets.client:< HTTP/1.1 101 Switching Protocols
DEBUG:websockets.client:< Date: Fri, 26 Jul 2024 14:50:30 GMT
DEBUG:websockets.client:< Connection: upgrade
DEBUG:websockets.client:< Upgrade: websocket
DEBUG:websockets.client:< Sec-WebSocket-Accept: CJRRxlvnS9ajFKdscEU42ZGkg0U=
DEBUG:websockets.client:< Strict-Transport-Security: max-age=31536000; includeSubDomains
DEBUG:websockets.client:= connection is OPEN
DEBUG:websockets.client:< BINARY 91 82 a1 54 a7 73 75 63 63 65 73 73 a3 6d 73 67 ... 6f 6e 6e 65 63 74 65 64 [26 bytes]
DEBUG:websockets.client:> BINARY 83 a6 61 63 74 69 6f 6e a4 61 75 74 68 a3 6b 65 ... 33 68 36 74 45 59 46 55 [87 bytes]
DEBUG:websockets.client:< BINARY 91 82 a1 54 a7 73 75 63 63 65 73 73 a3 6d 73 67 ... 6e 74 69 63 61 74 65 64 [30 bytes]
INFO:alpaca.data.live.websocket:connected to wss://stream.data.alpaca.markets/v1beta3/crypto/us
DEBUG:websockets.client:> BINARY 82 a6 71 75 6f 74 65 73 91 a7 45 54 48 2f 55 53 ... 75 62 73 63 72 69 62 65 [34 bytes, continued]
DEBUG:websockets.client:> CONT '' [0 bytes]
DEBUG:websockets.client:< BINARY 91 87 a1 54 ac 73 75 62 73 63 72 69 70 74 69 6f ... 69 6c 79 42 61 72 73 90 [83 bytes]
INFO:alpaca.data.live.websocket:subscribed to quotes: ['ETH/USD']

@hiohiohio
Copy link
Contributor

hiohiohio commented Jul 29, 2024

@bulletbs seems alpaca-py does not support calling client.subscribe_*() while client.run() as you can see asyncio.run(self._run_forever()) in the client.run(). And for your case since you called client._run_forever() directly which is already unsupported operation of alpaca-py though, asyncio.run_coroutine_threadsafe() in client._subscribe().
edit: I realized that this may be possible with multi process/treads. but sorry, I haven't tried.

If you really want to achieve, you could call clinet._send_subscribe_msg() instead. But as you can see, it has _ prefix in the method, therefore, alpaca-py could change it at any version without any notification. Please be aware of this possibility of change in the future version of alpaca-py.

async def manage_socket():
    try:
        symbols = ["ETH/USD", "LINK/USD", "TON/USD"]
        for symbol in symbols:
            await asyncio.sleep(5)
            print(f'subscribe {symbol}')
            client._handlers["quotes"][symbol] = handler
            if client._running:
                await client._send_subscribe_msg()
        print('end')
    except Exception as e:
        print(f"Error in management: {e}")

@lacabra
Copy link

lacabra commented Aug 2, 2024

Thanks @hiohiohio for your insights on this issue, much appreciated. As I am reading also related Issue #476 where you suggested the use of _run_forever() through asyncio's create_task, gather and run to manage multiple websockets datastreams; but now you mention that alpaca-py does not support calling client.subscribe_*() while client.run().

I want to ask if you would revise your answer to both issues 476 and 491 together, namely:

What is the recommended way to start/manage two datastreams AND be able to dynamically/programmatically subscribe and unsubscribe symbols while the program is running?

Thank you! 🙏

@hiohiohio
Copy link
Contributor

I want to ask if you would revise your answer to both issues 476 and 491 together, namely:

@lacabra You are right. I should have mentioned the calling _ prefix function is unsupported operation of alpaca-py. I have added this into the comment.

What is the recommended way to start/manage two datastreams AND be able to dynamically/programmatically subscribe and unsubscribe symbols while the program is running?

This could be a future request of alpaca-py. By looking the current code of Alpaca-py, current implementation expects subscribing required symbols at the beginning and handle everything in the passed handler. Seems multi datastreams (i.e. equity/crypto/options) in a single thread are not considered or different thread/process. My feeling is rewriting some to cover the use-cases. However, it may take a some time. Therefore, I wrote the unsupported operations as a quick/optimized resolution. But again, in the process of rewriting in the future, even though I try to keep compatibility as much as possible, it could be possible to change structure/name of _ prefix functions or even non _ prefix functions if required.

@sakul95
Copy link

sakul95 commented Nov 27, 2024

Hey there - I also ran into this issue.

I also called run_forever directly as when I use run as intended, asyncio returns an error "was never awaited".
My current hotfix is creating a task to run the subscribe/unsubscribe coroutine instead of run_coroutine_threadsafe.

def _unsubscribe(self, channel: str, symbols: List[str]) -> None:
    if self._running:
        task = self._loop.create_task(self._send_unsubscribe_msg(channel, symbols))
        # asyncio.run_coroutine_threadsafe(
        #     self._send_unsubscribe_msg(channel, symbols), self._loop
        # ).result()
    ...

Of course same for the _subscribe function.

This allows me to dynamically subscribe/unsubscribe during runtime.

I am not happy to interfere with the library to make this work, so I appreciate any recommondations you might have.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

4 participants