Use bytes instead of Any in RunnerApi.FunctionSpec
[beam.git] / sdks / python / apache_beam / utils / urns.py
1 #
2 # Licensed to the Apache Software Foundation (ASF) under one or more
3 # contributor license agreements. See the NOTICE file distributed with
4 # this work for additional information regarding copyright ownership.
5 # The ASF licenses this file to You under the Apache License, Version 2.0
6 # (the "License"); you may not use this file except in compliance with
7 # the License. You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #
17
18 """For internal use only; no backwards-compatibility guarantees."""
19
20 import abc
21 import inspect
22
23 from google.protobuf import wrappers_pb2
24
25 from apache_beam.internal import pickler
26 from apache_beam.utils import proto_utils
27
28
# Window function URNs ("beam:windowfn:..." namespace).
PICKLED_WINDOW_FN = "beam:windowfn:pickled_python:v0.1"
GLOBAL_WINDOWS_FN = "beam:windowfn:global_windows:v0.1"
FIXED_WINDOWS_FN = "beam:windowfn:fixed_windows:v0.1"
SLIDING_WINDOWS_FN = "beam:windowfn:sliding_windows:v0.1"
SESSION_WINDOWS_FN = "beam:windowfn:session_windows:v0.1"

# URNs for pickled Python DoFns, DoFn info, CombineFns and coders.
PICKLED_DO_FN = "beam:dofn:pickled_python:v0.1"
PICKLED_DO_FN_INFO = "beam:dofn:pickled_python_info:v0.1"
PICKLED_COMBINE_FN = "beam:combinefn:pickled_python:v0.1"
PICKLED_CODER = "beam:coder:pickled_python:v0.1"

# PTransform URNs ("beam:ptransform:..." namespace).
PICKLED_TRANSFORM = "beam:ptransform:pickled_python:v0.1"
PARDO_TRANSFORM = "beam:ptransform:pardo:v0.1"
GROUP_BY_KEY_TRANSFORM = "beam:ptransform:group_by_key:v0.1"
GROUP_BY_KEY_ONLY_TRANSFORM = "beam:ptransform:group_by_key_only:v0.1"
GROUP_ALSO_BY_WINDOW_TRANSFORM = "beam:ptransform:group_also_by_window:v0.1"
COMBINE_PER_KEY_TRANSFORM = "beam:ptransform:combine_per_key:v0.1"
COMBINE_GROUPED_VALUES_TRANSFORM = "beam:ptransform:combine_grouped_values:v0.1"
FLATTEN_TRANSFORM = "beam:ptransform:flatten:v0.1"
READ_TRANSFORM = "beam:ptransform:read:v0.1"
WINDOW_INTO_TRANSFORM = "beam:ptransform:window_into:v0.1"

