diff --git a/.gitignore b/.gitignore
index c841c99e..44d9418a 100644
--- a/.gitignore
+++ b/.gitignore
@@ -5,4 +5,4 @@
 Manifest.toml
 *.jl.mem
 test/_scrap.jl
-.DS_STORE
+.vscode
\ No newline at end of file
diff --git a/Project.toml b/Project.toml
index f0d4efeb..30fee6eb 100644
--- a/Project.toml
+++ b/Project.toml
@@ -1,7 +1,7 @@
 name = "Arrow"
 uuid = "69666777-d1a9-59fb-9406-91d4454c9d45"
 authors = ["quinnj "]
-version = "1.3.0"
+version = "1.4.1"
 
 [deps]
 BitIntegers = "c3b6d118-76ef-56ca-8cc7-ebb389d030a1"
@@ -33,6 +33,8 @@ JSON3 = "0f8b85d8-7281-11e9-16c2-39a750bddbf1"
 Random = "9a3f8284-a2c9-5f02-9a11-845980a1fd5c"
 StructTypes = "856f2bd8-1eba-4b0a-8007-ebc267875bd4"
 Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40"
+PyCall = "438e738f-606a-5dbb-bf0a-cddfbfd45ab0"
+Conda = "8f4d0f93-b110-5947-807f-2305c1781a2d"
 
 [targets]
-test = ["Test", "Random", "JSON3", "StructTypes", "CategoricalArrays"]
+test = ["Test", "Random", "JSON3", "StructTypes", "CategoricalArrays", "PyCall", "Conda"]
diff --git a/src/Arrow.jl b/src/Arrow.jl
index 4eac1762..71927501 100644
--- a/src/Arrow.jl
+++ b/src/Arrow.jl
@@ -92,7 +92,8 @@ include("arraytypes/arraytypes.jl")
 include("eltypes.jl")
 include("table.jl")
 include("write.jl")
-include("cinterface.jl")
+include("append.jl")
+include("CDataInterface/CDataInterface.jl")
 
 const LZ4_FRAME_COMPRESSOR = LZ4FrameCompressor[]
 const ZSTD_COMPRESSOR = ZstdCompressor[]
@@ -106,6 +107,7 @@ function __init__()
         CodecLz4.TranscodingStreams.initialize(lz4)
         push!(LZ4_FRAME_COMPRESSOR, lz4)
     end
+    OBJ_METADATA_LOCK[] = ReentrantLock()
     return
 end
 
diff --git a/src/ArrowTypes/src/ArrowTypes.jl b/src/ArrowTypes/src/ArrowTypes.jl
index 24c57186..f0f05c46 100644
--- a/src/ArrowTypes/src/ArrowTypes.jl
+++ b/src/ArrowTypes/src/ArrowTypes.jl
@@ -208,7 +208,8 @@ _symbol(ptr, len) = ccall(:jl_symbol_n, Ref{Symbol}, (Ptr{UInt8}, Int), ptr, len
 fromarrow(::Type{Symbol}, ptr::Ptr{UInt8}, len::Int) = _symbol(ptr, len)
 
 ArrowKind(::Type{<:AbstractArray}) = ListKind()
-fromarrow(::Type{A}, x::AbstractVector{T}) where {A <: AbstractVector{T}} where {T} = x
+fromarrow(::Type{A}, x::A) where {A <: AbstractVector{T}} where {T} = x
+fromarrow(::Type{A}, x::AbstractVector{T}) where {A <: AbstractVector{T}} where {T} = convert(A, x)
 ArrowKind(::Type{<:AbstractSet}) = ListKind()
 ArrowType(::Type{T}) where {T <: AbstractSet{S}} where {S} = Vector{S}
 toarrow(x::AbstractSet) = collect(x)
diff --git a/src/CDataInterface/CDataInterface.jl b/src/CDataInterface/CDataInterface.jl
new file mode 100644
index 00000000..484e09e9
--- /dev/null
+++ b/src/CDataInterface/CDataInterface.jl
@@ -0,0 +1,38 @@
+module CDataInterface
+
+import ..Arrow
+include("c_definitions.jl")
+include("jl_definitions.jl")
+include("c_to_jl.jl")
+
+export ArrowSchema, InterimCArrowArray, get_schema, get_array
+
+function get_schema(f)
+    schema_ref = Ref{CArrowSchema}()
+    ptr = Base.unsafe_convert(Ptr{CArrowSchema}, schema_ref)
+    f(ptr)
+    sch = ArrowSchema(schema_ref)
+    finalizer(sch) do x
+        r = getfield(x.c_arrow_schema[], :release)
+        if r != C_NULL
+            ccall(r, Cvoid, (Ptr{CArrowSchema},), x.c_arrow_schema)
+        end
+    end
+    return sch
+end
+
+function get_array(f)
+    arr_ref = Ref{CArrowArray}()
+    ptr = Base.unsafe_convert(Ptr{CArrowArray}, arr_ref)
+    f(ptr)
+    arr = InterimCArrowArray(arr_ref)
+    finalizer(arr) do x
+        r = getfield(x.c_arrow_array[], :release)
+        if r != C_NULL
+            ccall(r, Cvoid, (Ptr{CArrowArray},), x.c_arrow_array)
+        end
+    end
+    return arr
+end
+
+end # module
diff --git a/src/cinterface.jl b/src/CDataInterface/c_definitions.jl
similarity index
58% rename from src/cinterface.jl rename to src/CDataInterface/c_definitions.jl index a665e7aa..6c10f2cc 100644 --- a/src/cinterface.jl +++ b/src/CDataInterface/c_definitions.jl @@ -1,7 +1,3 @@ -module CData - -export ArrowSchema, ArrowArray, getschema, getarray - const ARROW_FLAG_DICTIONARY_ORDERED = 1 const ARROW_FLAG_NULLABLE = 2 const ARROW_FLAG_MAP_KEYS_SORTED = 4 @@ -22,7 +18,7 @@ CArrowSchema() = CArrowSchema(C_NULL, C_NULL, C_NULL, 0, 0, C_NULL, C_NULL, _CNU Base.propertynames(::CArrowSchema) = (:format, :name, :metadata, :flags, :n_children, :children, :dictionary) -function readmetadata(ptr::Ptr{UInt8}) +function read_c_arrow_schema_metadata(ptr::Ptr{UInt8}) pos = 1 meta = Dict{String, String}() if ptr != C_NULL @@ -49,7 +45,7 @@ function Base.getproperty(x::CArrowSchema, nm::Symbol) elseif nm === :name return unsafe_string(getfield(x, :name)) elseif nm === :metadata - return readmetadata(getfield(x, :metadata)) + return read_c_arrow_schema_metadata(getfield(x, :metadata)) elseif nm === :flags return getfield(x, :flags) elseif nm === :n_children @@ -64,34 +60,6 @@ function Base.getproperty(x::CArrowSchema, nm::Symbol) error("unknown property requested: $nm") end -mutable struct ArrowSchema - format::String - name::String - metadata::Dict{String, String} - flags::Int64 - n_children::Int64 - children::Vector{ArrowSchema} - dictionary::Union{Nothing, ArrowSchema} - carrowschema::Ref{CArrowSchema} -end - -ArrowSchema(s::Ref{CArrowSchema}) = ArrowSchema(s[].format, s[].name, s[].metadata, s[].flags, s[].n_children, map(ArrowSchema, s[].children), s[].dictionary === nothing ? nothing : ArrowSchema(s[].dictionary), s) -ArrowSchema(s::CArrowSchema) = ArrowSchema(s.format, s.name, s.metadata, s.flags, s.n_children, map(ArrowSchema, s.children), s.dictionary === nothing ? nothing : ArrowSchema(s.dictionary), Ref{CArrowSchema}()) - -function getschema(f) - schref = Ref{CArrowSchema}() - ptr = Base.unsafe_convert(Ptr{CArrowSchema}, schref) - f(ptr) - sch = ArrowSchema(schref) - finalizer(sch) do x - r = getfield(x.carrowschema[], :release) - if r != C_NULL - ccall(r, Cvoid, (Ptr{CArrowSchema},), x.carrowschema) - end - end - return sch -end - struct CArrowArray length::Int64 null_count::Int64 @@ -131,35 +99,4 @@ function Base.getproperty(x::CArrowArray, nm::Symbol) return d == C_NULL ? nothing : unsafe_load(d) end error("unknown property requested: $nm") -end - -mutable struct ArrowArray - length::Int64 - null_count::Int64 - offset::Int64 - n_buffers::Int64 - n_children::Int64 - buffers::Vector{Ptr{UInt8}} - children::Vector{ArrowArray} - dictionary::Union{Nothing, ArrowArray} - carrowarray::Ref{CArrowArray} -end - -ArrowArray(a::Ref{CArrowArray}) = ArrowArray(a[].length, a[].null_count, a[].offset, a[].n_buffers, a[].n_children, a[].buffers, map(ArrowArray, a[].children), a[].dictionary === nothing ? nothing : ArrowArray(a[].dictionary), a) -ArrowArray(a::CArrowArray) = ArrowArray(a.length, a.null_count, a.offset, a.n_buffers, a.n_children, a.buffers, map(ArrowArray, a.children), a.dictionary === nothing ? 
nothing : ArrowArray(a.dictionary), Ref{CArrowArray}()) - -function getarray(f) - arrref = Ref{CArrowArray}() - ptr = Base.unsafe_convert(Ptr{CArrowArray}, arrref) - f(ptr) - arr = ArrowArray(arrref) - finalizer(arr) do x - r = getfield(x.carrowarray[], :release) - if r != C_NULL - ccall(r, Cvoid, (Ptr{CArrowArray},), x.carrowarray) - end - end - return arr -end - -end # module +end \ No newline at end of file diff --git a/src/CDataInterface/c_to_jl.jl b/src/CDataInterface/c_to_jl.jl new file mode 100644 index 00000000..4a53df15 --- /dev/null +++ b/src/CDataInterface/c_to_jl.jl @@ -0,0 +1,240 @@ +# https://arrow.apache.org/docs/format/CDataInterface.html#data-type-description-format-strings + +nullable = true + +function convert_primitive( + type ::Type, + bitwidth ::Int, + c_arrow_array ::InterimCArrowArray, + c_arrow_schema ::ArrowSchema + ) ::Arrow.ArrowVector + length = c_arrow_array.length + + arrow_data_buffer = Base.unsafe_wrap( + Array, + c_arrow_array.buffers[2], + cld(length * bitwidth, 8)) + validity_bytes = Base.unsafe_wrap( + Array, + c_arrow_array.buffers[1], + cld(length, 8)) + validity_bitmap = Arrow.ValidityBitmap( + validity_bytes, + 1, # Since we are not reading from a file, the start pos will always be 1 + length, + c_arrow_array.null_count) + + data = reinterpret(type, arrow_data_buffer) + + T = nullable ? Union{type, Missing} : type + + Arrow.Primitive{T, AbstractVector{T}}( + arrow_data_buffer, + validity_bitmap, + data, + length, + c_arrow_schema.metadata) +end + +function convert_to_string_vector( + c_arrow_array ::InterimCArrowArray, + c_arrow_schema ::ArrowSchema + ) ::Arrow.ArrowVector + + length = c_arrow_array.length + offsets_buffer_binary = Base.unsafe_wrap( + Array, + c_arrow_array.buffers[2], + cld((length + 1) * 32, 8)) + offsets = Arrow.Offsets{Int32}( + offsets_buffer_binary, + reinterpret(Int32, offsets_buffer_binary)) + + arrow_data_buffer = Base.unsafe_wrap(Array, c_arrow_array.buffers[3], offsets |> last |> last) + + validity_bytes = Base.unsafe_wrap(Array, c_arrow_array.buffers[1], cld(length, 8)) + validity_bitmap = Arrow.ValidityBitmap(validity_bytes, 1, length, c_arrow_array.null_count) + + type = String + T = nullable ? Union{type, Missing} : type + + return Arrow.List{T, Int32, AbstractVector{UInt8}}( + arrow_data_buffer, + validity_bitmap, + offsets, + arrow_data_buffer, + length, + c_arrow_schema.metadata) +end + +function convert_struct( + c_arrow_array ::InterimCArrowArray, + c_arrow_schema ::ArrowSchema + ) ::Arrow.ArrowVector + + children_list = map(1:c_arrow_schema.n_children) do i + convert_to_jl_arrow(c_arrow_array.children[i], c_arrow_schema.children[i]) + end + children_tuple = tuple(children_list ...) 
+
+    validity_bytes = Base.unsafe_wrap(Array, c_arrow_array.buffers[1], cld(c_arrow_array.length, 8))
+    validity_bitmap = Arrow.ValidityBitmap(validity_bytes, 1, c_arrow_array.length, c_arrow_array.null_count)
+
+    Arrow.Struct{Any, typeof(children_tuple)}(
+        validity_bitmap,
+        children_tuple,
+        c_arrow_array.length,
+        c_arrow_schema.metadata)
+end
+
+function get_validity_bitmap(
+        c_arrow_array ::InterimCArrowArray,
+        validity_buffer_pointer ::Ptr{UInt8},
+        length ::Int
+    ) ::Arrow.ValidityBitmap
+    validity_bytes = Base.unsafe_wrap(Array, validity_buffer_pointer, cld(length, 8))
+    return Arrow.ValidityBitmap(validity_bytes, 1, length, c_arrow_array.null_count)
+end
+
+function convert_to_jl_arrow(
+        c_arrow_array ::InterimCArrowArray,
+        c_arrow_schema ::ArrowSchema
+    ) ::Arrow.ArrowVector
+
+    format_string = c_arrow_schema.format
+    # Primitives
+    if format_string == "n"
+        Nothing
+    elseif format_string == "b"
+        Bool
+    elseif format_string == "c"
+        convert_primitive(
+            Int8,
+            8,
+            c_arrow_array,
+            c_arrow_schema)
+    elseif format_string == "C"
+        convert_primitive(
+            UInt8,
+            8,
+            c_arrow_array,
+            c_arrow_schema)
+    elseif format_string == "s"
+        convert_primitive(
+            Int16,
+            16,
+            c_arrow_array,
+            c_arrow_schema)
+    elseif format_string == "S"
+        convert_primitive(
+            UInt16,
+            16,
+            c_arrow_array,
+            c_arrow_schema)
+    elseif format_string == "i"
+        convert_primitive(
+            Int32,
+            32,
+            c_arrow_array,
+            c_arrow_schema)
+    elseif format_string == "I"
+        convert_primitive(
+            UInt32,
+            32,
+            c_arrow_array,
+            c_arrow_schema)
+    elseif format_string == "l"
+        convert_primitive(
+            Int64,
+            64,
+            c_arrow_array,
+            c_arrow_schema)
+    elseif format_string == "L"
+        convert_primitive(
+            UInt64,
+            64,
+            c_arrow_array,
+            c_arrow_schema)
+    elseif format_string == "e"
+        convert_primitive(
+            Float16,
+            16,
+            c_arrow_array,
+            c_arrow_schema)
+    elseif format_string == "f"
+        convert_primitive(
+            Float32,
+            32,
+            c_arrow_array,
+            c_arrow_schema)
+    elseif format_string == "g"
+        convert_primitive(
+            Float64,
+            64,
+            c_arrow_array,
+            c_arrow_schema)
+
+    # Binary types
+    elseif format_string == "z" || format_string == "Z"
+        Vector{UInt8}
+    elseif format_string == "u" || format_string == "U"
+        convert_to_string_vector(c_arrow_array, c_arrow_schema)
+    elseif format_string[1] == 'd'
+        splits = parse.(Int, split(format_string[3:end], ","))
+        precision = splits[1]
+        scale = splits[2]
+        bitwidth = length(splits) == 3 ? splits[3] : 128
+        Arrow.Decimal{precision, scale, bitwidth}
+    elseif format_string[1] == 'w'
+        Arrow.FixedSizeList{UInt8}
+
+    # Nested Types
+    elseif format_string[1] == '+'
+        if format_string[2] == 'l' || format_string[2] == 'L'
+            Arrow.List
+        elseif format_string[2] == 'w'
+            size = parse(Int, format_string[4:end]) #TODO use this somehow
+            Arrow.FixedSizeList
+        elseif format_string[2] == 's'
+            convert_struct(c_arrow_array, c_arrow_schema)
+        elseif format_string[2] == 'm'
+            Arrow.Map
+        elseif format_string[2:3] == "ud"
+            type_strings = split(format_string[5:end], ",") # todo use this somehow
+            Arrow.DenseUnion
+        elseif format_string[2:3] == "us"
+            type_strings = split(format_string[5:end], ",") # todo use this somehow
+            Arrow.SparseUnion
+        end
+
+    # Temporal types
+    elseif format_string[1] == 't'
+        if format_string[2:3] == "dD"
+            Arrow.Date{Arrow.Flatbuf.DateUnitModule.DAY, Int32}
+        elseif format_string[2:3] == "dm"
+            Arrow.Date{Arrow.Flatbuf.DateUnitModule.MILLISECOND, Int64}
+        elseif format_string[2:3] == "ts"
+            Arrow.Time{Arrow.Flatbuf.TimeUnitModule.SECOND, Int32}
+        elseif format_string[2:3] == "tm"
+            Arrow.Time{Arrow.Flatbuf.TimeUnitModule.MILLISECOND, Int32}
+        elseif format_string[2:3] == "tu"
+            Arrow.Time{Arrow.Flatbuf.TimeUnitModule.MICROSECOND, Int64}
+        elseif format_string[2:3] == "tn"
+            Arrow.Time{Arrow.Flatbuf.TimeUnitModule.NANOSECOND, Int64}
+        elseif format_string[2] == 's'
+            timestamp_unit = if format_string[3] == 's'
+                Arrow.Flatbuf.TimeUnitModule.SECOND
+            elseif format_string[3] == 'm'
+                Arrow.Flatbuf.TimeUnitModule.MILLISECOND
+            elseif format_string[3] == 'u'
+                Arrow.Flatbuf.TimeUnitModule.MICROSECOND
+            elseif format_string[3] == 'n'
+                Arrow.Flatbuf.TimeUnitModule.NANOSECOND
+            end
+
+            timezone = length(format_string) == 4 ? nothing : Symbol(format_string[5:end])
+
+            Arrow.Timestamp{timestamp_unit, timezone}
+        end
+    end
+end
diff --git a/src/CDataInterface/jl_definitions.jl b/src/CDataInterface/jl_definitions.jl
new file mode 100644
index 00000000..c1e9b3db
--- /dev/null
+++ b/src/CDataInterface/jl_definitions.jl
@@ -0,0 +1,67 @@
+mutable struct ArrowSchema
+    format ::String
+    name ::String
+    metadata ::Dict{String,String}
+    flags ::Int64
+    n_children ::Int64
+    children ::Vector{ArrowSchema}
+    dictionary ::Union{Nothing,ArrowSchema}
+    c_arrow_schema ::Ref{CArrowSchema}
+end
+
+ArrowSchema(s::Ref{CArrowSchema}) = ArrowSchema(
+    s[].format,
+    s[].name,
+    s[].metadata,
+    s[].flags,
+    s[].n_children,
+    map(ArrowSchema, s[].children),
+    s[].dictionary === nothing ? nothing : ArrowSchema(s[].dictionary),
+    s
+)
+
+ArrowSchema(s::CArrowSchema) = ArrowSchema(
+    s.format,
+    s.name,
+    s.metadata,
+    s.flags,
+    s.n_children,
+    map(ArrowSchema, s.children), s.dictionary === nothing ? nothing : ArrowSchema(s.dictionary),
+    Ref{CArrowSchema}()
+)
+
+mutable struct InterimCArrowArray
+    length ::Int64
+    null_count ::Int64
+    offset ::Int64
+    n_buffers ::Int64
+    n_children ::Int64
+    buffers ::Vector{Ptr{UInt8}}
+    children ::Vector{InterimCArrowArray}
+    dictionary ::Union{Nothing,InterimCArrowArray}
+    c_arrow_array ::Ref{CArrowArray}
+end
+
+InterimCArrowArray(a::Ref{CArrowArray}) = InterimCArrowArray(
+    a[].length,
+    a[].null_count,
+    a[].offset,
+    a[].n_buffers,
+    a[].n_children,
+    a[].buffers,
+    map(InterimCArrowArray, a[].children),
+    a[].dictionary === nothing ?
nothing : InterimCArrowArray(a[].dictionary), + a +) + +InterimCArrowArray(a::CArrowArray) = InterimCArrowArray( + a.length, + a.null_count, + a.offset, + a.n_buffers, + a.n_children, + a.buffers, + map(InterimCArrowArray, a.children), + a.dictionary === nothing ? nothing : InterimCArrowArray(a.dictionary), + Ref{CArrowArray}() +) diff --git a/src/append.jl b/src/append.jl new file mode 100644 index 00000000..8a4d044e --- /dev/null +++ b/src/append.jl @@ -0,0 +1,165 @@ +""" + Arrow.append(io::IO, tbl) + Arrow.append(file::String, tbl) + tbl |> Arrow.append(file) + +Append any [Tables.jl](https://github.com/JuliaData/Tables.jl)-compatible `tbl` +to an existing arrow formatted file or IO. The existing arrow data must be in +IPC stream format. Note that appending to the "feather formatted file" is _not_ +allowed, as this file format doesn't support appending. That means files written +like `Arrow.write(filename::String, tbl)` _cannot_ be appended to; instead, you +should write like `Arrow.write(filename::String, tbl; file=false)`. + +When an IO object is provided to be written on to, it must support seeking. For +example, a file opened in `r+` mode or an `IOBuffer` that is readable, writable +and seekable can be appended to, but not a network stream. + +Multiple record batches will be written based on the number of +`Tables.partitions(tbl)` that are provided; by default, this is just +one for a given table, but some table sources support automatic +partitioning. Note you can turn multiple table objects into partitions +by doing `Tables.partitioner([tbl1, tbl2, ...])`, but note that +each table must have the exact same `Tables.Schema`. + +By default, `Arrow.append` will use multiple threads to write multiple +record batches simultaneously (e.g. if julia is started with `julia -t 8` +or the `JULIA_NUM_THREADS` environment variable is set). + +Supported keyword arguments to `Arrow.append` include: + * `alignment::Int=8`: specify the number of bytes to align buffers to when written in messages; strongly recommended to only use alignment values of 8 or 64 for modern memory cache line optimization + * `dictencode::Bool=false`: whether all columns should use dictionary encoding when being written; to dict encode specific columns, wrap the column/array in `Arrow.DictEncode(col)` + * `dictencodenested::Bool=false`: whether nested data type columns should also dict encode nested arrays/buffers; other language implementations [may not support this](https://arrow.apache.org/docs/status.html) + * `denseunions::Bool=true`: whether Julia `Vector{<:Union}` arrays should be written using the dense union layout; passing `false` will result in the sparse union layout + * `largelists::Bool=false`: causes list column types to be written with Int64 offset arrays; mainly for testing purposes; by default, Int64 offsets will be used only if needed + * `maxdepth::Int=$DEFAULT_MAX_DEPTH`: deepest allowed nested serialization level; this is provided by default to prevent accidental infinite recursion with mutually recursive data structures + * `ntasks::Int`: number of concurrent threaded tasks to allow while writing input partitions out as arrow record batches; default is no limit; to disable multithreaded writing, pass `ntasks=1` + * `convert::Bool`: whether certain arrow primitive types in the schema of `file` should be converted to Julia defaults for matching them to the schema of `tbl`; by default, `convert=true`. + * `file::Bool`: applicable when an `IO` is provided, whether it is a file; by default `file=false`. 
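+
+# Examples
+
+An illustrative sketch (not taken from the test suite; `"data.arrow"` is a placeholder path):
+
+```julia
+tbl1 = (x = [1, 2, 3],)
+tbl2 = (x = [4, 5, 6],)
+Arrow.write("data.arrow", tbl1; file=false)  # stream format, so the file can be appended to
+Arrow.append("data.arrow", tbl2)             # adds tbl2 as another record batch
+tbl2 |> Arrow.append("data.arrow")           # equivalent pipe form
+```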
+""" +function append end + +append(io_or_file; kw...) = x -> append(io_or_file, x; kw...) + +function append(file::String, tbl; kwargs...) + open(file, "r+") do io + append(io, tbl; file=true, kwargs...) + end + + return file +end + +function append(io::IO, tbl; + largelists::Bool=false, + denseunions::Bool=true, + dictencode::Bool=false, + dictencodenested::Bool=false, + alignment::Int=8, + maxdepth::Int=DEFAULT_MAX_DEPTH, + ntasks=Inf, + convert::Bool=true, + file::Bool=false) + + if ntasks < 1 + throw(ArgumentError("ntasks keyword argument must be > 0; pass `ntasks=1` to disable multithreaded writing")) + end + + isstream, arrow_schema, compress = stream_properties(io; convert=convert) + if !isstream + throw(ArgumentError("append is supported only to files in arrow stream format")) + end + + if compress === :lz4 + compress = LZ4_FRAME_COMPRESSOR + elseif compress === :zstd + compress = ZSTD_COMPRESSOR + elseif compress isa Symbol + throw(ArgumentError("unsupported compress keyword argument value: $compress. Valid values include `:lz4` or `:zstd`")) + end + + append(io, tbl, arrow_schema, compress, largelists, denseunions, dictencode, dictencodenested, alignment, maxdepth, ntasks) + + return io +end + +function append(io::IO, source, arrow_schema, compress, largelists, denseunions, dictencode, dictencodenested, alignment, maxdepth, ntasks) + seekend(io) + skip(io, -8) # overwrite last 8 bytes of last empty message footer + + sch = Ref{Tables.Schema}(arrow_schema) + msgs = OrderedChannel{Message}(ntasks) + dictencodings = Dict{Int64, Any}() # Lockable{DictEncoding} + # build messages + blocks = (Block[], Block[]) + # start message writing from channel + threaded = ntasks > 1 + tsk = threaded ? (Threads.@spawn for msg in msgs + Base.write(io, msg, blocks, sch, alignment) + end) : (@async for msg in msgs + Base.write(io, msg, blocks, sch, alignment) + end) + anyerror = Threads.Atomic{Bool}(false) + errorref = Ref{Any}() + @sync for (i, tbl) in enumerate(Tables.partitions(source)) + if anyerror[] + @error "error writing arrow data on partition = $(errorref[][3])" exception=(errorref[][1], errorref[][2]) + error("fatal error writing arrow data") + end + @debug 1 "processing table partition i = $i" + tbl_cols = Tables.columns(tbl) + tbl_schema = Tables.schema(tbl_cols) + + if !is_equivalent_schema(arrow_schema, tbl_schema) + throw(ArgumentError("Table schema does not match existing arrow file schema")) + end + + if threaded + Threads.@spawn process_partition(tbl_cols, dictencodings, largelists, compress, denseunions, dictencode, dictencodenested, maxdepth, msgs, alignment, i, sch, errorref, anyerror) + else + @async process_partition(tbl_cols, dictencodings, largelists, compress, denseunions, dictencode, dictencodenested, maxdepth, msgs, alignment, i, sch, errorref, anyerror) + end + end + if anyerror[] + @error "error writing arrow data on partition = $(errorref[][3])" exception=(errorref[][1], errorref[][2]) + error("fatal error writing arrow data") + end + # close our message-writing channel, no further put!-ing is allowed + close(msgs) + # now wait for our message-writing task to finish writing + wait(tsk) + + Base.write(io, Message(UInt8[], nothing, 0, true, false, Meta.Schema), blocks, sch, alignment) + + return io +end + +function stream_properties(io::IO; convert::Bool=true) + startpos = position(io) + buff = similar(FILE_FORMAT_MAGIC_BYTES) + start_magic = read!(io, buff) == FILE_FORMAT_MAGIC_BYTES + seekend(io) + len = position(io) - startpos + skip(io, 
-length(FILE_FORMAT_MAGIC_BYTES)) + end_magic = read!(io, buff) == FILE_FORMAT_MAGIC_BYTES + seek(io, startpos) # leave the stream position unchanged + + isstream = !(len > 24 && start_magic && end_magic) + if isstream + stream = Stream(io, convert=convert) + for table in stream + # no need to scan further once we get compression information + (stream.compression[] !== nothing) && break + end + seek(io, startpos) # leave the stream position unchanged + return isstream, Tables.Schema(stream.names, stream.types), stream.compression[] + else + return isstream, nothing, nothing + end +end + +function is_equivalent_schema(sch1::Tables.Schema, sch2::Tables.Schema) + (sch1.names == sch2.names) || (return false) + for (t1,t2) in zip(sch1.types, sch2.types) + (t1 === t2) || (return false) + end + true +end \ No newline at end of file diff --git a/src/arraytypes/arraytypes.jl b/src/arraytypes/arraytypes.jl index c06c616c..6db185a6 100644 --- a/src/arraytypes/arraytypes.jl +++ b/src/arraytypes/arraytypes.jl @@ -56,7 +56,7 @@ function arrowvector(x, i, nl, fi, de, ded, meta; dictencoding::Bool=false, dict if !(x isa DictEncode) && !dictencoding && (dictencode || DataAPI.refarray(x) !== x) x = DictEncode(x, dictencodeid(i, nl, fi)) elseif x isa DictEncoded - return arrowvector(DictEncodeType, x, i, nl, fi, de, ded, meta; dictencode=dictencode, kw...) + return arrowvector(DictEncodeType, x, i, nl, fi, de, ded, meta; dictencode=dictencode, kw...) elseif !(x isa DictEncode) x = ToArrow(x) end @@ -66,7 +66,7 @@ function arrowvector(x, i, nl, fi, de, ded, meta; dictencoding::Bool=false, dict meta["ARROW:extension:name"] = String(ArrowTypes.arrowname(T)) meta["ARROW:extension:metadata"] = String(ArrowTypes.arrowmetadata(T)) end - return arrowvector(S, x, i, nl, fi, de, ded, meta; dictencode=dictencode, kw...) + return arrowvector(S, x, i, nl, fi, de, ded, meta; dictencode=dictencode, maxdepth=maxdepth, kw...) end # now we check for ArrowType converions and dispatch on ArrowKind diff --git a/src/arraytypes/list.jl b/src/arraytypes/list.jl index e5dd12df..d0daebec 100644 --- a/src/arraytypes/list.jl +++ b/src/arraytypes/list.jl @@ -16,7 +16,7 @@ struct Offsets{T <: Union{Int32, Int64}} <: ArrowVector{Tuple{T, T}} arrow::Vector{UInt8} # need to hold a reference to arrow memory blob - offsets::Vector{T} + offsets::AbstractVector{T} end Base.size(o::Offsets) = (length(o.offsets) - 1,) diff --git a/src/table.jl b/src/table.jl index 86f02910..d6c71f7b 100644 --- a/src/table.jl +++ b/src/table.jl @@ -44,10 +44,12 @@ struct Stream batchiterator::BatchIterator pos::Int names::Vector{Symbol} + types::Vector{Type} schema::Meta.Schema dictencodings::Dict{Int64, DictEncoding} # dictionary id => DictEncoding dictencoded::Dict{Int64, Meta.Field} # dictionary id => field convert::Bool + compression::Ref{Union{Symbol,Nothing}} end Tables.partitions(x::Stream) = x @@ -75,19 +77,23 @@ function Stream(bytes::Vector{UInt8}, off::Integer=1, tlen::Union{Integer, Nothi # assert endianness? # store custom_metadata? 
names = Symbol[] + types = Type[] for (i, field) in enumerate(schema.fields) push!(names, Symbol(field.name)) + push!(types, juliaeltype(field, buildmetadata(field.custom_metadata), convert)) # recursively find any dictionaries for any fields getdictionaries!(dictencoded, field) @debug 1 "parsed column from schema: field = $field" end - return Stream(batchiterator, pos, names, schema, dictencodings, dictencoded, convert) + return Stream(batchiterator, pos, names, types, schema, dictencodings, dictencoded, convert, Ref{Union{Symbol,Nothing}}(nothing)) end Base.IteratorSize(::Type{Stream}) = Base.SizeUnknown() function Base.iterate(x::Stream, (pos, id)=(x.pos, 1)) columns = AbstractVector[] + compression = nothing + while true state = iterate(x.batchiterator, (pos, id)) state === nothing && return nothing @@ -97,6 +103,9 @@ function Base.iterate(x::Stream, (pos, id)=(x.pos, 1)) id = header.id recordbatch = header.data @debug 1 "parsing dictionary batch message: id = $id, compression = $(recordbatch.compression)" + if recordbatch.compression !== nothing + compression = recordbatch.compression + end if haskey(x.dictencodings, id) && header.isDelta # delta field = x.dictencoded[id] @@ -114,6 +123,9 @@ function Base.iterate(x::Stream, (pos, id)=(x.pos, 1)) @debug 1 "parsed dictionary batch message: id=$id, data=$values\n" elseif header isa Meta.RecordBatch @debug 1 "parsing record batch message: compression = $(header.compression)" + if header.compression !== nothing + compression = header.compression + end for vec in VectorIterator(x.schema, batch, x.dictencodings, x.convert) push!(columns, vec) end @@ -122,6 +134,17 @@ function Base.iterate(x::Stream, (pos, id)=(x.pos, 1)) throw(ArgumentError("unsupported arrow message type: $(typeof(header))")) end end + + if compression !== nothing + if compression.codec == Flatbuf.CompressionType.ZSTD + x.compression[] = :zstd + elseif compression.codec == Flatbuf.CompressionType.LZ4_FRAME + x.compression[] = :lz4 + else + throw(ArgumentError("unsupported compression codec: $(compression.codec)")) + end + end + lookup = Dict{Symbol, AbstractVector}() types = Type[] for (nm, col) in zip(x.names, columns) diff --git a/src/write.jl b/src/write.jl index 146fda75..03f1f838 100644 --- a/src/write.jl +++ b/src/write.jl @@ -14,6 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +const OBJ_METADATA_LOCK = Ref{ReentrantLock}() const OBJ_METADATA = IdDict{Any, Dict{String, String}}() """ @@ -24,7 +25,9 @@ Metadata attached to a table or column will be serialized when written as a stream or file. """ function setmetadata!(x, meta::Dict{String, String}) - OBJ_METADATA[x] = meta + lock(OBJ_METADATA_LOCK[]) do + OBJ_METADATA[x] = meta + end return end @@ -37,12 +40,12 @@ Metadata may be attached to any object via [`Arrow.setmetadata!`](@ref), or deserialized via the arrow format directly (the format allows attaching metadata to table, column, and other objects). -Note that this function's return value directly aliases `x`'s attached metadata -(i.e. is not a copy of the underlying storage). Any method author that overloads +Note that this function's return value directly aliases `x`'s attached metadata +(i.e. is not a copy of the underlying storage). Any method author that overloads this function should preserve this behavior so that downstream callers can rely on this behavior in generic code. 
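+
+For example, a minimal sketch of the aliasing behavior (`col` here is any value metadata was attached to):
+
+```julia
+col = [1, 2, 3]
+Arrow.setmetadata!(col, Dict("unit" => "seconds"))
+meta = Arrow.getmetadata(col)  # the same Dict that was attached, not a copy
+meta["unit"] = "ms"            # mutates the stored metadata in place
+```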
""" -getmetadata(x, default=nothing) = get(OBJ_METADATA, x, default) +getmetadata(x, default=nothing) = lock(() -> get(OBJ_METADATA, x, default), OBJ_METADATA_LOCK[]) const DEFAULT_MAX_DEPTH = 6 @@ -132,8 +135,9 @@ function write(io, source, writetofile, largelists, compress, denseunions, dicte error("fatal error writing arrow data") end @debug 1 "processing table partition i = $i" + tblcols = Tables.columns(tbl) if i == 1 - cols = toarrowtable(tbl, dictencodings, largelists, compress, denseunions, dictencode, dictencodenested, maxdepth) + cols = toarrowtable(tblcols, dictencodings, largelists, compress, denseunions, dictencode, dictencodenested, maxdepth) sch[] = Tables.schema(cols) firstcols[] = cols put!(msgs, makeschemamsg(sch[], cols), i) @@ -149,9 +153,9 @@ function write(io, source, writetofile, largelists, compress, denseunions, dicte put!(msgs, makerecordbatchmsg(sch[], cols, alignment), i, true) else if threaded - Threads.@spawn process_partition(tbl, dictencodings, largelists, compress, denseunions, dictencode, dictencodenested, maxdepth, msgs, alignment, i, sch, errorref, anyerror) + Threads.@spawn process_partition(tblcols, dictencodings, largelists, compress, denseunions, dictencode, dictencodenested, maxdepth, msgs, alignment, i, sch, errorref, anyerror) else - @async process_partition(tbl, dictencodings, largelists, compress, denseunions, dictencode, dictencodenested, maxdepth, msgs, alignment, i, sch, errorref, anyerror) + @async process_partition(tblcols, dictencodings, largelists, compress, denseunions, dictencode, dictencodenested, maxdepth, msgs, alignment, i, sch, errorref, anyerror) end end end @@ -205,9 +209,9 @@ function write(io, source, writetofile, largelists, compress, denseunions, dicte return io end -function process_partition(tbl, dictencodings, largelists, compress, denseunions, dictencode, dictencodenested, maxdepth, msgs, alignment, i, sch, errorref, anyerror) +function process_partition(cols, dictencodings, largelists, compress, denseunions, dictencode, dictencodenested, maxdepth, msgs, alignment, i, sch, errorref, anyerror) try - cols = toarrowtable(tbl, dictencodings, largelists, compress, denseunions, dictencode, dictencodenested, maxdepth) + cols = toarrowtable(cols, dictencodings, largelists, compress, denseunions, dictencode, dictencodenested, maxdepth) if !isempty(cols.dictencodingdeltas) for de in cols.dictencodingdeltas dictsch = Tables.Schema((:col,), (eltype(de.data),)) @@ -229,9 +233,8 @@ struct ToArrowTable dictencodingdeltas::Vector{DictEncoding} end -function toarrowtable(x, dictencodings, largelists, compress, denseunions, dictencode, dictencodenested, maxdepth) +function toarrowtable(cols, dictencodings, largelists, compress, denseunions, dictencode, dictencodenested, maxdepth) @debug 1 "converting input table to arrow formatted columns" - cols = Tables.columns(x) meta = getmetadata(cols) sch = Tables.schema(cols) types = collect(sch.types) diff --git a/test/CDataInterface/c_data_interface_test.jl b/test/CDataInterface/c_data_interface_test.jl new file mode 100644 index 00000000..4b22e09d --- /dev/null +++ b/test/CDataInterface/c_data_interface_test.jl @@ -0,0 +1,19 @@ +using Arrow, PyCall, Random +pd = pyimport("pandas") +pa = pyimport("pyarrow") +## +df = pd.DataFrame(Dict( + "floats" => map(x -> rand() < 0.5 ? rand(1:10) : nothing, 1:1_000_000), + "strings" => map(x -> rand() < 0.5 ? 
randstring(12) : nothing, 1:1_000_000)
+))
+rb = pa.record_batch(df)
+
+c_arrow_schema = Arrow.CDataInterface.get_schema() do ptr
+    rb.schema._export_to_c(Int(ptr))
+end
+c_arrow_array = Arrow.CDataInterface.get_array() do ptr
+    rb._export_to_c(Int(ptr))
+end
+##
+Arrow.CDataInterface.convert_to_jl_arrow(c_arrow_array, c_arrow_schema)
+##
\ No newline at end of file
diff --git a/test/arrowjson.jl b/test/arrowjson.jl
index bc4c3330..3553c848 100644
--- a/test/arrowjson.jl
+++ b/test/arrowjson.jl
@@ -559,7 +559,7 @@ function DataFile(source)
     dictencodings = Dict{String, Tuple{Base.Type, DictEncoding}}()
     dictid = Ref(0)
     for (i, tbl1) in Tables.partitions(source)
-        tbl = Arrow.toarrowtable(tbl1)
+        tbl = Arrow.toarrowtable(Tables.columns(tbl1))
         if i == 1
             sch = Tables.schema(tbl)
             for (nm, T, col) in zip(sch.names, sch.types, Tables.Columns(tbl))
diff --git a/test/runtests.jl b/test/runtests.jl
index 97271980..9dd3d9d4 100644
--- a/test/runtests.jl
+++ b/test/runtests.jl
@@ -18,6 +18,7 @@ using Test, Arrow, Tables, Dates, PooledArrays, TimeZones, UUIDs, CategoricalArr
 
 include(joinpath(dirname(pathof(Arrow)), "ArrowTypes/test/tests.jl"))
 include(joinpath(dirname(pathof(Arrow)), "../test/testtables.jl"))
+include(joinpath(dirname(pathof(Arrow)), "../test/testappend.jl"))
 include(joinpath(dirname(pathof(Arrow)), "../test/integrationtest.jl"))
 include(joinpath(dirname(pathof(Arrow)), "../test/dates.jl"))
 
@@ -41,6 +42,20 @@ end
 
 end # @testset "table roundtrips"
 
+@testset "table append" begin
+
+    for case in testtables
+        testappend(case...)
+    end
+
+    testappend_partitions()
+
+    for compression_option in (:lz4, :zstd)
+        testappend_compression(compression_option)
+    end
+
+end # @testset "table append"
+
 @testset "arrow json integration tests" begin
 
 for file in readdir(joinpath(dirname(pathof(Arrow)), "../test/arrowjson"))
@@ -308,6 +323,24 @@ tbl = Arrow.Table(bytes)
 @test length(tbl.a) == 0
 @test eltype(tbl.a) == Union{Int64, Missing}
 
+# 181
+tbl = (x = [Dict()],)
+d = tbl.x[];
+for i in 1:20
+    d[i] = Dict()
+    d = d[i]
+end
+msg = "reached nested serialization level (42) deeper than provided max depth argument (41); to increase allowed nesting level, pass `maxdepth=X`"
+@test_throws ErrorException(msg) Arrow.tobuffer(tbl; maxdepth=41).x
+@test Arrow.Table(Arrow.tobuffer(tbl; maxdepth=42)).x == tbl.x
+
+# 167
+t = (
+    col1=[["boop", "she"], ["boop", "she"], ["boo"]],
+)
+tbl = Arrow.Table(Arrow.tobuffer(t))
+@test eltype(tbl.col1) == Vector{String}
+
 end # @testset "misc"
 
 end
diff --git a/test/testappend.jl b/test/testappend.jl
new file mode 100644
index 00000000..f3de532b
--- /dev/null
+++ b/test/testappend.jl
@@ -0,0 +1,125 @@
+function testappend(nm, t, writekw, readkw, extratests)
+    println("testing append: $nm")
+    io = Arrow.tobuffer(t; writekw...)
+    bytes = read(io)
+    mktemp() do path, io
+        write(io, bytes)
+        close(io)
+
+        t1 = Arrow.Table(read(path); readkw...)
+        f1 = first(Tables.columns(t1))
+        Arrow.append(path, t1; writekw..., readkw...)
+ nparts = 0 + for t2 in Arrow.Stream(path) + @test isequal(f1, first(Tables.columns(t2))) + nparts += 1 + end + @test nparts == 2 + end +end + +function testappend_compression(compression_option) + mktempdir() do path + testdata = (col1=Int64[1,2,3,4,5,6,7,8,9,10],) + file1 = joinpath(path, "table1.arrow") + file2 = joinpath(path, "table2.arrow") + + open(file1, "w") do io + Arrow.write(io, testdata; file=false, compress=compression_option) + end + isstream, schema, compression = open(Arrow.stream_properties, file1) + @test isstream + @test compression == compression_option + + open(file2, "w") do io + Arrow.write(io, testdata; file=false) + end + + arrow_table2 = Arrow.Table(file2) + arrow_table2 |> Arrow.append(file1) + arrow_table1 = Arrow.Table(file1) + + isstream, schema, compression = open(Arrow.stream_properties, file1) + @test isstream + @test compression == compression_option + + @test length(Tables.columns(arrow_table1)[1]) == 20 + @test length(Tables.columns(arrow_table2)[1]) == 10 + end +end + +function testappend_partitions() + mktempdir() do path + testdata = (col1=Int64[1,2,3,4,5,6,7,8,9,10],) + file1 = joinpath(path, "table1.arrow") + file2 = joinpath(path, "table2.arrow") + open(file1, "w") do io + Arrow.write(io, testdata; file=false) + end + arrow_table1 = Arrow.Table(file1) + isstream, schema, compression = open(Arrow.stream_properties, file1) + @test isstream + @test compression === nothing + @test schema.names == (:col1,) + @test schema.types == (Int64,) + + # can only append to arrow stream + open(file2, "w") do io + Arrow.write(io, testdata; file=true) + end + @test_throws ArgumentError Arrow.append(file2, arrow_table1) + + # schema must match + testdata2 = (col2=Int64[1,2,3,4,5,6,7,8,9,10],) + open(file2, "w") do io + Arrow.write(io, testdata2; file=false) + end + @test_throws ArgumentError Arrow.append(file2, arrow_table1) + + # recreate file2 in arrow format with correct schema + open(file2, "w") do io + Arrow.write(io, testdata; file=false) + end + + # start + # arrow_table1: 1 partition, 10 rows + # arrow_table2: 1 partition, 10 rows + arrow_table2 = Arrow.Table(file2) + @test length(Tables.columns(arrow_table1)[1]) == 10 + @test length(Tables.columns(arrow_table2)[1]) == 10 + + @test_throws ArgumentError Arrow.append(file1, arrow_table2; ntasks = -1) + arrow_table2 |> Arrow.append(file1) + arrow_table1 = Arrow.Table(file1) + # now + # arrow_table1: 2 partitions, 20 rows + # arrow_table2: 1 partition, 10 rows + + @test Tables.schema(arrow_table1) == Tables.schema(arrow_table2) + @test length(Tables.columns(arrow_table1)[1]) == 20 + @test length(Tables.columns(arrow_table2)[1]) == 10 + @test length(collect(Tables.partitions(Arrow.Stream(file1)))) == 2 * length(collect(Tables.partitions(Arrow.Stream(file2)))) + + Arrow.append(file2, arrow_table1; ntasks=1) # append with single task + arrow_table2 = Arrow.Table(file2) + # now + # arrow_table1: 2 partitions, 20 rows + # arrow_table2: 2 partitions, 30 rows (both partitions of table1 are appended as single partition) + + @test Tables.schema(arrow_table1) == Tables.schema(arrow_table2) + @test length(Tables.columns(arrow_table1)[1]) == 20 + @test length(Tables.columns(arrow_table2)[1]) == 30 + @test length(collect(Tables.partitions(Arrow.Stream(file1)))) == length(collect(Tables.partitions(Arrow.Stream(file2)))) + + Arrow.append(file1, Arrow.Stream(file2)) + arrow_table1 = Arrow.Table(file1) + # now + # arrow_table1: 4 partitions, 50 rows (partitions of file2 stream are appended without being merged) + # 
arrow_table2: 2 partitions, 30 rows + + @test Tables.schema(arrow_table1) == Tables.schema(arrow_table2) + @test length(Tables.columns(arrow_table1)[1]) == 50 + @test length(Tables.columns(arrow_table2)[1]) == 30 + @test length(collect(Tables.partitions(Arrow.Stream(file1)))) == 2 * length(collect(Tables.partitions(Arrow.Stream(file2)))) + end +end \ No newline at end of file