-
Notifications
You must be signed in to change notification settings - Fork 101
Expand file tree
/
Copy pathoutput_handler.py
More file actions
79 lines (62 loc) · 2.18 KB
/
output_handler.py
File metadata and controls
79 lines (62 loc) · 2.18 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
import pandas as pd
from typing import Optional
from threading import Event
from feldera import FelderaClient
from feldera._callback_runner import CallbackRunner
class OutputHandler:
    """
    Buffers the output of one view of a pipeline so it can be read back as a
    pandas DataFrame or as a list of dictionaries.

    The handler builds a `CallbackRunner` whose data callback appends every
    non-empty DataFrame it receives to `self.buffer`, and whose exception
    callback stores the reported exception in `self.exception`.
    """

    def __init__(
        self,
        client: "FelderaClient",
        pipeline_name: str,
        view_name: str,
    ):
        """
        Initializes the output handler, but doesn't start it.
        To start the output handler, call the `.OutputHandler.start` method.

        :param client: The client that is handed to the `CallbackRunner`.
        :param pipeline_name: The name of the pipeline, handed to the `CallbackRunner`.
        :param view_name: The name of the view, handed to the `CallbackRunner`.
        """

        self.client: FelderaClient = client
        self.pipeline_name: str = pipeline_name
        self.view_name: str = view_name

        # chunks of output accumulated by the callback, in arrival order
        self.buffer: list[pd.DataFrame] = []

        # the exception reported through `exception_callback`, if any
        self.exception: Optional[BaseException] = None

        # shared with the `CallbackRunner`; `start` blocks on it
        # NOTE(review): presumably set by the runner once it is ready to
        # receive output - confirm against `CallbackRunner`
        self.event = Event()

        # the callback that is passed to the `CallbackRunner`
        # (its second positional argument is not used here)
        def callback(df: pd.DataFrame, _: int):
            if not df.empty:
                self.buffer.append(df)

        # remembers the failure so that `to_pandas` can re-raise it to the caller
        def exception_callback(exception: BaseException):
            self.exception = exception

        # sets up the callback runner
        self.handler = CallbackRunner(
            self.client,
            self.pipeline_name,
            self.view_name,
            callback,
            exception_callback,
            self.event,
        )

    def start(self) -> None:
        """
        Starts the output handler in a separate thread, then blocks until
        `self.event` is set.
        """

        self.handler.start()
        # no timeout: this waits for as long as it takes for the event to be set
        _ = self.event.wait()

    def to_pandas(self, clear_buffer: bool = True) -> pd.DataFrame:
        """
        Returns the output of the pipeline as a pandas DataFrame

        :param clear_buffer: Whether to clear the buffer after getting the output.
        :return: All buffered chunks concatenated with a fresh index, or an
            empty DataFrame when nothing is buffered.
        :raises BaseException: The exception stored by the exception callback, if any.
        """

        if self.exception is not None:
            raise self.exception

        # Work on a snapshot: the callback may append to `self.buffer` from
        # the handler's thread while the concatenation below is running.
        frames = list(self.buffer)
        if not frames:
            return pd.DataFrame()

        res = pd.concat(frames, ignore_index=True)

        if clear_buffer:
            # Remove only the chunks that were just concatenated. Clearing the
            # whole list would silently drop chunks that arrived after the
            # snapshot was taken and that are not part of `res`.
            del self.buffer[: len(frames)]

        return res

    def to_dict(self, clear_buffer: bool = True) -> list[dict]:
        """
        Returns the output of the pipeline as a list of python dictionaries

        :param clear_buffer: Whether to clear the buffer after getting the output.
        :return: One dictionary per output row, mapping column name to value.
        :raises BaseException: The exception stored by the exception callback, if any.
        """

        return self.to_pandas(clear_buffer).to_dict(orient="records")