multithread

  1/*
  2   @mindmaze_header@
  3*/
  4
  5/* multithreaded data write
  6 *
  7 * example of child process, to be used by pshared-parent.c example.
  8 *
  9 * This program writes to a shared text (in shared memory) concurrently with
 10 * other threads in the same process. When notified, a worker thread tries
 11 * to write its identification string ("|-thread-X+|") onto a text field of
 12 * the shared memory. The text after update of several worker threads looks
 13 * something like:
 14 *
 15 *     ...|+thread-Z+||+thread-W+||+thread-X+||+thread-Y+|...
 16 *
 17 * This file demonstrates how to:
 18 *  - map file into memory
 19 *  - use process shared mutex
 20 */
 21#include <stdlib.h>
 22#include <stdio.h>
 23#include <string.h>
 24#include <mmthread.h>
 25
 26#define NUM_THREAD	6
 27#define MAX_ID_LEN	16
 28
 29struct shared_data {
 30	mm_thr_mutex_t mutex;
 31	int len;
 32	char text[1024];
 33	mm_thr_mutex_t notif_mtx;
 34	mm_thr_cond_t notif_cond;
 35	int start;
 36};
 37
 38struct thread_data {
 39	struct shared_data* shdata;
 40	char id_str[MAX_ID_LEN];
 41};
 42
 43
 44/*
 45 * This function do the update of the shared text. It happens the
 46 * string |+@id_str+| to the text field in @psh_data.
 47 */
 48static
 49void write_shared_data(struct shared_data* shdata, const char* id_str)
 50{
 51	int id_str_len = strlen(id_str);
 52
 53	// Get the shared lock. Since we are using a normal mutex, we do not
 54	// have to check the return value
 55	mm_thr_mutex_lock(&shdata->mutex);
 56
 57	// Add "|+" in the text
 58	shdata->text[shdata->len++] = '|';
 59	shdata->text[shdata->len++] = '+';
 60
 61	// Append process identifier on text
 62	memcpy(shdata->text + shdata->len, id_str, id_str_len);
 63	shdata->len += id_str_len;
 64
 65	// Add "+|" in the text
 66	shdata->text[shdata->len++] = '+';
 67	shdata->text[shdata->len++] = '|';
 68
 69	mm_thr_mutex_unlock(&shdata->mutex);
 70}
 71
 72
 73static
 74void wait_start_notification(struct shared_data* shdata)
 75{
 76	mm_thr_mutex_lock(&shdata->notif_mtx);
 77
 78	// A while loop is necessary, because a spurious wakeup is always
 79	// possible
 80	while (!shdata->start)
 81		mm_thr_cond_wait(&shdata->notif_cond, &shdata->notif_mtx);
 82
 83	mm_thr_mutex_unlock(&shdata->notif_mtx);
 84}
 85
 86
 87static
 88void broadcast_start_notification(struct shared_data* shdata)
 89{
 90	// We want a worker thread to be be scheduled in a predictable way,
 91	// so we must own shdata->notif_mtx when calling
 92	// mm_thr_cond_broadcast()
 93	mm_thr_mutex_lock(&shdata->notif_mtx);
 94
 95	shdata->start = 1;
 96	mm_thr_cond_broadcast(&shdata->notif_cond);
 97
 98	mm_thr_mutex_unlock(&shdata->notif_mtx);
 99}
100
101
102static
103void* thread_func(void* data)
104{
105	struct thread_data* thdata = data;
106	struct shared_data* shdata = thdata->shdata;
107	const char* id_str = thdata->id_str;
108
109	// Put a wait here to force a litle bit of more contention. This is
110	// here only for demonstration purpose... Without it, since the
111	// update of text is short and simple, the text would be likely
112	// filed in the order of thread creation
113	wait_start_notification(shdata);
114
115	write_shared_data(shdata, id_str);
116
117	return NULL;
118}
119
120
121int main(void)
122{
123	int i;
124	mm_thread_t thid[NUM_THREAD];
125	struct thread_data thdata[NUM_THREAD];
126	struct shared_data shared = {
127		.mutex = MM_THR_MUTEX_INITIALIZER,
128		.notif_mtx = MM_THR_MUTEX_INITIALIZER,
129		.notif_cond = MM_THR_COND_INITIALIZER,
130		.start = 0,
131	};
132
133	// Create threads and assign each an ID string
134	for (i = 0; i < NUM_THREAD; i++) {
135		thdata[i].shdata = &shared;
136		sprintf(thdata[i].id_str, "thread-%i", i);
137		mm_thr_create(&thid[i], thread_func, &thdata[i]);
138	}
139
140	// Now that all thread are created, we can signal them to start
141	broadcast_start_notification(&shared);
142
143	for (i = 0; i < NUM_THREAD; i++)
144		mm_thr_join(thid[i], NULL);
145
146	printf("result string:%s\n", shared.text);
147	return EXIT_SUCCESS;
148}
149

parse_args

  1/*
  2 * @mindmaze_header@
  3 */
  4
  5#include <mmargparse.h>
  6#include <stdio.h>
  7#include <stdlib.h>
  8#include <string.h>
  9#include <errno.h>
 10#include <mmsysio.h>
 11
 12struct config {
 13	const char* detach_flag;
 14	unsigned int num_instance;
 15	const char* ip;
 16	const char* use_local_storage;
 17};
 18static
 19struct config cfg = {
 20	.num_instance = 10,
 21	.ip = "127.0.0.1",
 22};
 23
 24#define LOREM_IPSUM "Lorem ipsum dolor sit amet, consectetur adipiscing"   \
 25	"elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua." \
 26	"Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi " \
 27	"ut aliquip ex ea commodo consequat..."
 28
 29#define DEFAULT_PATH "/default/path"
 30
 31static
 32struct mm_arg_opt cmdline_optv[] = {
 33	{"detach", MM_OPT_NOVAL, "set", {.sptr = &cfg.detach_flag},
 34	 "detach server process."},
 35	{"n|num-instance", MM_OPT_NEEDUINT, NULL, {.uiptr = &cfg.num_instance},
 36	 "Server can accommodate up to @NUM client simultaneously. Here is "
 37	 "more explanation to test text wrapping. " LOREM_IPSUM},
 38	{"l|use-local-storage", MM_OPT_OPTSTR, DEFAULT_PATH, {NULL},
 39	 "Use local storage located at @PATH which must exist. "
 40	 "If unspecified @PATH is assumed "DEFAULT_PATH "."},
 41	{.name = "i", .flags = MM_OPT_NEEDSTR, .defval = NULL,
 42	 {.sptr = &cfg.ip},
 43	 .desc = "IP address of remote server. @ADDR must have dotted form."},
 44};
 45
 46
 47/**
 48 * parse_option_cb() - validate some option value and parse other
 49 * @opt:        parser configuration of option recognized
 50 * @value:      value about to be set for option
 51 * @data:       callback data
 52 * @state:      flags indicating the state of option parsing.
 53 *
 54 * Return: 0 is parsing must continue, -1 if error has been detect and
 55 * parsing must stop.
 56 */
 57static
 58int parse_option_cb(const struct mm_arg_opt* opt, union mm_arg_val value,
 59                    void* data, int state)
 60{
 61	struct config* conf = data;
 62	(void)state;
 63
 64	switch (mm_arg_opt_get_key(opt)) {
 65	case 'n':
 66		if (value.ui < 1) {
 67			fprintf(stderr,
 68			        "Server must support at least 1 instance\n");
 69			return -1;
 70		}
 71
 72		// We don't set value here, since variable to set is already
 73		// configured in option setup ({.strptr = &cfg.ip})
 74		return 0;
 75
 76	case 'l':
 77		if (mm_check_access(value.str, F_OK) != 0) {
 78			fprintf(stderr,
 79			        "storage file %s does not exist\n",
 80			        value.str);
 81			return -1;
 82		}
 83
 84		conf->use_local_storage = value.str;
 85		return 0;
 86
 87	default:
 88		return 0;
 89	}
 90}
 91
 92
 93int main(int argc, char* argv[])
 94{
 95	int i, arg_index;
 96	struct mm_arg_parser parser = {
 97		.doc = LOREM_IPSUM,
 98		.args_doc = "[options] cmd argument\n[options] hello",
 99		.optv = cmdline_optv,
100		.num_opt = MM_NELEM(cmdline_optv),
101		.cb = parse_option_cb,
102		.cb_data = &cfg,
103		.execname = argv[0],
104	};
105
106
107	arg_index = mm_arg_parse(&parser, argc, argv);
108
109	fprintf(stdout, "options used:\n\tdetach_flag: %s\n\tinstance: %u\n"
110	        "\tserver address: %s\n\tuse local path: %s\n",
111	        cfg.detach_flag, cfg.num_instance,
112	        cfg.ip, cfg.use_local_storage);
113
114	fprintf(stdout, "Execute ");
115	for (i = arg_index; i < argc; i++)
116		fprintf(stdout, "%s ", argv[i]);
117
118	fputc('\n', stdout);
119
120	return EXIT_SUCCESS;
121}

pshared

pshared-common.h

 1/*
 2 * @mindmaze_header@
 3 */
 4#ifndef PSHARED_COMMON_H
 5#define PSHARED_COMMON_H
 6
 7#include <mmthread.h>
 8
 9#define SHM_CHILD_FD 3
10
11struct pshared_data {
12	mm_thr_mutex_t mutex;
13	int len;
14	char text[1024];
15	mm_thr_mutex_t notif_mtx;
16	mm_thr_cond_t notif_cond;
17	int start;
18};
19
20#endif /* ifndef PSHARED_COMMON_H */

pshared-child.h

  1/*
  2 * @mindmaze_header@
  3 */
  4
  5/* process shared data: child program
  6 *
  7 * example of child process, to be used by pshared-parent.c example.
  8 *
  9 * Similar to the multithreaded data write example, this program writes to a
 10 * shared text (in shared memory) concurrently to other children of the
 11 * parent process. Each child maps into memory a file descriptor
 12 * (SHM_CHILD_FD) inherited from parent and initialized there.  The child
 13 * tries to write its identification string ("|-child-X+|") onto a text
 14 * field of the shared memory. The text after update of several child looks
 15 * something like:
 16 *
 17 *     ...|+child-Z+||+child-W+||+child-X+||+child-Y+|...
 18 *
 19 * Because of the concurrent access, the children use a process shared mutex
 20 * mapped in the shared memory. They can recover from a child dying while
 21 * owning the mutex. Put simulate this, the SEGFAULT_IN_CHILD environment
 22 * variable can be set. If a child process see its identification string
 23 * ("child-X" for Xth child created), it will provoke a segfault while
 24 * updating the text.
 25 *
 26 * This file demonstrates how to:
 27 *  - map file into memory
 28 *  - use process shared mutex
 29 */
 30
 31#define MM_LOG_MODULE_NAME "pshared-child"
 32
 33#include <stdlib.h>
 34#include <string.h>
 35#include <stdio.h>
 36#include <stdbool.h>
 37#include <mmthread.h>
 38#include <mmsysio.h>
 39#include <mmerrno.h>
 40#include <mmlib.h>
 41
 42#include "pshared-common.h"
 43
 44#define BAD_ADDR (void*)0xDEADBEEF
 45
 46
 47static
 48void handle_notif_lock_retval(int lockret, struct pshared_data* psh_data)
 49{
 50	// By far the most usual case. We simply got the lock, nothing fancy
 51	// has happened.
 52	if (lockret == 0)
 53		return;
 54
 55	// Contrary to psh_data->mutex, there is no shared state to recover
 56	// with psh_data->notif_mtx. Simply mark it consistent
 57	if (lockret == EOWNERDEAD)
 58		mm_thr_mutex_consistent(&psh_data->notif_mtx);
 59
 60	if (lockret == ENOTRECOVERABLE)
 61		exit(EXIT_FAILURE);
 62}
 63
 64
 65/*
 66 * Wait that parent notifies to start, ie, mark psh_data->start = 1 and
 67 * broadcast psh_data->notif_cond.
 68 */
 69static
 70void wait_start_notification(struct pshared_data* psh_data)
 71{
 72	int lockret;
 73
 74	lockret = mm_thr_mutex_lock(&psh_data->notif_mtx);
 75	handle_notif_lock_retval(lockret, psh_data);
 76
 77	while (!psh_data->start) {
 78		lockret = mm_thr_cond_wait(&psh_data->notif_cond,
 79		                           &psh_data->notif_mtx);
 80		handle_notif_lock_retval(lockret, psh_data);
 81	}
 82
 83	mm_thr_mutex_unlock(&psh_data->notif_mtx);
 84}
 85
 86
 87/*
 88 * This function do the update of the shared text. It must be called while
 89 * holding the lock, ie, called from write_shared_data(). It happens the
 90 * string |+@id_str+| to the text field in @psh_data.
 91 *
 92 * If requested, this function will provoke a segfault in the middle of
 93 * string appending.
 94 */
 95static
 96void write_shared_text_locked(struct pshared_data* psh_data, const char* id_str,
 97                              bool provoke_segfault)
 98{
 99	int id_str_len = strlen(id_str);
100
101	// Add "|+" in the text
102	psh_data->text[psh_data->len++] = '|';
103	psh_data->text[psh_data->len++] = '+';
104
105	// Append process identifier on text (psh_data->len not updated yet)
106	memcpy(psh_data->text + psh_data->len, id_str, id_str_len);
107
108	// Segfaulting here is a good place for demonstration purpose:
109	// psh_data->len will be not consistent with the null-terminated
110	// string in psh_data->text
111	if (provoke_segfault)
112		strcpy(psh_data->text + psh_data->len, BAD_ADDR);
113
114	// Now update psh_data->len
115	psh_data->len += id_str_len;
116
117	// Add "+|" in the text
118	psh_data->text[psh_data->len++] = '+';
119	psh_data->text[psh_data->len++] = '|';
120}
121
122
123/*
124 * Function to recover shared state from the situation where the previous
125 * owner died while holding the lock.
126 */
127static
128void recover_shared_text_from_owner_dead(struct pshared_data* psh_data)
129{
130	int len = psh_data->len;
131	char* text = psh_data->text;
132
133	// Find index of text immediately after the last occurrence of "+|"
134	while (len > 0) {
135		if ((len > 2) && (text[len-2] == '+') && (text[len-1] == '|'))
136			break;
137
138		len--;
139	}
140
141	// Crop string and set to the proper found length
142	text[len] = '\0';
143	psh_data->len = len;
144}
145
146
147
148static
149void write_shared_data(struct pshared_data* psh_data, const char* id_str,
150                       bool provoke_segfault)
151{
152	int r;
153
154	// Get the shared lock. Since we are using a process shared mutex,
155	// we must check return value of the lock operation: If the previous
156	// owner has died while owning it, it will be only occasion to know
157	// about it and recover from this if we want to continue using it.
158	r = mm_thr_mutex_lock(&psh_data->mutex);
159	if (r == EOWNERDEAD) {
160		// We have the lock, but it is time to recover state since
161		// previous owner died while holding the lock
162		recover_shared_text_from_owner_dead(psh_data);
163
164		// We have recovered the shared state, so we can mark lock as
165		// consistent. After this, we will be back to normal operation
166		mm_thr_mutex_consistent(&psh_data->mutex);
167	} else if (r == ENOTRECOVERABLE) {
168		// There has been an lock owner that has died and the next
169		// owner failed (or refused) to mark lock as consistent,
170		// thus rendering the lock unusable. This provokes all
171		// waiters for the lock to be waken up and ENOTRECOVERABLE
172		// is returned. Any new attempt to lock the mutex will return
173		// ENOTRECOVERABLE (until it is deinit and init again).
174
175		// So now, we don't have the lock and we can only stop
176		return;
177	}
178
179	write_shared_text_locked(psh_data, id_str, provoke_segfault);
180
181	mm_thr_mutex_unlock(&psh_data->mutex);
182}
183
184
185int main(int argc, char* argv[])
186{
187	int mflags;
188	bool must_segfault;
189	struct pshared_data* psh_data = NULL;
190	const char* proc_string;
191
192	// identifier of process is passed in the first argument
193	if (argc < 2) {
194		fprintf(stderr, "%s is missing argument\n", argv[0]);
195		return EXIT_FAILURE;
196	}
197
198	proc_string = argv[1];
199
200	// Map shared memory object onto memory.  We know that child is
201	// created with shared memrory file descriptor inherited at
202	// SHM_CHILD_FD
203	mflags = MM_MAP_SHARED|MM_MAP_READ|MM_MAP_WRITE;
204	psh_data = mm_mapfile(SHM_CHILD_FD, 0, sizeof(*psh_data), mflags);
205	if (!psh_data) {
206		mm_print_lasterror("mm_mapfile(%i, ...) failed", SHM_CHILD_FD);
207		return EXIT_FAILURE;
208	}
209
210	// Close SHM_CHILD_FD because now that it is mapped, we don't need
211	// it any longer
212	mm_close(SHM_CHILD_FD);
213
214	// Get from environment if this particular instance must simulate a
215	// segfault while holding the lock.
216	must_segfault = false;
217	if (!strcmp(mm_getenv("SEGFAULT_IN_CHILD", ""), proc_string))
218		must_segfault = true;
219
220	// Wait until parent notify to start
221	wait_start_notification(psh_data);
222
223	// Try to update shared text.
224	write_shared_data(psh_data, proc_string, must_segfault);
225
226	mm_unmap(psh_data);
227	return EXIT_SUCCESS;
228}

