Use bytes instead of Any in RunnerApi.FunctionSpec
[beam.git] / sdks / python / apache_beam / coders / coders.py
1 #
2 # Licensed to the Apache Software Foundation (ASF) under one or more
3 # contributor license agreements. See the NOTICE file distributed with
4 # this work for additional information regarding copyright ownership.
5 # The ASF licenses this file to You under the Apache License, Version 2.0
6 # (the "License"); you may not use this file except in compliance with
7 # the License. You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #
17
18 """Collection of useful coders.
19
20 Only those coders listed in __all__ are part of the public API of this module.
21 """
22
23 import base64
24 import cPickle as pickle
25 import google.protobuf
26
27 from apache_beam.coders import coder_impl
28 from apache_beam.portability.api import beam_runner_api_pb2
29 from apache_beam.utils import urns
30 from apache_beam.utils import proto_utils
31
32 # pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports
33 try:
34 from stream import get_varint_size
35 except ImportError:
36 from slow_stream import get_varint_size
37 # pylint: enable=wrong-import-order, wrong-import-position, ungrouped-imports
38
39
40 # pylint: disable=wrong-import-order, wrong-import-position
41 # Avoid dependencies on the full SDK.
42 try:
43 # Import dill from the pickler module to make sure our monkey-patching of dill
44 # occurs.
45 from apache_beam.internal.pickler import dill
46 except ImportError:
47 # We fall back to using the stock dill library in tests that don't use the
48 # full Python SDK.
49 import dill
50
51
# Names that constitute the public API of this module (see module docstring);
# everything else here is an internal SDK detail.
__all__ = ['Coder',
           'BytesCoder', 'DillCoder', 'FastPrimitivesCoder', 'FloatCoder',
           'IterableCoder', 'PickleCoder', 'ProtoCoder', 'SingletonCoder',
           'StrUtf8Coder', 'TimestampCoder', 'TupleCoder',
           'TupleSequenceCoder', 'VarIntCoder', 'WindowedValueCoder']
57
58
def serialize_coder(coder):
  """Returns coder serialized as '<class name>$<pickled bytes>'.

  The class-name prefix exists for human readability; deserialize_coder
  discards it.
  """
  from apache_beam.internal import pickler
  label = coder.__class__.__name__
  return '%s$%s' % (label, pickler.dumps(coder))
62
63
def deserialize_coder(serialized):
  """Inverse of serialize_coder: unpickles the part after the '$' prefix."""
  from apache_beam.internal import pickler
  payload = serialized.split('$', 1)[1]
  return pickler.loads(payload)
