ESDM
Middleware for Earth System Data
esdm-stream.h
1 #ifndef ESDM_STREAM_H
2 #define ESDM_STREAM_H
3 
4 #include <esdm-datatypes.h>
5 #include <stdbool.h>
6 
7 #ifdef __cplusplus
8 extern "C" {
9 #endif
10 
12 
13 #define defineStreamType(streamType, elementType) typedef struct streamType { \
14  esdm_wstream_metadata_t* metadata; /*contains opaque implementation details of the stream API*/ \
15  elementType *buffer, *iter, *iterEnd, *bufferEnd; \
16 } streamType
17 defineStreamType(esdm_wstream_uint8_t, uint8_t);
18 defineStreamType(esdm_wstream_uint16_t, uint16_t);
19 defineStreamType(esdm_wstream_uint32_t, uint32_t);
20 defineStreamType(esdm_wstream_uint64_t, uint64_t);
21 defineStreamType(esdm_wstream_int8_t, int8_t);
22 defineStreamType(esdm_wstream_int16_t, int16_t);
23 defineStreamType(esdm_wstream_int32_t, int32_t);
24 defineStreamType(esdm_wstream_int64_t, int64_t);
25 defineStreamType(esdm_wstream_float_t, float);
26 defineStreamType(esdm_wstream_double_t, double);
27 #undef defineStreamType
28 
49 //XXX: Unfortunately, `_Generic()` can only return an expression, not a type.
50 // That is why we cannot implement a portable `esdm_typeof()` macro based on `_Generic()`.
51 // As such, we need to rely on compiler extensions instead.
52 // Currently, this is only implemented for `gcc`, but we should add some preprocessor magic to work with any compiler that offers some kind of a `typeof()` implementation.
53 #define esdm_wstream_start(stream, dataset, dimCount, offset, size) do { \
54  typeof(*stream)* const esdm_internal_stream_ptr = (stream); /*avoid multiple evaluation*/ \
55  esdm_wstream_metadata_t* esdm_internal_stream_metadata = esdm_wstream_metadata_create(dataset, dimCount, offset, size, smd_c_to_smd_type(*esdm_internal_stream_ptr->buffer)); \
56  int64_t esdm_internal_element_count = esdm_wstream_metadata_max_chunk_size(esdm_internal_stream_metadata); \
57  typeof(*stream.buffer) esdm_internal_buffer = (typeof(*stream.buffer)) malloc(esdm_internal_element_count*sizeof(*esdm_internal_stream_ptr->buffer)); \
58  *esdm_internal_stream_ptr = (typeof(*stream)){ \
59  .metadata = esdm_internal_stream_metadata, \
60  .buffer = esdm_internal_buffer, \
61  .iter = esdm_internal_buffer, \
62  .iterEnd = esdm_internal_buffer + esdm_wstream_metadata_next_chunk_size(esdm_internal_stream_metadata), \
63  .bufferEnd = esdm_internal_buffer + esdm_internal_element_count \
64  }; \
65 } while(0)
66 
67 
77 #define esdm_wstream_pack(stream, value) do { \
78  typeof(stream)* const esdm_internal_stream_ptr = &(stream); /*avoid multiple evaluation*/ \
79  if(esdm_internal_stream_ptr->iter >= esdm_internal_stream_ptr->iterEnd) { \
80  fprintf(stderr, "wstream attempt to push more data into a stream than defined at stream creation %p %p\n", esdm_internal_stream_ptr->iter, esdm_internal_stream_ptr->iterEnd); \
81  abort(); \
82  } \
83  *esdm_internal_stream_ptr->iter++ = (value); \
84  if(esdm_internal_stream_ptr->iter == esdm_internal_stream_ptr->iterEnd) { \
85  esdm_wstream_flush(esdm_internal_stream_ptr->metadata, esdm_internal_stream_ptr->buffer, esdm_internal_stream_ptr->iter); \
86  esdm_internal_stream_ptr->iter = esdm_internal_stream_ptr->buffer; \
87  esdm_internal_stream_ptr->iterEnd = esdm_internal_stream_ptr->iter + esdm_wstream_metadata_next_chunk_size(esdm_internal_stream_ptr->metadata); \
88  } \
89 } while(0)
90 
91 //printf("Next iter: %lld %lld\n", esdm_internal_stream_ptr->iter, esdm_internal_stream_ptr->iterEnd);
92 
101 #define esdm_wstream_commit(stream) do { \
102  typeof(stream)* const esdm_internal_stream_ptr = &(stream); /*avoid multiple evaluation*/ \
103  if(esdm_internal_stream_ptr->iter != esdm_internal_stream_ptr->buffer) { \
104  fprintf(stderr, "wstream: preliminary commit of a stream: too few calls to esdm_wstream_pack()\n"); \
105  abort(); \
106  } \
107  /*since `esdm_wstream_pack()` flushes the stream *after* adding the last value, we only need to perform local cleanup*/ \
108  esdm_wstream_metadata_destroy(esdm_internal_stream_ptr->metadata); \
109  free(esdm_internal_stream_ptr->buffer); \
110  *esdm_internal_stream_ptr = (typeof(stream)){0}; \
111 } while(0)
112 
114 // Internal API ////////////////////////////////////////////////////////////////////////////////////
116 
122 esdm_wstream_metadata_t* esdm_wstream_metadata_create(esdm_dataset_t* dataset, int64_t dimCount, int64_t* offset, int64_t* size, esdm_type_t type);
123 
129 int64_t esdm_wstream_metadata_max_chunk_size(esdm_wstream_metadata_t* metadata);
130 
136 int64_t esdm_wstream_metadata_next_chunk_size(esdm_wstream_metadata_t* metadata);
137 
145 void esdm_wstream_flush(esdm_wstream_metadata_t* metadata, void* buffer, void* bufferEnd);
146 
152 void esdm_wstream_metadata_destroy(esdm_wstream_metadata_t* metadata);
153 
155 // Callback API for data processing within the backends ////////////////////////////////////////////
157 
159  esdm_fragment_t * fragment;
160  void * backend_state; // backend may store anything here for streaming
161 };
162 
163 /*
164  * Pack the whole data of the fragment at once into outbuffer, potentially apply compression
165  *
166  * @param in_out_buff If *in_out_buff is != NULL, it is a pointer to a memory region where the output is stored. The output memory region must be large enough. If it is NULL, the function will return a pointer to the memory region that must be used. If it is != f->buffer, then it must be freed.
167  * @param out_size The actual capacity used of the output buffer.
168  */
169 int estream_mem_pack_fragment(esdm_fragment_t *f, void ** in_out_buff, size_t * out_size);
170 
171 
172 bool estream_mem_unpack_fragment_param(esdm_fragment_t *f, void ** out_buf, size_t * out_size);
173 /*
174  * Reverse function, takes the read buffer and stuffs the data into the output buffer
175  */
176 int estream_mem_unpack_fragment(esdm_fragment_t *f, void * rbuff, size_t size);
177 
178 #ifdef __cplusplus
179 }
180 #endif
181 
182 #endif
Datatype primitives provided by ESDM.
Definition: esdm-datatypes-internal.h:59
Definition: esdm-datatypes-internal.h:78
Definition: esdm-stream.c:8
Definition: esdm-stream.h:158