pshared-parent.h

  1/*
  2 * @mindmaze_header@
  3 */
  4
  5/* process shared data: parent program
  6 *
  7 * example of parent process, child is implemented in pshared-child.c.
  8 *
  9 * This program writes to a shared text (in shared memory) concurrently to
 10 * other children of the parent process. Each child maps into memory a file
 11 * descriptor (SHM_CHILD_FD) inherited from parent and initialized there.
 12 * The child tries to write its identification string ("|-child-X+|") onto a
 13 * text field of the shared memory. The text after update of several child
 14 * looks something like:
 15 *
 16 *     ...|+child-Z+||+child-W+||+child-X+||+child-Y+|...
 17 *
 18 * Because of the concurrent access, the children use a process shared mutex
 19 * mapped in the shared memory. They can recover from a child dying while
 20 * owning the mutex. Put simulate this, the SEGFAULT_IN_CHILD environment
 21 * variable can be set. If a child process see its identification string
 22 * ("child-X" for Xth child created), it will provoke a segfault while
 23 * updating the text.
 24 *
 25 * Note on how to execute the program:
 26 * If left untouched, the program assumes that child executable is available
 27 * in the _current_ directory. Also it assumes that mmlib shared library is
 28 * accessible at runtime.
 29 *
 30 * This file demonstrates how to:
 31 *  - create an anonymous shared memory object
 32 *  - map file into memory
 33 *  - initialize process shared mutex
 34 *  - create child process with passing file descriptor to them
 35 */
 36#define MM_LOG_MODULE_NAME "pshared-parent"
 37
 38#include <stdlib.h>
 39#include <stdio.h>
 40#include <mmthread.h>
 41#include <mmsysio.h>
 42#include <mmpredefs.h>
 43#include <mmerrno.h>
 44#include <mmlib.h>
 45
 46#include "pshared-common.h"
 47
 48
 49#ifdef _WIN32
 50#  define       BINEXT ".exe"
 51#else
 52#  define       BINEXT
 53#endif
 54
 55#define NUM_CHILD 6
 56#define PSHARED_CHILD_BIN "./pshared-child" BINEXT
 57
 58/*
 59 * Create, map into memory and initialize the data that will shared with the
 60 * children. The process shared mutex is initialized here.
 61 */
 62static
 63struct pshared_data* init_shared_mem_data(int* shm_fd)
 64{
 65	int fd, mflags;
 66	struct pshared_data* psh_data = NULL;
 67
 68	// Create an new anonymous shared memory object. We could have use a
 69	// normal file (with mm_open()) without changing of the rest of the
 70	// following if we wanted to keep the result of memory access on the
 71	// shared memory.
 72	fd = mm_anon_shm();
 73	if (fd < 0)
 74		return NULL;
 75
 76	// Size it to accommodate the data that will be shared between
 77	// parent and children.
 78	if (mm_ftruncate(fd, sizeof(*psh_data)))
 79		goto failure;
 80
 81	// Map shared memory object onto memory
 82	mflags = MM_MAP_SHARED|MM_MAP_READ|MM_MAP_WRITE;
 83	psh_data = mm_mapfile(fd, 0, sizeof(*psh_data), mflags);
 84	if (!psh_data)
 85		goto failure;
 86
 87	// Reset the while content of structure to 0/NULL fields
 88	*psh_data = (struct pshared_data) {.start = 0};
 89
 90	// Initialize synchronization primitives of shared data
 91	if (mm_thr_mutex_init(&psh_data->mutex, MM_THR_PSHARED)
 92	    || mm_thr_mutex_init(&psh_data->notif_mtx, MM_THR_PSHARED)
 93	    || mm_thr_cond_init(&psh_data->notif_cond, MM_THR_PSHARED))
 94		goto failure;
 95
 96	*shm_fd = fd;
 97	return psh_data;
 98
 99failure:
100	mm_close(fd);
101	return NULL;
102}
103
104
105/*
106 * Starts all children process ensuring that they inherit of the shared
107 * memory file descriptor. Pass the string identify a particular process
108 * instance as the first argument.
109 */
110static
111int spawn_children(int shm_fd, int num_child, mm_pid_t* children)
112{
113	int i;
114	char process_identifier[32];
115	char* argv[] = {PSHARED_CHILD_BIN, process_identifier, NULL};
116	struct mm_remap_fd fd_map = {
117		.child_fd = SHM_CHILD_FD,
118		.parent_fd = shm_fd,
119	};
120
121	for (i = 0; i < num_child; i++) {
122		// Set the process identifier (it is just a string to
123		// identify which child process is running). This string is
124		// already set as second element in argv, ie, the first
125		// argument
126		sprintf(process_identifier, "child-%i", i);
127
128		// Spawn the process
129		if (mm_spawn(&children[i], argv[0], 1, &fd_map, 0, argv, NULL))
130			return -1;
131	}
132
133	return 0;
134}
135
136
137static
138int wait_children_termination(int num_child, const mm_pid_t* children)
139{
140	int i;
141
142	for (i = 0; i < num_child; i++) {
143		if (mm_wait_process(children[i], NULL))
144			return -1;
145	}
146
147	return 0;
148}
149
150
151static
152void broadcast_start_notification(struct pshared_data* psh_data)
153{
154	int lockret;
155
156	// We want a worker thread to be be scheduled in a predictable way,
157	// so we must own shdata->notif_mtx when calling
158	// mm_thr_cond_broadcast()
159	lockret = mm_thr_mutex_lock(&psh_data->notif_mtx);
160	if (lockret == ENOTRECOVERABLE)
161		return;
162
163	if (lockret == EOWNERDEAD)
164		mm_thr_mutex_consistent(&psh_data->notif_mtx);
165
166	psh_data->start = 1;
167	mm_thr_cond_broadcast(&psh_data->notif_cond);
168
169	mm_thr_mutex_unlock(&psh_data->notif_mtx);
170}
171
172
173int main(void)
174{
175	mm_pid_t children[NUM_CHILD];
176	int shm_fd = -1;
177	struct pshared_data* psh_data = NULL;
178	int exitcode = EXIT_FAILURE;
179
180	fprintf(stderr, "SEGFAULT_IN_CHILD=%s\n",
181	        mm_getenv("SEGFAULT_IN_CHILD", ""));
182
183	// Create a shared memory object with the right size and map into
184	// memory
185	psh_data = init_shared_mem_data(&shm_fd);
186	if (!psh_data)
187		goto exit;
188
189	// Create the children inheriting the shared memory object
190	if (spawn_children(shm_fd, MM_NELEM(children), children))
191		goto exit;
192
193	// Close shm_fd because now that it is mapped, and transmitted to
194	// children, we don't need its file descriptor.
195	mm_close(shm_fd);
196	shm_fd = -1;
197
198	broadcast_start_notification(psh_data);
199
200	wait_children_termination(MM_NELEM(children), children);
201	exitcode = EXIT_SUCCESS;
202
203exit:
204	if (exitcode == EXIT_FAILURE)
205		mm_print_lasterror("pshared-parent failed");
206	else
207		printf("result string:%s\n", psh_data->text);
208
209	mm_close(shm_fd);
210	mm_unmap(psh_data);
211	return exitcode;
212}