From 4fc6a0c2b8573b070c71bfea06cb9ad9e39882d9 Mon Sep 17 00:00:00 2001 From: Dave McKay Date: Tue, 12 Nov 2024 09:54:24 +0000 Subject: [PATCH] dask can serialise swift!! --- csd3-side/test_dask.ipynb | 597 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 597 insertions(+) diff --git a/csd3-side/test_dask.ipynb b/csd3-side/test_dask.ipynb index e69de29..1fdaa83 100644 --- a/csd3-side/test_dask.ipynb +++ b/csd3-side/test_dask.ipynb @@ -0,0 +1,597 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 1, + "metadata": {}, + "outputs": [], + "source": [ + "from dask.distributed import Client, WorkerPlugin\n", + "from psutil import virtual_memory as mem\n", + "import bucket_manager.bucket_manager as bm\n", + "import sys" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [], + "source": [ + "n_workers=1\n", + "threads_per_worker=1\n", + "mem_per_worker=mem().total//n_workers//2\n", + "client = Client(n_workers=n_workers,threads_per_worker=threads_per_worker,memory_limit=mem_per_worker)" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "
\n", + "
\n", + "

Client

\n", + "

Client-ce240754-a0db-11ef-9bb7-00155dce21ad

\n", + " \n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "\n", + "
Connection method: Cluster objectCluster type: distributed.LocalCluster
\n", + " Dashboard: http://127.0.0.1:8787/status\n", + "
\n", + "\n", + " \n", + "\n", + " \n", + "
\n", + "

Cluster Info

\n", + "
\n", + "
\n", + "
\n", + "
\n", + "

LocalCluster

\n", + "

ed0592f7

\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "\n", + "\n", + " \n", + "
\n", + " Dashboard: http://127.0.0.1:8787/status\n", + " \n", + " Workers: 1\n", + "
\n", + " Total threads: 1\n", + " \n", + " Total memory: 3.72 GiB\n", + "
Status: runningUsing processes: True
\n", + "\n", + "
\n", + " \n", + "

Scheduler Info

\n", + "
\n", + "\n", + "
\n", + "
\n", + "
\n", + "
\n", + "

Scheduler

\n", + "

Scheduler-87a3b3c7-54ae-4a35-a134-d7a4bf427c47

\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
\n", + " Comm: tcp://127.0.0.1:37247\n", + " \n", + " Workers: 1\n", + "
\n", + " Dashboard: http://127.0.0.1:8787/status\n", + " \n", + " Total threads: 1\n", + "
\n", + " Started: Just now\n", + " \n", + " Total memory: 3.72 GiB\n", + "
\n", + "
\n", + "
\n", + "\n", + "
\n", + " \n", + "

Workers

\n", + "
\n", + "\n", + " \n", + "
\n", + "
\n", + "
\n", + "
\n", + " \n", + "

Worker: 0

\n", + "
\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "\n", + " \n", + "\n", + " \n", + "\n", + "
\n", + " Comm: tcp://127.0.0.1:33445\n", + " \n", + " Total threads: 1\n", + "
\n", + " Dashboard: http://127.0.0.1:41851/status\n", + " \n", + " Memory: 3.72 GiB\n", + "
\n", + " Nanny: tcp://127.0.0.1:42775\n", + "
\n", + " Local directory: /tmp/dask-scratch-space/worker-sjku42c4\n", + "
\n", + "
\n", + "
\n", + "
\n", + " \n", + "\n", + "
\n", + "
\n", + "\n", + "
\n", + "
\n", + "
\n", + "
\n", + " \n", + "\n", + "
\n", + "
" + ], + "text/plain": [ + "" + ] + }, + "execution_count": 3, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "client" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": {}, + "outputs": [], + "source": [ + "import os\n", + "import json\n", + "kp = '~/lsst_keys.json'\n", + "with open(os.path.expanduser(kp), 'r') as kpf:\n", + " kd = json.load(kpf)\n", + " access_key = kd['access_key']\n", + " secret_key = kd['secret_key']\n", + "s3_host = 'echo.stfc.ac.uk'" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "/home/dave/miniconda3/envs/lsst-uk/lib/python3.10/site-packages/urllib3/connectionpool.py:1100: InsecureRequestWarning: Unverified HTTPS request is being made to host 'echo.stfc.ac.uk'. Adding certificate verification is strongly advised. See: https://urllib3.readthedocs.io/en/latest/advanced-usage.html#tls-warnings\n", + " warnings.warn(\n" + ] + } + ], + "source": [ + "s3_host = 'echo.stfc.ac.uk'\n", + "# try:\n", + "# keys = bm.get_keys()\n", + "# except KeyError as e:\n", + "# print(f'KeyError {e}', file=sys.stderr)\n", + "# sys.exit()\n", + "# access_key = keys['access_key']\n", + "# secret_key = keys['secret_key']\n", + "\n", + "s3 = bm.get_resource(access_key, secret_key, s3_host)\n", + "bucket_list = bm.bucket_list(s3)" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "['DRP',\n", + " 'LSST-IR-FUSION',\n", + " 'LSST-IR-FUSION-Butlers',\n", + " 'LSST-IR-FUSION-Butlers-del',\n", + " 'LSST-IR-FUSION-TESTSTRATEGY',\n", + " 'LSST-IR-FUSION-rdsip005',\n", + " 'LSST-IR-FUSION-test',\n", + " 'LSST-IR-FUSION-testtree',\n", + " 'LSST-IR-FUSION-testtreec',\n", + " 'LSST-IR-FUSION-testtreecs',\n", + " 'LSST-IR-FUSION_gen3_conversion',\n", + " 'dmu4',\n", + " 'lsst-dac',\n", + " 'lsst-drp-config',\n", + " 'lsst-test',\n", + " 'lsst-test-3',\n", + " 'lsst-test2']" + ] + }, + "execution_count": 6, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "bucket_list" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "metadata": {}, + "outputs": [], + "source": [ + "bucket_name = 'LSST-IR-FUSION-testplugin'" + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "metadata": {}, + "outputs": [], + "source": [ + "class S3WorkerPlugin(WorkerPlugin):\n", + " def __init__(self, access_key, secret_key, s3_host):\n", + " self.access_key = access_key\n", + " self.secret_key = secret_key\n", + " self.s3_host = s3_host\n", + " # self.s3 = bm.get_resource(self.access_key, \n", + " # self.secret_key, \n", + " # self.s3_host)\n", + "\n", + " def setup(self, worker):\n", + " self.worker = worker\n", + " self.s3 = bm.get_resource(self.access_key, \n", + " self.secret_key, \n", + " self.s3_host)\n", + "\n", + "def upload_to_s3(file_path, bucket_name, key):\n", + " s3.upload_file(file_path, bucket_name, key)\n", + "\n", + "def list_buckets():\n", + " return bm.bucket_list(s3)\n", + "\n" + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "{'tcp://127.0.0.1:33445': {'status': 'OK'}}" + ] + }, + "execution_count": 9, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "# Register the plugin with the Dask client\n", + "client.register_plugin(S3WorkerPlugin(access_key,secret_key,s3_host))\n", + "\n" + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "2024-11-12 09:52:20,848 - distributed.protocol.pickle - ERROR - Failed to serialize \n", + " 0. 140160941370112\n", + ">.\n", + "Traceback (most recent call last):\n", + " File \"/home/dave/miniconda3/envs/lsst-uk/lib/python3.10/site-packages/distributed/protocol/pickle.py\", line 77, in dumps\n", + " result = cloudpickle.dumps(x, **dump_kwargs)\n", + " File \"/home/dave/miniconda3/envs/lsst-uk/lib/python3.10/site-packages/cloudpickle/cloudpickle.py\", line 1479, in dumps\n", + " cp.dump(obj)\n", + " File \"/home/dave/miniconda3/envs/lsst-uk/lib/python3.10/site-packages/cloudpickle/cloudpickle.py\", line 1245, in dump\n", + " return super().dump(obj)\n", + "TypeError: cannot pickle '_thread.lock' object\n", + "\n", + "During handling of the above exception, another exception occurred:\n", + "\n", + "Traceback (most recent call last):\n", + " File \"/home/dave/miniconda3/envs/lsst-uk/lib/python3.10/site-packages/distributed/protocol/pickle.py\", line 81, in dumps\n", + " result = cloudpickle.dumps(x, **dump_kwargs)\n", + " File \"/home/dave/miniconda3/envs/lsst-uk/lib/python3.10/site-packages/cloudpickle/cloudpickle.py\", line 1479, in dumps\n", + " cp.dump(obj)\n", + " File \"/home/dave/miniconda3/envs/lsst-uk/lib/python3.10/site-packages/cloudpickle/cloudpickle.py\", line 1245, in dump\n", + " return super().dump(obj)\n", + "TypeError: cannot pickle '_thread.lock' object\n" + ] + }, + { + "ename": "TypeError", + "evalue": "('Could not serialize object of type HighLevelGraph', '\\n 0. 140160941370112\\n>')", + "output_type": "error", + "traceback": [ + "\u001b[0;31m---------------------------------------------------------------------------\u001b[0m", + "\u001b[0;31mTypeError\u001b[0m Traceback (most recent call last)", + "File \u001b[0;32m~/miniconda3/envs/lsst-uk/lib/python3.10/site-packages/distributed/protocol/pickle.py:77\u001b[0m, in \u001b[0;36mdumps\u001b[0;34m(x, buffer_callback, protocol)\u001b[0m\n\u001b[1;32m 76\u001b[0m buffers\u001b[38;5;241m.\u001b[39mclear()\n\u001b[0;32m---> 77\u001b[0m result \u001b[38;5;241m=\u001b[39m \u001b[43mcloudpickle\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mdumps\u001b[49m\u001b[43m(\u001b[49m\u001b[43mx\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;241;43m*\u001b[39;49m\u001b[38;5;241;43m*\u001b[39;49m\u001b[43mdump_kwargs\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 78\u001b[0m \u001b[38;5;28;01mexcept\u001b[39;00m \u001b[38;5;167;01mException\u001b[39;00m:\n", + "File \u001b[0;32m~/miniconda3/envs/lsst-uk/lib/python3.10/site-packages/cloudpickle/cloudpickle.py:1479\u001b[0m, in \u001b[0;36mdumps\u001b[0;34m(obj, protocol, buffer_callback)\u001b[0m\n\u001b[1;32m 1478\u001b[0m cp \u001b[38;5;241m=\u001b[39m Pickler(file, protocol\u001b[38;5;241m=\u001b[39mprotocol, buffer_callback\u001b[38;5;241m=\u001b[39mbuffer_callback)\n\u001b[0;32m-> 1479\u001b[0m \u001b[43mcp\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mdump\u001b[49m\u001b[43m(\u001b[49m\u001b[43mobj\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 1480\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m file\u001b[38;5;241m.\u001b[39mgetvalue()\n", + "File \u001b[0;32m~/miniconda3/envs/lsst-uk/lib/python3.10/site-packages/cloudpickle/cloudpickle.py:1245\u001b[0m, in \u001b[0;36mPickler.dump\u001b[0;34m(self, obj)\u001b[0m\n\u001b[1;32m 1244\u001b[0m \u001b[38;5;28;01mtry\u001b[39;00m:\n\u001b[0;32m-> 1245\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m \u001b[38;5;28;43msuper\u001b[39;49m\u001b[43m(\u001b[49m\u001b[43m)\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mdump\u001b[49m\u001b[43m(\u001b[49m\u001b[43mobj\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 1246\u001b[0m \u001b[38;5;28;01mexcept\u001b[39;00m \u001b[38;5;167;01mRuntimeError\u001b[39;00m \u001b[38;5;28;01mas\u001b[39;00m e:\n", + "\u001b[0;31mTypeError\u001b[0m: cannot pickle '_thread.lock' object", + "\nDuring handling of the above exception, another exception occurred:\n", + "\u001b[0;31mTypeError\u001b[0m Traceback (most recent call last)", + "File \u001b[0;32m~/miniconda3/envs/lsst-uk/lib/python3.10/site-packages/distributed/protocol/serialize.py:353\u001b[0m, in \u001b[0;36mserialize\u001b[0;34m(x, serializers, on_error, context, iterate_collection)\u001b[0m\n\u001b[1;32m 352\u001b[0m \u001b[38;5;28;01mtry\u001b[39;00m:\n\u001b[0;32m--> 353\u001b[0m header, frames \u001b[38;5;241m=\u001b[39m \u001b[43mdumps\u001b[49m\u001b[43m(\u001b[49m\u001b[43mx\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mcontext\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43mcontext\u001b[49m\u001b[43m)\u001b[49m \u001b[38;5;28;01mif\u001b[39;00m wants_context \u001b[38;5;28;01melse\u001b[39;00m dumps(x)\n\u001b[1;32m 354\u001b[0m header[\u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mserializer\u001b[39m\u001b[38;5;124m\"\u001b[39m] \u001b[38;5;241m=\u001b[39m name\n", + "File \u001b[0;32m~/miniconda3/envs/lsst-uk/lib/python3.10/site-packages/distributed/protocol/serialize.py:76\u001b[0m, in \u001b[0;36mpickle_dumps\u001b[0;34m(x, context)\u001b[0m\n\u001b[1;32m 74\u001b[0m writeable\u001b[38;5;241m.\u001b[39mappend(\u001b[38;5;129;01mnot\u001b[39;00m f\u001b[38;5;241m.\u001b[39mreadonly)\n\u001b[0;32m---> 76\u001b[0m frames[\u001b[38;5;241m0\u001b[39m] \u001b[38;5;241m=\u001b[39m \u001b[43mpickle\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mdumps\u001b[49m\u001b[43m(\u001b[49m\n\u001b[1;32m 77\u001b[0m \u001b[43m \u001b[49m\u001b[43mx\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 78\u001b[0m \u001b[43m \u001b[49m\u001b[43mbuffer_callback\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43mbuffer_callback\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 79\u001b[0m \u001b[43m \u001b[49m\u001b[43mprotocol\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43mcontext\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mget\u001b[49m\u001b[43m(\u001b[49m\u001b[38;5;124;43m\"\u001b[39;49m\u001b[38;5;124;43mpickle-protocol\u001b[39;49m\u001b[38;5;124;43m\"\u001b[39;49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;28;43;01mNone\u001b[39;49;00m\u001b[43m)\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;28;43;01mif\u001b[39;49;00m\u001b[43m \u001b[49m\u001b[43mcontext\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;28;43;01melse\u001b[39;49;00m\u001b[43m \u001b[49m\u001b[38;5;28;43;01mNone\u001b[39;49;00m\u001b[43m,\u001b[49m\n\u001b[1;32m 80\u001b[0m \u001b[43m\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 81\u001b[0m header \u001b[38;5;241m=\u001b[39m {\n\u001b[1;32m 82\u001b[0m \u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mserializer\u001b[39m\u001b[38;5;124m\"\u001b[39m: \u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mpickle\u001b[39m\u001b[38;5;124m\"\u001b[39m,\n\u001b[1;32m 83\u001b[0m \u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mwriteable\u001b[39m\u001b[38;5;124m\"\u001b[39m: \u001b[38;5;28mtuple\u001b[39m(writeable),\n\u001b[1;32m 84\u001b[0m }\n", + "File \u001b[0;32m~/miniconda3/envs/lsst-uk/lib/python3.10/site-packages/distributed/protocol/pickle.py:81\u001b[0m, in \u001b[0;36mdumps\u001b[0;34m(x, buffer_callback, protocol)\u001b[0m\n\u001b[1;32m 80\u001b[0m buffers\u001b[38;5;241m.\u001b[39mclear()\n\u001b[0;32m---> 81\u001b[0m result \u001b[38;5;241m=\u001b[39m \u001b[43mcloudpickle\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mdumps\u001b[49m\u001b[43m(\u001b[49m\u001b[43mx\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;241;43m*\u001b[39;49m\u001b[38;5;241;43m*\u001b[39;49m\u001b[43mdump_kwargs\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 82\u001b[0m \u001b[38;5;28;01mexcept\u001b[39;00m \u001b[38;5;167;01mException\u001b[39;00m:\n", + "File \u001b[0;32m~/miniconda3/envs/lsst-uk/lib/python3.10/site-packages/cloudpickle/cloudpickle.py:1479\u001b[0m, in \u001b[0;36mdumps\u001b[0;34m(obj, protocol, buffer_callback)\u001b[0m\n\u001b[1;32m 1478\u001b[0m cp \u001b[38;5;241m=\u001b[39m Pickler(file, protocol\u001b[38;5;241m=\u001b[39mprotocol, buffer_callback\u001b[38;5;241m=\u001b[39mbuffer_callback)\n\u001b[0;32m-> 1479\u001b[0m \u001b[43mcp\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mdump\u001b[49m\u001b[43m(\u001b[49m\u001b[43mobj\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 1480\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m file\u001b[38;5;241m.\u001b[39mgetvalue()\n", + "File \u001b[0;32m~/miniconda3/envs/lsst-uk/lib/python3.10/site-packages/cloudpickle/cloudpickle.py:1245\u001b[0m, in \u001b[0;36mPickler.dump\u001b[0;34m(self, obj)\u001b[0m\n\u001b[1;32m 1244\u001b[0m \u001b[38;5;28;01mtry\u001b[39;00m:\n\u001b[0;32m-> 1245\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m \u001b[38;5;28;43msuper\u001b[39;49m\u001b[43m(\u001b[49m\u001b[43m)\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mdump\u001b[49m\u001b[43m(\u001b[49m\u001b[43mobj\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 1246\u001b[0m \u001b[38;5;28;01mexcept\u001b[39;00m \u001b[38;5;167;01mRuntimeError\u001b[39;00m \u001b[38;5;28;01mas\u001b[39;00m e:\n", + "\u001b[0;31mTypeError\u001b[0m: cannot pickle '_thread.lock' object", + "\nThe above exception was the direct cause of the following exception:\n", + "\u001b[0;31mTypeError\u001b[0m Traceback (most recent call last)", + "Cell \u001b[0;32mIn[10], line 1\u001b[0m\n\u001b[0;32m----> 1\u001b[0m future \u001b[38;5;241m=\u001b[39m \u001b[43mclient\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43msubmit\u001b[49m\u001b[43m(\u001b[49m\u001b[43mlist_buckets\u001b[49m\u001b[43m)\u001b[49m\n", + "File \u001b[0;32m~/miniconda3/envs/lsst-uk/lib/python3.10/site-packages/distributed/client.py:1950\u001b[0m, in \u001b[0;36mClient.submit\u001b[0;34m(self, func, key, workers, resources, retries, priority, fifo_timeout, allow_other_workers, actor, actors, pure, *args, **kwargs)\u001b[0m\n\u001b[1;32m 1947\u001b[0m \u001b[38;5;28;01melse\u001b[39;00m:\n\u001b[1;32m 1948\u001b[0m dsk \u001b[38;5;241m=\u001b[39m {key: (func,) \u001b[38;5;241m+\u001b[39m \u001b[38;5;28mtuple\u001b[39m(args)}\n\u001b[0;32m-> 1950\u001b[0m futures \u001b[38;5;241m=\u001b[39m \u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43m_graph_to_futures\u001b[49m\u001b[43m(\u001b[49m\n\u001b[1;32m 1951\u001b[0m \u001b[43m \u001b[49m\u001b[43mdsk\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 1952\u001b[0m \u001b[43m \u001b[49m\u001b[43m[\u001b[49m\u001b[43mkey\u001b[49m\u001b[43m]\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 1953\u001b[0m \u001b[43m \u001b[49m\u001b[43mworkers\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43mworkers\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 1954\u001b[0m \u001b[43m \u001b[49m\u001b[43mallow_other_workers\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43mallow_other_workers\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 1955\u001b[0m \u001b[43m \u001b[49m\u001b[43minternal_priority\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43m{\u001b[49m\u001b[43mkey\u001b[49m\u001b[43m:\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;241;43m0\u001b[39;49m\u001b[43m}\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 1956\u001b[0m \u001b[43m \u001b[49m\u001b[43muser_priority\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43mpriority\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 1957\u001b[0m \u001b[43m \u001b[49m\u001b[43mresources\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43mresources\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 1958\u001b[0m \u001b[43m \u001b[49m\u001b[43mretries\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43mretries\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 1959\u001b[0m \u001b[43m \u001b[49m\u001b[43mfifo_timeout\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43mfifo_timeout\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 1960\u001b[0m \u001b[43m \u001b[49m\u001b[43mactors\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43mactor\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 1961\u001b[0m \u001b[43m\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 1963\u001b[0m logger\u001b[38;5;241m.\u001b[39mdebug(\u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mSubmit \u001b[39m\u001b[38;5;132;01m%s\u001b[39;00m\u001b[38;5;124m(...), \u001b[39m\u001b[38;5;132;01m%s\u001b[39;00m\u001b[38;5;124m\"\u001b[39m, funcname(func), key)\n\u001b[1;32m 1965\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m futures[key]\n", + "File \u001b[0;32m~/miniconda3/envs/lsst-uk/lib/python3.10/site-packages/distributed/client.py:3151\u001b[0m, in \u001b[0;36mClient._graph_to_futures\u001b[0;34m(self, dsk, keys, workers, allow_other_workers, internal_priority, user_priority, resources, retries, fifo_timeout, actors)\u001b[0m\n\u001b[1;32m 3148\u001b[0m \u001b[38;5;28;01mfrom\u001b[39;00m \u001b[38;5;21;01mdistributed\u001b[39;00m\u001b[38;5;21;01m.\u001b[39;00m\u001b[38;5;21;01mprotocol\u001b[39;00m \u001b[38;5;28;01mimport\u001b[39;00m serialize\n\u001b[1;32m 3149\u001b[0m \u001b[38;5;28;01mfrom\u001b[39;00m \u001b[38;5;21;01mdistributed\u001b[39;00m\u001b[38;5;21;01m.\u001b[39;00m\u001b[38;5;21;01mprotocol\u001b[39;00m\u001b[38;5;21;01m.\u001b[39;00m\u001b[38;5;21;01mserialize\u001b[39;00m \u001b[38;5;28;01mimport\u001b[39;00m ToPickle\n\u001b[0;32m-> 3151\u001b[0m header, frames \u001b[38;5;241m=\u001b[39m \u001b[43mserialize\u001b[49m\u001b[43m(\u001b[49m\u001b[43mToPickle\u001b[49m\u001b[43m(\u001b[49m\u001b[43mdsk\u001b[49m\u001b[43m)\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mon_error\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[38;5;124;43m\"\u001b[39;49m\u001b[38;5;124;43mraise\u001b[39;49m\u001b[38;5;124;43m\"\u001b[39;49m\u001b[43m)\u001b[49m\n\u001b[1;32m 3153\u001b[0m pickled_size \u001b[38;5;241m=\u001b[39m \u001b[38;5;28msum\u001b[39m(\u001b[38;5;28mmap\u001b[39m(nbytes, [header] \u001b[38;5;241m+\u001b[39m frames))\n\u001b[1;32m 3154\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m pickled_size \u001b[38;5;241m>\u001b[39m parse_bytes(\n\u001b[1;32m 3155\u001b[0m dask\u001b[38;5;241m.\u001b[39mconfig\u001b[38;5;241m.\u001b[39mget(\u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mdistributed.admin.large-graph-warning-threshold\u001b[39m\u001b[38;5;124m\"\u001b[39m)\n\u001b[1;32m 3156\u001b[0m ):\n", + "File \u001b[0;32m~/miniconda3/envs/lsst-uk/lib/python3.10/site-packages/distributed/protocol/serialize.py:379\u001b[0m, in \u001b[0;36mserialize\u001b[0;34m(x, serializers, on_error, context, iterate_collection)\u001b[0m\n\u001b[1;32m 377\u001b[0m \u001b[38;5;28;01mexcept\u001b[39;00m \u001b[38;5;167;01mException\u001b[39;00m:\n\u001b[1;32m 378\u001b[0m \u001b[38;5;28;01mraise\u001b[39;00m \u001b[38;5;167;01mTypeError\u001b[39;00m(msg) \u001b[38;5;28;01mfrom\u001b[39;00m \u001b[38;5;21;01mexc\u001b[39;00m\n\u001b[0;32m--> 379\u001b[0m \u001b[38;5;28;01mraise\u001b[39;00m \u001b[38;5;167;01mTypeError\u001b[39;00m(msg, str_x) \u001b[38;5;28;01mfrom\u001b[39;00m \u001b[38;5;21;01mexc\u001b[39;00m\n\u001b[1;32m 380\u001b[0m \u001b[38;5;28;01melse\u001b[39;00m: \u001b[38;5;66;03m# pragma: nocover\u001b[39;00m\n\u001b[1;32m 381\u001b[0m \u001b[38;5;28;01mraise\u001b[39;00m \u001b[38;5;167;01mValueError\u001b[39;00m(\u001b[38;5;124mf\u001b[39m\u001b[38;5;124m\"\u001b[39m\u001b[38;5;132;01m{\u001b[39;00mon_error\u001b[38;5;132;01m=}\u001b[39;00m\u001b[38;5;124m; expected \u001b[39m\u001b[38;5;124m'\u001b[39m\u001b[38;5;124mmessage\u001b[39m\u001b[38;5;124m'\u001b[39m\u001b[38;5;124m or \u001b[39m\u001b[38;5;124m'\u001b[39m\u001b[38;5;124mraise\u001b[39m\u001b[38;5;124m'\u001b[39m\u001b[38;5;124m\"\u001b[39m)\n", + "\u001b[0;31mTypeError\u001b[0m: ('Could not serialize object of type HighLevelGraph', '\\n 0. 140160941370112\\n>')" + ] + } + ], + "source": [ + "future = client.submit(list_buckets)" + ] + }, + { + "cell_type": "code", + "execution_count": 11, + "metadata": {}, + "outputs": [], + "source": [ + "# client.close()" + ] + }, + { + "cell_type": "code", + "execution_count": 12, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "'lsst:swift'" + ] + }, + "execution_count": 12, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "with open(os.path.expanduser('~/lsst-swift-credentials.json'), 'r') as swiftf:\n", + " swiftk = json.load(swiftf)\n", + " user = swiftk['user']\n", + " secret_key = swiftk['secret_key']\n", + "user" + ] + }, + { + "cell_type": "code", + "execution_count": 13, + "metadata": {}, + "outputs": [], + "source": [ + "# conn = bm.get_conn_swift(user, secret_key, s3_host)" + ] + }, + { + "cell_type": "code", + "execution_count": 14, + "metadata": {}, + "outputs": [], + "source": [ + "# conn" + ] + }, + { + "cell_type": "code", + "execution_count": 23, + "metadata": {}, + "outputs": [], + "source": [ + "import swiftclient\n", + "def get_conn(username, access_key, host):\n", + "\treturn swiftclient.Connection(\n", + " authurl = host,\n", + "\t\tuser = username,\n", + "\t\tkey = access_key,\n", + " insecure=True\n", + "\t)\n", + "conn = get_conn(user, secret_key, 'https://s3.echo.stfc.ac.uk/auth/1.0')\n", + "def list_containers(conn):\n", + " buckets = []\n", + " for container in conn.get_account()[1]:\n", + " buckets.append(container['name'])\n", + " return buckets" + ] + }, + { + "cell_type": "code", + "execution_count": 24, + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "/home/dave/miniconda3/envs/lsst-uk/lib/python3.10/site-packages/urllib3/connectionpool.py:1100: InsecureRequestWarning: Unverified HTTPS request is being made to host 's3.echo.stfc.ac.uk'. Adding certificate verification is strongly advised. See: https://urllib3.readthedocs.io/en/latest/advanced-usage.html#tls-warnings\n", + " warnings.warn(\n", + "/home/dave/miniconda3/envs/lsst-uk/lib/python3.10/site-packages/urllib3/connectionpool.py:1100: InsecureRequestWarning: Unverified HTTPS request is being made to host 's3.echo.stfc.ac.uk'. Adding certificate verification is strongly advised. See: https://urllib3.readthedocs.io/en/latest/advanced-usage.html#tls-warnings\n", + " warnings.warn(\n" + ] + } + ], + "source": [ + "f = client.submit(list_containers, conn)" + ] + }, + { + "cell_type": "code", + "execution_count": 25, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "['DRP',\n", + " 'LSST-IR-FUSION',\n", + " 'LSST-IR-FUSION-Butlers',\n", + " 'LSST-IR-FUSION-Butlers-del',\n", + " 'LSST-IR-FUSION-TESTSTRATEGY',\n", + " 'LSST-IR-FUSION-rdsip005',\n", + " 'LSST-IR-FUSION-test',\n", + " 'LSST-IR-FUSION-testtree',\n", + " 'LSST-IR-FUSION-testtreec',\n", + " 'LSST-IR-FUSION-testtreecs',\n", + " 'LSST-IR-FUSION_gen3_conversion',\n", + " 'dmu4',\n", + " 'lsst-dac',\n", + " 'lsst-drp-config',\n", + " 'lsst-test',\n", + " 'lsst-test-3',\n", + " 'lsst-test2']" + ] + }, + "execution_count": 25, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "f.result()" + ] + }, + { + "cell_type": "code", + "execution_count": 21, + "metadata": {}, + "outputs": [], + "source": [ + "buckets = client.gather(f)" + ] + }, + { + "cell_type": "code", + "execution_count": 22, + "metadata": {}, + "outputs": [], + "source": [ + "buckets" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "lsst-uk", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.10.13" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +}