Eclipse SUMO - Simulation of Urban MObility
Loading...
Searching...
No Matches
ParquetFormatter.h
Go to the documentation of this file.
1/****************************************************************************/
2// Eclipse SUMO, Simulation of Urban MObility; see https://eclipse.dev/sumo
3// Copyright (C) 2012-2025 German Aerospace Center (DLR) and others.
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// https://www.eclipse.org/legal/epl-2.0/
7// This Source Code may also be made available under the following Secondary
8// Licenses when the conditions for such availability set forth in the Eclipse
9// Public License 2.0 are satisfied: GNU General Public License, version 2
10// or later which is available at
11// https://www.gnu.org/licenses/old-licenses/gpl-2.0-standalone.html
12// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later
13/****************************************************************************/
18// Output formatter for Parquet output
19/****************************************************************************/
20#pragma once
21#include <config.h>
22
23#include <ostream>
24
25#ifdef _MSC_VER
26#pragma warning(push)
27/* Disable warning about unused parameters */
28#pragma warning(disable: 4100)
29/* Disable warning about hidden function arrow::io::Writable::Write */
30#pragma warning(disable: 4266)
31/* Disable warning about padded memory layout */
32#pragma warning(disable: 4324)
33/* Disable warning about this in initializers */
34#pragma warning(disable: 4355)
35/* Disable warning about changed memory layout due to virtual base class */
36#pragma warning(disable: 4435)
37/* Disable warning about declaration hiding class member */
38#pragma warning(disable: 4458)
39/* Disable warning about implicit conversion of int to bool */
40#pragma warning(disable: 4800)
41#endif
42#include <arrow/api.h>
43#include <arrow/io/interfaces.h>
44#include <arrow/status.h>
45#include <parquet/arrow/writer.h>
46#ifdef _MSC_VER
47#pragma warning(pop)
48#endif
49
51#include "OutputFormatter.h"
52
53
54// ===========================================================================
55// class definitions
56// ===========================================================================
57class ArrowOStreamWrapper : public arrow::io::OutputStream {
58public:
59 ArrowOStreamWrapper(std::ostream& out)
60 : myOStream(out), myAmOpen(true) {}
61
62 arrow::Status Close() override {
63 myAmOpen = false;
64 return arrow::Status::OK();
65 }
66
67 arrow::Status Flush() override {
68 myOStream.flush();
69 return arrow::Status::OK();
70 }
71
72 arrow::Result<int64_t> Tell() const override {
73 return myOStream.tellp();
74 }
75
76 bool closed() const override {
77 return !myAmOpen;
78 }
79
80 arrow::Status Write(const void* data, int64_t nbytes) override {
81 if (!myAmOpen) {
82 return arrow::Status::IOError("Write on closed stream");
83 }
84 myOStream.write(reinterpret_cast<const char*>(data), nbytes);
85 if (!myOStream) {
86 return arrow::Status::IOError("Failed to write to ostream");
87 }
88 return arrow::Status::OK();
89 }
90
91private:
92 std::ostream& myOStream;
94};
95
96
102public:
/** @brief Constructor
 * @param[in] columnNames NOTE(review): presumably the column naming scheme kept in myHeaderFormat ("plain", "auto" or tag-prefixed) - confirm in the implementation file
 * @param[in] compression NOTE(review): presumably the name of the compression kept in myCompression; defaults to an empty string - confirm in the implementation file
 * @param[in] batchSize NOTE(review): presumably the number of rows to write per batch (myBatchSize); defaults to 1000000
 */
104 // for some motivation on the default batch size see https://stackoverflow.com/questions/76782018/what-is-actually-meant-when-referring-to-parquet-row-group-size
105 ParquetFormatter(const std::string& columnNames, const std::string& compression = "", const int batchSize = 1000000);
106
108 virtual ~ParquetFormatter() { }
109
/** @brief Keeps track of an open XML tag by adding a new element to the stack
 * @param[in] into The output stream (NOTE(review): how it is used is defined in the implementation file, not visible here)
 * @param[in] xmlElement Name of the element to open
 */
116 void openTag(std::ostream& into, const std::string& xmlElement);
117
/** @brief Keeps track of an open XML tag by adding a new element to the stack
 * @param[in] into The output stream (NOTE(review): how it is used is defined in the implementation file, not visible here)
 * @param[in] xmlElement Id of the element to open
 */
123 void openTag(std::ostream& into, const SumoXMLTag& xmlElement);
124
/** @brief Closes the most recently opened tag
 * @param[in] into The output stream (NOTE(review): how it is used is defined in the implementation file, not visible here)
 * @param[in] comment Optional comment, defaults to an empty string
 * @return NOTE(review): meaning of the result is defined in the implementation file - presumably whether a tag was closed; confirm
 */