# pylint: enable=wrong-import-order, wrong-import-position
68
69
class Coder(object):
  """Base class for coders.

  Subclasses implement encode()/decode() (or, for FastCoder subclasses,
  _create_impl()) to translate between objects and byte strings.
  """

  def encode(self, value):
    """Encodes the given object into a byte string."""
    raise NotImplementedError('Encode not implemented: %s.' % self)

  def decode(self, encoded):
    """Decodes the given byte string into the corresponding object."""
    raise NotImplementedError('Decode not implemented: %s.' % self)

  def is_deterministic(self):
    """Whether this coder is guaranteed to encode values deterministically.

    A deterministic coder is required for key coders in GroupByKey operations
    to produce consistent results.

    For example, note that the default coder, the PickleCoder, is not
    deterministic: the ordering of pickled entries in maps may vary across
    executions since there is no defined order, and such a coder is not in
    general suitable for usage as a key coder in GroupByKey operations, since
    each instance of the same key may be encoded differently.

    Returns:
      Whether coder is deterministic.
    """
    return False

  def estimate_size(self, value):
    """Estimates the encoded size of the given value, in bytes.

    Dataflow estimates the encoded size of a PCollection processed in a
    pipeline step by using the estimated size of a random sample of elements
    in that PCollection.

    The default implementation encodes the given value and returns its byte
    size. If a coder can provide a fast estimate of the encoded size of a
    value (e.g., if the encoding has a fixed size), it can provide its
    estimate here to improve performance.

    Arguments:
      value: the value whose encoded size is to be estimated.

    Returns:
      The estimated encoded size of the given value.
    """
    return len(self.encode(value))

  # ===========================================================================
  # Methods below are internal SDK details that don't need to be modified for
  # user-defined coders.
  # ===========================================================================

  def _create_impl(self):
    """Creates a CoderImpl to do the actual encoding and decoding.
    """
    # Default implementation delegates back to this coder's own
    # encode/decode; FastCoder subclasses override with specialized impls.
    return coder_impl.CallbackCoderImpl(self.encode, self.decode,
                                        self.estimate_size)

  def get_impl(self):
    """For internal use only; no backwards-compatibility guarantees.

    Returns the CoderImpl backing this Coder.
    """
    # The impl is created lazily and cached on the instance.
    if not hasattr(self, '_impl'):
      self._impl = self._create_impl()
      assert isinstance(self._impl, coder_impl.CoderImpl)
    return self._impl

  def __getstate__(self):
    # Exclude the cached '_impl' from pickling; it is recreated on demand
    # by get_impl() after unpickling.
    return self._dict_without_impl()

  def _dict_without_impl(self):
    """Returns self.__dict__ with the cached '_impl' entry removed (a copy
    is made only when removal is needed)."""
    if hasattr(self, '_impl'):
      d = dict(self.__dict__)
      del d['_impl']
      return d
    return self.__dict__

  @classmethod
  def from_type_hint(cls, unused_typehint, unused_registry):
    # If not overridden, just construct the coder without arguments.
    return cls()

  def is_kv_coder(self):
    # Whether this coder encodes (key, value) pairs; KV coders override
    # this along with key_coder()/value_coder().
    return False

  def key_coder(self):
    if self.is_kv_coder():
      # A KV subclass forgot to override key_coder().
      raise NotImplementedError('key_coder: %s' % self)
    else:
      raise ValueError('Not a KV coder: %s.' % self)

  def value_coder(self):
    if self.is_kv_coder():
      # A KV subclass forgot to override value_coder().
      raise NotImplementedError('value_coder: %s' % self)
    else:
      raise ValueError('Not a KV coder: %s.' % self)

  def _get_component_coders(self):
    """For internal use only; no backwards-compatibility guarantees.

    Returns the internal component coders of this coder."""
    # This is an internal detail of the Coder API and does not need to be
    # refined in user-defined Coders.
    return []

  def as_cloud_object(self):
    """For internal use only; no backwards-compatibility guarantees.

    Returns Google Cloud Dataflow API description of this coder."""
    # This is an internal detail of the Coder API and does not need to be
    # refined in user-defined Coders.

    value = {
        # We pass coders in the form "<coder_name>$<pickled_data>" to make the
        # job description JSON more readable. Data before the $ is ignored by
        # the worker.
        '@type': serialize_coder(self),
        'component_encodings': list(
            component.as_cloud_object()
            for component in self._get_component_coders()
        ),
    }
    return value

  def __repr__(self):
    return self.__class__.__name__

  def __eq__(self, other):
    # Structural equality: same class and identical attributes (the cached
    # impl is excluded since it is derived state).
    # pylint: disable=protected-access
    return (self.__class__ == other.__class__
            and self._dict_without_impl() == other._dict_without_impl())
    # pylint: enable=protected-access

  def to_runner_api(self, context):
    """For internal use only; no backwards-compatibility guarantees.
    """
    # TODO(BEAM-115): Use specialized URNs and components.
    # NOTE(review): this accesses google.protobuf.wrappers_pb2 even though
    # only 'google.protobuf' is imported at the top of the file -- it relies
    # on wrappers_pb2 being loaded by a transitive import; confirm.
    serialized_coder = serialize_coder(self)
    return beam_runner_api_pb2.Coder(
        spec=beam_runner_api_pb2.SdkFunctionSpec(
            spec=beam_runner_api_pb2.FunctionSpec(
                urn=urns.PICKLED_CODER,
                any_param=proto_utils.pack_Any(
                    google.protobuf.wrappers_pb2.BytesValue(
                        value=serialized_coder)),
                payload=serialized_coder)))

  @staticmethod
  def from_runner_api(proto, context):
    """For internal use only; no backwards-compatibility guarantees.
    """
    # Inverse of to_runner_api: unpickle the coder from the spec payload.
    return deserialize_coder(proto.spec.spec.payload)
