-
Notifications
You must be signed in to change notification settings - Fork 100
Expand file tree
/
Copy pathruntime_config.py
More file actions
128 lines (108 loc) · 4.56 KB
/
runtime_config.py
File metadata and controls
128 lines (108 loc) · 4.56 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
from typing import Optional, Any, Mapping
from feldera.enums import FaultToleranceModel
class Resources:
    """
    Resource configuration for a pipeline.

    Each keyword argument is stored on the instance as an attribute of the
    same name. The entries of ``config`` are applied afterwards, so a key
    found in ``config`` replaces the value passed by keyword, and any other
    key becomes an additional attribute.

    :param config: A dictionary containing all the configuration values.
    :param cpu_cores_max: The maximum number of CPU cores to reserve for an
        instance of the pipeline.
    :param cpu_cores_min: The minimum number of CPU cores to reserve for an
        instance of the pipeline.
    :param memory_mb_max: The maximum memory in Megabytes to reserve for an
        instance of the pipeline.
    :param memory_mb_min: The minimum memory in Megabytes to reserve for an
        instance of the pipeline.
    :param storage_class: The storage class to use for the pipeline. The
        class determines storage performance such as IOPS and throughput.
    :param storage_mb_max: The storage in Megabytes to reserve for an
        instance of the pipeline.
    """

    def __init__(
        self,
        config: Optional[Mapping[str, Any]] = None,
        cpu_cores_max: Optional[int] = None,
        cpu_cores_min: Optional[int] = None,
        memory_mb_max: Optional[int] = None,
        memory_mb_min: Optional[int] = None,
        storage_class: Optional[str] = None,
        storage_mb_max: Optional[int] = None,
    ):
        attributes = vars(self)

        # Keyword values go in first, in declaration order.
        attributes.update(
            cpu_cores_max=cpu_cores_max,
            cpu_cores_min=cpu_cores_min,
            memory_mb_max=memory_mb_max,
            memory_mb_min=memory_mb_min,
            storage_class=storage_class,
            storage_mb_max=storage_mb_max,
        )

        # Entries from ``config`` are layered on top and therefore win over
        # the keyword values; a missing or empty ``config`` changes nothing.
        if config:
            attributes.update(config)
class Storage:
    """Storage configuration for a pipeline.

    ``min_storage_bytes`` is stored as an attribute first; the entries of
    ``config`` are applied afterwards and take precedence over it. Keys of
    ``config`` other than ``min_storage_bytes`` become extra attributes.

    :param config: A dictionary of configuration values to copy onto the
        instance.
    :param min_storage_bytes: The minimum estimated number of bytes in a
        batch of data to write it to storage.
    """

    def __init__(
        self,
        config: Optional[Mapping[str, Any]] = None,
        min_storage_bytes: Optional[int] = None,
    ):
        self.min_storage_bytes = min_storage_bytes

        # Nothing to overlay when ``config`` is missing or empty.
        if not config:
            return

        vars(self).update(config)
class RuntimeConfig:
    """
    Runtime configuration class to define the configuration for a pipeline.

    To create runtime config from a dictionary, use
    :meth:`.RuntimeConfig.from_dict`.

    Documentation:
    https://docs.feldera.com/pipelines/configuration/#runtime-configuration

    The constructor stores most arguments as same-named attributes. Three
    attributes are only created when the corresponding argument is given:

    * ``fault_tolerance`` -- a dictionary with the keys ``"model"`` and
      ``"checkpoint_interval_secs"``, set only when
      ``fault_tolerance_model`` is not ``None``.
    * ``resources`` -- the attribute dictionary of the supplied
      :class:`.Resources` object.
    * ``storage`` -- either the supplied boolean or the attribute
      dictionary of the supplied :class:`.Storage` object.

    :param checkpoint_interval_secs: Only used as part of
        ``fault_tolerance``; it is ignored when ``fault_tolerance_model``
        is ``None``.
    :raises ValueError: If ``storage`` is neither ``None``, a ``bool`` nor a
        :class:`.Storage` instance.
    """

    def __init__(
        self,
        workers: Optional[int] = None,
        hosts: Optional[int] = None,
        storage: Optional[Storage | bool] = None,
        tracing: Optional[bool] = False,
        tracing_endpoint_jaeger: Optional[str] = "",
        cpu_profiler: bool = True,
        max_buffering_delay_usecs: int = 0,
        min_batch_size_records: int = 0,
        clock_resolution_usecs: Optional[int] = None,
        provisioning_timeout_secs: Optional[int] = None,
        resources: Optional[Resources] = None,
        fault_tolerance_model: Optional[FaultToleranceModel] = None,
        checkpoint_interval_secs: Optional[int] = None,
        dev_tweaks: Optional[dict] = None,
        logging: Optional[str] = None,
    ):
        # Assignment order is kept stable because it determines the key
        # order of ``to_dict()``.
        self.workers = workers
        self.hosts = hosts
        self.tracing = tracing
        self.tracing_endpoint_jaeger = tracing_endpoint_jaeger
        self.cpu_profiler = cpu_profiler
        self.max_buffering_delay_usecs = max_buffering_delay_usecs
        self.min_batch_size_records = min_batch_size_records
        self.clock_resolution_usecs = clock_resolution_usecs
        self.provisioning_timeout_secs = provisioning_timeout_secs

        # The fault-tolerance section exists only when a model was chosen;
        # the checkpoint interval is nested inside it.
        if fault_tolerance_model is not None:
            self.fault_tolerance = {
                "model": str(fault_tolerance_model),
                "checkpoint_interval_secs": checkpoint_interval_secs,
            }

        # Nested configuration objects are stored as plain dictionaries.
        if resources is not None:
            self.resources = resources.__dict__

        if storage is not None:
            if isinstance(storage, bool):
                self.storage = storage
            elif isinstance(storage, Storage):
                self.storage = storage.__dict__
            else:
                raise ValueError(f"Unknown value '{storage}' for storage")

        self.dev_tweaks = dev_tweaks
        self.logging = logging

    @staticmethod
    def default() -> "RuntimeConfig":
        """
        Return a :class:`.RuntimeConfig` built with default arguments and an
        all-default :class:`.Resources` object.
        """
        return RuntimeConfig(resources=Resources())

    @classmethod
    def from_dict(cls, d: Mapping[str, Any]) -> "RuntimeConfig":
        """
        Create a :class:`.RuntimeConfig` object from a dictionary.

        The keys of ``d`` become the attributes of the returned object and
        replace every attribute set by the constructor. ``d`` is copied
        (shallowly), so later changes to the returned object do not modify
        ``d``, and any mapping type is accepted, not only ``dict``.

        :param d: Mapping of attribute names to values.
        """
        conf = cls()
        # Copy instead of aliasing: an instance ``__dict__`` must be a real
        # ``dict``, and sharing the caller's object would let mutations leak
        # in both directions.
        conf.__dict__ = dict(d)
        return conf

    def to_dict(self) -> dict:
        """
        Return the instance attributes as a new dictionary, leaving out
        every attribute whose value is ``None``.

        Only top-level attributes are filtered; ``None`` values inside
        nested dictionaries are kept.
        """
        return {
            name: value
            for name, value in self.__dict__.items()
            if value is not None
        }