131 bool closeTag(std::ostream& into, const std::string& comment = "");
132
139 template <class T>
140 void writeAttr(std::ostream& /* into */, const SumoXMLAttr attr, const T& val, const bool isNull = false) {
141 checkAttr(attr);
142 if (!myWroteHeader) {
143 mySchema = *mySchema->AddField(mySchema->num_fields(), arrow::field(getAttrString(toString(attr)), arrow::utf8()));
144 myBuilders.push_back(std::make_shared<arrow::StringBuilder>());
145 }
146 myValues.push_back(isNull ? nullptr : std::make_shared<arrow::StringScalar>(toString(val)));
147 }
148
149 template <class T>
150 void writeAttr(std::ostream& /* into */, const std::string& attr, const T& val) {
151 assert(!myCheckColumns);
152 if (!myWroteHeader) {
153 mySchema = *mySchema->AddField(mySchema->num_fields(), arrow::field(getAttrString(attr), arrow::utf8()));
154 myBuilders.push_back(std::make_shared<arrow::StringBuilder>());
155 }
156 myValues.push_back(std::make_shared<arrow::StringScalar>(toString(val)));
157 }
158
159 void writeTime(std::ostream& into, const SumoXMLAttr attr, const SUMOTime val) {
160 if (!gHumanReadableTime) {
161 if (!myWroteHeader) {
162 mySchema = *mySchema->AddField(mySchema->num_fields(), arrow::field(getAttrString(toString(attr)), arrow::float64()));
163 myBuilders.push_back(std::make_shared<arrow::DoubleBuilder>());
164 }
165 myValues.push_back(std::make_shared<arrow::DoubleScalar>(STEPS2TIME(val)));
166 return;
167 }
168 writeAttr(into, attr, time2string(val));
169 }
170
171 bool wroteHeader() const {
172 return myWroteHeader;
173 }
174
175 void setExpectedAttributes(const SumoXMLAttrMask& expected, const int depth = 2) {
176 myExpectedAttrs = expected;
177 myMaxDepth = depth;
178 myCheckColumns = expected.any();
179 }
180
181private:
182 inline const std::string getAttrString(const std::string& attrString) {
183 if (myHeaderFormat == "plain") {
184 return attrString;
185 }
186 if (myHeaderFormat == "auto") {
187 for (const auto& field : mySchema->fields()) {
188 if (field->name() == attrString) {
189 return myCurrentTag + "_" + attrString;
190 }
191 }
192 return attrString;
193 }
194 return myCurrentTag + "_" + attrString;
195 }
196
197 inline void checkAttr(const SumoXMLAttr attr) {
198 if (myCheckColumns && myMaxDepth == (int)myXMLStack.size()) {
199 mySeenAttrs.set(attr);
200 if (!myExpectedAttrs.test(attr)) {
201 throw ProcessError(TLF("Unexpected attribute '%', this file format does not support Parquet output yet.", toString(attr)));
202 }
203 }
204 }
205
/// @brief the format to use for the column names
207 const std::string myHeaderFormat;
208
/// @brief the compression to use
210 parquet::Compression::type myCompression = parquet::Compression::UNCOMPRESSED;
211
/// @brief the number of rows to write per batch
213 const int myBatchSize;
214
/// @brief the currently read tag (only valid when generating the header)
216 std::string myCurrentTag;
217
/// @brief the table schema
219 std::shared_ptr<arrow::Schema> mySchema = arrow::schema({});
220
/// @brief the output stream writer
222 std::unique_ptr<parquet::arrow::FileWriter> myParquetWriter;
223
/// @brief the content array builders for the table
225 std::vector<std::shared_ptr<arrow::ArrayBuilder> > myBuilders;
226
/// @brief The number of attributes in the currently open XML elements
228 std::vector<int> myXMLStack;
229
/// @brief the current attribute / column values
231 std::vector<std::shared_ptr<arrow::Scalar> > myValues;
232
/// @brief the maximum depth of the XML hierarchy
234 int myMaxDepth = 0;
235
/// @brief whether the schema has been constructed completely
237 bool myWroteHeader = false;
238
/// @brief whether the columns should be checked for completeness
240 bool myCheckColumns = false;
241
244
247};
248
249
250// ===========================================================================
251// specialized template implementations
252// ===========================================================================
253template <>
254inline void ParquetFormatter::writeAttr(std::ostream& into, const SumoXMLAttr attr, const double& val, const bool isNull) {
255 checkAttr(attr);
256 if (attr == SUMO_ATTR_X || attr == SUMO_ATTR_Y || into.precision() > 2) {
257 if (!myWroteHeader) {
258 mySchema = *mySchema->AddField(mySchema->num_fields(), arrow::field(getAttrString(toString(attr)), arrow::float64()));
259 myBuilders.push_back(std::make_shared<arrow::DoubleBuilder>());
260 }
261 myValues.push_back(isNull ? nullptr : std::make_shared<arrow::DoubleScalar>(val));
262 } else {
263 if (!myWroteHeader) {
264 mySchema = *mySchema->AddField(mySchema->num_fields(), arrow::field(getAttrString(toString(attr)), arrow::float32()));
265 myBuilders.push_back(std::make_shared<arrow::FloatBuilder>());
266 }
267 myValues.push_back(isNull ? nullptr : std::make_shared<arrow::FloatScalar>((float)val));
268 }
269}
270
271template <>
272inline void ParquetFormatter::writeAttr(std::ostream& /* into */, const SumoXMLAttr attr, const int& val, const bool isNull) {
273 checkAttr(attr);
274 if (!myWroteHeader) {
275 mySchema = *mySchema->AddField(mySchema->num_fields(), arrow::field(getAttrString(toString(attr)), arrow::int32()));
276 myBuilders.push_back(std::make_shared<arrow::Int32Builder>());
277 }
278 myValues.push_back(isNull ? nullptr : std::make_shared<arrow::Int32Scalar>(val));
279}
280
281template <>
282inline void ParquetFormatter::writeAttr(std::ostream& into, const std::string& attr, const double& val) {
283 assert(!myCheckColumns);
284 if (into.precision() > 2) {
285 if (!myWroteHeader) {
286 mySchema = *mySchema->AddField(mySchema->num_fields(), arrow::field(getAttrString(attr), arrow::float64()));
287 myBuilders.push_back(std::make_shared<arrow::DoubleBuilder>());
288 }
289 myValues.push_back(std::make_shared<arrow::DoubleScalar>(val));
290 } else {
291 if (!myWroteHeader) {
292 mySchema = *mySchema->AddField(mySchema->num_fields(), arrow::field(getAttrString(attr), arrow::float32()));
293 myBuilders.push_back(std::make_shared<arrow::FloatBuilder>());
294 }
295 myValues.push_back(std::make_shared<arrow::FloatScalar>((float)val));
296 }
297}
298
299template <>
300inline void ParquetFormatter::writeAttr(std::ostream& /* into */, const std::string& attr, const int& val) {
301 assert(!myCheckColumns);
302 if (!myWroteHeader) {
303 mySchema = *mySchema->AddField(mySchema->num_fields(), arrow::field(getAttrString(attr), arrow::int32()));
304 myBuilders.push_back(std::make_shared<arrow::Int32Builder>());
305 }
306 myValues.push_back(std::make_shared<arrow::Int32Scalar>(val));
307}
long long int SUMOTime
Definition GUI.h:36
#define TLF(string,...)
Definition MsgHandler.h:307
std::string time2string(SUMOTime t, bool humanReadable)
convert SUMOTime to string (independently of global format setting)
Definition SUMOTime.cpp:91
#define STEPS2TIME(x)
Definition SUMOTime.h:55
SumoXMLTag
Numbers representing SUMO-XML - element names.
std::bitset< 96 > SumoXMLAttrMask
SumoXMLAttr
Numbers representing SUMO-XML - attributes.
@ SUMO_ATTR_Y
@ SUMO_ATTR_X
bool gHumanReadableTime
Definition StdDefs.cpp:30
std::string toString(const T &t, std::streamsize accuracy=gPrecision)
Definition ToString.h:46
bool closed() const override
arrow::Status Close() override
arrow::Status Flush() override
arrow::Status Write(const void *data, int64_t nbytes) override
std::ostream & myOStream
ArrowOStreamWrapper(std::ostream &out)
arrow::Result< int64_t > Tell() const override
Abstract base class for output formatters.
Output formatter for Parquet output.
bool myCheckColumns
whether the columns should be checked for completeness
virtual ~ParquetFormatter()
Destructor.
const std::string getAttrString(const std::string &attrString)
void setExpectedAttributes(const SumoXMLAttrMask &expected, const int depth=2)
Set the expected attributes to write. This is used for tracking which attributes are expected in tabl...
SumoXMLAttrMask myExpectedAttrs
the attributes which are expected for a complete row (including null values)
void openTag(std::ostream &into, const std::string &xmlElement)
Keeps track of an open XML tag by adding a new element to the stack.
std::shared_ptr< arrow::Schema > mySchema
the table schema
void writeTime(std::ostream &into, const SumoXMLAttr attr, const SUMOTime val)
parquet::Compression::type myCompression
the compression to use
const std::string myHeaderFormat
the format to use for the column names
bool myWroteHeader
whether the schema has been constructed completely
std::vector< int > myXMLStack
The number of attributes in the currently open XML elements.
bool wroteHeader() const
Returns whether a header has been written. Useful to detect whether a file is being used by multiple ...
SumoXMLAttrMask mySeenAttrs
the attributes already seen (including null values)
void writeAttr(std::ostream &, const std::string &attr, const T &val)
std::vector< std::shared_ptr< arrow::ArrayBuilder > > myBuilders
the content array builders for the table
void checkAttr(const SumoXMLAttr attr)
std::string myCurrentTag
the currently read tag (only valid when generating the header)
std::unique_ptr< parquet::arrow::FileWriter > myParquetWriter
the output stream writer
bool closeTag(std::ostream &into, const std::string &comment="")
Closes the most recently opened tag.
std::vector< std::shared_ptr< arrow::Scalar > > myValues
the current attribute / column values
const int myBatchSize
the number of rows to write per batch
int myMaxDepth
the maximum depth of the XML hierarchy
void writeAttr(std::ostream &, const SumoXMLAttr attr, const T &val, const bool isNull=false)
writes a named attribute