224
225
class StrUtf8Coder(Coder):
  """Coder that serializes text via the UTF-8 encoding."""

  def encode(self, value):
    # Text in -> UTF-8 bytes out.
    return value.encode('utf-8')

  def decode(self, value):
    # UTF-8 bytes in -> text out.
    return value.decode('utf-8')

  def is_deterministic(self):
    # The same string always produces the same UTF-8 bytes.
    return True
237
238
class ToStringCoder(Coder):
  """Write-only fallback coder used when a sink specifies no coder.

  Encodes anything to a byte string; decoding is unsupported.
  """

  def encode(self, value):
    # Byte strings pass through untouched, unicode is UTF-8 encoded, and
    # everything else is stringified.
    if isinstance(value, str):
      return value
    if isinstance(value, unicode):
      return value.encode('utf-8')
    return str(value)

  def decode(self, _):
    raise NotImplementedError('ToStringCoder cannot be used for decoding.')

  def is_deterministic(self):
    return True
254
255
class FastCoder(Coder):
  """Coder backed directly by a (faster) CoderImpl.

  Coder derives _create_impl() from encode()/decode(); this class inverts
  that relationship, deriving encode()/decode() from _create_impl().
  """

  def encode(self, value):
    """Encodes the given object into a byte string."""
    impl = self.get_impl()
    return impl.encode(value)

  def decode(self, encoded):
    """Decodes the given byte string into the corresponding object."""
    impl = self.get_impl()
    return impl.decode(encoded)

  def estimate_size(self, value):
    impl = self.get_impl()
    return impl.estimate_size(value)

  def _create_impl(self):
    # Subclasses must supply the concrete CoderImpl.
    raise NotImplementedError
277
278
class BytesCoder(FastCoder):
  """Coder for raw byte strings."""

  def _create_impl(self):
    return coder_impl.BytesCoderImpl()

  def is_deterministic(self):
    return True

  def as_cloud_object(self):
    return {'@type': 'kind:bytes'}

  def __eq__(self, other):
    # Stateless: any two BytesCoders are interchangeable.
    return type(self) == type(other)

  def __hash__(self):
    return hash(type(self))
298
299
class VarIntCoder(FastCoder):
  """Coder for integers, using a variable-length encoding."""

  def _create_impl(self):
    return coder_impl.VarIntCoderImpl()

  def is_deterministic(self):
    return True

  def __eq__(self, other):
    # Stateless: any two VarIntCoders are interchangeable.
    return type(self) == type(other)

  def __hash__(self):
    return hash(type(self))
314
315
class FloatCoder(FastCoder):
  """Coder for floating-point values."""

  def _create_impl(self):
    return coder_impl.FloatCoderImpl()

  def is_deterministic(self):
    return True

  def __eq__(self, other):
    # Stateless: any two FloatCoders are interchangeable.
    return type(self) == type(other)

  def __hash__(self):
    return hash(type(self))
330
331
class TimestampCoder(FastCoder):
  """Coder for timeutil.Timestamp values."""

  def _create_impl(self):
    return coder_impl.TimestampCoderImpl()

  def is_deterministic(self):
    return True

  def __eq__(self, other):
    # Stateless: any two TimestampCoders are interchangeable.
    return type(self) == type(other)

  def __hash__(self):
    return hash(type(self))
346
347
class SingletonCoder(FastCoder):
  """Coder for a type that always encodes exactly one fixed value."""

  def __init__(self, value):
    # The one value every decode yields.
    self._value = value

  def _create_impl(self):
    return coder_impl.SingletonCoderImpl(self._value)

  def is_deterministic(self):
    return True

  def __eq__(self, other):
    return type(self) == type(other) and self._value == other._value

  def __hash__(self):
    return hash(self._value)
