/* * Active Router Transport Protocol (ARTP) implementation * Copyright (c) 2004, Tomas Rebok * All rights reserved. * * This program is free software; you can redistribute it and/or * modify it under the terms of the "BSD License" which is * distributed with the software in the file LICENSE. * * This program is distributed in the hope that it will be useful, but * WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the BSD * License for more details. */ /** @file * @c ARTP library for send buffers. * @author Tomas Rebok * @date 2004 */ #include #include #include #include #include "sbuffers.h" #include "setting.h" #include "net.h" #include "errors.h" /** Structure used for saving packet information. */ struct item { /** packet payload */ char *packet; /** the pointer to the buffer whose packet it is */ struct Tsending_buffers *buffer_info; /** packet payload length (size) */ int length; /** packet sequence number */ SEQ_TYPE seq; /** packet first sending time */ double first_send_time; /** packet time to resend */ double time_to_resend; /** packet retransmit counter */ int retr_counter; /** the slipper sign (validity sign) */ int invalid; /** next packet in buffer */ struct item *next; /** last packet in buffer */ struct item *last; /** next packet in structure for retransmissions */ struct item *next_to_resend; /** last packet in structure for retransmissions */ struct item *last_to_resend; }; /** Structure for buffer identified by 0. * This structure is used for saving information about non-established * sessions - we must remember some more information about each packet. */ struct ext_item { /** generic packet structure */ struct item main_item; /** packet's receiver */ union artp_receiver receiver; /** packet's session identifier */ SID_TYPE sid; }; /** Send buffers main structure */ struct Tsending_buffers { /** session receiver */ union artp_receiver receiver; /** buffer for sent packets (its start) */ struct item *sent_start; /** buffer for sent packets (its end) */ struct item *sent_end; /** the beginning of buffer for packets to be sent */ struct item *to_send; /** the end of buffer for packets to be sent */ struct item *end; /** counter of buffer size */ unsigned long int buffer_size; /** mutex used for mutual exclusion of threads manipulating with buffer * size counter */ pthread_mutex_t buffer_size_mutex; /** mutex used for mutual exclusion of threads reading from sent packets * buffer. */ pthread_mutex_t sent_start_mutex; /** mutex used for mutual exclusion of threads writing to sent packets * buffer end */ pthread_mutex_t sent_end_mutex; /** mutex used for mutual exclusion of threads reading from to send packets * buffer */ pthread_mutex_t to_send_mutex; /** mutex used for mutual exclusion of threads writing to to send packets * buffer end */ pthread_mutex_t end_mutex; }; /** The total count of send buffers (corresponds to the count of the * pointers in array described below). */ static int sbuffers_count = 0; /** The array of pointers to send buffers structures. */ static struct Tsending_buffers **sbuffers; /** Mutex used as a mutual exclusion of threads reading the structure for * retransmissions. */ static pthread_mutex_t resend_mutex; /** structure for retransmissions (start) */ static struct item *resend_start = NULL; /** structure for retransmissions (end) */ static struct item *resend_end = NULL; int sbuffers_init(void) { /* initialize mutex used for structure for retransmission */ if (pthread_mutex_init(&resend_mutex, NULL) != 0) return E_MEMORY_FAIL; return 0; } int sbuffers_create(struct sockaddr *receiver) { int i; int position = 0; struct item *p; struct item *q; /* try to find some free space in array of pointer to buffers structure */ for (i = 0; i < sbuffers_count; i++) { if (sbuffers[i] == NULL) { position = i; break; } } if ((sbuffers_count == 0) || (sbuffers[position] != NULL)) { /* there's no free space. Resize the array of pointers and use the last * item. */ if ((sbuffers = (struct Tsending_buffers**) realloc(sbuffers, (sbuffers_count + 1) * sizeof(struct Tsending_buffers*))) == NULL) return E_MEMORY_FAIL; else position = sbuffers_count++; } /* allocate the send buffer */ if ((sbuffers[position] = (struct Tsending_buffers*) malloc(sizeof(struct Tsending_buffers))) == NULL) return E_MEMORY_FAIL; /* create slippers */ if (position == 0) { /* extended item */ if ((p = (struct item *) malloc(sizeof(struct ext_item))) == NULL) return E_MEMORY_FAIL; if ((q = (struct item *) malloc(sizeof(struct ext_item))) == NULL) return E_MEMORY_FAIL; } else { /* generic item */ if ((p = (struct item *) malloc(sizeof(struct item))) == NULL) return E_MEMORY_FAIL; if ((q = (struct item *) malloc(sizeof(struct item))) == NULL) return E_MEMORY_FAIL; } p->invalid = 1; q->invalid = 1; p->packet = NULL; q->packet = NULL; /* save session receiver */ rcvrcpy((struct sockaddr *) &(sbuffers[position]->receiver), receiver); /* initialize buffer parameters */ sbuffers[position]->sent_start = p; sbuffers[position]->sent_end = p; sbuffers[position]->to_send = q; sbuffers[position]->end = q; sbuffers[position]->buffer_size = 0; /* create mutexes */ if ((pthread_mutex_init(&sbuffers[position]->sent_start_mutex, NULL) != 0) || (pthread_mutex_init(&sbuffers[position]->sent_end_mutex, NULL) != 0) || (pthread_mutex_init(&sbuffers[position]->to_send_mutex, NULL) != 0) || (pthread_mutex_init(&sbuffers[position]->end_mutex, NULL) != 0) || (pthread_mutex_init(&sbuffers[position]->buffer_size_mutex, NULL) != 0)) return E_MEMORY_FAIL; return position; } static int del_from_resend_buffer(struct item *p) { pthread_mutex_lock(&resend_mutex); /* is everything correct? */ if ((resend_start == NULL) || (p == NULL)) { pthread_mutex_unlock(&resend_mutex); return E_PACKET_NOT_FOUND; } /* delete the item from structure for retransmissions */ if (p == resend_start) { /* it's the first item */ resend_start = p->next_to_resend; if (resend_start != NULL) resend_start->last_to_resend = NULL; } else if (p == resend_end) { /* it's the last item */ resend_end = p->last_to_resend; resend_end->next_to_resend = NULL; } else if ((p->last_to_resend != NULL) && (p->next_to_resend != NULL)) { /* it's somewhere in the middle */ p->last_to_resend->next_to_resend = p->next_to_resend; p->next_to_resend->last_to_resend = p->last_to_resend; } pthread_mutex_unlock(&resend_mutex); return 0; } int sbuffers_destroy(int id_buffer) { struct item *p; /* check whether given buffer identification number is correct */ if ((id_buffer < 1) || (id_buffer > sbuffers_count) || (sbuffers[id_buffer] == NULL)) return E_INVALID_BUFFER_ID; /* delete all packets in sent packets buffer */ while ((p = sbuffers[id_buffer]->sent_start) != NULL) { sbuffers[id_buffer]->sent_start = p->next; /* delete it from structure for retransmissions, too */ del_from_resend_buffer(p); /* deallocate packet */ free(p->packet); p->packet = NULL; free(p); p = NULL; } /* delete all packets in to send packets buffer */ while ((p = sbuffers[id_buffer]->to_send) != NULL) { sbuffers[id_buffer]->to_send = p->next; /* deallocate packet */ free(p->packet); p->packet = NULL; free(p); p = NULL; } /* clean memory */ sbuffers[id_buffer]->sent_start = NULL; sbuffers[id_buffer]->sent_end = NULL; sbuffers[id_buffer]->to_send = NULL; sbuffers[id_buffer]->end = NULL; /* delete all mutexes and other structures */ pthread_mutex_destroy(&sbuffers[id_buffer]->sent_start_mutex); pthread_mutex_destroy(&sbuffers[id_buffer]->sent_end_mutex); pthread_mutex_destroy(&sbuffers[id_buffer]->to_send_mutex); pthread_mutex_destroy(&sbuffers[id_buffer]->end_mutex); pthread_mutex_destroy(&sbuffers[id_buffer]->buffer_size_mutex); /* deallocate buffer */ free(sbuffers[id_buffer]); sbuffers[id_buffer] = NULL; return 0; } int sbuffers_get_size(int id_buffer, unsigned long int *buffer_size) { /* check whether given buffer identification number is correct */ if ((id_buffer < 0) || (id_buffer >= sbuffers_count) || (sbuffers[id_buffer] == NULL)) return E_INVALID_BUFFER_ID; *buffer_size = sbuffers[id_buffer]->buffer_size; return 0; } int sbuffers_add_packet(int id_buffer, struct sockaddr *receiver, SID_TYPE sid, char *value, int size, SEQ_TYPE seq) { struct item *p; struct item *r; /* check whether given buffer identification number is correct */ if ((id_buffer < 0) || (id_buffer >= sbuffers_count) || (sbuffers[id_buffer] == NULL)) return E_INVALID_BUFFER_ID; /* allocate new space for inserted packet */ if (id_buffer == 0) { /* extended item */ if ((p = (struct item *) malloc(sizeof(struct ext_item))) == NULL) return E_MEMORY_FAIL; } else { /* generic item */ if ((p = (struct item *) malloc(sizeof(struct item))) == NULL) return E_MEMORY_FAIL; } /* sign it as a new slipper */ p->invalid = 1; p->last = NULL; p->next = NULL; p->packet = NULL; /* lock to send buffer for inserting */ pthread_mutex_lock(&sbuffers[id_buffer]->end_mutex); r = sbuffers[id_buffer]->end; r->next = p; sbuffers[id_buffer]->end = p; /* unlock to send buffer for inserting */ pthread_mutex_unlock(&sbuffers[id_buffer]->end_mutex); /* when inserting into buffer identified by 0 (non-established sessions), * we have to save more information about each packet */ if (id_buffer == 0) { rcvrcpy((struct sockaddr *) &(((struct ext_item *) r)->receiver), receiver); ((struct ext_item *) r)->sid = sid; } /* save all necessary information */ r->buffer_info = sbuffers[id_buffer]; r->seq = seq; r->retr_counter = 0; r->first_send_time = 0; r->time_to_resend = 0; r->length = size; r->packet = value; r->next_to_resend = NULL; r->last_to_resend = NULL; r->invalid = 0; /* increase buffer size */ pthread_mutex_lock(&sbuffers[id_buffer]->buffer_size_mutex); sbuffers[id_buffer]->buffer_size += size; pthread_mutex_unlock(&sbuffers[id_buffer]->buffer_size_mutex); return 0; } int sbuffers_get_packet(int id_buffer, char **value, int *size, struct sockaddr **receiver) { /* check whether given buffer identification number is correct */ if ((id_buffer < 0) || (id_buffer >= sbuffers_count) || (sbuffers[id_buffer] == NULL)) return E_INVALID_BUFFER_ID; /* lock to send buffer for reading */ pthread_mutex_lock(&sbuffers[id_buffer]->to_send_mutex); if (sbuffers[id_buffer]->to_send->invalid == 1) { /* No packet is available. Unlock to send buffer for reading */ pthread_mutex_unlock(&sbuffers[id_buffer]->to_send_mutex); return E_EMPTY_BUFFER; } /* return all necessary information */ *value = sbuffers[id_buffer]->to_send->packet; *size = sbuffers[id_buffer]->to_send->length; /* if returning packet from buffer identified by 0 (non-established * sessions), we have to return more information about each packet. */ if (id_buffer == 0) *receiver = (struct sockaddr *) &(((struct ext_item *) sbuffers[id_buffer]->to_send)->receiver); else *receiver = (struct sockaddr *) &(sbuffers[id_buffer]->receiver); /* Unlock to send buffer for reading */ pthread_mutex_unlock(&sbuffers[id_buffer]->to_send_mutex); return 0; } int sbuffers_send_event(int id_buffer, char *bitstream, int size, double sent_time, double time_to_resend) { struct item *p; struct item *r; /* check whether given buffer identification number is correct */ if ((id_buffer < 0) || (id_buffer >= sbuffers_count) || (sbuffers[id_buffer] == NULL)) return E_INVALID_BUFFER_ID; /* lock to send buffer for reading */ pthread_mutex_lock(&sbuffers[id_buffer]->to_send_mutex); /* check whether buffer is empty or whether wanted packet was already * moved by another thread. */ if ((sbuffers[id_buffer]->to_send->invalid == 1) || (sbuffers[id_buffer]->to_send->packet != bitstream) || (sbuffers[id_buffer]->to_send->length != size)) { /* buffer is empty or there's another packet. Unlock to send buffer * for reading */ pthread_mutex_unlock(&sbuffers[id_buffer]->to_send_mutex); return E_EMPTY_BUFFER; } p = sbuffers[id_buffer]->to_send; sbuffers[id_buffer]->to_send = p->next; /* unlock to send buffer for reading */ pthread_mutex_unlock(&sbuffers[id_buffer]->to_send_mutex); /* lock buffer of sent packets for writing */ pthread_mutex_lock(&sbuffers[id_buffer]->sent_end_mutex); /* mark it as a new slipper */ p->next = NULL; p->last = sbuffers[id_buffer]->sent_end; p->invalid = 1; /* insert it at the end of buffer */ r = sbuffers[id_buffer]->sent_end; r->next = p; sbuffers[id_buffer]->sent_end = p; /* save some necessary information */ r->packet = p->packet; r->buffer_info = p->buffer_info; r->length = p->length; r->seq = p->seq; r->retr_counter = p->retr_counter; p->packet = NULL; /* if moving packet from buffer for non-established sessions, we have to * copy more inforamtion. */ if (id_buffer == 0) { rcvrcpy((struct sockaddr *) &(((struct ext_item *) r)->receiver), (struct sockaddr *) &(((struct ext_item *) p)->receiver)); ((struct ext_item *) r)->sid = ((struct ext_item *) p)->sid; } /* unlock buffer of sent packets for writing */ pthread_mutex_unlock(&sbuffers[id_buffer]->sent_end_mutex); r->first_send_time = sent_time; r->time_to_resend = time_to_resend; /* insert this packet into the structure for retransmissions */ /* lock this structure */ pthread_mutex_lock(&resend_mutex); /* find out the proper position for inserted packet */ p = resend_start; while ((p != NULL) && (p->time_to_resend <= r->time_to_resend)) p = p->next_to_resend; /* insert it */ if (resend_start == NULL) { /* inserting into empty structure */ resend_start = r; resend_end = r; r->last_to_resend = NULL; r->next_to_resend = NULL; } else if (p == resend_start) { /* inserting at the beginning of structure */ r->next_to_resend = resend_start; r->last_to_resend = NULL; resend_start->last_to_resend = r; resend_start = r; } else if (p == NULL) { /* inserting at the end of structure */ r->next_to_resend = NULL; r->last_to_resend = resend_end; resend_end->next_to_resend = r; resend_end = r; } else { /* inserting somewhere in the middle */ r->next_to_resend = p; r->last_to_resend = p->last_to_resend; r->next_to_resend->last_to_resend = r; r->last_to_resend->next_to_resend = r; } pthread_mutex_unlock(&resend_mutex); /* mark this new item as a valid one (before it was signed as a slipper) */ r->invalid = 0; return 0; } int sbuffers_resend_event(double time_to_resend) { struct item *p; struct item *r; /* lock structure for retransmissions */ pthread_mutex_lock(&resend_mutex); if (resend_start == NULL) { /* there is no packet to resend - structure is empty */ pthread_mutex_unlock(&resend_mutex); return E_EMPTY_BUFFER; } /* update time for next retransmission and increase retransmissions * counter. */ resend_start->time_to_resend = time_to_resend; ++resend_start->retr_counter; /* is there only one packet? No other steps are necessary. */ if (resend_start->next_to_resend == NULL) { /* unlock structure for retransmissions */ pthread_mutex_unlock(&resend_mutex); return 0; } /* move retransmitted packet to its right new position */ r = resend_start; resend_start = r->next_to_resend; if (resend_start != NULL) resend_start->last_to_resend = NULL; /* find this new position */ p = resend_start; while ((p != NULL) && (p->time_to_resend < r->time_to_resend)) p = p->next_to_resend; /* there're at least two packets */ if (p == resend_start) { /* inserting at the beginning of structure */ r->next_to_resend = resend_start; r->last_to_resend = NULL; resend_start->last_to_resend = r; resend_start = r; } else if (p == NULL) { /* inserting at the end of structure */ r->next_to_resend = NULL; r->last_to_resend = resend_end; resend_end->next_to_resend = r; resend_end = r; } else { /* inserting somewhere in the middle */ r->next_to_resend = p; r->last_to_resend = p->last_to_resend; r->next_to_resend->last_to_resend = r; r->last_to_resend->next_to_resend = r; } /* unlock structure for retransmissions */ pthread_mutex_unlock(&resend_mutex); return 0; } int sbuffers_ack_event(int id_buffer, struct sockaddr *sender, SID_TYPE sid, SEQ_TYPE seq, int *size, double *time) { struct item *p; /* check whether given buffer identification number is correct */ if ((id_buffer < 0) || (id_buffer >= sbuffers_count) || (sbuffers[id_buffer] == NULL)) return E_INVALID_BUFFER_ID; /* lock the sent buffer for reading (searching) */ pthread_mutex_lock(&sbuffers[id_buffer]->sent_start_mutex); /* try to find requested packet */ p = sbuffers[id_buffer]->sent_start; if (id_buffer == 0) { while ((p->invalid != 1) && (p->seq != seq) && (((struct ext_item *) p)->sid != sid) && (rcvrcmp((struct sockaddr *) &(((struct ext_item *) p)->receiver), sender))) p = p->next; } else { while ((p->invalid != 1) && (p->seq != seq)) p = p->next; } if (p->invalid == 1) { /* requested packet wasn't found. Unlock the sent buffer for reading */ pthread_mutex_unlock(&sbuffers[id_buffer]->sent_start_mutex); return E_PACKET_NOT_FOUND; } /* packet was found. Remove it from the buffer */ if (p == sbuffers[id_buffer]->sent_start) { /* it's the leading packet */ sbuffers[id_buffer]->sent_start = p->next; if (sbuffers[id_buffer]->sent_start != NULL) sbuffers[id_buffer]->sent_start->last = NULL; } else { /* it's somewhere in the middle */ p->last->next = p->next; p->next->last = p->last; } /* unlock the sent buffer for reading */ pthread_mutex_unlock(&sbuffers[id_buffer]->sent_start_mutex); /* remove this packet from structure for retransmissions */ del_from_resend_buffer(p); /* return necessary information */ if (size != NULL) *size = p->length; if (time != NULL) { /* if packet wasn't resent any time, return it's sent time */ if (p->retr_counter == 0) *time = p->first_send_time; else *time = -1; } /* decrease buffer size */ pthread_mutex_lock(&sbuffers[id_buffer]->buffer_size_mutex); sbuffers[id_buffer]->buffer_size -= p->length; pthread_mutex_unlock(&sbuffers[id_buffer]->buffer_size_mutex); /* deallocate space for this packet */ free(p->packet); p->packet = NULL; free(p); p = NULL; return 0; } double sbuffers_get_top_rsnd_time(void) { double temp_time; /* lock structure for retransmissions */ pthread_mutex_lock(&resend_mutex); if (resend_start == NULL) { /* there's no packet. Unlock that structure. */ pthread_mutex_unlock(&resend_mutex); return E_EMPTY_BUFFER; } /* return necessary information */ temp_time = resend_start->time_to_resend; /* unlock that structure */ pthread_mutex_unlock(&resend_mutex); return temp_time; } int sbuffers_get_rsnd_packet(char **value, int *size, struct sockaddr **receiver, double *first_send_time) { struct sockaddr *returned_receiver; /* lock structure for retransmissions */ pthread_mutex_lock(&resend_mutex); if (resend_start == NULL) { /* there's no packet. Unlock that structure. */ pthread_mutex_unlock(&resend_mutex); return E_EMPTY_BUFFER; } /* return all necessary information */ *value = resend_start->packet; *size = resend_start->length; *first_send_time = resend_start->first_send_time; /* if resending packet which belongs to non-established connection, return * some others information */ if (resend_start->buffer_info == sbuffers[0]) { /* allocate new space for returned receiver */ returned_receiver = (struct sockaddr *) malloc(rcvrsz((struct sockaddr *) &(((struct ext_item *) resend_start)->receiver))); /* save it into the allocated space */ rcvrcpy(returned_receiver, (struct sockaddr *) &(((struct ext_item *) resend_start)->receiver)); } else { /* allocate new space for returned receiver */ returned_receiver = (struct sockaddr *) malloc(rcvrsz((struct sockaddr *) &(resend_start->buffer_info->receiver))); /* save it into the allocated space */ rcvrcpy(returned_receiver, (struct sockaddr *) &(resend_start->buffer_info->receiver)); } *receiver = returned_receiver; /* unlock that structure */ pthread_mutex_unlock(&resend_mutex); return 0; } int sbuffers_ignore_first_rsnd(char *value) { struct item *p; /* lock structure for retransmissions */ pthread_mutex_lock(&resend_mutex); if (resend_start == NULL) { /* there's no packet. Unlock that structure. */ pthread_mutex_unlock(&resend_mutex); return E_EMPTY_BUFFER; } /* check whether trying to skip the right packet (someone could delete the * right one before). */ if (resend_start->packet == value) { p = resend_start; resend_start = p->next_to_resend; p->next_to_resend = NULL; p->last_to_resend = NULL; if (resend_start != NULL) resend_start->last_to_resend = NULL; } /* unlock the structure for retransmissions */ pthread_mutex_unlock(&resend_mutex); return 0; } /* vim: set ts=4 : */