/* * Active Router Transport Protocol (ARTP) implementation * Copyright (c) 2004, Tomas Rebok * All rights reserved. * * This program is free software; you can redistribute it and/or * modify it under the terms of the "BSD License" which is * distributed with the software in the file LICENSE. * * This program is distributed in the hope that it will be useful, but * WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the BSD * License for more details. */ /** @file * Active router transport protocol's (@c ARTP) main library. * @author Tomas Rebok * @date 2004 */ #include #include #include #include #include #include #include #include #include #include #include "artp.h" #include "structs.h" #include "options.h" #include "setting.h" #include "config.h" #include "rbuffers.h" #include "sbuffers.h" #include "abuffers.h" #include "rwlocks.h" #include "net.h" /** the array of pointers to sessions information */ static struct session_item **sessions_info; /** the sessions array size */ static int sessions_count = 0; /** socket that the protocol will use */ static int sckt; /** global session identification numbers' counter */ static SID_TYPE global_sid; /** mutual exclusion for getting available session identification number */ static pthread_mutex_t global_sid_mutex; /* some couners for readers/writers locking */ /** writer threads count */ static int wcount; /** reader threads count */ static int rcount; /** readers/writers necessary mutexes */ static pthread_mutex_t x; static pthread_mutex_t y; static pthread_mutex_t z; static pthread_mutex_t wsem; static pthread_mutex_t rsem; /** the beginning of the structure for undeliverable non-established sessions */ static struct dead_zero_sessions *dead_zero_start; /** the end of the structure for undeliverable non-established sessions */ static struct dead_zero_sessions *dead_zero_end; /** mutex used for mutual exclusion for reading the structure for undeliverable * non-established sessions */ static pthread_mutex_t dead_zero_start_mutex; /** Find out session's position in array of pointers to sessions. * This function searches the position of given session in array of pointers * to sessions. The searched session is given by its session identification * number * and its receiver. The searching is first made by searching using the session * identification number (made by interval halfing). When the proper session * identification number is found then the linear seaching takes place to find * the proper session by session receiver. * If wanted, the pointer to found session could be returned (and the counter of * pointers to that session will be increased). * * @param sid * searched session's identification number * * @param receiver * the pointer to the place where session's receiver is stored * * @param session * the relevant pointer which would be moved to the place where the found * session's information is stored (if wanted) * * @param get_reference * indicates whether we have to return the pointer to found session or * not (0 = do not return, 1 = return). * * @return above zero * session was found. Returned number is the position of searched session * in array of pointers to sessions. * * @return zero * session wasn't found. Returned number (or pointer, too) is the position * of session for non-established sessions. * * @return below zero * related error code if something failed (for further information see * documentation of file @c errors.h). */ static int get_session(SID_TYPE sid, struct sockaddr *receiver, struct session_item **session, int get_reference) { int l; int r; int m; int position; /* is there any session created? */ if (sessions_count == 0) return 0; /* is there only one session? */ if (sessions_count == 1) { if (get_reference) *session = sessions_info[0]; return 0; } l = 1; r = sessions_count - 1; m = (l + r) / 2; /* lock access to sessions array for reading */ READERS_LOCK(x, y, z, wsem, rsem, wcount, rcount); /* search using session identification number */ while ((sessions_info[m]->session_sid != sid) && (l < r)) { if (sessions_info[l]->session_sid == sid) { m = l; continue; } if (sessions_info[r]->session_sid == sid) { m = r; continue; } if (sessions_info[m]->session_sid < sid) l = m; else r = m; m = (l + r) / 2; } /* the proper session wasn't found */ if (sessions_info[m]->session_sid != sid) { if (get_reference) *session = sessions_info[0]; /* end of lock for readers */ READERS_UNLOCK(x, y, z, wsem, rsem, wcount, rcount); return 0; } else if (rcvrcmp((struct sockaddr *) &(sessions_info[m]->session_receiver), receiver) == 0) { /* the searched session is on the found position */ if (get_reference) { *session = sessions_info[m]; /* increase pointers counter */ pthread_mutex_lock(&sessions_info[m]->ref_counter_mutex); sessions_info[m]->ref_counter++; pthread_mutex_unlock(&sessions_info[m]->ref_counter_mutex); } /* end of readers lock */ READERS_UNLOCK(x, y, z, wsem, rsem, wcount, rcount); return m; } /* search in the left half from found position */ position = m - 1; while ((position > 0) && (sessions_info[position]->session_sid == sid)) { if (rcvrcmp((struct sockaddr *) &(sessions_info[position]-> session_receiver), receiver) == 0) { if (get_reference) { *session = sessions_info[position]; /* increase pointers counter */ pthread_mutex_lock(&sessions_info[position]->ref_counter_mutex); sessions_info[position]->ref_counter++; pthread_mutex_unlock( &sessions_info[position]->ref_counter_mutex); } /* end of readers lock */ READERS_UNLOCK(x, y, z, wsem, rsem, wcount, rcount); return position; } --position; } /* search in the right half from found position */ position = m + 1; while ((position < sessions_count) && (sessions_info[position]->session_sid == sid)) { if (rcvrcmp((struct sockaddr *) &(sessions_info[position]-> session_receiver), receiver) == 0) { if (get_reference) { *session = sessions_info[position]; /* increase pointers counter */ pthread_mutex_lock(&sessions_info[position]->ref_counter_mutex); sessions_info[position]->ref_counter++; pthread_mutex_unlock( &sessions_info[position]->ref_counter_mutex); } /* end of readers lock */ READERS_UNLOCK(x, y, z, wsem, rsem, wcount, rcount); return position; } ++position; } /* the searched session wasn't found */ if (get_reference) *session = sessions_info[0]; /* end of lock for readers */ READERS_UNLOCK(x, y, z, wsem, rsem, wcount, rcount); return 0; } /** Sort sessions. * This function sorts last session to its proper position (depending on its * identification number). * * @return zero * success. * * @return nonzero * related error code if something failed (for further information see * documentation of file @c errors.h). */ static int sort_sessions(void) { struct session_item *temp; int i; temp = sessions_info[sessions_count - 1]; i = sessions_count - 1; while ((i > 1) && (sessions_info[i-1]->session_sid > temp->session_sid)) { sessions_info[i] = sessions_info[i-1]; i--; } sessions_info[i] = temp; return 0; } /** Destroy reference to given session. * This function removes pointer to given session (it decreases the counter of * references in given session). If this counter is equal to 0, the session * is deleted. * * @param session * the pointer to the place where session information is stored * * @return zero * success. * * @return nonzero * related error code if something failed (for further information see * documentation of file @c errors.h). */ static int destroy_reference(struct session_item *session) { int i; /* check whether we don't want to remove non-established session */ if (session->session_type == NON_EST) return E_NONEST_SESSION; /* decrease the reference counter */ pthread_mutex_lock(&session->ref_counter_mutex); session->ref_counter--; pthread_mutex_unlock(&session->ref_counter_mutex); /* check if the reference counter is equal to 0. If so, remove the * session. */ if (session->ref_counter == 0) { /* delete all its structures */ sbuffers_destroy(session->buffers_id); rbuffers_destroy(session->buffers_id); abuffers_destroy(session->buffers_id); /* remove structure for options */ for (i = 0; i < OPTIONS_COUNT; i++) { free(session->options[i]); session->options[i] = NULL; } /* remove structure for partner options */ for (i = 0; i < OPTIONS_COUNT; i++) { free(session->partner_options[i]); session->partner_options[i] = NULL; } /* destroy all mutexes */ pthread_mutex_destroy(&session->session_mutex); pthread_mutex_destroy(&session->ref_counter_mutex); /* deallocate session */ free(session); session = NULL; } return 0; } int artp_prepare_connection(SID_TYPE sid, struct sockaddr *receiver) { int i; /* check whether wanted session already exists */ if (get_session(sid, receiver, NULL, 0) != 0) return E_SESSION_EXISTS; /* lock the session pointers array for writing */ WRITERS_LOCK(x, y, z, wsem, rsem, wcount, rcount); /* resize the sessions array to proper size */ if ((sessions_info = (struct session_item **) realloc(sessions_info, (sessions_count + 1) * sizeof(struct session_item *))) == NULL) { /* end of writer lock */ WRITERS_UNLOCK(x, y, z, wsem, rsem, wcount, rcount); return E_MEMORY_FAIL; } else ++sessions_count; /* allocate session structure for this new session */ if ((sessions_info[sessions_count - 1] = (struct session_item *) malloc(sizeof(struct session_item))) == NULL) { /* end of writer lock */ WRITERS_UNLOCK(x, y, z, wsem, rsem, wcount, rcount); /* decrease the array size and reallocate it */ --sessions_count; sessions_info = (struct session_item **) realloc(sessions_info, sessions_count * sizeof(struct session_item *)); return E_MEMORY_FAIL; } /* copy some necessary information */ sessions_info[sessions_count - 1]->session_sid = sid; rcvrcpy((struct sockaddr *) &(sessions_info[sessions_count - 1]-> session_receiver), receiver); /* create all session buffers and mutexes */ if (((sessions_info[sessions_count - 1]->buffers_id = sbuffers_create(receiver)) < 0) || (rbuffers_create(sessions_info[sessions_count - 1]->buffers_id) < 0) || (abuffers_create(sessions_info[sessions_count - 1]->buffers_id, sid, receiver) < 0) || (pthread_mutex_init(&sessions_info[sessions_count - 1]-> ref_counter_mutex, NULL) != 0) || (pthread_mutex_init(&sessions_info[sessions_count - 1]-> session_mutex, NULL) != 0)) { /* creating of buffers or mutexes failed. Exiting. */ sbuffers_destroy(sessions_info[sessions_count - 1]->buffers_id); rbuffers_destroy(sessions_info[sessions_count - 1]->buffers_id); abuffers_destroy(sessions_info[sessions_count - 1]->buffers_id); /* unallocate session */ free(sessions_info[sessions_count - 1]); sessions_info[sessions_count - 1] = NULL; /* decrease the array size and reallocate it */ --sessions_count; sessions_info = (struct session_item **) realloc(sessions_info, sessions_count * sizeof(struct session_item *)); /* end of writer lock */ WRITERS_UNLOCK(x, y, z, wsem, rsem, wcount, rcount); return E_MEMORY_FAIL; } /* initialize all its parameters */ /* set the initial packets' sequence number to random value */ srandom(time(NULL)); sessions_info[sessions_count - 1]->current_seq = random(); sessions_info[sessions_count - 1]->retries_timeout = global_setting.default_retries_timeout; sessions_info[sessions_count - 1]->max_acks_count = global_setting.default_max_acks_count; sessions_info[sessions_count - 1]->mss = global_setting.initial_mss; sessions_info[sessions_count - 1]->cwnd = INITIAL_WINDOW_SIZE(sessions_info[sessions_count - 1]->mss); sessions_info[sessions_count - 1]->flight = 0; sessions_info[sessions_count - 1]->rtt = 0; sessions_info[sessions_count - 1]->srtt = 0; sessions_info[sessions_count - 1]->rto = global_setting.initial_rto_time; sessions_info[sessions_count - 1]->ts_delta = 0; sessions_info[sessions_count - 1]->session_status = LIVE; sessions_info[sessions_count - 1]->last_send_time = 0; sessions_info[sessions_count - 1]->ref_counter = 1; sessions_info[sessions_count - 1]->sbuffer_max_size = global_setting.initial_sbuffers_max_size; sessions_info[sessions_count - 1]->rbuffer_max_size = global_setting.initial_rbuffers_max_size; sessions_info[sessions_count - 1]->rbuffer_red_limit = global_setting.initial_rbuffers_red_limit; sessions_info[sessions_count - 1]->rbuffer_red_prob = global_setting.default_red_drop_probability; /* set all session options to NULL */ for (i = 0; i < OPTIONS_COUNT; i++) sessions_info[sessions_count - 1]->options[i] = NULL; /* set all session's partner options to NULL */ for (i = 0; i < OPTIONS_COUNT; i++) sessions_info[sessions_count - 1]->partner_options[i] = NULL; if ((sessions_count != 1) && (set_default_options(sessions_info[sessions_count - 1]) != 0)) return E_MEMORY_FAIL; if (sessions_count == 1) { /* we are creating the buffer for non-established sessions */ sessions_info[sessions_count - 1]->session_type = NON_EST; sessions_info[sessions_count - 1]->expiration_time = 0; } else { /* we are creating the buffer for established sessions */ sessions_info[sessions_count - 1]->session_type = EST; sessions_info[sessions_count - 1]->expiration_time = global_setting.initial_exp_time; } /* sort the sessions array */ sort_sessions(); /* end of writer lock */ WRITERS_UNLOCK(x, y, z, wsem, rsem, wcount, rcount); return 0; } int artp_destroy_connection(SID_TYPE sid, struct sockaddr *receiver) { int position; int i; /* check whether we want to remove proper (previously established) * session. */ if ((position = get_session(sid, receiver, NULL, 0)) == 0) return E_NONEST_SESSION; /* lock sessions array for writing */ WRITERS_LOCK(x, y, z, wsem, rsem, wcount, rcount); /* remove reference for that session */ destroy_reference(sessions_info[position]); /* delete the session and adjust the array's size */ for (i = position; i < sessions_count - 1; i++) sessions_info[i] = sessions_info[i + 1]; sessions_info[sessions_count - 1] = NULL; sessions_info = (struct session_item **) realloc(sessions_info, (--sessions_count) * sizeof(struct session_item *)); /* end of writer lock */ WRITERS_UNLOCK(x, y, z, wsem, rsem, wcount, rcount); return 0; } /** Send out next packet. * This function tries to send next packet from given session buffer of * packets waiting for its sending. It checks the current congestion window * size and decides whether to send that packet or not. If the packet was sent, * it adjusts the current window size. * * @param session * the pointer to the place where session information is saved * * @return zero * success. * * @return nonzero * related error code if something failed (for further information see * documentation of file @c errors.h). */ static int send_out_next_packet(struct session_item *session) { char *bitstream; int size; int sendlen; int retval; int position; struct timeval current_time; struct sockaddr *receiver; double time; double partner_time; double retransmit_time; /* check whether there's any packet available for given session */ if (sbuffers_get_packet(session->buffers_id, &bitstream, &size, &receiver) != 0) return E_EMPTY_BUFFER; /* if it's the established session, check whether the congestion window * is full or not. */ if ((session->session_type != NON_EST) && (session->flight + size > session->cwnd)) { return E_FULL_CWND; } gettimeofday(¤t_time, 0); time = current_time.tv_sec + current_time.tv_usec / 1000000.0; /* save into acquired packet session partner time */ partner_time = time - session->ts_delta; position = 1 + sizeof(SID_TYPE) + sizeof(OPTSZ_TYPE) + sizeof(SEQ_TYPE); *((TS_TYPE *) (bitstream + position)) = htonl((TS_TYPE) partner_time); position += sizeof(TS_TYPE); *((TS_TYPE *) (bitstream + position)) = htonl((TS_TYPE) ((partner_time - ((TS_TYPE) partner_time)) * 1000000)); position += sizeof(TS_TYPE); /* if we don't know the difference between our time and our partner's time, * mark packets as unexpirable. Otherwise insert the right expiration time * into packets. */ if ((session->ts_delta == 0) && (session->srtt == 0)) *((TS_TYPE *) (bitstream + position)) = htonl(0); else *((TS_TYPE *) (bitstream + position)) = htonl(session->expiration_time); /* try to send this packet */ sendlen = sendto(sckt, bitstream, size, 0, receiver, rcvrsz(receiver)); if (sendlen == size) { /* successfully sent */ retransmit_time = time + session->rto; /* if the link was idle more than defined time, decrease the * congestion window size */ if (time - session->last_send_time > LINK_IDLE(session->rto)) session->cwnd = INITIAL_WINDOW_SIZE(session -> mss); /* update time of last sending */ session->last_send_time = time; /* move packet to its new position in send buffer */ retval = sbuffers_send_event(session->buffers_id, bitstream, size, time, retransmit_time); if ((session->session_type != NON_EST) && (retval == 0)) { /* if successfull, increase the congestion's window size */ pthread_mutex_lock(&session->session_mutex); session->flight += size; pthread_mutex_unlock(&session->session_mutex); } } else return E_SENDING_ERROR; return 0; } /** Get ARTP packet header. * This function returns ARTP packet header. * * @param session * the pointer to the place where session information is saved * * @param sid * the session identification number (for non-established sessions) * * @param dgram * the pointer to the place where the datagram information is stored * (datagram whose header we are going to assemble) * * @param options * the pointer to the place where current session options are saved * * @param options_size * the options size * * @param header * the relevant pointer which would be moved to the place where the * packet header will be stored. * * @param header_length * the pointer to the place where packet header size will be stored. * * @param seq * the pointer to the place where packet sequence number could be * stored. * * @return zero * success. * * @return nonzero * related error code if something failed (for further information see * documentation of file @c errors.h). */ static int get_packet_header(struct session_item *session, SID_TYPE sid, struct artp_dgram *dgram, char *options, int options_size, char **header, int *header_length, SEQ_TYPE *seq) { int position; double expiration; struct timeval current_time; enum packet_type type; uint8_t temp; /* find out the packet header size and allocate a place for it */ *header_length = 1 + sizeof(SID_TYPE) + sizeof(OPTSZ_TYPE) + sizeof(SEQ_TYPE) + (3 * sizeof(TS_TYPE)) + options_size; if ((*header = (char *) malloc(*header_length * sizeof(char))) == NULL) return E_MEMORY_FAIL; if (dgram == NULL) /* we're sending ACK packet */ type = ACK; else /* we're sending DATA or CTRL packet */ type = dgram->type; /* if this is established session, find out the proper session * identification number. */ if (session->session_type != NON_EST) sid = session->session_sid; /* increase the sequence number, too */ pthread_mutex_lock(&session->session_mutex); *seq = session->current_seq++; pthread_mutex_unlock(&session->session_mutex); /* set expiration time, too */ expiration = session->expiration_time; gettimeofday(¤t_time, 0); /* create packet header */ temp = ARTP_VERSION; temp <<= 4; temp |= type; *(*header) = temp; position = 1; *((SID_TYPE *) *header + position) = sid; position += sizeof(SID_TYPE); *((OPTSZ_TYPE *) (*header + position)) = htons(options_size); position += sizeof(OPTSZ_TYPE); *((SEQ_TYPE *) (*header + position)) = htonl(*seq); position += sizeof(SEQ_TYPE); *((TS_TYPE *) (*header + position)) = htonl(current_time.tv_sec); position += sizeof(TS_TYPE); *((TS_TYPE *) (*header + position)) = htonl(current_time.tv_usec); position += sizeof(TS_TYPE); *((TS_TYPE *) (*header + position)) = htonl(expiration); position += sizeof(TS_TYPE); /* copy there given options */ memcpy(*header + position, options, options_size); return 0; } int artp_send_dgram(struct artp_dgram *dgram, SID_TYPE sid, struct sockaddr *receiver) { char *header; char *msg; char *bitstream; char *options; int current_mss; int header_size; int full_header_size; int msg_size; int options_size; int msg_fragment_size; FRAGMENTS_TYPE fragments_count; FRAGMENTS_TYPE i; int act_position; int msg_position; struct session_item *session; SEQ_TYPE act_seq; int buffers_id; int retval; unsigned long int buffer_size; CTRL_SZ_TYPE ctrl_struct_size; if (dgram == NULL) return E_BAD_DGRAM; /* find out the proper session */ get_session(sid, receiver, &session, 1); /* is session alive? */ if (session->session_status != LIVE) { destroy_reference(session); return E_DEAD_SESSION; } /* find out the actual maximum segment size and buffer id. This is done * because of somebody could change it during our process. */ current_mss = session->mss; buffers_id = session->buffers_id; if ((dgram->type == DATA) && (session->partner_options[MAX_DGRAM_LEN] != NULL) && (dgram->payload.data.sigsz + dgram->payload.data.encsz > *((MAX_DGRAM_LEN_TYPE *) session->partner_options[MAX_DGRAM_LEN]))) { /* we cannot send greater datagram than our partner wishes */ destroy_reference(session); return E_BIG_DGRAM; } /* get buffer size */ if ((retval = sbuffers_get_size(buffers_id, &buffer_size)) != 0) { destroy_reference(session); return retval; } /* check whether it's full */ if ((session->sbuffer_max_size != 0) && (buffer_size >= session->sbuffer_max_size)) { destroy_reference(session); return E_FULL_BUFFER; } /* obtain session options */ if ((retval = get_session_options(session, dgram, &options, &options_size)) != 0) { destroy_reference(session); return retval; } /* try to get packet header */ if ((retval = get_packet_header(session, sid, dgram, options, options_size, &header, &header_size, &act_seq)) != 0) { destroy_reference(session); return retval; } /* make packet payload */ switch (dgram->type) { case DATA: /* is the current maximum segment size big enough? */ if (current_mss <= header_size + sizeof(DSEQ_TYPE) + 2 * sizeof(FRAGMENTS_TYPE) + sizeof(SIGSZ_TYPE)) { destroy_reference(session); return E_SMALL_MSS; } /* count the total size of the MSG item */ msg_size = sizeof(SIGSZ_TYPE) + dgram->payload.data.sigsz + dgram->payload.data.encsz; /* allocate a space for it */ if ((msg = (char *) malloc(msg_size * sizeof(char))) == NULL) { destroy_reference(session); return E_MEMORY_FAIL; } /* copy there necessary information */ msg_position = 0; *((SIGSZ_TYPE *) (msg + msg_position)) = htonl(dgram->payload.data.sigsz); msg_position += sizeof(SIGSZ_TYPE); memcpy(msg + msg_position, dgram->payload.data.sigdata, dgram->payload.data.sigsz); msg_position += dgram->payload.data.sigsz; memcpy(msg + msg_position, dgram->payload.data.encdata, dgram->payload.data.encsz); msg_position += dgram->payload.data.encsz; /* find out the total header size (only for DATA dgrams) */ full_header_size = header_size + sizeof(DSEQ_TYPE) + 2 * sizeof(FRAGMENTS_TYPE); /* compute the fragment count */ msg_fragment_size = current_mss - full_header_size; fragments_count = msg_size / msg_fragment_size; if (msg_size % msg_fragment_size != 0) ++fragments_count; /* create all fragments */ for (i = 0; i < fragments_count; i++) { if (i == fragments_count - 1) { /* allocating last fragment */ if ((bitstream = (char *) malloc((full_header_size + (msg_size - i * msg_fragment_size)) * sizeof(char))) == NULL) { destroy_reference(session); return E_MEMORY_FAIL; } } else { /* allocating other fragments */ if ((bitstream = (char *) malloc(current_mss * sizeof(char))) == NULL) { destroy_reference(session); return E_MEMORY_FAIL; } } /* copy necessary information to each packet */ memcpy(bitstream, header, header_size); act_position = header_size; *((DSEQ_TYPE *) (bitstream + act_position)) = htonl(dgram->payload.data.dseq); act_position += sizeof(DSEQ_TYPE); *((FRAGMENTS_TYPE *) (bitstream + act_position)) = htons(i + 1); act_position += sizeof(FRAGMENTS_TYPE); *((FRAGMENTS_TYPE *) (bitstream + act_position)) = htons(fragments_count); act_position += sizeof(FRAGMENTS_TYPE); /* unallocate header (we'll allocate new one) */ free(header); header = NULL; if (i != fragments_count - 1) { /* we're sending full packet */ memcpy(bitstream + full_header_size, msg + i * msg_fragment_size, msg_fragment_size); sbuffers_add_packet(buffers_id, receiver, sid, bitstream, current_mss, act_seq); /* find out next header */ if ((retval = get_packet_header(session, sid, dgram, options, options_size, &header, &header_size, &act_seq)) != 0) { destroy_reference(session); return retval; } } else { /* we're sending smaller packet than the full one. * It's the last one. */ memcpy(bitstream + full_header_size, msg + i * msg_fragment_size, msg_size - i * msg_fragment_size); sbuffers_add_packet(buffers_id, receiver, sid, bitstream, full_header_size + msg_size - i * msg_fragment_size, act_seq); } /* try to send out next packet */ send_out_next_packet(session); } /* unallocate space allocated for message */ free(msg); msg = NULL; break; case CTRL: /* we're sending control dgram */ /* allocate space for it */ if ((bitstream = (char *) malloc(current_mss * sizeof(char))) == NULL) { destroy_reference(session); return E_MEMORY_FAIL; } /* copy its header there */ memcpy(bitstream, header, header_size); act_position = header_size; /* copy all structures into it. If there isn't space enough, send * this control packet and make the other one. */ for (i = 0; i < dgram->payload.ctrl.count; i++) { /* current control structure size */ ctrl_struct_size = sizeof(CTRL_SZ_TYPE) + 1 + sizeof(CTRL_OPTID_TYPE) + dgram->payload.ctrl.control[i].valuesize; /* check whether there's enough space for it */ if (header_size + ctrl_struct_size > current_mss) { destroy_reference(session); return E_BIG_DGRAM; } /* it there's no other space in currently filled packet, * send it and create the new one */ if (act_position + ctrl_struct_size > current_mss) { sbuffers_add_packet(buffers_id, receiver, sid, bitstream, act_position, act_seq); /* try to send out next packet */ send_out_next_packet(session); if ((bitstream = (char *) malloc(current_mss * sizeof(char))) == NULL) { destroy_reference(session); return E_MEMORY_FAIL; } /* unallocate header (we'll create the new one) */ free(header); header = NULL; /* try to create new header */ if ((retval = get_packet_header(session, sid, dgram, options, options_size, &header, &header_size, &act_seq)) != 0) { destroy_reference(session); return retval; } /* copy header into packet */ memcpy(bitstream, header, header_size); act_position = header_size; } /* insert the control structure into packet */ *((CTRL_SZ_TYPE *) (bitstream + act_position)) = htons(ctrl_struct_size); act_position += sizeof(CTRL_SZ_TYPE); bitstream[act_position] = dgram->payload.ctrl.control[i].type; act_position += 1; *((CTRL_OPTID_TYPE *) (bitstream + act_position)) = dgram->payload.ctrl.control[i].optid; act_position += sizeof(CTRL_OPTID_TYPE); memcpy(bitstream + act_position, dgram->payload.ctrl.control[i].value, dgram->payload.ctrl.control[i].valuesize); act_position += dgram->payload.ctrl.control[i].valuesize; } /* save packet into buffer */ sbuffers_add_packet(buffers_id, receiver, sid, bitstream, act_position, act_seq); /* try to send out next packet */ send_out_next_packet(session); /* free space allocated for header */ free(header); header = NULL; break; default: return E_BAD_PACKET_TYPE; break; } /* remove reference for this session */ destroy_reference(session); /* unallocate space allocated for session options */ free(options); options = NULL; return 0; } /** Parse control dgram's payload. * This function parses control dgram's payload into ARTP control structure. * * @param stream * the pointer to the place where the control dgram's payload is stored * (this stream will be parsed). * * @param stream_size * the size of control dgram's payload. * * @param ctrl * the pointer to the place where parsed control dgram's structure could * be saved. * * @return zero * success. * * @return nonzero * related error code if something failed (for further information see * documentation of file @c errors.h). */ static int parse_controls(char *stream, int stream_size, struct payload_CTRL *ctrl) { CTRL_VALUE_SZ_TYPE position; CTRL_SZ_TYPE size; int count; int i; position = 0; count = 0; /* compute the control options count */ while (position < stream_size) { position += ntohs(*((CTRL_SZ_TYPE *) (stream + position))); ++count; } /* allocate space for it */ if ((ctrl->control = (struct control_type *) malloc(count * sizeof(struct control_type))) == NULL) return E_MEMORY_FAIL; ctrl->count = count; /* parse that options */ position = 0; for (i = 0; i < ctrl->count; i++) { size = ntohs(*((CTRL_SZ_TYPE *) (stream + position))); position += sizeof(CTRL_SZ_TYPE); ctrl->control[i].type = stream[position]; position += 1; ctrl->control[i].optid = *((CTRL_OPTID_TYPE *) (stream + position)); position += sizeof(CTRL_OPTID_TYPE); ctrl->control[i].valuesize = size - sizeof(CTRL_SZ_TYPE) - 1 - sizeof(CTRL_OPTID_TYPE); if ((ctrl->control[i].value = (char *) malloc(sizeof(char)* (ctrl->control[i].valuesize))) == NULL) { return E_MEMORY_FAIL; } memcpy(ctrl->control[i].value, stream + position, ctrl->control[i].valuesize); position += ctrl->control[i].valuesize; } return 0; } /** Parse data dgram's payload. * This function parses data dgram's payload into ARTP data structure. * * @param stream * the pointer to the place where the data dgram's payload is stored * (this stream will be parsed). * * @param stream_size * the size of data dgram's payload. * * @param data * the pointer to the place where parsed data dgram's structure could * be saved. * * @return zero * success. * * @return nonzero * related error code if something failed (for further information see * documentation of file @c errors.h). */ static int parse_data(char *stream, int stream_size, struct payload_DATA *data) { ENCDATA_SIZE_TYPE position; position = 0; /* parse given data dgram */ data->sigsz = ntohl(*((SIGSZ_TYPE *) (stream))); position = sizeof(SIGSZ_TYPE); if ((data->sigdata = (char *) malloc(sizeof(char) * (data->sigsz))) == NULL) { return E_MEMORY_FAIL; } memcpy(data->sigdata, stream + position, data->sigsz); position += data->sigsz; data->encsz = stream_size - position; if ((data->encdata = (char *) malloc(sizeof(char) * (data->encsz))) == NULL) { return E_MEMORY_FAIL; } memcpy(data->encdata, stream + position, data->encsz); return 0; } int artp_free_dgram(struct artp_dgram *dgram) { int i; /* unallocate space occupied by this dgram depending on it's type */ if (dgram->type == DATA) { free(dgram->payload.data.sigdata); dgram->payload.data.sigdata = NULL; free(dgram->payload.data.encdata); dgram->payload.data.encdata = NULL; } else { for (i = 0; i < dgram->payload.ctrl.count; i++) { free(dgram->payload.ctrl.control[i].value); dgram->payload.ctrl.control[i].value = NULL; } free(dgram->payload.ctrl.control); dgram->payload.ctrl.control = NULL; } return 0; } int artp_receive_dgram(SID_TYPE sid, struct sockaddr *sender, struct artp_dgram *dgram) { struct session_item *session; DSEQ_TYPE dseq; char *bitstream; int dgram_size; /* try to find wanted session */ get_session(sid, sender, &session, 1); if (session->session_type == NON_EST) return E_NONEST_SESSION; /* try to obtain any datagram from buffer */ if (rbuffers_get_dgram(session->buffers_id, &bitstream, &dgram_size, &dgram->type, &dseq, NULL, NULL) < 0) { if (session->session_status != LIVE) { /* there's nothing to receive and session is dead */ destroy_reference(session); return E_DEAD_SESSION; } else { /* there's nothing to receive but session is alive */ destroy_reference(session); return E_EMPTY_BUFFER; } } /* parse incoming datagram depending on it's type */ switch (dgram->type) { case CTRL: parse_controls(bitstream, dgram_size, (struct payload_CTRL *) &dgram->payload); break; case DATA: parse_data(bitstream, dgram_size, (struct payload_DATA *) &dgram->payload); dgram->payload.data.dseq = dseq; break; default: return E_BAD_DGRAM; } /* remove reference for this session */ destroy_reference(session); /* clean memory */ free(bitstream); bitstream = NULL; return 0; } int artp_receive_any_dgram(SID_TYPE *sid, struct sockaddr **sender, struct artp_dgram *dgram) { char *bitstream; DSEQ_TYPE dseq; int dgram_size; /* try to obtain any datagram from buffer */ if (rbuffers_get_dgram(0, &bitstream, &dgram_size, &dgram->type, &dseq, sender, sid) < 0) return E_EMPTY_BUFFER; /* parse incoming datagram depending on it's type */ switch (dgram->type) { case CTRL: parse_controls(bitstream, dgram_size, (struct payload_CTRL *) &dgram->payload); break; case DATA: parse_data(bitstream, dgram_size, (struct payload_DATA *) &dgram->payload); dgram->payload.data.dseq = dseq; break; default: return E_BAD_DGRAM; } /* clean memory */ free(bitstream); bitstream = NULL; return 0; } int artp_get_undlvr_session(SID_TYPE *sid, struct sockaddr *receiver) { struct dead_zero_sessions *p; /* lock structure for undeliverable sessions */ pthread_mutex_lock(&dead_zero_start_mutex); /* check whether there's any undeliverable session */ if ((dead_zero_start->invalid == 1) || (receiver == NULL)) { /* unlock structure for undeliverable sessions */ pthread_mutex_unlock(&dead_zero_start_mutex); return E_EMPTY_BUFFER; } /* move pointers to next undeliverable session */ p = dead_zero_start; dead_zero_start = dead_zero_start->next; pthread_mutex_unlock(&dead_zero_start_mutex); /* return necessary information */ *sid = p->sid; rcvrcpy(receiver, (struct sockaddr *) &p->receiver); /* free memory */ free(p); p = NULL; return 0; } int artp_get_sid(struct sockaddr *receiver, SID_TYPE *sid) { SID_TYPE initial_sid; /* lock mutex for searching next available sid */ pthread_mutex_lock(&global_sid_mutex); /* remember the starting session identification number (to determine * whether there's any session id available. */ initial_sid = global_sid - 1; /* find the available session id */ while ((get_session(global_sid, receiver, NULL, 0) != 0) && (global_sid != initial_sid)) ++global_sid; /* is there any session id available? */ if (global_sid == initial_sid) { /* unlock mutex for searching next available session id */ pthread_mutex_unlock(&global_sid_mutex); return E_NO_AVAIL_SID; } /* return found session id */ *sid = global_sid; /* increase global session id for next searching */ ++global_sid; /* unlock mutex for searching next available session id */ pthread_mutex_unlock(&global_sid_mutex); return 0; } int artp_set_session_options(SID_TYPE sid, struct sockaddr *receiver, int use, enum artp_session_options option, ...) { struct session_item *session; int retval; va_list ap; /* try to find proper session */ if (get_session(sid, receiver, &session, 1) == 0) { destroy_reference(session); return E_NONEST_SESSION; } /* initialize variable parameters list */ va_start(ap, option); /* try to set wanted option */ retval = use_options(session, use, option, &ap); /* deinitialize variable parameters list */ va_end(ap); /* remove reference for this session */ destroy_reference(session); return retval; } int artp_set_session_params(SID_TYPE sid, struct sockaddr *receiver, enum artp_session_params param, ...) { struct session_item *session; va_list ap; unsigned int temp_ui; MSS_TYPE temp_mss; RETR_TIMEOUT_TYPE temp_retr; int temp_i; unsigned long int temp_ul; TS_TYPE temp_t; /* try to find proper session */ if (get_session(sid, receiver, &session, 1) <= 0) { destroy_reference(session); return E_NONEST_SESSION; } /* initialize variable parameters list */ va_start(ap, param); /* set parameter */ switch (param) { case RETRIES_TIMEOUT: temp_retr = va_arg(ap, unsigned int); if (temp_retr <= 0) return E_BAD_PARAM_VALUE; session->retries_timeout = temp_retr; break; case MSS: temp_mss = va_arg(ap, unsigned int); if (temp_mss <= 0) return E_BAD_PARAM_VALUE; if ((session->partner_options[MSS] != NULL) && (temp_mss > *((MSS_TYPE *) session->partner_options[MSS]))) return E_PARTNER_MSS; session->mss = temp_mss; break; case EXP_TIME: temp_t = va_arg(ap, TS_TYPE); if (temp_t < 0) return E_BAD_PARAM_VALUE; session->expiration_time = temp_t; break; case MAX_ACKS_COUNT: temp_ui = va_arg(ap, unsigned int); if (temp_ui <= 0) return E_BAD_PARAM_VALUE; session->max_acks_count = temp_ui; break; case MAX_SBUFFER_SIZE: temp_ul = va_arg(ap, unsigned long int); if (temp_ul < 0) return E_BAD_PARAM_VALUE; session->sbuffer_max_size = temp_ul; break; case MAX_RBUFFER_SIZE: temp_ul = va_arg(ap, unsigned long int); if (temp_ul < 0) return E_BAD_PARAM_VALUE; session->rbuffer_max_size = temp_ul; break; case MAX_RBUFFER_RED_LIMIT: temp_ul = va_arg(ap, unsigned long int); if (temp_ul < 0) return E_BAD_PARAM_VALUE; session->rbuffer_red_limit = temp_ul; break; case MAX_RBUFFER_RED_PROBABILITY: temp_i = va_arg(ap, int); if (temp_i < 0) return E_BAD_PARAM_VALUE; session->rbuffer_red_prob = temp_i; break; default: destroy_reference(session); va_end(ap); return E_INVALID_PARAMETER; } /* deinitialize variable parameters list */ va_end(ap); /* remove reference for this session */ destroy_reference(session); return 0; } /** Remove sent packets after incoming acknowledgement. * This function removes packets from the sent packets buffer after their * incoming acknowledgement. When the acknowledging packet is found, possible * session parameters are computed. * * @param session * the pointer to the place where session information is saved. * * @param sid * the session identification number (used for non-established sessions * only). * * @param sender * the pointer to the place where session sender/receiver is stored * (used for non-established sessions only). * * @param seq * the array of acknowledging sequence numbers. * * @param seq_count * the count of sequence numbers in array. * * @param current_time * actual time. * * @param ack_sent_time * acknowledgement packet sending time. * * @return zero * success. * * @return nonzero * related error code if something failed (for further information see * documentation of file @c errors.h). */ static int incoming_acks(struct session_item *session, SID_TYPE sid, struct sockaddr *sender, SEQ_TYPE *seq, int seq_count, double current_time, double ack_sent_time) { int i; double sent_time; int size; /* necessary when ack came for non-established session but this session * was during travel time established. We have to delete that packet from * non-established buffer, too. */ if ((seq_count == 1) && (session->session_type != NON_EST)) sbuffers_ack_event(0, sender, sid, seq[0], &size, &sent_time); /* delete all packets from sent buffer identified by its sequence number */ for (i = 0; i < seq_count; i++) { if (sbuffers_ack_event(session->buffers_id, sender, sid, seq[i], &size, &sent_time) == 0) { /* packet was found in buffer */ /* lock critical session information for writing */ pthread_mutex_lock(&session->session_mutex); /* decrease actual window size */ if (session->session_type != NON_EST) session->flight -= size; /* change actual congestion window size */ session->cwnd += CWND_ACK(session->mss, session->cwnd); /* if packet wasn't retransmitted yes, compute round trip time and * others possible information */ if ((session->session_type != NON_EST) && (sent_time != -1)) { session->rtt = current_time - sent_time; session->srtt = SRTT_COMPUTE(session->rtt, session->srtt); session->rto = RTO_COMPUTE(session->srtt, global_setting.initial_rto_time); session->ts_delta = TS_DELTA_COMPUTE(current_time, ack_sent_time, session->srtt, session->ts_delta); } /* unlock critical session information for writing */ pthread_mutex_unlock(&session->session_mutex); } /* for non-established sessions congestion window has no sense */ if (session->session_type != NON_EST) while (send_out_next_packet(session) == 0); send_out_next_packet(session); } return 0; } /** Thread for retransmitting packets. * This function works as a thread which looks for some packet to be resend * and resends it. */ static void * timer_thread(void *arg) { double time; double partner_time; double top_rto_time; double first_send_time; char *bitstream; int size; int sent_size; int position; unsigned long int usec_wait = 0; SID_TYPE sid; SEQ_TYPE seq; struct session_item *session; struct timeval current_time; struct sockaddr *rcvr; struct dead_zero_sessions *p; struct dead_zero_sessions *q; while (1) { gettimeofday(¤t_time, 0); time = current_time.tv_sec + current_time.tv_usec / 1000000.0; /* find out the top time for making retransmission */ top_rto_time = sbuffers_get_top_rsnd_time(); /* shall we resend that packet just now? */ if ((top_rto_time > 0) && (top_rto_time <= time)) { /* obtain packet information from buffer */ if (sbuffers_get_rsnd_packet(&bitstream, &size, &rcvr, &first_send_time) == 0) { sid = *((SID_TYPE *) bitstream + 1); get_session(sid, rcvr, &session, 1); /* if the packet's session is dead, skip it */ if (session->session_status == DEAD) { sbuffers_ignore_first_rsnd(bitstream); destroy_reference(session); continue; } /* check whether the session is dead or not */ if (first_send_time + session->retries_timeout < time) { if (session->session_type == NON_EST) { /* for non-established sessions we have to save * information about non-deliverable packet to * relevant structure. */ /* try to allocate new space for it */ if ((p = (struct dead_zero_sessions *) malloc(sizeof(struct dead_zero_sessions))) == NULL) exit(E_MEMORY_FAIL); /* insert it at the end of that structure */ p->invalid = 1; q = dead_zero_end; dead_zero_end->next = p; dead_zero_end = p; q->sid = sid; rcvrcpy((struct sockaddr *) &(q->receiver), rcvr); q->invalid = 0; /* delete that packet from sent buffer */ seq = ntohl(*((SEQ_TYPE *) bitstream + sizeof(SID_TYPE) + 1 + sizeof(OPTSZ_TYPE))); sbuffers_ack_event(session->buffers_id, rcvr, sid, seq, NULL, NULL); } else { /* established connections */ session->session_status = DEAD; /* skip leading packet for retransmitting */ sbuffers_ignore_first_rsnd(bitstream); } continue; } /* everything's ok. We have to retransmit given packet */ /* change packet time information */ partner_time = time - session->ts_delta; position = 1 + sizeof(SID_TYPE) + sizeof(OPTSZ_TYPE) + sizeof(SEQ_TYPE); *((TS_TYPE *) (bitstream + position)) = htonl((TS_TYPE) partner_time); position += sizeof(TS_TYPE); *((TS_TYPE *) (bitstream + position)) = htonl((TS_TYPE) ((partner_time - ((TS_TYPE) partner_time)) * 1000000)); position += sizeof(TS_TYPE); if ((session->ts_delta == 0) && (session->srtt == 0)) *((TS_TYPE *) (bitstream + position)) = htonl(0); else *((TS_TYPE *) (bitstream + position)) = htonl(session->expiration_time); /* send it */ sent_size = sendto(sckt, bitstream, size, 0, rcvr, rcvrsz(rcvr)); if (sent_size == size) { /* change congestion window size */ pthread_mutex_lock(&session->session_mutex); session->cwnd = CWND_RETRANS(session -> mss, session->cwnd); pthread_mutex_unlock(&session->session_mutex); /* move packet in buffer */ sbuffers_resend_event(time + session->rto); /* update last sending time */ session->last_send_time = time; } /* remove reference for this session */ destroy_reference(session); /* deallocate given receiver */ free(rcvr); rcvr = NULL; } } else if (top_rto_time > 0) { /* wait for next packet */ usec_wait = (unsigned long int) (top_rto_time - time) * 1000000; usleep(usec_wait); } else usleep(100); } return 0; } /** Thread for receiving packets. * This function works as a thread which waits for incoming packets, * partially parses them and saves them into receiving buffers. * This thread sends acknowledgement packets (if necessary), too. */ static void * receiver_thread(void *arg) { socklen_t sock_addr_length; int rcvd_len; char bitstream[MAX_ARTP_PACKET_SIZE]; char *header; char *options; int header_size; int options_size; int position; int size; int i; unsigned long int buffer_size; fd_set rset; struct timeval timeout; union artp_receiver from; struct sockaddr *receiver; struct session_item *session; int buffers_id; SEQ_TYPE *acks; int acks_array_size; int retval = 0; double current_time; unsigned int retransmits_timeout; double delta; struct timeval tv_current_time; int mss; SID_TYPE packet_sid; SEQ_TYPE packet_seq; long double packet_ts; long double packet_exp; enum packet_type packet_type; OPTSZ_TYPE packet_optsz; DSEQ_TYPE packet_dseq; FRAGMENTS_TYPE packet_frag; FRAGMENTS_TYPE packet_nfrag; char *packet_payload; /* create array for incoming sequence numbers (from acknowledgement * packets). */ acks_array_size = 10; if ((acks = (SEQ_TYPE *) malloc(acks_array_size * sizeof(SEQ_TYPE))) == NULL) { exit(E_MEMORY_FAIL); } while (1) { FD_ZERO(&rset); FD_SET(sckt, &rset); timeout.tv_sec = 2; timeout.tv_usec = 0; /* find out actual time */ gettimeofday(&tv_current_time, 0); current_time = tv_current_time.tv_sec + tv_current_time.tv_usec / 1000000.0; /* are there any acks to be sent right now? */ while (abuffers_get_ack(current_time, &packet_sid, &receiver, &packet_payload, &size) == 0) { /* yes, there're */ /* find out the proper session */ get_session(packet_sid, receiver, &session, 1); /* find out maximum segment size */ mss = session->mss; /* find out session options */ get_session_options(session, NULL, &options, &options_size); /* create acknowledgement packet */ position = 0; do { /* try to get header for that packet */ if ((retval = get_packet_header(session, packet_sid, NULL, options, options_size, &header, &header_size, &packet_seq)) != 0) exit(retval); /* copy header to the packet */ memcpy(bitstream, header, header_size); rcvd_len = (size - position + header_size < mss) ? size - position : ((mss - header_size) % sizeof(SEQ_TYPE)) * sizeof(SEQ_TYPE); /* copy there the payload */ memcpy(bitstream + header_size, packet_payload + position, rcvd_len); /* send it */ sendto(sckt, bitstream, header_size + rcvd_len, 0, receiver, rcvrsz(receiver)); position += rcvd_len; /* free header */ free(header); header = NULL; } while (position < size); /* free space allocated for options */ free(options); options = NULL; /* remove reference for that session */ destroy_reference(session); } /* is there any incoming packet? */ if (select(sckt + 1, &rset, NULL, NULL, &timeout) < 0) continue; /* yes, there's */ sock_addr_length = sizeof(union artp_receiver); if (FD_ISSET(sckt, &rset)) { /* receive that packet */ rcvd_len = recvfrom(sckt, bitstream, MAX_ARTP_PACKET_SIZE, 0, (struct sockaddr *) &from, &sock_addr_length); if (rcvd_len > 0) { /* check whether it's the right version */ if ((((bitstream[0]) & 240) >> 4) != ARTP_VERSION) continue; /* parse packet information */ packet_type = (bitstream[0]) & 15; position = 1; packet_sid = *((SID_TYPE *) bitstream + position); position += sizeof(SID_TYPE); packet_optsz = ntohs(*((OPTSZ_TYPE *) (bitstream + position))); position += sizeof(OPTSZ_TYPE); packet_seq = ntohl(*((SEQ_TYPE *) (bitstream + position))); position += sizeof(SEQ_TYPE); packet_ts = ntohl(*((TS_TYPE *) (bitstream + position))); position += sizeof(TS_TYPE); packet_ts += ntohl(*((TS_TYPE *) (bitstream + position))) / 1000000.0; position += sizeof(TS_TYPE); packet_exp = ntohl(*((TS_TYPE *) (bitstream + position))) / 1000000.0; position += sizeof(TS_TYPE); /* find the session whose packet it is */ get_session(packet_sid, (struct sockaddr *) &from, &session, 1); buffers_id = session->buffers_id; delta = session->ts_delta; /* check whether it's expired */ if (packet_exp != 0) { switch (packet_type) { case DATA: case CTRL: if (packet_ts + packet_exp < current_time) { destroy_reference(session); continue; } break; case ACK: if ((delta != 0) && (session->rtt != 0) && (packet_ts + delta + packet_exp < current_time)) { destroy_reference(session); continue; } break; } } /* we ban data in non-established sessions */ if ((session->session_type == NON_EST) && (packet_type == DATA)) { destroy_reference(session); continue; } /* parse incoming options */ parse_session_options(session, bitstream + position, packet_optsz); position += packet_optsz; /* check whether receive buffer is full */ if ((packet_type != ACK) && (session->rbuffer_max_size != 0)) { /* find out the buffer size */ if (rbuffers_get_size(buffers_id, &buffer_size) != 0) { /* some error happened */ destroy_reference(session); continue; } if (buffer_size >= session->rbuffer_max_size) { /* buffer is full */ destroy_reference(session); continue; } if ((session->rbuffer_red_limit != 0) && (buffer_size >= session->rbuffer_red_limit)) { /* buffer is not full but its count increased a lot. * We will use random early detection. */ srandom(time(NULL)); /* find out random number between 0 and 100. If it's * lesser than probability of dropping, drop this * packet. */ if ((int) ((100.0 * random()) / (RAND_MAX + 1.0)) <= session->rbuffer_red_prob) { /* dropping this packet */ destroy_reference(session); continue; } } } /* next steps depends on its type */ switch (packet_type) { case DATA: /* parse rest necessary information */ packet_dseq = ntohl(*((DSEQ_TYPE *) (bitstream + position))); position += sizeof(DSEQ_TYPE); packet_frag = ntohs(*((FRAGMENTS_TYPE *) (bitstream + position))); position += sizeof(FRAGMENTS_TYPE); packet_nfrag = ntohs(*((FRAGMENTS_TYPE *) (bitstream + position))); position += sizeof(FRAGMENTS_TYPE); /* find out current partner retransmit timeout */ if (session->partner_options[RETRANSMITS_TIMEOUT] != NULL) retransmits_timeout = *((RETR_TIMEOUT_TYPE *) session-> partner_options[RETRANSMITS_TIMEOUT]); else retransmits_timeout = 0; /* allocate space for the packet payload */ if ((packet_payload = (char *) malloc(sizeof(char) * (rcvd_len - position))) == NULL) { exit(E_MEMORY_FAIL); } /* move the payload there */ memcpy(packet_payload, bitstream + position, rcvd_len - position); /* save it into buffer */ retval = rbuffers_add_packet(buffers_id, packet_payload, rcvd_len - position, (struct sockaddr *) &from, packet_sid, packet_type, packet_seq, packet_dseq, packet_frag, packet_nfrag, current_time, retransmits_timeout); /* if failed, deallocate packet's payload space */ if (retval != 0) { free(packet_payload); packet_payload = NULL; } break; case CTRL: /* find out current partner retransmit timeout */ if (session->partner_options[RETRANSMITS_TIMEOUT] != NULL) retransmits_timeout = *((RETR_TIMEOUT_TYPE *) session-> partner_options[RETRANSMITS_TIMEOUT]); else retransmits_timeout = 0; /* allocate space for incoming packet */ if ((packet_payload = (char *) malloc(sizeof(char) * (rcvd_len - position))) == NULL) { exit(E_MEMORY_FAIL); } /* copy payload to the new space */ memcpy(packet_payload, bitstream + position, rcvd_len - position); /* save it into buffer */ retval = rbuffers_add_packet(buffers_id, packet_payload, rcvd_len - position, (struct sockaddr *) &from, packet_sid, packet_type, packet_seq, 0, 1, 1, current_time, retransmits_timeout); /* if failed, deallocate packet's payload space */ if (retval != 0) { free(packet_payload); packet_payload = NULL; } break; case ACK : retval = 0; /* obtain all acked sequence numbers */ i = 0; while (position < rcvd_len) { acks[i] = ntohl(*((SEQ_TYPE *) (bitstream + position))); position += sizeof(SEQ_TYPE); i++; /* is the array for sequence numbers big enough? */ if (i == acks_array_size) { acks_array_size += 5; if ((acks = (SEQ_TYPE *) realloc(acks, acks_array_size * sizeof(SEQ_TYPE))) == NULL) { exit(E_MEMORY_FAIL); } } } /* delete all proper packets from sent buffer */ incoming_acks(session, packet_sid, (struct sockaddr *) &from, acks, i, current_time, packet_ts); break; } /* save incoming sequence number to ack buffer. * NOTE: If duplicity packet came we must send acknowledgement * because our last ack could lost. */ if ((packet_type != ACK) && ((retval == 0) || (retval == E_DUPLICITY_PACKET))) { abuffers_add_seq(buffers_id, packet_sid, (struct sockaddr *) &from, packet_seq, current_time + LATEST_ACKS_SEND(session->srtt), session->max_acks_count); } /* remove reference for that session */ destroy_reference(session); } } } return 0; } int artp_init(int socket, char *filename) { pthread_t timer; pthread_t receiver; struct dead_zero_sessions *p; int retval; /* try to initialize buffers */ if ((sbuffers_init() != 0) || (rbuffers_init() != 0) || (abuffers_init() != 0) || (options_init() != 0)) return E_BUF_INIT_ERROR; /* initialize locks for readers/writers */ RW_INIT(x, y, z, wsem, rsem, wcount, rcount); /* set default setting */ setting_set_defaults(); /* if filename is specified read options from the proper file */ if ((filename != NULL) && (filename[0] != '\0') && ((retval = setting_read_file(filename)) != 0)) return retval; /* create session for non-established connections */ if ((retval = artp_prepare_connection(0, NULL)) != 0) return retval; /* initialize sessions identification number counter */ srandom(time(NULL)); global_sid = (int) ((((double) SID_MAX) * rand()) / (RAND_MAX + 1.0)); if (pthread_mutex_init(&global_sid_mutex, NULL) != 0) return E_MEMORY_FAIL; /* create structure for undeliverable sessions */ if ((p = (struct dead_zero_sessions *) malloc(sizeof(struct dead_zero_sessions))) == NULL) return E_MEMORY_FAIL; /* mark this new item as a slipper */ p->invalid = 1; dead_zero_start = p; dead_zero_end = p; /* create mutex for this structure */ if (pthread_mutex_init(&dead_zero_start_mutex, NULL) != 0) return E_MEMORY_FAIL; sckt = socket; /* try to start all necessary threads */ if ((pthread_create(&timer, NULL, timer_thread, NULL) != 0) || (pthread_create(&receiver, NULL, receiver_thread, NULL) != 0)) return E_START_THREADS; return 0; } /* vim: set ts=4 : */