365
366
def maybe_dill_dumps(o):
  """Pickles o with cPickle, falling back to dill if cPickle fails.

  Objects of certain custom classes -- for example, ones containing
  lambdas -- can only be handled by the dill pickler.
  """
  try:
    return pickle.dumps(o, pickle.HIGHEST_PROTOCOL)
  except Exception:  # pylint: disable=broad-except
    return dill.dumps(o)
375
376
def maybe_dill_loads(o):
  """Unpickles o with cPickle, falling back to dill if cPickle fails."""
  try:
    return pickle.loads(o)
  except Exception:  # pylint: disable=broad-except
    return dill.loads(o)
383
384
class _PickleCoderBase(FastCoder):
  """Base class for pickling coders."""

  def is_deterministic(self):
    # Pickling is not deterministic in general: for example, the ordering
    # of pickled entries in maps may vary across executions, which makes
    # these coders unsuitable as key coders in GroupByKey operations.
    return False

  def as_cloud_object(self, is_pair_like=True):
    value = super(_PickleCoderBase, self).as_cloud_object()
    # This coder is used where a more granular coder cannot be inferred.
    # Where the service expects a pair it checks for the "is_pair_like"
    # key, so present ourselves as a pair of non-pair-like copies.
    if is_pair_like:
      value['is_pair_like'] = True
      value['component_encodings'] = [
          self.as_cloud_object(is_pair_like=False),
          self.as_cloud_object(is_pair_like=False),
      ]
    return value

  # .key_coder() and .value_coder() are permitted on pickle coders because
  # the return values of lambdas in ParDo operations can't always be
  # inferred, yet their results may feed a GroupByKey.
  def is_kv_coder(self):
    return True

  def key_coder(self):
    return self

  def value_coder(self):
    return self

  def __eq__(self, other):
    # Stateless: instances of the same class are interchangeable.
    return type(self) == type(other)

  def __hash__(self):
    return hash(type(self))
427
428
class PickleCoder(_PickleCoderBase):
  """Coder relying on Python's (c)pickle module."""

  def _create_impl(self):
    # Capture module attributes as locals so the encoding lambda closes
    # over plain objects rather than doing module lookups per element.
    dumps = pickle.dumps
    protocol = pickle.HIGHEST_PROTOCOL
    return coder_impl.CallbackCoderImpl(
        lambda x: dumps(x, protocol), pickle.loads)
437
438
class DillCoder(_PickleCoderBase):
  """Coder that pickles via dill (with a cPickle fast path), so objects
  containing lambdas and similar constructs can be handled."""

  def _create_impl(self):
    return coder_impl.CallbackCoderImpl(maybe_dill_dumps, maybe_dill_loads)
444
445
class DeterministicFastPrimitivesCoder(FastCoder):
  """Wraps a coder, raising runtime errors on non-deterministic values."""

  def __init__(self, coder, step_label):
    self._underlying_coder = coder
    # Step label is surfaced in the impl's error reporting.
    self._step_label = step_label

  def _create_impl(self):
    return coder_impl.DeterministicFastPrimitivesCoderImpl(
        self._underlying_coder.get_impl(), self._step_label)

  def is_deterministic(self):
    # Determinism is enforced at encode time, so we can promise it here.
    return True

  # NOTE(review): mirrors the KV pass-through used by the pickle coders;
  # presumably needed where a more granular coder can't be inferred.
  def is_kv_coder(self):
    return True

  def key_coder(self):
    return self

  def value_coder(self):
    return self