# Source URNs.
PICKLED_SOURCE = "beam:source:pickled_python:v0.1"
52
53
class RunnerApiFn(object):
  """Abstract base class that provides urn registration utilities.

  A class that inherits from this class will get registration-based
  from_runner_api and to_runner_api methods that convert to and from
  beam_runner_api_pb2.SdkFunctionSpec.

  Additionally, register_pickle_urn can be called from the body of a class
  to register serialization via pickling.
  """

  # TODO(BEAM-2685): Issue with dill + local classes + abc metaclass
  # __metaclass__ = abc.ABCMeta

  # Registry mapping urn -> (parameter_type, constructor). This is a single
  # class attribute, so unless a subclass defines its own _known_urns,
  # registrations made through any subclass land in this one shared dict.
  _known_urns = {}

  @abc.abstractmethod
  def to_runner_api_parameter(self, unused_context):
    """Returns the urn and payload for this Fn.

    The returned urn(s) should be registered with `register_urn`.
    """
    pass

  @classmethod
  def register_urn(cls, urn, parameter_type, fn=None):
    """Registers a urn with a constructor.

    For example, if 'beam:fn:foo' had parameter type FooPayload, one could
    write `RunnerApiFn.register_urn('beam:fn:foo', FooPayload, foo_from_proto)`
    where foo_from_proto took as arguments a FooPayload and a PipelineContext.
    This function can also be used as a decorator rather than passing the
    callable in as the final parameter.

    A corresponding to_runner_api_parameter method would be expected that
    returns the tuple ('beam:fn:foo', foo_payload), where foo_payload is an
    instance of FooPayload.

    Args:
      urn: the urn string to register.
      parameter_type: the type handed, together with the payload bytes, to
        proto_utils.parse_Bytes by from_runner_api.
      fn: the constructor, later called as fn(parsed_parameter, context).
        If omitted (None), a decorator performing the registration is
        returned instead.

    Returns:
      None when fn is given; otherwise a decorator that registers its
      argument and returns it wrapped in staticmethod.
    """
    def register(fn):
      cls._known_urns[urn] = parameter_type, fn
      # staticmethod lets the decorated function sit in a class body without
      # being bound as an instance method.
      return staticmethod(fn)
    # Compare against None instead of relying on truthiness: a callable that
    # happens to be falsy (e.g. defines __len__ returning 0) must still be
    # registered immediately rather than being mistaken for decorator usage.
    if fn is not None:
      # Used as a statement.
      register(fn)
    else:
      # Used as a decorator.
      return register

  @classmethod
  def register_pickle_urn(cls, pickle_urn):
    """Registers and implements the given urn via pickling.

    Intended to be called from a class body. It defines
    to_runner_api_parameter in the caller's namespace so that it returns
    (pickle_urn, BytesValue(value=pickler.dumps(self))), and registers
    pickle_urn with a constructor that returns pickler.loads(proto.value).
    """
    # Write into the calling frame's locals. Inside a class body that is the
    # namespace the class is built from, so this effectively adds a method.
    # NOTE(review): from inside a function body such writes are generally
    # not reflected back, so other call sites presumably won't work.
    inspect.currentframe().f_back.f_locals['to_runner_api_parameter'] = (
        lambda self, context: (
            pickle_urn, wrappers_pb2.BytesValue(value=pickler.dumps(self))))
    cls.register_urn(
        pickle_urn,
        wrappers_pb2.BytesValue,
        lambda proto, unused_context: pickler.loads(proto.value))

  def to_runner_api(self, context):
    """Returns an SdkFunctionSpec encoding this Fn.

    Prefer overriding self.to_runner_api_parameter.

    The (urn, typed_param) pair comes from self.to_runner_api_parameter.
    typed_param is written both as a packed Any (any_param) and as
    serialized bytes (payload); from_runner_api reads only payload. When
    typed_param is None, payload is passed as None.
    NOTE(review): any_param looks like it is kept for transition/compat —
    confirm before removing it.
    """
    # Imported locally rather than at module level; presumably to avoid an
    # import cycle — confirm.
    from apache_beam.portability.api import beam_runner_api_pb2
    urn, typed_param = self.to_runner_api_parameter(context)
    return beam_runner_api_pb2.SdkFunctionSpec(
        spec=beam_runner_api_pb2.FunctionSpec(
            urn=urn,
            any_param=proto_utils.pack_Any(typed_param),
            payload=typed_param.SerializeToString()
            if typed_param is not None else None))

  @classmethod
  def from_runner_api(cls, fn_proto, context):
    """Converts from an SdkFunctionSpec to a Fn object.

    Prefer registering a urn with its parameter type and constructor.

    Looks up fn_proto.spec.urn in the registry, decodes spec.payload with
    proto_utils.parse_Bytes using the registered parameter type, and returns
    constructor(decoded_parameter, context).

    Raises:
      KeyError: if fn_proto.spec.urn has not been registered.
    """
    parameter_type, constructor = cls._known_urns[fn_proto.spec.urn]
    return constructor(
        proto_utils.parse_Bytes(fn_proto.spec.payload, parameter_type),
        context)