42 lines
1.4 KiB
Diff
42 lines
1.4 KiB
Diff
|
diff --git a/apache_beam/runners/worker/operations.py b/apache_beam/runners/worker/operations.py
|
||
|
index 3464c5750c..5921c72b90 100644
|
||
|
--- a/apache_beam/runners/worker/operations.py
|
||
|
+++ b/apache_beam/runners/worker/operations.py
|
||
|
@@ -69,18 +69,6 @@ if TYPE_CHECKING:
|
||
|
from apache_beam.runners.worker.statesampler import StateSampler
|
||
|
from apache_beam.transforms.userstate import TimerSpec
|
||
|
|
||
|
-# Allow some "pure mode" declarations.
|
||
|
-try:
|
||
|
- import cython
|
||
|
-except ImportError:
|
||
|
-
|
||
|
- class FakeCython(object):
|
||
|
- @staticmethod
|
||
|
- def cast(type, value):
|
||
|
- return value
|
||
|
-
|
||
|
- globals()['cython'] = FakeCython()
|
||
|
-
|
||
|
_globally_windowed_value = GlobalWindows.windowed_value(None)
|
||
|
_global_window_type = type(_globally_windowed_value.windows[0])
|
||
|
|
||
|
@@ -149,7 +137,7 @@ class ConsumerSet(Receiver):
|
||
|
# type: (WindowedValue) -> None
|
||
|
self.update_counters_start(windowed_value)
|
||
|
for consumer in self.consumers:
|
||
|
- cython.cast(Operation, consumer).process(windowed_value)
|
||
|
+ consumer.process(windowed_value)
|
||
|
self.update_counters_finish()
|
||
|
|
||
|
def try_split(self, fraction_of_remainder):
|
||
|
@@ -345,7 +333,7 @@ class Operation(object):
|
||
|
|
||
|
def output(self, windowed_value, output_index=0):
|
||
|
# type: (WindowedValue, int) -> None
|
||
|
- cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
|
||
|
+ self.receivers[output_index].receive(windowed_value)
|
||
|
|
||
|
def add_receiver(self, operation, output_index=0):
|
||
|
# type: (Operation, int) -> None
|