Eclipse SUMO - Simulation of Urban MObility
Loading...
Searching...
No Matches
ParquetFormatter.cpp
Go to the documentation of this file.
1/****************************************************************************/
2// Eclipse SUMO, Simulation of Urban MObility; see https://eclipse.dev/sumo
3// Copyright (C) 2012-2025 German Aerospace Center (DLR) and others.
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// https://www.eclipse.org/legal/epl-2.0/
7// This Source Code may also be made available under the following Secondary
8// Licenses when the conditions for such availability set forth in the Eclipse
9// Public License 2.0 are satisfied: GNU General Public License, version 2
10// or later which is available at
11// https://www.gnu.org/licenses/old-licenses/gpl-2.0-standalone.html
12// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later
13/****************************************************************************/
18// An output formatter for Parquet files
19/****************************************************************************/
20#include <config.h>
21
22#include <arrow/io/api.h>
23
26#include "ParquetFormatter.h"
27
28
29// ===========================================================================
30// member method definitions
31// ===========================================================================
32ParquetFormatter::ParquetFormatter(const std::string& columnNames, const std::string& compression, const int batchSize)
33 : OutputFormatter(OutputFormatterType::PARQUET), myHeaderFormat(columnNames), myBatchSize(batchSize) {
34 if (compression == "snappy") {
35 myCompression = parquet::Compression::SNAPPY;
36 } else if (compression == "gzip") {
37 myCompression = parquet::Compression::GZIP;
38 } else if (compression == "brotli") {
39 myCompression = parquet::Compression::BROTLI;
40 } else if (compression == "zstd") {
41 myCompression = parquet::Compression::ZSTD;
42 } else if (compression == "lz4") {
43 myCompression = parquet::Compression::LZ4;
44 } else if (compression == "bz2") {
45 myCompression = parquet::Compression::BZ2;
46 } else if (compression != "" && compression != "uncompressed") {
47 WRITE_ERRORF("Unknown compression: %", compression);
48 }
49 if (!arrow::util::Codec::IsAvailable(myCompression)) {
50 WRITE_WARNINGF("Compression '%' not available, falling back to uncompressed.", compression);
51 myCompression = parquet::Compression::UNCOMPRESSED;
52 }
53}
54
55void
56ParquetFormatter::openTag(std::ostream& /* into */, const std::string& xmlElement) {
57 myXMLStack.push_back(myValues.size());
58 if (!myWroteHeader) {
59 myCurrentTag = xmlElement;
60 }
61 if (myMaxDepth == (int)myXMLStack.size() && myWroteHeader && myCurrentTag != xmlElement) {
62 WRITE_WARNINGF("Encountered mismatch in XML tags (expected % but got %). Column names may be incorrect.", myCurrentTag, xmlElement);
63 }
64}
65
66
67void
68ParquetFormatter::openTag(std::ostream& /* into */, const SumoXMLTag& xmlElement) {
69 myXMLStack.push_back(myValues.size());
70 if (!myWroteHeader) {
71 myCurrentTag = toString(xmlElement);
72 }
73 if (myMaxDepth == (int)myXMLStack.size() && myWroteHeader && myCurrentTag != toString(xmlElement)) {
74 WRITE_WARNINGF("Encountered mismatch in XML tags (expected % but got %). Column names may be incorrect.", myCurrentTag, toString(xmlElement));
75 }
76}
77
78
// Closes the most recently opened tag and maintains the Parquet table state.
// - The first call fixes myMaxDepth to the current stack depth; closing an
//   element at that depth is treated as finishing one table row.
// - On a row-depth close while the header is not yet written, the schema
//   accumulated in mySchema is taken as final and the Parquet writer is opened
//   on `into` with the configured compression.
// - Each row-depth close appends the buffered myValues to the column builders,
//   matched by position (a nullptr value becomes a null entry).
// - A record batch is written once myBatchSize rows are buffered, or when this
//   is called while no element is open (myXMLStack empty). Note that closing the
//   outermost element does not itself trigger that final flush; the stack still
//   holds it at the check below.
// - Finally the values contributed by the closed element are dropped from
//   myValues (and, before the header is written, the matching schema fields
//   and builders as well).
// @param into  stream the Parquet data is written to (wrapped when the writer is opened)
// @return always false
// NOTE(review): this listing is missing two original source lines (the embedded
// numbering skips 95 and 99). Line 95 presumably opens the block closed on
// line 102, and line 99 holds the remaining argument(s) of the WRITE_ERRORF
// call started on line 98. Restore both from the upstream source.
79bool
80ParquetFormatter::closeTag(std::ostream& into, const std::string& /* comment */) {
81 if (myMaxDepth == 0) {
// first close seen: its depth becomes the row depth
82 myMaxDepth = (int)myXMLStack.size();
83 }
84 if (myMaxDepth == (int)myXMLStack.size() && !myWroteHeader) {
85 if (!myCheckColumns) {
86 WRITE_WARNING("Column based formats are still experimental. Autodetection only works for homogeneous output.");
87 }
88 auto arrow_stream = std::make_shared<ArrowOStreamWrapper>(into);
89 std::shared_ptr<parquet::WriterProperties> props = parquet::WriterProperties::Builder().compression(myCompression)->build();
// NOTE(review): Open() presumably returns an arrow::Result that is dereferenced
// without checking its status; consider PARQUET_ASSIGN_OR_THROW instead.
90 myParquetWriter = *parquet::arrow::FileWriter::Open(*mySchema, arrow::default_memory_pool(), arrow_stream, props);
91 myWroteHeader = true;
92 }
93 bool writeBatch = false;
94 if ((int)myXMLStack.size() == myMaxDepth) {
// NOTE(review): original line 95 is missing from this listing here.
// Report every expected attribute that was not seen for this row.
96 for (int i = 0; i < (int)myExpectedAttrs.size(); ++i) {
97 if (myExpectedAttrs.test(i) && !mySeenAttrs.test(i)) {
// NOTE(review): the argument line (99) of this call is missing from the listing.
98 WRITE_ERRORF("Incomplete attribute set, '%' is missing. This file format does not support Parquet output yet.",
100 }
101 }
102 }
// Append one row: value i goes to builder i; assumes myValues holds at least
// as many entries as there are builders.
103 int index = 0;
104 for (auto& builder : myBuilders) {
105 const auto val = myValues[index++];
106 PARQUET_THROW_NOT_OK(val == nullptr ? builder->AppendNull() : builder->AppendScalar(*val));
107 }
// NOTE(review): back() is undefined if myBuilders is empty.
108 writeBatch = myBuilders.back()->length() == myBatchSize;
109 mySeenAttrs.reset();
110 }
// Flush the buffered rows as one record batch (batch full, or final call with no open element).
111 if (writeBatch || myXMLStack.empty()) {
112 std::vector<std::shared_ptr<arrow::Array> > data;
113 for (auto& builder : myBuilders) {
114 std::shared_ptr<arrow::Array> column;
115 PARQUET_THROW_NOT_OK(builder->Finish(&column));
116 data.push_back(column);
// presumably unneeded: Arrow's ArrayBuilder::Finish resets the builder itself — confirm
117 // builder.reset();
118 }
// NOTE(review): data.back() is undefined if there are no columns.
119 auto batch = arrow::RecordBatch::Make(mySchema, data.back()->length(), data);
120 PARQUET_THROW_NOT_OK(myParquetWriter->WriteRecordBatch(*batch));
121 }
// Pop the closed element: drop the values it contributed (recorded by openTag).
122 if (!myXMLStack.empty()) {
123 while ((int)myValues.size() > myXMLStack.back()) {
124 if (!myWroteHeader) {
// schema not final yet: also remove the column that belonged to this value
125 mySchema = *mySchema->RemoveField(mySchema->num_fields() - 1);
126 myBuilders.pop_back();
127 }
128 myValues.pop_back();
129 }
130 myXMLStack.pop_back();
131 }
132 return false;
133}
134
135
136/****************************************************************************/
#define WRITE_WARNINGF(...)
Definition MsgHandler.h:288
#define WRITE_ERRORF(...)
Definition MsgHandler.h:297
#define WRITE_WARNING(msg)
Definition MsgHandler.h:287
OutputFormatterType
SumoXMLTag
Numbers representing SUMO-XML - element names.
SumoXMLAttr
Numbers representing SUMO-XML - attributes.
std::string toString(const T &t, std::streamsize accuracy=gPrecision)
Definition ToString.h:46
Abstract base class for output formatters.
bool myCheckColumns
whether the columns should be checked for completeness
ParquetFormatter(const std::string &columnNames, const std::string &compression="", const int batchSize=1000000)
Constructor.
SumoXMLAttrMask myExpectedAttrs
the attributes which are expected for a complete row (including null values)
void openTag(std::ostream &into, const std::string &xmlElement)
Keeps track of an open XML tag by adding a new element to the stack.
std::shared_ptr< arrow::Schema > mySchema
the table schema
parquet::Compression::type myCompression
the compression to use
bool myWroteHeader
whether the schema has been constructed completely
std::vector< int > myXMLStack
For each currently open XML element, the number of column values (myValues.size()) that existed when it was opened.
SumoXMLAttrMask mySeenAttrs
the attributes already seen (including null values)
std::vector< std::shared_ptr< arrow::ArrayBuilder > > myBuilders
the content array builders for the table
std::string myCurrentTag
the currently read tag (only valid when generating the header)
std::unique_ptr< parquet::arrow::FileWriter > myParquetWriter
the output stream writer
bool closeTag(std::ostream &into, const std::string &comment="")
Closes the most recently opened tag.
std::vector< std::shared_ptr< arrow::Scalar > > myValues
the current attribute / column values
const int myBatchSize
the number of rows to write per batch
int myMaxDepth
the maximum depth of the XML hierarchy