468
469
class FastPrimitivesCoder(FastCoder):
  """Encodes simple primitives (e.g. str, int) efficiently.

  For unknown types, falls back to another coder (e.g. PickleCoder).
  """

  def __init__(self, fallback_coder=PickleCoder()):
    # NOTE: the default PickleCoder instance is shared across calls; it
    # carries no per-use state beyond its cached impl, so sharing is safe.
    self._fallback_coder = fallback_coder

  def _create_impl(self):
    return coder_impl.FastPrimitivesCoderImpl(
        self._fallback_coder.get_impl())

  def is_deterministic(self):
    # Unknown types defer to the fallback, so determinism does too.
    return self._fallback_coder.is_deterministic()

  def as_cloud_object(self, is_pair_like=True):
    # Fixed: call the immediate superclass (was super(FastCoder, self),
    # which skipped a level in the MRO).  Resolution is identical today
    # because FastCoder does not override as_cloud_object, but this stays
    # correct if it ever does, and matches _PickleCoderBase.
    value = super(FastPrimitivesCoder, self).as_cloud_object()
    # We currently use this coder in places where we cannot infer the coder
    # to use for the value type in a more granular way.  In places where the
    # service expects a pair, it checks for the "is_pair_like" key, in which
    # case we would fail without the hack below.
    if is_pair_like:
      value['is_pair_like'] = True
      value['component_encodings'] = [
          self.as_cloud_object(is_pair_like=False),
          self.as_cloud_object(is_pair_like=False)
      ]

    return value

  # We allow .key_coder() and .value_coder() to be called on
  # FastPrimitivesCoder since we can't always infer the return values of
  # lambdas in ParDo operations, the result of which may be used in a
  # GroupByKey.
  def is_kv_coder(self):
    return True

  def key_coder(self):
    return self

  def value_coder(self):
    return self

  def __eq__(self, other):
    # NOTE(review): equality deliberately ignores _fallback_coder, so two
    # instances with different fallbacks compare equal -- confirm before
    # tightening, as coder equality may be relied upon elsewhere.
    return type(self) == type(other)

  def __hash__(self):
    return hash(type(self))
517
518
class Base64PickleCoder(Coder):
  """Coder that pickles objects, then base64-encodes the pickle."""
  # TODO(robertwb): Do base64 encoding where it's needed (e.g. in json)
  # rather than via a special Coder.

  def encode(self, value):
    pickled = pickle.dumps(value, pickle.HIGHEST_PROTOCOL)
    return base64.b64encode(pickled)

  def decode(self, encoded):
    return pickle.loads(base64.b64decode(encoded))

  def is_deterministic(self):
    # Pickling is not deterministic; see the corresponding comments for
    # PickleCoder above.
    return False

  # .key_coder() and .value_coder() are allowed here for the same reason
  # as on the pickle coders: lambda return types in ParDo operations can't
  # always be inferred, and their results may be used in a GroupByKey.
  #
  # TODO(ccy): this is currently only used for KV values from Create
  # transforms.  Investigate a way to unify this with PickleCoder.
  def is_kv_coder(self):
    return True

  def key_coder(self):
    return self

  def value_coder(self):
    return self
549
550
class ProtoCoder(FastCoder):
  """A Coder for Google Protocol Buffers.

  Both Protocol Buffers syntax versions 2 and 3 are supported.  Note,
  however, that the runtime version of the python protobuf library must
  exactly match the version of the protoc compiler that was used to
  generate the protobuf messages.

  ProtoCoder is registered in the global CoderRegistry as the default coder
  for any protobuf Message object.
  """

  def __init__(self, proto_message_type):
    self.proto_message_type = proto_message_type

  def _create_impl(self):
    return coder_impl.ProtoCoderImpl(self.proto_message_type)

  def is_deterministic(self):
    # TODO(vikasrk): A proto message can be deterministic if it does not
    # contain a Map.
    return False

  def __eq__(self, other):
    return (type(self) == type(other)
            and self.proto_message_type == other.proto_message_type)

  def __hash__(self):
    return hash(self.proto_message_type)

  @staticmethod
  def from_type_hint(typehint, unused_registry):
    # Guard clause: only Message subclasses can be proto-coded.
    if not issubclass(typehint, google.protobuf.message.Message):
      raise ValueError(('Expected a subclass of google.protobuf.message.Message'
                        ', but got a %s' % typehint))
    return ProtoCoder(typehint)
