Skip to content

local_cluster

DummyClient

Bases: object

Dask Client-like object that runs operations locally while exposing the dask Client API.

Source code in pyhdx/local_cluster.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
class DummyClient(object):
    """Dask ``Client``-like object that runs tasks locally and synchronously.

    Implements the subset of the dask ``Client`` API used for local operation
    (``submit``, ``map`` and ``gather``). Each task is executed immediately in
    the calling thread and its outcome is stored on an already-completed
    ``Future``. Mirroring a real dask ``Client``:

    * keyword arguments that only configure dask scheduling (``pure``, ``key``,
      ``workers``, ...) are accepted and ignored, while all other keyword
      arguments are forwarded to the task function;
    * an exception raised by the task function is stored on the returned future
      and re-raised by ``Future.result()`` / ``gather``, not by ``submit``/``map``.
    """

    # Keywords of ``Client.submit`` / ``Client.map`` that only control dask
    # scheduling; they have no meaning for local execution and must not reach
    # the task function.
    _DASK_KEYWORDS = frozenset(
        {
            "key",
            "workers",
            "resources",
            "retries",
            "priority",
            "fifo_timeout",
            "allow_other_workers",
            "actor",
            "actors",
            "pure",
            "batch_size",
        }
    )

    @staticmethod
    def _task_kwargs(kwargs: dict) -> dict:
        """Return ``kwargs`` without the dask scheduling-only keywords."""
        return {k: v for k, v in kwargs.items() if k not in DummyClient._DASK_KEYWORDS}

    @staticmethod
    def _completed_future(func: Callable, args: tuple, kwargs: dict) -> Future:
        """Run ``func(*args, **kwargs)`` now and return a finished future holding the outcome."""
        future = Future()
        try:
            result = func(*args, **kwargs)
        except Exception as err:
            # Like dask, surface task errors when the result is requested.
            future.set_exception(err)
        else:
            future.set_result(result)
        return future

    @staticmethod
    def submit(func: Callable, *args: Any, **kwargs) -> Future:
        """Execute ``func(*args, **kwargs)`` immediately.

        Args:
            func: Task function.
            *args: Positional arguments for ``func``.
            **kwargs: Keyword arguments for ``func``; dask scheduling keywords
                (e.g. ``pure``, ``key``) are ignored.

        Returns:
            A completed future holding the result or the raised exception.
        """
        return DummyClient._completed_future(func, args, DummyClient._task_kwargs(kwargs))

    @staticmethod
    def map(func: Callable, *iterables: Iterable, **kwargs) -> list[Future]:
        """Apply ``func`` element-wise over ``iterables`` (zipped, shortest wins).

        Args:
            func: Task function, called as ``func(*items, **kwargs)`` per element.
            *iterables: Iterables providing the positional arguments.
            **kwargs: Keyword arguments passed to every call; dask scheduling
                keywords (e.g. ``pure``, ``batch_size``) are ignored.

        Returns:
            One completed future per element, in input order.
        """
        task_kwargs = DummyClient._task_kwargs(kwargs)
        return [
            DummyClient._completed_future(func, items, task_kwargs)
            for items in zip(*iterables)
        ]

    @staticmethod
    def gather(futures) -> Any:
        """Return the result(s) of a single future or an iterable of futures.

        Raises:
            Exception: Whatever exception the corresponding task raised.
        """
        # A single future (anything with ``.result``) is unwrapped directly,
        # matching ``Client.gather(future)``.
        if hasattr(futures, "result"):
            return futures.result()
        return [future.result() for future in futures]

blocking_cluster()

Start a dask LocalCluster and block until interrupted.

Source code in pyhdx/local_cluster.py
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
def blocking_cluster():
    """Start a dask LocalCluster and block until interrupted.

    Command-line entry point. The scheduler port comes from ``-p/--port`` when
    given, otherwise from the port component of ``cfg.cluster.scheduler_address``.
    The number of workers comes from ``cfg.cluster.n_workers``. The function
    sleeps until a KeyboardInterrupt (Ctrl+C) arrives and then closes the cluster.

    Raises:
        OSError: If the LocalCluster cannot be started at the requested port.
    """
    parser = argparse.ArgumentParser(description="Start a new Dask local cluster")
    # type=int lets argparse reject a non-integer port with a usage message
    # instead of a ValueError traceback.
    parser.add_argument(
        "-p",
        "--port",
        type=int,
        help="Port to use for the Dask local cluster",
        dest="port",
    )
    args = parser.parse_args()

    # Compare against None so that an explicit ``--port 0`` is still honoured.
    if args.port is not None:
        port = args.port
    else:
        port = int(cfg.cluster.scheduler_address.split(":")[-1])

    try:
        local_cluster = LocalCluster(scheduler_port=port, n_workers=cfg.cluster.n_workers)
    except OSError:
        print(f"Could not start local cluster at port: {port}")
        raise
    print(f"Started local cluster at {local_cluster.scheduler_address}")

    try:
        # Idle until the user interrupts; the cluster runs in the background.
        while True:
            time.sleep(2)
    except KeyboardInterrupt:
        print("Interrupted")
    finally:
        local_cluster.close()

default_client(timeout='2s', **kwargs)

Return a Dask client connected to the scheduler address defined in the global config.

Source code in pyhdx/local_cluster.py
43
44
45
46
47
48
49
50
51
def default_client(timeout="2s", **kwargs):
    """Connect to the Dask scheduler configured in the global config.

    Args:
        timeout: Connection timeout forwarded to ``Client``.
        **kwargs: Additional keyword arguments forwarded to ``Client``.

    Returns:
        The connected ``Client``, or ``False`` if connecting to
        ``cfg.cluster.scheduler_address`` fails with a TimeoutError/OSError
        (a message is printed in that case).
    """
    address = cfg.cluster.scheduler_address
    try:
        return Client(address, timeout=timeout, **kwargs)
    except (TimeoutError, OSError):  # IOError is an alias of OSError
        print(f"No valid Dask scheduler found at specified address: '{address}'")
        return False

default_cluster(**kwargs)

Start a dask LocalCluster at the scheduler port given by the config

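kwargs: keyword arguments passed to LocalCluster that override the defaults (scheduler port and number of workers from the config).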

Source code in pyhdx/local_cluster.py
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
def default_cluster(**kwargs):
    """Start a dask LocalCluster at the scheduler port given by the config.

    The defaults are the port component of ``cfg.cluster.scheduler_address``
    (as ``scheduler_port``) and ``cfg.cluster.n_workers`` (as ``n_workers``).

    Args:
        **kwargs: LocalCluster keyword arguments; these take precedence over
            the defaults above.

    Returns:
        The newly started LocalCluster.
    """
    scheduler_port = int(cfg.cluster.scheduler_address.split(":")[-1])
    # Later entries win, so caller-supplied kwargs override the config defaults.
    options = {
        "scheduler_port": scheduler_port,
        "n_workers": cfg.cluster.n_workers,
        **kwargs,
    }
    return LocalCluster(**options)

verify_cluster(scheduler_address, timeout='2s')

Check if a valid dask scheduler is running at the provided scheduler_address

Source code in pyhdx/local_cluster.py
74
75
76
77
78
79
80
def verify_cluster(scheduler_address, timeout="2s"):
    """Check if a valid dask scheduler is running at the provided scheduler_address.

    Opens a probe connection to ``scheduler_address`` and closes it again.

    Args:
        scheduler_address: Address of the scheduler to probe.
        timeout: Connection timeout passed to ``connect``.

    Returns:
        True if the connection could be established, False if connecting raised
        TimeoutError or OSError.
    """

    async def _probe():
        comm = await connect(scheduler_address, timeout=timeout)
        # NOTE(review): assumes ``connect`` is distributed's comm ``connect``,
        # whose result has an awaitable ``close()`` — confirm against imports.
        # Close the probe instead of leaking it past the event loop that
        # asyncio.run creates and tears down.
        try:
            await comm.close()
        except OSError:
            # The scheduler was reachable; a failure while closing the probe
            # connection does not change the answer.
            pass

    try:
        asyncio.run(_probe())
        return True
    except (TimeoutError, OSError):
        return False

verify_cluster_async(scheduler_address, timeout='2s')

Check if a valid dask scheduler is running at the provided scheduler_address. Despite its name, this function is synchronous: it runs the connection attempt with asyncio.run.

Source code in pyhdx/local_cluster.py
83
84
85
86
87
88
89
def verify_cluster_async(scheduler_address, timeout="2s"):
    """Check if a valid dask scheduler is running at the provided scheduler_address.

    Despite its name this is a regular (synchronous) function: it drives the
    connection attempt with ``asyncio.run``. It therefore cannot be called from
    inside an already running event loop (``asyncio.run`` raises RuntimeError
    there, which is not caught).

    Args:
        scheduler_address: Address of the scheduler to probe.
        timeout: Connection timeout passed to ``connect``.

    Returns:
        True if the connection could be established, False if connecting raised
        TimeoutError or OSError.
    """

    async def _probe():
        comm = await connect(scheduler_address, timeout=timeout)
        # NOTE(review): assumes ``connect`` is distributed's comm ``connect``,
        # whose result has an awaitable ``close()`` — confirm against imports.
        # Close the probe instead of leaking it past the event loop that
        # asyncio.run creates and tears down.
        try:
            await comm.close()
        except OSError:
            # The scheduler was reachable; a failure while closing the probe
            # connection does not change the answer.
            pass

    try:
        asyncio.run(_probe())
        return True
    except (TimeoutError, OSError):
        return False