Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions sdks/python/apache_beam/yaml/json_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,12 @@ def maybe_nullable(beam_type, nullable):

json_type = json_schema.get('type', None)
if json_type != 'object':
raise ValueError('Expected object type, got {json_type}.')
raise ValueError(f'Expected object type, got {json_type}.')
if 'properties' not in json_schema:
# Technically this is a valid (vacuous) schema, but as it's not generally
# meaningful, throw an informative error instead.
# (We could add a flag to allow this degenerate case.)
raise ValueError('Missing properties for {json_schema}.')
raise ValueError(f'Missing properties for {json_schema}.')
required = set(json_schema.get('required', []))
return schema_pb2.Schema(
fields=[
Expand Down Expand Up @@ -314,23 +314,23 @@ def _validate_compatible(weak_schema, strong_schema):
return
if weak_schema['type'] != strong_schema['type']:
raise ValueError(
'Incompatible types: %r vs %r' %
(weak_schema['type'] != strong_schema['type']))
f"Incompatible types: {weak_schema['type']!r} vs "
f"{strong_schema['type']!r}")
if weak_schema['type'] == 'array':
_validate_compatible(weak_schema['items'], strong_schema['items'])
elif weak_schema == 'object':
elif weak_schema['type'] == 'object':
for required in strong_schema.get('required', []):
if required not in weak_schema['properties']:
raise ValueError('Missing or unkown property %r' % required)
for name, spec in weak_schema.get('properties', {}):
raise ValueError(f'Missing or unknown property {required!r}')
for name, spec in weak_schema.get('properties', {}).items():
if name in strong_schema['properties']:
try:
_validate_compatible(spec, strong_schema['properties'][name])
except Exception as exn:
raise ValueError('Incompatible schema for %r' % name) from exn
raise ValueError(f'Incompatible schema for {name!r}') from exn
elif not strong_schema.get('additionalProperties'):
raise ValueError(
'Prohibited property: {property}; '
f'Prohibited property: {name}; '
'perhaps additionalProperties: False is missing?')


Expand Down
26 changes: 26 additions & 0 deletions sdks/python/apache_beam/yaml/json_utils_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,32 @@ def test_json_to_row_with_missing_required_field(self):
with self.assertRaises(KeyError):
converter(json_data)

def test_row_validator_compatibility_error(self):
beam_schema = schema_pb2.Schema(
fields=[
schema_pb2.Field(
name='f',
type=schema_pb2.FieldType(atomic_type=schema_pb2.STRING))
])
json_schema = {
'type': 'object',
'properties': {
'f': {
'type': 'integer'
}
}
}
with self.assertRaisesRegex(ValueError, "Incompatible schema for 'f'"):
json_utils.row_validator(beam_schema, json_schema)

def test_json_schema_to_beam_schema_errors(self):
with self.assertRaisesRegex(
ValueError, "Expected object type, got not_object"):
json_utils.json_schema_to_beam_schema({'type': 'not_object'})
with self.assertRaisesRegex(
ValueError, "Missing properties for"):
json_utils.json_schema_to_beam_schema({'type': 'object'})


if __name__ == '__main__':
unittest.main()
Loading