589
590
class TupleCoder(FastCoder):
  """Coder of tuple objects, one component coder per position."""

  def __init__(self, components):
    self._coders = tuple(components)

  def _create_impl(self):
    return coder_impl.TupleCoderImpl([c.get_impl() for c in self._coders])

  def is_deterministic(self):
    # Deterministic only if every component coder is.
    return all(c.is_deterministic() for c in self._coders)

  @staticmethod
  def from_type_hint(typehint, registry):
    return TupleCoder([registry.get_coder(t) for t in typehint.tuple_types])

  def as_cloud_object(self):
    if self.is_kv_coder():
      # A 2-tuple is presented to the service as a pair.
      return {
          '@type': 'kind:pair',
          'is_pair_like': True,
          'component_encodings': list(
              component.as_cloud_object()
              for component in self._get_component_coders()
          ),
      }

    return super(TupleCoder, self).as_cloud_object()

  def _get_component_coders(self):
    return self.coders()

  def coders(self):
    return self._coders

  def is_kv_coder(self):
    # A 2-tuple coder doubles as a key-value coder.
    return len(self._coders) == 2

  def key_coder(self):
    if len(self._coders) != 2:
      raise ValueError('TupleCoder does not have exactly 2 components.')
    return self._coders[0]

  def value_coder(self):
    if len(self._coders) != 2:
      raise ValueError('TupleCoder does not have exactly 2 components.')
    return self._coders[1]

  def __repr__(self):
    return 'TupleCoder[%s]' % ', '.join(str(c) for c in self._coders)

  def __eq__(self, other):
    # Fixed: compare against other._coders.  Previously this compared
    # self._coders to itself, so any two TupleCoders were "equal".
    return (type(self) == type(other)
            and self._coders == other._coders)

  def __hash__(self):
    return hash(self._coders)
648
649
class TupleSequenceCoder(FastCoder):
  """Coder of homogeneous tuple objects (one element coder for all items)."""

  def __init__(self, elem_coder):
    self._elem_coder = elem_coder

  def _create_impl(self):
    return coder_impl.TupleSequenceCoderImpl(self._elem_coder.get_impl())

  def is_deterministic(self):
    return self._elem_coder.is_deterministic()

  @staticmethod
  def from_type_hint(typehint, registry):
    return TupleSequenceCoder(registry.get_coder(typehint.inner_type))

  def _get_component_coders(self):
    return (self._elem_coder,)

  def __repr__(self):
    return 'TupleSequenceCoder[%r]' % self._elem_coder

  def __eq__(self, other):
    # Fixed: compare against other._elem_coder.  Previously this compared
    # self._elem_coder to itself, making any two TupleSequenceCoders
    # "equal" regardless of their element coders.
    return (type(self) == type(other)
            and self._elem_coder == other._elem_coder)

  def __hash__(self):
    return hash((type(self), self._elem_coder))
678
679
class IterableCoder(FastCoder):
  """Coder of iterables of homogeneous objects."""

  def __init__(self, elem_coder):
    self._elem_coder = elem_coder

  def _create_impl(self):
    return coder_impl.IterableCoderImpl(self._elem_coder.get_impl())

  def is_deterministic(self):
    return self._elem_coder.is_deterministic()

  def as_cloud_object(self):
    return {
        '@type': 'kind:stream',
        'is_stream_like': True,
        'component_encodings': [self._elem_coder.as_cloud_object()],
    }

  def value_coder(self):
    return self._elem_coder

  @staticmethod
  def from_type_hint(typehint, registry):
    return IterableCoder(registry.get_coder(typehint.inner_type))

  def _get_component_coders(self):
    return (self._elem_coder,)

  def __repr__(self):
    return 'IterableCoder[%r]' % self._elem_coder

  def __eq__(self, other):
    # Fixed: compare against other._elem_coder.  Previously this compared
    # self._elem_coder to itself, making any two IterableCoders "equal"
    # regardless of their element coders.
    return (type(self) == type(other)
            and self._elem_coder == other._elem_coder)

  def __hash__(self):
    return hash((type(self), self._elem_coder))
718
719
class GlobalWindowCoder(SingletonCoder):
  """Coder for the single global window."""

  def __init__(self):
    # Imported here rather than at module level to avoid depending on the
    # full SDK (see the import comments at the top of this file).
    from apache_beam.transforms import window
    super(GlobalWindowCoder, self).__init__(window.GlobalWindow())

  def as_cloud_object(self):
    return {'@type': 'kind:global_window'}
731
732
class IntervalWindowCoder(FastCoder):
  """Coder for a window defined by a start timestamp and a duration."""

  def _create_impl(self):
    return coder_impl.IntervalWindowCoderImpl()

  def is_deterministic(self):
    return True

  def as_cloud_object(self):
    return {'@type': 'kind:interval_window'}

  def __eq__(self, other):
    # Stateless: any two IntervalWindowCoders are interchangeable.
    return type(self) == type(other)

  def __hash__(self):
    return hash(type(self))
752
753
class WindowedValueCoder(FastCoder):
  """Coder for windowed values: (value, timestamp, windows) triples."""

  def __init__(self, wrapped_value_coder, window_coder=None):
    self.wrapped_value_coder = wrapped_value_coder
    self.timestamp_coder = TimestampCoder()
    # Default window coder is a pickle coder.
    self.window_coder = window_coder if window_coder else PickleCoder()

  def _create_impl(self):
    return coder_impl.WindowedValueCoderImpl(
        self.wrapped_value_coder.get_impl(),
        self.timestamp_coder.get_impl(),
        self.window_coder.get_impl())

  def is_deterministic(self):
    # Deterministic only if all three component coders are.
    return (self.wrapped_value_coder.is_deterministic()
            and self.timestamp_coder.is_deterministic()
            and self.window_coder.is_deterministic())

  def as_cloud_object(self):
    return {
        '@type': 'kind:windowed_value',
        'is_wrapper': True,
        'component_encodings': [
            component.as_cloud_object()
            for component in self._get_component_coders()],
    }

  def _get_component_coders(self):
    # The timestamp coder is implicit and not listed as a component.
    return [self.wrapped_value_coder, self.window_coder]

  # KV-ness is delegated to the wrapped value coder.
  def is_kv_coder(self):
    return self.wrapped_value_coder.is_kv_coder()

  def key_coder(self):
    return self.wrapped_value_coder.key_coder()

  def value_coder(self):
    return self.wrapped_value_coder.value_coder()

  def __repr__(self):
    return 'WindowedValueCoder[%s]' % self.wrapped_value_coder

  def __eq__(self, other):
    return (type(self) == type(other)
            and self.wrapped_value_coder == other.wrapped_value_coder
            and self.timestamp_coder == other.timestamp_coder
            and self.window_coder == other.window_coder)

  def __hash__(self):
    return hash(
        (self.wrapped_value_coder, self.timestamp_coder, self.window_coder))
808
809
class LengthPrefixCoder(FastCoder):
  """For internal use only; no backwards-compatibility guarantees.

  Coder which prefixes the length of the encoded object in the stream."""

  def __init__(self, value_coder):
    self._value_coder = value_coder

  def _create_impl(self):
    # Fixed for consistency with every other composite coder in this file:
    # hand the CoderImpl (not the Coder wrapper) to the impl.  Coder also
    # exposes encode/decode/estimate_size (delegating to its impl), so the
    # previous form worked incidentally; passing the impl skips that
    # per-element delegation.  NOTE(review): confirm LengthPrefixCoderImpl
    # uses only the encode/decode/estimate_size surface.
    return coder_impl.LengthPrefixCoderImpl(self._value_coder.get_impl())

  def is_deterministic(self):
    return self._value_coder.is_deterministic()

  def estimate_size(self, value):
    # Payload size plus the size of the varint that encodes that size.
    value_size = self._value_coder.estimate_size(value)
    return get_varint_size(value_size) + value_size

  def value_coder(self):
    return self._value_coder

  def as_cloud_object(self):
    return {
        '@type': 'kind:length_prefix',
        'component_encodings': [self._value_coder.as_cloud_object()],
    }

  def _get_component_coders(self):
    return (self._value_coder,)

  def __repr__(self):
    return 'LengthPrefixCoder[%r]' % self._value_coder

  def __eq__(self, other):
    return (type(self) == type(other)
            and self._value_coder == other._value_coder)

  def __hash__(self):
    return hash((type(self), self._value_coder))